-
Ann-Kathrin Margarete Edrich authored
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
compatibility_of_input_datasets.py 11.57 KiB
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Wed Jan 29 13:20:59 2025
@author: aedrich
"""
import numpy as np
import pandas as pd
import netCDF4 as nc
import pickle as pkl
import os
import logging
import re
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import mean_squared_error, f1_score, roc_curve, auc, fbeta_score
from joblib import delayed, Parallel
from tkinter import Label
from utilities.ncfile_generation import generate_basic_ncfile
from utilities.strings_for_ncfile import char_to_string, features_to_char
class comparison_training_prediction_dataset:
    """
    Make a training and a prediction dataset compatible with each other.

    On construction the class reads the run parameters from two pickle
    files, imports the prediction dataset (netCDF) and the training dataset
    (csv), reduces both to the features they have in common and, if that
    succeeded, writes both datasets back to the paths they were read from.

    Attributes set during processing:
        error: True if the datasets share no feature at all, in which case
            nothing is written back.
        pred, train: feature tables as pandas DataFrames.
        xy, xy_train: coordinate (and ID/label) columns that are kept apart
            from the features and re-attached when saving.
        idx: indices of prediction instances for which no reliable
            prediction is possible.
    """

    # Columns holding coordinates; they are never treated as features.
    COORDINATE_COLUMNS = ['xcoord', 'ycoord']
    # A warning is logged if fewer features than this are shared.
    MIN_COMMON_FEATURES = 6
    # Matches "<feature>_encode" and "<feature>_<value>_encode".
    ENCODED_PATTERN = re.compile(r"^(.*?)(_?\d+)?_encode$")

    def __init__(self, logger):
        """
        Run the complete import / compare / save pipeline.

        Input:
            logger: logger used for the process log
        """
        self.logger = logger
        self.error = False
        self.import_parameters()
        self.import_prediction_dataset()
        self.import_training_dataset()
        self.compare_features()
        # Only touch the files on disk if the datasets could be harmonised.
        if not self.error:
            self.additional_instances_to_drop()
            self.save_prediction_dataset()
            self.save_training_dataset()

    @staticmethod
    def _split_feature_list(raw):
        """
        Convert a comma separated string into a list of feature names.

        Surrounding whitespace is removed and empty entries are skipped, so
        '' yields [] and 'a, b' yields ['a', 'b'].
        """
        return [name.strip() for name in raw.split(',') if name.strip()]

    def import_parameters(self):
        """
        Load the map and settings properties from the temporary pickle
        files and derive the lists of features to exclude.
        """
        # NOTE(review): pickle.load executes arbitrary code from the file.
        # Only safe as long as these temporary files are written by this
        # application itself.
        with open('tmp_map.pkl', 'rb') as handle:
            self.properties_map = pkl.load(handle)
        with open('tmp_settings.pkl', 'rb') as handle:
            self.properties_settings = pkl.load(handle)

        self.not_included_pred_data = self._split_feature_list(
            self.properties_map['drop_pred'])
        self.not_included_train_data = self._split_feature_list(
            self.properties_map['drop_train'])

    def import_prediction_dataset(self):
        """
        Import the prediction dataset from the nc-file at 'pred_path'.

        The coordinates are stored in self.xy, the remaining columns in
        self.pred and the previously dropped instances in self.idx.
        """
        # The file is rewritten later on, so the handle must be released.
        with nc.Dataset(self.properties_map['pred_path']) as ds:
            pred = ds['Result'][:, :].data
            pred_features = ds['features'][:].data
            dropped = ds['Dropped'][:].data

        self.feature_list = char_to_string(pred_features)
        if 'xcoord' in self.feature_list and 'ycoord' in self.feature_list:
            column_names = self.feature_list
        else:
            # Feature list without coordinates: they are expected to be
            # the first two columns of the data array.
            column_names = ['xcoord', 'ycoord'] + list(self.feature_list)
        self.pred = pd.DataFrame(pred, columns=column_names)

        # Keep the coordinates apart so they are not compared as features;
        # they are re-attached in save_prediction_dataset.
        self.xy = self.pred[['ycoord', 'xcoord']].copy()
        self.pred = self.pred.drop(columns=self.COORDINATE_COLUMNS)

        self.idx = [int(x) for x in dropped]

        # Features listed by the user but absent from the data are ignored.
        self.pred = self.pred.drop(
            columns=self.not_included_pred_data, errors='ignore')

        features = self.pred.columns.tolist()
        self.logger.info('Prediction dataset imported')
        self.logger.info(
            'The following %d features are included in the prediction '
            'dataset: %s', len(features), features)

    def import_training_dataset(self):
        """
        Import the training dataset from the csv file at 'train_path'.

        Labels are stored in self.labels (column vector), ID, label and
        coordinates in self.xy_train and the features in self.train.
        """
        label = self.properties_map['name_label']
        self.train = pd.read_csv(self.properties_map['train_path'])

        # Labels as an (n, 1) array
        self.labels = np.array(self.train[label]).reshape(-1, 1)

        # Non-feature columns are kept apart and re-attached when saving.
        self.xy_train = self.train[['ID', label, 'ycoord', 'xcoord']].copy()
        self.train = self.train.drop(
            ['xcoord', 'ycoord', 'ID', label], axis=1)

        # Features listed by the user but absent from the data are ignored.
        self.train = self.train.drop(
            columns=self.not_included_train_data, errors='ignore')

        features = self.train.columns.tolist()
        self.logger.info('Training dataset imported')
        self.logger.info(
            'The following %d features are included in the training '
            'dataset: %s', len(features), features)

    def compare_features(self):
        """
        Reduce training and prediction dataset to their common features.

        Features that appear in only one of the datasets are removed from
        it. Afterwards both datasets contain the same features in the
        column order of the training dataset. If the datasets do not share
        a single feature, self.error is set to True and the datasets are
        left untouched.
        """
        self.logger.info(
            'Features are compared between training and prediction dataset')

        train_names = set(self.train.columns)
        pred_names = set(self.pred.columns)
        extra_in_pred = pred_names - train_names
        extra_in_train = train_names - pred_names

        # Shared features in training order, so both tables line up.
        self.common_columns = pd.Index(
            [col for col in self.train.columns if col in pred_names])
        n_common = len(self.common_columns)

        if n_common == 0:
            self.logger.error(
                'Error: No common columns in training and prediction dataset')
            self.error = True
            self.logger.info('Feature comparison completed')
            return

        self.error = False
        if not extra_in_pred and not extra_in_train:
            self.logger.info(
                'Features are identical in both training and prediction dataset')
            self.logger.info(
                'Potentially varying order of features has been fixed')
        else:
            self.logger.warning(
                'Features are not identical in the training and prediction dataset')
            if not extra_in_train:
                self.logger.warning(
                    'More features in prediction dataset, additional features are removed')
            elif not extra_in_pred:
                self.logger.warning(
                    'More features in training dataset, additional features are removed')
            else:
                self.logger.warning(
                    'There are mismatching features in both datasets')

            if n_common < self.MIN_COMMON_FEATURES:
                self.logger.warning(
                    'Warning: only %d common columns in training and prediction dataset',
                    n_common)
            else:
                self.logger.info(
                    '%d common columns in training and prediction dataset',
                    n_common)

        self.train = self.train[self.common_columns]
        self.pred = self.pred[self.common_columns]
        self.logger.info('Feature comparison completed')

    def additional_instances_to_drop(self):
        """
        Flag instances that are not represented by a categorical feature.

        Columns named "<feature>_encode" or "<feature>_<value>_encode" are
        grouped by <feature>. Every instance that is zero in all columns of
        such a group is added to self.idx, the list of instances for which
        a reliable prediction is not possible.
        """
        self.logger.info('Start identification of instances that are not represented by at least one categorical feature')

        # Group the columns by the feature name captured by the pattern.
        # Grouping by prefix instead would mix up features such as "geo"
        # and "geology".
        encoded_groups = {}
        for col in self.pred.columns:
            match = self.ENCODED_PATTERN.match(col)
            if match:
                encoded_groups.setdefault(match.group(1), []).append(col)
        self.logger.info(
            'Identified encoded features: %s', sorted(encoded_groups))

        unrepresented = set()
        for feature_cols in encoded_groups.values():
            all_zero = (self.pred[feature_cols] == 0).all(axis=1)
            unrepresented.update(self.pred.index[all_zero].tolist())

        # Sorted so that the stored list is reproducible between runs.
        self.idx = sorted(set(self.idx) | unrepresented)
        # Each instance is counted once, even if several features flag it.
        self.logger.info(
            '%d instances have been identified that are not represented by at least one categorical feature',
            len(unrepresented))

    def save_prediction_dataset(self):
        """
        Save prediction dataset and information on dropped rows as nc-file.

        The file at 'pred_path' is replaced. The coordinates are written as
        the first two columns, followed by the features.
        """
        self.pred = pd.concat([self.xy, self.pred], axis=1)
        self.logger.info(
            'Features in the prediction dataset: %s',
            self.pred.columns.tolist())

        pred = self.pred.to_numpy()
        char_features = features_to_char(self.pred.columns)

        outfile = self.properties_map['pred_path']
        self.logger.info('Prediction dataset is saved to ' + outfile)
        if os.path.exists(outfile):
            os.remove(outfile)

        ds = generate_basic_ncfile(outfile, crs=None)
        try:
            # 'lat'/'lon' are the row/column dimensions of the data table.
            ds.createDimension('lat', np.shape(pred)[0])
            ds.createDimension('lon', np.shape(pred)[1])
            ds.createDimension('ix', len(self.idx))
            ds.createDimension('feat', len(char_features))
            result = ds.createVariable('Result', 'f4', ('lat', 'lon'))
            dropped = ds.createVariable('Dropped', 'u8', 'ix')
            features = ds.createVariable('features', 'S1', 'feat')
            result[:, :] = pred
            dropped[:] = np.array(self.idx)
            features[:] = char_features
        finally:
            # Close the file even if writing fails half way.
            ds.close()

    def save_training_dataset(self):
        """
        Save the training dataset as csv to 'train_path'.

        An existing file is replaced. ID, label and coordinates are written
        in front of the features.
        """
        self.logger.info('Saving of training data in progress')
        outfile = self.properties_map['train_path']

        # If outfile exists already, delete
        if os.path.exists(outfile):
            os.remove(outfile)

        self.train = pd.concat([self.xy_train, self.train], axis=1)
        self.logger.info(
            'Features in the training dataset: %s',
            self.train.columns.tolist())

        # Save dataframe as csv
        self.train.to_csv(outfile, sep=',', index=False)
        self.logger.info('Training dataset saved')