#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import logging
import os
import pickle
import pickle as pkl

import netCDF4 as nc
import numpy as np
import pandas as pd
from joblib import Parallel, delayed
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import (mean_squared_error, f1_score, roc_curve, auc,
                             fbeta_score)
from sklearn.model_selection import train_test_split

import settings
from utilities.ncfile_generation import generate_ncfile
from utilities.strings_for_ncfile import char_to_string


class prepare_data:
    """
    Prepare the data to be used in the Random Forest classifier.

    Depending on ``aim`` the constructor either imports the labelled
    training dataset and splits it into a training and a validation part
    (``'train_test'``) or imports the prediction dataset (``'prediction'``).
    Any other value only prints a hint and leaves the instance without
    data attributes.
    """

    def __init__(self, aim, logger):
        """
        Parameters
        ----------
        aim : str
            Either ``'train_test'`` or ``'prediction'``.
        logger : logging.Logger or None
            Logger receiving progress messages. When ``None`` a module
            level logger is used so that logging calls never fail.
        """
        self.aim = aim
        self.logger = (logger if logger is not None
                       else logging.getLogger(__name__))

        if aim == 'train_test':
            print('Train the model')
        elif aim == 'prediction':
            print('Prepare the hazard map')
        else:
            print('Not a valid command. Enter train_test or prediction.')
            return

        # Size of validation dataset
        self.test_size = settings.size
        # Column name of label in training dataset
        self.label_name = 'label'
        # Coordinates are kept separately so the flat prediction can be
        # reshaped into a map later on
        self.xy = pd.DataFrame()

        if aim == 'train_test':
            # Prepare the training
            self.import_features_labels()
            # Generate train/validation dataset
            self.split_training_testing()
        else:
            # Import prediction dataset
            self.import_features()

    def import_features(self):
        """
        Import the features for prediction from a netCDF file.

        ``settings.path_pred`` is either the file itself or a directory
        containing ``prediction.nc``. The variables ``Result``,
        ``features`` and ``Dropped`` are read; the coordinate columns are
        moved to ``self.xy`` and the remaining columns become
        ``self.features`` (NumPy array) / ``self.feature_list``.
        """
        if os.path.isdir(settings.path_pred):
            # os.path.join works with and without a trailing separator
            path_pred = os.path.join(settings.path_pred, 'prediction.nc')
        else:
            path_pred = settings.path_pred

        # Slicing copies the values into memory, so the dataset can be
        # closed right after reading
        with nc.Dataset(path_pred) as ds:
            pred = ds['Result'][:, :].data
            pred_features = ds['features'][:].data
            dropped = ds['Dropped'][:].data

        self.feature_list = char_to_string(pred_features)
        self.features = pd.DataFrame(pred, columns=self.feature_list)
        self.dropped = [int(x) for x in dropped]

        # Save the prediction coordinates in the prediction dataset
        self.xy['ycoord'] = self.features['ycoord']
        self.xy['xcoord'] = self.features['xcoord']

        self.features = self.features.drop(['xcoord', 'ycoord'], axis=1)
        self.feature_list = list(self.features.columns)
        self.features = np.array(self.features)

        self.logger.info('Features for prediction were imported')
        self.logger.info(
            'The following %s features are included in the prediction '
            'dataset: %s', len(self.feature_list), self.feature_list)

    def import_features_labels(self):
        """
        Import the features and labels for training from a csv file.

        ``settings.path_train`` is either the file itself or a directory
        containing ``training.csv``. The label column is stored as a
        column vector in ``self.labels``; the columns ``xcoord``,
        ``ycoord`` and ``ID`` are removed from the features.
        """
        if os.path.isdir(settings.path_train):
            path_train = os.path.join(settings.path_train, 'training.csv')
        else:
            path_train = settings.path_train
        self.features = pd.read_csv(path_train)

        # Extract and remove labels from training dataset
        self.labels = np.array(self.features[self.label_name]).reshape(-1, 1)
        self.features = self.features.drop(self.label_name, axis=1)

        # Store coordinates from training data
        self.xy['ycoord'] = self.features['ycoord']
        self.xy['xcoord'] = self.features['xcoord']

        self.features = self.features.drop(['xcoord', 'ycoord', 'ID'], axis=1)
        self.feature_list = list(self.features.columns)
        self.features = np.array(self.features)

        self.logger.info('Features for training were imported')
        self.logger.info(
            'The following %s features are included in the training '
            'dataset: %s', len(self.feature_list), self.feature_list)

    def split_training_testing(self):
        """
        Split the training data into training and validation data.

        The split is stratified by label and uses ``settings.random_seed``
        so it is reproducible.
        """
        (self.train_features, self.test_features,
         self.train_labels, self.test_labels) = train_test_split(
            self.features,
            self.labels,
            test_size=self.test_size,
            random_state=settings.random_seed,
            stratify=self.labels)

        print('Data split')
        self.logger.info('Training data split in training and test dataset')


class RandomForest(prepare_data):
    """
    Train, evaluate and apply a Random Forest classifier.

    With ``aim='train_test'`` the model is defined, trained, evaluated on
    the validation split and stored together with its metadata. With
    ``aim='prediction'`` a stored model is loaded, applied to the
    prediction dataset and the result is written as csv and netCDF.
    """

    def __init__(self, aim, parallel=False, log=None):
        """
        Parameters
        ----------
        aim : str
            Either ``'train_test'`` or ``'prediction'``.
        parallel : bool
            If True the prediction is split into chunks that are
            predicted in parallel.
        log : logging.Logger or None
            Logger for progress messages (a module logger is used when
            None).
        """
        # The parent constructor sets self.aim / self.logger and imports
        # the data
        super().__init__(aim, log)
        self.parallel = parallel
        # Number of chunks (and parallel jobs) for parallel prediction
        self.num_chunks = 10

        # Random Forest settings
        self.criterion = settings.criterion
        self.n_estimators = settings.num_trees
        self.max_depth = settings.depth

        self.model_dir = settings.model_database_dir
        self.model_to_load = settings.model_to_load
        self.model_to_save = settings.model_to_save
        self.output_dir = None

        if aim == 'train_test':
            print('Model is trained')
            self.define()
            self.train()
            self.predict()
            self.evaluate()
            self.create_output_dir()
            self.save_model()
            self.save_parameters()
            self.feature_importance()

        elif aim == 'prediction':
            print('Prediction is performed')
            self.create_output_dir()
            self.load_model()
            self.predict()
            self.extract_pos_neg_predictions()
            self.reshape_prediction()
            self.save_prediction()

    def define(self):
        """
        Define the Random Forest Classifier model.

        The split criterion from the settings is passed to the model so
        that the criterion stored by ``save_parameters`` is the one that
        was actually used.
        """
        self.model = RandomForestClassifier(
            n_estimators=self.n_estimators,
            criterion=self.criterion,
            max_depth=self.max_depth,
            random_state=settings.random_seed)
        self.logger.info('Model is defined')

    def train(self):
        """
        Train the Random Forest Classifier model on the training split.
        """
        self.model.fit(self.train_features, np.ravel(self.train_labels))
        self.logger.info('Model is trained')

    def predict(self):
        """
        Make the prediction and store it in ``self.prediction``.

        The prediction dataset is used for ``aim='prediction'`` and the
        validation features for ``aim='train_test'``.

        Raises
        ------
        ValueError
            If ``self.aim`` is neither ``'prediction'`` nor
            ``'train_test'``.
        """
        print('Predicting...')
        self.logger.info('Predicting...')

        if self.aim == 'prediction':
            pred = self.features
        elif self.aim == 'train_test':
            pred = self.test_features
        else:
            raise ValueError(
                'Cannot predict for aim ' + repr(self.aim)
                + '. Enter train_test or prediction.')

        if self.parallel:
            self.split_array_into_chunks(pred)
            prediction = Parallel(n_jobs=self.num_chunks)(
                delayed(self.model.predict)(chunk) for chunk in self.chunks)
            self.prediction = np.concatenate(prediction, axis=0)
        else:
            self.prediction = self.model.predict(pred)

    def split_array_into_chunks(self, pred):
        """
        Split a NumPy array row-wise into at most ``self.num_chunks``
        chunks without changing the number of columns.

        The first ``n_rows % num_chunks`` chunks get one extra row. Empty
        chunks (fewer rows than chunks) are dropped because the model
        cannot predict on zero samples. The result is stored in
        ``self.chunks``.
        """
        self.logger.info('Prediction dataset is split in chunks')
        self.chunks = [chunk
                       for chunk in np.array_split(pred, self.num_chunks)
                       if len(chunk) > 0]

    def evaluate(self):
        """
        Evaluate the model on the validation dataset.

        Stores the ROC curve (``fpr``, ``tpr``, ``thresholds``), its AUC,
        the mean absolute error, mean squared error, F1 and F-beta
        (beta=2) scores on the instance.
        """
        self.logger.info('Model is evaluated')

        y_true = np.ravel(self.test_labels)

        y_pred_prob = self.model.predict_proba(self.test_features)[:, 1]
        self.fpr, self.tpr, self.thresholds = roc_curve(y_true, y_pred_prob)

        # Calculate AUC (Area Under the Curve)
        self.roc_auc = auc(self.fpr, self.tpr)

        diff = np.abs(self.prediction - y_true)
        # NOTE: despite its name this holds "<deviations of exactly 1>/
        # <number of samples>", i.e. the misclassified share for 0/1
        # labels. Kept unchanged because it is stored in the model
        # metadata.
        self.acc = str(int(np.count_nonzero(diff == 1))) + '/' + str(len(diff))
        self.mae = round(np.mean(diff), 2)
        print('Mean absolute error: ' + str(self.mae))
        print('Wrongly predicted: '
              + str(np.count_nonzero(diff))
              + '/' + str(len(diff)))
        self.mse = mean_squared_error(y_true, self.prediction)
        self.f1 = f1_score(y_true, self.prediction)
        self.fbeta = fbeta_score(y_true, self.prediction, beta=2)
        print('Mean squared error: ' + str(self.mse))

    def create_output_dir(self):
        """
        Define and create the output directory
        (``model_dir + model_to_save``).
        """
        self.output_dir = self.model_dir + self.model_to_save
        os.makedirs(self.output_dir, exist_ok=True)

    def save_model(self):
        """
        Save the Random Forest Classifier model as pickle file in the
        output directory.
        """
        with open(self.output_dir + '/saved_model.pkl', 'wb') as file:
            pkl.dump(self.model, file)
        self.logger.info('Model is saved')

    def save_parameters(self):
        """
        Save the metadata associated with the trained model (settings,
        feature names and evaluation metrics) next to the model.
        """
        params = {'Area': settings.bounding_box,
                  'criterion': [self.criterion],
                  'n_estimators': [self.n_estimators],
                  'max_depth': [self.max_depth],
                  'features': self.feature_list,
                  'mse': self.mse,
                  'mae': self.mae,
                  'f1': self.f1,
                  'roc_threshold': self.thresholds,
                  'roc_fpr': self.fpr,
                  'roc_tpr': self.tpr,
                  'roc_auc': self.roc_auc,
                  'accuracy': self.acc,
                  'fbeta': self.fbeta
                  }

        with open(self.model_dir + self.model_to_save
                  + '/model_params.pkl', 'wb') as file:
            pkl.dump(params, file)
        self.logger.info('Parameters are saved')

    def load_model(self):
        """
        Load the Random Forest Classifier model and its metadata.

        Model and metadata are both read from
        ``model_dir + model_to_load``. The feature names stored with the
        model are compared (including their order) with the features of
        the prediction dataset; a mismatch is reported as a warning
        because the model would silently interpret the columns wrongly.
        """
        model_path = self.model_dir + self.model_to_load

        print('Loading model ' + model_path + '/saved_model.pkl')
        self.logger.info('Loading model ' + model_path + '/saved_model.pkl')

        # SECURITY: pickle executes arbitrary code on load. Only load
        # model files from a trusted model database.
        with open(model_path + '/saved_model.pkl', 'rb') as file:
            self.model = pkl.load(file)

        with open(model_path + '/model_params.pkl', 'rb') as f:
            params = pkl.load(f)
        trained_features = list(params['features'])

        current_features = getattr(self, 'feature_list', None)
        if (current_features is not None
                and trained_features != list(current_features)):
            message = ('Features of the loaded model and of the prediction '
                       'dataset differ (names or order). Model: '
                       + str(trained_features) + ', prediction dataset: '
                       + str(list(current_features)))
            print(message)
            self.logger.warning(message)

        self.logger.info('Model loaded from ' + model_path)
        print('Model loaded from ' + model_path)
        print("Model successfully loaded")

    def save_prediction(self):
        """
        Save the prediction as csv files.

        Writes all predictions as well as the positive and the negative
        predictions separately into ``model_dir + model_to_load``. Only
        available for ``aim='prediction'``.
        """
        if self.aim != 'prediction':
            self.logger.warning(
                'Predictions are only saved for aim prediction')
            return

        output_dir = self.model_dir + self.model_to_load
        columns = ['xcoord', 'ycoord', 'pred']

        self.xy.to_csv(output_dir + '/prediction_results.csv',
                       columns=columns, index=True)
        self.df_pos.to_csv(output_dir + '/pos_prediction_results.csv',
                           columns=columns, index=True)
        self.df_neg.to_csv(output_dir + '/neg_prediction_results.csv',
                           columns=columns, index=True)
        print('Predictions saved in ' + output_dir)
        self.logger.info('Prediction saved in ' + output_dir)

    def reshape_prediction(self):
        """
        Reshape the individual predictions into a map and save it.

        Rows listed in ``self.dropped`` are set to ``settings.no_value``.
        The flat prediction is reshaped to (number of distinct y
        coordinates, number of distinct x coordinates).
        NOTE(review): this assumes the prediction dataset is a complete
        grid stored row by row (y varying slowest) - confirm against the
        code generating prediction.nc.
        """
        # Copy as float so that no_value can be stored regardless of the
        # dtype of the predicted class labels
        pred = np.array(self.xy['pred'], dtype=float)
        pred[self.dropped] = settings.no_value

        n_rows = len(pd.unique(self.xy['ycoord']))
        n_cols = len(pd.unique(self.xy['xcoord']))
        result = pred.reshape(n_rows, n_cols)

        self.logger.info('Prediction is reshaped into the final map')
        self.save_prediction_as_nc(result)

    def extract_pos_neg_predictions(self):
        """
        Distinguish between the classes of the Classifier.

        Adds the prediction as column ``pred`` to ``self.xy`` and stores
        the rows predicted as 1 in ``self.df_pos`` and those predicted as
        0 in ``self.df_neg``.
        """
        print('Extract pos and neg predictions...')
        self.logger.info('Extract positive and negative predictions...')

        self.xy['pred'] = self.prediction
        self.df_pos = self.xy[self.xy.pred == 1]
        self.df_neg = self.xy[self.xy.pred == 0]

    def save_prediction_as_nc(self, prediction):
        """
        Save the hazard map to a netCDF4 file.

        Parameters
        ----------
        prediction : numpy.ndarray
            2D map as produced by ``reshape_prediction``.

        The coordinate axes are the distinct coordinates in order of
        first appearance, which matches the layout of the reshaped map
        (a ``set`` would return them in arbitrary order).
        """
        print('Saving as nc-File')
        outfile_name = self.model_dir + self.model_to_load + '/prediction.nc'

        # Replace an existing map from a previous run
        if os.path.exists(outfile_name):
            os.remove(outfile_name)

        generate_ncfile(outfile_name,
                        np.array(pd.unique(self.xy['xcoord'])),
                        np.array(pd.unique(self.xy['ycoord'])),
                        prediction,
                        crs=settings.crs,
                        missing_value=settings.no_value)
        self.logger.info('Map is saved as nc-file')

    def feature_importance(self):
        """
        Save the feature importances of the Random Forest, sorted in
        descending order, as csv file next to the trained model.
        """
        feature_imp = pd.Series(
            self.model.feature_importances_,
            index=self.feature_list).sort_values(ascending=False)

        # Written where the trained model is stored (model_to_save); the
        # model_to_load directory need not exist during training
        out_dir = self.output_dir or (self.model_dir + self.model_to_save)
        feature_imp.to_csv(out_dir + '/feature_importance.csv')
        self.logger.info('Feature importance is saved')