Skip to content
Snippets Groups Projects
compatibility_of_input_datasets.py 11.4 KiB
Newer Older
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Wed Jan 29 13:20:59 2025

@author: aedrich
"""

import numpy as np
import pandas as pd
import netCDF4 as nc
import pickle as pkl
import os
import logging
import re

from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import mean_squared_error, f1_score, roc_curve, auc, fbeta_score
from joblib import delayed, Parallel
from tkinter import Label

from utilities.ncfile_generation import generate_basic_ncfile
from utilities.strings_for_ncfile import char_to_string, features_to_char


class comparison_training_prediction_dataset:
    
    """
        This class imports the training and prediction dataset and compares
        the two datasets with respect to the contained features and their order.
        The maximum overlap of features is identified and an adapted training
        and prediction dataset is saved.
    
    """
    
    def __init__(self, logger):
        
        self.logger = logger
        self.error = False
        
        self.import_parameters()
        self.import_prediction_dataset()
        self.import_training_dataset()
        self.compare_features()
        if not self.error:
            self.additional_instances_to_drop()
            self.save_prediction_dataset()
            self.save_training_dataset()
        
    def import_parameters(self):
        
        """
            Import user defined parameters.
        """

        with open('tmp_map.pkl', 'rb') as handle:
            self.properties_map = pkl.load(handle)

        with open('tmp_settings.pkl', 'rb') as handle:
            self.properties_settings = pkl.load(handle)
            
        if self.properties_map['drop_pred'] == '':
            self.not_included_pred_data = []
        else:
            self.not_included_pred_data = self.properties_map[
                'drop_pred'].split(',')

        if self.properties_map['drop_train'] == '':
            self.not_included_train_data = []
        else:
            self.not_included_train_data = self.properties_map[
                'drop_train'].split(',')
            
    def import_prediction_dataset(self):
        
        """
            The prediction dataset is imported, features to be removed are
            dropped. 
        """

        ds = nc.Dataset(self.properties_map['pred_path'])
        pred = ds['Result'][:, :].data
        pred_features = ds['features'][:].data
        self.feature_list = char_to_string(pred_features)
        
        if 'xcoord' in self.feature_list and 'ycoord' in self.feature_list:
            self.pred = pd.DataFrame(pred, columns=self.feature_list)
        else:
            self.pred = pd.DataFrame(pred, columns=['xcoord', 'ycoord']+self.feature_list)
            
        self.xy = pd.DataFrame()
        self.xy['ycoord'] = self.pred['ycoord']
        self.xy['xcoord'] = self.pred['xcoord']
        
        self.idx = ds['Dropped'][:].data
        self.idx = [int(x) for x in self.idx]
            
        if len(self.not_included_pred_data) > 0:
            for dataset in self.not_included_pred_data:
                if dataset in self.pred.columns.tolist():
                    self.pred = self.pred.drop(dataset, axis=1)
                
        self.logger.info('Prediction dataset imported')
        self.logger.info('The following ' + str(len(self.pred.columns.tolist()))
                         + ' features are included in the prediction dataset: '
                         + str(self.pred.columns.tolist()))
                
    def import_training_dataset(self):
        
        """
            The training dataset is imported, features to be removed are
            dropped. 
        """
        
        # Import training dataset as csv file
        self.train = pd.read_csv(self.properties_map['train_path'])
        # Extract and remove labels from training dataset
        self.labels = np.array(
            self.train[self.properties_map['name_label']]).reshape(
                [np.shape(self.train[self.properties_map['name_label']])[0], 1])
        
        self.xy_train = pd.DataFrame()
        self.xy_train['ID'] = self.train['ID']
        self.xy_train[self.properties_map['name_label']] = self.train[self.properties_map['name_label']]
        self.xy_train['ycoord'] = self.train['ycoord']
        self.xy_train['xcoord'] = self.train['xcoord']
        
        self.train = self.train.drop(['xcoord', 'ycoord', 'ID', self.properties_map['name_label']], axis=1)

        if len(self.not_included_train_data) > 0:
            for dataset in self.not_included_train_data:
                if dataset in self.train.columns.tolist():
                    self.train = self.train.drop(dataset, axis=1)

        self.logger.info('Training dataset imported')
        self.logger.info('The following ' + str(len(self.train.columns.tolist()))
                         + ' features are included in the training dataset: '
                         + str(self.train.columns.tolist()))

    def compare_features(self):
        
        """
            It is assessed if all features in the training dataset also appear
            in the prediction dataset. If that is not the case, the maximum 
            overlap between the features is determined and an adapted version
            of the training and prediction dataset containing only the identified
            features is generated.           
        """
    
        self.logger.info('Features are compared between training and prediction dataset')
        
        if set(self.train.columns) == set(self.pred.columns):  
            self.logger.info('Features are identical in both training and prediction dataset')
            self.pred = self.pred[self.train.columns]
            
            self.logger.info('Potentially varying order of features has been fixed')
            self.error = False
            
        else:
            self.logger.warning('Features are not identical in the training and prediction dataset')

            extra_in_pred = set(self.pred.columns) - set(self.train.columns)
            extra_in_train = set(self.train.columns) - set(self.pred.columns)
            
            if len(extra_in_pred) > 0 and len(extra_in_train) == 0:
                self.logger.warning('More features in prediction dataset, additional features are removed')
                
                self.pred = self.pred[self.train.columns]
                self.error = False
                
            elif len(extra_in_train) > 0  and len(extra_in_pred) == 0 :
                self.logger.warning('More features in training dataset, additional features are removed')
                
                self.train = self.train[self.pred.columns]
                self.error = False
                
            elif len(extra_in_train) > 0  and len(extra_in_pred) > 0:
                self.logger.warning('There are mismatching features in both datasets')

                self.common_columns = self.train.columns.intersection(self.pred.columns)
                
                if len(self.common_columns.tolist()) == 0:
                    self.logger.error('Error: No common columns in training and prediction dataset')
                    self.error = True

                elif len(self.common_columns.tolist()) < 6:
                    self.logger.warning('Warning: only ' + str(len(self.common_columns.tolist())) + ' common columns in training and prediction dataset')
                    self.error = False
                    
                    self.train = self.train[self.common_columns]
                    self.pred = self.pred[self.common_columns]
                    
                else:
                    self.logger.info(str(len(self.common_columns.tolist())) + ' common columns in training and prediction dataset')         
                    self.error = False
                    
                    self.train = self.train[self.common_columns]
                    self.pred = self.pred[self.common_columns]
            else:
                self.logger.error('Error: Unknown issue detected. Check features manually!')
                self.error = True
                
            self.logger.info('Feature comparison completed')
            
    def additional_instances_to_drop(self):      
    
        """
            All instances that have a value of zero in all columns of a categorical
            feature are identified and appended to the list of instances for which
            a reliable prediction is not possible.
        """
    
        self.logger.info('Start identification of instances that are not represented by at least one categorical feature')
    
        columns = self.pred.columns
        # Regular expression to match "<feature>_<value>_encoded"
        pattern = re.compile(r"^(.*?)(_?\d+)?_encode$")
        encoded_features = {pattern.match(col).group(1) for col in columns if pattern.match(col)}
        
        self.logger.info('Identified encoded features: ' + str(encoded_features))
        count = 0
        for feature in encoded_features:
            
            feature_cols = [col for col in self.pred.columns if col.startswith(feature) and col.endswith("_encode")]
            all_zero_rows = (self.pred[feature_cols] == 0).all(axis=1)
            all_zero_rows = self.pred.index[all_zero_rows].tolist()
            self.idx = list(set(self.idx + all_zero_rows))
            count = count + len(all_zero_rows)

        self.logger.info(str(count) + ' instances have been identified that are not represented by at least one categorical feature')
        
    def save_prediction_dataset(self):
        
        """
            Save prediction dataset and information on dropped rows as nc-file
        """

        self.pred = pd.concat([self.xy, self.pred], axis=1)
        
        self.logger.info('Features in the prediction dataset: ' + str(self.pred.columns.tolist()))
        
        pred = self.pred.to_numpy()
        char_features = features_to_char(self.pred.columns)

        outfile = outfile = os.path.splitext(self.properties_map['path_pred'])[0] + '_adapt.csv'
        self.logger.info('Prediction dataset is saved to ' + outfile)
        
        if os.path.exists(outfile):
            os.remove(outfile)

        ds = generate_basic_ncfile(outfile, crs=None)
        ds.createDimension('lat', (np.shape(pred)[0]))
        ds.createDimension('lon', (np.shape(pred)[1]))
        ds.createDimension('ix', (len(self.idx)))
        ds.createDimension('feat', len(char_features))
        result = ds.createVariable('Result', 'f4', ('lat', 'lon'))
        dropped = ds.createVariable('Dropped', 'u8', 'ix')
        Features = ds.createVariable('features', 'S1', 'feat')
        result[:, :] = pred
        dropped[:] = np.array(self.idx)
        Features[:] = char_features
        ds.close()
    
    def save_training_dataset(self):

        """
            Save dataframe as csv. If necessary folder is created.
        """

        self.logger.info('Saving of training data in progress')

        outfile = outfile = os.path.splitext(self.properties_map['train_path'])[0] + '_adapt.csv'

        # If outfile exists already, delete
        if os.path.exists(outfile):
            os.remove(outfile)
            
        self.train = pd.concat([self.xy_train, self.train], axis=1)
        self.logger.info('Features in the training dataset: ' + str(self.train.columns.tolist()))
        # Save dataframe as csv
        self.train.to_csv(outfile, sep=',', index=False)
        self.logger.info('Training dataset saved')