# -*- coding: utf-8 -*-
"""Message bus between the server and browser windows.

Every browser window ("session") is identified by a ``window_id`` and is
served by exactly one publisher object: a ``SocketPublisher`` when websockets
are enabled, otherwise a ``PollingPublisher`` that buffers messages until the
client polls for them.  The module-level dictionaries below are the shared
registry of those sessions.
"""

# imports
import thread
import vispa
import cherrypy
from time import time, sleep
from threading import Thread
from json import loads, dumps
import logging
import Queue

logger = logging.getLogger(__name__)

# use websockets?
USE_SOCKETS = vispa.config("websockets", "enabled", False)

# subscribers stored as {window_id: Publisher}
SUBSCRIBERS = {}

# a dict, user_id -> window_ids (list)
USER_SESSIONS = {}

# a dict, window_id -> user_id
SESSION_OWNERS = {}

# a dict, window_id -> timestamp (seconds); the CleanerThread removes every
# session whose timestamp is older than the configured session timeout.
# PollingPublishers refresh their entry on each poll, SocketPublishers store
# a timestamp far in the future (see SocketPublisher.store)
POLLING_TIMESTAMPS = {}


def add_session(window_id, user_id, publisher):
    """Register ``publisher`` as the subscriber for ``window_id``.

    Both ids are normalised with ``unicode()`` before they are used as
    dictionary keys.  Publishes ``bus.session_added`` once all registries are
    updated.

    :param window_id: id of the browser window
    :param user_id: id of the user owning the window
    :param publisher: object with a ``send(data, binary=...)`` method
    """
    logger.debug("add session %s for user %s", window_id, user_id)
    window_id = unicode(window_id)
    user_id = unicode(user_id)

    # add SUBSCRIBERS entry
    SUBSCRIBERS[window_id] = publisher

    # add POLLING_TIMESTAMPS entry?
    if isinstance(publisher, PollingPublisher):
        POLLING_TIMESTAMPS[window_id] = int(time())

    # add USER_SESSIONS entry
    windows = USER_SESSIONS.setdefault(user_id, [])
    if window_id not in windows:
        windows.append(window_id)

    # add SESSION_OWNERS entry
    SESSION_OWNERS[window_id] = user_id

    vispa.publish("bus.session_added", window_id, user_id)


def remove_session(window_id, delay=False):
    """Drop every registry entry that belongs to ``window_id``.

    Publishes ``bus.session_removed`` and, when the window was the last one
    of its user, ``bus.all_user_sessions_removed`` with the user id as int.

    :param window_id: id of the browser window to remove
    :param delay: see the review note in the body; currently without effect
    """
    logger.debug("remove session %s", window_id)
    window_id = unicode(window_id)

    # remove from SUBSCRIBERS
    SUBSCRIBERS.pop(window_id, None)

    # NOTE(review): the timestamp written here is deleted again further down
    # in this same call, so ``delay`` has no observable effect.  The intended
    # behaviour is unclear from this file; kept as is - confirm with callers.
    if delay:
        POLLING_TIMESTAMPS[window_id] = int(time())

    # remove from SESSION_OWNERS (None when the window was never registered)
    user_id = SESSION_OWNERS.pop(window_id, None)

    # remove from USER_SESSIONS
    last_session = False
    if user_id is not None and user_id in USER_SESSIONS:
        windows = USER_SESSIONS[user_id]
        # tolerate an already inconsistent registry instead of raising
        if window_id in windows:
            windows.remove(window_id)
        if not windows:
            last_session = True
            del USER_SESSIONS[user_id]

    # delete the timestamp
    POLLING_TIMESTAMPS.pop(window_id, None)

    vispa.publish("bus.session_removed", window_id, user_id)
    if last_session:
        logger.debug("no more user sessions %d", int(user_id))
        vispa.publish("bus.all_user_sessions_removed", int(user_id))


def get_polling_publisher(window_id, user_id):
    """Return the ``PollingPublisher`` of ``window_id``, creating it on demand.

    The timestamp of the window is refreshed on every call.

    :returns: the publisher, or ``None`` when the window is already served by
        a publisher that is not a ``PollingPublisher``
    """
    # use the same key normalisation as add_session/remove_session, otherwise
    # a non-string id would never match the stored entry
    key = unicode(window_id)
    POLLING_TIMESTAMPS[key] = int(time())

    publisher = SUBSCRIBERS.get(key)
    if publisher is None:
        publisher = PollingPublisher(window_id, user_id)
        add_session(window_id, user_id, publisher)
    elif not isinstance(publisher, PollingPublisher):
        return None
    return publisher


class Bus:
    """Topic based dispatcher between browser windows and server handlers."""

    def __init__(self):
        # pub/sub data storage, topic -> list of handlers
        self._handlers = {}

        # configure and start the cleaner thread
        self.cleaner = CleanerThread(name="SocketBusPollingCleaner")
        self.cleaner.start()

    def send(self, window_id=None, user_id=None, data=None,
             except_sessions=None, binary=False, encode_json=True,
             broadcast=False):
        """Send ``data`` to one window, to all windows of a user, or to all.

        :param window_id: target window; preferred over ``user_id``
        :param user_id: target user, used only when no ``window_id`` is given
        :param data: payload
        :param except_sessions: window ids to skip; not applied to broadcasts
        :param binary: passed through to the publisher's ``send``
        :param encode_json: serialise ``data`` with ``json.dumps`` first
        :param broadcast: send to every registered subscriber
        """
        except_sessions = except_sessions or []
        if window_id is not None:
            window_id = unicode(window_id)
        if user_id is not None:
            user_id = unicode(user_id)

        logger.debug("send to window_id '%s', user_id '%s', broadcast %s",
                     window_id, user_id, str(broadcast))

        if broadcast:
            logger.debug("broadcast")
            # snapshot, the registry may change while we are sending
            subscribers = list(SUBSCRIBERS.values())
        else:
            subscribers = []
            # window_id is preferred over user_id
            if window_id:
                targets = [window_id]
            elif user_id:
                targets = USER_SESSIONS.get(user_id, [])
            else:
                targets = []
            for target in targets:
                if target in except_sessions:
                    continue
                subscriber = SUBSCRIBERS.get(target)
                # a window without subscriber is skipped instead of raising
                if subscriber is not None:
                    subscribers.append(subscriber)

        if encode_json:
            data = dumps(data)

        for subscriber in subscribers:
            logger.debug(
                " -> sending to subscriber: window_id '%s', user_id '%s'",
                subscriber.window_id, subscriber.user_id)
            subscriber.send(data, binary=binary)

    def send_topic(self, topic, *args, **kwargs):
        """Wrap the ``data`` keyword into ``{"data": ..., "topic": topic}``
        and forward everything else unchanged to ``send``."""
        kwargs["data"] = {"data": kwargs.get("data", None), "topic": topic}
        self.send(*args, **kwargs)

    def received_message(self, msg, window_id):
        """Decode a JSON message from a client and dispatch it by topic.

        Messages that are not JSON objects containing a ``topic`` key, or
        that arrive without a ``window_id``, are dropped silently.
        """
        # json?
        try:
            data = loads(str(msg))
        except Exception:
            logger.debug("dropping message that could not be parsed as json")
            data = None

        # atm, we cannot do anything when the msg is not a json object
        # or has no key 'topic'
        if not isinstance(data, dict) or "topic" not in data:
            return

        # window_id has to be set for safety reasons
        if not window_id:
            return

        self.__publish(data["topic"], window_id, data=data)

    def __publish(self, topic, window_id, data=None):
        """Call every handler subscribed to ``topic``."""
        handlers = self._handlers.get(topic)
        if handlers is None:
            return

        # user_id? (None for windows without registered owner)
        user_id = SESSION_OWNERS.get(window_id)

        # iterate over a copy so that handlers may (un)subscribe while
        # being dispatched
        for handler in list(handlers):
            handler(window_id=window_id, user_id=user_id, data=data,
                    topic=topic)

    def subscribe(self, topic, handler):
        """Register ``handler`` for ``topic``."""
        self._handlers.setdefault(topic, []).append(handler)

    def unsubscribe(self, topic, handler=None):
        """Remove ``handler`` from ``topic``; without a handler, remove the
        whole topic."""
        handlers = self._handlers.get(topic)
        if handlers is None:
            return
        if handler is None:
            del self._handlers[topic]
        elif handler in handlers:
            handlers.remove(handler)


class SocketPublisher:
    """Placeholder, replaced below when websockets are enabled."""
    pass


if USE_SOCKETS:
    from ws4py.websocket import WebSocket

    class SocketPublisher(WebSocket):
        """Publisher that talks to one browser window over a websocket."""

        def __init__(self, *args, **kwargs):
            WebSocket.__init__(self, *args, **kwargs)
            # self.window_id and self.user_id are set in 'socket_hook'
            # in the bus controller
            self.window_id = None
            self.user_id = None

        def store(self):
            """Register this socket as the subscriber of its window."""
            if self.window_id:
                add_session(self.window_id, self.user_id, self)
                # prevent session from being removed soon; same key
                # normalisation as in add_session
                POLLING_TIMESTAMPS[unicode(self.window_id)] = \
                    int(time()) + 48 * 3600

        def closed(self, *args, **kwargs):
            """Unregister the window when the socket is closed."""
            super(SocketPublisher, self).closed(*args, **kwargs)
            if self.window_id:
                remove_session(self.window_id, True)

        def received_message(self, msg):
            """Forward the payload of an incoming frame to the bus."""
            vispa.bus.received_message(msg.data, self.window_id)


class PollingPublisher:
    """Publisher that queues messages until the client polls for them."""

    def __init__(self, window_id, user_id):
        self.window_id = window_id
        self.user_id = user_id
        # set to None by _stop(); no messages are accepted afterwards
        self._queue = Queue.Queue()
        # ident of the thread currently blocked in fetch(), if any
        self._waiting_thread_id = None
        # NOTE(review): nothing in the visible code unsubscribes this
        # listener again when the session is removed - confirm whether the
        # engine keeps removed publishers alive
        cherrypy.engine.subscribe("stop", self._stop, 10)

    def _stop(self):
        """Disable the publisher and wake up a thread blocked in fetch()."""
        logger.debug("stop poller")
        pending = self._queue
        self._queue = None
        # None when _stop was already called before
        if pending is not None:
            pending.put("{}")
        logger.debug("stop poller done")

    def send(self, data, binary=False, timeout=None):
        """Queue ``data`` for the next fetch(); ignored after _stop()."""
        pending = self._queue
        if pending is not None:
            pending.put(data, timeout=timeout)
        # TODO: implement binary data

    def fetch(self, timeout=None):
        """Return all queued messages, waiting up to ``timeout`` seconds for
        the first one.

        :returns: list of messages, or ``None`` when the publisher is stopped
            or the timeout expired without any message
        """
        # work on a local reference: _stop() resets self._queue to None
        # while a thread may still be blocked in here
        pending = self._queue
        if pending is None:
            return None

        # finish existing thread by feeding empty event
        current = thread.get_ident()
        waiting = self._waiting_thread_id
        if waiting is not None and waiting != current:
            pending.put("{}")
            while self._waiting_thread_id is not None:
                sleep(0.01)

        try:
            self._waiting_thread_id = current
            # block for the first message, then drain whatever else is there
            result = [pending.get(timeout=timeout)]
            while True:
                try:
                    result.append(pending.get_nowait())
                except Queue.Empty:
                    break
            return result
        except Queue.Empty:
            return None
        finally:
            self._waiting_thread_id = None

    def received_message(self, msg):
        """Forward a message posted by the client to the bus."""
        vispa.bus.received_message(msg, self.window_id)


class CleanerThread(Thread):
    """Daemon thread that removes sessions whose timestamp has expired."""

    def __init__(self, *kargs, **kwargs):
        Thread.__init__(self, *kargs, **kwargs)
        self.daemon = True
        self.running = True
        self.session_timeout = vispa.config("web", "session_timeout", 3600)
        self.session_interval = vispa.config("web", "session_interval", 60)
        cherrypy.engine.subscribe("exit", self._exit)

    def _exit(self):
        """Ask run() to leave its loop after the current interval."""
        # must be the flag that run() checks
        self.running = False

    def run(self):
        logger.info("start cleaning thread: %d / %d",
                    self.session_timeout, self.session_interval)
        while self.running:
            # snapshot, remove_session() modifies the dict
            for window_id, stamp in list(POLLING_TIMESTAMPS.items()):
                last_access = int(time()) - stamp
                if last_access > self.session_timeout:
                    # delete the publisher and all its data
                    remove_session(window_id)
            sleep(self.session_interval)
        logger.info("stop cleaning thread")