Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found
Select Git revision
  • connection-msg
  • controller-rtds
  • deps
  • fix-node-manager
  • fix-pipeline
  • master
  • simple-kubernetes-manager
  • update-build
  • update-mail
  • v0.1.0
  • v0.2.0
  • v0.3.2
  • v0.4.0
13 results

Target

Select target project
  • acs/public/villas/controller
1 result
Select Git revision
  • connection-msg
  • controller-rtds
  • deps
  • fix-node-manager
  • fix-pipeline
  • master
  • simple-kubernetes-manager
  • update-build
  • update-mail
  • v0.1.0
  • v0.2.0
  • v0.3.2
  • v0.4.0
13 results
Show changes
Commits on Source (11)
Showing
with 762 additions and 100 deletions
......@@ -2,3 +2,5 @@
*.egg-info/
build/
dist/
.vscode
.eggs
FROM fedora:30
ENV PYCURL_SSL_LIBRARY openssl
RUN dnf -y install \
gcc \
curl \
python3 \
python3-pip \
python3-devel \
libcurl-devel \
openssl-devel \
nmap-ncat
COPY . /tmp/controller
RUN cd /tmp/controller && \
python3 setup.py install && \
rm -rf /tmp/controller
ADD https://raw.githubusercontent.com/eficode/wait-for/master/wait-for /usr/bin/wait-for
RUN chmod +x /usr/bin/wait-for
FROM dpsim
RUN dnf -y install curl libcurl-devel openssl-devel
RUN pip3 install --upgrade pip
ENV PYCURL_SSL_LIBRARY openssl
RUN pip3 install pycurl
RUN useradd -m vc
COPY bin /home/vc/bin/
COPY setup.py /home/vc
COPY villas /home/vc/villas/
WORKDIR /home/vc
RUN python3.6 setup.py install
USER vc
CMD sleep 20 && villas-ctl -b "amqp://guest:guest@rabbitmq/%2F" --config config.json daemon
......@@ -6,33 +6,37 @@
"results" : "/var/lib/villas/results",
"models" : "/var/lib/villas/models"
},
"simulators" : [
"components" : [
{
"name" : "Generic Simulator #1",
"type" : "generic",
"realm" : "de.rwth-aachen.eonerc.acs",
"uuid" : "4bc4ab42-6b1b-11e8-b38e-63d6cb797c3c",
"location" : "Richard's PC",
"owner" : "rmr",
"name" : "Generic Simulator #1",
"category": "simulator",
"type" : "generic",
"realm" : "de.rwth-aachen.eonerc.acs",
"uuid" : "4bc4ab42-6b1b-11e8-b38e-63d6cb797c3c",
"location" : "Richard's PC",
"owner" : "rmr",
"whitelist" : [
"/sbin/ping",
"^echo"
],
"shell" : true
"shell" : true,
"enabled" : false
},
{
"name" : "Generic Simulator #2",
"type" : "generic",
"realm" : "de.rwth-aachen.eonerc.acs",
"uuid" : "82b0d882-6b1c-11e8-9708-ffdb6522d53c",
"location" : "Steffens PC",
"owner" : "svg",
"name" : "Generic Simulator #2",
"category": "simulator",
"type" : "generic",
"realm" : "de.rwth-aachen.eonerc.acs",
"uuid" : "82b0d882-6b1c-11e8-9708-ffdb6522d53c",
"location" : "Steffen's Laptop",
"owner" : "svg",
"whitelist" : [
"/sbin/ping",
"^echo"
],
"shell" : true
},
"shell" : true,
"enabled" : false
},
{
"name" : "Dummy Simulator #1",
"category" : "simulator",
......@@ -40,7 +44,34 @@
"realm" : "de.rwth-aachen.eonerc.acs",
"uuid" : "e15f5ad4-041f-11e8-9bf3-23372608bf60",
"location" : "Steffen's Laptop",
"owner" : "svo"
"owner" : "svg",
"enabled" : false
},
{
"name" : "VILLASnode local",
"category" : "gateway",
"type" : "villas-node",
"realm" : "de.rwth-aachen.eonerc.acs",
"uuid" : "7e5e1b2a-eff9-11e8-9425-320084a2b301",
"location" : "Steffen's Laptop",
"owner" : "svg",
"enabled" : true,
"autostart" : true,
"executable" : "/Users/stv0g/workspace/rwth/acs/public/villas/node/build_devel_osx/src/villas-node",
"api" : "http://localhost:8080",
"config_filename" : "/Users/stv0g/workspace/rwth/acs/public/villas/node/build_devel_osx/file-playback.conf",
"config" : {
}
},
{
"name" : "Playback controller",
"category" : "controller",
"type" : "playback",
"realm" : "de.rwth-aachen.eonerc.acs",
"uuid" : "c7bec168-f006-11e8-9405-320084a2b301",
"location" : "Steffen's Laptop",
"owner" : "svg"
}
]
}
from setuptools import setup, find_packages
from pkg_resources import parse_version # part of `setuptools`
from setuptools import setup
from glob import glob
import subprocess
import re
import os
def git_version():
"""Return version with local version identifier."""
try:
import git
repo = git.Repo('.git')
repo.git.status()
latest_tag = repo.git.describe(match='v[0-9]*', tags=True, abbrev=0)
sha = repo.head.commit.hexsha[:6]
version = latest_tag.lstrip('v')
return '{v}-{sha}'.format(v = version, sha = sha)
except:
return 'unknown'
def cleanhtml(raw_html):
cleanr = re.compile('<.*?>')
cleantext = re.sub(cleanr, '', raw_html)
return cleantext
def read(fname):
dname = os.path.dirname(__file__)
fname = os.path.join(dname, fname)
with open(fname) as f:
contents = f.read()
sanatized = cleanhtml(contents)
try:
from m2r import M2R
m2r = M2R()
return m2r(sanatized)
except:
return sanatized
with open('README.md') as f:
long_description = f.read()
setup(
name = 'villas-controller',
version = git_version(),
version = '0.3.0',
description = 'A controller/orchestration API for real-time power system simulators',
long_description = read('README.md'),
long_description = long_description,
long_description_content_type = 'text/markdown',
url = 'https://www.fein-aachen.org/projects/villas-controller/',
author = 'Steffen Vogel',
author_email = 'stvogel@eonrc.rwth-aachen.de',
maintainer = 'Steffen Vogel',
maintainer_email = 'stvogel@eonerc.rwth-aachen.de',
author_email = 'acs-software@eonerc.rwth-aachen.de',
license = 'GPL-3.0',
keywords = 'simulation controller villas',
classifiers = [
'Development Status :: 3 - Alpha',
'License :: OSI Approved :: GPL3',
'License :: OSI Approved :: GNU General Public License v3 or later (GPLv3+)',
'Programming Language :: Python :: 3'
],
packages = find_packages(),
packages = [ 'villas.controller' ],
setup_requires = [
'm2r',
'gitpython'
......
from .. import command
import logging
import functools as ft
import villas.controller.controller
from villas.controller.components.gateways.villas_node import VILLASnodeGateway
from villas.node.node import Node
LOGGER = logging.getLogger(__name__)
......@@ -14,8 +17,18 @@ class DaemonCommand(command.Command):
@staticmethod
def run(connection, args):
components = args.config.components
# Automatically create a VILLASnode Gateway if not present in config
node_present = ft.reduce(lambda present, comp: present or type(comp) is VILLASnodeGateway, components, False)
if not node_present:
LOGGER.info('Creating default VILLASnodeGateway component')
node_comp = VILLASnodeGateway()
components.append(node_comp)
try:
d = villas.controller.controller.Controller(connection, args.config.simulators)
d = villas.controller.controller.Controller(connection, components)
d.run()
except KeyboardInterrupt:
pass
......
import logging
import kombu
import time
import socket
import os
import uuid
from . import __version__ as version
from .exceptions import *
class Component(object):
def __init__(self, **props):
# Default values
if 'enabled' not in props:
props['enabled'] = True
if 'uuid' not in props:
props['uuid'] = str(uuid.uuid4())
self.realm = props['realm']
self.type = props['type']
self.name = props['name']
self.category = props['category']
self.enabled = props['enabled']
self.uuid = props['uuid']
self.started = time.time()
self.properties = props
self._state = 'idle'
self._stateargs = {}
self.logger = logging.getLogger('villas.controller:' + self.category + ':' + self.name)
self.producer = None
self.exchange = kombu.Exchange(
name = 'villas',
type = 'headers',
durable = True
)
def on_ready(self):
pass
def set_controller(self, ctrl):
self.controller = ctrl
self.connection = ctrl.connection
self.producer = kombu.Producer(
channel = self.connection.channel(),
exchange = self.exchange
)
self.publish_state()
def get_consumer(self, channel):
self.channel = channel
return kombu.Consumer(
channel = self.channel,
on_message = self.on_message,
queues = kombu.Queue(
exchange = self.exchange,
binding_arguments = self.headers,
durable = False
),
no_ack = True,
accept = {'application/json'}
)
@property
def headers(self):
return {
'x-match' : 'any',
'category' : self.category,
'realm' : self.realm,
'uuid' : self.uuid,
'type' : self.type,
'action' : 'ping'
}
@property
def state(self):
return {
'state' : self._state,
'version' : version,
'properties' : self.properties,
'uptime' : time.time() - self.started,
'host' : socket.gethostname(),
'kernel' : os.uname(),
**self._stateargs
}
def on_message(self, message):
self.logger.debug('Received message: %s', message.payload)
if 'action' in message.payload:
self.run_action(message.payload['action'], message)
def run_action(self, action, message):
if action == 'ping':
self.logger.debug('Received action: %s', action)
else:
self.logger.info('Received action: %s', action)
try:
if action == 'ping':
self.ping(message)
elif action == 'start':
self.change_state('starting')
self.start(message)
elif action == 'stop':
# state changed to stopping after the simulation
# has ended, to avoid missing log entries
self.stop(message)
elif action == 'pause':
self.change_state('pausing')
self.pause(message)
elif action == 'resume':
self.change_state('resuming')
self.resume(message)
elif action == 'shutdown':
self.change_state('shuttingdown')
self.shutdown(message)
elif action == 'reset':
self.change_state('resetting')
self.reset(message)
else:
raise SimulationException(self, 'Unknown action', action = action)
except SimulationException as se:
self.change_state('error', msg = se.msg, **se.info)
finally:
message.ack()
def change_state(self, state, **kwargs):
if self._state == state:
return
self.logger.info('State transition: %s => %s' % (self._state, state))
self._state = state
self._stateargs = kwargs
self.publish_state()
# Actions
def ping(self, message):
self.publish_state()
def start(self, message):
pass
def stop(self, message):
pass
def pause(self, message):
pass
def resume(self, message):
pass
def shutdown(self, message):
pass
def reset(self, message):
self.started = time.time()
@staticmethod
def from_json(json):
from .components.simulator import Simulator
from .components.gateway import Gateway
from .components.controller import Controller
if json['category'] == 'simulator':
return Simulator.from_json(json)
elif json['category'] == 'gateway':
return Gateway.from_json(json)
elif json['category'] == 'controller':
return Controller.from_json(json)
def publish_state(self):
if self.producer is None:
return
self.producer.publish(
self.state,
headers = self.headers
)
def __str__(self):
return '%s %s <%s: %s>' % (self.type, self.category, self.name, self.uuid)
import logging
from ..component import Component
class Controller(Component):
def __init__(self, **args):
super().__init__(**args)
@staticmethod
def from_json(json):
from .controllers import playback
if json['type'] == 'playback':
return playback.PlaybackController(**json)
else:
return None
from ..controller import Controller
class PlaybackController(Controller):
def __init__(self, **args):
super().__init__(**args)
self.controller_state = 'm1'
self.m1_uuid = 'edaf647a-f012-11e8-b711-320084a2b301'
self.m2_uuid = '73469f62-f1ae-11e8-9f9a-375372c8e719'
self._state = 'running'
def on_message(self, message):
self.logger.debug('Received message: %s', message.payload)
if 'action' in message.payload:
if message.payload['action'] == 'toggle':
self.toggle()
def toggle(self):
if self.controller_state == 'm1':
self.controller_state = 'm2'
self.producer.publish(
{
'action' : 'pause'
},
headers = {
'uuid' : self.m2_uuid
}
)
self.producer.publish(
{
'action' : 'resume'
},
headers = {
'uuid' : self.m1_uuid
}
)
elif self.controller_state == 'm2':
self.controller_state = 'm1'
self.producer.publish(
{
'action' : 'pause'
},
headers = {
'uuid' : self.m1_uuid
}
)
self.producer.publish(
{
'action' : 'resume'
},
headers = {
'uuid' : self.m2_uuid
}
)
import logging
from ..component import Component
class Gateway(Component):
def __init__(self, **args):
super().__init__(**args)
@staticmethod
def from_json(json):
from .gateways import villas_node
if json['type'] == 'villas-node':
return villas_node.VILLASnodeGateway(**json)
else:
return None
import threading
import requests
import time
from villas.node.node import Node
from ..gateway import Gateway
class VILLASnodeNode(Gateway):
def __init__(self, gateway, args):
# Some default properties
props = {
'category' : 'villas'
}
if args['type'] == 'websocket':
props['endpoint'] = gateway.node.api_endpoint + '/' + args['name']
props.update(args)
super().__init__(**props)
self.name = args['name']
self._state = args['state']
self.gateway = gateway
def start(self, message):
try:
self.gateway.node.request('node.start', { 'node' : self.name })
self.gateway.check_state()
except:
self.logger.warn('Failed to start node')
def stop(self, message):
try:
self.gateway.node.request('node.stop', { 'node' : self.name })
self.gateway.check_state()
except:
self.logger.warn('Failed to stop node')
def pause(self, message):
try:
self.gateway.node.request('node.pause', { 'node' : self.name })
self.gateway.check_state()
except:
self.logger.warn('Failed to pause node')
def resume(self, message):
try:
self.gateway.node.request('node.resume', { 'node' : self.name })
self.gateway.check_state()
except:
self.logger.warn('Failed to resume node')
def reset(self, message):
try:
self.gateway.node.reset('node.restart', { 'node' : self.name })
self.gateway.check_state()
except:
self.logger.warn('Failed to reset node')
class VILLASnodeGateway(Gateway):
def __init__(self, **args):
super().__init__(**args)
self.node = Node(**args)
self.nodes = []
if 'autostart' in args:
self.autostart = args['autostart']
else:
self.autostart = False
self.version = 'unknown'
self.thread_stop = threading.Event()
self.thread = threading.Thread(target = self.check_state_periodically)
self.thread.start()
def __del__(self):
self.thread_stop.set()
self.thread.join()
def check_state_periodically(self):
while not self.thread_stop.wait(2):
self.check_state()
def check_state(self):
try:
for node in self.node.nodes:
found = False
for comp in self.controller.components:
if comp.uuid == node['uuid']:
found = True
break
if found:
comp.change_state(node['state'])
else:
node = VILLASnodeNode(self, node)
self.nodes.append(node)
self.controller.components.add(node)
self.change_state('running')
except requests.RequestException as e:
self.change_state('idle')
@property
def state(self):
return {
'villas_version' : self.version,
**super().state
}
def on_ready(self):
try:
self.version = self.node.get_version()
except:
self.change_state('error', error='VILLASnode not installed')
if self.autostart and not self.node.is_running():
self.start()
def start(self, message = None):
self.node.start()
self.change_state('starting')
def stop(self, message):
if self.node.is_running():
self.node.stop()
self.change_state('idle')
# Once the gateway shutsdown, all the gateway nodes are also shutdown
for node in self.nodes:
node.change_state('shutdown')
def pause(self, message):
self.node.pause()
self.change_state('paused')
# Once the gateway shutsdown, all the gateway nodes are also shutdown
for node in self.nodes:
node.change_state('paused')
def resume(self, message):
self.node.resume()
def reset(self, message):
self.node.restart()
import logging
import kombu
import uuid
import time
import socket
import os
import pycurl
import io
import tempfile
import zipfile
from .exceptions import SimulationException
from . import __version__ as version
from ..component import Component
from ..exceptions import SimulationException
from .exceptions import SimulationException
from . import __version__ as version
import pycurl
class Simulator(object):
class Simulator(Component):
def __init__(self, **args):
self.realm = args['realm']
self.type = args['type']
self.name = args['name']
self.enabled = args['enabled'] if 'enabled' in args else True
self.uuid = args['uuid'] if 'uuid' in args else uuid.uuid4()
self.started = time.time()
self.properties = args
super().__init__(**args)
self.model = None
self._state = 'idle'
self._stateargs = {}
self.results = None
self.logger = logging.getLogger("villas.controller.simulator:" + self.uuid)
self.exchange = kombu.Exchange(
name = 'villas',
type = 'headers',
durable = True
)
try:
self.uuid = args['uuid'] if 'uuid' in args else uuid.uuid4()
except Exception as e:
self.logger = logging.getLogger("villas.controller.simulator:")
self.logger.info(e.msg)
self.logger = logging.getLogger("villas.controller.simulator:" + self.uuid)
def set_connection(self, connection):
self.connection = connection
self.producer = kombu.Producer(
channel = self.connection.channel(),
exchange = self.exchange
)
@property
def state(self):
return {
'model' : self.model,
'results' : self.results,
self.publish_state()
**super().state
}
@staticmethod
def from_json(json):
from .simulators import dummy, generic, rtlab, rscad
if json['type'] == "dummy":
if json['type'] == 'dummy':
return dummy.DummySimulator(**json)
if json['type'] == "generic":
if json['type'] == 'generic':
return generic.GenericSimulator(**json)
elif json['type'] == "dpsim":
elif json['type'] == 'dpsim':
from .simulators import dpsim
return dpsim.DPsimSimulator(**json)
elif json['type'] == "rtlab":
elif json['type'] == 'rtlab':
return dpsim.RTLabSimulator(**json)
elif json['type'] == "rscad":
elif json['type'] == 'rscad':
return dpsim.RSCADSimulator(**json)
else:
return None
def get_consumer(self, channel):
self.channel = channel
return kombu.Consumer(
channel = self.channel,
on_message = self.on_message,
queues = kombu.Queue(
exchange = self.exchange,
binding_arguments = self.headers,
durable = False
),
no_ack = True,
accept = {'application/json'}
)
@property
def headers(self):
return {
'x-match' : 'any',
'category' : 'simulator',
'realm' : self.realm,
'uuid' : self.uuid,
'type' : self.type
}
@property
def state(self):
state = {
'state' : self._state,
'model' : self.model,
'version' : version,
'properties' : self.properties,
'uptime' : time.time() - self.started,
'host' : socket.getfqdn(),
'kernel' : os.uname(),
**self._stateargs
}
def change_state(self, state, force=False, **kwargs):
if self._state == state:
return
return state
def on_message(self, message):
self.logger.debug('Received message: %s', message.payload)
if 'action' in message.payload:
self.run_action(message.payload['action'], message)
def run_action(self, action, message):
self.logger.info('Received %s command', action)
try:
if action == 'ping':
self.ping(message)
elif action == 'start':
self.change_state('starting')
self.start(message)
elif action == 'stop':
# state changed to stopping after the simulation
# has ended, to avoid missing log entries
self.stop(message)
elif action == 'pause':
self.change_state('pausing')
self.pause(message)
elif action == 'resume':
self.change_state('resuming')
self.resume(message)
elif action == 'shutdown':
self.change_state('shuttingdown')
self.shutdown(message)
elif action == 'reset':
self.change_state('resetting')
self.reset(message)
else:
raise SimulationException(self, 'Unknown action', action = action)
except SimulationException as se:
self.change_state('error', msg = se.msg, **se.info)
finally:
message.ack()
def publish_state(self):
self.producer.publish(
self.state,
headers = self.headers
)
def change_state(self, state, **kwargs):
valid_state_transitions = {
# current # list of valid next states
'error': [ 'resetting', 'error' ],
......@@ -171,14 +60,15 @@ class Simulator(object):
'resuming': [ 'resetting', 'error', 'running' ],
'stopping': [ 'resetting', 'error', 'idle' ],
'resetting' : [ 'resetting', 'error', 'idle' ],
'shuttingdown' : [ 'shutdown' ]
'shuttingdown' : [ 'shutdown', 'error' ],
'shutdown' : [ 'starting', 'error' ]
}
# check that we have been asked for a valid state
if state not in valid_state_transitions:
raise SimulationException(self, msg = 'Invalid state', state = state)
if state not in valid_state_transitions[self._state]:
if not force and state not in valid_state_transitions[self._state]:
raise SimulationException(self, msg = 'Invalid state transtion', current = self._state, next = state)
self._state = state
......@@ -195,44 +85,32 @@ class Simulator(object):
self.publish_state()
# Actions
def ping(self, message):
self.publish_state()
def start(self, message):
self.started = time.time()
self.simuuid = uuid.uuid4();
self.simuuid = uuid.uuid4()
if 'parameters' in message.payload:
self.params = message.payload['parameters']
if 'model' in message.payload:
self.model = message.payload['model']
if 'results' in message.payload:
self.results = message.payload['results']
self.workdir = "/var/villas/controller/simulators/" + \
str(self.uuid) + "/simulation/" + str(self.simuuid);
self.logdir = self.workdir + "/Logs/"
self.logger.info("Target working directory: %s" % self.workdir)
self.workdir = '/var/villas/controller/simulators/' + \
str(self.uuid) + '/simulation/' + str(self.simuuid)
self.logdir = self.workdir + '/Logs/'
self.logger.info('Target working directory: %s' % self.workdir)
try:
os.makedirs(self.logdir)
os.chdir(self.logdir)
except Exception as e:
raise SimulationException(self, 'Failed to create and change to working directory: %s ( %s )' % (self.logdir, e))
def stop(self, message):
pass
def pause(self, message):
pass
def resume(self, message):
pass
def shutdown(self, message):
pass
def reset(self, message):
self.started = time.time()
def pycurl_upload(self, filename):
def _pycurl_upload(self, filename):
try:
c = pycurl.Curl()
url = self.results['url']
......@@ -248,65 +126,63 @@ class Simulator(object):
except Exception as e:
self.logger.error('Curl failed: %s' % str(e))
def upload_results(self):
def _pycurl_download(self, url):
try:
filename = self.workdir + '/results.zip'
with zipfile.ZipFile(filename, 'w') as results_zip:
for sub in os.scandir(self.logdir):
results_zip.write(sub);
results_zip.close();
buffer = io.BytesIO()
c = pycurl.Curl()
c.setopt(c.URL, url)
c.setopt(c.WRITEDATA, buffer)
c.perform()
c.close()
except Exception as e:
self.logger.error('Zip failed: %s' % str(e))
fp = tempfile.NamedTemporaryFile(delete = False, suffix = '.xml')
fp.write(buffer.getvalue())
fp.close()
if 'url' in self.results:
self.pycurl_upload(filename)
def writeBufferToTemporaryFile(self, buf):
if buf != None:
try:
fp = tempfile.NamedTemporaryFile(delete=False, suffix=".xml")
fp.write(buf.getvalue())
fp.close()
return fp.name
except IOError as e:
self.logger.error('Failed to process url: ' + url + ' in temporary file: ' + fp.name + str(e))
return None
def unzipFile(self, filename):
except pycurl.error as e:
self.logger.error('Failed to load url: ' + url + ' error: ' + str(e))
return None
except IOError as e:
self.logger.error('Failed to process url: ' + url + ' in temporary file: ' + fp.name + str(e))
return None
finally:
return fp.name
def _zip_files(self, folder):
pass
def _unzip_files(self, filename):
if filename is not None:
if zipfile.is_zipfile(filename):
with zipfile.ZipFile(filename,"r") as zip_ref:
with zipfile.ZipFile(filename, 'r') as zip_ref:
zipdir = tempfile.mkdtemp()
zip_ref.extractall(zipdir)
return zipdir
else:
return filename
def check_download(self, message):
self.logger.info(self.model)
if self.model:
if 'url' in self.model:
buf = self.downloadURL(self.model['url'])
filename = self.writeBufferToTemporaryFile(buf)
return self.unzipFile(filename)
else:
self.logger.info("No url in message.properties['application_headers']:")
self.logger.info(message.properties['application_headers'])
def downloadURL(self, url):
def upload_results(self):
try:
buffer = io.BytesIO()
c = pycurl.Curl()
c.setopt(c.URL, url)
c.setopt(c.WRITEDATA, buffer)
c.perform()
c.close()
except pycurl.error as e:
self.logger.error('Failed to load url: ' + url + " error: " + str(e))
return None
filename = self.workdir + '/results.zip'
with zipfile.ZipFile(filename, 'w') as results_zip:
for sub in os.scandir(self.logdir):
results_zip.write(sub)
results_zip.close()
except Exception as e:
self.logger.error('Zip failed: %s' % str(e))
return buffer
if 'url' in self.results:
self._pycurl_upload(filename)
else:
self.logger.info('No URL provided for result upload. Skipping upload.')
def download_model(self):
if self.model:
if 'url' in self.model:
filename = self._pycurl_download(self.model['url'])
def __str__(self):
return "%sSimulator <%s: %s>" % (self.type, self.name, self.uuid)
return self._unzip_files(filename)
else:
self.logger.info('No URL provided for model download. Skipping download.')
......@@ -40,13 +40,13 @@ class DPsimSimulator(simulator.Simulator):
if fp != None:
try:
self.sim = dpsim.load_cim(fp.name)
self.logger.info(simulation)
self.logger.info(self.sim)
os.unlink(fp.name)
except IOError:
self.logger.error('Failed to process url: ' + url + ' in temporary file: ' + fp.name)
def start(self, message):
fp = self.check_download(message)
fp = self.download_model(message)
if (fp):
self.load_cim(fp)
......
......@@ -4,9 +4,18 @@ from .. import simulator
class DummySimulator(simulator.Simulator):
def __init__(self, **args):
super().__init__(**args)
self.timer = None
def __del__(self):
if self.timer:
self.timer.cancel()
def _schedule_state_transition(self, state, time = 1.0):
t = threading.Timer(time, self.change_state, args=[state])
t.start()
self.timer = threading.Timer(time, self.change_state, args=[state])
self.timer.start()
def start(self, message):
self._schedule_state_transition('running')
......
......@@ -7,16 +7,25 @@ import subprocess
import signal
import uuid
from ..exceptions import SimulationException
from ...exceptions import SimulationException
from .. import simulator
class GenericSimulator(simulator.Simulator):
def __init__(self, **args):
super().__init__(**args)
self.child = None
self.return_code = None
self.timer = None
self.thread = None
super().__init__(**args)
def __del__(self):
if self.timer:
self.timer.cancel()
if self.thread:
self
@property
def state(self):
......@@ -26,9 +35,6 @@ class GenericSimulator(simulator.Simulator):
return state
def upload_results(self):
super().upload_results()
def start(self, message):
super().start(message)
self.logger.info("Working directory: %s" % os.getcwd())
......@@ -41,8 +47,8 @@ class GenericSimulator(simulator.Simulator):
try:
params = message.payload['parameters']
thread = threading.Thread(target = GenericSimulator.run, args = (self, params, path))
thread.start()
self.thread = threading.Thread(target = GenericSimulator.run, args = (self, params, path))
self.thread.start()
except Exception as e:
raise SimulationException(self, msg = 'Failed to start child process: %s' % e)
......@@ -51,8 +57,8 @@ class GenericSimulator(simulator.Simulator):
self.change_state('error', msg = 'Failed to transition to state "%s"!' % state)
def check_state_deferred(self, state, timeout = 5):
t = threading.Timer(timeout, self.check_state, args = [state])
t.start()
self.timer = threading.Timer(timeout, self.check_state, args = [state])
self.timer.start()
def run(self, params):
try:
......