Efficient data loading for distributed data-parallel (DDP) training.
Each MPI rank holds a shard of the full dataset in memory. DDStore exposes a global index space so that any rank can read any sample through one-sided remote memory access, using either MPI RMA (default) or libfabric RDMA, without synchronizing through a coordinator.
| Dependency | Notes |
|---|---|
| MPI (OpenMPI / MPICH) | mpicc and mpicxx must be on PATH |
| libfabric | Required for RDMA backend (method=1) |
| Python ≥ 3.6 | |
| NumPy, mpi4py, Cython | Python build dependencies |
```bash
# Install Python build dependencies
pip install numpy mpi4py Cython

# Build in place (then use with PYTHONPATH=$PWD:$PYTHONPATH)
CC=mpicc CXX=mpicxx python setup.py build_ext --inplace

# Or install into the active virtual environment
CC=mpicc CXX=mpicxx pip install .

# Or install in editable/development mode
CC=mpicc CXX=mpicxx pip install -e .

# Or install directly from GitHub
CC=mpicc CXX=mpicxx pip install git+https://github.com/ORNL/DDStore.git
```

Minimal usage:

```python
import mpi4py
# mpi4py runtime options must be set before `from mpi4py import MPI`
mpi4py.rc.thread_level = "serialized"
mpi4py.rc.threads = False

import numpy as np
from mpi4py import MPI
import pyddstore as dds

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

# Each rank contributes its own shard
store = dds.PyDDStore(comm)              # MPI RMA backend (default)
# store = dds.PyDDStore(comm, method=1)  # libfabric RDMA backend

data = np.random.rand(1024, 64).astype(np.float32)
store.add("features", data)  # collective: all ranks must call

# Read any global sample index
out = np.zeros((1, 64), dtype=np.float32)
store.epoch_begin()
# Global row 2048 is row 0 of rank 2's shard (1024 rows per rank, so >= 3 ranks are needed)
store.get("features", out, start=2048)
store.epoch_end()
print(f"rank {rank}: global row 2048 begins with {out[0, :4]}")

store.free()
```

Run with:

```bash
mpirun -n 4 python my_script.py
```

Constructor parameters for `dds.PyDDStore`:

| Parameter | Type | Description |
|---|---|---|
| `comm` | `mpi4py.MPI.Comm` | MPI communicator covering all ranks |
| `method` | `int` | `0` = MPI RMA (default), `1` = libfabric RDMA |
| `ddstore_width` | `int` or `None` | Ranks per DDStore group; `None` uses all ranks in `comm` as a single group |
`init()` pre-allocates a named variable without providing data; use `update()` to fill it in afterwards. Collective.
| Parameter | Type | Description |
|---|---|---|
| `name` | `str` | Variable identifier |
| `nrows` | `int` | Number of rows in this rank's shard |
| `disp` | `int` | Number of elements per row |
| `itemsize` | `int` | Bytes per element (default `1`) |
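When a shard is pre-allocated with `init()`, its `nrows`, `disp`, and `itemsize` follow from the shape and element size of the data that will later be written with `update()`. The helper below is a hypothetical sketch, not part of `pyddstore`; it assumes rows run along the first axis and treats a 1-D shard as one element per row.

```python
def init_args(shape, itemsize=1):
    """Hypothetical helper: (nrows, disp, itemsize) for init() from a shard shape.

    shape    -- shape of this rank's shard; rows run along axis 0
    itemsize -- bytes per element, e.g. numpy's arr.dtype.itemsize
    """
    shape = tuple(shape)
    if len(shape) == 1:
        nrows, disp = shape[0], 1  # 1-D shard: assumed one element per row
    elif len(shape) == 2:
        nrows, disp = shape        # 2-D shard: disp = elements (columns) per row
    else:
        raise ValueError("expected a 1-D or 2-D shard shape, got %r" % (shape,))
    if itemsize <= 0:
        raise ValueError("itemsize must be positive")
    return nrows, disp, itemsize
```

For a `float32` shard of shape `(1024, 64)`, `init_args(arr.shape, arr.dtype.itemsize)` gives `(1024, 64, 4)`.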
`add()` registers a NumPy array as a named variable. Each rank contributes its local shard, and the global index space is the concatenation of all shards in rank order. Collective: all ranks in `comm` must call it with the same `name`.
| Parameter | Type | Description |
|---|---|---|
| `name` | `str` | Variable identifier |
| `arr` | `np.ndarray` | C-contiguous 2-D (or 1-D) array. Supported dtypes: `int32`, `int64`, `uint8`, `float32`, `float64`, `bool_` |
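Because the global index space is the concatenation of shards in rank order, a global row index maps to its owning rank and local row through a prefix sum and a binary search. This is a hypothetical sketch of that arithmetic, not DDStore's internal code; the `shard_sizes` input (rows per rank, in rank order) is an assumption, and each rank could obtain it with, for example, mpi4py's `comm.allgather(local_nrows)`.

```python
from bisect import bisect_right
from itertools import accumulate


def shard_offsets(shard_sizes):
    """Global index of the first row of each rank's shard (exclusive prefix sum)."""
    return [0] + list(accumulate(shard_sizes))[:-1]


def locate(global_index, shard_sizes):
    """Map a global row index to (owning rank, row within that rank's shard)."""
    total = sum(shard_sizes)
    if not 0 <= global_index < total:
        raise IndexError(f"global index {global_index} out of range [0, {total})")
    offsets = shard_offsets(shard_sizes)
    # Rightmost shard whose first row is <= global_index; bisect_right also
    # skips over ranks whose shard is empty.
    rank = bisect_right(offsets, global_index) - 1
    return rank, global_index - offsets[rank]
```

With four ranks of 1024 rows each, `locate(2048, [1024] * 4)` returns `(2, 0)`: the first row of rank 2's shard.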
`update()` overwrites a region of the local shard of a variable pre-allocated with `init()`. It is a local operation and requires neither an epoch nor a barrier.
| Parameter | Type | Description |
|---|---|---|
| `name` | `str` | Variable identifier |
| `arr` | `np.ndarray` | Data to write |
| `offset` | `int` | Row offset within the local shard |
`get()` reads `arr.shape[0]` consecutive rows, starting at global index `start`, into `arr`. The range must fall within a single rank's shard. With the MPI backend (`method=0`), calls must be placed inside an `epoch_begin()` / `epoch_end()` pair.
| Parameter | Type | Description |
|---|---|---|
| `name` | `str` | Variable identifier |
| `arr` | `np.ndarray` | Pre-allocated, C-contiguous output buffer |
| `start` | `int` | Global row index of the first row to read |
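Since each `get()` call reads a run of consecutive rows that must lie inside one rank's shard, a minibatch of arbitrary global indices has to be broken into such runs before reading. The sketch below shows one hypothetical way to plan those calls: sort and deduplicate the indices, coalesce consecutive ones, and cut runs at shard boundaries. The function name and its `shard_sizes` input are assumptions, not part of `pyddstore`, and the indices are assumed to be valid global indices.

```python
from bisect import bisect_right
from itertools import accumulate


def plan_reads(indices, shard_sizes):
    """Group global row indices into (start, count) runs for get().

    Each run covers consecutive global rows that all belong to one rank's
    shard, so it could be fetched with one get() into a `count`-row buffer.
    """
    # One-past-the-end global index of each shard (inclusive prefix sum).
    shard_ends = list(accumulate(shard_sizes))

    def owner(i):
        # Index of the shard that contains global row i.
        return bisect_right(shard_ends, i)

    runs = []
    for idx in sorted(set(indices)):
        if runs:
            start, count = runs[-1]
            if idx == start + count and owner(idx) == owner(start):
                runs[-1] = (start, count + 1)  # extend the current run
                continue
        runs.append((idx, 1))  # start a new run
    return runs
```

For example, `plan_reads([1, 2, 3, 4, 5, 7], [4, 4])` yields `[(1, 3), (4, 2), (7, 1)]`: rows 3 and 4 are adjacent but live on different ranks, so they end up in separate runs. Each run would then be read with `store.get(name, buf, start=start)` where `buf` has `count` rows; because duplicates are dropped, the caller has to map results back to the original batch order.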
`epoch_begin()` / `epoch_end()` open and close an MPI RMA access epoch (via `MPI_Win_fence`). Both are collective. They are required around `get()` calls when `method=0` and are no-ops when `method=1`.
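Because an epoch opened with `epoch_begin()` must always be closed, the two calls can be paired in a context manager so that `epoch_end()` still runs if a read raises. This is a hypothetical convenience wrapper, not part of `pyddstore`; it only assumes the store object exposes `epoch_begin()` and `epoch_end()` as described above. Both calls remain collective, so every rank must still enter and leave the block the same number of times.

```python
from contextlib import contextmanager


@contextmanager
def access_epoch(store):
    """Hypothetical wrapper: open an access epoch and always close it on exit."""
    store.epoch_begin()  # collective: every rank must enter together
    try:
        yield store
    finally:
        store.epoch_end()  # collective: runs even if the body raises
```

Inside a `with access_epoch(store):` block, `get()` calls run within the epoch, and the epoch is closed on exit whether or not the block completes normally.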
`free()` releases all MPI windows and allocated memory. It is safe to call after `MPI_Finalize`.
MPI RMA backend (`method=0`, default): uses `MPI_Win_create` and `MPI_Get` for one-sided remote reads. It works on any MPI-capable cluster without additional hardware. `epoch_begin()` / `epoch_end()` are required to delimit access epochs.
libfabric RDMA backend (`method=1`): uses `fi_read` for true RDMA transfers over high-speed interconnects (InfiniBand/verbs, Cray GNI, Intel PSM2), with lower latency than MPI RMA on supported hardware. `epoch_begin()` / `epoch_end()` are no-ops with this backend.
Set `FABRIC_IFACE` to select a specific network interface when automatic selection picks the wrong one:

```bash
export FABRIC_IFACE=hsn0  # e.g. Cray Slingshot
```

`ddstore_width` controls how many MPI ranks form a single DDStore group. The global communicator is split so that each block of `ddstore_width` consecutive ranks shares one independent store; each group holds a full replica of the dataset, partitioned across its members.
Example: 16 ranks, `ddstore_width=4`:

```text
ranks  0–3  → DDStore group 0
ranks  4–7  → DDStore group 1
ranks  8–11 → DDStore group 2
ranks 12–15 → DDStore group 3
```
This is useful when you want one store per node (e.g. 4 GPUs per node → `ddstore_width=4`): cross-node RDMA traffic is then limited to the dataset replication step at startup instead of occurring on every sample fetch.
```python
store = dds.PyDDStore(comm, ddstore_width=4)  # e.g. 4 GPUs per store
```

If `ddstore_width` is omitted, all ranks in `comm` form a single store.
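The grouping in the 16-rank example is plain integer arithmetic on the rank: group = rank // `ddstore_width`, with the remainder giving the rank's position inside its group. The helper below is a hypothetical sketch that reproduces that layout; it is not DDStore's code, and it assumes the world size is a multiple of `ddstore_width` (how DDStore handles a remainder is not covered here).

```python
def ddstore_group(rank, world_size, ddstore_width=None):
    """Hypothetical helper: (group id, rank within group) for a given rank.

    Mirrors the layout in the example above: each block of `ddstore_width`
    consecutive ranks forms one group; None means one group of all ranks.
    Assumes world_size is a multiple of the width.
    """
    width = world_size if ddstore_width is None else ddstore_width
    if width <= 0:
        raise ValueError("ddstore_width must be positive")
    return rank // width, rank % width
```

With mpi4py, the two values could serve as the `color` and `key` of `comm.Split`, e.g. `comm.Split(*ddstore_group(rank, comm.Get_size(), 4))`, to build an equivalent per-group communicator by hand; passing `ddstore_width` to `PyDDStore` already performs the split.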
See examples/vae/distdataset.py for a torch.utils.data.Dataset wrapper and examples/vae/vae-ddp.py for a full DDP training example.
```bash
mpirun -n 4 python examples/vae/vae-ddp.py
```

Run the tests with:

```bash
# Basic functional test (MPI RMA)
mpirun -n 4 python test/demo.py

# Integration test with PyTorch DDP
mpirun -n 4 python test/test.py
```

Optional arguments for `test/demo.py` and `test/test.py`:
| Flag | Default | Description |
|---|---|---|
| `--num` | `1048576` | Rows per rank |
| `--dim` | `64` | Elements per row |
| `--nbatch` | `32` | Number of random reads |
If you use DDStore in your research, please cite:
```bibtex
@inproceedings{choi2023ddstore,
  title={DDStore: Distributed data store for scalable training of graph neural networks on large atomistic modeling datasets},
  author={Choi, Jong Youl and Lupo Pasini, Massimiliano and Zhang, Pei and Mehta, Kshitij and Liu, Frank and Bae, Jonghyun and Ibrahim, Khaled},
  booktitle={Proceedings of the SC'23 Workshops of the International Conference on High Performance Computing, Network, Storage, and Analysis},
  pages={941--950},
  year={2023}
}
```

See `LICENSE`.