Commit ea6caa93 authored by Gero Müller's avatar Gero Müller
Browse files

move get_polling_publihser to socketbus, improve send logging

parent 8efab7ca
......@@ -5,7 +5,7 @@ import cherrypy
import vispa
from vispa.controller import AbstractController
from vispa.socketbus import SocketPublisher, PollingPublisher, add_session, \
USE_SOCKETS, SUBSCRIBERS, POLLING_TIMESTAMPS
USE_SOCKETS, SUBSCRIBERS, POLLING_TIMESTAMPS, get_polling_publisher
from vispa import MessageException
from time import time
import json
......@@ -20,25 +20,14 @@ class BusController(AbstractController):
def index(self, *args, **kwargs):
pass
def get_polling_publisher(self, window_id, user_id):
if window_id in SUBSCRIBERS.keys():
publisher = SUBSCRIBERS[window_id]
if not isinstance(publisher, PollingPublisher):
return None
else:
publisher = PollingPublisher(window_id)
add_session(window_id, user_id, publisher)
return publisher
@cherrypy.expose
@cherrypy.tools.ajax()
def poll(self, timeoutms=10000):
wid = cherrypy.request.private_params.get("_windowId")
uid = cherrypy.request.user.id
publisher = self.get_polling_publisher(wid, uid)
publisher = get_polling_publisher(wid, uid)
if not isinstance(publisher, PollingPublisher):
return []
POLLING_TIMESTAMPS[wid] = int(time())
self.release()
return publisher.fetch(int(timeoutms)*0.001)
......@@ -49,7 +38,7 @@ class BusController(AbstractController):
msg = kwargs.keys()[0]
window_id = cherrypy.request.private_params.get("_windowId")
user_id = self.get("user_id")
publisher = self.get_polling_publisher(window_id, user_id)
publisher = get_polling_publisher(window_id, user_id)
if isinstance(publisher, PollingPublisher):
publisher.received_message(msg)
......
......@@ -72,6 +72,18 @@ def remove_session(window_id):
vispa.publish("bus.session_removed", window_id, user_id)
def get_polling_publisher(window_id, user_id):
POLLING_TIMESTAMPS[window_id] = int(time())
if window_id in SUBSCRIBERS.keys():
publisher = SUBSCRIBERS[window_id]
if not isinstance(publisher, PollingPublisher):
return None
else:
publisher = PollingPublisher(window_id, user_id)
add_session(window_id, user_id, publisher)
return publisher
class Bus:
def __init__(self):
......@@ -91,6 +103,8 @@ class Bus:
if user_id is not None:
user_id = unicode(user_id)
logger.debug("send to window_id '%s', user_id '%s', broadcast %s" % (window_id, user_id, str(broadcast)))
if broadcast:
logger.debug("broadcast")
subscribers = SUBSCRIBERS.values()
......@@ -111,7 +125,7 @@ class Bus:
if encode_json:
data = dumps(data)
for subscriber in subscribers:
logger.debug("send")
logger.debug(" -> sending to subscriber: window_id '%s', user_id '%s'" % (subscriber.window_id, subscriber.user_id))
subscriber.send(data, binary=binary)
def send_topic(self, topic, *args, **kwargs):
......@@ -188,8 +202,9 @@ if USE_SOCKETS:
class PollingPublisher:
def __init__(self, window_id):
def __init__(self, window_id, user_id):
self.window_id = window_id
self.user_id = user_id
self._queue = Queue.Queue()
self._waiting_thread_id = None
cherrypy.engine.subscribe("stop", self._stop, 10)
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment