diff --git a/README.md b/README.md index 2094113529de67266925e82f42d5626b20d0ec89..52ad0a8f0a7e4ce1fb520d7842bde6b7f098a4c8 100644 --- a/README.md +++ b/README.md @@ -105,6 +105,8 @@ submodel reference * `examples.tutorial_serialization_deserialization_json`: Tutorial for the serialization and deserialization of asset administration shells, submodels and assets * `examples.tutorial_storage`: Tutorial for storing asset administration shells, submodels and assets +* `examples.tutorial_dynamic_model`: Tutorial about creating a submodel with elements that update dynamically from a + custom data source ## Contributing diff --git a/aas/examples/tutorial_dynamic_model.py b/aas/examples/tutorial_dynamic_model.py new file mode 100644 index 0000000000000000000000000000000000000000..c79aaf32bd0dd1c53c33e87c028c6c743af33299 --- /dev/null +++ b/aas/examples/tutorial_dynamic_model.py @@ -0,0 +1,151 @@ +#!/usr/bin/env python3 +# This work is licensed under a Creative Commons CCZero 1.0 Universal License. +# See http://creativecommons.org/publicdomain/zero/1.0/ for more information. +""" +Tutorial for the creation of a SubmodelElementCollection of Properties, which are dynamically updated from a proprietary +data source. As an example, this tutorial uses the `psutil` library to represent the local computer's process list. +""" +import datetime +from typing import List + +import psutil # type: ignore + +from aas import model +from aas.adapter.json import write_aas_json_file + +# This tutorial shows how SubmodelElements can be customized to dynamically reflect values from a proprietary data +# source. The presented method can be used to create dynamic submodels, representing an SQL database's data or values +# of local variables within the Asset Administration Shell data model. These submodels can be serialized to create +# an AAS-compliant snapshot of the current data or be attached to the HTTP server for reading (and even modifying) the +# values via the AAS API. +# +# The basic idea is to create custom subclasses of PyI40AAS's model classes, which are bound to the specific data source +# and fetch the current values dynamically from that data source. For this purpose, the `update()` and `commit()` +# methods are overridden, which should be called by any code using the AAS model objects (including the HTTP server) to +# values should be synchronized from or to underlying data sources. +# +# In this tutorial, we use the `psutil` library to create a SubmodelElementCollection of all processes running on the +# local computer, containing a SubmodelElementCollection for each process, which contain three Properties each: the +# process id, the process name and the current memory usage of the process. (Providing CPU utilization values would be +# a bit harder, since it needs to be measured over a period of time.) + + +############################################################################################################## +# Step 1: create custom Property and SubmodelElementCollection classes to hold a process's memory usage, all # +# properties of a single process and a list of all processes. # +############################################################################################################## + +class ProcessMemProperty(model.Property): + """ + A special Property class to represent a system process's memory usage + + It inherits from the normal `model.Propery` class, but overrides the `__init__()` method, as well as the `update()` + method. Each instance of such a Property is bound to a specific process. The new `update()` method updates the + object's `value` attribute with the current memory usage of that process. For this purpose, it holds a reference to + psutil's `Process` object representing the process. + """ + def __init__(self, process: psutil.Process, id_short: str): + super().__init__(id_short, model.datatypes.Float, None) + self.process = process + self.last_update: datetime.datetime = datetime.datetime.fromtimestamp(0) + + def update(self, timeout: float = 0) -> None: + if datetime.datetime.now() - self.last_update < datetime.timedelta(seconds=timeout): + return + self.value = self.process.memory_percent() + self.last_update = datetime.datetime.now() + + def commit(self) -> None: + raise AttributeError("A process's memory usage is a read-only property.") + + +class ProcessDataCollection(model.SubmodelElementCollectionUnordered): + """ + This class is a special SubmodelElementCollection to hold all properties of a single process. + + It inherits from the normal `model.SubmodelElementCollection` class. In addition to the usual attributes, it defines + a new instance attribute `pid`, which holds the process id of the represented process. The new `__init__()` method + retrieves the static attributes of the process using `psutil` and creates all the Properties to represent the + process with well-known idShort values: + + * pid -- A static int-Property containing the process id + * name -- A static string-Property containing the process name (executable name) + * mem -- A dynamic float-Property representing the current memory usage of the process in percent. Uses the custom + `ProcessMemProperty` class + """ + def __init__(self, pid: int, id_short: str): + super().__init__(id_short) + self.pid = pid + process = psutil.Process(pid) + + self.value.add(model.Property("pid", model.datatypes.Integer, pid)) + self.value.add(model.Property("name", model.datatypes.String, process.name())) + self.value.add(ProcessMemProperty(process, "mem")) + + def update(self, timeout: float = 0) -> None: + # Only the 'mem' Property needs to be updated dynamically. The other Properties are static. + self.get_referable('mem').update(timeout) + + def commit(self) -> None: + raise AttributeError("A process is a read-only property collection.") + + +class ProcessList(model.SubmodelElementCollectionUnordered): + """ + A special SubmodelElementCollection, representing a (dynamically updated) list of all processes on the local system + + It inherits from the normal `model.SubmodelElementCollection` class and amends it with a more sophisticated + `update()` method to update the list of contained elements based on the current process list of the computer. + """ + def __init__(self, id_short: str): + super().__init__(id_short) + + def update(self, timeout: float = 0) -> None: + # Get the current list of running processes' ids + pids = psutil.pids() + + # 1. Step: Delete old ProcessDataCollections + # Attention: We must not modify the NamespaceSet in `self.value` while iterating over it. Thus, we first create + # a list of the children to be deleted and delete them in a second run. + children_to_remove: List[model.SubmodelElement] = [c + for c in self.value + if (not isinstance(c, ProcessDataCollection) + or c.pid not in pids)] + for c in children_to_remove: + self.value.discard(c) + + # 2. Step: Add ProcessDataCollections for new processes + # We use the process id (pid) as idShort, so we can easily check if the ProcessDataCollection exists already + for pid in pids: + id_short = "process_{}".format(pid) + if id_short not in self.value: # The NamespaceSet object in `self.value` allows for a given id_short + self.value.add(ProcessDataCollection(pid, id_short)) + + # 3. Step: Update the data within the ProcessDataCollections recursively + for c in self.value: + c.update() + + def commit(self) -> None: + raise AttributeError("A process list is a read-only collection.") + + +########################################################### +# Step 2: Use an instance of the custom ProcessList class # +########################################################### + +# Create a Submodel including a ProcessList instance +submodel = model.Submodel( + identification=model.Identifier('https://acplt.org/ComputerInformationTest', model.IdentifierType.IRI), + submodel_element={ProcessList('processes')} +) + +# Update the complete process list to have current data +# TODO in future it will be sufficient to `update()` the submodel, which should propagate the request recursively. +submodel.get_referable('processes').update() + +# Write the submodel to a JSON file. +# See `tutorial_serialization_deserialization_json.py` for more information. +obj_store: model.DictObjectStore[model.Identifiable] = model.DictObjectStore() +obj_store.add(submodel) +with open('ComputerInformationTest.json', 'w') as f: + write_aas_json_file(f, obj_store, indent=4) diff --git a/requirements.txt b/requirements.txt index c83c985c77f8da68a7807b86f27b180f52506a89..8b162e39f79dabbac3a2e816161f4c1df794271d 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,3 +2,4 @@ jsonschema>=3.2,<4.0 lxml>=4.2,<5 python-dateutil>=2.8,<3.0 pyecma376-2>=0.2 +psutil>=5.6 diff --git a/test/examples/test_tutorials.py b/test/examples/test_tutorials.py index 9291baa425ad4096a381fbfc105db1eb097c64d9..3a1569ddf43856e8cd104928a8ba3398014f5984 100644 --- a/test/examples/test_tutorials.py +++ b/test/examples/test_tutorials.py @@ -13,7 +13,14 @@ Tests for the tutorials Functions to test if a tutorial is executable """ +import os +import re +import tempfile import unittest +from contextlib import contextmanager + +from aas import model +from aas.adapter.json import read_aas_json_file class TutorialTest(unittest.TestCase): @@ -25,3 +32,47 @@ class TutorialTest(unittest.TestCase): def test_tutorial_serialization_deserialization_json(self): from aas.examples import tutorial_serialization_deserialization_json + + def test_tutorial_dynamic_model(self) -> None: + with temporary_workingdirectory(): + from aas.examples import tutorial_dynamic_model + + # After executing the tutorial, there should be an AAS JSON file in the temporary working directory, + # containing a list of processes, which should at least contain one "python" process. + with open('ComputerInformationTest.json') as f: + objects = read_aas_json_file(f, failsafe=False) + + submodel = objects.get_identifiable(model.Identifier('https://acplt.org/ComputerInformationTest', + model.IdentifierType.IRI)) + assert(isinstance(submodel, model.Submodel)) + process_list = submodel.get_referable('processes') + assert(isinstance(process_list, model.SubmodelElementCollection)) + processes = [(p.get_referable('pid').value, # type: ignore + p.get_referable('name').value, # type: ignore + p.get_referable('mem').value) # type: ignore + for p in process_list.value] + r = re.compile(r'[Pp]ython|coverage') # When tests are run via `coverage`, there might not be a Python process + python_processes = list(filter(lambda p: r.match(p[1]), processes)) + self.assertTrue(len(python_processes) > 0, + "'Python' not found in Process list {}".format([p[1] for p in processes])) + self.assertGreater(python_processes[0][0], 0) + self.assertGreater(python_processes[0][2], 0.0) + + +@contextmanager +def temporary_workingdirectory(): + """ + A helper contextmanager to temporarily change the current working directory of the Python process to a temporary + directory. + + The temp directory is deleted with all its contents when leaving the with-context. This can be used to test Python + scripts, which write files to the current working directory. + """ + cwd = os.getcwd() + tempdir = tempfile.TemporaryDirectory() + os.chdir(tempdir.name) + try: + yield None + finally: + os.chdir(cwd) + tempdir.cleanup()