import inspect
from collections import deque
from typing import TYPE_CHECKING, Any
import structlog
from ..exceptions import BuildError
from .consts import (
COMPONENT_TYPE_FIELD,
DEPENDS_ON_FIELD,
START_FUNC_NAME,
STOP_FUNC_NAME
)
from .component import CompType
if TYPE_CHECKING:
from .assembler import Assembler
log = structlog.stdlib.get_logger()
[docs]
class BuildArtifact:
assembler: "Assembler"
comp_dict: dict[str, Any]
comp_types: dict[str, CompType]
init_graph: dict[str, set[str]]
start_graph: dict[str, set[str]]
stop_graph: dict[str, set[str]]
init_order: list[str]
start_order: list[str]
stop_order: list[str]
def __init__(self, assembler: "Assembler"):
self.assembler = assembler
[docs]
def collect_components(self):
"""Collects components from class definition."""
self.comp_dict = {}
# adds components from class and all base classes. skips `type`, and runs in reverse so that sub classes override super class values
for base in reversed(inspect.getmro(self.assembler)[:-1]):
for k, v in vars(base).items():
# excludes built in, private, and `None` attributes
if k.startswith("_") or v is None:
continue
self.comp_dict[k] = v
log.debug(f"Collected {len(self.comp_dict)} components")
[docs]
def build_init_graph(self):
"""Builds dependency graph and component type map.
Graph representation is an adjacency list: the key is a component
name, and the value is a tuple containing names of the depedencies.
"""
self.comp_types = {}
self.init_graph = {}
for comp_name, comp in self.comp_dict.items():
init_dependencies = []
explict_type = getattr(comp, COMPONENT_TYPE_FIELD, None)
if explict_type:
self.comp_types[comp_name] = explict_type
elif not callable(comp):
# non callable components are objects treated "as is"
self.comp_types[comp_name] = CompType.OBJECT
else:
# callable components default to singletons
self.comp_types[comp_name] = CompType.SINGLETON
if self.comp_types[comp_name] == CompType.SINGLETON:
sig = inspect.signature(comp)
init_dependencies = set(sig.parameters)
# difference of sets: dependencies and component names
# non empty set indicates invalid dependency
invalid_init_deps = init_dependencies - set(self.comp_dict)
if invalid_init_deps:
log.warning(f"Ignoring undefined init dependencies {invalid_init_deps} on component '{comp_name}'")
init_dependencies -= invalid_init_deps
self.init_graph[comp_name] = init_dependencies
log.debug("Built init dependency graph")
[docs]
def build_start_graph(self):
self.start_graph = {}
start_components = {
name for name, comp in self.comp_dict.items()
if getattr(comp, START_FUNC_NAME, None)
}
for comp_name, comp in self.comp_dict.items():
if self.comp_types[comp_name] != CompType.SINGLETON:
continue
if comp_name not in start_components:
continue
start_func = getattr(comp, START_FUNC_NAME)
start_dependencies = getattr(start_func, DEPENDS_ON_FIELD, set())
invalid_start_deps = start_dependencies - start_components
if invalid_start_deps:
log.warning(f"Ignoring undefined start dependencies {invalid_start_deps} on component '{comp_name}'")
start_dependencies -= invalid_start_deps
self.start_graph[comp_name] = start_dependencies
log.debug("Built start dependency graph")
[docs]
def build_stop_graph(self):
self.stop_graph = {}
stop_components = {
name for name, comp in self.comp_dict.items()
if getattr(comp, STOP_FUNC_NAME, None)
}
reverse_start_graph = self.reverse_adj_list(self.start_graph)
for comp_name, comp in self.comp_dict.items():
if self.comp_types[comp_name] != CompType.SINGLETON:
continue
if comp_name not in stop_components:
continue
# looks for dependencies in this order:
# @depends_on decorator -> reverse start graph -> empty set
stop_func = getattr(comp, STOP_FUNC_NAME)
stop_dependencies = getattr(
stop_func,
DEPENDS_ON_FIELD,
# default:
reverse_start_graph.get(
comp_name,
# default:
set()
)
)
invalid_stop_deps = stop_dependencies - stop_components
if invalid_stop_deps:
log.warning(f"Ignoring undefined stop dependencies {invalid_stop_deps} on component '{comp_name}'")
stop_dependencies -= invalid_stop_deps
self.stop_graph[comp_name] = stop_dependencies
log.debug("Built stop dependency graph")
[docs]
@staticmethod
def reverse_adj_list(adj: dict[str, set[str]]):
r_adj: dict[str, set[str]] = {}
for node in adj:
r_adj.setdefault(node, set())
for n in adj[node]:
r_adj.setdefault(n, set())
r_adj[n].add(node)
return r_adj
[docs]
@staticmethod
def topo_sort(adj: dict[str, set[str]]):
"""Topological sort of direct graph using Kahn's algorithm."""
# reverse adj list: n -> incoming neighbors
r_adj = BuildArtifact.reverse_adj_list(adj)
# how many outgoing edges each node has
out_degree = {
n: len(neighbors)
for n, neighbors in adj.items()
}
# initializing queue: nodes w/o dependencies
queue = deque()
for node in out_degree:
if out_degree[node] == 0:
queue.append(node)
ordering = []
while queue:
# removes node from graph
n = queue.popleft()
ordering.append(n)
# updates out degree for nodes dependent on this node
for next_n in r_adj[n]:
out_degree[next_n] -= 1
# adds nodes now without dependencies to queue
if out_degree[next_n] == 0:
queue.append(next_n)
if len(ordering) != len(adj):
cycle_nodes = set(adj) - set(ordering)
raise BuildError(f"Found cycle in dependency graph, the following nodes could not be ordered: {cycle_nodes}")
return ordering
[docs]
def build_stop_order(self, start_order: list[str]) -> list[str]:
"""Builds component stop order.
Reverse of start order, only including components with a stop method.
NOTE: Components defining a stop method MUST also define a start method.
"""
stop_order = []
for comp_name in reversed(start_order):
comp = self.comp_dict[comp_name]
if getattr(comp, STOP_FUNC_NAME, None):
stop_order.append(comp_name)
return stop_order
[docs]
@staticmethod
def visualize(adj: dict[str, list[str]]) -> str:
"""Creates representation of dependency graph in Graphviz DOT language."""
s = "digraph G {\n"
for node, neighbors in adj.items():
if node == "graph":
node = "graph_"
s += f"\t{node};\n"
for n in neighbors:
if n == "graph":
n = "graph_"
s += f"\t{node} -> {n};\n"
s += "}"
return s
[docs]
def build(self):
log.debug("Creating build artifact...")
self.collect_components()
self.build_init_graph()
log.debug("Starting init graph topo sort...")
self.init_order = self.topo_sort(self.init_graph)
log.debug("Init order: " + " -> ".join(self.init_order))
self.build_start_graph()
log.debug("Starting start graph topo sort...")
self.start_order = self.topo_sort(self.start_graph)
log.debug("Start order: " + " -> ".join(self.start_order))
self.build_stop_graph()
log.debug("Starting stop graph topo sort...")
self.stop_order = self.topo_sort(self.stop_graph)
log.debug("Stop order: " + " -> ".join(self.stop_order))
log.debug("Done")