Skip to content
Snippets Groups Projects
Select Git revision
  • 9917f51a902f00b5a27f21fb1260f21bf1cfb0c9
  • master default protected
  • feature/golang-alpine
  • 2.0.0
  • 1.0.2
5 results

clear.go

Blame
  • Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    hmi.py 7.29 KiB
    import ml
    import os
    import json
    import s3i
    import uuid
    
    """
    Configure the HMI. 
    The credentials file named hmi_cred.json must be located in the folder configs.
    The configuration file named hmi.json must also be located in the same folder 
    """
    ml.setup_logger("hmi")
    config_path = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "configs"))
    cred_path = os.path.join(config_path, "hmi_cred.json")
    with open(cred_path) as file:
        hmi_cred = json.load(file)
    
    
    class bcolors:
        """colors for the console log"""
        HEADER = '\033[95m'
        OKBLUE = '\033[94m'
        OKGREEN = '\033[92m'
        WARNING = '\033[93m'
        FAIL = '\033[91m'
        ENDC = '\033[0m'
        BOLD = '\033[1m'
        UNDERLINE = '\033[4m'
    
    
    def prepare_service_request():
        """
        Prepare a S³I-B Service Request
    
        :return: S³I-B Service Request
        :rtype: dict
        """
        available_services = ["fml40::AcceptsFellingJobs", "fml40::ProvidesProductionData"]
        print("[S³I]: Following services are available: {0}{1}{2}".format(bcolors.UNDERLINE, available_services,
                                                                          bcolors.ENDC))
        class_name = input("[S³I]: Please enter one of these services: ")
        serv_req = s3i.messages.ServiceRequest()
    
        while class_name not in available_services:
            print("[S³I]: service {0}{1}{2} is not available".format(bcolors.FAIL, class_name, bcolors.ENDC))
            print("[S³I]: Following services are available: {0}{1}{2}".format(bcolors.UNDERLINE, available_services,
                                                                              bcolors.ENDC))
            class_name = input("[S³I]: Please enter one of these services: ")
    
        if class_name == "fml40::AcceptsFellingJobs":
            available_methods = ["acceptJob", "queryJobStatus", "removeJob"]
            method = input("[S³I]: which method should be called? {0}{1}{2} ".format(bcolors.UNDERLINE, available_methods,
                                                                               bcolors.ENDC))
            while method not in available_methods:
                method = input(
                    "[S³I]: which method should be called? {0}{1}{2} ".format(bcolors.UNDERLINE, available_methods,
                                                                              bcolors.ENDC))
            if method == "acceptJob":
                subFeatures = [{
                    "class": "fml40::Assortment",
                    "grade": "fl",
                    "name": "Stammholz Abschnitte",
                    "subFeatures": [
                        {
                            "class": "fml40::ThicknessClass",
                            "name": ">"
                        },
                        {
                            "class": "fml40::WoodQuality",
                            "name": "B-C"
                        },
                        {
                            "class": "fml40::HarvestingParameters",
                            "cuttingLengths": 20
                        },
                        {
                            "class": "fml40::TreeType",
                            "name": "Spruce",
                            "conifer": True
                        },
                        {
                            "class": "fml40::HarvestedVolume",
                            "volume": 140
                        }
                    ]
                }]
                feature_config_json = ml.make_feature_config(class_name="fml40::FellingJob", subFeatures=subFeatures,
                                                             name="my_felling_job")
                felling_job = ml.build_feature(feature=feature_config_json)
                felling_job_json = felling_job.to_json()
                print("[S³I]: A felling job is generated with the job id: {}".format(felling_job_json["identifier"]))
                parameter = {"job": felling_job_json}
    
            elif method == "queryJobStatus":
                job_id = input("[S³I]: Please enter the job id: ")
                parameter = {"identifier": job_id}
    
            elif method == "removeJob":
                job_id = input("[S³I]: Please enter the job id: ")
                parameter = {"identifier": job_id}
    
        elif class_name == "fml40::ProvidesProductionData":
            available_methods = ["getProductionData"]
            method = input("[S³I]: which method should be called? {0}{1}{2} ".format(bcolors.UNDERLINE, available_methods,
                                                                                    bcolors.ENDC))
            while method not in available_methods:
                method = input(
                    "[S³I]: which method should be called? {0}{1}{2} ".format(bcolors.UNDERLINE, available_methods,
                                                                             bcolors.ENDC))
            if method == "getProductionData":
                name = input("[S³I]: Please enter the name of your production data: [Stammsegment 4711, Stammsegment 4712] ")
                parameter = {"name": name}
    
        serv_req.fillServiceRequest(
            senderUUID=hmi_cred.get("identifier"), receiverUUID=[receiver], sender_endpoint=hmi_endpoint,
            serviceType="{}/{}".format(class_name, method),
            parameters=parameter,
            msgUUID="s3i:{}".format(uuid.uuid4())
        )
        return serv_req.msg
    
    
    def prepare_get_value_request():
        """
        Prepare a S³I-B GetValueRequest
    
        :return: S³I-B GetValueRequest
        :rtype: dict
        """
        getv_req = s3i.GetValueRequest()
        attribute_path = input("[S³I]: Please enter the attribute path: ["
                               "attributes/features/ml40::Composite/targets/ml40::Engine/features/ml40::RotationalSpeed"
                               "/rpm]")
        getv_req.fillGetValueRequest(
            senderUUID=hmi_cred.get("identifier"), receiverUUID=[receiver], sender_endpoint=hmi_endpoint,
            attributePath=attribute_path, msgUUID="s3i:{}".format(uuid.uuid4())
        )
        return getv_req.msg
    
    
    if __name__ == "__main__":
        """
        Configure and run the HMI 
        """
        config_file_name = ml.make_thing_config(thing_id=hmi_cred.get("identifier"), name="hmi",
                                                roles=[{"class": "ml40::HMI"}],
                                                config_path=config_path)
        hmi_model = ml.load_config(config_filepath=os.path.join(config_path, config_file_name))
        hmi = ml.create_thing(model=hmi_model, grant_type="password", secret=hmi_cred.get("secret"),
                              username=hmi_cred.get("username"), password=hmi_cred.get("password"),
                              is_broker_rest=False, is_broker=True, is_repo=False)
        hmi.run_forever()
        hmi_endpoint = ml.find_broker_endpoint(hmi.dir, hmi_cred.get("identifier"))
        receiver = "s3i:b6d1cc6d-896c-40fe-9403-b5b7682b1d03"
    
        """
        While loop to edit S³I messages and then send it to the Digital Twin 
        """
        while True:
            print("[S³I]: You can send following messages to forwarder a by entering 1 or 2")
            request_type = input(" {0}[1]: service request, [2]: get value request:{1} ".format(
                bcolors.OKBLUE, bcolors.ENDC
            ))
            if request_type == "1":
                msg = prepare_service_request()
                if msg is None:
                    continue
            elif request_type == "2":
                msg = prepare_get_value_request()
                if msg is None:
                    continue
            else:
                continue
            receiver_endpoint = ml.find_broker_endpoint(hmi.dir, thing_id=receiver)
            resp = hmi.broker.send([receiver_endpoint], json.dumps(msg))