Source code for koi_net.infra.assembler

from typing import Any, Self

import structlog

from ..exceptions import BuildError
from .build_artifact import BuildArtifact, CompType
from .container import NodeContainer

log = structlog.stdlib.get_logger()


[docs] class Assembler: _container: type[NodeContainer] = NodeContainer # optional order overrides: _start_order: list[str] _stop_order: list[str] # annotation hack to show the components and container methods def __new__(cls, *args, **kwargs) -> Self | NodeContainer: """Returns assembled node container.""" log.debug(f"Assembling '{cls.__name__}'") artifact = BuildArtifact(cls) artifact.build() components = cls._build_components(artifact) log.debug("Returning assembled node") return cls._container( artifact=artifact, components=components, **kwargs ) @staticmethod def _build_components(artifact: BuildArtifact): """Returns assembled components as a dict.""" log.debug("Building components...") components: dict[str, Any] = {} for comp_name in artifact.init_order: # for comp_name, (comp_type, dep_names) in dep_graph.items(): comp = artifact.comp_dict[comp_name] comp_type = artifact.comp_types[comp_name] if comp_type == CompType.OBJECT: components[comp_name] = comp elif comp_type == CompType.SINGLETON: # builds depedency dict for current component dependencies = {} for dep in artifact.init_graph[comp_name]: if dep not in components: raise BuildError(f"Couldn't find required component '{dep}'") dependencies[dep] = components[dep] components[comp_name] = comp(**dependencies) log.debug("Done") return components