Commit d435b0b0 authored by Jiahang Chen's avatar Jiahang Chen
Browse files

fix broker rest interface

parent 6f930d36
Pipeline #439294 passed with stages
in 51 seconds
...@@ -394,20 +394,21 @@ def __connect_with_broker(self): ...@@ -394,20 +394,21 @@ def __connect_with_broker(self):
) )
if self.__is_broker_rest: if self.__is_broker_rest:
self.broker = BrokerREST(token=self.access_token) self.broker = BrokerREST(token=self.access_token)
def receive(): def receive():
self.__endpoint = find_broker_endpoint(self.dir, thing_id=self.thing_id)
while True: while True:
try: try:
time.sleep(0.1) time.sleep(0.1)
msg_str = self.broker.receive_once(self.__endpoint) msg = self.broker.receive_once(self.__endpoint)
if msg_str == "": msg_to_json = lambda msg: msg if isinstance(msg, dict) else json.loads(msg)
if msg == {}:
continue continue
else: else:
self.__on_broker_callback( self.__on_broker_callback(
ch=None, ch=None,
method=None, method=None,
properties=None, properties=None,
body=json.loads(msg_str), body=msg_to_json(msg),
) )
except: except:
continue continue
...@@ -453,7 +454,8 @@ def __on_broker_callback(self, ch, method, properties, body): ...@@ -453,7 +454,8 @@ def __on_broker_callback(self, ch, method, properties, body):
elif isinstance(body, str): elif isinstance(body, str):
pass pass
self.broker.receiver.acknowledge_message(method.delivery_tag) if ch is not None:
ch.basic_ack(method.delivery_tag)
try: try:
message_type = body.get("messageType") message_type = body.get("messageType")
if message_type == "userMessage": if message_type == "userMessage":
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment