Commit e9951b4a authored by Benjamin Fischer's avatar Benjamin Fischer
Browse files

Reworked WatchService: now implemented as its own class

also implemented remote side event buffering refs #1955, #1952
parent b25e0384
......@@ -43,9 +43,6 @@ var CodeEditor = Emitter.extend({
if (data.watch_id != 'code') {
return;
}
if (data.action_name == 'attrib') {
return;
}
if (self.modified_processing)
return;
if (data.mtime == -1) {
......@@ -62,7 +59,7 @@ var CodeEditor = Emitter.extend({
}
self.modified_processing = false;
});
} else if (Math.abs(data.mtime - self.mtime) > 0.5 && !self.saving_processing) { //here: 0.5s is an allowed time between the two mtimes
} else if (data.mtime > self.mtime && !self.saving_processing) {
self.modified_processing = true;
self.view.confirm(
"The file " + self.path + " has changed on disk." +
......
......@@ -48,7 +48,7 @@ var CodeEditorPreview = Emitter.extend({
if (data.watch_id != 'preview') {
return;
}
if (data.action_name == 'attrib') {
if (data.event != 'modify') {
return;
}
self.previewChanged = true;
......
......@@ -154,7 +154,7 @@ var FileBrowserView = vispa.ExtensionView.Center.extend({
// register watch event
this.onSocket('watch', function(data){
if (~['attrib','modify'].indexOf(data.action_name))
if (data.event != 'change')
return
self.fb.updateView(100)
})
......@@ -279,7 +279,7 @@ var FileSelectorView = vispa.ExtensionView.Dialog.extend({
// register watch event
this.onSocket('watch', function(data){
if (~['attrib','modify'].indexOf(data.action_name))
if (data.event != 'change')
return
self.fb.updateView(100)
})
......
......@@ -5,7 +5,8 @@ from StringIO import StringIO
from distutils.spawn import find_executable
from mimetypes import guess_type
from zipfile import ZipFile
from threading import Lock
from threading import Lock, Thread
from time import time
import ConfigParser
import json
import logging
......@@ -49,15 +50,9 @@ class FileSystem(object):
def __init__(self, userid, workspaceid):
# allowed extensions
self.allowed_extensions = FileSystem.FILE_EXTENSIONS
self.watchservice = WatchService()
self._userid = userid
self._workspaceid = workspaceid
self._monitor_thread = fsmonitor.FSMonitorThread(self._monitor_callback)
self._monitor_watches = {}
self._monitor_listener = {}
self._monitor_patterns = {}
self._monitor_reverse = {}
self._monitor_lock = Lock()
self._monitor_watch_workspaceini = None
def __del__(self):
self.close()
......@@ -524,132 +519,17 @@ class FileSystem(object):
else:
return self.get_file_content(path)
def _monitor_callback(self, event):
if event.action_name == 'access':
return
if os.path.exists(event.watch.path):
mtime = os.path.getmtime(event.watch.path)
else:
mtime = -1
if event.watch == self._monitor_watch_workspaceini:
if mtime == -1 or event.action_name == 'modify':
vispa.remote.send_topic('workspace.ini_modified',
user_id=self._userid, data={
"workspaceId": self._workspaceid,
"mtime": mtime
})
if mtime == -1:
self._monitor_thread.remove_watch(event.watch)
self._monitor_watch_workspaceini = None
return
else:
with self._monitor_lock:
# inform all listener
for combined_id in event.watch.listener:
if event.name and combined_id in self._monitor_patterns and \
self._monitor_patterns[combined_id].search(event.name) !=\
self._monitor_reverse[combined_id]:
continue
vispa.remote.send_topic(
"extension.%s.socket.watch" % combined_id[1],
window_id=combined_id[0],
data={
"subject": event.name,
"path": event.watch.path,
"action_name": event.action_name,
"watch_id": combined_id[2],
"mtime": mtime
})
# cleanup yourself
if event.action_name in ['delete self','move self']:
for combined_id in event.watch.listener:
del self._monitor_listener[combined_id]
del self._monitor_patterns[combined_id]
del self._monitor_reverse[combined_id]
event.watch.listener = []
self._check_watch(event.watch)
def _check_watch(self, watch):
if len(watch.listener) == 0:
del self._monitor_watches[watch.path]
self._monitor_thread.remove_watch(watch)
def watch(self, path, window_id, view_id, watch_id, pattern=None, reverse=False):
# fail if there is no such fie
path = self.expand(path)
if not os.path.exists(path):
return "The file does not exist"
combined_id = (window_id, view_id, watch_id)
with self._monitor_lock:
# update pattern
if pattern:
if combined_id not in self._monitor_patterns or self._monitor_patterns[combined_id].pattern != pattern:
self._monitor_patterns[combined_id] = re.compile(pattern)
self._monitor_reverse[combined_id] = reverse
else:
if combined_id in self._monitor_patterns:
del self._monitor_patterns[combined_id]
del self._monitor_reverse[combined_id]
# watch is in use
if combined_id in self._monitor_listener:
# has path changed?
if self._monitor_listener[combined_id].path == path:
return ""
else:
self._monitor_listener[combined_id].listener.remove(combined_id)
self._check_watch(self._monitor_listener[combined_id])
del self._monitor_listener[combined_id]
# create or get the desired watch
if path not in self._monitor_watches:
if os.path.isfile(path):
watch = self._monitor_thread.add_file_watch(path)
elif os.path.isdir(path):
watch = self._monitor_thread.add_dir_watch(path)
else:
return "This kind of file can't be wachted"
watch.listener = []
self._monitor_watches[path] = watch
else:
watch = self._monitor_watches[path]
if combined_id in watch.listener:
return "This file is already watched" # this should never occur
watch.listener.append(combined_id)
self._monitor_listener[combined_id] = watch
self.watchservice.subscribe((window_id, view_id, watch_id), path, pattern, reverse)
return ""
def unwatch(self, window_id, view_id, watch_id=None):
with self._monitor_lock:
if watch_id is None:
# will perform poor with lots of watches/listener
for path, watch in self._monitor_watches.items():
for combined_id in watch.listener:
if window_id==combined_id[0] and view_id==combined_id[1]:
watch.listener.remove(combined_id)
del self._monitor_listener[combined_id]
del self._monitor_patterns[combined_id]
del self._monitor_reverse[combined_id]
self._check_watch(watch)
else:
combined_id = (window_id, view_id, watch_id)
if combined_id in self._monitor_listener:
self._monitor_listener[combined_id].listener.remove(combined_id)
self._check_watch(self._monitor_listener[combined_id])
del self._monitor_listener[combined_id]
del self._monitor_patterns[combined_id]
del self._monitor_reverse[combined_id]
self.watchservice.unsubscribe((window_id, view_id, watch_id))
return ""
def get_workspaceini(self, request, fail_on_missing=False):
......@@ -658,8 +538,11 @@ class FileSystem(object):
config = ConfigParser.ConfigParser()
config.read([FileSystem.GLOBAL_WORKSPACE_CONF,
self.expand(FileSystem.PRIVATE_WORKSPACE_CONF)])
mtime = os.path.getmtime(self.expand(FileSystem.PRIVATE_WORKSPACE_CONF))
self._watch_workspaceini()
if self.exists(FileSystem.PRIVATE_WORKSPACE_CONF):
mtime = self.get_mtime(FileSystem.PRIVATE_WORKSPACE_CONF)
self._watch_workspaceini()
else:
mtime = -1
if not isinstance(request_dict, dict):
request_dict = dict.fromkeys(config.sections(), True)
data = {}
......@@ -712,12 +595,196 @@ class FileSystem(object):
return str(e)
def _watch_workspaceini(self):
if self._monitor_watch_workspaceini:
if self.exists(FileSystem.PRIVATE_WORKSPACE_CONF, 'f'):
self.watchservice.subscribe((self._userid, self._workspaceid), FileSystem.PRIVATE_WORKSPACE_CONF)
class WatchService(object):
def __init__(self):
self.subscriber_buffer = []
self.subscribers = {}
self.watches = {}
self.lock = Lock()
self.monitor = fsmonitor.FSMonitor()
self.run = True
self.thread = Thread(target=self._worker)
self.thread.start()
def subscribe(self, id, path, pattern=None, reverse=False):
if not path:
return self.unsubscribe(id)
path = os.path.expanduser(os.path.expandvars(path))
with self.lock:
if id not in self.subscribers:
WatchSubscriber(self, id)
self.subscribers[id].update(path, pattern, reverse)
def unsubscribe(self, id):
with self.lock:
if hasattr(id, '__contains__') and None in id:
for subscriber in self.subscribers.values():
if False not in map(lambda e,c: c is None or e == c, subscriber.id, id):
subscriber.destroy()
elif id in self.subscribers:
self.subscribers[id].destroy()
def stop(self):
self.run = False
def _worker(self):
while self.run:
events = self.monitor.read_events(0.05)
if events:
with self.lock:
for event in events:
if event.action_name in ['delete self','move self']:
kind = 'vanish'
elif event.action_name == 'modify':
kind = 'modify'
elif event.watch.isdir and event.action_name in ['create','delete','move from','move to']:
kind = 'change'
else:
kind = None
if kind:
if not event.watch.isdir:
if os.path.exists(event.watch.path):
event.watch.mtime = os.path.getmtime(event.watch.path)
else:
event.watch.mtime = -1
for subscriber in event.watch.subscribers[:]:
subscriber.process(kind, event.name)
if self.subscriber_buffer:
with self.lock:
for subscriber in self.subscriber_buffer[:]:
subscriber.flush(False)
for subscriber in self.subscribers:
subscriber.destroy()
self.monitor.remove_all_watches()
self.monitor.close()
class WatchSubscriber(object): # this should never be instanced manually
EVENT_DELAYS = {
'change': [1.0,0.1],
'modify': [1.0,0.2]
}
def __init__(self, service, id):
if not isinstance(service, WatchService):
raise TypeError("No valid WatchService instance was provided")
if id in service.subscribers:
raise RuntimeError("There is already a subscriber with this id: "+str(id))
self.id = id
self.service = service
self.service.subscribers[self.id] = self
self.watch = None
self.pattern = None
self.reverse = None
self.event_buffer = {}
def destroy(self):
self.unbind()
if self in self.service.subscriber_buffer:
self.service.subscriber_buffer.remove(self)
del self.service.subscribers[self.id]
del self.service
del self.watch
del self.event_buffer
def process(self, event, subject=""):
if self.watch.isdir and subject and self.pattern and self.pattern.search(subject) != self.reverse:
return
if not self.exists(FileSystem.PRIVATE_WORKSPACE_CONF, 'f'):
if event in WatchSubscriber.EVENT_DELAYS:
now = time()
if event in self.event_buffer:
self.event_buffer[event][1] = now + WatchSubscriber.EVENT_DELAYS[event][1]
else:
self.event_buffer[event] = [now + delay for delay in WatchSubscriber.EVENT_DELAYS[event]] #first & last event
if self not in self.service.subscriber_buffer:
self.service.subscriber_buffer.append(self)
else:
self.emit(event)
def flush(self, force=False):
now = time()
for event, delays in self.event_buffer.items():
if force or min(delays) < now:
self.emit(event)
del self.event_buffer[event]
if not self.event_buffer and self in self.service.subscriber_buffer:
self.service.subscriber_buffer.remove(self)
def emit(self, event):
if len(self.id) == 3: # window_id, view_id, watch_id
data = {
'event': event,
'path': self.watch.path,
'watch_id': self.id[2]
}
if not self.watch.isdir:
data['mtime'] = self.watch.mtime
vispa.remote.send_topic("extension.%s.socket.watch" % self.id[1], window_id=self.id[0], data=data)
elif len(self.id) == 2: # userid, workspaceid
vispa.remote.send_topic('workspace.ini_modified', user_id=self.id[0], data={
"workspaceId": self.id[1],
"mtime": self.watch.mtime
})
elif hasattr(self.id, '__call__'):
self.id(event, self)
def update(self, path, pattern="", reverse=False):
self.bind(path)
if self.watch.isdir and pattern:
if not self.pattern or self.pattern.pattern != pattern:
self.pattern = re.compile(pattern)
self.reverse = reverse
else:
self.pattern = None
self.reverse = None
def bind(self, path):
if self.watch:
if self.watch.path == path:
return
else:
self.unbind()
if path not in self.service.watches:
if not os.path.exists(path):
raise IOError("File to be watched does not exist: %s" % path)
if os.path.isfile(path):
isdir = False
watch = self.service.monitor.add_file_watch(path)
elif os.path.isdir(path):
isdir = True
watch = self.service.monitor.add_dir_watch(path)
else:
raise IOError("This kind of file can't be watched!")
watch.isdir = isdir
watch.subscribers = []
self.service.watches[path] = watch
else:
watch = self.service.watches[path]
self.watch = watch
if self not in watch.subscribers:
watch.subscribers.append(self)
def unbind(self):
if not self.watch:
return
self._monitor_watch_workspaceini = self._monitor_thread.add_file_watch(
self.expand(FileSystem.PRIVATE_WORKSPACE_CONF))
self.watch.subscribers.remove(self)
if len(self.watch.subscribers) == 0:
del self.service.watches[self.watch.path]
self.service.monitor.remove_watch(self.watch)
self.watch = None
def string_compare(a, b):
if a == b:
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment