Listening and Connecting
In the fundamental networking scenario there is a server listening at a known address, a client connecting to that same address and agreed mechanisms for the presentation of requests and the receiving of responses. Traditionally these interactions are synchronous in nature where the client submits the request and waits for a response from the server. There can be additional code required to get application data on and off the wire, activity known as marshaling and encoding.
This library supports that traditional style of networking but also goes much further. By adopting the asynchronous model of operation from ansar-create the library manages to provide a much more flexible and sophisticated style of networking, while also hiding most of the technical details behind the messaging system that is built into async operation. Features and behaviours that were difficult or impossible to implement with traditional networking become achievable. Other benefits include code clarity and maintainability.
Download materials supporting this guide and setup the environment with the following commands;
$ cd <folder-of-repos>
$ git clone https://github.com/mr-ansar/listening-and-connecting.git
$ cd listening-and-connecting
$ python3 -m venv .env
$ source .env/bin/activate
$ pip3 install ansar-connect
A Complete, Minimal Server
Implementation of a server looks like;
import ansar.connect as ar
from hello_welcome import *
# The server object.
def listen_at_address(self, settings):
server_name = settings.server_name
ipp = ar.HostPort(settings.host, settings.port)
ar.listen(self, ipp)
m = self.select(ar.Listening, ar.NotListening, ar.Stop)
if isinstance(m, ar.NotListening):
return m
elif isinstance(m, ar.Stop):
return ar.Aborted()
while True:
m = self.select(ar.Accepted, Hello, ar.Closed, ar.Abandoned, ar.Stop)
if isinstance(m, ar.Accepted):
self.console(f'Accepted {m.accepted_ipp}') # Acquired a client.
continue
elif isinstance(m, (ar.Closed, ar.Abandoned)):
self.console(f'Closed/Abandoned {m.opened_ipp}') # Lost a client.
continue
elif isinstance(m, ar.Stop): # Control-c.
return ar.Aborted() # Terminate this process.
# Must have been the initial greeting.
hello = m
# Provide the expected response.
welcome = Welcome(your_name=hello.my_name, my_name=server_name)
self.reply(welcome)
self.console(f'At server - {welcome}')
ar.bind(listen_at_address)
# Configuration for this executable.
class Settings(object):
def __init__(self, server_name=None, host=None, port=None):
self.server_name = server_name
self.host = host
self.port = port
SETTINGS_SCHEMA = {
'server_name': ar.Unicode(),
'host': ar.Unicode(),
'port': ar.Integer8(),
}
ar.bind(Settings, object_schema=SETTINGS_SCHEMA)
# Initial values.
factory_settings = Settings(server_name='Buster', host='127.0.0.1', port=32011)
if __name__ == '__main__':
ar.create_object(listen_at_address, factory_settings=factory_settings)
The essential points in this server are;
the call to
create_object()to create an instance oflisten_at_address(),the call to
listen()that establishes the object as a service at the configured network address,the first call to
select()to verify thelisten(),the message loop that waits for the next async event,
the call to
reply()that sends the response after receiving aHello()message,the
return ar.Aborted()statement that terminates the server on a control-c.
Note
This arrangement of an application is inherited from ansar-create.
It takes care of persistent settings, provides logging and manages platform signals such as control-c.
The reply() method is a shorthand for send(). It passes the current return address
as the destination, ie. the address the Hello() message came from.
Everything to do with networking such as sockets and their related operations, is fully integrated
into async operation. The same send() method used to transfer messages between async objects within
a process, is also used to transfer messages across network transports.
A command to run the server looks like;
$ cd <../listening-and-connecting>
$ source .env/bin/activate
$ python3 listen-at-address.py --debug-level=DEBUG
A Matching Client
The matching client looks like;
import ansar.connect as ar
from hello_welcome import *
# The client object.
def connect_to_address(self, settings):
client_name = settings.client_name
ipp = ar.HostPort(settings.host, settings.port) # Where to expect the service.
ar.connect(self, ipp)
m = self.select(ar.Connected, ar.NotConnected, ar.Stop)
if isinstance(m, ar.NotConnected):
return m
elif isinstance(m, ar.Stop):
return ar.Aborted()
server_address = self.return_address # Where the Connected message came from.
hello = Hello(my_name=client_name)
self.send(hello, server_address)
m = self.select(Welcome, ar.Closed, ar.Abandoned, ar.Stop, seconds=3.0)
if isinstance(m, Welcome): # Intended outcome.
pass
elif isinstance(m, (ar.Closed, ar.Abandoned)):
return m
elif isinstance(m, ar.Stop):
return ar.Aborted()
elif isinstance(m, ar.SelectTimer):
return ar.TimedOut(m)
# Must have been the proper response.
welcome = m
self.console(f'At client - {welcome}')
return welcome # Return the result of Hello.
ar.bind(connect_to_address)
#
#
class Settings(object):
def __init__(self, client_name=None, host=None, port=None):
self.client_name = client_name
self.host = host
self.port = port
SETTINGS_SCHEMA = {
'client_name': str,
'host': str,
'port': int,
}
ar.bind(Settings, object_schema=SETTINGS_SCHEMA)
# Initial values.
factory_settings = Settings(client_name='Gladys', host='127.0.0.1', port=32011)
if __name__ == '__main__':
ar.create_object(connect_to_address, factory_settings=factory_settings)
The essential points in this client are;
the call to
create_object()to create an instance ofconnect_to_address(),the call to
connect()to initiate a network connection,the call to
select()to confirm the connection,the call to
send()in response to aConnectedmessage,the second call to
select()in expectation of aWelcome()message,the
return welcomestatement that terminates the client.
Defining A Set Of Reusable Messages
It is a good idea to create a separate file for message definitions. These are the messages that will be exchanged by the client and server. This serves a similar purpose to a header or include file from other languages. In this context they are referred to as interface files;
import ansar.encode as ar
__all__ = [
'Hello',
'Welcome',
]
class Hello(object):
def __init__(self, my_name=None):
self.my_name = my_name
class Welcome(object):
def __init__(self, your_name=None, my_name=None):
self.your_name = your_name
self.my_name = my_name
def __str__(self):
return f'Hello "{self.your_name}", my name is "{self.my_name}"'
SCHEMA = {
'my_name': str,
'your_name': str,
}
ar.bind(Hello, object_schema=SCHEMA)
ar.bind(Welcome, object_schema=SCHEMA)
The essential points in the hello_welcome.py module are;
import of
ansar.encodereflecting the singular purpose of the module,definition of the
Hello()andWelcome()classes,registration of each message class using the bind() function,
passing explicit type information in the
SCHEMAtable.
The module path and filename are used to auto-generate a message identity at the moment of registration (i.e. the call to bind()). Message definitions can appear in the module in which they are used. However, creation of interface files is a good defense against circular import problems.
A Complete Request-Response Sequence
A run of the client appears below (some header details omitted for clarity);
$ cd <../listening-and-connecting>
$ source .env/bin/activate
$ python3 connect-to-address.py -dl=DEBUG
20:32:21.639 + <0000000a>SocketSelect - Created by <00000001>
20:32:21.639 < <0000000a>SocketSelect - Received Start from <00000001>
20:32:21.639 > <0000000a>SocketSelect - Sent SocketChannel to <00000001>
20:32:21.639 + <0000000b>PubSub[INITIAL] - Created by <00000001>
20:32:21.640 < <0000000b>PubSub[INITIAL] - Received Start from <00000001>
20:32:21.640 + <0000000c>object_vector - Created by <00000001>
20:32:21.640 ~ <0000000c>object_vector - Executable "/home/buster/connect-to-address.py" as object process (2363848)
20:32:21.640 ~ <0000000c>object_vector - Working folder "/home/buster"
20:32:21.640 ~ <0000000c>object_vector - Running object "__main__.connect_to_address"
20:32:21.640 ~ <0000000c>object_vector - Class threads (3) "subscribed" (3),"published" (2),"connect-to-peer" (1)
20:32:21.640 + <0000000d>connect_to_address - Created by <0000000c>
20:32:21.640 + <0000000e>SocketProxy[INITIAL] - Created by <0000000a>
20:32:21.640 ~ <0000000a>SocketSelect - Connected to "127.0.0.1:32011", at local address "127.0.0.1:37560"
20:32:21.640 > <0000000a>SocketSelect - Forward Connected to <0000000d> (from <0000000e>)
20:32:21.640 < <0000000e>SocketProxy[INITIAL] - Received Start from <0000000a>
20:32:21.640 < <0000000d>connect_to_address - Received Connected from <0000000e>
20:32:21.640 > <0000000d>connect_to_address - Sent Hello to <0000000e>
20:32:21.640 > <0000000d>connect_to_address - Sent StartTimer to <00000003>
20:32:21.640 < <0000000e>SocketProxy[NORMAL] - Received Hello from <0000000d>
20:32:21.641 > <0000000a>SocketSelect - Forward Welcome to <0000000d> (from <0000000e>)
20:32:21.641 > <0000000d>connect_to_address - Sent CancelTimer to <00000003>
20:32:21.641 < <0000000d>connect_to_address - Received Welcome from <0000000e>
20:32:21.641 ^ <0000000d>connect_to_address - At client - Hello "Gladys", my name is "Buster"
20:32:21.641 X <0000000d>connect_to_address - Destroyed
20:32:21.641 < <0000000c>object_vector - Received Completed from <0000000d>
20:32:21.641 X <0000000c>object_vector - Destroyed
20:32:21.740 < <0000000b>PubSub[NORMAL] - Received Stop from <00000001>
20:32:21.740 X <0000000b>PubSub[NORMAL] - Destroyed
20:32:21.741 > <0000000a>SocketSelect - Sent Close to <0000000e>
20:32:21.741 < <0000000e>SocketProxy[NORMAL] - Received Close from <0000000a>
20:32:21.742 X <0000000e>SocketProxy[NORMAL] - Destroyed
20:32:21.742 > <0000000a>SocketSelect - Forward Closed to <0000000d> (from <0000000e>)
20:32:21.743 X <0000000a>SocketSelect - Destroyed
Welcome
+ your_name: "Gladys"
+ my_name: "Buster"
$
Messages are encoded and decoded automatically using the ansar-encode library. A full set of types is supported including generic containers (e.g. arrays, vectors and sets), user-defined types and graphs.
Note
Generation of logs is automatic. Adding a --debug-level argument directs the logs to stderr, else they are
discarded. The return value of the object created by create_object() is rendered on stdout. These
behaviours are inherited from ansar-create and can be controlled in
different ways.
Network Connections As Asynchronous Addresses
The logs include information relating to the async object called SocketProxy. This object
is the gateway between the world of async messages and the world of network sockets. Each network connection
involves two proxy objects - one at the connecting end and one at the accepting end. When a client receives
the Connected message or the server receives the Accepted message they
were sent from proxy objects. The return address (i.e. self.return_address) is then the proxy - the local
representation of a remote object. That address can be passed around for use by other parts of the application.
The result of sending to a proxy is always the transfer of the message across the underlying connection to the
remote end.
Once a connection is established there is the ability to perform the familiar request-response
operations found in traditional networking. However, with ansar-connect there is no real
constraint on who can send what and when they can send it. Network connections can be treated
as a multiplexed transports. Every message transferred across a connection has address information
tacked on such that the receiver is always able to direct a message back to the correct sender, e.g.
the connect_to_address() and listen_at_address() objects. Connections can support multiple ongoing
conversations. Activity like this is safe even in the presence of multi-threading.
Separation Of Session And Application
Placing network details behind an abstraction such as async addresses is a useful strategy in several ways. The less technical details that appear in an implementation the more clearly the application is manifested in the source code.
Another strategy available in the library to help with code clarity, is the separation of session
management and application messages. The former includes messages like Connected, Accepted
and Abandoned while the latter includes all the messages that might be exchanged between clients
and servers once a connection is established. This issue is most obviously apparent at the listening
end, where the server must be prepared to receive Accepted and Abandoned
messages intermingled with application messages like Hello(). All the session management messages and all the
application messages for every connected client are being directed to a single point.
Note
The files listen-sessions-at-address.py and connect-session-to-address.py are part of the
repo downloaded at the start of this section. They are functional equivalents of listen-at-address.py
and connect-to-address.py, respectively. Running the session-based implementations uses the same
commands except for the different module names. Clients and servers are inter-changeable,
e.g. connect-to-address.py can be used to connect to listen-sessions-at-address.py.
A session-based server looks like;
import ansar.connect as ar
from hello_welcome import *
# Session object.
def accepted_at_address(self, server_name, **kv):
while True:
m = self.select(Hello, ar.Stop)
if isinstance(m, Hello):
welcome = Welcome(your_name=m.my_name, my_name=server_name)
self.reply(welcome)
elif isinstance(m, ar.Stop):
return ar.Aborted()
ar.bind(accepted_at_address)
# Server object.
def listen_at_address(self, settings):
server_name = settings.server_name
# Establish the listen.
ipp = ar.HostPort(settings.host, settings.port)
session = ar.CreateFrame(accepted_at_address, server_name)
ar.listen(self, ipp, session=session)
m = self.select(ar.Listening, ar.NotListening, ar.Stop)
if isinstance(m, ar.NotListening):
return m
elif isinstance(m, ar.Stop):
return ar.Aborted()
# Defer to the sessions.
self.select(ar.Stop)
return ar.Aborted()
ar.bind(listen_at_address)
# Configuration for this executable.
class Settings(object):
def __init__(self, server_name=None, host=None, port=None):
self.server_name = server_name
self.host = host
self.port = port
SETTINGS_SCHEMA = {
'server_name': str,
'host': str,
'port': int,
}
ar.bind(Settings, object_schema=SETTINGS_SCHEMA)
# Initial values.
factory_settings = Settings(server_name='Buster', host='127.0.0.1', port=32011)
# Entry point.
if __name__ == '__main__':
ar.create_object(listen_at_address, factory_settings=factory_settings)
The essential points in this server are;
the addition of the
accepted_at_address()function,the use of CreateFrame() to capture all the details of a session object,
passing the session variable to the
listen()function to enable sessions in the network machinery.
An instance of accepted_at_address() is created after every successful connection of a
client. This simplifies the coding of listen_at_address() - with no duties to perform around
the comings and goings of clients, related processing has been stripped out. It also creates
a dedicated context for each connected client. As the goals of the application become more
ambitious the correct handling of individual clients becomes more complicated and less practical
to implement at a single point, i.e. listen_at_address().
A session-based client looks like;
import ansar.connect as ar
from hello_welcome import *
# Session object.
def connected_to_address(self, client_name, remote_address=None, **kv):
hello = Hello(my_name=client_name)
self.send(hello, remote_address)
m = self.select(Welcome, ar.Stop)
if isinstance(m, ar.Stop):
return ar.Aborted()
return m
ar.bind(connected_to_address)
# Client object.
def connect_to_address(self, settings):
client_name = settings.client_name
ipp = ar.HostPort(settings.host, settings.port) # Where to expect the service.
session = ar.CreateFrame(connected_to_address, client_name) # Description of a session.
ar.connect(self, ipp, session=session)
m = self.select(ar.Connected, ar.NotConnected, ar.Stop)
if isinstance(m, ar.NotConnected):
return m
elif isinstance(m, ar.Stop):
return ar.Aborted()
# Session has started.
m = self.select(ar.Abandoned, ar.Closed, ar.Stop)
if isinstance(m, ar.Abandoned):
return m
elif isinstance(m, ar.Closed):
return m.value
elif isinstance(m, ar.Stop):
return ar.Aborted()
ar.bind(connect_to_address)
#
#
class Settings(object):
def __init__(self, client_name=None, host=None, port=None):
self.client_name = client_name
self.host = host
self.port = port
SETTINGS_SCHEMA = {
'client_name': str,
'host': str,
'port': int,
}
ar.bind(Settings, object_schema=SETTINGS_SCHEMA)
factory_settings = Settings(client_name='Gladys', host='127.0.0.1', port=32011)
if __name__ == '__main__':
ar.create_object(connect_to_address, factory_settings=factory_settings)
The essential points in this client are;
the addition of the
connected_to_address()function,the use of CreateFrame() to capture all the details of a session object,
passing the session variable to the
connect()function to enable sessions in the network machinery.
An instance of connected_to_address() is created after a successful connection. There is an
exchange of Hello() and Welcome() messages and the client session object terminates. This causes
the shutdown of the underlying socket and the sending of an Abandoned message within the server.
The demand for client-side session objects is less obvious. The connect_to_address() function
served as a context for the single connection supported by the connect() function. However, the
separation still improves code clarity and there is also the ability to move the session objects
around. As a slightly odd example it would be possible to switch the client and server session
objects around so that the server made a request to the client. It is also possible to combine
multiple session object pairs over a single connection. Effectively this would run multiple exchanges
side-by-side.
Async objects such as listen_at_address() and connect_to_address() are known as network controllers
and async objects such as accepted_at_address() and connected_to_address() are known as network
sessions. Session objects follow a standard object lifecycle - there is a start, an exchange of
messages and a termination. There is nothing in that cycle that involves technical networking
details. Messages associated with termination of a session object are mapped onto appropriate
networking messages, e.g. a Completed message becomes a Closed
message sent to the controller.
Due to the different routing of session vs application messages the session objects do not receive
Connected or Accepted messages and therefore miss out on the
implicit availability of proxy addresses. For this reason the addresses for the proxy and their
associated controller are passed as named variables at the time a session object is created,
e.g. remote_address. When sessions are enabled the Connected and
Accepted messages are still sent to the appropriate controllers but the return
address changes to the address of the newly created session object. By this mechanism controllers
have timely visibility of their sessions.
Networking With Finite State Machines
Finite state machines are particularly well-matched to the requirements of more sophisticated networking. An example would be a software solution involving multiple components spread around a network, e.g. a laboratory with many complex scientific devices or a production line with a high level of automation. Rather than multiple clients making RPC-style requests to a server, there are peer processes making connections within the group and exchanging bi-directional messages over extended periods. It is the ongoing messages that maintain a relationship between the two peers and it is the set of relationships that give the software solution its collective behaviour.
In this documentation, examples of async objects such as clients and servers are implemented as functions. This is for reasons such as readability and the typically smaller number of source lines required. It is worth noting that the implementation of ansar-connect involves over 100 async objects and almost 100% of this number are implemented as finite state machines. Function-based async objects have their place but the more ambitious networking components are likely to be implemented as FSMs.
Note
The files listen-fsm-at-address.py and connect-fsm-to-address.py are part of the
repo downloaded at the start of this section. They are functional equivalents of listen-at-address.py
and connect-to-address.py, respectively. Running the FSM-based implementations uses the same
commands except for the different module names. Clients and servers are inter-changeable,
e.g. connect-to-address.py can be used to connect to listen-fsm-at-address.py. Finite
state machines are a part of the ansar-create library.
For details on FSMs and a broader understanding of asynchronous programming, refer
to the associated documentation.
The FSM version of the server looks like;
import ansar.connect as ar
from hello_welcome import *
# Server FSM object.
class INITIAL: pass
class PENDING: pass
class LISTENING: pass
class ListenAtAddress(ar.Point, ar.StateMachine):
def __init__(self, settings):
ar.Point.__init__(self)
ar.StateMachine.__init__(self, INITIAL)
self.settings = settings
self.server_name = settings.server_name
self.ipp = None
self.listening = None
def ListenAtAddress_INITIAL_Start(self, message):
self.ipp = ar.HostPort(self.settings.host, self.settings.port)
ar.listen(self, self.ipp)
return PENDING
def ListenAtAddress_PENDING_Listening(self, message):
self.listening = message
return LISTENING
def ListenAtAddress_PENDING_NotListening(self, message):
self.complete(message)
def ListenAtAddress_PENDING_Stop(self, message):
self.complete(ar.Aborted())
def ListenAtAddress_LISTENING_Accepted(self, message):
self.console(f'Accepted at {message.accepted_ipp}')
return LISTENING
def ListenAtAddress_LISTENING_Hello(self, message):
welcome = Welcome(your_name=message.my_name, my_name=self.server_name)
self.reply(welcome)
return LISTENING
def ListenAtAddress_LISTENING_Abandoned(self, message):
self.console(f'Abandoned')
return LISTENING
def ListenAtAddress_LISTENING_Stop(self, message):
self.complete(ar.Aborted())
LISTEN_AT_ADDRESS_DISPATCH = {
INITIAL: (
(ar.Start,), ()
),
PENDING: (
(ar.Listening, ar.NotListening, ar.Stop), ()
),
LISTENING: (
(ar.Accepted, Hello, ar.Abandoned, ar.Stop,), ()
),
}
ar.bind(ListenAtAddress, LISTEN_AT_ADDRESS_DISPATCH)
# Configuration for this executable.
class Settings(object):
def __init__(self, server_name=None, host=None, port=None):
self.server_name = server_name
self.host = host
self.port = port
SETTINGS_SCHEMA = {
'server_name': ar.Unicode(),
'host': ar.Unicode(),
'port': ar.Integer8(),
}
ar.bind(Settings, object_schema=SETTINGS_SCHEMA)
# Initial values.
factory_settings = Settings(server_name='Buster', host='127.0.0.1', port=32011)
# Entry point.
if __name__ == '__main__':
ar.create_object(ListenAtAddress, factory_settings=factory_settings)
The salient points in this server are;
replacement of the
listen_at_address()function with theListenAtAddressclass,inclusion of
StateMachineas a base class,passing of
INITIALto the base __init__() call,the set of functions with names like
ListenAtAddress_PENDING_Stop,registration of the
ListenAtAddressclass using bind(),passing the
LISTEN_AT_ADDRESS_DISPATCHdictionary to the registration,passing
ListenAtAddresstocreate_object()
Functionally this implementation is equivalent to previous versions. However, operationally this version of the server is lighter in that there is no dedicated thread, allowing large numbers of servers to be created without stressing the local computing resources. While a large number of server objects in a process seems unlikely, this property of FSMs has utility. It also has an improved chance at robustness and reliability courtesy of the formal nature of FSMs.
The FSM version of the client looks like;
import ansar.connect as ar
from hello_welcome import *
# Client FSM object.
class INITIAL: pass
class PENDING: pass
class CONNECTED: pass
class ConnectToAddress(ar.Point, ar.StateMachine):
def __init__(self, settings):
ar.Point.__init__(self)
ar.StateMachine.__init__(self, INITIAL)
self.settings = settings
self.client_name = settings.client_name
self.ipp = None
self.connected = None
def ConnectToAddress_INITIAL_Start(self, message):
self.ipp = ar.HostPort(self.settings.host, self.settings.port)
ar.connect(self, self.ipp)
return PENDING
def ConnectToAddress_PENDING_Connected(self, message):
self.connected = message
hello = Hello(my_name=self.client_name)
self.reply(hello)
return CONNECTED
def ConnectToAddress_PENDING_NotConnected(self, message):
self.complete(message)
def ConnectToAddress_PENDING_Stop(self, message):
self.complete(ar.Aborted())
def ConnectToAddress_CONNECTED_Welcome(self, message):
self.complete(message)
def ConnectToAddress_CONNECTED_Abandoned(self, message):
self.complete(message)
def ConnectToAddress_CONNECTED_Stop(self, message):
self.complete(ar.Aborted())
CONNECT_TO_ADDRESS_DISPATCH = {
INITIAL: (
(ar.Start,), ()
),
PENDING: (
(ar.Connected, ar.NotConnected, ar.Stop), ()
),
CONNECTED: (
(Welcome, ar.Abandoned, ar.Stop,), ()
),
}
ar.bind(ConnectToAddress, CONNECT_TO_ADDRESS_DISPATCH)
#
#
class Settings(object):
def __init__(self, client_name=None, host=None, port=None):
self.client_name = client_name
self.host = host
self.port = port
SETTINGS_SCHEMA = {
'client_name': ar.Unicode(),
'host': ar.Unicode(),
'port': ar.Integer8(),
}
ar.bind(Settings, object_schema=SETTINGS_SCHEMA)
factory_settings = Settings(client_name='Gladys', host='127.0.0.1', port=32011)
if __name__ == '__main__':
ar.create_object(ConnectToAddress, factory_settings=factory_settings)
The session and FSM capabilities can be combined. Functions such as accepted_at_address()
and connected_to_address() would become the AcceptedAtAddress and ConnectedToAddress
classes. It is also possible to mix the implementation styles. Implementation of these
variants is left as an exercise.
Exchanging Messages With Multiple Servers
Implementing a software component that requires the services of multiple servers is a good coding challenge. Complexity rises quickly as the number of servers increases. There is the initial establishment of all the required connections to consider and then the need to implement recovery after the loss of a connection. Recovery will likely involve multiple attempts to connect and periods of delay to avoid floods of connections. In yet more complex scenarios there may be the ongoing demands of upstream clients to manage.
At some inconvenient moment in the development of this component there might be a moment of insight - every connection can be lost at any moment and this is true from the moment a networking component starts through to the moment it terminates. The component may start, acquire a partial set of connections and then lose them all.
It is much less stressful - and more accurate - to think of the component as an item of software that is continually attempting to establish a full set of connections. The component is always aware of its current state but achieving a full set is just one state of many and as transient as the others. To simplify their implementation some components may choose to give the fully-connected state special significance, e.g. by responding to its upstream clients with temporarily-out-of-service in any other state.
Note
The file connect-and-reconnect-to-address.py is included in the repo downloaded at
the start of this section. It is a functional equivalent of connect-to-address.py.
Consider the following revision of the client;
import ansar.connect as ar
from hello_welcome import *
def say_hello(self, client_name, group):
while True:
m = self.select(ar.GroupUpdate, ar.Ready, ar.Completed, ar.Stop)
if isinstance(m, ar.GroupUpdate):
group.update(m) # Connected or existing connection lost.
elif isinstance(m, ar.Ready):
break # Full set of connections.
elif isinstance(m, ar.Completed):
return m.value # There was a group problem, e.g. retries were exhausted.
elif isinstance(m, ar.Stop):
return ar.Aborted()
server_address = group.server
hello = Hello(my_name=client_name)
self.send(hello, server_address)
m = self.select(Welcome, ar.GroupUpdate, ar.Stop, seconds=3.0)
if isinstance(m, Welcome): # Intended outcome.
pass
elif isinstance(m, ar.GroupUpdate): # Can only be bad - terminate.
return m
elif isinstance(m, ar.Stop):
return ar.Aborted()
elif isinstance(m, ar.SelectTimer):
return ar.TimedOut(m)
# Must have been the proper response.
welcome = m
self.console(f'At client - {welcome}')
return welcome
# The client object.
def connect_to_address(self, settings):
client_name = settings.client_name
ipp = ar.HostPort(settings.host, settings.port) # Where to expect the service.
group = ar.GroupTable(
server=ar.CreateFrame(ar.ConnectToAddress, ipp),
)
g = group.create(self)
welcome = say_hello(self, client_name, group)
self.send(ar.Stop(), g) # Clean up.
self.select(ar.Completed)
return welcome
ar.bind(connect_to_address)
#
#
class Settings(object):
def __init__(self, client_name=None, host=None, port=None):
self.client_name = client_name
self.host = host
self.port = port
SETTINGS_SCHEMA = {
'client_name': str,
'host': str,
'port': int,
}
ar.bind(Settings, object_schema=SETTINGS_SCHEMA)
# Initial values.
factory_settings = Settings(client_name='Gladys', host='127.0.0.1', port=32011)
if __name__ == '__main__':
ar.create_object(connect_to_address, factory_settings=factory_settings)
The salient moments in this client are;
the use of
GroupTableto define a list of servers (i.e. currently one),the call to
create()to launch a connection manager,the processing of messages such as
GroupUpdateandReady,the separate
say_hello()function to allow for cleanup of the connection manager.
The connect_to_address() function has delegated the responsibility for connecting to
a connection manager. The first step is to define a list of the desired connections
using GroupTable. The create() method is then used
to start the manager which initiates all connection activity and then provides a stream of
updates to the object specified at creation time (i.e. the passing of self).
Each time a new connection is established or an existing connection is lost, a
GroupUpdate is sent. In addition the Ready and NotReady messages are
sent to demarcate a period where all connections are present. The GroupUpdate
messages must be passed to the update() method. This is the mechanism by
which the group attributes reflect the current status of the members listed in
the GroupTable, e.g. the group.server value is set to the address of a proxy
object.
To observe an example of the work carried out by the connection manager (AddressGroup),
start the client before the server. The client will start a series of connection attempts
that succeeds once the server has started;
$ cd <../listening-and-connecting>
$ source .env/bin/activate
$ python3 connect-and-reconnect-to-address.py -dl=DEBUG
20:17:17.496 + <0000000a>SocketSelect - Created by <00000001>
20:17:17.496 < <0000000a>SocketSelect - Received Start from <00000001>
20:17:17.496 > <0000000a>SocketSelect - Sent SocketChannel to <00000001>
20:17:17.496 + <0000000b>PubSub[INITIAL] - Created by <00000001>
20:17:17.496 < <0000000b>PubSub[INITIAL] - Received Start from <00000001>
20:17:17.496 + <0000000c>object_vector - Created by <00000001>
20:17:17.496 ~ <0000000c>object_vector - Executable "/home/buster/connect-and-reconnect-to-address.py" as object process (2408477)
20:17:17.496 ~ <0000000c>object_vector - Working folder "/home/buster"
20:17:17.496 ~ <0000000c>object_vector - Running object "__main__.connect_to_address"
20:17:17.496 ~ <0000000c>object_vector - Class threads (3) "subscribed" (3),"published" (2),"connect-to-peer" (1)
20:17:17.496 + <0000000d>connect_to_address - Created by <0000000c>
20:17:17.496 + <0000000e>AddressGroup[INITIAL] - Created by <0000000d>
20:17:17.496 < <0000000e>AddressGroup[INITIAL] - Received Start from <0000000d>
20:17:17.496 + <0000000f>ConnectToAddress[INITIAL] - Created by <0000000e>
20:17:17.496 < <0000000f>ConnectToAddress[INITIAL] - Received Start from <0000000e>
20:17:17.497 > <0000000a>SocketSelect - Sent NotConnected to <0000000f>
20:17:17.497 < <0000000f>ConnectToAddress[PENDING] - Received NotConnected from <0000000a>
20:17:17.497 > <0000000f>ConnectToAddress[PENDING] - Sent StartTimer to <00000003>
20:17:19.498 < <0000000f>ConnectToAddress[GLARING] - Received GlareTimer from <00000003>
20:17:19.499 > <0000000a>SocketSelect - Sent NotConnected to <0000000f>
20:17:19.499 < <0000000f>ConnectToAddress[PENDING] - Received NotConnected from <0000000a>
20:17:19.499 > <0000000f>ConnectToAddress[PENDING] - Sent StartTimer to <00000003>
20:17:24.005 < <0000000f>ConnectToAddress[GLARING] - Received GlareTimer from <00000003>
20:17:24.006 + <00000010>SocketProxy[INITIAL] - Created by <0000000a>
20:17:24.006 ~ <0000000a>SocketSelect - Connected to "127.0.0.1:32011", at local address "127.0.0.1:46152"
20:17:24.006 > <0000000a>SocketSelect - Forward Connected to <0000000f> (from <00000010>)
20:17:24.007 < <00000010>SocketProxy[INITIAL] - Received Start from <0000000a>
20:17:24.007 < <0000000f>ConnectToAddress[PENDING] - Received Connected from <00000010>
20:17:24.007 > <0000000f>ConnectToAddress[PENDING] - Sent UseAddress to <0000000e>
20:17:24.007 < <0000000e>AddressGroup[PENDING] - Received UseAddress from <0000000f>
20:17:24.007 > <0000000e>AddressGroup[PENDING] - Sent GroupUpdate to <0000000d>
20:17:24.007 > <0000000e>AddressGroup[PENDING] - Sent Ready to <0000000d>
20:17:24.008 < <0000000d>connect_to_address - Received GroupUpdate from <0000000e>
20:17:24.008 < <0000000d>connect_to_address - Received Ready from <0000000e>
20:17:24.008 > <0000000d>connect_to_address - Sent Hello to <00000010>
20:17:24.008 > <0000000d>connect_to_address - Sent StartTimer to <00000003>
20:17:24.008 < <00000010>SocketProxy[NORMAL] - Received Hello from <0000000d>
20:17:24.013 > <0000000a>SocketSelect - Forward Welcome to <0000000d> (from <00000010>)
20:17:24.014 > <0000000d>connect_to_address - Sent CancelTimer to <00000003>
20:17:24.014 < <0000000d>connect_to_address - Received Welcome from <00000010>
20:17:24.014 ^ <0000000d>connect_to_address - At client - Hello "Gladys", my name is "Buster"
..
Adding a server is now trivial. Just create another CreateFrame() and add it to the list of named parameters
for GroupTable. The new name will be added to the group and a value assigned via
the GroupUpdate message and update() mechanism. Initially all the named
attributes of the group object are set to None.
Control flow through the client is arranged to reach the ready state. The assigment of a proxy address
with the server_address = group.server statement occurs after the latest message has been checked
against the Ready type which results in a break from the while loop. Minor
changes to the flow could produce a client that runs forever, always attempting to maintain a full set of
connections.
There is a lot more going on under the hood of the manager. Delays between connection attempts include backoff and randomization. Each successive delay is longer (up to a maximum) and also tweaked by a random amount. These are efforts to reduce noise on the network and also to avoid swamping a server immediately after it joins the fray. There are also distinct delay sequences for the different scopes of network - attempts to connect across the loopback interface occur with shorter delays than attempts to connect across the LAN. Attempts to connect across the Internet are separated by the longest delays.
The GroupTable works with several different types of entry. As well as ConnectToAddress
there is also ListenAtAddress and several more. Together these cater to just about
every arrangement of networking components and inter-component connections that might be needed.
Encryption
Security is a part of networking. For this reason encryption is built into the ansar-connect
library. To activate encryption there is a simple boolean flag on both the listen() and the
connect() functions;
def listen(self, requested_ipp, session=None, encrypted=False):
..
..
def connect(self, requested_ipp, session=None, encrypted=False):
..
..
For networking to succeed the values assigned at each end must match. Either they are both
True or they are both False.
Encryption is based around Curve25519 high-speed elliptic curve cryptography. There is an initial Diffie-Hellman style exchange of public keys in cleartext, after which all data frames passing across the network are encrypted. All key creation and runtime encryption/decryption is performed by the Salt library.
Authentication and authorization is left to the user.