How to run background long-running tasks with asyncio
UPDATE 2025-02-20: The actual source code differs from what is provided here. Some critical bugs around ContextVar and exception propagation were fixed. FYI, these fixes are not reflected here.
In My exec_in_executor() API, we went through the exec_in_executor()/exec_in_executor_threading_future() APIs and saw how they can be used to run sync & async functions from both sync & async contexts.
Moreover, the code examples that I gave there promote Structured Concurrency. You can read about the standard asyncio API here: https://alex-ber.medium.com/how-to-call-i-o-bound-operation-from-async-context-565a504548b0.
But sometimes you need something different. For example, upon an HTTP request, you want to start some long-running background task.
What you really want is to return some id to the client and start the background task execution, but not wait until it is finished (as I said, it takes too long).
This is what AsyncExecutionQueue is for.
You can find the source code here. It is available as part of my AlexBerUtils project.
You can install AlexBerUtils from PyPi:
python -m pip install -U alex-ber-utils
See here for a more detailed explanation of how to install it.
This is the source code (docstrings are omitted):
_CLOSE_SENTINEL = object()

class AsyncExecutionQueue(RootMixin):
    def __init__(self, **kwargs):
        self.queue = kwargs.pop("queue", None)
        if not self.queue:
            self.queue = asyncio.Queue()
        self.executor = kwargs.pop("executor", None)
        super().__init__(**kwargs)

    async def __aenter__(self):
        self._worker_task = asyncio.create_task(self.worker())
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.aclose()

    async def worker(self):
        while True:
            task, task_future = await self.queue.get()
            try:
                if task is _CLOSE_SENTINEL:
                    return  # Exit the worker loop
                func, args, kwargs = task
                # Use the helper function to execute the task and set the result in the task_future
                result_future = exec_in_executor(self.executor, func, *args, **kwargs)
                result_future.add_done_callback(lambda fut: chain_future_results(fut, task_future))
            finally:
                # Mark the task as done, regardless of what the task was
                self.queue.task_done()

    async def aadd_task(self, func, /, *args, **kwargs):
        future = asyncio.get_running_loop().create_future()
        await self.queue.put(((func, args, kwargs), future))
        return future

    def add_task(self, executor, func, /, *args, **kwargs):
        fut = exec_in_executor_threading_future(executor, self.aadd_task, func, *args, **kwargs)
        return fut

    async def aclose(self):
        await self.queue.put((_CLOSE_SENTINEL, None))
        if self._worker_task:
            await self._worker_task

def chain_future_results(source_future: FutureType, target_future: FutureType):
    try:
        result = source_future.result()
        target_future.set_result(result)
    except Exception as e:
        target_future.set_exception(e)

class RootMixin:
    def __init__(self, **kwargs):
        # The delegation chain stops here
        pass
This is an example usage:
import time
from fastapi import FastAPI
from contextlib import asynccontextmanager, AsyncExitStack
from concurrent.futures import ThreadPoolExecutor
from alexber.utils.thread_locals import AsyncExecutionQueue, exec_in_executor, exec_in_executor_threading_future
import uuid
import uvicorn

EXECUTOR = None
QUEUE = None

@asynccontextmanager
async def lifespan(app: FastAPI):
    async with AsyncExitStack() as stack:
        global EXECUTOR, QUEUE
        # Create and manage the ThreadPoolExecutor
        EXECUTOR = stack.enter_context(ThreadPoolExecutor())
        # Create and manage the AsyncExecutionQueue
        QUEUE = await stack.enter_async_context(AsyncExecutionQueue(executor=EXECUTOR))
        print("Application has started with ThreadPoolExecutor and AsyncExecutionQueue.")
        yield
        # Cleanup will happen here when the context exits
        print("Application is shutting down, resources are cleaned up.")

app = FastAPI(lifespan=lifespan)

@app.get("/")
async def read_root():
    task_id = str(uuid.uuid4())
    await QUEUE.aadd_task(create_some_task)
    #QUEUE.add_task(EXECUTOR, create_some_task)
    print(f"Going to return {task_id}")
    return {"task_id": task_id}

async def create_some_task():
    print("start create_some_task async function!")
    time.sleep(120)  # mimic long-running blocking task
    return 75

if __name__ == "__main__":
    print("API MAIN Function Initialized!")
    uvicorn.run(app, host="0.0.0.0", port=8080)
In the lifespan() async context manager function, we create EXECUTOR as a ThreadPoolExecutor and QUEUE as an AsyncExecutionQueue.
Note that we pass EXECUTOR as a parameter to QUEUE. AsyncExecutionQueue doesn't manage the EXECUTOR lifecycle.
Note: You can read more about the lifecycle of EXECUTOR in My exec_in_executor() API https://medium.com/@alex-ber/my-exec-in-executor-api-72797e232f99
Instead, it is explicitly managed by the application code via AsyncExitStack.
Note: You can read more about ExitStack/AsyncExitStack in the Side Note of My exec_in_executor() API https://medium.com/@alex-ber/my-exec-in-executor-api-72797e232f99
We’re storing EXECUTOR and QUEUE as global variables.
Note: ThreadPoolExecutor and AsyncExecutionQueue are designed to handle concurrent operations, so it is OK to store them as global variables.
Let's go over the code. In QUEUE = AsyncExecutionQueue(executor=EXECUTOR) in the example above, a new asyncio.Queue() is created. EXECUTOR was created and is managed in the lifespan() function and is passed as a parameter into AsyncExecutionQueue.__init__(), so it will be used for running sync and/or async functions in another thread.
Now, on a GET / call, we generate a unique task_id. What we want to do now is split our flow: on the first branch, we return task_id to the client (he will later use it to retrieve the result of the create_some_task() execution), and on the second branch we want fire-and-forget execution of create_some_task() somewhere in the background. What is crucial: we don't want to "join" on the result of the create_some_task() execution. This is not Structured Concurrency, but that is by design.
So, we use await QUEUE.aadd_task(create_some_task) for the fire-and-forget of create_some_task. I will come back to the syntax in a while. For now, it is sufficient to realize that we just put create_some_task on the QUEUE, and we are not going to wait for it to finish executing (we don't even wait for it to start executing). The second branch is the fire-and-forget execution of create_some_task(). The first branch just continues to run normally, without "joining" the second branch; it logs and returns task_id.
1. await QUEUE.aadd_task(create_some_task)
async def aadd_task(self, func, /, *args, **kwargs):
    future = asyncio.get_running_loop().create_future()
    await self.queue.put(((func, args, kwargs), future))
    return future
As you can see, a future that is bound to the currently running event loop is created. The func context, together with this future, is put on the internal self.queue. await here means waiting until the item is successfully put into the internal data structure of the asyncio.Queue. This operation is pretty fast. Then the future of type asyncio.Future (most likely not yet completed) is returned to the client code. The client can, for example, store it somewhere and periodically check (busy-loop) whether the future is complete. This check can be done by another endpoint. Of course, you can implement a custom mechanism to retrieve the result of the computation (of create_some_task in this example).
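For illustration, here is a minimal sketch of such a mechanism, assuming the FastAPI application from the example above. The TASKS dict and the /result/{task_id} endpoint are hypothetical additions of mine, not part of the library:

TASKS = {}  # hypothetical in-memory registry: task_id -> asyncio.Future

@app.get("/start")
async def start_task():
    task_id = str(uuid.uuid4())
    future = await QUEUE.aadd_task(create_some_task)
    TASKS[task_id] = future  # store the future for later polling
    return {"task_id": task_id}

@app.get("/result/{task_id}")
async def get_result(task_id: str):
    future = TASKS.get(task_id)
    if future is None:
        return {"status": "unknown task_id"}
    if not future.done():
        return {"status": "pending"}
    try:
        return {"status": "done", "result": future.result()}
    except Exception as e:  # the task raised; the exception was chained into the future
        return {"status": "failed", "error": str(e)}

In a real application you would also evict completed entries from TASKS, but that bookkeeping is omitted here.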
Alternatively, we could use:
2. QUEUE.add_task(EXECUTOR, create_some_task)
def add_task(self, executor, func, /, *args, **kwargs):
    fut = exec_in_executor_threading_future(executor, self.aadd_task, func, *args, **kwargs)
    return fut
This is an alternative API. As you can see, it leverages the exec_in_executor_threading_future() primitive to execute self.aadd_task() inside the executor. The future it returns is of type threading.Future. You can disregard it or store it for later use, as in p. 1.
This API is handy when you want to use AsyncExecutionQueue from a sync context.
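For illustration, a minimal sketch of such a call from a plain sync function, assuming EXECUTOR, QUEUE and create_some_task from the lifespan() example above:

def schedule_from_sync_code():
    # No running event loop is required here; the call is safe from sync context.
    fut = QUEUE.add_task(EXECUTOR, create_some_task)
    # fut is a threading.Future; as in p.1, you can disregard it,
    # or store it and poll fut.done() later.
    return fut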
Now, let's see what doesn't work as expected:
3. await exec_in_executor(EXECUTOR, create_some_task) #doesn’t work as expected
What will happen is that we fire create_some_task() on some thread inside EXECUTOR, but then we await the result of its calculation. That is not what we want.
4. exec_in_executor(EXECUTOR, create_some_task) #works with limitations
While technically this would work, it suffices only for straightforward use cases where task management and tracking are not necessary. It allows for direct execution of functions or coroutines in a separate thread. This method has minimal overhead. However, it lacks built-in mechanisms for tracking task completion or managing task dependencies. It offers limited control over concurrency and resource usage and does not support graceful shutdown, meaning tasks may not be completed if the application is shut down abruptly.
5. exec_in_executor_threading_future(EXECUTOR, create_some_task) #works with limitations
Same as 4.
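For completeness, a sketch of what such untracked fire-and-forget calls look like, assuming EXECUTOR and create_some_task from the main example:

async def fire_and_forget_async():
    # p.4: from async context -- returns an asyncio.Future that we deliberately ignore
    exec_in_executor(EXECUTOR, create_some_task)

def fire_and_forget_sync():
    # p.5: from sync context -- returns a threading.Future that we deliberately ignore
    exec_in_executor_threading_future(EXECUTOR, create_some_task)

# No queue, no tracking: if the application shuts down now,
# these tasks may be abandoned mid-flight.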
6. FastAPI/starlette BackgroundTask #doesn’t work as expected
Let's take an example from the documentation: https://fastapi.tiangolo.com/tutorial/background-tasks/#dependency-injection
from fastapi import BackgroundTasks, Depends, FastAPI

app = FastAPI()

def write_log(message: str):
    with open("log.txt", mode="a") as log:
        log.write(message)

def get_query(background_tasks: BackgroundTasks, q: str | None = None):
    if q:
        message = f"found query: {q}\n"
        background_tasks.add_task(write_log, message)
    return q

@app.post("/send-notification/{email}")
async def send_notification(
    email: str, background_tasks: BackgroundTasks, q: str = Depends(get_query)
):
    message = f"message to {email}\n"
    background_tasks.add_task(write_log, message)
    return {"message": "Message sent"}
A FastAPI application typically runs in a single-threaded environment, so the background task will run in the same thread as the main application. While the background task is executing, the server will not be able to process other requests in that thread. This can lead to delays in handling new requests if the background task is long-running.
FastAPI may run with multiple workers or threads (e.g., using uvicorn with multiple workers). In such cases, while one worker is busy executing a background task, other workers can continue to handle incoming requests. This setup mitigates the impact of background tasks on the server's ability to handle new requests. But usually you have a small number of running workers, say k; so, if /send-notification/{email} was called more than k times, you will still find that the server is not able to process other requests.
AsyncExecutionQueue
The AsyncExecutionQueue offers structured task management, allowing for easy addition, execution, and completion tracking of tasks. It provides concurrency control, enabling you to manage the number of concurrent tasks and resource usage effectively. Additionally, it supports graceful shutdown, ensuring all tasks are completed before the application shuts down. This approach is scalable, as you can adjust the executor or queue size to meet your needs. However, it introduces complexity and overhead due to queue management and task scheduling. It also requires careful resource management to avoid bottlenecks or excessive memory usage.
AsyncExecutionQueue implements the asynchronous producer-consumer pattern. It is also an async context manager.
Async context manager
This is the relevant code. Docstrings are omitted for brevity:
def __init__(self, **kwargs):
    self.queue = kwargs.pop("queue", None)
    if not self.queue:
        self.queue = asyncio.Queue()
    self.executor = kwargs.pop("executor", None)
    super().__init__(**kwargs)

async def __aenter__(self):
    self._worker_task = asyncio.create_task(self.worker())
    return self

async def __aexit__(self, exc_type, exc_val, exc_tb):
    await self.aclose()

async def aclose(self):
    await self.queue.put((_CLOSE_SENTINEL, None))
    if self._worker_task:
        await self._worker_task
So, in the __init__() method you can pass a pre-configured asyncio.Queue. If not passed, a default one will be created. You can also explicitly pass an executor (see the sketch after the list below). Either way, the executor is resolved in the following order:
- If the executor parameter is provided, it is used.
- If an executor was passed via initConfig(), it is used.
- If neither is set, None is used, which means the default asyncio executor will be used.
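For illustration, a minimal sketch of passing both a bounded queue and a dedicated executor explicitly (the maxsize and max_workers values are arbitrary, chosen only for illustration):

async def run_with_custom_config():
    # Bounded queue: aadd_task() will await when 100 items are pending.
    custom_queue = asyncio.Queue(maxsize=100)
    custom_executor = ThreadPoolExecutor(max_workers=4)
    try:
        async with AsyncExecutionQueue(queue=custom_queue, executor=custom_executor) as q:
            await q.aadd_task(create_some_task)
    finally:
        # AsyncExecutionQueue does not manage the executor's lifecycle.
        custom_executor.shutdown(wait=True)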
Note: as a side fact, threads in the executor may have an event loop attached. This allows for the execution of asynchronous tasks within those threads. See "What are the differences?" in My exec_in_executor() API https://medium.com/@alex-ber/my-exec-in-executor-api-72797e232f99 for more details.
Quick summary: in __init__() you can customize or use defaults for the executor and the asyncio.Queue.
__aenter__() is called in async with AsyncExecutionQueue(...) as queue after __init__(). Note that the last line is return self; this means that the queue variable above will hold the AsyncExecutionQueue. This is a typical pattern for context managers. It also creates a Task from self.worker() (this is actually the consumer) and assigns it to self._worker_task (for later use in aclose() for graceful shutdown).
__aexit__() will be called when leaving async with AsyncExecutionQueue(...) as queue. It delegates resource cleanup to the aclose() async function.
aclose() is designed to be called implicitly from __aexit__(), but an explicit call is also supported. It first puts _CLOSE_SENTINEL on the asyncio.Queue and then awaits until self._worker_task completes. More on this below.
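For illustration, a minimal sketch of the explicit-call variant, assuming the worker was started via __aenter__() (the usual async with form is preferred):

async def run_with_explicit_close():
    q = AsyncExecutionQueue(executor=EXECUTOR)
    await q.__aenter__()  # starts the worker task (the consumer)
    try:
        await q.aadd_task(create_some_task)
    finally:
        # Sends _CLOSE_SENTINEL and awaits the worker task: graceful shutdown.
        await q.aclose()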
Producer
async def aadd_task(self, func, /, *args, **kwargs):
    future = asyncio.get_running_loop().create_future()
    await self.queue.put(((func, args, kwargs), future))
    return future
A threading.Future can be standalone; among other things, it is not necessarily bound to any thread pool. An asyncio.Future is always bound to an event loop. Without loss of generality, let's assume that aadd_task() is called from the MainThread, so asyncio.get_running_loop() is the MainThread's event loop. We create an asyncio.Future that will be scheduled on this event loop, and we can, if we like, await it later; but typically we will disregard it and use some custom mechanism to get an indication of whether it finished and, if so, what the result is.
await self.queue.put(((func, args, kwargs), future))
Here we put a pair on the asyncio.Queue: its second element is the asyncio.Future described above, and its first is the sync/async function and its arguments to run. We await while the arguments are safely placed into the asyncio.Queue. When this is done, we can immediately proceed. As said above, we return the asyncio.Future that can be used to retrieve the result, but typically it will be disregarded.
Note:
1. I want to emphasize that asyncio.Queue is a tool that enables decoupling between producers and consumers. For example, if os.cpu_count() equals 8, the default max_workers of ThreadPoolExecutor will be min(32, (os.cpu_count() or 1) + 4) = 12. So, you can have 1 producer and up to 12 concurrent consumers. The aadd_task() and worker() tasks, without loss of generality, are scheduled on the MainThread's event loop. They are very short-running. More on this below.
2. In the threading world, queue.Queue is used for decoupling between producers and consumers. There you must also have at least one additional Thread (the best practice, of course, is to use ThreadPoolExecutor; you can start with 1 thread and adjust this parameter as you go).
3. In the asyncio world, if you know that your tasks are short or heavily use async I/O, you can rely on only 1 event loop without any additional Thread. If your tasks are CPU-bound, it is better to run them in a ThreadPoolExecutor.
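To make the arithmetic from note 1 concrete (this is the documented default max_workers formula for ThreadPoolExecutor since Python 3.8):

import os

cpu_count = 8  # the value assumed in note 1
print(min(32, (cpu_count or 1) + 4))  # 12

# and on your own machine:
print(min(32, (os.cpu_count() or 1) + 4))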
So, the producer creates an asyncio.Future on (without loss of generality) the MainThread's event loop (fast operation), puts func, args, kwargs and the future on the asyncio.Queue (fast operation), and returns the future (fast operation).
Consumer
async def worker(self):
    while True:
        task, task_future = await self.queue.get()
        try:
            if task is _CLOSE_SENTINEL:
                return  # Exit the worker loop
            func, args, kwargs = task
            result_future = exec_in_executor(self.executor, func, *args, **kwargs)
            result_future.add_done_callback(lambda fut: chain_future_results(fut, task_future))
        finally:
            # Mark the task as done, regardless of what the task was
            self.queue.task_done()
This is an endless loop that gets an item from the asyncio.Queue (when one is available), does some computation, and sends an ack to the asyncio.Queue to remove the item from it. It does this regardless of whether the computation raised an exception or not.
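This task_done() ack is what makes asyncio.Queue.join() work: join() returns only once every item that was put() has been acked. A minimal standalone sketch, independent of AsyncExecutionQueue:

import asyncio
import contextlib

async def demo_task_done():
    q = asyncio.Queue()
    for i in range(3):
        q.put_nowait(i)

    async def drain():
        while True:
            item = await q.get()
            try:
                print(f"processed {item}")
            finally:
                q.task_done()  # ack, even if processing raised

    worker = asyncio.create_task(drain())
    await q.join()  # returns only once all 3 items are acked via task_done()
    worker.cancel()
    with contextlib.suppress(asyncio.CancelledError):
        await worker

asyncio.run(demo_task_done())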
The worker() task starts running when the MainThread reaches the async with AsyncExecutionQueue(...) as queue point, before "as queue". It passes the while True check and immediately awaits on self.queue.get().
After the producer (self.aadd_task()) sends an item to self.queue, the next time the worker() task has an opportunity to run, self.queue.get() will actually return the item and worker() will perform the computation.
Each item in self.queue is a pair: task and task_future. A quick reminder: without loss of generality, task_future is bound to the MainThread's event loop, and worker() and aadd_task() also, without loss of generality, run on the same MainThread's event loop.
Now, if task is the sentinel value _CLOSE_SENTINEL (the aclose() coroutine, which is technically also a producer, sends it), the worker() task returns right away. (The aclose() coroutine will await to ensure that it ends.)
So, _CLOSE_SENTINEL is a special value to stop the consumer, the worker() coroutine in our case.
func, args, kwargs = task
result_future = exec_in_executor(self.executor, func, *args, **kwargs)
result_future.add_done_callback(lambda fut: chain_future_results(fut, task_future))
def chain_future_results(source_future: FutureType, target_future: FutureType):
    try:
        result = source_future.result()
        target_future.set_result(result)
    except Exception as e:
        target_future.set_exception(e)
If the item from self.queue doesn't contain the special value, its first component can be unpacked as func, args, kwargs (line 1 above). So, we want to run func with args, kwargs and return the result of running it to the producer.
The challenge here is that we know nothing about the nature of func. It can be well-behaved, or it can be not. I remind you that, without loss of generality, worker() runs on the MainThread's event loop. So we don't want to accidentally block the event loop.
Here the exec_in_executor() API (see https://medium.com/@alex-ber/my-exec-in-executor-api-72797e232f99) comes to the rescue.
Note that there is no immediate await after the exec_in_executor() call. Instead, the result (an asyncio.Future) is assigned to the variable result_future. This variable (a monad, if you like) represents the value of a calculation that will be available in the future.
So, we have task_future from asyncio, on which AsyncExecutionQueue's client might await, and we have result_future, also from asyncio, that exec_in_executor() returned and on which the result of executing func with *args and **kwargs will eventually be available.
What we want is this: whenever result_future.result() is available, if it is a usual value, it will be set via task_future.set_result(), and if it is an exception, it will be re-raised, caught, and set via task_future.set_exception(). This is exactly what the chain_future_results() function does.
The last line I need to explain is:
result_future.add_done_callback(lambda fut: chain_future_results(fut, task_future))
The result_future.add_done_callback() method provides the ability to register a callback that will be fired whenever result_future is done.
Note: A quick reminder, without loss of generality: result_future is bound to the MainThread's event loop, task_future is bound to the MainThread's event loop, and worker() and aadd_task() also run on the same MainThread's event loop. So, add_done_callback() will be triggered as expected.
In this state it is guaranteed that result_future.result() holds either a usual value or an exception. When the future completes, the callback is invoked, and the future instance that completed (result_future in our case) is passed as an argument to the callback.
Note: It is tempting to use result_future directly inside the callback. This introduces a potential issue by relying on the outer variable result_future instead of the callback argument fut. In asynchronous programming, result_future could be reassigned or modified before the callback executes, leading to incorrect or unintended behavior. The add_done_callback method guarantees that the completed Future is passed as an argument (fut) to the callback. Ignoring this and referencing the outer variable directly assumes that it will always point to the correct Future, which may not be true in dynamic or concurrent scenarios. Using fut explicitly ensures the callback always operates on the correct Future that triggered it, avoiding ambiguity or errors caused by variable reassignment. This makes the callback more robust, clear, and less prone to bugs.
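The same pitfall can be demonstrated with concurrent.futures, which has the same add_done_callback contract; a small self-contained sketch:

from concurrent.futures import ThreadPoolExecutor

def square(x):
    return x * x

with ThreadPoolExecutor() as executor:
    for i in range(3):
        result_future = executor.submit(square, i)
        # WRONG: the lambda would close over the loop variable result_future,
        # which is rebound on every iteration; by the time the callback runs,
        # it may point at a *later* future:
        # result_future.add_done_callback(lambda _: print(result_future.result()))

        # RIGHT: fut is guaranteed to be the exact future that completed.
        result_future.add_done_callback(lambda fut: print(fut.result()))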
So, in
result_future.add_done_callback(lambda fut: chain_future_results(fut, task_future))
when result_future completes, the callback will be called with fut, a Future in the completed state, passed as its argument. This callback will call chain_future_results(), which will "chain" the completed result_future to task_future, making the result of the func execution available to the AsyncExecutionQueue's client.
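Putting the pieces together, here is a tiny standalone demonstration of the chaining, using the same chain_future_results() logic outside of AsyncExecutionQueue:

import asyncio

def chain_future_results(source_future, target_future):
    try:
        target_future.set_result(source_future.result())
    except Exception as e:
        target_future.set_exception(e)

async def demo_chain():
    loop = asyncio.get_running_loop()
    task_future = loop.create_future()    # what the client awaits
    result_future = loop.create_future()  # what the computation fulfills

    result_future.add_done_callback(
        lambda fut: chain_future_results(fut, task_future))

    result_future.set_result(75)  # the computation "completes"
    print(await task_future)      # 75

asyncio.run(demo_chain())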
Appendix A: Backpressure in asyncio.Queue
asyncio.Queue is explicitly designed for asynchronous systems, where tasks (coroutines) are suspended using await. The backpressure mechanism in asyncio.Queue is more explicit in that it uses the await keyword, which suspends the task until the operation can proceed (i.e., when space is available in the queue or when an item is available to consume). This provides a more cooperative multitasking model where the event loop can continue running other tasks while a specific task is waiting.
In the context of asyncio.Queue, backpressure is achieved through the following features:
- Queue Size Limitation: When creating a queue, you can specify a maximum size (maxsize). If the queue is full and the producer tries to put an item into the queue, it will wait until there is space available. This prevents the producer from overwhelming the queue and the consumer. For example, with queue = asyncio.Queue(maxsize=10), if the queue is full (i.e., the number of items in the queue equals maxsize), the put() method will await until space is available.
- Consumer Waiting: If the queue is empty, the consumer will have to await for an item to become available. The consumer can use item = await queue.get() to asynchronously wait for an item, and it will not block the event loop.
asyncio.Queue awaiting behavior:
- When the queue reaches its maximum size, the producer awaits until there is space.
- When the queue is empty, the consumer awaits until there is data to consume.
Code example:
import asyncio

async def producer(queue):
    for i in range(20):
        print(f"Producing item {i}")
        await queue.put(i)  # This will await if the queue is full
        await asyncio.sleep(0.1)  # Simulate some async work

async def consumer(queue):
    while True:
        item = await queue.get()  # This will await if the queue is empty
        print(f"Consuming item {item}")
        await asyncio.sleep(0.2)  # Simulate processing the item

async def main():
    queue = asyncio.Queue(maxsize=5)
    await asyncio.gather(producer(queue), consumer(queue))

# Run the asyncio event loop
asyncio.run(main())
In this example:
- The producer will await if the queue reaches a size of 5 (because maxsize=5).
- The consumer will await if the queue is empty.
asyncio.Queue provides an implicit backpressure mechanism by suspending the producer when the queue is full and suspending the consumer when the queue is empty, ensuring that neither side overwhelms the other. This is how backpressure is handled asynchronously, without explicitly needing a separate mechanism.
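If you prefer to fail fast instead of suspending, asyncio.Queue also exposes non-suspending variants; a short sketch:

import asyncio

async def main():
    q = asyncio.Queue(maxsize=1)
    q.put_nowait("first")  # fits
    try:
        q.put_nowait("second")  # queue is full: raises instead of awaiting
    except asyncio.QueueFull:
        print("queue full, applying my own backpressure policy")

asyncio.run(main())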
Appendix B: Backpressure in queue.Queue
The queue.Queue in Python's standard library (from the queue module) does not offer a direct or explicit backpressure mechanism like asyncio.Queue, but it exhibits similar behavior through blocking operations.
queue.Queue is primarily designed for synchronous, thread-based programs (using the threading module). While it lacks the asynchronous features of asyncio.Queue, it still manages flow control in a comparable way by allowing the producer and consumer to block when needed:
- Queue Size Limitation: Just like asyncio.Queue, you can set a maxsize for the queue. When the queue is full, the producer will block when trying to put an item into the queue, waiting until space becomes available. For example, with q = queue.Queue(maxsize=10), if the queue is full, calling q.put(item) will block (by default) until there is space.
- Blocking Behavior:
- Producer (Blocking on Put): When the queue is full, calling queue.put(item) will block the producer thread. This is a form of backpressure on the producer, as it slows down the production of items when the queue is unable to handle more.
- Consumer (Blocking on Get): When the queue is empty, calling queue.get() will block the consumer thread until there is an item to consume.
queue.Queue provides a block parameter for both the put and get methods to allow controlling this blocking behavior:
- queue.put(item, block=True, timeout=None): blocks by default when the queue is full.
- queue.get(block=True, timeout=None): blocks by default when the queue is empty.
Code example:
import threading
import queue
import time

def producer(q):
    for i in range(20):
        print(f"Producing item {i}")
        q.put(i)  # Blocks if the queue is full
        time.sleep(0.1)

def consumer(q):
    while True:
        item = q.get()  # Blocks if the queue is empty
        print(f"Consuming item {item}")
        time.sleep(0.2)

def main():
    q = queue.Queue(maxsize=5)  # Set a maximum size for the queue
    producer_thread = threading.Thread(target=producer, args=(q,))
    consumer_thread = threading.Thread(target=consumer, args=(q,))
    producer_thread.start()
    consumer_thread.start()
    producer_thread.join()
    consumer_thread.join()

if __name__ == "__main__":
    main()
In this example:
- The producer will block if the queue reaches a size of 5 (because maxsize=5).
- The consumer will block if the queue is empty.
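The block and timeout parameters let you opt out of indefinite blocking; a short sketch:

import queue

q = queue.Queue(maxsize=1)
q.put("first")  # fits

try:
    q.put("second", block=True, timeout=0.5)  # waits up to 0.5s for space
except queue.Full:
    print("queue stayed full for 0.5s")

try:
    item = q.get(block=False)  # non-blocking get
except queue.Empty:
    item = None
print(item)  # "first"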
Summary: queue.Queue does provide a form of backpressure, but in a synchronous, blocking way rather than asynchronously like asyncio.Queue.
- If the queue is full, the producer will block (wait) when calling put.
- If the queue is empty, the consumer will block (wait) when calling get.
This mechanism helps manage flow control, but it is not as flexible or sophisticated as the asynchronous backpressure mechanism provided by asyncio.Queue. It is best suited for threaded programs rather than asynchronous event loops.