Aufgrund einer Wartung wird GitLab am 25.01 zwischen 8:00 und 9:00 Uhr kurzzeitig nicht zur Verfügung stehen. / Due to maintenance, GitLab will be temporarily unavailable on 25.01 between 8:00 and 9:00 am.

Commit 3506e290 authored by C. Albrecht's avatar C. Albrecht
Browse files

WIP

parent aade1cae
build
dist
*mypy_cache
**/__pycache__/
*.py[cod]
.vscode
......@@ -8,9 +11,11 @@ logs
.python-version
*.org
*.sh
/configs/
!/configs/config_forest.json
!/configs/config_forwarder_komatsu.json
!/configs/config_harvester_john_deere.json
!/configs/config_harvester_ponsse.json
scripts/credentials
\ No newline at end of file
credentials/*.json
credentials/*.txt
!./credentials/collection.txt.gpg
\ No newline at end of file
"""Implementation of the digital twin mini tractor driver"""
import json
from s3i import TokenType
from fml40.app_logger import APP_LOGGER
from fml40.tools import send_request
from fml40.tools import decode_message
from fml40.tools import verify_message
from fml40.worker import Worker
from ml.app_logger import APP_LOGGER
from ml.tools import send_request
from ml.tools import decode_message
from ml.tools import verify_message
from ml.thing import Thing
class MiniTractorDriver(Worker):
class MiniTractorDriver(Thing):
def __init__(self, idp, config):
APP_LOGGER.info("Hello I'm the mini tractor driver.")
super().__init__(idp, config)
......
......@@ -7,27 +7,27 @@
import argparse
import json
from fml40.dt_factory import create_dt_ref
from fml40.dt_factory import get_dt_names
from fml40.tools import send_requests
from fml40.tools import get_requests
from fml40.tools import load_config
from fml40.tools import get_receiver_callback_func
from fml40.tools import get_s3i_broker
from fml40.tools import GRANT_TYPES
from fml40.functionalities.functionalities import get_functionality_names
from fml40.dt_factory import DT_FACTORY
from fml40.app_logger import setup_logger
from customer_code_example.harvester_john_deere import john_deere
from customer_code_example.harvester_ponsse import ponsse
from customer_code_example.forwarder_komatsu import komatsu
from customer_code_example.mini_tractor_driver import MiniTractorDriver
DT_FACTORY["JohnDeere"] = john_deere.JohnDeere
DT_FACTORY["Komatsu"] = komatsu.Komatsu
DT_FACTORY["Ponsse"] = ponsse.Ponsse
DT_FACTORY["MiniTractorDriver"] = MiniTractorDriver
import threading
import time
from s3i import TokenType
from s3i import IdentityProvider
from s3i import Broker
from ml.dt_factory import create_dt_ref
from ml.dt_factory import get_dt_names
from ml.tools import create_request
from ml.tools import get_requests
from ml.tools import load_config
from ml.tools import get_receiver_callback_func
from ml.tools import get_s3i_broker
from ml.tools import GRANT_TYPES
from ml.dt_factory import DT_FACTORY
from ml.app_logger import setup_logger
from ml.authentication import get_credentials_dispatch
from ml.authentication import configure_credentials_parser
from ml.tools import send_requests
from ml.tools import send_request
from ml.tools import send_message
from ml.fml40.features.functionalities.forwards import send_moisture_get_value_requests
def create_argparser():
......@@ -42,29 +42,25 @@ def create_argparser():
subparsers = parent_parser.add_subparsers(dest="command")
launch_parser = subparsers.add_parser("launch")
launch_parser.add_argument("dt_config_fp", nargs=1, type=str)
launch_parser.add_argument("-s", "--singleshot", action="store_true")
launch_parser.add_argument("-u", default="", help="Username")
launch_parser.add_argument("-p", default="", help="Password")
launch_parser.add_argument(
"-g", choices=GRANT_TYPES, default=GRANT_TYPES[0], help="grant type"
)
launch_parser.add_argument("--f",
default=None,
help="Filepath to credentials in json format.")
configure_credentials_parser(launch_parser)
list_parser = subparsers.add_parser("list")
list_parser.add_argument("class_type", nargs=1, choices=["functionalities", "dts"])
list_parser.add_argument("class_type",
nargs=1,
choices=["functionalities", "dts"])
return parent_parser
def print_digital_twins(dt_names):
print(
"The following digital twins are "
"available:\n\n\t{}".format("\n\t".join(dt_names))
)
print("The following digital twins are "
"available:\n\n\t{}".format("\n\t".join(dt_names)))
def print_functionalities(func_names):
print(
"The following functionalities are available:"
"\n\n\t{}".format("\n\t".join(func_names))
)
print("The following functionalities are available:"
"\n\n\t{}".format("\n\t".join(func_names)))
def main():
......@@ -77,30 +73,19 @@ def main():
if args.class_type[0] == "dts":
dt_names = get_dt_names()
print_digital_twins(dt_names)
elif args.class_type[0] == "functionalities":
func_names = get_functionality_names()
print_functionalities(func_names)
elif args.command == "launch":
dt_config_fp = args.dt_config_fp[0]
config = load_config(dt_config_fp)
dt_name = config["type"]
sender_id = config["thingId"]
app_logger = setup_logger(dt_name)
app_logger.debug("Parsed config: %s", json.dumps(config, indent=2))
dt_ref = create_dt_ref(config, args.g, args.u, args.p)
if dt_ref is None:
print(f"Thing called {dt_name} is not available.")
if args.singleshot:
app_logger.debug("Running in singleshot mode")
requests = get_requests(config)
send_requests(dt_ref, requests)
dt_ref.stop()
else:
app_logger.debug("Running in persistent mode")
s3i_broker = get_s3i_broker(dt_ref)
callback_p = get_receiver_callback_func(dt_ref)
s3i_broker.receive("s3ib://{}".format(sender_id), callback_p)
model = load_config(dt_config_fp)
app_logger = setup_logger(__name__)
app_logger.debug("Parsed model: %s", json.dumps(model, indent=2))
_, secret, username, password = get_credentials_dispatch(
args.g, args.f)
thing_ref = create_dt_ref(model, args.g, secret, username, password,
True, False)
thing_proxy = thing_ref.proxy()
thing_proxy.run_forever()
access_token = thing_proxy.access_token.get()
print(access_token)
if __name__ == "__main__":
......
import logging
import os
APP_LOGGER = logging.getLogger("app_logger")
def setup_logger(dt_name):
"""Creates logger named app_logger.
Logs are printed to stdout and saved to log files under '/logs'.
Each file is names 'dt_name'.log.
:param dt_name: Name of the digital twin.
:returns: Created logger
:rtype: Logger
"""
if not os.path.exists("logs"):
os.mkdir("logs")
app_logger = logging.getLogger('app_logger')
app_logger.setLevel(logging.DEBUG)
log_formatter = logging.Formatter(
"%(asctime)s - %(levelname)s - {}: %(message)s".format(dt_name)
)
stream_handler = logging.StreamHandler()
stream_handler.setFormatter(log_formatter)
stream_handler.setLevel(logging.DEBUG)
file_handler = logging.FileHandler(filename="./logs/{}.log".format(dt_name))
file_handler.setLevel(logging.DEBUG)
file_handler.setFormatter(log_formatter)
app_logger.addHandler(file_handler)
app_logger.addHandler(stream_handler)
return app_logger
from s3i import IdentityProvider, TokenType
import argparse
import json
GRANT_TYPES = ("client_credentials", "password")
def configure_credentials_parser(parser):
parser.add_argument(
"--g", choices=GRANT_TYPES, default=GRANT_TYPES[0], help="grant type"
)
parser.add_argument("--d", action="store_false", help="Prompt for client")
parser.add_argument("--s", action="store_true", help="Prompt for scope")
parser.add_argument("--c", default=None, help="Filepath to credentials")
def get_username_and_password(filepath=None):
username = None
password = None
if filepath is not None:
with open(filepath, "r") as json_f:
js_object = json.load(json_f)
username = js_object["name"]
password = js_object["password"]
else:
username = input("[S3I]: Please enter your username: ")
# TODO: Use getpass
password = input("[S3I]: Please enter your password: ")
return username, password
def get_client_id_and_secret(filepath=None):
client_id = None
client_secret = None
if filepath is not None:
with open(filepath, "r") as json_f:
js_object = json.load(json_f)
client_id = js_object.get("thingId", None)
if not client_id:
client_id = js_object.get("identifier")
client_secret = js_object.get("client_secret")
if not client_secret:
client_secret = js_object.get("secret")
else:
client_id = input("[S3I]: Please enter client id: ")
# TODO: Use getpass
client_secret = input("[S3I]: Please enter client secret: ")
return client_id, client_secret
def get_credentials_dispatch(grant_type, filepath=None):
username = ""
password = ""
secret = ""
client_id = ""
if grant_type == GRANT_TYPES[0]:
client_id, secret = get_client_id_and_secret(filepath)
else:
username, password = get_username_and_password(filepath)
return client_id, secret, username, password
def authorize_client(default_client, with_scope):
client = "admin-cli"
client_secret = ""
scope = ""
if not default_client:
tmp = input(
"[S3I]: Please enter the thing (client) you want to "
"authorize (or leave blank to default to admin client): "
)
if tmp != "":
client = tmp
client_secret = input(
"[S3I]: Please enter the thing's secret (client secret): "
)
if with_scope:
scope = input(
"[S3I]: Please enter the client scope which is requested for the "
"access token and has been assigned to the client (or leave blank): "
)
return client, client_secret, scope
def get_access_token(grant_type, client_id, client_secret, username, password, scope):
idp = IdentityProvider(
grant_type=grant_type,
client_id=client_id,
username=username,
password=password,
client_secret=client_secret,
realm="KWH",
identity_provider_url="https://idp.s3i.vswf.dev/",
)
return idp.get_token(TokenType.ACCESS_TOKEN, scope=scope)
def get_access_token_dispatch(file_path, grant_type, ask_client, ask_scope):
access_token = None
if grant_type == GRANT_TYPES[1]:
username, password = get_username_and_password(file_path)
client_id, client_secret, scope = authorize_client(ask_client, ask_scope)
access_token = get_access_token(
grant_type, client_id, client_secret, username, password, scope
)
else:
client_id, client_secret = get_client_id_and_secret(file_path)
access_token = get_access_token(
grant_type, client_id, client_secret, "", "", ""
)
return access_token
""" Implements a factory for managing digital twins."""
import sys
from s3i import IdentityProvider
from s3i import TokenType
from ml.tools import get_idp
from ml.managing_actor import ManagingActor
from ml.tools import load_config
from ml.app_logger import APP_LOGGER
from ml.thing import Thing
from customer_code_example.harvester_john_deere import john_deere
from customer_code_example.harvester_ponsse import ponsse
from customer_code_example.forwarder_komatsu import komatsu
from customer_code_example.mini_tractor_driver import MiniTractorDriver
from ml.ml40.roles.service.service import Service
from ml.ml40.features.functionalities.manages_jobs import ManagesJobs
from ml.ml40.features.properties.values.location import Location
from ml.ml40.features.properties.shared import Shared
from ml.ml40.features.properties.composite import Composite
from ml.ml40.features.properties.values.weight import Weight
from ml.ml40.features.properties.values.moisture import Moisture
from ml.fml40.features.functionalities.accepts_proximity_alert import (
AcceptsProximityAlert,
)
from ml.fml40.features.functionalities.accepts_felling_jobs import AcceptsFellingJobs
from ml.fml40.features.functionalities.harvests import Harvests
from ml.fml40.features.functionalities.forwards import Forwards
from ml.fml40.features.functionalities.provides_production_data import (
ProvidesProductionData,
)
from ml.fml40.features.functionalities.accepts_forwarding_jobs import (
AcceptsForwardingJobs,
)
from ml.fml40.features.functionalities.provides_passability_information import (
ProvidesPassabilityInformation,
)
from ml.fml40.features.functionalities.provides_moisture_prediction import (
ProvidesMoisturePrediction,
)
from ml.fml40.features.functionalities.accepts_passability_report import (
AcceptsPassabilityReport,
)
from ml.fml40.features.properties.values.documents.reports.moisture_prediction_report import (
MoisturePredictionReport,
)
# TODO: Get rid of this global variable
DT_FACTORY = {}
DT_FACTORY["Service"] = Service
DT_FACTORY["ml40::Thing"] = Thing
DT_FACTORY["ml40::ManagesJobs"] = ManagesJobs
DT_FACTORY["ml40::Location"] = Location
DT_FACTORY["ml40::Composite"] = Composite
DT_FACTORY["ml40::Shared"] = Shared
DT_FACTORY["ml40::Weight"] = Weight
DT_FACTORY["ml40::Moisture"] = Moisture
DT_FACTORY["fml40::AcceptsProximityAlert"] = AcceptsProximityAlert
DT_FACTORY["fml40::AcceptsFellingJobs"] = AcceptsFellingJobs
DT_FACTORY["fml40::ProvidesProductionData"] = ProvidesProductionData
DT_FACTORY["fml40::Harvests"] = Harvests
DT_FACTORY["fml40::Forwards"] = Forwards
DT_FACTORY["fml40::AcceptsForwardingJobs"] = AcceptsForwardingJobs
DT_FACTORY["fml40::ProvidesPassabilityInformation"] = ProvidesPassabilityInformation
DT_FACTORY["fml40::ProvidesMoisturePrediction"] = ProvidesMoisturePrediction
DT_FACTORY["fml40::MoisturePredictionReport"] = MoisturePredictionReport
DT_FACTORY["fml40::AcceptsPassabilityReport"] = AcceptsPassabilityReport
def get_dt_names():
"""Returns a list containg the names of all registered digital twins.
"""
dt_names = list(map(lambda x: x, DT_FACTORY.keys()))
return dt_names
def create_dt_with_idp(config, id_p):
"""Creates and returns a ditigal twin.
:param config: A json-object describing the digital twin.
:param i_d_p: A s3i.IdentityProvider object
:returns: Tuple (digital twin, access token)
:rtype: tuple(config[type], str)
"""
dt_type = config["type"]
d_t = DT_FACTORY.get(dt_type, ManagingActor)
return d_t
# def create_dt(config):
# """Creates and returns a ditigal twin and returns .
# :param config: A json-object describing the digital twin.
# :returns: Tuple (digital twin, access token)
# :rtype: tuple(config[type], str)
# """
# id_p = get_idp(config)
# dt_type = config["type"]
# d_t = DT_FACTORY.get(dt_type, ManagingActor)(id_p, config)
# return d_t
def build_sub_featrues(thing_ref, feature_proxy, json_feature):
json_sub_features = json_feature.get("subFeatures", [])
for json_sub_feature in json_sub_features:
class_name = json_sub_feature.get("class", "")
sub_feature = DT_FACTORY.get(class_name, None)
if not sub_feature:
APP_LOGGER.critical("Subfeature: %s is missing" % class_name)
else:
APP_LOGGER.debug("Adding subfeature: %s" % class_name)
sub_feature_ref = sub_feature.start("", thing_ref)
sub_feature_proxy = sub_feature_ref.proxy()
sub_feature_proxy.from_json(json_feature).get()
type_name = sub_feature_proxy.type_name.get()
sub_features = feature_proxy.subfeatures.get()
sub_features.append(sub_feature_ref)
def build(thing_ref, attributes):
thing_proxy = thing_ref.proxy()
json_features = attributes.get("features", [])
for json_feature in json_features:
class_name = json_feature.get("class", "")
feature = DT_FACTORY.get(class_name, None)
if not feature:
APP_LOGGER.critical("Feature: %s is missing" % class_name)
else:
APP_LOGGER.debug("Adding feature: %s" % class_name)
feature_ref = feature.start("", thing_ref)
feature_proxy = feature_ref.proxy()
feature_proxy.from_json(json_feature).get()
type_name = feature_proxy.type_name.get()
features = thing_proxy.features.get()
features[type_name] = feature_ref
build_sub_featrues(thing_ref, feature_proxy, json_feature)
def create_dt_ref(model, grant_type, secret, username, password, is_broker, is_repo):
"""Creates a ditigal twin, runs it in an own thread and returns a
reference to it.
:param config: A json-object describing the digital twin.
:param i_d_p: A s3i.IdentityProvider object
:returns: Tuple (digital twin, access token)
:rtype: tuple(Reference, str)
"""
# id_p = get_idp(grant_type, thing_id, secret, username, password)
attributes = model.get("attributes", None)
if attributes is None:
print("Incomplete model: attributes missing!")
sys.exit()
thing_type = attributes.get("class", "")
if thing_type == "":
print("Unknown type %s" % thing_type)
sys.exit()
thing_name = attributes.get("name", "")
APP_LOGGER.debug("Creating ditigtal twin %s" % thing_name)
d_t = DT_FACTORY.get(thing_type, ManagingActor)
thing_ref = d_t.start(
model=model,
grant_type=grant_type,
client_secret=secret,
username=username,
password=password,
is_broker=is_broker,
is_repo=is_repo,
)
build(thing_ref, attributes)
return thing_ref
from ml.ml40.features.functionalities.accepts_jobs import AcceptsJobs
from ml.fml40.features.properties.values.documents.jobs.felling_job import FellingJob
from ml.app_logger import APP_LOGGER
class AcceptsFellingJobs(AcceptsJobs):
def __init__(self, name, ref_managing_actor):
super(AcceptsFellingJobs, self).__init__(
name=name, ref_managing_actor=ref_managing_actor
)
def acceptJob(self, job: FellingJob) -> bool:
APP_LOGGER.info("Checking if the job can be accepted.")
proxy_executer = self.managing_actor.proxy_functionalities.get()[
self.proxy_name
]
proxy_executer.executeJob(job)
return True
def from_json(self, json_obj):
super().from_json(json_obj)
from modelling_language.ml40.feature.functionality.AcceptsJob import AcceptsJobs
from modelling_language.fml40.feature.property.value.document.job.FellingSupportJob import FellingSupportJob
class AcceptFellingSupportJobs(AcceptsJobs):
def acceptJob(self, job: FellingSupportJob) -> bool:
pass
from ml.ml40.features.functionalities.accepts_jobs import AcceptsJobs
from ml.fml40.features.properties.values.documents.jobs.forwarding_job import (
ForwardingJob,
)
from ml.app_logger import APP_LOGGER
class AcceptsForwardingJobs(AcceptsJobs):
def __init__(self, name, ref_managing_actor):
super().__init__(ref_managing_actor=ref_managing_actor, name=name)
def acceptJob(self, job: ForwardingJob) -> bool:
APP_LOGGER.info("I check if I (the Komatsu Forwarder) can accept the job.")
proxy_executer = self.managing_actor.proxy_functionalities.get().get(
"fml40::Forwards", None
)
proxy_executer.executeJob(job)
return True
def from_json(self, json_obj):
super().from_json(json_obj)
from modelling_language.ml40.feature.functionality import Functionality
from modelling_language.fml40.feature.property.value.document.report import LogMeasurement
class AcceptsLogMeasurements(Functionality):
def acceptLogMeasurement(self, log_measurement: LogMeasurement) -> bool:
pass
from modelling_language.ml40.feature.functionality import AcceptsJobs
from modelling_language.fml40.feature.property.value.document.job import LogTransportationJob
class AcceptsLogTransportationJobs(AcceptsJobs):
def acceptJob(self, job: LogTransportationJob) -> bool:
pass
from modelling_language.ml40.feature.functionality import Functionality
from modelling_language.fml40.feature.property.value.document.report import SoilMoistureMeasurement
class AcceptsMoistureMeasurement(Functionality):
def acceptMoistureMeasurement(self, input: SoilMoistureMeasurement):
pass
\ No newline at end of file
from modelling_language.ml40.feature.functionality import Functionality
class AcceptsMoveCommands(Functionality):
def __init__(self, name, ref_managing_actor):
super().__init__(
name=name,
ref_managing_actor=ref_managing_actor
)
def move(self, longitude: float, latitude: float):
print("move to longitude: {}, latitude : {}".format(longitude, latitude))
return {"longitude": longitude, "la