Source code for koi_net.infra.log_system
import os
import sys
import logging
from logging.handlers import RotatingFileHandler
from pathlib import Path
from datetime import datetime
from typing import Callable
import structlog
import colorama
shared_log_processors: list[Callable] = [
structlog.stdlib.add_logger_name,
structlog.stdlib.add_log_level,
structlog.stdlib.PositionalArgumentsFormatter(),
structlog.processors.TimeStamper(fmt="iso"),
structlog.processors.UnicodeDecoder(),
structlog.processors.CallsiteParameterAdder({
structlog.processors.CallsiteParameter.MODULE,
structlog.processors.CallsiteParameter.FUNC_NAME
}),
structlog.contextvars.merge_contextvars
]
[docs]
class PartitionedFileHandler(logging.Handler):
def __init__(
self,
log_file_name: str = "log.ndjson",
max_log_file_size: int = 10 * 1024 ** 2,
num_log_file_backups: int = 5,
log_file_encoding: str = "utf-8"
):
self.handlers: dict[str, RotatingFileHandler] = {}
self.log_file_name = log_file_name
self.max_log_file_size = max_log_file_size
self.max_log_file_backups = num_log_file_backups
self.log_file_encoding = log_file_encoding
self.processor_formatter = structlog.stdlib.ProcessorFormatter(
processor=structlog.processors.JSONRenderer(),
foreign_pre_chain=shared_log_processors
)
self.dropped_log_handler = RotatingFileHandler(
filename="dropped_logs.txt",
maxBytes=self.max_log_file_size,
backupCount=self.max_log_file_backups,
encoding=self.log_file_encoding,
delay=True
)
super().__init__()
[docs]
def del_handler(self, log_dir: str, wipe_logs: bool = False):
if log_dir in self.handlers:
self.handlers[log_dir].close()
if wipe_logs:
try:
os.remove(self.handlers[log_dir].baseFilename)
except OSError:
pass
del self.handlers[log_dir]
[docs]
def get_handler(self, log_dir: str):
if log_dir not in self.handlers:
file_handler = RotatingFileHandler(
filename=Path(log_dir) / Path(self.log_file_name),
maxBytes=self.max_log_file_size,
backupCount=self.max_log_file_backups,
encoding=self.log_file_encoding,
delay=True
)
file_handler.setFormatter(self.processor_formatter)
file_handler.setLevel(logging.DEBUG)
self.handlers[log_dir] = file_handler
return self.handlers[log_dir]
[docs]
def emit(self, record: logging.LogRecord):
if record.log_dir is not None:
log_dir = record.log_dir
elif type(record.msg) is dict and "log_dir" in record.msg:
log_dir = record.msg["log_dir"]
else:
self.dropped_log_handler.emit(record)
return
self.get_handler(str(log_dir)).emit(record)
[docs]
class LogSystem:
"""Configures and initializes the logging system."""
use_file_handler: bool
use_console_handler: bool
file_handler_log_level: int
console_handler_log_level: int
_instance = None
def __new__(
cls,
use_file_handler: bool = True,
use_console_handler: bool = True,
file_handler_log_level: int = logging.DEBUG,
console_handler_log_level: int = logging.DEBUG
):
"""Only instantiable once, other calls will return the first object."""
if not cls._instance:
obj = super().__new__(cls)
obj.use_file_handler = use_file_handler
obj.use_console_handler = use_console_handler
obj.file_handler_log_level = file_handler_log_level
obj.console_handler_log_level = console_handler_log_level
obj.configure()
cls._instance = obj
return cls._instance
[docs]
@staticmethod
def delete_file_handler(log_dir: str, wipe_logs: bool = False):
for handler in logging.getLogger().handlers:
if isinstance(handler, PartitionedFileHandler):
handler.del_handler(log_dir, wipe_logs=wipe_logs)
[docs]
def configure(self):
handlers = []
if self.use_file_handler:
handlers.append(PartitionedFileHandler())
if self.use_console_handler:
handlers.append(self.configure_console_handler())
logging.basicConfig(level=logging.DEBUG, handlers=handlers)
old_factory = logging.getLogRecordFactory()
def record_factory(*args, **kwargs):
record = old_factory(*args, *kwargs)
ctx = structlog.contextvars.get_contextvars()
record.log_dir = ctx.get("log_dir")
return record
logging.setLogRecordFactory(record_factory)
structlog.configure(
processors=shared_log_processors + [
structlog.stdlib.ProcessorFormatter.wrap_for_formatter],
wrapper_class=structlog.stdlib.BoundLogger,
logger_factory=structlog.stdlib.LoggerFactory(),
cache_logger_on_first_use=True,
)
[docs]
def configure_console_handler(self):
console_renderer = structlog.dev.ConsoleRenderer(
columns=[
# Render the timestamp without the key name in yellow.
structlog.dev.Column(
"timestamp",
structlog.dev.KeyValueColumnFormatter(
key_style=None,
value_style=colorama.Style.DIM,
reset_style=colorama.Style.RESET_ALL,
value_repr=lambda t: datetime.fromisoformat(t).strftime("%Y-%m-%d %H:%M:%S"),
),
),
structlog.dev.Column(
"level",
structlog.dev.LogLevelColumnFormatter(
level_styles={
level: colorama.Style.BRIGHT + color
for level, color in {
"critical": colorama.Fore.RED,
"exception": colorama.Fore.RED,
"error": colorama.Fore.RED,
"warn": colorama.Fore.YELLOW,
"warning": colorama.Fore.YELLOW,
"info": colorama.Fore.GREEN,
"debug": colorama.Fore.GREEN,
"notset": colorama.Back.RED,
}.items()
},
reset_style=colorama.Style.RESET_ALL,
width=9
)
),
# Render the event without the key name in bright magenta.
# Default formatter for all keys not explicitly mentioned. The key is
# cyan, the value is green.
structlog.dev.Column(
"path",
structlog.dev.KeyValueColumnFormatter(
key_style=None,
value_style=colorama.Fore.MAGENTA,
reset_style=colorama.Style.RESET_ALL,
value_repr=str,
width=30
),
),
structlog.dev.Column(
"event",
structlog.dev.KeyValueColumnFormatter(
key_style=None,
value_style=colorama.Fore.WHITE,
reset_style=colorama.Style.RESET_ALL,
value_repr=str,
width=30
),
),
structlog.dev.Column(
"",
structlog.dev.KeyValueColumnFormatter(
key_style=colorama.Fore.BLUE,
value_style=colorama.Fore.GREEN,
reset_style=colorama.Style.RESET_ALL,
value_repr=str,
),
)
]
)
console_handler = logging.StreamHandler(sys.stdout)
console_handler.setFormatter(
structlog.stdlib.ProcessorFormatter(
processor=console_renderer,
foreign_pre_chain=shared_log_processors
)
)
console_handler.setLevel(self.console_handler_log_level)
return console_handler