Commit b1ba2165 authored by Jiahang Chen's avatar Jiahang Chen
Browse files

add callback functions for setvalue msgs

parent 79d2a8ed
Pipeline #459321 passed with stages
in 51 seconds
...@@ -6,11 +6,11 @@ ...@@ -6,11 +6,11 @@
import uuid import uuid
import time import time
import copy import copy
from s3i import IdentityProvider, TokenType, GetValueReply, Directory from s3i import IdentityProvider, TokenType, Directory
from s3i import Repository from s3i import Repository
import s3i.exception import s3i.exception
from s3i.broker import Broker, BrokerREST from s3i.broker import Broker, BrokerREST
from s3i.messages import ServiceReply from s3i.messages import ServiceReply, GetValueReply, SetValueReply
from ml.tools import find_broker_endpoint from ml.tools import find_broker_endpoint
from ml.app_logger import APP_LOGGER from ml.app_logger import APP_LOGGER
...@@ -445,6 +445,10 @@ def __on_broker_callback(self, ch, method, properties, body): ...@@ -445,6 +445,10 @@ def __on_broker_callback(self, ch, method, properties, body):
self.on_get_value_reply(body) self.on_get_value_reply(body)
elif message_type == "serviceReply": elif message_type == "serviceReply":
self.on_service_reply(body) self.on_service_reply(body)
elif message_type == "setValueRequest":
self.on_set_value_request(body)
elif message_type == "setValueReply":
self.on_set_value_reply(body)
else: else:
### TODO send user message reply back ### TODO send user message reply back
pass pass
...@@ -734,6 +738,55 @@ def on_service_request(self, body_json): ...@@ -734,6 +738,55 @@ def on_service_request(self, body_json):
else: else:
APP_LOGGER.critical(__log) APP_LOGGER.critical(__log)
def on_set_value_request(self, msg):
"""Handles incoming S³I-B SetValueRequest. Prints the content of msg to stdout.
:param msg: GetValueReply
"""
__log = "[S3I][Broker]: You have received a S3I-B SetValueRequest"
APP_LOGGER.info(__log)
set_value_reply = SetValueReply()
request_sender = msg.get("sender")
request_msg_id = msg.get("identifier")
request_sender_endpoint = msg.get("replyToEndpoint")
attribute_path = msg.get("attributePath")
new_value = msg.get("newValue")
reply_msg_uuid = "s3i:" + str(uuid.uuid4())
try:
__log = "[S3I]: Search the given attribute path: {}".format(attribute_path)
APP_LOGGER.info(__log)
self._uriToData(attribute_path)
self.dt_json[attribute_path] = new_value
result = True
except KeyError:
__log = "[S3I]: Invalid attribute path"
APP_LOGGER.critical(__log)
result = False
set_value_reply.fillSetValueReply(
senderUUID=self.thing_id,
receiverUUIDs=[request_sender],
ok=result,
replyingToUUID=request_msg_id,
msgUUID=reply_msg_uuid
)
res = self.__send_message_to_broker(
receiver_endpoints=[request_sender_endpoint],
msg=set_value_reply.msg
)
if self.__is_broker_rest:
if res.status_code == 201:
__log = "[S3I][Broker]: Send S3I-B GetValueReply back to the requester"
APP_LOGGER.info(__log)
else:
__log = "[S3I[Broker]: " + res.text
APP_LOGGER.info(__log)
def on_get_value_reply(self, msg): def on_get_value_reply(self, msg):
"""Handles incoming S³I-B GetValueReply. Prints the content of msg to stdout. """Handles incoming S³I-B GetValueReply. Prints the content of msg to stdout.
...@@ -746,7 +799,7 @@ def on_get_value_reply(self, msg): ...@@ -746,7 +799,7 @@ def on_get_value_reply(self, msg):
__log = "[S3I][Broker]: You have received a S3I-B GetValueReply" __log = "[S3I][Broker]: You have received a S3I-B GetValueReply"
APP_LOGGER.info(__log) APP_LOGGER.info(__log)
value = msg.get("value", None) value = msg.get("value", None)
if isinstance(value, dict): if isinstance(value, (dict, list)):
value = json.dumps(value, indent=2) value = json.dumps(value, indent=2)
__log = "[S3I][Broker]: The queried value is: {0}".format(value) __log = "[S3I][Broker]: The queried value is: {0}".format(value)
APP_LOGGER.info(__log) APP_LOGGER.info(__log)
...@@ -766,6 +819,18 @@ def on_service_reply(self, msg): ...@@ -766,6 +819,18 @@ def on_service_reply(self, msg):
__log = "[S3I][Broker]: The result is: {0}".format(results) __log = "[S3I][Broker]: The result is: {0}".format(results)
APP_LOGGER.info(__log) APP_LOGGER.info(__log)
def on_set_value_reply(self, msg):
"""Handles incoming S³I-B SetValueReply. Prints the content of msg to stdout.
:param msg: GetValueReply
"""
__log = "[S3I][Broker]: You have received a S3I-B SetValueReply"
APP_LOGGER.info(__log)
result = msg.get("ok", None)
__log = "[S3I][Broker]: The queried value is: {0}".format(result)
APP_LOGGER.info(__log)
def to_dir_json(self): def to_dir_json(self):
"""Returns a dictionary representing this thing's directory entry. """Returns a dictionary representing this thing's directory entry.
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment