Skip to content
Snippets Groups Projects
Commit 9a683852 authored by Bichen Li's avatar Bichen Li
Browse files

- Remove unification part frome read_timeseries_NEPLAN_loadflow

- Add mapping_modcelica_neplan function which makes the data structure of nepla$
- Update Assert_Results function
- update .gitignore
parent c79ebffc
No related branches found
No related tags found
No related merge requests found
......@@ -32,4 +32,6 @@ dymosim*
__pycache__
# ignore jupyter notebook files
.ipynb_checkpoints
\ No newline at end of file
.ipynb_checkpoints
Test.py
......@@ -8,8 +8,11 @@ print("Test Start")
# We need to extract all the result files from git now
for files in os.listdir(
os.path.abspath("reference-results/Neplan/BasicGrids")):
if os.path.splitext(files)[0] == "Slack_Rxline_PQLoad":
pass
else:
assert_modelia_neplan_results(os.path.splitext(files)[0]) # Assert the result, model result path read from cmd line
# Assert the result, model result path read from cmd line
assert_modelia_neplan_results(os.path.splitext(files)[0],
os.path.abspath("reference-results/Modelica/BasicGrids/" +
os.path.splitext(files)[0] + ".mat"),
os.path.abspath("reference-results/Neplan/BasicGrids/" +
os.path.splitext(files)[0] + ".rlf"))
print("Test End")
#!/usr/bin/python
# -*- coding: UTF-8 -*-
import re
import os
import sys
from dataprocessing.readtools import *
def mapping_modcelica_neplan(neplan_timeseries):
"""
Mapping the variable names between modelica and neplan
- Voltage: change *.U and *.ANGLEU to *.V and *.Vangle
- Current: remove unnecessary current variables
:param neplan_timeseries: result of neplan in timeseries
:return: a mapped neplan_timeseries
"""
line_del = []
# remove all the line current
def compare_modelica_neplan(Net_Name): # compare the result file from NEPLAN and Modelica
# Read in original nepaln result file
file_Neplan = os.path.abspath("reference-results/Neplan/BasicGrids/" + Net_Name + ".rlf")
# Find current of the same component, which means the current needn't to be validated
for check in range(len(neplan_timeseries)):
if neplan_timeseries[check].values[0] == '#':
line_del.append(check)
if '.P' in neplan_timeseries[check].name:
line_del.append(check)
if '.Q' in neplan_timeseries[check].name:
line_del.append(check)
for i in range(check + 1, len(neplan_timeseries)):
if neplan_timeseries[check].name == neplan_timeseries[i].name:
line_del.append(check) # delete list of the unnecessary data
line_del.append(i)
line_del = sorted(set(line_del))
for num_to_del in range(len(line_del)):
del neplan_timeseries[line_del[len(line_del) - num_to_del - 1]]
# Change the unit of variables to keep consistent with those in modelica
for i in range(len(neplan_timeseries)):
if 'ANGLE' in neplan_timeseries[i].name:
neplan_timeseries[i].values = neplan_timeseries[i].values / 180 * cmath.pi # unification of the unit
elif '.U' in neplan_timeseries[i].name or '.I' in neplan_timeseries[i].name:
neplan_timeseries[i].values = neplan_timeseries[i].values * 1000
# Change the name of variables to keep consistent with those in modelica
for i in range(len(neplan_timeseries)):
neplan_timeseries[i].name = neplan_timeseries[i].name.replace(' ', '')
neplan_timeseries[i].name = neplan_timeseries[i].name.replace('.ANGLEU', '.Vangle')
neplan_timeseries[i].name = neplan_timeseries[i].name.replace('.U', '.Vpp')
neplan_timeseries[i].name = neplan_timeseries[i].name.replace('.ANGLEI', '.Iangle')
return neplan_timeseries
def compare_modelica_neplan(modelica_res, neplan_res): # compare the result file from NEPLAN and Modelica
"""
Compare Results from modelic and neplan, the name of the components should be kept consistent.
:param modelica_res: the path of the modelica result file, whose suffix should be .mat
:param neplan_res: the path of the neplan result file, whose suffix should be .rlf
:return:
"""
# Read in original neplan result file
file_Neplan = os.path.abspath(neplan_res)
# Read in original Modelica result file
file_Modelica = os.path.abspath("reference-results/Modelica/BasicGrids/" + Net_Name + ".mat")
result_neplan = read_timeseries_NEPLAN_loadflow(file_Neplan)
file_Modelica = os.path.abspath(modelica_res)
result_neplan = mapping_modcelica_neplan(read_timeseries_NEPLAN_loadflow(file_Neplan))
result_modelica = read_timeseries_Modelica(file_Modelica)
list_del = []
# Unification of the variable names and units of the voltage and current
for i in range(len(result_neplan)):
result_neplan[i].name = result_neplan[i].name.replace(' ', '')
result_neplan[i].name = result_neplan[i].name.upper()
if 'ANGLE' in result_neplan[i].name:
pass
else:
result_neplan[i].values = result_neplan[i].values * 1000 # unification of the unit,which is kV/kA in neplan
result_neplan[i].values = result_neplan[i].values / cmath.pi * 180 # transfer the angle unit to degree
# Unification of the units of the angle
for i in range(len(result_modelica)):
result_modelica[i].name = result_modelica[i].name.upper()
if 'ANGLE' in result_modelica[i].name:
result_modelica[i].values = result_modelica[i].values / cmath.pi * 180 # unification of the unit
#f_modelica.write('%s is %s \n' % (result_modelica[i].name, result_modelica[i].values[1]))
timeseries_names = [] # list for names
result_modelica[i].values = result_modelica[i].values / cmath.pi * 180 # transfer the angle unit to degree
timeseries_names = [] # list for names of components
timeseries_error = [] # list for error
len_limit = len(result_modelica)
# Match the components in result files, and compare them
for i in range(len(result_neplan)):
flag_NOT_found = False
flag_not_found = False
for j in range(len_limit):
if result_neplan[i].name == result_modelica[j].name: # Find the same variable
timeseries_names.append(result_neplan[i].name)
timeseries_error.append(TimeSeries.rmse(result_modelica[j], result_neplan[i]))
flag_NOT_found = True
if not flag_NOT_found:
timeseries_error.append(TimeSeries(result_neplan[i].name, 0, -1))
flag_not_found = True
if flag_not_found is False:
# No such variable in Modelica model, set the error to -1
timeseries_error.append(-1)
return dict(zip(timeseries_names, timeseries_error))
def assert_modelia_neplan_results(net_name): # Assert the model using the function above
fail_list = []
error = compare_modelica_neplan(net_name)
def assert_modelia_neplan_results(net_name, modelica_res, neplan_res): # Assert the model using the function above
"""
Assert the result in Modelica according to the results from neplan
:param net_name: The name of the net should be clarified manually
:param modelica_res: the path of the modelica result file, whose suffix should be .mat
:param neplan_res: the path of the neplan result file, whose suffix should be .rlf
:return:
"""
fail_list = [] # List for all the failed test
error = compare_modelica_neplan(modelica_res, neplan_res)
# the limitations are set to 0.5
for name in error.keys():
if abs(error[name]) > 0.5:
fail_list.append(name)
else:
print("Test on %s Passed" % name)
# fail_list is 0, which means all the tests are passed
if len(fail_list) is 0:
print("\033[1;36;40mModel %s Passed\033[0m" % net_name)
else:
for name in fail_list:
print("\033[1;31;40mTest on %s of %s Failed\033[0m" % (name,net_name))
raise ValueError('Test is not passed!')
print("\033[1;31;40mTest on %s of %s Failed\033[0m" % (name, net_name))
raise ValueError('Test on %s is not passed!' % net_name)
......
......@@ -166,6 +166,9 @@ def read_timeseries_NEPLAN_loadflow(file_name, timeseries_names=None, is_regex=F
"""
Read in NEPLAN loadflow result from result file, the result is in angle notation, amplitude and angle are stored
separately
To keep consistent with the names of voltage in most cases, the name of voltage variables are changed into '.V*'
instead of '.U*' as in the result file
:param file_name: name of the mat file for the loadflow result from neplan
:param timeseries_names: column name to be read
:param is_regex: flag for using regular expression
......@@ -174,24 +177,27 @@ def read_timeseries_NEPLAN_loadflow(file_name, timeseries_names=None, is_regex=F
str_tmp = open(file_name, "r") # Read in files
low = 0 # flag for the start of a new data in str_cmp
high = 0 # flag for the end of this new data in str_cmp
flag = True # To judge if this the first line of the file, which will be the names for the data type
flag = True # To judge if this is the first line of the file, which will be the names for the data type
# Read in data from result file of neplan
seq = [] # list for data type names
value = [] # list for data
i = 0
namelist = ['Vpp', 'Vangle', 'I', 'Iangle']
namelist = ['U', 'ANGLEU', 'P', 'Q','I', 'ANGLEI']
timeseries = []
line_del = [] # a list for the value to be deleted
isfloat = re.compile(r'^[-+]?[0-9]+\.[0-9]+$') # regular expression to find float values
# the ',' in the floats in result file to '.'
for line in str_tmp.readlines():
line = line.replace(",", ".")
high -= high
low -= low
del value[:]
# read in different data and start processing
for letter in line:
if letter == " " or letter == "\n": # different data( separated by ' ') or end(/n)
if letter == " " or letter == "\n": # different data(separated by ' ') or end(/n)
if low is not high: # if low is equal to high, no data read in
if flag: # first line of the file, list for data-type name
seq.append(line[low:high])
......@@ -207,26 +213,18 @@ def read_timeseries_NEPLAN_loadflow(file_name, timeseries_names=None, is_regex=F
"""
A typical line current in neplan has two parts from both end, but we doesn't have to calculate them
under the assumption that the topology of the gird should be correct with which we can validate the
current by compare the voltage of the nodes connected to the ends of the line
with the assumption that the topology of the gird should be correct with which we can validate the
current by comparing the voltage of the nodes connected to the ends of the line
"""
if flag is False:
i += 1
check_pass = True # Check for current of the same component
if value[0] == '0': # value[0] == '0', no current data to be expected
if flag is not True:
if value[3] is not '#':
for m in range(6):
timeseries.append(TimeSeries(value[3] + '.' + namelist[m],
np.array([0., 1.]), np.array([value[m + 6], value[m + 6]])))
else:
for m in range(2):
timeseries.append(TimeSeries(value[1] + '.' + namelist[m],
np.array([0., 1.]), np.array([value[m + 6], value[m + 6]])))
else:
for check in range(len(timeseries) - 1):
if timeseries[check].name == value[3] + '.' + namelist[2]:
# Find current of the same component, which means the current needn't to be validated
check_pass = False
line_del.append(check) # delete the unnecessary data
line_del.append(check + 1)
if check_pass: # The load current
for m in range(2, 4):
timeseries.append(TimeSeries(value[3] + '.' + namelist[m], np.array([0., 1.]), np.array([value[m + 8], value[m + 8]])))
flag = False
str_tmp.close()
......@@ -248,6 +246,7 @@ def read_timeseries_NEPLAN_loadflow(file_name, timeseries_names=None, is_regex=F
pass
else:
line_del.append(rule_check)
# delete those values that are not needed.
line_del = set(line_del)
line_del = sorted(line_del)
for num_to_del in range(len(line_del)):
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment