Source code for koi_net.components.knowledge_handlers.node_contact_handler

from dataclasses import dataclass

from rid_lib.ext import Bundle
from rid_lib.types import KoiNetNode

from koi_net.exceptions import RequestError
from koi_net.config.base import BaseNodeConfig
from koi_net.infra import depends_on
from koi_net.protocol.node import NodeProfile, NodeType
from koi_net.protocol.edge import EdgeProfile, EdgeStatus, EdgeType, generate_edge_bundle
from koi_net.protocol.knowledge_object import KnowledgeObject
from ..interfaces import KnowledgeHandler, HandlerType
from ..identity import NodeIdentity
from ..kobj_queue import KobjQueue
from ..cache import Cache
from ..request_handler import RequestHandler
from ..graph import NetworkGraph


@dataclass
class NodeContactHandler(KnowledgeHandler):
    """Network handler that makes contact with providers of RID types of interest.

    For every node knowledge object it sees (``rid_types = (KoiNetNode,)``),
    it proposes (or updates) an edge from that node to this one and then
    fetches the RIDs the node already holds so they can be queued for
    processing.
    """

    identity: NodeIdentity
    cache: Cache
    config: BaseNodeConfig
    kobj_queue: KobjQueue
    graph: NetworkGraph
    request_handler: RequestHandler

    # Un-annotated on purpose: these are class-level attributes, not
    # dataclass fields, so they are not part of the generated __init__.
    handler_type = HandlerType.Network
    rid_types = (KoiNetNode,)

    def process_node(self, node_rid: KoiNetNode, node_bundle: Bundle) -> None:
        """Propose an edge to a provider node and catch up on its RIDs.

        Args:
            node_rid: RID of the node being examined.
            node_bundle: Bundle whose contents validate as a ``NodeProfile``.

        The method returns early (doing nothing) when the node is this node
        itself, when it provides no event RID types of interest, or when an
        existing edge already covers exactly the same RID types.
        """
        # prevents nodes from attempting to form a self loop
        if node_rid == self.identity.rid:
            return

        node_profile = node_bundle.validate_contents(NodeProfile)

        # None indicates interest in all types, while an empty list would
        # indicate interest in no types
        types_of_interest = self.config.koi_net.rid_types_of_interest
        if types_of_interest is None:
            available_rid_types = node_profile.provides.event
        else:
            available_rid_types = list(
                set(types_of_interest) & set(node_profile.provides.event)
            )

        if not available_rid_types:
            return

        edge_rid = self.graph.get_edge(
            source=node_rid,
            target=self.identity.rid,
        )

        # The graph may know an edge whose bundle is absent from the cache
        # (start() below guards cache.read() the same way). In that case
        # fall through and propose a fresh edge instead of failing on a
        # missing bundle.
        prev_edge_bundle = self.cache.read(edge_rid) if edge_rid else None

        if prev_edge_bundle:
            # already have an edge established
            edge_profile = prev_edge_bundle.validate_contents(EdgeProfile)

            if set(edge_profile.rid_types) == set(available_rid_types):
                # no change in rid types
                return

            self.log.info(f"Proposing updated edge with node provider {available_rid_types}")

            edge_profile.rid_types = available_rid_types
            edge_profile.status = EdgeStatus.PROPOSED
            edge_bundle = Bundle.generate(edge_rid, edge_profile.model_dump())
        else:
            # no existing (readable) edge
            self.log.info(f"Proposing new edge with node provider {available_rid_types}")

            # this node's own type decides the edge type: WEBHOOK for FULL
            # nodes, POLL otherwise
            if self.identity.profile.node_type == NodeType.FULL:
                edge_type = EdgeType.WEBHOOK
            else:
                edge_type = EdgeType.POLL

            edge_bundle = generate_edge_bundle(
                source=node_rid,
                target=self.identity.rid,
                rid_types=available_rid_types,
                edge_type=edge_type,
            )

        # queued for processing
        self.kobj_queue.push(bundle=edge_bundle)

        self.log.info("Catching up on network state")

        try:
            payload = self.request_handler.fetch_rids(
                node=node_rid,
                rid_types=available_rid_types,
            )
        except RequestError:
            # best effort: the edge proposal is already queued, only the
            # catch-up step is abandoned
            self.log.info("Failed to reach node")
            return

        for rid in payload.rids:
            if rid == self.identity.rid:
                self.log.info("Skipping myself")
                continue

            if self.cache.exists(rid):
                self.log.info(f"Skipping known RID {rid!r}")
                continue

            # marked as external since we are handling RIDs from another node
            # will fetch remotely instead of checking local cache
            self.kobj_queue.push(rid=rid, source=node_rid)

        self.log.info("Done")

    def handle(self, kobj: KnowledgeObject) -> None:
        """Makes contact with providers of RID types of interest.

        When an incoming node knowledge object is identified as a provider
        of an RID type of interest, this handler will propose a new edge
        subscribing to future node events, and fetch existing nodes to catch
        up to the current state.
        """
        self.process_node(kobj.rid, kobj.bundle)

    # NOTE(review): presumably makes startup wait for the "graph" and
    # "kobj_worker" components; confirm against koi_net.infra.depends_on.
    @depends_on("graph", "kobj_worker")
    def start(self) -> None:
        """Run node contact analysis over every node profile already cached."""
        self.log.info("Starting node contact analysis on cached profiles...")

        for rid in self.cache.list_rids(rid_types=(KoiNetNode,)):
            bundle = self.cache.read(rid)
            # listed RID without a readable bundle: nothing to analyse
            if not bundle:
                continue
            self.process_node(rid, bundle)