Commit 22172efb authored by Marcel's avatar Marcel
Browse files

Add socket bus on server-side. Add pubsub system over websockets with fallback...

Add socket bus on server-side. Add pubsub system over websockets with fallback method. Use vispa.ini for configuration.
parent 626ff17f
......@@ -12,6 +12,10 @@ welcome_text = <center>Welcome to VISPA!</center>
forgot_text = A mail with further instructions will be sent to your mail account!
use_forgot = False
[websockets]
use_websockets = True
secure_websockets = False
[executable]
# pxlrun command options, call of executable will be:
# pre_command_options command post_command_options
......
......@@ -10,14 +10,9 @@ var _bus = {
},
onmessage: function(event) {
var data = $.parseJSON(event.data.replace(/\'/g, '"'));
var topic = data.topic;
if (!topic) {
return;
} else {
delete data.topic;
}
if (_bus.topics[topic]) {
Bus(topic).publish(data);
console.log(data);return;
if (data.topic && _bus.topics[data.topic]) {
Bus(data.topic).publish(data);
}
},
onerror: function() {
......@@ -27,7 +22,7 @@ var _bus = {
var msgs = event.data.split('VISPAPOLLSTACKDELIMITER');
$.each(msgs, function(i, msg) {
event.data = msg;
_bus.config.onmessage(event);
_bus.settings.onmessage(event);
});
}
},
......@@ -67,9 +62,12 @@ var Bus = function(id) {
return self;
},
send = function() {
send = function(msg) {
if (_bus.websocket) {
_bus.websocket.send.apply(_bus.websocket, arguments);
if($.isPlainObject(msg)) {
msg = JSON.stringify(msg);
}
_bus.websocket.send(msg);
}
return self;
},
......
......@@ -37,7 +37,7 @@ var Vispa = function() {
updateConfig(_config);
commandPalette.startup(config.commandPalette);
messenger.startup(config.messenger);
bus = Bus(config.bus);
self.bus = bus = Bus(config.bus);
extensionManager.startup(config.extensionManager);
urlHandler.startup(config.url);
extensionView.startup(config.extensionView);
......
......@@ -85,6 +85,7 @@ $(function() {
var promise = configHandler.get();
// config extension with template values
var wsPath = $.Helpers.strFormat('{0}/{1}/bus/', location.host, $.Helpers.strBounds('${base_dynamic}', false, false));
var templateConfig = {
userName: '${username}',
url: {
......@@ -92,9 +93,10 @@ $(function() {
staticBase: '${base_static}'
},
bus: {//TODO
forceFallback: true,
fallbackPollUrl: 'http://localhost:4282/vispa/bus/poll',
fallbackSendUrl: null
address: $.Helpers.strFormat('ws{0}://{1}', '${'s' if secure_websockets else ''}', wsPath),
forceFallback: ${'false' if use_websockets else 'true'},
fallbackPollUrl: $.Helpers.strFormat('{0}//{1}poll', location.protocol, wsPath),
fallbackSendUrl: $.Helpers.strFormat('{0}//{1}receive', location.protocol, wsPath)
}
};
......@@ -107,6 +109,9 @@ $(function() {
startupLock.resolve();
// start the config update polling
configHandler.poll();
// test
vispa.bus('bla').subscribe(function(){console.log(arguments);});
};
$.when(promise).then(start);
});
......
# -*- coding: utf-8 -*-
# imports
import vispa
import cherrypy
from time import time, sleep
from threading import Thread
import json
# use websockets?
USE_SOCKETS = vispa.config.get2("websockets", "use_websockets", False)
# self.subscribers stored as {sessionid: Publisher}
SUBSCRIBERS = {}
# timestamps only for PollingPublishers to handle
# the deletion of their entry in self.subscribers
# (SocketPublishers are removed automatically)
TIMESTAMPS = {}
class Bus:
def __init__(self):
# pub/sub data storage
self.handlers = {}
# configure and start the cleaner thread
cleaner = Thread(target=CleanerThread, name='WebSocketPollingCleaner')
cherrypy.engine.subscribe('stop', self._stop_cleaner)
cleaner.start()
def _stop_cleaner(self):
CLEANEROPTIONS['RUN'] = False
def send(self, sessionid=None, payload=None, binary=False):
if sessionid:
SUBSCRIBERS[sessionid].send(json.dumps(payload), binary=binary)
else:
for sid in SUBSCRIBERS.keys():
SUBSCRIBERS[sid].send(json.dumps(payload), binary=binary)
def sendtopic(self, topic, sessionid=None, payload=None, binary=False):
payload = payload or {}
if isinstance(payload, dict):
payload['topic'] = topic
self.send(sessionid=sessionid, payload=payload, binary=binary)
def received_message(self, msg, sessionid):
# json?
try:
data = json.loads(str(msg))
except:
data = None
# atm, we cannot do anything when the msg
# is not json parsable or has no key 'topic'
if not data or not 'topic' in data.keys():
return
# sessionid has to be set for safety reasons
if not sessionid:
return
self.publish(data['topic'], sessionid, payload=data)
def publish(self, topic, sessionid, payload=None):
if topic in self.handlers.keys():
for handler in self.handlers[topic]:
handler(sessionid, payload=payload)
def subscribe(self, topic, handler):
if topic not in self.handlers.keys():
self.handlers[topic] = []
self.handlers[topic].append(handler)
def unsubscribe(self, topic, handler):
if topic in self.handlers.keys():
if handler in self.handlers[topic]:
self.handlers[topic].remove(handler)
if USE_SOCKETS:
from ws4py.websocket import WebSocket
class SocketPublisher(WebSocket):
def __init__(self, *args, **kwargs):
WebSocket.__init__(self, *args, **kwargs)
# self.platform is set in the WSTool right after 'upgrade'
self.platform = None
request = args[3]
self.__sessionid = request["HTTP_COOKIE"].split("=")[-1]
# TODO: test with more than one cookie in 'HTTP_COOKIE'
self.store()
def store(self):
SUBSCRIBERS[self.__sessionid] = self
def closed(self, code, reason=None):
del SUBSCRIBERS[self.__sessionid]
def received_message(self, msg):
if self.platform:
self.platform.bus.received_message(msg, self.__sessionid)
class PollingPublisher:
def __init__(self, platform, sessionid):
self.platform = platform
self.__sessionid = sessionid
self._stack = []
def send(self, payload, binary=False):
self._stack.append(payload)
# TODO: implement binary payloads
def fetch(self, n=0):
stack = []
if not n or n > len(self._stack):
stack = self._stack
self._stack = []
else:
stack = self._stack[:n]
self._stack = self._stack[n:]
return stack
def received_message(self, msg):
if self.platform:
self.platform.bus.received_message(msg, self.__sessionid)
# options for the CleanerThread
# this has to be a dict, since elementary
# datatypes don't live in the shared memory
CLEANEROPTIONS = {'RUN': True,
'DELAY': 5,
'MAXDIFF': 10}
class CleanerThread:
def __init__(self):
while CLEANEROPTIONS['RUN']:
for sessionid, stamp in TIMESTAMPS.items():
if int(time()) - stamp > CLEANEROPTIONS['MAXDIFF']:
if sessionid in SUBSCRIBERS.keys():
del SUBSCRIBERS[sessionid]
del TIMESTAMPS[sessionid]
sleep(CLEANEROPTIONS['DELAY'])
\ No newline at end of file
......@@ -2,96 +2,49 @@
# imports
import cherrypy
from time import time, sleep
from threading import Thread
import vispa
from vispa.controller import AbstractController
from vispa.bus import SocketPublisher, PollingPublisher, USE_SOCKETS, SUBSCRIBERS, TIMESTAMPS
from time import time
"""
from ws4py.server.cherrypyserver import WebSocketPlugin, WebSocketTool
#from ws4py.websocket import WebSocket
#WebSocketPlugin(cherrypy.engine).subscribe()
cherrypy.tools.websocket = WebSocketTool()
"""
# subscribers stored as {sessionid: Publisher}
SUBSCRIBERS = {}
# timestamps only for PollingPublishers to handle
# the deletion of their entry in SUBSCRIBERS
# (SocketPublishers are removed automatically)
TIMESTAMPS = {}
"""
class SocketPublisher(WebSocket):
def __init__(self, *args, **kwargs):
WebSocket.__init__(self, *args, **kwargs)
request = args[3]
self.sessionid = request["HTTP_COOKIE"].split("=")[-1]
# TODO: test with more than one cookie in 'HTTP_COOKIE'
SUBSCRIBERS[self.sessionid] = self
def closed(self, code, reason=None):
del SUBSCRIBERS[self.sessionid]
"""
class PollingPublisher:
def __init__(self, sessionid):
self._sessionid = sessionid
self._stack = []
def send(self, payload, binary=False):
self._stack.append(payload)
print self._stack
# TODO: implement binary payloads
def fetch(self, n=0):
stack = []
if not n or n > len(self._stack):
stack = self._stack
self._stack = []
else:
stack = self._stack[:n]
self._stack = self._stack[n:]
return stack
class BusController:
def __init__(self):
cleaner = Thread(target=CleanerThread, name='WebSocketPollingCleaner')
cherrypy.engine.subscribe('stop', self.stop_cleaner)
cleaner.start()
def stop_cleaner(self):
CLEANEROPTIONS['RUN'] = False
def send(self, sessionid, payload, binary=False):
if sessionid in SUBSCRIBERS.keys():
SUBSCRIBERS[sessionid].send(str(payload), binary=binary)
def sendtopic(self, sessionid, topic, payload, binary=False):
data = {'topic': str(topic), 'payload': str(payload)}
payload = str(data)
self.send(sessionid, payload, binary=binary)
"""
if USE_SOCKETS:
@cherrypy.expose
@cherrypy.tools.websocket(handler_cls=SocketPublisher)
def socket(self, *args, **kwargs):
def socket_hook(*args, **kwargs):
pass
else:
@cherrypy.expose
def socket_hook(*args, **kwargs):
pass
"""
@cherrypy.expose
def poll(self, *args, **kwargs):
sessionid = cherrypy.session.id
class BusController(AbstractController):
def __init__(self, platform):
AbstractController.__init__(self, platform)
self.platform = platform
# use websockets?
if USE_SOCKETS:
self.index = socket_hook
def get_pollingpublisher(self, sessionid):
if sessionid in SUBSCRIBERS.keys():
publisher = SUBSCRIBERS[sessionid]
if not isinstance(publisher, PollingPublisher):
return ''
return None
else:
publisher = PollingPublisher(sessionid)
publisher = PollingPublisher(self.platform, sessionid)
SUBSCRIBERS[sessionid] = publisher
return publisher
@cherrypy.expose
@cherrypy.tools.user()
@cherrypy.tools.allow(methods=["POST"])
def poll(self, *args, **kwargs):
sessionid = cherrypy.session.id
publisher = self.get_pollingpublisher(sessionid)
if not isinstance(publisher, PollingPublisher):
return ''
TIMESTAMPS[sessionid] = int(time())
stack = publisher.fetch()
if len(stack):
......@@ -99,21 +52,14 @@ class BusController:
else:
return ''
# options for the CleanerThread
# this has to be a dict, since elementary
# datatypes don't live in the shared memory
CLEANEROPTIONS = {'RUN': True,
'DELAY': 5,
'MAXDIFF': 10}
class CleanerThread:
def __init__(self):
while CLEANEROPTIONS['RUN']:
for sessionid, stamp in TIMESTAMPS.items():
if int(time()) - stamp > CLEANEROPTIONS['MAXDIFF']:
if sessionid in SUBSCRIBERS.keys():
del SUBSCRIBERS[sessionid]
del TIMESTAMPS[sessionid]
sleep(CLEANEROPTIONS['DELAY'])
\ No newline at end of file
@cherrypy.expose
@cherrypy.tools.user()
@cherrypy.tools.allow(methods=["POST"])
def receive(self, *args, **kwargs):
# msg is the first key of kwargs
msg = kwargs.keys()[0]
sessionid = cherrypy.session.id
publisher = self.get_pollingpublisher(sessionid)
if isinstance(publisher, PollingPublisher):
publisher.received_message(msg)
return ''
\ No newline at end of file
......@@ -26,7 +26,7 @@ class PlatformController(object):
self.error = ErrorController()
self.static = StaticController("static")
self.extensions = StaticController("vispa/extensions")
self.bus = BusController()
self.bus = BusController(platform)
@cherrypy.expose
@cherrypy.tools.user()
......@@ -35,9 +35,13 @@ class PlatformController(object):
@cherrypy.tools.render(template="sites/index.html")
def index(self, *args, **kwargs):
username = cherrypy.session['user_name']
use_websockets = vispa.config.get2("websockets", "use_websockets", False)
secure_websockets = vispa.config.get2("websockets", "secure_websockets", False)
data = {'username': username,
'common_js': self._platform.common_js,
'common_css': self._platform.common_css}
'common_css': self._platform.common_css,
'use_websockets': use_websockets,
'secure_websockets': secure_websockets}
return data
@cherrypy.expose
......
......@@ -16,11 +16,13 @@ from vispa.vfs import VirtualFileSystem
from vispa.helpers import browser
from vispa.models.user import User
from vispa.controller.platform import PlatformController
from vispa.bus import Bus
class Platform(object):
def __init__(self, var_dir, base_url="/"):
self.bus = Bus()
self.controller = PlatformController(self)
self.base_url = base_url
self.common_js = []
......
......@@ -88,6 +88,7 @@ class Server(object):
# replace the default log
#cherrypy.log = log
'''
# Register the Template engine tool
from vispa.tools.template import MakoTool
cherrypy.tools.render = MakoTool({'base_dynamic': vispa.url.base_dynamic, 'base_static': vispa.url.base_static})
......@@ -109,10 +110,22 @@ class Server(object):
from vispa.tools.stats import StatsTool
cherrypy.tools.stats = StatsTool()
# the websocket tool
use_websockets = vispa.config.get2("websockets", "use_websockets", False)
if use_websockets:
from vispa.tools.socket import WSTool
cherrypy.tools.websocket = WSTool()
from vispa.platform import Platform
platform = Platform(self.var_dir)
platform.load_extensions(platform)
# the websocket plugin (and ws tool update)
if use_websockets:
cherrypy.tools.websocket.set_platform(platform)
from ws4py.server.cherrypyserver import WebSocketPlugin
WebSocketPlugin(engine).subscribe()
# mount the app on cherrypy
platform_config = os.path.join(self.conf_dir, "platform.conf")
app = cherrypy.tree.mount(platform.controller, vispa.url.base_dynamic, platform_config)
......@@ -160,6 +173,7 @@ class Server(object):
engine.rpc.subscribe()
# store the a reference to the server in the cherrypy object
# TODO: really bad code, fix this!
cherrypy.vispaServer = self
def run(self):
......
# -*- coding: utf-8 -*-
# imports
import cherrypy
from ws4py.server.cherrypyserver import WebSocketTool
class WSTool(WebSocketTool):
def __init__(self, *args, **kwargs):
WebSocketTool.__init__(self, *args, **kwargs)
self.__platform = None
def set_platform(self, platform):
self.__platform = platform
def upgrade(self, *args, **kwargs):
super(WSTool, self).upgrade(*args, **kwargs)
setattr(cherrypy.serving.request.ws_handler, 'platform', self.__platform)
\ No newline at end of file
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment