""" File consists of several classes to model elements and assembly layers of a composed device. """ from __future__ import annotations import uuid import json import operator from enum import Enum, auto from typing import List, Dict, Optional from copy import deepcopy class AggregationLayer(Enum): """Describes the levl of aggregation for the objects LegoComponent and LegoAssembly and provides the 4 applicable layers. """ SYSTEM = auto() ASSEMBLY = auto() SUBASSEMBLY = auto() COMPONENT = auto() class LegoComponent: """ A class for storing information about a single Lego component. ... Attributes ---------- uuid : UUID A randomly generated unique identifier for the component. parent : None | LegoAssembly The parent of the component. Can be either None or a LegoAssembly object. layer : AggregationLayer An enumeration indicating the hierarchy level. For compoennts, this is COMPONENT by default. properties : dict Dictionary that holds all properties of the component that can be saved in a dictionary format. Methods ------- clone(new_label=None) Returns a new instance of LegoComponent identical to the current instance. get_root_assembly() Returns the top-level assembly in which the component belongs. to_dict() Returns the current instance represented as a dictionary. """ def __init__( self, label: Optional[str] = None, datasheet: Optional[dict] = None, *more_properties: dict, **kwargs, ) -> None: """ Constructs all the necessary attributes for the LegoComponent object. Parameters ---------- label : str, optional The name of the component to add. datasheet : dict, optional Metadata describing the component, read from datasheet. more_properties : tuple of dict, dict Additional dictionaries representing custom properties. kwargs Arbitrary keyword arguments representing custom properties. Raises ------ ValueError If the type of more_properties argument is not a dictionary. """ self.uuid: uuid.UUID = uuid.uuid4() self.parent: None | LegoAssembly = None self.layer: AggregationLayer = AggregationLayer.COMPONENT self.properties: dict = {} if label is not None: self.properties["label"] = label if datasheet is not None: self.properties.update(deepcopy(datasheet)) for prop in more_properties: if isinstance(prop, dict): self.properties.update(deepcopy(prop)) else: raise ValueError(f"Unexpected argument type: {type(more_properties)}") for key, value in kwargs.items(): self.properties[key] = deepcopy(value) def clone(self, new_label: Optional[str] = None) -> LegoComponent: """ Returns a new instance of LegoComponent identical to the current instance. This method creates a new instance of LegoComponent that is identical to the current instance with an optional different label. The assigned uuid changes and elements are copied. Parameters ---------- new_label : str, optional A new label string to use. Defaults to None. Returns ------- LegoComponent A new instance of LegoComponent that is identical to the current instance. """ if new_label is None: new_label = self.properties["label"] clone = LegoComponent(None, None, deepcopy(self.properties)) clone.properties["label"] = new_label return clone def get_root_assembly(self): """ Returns the top-level assembly in which the component belongs. This method traverses the parent hierarchy of a LegoComponent object until it finds the root-level LegoAssembly, returning it to the caller. A parent is assigned when a LegoComponent or LegoItem is added to a LegoAssembly object. Returns ------- None | LegoAssembly The root-level LegoAssembly or None if the component has no parent. """ if self.parent is None: return None current_assembly = self.parent while current_assembly.parent is not None: current_assembly = current_assembly.parent return current_assembly def to_dict(self) -> Dict: """ Returns the current instance represented as a dictionary. This method returns a dictionary representation of the LegoComponent object suitable for serialization as JSON. Returns ------- dict A dictionary representation of the object. """ dict_ = { "uuid": self.uuid, "properties": self.properties, "layer": self.layer, } return {"component": dict_} def __str__(self): """ Simple string representation of the object. """ return self.__repr__() def __repr__(self): """ String representation of the object including the component label and UUID. """ return f"LegoComponent {self.properties['label']} [{self.uuid}]" class LegoAssembly: """ Represents a Lego assembly that can contain Lego Components and subassemblies. Attributes: uuid (uuid.UUID): The unique ID of the assembly. parent (LegoAssembly or None): The parent assembly containing this one, if any. properties (dict): Optional properties for the assembly, such as a label. layer (AggregationLayer): The aggregation layer of the assembly. components (List[LegoComponent]): The list of contained components. assemblies (List[LegoAssembly]): The list of contained subassemblies. """ def __init__( self, layer: AggregationLayer, label: Optional[str] = None, *properties: dict, **kwargs, ) -> None: """ Initializes a new LegoAssembly instance. Args: layer (AggregationLayer): The aggregation layer of the assembly. label (Optional[str], optional): Optional label for the assembly. Defaults to None. properties (dict): Optional properties for the assembly, such as a label. **kwargs: Optional keyword arguments for additional properties. """ self.uuid: uuid.UUID = uuid.uuid4() self.parent: None | LegoAssembly = None self.properties: dict = {} self.layer: AggregationLayer = layer if label is not None: self.properties["label"] = label for prop in properties: if isinstance(prop, dict): self.properties.update(deepcopy(prop)) else: raise ValueError(f"Unexpected argument type: {type(properties)}") for key, value in kwargs.items(): self.properties[key] = deepcopy(value) self.components: List[LegoComponent] = [] self.assemblies: List[LegoAssembly] = [] def add_component(self, component: LegoComponent | List[LegoComponent]) -> None: """ Adds a Lego component to the current assembly. Use add() to handle both components and assemblies. Args: component (LegoComponent or List[LegoComponent]): The component or list of components to add. Raises: TypeError: If the argument is not a LegoComponent instance or a list of LegoComponent instances. AssertionError: If the component or a component with the same UUID is already contained in the assembly or any of its child assemblies. """ if isinstance(component, list): for c in component: self.add_component(c) return if not isinstance(component, LegoComponent): raise TypeError( f"Argument should be of type {LegoComponent.__name__}, " f"got {type(component).__name__} instead." ) if self.get_root_assembly().contains_uuid(component.uuid): raise AssertionError( f"This assembly or a subassembly already contains " f"the component with ID " f"{component.uuid}." ) component.parent = self self.components.append(component) def add_assembly(self, assembly: LegoAssembly | List[LegoAssembly]) -> None: """ Adds a subassembly to the current assembly. Use add() to handle both components and assemblies. Args: assembly (LegoAssembly or List[LegoAssembly]): The subassembly or list of subassemblies to add. Raises: TypeError: If the argument is not a LegoAssembly instance or a list of LegoAssembly instances. AssertionError: If the subassembly or a subassembly with the same UUID is already contained in the assembly or any of its child assemblies. """ if isinstance(assembly, list): for a in assembly: self.add_assembly(a) return if not isinstance(assembly, LegoAssembly): raise TypeError( f"Argument should be of type {LegoAssembly.__name__}, " f"got {type(assembly).__name__} instead." ) if self.get_root_assembly().contains_uuid(assembly.uuid): raise AssertionError( f"This assembly or a subassembly already contains " f"the assembly with ID {assembly.uuid}." ) assembly.parent = self self.assemblies.append(assembly) def add( self, part: LegoAssembly | LegoComponent | List[LegoAssembly | LegoComponent] ) -> None: """ Adds either a Lego component, a subassembly or a (mixed) list of them to the current assembly. Uses internal functions add_component() and add_assembly(). Args: part (LegoAssembly or LegoComponent or List[LegoAssembly or LegoComponent]): The part or parts to add. Raises: TypeError: If the argument is not a LegoAssembly instance or a LegoComponent instance, or a (mixed) list of them. """ if isinstance(part, LegoComponent): self.add_component(part) elif isinstance(part, LegoAssembly): self.add_assembly(part) elif isinstance(part, list): for p in part: self.add(p) else: raise TypeError( f"Argument should be of types {LegoAssembly.__name__}, " f"{LegoComponent.__name__} or a list of them. " f"Got {type(part).__name__} instead." ) def children(self) -> Dict[str, List[LegoComponent] | List[LegoAssembly]]: """ Returns a dictionary of the assembly's children (components and sub-assemblies), sorted by category. Returns: A dictionary with two keys - "components" and "assemblies" - and corresponding lists of LegoComponents and LegoAssemblies as values. """ return {"components": self.components, "assemblies": self.assemblies} def get_component_list(self, max_depth: int = -1) -> List[LegoComponent]: """ Gets a full list of all components contained in the assembly or any of its sub-assemblies. Args: max_depth: An integer indicating the maximum depth of recursion. -1 means unlimited depth. 0 returns only direc children. Returns: A list of all LegoComponents contained in the current assembly or its sub-assemblies. """ component_list = [] component_list.extend(self.components) if max_depth > 0 or max_depth < 0: for assembly in self.assemblies: component_list.extend(assembly.get_component_list(max_depth - 1)) return component_list def get_root_assembly(self) -> LegoAssembly: """ Returns the root LegoAssembly of the current assembly by recursively searching up the tree until the top-most parent (the root) is found. Returns: The root LegoAssembly of the current LegoAssembly instance. """ current_assembly = self while current_assembly.parent is not None: current_assembly = current_assembly.parent return current_assembly def contains_uuid(self, uuid_: uuid.UUID): """ Recursively searches through the assembly and component tree to determine whether the specified UUID exists anywhere within it. Args: uuid_: A UUID object representing the ID to search for. Returns: True if the UUID exists in the assembly or any of its sub-assemblies, False otherwise. """ component_ids = list(map(lambda c: c.uuid, self.components)) if uuid_ in component_ids: return True assembly_ids = list(map(lambda a: a.uuid, self.assemblies)) if uuid_ in assembly_ids: return True for assembly in self.assemblies: if assembly.contains_uuid(uuid_): return True return False def to_dict(self) -> Dict: """ Serializes the current LegoAssembly instance and its descendants into a dict. Returns: A dictionary representation of the current assembly, including all of its component and sub-assembly children. """ dict_ = { "uuid": self.uuid, "properties": self.properties, "layer": self.layer, } dict_["components"] = [component.to_dict() for component in self.components] dict_["assemblies"] = [assembly.to_dict() for assembly in self.assemblies] return {"assembly": dict_} def __repr__(self): """ String representation of the object including the component label and UUID. """ return f"LegoAssembly {self.properties['label']} [{self.uuid}]" def clone(self, label: Optional[str] = None) -> LegoAssembly: """ Creates a deep clone of the current LegoAssembly instance, including all of its component and sub-assembly children. The assigned uuid changes. Optionally a new label can be passed. Args: label: The label (name) for the cloned assembly. If none is passed, uses the same label as the original assembly. Returns: The cloned LegoAssembly instance. """ if label is None: label = self.properties["label"] clone = LegoAssembly(self.layer, None, deepcopy(self.properties)) clone.properties["label"] = label for component in self.components: clone.add_component(component.clone()) for assembly in self.assemblies: clone.add_assembly(assembly.clone()) return clone def print_assembly_tree(root, levels=None): """ Prints the assembly tree starting from root with a visualization implemented with text characters. Args: root (LegoAssembly): The root of the assembly tree to print. levels (List[bool]): Internally used by recursion to know where to print vertical connection. Defaults to an empty list. """ if not isinstance(root, LegoAssembly): raise TypeError( f"Argument should be of type {LegoAssembly.__name__}, " f"got {type(root).__name__} instead." ) if levels is None: levels = [] connection_padding = "".join(map(lambda draw: "│ " if draw else " ", levels)) assembly_padding = "" if len(levels) > 0: assembly_padding = "├── " if levels[-1] else "└── " print(f"{connection_padding[:-4]}{assembly_padding}{root}") """ Recursively print child components. """ for i, assembly in enumerate(root.assemblies): is_last = i == len(root.assemblies) - 1 and len(root.components) == 0 print_assembly_tree(assembly, [*levels, not is_last]) """ Print the components. """ for i, component in enumerate(root.components): component_padding = "├── " if i < len(root.components) - 1 else "└── " print(f"{connection_padding}{component_padding}{component}") def correct_aggregation_hierarchy(root: LegoAssembly, strict: bool = False): """ Recursively checks whether the aggregation hierarchy from `root` is correct. Args: root (LegoAssembly): The root of the assembly tree. strict (bool): If True, the function will return False if any assembly or component with a layer level equal to root's is found. Defaults to False. Returns: True if the aggregation hierarchy is correct. False otherwise. """ if not isinstance(root, LegoAssembly): raise TypeError( f"Argument should be of type {LegoAssembly.__name__}, " f"got {type(root).__name__} instead." ) higher_level = operator.le if strict: higher_level = operator.lt for component in root.components: if not higher_level(root.layer.value, component.layer.value): return False for assembly in root.assemblies: if not higher_level(root.layer.value, assembly.layer.value): return False if not correct_aggregation_hierarchy(assembly, strict): return False return True class KPIEncoder(json.JSONEncoder): """ JSON encoder that handles special class types for KPI serialization. """ def default(self, o): """ Overrides default method to handle special conversion cases. Args: o : Object to be converted. Returns: Converted object or super method if no applicable case is found. """ if isinstance(o, uuid.UUID): return "kpi-" + str(o) if isinstance(o, (AggregationLayer)): return "kpi-" + o.name return super().default(o)