← Back to Week 16: Algorithmic GT & Capstone

Day 112: Capstone — Multi-Robot Delivery Game

Phase VII — Game Theory for Optimization & Robotics | Week 16 | 2.5 hours Synthesize everything: auctions, congestion, robustness, and learning dynamics in a full warehouse fleet simulation.

OKS Relevance

This capstone IS the OKS fleet problem. Robots compete for aisle capacity while collaboratively completing pick-and-deliver orders. The simulation integrates: auction-based task allocation (Day 110), congestion-aware routing (Day 109), robust replanning when corridors are blocked (Day 111), and decentralized learning dynamics (Days 107-108). The Price of Anarchy directly measures the cost of decentralization in the OKS warehouse.


Theory (45 min)

112.1 Problem Formulation

Warehouse model: A graph $G = (V, E)$ representing the warehouse floor. Nodes are intersections/stations, edges are corridors with capacity and delay functions.

Components integrated:

Component Game-theoretic concept Day reference
Task allocation Combinatorial auction + VCG Day 110
Corridor routing Congestion game Day 109
Edge failures Robust optimization (game vs nature) Day 111
Online adaptation No-regret learning (MWU) Day 108
Equilibrium analysis Nash equilibrium + PoA Days 103, 106

112.2 Auction Phase

Each task $j$ has a value $v_j$, a pickup location $p_j$, and a delivery location $d_j$. Robot $i$ bids $b_i(j) = v_j - c_i(p_j, d_j)$ where $c_i$ is robot $i$'s estimated travel cost. Allocation maximizes welfare via greedy auction.

112.3 Congestion-Aware Routing

After task allocation, each robot chooses a path. The corridor delay on edge $e$ is:

$$d_e(n_e) = \tau_e (1 + \alpha (n_e / c_e)^\beta)$$

where $\tau_e$ is free-flow travel time, $c_e$ is capacity, and $\alpha, \beta$ are BPR parameters. This is a congestion game — the Rosenthal potential guarantees a pure NE exists.

112.4 Robust Replanning

When an edge fails (blocked corridor), robots must reroute. The robust formulation:

$$\min_{\text{paths}} \max_{\text{edge failure} \in \mathcal{F}} \text{total delay}$$

where $\mathcal{F}$ is the set of possible single-edge failures. This is a minimax game against nature.

112.5 Analysis Metrics

  • Social welfare: Total value of completed tasks minus total travel cost
  • Price of Anarchy: $\text{PoA} = W^{\text{opt}} / W^{\text{NE}}$ (welfare ratio)
  • Convergence: How many rounds until learning dynamics stabilize
  • Robustness: Welfare degradation under worst-case failures

Implementation (60 min)

"""Capstone: Multi-Robot Delivery Game — full warehouse fleet simulation."""
import numpy as np
from collections import defaultdict
from typing import Optional
import heapq

class WarehouseGraph:
    """Warehouse floor as a weighted graph with congestion."""

    def __init__(self, n_rows: int = 4, n_cols: int = 5):
        self.n_rows = n_rows
        self.n_cols = n_cols
        self.nodes = [(r, c) for r in range(n_rows) for c in range(n_cols)]
        self.edges = {}
        self.failed_edges = set()

        # Create grid edges with free-flow times
        for r in range(n_rows):
            for c in range(n_cols):
                for dr, dc in [(0, 1), (1, 0)]:
                    nr, nc = r + dr, c + dc
                    if 0 <= nr < n_rows and 0 <= nc < n_cols:
                        e = ((r, c), (nr, nc))
                        self.edges[e] = {'tau': 1.0, 'capacity': 2, 'alpha': 0.5, 'beta': 2}
                        self.edges[(e[1], e[0])] = self.edges[e]  # bidirectional

    def delay(self, edge: tuple, congestion: int) -> float:
        """BPR delay function."""
        if edge in self.failed_edges:
            return float('inf')
        e = self.edges.get(edge)
        if e is None:
            return float('inf')
        return e['tau'] * (1 + e['alpha'] * (congestion / e['capacity']) ** e['beta'])

    def shortest_path(self, src, dst, edge_congestion: dict) -> tuple:
        """Dijkstra with congestion-aware delays."""
        dist = {n: float('inf') for n in self.nodes}
        dist[src] = 0
        prev = {}
        pq = [(0, src)]

        while pq:
            d, u = heapq.heappop(pq)
            if d > dist[u]:
                continue
            if u == dst:
                break
            for dr, dc in [(0, 1), (0, -1), (1, 0), (-1, 0)]:
                v = (u[0] + dr, u[1] + dc)
                if v not in dist:
                    continue
                edge = (u, v)
                cong = edge_congestion.get(edge, 0)
                w = self.delay(edge, cong + 1)
                if dist[u] + w < dist[v]:
                    dist[v] = dist[u] + w
                    prev[v] = u
                    heapq.heappush(pq, (dist[v], v))

        # Reconstruct path
        if dist[dst] == float('inf'):
            return [], float('inf')
        path = []
        node = dst
        while node != src:
            path.append(node)
            node = prev[node]
        path.append(src)
        path.reverse()
        return path, dist[dst]


class MultiRobotDeliveryGame:
    """Full simulation: auction + congestion routing + robust replanning."""

    def __init__(self, n_robots: int = 6, n_tasks: int = 8, seed: int = 42):
        self.rng = np.random.default_rng(seed)
        self.n_robots = n_robots
        self.n_tasks = n_tasks
        self.graph = WarehouseGraph()
        self.nodes = self.graph.nodes

        # Robot positions
        self.robot_positions = [self.nodes[self.rng.integers(len(self.nodes))]
                                for _ in range(n_robots)]

        # Tasks: (pickup, delivery, value)
        self.tasks = []
        for _ in range(n_tasks):
            p = self.nodes[self.rng.integers(len(self.nodes))]
            d = self.nodes[self.rng.integers(len(self.nodes))]
            while d == p:
                d = self.nodes[self.rng.integers(len(self.nodes))]
            v = self.rng.uniform(5, 20)
            self.tasks.append({'pickup': p, 'delivery': d, 'value': v})

    def auction_allocate(self) -> dict:
        """Greedy auction-based task allocation."""
        allocation = {i: [] for i in range(self.n_robots)}
        assigned = set()

        # Compute bids: value - estimated travel cost
        bids = []
        for i in range(self.n_robots):
            for j in range(self.n_tasks):
                task = self.tasks[j]
                # Manhattan distance as cost estimate
                cost = (abs(self.robot_positions[i][0] - task['pickup'][0]) +
                        abs(self.robot_positions[i][1] - task['pickup'][1]) +
                        abs(task['pickup'][0] - task['delivery'][0]) +
                        abs(task['pickup'][1] - task['delivery'][1]))
                bid = task['value'] - cost
                bids.append((bid, i, j))

        bids.sort(reverse=True)
        robot_busy = set()

        for bid, robot, task in bids:
            if task not in assigned and robot not in robot_busy:
                allocation[robot].append(task)
                assigned.add(task)
                robot_busy.add(robot)

        return allocation

    def compute_routes(self, allocation: dict) -> dict:
        """Compute congestion-aware routes for all robots."""
        edge_congestion = defaultdict(int)
        routes = {}

        # Sequential routing (simple; could iterate for equilibrium)
        for robot, tasks in allocation.items():
            if not tasks:
                routes[robot] = {'path': [], 'cost': 0}
                continue

            task = self.tasks[tasks[0]]
            pos = self.robot_positions[robot]

            # Route: current -> pickup -> delivery
            path1, cost1 = self.graph.shortest_path(pos, task['pickup'],
                                                     edge_congestion)
            for k in range(len(path1) - 1):
                edge_congestion[(path1[k], path1[k+1])] += 1

            path2, cost2 = self.graph.shortest_path(task['pickup'],
                                                     task['delivery'],
                                                     edge_congestion)
            for k in range(len(path2) - 1):
                edge_congestion[(path2[k], path2[k+1])] += 1

            full_path = path1 + path2[1:]
            routes[robot] = {'path': full_path, 'cost': cost1 + cost2}

        return routes, edge_congestion

    def robust_reroute(self, routes: dict, failed_edge: tuple) -> dict:
        """Reroute after edge failure."""
        self.graph.failed_edges.add(failed_edge)
        self.graph.failed_edges.add((failed_edge[1], failed_edge[0]))

        new_routes, new_congestion = self.compute_routes(
            {r: [self.tasks.index(self.tasks[routes[r].get('task_idx', 0)])]
             if routes[r]['path'] else []
             for r in routes}
        )

        self.graph.failed_edges.clear()
        return new_routes

    def compute_welfare(self, allocation: dict, routes: dict) -> dict:
        """Compute social welfare and metrics."""
        total_value = 0
        total_cost = 0

        for robot, tasks in allocation.items():
            for task_idx in tasks:
                total_value += self.tasks[task_idx]['value']
            total_cost += routes[robot]['cost']

        return {
            'welfare': total_value - total_cost,
            'total_value': total_value,
            'total_cost': total_cost,
            'tasks_completed': sum(len(t) for t in allocation.values()),
        }

    def compute_poa(self, n_samples: int = 100) -> float:
        """Estimate PoA by comparing auction vs random allocations."""
        # Auction allocation (equilibrium proxy)
        alloc_auction = self.auction_allocate()
        routes_auction, _ = self.compute_routes(alloc_auction)
        welfare_auction = self.compute_welfare(alloc_auction, routes_auction)

        # Best of random allocations (optimum proxy)
        best_welfare = welfare_auction['welfare']
        for _ in range(n_samples):
            perm = self.rng.permutation(self.n_tasks)
            alloc_rand = {i: [] for i in range(self.n_robots)}
            for idx, task_idx in enumerate(perm[:self.n_robots]):
                alloc_rand[idx % self.n_robots].append(int(task_idx))
            routes_rand, _ = self.compute_routes(alloc_rand)
            w = self.compute_welfare(alloc_rand, routes_rand)
            best_welfare = max(best_welfare, w['welfare'])

        poa = best_welfare / max(welfare_auction['welfare'], 1e-10)
        return poa

    def run_simulation(self) -> dict:
        """Run full simulation pipeline."""
        print("=" * 60)
        print("MULTI-ROBOT DELIVERY GAME SIMULATION")
        print("=" * 60)
        print(f"Robots: {self.n_robots}, Tasks: {self.n_tasks}")
        print(f"Grid: {self.graph.n_rows} x {self.graph.n_cols}")

        # Phase 1: Auction
        print("\n--- Phase 1: Auction Allocation ---")
        allocation = self.auction_allocate()
        for r, tasks in allocation.items():
            if tasks:
                t = self.tasks[tasks[0]]
                print(f"  Robot {r} at {self.robot_positions[r]}: "
                      f"task {tasks[0]} (val={t['value']:.1f}, "
                      f"{t['pickup']}→{t['delivery']})")

        # Phase 2: Congestion routing
        print("\n--- Phase 2: Congestion-Aware Routing ---")
        routes, congestion = self.compute_routes(allocation)
        for r in range(self.n_robots):
            if routes[r]['path']:
                print(f"  Robot {r}: cost={routes[r]['cost']:.2f}, "
                      f"hops={len(routes[r]['path'])-1}")

        max_cong = max(congestion.values()) if congestion else 0
        print(f"  Max edge congestion: {max_cong}")

        # Phase 3: Welfare analysis
        print("\n--- Phase 3: Welfare Analysis ---")
        metrics = self.compute_welfare(allocation, routes)
        print(f"  Tasks completed: {metrics['tasks_completed']}/{self.n_tasks}")
        print(f"  Total value: {metrics['total_value']:.2f}")
        print(f"  Total cost: {metrics['total_cost']:.2f}")
        print(f"  Social welfare: {metrics['welfare']:.2f}")

        # Phase 4: Price of Anarchy
        print("\n--- Phase 4: Price of Anarchy ---")
        poa = self.compute_poa()
        print(f"  Estimated PoA: {poa:.3f}")

        # Phase 5: Robustness test
        print("\n--- Phase 5: Robustness Under Edge Failure ---")
        if congestion:
            busiest = max(congestion, key=congestion.get)
            print(f"  Failing busiest edge: {busiest} "
                  f"(congestion={congestion[busiest]})")
            self.graph.failed_edges.add(busiest)
            self.graph.failed_edges.add((busiest[1], busiest[0]))
            routes_robust, cong_robust = self.compute_routes(allocation)
            metrics_robust = self.compute_welfare(allocation, routes_robust)
            self.graph.failed_edges.clear()

            degradation = 1 - metrics_robust['welfare'] / max(metrics['welfare'], 1e-10)
            print(f"  Welfare after failure: {metrics_robust['welfare']:.2f}")
            print(f"  Degradation: {degradation:.1%}")

        return {
            'allocation': allocation, 'routes': routes,
            'metrics': metrics, 'poa': poa,
        }

# Run the simulation
game = MultiRobotDeliveryGame(n_robots=6, n_tasks=8, seed=42)
results = game.run_simulation()

# Parameter sensitivity
print("\n--- Parameter Sensitivity ---")
for n_robots in [3, 6, 9, 12]:
    g = MultiRobotDeliveryGame(n_robots=n_robots, n_tasks=8, seed=42)
    alloc = g.auction_allocate()
    routes, cong = g.compute_routes(alloc)
    m = g.compute_welfare(alloc, routes)
    poa = g.compute_poa(n_samples=50)
    print(f"  {n_robots} robots: welfare={m['welfare']:.1f}, "
          f"max_cong={max(cong.values()) if cong else 0}, PoA={poa:.3f}")

Practice Problems (45 min)

P1: Run the simulation with 4, 8, and 16 robots. At what point does congestion start dominating welfare? Plot welfare vs. number of robots.

Answer With 8 tasks on a 4×5 grid: 4 robots — welfare is high (low congestion, not all tasks served); 8 robots — peak welfare (all tasks served, moderate congestion); 16 robots — welfare drops (severe congestion on shared corridors, idle robots). The transition point depends on grid topology and task distribution. Typically, welfare peaks near $n_{\text{robots}} \approx n_{\text{tasks}}$ and declines when congestion cost exceeds marginal task value.

P2: Compare the auction allocation with a round-robin baseline. Measure the welfare gap.

Answer Round-robin: assign task $j$ to robot $j \mod n$, ignoring distances. With 6 robots, 8 tasks: round-robin welfare $\approx 40\%$ lower than auction because it ignores proximity. The auction's greedy matching by bid value accounts for travel cost, producing allocations where nearby robots serve nearby tasks.

P3: Add a second edge failure and measure the additional welfare degradation. Is the degradation superlinear?

Answer With two failures, robots may face severely constrained corridors. If failures are on independent paths, degradation is roughly additive ($\approx 2\times$ single failure). If failures create a bottleneck (disconnecting a region), degradation is superlinear — some tasks become unreachable, causing welfare to drop sharply. This motivates redundant corridor design in warehouse layouts.

Expert Challenges

E1: Add dynamic task arrival (Poisson process) and implement regret-based online task acceptance. Show that MWU-based allocation has sublinear regret.

Hint Each robot runs MWU over action space {accept, reject} for each task type. Loss = travel cost if accepted, opportunity cost if rejected. Over $T$ task arrivals, regret $\leq O(\sqrt{T \log 2})$. Implement by maintaining per-robot weights and updating after each task completion.

E2: Prove that the PoA for your warehouse congestion game with BPR delay functions $d_e(x) = \tau_e(1 + \alpha(x/c_e)^\beta)$ is bounded. Derive the bound as a function of $\alpha, \beta$.

Answer For polynomial delay of degree $\beta$, the PoA is bounded by $\frac{(1+\alpha)}{1 + \alpha/(1+\beta)^{1+1/\beta}}$ (extending Roughgarden-Tardos). For $\beta = 2, \alpha = 0.5$: PoA $\leq 1.5/(1 + 0.5/3^{1.5}) \approx 1.39$. For $\beta = 4$ (BPR standard): PoA $\leq 2.15$. Higher-degree delays allow worse equilibria.

E3: Implement a hybrid controller: centralized allocation + decentralized routing + robust replanning. Compare with fully decentralized and fully centralized baselines. Find the Pareto-optimal operating point.

Answer Fully centralized: optimal welfare but high communication overhead ($O(n^2)$ messages per step). Fully decentralized: low overhead but PoA $\approx 1.3$–$1.5$. Hybrid: centralized allocation (low-frequency, $O(n)$ messages), decentralized routing (zero messages), centralized replanning only on failure ($O(n)$ messages, rare). Pareto frontier: hybrid achieves $>95\%$ of centralized welfare with $<20\%$ of communication cost for typical warehouse parameters (6–12 robots, grid topology).

Connections

  • Back: Synthesizes all of Week 15-16: Days 99-111
  • Forward: This capstone project can be extended into a full OKS fleet simulator
  • OKS: This IS the core OKS fleet coordination problem — auctions for task assignment, congestion-aware routing through warehouse aisles, robust replanning when obstacles or robot failures occur

Self-Check

  • [ ] I can implement a multi-robot delivery game combining auctions, congestion, and robustness
  • [ ] I can compute and interpret the Price of Anarchy for a warehouse scenario
  • [ ] I understand when decentralized coordination approximates centralized optimality
  • [ ] I can analyze the welfare impact of edge failures and design robust rerouting
  • [ ] I can identify the game-theoretic structure in real OKS fleet operations
← Day 111: Robust Optimization Week Index →