from dataclasses import dataclass
from rid_lib.ext import Bundle
from rid_lib.types import KoiNetEdge, KoiNetNode
from koi_net.protocol.node import NodeProfile, NodeType
from koi_net.protocol.edge import EdgeProfile, EdgeStatus, EdgeType
from koi_net.protocol.knowledge_object import KnowledgeObject
from koi_net.protocol.event import Event, EventType
from ..interfaces import KnowledgeHandler, STOP_CHAIN, HandlerType
from ..identity import NodeIdentity
from ..kobj_queue import KobjQueue
from ..event_queue import EventQueue
from ..cache import Cache
[docs]
@dataclass
class EdgeNegotiationHandler(KnowledgeHandler):
identity: NodeIdentity
cache: Cache
event_queue: EventQueue
kobj_queue: KobjQueue
handler_type = HandlerType.Bundle
rid_types = (KoiNetEdge,)
event_types = (EventType.NEW, EventType.UPDATE)
[docs]
def handle(self, kobj: KnowledgeObject):
"""Handles edge negotiation process.
Automatically approves proposed edges if they request RID types this
node can provide (or KOI node, edge RIDs). Validates the edge type
is allowed for the node type (partial nodes cannot use webhooks). If
edge is invalid, a `FORGET` event is sent to the other node.
"""
# only handle incoming events (ignore internal edge knowledge objects)
if kobj.source is None:
return
edge_profile = kobj.bundle.validate_contents(EdgeProfile)
# indicates peer subscribing to this node
if edge_profile.source == self.identity.rid:
if edge_profile.status != EdgeStatus.PROPOSED:
return
self.log.debug("Handling edge negotiation")
peer_rid = edge_profile.target
peer_bundle = self.cache.read(peer_rid)
if not peer_bundle:
self.log.warning(f"Peer {peer_rid!r} unknown to me")
return STOP_CHAIN
peer_profile = peer_bundle.validate_contents(NodeProfile)
# explicitly provided event RID types and (self) node + edge objects
provided_events = (
*self.identity.profile.provides.event,
KoiNetNode, KoiNetEdge
)
abort = False
if (edge_profile.edge_type == EdgeType.WEBHOOK and
peer_profile.node_type == NodeType.PARTIAL):
self.log.debug("Partial nodes cannot use webhooks")
abort = True
if not set(edge_profile.rid_types).issubset(provided_events):
not_provided = set(edge_profile.rid_types) - set(provided_events)
self.log.debug(f"Requested RID types {not_provided} not provided by this node")
abort = True
if abort:
event = Event.from_rid(EventType.FORGET, kobj.rid)
self.event_queue.push(event, peer_rid)
return STOP_CHAIN
else:
self.log.debug("Approving proposed edge")
edge_profile.status = EdgeStatus.APPROVED
updated_bundle = Bundle.generate(kobj.rid, edge_profile.model_dump())
self.kobj_queue.push(bundle=updated_bundle, event_type=EventType.UPDATE)
return
elif edge_profile.target == self.identity.rid:
if edge_profile.status == EdgeStatus.APPROVED:
self.log.debug("Edge approved by other node!")