# Select Git revision
# Code owners
# Assign users and groups as approvers for specific file changes. Learn more.
# hmi.py 7.29 KiB
import ml
import os
import json
import s3i
import uuid
"""
Configure the HMI.
The credentials file named hmi_cred.json must be located in the folder configs.
The configuration file named hmi.json must also be located in the same folder
"""
# Set up logging for this script under the name "hmi".
ml.setup_logger("hmi")

# The "configs" folder sits next to the directory that contains this file.
config_path = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "configs"))
cred_path = os.path.join(config_path, "hmi_cred.json")

# JSON text is UTF-8; decode it explicitly instead of relying on the platform's
# locale encoding, which would garble non-ASCII credentials on some systems.
with open(cred_path, encoding="utf-8") as file:
    hmi_cred = json.load(file)
class bcolors:
    """ANSI escape sequences used to colour and style the console log"""

    # Reset: emit after styled text to return to the terminal's default style.
    ENDC = "\033[0m"

    # Text attributes.
    BOLD = "\033[1m"
    UNDERLINE = "\033[4m"

    # Foreground colours (SGR codes 91-95).
    FAIL = "\033[91m"
    OKGREEN = "\033[92m"
    WARNING = "\033[93m"
    OKBLUE = "\033[94m"
    HEADER = "\033[95m"
def _prompt_for_service(available_services):
    """
    Ask on the console for a service name until a listed one is entered

    :param available_services: service names the user may choose from
    :type available_services: list
    :return: the entered service name, always one of available_services
    :rtype: str
    """
    def ask():
        print("[S³I]: Following services are available: {0}{1}{2}".format(
            bcolors.UNDERLINE, available_services, bcolors.ENDC))
        return input("[S³I]: Please enter one of these services: ")

    class_name = ask()
    while class_name not in available_services:
        print("[S³I]: service {0}{1}{2} is not available".format(
            bcolors.FAIL, class_name, bcolors.ENDC))
        class_name = ask()
    return class_name


def _prompt_for_method(available_methods):
    """
    Ask on the console for a method name until a listed one is entered

    :param available_methods: method names the user may choose from
    :type available_methods: list
    :return: the entered method name, always one of available_methods
    :rtype: str
    """
    prompt = "[S³I]: which method should be called? {0}{1}{2} ".format(
        bcolors.UNDERLINE, available_methods, bcolors.ENDC)
    method = input(prompt)
    while method not in available_methods:
        method = input(prompt)
    return method


def _build_felling_job_parameter():
    """
    Build the example felling job and wrap it as service parameters

    :return: parameters of the form {"job": <felling job as JSON>}
    :rtype: dict
    """
    sub_features = [{
        "class": "fml40::Assortment",
        "grade": "fl",
        "name": "Stammholz Abschnitte",
        "subFeatures": [
            {
                "class": "fml40::ThicknessClass",
                "name": ">"
            },
            {
                "class": "fml40::WoodQuality",
                "name": "B-C"
            },
            {
                "class": "fml40::HarvestingParameters",
                "cuttingLengths": 20
            },
            {
                "class": "fml40::TreeType",
                "name": "Spruce",
                "conifer": True
            },
            {
                "class": "fml40::HarvestedVolume",
                "volume": 140
            }
        ]
    }]
    feature_config_json = ml.make_feature_config(class_name="fml40::FellingJob", subFeatures=sub_features,
                                                 name="my_felling_job")
    felling_job = ml.build_feature(feature=feature_config_json)
    felling_job_json = felling_job.to_json()
    # Show the id so the user can refer to this job in later queryJobStatus/removeJob calls.
    print("[S³I]: A felling job is generated with the job id: {}".format(felling_job_json["identifier"]))
    return {"job": felling_job_json}


def prepare_service_request():
    """
    Prepare a S³I-B Service Request

    Interactively asks for a service and one of its methods, collects the
    parameters that method needs and fills a service request addressed to the
    module-level ``receiver``, sent from ``hmi_endpoint``.

    :return: S³I-B Service Request
    :rtype: dict
    """
    # Methods offered by each service; the key order is the order shown to the user.
    service_methods = {
        "fml40::AcceptsFellingJobs": ["acceptJob", "queryJobStatus", "removeJob"],
        "fml40::ProvidesProductionData": ["getProductionData"],
    }
    class_name = _prompt_for_service(list(service_methods))
    method = _prompt_for_method(service_methods[class_name])

    # Method names are unique across the services, so they alone select the parameters.
    if method == "acceptJob":
        parameter = _build_felling_job_parameter()
    elif method in ("queryJobStatus", "removeJob"):
        job_id = input("[S³I]: Please enter the job id: ")
        parameter = {"identifier": job_id}
    else:
        # Only "getProductionData" remains.
        name = input("[S³I]: Please enter the name of your production data: [Stammsegment 4711, Stammsegment 4712] ")
        parameter = {"name": name}

    serv_req = s3i.messages.ServiceRequest()
    serv_req.fillServiceRequest(
        senderUUID=hmi_cred.get("identifier"), receiverUUID=[receiver], sender_endpoint=hmi_endpoint,
        serviceType="{}/{}".format(class_name, method),
        parameters=parameter,
        msgUUID="s3i:{}".format(uuid.uuid4())
    )
    return serv_req.msg
def prepare_get_value_request():
    """
    Prepare a S³I-B GetValueRequest

    Asks on the console for the attribute path to read and fills a request
    addressed to the module-level ``receiver``, sent from ``hmi_endpoint``.

    :return: S³I-B GetValueRequest
    :rtype: dict
    """
    path_prompt = (
        "[S³I]: Please enter the attribute path: ["
        "attributes/features/ml40::Composite/targets/ml40::Engine/features/ml40::RotationalSpeed"
        "/rpm]"
    )
    request = s3i.GetValueRequest()
    requested_path = input(path_prompt)

    sender_id = hmi_cred.get("identifier")
    message_id = "s3i:" + str(uuid.uuid4())
    request.fillGetValueRequest(
        senderUUID=sender_id,
        receiverUUID=[receiver],
        sender_endpoint=hmi_endpoint,
        attributePath=requested_path,
        msgUUID=message_id,
    )
    return request.msg
if __name__ == "__main__":
    # Configure and run the HMI.
    config_file_name = ml.make_thing_config(
        thing_id=hmi_cred.get("identifier"),
        name="hmi",
        roles=[{"class": "ml40::HMI"}],
        config_path=config_path,
    )
    hmi_model = ml.load_config(config_filepath=os.path.join(config_path, config_file_name))
    hmi = ml.create_thing(
        model=hmi_model,
        grant_type="password",
        secret=hmi_cred.get("secret"),
        username=hmi_cred.get("username"),
        password=hmi_cred.get("password"),
        is_broker_rest=False,
        is_broker=True,
        is_repo=False,
    )
    hmi.run_forever()

    # Both names are read as module globals by the prepare_* functions.
    hmi_endpoint = ml.find_broker_endpoint(hmi.dir, hmi_cred.get("identifier"))
    receiver = "s3i:b6d1cc6d-896c-40fe-9403-b5b7682b1d03"

    # Loop forever: let the user compose an S³I message, then send it to the Digital Twin.
    request_builders = {
        "1": prepare_service_request,
        "2": prepare_get_value_request,
    }
    while True:
        print("[S³I]: You can send following messages to forwarder a by entering 1 or 2")
        request_type = input(" {0}[1]: service request, [2]: get value request:{1} ".format(
            bcolors.OKBLUE, bcolors.ENDC
        ))
        build_request = request_builders.get(request_type)
        if build_request is None:
            # Unknown menu entry: show the menu again.
            continue
        msg = build_request()
        if msg is None:
            continue
        # The receiver's endpoint is looked up again for every message.
        receiver_endpoint = ml.find_broker_endpoint(hmi.dir, thing_id=receiver)
        resp = hmi.broker.send([receiver_endpoint], json.dumps(msg))