Spaces:
Paused
Paused
| import asyncio | |
| import functools | |
| from typing import Awaitable, Callable, Optional | |
| import anyio | |
| import anyio.to_thread | |
| from typing_extensions import ParamSpec, TypeVar | |
| T_ParamSpec = ParamSpec("T_ParamSpec") | |
| T_Retval = TypeVar("T_Retval") | |
| def function_has_argument(function: Callable, arg_name: str) -> bool: | |
| """Helper function to check if a function has a specific argument.""" | |
| import inspect | |
| signature = inspect.signature(function) | |
| return arg_name in signature.parameters | |
| def asyncify( | |
| function: Callable[T_ParamSpec, T_Retval], | |
| *, | |
| cancellable: bool = False, | |
| limiter: Optional[anyio.CapacityLimiter] = None, | |
| ) -> Callable[T_ParamSpec, Awaitable[T_Retval]]: | |
| """ | |
| Take a blocking function and create an async one that receives the same | |
| positional and keyword arguments, and that when called, calls the original function | |
| in a worker thread using `anyio.to_thread.run_sync()`. | |
| If the `cancellable` option is enabled and the task waiting for its completion is | |
| cancelled, the thread will still run its course but its return value (or any raised | |
| exception) will be ignored. | |
| ## Arguments | |
| - `function`: a blocking regular callable (e.g. a function) | |
| - `cancellable`: `True` to allow cancellation of the operation | |
| - `limiter`: capacity limiter to use to limit the total amount of threads running | |
| (if omitted, the default limiter is used) | |
| ## Return | |
| An async function that takes the same positional and keyword arguments as the | |
| original one, that when called runs the same original function in a thread worker | |
| and returns the result. | |
| """ | |
| async def wrapper( | |
| *args: T_ParamSpec.args, **kwargs: T_ParamSpec.kwargs | |
| ) -> T_Retval: | |
| partial_f = functools.partial(function, *args, **kwargs) | |
| # In `v4.1.0` anyio added the `abandon_on_cancel` argument and deprecated the old | |
| # `cancellable` argument, so we need to use the new `abandon_on_cancel` to avoid | |
| # surfacing deprecation warnings. | |
| if function_has_argument(anyio.to_thread.run_sync, "abandon_on_cancel"): | |
| return await anyio.to_thread.run_sync( | |
| partial_f, | |
| abandon_on_cancel=cancellable, | |
| limiter=limiter, | |
| ) | |
| return await anyio.to_thread.run_sync( | |
| partial_f, | |
| cancellable=cancellable, | |
| limiter=limiter, | |
| ) | |
| return wrapper | |
| def run_async_function(async_function, *args, **kwargs): | |
| """ | |
| Helper utility to run an async function in a sync context. | |
| Handles the case where there is an existing event loop running. | |
| Args: | |
| async_function (Callable): The async function to run | |
| *args: Positional arguments to pass to the async function | |
| **kwargs: Keyword arguments to pass to the async function | |
| Returns: | |
| The result of the async function execution | |
| Example: | |
| ```python | |
| async def my_async_func(x, y): | |
| return x + y | |
| result = run_async_function(my_async_func, 1, 2) | |
| ``` | |
| """ | |
| from concurrent.futures import ThreadPoolExecutor | |
| def run_in_new_loop(): | |
| """Run the coroutine in a new event loop within this thread.""" | |
| new_loop = asyncio.new_event_loop() | |
| try: | |
| asyncio.set_event_loop(new_loop) | |
| return new_loop.run_until_complete(async_function(*args, **kwargs)) | |
| finally: | |
| new_loop.close() | |
| asyncio.set_event_loop(None) | |
| try: | |
| # First, try to get the current event loop | |
| _ = asyncio.get_running_loop() | |
| # If we're already in an event loop, run in a separate thread | |
| # to avoid nested event loop issues | |
| with ThreadPoolExecutor(max_workers=1) as executor: | |
| future = executor.submit(run_in_new_loop) | |
| return future.result() | |
| except RuntimeError: | |
| # No running event loop, we can safely run in this thread | |
| return run_in_new_loop() | |