A Networked Laboratory
A research laboratory probably has a variety of interesting devices performing experiments that generate data over time. There is probably a location within the laboratory that provides for monitoring and control of those devices. Depending on the scale of the site there may be a handful of devices or hundreds.
From a technical perspective, a networked laboratory is different to a composite application in that the latter is constrained to a single known network interface and therefore a single known IP address. Networking at a site like a laboratory involves multiple physical computing platforms with unknown network interfaces and unknown configuration of those interfaces. There is likely to be a DHCP server in operation but that is only the beginning of how complex networks can be. Arranging for the correct configuration of all the operational components within a large site can require specific expertise and ongoing diligence.
Download materials supporting this guide and setup the environment with the following commands;
$ cd <folder-of-repos>
$ git clone https://github.com/mr-ansar/a-networked-laboratory.git
$ cd a-networked-laboratory
$ python3 -m venv .env
$ source .env/bin/activate
$ pip3 install ansar-connect pyinstaller
This repo may be downloaded to multiple different hosts during the performance of this guide. The same download and setup commands are required for each host. Each host should also be enabled for pub-sub networking using the procedure described here. If the associated services are already in place this step can be skipped.
Connecting Hosts On A LAN
Pub-sub networking can be extended to operate across a LAN using a procedure similar to the procedure introduced for Composite Applications. This procedure arranges for the presence of the third item of ansar infra-structure, i.e. ansar-lan.
A specific technique is used for acquiring the host addresses in an automated fashion. This is described in design and implementation and preserves the goals of pub-sub networking. For successful operation ansar-lan must be installed on a host within the LAN where the pub-sub activity is taking place, but the new service cannot share a host with any of the pub-sub software components. There are at least the following alternatives;
install on a virtual machine (e.g. VirtualBox) running on an existing host,
install on an existing host reserved for site network services,
or install on a dedicated host, e.g. a Raspberry Pi.
Wherever ansar-lan is installed the host must be configured with a static IP, ideally a standard value that is an element of a defined address plan for ansar services. That address will be configured into every other host that wishes to be a part of pub-sub networking within this particular LAN, activity that is significantly automated with ansar-lan listening at the standard address. Connection of hosts to ansar-lan must be configured, i.e. by default the pub-sub activities within a host are private.
Having established a Python environment on a host with a static IP, installation of ansar-lan can be achieved with the following commands;
$ cd <folder-of-operational-services>
$ git clone https://github.com/mr-ansar/ansar-services.git
$ cd ansar-services
$ python3 -m venv .services
$ source .services/bin/activate
$ pip3 install ansar-connect pyinstaller
$ make ansar-lan-service
$ make start
Warning
Installation of ansar-lan as a platform service places the associated processes under
the management of the platform. The ansar-services Makefile provides start, stop and
status targets that implement the appropriate platform commands. Use of the ansar
process orchestration commands may clash with underlying technologies (e.g. cgroups) and
should be limited to read-only commands, e.g. ansar log.
There is now a small group of services, collectively known as ansar-lan, running on the current host.
Polling Versus Device Events
Network devices come in a variety of flavours. One of the operational distinctions is whether a device sends a stream of edge events or does the device need to be polled for a series of discrete samples. Due to its asynchronous underpinnings ansar-connect is comfortable with either. It also provides a nice feature - referred to as fan-out - for establishing connections to groups of services.
Implementation of a small-to-medium laboratory is presented in the following paragraphs. There are distinct device implementations to meet the requirements of the different device types. A main workflow process is included that captures the streams of numeric values generated by the distributed devices. These values are recorded in the process logs. An ansar log command can be used to retrieve the values and output a table amenable to post-processing.
A small module is used to simulate a physical device (wandering_float.py). It provides both a
traditional blocking interface (i.e. get_wandering()) and an async interface. Passing an async address
at creation time results in a stream of simulated edge events arriving at the specified address. This
simulation is used for implementation of both the poll and event based devices. A copy is provided at
the end of this section as a quick reference.
Periodic Sampling
A temperature sensor provides a single request-response facility where any connected client can request the current temperature at any time. There is an expectation that clients are making their requests in a periodic fashion but this responsibility lies entirely with the client;
import uuid
import ansar.connect as ar
from temperature_if import *
from wandering_float import *
#
#
def temperature_device(self):
metric = 'temperature'
style = 'polled'
unique = uuid.uuid4()
name = f'device-{metric}-{style}+{unique}'
ar.publish(self, name)
m = self.select(ar.Published, ar.NotPublished, ar.Stop)
if isinstance(m, ar.NotPublished):
return m
elif isinstance(m, ar.Stop):
return ar.Aborted()
a = self.create(wandering_float, 50.0, -50.0)
while True:
m = self.select(ar.Enquiry, ar.Stop)
if isinstance(m, ar.Enquiry):
pass
else:
self.send(ar.Stop(), a)
m = self.select(ar.Completed, ar.Stop)
ar.retract(self)
return ar.Aborted()
# Call the blocking interface to the simulation
# and reply to the poll.
temperature = get_wandering()
self.reply(TemperatureSample(temperature))
ar.bind(temperature_device)
#
#
if __name__ == '__main__':
ar.create_node(temperature_device)
Significant points in this code are;
create_node()is used to initiate the process and create thetemperature_deviceobject,construction of a formatted
namethat is passed topublish(),the first call to
select()to confirm publication,creation of the
wandering_floatobject for simulation of change,a message processing loop that accepts requests and terminations,
the call to
get_wandering()to extract the current reading,the call to
reply()to send the reading to the requesting client.
There are zero or more clients connected at any one time. The request-response nature of the
messaging means that there is no need for processing of connection-related messages, such
as Delivered and Dropped. By simply calling the reply() method the response to
each Enquiry is routed to the correct client. The result is a very compact implementation,
though the lack of anything suggesting the handling of inbound connections can seem
odd. A quick scan of related logs will reveal the connection activity happening behind
the scenes.
Use of a formatted name for the call to publish() allows for specific processing that will
be explained in following paragraphs. Appending a UUID ensures that every instance of this
executable publishes a unique name.
Unsolicited Events
A strain guage provides a stream of edge events, i.e. a sequence of unsolicited messages arriving at unknown and variable intervals. Implementation of this type of device looks like;
import uuid
import ansar.connect as ar
from strain_if import *
from wandering_float import *
#
#
def strain_device(self):
metric = 'strain'
style = 'async'
unique = uuid.uuid4()
name = f'device-{metric}-{style}+{unique}'
delivered = {} # Remember clients.
ar.publish(self, name)
m = self.select(ar.Published, ar.NotPublished, ar.Stop)
if isinstance(m, ar.NotPublished):
return m
elif isinstance(m, ar.Stop):
return ar.Aborted()
a = self.create(wandering_float, 100.0, -100.0, self.address)
while True:
m = self.select(Wandering, ar.Delivered, ar.Cleared, ar.Dropped, ar.Stop)
if isinstance(m, Wandering):
pass
elif isinstance(m, ar.Delivered):
delivered[self.return_address] = m
continue
elif isinstance(m, (ar.Cleared, ar.Dropped)):
delivered.pop(self.return_address)
continue
else:
self.send(ar.Stop(), a)
m = self.select(ar.Completed, ar.Stop)
ar.retract(self)
return ar.Aborted()
# Convert the simulation value to a strain value and
# broadcast to the current set of clients.
edge = StrainEdge(m.value)
for d in delivered.keys():
self.send(edge, d)
ar.bind(strain_device)
#
#
if __name__ == '__main__':
ar.create_node(strain_device)
Significant points in this code are;
create_node()is used to initiate the process and create thestrain_deviceobject,construction of a formatted
namethat is passed topublish(),the first call to
select()to confirm publication,creation of the
wandering_floatobject for simulation of change,a message processing loop that accepts simulation messages, session messages and terminations,
the broadcast of a
StrainEdgeto each connected client.
This device provides a more sophisticated service. Finer details of activity in the physical world are not lost to long polling intervals and long periods of inactivity are appropriately easy on computing resources, such as network bandwidth and disk space.
The service is registered with a formatted name, the same format as that adopted by the polling device. However, values inserted within the name are different and reflect the different natures of the respective devices.
The wandering_float simulation sends notifications of significant change as they
happen. These arrive as Wandering messages and cause the broadcast of StrainEdge
messages to the current set of clients.
Unsolicited messaging requires a list of clients to be notified and this is derived from
the connection-related messages, Delivered, Cleared and Dropped.
As a consequence implementation of an async device is slightly more involved than the polling device.
Device Connection By Fan-Out
There are assumed to be multiple instances of both the polling and event-based devices, spread around the LAN. Gathering all the associated measurements into a central location would normally require a significant quantity of code to arrange for network connections, message exchange and the logging of sample values. Using a specific capability of pub-sub networking, a minimal implementation can be achieved with a page of concise code;
import ansar.connect as ar
from db_if import *
from temperature_if import *
from strain_if import *
#
#
def networked_laboratory(self):
device_search = r'device-[a-zA-Z]+-[a-zA-Z]+\+.*'
ar.subscribe(self, device_search)
m = self.select(ar.Subscribed, ar.Stop)
if isinstance(m, ar.Stop):
return ar.Aborted()
polled = {}
self.start(ar.T1, 2.0, repeating=True)
while True:
m = self.select(ar.T1,
TemperatureSample, StrainEdge,
ar.Available, ar.Cleared, ar.Dropped,
ar.Stop)
if isinstance(m, ar.T1):
for a in polled.keys():
self.send(ar.Enquiry(), a)
elif isinstance(m, ar.Available):
if 'polled' in m.matched_name:
polled[self.return_address] = m.matched_name
elif isinstance(m, (ar.Cleared, ar.Dropped)):
polled.pop(self.return_address, None)
elif isinstance(m, TemperatureSample):
device = self.return_address[-1]
self.sample(temperature=m.temperature, device=device)
elif isinstance(m, StrainEdge):
device = self.return_address[-1]
self.sample(strain=m.strain, device=device)
else:
self.cancel(ar.T1)
ar.retract(self)
return ar.Aborted()
ar.bind(networked_laboratory)
#
#
if __name__ == '__main__':
ar.create_node(networked_laboratory)
Significant points in this code are;
create_node()is used to initiate the process and create thenetworked_laboratoryobject,declaration of a search pattern
device_searchthat is passed tosubscribe(),the first call to
select()to confirm subscription,definition of the
polledmap that will hold addresses for those devices needing explicit requests,initiation of a repeating timer, i.e. instances of
T1will arrive at regular 2.0 second intervals,a message processing loop that accepts timers, session messages, device measurements and terminations,
the saving of received measurements using sample().
The name passed to the subscribe() function is internally treated as a search string or regular expression.
Each instance of a service with a matching name causes a connection process, i.e. a single subscribe() can
result in multiple, concurrent message sessions. There is 1-to-many fan-out from client to services.
By adopting a naming convention for services there is also the ability to embed attributes of the service
into its name. Laboratory devices are expected to include the type of value being measured and the model
of operation. The latter is used to select new connections for inclusion in the polled map. All the
devices in this map receive the periodic Enquiry message, prompting those devices to send their
measurements in response. An alternative to the formatting of service names might be an initial exchange
of device type messages.
Calls to the sample() method places the named values in the logs. As raw logging entries they look like this;
$ ansar log laboratory
2023-05-14T21:28:36.201 & <00000010>networked_laboratory - strain=-71.75689944050605:device=79
2023-05-14T21:28:36.201 < <00000010>networked_laboratory - Received StrainEdge from <0000004f>
2023-05-14T21:28:36.201 & <00000010>networked_laboratory - strain=-71.46197966572227:device=79
2023-05-14T21:28:36.448 > <0000000a>SocketSelect - Forward StrainEdge to <0000004d> (from <0000004c>)
2023-05-14T21:28:36.448 < <00000010>networked_laboratory - Received StrainEdge from <0000004c>
2023-05-14T21:28:36.449 & <00000010>networked_laboratory - strain=7.539539017493073:device=76
2023-05-14T21:28:36.698 > <0000000a>SocketSelect - Forward StrainEdge to <0000004d> (from <0000004c>)
2023-05-14T21:28:36.699 < <00000010>networked_laboratory - Received StrainEdge from <0000004c>
..
Adding a sample selection critera results in a format more amenable to post-processing;
$ ansar log laboratory --sample=time,strain,device
2023-05-14T21:29:36.290 -40.85305843723194 72
2023-05-14T21:29:36.290 -72.18458625093172 79
2023-05-14T21:29:36.539 92.21213120840237 28
2023-05-14T21:29:36.539 56.87339143612081 25
2023-05-14T21:29:37.040 -71.71396139352186 79
2023-05-14T21:29:37.041 50.52315045983453 27
2023-05-14T21:29:38.290 -71.99501985844601 79
2023-05-14T21:29:38.793 48.628142543211254 82
..
To be included in the output all the named values must appear in the sample key-value list, e.g.
strain=7.539539017493073:device=76. The timestamp for each log is made available for selection.
An address value is used as a device id. This produces a small number that is unique to the origin of the sample but doesnt identify the device in a readable or enduring way. Using the UUID from the matched name would be a better approach but would require a bit more code.
Deployment Of A Networked Laboratory
To deploy the main process on the current host, use the following commands;
$ cd <../a-networked-laboratory>
$ source .env/bin/activate
$ make laboratory
pyinstaller --onefile --hidden-import=_cffi_backend --log-level ERROR -p . networked-laboratory.py
pyinstaller --onefile --hidden-import=_cffi_backend --log-level ERROR -p . strain_device.py
pyinstaller --onefile --hidden-import=_cffi_backend --log-level ERROR -p . temperature_device.py
pyinstaller --onefile --hidden-import=_cffi_backend --log-level ERROR -p . `which ansar-group`
ansar create
ansar deploy dist
ansar add networked-laboratory laboratory
ansar network --connect-scope=GROUP --to-scope=HOST
ansar network --connect-scope=HOST --to-scope=LAN
$ ansar network
+ LAN 192.168.1.176:32177
+ HOST 127.0.0.1:32177
+ GROUP 127.0.0.1:45489
This builds an ansar container of processes (i.e. .ansar-home) and populates it with the single
networked-laboratory process. The final ansar network command verifies all the connections from
the default group to the local instance of ansar-host and from that service to ansar-lan.
To start the main process use;
$ cd <../a-networked-laboratory>
$ source .env/bin/activate
$ make start-laboratory
$ ansar log laboratory
02:22:36.247 + <0000000a>SocketSelect - Created by <00000001>
02:22:36.247 < <0000000a>SocketSelect - Received Start from <00000001>
02:22:36.247 > <0000000a>SocketSelect - Sent SocketChannel to <00000001>
02:22:36.247 + <0000000b>PubSub[INITIAL] - Created by <00000001>
02:22:36.247 < <0000000b>PubSub[INITIAL] - Received Start from <00000001>
02:22:36.247 + <0000000c>lock_and_hold - Created by <00000001>
02:22:36.248 > <0000000c>lock_and_hold - Sent Ready to <00000001>
02:22:36.248 + <0000000d>node_vector - Created by <00000001>
02:22:36.248 ~ <0000000d>node_vector - Executable "/home/buster/a-networked-laboratory/.ansar-home/bin/networked-laboratory" as node process (762586)
02:22:36.248 ~ <0000000d>node_vector - Working folder "/"
02:22:36.248 ~ <0000000d>node_vector - Running object "__main__.networked_laboratory"
02:22:36.248 ~ <0000000d>node_vector - Class threads (3) "subscribed" (3),"published" (2),"connect-to-peer" (1)
..
Deployment Of Laboratory Devices
To deploy a set of device processes alonside the main process, use the following commands;
$ cd <../a-networked-laboratory>
$ source .env/bin/activate
$ make devices
ansar add strain_device --count=8
ansar add temperature_device --count=8
ansar network devices --connect-scope=GROUP --to-scope=HOST
ansar network devices --connect-scope=HOST --to-scope=LAN
$ ansar network devices
+ LAN 192.168.1.176:32177
+ HOST 127.0.0.1:32177
+ GROUP 127.0.0.1:45489
This adds a set of strain_device and temperature_device processes to the ansar container of
processes (i.e. .ansar-home). The final ansar network command verifies all the connections from
the devices group to the local instance of ansar-host and from that service to ansar-lan.
To start the device processes and observe their immediate integration;
$ cd <../a-networked-laboratory>
$ source .env/bin/activate
$ make start-devices
$ ansar log laboratory
..
03:30:12.651 < <00000049>SocketProxy[NORMAL] - Received Enquiry from <00000010>
03:30:12.651 < <0000004c>SocketProxy[NORMAL] - Received Enquiry from <00000010>
03:30:12.651 < <0000004e>SocketProxy[NORMAL] - Received Enquiry from <00000010>
03:30:12.651 < <00000051>SocketProxy[NORMAL] - Received Enquiry from <00000010>
03:30:12.652 < <00000057>SocketProxy[NORMAL] - Received Enquiry from <00000010>
03:30:12.652 > <0000000a>SocketSelect - Forward TemperatureSample to <00000010> (from <00000018>)
03:30:12.652 < <00000010>networked_laboratory - Received TemperatureSample from <00000018>
03:30:12.652 ^ <00000010>networked_laboratory - temperature=2.195881429827529
03:30:12.652 > <0000000a>SocketSelect - Forward TemperatureSample to <00000010> (from <0000002f>)
03:30:12.652 > <0000000a>SocketSelect - Forward TemperatureSample to <00000010> (from <00000049>)
03:30:12.653 > <0000000a>SocketSelect - Forward TemperatureSample to <00000010> (from <0000004c>)
03:30:12.653 < <00000010>networked_laboratory - Received TemperatureSample from <0000002f>
03:30:12.653 ^ <00000010>networked_laboratory - temperature=0.3588938138273894
03:30:12.653 < <00000010>networked_laboratory - Received TemperatureSample from <00000049>
03:30:12.653 < <00000010>networked_laboratory - Received TemperatureSample from <0000005b>
03:30:12.653 ^ <00000010>networked_laboratory - temperature=-1.3324234364876795
03:30:12.788 > <0000000a>SocketSelect - Forward StrainEdge to <00000019> (from <00000017>)
03:30:12.788 < <00000010>networked_laboratory - Received StrainEdge from <00000017>
03:30:12.788 ^ <00000010>networked_laboratory - strain=3.6376587452001408
03:30:13.302 > <0000000a>SocketSelect - Forward StrainEdge to <0000005c> (from <0000005a>)
03:30:13.302 < <00000010>networked_laboratory - Received StrainEdge from <0000005a>
03:30:13.302 ^ <00000010>networked_laboratory - strain=0.5940089160811511
03:30:13.789 > <0000000a>SocketSelect - Forward StrainEdge to <0000002a> (from <00000029>)
03:30:13.790 < <00000010>networked_laboratory - Received StrainEdge from <00000029>
03:30:13.790 ^ <00000010>networked_laboratory - strain=-0.7012809873488278
03:30:14.039 > <0000000a>SocketSelect - Forward StrainEdge to <00000019> (from <00000017>)
03:30:14.040 < <00000010>networked_laboratory - Received StrainEdge from <00000017>
03:30:14.040 ^ <00000010>networked_laboratory - strain=3.1584128234835824
03:30:14.056 > <0000000a>SocketSelect - Forward StrainEdge to <0000005c> (from <0000005a>)
03:30:14.057 < <00000010>networked_laboratory - Received StrainEdge from <0000005a>
03:30:14.057 ^ <00000010>networked_laboratory - strain=0.12563950083121467
$
Device readings from both the strain and temperature devices are finding their way to the networked_laboratory
object. Logs show the outbound Enquiry messages being sent to the appropriate poll-based
devices and the strain readings arriving according to their own rhythms.
To demonstrate the operation of pub-sub networking across the LAN, deploy the devices at an additional host. Download of the repo and initial setup remains the same. Output from the deployment command is longer reflecting the fact that the main process was not previously deployed;
$ cd <folder-on-additional-host>
$ git clone https://github.com/mr-ansar/a-networked-laboratory.git
$ cd a-networked-laboratory
$ python3 -m venv .env
$ source .env/bin/activate
$ pip3 install ansar-connect pyinstaller
$ make devices
pyinstaller --onefile --hidden-import=_cffi_backend --log-level ERROR -p . networked-laboratory.py
pyinstaller --onefile --hidden-import=_cffi_backend --log-level ERROR -p . strain_device.py
pyinstaller --onefile --hidden-import=_cffi_backend --log-level ERROR -p . temperature_device.py
pyinstaller --onefile --hidden-import=_cffi_backend --log-level ERROR -p . `which ansar-group`
ansar create
ansar deploy dist
ansar add strain_device --count=8
ansar add temperature_device --count=8
ansar network devices --connect-scope=GROUP --to-scope=HOST
ansar network devices --connect-scope=HOST --to-scope=LAN
$ ansar network devices
+ LAN 192.168.1.176:32177
+ HOST 127.0.0.1:32177
+ GROUP 127.0.0.1:45489
Starting the devices remains the same. To observe the immediate integration of the new devices, view the logs from one of the local devices;
$ cd <../a-networked-laboratory>
$ source .env/bin/activate
$ make start-devices
$ ansar log temperature_device-0
04:03:02.541 + <0000000a>SocketSelect - Created by <00000001>
04:03:02.541 < <0000000a>SocketSelect - Received Start from <00000001>
04:03:02.541 > <0000000a>SocketSelect - Sent SocketChannel to <00000001>
04:03:02.541 + <0000000b>PubSub[INITIAL] - Created by <00000001>
04:03:02.542 < <0000000b>PubSub[INITIAL] - Received Start from <00000001>
04:03:02.542 + <0000000c>lock_and_hold - Created by <00000001>
04:03:02.542 > <0000000c>lock_and_hold - Sent Ready to <00000001>
04:03:02.543 + <0000000d>node_vector - Created by <00000001>
04:03:02.543 ~ <0000000d>node_vector - Executable "/home/buster/a-networked-laboratory/.ansar-home/bin/temperature_device" as node process (774827)
04:03:02.543 ~ <0000000d>node_vector - Working folder "/"
..
04:03:02.545 > <0000000f>ConnectToDirectory[PENDING] - Sent CancelTimer to <00000003>
04:03:02.545 > <0000000f>ConnectToDirectory[PENDING] - Sent UseAddress to <0000000e>
04:03:02.545 < <0000000e>ServiceDirectory[NORMAL] - Received ServiceListing from <00000011>
04:03:02.545 ~ <0000000e>ServiceDirectory[NORMAL] - Added listing "device-temperature-polled-36b3dcaa-f49a-4f6e-b98e-e8fe756813a4"
04:03:02.545 < <0000000e>ServiceDirectory[NORMAL] - Received UseAddress from <0000000f>
04:03:02.545 > <0000000e>ServiceDirectory[NORMAL] - Sent PushedDirectory to <00000012>
..
04:03:02.575 > <0000000a>SocketSelect - Forward InboundOverAccepted to <00000011> (from <00000012>)
04:03:02.575 < <00000011>PublishingAgent[READY] - Received InboundOverAccepted from <00000012>
04:03:02.575 ~ <00000011>PublishingAgent[READY] - Added peer route [LAN](11,14,59,24@(17, 58, 91, 36)4354f399-7a92-40f9-9770-6043c1e36fa8:device-temperature-polled-36b3dcaa-f49a-4f6e-b98e-e8fe756813a4)
04:03:02.580 > <0000000a>SocketSelect - Forward OpenLoop to <00000011> (from <00000014>)
04:03:02.580 < <00000011>PublishingAgent[READY] - Received OpenLoop from <00000014>
04:03:02.580 + <00000015>PublisherLoop[INITIAL] - Created by <00000011>
04:03:02.580 < <00000015>PublisherLoop[INITIAL] - Received Start from <00000011>
04:03:02.580 > <00000015>PublisherLoop[INITIAL] - Sent LoopOpened to <00000014>
04:03:02.580 > <00000015>PublisherLoop[INITIAL] - Forward Delivered to <00000010> (from <00000014>)
04:03:02.580 < <00000010>temperature_device - Dropped Delivered from <00000014>
04:03:02.580 < <00000014>SocketProxy[NORMAL] - Received LoopOpened from <00000015>
04:03:04.223 > <0000000a>SocketSelect - Forward Enquiry to <00000010> (from <00000014>)
04:03:04.223 < <00000010>temperature_device - Received Enquiry from <00000014>
04:03:04.223 > <00000010>temperature_device - Sent TemperatureSample to <00000014>
04:03:04.223 < <00000014>SocketProxy[NORMAL] - Received TemperatureSample from <00000010>
04:03:06.045 < <00000013>wandering_float - Received T1 from <00000003>
04:03:06.045 > <00000013>wandering_float - Sent StartTimer to <00000003>
04:03:06.226 > <0000000a>SocketSelect - Forward Enquiry to <00000010> (from <00000014>)
These logs show the activity related to the call to publish(), negotiation of a communications path, and the first
request-response messaging. The remote networked_laboratory process is sending the Enquiry
request and the local temperature_device is responding with a TemperatureSample.
Wandering Float
Implementation for the device simulation appears below. A thread-based async object enters a forever loop that performs the following steps;
calculates a random time period
waits for the calculated period or an interruption (e.g. control-c)
synthesizes an “adjustment” to the current value
ensures the updated value is within limits
sends the new value to a client (optional)
Clients may also be calling the get_wandering() function which provides blocking access to the current
simulation value. There may be potential for problems given the lack of multi-access protection. However, with
only one party writing to the simulation value and the tutorial nature of this material, this seems
good enough.
import random
import ansar.connect as ar
__all__ = [
'Wandering',
'get_wandering',
'wandering_float',
]
# Edge event.
class Wandering(object):
def __init__(self, value=None):
self.value = value
ar.bind(Wandering, object_schema={'value': ar.Float8()})
# The changing value.
float_value = 0.0
# Blocking interface for polling clients.
def get_wandering():
global float_value
return float_value
random.seed()
DOWN_UP= (-1.0, 1.0)
# Thread to modify the value over time.
def wandering_float(self, high, low, client=None):
global float_value
seconds = random.random() * 5.0
self.start(ar.T1, seconds)
while True:
m = self.select(ar.T1, ar.Stop)
if isinstance(m, ar.T1):
pass
else:
return ar.Aborted() # Control-c.
# Fabricate a change of float_value.
s = random.randrange(2) # Up or down.
d = random.random() * DOWN_UP[s] # Delta.
c = float_value # Current.
t = c + d
if t < low or t > high: # Keep it in range.
float_value = c - d # Invert.
else:
float_value = t
if client:
self.send(Wandering(float_value), client)
seconds = random.random() * 5.0
self.start(ar.T1, seconds)
ar.bind(wandering_float)