Co-authored-by: Mikael CAPELLE <mikael.capelle@thalesaleniaspace.com> Co-authored-by: Mikaël Capelle <capelle.mikael@gmail.com> Reviewed-on: #3
135 lines
4.8 KiB
Python
135 lines
4.8 KiB
Python
import operator
|
|
from math import prod
|
|
from typing import Any, Iterator, Literal, TypeAlias, cast
|
|
|
|
from ..base import BaseSolver
|
|
|
|
Category: TypeAlias = Literal["x", "m", "a", "s"]
|
|
Part: TypeAlias = dict[Category, int]
|
|
PartWithBounds: TypeAlias = dict[Category, tuple[int, int]]
|
|
|
|
OPERATORS = {"<": operator.lt, ">": operator.gt}
|
|
|
|
# None if there is no check (last entry), otherwise (category, sense, value)
|
|
Check: TypeAlias = tuple[Category, Literal["<", ">"], int] | None
|
|
|
|
# workflow as a list of check, in specified order, with target
|
|
Workflow: TypeAlias = list[tuple[Check, str]]
|
|
|
|
|
|
class Solver(BaseSolver):
|
|
def accept(self, workflows: dict[str, Workflow], part: Part) -> bool:
|
|
workflow = "in"
|
|
decision: bool | None = None
|
|
|
|
while decision is None:
|
|
for check, target in workflows[workflow]:
|
|
passed = check is None
|
|
if check is not None:
|
|
category, sense, value = check
|
|
passed = OPERATORS[sense](part[category], value)
|
|
|
|
if passed:
|
|
if target in workflows:
|
|
workflow = target
|
|
else:
|
|
decision = target == "A"
|
|
break
|
|
|
|
return decision
|
|
|
|
def propagate(self, workflows: dict[str, Workflow], start: PartWithBounds) -> int:
|
|
def _fmt(meta: PartWithBounds) -> str:
|
|
return "{" + ", ".join(f"{k}={v}" for k, v in meta.items()) + "}"
|
|
|
|
def transfer_or_accept(
|
|
target: str, meta: PartWithBounds, queue: list[tuple[PartWithBounds, str]]
|
|
) -> int:
|
|
count = 0
|
|
if target in workflows:
|
|
self.logger.info(f" transfer to {target}")
|
|
queue.append((meta, target))
|
|
elif target == "A":
|
|
count = prod((high - low + 1) for low, high in meta.values())
|
|
self.logger.info(f" accepted ({count})")
|
|
else:
|
|
self.logger.info(" rejected")
|
|
return count
|
|
|
|
accepted = 0
|
|
queue: list[tuple[PartWithBounds, str]] = [(start, "in")]
|
|
|
|
n_iterations = 0
|
|
|
|
while queue:
|
|
n_iterations += 1
|
|
meta, workflow = queue.pop()
|
|
self.logger.info(f"{workflow}: {_fmt(meta)}")
|
|
for check, target in workflows[workflow]:
|
|
if check is None:
|
|
self.logger.info(" end-of-workflow")
|
|
accepted += transfer_or_accept(target, meta, queue)
|
|
continue
|
|
|
|
category, sense, value = check
|
|
bounds, op = meta[category], OPERATORS[sense]
|
|
|
|
self.logger.info(
|
|
f" checking {_fmt(meta)} against {category} {sense} {value}"
|
|
)
|
|
|
|
if not op(bounds[0], value) and not op(bounds[1], value):
|
|
self.logger.info(" reject, always false")
|
|
continue
|
|
|
|
if op(meta[category][0], value) and op(meta[category][1], value):
|
|
self.logger.info(" accept, always true")
|
|
accepted += transfer_or_accept(target, meta, queue)
|
|
break
|
|
|
|
meta2 = meta.copy()
|
|
low, high = meta[category]
|
|
if sense == "<":
|
|
meta[category], meta2[category] = (value, high), (low, value - 1)
|
|
else:
|
|
meta[category], meta2[category] = (low, value), (value + 1, high)
|
|
self.logger.info(f" split {_fmt(meta2)} ({target}), {_fmt(meta)}")
|
|
|
|
accepted += transfer_or_accept(target, meta2, queue)
|
|
|
|
self.logger.info(f"run took {n_iterations} iterations")
|
|
return accepted
|
|
|
|
def solve(self, input: str) -> Iterator[Any]:
|
|
workflows_s, parts_s = input.split("\n\n")
|
|
|
|
workflows: dict[str, Workflow] = {}
|
|
for workflow_s in workflows_s.split("\n"):
|
|
name, block_s = workflow_s.split("{")
|
|
workflows[name] = []
|
|
|
|
for block in block_s[:-1].split(","):
|
|
check: Check
|
|
if (i := block.find(":")) >= 0:
|
|
check = (
|
|
cast(Category, block[0]),
|
|
cast(Literal["<", ">"], block[1]),
|
|
int(block[2:i]),
|
|
)
|
|
target = block[i + 1 :]
|
|
else:
|
|
check, target = None, block
|
|
workflows[name].append((check, target))
|
|
|
|
# part 1
|
|
parts: list[Part] = [
|
|
{cast(Category, s[0]): int(s[2:]) for s in part_s[1:-1].split(",")}
|
|
for part_s in parts_s.split("\n")
|
|
]
|
|
yield sum(sum(part.values()) for part in parts if self.accept(workflows, part))
|
|
|
|
# part 2
|
|
yield self.propagate(
|
|
workflows, {cast(Category, c): (1, 4000) for c in ["x", "m", "a", "s"]}
|
|
)
|