Commit b5150d33 authored by Gero Müller's avatar Gero Müller
Browse files

add direct remote mode, disable fifo mode as default: 100% cpu fix

parent eef9c03a
......@@ -398,12 +398,21 @@ class Connection(object):
loader_file = os.path.join(os.path.dirname(__file__),
"workspace_loader.py")
try:
# send in memory loader
self._send_file(loader_file)
# send packages
self._send_file(_zip_file)
# wait for remote site finish
time.sleep(0.1)
self._connection.print_stderr()
# send
logger.debug("send commands")
self._connection.writeln("start")
self._connection.writeln("direct")
# TODO: make direct or fifo mode selectable
#self._connection.writeln("start")
self._connection.writeln(kwargs.get("tempdir", ""))
rval = self._readline()
parts = rval.split(":")
......
......@@ -286,6 +286,40 @@ class RemoteHandler(logging.Handler):
self._handler(record.name, record.levelno, msg)
def main_loop(fin, fout):
# connect the rpyc server to fifo
conn = rpyc.connect_pipes(fin, fout, Service,
{'allow_public_attrs': True})
try:
logging.getLogger().addHandler(RemoteHandler(conn.root.log))
logger.info("connected, serving")
except:
log_exception()
logger.debug("daemon loop")
try:
try:
while True:
if not conn.poll_all(3600):
logging.info("timeout reached: disconnect")
break
# else:
# logger.debug("daemon next loop")
except EOFError:
# log_exception()
pass
except Exception:
# log_exception()
if not conn.closed:
raise
finally:
conn.close()
logging.getLogger().handlers = []
logger.debug("daemon done")
class RpycDaemon(Daemon):
def __init__(self, base):
......@@ -313,37 +347,7 @@ class RpycDaemon(Daemon):
logger.debug("daemon connect pipes")
# connect the rpyc server to fifo
conn = rpyc.connect_pipes(fin, fout, Service,
{'allow_public_attrs': True})
try:
logging.getLogger().addHandler(RemoteHandler(conn.root.log))
logger.info("connected, serving")
except:
log_exception()
logger.debug("daemon loop")
try:
try:
while True:
if not conn.poll_all(3600):
logging.info("timeout reached: disconnect")
break
# else:
# logger.debug("daemon next loop")
except EOFError:
# log_exception()
pass
except Exception:
# log_exception()
if not conn.closed:
raise
finally:
conn.close()
logging.getLogger().handlers = []
logger.debug("daemon done")
main_loop(fin, fout)
# close fifos
fin.close()
......@@ -380,8 +384,6 @@ def safe_read(fd, size=1024 * 1024):
def safe_write(fd, data):
'''write data to pipe'''
if data == None:
return
written = 0
length = len(data)
while written < length:
......@@ -438,12 +440,15 @@ def stream_fifo(base):
break
for s in irdy:
if s == fdin:
d = safe_read(fdin)
safe_write(fin, d)
elif s == fout:
d = safe_read(fout)
safe_write(fdout, d)
d = safe_read(fdin)
if d == None:
time.sleep(0)
else:
if s == fdin:
safe_write(fin, d)
elif s == fout:
safe_write(fdout, d)
except Exception, e:
logger.debug("exception: %s" % str(e))
running = False
......@@ -465,6 +470,14 @@ logger.info("action: %s", action)
logger.info("base: %s", base)
def cleanup_base():
# TODO: be more careful in deleting directories
if os.path.isdir(base):
shutil.rmtree(base)
atexit.register(cleanup_base)
daemon = RpycDaemon(base)
if 'start' == str(action).strip():
sys.stdout.write("start:%s\n" % base)
......@@ -480,6 +493,19 @@ elif 'restart' == action:
logger.info("restart daemon")
sys.stdout.write("restart:%s\n" % base)
daemon.restart()
elif 'direct' == action:
sys.stdout.write("start:%s\n" % base)
logger.info("direction connection")
sys.stdout.flush()
sys.stderr.flush()
stdout = sys.stdout
# redirect stdout and err to files
sys.stdout = file(join_path(base, 'rpycd.out'), 'ab+', 0)
sys.stderr = file(join_path(base, 'rpycd.err'), 'ab+', 0)
main_loop(sys.stdin, stdout)
else:
sys.stdout.write("unknown:%s\n" % base)
logger.error("unknown action: '%s'" % action)
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment