aea.helpers.async_utils

This module contains the misc utils for async code.

ensure_list

ensure_list(value: Any) -> List

Return [value] or list(value) if value is a sequence.

AsyncState Objects

class AsyncState()

Awaitable state.

__init__

 | __init__(initial_state: Any = None)

Init async state.

Arguments:

  • initial_state: state to set on start.

state

 | @state.setter
 | state(state: Any) -> None

Set state.

set

 | set(state: Any) -> None

Set state.

get

 | get() -> Any

Get state.

wait

 | async wait(state_or_states: Union[Any, Sequence[Any]]) -> Tuple[Any, Any]

Wait state to be set.

:params state_or_states: state or list of states.

Returns:

tuple of previous state and new state.

PeriodicCaller Objects

class PeriodicCaller()

Schedule a periodic call of callable using event loop.

Used for periodic function run using asyncio.

__init__

 | __init__(callback: Callable, period: float, start_at: Optional[datetime.datetime] = None, exception_callback: Optional[Callable[[Callable, Exception], None]] = None, loop: Optional[AbstractEventLoop] = None)

Init periodic caller.

Arguments:

  • callback: function to call periodically
  • period: period in seconds.
  • start_at: optional first call datetime
  • exception_callback: optional handler to call on exception raised.
  • loop: optional asyncio event loop

start

 | start() -> None

Activate period calls.

stop

 | stop() -> None

Remove from schedule.

ensure_loop

ensure_loop(loop: AbstractEventLoop = None) -> AbstractEventLoop

Use loop provided or create new if not provided or closed.

Return loop passed if its provided,not closed and not running, otherwise returns new event loop.

Arguments:

  • loop: optional event loop

Returns:

asyncio event loop

AnotherThreadTask Objects

class AnotherThreadTask()

Schedule a task to run on the loop in another thread.

Provides better cancel behaviour: on cancel it will wait till cancelled completely.

__init__

 | __init__(coro: Awaitable, loop: AbstractEventLoop) -> None

Init the task.

Arguments:

  • coro: coroutine to schedule
  • loop: an event loop to schedule on.

result

 | result(timeout: Optional[float] = None) -> Any

Wait for coroutine execution result.

Arguments:

  • timeout: optional timeout to wait in seconds.

cancel

 | cancel() -> None

Cancel coroutine task execution in a target loop.

future_cancel

 | future_cancel() -> None

Cancel task waiting future.

In this case future result will raise CanclledError not waiting for real task exit.

done

 | done() -> bool

Check task is done.

ThreadedAsyncRunner Objects

class ThreadedAsyncRunner(Thread)

Util to run thread with event loop and execute coroutines inside.

__init__

 | __init__(loop=None) -> None

Init threaded runner.

Arguments:

  • loop: optional event loop. is it's running loop, threaded runner will use it.

start

 | start() -> None

Start event loop in dedicated thread.

run

 | run() -> None

Run code inside thread.

call

 | call(coro: Awaitable) -> Any

Run a coroutine inside the event loop.

Arguments:

  • coro: a coroutine to run.

stop

 | stop() -> None

Stop event loop in thread.

cancel_and_wait

async cancel_and_wait(task: Optional[Task]) -> Any

Wait cancelled task and skip CancelledError.