Select Git revision
dashboard-php-helper.iml
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
FootprintGenerator.py 16.50 KiB
"""
This file contains the methods necessary to extract
a footprint matrix from an eventlog or a processtree
"""
from pm4py.objects.process_tree import semantics
from pm4py.algo.discovery.dfg import factory as dfg_factory
import numpy as np
import pandas as pd
import pm4py.objects.process_tree.util as ut
from pm4py.objects.process_tree import pt_operator as pt_op
from pm4py.objects.process_tree.util import parse
import Conformance_Checker.GeneratorUtil as gu
import time
def construct_dataframe(footprint, activity_list):
"""
This function takes a footprint matrix and the activity names to make a pandas dataframe
representing a full footprint matrix
:param footprint: a raw footprint matrix given as a numpy array
:param activity_list: a list of the activities to use as names for rows and columns
:return: Panda Dataframe: a dataframe as a ready to use Footprint matrix
"""
dataframe = pd.DataFrame(data=footprint[0:, 0:], index=activity_list[0:], columns=activity_list[0:])
return dataframe
def extract_footprint_from_eventlog(eventlog):
"""
The main function
:param
:param eventlog: an eventlog object from pm4py for which the
footprint matrix shall be extracted
:return: a footprint matrix as a pandas dataframe with values of [1,2,3,4]
1: -> ; 2: <- ; 3: || ; 4: #
index and columns are the activities
"""
dfg = dfg_factory.apply(eventlog)
activity_list = gu.get_activities_from_log(eventlog)
activity_count = len(activity_list)
footprint = np.zeros((activity_count, activity_count), dtype=np.dtype('U25'))
s1 = "->"
s2 = "<-"
s3 = "||"
s4 = "#"
for i in range(activity_count):
for j in range(activity_count):
if (activity_list[i], activity_list[j]) in dfg and \
(activity_list[j], activity_list[i]) not in dfg:
footprint[i][j] = s1
elif (activity_list[j], activity_list[i]) in dfg and \
(activity_list[i], activity_list[j]) not in dfg:
footprint[i][j] = s2
elif (activity_list[j], activity_list[i]) in dfg and \
(activity_list[i], activity_list[j]) in dfg:
footprint[i][j] = s3
else:
footprint[i][j] = s4
footprint = construct_dataframe(footprint, activity_list)
return footprint
def is_first_child_skippable_at_start(tree):
"""
This method checks if the first child of a tree has only skippable start activities
:param tree: the tree whos first child needs to be checked
:return: True if the first child has only skippable start activities, False if not
"""
child = tree.children[0]
if child is None:
return True
sa, ea, fp, act, skippable = extract_footprints(child)
for activity in sa:
if activity[-1] == 'ns':
return False
return True
def is_first_child_skippable_at_end(tree):
"""
This method checks if the first child of a tree has only skippable end activities
:param tree: the tree whos first child need to be checked
:return: True if all endactivities of the first child are skippable, False if not
"""
child = tree.children[0]
if child is None:
return True
sa, ea, fp, act, skippable = extract_footprints(child)
for activity in ea:
if activity[-1] == 'ns':
return False
return True
def extract_footprints(tree):
"""
This function extract the footprints for a given process tree. the footprints will be given in two lists
first list contains all sequential footprints second list contains all parallel footprints
:param tree: a processtree object from pm4py library
:return: a footprint matrix as a pandas dataframe with values of [1,2,3,4]
1: -> ; 2: <- ; 3: || ; 4: #
index and columns are the activities
"""
start_activity = []
end_activity = []
sequences = []
parallel = []
footprints = [[], []]
activities = []
skip = False
if len(tree.children) > 0:
# In this case we are in an operator node and need to deal with it according to the type of the operator
if tree.operator == pt_op.Operator.XOR:
# check if we have a tau leaf. If we do all activities are skippable
for c in tree.children:
if ut.is_tau_leaf(c):
skip = True
# for every child append start activities, end activities and activities
# no new parallel or sequence footprints are produced
for c in tree.children:
sa, ea, fp, act, skippable = extract_footprints(c)
for x in sa:
if skip:
x[1] = 's'
start_activity.append(x)
for y in ea:
if skip:
y[1] = 's'
end_activity.append(y)
for ac in act:
activities.append(ac)
for f in fp[0]:
sequences.append(f)
for f in fp[1]:
parallel.append(f)
elif tree.operator == pt_op.Operator.OR:
# check if we have a tau leaf. If we do all activities are skippable
for c in tree.children:
if ut.is_tau_leaf(c):
skip = True
# for every child append the start activities, the end activities
# and the footprints both parallel and sequence
for c in tree.children:
sa, ea, fp, act, skippable = extract_footprints(c)
for x in sa:
if skip:
x[1] = 's'
start_activity.append(x)
for y in ea:
if skip:
y[1] = 's'
end_activity.append(y)
for ac in act:
activities.append(ac)
for x in fp[0]:
sequences.append(x)
for x in fp[1]:
parallel.append(x)
# now build own parallel footprints for every binary combination of activities
# that do not belong to the same child note
for child1 in tree.children:
for child2 in tree.children:
if child1 == child2:
continue
sa1, ea1, fp1, act1, skippable1 = extract_footprints(child1)
sa2, ea2, fp2, act2, skippable2 = extract_footprints(child2)
for x in act1:
for y in act2:
parallel.append((x, y))
# does Or and And produce the same footprints?
elif tree.operator == pt_op.Operator.PARALLEL:
# for every child append the start activities, the end activities
# and the footprints both parallel and sequence
for c in tree.children:
sa, ea, fp, act, skippable = extract_footprints(c)
for x in sa:
start_activity.append(x)
for y in ea:
end_activity.append(y)
for ac in act:
activities.append(ac)
for x in fp[0]:
sequences.append(x)
for x in fp[1]:
parallel.append(x)
# no build own parallel footprints for every binary combination of activities
# that do not belong to the same child note
for child1 in tree.children:
for child2 in tree.children:
if child1 == child2:
continue
sa1, ea1, fp1, act1, skippable1 = extract_footprints(child1)
sa2, ea2, fp2, act2, skippable2 = extract_footprints(child2)
for x in ea1:
for y in sa2:
if x in sa1 and y in ea2:
parallel.append((x, y))
else:
sequences.append((x, y))
elif tree.operator == pt_op.Operator.SEQUENCE:
size = len(tree.children)
# set start activity equal to the start activities of first child
first_child = tree.children[0]
sa, ea, fp, act, skippable = extract_footprints(first_child)
start_activity = sa
i = 1
while skippable and i < size:
first_child = tree.children[i]
sa, ea, fp, act, skippable = extract_footprints(first_child)
for x in sa:
start_activity.append(x)
i += 1
# set end activities to end activities of last child
last_child = tree.children[-1]
sa, ea, fp, act, skippable = extract_footprints(last_child)
end_activity = ea
i = 2
while skippable and i < (size+1):
last_child = tree.children[size - i]
sa, ea, fp, act, skippable = extract_footprints(last_child)
for x in ea:
end_activity.append(x)
i += 1
# for every child append the activities, the sequence footprints and the parallel footprints
for c in tree.children:
sa, ea, fp, act, skippable = extract_footprints(c)
for ac in act:
activities.append(ac)
for x in fp[0]:
sequences.append(x)
for x in fp[1]:
parallel.append(x)
# now build own sequence footprints any combination of all start activities of one child with all end
# activities of the next child
size = len(tree.children)
for i in range(size - 1):
sa1, ea1, fp1, act1, skippable = extract_footprints(tree.children[i])
sa2, ea2, fp2, act2, skippable = extract_footprints(tree.children[i + 1])
for x in ea1:
for y in sa2:
sequences.append((x, y))
current_child = tree.children[i+1]
#include other following nodes based on skippable loop nodes
j=1
while (i+j)< size-1 and current_child.operator == pt_op.Operator.LOOP:
if is_first_child_skippable_at_start(current_child) and is_first_child_skippable_at_end(current_child):
sa2, ea2, fp2, act2, skippable = extract_footprints(tree.children[i + j + 1])
for x in ea1:
for y in sa2:
sequences.append((x, y))
j += 1
current_child = tree.children[i+j]
#include other following nodes based on skippable xor nodes
j = 1
while (i + j) < size-1 and current_child.operator == pt_op.Operator.XOR:
if is_first_child_skippable_at_start(current_child) and is_first_child_skippable_at_end(
current_child):
sa2, ea2, fp2, act2, skippable = extract_footprints(tree.children[i + j + 1])
for x in ea1:
for y in sa2:
sequences.append((x, y))
j += 1
current_child = tree.children[i + j]
elif tree.operator == pt_op.Operator.LOOP:
# for every child append start activities, end activities, activities and footprints
# still needs to be completed
for c in tree.children:
sa, ea, fp, act, skippable = extract_footprints(c)
for ac in act:
activities.append(ac)
for x in fp[0]:
sequences.append(x)
for x in fp[1]:
parallel.append(x)
# separate first child and other children
first_child = tree.children[0]
if ut.is_tau_leaf(first_child):
skip = True
other_children = tree.children[1:]
# get start and end activities of the first child
sa, ea, fp, act, skippable = extract_footprints(first_child)
start_activity = sa
end_activity = ea
# for every end activity of the first child build a sequence footprint with every start activity of every
# other child
for x in ea:
for c in other_children:
sa2, ea2, fp2, act2, skippable = extract_footprints(c)
for y in sa2:
sequences.append((x, y))
if skippable or ut.is_tau_leaf(c):
for x in ea:
for y in sa:
sequences.append((x, y))
for x in sa:
for c in other_children:
sa2, ea2, fp2, act2, skippable = extract_footprints(c)
for y in ea2:
sequences.append((y, x))
elif ut.is_leaf(tree):
# In this case we are in a leave and we initialise start end and activities with the label of the node
if not ut.is_tau_leaf(tree):
activities.append(tree.label)
start_activity.append([tree.label, "ns"])
end_activity.append([tree.label, "ns"])
footprints = [sequences, parallel]
return start_activity, end_activity, footprints, activities, skip
def build_footprint_matrix(footprints, processtree):
"""
This function build the footprint matrix from a given process tree and extracted footprints
:param processtree: a processtree object from pm4py library for which the footprints are extracted
:param footprints: list of lists of sequential footprints and parallel footprints
:return: a footprint matrix as a pandas dataframe with values of [1,2,3,4]
1: -> ; 2: <- ; 3: || ; 4: #
index and columns are the activities
"""
activities = gu.extract_activity_names(gu.get_leaves(processtree))
activity_count = len(activities)
values = np.zeros((activity_count, activity_count), dtype=np.dtype('U25'))
sequences = footprints[0]
parallel = footprints[1]
sequence_values = []
for (x, y) in sequences:
sequence_values.append((x[0], y[0]))
parallel_values = []
for (x, y) in parallel:
parallel_values.append((x[0], y[0]))
s1 = "->"
s2 = "<-"
s3 = "||"
s4 = "#"
for i in range(activity_count):
for j in range(activity_count):
if (activities[i], activities[j]) in sequence_values:
values[i, j] = s1
if i is not j:
values[j, i] = s2
elif (activities[i], activities[j]) in parallel_values:
values[i, j] = s3
values[j, i] = s3
elif (activities[i], activities[j]) not in sequence_values and (activities[j], activities[i]) not in sequence_values:
values[i, j] = s4
values[j, i] = s4
foot_print = construct_dataframe(values, activities)
return foot_print
def extract_footprint_from_process_tree(tree):
"""
This function extract a footprint matrix from a given process tree
:param tree: a processtree object from pm4py
:return: a footprint matrix as a pandas dataframe with values of [1,2,3,4]
1: -> ; 2: <- ; 3: || ; 4: #
index and columns are the activities
"""
sa, ea, fp, activities, skippable = extract_footprints(tree)
foot_print = build_footprint_matrix(fp, tree)
return foot_print
def extract_footprint_from_process_tree_old(tree):
"""
This function extract a footprint matrix from a given process tree
:param tree: a processtree object from pm4py
:return: a footprint matrix as a pandas dataframe with values of [1,2,3,4]
1: -> ; 2: <- ; 3: || ; 4: #
index and columns are the activities
"""
event_log = semantics.generate_log(tree, no_traces=100)
foot_print = extract_footprint_from_eventlog(event_log)
return foot_print
if __name__ == "__main__":
# footprint = extract_footprint_from_event_log(import_csv())
# print(footprint)
rep_tree = "+( 'a', ->( 'b', +( ->( *( 'c', τ ), X( 'e', τ ) ), X( 'd', 'f' ) ) ) )"
tree = parse(rep_tree)
print(rep_tree)
t1 = time.time()
footprint = extract_footprint_from_process_tree(tree)
t2 = time.time()
print(t2 - t1)
print(footprint)