socketbus.py 9.43 KB
Newer Older
1
2
3
# -*- coding: utf-8 -*-

# imports
Gero Müller's avatar
Gero Müller committed
4
import thread
5
6
7
8
import vispa
import cherrypy
from time import time, sleep
from threading import Thread
9
from json import loads, dumps
10
import logging
Gero Müller's avatar
Gero Müller committed
11
import Queue
12
13
14
15
16

logger = logging.getLogger(__name__)

# use websockets?
USE_SOCKETS = vispa.config("websockets", "enabled", False)
Gero Müller's avatar
Gero Müller committed
17
# subscribers stored as {window_id: Publisher}
18
SUBSCRIBERS = {}
Gero Müller's avatar
Gero Müller committed
19
# a dict, user_id -> window_ids (list)
20
USER_SESSIONS = {}
Gero Müller's avatar
Gero Müller committed
21
# a dict, window_id -> user_id
22
23
24
25
26
27
28
SESSION_OWNERS = {}
# timestamps (only for PollingPublishers) to handle
# the deletion of their entry in SUBSCRIBERS
# (SocketPublishers are removed automatically)
POLLING_TIMESTAMPS = {}


Gero Müller's avatar
Gero Müller committed
29
30
31
def add_session(window_id, user_id, publisher):
    logger.debug("add session %s for user %s" % (window_id, user_id))
    window_id = unicode(window_id)
32
    user_id = unicode(user_id)
Gero Müller's avatar
Gero Müller committed
33

34
    # add SUBSCRIBERS entry
Gero Müller's avatar
Gero Müller committed
35
    SUBSCRIBERS[window_id] = publisher
36
37
    # add POLLING_TIMESTAMPS entry?
    if isinstance(publisher, PollingPublisher):
Gero Müller's avatar
Gero Müller committed
38
        POLLING_TIMESTAMPS[window_id] = int(time())
39
40
41
    # add USER_SESSIONS entry
    if user_id not in USER_SESSIONS.keys():
        USER_SESSIONS[user_id] = []
Gero Müller's avatar
Gero Müller committed
42
43
    if window_id not in USER_SESSIONS[user_id]:
        USER_SESSIONS[user_id].append(window_id)
44
    # add SESSION_OWNERS entry
Gero Müller's avatar
Gero Müller committed
45
46
    SESSION_OWNERS[window_id] = user_id

47
48
    vispa.publish("bus.session_added", window_id, user_id)

49

50
def remove_session(window_id, delay=False):
Gero Müller's avatar
Gero Müller committed
51
52
    logger.debug("remove session %s" % window_id)
    window_id = unicode(window_id)
53
54

    # remove from SUBSCRIBERS
Gero Müller's avatar
Gero Müller committed
55
56
    if window_id in SUBSCRIBERS.keys():
        del SUBSCRIBERS[window_id]
57
58
59
60
        
    if delay:
        POLLING_TIMESTAMPS[window_id] = int(time())

61
    # remove from SESSION_OWNERS
62
    user_id = None
Gero Müller's avatar
Gero Müller committed
63
    last_lession = False
Gero Müller's avatar
Gero Müller committed
64
65
    if window_id in SESSION_OWNERS.keys():
        user_id = SESSION_OWNERS[window_id]
66
67
        # remove from USER_SESSIONS
        if user_id in USER_SESSIONS.keys():
Gero Müller's avatar
Gero Müller committed
68
            USER_SESSIONS[user_id].remove(window_id)
69
            if not len(USER_SESSIONS[user_id]):
Gero Müller's avatar
Gero Müller committed
70
                last_lession = True
71
72
                del USER_SESSIONS[user_id]
        # delete the entry in SESSION_OWNERS
Gero Müller's avatar
Gero Müller committed
73
74
        del SESSION_OWNERS[window_id]
    if window_id in POLLING_TIMESTAMPS.keys():
75
        # delete the timestamp
Gero Müller's avatar
Gero Müller committed
76
        del POLLING_TIMESTAMPS[window_id]
77

78
    vispa.publish("bus.session_removed", window_id, user_id)
Benjamin Fischer's avatar
Benjamin Fischer committed
79
    
Gero Müller's avatar
Gero Müller committed
80
81
82
    if last_lession:
        logger.debug("no more user sessions %d" % int(user_id))
        vispa.publish("bus.all_user_sessions_removed", int(user_id))
83

84
85
86
87
88
89
90
91
92
93
94
95
def get_polling_publisher(window_id, user_id):
    POLLING_TIMESTAMPS[window_id] = int(time())
    if window_id in SUBSCRIBERS.keys():
        publisher = SUBSCRIBERS[window_id]
        if not isinstance(publisher, PollingPublisher):
            return None
    else:
        publisher = PollingPublisher(window_id, user_id)
        add_session(window_id, user_id, publisher)
    return publisher


96
class Bus:
Gero Müller's avatar
Gero Müller committed
97

98
99
100
101
102
    def __init__(self):
        # pub/sub data storage
        self._handlers = {}

        # configure and start the cleaner thread
Gero Müller's avatar
Gero Müller committed
103
104
        self.cleaner = CleanerThread(name="SocketBusPollingCleaner")
        self.cleaner.start()
105

Gero Müller's avatar
Gero Müller committed
106
    def send(self, window_id=None, user_id=None, data=None,
107
108
             except_sessions=None, binary=False, encode_json=True,
             broadcast=False):
109
        except_sessions = except_sessions or []
Gero Müller's avatar
Gero Müller committed
110
111
112
113
        if window_id is not None:
            window_id = unicode(window_id)
        if user_id is not None:
            user_id = unicode(user_id)
114

115
116
        logger.debug("send to window_id '%s', user_id '%s', broadcast %s" % (window_id, user_id, str(broadcast)))

117
118
119
120
121
        if broadcast:
            logger.debug("broadcast")
            subscribers = SUBSCRIBERS.values()
        else:
            subscribers = []
Gero Müller's avatar
Gero Müller committed
122
123
124
125
126
            # window_id is preferred over user_id
            if window_id:
                if window_id in SUBSCRIBERS.keys():
                    if window_id not in except_sessions:
                        subscribers.append(SUBSCRIBERS[window_id])
127
128
129
            elif user_id:
                user_id = unicode(user_id)
                if user_id in USER_SESSIONS.keys():
Gero Müller's avatar
Gero Müller committed
130
131
132
                    for window_id in USER_SESSIONS[user_id]:
                        if window_id not in except_sessions:
                            subscribers.append(SUBSCRIBERS[window_id])
133

Marcel Rieger's avatar
Marcel Rieger committed
134
        if encode_json:
Benjamin Fischer's avatar
Benjamin Fischer committed
135
            data = dumps(data)
136
        for subscriber in subscribers:
137
            logger.debug("  -> sending to subscriber: window_id '%s', user_id '%s'" % (subscriber.window_id, subscriber.user_id))
138
            subscriber.send(data, binary=binary)
139
140
141

    def send_topic(self, topic, *args, **kwargs):
        data = kwargs.get("data", None)
142
        data = {"data": data, "topic": topic}
143
144
145
        kwargs["data"] = data
        self.send(*args, **kwargs)

Gero Müller's avatar
Gero Müller committed
146
    def received_message(self, msg, window_id):
147
148
        # json?
        try:
149
            data = loads(str(msg))
150
151
152
153
        except:
            data = None
        # atm, we cannot do anything when the msg
        # is not json parsable or has no key 'topic'
154
        if not data or "topic" not in data.keys():
155
            return
Gero Müller's avatar
Gero Müller committed
156
157
        # window_id has to be set for safety reasons
        if not window_id:
158
            return
Gero Müller's avatar
Gero Müller committed
159
        self.__publish(data["topic"], window_id, data=data)
160

Gero Müller's avatar
Gero Müller committed
161
    def __publish(self, topic, window_id, data=None):
162
163
164
        if topic in self._handlers.keys():
            # user_id?
            user_id = None
Gero Müller's avatar
Gero Müller committed
165
166
            if window_id in SESSION_OWNERS.keys():
                user_id = SESSION_OWNERS[window_id]
167
            for handler in self._handlers[topic]:
Gero Müller's avatar
Gero Müller committed
168
                handler(window_id=window_id, user_id=user_id, data=data,
169
                        topic=topic)
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190

    def subscribe(self, topic, handler):
        if topic not in self._handlers.keys():
            self._handlers[topic] = []
        self._handlers[topic].append(handler)

    def unsubscribe(self, topic, handler=None):
        if topic in self._handlers.keys():
            if handler is None:
                del self._handlers[topic]
            elif handler in self._handlers[topic]:
                self._handlers[topic].remove(handler)


class SocketPublisher:
    pass

if USE_SOCKETS:
    from ws4py.websocket import WebSocket

    class SocketPublisher(WebSocket):
Gero Müller's avatar
Gero Müller committed
191

192
193
194
195
        def __init__(self, *args, **kwargs):
            WebSocket.__init__(self, *args, **kwargs)
            # self.sesseionid and self.user_id are set in 'socket_hook'
            # in the bus controller
Gero Müller's avatar
Gero Müller committed
196
            self.window_id = None
197
198
199
            self.user_id = None

        def store(self):
Gero Müller's avatar
Gero Müller committed
200
201
            if self.window_id:
                add_session(self.window_id, self.user_id, self)
202
203
                # prevent session from being removed soon
                POLLING_TIMESTAMPS[self.window_id] = int(time()) + 48 * 3600
204

205
206
        def closed(self, *args, **kwargs):
            super(SocketPublisher, self).closed(*args, **kwargs)
Gero Müller's avatar
Gero Müller committed
207
            if self.window_id:
208
                remove_session(self.window_id, True)
209
210

        def received_message(self, msg):
Gero Müller's avatar
Gero Müller committed
211
            vispa.bus.received_message(msg.data, self.window_id)
212
213
214
215


class PollingPublisher:

216
    def __init__(self, window_id, user_id):
Gero Müller's avatar
Gero Müller committed
217
        self.window_id = window_id
218
        self.user_id = user_id
Gero Müller's avatar
Gero Müller committed
219
        self._queue = Queue.Queue()
Gero Müller's avatar
Gero Müller committed
220
        self._waiting_thread_id = None
Gero Müller's avatar
Gero Müller committed
221
222
223
224
225
226
227
228
        cherrypy.engine.subscribe("stop", self._stop, 10)

    def _stop(self):
        logger.debug("stop poller")
        q = self._queue
        self._queue = None
        q.put("{}")
        logger.debug("stop poller done")
229

Gero Müller's avatar
Gero Müller committed
230
    def send(self, data, binary=False, timeout=None):
Gero Müller's avatar
Gero Müller committed
231
232
        if self._queue is not None:
            self._queue.put(data, timeout=timeout)
233
234
        # TODO: implement binary data

Gero Müller's avatar
Gero Müller committed
235
    def fetch(self, timeout=None):
Gero Müller's avatar
Gero Müller committed
236
237
        if self._queue is None:
            return None
Gero Müller's avatar
Gero Müller committed
238
239

        # finish existing thread by feeding empty event
Gero Müller's avatar
Gero Müller committed
240
241
        other_wainting = self._waiting_thread_id != thread.get_ident()
        if self._waiting_thread_id is not None and other_wainting:
Gero Müller's avatar
Gero Müller committed
242
243
244
245
246
            self._queue.put("{}")

        while self._waiting_thread_id is not None:
            sleep(0.01)

Gero Müller's avatar
Gero Müller committed
247
        try:
Gero Müller's avatar
Gero Müller committed
248
            self._waiting_thread_id = thread.get_ident()
249
250
251
252
253
254
255
256
            result = [self._queue.get(timeout=timeout)]
            while True:
                try:
                    r = self._queue.get(timeout=0)
                    result.append(r)
                except Queue.Empty:
                    break
            return result
Gero Müller's avatar
Gero Müller committed
257
258
        except Queue.Empty:
            return None
Gero Müller's avatar
Gero Müller committed
259
260
261
        finally:
            self._waiting_thread_id = None

262
    def received_message(self, msg):
Gero Müller's avatar
Gero Müller committed
263
        vispa.bus.received_message(msg, self.window_id)
264
265


Gero Müller's avatar
Gero Müller committed
266
class CleanerThread(Thread):
267

Gero Müller's avatar
Gero Müller committed
268
269
    def __init__(self, *kargs, **kwargs):
        Thread.__init__(self, *kargs, **kwargs)
Gero Müller's avatar
Gero Müller committed
270
271
        self.daemon = True
        self.running = True
Gero Müller's avatar
Gero Müller committed
272
273
        self.session_timeout = vispa.config("web", "session_timeout", 3600)
        self.session_interval = vispa.config("web", "session_interval", 60)
Gero Müller's avatar
Gero Müller committed
274
        cherrypy.engine.subscribe("exit", self._exit)
Gero Müller's avatar
Gero Müller committed
275

Gero Müller's avatar
Gero Müller committed
276
    def _exit(self):
Gero Müller's avatar
Gero Müller committed
277
278
279
        self._running = False

    def run(self):
Gero Müller's avatar
Gero Müller committed
280
        logger.info("start cleaning thread: %d / %d", self.session_timeout, self.session_interval)
Gero Müller's avatar
Gero Müller committed
281
        while self.running:
Gero Müller's avatar
Gero Müller committed
282
            for window_id, stamp in POLLING_TIMESTAMPS.items():
Gero Müller's avatar
Gero Müller committed
283
284
                last_access = int(time()) - stamp
                if last_access > self.session_timeout:
285
                    # delete the publisher and all its data
Gero Müller's avatar
Gero Müller committed
286
                    remove_session(window_id)
Gero Müller's avatar
Gero Müller committed
287
            sleep(self.session_interval)
Gero Müller's avatar
Gero Müller committed
288
        logger.info("stop cleaning thread")