
Resolve "Event Log Handler"

Merged Shufei Du requested to merge 6-event-log-handler into main
Files changed: 2 (+125 −2)
Previous version of the file (a stub, replaced by this MR):

class EventlogHandler:
    # TODO

New version:
from typing import Optional

import pandas as pd
import pm4py
from loguru import logger

class EventLogHandler:
    """
    Handles the file uploaded by the user and prepares it for the
    next step, which is creating the KG.
    """

    def __init__(self, file_path: str):
        self.file_path = file_path
        self.df = None
        self.case_id_index = None
        self.activity_name_index = None
        self.timestamp_index = None
        self.resource_index = None
        self.file_type = None
        self.main_columns_dict = None
    def check_file_type(self) -> bool:
        """Check that the file extension is one of the supported types."""
        logger.info("Checking file path")
        for end in (".xes", ".csv", ".parquet", ".xls", ".xlsx"):
            if self.file_path.lower().endswith(end):
                self.file_type = end
                logger.success(f"File type is {end}, ACCEPTED")
                return True
        logger.error(f"File type of {self.file_path} is not supported")
        return False
    def read_file(self) -> None:
        """
        Convert the file to a dataframe for ease of use.
        """
        logger.info("Converting file to dataframe")
        if self.file_type == ".csv":
            self.df = pd.read_csv(self.file_path)
        elif self.file_type == ".parquet":
            self.df = pd.read_parquet(self.file_path)
        elif self.file_type in (".xls", ".xlsx"):
            self.df = pd.read_excel(self.file_path)
        else:
            # .xes: import the event log with pm4py, then convert it to a dataframe
            log = pm4py.read_xes(self.file_path)
            self.df = pm4py.convert_to_dataframe(log)
        logger.success("Converted file to dataframe")
    def list_of_columns(self) -> list:
        """Return the dataframe's column names."""
        return list(self.df.columns)
    def select_columns(self, case_id: str, timestamp: str, activity: str, resource: Optional[str] = None) -> dict:
        """
        Let the user define which columns hold the case id, timestamp,
        activity, and (optionally) resource for further processing.

        Returns:
            dict: the column the user chose for each role
        """
        main_columns_dict = {
            "case_id": case_id,
            "timestamp": timestamp,
            "activity": activity,
            "resource": resource,
        }
        self.main_columns_dict = main_columns_dict
        logger.success(f"case_id, timestamp, activity chosen. Resource is {resource}")
        return main_columns_dict
    def timestamp_handler(self) -> None:
        """
        To convert a dataframe to an event log, the timestamp column must
        have a datetime type. This function handles that conversion.
        """
        timestamp_col = self.main_columns_dict["timestamp"]
        # Expects ISO-8601 style timestamps, e.g. 2024-01-31T13:45:00
        self.df[timestamp_col] = pd.to_datetime(self.df[timestamp_col], format="%Y-%m-%dT%H:%M:%S")
        logger.success("Timestamp converted")
    def clean_data(self) -> None:
        """
        Data cleaning. Handles:
        1. empty columns
        2. missing values
        3. timestamp type
        At the end, the function logs a report.
        """
        logger.info("Data cleaning started")
        initial_row_count = self.df.shape[0]
        initial_col_count = self.df.shape[1]
        # Drop empty columns first; otherwise an all-NaN column would cause
        # every row to be dropped in the missing-value step below
        col_empty = self.df.columns[self.df.isnull().all()]
        self.df = self.df.drop(columns=col_empty)
        cols_dropped = initial_col_count - self.df.shape[1]
        logger.success(f"{cols_dropped} empty columns dropped")
        # Drop rows that still have missing values
        self.df = self.df.dropna()
        rows_dropped = initial_row_count - self.df.shape[0]
        logger.success(f"{rows_dropped} rows with missing values dropped")
        # Deal with the timestamp
        self.timestamp_handler()
        # Final report
        final_row_count = self.df.shape[0]
        final_col_count = self.df.shape[1]
        logger.info(f"Initial shape: {initial_row_count} rows, {initial_col_count} columns")
        logger.info(f"Final shape: {final_row_count} rows, {final_col_count} columns")
        logger.info(f"Total rows dropped: {rows_dropped}")
        logger.info(f"Total columns dropped: {cols_dropped}")
        logger.info("Data cleaning completed")
    def return_dataframe(self) -> pd.DataFrame:
        """Return the cleaned dataframe."""
        return self.df
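
For context, a minimal usage sketch of the handler as implemented in this MR. The file name events.csv and the column names passed to select_columns are hypothetical examples, not part of this change:

if __name__ == "__main__":
    # Hypothetical input file and column names, for illustration only
    handler = EventLogHandler("events.csv")
    if handler.check_file_type():
        handler.read_file()
        handler.select_columns(
            case_id="case_id",
            timestamp="timestamp",
            activity="activity",
        )
        handler.clean_data()
        df = handler.return_dataframe()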