The Multiplexer can be used stand-alone. This way a developer can utilise the protocols and connections independent of the Agent or AEA classes.

First, import the Python and application specific libraries and set the static variables.

import os
import time
from copy import copy
from threading import Thread
from typing import Optional

from aea.configurations.base import ConnectionConfig
from aea.connections.stub.connection import StubConnection
from aea.identity.base import Identity
from aea.mail.base import Envelope
from aea.multiplexer import Multiplexer

INPUT_FILE = "input.txt"
OUTPUT_FILE = "output.txt"

Instantiate a Multiplexer

A Multiplexer only needs a list of connections. The StubConnection is a simple connection which reads from and writes to file.

    # Ensure the input and output files do not exist initially
    if os.path.isfile(INPUT_FILE):
        os.remove(INPUT_FILE)
    if os.path.isfile(OUTPUT_FILE):
        os.remove(OUTPUT_FILE)

    # create the connection and multiplexer objects
    configuration = ConnectionConfig(
        input_file=INPUT_FILE,
        output_file=OUTPUT_FILE,
        connection_id=StubConnection.connection_id,
    )
    stub_connection = StubConnection(
        configuration=configuration, identity=Identity("some_agent", "some_address")
    )
    multiplexer = Multiplexer([stub_connection])

Start the Multiplexer

We can run a multiplexer by calling, connect() which starts the receive and sending loops. We run the multiplexer from a different thread so that we can still use the main thread to pass it messages.

    try:
        # Set the multiplexer running in a different thread
        t = Thread(target=multiplexer.connect)
        t.start()

        # Wait for everything to start up
        time.sleep(3)

Send and receive an envelope

We use the input and output text files to send an envelope to our agent and receive a response

        # Create a message inside an envelope and get the stub connection to pass it into the multiplexer
        message_text = (
            "multiplexer,some_agent,fetchai/default:0.3.0,\x08\x01*\x07\n\x05hello,"
        )
        with open(INPUT_FILE, "w") as f:
            f.write(message_text)

        # Wait for the envelope to get processed
        time.sleep(2)

        # get the envelope
        envelope = multiplexer.get()  # type: Optional[Envelope]
        assert envelope is not None

        # Inspect its contents
        print(
            "Envelope received by Multiplexer: sender={}, to={}, protocol_id={}, message={}".format(
                envelope.sender, envelope.to, envelope.protocol_id, envelope.message
            )
        )

        # Create a mirrored response envelope
        response_envelope = copy(envelope)
        response_envelope.to = envelope.sender
        response_envelope.sender = envelope.to

        # Send the envelope back
        multiplexer.put(response_envelope)

        # Read the output envelope generated by the multiplexer
        with open(OUTPUT_FILE, "r") as f:
            print("Envelope received from Multiplexer: " + f.readline())

Shutdown

Finally stop our multiplexer and wait for it to finish

    finally:
        # Shut down the multiplexer
        multiplexer.disconnect()
        t.join()

Your turn

Now it is your turn to develop a simple usecase which utilises the Multiplexer to send and receive Envelopes.

Entire code listing

If you just want to copy and paste the entire script in you can find it here:

Click here to see full listing

import os
import time
from copy import copy
from threading import Thread
from typing import Optional

from aea.configurations.base import ConnectionConfig
from aea.connections.stub.connection import StubConnection
from aea.identity.base import Identity
from aea.mail.base import Envelope
from aea.multiplexer import Multiplexer

INPUT_FILE = "input.txt"
OUTPUT_FILE = "output.txt"


def run():
    # Ensure the input and output files do not exist initially
    if os.path.isfile(INPUT_FILE):
        os.remove(INPUT_FILE)
    if os.path.isfile(OUTPUT_FILE):
        os.remove(OUTPUT_FILE)

    # create the connection and multiplexer objects
    configuration = ConnectionConfig(
        input_file=INPUT_FILE,
        output_file=OUTPUT_FILE,
        connection_id=StubConnection.connection_id,
    )
    stub_connection = StubConnection(
        configuration=configuration, identity=Identity("some_agent", "some_address")
    )
    multiplexer = Multiplexer([stub_connection])
    try:
        # Set the multiplexer running in a different thread
        t = Thread(target=multiplexer.connect)
        t.start()

        # Wait for everything to start up
        time.sleep(3)

        # Create a message inside an envelope and get the stub connection to pass it into the multiplexer
        message_text = (
            "multiplexer,some_agent,fetchai/default:0.3.0,\x08\x01*\x07\n\x05hello,"
        )
        with open(INPUT_FILE, "w") as f:
            f.write(message_text)

        # Wait for the envelope to get processed
        time.sleep(2)

        # get the envelope
        envelope = multiplexer.get()  # type: Optional[Envelope]
        assert envelope is not None

        # Inspect its contents
        print(
            "Envelope received by Multiplexer: sender={}, to={}, protocol_id={}, message={}".format(
                envelope.sender, envelope.to, envelope.protocol_id, envelope.message
            )
        )

        # Create a mirrored response envelope
        response_envelope = copy(envelope)
        response_envelope.to = envelope.sender
        response_envelope.sender = envelope.to

        # Send the envelope back
        multiplexer.put(response_envelope)

        # Read the output envelope generated by the multiplexer
        with open(OUTPUT_FILE, "r") as f:
            print("Envelope received from Multiplexer: " + f.readline())
    finally:
        # Shut down the multiplexer
        multiplexer.disconnect()
        t.join()


if __name__ == "__main__":
    run()