Commit 88eae717 authored by Ann-Kathrin Margarete Edrich's avatar Ann-Kathrin Margarete Edrich

SHIRE version 1.0

parent d4ae7c20
Showing 6931 additions and 1 deletion
path,keys,no_value,continuous
\ No newline at end of file
path,keys,no_value,categorical
\ No newline at end of file
affine==2.3.1
attrs==22.2.0
certifi==2021.5.30
cftime==1.6.0
click==8.0.4
click-plugins==1.1.1
cligj==0.7.2
cycler==0.11.0
importlib-metadata==4.8.3
importlib-resources==5.4.0
joblib==1.1.1
kiwisolver==1.3.1
LatLon23==1.0.7
matplotlib==3.3.4
netCDF4==1.6.2
numpy==1.19.5
pandas==1.1.5
Pillow==8.4.0
pyparsing==3.1.1
pyproj==3.0.1
python-dateutil==2.8.2
pytz==2023.3.post1
rasterio==1.2.10
scikit-learn==0.24.2
scipy==1.5.4
six==1.16.0
sklearn==0.0
snuggs==1.4.7
threadpoolctl==3.1.0
tqdm==4.64.1
typing_extensions==4.1.1
zipp==3.6.0
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import pickle
import pandas as pd
import os
import logging
from utilities.initialise_log import save_log
class check_general_settings():
"""
This class imports the temporary pickle files saved with the
information provided by the user through the GUI and performs quality
control on the provided information.
The class attribute error then decides whether SHIRE aborts the run
or whether the next step is initialised.
The results of the quality check are saved in the log file
check_user_input.log.
"""
def __init__(self):
path = ['tmp_settings.pkl', 'tmp_map.pkl', 'tmp_train.pkl', 'tmp_pred.pkl']
self.error = False
save_path = 'check_user_input.log'
if os.path.exists(save_path):
os.remove(save_path)
self.logger = save_log(save_path)
self.logger.info("Start checking user input")
self.open_reference()
for self.path in path:
self.logger.info('Check: ' + self.path)
if os.path.exists(self.path):
self.logger.info(self.path + ': available')
self.open_file()
self.check_parameters()
if self.path == path[3]:
self.check_coordinates()
if self.path == path[3] or self.path == path[2]:
self.check_path_extension_geosummary()
else:
self.logger.info(self.path + ': not available')
self.logger.info('Check completed')
for handler in self.logger.handlers:
handler.close()
self.logger.removeHandler(handler)
def open_file(self, path=None):
if path is None:
path = self.path
with open(path, 'rb') as handle:
self.params = pickle.load(handle)
self.logger.info('Parameters loaded')
def open_reference(self):
self.ref = pd.read_csv('./utilities/properties_user_input.csv')
self.logger.info('Reference imported')
def compare_type(self, key, types):
type_map = {
'int': int,
'float': float,
'str': str,
'list': list,
'dict': dict,
'bool': bool
}
types = types.split(',')
types = [type_map.get(i) for i in types]
if type(self.params[key]) not in types:
self.logger.error(key + ': Wrong parameter type provided! Provide: ' + str(types))
self.error = True
def compare_extension(self, key, ext):
parts = self.params[key].split('.')
if len(parts) != 2:
self.logger.error(key + ': Path must contain exactly one full stop, the one separating the file extension!')
self.error = True
ext = ext.split(',')
if parts[-1] not in ext:
self.logger.error(key + ': Wrong file format provided! Provide: ' + str(ext))
self.error = True
def compare_range(self, key, r):
r = r.split(',')
if r[1] == 'inf':
if self.params[key] < float(r[0]):
self.logger.error(key + ': Value not within range!')
self.error = True
else:
if self.params[key] < float(r[0]) or self.params[key] > float(r[1]):
self.logger.error(key + ': Value not within range!')
self.error = True
def check_coordinates(self):
if self.params['south'] >= self.params['north']:
if self.params['south'] == self.params['north']:
self.logger.error('Careful! South coordinate identical to north coordinate!')
else:
self.logger.error('Careful! South coordinate north of north coordinate!')
self.error = True
if self.params['west'] >= self.params['east']:
if self.params['west'] == self.params['east']:
self.logger.error('Careful! West coordinate identical to east coordinate!')
self.error = True
else:
if ((-10 < self.params['west'] < 0 and 0 < self.params['east'] < 10)
or (self.params['west'] > 170 and self.params['east'] < -170)):
self.logger.warning('Careful! Please check east and west coordinates!')
else:
self.logger.error('Careful! West coordinate east of east coordinate!')
self.error = True
def check_file_exist(self, key, path):
if not os.path.exists(path) and not os.path.isdir(os.path.dirname(path)):
self.logger.error(key + ': Path or file does not exist!')
self.error = True
def check_path_extension_geosummary(self):
self.logger.info('Check paths in geospatial data summary')
summary = pd.read_csv(self.params['geo_path'])
keys_to_include = pd.read_csv(self.params['feat_path'])
for key in list(keys_to_include['keys_to_include']):
idx = list(summary['keys']).index(key)
if summary.at[idx, 'path'].split('.')[1] not in ['nc', 'tif', 'tiff']:
self.logger.error(key + ': Wrong file format!')
self.error = True
if not os.path.exists(summary.at[idx, 'path']):
self.logger.error(key + ': File cannot be found!')
self.error = True
def check_parameters(self):
ref_keys = self.ref['key'].tolist()
for key in list(self.params.keys()):
idx = ref_keys.index(key)
self.logger.info('Check ' + key + ' | is path: ' + str(self.ref.at[idx, 'path']) + ' | Range: ' + str(self.ref.at[idx, 'range']) + ' | Extension: ' + str(self.ref.at[idx, 'extension']) + ' | Type: ' + str(self.ref.at[idx, 'type']))
if self.ref.at[idx, 'path'] == 1:
#print(self.ref.at[idx, 'path'])
self.check_file_exist(key, self.params[key])
if self.ref.at[idx, 'range'] != 'None':
self.compare_range(key, self.ref.at[idx, 'range'])
if self.ref.at[idx, 'extension'] != 'None' and self.ref.at[idx, 'path'] == 1:
self.compare_extension(key, self.ref.at[idx, 'extension'])
if self.ref.at[idx, 'type'] != 'None':
self.compare_type(key, self.ref.at[idx, 'type'])
\ No newline at end of file
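# Usage sketch (hypothetical): running the quality check after the GUI has written
# the tmp_*.pkl files, mirroring how the main script below uses the class. Aborting
# via SystemExit is only illustrative; the main script logs the error instead.
from check_user_input import check_general_settings

check = check_general_settings()     # runs all checks and writes check_user_input.log
if check.error:
    raise SystemExit('Invalid user input - see check_user_input.log')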
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import numpy as np
import os
import netCDF4 as nc
from tqdm import tqdm
from utilities.ncfile_generation import generate_3dncfile
from utilities.cut_and_interpolate_gui import cut_and_interpolate
from utilities.strings_for_ncfile import char_to_string, features_to_char
class generate_data_matrix:
"""
This class generates a nc-file containing all datasets,
a list of all contained features and their respective longitude and
latitude vectors. Provided are interpolated excerpts of the datasets
determined by the provided bounding box.
Input:
from_scratch: boolean, True if nc-file should be generated from
scratch, otherwise false
delete: True if dataset/feature should be deleted from nc-file
False if dataset should be added to existing nc-file
(careful: from_scratch needs to be False!)
bb: list, bounding box in the format
[<y_max>, <y_min>, <x_min>, <x_max>]
data_to_handle: list of features that should be added/deleted
datasets need to be listed in list_of_raw_datasets
keys_already_included: list of already included features in the
training/prediction dataset
(from_scratch=False, delete=False)
Output:
netCDF4 file
"""
def __init__(self,
from_scratch=True,
delete=False,
dataset='undefined',
bb=None,
data_to_handle=None,
geo_overview=None,
settings=None,
settings_train_pred=None,
keys_already_included=None):
self.from_scratch = from_scratch
self.dataset = dataset
self.bb = bb
self.keys_already_included = keys_already_included
self.settings_train_pred = settings_train_pred
self.keys = geo_overview['keys'].tolist()
self.raw_datasets_path = geo_overview['path'].tolist()
self.data_no_value = geo_overview['no_value'].tolist()
self.geo_overview = geo_overview
self.category = []
for x in geo_overview['categorical'].tolist():
if x == 0:
self.category.append(False)
else:
self.category.append(True)
self.settings = settings
self.data_to_handle = data_to_handle
if not from_scratch:
self.delete = delete
self.import_cube()
if delete:
# Delete dataset from cube
self.delete_dataset()
else:
# Add dataset to cube
self.add_dataset()
else:
# Generate cube from scratch
self.main()
def find_dataset(self, key):
"""
Find the index of a single feature in the list of features
contained in the nc-file.
Return:
idx: index of the feature
"""
return self.features.index(key)
def add_dataset(self):
# Number of overlapping features between datasets in the cube
# and the datasets to add/delete
print('features')
print(self.features)
if self.dataset == 'prediction':
for_prediction = True
else:
for_prediction = False
# Define new cube in the size of existing cube of cut and interpolated
# datasets with the depth equaling the number of existing datasets plus
# the ones to add
ges = list(self.features) + [x for x in self.data_to_handle if x not in self.features]
cube = np.zeros((np.shape(self.cube)[0],
np.shape(self.cube)[1],
len(ges)))
for feat in self.features:
cube[:, :, ges.index(feat)] = self.cube[:, :, self.features.index(feat)]
for key in self.data_to_handle:
s = cut_and_interpolate(
key=key,
path=self.raw_datasets_path[self.keys.index(key)],
no_data_value=self.data_no_value[self.keys.index(key)],
categorical=list(self.geo_overview['categorical'])[self.keys.index(key)],
several=True,
several_same=False,
first=False,
#bb=self.bb,
for_prediction=for_prediction,
prop_settings=self.settings,
prop=self.settings_train_pred,
path_properties=self.folder
+ '/data_combined_'
+ self.dataset
+ '_'
+ str(self.settings['resolution']) + '.pkl')
array, _, _, cuttable = s.array, s.x, s.y, s.cuttable
if not cuttable:
print('Error! Bounding box larger than dataset!'
' Please adapt bounding_box!')
break
else:
# Store it at respective position in cube
# Add cut and interpolated dataset to cube
cube[:, :, ges.index(key)] = array
# Save the updated cube to nc file
self.determine_outfile()
self.char_features = features_to_char(ges)
generate_3dncfile(self.outfile,
self.x,
self.y,
cube,
len(ges),
self.char_features,
crs='wgs84',
data_unit=None,
missing_value=self.settings['no_value'])
def delete_dataset(self):
"""
Delete the datasets listed in data_to_handle
from the nc-file and save a new nc-file
"""
# Determine indices of the datasets that shall be removed
idx = []
for data in self.data_to_handle:
idx.append(self.find_dataset(data))
# Define new cube in the size of existing cube of
# cut and interpolated datasets
cube = np.zeros((np.shape(self.cube)[0],
np.shape(self.cube)[1],
np.shape(self.cube)[2]-len(self.data_to_handle)))
count = 0
# Go through the datasets and transfer
# all datasets except for them to be removed
for i in range(np.shape(self.cube)[2]):
if self.features[i] not in self.data_to_handle:
cube[:, :, count] = self.cube[:, :, i]
count = count + 1
# Update the feature list
for data in self.data_to_handle:
self.features.remove(data)
# Save new data cube
self.determine_outfile()
self.char_features = features_to_char(self.features)
generate_3dncfile(self.outfile,
self.x,
self.y,
cube,
len(self.features),
self.char_features,
crs='wgs84',
data_unit=None,
missing_value=self.settings['no_value'])
def import_cube(self):
"""
Existing nc-file is imported for adding/deleting another feature.
"""
self.determine_outfile() # Determine where cube is stored
# Import cube
self.ds = nc.Dataset(self.outfile)
self.cube = self.ds['Result'][:, :, :].data
self.x = self.ds['Longitude'][:].data
self.y = self.ds['Latitude'][:].data
self.features = self.ds['features'][:].data
self.features = char_to_string(self.features)
def determine_outfile(self):
"""
Determine whether folder to store the nc-file already exists.
If not, it is created. Outfile path is determined.
"""
# Cube is stored in the same folder
# as the final training/prediction dataset
if self.dataset == 'training':
self.folder = os.path.dirname(self.settings_train_pred['train_path'])
self.outfile = self.folder + '/data_combined_training_' + str(self.settings['resolution']) + '.nc'
elif self.dataset == 'prediction':
self.folder = os.path.dirname(self.settings_train_pred['pred_path'])
self.outfile = self.folder + '/data_combined_prediction_' + str(self.settings['resolution']) + '.nc'
# Create folder if it doesn't yet exist
isExist = os.path.exists(self.folder)
if not isExist:
os.makedirs(self.folder)
def check_existence_datasets(self):
"""
Check if dataset exists
"""
# Check existence of all datasets that shall be pre-processed
for i in range(len(self.raw_datasets_path)):
self.all_exist = os.path.isfile(str(self.raw_datasets_path[i]))
if not self.all_exist:
print('Path '
+ str(self.raw_datasets_path[i])
+ ' does not exist!')
break
def main(self):
"""
Routine to pre-process the datasets from scratch
"""
#self.check_existence_datasets() # Check the existance of all datasets
#if self.all_exist: # If all datasets exist
# Go through all datasets that shall be pre-processed
for i in tqdm(range(len(self.data_to_handle))):
j = self.keys.index(self.data_to_handle[i])
print(self.data_to_handle[i])
if i == 0:
if self.dataset == 'prediction':
# Cut and interpolate dataset to desired resolution.
# Check script for information on input parameters.
s = cut_and_interpolate(
key=self.data_to_handle[i],
path=self.raw_datasets_path[j],
no_data_value=self.data_no_value[j],
categorical=self.category[j],
several=True,
several_same=False,
first=True,
bb=self.bb,
for_prediction=True,
prop_settings=self.settings,
prop=self.settings_train_pred,
path_properties=self.settings[
'pred_path'].rsplit('/', 1)[0]
+ '/data_combined_prediction_'
+ str(self.settings['resolution'])
+ '.pkl')
else:
# Cut and interpolate dataset to desired resolution.
# Check script for information on input parameters.
s = cut_and_interpolate(
key=self.data_to_handle[i],
path=self.raw_datasets_path[j],
no_data_value=self.data_no_value[j],
categorical=self.category[j],
several=True,
several_same=False,
first=True,
bb=self.bb,
prop_settings=self.settings,
prop=self.settings_train_pred,
path_properties=self.settings[
'train_path'].rsplit('/', 1)[0]
+ '/data_combined_training_'
+ str(self.settings['resolution'])
+ '.pkl')
array = s.array
self.x = s.x
self.y = s.y
cuttable = s.cuttable
if not cuttable:
print('Error! Bounding box larger than dataset!'
' Please adapt bounding_box!')
break
# Store cut and interpolated dataset in array
cube = np.zeros((np.shape(array)[0],
np.shape(array)[1],
len(self.data_to_handle)))
cube[:, :, 0] = array
else:
if self.dataset == 'prediction':
s = cut_and_interpolate(
key=self.data_to_handle[i],
path=self.raw_datasets_path[j],
no_data_value=self.data_no_value[j],
categorical=self.category[j],
several=True,
several_same=False,
first=False,
bb=self.bb,
for_prediction=True,
prop_settings=self.settings,
prop=self.settings_train_pred,
path_properties=self.settings[
'pred_path'].rsplit('/', 1)[0]
+ '/data_combined_prediction_'
+ str(self.settings['resolution'])
+ '.pkl')
else:
# Cut and interpolate dataset to desired resolution.
# Check script for information on input parameters.
s = cut_and_interpolate(
key=self.data_to_handle[i],
path=self.raw_datasets_path[j],
no_data_value=self.data_no_value[j],
categorical=self.category[j],
several=True,
several_same=False,
first=False,
bb=self.bb,
prop_settings=self.settings,
prop=self.settings_train_pred,
path_properties=self.settings[
'train_path'].rsplit('/', 1)[0]
+ '/data_combined_training_'
+ str(self.settings['resolution'])
+ '.pkl')
array, cuttable = s.array, s.cuttable
if not cuttable:
print('Error! Bounding box larger than dataset!'
' Please adapt bounding_box!')
print(self.data_to_handle[i])
break
# Store cut and interpolated dataset in array
cube[:, :, i] = array
# Store the array in a nc-file and meta data in pickle file
if cuttable:
self.determine_outfile()
self.char_features = features_to_char(self.data_to_handle)
generate_3dncfile(self.outfile,
self.x,
self.y,
cube,
len(self.data_to_handle),
self.char_features,
crs='wgs84',
data_unit=None,
missing_value=self.settings['no_value'])
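# Usage sketch (hypothetical): building the training data cube from scratch with the
# class defined above. The summary file name, output paths and bounding box are
# placeholders; the settings keys are the ones the class reads above.
import pandas as pd

geo_overview = pd.read_csv('geospatial_data_summary.csv')   # columns: path, keys, no_value, categorical
settings = {'resolution': 250, 'no_value': -999,
            'train_path': 'output/training_dataset.csv'}
s = generate_data_matrix(from_scratch=True,
                         dataset='training',
                         bb=[49.0, 47.0, 6.0, 9.0],          # [y_max, y_min, x_min, x_max]
                         data_to_handle=geo_overview['keys'].tolist(),
                         geo_overview=geo_overview,
                         settings=settings,
                         settings_train_pred={'train_path': settings['train_path']})
# The cut and interpolated layers end up in output/data_combined_training_250.nc,
# as determined by determine_outfile() above.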
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import os
import pickle
import tkinter as tk
from create_training_data_gui import *
from create_prediction_data_gui import *
from RandomForest_gui import *
from check_user_input import check_general_settings
from utilities.initialise_log import save_log
from utilities.gui import *
"""
This script controls the hazard mapping framework SHIRE.
Ensure a data summary csv file
and a csv file containing the keys to include have been prepared.
For more information please refer to the user manual.
"""
if os.path.isfile('tmp_settings.pkl'):
os.remove('tmp_settings.pkl')
if os.path.isfile('tmp_train.pkl'):
os.remove('tmp_train.pkl')
if os.path.isfile('tmp_pred.pkl'):
os.remove('tmp_pred.pkl')
if os.path.isfile('tmp_map.pkl'):
os.remove('tmp_map.pkl')
#Get the general settings
master = tk.Tk()
general_settings(master)
master.mainloop()
s = check_general_settings()
if os.path.exists('shire_run.log'):
os.remove('shire_run.log')
logger = save_log('shire_run.log')
logger.info('SHIRE has successfully been launched')
logger.info('User input required')
logger.info('General settings defined')
if s.error:
logger.info('There is an error in the user input. For more information, check check_user_input.log')
else:
if os.path.isfile('tmp_settings.pkl'):
with open('tmp_settings.pkl', 'rb') as handle:
properties_settings = pickle.load(handle)
master = tk.Tk()
if properties_settings['train'] == 1:
logger.info('Training dataset generation started')
s = create_training_data(master=master, log=logger)
os.remove('tmp_train.pkl')
logger = s.logger
if properties_settings['pred'] != 1 and properties_settings['map'] != 1:
for handler in logger.handlers:
handler.close()
logger.removeHandler(handler)
master.destroy()
master = tk.Tk()
if properties_settings['pred'] == 1:
logger.info('Prediction dataset generation started')
s = create_prediction_data(master=master, log=logger)
os.remove('tmp_pred.pkl')
logger = s.logger
if properties_settings['map'] != 1:
for handler in logger.handlers:
handler.close()
logger.removeHandler(handler)
master.destroy()
master = tk.Tk()
if properties_settings['map'] == 1:
logger.info('Map generation started')
with open('tmp_map.pkl', 'rb') as handle:
properties_map = pickle.load(handle)
if properties_map['training'] == 1 and properties_map['prediction'] == 1:
for mode in ['train_test', 'prediction']:
if mode == 'train_test':
s = RandomForest(master, mode, log=logger)
else:
if properties_map['parallel'] == 1:
s = RandomForest(master, mode, parallel=True, log=logger)
else:
s = RandomForest(master, mode, log=logger)
elif properties_map['training'] == 1 and properties_map['prediction'] == 0:
s = RandomForest(master, 'train_test', log=logger)
elif properties_map['prediction'] == 1 and properties_map['training'] == 0:
if properties_map['parallel'] == 1:
s = RandomForest(master, 'prediction', parallel=True, log=logger)
else:
s = RandomForest(master, 'prediction', log=logger)
os.remove('tmp_map.pkl')
logger = s.logger
for handler in logger.handlers:
handler.close()
logger.removeHandler(handler)
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import pandas as pd
import numpy as np
from sklearn.preprocessing import OneHotEncoder, OrdinalEncoder
def handle_categorical_values(df, datasets_summary, ohe, basic, var=None):
"""
Categorical features in the training dataset are either one hot
encoded or ordinal encoded
Input:
df: DataFrame containing continuous and categorical features, Pandas DataFrame
datasets_summary: Information on the datasets from which the values in df have been extracted, Pandas DataFrame
ohe: True for One-hot encoding, False for ordinal encoding, Boolean
basic: columns in df not to be considered such as coordinates, ID and label, list
var: specific features to consider only, list
"""
if var is None:
cat = []
for feat in df.columns.tolist():
if feat not in basic:
index = datasets_summary['keys'].tolist().index(feat)
if bool(datasets_summary['categorical'].tolist()[index]) == True:
cat.append(feat)
else:
cat = []
for feat in var:
index = datasets_summary['keys'].tolist().index(feat)
if bool(datasets_summary['categorical'].tolist()[index]) == True:
cat.append(feat)
if len(cat) > 0:
if ohe:
encoder = OneHotEncoder(sparse=False)
encoded_data = encoder.fit_transform(df[cat])
unique_categories = {col: df[col].unique() for col in cat}
print(unique_categories)
custom_column_names = []
for col in cat:
for unique_value in unique_categories[col]:
if isinstance(unique_value, (float, np.float32)):
unique_value = int(unique_value)
custom_column_names.append(f'{col}_{str(unique_value)}_encode')
encoded_df = pd.DataFrame(encoded_data, columns=custom_column_names)
df = pd.concat([df.drop(columns=cat), encoded_df], axis=1)
else:
columns_to_encode = df.select_dtypes(include=['object', 'category']).columns.tolist()
encoder = OrdinalEncoder()
encoded_data = encoder.fit_transform(df[columns_to_encode])
encoded_df = pd.DataFrame(encoded_data, columns=[f"{col}_encoded" for col in columns_to_encode])
df = pd.concat([df.drop(columns=columns_to_encode), encoded_df], axis=1)
return df
\ No newline at end of file
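# Usage sketch (hypothetical): one-hot encoding a categorical feature in a small
# training table with the function defined above. Column names and values are made
# up; datasets_summary mirrors the keys/categorical columns of the geospatial data
# summary, and sparse=False matches the scikit-learn version pinned above.
import pandas as pd

df = pd.DataFrame({'xcoord': [10.1, 10.2],
                   'ycoord': [47.0, 47.1],
                   'label': [1, 0],
                   'lithology': [3.0, 5.0],
                   'slope': [12.5, 30.2]})
datasets_summary = pd.DataFrame({'keys': ['lithology', 'slope'],
                                 'categorical': [1, 0]})
encoded = handle_categorical_values(df, datasets_summary, ohe=True,
                                    basic=['xcoord', 'ycoord', 'label'])
# 'lithology' is replaced by columns such as 'lithology_3_encode' and 'lithology_5_encode'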
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import rasterio
import numpy as np
import netCDF4 as nc
import pandas as pd
def import_tif(path):
"""
Import a geotiff file
Input:
path: Path to the tif file to open, string
"""
raster = rasterio.open(path, 'r')
data = raster.read()[0, :, :]
if np.dtype(data[0, 0]) == 'uint8':
data = np.int32(data)
bounds = raster.bounds
x = np.linspace(bounds[0], bounds[2], np.shape(data)[1])
y = np.linspace(bounds[1], bounds[3], np.shape(data)[0])
crs = raster.crs
if y[0] < y[-1]:
y = np.flip(y)
return data, x, y, crs
def import_nc(path):
"""
Import a netCDF4 file and contained metadata
Input:
path: Path to the netCDF4 file to open, string
"""
ds = nc.Dataset(path)
x = ds['Longitude'][:]
y = ds['Latitude'][:]
if 'Result' in ds.variables.keys():
data = ds['Result'][:][:]
data = np.float64(data)
data = data.data
else:
data = None
if 'Time' in ds.variables.keys():
data = ds['Result'][:][:]
data = data.data
if hasattr(ds.variables['Longitude'], 'units'):
crs = ds['Longitude'].units
else:
crs = None
x = x.data
y = y.data
if y[0] < y[-1]:
y = np.flip(y)
return data, x, y, crs
def import_cvs(path):
"""
Import a csv file
Input:
path: Path to the csv file to open, string
"""
df = pd.read_csv(path)
return df
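# Usage sketch (hypothetical): reading a GeoTIFF and a netCDF data cube with the
# helpers above. The file names are placeholders.
from utilities.import_format import import_tif, import_nc

dem, x, y, crs = import_tif('dem.tif')        # 2D array, lon/lat vectors, CRS
cube, lon, lat, units = import_nc('data_combined_training_250.nc')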
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import numpy as np
from utilities.import_format import import_tif, import_nc
def import_raw_dataset(path, no_data, no_value):
"""
Import geotiff or netCDF4 file
Input:
path: path to the dataset, string
no_data: no data values, list
no_value: general no data value, int or float
Output:
data: dataset, numpy array
x_org: longitude coordinates, list
y_org: latitude coordinates, list
"""
warning = False
if path.split('.')[-1] == 'tif':
data, x_org, y_org, _ = import_tif(path)
elif path.split('.')[-1] == 'nc':
data, x_org, y_org, _ = import_nc(path)
else:
warning = True
if warning:
# Unsupported file format: nothing was read, return early
return None, None, None
if y_org[0] < y_org[-1]:
y_org = np.flip(y_org)
if no_data != 'None':
for val in no_data:
data[data == val] = no_value
data[np.isnan(data)] = no_value
return data, x_org, y_org
\ No newline at end of file
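# Usage sketch (hypothetical): importing a raster and replacing its no-data values
# with the function above. The module path is assumed from the utilities layout and
# the file name and values are placeholders.
from utilities.import_raw_dataset import import_raw_dataset

data, x, y = import_raw_dataset('dem.tif', no_data=[-32768], no_value=-999)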
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import os
import logging
def save_log(path):
"""
Initialisation of a log file using the python package logging to store
information, warnings and errors
Input:
path: Path where to store the log file
Output:
logger: Logger
"""
path_log = os.path.dirname(path)
logger = logging.getLogger()
logger.setLevel(logging.INFO)
formatter = logging.Formatter(
'%(asctime)s | %(levelname)s | %(message)s')
file_handler = logging.FileHandler(path)
file_handler.setLevel(logging.DEBUG)
file_handler.setFormatter(formatter)
logger.addHandler(file_handler)
return logger
\ No newline at end of file
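# Usage sketch (hypothetical): opening and closing a run log with save_log,
# as the GUI scripts above do. The file name is a placeholder.
from utilities.initialise_log import save_log

logger = save_log('shire_run.log')
logger.info('Run started')
# Close and detach the file handler once the run is finished
handler = logger.handlers[0]
handler.close()
logger.removeHandler(handler)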
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import os
import netCDF4 as nc
import settings
def generate_basic_ncfile(outfile, crs=None):
"""
Initialise basic netCDF4 file
Input:
Outfile: path to store the netcdf file, string
crs: coordinate reference system, string
"""
# If outfile exists already, delete
if os.path.exists(outfile):
os.remove(outfile)
ds = nc.Dataset(outfile, 'w', format='NETCDF4')
return ds
def generate_ncfile(outfile, x, y, data, crs=None,
data_unit=None, missing_value=settings.no_value):
"""
Save 2D dataset as netCDF4 file
Input:
Outfile: path to store the netcdf file, string
x: longitude vector, list
y: latitude vector, list
data: 2D data array
crs: coordinate reference system, string
data_unit: data unit, string
missing_value: no data value, integer or float
"""
# If outfile exists already, delete
if os.path.exists(outfile):
os.remove(outfile)
ds = nc.Dataset(outfile, 'w', format='NETCDF4')
ds.createDimension('lon', len(x))
ds.createDimension('lat', len(y))
longitude = ds.createVariable('Longitude', 'f4', 'lon')
latitude = ds.createVariable('Latitude', 'f4', 'lat')
result = ds.createVariable('Result', 'f4', ('lat', 'lon'))
longitude[:] = x
latitude[:] = y
result[:, :] = data
# Provide global information in output-file
if crs is not None:
longitude.units = crs
latitude.units = crs
if data_unit is not None:
result.units = data_unit
ds.missing_value = missing_value
ds.close()
def generate_3dncfile(outfile, x, y, data, dim, features, crs='wgs84',
data_unit=None, missing_value=settings.no_value):
"""
Save 3D dataset as netCDF4 file, e.g. data cube
Input:
Outfile: path to store the netcdf file, string
x: longitude vector, list
y: latitude vector, list
dim: number of 2D datasets, integer
data: 2D data array
features: contained features in prediction dataset, list of chars
crs: coordinate reference system, string
data_unit: data unit, string
missing_value: no data value, integer or float
"""
# If outfile exists already, delete
if os.path.exists(outfile):
os.remove(outfile)
ds = nc.Dataset(outfile, 'w', format='NETCDF4')
ds.createDimension('lon', len(x))
ds.createDimension('lat', len(y))
ds.createDimension('dim', dim)
ds.createDimension('feat', len(features))
longitude = ds.createVariable('Longitude', 'f4', 'lon')
latitude = ds.createVariable('Latitude', 'f4', 'lat')
result = ds.createVariable('Result', 'f4', ('lat', 'lon', 'dim'))
Features = ds.createVariable('features', 'S1', 'feat')
longitude[:] = x
latitude[:] = y
result[:, :, :] = data
Features[:] = features
# Provide global information in output-file
if crs is not None:
longitude.units = crs
latitude.units = crs
if data_unit is not None:
result.units = data_unit
ds.missing_value = missing_value
ds.close()
def generate_2dncfile(outfile, x, y, data, features, crs='wgs84',
data_unit=None, missing_value=settings.no_value):
"""
Save 2D dataset as netCDF4 file, e.g. Prediction dataset
Input:
Outfile: path to store the netcdf file, string
x: longitude vector, list
y: latitude vector, list
data: 2D data array
features: contained features in prediction dataset, list of chars
crs: coordinate reference system, string
data_unit: data unit, string
missing_value: no data value, integer or float
"""
# If outfile exists already, delete
if os.path.exists(outfile):
os.remove(outfile)
ds = nc.Dataset(outfile, 'w', format='NETCDF4')
ds.createDimension('lon', len(x))
ds.createDimension('lat', len(y))
ds.createDimension('feat', len(features))
longitude = ds.createVariable('Longitude', 'f4', 'lon')
latitude = ds.createVariable('Latitude', 'f4', 'lat')
result = ds.createVariable('Result', 'f4', ('lat', 'lon'))
Features = ds.createVariable('features', 'S1', 'feat')
longitude[:] = x
latitude[:] = y
result[:, :] = data
Features[:] = features
# Provide global information in output-file
if crs is not None:
longitude.units = crs
latitude.units = crs
if data_unit is not None:
result.units = data_unit
ds.missing_value = missing_value
ds.close()
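# Usage sketch (hypothetical): writing a small 2D result grid with generate_ncfile.
# The grid and file name are made up; the module imports settings only for the
# default missing_value, which is overridden here.
import numpy as np
from utilities.ncfile_generation import generate_ncfile

x = np.linspace(6.0, 9.0, 4)      # longitude vector
y = np.linspace(49.0, 47.0, 3)    # latitude vector, ordered north to south
data = np.zeros((3, 4))           # shape (lat, lon)
generate_ncfile('result.nc', x, y, data,
                crs='wgs84', data_unit=None, missing_value=-999)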
key,type,range,extension,path
ls_path,str,None,csv,1
nonls_path,str,None,nc,1
train_path,str,None,csv,1
geo_path,str,None,csv,1
feat_path,str,None,csv,1
x,str,None,None,0
y,str,None,None,0
id,str,None,None,0
x_nonls,str,None,None,0
y_nonls,str,None,None,0
num_nonls,"int,float",None,None,0
from_scratch,"int,bool",None,"0,1",0
delete,"int,bool",None,"0,1",0
add,"int,bool",None,"0,1",0
cluster,"int,bool",None,"0,1",0
data_to_handle,str,None,None,0
preprocess,str,None,None,0
no_interpolation,"int,bool",None,"0,1",0
interpolation,"int,bool",None,"0,1",0
resolution,int,"1,inf",None,0
random_seed,int,"1,inf",None,0
crs,str,None,None,0
no_value,int,None,None,0
train,bool,None,None,0
pred,bool,None,None,0
map,bool,None,None,0
pred_path,str,None,nc,1
east,"int,float","-180,180",None,0
west,"int,float","-180,180",None,0
north,"int,float","-90,90",None,0
south,"int,float","-90,90",None,0
model_path,str,None,None,1
drop_train,str,None,None,0
drop_pred,str,None,None,0
model_to_load,str,None,None,0
model_to_save,str,None,None,0
num_trees,int,"1,inf",None,0
size_val,"int,float","0,1",None,0
depth_trees,int,"1,inf",None,0
name_label,str,None,None,0
criterion,str,None,None,0
training,"int,bool",None,None,0
prediction,"int,bool",None,None,0
parallel,"int,bool",None,None,0
keep,"int,bool","0,1",None,0
remove_instances,"int,bool","0,1",None,0
ohe,"int,bool","0,1",None,0
\ No newline at end of file
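Reading example for the table above: check_parameters() in check_user_input.py interprets the row train_path,str,None,csv,1 as follows: the user-supplied train_path must be a string, there is no numeric range to check, the file extension must be csv, and path = 1 means the value is treated as a path whose location (or at least its parent folder) must already exist.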
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
def features_to_char(feat):
"""
Turn list of features to chars so it can be stored in the nc-file.
"""
char_features = []
for feature in feat:
for letter in feature:
char_features.append(letter)
char_features.append('/')
char_features = char_features[0:-1]
return char_features
def char_to_string(features):
"""
Input:
features: list of features as chars
Return:
features as strings
Turns list of chars into strings providing information on
contained features in nc-file. Feature names have to be separated
by '/'.
"""
features_decode = []
for feature in features:
features_decode.append(feature.decode('UTF-8'))
tmp = ''.join(features_decode)
return tmp.split('/')
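# Usage sketch: round trip of the two helpers above. char_to_string expects the
# bytes read back from the nc-file, so the characters are encoded here for the demo.
chars = features_to_char(['slope', 'aspect'])
# chars -> ['s', 'l', 'o', 'p', 'e', '/', 'a', 's', 'p', 'e', 'c', 't']
names = char_to_string([c.encode('UTF-8') for c in chars])
# names -> ['slope', 'aspect']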
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import os
import pandas as pd
from settings import *
from utilities.initialise_log import save_log
class check_general_settings():
def __init__(self):
if training_dataset or map_generation:
save_path = os.path.dirname(path_train) + '/check_user_input.log'
elif prediction_dataset:
save_path = os.path.dirname(path_pred) + '/check_user_input.log'
else:
save_path = 'check_user_input.log'
if os.path.exists(save_path):
os.remove(save_path)
self.logger = save_log(save_path)
self.logger.info("Start checking user input")
self.error = False
self.set_up_dic()
self.check_bools()
self.check_list()
self.check_int()
self.check_int_float()
self.check_string()
self.check_path()
self.check_bb()
self.check_if_right_params_are_set()
self.check_extension()
self.check_path_extension_geosummary()
for handler in self.logger.handlers:
handler.close()
self.logger.removeHandler(handler)
def check_if_right_params_are_set(self):
if training_dataset is None and prediction_dataset is None and map_generation is None:
self.logger.error('Specify a purpose of the run! Set either training_dataset, prediction_dataset and/or map_generation')
self.error = True
if None in [crs, no_value, random_seed, resolution]:
self.logger.error('Set the general settings crs, no_value, random_seed and resolution!')
self.error = True
if training_dataset:
if train_from_scratch is None and train_delete is None:
self.logger.error('Specify whether you want to generate the training dataset from scratch or add/remove feature(s)')
self.error = True
else:
if None in [cluster, interpolation, data_summary_path, key_to_include_path, path_train, path_landslide_database, ID, landslide_database_x, landslide_database_y, path_nonls_locations, num_nonls, nonls_database_x, nonls_database_y]:
self.logger.error('Specify all necessary parameters for training dataset generation!')
self.error = True
if prediction_dataset:
if pred_from_scratch is None and pred_delete is None:
self.logger.error('Specify whether you want to generate the prediction dataset from scratch or add/remove feature(s)')
self.error = True
else:
if None in [data_summary_path, key_to_include_path, bounding_box, path_pred]:
self.logger.error('Specify all necessary parameters for prediction dataset generation!')
self.error = True
if map_generation:
if None in [path_ml, size, not_included_pred_data, not_included_train_data, num_trees, criterion, depth, model_to_save, model_to_load, model_database_dir, parallel]:
self.logger.error('Specify all necessary parameters for map generation!')
self.error = True
def set_up_dic(self):
self.dic = {}
self.dic['bool'] = [training_dataset, train_from_scratch, train_delete, prediction_dataset, pred_from_scratch, pred_delete, map_generation, parallel]
self.dic['path'] = [path_ml, data_summary_path, key_to_include_path, path_train, path_landslide_database, path_nonls_locations, path_pred, model_database_dir]
self.dic['str'] = [crs, ID, landslide_database_x, landslide_database_y, nonls_database_x, nonls_database_y, criterion, model_to_save, model_to_load]
self.dic['int'] = [resolution, random_seed, num_nonls, num_trees, depth]
self.dic['int_float'] = [size, no_value]
self.dic['list'] = [bounding_box, not_included_pred_data, not_included_train_data]
self.dic_steps = {}
self.dic_steps['general'] = []
self.dic_steps['run_purpose'] = [training_dataset, prediction_dataset, map_generation]
def check_extension(self):
for path in [data_summary_path, key_to_include_path, path_landslide_database, path_train]:
if path is not None:
if len(path.split('.')) != 2:
self.logger.error(path + ': Path must contain exactly one full stop, the one separating the file extension!')
self.error = True
else:
if path.split('.')[1] != 'csv':
self.logger.error(path + ': wrong file format! Needs to be csv')
self.error = True
for path in [path_pred, path_nonls_locations]:
if path is not None:
if len(path.split('.')) != 2:
self.logger.error(path + ': Path must contain exactly one full stop, the one separating the file extension!')
self.error = True
else:
if path.split('.')[1] != 'nc':
self.logger.error(path + ': wrong file format! Needs to be nc')
self.error = True
def check_bools(self):
self.logger.info("Start checking bools")
for key in self.dic['bool']:
if key is not None:
if type(key) is not bool:
self.logger.error(str(key) + ': not a bool')
self.error = True
def check_list(self):
self.logger.info("Start checking list")
for key in self.dic['list']:
if key is not None:
if type(key) is not list:
self.logger.error(str(key) + ': not a list')
self.error = True
def check_int(self):
self.logger.info("Start checking integers")
for key in self.dic['int']:
if key is not None:
if type(key) is not int:
self.logger.error(str(key) + ': not an integer')
self.error = True
def check_int_float(self):
self.logger.info("Start checking integers and floats")
for key in self.dic['int_float']:
if key is not None:
if type(key) is not int and type(key) is not float:
self.logger.error(str(key) + ': not an integer or float')
self.error = True
def check_string(self):
self.logger.info("Start checking strings")
for key in self.dic['str']:
if key is not None:
if type(key) is not str:
self.logger.error(str(key) + ': not a string')
self.error = True
def check_path(self):
self.logger.info("Start checking paths")
for key in self.dic['path']:
if key is not None:
if type(key) is not str:
self.logger.error(str(key) + ': path is not a string')
self.error = True
else:
if key == path_train and training_dataset is True:
pass
elif key == path_pred and prediction_dataset is True:
pass
else:
if not os.path.exists(key):
self.logger.error(key + ': path could not be found!')
self.error = True
def check_bb(self):
if bounding_box is not None:
if bounding_box[1] >= bounding_box[0]:
self.logger.error('Careful! South coordinate north of north coordinate!')
self.error = True
if bounding_box[2] >= bounding_box[3]:
if (((bounding_box[2] < 0 and bounding_box[2] > -10) and (bounding_box[3] > 0 and bounding_box[3] < 10))
or ((bounding_box[2] > 0 and bounding_box[2] > 170) and (bounding_box[3] < 0 and bounding_box[3] < -170))):
self.logger.warning('Careful! Please check east and west coordinates!')
else:
self.logger.error('Careful! West coordinate east of east coordinate!')
self.error = True
def check_path_extension_geosummary(self):
self.logger.info('Start checking paths in geospatial data summary')
if data_summary_path is not None and key_to_include_path is not None:
if os.path.exists(data_summary_path) and os.path.exists(key_to_include_path):
if data_summary_path.split('.')[1] == 'csv' and key_to_include_path.split('.')[1] == 'csv':
summary = pd.read_csv(data_summary_path)
keys_to_include = pd.read_csv(key_to_include_path)
for key in list(keys_to_include['keys_to_include']):
idx = list(summary['keys']).index(key)
if summary.at[idx, 'path'].split('.')[1] not in ['nc', 'tif', 'tiff']:
self.logger.error(key + ': Wrong file format!')
self.error = True
if not os.path.exists(summary.at[idx, 'path']):
self.logger.error(key + ': File cannot be found!')
self.error = True
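# Sketch (assumption): the script-based workflow above reads its configuration as
# module-level variables via 'from settings import *'. A minimal settings.py for a
# training-only run might look like this; the variable names are the ones referenced
# by the checks above, the values are placeholders.
training_dataset = True
prediction_dataset = False
map_generation = False
# General settings
crs = 'wgs84'
no_value = -999
random_seed = 42
resolution = 250
# Training dataset generation
train_from_scratch = True
train_delete = False
cluster = False
interpolation = True
data_summary_path = 'geospatial_data_summary.csv'
key_to_include_path = 'keys_to_include.csv'
path_train = 'output/training_dataset.csv'
path_landslide_database = 'landslide_database.csv'
ID = 'ID'
landslide_database_x = 'xcoord'
landslide_database_y = 'ycoord'
path_nonls_locations = 'absence_locations.nc'
num_nonls = 1000
nonls_database_x = 'Longitude'
nonls_database_y = 'Latitude'
# Parameters of the prediction and map steps are not used in this run but are
# still referenced by set_up_dic() above, so they are defined as None
pred_from_scratch = pred_delete = bounding_box = path_pred = None
path_ml = size = not_included_pred_data = not_included_train_data = None
num_trees = criterion = depth = model_to_save = model_to_load = None
model_database_dir = parallel = None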