.. _composite-applications:

Composite Applications
**********************

It starts with a simple need to split off a small part of an application into a
separate process. This may be motivated by a need for concurrency, e.g. for
background printing, or it might be a workaround for build problems. In a
large-scale example the application might be the frontend for a pool of job
processors.

In all these examples there is a tightly-coupled group of processes with a main
process that implements the workflow across the group. That process knows the
goals and where the inputs are. Once the goals are reached the process generates
any outputs and terminates. With nothing more to do, the remaining processes are
terminated as well. This abstraction is referred to here as a *composite
application*.

Composite applications would likely be far more common but for three major
difficulties. How is data moved around an application that now spans several
processes, and how is that same collection of processes started and stopped at
the right times? Developing an application that functions as a sub-process
manager is a partial solution, but that can be difficult code to write and it
would need to be maintained as the set of sub-processes evolved. Passing the
responsibility for starting and stopping sub-processes on to users is not a
realistic solution.

The third difficulty is more subtle. If a set of network addresses is needed for
communications within the group, problems begin as soon as two instances of the
composite application are needed on the same host. Lurking within a solution
based on networking is a requirement for the management of port numbers.

Download the materials supporting this guide and set up the environment with the
following commands;

.. code::

    $ cd
    $ git clone https://github.com/mr-ansar/composite-applications.git
    $ cd composite-applications
    $ python3 -m venv .env
    $ source .env/bin/activate
    $ pip3 install ansar-connect pyinstaller ply

A Composite Job Processor
=========================

There is a need for an application that accepts a batch of jobs as input, uses
multi-processing and multi-threading to improve throughput, and generates a
version of the input annotated with the results, as output. Each job involves a
complex text input that must be passed through several phases of processing.
The output from each phase becomes the input for the next phase. Output from the
last phase becomes the annotation for that particular job.

This implementation uses a process for each phase and creates multiple threads
within those phases. There is the understanding that this should move to
multiple-processes-per-phase at some later time, to truly exploit concurrency
under Python.

Each job is an arithmetic expression. The expression must be parsed into an
abstract syntax tree, the tree is the input to a code generator, and the
generated code is submitted to a virtual machine to produce the final result.

.. list-table::
   :widths: 25 75

   * - ``parse.py``
     - *expression text ⟹ abstract syntax tree*
   * - ``codegen.py``
     - *abstract syntax tree ⟹ machine code*
   * - ``vm.py``
     - *machine code ⟹ floating-point*

The main workflow process ``job-batch.py`` accepts a file of expressions as
input and generates an output file where each expression is annotated with the
result.

.. note::
   This expression parsing first appeared in **ansar-create**. Refer to the
   associated documentation for related details.

Each phase module has been modified to provide a network service, using
:func:`~.publish`.
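
To give a sense of what such a modification looks like, below is a minimal
sketch of a publishing phase. This is not the repo's ``parse.py``; the import
alias, the value of ``PARSE_SERVICE``, the ``text`` attribute and the
``text_to_tree`` helper are all assumptions for illustration, and the real
module adds multi-threading;

.. code:: python

    import ansar.connect as ar    # assumed import alias

    PARSE_SERVICE = 'parse'       # placeholder; the repo defines the real name

    def parse(self):
        # Advertise this object under the service name. Matching subscribers
        # are handed an address that delivers their messages here.
        ar.publish(self, PARSE_SERVICE)
        while True:
            m = self.select(Bodmas, ar.Stop)
            if isinstance(m, ar.Stop):
                return ar.Aborted()
            tree = text_to_tree(m.text)           # placeholder parsing helper
            self.send(tree, self.return_address)  # reply to the requesting job

    ar.bind(parse)
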
Execution Of A Composite Application
====================================

A ``Makefile`` in the repo provides convenient shorthands. To construct the
application use the ``composite-application`` target;

.. code::

    $ cd <../composite-applications>
    $ source .env/bin/activate
    $ make composite-application
    pyinstaller --onefile --hidden-import=_cffi_backend --log-level ERROR -p . job-batch.py
    pyinstaller --onefile --hidden-import=_cffi_backend --log-level ERROR -p . parse.py
    pyinstaller --onefile --hidden-import=_cffi_backend --log-level ERROR -p . codegen.py
    pyinstaller --onefile --hidden-import=_cffi_backend --log-level ERROR -p . vm.py
    pyinstaller --onefile --hidden-import=_cffi_backend --log-level ERROR -p . `which ansar-group`
    ansar create
    ansar deploy dist
    ansar add job-batch job-batch
    ansar add parse parse
    ansar add codegen codegen
    ansar add vm vm
    $

The composite application is located in ``.ansar-home`` and is ready to go. Use
the ``run`` target to initiate batch processing;

.. code::

    $ cd <../composite-applications>
    $ source .env/bin/activate
    $ make run INPUT_BATCH=batch
    ansar update job-batch --input-batch=batch --output-batch=
    ansar run --main-role=job-batch
    Ack()

This produces the ``batch.annotated`` file;

.. code::

    $ cat batch.annotated
    1 (1.0)
    2 (2.0)
    3 (3.0)
    4 (4.0)
    5 (5.0)
    6 (6.0)
    7 (7.0)
    8 (8.0)
    9 (9.0)
    0 (0.0)
    1 + 2 (3.0)
    3 + 4 (7.0)
    5 + 6 (11.0)
    7 + 8 (15.0)
    9 + 0 (9.0)

Try the ``run-debug`` target for a view of the internal operations.

How Composition Works
=====================

Most of what happens in the ``job-batch.py`` module is common to every ansar
async application. Specific behaviour includes the delegation of establishing
network transports to :class:`~.grouping.GroupTable` and the associated
connection manager. For a general introduction to this facility, go to
:ref:`exchanging-messages-with-multiple-servers`. In addition, a special
feature of the manager is used to simplify the application even further - a
session value is passed at creation time;

.. code:: python

    def job_batch(self, group, settings):
        # Read the input file to produce a list of expressions.
        with open(settings.input_batch) as file:
            batch = [j.rstrip() for j in file]

        # Create an async Job for each expression.
        running = {}
        for i, j in enumerate(batch):
            a = self.create(Job, group, j)
            running[a] = i

        # Wait for the matching number of completions.
        while running:
            m = self.select(ar.Completed, ar.Stop)
            if isinstance(m, ar.Stop):
                return ar.Aborted()
            i = running.pop(self.return_address, None)
            if i is None:
                continue
            if not isinstance(m.value, MachineValue):
                return ar.Faulted(f'job [{i}] returned unexpected value')
            batch[i] = f'{batch[i]} ({m.value.value})'

        # Write the annotated list to the output file.
        with open(settings.output_batch, 'w') as f:
            for b in batch:
                f.write(f'{b}\n')
        return ar.Ack()

    ar.bind(job_batch)

    def create_job_batch(self, settings):
        group = ar.GroupTable(
            parse=ar.CreateFrame(ar.SubscribeToListing, PARSE_SERVICE),
            codegen=ar.CreateFrame(ar.SubscribeToListing, CODEGEN_SERVICE),
            vm=ar.CreateFrame(ar.SubscribeToListing, VM_SERVICE),
        )
        session = ar.CreateFrame(job_batch, settings)
        a = group.create(self, session=session, get_ready=5.0)

        m = self.select(ar.Completed, ar.Stop)
        if isinstance(m, ar.Stop):
            self.send(m, a)
            self.select(ar.Completed, ar.Stop)
            return ar.Aborted()
        return m.value

    ar.bind(create_job_batch)

    #
    #
    if __name__ == '__main__':
        ar.create_node(create_job_batch, factory_settings=factory_settings)

Access to the ``parse``, ``codegen`` and ``vm`` processes is declared by the
named parameters passed to :class:`~.grouping.GroupTable`.
A :class:`~.socketry.CreateFrame` is then used to capture the creation details
for an instance of the ``job_batch`` function, and this is passed as the
``session`` value when creating the connection manager. When a full complement
of connections is detected by the manager, it instantiates the session object,
passing the fully populated ``group`` variable as the first argument. Any
additional arguments present in the :class:`~.socketry.CreateFrame` are pushed
to the right. When the session object terminates, the manager terminates. In
addition, if any of the connections is lost, a :class:`~.lifecycle.Stop`
message is sent to the session.

The result of these machinations is the simplest possible scenario for the
actual application object (i.e. ``job_batch``). That object knows nothing about
establishing connections or the loss of those connections. It is provided with
all the address information it needs to perform the workflow and, if anything
occurs to interrupt that flow, it receives a :class:`~.lifecycle.Stop` message.
The outcome is a very concise implementation. In fact, ``job_batch`` is not
even aware that it is dealing with objects running in other processes.

The only downside to this arrangement is a lack of feedback to the user when
there is a problem acquiring a full complement of connections. Without the
``get_ready`` timer value, the connection manager would just wait forever.

The application workflow includes the following steps;

* read the input file to produce a list of lines
* create an async ``Job()`` object for each line
* wait for the matching number of object completions
* annotate the list with each completion
* write the annotated list to the output file

An FSM-based approach is used to move each expression through the different
phases;

.. code:: python

    class Job(ar.Point, ar.StateMachine):
        def __init__(self, group, line):
            ar.Point.__init__(self)
            ar.StateMachine.__init__(self, INITIAL)
            self.group = group
            self.line = line

    def Job_INITIAL_Start(self, message):
        # Send the raw expression text to the parse service.
        self.send(Bodmas(self.line), self.group.parse)
        return PARSING

    def Job_PARSING_AbstractSyntaxTree(self, message):
        # Forward the tree to the code generator.
        self.send(message, self.group.codegen)
        return GENERATING

    def Job_GENERATING_VirtualMachine(self, message):
        # Submit the generated code to the virtual machine.
        self.send(message, self.group.vm)
        return RUNNING

    def Job_RUNNING_MachineValue(self, message):
        # Final result; complete with the value.
        self.complete(message)

Each ``Job()`` is initialized with the group of proxy addresses and an
expression. Processing begins with the sending of the expression to the
``group.parse`` address. An AST is expected in response. This continues through
the phases until a ``MachineValue`` (i.e. a ``float``) is received from the
``vm`` process. The set of ``Job()`` objects runs concurrently and analysis of
the logs shows the general movement of activity from the ``parse`` phase through
to the ``vm`` phase. There are several areas where this could be improved.

An Obvious Improvement
======================

Initiating and terminating the language phase processes on every execution of
``job-batch`` is a good demonstration of process orchestration, but it is also
inefficient. There is no technical reason that these processes could not remain
running at all times. They need to be placed in a separate group and made
available for whenever a ``job-batch`` might require them. For this
rearrangement to work there needs to be communication across the groups.
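
Notably, none of this requires a change to the application code. The
subscription declared in ``job-batch.py`` stays exactly as shown earlier;

.. code:: python

    group = ar.GroupTable(
        parse=ar.CreateFrame(ar.SubscribeToListing, PARSE_SERVICE),
        codegen=ar.CreateFrame(ar.SubscribeToListing, CODEGEN_SERVICE),
        vm=ar.CreateFrame(ar.SubscribeToListing, VM_SERVICE),
    )

Whether the matching :func:`~.publish` is found within the same group or in a
separate, long-running group becomes purely a matter of network configuration.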

The scope of pub-sub operations needs to be extended such that a
:func:`~.subscribe` in the ``job-batch`` group can match a :func:`~.publish` in
the language process group.

.. _connecting-groups-of-processes:

Connecting Groups Of Processes
------------------------------

A special ansar service is provided as a repo. Cloning the repo and following
the setup instructions below results in a network facility that is a part of
the **ansar-connect** operational environment.

.. code::

    $ cd
    $ git clone https://github.com/mr-ansar/ansar-services.git
    $ cd ansar-services
    $ python3 -m venv .services
    $ source .services/bin/activate
    $ pip3 install ansar-connect pyinstaller
    $ make ansar-host-service
    $ make start

.. warning::
   Installation of **ansar-host** as a platform service places the associated
   processes under the management of the platform. The ``ansar-services``
   Makefile provides ``start``, ``stop`` and ``status`` targets that implement
   the appropriate platform commands. Use of the ansar process orchestration
   commands may clash with underlying technologies (e.g. cgroups) and should be
   limited to read-only commands, e.g. :ref:`ansar log`.

This is the second service in a set of four related services provided by
**ansar-connect**. Every use of the ansar CLI involving ``run`` or ``start``
automatically injects an instance of the first service - ``ansar-group``. This
is the infrastructure process that gathers processes into a group, while the
**ansar-host** service gathers groups together.

There are actually several services provided by **ansar-host**, using a small
group of ports starting at a standard port number (all the services listen on
the loopback interface). These guides make use of the simplest option. When the
networking requirements become more demanding there are the other options.
Refer to the later sections that discuss the full **ansar-connect**
architecture.

Asynchronous applications join their group courtesy of the
:func:`~.create_node` function. Connection is then discreet and automatic.
Connection of groups to **ansar-host** must be configured, i.e. by default the
pub-sub activities within a group are *private*.

A Language Processing Network API
---------------------------------

A distinct group needs to be created that will be used as the context for
running the language processes;

.. code::

    $ cd <../composite-applications>
    $ source .env/bin/activate
    $ make language-group
    ansar network language --connect-scope=GROUP --to-scope=HOST
    ansar network language
    + HOST 127.0.0.1:32177
    + GROUP 127.0.0.1:45489
    $

Creation and connection of the ``language`` group is achieved with the first
:ref:`ansar network` command. The arguments express the specific connection,
from the ``ansar-group`` to **ansar-host**. The second command presents a view
of the current network configuration, verifying the efforts of the first
command. It presents a simple rendering of the network environment available to
any process within the named group, i.e. there is an instance of
``ansar-group`` at the network address ``127.0.0.1:45489`` and that process is
further connected up to the **ansar-host** at ``127.0.0.1:32177``.

The port number for the ``GROUP`` is ephemeral and changes on every ``run`` or
``start`` of the group (and every use of the :ref:`ansar network` command). The
port number for the ``HOST`` is fixed.

The following command starts the set of language services. The ansar CLI places
them in the background and returns immediately;

.. code::

    $ cd <../composite-applications>
    $ source .env/bin/activate
    $ make start-language
    ansar start --group-name=language parse codegen vm
    $

Use :ref:`ansar status` and :ref:`ansar log` to check for a successful start;

.. code::

    $ cd <../composite-applications>
    $ source .env/bin/activate
    $ ansar status --long-listing --groups
    codegen <2574151> (2574130) 13.9s
    group.language <2574130> (0) 14.1s
    parse <2574150> (2574130) 13.9s
    vm <2574152> (2574130) 13.9s
    $ ansar log parse
    10:40:36.818 + <0000000a>SocketSelect - Created by <00000001>
    10:40:36.818 < <0000000a>SocketSelect - Received Start from <00000001>
    10:40:36.818 > <0000000a>SocketSelect - Sent SocketChannel to <00000001>
    10:40:36.818 + <0000000b>PubSub[INITIAL] - Created by <00000001>
    10:40:36.818 < <0000000b>PubSub[INITIAL] - Received Start from <00000001>
    10:40:36.818 + <0000000c>lock_and_hold - Created by <00000001>
    10:40:36.818 > <0000000c>lock_and_hold - Sent Ready to <00000001>
    10:40:36.819 + <0000000d>node_vector - Created by <00000001>
    10:40:36.819 ~ <0000000d>node_vector - Executable "/home/buster/composite-applications/.ansar-home/bin/parse" as node process (2554360)
    10:40:36.819 ~ <0000000d>node_vector - Working folder "/"
    10:40:36.819 ~ <0000000d>node_vector - Running object "__main__.parse"
    10:40:36.819 ~ <0000000d>node_vector - Class threads (3) "subscribed" (3),"published" (2),"connect-to-peer" (1)
    10:40:36.819 ~ <0000000d>node_vector - Detected group port 127.0.0.1:39627
    10:40:36.819 + <0000000e>ServiceDirectory[INITIAL] - Created by <0000000d>
    10:40:36.819 < <0000000e>ServiceDirectory[INITIAL] - Received Start from <0000000d>
    10:40:36.819 ~ <0000000e>ServiceDirectory[INITIAL] - Scope of PROCESS
    10:40:36.819 ~ <0000000e>ServiceDirectory[INITIAL] - Connecting up to "LocalPort"
    10:40:36.819 ~ <0000000e>ServiceDirectory[INITIAL] - Listening below at "HostPort"
    10:40:36.819 + <0000000f>ConnectToDirectory[INITIAL] - Created by <0000000e>
    10:40:36.819 > <0000000e>ServiceDirectory[INITIAL] - Sent HostPort to <0000000d>
    10:40:36.819 < <0000000d>node_vector - Received HostPort from <0000000e>
    10:40:36.819 + <00000010>parse - Created by <0000000d>
    ..

The language processes are ready.

Connecting To Network APIs
--------------------------

Previous use of :ref:`ansar run` has been making discreet use of the
``default`` group. This group also needs to be connected to **ansar-host**;

.. code::

    $ cd <../composite-applications>
    $ source .env/bin/activate
    $ make default-network
    ansar network --connect-scope=GROUP --to-scope=HOST
    ansar network --published-services
    + HOST 127.0.0.1:32177
    + + Published services (3)
    + + + codegen (0)
    + + + vm (0)
    + + + parse (0)
    + GROUP 127.0.0.1:45489
    $

Adding the ``--published-services`` parameter produces a display of the current
service information. That information indicates that there are services called
``codegen``, ``vm`` and ``parse`` available beyond the ``default`` group, but
within the same host.

Running ``job-batch`` within the new arrangement of processes looks like;

.. code::

    $ cd <../composite-applications>
    $ source .env/bin/activate
    $ make run-language INPUT_BATCH=batch
    ansar update job-batch --input-batch=batch --output-batch=
    ansar run job-batch --main-role=job-batch
    Ack()

Rather than defaulting to all the processes defined within ``.ansar-home``,
there is an explicit naming of the batch process. This creates a group of the
single named process.
The ``--main-role`` is still required to ensure that the final output from
:ref:`ansar run` is from the named process, rather than a list of outputs from
all the processes, i.e. a list of one. The pub-sub machinery now finds the
required services at a new location. All resulting connections are the same as
for the previous arrangement, i.e. there is no overhead associated with network
messaging between groups versus within a group.

Further Notes
-------------

Execution times are substantially reduced by avoiding the initiation and
termination time consumed by the language processes. More importantly, this was
a demonstration of publish-subscribe networking and the flexibility that it
brings. Relocation of significant functionality was effected with a handful of
CLI commands, all with zero impact on the application itself (i.e.
``job_batch()``). For those after ultimate performance it is also possible to
locate one or more of those services within the application process. Again,
with zero impact on the application.

Relocation of the language processes into a separate group, and connection of
that group to **ansar-host**, means that the language facility is available to
any process within the same host, given the necessary :ref:`ansar network`
commands. The language processes have not only become enduring (they outlive
instances of ``job-batch``), they are now also available to others. Multiple
copies of ``job-batch`` will connect to the single copy of the language
processes.

Using a single home folder for this demonstration kept the quantity of
materials as low as possible. A group of processes that does not include a
central workflow - only network services - is otherwise known as a network API.
As an API, this should have its own ``.ansar-home`` folder, and starting the
API group should be integrated into the platform for full availability.
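
As a sketch of that direction, using only commands that appear earlier in this
guide (the ``language-api`` folder name is arbitrary and the exact sequence for
a production installation may differ);

.. code::

    $ mkdir language-api && cd language-api
    $ ansar create
    $ ansar deploy <../composite-applications/dist>
    $ ansar add parse parse
    $ ansar add codegen codegen
    $ ansar add vm vm
    $ ansar network language --connect-scope=GROUP --to-scope=HOST
    $ ansar start --group-name=language parse codegen vm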