Extension of asyncio.to_thread()
async def to_thread(func, /, *args, **kwargs):
"""Asynchronously run function *func* in a separate thread..."
loop = events.get_running_loop()
ctx = contextvars.copy_context()
func_call = functools.partial(ctx.run, func, *args, **kwargs)
return await loop.run_in_executor(None, func_call)
This is utility function that exists in asyncio
. You can read detail explanation about it here https://alex-ber.medium.com/how-to-call-i-o-bound-operation-from-async-context-565a504548b0
Also, there API for more advanced cases here My AsyncExecutionQueue
https://alex-ber.medium.com/my-asyncexecutionqueue-4001ac168675.
Note: I have implicit assumption, that MainThread has all infrastructure for running
async
function (runningevent loop
, for example) is set. It can be done by FastAPI, Quart,asyncio.run()
, it doesn’t really matter, but it is done.
Quick recap:
Quote:
We often need to execute a blocking function call within an
asyncio
application because, in practice, most workloads include a mix of IO-bound operations and also CPU-bound operations.This could be for many reasons, such as:
* To execute a CPU-bound task like calculating something.
* To execute a blocking IO-bound task like reading or writing from a file.
* To call into a third-party library that does not support
asyncio
yet.Making a blocking call directly in an
asyncio
program will cause the event loop to stop while the blocking call is executing. It will not allow other coroutines to run in the background.This can be prevented by running the blocking call outside of the event loop, which we can do with
asyncio.to_thread()
.
# execute a function in a separate thread
import asyncio
await asyncio.to_thread(blocking_io_operation)
How to run blocking task with asyncio
The
asyncio.to_thread()
function takes a function to execute and any arguments. It returns acoroutine
that can be awaited or scheduled as an independent task. The function is then executed in a separate thread.
https://dev.to/hackerculture/python-asyncio-a-guide-to-asynchronous-programming-43j2
You can read more details about
asyncio.to_thread()
here How to call I/O bound operation from async context.
So, how asyncio.to_thread()
works?
- This is
async
function, so you can’t naturally call it from regular function (event if you have running loop in your thread). - It takes
regular
functionfunc
and it’s arguments, copies allContextVar
s that exist on the execution context of the runningTaks
(copy_context().run
is responsible for this). - It uses
asyncio’s default ThreadPoolExecutor
to runfunc.
That is, it is running on another thread thanto_thread()
. - Because immediate
await
(return await loop.run_in_executor(None, func_call)) thecall to_thread()
is not returned until we have result available. In more details:
loop.run_in_executor(None, func_call) returns asynio.Future
, and we immediately await
on it.
Now, what asyncio.to_thread()
do?
- So, your code will fire
func
in differentThread
, your event-loop will be able to run anothercoroutines
, whilefunc
is executing in anotherThread
and when it is done, you will get result on the caller ofasyncio.to_thread()
coroutine and continue to run as usual.
Now, we will try to improve the implementation.
The source code you can found here. It is available as part of my AlexBerUtils s project.
You can install AlexBerUtils from PyPi:
python -m pip install -U alex-ber-utils
See here for more details explanation on how to install.
Why asyncio.to_thread()
is not just regular function? Why only default asyncio executor supported?
Indeed, this change means that it can’t await
on loop.run_in_executor()
. But it can just return asyncio.Future
(that is returned by loop.run_in_executor()
). If the caller of asyncio.to_thread()
is coroutine, this change is transparent to it, if it want to get result immediately it will await
on asyncio.to_thread()
anyway.
Let start with very simple example. We’re in async context and have regular function, say sample_function()
, that has blocking I/O call. We can’t run it on the main event loop, because it going to block it. We should run it in another thread. Good practice is to use ThreadPoolExecutor
.
#1. From async context we're calling sync function.
import time
import asyncio
import atexit
from concurrent.futures import ThreadPoolExecutor
from alexber.utils.thread_locals import exec_in_executor
EXECUTOR = ThreadPoolExecutor()
def blocking_io(x: int, y: int) -> int:
time.sleep(5) # mimicking blocking I/O call
return x + y
def close_executor() -> None:
EXECUTOR.shutdown(wait=True)
atexit.register(close_executor)
# Alternative using ExitStack:
# with contextlib.ExitStack() as stack:
# stack.enter_context(EXECUTOR)
# or
# stack.callback(EXECUTOR.shutdown, wait=True)
async def main() -> None:
result = await exec_in_executor(EXECUTOR, blocking_io, 2, 3)
print(result)
if __name__ == "__main__":
asyncio.run(main())
You can change the line
result = await exec_in_executor(EXECUTOR, blocking_io, 2, 3)
to
result = await asyncio.to_thread(blocking_io, 2, 3)
and you will get the same result.
What are the differences?
There are 2 main differences.
asyncio.to_thread()
run with defaultThreadPoolExecutor
. Inexec_in_executor()
there multiple ways to set one.- There is side-effect that outlives,
sample_function()
execution and stay on the Thread fromThreadPoolExecutor
that run it.
Actually, there are multiple ways to pass executor:
- You can pass it explicitly on each call to
exec_in_executor()
as in example above. - You can pass executor in
initConf()
function. If passed it will be used globally by all threads (provided executor was not passed inexec_in_executor()
itself). - You can pass
None
toexec_in_executor()
(provided you didn’t set global executor byinitConf()
) thanasyncio’s default ThreadPoolExecutor
will be used.
Note:
asyncio
doesn’t have publicget_default_executor()
. The next best thing you can do it useloop.set_default_executor()
(for example,asyncio.get_event_loop().set_default_executor()
). The problem is, that this is not “global” setting, but only for one loop. If somewhere in the code, anotherloop
will be createdasyncio’s default ThreadPoolExecutor
will be used, unless you will explicitly setdefault_executor
to the same that youset
in previousloop
.
So, I decided to avoid all of these all together and to write very simple custom mechanism described above.
The second point “side-effect that outlives, sample_function()
execution” can be viewed as drawback, but it can be viewed also as forward optimization.
If we want to run not only regular/sync function as asyncio.to_thread(),
if we want to extend out API to async
function, we should provide running event loop for them. I see only 2 way to do it:
- “Reuse” event loop from the MainThread.
- Create new
event loop
per Thread inside executor.
First option was explored in lift_to_async()
API. The experience was pretty bad. You should craft the caller side in very specific way (for example, you shouldn’t be on the MainThread
, otherwise, your event loop will be blocked, so in order to avoid this, you will get exception; still bad solution).
Second option is to ensure
that every thread in the executor has set distinct event loop before the caller “real” code is called. This is what ensure_thread_event_loop()
for. In the final release, while this function remain public, you can omit a call to it, it will be called implicitly. Note, however, that it will be called regardless whether regular
or async
function you want to execute (the rationale for sync
function is to enable call to coroutine later on).
Internally, ensure_thread_event_loop()
function check, whether we have been created event loop
for the current thread
(from ThreadPoolExecutor
). It do it, using thread locals
.
If so, this means that we’ve already set event loop
on current thread
, so there is nothing to do.
Otherwise,
# If no event loop is present, create a new one
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
Also, this event loop
is stored in the thread locals
for future use.
Advantage:
- We’re creating
event loop
perthread
only once. In the next time when we will run another coroutine via executor , we will skip this step, saving runtime.
Disadvantage:
- Unrelated pieces of the code now affects each other.
- If executor that we use in
exec_in_executor()
is also used somewhere else, such asasyncio’s default ThreadPoolExecutor
, for example, it can lead to unexpected behavior.
Note: In general, it is recommended either not to use
asyncio.to_thread()
ever in your and your libraries code, or to use dedicatedThreadPoolExecutor
forexec_in_executor()
.Structure Concurrency Note: In the line below:
result = await exec_in_executor(EXECUTOR, blocking_io, 2, 3)
exec_in_executor()
executesblocking_io()
in another thread.
Program flow effectively splits: first part is execution ofblocking_io()
in another thread, the second branch of this flow is current task execution, that “joins”/awaits
the end ofblocking_io()
execution. Afterawait,
we have only one flow.
Lifecycle of EXECUTOR
if you’re using asyncio’s default ThreadPoolExecutor,
you shouldn’t worry about EXECUTOR
's lifecycle.
Another option, is use locally defined ThreadPoolExecutor
like this:
from concurrent.futures import ThreadPoolExecutor
from alexber.utils.thread_locals import exec_in_executor
async def some_coroutine() -> None:
with ThreadPoolExecutor() as executor:
result = await exec_in_executor(executor, blocking_io, 2, 3)
print(result)
The drawback of this method is obvious: each time some_coroutine()
will be called new ThreadPoolExecutor()
will be allocated. This is at waste of runtime and memory, due to it has resource allocation overhead.
However, If it is “main”
function, as in provided example, or lifespan
analogue of FastAPI, this is ok from resource allocation management point of view. It has minor drawback, that you should pass it explicitly to your sync
/async functions
. You can put it as global
variable of cause to solve last issue (ThreadPoolExecutor
is thread-safe, otherwise, ContextVar
should be used instead).
Side note
Another options, in the “main”
function you can do:
from concurrent.futures import ThreadPoolExecutor
from alexber.utils.thread_locals import exec_in_executor
EXECUTOR = None
async def main():
with ThreadPoolExecutor() as executor:
global EXECUTOR
EXECUTOR = executor
result = await exec_in_executor(EXECUTOR, blocking_io, 2, 3)
print(result)
Note: Using a
with
statement to manage aThreadPoolExecutor
is often preferable because it ensures that resources are automatically cleaned up when the block of code is exited. This automatic cleanup occurs even if an exception is raised, providing a robust mechanism for resource management. The context manager's__exit__
method is called when the block is exited, which handles the shutdown of the executor. This approach reduces the risk of leaving resources open or in an inconsistent state, as the executor is guaranteed to be properly shut down.In contrast, using the
atexit
module to register cleanup functions relies on the program terminating normally.atexit
functions are executed when a program exits naturally or through a controlled exit, such as callingsys.exit()
. This includes situations where the program reaches the end of its execution or explicitly callssys.exit()
. In multi-threaded programs, if the main thread finishes and there are no other non-daemon threads running, the program is considered to have terminated normally, andatexit
functions will be called. Additionally, when the Python interpreter shuts down in a controlled manner, such as at the end of a script or after asys.exit()
call,atexit
functions are executed.However,
atexit
functions may not be executed if the program terminates abnormally. This can occur if an uncaught exception causes the program to crash, although exceptions likeSystemExit
orKeyboardInterrupt
that are caught and handled will still allowatexit
functions to run. If the program is forcibly terminated by the operating system or by a user action, such as using thekill
command or pressingCtrl+C
,atexit
functions may not be executed. Additionally, if the program experiences a crash or segmentation fault,atexit
functions will not be called.
or
import contextlib
from concurrent.futures import ThreadPoolExecutor
from alexber.utils.thread_locals import exec_in_executor
EXECUTOR = ThreadPoolExecutor()
async def main():
with contextlib.ExitStack() as stack:
stack.enter_context(EXECUTOR)
result = await exec_in_executor(EXECUTOR, blocking_io, 2, 3)
print(result)
or
import contextlib
from concurrent.futures import ThreadPoolExecutor
from alexber.utils.thread_locals import exec_in_executor
EXECUTOR = ThreadPoolExecutor()
async def main():
with contextlib.ExitStack() as stack:
stack.callback(EXECUTOR.shutdown, wait=True)
result = await exec_in_executor(EXECUTOR, blocking_io, 2, 3)
print(result)
or
import contextlib
from concurrent.futures import ThreadPoolExecutor
from alexber.utils.thread_locals import exec_in_executor, ensure_thread_event_loop
EXECUTOR = None
async def main():
async with contextlib.AsyncExitStack() as stack: #just to demonstrate also this option
global EXECUTOR
EXECUTOR = stack.enter_context(
ThreadPoolExecutor(initializer=ensure_thread_event_loop))
result = await exec_in_executor(EXECUTOR, blocking_io, 2, 3)
print(result)
Note:
1.AsyncExitStack
was used for to illustrate thatThreadPoolExecutor
can use bothsync
andasync
“stack”.
2. UsingExitStack
/AsyncExitStack
is best when you have multiple context-managers in order to avoid nesting. As bonus you can call also call tostack.pop_all()
, which is used to remove all context managers and cleanup callbacks from the stack without invoking their exit or cleanup procedures. See below.
2. As I said aboveensure_thread_event_loop
will be called implicitly, so you can omitinitializer=ensure_thread_event_loop
.
Let’s elaborate on stack.pop_all()
. For example, before adding lifespan
to FastAPI, it has only @app.on_event(“startup”)
and @app.on_event(“shutdown”)
that can be used as decorator on two your functions. So, in order to implement the same logic you may write something like:
from fastapi import FastAPI
import contextlib
from concurrent.futures import ThreadPoolExecutor
app = FastAPI()
stack = contextlib.AsyncExitStack()
executor = None
@app.on_event("startup")
async def startup_event():
global executor
# Use async with to manage the AsyncExitStack
async with stack:
# Create and enter the ThreadPoolExecutor into the stack
executor = stack.enter_context(ThreadPoolExecutor())
print("Application has started with ThreadPoolExecutor.")
# Prevent the stack from closing at the end of the with block
stack.pop_all()
@app.on_event("shutdown")
async def shutdown_event():
# Manually cleanup all resources in the stack
await stack.aclose()
print("Application is shutting down, resources are cleaned up.")
@app.get("/")
async def read_root():
return {"Hello": "World"}
You see yourself, how verbose it is, and still you should pay attention, because:
Note:
1. Usingglobal
variables can lead to issues with state management and concurrency, especially in larger applications.ThreadPoolExecutor
is thread safe-no problems here.
ButAsyncExitStack
is not inherently thread-safe. You should limit access to thestack
to thestartup_event()
andshutdown_event()
only. Otherwise, you can have concurrency issues.
2. In general useContextVar
to store “global” or request/task scope vaiables.
So, for clarity, I will go with atexit
, but bare in your mind, that in real application you should, probably, use context-manager in one way or another.
P.S. This is how the same code looks like with lifespan
.
from fastapi import FastAPI
import contextlib
from concurrent.futures import ThreadPoolExecutor
async def app_lifespan():
async with contextlib.AsyncExitStack() as stack:
# Create and enter the ThreadPoolExecutor into the stack
executor = await stack.enter_async_context(ThreadPoolExecutor())
print("Application has started with ThreadPoolExecutor.")
yield
# Cleanup will happen here when the context exits
print("Application is shutting down, resources are cleaned up.")
app = FastAPI(lifespan=app_lifespan)
@app.get("/")
async def read_root():
return {"Hello": "World"}
END OF SIDE NOTE.
Before side note, we were looking on the lifecycle of ThreadPoolExecutor
.
The last option left is to defined ThreadPoolExecutor
as global
variable on the first place as in the example above. If we do so, we should ensure that it will be properly closed, all resources will be released and all the task running will be either canceled (1) or will have opportunely to finish (2).
In the example above ThreadPoolExecutor
is defined as global
variable and uses atexit
built-in model that registers a function to be executed upon normal program termination, we register function that shutdowns ThreadPoolExecutor
upon normal program termination and waits for all running tasks to finish, option (2).
END OF Lifecycle of EXECUTOR
Let’s look on more complex use-case:
- Suppose we have some library/framework (it can be simple
asyncio.run()
) that taking care about infrastructure ofasync
/await
and spinning event loop in the MainThread. - Suppose that for whatever reason we found ourselves in regular/sync
some_legacy_function()
. - We want to call our regular/sync
blocking_io()
function, that we want eventually to call asyncsample_coroutine()
function, execute it and return it’s value.
Having exec_in_executor()
as regular function makes it available for calls in the sync-context. For examples, it enable to define exec_in_executor_threading_future()
function that return concurrent.futures.Future
. Let see client code first:
#2. From sync contex we're calling sync function that calls async function.
import time
import asyncio
import atexit
from concurrent.futures import ThreadPoolExecutor
from alexber.utils.thread_locals import exec_in_executor_threading_future
EXECUTOR = ThreadPoolExecutor()
async def sample_coroutine(x: int, y: int) -> int:
await asyncio.sleep(1)
#time.sleep(10)
return x * y
def blocking_io():
time.sleep(5) # mimicking blocking I/O call
async def helper_example_usage():
#time.sleep(15)
result = await sample_coroutine(2, 3) # Example values for x and y
return result
loop = asyncio.get_event_loop()
return loop.run_until_complete(helper_example_usage())
def close_executor() -> None:
EXECUTOR.shutdown(wait=True)
atexit.register(close_executor)
# Alternative using ExitStack:
# with contextlib.ExitStack() as stack:
# stack.enter_context(EXECUTOR)
# or
# stack.callback(EXECUTOR.shutdown, wait=True)
def some_legacy_function() -> None:
fut = exec_in_executor_threading_future(EXECUTOR, blocking_io)
result = fut.result()
print(result)
async def main() -> None:
some_legacy_function()
if __name__ == "__main__":
asyncio.run(main())
And below is simplified implementation of exec_in_executor_threading_future()
. It work both for sync
and sync
function, here for clarity there is only sync
case.
import concurrent
from alexber.utils.thread_locals import exec_in_executor
def exec_in_executor_threading_future(executor, func, *args, **kwargs):
future = concurrent.futures.Future()
def wrapper(future):
result = func(*args, **kwargs)
future.set_result(result)
exec_in_executor(executor, wrapper, future)
return future
As you can see, because exec_in_executor()
is regular
/sync
function we can call it from sync
context. We’re disregarding asyncio.Future
it returns, instead we’re implemented custom mechanism to retrieve result of func()
. We’re defining threading.Future
that will be returned. We’re defining wrapper()
regular
/sync
function that will be send together with executor
and the feature
above to exec_in_executor()
API.
In side wrapper()
function that will be run on another Thread, we’re calling to func
that we’re wrapping, get the result and set it on the future.
Let’s go back to the usage of exec_in_executor_threading_future()
.
So, suppose we have FastAPI application (or any other or just asyncio.run()
) that taking care about infrastructure of async
/await
and spinning event loop in the MainThread.
So, our application’s code starts from main()
. For whatever reason, we’re found ourselves in regular
/sync
function, for example in some_legacy_function()
. Now, we want to call sync
function that do blocking_io()
call and it want to call async function
(some rewritten legacy code, for example), sample_coroutine()
.
So, from some_legacy_function()
we can use exec_in_executor_threading_future()
API and run blocking_io()
. This API also responsible for spinning event loop, if needed, in the Thread
from executor
, that runs blocking_io()
.
Structure Concurrency Note: In this example,
exec_in_executor_threading_future()
splits the flow. MainThread is going to “join”Thread
fromexecutor
, that runsblocking_io()
in thefut.result()
. After this call we have single branch flow.
So, on another flow branch we’re inside def blocking_io()
on some thread,
let say, executor-thread-1
, (other that MainThread
) inside executor
.
If we were using
asyncio.to_thread()
API, there were no spinningevent loop
insidelocking_io()
.exec_in_executor()
ensures that this thread do have spinningevent loop.
This is the reason that line:
loop = asyncio.get_event_loop()
works. It retrieves this event loop. Nothing is run on it. So,
loop.run_until_complete(helper_example_usage())
is also work. Note, that helper_example_usage()
is async
function, so we’re entering async
context.
Structure Concurrency Note: Here,
helper_example_usage()
splits the flow.executor-thread-1
is going to “join” the result it will get fromloop.run_until_complete(helper_example_usage())
.executor-thread-1
will not proceed until the result was “joint”. After this call we have single branch flow, that returns fromblocking_io()
.
Now, when we’re in async
context inside helper_example_usage()
, we can do all regular things that we can do with async/await
. The only caveat is, that we’re not running in MainThread.
In this example, there is simple async
/await
to sample_coroutine()
, that make some calculation (solely for demo purposes). Everything is working as expected.
This use-case is simpler than previous one:
- Suppose we have some library/framework (it can be simple
asyncio.run()
) that taking care about infrastructure ofasync
/await
and spinning event loop in the MainThread. - We want to call our regular/sync
blocking_io()
function, that we want eventually to call asyncsample_coroutine()
function, execute it and return it’s value.
So, suppose we have FastAPI application (or any other or just asyncio.run()
) that taking care about infrastructure of async
/await
and spinning event loop in the MainThread.
So, our application’s code starts from main()
. Now, we have regular
/sync
function blocking_io()
. So, we want to run in another thread. We can use exec_in_executor()
directly. This API also responsible for spinning event loop, if needed, in the Thread
from executor
, that runs blocking_io()
.
Structure Concurrency Note: In this example,
exec_in_executor()
splits the flow. MainThread is going to “join”Thread
fromexecutor
, that runsblocking_io()
in theawait
(onasyncio.Future
). After this call we have single branch flow.
#3 From async context we're calling sync function that calls async function.
import time
import asyncio
import atexit
from concurrent.futures import ThreadPoolExecutor
from alexber.utils.thread_locals import exec_in_executor
EXECUTOR = ThreadPoolExecutor()
async def sample_coroutine(x: int, y: int) -> int:
await asyncio.sleep(1)
#time.sleep(10)
return x * y
def blocking_io() -> None:
time.sleep(5) # mimicking blocking I/O call
async def helper_example_usage():
#time.sleep(15)
result = await sample_coroutine(2, 3) # Example values for x and y
return result
loop = asyncio.get_event_loop()
return loop.run_until_complete(helper_example_usage())
def close_executor() -> None:
EXECUTOR.shutdown(wait=True)
atexit.register(close_executor)
# Alternative using ExitStack:
# with contextlib.ExitStack() as stack:
# stack.enter_context(EXECUTOR)
# or
# stack.callback(EXECUTOR.shutdown, wait=True)
async def main() -> None:
result = await exec_in_executor(EXECUTOR, blocking_io)
print(result)
if __name__ == "__main__":
asyncio.run(main())
After we got to blocking_io(),
all other is similar as in previous example. Only, the “joining” point here is await
(on asyncio.Future
), not fut.result()
.
Let look on another use-case.
So, suppose we have FastAPI application (or any other or just asyncio.run()
) that taking care about infrastructure of async
/await
and spinning event loop in the MainThread.
So, our application’s code starts from main()
. Now, we want to call async
function sample_coroutine()
call.
So, from main()
we can use exec_in_executor()
API and run sample_coroutine()
. This API also responsible for spinning event loop, if needed, in the Thread
from executor
, that runs sample_coroutine()
. So, when we’re in sample_coroutine()
there is running event loop
.
#4 From async context we're calling async function.
#import time
import asyncio
import atexit
from concurrent.futures import ThreadPoolExecutor
from alexber.utils.thread_locals import exec_in_executor
EXECUTOR = ThreadPoolExecutor()
async def sample_coroutine(x: int, y: int) -> int:
await asyncio.sleep(1)
#time.sleep(10)
return x * y
def close_executor() -> None:
EXECUTOR.shutdown(wait=True)
atexit.register(close_executor)
# Alternative using ExitStack:
# with contextlib.ExitStack() as stack:
# stack.enter_context(EXECUTOR)
# or
# stack.callback(EXECUTOR.shutdown, wait=True)
async def main() -> None:
result = await exec_in_executor(EXECUTOR, sample_coroutine, 2, 3)
print(result)
if __name__ == "__main__":
asyncio.run(main())
Note:
It should be very rare in real production code.
Ifsample_coroutine()
is “well-behaved” coroutine, there is no need to run in on another thread.
But sometimes, for example, it use blocking I/O somewhere in your libraries stack, so refactoring is painful. So, for a meanwhile, you can use this as workaround.Structure Concurrency Note: In this example,
exec_in_executor()
splits the flow. MainThread is going to “join”Thread
fromexecutor
, that runssample_coroutine()
in theawait
(onasyncio.Future
). After this call we have single branch flow.
Let look on another use-case.
So, suppose we have FastAPI application (or any other or just asyncio.run()
) that taking care about infrastructure of async
/await
and spinning event loop in the MainThread.
So, our application’s code starts from main()
. For whatever reason, we’re found ourselves in regular
/sync
function, for example in some_legacy_function()
. Now, we want to call async
function sample_coroutine()
.
So, from some_legacy_function()
we can use exec_in_executor_threading_future()
API and run sample_coroutine()
. This API also responsible for spinning event loop, if needed, in the Thread
from executor
, that runs sample_coroutine()
.
#5 From sync context calls async function.
#import time
import asyncio
import atexit
from concurrent.futures import ThreadPoolExecutor
from alexber.utils.thread_locals import exec_in_executor_threading_future
EXECUTOR = ThreadPoolExecutor()
async def sample_coroutine(x: int, y: int) -> int:
await asyncio.sleep(1)
#time.sleep(10)
return x * y
def close_executor() -> None:
EXECUTOR.shutdown(wait=True)
atexit.register(close_executor)
# Alternative using ExitStack:
# with contextlib.ExitStack() as stack:
# stack.enter_context(EXECUTOR)
# or
# stack.callback(EXECUTOR.shutdown, wait=True)
def some_legacy_function():
fut = exec_in_executor_threading_future(EXECUTOR, sample_coroutine, 2, 3)
result = fut.result()
print(result)
async def main() -> None:
some_legacy_function()
if __name__ == "__main__":
asyncio.run(main())
Note:
It should be very rare in real production code.
Ifsample_coroutine()
is “well-behaved” coroutine, there is no need to run in on another thread.
But sometimes, for example, it use blocking I/O somewhere in your libraries stack, so refactoring is painful. So, for a meanwhile, you can use this as workaround.Structure Concurrency Note: In this example,
exec_in_executor_threading_future()
splits the flow. MainThread is going to “join”Thread
fromexecutor
, that runssample_coroutine()
in thefut.result()
. After this call we have single branch flow.
Last thoughts
While exec_in_executor()
/exec_in_executor_threading_future()
primitives by themselves does not promote Structured Concurrency
, their intended usage, as demonstrated above enables to write such code. For more information, what Structured Concurrency
is and what “well-behaved coroutine” is (I use the later term wagly, but I have thorough breakdown), see
Structured Concurrency as extension of Go statement considered harmful.