#!/usr/bin/env python3
# -*- coding: utf-8 -*-

"""
Train a Random Forest classifier and/or generate a landslide
susceptibility/hazard map, reporting progress both to a logger and to a
tkinter window supplied by the caller.
"""

import os
import logging
import pickle as pkl

import numpy as np
import pandas as pd
import netCDF4 as nc

from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import (mean_squared_error, f1_score, roc_curve, auc,
                             fbeta_score)
from joblib import delayed, Parallel
from tkinter import Label

from utilities.ncfile_generation import generate_ncfile
from utilities.strings_for_ncfile import char_to_string


class prepare_data:

    """
    This class prepares the data to be used in the
    Random Forest classifier.
    """

    def __init__(self, master, aim, log=None):
        """
        Import the user parameters and load either the training or the
        prediction dataset, depending on ``aim``.

        Input:
            master: tkinter widget used as parent for the progress labels
            aim: str, either 'train_test' or 'prediction'
            log: logger instance (optional)
        """
        self.master = master
        # Fall back to a module logger so the .info() calls below cannot
        # fail when no logger is supplied (log=None previously crashed).
        self.logger = log if log is not None else logging.getLogger(__name__)
        self.row = 0  # current grid row of the tkinter status column
        self.import_parameters()
        self.logger.info("Susceptibility/hazard map generation started")
        self.master.geometry()
        self.master.winfo_toplevel().title("Map generation")
        Label(self.master, text="Log:").grid(row=self.row, column=0)
        self.row = self.row + 1
        Label(self.master, text="Map generation started").grid(
            row=self.row, column=1)
        self.row = self.row + 1
        self.master.update()

        # Validate the requested mode (removed a no-op 'aim = aim' and
        # redundant invalid=False reassignments from the original).
        invalid = False
        if aim == 'train_test':
            self.logger.info("Train the model")
        elif aim == 'prediction':
            self.logger.info("Prepare the susceptibility/hazard map")
        else:
            self.logger.info(
                "Not a valid command. Enter train_test or prediction")
            invalid = True

        if not invalid:
            self.test_size = self.properties_map['size_val']
            self.label_name = self.properties_map['name_label']
            self.xy = pd.DataFrame()  # holds the sample coordinates
            if aim == 'train_test':
                self.import_features_labels()
                self.split_training_testing()
            elif aim == 'prediction':
                self.import_features()

    def import_parameters(self):
        """
        User-defined parameters are imported from the two temporary
        pickle files written by the GUI.
        """
        # NOTE(review): pickle is only safe for trusted, locally written
        # files — these tmp files are produced by this application itself.
        with open('tmp_map.pkl', 'rb') as handle:
            self.properties_map = pkl.load(handle)
        with open('tmp_settings.pkl', 'rb') as handle:
            self.properties_settings = pkl.load(handle)

    def import_features(self):
        """
        Imports the features for prediction from the netCDF cube and
        separates the coordinates from the feature columns.
        """
        ds = nc.Dataset(self.properties_map['pred_path'])
        pred = ds['Result'][:, :].data
        pred_features = ds['features'][:].data
        self.feature_list = char_to_string(pred_features)
        self.features = pd.DataFrame(pred, columns=self.feature_list)
        # Row indices that were dropped when the cube was generated;
        # needed later to mask no-data cells in the final map.
        self.dropped = ds['Dropped'][:].data
        self.dropped = [int(x) for x in self.dropped]

        # Save the prediction coordinates in the prediction dataset
        self.xy['ycoord'] = self.features['ycoord']
        self.xy['xcoord'] = self.features['xcoord']
        self.features = self.features.drop(['xcoord', 'ycoord'], axis=1)
        self.feature_list = list(self.features.columns)
        self.features = np.array(self.features)

        self.logger.info('Features imported')
        self.logger.info('The following ' + str(len(self.feature_list))
                         + ' features are included in the prediction dataset: '
                         + str(self.feature_list))
        Label(self.master, text="Features successfully imported").grid(
            row=self.row, column=1)
        self.row = self.row + 1
        self.master.update()

    def import_features_labels(self):
        """
        Imports the features and labels for training from the csv file
        referenced in the user parameters.
        """
        # Import training dataset as csv file
        self.features = pd.read_csv(self.properties_map['train_path'])

        # Extract and remove labels from training dataset
        # (reshape(-1, 1) keeps the original column-vector shape)
        self.labels = np.array(
            self.features[self.label_name]).reshape(-1, 1)
        self.features = self.features.drop(self.label_name, axis=1)

        # Store coordinates from training data
        self.xy['ycoord'] = self.features['ycoord']
        self.xy['xcoord'] = self.features['xcoord']
        self.features = self.features.drop(['xcoord', 'ycoord', 'ID'], axis=1)
        self.feature_list = list(self.features.columns)
        self.features = np.array(self.features)

        self.logger.info('Features imported')
        self.logger.info('The following ' + str(len(self.feature_list))
                         + ' features are included in the training dataset: '
                         + str(self.feature_list))
        Label(self.master,
              text="Features and label successfully imported").grid(
              row=self.row, column=1)
        self.row = self.row + 1
        self.master.update()

    def split_training_testing(self):
        """
        Splits the training data into training and validation data,
        stratified on the labels.
        """
        self.train_features, self.test_features, \
            self.train_labels, self.test_labels = train_test_split(
                self.features,
                self.labels,
                test_size=self.test_size,
                random_state=int(self.properties_settings['random_seed']),
                stratify=self.labels)
        self.logger.info('Validation dataset split from training dataset')
        Label(self.master, text="Training dataset splitted").grid(
            row=self.row, column=1)
        self.row = self.row + 1
        self.master.update()


class RandomForest(prepare_data):

    """
    This class conducts the training of the Random Forest model and the
    generation of the landslide susceptibility and hazard map.
    """

    def __init__(self, master, aim, parallel=False, log=None):
        """
        Run either the full training pipeline or the full mapping
        pipeline, depending on ``aim``.

        Input:
            master: tkinter widget used as parent for the progress labels
            aim: str, either 'train_test' or 'prediction'
            parallel: bool, predict in parallel chunks when True
            log: logger instance (optional)
        """
        super().__init__(master, aim, log=log)
        # self.logger is already set (with a fallback) by prepare_data;
        # the original redundantly reassigned it here.
        self.aim = aim
        self.parallel = parallel
        self.num_chunks = 10  # chunk count (and worker count) for Parallel

        # Random Forest settings
        self.criterion = self.properties_map['criterion']
        self.n_estimators = self.properties_map['num_trees']
        self.max_depth = self.properties_map['depth_trees']

        self.logger.info('Aim: ' + str(aim))
        self.model_dir = self.properties_map['model_path'] + '/'
        if aim == 'prediction':
            self.model_to_load = self.properties_map['model_to_load'] + '/'
        else:
            self.model_to_save = self.properties_map['model_to_save'] + '/'
        self.output_dir = None

        if aim == 'train_test':
            Label(self.master, text="Model training started").grid(
                row=self.row, column=1)
            self.row = self.row + 1
            self.master.update()
            self.define()
            self.train()
            self.predict()
            self.evaluate()
            self.create_output_dir()
            self.save_model()
            self.save_parameters()
            self.feature_importance()
        elif aim == 'prediction':
            Label(self.master, text="Mapping started").grid(
                row=self.row, column=1)
            self.row = self.row + 1
            self.master.update()
            self.create_output_dir()
            self.load_model()
            self.predict()
            self.extract_pos_neg_predictions()
            self.reshape_prediction()
            self.save_prediction()

    def define(self):
        """
        Define the Random Forest Classifier model.
        """
        seed = int(self.properties_settings['random_seed'])
        # Fix: 'criterion' was read from the user settings, logged and
        # stored in model_params.pkl, but never actually forwarded to the
        # classifier — the user's choice was silently ignored.
        self.model = RandomForestClassifier(
            n_estimators=self.n_estimators,
            max_depth=self.max_depth,
            criterion=self.criterion,
            random_state=seed)
        # Fix: the original log line printed '(Random seed)' without the
        # seed value between the separators.
        self.logger.info('Parameters: '
                         + str(self.n_estimators) + ' (Num. estimators) '
                         + '|' + str(self.max_depth) + ' (Depth) ' + '|'
                         + str(seed) + ' (Random seed) ' + '|'
                         + str(self.criterion) + ' (Criterion) ' + '|'
                         + str(self.test_size) + ' (Splitting ratio) ' + '|')
        Label(self.master, text="Model is defined").grid(
            row=self.row, column=1)
        self.row = self.row + 1
        self.master.update()

    def train(self):
        """
        Train the Random Forest Classifier model.
        """
        self.model.fit(self.train_features, np.ravel(self.train_labels))
        self.logger.info('Model trained')
        Label(self.master, text="Model successfully trained").grid(
            row=self.row, column=1)
        self.row = self.row + 1
        self.master.update()

    def predict(self):
        """
        Make the prediction, either on the full prediction dataset
        ('prediction') or on the validation split ('train_test'),
        optionally in parallel chunks.
        """
        if self.aim == 'prediction':
            pred = self.features
        elif self.aim == 'train_test':
            pred = self.test_features

        if self.parallel:
            self.split_array_into_chunks(pred)
            # One worker per chunk (was a hard-coded 10 duplicating
            # self.num_chunks).
            prediction = Parallel(n_jobs=self.num_chunks)(delayed(
                self.model.predict)(chunk) for chunk in self.chunks)
            self.prediction = np.concatenate(prediction, axis=0)
        else:
            self.prediction = self.model.predict(pred)

        if self.aim == 'prediction':
            self.logger.info('Prediction completed')
            Label(self.master, text="Prediction completed").grid(
                row=self.row, column=1)
            self.row = self.row + 1
            self.master.update()
        elif self.aim == 'train_test':
            Label(self.master, text="Validation data predicted").grid(
                row=self.row, column=1)
            self.row = self.row + 1
            self.master.update()
            self.logger.info('Validation data predicted')

    def split_array_into_chunks(self, pred):
        """
        Split a NumPy array into chunks without changing the
        number of columns.

        Input:
            pred: prediction dataset, varies depending on if the
                  current run is for model training or map generation

        Output:
            None (chunks are stored in self.chunks)
        """
        # Calculate the number of rows in each chunk; the first
        # 'remaining_rows' chunks get one extra row each.
        rows_per_chunk = pred.shape[0] // self.num_chunks
        remaining_rows = pred.shape[0] % self.num_chunks

        # Create chunks
        self.chunks = []
        start = 0
        for i in range(self.num_chunks):
            end = start + rows_per_chunk + (1 if i < remaining_rows else 0)
            chunk = pred[start:end, :]
            self.chunks.append(chunk)
            start = end

        self.logger.info('Prediction dataset split into chunks')
        Label(self.master, text="Prediction dataset split into chunks").grid(
            row=self.row, column=1)
        self.row = self.row + 1
        self.master.update()

    def evaluate(self):
        """
        Evaluate the prediction on the validation dataset
        (ROC/AUC, accuracy string, MAE, MSE, F1, F-beta).
        """
        y_pred_prob = self.model.predict_proba(self.test_features)[:, 1]
        self.fpr, self.tpr, self.thresholds = roc_curve(self.test_labels,
                                                        y_pred_prob)
        self.roc_auc = auc(self.fpr, self.tpr)

        # Per-sample absolute error (0 = correct, 1 = misclassified)
        diff = [abs(pred - test_labels)
                for pred, test_labels in zip(list(self.prediction),
                                             list(self.test_labels))]
        # NOTE(review): 'acc' records the number of MISclassified samples
        # over the total (diff == 1 means wrong), not the share of correct
        # predictions — confirm this is the intended metric.
        self.acc = str(diff.count(1)) + '/' + str(len(diff))
        self.mae = round(np.mean(diff), 2)
        self.mse = mean_squared_error(self.test_labels, self.prediction)
        self.f1 = f1_score(self.test_labels, self.prediction)
        self.fbeta = fbeta_score(self.test_labels, self.prediction, beta=2)

        self.logger.info('Evaluation metrics computed')
        Label(self.master, text="Evaluation metrics computed").grid(
            row=self.row, column=1)
        self.row = self.row + 1
        self.master.update()

    def create_output_dir(self):
        """
        Define and create the output directory.
        """
        if self.aim == 'train_test':
            self.output_dir = self.model_dir + self.model_to_save
        else:
            self.output_dir = self.model_dir + self.model_to_load
        if not os.path.isdir(self.output_dir):
            os.makedirs(self.output_dir, exist_ok=True)

    def save_model(self):
        """
        Save the Random Forest Classifier model to the output directory.
        """
        with open(self.output_dir + '/saved_model.pkl', 'wb') as file:
            pkl.dump(self.model, file)
        self.logger.info('Model saved to '
                         + self.output_dir + '/saved_model.pkl')
        Label(self.master, text="Model saved").grid(
            row=self.row, column=1)
        self.row = self.row + 1
        self.master.update()

    def save_parameters(self):
        """
        Save the metadata associated with the prediction
        (bounding box, hyperparameters, evaluation metrics).
        """
        tmp_max = self.xy.max(axis=0)
        tmp_min = self.xy.min(axis=0)
        # Positional access via .iloc (plain Series[int] indexing is
        # deprecated in pandas): position 0 is 'ycoord', 1 is 'xcoord' —
        # the order in which the columns were added during import.
        params = {'Area': [tmp_min.iloc[0], tmp_max.iloc[0],
                           tmp_min.iloc[1], tmp_max.iloc[1]],
                  'criterion': [self.criterion],
                  'n_estimators': [self.n_estimators],
                  'max_depth': [self.max_depth],
                  'features': self.feature_list,
                  'mse': self.mse,
                  'mae': self.mae,
                  'f1': self.f1,
                  'roc_threshold': self.thresholds,
                  'roc_fpr': self.fpr,
                  'roc_tpr': self.tpr,
                  'roc_auc': self.roc_auc,
                  'accuracy': self.acc,
                  'fbeta': self.fbeta
                  }

        with open(self.model_dir + self.model_to_save
                  + 'model_params.pkl', 'wb') as file:
            pkl.dump(params, file)
        self.logger.info('Parameters saved to '
                         + self.model_dir + self.model_to_save
                         + 'model_params.pkl')
        Label(self.master, text="Parameters saved").grid(
            row=self.row, column=1)
        self.row = self.row + 1
        self.master.update()

    def load_model(self):
        """
        Load the Random Forest Classifier model and the metadata.
        Make sure to compare features of training and prediction
        dataset as well as their order.
        """
        with open(self.model_dir + self.model_to_load
                  + 'saved_model.pkl', 'rb') as file:
            self.model = pkl.load(file)
        # NOTE(review): the metadata is loaded but currently unused —
        # presumably intended for the feature-order comparison mentioned
        # in the docstring; kept so a missing params file still fails here.
        with open(self.model_dir + self.properties_map['model_to_load']
                  + '/model_params.pkl', 'rb') as f:
            params = pkl.load(f)
        self.logger.info('Model succesfully loaded from '
                         + self.model_dir + self.model_to_load)
        Label(self.master,
              text=('Model loaded from '
                    + self.model_dir
                    + self.model_to_load)).grid(row=self.row, column=1)
        self.row = self.row + 1
        self.master.update()

    def save_prediction(self):
        """
        Save the prediction (all samples plus the positive and the
        negative class separately) as csv files.
        """
        if self.aim == 'prediction':
            output_dir = self.model_dir + self.model_to_load
            self.xy.to_csv(
                output_dir + 'prediction_results.csv',
                columns=['xcoord', 'ycoord', 'pred'],
                index=True)
            self.df_pos.to_csv(
                output_dir + 'pos_prediction_results.csv',
                columns=['xcoord', 'ycoord', 'pred'],
                index=True)
            self.df_neg.to_csv(
                output_dir + 'neg_prediction_results.csv',
                columns=['xcoord', 'ycoord', 'pred'],
                index=True)
            self.logger.info('Prediction saved in ' + output_dir)
            Label(self.master, text="Prediction saved as csv-file").grid(
                row=self.row, column=1)
            self.row = self.row + 1
            self.master.update()

    def reshape_prediction(self):
        """
        Reshape the individual predictions into a 2D map.
        """
        arr_xy = np.array(self.xy)
        # Mask the rows that were dropped during cube generation
        # with the configured no-data value.
        arr_xy[self.dropped, :] = [self.properties_settings['no_value']]
        # Column 2 is 'pred' (added by extract_pos_neg_predictions);
        # the grid dimensions come from the unique coordinate values.
        result = np.reshape(
            list(arr_xy[:, 2]),
            (len(list(set(self.xy['ycoord']))),
             len(list(set(self.xy['xcoord'])))))

        self.logger.info('Map generated')
        Label(self.master, text="Prediction reshaped into map").grid(
            row=self.row, column=1)
        self.row = self.row + 1
        self.master.update()
        self.save_prediction_as_nc(result)

    def extract_pos_neg_predictions(self):
        """
        Distinguish between the classes of the Classifier.
        """
        self.xy['pred'] = self.prediction
        self.df_pos = self.xy[self.xy.pred == 1]
        self.df_neg = self.xy[self.xy.pred == 0]
        self.logger.info('Positive and negative predictions extracted')

    def save_prediction_as_nc(self, prediction):
        """
        Save the hazard map to a netCDF4 file, overwriting any
        existing output.
        """
        outfile_name = self.model_dir + self.model_to_load + 'prediction.nc'
        if os.path.exists(outfile_name):
            os.remove(outfile_name)
        generate_ncfile(outfile_name,
                        np.array(sorted(set(list(self.xy['xcoord'])))),
                        np.array(sorted(set(list(self.xy['ycoord'])))),
                        prediction,
                        crs=self.properties_settings['crs'],
                        missing_value=self.properties_settings['no_value'])
        self.logger.info('Map saved in ' + outfile_name)
        Label(self.master, text="Map saved as nc-file").grid(
            row=self.row, column=1)
        self.row = self.row + 1
        self.master.update()

    def feature_importance(self):
        """
        Access feature importance information from the Random Forest
        and save it as a csv file.
        """
        feature_imp = pd.Series(self.model.feature_importances_,
                                index=self.feature_list).sort_values(
                                    ascending=False)
        feature_imp.to_csv(self.model_dir + self.model_to_save
                           + 'feature_importance.csv')
        self.logger.info('Feature importance determined')
        Label(self.master, text="Feature importance computed").grid(
            row=self.row, column=1)
        self.row = self.row + 1
        self.master.update()