Skip to content

aea.helpers.async_utils

This module contains the misc utils for async code.

ensure_list

ensure_list(value: Any) -> List

Return [value] or list(value) if value is a sequence.

AsyncState Objects

class AsyncState()

Awaitable state.

__init__

 | __init__(initial_state: Any = None, states_enum: Optional[Container[Any]] = None)

Init async state.

Arguments:

  • initial_state: state to set on start.
  • states_enum: container of valid states if not provided state not checked on set.

set

 | set(state: Any) -> None

Set state.

add_callback

 | add_callback(callback_fn: Callable[[Any], None]) -> None

Add callback to track state changes.

Arguments:

  • callback_fn: callable object to be called on state changed.

Returns:

None

get

 | get() -> Any

Get state.

wait

 | async wait(state_or_states: Union[Any, Sequence[Any]]) -> Tuple[Any, Any]

Wait state to be set.

:params state_or_states: state or list of states.

Returns:

tuple of previous state and new state.

transit

 | @contextmanager
 | transit(initial: Any = not_set, success: Any = not_set, fail: Any = not_set) -> Generator

Change state context according to success or not.

Arguments:

  • initial: set state on context enter, not_set by default
  • success: set state on context block done, not_set by default
  • fail: set state on context block raises exception, not_set by default

Returns:

None

PeriodicCaller Objects

class PeriodicCaller()

Schedule a periodic call of callable using event loop.

Used for periodic function run using asyncio.

__init__

 | __init__(callback: Callable, period: float, start_at: Optional[datetime.datetime] = None, exception_callback: Optional[Callable[[Callable, Exception], None]] = None, loop: Optional[AbstractEventLoop] = None)

Init periodic caller.

Arguments:

  • callback: function to call periodically
  • period: period in seconds.
  • start_at: optional first call datetime
  • exception_callback: optional handler to call on exception raised.
  • loop: optional asyncio event loop

start

 | start() -> None

Activate period calls.

stop

 | stop() -> None

Remove from schedule.

AnotherThreadTask Objects

class AnotherThreadTask()

Schedule a task to run on the loop in another thread.

Provides better cancel behaviour: on cancel it will wait till cancelled completely.

__init__

 | __init__(coro: Awaitable, loop: AbstractEventLoop) -> None

Init the task.

Arguments:

  • coro: coroutine to schedule
  • loop: an event loop to schedule on.

result

 | result(timeout: Optional[float] = None) -> Any

Wait for coroutine execution result.

Arguments:

  • timeout: optional timeout to wait in seconds.

cancel

 | cancel() -> None

Cancel coroutine task execution in a target loop.

done

 | done() -> bool

Check task is done.

ThreadedAsyncRunner Objects

class ThreadedAsyncRunner(Thread)

Util to run thread with event loop and execute coroutines inside.

__init__

 | __init__(loop=None) -> None

Init threaded runner.

Arguments:

  • loop: optional event loop. is it's running loop, threaded runner will use it.

start

 | start() -> None

Start event loop in dedicated thread.

run

 | run() -> None

Run code inside thread.

call

 | call(coro: Awaitable) -> Any

Run a coroutine inside the event loop.

Arguments:

  • coro: a coroutine to run.

stop

 | stop() -> None

Stop event loop in thread.

AwaitableProc Objects

class AwaitableProc()

Async-friendly subprocess.Popen.

__init__

 | __init__(*args, **kwargs)

Initialise awaitable proc.

start

 | async start()

Start the subprocess.

ItemGetter Objects

class ItemGetter()

Virtual queue like object to get items from getters function.

__init__

 | __init__(getters: List[Callable]) -> None

Init ItemGetter.

Arguments:

  • getters: List of couroutines to be awaited.

get

 | async get() -> Any

Get item.

HandlerItemGetter Objects

class HandlerItemGetter(ItemGetter)

ItemGetter with handler passed.

__init__

 | __init__(getters: List[Tuple[Callable[[Any], None], Callable]])

Init HandlerItemGetter.

Arguments:

  • getters: List of tuples of handler and couroutine to be awaiteed for an item.

Runnable Objects

class Runnable(ABC)

Abstract Runnable class.

Use to run async task in same event loop or in dedicated thread. Provides: start, stop sync methods to start and stop task Use wait_completed to await task was completed.

__init__

 | __init__(loop: asyncio.AbstractEventLoop = None, threaded: bool = False) -> None

Init runnable.

Arguments:

  • loop: asyncio event loop to use.
  • threaded: bool. start in thread if True.

Returns:

None

start

 | start() -> bool

Start runnable.

Returns:

bool started or not.

is_running

 | @property
 | is_running() -> bool

Get running state.

run

 | @abstractmethod
 | async run() -> Any

Implement run logic respectfull to CancelError on termination.

wait_completed

 | wait_completed(sync: bool = False, timeout: float = None, force_result: bool = False) -> Awaitable

Wait runnable execution completed.

Arguments:

  • sync: bool. blocking wait
  • timeout: float seconds
  • force_result: check result even it was waited.

Returns:

awaitable if sync is False, otherise None

stop

 | stop(force: bool = False) -> None

Stop runnable.

start_and_wait_completed

 | start_and_wait_completed(*args, **kwargs) -> Awaitable

Alias for start and wait methods.