import cmd
import inspect
import shlex
from functools import wraps
from rich.console import Console
from rich.table import Table
from rich import box
from koi_net.infra import NodeState, LogSystem
from .module import module_interface
from .network import NetworkInterface
from .node import NodeInterface
[docs]
class KoiShell(cmd.Cmd):
# Interactive shell built on `cmd.Cmd`; each `do_*` method defined in this class is a shell command.
# intro = "Welcome to the KOI shell, type help for a list of commands.\n"
# Prompt string shown before each input line.
prompt = ">=> "
# NOTE(review): `file` is not read anywhere in the visible class — confirm it is still needed.
file = None
def __init__(self):
    """Set up logging, the rich console and the network handle.

    Prints a short banner followed by the number of detected modules and
    nodes.
    """
    super().__init__()
    # The log system is created with its console handler switched off
    # (presumably so log lines do not interleave with the prompt — confirm).
    LogSystem(use_console_handler=False)

    self.console = Console()
    self.console.print("KOI shell: [cyan]type `help` for a list of commands[/cyan]")

    self.network = NetworkInterface()
    module_count = len(module_interface.module_names)
    node_count = len(self.network.nodes)
    self.console.print(
        f"Found [green]{module_count} module(s)[/green] "
        f"and [green]{node_count} node(s)[/green]"
    )
    print()
[docs]
def emptyline(self):
    """Ignore an empty input line.

    Overrides `cmd.Cmd.emptyline`, whose default behaviour is to repeat
    the last non-empty command.
    """
    return None
[docs]
@staticmethod
def load_node(func):
@wraps(func)
def wrapper(self: "KoiShell", name: str, *args):
node = self.network.resolve_node(name)
if node:
return func(self, node, *args)
else:
print(f"Could not find node '{name}'")
return wrapper
[docs]
@staticmethod
def parse_args(func):
@wraps(func)
def wrapper(self: "KoiShell", arg: str, *args):
parsed_args = shlex.split(arg)
return func(self, *parsed_args)
return wrapper
[docs]
@staticmethod
def validate_args(func):
@wraps(func)
def wrapper(self: "KoiShell", *args):
sig = inspect.signature(func)
try:
sig.bind(self, *args)
except TypeError as err:
self.console.print(f"[bold red]{str(err).capitalize()}[/bold red]")
return
return func(self, *args)
return wrapper
[docs]
def do_quit(self, arg: str):
    """Exit the shell, but only if no initialized node is still active.

    Returns ``True`` (which ends `cmd.Cmd.cmdloop`) when every node is
    either uninitialized or in the ``IDLE`` state; otherwise prints a
    hint and returns ``None`` so the shell keeps running.
    """
    for node in self.network.nodes:
        if node.initialized and node.state() != NodeState.IDLE:
            print("Nodes are still running! Run `network stop` first, or `QUIT` to exit anyway.")
            return None
    return True
[docs]
def do_QUIT(self, arg: str):
    """Stop the network, then exit the shell unconditionally.

    Always returns ``True``, which ends `cmd.Cmd.cmdloop`.
    """
    network = self.network
    network.stop()
    return True
[docs]
def do_help(self, subcmd: str):
    """Print a help table for the shell or for one command group.

    Args:
        subcmd: ``"node"``, ``"network"`` or ``"module"`` to list that
            group's subcommands; any other value lists the top-level
            commands.
    """
    # Each row is (command, arguments, info). Literal "[" is written as
    # r"\[" so that rich does not try to parse it as a markup tag.
    group_help = {
        "node": [
            ("add", r"<module ref> \[name]", "adds a new node of type <module ref> to the network, if unset, name defaults to <module ref>"),
            ("rm", "<name>", "removes a node from the network"),
            ("init", "<name>", "initializes a node"),
            ("wipe-config", "<name>", "wipes a node's configuration, including private key"),
            ("wipe-cache", "<name>", "wipes a node's RID cache"),
            ("wipe-logs", "<name>", "wipes a node's logs"),
            ("config-get", "<name> <loc>", "prints the config value at the specified JSON pointer location"),
            ("config-set", "<name> <loc> <val>", "sets the config at JSON pointer location to value"),
            ("config-unset", "<name> <loc>", "unsets the config value at the specified JSON pointer location"),
            ("info", "<name>", "shows info about nodes edges"),
            ("run", "<name>", "runs a node in the foreground"),
            ("start", "<name>", "starts a node in the background"),
            ("stop", "<name>", "stops a running background node"),
            ("list", "", "lists all nodes in the network"),
        ],
        "network": [
            ("sync", "", "synchronizes the local environment with the network configuration"),
            ("wipe-config", "", "wipes configuration, including private key, of all network nodes"),
            ("wipe-cache", "", "wipes RID cache of all network nodes"),
            ("wipe-logs", "", "wipes logs of all network nodes"),
            ("status", "", "lists the current state of all network nodes"),
            ("set-first-contact", "<name>", "Sets first contact of all nodes in the network"),
            ("unset-first-contact", "", "Unsets first contact from all nodes in the network"),
            ("run", "", "runs all network nodes in the foreground"),
            ("start", "", "starts all network nodes in the background"),
            ("stop", "", "stops all running background nodes"),
        ],
        "module": [
            ("list", "", "lists all detected node modules"),
            ("reload", "<module ref>", "reloads a module and updates its idle nodes"),
        ],
    }
    top_level_help = [
        ("node", "<sub cmd>", "group of node commands"),
        ("network", "<sub cmd>", "group of network commands"),
        ("module", "<sub cmd>", "group of module commands"),
        ("help", r"\[cmd]", "list available commands or subcommands"),
        ("quit", "", "exits the shell"),
        ("QUIT", "", "exits the shell, even if nodes are running"),
    ]

    table = Table("command", "arguments", "info", box=box.SIMPLE)
    for row in group_help.get(subcmd, top_level_help):
        table.add_row(*row)
    self.console.print(table)
[docs]
@parse_args
def do_node(self, sub_cmd: str = "", *args):
    """Dispatch ``node <sub cmd> [args...]`` to the matching ``node_*`` handler.

    Args:
        sub_cmd: Name of the node subcommand. Defaults to ``""`` so that
            a bare ``node`` shows the group's help instead of raising
            ``TypeError`` for a missing argument.
        *args: Remaining tokens, forwarded unchanged to the handler.
    """
    if not sub_cmd:
        self.do_help("node")
        return

    handlers = {
        "add": self.node_add,
        "rm": self.node_rm,
        "init": self.node_init,
        "list": self.node_list,
        "config-get": self.node_config_get,
        "config-set": self.node_config_set,
        "config-unset": self.node_config_unset,
        "info": self.node_info,
        "wipe-config": self.node_wipe_config,
        "wipe-cache": self.node_wipe_cache,
        "wipe-logs": self.node_wipe_logs,
        "start": self.node_start,
        "stop": self.node_stop,
        "run": self.node_run,
    }
    handler = handlers.get(sub_cmd)
    if handler is None:
        print(f"Unknown subcommand '{sub_cmd}'")
        return
    handler(*args)
[docs]
@parse_args
def do_network(self, sub_cmd: str = "", *args):
    """Dispatch ``network <sub cmd> [args...]`` to the matching ``network_*`` handler.

    Args:
        sub_cmd: Name of the network subcommand. Defaults to ``""`` so
            that a bare ``network`` shows the group's help instead of
            raising ``TypeError`` for a missing argument.
        *args: Remaining tokens, forwarded unchanged to the handler.
    """
    if not sub_cmd:
        self.do_help("network")
        return

    # Handlers are looked up by name (lazily) so that one missing method
    # only affects its own subcommand.
    handler_names = {
        "sync": "network_sync",
        "wipe-config": "network_wipe_config",
        "wipe-cache": "network_wipe_cache",
        "wipe-logs": "network_wipe_logs",
        "status": "network_status",
        "set-first-contact": "network_set_first_contact",
        "unset-first-contact": "network_unset_first_contact",
        "start": "network_start",
        "stop": "network_stop",
        "run": "network_run",
    }
    handler_name = handler_names.get(sub_cmd)
    if handler_name is None:
        print(f"Unknown subcommand '{sub_cmd}'")
        return

    handler = getattr(self, handler_name, None)
    if handler is None:
        # NOTE(review): `network_set_first_contact` and
        # `network_unset_first_contact` are not defined in the visible
        # class — confirm whether they still need to be implemented.
        print(f"Subcommand '{sub_cmd}' is not available")
        return
    handler(*args)
[docs]
@parse_args
def do_module(self, sub_cmd: str = "", *args):
    """Dispatch ``module <sub cmd> [args...]`` to the matching ``module_*`` handler.

    Args:
        sub_cmd: Name of the module subcommand. Defaults to ``""`` so
            that a bare ``module`` shows the group's help instead of
            raising ``TypeError`` for a missing argument.
        *args: Remaining tokens, forwarded unchanged to the handler.
    """
    if not sub_cmd:
        self.do_help("module")
        return

    handlers = {
        "list": self.module_list,
        "reload": self.module_reload,
    }
    handler = handlers.get(sub_cmd)
    if handler is None:
        print(f"Unknown subcommand '{sub_cmd}'")
        return
    handler(*args)
[docs]
@validate_args
def node_add(self, module_ref: str, name: str | None = None):
    """Create a node from a module reference and add it to the network.

    Args:
        module_ref: Reference passed to `NodeInterface.from_ref` to pick
            the node's module.
        name: Name for the new node; defaults to ``module_ref``.

    Nothing is created when a node with that name already resolves in
    the network, or when the module reference cannot be found.
    """
    name = name or module_ref
    if self.network.resolve_node(name):
        print(f"Node with name '{name}' already exists")
        # Stop here: continuing would create/add a duplicate node.
        return

    try:
        node = NodeInterface.from_ref(name, module_ref)
    except ModuleNotFoundError as err:
        print(err)
        return

    if not node.exists():
        node.create()
    # Only initialized nodes are registered with the network.
    if node.initialized:
        self.network.add_node(node)
[docs]
@validate_args
@load_node
def node_rm(self, node: NodeInterface):
    """Delete a node (if it exists and is idle) and drop it from the network.

    A node that exists but is not in the ``IDLE`` state is left
    untouched and a hint to stop it first is printed. A node that does
    not exist is only removed from the network.
    """
    if node.exists():
        if node.state() != NodeState.IDLE:
            print(f"Node is running, run `node stop {node.name}` first")
            return
        node.delete()
        print(f"Removed node '{node.name}'")

    self.network.remove_node(node)
[docs]
@validate_args
@load_node
def node_init(self, node: NodeInterface):
    """Call ``init()`` on the node named on the command line.

    ``@load_node`` resolves the name to a `NodeInterface` before this
    body runs.
    """
    node.init()
[docs]
@validate_args
def node_list(self):
    """Print a table (name, module, rid) of every network node that exists."""
    table = Table("name", "module", "rid", box=box.SIMPLE)

    # Nodes whose `exists()` is falsy are left out of the listing.
    existing_nodes = (entry for entry in self.network.nodes if entry.exists())
    for entry in existing_nodes:
        rid = entry.node.config.koi_net.node_rid
        table.add_row(entry.name, entry.module, str(rid))

    self.console.print(table)
[docs]
@validate_args
@load_node
def node_config_get(self, node: NodeInterface, loc: str):
    """Print the node's config value at ``loc``.

    Args:
        node: Node resolved from the name given on the command line.
        loc: Location passed to ``node.get_config`` (the shell help
            describes it as a JSON pointer).

    A ``KeyError`` from the lookup is reported to the user rather than
    silently ignored.
    """
    # Keep the try body to the one call that is expected to raise.
    try:
        val = node.get_config(loc)
    except KeyError:
        print(f"No config value found at '{loc}'")
        return
    self.console.print(val)
[docs]
@validate_args
@load_node
def node_config_set(self, node: NodeInterface, loc: str, val: str):
    """Forward ``loc`` and ``val`` to ``node.set_config``.

    ``val`` arrives as the raw string token from the command line.
    """
    node.set_config(loc, val)
[docs]
@validate_args
@load_node
def node_config_unset(self, node: NodeInterface, loc: str):
    """Forward ``loc`` to ``node.unset_config``."""
    node.unset_config(loc)
[docs]
@validate_args
@load_node
def node_info(self, node: NodeInterface):
    """Call ``info()`` on the node named on the command line.

    ``@validate_args`` is applied like on the other node handlers, so a
    missing or surplus argument prints an error instead of raising
    ``TypeError`` out of the command loop.
    """
    node.info()
[docs]
@validate_args
@load_node
def node_run(self, node: NodeInterface):
    """Call ``run()`` on the node (the shell help describes this as running in the foreground)."""
    node.run()
[docs]
@validate_args
@load_node
def node_start(self, node: NodeInterface):
    """Call ``start()`` on the node (the shell help describes this as starting in the background)."""
    node.start()
[docs]
@validate_args
@load_node
def node_stop(self, node: NodeInterface):
    """Call ``stop()`` on the node named on the command line."""
    node.stop()
[docs]
@validate_args
@load_node
def node_wipe_config(self, node: NodeInterface):
    """Call ``wipe_config()`` on the node, then print a confirmation."""
    node.wipe_config()
    confirmation = f"Wiped config of '{node.name}'"
    print(confirmation)
[docs]
@validate_args
@load_node
def node_wipe_cache(self, node: NodeInterface):
    """Call ``wipe_cache()`` on the node, then print a confirmation."""
    node.wipe_cache()
    confirmation = f"Wiped RID cache of '{node.name}'"
    print(confirmation)
[docs]
@validate_args
@load_node
def node_wipe_logs(self, node: NodeInterface):
    """Call ``wipe_logs()`` on the node, then print a confirmation."""
    node.wipe_logs()
    confirmation = f"Wiped logs of '{node.name}'"
    print(confirmation)
[docs]
@validate_args
def network_sync(self):
    """Delegate to ``self.network.sync()``; takes no command-line arguments."""
    self.network.sync()
[docs]
@validate_args
def network_wipe_cache(self):
    """Delegate to ``self.network.wipe_cache()``; takes no command-line arguments."""
    self.network.wipe_cache()
[docs]
@validate_args
def network_wipe_logs(self):
    """Delegate to ``self.network.wipe_logs()``; takes no command-line arguments."""
    self.network.wipe_logs()
[docs]
@validate_args
def network_wipe_config(self):
    """Delegate to ``self.network.wipe_config()``; takes no command-line arguments."""
    self.network.wipe_config()
[docs]
@validate_args
def network_status(self):
table = Table("node", "state", box=box.SIMPLE)
for node in self.network.nodes:
node_state = node.state()
match node_state:
case NodeState.IDLE:
c = "white"
case NodeState.STARTING:
c = "blue"
case NodeState.RUNNING:
c = "green"
case NodeState.STOPPING:
c = "red"
table.add_row(node.name, f"[{c}]{node.state()}[/{c}]")
self.console.print(table)
[docs]
@validate_args
def network_run(self):
    """Delegate to ``self.network.run()``; takes no command-line arguments."""
    self.network.run()
[docs]
@validate_args
def network_start(self):
    """Delegate to ``self.network.start()``; takes no command-line arguments."""
    self.network.start()
[docs]
@validate_args
def network_stop(self):
    """Delegate to ``self.network.stop()``; takes no command-line arguments."""
    self.network.stop()
[docs]
@validate_args
def module_list(self):
    """Print a table of modules with their comma-separated aliases.

    The alias-to-module map from ``module_interface`` is inverted so that
    each module gets one row. Aliases are sorted so the output is the
    same on every run (set iteration order is not stable).
    """
    module_alias_map: dict[str, set[str]] = {}
    for alias, module in module_interface.alias_module_map.items():
        module_alias_map.setdefault(module, set()).add(alias)

    table = Table("module", "alias(es)", box=box.SIMPLE)
    for module, alias_set in module_alias_map.items():
        table.add_row(module, ','.join(sorted(alias_set)))
    self.console.print(table)
[docs]
@validate_args
def module_reload(self, module_ref: str):
    """Reload a module and hand the fresh node class to its idle nodes.

    Args:
        module_ref: Reference resolved through
            ``module_interface.resolve_ref``.

    Nodes of that module which are not in the ``IDLE`` state are skipped
    and keep their current class. A summary of updated/affected nodes is
    printed at the end.
    """
    try:
        module = module_interface.resolve_ref(module_ref)
    except ModuleNotFoundError:
        self.console.print(f"Couldn't resolve module reference '{module_ref}'")
        # Nothing to reload; `module` is unbound past this point.
        return

    # can only reload modules of nodes at rest
    updated_node_class = module_interface.load_class(module, reload_module=True)

    affected_nodes = 0
    updated_nodes = 0
    for node in self.network.nodes:
        if node.module != module:
            continue
        affected_nodes += 1

        if node.state() != NodeState.IDLE:
            self.console.print(f"Skipping running node '{node.name}'")
            continue

        node.set_node_class(updated_node_class)
        updated_nodes += 1

    if affected_nodes == 0:
        self.console.print("No nodes were affected")
    else:
        self.console.print(f"Reload module for {updated_nodes}/{affected_nodes} nodes")
[docs]
def run():
    """Start the interactive KOI shell and block until it exits.

    A keyboard interrupt during the command loop is treated as a hard
    quit: a notice is printed and ``do_QUIT`` is invoked on the shell.
    """
    shell = KoiShell()
    try:
        shell.cmdloop()
    except KeyboardInterrupt:
        print("\nDetected keyboard interrupt, hard quitting...")
        shell.do_QUIT("")