Distributed Query Planning
Concept. A distributed query splits work across many machines by partitioning the input tables, executing local plans on each shard, and shuffling intermediate results so matching keys land together.
Intuition. When Listens is 100 TB and lives on 1,000 machines, no single machine can join it to Users on its own. The optimizer hashes both tables on user_id, ships each user's rows to one shard, then runs the same join algorithm locally on every shard.
Scaling Queries Across Multiple Machines
When your data outgrows a single machine, you don't just need more storage. You need a strategy. Queries must be sliced, diced, and reassembled with precision.
- 100 TB: beyond single-node capacity
- 1,000 nodes: parallel execution
- Shuffle: data movement
- 100x faster: compared to a single node
What is "Shuffle"?
Shuffle is data relocation across nodes. It gets all rows with the same key onto the same machine before joining or grouping. It's essentially hash partitioning over the network.
Figure: rows starting on multiple nodes are hashed on the join key and shipped across the network so that all rows with the same key land on the same target node, ready for a local join.
| Local hash partitioning | Distributed shuffle |
|---|---|
| hash(key) % B picks a partition file on the same disk | hash(key) % nodes picks a target machine on the network |
| Output: B files on local disk | Output: each node receives the keys it owns |
| Cost: 2N IO | Cost: 2N IO + network transfer |
Same algorithm, different destination. See Hash Partitioning for the local primitive.
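The routing rule in the right-hand column can be sketched in a few lines. This is a minimal in-memory illustration, not how any particular engine implements its shuffle: the `owner` and `shuffle` helpers, the sample rows, and the choice of `zlib.crc32` as a stable hash are all assumptions made for the example.

```python
import zlib

def owner(key, num_nodes):
    """Map a key to the node that owns it (stable across processes)."""
    return zlib.crc32(str(key).encode("utf-8")) % num_nodes

def shuffle(rows_by_node, key_of, num_nodes):
    """Route every row to the node chosen by hashing its key.

    rows_by_node[i] holds the rows that start on node i. The return value
    has one inbox per target node holding the rows that node receives.
    """
    inbox = [[] for _ in range(num_nodes)]
    for source_rows in rows_by_node:
        for row in source_rows:
            inbox[owner(key_of(row), num_nodes)].append(row)
    return inbox

# Hypothetical Listens rows (user_id, song) scattered over 3 nodes.
listens = [
    [(1, "a"), (2, "b")],
    [(1, "c"), (3, "d")],
    [(2, "e"), (3, "f"), (1, "g")],
]
received = shuffle(listens, key_of=lambda row: row[0], num_nodes=3)
for node_id, rows in enumerate(received):
    print(node_id, rows)
```

After the call, every row for a given user_id sits in exactly one inbox, which is the precondition for a local join or group-by. In a real cluster the `append` would be a network send, which is where the extra transfer cost in the table comes from.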
Core Concepts of Distributed Planning
Fragmentation
The optimizer dissects the logical plan into independent fragments. Each node runs the same code but on its own data slice.
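As a rough sketch of "same code, own data slice": the function below plays the role of one plan fragment, and it is applied unchanged to each node's slice. The `local_join` name, the sample Users and Listens rows, and the `user_id % 2` partitioning are hypothetical and chosen only to keep the example small.

```python
def local_join(users, listens):
    """The fragment every node runs: a hash join over its own slice only."""
    name_by_id = {user_id: name for user_id, name in users}  # build side
    return [
        (user_id, name_by_id[user_id], song)
        for user_id, song in listens                          # probe side
        if user_id in name_by_id
    ]

# Hypothetical slices, already partitioned on user_id (user_id % 2 here).
slices = [
    {"users": [(2, "bo"), (4, "di")], "listens": [(2, "x"), (4, "y"), (2, "z")]},
    {"users": [(1, "al"), (3, "cy")], "listens": [(1, "p"), (3, "q")]},
]

# Same code, different data: one fragment per node.
per_node = [local_join(s["users"], s["listens"]) for s in slices]
result = sorted(row for out in per_node for row in out)
print(result)
```

Because both tables are partitioned on the join key, the union of the per-node outputs equals the join a single machine would have produced over the full tables.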
Shuffle Strategies
| Strategy | Mechanism | Best For | Network Cost |
|---|---|---|---|
| Broadcast Shuffle | Send a copy of the entire table to every node. | Joining a tiny table with a massive one. | Small Table Size × # Nodes |
| Hash Shuffle | Redistribute both tables by a common hash key. | Joining two large tables. | Total Size of Both Tables |
| Co-located (Zero Shuffle) | No movement; data is already partitioned correctly. | Tables pre-partitioned on the same key. | Zero Network I/O |
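The Network Cost column can be turned into a toy chooser. This sketch uses only the formulas from the table; the `network_cost` and `pick_strategy` names and the example table sizes are assumptions, and a real optimizer would also weigh memory limits, skew, and statistics quality.

```python
GB = 10**9
TB = 10**12

def network_cost(strategy, small_bytes, large_bytes, num_nodes):
    """Bytes sent over the network, per the table's cost column."""
    if strategy == "broadcast":
        return small_bytes * num_nodes      # one copy of the small table per node
    if strategy == "hash":
        return small_bytes + large_bytes    # both tables are redistributed
    if strategy == "colocated":
        return 0                            # data is already in place
    raise ValueError(f"unknown strategy: {strategy}")

def pick_strategy(small_bytes, large_bytes, num_nodes, colocated=False):
    """Pick the cheapest strategy by network bytes alone (a simplification)."""
    if colocated:
        return "colocated"
    return min(
        ("broadcast", "hash"),
        key=lambda s: network_cost(s, small_bytes, large_bytes, num_nodes),
    )

# Hypothetical sizes: a 1 GB Users table joined to 100 TB of Listens on 1,000 nodes.
print(pick_strategy(1 * GB, 100 * TB, 1000))
# Two 50 TB tables on the same cluster.
print(pick_strategy(50 * TB, 50 * TB, 1000))
```

With these numbers, broadcasting the 1 GB table costs 1 TB of traffic against roughly 100 TB for a hash shuffle, while broadcasting a 50 TB table to 1,000 nodes would cost 50,000 TB against 100 TB for hashing both sides.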
Reaggregation Patterns: The Two-Phase Execution
Problem: Shuffling every raw row for a global GROUP BY across 1,000 nodes would flood the network with billions of tiny records.
Phase 1: Local Pre-Aggregation
Each node groups and aggregates its local data.
- Input: 1,000,000 "listen" rows per node.
- Output: 1,000 "artist totals" per node.
- Impact: Shrinks data by 1,000x before network transfer.
Phase 2: Global Merge
Partial results are shuffled by the grouping key (e.g., artist_id) so that every partial for a given key reaches the same node, which acts as the coordinator for that key.
- Action: Coordinator aggregates partial counts to derive the true total.
- Why: Ensures accuracy while reducing network load.
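The two phases can be sketched with plain counters. This is an in-memory illustration under simplifying assumptions: a single coordinator merges all partials, the aggregate is a count, and the `local_preaggregate` and `global_merge` names and the sample rows are hypothetical.

```python
from collections import Counter

def local_preaggregate(listen_rows):
    """Phase 1: collapse a node's raw rows into one partial count per artist."""
    return Counter(artist_id for artist_id, _user_id in listen_rows)

def global_merge(partials):
    """Phase 2: the coordinator sums the partial counts for each artist."""
    totals = Counter()
    for partial in partials:
        totals.update(partial)  # Counter.update adds counts, it does not overwrite
    return totals

# Hypothetical (artist_id, user_id) listen rows on three nodes.
node_rows = [
    [("adele", 1), ("adele", 2), ("bowie", 1)],
    [("bowie", 3), ("bowie", 4), ("cher", 5)],
    [("adele", 6)],
]

partials = [local_preaggregate(rows) for rows in node_rows]
totals = global_merge(partials)

raw_rows = sum(len(rows) for rows in node_rows)
shipped = sum(len(partial) for partial in partials)
print(dict(totals), f"shipped {shipped} partial records instead of {raw_rows} rows")
```

The approach works because counts are decomposable: summing per-node counts gives the same answer as counting all rows at once. Aggregates such as an average need a little more care, for example shipping a (sum, count) pair per key and dividing only at the end.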
Optional: Real Systems Comparison
| System | Architecture | Key Optimization | Shuffle Strategy |
|---|---|---|---|
| Apache Spark | In-memory RDDs | Catalyst optimizer + code gen | Hash & broadcast joins |
| BigQuery (Dremel) | Columnar tree | Hierarchical aggregation | Minimize data movement |
| Snowflake | Storage/compute separation | Micro-partitions + caching | Automatic clustering |
| Presto/Trino | Pipelined execution | Dynamic filtering | Statistics-driven |
Next: Spotify End-to-End ties everything together. See how one click on a play button cascades through OLTP writes, ETL transforms (with the shuffle and broadcast joins from this page), and OLAP dashboards.