import argparse
import importlib
import json
import logging
import logging.handlers
import sys
from datetime import datetime, timedelta
from pathlib import Path
from typing import Any, Iterable, Iterator, Literal, Sequence, TextIO, TypeVar

from tqdm import tqdm

from .base import BaseSolver

_T = TypeVar("_T")


def dump_api_message(
    type: Literal["log", "answer", "progress-start", "progress-step", "progress-end"],
    content: Any,
    file: TextIO = sys.stdout,
):
    """Write one API message as a single JSON line and flush it.

    The emitted object has three keys: ``type``, ``time`` (the current local
    time from ``datetime.now()`` in ISO format) and ``content``.

    Args:
        type: Kind of message being emitted.
        content: JSON-serializable payload of the message.
        file: Stream the line is written to.
    """
    print(
        json.dumps(
            {"type": type, "time": datetime.now().isoformat(), "content": content}
        ),
        flush=True,
        file=file,
    )


class LoggerAPIHandler(logging.Handler):
    """Logging handler that forwards records as ``"log"`` API messages."""

    def __init__(self, output: TextIO = sys.stdout):
        """Create the handler.

        Args:
            output: Stream the JSON messages are written to.
        """
        super().__init__()
        self.output = output

    def emit(self, record: logging.LogRecord):
        """Emit ``record`` as a ``"log"`` message with its level and text."""
        # Write to the configured stream rather than the default one.
        dump_api_message(
            "log",
            {"level": record.levelname, "message": record.getMessage()},
            file=self.output,
        )


class ProgressAPI:
    """Progress reporter that emits ``progress-*`` API messages."""

    def __init__(
        self,
        min_step: int = 1,
        min_time: timedelta = timedelta(milliseconds=100),
        output: TextIO = sys.stdout,
    ):
        """Create the reporter.

        Args:
            min_step: Minimum increase, in percent points, between two
                ``progress-step`` messages.
            min_time: Minimum delay between two progress checks.
            output: Stream the JSON messages are written to.
        """
        super().__init__()

        # Identifier given to the next wrapped iterable.
        self.counter = 0

        self.output = output
        self.min_step = min_step
        self.min_time = min_time

    def wrap(
        self, values: Sequence[_T] | Iterable[_T], total: int | None = None
    ) -> Iterator[_T]:
        """Yield the items of ``values`` while reporting progress.

        A ``progress-start`` message is emitted before the first item and a
        ``progress-end`` message once iteration stops (including when the
        generator is closed early or an exception propagates through it).
        ``progress-step`` messages are emitted in between, throttled by
        ``min_time`` and ``min_step``.

        Args:
            values: Items to iterate over.
            total: Expected number of items. When missing (or zero),
                ``len(values)`` is used if available; otherwise the total is
                reported as ``null`` and no ``progress-step`` is emitted.

        Yields:
            The items of ``values``, unchanged and in order.
        """
        if not total:
            try:
                total = len(values)  # type: ignore[arg-type]
            except TypeError:
                # Un-sized iterable (e.g. a generator): total is unknown.
                total = None

        current = self.counter
        self.counter += 1

        dump_api_message(
            "progress-start", {"counter": current, "total": total}, file=self.output
        )

        try:
            percent = 0
            last_check = datetime.now()

            for i_value, value in enumerate(values):
                yield value

                # Without a usable total there is no percentage to report.
                if not total:
                    continue

                if datetime.now() - last_check < self.min_time:
                    continue

                last_check = datetime.now()

                c_percent = round(i_value / total * 100)

                if c_percent >= percent + self.min_step:
                    dump_api_message(
                        "progress-step",
                        {"counter": current, "percent": c_percent},
                        file=self.output,
                    )
                    percent = c_percent
        finally:
            dump_api_message(
                "progress-end",
                {"counter": current},
                file=self.output,
            )


class ProgressTQDM:
    """Progress reporter that delegates to a ``tqdm`` progress bar."""

    def wrap(
        self, values: Sequence[_T] | Iterable[_T], total: int | None = None
    ) -> Iterator[_T]:
        """Return an iterator over ``values`` wrapped in ``tqdm``."""
        return iter(tqdm(values, total=total))


class ProgressNone:
    """Progress reporter that reports nothing."""

    def wrap(
        self, values: Sequence[_T] | Iterable[_T], total: int | None = None
    ) -> Iterator[_T]:
        """Return a plain iterator over ``values``; ``total`` is ignored."""
        return iter(values)


def main():
    """Parse the command line, run the requested solver and print its answers.

    In API mode every answer (and every log record) is emitted as a JSON
    message through ``dump_api_message``; otherwise answers are printed as
    plain text.
    """
    parser = argparse.ArgumentParser("Holt59 Advent-Of-Code Runner")
    parser.add_argument("-v", "--verbose", action="store_true", help="verbose mode")
    parser.add_argument("-t", "--test", action="store_true", help="test mode")
    parser.add_argument("-a", "--api", action="store_true", help="API mode")
    parser.add_argument(
        "-u", "--user", type=str, default="holt59", help="user input to use"
    )
    parser.add_argument(
        "--stdin",
        action="store_true",
        default=False,
        help="use stdin as input",
    )
    parser.add_argument(
        "-i",
        "--input",
        type=Path,
        default=None,
        help="input to use (override user and test)",
    )
    parser.add_argument("-y", "--year", type=int, help="year to run", default=2024)
    parser.add_argument("day", type=int, help="day to run")

    args = parser.parse_args()

    verbose: bool = args.verbose
    api: bool = args.api
    test: bool = args.test
    stdin: bool = args.stdin
    user: str = args.user
    input_path: Path | None = args.input

    year: int = args.year
    day: int = args.day

    # TODO: change this
    logging.basicConfig(
        level=logging.INFO if verbose or api else logging.WARNING,
        handlers=[LoggerAPIHandler()] if api else None,
    )

    if input_path is None:
        input_path = Path(__file__).parent.joinpath(
            "inputs", "tests" if test else user, str(year), f"day{day}.txt"
        )

    # The input file is only read when stdin is not used, so only require it
    # to exist in that case.
    if not stdin:
        assert input_path.exists(), f"{input_path} missing"

    solver_class: type[BaseSolver] = importlib.import_module(
        f".{year}.day{day}", __package__
    ).Solver

    solver = solver_class(
        logging.getLogger("AOC"),
        verbose=verbose,
        year=year,
        day=day,
        progress=ProgressAPI()
        if api
        else ProgressTQDM()
        if verbose
        else ProgressNone(),  # type: ignore
        outputs=not api,
    )

    data: str
    if stdin:
        data = sys.stdin.read()
    else:
        with open(input_path) as fp:
            data = fp.read()

    start = datetime.now()
    last = start

    it = solver.solve(data.rstrip())

    if it is None:
        solver.logger.error(f"no implementation for {year} day {day}")
        # sys.exit() instead of the site-provided exit(), which is not
        # available when the interpreter runs without the site module.
        sys.exit()

    for i_answer, answer in enumerate(it):
        current = datetime.now()

        if api:
            dump_api_message(
                "answer",
                {
                    "answer": i_answer + 1,
                    "value": answer,
                    "answerTime_s": (current - last).total_seconds(),
                    "totalTime_s": (current - start).total_seconds(),
                },
            )
        else:
            print(
                f"answer {i_answer + 1} is {answer} (found in {(current - last).total_seconds():.2f}s)"
            )

        last = current