diff --git a/src/holt59/aoc/2022/day15.py b/src/holt59/aoc/2022/day15.py index ba537aa..fb13272 100644 --- a/src/holt59/aoc/2022/day15.py +++ b/src/holt59/aoc/2022/day15.py @@ -1,90 +1,96 @@ import sys -from typing import Any +from typing import Any, Iterator import numpy as np import parse # type: ignore from numpy.typing import NDArray - -def part1(sensor_to_beacon: dict[tuple[int, int], tuple[int, int]], row: int) -> int: - no_beacons_row_l: list[NDArray[np.floating[Any]]] = [] - - for (sx, sy), (bx, by) in sensor_to_beacon.items(): - d = abs(sx - bx) + abs(sy - by) # closest - - no_beacons_row_l.append(sx - np.arange(0, d - abs(sy - row) + 1)) # type: ignore - no_beacons_row_l.append(sx + np.arange(0, d - abs(sy - row) + 1)) # type: ignore - - beacons_at_row = set(bx for (bx, by) in sensor_to_beacon.values() if by == row) - no_beacons_row = set(np.concatenate(no_beacons_row_l)).difference(beacons_at_row) # type: ignore - - return len(no_beacons_row) +from ..base import BaseSolver -def part2_intervals( - sensor_to_beacon: dict[tuple[int, int], tuple[int, int]], xy_max: int -) -> tuple[int, int, int]: - from tqdm import trange +class Solver(BaseSolver): + def part1( + self, sensor_to_beacon: dict[tuple[int, int], tuple[int, int]], row: int + ) -> int: + no_beacons_row_l: list[NDArray[np.floating[Any]]] = [] + + for (sx, sy), (bx, by) in sensor_to_beacon.items(): + d = abs(sx - bx) + abs(sy - by) # closest + + no_beacons_row_l.append(sx - np.arange(0, d - abs(sy - row) + 1)) # type: ignore + no_beacons_row_l.append(sx + np.arange(0, d - abs(sy - row) + 1)) # type: ignore + + beacons_at_row = set(bx for (bx, by) in sensor_to_beacon.values() if by == row) + no_beacons_row = set(np.concatenate(no_beacons_row_l)).difference( + beacons_at_row + ) # type: ignore + + return len(no_beacons_row) + + def part2_intervals( + self, sensor_to_beacon: dict[tuple[int, int], tuple[int, int]], xy_max: int + ) -> tuple[int, int, int]: + for y in self.progress.wrap(range(xy_max + 1)): + its: list[tuple[int, int]] = [] + for (sx, sy), (bx, by) in sensor_to_beacon.items(): + d = abs(sx - bx) + abs(sy - by) + dx = d - abs(sy - y) + + if dx >= 0: + its.append((max(0, sx - dx), min(sx + dx, xy_max))) + + its = sorted(its) + _, e = its[0] + + for si, ei in its[1:]: + if si > e + 1: + return si - 1, y, 4_000_000 * (si - 1) + y + if ei > e: + e = ei + + return (0, 0, 0) + + def part2_cplex( + self, sensor_to_beacon: dict[tuple[int, int], tuple[int, int]], xy_max: int + ) -> tuple[int, int, int]: + from docplex.mp.model import Model + + m = Model() + + x, y = m.continuous_var_list(2, ub=xy_max, name=["x", "y"]) - for y in trange(xy_max + 1): - its: list[tuple[int, int]] = [] for (sx, sy), (bx, by) in sensor_to_beacon.items(): d = abs(sx - bx) + abs(sy - by) - dx = d - abs(sy - y) + m.add_constraint( + m.abs(x - sx) + m.abs(y - sy) >= d + 1, ctname=f"ct_{sx}_{sy}" + ) # type: ignore - if dx >= 0: - its.append((max(0, sx - dx), min(sx + dx, xy_max))) + m.set_objective("min", x + y) - its = sorted(its) - _, e = its[0] + s = m.solve() + assert s is not None - for si, ei in its[1:]: - if si > e + 1: - return si - 1, y, 4_000_000 * (si - 1) + y - if ei > e: - e = ei + vx = int(s.get_value(x)) + vy = int(s.get_value(y)) + return vx, vy, 4_000_000 * vx + vy - return (0, 0, 0) + def solve(self, input: str) -> Iterator[Any]: + lines = input.splitlines() + sensor_to_beacon: dict[tuple[int, int], tuple[int, int]] = {} -def part2_cplex( - sensor_to_beacon: dict[tuple[int, int], tuple[int, int]], xy_max: int -) -> tuple[int, int, int]: - from docplex.mp.model import Model + for line in lines: + r: dict[str, str] = parse.parse( # type: ignore + "Sensor at x={sx}, y={sy}: closest beacon is at x={bx}, y={by}", line + ) + sensor_to_beacon[int(r["sx"]), int(r["sy"])] = (int(r["bx"]), int(r["by"])) - m = Model() + xy_max = 4_000_000 if max(sensor_to_beacon) > (1_000, 0) else 20 + row = 2_000_000 if max(sensor_to_beacon) > (1_000, 0) else 10 - x, y = m.continuous_var_list(2, ub=xy_max, name=["x", "y"]) + yield self.part1(sensor_to_beacon, row) - for (sx, sy), (bx, by) in sensor_to_beacon.items(): - d = abs(sx - bx) + abs(sy - by) - m.add_constraint(m.abs(x - sx) + m.abs(y - sy) >= d + 1, ctname=f"ct_{sx}_{sy}") # type: ignore - - m.set_objective("min", x + y) - - s = m.solve() - assert s is not None - - vx = int(s.get_value(x)) - vy = int(s.get_value(y)) - return vx, vy, 4_000_000 * vx + vy - - -lines = sys.stdin.read().splitlines() - -sensor_to_beacon: dict[tuple[int, int], tuple[int, int]] = {} - -for line in lines: - r: dict[str, str] = parse.parse( # type: ignore - "Sensor at x={sx}, y={sy}: closest beacon is at x={bx}, y={by}", line - ) - sensor_to_beacon[int(r["sx"]), int(r["sy"])] = (int(r["bx"]), int(r["by"])) - -xy_max = 4_000_000 if max(sensor_to_beacon) > (1_000, 0) else 20 -row = 2_000_000 if max(sensor_to_beacon) > (1_000, 0) else 10 - -print(f"answer 1 is {part1(sensor_to_beacon, row)}") - -# x, y, a2 = part2_cplex(sensor_to_beacon, xy_max) -x, y, a2 = part2_intervals(sensor_to_beacon, xy_max) -print(f"answer 2 is {a2} (x={x}, y={y})") + # x, y, a2 = part2_cplex(sensor_to_beacon, xy_max) + x, y, a2 = self.part2_intervals(sensor_to_beacon, xy_max) + self.logger.info("answer 2 is {at} (x={x}, y={y})") + yield a2 diff --git a/src/holt59/aoc/__main__.py b/src/holt59/aoc/__main__.py index 7fba62d..90509b2 100644 --- a/src/holt59/aoc/__main__.py +++ b/src/holt59/aoc/__main__.py @@ -2,12 +2,99 @@ import argparse import importlib import json import logging +import logging.handlers import sys -from datetime import datetime +from datetime import datetime, timedelta from pathlib import Path +from typing import Any, Iterable, Iterator, Literal, Sequence, TextIO, TypeVar + +from tqdm import tqdm from .base import BaseSolver +_T = TypeVar("_T") + + +def dump_api_message( + type: Literal["log", "answer", "progress-start", "progress-step", "progress-end"], + content: Any, + file: TextIO = sys.stdout, +): + print( + json.dumps( + {"type": type, "time": datetime.now().isoformat(), "content": content} + ), + file=file, + ) + + +class LoggerAPIHandler(logging.Handler): + def __init__(self, output: TextIO = sys.stdout): + super().__init__() + self.output = output + + def emit(self, record: logging.LogRecord): + dump_api_message( + "log", {"level": record.levelname, "message": record.getMessage()} + ) + + +class ProgressAPI: + def __init__( + self, + min_step: int = 1, + min_time: timedelta = timedelta(milliseconds=100), + output: TextIO = sys.stdout, + ): + super().__init__() + + self.counter = 0 + self.output = output + self.min_step = min_step + self.min_time = min_time + + def wrap( + self, values: Sequence[_T] | Iterable[_T], total: int | None = None + ) -> Iterator[_T]: + total = total or len(values) # type: ignore + + current = self.counter + self.counter += 1 + + dump_api_message("progress-start", {"counter": current, "total": total}) + + try: + percent = 0 + time = datetime.now() + + for i_value, value in enumerate(values): + yield value + + if datetime.now() - time < self.min_time: + continue + + time = datetime.now() + + c_percent = round(i_value / total * 100) + + if c_percent >= percent + self.min_step: + dump_api_message( + "progress-step", {"counter": current, "percent": c_percent} + ) + percent = c_percent + finally: + dump_api_message( + "progress-end", + {"counter": current}, + ) + + +class ProgressTQDM: + def wrap( + self, values: Sequence[_T] | Iterable[_T], total: int | None = None + ) -> Iterator[_T]: + return iter(tqdm(values, total=total)) + def main(): parser = argparse.ArgumentParser("Holt59 Advent-Of-Code Runner") @@ -46,7 +133,10 @@ def main(): day: int = args.day # TODO: change this - logging.basicConfig(level=logging.INFO if verbose else logging.WARNING) + logging.basicConfig( + level=logging.INFO if verbose or api else logging.WARNING, + handlers=[LoggerAPIHandler()] if api else None, + ) if input_path is None: input_path = Path(__file__).parent.joinpath( @@ -59,7 +149,12 @@ def main(): ).Solver solver = solver_class( - logging.getLogger("AOC"), verbose=verbose, year=year, day=day, outputs=not api + logging.getLogger("AOC"), + verbose=verbose, + year=year, + day=day, + progress=ProgressAPI() if api else ProgressTQDM(), # type: ignore + outputs=not api, ) data: str @@ -78,6 +173,7 @@ def main(): print( json.dumps( { + "type": "answer", "answer": i_answer + 1, "value": answer, "answerTime_s": (current - last).total_seconds(), diff --git a/src/holt59/aoc/base.py b/src/holt59/aoc/base.py index 45893d2..97cb666 100644 --- a/src/holt59/aoc/base.py +++ b/src/holt59/aoc/base.py @@ -1,16 +1,33 @@ from abc import abstractmethod from logging import Logger -from typing import Any, Final, Iterator +from typing import Any, Final, Iterable, Iterator, Protocol, Sequence, TypeVar, overload + +_T = TypeVar("_T") + + +class ProgressHandler(Protocol): + @overload + def wrap(self, values: Sequence[_T]) -> Iterator[_T]: ... + + @overload + def wrap(self, values: Iterable[_T], total: int) -> Iterator[_T]: ... class BaseSolver: def __init__( - self, logger: Logger, verbose: bool, year: int, day: int, outputs: bool = False + self, + logger: Logger, + verbose: bool, + year: int, + day: int, + progress: ProgressHandler, + outputs: bool = False, ): self.logger: Final = logger self.verbose: Final = verbose self.year: Final = year self.day: Final = day + self.progress: Final = progress self.outputs = outputs @abstractmethod