asyncio.run_coroutine_threadsafe API

alex_ber
Dec 3, 2024


My complementary API for it

When using SQLAlchemy with psycopg2 and gevent, the typical setup involves running your application in a single-threaded environment where gevent can manage concurrency using greenlets. This setup is effective for handling I/O-bound tasks, such as database operations, in a non-blocking manner.

In this scenario, you would initialize and run your database operations using SQLAlchemy and psycopg2 in the MainThread. The MainThread runs the event loop, managing greenlets for concurrent operations.

If you have an additional thread for some other purpose, such as handling CPU-bound tasks or integrating with a library that requires a separate thread, this additional thread does not benefit from gevent’s concurrency model. Gevent is designed to work with the event loop in the MainThread, and its concurrency model is not intended to work across multiple threads. If you attempt to use gevent in a non-MainThread, it can lead to unexpected behavior and is generally not recommended.

You can find the source code here. It is available as part of my AlexBerUtils project.

You can install AlexBerUtils from PyPI:

python -m pip install -U alex-ber-utils

See here for a more detailed explanation of how to install.

But! You can use the asyncio.run_coroutine_threadsafe() API to schedule a coroutine to run on an event loop that is running in the MainThread, even if you’re calling it from a different thread. It looks like this is the way to go. Let’s have a closer look:

def run_coroutine_threadsafe(coro, loop):
    """Submit a coroutine object to a given event loop.

    Return a concurrent.futures.Future to access the result.
    """

There are 3 things that caught my eye:

  1. Why is this a regular/sync function?
  2. What is the loop object? If I’m in a non-MainThread, how can I obtain it?
  3. coro — obviously means coroutine. What about *args and **kwargs?

1. Why is this a regular/sync function?

This function returns what I’ll call a threading future, strictly speaking a concurrent.futures.Future (more on this below).

This function just schedules the coroutine to run; it doesn’t call the coro itself. So, this is fast. And threading_future is easily convertible to asyncio_future by

asyncio_future = asyncio.wrap_future(threading_future, loop=loop)

where loop can typically be passed as None; in that case the running event loop will be used.

See Appendix A for more details.
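
To make this concrete, here is a minimal, self-contained sketch of the typical setup: the MainThread runs the event loop, while a worker thread schedules a coroutine on it via asyncio.run_coroutine_threadsafe() and blocks on the returned concurrent.futures.Future. The coroutine name and the run_in_executor trick are my own illustration, not part of any API:

import asyncio
import threading

async def sample_coroutine(x, y):
    await asyncio.sleep(0.1)
    return x * y

def worker(loop):
    # We are in a non-MainThread: schedule the coroutine on the MainThread's
    # loop; run_coroutine_threadsafe() returns a concurrent.futures.Future
    # immediately, without running the coroutine here.
    threading_future = asyncio.run_coroutine_threadsafe(sample_coroutine(2, 3), loop)
    # In a plain thread (no event loop) it is fine to block on the result.
    print("result:", threading_future.result())

async def main():
    loop = asyncio.get_running_loop()
    t = threading.Thread(target=worker, args=(loop,))
    t.start()
    # Wait for the worker without blocking the loop, so the scheduled
    # coroutine can actually run.
    await loop.run_in_executor(None, t.join)

asyncio.run(main())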

2. What is the loop object? If I’m in a non-MainThread, how can I obtain it?

The only solution I can think of is that on application startup, the loop is obtained (by asyncio.get_running_loop(), for example) and stored in some global variable.

So, this is what my API does. You have to call the initConf() function of the alexber.utils.thread_local module; you can omit all parameters. The call should be done from the MainThread while its event loop is running. Internally, it will retrieve the event loop (of the MainThread) and store it in a global variable. Later, my run_coroutine_threadsafe() will use it instead of a call to asyncio.get_running_loop().

I also added get_main_event_loop() to give you the ability to retrieve it and use it with various asyncio APIs.

Note: In many APIs passing None as loop means to use asyncio.get_running_loop().
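
Conceptually, my thread_local helpers boil down to something like the following minimal sketch. This is my own illustration of the idea; the actual AlexBerUtils implementation handles parameters and edge cases differently:

import asyncio

_MAIN_EVENT_LOOP = None  # global variable holding the MainThread's event loop

def initConf():
    # Must be called from the MainThread while its event loop is running.
    global _MAIN_EVENT_LOOP
    _MAIN_EVENT_LOOP = asyncio.get_running_loop()

def get_main_event_loop():
    return _MAIN_EVENT_LOOP

def run_coroutine_threadsafe(coro_func, *args, **kwargs):
    # Takes the coroutine function plus *args/**kwargs (see point 3 below)
    # and uses the stored MainThread loop, since asyncio.get_running_loop()
    # would fail in a thread that has no running event loop.
    coro = coro_func(*args, **kwargs)
    return asyncio.run_coroutine_threadsafe(coro, _MAIN_EVENT_LOOP)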

3. coro — obviously means coroutine. What about *args and **kwargs?

The short answer: because they seem to be redundant. I will explain why in a moment; for now I want to point out that in my API I decided to take the parameters as the usual *args and **kwargs.

The long answer. In Python, when you define an async function, it is considered a coroutine function. However, when you call this coroutine function, its body is not executed; instead, the call returns a coroutine object, which is what you actually await.

Here’s a breakdown of the behavior:

  1. Coroutine Function: This is the function itself, defined with async def. It is of type <class 'function'> but is recognized as a coroutine function by inspect.iscoroutinefunction().
  2. Coroutine Object: When you call a coroutine function (e.g., sample_coroutine(2, 3) below), it returns a coroutine object. This object is what you await to execute the coroutine. It is of type <class 'coroutine'>.

Example:

import asyncio
import inspect

async def sample_coroutine(x, y):
    await asyncio.sleep(1)
    return x * y

async def main():
    ref_func = sample_coroutine
    ref_coroutine = ref_func(2, 3)

    print(f"ref_func is a coroutine function: {inspect.iscoroutinefunction(ref_func)}")
    print(f"Type of ref_func: {type(ref_func)}")
    print(f"Type of ref_coroutine: {type(ref_coroutine)}")

asyncio.run(main())

This will output:

ref_func is a coroutine function: True
Type of ref_func: <class 'function'>
Type of ref_coroutine: <class 'coroutine'>

In this example:

  • ref_func is a reference to the coroutine function sample_coroutine. It is seen as a function because it is the function object itself, not yet called.
  • ref_coroutine is the result of calling ref_func(2, 3), which is a coroutine object. This is what you would await to perform the asynchronous operation.

As you can see, asyncio.run_coroutine_threadsafe() expects the caller to pass “ref_func(2, 3)”, a coroutine object, as coro.

Note: If you want to run on the currently running event loop, you can obtain it by calling asyncio.get_running_loop() or, even better, use loop.create_task().
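
For completeness, here is what this Note means in code; everything here is standard library behavior:

import asyncio

async def sample_coroutine(x, y):
    await asyncio.sleep(0.1)
    return x * y

async def main():
    loop = asyncio.get_running_loop()
    # From inside the loop's own thread, create_task() is the natural choice;
    # no thread-safe scheduling is needed.
    task = loop.create_task(sample_coroutine(2, 3))
    print(await task)  # 6

asyncio.run(main())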

And now comes the trickier part.

Suppose you are in a running event loop on some non-MainThread, in an async context. How can you retrieve the answer from the threading future in a non-blocking way?

The naïve solution is to chain threading_future with asyncio_future the same way as we did in run_coroutine_threadsafe():

asyncio_future = loop.create_future()
threading_future.add_done_callback(lambda fut: chain_future_results(fut, asyncio_future))

But it doesn’t work. I’m not 100% sure why. It seems to be due to the fact that threading_future was created in one thread, the MainThread in our case; this is the thread where loop = _EVENT_LOOP runs; while we’re running on a non-MainThread and adding the callback from that other thread, so the event loop of the MainThread is unaware of it.

So, what can you do? There is the asyncio.wrap_future() function. It “knows” how to convert a threading.Future to an asyncio.Future. So, you can use it and get an asyncio.Future. Then, if you want, you can get the answer by simply awaiting on it.

Note: You should use the correct loop parameter of asyncio.wrap_future(). The asyncio.Future that is returned will be bound to the provided loop. Typically, you want to bind the asyncio.Future to the currently running event loop. In this case you can provide None, and it will be resolved to the currently running event loop. If you want the event loop of the MainThread, you can use get_main_event_loop().
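
Putting it together, here is a sketch of awaiting, from an async context running in a non-MainThread, a coroutine that is executed on the MainThread’s loop. get_main_event_loop() and sample_coroutine() are the helpers introduced above; the rest is standard asyncio:

import asyncio

async def fetch_on_main_loop():
    main_loop = get_main_event_loop()  # the MainThread's loop, stored by initConf()
    threading_future = asyncio.run_coroutine_threadsafe(
        sample_coroutine(2, 3), main_loop)
    # Bind the resulting asyncio.Future to *this* thread's running loop;
    # loop=None is resolved to the currently running event loop.
    asyncio_future = asyncio.wrap_future(threading_future, loop=None)
    return await asyncio_future  # non-blocking wait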

Appendix A: deep dive into asyncio.wrap_future()

Let’s start by looking at the code:

def wrap_future(future, *, loop=None):
    """Wrap concurrent.futures.Future object."""
    if isfuture(future):
        return future
    assert isinstance(future, concurrent.futures.Future), \
        f'concurrent.futures.Future is expected, got {future!r}'
    if loop is None:
        loop = events._get_event_loop()
    new_future = loop.create_future()
    _chain_future(future, new_future)
    return new_future

Ok, here is the actual isfuture() code:

def isfuture(obj):
    """Check for a Future.

    This returns True when obj is a Future instance or is advertising
    itself as duck-type compatible by setting _asyncio_future_blocking.
    See comment in Future for more details.
    """
    return (hasattr(obj.__class__, '_asyncio_future_blocking') and
            obj._asyncio_future_blocking is not None)

So, if future is already an asyncio.Future, it is just returned.
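
As a quick aside, this early return makes wrap_future() idempotent: wrapping something that is already an asyncio.Future gives you back the very same object:

import asyncio

async def main():
    loop = asyncio.get_running_loop()
    fut = loop.create_future()
    # wrap_future() returns the same object when given an asyncio.Future
    print(asyncio.wrap_future(fut) is fut)  # True
    fut.cancel()  # tidy up the unused future

asyncio.run(main())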

Then, there is an assert that future is of type concurrent.futures.Future; if it isn’t, an AssertionError will be raised.

The loop parameter is None by default, and in order to supply it, it should be passed by name (loop=loop). This is the loop to which we want to tie our newly created asyncio.Future. If it is None (I believe this is the typical case), events._get_event_loop() will be called to get the loop. Here is its source code:

def _get_event_loop(stacklevel=3):
    # This internal method is going away in Python 3.12, left here only for
    # backwards compatibility with 3.10.0 - 3.10.8 and 3.11.0.
    # Similarly, this method's C equivalent in _asyncio is going away as well.
    # See GH-99949 for more details.
    current_loop = _get_running_loop()
    if current_loop is not None:
        return current_loop
    return get_event_loop_policy().get_event_loop()

The stacklevel=3 parameter appears to be a leftover from a warnings.warn() call that has since been removed; the rest of the code is pretty obvious. The function is used to get the current event loop. It first tries to find a running event loop and, if none is found, it retrieves the default event loop as defined by the current event loop policy. This is an internal function; it was deprecated and is maintained temporarily to ensure compatibility with earlier Python 3.10 and 3.11 versions.

The reason it is deprecated is the slow migration from get_event_loop(), which under some conditions (see above) can create a new loop implicitly, to get_running_loop(), which demands that a running event loop exist beforehand and otherwise raises an exception.

So, loop = events._get_event_loop() will typically return the running event loop or, if none exists, will create one.
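
A tiny demonstration of this difference, using only standard library behavior: inside a running loop both functions agree, while outside a loop get_running_loop() refuses to create one:

import asyncio

async def main():
    # Inside a running loop, get_event_loop() simply returns the running loop.
    print(asyncio.get_running_loop() is asyncio.get_event_loop())  # True

asyncio.run(main())

# Outside of a running loop, get_running_loop() raises instead of creating one.
try:
    asyncio.get_running_loop()
except RuntimeError as err:
    print(err)  # "no running event loop"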

new_future = loop.create_future() — creates an asyncio.Future tied to the event loop.

_chain_future(future, new_future) — this is the trickiest part. I expected it to be something along the lines of chain_future_results(), but this is how the actual source code looks:

def _chain_future(source, destination):
    """Chain two futures so that when one completes, so does the other.

    The result (or exception) of source will be copied to destination.
    If destination is cancelled, source gets cancelled too.
    Compatible with both asyncio.Future and concurrent.futures.Future.
    """
    if not isfuture(source) and not isinstance(source,
                                               concurrent.futures.Future):
        raise TypeError('A future is required for source argument')
    if not isfuture(destination) and not isinstance(destination,
                                                    concurrent.futures.Future):
        raise TypeError('A future is required for destination argument')

    source_loop = _get_loop(source) if isfuture(source) else None
    dest_loop = _get_loop(destination) if isfuture(destination) else None

    def _set_state(future, other):
        if isfuture(future):
            _copy_future_state(other, future)
        else:
            _set_concurrent_future_state(future, other)

    def _call_check_cancel(destination):
        if destination.cancelled():
            if source_loop is None or source_loop is dest_loop:
                source.cancel()
            else:
                source_loop.call_soon_threadsafe(source.cancel)

    def _call_set_state(source):
        if (destination.cancelled() and
                dest_loop is not None and dest_loop.is_closed()):
            return
        if dest_loop is None or dest_loop is source_loop:
            _set_state(destination, source)
        else:
            if dest_loop.is_closed():
                return
            dest_loop.call_soon_threadsafe(_set_state, destination, source)

    destination.add_done_callback(_call_check_cancel)
    source.add_done_callback(_call_set_state)


def _copy_future_state(source, dest):
    """Internal helper to copy state from another Future.

    The other Future may be a concurrent.futures.Future.
    """
    assert source.done()
    if dest.cancelled():
        return
    assert not dest.done()
    if source.cancelled():
        dest.cancel()
    else:
        exception = source.exception()
        if exception is not None:
            dest.set_exception(_convert_future_exc(exception))
        else:
            result = source.result()
            dest.set_result(result)


def _set_concurrent_future_state(concurrent, source):
    """Copy state from a future to a concurrent.futures.Future."""
    assert source.done()
    if source.cancelled():
        concurrent.cancel()
    if not concurrent.set_running_or_notify_cancel():
        return
    exception = source.exception()
    if exception is not None:
        concurrent.set_exception(_convert_future_exc(exception))
    else:
        result = source.result()
        concurrent.set_result(result)

def _convert_future_exc(exc):
    exc_class = type(exc)
    if exc_class is concurrent.futures.CancelledError:
        return exceptions.CancelledError(*exc.args)
    elif exc_class is concurrent.futures.TimeoutError:
        return exceptions.TimeoutError(*exc.args)
    elif exc_class is concurrent.futures.InvalidStateError:
        return exceptions.InvalidStateError(*exc.args)
    else:
        return exc


def _get_loop(fut):
    # Tries to call Future.get_loop() if it's available.
    # Otherwise fallbacks to using the old '_loop' property.
    try:
        get_loop = fut.get_loop
    except AttributeError:
        pass
    else:
        return get_loop()
    return fut._loop

Here we see all the mechanics of copying state from a threading.Future to an asyncio.Future, and the handling of corner cases, like what happens if the asyncio.Future’s event loop is closed or if the threading.Future is cancelled.

Note: Quote: “Compatible with both asyncio.Future and concurrent.futures.Future.” This code is more general than what asyncio.wrap_future() needs; I will stick with our use case only.

So, _call_check_cancel is added as a done callback to destination. If the asyncio.Future is cancelled, we want “to propagate” the cancellation signal back to the threading.Future. When the asyncio.Future is done, _call_check_cancel() will be called.

It checks whether destination was cancelled; if so, it will cancel the source.

If source_loop exists and is different from dest_loop, it will schedule the call to source.cancel() on the source_loop (via call_soon_threadsafe()).

If source_loop doesn’t exist, source.cancel() will be called directly.

In the code there is also the case when source_loop exists and is actually the same as dest_loop. In this case it is crucial that the call to _chain_future() is done from the same event loop, otherwise the call to source.cancel() will not be safe.
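
Here is a small, self-contained demo of this backward propagation; both futures live in the same thread here, which is enough to see the mechanism (the variable names mirror the article, not any API):

import asyncio
import concurrent.futures

async def main():
    threading_future = concurrent.futures.Future()           # source
    asyncio_future = asyncio.wrap_future(threading_future)   # destination

    # Cancelling the asyncio side triggers _call_check_cancel(), which
    # propagates the cancellation back to the concurrent.futures.Future.
    asyncio_future.cancel()
    await asyncio.sleep(0)  # let the loop run the done callbacks
    print(threading_future.cancelled())  # True

asyncio.run(main())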

_get_loop() and _convert_future_exc() are pretty straightforward.

_copy_future_state() and _set_concurrent_future_state() look similar, but in our case _copy_future_state() is the relevant one.

So, _call_set_state is added as a done callback to source. When the threading.Future is done, we want “to propagate” the done signal forward to the asyncio.Future.

When threading.Future is done, _call_set_state() will be called.

It checks: if destination is cancelled and there is a dest_loop that is closed, nothing will be done.

If dest_loop exists, is closed, and is not the same as source_loop, nothing will be done.

If dest_loop exists, is not closed, and is the same as source_loop, _set_state(destination, source) will be called directly. In this case it is crucial that the call to _chain_future() is done from the same event loop, otherwise the call to _set_state(destination, source) will not be safe.

If dest_loop exists, is not closed, and is not the same as source_loop, it will schedule the call to _set_state(destination, source) on the dest_loop (via call_soon_threadsafe()).

In def _set_state(future, other), as you can see above, future=destination (asyncio.Future) and other=source (threading.Future). It is unclear to me why the order of parameters was changed.

Anyway, the first line is isfuture(future) (as I described above, it checks whether it is an asyncio.Future); in the case we’re interested in, it will be True, so _copy_future_state() will be called.

_copy_future_state() first asserts that source is done. It should be true if _copy_future_state() was called from add_done_callback() of the source.

Then, if destination is cancelled, nothing will be done.

If source is done because it was cancelled, destination will be cancelled.

Otherwise, this code will be executed:

exception = source.exception()
if exception is not None:
    dest.set_exception(_convert_future_exc(exception))
else:
    result = source.result()
    dest.set_result(result)

def _convert_future_exc(exc):
    exc_class = type(exc)
    if exc_class is concurrent.futures.CancelledError:
        return exceptions.CancelledError(*exc.args)
    elif exc_class is concurrent.futures.TimeoutError:
        return exceptions.TimeoutError(*exc.args)
    elif exc_class is concurrent.futures.InvalidStateError:
        return exceptions.InvalidStateError(*exc.args)
    else:
        return exc

This is pretty straightforward.
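
The forward direction can be demonstrated just as easily: completing the threading future makes the result appear on the asyncio side (again, plain stdlib, same thread for simplicity):

import asyncio
import concurrent.futures

async def main():
    threading_future = concurrent.futures.Future()           # source
    asyncio_future = asyncio.wrap_future(threading_future)   # destination

    # Completing the source triggers _call_set_state(), which schedules
    # _copy_future_state() on the destination's loop via call_soon_threadsafe().
    threading_future.set_result(42)
    print(await asyncio_future)  # 42

asyncio.run(main())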

Before coming back up to the surface, let’s look at _set_concurrent_future_state(). The difference is in how they check whether the destination is cancelled and how exactly the destination’s cancellation is handled.

In _copy_future_state() we have:

if dest.cancelled():
    return

If destination is cancelled, the code simply returns from _copy_future_state().

In _set_concurrent_future_state():

if not concurrent.set_running_or_notify_cancel():
    return

The method set_running_or_notify_cancel() is part of the concurrent.futures.Future class in Python's concurrent.futures module. This method is used internally by executors or by the system that manages the future, rather than by users directly. It is designed to handle the transition of a future's state in a thread-safe manner.

Purpose of set_running_or_notify_cancel()

The set_running_or_notify_cancel() method serves two primary purposes:

  1. Set the Future as Running: If the future has not been cancelled, this method sets the state of the future to running. This is important to prevent other operations from starting the same future again and to signal that the future’s operation is in progress.
  2. Notify if Cancelled: If the future has been cancelled before this method is called, it ensures that the cancellation is acknowledged, and no further action is taken on this future. This helps in managing the cancellation of asynchronous tasks effectively.

Usage Context

This method is typically used by the executor (like ThreadPoolExecutor or ProcessPoolExecutor) when a task associated with the future is about to start execution. It checks if the future has been cancelled between the time it was scheduled and the time it is supposed to run. If it hasn't been cancelled, it marks the future as running.

Example Scenario

Here’s a conceptual example to illustrate how this might be used internally:

from concurrent.futures import Future

def task(future):
    # Simulate checking and setting the future's state before running the task,
    # the way an executor's worker does
    if not future.set_running_or_notify_cancel():
        print("Future was cancelled, stopping execution.")
        return
    # Proceed with the task
    print("Executing task...")
    # Set result upon completion
    future.set_result("Task completed successfully.")

# This would typically be part of the executor's logic
future = Future()
task(future)
print(future.result())

Note: As a library developer or a user of the concurrent.futures module, you generally do not need to call set_running_or_notify_cancel() directly. This method is meant to be used by the infrastructure that manages the execution of futures, such as the executors provided by the concurrent.futures module.

So,

if not concurrent.set_running_or_notify_cancel():
    return

in _set_concurrent_future_state() does 2 things:

  1. If the concurrent future has not been cancelled, this method sets the state of the future to running. It will return True.
  2. If the concurrent future (aka destination) has been cancelled before this method is called, it will return False, and the code will return from _set_concurrent_future_state().

All other logic is exactly the same.
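
Before wrapping up, here is a tiny demo of the two possible outcomes of set_running_or_notify_cancel(), using a plain concurrent.futures.Future with no executor involved:

from concurrent.futures import Future

f = Future()
f.cancel()                                   # cancel while still PENDING
print(f.set_running_or_notify_cancel())      # False: cancellation is acknowledged

g = Future()
print(g.set_running_or_notify_cancel())      # True: the future is now RUNNING
print(g.cancel())                            # False: a running future can't be cancelled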
