File handling for API.

This commit is contained in:
Mikael CAPELLE 2024-12-10 15:38:00 +01:00
parent 3c544c559b
commit 46558672e8
18 changed files with 288 additions and 160 deletions

View File

@ -97,7 +97,12 @@ def neighbors(
class Solver(BaseSolver):
def print_path(self, path: list[tuple[int, int]], n_rows: int, n_cols: int) -> None:
def print_path(
self, name: str, path: list[tuple[int, int]], n_rows: int, n_cols: int
) -> None:
if not self.files:
return
end = path[-1]
graph = [["." for _c in range(n_cols)] for _r in range(n_rows)]
@ -118,8 +123,11 @@ class Solver(BaseSolver):
else:
assert False, "{} -> {} infeasible".format(path[i], path[i + 1])
for row in graph:
self.logger.info("".join(row))
self.files.create(
f"graph_{name}.txt",
"\n".join("".join(row) for row in graph).encode(),
text=True,
)
def solve(self, input: str) -> Iterator[Any]:
lines = input.splitlines()
@ -157,7 +165,7 @@ class Solver(BaseSolver):
path_1 = make_path(parents_1, start, end)
assert path_1 is not None
self.print_path(path_1, n_rows=len(grid), n_cols=len(grid[0]))
self.print_path("answer1", path_1, n_rows=len(grid), n_cols=len(grid[0]))
yield lengths_1[end] - 1
lengths_2, _ = dijkstra(

View File

@ -67,13 +67,16 @@ def flow(
class Solver(BaseSolver):
def print_blocks(self, blocks: dict[tuple[int, int], Cell]):
def print_blocks(self, name: str, blocks: dict[tuple[int, int], Cell]):
"""
Print the given set of blocks on a grid.
Args:
blocks: Set of blocks to print.
"""
if not self.files:
return
x_min, y_min, x_max, y_max = (
min(x for x, _ in blocks),
0,
@ -81,12 +84,16 @@ class Solver(BaseSolver):
max(y for _, y in blocks),
)
for y in range(y_min, y_max + 1):
self.logger.info(
self.files.create(
f"blocks_{name}.txt",
"\n".join(
"".join(
str(blocks.get((x, y), Cell.AIR)) for x in range(x_min, x_max + 1)
)
)
for y in range(y_min, y_max + 1)
).encode(),
True,
)
def solve(self, input: str) -> Iterator[Any]:
lines = [line.strip() for line in input.splitlines()]
@ -115,7 +122,7 @@ class Solver(BaseSolver):
for y in range(y_start, y_end):
blocks[x, y] = Cell.ROCK
self.print_blocks(blocks)
self.print_blocks("start", blocks)
y_max = max(y for _, y in blocks)
@ -124,7 +131,7 @@ class Solver(BaseSolver):
blocks_1 = flow(
blocks.copy(), stop_fn=lambda x, y: y > y_max, fill_fn=lambda x, y: Cell.AIR
)
self.print_blocks(blocks_1)
self.print_blocks("part1", blocks_1)
yield sum(v == Cell.SAND for v in blocks_1.values())
# === part 2 ===
@ -135,5 +142,5 @@ class Solver(BaseSolver):
fill_fn=lambda x, y: Cell.AIR if y < y_max + 2 else Cell.ROCK,
)
blocks_2[500, 0] = Cell.SAND
self.print_blocks(blocks_2)
self.print_blocks("part2", blocks_2)
yield sum(v == Cell.SAND for v in blocks_2.values())

View File

@ -83,18 +83,17 @@ class Solver(BaseSolver):
if (i, j) in loop_s and lines[i][j] in "|LJ":
cnt += 1
if self.verbose:
for i in range(len(lines)):
s = ""
for j in range(len(lines[0])):
if (i, j) == (si, sj):
s += "\033[91mS\033[0m"
elif (i, j) in loop:
s += lines[i][j]
elif (i, j) in inside:
s += "\033[92mI\033[0m"
else:
s += "."
self.logger.info(s)
if self.files:
rows = [["." for _j in range(len(lines[0]))] for _i in range(len(lines))]
rows[si][sj] = "\033[91mS\033[0m"
for i, j in loop:
rows[i][j] = lines[i][j]
for i, j in inside:
rows[i][j] = "\033[92mI\033[0m"
self.files.create(
"output.txt", "\n".join("".join(row) for row in rows).encode(), True
)
yield len(inside)

View File

@ -84,9 +84,14 @@ class Solver(BaseSolver):
beams = propagate(layout, (0, 0), "R")
if self.verbose:
for row in beams:
self.logger.info("".join("#" if col else "." for col in row))
if self.files:
self.files.create(
"beams.txt",
"\n".join(
"".join("#" if col else "." for col in row) for row in beams
).encode(),
True,
)
# part 1
yield sum(sum(map(bool, row)) for row in beams)

View File

@ -33,10 +33,14 @@ MAPPINGS: dict[Direction, tuple[int, int, Direction]] = {
class Solver(BaseSolver):
def print_shortest_path(
self,
name: str,
grid: list[list[int]],
target: tuple[int, int],
per_cell: dict[tuple[int, int], list[tuple[Label, int]]],
):
if not self.files:
return
assert len(per_cell[target]) == 1
label = per_cell[target][0][0]
@ -74,8 +78,9 @@ class Solver(BaseSolver):
p_grid[0][0] = f"\033[92m{grid[0][0]}\033[0m"
for row in p_grid:
self.logger.info("".join(row))
self.files.create(
name, "\n".join("".join(row) for row in p_grid).encode(), True
)
def shortest_many_paths(self, grid: list[list[int]]) -> dict[tuple[int, int], int]:
n_rows, n_cols = len(grid), len(grid[0])
@ -129,6 +134,7 @@ class Solver(BaseSolver):
def shortest_path(
self,
name: str,
grid: list[list[int]],
min_straight: int,
max_straight: int,
@ -217,8 +223,7 @@ class Solver(BaseSolver):
),
)
if self.verbose:
self.print_shortest_path(grid, target, per_cell)
self.print_shortest_path(f"shortest-path_{name}.txt", grid, target, per_cell)
return per_cell[target][0][1]
@ -227,7 +232,7 @@ class Solver(BaseSolver):
estimates = self.shortest_many_paths(data)
# part 1
yield self.shortest_path(data, 1, 3, lower_bounds=estimates)
yield self.shortest_path("answer_1", data, 1, 3, lower_bounds=estimates)
# part 2
yield self.shortest_path(data, 4, 10, lower_bounds=estimates)
yield self.shortest_path("answer_2", data, 4, 10, lower_bounds=estimates)

View File

@ -80,22 +80,23 @@ class Solver(BaseSolver):
outputs,
)
if self.outputs:
with open("./day20.dot", "w") as fp:
fp.write("digraph G {\n")
fp.write("rx [shape=circle, color=red, style=filled];\n")
for name, (type, outputs) in self._modules.items():
if type == "conjunction":
shape = "diamond"
elif type == "flip-flop":
shape = "box"
else:
shape = "circle"
fp.write(f"{name} [shape={shape}];\n")
for name, (type, outputs) in self._modules.items():
for output in outputs:
fp.write(f"{name} -> {output};\n")
fp.write("}\n")
if self.files:
contents = "digraph G {\n"
contents += "rx [shape=circle, color=red, style=filled];\n"
for name, (type, outputs) in self._modules.items():
if type == "conjunction":
shape = "diamond"
elif type == "flip-flop":
shape = "box"
else:
shape = "circle"
contents += f"{name} [shape={shape}];\n"
for name, (type, outputs) in self._modules.items():
for output in outputs:
contents += f"{name} -> {output};\n"
contents += "}\n"
self.files.create("day20.dot", contents.encode(), False)
# part 1
flip_flop_states: dict[str, Literal["on", "off"]] = {

View File

@ -50,7 +50,7 @@ class Solver(BaseSolver):
values.append(len(tiles := reachable(map, tiles, cycle)))
values.append(len(tiles := reachable(map, tiles, cycle)))
if self.verbose:
if self.files:
n_rows, n_cols = len(map), len(map[0])
rows = [
@ -66,8 +66,9 @@ class Solver(BaseSolver):
if (i // cycle) % 2 == (j // cycle) % 2:
rows[i][j] = f"\033[91m{rows[i][j]}\033[0m"
for row in rows:
self.logger.info("".join(row))
self.files.create(
"cycle.txt", "\n".join("".join(row) for row in rows).encode(), True
)
self.logger.info(f"values to fit: {values}")

View File

@ -1,107 +1,15 @@
import argparse
import importlib
import json
import logging
import logging.handlers
import sys
from datetime import datetime, timedelta
from datetime import datetime
from pathlib import Path
from typing import Any, Iterable, Iterator, Literal, Sequence, TextIO, TypeVar
from tqdm import tqdm
from .base import BaseSolver
_T = TypeVar("_T")
def dump_api_message(
type: Literal["log", "answer", "progress-start", "progress-step", "progress-end"],
content: Any,
file: TextIO = sys.stdout,
):
print(
json.dumps(
{"type": type, "time": datetime.now().isoformat(), "content": content}
),
flush=True,
file=file,
)
class LoggerAPIHandler(logging.Handler):
def __init__(self, output: TextIO = sys.stdout):
super().__init__()
self.output = output
def emit(self, record: logging.LogRecord):
dump_api_message(
"log", {"level": record.levelname, "message": record.getMessage()}
)
class ProgressAPI:
def __init__(
self,
min_step: int = 1,
min_time: timedelta = timedelta(milliseconds=100),
output: TextIO = sys.stdout,
):
super().__init__()
self.counter = 0
self.output = output
self.min_step = min_step
self.min_time = min_time
def wrap(
self, values: Sequence[_T] | Iterable[_T], total: int | None = None
) -> Iterator[_T]:
total = total or len(values) # type: ignore
current = self.counter
self.counter += 1
dump_api_message("progress-start", {"counter": current, "total": total})
try:
percent = 0
time = datetime.now()
for i_value, value in enumerate(values):
yield value
if datetime.now() - time < self.min_time:
continue
time = datetime.now()
c_percent = round(i_value / total * 100)
if c_percent >= percent + self.min_step:
dump_api_message(
"progress-step", {"counter": current, "percent": c_percent}
)
percent = c_percent
finally:
dump_api_message(
"progress-end",
{"counter": current},
)
class ProgressTQDM:
def wrap(
self, values: Sequence[_T] | Iterable[_T], total: int | None = None
) -> Iterator[_T]:
return iter(tqdm(values, total=total))
class ProgressNone:
def wrap(
self, values: Sequence[_T] | Iterable[_T], total: int | None = None
) -> Iterator[_T]:
return iter(values)
from .utils.api import FileHandlerAPI, LoggerAPIHandler, ProgressAPI, dump_answer
from .utils.files import SimpleFileHandler
from .utils.progress import ProgressNone, ProgressTQDM
def main():
@ -109,6 +17,13 @@ def main():
parser.add_argument("-v", "--verbose", action="store_true", help="verbose mode")
parser.add_argument("-t", "--test", action="store_true", help="test mode")
parser.add_argument("-a", "--api", action="store_true", help="API mode")
parser.add_argument(
"-o",
"--output",
type=Path,
default=Path("files"),
help="output folder for created files",
)
parser.add_argument(
"-u", "--user", type=str, default="holt59", help="user input to use"
)
@ -135,12 +50,12 @@ def main():
test: bool = args.test
stdin: bool = args.stdin
user: str = args.user
files_output: Path = args.output
input_path: Path | None = args.input
year: int = args.year
day: int = args.day
# TODO: change this
logging.basicConfig(
level=logging.INFO if verbose or api else logging.WARNING,
handlers=[LoggerAPIHandler()] if api else None,
@ -166,7 +81,11 @@ def main():
else ProgressTQDM()
if verbose
else ProgressNone(), # type: ignore
outputs=not api,
files=FileHandlerAPI(files_output)
if api
else SimpleFileHandler(logging.getLogger("AOC"), files_output)
if verbose
else None,
)
data: str
@ -189,14 +108,11 @@ def main():
current = datetime.now()
if api:
dump_api_message(
"answer",
{
"answer": i_answer + 1,
"value": str(answer),
"answerTime_s": (current - last).total_seconds(),
"totalTime_s": (current - start).total_seconds(),
},
dump_answer(
part=i_answer + 1,
answer=answer,
answer_time=current - last,
total_time=current - start,
)
else:
print(

View File

@ -13,6 +13,10 @@ class ProgressHandler(Protocol):
def wrap(self, values: Iterable[_T], total: int) -> Iterator[_T]: ...
class FileHandler(Protocol):
def create(self, filename: str, content: bytes, text: bool = False): ...
class BaseSolver:
def __init__(
self,
@ -21,14 +25,14 @@ class BaseSolver:
year: int,
day: int,
progress: ProgressHandler,
outputs: bool = False,
files: FileHandler | None = None,
):
self.logger: Final = logger
self.verbose: Final = verbose
self.year: Final = year
self.day: Final = day
self.progress: Final = progress
self.outputs = outputs
self.files: Final = files
@abstractmethod
def solve(self, input: str) -> Iterator[Any] | None: ...

View File

View File

@ -0,0 +1,13 @@
from .answer import dump_answer
from .base import dump_api_message
from .files import FileHandlerAPI
from .logger import LoggerAPIHandler
from .progress import ProgressAPI
__all__ = [
"dump_answer",
"dump_api_message",
"FileHandlerAPI",
"LoggerAPIHandler",
"ProgressAPI",
]

View File

@ -0,0 +1,16 @@
from datetime import timedelta
from typing import Any
from .base import dump_api_message
def dump_answer(part: int, answer: Any, answer_time: timedelta, total_time: timedelta):
dump_api_message(
"answer",
{
"answer": part,
"value": str(answer),
"answerTime_s": answer_time.total_seconds(),
"totalTime_s": total_time.total_seconds(),
},
)

View File

@ -0,0 +1,28 @@
import json
import sys
from datetime import datetime
from typing import Any, Literal, TextIO
def _datetime_formatter(value: Any) -> Any:
if isinstance(value, datetime):
return value.isoformat()
else:
return value
def dump_api_message(
type: Literal[
"log", "answer", "file", "progress-start", "progress-step", "progress-end"
],
content: Any,
file: TextIO = sys.stdout,
):
print(
json.dumps(
{"type": type, "time": datetime.now(), "content": content},
default=_datetime_formatter,
),
flush=True,
file=file,
)

View File

@ -0,0 +1,15 @@
from pathlib import Path
from typing import Final
from .base import dump_api_message
class FileHandlerAPI:
def __init__(self, folder: Path):
self.folder: Final = folder
def create(self, filename: str, content: bytes, text: bool = False):
self.folder.mkdir(exist_ok=True)
with open(self.folder.joinpath(filename), "wb") as fp:
fp.write(content)
dump_api_message("file", {"filename": filename, "size": len(content)})

View File

@ -0,0 +1,16 @@
import logging.handlers
import sys
from typing import TextIO
from .base import dump_api_message
class LoggerAPIHandler(logging.Handler):
def __init__(self, output: TextIO = sys.stdout):
super().__init__()
self.output = output
def emit(self, record: logging.LogRecord):
dump_api_message(
"log", {"level": record.levelname, "message": record.getMessage()}
)

View File

@ -0,0 +1,57 @@
import sys
from datetime import datetime, timedelta
from typing import Iterable, Iterator, Sequence, TextIO, TypeVar
from .base import dump_api_message
_T = TypeVar("_T")
class ProgressAPI:
def __init__(
self,
min_step: int = 1,
min_time: timedelta = timedelta(milliseconds=100),
output: TextIO = sys.stdout,
):
super().__init__()
self.counter = 0
self.output = output
self.min_step = min_step
self.min_time = min_time
def wrap(
self, values: Sequence[_T] | Iterable[_T], total: int | None = None
) -> Iterator[_T]:
total = total or len(values) # type: ignore
current = self.counter
self.counter += 1
dump_api_message("progress-start", {"counter": current, "total": total})
try:
percent = 0
time = datetime.now()
for i_value, value in enumerate(values):
yield value
if datetime.now() - time < self.min_time:
continue
time = datetime.now()
c_percent = round(i_value / total * 100)
if c_percent >= percent + self.min_step:
dump_api_message(
"progress-step", {"counter": current, "percent": c_percent}
)
percent = c_percent
finally:
dump_api_message(
"progress-end",
{"counter": current},
)

View File

@ -0,0 +1,18 @@
import logging
from pathlib import Path
from typing import Final
class SimpleFileHandler:
def __init__(self, logger: logging.Logger, folder: Path):
self.logger: Final = logger
self.folder: Final = folder
def create(self, filename: str, content: bytes, text: bool = False):
if text:
for line in content.decode("utf-8").splitlines():
self.logger.info(line)
else:
self.folder.mkdir(exist_ok=True)
with open(self.folder.joinpath(filename), "wb") as fp:
fp.write(content)

View File

@ -0,0 +1,19 @@
from typing import Iterable, Iterator, Sequence, TypeVar
_T = TypeVar("_T")
class ProgressTQDM:
def wrap(
self, values: Sequence[_T] | Iterable[_T], total: int | None = None
) -> Iterator[_T]:
from tqdm import tqdm
return iter(tqdm(values, total=total))
class ProgressNone:
def wrap(
self, values: Sequence[_T] | Iterable[_T], total: int | None = None
) -> Iterator[_T]:
return iter(values)