aea.helpers.async_utils

This module contains miscellaneous utilities for async code.

ensure_list

ensure_list(value: Any) -> List

Return list(value) if value is a sequence, otherwise [value].
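The wrapping rule can be illustrated with a minimal stand-in. The name `ensure_list_sketch` is hypothetical, and the sketch assumes that only lists and tuples count as sequences. How the real function treats strings or other iterables is not stated here.

```python
from typing import Any, List


def ensure_list_sketch(value: Any) -> List:
    """Copy a list/tuple into a new list, or wrap any other value in a list."""
    # Assumption: only list and tuple are treated as sequences in this sketch.
    if isinstance(value, (list, tuple)):
        return list(value)
    return [value]
```

Under that assumption a string is wrapped whole rather than split into characters.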

AsyncState Objects

class AsyncState()

Awaitable state.

__init__

 | __init__(initial_state: Any = None, states_enum: Optional[Container[Any]] = None)

Init async state.

Arguments:

  • initial_state: state to set on start.
  • states_enum: container of valid states; if not provided, the state is not checked on set.

set

 | set(state: Any) -> None

Set state.

add_callback

 | add_callback(callback_fn: Callable[[Any], None]) -> None

Add callback to track state changes.

Arguments:

  • callback_fn: callable object to be called when the state changes.

Returns:

None

get

 | get() -> Any

Get state.

wait

 | async wait(state_or_states: Union[Any, Sequence[Any]]) -> Tuple[Any, Any]

Wait for a state to be set.

Arguments:

  • state_or_states: state or list of states.

Returns:

tuple of previous state and new state.
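To show how set, get, add_callback and wait fit together, here is a simplified stand-in built on asyncio futures. `StateSketch` and `demo` are hypothetical names. The sketch is not thread-safe, skips states_enum validation, and assumes that waiting on a state that is already set returns immediately with None as the previous state.

```python
import asyncio
from typing import Any, Callable, List, Tuple


class StateSketch:
    """Hypothetical, single-loop stand-in for an awaitable state holder."""

    def __init__(self, initial_state: Any = None) -> None:
        self._state = initial_state
        self._callbacks: List[Callable[[Any], None]] = []
        self._waiters: List[Tuple[List[Any], asyncio.Future]] = []

    def get(self) -> Any:
        return self._state

    def add_callback(self, callback_fn: Callable[[Any], None]) -> None:
        self._callbacks.append(callback_fn)

    def set(self, state: Any) -> None:
        previous, self._state = self._state, state
        for callback_fn in self._callbacks:
            callback_fn(state)
        # Wake every waiter whose target states include the new state.
        for targets, future in list(self._waiters):
            if state in targets and not future.done():
                future.set_result((previous, state))

    async def wait(self, state_or_states: Any) -> Tuple[Any, Any]:
        if isinstance(state_or_states, (list, tuple)):
            targets = list(state_or_states)
        else:
            targets = [state_or_states]
        if self._state in targets:
            # Assumption: no transition happened, so previous state is None.
            return (None, self._state)
        future = asyncio.get_running_loop().create_future()
        entry = (targets, future)
        self._waiters.append(entry)
        try:
            return await future
        finally:
            self._waiters.remove(entry)


async def demo() -> Tuple[Tuple[Any, Any], List[Any]]:
    state = StateSketch("starting")
    seen: List[Any] = []
    state.add_callback(seen.append)
    # Change the state shortly after the waiter starts waiting.
    asyncio.get_running_loop().call_later(0.01, state.set, "running")
    result = await state.wait(["running", "stopped"])
    return result, seen
```

Running `asyncio.run(demo())` waits until the state becomes "running" and returns the (previous, new) pair together with the states seen by the callback.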

transit

 | @contextmanager
 | transit(initial: Any = not_set, success: Any = not_set, fail: Any = not_set) -> Generator

Context manager that changes the state depending on whether the block succeeds or fails.

Arguments:

  • initial: state to set on entering the context, not_set by default
  • success: state to set when the context block completes, not_set by default
  • fail: state to set when the context block raises an exception, not_set by default

Returns:

None
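The three-way transition can be sketched with `contextlib.contextmanager`. `HistorySketch`, `NOT_SET` and `run_block` are hypothetical names used only for illustration, and the sketch assumes that an exception raised in the block still propagates after the fail state is set.

```python
from contextlib import contextmanager
from typing import Any, Generator, List

NOT_SET = object()  # hypothetical sentinel standing in for not_set


class HistorySketch:
    """Hypothetical state holder that records every state it is given."""

    def __init__(self) -> None:
        self.history: List[Any] = []

    def set(self, state: Any) -> None:
        self.history.append(state)

    @contextmanager
    def transit(
        self, initial: Any = NOT_SET, success: Any = NOT_SET, fail: Any = NOT_SET
    ) -> Generator:
        if initial is not NOT_SET:
            self.set(initial)
        try:
            yield
        except BaseException:
            if fail is not NOT_SET:
                self.set(fail)
            raise  # assumption: the exception still propagates to the caller
        else:
            if success is not NOT_SET:
                self.set(success)


def run_block(should_fail: bool) -> List[Any]:
    """Run a block inside transit and return the recorded states."""
    state = HistorySketch()
    try:
        with state.transit(initial="starting", success="running", fail="failed"):
            if should_fail:
                raise ValueError("boom")
    except ValueError:
        pass
    return state.history
```

A successful block records the initial state followed by the success state, and a failing block records the initial state followed by the fail state.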

PeriodicCaller Objects

class PeriodicCaller()

Schedule a periodic call of a callable using the event loop.

Used to run a function periodically with asyncio.

__init__

 | __init__(callback: Callable, period: float, start_at: Optional[datetime.datetime] = None, exception_callback: Optional[Callable[[Callable, Exception], None]] = None, loop: Optional[AbstractEventLoop] = None)

Init periodic caller.

Arguments:

  • callback: function to call periodically
  • period: period in seconds.
  • start_at: optional first call datetime
  • exception_callback: optional handler to call when an exception is raised.
  • loop: optional asyncio event loop

start

 | start() -> None

Activate periodic calls.

stop

 | stop() -> None

Remove from schedule.
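One way to picture the start/stop cycle is a callback that reschedules itself with `loop.call_later`. `PeriodicSketch` and `demo` are hypothetical names. The sketch leaves out start_at and exception_callback, and it assumes the first call happens one period after start.

```python
import asyncio
from typing import Callable, List, Optional, Tuple


class PeriodicSketch:
    """Hypothetical periodic caller built on loop.call_later."""

    def __init__(
        self, callback: Callable[[], None], period: float, loop: asyncio.AbstractEventLoop
    ) -> None:
        self._callback = callback
        self._period = period
        self._loop = loop
        self._handle: Optional[asyncio.TimerHandle] = None

    def _tick(self) -> None:
        self._callback()
        # Reschedule the next call one period from now.
        self._handle = self._loop.call_later(self._period, self._tick)

    def start(self) -> None:
        # Assumption: the first call happens one period after start.
        self._handle = self._loop.call_later(self._period, self._tick)

    def stop(self) -> None:
        if self._handle is not None:
            self._handle.cancel()
            self._handle = None


async def demo() -> Tuple[int, int]:
    calls: List[int] = []
    caller = PeriodicSketch(lambda: calls.append(1), 0.01, asyncio.get_running_loop())
    caller.start()
    await asyncio.sleep(0.1)
    caller.stop()
    count_at_stop = len(calls)
    await asyncio.sleep(0.05)  # no further calls should arrive after stop
    return count_at_stop, len(calls)
```

Cancelling the pending timer handle is what removes the call from the schedule, so the call count stops growing once stop returns.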

ensure_loop

ensure_loop(loop: Optional[AbstractEventLoop] = None) -> AbstractEventLoop

Use the provided loop, or create a new one if none is provided or the provided loop is closed.

Return the passed loop if it is provided, not closed and not running; otherwise return a new event loop.

Arguments:

  • loop: optional event loop

Returns:

asyncio event loop
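The selection rule described above can be sketched directly with the standard asyncio loop API. `ensure_loop_sketch` and `demo` are hypothetical names, and this is an illustration of the rule rather than the module's own code.

```python
import asyncio
from typing import Optional, Tuple


def ensure_loop_sketch(
    loop: Optional[asyncio.AbstractEventLoop] = None,
) -> asyncio.AbstractEventLoop:
    """Reuse the loop if it is usable, otherwise create a new one."""
    if loop is not None and not loop.is_closed() and not loop.is_running():
        return loop
    return asyncio.new_event_loop()


def demo() -> Tuple[bool, bool]:
    fresh = asyncio.new_event_loop()
    reused = ensure_loop_sketch(fresh) is fresh  # open and idle: reused
    fresh.close()
    replacement = ensure_loop_sketch(fresh)  # closed: a new loop is created
    replaced = replacement is not fresh and not replacement.is_closed()
    replacement.close()
    return reused, replaced
```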

AnotherThreadTask Objects

class AnotherThreadTask()

Schedule a task to run on the loop in another thread.

Provides better cancel behaviour: on cancel, it waits until the task is cancelled completely.

__init__

 | __init__(coro: Awaitable, loop: AbstractEventLoop) -> None

Init the task.

Arguments:

  • coro: coroutine to schedule
  • loop: an event loop to schedule on.

result

 | result(timeout: Optional[float] = None) -> Any

Wait for coroutine execution result.

Arguments:

  • timeout: optional timeout to wait in seconds.

cancel

 | cancel() -> None

Cancel coroutine task execution in a target loop.

done

 | done() -> bool

Check task is done.
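The following sketch shows one way to get this behaviour with `asyncio.run_coroutine_threadsafe`: a wrapper coroutine awaits the real task, so waiting on the result after cancel blocks until the task has finished unwinding. `ThreadTaskSketch` and `demo` are hypothetical names, and the internals are an assumption, not the class's confirmed implementation.

```python
import asyncio
import threading
from concurrent.futures import CancelledError
from typing import Any, Awaitable, Optional, Tuple


class ThreadTaskSketch:
    """Hypothetical task scheduled on a loop that runs in another thread."""

    def __init__(self, coro: Awaitable, loop: asyncio.AbstractEventLoop) -> None:
        self._loop = loop
        self._coro = coro
        self._task: Optional[asyncio.Task] = None
        self._future = asyncio.run_coroutine_threadsafe(self._run(), loop)

    async def _run(self) -> Any:
        # Runs in the loop thread: create the real task and wait for it.
        self._task = self._loop.create_task(self._coro)
        return await self._task

    def result(self, timeout: Optional[float] = None) -> Any:
        return self._future.result(timeout)

    def cancel(self) -> None:
        if self._task is None:
            self._loop.call_soon_threadsafe(self._future.cancel)
        else:
            self._loop.call_soon_threadsafe(self._task.cancel)

    def done(self) -> bool:
        return self._future.done()


def demo() -> Tuple[Any, bool, bool]:
    loop = asyncio.new_event_loop()
    thread = threading.Thread(target=loop.run_forever, daemon=True)
    thread.start()
    started = threading.Event()

    async def quick_job() -> int:
        return 42

    async def long_job() -> None:
        started.set()
        await asyncio.sleep(60)

    try:
        quick = ThreadTaskSketch(quick_job(), loop)
        value = quick.result(timeout=5)
        slow = ThreadTaskSketch(long_job(), loop)
        started.wait(timeout=5)  # cancel only once the job is running
        slow.cancel()
        try:
            slow.result(timeout=5)
            was_cancelled = False
        except CancelledError:
            was_cancelled = True
    finally:
        loop.call_soon_threadsafe(loop.stop)
        thread.join(timeout=5)
        loop.close()
    return value, quick.done(), was_cancelled
```

Because the result only becomes available after the wrapped task has been cancelled inside the loop, the loop can be stopped afterwards without leaving a pending task behind.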

ThreadedAsyncRunner Objects

class ThreadedAsyncRunner(Thread)

Utility to run a thread with an event loop and execute coroutines inside it.

__init__

 | __init__(loop=None) -> None

Init threaded runner.

Arguments:

  • loop: optional event loop. If it is a running loop, the threaded runner will use it.

start

 | start() -> None

Start event loop in dedicated thread.

run

 | run() -> None

Run code inside thread.

call

 | call(coro: Awaitable) -> Any

Run a coroutine inside the event loop.

Arguments:

  • coro: a coroutine to run.

stop

 | stop() -> None

Stop event loop in thread.
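A minimal sketch of the same idea, a Thread subclass that owns an event loop, is shown below. `RunnerSketch`, `add` and `demo` are hypothetical names. The sketch assumes that call returns a `concurrent.futures.Future`, which may differ from what the real method returns, and it always creates its own loop.

```python
import asyncio
import threading
from concurrent.futures import Future
from typing import Awaitable


class RunnerSketch(threading.Thread):
    """Hypothetical thread that runs an event loop until stopped."""

    def __init__(self) -> None:
        super().__init__(daemon=True)
        self._loop = asyncio.new_event_loop()

    def run(self) -> None:
        # Executed in the new thread once start() is called.
        self._loop.run_forever()

    def call(self, coro: Awaitable) -> Future:
        # Thread-safe hand-off of the coroutine to the loop thread.
        return asyncio.run_coroutine_threadsafe(coro, self._loop)

    def stop(self) -> None:
        self._loop.call_soon_threadsafe(self._loop.stop)
        self.join(timeout=5)
        self._loop.close()


async def add(a: int, b: int) -> int:
    await asyncio.sleep(0)
    return a + b


def demo() -> int:
    runner = RunnerSketch()
    runner.start()
    try:
        return runner.call(add(2, 3)).result(timeout=5)
    finally:
        runner.stop()
```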

AwaitableProc Objects

class AwaitableProc()

Async-friendly subprocess.Popen.

__init__

 | __init__(*args, **kwargs)

Initialise awaitable proc.

start

 | async start()

Start the subprocess.
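One way to make `subprocess.Popen` awaitable is to run its blocking wait in an executor. `run_popen` and `demo` are hypothetical names. The sketch assumes the awaited value is the process exit code, which this page does not confirm for the real start method.

```python
import asyncio
import subprocess
import sys


async def run_popen(*args, **kwargs) -> int:
    """Start a process with Popen and await its exit code."""
    proc = subprocess.Popen(*args, **kwargs)
    loop = asyncio.get_running_loop()
    # Popen.wait blocks, so run it in the default thread pool executor.
    return await loop.run_in_executor(None, proc.wait)


async def demo() -> int:
    # Run a short Python child process that exits with status 3.
    return await run_popen([sys.executable, "-c", "raise SystemExit(3)"])
```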

ItemGetter Objects

class ItemGetter()

Virtual queue-like object that gets items from getter functions.

__init__

 | __init__(getters: List[Callable]) -> None

Init ItemGetter.

Arguments:

  • getters: List of coroutines to be awaited.

get

 | async get() -> Any

Get item.
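The "first item from any getter" idea can be sketched with `asyncio.wait` and `FIRST_COMPLETED`. `get_first` and `demo` are hypothetical names. The sketch assumes each getter is a zero-argument coroutine function, such as the `get` method of an `asyncio.Queue`, and that getters still pending are cancelled.

```python
import asyncio
from typing import Any, Awaitable, Callable, List


async def get_first(getters: List[Callable[[], Awaitable[Any]]]) -> Any:
    """Await all getters and return the first item that arrives."""
    tasks = [asyncio.ensure_future(getter()) for getter in getters]
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    for task in pending:
        task.cancel()
    # Let the cancelled getters finish unwinding before returning.
    await asyncio.gather(*pending, return_exceptions=True)
    return next(iter(done)).result()


async def demo() -> str:
    empty: asyncio.Queue = asyncio.Queue()
    ready: asyncio.Queue = asyncio.Queue()
    ready.put_nowait("message")
    return await get_first([empty.get, ready.get])
```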

HandlerItemGetter Objects

class HandlerItemGetter(ItemGetter)

ItemGetter with handler passed.

__init__

 | __init__(getters: List[Tuple[Callable[[Any], None], Callable]])

Init HandlerItemGetter.

Arguments:

  • getters: List of tuples of a handler and a coroutine to be awaited for an item.
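Pairing each getter with a handler can be sketched by tagging every awaited item with its handler. `get_with_handler` and `demo` are hypothetical names, and returning a (handler, item) tuple is an assumption of this sketch rather than documented behaviour.

```python
import asyncio
from typing import Any, Awaitable, Callable, List, Tuple

Handler = Callable[[Any], None]
Getter = Callable[[], Awaitable[Any]]


async def get_with_handler(pairs: List[Tuple[Handler, Getter]]) -> Tuple[Handler, Any]:
    """Return the first available item together with its handler."""

    async def tagged(handler: Handler, getter: Getter) -> Tuple[Handler, Any]:
        return handler, await getter()

    tasks = [asyncio.ensure_future(tagged(h, g)) for h, g in pairs]
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    for task in pending:
        task.cancel()
    await asyncio.gather(*pending, return_exceptions=True)
    return next(iter(done)).result()


async def demo() -> List[str]:
    handled: List[str] = []
    inbox: asyncio.Queue = asyncio.Queue()
    outbox: asyncio.Queue = asyncio.Queue()
    outbox.put_nowait("ping")
    handler, item = await get_with_handler(
        [
            (lambda m: handled.append("inbox:" + m), inbox.get),
            (lambda m: handled.append("outbox:" + m), outbox.get),
        ]
    )
    handler(item)  # dispatch the item to the handler it was paired with
    return handled
```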