Skip to content
Snippets Groups Projects
Commit 09e6a3f5 authored by Steffen Vogel's avatar Steffen Vogel :santa_tone2:
Browse files

add python script for spawning remote session

parent fa477c61
No related branches found
No related tags found
No related merge requests found
python/dist/
python/rwth_jupyter.egg-info/
__pycache__/
[metadata]
name = rwth-jupyter
version = 0.0.4
author = Steffen Vogel
author_email = post@steffenvogel.de
description = Start a RWTHjupyter instance on your machine
long_description = file: README.md
long_description_content_type = text/markdown
url = https://git.rwth-aachen.de/jupyter/remote-spawn
project_urls =
Bug Tracker = https://git.rwth-aachen.de/jupyter/remote-spawn/-/issues
classifiers =
Programming Language :: Python :: 3
License :: OSI Approved :: MIT License
Operating System :: OS Independent
[options]
package_dir =
= src
packages = find:
python_requires = >=3.6
install_requires =
requests
rpmfile
jupyterhub
jupyterlab
[options.packages.find]
where = src
[options.entry_points]
console_scripts =
rwth-jupyter = rwth_jupyter.main:main
jupyter-remote = rwth_jupyter.main:main
from setuptools import setup
setup()
import threading
import shutil
import tempfile
import os
import requests
import logging
import gzip
from rwth_jupyter.process import Process
from rwth_jupyter.config import CHISEL_PATH
class Chisel(Process):
def __init__(self, host, remotes, auth=None, fingerprint=None):
command = ['chisel', 'client']
if auth:
username = auth.get('username')
password = auth.get('password')
command += ['--auth', f'{username}:{password}']
if fingerprint:
command += ['--fingerprint', fingerprint]
command += [host] + remotes
super().__init__('chisel', command)
self.connected = threading.Event()
def process_line(self, line):
if line.find(b'Connected'):
self.connected.set()
@staticmethod
def install(url):
# Abort if executable already exists
if shutil.which('chisel'):
logging.info('Tool chisel already exists. Skipping installation')
return
logging.info('Installing Chisel...')
# Download an unzip Chisel
r = requests.get(url)
with tempfile.TemporaryFile() as tf:
tf.write(r.content)
tf.seek(0)
with gzip.open(tf) as sf:
with open(CHISEL_PATH, '+wb') as df:
df.write(sf.read())
os.chmod(CHISEL_PATH, 0o744)
import site
import socket
import pathlib
import getpass
import argparse
JUPYTERHUB_URL = 'https://jupyter.rwth-aachen.de'
CHISEL_URL = 'https://github.com/jpillora/chisel/releases/download/v1.7.6/chisel_1.7.6_linux_amd64.gz'
SSHFS_URL = 'https://download-ib01.fedoraproject.org/pub/epel/7/x86_64/Packages/f/fuse-sshfs-2.10-1.el7.x86_64.rpm'
HOME_PATH = str(pathlib.Path.home())
LOCAL_BIN_PATH = f'{site.USER_BASE}/bin'
CHISEL_PATH = f'{LOCAL_BIN_PATH}/chisel'
JUPYTER_PATH = f'{LOCAL_BIN_PATH}/jupyterhub-singleuser'
SSHFS_PATH = f'{LOCAL_BIN_PATH}/sshfs'
USER_MOUNT_PATH = f'{HOME_PATH}/jupyter-home'
MOUNT_PATH = f'{HOME_PATH}/jupyter-home'
def get_config():
is_rwth = socket.gethostname().endswith('hpc.itc.rwth-aachen.de')
parser = argparse.ArgumentParser('rwth-jupyter')
parser.add_argument('--rwth', '-r', default=is_rwth)
parser.add_argument('--token', '-t')
parser.add_argument('--token-path', '-T', default=f'{HOME_PATH}/.jupyter/token')
parser.add_argument('--mount', '-m', default=True)
parser.add_argument('--mount-point', '-M', default=MOUNT_PATH)
parser.add_argument('--jupyterhub-url', '-J', default=JUPYTERHUB_URL)
parser.add_argument('--sshfs-url', '-S', default=SSHFS_URL)
parser.add_argument('--chisel-url', '-C', default=CHISEL_URL)
return parser.parse_args()
import requests
import logging
import secrets
import string
import urllib
import json
import os
from rwth_jupyter.process import Process
from rwth_jupyter.config import HOME_PATH
class JupyterSingleuser(Process):
def __init__(self, username, token, notebook_port, hub_port):
hub_api_url = f'http://localhost:{hub_port}/hub/api'
env={
**os.environ,
'JUPYTERHUB_API_TOKEN': token,
'JUPYTERHUB_CLIENT_ID': f'jupyterhub-user-{username}',
'JUPYTERHUB_API_URL': hub_api_url,
'JUPYTERHUB_ACTIVITY_URL': f'{hub_api_url}/users/{username}/activity',
'JUPYTERHUB_OAUTH_CALLBACK_URL': f'/user/{username}/oauth_callback',
'JUPYTERHUB_USER': username
}
options = {
'SingleUserNotebookApp.base_url': f'/user/{username}',
'SingleUserNotebookApp.port': notebook_port,
'SingleUserNotebookApp.allow_origin': '*',
}
command = ['jupyterhub-singleuser']
for k, v in options.items():
command += [f'--{k}', str(v)]
command.append(HOME_PATH)
super().__init__('jupyter', command, env=env)
class JupyterHub:
def __init__(self, url, token_path):
self.url = url
self.token_path = token_path
def get_username(self, token):
r = requests.get(self.url+'/hub/api/user',
headers={
'Authorization': f'token {token}'
})
r.raise_for_status()
return r.json().get('name')
def get_token(self, next=None):
# Generate random reflection token
reflect_token = ''.join(secrets.choice(string.ascii_letters + string.digits) for i in range(32))
url = f'{self.url}/services/reflector/api/request/{reflect_token}'
if next:
url += '?next=' + urllib.parse.quote_plus(next)
# Direct user to token reflector
logging.info('')
logging.info('Please visit the following link to generate an API token for RWTHjupyter:')
logging.info('')
logging.info(' %s', url)
logging.info('')
logging.info('Waiting for token...')
r = requests.get(f'{self.url}/services/reflector/api/retrieve/{reflect_token}',
timeout=60)
r.raise_for_status()
return r.json()
@property
def api_token(self):
return self.token.get('token')
def login(self, api_token, next=None):
# Check if privided token is valid
if api_token:
self.token = {
'token': api_token
}
try:
self.username = self.get_username(self.api_token)
except requests.RequestException as e:
raise Exception('Invalid command line token: ' + str(e))
else:
try:
with open(self.token_path, 'r') as tf:
logging.info('Found existing API token for JupyterHub')
self.token = json.load(tf)
except (OSError, KeyError, json.JSONDecodeError):
logging.error('Existing token is invalid. Requesting a new one...')
try:
self.token = self.get_token(next)
except requests.RequestException as e:
logging.error('Failed to get token via reflector: %s', e)
logging.error('')
self.token = {
'token': input('Please provide a token by hand: ')
}
try:
self.username = self.get_username(self.api_token)
except requests.RequestException as e:
raise Exception('Invalid command line token: ' + str(e))
logging.info('Got RWTHjupyter token: %s', self.api_token)
logging.info('Got RWTHjupyter username: %s', self.username)
logging.info('Saving token to: %s', self.token_path)
with open(self.token_path, 'w+') as tf:
json.dump(self.token, tf)
def spawn(self, slug='remote-spawn'):
logging.info('Spawning JupyterHub session...')
r = requests.post(f'{self.url}/hub/api/users/{self.username}/server',
headers={
'Authorization': f'token {self.api_token}'
},
json={
'profile': slug
})
r.raise_for_status()
# Observe progress
r = requests.get(f'{self.url}/hub/api/users/{self.username}/server/progress',
headers={
'Authorization': f'token {self.api_token}'
},
stream=True,
timeout=20)
for raw_line in r.iter_lines():
if raw_line:
line = raw_line.decode('utf-8')
if line.startswith('data: '):
payload_raw = line[6:]
payload = json.loads(payload_raw)
raw = payload.get('raw_event')
if raw:
type = raw.get('type')
if type in ['Warning']:
log = logging.warn
elif type in ['Normal']:
log = logging.info
else:
log = logging.info
log(' %d%% - %s',
payload.get('progress'),
raw.get('message')
)
else:
logging.info(' %d%% - %s',
payload.get('progress'),
payload.get('message')
)
def get_server(self):
r = requests.get(f'{self.url}/hub/api/user',
headers={
'Authorization': f'token {self.api_token}'
})
r.raise_for_status()
j = r.json()
return j.get('server')
#!/bin/env python3
import atexit
import logging
import os
import pathlib
import random
import requests
import socket
import time
from rwth_jupyter.config import *
from rwth_jupyter.jupyter import JupyterSingleuser, JupyterHub
from rwth_jupyter.chisel import Chisel
from rwth_jupyter.sshfs import Sshfs
jupyter = None
chisel = None
sshfs = None
def get_free_port():
for _ in range(5):
port = random.randint(1024, 65535)
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
r = sock.connect_ex(('127.0.0.1', port))
if r != 0:
return port
def stop():
if jupyter is not None:
jupyter.stop()
if chisel is not None:
chisel.stop()
if sshfs is not None:
sshfs.unmount()
def main():
# Setup logging
FORMAT = '%(asctime)-15s [%(levelname)s] %(message)s'
logging.basicConfig(level=logging.INFO, format=FORMAT)
# Register signals for catching Ctrl-C
atexit.register(stop)
# Command line arguments
cfg = get_config()
# Find free ports
jupyter_notebook_port = get_free_port()
jupyter_hub_port = get_free_port()
sftp_port = get_free_port()
if None in [jupyter_notebook_port, jupyter_hub_port, sftp_port]:
raise Exception('Failed to find free ports')
# Create directories
pathlib.Path(os.path.dirname(cfg.token_path)).mkdir(parents=True, exist_ok=True)
pathlib.Path(LOCAL_BIN_PATH).mkdir(parents=True, exist_ok=True)
os.environ['PATH'] += os.pathsep + LOCAL_BIN_PATH
if os.environ.get('PYTHONPATH'):
os.environ['PYTHONPATH'] += os.pathsep + site.USER_BASE + '/lib/python3.6/site-packages'
else:
os.environ['PYTHONPATH'] = site.USER_BASE + '/lib/python3.6/site-packages'
# Install tools
if cfg.rwth:
Sshfs.install(cfg.sshfs_url)
Chisel.install(cfg.chisel_url)
next = '/user-redirect/'
hub = JupyterHub(cfg.jupyterhub_url, cfg.token_path)
hub.login(cfg.token, next)
# Check if proper server is running in JupyterHub
server = hub.get_server()
if server is None:
hub.spawn('hpc')
# Get tunnel connection details
r = requests.post(JUPYTERHUB_URL+f'/user/{hub.username}/api/v1',
headers={
'Authorization': f'token {hub.api_token}'
},
data={
'stop': 'true'
})
r.raise_for_status()
j = r.json()
chisel_username = j.get('chisel').get('username')
chisel_password = j.get('chisel').get('password')
chisel_fingerprint = j.get('chisel').get('fingerprint')
jupyter_token = j.get('jupyter').get('token')
# Establish chisel tunnel
chisel = Chisel(cfg.jupyterhub_url+f'/user/{hub.username}/', [
f'{sftp_port}:localhost:7777',
f'{jupyter_hub_port}:hub.jhub:8081',
f'R:8890:localhost:{jupyter_notebook_port}'
],
auth={
'username': chisel_username,
'password': chisel_password
},
fingerprint=chisel_fingerprint)
if not chisel.connected.wait(timeout=5):
raise Exception('Failed to connect to chisel server')
# Start jupyterhub-singleuser
jupyter = JupyterSingleuser(hub.username, jupyter_token, jupyter_notebook_port, jupyter_hub_port)
# Mount RWTHjupyter home directory
if cfg.mount and False:
if cfg.rwth:
# For some reason we can not create an SSHFS mountpoint
# within an existing NFS mount point which is the case for the RWTH compute cluster
# As a workaround we mount it into /tmp and create a symlink
orig_mount_point = cfg.mount_point
cfg.mount_point = f'/tmp/' + getpass.getuser() + '/jupyter'
if not os.path.islink(orig_mount_point):
os.symlink(orig_mount_point, cfg.mount_point)
sshfs = Sshfs(cfg.mount_point, sftp_port)
sshfs.mount()
time.sleep(3) # Delay the notes a bit so that they arnt hidden by Jupyter logs
logging.info('')
logging.info('You can now access your RWTHjupyter session here:')
logging.info('')
logging.info(' %s/user/%s/', hub.url, hub.username)
logging.info('')
import threading
import subprocess
import logging
import sys
import os
class Process(threading.Thread):
def __init__(self, name, command, env=None):
super().__init__()
self.name = name
self.command = command
self.env = env
self.start()
def stop(self):
self.process.kill()
self.process.wait()
self.join()
def process_line(self, line):
pass
def run(self):
logging.info('Starting %s process in background: %s', self.name, ' '.join(self.command))
self.process = subprocess.Popen(self.command,
env=self.env or os.environ,
shell=False,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT)
for line in iter(self.process.stdout.readline, ''):
sys.stdout.buffer.write(line)
sys.stdout.flush()
self.process_line(line)
rc = self.process.poll()
if rc is not None:
break
logging.info('Process %s has stopped: rc=%d', self.name, rc)
def run(command):
logging.info('Running command: %s', ' '.join(command))
process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
for line in iter(process.stdout.readline, ''):
sys.stdout.buffer.write(line)
sys.stdout.flush()
rc = process.poll()
if rc is not None:
break
return rc
import shutil
import logging
import os
import pathlib
import requests
import tempfile
import rpmfile
from rwth_jupyter.process import run
from rwth_jupyter.config import SSHFS_PATH
class Sshfs:
def __init__(self, mp, port):
self.mount_point = mp
self.port = port
def mount(self):
logging.info('Mounting RWTHjupyter home...')
# Create mountpoint
pathlib.Path(self.mount_point).mkdir(parents=True, exist_ok=True)
if os.path.ismount(self.mount_point):
logging.warn('Already mounted')
return
options = {
'directport': port,
'cache': True,
'cache_max_size': 536870912, # 512 MiB
'cache_timeout': 60,
'compression': False,
'kernel_cache': None,
'auto_cache': None,
'large_read': None
}
command = ['sshfs']
for k, v in options.items():
if v is None:
command += ['-o', k]
else:
if v in [True, False]:
v = 'yes' if v else 'no'
command += ['-o', f'{k}={v}']
command += ['127.0.0.1:/home/jovyan', self.mount_point]
rc = run(command)
if rc != 0:
raise Exception('Failed to mount remote dir: rc=' + rc)
def sshfs_unmount():
if os.path.ismount(self.mount_point):
logging.info('Un-mounting RWTHjupyter home...')
run(['fusermount', '-u', self.mount_point])
@staticmethod
def install(url):
# Abort if executable already exists
if shutil.which('sshfs'):
logging.info('Tool sshfs already exists. Skipping installation')
return
logging.info('Install sshfs...')
# Download an extract executable from RPM
r = requests.get(url)
with tempfile.TemporaryFile() as tf:
tf.write(r.content)
tf.seek(0)
with rpmfile.open(fileobj=tf) as f:
with f.extractfile('./usr/bin/sshfs') as sf:
with open(SSHFS_PATH, 'wb') as df:
df.write(sf.read())
os.chmod(SSHFS_PATH, 0o744)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment