Commit aed5b2d6 authored by Gero Müller

fix workspace locks, use fixed rpyc, allow output in bashrc/zshrc

parent 90455209
@@ -42,10 +42,18 @@ setup(
packages=packages,
package_data={"vispa": files},
scripts=[os.path.join(srcdir, 'bin', 'vispa'), os.path.join(srcdir, 'bin', 'vispad'), os.path.join(srcdir, 'bin', 'vispa-ldap-export')],
install_requires=["sqlalchemy >= 0.9.0", "mako", "cherrypy",
"paramiko", "rpyc",
install_requires=["sqlalchemy >= 0.9.0",
"mako",
"cherrypy<9.0.0", # cherrypy removed wsgiserver, see https://github.com/Lawouach/WebSocket-for-Python/issues/205
"paramiko",
"rpyc>3.3.0",
"alembic >= 0.7.3", # for Operations.batch_alter_table
"passlib", "ws4py", "ldap3"],
"passlib",
"ws4py",
"ldap3"],
dependency_links=[
'https://github.com/geromueller/rpyc/archive/master.zip#egg=rpyc-3.4.0',
],
extras_require={"doc": ["sphinx", "sphinx-bootstrap-theme"]},
classifiers=[
"Development Status :: 5 - Production/Stable",
......
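The cherrypy<9.0.0 pin above exists because ws4py's CherryPy server still imports cherrypy.wsgiserver, which CherryPy 9 removed (see the linked issue). A minimal, illustrative check of that constraint, not part of this commit:

try:
    from cherrypy import wsgiserver  # present only in CherryPy < 9
except ImportError:
    raise SystemExit("CherryPy >= 9 dropped wsgiserver; keep cherrypy<9.0.0 for ws4py")

The rpyc>3.3.0 requirement is meant to resolve to the forked rpyc-3.4.0 archive listed in dependency_links; with pip of that era this typically only takes effect when --process-dependency-links is passed, while python setup.py install honours dependency_links directly.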
@@ -235,6 +235,7 @@ class Server(object):
if not os.path.exists(self.cache_dir):
os.mkdir(self.cache_dir)
# conf dir
self.conf_dir = os.path.abspath(kwargs.get('configdir', ''))
vispa.setup_config(self.conf_dir)
@@ -244,7 +245,8 @@ class Server(object):
if vispa.config('alembic', 'use_alembic', False):
logger.info("Use alembic")
vispa.models.alembic.migrate(self._engine)
if vispa.config('alembic', 'auto_migrate', True):
vispa.models.alembic.migrate(self._engine)
else:
logger.info("Do not use alembic")
sql_base.metadata.create_all(self._engine)
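vispa.models.alembic.migrate itself is not shown in this diff. For orientation only, a rough sketch of the two paths chosen here, an Alembic upgrade when use_alembic/auto_migrate are set and a plain create_all otherwise, assuming a standard alembic.ini/env.py layout and a hypothetical SQLite URL:

# Illustrative only: vispa.models.alembic.migrate is not part of this diff.
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from alembic.config import Config
from alembic import command

Base = declarative_base()                      # stands in for sql_base above
engine = create_engine("sqlite:///vispa.db")   # hypothetical database URL

use_alembic = True     # mirrors vispa.config('alembic', 'use_alembic', False)
auto_migrate = True    # mirrors vispa.config('alembic', 'auto_migrate', True)

if use_alembic:
    if auto_migrate:
        cfg = Config("alembic.ini")        # assumes a standard Alembic project layout
        command.upgrade(cfg, "head")       # apply pending migrations up to head
else:
    Base.metadata.create_all(engine)       # no migrations: create missing tables directly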
@@ -363,11 +365,9 @@ class Server(object):
from vispa.tools.permission import PermissionTool
cherrypy.tools.permission = PermissionTool()
try:
if vispa.config('websockets', 'enabled', False):
from ws4py.server.cherrypyserver import WebSocketTool
cherrypy.tools.websocket = WebSocketTool()
except:
pass
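The WebSocketTool registered above is only one half of ws4py's CherryPy integration; a handler class and the WebSocketPlugin are wired up elsewhere in VISPA. A minimal, self-contained sketch of how such a tool is typically used (EchoSocket and Root are hypothetical names, not VISPA code):

import cherrypy
from ws4py.server.cherrypyserver import WebSocketPlugin, WebSocketTool
from ws4py.websocket import WebSocket

class EchoSocket(WebSocket):                       # hypothetical handler class
    def received_message(self, message):
        self.send(message.data, message.is_binary)

class Root(object):
    @cherrypy.expose
    def ws(self):
        pass                                       # the tool upgrades the request before this runs

WebSocketPlugin(cherrypy.engine).subscribe()       # background plugin managing open sockets
cherrypy.tools.websocket = WebSocketTool()         # same registration as in the hunk above
cherrypy.quickstart(Root(), "/", config={
    "/ws": {"tools.websocket.on": True,
            "tools.websocket.handler_cls": EchoSocket}})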
def __init_platform(self, **kwargs):
cherrypy.config.update(self.__default_server_config)
......
@@ -10,14 +10,15 @@ import os
import random
import select
import shutil
import socket
import subprocess
import sys
import tempfile
import thread
import threading
import time
import zipfile
import Queue
from threading import Thread
from vispa import AjaxException
from vispa.models.user import User
@@ -34,6 +35,67 @@ RPYC_COMPRESSION = False
_remote_files = []
_zip_file = None
class Worker(Thread):
"""Thread executing tasks from a given tasks queue"""
def __init__(self, tasks, timeout=None):
Thread.__init__(self)
self.tasks = tasks
self.daemon = True
self.timeout = timeout
self.start()
def run(self):
while True:
try:
func, args, kargs = self.tasks.get(timeout=self.timeout)
except Queue.Empty:
return
try:
func(*args, **kargs)
except:
logger.exception("Worker")
finally:
self.tasks.task_done()
class ThreadPool:
"""Pool of threads consuming tasks from a queue"""
worker_timeout = 60
new_worker_threshold = 3
def __init__(self, min_threads=1, max_threads=10):
self.tasks = Queue.Queue()
self._min_threads = min_threads
self._max_threads = max_threads
self._workers = []
for _ in range(min_threads):
self._workers.append(Worker(self.tasks))
def add_thread(self):
# remove dead threads:
self._workers = filter(lambda w: w.is_alive(), self._workers)
if len(self._workers) < self._max_threads:
logger.debug("add new worker %d" % len(self._workers))
self._workers.append(Worker(self.tasks, self.worker_timeout))
def add_task(self, func, *args, **kargs):
"""Add a task to the queue"""
self.tasks.put((func, args, kargs))
if self.tasks.qsize() > self.new_worker_threshold:
self.add_thread()
def wait_completion(self):
"""Wait for completion of all the tasks in the queue"""
self.tasks.join()
_thread_pool = ThreadPool()
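A short usage sketch of the pool defined above; fetch and the file names are placeholders, while in this module the pool is used to run rpyc poll calls (see _on_feed further down):

import time

def fetch(path):
    # placeholder task; in this module the pool runs self.__rpyc.poll instead
    time.sleep(0.1)
    print("fetched %s" % path)

pool = ThreadPool(min_threads=1, max_threads=4)
for i in range(10):
    pool.add_task(fetch, "/tmp/file%d" % i)   # extra workers are spawned past new_worker_threshold
pool.wait_completion()                        # blocks until the queue reports all tasks done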
def _create_zip():
logger.debug("create workspace zip")
@@ -282,7 +344,7 @@ class SSHConnectionImpl(object):
else:
if missing_host_key_policy != "reject":
import warnings
warnings.warn("Unknown missing host key policy: " +
warnings.warn("Unknown missing host key policy: " +
missing_host_key_policy)
self.__client.set_missing_host_key_policy(
paramiko.RejectPolicy())
@@ -311,7 +373,7 @@ class SSHConnectionImpl(object):
host, port = self.address2host_port(address)
# Create the connection.
connect_args = {"hostname": host, "port": port, "timeout": 1}
connect_args = {"hostname": host, "port": port, "timeout": 1, "compress": False}
# TODO: check for local user
local_user = False
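For context, connect_args is eventually passed to paramiko.SSHClient.connect, now with compression disabled. A hedged, stand-alone sketch with a hypothetical host and user (authentication arguments omitted):

import paramiko

client = paramiko.SSHClient()
client.load_system_host_keys()
client.set_missing_host_key_policy(paramiko.RejectPolicy())      # matches the hunk further up

connect_args = {"hostname": "workspace.example.org", "port": 22,  # hypothetical host
                "timeout": 1, "compress": False}                  # compression off, as above
client.connect(username="vispa-user", **connect_args)             # key/password args omitted
stdin, stdout, stderr = client.exec_command("echo hello")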
@@ -473,6 +535,28 @@ class Connection(object):
else:
return ""
def find_magic_string(self):
logger.debug("find magic string")
MAGIC_STRING = "START_RPYC".encode("ascii")
MAGIC_STRING_LENGTH = len(MAGIC_STRING)
buf = b''
while True:
# fill buffer to needed size
remaining = MAGIC_STRING_LENGTH - len(buf)
buf += self._connection.stdout.read(remaining)
# compare
if buf == MAGIC_STRING:
return
# jump to next S
try:
buf = buf[buf.index(MAGIC_STRING[0], 1):]
except:
buf = b''
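This scanner is the counterpart of the START_RPYC marker that the workspace loader writes to stdout (see the end of workspace_loader.py below); it lets .bashrc/.zshrc output pass through harmlessly, which is what the commit message refers to. A simplified, self-contained sketch of the same resynchronisation logic, illustrative only:

import io

MAGIC = b"START_RPYC"

def skip_to_magic(stream):
    # same idea as find_magic_string above: drop anything the login shell
    # printed (e.g. from .bashrc/.zshrc) until the marker has been read
    buf = b""
    while True:
        buf += stream.read(len(MAGIC) - len(buf))   # top up to marker length
        if buf == MAGIC:
            return
        try:
            buf = buf[buf.index(MAGIC[0:1], 1):]    # resynchronise on the next 'S'
        except ValueError:
            buf = b""

stream = io.BytesIO(b"greeting from .zshrc\nSTART_RPYC<rpyc traffic>")
skip_to_magic(stream)
print(stream.read())   # -> <rpyc traffic>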
def connected(self, timeout=10):
_timeout = timeout
while _timeout > 0 and self._status == Connection.CONNECTING:
@@ -572,11 +656,11 @@ class Connection(object):
"workspaceId": self.__workspaceid})
self.status(Connection.DISCONNECTED)
return
except Exception as e:
except Exception:
self.status(Connection.DISCONNECTED)
logger.exception("SSH implementation")
#raise AjaxException("Failed to setup SSH Connection!")
raise AjaxException(str(e))
# raise AjaxException("Failed to setup SSH Connection!")
return
loader_file = os.path.join(os.path.dirname(__file__),
"workspace_loader.py")
@@ -589,13 +673,18 @@ class Connection(object):
self._send_file(_zip_file)
# wait for the remote side to finish
time.sleep(0.1)
self._connection.print_stderr()
self.find_magic_string()
# send
logger.info("connect rpyc")
channel = rpyc.Channel(self._connection.stream(), RPYC_COMPRESSION)
self.__rpyc = rpyc.Connection(LoggingService, channel, {})
channel = rpyc.Channel(self._connection.stream(), False)
self._connection.print_stderr()
self.__rpyc = rpyc.Connection(LoggingService, channel, {"allow_public_attrs": True})
time.sleep(.1)
logger.info("connected rpyc")
self._connection.print_stderr()
if not hasattr(self.__rpyc, "_sync_lock"):
logger.warn("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
logger.warn("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
@@ -603,26 +692,26 @@ class Connection(object):
logger.warn("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
logger.warn("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
self._connection.set_on_feed(self._on_feed)
self._connection.print_stderr()
logger.info("get logger")
rlogger = self.__rpyc.root.getmodule('logging').getLogger()
self._connection.print_stderr()
rlogger.setLevel(logger.getEffectiveLevel())
self._connection.print_stderr()
except AjaxException:
raise
except:
self.status(Connection.DISCONNECTED)
logger.exception("SSH implementation")
time.sleep(0.1)
#raise AjaxException("Could not connect to workspace!")
# raise AjaxException("Could not connect to workspace!")
return
self._connection.print_stderr()
logger.info("connection open")
self.status(Connection.CONNECTED)
def _on_feed(self):
try:
while self.__rpyc and self.__rpyc.poll():
pass
except:
logger.exception("Connection: feed")
_thread_pool.add_task(self.__rpyc.poll)
def close(self):
if self._status in [Connection.DISCONNECTED, Connection.DISCONNECTING]:
@@ -729,7 +818,7 @@ class ConnectionPool(object):
key = (user.id, workspace.id)
if key in self._connections and self._connections[key].active():
logger.debug(
"workspace already conected: %d - %d" %
"workspace already conected: %d - %d" %
(user.id, workspace.id))
self._connections[key].send_status()
return self._connections[key]
......
from Queue import Queue, Empty
from threading import Thread, RLock, Event
# -*- coding: utf-8 -*-
from threading import Thread
import StringIO
import imp
import logging
import os.path
import signal
import socket
import sys
import tempfile
import time
import traceback
import zipfile
import thread
import os
# make i/o binary
sys.stdin = os.fdopen(sys.stdin.fileno(), 'rb', 0)
sys.stdout = os.fdopen(sys.stdout.fileno(), 'wb', 0)
# setup logging
logging.basicConfig(
level=logging.DEBUG,
stream=sys.stderr,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s")
logger = logging.getLogger("vispa.workspace-loader")
sys.stdin = os.fdopen(sys.stdin.fileno(), 'rb', 0)
sys.stdout = os.fdopen(sys.stdout.fileno(), 'wb', 0)
# use provided loglevel
try:
loglevel = int(sys.stdin.readline())
logging.getLogger().setLevel(loglevel)
except:
pass
loglevel = 0
logging.getLogger().setLevel(loglevel)
l = int(sys.stdin.readline())
zipbuffer = sys.stdin.read(l)
f = StringIO.StringIO()
f.write(zipbuffer)
packagezip = zipfile.ZipFile(f)
# read zipped packages
length = int(sys.stdin.readline())
zipbuffer = sys.stdin.read(length)
iobuffer = StringIO.StringIO(zipbuffer)
packagezip = zipfile.ZipFile(iobuffer)
class InMemoryZipImporter:
@@ -109,7 +110,7 @@ class InMemoryZipImporter:
exec(code, mod.__dict__)
return mod
# install memory loaded
# install memory loader
sys.meta_path.insert(0, InMemoryZipImporter(packagezip))
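Most of InMemoryZipImporter is collapsed in this diff. As orientation only, a minimal PEP 302-style importer over an in-memory zip (Python 2 find_module/load_module protocol; ZipMemoryImporter is a hypothetical name, not the VISPA class) could look roughly like this:

import imp
import sys
import zipfile

class ZipMemoryImporter(object):          # hypothetical name, illustrative only
    def __init__(self, archive):
        self._zip = archive               # a zipfile.ZipFile kept in memory

    def _source(self, fullname):
        path = fullname.replace(".", "/")
        for candidate in (path + ".py", path + "/__init__.py"):
            try:
                return candidate, self._zip.read(candidate)
            except KeyError:
                continue
        return None, None

    def find_module(self, fullname, path=None):
        # PEP 302: return a loader (here: self) if we can handle this module
        return self if self._source(fullname)[0] else None

    def load_module(self, fullname):
        filename, source = self._source(fullname)
        mod = sys.modules.setdefault(fullname, imp.new_module(fullname))
        mod.__file__ = filename
        mod.__loader__ = self
        if filename.endswith("__init__.py"):
            mod.__path__ = []             # mark as a package
        exec(compile(source, filename, "exec"), mod.__dict__)
        return mod

# sys.meta_path.insert(0, ZipMemoryImporter(packagezip))  # as done above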
@@ -162,31 +163,23 @@ def dump_thread_status_on_signal(signal, stack):
"vispa-workspace-loader-%d.html" % os.getpid())
dump_thread_status(f)
except Exception as e:
print str(e)
sys.stderr.write((str(e)))
# install signal handler that dumps the thread status
signal.signal(signal.SIGUSR2, dump_thread_status_on_signal)
logger.debug("load rpyc")
import rpyc
import vispa.remote
class Service(rpyc.Service):
def on_connect(self):
pass
def on_disconnect(self):
pass
def exposed_getmodule(self, name):
"""imports an arbitrary module"""
return __import__(name, None, None, "*", -1)
class RemoteHandler(logging.Handler):
def __init__(self, handler):
@@ -197,117 +190,15 @@ class RemoteHandler(logging.Handler):
msg = self.format(record)
self._handler(record.name, record.levelno, msg)
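Only the tail of RemoteHandler.emit survives in this hunk; it forwards formatted log records through the rpyc callback later installed as RemoteHandler(vispa.remote.log). A hedged sketch of such a forwarding handler (ForwardingHandler is an illustrative name, not the VISPA class):

import logging

class ForwardingHandler(logging.Handler):     # illustrative stand-in for RemoteHandler
    def __init__(self, handler):
        logging.Handler.__init__(self)
        self._handler = handler               # e.g. the remote vispa.remote.log callable

    def emit(self, record):
        try:
            msg = self.format(record)
            self._handler(record.name, record.levelno, msg)   # ship name, level, message
        except Exception:
            self.handleError(record)          # never let logging itself crash

# logging.getLogger().handlers = [ForwardingHandler(remote_log_callable)]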
class Worker(Thread):
"""Thread executing tasks from a given tasks queue"""
def __init__(self, tasks, timeout=None):
Thread.__init__(self)
self.tasks = tasks
self.daemon = True
self.timeout = timeout
self.start()
def run(self):
while True:
try:
func, args, kargs = self.tasks.get(timeout=self.timeout)
except Empty:
return
try:
func(*args, **kargs)
except Exception as e:
logger.exception("Worker")
finally:
self.tasks.task_done()
class ThreadPool:
"""Pool of threads consuming tasks from a queue"""
worker_timeout = 60
new_worker_threshold = 3
def __init__(self, min_threads=1, max_threads=10):
self.tasks = Queue()
self._min_threads = min_threads
self._max_threads = max_threads
self._workers = []
for _ in range(min_threads):
self._workers.append(Worker(self.tasks))
def add_thread(self):
# remove dead threads:
self._workers = filter(lambda w: w.is_alive(), self._workers)
if len(self._workers) < self._max_threads:
logger.debug("add new worker %d" % len(self._workers))
self._workers.append(Worker(self.tasks, self.worker_timeout))
def add_task(self, func, *args, **kargs):
"""Add a task to the queue"""
self.tasks.put((func, args, kargs))
if self.tasks.qsize() > self.new_worker_threshold:
self.add_thread()
def wait_completion(self):
"""Wait for completion of all the tasks in the queue"""
self.tasks.join()
class RpycConnection(rpyc.Connection):
POLL_TIMEOUT = 3
REQUEST_TIMEOUT = 60
def __init__(self, service, channel, config={}, _lazy=False):
rpyc.Connection.__init__(self, service, channel, config, _lazy)
self._sync_lock = RLock()
self._sync_event = Event()
def serve_threaded(self, idle_timeout=3600):
thread_pool = ThreadPool()
try:
idle_start = None
while True:
if self._sync_lock.acquire(False) is not False:
self._sync_event.clear()
try:
data = self._recv(self.POLL_TIMEOUT, wait_for_lock=True)
if data is None:
if idle_start is None:
idle_start = time.time()
elif (time.time() - idle_start) > idle_timeout:
break
else:
idle_start = None
thread_pool.add_task(self._dispatch, data)
except:
logger.exception("threaded serve")
finally:
self._sync_event.set()
self._sync_lock.release()
else:
self._sync_event.wait(idle_timeout)
# conn._dispatch(data)
# else:
# logger.debug("daemon next loop")
finally:
thread_pool.wait_completion()
def main_loop(fin, fout):
# connect the rpyc server to fifo
stream = rpyc.PipeStream(fin, fout)
channel = rpyc.Channel(stream)
vispa.remote.connection = RpycConnection(
channel = rpyc.Channel(stream, False)
vispa.remote.connection = rpyc.Connection(
Service, channel, {
'allow_public_attrs': True, 'allow_setattr': True})
vispa.remote.log = rpyc.async(vispa.remote.connection.root.log)
vispa.remote.send = rpyc.async(vispa.remote.connection.root.send)
vispa.remote.log = vispa.remote.connection.root.log
vispa.remote.send = vispa.remote.connection.root.send
exising_handlers = []
try:
@@ -315,46 +206,51 @@ def main_loop(fin, fout):
logging.getLogger().handlers = [RemoteHandler(vispa.remote.log)]
logger.info("connected, serving")
except:
logging.getLogger().handlers = exising_handlers
logger.exception("main_loop")
return
logger.debug("daemon loop")
logger.info("daemon loop")
try:
try:
vispa.remote.connection.serve_threaded()
#while True:
# vispa.remote.connection.serve()
except EOFError:
# log_exception()
pass
except Exception:
# log_exception()
if not vispa.remote.connection.closed:
raise
vispa.remote.connection.serve_threaded()
except EOFError:
logging.getLogger().handlers = exising_handlers
logger.exception("main_loop")
pass
except Exception:
logging.getLogger().handlers = exising_handlers
logger.exception("main_loop")
finally:
logging.getLogger().handlers = exising_handlers
vispa.remote.connection.close()
logger.debug("daemon done")
# notify subprocesses that we are on a workspace
os.environ['VISPA_WORKSPACE'] = "1"
# send magic string to notify server that the following output is valid
sys.stdout.write("START_RPYC".encode("ascii"))
# prepare stdin/stdout to be used by RPYC
# redirect python sys.stdout and ALL output to err to /dev/null
sys.stdout.flush()
ssh_stdout = sys.stdout
ssh_stdin = sys.stdin
sys.stdout = file(os.devnull, 'w', 0)
# keep fd so ssh does not shut down
real_stderr = os.dup(2)
# redirect fd 2 to /dev/null. this even works for 3rd party libraries like ROOT
os.dup2(os.open(os.devnull, os.O_WRONLY | os.O_NONBLOCK), 2)
# start main loop in thread
t = Thread(target=main_loop, args=(ssh_stdin, ssh_stdout))
t.daemon = True
t.start()
# wait for main loop to finish
while t.is_alive():
t.join(0.5)
time.sleep(0.5)
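The block above silences fd 2 at the OS level so that output from C extensions (e.g. ROOT) cannot corrupt the rpyc stream on stdout, while a duplicate of the original stderr is kept alive. A small, self-contained sketch of that dup/dup2 pattern, illustrative only:

import os

saved_stderr = os.dup(2)                         # keep a handle on the real stderr
devnull = os.open(os.devnull, os.O_WRONLY)
os.dup2(devnull, 2)                              # fd-level redirect: silences C extensions too
os.write(2, b"this ends up in /dev/null\n")
os.dup2(saved_stderr, 2)                         # restore the original stderr
os.write(2, b"back on the real stderr\n")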