Source code for koi_net.components.kobj_queue

import threading
from dataclasses import dataclass, field
from logging import Logger
from queue import Queue

from rid_lib.core import RID
from rid_lib.ext import Bundle, Manifest
from rid_lib.types import KoiNetNode

from ..protocol.event import Event, EventType
from ..protocol.knowledge_object import KnowledgeObject


[docs] @dataclass class KobjQueue: """Queue for knowledge objects entering the processing pipeline.""" log: Logger shutdown_signal: threading.Event q: Queue[KnowledgeObject] = field(init=False, default_factory=Queue)
[docs] def push( self, *, rid: RID | None = None, manifest: Manifest | None = None, bundle: Bundle | None = None, event: Event | None = None, kobj: KnowledgeObject | None = None, event_type: EventType | None = None, source: KoiNetNode | None = None ): """Pushes knowledge object to queue. Input may take the form of an RID, manifest, bundle, event, or knowledge object (with an optional event type for RID, manifest, or bundle objects). All objects will be normalized to knowledge objects and queued. """ if rid: _kobj = KnowledgeObject.from_rid(rid, event_type, source) elif manifest: _kobj = KnowledgeObject.from_manifest(manifest, event_type, source) elif bundle: _kobj = KnowledgeObject.from_bundle(bundle, event_type, source) elif event: _kobj = KnowledgeObject.from_event(event, source) elif kobj: _kobj = kobj else: raise ValueError("One of 'rid', 'manifest', 'bundle', 'event', or 'kobj' must be provided") self.q.put(_kobj) self.log.debug(f"Queued {_kobj!r}")
[docs] def wait(self): """Safe join, prevents deadlock if `kobj_worker` fails.""" while not self.shutdown_signal.wait(0.1): if self.q.unfinished_tasks == 0: return print("WAIT FAILED") raise RuntimeError("Shutdown while awaiting queue")