#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ Created on Wed Jan 29 13:20:59 2025 @author: aedrich """ import numpy as np import pandas as pd import netCDF4 as nc import pickle as pkl import os import logging import re from sklearn.model_selection import train_test_split from sklearn.ensemble import RandomForestClassifier from sklearn.metrics import mean_squared_error, f1_score, roc_curve, auc, fbeta_score from joblib import delayed, Parallel from tkinter import Label from utilities.ncfile_generation import generate_basic_ncfile from utilities.strings_for_ncfile import char_to_string, features_to_char class comparison_training_prediction_dataset: def __init__(self, logger): self.logger = logger self.error = False self.import_parameters() self.import_prediction_dataset() self.import_training_dataset() self.compare_features() if not self.error: self.additional_instances_to_drop() self.save_prediction_dataset() self.save_training_dataset() def import_parameters(self): with open('tmp_map.pkl', 'rb') as handle: self.properties_map = pkl.load(handle) with open('tmp_settings.pkl', 'rb') as handle: self.properties_settings = pkl.load(handle) if self.properties_map['drop_pred'] == '': self.not_included_pred_data = [] else: self.not_included_pred_data = self.properties_map[ 'drop_pred'].split(',') if self.properties_map['drop_train'] == '': self.not_included_train_data = [] else: self.not_included_train_data = self.properties_map[ 'drop_train'].split(',') def import_prediction_dataset(self): ds = nc.Dataset(self.properties_map['pred_path']) pred = ds['Result'][:, :].data pred_features = ds['features'][:].data self.feature_list = char_to_string(pred_features) if 'xcoord' in self.feature_list and 'ycoord' in self.feature_list: self.pred = pd.DataFrame(pred, columns=self.feature_list) else: self.pred = pd.DataFrame(pred, columns=['xcoord', 'ycoord']+self.feature_list) self.xy = pd.DataFrame() self.xy['ycoord'] = self.pred['ycoord'] self.xy['xcoord'] = self.pred['xcoord'] self.idx = ds['Dropped'][:].data self.idx = [int(x) for x in self.idx] if len(self.not_included_pred_data) > 0: for dataset in self.not_included_pred_data: if dataset in self.pred.columns.tolist(): self.pred = self.pred.drop(dataset, axis=1) self.logger.info('Prediction dataset imported') self.logger.info('The following ' + str(len(self.pred.columns.tolist())) + ' features are included in the prediction dataset: ' + str(self.pred.columns.tolist())) def import_training_dataset(self): # Import training dataset as csv file self.train = pd.read_csv(self.properties_map['train_path']) # Extract and remove labels from training dataset self.labels = np.array( self.train[self.properties_map['name_label']]).reshape( [np.shape(self.train[self.properties_map['name_label']])[0], 1]) self.xy_train = pd.DataFrame() self.xy_train['ID'] = self.train['ID'] self.xy_train[self.properties_map['name_label']] = self.train[self.properties_map['name_label']] self.xy_train['ycoord'] = self.train['ycoord'] self.xy_train['xcoord'] = self.train['xcoord'] self.train = self.train.drop(['xcoord', 'ycoord', 'ID', self.properties_map['name_label']], axis=1) if len(self.not_included_train_data) > 0: for dataset in self.not_included_train_data: if dataset in self.train.columns.tolist(): self.train = self.train.drop(dataset, axis=1) self.logger.info('Training dataset imported') self.logger.info('The following ' + str(len(self.train.columns.tolist())) + ' features are included in the training dataset: ' + str(self.train.columns.tolist())) def compare_features(self): """ It is assessed if all features in the training dataset also appear in the prediction dataset. If that is not the case, the training process will be relaunched with an adapted training dataset where the feature(s) that is/are not contrained in the training dataset are removed. The second trained model will be stored in a seperate folder which is named <old_folder_name>_retrain. If more features appear in the prediction dataset, the additional features are removed. """ self.logger.info('Features are compared between training and prediction dataset') if set(self.train.columns) == set(self.pred.columns): self.logger.info('Features are identical in both training and prediction dataset') self.pred = self.pred[self.train.columns] self.logger.info('Potentially varying order of features has been fixed') self.error = False else: self.logger.warning('Features are not identical in the training and prediction dataset') extra_in_pred = set(self.pred.columns) - set(self.train.columns) extra_in_train = set(self.train.columns) - set(self.pred.columns) if len(extra_in_pred) > 0 and len(extra_in_train) == 0: self.logger.warning('More features in prediction dataset, additional features are removed') self.pred = self.pred[self.train.columns] self.error = False elif len(extra_in_train) > 0 and len(extra_in_pred) == 0 : self.logger.warning('More features in training dataset, additional features are removed') self.train = self.train[self.pred.columns] self.error = False elif len(extra_in_train) > 0 and len(extra_in_pred) > 0: self.logger.warning('There are mismatching features in both datasets') self.common_columns = self.train.columns.intersection(self.pred.columns) if len(self.common_columns.tolist()) == 0: self.logger.error('Error: No common columns in training and prediction dataset') self.error = True elif len(self.common_columns.tolist()) < 6: self.logger.warning('Warning: only ' + str(len(self.common_columns.tolist())) + ' common columns in training and prediction dataset') self.error = False self.train = self.train[self.common_columns] self.pred = self.pred[self.common_columns] else: self.logger.info(str(len(self.common_columns.tolist())) + ' common columns in training and prediction dataset') self.error = False self.train = self.train[self.common_columns] self.pred = self.pred[self.common_columns] else: self.logger.error('Error: Unknown issue detected. Check features manually!') self.error = True self.logger.info('Feature comparison completed') def additional_instances_to_drop(self): """ All instances that have a value of zero in all columns of a categorical feature are identified and appended to the list of instances for which a reliable prediction is not possible. Input: master: related to information display in external window logger: related to generation of a process log row: related to information display in external window, int idx: Previously defined instances for which prediction is not possible, list pred: prediction dataset, pandas DataFrame Output: idx: Updated list of instances for which prediction is not possible, list row: Updated row information related to information display in external window, int """ self.logger.info('Start identification of instances that are not represented by at least one categorical feature') columns = self.pred.columns # Regular expression to match "<feature>_<value>_encoded" pattern = re.compile(r"^(.*?)(_?\d+)?_encode$") encoded_features = {pattern.match(col).group(1) for col in columns if pattern.match(col)} self.logger.info('Identified encoded features: ' + str(encoded_features)) count = 0 for feature in encoded_features: feature_cols = [col for col in self.pred.columns if col.startswith(feature) and col.endswith("_encode")] all_zero_rows = (self.pred[feature_cols] == 0).all(axis=1) all_zero_rows = self.pred.index[all_zero_rows].tolist() self.idx = list(set(self.idx + all_zero_rows)) count = count + len(all_zero_rows) self.logger.info(str(count) + ' instances have been identified that are not represented by at least one categorical feature') def save_prediction_dataset(self): """ Save prediction dataset and information on dropped rows as nc-file """ self.pred = pd.concat([self.xy, self.pred], axis=1) self.logger.info('Features in the prediction dataset: ' + str(self.pred.columns.tolist())) pred = self.pred.to_numpy() char_features = features_to_char(self.pred.columns) outfile = self.properties_map['pred_path'] self.logger.info('Prediction dataset is saved to ' + outfile) if os.path.exists(outfile): os.remove(outfile) ds = generate_basic_ncfile(outfile, crs=None) ds.createDimension('lat', (np.shape(pred)[0])) ds.createDimension('lon', (np.shape(pred)[1])) ds.createDimension('ix', (len(self.idx))) ds.createDimension('feat', len(char_features)) result = ds.createVariable('Result', 'f4', ('lat', 'lon')) dropped = ds.createVariable('Dropped', 'u8', 'ix') Features = ds.createVariable('features', 'S1', 'feat') result[:, :] = pred dropped[:] = np.array(self.idx) Features[:] = char_features ds.close() def save_training_dataset(self): """ Save dataframe as csv. If necessary folder is created. """ self.logger.info('Saving of training data in progress') outfile = self.properties_map['train_path'] # If outfile exists already, delete if os.path.exists(outfile): os.remove(outfile) self.train = pd.concat([self.xy_train, self.train], axis=1) self.logger.info('Features in the training dataset: ' + str(self.train.columns.tolist())) # Save dataframe as csv self.train.to_csv(outfile, sep=',', index=False) self.logger.info('Training dataset saved')