Skip to content
Snippets Groups Projects
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
classes.py 18.17 KiB
"""
File consists of several classes to model elements and assembly
layers of a composed device.
"""
from __future__ import annotations
import uuid
import json
import operator
from enum import Enum, auto
from typing import List, Dict, Optional
from copy import deepcopy


class AggregationLayer(Enum):
    """Describes the levl of aggregation for the objects LegoComponent
    and LegoAssembly and provides the 4 applicable layers.
    """

    SYSTEM = auto()
    ASSEMBLY = auto()
    SUBASSEMBLY = auto()
    COMPONENT = auto()


class LegoComponent:
    """
    A class for storing information about a single Lego component.

    ...

    Attributes
    ----------
    uuid : UUID
        A randomly generated unique identifier for the component.
    parent : None | LegoAssembly
        The parent of the component. Can be either None or a LegoAssembly object.
    layer : AggregationLayer
        An enumeration indicating the hierarchy level. For compoennts, this is
        COMPONENT by default.
    properties : dict
        Dictionary that holds all properties of the component that can be saved
        in a dictionary format.

    Methods
    -------
    clone(new_label=None)
        Returns a new instance of LegoComponent identical to the current instance.
    get_root_assembly()
        Returns the top-level assembly in which the component belongs.
    to_dict()
        Returns the current instance represented as a dictionary.
    """

    def __init__(
        self,
        label: Optional[str] = None,
        datasheet: Optional[dict] = None,
        *more_properties: dict,
        **kwargs,
    ) -> None:
        """
        Constructs all the necessary attributes for the LegoComponent object.

        Parameters
        ----------
            label : str, optional
                The name of the component to add.
            datasheet : dict, optional
                Metadata describing the component, read from datasheet.
            more_properties : tuple of dict, dict
                Additional dictionaries representing custom properties.
            kwargs
                Arbitrary keyword arguments representing custom properties.

        Raises
        ------
            ValueError
                If the type of more_properties argument is not a dictionary.
        """
        self.uuid: uuid.UUID = uuid.uuid4()
        self.parent: None | LegoAssembly = None
        self.layer: AggregationLayer = AggregationLayer.COMPONENT
        self.properties: dict = {}
        if label is not None:
            self.properties["label"] = label
        if datasheet is not None:
            self.properties.update(deepcopy(datasheet))
        for prop in more_properties:
            if isinstance(prop, dict):
                self.properties.update(deepcopy(prop))
            else:
                raise ValueError(f"Unexpected argument type: {type(more_properties)}")
        for key, value in kwargs.items():
            self.properties[key] = deepcopy(value)

    def clone(self, new_label: Optional[str] = None) -> LegoComponent:
        """
        Returns a new instance of LegoComponent identical to the current instance.

        This method creates a new instance of LegoComponent that is identical to the
        current instance with an optional different label.
        The assigned uuid changes and elements are copied.

        Parameters
        ----------
            new_label : str, optional
                A new label string to use. Defaults to None.

        Returns
        -------
        LegoComponent
            A new instance of LegoComponent that is identical to the current instance.

        """
        if new_label is None:
            new_label = self.properties["label"]
        clone = LegoComponent(None, None, deepcopy(self.properties))
        clone.properties["label"] = new_label
        return clone

    def get_root_assembly(self):
        """
        Returns the top-level assembly in which the component belongs.

        This method traverses the parent hierarchy of a LegoComponent object until it
        finds the root-level LegoAssembly, returning it to the caller. A parent is
        assigned when a LegoComponent or LegoItem is added to a LegoAssembly object.

        Returns
        -------
        None | LegoAssembly
            The root-level LegoAssembly or None if the component has no parent.
        """
        if self.parent is None:
            return None
        current_assembly = self.parent
        while current_assembly.parent is not None:
            current_assembly = current_assembly.parent
        return current_assembly

    def to_dict(self) -> Dict:
        """
        Returns the current instance represented as a dictionary.

        This method returns a dictionary representation of the LegoComponent object
        suitable for serialization as JSON.

        Returns
        -------
        dict
            A dictionary representation of the object.
        """
        dict_ = {
            "uuid": self.uuid,
            "properties": self.properties,
            "layer": self.layer,
        }
        return {"component": dict_}

    def __str__(self):
        """
        Simple string representation of the object.
        """
        return self.__repr__()

    def __repr__(self):
        """
        String representation of the object including the component label and UUID.
        """
        return f"LegoComponent {self.properties['label']} [{self.uuid}]"


class LegoAssembly:
    """
    Represents a Lego assembly that can contain Lego Components and subassemblies.

    Attributes:
        uuid (uuid.UUID): The unique ID of the assembly.
        parent (LegoAssembly or None): The parent assembly containing this one, if any.
        properties (dict): Optional properties for the assembly, such as a label.
        layer (AggregationLayer): The aggregation layer of the assembly.
        components (List[LegoComponent]): The list of contained components.
        assemblies (List[LegoAssembly]): The list of contained subassemblies.
    """

    def __init__(
        self,
        layer: AggregationLayer,
        label: Optional[str] = None,
        *properties: dict,
        **kwargs,
    ) -> None:
        """
        Initializes a new LegoAssembly instance.

        Args:
            layer (AggregationLayer): The aggregation layer of the assembly.
            label (Optional[str], optional): Optional label for the assembly.
                Defaults to None.
            properties (dict): Optional properties for the assembly, such as a label.
            **kwargs: Optional keyword arguments for additional properties.
        """
        self.uuid: uuid.UUID = uuid.uuid4()
        self.parent: None | LegoAssembly = None
        self.properties: dict = {}
        self.layer: AggregationLayer = layer
        if label is not None:
            self.properties["label"] = label
        for prop in properties:
            if isinstance(prop, dict):
                self.properties.update(deepcopy(prop))
            else:
                raise ValueError(f"Unexpected argument type: {type(properties)}")
        for key, value in kwargs.items():
            self.properties[key] = deepcopy(value)
        self.components: List[LegoComponent] = []
        self.assemblies: List[LegoAssembly] = []

    def add_component(self, component: LegoComponent | List[LegoComponent]) -> None:
        """
        Adds a Lego component to the current assembly.
        Use add() to handle both components and assemblies.

        Args:
            component (LegoComponent or List[LegoComponent]):
                The component or list of components to add.

        Raises:
            TypeError: If the argument is not a LegoComponent instance
                or a list of LegoComponent instances.
            AssertionError: If the component or a component with the same UUID
            is already contained in the assembly or any of its child assemblies.
        """
        if isinstance(component, list):
            for c in component:
                self.add_component(c)
            return

        if not isinstance(component, LegoComponent):
            raise TypeError(
                f"Argument should be of type {LegoComponent.__name__}, "
                f"got {type(component).__name__} instead."
            )

        if self.get_root_assembly().contains_uuid(component.uuid):
            raise AssertionError(
                f"This assembly or a subassembly already contains "
                f"the component with ID "
                f"{component.uuid}."
            )
        component.parent = self
        self.components.append(component)

    def add_assembly(self, assembly: LegoAssembly | List[LegoAssembly]) -> None:
        """
        Adds a subassembly to the current assembly.
        Use add() to handle both components and assemblies.

        Args:
            assembly (LegoAssembly or List[LegoAssembly]):
                The subassembly or list of subassemblies to add.

        Raises:
            TypeError: If the argument is not a LegoAssembly instance
                or a list of LegoAssembly instances.
            AssertionError: If the subassembly or a subassembly with the same UUID
                is already contained in the assembly or any of its child assemblies.
        """
        if isinstance(assembly, list):
            for a in assembly:
                self.add_assembly(a)
            return

        if not isinstance(assembly, LegoAssembly):
            raise TypeError(
                f"Argument should be of type {LegoAssembly.__name__}, "
                f"got {type(assembly).__name__} instead."
            )

        if self.get_root_assembly().contains_uuid(assembly.uuid):
            raise AssertionError(
                f"This assembly or a subassembly already contains "
                f"the assembly with ID {assembly.uuid}."
            )
        assembly.parent = self
        self.assemblies.append(assembly)

    def add(
        self, part: LegoAssembly | LegoComponent | List[LegoAssembly | LegoComponent]
    ) -> None:
        """
        Adds either a Lego component, a subassembly or a (mixed) list of them to the
        current assembly. Uses internal functions add_component() and add_assembly().

        Args:
            part (LegoAssembly or LegoComponent or List[LegoAssembly or LegoComponent]):
                The part or parts to add.

        Raises:
            TypeError: If the argument is not a LegoAssembly instance or a LegoComponent
                instance, or a (mixed) list of them.
        """
        if isinstance(part, LegoComponent):
            self.add_component(part)
        elif isinstance(part, LegoAssembly):
            self.add_assembly(part)
        elif isinstance(part, list):
            for p in part:
                self.add(p)
        else:
            raise TypeError(
                f"Argument should be of types {LegoAssembly.__name__}, "
                f"{LegoComponent.__name__} or a list of them. "
                f"Got {type(part).__name__} instead."
            )

    def children(self) -> Dict[str, List[LegoComponent] | List[LegoAssembly]]:
        """
        Returns a dictionary of the assembly's children (components and
        sub-assemblies), sorted by category.

        Returns:
            A dictionary with two keys - "components" and "assemblies" - and
            corresponding lists of LegoComponents and LegoAssemblies as values.
        """
        return {"components": self.components, "assemblies": self.assemblies}

    def get_component_list(self, max_depth: int = -1) -> List[LegoComponent]:
        """
        Gets a full list of all components contained in the assembly or any of
        its sub-assemblies.

        Args:
            max_depth: An integer indicating the maximum depth of recursion.
                -1 means unlimited depth. 0 returns only direc children.

        Returns:
            A list of all LegoComponents contained in the current assembly or
            its sub-assemblies.
        """
        component_list = []
        component_list.extend(self.components)
        if max_depth > 0 or max_depth < 0:
            for assembly in self.assemblies:
                component_list.extend(assembly.get_component_list(max_depth - 1))
        return component_list

    def get_root_assembly(self) -> LegoAssembly:
        """
        Returns the root LegoAssembly of the current assembly by recursively
        searching up the tree until the top-most parent (the root) is found.

        Returns:
            The root LegoAssembly of the current LegoAssembly instance.
        """
        current_assembly = self
        while current_assembly.parent is not None:
            current_assembly = current_assembly.parent
        return current_assembly

    def contains_uuid(self, uuid_: uuid.UUID):
        """
        Recursively searches through the assembly  and component
        tree to determine whether the specified UUID exists anywhere
        within it.

        Args:
            uuid_: A UUID object representing the ID to search for.

        Returns:
            True if the UUID exists in the assembly or any of its sub-assemblies,
                False otherwise.
        """
        component_ids = list(map(lambda c: c.uuid, self.components))
        if uuid_ in component_ids:
            return True
        assembly_ids = list(map(lambda a: a.uuid, self.assemblies))
        if uuid_ in assembly_ids:
            return True
        for assembly in self.assemblies:
            if assembly.contains_uuid(uuid_):
                return True
        return False

    def to_dict(self) -> Dict:
        """
        Serializes the current LegoAssembly instance and its descendants into a dict.

        Returns:
            A dictionary representation of the current assembly, including all
            of its component and sub-assembly children.
        """
        dict_ = {
            "uuid": self.uuid,
            "properties": self.properties,
            "layer": self.layer,
        }
        dict_["components"] = [component.to_dict() for component in self.components]
        dict_["assemblies"] = [assembly.to_dict() for assembly in self.assemblies]
        return {"assembly": dict_}

    def __repr__(self):
        """
        String representation of the object including the component label and UUID.
        """
        return f"LegoAssembly {self.properties['label']} [{self.uuid}]"

    def clone(self, label: Optional[str] = None) -> LegoAssembly:
        """
        Creates a deep clone of the current LegoAssembly instance, including
        all of its component and sub-assembly children. The assigned uuid changes.
        Optionally a new label can be passed.

        Args:
            label: The label (name) for the cloned assembly. If none is passed,
            uses the same label as the original assembly.

        Returns:
            The cloned LegoAssembly instance.
        """
        if label is None:
            label = self.properties["label"]
        clone = LegoAssembly(self.layer, None, deepcopy(self.properties))
        clone.properties["label"] = label
        for component in self.components:
            clone.add_component(component.clone())
        for assembly in self.assemblies:
            clone.add_assembly(assembly.clone())
        return clone


def print_assembly_tree(root, levels=None):
    """
    Prints the assembly tree starting from root with a visualization
    implemented with text characters.

    Args:
        root (LegoAssembly): The root of the assembly tree to print.
        levels (List[bool]): Internally used by recursion to know where
            to print vertical connection. Defaults to an empty list.
    """
    if not isinstance(root, LegoAssembly):
        raise TypeError(
            f"Argument should be of type {LegoAssembly.__name__}, "
            f"got {type(root).__name__} instead."
        )
    if levels is None:
        levels = []
    connection_padding = "".join(map(lambda draw: "" if draw else "    ", levels))
    assembly_padding = ""
    if len(levels) > 0:
        assembly_padding = "├── " if levels[-1] else "└── "
    print(f"{connection_padding[:-4]}{assembly_padding}{root}")
    """ Recursively print child components. """
    for i, assembly in enumerate(root.assemblies):
        is_last = i == len(root.assemblies) - 1 and len(root.components) == 0
        print_assembly_tree(assembly, [*levels, not is_last])
    """ Print the components. """
    for i, component in enumerate(root.components):
        component_padding = "├── " if i < len(root.components) - 1 else "└── "
        print(f"{connection_padding}{component_padding}{component}")


def correct_aggregation_hierarchy(root: LegoAssembly, strict: bool = False):
    """
    Recursively checks whether the aggregation hierarchy from `root` is correct.

    Args:
        root (LegoAssembly): The root of the assembly tree.
        strict (bool): If True, the function will return False if any assembly
            or component with a layer level equal to root's is found.
            Defaults to False.

    Returns:
        True if the aggregation hierarchy is correct. False otherwise.
    """
    if not isinstance(root, LegoAssembly):
        raise TypeError(
            f"Argument should be of type {LegoAssembly.__name__}, "
            f"got {type(root).__name__} instead."
        )
    higher_level = operator.le
    if strict:
        higher_level = operator.lt
    for component in root.components:
        if not higher_level(root.layer.value, component.layer.value):
            return False
    for assembly in root.assemblies:
        if not higher_level(root.layer.value, assembly.layer.value):
            return False
        if not correct_aggregation_hierarchy(assembly, strict):
            return False
    return True


class KPIEncoder(json.JSONEncoder):
    """
    JSON encoder that handles special class types for KPI serialization.
    """

    def default(self, o):
        """
        Overrides default method to handle special conversion cases.

        Args:
            o : Object to be converted.

        Returns:
            Converted object or super method if no applicable case is found.
        """
        if isinstance(o, uuid.UUID):
            return "kpi-" + str(o)
        if isinstance(o, (AggregationLayer)):
            return "kpi-" + o.name
        return super().default(o)