Aufgrund von Umarbeiten des s3 Storage wird es in GitLab, in nächster Zeit, mögliche Performance-Einbußen geben. Näheres dazu unter: https://maintenance.itc.rwth-aachen.de/ticket/status/messages/43/show_ticket/6670

Commit cb8c438b authored by Bichen Li's avatar Bichen Li

- Update the dockerfile, add the pull of py4mod package

parent 836bc55b
# ignore results
*.csv
*.mat
# ignore symbolic links
*.egg-info
*.eggs
# ignore compiled python files
*.pyc
# ignore logging files
*.log
# ignore generated dymola files
buildlog.txt
dsfinal.txt
dsin.txt
dslog.txt
dsmodel*
dymosim*
# ignore matlab dumping file
*.mdmp
# ignore spyder project
.spyderproject
.spyproject
# ignore pycharm files
.idea
__pycache__
# ignore jupyter notebook files
.ipynb_checkpoints
# ignore results
*.csv
*.mat
# ignore symbolic links
*.egg-info
*.eggs
# ignore compiled python files
*.pyc
# ignore logging files
*.log
# ignore generated dymola files
buildlog.txt
dsfinal.txt
dsin.txt
dslog.txt
dsmodel*
dymosim*
# ignore matlab dumping file
*.mdmp
# ignore spyder project
.spyderproject
.spyproject
# ignore pycharm files
.idea
__pycache__
# ignore jupyter notebook files
.ipynb_checkpoints
Test.py
\ No newline at end of file
# Dataprocessing toolkit for RWTH ACS simulators
## Copyright
2017, Institute for Automation of Complex Power Systems, EONERC, RWTH Aachen University
## License
This project is released under the terms of the [GPL version 3](COPYING.md).
```
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
```
For other licensing options please consult [Prof. Antonello Monti](mailto:amonti@eonerc.rwth-aachen.de).
## Contact
[![EONERC ACS Logo](doc/eonerc_logo.png)](http://www.acs.eonerc.rwth-aachen.de)
- Markus Mirz <mmirz@eonerc.rwth-aachen.de>
- Jan Dinkelbach <JDinkelbach@eonerc.rwth-aachen.de>
- Steffen Vogel <stvogel@eonerc.rwth-aachen.de>
[Institute for Automation of Complex Power Systems (ACS)](http://www.acs.eonerc.rwth-aachen.de)
[EON Energy Research Center (EONERC)](http://www.eonerc.rwth-aachen.de)
[RWTH University Aachen, Germany](http://www.rwth-aachen.de)
# Dataprocessing toolkit for RWTH ACS simulators
## Copyright
2017, Institute for Automation of Complex Power Systems, EONERC, RWTH Aachen University
## License
This project is released under the terms of the [GPL version 3](COPYING.md).
```
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
```
For other licensing options please consult [Prof. Antonello Monti](mailto:amonti@eonerc.rwth-aachen.de).
## Contact
[![EONERC ACS Logo](doc/eonerc_logo.png)](http://www.acs.eonerc.rwth-aachen.de)
- Markus Mirz <mmirz@eonerc.rwth-aachen.de>
- Jan Dinkelbach <JDinkelbach@eonerc.rwth-aachen.de>
- Steffen Vogel <stvogel@eonerc.rwth-aachen.de>
[Institute for Automation of Complex Power Systems (ACS)](http://www.acs.eonerc.rwth-aachen.de)
[EON Energy Research Center (EONERC)](http://www.eonerc.rwth-aachen.de)
[RWTH University Aachen, Germany](http://www.rwth-aachen.de)
from dataprocessing.readtools import *
from dataprocessing.timeseries import *
def get_node_voltage_phasors(dpsim_timeseries_list):
"""Calculate voltage phasors of all nodes
:param dpsim_timeseries_list: timeseries list retrieved from dpsim results
:return:
"""
voltage_phasor_list = {}
for ts in dpsim_timeseries_list:
ts_abs = ts.abs(ts.name + '_abs')
ts_phase = ts.phase(ts.name + '_phase')
ts_phasor = {}
ts_phasor['abs'] = ts_abs
ts_phasor['phase'] = ts_phase
voltage_phasor_list[ts.name] = ts_phasor
return voltage_phasor_list
def get_node_emt_voltages(timeseries_list, freq):
"""Calculate voltage phasors of all nodes
:param timeseries_list: timeseries list retrieved from dpsim results
:return:
"""
voltages_list = {}
for ts in timeseries_list:
ts_emt = ts.dynphasor_shift_to_emt(ts.name, freq)
voltages_list[ts.name] = ts_emt
return voltages_list
from dataprocessing.readtools import *
from dataprocessing.timeseries import *
def get_node_voltage_phasors(dpsim_timeseries_list):
"""Calculate voltage phasors of all nodes
:param dpsim_timeseries_list: timeseries list retrieved from dpsim results
:return:
"""
voltage_phasor_list = {}
for ts in dpsim_timeseries_list:
ts_abs = ts.abs(ts.name + '_abs')
ts_phase = ts.phase(ts.name + '_phase')
ts_phasor = {}
ts_phasor['abs'] = ts_abs
ts_phasor['phase'] = ts_phase
voltage_phasor_list[ts.name] = ts_phasor
return voltage_phasor_list
def get_node_emt_voltages(timeseries_list, freq):
"""Calculate voltage phasors of all nodes
:param timeseries_list: timeseries list retrieved from dpsim results
:return:
"""
voltages_list = {}
for ts in timeseries_list:
ts_emt = ts.dynphasor_shift_to_emt(ts.name, freq)
voltages_list[ts.name] = ts_emt
return voltages_list
import matplotlib.pyplot as plt
import numpy as np
from .timeseries import *
def plot_timeseries(figure_id, timeseries, plt_linestyle='-', plt_linewidth=2, plt_color=None, plt_legend_loc='lower right'):
"""
This function plots either a single timeseries or several timeseries in the figure defined by figure_id.
Several timeseries (handed over in a list) are plotted in several subplots.
In order to plot several timeseries in one plot, the function is to be called several times (hold is activated).
"""
plt.figure(figure_id)
if not isinstance(timeseries, list):
if plt_color:
plt.plot(timeseries.time, timeseries.values, linestyle=plt_linestyle, label=timeseries.label, linewidth=plt_linewidth, color=plt_color)
else:
plt.plot(timeseries.time, timeseries.values, linestyle=plt_linestyle, label=timeseries.label, linewidth=plt_linewidth)
plt.gca().autoscale(axis='x', tight=True)
plt.legend(loc=plt_legend_loc)
else:
for ts in timeseries:
plt.subplot(len(timeseries), 1, timeseries.index(ts) + 1)
if plt_color:
plt.plot(ts.time, ts.values, linestyle=plt_linestyle, label=ts.label, linewidth=plt_linewidth, color=plt_color)
else:
plt.plot(ts.time, ts.values, linestyle=plt_linestyle, label=ts.label, linewidth=plt_linewidth)
plt.gca().autoscale(axis='x', tight=True)
plt.legend()
def set_timeseries_labels(timeseries, timeseries_labels):
"""
Sets label attribute of timeseries, later used in plotting functions.
Suitable for single timeseries as well as for several timeseries (handed over in a list).
"""
if not isinstance(timeseries, list):
timeseries.label = timeseries_labels
else:
for ts in timeseries:
ts.label = timeseries_labels[timeseries.index(ts)]
import matplotlib.pyplot as plt
import numpy as np
from .timeseries import *
def plot_timeseries(figure_id, timeseries, plt_linestyle='-', plt_linewidth=2, plt_color=None, plt_legend_loc='lower right'):
"""
This function plots either a single timeseries or several timeseries in the figure defined by figure_id.
Several timeseries (handed over in a list) are plotted in several subplots.
In order to plot several timeseries in one plot, the function is to be called several times (hold is activated).
"""
plt.figure(figure_id)
if not isinstance(timeseries, list):
if plt_color:
plt.plot(timeseries.time, timeseries.values, linestyle=plt_linestyle, label=timeseries.label, linewidth=plt_linewidth, color=plt_color)
else:
plt.plot(timeseries.time, timeseries.values, linestyle=plt_linestyle, label=timeseries.label, linewidth=plt_linewidth)
plt.gca().autoscale(axis='x', tight=True)
plt.legend(loc=plt_legend_loc)
else:
for ts in timeseries:
plt.subplot(len(timeseries), 1, timeseries.index(ts) + 1)
if plt_color:
plt.plot(ts.time, ts.values, linestyle=plt_linestyle, label=ts.label, linewidth=plt_linewidth, color=plt_color)
else:
plt.plot(ts.time, ts.values, linestyle=plt_linestyle, label=ts.label, linewidth=plt_linewidth)
plt.gca().autoscale(axis='x', tight=True)
plt.legend()
def set_timeseries_labels(timeseries, timeseries_labels):
"""
Sets label attribute of timeseries, later used in plotting functions.
Suitable for single timeseries as well as for several timeseries (handed over in a list).
"""
if not isinstance(timeseries, list):
timeseries.label = timeseries_labels
else:
for ts in timeseries:
ts.label = timeseries_labels[timeseries.index(ts)]
This diff is collapsed.
import numpy as np
import cmath
class TimeSeries:
"""Stores data from different simulation sources.
A TimeSeries object always consists of timestamps and datapoints.
"""
def __init__(self, name, time, values, label=""):
self.time = np.array(time)
self.values = np.array(values)
self.name = name
self.label = name
def scale(self, name, factor):
"""Returns scaled timeseries.
Assumes the same time steps for both timeseries.
"""
ts_scaled = TimeSeries(name, self.time, self.values * factor)
return ts_scaled
def abs(self, name):
""" Calculate absolute value of complex time series.
"""
abs_values = []
for value in self.values:
abs_values.append(np.abs(value))
ts_abs = TimeSeries(name, self.time, abs_values)
return ts_abs
def phase(self, name):
""" Calculate absolute value of complex time series.
"""
phase_values = []
for value in self.values:
phase_values.append(np.angle(value, deg=True))
ts_abs = TimeSeries(name, self.time, phase_values)
ts_phase = TimeSeries(name, self.time, phase_values)
return ts_phase
@staticmethod
def rmse(ts1, ts2):
""" Calculate root mean square error between two time series
"""
return np.sqrt((TimeSeries.diff('diff', ts1, ts2).values ** 2).mean())
@staticmethod
def diff(name, ts1, ts2):
"""Returns difference between values of two Timeseries objects.
"""
if len(ts1.time) == len(ts2.time):
ts_diff = TimeSeries(name, ts1.time, (ts1.values - ts2.values))
else: # different timestamps, common time vector and interpolation required before substraction
time = sorted(set(list(ts1.time) + list(ts2.time)))
interp_vals_ts1 = np.interp(time, ts1.time, ts1.values)
interp_vals_ts2 = np.interp(time, ts2.time, ts2.values)
ts_diff = TimeSeries(name, time, (interp_vals_ts2 - interp_vals_ts1))
return ts_diff
def dynphasor_shift_to_emt(self, name, freq):
""" Shift dynamic phasor values to EMT by frequency freq.
Assumes the same time steps for both timeseries.
:param name: name of returned time series
:param freq: shift frequency
:return: new timeseries with shifted time domain values
"""
ts_shift = TimeSeries(name, self.time, self.values.real*np.cos(2*np.pi*freq*self.time)
- self.values.imag*np.sin(2*np.pi*freq*self.time))
return ts_shift
def interpolate_cmpl(self, name, timestep):
""" Not tested yet!
Interpolates complex timeseries with timestep
:param name:
:param timestep:
:return:
"""
interpl_time = np.arange(self.time[0], self.time[-1], timestep)
realValues = interp1d(interpl_time, self.values.real)
imagValues = interp1d(interpl_time, self.values.imag)
ts_return = TimeSeries(name, time, np.vectorize(complex)(realValues, imagValues))
return timeseries
@staticmethod
def sep_dynphasor_shift_to_emt(name, real, imag, freq):
""" Shift dynamic phasor values to EMT by frequency freq.
Assumes the same time steps for both timeseries.
:param name: name of returned time series
:param real: timeseries with real values
:param imag: timeseries with imaginary values
:param freq: shift frequency
:return: new timeseries with shifted time domain values
"""
ts_shift = TimeSeries(name, real.time,
real.values * np.cos(2 * np.pi * freq * real.time) - imag.values * np.sin(
2 * np.pi * freq * real.time))
return ts_shift
@staticmethod
def check_node_number_comp(ts_list_comp, node):
"""
Check if node number is available in complex time series.
:param ts_comp: complex time series list
:param node: node number to be checked
:return: true if node number is available, false if out of range
"""
ts_comp_length = len(ts_comp)
im_offset = int(ts_comp_length / 2)
if im_offset <= node or node < 0:
print('Complex node not available')
return false
else:
return true
@staticmethod
def check_node_number(ts_list, node):
"""
Check if node number is available in time series.
:param ts: time series list
:param node: node number to be checked
:return: true if node number is available, false if out of range
"""
ts_length = len(ts)
if ts_length <= node or node < 0:
print('Node not available')
return false
else:
return true
@staticmethod
def complex_abs(name, ts_real, ts_imag):
""" Calculate absolute value of complex variable.
Assumes the same time steps for both timeseries.
"""
ts_complex = np.vectorize(complex)(ts_real.values, ts_imag.values)
ts_abs = TimeSeries(name, ts_real.time, ts_complex.abs())
import numpy as np
import cmath
class TimeSeries:
"""Stores data from different simulation sources.
A TimeSeries object always consists of timestamps and datapoints.
"""
def __init__(self, name, time, values, label=""):
self.time = np.array(time)
self.values = np.array(values)
self.name = name
self.label = name
def scale(self, name, factor):
"""Returns scaled timeseries.
Assumes the same time steps for both timeseries.
"""
ts_scaled = TimeSeries(name, self.time, self.values * factor)
return ts_scaled
def abs(self, name):
""" Calculate absolute value of complex time series.
"""
abs_values = []
for value in self.values:
abs_values.append(np.abs(value))
ts_abs = TimeSeries(name, self.time, abs_values)
return ts_abs
def phase(self, name):
""" Calculate absolute value of complex time series.
"""
phase_values = []
for value in self.values:
phase_values.append(np.angle(value, deg=True))
ts_abs = TimeSeries(name, self.time, phase_values)
ts_phase = TimeSeries(name, self.time, phase_values)
return ts_phase
@staticmethod
def rmse(ts1, ts2):
""" Calculate root mean square error between two time series
"""
return np.sqrt((TimeSeries.diff('diff', ts1, ts2).values ** 2).mean())
@staticmethod
def diff(name, ts1, ts2):
"""Returns difference between values of two Timeseries objects.
"""
if len(ts1.time) == len(ts2.time):
ts_diff = TimeSeries(name, ts1.time, (ts1.values - ts2.values))
else: # different timestamps, common time vector and interpolation required before substraction
time = sorted(set(list(ts1.time) + list(ts2.time)))
interp_vals_ts1 = np.interp(time, ts1.time, ts1.values)
interp_vals_ts2 = np.interp(time, ts2.time, ts2.values)
ts_diff = TimeSeries(name, time, (interp_vals_ts2 - interp_vals_ts1))
return ts_diff
def dynphasor_shift_to_emt(self, name, freq):
""" Shift dynamic phasor values to EMT by frequency freq.
Assumes the same time steps for both timeseries.
:param name: name of returned time series
:param freq: shift frequency
:return: new timeseries with shifted time domain values
"""
ts_shift = TimeSeries(name, self.time, self.values.real*np.cos(2*np.pi*freq*self.time)
- self.values.imag*np.sin(2*np.pi*freq*self.time))
return ts_shift
def interpolate_cmpl(self, name, timestep):
""" Not tested yet!
Interpolates complex timeseries with timestep
:param name:
:param timestep:
:return:
"""
interpl_time = np.arange(self.time[0], self.time[-1], timestep)
realValues = interp1d(interpl_time, self.values.real)
imagValues = interp1d(interpl_time, self.values.imag)
ts_return = TimeSeries(name, time, np.vectorize(complex)(realValues, imagValues))
return timeseries
@staticmethod
def sep_dynphasor_shift_to_emt(name, real, imag, freq):
""" Shift dynamic phasor values to EMT by frequency freq.
Assumes the same time steps for both timeseries.
:param name: name of returned time series
:param real: timeseries with real values
:param imag: timeseries with imaginary values
:param freq: shift frequency
:return: new timeseries with shifted time domain values
"""
ts_shift = TimeSeries(name, real.time,
real.values * np.cos(2 * np.pi * freq * real.time) - imag.values * np.sin(
2 * np.pi * freq * real.time))
return ts_shift
@staticmethod
def check_node_number_comp(ts_list_comp, node):
"""
Check if node number is available in complex time series.
:param ts_comp: complex time series list
:param node: node number to be checked
:return: true if node number is available, false if out of range
"""
ts_comp_length = len(ts_comp)
im_offset = int(ts_comp_length / 2)
if im_offset <= node or node < 0:
print('Complex node not available')
return false
else:
return true
@staticmethod
def check_node_number(ts_list, node):
"""
Check if node number is available in time series.
:param ts: time series list
:param node: node number to be checked
:return: true if node number is available, false if out of range
"""
ts_length = len(ts)
if ts_length <= node or node < 0:
print('Node not available')
return false
else:
return true
@staticmethod
def complex_abs(name, ts_real, ts_imag):
""" Calculate absolute value of complex variable.
Assumes the same time steps for both timeseries.
"""
ts_complex = np.vectorize(complex)(ts_real.values, ts_imag.values)
ts_abs = TimeSeries(name, ts_real.time, ts_complex.abs())
return ts_abs
\ No newline at end of file
#!/usr/bin/python
# -*- coding: UTF-8 -*-
import os
from readtools import *
def convert_neplan_to_modelica_timeseries(neplan_timeseries):
"""
Mapping the variable names between modelica and neplan
- Voltage: change *.U and *.ANGLEU to *.V and *.Vangle
- Current: remove unnecessary current variables
:param neplan_timeseries: result of neplan in timeseries
:return: a mapped neplan_timeseries
"""
line_del = []
# remove all the line current
# Find current of the same component, which means the current needn't to be validated
for check in range(len(neplan_timeseries)):
if neplan_timeseries[check].values[0] == '#':
line_del.append(check)
if '.P' in neplan_timeseries[check].name:
line_del.append(check)
if '.Q' in neplan_timeseries[check].name:
line_del.append(check)
for i in range(check + 1, len(neplan_timeseries)):
if neplan_timeseries[check].name == neplan_timeseries[i].name:
line_del.append(check) # delete list of the unnecessary data
line_del.append(i)
line_del = sorted(set(line_del))
for num_to_del in range(len(line_del)):
del neplan_timeseries[line_del[len(line_del) - num_to_del - 1]]