Commit 2189551a authored by Marcel Rieger's avatar Marcel Rieger
Browse files

Use socketbus for polling, draft.

parent dfacde98
......@@ -29,7 +29,6 @@ use_forgot = False
[websockets]
enabled = False
secure = False
[executable]
# pxlrun command options, call of executable will be:
......
......@@ -20,7 +20,7 @@ from . import url
try:
import ConfigParser as cp
except:
import configparser as cp # @UnresolvedImport @Reimport
import configparser as cp
logger = logging.getLogger(__name__)
......@@ -236,3 +236,7 @@ class Netstat(object):
class MessageException(Exception):
pass
# the bus
bus = None
# -*- coding: utf-8 -*-
# imports
import vispa
import cherrypy
from time import time, sleep
from threading import Thread
import json
# use websockets?
USE_SOCKETS = vispa.config('websockets', 'enabled', False)
# subscribers stored as {sessionid: Publisher}
SUBSCRIBERS = {}
# a dict, userid -> sessionids (list)
USERSESSIONS = {}
# a dict, sessionid -> userid
SESSIONOWNERS = {}
# timestamps only for PollingPublishers to handle
# the deletion of their entry in SUBSCRIBERS
# (SocketPublishers are removed automatically)
TIMESTAMPS = {}
# add function
def add_session(sessionid, userid, publisher):
# add SUBSCRIBERS entry
SUBSCRIBERS[sessionid] = publisher
# add TIMESTMAPS entry?
if isinstance(publisher, PollingPublisher):
TIMESTAMPS[sessionid] = int(time())
# add USERSESSIONS entry
if userid not in USERSESSIONS.keys():
USERSESSIONS[userid] = []
if sessionid not in USERSESSIONS[userid]:
USERSESSIONS[userid].append(sessionid)
# add SESSIONOWNERS entry
SESSIONOWNERS[sessionid] = userid
# remove function
def remove_session(sessionid):
# remove from SUBSCRIBERS
if sessionid in SUBSCRIBERS.keys():
del SUBSCRIBERS[sessionid]
# remove from SESSIONOWNERS
if sessionid in SESSIONOWNERS.keys():
userid = SESSIONOWNERS[sessionid]
# remove from USERSESSIONS
if userid in USERSESSIONS.keys():
USERSESSIONS[userid].remove(sessionid)
if not len(USERSESSIONS[userid]):
del USERSESSIONS[userid]
# delete the entry in SESSIONOWNERS
del SESSIONOWNERS[sessionid]
if sessionid in TIMESTAMPS.keys():
# delete the timestamp
del TIMESTAMPS[sessionid]
# options for the CleanerThread
# this has to be a dict, since elementary
# datatypes don't live in the shared memory
CLEANEROPTIONS = {'RUN': True,
'DELAY': 5,
'MAXDIFF': 10}
class CleanerThread:
def __init__(self):
while CLEANEROPTIONS['RUN']:
for sessionid, stamp in TIMESTAMPS.items():
if int(time()) - stamp > CLEANEROPTIONS['MAXDIFF']:
# delete the publisher and all its data
remove_session(sessionid)
sleep(CLEANEROPTIONS['DELAY'])
class Bus:
def __init__(self):
# pub/sub data storage
self.handlers = {}
# configure and start the cleaner thread
cleaner = Thread(target=CleanerThread, name='BusPollingCleaner')
cleaner.daemon = True
cherrypy.engine.subscribe('stop', self._stop_cleaner)
cleaner.start()
def _stop_cleaner(self):
CLEANEROPTIONS['RUN'] = False
def send(self, sessionid=None, userid=None, except_sids=[], payload=None, binary=False):
# sessionid is preferred wrt userid
if sessionid:
sessionid = unicode(sessionid)
if sessionid in SUBSCRIBERS.keys():
SUBSCRIBERS[sessionid].send(json.dumps(payload), binary=binary)
elif userid:
userid = unicode(userid)
if userid in USERSESSIONS.keys():
if not isinstance(except_sids, list):
except_sids = [except_sids]
for sid in USERSESSIONS[userid]:
if sid in except_sids:
continue
SUBSCRIBERS[sid].send(json.dumps(payload), binary=binary)
def sendtopic(self, topic, sessionid=None, userid=None, except_sids=[], payload=None, binary=False):
payload = payload or {}
if not isinstance(payload, dict):
payload = {'payload': payload}
payload['topic'] = topic
self.send(sessionid=sessionid, userid=userid, except_sids=except_sids, payload=payload, binary=binary)
def received_message(self, msg, sessionid):
# json?
try:
data = json.loads(str(msg))
except:
data = None
# atm, we cannot do anything when the msg
# is not json parsable or has no key 'topic'
if not data or not 'topic' in data.keys():
return
# sessionid has to be set for safety reasons
if not sessionid:
return
self.publish(data['topic'], sessionid, payload=data)
def publish(self, topic, sessionid, payload=None):
if topic in self.handlers.keys():
# userid?
userid = None
if sessionid in SESSIONOWNERS.keys():
userid = SESSIONOWNERS[sessionid]
for handler in self.handlers[topic]:
handler(sessionid=sessionid, userid=userid, payload=payload)
def subscribe(self, topic, handler):
if topic not in self.handlers.keys():
self.handlers[topic] = []
self.handlers[topic].append(handler)
def unsubscribe(self, topic, handler):
if topic in self.handlers.keys():
if handler in self.handlers[topic]:
self.handlers[topic].remove(handler)
bus = Bus()
class SocketPublisher:
pass
if USE_SOCKETS:
from ws4py.websocket import WebSocket
class SocketPublisher(WebSocket):
def __init__(self, *args, **kwargs):
WebSocket.__init__(self, *args, **kwargs)
# self.platform is set in the WSTool right after 'upgrade'
self.platform = None
# self.sesseionid and self.userid are set in 'socket_hook'
# in the bus controller
self.sessionid = None
self.userid = None
def store(self):
if self.sessionid:
add_session(self.sessionid, self.userid, self)
def closed(self, code, reason=None):
remove_session(self.sessionid)
def received_message(self, msg):
if self.platform:
self.platform.bus.received_message(msg, self.sessionid)
class PollingPublisher:
def __init__(self, sessionid):
self.sessionid = sessionid
self._stack = []
def send(self, payload, binary=False):
self._stack.append(payload)
# TODO: implement binary payloads
def fetch(self, n=0):
stack = []
if not n or n > len(self._stack):
stack = self._stack
self._stack = []
else:
stack = self._stack[:n]
self._stack = self._stack[n:]
return stack
def received_message(self, msg):
if self.platform:
bus.received_message(msg, self.sessionid)
......@@ -4,19 +4,19 @@
import cherrypy
import vispa
from vispa.controller import AbstractController
from vispa.bus import SocketPublisher, PollingPublisher, USE_SOCKETS, \
SUBSCRIBERS, USERSESSIONS, SESSIONOWNERS, TIMESTAMPS, add_session
from vispa.socketbus import SocketPublisher, PollingPublisher, add_session, \
USE_SOCKETS, SUBSCRIBERS, POLLING_TIMESTAMPS
from vispa import MessageException
from time import time
import json
class BusController(AbstractController):
@cherrypy.expose
def index(self, *args, **kwargs):
pass
def get_pollingpublisher(self, session_id, user_id):
def get_polling_publisher(self, session_id, user_id):
if session_id in SUBSCRIBERS.keys():
publisher = SUBSCRIBERS[session_id]
if not isinstance(publisher, PollingPublisher):
......@@ -27,24 +27,24 @@ class BusController(AbstractController):
return publisher
@cherrypy.expose
@cherrypy.tools.json_out()
def poll(self, *args, **kwargs):
session_id = self.get('session_id')
user_id = self.get('user_id')
publisher = self.get_pollingpublisher(session_id, user_id)
@cherrypy.tools.ajax()
def poll(self):
session_id = self.get("session_id")
user_id = self.get("user_id")
publisher = self.get_polling_publisher(session_id, user_id)
if not isinstance(publisher, PollingPublisher):
return ''
TIMESTAMPS[session_id] = int(time())
stack = publisher.fetch()
return stack
return []
POLLING_TIMESTAMPS[session_id] = int(time())
return publisher.fetch()
@cherrypy.expose
def receive(self, *args, **kwargs):
def send(self, *args, **kwargs):
# due to the application/x-www-form-urlencoded content type
# msg is the first key of kwargs
msg = kwargs.keys()[0]
session_id = cherrypy.session.id
user_id = self.get('user_id')
publisher = self.get_pollingpublisher(session_id, user_id)
session_id = self.get("session_id")
user_id = self.get("user_id")
publisher = self.get_polling_publisher(session_id, user_id)
if isinstance(publisher, PollingPublisher):
publisher.received_message(msg)
......@@ -56,11 +56,11 @@ if USE_SOCKETS:
def socket_hook(self, *args, **kwargs):
# the user is connected via this controller function
# on startup, so store user_id<->session_id information
# in the publisher at 'cherrypy.serving.request.ws_handler'
# in the publisher at "cherrypy.serving.request.ws_handler"
publisher = cherrypy.serving.request.ws_handler
setattr(publisher, 'session_id', cherrypy.session.id)
setattr(publisher, 'user_id', self.get('user_id'))
# call the publisher's 'store' method to register it to the bus
setattr(publisher, "session_id", self.get("session_id"))
setattr(publisher, "user_id", self.get("user_id"))
# call the publisher's "store" method to register it to the bus
publisher.store()
BusController.index = socket_hook
\ No newline at end of file
BusController.index = socket_hook
......@@ -20,7 +20,6 @@ logger = logging.getLogger(__name__)
class RootController(AbstractController):
def __init__(self):
"""
The Constructor. Members from other classes
......@@ -99,8 +98,7 @@ class RootController(AbstractController):
db = cherrypy.request.db
username = cherrypy.request.user.name
dev_mode = vispa.config("web", "dev_mode", True)
# use_websockets = vispa.config("websockets", "enabled", False)
# secure_websockets = vispa.config("websockets", "secure", False)
use_websockets = vispa.config("websockets", "enabled", False)
preferences = self.get_preferences(db, cherrypy.request.user.id, parse_json=True)
client_log_level = vispa.config("web", "client_log_level", "info")
workspace_ids = self.convert(self.get("workspace_ids"), int)
......@@ -113,8 +111,7 @@ class RootController(AbstractController):
"username" : username,
"common_js" : self.common_js,
"common_css" : self.common_css,
# "use_websockets" : use_websockets,
# "secure_websockets": secure_websockets,
"use_websockets" : use_websockets,
"workspace_ids" : workspace_ids,
"workspace_data" : workspace_data,
# "add_workspaces" : add_workspaces,
......
......@@ -125,8 +125,12 @@ class Server(object):
'tools.sessions.timeout': 1440,
'tools.staticdir.root': vispa.codepath('vispa', 'static'),
'tools.gzip.on': True,
'tools.gzip.mime_types': ['text/html', 'text/css', 'application/x-javascript', 'application/json'],
'tools.render.common_data': {'base_dynamic': base_dynamic, 'base_static': base_static},
'tools.gzip.mime_types': ['text/html', 'text/css',
'application/x-javascript', 'application/json'],
'tools.render.common_data': {
'base_dynamic': base_dynamic,
'base_static': base_static
},
},
'/extensions': {
'tools.workspace.on': True
......@@ -147,7 +151,7 @@ class Server(object):
'tools.sessions.on': False,
'tools.expires.on': True,
'tools.expires.secs': 3600 * 24 * 365,
},
}
}
def __init__(self, args):
......@@ -234,8 +238,10 @@ class Server(object):
def __init_plugins(self, args):
if vispa.config('websockets', 'enabled', False):
from ws4py.server.cherrypyserver import WebSocketPlugin # @UnresolvedImport
from ws4py.server.cherrypyserver import WebSocketPlugin
WebSocketPlugin(cherrypy.engine).subscribe()
from vispa.socketbus import Bus
vispa.bus = Bus()
mako_lookup_dir = os.path.join(os.path.dirname(__file__), "templates")
vispa.plugins.template.MakoPlugin(cherrypy.engine,
......@@ -272,7 +278,7 @@ class Server(object):
cherrypy.tools.json_parameters = JsonParameters()
try:
from ws4py.server.cherrypyserver import WebSocketTool # @UnresolvedImport
from ws4py.server.cherrypyserver import WebSocketTool
cherrypy.tools.websocket = WebSocketTool()
except:
pass
......
# -*- coding: utf-8 -*-
# imports
import vispa
import cherrypy
from time import time, sleep
from threading import Thread
import json
import logging
logger = logging.getLogger(__name__)
# use websockets?
USE_SOCKETS = vispa.config("websockets", "enabled", False)
# subscribers stored as {session_id: Publisher}
SUBSCRIBERS = {}
# a dict, user_id -> session_ids (list)
USER_SESSIONS = {}
# a dict, session_id -> user_id
SESSION_OWNERS = {}
# timestamps (only for PollingPublishers) to handle
# the deletion of their entry in SUBSCRIBERS
# (SocketPublishers are removed automatically)
POLLING_TIMESTAMPS = {}
def add_session(session_id, user_id, publisher):
logger.debug("add session %s for user %s" % (session_id, user_id))
session_id = unicode(session_id)
user_id = unicode(user_id)
# add SUBSCRIBERS entry
SUBSCRIBERS[session_id] = publisher
# add POLLING_TIMESTAMPS entry?
if isinstance(publisher, PollingPublisher):
POLLING_TIMESTAMPS[session_id] = int(time())
# add USER_SESSIONS entry
if user_id not in USER_SESSIONS.keys():
USER_SESSIONS[user_id] = []
if session_id not in USER_SESSIONS[user_id]:
USER_SESSIONS[user_id].append(session_id)
# add SESSION_OWNERS entry
SESSION_OWNERS[session_id] = user_id
def remove_session(session_id):
logger.debug("remove session %s" % session_id)
# remove from SUBSCRIBERS
if session_id in SUBSCRIBERS.keys():
del SUBSCRIBERS[session_id]
# remove from SESSION_OWNERS
if session_id in SESSION_OWNERS.keys():
user_id = SESSION_OWNERS[session_id]
# remove from USER_SESSIONS
if user_id in USER_SESSIONS.keys():
USER_SESSIONS[user_id].remove(session_id)
if not len(USER_SESSIONS[user_id]):
del USER_SESSIONS[user_id]
# delete the entry in SESSION_OWNERS
del SESSION_OWNERS[session_id]
if session_id in POLLING_TIMESTAMPS.keys():
# delete the timestamp
del POLLING_TIMESTAMPS[session_id]
class Bus:
def __init__(self):
# pub/sub data storage
self._handlers = {}
# configure and start the cleaner thread
cleaner = Thread(target=CleanerThread, name="SocketBusPollingCleaner")
cleaner.daemon = True
cherrypy.engine.subscribe("stop", self._stop_cleaner)
cleaner.start()
def _stop_cleaner(self):
CLEANEROPTIONS["RUN"] = False
def send(self, session_id=None, user_id=None, data=None,
except_sessions=None, binary=False):
except_sessions = except_sessions or []
subscribers = []
# session_id is preferred over user_id
if session_id:
session_id = unicode(session_id)
if session_id in SUBSCRIBERS.keys():
if session_id not in except_sessions:
subscribers.append(SUBSCRIBERS[session_id])
elif user_id:
user_id = unicode(user_id)
if user_id in USER_SESSIONS.keys():
for session_id in USER_SESSIONS[user_id]:
if session_id not in except_sessions:
subscribers.append(SUBSCRIBERS[session_id])
for subscriber in subscribers:
subscriber.send(json.dumps(data), binary=binary)
def send_topic(self, topic, *args, **kwargs):
data = kwargs.get("data", None)
data = {"data": data}
data["topic"] = topic
kwargs["data"] = data
self.send(*args, **kwargs)
def received_message(self, msg, session_id):
# json?
try:
data = json.loads(str(msg))
except:
data = None
# atm, we cannot do anything when the msg
# is not json parsable or has no key 'topic'
if not data or not "topic" in data.keys():
return
# session_id has to be set for safety reasons
if not session_id:
return
self.publish(data["topic"], session_id, data=data)
def publish(self, topic, session_id, data=None):
if topic in self._handlers.keys():
# user_id?
user_id = None
if session_id in SESSION_OWNERS.keys():
user_id = SESSION_OWNERS[session_id]
for handler in self._handlers[topic]:
handler(session_id=session_id, user_id=user_id, data=data)
def subscribe(self, topic, handler):
if topic not in self._handlers.keys():
self._handlers[topic] = []
self._handlers[topic].append(handler)
def unsubscribe(self, topic, handler=None):
if topic in self._handlers.keys():
if handler is None:
del self._handlers[topic]
elif handler in self._handlers[topic]:
self._handlers[topic].remove(handler)
class SocketPublisher:
pass
if USE_SOCKETS:
from ws4py.websocket import WebSocket
class SocketPublisher(WebSocket):
def __init__(self, *args, **kwargs):
WebSocket.__init__(self, *args, **kwargs)
# self.sesseionid and self.user_id are set in 'socket_hook'
# in the bus controller
self.session_id = None
self.user_id = None
def store(self):
if self.session_id:
add_session(self.session_id, self.user_id, self)
def closed(self, code, reason=None):
if self.session_id:
remove_session(self.session_id)
def received_message(self, msg):
vispa.bus.received_message(msg.data, self.session_id)
class PollingPublisher:
def __init__(self, session_id):
self.session_id = session_id
self._stack = []
def send(self, data, binary=False):
self._stack.append(data)
# TODO: implement binary data
def fetch(self, n=0):
stack = []
if not n or n > len(self._stack):
stack = self._stack
self._stack = []
else:
stack = self._stack[:n]
self._stack = self._stack[n:]
return stack
def received_message(self, msg):
vispa.bus.received_message(msg, self.session_id)
# options for the CleanerThread
# this has to be a dict, since elementary
# datatypes don't live in the shared memory
CLEANEROPTIONS = {
"RUN": True,
"DELAY": 5,
"MAXDIFF": 15
}
class CleanerThread:
def __init__(self):
logger.info("start SocketBusPollingCleaner thread")
while CLEANEROPTIONS["RUN"]:
for session_id, stamp in POLLING_TIMESTAMPS.items():
if int(time()) - stamp > CLEANEROPTIONS["MAXDIFF"]:
# delete the publisher and all its data
remove_session(session_id)
sleep(CLEANEROPTIONS["DELAY"])
\ No newline at end of file
var socketStates = {
"CONNECTING": 0,
"CONNECTED": 1,
"CLOSING": 2,
"CLOSED": 3
};
var Socket = Emitter.extend({
init: function(url, options) {
var self = this;
this._super();
this.url = url;
var defaultOptions = {
autoReconnect: true,
fallback: true,
forceFallback: false,
fallbackSendUrl: url ? url.replace("wss:", "https:").replace("ws:", "http:") : url,
fallbackSendMethod: "POST",
fallbackPollUrl: url ? url.replace("wss:", "https:").replace("ws:", "http:") : url,
fallbackPollMethod: "POST",
fallbackPollDelay: 3000,
fallbackPollGracePeriod: 5000,