validationtools.py 8.61 KB
Newer Older
1 2 3 4 5
#!/usr/bin/python
# -*- coding: UTF-8 -*-

import os

6
from .readtools import *
7

Bichen Li's avatar
Bichen Li committed
8 9
"""
The validationtools are used to validate the simulation results from the model.
A typical process to validate a model contains four parts:
 - Building & running the module to get results
 - Reading in the results
 - Mapping the results to the reference results
 - Asserting the module
The first step is done by the Py4Mod package, the second by readtools.

The validationtools focus on the last two steps: the conversion functions convert the reference-result
timeseries into Modelica timeseries (mapping the names and units), the comparison function compares two
timeseries, and the assert function asserts the result of the comparison.

Finally, a top-level validation function is introduced to organize the whole job.
"""
23 24 25 26 27 28 29 30 31 32 33 34

def convert_neplan_to_modelica_timeseries(neplan_timeseries):
    """
    Map a list of NEPLAN result timeseries onto the Modelica naming and unit conventions.

    The list is filtered and its entries are modified in place; the same list object is returned.

    Filtering - an entry is removed when
        - its first value is the string '#',
        - its name contains '.P' or '.Q',
        - its name occurs more than once in the list (all occurrences are removed, since such
          currents belong to the same component and do not need to be validated).
    Units
        - names containing 'ANGLE': values are scaled by pi/180 (degrees to radians)
        - other names containing '.U' or '.I': values are multiplied by 1000
          (presumably kV/kA to V/A - confirm against the NEPLAN export format)
    Names
        - spaces are removed
        - '.ANGLEU' becomes '.Vangle', '.U' becomes '.Vpp', '.ANGLEI' becomes '.Iangle'

    :param neplan_timeseries: list of NEPLAN result timeseries (objects with ``name`` and ``values``)
    :return: the same list, filtered and mapped
    """
    # Count every name once instead of comparing each pair of entries.
    name_counts = {}
    for series in neplan_timeseries:
        name_counts[series.name] = name_counts.get(series.name, 0) + 1

    def _is_needed(series):
        """Return True if the entry has to be kept for the validation."""
        if series.values[0] == '#':
            return False
        if '.P' in series.name or '.Q' in series.name:
            return False
        return name_counts[series.name] == 1

    # Slice assignment keeps the caller's list object, like the original in-place deletion did.
    neplan_timeseries[:] = [series for series in neplan_timeseries if _is_needed(series)]

    for series in neplan_timeseries:
        # The unit checks use the original NEPLAN name, so convert the values before renaming.
        if 'ANGLE' in series.name:
            series.values = series.values / 180 * cmath.pi  # unification of the unit
        elif '.U' in series.name or '.I' in series.name:
            series.values = series.values * 1000

        # '.ANGLEU' has to be replaced before '.U', otherwise it would turn into '.ANGLEVpp'.
        series.name = (series.name.replace(' ', '')
                       .replace('.ANGLEU', '.Vangle')
                       .replace('.U', '.Vpp')
                       .replace('.ANGLEI', '.Iangle'))

    return neplan_timeseries

def convert_simulink_to_modelica_timeseries(simseri):
    """
    Pick the 'U AB:' entries of a Simulink steady-state result and map them to Modelica conventions.

    Only timeseries whose name contains 'U AB:' are kept; all others are skipped untouched.
    The kept entries are modified in place:
        - name: 'U AB:' is dropped, 'Vrms' becomes 'Vpp', 'VDegree' becomes 'Vangle',
          spaces and underscores are removed
        - values of the resulting '...Vangle' entries: 30 is subtracted and the result is scaled by pi/180
          (presumably a phase-to-phase to phase-to-ground shift in degrees - confirm)
    :param simseri: list of timeseries generated from the Simulink result file
    :return: a new list holding the converted timeseries
    """
    # Applied in this order; the order matters because later patterns act on earlier results.
    renames = (
        ('U AB:', ''),
        ('Vrms', 'Vpp'),
        ('VDegree', 'Vangle'),
        (' ', ''),
        ('_', ''),
    )

    converted = []
    for series in simseri:
        if 'U AB:' not in series.name:
            continue
        for old, new in renames:
            series.name = series.name.replace(old, new)
        if 'Vangle' in series.name:
            series.values = (series.values - 30)/180 * cmath.pi
        converted.append(series)
    return converted

86 87 88 89 90 91 92 93 94 95 96 97 98 99
def convert_dpsim_to_modelica_timeseries(dpsim_timeseries):
    """
    Split every DPsim timeseries into a magnitude and an angle timeseries with Modelica names.

    For each ``key -> timeseries`` pair two entries are produced, in this order:
        - ``timeseries.abs(key + '.Vpp')``
        - ``timeseries.phase(key + '.Vangle')`` with its values scaled in place by pi/180
          (assumes ``phase`` returns degrees - TODO confirm against the TimeSeries implementation)
    :param dpsim_timeseries: dict of dpsim timeseries, generated by the csv result file from dpsim
    :return: a flat list of the magnitude/angle timeseries, following the dict iteration order
    """
    modelica_series = []
    for node_name, complex_series in dpsim_timeseries.items():
        magnitude = complex_series.abs(node_name + '.Vpp')
        angle = complex_series.phase(node_name + '.Vangle')
        angle.values *= np.pi/180
        modelica_series.extend((magnitude, angle))
    return modelica_series
100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123

def compare_timeseries(ts1, ts2):
    """
    Compare two lists of timeseries name by name and return the error of every matched variable.

    The shorter list is iterated and its entries are looked up by name in the longer one.
    Names are matched case-insensitively. NOTE: to do so, the ``name`` of every timeseries in both
    lists is upper-cased in place, so the inputs are modified.

    For every match the error is ``TimeSeries.rmse(longer_entry, shorter_entry)``, divided by the
    first value of the entry from the shorter list unless that value is 0 (then the absolute RMSE
    is used). Each matched name and its error are printed. Entries of the shorter list without a
    match are reported with a warning and left out of the result.

    :param ts1: list of timeseries
    :param ts2: list of timeseries
    :return: dict mapping the (upper-cased) variable name to its error; if a name matches several
             entries, the error of the last match is kept
    """
    # Always iterate over the shorter list.
    if len(ts1) > len(ts2):
        ts1, ts2 = ts2, ts1

    for series in ts1:
        series.name = series.name.upper()
    for series in ts2:
        series.name = series.name.upper()

    # Index the longer list by name once instead of scanning it for every entry of ts1.
    candidates_by_name = {}
    for candidate in ts2:
        candidates_by_name.setdefault(candidate.name, []).append(candidate)

    errors = {}
    for series in ts1:
        matches = candidates_by_name.get(series.name, [])
        for candidate in matches:
            if series.values[0] == 0:
                # A relative error is undefined for a zero base value, fall back to the plain RMSE.
                error = TimeSeries.rmse(candidate, series)
            else:
                error = TimeSeries.rmse(candidate, series)/series.values[0]
            errors[series.name] = error

            print(series.name)
            print(error)
        if not matches:
            # No counterpart in the other list: the variable is skipped, no error is recorded.
            print("Warning: no matching variable found for " + series.name + "! Skipping error calculation and assertion for this variable...")
    return errors


144
def assert_modelica_results(net_name, error, threshold):
    """
    Assert the comparison errors of a network against a threshold.

    A variable passes when the absolute value of its error does not exceed ``threshold``.
    Every passed variable is printed; if all variables pass, a coloured summary line is printed.
    Otherwise every failed variable is printed in colour and an exception is raised.

    :param net_name: name of the network, only used in the printed and raised messages
    :param error: dict mapping variable names to their error
    :param threshold: maximum allowed absolute error
    :return: None, the results of the assertion are printed to the command line
    :raises ValueError: if the error of at least one variable exceeds the threshold
    """
    fail_list = []  # List for all the failed tests
    for name, value in error.items():
        # NOTE(review): a NaN error compares False here and is therefore reported as passed.
        if abs(value) > threshold:
            fail_list.append(name)
        else:
            print("Test on %s Passed" % name)

    # An empty fail_list means all the tests are passed
    if not fail_list:
        print("\033[1;36;40mModel %s Passed\033[0m" % net_name)
        return

    for name in fail_list:
        print("\033[1;31;40mTest on %s of %s Failed\033[0m" % (name, net_name))
    raise ValueError('Test on %s is not passed!' % net_name)


Bichen Li's avatar
Bichen Li committed
168
def validate_modelica_res(net_name, modelica_res_path, reference_res_path, threshold=0.5):
    """
    Top level function for the validation of modelica, calls all the functions needed to execute the validation.

    The Modelica results are read with ``read_timeseries_Modelica``. The reference results are read and
    converted according to the suffix of the reference file: '.rep' (simulink) or '.rlf' (neplan).
    Both result sets are printed, compared with ``compare_timeseries`` and asserted with
    ``assert_modelica_results``.

    :param net_name: name of the network, passed on to the assertion for its messages
    :param modelica_res_path: the path of the modelica result file, whose suffix should be .mat
    :param reference_res_path: the path of the reference result file, whose suffix should be .rep(simulink)/.rlf(neplan)
    :param threshold: the threshold of the assertion, a default value of 0.5 is introduced.
    :return: None, outputs to command line which are the results of the validation.
    :raises ValueError: if the suffix of the reference file is not supported, or if the assertion fails
    """
    # Check the reference format before reading anything, so that an unsupported file is reported
    # clearly instead of failing later on an unassigned variable.
    reference_suffix = os.path.splitext(reference_res_path)[1]
    if reference_suffix not in ('.rep', '.rlf'):
        raise ValueError(
            "Unsupported reference result file suffix '%s' of '%s', expected '.rep' (simulink) or '.rlf' (neplan)"
            % (reference_suffix, reference_res_path))

    print('\n************************ Modelica Results ****************')
    res_mod = read_timeseries_Modelica(modelica_res_path)
    for res in res_mod:
        print(res.name)
        print(res.values[0])  # only show first value of time series

    print('\n************************ Reference Results ****************')
    if reference_suffix == '.rep':
        res_ref = convert_simulink_to_modelica_timeseries(read_timeseries_simulink_loadflow(reference_res_path))
    else:
        res_ref = convert_neplan_to_modelica_timeseries(read_timeseries_NEPLAN_loadflow(reference_res_path))
    for res in res_ref:
        print(res.name)
        print(res.values)

    print('\n************************ Comparison ****************')
    res_err = compare_timeseries(res_ref, res_mod)

    print('\n************************ Assertion ****************')
    assert_modelica_results(net_name, res_err, threshold)