My exec_in_executor() API
Extension of asyncio.to_thread()
async def to_thread(func, /, *args, **kwargs):
    """Asynchronously run function *func* in a separate thread..."""
    loop = events.get_running_loop()
    ctx = contextvars.copy_context()
    func_call = functools.partial(ctx.run, func, *args, **kwargs)
    return await loop.run_in_executor(None, func_call)
This is a utility function that exists in asyncio. You can read a detailed explanation of it here https://alex-ber.medium.com/how-to-call-i-o-bound-operation-from-async-context-565a504548b0
There is also an API for more advanced cases here: My AsyncExecutionQueue https://alex-ber.medium.com/my-asyncexecutionqueue-4001ac168675.
Note: I have an implicit assumption that the MainThread has all the infrastructure for running
async functions set up (a running event loop, for example). It can be done by FastAPI, Quart, asyncio.run(), it doesn't really matter, but it is done.
Quick recap:
Quote:
We often need to execute a blocking function call within an asyncio application because, in practice, most workloads include a mix of IO-bound operations and also CPU-bound operations. This could be for many reasons, such as:
* To execute a CPU-bound task like calculating something.
* To execute a blocking IO-bound task like reading or writing from a file.
* To call into a third-party library that does not support asyncio yet.
Making a blocking call directly in an asyncio program will cause the event loop to stop while the blocking call is executing. It will not allow other coroutines to run in the background. This can be prevented by running the blocking call outside of the event loop, which we can do with asyncio.to_thread().
# execute a function in a separate thread
import asyncio
await asyncio.to_thread(blocking_io_operation)
How to run blocking task with asyncio
The asyncio.to_thread() function takes a function to execute and any arguments. It returns a coroutine that can be awaited or scheduled as an independent task. The function is then executed in a separate thread.
https://dev.to/hackerculture/python-asyncio-a-guide-to-asynchronous-programming-43j2
You can read more details about asyncio.to_thread() here: How to call I/O bound operation from async context.
So, how does asyncio.to_thread() work?
- This is an async function, so you can't naturally call it from a regular function (even if you have a running loop in your thread).
- It takes a regular function func and its arguments, and copies all ContextVars that exist on the execution context of the running Task (copy_context().run is responsible for this).
- It uses asyncio's default ThreadPoolExecutor to run func. That is, func runs on another thread than to_thread().
- Because of the immediate await (return await loop.run_in_executor(None, func_call)), the call to to_thread() does not return until the result is available. In more detail: loop.run_in_executor(None, func_call) returns an asyncio.Future, and we immediately await on it.
Now, what does asyncio.to_thread() do?
- Your code will fire func in a different Thread, your event loop will be able to run other coroutines while func is executing in that other Thread, and when it is done, you will get the result in the coroutine that called asyncio.to_thread() and continue to run as usual.
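To make this concrete, here is a small self-contained illustration (the names blocking_io() and ticker() are mine, purely for demonstration): while blocking_io() runs in a worker thread, the event loop keeps running other coroutines.
import asyncio
import time

def blocking_io() -> str:
    time.sleep(2)  # blocking call; runs in a worker thread via to_thread()
    return "io done"

async def ticker() -> None:
    for _ in range(4):
        print("event loop is still responsive")
        await asyncio.sleep(0.5)

async def main() -> None:
    # blocking_io() is fired in another thread; ticker() keeps running on the loop.
    result, _ = await asyncio.gather(asyncio.to_thread(blocking_io), ticker())
    print(result)  # printed once blocking_io() finishes and we "join" its result

asyncio.run(main())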
Now, we will try to improve the implementation.
You can find the source code here. It is available as part of my AlexBerUtils project.
You can install AlexBerUtils from PyPi:
python -m pip install -U alex-ber-utils
See here for a more detailed explanation of how to install it.
Why isn't asyncio.to_thread() just a regular function? Why is only the default asyncio executor supported?
Indeed, this change means that it can't await on loop.run_in_executor(). But it can just return the asyncio.Future that is returned by loop.run_in_executor(). If the caller of asyncio.to_thread() is a coroutine, this change is transparent to it; if it wants to get the result immediately, it will await on asyncio.to_thread() anyway.
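To make the idea concrete, here is a minimal sketch of what such a regular-function variant could look like (this is my illustration, not asyncio's or the library's actual code):
import asyncio
import contextvars
import functools

def to_thread_sync(func, /, *args, **kwargs):
    """Illustrative sketch only: a non-async variant of asyncio.to_thread().

    Instead of awaiting, it returns the asyncio.Future produced by
    loop.run_in_executor(), so a coroutine caller can simply `await` it.
    It must be called from the thread that runs the event loop.
    """
    loop = asyncio.get_running_loop()
    ctx = contextvars.copy_context()
    func_call = functools.partial(ctx.run, func, *args, **kwargs)
    return loop.run_in_executor(None, func_call)  # not awaited here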
Let's start with a very simple example. We're in an async context and have a regular function, say blocking_io(), that makes a blocking I/O call. We can't run it on the main event loop, because it is going to block it. We should run it in another thread. Good practice is to use a ThreadPoolExecutor.
#1. From async context we're calling sync function.
import time
import asyncio
import atexit
from concurrent.futures import ThreadPoolExecutor
from alexber.utils.thread_locals import exec_in_executor

EXECUTOR = ThreadPoolExecutor()

def blocking_io(x: int, y: int) -> int:
    time.sleep(5)  # mimicking blocking I/O call
    return x + y

def close_executor() -> None:
    EXECUTOR.shutdown(wait=True)

atexit.register(close_executor)
# Alternative using ExitStack:
# with contextlib.ExitStack() as stack:
#     stack.enter_context(EXECUTOR)
#     # or
#     stack.callback(EXECUTOR.shutdown, wait=True)

async def main() -> None:
    result = await exec_in_executor(EXECUTOR, blocking_io, 2, 3)
    print(result)

if __name__ == "__main__":
    asyncio.run(main())
You can change the line
result = await exec_in_executor(EXECUTOR, blocking_io, 2, 3)
to
result = await asyncio.to_thread(blocking_io, 2, 3)
and you will get the same result.
What are the differences?
There are 2 main differences:
- asyncio.to_thread() runs with the default ThreadPoolExecutor. In exec_in_executor() there are multiple ways to set one.
- There is a side-effect that outlives the blocking_io() execution and stays on the Thread from the ThreadPoolExecutor that ran it.
Actually, there are multiple ways to pass the executor:
- You can pass it explicitly on each call to exec_in_executor(), as in the example above.
- You can pass the executor to the initConf() function. If passed, it will be used globally by all threads (provided an executor was not passed to exec_in_executor() itself).
- You can pass None to exec_in_executor() (provided you didn't set a global executor via initConf()); then asyncio's default ThreadPoolExecutor will be used.
Note: asyncio doesn't have a public get_default_executor(). The next best thing you can do is use loop.set_default_executor() (for example, asyncio.get_event_loop().set_default_executor()). The problem is that this is not a "global" setting, but one per loop. If somewhere in the code another loop is created, asyncio's default ThreadPoolExecutor will be used, unless you explicitly set the default_executor to the same one that you set on the previous loop.
So, I decided to avoid all of this altogether and to write the very simple custom mechanism described above.
The second point, the "side-effect that outlives the blocking_io() execution", can be viewed as a drawback, but it can also be viewed as a forward optimization.
If we want to run not only regular/sync functions, as asyncio.to_thread() does, but want to extend our API to async functions, we should provide a running event loop for them. I see only 2 ways to do it:
- "Reuse" the event loop from the MainThread.
- Create a new event loop per Thread inside the executor.
The first option was explored in the lift_to_async() API. The experience was pretty bad. You have to craft the caller side in a very specific way (for example, you shouldn't call it on the MainThread, otherwise your event loop would be blocked, so to avoid this you get an exception instead; still a bad solution).
The second option is to ensure that every thread in the executor has a distinct event loop set before the caller's "real" code is called. This is what ensure_thread_event_loop() is for. In the final release, while this function remains public, you can omit a call to it; it will be called implicitly. Note, however, that it will be called regardless of whether you want to execute a regular or an async function (the rationale for sync functions is to enable a call to a coroutine later on).
Internally, the ensure_thread_event_loop() function checks whether we have already created an event loop for the current thread (from the ThreadPoolExecutor). It does this using thread locals.
If so, this means that we've already set an event loop on the current thread, so there is nothing to do.
Otherwise,
# If no event loop is present, create a new one
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
Also, this event loop is stored in the thread locals for future use.
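Putting it together, a minimal sketch of this check could look like the following (the threading.local() store and the names here are illustrative, not the library's actual internals):
import asyncio
import threading

_thread_locals = threading.local()  # illustrative per-thread store

def ensure_thread_event_loop_sketch() -> asyncio.AbstractEventLoop:
    """Illustrative sketch: give the current (executor) thread its own event loop."""
    loop = getattr(_thread_locals, "loop", None)
    if loop is None:
        # If no event loop is present, create a new one
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        # Also store the event loop in the thread locals for future use
        _thread_locals.loop = loop
    return loop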
Advantage:
- We're creating the event loop per thread only once. The next time we run another coroutine via the executor, we will skip this step, saving runtime.
Disadvantage:
- Unrelated pieces of the code now affect each other.
- If the executor that we use in exec_in_executor() is also used somewhere else, such as asyncio's default ThreadPoolExecutor, for example, it can lead to unexpected behavior.
Note: In general, it is recommended either not to use asyncio.to_thread() at all in your code and your libraries' code, or to use a dedicated ThreadPoolExecutor for exec_in_executor().
Structured Concurrency Note: In the line below:
result = await exec_in_executor(EXECUTOR, blocking_io, 2, 3)
exec_in_executor() executes blocking_io() in another thread.
The program flow effectively splits: the first branch is the execution of blocking_io() in another thread; the second branch is the current task's execution, which "joins"/awaits the end of blocking_io() execution. After the await, we have only one flow.
Lifecycle of EXECUTOR
If you're using asyncio's default ThreadPoolExecutor, you shouldn't worry about the EXECUTOR's lifecycle.
Another option is to use a locally defined ThreadPoolExecutor like this:
from concurrent.futures import ThreadPoolExecutor
from alexber.utils.thread_locals import exec_in_executor
async def some_coroutine() -> None:
    with ThreadPoolExecutor() as executor:
        result = await exec_in_executor(executor, blocking_io, 2, 3)
        print(result)
The drawback of this method is obvious: each time some_coroutine() is called, a new ThreadPoolExecutor() will be allocated. This is a waste of runtime and memory, because of the resource allocation overhead.
However, if it is the "main" function, as in the provided example, or the lifespan analogue in FastAPI, this is OK from the resource allocation management point of view. It has the minor drawback that you should pass the executor explicitly to your sync/async functions. You can, of course, put it in a global variable to solve the last issue (ThreadPoolExecutor is thread-safe; otherwise, a ContextVar should be used instead).
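If you do go the ContextVar route, a minimal sketch (purely illustrative names, not part of the library) might look like this:
from contextvars import ContextVar
from typing import Optional
from concurrent.futures import ThreadPoolExecutor

# Illustrative alternative to a plain global variable:
# the executor for the current context is stored in a ContextVar.
executor_var: ContextVar[Optional[ThreadPoolExecutor]] = ContextVar(
    "executor_var", default=None)

def get_executor() -> ThreadPoolExecutor:
    executor = executor_var.get()
    if executor is None:
        raise RuntimeError("Executor was not set for this context")
    return executor

# Somewhere in your startup code:
# executor_var.set(ThreadPoolExecutor())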
Side note
Another option: in the "main" function you can do:
from concurrent.futures import ThreadPoolExecutor
from alexber.utils.thread_locals import exec_in_executor
EXECUTOR = None

async def main():
    with ThreadPoolExecutor() as executor:
        global EXECUTOR
        EXECUTOR = executor
        result = await exec_in_executor(EXECUTOR, blocking_io, 2, 3)
        print(result)
Note: Using a with statement to manage a ThreadPoolExecutor is often preferable because it ensures that resources are automatically cleaned up when the block of code is exited. This automatic cleanup occurs even if an exception is raised, providing a robust mechanism for resource management. The context manager's __exit__ method is called when the block is exited, which handles the shutdown of the executor. This approach reduces the risk of leaving resources open or in an inconsistent state, as the executor is guaranteed to be properly shut down.
In contrast, using the atexit module to register cleanup functions relies on the program terminating normally. atexit functions are executed when a program exits naturally or through a controlled exit, such as calling sys.exit(). This includes situations where the program reaches the end of its execution or explicitly calls sys.exit(). In multi-threaded programs, if the main thread finishes and there are no other non-daemon threads running, the program is considered to have terminated normally, and atexit functions will be called. Additionally, when the Python interpreter shuts down in a controlled manner, such as at the end of a script or after a sys.exit() call, atexit functions are executed.
However, atexit functions may not be executed if the program terminates abnormally. This can occur if an uncaught exception causes the program to crash, although exceptions like SystemExit or KeyboardInterrupt that are caught and handled will still allow atexit functions to run. If the program is forcibly terminated by the operating system or by a user action, such as using the kill command or pressing Ctrl+C, atexit functions may not be executed. Additionally, if the program experiences a crash or segmentation fault, atexit functions will not be called.
or
import contextlib
from concurrent.futures import ThreadPoolExecutor
from alexber.utils.thread_locals import exec_in_executor
EXECUTOR = ThreadPoolExecutor()

async def main():
    with contextlib.ExitStack() as stack:
        stack.enter_context(EXECUTOR)
        result = await exec_in_executor(EXECUTOR, blocking_io, 2, 3)
        print(result)
or
import contextlib
from concurrent.futures import ThreadPoolExecutor
from alexber.utils.thread_locals import exec_in_executor
EXECUTOR = ThreadPoolExecutor()

async def main():
    with contextlib.ExitStack() as stack:
        stack.callback(EXECUTOR.shutdown, wait=True)
        result = await exec_in_executor(EXECUTOR, blocking_io, 2, 3)
        print(result)
or
import contextlib
from concurrent.futures import ThreadPoolExecutor
from alexber.utils.thread_locals import exec_in_executor, ensure_thread_event_loop
EXECUTOR = None

async def main():
    async with contextlib.AsyncExitStack() as stack:  # just to demonstrate also this option
        global EXECUTOR
        EXECUTOR = stack.enter_context(
            ThreadPoolExecutor(initializer=ensure_thread_event_loop))
        result = await exec_in_executor(EXECUTOR, blocking_io, 2, 3)
        print(result)
Note:
1. AsyncExitStack was used to illustrate that ThreadPoolExecutor can be used with both a sync and an async "stack".
2. Using ExitStack/AsyncExitStack is best when you have multiple context managers, in order to avoid nesting. As a bonus you can also call stack.pop_all(), which removes all context managers and cleanup callbacks from the stack without invoking their exit or cleanup procedures. See below.
3. As I said above, ensure_thread_event_loop() will be called implicitly, so you can omit initializer=ensure_thread_event_loop.
Let's elaborate on stack.pop_all(). For example, before lifespan was added to FastAPI, it had only @app.on_event("startup") and @app.on_event("shutdown"), which can be used as decorators on two of your functions. So, in order to implement the same logic, you may write something like:
from fastapi import FastAPI
import contextlib
from concurrent.futures import ThreadPoolExecutor

app = FastAPI()
stack = contextlib.AsyncExitStack()
executor = None

@app.on_event("startup")
async def startup_event():
    global executor, stack
    # Use async with to manage the AsyncExitStack
    async with stack:
        # Create and enter the ThreadPoolExecutor into the stack
        executor = stack.enter_context(ThreadPoolExecutor())
        print("Application has started with ThreadPoolExecutor.")
        # Prevent the stack from closing at the end of the with block;
        # pop_all() moves the callbacks to a fresh stack, so keep a reference to it
        stack = stack.pop_all()

@app.on_event("shutdown")
async def shutdown_event():
    # Manually cleanup all resources in the stack
    await stack.aclose()
    print("Application is shutting down, resources are cleaned up.")

@app.get("/")
async def read_root():
    return {"Hello": "World"}
You can see for yourself how verbose it is, and you still have to pay attention, because:
Note:
1. Using global variables can lead to issues with state management and concurrency, especially in larger applications. ThreadPoolExecutor is thread-safe, so no problems there.
But AsyncExitStack is not inherently thread-safe. You should limit access to the stack to startup_event() and shutdown_event() only. Otherwise, you can have concurrency issues.
2. In general, use a ContextVar to store "global" or request/task scope variables.
So, for clarity, I will go with atexit, but bear in mind that in a real application you should probably use a context manager in one way or another.
P.S. This is how the same code looks with lifespan.
from fastapi import FastAPI
import contextlib
from concurrent.futures import ThreadPoolExecutor

@contextlib.asynccontextmanager
async def app_lifespan(app: FastAPI):
    async with contextlib.AsyncExitStack() as stack:
        # Create and enter the ThreadPoolExecutor into the stack
        executor = stack.enter_context(ThreadPoolExecutor())
        print("Application has started with ThreadPoolExecutor.")
        yield
        # Cleanup will happen here when the context exits
    print("Application is shutting down, resources are cleaned up.")

app = FastAPI(lifespan=app_lifespan)

@app.get("/")
async def read_root():
    return {"Hello": "World"}
END OF SIDE NOTE.
Before the side note, we were looking at the lifecycle of the ThreadPoolExecutor.
The last option left is to define the ThreadPoolExecutor as a global variable in the first place, as in the example above. If we do so, we should ensure that it will be properly closed, all resources will be released, and all running tasks will either be canceled (1) or have an opportunity to finish (2).
In the example above, the ThreadPoolExecutor is defined as a global variable, and we use the atexit built-in module, which registers a function to be executed upon normal program termination: we register a function that shuts down the ThreadPoolExecutor upon normal program termination and waits for all running tasks to finish, option (2).
END OF Lifecycle of EXECUTOR
Let's look at a more complex use-case:
- Suppose we have some library/framework (it can be simply asyncio.run()) that takes care of the async/await infrastructure and spins the event loop in the MainThread.
- Suppose that for whatever reason we find ourselves in a regular/sync some_legacy_function().
- We want to call our regular/sync blocking_io() function, which eventually wants to call the async sample_coroutine() function, execute it and return its value.
Having exec_in_executor() as a regular function makes it available for calls in a sync context. For example, it enables us to define an exec_in_executor_threading_future() function that returns a concurrent.futures.Future. Let's see the client code first:
#2. From sync context we're calling sync function that calls async function.
import time
import asyncio
import atexit
from concurrent.futures import ThreadPoolExecutor
from alexber.utils.thread_locals import exec_in_executor_threading_future
EXECUTOR = ThreadPoolExecutor()
async def sample_coroutine(x: int, y: int) -> int:
    await asyncio.sleep(1)
    #time.sleep(10)
    return x * y

def blocking_io():
    time.sleep(5)  # mimicking blocking I/O call

    async def helper_example_usage():
        #time.sleep(15)
        result = await sample_coroutine(2, 3)  # Example values for x and y
        return result

    loop = asyncio.get_event_loop()
    return loop.run_until_complete(helper_example_usage())

def close_executor() -> None:
    EXECUTOR.shutdown(wait=True)

atexit.register(close_executor)
# Alternative using ExitStack:
# with contextlib.ExitStack() as stack:
#     stack.enter_context(EXECUTOR)
#     # or
#     stack.callback(EXECUTOR.shutdown, wait=True)

def some_legacy_function() -> None:
    fut = exec_in_executor_threading_future(EXECUTOR, blocking_io)
    result = fut.result()
    print(result)

async def main() -> None:
    some_legacy_function()

if __name__ == "__main__":
    asyncio.run(main())
And below is a simplified implementation of exec_in_executor_threading_future(). It works both for sync and async functions; here, for clarity, only the sync case is shown.
import concurrent
from alexber.utils.thread_locals import exec_in_executor
def exec_in_executor_threading_future(executor, func, *args, **kwargs):
    future = concurrent.futures.Future()

    def wrapper(future):
        result = func(*args, **kwargs)
        future.set_result(result)

    exec_in_executor(executor, wrapper, future)
    return future
As you can see, because exec_in_executor() is a regular/sync function, we can call it from a sync context. We're disregarding the asyncio.Future it returns; instead, we've implemented a custom mechanism to retrieve the result of func(). We define a concurrent.futures.Future that will be returned. We define a wrapper() regular/sync function that is sent, together with the executor and the future above, to the exec_in_executor() API.
Inside the wrapper() function, which runs on another Thread, we call the func that we're wrapping, get the result and set it on the future.
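For completeness, here is a sketch of how the async case could be handled too (this is my illustration, not the library's actual implementation; it assumes, as described above, that the executor thread already has its own event loop, and it also propagates exceptions, which the simplified version above omits):
import asyncio
import concurrent.futures
import inspect
from alexber.utils.thread_locals import exec_in_executor

def exec_in_executor_threading_future_sketch(executor, func, *args, **kwargs):
    """Illustrative sketch covering both the sync and the async case."""
    future = concurrent.futures.Future()

    def wrapper(future):
        try:
            if inspect.iscoroutinefunction(func):
                # The executor thread has its own event loop
                # (ensure_thread_event_loop() is called implicitly),
                # so we can drive the coroutine to completion on it.
                loop = asyncio.get_event_loop()
                result = loop.run_until_complete(func(*args, **kwargs))
            else:
                result = func(*args, **kwargs)
            future.set_result(result)
        except BaseException as exc:
            future.set_exception(exc)

    exec_in_executor(executor, wrapper, future)
    return future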
Let’s go back to the usage of exec_in_executor_threading_future().
So, suppose we have a FastAPI application (or any other, or just asyncio.run()) that takes care of the async/await infrastructure and spins the event loop in the MainThread.
So, our application's code starts from main(). For whatever reason, we find ourselves in a regular/sync function, for example in some_legacy_function(). Now, we want to call a sync function that does the blocking_io() call, and it wants to call an async function (some rewritten legacy code, for example), sample_coroutine().
So, from some_legacy_function() we can use the exec_in_executor_threading_future() API and run blocking_io(). This API is also responsible for spinning up an event loop, if needed, in the Thread from the executor that runs blocking_io().
Structured Concurrency Note: In this example, exec_in_executor_threading_future() splits the flow. The MainThread is going to "join" the Thread from the executor that runs blocking_io() in the fut.result() call. After this call we have a single-branch flow.
So, on the other flow branch we're inside def blocking_io() on some thread inside the executor, let's say executor-thread-1 (other than the MainThread).
If we were using the asyncio.to_thread() API, there would be no running event loop inside blocking_io(). exec_in_executor() ensures that this thread does have a running event loop.
This is the reason that the line
loop = asyncio.get_event_loop()
works. It retrieves this event loop. Nothing is running on it. So,
loop.run_until_complete(helper_example_usage())
also works. Note that helper_example_usage() is an async function, so we're entering an async context.
Structured Concurrency Note: Here, helper_example_usage() splits the flow. executor-thread-1 is going to "join" the result it gets from loop.run_until_complete(helper_example_usage()). executor-thread-1 will not proceed until the result has been "joined". After this call we have a single-branch flow that returns from blocking_io().
Now that we're in the async context inside helper_example_usage(), we can do all the regular things that we can do with async/await. The only caveat is that we're not running in the MainThread.
In this example, there is a simple async/await to sample_coroutine(), which makes some calculation (solely for demo purposes). Everything works as expected.
This use-case is simpler than the previous one:
- Suppose we have some library/framework (it can be simply asyncio.run()) that takes care of the async/await infrastructure and spins the event loop in the MainThread.
- We want to call our regular/sync blocking_io() function, which eventually wants to call the async sample_coroutine() function, execute it and return its value.
So, suppose we have a FastAPI application (or any other, or just asyncio.run()) that takes care of the async/await infrastructure and spins the event loop in the MainThread.
So, our application's code starts from main(). Now, we have a regular/sync function blocking_io(). We want to run it in another thread. We can use exec_in_executor() directly. This API is also responsible for spinning up an event loop, if needed, in the Thread from the executor that runs blocking_io().
Structured Concurrency Note: In this example, exec_in_executor() splits the flow. The MainThread is going to "join" the Thread from the executor that runs blocking_io() in the await (on the asyncio.Future). After this call we have a single-branch flow.
#3 From async context we're calling sync function that calls async function.
import time
import asyncio
import atexit
from concurrent.futures import ThreadPoolExecutor
from alexber.utils.thread_locals import exec_in_executor
EXECUTOR = ThreadPoolExecutor()
async def sample_coroutine(x: int, y: int) -> int:
    await asyncio.sleep(1)
    #time.sleep(10)
    return x * y

def blocking_io() -> int:
    time.sleep(5)  # mimicking blocking I/O call

    async def helper_example_usage():
        #time.sleep(15)
        result = await sample_coroutine(2, 3)  # Example values for x and y
        return result

    loop = asyncio.get_event_loop()
    return loop.run_until_complete(helper_example_usage())

def close_executor() -> None:
    EXECUTOR.shutdown(wait=True)

atexit.register(close_executor)
# Alternative using ExitStack:
# with contextlib.ExitStack() as stack:
#     stack.enter_context(EXECUTOR)
#     # or
#     stack.callback(EXECUTOR.shutdown, wait=True)

async def main() -> None:
    result = await exec_in_executor(EXECUTOR, blocking_io)
    print(result)

if __name__ == "__main__":
    asyncio.run(main())
After we get to blocking_io(), everything else is similar to the previous example. Only the "joining" point here is the await (on the asyncio.Future), not fut.result().
Let's look at another use-case.
So, suppose we have a FastAPI application (or any other, or just asyncio.run()) that takes care of the async/await infrastructure and spins the event loop in the MainThread.
So, our application's code starts from main(). Now, we want to call the async function sample_coroutine().
So, from main() we can use the exec_in_executor() API and run sample_coroutine(). This API is also responsible for spinning up an event loop, if needed, in the Thread from the executor that runs sample_coroutine(). So, when we're in sample_coroutine() there is a running event loop.
#4 From async context we're calling async function.
#import time
import asyncio
import atexit
from concurrent.futures import ThreadPoolExecutor
from alexber.utils.thread_locals import exec_in_executor
EXECUTOR = ThreadPoolExecutor()
async def sample_coroutine(x: int, y: int) -> int:
    await asyncio.sleep(1)
    #time.sleep(10)
    return x * y

def close_executor() -> None:
    EXECUTOR.shutdown(wait=True)

atexit.register(close_executor)
# Alternative using ExitStack:
# with contextlib.ExitStack() as stack:
#     stack.enter_context(EXECUTOR)
#     # or
#     stack.callback(EXECUTOR.shutdown, wait=True)

async def main() -> None:
    result = await exec_in_executor(EXECUTOR, sample_coroutine, 2, 3)
    print(result)

if __name__ == "__main__":
    asyncio.run(main())
Note:
This should be very rare in real production code.
If sample_coroutine() is a "well-behaved" coroutine, there is no need to run it on another thread.
But sometimes, for example, it uses blocking I/O somewhere in your libraries stack, so refactoring is painful. So, for the meanwhile, you can use this as a workaround.
Structured Concurrency Note: In this example, exec_in_executor() splits the flow. The MainThread is going to "join" the Thread from the executor that runs sample_coroutine() in the await (on the asyncio.Future). After this call we have a single-branch flow.
Let's look at another use-case.
So, suppose we have a FastAPI application (or any other, or just asyncio.run()) that takes care of the async/await infrastructure and spins the event loop in the MainThread.
So, our application's code starts from main(). For whatever reason, we find ourselves in a regular/sync function, for example in some_legacy_function(). Now, we want to call the async function sample_coroutine().
So, from some_legacy_function() we can use the exec_in_executor_threading_future() API and run sample_coroutine(). This API is also responsible for spinning up an event loop, if needed, in the Thread from the executor that runs sample_coroutine().
#5 From sync context we're calling async function.
#import time
import asyncio
import atexit
from concurrent.futures import ThreadPoolExecutor
from alexber.utils.thread_locals import exec_in_executor_threading_future
EXECUTOR = ThreadPoolExecutor()
async def sample_coroutine(x: int, y: int) -> int:
    await asyncio.sleep(1)
    #time.sleep(10)
    return x * y

def close_executor() -> None:
    EXECUTOR.shutdown(wait=True)

atexit.register(close_executor)
# Alternative using ExitStack:
# with contextlib.ExitStack() as stack:
#     stack.enter_context(EXECUTOR)
#     # or
#     stack.callback(EXECUTOR.shutdown, wait=True)

def some_legacy_function():
    fut = exec_in_executor_threading_future(EXECUTOR, sample_coroutine, 2, 3)
    result = fut.result()
    print(result)

async def main() -> None:
    some_legacy_function()

if __name__ == "__main__":
    asyncio.run(main())
Note:
This should be very rare in real production code.
If sample_coroutine() is a "well-behaved" coroutine, there is no need to run it on another thread.
But sometimes, for example, it uses blocking I/O somewhere in your libraries stack, so refactoring is painful. So, for the meanwhile, you can use this as a workaround.
Structured Concurrency Note: In this example, exec_in_executor_threading_future() splits the flow. The MainThread is going to "join" the Thread from the executor that runs sample_coroutine() in the fut.result() call. After this call we have a single-branch flow.
Last thoughts
While the exec_in_executor()/exec_in_executor_threading_future() primitives by themselves do not promote Structured Concurrency, their intended usage, as demonstrated above, enables you to write such code. For more information on what Structured Concurrency is and what a "well-behaved coroutine" is (I use the latter term vaguely, but I have a thorough breakdown), see
Structured Concurrency as extension of Go statement considered harmful.
