Publish And Subscribe Networking
Traditional networking based around listen() and connect() functions and the foreknowledge of network
addresses has several difficulties. Top of the list is the need for both parties to be aware of an agreed
address. Most obviously this is about the client having knowledge of the address to connect to, i.e.
the IP address and port where a network service is expected to be listening.
The server does not escape this issue either. The potential presence of multiple network interfaces means that there is either some heuristic for selection of the correct interface (and therefore the correct IP address) or the server must be explicitly configured. Interfaces with multiple IP addresses (IP aliasing) are another concern.
Most importantly, there is the issue of static vs dynamic IP addresses. A DHCP server is a standard part of operational IP networks, ensuring that hosts can be added and removed in a flexible manner. However, the presence of DHCP means that without explicit configuration, hosts may be assigned a different IP address from one day to the next. Depending on local networking policies it can be difficult or impossible to arrange a static address for a server and without a known, stable address, configuration of a client is effectively impossible.
As networking software becomes more ambitious it brings the inevitable need for additional hosts requiring additional static addresses. Even if large numbers of static addresses were permitted within the target network, it conflicts with the goals of DHCP which was invented to sidestep the need for ongoing address management. A DNS service ameliorates some of the issues and dynamic DNS goes a little further, but neither solves the requirements for networking as described in this documentation.
Another difficulty is that having properly configured a networking software solution, it can be difficult or even impossible to repeat the process for another instance of the same solution. These kinds of difficulties are common during development of networking software, but they also arise elsewhere, e.g. where a company wishes to run separate instances of its software product for sales and for training. At the root of things is the fact that IP addresses and port numbers are global to a network. An entirely new set of IP addresses and port numbers (or just port numbers) must be allocated to each new instance of a software solution. All of this assumes that the solution is even amenable to such reconfiguration.
A final difficulty with the listen() and connect() style of networking is that clients will typically
be required to implement a recovery mechanism for restoring connections after they are lost. The same
problem exists from the moment a client starts operation and discovers that the server is not running.
Specifically the problem is polling - the client is forced to make a series of attempts to connect.
This is not only an implementation complexity but also an activity that creates noise on the network.
Large numbers of clients trying to reconnect to a restarting server at the same time can cause
problems of their own.
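The polling burden described above can be sketched with the standard socket module. This is an illustration of what a connect()-based client typically ends up carrying, not part of ansar-connect; the function name connect_with_retry and its parameters are assumptions made for the example. Random jitter is one common way to stop many clients from retrying in lockstep against a restarting server.

```python
import random
import socket
import time

def connect_with_retry(host, port, attempts=5, base_delay=0.5,
                       max_delay=8.0, sleep=time.sleep):
    """Poll for a server, doubling the maximum wait after each failure."""
    delay = base_delay
    for attempt in range(1, attempts + 1):
        try:
            return socket.create_connection((host, port), timeout=2.0)
        except OSError:
            if attempt == attempts:
                raise  # Out of attempts, report the last failure.
            # Random jitter spreads out clients that all lost the same server.
            sleep(random.uniform(0, delay))
            delay = min(delay * 2, max_delay)
```

Every client of every service needs some variation of this loop, along with a decision about how many attempts are enough.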
Expressed in more practical terms, the typical IP network is not an environment conducive to the operation of arbitrary collections of networking processes. Pub-sub networking is one response to this constraint.
Download the materials supporting this guide and set up the environment with the following commands;
$ cd <folder-of-repos>
$ git clone https://github.com/mr-ansar/publish-and-subscribe.git
$ cd publish-and-subscribe
$ python3 -m venv .env
$ source .env/bin/activate
$ pip3 install ansar-connect pyinstaller
Networking By Names
A runtime registry is needed where names and network addresses can be held. A phonebook is a reasonable
metaphor where each entry is a simple text name along with an IP address and port number. The pub-sub
phonebook will be created by the ansar-connect environment and populated automatically by calls to
publish() and subscribe(), the pub-sub equivalents for listen() and connect().
For the first pub-sub demonstration the phonebook will be created by the ansar process orchestration
tools, e.g. ansar run and ansar start. The server and client will run as a group under the
control of ansar. This is convenient as process management is a necessary aspect of networking anyway.
Adoption of ansar process orchestration does require the building of executable images. The necessary
commands are built into the Makefile that arrives with the download.
The following commands prepare everything for demonstration of pub-sub networking;
$ cd <../publish-and-subscribe>
$ source .env/bin/activate
$ make pub-sub
pyinstaller --onefile --hidden-import=_cffi_backend --log-level ERROR -p . publish-a-service.py
pyinstaller --onefile --hidden-import=_cffi_backend --log-level ERROR -p . subscribe-to-service.py
pyinstaller --onefile --hidden-import=_cffi_backend --log-level ERROR -p . `which ansar-group`
ansar create
ansar deploy dist
ansar add publish-a-service pub
ansar add subscribe-to-service sub
$
Note
The --hidden-import directive is required for any module using the Salt
encryption library.
A Complete, Minimal Publisher
Changes from a listen()-based to a publish()-based server are minor;
import ansar.connect as ar
from hello_welcome import *

# The service object.
def publish_a_service(self, settings):
    server_name = settings.server_name
    listing = settings.listing

    ar.publish(self, listing)
    m = self.select(ar.Published, ar.NotPublished, ar.Stop)
    if isinstance(m, ar.NotPublished):
        return m
    elif isinstance(m, ar.Stop):
        return ar.Aborted()

    while True:
        m = self.select(ar.Delivered, Hello, ar.Dropped, ar.Cleared, ar.Stop)
        if isinstance(m, ar.Delivered):
            self.console(f'Delivered to {m.agent_address}')  # Acquired a subscriber.
            continue
        elif isinstance(m, Hello):
            pass
        elif isinstance(m, (ar.Cleared, ar.Dropped)):  # Lost a subscriber.
            self.console(f'Cleared/Dropped')
            continue
        else:
            return ar.Aborted()  # Control-c.

        welcome = Welcome(your_name=m.my_name, my_name=server_name)
        self.reply(welcome)

ar.bind(publish_a_service)

# Configuration for this executable.
class Settings(object):
    def __init__(self, server_name=None, listing=None):
        self.server_name = server_name
        self.listing = listing

SETTINGS_SCHEMA = {
    'server_name': ar.Unicode(),
    'listing': ar.Unicode(),
}

ar.bind(Settings, object_schema=SETTINGS_SCHEMA)

# Initial values.
factory_settings = Settings(server_name='Buster', listing='hello-welcome')

if __name__ == '__main__':
    ar.create_node(publish_a_service, factory_settings=factory_settings)
The salient moments in this server are;
call to create_node() to create an instance of publish_a_service(),
call to publish() passing the configured listing name, “hello-welcome”,
call to select() in expectation of a Delivered message,
a second call to select() in expectation of a Hello() message,
the call to the reply() method in response to the Hello(),
a third call to select() in expectation of a Dropped message,
a fourth call to select() in expectation of a Stop() message,
the return ar.Aborted() statement that terminates the server after a control-c.
Processes involved in pub-sub begin with a call to create_node() rather than create_object(). This is
how the process “registers” itself with the local environment. A different set of messages is used
to notify the server of the different networking events, e.g. the Dropped message is the pub-sub
equivalent of the Abandoned message.
A Matching Subscriber
A subscribing client to match the publishing server looks like;
import ansar.connect as ar
from hello_welcome import *

# The subscriber object.
def subscribe_to_service(self, settings):
    client_name = settings.client_name

    # Declare interest in the service.
    listing = settings.listing
    ar.subscribe(self, listing)
    m = self.select(ar.Subscribed, ar.Stop)
    if isinstance(m, ar.Stop):
        return ar.Aborted()

    m = self.select(ar.Available, ar.Stop)
    if isinstance(m, ar.Stop):
        return ar.Aborted()
    server_address = self.return_address

    hello = Hello(my_name=client_name)
    self.send(hello, server_address)
    r = self.select(Welcome, ar.Cleared, ar.Dropped, ar.Stop, seconds=3.0)
    if isinstance(r, Welcome):  # Intended outcome.
        pass
    elif isinstance(r, (ar.Cleared, ar.Dropped)):
        return r
    elif isinstance(r, ar.Stop):
        return ar.Aborted()
    elif isinstance(r, ar.SelectTimer):
        return ar.TimedOut(r)

    return r  # Return the result of Enquiry.

ar.bind(subscribe_to_service)

# Configuration for this executable.
class Settings(object):
    def __init__(self, client_name=None, listing=None):
        self.client_name = client_name
        self.listing = listing

SETTINGS_SCHEMA = {
    'client_name': ar.Unicode(),
    'listing': ar.Unicode(),
}

ar.bind(Settings, object_schema=SETTINGS_SCHEMA)

# Initial values.
factory_settings = Settings(client_name='Gladys', listing='hello-welcome')

if __name__ == '__main__':
    ar.create_node(subscribe_to_service, factory_settings=factory_settings)
The salient moments in this client are;
call to create_node() to create an instance of subscribe_to_service(),
call to subscribe() passing the configured listing name, “hello-welcome”,
call to select() in expectation of an Available message,
the call to the send() method in response to the Available,
a second call to select() in expectation of a Welcome() message,
the return r statement that terminates the client.
The client process “registers” itself with the local environment in the same manner as the server.
A Request-Response Sequence Over Pub-Sub
Running the demonstration looks like;
$ cd <../publish-and-subscribe>
$ source .env/bin/activate
$ ansar run --main-role=sub
ansar run --main-role=sub
Incognito
+ type_name: "hello_welcome.Welcome"
+ decoded_word: {"my_name": "Buster", "your_name": "Gladys"}
+ saved_pointers: {}
The client and server have successfully exchanged names and the ansar CLI tool has reported the output
of the sub role, as requested on the command line.
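The reported value can be checked against the request-response logic without running any ansar process. The sketch below uses plain dataclasses as stand-ins for the Hello and Welcome types; the real definitions live in the hello_welcome module of the download and are not reproduced here. The respond() helper is hypothetical and mirrors the construction of the Welcome in the publisher.

```python
from dataclasses import dataclass, asdict

# Stand-ins for the message types imported from hello_welcome.
@dataclass
class Hello:
    my_name: str

@dataclass
class Welcome:
    your_name: str
    my_name: str

def respond(hello, server_name):
    """Build the reply the same way the publisher does."""
    return Welcome(your_name=hello.my_name, my_name=server_name)

# Factory settings from the two executables: client 'Gladys', server 'Buster'.
welcome = respond(Hello(my_name='Gladys'), server_name='Buster')
print(asdict(welcome))
```

The resulting fields, my_name "Buster" and your_name "Gladys", are the same two values that appear in the decoded_word line of the reported output.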
The ansar run command is an example of ansar process orchestration. It accepts a list of roles and
arranges for the execution of the appropriate executables as a logical group. Where the list is empty
it defaults to everything that has been added, i.e. pub and sub.
The ansar CLI provides two methods for starting processes. The ansar start command looks the same
but at runtime places the group behind a daemon process and returns immediately. Logging is directed
into storage areas within the .ansar-home folder and can be extracted with the ansar log command.
During a run command logging is disabled by default. Adding the flag --debug-level=DEBUG sends
all logging to the stderr stream. Effectively the run command is for debugging and other
investigations, while the start command is targeted at production scenarios.
Specifying a main role on the command line causes two behaviours. Firstly, if the named process terminates there is a forced termination of any remaining processes. Secondly, the command reports the output of that single specified process. The default is to report the output of all the processes in the group. This feature can be used to implement composite applications in a very concise way.
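The two behaviours attached to a main role can be approximated with the standard subprocess module. This is a rough sketch of the idea only; the run_group() function is hypothetical and says nothing about how the ansar tools are implemented. The two stand-in commands are assumptions chosen so the example runs anywhere Python does.

```python
import subprocess
import sys

def run_group(commands, main_role):
    """Start every role, wait for the main role, then stop the rest."""
    procs = {}
    for role, argv in commands.items():
        # Only the main role's output is captured for reporting.
        out = subprocess.PIPE if role == main_role else subprocess.DEVNULL
        procs[role] = subprocess.Popen(argv, stdout=out, text=True)

    # Blocks until the main role exits.
    output, _ = procs[main_role].communicate()

    for role, proc in procs.items():
        if role != main_role and proc.poll() is None:
            proc.terminate()  # Forced termination of the remaining processes.
        proc.wait()
    return output

if __name__ == '__main__':
    group = {
        'pub': [sys.executable, '-c', 'import time; time.sleep(60)'],
        'sub': [sys.executable, '-c', 'print("Welcome")'],
    }
    print(run_group(group, main_role='sub'), end='')
```

Here the long-running "pub" stand-in is stopped as soon as the short-lived "sub" stand-in finishes, and only the output of "sub" is reported.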
Where the output type is unknown to process orchestration (i.e. the Hello and Welcome types are
never imported by orchestration tools) the data is converted to an Incognito type. There is enough
information in the reported value to verify the client-server exchange.
Noteworthy aspects of this demonstration include;
successful network messaging between a client and a server,
zero network address configuration,
management of a configured set of processes, from startup through to termination.
An explanation of how a network transport can be established without configuration of a network address is outside the scope of this documentation. Suffice it to say that published services listen on ephemeral ports and the runtime environment makes that information available to subscribers in a timely fashion.
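The ephemeral port half of that statement is standard socket behaviour and can be shown in isolation. The sketch below uses only the Python standard library; the function name listen_on_ephemeral_port is an assumption for the example, and the step where ansar-connect passes the port on to subscribers is not shown.

```python
import socket

def listen_on_ephemeral_port(host='127.0.0.1'):
    """Listen without choosing a port number in advance."""
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.bind((host, 0))  # Port 0 asks the operating system to pick a free port.
    sock.listen()
    port = sock.getsockname()[1]  # The port that was actually assigned.
    return sock, port
```

A service that listens this way never collides with a port configured elsewhere, but its address is only useful if something, such as a registry of names, makes the assigned port known to clients.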
Extending The Scope Of Pub-Sub
For simplicity the initial demo of pub-sub was constrained to messaging within a small group of processes (i.e. two) on a single host. This is the lowest scope of pub-sub networking in a sequence of four scopes. These scopes are mostly a reflection of familiar network topology;
GROUP - a group of processes on a single host,
HOST - a collection of groups on a single host,
LAN - a collection of multiple hosts,
WAN - a collection of multiple LANs.
Each scope of pub-sub networking is supported by different ansar-connect services. These are;
ansar-group - a service process added to every instance of process orchestration,
ansar-host - a set of network services installed on each host as required,
ansar-lan - a set of network services installed on a single host per site,
ansar-wan - an online service.
As seen in the demo, pub-sub at the lowest scope is automatic. The second and third scopes require the installation of ansar software and the fourth scope exists as a service on the Internet. A dedicated public repo provides the ansar-host and ansar-lan software. Installation instructions appear in other sections of this documentation. Instructions for setup of the online service can be found in Remote Weather Stations.
Setup of pub-sub services in all cases is simple and operational requirements are low. Setup of the full suite - assuming availability of an ansar-lan host - can be completed within half an hour. It should be noted that these services are multi-tenanted in that they all support multiple distinct messaging environments. Specific documentation can be found here.
The remainder of the pub-sub documentation is presented as a small set of guides, each guide aimed at a particular scope of operation. These are;
Composite Applications - breakup of a monolithic application into multiple processes
A Networked Laboratory - monitoring of multiple devices around a LAN
Remote Weather Stations - connecting to devices on the other side of the Internet