Commit 8e87c71d authored by Gero Müller's avatar Gero Müller
Browse files

make workspace connection async, fix server shutdown

parent 4764c496
......@@ -172,7 +172,8 @@ class AjaxController(AbstractController):
if not isinstance(workspace, Workspace):
raise MessageException("Unknown Workspace! (id: %s)" % wid)
try:
vispa.workspace.connect(workspace, user, db, password)
connection = vispa.workspace.connect(workspace, user, db, password)
connection.connected()
except (paramiko.AuthenticationException, paramiko.SSHException):
return {"wrong_password": True}
......
# -*- coding: utf-8 -*-
import sys
import os
import cherrypy
import logging
import inspect
from logging import config as loggingcfg
from pkgutil import iter_modules
import importlib
import sqlalchemy
import inspect
import logging
import os
import sys
import vispa.url
import vispa.plugins.template
import vispa.extensions
from sqlalchemy.orm.scoping import scoped_session
from sqlalchemy.orm.session import sessionmaker
from vispa.models import Base as sql_base
import vispa.models.alembic
from vispa.models.preference import *
from vispa.models.stats import *
from vispa.models.user import *
from vispa.models.workspace import *
from vispa.models.stats import *
from vispa.models.preference import *
from vispa.models.shortcuts import *
import cherrypy
import sqlalchemy
import vispa.extensions
import vispa.plugins.template
import vispa.url
from vispa.models.shortcuts import *
import vispa.models.alembic
logger = logging.getLogger(__name__)
......@@ -298,6 +300,7 @@ class Server(object):
vispa.setup_thread_dump()
def stop(self):
vispa.publish("shutdown")
cherrypy.engine.stop()
cherrypy.engine.exit()
self._engine.dispose()
......
......@@ -126,7 +126,7 @@ class LoggingService(rpyc.Service):
logging.getLogger(name).log(level, msg)
class LocalConnection(object):
class LocalConnectionImpl(object):
def __init__(self, command, **kwargs):
......@@ -240,7 +240,7 @@ class RpycConnection(rpyc.Connection):
return obj
class SSHConnection(object):
class SSHConnectionImpl(object):
"""
An SSH transport for Pushy, which uses Paramiko.
......@@ -267,9 +267,10 @@ class SSHConnection(object):
- allow_agent
- look_for_keys
"""
self.__client = paramiko.SSHClient()
self.__client.load_system_host_keys()
# only do this when connecting as local user, SLOW!!
if False:
self.__client.load_system_host_keys()
# Set the missing host key policy.
if isinstance(missing_host_key_policy, str):
......@@ -328,7 +329,7 @@ class SSHConnection(object):
"look_for_keys"):
if name in kwargs:
connect_args[name] = kwargs[name]
logger.debug("connect via ssh to workspace: %s" % address)
self.__client.connect(**connect_args)
# Join arguments into a string
......@@ -338,11 +339,13 @@ class SSHConnection(object):
args[i] = "'%s'" % args[i]
command = " ".join(args)
command = '/bin/sh -c \"exec $SHELL -l -c \\\"%s\\\"\"' % command
logger.debug("execute remote command: %s" % command)
stdin, stdout, stderr = self.__client.exec_command(command)
stdin.channel.settimeout(self.TIMEOUT)
self.stdin = WrappedChannelFile(stdin, 1)
self.stdout = WrappedChannelFile(stdout, 0)
self.stderr = WrappedChannelFile(stderr, 0)
logger.debug("ssh connection established")
def address2host_port(self, host):
host_port = host.split(':')
......@@ -413,6 +416,17 @@ class SSHConnection(object):
class Connection(object):
DISCONNECTED = 0
ESTABLISH_CONNECTION = 1
SETUP_REMOTE = 2
CONNECTED = 3
STATUS_MESSAGES = {
DISCONNECTED: 'disconnected',
ESTABLISH_CONNECTION: 'establish connection via ssh or local',
SETUP_REMOTE: 'setup remote system',
CONNECTED: 'connected'}
def __init__(self, host, **kwargs):
self.__host = host
self.__python = kwargs.get('python', 'python') or "python"
......@@ -424,6 +438,7 @@ class Connection(object):
self.__password = kwargs.get("password", "")
self.__key = kwargs.get("key", "")
self._connection = None
self._status = Connection.DISCONNECTED
def _send_file(self, filename):
logger.debug("Send file to remote connection: %s" % filename)
......@@ -436,9 +451,40 @@ class Connection(object):
irdy, _, _ = select.select([self._connection.stdout], [], [], 10)
if irdy:
return self._connection.stdout.readline()
return ""
else:
return ""
def connected(self, timeout=10):
_timeout = timeout
while _timeout > 0 and self._status > Connection.DISCONNECTED and self._status < Connection.CONNECTED:
logger.debug("active but not connected, waiting")
time.sleep(0.1)
_timeout -= 0.1
if self._status == Connection.CONNECTED:
return True
else:
return False
def active(self):
if self._status == Connection.DISCONNECTED:
return False
elif self._status == Connection.CONNECTED:
try:
self.__rpyc.ping()
return True
except:
logger.exception("active ping")
self.close()
return False
else:
return True
def status(self):
return self._status
def open(self, **kwargs):
self._status = Connection.ESTABLISH_CONNECTION
global _zip_file
if not _zip_file:
_zip_file = _create_zip()
......@@ -450,14 +496,17 @@ class Connection(object):
if self.__host == "local:":
if vispa.config('workspace', 'allow_local', True):
self._connection = LocalConnection(command)
self._connection = LocalConnectionImpl(command)
else:
raise MessageException("local workspaces to allowed!")
else:
self._connection = SSHConnection(command, self.__host,
password=self.__password,
key=self.__key,
username=self.__username)
self._connection = SSHConnectionImpl(command, self.__host,
password=self.__password,
key=self.__key,
username=self.__username)
self._status = Connection.SETUP_REMOTE
loader_file = os.path.join(os.path.dirname(__file__),
"workspace_loader.py")
try:
......@@ -495,6 +544,7 @@ class Connection(object):
raise
self._connection.print_stderr()
logger.info("connection open")
self._status = Connection.CONNECTED
def close(self):
......@@ -508,9 +558,13 @@ class Connection(object):
logger.debug("close transport")
if hasattr(self, '_connection') and self._connection:
self._connection.close()
try:
self._connection.close()
except:
vispa.log_exception()
self._connection = None
self._status = Connection.DISCONNECTED
logger.debug("connection closed")
def stdin(self):
......@@ -548,6 +602,7 @@ class ConnectionPool(object):
self._serv_thread.start()
vispa.register_callback('user.logout', self.clear)
vispa.register_callback('shutdown', self.clear)
def __del__(self):
self._serv_running = False
......@@ -556,102 +611,110 @@ class ConnectionPool(object):
def _serve_connections(self):
logger.info("start rpyc serving thread")
while self._serv_running:
for conn in self._connections.values():
try:
conn.rpyc().ping()
except:
logger.exception("server rpyc connection")
for key in self._connections.keys():
if not self._connections[key].active():
del self._connections[key]
time.sleep(0.01)
logger.info("stop rpyc serving thread")
def _try_host(self, user, workspace, host, **kwargs):
host = host or "local:"
def _connect_host(self, user, workspace, host, **kwargs):
python = None
if workspace.command and len(workspace.command):
python = workspace.command
logger.info("spawn remote process: '%s' using command '%s'"
% (host, python))
username = workspace.login or user.name
client = Connection(
connection = Connection(
host,
python=python,
username=username,
key=workspace.key,
**kwargs)
client.open()
return client
threading.Thread(target=connection.open).start()
time.sleep(0.01)
return connection
def _connect(self, user, workspace, password=None):
logger.debug("connect new workspace")
if workspace.host:
hosts = workspace.host.split(',')
else:
hosts = ['local:']
while len(hosts) > 0:
idx = random.randint(0, len(hosts) - 1)
host = hosts.pop(idx).strip()
connection = self._connect_host(
user,
workspace,
host,
password=password)
if connection:
return connection
return None
def connect(self, user, workspace, password=None):
key = (user.id, workspace.id)
if key in self._connections:
try:
self._connections[key].rpyc().ping()
logger.debug(
"workspace already conected: %d - %d" %
(user.id, workspace.id))
return self._connections[key]
except:
logger.debug(
"exisitng workspace not connected: %d - %d" %
(user.id, workspace.id))
connection = self._connect(user, workspace, password)
if connection:
self._connections[key] = connection
return connection
else:
return None
def get(self, user, workspace, **kwargs):
key = (user.id, workspace.id)
if key in self._connections:
logger.debug("return pooled connection: %d - %d"
% (user.id, workspace.id))
return self._connections[key]
connection = self._connections[key]
if connection.connected():
return connection
else:
return None
else:
'''
logger.debug("check database for running workspaces %d - %d"
logger.debug("workspace not connected: %d - %d"
% (user.id, workspace.id))
db = kwargs.get("db", cherrypy.request.db)
q = db.query(WorkspaceConnection)
q = q.filter_by(workspace_id=workspace.id, user_id=user.id)
wc = q.first()
if wc:
logger.debug("try exisitng tempdir: %s" % wc.tempdir)
client = self._try_host(user, workspace, wc.host,
tempdir=wc.tempdir, **kwargs)
if client:
self._connections[key] = client
wc.tempdir = client.tempdir()
wc.host = client.host()
db.commit()
return client
else:
db.delete(wc)
'''
logger.debug("connect new workspace")
if workspace.host:
hosts = workspace.host.split(',')
else:
hosts = ['local:']
while len(hosts) > 0:
idx = random.randint(0, len(hosts) - 1)
host = hosts.pop(idx).strip()
client = self._try_host(user, workspace, host, **kwargs)
if client:
self._connections[key] = client
'''
wc = WorkspaceConnection()
wc.user_id = user.id
wc.workspace_id = workspace.id
wc.host = client.host()
wc.tempdir = client.tempdir()
db.add(wc)
db.commit()
'''
return client
return None
def clear(self, user, workspace=None, db=None):
db = db or cherrypy.request.db
def clear(self, user=None, workspace=None):
for _uid, _wid in self._connections.keys():
if user.id != _uid:
if user and user.id != _uid:
continue
if workspace and workspace.id != _wid:
continue
logger.info("clear client %d %d" % (_uid, _wid))
# TODO: put client.close in a Thread
del self._connections[(_uid, _wid)]
if workspace:
q = db.query(WorkspaceConnection)
q.filter_by(workspace_id=workspace.id,
user_id=user.id).delete()
else:
q = db.query(WorkspaceConnection)
q.filter_by(user_id=user.id).delete()
# if workspace:
# q = db.query(WorkspaceConnection)
# q.filter_by(workspace_id=workspace.id,
# user_id=user.id).delete()
# elif user:
# q = db.query(WorkspaceConnection)
# q.filter_by(user_id=user.id).delete()
# TODO: clear workspaces
def connections_of_user(self, user):
l = []
for key in self._connections.iterkeys():
if user.id != key[0]:
continue
l.append(self._connections[key])
return l
_connection_pool = ConnectionPool()
......@@ -662,6 +725,7 @@ class InstancePool(object):
self._instances = {}
vispa.register_callback('user.logout', self.clear)
vispa.register_callback('shutdown', self.clear)
def __del__(self):
for key in self._instances.keys():
......@@ -669,7 +733,7 @@ class InstancePool(object):
def get(self, _user, _workspace,
classname=None, key=None, init_args=None, **kwargs):
client = connect(_workspace, user=_user, **kwargs)
client = _connection_pool.get(_user, _workspace)
if classname:
_key = (_user.id, _workspace.id, classname, key)
logger.debug("get: %s", str(_key))
......@@ -695,9 +759,9 @@ class InstancePool(object):
else:
return client.rpyc().root
def clear(self, user, workspace=None, classname=None, key=None):
def clear(self, user=None, workspace=None, classname=None, key=None):
for _uid, _wid, _classname, _key in self._instances.keys():
if user.id != _uid:
if user and user.id != _uid:
continue
if workspace and workspace.id != _wid:
continue
......@@ -772,7 +836,7 @@ def connect(workspace, user=None, db=None, password=None):
if not workspace.has_access(user):
raise vispa.MessageException("Access to workspace not allowed!")
return _connection_pool.get(user, workspace, password=password)
return _connection_pool.connect(user, workspace, password=password)
def disconnect(workspace=None, user=None, db=None):
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment