Refactor code for API #3

Merged
mikael.capelle merged 13 commits from dev/refactor-for-ui into master 2024-12-08 13:06:42 +00:00
3 changed files with 194 additions and 75 deletions
Showing only changes of commit ae4f42517c - Show all commits

View File

@ -1,90 +1,96 @@
import sys import sys
from typing import Any from typing import Any, Iterator
import numpy as np import numpy as np
import parse # type: ignore import parse # type: ignore
from numpy.typing import NDArray from numpy.typing import NDArray
from ..base import BaseSolver
def part1(sensor_to_beacon: dict[tuple[int, int], tuple[int, int]], row: int) -> int:
no_beacons_row_l: list[NDArray[np.floating[Any]]] = []
for (sx, sy), (bx, by) in sensor_to_beacon.items():
d = abs(sx - bx) + abs(sy - by) # closest
no_beacons_row_l.append(sx - np.arange(0, d - abs(sy - row) + 1)) # type: ignore
no_beacons_row_l.append(sx + np.arange(0, d - abs(sy - row) + 1)) # type: ignore
beacons_at_row = set(bx for (bx, by) in sensor_to_beacon.values() if by == row)
no_beacons_row = set(np.concatenate(no_beacons_row_l)).difference(beacons_at_row) # type: ignore
return len(no_beacons_row)
def part2_intervals( class Solver(BaseSolver):
sensor_to_beacon: dict[tuple[int, int], tuple[int, int]], xy_max: int def part1(
) -> tuple[int, int, int]: self, sensor_to_beacon: dict[tuple[int, int], tuple[int, int]], row: int
from tqdm import trange ) -> int:
no_beacons_row_l: list[NDArray[np.floating[Any]]] = []
for (sx, sy), (bx, by) in sensor_to_beacon.items():
d = abs(sx - bx) + abs(sy - by) # closest
no_beacons_row_l.append(sx - np.arange(0, d - abs(sy - row) + 1)) # type: ignore
no_beacons_row_l.append(sx + np.arange(0, d - abs(sy - row) + 1)) # type: ignore
beacons_at_row = set(bx for (bx, by) in sensor_to_beacon.values() if by == row)
no_beacons_row = set(np.concatenate(no_beacons_row_l)).difference(
beacons_at_row
) # type: ignore
return len(no_beacons_row)
def part2_intervals(
self, sensor_to_beacon: dict[tuple[int, int], tuple[int, int]], xy_max: int
) -> tuple[int, int, int]:
for y in self.progress.wrap(range(xy_max + 1)):
its: list[tuple[int, int]] = []
for (sx, sy), (bx, by) in sensor_to_beacon.items():
d = abs(sx - bx) + abs(sy - by)
dx = d - abs(sy - y)
if dx >= 0:
its.append((max(0, sx - dx), min(sx + dx, xy_max)))
its = sorted(its)
_, e = its[0]
for si, ei in its[1:]:
if si > e + 1:
return si - 1, y, 4_000_000 * (si - 1) + y
if ei > e:
e = ei
return (0, 0, 0)
def part2_cplex(
self, sensor_to_beacon: dict[tuple[int, int], tuple[int, int]], xy_max: int
) -> tuple[int, int, int]:
from docplex.mp.model import Model
m = Model()
x, y = m.continuous_var_list(2, ub=xy_max, name=["x", "y"])
for y in trange(xy_max + 1):
its: list[tuple[int, int]] = []
for (sx, sy), (bx, by) in sensor_to_beacon.items(): for (sx, sy), (bx, by) in sensor_to_beacon.items():
d = abs(sx - bx) + abs(sy - by) d = abs(sx - bx) + abs(sy - by)
dx = d - abs(sy - y) m.add_constraint(
m.abs(x - sx) + m.abs(y - sy) >= d + 1, ctname=f"ct_{sx}_{sy}"
) # type: ignore
if dx >= 0: m.set_objective("min", x + y)
its.append((max(0, sx - dx), min(sx + dx, xy_max)))
its = sorted(its) s = m.solve()
_, e = its[0] assert s is not None
for si, ei in its[1:]: vx = int(s.get_value(x))
if si > e + 1: vy = int(s.get_value(y))
return si - 1, y, 4_000_000 * (si - 1) + y return vx, vy, 4_000_000 * vx + vy
if ei > e:
e = ei
return (0, 0, 0) def solve(self, input: str) -> Iterator[Any]:
lines = input.splitlines()
sensor_to_beacon: dict[tuple[int, int], tuple[int, int]] = {}
def part2_cplex( for line in lines:
sensor_to_beacon: dict[tuple[int, int], tuple[int, int]], xy_max: int r: dict[str, str] = parse.parse( # type: ignore
) -> tuple[int, int, int]: "Sensor at x={sx}, y={sy}: closest beacon is at x={bx}, y={by}", line
from docplex.mp.model import Model )
sensor_to_beacon[int(r["sx"]), int(r["sy"])] = (int(r["bx"]), int(r["by"]))
m = Model() xy_max = 4_000_000 if max(sensor_to_beacon) > (1_000, 0) else 20
row = 2_000_000 if max(sensor_to_beacon) > (1_000, 0) else 10
x, y = m.continuous_var_list(2, ub=xy_max, name=["x", "y"]) yield self.part1(sensor_to_beacon, row)
for (sx, sy), (bx, by) in sensor_to_beacon.items(): # x, y, a2 = part2_cplex(sensor_to_beacon, xy_max)
d = abs(sx - bx) + abs(sy - by) x, y, a2 = self.part2_intervals(sensor_to_beacon, xy_max)
m.add_constraint(m.abs(x - sx) + m.abs(y - sy) >= d + 1, ctname=f"ct_{sx}_{sy}") # type: ignore self.logger.info("answer 2 is {at} (x={x}, y={y})")
yield a2
m.set_objective("min", x + y)
s = m.solve()
assert s is not None
vx = int(s.get_value(x))
vy = int(s.get_value(y))
return vx, vy, 4_000_000 * vx + vy
lines = sys.stdin.read().splitlines()
sensor_to_beacon: dict[tuple[int, int], tuple[int, int]] = {}
for line in lines:
r: dict[str, str] = parse.parse( # type: ignore
"Sensor at x={sx}, y={sy}: closest beacon is at x={bx}, y={by}", line
)
sensor_to_beacon[int(r["sx"]), int(r["sy"])] = (int(r["bx"]), int(r["by"]))
xy_max = 4_000_000 if max(sensor_to_beacon) > (1_000, 0) else 20
row = 2_000_000 if max(sensor_to_beacon) > (1_000, 0) else 10
print(f"answer 1 is {part1(sensor_to_beacon, row)}")
# x, y, a2 = part2_cplex(sensor_to_beacon, xy_max)
x, y, a2 = part2_intervals(sensor_to_beacon, xy_max)
print(f"answer 2 is {a2} (x={x}, y={y})")

View File

@ -2,12 +2,99 @@ import argparse
import importlib import importlib
import json import json
import logging import logging
import logging.handlers
import sys import sys
from datetime import datetime from datetime import datetime, timedelta
from pathlib import Path from pathlib import Path
from typing import Any, Iterable, Iterator, Literal, Sequence, TextIO, TypeVar
from tqdm import tqdm
from .base import BaseSolver from .base import BaseSolver
_T = TypeVar("_T")
def dump_api_message(
type: Literal["log", "answer", "progress-start", "progress-step", "progress-end"],
content: Any,
file: TextIO = sys.stdout,
):
print(
json.dumps(
{"type": type, "time": datetime.now().isoformat(), "content": content}
),
file=file,
)
class LoggerAPIHandler(logging.Handler):
def __init__(self, output: TextIO = sys.stdout):
super().__init__()
self.output = output
def emit(self, record: logging.LogRecord):
dump_api_message(
"log", {"level": record.levelname, "message": record.getMessage()}
)
class ProgressAPI:
def __init__(
self,
min_step: int = 1,
min_time: timedelta = timedelta(milliseconds=100),
output: TextIO = sys.stdout,
):
super().__init__()
self.counter = 0
self.output = output
self.min_step = min_step
self.min_time = min_time
def wrap(
self, values: Sequence[_T] | Iterable[_T], total: int | None = None
) -> Iterator[_T]:
total = total or len(values) # type: ignore
current = self.counter
self.counter += 1
dump_api_message("progress-start", {"counter": current, "total": total})
try:
percent = 0
time = datetime.now()
for i_value, value in enumerate(values):
yield value
if datetime.now() - time < self.min_time:
continue
time = datetime.now()
c_percent = round(i_value / total * 100)
if c_percent >= percent + self.min_step:
dump_api_message(
"progress-step", {"counter": current, "percent": c_percent}
)
percent = c_percent
finally:
dump_api_message(
"progress-end",
{"counter": current},
)
class ProgressTQDM:
def wrap(
self, values: Sequence[_T] | Iterable[_T], total: int | None = None
) -> Iterator[_T]:
return iter(tqdm(values, total=total))
def main(): def main():
parser = argparse.ArgumentParser("Holt59 Advent-Of-Code Runner") parser = argparse.ArgumentParser("Holt59 Advent-Of-Code Runner")
@ -46,7 +133,10 @@ def main():
day: int = args.day day: int = args.day
# TODO: change this # TODO: change this
logging.basicConfig(level=logging.INFO if verbose else logging.WARNING) logging.basicConfig(
level=logging.INFO if verbose or api else logging.WARNING,
handlers=[LoggerAPIHandler()] if api else None,
)
if input_path is None: if input_path is None:
input_path = Path(__file__).parent.joinpath( input_path = Path(__file__).parent.joinpath(
@ -59,7 +149,12 @@ def main():
).Solver ).Solver
solver = solver_class( solver = solver_class(
logging.getLogger("AOC"), verbose=verbose, year=year, day=day, outputs=not api logging.getLogger("AOC"),
verbose=verbose,
year=year,
day=day,
progress=ProgressAPI() if api else ProgressTQDM(), # type: ignore
outputs=not api,
) )
data: str data: str
@ -78,6 +173,7 @@ def main():
print( print(
json.dumps( json.dumps(
{ {
"type": "answer",
"answer": i_answer + 1, "answer": i_answer + 1,
"value": answer, "value": answer,
"answerTime_s": (current - last).total_seconds(), "answerTime_s": (current - last).total_seconds(),

View File

@ -1,16 +1,33 @@
from abc import abstractmethod from abc import abstractmethod
from logging import Logger from logging import Logger
from typing import Any, Final, Iterator from typing import Any, Final, Iterable, Iterator, Protocol, Sequence, TypeVar, overload
_T = TypeVar("_T")
class ProgressHandler(Protocol):
@overload
def wrap(self, values: Sequence[_T]) -> Iterator[_T]: ...
@overload
def wrap(self, values: Iterable[_T], total: int) -> Iterator[_T]: ...
class BaseSolver: class BaseSolver:
def __init__( def __init__(
self, logger: Logger, verbose: bool, year: int, day: int, outputs: bool = False self,
logger: Logger,
verbose: bool,
year: int,
day: int,
progress: ProgressHandler,
outputs: bool = False,
): ):
self.logger: Final = logger self.logger: Final = logger
self.verbose: Final = verbose self.verbose: Final = verbose
self.year: Final = year self.year: Final = year
self.day: Final = day self.day: Final = day
self.progress: Final = progress
self.outputs = outputs self.outputs = outputs
@abstractmethod @abstractmethod