diff --git a/2022/day16.py b/2022/day16.py index 845ae3f..1cc0329 100644 --- a/2022/day16.py +++ b/2022/day16.py @@ -7,10 +7,9 @@ import itertools import re import sys from collections import defaultdict -from typing import NamedTuple +from typing import FrozenSet, NamedTuple -from docplex.mp.model import Model -from docplex.mp.vartype import BinaryVarType +from tqdm import tqdm class Pipe(NamedTuple): @@ -55,6 +54,75 @@ def breadth_first_search(pipes: dict[str, Pipe], pipe_1: Pipe, pipe_2: Pipe) -> return -1 +def update_with_better( + node_at_times: dict[FrozenSet[Pipe], int], flow: int, flowing: FrozenSet[Pipe] +) -> None: + node_at_times[flowing] = max(node_at_times[flowing], flow) + + +def part_1( + start_pipe: Pipe, + max_time: int, + distances: dict[tuple[Pipe, Pipe], int], + relevant_pipes: FrozenSet[Pipe], +): + + node_at_times: dict[int, dict[Pipe, dict[FrozenSet[Pipe], int]]] = defaultdict( + lambda: defaultdict(lambda: defaultdict(lambda: 0)) + ) + node_at_times[0] = {start_pipe: {frozenset(): 0}} + + for time in range(max_time): + for c_pipe, nodes in node_at_times[time].items(): + for flowing, flow in nodes.items(): + for target in relevant_pipes: + + distance = distances[c_pipe, target] + 1 + if time + distance >= max_time or target in flowing: + continue + + update_with_better( + node_at_times[time + distance][target], + flow + sum(pipe.flow for pipe in flowing) * distance, + flowing | {target}, + ) + + update_with_better( + node_at_times[max_time][c_pipe], + flow + sum(pipe.flow for pipe in flowing) * (max_time - time), + flowing, + ) + + return max( + flow + for nodes_of_pipe in node_at_times[max_time].values() + for flow in nodes_of_pipe.values() + ) + + +def part_2( + start_pipe: Pipe, + max_time: int, + distances: dict[tuple[Pipe, Pipe], int], + relevant_pipes: FrozenSet[Pipe], +): + def compute(pipes_for_me: FrozenSet[Pipe]) -> int: + return part_1(start_pipe, max_time, distances, pipes_for_me) + part_1( + start_pipe, max_time, distances, relevant_pipes - pipes_for_me + ) + + combs = [ + frozenset(relevant_pipes_1) + for r in range(2, len(relevant_pipes) // 2 + 1) + for relevant_pipes_1 in itertools.combinations(relevant_pipes, r) + ] + + return max(compute(comb) for comb in tqdm(combs)) + + +# === MAIN === + + lines = sys.stdin.read().splitlines() @@ -77,171 +145,11 @@ for pipe_1 in pipes.values(): distances[pipe_1, pipe_2] = breadth_first_search(pipes, pipe_1, pipe_2) # valves with flow -relevant_pipes = [pipe for pipe in pipes.values() if pipe.flow > 0] +relevant_pipes = frozenset(pipe for pipe in pipes.values() if pipe.flow > 0) -# nodes: list[tuple[Pipe, int, int, list[Pipe]]] = [(start_pipe, 0, 0, [])] -# best_flow: int = 0 +# 1651, 1653 +print(part_1(pipes["AA"], 30, distances, relevant_pipes)) -# while nodes: -# current, time, flow, flowing = nodes.pop(0) - -# if time == max_time: -# if flow > best_flow: -# best_flow = flow -# continue - -# next_nodes: list[tuple[Pipe, int, int, list[Pipe]]] = [] -# for target in relevant_pipes: - -# if target is current or target in flowing: -# continue - -# distance = distances[current, target] + 1 - -# if time + distance >= max_time: -# continue - -# next_nodes.append( -# ( -# target, -# time + distance, -# flow + distance * sum(pipe.flow for pipe in flowing) + target.flow, -# flowing + [target], -# ) -# ) - -# # print(time, current, flow, next_nodes) - -# if not next_nodes: -# next_nodes.append( -# ( -# current, -# max_time, -# flow + sum(pipe.flow for pipe in flowing) * (max_time - time - 1), -# flowing, -# ) -# ) - -# nodes.extend(next_nodes) - -# # if time >= 4: -# # break - -# print(best_flow) - - -# nodes = [best] -# while nodes[-1].parent is not None: -# nodes.append(nodes[-1].parent) -# nodes = list(reversed(nodes)) - -# for node in nodes: -# print(node.time, node.valve, node.flow, node.flowing) - -# -start_pipe = pipes["AA"] -max_time = 30 -ee = [0] - -# max_time = 26 -# ee = [0, 1] - -m = Model() - -var_out: dict[Pipe, dict[Pipe, BinaryVarType]] = { - pipe: m.binary_var_dict(relevant_pipes) for pipe in relevant_pipes + [start_pipe] -} -var_in: dict[Pipe, dict[Pipe, BinaryVarType]] = {pipe: {} for pipe in relevant_pipes} -for p1 in var_out: - for p2 in var_out[p1]: - var_in[p2][p1] = var_out[p1][p2] - -open_at: dict[tuple[int, Pipe], BinaryVarType] = m.continuous_var_dict( - ( - (t, pipe) - for t, pipe in itertools.product(range(max_time), [start_pipe] + relevant_pipes) - ), - lb=0, - ub=1, -) - -for time, pipe in itertools.product(range(max_time), relevant_pipes): - m.add_constraint(open_at[time, pipe] <= m.sum()) - - -for e in ee: - m.add_constraint(open_at[e, 0, start_pipe] == 1) - -for e, pipe in itertools.product(ee, relevant_pipes): - m.add_constraint(open_at[e, 0, pipe] == 0) - -for e, t, p1 in itertools.product(ee, range(max_time), relevant_pipes): - from_time_and_pipe = [ - (p2, t - distances[p2, p1] - 1) - for p2 in relevant_pipes + [start_pipe] - if t - distances[p2, p1] - 1 >= 0 and p2 is not p1 - ] - - if from_time_and_pipe: - m.add_constraint( - open_at[e, t, p1] - <= m.sum(open_at[e, t2, p2] for p2, t2 in from_time_and_pipe) - ) - else: - m.add_constraint(open_at[e, t, p1] == 0) - -for pipe in relevant_pipes + [start_pipe]: - m.add_constraint( - m.sum(open_at[e, t, pipe] for e, t in itertools.product(ee, range(max_time))) - <= 1 - ) -for e, t in itertools.product(ee, range(max_time)): - m.add_constraint( - m.sum(open_at[e, t, pipe] for pipe in relevant_pipes + [start_pipe]) <= 1 - ) - -# keeps flowing -flowing_at = { - (t, pipe): m.sum( - open_at[e, t2, pipe] for e, t2 in itertools.product(ee, range(0, t)) - ) - for t, pipe in itertools.product(range(max_time), relevant_pipes) -} - - -# objective -m.set_objective( - "max", - m.sum( - flowing_at[t, pipe] * pipe.flow - for t, pipe in itertools.product(range(max_time), relevant_pipes) - ), -) - -m.log_output = True -s = m.solve() - -print(s.get_objective_value()) - - -for t in range(max_time): - opent = { - e: [ - pipe - for pipe in relevant_pipes + [start_pipe] - if s.get_value(open_at[e, t, pipe]) > 1e-8 - ] - for e in ee - } - flowing = [ - pipe - for pipe in relevant_pipes - if any(s.get_value(flowing_at[t, pipe]) > 1e-8 for e in ee) - ] - - assert all(len(opent[e]) <= 1 for e in ee) - - o = [opent[e][0] if opent[e] else "-" for e in ee] - - print(f"t={t}, open={o}, flowing={flowing}") +# 1707, 2223 +print(part_2(pipes["AA"], 26, distances, relevant_pipes))