-
Ann-Kathrin Margarete Edrich authoredAnn-Kathrin Margarete Edrich authored
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
RandomForest_gui.py 18.38 KiB
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import numpy as np
import pandas as pd
import netCDF4 as nc
import pickle as pkl
import os
import logging
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import mean_squared_error, f1_score, roc_curve, auc, fbeta_score
from joblib import delayed, Parallel
from tkinter import Label
from utilities.ncfile_generation import generate_ncfile
from utilities.strings_for_ncfile import char_to_string
class prepare_data:
    """
    Prepare the data used by the Random Forest classifier.

    Depending on ``aim`` the class either imports the labelled training
    dataset and splits off a validation set ('train_test'), or imports
    the feature stack for map generation ('prediction').
    """

    def __init__(self, master, aim, log=None):
        """
        Input:
            master: tkinter parent widget that receives the log labels
            aim: str, either 'train_test' (model training) or
                 'prediction' (map generation)
            log: logger instance used for status messages
        """
        self.master = master
        self.logger = log
        self.row = 0  # next free grid row for the tkinter log labels

        self.import_parameters()
        self.logger.info("Susceptibility/hazard map generation started")

        self.master.geometry()
        self.master.winfo_toplevel().title("Map generation")
        Label(self.master, text="Log:").grid(row=self.row, column=0)
        self.row = self.row + 1
        Label(self.master, text="Map generation started").grid(
            row=self.row, column=1)
        self.row = self.row + 1
        self.master.update()

        # Validate the requested mode before touching any data.
        # (The original code also contained a no-op "aim = aim"
        # self-assignment here; removed as dead code.)
        if aim == 'train_test':
            invalid = False
            self.logger.info("Train the model")
        elif aim == 'prediction':
            invalid = False
            self.logger.info("Prepare the susceptibility/hazard map")
        else:
            self.logger.info(
                "Not a valid command. Enter train_test or prediction")
            invalid = True

        if not invalid:
            self.test_size = self.properties_map['size_val']
            self.label_name = self.properties_map['name_label']
            # Coordinates of the samples/pixels, filled by the import methods
            self.xy = pd.DataFrame()
            if aim == 'train_test':
                self.import_features_labels()
                self.split_training_testing()
            elif aim == 'prediction':
                self.import_features()

    def import_parameters(self):
        """
        Import the user-defined parameters from the temporary pickle
        files written by the GUI (map settings and general settings).
        """
        with open('tmp_map.pkl', 'rb') as handle:
            self.properties_map = pkl.load(handle)
        with open('tmp_settings.pkl', 'rb') as handle:
            self.properties_settings = pkl.load(handle)

    def import_features(self):
        """
        Import the features for prediction from the netCDF prediction
        dataset and store the pixel coordinates separately in self.xy.
        """
        ds = nc.Dataset(self.properties_map['pred_path'])
        pred = ds['Result'][:, :].data
        pred_features = ds['features'][:].data
        # Feature names are stored as characters in the nc-file
        self.feature_list = char_to_string(pred_features)
        self.features = pd.DataFrame(pred, columns=self.feature_list)
        # Row indices of pixels dropped during preprocessing
        self.dropped = [int(x) for x in ds['Dropped'][:].data]

        # Save the prediction coordinates in the prediction dataset
        self.xy['ycoord'] = self.features['ycoord']
        self.xy['xcoord'] = self.features['xcoord']
        self.features = self.features.drop(['xcoord', 'ycoord'], axis=1)

        self.feature_list = list(self.features.columns)
        self.features = np.array(self.features)

        self.logger.info('Features imported')
        self.logger.info('The following ' + str(len(self.feature_list))
                         + ' features are included in the prediction dataset: '
                         + str(self.feature_list))
        Label(self.master, text="Features successfully imported").grid(
            row=self.row, column=1)
        self.row = self.row + 1
        self.master.update()

    def import_features_labels(self):
        """
        Import the features and labels for training from the csv
        training dataset; coordinates are kept in self.xy.
        """
        # Import training dataset as csv file
        self.features = pd.read_csv(self.properties_map['train_path'])

        # Extract and remove labels from training dataset;
        # labels are kept as a (n, 1) column vector
        self.labels = np.array(
            self.features[self.label_name]).reshape(
                [np.shape(self.features[self.label_name])[0], 1])
        self.features = self.features.drop(self.label_name, axis=1)

        # Store coordinates from training data
        self.xy['ycoord'] = self.features['ycoord']
        self.xy['xcoord'] = self.features['xcoord']
        self.features = self.features.drop(['xcoord', 'ycoord', 'ID'], axis=1)

        self.feature_list = list(self.features.columns)
        self.features = np.array(self.features)

        self.logger.info('Features imported')
        self.logger.info('The following ' + str(len(self.feature_list))
                         + ' features are included in the training dataset: '
                         + str(self.feature_list))
        Label(self.master,
              text="Features and label successfully imported").grid(
                row=self.row, column=1)
        self.row = self.row + 1
        self.master.update()

    def split_training_testing(self):
        """
        Split the training data into training and validation data,
        stratified on the labels and reproducible via the random seed.
        """
        self.train_features, self.test_features, self.train_labels, self.test_labels = \
            train_test_split(
                self.features,
                self.labels,
                test_size=self.test_size,
                random_state=int(self.properties_settings['random_seed']),
                stratify=self.labels)
        self.logger.info('Validation dataset split from training dataset')
        Label(self.master, text="Training dataset splitted").grid(
            row=self.row, column=1)
        self.row = self.row + 1
        self.master.update()
class RandomForest(prepare_data):
    """
    This class conducts the training of the Random Forest model and the
    generation of the landslide susceptibility and hazard map.
    """

    def __init__(self, master, aim, parallel=False, log=None):
        """
        Input:
            master: tkinter parent widget that receives the log labels
            aim: str, 'train_test' or 'prediction'
            parallel: bool, predict in parallel chunks if True
            log: logger instance used for status messages
        """
        super().__init__(master, aim, log=log)
        self.aim = aim
        self.logger = log
        self.parallel = parallel
        self.num_chunks = 10  # chunks (and workers) for parallel prediction

        # Random Forest settings
        self.criterion = self.properties_map['criterion']
        self.n_estimators = self.properties_map['num_trees']
        self.max_depth = self.properties_map['depth_trees']
        self.logger.info('Aim: ' + str(aim))

        # Directories always carry a trailing '/' so paths can be
        # concatenated directly below.
        if aim == 'prediction':
            self.model_dir = self.properties_map['model_path'] + '/'
            self.model_to_load = self.properties_map['model_to_load'] + '/'
        else:
            self.model_dir = self.properties_map['model_path'] + '/'
            self.model_to_save = self.properties_map['model_to_save'] + '/'
        self.output_dir = None

        if aim == 'train_test':
            Label(self.master, text="Model training started").grid(
                row=self.row, column=1)
            self.row = self.row + 1
            self.master.update()
            self.define()
            self.train()
            self.predict()
            self.evaluate()
            self.create_output_dir()
            self.save_model()
            self.save_parameters()
            self.feature_importance()
        elif aim == 'prediction':
            Label(self.master, text="Mapping started").grid(
                row=self.row, column=1)
            self.row = self.row + 1
            self.master.update()
            self.create_output_dir()
            self.load_model()
            self.predict()
            self.extract_pos_neg_predictions()
            self.reshape_prediction()
            self.save_prediction()

    def define(self):
        """
        Define the Random Forest Classifier model.
        """
        self.model = RandomForestClassifier(
            n_estimators=self.n_estimators,
            max_depth=self.max_depth,
            # Bug fix: the user-chosen split criterion was stored and
            # logged but never passed on to the classifier.
            criterion=self.criterion,
            random_state=int(self.properties_settings['random_seed']))
        # Bug fix: the random seed value was missing from this log line.
        self.logger.info('Parameters: '
                         + str(self.n_estimators) + ' (Num. estimators) ' + '|'
                         + str(self.max_depth) + ' (Depth) ' + '|'
                         + str(int(self.properties_settings['random_seed']))
                         + ' (Random seed) ' + '|'
                         + str(self.criterion) + ' (Criterion) ' + '|'
                         + str(self.test_size) + ' (Splitting ratio) ' + '|')
        Label(self.master, text="Model is defined").grid(
            row=self.row, column=1)
        self.row = self.row + 1
        self.master.update()

    def train(self):
        """
        Train the Random Forest Classifier model.
        """
        # ravel: sklearn expects a 1d label array, labels are stored (n, 1)
        self.model.fit(self.train_features, np.ravel(self.train_labels))
        self.logger.info('Model trained')
        Label(self.master, text="Model successfully trained").grid(
            row=self.row, column=1)
        self.row = self.row + 1
        self.master.update()

    def predict(self):
        """
        Make the prediction, either on the full feature stack
        ('prediction') or on the validation set ('train_test'),
        optionally parallelised over row chunks.
        """
        if self.aim == 'prediction':
            pred = self.features
        elif self.aim == 'train_test':
            pred = self.test_features

        if self.parallel:
            self.split_array_into_chunks(pred)
            # Use one worker per chunk instead of a hard-coded count
            prediction = Parallel(n_jobs=self.num_chunks)(delayed(
                self.model.predict)(chunk) for chunk in self.chunks)
            self.prediction = np.concatenate(prediction, axis=0)
        else:
            self.prediction = self.model.predict(pred)

        if self.aim == 'prediction':
            self.logger.info('Prediction completed')
            Label(self.master, text="Prediction completed").grid(
                row=self.row, column=1)
            self.row = self.row + 1
            self.master.update()
        elif self.aim == 'train_test':
            Label(self.master, text="Validation data predicted").grid(
                row=self.row, column=1)
            self.row = self.row + 1
            self.master.update()
            self.logger.info('Validation data predicted')

    def split_array_into_chunks(self, pred):
        """
        Split a NumPy array into chunks without changing the number of columns.
        Input:
            pred: prediction dataset, varies depending on if the current run
                  is for model training or map generation
        Output:
            None; chunks are stored in self.chunks
        """
        # Calculate the number of rows in each chunk; the first
        # `remaining_rows` chunks get one extra row
        rows_per_chunk = pred.shape[0] // self.num_chunks
        remaining_rows = pred.shape[0] % self.num_chunks

        # Create chunks
        self.chunks = []
        start = 0
        for i in range(self.num_chunks):
            end = start + rows_per_chunk + (1 if i < remaining_rows else 0)
            chunk = pred[start:end, :]
            self.chunks.append(chunk)
            start = end

        self.logger.info('Prediction dataset split into chunks')
        Label(self.master, text="Prediction dataset split into chunks").grid(
            row=self.row, column=1)
        self.row = self.row + 1
        self.master.update()

    def evaluate(self):
        """
        Evaluate the model on the validation dataset (ROC/AUC, error
        counts, MSE, F1 and F-beta scores).
        """
        y_pred_prob = self.model.predict_proba(self.test_features)[:, 1]
        self.fpr, self.tpr, self.thresholds = roc_curve(
            self.test_labels, y_pred_prob)
        self.roc_auc = auc(self.fpr, self.tpr)

        # Absolute error per sample (0 = correct, 1 = wrong for binary
        # labels). np.ravel flattens the (n, 1) label array so diff holds
        # plain scalars instead of 1-element arrays.
        diff = [abs(pred - label)
                for pred, label
                in zip(list(self.prediction),
                       list(np.ravel(self.test_labels)))]
        # Number of misclassified samples out of the total
        self.acc = str(diff.count(1)) + '/' + str(len(diff))
        self.mae = round(np.mean(diff), 2)
        self.mse = mean_squared_error(self.test_labels, self.prediction)
        self.f1 = f1_score(self.test_labels, self.prediction)
        self.fbeta = fbeta_score(self.test_labels, self.prediction, beta=2)

        self.logger.info('Evaluation metrics computed')
        Label(self.master, text="Evaluation metrics computed").grid(
            row=self.row, column=1)
        self.row = self.row + 1
        self.master.update()

    def create_output_dir(self):
        """
        Define and create the output directory.
        """
        if self.aim == 'train_test':
            self.output_dir = self.model_dir + self.model_to_save
        else:
            self.output_dir = self.model_dir + self.model_to_load
        # exist_ok makes the prior isdir() check unnecessary
        os.makedirs(self.output_dir, exist_ok=True)

    def save_model(self):
        """
        Save the Random Forest Classifier model as a pickle file.
        """
        with open(self.output_dir + '/saved_model.pkl', 'wb') as file:
            pkl.dump(self.model, file)
        self.logger.info('Model saved to '
                         + self.output_dir
                         + '/saved_model.pkl')
        Label(self.master, text="Model saved").grid(
            row=self.row, column=1)
        self.row = self.row + 1
        self.master.update()

    def save_parameters(self):
        """
        Save the metadata associated with the prediction
        (bounding box, model settings and evaluation metrics).
        """
        tmp_max = self.xy.max(axis=0)
        tmp_min = self.xy.min(axis=0)
        # Positional order of self.xy columns is (ycoord, xcoord), so
        # 'Area' is [ymin, ymax, xmin, xmax]
        params = {'Area': [tmp_min[0], tmp_max[0], tmp_min[1], tmp_max[1]],
                  'criterion': [self.criterion],
                  'n_estimators': [self.n_estimators],
                  'max_depth': [self.max_depth],
                  'features': self.feature_list,
                  'mse': self.mse,
                  'mae': self.mae,
                  'f1': self.f1,
                  'roc_threshold': self.thresholds,
                  'roc_fpr': self.fpr,
                  'roc_tpr': self.tpr,
                  'roc_auc': self.roc_auc,
                  # NOTE(review): 'accuracy' stores "#misclassified/#total"
                  # (see evaluate()), i.e. an error count, not an accuracy
                  'accuracy': self.acc,
                  'fbeta': self.fbeta
                  }
        with open(self.model_dir
                  + self.model_to_save
                  + 'model_params.pkl', 'wb') as file:
            pkl.dump(params, file)
        self.logger.info('Parameters saved to '
                         + self.model_dir
                         + self.model_to_save
                         + 'model_params.pkl')
        Label(self.master, text="Parameters saved").grid(
            row=self.row, column=1)
        self.row = self.row + 1
        self.master.update()

    def load_model(self):
        """
        Load the Random Forest Classifier model and the metadata.
        Make sure to compare features of training and prediction dataset
        as well as their order.
        """
        with open(
                self.model_dir
                + self.model_to_load
                + 'saved_model.pkl', 'rb') as file:
            self.model = pkl.load(file)
        with open(
                self.model_dir
                + self.model_to_load
                + 'model_params.pkl', 'rb') as f:
            # Metadata of the trained model; the feature-consistency check
            # mentioned in the docstring is not implemented yet — TODO.
            # (Previously loaded into a discarded local variable.)
            self.model_params = pkl.load(f)
        self.logger.info('Model succesfully loaded from '
                         + self.model_dir
                         + self.model_to_load)
        Label(self.master, text=('Model loaded from '
                                 + self.model_dir
                                 + self.model_to_load)).grid(
            row=self.row, column=1)
        self.row = self.row + 1
        self.master.update()

    def save_prediction(self):
        """
        Save the prediction as csv files (all, positive-only and
        negative-only predictions).
        """
        if self.aim == 'prediction':
            output_dir = self.model_dir + self.model_to_load
            self.xy.to_csv(
                output_dir + 'prediction_results.csv',
                columns=['xcoord', 'ycoord', 'pred'],
                index=True)
            self.df_pos.to_csv(
                output_dir + 'pos_prediction_results.csv',
                columns=['xcoord', 'ycoord', 'pred'],
                index=True)
            self.df_neg.to_csv(
                output_dir + 'neg_prediction_results.csv',
                columns=['xcoord', 'ycoord', 'pred'],
                index=True)
            self.logger.info('Prediction saved in ' + output_dir)
            Label(self.master, text="Prediction saved as csv-file").grid(
                row=self.row, column=1)
            self.row = self.row + 1
            self.master.update()

    def reshape_prediction(self):
        """
        Reshape the individual predictions into a 2D map.
        """
        arr_xy = np.array(self.xy)
        # Mask pixels that were dropped during preprocessing
        arr_xy[self.dropped, :] = [self.properties_settings['no_value']]
        # Column 2 of self.xy is 'pred'; assumes the coordinates form a
        # regular grid so unique-count reshaping is valid — TODO confirm
        result = np.reshape(list(arr_xy[:, 2]),
                            (len(list(set(self.xy['ycoord']))),
                             len(list(set(self.xy['xcoord'])))))
        self.logger.info('Map generated')
        Label(self.master, text="Prediction reshaped into map").grid(
            row=self.row, column=1)
        self.row = self.row + 1
        self.master.update()
        self.save_prediction_as_nc(result)

    def extract_pos_neg_predictions(self):
        """
        Distinguish between the classes of the Classifier.
        """
        self.xy['pred'] = self.prediction
        self.df_pos = self.xy[self.xy.pred == 1]
        self.df_neg = self.xy[self.xy.pred == 0]
        self.logger.info('Positive and negative predictions extracted')

    def save_prediction_as_nc(self, prediction):
        """
        Save the hazard map to a netCDF4 file.
        Input:
            prediction: 2D array holding the reshaped map
        """
        outfile_name = self.model_dir + self.model_to_load + 'prediction.nc'
        if os.path.exists(outfile_name):
            os.remove(outfile_name)
        generate_ncfile(outfile_name,
                        np.array(sorted(set(list(self.xy['xcoord'])))),
                        np.array(sorted(set(list(self.xy['ycoord'])))),
                        prediction,
                        crs=self.properties_settings['crs'],
                        missing_value=self.properties_settings['no_value'])
        self.logger.info('Map saved in ' + outfile_name)
        Label(self.master, text="Map saved as nc-file").grid(
            row=self.row, column=1)
        self.row = self.row + 1
        self.master.update()

    def feature_importance(self):
        """
        Access feature importance information from the Random Forest
        and save it as a csv file, sorted in descending order.
        """
        feature_imp = pd.Series(self.model.feature_importances_,
                                index=self.feature_list).sort_values(
                                    ascending=False)
        feature_imp.to_csv(self.model_dir
                           + self.model_to_save
                           + 'feature_importance.csv')
        self.logger.info('Feature importance determined')
        Label(self.master, text="Feature importance computed").grid(
            row=self.row, column=1)
        self.row = self.row + 1
        self.master.update()