Composite Applications

It starts with a simple need to split off a small part of an application into a separate process. This may be motivated by a need for concurrency, e.g. for background printing, or it might be a workaround for build problems. In a large-scale example the application might be the frontend for a pool of job processors.

In all these examples there is a tightly coupled group of processes with a main process that implements the workflow across the group. The main process knows the goals and where the inputs are. Once the goals are reached, it generates any outputs and terminates. With nothing more to do, the remaining processes are terminated as well. This abstraction is referred to here as a composite application.

There would likely be more composite applications but for three major difficulties. How is data moved around an application that now spans several processes, and how is that same collection of processes started and stopped at the right times? Developing an application that functions as a sub-process manager is a partial solution, but that can be difficult code to write and it would need to be maintained as the set of sub-processes evolved. Passing the responsibility for starting and stopping sub-processes on to users is not a realistic solution.

The third difficulty is more subtle. If a set of network addresses is needed for communications within the group, problems begin when there is a need for two instances of the composite application within the same host. Lurking within a solution based on networking is a requirement for management of port numbers.
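The port clash can be reproduced with nothing more than the standard socket module. The following is a minimal sketch, not part of the repo; the helper names are hypothetical. It shows a second listener being refused when it reuses a port number, and the operating system handing out distinct numbers when asked for port 0.

```python
import socket

def listen_on(port):
    """Bind a listening TCP socket on the loopback interface."""
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        s.bind(("127.0.0.1", port))
        s.listen()
    except OSError:
        s.close()
        raise
    return s

def second_instance_clashes():
    """Return True if a second bind to the same port number is refused."""
    first = listen_on(0)                 # let the OS choose a free port
    fixed_port = first.getsockname()[1]  # pretend this number was hard-coded
    try:
        second = listen_on(fixed_port)   # the "second instance" reuses it
    except OSError:
        return True
    else:
        second.close()
        return False
    finally:
        first.close()

def two_ephemeral_ports():
    """Return the port numbers of two listeners that both asked for port 0."""
    a, b = listen_on(0), listen_on(0)
    try:
        return a.getsockname()[1], b.getsockname()[1]
    finally:
        a.close()
        b.close()
```

Asking for port 0 avoids the clash but moves the problem: some other mechanism must then tell each client which port was assigned.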

Download the materials supporting this guide and set up the environment with the following commands;

$ cd <folder-of-repos>
$ git clone https://github.com/mr-ansar/composite-applications.git
$ cd composite-applications
$ python3 -m venv .env
$ source .env/bin/activate
$ pip3 install ansar-connect pyinstaller ply

A Composite Job Processor

There is a need for an application that accepts a batch of jobs as input, uses multi-processing and multi-threading to improve throughput, and generates a version of the input annotated with the results, as output.

Each job involves a complex text input that needs to be passed through several phases of processing. The output from each phase becomes the input for the next phase. Output from the last phase becomes the annotation for that particular job.

This implementation uses a process for each phase and creates multiple threads within those phases. There is the understanding that this should move to multiple-processes-per-phase at some later time, to truly exploit concurrency under Python.

Each job is an arithmetic expression. The expression is parsed into an abstract syntax tree, the tree becomes the input to a code generator, and the generated code is submitted to a virtual machine to produce the final result.

parse.py

expression text ⟹ abstract syntax tree

codegen.py

abstract syntax tree ⟹ machine code

vm.py

machine code ⟹ floating-point
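The three transformations can be sketched in a single process to make the data flow concrete. This is not the code in parse.py, codegen.py or vm.py. As an assumption for illustration it leans on Python's standard ast module for parsing, and the opcode names and the annotate() helper are hypothetical.

```python
import ast
import operator

# Hypothetical opcode names for a tiny stack machine.
BINARY = {ast.Add: "ADD", ast.Sub: "SUB", ast.Mult: "MUL", ast.Div: "DIV"}
APPLY = {"ADD": operator.add, "SUB": operator.sub,
         "MUL": operator.mul, "DIV": operator.truediv}

def parse(text):
    """expression text -> abstract syntax tree"""
    return ast.parse(text, mode="eval").body

def codegen(tree):
    """abstract syntax tree -> machine code (post-order walk of the tree)"""
    if isinstance(tree, ast.Constant) and isinstance(tree.value, (int, float)):
        return [("PUSH", float(tree.value))]
    if isinstance(tree, ast.BinOp) and type(tree.op) in BINARY:
        opcode = BINARY[type(tree.op)]
        return codegen(tree.left) + codegen(tree.right) + [(opcode, None)]
    raise ValueError("unsupported expression")

def vm(code):
    """machine code -> floating-point"""
    stack = []
    for op, arg in code:
        if op == "PUSH":
            stack.append(arg)
        else:
            right = stack.pop()
            left = stack.pop()
            stack.append(APPLY[op](left, right))
    return stack.pop()

def annotate(line):
    """Run one expression through all three phases and append the result."""
    return f"{line} ({vm(codegen(parse(line)))})"
```

In the composite application each of these functions lives in its own process, and the intermediate values travel between them as messages.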

The main workflow process job-batch.py accepts a file of expressions as input and generates an output file where each expression is annotated with the result.

Note

This expression parsing first appeared in ansar-create. Refer to the associated documentation for related details. Each phase module has been modified to provide a network service, using publish().

Execution Of A Composite Application

A Makefile in the repo provides convenient shorthands. To construct the application use the composite-application target;

$ cd <../composite-applications>
$ source .env/bin/activate
$ make composite-application
pyinstaller --onefile --hidden-import=_cffi_backend --log-level ERROR -p . job-batch.py
pyinstaller --onefile --hidden-import=_cffi_backend --log-level ERROR -p . parse.py
pyinstaller --onefile --hidden-import=_cffi_backend --log-level ERROR -p . codegen.py
pyinstaller --onefile --hidden-import=_cffi_backend --log-level ERROR -p . vm.py
pyinstaller --onefile --hidden-import=_cffi_backend --log-level ERROR -p . `which ansar-group`
ansar create
ansar deploy dist
ansar add job-batch job-batch
ansar add parse parse
ansar add codegen codegen
ansar add vm vm
$

The composite application is located in .ansar-home and is ready to go. Use the run target to initiate batch processing;

$ cd <../composite-applications>
$ source .env/bin/activate
$ make run INPUT_BATCH=batch
ansar update job-batch --input-batch=batch --output-batch=
ansar run --main-role=job-batch
Ack()

This produces the batch.annotated file;

$ cat batch.annotated
1 (1.0)
2 (2.0)
3 (3.0)
4 (4.0)
5 (5.0)
6 (6.0)
7 (7.0)
8 (8.0)
9 (9.0)
0 (0.0)
1 + 2 (3.0)
3 + 4 (7.0)
5 + 6 (11.0)
7 + 8 (15.0)
9 + 0 (9.0)
..

Try the run-debug target for a view of the internal operations.

How Composition Works

Most of what happens in the job-batch.py module is common to every ansar async application. Specific behaviour includes the delegation of establishing network transports to GroupTable and the associated connection manager. For a general introduction to this facility, go to Exchanging Messages With Multiple Servers. In addition, a special feature of the manager is used to simplify the application even further - a session value is passed at creation time;

def job_batch(self, group, settings):
        with open(settings.input_batch) as file:
                batch = [j.rstrip() for j in file]

        running = {}
        for i, j in enumerate(batch):
                a = self.create(Job, group, j)
                running[a] = i

        while running:
                m = self.select(ar.Completed, ar.Stop)
                if isinstance(m, ar.Stop):
                        return ar.Aborted()

                i = running.pop(self.return_address, None)
                if i is None:
                        continue
                if not isinstance(m.value, MachineValue):
                        return ar.Faulted(f'job [{i}] returned unexpected value')

                batch[i] = f'{batch[i]} ({m.value.value})'

        with open(settings.output_batch, 'w') as f:
                for b in batch:
                        f.write(f'{b}\n')

        return ar.Ack()

ar.bind(job_batch)

def create_job_batch(self, settings):
        group = ar.GroupTable(
                parse=ar.CreateFrame(ar.SubscribeToListing, PARSE_SERVICE),
                codegen=ar.CreateFrame(ar.SubscribeToListing, CODEGEN_SERVICE),
                vm=ar.CreateFrame(ar.SubscribeToListing, VM_SERVICE),
        )
        session = ar.CreateFrame(job_batch, settings)

        a = group.create(self, session=session, get_ready=5.0)
        m = self.select(ar.Completed, ar.Stop)
        if isinstance(m, ar.Stop):
                self.send(m, a)
                self.select(ar.Completed, ar.Stop)
                return ar.Aborted()

        return m.value

ar.bind(create_job_batch)
..
..

#
#
if __name__ == '__main__':
        ar.create_node(create_job_batch, factory_settings=factory_settings)

Access to the parse, codegen and vm processes is declared by the named parameters passed to GroupTable. A CreateFrame is then used to capture the creation details for an instance of the job_batch function, and this is passed as the session value when creating the connection manager. When a full complement of connections is detected by the manager, it instantiates the session object passing the fully populated group variable as the first argument. Any additional arguments present in the CreateFrame are pushed to the right. When the session object terminates the manager terminates. In addition, if any of the connections is lost a Stop message is sent to the session.
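The argument handling described above can be imitated in plain Python. The sketch below is a toy analogue and not the ansar-connect implementation. Frame, run_session and the address strings are hypothetical, and the self argument that the framework supplies to job_batch is omitted.

```python
from types import SimpleNamespace

class Frame:
    """Captures a callable and its arguments for later creation (hypothetical)."""
    def __init__(self, target, *args):
        self.target = target
        self.args = args

def run_session(frame, connected):
    """Call the captured target once every connection is available.

    The populated group goes first; the captured arguments follow it.
    """
    group = SimpleNamespace(**connected)
    return frame.target(group, *frame.args)

def session(group, settings):
    # The session only sees addresses, never the connection setup.
    return f"{settings} via {group.parse}, {group.codegen}, {group.vm}"

# The frame is built before any connection exists.
frame = Frame(session, "batch")

# Later, the "manager" has a full complement of (placeholder) addresses.
result = run_session(frame, {"parse": "addr-1", "codegen": "addr-2", "vm": "addr-3"})
```

Deferring the call this way is what lets the session function be written as if its connections had always existed.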

The result of these machinations is the simplest possible scenario for the actual application object (i.e. job_batch). That object knows nothing about establishing connections or the loss of those connections. It is provided with all the address information it needs to perform the workflow and if anything occurs to interrupt that flow, it receives a Stop message. The outcome is a very concise implementation. In fact, job_batch is not even aware it is dealing with objects running in other processes.

The only downside to this arrangement is a lack of feedback to the user if there is a problem acquiring a full complement of connections. Without the get_ready timer value, the connection manager would just wait forever.
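The effect of a readiness timer can be illustrated with standard threading primitives. This is a minimal sketch under the assumption that each connection signals its arrival through an event. The function name and return strings are hypothetical, and it says nothing about how the connection manager is actually built.

```python
import threading
import time

def wait_until_ready(events, get_ready=None):
    """Wait for every event; give up after get_ready seconds (None waits forever)."""
    deadline = None if get_ready is None else time.monotonic() + get_ready
    for name, event in events.items():
        # Share one overall deadline across all of the waits.
        remaining = None if deadline is None else max(0.0, deadline - time.monotonic())
        if not event.wait(remaining):
            return f"timed out waiting for {name}"
    return "ready"

# Two connections have arrived; the third never does.
events = {name: threading.Event() for name in ("parse", "codegen", "vm")}
events["parse"].set()
events["codegen"].set()
outcome = wait_until_ready(events, get_ready=0.05)
```

A sketch like this at least names the missing connection, which is the kind of feedback the user would otherwise lack.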

Application workflow includes the following steps;

  • read the input file to produce a list of lines

  • create an async Job() object for each line

  • wait for the matching number of object completions

  • annotate the list with each completion

  • write the annotated list to the output file
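The bookkeeping in the middle three steps is worth a closer look: completions can arrive in any order, so each running job is mapped back to its position in the batch. The sketch below reproduces that pattern with concurrent.futures instead of async objects. The evaluate() stand-in and the pool size are assumptions for illustration, and the file reading and writing steps are left out.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed

def evaluate(line):
    """Stand-in for a Job: sums the '+'-separated terms of a line."""
    return float(sum(float(term) for term in line.split("+")))

def annotate_batch(lines):
    batch = list(lines)
    with ThreadPoolExecutor(max_workers=4) as pool:
        # Map each running job back to its index in the batch.
        running = {pool.submit(evaluate, line): i for i, line in enumerate(batch)}
        # Completions may arrive in any order; the index restores the order.
        for done in as_completed(running):
            i = running[done]
            batch[i] = f"{batch[i]} ({done.result()})"
    return batch
```

The running dictionary here plays the same role as the one in job_batch, where the key is the address of the Job object rather than a future.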

An FSM-based approach is used to move each expression through the different phases;

class Job(ar.Point, ar.StateMachine):
        def __init__(self, group, line):
                ar.Point.__init__(self)
                ar.StateMachine.__init__(self, INITIAL)
                self.group = group
                self.line = line

def Job_INITIAL_Start(self, message):
        self.send(Bodmas(self.line), self.group.parse)
        return PARSING

def Job_PARSING_AbstractSyntaxTree(self, message):
        self.send(message, self.group.codegen)
        return GENERATING
..
..
def Job_GENERATING_VirtualMachine(self, message):
        self.send(message, self.group.vm)
        return RUNNING
..
..
def Job_RUNNING_MachineValue(self, message):
        self.complete(message)

Each Job() is initialized with the group of proxy addresses and an expression. Processing begins with the sending of the expression to the group.parse address. An AST is expected in response. This continues through the phases until a MachineValue (i.e. a float) is received from the vm process.
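The same progression can be expressed as a transition table keyed on the current state and the type of the received message. The following is a framework-free sketch. The message classes are hypothetical stand-ins for the real ones, and the initial Start message is folded into the arrival of the expression text.

```python
from dataclasses import dataclass

# Hypothetical stand-ins for the messages exchanged between phases.
@dataclass
class Text:
    value: str

@dataclass
class Tree:
    value: str

@dataclass
class Code:
    value: str

@dataclass
class Value:
    value: float

# (current state, message type) -> (phase receiving the message, next state)
TRANSITIONS = {
    ("INITIAL", Text): ("parse", "PARSING"),
    ("PARSING", Tree): ("codegen", "GENERATING"),
    ("GENERATING", Code): ("vm", "RUNNING"),
}

def step(state, message):
    """Return (destination, next_state); a Value in RUNNING completes the job."""
    if state == "RUNNING" and isinstance(message, Value):
        return None, "COMPLETED"
    try:
        return TRANSITIONS[(state, type(message))]
    except KeyError:
        raise ValueError(f"unexpected {type(message).__name__} in {state}") from None

def trace(messages):
    """Feed a sequence of messages through the machine and record each hop."""
    state, hops = "INITIAL", []
    for m in messages:
        destination, state = step(state, m)
        if destination:
            hops.append(destination)
    return state, hops
```

A message that arrives in the wrong state is rejected, which is the property that keeps each job moving through the phases in order.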

The set of Job() objects run concurrently and analysis of the logs shows the general movement of activity from the parse phase through to the vm phase. There are several areas where this could be improved.

An Obvious Improvement

Starting and stopping the language phase processes on every execution of job-batch is a good demonstration of process orchestration, but it is also inefficient.

There is no technical reason that these processes could not remain running at all times. They need to be placed in a separate group and made available for whenever the job-batch might require them. For this rearrangement to work there needs to be communication across the groups. The scope of pub-sub operations needs to be extended such that a subscribe() in the job-batch group can match a publish() in the language process group.

Connecting Groups Of Processes

A special ansar service is provided as a repo. Cloning the repo and following the setup instructions below results in a network facility that is a part of the ansar-connect operational environment.

$ cd <folder-of-operational-services>
$ git clone https://github.com/mr-ansar/ansar-services.git
$ cd ansar-services
$ python3 -m venv .services
$ source .services/bin/activate
$ pip3 install ansar-connect pyinstaller
$ make ansar-host-service
$ make start

Warning

Installation of ansar-host as a platform service places the associated processes under the management of the platform. The ansar-services Makefile provides start, stop and status targets that implement the appropriate platform commands. Use of the ansar process orchestration commands may clash with underlying technologies (e.g. cgroups) and should be limited to read-only commands, e.g. ansar log.

This is the second service in a set of four related services provided by ansar-connect. Every use of the ansar CLI involving run or start automatically injects an instance of the first service - ansar-group. This is the infrastructure process that gathers processes into a group, while the ansar-host service gathers groups together.

There are actually several services provided by ansar-host, using a small group of ports starting at a standard port number (all the services listen on the loopback interface). These guides make use of the simplest option. Other options are available when networking requirements become more demanding. Refer to the later sections that discuss the full ansar-connect architecture.

Asynchronous applications join their group courtesy of the create_node() function. Connection is then discreet and automatic. Connection of groups to ansar-host must be configured, i.e. by default the pub-sub activities within a group are private.
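One way to picture scoping is as a chain of directories, where a group only sees beyond itself if it has been linked to a wider scope. The sketch below is a conceptual toy and makes no claim about how ansar-group or ansar-host work internally. The Directory class and the address string are hypothetical.

```python
class Directory:
    """A toy service directory with an optional link to a wider scope."""
    def __init__(self, scope, parent=None):
        self.scope = scope
        self.parent = parent      # None means the scope stays private
        self.published = {}

    def publish(self, name, address):
        # Listings propagate upward so that wider scopes can match them.
        d = self
        while d is not None:
            d.published.setdefault(name, address)
            d = d.parent

    def subscribe(self, name):
        # Search this scope first, then each wider scope in turn.
        d = self
        while d is not None:
            if name in d.published:
                return d.scope, d.published[name]
            d = d.parent
        return None

host = Directory("HOST")
language = Directory("GROUP", parent=host)   # connected up to the host
language.publish("parse", "placeholder-address")

private = Directory("GROUP")                 # default: not connected
connected = Directory("GROUP", parent=host)  # configured to reach the host

private_match = private.subscribe("parse")
connected_match = connected.subscribe("parse")
```

In this picture the unconnected group finds nothing, while the connected group matches the listing at host scope.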

A Language Processing Network API

A distinct group needs to be created that will be used as the context for running the language processes;

$ cd <../composite-applications>
$ source .env/bin/activate
$ make language-group
ansar network language --connect-scope=GROUP --to-scope=HOST
ansar network language
+ HOST 127.0.0.1:32177
+ GROUP 127.0.0.1:45489
$

Creation and connection of the language group is achieved with the first ansar network command. The arguments express the specific connection, from the ansar-group to ansar-host. The second command presents a view of the current network configuration, verifying the efforts of the first command. It presents a simple rendering of the network environment available to any process within the named group, i.e. there is an instance of ansar-group at the network address 127.0.0.1:45489 and that process is further connected up to the ansar-host at 127.0.0.1:32177. The port number for the GROUP is ephemeral and changes on every run or start of the group (and every use of the ansar network command). The port number for the HOST is fixed.

The following command starts the set of language services. The ansar CLI places them in the background and returns immediately;

$ cd <../composite-applications>
$ source .env/bin/activate
$ make start-language
ansar start --group-name=language parse codegen vm
$

Use ansar status and ansar log to check for a successful start;

$ cd <../composite-applications>
$ source .env/bin/activate
$ ansar status --long-listing --groups
codegen                  <2574151> (2574130) 13.9s
group.language           <2574130> (0) 14.1s
parse                    <2574150> (2574130) 13.9s
vm                       <2574152> (2574130) 13.9s
$ ansar log parse
10:40:36.818 + <0000000a>SocketSelect - Created by <00000001>
10:40:36.818 < <0000000a>SocketSelect - Received Start from <00000001>
10:40:36.818 > <0000000a>SocketSelect - Sent SocketChannel to <00000001>
10:40:36.818 + <0000000b>PubSub[INITIAL] - Created by <00000001>
10:40:36.818 < <0000000b>PubSub[INITIAL] - Received Start from <00000001>
10:40:36.818 + <0000000c>lock_and_hold - Created by <00000001>
10:40:36.818 > <0000000c>lock_and_hold - Sent Ready to <00000001>
10:40:36.819 + <0000000d>node_vector - Created by <00000001>
10:40:36.819 ~ <0000000d>node_vector - Executable "/home/buster/composite-applications/.ansar-home/bin/parse" as node process (2554360)
10:40:36.819 ~ <0000000d>node_vector - Working folder "/"
10:40:36.819 ~ <0000000d>node_vector - Running object "__main__.parse"
10:40:36.819 ~ <0000000d>node_vector - Class threads (3) "subscribed" (3),"published" (2),"connect-to-peer" (1)
10:40:36.819 ~ <0000000d>node_vector - Detected group port 127.0.0.1:39627
10:40:36.819 + <0000000e>ServiceDirectory[INITIAL] - Created by <0000000d>
10:40:36.819 < <0000000e>ServiceDirectory[INITIAL] - Received Start from <0000000d>
10:40:36.819 ~ <0000000e>ServiceDirectory[INITIAL] - Scope of PROCESS
10:40:36.819 ~ <0000000e>ServiceDirectory[INITIAL] - Connecting up to "LocalPort"
10:40:36.819 ~ <0000000e>ServiceDirectory[INITIAL] - Listening below at "HostPort"
10:40:36.819 + <0000000f>ConnectToDirectory[INITIAL] - Created by <0000000e>
10:40:36.819 > <0000000e>ServiceDirectory[INITIAL] - Sent HostPort to <0000000d>
10:40:36.819 < <0000000d>node_vector - Received HostPort from <0000000e>
10:40:36.819 + <00000010>parse - Created by <0000000d>
..

The language processes are ready.

Connecting To Network APIs

Previous use of ansar run has been making discreet use of the default group. This group also needs to be connected to ansar-host;

$ cd <../composite-applications>
$ source .env/bin/activate
$ make default-network
ansar network --connect-scope=GROUP --to-scope=HOST
ansar network --published-services
+ HOST 127.0.0.1:32177
+   + Published services (3)
+   +   + codegen (0)
+   +   + vm (0)
+   +   + parse (0)
+ GROUP 127.0.0.1:45489
$

Adding the --published-services parameter produces a display of the current service information. That information indicates that there are services called codegen, vm and parse available beyond the default group, but within the same host.

Running job-batch within the new arrangement of processes looks like;

$ cd <../composite-applications>
$ source .env/bin/activate
$ make run-language INPUT_BATCH=batch
ansar update job-batch --input-batch=batch --output-batch=
ansar run job-batch --main-role=job-batch
Ack()

Rather than defaulting to all the processes defined within .ansar-home, there is an explicit naming of the batch process. This creates a group of the single named process. The --main-role is still required to ensure that the final output from ansar run is from the named process rather than a list of outputs from all the processes, i.e. a list of 1.

The pub-sub machinery now finds the required services at a new location. All resulting connections are the same as for the previous arrangement, i.e. there is no overhead associated with network messaging between groups versus within a group.

Further Notes

Execution times are substantially reduced by avoiding the initiation and termination time consumed by the language processes. More importantly, this was a demonstration of publish-subscribe networking and the flexibility that it brings. Relocation of significant functionality was effected via a handful of CLI commands, all with zero impact on the application itself (i.e. job_batch()). For those after ultimate performance it’s also possible to locate one or more of those services within the application process. Again, with zero impact on the application.

Relocation of the language processes into a separate group and connecting that group to ansar-host means that it is available to any process within the same host, i.e. given the necessary ansar network commands. The language processes have not only become enduring (they outlive instances of job-batch) they are now also available to others. Multiple copies of job-batch will connect to the single copy of the language processes.

Using a single home folder for this demonstration kept the quantity of materials as low as possible. A group of processes that does not include a central workflow - only network services - is otherwise known as a network API. As an API, the language group should have its own .ansar-home folder, and starting the API group should be integrated into the platform for full availability.