How to call an I/O-bound operation from an async context

alex_ber
7 min read · Jun 19, 2024


I’m going to talk about the async/await syntax in Python, and more specifically its usage via Python’s built-in asyncio module.

TL;DR: Sometimes (see below) you want to call some “sync” function (a “regular” function foo) from an async context. Just do:

# execute a function in a separate thread
import asyncio

await asyncio.to_thread(foo)

I assume that the reader knows how to use it well. I want to focus on the scenario where you want to call some sync blocking function from an async context. There is a lot of literature about it; https://dev.to/hackerculture/python-asyncio-a-guide-to-asynchronous-programming-43j2 is just one example.

So, let’s suppose that you have some io_bound_operation_foo() that is defined as a regular/sync function and you want to use it.

The easiest way to do it is … to find its async alternative. Maybe it sits right next to io_bound_operation_foo(), maybe you can use another package that has an async variant, maybe you can refactor it yourself to be async. Or maybe there is a variant of io_bound_operation_foo() with non-blocking I/O. For example, aiohttp. As a side note: a lot of popular libraries are being rewritten to facilitate non-blocking I/O; you should prefer to use them when they are available.
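For example, instead of calling a blocking HTTP client from a coroutine, you can use aiohttp directly. A minimal sketch (the URL and function name are my own placeholders):

import asyncio

import aiohttp  # async-native HTTP client, assumed installed

async def fetch(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            return await response.text()

html = asyncio.run(fetch("https://example.com"))  # placeholder URL
print(len(html))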

Ok. But suppose all that fails. You have some 3rd-party package with io_bound_operation_foo() that can’t be changed (it may also have calls to multiple functions inside that 3rd-party package, so you can’t just duplicate & refactor it).

Now, the bad news. If you have a CPU-bound operation, the recipe below will not help you. Technically, it will seem ok, but you will not gain a lot from it. I will expand upon this below.

The old way of doing things.

To call io_bound_operation_foo() using asyncio, you can use the run_in_executor method provided by the asyncio event loop. This method allows you to run a blocking function in a separate thread or process, so it doesn't block the main event loop.

Here’s a basic example of how you can do this:

import asyncio
import time
from concurrent.futures import ThreadPoolExecutor

# Assume this is the blocking I/O operation from the third-party library
def blocking_io_operation():
    # Simulate a blocking I/O operation
    time.sleep(5)
    return "Blocking I/O operation result"

async def main():
    loop = asyncio.get_event_loop()
    # Run the blocking I/O operation in a separate thread
    with ThreadPoolExecutor() as pool:
        result = await loop.run_in_executor(pool, blocking_io_operation)
        print(result)

# Run the main function
asyncio.run(main())

This pattern was useful when integrating with 3rd-party libraries that perform blocking I/O operations, allowing you to leverage concurrency without blocking the main event loop in asyncio-based applications.

In this example:

  • blocking_io_operation() is a synchronous function that performs a blocking I/O operation.
  • asyncio.get_event_loop() creates a new event loop if one does not exist in the main thread context. It raises a RuntimeError if called from a non-main thread without an existing event loop. This is useful for cases where you want to ensure there is an event loop available, such as in scripts or applications that initialize their own event loop.
  • main is an asynchronous function that uses loop.run_in_executor to run blocking_io_operation in a separate thread.
  • ThreadPoolExecutor is used to manage the pool of threads.

What problems does this code have?

  1. “Thread local” state, or more broadly contextvars, does not propagate to the other thread. This can be easily fixed (see the sketch after this list).
  2. We’re creating a new ThreadPoolExecutor each time we want to call blocking_io_operation(), which is a waste of resources. This can also be fixed, but it is more challenging to do right (the sketch below covers this too).
  3. If the main thread didn’t already have a running event loop, one will be created as a side effect, which can cause potential problems in other parts of the application.
  4. GIL. I will come back to this one later.
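Here is a minimal sketch (my own, not from the article or the standard library) of how points 1–2 can be fixed by hand: reuse one long-lived ThreadPoolExecutor and copy the current contextvars context into the worker thread. The name run_sync is a placeholder.

import asyncio
import contextvars
import functools
from concurrent.futures import ThreadPoolExecutor

# Created once and reused for every call (addresses p. 2)
_shared_pool = ThreadPoolExecutor()

async def run_sync(func, /, *args, **kwargs):
    loop = asyncio.get_running_loop()
    # Copy the current context so contextvars propagate
    # into the worker thread (addresses p. 1)
    ctx = contextvars.copy_context()
    func_call = functools.partial(ctx.run, func, *args, **kwargs)
    return await loop.run_in_executor(_shared_pool, func_call)

As we will see below, this is essentially what asyncio.to_thread() does internally, except that it uses the loop’s default executor.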

Recommended way

Quote:

We often need to execute a blocking function call within an asyncio application because, in practice, most workloads include a mix of IO-bound operations and also CPU-bound operations.

This could be for many reasons, such as:

* To execute a CPU-bound task like calculating something.

* To execute a blocking IO-bound task like reading or writing from a file.

* To call into a third-party library that does not support asyncio yet.

Making a blocking call directly in an asyncio program will cause the event loop to stop while the blocking call is executing. It will not allow other coroutines to run in the background.

This can be prevented by running the blocking call outside of the event loop, which we can do with asyncio.to_thread().

# execute a function in a separate thread
import asyncio

await asyncio.to_thread(blocking_io_operation)

How to run a blocking task with asyncio

The asyncio.to_thread() function takes a function to execute and any arguments. It returns a coroutine that can be awaited or scheduled as an independent task. The function is then executed in a separate thread.

https://dev.to/hackerculture/python-asyncio-a-guide-to-asynchronous-programming-43j2
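A minimal usage sketch of both options (the function and argument names are my own placeholders):

import asyncio
import time

def blocking_io_operation(path, retries=3):
    time.sleep(1)  # stand-in for a real blocking I/O call
    return f"read {path} with {retries} retries"

async def main():
    # Await it directly...
    result = await asyncio.to_thread(blocking_io_operation, "a.txt")
    # ...or schedule it as an independent task and await it later
    task = asyncio.create_task(asyncio.to_thread(blocking_io_operation, "b.txt", retries=5))
    print(result, await task)

asyncio.run(main())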

Let’s dive into the asyncio.to_thread() implementation

This is the code:

import functools
import contextvars

from . import events


__all__ = "to_thread",


async def to_thread(func, /, *args, **kwargs):
    """Asynchronously run function *func* in a separate thread.

    Any *args and **kwargs supplied for this function are directly passed
    to *func*. Also, the current :class:`contextvars.Context` is propagated,
    allowing context variables from the main thread to be accessed in the
    separate thread.

    Return a coroutine that can be awaited to get the eventual result of *func*.
    """
    loop = events.get_running_loop()
    ctx = contextvars.copy_context()
    func_call = functools.partial(ctx.run, func, *args, **kwargs)
    return await loop.run_in_executor(None, func_call)

With some GIL-related caveats, asyncio.to_thread() ensures that the main event loop remains responsive by offloading blocking operations to separate threads.

First of all, all “thread local” variables (more precisely, state such as variables shared across asynchronous tasks and threads) propagate to the new thread; this is the contextvars part. In the old way we created a new ThreadPoolExecutor each time we called blocking_io_operation(). Here the default (always available) ThreadPoolExecutor is used (this is what the None in loop.run_in_executor(None, func_call) is responsible for).
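To see the contextvars propagation in action, here is a small sketch (the variable name is just an example):

import asyncio
import contextvars

request_id = contextvars.ContextVar("request_id", default=None)

def blocking_io_operation():
    # The value set in the event-loop thread is visible here, because
    # asyncio.to_thread() copies the current context into the worker thread
    return f"handling request {request_id.get()}"

async def main():
    request_id.set("abc-123")
    print(await asyncio.to_thread(blocking_io_operation))  # handling request abc-123

asyncio.run(main())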

This addresses points 1–2 above. Now, please note that the old-way code uses asyncio.get_event_loop(), while this code uses asyncio.events.get_running_loop(). Let’s compare them side by side.

asyncio.get_event_loop() creates a new event loop if one does not exist in the main thread context. It raises a RuntimeError if called from a non-main thread without an existing event loop. This is useful for cases where you want to ensure there is an event loop available, such as in scripts or applications that initialize their own event loop.

events.get_running_loop() does not create a new event loop. It only retrieves the existing running loop, and raises a RuntimeError if no event loop is currently running. It strictly retrieves the loop running in the current thread. This is typically used in contexts where you are working within an existing asynchronous environment (e.g., inside an async function or task) and you need to access the running event loop.

So they are pretty close, but crucially events.get_running_loop() doesn't alter global application state. It is up to you to ensure that a running loop has been spun up.
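A small sketch illustrating the difference:

import asyncio

# No loop is running at module level, so get_running_loop() raises
try:
    asyncio.get_running_loop()
except RuntimeError as e:
    print(e)  # no running event loop

async def main():
    # Inside a coroutine, both return the same already-running loop
    print(asyncio.get_event_loop() is asyncio.get_running_loop())  # True

asyncio.run(main())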

GIL

TL;DR: It doesn’t matter whether we’re using the old way with an explicit ThreadPoolExecutor or the recommended way with the implicit default one; the limitation the GIL imposes remains the same. From a practical perspective, both solutions will work for CPU-bound tasks, but with some performance degradation (depending on the specific task that you’ve implemented). But if you have I/O-bound tasks, or some other task that releases the GIL (see below), it will work perfectly fine.
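A small sketch of the CPU-bound case (the function name is a placeholder): two pure-Python busy loops offloaded to threads take roughly twice as long as one, because the threads take turns holding the GIL:

import asyncio
import time

def cpu_bound_operation():
    # Pure-Python busy loop: holds the GIL the whole time
    total = 0
    for i in range(10_000_000):
        total += i
    return total

async def main():
    start = time.perf_counter()
    await asyncio.gather(
        asyncio.to_thread(cpu_bound_operation),
        asyncio.to_thread(cpu_bound_operation),
    )
    # Roughly 2x the single-call time: no real parallelism here
    print(f"took {time.perf_counter() - start:.2f}s")

asyncio.run(main())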

Quote:

ThreadPoolExecutor vs. the Global Interpreter Lock

The presence of the GIL in Python impacts the ThreadPoolExecutor.

The ThreadPoolExecutor maintains a fixed-sized pool of worker threads that supports concurrent tasks, but the presence of the GIL means that most tasks will not run in parallel.

You may recall that concurrency is a general term that suggests an order independence between tasks, e.g. they can be completed at any time or at the same time. Parallel might be considered a subset of concurrency and explicitly suggests that tasks are executed simultaneously.

The GIL means that worker threads cannot run in parallel, in most cases.

Specifically, in cases where the target task functions are CPU-bound tasks. These are tasks that are limited by the speed of the CPU in the system, such as working on data in memory or calculating something.

Nevertheless, worker threads can run in parallel in some special circumstances, one of which is when an IO task is being performed.

https://superfastpython.com/threadpoolexecutor-vs-gil/

Quotes:

The lock [GIL] is explicitly released and re-acquired periodically by each Python thread, specifically after approximately every 100 bytecode instructions executed within the interpreter. This allows other threads within the Python process to run, if present.

The lock is also released in some circumstances, allowing other threads to run.

An important example is when a thread performs an I/O operation, such as reading or writing from an external resource like a file, socket, or device.

Luckily, many potentially blocking or long-running operations, such as I/O, image processing, and NumPy number crunching, happen outside the GIL. Therefore it is only in multithreaded programs that spend a lot of time inside the GIL, interpreting CPython bytecode, that the GIL becomes a bottleneck.

Global Interpreter Lock, Python Wiki.

The lock is also explicitly released by some third-party Python libraries when performing computationally expensive operations in C-code, such as many array operations in NumPy.

https://superfastpython.com/threadpool-gil/#

You can read more here: https://realpython.com/python-parallel-processing/. As a bonus, it covers some ways to circumvent the GIL.

So, for I/O-bound tasks, the GIL does not significantly affect performance when using a thread pool. The GIL is released during blocking I/O operations, allowing other threads to run and improving concurrency. CPU-bound tasks will not run in parallel because of the GIL. But such tasks are also not suitable to be awaited on.
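To make the I/O-bound claim concrete, here is a small sketch (the function name is a placeholder): two 1-second blocking calls, offloaded with asyncio.to_thread(), complete in about one second overall, because the sleeping threads don’t hold the GIL:

import asyncio
import time

def blocking_io_operation(n):
    time.sleep(1)  # releases the GIL, like real blocking I/O
    return f"result {n}"

async def main():
    start = time.perf_counter()
    results = await asyncio.gather(
        asyncio.to_thread(blocking_io_operation, 1),
        asyncio.to_thread(blocking_io_operation, 2),
    )
    print(results, f"took {time.perf_counter() - start:.2f}s")  # ~1s, not ~2s

asyncio.run(main())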
