#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import numpy as np
import pickle
import os
import netCDF4 as nc
from LatLon23 import LatLon, Latitude, Longitude
from scipy.interpolate import interp2d, interp1d
from utilities.import_raw_dataset import import_raw_dataset
from utilities.import_format import import_tif, import_nc
from utilities.ncfile_generation import generate_basic_ncfile
class cut_and_interpolate:
"""
This class imports a dataset, cuts it to the desired extent and
interpolates it to the desired resolution.
    The settings are taken from the user input through the GUI.
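
    A hypothetical usage sketch (argument values are illustrative only,
    not taken from a real data_summary.csv):

        interp = cut_and_interpolate(
            key='dem', path='data/dem.tif', no_data_value='-9999',
            categorical=False, several=False, first=True,
            bb=[ymax, ymin, xmin, xmax],
            prop_settings={'resolution': 25, 'no_value': -999})
        if interp.cuttable:
            data, x, y = interp.array, interp.x, interp.y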
"""
def __init__(self, key=None, path=None, no_data_value=None,
categorical=None, several=None, several_same=None,
first=None, bb=None, cluster=False, for_prediction=False,
prop_settings=None, prop=None, path_properties=None):
"""
Input:
key: key belonging to the dataset (data_summary.csv)
path: path where the dataset is stored (data_summary.csv)
no_data_value: value representing no data (data_summary.csv)
categorical: boolean if dataset contains categorical
information (data_summary.csv)
several: boolean if class is called several times
e.g. in a for loop over several datasets
several_same: boolean if all datasets have the same
spatial extent and resolution
first: boolean,
only important if several or several_same is True
bb: list of bounding boxes, format list ymax, ymin, xmin, xmax
cluster: boolean, determines whether several sections of the
dataset are interpolated in a loop
            for_prediction: boolean,
results used for creating the prediction dataset
prop_settings: dictionary containing general settings
prop: dictionary containing necessary
information for cutting and interpolation
path_properties: path to separate file storing information
on applied cutting extent and
interpolation vectors
"""
# Import cutting and interpolation information if this is not the
# first dataset of several to be cut and interpolated
if several and not first:
with open(path_properties, 'rb') as handle:
self.properties = pickle.load(handle)
self.path = path
self.key = key
self.cluster = cluster
self.for_prediction = for_prediction
self.prop = prop
self.prop_settings = prop_settings
# Define bounding box
if cluster:
self.bb_ges = bb
self.to_cluster = True
elif self.for_prediction:
self.to_cluster = False
self.bb = [self.prop['north'], self.prop['south'],
self.prop['west'], self.prop['east']]
if several and not first:
self.bb = [self.properties['interp_vectors']['y'][0],
self.properties['interp_vectors']['y'][-1],
self.properties['interp_vectors']['x'][0],
self.properties['interp_vectors']['x'][-1]]
else:
self.to_cluster = False
if several and not first:
self.bb = [self.properties['interp_vectors']['y'][0],
self.properties['interp_vectors']['y'][-1],
self.properties['interp_vectors']['x'][0],
self.properties['interp_vectors']['x'][-1]]
else:
self.bb = bb
self.path_properties = path_properties
if no_data_value != 'None':
self.no_data = no_data_value.split(',')
self.no_data = [float(val) for val in self.no_data]
else:
self.no_data = no_data_value
self.categorical = categorical
self.several = several
self.several_same = several_same
self.first = first
# Define limits to determine interpolation approach for dataset
self.limit_org = 50000000
self.limit_interp = 7500000
self.size = 200
self.overlap = 100
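        # size is the edge length (in pixels) of the square chunks,
        # overlap the number of pixels shared between neighboring chunks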
        # Import the raw dataset
        self.data, self.x_org, self.y_org = import_raw_dataset(
            self.path, self.no_data, prop_settings['no_value'])
# If training locations are clustered
if self.to_cluster:
self.x_raw = self.x_org
self.y_raw = self.y_org
self.data_raw = self.data
            def parallelized_interpolation(num):
# Interpolate the cut dataset
a = self.interpolate_dataset(
self.subsets[num],
self.y_orgs[num],
self.x_orgs[num],
self.ds['Longitude' + str(num)][:].data,
self.ds['Latitude' + str(num)][:].data)
                # Save the interpolated dataset to the nc file (first
                # dataset) or update the existing variable for the
                # second and following datasets
if self.first_dataset:
result = self.ds.createVariable(
'Result' + str(num),
'f4',
('lat' + str(num), 'lon' + str(num)))
result[:, :] = a
else:
self.ds['Result' + str(num)][:, :] = a
self.subsets = []
self.x_orgs = []
self.y_orgs = []
self.cuttables = []
self.first_dataset = False
# Iterate over all bounding boxes of
# the clustered training locations
for count, self.bb in enumerate(self.bb_ges):
self.x_org = self.x_raw
self.y_org = self.y_raw
self.data = self.data_raw
                # Check that the current bounding box is
                # covered by the extent of the dataset
self.compare_extends()
self.cuttables.append(self.cuttable)
# Cut the original dataset to the
# currently considered bounding box
self.cut_to_boundingbox()
# Store cut properties to be used in the interpolation
self.subsets.append(self.data)
self.x_orgs.append(self.x_org)
self.y_orgs.append(self.y_org)
if not os.path.isfile('tmp.nc') or self.first_dataset:
if count == 0:
                        # Open temporary file to store the
                        # interpolated subsets of the dataset
self.ds = generate_basic_ncfile('tmp.nc')
self.first_dataset = True
# Determine the x and y vectors for interpolation
self.determine_reference_vectors()
# Saving the interpolation vectors to the temporary file
self.ds.createDimension('lat' + str(count), len(self.y))
self.ds.createDimension('lon' + str(count), len(self.x))
longitude = self.ds.createVariable(
'Longitude' + str(count),
'f4',
'lon' + str(count))
latitude = self.ds.createVariable(
'Latitude' + str(count),
'f4',
'lat' + str(count))
longitude[:] = self.x
latitude[:] = self.y
elif (os.path.isfile('tmp.nc')
and not self.first_dataset and count == 0):
# If it's not the first dataset to be cut, open the nc file
self.ds = nc.Dataset('tmp.nc', mode='a')
self.one_go, self.as_chunks, self.as_cols = True, False, False
# Final decision whether cutting and interpolation is possible
            self.cuttable = False not in self.cuttables
            # Interpolate all subsets (serial loop; the parallel version
            # is kept for reference)
            # Parallel(n_jobs=5, backend='threading', timeout=999999)(
            #     delayed(parallelized_interpolation)(num)
            #     for num in range(len(self.bb_ges)))
            for num in range(len(self.bb_ges)):
                parallelized_interpolation(num)
self.ds.close()
elif self.for_prediction:
            def interpolate_chunk(i):
ref = self.interpolate_dataset(
np.array(chunks_old[i]),
np.array(np.linspace(
self.y_org[pixels_old[i][0]],
self.y_org[pixels_old[i][1]],
abs(pixels_old[i][1]-pixels_old[i][0]))),
np.array(np.linspace(
self.x_org[pixels_old[i][2]],
self.x_org[pixels_old[i][3]],
abs(pixels_old[i][3]-pixels_old[i][2]))),
self.x_final[i],
self.y_final[i])
return ref
self.compare_extends()
# If bounding box is within limits of dataset
if self.cuttable:
self.cut_to_boundingbox() # Cut to the bounding box
# Determine interpolation vectors
self.determine_reference_vectors()
# Depending on dataset size determine interpolation approach
self.determine_interpolation_approach()
if self.one_go:
# Interpolate dataset
self.array = self.interpolate_dataset(
self.data,
self.y_org,
self.x_org,
self.x,
self.y)
# If original dataset has to be split into chunks
elif self.as_chunks:
# Split the dataset into chunks
chunks_old, pixels_old = self.split_into_chunks()
# Determine interpolation vectors for each chunk
self.determine_new_vector()
                    # A parallel version is kept for reference:
                    # ref_tmp = Parallel(n_jobs=5,
                    #                    backend='threading',
                    #                    timeout=999999)(
                    #     delayed(interpolate_chunk)(num)
                    #     for num in range(len(self.x_final)))
                    ref_tmp = []
                    for num in range(len(self.x_final)):
                        ref_tmp.append(interpolate_chunk(num))
# Combine the individual interpolated
# chunks into one dataset
self.array = self.reshape_chunks(ref_tmp)
elif self.as_cols:
self.split_into_chunks() # Split the dataset into chunks
ref_tmp = []
# Go through all chunks and interpolate them individually
for i in range(len(self.x_final)):
ref = self.interpolate_dataset(self.data,
self.y_org,
self.x_org,
self.x_final[i],
self.y_final[i])
ref_tmp.append(list(ref))
# Combine the individual interpolated
# chunks into one dataset
self.array = self.reshape_chunks(ref_tmp)
# If a path is provided, the cutting and interpolation
# information is saved in a pickle file
if self.path_properties is not None:
with open(self.path_properties, 'wb') as handle:
pickle.dump(self.properties, handle)
else:
# Check if bounding box is covered by limits of dataset
self.compare_extends()
# If bounding box is within limits of dataset
if self.cuttable:
self.cut_to_boundingbox() # Cut to the bounding box
# Determine interpolation vectors
self.determine_reference_vectors()
# Depending on dataset size determine interpolation approach
self.determine_interpolation_approach()
# If interpolation can be done in one go
if self.one_go:
# Interpolate dataset
self.array = self.interpolate_dataset(self.data,
self.y_org,
self.x_org,
self.x,
self.y)
# If original dataset has to be split into chunks
elif self.as_chunks:
# Split the dataset into chunks
chunks_old, pixels_old = self.split_into_chunks()
# Determine interpolation vectors for each chunk
self.determine_new_vector()
ref_tmp = []
# Go through all chunks and interpolate them individually
for i in range(len(chunks_old)):
ref = self.interpolate_dataset(
np.array(chunks_old[i]),
np.array(np.linspace(
self.y_org[pixels_old[i][0]],
self.y_org[pixels_old[i][1]],
abs(pixels_old[i][1]-pixels_old[i][0]))),
np.array(np.linspace(
self.x_org[pixels_old[i][2]],
self.x_org[pixels_old[i][3]],
abs(pixels_old[i][3]-pixels_old[i][2]))),
self.x_final[i],
self.y_final[i])
ref_tmp.append(list(ref))
# Combine the individual interpolated
# chunks into one dataset
self.array = self.reshape_chunks(ref_tmp)
elif self.as_cols:
self.split_into_chunks() # Split the dataset into chunks
ref_tmp = []
# Go through all chunks and interpolate them individually
for i in range(len(self.x_final)):
ref = self.interpolate_dataset(self.data,
self.y_org,
self.x_org,
self.x_final[i],
self.y_final[i])
ref_tmp.append(list(ref))
# Combine the individual interpolated
# chunks into one dataset
self.array = self.reshape_chunks(ref_tmp)
# If a path is provided, the cutting and interpolation
# information is saved in a pickle file
if self.path_properties is not None:
with open(self.path_properties, 'wb') as handle:
pickle.dump(self.properties, handle)
def compare_extends(self):
"""
Determine if the bounding box to which the dataset shall be cut is
completely covered by the dataset.
If not, the execution of the script will be aborted.
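        Example: a dataset spanning latitudes 40 to 50 deg cannot be cut
        to a bounding box with ymax = 55 deg; top_too_short is then set
        and cuttable becomes False.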
"""
self.cuttable = True
self.left_too_short = False
self.right_too_short = False
self.bottom_too_short = False
self.top_too_short = False
        # Shift latitudes to [0, 180] and longitudes to [0, 360] so the
        # extents can be compared with simple inequalities
        y = [90 + coord for coord in
             [self.y_org[0], self.y_org[-1], self.bb[0], self.bb[1]]]
        x = [180 + coord for coord in
             [self.x_org[0], self.x_org[-1], self.bb[2], self.bb[3]]]
if y[2] > y[0]:
self.top_too_short = True
if y[3] < y[1]:
self.bottom_too_short = True
if x[2] < x[0]:
self.left_too_short = True
if x[3] > x[1]:
self.right_too_short = True
if (self.bottom_too_short or self.top_too_short
or self.left_too_short or self.right_too_short):
self.cuttable = False
self.array = None
self.x = None
self.y = None
return self.cuttable
def cut_to_boundingbox(self):
"""
Cut the dataset to the bounding box
"""
if self.several_same and not self.first:
# Load the indices of the bounding box from the properties file
self.top = self.properties['boundaries']['top']
self.bottom = self.properties['boundaries']['bottom']
self.left = self.properties['boundaries']['left']
self.right = self.properties['boundaries']['right']
else:
            # If several datasets shall be interpolated one after another
            # and the current run is the first dataset
            if (self.several or self.several_same) and self.first:
# Open empty dictionary to store the cutting and
# interpolation information in
self.properties = {}
# Determine if the coordinate vectors
# contain both pos and neg values
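            # Example: with a longitude vector crossing zero, argmin over
            # the raw differences could snap to a coordinate on the wrong
            # side of the zero meridian, so the search below is restricted
            # to values with the same sign as the bounding-box edge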
if (all(val >= 0 for val in self.x_org)
or all(val <= 0 for val in self.x_org)):
# Determine pixel index of left and right edge of bounding box
self.left = int((np.abs(self.x_org - self.bb[2])).argmin())
self.right = int((np.abs(self.x_org - self.bb[3])).argmin())
else:
if self.bb[2] <= 0:
tmp = [x for x in self.x_org if x <= 0]
else:
tmp = [x for x in self.x_org if x >= 0]
self.left = list(self.x_org).index(
tmp[int((np.abs(np.array(tmp) - self.bb[2])).argmin())])
if self.bb[3] <= 0:
tmp = [x for x in self.x_org if x <= 0]
else:
tmp = [x for x in self.x_org if x >= 0]
self.right = list(self.x_org).index(
tmp[int((np.abs(np.array(tmp) - self.bb[3])).argmin())])
if (all(val >= 0 for val in self.y_org)
or all(val <= 0 for val in self.y_org)):
# Determine pixel index of top and bottom edge of bounding box
self.top = int((np.abs(self.y_org - self.bb[0])).argmin())
self.bottom = int((np.abs(self.y_org - self.bb[1])).argmin())
else:
if self.bb[0] <= 0:
tmp = [y for y in self.y_org if y <= 0]
else:
tmp = [y for y in self.y_org if y >= 0]
self.top = list(self.y_org).index(
tmp[int((np.abs(np.array(tmp) - self.bb[0])).argmin())])
if self.bb[1] <= 0:
tmp = [y for y in self.y_org if y <= 0]
else:
tmp = [y for y in self.y_org if y >= 0]
self.bottom = list(self.y_org).index(
tmp[int((np.abs(np.array(tmp) - self.bb[1])).argmin())])
            # Add a 100-pixel buffer in all directions
            # to account for rounding issues
if not self.for_prediction:
if self.left-100 >= 0:
self.left = self.left - 100
if self.top-100 >= 0:
self.top = self.top - 100
if self.bottom+100 <= np.shape(self.data)[0]:
self.bottom = self.bottom + 100
if self.right+100 <= np.shape(self.data)[1]:
self.right = self.right + 100
if self.several_same and self.first:
# Store the indices to be used again with the next dataset
self.properties['boundaries'] = {}
self.properties['boundaries']['top'] = self.top
self.properties['boundaries']['bottom'] = self.bottom
self.properties['boundaries']['left'] = self.left
self.properties['boundaries']['right'] = self.right
# Cut the dataset and x, y vectors to the determined extent
self.data = self.data[self.top:self.bottom, self.left:self.right]
self.x_org = self.x_org[self.left:self.right]
self.y_org = self.y_org[self.top:self.bottom]
def determine_reference_vectors(self):
"""
Determine interpolation vectors x and y.
"""
        # If several datasets shall be interpolated one after another
        # and the current run is the first dataset
if self.several and self.first:
# Determine distance in meters in x and y
# direction between bounds of dataset
point1_x = LatLon(Latitude(self.y_org[0]),
Longitude(self.x_org[0]))
point2_x = LatLon(Latitude(self.y_org[0]),
Longitude(self.x_org[-1]))
distance_x = point1_x.distance(point2_x)*1000
point1_y = LatLon(Latitude(self.y_org[0]),
Longitude(self.x_org[0]))
point2_y = LatLon(Latitude(self.y_org[-1]),
Longitude(self.x_org[0]))
distance_y = point1_y.distance(point2_y)*1000
# Determine interpolation vector with desired resolution
self.x = np.linspace(
self.x_org[0],
self.x_org[-1],
int(distance_x/self.prop_settings['resolution']))
self.y = np.linspace(
self.y_org[0],
self.y_org[-1],
int(distance_y/self.prop_settings['resolution']))
# Store interpolation vector in properties file
self.properties['interp_vectors'] = {}
self.properties['interp_vectors']['x'] = self.x
self.properties['interp_vectors']['y'] = self.y
# If only one dataset shall be interpolated
elif not self.several:
# Determine distance in meters in x and y
# direction between bounds of dataset
point1_x = LatLon(Latitude(self.y_org[0]),
Longitude(self.x_org[0]))
point2_x = LatLon(Latitude(self.y_org[0]),
Longitude(self.x_org[-1]))
distance_x = point1_x.distance(point2_x)*1000
point1_y = LatLon(Latitude(self.y_org[0]),
Longitude(self.x_org[0]))
point2_y = LatLon(Latitude(self.y_org[-1]),
Longitude(self.x_org[0]))
distance_y = point1_y.distance(point2_y)*1000
# Determine interpolation vector with desired resolution
self.x = np.linspace(
self.x_org[0],
self.x_org[-1],
int(distance_x/self.prop_settings['resolution']))
self.y = np.linspace(
self.y_org[0],
self.y_org[-1],
int(distance_y/self.prop_settings['resolution']))
        # If several datasets shall be interpolated one after another
        # and the current run is not the first dataset
elif self.several and not self.first:
self.x = np.array(self.properties['interp_vectors']['x'])
self.y = np.array(self.properties['interp_vectors']['y'])
def determine_new_vector(self):
"""
Determine interpolation vectors for the chunks.
"""
# For each chunk determine the original x and y vectors
x_ref = [[self.x_org[self.x_limits[i][0]],
self.x_org[self.x_limits[i][1]]]
for i in range(len(self.x_limits))]
y_ref = [[self.y_org[self.y_limits[i][0]],
self.y_org[self.y_limits[i][1]]]
for i in range(len(self.y_limits))]
self.x_final = []
self.y_final = []
for j in range(np.shape(x_ref)[0]):
ind_min_x = int((np.abs(self.x - x_ref[j][0])).argmin())
ind_max_x = int((np.abs(self.x - x_ref[j][1])).argmin())
self.x_final.append(self.x[ind_min_x:ind_max_x])
for j in range(np.shape(y_ref)[0]):
ind_min_y = int((np.abs(self.y - y_ref[j][0])).argmin())
ind_max_y = int((np.abs(self.y - y_ref[j][1])).argmin())
self.y_final.append(self.y[ind_min_y:ind_max_y])
def split_into_chunks(self):
"""
Split the dataset into chunks for interpolation
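        Example (size=200, overlap=100): a 1000 x 1000 raster yields
        num_y = num_x = 4 equally sized chunks per direction (the
        remainder is merged into the last row and column); each equally
        sized chunk spans size-1+overlap = 299 pixels.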
"""
# If the dataset needs to be split into chunks
if self.as_chunks:
y_len, x_len = np.shape(self.data)[0], np.shape(self.data)[1]
            # Split into equally sized chunks; the bottom row and right
            # column, which have a different shape, are treated separately
plus_y = self.data.shape[0] % self.size
plus_x = self.data.shape[1] % self.size
# Number of equal sized chunks in x and y direction
num_y = int(self.data.shape[0] / self.size)
num_x = int(self.data.shape[1] / self.size)
            # If the final row and column are too small to form individual
            # chunks, combine them with the second to last row and column
if plus_y < 2/3*self.size:
num_y = num_y - 1
if plus_x < 2/3*self.size:
num_x = num_x - 1
self.num_y = num_y
self.num_x = num_x
            chunks = []  # Store the chunks
            # Store the pixel limits to access the original coordinates
            pixels = []
            # Store the coordinate limits to access the original coordinates
            self.x_limits = []
            self.y_limits = []
            # Save the chunks in a list
for i in range(num_y):
for j in range(num_x):
if ((i+1)*self.size-1+self.overlap <= self.data.shape[0]) and ((j+1)*self.size-1+self.overlap <= self.data.shape[1]):
chunks.append(
list(self.data[i*self.size:(i+1)*self.size-1+self.overlap,
j*self.size:(j+1)*self.size-1+self.overlap]))
pixels.append(
[i*self.size, (i+1)*self.size-1+self.overlap,
j*self.size, (j+1)*self.size-1+self.overlap])
self.x_limits.append([j*self.size, (j+1)*self.size-1+self.overlap])
self.y_limits.append([i*self.size, (i+1)*self.size-1+self.overlap])
elif ((i+1)*self.size-1+self.overlap > self.data.shape[0]) and ((j+1)*self.size-1+self.overlap <= self.data.shape[1]):
chunks.append(
list(self.data[i*self.size:,
j*self.size:(j+1)*self.size-1+self.overlap]))
pixels.append(
[i*self.size, np.shape(self.data)[0]-1,
j*self.size, (j+1)*self.size-1+self.overlap])
elif ((j+1)*self.size-1+self.overlap > self.data.shape[1]) and ((i+1)*self.size-1+self.overlap <= self.data.shape[0]):
chunks.append(
list(self.data[i*self.size:(i+1)*self.size-1+self.overlap,
j*self.size:]))
pixels.append(
[i*self.size, (i+1)*self.size-1+self.overlap,
j*self.size, np.shape(self.data)[1]-1])
elif ((j+1)*self.size-1+self.overlap > self.data.shape[1]) and ((i+1)*self.size-1+self.overlap > self.data.shape[0]):
chunks.append(
list(self.data[i*self.size:,
j*self.size:]))
pixels.append(
[i*self.size, np.shape(self.data)[0]-1,
j*self.size, np.shape(self.data)[1]-1])
            # Chunks in the bottom row
for j in range(num_x):
if ((j+1)*self.size-1+self.overlap <= self.data.shape[1]):
chunks.append(
list(self.data[(num_y)*self.size:-1,
j*self.size:(j+1)*self.size-1+self.overlap]))
pixels.append(
[(num_y)*self.size, np.shape(self.data)[0]-1,
j*self.size, (j+1)*self.size-1+self.overlap])
self.x_limits.append([j*self.size, (j+1)*self.size-1+self.overlap])
self.y_limits.append([(num_y)*self.size, np.shape(self.data)[0]-1])
else:
chunks.append(
list(self.data[(num_y)*self.size:-1,
j*self.size:]))
pixels.append(
[(num_y)*self.size, np.shape(self.data)[0]-1,
j*self.size, np.shape(self.data)[1]-1])
self.x_limits.append([j*self.size, (j+1)*self.size-1])
            # Chunks in the rightmost column
for j in range(num_y):
if ((j+1)*self.size-1+self.overlap <= self.data.shape[0]):
chunks.append(
list(self.data[j*self.size:(j+1)*self.size-1+self.overlap,
(num_x)*self.size:-1]))
pixels.append(
[j*self.size, (j+1)*self.size-1+self.overlap,
(num_x)*self.size, x_len-1])
self.y_limits.append([j*self.size, (j+1)*self.size-1+self.overlap])
self.x_limits.append([(num_x)*self.size, x_len-1])
else:
chunks.append(
list(self.data[j*self.size:-1,
(num_x)*self.size:-1]))
pixels.append(
[j*self.size, np.shape(self.data)[0]-1,
(num_x)*self.size, x_len-1])
self.y_limits.append([j*self.size, (j+1)*self.size-1])
# Chunk bottom right
chunks.append(
list(self.data[num_y*self.size:-1,
num_x*self.size:-1]))
pixels.append(
[num_y*self.size, y_len-1,
num_x*self.size, x_len-1])
# Save corner indices for the chunks
self.x_limits.append([num_x*self.size, x_len-1])
self.y_limits.append([num_y*self.size, y_len-1])
return chunks, pixels
        # If the dataset is interpolated column-wise
elif self.as_cols:
chunks, pixels = None, None
self.x_limits = [[], [], [], [], [], [], [], []]
# Determine columns to be interpolated in each chunk
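            # The column indices are dealt out in consecutive blocks of
            # eight, e.g. x_limits[0] = [0, 8, 16, ...],
            # x_limits[1] = [1, 9, 17, ...]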
i = 0
while i <= len(self.x):
for j in range(len(self.x_limits)):
if i+j <= len(self.x)-1:
self.x_limits[j].append(i + j)
i = i + j + 1
# Determine the coordinates in the interpolation vector
self.x_final = [[], [], [], [], [], [], [], []]
self.y_final = []
for i in range(len(self.x_limits)):
for j in self.x_limits[i]:
self.x_final[i].append(self.x[j])
self.y_final.append(list(self.y))
def determine_interpolation_approach(self):
"""
Depending on the siz of the original dataset and the size of the
dataset after the interpolation, the computational
power might be exceeded and the dataset needs to be
split up to be interpolated.
Different cases are covered in this function and depending
on the sizes, the approach is determined.
Approaches:
one_go: dataset before and after interpolation small
enough to be interpolated in one go
as_chunks: dataset before interpolation already so large that
it needs to be split into chunks which then
area interpolated independently
as_cols: dataset after interpolation so large,
that interpolation is done columnwise
"""
        # If this is the first of several datasets
        # or only a single dataset is interpolated
if (self.several and self.first) or not self.several:
if len(self.x_org) < 2*self.size and len(self.y_org) < 2*self.size:
self.one_go, self.as_chunks, self.as_cols = True, False, False
else:
# Assessment of the size of the dataset before and after
# interpolation and comparison with manually defined limit
# to decide for interpolation approach
if ((len(self.x) * len(self.y) < self.limit_interp)
and (len(self.x_org) * len(self.y_org) < self.limit_org)):
self.one_go, self.as_chunks, self.as_cols = True, False, False
elif len(self.x_org) * len(self.y_org) >= self.limit_org:
self.one_go, self.as_chunks, self.as_cols = False, True, False
elif (len(self.x) * len(self.y) > self.limit_interp):
self.one_go, self.as_chunks, self.as_cols = False, False, True
if self.several and self.first:
# Store the interpolation approach in the properties file
self.properties['interp_approach'] = {}
self.properties['interp_approach']['one_go'] = self.one_go
self.properties['interp_approach']['as_chunks'] = self.as_chunks
self.properties['interp_approach']['as_cols'] = self.as_cols
        # If several datasets shall be interpolated one after another
        # and the current run is not the first dataset
elif self.several and not self.first:
# Load the interpolation approach from the properties file
self.one_go = self.properties['interp_approach']['one_go']
self.as_chunks = self.properties['interp_approach']['as_chunks']
self.as_cols = self.properties['interp_approach']['as_cols']
def interpolate_dataset(self, data, y, x, x_new, y_new):
"""
Interpolate dataset. Categorical data is interpolated using
nearest neighbor first into x direction then into y direction
Input:
data: data to interpolate, depending on the interpolation
appraoch the whole dataset or a chunk
y: original y vector
x: original x vector
x_new: interpolation vector x
y_new: interpolation vector y
Return:
data_interp: interpolated data
"""
# Interpolation vectors
x_new = np.array(x_new)
y_new = np.array(y_new)
        # Make sure that no-data values do not corrupt the interpolation
data = data.astype(float)
data[data == self.prop_settings['no_value']] = np.nan
        if self.categorical == False:
            data = np.flipud(data)
            # Zero-fill nan pixels so that interp2d receives finite values
            filled_z = data.copy()
            filled_z[np.isnan(data)] = 0
            if self.prop_settings['no_value'] is not None:
                nan_map = np.zeros_like(data)
                nan_map[np.isnan(data)] = 1
# Interpolation
f = interp2d(x, np.flip(y), filled_z, kind='linear')
data_interp = f(x_new, y_new)
            if self.prop_settings['no_value'] is not None:
                f_nan = interp2d(x, np.flip(y), nan_map, kind='linear')
                nan_new = f_nan(x_new, y_new)
                # Set all pixels affected by nan values
                # to the no-data value
                data_interp[nan_new > 0] = self.prop_settings['no_value']
return np.flipud(data_interp)
# If data is categorical
        elif self.categorical == True:
            # Zero-fill nan pixels; keep a 0/1 nan mask if a no-data
            # value is defined
            filled_z = data.copy()
            filled_z[np.isnan(data)] = 0
            if self.prop_settings['no_value'] is not None:
                nan_map = np.zeros_like(data)
                nan_map[np.isnan(data)] = 1
            # Define empty arrays to be filled
data_interp_x = np.zeros((len(y), len(x_new)))
nan_interp_x = np.zeros((len(y), len(x_new)))
# Interpolate first in x direction
for i in range(len(y)):
tmp = filled_z[i, :]
f = interp1d(x, tmp, kind='nearest', fill_value="extrapolate")
data_interp_x[i, :] = f(x_new)
            if self.prop_settings['no_value'] is not None:
tmp = nan_map[i, :]
f = interp1d(x, tmp, kind='nearest', fill_value="extrapolate")
nan_interp_x[i, :] = f(x_new)
        # Define empty arrays to be filled
        nan_interp = np.zeros((len(y_new), len(x_new)))
        data_interp = np.zeros((len(y_new), len(x_new)))
# Then interpolate in y direction
for i in range(len(x_new)):
tmp = data_interp_x[:, i]
f = interp1d(y, tmp, kind='nearest', fill_value="extrapolate")
data_interp[:, i] = f(y_new)
            if self.prop_settings['no_value'] is not None:
tmp = nan_interp_x[:, i]
f = interp1d(y, tmp, kind='nearest', fill_value="extrapolate")
nan_interp[:, i] = f(y_new)
        # Set all pixels affected by nan values to the no-data value
        if self.prop_settings['no_value'] is not None:
data_interp[nan_interp > 0] = self.prop_settings['no_value']
return data_interp
def reshape_chunks(self, chunks):
"""
Interpolated chunks are attached to form the interpolated dataset.
The chunks overlap and for categorical features, only one version
is used. For continuous features, the overlapping parts are averaged.
Input:
chunks: interpolated chunks, list of lists
"""
if self.as_chunks:
array = np.zeros((len(self.y), len(self.x)))
aa = np.zeros((len(self.y), len(self.x)))
test = np.zeros((len(self.y), len(self.x)))
shape_x, shape_y = [], []
for chunk in chunks:
shape_x.append(np.shape(np.array(chunk))[1])
shape_y.append(np.shape(np.array(chunk))[0])
for count, chunk in enumerate(chunks):
xt = int((np.abs(self.x - self.x_final[count][0])).argmin())
yt = int((np.abs(self.y - self.y_final[count][0])).argmin())
tmp = np.array(chunks[count])
tmp1 = array[yt:yt+shape_y[count], xt:xt+shape_x[count]]
aa[yt:yt+shape_y[count], xt:xt+shape_x[count]] = tmp
                # Overlap mask: pixels still empty (0) or holding the
                # no-data value (hard-coded here as -999)
                mask = (tmp1 == 0) | (tmp1 == -999) | (tmp == -999)
if not self.categorical:
# Calculate the element-wise average only where mask is False
average_array = np.zeros_like(tmp, dtype=float) # Initialize array for the result
average_array[~mask] = (tmp[~mask] + tmp1[~mask]) / 2
# Assign elements from arr2 where arr1 is equal to zero
average_array[mask] = tmp[mask]
array[yt:yt+shape_y[count], xt:xt+shape_x[count]] = average_array
tmp = np.ones_like(tmp, dtype=float)*count + 1
tmp1 = test[yt:yt+shape_y[count], xt:xt+shape_x[count]]
mask = (tmp1 == 0)
# Calculate the element-wise average only where mask is False
average_array = np.zeros_like(tmp, dtype=float) # Initialize array for the result
average_array[~mask] = (tmp[~mask] + tmp1[~mask]) / 2
# Assign elements from arr2 where arr1 is equal to zero
average_array[mask] = tmp[mask]
test[yt:yt+shape_y[count], xt:xt+shape_x[count]] = average_array
elif self.categorical:
average_array = np.zeros_like(tmp, dtype=float) # Initialize array for the result
average_array[~mask] = (tmp[~mask] + tmp1[~mask]) / 2
# Assign elements from arr2 where arr1 is equal to zero
average_array[mask] = tmp[mask]
array[yt:yt+shape_y[count], xt:xt+shape_x[count]] = tmp
test[yt:yt+shape_y[count], xt:xt+shape_x[count]] = average_array
self.test = test.copy()
elif self.as_cols:
# Final array to be filled
array = np.zeros((len(self.y), len(self.x)))
# Insert the columns of the individual
# chunks into the final dataset
for i in range(len(chunks)):
array[:, self.x_limits[i]] = np.array(chunks[i])
return array