My AsyncExecutionQueue

alex_ber
15 min read · Nov 30, 2024


How to run background long-running tasks with asyncio

UPDATE 2025–02–20: The actual source code differs from what is provided here. Some critical bugs were fixed around ContextVar and exception propagation. FYI, these fixes are not reflected here.

In My exec_in_executor() API we go through the exec_in_executor() / exec_in_executor_threading_future() APIs and see how they can be used to run sync & async functions from both sync & async contexts. Moreover, the code examples that I gave there promote Structured Concurrency. You can read about the standard asyncio API here: https://alex-ber.medium.com/how-to-call-i-o-bound-operation-from-async-context-565a504548b0.

But sometimes you need something different. For example, upon an HTTP request, you want to start some long-running background task.

What you really want is to return some id to the client and start the background task execution, but not wait until it finishes (as I said, it takes too long).

This is what AsyncExecutionQueue is for.

You can find the source code here. It is available as part of my AlexBerUtils project.

You can install AlexBerUtils from PyPi:

python -m pip install -U alex-ber-utils

See here for a more detailed explanation of how to install it.

This is the source code (docstrings are omitted):

_CLOSE_SENTINEL = object()

class AsyncExecutionQueue(RootMixin):

    def __init__(self, **kwargs):
        self.queue = kwargs.pop("queue", None)
        if not self.queue:
            self.queue = asyncio.Queue()
        self.executor = kwargs.pop("executor", None)
        super().__init__(**kwargs)

    async def __aenter__(self):
        self._worker_task = asyncio.create_task(self.worker())
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.aclose()

    async def worker(self):
        while True:
            task, task_future = await self.queue.get()
            try:
                if task is _CLOSE_SENTINEL:
                    return  # Exit the worker loop
                func, args, kwargs = task
                # Use the helper function to execute the task and set the result in the task_future
                result_future = exec_in_executor(self.executor, func, *args, **kwargs)
                result_future.add_done_callback(lambda fut: chain_future_results(fut, task_future))
            finally:
                # Mark the task as done, regardless of what the task was
                self.queue.task_done()

    async def aadd_task(self, func, /, *args, **kwargs):
        future = asyncio.get_running_loop().create_future()
        await self.queue.put(((func, args, kwargs), future))
        return future

    def add_task(self, executor, func, /, *args, **kwargs):
        fut = exec_in_executor_threading_future(executor, self.aadd_task, func, *args, **kwargs)
        return fut

    async def aclose(self):
        await self.queue.put((_CLOSE_SENTINEL, None))
        if self._worker_task:
            await self._worker_task

def chain_future_results(source_future: FutureType, target_future: FutureType):
    try:
        result = source_future.result()
        target_future.set_result(result)
    except Exception as e:
        target_future.set_exception(e)

class RootMixin:
    def __init__(self, **kwargs):
        # The delegation chain stops here
        pass

This is an example usage:

import time
from fastapi import FastAPI
from contextlib import asynccontextmanager, AsyncExitStack
from concurrent.futures import ThreadPoolExecutor
from alexber.utils.thread_locals import AsyncExecutionQueue, exec_in_executor, exec_in_executor_threading_future
import uuid
import uvicorn

EXECUTOR = None
QUEUE = None

@asynccontextmanager
async def lifespan(app: FastAPI):
    async with AsyncExitStack() as stack:
        global EXECUTOR, QUEUE
        # Create and manage the ThreadPoolExecutor
        EXECUTOR = stack.enter_context(ThreadPoolExecutor())
        # Create and manage the AsyncExecutionQueue
        QUEUE = await stack.enter_async_context(AsyncExecutionQueue(executor=EXECUTOR))
        print("Application has started with ThreadPoolExecutor and AsyncExecutionQueue.")
        yield
        # Cleanup will happen here when the context exits
        print("Application is shutting down, resources are cleaned up.")

app = FastAPI(lifespan=lifespan)

@app.get("/")
async def read_root():
    task_id = str(uuid.uuid4())

    await QUEUE.aadd_task(create_some_task)
    # QUEUE.add_task(EXECUTOR, create_some_task)

    print(f"Going to return {task_id}")
    return {"task_id": task_id}

async def create_some_task():
    print("start create_some_task async function!")
    time.sleep(120)  # mimic long-running blocking task
    return 75

if __name__ == "__main__":
    print("API MAIN Function Initialized!")
    uvicorn.run(app, host="0.0.0.0", port=8080)

In the lifespan() async context manager function, we're creating EXECUTOR as a ThreadPoolExecutor and QUEUE as an AsyncExecutionQueue.

Note that we're passing EXECUTOR as a parameter to QUEUE. AsyncExecutionQueue doesn't manage the EXECUTOR lifecycle.

Note: You can read more about the lifecycle of EXECUTOR in My exec_in_executor() API: https://medium.com/@alex-ber/my-exec-in-executor-api-72797e232f99

Instead, it is explicitly managed by application code via AsyncExitStack.

Note: You can read more about ExitStack / AsyncExitStack in the Side Note in My exec_in_executor() API: https://medium.com/@alex-ber/my-exec-in-executor-api-72797e232f99

We’re storing EXECUTOR and QUEUE as global variables.

Note: The ThreadPoolExecutor and AsyncExecutionQueue are designed to handle concurrent operations, so it is OK to store them as global variables.

Let's go over the code. When AsyncExecutionQueue(executor=EXECUTOR) is constructed in the example above, a new asyncio.Queue() is created inside AsyncExecutionQueue.__init__(). EXECUTOR was created and is managed in the lifespan() function and is passed as a parameter into AsyncExecutionQueue.__init__(), so it will be used for running sync and/or async functions in another thread.

Now, on a GET / call, we're generating a unique task_id. What we want to do now is split our flow: on the first branch, we want to return task_id to the client (who will later use it to retrieve the result of the create_some_task() execution), and on the second branch we want fire-and-forget execution of create_some_task() somewhere in the background. What is crucial is that we don't want to "join" on the result of the create_some_task() execution. This is not Structured Concurrency, but that is by design.

So, we're using await QUEUE.aadd_task(create_some_task) for fire-and-forget execution of create_some_task. I will come back to the syntax in a while. For now, it is sufficient to realize that we're just putting create_some_task on the QUEUE, and we're not going to wait for it to finish executing (we're not even waiting for it to start). The second branch is the fire-and-forget execution of create_some_task(). The first branch just continues to run normally, without "joining" the second branch; it logs and returns task_id.

1. await QUEUE.aadd_task(create_some_task)

async def aadd_task(self, func, /, *args, **kwargs):
    future = asyncio.get_running_loop().create_future()
    await self.queue.put(((func, args, kwargs), future))
    return future

As you can see, a future that is bound to the currently running event loop is created. The func context together with this future is put on the internal self.queue. The await here means waiting until the item is successfully put into the internal data structure of the asyncio.Queue. This operation is pretty fast. Then a future of type asyncio.Future (most likely not yet completed) is returned to the client code. The client can, for example, store it somewhere and periodically check (busy-loop) whether the future is complete. This check can be done by another endpoint. Of course, you can implement a custom mechanism to retrieve the result of the computation (of create_some_task in this example).
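
For illustration, here is a minimal sketch of such a retrieval endpoint, building on the FastAPI example above. The TASKS registry and the endpoint paths are hypothetical, my own illustration, not part of AlexBerUtils:

import asyncio

TASKS: dict[str, asyncio.Future] = {}  # hypothetical in-memory registry

@app.get("/start")
async def start_task():
    task_id = str(uuid.uuid4())
    # aadd_task() only enqueues; the returned future is most likely not yet completed
    TASKS[task_id] = await QUEUE.aadd_task(create_some_task)
    return {"task_id": task_id}

@app.get("/result/{task_id}")
async def get_result(task_id: str):
    future = TASKS.get(task_id)
    if future is None:
        return {"status": "unknown"}
    if not future.done():
        return {"status": "pending"}
    return {"status": "done", "result": future.result()}  # .result() re-raises if the task failed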

Alternatively, we could use:

2. QUEUE.add_task(EXECUTOR, create_some_task)

def add_task(self, executor, func, /, *args, **kwargs):
    fut = exec_in_executor_threading_future(executor, self.aadd_task, func, *args, **kwargs)
    return fut

This is an alternative API. As you can see, it leverages the exec_in_executor_threading_future() primitive to execute self.aadd_task() inside the executor. The future it returns is of type concurrent.futures.Future. You can disregard it or store it for later use, as in p.1.

This API is handy when you want to use AsyncExecutionQueue in a sync context.
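
For illustration, a sketch of calling it from plain sync code, assuming EXECUTOR and QUEUE were set up as in the example above:

def sync_caller():
    # We are in plain sync code: there is no running event loop here.
    # add_task() runs self.aadd_task() on a thread in EXECUTOR that has
    # an event loop attached.
    fut = QUEUE.add_task(EXECUTOR, create_some_task)
    # fut is a concurrent.futures.Future; you can disregard it
    # or store it for later use, as in p.1.
    return fut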

Now, let's see what doesn't work as expected:

3. await exec_in_executor(EXECUTOR, create_some_task) #doesn’t work as expected

What will happen is that we will fire create_some_task() on some thread inside EXECUTOR, but we will await the result of its calculation. That is not what we want.

4. exec_in_executor(EXECUTOR, create_some_task) #works with limitations

While technically this would work, it only suffices for straightforward use cases where task management and tracking are not necessary. It allows for direct execution of functions or coroutines in a separate thread. This method has minimal overhead. However, it lacks built-in mechanisms for tracking task completion or managing task dependencies. It offers limited control over concurrency and resource usage and does not support graceful shutdown, meaning tasks may not be completed if the application is shut down abruptly.

5. exec_in_executor_threading_future(EXECUTOR, create_some_task) #works with limitations

Same as 4.

6. FastAPI/starlette BackgroundTask #doesn’t work as expected

Let’s take example from documentation: https://fastapi.tiangolo.com/tutorial/background-tasks/#dependency-injection

from fastapi import BackgroundTasks, Depends, FastAPI

app = FastAPI()


def write_log(message: str):
    with open("log.txt", mode="a") as log:
        log.write(message)


def get_query(background_tasks: BackgroundTasks, q: str | None = None):
    if q:
        message = f"found query: {q}\n"
        background_tasks.add_task(write_log, message)
    return q


@app.post("/send-notification/{email}")
async def send_notification(
    email: str, background_tasks: BackgroundTasks, q: str = Depends(get_query)
):
    message = f"message to {email}\n"
    background_tasks.add_task(write_log, message)
    return {"message": "Message sent"}

A FastAPI application is typically running in a single-threaded environment, so the background task will run in the same thread as the main application. While the background task is executing, the server will not be able to process other requests in that thread. This can lead to delays in handling new requests if the background task is long-running.

FastAPI may run with multiple workers or threads (e.g., using uvicorn with multiple workers). In such cases, while one worker is busy executing a background task, other workers can continue to handle incoming requests. This setup mitigates the impact of background tasks on the server's ability to handle new requests. But usually you have a small number of running workers, say k, so if /send-notification/{email} was called more than k times, you will still find that the server is not able to process other requests.

AsyncExecutionQueue

The AsyncExecutionQueue offers structured task management, allowing for easy addition, execution, and completion tracking of tasks. It provides concurrency control, enabling you to manage the number of concurrent tasks and resource usage effectively. Additionally, it supports graceful shutdown, ensuring all tasks are completed before the application shuts down. This approach is scalable, as you can adjust the executor or queue size to meet your needs. However, it introduces complexity and overhead due to queue management and task scheduling. It also requires careful resource management to avoid bottlenecks or excessive memory usage.

AsyncExecutionQueue implements the asynchronous producer-consumer pattern. It is also an async context manager.
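
Typical usage looks like this (a minimal sketch; some_func is a placeholder for any sync or async callable):

from concurrent.futures import ThreadPoolExecutor

async def main():
    with ThreadPoolExecutor() as executor:
        # __aenter__() starts the worker task; __aexit__() awaits aclose()
        async with AsyncExecutionQueue(executor=executor) as queue:
            await queue.aadd_task(some_func)  # fire-and-forget
        # at this point the sentinel was sent and the worker has finished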

Async context manager

This is the relevant code. Docstrings are omitted for brevity:

def __init__(self, **kwargs):
    self.queue = kwargs.pop("queue", None)
    if not self.queue:
        self.queue = asyncio.Queue()
    self.executor = kwargs.pop("executor", None)
    super().__init__(**kwargs)

async def __aenter__(self):
    self._worker_task = asyncio.create_task(self.worker())
    return self

async def __aexit__(self, exc_type, exc_val, exc_tb):
    await self.aclose()

async def aclose(self):
    await self.queue.put((_CLOSE_SENTINEL, None))
    await self._worker_task

So, in the __init__() method you can pass a configured asyncio.Queue. If not passed, a default one will be created. You can also explicitly pass an executor. Either way, the executor is resolved in the following order:

  1. If the executor parameter is provided, it is used.
  2. If an executor was passed via initConfig(), it is used.
  3. If neither is set, None is used, which means the default asyncio executor will be used.

Note: as a side fact, threads in the executor may have an event loop attached. This allows for the execution of asynchronous tasks within those threads. See "What are the differences?" in My exec_in_executor() API (https://medium.com/@alex-ber/my-exec-in-executor-api-72797e232f99) for more details.

Quick summary: in __init__() you can customize or use defaults for the executor and the asyncio.Queue.
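
For example, an illustrative construction that customizes both (the numbers are arbitrary):

queue = AsyncExecutionQueue(
    queue=asyncio.Queue(maxsize=100),  # bounded queue gives backpressure, see Appendix A
    executor=ThreadPoolExecutor(max_workers=4),
)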

__aenter__() is called in async with AsyncExecutionQueue(...) as queue after __init__(). Note that the last line is return self; this means that the queue variable above will hold the AsyncExecutionQueue. This is a typical pattern for context managers. It also creates a Task from self.worker() (this is actually the consumer) and assigns it to self._worker_task (for later use in aclose() for graceful shutdown).

__aexit__() will be called when leaving async with AsyncExecutionQueue(...) as queue. It delegates resource cleanup to the aclose() async function.

aclose() is designed to be called implicitly from __aexit__(), but an explicit call is also supported. It first puts _CLOSE_SENTINEL on the asyncio.Queue and then awaits until self._worker_task completes. More on this below.
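
If you prefer not to use async with, an explicit sketch might look like this (note that only __aenter__() starts the worker task, so we call it directly):

queue = AsyncExecutionQueue(executor=EXECUTOR)
await queue.__aenter__()  # starts the worker task
try:
    await queue.aadd_task(create_some_task)
finally:
    await queue.aclose()  # puts _CLOSE_SENTINEL and awaits the worker task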

Producer

async def aadd_task(self, func, /, *args, **kwargs):
    future = asyncio.get_running_loop().create_future()
    await self.queue.put(((func, args, kwargs), future))
    return future

A concurrent.futures.Future can be standalone; among other things, it need not be bound to any thread pool. An asyncio.Future is always bound to an event loop. Without loss of generality, let's assume that aadd_task() is called from the MainThread, so asyncio.get_running_loop() returns the MainThread's event loop. We're creating an asyncio.Future that will be scheduled on this event loop, and we can, if we like, await on it later; but typically we will disregard it and use some custom mechanism to get an indication of whether it finished and, if so, what the result is.

await self.queue.put(((func, args, kwargs), future))

Here we're putting a pair on the asyncio.Queue; its second element is the asyncio.Future described above, and its first is the sync/async function to run together with its arguments. We're await-ing until the arguments are safely placed into the asyncio.Queue. When this is done, we can immediately proceed. As said above, we're returning an asyncio.Future that can be used to retrieve the result, but typically it will be disregarded.
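
If you do want the result, you can await the returned future later; a sketch:

async def enqueue_and_wait(queue):
    future = await queue.aadd_task(create_some_task)  # fast: only enqueues
    # ... do other useful work here ...
    return await future  # suspends until the worker chains the result in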

Note:
  1. I want to emphasize that asyncio.Queue is a tool that enables decoupling between producers and consumers. For example, if os.cpu_count() equals 8, the default max_workers of ThreadPoolExecutor will be min(32, (os.cpu_count() or 1) + 4) = 12. So you can have 1 producer and up to 12 concurrent consumers (see the snippet right after this note). The aadd_task() and worker() tasks, without loss of generality, are scheduled on the MainThread's event loop. They are very short-running. More on this below.
  2. In the threading world, queue.Queue is used for decoupling between producers and consumers. There you must also have at least one additional Thread (the best practice, of course, is to use ThreadPoolExecutor; you can start with 1 thread and adjust this parameter as you go).
  3. In the asyncio world, if you know that your tasks are short or use async I/O heavily, you can rely on a single event loop without any additional Thread. If your tasks are CPU-bound, it is better to run them in a ThreadPoolExecutor.
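
To see where the number 12 in point 1 comes from (this is the documented ThreadPoolExecutor default since Python 3.8):

import os

cpu = os.cpu_count() or 1  # e.g. 8
print(min(32, cpu + 4))  # => 12 when cpu == 8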

So, the producer creates an asyncio.Future on (without loss of generality) the MainThread's event loop (fast operation), puts func, args, kwargs and the future on the asyncio.Queue (fast operation), and returns the future (fast operation).

Consumer

async def worker(self):
    while True:
        task, task_future = await self.queue.get()
        try:
            if task is _CLOSE_SENTINEL:
                return  # Exit the worker loop
            func, args, kwargs = task
            result_future = exec_in_executor(self.executor, func, *args, **kwargs)
            result_future.add_done_callback(lambda fut: chain_future_results(fut, task_future))
        finally:
            # Mark the task as done, regardless of what the task was
            self.queue.task_done()

This is an endless loop that gets an item from the asyncio.Queue (when one is available), does some computation, and sends an ack to the asyncio.Queue so that the item is discarded from it. It does this regardless of whether the computation raised an exception or not.

The worker() task starts running when the MainThread reaches the async with AsyncExecutionQueue(...) as queue point, before "as queue". It passes the while True check and immediately awaits on self.queue.get().

After the producer (self.aadd_task()) sends an item to self.queue, the next time the worker() task has an opportunity to run, self.queue.get() will actually return the item and worker() will perform the computation.

Each item in self.queue is a pair: task and task_future. Quick reminder: without loss of generality, task_future is bound to the MainThread's event loop, and worker() and aadd_task() also, without loss of generality, run on the same MainThread's event loop.

Now, if task is the sentinel value _CLOSE_SENTINEL (the aclose() coroutine, which is technically also a producer, sends it), the worker() coroutine returns right away. (The aclose() coroutine will await to ensure that it has ended.)

So, _CLOSE_SENTINEL is a special value to stop the consumer, the worker() coroutine in our case.

func, args, kwargs = task
result_future = exec_in_executor(self.executor, func, *args, **kwargs)
result_future.add_done_callback(lambda fut: chain_future_results(fut, task_future))

def chain_future_results(source_future: FutureType, target_future: FutureType):
    try:
        result = source_future.result()
        target_future.set_result(result)
    except Exception as e:
        target_future.set_exception(e)

If self.queue doesn't contain the special value, the item's first component can be unpacked as func, args, kwargs (line 1 above).

So, we want to run func with args, kwargs and return the result of running it to the producer.

The challenge here is that we know nothing about the nature of func. It can be well-behaved, or it can be not. I remind you that, without loss of generality, worker() runs on the MainThread's event loop. So we don't want to accidentally block the event loop.

Here the exec_in_executor() API, see https://medium.com/@alex-ber/my-exec-in-executor-api-72797e232f99, comes to the rescue.

Note that there is no immediate await after the exec_in_executor() call. Instead, the result (an asyncio.Future) is assigned to the variable result_future. This variable (a monad, if you like) represents the value of a calculation that will be available in the future.

So, we have task_future from asyncio, on which AsyncExecutionQueue's client might await, and we have result_future, also from asyncio, that exec_in_executor() returned and on which the result of executing func with *args and **kwargs will eventually be available.

What we want to do is: whenever result_future.result() is available, if it is a usual value, it will be set via task_future.set_result(), and if it is an exception, it will be re-raised, caught, and set via task_future.set_exception(). This is exactly what the chain_future_results() function does.

The last line I need to explain is:

result_future.add_done_callback(
    lambda fut: chain_future_results(fut, task_future))

The result_future.add_done_callback() method provides the ability to register a callback that will be fired whenever result_future is done.

Note: Quick reminder, without loss of generality, result_future is bound to the MainThread's event loop, task_future is bound to the MainThread's event loop, and worker() and aadd_task() also run on the same MainThread's event loop. So, add_done_callback() will be triggered as expected when necessary.

In this state it is guaranteed that result_future holds either a usual value or an exception. When the future completes, the callback is invoked, and the completed future instance (result_future in our case) is passed as an argument to the callback.

Note: It is tempting to use result_future directly inside the callback. That would introduce a potential issue by relying on the outer variable result_future instead of the callback argument fut. In asynchronous programming, result_future could be reassigned or modified before the callback executes, leading to incorrect or unintended behavior. The add_done_callback method guarantees that the completed Future is passed as an argument (fut) to the callback. Ignoring this and referencing result_future directly assumes that the variable will always point to the correct Future, which may not be true in dynamic or concurrent scenarios. Using fut explicitly ensures the callback always operates on the correct Future that triggered it, avoiding ambiguity or errors caused by variable reassignment. This makes the callback more robust, clear, and less prone to bugs.
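
Here is a self-contained sketch of the safe pattern (my own illustration, not taken from AlexBerUtils):

import asyncio

def chain_future_results(source_future, target_future):
    try:
        target_future.set_result(source_future.result())
    except Exception as e:
        target_future.set_exception(e)

async def demo():
    loop = asyncio.get_running_loop()
    source = loop.create_future()
    target = loop.create_future()
    # GOOD: the callback uses `fut`, the exact future that completed,
    # not the outer variable `source`, which could be rebound later.
    source.add_done_callback(lambda fut: chain_future_results(fut, target))
    source.set_result(42)
    print(await target)  # 42

asyncio.run(demo())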

So,

result_future.add_done_callback(
    lambda fut: chain_future_results(fut, task_future))

When result_future is completed, the callback will be called with fut, a Future in completed state, passed to it. This callback will call chain_future_results(), which will "chain" result_future in its completed state to task_future, making the result of the func execution available to AsyncExecutionQueue's client.

Appendix A: Backpressure in asyncio.Queue

asyncio.Queue is explicitly designed for asynchronous systems, where tasks (coroutines) are suspended using await. The backpressure mechanism in asyncio.Queue is more explicit in that it uses the await keyword, which suspends the task until the operation can proceed (i.e., when space is available in the queue or when an item is available to consume). This provides a more cooperative multitasking model where the event loop can continue running other tasks while a specific task is waiting.

In the context of asyncio.Queue, backpressure is achieved through the following features:

  1. Queue Size Limitation:
    When creating a queue, you can specify a maximum size (maxsize). If the queue is full and the producer tries to put an item into the queue, it will wait until there is space available. This prevents the producer from overwhelming the queue and the consumer.
    queue = asyncio.Queue(maxsize=10)
    If the queue is full (i.e., the number of items in the queue equals maxsize), the put() method will await until space is available.
  2. Consumer Waiting:
    If the queue is empty, the consumer will have to await until an item is available. The consumer can use await queue.get() to asynchronously wait for an item, and it will not block the event loop.
    item = await queue.get()
  3. asyncio.Queue Awaiting Behavior:
    When the queue reaches its maximum size, the producer awaits until there is space.
    When the queue is empty, the consumer awaits until there is data to consume.

Code example:

import asyncio

async def producer(queue):
    for i in range(20):
        print(f"Producing item {i}")
        await queue.put(i)  # This will await if the queue is full
        await asyncio.sleep(0.1)  # Simulate some async work

async def consumer(queue):
    while True:
        item = await queue.get()  # This will await if the queue is empty
        print(f"Consuming item {item}")
        await asyncio.sleep(0.2)  # Simulate processing the item
        queue.task_done()  # Ack the item so queue.join() can complete

async def main():
    queue = asyncio.Queue(maxsize=5)
    consumer_task = asyncio.create_task(consumer(queue))
    await producer(queue)
    await queue.join()  # Wait until every produced item is processed
    consumer_task.cancel()  # Stop the endless consumer so the program can exit

# Run the asyncio event loop
asyncio.run(main())

In this example:

  • The producer will await if the queue reaches a size of 5 (because maxsize=5).
  • The consumer will await if the queue is empty.

asyncio.Queue provides an implicit backpressure mechanism by awaiting the producer when the queue is full and awaiting the consumer when the queue is empty, ensuring that both sides do not overwhelm each other. This is how the backpressure is handled asynchronously without explicitly needing a separate mechanism.

Appendix B: Backpressure in queue.Queue

The queue.Queue in Python's standard library (from the queue module) does not offer a direct or explicit backpressure mechanism like asyncio.Queue, but it exhibits similar behavior through blocking operations.

queue.Queue is primarily designed for synchronous, thread-based programs (using the threading module). While it lacks the asynchronous features of asyncio.Queue, it still manages flow control in a comparable way by allowing the producer and consumer to block when needed:

  1. Queue Size Limitation:
    Just like asyncio.Queue, you can set a maxsize for the queue. When the queue is full, the producer will block when trying to put an item into the queue, waiting until space becomes available.
    queue = queue.Queue(maxsize=10)
    If the queue is full, calling queue.put(item) will block (by default) until there is space.
  2. Blocking Behavior:
  • Producer (Blocking on Put): When the queue is full, calling queue.put(item) will block the producer thread. This is a form of backpressure on the producer, as it slows down the production of items when the queue is unable to handle more.
  • Consumer (Blocking on Get): When the queue is empty, calling queue.get() will block the consumer thread until there is an item to consume.

The queue.Queue provides block as a parameter for both put and get methods to allow controlling this blocking behavior:

  • queue.put(item, block=True, timeout=None): Blocks by default when the queue is full.
  • queue.get(block=True, timeout=None): Blocks by default when the queue is empty.

Code example:

import threading
import queue
import time

def producer(q):
    for i in range(20):
        print(f"Producing item {i}")
        q.put(i)  # Blocks if the queue is full
        time.sleep(0.1)

def consumer(q):
    while True:
        item = q.get()  # Blocks if the queue is empty
        print(f"Consuming item {item}")
        time.sleep(0.2)
        q.task_done()  # Ack the item so q.join() can complete

def main():
    q = queue.Queue(maxsize=5)  # Set a maximum size for the queue
    producer_thread = threading.Thread(target=producer, args=(q,))
    # The consumer loops forever, so make it a daemon thread;
    # otherwise join()-ing it would hang the program.
    consumer_thread = threading.Thread(target=consumer, args=(q,), daemon=True)

    producer_thread.start()
    consumer_thread.start()

    producer_thread.join()
    q.join()  # Wait until every produced item is processed

if __name__ == "__main__":
    main()

In this example:

  • The producer will block if the queue reaches a size of 5 (because maxsize=5).
  • The consumer will block if the queue is empty.

Summary:

queue.Queue does provide a form of backpressure, but in a synchronous blocking way rather than asynchronously like asyncio.Queue.

  • If the queue is full, the producer will block (wait) when calling put.
  • If the queue is empty, the consumer will block (wait) when calling get.

This mechanism helps manage flow control, but it is not as flexible or sophisticated as the asynchronous backpressure mechanisms provided by asyncio.Queue. It is best suited for threaded programs rather than asynchronous event loops.
