Listening and Connecting

In the fundamental networking scenario there is a server listening at a known address, a client connecting to that same address, and agreed mechanisms for presenting requests and receiving responses. Traditionally these interactions are synchronous: the client submits a request and waits for the response from the server. Additional code may be required to get application data on and off the wire, an activity known as marshaling and encoding.
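For comparison, the traditional synchronous style can be sketched with nothing but the Python standard library. This is a minimal illustration and not part of ansar-connect; the function names serve_once and say_hello, the newline-terminated JSON framing and the use of an OS-assigned port are all assumptions made for the example.

```python
import json
import socket
import threading

def serve_once(listener, server_name):
    """Accept one client, read one newline-terminated JSON request, answer it."""
    conn, _ = listener.accept()
    with conn, conn.makefile('rw', encoding='utf-8', newline='\n') as stream:
        hello = json.loads(stream.readline())            # Decode the request.
        welcome = {'your_name': hello['my_name'], 'my_name': server_name}
        stream.write(json.dumps(welcome) + '\n')         # Encode the response.
        stream.flush()

def say_hello(address, client_name):
    """Connect, submit the request and block until the response arrives."""
    with socket.create_connection(address, timeout=3.0) as conn, \
            conn.makefile('rw', encoding='utf-8', newline='\n') as stream:
        stream.write(json.dumps({'my_name': client_name}) + '\n')
        stream.flush()
        return json.loads(stream.readline())

# Listen on the loopback interface; port 0 asks the OS for any free port.
listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
listener.bind(('127.0.0.1', 0))
listener.listen(1)
address = listener.getsockname()

server = threading.Thread(target=serve_once, args=(listener, 'Buster'))
server.start()
reply = say_hello(address, 'Gladys')
server.join()
listener.close()

print(reply)
```

Note how the socket handling, the framing and the encoding are all the responsibility of the application, and how the client can do nothing else while it waits inside say_hello().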

This library supports that traditional style of networking but also goes much further. By adopting the asynchronous model of operation from ansar-create, the library provides a more flexible and sophisticated style of networking while hiding most of the technical details behind the messaging system that is built into async operation. Features and behaviours that were difficult or impossible to implement with traditional networking become achievable. Other benefits include code clarity and maintainability.

Download the materials supporting this guide and set up the environment with the following commands;

$ cd <folder-of-repos>
$ git clone https://github.com/mr-ansar/listening-and-connecting.git
$ cd listening-and-connecting
$ python3 -m venv .env
$ source .env/bin/activate
$ pip3 install ansar-connect

A Complete, Minimal Server

Implementation of a server looks like;

import ansar.connect as ar
from hello_welcome import *

# The server object.
def listen_at_address(self, settings):
    server_name = settings.server_name

    ipp = ar.HostPort(settings.host, settings.port)
    ar.listen(self, ipp)

    m = self.select(ar.Listening, ar.NotListening, ar.Stop)
    if isinstance(m, ar.NotListening):
        return m
    elif isinstance(m, ar.Stop):
        return ar.Aborted()

    while True:
        m = self.select(ar.Accepted, Hello, ar.Closed, ar.Abandoned, ar.Stop)
        if isinstance(m, ar.Accepted):
            self.console(f'Accepted {m.accepted_ipp}')          # Acquired a client.
            continue
        elif isinstance(m, (ar.Closed, ar.Abandoned)):
            self.console(f'Closed/Abandoned {m.opened_ipp}')    # Lost a client.
            continue
        elif isinstance(m, ar.Stop):    # Control-c.
            return ar.Aborted()         # Terminate this process.

        # Must have been the initial greeting.
        hello = m

        # Provide the expected response.
        welcome = Welcome(your_name=hello.my_name, my_name=server_name)
        self.reply(welcome)

        self.console(f'At server - {welcome}')

ar.bind(listen_at_address)

# Configuration for this executable.
class Settings(object):
    def __init__(self, server_name=None, host=None, port=None):
        self.server_name = server_name
        self.host = host
        self.port = port

SETTINGS_SCHEMA = {
    'server_name': ar.Unicode(),
    'host': ar.Unicode(),
    'port': ar.Integer8(),
}

ar.bind(Settings, object_schema=SETTINGS_SCHEMA)

# Initial values.
factory_settings = Settings(server_name='Buster', host='127.0.0.1', port=32011)

if __name__ == '__main__':
    ar.create_object(listen_at_address, factory_settings=factory_settings)

The essential points in this server are;

  • the call to create_object() to create an instance of listen_at_address(),

  • the call to listen() that establishes the object as a service at the configured network address,

  • the first call to select() to verify the listen(),

  • the message loop that waits for the next async event,

  • the call to reply() that sends the response after receiving a Hello() message,

  • the return ar.Aborted() statement that terminates the server on a control-c.

Note

This arrangement of an application is inherited from ansar-create. It takes care of persistent settings, provides logging and manages platform signals such as control-c. The reply() method is a shorthand for send(). It passes the current return address as the destination, i.e. the address the Hello() message came from.

Everything to do with networking, such as sockets and their related operations, is fully integrated into async operation. The same send() method used to transfer messages between async objects within a process is also used to transfer messages across network transports.

A command to run the server looks like;

$ cd <../listening-and-connecting>
$ source .env/bin/activate
$ python3 listen-at-address.py --debug-level=DEBUG

A Matching Client

The matching client looks like;

import ansar.connect as ar
from hello_welcome import *

# The client object.
def connect_to_address(self, settings):
    client_name = settings.client_name

    ipp = ar.HostPort(settings.host, settings.port)     # Where to expect the service.
    ar.connect(self, ipp)

    m = self.select(ar.Connected, ar.NotConnected, ar.Stop)
    if isinstance(m, ar.NotConnected):
        return m
    elif isinstance(m, ar.Stop):
        return ar.Aborted()
    server_address = self.return_address    # Where the Connected message came from.

    hello = Hello(my_name=client_name)
    self.send(hello, server_address)

    m = self.select(Welcome, ar.Closed, ar.Abandoned, ar.Stop, seconds=3.0)

    if isinstance(m, Welcome):      # Intended outcome.
        pass
    elif isinstance(m, (ar.Closed, ar.Abandoned)):
        return m
    elif isinstance(m, ar.Stop):
        return ar.Aborted()
    elif isinstance(m, ar.SelectTimer):
        return ar.TimedOut(m)

    # Must have been the proper response.
    welcome = m

    self.console(f'At client - {welcome}')

    return welcome      # Return the result of Hello.

ar.bind(connect_to_address)

# Configuration for this executable.
class Settings(object):
    def __init__(self, client_name=None, host=None, port=None):
        self.client_name = client_name
        self.host = host
        self.port = port

SETTINGS_SCHEMA = {
    'client_name': str,
    'host': str,
    'port': int,
}

ar.bind(Settings, object_schema=SETTINGS_SCHEMA)

# Initial values.
factory_settings = Settings(client_name='Gladys', host='127.0.0.1', port=32011)

if __name__ == '__main__':
    ar.create_object(connect_to_address, factory_settings=factory_settings)

The essential points in this client are;

  • the call to create_object() to create an instance of connect_to_address(),

  • the call to connect() to initiate a network connection,

  • the call to select() to confirm the connection,

  • the call to send() in response to a Connected message,

  • the second call to select() in expectation of a Welcome() message,

  • the return welcome statement that terminates the client.

Defining A Set Of Reusable Messages

It is a good idea to create a separate file for message definitions. These are the messages that will be exchanged by the client and server. This serves a similar purpose to a header or include file from other languages. In this context they are referred to as interface files;

import ansar.encode as ar

__all__ = [
    'Hello',
    'Welcome',
]

class Hello(object):
    def __init__(self, my_name=None):
        self.my_name = my_name

class Welcome(object):
    def __init__(self, your_name=None, my_name=None):
        self.your_name = your_name
        self.my_name = my_name
    
    def __str__(self):
        return f'Hello "{self.your_name}", my name is "{self.my_name}"'

SCHEMA = {
    'my_name': str,
    'your_name': str,
}

ar.bind(Hello, object_schema=SCHEMA)
ar.bind(Welcome, object_schema=SCHEMA)

The essential points in the hello_welcome.py module are;

  • import of ansar.encode reflecting the singular purpose of the module,

  • definition of the Hello() and Welcome() classes,

  • registration of each message class using the bind() function,

  • passing explicit type information in the SCHEMA table.

The module path and filename are used to auto-generate a message identity at the moment of registration (i.e. the call to bind()). Message definitions can appear in the module in which they are used. However, creation of interface files is a good defense against circular import problems.
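The idea of deriving a message identity at registration time can be illustrated with a small, library-free sketch. The exact identity format used by the library is not shown in this guide, so the dotted module-plus-class name below is an assumption, and REGISTRY, message_identity() and register() are hypothetical names that stand in for whatever bind() does internally.

```python
REGISTRY = {}

def message_identity(cls):
    """Derive a dotted identity from the defining module and the class name."""
    return f'{cls.__module__}.{cls.__qualname__}'

def register(cls):
    """Record the class under its identity, as a registration step might."""
    REGISTRY[message_identity(cls)] = cls
    return cls

class Hello(object):
    def __init__(self, my_name=None):
        self.my_name = my_name

register(Hello)

# A receiver that sees the identity on the wire can find the class to rebuild.
identity = message_identity(Hello)
rebuilt = REGISTRY[identity](my_name='Gladys')
print(identity, rebuilt.my_name)
```

Under a scheme like this the identity changes if the class moves to a different module, which is one more reason to keep shared messages in a single interface file imported by both client and server.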

A Complete Request-Response Sequence

A run of the client appears below (some header details omitted for clarity);

$ cd <../listening-and-connecting>
$ source .env/bin/activate
$ python3 connect-to-address.py -dl=DEBUG
20:32:21.639 + <0000000a>SocketSelect - Created by <00000001>
20:32:21.639 < <0000000a>SocketSelect - Received Start from <00000001>
20:32:21.639 > <0000000a>SocketSelect - Sent SocketChannel to <00000001>
20:32:21.639 + <0000000b>PubSub[INITIAL] - Created by <00000001>
20:32:21.640 < <0000000b>PubSub[INITIAL] - Received Start from <00000001>
20:32:21.640 + <0000000c>object_vector - Created by <00000001>
20:32:21.640 ~ <0000000c>object_vector - Executable "/home/buster/connect-to-address.py" as object process (2363848)
20:32:21.640 ~ <0000000c>object_vector - Working folder "/home/buster"
20:32:21.640 ~ <0000000c>object_vector - Running object "__main__.connect_to_address"
20:32:21.640 ~ <0000000c>object_vector - Class threads (3) "subscribed" (3),"published" (2),"connect-to-peer" (1)
20:32:21.640 + <0000000d>connect_to_address - Created by <0000000c>
20:32:21.640 + <0000000e>SocketProxy[INITIAL] - Created by <0000000a>
20:32:21.640 ~ <0000000a>SocketSelect - Connected to "127.0.0.1:32011", at local address "127.0.0.1:37560"
20:32:21.640 > <0000000a>SocketSelect - Forward Connected to <0000000d> (from <0000000e>)
20:32:21.640 < <0000000e>SocketProxy[INITIAL] - Received Start from <0000000a>
20:32:21.640 < <0000000d>connect_to_address - Received Connected from <0000000e>
20:32:21.640 > <0000000d>connect_to_address - Sent Hello to <0000000e>
20:32:21.640 > <0000000d>connect_to_address - Sent StartTimer to <00000003>
20:32:21.640 < <0000000e>SocketProxy[NORMAL] - Received Hello from <0000000d>
20:32:21.641 > <0000000a>SocketSelect - Forward Welcome to <0000000d> (from <0000000e>)
20:32:21.641 > <0000000d>connect_to_address - Sent CancelTimer to <00000003>
20:32:21.641 < <0000000d>connect_to_address - Received Welcome from <0000000e>
20:32:21.641 ^ <0000000d>connect_to_address - At client - Hello "Gladys", my name is "Buster"
20:32:21.641 X <0000000d>connect_to_address - Destroyed
20:32:21.641 < <0000000c>object_vector - Received Completed from <0000000d>
20:32:21.641 X <0000000c>object_vector - Destroyed
20:32:21.740 < <0000000b>PubSub[NORMAL] - Received Stop from <00000001>
20:32:21.740 X <0000000b>PubSub[NORMAL] - Destroyed
20:32:21.741 > <0000000a>SocketSelect - Sent Close to <0000000e>
20:32:21.741 < <0000000e>SocketProxy[NORMAL] - Received Close from <0000000a>
20:32:21.742 X <0000000e>SocketProxy[NORMAL] - Destroyed
20:32:21.742 > <0000000a>SocketSelect - Forward Closed to <0000000d> (from <0000000e>)
20:32:21.743 X <0000000a>SocketSelect - Destroyed
Welcome
+   your_name: "Gladys"
+   my_name: "Buster"
$

Messages are encoded and decoded automatically using the ansar-encode library. A full set of types is supported including generic containers (e.g. arrays, vectors and sets), user-defined types and graphs.

Note

Generation of logs is automatic. Adding a --debug-level argument directs the logs to stderr; otherwise they are discarded. The return value of the object created by create_object() is rendered on stdout. These behaviours are inherited from ansar-create and can be controlled in different ways.

Network Connections As Asynchronous Addresses

The logs include information relating to the async object called SocketProxy. This object is the gateway between the world of async messages and the world of network sockets. Each network connection involves two proxy objects - one at the connecting end and one at the accepting end. When a client receives the Connected message or the server receives the Accepted message, that message was sent by a proxy object. The return address (i.e. self.return_address) is therefore the proxy - the local representation of a remote object. That address can be passed around for use by other parts of the application. The result of sending to a proxy is always the transfer of the message across the underlying connection to the remote end.

Once a connection is established there is the ability to perform the familiar request-response operations found in traditional networking. However, with ansar-connect there is no real constraint on who can send what and when they can send it. Network connections can be treated as multiplexed transports. Every message transferred across a connection has address information tacked on such that the receiver is always able to direct a message back to the correct sender, e.g. the connect_to_address() and listen_at_address() objects. Connections can support multiple ongoing conversations. Activity like this is safe even in the presence of multi-threading.
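The effect of attaching address information to every message can be shown with a toy model. This is a sketch of the concept only, not the wire format or the implementation used by ansar-connect; the Envelope tuple, its field names and the run_exchange() function are hypothetical, and a simple queue stands in for the single shared connection.

```python
from collections import deque, namedtuple

# Hypothetical envelope: the message body plus the addressing described above.
Envelope = namedtuple('Envelope', 'to_address return_address body')

def run_exchange(client_names):
    """Carry several conversations over one queue and return what each client got."""
    wire = deque()      # Stands in for one shared network connection.
    inbox = {name: [] for name in client_names}

    # Every client puts its request on the same connection.
    for name in client_names:
        wire.append(Envelope('server', name, f'Hello from {name}'))

    while wire:
        e = wire.popleft()
        if e.to_address == 'server':
            # A reply is simply a send to the return address of the request.
            wire.append(Envelope(e.return_address, 'server', f'Welcome {e.return_address}'))
        else:
            inbox[e.to_address].append(e.body)
    return inbox

inbox = run_exchange(['gladys', 'mabel'])
print(inbox)
```

Because each envelope names both its destination and its sender, the two conversations never need to take turns on the connection; each response finds its way back to the object that made the request.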

Separation Of Session And Application

Placing network details behind an abstraction such as async addresses is a useful strategy in several ways. The fewer technical details that appear in an implementation, the more clearly the application is manifested in the source code.

Another strategy available in the library to help with code clarity is the separation of session management and application messages. The former includes messages like Connected, Accepted and Abandoned while the latter includes all the messages that might be exchanged between clients and servers once a connection is established. This issue is most apparent at the listening end, where the server must be prepared to receive Accepted and Abandoned messages intermingled with application messages like Hello(). All the session management messages and all the application messages for every connected client are being directed to a single point.

Note

The files listen-sessions-at-address.py and connect-session-to-address.py are part of the repo downloaded at the start of this section. They are functional equivalents of listen-at-address.py and connect-to-address.py, respectively. Running the session-based implementations uses the same commands except for the different module names. Clients and servers are interchangeable, e.g. connect-to-address.py can be used to connect to listen-sessions-at-address.py.

A session-based server looks like;

import ansar.connect as ar
from hello_welcome import *

# Session object.
def accepted_at_address(self, server_name, **kv):
    while True:
        m = self.select(Hello, ar.Stop)
        if isinstance(m, Hello):
            welcome = Welcome(your_name=m.my_name, my_name=server_name)
            self.reply(welcome)
        elif isinstance(m, ar.Stop):
            return ar.Aborted()

ar.bind(accepted_at_address)

# Server object.
def listen_at_address(self, settings):
    server_name = settings.server_name

    # Establish the listen.
    ipp = ar.HostPort(settings.host, settings.port)
    session = ar.CreateFrame(accepted_at_address, server_name)
    ar.listen(self, ipp, session=session)
    m = self.select(ar.Listening, ar.NotListening, ar.Stop)
    if isinstance(m, ar.NotListening):
        return m
    elif isinstance(m, ar.Stop):
        return ar.Aborted()

    # Defer to the sessions.
    self.select(ar.Stop)
    return ar.Aborted()

ar.bind(listen_at_address)

# Configuration for this executable.
class Settings(object):
    def __init__(self, server_name=None, host=None, port=None):
        self.server_name = server_name
        self.host = host
        self.port = port

SETTINGS_SCHEMA = {
    'server_name': str,
    'host': str,
    'port': int,
}

ar.bind(Settings, object_schema=SETTINGS_SCHEMA)

# Initial values.
factory_settings = Settings(server_name='Buster', host='127.0.0.1', port=32011)

# Entry point.
if __name__ == '__main__':
    ar.create_object(listen_at_address, factory_settings=factory_settings)

The essential points in this server are;

  • the addition of the accepted_at_address() function,

  • the use of CreateFrame() to capture all the details of a session object,

  • passing the session variable to the listen() function to enable sessions in the network machinery.

An instance of accepted_at_address() is created after every successful connection of a client. This simplifies the coding of listen_at_address() - with no duties to perform around the comings and goings of clients, related processing has been stripped out. It also creates a dedicated context for each connected client. As the goals of the application become more ambitious the correct handling of individual clients becomes more complicated and less practical to implement at a single point, i.e. listen_at_address().
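The value of a dedicated context per client can be illustrated without any networking at all. The sketch below is a toy model and not the session machinery of ansar-connect; functools.partial stands in for the role played by CreateFrame() (capturing a function and its arguments ahead of time), and the names accepted_session, on_accepted and on_closed are hypothetical.

```python
from functools import partial

def accepted_session(server_name, remote_address):
    """Create the private context for one client and return its Hello handler."""
    greeted = []                                  # State owned by this client alone.

    def on_hello(my_name):
        greeted.append(my_name)
        return (f'Hello "{my_name}", my name is "{server_name}" '
                f'(message {len(greeted)} from {remote_address})')

    return on_hello

# Captured once, before any client arrives.
frame = partial(accepted_session, 'Buster')

sessions = {}

def on_accepted(remote_address):
    """Start a fresh session for a newly accepted client."""
    sessions[remote_address] = frame(remote_address)

def on_closed(remote_address):
    """Discard the session when its client goes away."""
    sessions.pop(remote_address, None)

on_accepted('127.0.0.1:37560')
on_accepted('127.0.0.1:37562')
first = sessions['127.0.0.1:37560']('Gladys')
second = sessions['127.0.0.1:37560']('Gladys')
other = sessions['127.0.0.1:37562']('Mabel')
on_closed('127.0.0.1:37560')

print(second)
print(other)
```

Each session counts only its own messages, so per-client logic never has to look up which client it is dealing with. In the library the equivalent bookkeeping (the sessions table and the accept and close handling) is carried out by the network machinery rather than by application code.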

A session-based client looks like;

import ansar.connect as ar
from hello_welcome import *

# Session object.
def connected_to_address(self, client_name, remote_address=None, **kv):
    hello = Hello(my_name=client_name)
    self.send(hello, remote_address)

    m = self.select(Welcome, ar.Stop)
    if isinstance(m, ar.Stop):
        return ar.Aborted()
    return m

ar.bind(connected_to_address)

# Client object.
def connect_to_address(self, settings):
    client_name = settings.client_name

    ipp = ar.HostPort(settings.host, settings.port)             # Where to expect the service.
    session = ar.CreateFrame(connected_to_address, client_name) # Description of a session.
    ar.connect(self, ipp, session=session)
    m = self.select(ar.Connected, ar.NotConnected, ar.Stop)
    if isinstance(m, ar.NotConnected):
        return m
    elif isinstance(m, ar.Stop):
        return ar.Aborted()

    # Session has started.
    m = self.select(ar.Abandoned, ar.Closed, ar.Stop)
    if isinstance(m, ar.Abandoned):
        return m
    elif isinstance(m, ar.Closed):
        return m.value
    elif isinstance(m, ar.Stop):
        return ar.Aborted()

ar.bind(connect_to_address)

# Configuration for this executable.
class Settings(object):
    def __init__(self, client_name=None, host=None, port=None):
        self.client_name = client_name
        self.host = host
        self.port = port

SETTINGS_SCHEMA = {
    'client_name': str,
    'host': str,
    'port': int,
}

ar.bind(Settings, object_schema=SETTINGS_SCHEMA)

factory_settings = Settings(client_name='Gladys', host='127.0.0.1', port=32011)

if __name__ == '__main__':
    ar.create_object(connect_to_address, factory_settings=factory_settings)

The essential points in this client are;

  • the addition of the connected_to_address() function,

  • the use of CreateFrame() to capture all the details of a session object,

  • passing the session variable to the connect() function to enable sessions in the network machinery.

An instance of connected_to_address() is created after a successful connection. There is an exchange of Hello() and Welcome() messages and the client session object terminates. This causes the shutdown of the underlying socket and the sending of an Abandoned message within the server.

The demand for client-side session objects is less obvious. The connect_to_address() function served as a context for the single connection supported by the connect() function. However, the separation still improves code clarity and there is also the ability to move the session objects around. As a slightly odd example it would be possible to switch the client and server session objects around so that the server made a request to the client. It is also possible to combine multiple session object pairs over a single connection. Effectively this would run multiple exchanges side-by-side.

Async objects such as listen_at_address() and connect_to_address() are known as network controllers and async objects such as accepted_at_address() and connected_to_address() are known as network sessions. Session objects follow a standard object lifecycle - there is a start, an exchange of messages and a termination. There is nothing in that cycle that involves technical networking details. Messages associated with termination of a session object are mapped onto appropriate networking messages, e.g. a Completed message becomes a Closed message sent to the controller.

Due to the different routing of session vs application messages the session objects do not receive Connected or Accepted messages and therefore miss out on the implicit availability of proxy addresses. For this reason the addresses for the proxy and their associated controller are passed as named variables at the time a session object is created, e.g. remote_address. When sessions are enabled the Connected and Accepted messages are still sent to the appropriate controllers but the return address changes to the address of the newly created session object. By this mechanism controllers have timely visibility of their sessions.

Networking With Finite State Machines

Finite state machines are particularly well-matched to the requirements of more sophisticated networking. An example would be a software solution involving multiple components spread around a network, e.g. a laboratory with many complex scientific devices or a production line with a high level of automation. Rather than multiple clients making RPC-style requests to a server, there are peer processes making connections within the group and exchanging bi-directional messages over extended periods. It is the ongoing messages that maintain a relationship between the two peers and it is the set of relationships that gives the software solution its collective behaviour.

In this documentation, examples of async objects such as clients and servers are implemented as functions. This is for reasons such as readability and the typically smaller number of source lines required. It is worth noting that the implementation of ansar-connect involves over 100 async objects and almost all of them are implemented as finite state machines. Function-based async objects have their place but the more ambitious networking components are likely to be implemented as FSMs.

Note

The files listen-fsm-at-address.py and connect-fsm-to-address.py are part of the repo downloaded at the start of this section. They are functional equivalents of listen-at-address.py and connect-to-address.py, respectively. Running the FSM-based implementations uses the same commands except for the different module names. Clients and servers are interchangeable, e.g. connect-to-address.py can be used to connect to listen-fsm-at-address.py. Finite state machines are a part of the ansar-create library. For details on FSMs and a broader understanding of asynchronous programming, refer to the associated documentation.

The FSM version of the server looks like;

import ansar.connect as ar
from hello_welcome import *


# Server FSM object.
class INITIAL: pass
class PENDING: pass
class LISTENING: pass

class ListenAtAddress(ar.Point, ar.StateMachine):
    def __init__(self, settings):
        ar.Point.__init__(self)
        ar.StateMachine.__init__(self, INITIAL)
        self.settings = settings
        self.server_name = settings.server_name
        self.ipp = None
        self.listening = None

def ListenAtAddress_INITIAL_Start(self, message):
    self.ipp = ar.HostPort(self.settings.host, self.settings.port)
    ar.listen(self, self.ipp)
    return PENDING

def ListenAtAddress_PENDING_Listening(self, message):
    self.listening = message
    return LISTENING

def ListenAtAddress_PENDING_NotListening(self, message):
    self.complete(message)

def ListenAtAddress_PENDING_Stop(self, message):
    self.complete(ar.Aborted())

def ListenAtAddress_LISTENING_Accepted(self, message):
    self.console(f'Accepted at {message.accepted_ipp}')
    return LISTENING

def ListenAtAddress_LISTENING_Hello(self, message):
    welcome = Welcome(your_name=message.my_name, my_name=self.server_name)
    self.reply(welcome)
    return LISTENING

def ListenAtAddress_LISTENING_Abandoned(self, message):
    self.console('Abandoned')
    return LISTENING

def ListenAtAddress_LISTENING_Stop(self, message):
    self.complete(ar.Aborted())

LISTEN_AT_ADDRESS_DISPATCH = {
    INITIAL: (
        (ar.Start,), ()
    ),
    PENDING: (
        (ar.Listening, ar.NotListening, ar.Stop), ()
    ),
    LISTENING: (
        (ar.Accepted, Hello, ar.Abandoned, ar.Stop,), ()
    ),
}

ar.bind(ListenAtAddress, LISTEN_AT_ADDRESS_DISPATCH)

# Configuration for this executable.
class Settings(object):
    def __init__(self, server_name=None, host=None, port=None):
        self.server_name = server_name
        self.host = host
        self.port = port

SETTINGS_SCHEMA = {
    'server_name': ar.Unicode(),
    'host': ar.Unicode(),
    'port': ar.Integer8(),
}

ar.bind(Settings, object_schema=SETTINGS_SCHEMA)

# Initial values.
factory_settings = Settings(server_name='Buster', host='127.0.0.1', port=32011)

# Entry point.
if __name__ == '__main__':
    ar.create_object(ListenAtAddress, factory_settings=factory_settings)

The salient points in this server are;

  • replacement of the listen_at_address() function with the ListenAtAddress class,

  • inclusion of StateMachine as a base class,

  • passing of INITIAL to the base __init__() call,

  • the set of functions with names like ListenAtAddress_PENDING_Stop,

  • registration of the ListenAtAddress class using bind(),

  • passing the LISTEN_AT_ADDRESS_DISPATCH dictionary to the registration,

  • passing ListenAtAddress to create_object().

Functionally this implementation is equivalent to previous versions. However, operationally this version of the server is lighter in that there is no dedicated thread, allowing large numbers of servers to be created without stressing the local computing resources. While a large number of server objects in a process seems unlikely, this property of FSMs has utility. It also has an improved chance at robustness and reliability courtesy of the formal nature of FSMs.
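The combination of a dispatch table and functions named after class, state and message (e.g. ListenAtAddress_PENDING_Stop) suggests how a message can be routed by the pair of current state and message type. The following is a minimal, library-free sketch of that idea and is not how ansar-create implements bind(). The names Machine, build_handlers() and deliver() are hypothetical, and the dispatch table is simplified to a single tuple of message types per state.

```python
class Start: pass
class Stop: pass
class Listening: pass

class INITIAL: pass
class PENDING: pass
class LISTENING: pass

class Machine:
    def __init__(self):
        self.state = INITIAL
        self.log = []

def Machine_INITIAL_Start(self, message):
    self.log.append('listen requested')
    return PENDING

def Machine_PENDING_Listening(self, message):
    self.log.append('listening')
    return LISTENING

def Machine_LISTENING_Stop(self, message):
    self.log.append('stopped')
    return LISTENING

# Simplified table: the message types each state is prepared to receive.
DISPATCH = {
    INITIAL: (Start,),
    PENDING: (Listening,),
    LISTENING: (Stop,),
}

def build_handlers(machine_class, dispatch, namespace):
    """Resolve Class_STATE_Message functions into a (state, message type) lookup."""
    handlers = {}
    for state, message_types in dispatch.items():
        for t in message_types:
            name = f'{machine_class.__name__}_{state.__name__}_{t.__name__}'
            handlers[state, t] = namespace[name]
    return handlers

def deliver(machine, handlers, message):
    """Run the handler for the current state and adopt the state it returns."""
    handler = handlers.get((machine.state, type(message)))
    if handler is None:
        machine.log.append(f'dropped {type(message).__name__}')
        return
    machine.state = handler(machine, message)

handlers = build_handlers(Machine, DISPATCH, globals())
m = Machine()
for message in (Start(), Stop(), Listening(), Stop()):
    deliver(m, handlers, message)

print(m.state.__name__, m.log)
```

Each handler runs to completion and returns the next state, so no thread has to be parked waiting for the next message. That is the property behind the lighter footprint described above. What a real FSM does with a message that has no handler in the current state is a policy decision; this sketch simply records it as dropped.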

The FSM version of the client looks like;

import ansar.connect as ar
from hello_welcome import *

# Client FSM object.
class INITIAL: pass
class PENDING: pass
class CONNECTED: pass

class ConnectToAddress(ar.Point, ar.StateMachine):
    def __init__(self, settings):
        ar.Point.__init__(self)
        ar.StateMachine.__init__(self, INITIAL)
        self.settings = settings
        self.client_name = settings.client_name
        self.ipp = None
        self.connected = None

def ConnectToAddress_INITIAL_Start(self, message):
    self.ipp = ar.HostPort(self.settings.host, self.settings.port)
    ar.connect(self, self.ipp)
    return PENDING

def ConnectToAddress_PENDING_Connected(self, message):
    self.connected = message
    hello = Hello(my_name=self.client_name)
    self.reply(hello)
    return CONNECTED

def ConnectToAddress_PENDING_NotConnected(self, message):
    self.complete(message)

def ConnectToAddress_PENDING_Stop(self, message):
    self.complete(ar.Aborted())

def ConnectToAddress_CONNECTED_Welcome(self, message):
    self.complete(message)

def ConnectToAddress_CONNECTED_Abandoned(self, message):
    self.complete(message)

def ConnectToAddress_CONNECTED_Stop(self, message):
    self.complete(ar.Aborted())

CONNECT_TO_ADDRESS_DISPATCH = {
    INITIAL: (
        (ar.Start,), ()
    ),
    PENDING: (
        (ar.Connected, ar.NotConnected, ar.Stop), ()
    ),
    CONNECTED: (
        (Welcome, ar.Abandoned, ar.Stop,), ()
    ),
}

ar.bind(ConnectToAddress, CONNECT_TO_ADDRESS_DISPATCH)

# Configuration for this executable.
class Settings(object):
    def __init__(self, client_name=None, host=None, port=None):
        self.client_name = client_name
        self.host = host
        self.port = port

SETTINGS_SCHEMA = {
    'client_name': ar.Unicode(),
    'host': ar.Unicode(),
    'port': ar.Integer8(),
}

ar.bind(Settings, object_schema=SETTINGS_SCHEMA)

factory_settings = Settings(client_name='Gladys', host='127.0.0.1', port=32011)

if __name__ == '__main__':
    ar.create_object(ConnectToAddress, factory_settings=factory_settings)

The session and FSM capabilities can be combined. Functions such as accepted_at_address() and connected_to_address() would become the AcceptedAtAddress and ConnectedToAddress classes. It is also possible to mix the implementation styles. Implementation of these variants is left as an exercise.

Exchanging Messages With Multiple Servers

Implementing a software component that requires the services of multiple servers is a good coding challenge. Complexity rises quickly as the number of servers increases. There is the initial establishment of all the required connections to consider and then the need to implement recovery after the loss of a connection. Recovery will likely involve multiple attempts to connect and periods of delay to avoid floods of connections. In yet more complex scenarios there may be the ongoing demands of upstream clients to manage.

At some inconvenient point in the development of this component there might be a moment of insight - every connection can be lost at any time, from the moment a networking component starts through to the moment it terminates. The component may start, acquire a partial set of connections and then lose them all.

It is much less stressful - and more accurate - to think of the component as an item of software that is continually attempting to establish a full set of connections. The component is always aware of its current state but achieving a full set is just one state of many and as transient as the others. To simplify their implementation some components may choose to give the fully-connected state special significance, e.g. by responding to their upstream clients with temporarily-out-of-service in any other state.
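The periods of delay between connection attempts mentioned above are commonly computed with an exponential backoff plus a random spread, so that many clients recovering at the same time do not reconnect in lockstep. The sketch below is a generic illustration of that technique, not the retry policy used by ansar-connect; the function name retry_delays() and all of its parameters are assumptions made for the example.

```python
import random

def retry_delays(first=1.0, factor=2.0, limit=30.0, attempts=6, jitter=0.25, rng=None):
    """Yield the seconds to wait before each successive connection attempt.

    The base delay grows by `factor` up to `limit`; each value is then moved
    by a random amount of at most `jitter` times the base delay.
    """
    rng = rng or random.Random()
    delay = first
    for _ in range(attempts):
        spread = delay * jitter
        yield delay + rng.uniform(-spread, spread)
        delay = min(delay * factor, limit)

# Without jitter the schedule is fully predictable.
plain = list(retry_delays(jitter=0.0))
print(plain)        # [1.0, 2.0, 4.0, 8.0, 16.0, 30.0]

# With jitter each wait stays within 25% of its base delay.
spread_out = list(retry_delays(rng=random.Random(7)))
print(spread_out)
```

A component that is "continually attempting" would restart such a schedule each time a connection is lost, and decide separately whether exhausting the attempts is a fatal condition.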

Note

The file connect-and-reconnect-to-address.py is included in the repo downloaded at the start of this section. It is a functional equivalent of connect-to-address.py.

Consider the following revision of the client;

import ansar.connect as ar
from hello_welcome import *

def say_hello(self, client_name, group):
    while True:
        m = self.select(ar.GroupUpdate, ar.Ready, ar.Completed, ar.Stop)
        if isinstance(m, ar.GroupUpdate):
            group.update(m)                 # Connected or existing connection lost.
        elif isinstance(m, ar.Ready):
            break                           # Full set of connections.
        elif isinstance(m, ar.Completed):
            return m.value                  # There was a group problem, e.g. retries were exhausted.
        elif isinstance(m, ar.Stop):
            return ar.Aborted()

    server_address = group.server

    hello = Hello(my_name=client_name)
    self.send(hello, server_address)

    m = self.select(Welcome, ar.GroupUpdate, ar.Stop, seconds=3.0)

    if isinstance(m, Welcome):      # Intended outcome.
        pass
    elif isinstance(m, ar.GroupUpdate):     # Can only be bad - terminate.
        return m
    elif isinstance(m, ar.Stop):
        return ar.Aborted()
    elif isinstance(m, ar.SelectTimer):
        return ar.TimedOut(m)

    # Must have been the proper response.
    welcome = m

    self.console(f'At client - {welcome}')

    return welcome

# The client object.
def connect_to_address(self, settings):
    client_name = settings.client_name

    ipp = ar.HostPort(settings.host, settings.port)     # Where to expect the service.

    group = ar.GroupTable(
        server=ar.CreateFrame(ar.ConnectToAddress, ipp),
    )
    g = group.create(self)

    welcome = say_hello(self, client_name, group)

    self.send(ar.Stop(), g)     # Clean up.
    self.select(ar.Completed)

    return welcome

ar.bind(connect_to_address)

# Configuration for this executable.
class Settings(object):
    def __init__(self, client_name=None, host=None, port=None):
        self.client_name = client_name
        self.host = host
        self.port = port

SETTINGS_SCHEMA = {
    'client_name': str,
    'host': str,
    'port': int,
}

ar.bind(Settings, object_schema=SETTINGS_SCHEMA)

# Initial values.
factory_settings = Settings(client_name='Gladys', host='127.0.0.1', port=32011)

if __name__ == '__main__':
    ar.create_object(connect_to_address, factory_settings=factory_settings)

The salient points in this client are;

  • the use of GroupTable to define a list of servers (i.e. currently one),

  • the call to create() to launch a connection manager,

  • the processing of messages such as GroupUpdate and Ready,

  • the separate say_hello() function to allow for cleanup of the connection manager.

The connect_to_address() function delegates the work of establishing connections to a connection manager. The first step is to define the desired connections using GroupTable. The create() method then starts the manager, which initiates all connection activity and sends a stream of updates to the object specified at creation time (i.e. the self passed to create()).

Each time a new connection is established or an existing connection is lost, a GroupUpdate is sent. In addition the Ready and NotReady messages are sent to demarcate a period where all connections are present. The GroupUpdate messages must be passed to the update() method. This is the mechanism by which the group attributes reflect the current status of the members listed in the GroupTable, e.g. the group.server value is set to the address of a proxy object.

To observe an example of the work carried out by the connection manager (AddressGroup), start the client before the server. The client will make a series of connection attempts, the last of which succeeds once the server has started;

$ cd <../listening-and-connecting>
$ source .env/bin/activate
$ python3 connect-and-reconnect-to-address.py -dl=DEBUG
20:17:17.496 + <0000000a>SocketSelect - Created by <00000001>
20:17:17.496 < <0000000a>SocketSelect - Received Start from <00000001>
20:17:17.496 > <0000000a>SocketSelect - Sent SocketChannel to <00000001>
20:17:17.496 + <0000000b>PubSub[INITIAL] - Created by <00000001>
20:17:17.496 < <0000000b>PubSub[INITIAL] - Received Start from <00000001>
20:17:17.496 + <0000000c>object_vector - Created by <00000001>
20:17:17.496 ~ <0000000c>object_vector - Executable "/home/buster/connect-and-reconnect-to-address.py" as object process (2408477)
20:17:17.496 ~ <0000000c>object_vector - Working folder "/home/buster"
20:17:17.496 ~ <0000000c>object_vector - Running object "__main__.connect_to_address"
20:17:17.496 ~ <0000000c>object_vector - Class threads (3) "subscribed" (3),"published" (2),"connect-to-peer" (1)
20:17:17.496 + <0000000d>connect_to_address - Created by <0000000c>
20:17:17.496 + <0000000e>AddressGroup[INITIAL] - Created by <0000000d>
20:17:17.496 < <0000000e>AddressGroup[INITIAL] - Received Start from <0000000d>
20:17:17.496 + <0000000f>ConnectToAddress[INITIAL] - Created by <0000000e>
20:17:17.496 < <0000000f>ConnectToAddress[INITIAL] - Received Start from <0000000e>
20:17:17.497 > <0000000a>SocketSelect - Sent NotConnected to <0000000f>
20:17:17.497 < <0000000f>ConnectToAddress[PENDING] - Received NotConnected from <0000000a>
20:17:17.497 > <0000000f>ConnectToAddress[PENDING] - Sent StartTimer to <00000003>
20:17:19.498 < <0000000f>ConnectToAddress[GLARING] - Received GlareTimer from <00000003>
20:17:19.499 > <0000000a>SocketSelect - Sent NotConnected to <0000000f>
20:17:19.499 < <0000000f>ConnectToAddress[PENDING] - Received NotConnected from <0000000a>
20:17:19.499 > <0000000f>ConnectToAddress[PENDING] - Sent StartTimer to <00000003>
20:17:24.005 < <0000000f>ConnectToAddress[GLARING] - Received GlareTimer from <00000003>
20:17:24.006 + <00000010>SocketProxy[INITIAL] - Created by <0000000a>
20:17:24.006 ~ <0000000a>SocketSelect - Connected to "127.0.0.1:32011", at local address "127.0.0.1:46152"
20:17:24.006 > <0000000a>SocketSelect - Forward Connected to <0000000f> (from <00000010>)
20:17:24.007 < <00000010>SocketProxy[INITIAL] - Received Start from <0000000a>
20:17:24.007 < <0000000f>ConnectToAddress[PENDING] - Received Connected from <00000010>
20:17:24.007 > <0000000f>ConnectToAddress[PENDING] - Sent UseAddress to <0000000e>
20:17:24.007 < <0000000e>AddressGroup[PENDING] - Received UseAddress from <0000000f>
20:17:24.007 > <0000000e>AddressGroup[PENDING] - Sent GroupUpdate to <0000000d>
20:17:24.007 > <0000000e>AddressGroup[PENDING] - Sent Ready to <0000000d>
20:17:24.008 < <0000000d>connect_to_address - Received GroupUpdate from <0000000e>
20:17:24.008 < <0000000d>connect_to_address - Received Ready from <0000000e>
20:17:24.008 > <0000000d>connect_to_address - Sent Hello to <00000010>
20:17:24.008 > <0000000d>connect_to_address - Sent StartTimer to <00000003>
20:17:24.008 < <00000010>SocketProxy[NORMAL] - Received Hello from <0000000d>
20:17:24.013 > <0000000a>SocketSelect - Forward Welcome to <0000000d> (from <00000010>)
20:17:24.014 > <0000000d>connect_to_address - Sent CancelTimer to <00000003>
20:17:24.014 < <0000000d>connect_to_address - Received Welcome from <00000010>
20:17:24.014 ^ <0000000d>connect_to_address - At client - Hello "Gladys", my name is "Buster"
..

Adding a server is now trivial. Just create another CreateFrame() and add it to the list of named parameters for GroupTable. The new name will be added to the group and a value assigned via the GroupUpdate message and update() mechanism. Initially all the named attributes of the group object are set to None.
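The relationship between named entries, None defaults and update() can be modelled in a few lines of plain Python. This is a sketch of the idea only, not the ansar-connect implementation; the GroupSketch and GroupUpdateSketch classes, their fields, the ready() helper and the "backup" entry are all hypothetical names introduced for illustration;

```python
# A toy model of a group table. All names here are hypothetical and
# do not come from ansar-connect.

class GroupUpdateSketch:
    """Stand-in for an update message: which entry changed, and its new address."""
    def __init__(self, name, address):
        self.name = name
        self.address = address      # None models a lost connection.

class GroupSketch:
    """Holds one attribute per named entry, each starting out as None."""
    def __init__(self, **entries):
        self._entries = dict(entries)       # What each name should connect to.
        for name in self._entries:
            setattr(self, name, None)       # Nothing is connected yet.

    def update(self, message):
        """Apply an update so the named attribute reflects current status."""
        if message.name not in self._entries:
            raise KeyError(message.name)
        setattr(self, message.name, message.address)

    def ready(self):
        """True only while every named entry has an address."""
        return all(getattr(self, n) is not None for n in self._entries)

# Two entries instead of one. The second address is an invented example.
group = GroupSketch(server='127.0.0.1:32011', backup='127.0.0.1:32012')
group.update(GroupUpdateSketch('server', 'proxy-for-server'))
print(group.server, group.backup, group.ready())
```

In this model the group only counts as ready once an update has arrived for every name, which mirrors the way the client above waits for Ready before using group.server.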

Control flow through the client is arranged to reach the ready state. The assignment of a proxy address with the server_address = group.server statement occurs only after a Ready message has been received and the while loop has been exited. Minor changes to the flow could produce a client that runs forever, always attempting to maintain a full set of connections.

There is a lot more going on under the hood of the manager. Delays between connection attempts include backoff and randomization. Each successive delay is longer (up to a maximum) and also tweaked by a random amount. These are efforts to reduce noise on the network and also to avoid swamping a server immediately after it joins the fray. There are also distinct delay sequences for the different scopes of network - attempts to connect across the loopback interface occur with shorter delays than attempts to connect across the LAN. Attempts to connect across the Internet are separated by the longest delays.
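The general shape of such a retry schedule can be sketched in plain Python. The scope names, delay values, growth factor and jitter fraction below are assumptions chosen for illustration; the actual figures used by the connection manager are not stated in this guide;

```python
import random

# Hypothetical (first delay, maximum delay) in seconds for each network scope.
SCOPE_DELAYS = {
    'loopback': (0.5, 4.0),
    'lan': (2.0, 30.0),
    'internet': (10.0, 300.0),
}

def retry_delays(scope, attempts, factor=2.0, jitter=0.25, rng=None):
    """Yield one delay per attempt: growing, capped, and randomly tweaked."""
    if rng is None:
        rng = random.Random()
    first, maximum = SCOPE_DELAYS[scope]
    nominal = first
    for _ in range(attempts):
        # Randomize within +/- jitter of the nominal delay.
        spread = nominal * jitter
        yield nominal + rng.uniform(-spread, spread)
        # Back off: each nominal delay is longer, up to the maximum.
        nominal = min(nominal * factor, maximum)

for d in retry_delays('loopback', 6):
    print(f'{d:.3f}')
```

The randomization means that a set of clients started at the same moment do not all retry at the same moment, and the cap keeps a long outage from producing unbounded waits.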

The GroupTable works with several different types of entry. As well as ConnectToAddress there is also ListenAtAddress and several more. Together these cater to just about every arrangement of networking components and inter-component connections that might be needed.

Encryption

Security is a part of networking. For this reason encryption is built into the ansar-connect library. To activate encryption there is a simple boolean flag on both the listen() and the connect() functions;

def listen(self, requested_ipp, session=None, encrypted=False):
    ..
    ..
def connect(self, requested_ipp, session=None, encrypted=False):
    ..
    ..

For networking to succeed the values assigned at each end must match. Either they are both True or they are both False.

Encryption is based around Curve25519 high-speed elliptic curve cryptography. There is an initial Diffie-Hellman style exchange of public keys in cleartext, after which all data frames passing across the network are encrypted. All key creation and runtime encryption/decryption is performed by the Salt library.
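The key exchange can be illustrated with a pure-Python rendition of the X25519 function described in RFC 7748. This is a teaching sketch only. It is not the code path used by ansar-connect or the Salt library, it is not constant-time, and it should not be used to protect real traffic. The function and variable names are chosen for this example;

```python
import os

P = 2**255 - 19         # The prime field of Curve25519.
A24 = 121665            # Curve constant used by the Montgomery ladder.
BASE_POINT = (9).to_bytes(32, 'little')

def _clamp(k):
    """Decode a 32-byte private key as a clamped scalar (RFC 7748)."""
    b = bytearray(k)
    b[0] &= 248
    b[31] &= 127
    b[31] |= 64
    return int.from_bytes(b, 'little')

def x25519(k, u):
    """Scalar multiplication on Curve25519 using the Montgomery ladder."""
    scalar = _clamp(k)
    ub = bytearray(u)
    ub[31] &= 127
    x1 = int.from_bytes(ub, 'little') % P
    x2, z2, x3, z3 = 1, 0, x1, 1
    swap = 0
    for t in reversed(range(255)):
        bit = (scalar >> t) & 1
        swap ^= bit
        if swap:
            x2, x3 = x3, x2
            z2, z3 = z3, z2
        swap = bit
        a = (x2 + z2) % P
        aa = a * a % P
        b = (x2 - z2) % P
        bb = b * b % P
        e = (aa - bb) % P
        c = (x3 + z3) % P
        d = (x3 - z3) % P
        da = d * a % P
        cb = c * b % P
        x3 = pow(da + cb, 2, P)
        z3 = x1 * pow(da - cb, 2, P) % P
        x2 = aa * bb % P
        z2 = e * (aa + A24 * e) % P
    if swap:
        x2, x3 = x3, x2
        z2, z3 = z3, z2
    return (x2 * pow(z2, P - 2, P) % P).to_bytes(32, 'little')

def keypair():
    """Create a random private key and the matching public key."""
    private = os.urandom(32)
    return private, x25519(private, BASE_POINT)

# Each end creates a key pair and sends only the public half, in cleartext.
listen_private, listen_public = keypair()
connect_private, connect_public = keypair()

# Each end combines its own private key with the other's public key.
listen_shared = x25519(listen_private, connect_public)
connect_shared = x25519(connect_private, listen_public)
print(listen_shared == connect_shared)
```

Both ends arrive at the same 32-byte value without that value ever crossing the network. A real implementation would then derive a symmetric key from the shared value and use it to encrypt each data frame.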

Authentication and authorization are left to the user.