Phase VII — Game Theory for Optimization & Robotics | Week 16 | 2.5 hours Synthesize everything: auctions, congestion, robustness, and learning dynamics in a full warehouse fleet simulation.
This capstone IS the OKS fleet problem. Robots compete for aisle capacity while collaboratively completing pick-and-deliver orders. The simulation integrates: auction-based task allocation (Day 110), congestion-aware routing (Day 109), robust replanning when corridors are blocked (Day 111), and decentralized learning dynamics (Days 107-108). The Price of Anarchy directly measures the cost of decentralization in the OKS warehouse.
Warehouse model: A graph $G = (V, E)$ representing the warehouse floor. Nodes are intersections/stations, edges are corridors with capacity and delay functions.
Components integrated:
| Component | Game-theoretic concept | Day reference |
|---|---|---|
| Task allocation | Combinatorial auction + VCG | Day 110 |
| Corridor routing | Congestion game | Day 109 |
| Edge failures | Robust optimization (game vs nature) | Day 111 |
| Online adaptation | No-regret learning (MWU) | Day 108 |
| Equilibrium analysis | Nash equilibrium + PoA | Days 103, 106 |
Each task $j$ has a value $v_j$, a pickup location $p_j$, and a delivery location $d_j$. Robot $i$ bids $b_i(j) = v_j - c_i(p_j, d_j)$ where $c_i$ is robot $i$'s estimated travel cost. Allocation maximizes welfare via greedy auction.
After task allocation, each robot chooses a path. The corridor delay on edge $e$ is:
$$d_e(n_e) = \tau_e (1 + \alpha (n_e / c_e)^\beta)$$
where $\tau_e$ is free-flow travel time, $c_e$ is capacity, and $\alpha, \beta$ are BPR parameters. This is a congestion game — the Rosenthal potential guarantees a pure NE exists.
When an edge fails (blocked corridor), robots must reroute. The robust formulation:
$$\min_{\text{paths}} \max_{\text{edge failure} \in \mathcal{F}} \text{total delay}$$
where $\mathcal{F}$ is the set of possible single-edge failures. This is a minimax game against nature.
"""Capstone: Multi-Robot Delivery Game — full warehouse fleet simulation."""
import numpy as np
from collections import defaultdict
from typing import Optional
import heapq
class WarehouseGraph:
"""Warehouse floor as a weighted graph with congestion."""
def __init__(self, n_rows: int = 4, n_cols: int = 5):
self.n_rows = n_rows
self.n_cols = n_cols
self.nodes = [(r, c) for r in range(n_rows) for c in range(n_cols)]
self.edges = {}
self.failed_edges = set()
# Create grid edges with free-flow times
for r in range(n_rows):
for c in range(n_cols):
for dr, dc in [(0, 1), (1, 0)]:
nr, nc = r + dr, c + dc
if 0 <= nr < n_rows and 0 <= nc < n_cols:
e = ((r, c), (nr, nc))
self.edges[e] = {'tau': 1.0, 'capacity': 2, 'alpha': 0.5, 'beta': 2}
self.edges[(e[1], e[0])] = self.edges[e] # bidirectional
def delay(self, edge: tuple, congestion: int) -> float:
"""BPR delay function."""
if edge in self.failed_edges:
return float('inf')
e = self.edges.get(edge)
if e is None:
return float('inf')
return e['tau'] * (1 + e['alpha'] * (congestion / e['capacity']) ** e['beta'])
def shortest_path(self, src, dst, edge_congestion: dict) -> tuple:
"""Dijkstra with congestion-aware delays."""
dist = {n: float('inf') for n in self.nodes}
dist[src] = 0
prev = {}
pq = [(0, src)]
while pq:
d, u = heapq.heappop(pq)
if d > dist[u]:
continue
if u == dst:
break
for dr, dc in [(0, 1), (0, -1), (1, 0), (-1, 0)]:
v = (u[0] + dr, u[1] + dc)
if v not in dist:
continue
edge = (u, v)
cong = edge_congestion.get(edge, 0)
w = self.delay(edge, cong + 1)
if dist[u] + w < dist[v]:
dist[v] = dist[u] + w
prev[v] = u
heapq.heappush(pq, (dist[v], v))
# Reconstruct path
if dist[dst] == float('inf'):
return [], float('inf')
path = []
node = dst
while node != src:
path.append(node)
node = prev[node]
path.append(src)
path.reverse()
return path, dist[dst]
class MultiRobotDeliveryGame:
"""Full simulation: auction + congestion routing + robust replanning."""
def __init__(self, n_robots: int = 6, n_tasks: int = 8, seed: int = 42):
self.rng = np.random.default_rng(seed)
self.n_robots = n_robots
self.n_tasks = n_tasks
self.graph = WarehouseGraph()
self.nodes = self.graph.nodes
# Robot positions
self.robot_positions = [self.nodes[self.rng.integers(len(self.nodes))]
for _ in range(n_robots)]
# Tasks: (pickup, delivery, value)
self.tasks = []
for _ in range(n_tasks):
p = self.nodes[self.rng.integers(len(self.nodes))]
d = self.nodes[self.rng.integers(len(self.nodes))]
while d == p:
d = self.nodes[self.rng.integers(len(self.nodes))]
v = self.rng.uniform(5, 20)
self.tasks.append({'pickup': p, 'delivery': d, 'value': v})
def auction_allocate(self) -> dict:
"""Greedy auction-based task allocation."""
allocation = {i: [] for i in range(self.n_robots)}
assigned = set()
# Compute bids: value - estimated travel cost
bids = []
for i in range(self.n_robots):
for j in range(self.n_tasks):
task = self.tasks[j]
# Manhattan distance as cost estimate
cost = (abs(self.robot_positions[i][0] - task['pickup'][0]) +
abs(self.robot_positions[i][1] - task['pickup'][1]) +
abs(task['pickup'][0] - task['delivery'][0]) +
abs(task['pickup'][1] - task['delivery'][1]))
bid = task['value'] - cost
bids.append((bid, i, j))
bids.sort(reverse=True)
robot_busy = set()
for bid, robot, task in bids:
if task not in assigned and robot not in robot_busy:
allocation[robot].append(task)
assigned.add(task)
robot_busy.add(robot)
return allocation
def compute_routes(self, allocation: dict) -> dict:
"""Compute congestion-aware routes for all robots."""
edge_congestion = defaultdict(int)
routes = {}
# Sequential routing (simple; could iterate for equilibrium)
for robot, tasks in allocation.items():
if not tasks:
routes[robot] = {'path': [], 'cost': 0}
continue
task = self.tasks[tasks[0]]
pos = self.robot_positions[robot]
# Route: current -> pickup -> delivery
path1, cost1 = self.graph.shortest_path(pos, task['pickup'],
edge_congestion)
for k in range(len(path1) - 1):
edge_congestion[(path1[k], path1[k+1])] += 1
path2, cost2 = self.graph.shortest_path(task['pickup'],
task['delivery'],
edge_congestion)
for k in range(len(path2) - 1):
edge_congestion[(path2[k], path2[k+1])] += 1
full_path = path1 + path2[1:]
routes[robot] = {'path': full_path, 'cost': cost1 + cost2}
return routes, edge_congestion
def robust_reroute(self, routes: dict, failed_edge: tuple) -> dict:
"""Reroute after edge failure."""
self.graph.failed_edges.add(failed_edge)
self.graph.failed_edges.add((failed_edge[1], failed_edge[0]))
new_routes, new_congestion = self.compute_routes(
{r: [self.tasks.index(self.tasks[routes[r].get('task_idx', 0)])]
if routes[r]['path'] else []
for r in routes}
)
self.graph.failed_edges.clear()
return new_routes
def compute_welfare(self, allocation: dict, routes: dict) -> dict:
"""Compute social welfare and metrics."""
total_value = 0
total_cost = 0
for robot, tasks in allocation.items():
for task_idx in tasks:
total_value += self.tasks[task_idx]['value']
total_cost += routes[robot]['cost']
return {
'welfare': total_value - total_cost,
'total_value': total_value,
'total_cost': total_cost,
'tasks_completed': sum(len(t) for t in allocation.values()),
}
def compute_poa(self, n_samples: int = 100) -> float:
"""Estimate PoA by comparing auction vs random allocations."""
# Auction allocation (equilibrium proxy)
alloc_auction = self.auction_allocate()
routes_auction, _ = self.compute_routes(alloc_auction)
welfare_auction = self.compute_welfare(alloc_auction, routes_auction)
# Best of random allocations (optimum proxy)
best_welfare = welfare_auction['welfare']
for _ in range(n_samples):
perm = self.rng.permutation(self.n_tasks)
alloc_rand = {i: [] for i in range(self.n_robots)}
for idx, task_idx in enumerate(perm[:self.n_robots]):
alloc_rand[idx % self.n_robots].append(int(task_idx))
routes_rand, _ = self.compute_routes(alloc_rand)
w = self.compute_welfare(alloc_rand, routes_rand)
best_welfare = max(best_welfare, w['welfare'])
poa = best_welfare / max(welfare_auction['welfare'], 1e-10)
return poa
def run_simulation(self) -> dict:
"""Run full simulation pipeline."""
print("=" * 60)
print("MULTI-ROBOT DELIVERY GAME SIMULATION")
print("=" * 60)
print(f"Robots: {self.n_robots}, Tasks: {self.n_tasks}")
print(f"Grid: {self.graph.n_rows} x {self.graph.n_cols}")
# Phase 1: Auction
print("\n--- Phase 1: Auction Allocation ---")
allocation = self.auction_allocate()
for r, tasks in allocation.items():
if tasks:
t = self.tasks[tasks[0]]
print(f" Robot {r} at {self.robot_positions[r]}: "
f"task {tasks[0]} (val={t['value']:.1f}, "
f"{t['pickup']}→{t['delivery']})")
# Phase 2: Congestion routing
print("\n--- Phase 2: Congestion-Aware Routing ---")
routes, congestion = self.compute_routes(allocation)
for r in range(self.n_robots):
if routes[r]['path']:
print(f" Robot {r}: cost={routes[r]['cost']:.2f}, "
f"hops={len(routes[r]['path'])-1}")
max_cong = max(congestion.values()) if congestion else 0
print(f" Max edge congestion: {max_cong}")
# Phase 3: Welfare analysis
print("\n--- Phase 3: Welfare Analysis ---")
metrics = self.compute_welfare(allocation, routes)
print(f" Tasks completed: {metrics['tasks_completed']}/{self.n_tasks}")
print(f" Total value: {metrics['total_value']:.2f}")
print(f" Total cost: {metrics['total_cost']:.2f}")
print(f" Social welfare: {metrics['welfare']:.2f}")
# Phase 4: Price of Anarchy
print("\n--- Phase 4: Price of Anarchy ---")
poa = self.compute_poa()
print(f" Estimated PoA: {poa:.3f}")
# Phase 5: Robustness test
print("\n--- Phase 5: Robustness Under Edge Failure ---")
if congestion:
busiest = max(congestion, key=congestion.get)
print(f" Failing busiest edge: {busiest} "
f"(congestion={congestion[busiest]})")
self.graph.failed_edges.add(busiest)
self.graph.failed_edges.add((busiest[1], busiest[0]))
routes_robust, cong_robust = self.compute_routes(allocation)
metrics_robust = self.compute_welfare(allocation, routes_robust)
self.graph.failed_edges.clear()
degradation = 1 - metrics_robust['welfare'] / max(metrics['welfare'], 1e-10)
print(f" Welfare after failure: {metrics_robust['welfare']:.2f}")
print(f" Degradation: {degradation:.1%}")
return {
'allocation': allocation, 'routes': routes,
'metrics': metrics, 'poa': poa,
}
# Run the simulation
game = MultiRobotDeliveryGame(n_robots=6, n_tasks=8, seed=42)
results = game.run_simulation()
# Parameter sensitivity
print("\n--- Parameter Sensitivity ---")
for n_robots in [3, 6, 9, 12]:
g = MultiRobotDeliveryGame(n_robots=n_robots, n_tasks=8, seed=42)
alloc = g.auction_allocate()
routes, cong = g.compute_routes(alloc)
m = g.compute_welfare(alloc, routes)
poa = g.compute_poa(n_samples=50)
print(f" {n_robots} robots: welfare={m['welfare']:.1f}, "
f"max_cong={max(cong.values()) if cong else 0}, PoA={poa:.3f}")
P1: Run the simulation with 4, 8, and 16 robots. At what point does congestion start dominating welfare? Plot welfare vs. number of robots.
P2: Compare the auction allocation with a round-robin baseline. Measure the welfare gap.
P3: Add a second edge failure and measure the additional welfare degradation. Is the degradation superlinear?
E1: Add dynamic task arrival (Poisson process) and implement regret-based online task acceptance. Show that MWU-based allocation has sublinear regret.
E2: Prove that the PoA for your warehouse congestion game with BPR delay functions $d_e(x) = \tau_e(1 + \alpha(x/c_e)^\beta)$ is bounded. Derive the bound as a function of $\alpha, \beta$.
E3: Implement a hybrid controller: centralized allocation + decentralized routing + robust replanning. Compare with fully decentralized and fully centralized baselines. Find the Pareto-optimal operating point.