Skip to content
Snippets Groups Projects
compatibility_of_input_datasets.py 10.5 KiB
Newer Older
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Wed Jan 29 13:20:59 2025

@author: aedrich
"""

import numpy as np
import pandas as pd
import netCDF4 as nc
import pickle as pkl
import os
import logging
import settings
import re

from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import mean_squared_error, f1_score, roc_curve, auc, fbeta_score
from joblib import delayed, Parallel
from tkinter import Label

from utilities.ncfile_generation import generate_basic_ncfile
from utilities.strings_for_ncfile import char_to_string, features_to_char


class comparison_training_prediction_dataset:
    
    def __init__(self, logger):
        
        self.logger = logger
        self.error = False
        
        self.import_prediction_dataset()
        self.import_training_dataset()
        self.compare_features()
        if not self.error:
            self.additional_instances_to_drop()
            self.save_prediction_dataset()
            self.save_training_dataset()
            
    def import_prediction_dataset(self):

        ds = nc.Dataset(settings.path_pred)
        pred = ds['Result'][:, :].data
        pred_features = ds['features'][:].data
        self.feature_list = char_to_string(pred_features)
        
        if 'xcoord' in self.feature_list and 'ycoord' in self.feature_list:
            self.pred = pd.DataFrame(pred, columns=self.feature_list)
        else:
            self.pred = pd.DataFrame(pred, columns=['xcoord', 'ycoord']+self.feature_list)
            
        self.xy = pd.DataFrame()
        self.xy['ycoord'] = self.pred['ycoord']
        self.xy['xcoord'] = self.pred['xcoord']
        
        self.idx = ds['Dropped'][:].data
        self.idx = [int(x) for x in self.idx]
            
        if len(settings.not_included_pred_data) > 0:
            for dataset in settings.not_included_pred_data:
                if dataset in self.pred.columns.tolist():
                    self.pred = self.pred.drop(dataset, axis=1)
                
        self.logger.info('Prediction dataset imported')
        self.logger.info('The following ' + str(len(self.pred.columns.tolist())) 
                         + ' features are included in the prediction dataset: ' 
                         + str(self.pred.columns.tolist()))
                
    def import_training_dataset(self):
        
        # Import training dataset as csv file
        self.train = pd.read_csv(settings.path_train)
        # Extract and remove labels from training dataset
        self.labels = np.array(
            self.train['label']).reshape(
                [np.shape(self.train['label'])[0], 1])
        
        self.xy_train = pd.DataFrame()
        self.xy_train['ID'] = self.train['ID']
        self.xy_train['label'] = self.train['label']
        self.xy_train['ycoord'] = self.train['ycoord']
        self.xy_train['xcoord'] = self.train['xcoord']
        
        self.train = self.train.drop(['xcoord', 'ycoord', 'ID', 'label'], axis=1)

        if len(settings.not_included_train_data) > 0:
            for dataset in settings.not_included_train_data:
                if dataset in self.train.columns.tolist():
                    self.train = self.train.drop(dataset, axis=1)

        self.logger.info('Training dataset imported')
        self.logger.info('The following ' + str(len(self.train.columns.tolist()))
                         + ' features are included in the training dataset: '
                         + str(self.train.columns.tolist()))

    def compare_features(self):
        
        """
            It is assessed if all features in the training dataset also appear
            in the prediction dataset. If that is not the case, the training 
            process will be relaunched with an adapted training dataset where the 
            feature(s) that is/are not contrained in the training dataset are
            removed. The second trained model will be stored in a seperate
            folder which is named <old_folder_name>_retrain.
            
            If more features appear in the prediction dataset, the additional 
            features are removed.
            
        """
    
        self.logger.info('Features are compared between training and prediction dataset')
        
        if set(self.train.columns) == set(self.pred.columns):  
            self.logger.info('Features are identical in both training and prediction dataset')
            self.pred = self.pred[self.train.columns]
            
            self.logger.info('Potentially varying order of features has been fixed')
            self.error = False
            
        else:
            self.logger.warning('Features are not identical in the training and prediction dataset')

            extra_in_pred = set(self.pred.columns) - set(self.train.columns)
            extra_in_train = set(self.train.columns) - set(self.pred.columns)
            
            if len(extra_in_pred) > 0 and len(extra_in_train) == 0:
                self.logger.warning('More features in prediction dataset, additional features are removed')
                
                self.pred = self.pred[self.train.columns]
                self.error = False
                
            elif len(extra_in_train) > 0  and len(extra_in_pred) == 0 :
                self.logger.warning('More features in training dataset, additional features are removed')
                
                self.train = self.train[self.pred.columns]
                self.error = False
                
            elif len(extra_in_train) > 0  and len(extra_in_pred) > 0:
                self.logger.warning('There are mismatching features in both datasets')

                self.common_columns = self.train.columns.intersection(self.pred.columns)
                
                if len(self.common_columns.tolist()) == 0:
                    self.logger.error('Error: No common columns in training and prediction dataset')
                    self.error = True

                elif len(self.common_columns.tolist()) < 6:
                    self.logger.warning('Warning: only ' + str(len(self.common_columns.tolist())) + ' common columns in training and prediction dataset')
                    self.error = False
                    
                    self.train = self.train[self.common_columns]
                    self.pred = self.pred[self.common_columns]
                    
                else:
                    self.logger.info(str(len(self.common_columns.tolist())) + ' common columns in training and prediction dataset')         
                    self.error = False
                    
                    self.train = self.train[self.common_columns]
                    self.pred = self.pred[self.common_columns]
            else:
                self.logger.error('Error: Unknown issue detected. Check features manually!')
                self.error = True
                
            self.logger.info('Feature comparison completed')
            
    def additional_instances_to_drop(self):     
    
        """
            All instances that have a value of zero in all columns of a categorical
            feature are identified and appended to the list of instances for which
            a reliable prediction is not possible.
            
            Input:
                master: related to information display in external window
                logger: related to generation of a process log
                row: related to information display in external window, int
                idx: Previously defined instances for which prediction is not
                     possible, list
                pred: prediction dataset, pandas DataFrame
                
            Output:
                idx: Updated list of instances for which prediction is not
                     possible, list
                row: Updated row information related to information display in
                     external window, int
        
        """
    
        self.logger.info('Start identification of instances that are not represented by at least one categorical feature')
    
        columns = self.pred.columns
        # Regular expression to match "<feature>_<value>_encoded"
        pattern = re.compile(r"^(.*?)(_?\d+)?_encoded$")
        encoded_features = {pattern.match(col).group(1) for col in columns if pattern.match(col)}
        
        self.logger.info('Identified encoded features: ' + str(encoded_features))
        count = 0
        for feature in encoded_features:
            
            feature_cols = [col for col in self.pred.columns if col.startswith(feature) and col.endswith("_encoded")]
            all_zero_rows = (self.pred[feature_cols] == 0).all(axis=1)
            all_zero_rows = self.pred.index[all_zero_rows].tolist()
            self.idx = list(set(self.idx + all_zero_rows))
            count = count + len(all_zero_rows)

        self.logger.info(str(count) + ' instances have been identified that are not represented by at least one categorical feature')
        
    def save_prediction_dataset(self):
        
        """
            Save prediction dataset and information on dropped rows as nc-file
        """

        self.pred = pd.concat([self.xy, self.pred], axis=1)
        pred = self.pred.to_numpy()
        char_features = features_to_char(self.pred.columns)

        outfile = settings.path_pred
        self.logger.info('Prediction dataset is saved to ' + outfile)
        
        if os.path.exists(outfile):
            os.remove(outfile)

        ds = generate_basic_ncfile(outfile, crs=None)
        ds.createDimension('lat', (np.shape(pred)[0]))
        ds.createDimension('lon', (np.shape(pred)[1]))
        ds.createDimension('ix', (len(self.idx)))
        ds.createDimension('feat', len(char_features))
        result = ds.createVariable('Result', 'f4', ('lat', 'lon'))
        dropped = ds.createVariable('Dropped', 'u8', 'ix')
        Features = ds.createVariable('features', 'S1', 'feat')
        result[:, :] = pred
        dropped[:] = np.array(self.idx)
        Features[:] = char_features
        ds.close()
    
    def save_training_dataset(self):

        """
            Save dataframe as csv. If necessary folder is created.
        """

        self.logger.info('Saving of training data in progress')

        outfile = settings.path_train

        # If outfile exists already, delete
        if os.path.exists(outfile):
            os.remove(outfile)

        self.train = pd.concat([self.xy_train, self.train], axis=1)

        # Save dataframe as csv
        self.train.to_csv(outfile, sep=',', index=False)
        self.logger.info('Training dataset saved')