Skip to content
Snippets Groups Projects
Select Git revision
  • main
1 result

dashboard-php-helper.iml

Blame
  • Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    FootprintGenerator.py 16.50 KiB
    """
    This module provides the functions needed to extract
    a footprint matrix from an event log or a process tree.
    """
    from pm4py.objects.process_tree import semantics
    from pm4py.algo.discovery.dfg import factory as dfg_factory
    import numpy as np
    import pandas as pd
    import pm4py.objects.process_tree.util as ut
    from pm4py.objects.process_tree import pt_operator as pt_op
    
    from pm4py.objects.process_tree.util import parse
    import Conformance_Checker.GeneratorUtil as gu
    
    import time
    
    
    
    def construct_dataframe(footprint, activity_list):
        """
        Wrap a raw footprint matrix in a labelled pandas dataframe.

        :param footprint: the raw footprint matrix, given as a two-dimensional numpy array
        :param activity_list: the activity names, used both as row index and as column labels
        :return: Pandas DataFrame: the footprint values with the activities on both axes
        """
        matrix = footprint[0:, 0:]
        row_labels = activity_list[0:]
        column_labels = activity_list[0:]
        return pd.DataFrame(data=matrix, index=row_labels, columns=column_labels)
    
    
    def extract_footprint_from_eventlog(eventlog):
        """
        Build the footprint matrix of an event log from its dfg
        (as produced by the pm4py dfg factory).

        :param eventlog: an eventlog object from pm4py for which the
                        footprint matrix shall be extracted

        :return: a footprint matrix as a pandas dataframe whose cells hold one of the strings
                "->" (row is followed by column only), "<-" (column is followed by row only),
                "||" (both directions occur in the dfg), "#" (neither direction occurs)
                index and columns are the activities
        """
        dfg = dfg_factory.apply(eventlog)
        activity_list = gu.get_activities_from_log(eventlog)
        activity_count = len(activity_list)
        footprint = np.zeros((activity_count, activity_count), dtype=np.dtype('U25'))

        # Keyed by (row -> column is in the dfg, column -> row is in the dfg).
        relation_symbol = {
            (True, False): "->",
            (False, True): "<-",
            (True, True): "||",
            (False, False): "#",
        }

        for row in range(activity_count):
            source = activity_list[row]
            for col in range(activity_count):
                target = activity_list[col]
                forward = (source, target) in dfg
                backward = (target, source) in dfg
                footprint[row][col] = relation_symbol[(forward, backward)]

        return construct_dataframe(footprint, activity_list)
    
    
    def is_first_child_skippable_at_start(tree):
        """
        Tell whether none of the start activities of the tree's first child is flagged 'ns'.

        :param tree: the tree whose first child needs to be checked
        :return: True if the first child is None or no start activity of it carries the 'ns' flag,
                 False as soon as one start activity is flagged 'ns'
        """
        first = tree.children[0]
        if first is None:
            return True
        # extract_footprints returns the start activities as the first element of its result
        start_entries, _ea, _fp, _act, _skippable = extract_footprints(first)
        return not any(entry[-1] == 'ns' for entry in start_entries)
    
    
    def is_first_child_skippable_at_end(tree):
        """
        Tell whether none of the end activities of the tree's first child is flagged 'ns'.

        :param tree: the tree whose first child needs to be checked
        :return: True if the first child is None or no end activity of it carries the 'ns' flag,
                 False as soon as one end activity is flagged 'ns'
        """
        first = tree.children[0]
        if first is None:
            return True
        # extract_footprints returns the end activities as the second element of its result
        _sa, end_entries, _fp, _act, _skippable = extract_footprints(first)
        return not any(entry[-1] == 'ns' for entry in end_entries)
    
    
    def extract_footprints(tree):
        """
        Recursively collect the footprint relations of a process tree.

        Start and end activities are tracked as two-element lists ``[label, flag]``. The flag is
        "ns" when the entry is created for a leaf and is set to "s" by an XOR or OR node that
        has a tau leaf among its children.

        :param tree: a processtree object from pm4py library
        :return: a tuple (start_activity, end_activity, footprints, activities, skip) where
                 start_activity: list of [label, flag] entries the node can start with
                 end_activity: list of [label, flag] entries the node can end with
                 footprints: [sequences, parallel], two lists of pairs (x, y) of [label, flag] entries
                 activities: list of the labels of all non-tau leaves below the node
                 skip: True if the node is an XOR/OR with a tau leaf child or a LOOP whose
                       first child is a tau leaf, False otherwise
        """
        start_activity = []
        end_activity = []
        sequences = []
        parallel = []
        activities = []
        skip = False

        if len(tree.children) > 0:
            # In this case we are in an operator node and need to deal with it according to the type of the operator
            if tree.operator == pt_op.Operator.XOR:

                # check if we have a tau leaf. If we do all activities are skippable
                for c in tree.children:
                    if ut.is_tau_leaf(c):
                        skip = True
                # for every child append start activities, end activities and activities
                # no new parallel or sequence footprints are produced
                for c in tree.children:
                    sa, ea, fp, act, skippable = extract_footprints(c)
                    for x in sa:
                        if skip:
                            x[1] = 's'
                        start_activity.append(x)
                    for y in ea:
                        if skip:
                            y[1] = 's'
                        end_activity.append(y)
                    for ac in act:
                        activities.append(ac)
                    for f in fp[0]:
                        sequences.append(f)
                    for f in fp[1]:
                        parallel.append(f)

            elif tree.operator == pt_op.Operator.OR:
                # check if we have a tau leaf. If we do all activities are skippable
                for c in tree.children:
                    if ut.is_tau_leaf(c):
                        skip = True

                # for every child append the start activities, the end activities
                # and the footprints both parallel and sequence
                for c in tree.children:
                    sa, ea, fp, act, skippable = extract_footprints(c)
                    for x in sa:
                        if skip:
                            x[1] = 's'
                        start_activity.append(x)
                    for y in ea:
                        if skip:
                            y[1] = 's'
                        end_activity.append(y)
                    for ac in act:
                        activities.append(ac)
                    for x in fp[0]:
                        sequences.append(x)
                    for x in fp[1]:
                        parallel.append(x)
                # now build own parallel footprints for every binary combination of activities
                # that do not belong to the same child node
                for child1 in tree.children:
                    for child2 in tree.children:
                        if child1 == child2:
                            continue
                        sa1, ea1, fp1, act1, skippable1 = extract_footprints(child1)
                        sa2, ea2, fp2, act2, skippable2 = extract_footprints(child2)
                        for x in act1:
                            for y in act2:
                                # act1/act2 hold bare labels; wrap them as [label, flag] entries so
                                # the pairs have the same shape as every other footprint pair
                                # (build_footprint_matrix reads element [0] as the label, which would
                                # otherwise be the first character of the label string).
                                parallel.append(([x, "ns"], [y, "ns"]))

            # does Or and And produce the same footprints?
            elif tree.operator == pt_op.Operator.PARALLEL:
                # for every child append the start activities, the end activities
                # and the footprints both parallel and sequence
                for c in tree.children:
                    sa, ea, fp, act, skippable = extract_footprints(c)
                    for x in sa:
                        start_activity.append(x)
                    for y in ea:
                        end_activity.append(y)
                    for ac in act:
                        activities.append(ac)
                    for x in fp[0]:
                        sequences.append(x)
                    for x in fp[1]:
                        parallel.append(x)

                # now build own footprints for every combination of an end activity of one child
                # with a start activity of another child
                for child1 in tree.children:
                    for child2 in tree.children:
                        if child1 == child2:
                            continue
                        sa1, ea1, fp1, act1, skippable1 = extract_footprints(child1)
                        sa2, ea2, fp2, act2, skippable2 = extract_footprints(child2)
                        for x in ea1:
                            for y in sa2:
                                # parallel only if x is also a start of child1 and y also an end of child2
                                if x in sa1 and y in ea2:
                                    parallel.append((x, y))
                                else:
                                    sequences.append((x, y))

            elif tree.operator == pt_op.Operator.SEQUENCE:
                size = len(tree.children)
                # set start activity equal to the start activities of first child
                first_child = tree.children[0]
                sa, ea, fp, act, skippable = extract_footprints(first_child)
                start_activity = sa
                i = 1
                # as long as the child just inspected reports skip, the next child can start the sequence too
                while skippable and i < size:
                    first_child = tree.children[i]
                    sa, ea, fp, act, skippable = extract_footprints(first_child)
                    for x in sa:
                        start_activity.append(x)
                    i += 1

                # set end activities to end activities of last child
                last_child = tree.children[-1]
                sa, ea, fp, act, skippable = extract_footprints(last_child)
                end_activity = ea
                i = 2
                # same idea walking backwards from the last child
                while skippable and i < (size+1):
                    last_child = tree.children[size - i]
                    sa, ea, fp, act, skippable = extract_footprints(last_child)
                    for x in ea:
                        end_activity.append(x)
                    i += 1

                # for every child append the activities, the sequence footprints and the parallel footprints
                for c in tree.children:
                    sa, ea, fp, act, skippable = extract_footprints(c)
                    for ac in act:
                        activities.append(ac)
                    for x in fp[0]:
                        sequences.append(x)
                    for x in fp[1]:
                        parallel.append(x)

                # now build own sequence footprints: any combination of the end activities of one child with the
                # start activities of the next child
                size = len(tree.children)
                for i in range(size - 1):
                    sa1, ea1, fp1, act1, skippable = extract_footprints(tree.children[i])
                    sa2, ea2, fp2, act2, skippable = extract_footprints(tree.children[i + 1])
                    for x in ea1:
                        for y in sa2:
                            sequences.append((x, y))
                    current_child = tree.children[i+1]
                    # include other following nodes based on skippable loop nodes
                    j = 1
                    while (i+j) < size-1 and current_child.operator == pt_op.Operator.LOOP:
                        if is_first_child_skippable_at_start(current_child) and is_first_child_skippable_at_end(current_child):
                            sa2, ea2, fp2, act2, skippable = extract_footprints(tree.children[i + j + 1])
                            for x in ea1:
                                for y in sa2:
                                    sequences.append((x, y))
                        j += 1
                        current_child = tree.children[i+j]
                    # include other following nodes based on skippable xor nodes
                    # NOTE(review): current_child is carried over from the loop above while j restarts at 1 - confirm intent
                    j = 1
                    while (i + j) < size-1 and current_child.operator == pt_op.Operator.XOR:
                        if is_first_child_skippable_at_start(current_child) and is_first_child_skippable_at_end(
                                current_child):
                            sa2, ea2, fp2, act2, skippable = extract_footprints(tree.children[i + j + 1])
                            for x in ea1:
                                for y in sa2:
                                    sequences.append((x, y))
                        j += 1
                        current_child = tree.children[i + j]

            elif tree.operator == pt_op.Operator.LOOP:
                # for every child append activities and footprints
                # still needs to be completed
                for c in tree.children:
                    sa, ea, fp, act, skippable = extract_footprints(c)
                    for ac in act:
                        activities.append(ac)
                    for x in fp[0]:
                        sequences.append(x)
                    for x in fp[1]:
                        parallel.append(x)

                # separate first child and other children
                first_child = tree.children[0]
                if ut.is_tau_leaf(first_child):
                    skip = True
                other_children = tree.children[1:]
                # get start and end activities of the first child
                sa, ea, fp, act, skippable = extract_footprints(first_child)
                start_activity = sa
                end_activity = ea
                # for every end activity of the first child build a sequence footprint with every start activity of every
                # other child
                for x in ea:
                    for c in other_children:
                        sa2, ea2, fp2, act2, skippable = extract_footprints(c)
                        for y in sa2:
                            sequences.append((x, y))
                        if skippable or ut.is_tau_leaf(c):
                            # the other child can be skipped, so the first child may directly follow itself.
                            # Dedicated loop variables are used here: reusing ``x`` would overwrite the
                            # outer end activity and pair the remaining children with the wrong entry.
                            for loop_end in ea:
                                for loop_start in sa:
                                    sequences.append((loop_end, loop_start))

                # every end activity of every other child is followed by every start activity of the first child
                for x in sa:
                    for c in other_children:
                        sa2, ea2, fp2, act2, skippable = extract_footprints(c)
                        for y in ea2:
                            sequences.append((y, x))

        elif ut.is_leaf(tree):
            # In this case we are in a leaf and we initialise start, end and activities with the label of the node
            if not ut.is_tau_leaf(tree):
                activities.append(tree.label)
                start_activity.append([tree.label, "ns"])
                end_activity.append([tree.label, "ns"])

        footprints = [sequences, parallel]
        return start_activity, end_activity, footprints, activities, skip
    
    
    def build_footprint_matrix(footprints, processtree):
        """
        This function builds the footprint matrix from a given process tree and extracted footprints

        :param processtree:  a processtree object from pm4py library for which the footprints are extracted
        :param footprints: list of two lists, the sequential footprints and the parallel footprints;
                           each footprint is a pair (x, y) whose elements carry the activity label at index 0
        :return: a footprint matrix as a pandas dataframe whose cells hold one of the strings
                "->", "<-", "||" or "#"
                index and columns are the activities
        """
        activities = gu.extract_activity_names(gu.get_leaves(processtree))
        activity_count = len(activities)
        values = np.zeros((activity_count, activity_count), dtype=np.dtype('U25'))

        # Only the labels are needed; sets make the membership tests in the quadratic loop cheap.
        # assumes labels are hashable (strings) - TODO confirm
        sequence_values = {(x[0], y[0]) for (x, y) in footprints[0]}
        parallel_values = {(x[0], y[0]) for (x, y) in footprints[1]}

        s1 = "->"
        s2 = "<-"
        s3 = "||"
        s4 = "#"
        for i in range(activity_count):
            for j in range(activity_count):
                pair = (activities[i], activities[j])
                if pair in sequence_values:
                    values[i, j] = s1
                    # compare the indices by value: identity comparison of ints is only reliable
                    # for small cached integers and would overwrite the diagonal otherwise
                    if i != j:
                        values[j, i] = s2
                elif pair in parallel_values:
                    values[i, j] = s3
                    values[j, i] = s3
                elif (activities[j], activities[i]) not in sequence_values:
                    # neither direction is a sequence footprint (the forward direction was ruled out above)
                    values[i, j] = s4
                    values[j, i] = s4

        foot_print = construct_dataframe(values, activities)

        return foot_print
    
    
    def extract_footprint_from_process_tree(tree):
        """
            Compute the footprint matrix of a process tree directly from its structure:
            the footprints are extracted from the tree and then arranged into a matrix.

            :param tree: a processtree object from pm4py
            :return: the footprint matrix as returned by build_footprint_matrix
                    (a pandas dataframe, index and columns are the activities)
        """
        extracted = extract_footprints(tree)
        # the third element of the extraction result is the [sequences, parallel] footprint list
        return build_footprint_matrix(extracted[2], tree)
    
    
    def extract_footprint_from_process_tree_old(tree):
        """
            Compute the footprint matrix of a process tree indirectly: a log of 100 traces is
            generated from the tree and the footprint matrix is extracted from that log.

            :param tree: a processtree object from pm4py
            :return: the footprint matrix as returned by extract_footprint_from_eventlog
                    (a pandas dataframe, index and columns are the activities)
        """
        return extract_footprint_from_eventlog(semantics.generate_log(tree, no_traces=100))
    
    
    if __name__ == "__main__":
        # Demo run: parse a sample process tree, time the footprint extraction and print the result.
        rep_tree = "+( 'a', ->( 'b', +( ->( *( 'c', τ ), X( 'e', τ ) ), X( 'd', 'f' ) ) ) )"
        tree = parse(rep_tree)
        print(rep_tree)
        started = time.time()
        footprint = extract_footprint_from_process_tree(tree)
        finished = time.time()
        # elapsed wall-clock seconds of the extraction
        print(finished - started)
        print(footprint)