MPI Distribute Domain-first parallel workflow (distribute)
Choosing an MPI distribution strategy
Six functions and function-pairs are available for distributing a domain across MPI ranks. The right choice depends on mesh size and whether initial conditions need to be stored in the partition files.
Function |
Rank-0 peak memory |
When to use |
|---|---|---|
|
Full domain + quantities |
Default choice. Rank 0 builds the complete |
|
Full domain + quantities (shared) |
Same interface as |
|
Topology only |
Rank 0 builds a lightweight |
|
Topology only (shared) |
Like |
Full domain + quantities (preprocessing only) |
Offline two-phase workflow: build and partition the full domain once (including quantities), write one set of pickle + npy files per rank, then load and evolve. Nothing is held on rank 0 at runtime. See Offline domain partitioning (sequential_distribute_dump / sequential_distribute_load). |
|
Topology only (preprocessing only) |
Offline two-phase workflow: partition the mesh topology only (no quantities), write one NetCDF4 file per rank, then load and set quantities per-rank at runtime. Smallest files; ideal for very large meshes or ensemble runs. See Offline mesh partitioning (sequential_mesh_dump / sequential_mesh_load). |
Decision guide
Does rank 0 have enough RAM to hold the full Domain (mesh + quantities)?
│
├─ Yes ──► Run immediately (no preprocessing)?
│ ├─ Yes ──► Do multiple ranks share the same node?
│ │ ├─ No ──► distribute()
│ │ └─ Yes ──► distribute_collaborative()
│ └─ No ──► Preprocess offline with quantities:
│ sequential_distribute_dump / sequential_distribute_load
│ (see use_sequential_domain_io)
│
└─ No ──► Does rank 0 have enough RAM for topology only (no quantities)?
├─ Yes ──► Do multiple ranks share the same node?
│ ├─ No ──► distribute_basic_mesh()
│ └─ Yes ──► distribute_basic_mesh_collaborative()
└─ No ──► Preprocess offline (mesh only):
sequential_mesh_dump / sequential_mesh_load
(see use_sequential_mesh_io)
As a rough guide, a mesh of N triangles with P quantities requires
approximately 8 × N × P bytes for quantity arrays alone on rank 0
(double precision). Topology (coordinates + connectivity) is
~56 × N bytes.
All six approaches differ in how much memory and work rank 0 must do before the evolve loop begins.
Feature |
|
|
|
|
|
|---|---|---|---|---|---|
Mesh built on rank 0 |
Full |
Full |
|
Preprocessing only (offline, no MPI) |
Preprocessing only (offline, no MPI) |
Quantities set |
On rank 0, then distributed |
On rank 0, then distributed |
Per-rank after distribution |
Stored in files; loaded per-rank |
Per-rank after load |
Topology broadcast |
Point-to-point per rank |
Shared memory per node |
Point-to-point / shared memory |
Read from disk per rank |
Read from disk per rank |
Peak rank-0 memory at runtime |
O(N) mesh + O(N) quantities |
O(N) mesh + O(N) quantities |
O(N) mesh only |
Per-rank partition only |
Per-rank partition only |
Partition file format |
— |
— |
— |
Pickle + npy arrays |
NetCDF4 |
Best for |
Small–medium meshes, simple scripts |
Many ranks per node, large meshes |
Very large meshes; quantities set from functions |
Offline partitioning with quantities included (fast restart) |
Very large meshes or ensemble runs; quantities set per-rank |
distribute
The simplest workflow. Rank 0 builds a full Domain (mesh and
quantities), then distribute partitions it and sends each rank its
submesh. All ranks can set boundary conditions and operators after the
call.
import anuga
if anuga.myid == 0:
domain = anuga.rectangular_cross_domain(1000, 1000, len1=10.0, len2=10.0)
domain.set_quantity('elevation', lambda x, y: x / 10.0)
domain.set_quantity('stage', expression='elevation + 0.2')
else:
domain = None
# Rank 0 partitions; all ranks receive their submesh
domain = anuga.distribute(domain)
Br = anuga.Reflective_boundary(domain)
domain.set_boundary({'left': Br, 'right': Br, 'top': Br, 'bottom': Br})
rain = anuga.Rate_operator(domain, rate=lambda t: 0.001, factor=1.0)
for t in domain.evolve(yieldstep=1.0, finaltime=10.0):
if anuga.myid == 0:
domain.print_timestepping_statistics()
domain.sww_merge()
anuga.finalize()
The main limitation is that rank 0 must hold the complete mesh and all quantity arrays before any MPI communication starts. For very large meshes this can exhaust available memory on rank 0.
distribute_collaborative
A drop-in replacement for distribute that reduces peak memory use and
inter-rank data movement. The API is identical — rank 0 still builds a
full Domain with quantities set — but the distribution is done
differently:
Rank 0 partitions the mesh (METIS / Morton / Hilbert).
The full mesh topology is broadcast using
MPI.Win.Allocate_shared: one physical copy per compute node, shared by all ranks on that node. Between nodes a singleBcastamong node leaders transfers the data.Each rank independently builds its own ghost layer using a Cython BFS kernel rather than receiving it from rank 0.
Quantities are distributed with
MPI_Scatterv(each rank receives only its O(N/P) full-triangle rows) plus targetedMPI_Isendfor ghost triangles.
The result is that topology is stored only once per node (not once per rank), and quantity communication scales as O(N/P) rather than O(N).
import anuga
from anuga.parallel.parallel_api import distribute_collaborative
if anuga.myid == 0:
domain = anuga.rectangular_cross_domain(1000, 1000, len1=10.0, len2=10.0)
domain.set_quantity('elevation', lambda x, y: x / 10.0)
domain.set_quantity('stage', expression='elevation + 0.2')
else:
domain = None
# Shared-memory broadcast + per-rank BFS ghost layer
domain = distribute_collaborative(domain)
Br = anuga.Reflective_boundary(domain)
domain.set_boundary({'left': Br, 'right': Br, 'top': Br, 'bottom': Br})
for t in domain.evolve(yieldstep=1.0, finaltime=10.0):
if anuga.myid == 0:
domain.print_timestepping_statistics()
domain.sww_merge()
anuga.finalize()
Use distribute_collaborative when you have many MPI ranks sharing the
same node (e.g. 32–64 ranks on a large-memory node) or when the mesh is
large enough that topology duplication per rank becomes a concern.
Note
distribute_collaborative still requires rank 0 to allocate the full
quantity arrays before distribution. If that alone exceeds available
memory, use distribute_basic_mesh or
distribute_basic_mesh_collaborative instead.
Mesh-first workflows
For very large meshes where even allocating the full quantity arrays on rank 0 is impractical, two mesh-first strategies are available:
Online (MPI Distribute Mesh-first parallel workflow (distribute_basic_mesh)) — rank 0 builds a
Basic_meshat runtime, distributes topology to all ranks, and each rank sets its own quantities. Rank 0 still needs enough RAM for the full mesh topology.Offline (Offline mesh partitioning (sequential_mesh_dump / sequential_mesh_load)) — partition the mesh in a separate preprocessing step (no MPI required, can use a large-memory workstation or login node), write one NetCDF4 file per rank, then each rank reads only its own file at simulation time. Nothing beyond a per-rank partition is ever resident in memory. This is also the recommended approach for ensemble / scenario runs on the same mesh.
The online approach via distribute_basic_mesh() is described below.
import anuga
from anuga.abstract_2d_finite_volumes.basic_mesh import rectangular_cross_basic_mesh
from anuga.parallel.parallel_api import distribute_basic_mesh
if anuga.myid == 0:
bm = rectangular_cross_basic_mesh(1000, 1000, len1=10.0, len2=10.0)
else:
bm = None
# Only mesh topology is distributed; quantities are never on rank 0
domain = distribute_basic_mesh(bm)
domain.set_quantity('elevation', lambda x, y: x / 10.0)
domain.set_quantity('stage', expression='elevation + 0.2')
Br = anuga.Reflective_boundary(domain)
domain.set_boundary({'left': Br, 'right': Br, 'top': Br, 'bottom': Br})
for t in domain.evolve(yieldstep=1.0, finaltime=10.0):
if anuga.myid == 0:
domain.print_timestepping_statistics()
domain.sww_merge()
anuga.finalize()
See MPI Distribute Mesh-first parallel workflow (distribute_basic_mesh) for full details, including the
shared-memory variant distribute_basic_mesh_collaborative.
Running parallel scripts
Use mpiexec (or mpirun) to launch the script. For example, to
run on 8 MPI processes:
mpiexec -np 8 python -u run_model.py
The -u flag gives unbuffered output so that print statements from
all ranks appear in real time.
Example scripts are in examples/parallel/:
Script |
Demonstrates |
|---|---|
|
|
|
|
|
|
|
|
Merging parallel SWW files
At the end of a parallel run each MPI rank writes its own per-rank SWW
file. domain.sww_merge() (called from rank 0) reassembles them into
a single global SWW file.
By default the merge reads all timesteps from every per-rank file at once, which is the fastest approach but requires enough RAM to hold the full merged time series for each dynamic quantity:
domain.sww_merge(delete_old=True) # default: all timesteps in RAM
For large simulations where the full time series does not fit in RAM, use
the chunk_size parameter to process only N timesteps at a time.
The merge will make multiple passes through the input files — more I/O,
but peak memory is bounded to approximately
chunk_size × n_global_nodes × 4 bytes per dynamic quantity:
# Process 50 timesteps per pass — reduces peak RAM substantially
domain.sww_merge(delete_old=True, chunk_size=50)
As a rough sizing guide:
Global nodes (smooth) / vertices (non-smooth) |
chunk_size=50 peak RAM (MB) |
chunk_size=200 peak RAM (MB) |
|---|---|---|
100 000 |
~20 MB |
~80 MB |
1 000 000 |
~200 MB |
~800 MB |
10 000 000 |
~2 GB |
~8 GB |
(figures are per dynamic quantity and assume float32 storage)
You can also call sww_merge_parallel directly from post-processing
scripts without a running domain:
from anuga.utilities.sww_merge import sww_merge_parallel
sww_merge_parallel('cairns', np=8, delete_old=True, chunk_size=100)
Choosing the number of processes
There is a practical limit to how many MPI ranks benefit a simulation. As a rough guide, aim for at least 1000–2000 triangles per rank after partitioning. Below that threshold, the overhead of ghost-cell communication and MPI synchronisation outweighs the computational saving. Some experimentation for your specific mesh and hardware is always worthwhile.
Load balance monitoring
METIS partitions the mesh so that each rank owns roughly the same number of triangles. However, in inundation simulations the computational cost per triangle depends on whether it is wet or dry:
Wet triangle: full flux computation, extrapolation, friction update.
Dry triangle: flux computation is skipped (
optimise_dry_cells); far cheaper.
As the inundation front advances, some ranks end up owning mostly wet
triangles while others own mostly dry coastal-plain triangles. The dry
ranks finish their compute phase early and then sit idle in the
MPI_Allreduce timestep-synchronisation barrier waiting for the wet
ranks. This idle time appears in domain.communication_reduce_time and
is a direct measure of load imbalance caused by the wet/dry distribution.
load_balance_statistics and print_load_balance_statistics gather
these per-rank numbers via MPI_Allgather and report them on rank 0.
for t in domain.evolve(yieldstep=60.0, finaltime=3600.0):
if anuga.myid == 0:
domain.print_timestepping_statistics()
domain.print_load_balance_statistics() # all ranks participate
Example output for a 2-rank inundation run where rank 0 owns the wet half and rank 1 owns the dry coastal floodplain:
Load balance statistics=========================================
Rank n_full n_ghost wet% ghost% compute(s) comm(s) Allreduce_wait(s)
------------------------------------------------------------------------
0 19999 145 75.0 0.7 1.42 0.016 0.008
1 20001 142 0.0 0.7 0.63 0.016 0.792
================================================================
Imbalance ratio (max/mean compute): 1.38
Pearson r(wet_fraction, compute_time): +1.000
Interpretation: wetter ranks do more work (|r| = 1.000)
Key fields:
Field |
Meaning |
|---|---|
|
Percentage of owned (full) triangles with depth >
|
|
Halo triangles as a fraction of all triangles. A value above ~15% suggests the subdomain boundary is too large relative to the interior — consider a different partition or fewer ranks. |
|
Wall time minus all communication time. The limiting rank sets the pace of the whole simulation. |
|
Time spent idle in the timestep-sync barrier. Large values on a rank indicate it arrives early because it has little wet work to do. This is wasted wall time. |
|
|
|
Correlation between wet fraction and compute time across ranks. Values near +1 confirm that wet/dry distribution is the dominant source of imbalance (as opposed to, for example, ghost-cell overhead or structure operator costs). |
The method can also be called programmatically:
stats = domain.load_balance_statistics()
# stats is a dict of numpy arrays (length numproc) on rank 0; None on others
if anuga.myid == 0:
print(f"Imbalance ratio: {stats['imbalance_ratio']:.2f}")
print(f"Wet fractions: {stats['wet_fraction']}")
What to do about imbalance
Static METIS decomposition cannot know in advance how the inundation front will evolve, so some wet/dry imbalance is expected for most real scenarios. Practical mitigations:
Use fewer ranks for mostly-dry domains. If most of the domain is dry for most of the simulation, MPI overhead may not be worthwhile.
Weight the METIS partition towards expected wet regions. If you know in advance (from a coarse serial run or bathymetry) which regions will remain wet, you can provide per-triangle weights to the partitioner so that wet triangles are counted more heavily.
Accept the imbalance and use the saved time differently. If ranks are idle, consider adding more output or operator work on those ranks rather than spinning in the barrier.
Dynamic repartitioning (future work). Repartitioning the mesh mid-run as the wet front advances is the algorithmically correct solution but requires transferring all quantity arrays between ranks and is not yet implemented in ANUGA.