Source code for koi_net.components.sync_manager

from dataclasses import dataclass
from logging import Logger
from rid_lib import RIDType
from rid_lib.ext import Cache
from rid_lib.types import KoiNetNode

from ..config.base import BaseNodeConfig
from ..infra import depends_on
from ..exceptions import RequestError
from .graph import NetworkGraph
from .request_handler import RequestHandler
from .kobj_queue import KobjQueue
from ..protocol.node import NodeProfile, NodeType


[docs] @dataclass class SyncManager: """Handles state synchronization actions with other nodes.""" log: Logger graph: NetworkGraph cache: Cache config: BaseNodeConfig request_handler: RequestHandler kobj_queue: KobjQueue
[docs] @depends_on("graph", "kobj_worker") def start(self): """Catches up with providers on startup.""" self.catch_up_with_all(self.config.koi_net.rid_types_of_interest)
[docs] def catch_up_with_all(self, rid_types: list[RIDType]): node_providers = [] for rid_type in rid_types: providers = self.graph.get_neighbors( direction="in", allowed_type=rid_type ) if not node_providers: continue self.log.debug(f"Catching up with {rid_type} providers: {node_providers}") self.catch_up_with(node_providers, [rid_type])
[docs] def catch_up_with(self, nodes: list[KoiNetNode], rid_types: list[RIDType]): """Catches up with the state of RID types within other nodes.""" for node in nodes: node_bundle = self.cache.read(node) node_profile = node_bundle.validate_contents(NodeProfile) # can't catch up with partial nodes if node_profile.node_type != NodeType.FULL: continue try: payload = self.request_handler.fetch_manifests( node, rid_types=rid_types) except RequestError: continue for manifest in payload.manifests: self.kobj_queue.push( manifest=manifest, source=node )