Commit 029993e8 authored by Steffen Vogel's avatar Steffen Vogel 🎅🏼

Merge branch 'master' of git.rwth-aachen.de:acs/public/simulation/data-processing

parents fad1d0f5 f92f812c
build
dist
# ignore results
*.csv
*.mat
......
__all__ = ["readtools", "plottools", "timeseries"]
\ No newline at end of file
from dataprocessing.readtools import *
from dataprocessing.timeseries import *
from dataprocessing.plottools import *
def get_node_voltage_phasors(dpsim_timeseries_list):
"""Calculate voltage phasors of all nodes
......@@ -24,7 +25,7 @@ def get_node_emt_voltages(timeseries_list, freq):
"""
voltages_list = {}
for name, ts in timeseries_list.items():
ts_emt = ts.dynphasor_shift_to_emt(ts.name, freq)
ts_emt = ts.frequency_shift(ts.name, freq)
voltages_list[ts.name] = ts_emt
return voltages_list
......@@ -100,7 +100,7 @@ def read_timeseries_simulink(filename, timeseries_names=None):
return timeseries_list
def read_timeseries_dpsim(filename, timeseries_names=None):
def read_timeseries_dpsim(filename, timeseries_names=None, print_status=True):
"""Reads complex time series data from DPsim log file. Real and
imaginary part are stored in one complex variable.
:param filename: name of the csv file that has the data
......@@ -122,38 +122,76 @@ def read_timeseries_dpsim(filename, timeseries_names=None):
timestamps = pd_df.iloc[:, 0]
# Find real and complex variable names
real_string = '.real'
imaginary_string = '.imag'
suffixes = [ ('_re', '_im'), ('.real', '.imag') ]
for column in column_names:
if real_string in column:
tmp = column.replace(real_string, '')
cmpl_result_columns.append(tmp)
#print("Found complex variable: " + tmp)
elif not imaginary_string in column:
real_result_columns.append(column)
#print("Found real variable: " + column)
is_complex = False
for suffix in suffixes:
real_suffix = suffix[0]
imag_suffix = suffix[1]
if column.endswith(imag_suffix):
is_complex = True
break # Ignore imag columns
if column.endswith(real_suffix):
is_complex = True
column_base = column.replace(real_suffix, '')
if column_base + imag_suffix not in column_names:
continue
cmpl_result_columns.append(column_base)
timeseries_list[column_base] = TimeSeries(column_base, timestamps,
np.vectorize(complex)(
pd_df[column_base + real_suffix],
pd_df[column_base + imag_suffix]
)
)
break
for column in real_result_columns:
timeseries_list[column] = TimeSeries(column, timestamps, pd_df[column])
if is_complex:
continue
for column in cmpl_result_columns:
try:
timeseries_list[column] = TimeSeries(column, timestamps,
np.vectorize(complex)(
pd_df[column + real_string],
pd_df[column + imaginary_string]
)
)
except:
pass
real_result_columns.append(column)
timeseries_list[column] = TimeSeries(column, timestamps, pd_df[column])
else:
# Read in specified time series
print('cannot read specified columns yet')
print('DPsim results real column names: ' + str(real_result_columns))
print('DPsim results complex column names: ' + str(cmpl_result_columns))
print('DPsim results variable number: ' + str(len(timeseries_list)))
print('DPsim results length: ' + str(len(timestamps)))
if print_status :
print('DPsim results real column names: ' + str(real_result_columns))
print('DPsim results complex column names: ' + str(cmpl_result_columns))
print('DPsim results variable number: ' + str(len(timeseries_list)))
print('DPsim results length: ' + str(len(timestamps)))
return timeseries_list
def read_dpsim_log(log_path):
log_file = open(log_path, "r")
log_lines = [line for line in log_file]
log_file.close()
# Sectionize
log_sections = {'init':[], 'none':[], 'sysmat_stamp':[], 'sysmat_final':[], 'sourcevec_stamp':[], 'sourcevec_final':[], 'ludecomp':[]}
section = 'init'
for line_pos in range(len(log_lines)):
if re.search('DEBUG: Stamping .+ into system matrix:', log_lines[line_pos]):
section = 'sysmat_stamp'
elif re.search('INFO: System matrix:', log_lines[line_pos]):
section = 'sysmat_final'
elif re.search('DEBUG: Stamping .+ into source vector:', log_lines[line_pos]):
section = 'sourcevec_stamp'
elif re.search('INFO: Right side vector:', log_lines[line_pos]):
section = 'sourcevec_final'
elif re.search('INFO: LU decomposition:', log_lines[line_pos]):
section = 'ludecomp'
elif re.search('INFO: Number of network simulation nodes:', log_lines[line_pos]):
section = 'none'
elif re.search('INFO: Added .+ to simulation.', log_lines[line_pos]):
section = 'none'
elif re.search('INFO: Initial switch status:', log_lines[line_pos]):
section = 'none'
log_sections[section].append(line_pos)
return log_lines, log_sections
......@@ -37,6 +37,31 @@ class TimeSeries:
ts_phase = TimeSeries(name, self.time, phase_values)
return ts_phase
def phasor(self, name):
"""Calculate phasor of complex time series and return dict with abs and phase.
"""
ts_abs = ts.abs(ts.name + '_abs')
ts_phase = ts.phase(ts.name + '_phase')
ts_phasor = {}
ts_phasor['abs'] = ts_abs
ts_phasor['phase'] = ts_phase
return ts_phasor
@staticmethod
def frequency_shift_list(timeseries_list, freq):
"""Calculate shifted frequency results of all time series
:param timeseries_list: timeseries list retrieved from dpsim results
:param freq: frequency by which the timeseries should be shifted
:return: list of shifted time series
"""
result_list = {}
for name, ts in timeseries_list.items():
ts_emt = ts.frequency_shift(ts.name, freq)
result_list[ts.name] = ts_emt
return result_list
@staticmethod
def rmse(ts1, ts2):
""" Calculate root mean square error between two time series
......@@ -69,7 +94,7 @@ class TimeSeries:
ts_diff = TimeSeries(name, time, (interp_vals_ts2 - interp_vals_ts1))
return ts_diff
def dynphasor_shift_to_emt(self, name, freq):
def frequency_shift(self, name, freq):
""" Shift dynamic phasor values to EMT by frequency freq.
Assumes the same time steps for both timeseries.
:param name: name of returned time series
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment