Commit 19069960 authored by Jiahang Chen's avatar Jiahang Chen
Browse files

add hmi for permanent digital twin

parent 7e5ff90e
Pipeline #367706 passed with stages
in 35 seconds
import s3i
import jwt
from ml.tools import load_config, make_thing_config, make_sub_thing
from ml.dt_factory import create_thing, build_feature, add_function_impl_obj
from ml.fml40.features.functionalities.accepts_felling_jobs import AcceptsFellingJobs
from ml.ml40.features.properties.values.documents.jobs.job_status import JobStatus
from ml.app_logger import APP_LOGGER, setup_logger
import time
from config import *
import os
import json
from config import *
dt_creation_app_id = dt_creation_app_id
dt_creation_app_secret = dt_creation_app_secret
# username = input('[S3I]: Please enter your username:').strip('," ')
# password = input('[S3I]: Please enter your password:')
cred = chen
username = cred["username"]
password = cred["password"]
s3i_identity_provider = s3i.IdentityProvider(grant_type='password',
identity_provider_url="https://idp.s3i.vswf.dev/",
realm='KWH',
client_id=dt_creation_app_id,
client_secret=dt_creation_app_secret,
username=username,
password=password)
access_token = s3i_identity_provider.get_token(s3i.TokenType.ACCESS_TOKEN)
''' decode the access token'''
parsed_username = jwt.decode(access_token, verify=False)["preferred_username"]
### create identity of digital twin
"""
s3i_config = s3i.Config(access_token)
resp = s3i_config.create_thing()
dt_id = resp.json().get("identifier", None)
dt_secret = resp.json().get("secret", None)
s3i_config.create_broker_queue(thing_id=dt_id)
res = s3i_config.create_cloud_copy(thing_id=dt_id)
"""
dt_cred = {
"identifier": dt_id,
"secret": dt_secret
}
dt_name = "my_dt_harvester"
config_path = os.path.abspath(os.path.join(os.path.dirname(__file__), os.path.pardir, "configs"))
cred_filepath = os.path.join(config_path, "{}_cred.json".format(dt_name))
with open(cred_filepath, 'wb') as file:
file.write(json.dumps(dt_cred).encode('utf-8'))
config_engine = make_sub_thing(name="my_engine", roles=[{"class": "ml40::Engine"}],
features=[
{"class": "ml40::RotationalSpeed",
"rpm": 2001}
])
config_cran = make_sub_thing(name="my_bord_computer", roles=[{"class": "ml40::MachineUI"}])
config_file_name = make_thing_config(thing_id=dt_id, name=dt_name, roles=[{"class": "fml40::Harvester"}],
features=[{"class": "fml40::ProvidesProductionData"},
{"class": "fml40::AcceptsFellingJobs"},
{"class": "ml40::Location", "longitude": 6.45435, "latitude": 52.543534},
{"class": "ml40::Composite",
"targets": [config_engine, config_cran]}],
config_path=config_path)
setup_logger(dt_name)
dt_model = load_config(config_filepath=os.path.join(config_path, "my_dt_harvester.json"))
with open(cred_filepath) as file:
dt_cred = json.load(file)
dt = create_thing(model=dt_model, grant_type="client_credentials", secret=dt_cred.get("secret"),
is_broker_rest=True,
is_broker=True, is_repo=True)
class AcceptsFellingJobsImpl(AcceptsFellingJobs):
def __init__(self, name="", identifier=""):
super(AcceptsFellingJobs, self).__init__(
name=name,
identifier=identifier)
self.job_list = []
def acceptJob(self, job):
APP_LOGGER.info("Checking if the felling job can be accepted.")
if isinstance(job, dict):
try:
felling_job = build_feature(feature=job)
for job in self.job_list:
if job.identifier == felling_job.identifier:
APP_LOGGER.info("Job with ID {} has been rejected, because this job was already accepted".format(
felling_job.identifier))
return False
felling_job.status = JobStatus.InProgress.name
self.job_list.append(felling_job)
APP_LOGGER.info("Job with ID {} has been accepted".format(felling_job.identifier))
return True
except:
APP_LOGGER.info("Job with ID {} has been rejected".format(felling_job.identifier))
return False
def queryJobStatus(self, identifier):
APP_LOGGER.info("Checking the job status of job {}".format(identifier))
for job in self.job_list:
if job.identifier == identifier:
APP_LOGGER.info("Job {} is now in status {}".format(identifier, job.status))
return {"identifier": identifier, "status": job.status}
APP_LOGGER.info("Job {} can not be queried".format(identifier))
return {"identifier": identifier, "status": "NOT FOUND"}
def removeJob(self, identifier):
APP_LOGGER.info("Checking if i can remove the job {}".format(identifier))
for job in self.job_list:
if job.identifier == identifier:
self.job_list.remove(job)
APP_LOGGER.info("Job {} removed".format(identifier))
return True
APP_LOGGER.info("Job {} can not be found".format(identifier))
return False
def simulate_rpm():
my_engine = dt.features["ml40::Composite"].targets["my_engine"]
tank = "up"
while True:
if tank == "down":
__new_rpm = my_engine.features["ml40::RotationalSpeed"].rpm - 10
if __new_rpm < 2000:
tank = "up"
elif tank == "up":
__new_rpm = my_engine.features["ml40::RotationalSpeed"].rpm + 10
if __new_rpm > 2500:
tank = "down"
my_engine.features["ml40::RotationalSpeed"].rpm = __new_rpm
time.sleep(1)
add_function_impl_obj(dt, AcceptsFellingJobsImpl, "fml40::AcceptsFellingJobs")
dt.add_user_def(func=simulate_rpm)
dt.run_forever()
import s3i
import jwt
import uuid
from ml.tools import find_broker_endpoint, make_thing_config, load_config, make_feature_config
from ml.dt_factory import create_thing, build_feature
from ml.app_logger import setup_logger
import requests
import json
#from config import *
import os
from config import *
root_path = os.path.abspath(os.path.join(os.path.dirname(__file__), os.path.pardir))
config_path = os.path.join(root_path, "configs")
cred = chen
hmi_id = cred["application_id"]
hmi_secret = cred["application_secret"]
username = cred["username"]
password = cred["password"]
setup_logger("my HMI")
config_file_name = make_thing_config(thing_id=hmi_id, name="my_HMI", roles=[{"class": "ml40::HMI"}],
config_path=config_path)
hmi_model = load_config(config_filepath=os.path.join(config_path, config_file_name))
hmi = create_thing(model=hmi_model, grant_type="password", secret=hmi_secret,
username=username, password=password,
is_broker_rest=True, is_broker=True, is_repo=False)
hmi.run_forever()
hmi_endpoint = find_broker_endpoint(hmi.dir, hmi_id)
receiver = input("[S³I]: Please enter the id of your digital twin: ")
"""
Print out thing entry from directory and repository
"""
serv_req = s3i.messages.ServiceRequest()
subFeatures = [{
"class": "fml40::Assortment",
"grade": "fl",
"name": "Stammholz Abschnitte",
"subFeatures": [
{
"class": "fml40::ThicknessClass",
"name": ">"
},
{
"class": "fml40::WoodQuality",
"name": "B-C"
},
{
"class": "fml40::HarvestingParameters",
"cuttingLengths": 20
},
{
"class": "fml40::TreeType",
"name": "Spruce",
"conifer": True
},
{
"class": "fml40::HarvestedVolume",
"volume": 140
}
]
}]
feature_config_json = make_feature_config(class_name="fml40::FellingJob", subFeatures=subFeatures,
name="my_felling_job")
felling_job = build_feature(feature=feature_config_json)
serv_req.fillServiceRequest(
senderUUID=hmi_id, receiverUUID=[receiver], sender_endpoint=hmi_endpoint,
#serviceType="fml40::AcceptsFellingJobs/removeJob",
serviceType="fml40::AcceptsFellingJobs/acceptJob",
#parameters={"identifier": "s3i:d09214a7-46bb-4672-b4e0-2c0aa0e395a3"},
parameters={"job": felling_job.to_json()},
msgUUID="s3i:{}".format(uuid.uuid4())
)
getv_req = s3i.GetValueRequest()
rpm_path = "attributes/features/ml40::Composite/targets/ml40::Engine/features/ml40::RotationalSpeed/rpm"
#rpm_path = "attributes/name"
#rpm_path = ""
getv_req.fillGetValueRequest(
senderUUID=hmi_id, receiverUUID=[receiver], sender_endpoint=hmi_endpoint,
attributePath=rpm_path, msgUUID="s3i:{}".format(uuid.uuid4())
)
receiver_endpoint = find_broker_endpoint(hmi.dir, thing_id=receiver)
resp = hmi.broker.send([receiver_endpoint], json.dumps(serv_req.msg))
print(resp.text)
\ No newline at end of file
import ml
import os
import json
import s3i
import uuid
"""
Configure the HMI.
The credentials file named hmi_cred.json must be located in the folder configs.
The configuration file named hmi.json must also be located in the same folder
"""
ml.setup_logger("hmi")
config_path = os.path.abspath(os.path.join(os.path.dirname(__file__), "configs"))
cred_path = os.path.join(config_path, "hmi_cred.json")
with open(cred_path) as file:
hmi_cred = json.load(file)
class bcolors:
"""colors for the console log"""
HEADER = '\033[95m'
OKBLUE = '\033[94m'
OKGREEN = '\033[92m'
WARNING = '\033[93m'
FAIL = '\033[91m'
ENDC = '\033[0m'
BOLD = '\033[1m'
UNDERLINE = '\033[4m'
def prepare_service_request():
"""
Prepare a S³I-B Service Request
:return: S³I-B Service Request
:rtype: dict
"""
available_services = ["fml40::AcceptsFellingJobs", "fml40::ProvidesProductionData"]
print("[S³I]: Following services are available: {0}{1}{2}".format(bcolors.UNDERLINE, available_services,
bcolors.ENDC))
class_name = input("[S³I]: Please enter one of these services: ")
serv_req = s3i.messages.ServiceRequest()
while class_name not in available_services:
print("[S³I]: service {0}{1}{2} is not available".format(bcolors.FAIL, class_name, bcolors.ENDC))
print("[S³I]: Following services are available: {0}{1}{2}".format(bcolors.UNDERLINE, available_services,
bcolors.ENDC))
class_name = input("[S³I]: Please enter one of these services: ")
if class_name == "fml40::AcceptsFellingJobs":
available_methods = ["acceptJob", "queryJobStatus", "removeJob"]
method = input("[S³I]: which method should be called? {0}{1}{2} ".format(bcolors.UNDERLINE, available_methods,
bcolors.ENDC))
while method not in available_methods:
method = input(
"[S³I]: which method should be called? {0}{1}{2} ".format(bcolors.UNDERLINE, available_methods,
bcolors.ENDC))
if method == "acceptJob":
subFeatures = [{
"class": "fml40::Assortment",
"grade": "fl",
"name": "Stammholz Abschnitte",
"subFeatures": [
{
"class": "fml40::ThicknessClass",
"name": ">"
},
{
"class": "fml40::WoodQuality",
"name": "B-C"
},
{
"class": "fml40::HarvestingParameters",
"cuttingLengths": 20
},
{
"class": "fml40::TreeType",
"name": "Spruce",
"conifer": True
},
{
"class": "fml40::HarvestedVolume",
"volume": 140
}
]
}]
feature_config_json = ml.make_feature_config(class_name="fml40::FellingJob", subFeatures=subFeatures,
name="my_felling_job")
felling_job = ml.build_feature(feature=feature_config_json)
felling_job_json = felling_job.to_json()
print("[S³I]: A felling job is generated with the job id: {}".format(felling_job_json["identifier"]))
parameter = {"job": felling_job_json}
elif method == "queryJobStatus":
job_id = input("[S³I]: Please enter the job id: ")
parameter = {"identifier": job_id}
elif method == "removeJob":
job_id = input("[S³I]: Please enter the job id: ")
parameter = {"identifier": job_id}
elif class_name == "fml40::ProvidesProductionData":
available_methods = ["getProductionData"]
method = input("[S³I]: which method should be called? {0}{1}{2} ".format(bcolors.UNDERLINE, available_methods,
bcolors.ENDC))
while method not in available_methods:
method = input(
"[S³I]: which method should be called? {0}{1}{2} ".format(bcolors.UNDERLINE, available_methods,
bcolors.ENDC))
if method == "getProductionData":
name = input("[S³I]: Please enter the name of your production data: [Stammsegment 4711, Stammsegment 4712] ")
parameter = {"name": name}
serv_req.fillServiceRequest(
senderUUID=hmi_cred.get("identifier"), receiverUUID=[receiver], sender_endpoint=hmi_endpoint,
serviceType="{}/{}".format(class_name, method),
parameters=parameter,
msgUUID="s3i:{}".format(uuid.uuid4())
)
return serv_req.msg
def prepare_get_value_request():
"""
Prepare a S³I-B GetValueRequest
:return: S³I-B GetValueRequest
:rtype: dict
"""
getv_req = s3i.GetValueRequest()
attribute_path = input("[S³I]: Please enter the attribute path: ["
"attributes/features/ml40::Composite/targets/ml40::Engine/features/ml40::RotationalSpeed"
"/rpm]")
getv_req.fillGetValueRequest(
senderUUID=hmi_cred.get("identifier"), receiverUUID=[receiver], sender_endpoint=hmi_endpoint,
attributePath=attribute_path, msgUUID="s3i:{}".format(uuid.uuid4())
)
return getv_req.msg
if __name__ == "__main__":
"""
Configure and run the HMI
"""
config_file_name = ml.make_thing_config(thing_id=hmi_cred.get("identifier"), name="hmi",
roles=[{"class": "ml40::HMI"}],
config_path=config_path)
hmi_model = ml.load_config(config_filepath=os.path.join(config_path, config_file_name))
hmi = ml.create_thing(model=hmi_model, grant_type="password", secret=hmi_cred.get("secret"),
username=hmi_cred.get("username"), password=hmi_cred.get("password"),
is_broker_rest=False, is_broker=True, is_repo=False)
hmi.run_forever()
hmi_endpoint = ml.find_broker_endpoint(hmi.dir, hmi_cred.get("identifier"))
receiver = "s3i:b6d1cc6d-896c-40fe-9403-b5b7682b1d03"
"""
While loop to edit S³I messages and then send it to the Digital Twin
"""
while True:
print("[S³I]: You can send following messages to forwarder a by entering 1 or 2")
request_type = input(" {0}[1]: service request, [2]: get value request:{1} ".format(
bcolors.OKBLUE, bcolors.ENDC
))
if request_type == "1":
msg = prepare_service_request()
if msg is None:
continue
elif request_type == "2":
msg = prepare_get_value_request()
if msg is None:
continue
else:
continue
receiver_endpoint = ml.find_broker_endpoint(hmi.dir, thing_id=receiver)
resp = hmi.broker.send([receiver_endpoint], json.dumps(msg))
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment