.. _a-networked-laboratory:

A Networked Laboratory
######################

A research laboratory probably has a variety of interesting devices performing experiments that generate data over time. There is probably a location within the laboratory that provides for monitoring and control of those devices. Depending on the scale of the site there may be a handful of devices or hundreds.

From a technical perspective, a networked laboratory is different to a :ref:`composite application ` in that the latter is constrained to a single known network interface and therefore a single known IP address. Networking at a site like a laboratory involves multiple physical computing platforms with unknown network interfaces and unknown configuration of those interfaces. There is likely to be a DHCP server in operation but that is only the beginning of how complex networks can be. Arranging for the correct configuration of all the operational components within a large site can require specific expertise and ongoing diligence.

Download materials supporting this guide and set up the environment with the following commands;

.. code::

    $ cd
    $ git clone https://github.com/mr-ansar/a-networked-laboratory.git
    $ cd a-networked-laboratory
    $ python3 -m venv .env
    $ source .env/bin/activate
    $ pip3 install ansar-connect pyinstaller

This repo may be downloaded to multiple different hosts while working through this guide. The same download and setup commands are required for each host.

Each host should also be enabled for pub-sub networking using the procedure described :ref:`here `. If the associated services are already in place this step can be skipped.

.. _connecting-hosts-on-a-lan:

Connecting Hosts On A LAN
*************************

Pub-sub networking can be extended to operate across a LAN using a procedure similar to the procedure introduced for :ref:`composite-applications`. This procedure arranges for the presence of the third item of ansar infrastructure, i.e. **ansar-lan**.
A specific technique is used for acquiring the host addresses in an automated fashion. This is described in :ref:`design and implementation ` and preserves the goals of pub-sub networking.

For successful operation **ansar-lan** must be installed on a host within the LAN where the pub-sub activity is taking place, but the new service *cannot share a host* with any of the pub-sub software components. There are at least the following alternatives;

* install on a virtual machine (e.g. `VirtualBox `_) running on an existing host,
* install on an existing host reserved for site network services,
* or install on a dedicated host, e.g. a Raspberry Pi.

Wherever **ansar-lan** is installed the host must be configured with a static IP, ideally a standard value that is an element of a :ref:`defined address plan ` for ansar services. That address will be configured into every other host that wishes to be a part of pub-sub networking within this particular LAN, activity that is significantly automated with **ansar-lan** listening at the standard address.

Connection of hosts to **ansar-lan** must be configured, i.e. by default the pub-sub activities within a host are *private*.

Having established a Python environment on a host with a static IP, installation of **ansar-lan** can be achieved with the following commands;

.. code::

    $ cd
    $ git clone https://github.com/mr-ansar/ansar-services.git
    $ cd ansar-services
    $ python3 -m venv .services
    $ source .services/bin/activate
    $ pip3 install ansar-connect pyinstaller
    $ make ansar-lan-service
    $ make start

.. warning::

    Installation of **ansar-lan** as a platform service places the associated processes under the management of the platform. The ``ansar-services`` Makefile provides ``start``, ``stop`` and ``status`` targets that implement the appropriate platform commands. Use of the ansar process orchestration commands may clash with underlying technologies (e.g. cgroups) and should be limited to read-only commands, e.g. :ref:`ansar log `.

There is now a small group of services, collectively known as **ansar-lan**, running on the current host.

Polling Versus Device Events
++++++++++++++++++++++++++++

Network devices come in a variety of flavours. One of the operational distinctions is whether a device sends a stream of edge events or needs to be polled for a series of discrete samples. Due to its asynchronous underpinnings **ansar-connect** is comfortable with either. It also provides a nice feature, referred to as *fan-out*, for establishing connections to groups of services.

Implementation of a small-to-medium laboratory is presented in the following paragraphs. There are distinct device implementations to meet the requirements of the different device types. A main workflow process is included that captures the streams of numeric values generated by the distributed devices. These values are recorded in the process logs. An :ref:`ansar log ` command can be used to retrieve the values and output a table amenable to post-processing.

A small module is used to simulate a physical device (``wandering_float.py``). It provides both a traditional blocking interface (i.e. ``get_wandering()``) and an async interface. Passing an async address at creation time results in a stream of simulated edge events arriving at the specified address. This simulation is used for implementation of both the poll and event based devices. A copy is provided at the end of this section as a quick reference.

.. _periodic-sampling:

Periodic Sampling
*****************

A temperature sensor provides a single request-response facility where any connected client can request the current temperature at any time. There is an expectation that clients are making their requests in a periodic fashion but this responsibility lies entirely with the client;

.. code:: python

    import uuid
    import ansar.connect as ar
    from temperature_if import *
    from wandering_float import *

    #
    #
    def temperature_device(self):
        metric = 'temperature'
        style = 'polled'
        unique = uuid.uuid4()
        name = f'device-{metric}-{style}+{unique}'

        ar.publish(self, name)
        m = self.select(ar.Published, ar.NotPublished, ar.Stop)
        if isinstance(m, ar.NotPublished):
            return m
        elif isinstance(m, ar.Stop):
            return ar.Aborted()

        a = self.create(wandering_float, 50.0, -50.0)

        while True:
            m = self.select(ar.Enquiry, ar.Stop)
            if isinstance(m, ar.Enquiry):
                pass
            else:
                self.send(ar.Stop(), a)
                m = self.select(ar.Completed, ar.Stop)
                ar.retract(self)
                return ar.Aborted()

            # Call the blocking interface to the simulation
            # and reply to the poll.
            temperature = get_wandering()
            self.reply(TemperatureSample(temperature))

    ar.bind(temperature_device)

    #
    #
    if __name__ == '__main__':
        ar.create_node(temperature_device)

Significant points in this code are;

* :func:`~.create_node` is used to initiate the process and create the ``temperature_device`` object,
* construction of a formatted ``name`` that is passed to :func:`~.publish`,
* the first call to :meth:`~.select` to confirm publication,
* creation of the ``wandering_float`` object for simulation of change,
* a message processing loop that accepts requests and terminations,
* the call to ``get_wandering()`` to extract the current reading,
* the call to :meth:`~.Point.reply` to send the reading to the requesting client.

There are zero or more clients connected at any one time. The request-response nature of the messaging means that there is no need for processing of connection-related messages, such as :class:`~.Delivered` and :class:`~.Dropped`. By simply calling the :meth:`~.Point.reply` method the response to each :class:`~.lifecycle.Enquiry` is routed to the correct client. The result is a very compact implementation, though the lack of anything suggesting the handling of inbound connections can seem odd.
A quick scan of related logs will reveal the connection activity happening behind the scenes.

Use of a formatted name for the call to :func:`~.publish` allows for specific processing that will be explained in following paragraphs. Appending a UUID ensures that every instance of this executable publishes a unique name.

Unsolicited Events
******************

A strain gauge provides a stream of edge events, i.e. a sequence of unsolicited messages arriving at unknown and variable intervals. Implementation of this type of device looks like;

.. code:: python

    import uuid
    import ansar.connect as ar
    from strain_if import *
    from wandering_float import *

    #
    #
    def strain_device(self):
        metric = 'strain'
        style = 'async'
        unique = uuid.uuid4()
        name = f'device-{metric}-{style}+{unique}'

        delivered = {}      # Remember clients.

        ar.publish(self, name)
        m = self.select(ar.Published, ar.NotPublished, ar.Stop)
        if isinstance(m, ar.NotPublished):
            return m
        elif isinstance(m, ar.Stop):
            return ar.Aborted()

        a = self.create(wandering_float, 100.0, -100.0, self.address)

        while True:
            m = self.select(Wandering, ar.Delivered, ar.Cleared, ar.Dropped, ar.Stop)
            if isinstance(m, Wandering):
                pass
            elif isinstance(m, ar.Delivered):
                delivered[self.return_address] = m
                continue
            elif isinstance(m, (ar.Cleared, ar.Dropped)):
                # Tolerate an end-of-session for an address that was never recorded.
                delivered.pop(self.return_address, None)
                continue
            else:
                self.send(ar.Stop(), a)
                m = self.select(ar.Completed, ar.Stop)
                ar.retract(self)
                return ar.Aborted()

            # Convert the simulation value to a strain value and
            # broadcast to the current set of clients.
            edge = StrainEdge(m.value)
            for d in delivered.keys():
                self.send(edge, d)

    ar.bind(strain_device)

    #
    #
    if __name__ == '__main__':
        ar.create_node(strain_device)

Significant points in this code are;

* :func:`~.create_node` is used to initiate the process and create the ``strain_device`` object,
* construction of a formatted ``name`` that is passed to :func:`~.publish`,
* the first call to :meth:`~.select` to confirm publication,
* creation of the ``wandering_float`` object for simulation of change,
* a message processing loop that accepts simulation messages, session messages and terminations,
* the broadcast of a ``StrainEdge`` to each connected client.

This device provides a more sophisticated service. Finer details of activity in the physical world are not lost to long polling intervals and long periods of inactivity are appropriately easy on computing resources, such as network bandwidth and disk space.

The service is registered with a formatted name, the same format as that adopted by the polling device. However, values inserted within the name are different and reflect the different natures of the respective devices.

The ``wandering_float`` simulation sends notifications of significant change as they happen. These arrive as ``Wandering`` messages and cause the broadcast of ``StrainEdge`` messages to the current set of clients. Unsolicited messaging requires a list of clients to be notified and this is derived from the connection-related messages, :class:`~.Delivered`, :class:`~.Cleared` and :class:`~.Dropped`. As a consequence implementation of an async device is slightly more involved than the polling device.

.. _device-connection-by-fan-out:

Device Connection By Fan-Out
****************************

There are assumed to be multiple instances of both the polling and event-based devices, spread around the LAN.
Gathering all the associated measurements into a central location would normally require a significant quantity of code to arrange for network connections, message exchange and the logging of sample values. Using a specific capability of pub-sub networking, a minimal implementation can be achieved with a page of concise code;

.. code:: python

    import ansar.connect as ar
    from db_if import *
    from temperature_if import *
    from strain_if import *

    #
    #
    def networked_laboratory(self):
        device_search = r'device-[a-zA-Z]+-[a-zA-Z]+\+.*'

        ar.subscribe(self, device_search)
        m = self.select(ar.Subscribed, ar.Stop)
        if isinstance(m, ar.Stop):
            return ar.Aborted()

        polled = {}
        self.start(ar.T1, 2.0, repeating=True)
        while True:
            m = self.select(ar.T1, TemperatureSample, StrainEdge, ar.Available, ar.Cleared, ar.Dropped, ar.Stop)
            if isinstance(m, ar.T1):
                for a in polled.keys():
                    self.send(ar.Enquiry(), a)
            elif isinstance(m, ar.Available):
                if 'polled' in m.matched_name:
                    polled[self.return_address] = m.matched_name
            elif isinstance(m, (ar.Cleared, ar.Dropped)):
                polled.pop(self.return_address, None)
            elif isinstance(m, TemperatureSample):
                device = self.return_address[-1]
                self.sample(temperature=m.temperature, device=device)
            elif isinstance(m, StrainEdge):
                device = self.return_address[-1]
                self.sample(strain=m.strain, device=device)
            else:
                self.cancel(ar.T1)
                ar.retract(self)
                return ar.Aborted()

    ar.bind(networked_laboratory)

    #
    #
    if __name__ == '__main__':
        ar.create_node(networked_laboratory)

Significant points in this code are;

* :func:`~.create_node` is used to initiate the process and create the ``networked_laboratory`` object,
* declaration of a search pattern ``device_search`` that is passed to :func:`~.subscribe`,
* the first call to :meth:`~.select` to confirm subscription,
* definition of the ``polled`` map that will hold addresses for those devices needing explicit requests,
* initiation of a repeating timer, i.e. instances of ``T1`` will arrive at regular 2.0 second intervals,
* a message processing loop that accepts timers, session messages, device measurements and terminations,
* the saving of received measurements using ``sample()``.

The name passed to the :func:`~.subscribe` function is internally treated as a search string or *regular expression*. Each instance of a service with a matching name causes a connection process, i.e. a single :func:`~.subscribe` can result in multiple, concurrent message sessions. There is 1-to-many *fan-out* from client to services.

By adopting a naming convention for services there is also the ability to embed attributes of the service into its name. Laboratory devices are expected to include the type of value being measured and the model of operation. The latter is used to select new connections for inclusion in the ``polled`` map. All the devices in this map receive the periodic :class:`~.lifecycle.Enquiry` message, prompting those devices to send their measurements in response. An alternative to the formatting of service names might be an initial exchange of device type messages.

Calls to the ``sample()`` method place the named values in the logs. As raw logging entries they look like this;

.. code::

    $ ansar log laboratory
    2023-05-14T21:28:36.201 & <00000010>networked_laboratory - strain=-71.75689944050605:device=79
    2023-05-14T21:28:36.201 < <00000010>networked_laboratory - Received StrainEdge from <0000004f>
    2023-05-14T21:28:36.201 & <00000010>networked_laboratory - strain=-71.46197966572227:device=79
    2023-05-14T21:28:36.448 > <0000000a>SocketSelect - Forward StrainEdge to <0000004d> (from <0000004c>)
    2023-05-14T21:28:36.448 < <00000010>networked_laboratory - Received StrainEdge from <0000004c>
    2023-05-14T21:28:36.449 & <00000010>networked_laboratory - strain=7.539539017493073:device=76
    2023-05-14T21:28:36.698 > <0000000a>SocketSelect - Forward StrainEdge to <0000004d> (from <0000004c>)
    2023-05-14T21:28:36.699 < <00000010>networked_laboratory - Received StrainEdge from <0000004c>
    ..

Adding a sample selection criteria results in a format more amenable to post-processing;

.. code::

    $ ansar log laboratory --sample=time,strain,device
    2023-05-14T21:29:36.290 -40.85305843723194 72
    2023-05-14T21:29:36.290 -72.18458625093172 79
    2023-05-14T21:29:36.539 92.21213120840237 28
    2023-05-14T21:29:36.539 56.87339143612081 25
    2023-05-14T21:29:37.040 -71.71396139352186 79
    2023-05-14T21:29:37.041 50.52315045983453 27
    2023-05-14T21:29:38.290 -71.99501985844601 79
    2023-05-14T21:29:38.793 48.628142543211254 82
    ..

To be included in the output all the named values must appear in the sample key-value list, e.g. ``strain=7.539539017493073:device=76``. The timestamp for each log is made available for selection.

An address value is used as a device id. This produces a small number that is unique to the origin of the sample but doesn't identify the device in a readable or enduring way. Using the UUID from the matched name would be a better approach but would require a bit more code.

Deployment Of A Networked Laboratory
************************************

To deploy the main process on the current host, use the following commands;

.. code::

    $ cd <../a-networked-laboratory>
    $ source .env/bin/activate
    $ make laboratory
    pyinstaller --onefile --hidden-import=_cffi_backend --log-level ERROR -p . networked-laboratory.py
    pyinstaller --onefile --hidden-import=_cffi_backend --log-level ERROR -p . strain_device.py
    pyinstaller --onefile --hidden-import=_cffi_backend --log-level ERROR -p . temperature_device.py
    pyinstaller --onefile --hidden-import=_cffi_backend --log-level ERROR -p . `which ansar-group`
    ansar create
    ansar deploy dist
    ansar add networked-laboratory laboratory
    ansar network --connect-scope=GROUP --to-scope=HOST
    ansar network --connect-scope=HOST --to-scope=LAN
    $ ansar network
    + LAN 192.168.1.176:32177
    + HOST 127.0.0.1:32177
    + GROUP 127.0.0.1:45489

This builds an ansar container of processes (i.e. ``.ansar-home``) and populates it with the single ``networked-laboratory`` process. The final :ref:`ansar network ` command verifies all the connections from the default group to the local instance of **ansar-host** and from that service to **ansar-lan**.

To start the main process use;

.. code::

    $ cd <../a-networked-laboratory>
    $ source .env/bin/activate
    $ make start-laboratory
    $ ansar log laboratory
    02:22:36.247 + <0000000a>SocketSelect - Created by <00000001>
    02:22:36.247 < <0000000a>SocketSelect - Received Start from <00000001>
    02:22:36.247 > <0000000a>SocketSelect - Sent SocketChannel to <00000001>
    02:22:36.247 + <0000000b>PubSub[INITIAL] - Created by <00000001>
    02:22:36.247 < <0000000b>PubSub[INITIAL] - Received Start from <00000001>
    02:22:36.247 + <0000000c>lock_and_hold - Created by <00000001>
    02:22:36.248 > <0000000c>lock_and_hold - Sent Ready to <00000001>
    02:22:36.248 + <0000000d>node_vector - Created by <00000001>
    02:22:36.248 ~ <0000000d>node_vector - Executable "/home/buster/a-networked-laboratory/.ansar-home/bin/networked-laboratory" as node process (762586)
    02:22:36.248 ~ <0000000d>node_vector - Working folder "/"
    02:22:36.248 ~ <0000000d>node_vector - Running object "__main__.networked_laboratory"
    02:22:36.248 ~ <0000000d>node_vector - Class threads (3) "subscribed" (3),"published" (2),"connect-to-peer" (1)
    ..

Deployment Of Laboratory Devices
********************************

To deploy a set of device processes alongside the main process, use the following commands;

.. code::

    $ cd <../a-networked-laboratory>
    $ source .env/bin/activate
    $ make devices
    ansar add strain_device --count=8
    ansar add temperature_device --count=8
    ansar network devices --connect-scope=GROUP --to-scope=HOST
    ansar network devices --connect-scope=HOST --to-scope=LAN
    $ ansar network devices
    + LAN 192.168.1.176:32177
    + HOST 127.0.0.1:32177
    + GROUP 127.0.0.1:45489

This adds a set of ``strain_device`` and ``temperature_device`` processes to the ansar container of processes (i.e. ``.ansar-home``). The final :ref:`ansar network ` command verifies all the connections from the ``devices`` group to the local instance of **ansar-host** and from that service to **ansar-lan**.

To start the device processes and observe their immediate integration;

.. code::

    $ cd <../a-networked-laboratory>
    $ source .env/bin/activate
    $ make start-devices
    $ ansar log laboratory
    ..
    03:30:12.651 < <00000049>SocketProxy[NORMAL] - Received Enquiry from <00000010>
    03:30:12.651 < <0000004c>SocketProxy[NORMAL] - Received Enquiry from <00000010>
    03:30:12.651 < <0000004e>SocketProxy[NORMAL] - Received Enquiry from <00000010>
    03:30:12.651 < <00000051>SocketProxy[NORMAL] - Received Enquiry from <00000010>
    03:30:12.652 < <00000057>SocketProxy[NORMAL] - Received Enquiry from <00000010>
    03:30:12.652 > <0000000a>SocketSelect - Forward TemperatureSample to <00000010> (from <00000018>)
    03:30:12.652 < <00000010>networked_laboratory - Received TemperatureSample from <00000018>
    03:30:12.652 ^ <00000010>networked_laboratory - temperature=2.195881429827529
    03:30:12.652 > <0000000a>SocketSelect - Forward TemperatureSample to <00000010> (from <0000002f>)
    03:30:12.652 > <0000000a>SocketSelect - Forward TemperatureSample to <00000010> (from <00000049>)
    03:30:12.653 > <0000000a>SocketSelect - Forward TemperatureSample to <00000010> (from <0000004c>)
    03:30:12.653 < <00000010>networked_laboratory - Received TemperatureSample from <0000002f>
    03:30:12.653 ^ <00000010>networked_laboratory - temperature=0.3588938138273894
    03:30:12.653 < <00000010>networked_laboratory - Received TemperatureSample from <00000049>
    03:30:12.653 < <00000010>networked_laboratory - Received TemperatureSample from <0000005b>
    03:30:12.653 ^ <00000010>networked_laboratory - temperature=-1.3324234364876795
    03:30:12.788 > <0000000a>SocketSelect - Forward StrainEdge to <00000019> (from <00000017>)
    03:30:12.788 < <00000010>networked_laboratory - Received StrainEdge from <00000017>
    03:30:12.788 ^ <00000010>networked_laboratory - strain=3.6376587452001408
    03:30:13.302 > <0000000a>SocketSelect - Forward StrainEdge to <0000005c> (from <0000005a>)
    03:30:13.302 < <00000010>networked_laboratory - Received StrainEdge from <0000005a>
    03:30:13.302 ^ <00000010>networked_laboratory - strain=0.5940089160811511
    03:30:13.789 > <0000000a>SocketSelect - Forward StrainEdge to <0000002a> (from <00000029>)
    03:30:13.790 < <00000010>networked_laboratory - Received StrainEdge from <00000029>
    03:30:13.790 ^ <00000010>networked_laboratory - strain=-0.7012809873488278
    03:30:14.039 > <0000000a>SocketSelect - Forward StrainEdge to <00000019> (from <00000017>)
    03:30:14.040 < <00000010>networked_laboratory - Received StrainEdge from <00000017>
    03:30:14.040 ^ <00000010>networked_laboratory - strain=3.1584128234835824
    03:30:14.056 > <0000000a>SocketSelect - Forward StrainEdge to <0000005c> (from <0000005a>)
    03:30:14.057 < <00000010>networked_laboratory - Received StrainEdge from <0000005a>
    03:30:14.057 ^ <00000010>networked_laboratory - strain=0.12563950083121467
    $

Device readings from both the strain and temperature devices are finding their way to the ``networked_laboratory`` object. Logs show the outbound :class:`~.lifecycle.Enquiry` messages being sent to the appropriate poll-based devices and the strain readings arriving according to their own rhythms.

To demonstrate the operation of pub-sub networking across the LAN, deploy the devices at an additional host. Download of the repo and initial setup remains the same. Output from the deployment command is longer, reflecting the fact that the main process was not previously deployed;

.. code::

    $ cd
    $ git clone https://github.com/mr-ansar/a-networked-laboratory.git
    $ cd a-networked-laboratory
    $ python3 -m venv .env
    $ source .env/bin/activate
    $ pip3 install ansar-connect pyinstaller
    $ make devices
    pyinstaller --onefile --hidden-import=_cffi_backend --log-level ERROR -p . networked-laboratory.py
    pyinstaller --onefile --hidden-import=_cffi_backend --log-level ERROR -p . strain_device.py
    pyinstaller --onefile --hidden-import=_cffi_backend --log-level ERROR -p . temperature_device.py
    pyinstaller --onefile --hidden-import=_cffi_backend --log-level ERROR -p . `which ansar-group`
    ansar create
    ansar deploy dist
    ansar add strain_device --count=8
    ansar add temperature_device --count=8
    ansar network devices --connect-scope=GROUP --to-scope=HOST
    ansar network devices --connect-scope=HOST --to-scope=LAN
    $ ansar network devices
    + LAN 192.168.1.176:32177
    + HOST 127.0.0.1:32177
    + GROUP 127.0.0.1:45489

Starting the devices remains the same. To observe the immediate integration of the new devices, view the logs from one of the local devices;

.. code::

    $ cd <../a-networked-laboratory>
    $ source .env/bin/activate
    $ make start-devices
    $ ansar log temperature_device-0
    04:03:02.541 + <0000000a>SocketSelect - Created by <00000001>
    04:03:02.541 < <0000000a>SocketSelect - Received Start from <00000001>
    04:03:02.541 > <0000000a>SocketSelect - Sent SocketChannel to <00000001>
    04:03:02.541 + <0000000b>PubSub[INITIAL] - Created by <00000001>
    04:03:02.542 < <0000000b>PubSub[INITIAL] - Received Start from <00000001>
    04:03:02.542 + <0000000c>lock_and_hold - Created by <00000001>
    04:03:02.542 > <0000000c>lock_and_hold - Sent Ready to <00000001>
    04:03:02.543 + <0000000d>node_vector - Created by <00000001>
    04:03:02.543 ~ <0000000d>node_vector - Executable "/home/buster/a-networked-laboratory/.ansar-home/bin/temperature_device" as node process (774827)
    04:03:02.543 ~ <0000000d>node_vector - Working folder "/"
    ..
    04:03:02.545 > <0000000f>ConnectToDirectory[PENDING] - Sent CancelTimer to <00000003>
    04:03:02.545 > <0000000f>ConnectToDirectory[PENDING] - Sent UseAddress to <0000000e>
    04:03:02.545 < <0000000e>ServiceDirectory[NORMAL] - Received ServiceListing from <00000011>
    04:03:02.545 ~ <0000000e>ServiceDirectory[NORMAL] - Added listing "device-temperature-polled-36b3dcaa-f49a-4f6e-b98e-e8fe756813a4"
    04:03:02.545 < <0000000e>ServiceDirectory[NORMAL] - Received UseAddress from <0000000f>
    04:03:02.545 > <0000000e>ServiceDirectory[NORMAL] - Sent PushedDirectory to <00000012>
    ..
    04:03:02.575 > <0000000a>SocketSelect - Forward InboundOverAccepted to <00000011> (from <00000012>)
    04:03:02.575 < <00000011>PublishingAgent[READY] - Received InboundOverAccepted from <00000012>
    04:03:02.575 ~ <00000011>PublishingAgent[READY] - Added peer route [LAN](11,14,59,24@(17, 58, 91, 36)4354f399-7a92-40f9-9770-6043c1e36fa8:device-temperature-polled-36b3dcaa-f49a-4f6e-b98e-e8fe756813a4)
    04:03:02.580 > <0000000a>SocketSelect - Forward OpenLoop to <00000011> (from <00000014>)
    04:03:02.580 < <00000011>PublishingAgent[READY] - Received OpenLoop from <00000014>
    04:03:02.580 + <00000015>PublisherLoop[INITIAL] - Created by <00000011>
    04:03:02.580 < <00000015>PublisherLoop[INITIAL] - Received Start from <00000011>
    04:03:02.580 > <00000015>PublisherLoop[INITIAL] - Sent LoopOpened to <00000014>
    04:03:02.580 > <00000015>PublisherLoop[INITIAL] - Forward Delivered to <00000010> (from <00000014>)
    04:03:02.580 < <00000010>temperature_device - Dropped Delivered from <00000014>
    04:03:02.580 < <00000014>SocketProxy[NORMAL] - Received LoopOpened from <00000015>
    04:03:04.223 > <0000000a>SocketSelect - Forward Enquiry to <00000010> (from <00000014>)
    04:03:04.223 < <00000010>temperature_device - Received Enquiry from <00000014>
    04:03:04.223 > <00000010>temperature_device - Sent TemperatureSample to <00000014>
    04:03:04.223 < <00000014>SocketProxy[NORMAL] - Received TemperatureSample from <00000010>
    04:03:06.045 < <00000013>wandering_float - Received T1 from <00000003>
    04:03:06.045 > <00000013>wandering_float - Sent StartTimer to <00000003>
    04:03:06.226 > <0000000a>SocketSelect - Forward Enquiry to <00000010> (from <00000014>)

These logs show the activity related to the call to :func:`~.publish`, negotiation of a communications path, and the first request-response messaging. The remote ``networked_laboratory`` process is sending the :class:`~.lifecycle.Enquiry` request and the local ``temperature_device`` is responding with a ``TemperatureSample``.
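
The fan-out section notes that the UUID from the matched name would make a more enduring device id than an address value. The following is a minimal sketch of how a name might be taken apart, assuming the ``device-{metric}-{style}+{unique}`` format built by the device code earlier in this guide. It uses only the Python standard library; ``parse_device_name`` is a hypothetical helper and not part of **ansar-connect**.

.. code:: python

    import re
    import uuid

    # Same shape as the names built by the devices, i.e.
    # f'device-{metric}-{style}+{unique}' (assumed format).
    DEVICE_NAME = re.compile(r'device-([a-zA-Z]+)-([a-zA-Z]+)\+(.*)')

    def parse_device_name(name):
        """Return (metric, style, uuid.UUID) or None if the name does not fit."""
        m = DEVICE_NAME.fullmatch(name)
        if m is None:
            return None
        metric, style, unique = m.groups()
        try:
            return metric, style, uuid.UUID(unique)
        except ValueError:
            # The trailing text is not a well-formed UUID.
            return None

In ``networked_laboratory`` the parsed result could be saved when an ``Available`` message arrives, keyed by ``self.return_address``, so that the UUID is on hand when a measurement is passed to ``sample()``.
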

Wandering Float
***************

Implementation for the device simulation appears below. A thread-based async object enters a forever loop that performs the following steps;

* calculates a random time period
* waits for the calculated period or an interruption (e.g. control-c)
* synthesizes an "adjustment" to the current value
* ensures the updated value is within limits
* sends the new value to a client (optional)

Clients may also be calling the ``get_wandering()`` function which provides blocking access to the current simulation value. There may be potential for problems given the lack of multi-access protection. However, with only one party writing to the simulation value and the tutorial nature of this material, this seems good enough.

.. code:: python

    import random
    import ansar.connect as ar

    __all__ = [
        'Wandering',
        'get_wandering',
        'wandering_float',
    ]

    # Edge event.
    class Wandering(object):
        def __init__(self, value=None):
            self.value = value

    ar.bind(Wandering, object_schema={'value': ar.Float8()})

    # The changing value.
    float_value = 0.0

    # Blocking interface for polling clients.
    def get_wandering():
        global float_value
        return float_value

    random.seed()

    DOWN_UP = (-1.0, 1.0)

    # Thread to modify the value over time.
    def wandering_float(self, high, low, client=None):
        global float_value
        seconds = random.random() * 5.0
        self.start(ar.T1, seconds)
        while True:
            m = self.select(ar.T1, ar.Stop)
            if isinstance(m, ar.T1):
                pass
            else:
                return ar.Aborted()     # Control-c.

            # Fabricate a change of float_value.
            s = random.randrange(2)             # Up or down.
            d = random.random() * DOWN_UP[s]    # Delta.
            c = float_value                     # Current.

            t = c + d
            if t < low or t > high:     # Keep it in range.
                float_value = c - d     # Invert.
            else:
                float_value = t

            if client:
                self.send(Wandering(float_value), client)

            seconds = random.random() * 5.0
            self.start(ar.T1, seconds)

    ar.bind(wandering_float)
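
Should the lack of multi-access protection ever become a concern, one option is to guard the shared value with a lock. The following is a minimal sketch under that assumption, using only the standard ``threading`` module. ``GuardedFloat`` and its methods are hypothetical names for illustration and do not appear in ``wandering_float.py``.

.. code:: python

    import threading

    class GuardedFloat:
        """A float shared between one writer thread and any number of readers."""

        def __init__(self, value=0.0):
            self._lock = threading.Lock()
            self._value = value

        def get(self):
            # Blocking read, the counterpart of get_wandering().
            with self._lock:
                return self._value

        def wander(self, delta, low, high):
            # Apply delta as one locked step, inverting it when the
            # result would fall outside the range low..high.
            with self._lock:
                t = self._value + delta
                if t < low or t > high:
                    t = self._value - delta
                self._value = t
                return t

The simulation thread would call ``wander()`` where it currently assigns to ``float_value`` and polling clients would call ``get()``. For a single writer updating a single float this is arguably more than the tutorial needs, which is consistent with the judgement made above.
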