Commit 20fee08f authored by Jan Dinkelbach's avatar Jan Dinkelbach

Merge branch 'validation'

parents b8de0fa6 224a02d7
import re
import os
import sys
sys.path.append(os.path.normpath(os.getcwd() + "/data-processing/dataprocessing"))
from validationtools import *
from readtools import *
#from ModelicaModel import ModelicaModel
print("Test Start")
# We need to extract all the result files from git now
for files in os.listdir(
os.path.abspath("reference-results/Neplan/BasicGrids")):
# Assert the result, model result path read from cmd line
validate_modelica_res(os.path.splitext(files)[0],
os.path.abspath("reference-results/Modelica/BasicGrids/" +
os.path.splitext(files)[0] + ".mat"),
os.path.abspath("reference-results/Neplan/BasicGrids/" +
os.path.splitext(files)[0] + ".rlf"))
print("Test End")
#!/usr/bin/python
# -*- coding: UTF-8 -*-
import re
from dataprocessing.readtools import *
file = r"C:\Users\admin\Desktop\Load_read\Load_flow_WCSS.rlf"
# Example 1: Read in all variable
print('************************ Test for read in all variable start ****************')
result_ALL = read_timeseries_NEPLAN_loadflow(file)
for i in range(len(result_ALL)):
print('%s is %s' % (result_ALL[i].name, result_ALL[i].values)) # result as list of TimeSeries
print('************************ Test for read in all variable end ****************')
print('\n')
# Example 2: Read in specific variable
print('************************ Test for read in specific variable start ****************')
print('************************ Read in specific Voltage ****************')
result_U = read_timeseries_NEPLAN_loadflow(file, 'FOUR.U')
for i in range(len(result_U)):
print('%s is %s' % (result_U[i].name, result_U[i].values))
print('************************ Read in specific Voltage Angel ****************')
result_ANGELU = read_timeseries_NEPLAN_loadflow(file, 'FOUR.ANGELU')
for i in range(len(result_ANGELU)):
print('%s is %s' % (result_ANGELU[i].name, result_ANGELU[i].values))
print('************************ Read in specific Current ****************')
result_I = read_timeseries_NEPLAN_loadflow(file, 'LINE89.I')
for i in range(len(result_I)):
print('%s is %s' % (result_I[i].name, result_I[i].values))
print('************************ Read in specific Current Angel ****************')
result_ANGELI = read_timeseries_NEPLAN_loadflow(file, 'LINE89.ANGELI')
for i in range(len(result_ANGELI)):
print('%s is %s' % (result_ANGELI[i].name, result_ANGELI[i].values))
print('************************ Test for read in specific variable end ****************')
print('\n')
# Example 3: Read in using regular expression
print('************************ Test for read in using Regular Expression start ****************')
print('************************ Read in Current using Regular Expression ****************')
result_I_REG = read_timeseries_NEPLAN_loadflow(file, '^.*\.I$', True)
for i in range(len(result_I_REG)):
print('%s is %s' % (result_I_REG[i].name, result_I_REG[i].values))
print('************************ Read in Current Angel using Regular Expression ****************')
result_ANGERLI_REG = read_timeseries_NEPLAN_loadflow(file, '^.*\.ANGELI$', True)
for i in range(len(result_ANGERLI_REG)):
print('%s is %s' % (result_ANGERLI_REG[i].name, result_ANGERLI_REG[i].values))
print('************************ Read in Voltage using Regular Expression ****************')
result_U_REG = read_timeseries_NEPLAN_loadflow(file, '^.*\.U$', True)
for i in range(len(result_U_REG)):
print('%s is %s' % (result_U_REG[i].name, result_U_REG[i].values))
print('************************ Read in Voltage Angel using Regular Expression ****************')
result_ANGELU_REG = read_timeseries_NEPLAN_loadflow(file, '^.*\.ANGELU$', True)
for i in range(len(result_ANGELU_REG)):
print('%s is %s' % (result_ANGELU_REG[i].name, result_ANGELU_REG[i].values))
print('************************ Test for read in using Regular Expression end ****************')
\ No newline at end of file
......@@ -12,6 +12,7 @@ def read_timeseries_Modelica(filename, timeseries_names=None, is_regex=False):
timeseries = []
for name in sim.names():
timeseries.append(TimeSeries(name, sim(name).times(), sim(name).values()))
timeseries_names = sim.names()
elif is_regex is True:
# Read in variables which match with regex
timeseries = []
......@@ -136,3 +137,181 @@ def read_dpsim_log(log_path):
log_sections[section].append(line_pos)
return log_lines, log_sections
def read_timeseries_PLECS(filename, timeseries_names=None):
pd_df = pd.read_csv(filename)
timeseries_list = []
if timeseries_names is None:
# No trajectory names specified, thus read in all
timeseries_names = list(pd_df.columns.values)
timeseries_names.remove('Time')
for name in timeseries_names:
timeseries_list.append(TimeSeries(name, pd_df['Time'].values, pd_df[name].values))
else:
# Read in specified time series
for name in timeseries_names:
timeseries_list.append(TimeSeries(name, pd_df['Time'].values, pd_df[name].values))
print('PLECS results column names: ' + str(timeseries_names))
print('PLECS results number: ' + str(len(timeseries_list)))
return timeseries_list
def read_timeseries_NEPLAN_loadflow(file_name, timeseries_names=None, is_regex=False):
"""
Read in NEPLAN loadflow result from result file, the result is in angle notation, amplitude and angle are stored
separately
To keep consistent with the names of voltage in most cases, the name of voltage variables are changed into '.V*'
instead of '.U*' as in the result file
:param file_name: name of the mat file for the loadflow result from neplan
:param timeseries_names: column name to be read
:param is_regex: flag for using regular expression
:return: list of Timeseries objects
"""
str_tmp = open(file_name, "r") # Read in files
low = 0 # flag for the start of a new data in str_cmp
high = 0 # flag for the end of this new data in str_cmp
flag = True # To judge if this is the first line of the file, which will be the names for the data type
# Read in data from result file of neplan
seq = [] # list for data type names
value = [] # list for data
namelist = ['U', 'ANGLEU', 'P', 'Q', 'I', 'ANGLEI'] # Suffix of the data name
timeseries = []
line_del = [] # a list for the value to be deleted
isfloat = re.compile(r'^[-+]?[0-9]+\.[0-9]+$') # regular expression to find float values
# Transfer ',' in the floats in result file to '.'
for line in str_tmp.readlines(): # Check the data to find out floats with ','
line = line.replace(",", ".")
high -= high
low -= low
del value[:]
# read in different data and start processing
for letter in line:
if letter == " " or letter == "\n": # different data(separated by ' ') or end(/n)
if low is not high: # if low is equal to high, no data read in
if flag: # first line of the file, list for data-type name
seq.append(line[low:high])
else: # not first line of the file,list for data
if isfloat.match(line[low:high]):
value.append(float(line[low:high]))
else:
value.append(line[low:high])
else: # no data for this datatype
value.append(r'#') # No value, set as #
low = high + 1 # refresh low flag
high += 1
"""
A typical line current in neplan has two parts from both end, but we doesn't have to calculate them
with the assumption that the topology of the gird should be correct with which we can validate the
current by comparing the voltage of the nodes connected to the ends of the line
"""
if flag is not True: # flag is true when it's the first line
if value[3] is not '#':
for m in range(6):
timeseries.append(TimeSeries(value[3] + '.' + namelist[m],
np.array([0., 1.]), np.array([value[m + 6], value[m + 6]])))
else:
for m in range(2):
timeseries.append(TimeSeries(value[1] + '.' + namelist[m],
np.array([0., 1.]), np.array([value[m + 6], value[m + 6]])))
flag = False
str_tmp.close()
# Read in variables which match with regex
if is_regex is True:
p = re.compile(timeseries_names)
length = len(timeseries)
for rule_check in range(length):
if p.search(timeseries[rule_check].name):
pass
else:
line_del.append(rule_check)
# Read in specified time series
elif timeseries_names is not None:
length = len(timeseries)
for rule_check in range(length):
if timeseries_names == timeseries[rule_check].name:
pass
else:
line_del.append(rule_check)
# delete those values that are not needed.
line_del = set(line_del)
line_del = sorted(line_del)
for num_to_del in range(len(line_del)):
del timeseries[line_del[len(line_del) - num_to_del - 1]]
return timeseries
def read_timeseries_simulink_loadflow(file_name, timeseries_names=None, is_regex=False):
"""
Read in simulink load-flow result from result file(.rep), the result is in angle notation, amplitude and angle are stored
separately.
A suffix is used to tag different data for a component:
.Arms/.IDegree for current/current angle,
.Vrms/.VDegree for voltage/voltage angle.
:param file_name:path of the .rep file for the loadflow result from simulink
:param timeseries_names: specific values to be read
:param is_regex: flag for using regular expression
:return: list of Timeseries objects
"""
str_tmp = open(file_name, 'r', encoding='latin-1') # Read in files, using latin-1 to decode /xb0
# Read in data from result file of neplan
name = [] # list for data type names
value = [] # list for data
timeseries = []
line_del = [] # a list for the value to be deleted
for line in str_tmp.readlines():
line = line.replace("°", "")
del value[:]
del name[:]
# read in different data and start processing
if len(line) > 37:
if line[31:35] == '--->':
if line[13:17] == 'Arms':
name = [line[37:len(line)].rstrip() + '.Arms', line[37:len(line)].rstrip() + '.IDegree']
elif line[13:17] == 'Vrms':
name = [line[37:len(line)].rstrip() + '.Vrms', line[37:len(line)].rstrip() + '.VDegree']
value = [float(line[0:13]), float(line[18:31])]
timeseries.append(TimeSeries(name[0],
np.array([0., 1.]), np.array([value[0], value[0]])))
timeseries.append(TimeSeries(name[1],
np.array([0., 1.]), np.array([value[1], value[1]])))
# Read in variables which match with regex
if is_regex is True:
p = re.compile(timeseries_names)
length = len(timeseries)
for rule_check in range(length):
if p.search(timeseries[rule_check].name):
pass
else:
line_del.append(rule_check)
# Read in specified time series
elif timeseries_names is not None:
length = len(timeseries)
for rule_check in range(length):
if timeseries_names == timeseries[rule_check].name:
pass
else:
line_del.append(rule_check)
# delete those values that are not needed.
line_del = set(line_del)
line_del = sorted(line_del)
for num_to_del in range(len(line_del)):
del timeseries[line_del[len(line_del) - num_to_del - 1]]
return timeseries
#!/usr/bin/python
# -*- coding: UTF-8 -*-
import os
from readtools import *
"""
The validationtools are used to validate the simulate results from the model.
A typical process to validate a model contains four parts
- Building & Running the module to get results
- Reading in the results
- Mapping the results with the reference results
- Asserting the module
The first step is done by Py4Mod package, the second by readtool.
The validationtool focuses on the last two steps: the conversion function converts the reference-results
timeseries into the modelica timeseries (mapping the names and units), the comparision function compares two
timeseries, and the assert function gives an assertion to the result comparison.
At last, a top level validation function is introduced to organize the whole job.
"""
def convert_neplan_to_modelica_timeseries(neplan_timeseries):
"""
Mapping the variable names between modelica and neplan
- Voltage: change *.U and *.ANGLEU to *.V and *.Vangle
- Current: remove unnecessary current variables
:param neplan_timeseries: result of neplan in timeseries
:return: a mapped neplan_timeseries
"""
line_del = []
# remove all the line current
# Find current of the same component, which means the current don't need to be validated
for check in range(len(neplan_timeseries)):
if neplan_timeseries[check].values[0] == '#':
line_del.append(check)
if '.P' in neplan_timeseries[check].name:
line_del.append(check)
if '.Q' in neplan_timeseries[check].name:
line_del.append(check)
for i in range(check + 1, len(neplan_timeseries)):
if neplan_timeseries[check].name == neplan_timeseries[i].name:
line_del.append(check) # delete list of the unnecessary data
line_del.append(i)
line_del = sorted(set(line_del))
for num_to_del in range(len(line_del)):
del neplan_timeseries[line_del[len(line_del) - num_to_del - 1]]
# Change the unit of variables to keep consistent with those in modelica
for i in range(len(neplan_timeseries)):
if 'ANGLE' in neplan_timeseries[i].name:
neplan_timeseries[i].values = neplan_timeseries[i].values / 180 * cmath.pi # unification of the unit
elif '.U' in neplan_timeseries[i].name or '.I' in neplan_timeseries[i].name:
neplan_timeseries[i].values = neplan_timeseries[i].values * 1000
# Change the name of variables to keep consistent with those in modelica
for i in range(len(neplan_timeseries)):
neplan_timeseries[i].name = neplan_timeseries[i].name.replace(' ', '')
neplan_timeseries[i].name = neplan_timeseries[i].name.replace('.ANGLEU', '.Vangle')
neplan_timeseries[i].name = neplan_timeseries[i].name.replace('.U', '.Vpp')
neplan_timeseries[i].name = neplan_timeseries[i].name.replace('.ANGLEI', '.Iangle')
return neplan_timeseries
def convert_simulink_to_modelica_timeseries(simseri):
"""
Convert the steady-state results timeseries from simulink to modelica timeseries
:param simseri: simulate timeseries, generated by the result file from simulink
:return: a result timeseries
"""
res = []
for check in range(len(simseri)):
if 'U AB:' in simseri[check].name:
simseri[check].name = simseri[check].name.replace('U AB:', '')
simseri[check].name = simseri[check].name.replace('Vrms', 'Vpp')
simseri[check].name = simseri[check].name.replace('VDegree', 'Vangle')
simseri[check].name = simseri[check].name.replace(' ', '')
simseri[check].name = simseri[check].name.replace('_', '')
if 'Vangle' in simseri[check].name:
simseri[check].values = (simseri[check].values - 30)/180 * cmath.pi
res.append(simseri[check])
return res
def compare_timeseries(ts1, ts2):
"""
Compare the result from two timeseries.
:param ts1: timeseries
:param ts2: timeseries
:return: an error dic
"""
if len(ts1) > len(ts2):
tmp = ts2
ts2 = ts1
ts1 = tmp
for i in range(len(ts1)):
ts1[i].name = ts1[i].name.upper()
for i in range(len(ts2)):
ts2[i].name = ts2[i].name.upper()
timeseries_names = [] # list for names of components
timeseries_error = [] # list for error
len_ts1 = len(ts1)
len_limit = len(ts2)
# Match the components in result files, and compare them
for i in range(len_ts1):
flag_not_found = False
for j in range(len_limit):
if ts1[i].name == ts2[j].name: # Find the same variable
timeseries_names.append(ts1[i].name)
if ts1[i].values[0] == 0:
timeseries_error.append(TimeSeries.rmse(ts2[j], ts1[i])) # is it good to do so?
else:
timeseries_error.append(TimeSeries.rmse(ts2[j], ts1[i])/ts1[i].values[0])
print(ts1[i].name)
print(timeseries_error[len(timeseries_error) - 1])
flag_not_found = True
if flag_not_found is False:
# No such variable in Modelica model, set the error to -1
timeseries_names.append(ts1[i].name)
timeseries_error.append(-1)
return dict(zip(timeseries_names, timeseries_error))
def assert_modelia_results(net_name, error, threshold):
"""
assert the result data of a net.
:param net_name: name of the network
:param modelica_res: timeseries of modelica result
:param simulink_res: timeseries of reference result
:return: outputs to command line which are the results of the assert
"""
fail_list = [] # List for all the failed test
# the limitations are set to 0.5
for name in error.keys():
if abs(error[name]) > threshold:
fail_list.append(name)
else:
print("Test on %s Passed" % name)
# fail_list is 0, which means all the tests are passed
if len(fail_list) is 0:
print("\033[1;36;40mModel %s Passed\033[0m" % net_name)
else:
for name in fail_list:
print("\033[1;31;40mTest on %s of %s Failed\033[0m" % (name, net_name))
raise ValueError('Test on %s is not passed!' % net_name)
def validate_modelica_res(net_name, modelica_res_path, reference_res_path, threshold=0.5):
"""
Top level function for the validation of modelica, calls all the function needed to execute the validation.
:param modelica_res_path: the path of the modelica result file, whose suffix should be .mat
:param reference_res_path: the path of the reference result file, whose suffix should be .rep(simulink)/.rlf(neplan)
:param threshold: the threshold of the assertion, a default value of 0.5 is introduced.
:return: outputs to command line which are the results of the validation.
"""
res_mod = read_timeseries_Modelica (modelica_res_path)
if os.path.splitext(reference_res_path)[1] == '.rep':
res_ref = convert_simulink_to_modelica_timeseries(read_timeseries_simulink_loadflow(reference_res_path))
elif os.path.splitext(reference_res_path)[1] == '.rlf':
res_ref = convert_neplan_to_modelica_timeseries(read_timeseries_NEPLAN_loadflow(reference_res_path))
res_err = compare_timeseries(res_ref, res_mod)
assert_modelia_results(net_name, res_err, threshold)
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment