Commit d01c0b9b authored by martin.moraga's avatar martin.moraga
Browse files

improve nv_state_estimator_cim.py

parent 347be7ee
......@@ -3,16 +3,21 @@ import numpy as np
class ElemType(Enum):
Node = 1 #Node Voltage
Branch = 2 #Complex Power Injection at node
Branch = 2 #Complex Power flow at branch
class MeasType(Enum):
V = 1 #Node Voltage
Sinj= 2 #Complex Power Injection at node
S1 = 4 #Complex Power flow at branch, measured at initial node
S2 = 5 #Complex Power flow at branch, measured at final node
I = 6 #Branch Current
Vpmu = 7 #Node Voltage
Ipmu = 9 #Branch Current
V_mag = 1 #Node Voltage
Sinj_real= 2 #Complex Power Injection at node
Sinj_imag= 3 #Complex Power Injection at node
S1_real = 4 #Active Power flow at branch, measured at initial node (S1.real)
S1_imag = 5 #Reactive Power flow at branch, measured at initial node (S1.imag)
I_mag = 6 #Branch Current
Vpmu_mag = 7 #Node Voltage
Vpmu_phase = 8 #Node Voltage
Ipmu_mag = 9 #Branch Current
Ipmu_phase = 10 #Branch Current
S2_real = 11 #Active Power flow at branch, measured at final node (S2.real)
S2_imag = 12 #Reactive Power flow at branch, measured at final node (S2.imag)
class Measurement():
def __init__(self, element, element_type, meas_type, meas_value, std_dev):
......@@ -35,16 +40,18 @@ class Measurement():
self.element_type = element_type
self.meas_type = meas_type
self.meas_value = meas_value
self.std_dev = self.meas_value*(std_dev/300)
if self.std_dev<10**(-6)
self.std_dev = 10**(-6)
self.std_dev = std_dev
self.std_dev = std_dev
self.mval = 0.0 #measured values (affected by uncertainty)
class Measurents_set():
def __init__(self):
self.measurements = []
self.measurements = [] #array with all measurements
def create_measurement(self, element, element_type, meas_type, meas_value, std_dev):
"""
to add elements to the measurements array
"""
self.measurements.append(Measurement(element, element_type, meas_type, meas_value, std_dev))
def meas_creation(self):
......@@ -55,78 +62,113 @@ class Measurents_set():
for index, measurement in enumerate(self.measurements):
measurement.mval = measurement.meas_value + self.std_dev*err_pu[index]
def getNumberOfMeasurements(self)
def meas_creation_test(self, err_pu):
"""
return number of measurements of each type in the array Measurents_set.measurements
For test purposes.
It calculates the measured values (affected by uncertainty) at the measurement points.
This function takes as paramenter the random gaussian distribution.
"""
nvi, npi, nqi, npf, nqf, nii, nvpum, nipmu = 0
for elem in self.measurements:
if elem.meas_type is MeasType.V:
nvi = nvi+1
elif elem.meas_type is MeasType.Sinj:
npi = npi+1
nqi = nqi+1
elif elem.meas_type is MeasType.S1 or elem.meas_type is MeasType.S2:
npf = npf+1
nqf = nqf+1
elif elem.meas_type is MeasType.I:
nii = nii+1
elif elem.meas_type is MeasType.Vpmu:
nvpum = nvpum+1
elif elem.meas_type is MeasType.Ipmu:
nipmu = nipmu+1
return nvi, npi, nqi, npf, nqf, nii, nvpum, nipmu
for index, measurement in enumerate(self.measurements):
measurement.mval = measurement.meas_value + measurement.std_dev*err_pu[index]
def getMeasuredActiveInjPowers(self):
def getMeasurements(self, type):
"""
return an array with the measurements of type Sinj.real
return an array with all measurements of type "type" in the array Measurents_set.measurements.
"""
Pinj = np.array([])
for elem in self.measurements:
if elem.meas_type is MeasType.Sinj:
Pinj = np.append(Pinj, elem.real)
measurements = []
for measurement in self.measurements:
if measurement.meas_type is type:
measurements.append(measurement)
return Pinj
return measurements
def getMeasuredReactiveInjPowers(self):
def getNumberOfMeasurements(self, type):
"""
return an array with the measurements of type Sinj.imag
return number of measurements of type "type" in the array Measurents_set.measurements
"""
Qinj = np.array([])
for elem in self.measurements:
if elem.meas_type is MeasType.Sinj:
Qinj = np.append(Qinj, elem.imag)
number = 0
for measurement in self.measurements:
if measurement.meas_type is type:
number = number+1
return Qinj
return number
def getMeasuredActiveBPowers(self):
def getIndexOfMeasurements(self, type):
"""
return an array with the measurements of type S1.real or S2.real
return index of all measurements of type "type" in the array Measurents_set.measurements
"""
Pbr = np.array([])
for elem in self.measurements:
if elem.meas_type is MeasType.S1 or elem.meas_type is MeasType.S2:
Pbr = np.append(Pbr, elem.real)
return Pbr
idx = np.zeros(self.getNumberOfMeasurements(type), dtype=int)
i = 0
for index, measurement in enumerate(self.measurements):
if measurement.meas_type is type:
idx[i] = index
i = i+1
def getMeasuredReactiveBPowers(self):
"""
return an array with the measurements of type S1.imag or S2.imag
"""
Qbr = np.array([])
for elem in self.measurements:
if elem.meas_type is MeasType.S1 or elem.meas_type is MeasType.S2:
Qbr = np.append(Qbr, elem.imag)
return Qbr
return idx
def getWeightsMatrix(self)
def getWeightsMatrix(self):
"""
creates the weights matrix (obtained as standard_deviations^-2)
return an array the weights (obtained as standard_deviations^-2)
"""
weights = np.zeros(len(self.measurements))
for index, measurement in enumerate(self.measurements):
#the weight is small and can bring instability during matrix inversion, so we "cut" everything below 10^-6
if measurement.std_dev<10**(-6):
measurement.std_dev = 10**(-6)
weights[index] = measurement.std_dev**(-2)
return np.diag(weights)
return weights
def getMeasValues(self):
"""
for test purposes
returns an array with all measured values
"""
meas_val = np.zeros(len(self.measurements))
for index, measurement in enumerate(self.measurements):
meas_val[index] = measurement.meas_value
return meas_val
def getmVal(self):
"""
returns an array with all measured values (affected by uncertainty)
"""
mVal = np.zeros(len(self.measurements))
for index, measurement in enumerate(self.measurements):
mVal[index] = measurement.mval
""" Replace in mVal amplitude and phase of Vpmu by real and imaginary part """
#get all measurements of type MeasType.Vpmu_mag
Vpmu_mag_idx = self.getIndexOfMeasurements(type=MeasType.Vpmu_mag)
#get all measurements of type MeasType.Vpmu_phase
Vpmu_phase_idx = self.getIndexOfMeasurements(type=MeasType.Vpmu_phase)
for vpmu_mag_index, vpmu_phase_index in zip(Vpmu_mag_idx, Vpmu_phase_idx):
vamp = self.measurements[vpmu_mag_index].mval
vtheta = self.measurements[vpmu_phase_index].mval
mVal[vpmu_mag_index] = vamp*np.cos(vtheta)
mVal[vpmu_phase_index] = vamp*np.sin(vtheta)
""" Replace in z amplitude and phase of Ipmu by real and imaginary part """
#get all measurements of type MeasType.Ipmu_mag
Ipmu_mag_idx = self.getIndexOfMeasurements(type=MeasType.Ipmu_mag)
#get all measurements of type MeasType.Ipmu_phase
Ipmu_phase_idx = self.getIndexOfMeasurements(type=MeasType.Ipmu_phase)
for ipmu_mag_index, ipmu_phase_index in zip(Ipmu_mag_idx, Ipmu_phase_idx):
iamp = self.measurements[ipmu_mag_index].mval
itheta = self.measurements[ipmu_phase_index].mval
mVal[ipmu_mag_index] = iamp*np.cos(itheta)
mVal[ipmu_phase_index] = iamp*np.sin(itheta)
return mVal
def getStd_Dev(self):
"""
for test purposes
returns an array with all standard deviations
"""
std_dev = np.zeros(len(self.measurements))
for index, measurement in enumerate(self.measurements):
std_dev[index] = measurement.std_dev
return std_dev
\ No newline at end of file
......@@ -255,58 +255,3 @@ def solve(system):
results.calculateS2()
return results, num_iter
\ No newline at end of file
def calculateI(system, V):
"""
To calculate the branch currents
"""
Ymatrix, Adj = Ymatrix_calc(system)
I = np.zeros((len(system.branches)), dtype=np.complex)
for idx in range(len(system.branches)):
fr = system.branches[idx].start_node.index
to = system.branches[idx].end_node.index
I[idx] = - (V[fr] - V[to])*Ymatrix[fr][to]
return I
def calculateInj(system, I):
"""
To calculate current injections at a node
"""
Iinj = np.zeros((len(system.nodes)), dtype=np.complex)
for k in range(0, (len(system.nodes))):
to=[]
fr=[]
for m in range(len(system.branches)):
if k==system.branches[m].start_node.index:
fr.append(m)
if k==system.branches[m].end_node.index:
to.append(m)
Iinj[k] = np.sum(I[to]) - np.sum(I[fr])
return Iinj
def calculateS1(system, V, I):
"""
To calculate powerflow on branches
"""
S1 = np.zeros((len(system.branches)), dtype=np.complex)
for i in range(0, len(system.branches)):
S1[i] = V[system.branches[i].start_node.index]*(np.conj(I[i]))
return S1
def calculateS2(system, V, I):
"""
To calculate powerflow on branches
"""
S2 = np.zeros((len(system.branches)), dtype=np.complex)
for i in range(0, len(system.branches)):
S2[i] = -V[system.branches[i].end_node.index]*(np.conj(I[i]))
return S2
def calculateSinj(V, Iinj):
"""
To calculate power injection at a node
"""
Sinj = np.multiply(V, np.conj(Iinj))
return Sinj
\ No newline at end of file
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment