Skip to content
Snippets Groups Projects

Feature reconstructe thing

Merged Jiahang Chen requested to merge feature_reconstructe_thing into master
2 files
+ 66
11
Compare changes
  • Side-by-side
  • Inline
Files
2
+ 47
1
@@ -9,6 +9,7 @@
@@ -9,6 +9,7 @@
from pika import PlainCredentials
from pika import PlainCredentials
from pika.exceptions import UnroutableError
from pika.exceptions import UnroutableError
from ml.app_logger import APP_LOGGER
from ml.app_logger import APP_LOGGER
 
from ml.callback import CallbackManager
MSG_EXCHANGE = "demo.direct"
MSG_EXCHANGE = "demo.direct"
EVENT_EXCHANGE = "eventExchange"
EVENT_EXCHANGE = "eventExchange"
@@ -45,6 +46,11 @@ def receive_once(self, endpoint):
@@ -45,6 +46,11 @@ def receive_once(self, endpoint):
class Broker:
class Broker:
 
_ON_CONNECTION_OPEN = "_on_connection_open"
 
_ON_CONNECTION_CLOSED = "_on_connection_closed"
 
_ON_CHANNEL_OPEN = "_on_channel_open"
 
_ON_CHANNEL_CLOSED = "_on_channel_closed"
 
def __init__(self, token, endpoint, callback, loop):
def __init__(self, token, endpoint, callback, loop):
self.__token = token
self.__token = token
self.__endpoint = endpoint #TODO Event Queue, more queue?
self.__endpoint = endpoint #TODO Event Queue, more queue?
@@ -57,7 +63,7 @@ def __init__(self, token, endpoint, callback, loop):
@@ -57,7 +63,7 @@ def __init__(self, token, endpoint, callback, loop):
self.__consumer_tag = None
self.__consumer_tag = None
self.__is_consuming = False
self.__is_consuming = False
self.__callbacks = CallbackManager()
@property
@property
def token(self):
def token(self):
@@ -100,6 +106,10 @@ def on_connection_open(self, _unused_connection):
@@ -100,6 +106,10 @@ def on_connection_open(self, _unused_connection):
self.__channel = _unused_connection.channel(
self.__channel = _unused_connection.channel(
on_open_callback=self.on_channel_open
on_open_callback=self.on_channel_open
)
)
 
self.__callbacks.process_async_task(
 
self._ON_CONNECTION_OPEN,
 
self.__loop
 
)
@staticmethod
@staticmethod
def on_connection_open_error(_unused_connection, err):
def on_connection_open_error(_unused_connection, err):
@@ -117,6 +127,28 @@ def on_channel_open(self, _unused_channel):
@@ -117,6 +127,28 @@ def on_channel_open(self, _unused_channel):
prefetch_count=1
prefetch_count=1
)
)
self.start_consuming()
self.start_consuming()
 
self.__callbacks.process_async_task(
 
self._ON_CHANNEL_OPEN,
 
self.__loop
 
)
 
 
def add_on_channel_open_callback(self, callback, one_shot, *args, **kwargs):
 
self.__callbacks.add(
 
self._ON_CHANNEL_OPEN,
 
callback,
 
one_shot,
 
*args,
 
**kwargs
 
)
 
 
def add_on_connection_open_callback(self, callback, one_shot, *args, **kwargs):
 
self.__callbacks.add(
 
self._ON_CONNECTION_OPEN,
 
callback,
 
one_shot,
 
*args,
 
**kwargs
 
)
def start_consuming(self):
def start_consuming(self):
self.__consumer_tag = self.__channel.basic_consume(
self.__consumer_tag = self.__channel.basic_consume(
@@ -155,6 +187,20 @@ def reconnect_token_expired(self, token):
@@ -155,6 +187,20 @@ def reconnect_token_expired(self, token):
"""
"""
self.__until_all_closed_and_reconnect()
self.__until_all_closed_and_reconnect()
 
async def __until_connected(self):
 
if self.__channel is None or self.__connection is None:
 
await asyncio.sleep(0.1)
 
print("test")
 
else:
 
if self.__channel.is_open and self.__connection.is_open:
 
return
 
else:
 
print("test")
 
self.__loop.call_later(
 
0.1,
 
self.__until_connected
 
)
 
def __until_all_closed_and_reconnect(self):
def __until_all_closed_and_reconnect(self):
if not self.__channel.is_closed or not self.__connection.is_closed:
if not self.__channel.is_closed or not self.__connection.is_closed:
self.__loop.call_later(
self.__loop.call_later(
Loading