Dataset Shuffling at Scale
A 100 TB dataset does not fit in any GPU server's RAM, so the classical shuffle (materialize a random permutation of indices, then read items in that order) does not work as-is. The training data still has to be shuffled, because optimizer convergence demands batches that look statistically independent. The reconciliation is "streaming shuffle": approximate the random permutation well enough that training works, without ever holding the full dataset in memory.
What "shuffle" means at scale
Optimizer convergence assumes that consecutive minibatches see statistically similar gradient distributions. If batches are correlated (e.g., 10 images of cats, then 10 images of dogs), the gradient estimates are biased and convergence stalls or oscillates. The classical fix is to shuffle the dataset uniformly at random and walk through it sequentially.
For datasets that fit in RAM (a few GB), this is trivial: numpy.random.shuffle(indices). For 100 TB datasets streaming in from object store, you cannot hold the index array in any memory tier that responds in microseconds. The streaming shuffle algorithms exist to approximate uniform shuffle within a fixed memory budget.
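To put a rough number on the index array, here is a minimal sketch. The 1 KB average item size and the 8-byte index width are assumptions for illustration (1 KB matches a 100 MB shard holding 100K items); `index_array_bytes` is a hypothetical helper, not a library function.

```python
def index_array_bytes(dataset_bytes, avg_item_bytes, index_width_bytes=8):
    """Estimate the size of a full permutation index for a dataset.

    avg_item_bytes and index_width_bytes are illustrative assumptions.
    """
    num_items = dataset_bytes // avg_item_bytes
    return num_items * index_width_bytes


# 100 TB of ~1 KB items -> 10^11 items -> 800 GB of int64 indices.
size = index_array_bytes(100 * 10**12, 1000)
print(f"{size / 10**9:.0f} GB")
```

Under these assumptions the permutation alone is hundreds of gigabytes, before accounting for the random reads against object storage that walking it would require.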
The two main algorithms
Shard-and-buffer (also called epoch shuffle): the dataset is pre-split into shards (large files, typically 100 MB-1 GB each). At the start of each epoch, the order of shards is randomly permuted. Within each shard, the items are read sequentially or in a random order. The shuffle "radius" (roughly, the number of items from which the next item is effectively drawn at random) is about one shard's worth of items when reads within a shard are randomized; with purely sequential reads, items keep their original order inside each shard and only the shard order is random.
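A minimal sketch of the shard-order shuffle, assuming shards are already available as in-memory lists (a real reader would fetch each shard from storage). `iter_epoch` is a hypothetical name used only for illustration.

```python
import random


def iter_epoch(shards, rng):
    """Yield every item once: random shard order, sequential within a shard."""
    order = list(shards)
    rng.shuffle(order)          # permute shard order at epoch start
    for shard in order:
        for item in shard:      # sequential read inside the shard
            yield item


shards = [[0, 1, 2], [3, 4, 5], [6, 7, 8]]
items = list(iter_epoch(shards, random.Random(0)))
```

Each run of three consecutive outputs is still one whole shard in its original order, which is why this scheme alone gives only a coarse shuffle.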
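Window shuffle (the buffer-based shuffle found in tf.data's Dataset.shuffle(buffer_size) and WebDataset's shuffle stage; MosaicML StreamingDataset exposes a related shuffle_block_size setting): each reader maintains a buffer of K shards in memory. It samples randomly from the buffer to produce the next item, then refills the buffer with the next shard. Shuffle radius = K * items_per_shard. Larger K = better shuffle, more memory. Note that PyTorch's DataLoader rejects shuffle=True for a plain IterableDataset, so in PyTorch the buffer has to live inside the dataset or the streaming library.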
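The buffer mechanics can be sketched as a generator. This is an item-level illustration under stated assumptions, not any library's implementation: `buffer_size` plays the role of K * items_per_shard, and the refill happens one item at a time rather than one shard at a time.

```python
import random


def window_shuffle(items, buffer_size, rng):
    """Approximate a shuffle of a stream using a fixed-size buffer.

    Assumes buffer_size >= 1. Every input item is emitted exactly once.
    """
    buffer = []
    for item in items:
        if len(buffer) < buffer_size:
            buffer.append(item)     # initial fill
            continue
        idx = rng.randrange(buffer_size)
        yield buffer[idx]           # emit a random buffered item
        buffer[idx] = item          # refill the freed slot
    rng.shuffle(buffer)             # drain what is left at end of stream
    yield from buffer


out = list(window_shuffle(range(100), 10, random.Random(0)))
```

An item can only be emitted after it has entered the buffer, so the output at position p never comes from further than buffer_size - 1 positions ahead in the input. That bound is the shuffle radius in this sketch.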
These two compose. The standard production setup is: shard the dataset into 100 MB chunks, shuffle the order of chunks at epoch start, run a window-shuffle of K=16 to 64 chunks per reader, and use multiple readers per training rank.
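One way to split the permuted shard list across readers is a strided partition. The sketch below is an assumption about how such a split could be done, with hypothetical names (`shards_for_reader`, `epoch_seed`); it relies on every reader using the same seed so that all readers agree on the permutation.

```python
import random


def shards_for_reader(shard_ids, epoch_seed, rank, world_size, worker, num_workers):
    """Return the shards one reader (rank, worker) handles this epoch."""
    order = list(shard_ids)
    random.Random(epoch_seed).shuffle(order)   # same permutation on every reader
    reader = rank * num_workers + worker       # global reader index
    total_readers = world_size * num_workers
    return order[reader::total_readers]        # disjoint strided slice


# 2 ranks x 3 workers sharing 20 shards.
parts = [
    shards_for_reader(range(20), 123, r, 2, w, 3)
    for r in range(2)
    for w in range(3)
]
```

Each reader would then run its own window shuffle over the shards it was assigned.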
What "shuffle radius" buys you
The math gets concrete. With 4 reader workers per training rank, each holding a 16-shard window, a rank has 64 shards resident at any moment. If each shard holds 100K items, that is 6.4M items in randomly sampled buffers. For most training workloads, that is far more than convergence requires. Going larger (K=128, 256) improves the shuffle quality only marginally but increases memory per worker linearly.
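The same arithmetic as a small helper, with the memory cost added. The 100 MB shard size is taken from the production setup described earlier; treating resident memory as the raw shard bytes is a simplifying assumption (decoded items may be larger). `shuffle_window` is a hypothetical name.

```python
def shuffle_window(workers_per_rank, shards_per_worker, items_per_shard, shard_bytes):
    """Shards, items, and raw bytes resident on one training rank."""
    shards = workers_per_rank * shards_per_worker
    return {
        "shards": shards,
        "items": shards * items_per_shard,
        "bytes": shards * shard_bytes,
    }


w = shuffle_window(4, 16, 100_000, 100 * 10**6)
# 64 shards, 6,400,000 items, 6.4 GB of raw shard data per rank
print(w)
```

Doubling K doubles all three numbers, which is the linear memory growth the text describes.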
The practical question is: how big does the shuffle radius need to be? For most LLM and vision training, a shuffle radius of a few million items is plenty. For class-imbalanced datasets where rare classes appear once per epoch, you may need a larger window or per-class oversampling to ensure each batch sees enough rare-class examples.
What stresses the IO budget
Streaming shuffle has two IO patterns. Shard rotation: when a shard is exhausted, a new one is read from storage. Item-within-shard: items are read sequentially from the cached shard. The first is bandwidth-heavy (reading 100 MB at a time); the second is CPU-bound (deserializing and decoding items).
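For PFS-backed training, the shard rotation hits the storage fabric. With K shards per worker, N workers per rank, and R training ranks, K*N*R shards are resident at once, and the fetch rate is roughly K*N*R / shard_lifetime shard-fetches per second, where shard_lifetime is how long a shard stays in a window before it is exhausted. For a 4096-rank cluster with 4 workers per rank and K=16 shards, that is 262,144 (256K) resident shards. With 100 MB shards, a sustained rotation rate of roughly 2.5 GB/s corresponds to about 25 shard-fetches per second, that is, a shard lifetime of just under three hours; shorter lifetimes raise the traffic proportionally.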
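A sketch of the rotation-traffic estimate. The shard lifetime is not stated in the text; the value of about 10,486 s used here is an assumption, back-solved so that 100 MB shards give roughly 2.5 GB/s. `shard_rotation` is a hypothetical helper.

```python
def shard_rotation(ranks, workers_per_rank, shards_per_worker,
                   shard_bytes, shard_lifetime_s):
    """Return (resident shards, fetches per second, bytes per second)."""
    resident = ranks * workers_per_rank * shards_per_worker
    fetches_per_s = resident / shard_lifetime_s
    return resident, fetches_per_s, fetches_per_s * shard_bytes


# shard_lifetime_s is an assumed value, not a measured one.
resident, fetches, bandwidth = shard_rotation(4096, 4, 16, 100 * 10**6, 10_486)
print(resident, round(fetches, 1), f"{bandwidth / 10**9:.2f} GB/s")
```

Because the rate is inversely proportional to shard lifetime, halving the lifetime (for example by consuming items twice as fast) doubles the sustained traffic.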
For S3-backed training, the same shard-fetch traffic crosses the cache boundary. If the shuffle radius exceeds the warm cache capacity, the cache miss rate climbs and S3 read traffic dominates the wall-clock. This is the primary tuning constraint: shuffle radius vs cache size vs S3 cost.
Reproducibility, determinism, and recovery
Streaming shuffle has a determinism problem: the shard order and the buffer sampling both depend on random state, so two training runs starting from the same checkpoint and running on the same data see the same batches only if that state is controlled. Bit-exact reproducibility requires it. The standard fix is a shuffle seed derived deterministically from a base seed and the epoch number, so the shard order and the per-worker random state are reproducible.
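A minimal sketch of per-epoch seed derivation. The key layout and the names `derive_seed` and `shard_order` are illustrative assumptions; the point is only that the seed is a pure function of (base seed, epoch, rank, worker).

```python
import hashlib
import random


def derive_seed(base_seed, epoch, rank=0, worker=0):
    """Derive a stable 64-bit seed for one (epoch, rank, worker).

    hashlib is used instead of the built-in hash(), which is salted
    per process for strings and so is not stable across runs.
    """
    key = f"{base_seed}/{epoch}/{rank}/{worker}".encode()
    return int.from_bytes(hashlib.sha256(key).digest()[:8], "big")


def shard_order(num_shards, base_seed, epoch):
    """Shard permutation for an epoch, identical on every run and rank."""
    order = list(range(num_shards))
    random.Random(derive_seed(base_seed, epoch)).shuffle(order)
    return order


assert shard_order(100, 42, 3) == shard_order(100, 42, 3)
```

The shard order uses the default rank and worker so that all readers agree on it, while each worker's buffer sampling can use its own derived seed.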
When training resumes from a checkpoint mid-epoch, the data loader has to recover its position. Some libraries (MosaicML StreamingDataset, NVIDIA DALI) save the data loader state alongside the model checkpoint. Others (vanilla PyTorch DataLoader) require restarting the epoch. The latter is simpler but wastes compute on items you have already trained on.
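The idea of saving loader state can be sketched with a toy reader. This is a hypothetical class, not the API of StreamingDataset or DALI; the `state_dict` / `load_state_dict` names simply mirror the PyTorch convention. It covers the shard-order shuffle only: a window shuffle would also need its buffer contents and random state saved or replayed.

```python
import random


class ResumableShardReader:
    """Toy reader whose position can be checkpointed mid-epoch."""

    def __init__(self, shards, seed):
        self.shards = shards
        self.seed = seed
        self.epoch = 0
        self.consumed = 0   # items already yielded in the current epoch

    def _epoch_items(self):
        order = list(range(len(self.shards)))
        # Seed depends only on (seed, epoch), so the order can be rebuilt.
        random.Random(self.seed * 1_000_003 + self.epoch).shuffle(order)
        for shard_idx in order:
            yield from self.shards[shard_idx]

    def __iter__(self):
        for pos, item in enumerate(self._epoch_items()):
            if pos < self.consumed:
                continue    # a real reader would skip whole shards instead
            self.consumed = pos + 1
            yield item
        self.epoch += 1
        self.consumed = 0

    def state_dict(self):
        return {"epoch": self.epoch, "consumed": self.consumed}

    def load_state_dict(self, state):
        self.epoch = state["epoch"]
        self.consumed = state["consumed"]
```

Saving `state_dict()` next to the model checkpoint and calling `load_state_dict()` on restart lets the reader continue from the next unseen item rather than from the start of the epoch.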
What this means in practice
- Pre-pack datasets into 100 MB-1 GB shards. WebDataset (tar files), TFRecord, MosaicML StreamingDataset, and NVIDIA DALI's record formats all do this.
- For PyTorch users: torch.utils.data.IterableDataset is the standard interface for streamed data. DataLoader's shuffle=True applies only to map-style datasets and raises an error for a plain IterableDataset, so implement the window shuffle inside the dataset or use a library that provides one.
- For larger-scale: switch to MosaicML StreamingDataset or DALI for explicit shuffle radius control and resumable iteration.
- Tune shuffle radius based on convergence behavior, not theoretical purity. Most workloads converge fine at a 1M-10M item shuffle radius.
- Save data-loader state in your checkpoint so resumption is exact. This pairs with checkpoint sharding to make mid-epoch recovery trivial.
- For class-imbalanced datasets: oversample rare classes at the shard-creation stage, not at the shuffle stage. Pre-rebalanced shards mean the shuffle algorithm does not need to know about class distribution.
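Oversampling at shard-creation time can be sketched as follows. This is an in-memory illustration under stated assumptions: the records fit in RAM, rare classes are duplicated with replacement up to a target count, and the names `oversample_for_sharding` and `write_shards` are hypothetical. A production pipeline would stream records and write real shard files.

```python
import random
from collections import Counter


def oversample_for_sharding(records, label_of, target_per_class, rng):
    """Duplicate rare-class records until each class has at least the target."""
    by_class = {}
    for record in records:
        by_class.setdefault(label_of(record), []).append(record)
    out = []
    for group in by_class.values():
        out.extend(group)
        deficit = target_per_class - len(group)
        if deficit > 0:
            out.extend(rng.choices(group, k=deficit))  # sample with replacement
    rng.shuffle(out)    # spread classes evenly before cutting shards
    return out


def write_shards(records, items_per_shard):
    """Cut the rebalanced records into fixed-size shards (lists here)."""
    return [records[i:i + items_per_shard]
            for i in range(0, len(records), items_per_shard)]


records = [("cat", i) for i in range(90)] + [("lynx", i) for i in range(10)]
balanced = oversample_for_sharding(records, lambda r: r[0], 90, random.Random(0))
shards = write_shards(balanced, 30)
print(Counter(label for label, _ in balanced))
```

Because the rebalancing is baked into the shards, the streaming shuffle can stay class-agnostic.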
Streaming shuffle is one of those topics that is invisible when it works and dominates the wall-clock when it does not. Get the shard size and window right and the storage tier feeds the GPUs without anyone noticing it is there.
Updated 2026-05-10