My complementary API for it
When using SQLAlchemy with psycopg2 and gevent, the typical setup involves running your application in a single-threaded environment where gevent can manage concurrency using greenlets. This setup is effective for handling I/O-bound tasks, such as database operations, in a non-blocking manner.
In this scenario, you would initialize and run your database operations using SQLAlchemy and psycopg2 in the MainThread. The MainThread runs the event loop, managing greenlets for concurrent operations.
If you have an additional thread for some other purpose, such as handling CPU-bound tasks or integrating with a library that requires a separate thread, this additional thread does not benefit from gevent’s concurrency model. Gevent is designed to work with the event loop in the MainThread, and its concurrency model is not intended to work across multiple threads. If you attempt to use gevent in a non-MainThread, it can lead to unexpected behavior and is generally not recommended.
You can find the source code here. It is available as part of my AlexBerUtils project.
You can install AlexBerUtils from PyPI:
python -m pip install -U alex-ber-utils
See here for a more detailed explanation of how to install it.
But! You can use the asyncio.run_coroutine_threadsafe() API to schedule a coroutine on an event loop that is running in the MainThread, even if you’re calling it from a different thread. This looks like the way to go. Let’s have a closer look:
def run_coroutine_threadsafe(coro, loop):
    """Submit a coroutine object to a given event loop.

    Return a concurrent.futures.Future to access the result.
    """
Three things caught my eye:
- Why is this a regular (sync) function?
- What is the loop object? If I’m in a non-MainThread, how can I obtain it?
- coro obviously means coroutine. What about *args and **kwargs?
1. Why is this a regular/sync function?
This function returns a threading.Future (strictly speaking, a concurrent.futures.Future; more on this below).
It only schedules the coroutine to run; it doesn’t run coro itself. So, it is fast. And threading_future is easily convertible to asyncio_future by:
asyncio_future = asyncio.wrap_future(threading_future, loop=loop)
Note that loop is a keyword-only parameter of asyncio.wrap_future(). In typical cases it can be omitted (it defaults to None), and then the running event loop is used.
See Appendix A for more details.
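To make the "schedule, don't run" behavior concrete, here is a minimal sketch using only the standard library (not AlexBerUtils). The names multiply and worker are hypothetical. The loop is captured while it is running and handed to a plain thread, which schedules a coroutine on it and blocks on the returned concurrent.futures.Future; blocking is acceptable there because that thread does not run the event loop.

```python
import asyncio
import threading


async def multiply(x, y):
    # Hypothetical stand-in coroutine function, for illustration only.
    await asyncio.sleep(0.01)
    return x * y


def worker(loop, results):
    # Runs in a plain thread that has no event loop of its own.
    # run_coroutine_threadsafe() only schedules the coroutine on `loop`
    # and returns a concurrent.futures.Future right away.
    cf = asyncio.run_coroutine_threadsafe(multiply(2, 3), loop)
    # Blocking is fine here: this thread is not the event loop's thread.
    results.append(cf.result(timeout=5))


async def main():
    loop = asyncio.get_running_loop()
    results = []
    t = threading.Thread(target=worker, args=(loop, results))
    t.start()
    # Don't call t.join() directly: that would block the loop, and the
    # scheduled coroutine could never run. Wait for the thread in an executor.
    await loop.run_in_executor(None, t.join)
    return results


if __name__ == "__main__":
    print(asyncio.run(main()))  # [6]
```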
2. What is the loop object? If I’m in a non-MainThread, how can I obtain it?
The only solution I can think of is to obtain the loop at application startup (with asyncio.get_running_loop(), for example) and store it in some global variable.
So, this is what my API does. You have to call the initConf() function of the alexber.utils.thread_local module; you can omit all parameters. The call should be made from the MainThread while its event loop is running. Internally, it retrieves the event loop (of the MainThread) and stores it in a global variable. Later, run_coroutine_threadsafe() uses this stored loop instead of calling asyncio.get_running_loop().
I also added get_main_event_loop() to make it possible to retrieve this loop and use it with other asyncio APIs.
Note: In many APIs, passing None as loop means to use asyncio.get_running_loop().
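The "store the loop at startup" idea can be sketched in a few lines. This is a hypothetical illustration, not the AlexBerUtils code: init_main_loop(), get_main_loop(), submit_to_main_loop() and _MAIN_LOOP are names invented here, and the real initConf()/get_main_event_loop() may differ in signature and behavior.

```python
import asyncio
from typing import Optional

# Module-level storage for the MainThread's event loop (hypothetical sketch).
_MAIN_LOOP: Optional[asyncio.AbstractEventLoop] = None


def init_main_loop() -> None:
    """Call once from the MainThread while its event loop is running."""
    global _MAIN_LOOP
    # Raises RuntimeError if no event loop is running in this thread.
    _MAIN_LOOP = asyncio.get_running_loop()


def get_main_loop() -> asyncio.AbstractEventLoop:
    if _MAIN_LOOP is None:
        raise RuntimeError("init_main_loop() was not called")
    return _MAIN_LOOP


def submit_to_main_loop(coro):
    """Schedule `coro` on the stored loop; callable from any thread."""
    return asyncio.run_coroutine_threadsafe(coro, get_main_loop())


async def add(x, y):
    return x + y


def worker():
    # Runs in a thread-pool thread: it never needs a loop reference passed in.
    return submit_to_main_loop(add(1, 2)).result(timeout=5)


async def main():
    init_main_loop()
    loop = asyncio.get_running_loop()
    # The main loop stays free while the worker blocks in the executor.
    return await loop.run_in_executor(None, worker)
```

The trade-off of a global is that it must be initialized exactly once, early, from the right thread; get_main_loop() fails loudly if that step was skipped.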
3. coro obviously means coroutine. What about *args and **kwargs?
The short answer: because they seem to be redundant. I will explain why in a moment; for now, I want to point out that in my API I decided to take the parameters as the usual *args and **kwargs.
The long answer. In Python, when you define an async function, it is considered a coroutine function. However, when you call this coroutine function, its body is not executed; instead, it returns a coroutine object, which is what you actually await.
Here’s a breakdown of the behavior:
- Coroutine Function: This is the function itself, defined with async def. It is of type <class 'function'> but is recognized as a coroutine function by inspect.iscoroutinefunction().
- Coroutine Object: When you call a coroutine function (e.g., sample_coroutine(2, 3) below), it returns a coroutine object. This object is what you await to execute the coroutine. It is of type <class 'coroutine'>.
Example:
import asyncio
import inspect


async def sample_coroutine(x, y):
    await asyncio.sleep(1)
    return x * y


async def main():
    ref_func = sample_coroutine
    ref_coroutine = ref_func(2, 3)
    print(f"ref_func is a coroutine function: {inspect.iscoroutinefunction(ref_func)}")
    print(f"Type of ref_func: {type(ref_func)}")
    print(f"Type of ref_coroutine: {type(ref_coroutine)}")
    # Await the coroutine object so Python doesn't warn that it was never awaited.
    await ref_coroutine


asyncio.run(main())
This will output:
ref_func is a coroutine function: True
Type of ref_func: <class 'function'>
Type of ref_coroutine: <class 'coroutine'>
In this example:
- ref_func is a reference to the coroutine function sample_coroutine. It is seen as a function because it is the function object itself, not yet called.
- ref_coroutine is the result of calling ref_func(2, 3), which is a coroutine object. This is what you would await to perform the asynchronous operation.
As you can see, asyncio.run_coroutine_threadsafe() expects the caller to pass “ref_func(2, 3)”, that is, an already created coroutine object, as coro.
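An API that takes a coroutine function plus *args and **kwargs simply builds the coroutine object itself. The helper below is a hypothetical sketch of that idea (run_coroutine_function_threadsafe is an invented name, not the AlexBerUtils signature). loop is made keyword-only so it cannot collide with positional arguments, which means the wrapped coroutine function cannot itself take a keyword argument named loop.

```python
import asyncio
import inspect


def run_coroutine_function_threadsafe(func, *args, loop, **kwargs):
    """Hypothetical helper: create the coroutine object from a coroutine
    *function* and its arguments, then schedule it on `loop`."""
    if not inspect.iscoroutinefunction(func):
        raise TypeError(f"coroutine function expected, got {func!r}")
    coro = func(*args, **kwargs)  # only creates the coroutine object
    return asyncio.run_coroutine_threadsafe(coro, loop)


async def sample_coroutine(x, y):
    await asyncio.sleep(0.01)
    return x * y


async def main():
    loop = asyncio.get_running_loop()

    def worker():
        # Same as run_coroutine_threadsafe(sample_coroutine(2, y=3), loop).
        cf = run_coroutine_function_threadsafe(sample_coroutine, 2, y=3, loop=loop)
        return cf.result(timeout=5)

    return await loop.run_in_executor(None, worker)
```

The iscoroutinefunction() check is deliberately strict; it rejects plain functions that merely return coroutines.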
Note: If you want to run on the currently running event loop, you can obtain it by calling asyncio.get_running_loop() or, even better, use loop.create_task().
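When you are already on the loop's own thread, no thread-safe hand-off is needed; a task is enough. The sketch below only illustrates the note above. Calling run_coroutine_threadsafe(...).result() from the loop's own thread would instead block the loop and deadlock.

```python
import asyncio


async def sample_coroutine(x, y):
    await asyncio.sleep(0.01)
    return x * y


async def main():
    # We are on the event loop's thread, so plain create_task() is enough.
    loop = asyncio.get_running_loop()
    task = loop.create_task(sample_coroutine(2, 3))  # or asyncio.create_task(...)
    return await task
```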
And now comes the trickier part.
Suppose you are in an async context, inside an event loop running on some non-MainThread. How do you retrieve the answer from the threading future in a non-blocking way?
The naïve solution is to chain threading_future with asyncio_future the same way as we did in run_coroutine_threadsafe():
asyncio_future = loop.create_future()
threading_future.add_done_callback(lambda fut: chain_future_results(fut, asyncio_future))
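But it doesn’t work. As far as I can tell, the reason is thread affinity. threading_future is completed in the MainThread (the thread where loop = _EVENT_LOOP runs), and a concurrent.futures.Future runs its done callbacks in the thread that completes it. So the callback calls asyncio_future.set_result() from the MainThread, while asyncio_future belongs to the event loop of our non-MainThread. asyncio.Future is not thread-safe, and nothing wakes up that event loop, so it may never notice that the future is done.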
So, what can you do? There is the asyncio.wrap_future() function. It “knows” how to convert a threading.Future to an asyncio.Future. So, you can use it to get an asyncio.Future. Then, if you want, you can get the answer by simply awaiting it.
Note: You should pass the correct loop parameter to asyncio.wrap_future(). The asyncio.Future that is returned will be bound to the provided loop. Typically, you want to bind the asyncio.Future to the currently running event loop. In this case you can pass None (the default), and it will be resolved to the currently running event loop. If you want the event loop of the MainThread, you can use get_main_event_loop().
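Here is a minimal end-to-end sketch of the non-blocking approach. A second thread runs its own event loop (via asyncio.run()), schedules work on the MainThread's loop, and awaits the result through asyncio.wrap_future() with loop left as None, so the wrapper is bound to the worker's running loop. The names in_worker_loop and worker are hypothetical.

```python
import asyncio
import threading


async def sample_coroutine(x, y):
    await asyncio.sleep(0.01)
    return x * y


async def in_worker_loop(main_loop):
    # Runs on the worker thread's own event loop.
    cf = asyncio.run_coroutine_threadsafe(sample_coroutine(2, 3), main_loop)
    # loop=None: bind the asyncio.Future to the currently running (worker) loop,
    # then await it without blocking that loop.
    return await asyncio.wrap_future(cf)


def worker(main_loop, results):
    # A separate thread that starts its own event loop.
    results.append(asyncio.run(in_worker_loop(main_loop)))


async def main():
    main_loop = asyncio.get_running_loop()
    results = []
    t = threading.Thread(target=worker, args=(main_loop, results))
    t.start()
    # Keep the main loop free so it can run sample_coroutine().
    await main_loop.run_in_executor(None, t.join)
    return results
```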
Appendix A: a deep dive into asyncio.wrap_future()
Let’s start with the code:
def wrap_future(future, *, loop=None):
    """Wrap concurrent.futures.Future object."""
    if isfuture(future):
        return future
    assert isinstance(future, concurrent.futures.Future), \
        f'concurrent.futures.Future is expected, got {future!r}'
    if loop is None:
        loop = events._get_event_loop()
    new_future = loop.create_future()
    _chain_future(future, new_future)
    return new_future
Ok, here is the actual isfuture()
code:
def isfuture(obj):
    """Check for a Future.

    This returns True when obj is a Future instance or is advertising
    itself as duck-type compatible by setting _asyncio_future_blocking.
    See comment in Future for more details.
    """
    return (hasattr(obj.__class__, '_asyncio_future_blocking') and
            obj._asyncio_future_blocking is not None)
So, if future is already an asyncio.Future, it is simply returned.
Then there is an assert that future is a concurrent.futures.Future; if not, an AssertionError is raised.
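A quick way to see these first lines in action, sketched with the public asyncio.isfuture() (the same duck-type check shown above): an asyncio.Future passes straight through wrap_future(), while a concurrent.futures.Future fails isfuture() and gets wrapped in a new asyncio.Future bound to the running loop.

```python
import asyncio
import concurrent.futures


async def main():
    loop = asyncio.get_running_loop()
    af = loop.create_future()         # an asyncio.Future
    cf = concurrent.futures.Future()  # a "threading" future

    checks = {
        "af_is_future": asyncio.isfuture(af),             # duck-type check passes
        "cf_is_future": asyncio.isfuture(cf),             # no _asyncio_future_blocking
        "af_passthrough": asyncio.wrap_future(af) is af,  # returned unchanged
    }
    wrapped = asyncio.wrap_future(cf)  # loop=None, so the running loop is used
    checks["wrapped_is_future"] = asyncio.isfuture(wrapped)
    checks["bound_to_running_loop"] = wrapped.get_loop() is loop

    cf.set_result(None)  # complete cf so the wrapper completes too
    await wrapped
    return checks
```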
The loop parameter is None by default, and to supply it you must pass it by name (loop=loop). This is the loop to which we want to tie our newly created asyncio.Future.
If it is None (I believe this is the typical case), events._get_event_loop() is called to get the loop. Here is its source code:
def _get_event_loop(stacklevel=3):
    # This internal method is going away in Python 3.12, left here only for
    # backwards compatibility with 3.10.0 - 3.10.8 and 3.11.0.
    # Similarly, this method's C equivalent in _asyncio is going away as well.
    # See GH-99949 for more details.
    current_loop = _get_running_loop()
    if current_loop is not None:
        return current_loop
    return get_event_loop_policy().get_event_loop()
The stacklevel=3 parameter isn’t used in the body shown here, and the rest of the code is pretty obvious. The function is used to get the current event loop. It first tries to find a running event loop and, if none is found, it retrieves the default event loop as defined by the current event loop policy. This is an internal function; it is deprecated and is kept temporarily only for backward compatibility with Python 3.10.0 to 3.10.8 and 3.11.0, as the comment in the source says.
The reason it is deprecated is the slow migration from get_event_loop(), which under some conditions (see above) can create a new loop implicitly, to get_running_loop(), which requires that an event loop is already running and raises RuntimeError otherwise.
So, loop = events._get_event_loop() will typically return the running event loop or, if none exists, fall back to the policy’s get_event_loop(), which may create one (the default policy does so only in the MainThread).
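The strict behavior of get_running_loop() is easy to check. This sketch deliberately avoids calling get_event_loop() outside a loop, because what that does (warn, create a loop, or raise) depends on the Python version and the thread; outside_loop() and inside_loop() are hypothetical helper names.

```python
import asyncio


def outside_loop():
    # No event loop is running in this thread, so this raises RuntimeError.
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        return "no running loop"
    return "running loop found"


async def inside_loop():
    # Inside a coroutine driven by asyncio.run(), a running loop always exists.
    return asyncio.get_running_loop().is_running()
```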
new_future = loop.create_future() creates an asyncio.Future tied to that event loop.
_chain_future(future, new_future) is the trickiest part. I expected it to be something along the lines of chain_future_results(), but this is how the actual source code looks:
def _chain_future(source, destination):
    """Chain two futures so that when one completes, so does the other.

    The result (or exception) of source will be copied to destination.
    If destination is cancelled, source gets cancelled too.
    Compatible with both asyncio.Future and concurrent.futures.Future.
    """
    if not isfuture(source) and not isinstance(source,
                                               concurrent.futures.Future):
        raise TypeError('A future is required for source argument')
    if not isfuture(destination) and not isinstance(destination,
                                                    concurrent.futures.Future):
        raise TypeError('A future is required for destination argument')
    source_loop = _get_loop(source) if isfuture(source) else None
    dest_loop = _get_loop(destination) if isfuture(destination) else None

    def _set_state(future, other):
        if isfuture(future):
            _copy_future_state(other, future)
        else:
            _set_concurrent_future_state(future, other)

    def _call_check_cancel(destination):
        if destination.cancelled():
            if source_loop is None or source_loop is dest_loop:
                source.cancel()
            else:
                source_loop.call_soon_threadsafe(source.cancel)

    def _call_set_state(source):
        if (destination.cancelled() and
                dest_loop is not None and dest_loop.is_closed()):
            return
        if dest_loop is None or dest_loop is source_loop:
            _set_state(destination, source)
        else:
            if dest_loop.is_closed():
                return
            dest_loop.call_soon_threadsafe(_set_state, destination, source)

    destination.add_done_callback(_call_check_cancel)
    source.add_done_callback(_call_set_state)


def _copy_future_state(source, dest):
    """Internal helper to copy state from another Future.

    The other Future may be a concurrent.futures.Future.
    """
    assert source.done()
    if dest.cancelled():
        return
    assert not dest.done()
    if source.cancelled():
        dest.cancel()
    else:
        exception = source.exception()
        if exception is not None:
            dest.set_exception(_convert_future_exc(exception))
        else:
            result = source.result()
            dest.set_result(result)


def _set_concurrent_future_state(concurrent, source):
    """Copy state from a future to a concurrent.futures.Future."""
    assert source.done()
    if source.cancelled():
        concurrent.cancel()
    if not concurrent.set_running_or_notify_cancel():
        return
    exception = source.exception()
    if exception is not None:
        concurrent.set_exception(_convert_future_exc(exception))
    else:
        result = source.result()
        concurrent.set_result(result)


def _convert_future_exc(exc):
    exc_class = type(exc)
    if exc_class is concurrent.futures.CancelledError:
        return exceptions.CancelledError(*exc.args)
    elif exc_class is concurrent.futures.TimeoutError:
        return exceptions.TimeoutError(*exc.args)
    elif exc_class is concurrent.futures.InvalidStateError:
        return exceptions.InvalidStateError(*exc.args)
    else:
        return exc


def _get_loop(fut):
    # Tries to call Future.get_loop() if it's available.
    # Otherwise fallbacks to using the old '_loop' property.
    try:
        get_loop = fut.get_loop
    except AttributeError:
        pass
    else:
        return get_loop()
    return fut._loop
Here we see the whole mechanism of copying state from the threading.Future to the asyncio.Future, and of handling corner cases, such as the asyncio.Future's event loop being closed or the threading.Future being cancelled.
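To watch the forward propagation from outside, the following sketch completes two concurrent.futures.Future objects from another thread: one with a result, one with an exception. The callbacks that wrap_future() registered run in that other thread and hand the state over to the event loop, so both awaits in main() finish. complete_from_other_thread is a hypothetical name.

```python
import asyncio
import concurrent.futures
import threading


async def main():
    ok_cf = concurrent.futures.Future()
    err_cf = concurrent.futures.Future()
    ok = asyncio.wrap_future(ok_cf)    # bound to the running loop
    err = asyncio.wrap_future(err_cf)

    def complete_from_other_thread():
        # _call_set_state() runs here, in this thread, and hops back to the
        # event loop via call_soon_threadsafe().
        ok_cf.set_result(42)
        err_cf.set_exception(ValueError("boom"))

    threading.Thread(target=complete_from_other_thread).start()

    result = await ok
    try:
        await err
    except ValueError as exc:  # the exception object is passed through as-is
        message = str(exc)
    return result, message
```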
Note: The docstring says the function is “Compatible with both asyncio.Future and concurrent.futures.Future”. This code is more general than asyncio.wrap_future() needs; I will stick with that use case only.
So, _call_check_cancel is added as a done callback to the destination. If the asyncio.Future is cancelled, we want to “propagate” the cancellation signal back to the threading.Future. When the asyncio.Future is done, _call_check_cancel() will be called.
It checks whether destination was cancelled; if so, it cancels the source.
If source_loop exists and is different from dest_loop, it schedules source.cancel() on source_loop (via call_soon_threadsafe()).
If source_loop doesn’t exist, source.cancel() is called directly.
In the code there is also the case where source_loop exists and is the same as dest_loop. In this case it is crucial that the call to _chain_future() is made from that same event loop; otherwise the call to source.cancel() will not be safe.
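The backward (cancellation) direction can be observed in the same way. In this sketch the asyncio side is cancelled; _call_check_cancel() is scheduled as a done callback on the loop, and because source_loop is None for a concurrent.futures.Future, it calls source.cancel() directly. A single await asyncio.sleep(0) is assumed to be enough to let that callback run before main() resumes.

```python
import asyncio
import concurrent.futures


async def main():
    cf = concurrent.futures.Future()
    af = asyncio.wrap_future(cf)
    af.cancel()             # cancel the asyncio side
    await asyncio.sleep(0)  # yield once so done callbacks can run
    # The cancellation has been propagated back to the concurrent future.
    return af.cancelled(), cf.cancelled()
```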
_get_loop() and _convert_future_exc() are pretty straightforward.
_copy_future_state() and _set_concurrent_future_state() look similar, but in our case _copy_future_state() is the relevant one.
So, _call_set_state is added as a done callback to the source. When the threading.Future is done, we want to “propagate” the done signal forward to the asyncio.Future, so _call_set_state() is called.
It checks the following cases:
- If destination is cancelled and there is a dest_loop that is closed, nothing is done.
- If dest_loop is None or is the same as source_loop, _set_state(destination, source) is called directly. In the latter case it is crucial that the call to _chain_future() is made from that same event loop; otherwise the call to _set_state(destination, source) will not be safe.
- Otherwise (dest_loop exists and differs from source_loop): if dest_loop is closed, nothing is done; if it is open, _set_state(destination, source) is scheduled on dest_loop via call_soon_threadsafe().
In our use case source_loop is None (the source is a threading.Future) and dest_loop is the loop of the asyncio.Future, so the call_soon_threadsafe() branch is the one that is taken.
def _set_state(future, other): as you can see above, it is called with future=destination (the asyncio.Future) and other=source (the threading.Future). It is unclear to me why the order of parameters was changed.
Anyway, the first line is isfuture(future) (as described above, it checks whether future is an asyncio.Future). In the case we’re interested in, it will be True, so _copy_future_state(other, future) is called.
_copy_future_state() first asserts that source is done. This should hold, because _copy_future_state() is reached from a done callback (add_done_callback()) of the source.
Then, if destination is cancelled, nothing is done.
If source is done because it was cancelled, destination will be cancelled.
Otherwise, this code will be executed:
exception = source.exception()
if exception is not None:
    dest.set_exception(_convert_future_exc(exception))
else:
    result = source.result()
    dest.set_result(result)
def _convert_future_exc(exc):
    exc_class = type(exc)
    if exc_class is concurrent.futures.CancelledError:
        return exceptions.CancelledError(*exc.args)
    elif exc_class is concurrent.futures.TimeoutError:
        return exceptions.TimeoutError(*exc.args)
    elif exc_class is concurrent.futures.InvalidStateError:
        return exceptions.InvalidStateError(*exc.args)
    else:
        return exc
This is pretty straightforward.
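Two details from this part can be checked directly. When the concurrent side is cancelled, _copy_future_state() cancels the asyncio side, so awaiting it raises asyncio.CancelledError. And since Python 3.8, asyncio.CancelledError and concurrent.futures.CancelledError are distinct classes, which is why _convert_future_exc() has to translate between them. A small sketch:

```python
import asyncio
import concurrent.futures


async def main():
    cf = concurrent.futures.Future()
    af = asyncio.wrap_future(cf)
    # Cancel the *concurrent* side; _copy_future_state() then cancels the
    # asyncio side (scheduled on the loop via call_soon_threadsafe()).
    cf.cancel()
    try:
        await af
    except asyncio.CancelledError:
        pass
    return af.cancelled()


# Distinct classes on Python 3.8+, hence the translation in _convert_future_exc().
distinct = asyncio.CancelledError is not concurrent.futures.CancelledError
```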
Before coming back up to the surface, let’s look at _set_concurrent_future_state(). The difference is in how the two functions check whether the destination is cancelled and how exactly they react to it.
In _copy_future_state() we have:
if dest.cancelled():
    return
If the destination is cancelled, the code simply returns from _copy_future_state().
In _set_concurrent_future_state():
if not concurrent.set_running_or_notify_cancel():
    return
The set_running_or_notify_cancel() method is part of the concurrent.futures.Future class in Python's concurrent.futures module. It is meant to be called by executors or by whatever system manages the future, rather than by users directly. It is designed to handle the transition of a future's state in a thread-safe manner.
Purpose of set_running_or_notify_cancel()
The set_running_or_notify_cancel() method serves two primary purposes:
- Set the Future as Running: If the future has not been cancelled, this method sets its state to running and returns True. From then on the future can no longer be cancelled, and calling the method again raises RuntimeError; this signals that the future’s operation is in progress.
- Notify if Cancelled: If the future was cancelled before this method is called, it returns False. This acknowledges the cancellation (anyone waiting on the future is notified), and no further action is taken on this future. This helps in managing the cancellation of asynchronous tasks effectively.
Usage Context
This method is typically used by the executor (like ThreadPoolExecutor or ProcessPoolExecutor) when a task associated with the future is about to start execution. It checks whether the future has been cancelled between the time it was scheduled and the time it is supposed to run. If it hasn't been cancelled, it marks the future as running.
Example Scenario
Here’s a conceptual example to illustrate how this might be used internally:
from concurrent.futures import Future


def task():
    future = Future()
    # Simulate checking and setting the future's state before running the task
    if not future.set_running_or_notify_cancel():
        print("Future was cancelled, stopping execution.")
        return
    # Proceed with the task
    print("Executing task...")
    # Set result upon completion
    future.set_result("Task completed successfully.")
    # This would typically be part of the executor's logic
Note: As a library developer or a user of the concurrent.futures module, you generally do not need to call set_running_or_notify_cancel() directly. This method is meant to be used by the infrastructure that manages the execution of futures, such as the executors provided by the concurrent.futures module.
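Both outcomes of set_running_or_notify_cancel() can be observed on bare Future objects, playing the executor's role by hand as the note above describes. This is only an illustration; the variable names are arbitrary.

```python
from concurrent.futures import Future

pending = Future()
cancelled = Future()
cancelled.cancel()  # cancelled before any "executor" picked it up

# What an executor does right before running the work item:
started = pending.set_running_or_notify_cancel()    # PENDING -> RUNNING, returns True
skipped = cancelled.set_running_or_notify_cancel()  # CANCELLED -> CANCELLED_AND_NOTIFIED, returns False

# Once RUNNING, the future can no longer be cancelled.
late_cancel = pending.cancel()

pending.set_result("done")  # the "work" finishes
```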
So,
if not concurrent.set_running_or_notify_cancel():
    return
in _set_concurrent_future_state() does two things:
- If the concurrent future (aka destination) has not been cancelled, it sets the state of the future to running and returns True.
- If the concurrent future has been cancelled before this method is called, it returns False, and the code returns from _set_concurrent_future_state().
All other logic is exactly the same.