# Source code for koi_net.components.event_worker

import queue
import time
from dataclasses import dataclass

from rid_lib.ext import Cache
from rid_lib.types import KoiNetNode

from ..infra import depends_on
from ..config.base import BaseNodeConfig
from ..protocol.node import NodeProfile, NodeType
from ..exceptions import RequestError

from .event_queue import EventQueue
from .request_handler import RequestHandler
from .event_buffer import EventBuffer
from .interfaces import ThreadedComponent


class End:
    """Class for STOP_WORKER sentinel pushed to worker queues.

    Consumers compare queue items against the module-level `STOP_WORKER`
    instance by identity (`item is STOP_WORKER`), so the class carries no
    state of its own.
    """

    def __repr__(self) -> str:
        # A readable name instead of the default `<... End object at 0x...>`
        # makes the sentinel recognizable in logs and debuggers.
        return "STOP_WORKER"


# Single shared sentinel instance; pushed onto a worker queue to ask the
# worker to shut down.
STOP_WORKER = End()
@dataclass
class EventProcessingWorker(ThreadedComponent):
    """Thread worker that processes the `event_queue`.

    Each dequeued item carries an `event` and a `target` node. The worker
    looks the target up in `cache` and pushes the event onto one of two
    buffers depending on the target's node type. The broadcast buffer is
    flushed to the target when it grows past the configured length, when it
    has waited longer than the configured time, or when the worker is
    stopped.
    """

    config: BaseNodeConfig
    cache: Cache
    event_queue: EventQueue
    request_handler: RequestHandler
    # Receives events addressed to nodes whose profile is NodeType.PARTIAL.
    poll_event_buf: EventBuffer
    # Receives events addressed to NodeType.FULL nodes and to the configured
    # first-contact node; this is the buffer `flush_and_broadcast` drains.
    broadcast_event_buf: EventBuffer

    def flush_and_broadcast(
        self,
        target: KoiNetNode,
        force_flush: bool = False,
    ) -> None:
        """Broadcasts all events to target in event buffer.

        Args:
            target: Node whose broadcast buffer should be flushed.
            force_flush: Passed through to `EventBuffer.safe_flush`.

        A `RequestError` raised while sending is logged as a warning and
        not propagated.
        """
        # TODO: deal with automated retries when unreachable node's buffer is full
        try:
            with self.broadcast_event_buf.safe_flush(target, force_flush) as events:
                self.request_handler.broadcast_events(target, events=events)
        except RequestError:
            # NOTE(review): the message implies `safe_flush` resets the
            # buffer on failure -- confirm against EventBuffer.
            self.log.warning("Failed to reach target, event buffer reset")

    @depends_on("kobj_worker")
    def stop(self) -> None:
        """Signal the worker loop to exit, then run the parent's `stop`."""
        # The sentinel is queued behind any pending events, so those are
        # still processed before `run` sees it and returns.
        self.event_queue.q.put(STOP_WORKER)
        super().stop()

    def run(self) -> None:
        """Consume the event queue until `STOP_WORKER` is received.

        Blocks on the queue with the configured `queue_timeout`. On a
        timeout, buffers that have waited at least `max_wait_time` are
        flushed. On `STOP_WORKER`, every broadcast buffer is force-flushed
        and the method returns.
        """
        while True:
            # Keep the `try` limited to the blocking `get` so that only a
            # genuine idle timeout is treated as one.
            try:
                item = self.event_queue.q.get(
                    timeout=self.config.koi_net.event_worker.queue_timeout)
            except queue.Empty:
                # NOTE(review): the max-wait check only runs when the queue
                # is idle; under steady load only the size limit triggers
                # a flush.
                self._flush_expired_buffers()
                continue

            try:
                if item is STOP_WORKER:
                    self.log.info(
                        "Received 'STOP_WORKER' signal, flushing all buffers...")
                    # Iterate over a snapshot of the keys in case flushing
                    # modifies `buffers`.
                    for target in list(self.broadcast_event_buf.buffers.keys()):
                        self.flush_and_broadcast(target, force_flush=True)
                    return

                self.log.info(f"Dequeued {item.event!r} -> {item.target!r}")

                if self._route_event(item):
                    buf_len = self.broadcast_event_buf.buf_len(item.target)
                    if buf_len > self.config.koi_net.event_worker.max_buf_len:
                        self.flush_and_broadcast(item.target)
            finally:
                # Every successful `get` is paired with exactly one
                # `task_done`, including the STOP_WORKER sentinel.
                self.event_queue.q.task_done()

    def _route_event(self, item) -> bool:
        """Push `item.event` onto the buffer matching the target's node type.

        Returns:
            True when the caller should go on to check the broadcast buffer
            length for `item.target`; False when the event went to the poll
            buffer or was dropped because the target is unknown.
        """
        # determines which buffer to push event to based on target node type
        node_bundle = self.cache.read(item.target)

        if node_bundle:
            node_profile = node_bundle.validate_contents(NodeProfile)
            if node_profile.node_type == NodeType.FULL:
                self.broadcast_event_buf.push(item.target, item.event)
            elif node_profile.node_type == NodeType.PARTIAL:
                self.poll_event_buf.push(item.target, item.event)
                return False
            # Any other node type pushes nothing but still falls through to
            # the length check, as in the original control flow.
            return True

        if item.target == self.config.koi_net.first_contact.rid:
            # The first-contact node may not be cached; its events still go
            # to the broadcast buffer.
            self.broadcast_event_buf.push(item.target, item.event)
            return True

        self.log.warning(f"Couldn't handle event {item.event!r} in queue, node {item.target!r} unknown to me")
        return False

    def _flush_expired_buffers(self) -> None:
        """Flush broadcast buffers that have waited at least `max_wait_time`."""
        broadcast_buf = self.broadcast_event_buf

        # Snapshot the targets in case flushing modifies `buffers`.
        for target in list(broadcast_buf.buffers):
            start_time = broadcast_buf.start_time.get(target)
            if start_time is None or broadcast_buf.buf_len(target) == 0:
                continue

            # Read the clock per target: an earlier flush in this loop may
            # have taken a while.
            elapsed = time.time() - start_time
            if elapsed >= self.config.koi_net.event_worker.max_wait_time:
                self.flush_and_broadcast(target)