Skip to content

aea.helpers.pipe

Portable pipe implementation for Linux, MacOS, and Windows.

IPCChannelClient Objects

class IPCChannelClient(ABC)

Multi-platform interprocess communication channel for the client side.

connect

 | @abstractmethod
 | async connect(timeout=PIPE_CONN_TIMEOUT) -> bool

Connect to communication channel

Arguments:

  • timeout: timeout for other end to connect

write

 | @abstractmethod
 | async write(data: bytes) -> None

Write data bytes to the other end of the channel

Will first write the size than the actual data

Arguments:

  • data: bytes to write

read

 | @abstractmethod
 | async read() -> Optional[bytes]

Read bytes from the other end of the channel

Will first read the size than the actual data

Returns:

read bytes

close

 | @abstractmethod
 | async close() -> None

Close the communication channel.

Returns:

None

IPCChannel Objects

class IPCChannel(IPCChannelClient)

Multi-platform interprocess communication channel.

in_path

 | @property
 | @abstractmethod
 | in_path() -> str

Rendezvous point for incoming communication.

Returns:

path

out_path

 | @property
 | @abstractmethod
 | out_path() -> str

Rendezvous point for outgoing communication.

Returns:

path

PosixNamedPipeProtocol Objects

class PosixNamedPipeProtocol()

Posix named pipes async wrapper communication protocol.

__init__

 | __init__(in_path: str, out_path: str, logger: logging.Logger = _default_logger, loop: Optional[AbstractEventLoop] = None)

Initialize a new posix named pipe.

Arguments:

  • in_path: rendezvous point for incoming data
  • out_path: rendezvous point for outgoing daa

connect

 | async connect(timeout: float = PIPE_CONN_TIMEOUT) -> bool

Connect to the other end of the pipe

Arguments:

  • timeout: timeout before failing

Returns:

connection success

write

 | async write(data: bytes) -> None

Write to pipe.

Arguments:

  • data: bytes to write to pipe

read

 | async read() -> Optional[bytes]

Read from pipe.

Returns:

read bytes

close

 | async close() -> None

Disconnect pipe.

TCPSocketProtocol Objects

class TCPSocketProtocol()

TCP socket communication protocol.

__init__

 | __init__(reader: asyncio.StreamReader, writer: asyncio.StreamWriter, logger: logging.Logger = _default_logger, loop: Optional[AbstractEventLoop] = None)

Initialize the tcp socket protocol.

Arguments:

  • reader: established asyncio reader
  • writer: established asyncio writer

write

 | async write(data: bytes) -> None

Write to socket.

Arguments:

  • data: bytes to write

read

 | async read() -> Optional[bytes]

Read from socket.

Returns:

read bytes

close

 | async close() -> None

Disconnect socket.

TCPSocketChannel Objects

class TCPSocketChannel(IPCChannel)

Interprocess communication channel implementation using tcp sockets.

__init__

 | __init__(logger: logging.Logger = _default_logger, loop: Optional[AbstractEventLoop] = None)

Initialize tcp socket interprocess communication channel.

connect

 | async connect(timeout: float = PIPE_CONN_TIMEOUT) -> bool

Setup communication channel and wait for other end to connect.

Arguments:

  • timeout: timeout for the connection to be established

write

 | async write(data: bytes) -> None

Write to channel.

Arguments:

  • data: bytes to write

read

 | async read() -> Optional[bytes]

Read from channel.

Arguments:

  • data: read bytes

close

 | async close() -> None

Disconnect from channel and clean it up.

in_path

 | @property
 | in_path() -> str

Rendezvous point for incoming communication.

out_path

 | @property
 | out_path() -> str

Rendezvous point for outgoing communication.

PosixNamedPipeChannel Objects

class PosixNamedPipeChannel(IPCChannel)

Interprocess communication channel implementation using Posix named pipes.

__init__

 | __init__(logger: logging.Logger = _default_logger, loop: Optional[AbstractEventLoop] = None)

Initialize posix named pipe interprocess communication channel.

connect

 | async connect(timeout: float = PIPE_CONN_TIMEOUT) -> bool

Setup communication channel and wait for other end to connect.

Arguments:

  • timeout: timeout for connection to be established

Returns:

bool, indicating sucess

write

 | async write(data: bytes) -> None

Write to the channel.

Arguments:

  • data: data to write to channel

read

 | async read() -> Optional[bytes]

Read from the channel.

Returns:

read bytes

close

 | async close() -> None

Close the channel and clean it up.

in_path

 | @property
 | in_path() -> str

Rendezvous point for incoming communication.

out_path

 | @property
 | out_path() -> str

Rendezvous point for outgoing communication.

TCPSocketChannelClient Objects

class TCPSocketChannelClient(IPCChannelClient)

Interprocess communication channel client using tcp sockets.

__init__

 | __init__(in_path: str, out_path: str, logger: logging.Logger = _default_logger, loop: Optional[AbstractEventLoop] = None)

Initialize a tcp socket communication channel client.

Arguments:

  • in_path: rendezvous point for incoming data
  • out_path: rendezvous point for outgoing data

connect

 | async connect(timeout: float = PIPE_CONN_TIMEOUT) -> bool

Connect to the other end of the communication channel.

Arguments:

  • timeout: timeout for connection to be established

write

 | async write(data: bytes) -> None

Write data to channel.

Arguments:

  • data: bytes to write

read

 | async read() -> Optional[bytes]

Read data from channel.

Returns:

read bytes

close

 | async close() -> None

Disconnect from communication channel.

PosixNamedPipeChannelClient Objects

class PosixNamedPipeChannelClient(IPCChannelClient)

Interprocess communication channel client using Posix named pipes.

__init__

 | __init__(in_path: str, out_path: str, logger: logging.Logger = _default_logger, loop: Optional[AbstractEventLoop] = None)

Initialize a posix named pipe communication channel client.

Arguments:

  • in_path: rendezvous point for incoming data
  • out_path: rendezvous point for outgoing data

connect

 | async connect(timeout: float = PIPE_CONN_TIMEOUT) -> bool

Connect to the other end of the communication channel.

Arguments:

  • timeout: timeout for connection to be established

write

 | async write(data: bytes) -> None

Write data to channel.

Arguments:

  • data: bytes to write

read

 | async read() -> Optional[bytes]

Read data from channel.

Returns:

read bytes

close

 | async close() -> None

Disconnect from communication channel.

make_ipc_channel

make_ipc_channel(logger: logging.Logger = _default_logger, loop: Optional[AbstractEventLoop] = None) -> IPCChannel

Build a portable bidirectional InterProcess Communication channel

Arguments:

  • logger: the logger
  • loop: the loop

Returns:

IPCChannel

make_ipc_channel_client

make_ipc_channel_client(in_path: str, out_path: str, logger: logging.Logger = _default_logger, loop: Optional[AbstractEventLoop] = None) -> IPCChannelClient

Build a portable bidirectional InterProcess Communication client channel

Arguments:

  • in_path: rendezvous point for incoming communication
  • out_path: rendezvous point for outgoing outgoing
  • logger: the logger
  • loop: the loop

Returns:

IPCChannel