My exec_in_executor() API

alex_ber
16 min readNov 26, 2024

--

Extension of asyncio.to_thread()

async def to_thread(func, /, *args, **kwargs):
"""Asynchronously run function *func* in a separate thread..."
loop = events.get_running_loop()
ctx = contextvars.copy_context()
func_call = functools.partial(ctx.run, func, *args, **kwargs)
return await loop.run_in_executor(None, func_call)

This is utility function that exists in asyncio. You can read detail explanation about it here https://alex-ber.medium.com/how-to-call-i-o-bound-operation-from-async-context-565a504548b0

Also, there API for more advanced cases here My AsyncExecutionQueue https://alex-ber.medium.com/my-asyncexecutionqueue-4001ac168675.

Note: I have implicit assumption, that MainThread has all infrastructure for running async function (running event loop, for example) is set. It can be done by FastAPI, Quart, asyncio.run(), it doesn’t really matter, but it is done.

Quick recap:

Quote:

We often need to execute a blocking function call within an asyncio application because, in practice, most workloads include a mix of IO-bound operations and also CPU-bound operations.

This could be for many reasons, such as:

* To execute a CPU-bound task like calculating something.

* To execute a blocking IO-bound task like reading or writing from a file.

* To call into a third-party library that does not support asyncio yet.

Making a blocking call directly in an asyncio program will cause the event loop to stop while the blocking call is executing. It will not allow other coroutines to run in the background.

This can be prevented by running the blocking call outside of the event loop, which we can do with asyncio.to_thread().

# execute a function in a separate thread
import asyncio

await asyncio.to_thread(blocking_io_operation)

How to run blocking task with asyncio

The asyncio.to_thread() function takes a function to execute and any arguments. It returns a coroutine that can be awaited or scheduled as an independent task. The function is then executed in a separate thread.

https://dev.to/hackerculture/python-asyncio-a-guide-to-asynchronous-programming-43j2

You can read more details about asyncio.to_thread() here How to call I/O bound operation from async context.

So, how asyncio.to_thread() works?

  • This is async function, so you can’t naturally call it from regular function (event if you have running loop in your thread).
  • It takes regular function func and it’s arguments, copies all ContextVars that exist on the execution context of the running Taks (copy_context().run is responsible for this).
  • It uses asyncio’s default ThreadPoolExecutor to run func. That is, it is running on another thread than to_thread().
  • Because immediate await (return await loop.run_in_executor(None, func_call)) the call to_thread() is not returned until we have result available. In more details:

loop.run_in_executor(None, func_call) returns asynio.Future, and we immediately await on it.

Now, what asyncio.to_thread() do?

  • So, your code will fire func in different Thread, your event-loop will be able to run another coroutines, while func is executing in another Thread and when it is done, you will get result on the caller of asyncio.to_thread() coroutine and continue to run as usual.

Now, we will try to improve the implementation.

The source code you can found here. It is available as part of my AlexBerUtils s project.

You can install AlexBerUtils from PyPi:

python -m pip install -U alex-ber-utils

See here for more details explanation on how to install.

Why asyncio.to_thread() is not just regular function? Why only default asyncio executor supported?

Indeed, this change means that it can’t await on loop.run_in_executor(). But it can just return asyncio.Future(that is returned by loop.run_in_executor()). If the caller of asyncio.to_thread() is coroutine, this change is transparent to it, if it want to get result immediately it will await on asyncio.to_thread() anyway.

Let start with very simple example. We’re in async context and have regular function, say sample_function(), that has blocking I/O call. We can’t run it on the main event loop, because it going to block it. We should run it in another thread. Good practice is to use ThreadPoolExecutor.

#1. From async context we're calling sync function.
import time
import asyncio
import atexit
from concurrent.futures import ThreadPoolExecutor

from alexber.utils.thread_locals import exec_in_executor

EXECUTOR = ThreadPoolExecutor()

def blocking_io(x: int, y: int) -> int:
time.sleep(5) # mimicking blocking I/O call

return x + y

def close_executor() -> None:
EXECUTOR.shutdown(wait=True)

atexit.register(close_executor)
# Alternative using ExitStack:
# with contextlib.ExitStack() as stack:
# stack.enter_context(EXECUTOR)
# or
# stack.callback(EXECUTOR.shutdown, wait=True)

async def main() -> None:
result = await exec_in_executor(EXECUTOR, blocking_io, 2, 3)
print(result)

if __name__ == "__main__":
asyncio.run(main())

You can change the line

result = await exec_in_executor(EXECUTOR, blocking_io, 2, 3)

to

result = await asyncio.to_thread(blocking_io, 2, 3)

and you will get the same result.

What are the differences?

There are 2 main differences.

  1. asyncio.to_thread() run with default ThreadPoolExecutor. In exec_in_executor() there multiple ways to set one.
  2. There is side-effect that outlives, sample_function() execution and stay on the Thread from ThreadPoolExecutorthat run it.

Actually, there are multiple ways to pass executor:

  • You can pass it explicitly on each call to exec_in_executor() as in example above.
  • You can pass executor in initConf() function. If passed it will be used globally by all threads (provided executor was not passed in exec_in_executor()itself).
  • You can pass None to exec_in_executor() (provided you didn’t set global executor by initConf()) than asyncio’s default ThreadPoolExecutor will be used.

Note: asyncio doesn’t have public get_default_executor(). The next best thing you can do it use loop.set_default_executor() (for example, asyncio.get_event_loop().set_default_executor()). The problem is, that this is not “global” setting, but only for one loop. If somewhere in the code, another loop will be created asyncio’s default ThreadPoolExecutor will be used, unless you will explicitly set default_executor to the same that you set in previous loop.

So, I decided to avoid all of these all together and to write very simple custom mechanism described above.

The second point “side-effect that outlives, sample_function() execution” can be viewed as drawback, but it can be viewed also as forward optimization.

If we want to run not only regular/sync function as asyncio.to_thread(), if we want to extend out API to async function, we should provide running event loop for them. I see only 2 way to do it:

  1. “Reuse” event loop from the MainThread.
  2. Create new event loop per Thread inside executor.

First option was explored in lift_to_async() API. The experience was pretty bad. You should craft the caller side in very specific way (for example, you shouldn’t be on the MainThread, otherwise, your event loop will be blocked, so in order to avoid this, you will get exception; still bad solution).

Second option is to ensure that every thread in the executor has set distinct event loop before the caller “real” code is called. This is what ensure_thread_event_loop() for. In the final release, while this function remain public, you can omit a call to it, it will be called implicitly. Note, however, that it will be called regardless whether regular or async function you want to execute (the rationale for sync function is to enable call to coroutine later on).

Internally, ensure_thread_event_loop()function check, whether we have been created event loop for the current thread (from ThreadPoolExecutor). It do it, using thread locals.

If so, this means that we’ve already set event loop on current thread, so there is nothing to do.

Otherwise,

# If no event loop is present, create a new one
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)

Also, this event loopis stored in the thread locals for future use.

Advantage:

  • We’re creating event loop per thread only once. In the next time when we will run another coroutine via executor , we will skip this step, saving runtime.

Disadvantage:

  • Unrelated pieces of the code now affects each other.
  • If executor that we use in exec_in_executor() is also used somewhere else, such as asyncio’s default ThreadPoolExecutor, for example, it can lead to unexpected behavior.

Note: In general, it is recommended either not to use asyncio.to_thread() ever in your and your libraries code, or to use dedicated ThreadPoolExecutor for exec_in_executor().

Structure Concurrency Note: In the line below:

result = await exec_in_executor(EXECUTOR, blocking_io, 2, 3)

exec_in_executor() executes blocking_io() in another thread.
Program flow effectively splits: first part is execution of blocking_io() in another thread, the second branch of this flow is current task execution, that “joins”/awaits the end of blocking_io() execution. After await,we have only one flow.

Lifecycle of EXECUTOR

if you’re using asyncio’s default ThreadPoolExecutor, you shouldn’t worry about EXECUTOR's lifecycle.

Another option, is use locally defined ThreadPoolExecutor like this:

from concurrent.futures import ThreadPoolExecutor
from alexber.utils.thread_locals import exec_in_executor

async def some_coroutine() -> None:
with ThreadPoolExecutor() as executor:
result = await exec_in_executor(executor, blocking_io, 2, 3)
print(result)

The drawback of this method is obvious: each time some_coroutine() will be called new ThreadPoolExecutor() will be allocated. This is at waste of runtime and memory, due to it has resource allocation overhead.

However, If it is “main” function, as in provided example, or lifespan analogue of FastAPI, this is ok from resource allocation management point of view. It has minor drawback, that you should pass it explicitly to your sync/async functions. You can put it as global variable of cause to solve last issue (ThreadPoolExecutor is thread-safe, otherwise, ContextVar should be used instead).

Side note

Another options, in the “main” function you can do:

from concurrent.futures import ThreadPoolExecutor
from alexber.utils.thread_locals import exec_in_executor

EXECUTOR = None

async def main():
with ThreadPoolExecutor() as executor:
global EXECUTOR
EXECUTOR = executor
result = await exec_in_executor(EXECUTOR, blocking_io, 2, 3)
print(result)

Note: Using a with statement to manage a ThreadPoolExecutor is often preferable because it ensures that resources are automatically cleaned up when the block of code is exited. This automatic cleanup occurs even if an exception is raised, providing a robust mechanism for resource management. The context manager's __exit__ method is called when the block is exited, which handles the shutdown of the executor. This approach reduces the risk of leaving resources open or in an inconsistent state, as the executor is guaranteed to be properly shut down.

In contrast, using the atexit module to register cleanup functions relies on the program terminating normally. atexit functions are executed when a program exits naturally or through a controlled exit, such as calling sys.exit(). This includes situations where the program reaches the end of its execution or explicitly calls sys.exit(). In multi-threaded programs, if the main thread finishes and there are no other non-daemon threads running, the program is considered to have terminated normally, and atexit functions will be called. Additionally, when the Python interpreter shuts down in a controlled manner, such as at the end of a script or after a sys.exit() call, atexit functions are executed.

However, atexit functions may not be executed if the program terminates abnormally. This can occur if an uncaught exception causes the program to crash, although exceptions like SystemExit or KeyboardInterrupt that are caught and handled will still allow atexit functions to run. If the program is forcibly terminated by the operating system or by a user action, such as using the kill command or pressing Ctrl+C, atexit functions may not be executed. Additionally, if the program experiences a crash or segmentation fault, atexit functions will not be called.

or

import contextlib
from concurrent.futures import ThreadPoolExecutor
from alexber.utils.thread_locals import exec_in_executor

EXECUTOR = ThreadPoolExecutor()

async def main():
with contextlib.ExitStack() as stack:
stack.enter_context(EXECUTOR)
result = await exec_in_executor(EXECUTOR, blocking_io, 2, 3)
print(result)

or

import contextlib
from concurrent.futures import ThreadPoolExecutor
from alexber.utils.thread_locals import exec_in_executor

EXECUTOR = ThreadPoolExecutor()

async def main():
with contextlib.ExitStack() as stack:
stack.callback(EXECUTOR.shutdown, wait=True)
result = await exec_in_executor(EXECUTOR, blocking_io, 2, 3)
print(result)

or

import contextlib
from concurrent.futures import ThreadPoolExecutor
from alexber.utils.thread_locals import exec_in_executor, ensure_thread_event_loop


EXECUTOR = None

async def main():
async with contextlib.AsyncExitStack() as stack: #just to demonstrate also this option
global EXECUTOR
EXECUTOR = stack.enter_context(
ThreadPoolExecutor(initializer=ensure_thread_event_loop))
result = await exec_in_executor(EXECUTOR, blocking_io, 2, 3)
print(result)

Note:
1. AsyncExitStack was used for to illustrate that ThreadPoolExecutor can use both sync and async “stack”.
2. Using
ExitStack/AsyncExitStack is best when you have multiple context-managers in order to avoid nesting. As bonus you can call also call to stack.pop_all(), which is used to remove all context managers and cleanup callbacks from the stack without invoking their exit or cleanup procedures. See below.
2. As I said above
ensure_thread_event_loop will be called implicitly, so you can omit initializer=ensure_thread_event_loop.

Let’s elaborate on stack.pop_all(). For example, before adding lifespan to FastAPI, it has only @app.on_event(“startup”) and @app.on_event(“shutdown”) that can be used as decorator on two your functions. So, in order to implement the same logic you may write something like:

from fastapi import FastAPI
import contextlib
from concurrent.futures import ThreadPoolExecutor

app = FastAPI()
stack = contextlib.AsyncExitStack()
executor = None

@app.on_event("startup")
async def startup_event():
global executor
# Use async with to manage the AsyncExitStack
async with stack:
# Create and enter the ThreadPoolExecutor into the stack
executor = stack.enter_context(ThreadPoolExecutor())
print("Application has started with ThreadPoolExecutor.")
# Prevent the stack from closing at the end of the with block
stack.pop_all()

@app.on_event("shutdown")
async def shutdown_event():
# Manually cleanup all resources in the stack
await stack.aclose()
print("Application is shutting down, resources are cleaned up.")

@app.get("/")
async def read_root():
return {"Hello": "World"}

You see yourself, how verbose it is, and still you should pay attention, because:

Note:
1. Using
global variables can lead to issues with state management and concurrency, especially in larger applications. ThreadPoolExecutor is thread safe-no problems here.
But
AsyncExitStack is not inherently thread-safe. You should limit access to the stack to the startup_event() and shutdown_event() only. Otherwise, you can have concurrency issues.
2.
In general use ContextVar to store “global” or request/task scope vaiables.

So, for clarity, I will go with atexit, but bare in your mind, that in real application you should, probably, use context-manager in one way or another.

P.S. This is how the same code looks like with lifespan.

from fastapi import FastAPI
import contextlib
from concurrent.futures import ThreadPoolExecutor

async def app_lifespan():
async with contextlib.AsyncExitStack() as stack:
# Create and enter the ThreadPoolExecutor into the stack
executor = await stack.enter_async_context(ThreadPoolExecutor())
print("Application has started with ThreadPoolExecutor.")
yield
# Cleanup will happen here when the context exits
print("Application is shutting down, resources are cleaned up.")

app = FastAPI(lifespan=app_lifespan)

@app.get("/")
async def read_root():
return {"Hello": "World"}

END OF SIDE NOTE.

Before side note, we were looking on the lifecycle of ThreadPoolExecutor.

The last option left is to defined ThreadPoolExecutor as global variable on the first place as in the example above. If we do so, we should ensure that it will be properly closed, all resources will be released and all the task running will be either canceled (1) or will have opportunely to finish (2).

In the example above ThreadPoolExecutor is defined as global variable and uses atexit built-in model that registers a function to be executed upon normal program termination, we register function that shutdowns ThreadPoolExecutor upon normal program termination and waits for all running tasks to finish, option (2).

END OF Lifecycle of EXECUTOR

Let’s look on more complex use-case:

  1. Suppose we have some library/framework (it can be simple asyncio.run()) that taking care about infrastructure of async/await and spinning event loop in the MainThread.
  2. Suppose that for whatever reason we found ourselves in regular/sync some_legacy_function().
  3. We want to call our regular/sync blocking_io() function, that we want eventually to call async sample_coroutine() function, execute it and return it’s value.

Having exec_in_executor() as regular function makes it available for calls in the sync-context. For examples, it enable to define exec_in_executor_threading_future() function that return concurrent.futures.Future. Let see client code first:

#2. From sync contex we're calling sync function that calls async function.
import time
import asyncio
import atexit
from concurrent.futures import ThreadPoolExecutor

from alexber.utils.thread_locals import exec_in_executor_threading_future


EXECUTOR = ThreadPoolExecutor()

async def sample_coroutine(x: int, y: int) -> int:
await asyncio.sleep(1)
#time.sleep(10)

return x * y

def blocking_io():
time.sleep(5) # mimicking blocking I/O call

async def helper_example_usage():
#time.sleep(15)
result = await sample_coroutine(2, 3) # Example values for x and y
return result

loop = asyncio.get_event_loop()
return loop.run_until_complete(helper_example_usage())

def close_executor() -> None:
EXECUTOR.shutdown(wait=True)

atexit.register(close_executor)
# Alternative using ExitStack:
# with contextlib.ExitStack() as stack:
# stack.enter_context(EXECUTOR)
# or
# stack.callback(EXECUTOR.shutdown, wait=True)

def some_legacy_function() -> None:
fut = exec_in_executor_threading_future(EXECUTOR, blocking_io)
result = fut.result()
print(result)

async def main() -> None:
some_legacy_function()

if __name__ == "__main__":
asyncio.run(main())

And below is simplified implementation of exec_in_executor_threading_future(). It work both for sync and sync function, here for clarity there is only sync case.

import concurrent
from alexber.utils.thread_locals import exec_in_executor

def exec_in_executor_threading_future(executor, func, *args, **kwargs):
future = concurrent.futures.Future()

def wrapper(future):
result = func(*args, **kwargs)
future.set_result(result)

exec_in_executor(executor, wrapper, future)
return future

As you can see, because exec_in_executor() is regular/sync function we can call it from sync context. We’re disregarding asyncio.Future it returns, instead we’re implemented custom mechanism to retrieve result of func(). We’re defining threading.Future that will be returned. We’re defining wrapper() regular/sync function that will be send together with executor and the feature above to exec_in_executor() API.

In side wrapper() function that will be run on another Thread, we’re calling to func that we’re wrapping, get the result and set it on the future.

Let’s go back to the usage of exec_in_executor_threading_future().

So, suppose we have FastAPI application (or any other or just asyncio.run()) that taking care about infrastructure of async/await and spinning event loop in the MainThread.

So, our application’s code starts from main(). For whatever reason, we’re found ourselves in regular/sync function, for example in some_legacy_function(). Now, we want to call sync function that do blocking_io() call and it want to call async function (some rewritten legacy code, for example), sample_coroutine().

So, from some_legacy_function() we can use exec_in_executor_threading_future() API and run blocking_io(). This API also responsible for spinning event loop, if needed, in the Thread from executor, that runs blocking_io().

Structure Concurrency Note: In this example, exec_in_executor_threading_future() splits the flow. MainThread is going to “join” Thread from executor, that runs blocking_io() in the fut.result(). After this call we have single branch flow.

So, on another flow branch we’re inside def blocking_io() on some thread, let say, executor-thread-1, (other that MainThread) inside executor.

If we were using asyncio.to_thread() API, there were no spinning event loop inside locking_io(). exec_in_executor() ensures that this thread do have spinning event loop.

This is the reason that line:

loop = asyncio.get_event_loop()

works. It retrieves this event loop. Nothing is run on it. So,

loop.run_until_complete(helper_example_usage())

is also work. Note, that helper_example_usage() is async function, so we’re entering async context.

Structure Concurrency Note: Here, helper_example_usage() splits the flow. executor-thread-1is going to “join” the result it will get from loop.run_until_complete(helper_example_usage()). executor-thread-1 will not proceed until the result was “joint”. After this call we have single branch flow, that returns from blocking_io().

Now, when we’re in async context inside helper_example_usage(), we can do all regular things that we can do with async/await. The only caveat is, that we’re not running in MainThread.

In this example, there is simple async/await to sample_coroutine(), that make some calculation (solely for demo purposes). Everything is working as expected.

This use-case is simpler than previous one:

  1. Suppose we have some library/framework (it can be simple asyncio.run()) that taking care about infrastructure of async/await and spinning event loop in the MainThread.
  2. We want to call our regular/sync blocking_io() function, that we want eventually to call async sample_coroutine() function, execute it and return it’s value.

So, suppose we have FastAPI application (or any other or just asyncio.run()) that taking care about infrastructure of async/await and spinning event loop in the MainThread.

So, our application’s code starts from main(). Now, we have regular/sync function blocking_io() . So, we want to run in another thread. We can use exec_in_executor() directly. This API also responsible for spinning event loop, if needed, in the Thread from executor, that runs blocking_io().

Structure Concurrency Note: In this example, exec_in_executor() splits the flow. MainThread is going to “join” Thread from executor, that runs blocking_io() in the await(on asyncio.Future). After this call we have single branch flow.

#3 From async context we're calling sync function that calls async function.
import time
import asyncio
import atexit
from concurrent.futures import ThreadPoolExecutor

from alexber.utils.thread_locals import exec_in_executor


EXECUTOR = ThreadPoolExecutor()

async def sample_coroutine(x: int, y: int) -> int:
await asyncio.sleep(1)
#time.sleep(10)

return x * y

def blocking_io() -> None:
time.sleep(5) # mimicking blocking I/O call

async def helper_example_usage():
#time.sleep(15)
result = await sample_coroutine(2, 3) # Example values for x and y
return result

loop = asyncio.get_event_loop()
return loop.run_until_complete(helper_example_usage())



def close_executor() -> None:
EXECUTOR.shutdown(wait=True)

atexit.register(close_executor)
# Alternative using ExitStack:
# with contextlib.ExitStack() as stack:
# stack.enter_context(EXECUTOR)
# or
# stack.callback(EXECUTOR.shutdown, wait=True)

async def main() -> None:
result = await exec_in_executor(EXECUTOR, blocking_io)
print(result)

if __name__ == "__main__":
asyncio.run(main())

After we got to blocking_io(), all other is similar as in previous example. Only, the “joining” point here is await(on asyncio.Future), not fut.result().

Let look on another use-case.

So, suppose we have FastAPI application (or any other or just asyncio.run()) that taking care about infrastructure of async/await and spinning event loop in the MainThread.

So, our application’s code starts from main(). Now, we want to call async function sample_coroutine() call.

So, from main() we can use exec_in_executor() API and run sample_coroutine(). This API also responsible for spinning event loop, if needed, in the Thread from executor, that runs sample_coroutine(). So, when we’re in sample_coroutine() there is running event loop.

#4 From async context we're calling async function.
#import time
import asyncio
import atexit
from concurrent.futures import ThreadPoolExecutor

from alexber.utils.thread_locals import exec_in_executor


EXECUTOR = ThreadPoolExecutor()

async def sample_coroutine(x: int, y: int) -> int:
await asyncio.sleep(1)
#time.sleep(10)

return x * y


def close_executor() -> None:
EXECUTOR.shutdown(wait=True)

atexit.register(close_executor)
# Alternative using ExitStack:
# with contextlib.ExitStack() as stack:
# stack.enter_context(EXECUTOR)
# or
# stack.callback(EXECUTOR.shutdown, wait=True)

async def main() -> None:
result = await exec_in_executor(EXECUTOR, sample_coroutine, 2, 3)
print(result)

if __name__ == "__main__":
asyncio.run(main())

Note:
It should be very rare in real production code.
If sample_coroutine() is “well-behaved” coroutine, there is no need to run in on another thread.
But sometimes, for example, it use blocking I/O somewhere in your libraries stack, so refactoring is painful. So, for a meanwhile, you can use this as workaround.

Structure Concurrency Note: In this example, exec_in_executor() splits the flow. MainThread is going to “join” Thread from executor, that runs sample_coroutine() in the await(on asyncio.Future). After this call we have single branch flow.

Let look on another use-case.

So, suppose we have FastAPI application (or any other or just asyncio.run()) that taking care about infrastructure of async/await and spinning event loop in the MainThread.

So, our application’s code starts from main(). For whatever reason, we’re found ourselves in regular/sync function, for example in some_legacy_function(). Now, we want to call async function sample_coroutine().

So, from some_legacy_function() we can use exec_in_executor_threading_future() API and run sample_coroutine(). This API also responsible for spinning event loop, if needed, in the Thread from executor, that runs sample_coroutine().

#5 From sync context calls async function.
#import time
import asyncio
import atexit
from concurrent.futures import ThreadPoolExecutor

from alexber.utils.thread_locals import exec_in_executor_threading_future


EXECUTOR = ThreadPoolExecutor()

async def sample_coroutine(x: int, y: int) -> int:
await asyncio.sleep(1)
#time.sleep(10)

return x * y


def close_executor() -> None:
EXECUTOR.shutdown(wait=True)

atexit.register(close_executor)
# Alternative using ExitStack:
# with contextlib.ExitStack() as stack:
# stack.enter_context(EXECUTOR)
# or
# stack.callback(EXECUTOR.shutdown, wait=True)


def some_legacy_function():
fut = exec_in_executor_threading_future(EXECUTOR, sample_coroutine, 2, 3)
result = fut.result()
print(result)

async def main() -> None:
some_legacy_function()

if __name__ == "__main__":
asyncio.run(main())

Note:
It should be very rare in real production code.
If sample_coroutine() is “well-behaved” coroutine, there is no need to run in on another thread.
But sometimes, for example, it use blocking I/O somewhere in your libraries stack, so refactoring is painful. So, for a meanwhile, you can use this as workaround.

Structure Concurrency Note: In this example, exec_in_executor_threading_future() splits the flow. MainThread is going to “join” Thread from executor, that runs sample_coroutine() in the fut.result(). After this call we have single branch flow.

Last thoughts

While exec_in_executor()/exec_in_executor_threading_future() primitives by themselves does not promote Structured Concurrency, their intended usage, as demonstrated above enables to write such code. For more information, what Structured Concurrency is and what “well-behaved coroutine” is (I use the later term wagly, but I have thorough breakdown), see

Structured Concurrency as extension of Go statement considered harmful.

--

--

alex_ber
alex_ber

Written by alex_ber

Senior Software Engineer at Pursway

No responses yet