aea.helpers.async_utils

This module contains miscellaneous utilities for asynchronous code.

ensure_list

ensure_list(value: Any) -> List

Return list(value) if value is a sequence; otherwise return [value].
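The behaviour described above can be sketched as follows. This is a minimal stand-in, not the library's implementation: the name `ensure_list_sketch` is hypothetical, and treating `str` as a sequence (so it is split into characters) is a consequence of this sketch's `Sequence` check that the real helper may or may not share.

```python
from collections.abc import Sequence
from typing import Any, List


def ensure_list_sketch(value: Any) -> List:
    """Hypothetical stand-in: wrap a scalar, or copy a sequence into a list."""
    if isinstance(value, list):
        return value  # already a list, returned unchanged
    if isinstance(value, Sequence):
        return list(value)  # tuples and ranges, but also str, are sequences
    return [value]  # anything else is wrapped in a one-element list
```

Callers that accept "one item or many" can normalise the argument once at the top of the function and then always iterate over a list.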

AsyncState Objects

class AsyncState()

Awaitable state.

__init__

 | __init__(initial_state: Any = None, states_enum: Optional[Container[Any]] = None) -> None

Init async state.

Arguments:

  • initial_state: state to set on start.
  • states_enum: container of valid states. If not provided, the state is not checked on set.

set

 | set(state: Any) -> None

Set state.

add_callback

 | add_callback(callback_fn: Callable[[Any], None]) -> None

Add callback to track state changes.

Arguments:

  • callback_fn: callable object to be called when the state changes.

Returns:

None

get

 | get() -> Any

Get state.

wait

 | async wait(state_or_states: Union[Any, Sequence[Any]]) -> Tuple[Any, Any]

Wait for the state to be set.

Arguments:

  • state_or_states: state or sequence of states to wait for.

Returns:

tuple of previous state and new state.
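To illustrate the idea of an awaitable state, here is a minimal sketch built only on asyncio futures. `MiniAsyncState` and `demo_wait` are hypothetical names, the class is not the library's `AsyncState`, and returning `(None, state)` when the state already matches is an assumption made for this sketch.

```python
import asyncio
from typing import Any, List, Tuple


class MiniAsyncState:
    """Hypothetical, simplified stand-in for an awaitable state holder."""

    def __init__(self, initial_state: Any = None) -> None:
        self._state = initial_state
        # Each waiter is a (future, wanted_states) pair.
        self._waiters: List[Tuple[asyncio.Future, list]] = []

    def get(self) -> Any:
        return self._state

    def set(self, state: Any) -> None:
        # Must be called from the event loop thread in this sketch.
        previous, self._state = self._state, state
        for future, wanted in list(self._waiters):
            if state in wanted and not future.done():
                future.set_result((previous, state))

    async def wait(self, state_or_states: Any) -> Tuple[Any, Any]:
        if isinstance(state_or_states, (list, tuple, set)):
            wanted = list(state_or_states)
        else:
            wanted = [state_or_states]
        if self._state in wanted:
            # Assumption: nothing changed, so no previous state is reported.
            return (None, self._state)
        future = asyncio.get_running_loop().create_future()
        entry = (future, wanted)
        self._waiters.append(entry)
        try:
            return await future
        finally:
            self._waiters.remove(entry)


async def demo_wait() -> Tuple[Any, Any]:
    state = MiniAsyncState("starting")
    # Change the state shortly after we begin waiting.
    asyncio.get_running_loop().call_later(0.01, state.set, "running")
    return await state.wait(["running", "stopped"])
```

Running `asyncio.run(demo_wait())` yields the pair of previous and new state once one of the awaited states is set.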

transit

 | @contextmanager
 | transit(initial: Any = not_set, success: Any = not_set, fail: Any = not_set) -> Generator

Context manager that changes the state on entry and again on exit, depending on whether the block succeeds or raises.

Arguments:

  • initial: state to set on entering the context; not_set by default
  • success: state to set when the context block completes; not_set by default
  • fail: state to set when the context block raises an exception; not_set by default

Returns:

None
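The enter, success, and failure transitions can be sketched with `contextlib.contextmanager`. Everything named here (`NOT_SET`, `StateBox`, `transit_sketch`, the state strings) is hypothetical and only mirrors the argument semantics listed above; re-raising the exception after setting the fail state is an assumption of this sketch.

```python
from contextlib import contextmanager
from typing import Any, Generator

NOT_SET = object()  # sentinel, so that None remains a usable state


class StateBox:
    """Hypothetical minimal state holder used only for this sketch."""

    def __init__(self, state: Any = None) -> None:
        self.state = state


@contextmanager
def transit_sketch(
    box: StateBox, initial: Any = NOT_SET, success: Any = NOT_SET, fail: Any = NOT_SET
) -> Generator[None, None, None]:
    if initial is not NOT_SET:
        box.state = initial  # set on context enter
    try:
        yield
    except BaseException:
        if fail is not NOT_SET:
            box.state = fail  # the block raised
        raise  # assumption: the exception still propagates
    else:
        if success is not NOT_SET:
            box.state = success  # the block finished normally


def run_ok() -> Any:
    box = StateBox("idle")
    with transit_sketch(box, initial="connecting", success="connected", fail="failed"):
        pass  # work that succeeds
    return box.state


def run_failing() -> Any:
    box = StateBox("idle")
    try:
        with transit_sketch(box, initial="connecting", success="connected", fail="failed"):
            raise ConnectionError("simulated failure")
    except ConnectionError:
        pass
    return box.state
```

A dedicated sentinel rather than `None` as the default lets a caller skip any of the three transitions while still being able to set the state to `None` explicitly.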

PeriodicCaller Objects

class PeriodicCaller()

Schedule a periodic call of a callable using the event loop.

Used to run a function periodically with asyncio.

__init__

 | __init__(callback: Callable, period: float, start_at: Optional[datetime.datetime] = None, exception_callback: Optional[Callable[[Callable, Exception], None]] = None, loop: Optional[AbstractEventLoop] = None) -> None

Init periodic caller.

Arguments:

  • callback: function to call periodically
  • period: period in seconds.
  • start_at: optional first call datetime
  • exception_callback: optional handler to call when the callback raises an exception.
  • loop: optional asyncio event loop

start

 | start() -> None

Activate periodic calls.

stop

 | stop() -> None

Remove the periodic call from the schedule.
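One way to get periodic calls on an event loop is to re-arm a `loop.call_later` timer after every call. The sketch below shows that technique only; `MiniPeriodicCaller` and `demo_periodic` are hypothetical names, `start_at` is omitted, and the class must be created while a loop is running. It is not the library's `PeriodicCaller`.

```python
import asyncio
from typing import Callable, Optional, Tuple


class MiniPeriodicCaller:
    """Hypothetical sketch: re-arm a timer on the loop after every call."""

    def __init__(
        self,
        callback: Callable[[], None],
        period: float,
        exception_callback: Optional[Callable[[Callable, Exception], None]] = None,
        loop: Optional[asyncio.AbstractEventLoop] = None,
    ) -> None:
        self._callback = callback
        self._period = period
        self._exception_callback = exception_callback
        # Assumption: constructed inside a running loop when none is passed.
        self._loop = loop or asyncio.get_running_loop()
        self._handle: Optional[asyncio.TimerHandle] = None

    def _schedule(self) -> None:
        self._handle = self._loop.call_later(self._period, self._tick)

    def _tick(self) -> None:
        self._schedule()  # re-arm first, so one failure does not end the series
        try:
            self._callback()
        except Exception as exc:
            if self._exception_callback is None:
                raise
            self._exception_callback(self._callback, exc)

    def start(self) -> None:
        self._schedule()

    def stop(self) -> None:
        if self._handle is not None:
            self._handle.cancel()
            self._handle = None


async def demo_periodic() -> Tuple[int, int]:
    calls = []
    caller = MiniPeriodicCaller(lambda: calls.append(1), period=0.01)
    caller.start()
    await asyncio.sleep(0.1)
    caller.stop()
    count_at_stop = len(calls)
    await asyncio.sleep(0.03)  # no further calls are expected after stop()
    return count_at_stop, len(calls)
```

Because the callback runs on the loop thread, it should be short and non-blocking; a slow callback delays every other task on that loop.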

AnotherThreadTask Objects

class AnotherThreadTask()

Schedule a task to run on the loop in another thread.

Provides better cancel behaviour: on cancel, it waits until the task is completely cancelled.

__init__

 | __init__(coro: Awaitable, loop: AbstractEventLoop) -> None

Init the task.

Arguments:

  • coro: coroutine to schedule
  • loop: an event loop to schedule on.

result

 | result(timeout: Optional[float] = None) -> Any

Wait for coroutine execution result.

Arguments:

  • timeout: optional timeout to wait in seconds.

cancel

 | cancel() -> None

Cancel coroutine task execution in a target loop.

done

 | done() -> bool

Check whether the task is done.
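The standard library offers a comparable building block for running a coroutine on a loop that lives in another thread: `asyncio.run_coroutine_threadsafe`, which returns a `concurrent.futures.Future` with `result`, `cancel`, and `done` methods. The sketch below uses that stdlib call, not `AnotherThreadTask` itself; `add_later` and `run_on_background_loop` are hypothetical names, and the stricter cancel behaviour described above is not reproduced.

```python
import asyncio
import threading
from typing import Tuple


async def add_later(a: int, b: int) -> int:
    """Hypothetical coroutine standing in for real asynchronous work."""
    await asyncio.sleep(0.01)
    return a + b


def run_on_background_loop() -> Tuple[int, bool]:
    # Run an event loop forever in a separate daemon thread.
    loop = asyncio.new_event_loop()
    thread = threading.Thread(target=loop.run_forever, daemon=True)
    thread.start()
    try:
        # Schedule the coroutine on that loop from the current thread.
        future = asyncio.run_coroutine_threadsafe(add_later(2, 3), loop)
        result = future.result(timeout=5)  # block until the coroutine finishes
        return result, future.done()
    finally:
        # The loop may only be stopped from its own thread.
        loop.call_soon_threadsafe(loop.stop)
        thread.join()
        loop.close()
```

Passing a timeout to `result` keeps the calling thread from blocking forever if the background loop stalls.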

ThreadedAsyncRunner Objects

class ThreadedAsyncRunner(Thread)

Utility to run a thread with an event loop and execute coroutines inside it.

__init__

 | __init__(loop: Optional[AbstractEventLoop] = None) -> None

Init threaded runner.

Arguments:

  • loop: optional event loop. If it is a running loop, the threaded runner will use it.

start

 | start() -> None

Start event loop in dedicated thread.

run

 | run() -> None

Run code inside thread.

call

 | call(coro: Awaitable) -> Any

Run a coroutine inside the event loop.

Arguments:

  • coro: a coroutine to run.

stop

 | stop() -> None

Stop event loop in thread.
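The start, call, and stop life cycle can be sketched as a `Thread` subclass that owns its event loop. `MiniThreadedRunner`, `double`, and `demo_runner` are hypothetical names; in this sketch `call` returns a `concurrent.futures.Future`, which is an assumption and not necessarily what the library's `call` returns, and reusing an already running loop is not covered.

```python
import asyncio
import threading
from concurrent.futures import Future
from typing import Awaitable


class MiniThreadedRunner(threading.Thread):
    """Hypothetical sketch of a thread that owns and runs an event loop."""

    def __init__(self) -> None:
        super().__init__(daemon=True)
        self._loop = asyncio.new_event_loop()

    def run(self) -> None:
        # Executed in the new thread after start(): run the loop until stopped.
        asyncio.set_event_loop(self._loop)
        self._loop.run_forever()

    def call(self, coro: Awaitable) -> Future:
        # Thread-safe submission of a coroutine to the runner's loop.
        return asyncio.run_coroutine_threadsafe(coro, self._loop)

    def stop(self) -> None:
        # Ask the loop to stop from its own thread, then wait for the thread.
        self._loop.call_soon_threadsafe(self._loop.stop)
        self.join()
        self._loop.close()


async def double(x: int) -> int:
    await asyncio.sleep(0.01)
    return 2 * x


def demo_runner() -> int:
    runner = MiniThreadedRunner()
    runner.start()
    try:
        return runner.call(double(21)).result(timeout=5)
    finally:
        runner.stop()
```

Keeping the loop in its own thread lets synchronous code submit coroutines and wait for their results without blocking that loop.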

Runnable Objects

class Runnable(ABC)

Abstract Runnable class.

Use it to run an async task in the same event loop or in a dedicated thread. Provides the synchronous methods start and stop to start and stop the task. Use wait_completed to wait until the task has completed.

__init__

 | __init__(loop: asyncio.AbstractEventLoop = None, threaded: bool = False) -> None

Init runnable.

Arguments:

  • loop: asyncio event loop to use.
  • threaded: bool. Start in a dedicated thread if True.

Returns:

None

start

 | start() -> bool

Start runnable.

Returns:

bool: whether the runnable was started.

is_running

 | @property
 | is_running() -> bool

Get running state.

run

 | @abstractmethod
 | async run() -> Any

Implement the run logic. The implementation must handle asyncio.CancelledError correctly on termination.

wait_completed

 | wait_completed(sync: bool = False, timeout: float = None, force_result: bool = False) -> Awaitable

Wait until the runnable has finished executing.

Arguments:

  • sync: bool. If True, block until completion.
  • timeout: float. Timeout in seconds.
  • force_result: check the result even if it was already waited for.

Returns:

awaitable if sync is False, otherwise None

stop

 | stop(force: bool = False) -> None

Stop runnable.

start_and_wait_completed

 | start_and_wait_completed(*args: Any, **kwargs: Any) -> Awaitable

Shortcut for calling start followed by wait_completed.
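The overall pattern (an abstract async `run`, synchronous `start` and `stop`, and a way to wait for completion) can be sketched for the same-loop case as follows. `MiniRunnable`, `Ticker`, and `demo_runnable` are hypothetical names; threaded mode, `sync`, `timeout`, and `force_result` are left out, so this is an illustration of the idea and not the library's `Runnable`.

```python
import asyncio
from abc import ABC, abstractmethod
from typing import Any, Optional, Tuple


class MiniRunnable(ABC):
    """Hypothetical same-loop sketch of a start/stop/wait life cycle."""

    def __init__(self) -> None:
        self._task: Optional[asyncio.Task] = None

    @abstractmethod
    async def run(self) -> Any:
        """Subclass logic; should let asyncio.CancelledError propagate."""

    @property
    def is_running(self) -> bool:
        return self._task is not None and not self._task.done()

    def start(self) -> bool:
        if self.is_running:
            return False  # already started, nothing to do
        self._task = asyncio.get_running_loop().create_task(self.run())
        return True

    def stop(self) -> None:
        if self._task is not None:
            self._task.cancel()  # raises CancelledError inside run()

    async def wait_completed(self) -> None:
        if self._task is not None:
            # asyncio.wait does not re-raise the task's cancellation.
            await asyncio.wait([self._task])


class Ticker(MiniRunnable):
    def __init__(self) -> None:
        super().__init__()
        self.ticks = 0
        self.cleaned_up = False

    async def run(self) -> None:
        try:
            while True:
                self.ticks += 1
                await asyncio.sleep(0.01)
        except asyncio.CancelledError:
            self.cleaned_up = True  # release resources here
            raise  # re-raise so the task is marked as cancelled


async def demo_runnable() -> Tuple[bool, bool, bool, bool]:
    ticker = Ticker()
    first = ticker.start()
    second = ticker.start()  # a second start is refused while running
    await asyncio.sleep(0.03)
    ticker.stop()
    await ticker.wait_completed()
    return first, second, ticker.cleaned_up, ticker.is_running
```

Re-raising `asyncio.CancelledError` after cleanup is what lets the surrounding task finish as cancelled instead of appearing to complete normally.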