Commit 6a8eb75a authored by Jan Dinkelbach's avatar Jan Dinkelbach
Browse files

merging dev-bli denying file removals

parents a3ba9ed8 1ccf9738
......@@ -4,6 +4,7 @@
# ignore symbolic links
*.egg-info
*.eggs
# ignore compiled python files
*.pyc
......
import re
import os
import sys
from dataprocessing.Validationtools import *
from dataprocessing.readtools import *
print("Test Start")
# We need to extract all the result files from git now
for files in os.listdir(
os.path.abspath("reference-results/Neplan/BasicGrids")):
# Assert the result, model result path read from cmd line
assert_modelia_neplan_results(os.path.splitext(files)[0],
os.path.abspath("reference-results/Modelica/BasicGrids/" +
os.path.splitext(files)[0] + ".mat"),
os.path.abspath("reference-results/Neplan/BasicGrids/" +
os.path.splitext(files)[0] + ".rlf"))
print("Test End")
......@@ -2,7 +2,7 @@ import numpy as np
import pandas as pd
from .timeseries import *
import re
import cmath
def read_timeseries_Modelica(filename, timeseries_names=None, is_regex=False):
from modelicares import SimRes
......@@ -12,6 +12,7 @@ def read_timeseries_Modelica(filename, timeseries_names=None, is_regex=False):
timeseries = []
for name in sim.names():
timeseries.append(TimeSeries(name, sim(name).times(), sim(name).values()))
timeseries_names = sim.names()
elif is_regex is True:
# Read in variables which match with regex
timeseries = []
......@@ -29,8 +30,8 @@ def read_timeseries_Modelica(filename, timeseries_names=None, is_regex=False):
for name in timeseries_names:
timeseries.append(TimeSeries(name, sim(name).times(), sim(name).values()))
print('Modelica results column names: ' + str(timeseries_names))
print('Modelica results number: ' + str(len(timeseries_list)))
#print('Modelica results column names: ' + str(timeseries_names))
#print('Modelica results number: ' + str(len(timeseries_names)))
return timeseries
......@@ -178,3 +179,95 @@ def read_timeseries_dpsim_cmpl_separate(filename, timeseries_names=None):
for result in timeseries_list:
print(result.name)
return timeseries_list
def read_timeseries_NEPLAN_loadflow(file_name, timeseries_names=None, is_regex=False):
"""
Read in NEPLAN loadflow result from result file, the result is in angle notation, amplitude and angle are stored
separately
To keep consistent with the names of voltage in most cases, the name of voltage variables are changed into '.V*'
instead of '.U*' as in the result file
:param file_name: name of the mat file for the loadflow result from neplan
:param timeseries_names: column name to be read
:param is_regex: flag for using regular expression
:return: list of Timeseries objects
"""
str_tmp = open(file_name, "r") # Read in files
low = 0 # flag for the start of a new data in str_cmp
high = 0 # flag for the end of this new data in str_cmp
flag = True # To judge if this is the first line of the file, which will be the names for the data type
# Read in data from result file of neplan
seq = [] # list for data type names
value = [] # list for data
namelist = ['U', 'ANGLEU', 'P', 'Q','I', 'ANGLEI']
timeseries = []
line_del = [] # a list for the value to be deleted
isfloat = re.compile(r'^[-+]?[0-9]+\.[0-9]+$') # regular expression to find float values
# the ',' in the floats in result file to '.'
for line in str_tmp.readlines():
line = line.replace(",", ".")
high -= high
low -= low
del value[:]
# read in different data and start processing
for letter in line:
if letter == " " or letter == "\n": # different data(separated by ' ') or end(/n)
if low is not high: # if low is equal to high, no data read in
if flag: # first line of the file, list for data-type name
seq.append(line[low:high])
else: # not first line of the file,list for data
if isfloat.match(line[low:high]):
value.append(float(line[low:high]))
else:
value.append(line[low:high])
else: # no data for this datatype
value.append(r'#') # No value, set as #
low = high + 1 # refresh low flag
high += 1
"""
A typical line current in neplan has two parts from both end, but we doesn't have to calculate them
with the assumption that the topology of the gird should be correct with which we can validate the
current by comparing the voltage of the nodes connected to the ends of the line
"""
if flag is not True:
if value[3] is not '#':
for m in range(6):
timeseries.append(TimeSeries(value[3] + '.' + namelist[m],
np.array([0., 1.]), np.array([value[m + 6], value[m + 6]])))
else:
for m in range(2):
timeseries.append(TimeSeries(value[1] + '.' + namelist[m],
np.array([0., 1.]), np.array([value[m + 6], value[m + 6]])))
flag = False
str_tmp.close()
# Read in variables which match with regex
if is_regex is True:
p = re.compile(timeseries_names)
length = len(timeseries)
for rule_check in range(length):
if p.search(timeseries[rule_check].name):
pass
else:
line_del.append(rule_check)
# Read in specified time series
elif timeseries_names is not None:
length = len(timeseries)
for rule_check in range(length):
if timeseries_names == timeseries[rule_check].name:
pass
else:
line_del.append(rule_check)
# delete those values that are not needed.
line_del = set(line_del)
line_del = sorted(line_del)
for num_to_del in range(len(line_del)):
del timeseries[line_del[len(line_del) - num_to_del - 1]]
return timeseries
#!/usr/bin/python
# -*- coding: UTF-8 -*-
import os
from dataprocessing.readtools import *
def convert_neplan_to_modelica_timeseries(neplan_timeseries):
"""
Mapping the variable names between modelica and neplan
- Voltage: change *.U and *.ANGLEU to *.V and *.Vangle
- Current: remove unnecessary current variables
:param neplan_timeseries: result of neplan in timeseries
:return: a mapped neplan_timeseries
"""
line_del = []
# remove all the line current
# Find current of the same component, which means the current needn't to be validated
for check in range(len(neplan_timeseries)):
if neplan_timeseries[check].values[0] == '#':
line_del.append(check)
if '.P' in neplan_timeseries[check].name:
line_del.append(check)
if '.Q' in neplan_timeseries[check].name:
line_del.append(check)
for i in range(check + 1, len(neplan_timeseries)):
if neplan_timeseries[check].name == neplan_timeseries[i].name:
line_del.append(check) # delete list of the unnecessary data
line_del.append(i)
line_del = sorted(set(line_del))
for num_to_del in range(len(line_del)):
del neplan_timeseries[line_del[len(line_del) - num_to_del - 1]]
# Change the unit of variables to keep consistent with those in modelica
for i in range(len(neplan_timeseries)):
if 'ANGLE' in neplan_timeseries[i].name:
neplan_timeseries[i].values = neplan_timeseries[i].values / 180 * cmath.pi # unification of the unit
elif '.U' in neplan_timeseries[i].name or '.I' in neplan_timeseries[i].name:
neplan_timeseries[i].values = neplan_timeseries[i].values * 1000
# Change the name of variables to keep consistent with those in modelica
for i in range(len(neplan_timeseries)):
neplan_timeseries[i].name = neplan_timeseries[i].name.replace(' ', '')
neplan_timeseries[i].name = neplan_timeseries[i].name.replace('.ANGLEU', '.Vangle')
neplan_timeseries[i].name = neplan_timeseries[i].name.replace('.U', '.Vpp')
neplan_timeseries[i].name = neplan_timeseries[i].name.replace('.ANGLEI', '.Iangle')
return neplan_timeseries
def compare_modelica_neplan(modelica_res, neplan_res): # compare the result file from NEPLAN and Modelica
"""
Compare Results from modelic and neplan, the name of the components should be kept consistent.
:param modelica_res: the path of the modelica result file, whose suffix should be .mat
:param neplan_res: the path of the neplan result file, whose suffix should be .rlf
:return:
"""
# Read in original neplan result file
file_Neplan = os.path.abspath(neplan_res)
# Read in original Modelica result file
file_Modelica = os.path.abspath(modelica_res)
result_neplan = convert_neplan_to_modelica_timeseries(read_timeseries_NEPLAN_loadflow(file_Neplan))
result_modelica = read_timeseries_Modelica(file_Modelica)
# Transfer the angle unit to degree
for i in range(len(result_neplan)):
result_neplan[i].name = result_neplan[i].name.upper()
if 'ANGLE' in result_neplan[i].name:
result_neplan[i].values = result_neplan[i].values / cmath.pi * 180
for i in range(len(result_modelica)):
result_modelica[i].name = result_modelica[i].name.upper()
if 'ANGLE' in result_modelica[i].name:
result_modelica[i].values = result_modelica[i].values / cmath.pi * 180
timeseries_names = [] # list for names of components
timeseries_error = [] # list for error
len_limit = len(result_modelica)
# Match the components in result files, and compare them
for i in range(len(result_neplan)):
flag_not_found = False
for j in range(len_limit):
if result_neplan[i].name == result_modelica[j].name: # Find the same variable
timeseries_names.append(result_neplan[i].name)
timeseries_error.append(TimeSeries.rmse(result_modelica[j], result_neplan[i]))
flag_not_found = True
if flag_not_found is False:
# No such variable in Modelica model, set the error to -1
timeseries_error.append(-1)
return dict(zip(timeseries_names, timeseries_error))
def assert_modelia_neplan_results(net_name, modelica_res, neplan_res): # Assert the model using the function above
"""
Assert the result in Modelica according to the results from neplan
:param net_name: The name of the net should be clarified manually
:param modelica_res: the path of the modelica result file, whose suffix should be .mat
:param neplan_res: the path of the neplan result file, whose suffix should be .rlf
:return:
"""
fail_list = [] # List for all the failed test
error = compare_modelica_neplan(modelica_res, neplan_res)
# the limitations are set to 0.5
for name in error.keys():
if abs(error[name]) > 0.5:
fail_list.append(name)
else:
print("Test on %s Passed" % name)
# fail_list is 0, which means all the tests are passed
if len(fail_list) is 0:
print("\033[1;36;40mModel %s Passed\033[0m" % net_name)
else:
for name in fail_list:
print("\033[1;31;40mTest on %s of %s Failed\033[0m" % (name, net_name))
raise ValueError('Test on %s is not passed!' % net_name)
#!/usr/bin/python
# -*- coding: UTF-8 -*-
import re
from dataprocessing.readtools import *
file = r"C:\Users\admin\Desktop\Load_read\Load_flow_WCSS.rlf"
# Example 1: Read in all variable
print('************************ Test for read in all variable start ****************')
result = read_timeseries_NEPLAN_loadflow(file)
print(result[24].values)
for i in range(len(result)):
print('%s is %s' % (result[i].name, result[i].values)) # result as list of TimeSeries
print('************************ Test for read in all variable end ****************')
print('\n')
# Example 2: Read in specific variable
print('************************ Test for read in specific variable start ****************')
print('************************ Read in specific Voltage ****************')
result2 = read_timeseries_NEPLAN_loadflow(file, 'FOUR.U')
for i in range(len(result2)):
print('%s is %s' % (result2[i].name, result2[i].values))
print('************************ Read in specific Current ****************')
result3 = read_timeseries_NEPLAN_loadflow(file, 'LINE89.I')
for i in range(len(result3)):
print('%s is %s' % (result3[i].name, result3[i].values))
print('************************ Test for read in specific variable end ****************')
print('\n')
# Example 3: Read in using regular expression
print('************************ Test for read in using Regular Expression start ****************')
result4 = read_timeseries_NEPLAN_loadflow(file, '^.*\.I$', True)
for i in range(len(result4)):
print('%s is %s' % (result4[i].name, result4[i].values))
print('************************ Test for read in using Regular Expression end ****************')
print('\n')
\ No newline at end of file
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment