"""
Finite-volume computations of the shallow water wave equation.
Title: ANGUA shallow_water_domain - 2D triangular domains for finite-volume
computations of the shallow water wave equation.
Author: Ole Nielsen, Ole.Nielsen@ga.gov.au
Stephen Roberts, Stephen.Roberts@anu.edu.au
Duncan Gray, Duncan.Gray@ga.gov.au
Gareth Davies, gareth.davies.ga.code@gmail.com
CreationDate: 2004
Description:
This module contains a specialisation of class Generic_Domain from
module generic_domain.py consisting of methods specific to the
Shallow Water Wave Equation
U_t + E_x + G_y = S
where
U = [w, uh, vh]
E = [uh, u^2h + gh^2/2, uvh]
G = [vh, uvh, v^2h + gh^2/2]
S represents source terms forcing the system
(e.g. gravity, friction, wind stress, ...)
and _t, _x, _y denote the derivative with respect to t, x and y
respectively.
The quantities are
symbol variable name explanation
x x horizontal distance from origin [m]
y y vertical distance from origin [m]
z elevation elevation of bed on which flow is modelled [m]
h height water height above z [m]
w stage absolute water level, w = z+h [m]
u speed in the x direction [m/s]
v speed in the y direction [m/s]
uh xmomentum momentum in the x direction [m^2/s]
vh ymomentum momentum in the y direction [m^2/s]
eta mannings friction coefficient [to appear]
nu wind stress coefficient [to appear]
The conserved quantities are w, uh, vh
Reference:
Catastrophic Collapse of Water Supply Reservoirs in Urban Areas,
Christopher Zoppou and Stephen Roberts,
Journal of Hydraulic Engineering, vol. 127, No. 7 July 1999
Hydrodynamic modelling of coastal inundation.
Nielsen, O., S. Roberts, D. Gray, A. McPherson and A. Hitchman
In Zerger, A. and Argent, R.M. (eds) MODSIM 2005 International Congress on
Modelling and Simulation. Modelling and Simulation Society of Australia and
New Zealand, December 2005, pp. 518-523. ISBN: 0-9758400-2-9.
http://www.mssanz.org.au/modsim05/papers/nielsen.pdf
See also: https://anuga.anu.edu.au and https://en.wikipedia.org/wiki/ANUGA_Hydro
Constraints: See license in the user guide
"""
from __future__ import annotations
# Decorator added for profiling
#------------------------------
def profileit(name):
def inner(func):
def wrapper(*args, **kwargs):
import cProfile
prof = cProfile.Profile()
retval = prof.runcall(func, *args, **kwargs)
# Note use of name from outer scope
print(str(args[1])+'_'+name)
prof.dump_stats(str(args[1])+'_'+name)
return retval
return wrapper
return inner
#-----------------------------
import numpy as num
import sys
import os
import time
from typing import TYPE_CHECKING
from collections.abc import Callable, Iterator
from numpy.typing import ArrayLike
if TYPE_CHECKING:
from datetime import datetime as DateTime
from zoneinfo import ZoneInfo as ZoneInfoType
try:
from zoneinfo import ZoneInfo
except ImportError:
from backports.zoneinfo import ZoneInfo
try:
import dill as pickle
except ImportError:
import pickle
from anuga.abstract_2d_finite_volumes.generic_domain \
import Generic_Domain
from anuga.config import MULTIPROCESSOR_OPENMP, MULTIPROCESSOR_GPU
from anuga.config import LOW_FROUDE_OFF, LOW_FROUDE_1, LOW_FROUDE_2
from anuga.shallow_water.forcing import Cross_section
from anuga.utilities.numerical_tools import mean
from anuga.file.sww import SWW_file
import anuga.utilities.log as log
from anuga.utilities.parallel_abstraction import size, rank, get_processor_name
from anuga.utilities.parallel_abstraction import finalize, send, receive
from anuga.utilities.parallel_abstraction import pypar_available, barrier
#-----------------------------------------------------
# Code for profiling cuda version
#-----------------------------------------------------
def nvtxRangePush(*arg):
pass
def nvtxRangePop(*arg):
pass
try:
from cupy.cuda.nvtx import RangePush as nvtxRangePush
from cupy.cuda.nvtx import RangePop as nvtxRangePop
except ImportError:
pass
try:
from nvtx import range_push as nvtxRangePush
from nvtx import range_pop as nvtxRangePop
except ImportError:
pass
#-----------------------------------------------------
# Process-global GPU offload control
#
# Whether mode 2 ('unified') offloads to a GPU is a *process-level* OpenMP
# setting (the target-offload runtime ICV), not a per-domain property: a single
# process cannot run one domain on the GPU and another on the CPU with the same
# unified kernels. These module functions own that process-wide decision; the
# per-domain choice (legacy vs unified) lives on Domain.set_compute_mode().
#-----------------------------------------------------
def _gpu_ext_or_none():
try:
from anuga.shallow_water import sw_domain_gpu_ext as gpu_ext
return gpu_ext
except Exception:
return None
def gpu_offload_supported() -> bool:
"""True if this build/run can offload mode 2 to a GPU device.
Reflects build + hardware: False for a CPU-only build (``gpu_offload=false`` /
``CPU_ONLY_MODE`` — the standard pip/conda install), when no device is
present, or when offload was disabled at launch via
``OMP_TARGET_OFFLOAD=disabled`` (``gpu_available()`` covers all three). This
is a static capability and is *not* affected by :func:`set_gpu_offload`.
"""
ge = _gpu_ext_or_none()
return bool(ge.gpu_available()) if ge is not None else False
def gpu_offload_enabled() -> bool:
"""Return the resolved process-global offload state for mode 2 ('unified').
True only when the build supports offload *and* it has not been switched off
via :func:`set_gpu_offload`. Always False on a CPU-only build.
"""
if not gpu_offload_supported():
return False
ge = _gpu_ext_or_none()
return bool(ge.get_offload_enabled()) if ge is not None else False
def set_gpu_offload(enable: bool = True, verbose: bool = True) -> bool:
"""Enable or disable GPU offload for mode-2 ('unified') domains, process-wide.
This is a *process-level* switch, not per-domain: it affects every domain
that runs in 'unified' mode, because OpenMP target offload is a process-wide
runtime setting (one process cannot run the same unified kernels on a GPU for
one domain and on the CPU for another).
The setting is honoured at GPU-domain init, where arrays are mapped to the
chosen device. **Call it before building the first 'unified' domain** — once
a domain's data is mapped to a device, switching this domain's offload would
leave data and execution on different devices.
Implemented via the OpenMP default-device ICV (``omp_set_default_device`` /
a process-global flag in ``sw_domain_gpu_ext``) — robust and re-enableable,
unlike mutating ``OMP_TARGET_OFFLOAD`` after the runtime has initialised.
Parameters
----------
enable : bool
True to offload 'unified' domains to a GPU (GPU build + device required);
False to force 'unified' to run CPU-multicore.
verbose : bool
Print a one-line confirmation.
Returns
-------
bool
The resolved offload state (:func:`gpu_offload_enabled`). Requesting
``enable=True`` on a build without offload support warns and returns
False — never hard-fails.
"""
import warnings
ge = _gpu_ext_or_none()
was_supported = gpu_offload_supported()
if enable:
# Clear any prior disable BEFORE checking capability, so re-enabling is
# not blocked by our own OMP_TARGET_OFFLOAD=disabled.
os.environ.pop('OMP_TARGET_OFFLOAD', None)
if not gpu_offload_supported():
warnings.warn(
"set_gpu_offload(True): this ANUGA build has no GPU offload support "
"(built with gpu_offload=false) or no device is present; 'unified' "
"domains will run on CPU multicore. Rebuild with -Dgpu_offload=true "
"and a GPU-capable compiler to enable offload.",
stacklevel=2)
enable = False
elif was_supported:
# Disabling offload on a GPU build. This forces the unified kernels onto
# the host, but a GPU build (nvc -mp=gpu,multicore) runs the `omp target`
# regions on the host through a slow fallback that does NOT scale with
# threads — so this is for correctness A/B (GPU vs CPU give the same
# results), NOT for performance. For fast CPU multicore, use a
# gpu_offload=false (gcc) build instead.
try:
from anuga import myid
except Exception:
myid = 0
if myid == 0:
warnings.warn(
"set_gpu_offload(False) on a GPU build: 'unified' will run on the "
"host, but the nvc GPU build's host fallback is slow and does not "
"scale with threads (use it for correctness checks, not timing). "
"For fast CPU multicore, build with -Dgpu_offload=false.",
stacklevel=2)
if not enable:
# OMP_TARGET_OFFLOAD=disabled is what actually keeps the `omp target`
# regions (solver AND operators) on the host. The default-device flag
# alone does NOT stop the solver offloading, so it must be set here.
# The OpenMP runtime reads this at its first target region, so call
# set_gpu_offload() before the first evolve()/domain build.
os.environ['OMP_TARGET_OFFLOAD'] = 'disabled'
# Keep the C-side flag consistent so gpu_domain_init picks the host device
# and the inlet/culvert operators route there too (gpu_compute_device).
if ge is not None:
ge.set_offload_enabled(bool(enable))
state = bool(enable)
if verbose:
print(f"GPU offload {'enabled' if state else 'disabled'} "
f"(process-wide; 'unified' domains run on {'GPU' if state else 'CPU multicore'})")
return state
def set_omp_num_threads(omp_num_threads: int | None = None, verbose: bool = True) -> int:
"""Set the OpenMP thread count for ANUGA kernels (process-wide).
``OMP_NUM_THREADS`` / ``omp_set_num_threads`` controls the whole process, so
this is a module-level setting, not per-domain — it affects every domain's
OpenMP regions (both the legacy ``sw_domain_openmp_ext`` solver and the
unified ``gpu_ext`` kernels). ``Domain.set_omp_num_threads`` delegates here.
Parameters
----------
omp_num_threads : int or None
Thread count. If None, use ``OMP_NUM_THREADS`` from the environment,
defaulting to 1 when unset.
verbose : bool
Print a one-line confirmation.
Returns
-------
int
The thread count applied.
"""
if omp_num_threads is None:
omp_num_threads = os.environ.get('OMP_NUM_THREADS', None)
if verbose:
print(f'Using OMP_NUM_THREADS from environment: {omp_num_threads}')
if omp_num_threads is None:
omp_num_threads = 1 # Default to 1 if not set
try:
omp_num_threads = int(omp_num_threads)
except (ValueError, TypeError):
raise ValueError('OMP_NUM_THREADS must be an integer')
# omp_set_num_threads is process-global: one call covers every OpenMP region
# in the process, so routing through the legacy extension also sets the
# thread count for the unified gpu_ext kernels.
from .sw_domain_openmp_ext import set_omp_num_threads as set_omp_num_threads_ext
set_omp_num_threads_ext(omp_num_threads)
# Keep the env var consistent so banners / introspection / any subprocess
# report the same count (the runtime ICV is already set above).
os.environ['OMP_NUM_THREADS'] = str(omp_num_threads)
if verbose:
print(f'Setting omp_num_threads to {omp_num_threads}')
return omp_num_threads
[docs]
class Domain(Generic_Domain):
"""Object which encapulates the shallow water model
This class is a specialization of class Generic_Domain from
module generic_domain.py consisting of methods specific to the
Shallow Water Wave Equation
Shallow Water Wave Equation
.. math::
U_t + E_x + G_y = S
where
.. math::
U = [w, uh, vh]^T
.. math::
E = [uh, u^2h + gh^2/2, uvh]
.. math::
G = [vh, uvh, v^2h + gh^2/2]
S represents source terms forcing the system
(e.g. gravity, friction, wind stress, ...)
and _t, _x, _y denote the derivative with respect to t, x and y
respectively.
The quantities are
.. list-table::
:widths: 25 25 50
:header-rows: 1
* - symbol
- variable name
- explanation
* - x
- x
- horizontal distance from origin [m]
* - y
- y
- vertical distance from origin [m]
* - z
- elevation
- elevation of bed on which flow is modelled [m]
* - h
- height
- water height above z [m]
* - w
- stage
- absolute water level, w = z+h [m]
* - u
-
- speed in the x direction [m/s]
* - v
-
- speed in the y direction [m/s]
* - uh
- xmomentum
- momentum in the x direction [m^2/s]
* - vh
- ymomentum
- momentum in the y direction [m^2/s]
* -
-
-
* - eta
-
- mannings friction coefficient [to appear]
* - nu
-
- wind stress coefficient [to appear]
The conserved quantities are w, uh, vh
"""
[docs]
def __init__(self,
coordinates: ArrayLike | str | None = None,
vertices: ArrayLike | None = None,
boundary: dict | None = None,
tagged_elements: dict | None = None,
geo_reference=None,
use_inscribed_circle: bool = False,
mesh_filename: str | None = None,
use_cache: bool = False,
verbose: bool = False,
conserved_quantities: list[str] | None = None,
evolved_quantities: list[str] | None = None,
other_quantities: list[str] | None = None,
full_send_dict: dict | None = None,
ghost_recv_dict: dict | None = None,
starttime: float = 0,
processor: int = 0,
numproc: int = 1,
number_of_full_nodes: int | None = None,
number_of_full_triangles: int | None = None,
ghost_layer_width: int = 2,
**kwargs) -> None:
"""Instantiate a shallow water domain.
:param coordinates: vertex locations for the mesh
:param vertices: vertex indices defining the triangles of the mesh
:param boundary: boundaries of the mesh
"""
# Define quantities for the shallow_water domain
if conserved_quantities is None:
conserved_quantities = ['stage', 'xmomentum', 'ymomentum']
if evolved_quantities is None:
evolved_quantities = ['stage', 'xmomentum', 'ymomentum']
if other_quantities is None:
other_quantities = ['elevation', 'friction', 'height',
'xvelocity', 'yvelocity', 'x', 'y']
# Selective array allocation per quantity type.
# Quantities not listed default to 'evolved' (all arrays) for
# backward compatibility with user-defined quantities.
self._quantity_type_map = {
'stage': 'evolved',
'xmomentum': 'evolved',
'ymomentum': 'evolved',
# elevation: centroid + edge only; gradients lazy (erosion operators
# trigger allocation via compute_local_gradients when needed)
'elevation': 'edge_diagnostic',
'friction': 'centroid_only',
'height': 'edge_diagnostic',
'xvelocity': 'edge_diagnostic',
'yvelocity': 'edge_diagnostic',
'x': 'coordinate',
'y': 'coordinate',
}
Generic_Domain.__init__(self,
coordinates,
vertices,
boundary,
conserved_quantities,
evolved_quantities,
other_quantities,
tagged_elements,
geo_reference,
use_inscribed_circle,
mesh_filename,
use_cache,
verbose,
full_send_dict,
ghost_recv_dict,
starttime,
processor,
numproc,
number_of_full_nodes=number_of_full_nodes,
number_of_full_triangles=number_of_full_triangles,
ghost_layer_width=ghost_layer_width)
#-------------------------------
# Operator Data Structures
#-------------------------------
self.fractional_step_operators = []
self.kv_operator = None
self.max_quantities_operator = None
self.dplotter = None
self.gpu_culvert_manager = None # Initialized when GPU mode + Boyd operators
#-------------------------------
# Set flow defaults
#-------------------------------
self.set_flow_algorithm()
#-------------------------------
# Set default multiprocessor mode
# 1. Openmp
# 2. Cuda
#-------------------------------
self.gpu_interface = None
self.use_c_rk_loop = True # Use C RK loop (faster) vs Python-orchestrated GPU loop
# Default compute mode: 'legacy' (mode 1). Set ANUGA_DEFAULT_COMPUTE_MODE=unified
# to default new domains to mode 2 (the migration target); the device interface
# is then built lazily at first evolve() so construction needs no boundaries.
# SERIAL ONLY: under MPI (numprocs > 1) the env opt-in is ignored and domains
# stay 'legacy'. Mode-2 parallel is validated for purpose-built setups (it must
# be selected explicitly), but is not yet robust as a blanket default for every
# parallel test's evolve pattern — defaulting all parallel domains to mode 2
# deadlocks the MPI path. Parallel unified is a later migration step.
try:
from anuga import numprocs
except Exception:
numprocs = 1
if (numprocs == 1
and os.environ.get('ANUGA_DEFAULT_COMPUTE_MODE', 'legacy').lower() == 'unified'):
self.set_compute_mode('unified')
else:
self.set_compute_mode('legacy')
#-------------------------------
# C extension domain structure
# Will be setup by setup_domain_openmp_ext
#-------------------------------
self._Domain_C_struct = None
#-------------------------------
# If environment variable OMP_NUM_THREADS is not set,
# then set to default (1 thread). If a value is given to
# the method, then it will override the default.
#------------------------------
self.set_omp_num_threads(verbose=False)
#-------------------------------
# datetime and timezone
#-------------------------------
self.set_timezone()
#-------------------------------
# Forcing Terms
#
# Gravity is now incorporated in
# compute_fluxes routine
#-------------------------------
from .friction import manning_friction_semi_implicit
self.forcing_terms.append(manning_friction_semi_implicit)
#-------------------------------
# Stored output
#-------------------------------
self.set_store(True)
self.set_store_centroids(True)
self.set_store_vertices_uniquely(False)
self.quantities_to_be_stored = {'elevation': 1,
'friction':1,
'stage': 2,
'xmomentum': 2,
'ymomentum': 2}
#-------------------------------
# Set up check pointing every n
# yieldsteps
#-------------------------------
self.checkpoint = False
self.yieldstep_counter = 0
self.checkpoint_step = 10
#-------------------------------
# Useful auxiliary quantity
# Set centroid and edge values directly from mesh coordinate arrays
# without allocating vertex_values (kept lazy to save memory).
# Edge formula matches _interpolate in quantity_openmp.c:
# edge[0] = 0.5*(v[1]+v[2]), edge[1] = 0.5*(v[2]+v[0]),
# edge[2] = 0.5*(v[0]+v[1])
#-------------------------------
n = self.number_of_elements
vx = self.vertex_coordinates[:, 0].reshape(n, 3)
vy = self.vertex_coordinates[:, 1].reshape(n, 3)
qx = self.quantities['x']
qx.centroid_values[:] = (vx[:, 0] + vx[:, 1] + vx[:, 2]) / 3.0
qx.edge_values[:, 0] = 0.5 * (vx[:, 1] + vx[:, 2])
qx.edge_values[:, 1] = 0.5 * (vx[:, 2] + vx[:, 0])
qx.edge_values[:, 2] = 0.5 * (vx[:, 0] + vx[:, 1])
qx.set_boundary_values_from_edges()
qy = self.quantities['y']
qy.centroid_values[:] = (vy[:, 0] + vy[:, 1] + vy[:, 2]) / 3.0
qy.edge_values[:, 0] = 0.5 * (vy[:, 1] + vy[:, 2])
qy.edge_values[:, 1] = 0.5 * (vy[:, 2] + vy[:, 0])
qy.edge_values[:, 2] = 0.5 * (vy[:, 0] + vy[:, 1])
qy.set_boundary_values_from_edges()
# For riverwalls, we need to know the 'edge_flux_type' for each edge
# Edge-flux-type of 0 == Normal edge, with shallow water flux
# 1 == riverwall
# 2 == ?
# etc
# Lazy: allocated by _ensure_work_arrays() (no river walls) or by
# create_riverwalls() (before evolve). Saves ~108 MB at N=2.25M.
self.edge_flux_type = None # (3N,) int — 0=normal, 1=riverwall
self.edge_river_wall_counter = None # (3N,) int — per-edge riverwall index
self.number_of_riverwall_edges = 0
# Riverwalls -- initialise with dummy values
# Presently only works with DE algorithms, will fail otherwise
import anuga.structures.riverwall
self.riverwallData=anuga.structures.riverwall.RiverWall(self)
self.create_riverwalls = self.riverwallData.create_riverwalls
## Keep track of the fluxes through the boundaries
## Only works for DE algorithms at present
max_time_substeps=3 # Maximum number of substeps supported by any timestepping method
# boundary_flux_sum holds boundary fluxes on each sub-step [unused substeps = 0.]
self.boundary_flux_sum=num.array([0.]*max_time_substeps)
from anuga.operators.boundary_flux_integral_operator import boundary_flux_integral_operator
self.boundary_flux_integral=boundary_flux_integral_operator(self)
# Make an integer counting how many times we call compute_fluxes_central -- so we know which substep we are on
#self.call=1
# List to store the volumes we computed before
self.volume_history=[]
# Work arrays actually read by the C extension — allocated lazily on the
# first evolve step (via setup_Domain_C_struct → _ensure_work_arrays).
self.x_centroid_work = None # (N,) scratch for velocity extrapolation
self.y_centroid_work = None # (N,) scratch for velocity extrapolation
# The following arrays were historically allocated but are confirmed dead
# (C computation never reads them). They remain None (→ NULL in C struct)
# for the entire simulation lifetime, saving ~600+ MB at N=2.25M.
self.edge_flux_work = None # (9N,) — unused, NULL in C struct
self.neigh_work = None # (9N,) — unused, NULL in C struct
self.pressuregrad_work = None # (3N,) — unused, NULL in C struct
#-----------------------------------
# parameters for structures
#-----------------------------------
self.use_new_velocity_head = False
#------------------------------------------------
# Domain_C_struct is a cdef class with a custom __cinit__,
# so Cython will not auto-generate a default pickling protocol for it;
# when pickle reaches the Domain object and tries to pickle _Domain_C_struct,
# you get TypeError: no default __reduce__ due to non-trivial __cinit__.
# So we implement __getstate__ and __setstate__ to exclude it from pickling,
# and recreate it lazily when needed.
#------------------------------------------------
def __getstate__(self):
state = self.__dict__.copy()
# Do not pickle the C wrapper; it can be recreated
state.pop('_Domain_C_struct', None)
# The mode-2 ('unified') GPU interface wraps a cdef GPUDomain with a
# non-trivial __cinit__ (not picklable) and holds device handles. Drop
# it; _ensure_gpu_interface() rebuilds it lazily after unpickling.
state.pop('gpu_interface', None)
state.pop('_gpu_boundary_info_initialized', None)
return state
def __setstate__(self, state):
self.__dict__.update(state)
# Recreate C wrapper lazily when needed
self._Domain_C_struct = None
# Force the mode-2 device interface to be rebuilt on demand.
self.gpu_interface = None
def update_domain_c_struct(self):
"""Update the C domain structure from the Python Domain object.
"""
from .sw_domain_openmp_ext import update_Domain_C_struct
update_Domain_C_struct(self)
def _ensure_work_arrays(self):
"""Allocate the work arrays actually needed by the C extension.
Called by ``setup_Domain_C_struct`` in the Cython extension before the
C domain struct is populated. Only arrays that the C computation code
actually reads are allocated here — confirmed-dead arrays remain None
(passed as NULL to the C struct) for the full simulation lifetime.
"""
if self.x_centroid_work is not None:
return # already allocated
N = self.number_of_elements
# ---- scratch arrays used by the C extrapolation kernel ----
self.x_centroid_work = num.zeros(N) # (N,)
self.y_centroid_work = num.zeros(N) # (N,)
# ---- per-element max wave speed (CFL timestep calculation) ----
self.max_speed = num.zeros(N, float) # (N,)
# Note: edge_flux_type / edge_river_wall_counter stay None unless
# create_riverwalls() is called; all other struct fields remain NULL.
#---------------------------------------------------------------
# Plotting methods
#---------------------------------------------------------------
def set_plotter(self, *args, **kwargs):
"""Set the plotter for this domain
"""
#FIXME SR: Should look into seeing if the triang can use the
# triangulation from Domain rather than having two copies
if self.dplotter is None:
import anuga
self.dplotter = anuga.Domain_plotter(self, *args, **kwargs)
self.triang = self.dplotter.triang
self.stage = self.dplotter.stage
self.xmom = self.dplotter.xmom
self.ymom = self.dplotter.ymom
self.elev = self.dplotter.elev
self.friction = self.dplotter.friction
self.xvel = self.dplotter.xvel
self.yvel = self.dplotter.yvel
self.x = self.dplotter.x
self.y = self.dplotter.y
self.xc = self.dplotter.xc
self.yc = self.dplotter.yc
self.plot_mesh = self.dplotter.plot_mesh
self.save_depth_frame = self.dplotter.save_depth_frame
self.plot_depth_frame = self.dplotter.plot_depth_frame
self.make_depth_animation = self.dplotter.make_depth_animation
self.save_stage_frame = self.dplotter.save_stage_frame
self.plot_stage_frame = self.dplotter.plot_stage_frame
self.make_stage_animation = self.dplotter.make_stage_animation
self.save_speed_frame = self.dplotter.save_speed_frame
self.plot_speed_frame = self.dplotter.plot_speed_frame
self.make_speed_animation = self.dplotter.make_speed_animation
def triplot(self, *args, **kwargs):
self.set_plotter()
import matplotlib.pyplot as plt
fig, ax = plt.subplots()
lines = ax.triplot(self.triang, *args, **kwargs)
ax.set_xlabel('Easting (m)')
ax.set_ylabel('Northing (m)')
return fig, ax, lines
def tripcolor(self, *args, **kwargs):
self.set_plotter()
import matplotlib.pyplot as plt
fig, ax = plt.subplots()
im = ax.tripcolor(self.triang, *args, **kwargs)
ax.set_xlabel('Easting (m)')
ax.set_ylabel('Northing (m)')
return fig, ax, im
#==============================================================
# Methods to set and get domain parameters
#
# FIXME SR: These (and other paramters) should be refactored
# to save the underlying quantities in np.ndarray(s) for
# efficient access in Cython
#==============================================================
@property
def g(self) -> float:
"""Gravitational acceleration [m/s^2]"""
return self._g
@g.setter
def g(self, value: float):
"""Set gravitational acceleration [m/s^2]"""
self._g = value
@property
def timestep(self) -> float:
"""Current timestep [s]"""
return self._timestep
@timestep.setter
def timestep(self, value: float):
"""Set current timestep [s]"""
self._timestep = value
@property
def flux_timestep(self) -> float:
"""Current flux timestep [s]"""
return self._flux_timestep
@flux_timestep.setter
def flux_timestep(self, value: float):
"""Set current flux timestep [s]"""
self._flux_timestep = value
#==============================================================
# Set config defaults
#==============================================================
def _set_config_defaults(self):
"""Set the default values in this routine. That way we can inherit class
and just redefine the defaults for the new class
"""
from anuga.config import minimum_storable_height
from anuga.config import minimum_allowed_height, maximum_allowed_speed
from anuga.config import g
from anuga.config import tight_slope_limiters
from anuga.config import extrapolate_velocity_second_order
from anuga.config import alpha_balance
from anuga.config import optimise_dry_cells
from anuga.config import use_centroid_velocities
from anuga.config import compute_fluxes_method
from anuga.config import sloped_mannings_function
from anuga.config import low_froude
# Early algorithms need elevation to remain continuous
self.set_using_discontinuous_elevation(False)
self.set_minimum_allowed_height(minimum_allowed_height)
self.maximum_allowed_speed = maximum_allowed_speed
self.minimum_storable_height = minimum_storable_height
self.g = g
self.alpha_balance = alpha_balance
self.tight_slope_limiters = tight_slope_limiters
self.set_low_froude(low_froude)
self.set_use_optimise_dry_cells(optimise_dry_cells)
self.set_extrapolate_velocity(extrapolate_velocity_second_order)
self.use_centroid_velocities = use_centroid_velocities
self.set_sloped_mannings_function(sloped_mannings_function)
self.set_compute_fluxes_method(compute_fluxes_method)
self.set_store_centroids(False)
def get_algorithm_parameters(self) -> dict:
"""
Get the standard parameter that are currently set (as a dictionary)
"""
parameters = {}
parameters['minimum_allowed_height'] = self.minimum_allowed_height
parameters['maximum_allowed_speed'] = self.maximum_allowed_speed
parameters['minimum_storable_height'] = self.minimum_storable_height
parameters['g'] = self.g
parameters['alpha_balance'] = self.alpha_balance
parameters['tight_slope_limiters'] = self.tight_slope_limiters
parameters['optimise_dry_cells'] = self.optimise_dry_cells
parameters['low_froude'] = self.low_froude
parameters['use_centroid_velocities'] = self.use_centroid_velocities
parameters['use_sloped_mannings'] = self.use_sloped_mannings
parameters['compute_fluxes_method'] = self.get_compute_fluxes_method()
parameters['flow_algorithm'] = self.get_flow_algorithm()
parameters['CFL'] = self.get_cfl()
parameters['timestepping_method'] = self.get_timestepping_method()
parameters['extrapolate_velocity_second_order'] = self.extrapolate_velocity_second_order
return parameters
def print_algorithm_parameters(self) -> None:
"""
Print the standard parameters that are curently set (as a dictionary)
"""
print('#============================')
print('# Domain Algorithm Parameters ')
print('#============================')
from pprint import pprint
pprint(self.get_algorithm_parameters(),indent=4)
print('#----------------------------')
def _set_DE0_defaults(self):
"""Set up the defaults for running the flow_algorithm "DE0"
A 'discontinuous elevation' method
"""
self._set_config_defaults()
self.set_cfl(0.9)
self.set_use_kinematic_viscosity(False)
#self.timestepping_method='rk2'#'rk3'#'euler'#'rk2'
self.set_timestepping_method('euler')
self.set_using_discontinuous_elevation(True)
self.set_using_centroid_averaging(True)
self.set_compute_fluxes_method('DE')
# Don't place any restriction on the minimum storable height
#self.minimum_storable_height=-99999999999.0
self.minimum_allowed_height=1.0e-12
self.set_default_order(2)
self.set_extrapolate_velocity()
self.beta_w=0.5
self.beta_w_dry=0.0
self.beta_uh=0.5
self.beta_uh_dry=0.0
self.beta_vh=0.5
self.beta_vh_dry=0.0
#self.set_quantities_to_be_stored({'stage': 2, 'xmomentum': 2,
# 'ymomentum': 2, 'elevation': 2, 'height':2})
#self.set_quantities_to_be_stored({'stage': 2, 'xmomentum': 2,
# 'ymomentum': 2, 'elevation': 1})
self.set_store_centroids(True)
self.optimise_dry_cells=False
# We need the edge_coordinates for the extrapolation
self.edge_coordinates=self.get_edge_midpoint_coordinates()
# By default vertex values are NOT stored uniquely
# for storage efficiency. We may override this (but not so important since
# centroids are stored anyway
# self.set_store_vertices_smoothly(False)
self.maximum_allowed_speed=0.0
if self.processor == 0 and self.verbose:
print('Domain: Using discontinuous elevation solver DE0')
# print('##########################################################################')
# print('#')
# print('# Using discontinuous elevation solver DE0')
# print('#')
# print('# First order timestepping')
# print('#')
# print('# Make sure you use centroid values when reporting on important output quantities')
# print('#')
# print('##########################################################################')
def _set_DE1_defaults(self):
"""Set up the defaults for running the flow_algorithm "DE1"
A 'discontinuous elevation' method
"""
self._set_config_defaults()
self.set_cfl(1.0)
self.set_use_kinematic_viscosity(False)
#self.timestepping_method='rk2'#'rk3'#'euler'#'rk2'
self.set_timestepping_method(2)
self.set_using_discontinuous_elevation(True)
self.set_using_centroid_averaging(True)
self.set_compute_fluxes_method('DE')
# Don't place any restriction on the minimum storable height
#self.minimum_storable_height=-99999999999.0
self.minimum_allowed_height=1.0e-5
self.set_default_order(2)
self.set_extrapolate_velocity()
self.beta_w=1.0
self.beta_w_dry=0.0
self.beta_uh=1.0
self.beta_uh_dry=0.0
self.beta_vh=1.0
self.beta_vh_dry=0.0
#self.set_quantities_to_be_stored({'stage': 2, 'xmomentum': 2,
# 'ymomentum': 2, 'elevation': 2, 'height':2})
#self.set_quantities_to_be_stored({'stage': 2, 'xmomentum': 2,
# 'ymomentum': 2, 'elevation': 1})
self.set_store_centroids(True)
self.optimise_dry_cells=False
# We need the edge_coordinates for the extrapolation
self.edge_coordinates=self.get_edge_midpoint_coordinates()
# By default vertex values are NOT stored uniquely
# for storage efficiency. We may override this (but not so important since
# centroids are stored anyway
# self.set_store_vertices_smoothly(False)
self.maximum_allowed_speed=0.0
if self.processor == 0 and self.verbose:
print('##########################################################################')
print('#')
print('# Using discontinuous elevation solver DE1 ')
print('#')
print('# Uses rk2 timestepping')
print('#')
print('# Make sure you use centroid values when reporting on important output quantities')
print('#')
print('##########################################################################')
def _set_DE_ader2_defaults(self):
"""Set up the defaults for running the flow_algorithm "DE_ader2"
DE1 settings with ADER-2 (Cauchy-Kovalewski) timestepping.
"""
self._set_config_defaults()
self.set_cfl(1.0)
self.set_use_kinematic_viscosity(False)
self.set_timestepping_method('ader2')
self.set_using_discontinuous_elevation(True)
self.set_using_centroid_averaging(True)
self.set_compute_fluxes_method('DE')
self.minimum_allowed_height = 1.0e-5
self.set_default_order(2)
self.set_extrapolate_velocity()
self.beta_w = 0.5
self.beta_w_dry = 0.0
self.beta_uh = 0.5
self.beta_uh_dry = 0.0
self.beta_vh = 0.5
self.beta_vh_dry = 0.0
self.set_store_centroids(True)
self.optimise_dry_cells = False
self.edge_coordinates = self.get_edge_midpoint_coordinates()
self.maximum_allowed_speed = 0.0
if self.processor == 0 and self.verbose:
print('##########################################################################')
print('#')
print('# Using discontinuous elevation solver DE_ader2')
print('#')
print('# Uses ADER-2 (Cauchy-Kovalewski) timestepping')
print('#')
print('# Make sure you use centroid values when reporting on important output quantities')
print('#')
print('##########################################################################')
def _set_DE2_defaults(self):
"""Set up the defaults for running the flow_algorithm "DE2"
A 'discontinuous elevation' method
"""
self._set_config_defaults()
self.set_cfl(1.0)
self.set_use_kinematic_viscosity(False)
#self.timestepping_method='rk2'#'rk3'#'euler'#'rk2'
self.set_timestepping_method(3)
self.set_using_discontinuous_elevation(True)
self.set_using_centroid_averaging(True)
self.set_compute_fluxes_method('DE')
# Don't place any restriction on the minimum storable height
#self.minimum_storable_height=-99999999999.0
self.minimum_allowed_height=1.0e-5
self.set_default_order(2)
self.set_extrapolate_velocity()
self.beta_w=1.0
self.beta_w_dry=0.0
self.beta_uh=1.0
self.beta_uh_dry=0.0
self.beta_vh=1.0
self.beta_vh_dry=0.0
#self.set_quantities_to_be_stored({'stage': 2, 'xmomentum': 2,
# 'ymomentum': 2, 'elevation': 2, 'height':2})
#self.set_quantities_to_be_stored({'stage': 2, 'xmomentum': 2,
# 'ymomentum': 2, 'elevation': 1})
self.set_store_centroids(True)
self.optimise_dry_cells=False
# We need the edge_coordinates for the extrapolation
self.edge_coordinates=self.get_edge_midpoint_coordinates()
# By default vertex values are NOT stored uniquely
# for storage efficiency. We may override this (but not so important since
# centroids are stored anyway
# self.set_store_vertices_smoothly(False)
self.maximum_allowed_speed=0.0
if self.processor == 0 and self.verbose:
print('##########################################################################')
print('#')
print('# Using discontinuous elevation solver DE2')
print('#')
print('# Using rk3 timestepping')
print('#')
print('# Make sure you use centroid values when reporting on important output quantities')
print('#')
print('##########################################################################')
def _set_DE1_7_defaults(self):
"""Set up the defaults for running the flow_algorithm "DE0_7"
A 'discontinuous elevation' method
"""
self._set_config_defaults()
self.set_cfl(1.0)
self.set_use_kinematic_viscosity(False)
#self.timestepping_method='rk2'#'rk3'#'euler'#'rk2'
self.set_timestepping_method(2)
self.set_using_discontinuous_elevation(True)
self.set_using_centroid_averaging(True)
self.set_compute_fluxes_method('DE')
# Don't place any restriction on the minimum storable height
#self.minimum_storable_height=-99999999999.0
self.minimum_allowed_height=1.0e-12
self.set_default_order(2)
self.set_extrapolate_velocity()
self.beta_w=0.75
self.beta_w_dry=0.1
self.beta_uh=0.75
self.beta_uh_dry=0.1
self.beta_vh=0.75
self.beta_vh_dry=0.1
#self.set_quantities_to_be_stored({'stage': 2, 'xmomentum': 2,
# 'ymomentum': 2, 'elevation': 2, 'height':2})
#self.set_quantities_to_be_stored({'stage': 2, 'xmomentum': 2,
# 'ymomentum': 2, 'elevation': 1})
self.set_store_centroids(True)
self.optimise_dry_cells=False
# We need the edge_coordinates for the extrapolation
self.edge_coordinates=self.get_edge_midpoint_coordinates()
# By default vertex values are NOT stored uniquely
# for storage efficiency. We may override this (but not so important since
# centroids are stored anyway
# self.set_store_vertices_smoothly(False)
self.maximum_allowed_speed=0.0
if self.processor == 0 and self.verbose:
print('##########################################################################')
print('#')
print('# Using discontinuous elevation solver DE1_7 ')
print('#')
print('# A slightly more diffusive version of DE1, does use rk2 timestepping')
print('#')
print('# Make sure you use centroid values when reporting on important output quantities')
print('#')
print('##########################################################################')
def _set_DE0_7_defaults(self):
"""Set up the defaults for running the flow_algorithm "DE3"
A 'discontinuous elevation' method
"""
self._set_config_defaults()
self.set_cfl(0.9)
self.set_use_kinematic_viscosity(False)
#self.timestepping_method='rk2'#'rk3'#'euler'#'rk2'
self.set_timestepping_method(1)
self.set_using_discontinuous_elevation(True)
self.set_using_centroid_averaging(True)
self.set_compute_fluxes_method('DE')
# Don't place any restriction on the minimum storable height
#self.minimum_storable_height=-99999999999.0
self.minimum_allowed_height=1.0e-12
self.set_default_order(2)
self.set_extrapolate_velocity()
self.beta_w=0.7
self.beta_w_dry=0.1
self.beta_uh=0.7
self.beta_uh_dry=0.1
self.beta_vh=0.7
self.beta_vh_dry=0.1
#self.set_quantities_to_be_stored({'stage': 2, 'xmomentum': 2,
# 'ymomentum': 2, 'elevation': 2, 'height':2})
#self.set_quantities_to_be_stored({'stage': 2, 'xmomentum': 2,
# 'ymomentum': 2, 'elevation': 1})
self.set_store_centroids(True)
self.optimise_dry_cells=False
# We need the edge_coordinates for the extrapolation
self.edge_coordinates=self.get_edge_midpoint_coordinates()
# By default vertex values are NOT stored uniquely
# for storage efficiency. We may override this (but not so important since
# centroids are stored anyway
# self.set_store_vertices_smoothly(False)
self.maximum_allowed_speed=0.0
if self.processor == 0 and self.verbose:
print('Domain: Using discontinuous elevation solver DE0_7')
# print('##########################################################################')
# print('#')
# print('# Using discontinuous elevation solver DE0_7')
# print('#')
# print('# A slightly less diffusive version than DE0, uses euler timestepping')
# print('#')
# print('# Make sure you use centroid values when reporting on important output quantities')
# print('#')
# print('##########################################################################')
def update_special_conditions(self):
my_update_special_conditions(self)
# Note Padarn 06/12/12: The following line decorates
# the set_quantity function to be profiled individually.
# Need to uncomment the decorator at top of file.
#@profileit("set_quantity.profile")
[docs]
def set_quantity(self, name: str, *args, **kwargs) -> None:
"""Set values for named quantity
We have to do something special for 'elevation'
otherwise pass through to generic set_quantity
"""
# if name == 'elevation':
# stage_c = self.get_quantity('stage').centroid_values
# elev_c = self.get_quantity('elevation').centroid_values
# height_c = stage_c - elev_c
# Generic_Domain.set_quantity(self, name, *args, **kwargs)
# stage_c[:] = elev_c + height_c
# else:
# Generic_Domain.set_quantity(self, name, *args, **kwargs)
Generic_Domain.set_quantity(self, name, *args, **kwargs)
def set_timezone(self, tz: str | ZoneInfoType | None = None) -> None:
"""Set timezone for domain
:param tz: either a timezone object or string
We recommend using the ZoneInfo provided by zoneinfo.
Default is ZoneInfo('UTC')
Example: Set default timezone UTC
>>> domain.set_timezone()
Example: Set timezone using tsdata string
>>> domain.set_timezone('Australia/Syndey')
Example: Set timezone using ZoneInfo timezone
>>> from zoneinfo import ZoneInfo
>>> new_tz = ZoneInfo('Australia/Sydney')
>>> domain.set_timezone(new_tz)
"""
try:
from zoneinfo import ZoneInfo
except ImportError:
from backports.zoneinfo import ZoneInfo
if tz is None:
new_tz = ZoneInfo('UTC')
elif isinstance(tz,str):
new_tz = ZoneInfo(tz)
elif isinstance(tz, ZoneInfo):
new_tz = tz
else:
msg = "Unknown timezone %s" % tz
raise Exception(msg)
self.timezone = new_tz
def get_timezone(self) -> ZoneInfoType:
"""Retrieve current domain timezone"""
return self.timezone
def get_datetime(self, timestamp: float | None = None) -> DateTime:
"""Retrieve datetime corresponding to current timestamp wrt to domain timezone
param: timestamp: return datetime corresponding to given timestamp"""
from datetime import datetime
try:
from datetime import UTC
except ImportError:
from datetime import timezone
UTC = timezone.utc
if timestamp is None:
timestamp = self.get_time()
#utc_datetime = datetime.utcfromtimestamp(timestamp).replace(tzinfo=ZoneInfo('UTC'))
utc_datetime = datetime.fromtimestamp(timestamp,UTC)
current_dt = utc_datetime.astimezone(self.timezone)
return current_dt
def set_starttime(self, timestamp: float | DateTime = 0.0) -> None:
"""Set the starttime for the evolution
:param timestamp: Either a float or a datetime object
Essentially we use unix time as our absolute time. So
time = 0 corresponds to Jan 1st 1970 UTC
Use naive datetime which will be localized to the domain timezone or
or use zoneinfo.ZoneInfo to set the timezone of datetime.
Don't use the tzinfo argument of datetime to set timezone as this does not work!
Example:
Without setting timezone for the `domain` and the `starttime` then time
calculations are all based on UTC. Note the timestamp, which is time in seconds
from 1st Jan 1970 UTC.
>>> import anuga
>>> from zoneinfo import ZoneInfo
>>> from datetime import datetime
>>>
>>> domain = anuga.rectangular_cross_domain(10,10)
>>> dt = datetime(2021,3,21,18,30)
>>> domain.set_starttime(dt)
>>> print(domain.get_datetime(), 'TZ', domain.get_timezone(), 'Timestamp: ', domain.get_time())
2021-03-21 18:30:00+00:00 TZ UTC Timestamp: 1616351400.0
Example:
Setting timezone for the `domain`, then naive `datetime` will be localizes to
the `domain` timezone. Note the timestamp, which is time in seconds
from 1st Jan 1970 UTC.
>>> import anuga
>>> from zoneinfo import ZoneInfo
>>> from datetime import datetime
>>>
>>> domain = anuga.rectangular_cross_domain(10,10)
>>> AEST = ZoneInfo('Australia/Sydney')
>>> domain.set_timezone(AEST)
>>>
>>> dt = datetime(2021,3,21,18,30)
>>> domain.set_starttime(dt)
>>> print(domain.get_datetime(), 'TZ', domain.get_timezone(), 'Timestamp: ', domain.get_time())
2021-03-21 18:30:00+11:00 TZ Australia/Sydney Timestamp: 1616311800.0
Example:
Setting timezone for the `domain`, and setting the timezone for the `datetime`.
Note the timestamp, which is time in seconds from 1st Jan 1970 UTC is the same
as the previous example.
>>> import anuga
>>> from zoneinfo import ZoneInfo
>>> from datetime import datetime
>>>
>>> domain = anuga.rectangular_cross_domain(10,10)
>>>
>>> ACST = ZoneInfo('Australia/Adelaide')
>>> domain.set_timezone(ACST)
>>>
>>> AEST = ZoneInfo('Australia/Sydney')
>>> dt = datetime(2021,3,21,18,30, tzinfo=AEST)
>>>
>>> domain.set_starttime(dt)
>>> print(domain.get_datetime(), 'TZ', domain.get_timezone(), 'Timestamp: ', domain.get_time())
2021-03-21 18:00:00+10:30 TZ Australia/Adelaide Timestamp: 1616311800.0
"""
from datetime import datetime
if self.evolved_called:
msg = ('Can\'t change simulation start time once evolve has '
'been called')
raise Exception(msg)
if isinstance(timestamp, datetime):
if timestamp.tzinfo is None:
dt = timestamp.replace(tzinfo=self.timezone)
time = dt.timestamp()
else:
time = timestamp.timestamp()
else:
time = float(timestamp)
self.starttime = time
# starttime is now the origin for relative_time
self.set_relative_time(0.0)
def get_starttime(self, datetime: bool = False) -> float | DateTime:
"""return starttime, either as timestamp, or as a datetime"""
starttime = self.starttime
if not datetime:
return starttime
else:
return self.get_datetime(starttime)
def set_store(self, flag: bool = True) -> None:
"""Set whether data saved to sww file.
"""
self.store = flag
def get_store(self) -> bool:
"""Get whether data saved to sww file.
"""
return self.store
def set_store_centroids(self, flag: bool = True) -> None:
"""Set whether centroid data is saved to sww file.
"""
self.store_centroids = flag
def get_store_centroids(self) -> bool:
"""Get whether data saved to sww file.
"""
return self.store_centroids
def set_checkpointing(self, checkpoint: bool = True, checkpoint_dir: str = 'CHECKPOINTS',
checkpoint_step: int = 10, checkpoint_time: float | None = None) -> None:
"""Set up checkpointing.
param checkpoint: Default = True. Set to False will turn off checkpointing
param checkpoint_dir: Where to store checkpointing files
param checkpoint_step: Save checkpoint files after this many yieldsteps
param checkpoint_time: If set, over-rides checkpoint_step. save checkpoint files
after this amount of walltime
"""
if checkpoint:
from anuga import myid
# On processor 0 create checkpoint directory if necessary
if myid == 0:
if True:
if not os.path.exists(checkpoint_dir):
os.mkdir(checkpoint_dir)
assert os.path.exists(checkpoint_dir)
self.checkpoint_dir = checkpoint_dir
if checkpoint_time is not None:
#import time
self.walltime_prev = time.time()
self.checkpoint_time = checkpoint_time
self.checkpoint_step = 0
else:
self.checkpoint_step = checkpoint_step
self.checkpoint = True
#print(self.checkpoint_dir, self.checkpoint_step)
else:
self.checkpoint = False
def set_sloped_mannings_function(self, flag: bool = True) -> None:
"""Set mannings friction function to use the sloped
wetted area.
The flag is tested in the python wrapper
mannings_friction_implicit
"""
if flag:
self.use_sloped_mannings = True
else:
self.use_sloped_mannings = False
def set_compute_fluxes_method(self, flag: str = 'original') -> None:
"""Set method for computing fluxes.
Currently
original
wb_1
wb_2
wb_3
tsunami
DE
"""
compute_fluxes_methods = ['original', 'wb_1', 'wb_2', 'wb_3', 'tsunami', 'DE']
if flag in compute_fluxes_methods:
self.compute_fluxes_method = flag
else:
msg = 'Unknown compute_fluxes_method. \nPossible choices are:\n'+ \
', '.join(compute_fluxes_methods)+'.'
raise Exception(msg)
def get_compute_fluxes_method(self) -> str:
"""Get method for computing fluxes.
See set_compute_fluxes_method for possible choices.
"""
return self.compute_fluxes_method
[docs]
def set_flow_algorithm(self, algorithm: str = 'DE0') -> None:
"""Set combination of slope limiting and time stepping
Currently
DE0
DE1
DE2
DE0_7
DE1_7
"""
algorithm = str(algorithm)
# Replace any dots with dashes
algorithm = algorithm.replace('.', '_')
flow_algorithms = ['DE0', 'DE1', 'DE2', \
'DE0_7', 'DE1_7', 'DE_ader2']
if algorithm in flow_algorithms:
self.flow_algorithm = algorithm
else:
msg = 'Unknown flow_algorithm. \nPossible choices are:\n'+ \
', '.join(flow_algorithms)+'.'
raise Exception(msg)
if self.flow_algorithm == 'DE0':
self._set_DE0_defaults()
if self.flow_algorithm == 'DE1':
self._set_DE1_defaults()
if self.flow_algorithm == 'DE2':
self._set_DE2_defaults()
if self.flow_algorithm == 'DE0_7':
self._set_DE0_7_defaults()
if self.flow_algorithm == 'DE1_7':
self._set_DE1_7_defaults()
if self.flow_algorithm == 'DE_ader2':
self._set_DE_ader2_defaults()
[docs]
def get_flow_algorithm(self) -> str:
"""
Get method used for timestepping and spatial discretisation
"""
return self.flow_algorithm
# def set_gravity_method(self):
# """Gravity method is determined by the compute_fluxes_method
# This is now not used, as gravity is combine in the compute_fluxes method
# """
# if self.get_compute_fluxes_method() == 'original':
# self.forcing_terms[0] = gravity
# elif self.get_compute_fluxes_method() == 'wb_1':
# self.forcing_terms[0] = gravity_wb
# elif self.get_compute_fluxes_method() == 'wb_2':
# self.forcing_terms[0] = gravity
# else:
# raise Exception('undefined compute_fluxes method')
def set_extrapolate_velocity(self, flag: bool = True) -> None:
""" Extrapolation routine uses momentum by default,
can change to velocity extrapolation which
seems to work better.
"""
if flag is True:
self.extrapolate_velocity_second_order = True
elif flag is False:
self.extrapolate_velocity_second_order = False
[docs]
def set_low_froude(self, low_froude: int = 0) -> None:
""" For low Froude problems the standard flux calculations
can lead to excessive damping. Set low_froude to 1 or 2 for
flux calculations which minimize the damping in this case.
"""
assert low_froude in [LOW_FROUDE_OFF, LOW_FROUDE_1, LOW_FROUDE_2]
self.low_froude = low_froude
def set_use_optimise_dry_cells(self, flag: bool = True) -> None:
""" Try to optimize calculations where region is dry
"""
if flag is True:
self.optimise_dry_cells = int(True)
elif flag is False:
self.optimise_dry_cells = int(False)
def set_use_kinematic_viscosity(self, flag: bool = True) -> None:
from anuga.operators.kinematic_viscosity_operator import Kinematic_viscosity_operator
if flag :
# Create Operator if necessary
if self.kv_operator is None:
self.kv_operator = Kinematic_viscosity_operator(self)
else:
if self.kv_operator is None:
return
else:
# Remove operator from fractional_step_operators
self.fractional_step_operators.remove(self.kv_operator)
self.kv_operator = None
[docs]
def set_collect_max_quantities(self,
update_frequency=1,
collection_start_time=0.,
velocity_zero_height=None,
store_to_sww=True) -> Collect_max_quantities_operator:
"""Create (or return existing) Collect_max_quantities_operator on this domain.
Tracks running maxima of stage, depth, speed, and momentum magnitude
(||(uh, vh)||) over the simulation. Call once before domain.evolve().
Parameters
----------
update_frequency : int
Update maxima every this many timesteps (default 1).
collection_start_time : float
Only collect after this simulation time (default 0).
velocity_zero_height : float or None
Zero velocity below this depth; defaults to minimum_allowed_height.
store_to_sww : bool
If True (default), write running maxima to the SWW file every yield
step as centroid quantities max_stage_c, max_depth_c, max_speed_c,
max_uh_c.
Returns
-------
Collect_max_quantities_operator
"""
from anuga.operators.collect_max_quantities_operator import \
Collect_max_quantities_operator
if self.max_quantities_operator is None:
self.max_quantities_operator = Collect_max_quantities_operator(
self,
update_frequency=update_frequency,
collection_start_time=collection_start_time,
velocity_zero_height=velocity_zero_height,
store_to_sww=store_to_sww,
)
return self.max_quantities_operator
def set_beta(self, beta: float) -> None:
"""Shorthand to assign one constant value [0,2] to all limiters.
0 Corresponds to first order, where as larger values make use of
the second order scheme.
"""
self.beta_w = beta
self.beta_w_dry = beta
self.quantities['stage'].beta = beta
self.beta_uh = beta
self.beta_uh_dry = beta
self.quantities['xmomentum'].beta = beta
self.beta_vh = beta
self.beta_vh_dry = beta
self.quantities['ymomentum'].beta = beta
def set_betas(self, beta_w: float, beta_w_dry: float, beta_uh: float,
beta_uh_dry: float, beta_vh: float, beta_vh_dry: float) -> None:
"""Assign beta values in the range [0,2] to all limiters.
0 Corresponds to first order, where as larger values make use of
the second order scheme.
"""
self.beta_w = beta_w
self.beta_w_dry = beta_w_dry
self.quantities['stage'].beta = beta_w
self.beta_uh = beta_uh
self.beta_uh_dry = beta_uh_dry
self.quantities['xmomentum'].beta = beta_uh
self.beta_vh = beta_vh
self.beta_vh_dry = beta_vh_dry
self.quantities['ymomentum'].beta = beta_vh
def set_store_vertices_uniquely(self, flag: bool = True, reduction: Callable | None = None) -> None:
"""Decide whether vertex values should be stored uniquely as
computed in the model (True) or whether they should be reduced to one
value per vertex using self.reduction (False).
"""
# FIXME (Ole): how about using the word "continuous vertex values" or
# "continuous stage surface"
self.smooth = not flag
# Reduction operation for get_vertex_values
if reduction is None:
self.reduction = mean
#self.reduction = min #Looks better near steep slopes
def set_store_vertices_smoothly(self, flag: bool = True, reduction: Callable | None = None) -> None:
"""Decide whether vertex values should be stored smoothly (one value per vertex)
or uniquely as
computed in the model (False).
"""
# FIXME (Ole): how about using the word "continuous vertex values" or
# "continuous stage surface"
self.smooth = flag
# Reduction operation for get_vertex_values
if reduction is None:
self.reduction = mean
#self.reduction = min #Looks better near steep slopes
def set_minimum_storable_height(self, minimum_storable_height: float) -> None:
"""Set the minimum depth that will be written to an SWW file.
minimum_storable_height minimum allowed SWW depth is in meters
This is useful for removing thin water layers that seems to be caused
by friction creep.
"""
self.minimum_storable_height = minimum_storable_height
def get_minimum_storable_height(self) -> float:
return self.minimum_storable_height
def set_minimum_allowed_height(self, minimum_allowed_height: float) -> None:
"""Set minimum depth that will be recognised in the numerical scheme.
minimum_allowed_height minimum allowed depth in meters
The parameter H0 (Minimal height for flux computation) is also set by
this function.
"""
#FIXME (Ole): rename H0 to minimum_allowed_height_in_flux_computation
#FIXME (Ole): Maybe use histogram to identify isolated extreme speeds
#and deal with them adaptively similarly to how we used to use 1 order
#steps to recover.
self.minimum_allowed_height = minimum_allowed_height
self.H0 = minimum_allowed_height
def get_minimum_allowed_height(self) -> float:
return self.minimum_allowed_height
def set_maximum_allowed_speed(self, maximum_allowed_speed: float) -> None:
"""Set the maximum particle speed that is allowed in water shallower
than minimum_allowed_height.
maximum_allowed_speed
This is useful for controlling speeds in very thin layers of water and
at the same time allow some movement avoiding pooling of water.
"""
self.maximum_allowed_speed = maximum_allowed_speed
def set_points_file_block_line_size(self, points_file_block_line_size: int) -> None:
"""
"""
self.points_file_block_line_size = points_file_block_line_size
# FIXME: Probably obsolete in its curren form
def set_quantities_to_be_stored(self, q: dict[str, int] | list[str] | None) -> None:
"""Specify which quantities will be stored in the SWW file.
q must be either:
- a dictionary with quantity names
- a list of quantity names (for backwards compatibility)
- None
The format of the dictionary is as follows
quantity_name: flag where flag must be either 1 or 2.
If flag is 1, the quantity is considered static and will be
stored once at the beginning of the simulation in a 1D array.
If flag is 2, the quantity is considered time dependent and
it will be stored at each yieldstep by appending it to the
appropriate 2D array in the sww file.
If q is None, storage will be switched off altogether.
Once the simulation has started and thw sww file opened,
this function will have no effect.
The format, where q is a list of names is for backwards compatibility
only.
It will take the specified quantities to be time dependent and assume
'elevation' to be static regardless.
"""
if q is None:
self.quantities_to_be_stored = {}
self.store = False
return
# Check correctness
for quantity_name in q:
msg = ('Quantity %s is not a valid conserved quantity'
% quantity_name)
assert quantity_name in self.quantities, msg
assert isinstance(q, dict)
self.quantities_to_be_stored = q
def get_wet_elements(self, indices: list[int] | num.ndarray | None = None,
minimum_height: float | None = None) -> num.ndarray:
"""Return indices for elements where h > minimum_allowed_height
Optional argument:
indices is the set of element ids that the operation applies to.
Usage:
indices = get_wet_elements()
Note, centroid values are used for this operation
"""
# Water depth below which it is considered to be 0 in the model
# FIXME (Ole): Allow this to be specified as a keyword argument as well
from anuga.config import minimum_allowed_height
if minimum_height is None:
minimum_height = minimum_allowed_height
elevation = self.get_quantity('elevation').\
get_values(location='centroids', indices=indices)
stage = self.get_quantity('stage').\
get_values(location='centroids', indices=indices)
depth = stage - elevation
# Select indices for which depth > 0
wet_indices = num.compress(depth > minimum_height,
num.arange(len(depth)))
return wet_indices
def load_balance_statistics(self, minimum_height: float | None = None) -> dict:
"""Return load balance statistics for this domain (single-rank version).
For a parallel domain use :meth:`Parallel_domain.load_balance_statistics`
which gathers across all MPI ranks via Allgather. This serial version
returns a dict with length-1 arrays so the interface is identical.
Parameters
----------
minimum_height : float, optional
Depth threshold for "wet" classification. Defaults to
``anuga.config.minimum_allowed_height``.
Returns
-------
dict
Keys and shapes are the same as the parallel version::
n_full int[1] total triangle count
n_ghost int[1] 0 (no ghost triangles in serial)
n_wet_full int[1] wet triangle count
wet_fraction float[1] n_wet_full / n_full
ghost_fraction float[1] 0.0
wall_time float[1] total wall time since evolve() started
comm_time float[1] 0.0
reduce_wait_time float[1] 0.0
compute_time float[1] same as wall_time
imbalance_ratio float 1.0
wet_compute_corr float nan
"""
from time import time as walltime
from anuga.config import minimum_allowed_height as default_mah
if minimum_height is None:
minimum_height = default_mah
n_full = self.get_number_of_triangles()
stage_c = self.get_quantity('stage').centroid_values
elev_c = self.get_quantity('elevation').centroid_values
n_wet = int(num.sum((stage_c - elev_c) > minimum_height))
w_time = walltime() - self.evolve_start_walltime
return {
'n_full': num.array([n_full], dtype=int),
'n_ghost': num.array([0], dtype=int),
'n_wet_full': num.array([n_wet], dtype=int),
'wet_fraction': num.array([n_wet / n_full if n_full > 0 else 0.0]),
'ghost_fraction': num.array([0.0]),
'wall_time': num.array([w_time]),
'comm_time': num.array([0.0]),
'reduce_wait_time': num.array([0.0]),
'compute_time': num.array([w_time]),
'imbalance_ratio': 1.0,
'wet_compute_corr': float('nan'),
}
def print_load_balance_statistics(self, minimum_height: float | None = None) -> None:
"""Print a load balance summary to stdout.
For a single-process domain this just reports wet fraction and
triangle count. The parallel override prints a per-rank table.
Parameters
----------
minimum_height : float, optional
Passed through to :meth:`load_balance_statistics`.
"""
stats = self.load_balance_statistics(minimum_height=minimum_height)
n = stats['n_full'][0]
n_wet = stats['n_wet_full'][0]
wf = stats['wet_fraction'][0]
w_time = stats['wall_time'][0]
print(f"Load balance: triangles={n}, wet={n_wet} ({100*wf:.1f}%), "
f"wall_time={w_time:.3f}s")
def get_maximum_inundation_elevation(self, indices: list[int] | num.ndarray | None = None,
minimum_height: float | None = None) -> float:
"""Return highest elevation where h > 0
Optional argument:
indices is the set of element ids that the operation applies to.
minimum_height for testing h > minimum_height
Usage:
q = get_maximum_inundation_elevation()
Note, centroid values are used for this operation
"""
wet_elements = self.get_wet_elements(indices, minimum_height)
return self.get_quantity('elevation').\
get_maximum_value(indices=wet_elements)
def get_maximum_inundation_location(self, indices: list[int] | num.ndarray | None = None) -> tuple[float, float]:
"""Return location of highest elevation where h > 0
Optional argument:
indices is the set of element ids that the operation applies to.
Usage:
q = get_maximum_inundation_location()
Note, centroid values are used for this operation
"""
wet_elements = self.get_wet_elements(indices)
return self.get_quantity('elevation').\
get_maximum_location(indices=wet_elements)
def get_global_wet_element_count(self, indices: list[int] | num.ndarray | None = None,
minimum_height: float | None = None) -> int:
"""Return total number of wet elements across all MPI ranks.
Optional arguments:
indices: set of element ids that the operation applies to
minimum_height: threshold for considering an element wet
Usage:
count = get_global_wet_element_count()
Note: This performs MPI reduction, so all ranks get the same result.
"""
from anuga import numprocs
wet_indices = self.get_wet_elements(indices, minimum_height)
local_count = len(wet_indices)
if numprocs == 1:
return local_count
from mpi4py import MPI
global_count = MPI.COMM_WORLD.allreduce(local_count, op=MPI.SUM)
return global_count
def get_global_max_stage(self, indices: list[int] | num.ndarray | None = None) -> float:
"""Return maximum stage value across all MPI ranks.
Optional argument:
indices: set of element ids that the operation applies to
Usage:
max_stage = get_global_max_stage()
Note: This performs MPI reduction, so all ranks get the same result.
"""
from anuga import numprocs
stage = self.get_quantity('stage')
local_max = stage.get_maximum_value(indices)
if numprocs == 1:
return local_max
from mpi4py import MPI
global_max = MPI.COMM_WORLD.allreduce(local_max, op=MPI.MAX)
return global_max
def get_global_max_speed(self) -> float:
"""Return maximum speed across all MPI ranks.
Usage:
max_speed = get_global_max_speed()
Note: This performs MPI reduction, so all ranks get the same result.
"""
from anuga import numprocs
import numpy as num
if self.max_speed is None:
return 0.0
local_max = num.max(self.max_speed)
if numprocs == 1:
return local_max
from mpi4py import MPI
global_max = MPI.COMM_WORLD.allreduce(local_max, op=MPI.MAX)
return global_max
def get_water_volume(self) -> float:
from anuga import numprocs
# GPU path: compute volume directly on GPU (avoids expensive D2H sync)
if self.multiprocessor_mode == MULTIPROCESSOR_GPU and self.gpu_interface is not None:
from anuga.shallow_water.sw_domain_gpu_ext import compute_water_volume_gpu
volume = compute_water_volume_gpu(self.gpu_interface.gpu_dom)
elif not self.evolved_called:
Stage = self.quantities['stage']
Elev = self.quantities['elevation']
h_c = Stage.centroid_values - Elev.centroid_values
from anuga import Quantity
Height = Quantity(self)
Height.set_values(h_c, location='centroids')
volume = Height.get_integral()
elif self.get_using_discontinuous_elevation():
Height = self.quantities['height']
volume = Height.get_integral()
else:
Stage = self.quantities['stage']
Elev = self.quantities['elevation']
Height = Stage-Elev
volume = Height.get_integral()
if numprocs == 1:
self.volume_history.append(volume)
return volume
# Use MPI_Allreduce instead of manual gather-broadcast
from mpi4py import MPI
water_volume = MPI.COMM_WORLD.allreduce(volume, op=MPI.SUM)
self.volume_history.append(water_volume)
return water_volume
def get_boundary_flux_integral(self) -> float:
"""Compute the boundary flux integral.
Should work in parallel
"""
from anuga import numprocs
if not self.compute_fluxes_method=='DE':
msg='Boundary flux integral only supported for DE fluxes '+\
'(because computation of boundary_flux_sum is only implemented there)'
raise Exception(msg)
flux_integral = self.boundary_flux_integral.boundary_flux_integral[0]
if numprocs == 1:
return flux_integral
# Use MPI_Allreduce instead of manual gather-broadcast
from mpi4py import MPI
flux_integral = MPI.COMM_WORLD.allreduce(flux_integral, op=MPI.SUM)
return flux_integral
def get_fractional_step_volume_integral(self) -> float:
"""Compute the integrated flows from fractional steps.
This requires that the fractional step operators update the fractional_step_volume_integral.
Should work in parallel
"""
from anuga import numprocs
flux_integral = self.fractional_step_volume_integral
if numprocs == 1:
return flux_integral
# Use MPI_Allreduce instead of manual gather-broadcast
from mpi4py import MPI
flux_integral = MPI.COMM_WORLD.allreduce(flux_integral, op=MPI.SUM)
return flux_integral
def get_flow_through_cross_section(self, polyline: ArrayLike, verbose: bool = False) -> float:
"""Get the total flow through an arbitrary poly line.
This is a run-time equivalent of the function with same name
in sww_interrogate.py
Input:
polyline: Representation of desired cross section - it may contain
multiple sections allowing for complex shapes. Assume
absolute UTM coordinates.
Format [[x0, y0], [x1, y1], ...]
Output:
Q: Total flow [m^3/s] across given segments.
"""
cross_section = Cross_section(self, polyline, verbose)
return cross_section.get_flow_through_cross_section()
def get_energy_through_cross_section(self, polyline: ArrayLike,
kind: str = 'total',
verbose: bool = False) -> float:
"""Obtain average energy head [m] across specified cross section.
Inputs:
polyline: Representation of desired cross section - it may contain
multiple sections allowing for complex shapes. Assume
absolute UTM coordinates.
Format [[x0, y0], [x1, y1], ...]
kind: Select which energy to compute.
Options are 'specific' and 'total' (default)
Output:
E: Average energy [m] across given segments for all stored times.
The average velocity is computed for each triangle intersected by
the polyline and averaged weighted by segment lengths.
The typical usage of this function would be to get average energy of
flow in a channel, and the polyline would then be a cross section
perpendicular to the flow.
#FIXME (Ole) - need name for this energy reflecting that its dimension
is [m].
"""
cross_section = Cross_section(self, polyline, verbose)
return cross_section.get_energy_through_cross_section(kind)
def check_integrity(self) -> None:
""" Run integrity checks on shallow water domain. """
Generic_Domain.check_integrity(self)
#Check that we are solving the shallow water wave equation
msg = 'First conserved quantity must be "stage"'
assert self.conserved_quantities[0] == 'stage', msg
msg = 'Second conserved quantity must be "xmomentum"'
assert self.conserved_quantities[1] == 'xmomentum', msg
msg = 'Third conserved quantity must be "ymomentum"'
assert self.conserved_quantities[2] == 'ymomentum', msg
#@profile
def compute_fluxes(self):
"""Compute fluxes and timestep suitable for all volumes in domain.
Compute total flux for each conserved quantity using "flux_function"
Fluxes across each edge are scaled by edgelengths and summed up
Resulting flux is then scaled by area and stored in
explicit_update for each of the three conserved quantities
stage, xmomentum and ymomentum
The maximal allowable speed computed by the flux_function for each volume
is converted to a timestep that must not be exceeded. The minimum of
those is computed as the next overall timestep.
Post conditions:
domain.explicit_update is reset to computed flux values
domain.flux_timestep is set to the largest step satisfying all volumes.
This wrapper calls the underlying C version of compute fluxes
"""
# Using Gareth Davies discontinuous elevation scheme
# Flux calculation and gravity incorporated in same
# procedure
nvtxRangePush("compute_fluxes")
self._ensure_gpu_interface()
# Choose the correct extension module
if self.multiprocessor_mode == MULTIPROCESSOR_OPENMP:
from .sw_domain_openmp_ext import compute_fluxes_ext_central
elif self.multiprocessor_mode == MULTIPROCESSOR_GPU:
# change over to cuda routines as developed
# from .sw_domain_simd_ext import compute_fluxes_ext_central
# FIXME SR: 2023_10_16 currently compute_fluxes and distribute together
# is producing incorrect results, but work separately!
compute_fluxes_ext_central = self.gpu_interface.compute_fluxes_ext_central_kernel
else:
raise Exception('Not implemented')
timestep = self.evolve_max_timestep
self.flux_timestep = compute_fluxes_ext_central(self, timestep)
nvtxRangePop()
def update_boundary(self):
"""Go through list of boundary objects and update boundary values
for all conserved quantities on boundary.
It is assumed that the ordering of conserved quantities is
consistent between the domain and the boundary object, i.e.
the jth element of vector q must correspond to the jth conserved
quantity in domain.
"""
nvtxRangePush('update_boundary')
# GPU mode - use GPU boundary functions if all boundaries are GPU-supported
if self.multiprocessor_mode == MULTIPROCESSOR_GPU and self.gpu_interface is not None:
from anuga.shallow_water.sw_domain_gpu_ext import (
evaluate_reflective_boundary_gpu,
evaluate_dirichlet_boundary_gpu,
evaluate_transmissive_boundary_gpu,
set_transmissive_n_zero_t_stage,
evaluate_transmissive_n_zero_t_boundary_gpu,
set_time_boundary_values,
evaluate_time_boundary_gpu,
set_file_boundary_values_from_domain,
evaluate_file_boundary_gpu,
set_absorbing_wave_value,
evaluate_absorbing_wave_boundary_gpu,
set_characteristic_wave_value,
evaluate_characteristic_wave_boundary_gpu,
set_flather_value,
evaluate_flather_boundary_gpu,
boundary_edge_sync,
sync_boundary_values,
init_boundary_edge_sync,
)
import numpy as np
gpu_dom = self.gpu_interface.gpu_dom
# Lazily initialize GPU boundary info
GPU_BOUNDARY_TYPES = {'Reflective_boundary', 'Dirichlet_boundary', 'Transmissive_boundary',
'Transmissive_n_momentum_zero_t_momentum_set_stage_boundary',
'Time_boundary', 'File_boundary', 'Field_boundary',
'Absorbing_wave_boundary', 'Characteristic_wave_boundary',
'Flather_external_stage_zero_velocity_boundary'}
if not hasattr(self, '_gpu_boundary_info_initialized'):
self._gpu_cpu_tags = []
self._gpu_all_on_gpu = True
self._gpu_transmissive_n_zero_t_boundaries = []
self._gpu_time_boundaries = []
self._gpu_absorbing_wave_boundaries = []
self._gpu_characteristic_wave_boundaries = []
self._gpu_flather_boundaries = []
for tag, B in self.boundary_map.items():
if B is not None:
btype = B.__class__.__name__
if btype not in GPU_BOUNDARY_TYPES:
self._gpu_cpu_tags.append(tag)
self._gpu_all_on_gpu = False
elif btype == 'Transmissive_n_momentum_zero_t_momentum_set_stage_boundary':
self._gpu_transmissive_n_zero_t_boundaries.append(B)
elif btype == 'Time_boundary':
self._gpu_time_boundaries.append(B)
elif btype == 'Absorbing_wave_boundary':
self._gpu_absorbing_wave_boundaries.append(B)
elif btype == 'Characteristic_wave_boundary':
self._gpu_characteristic_wave_boundaries.append(B)
elif btype == 'Flather_external_stage_zero_velocity_boundary':
self._gpu_flather_boundaries.append(B)
# Set up boundary edge sync if we have ANY CPU-evaluated boundaries
if not self._gpu_all_on_gpu:
boundary_cell_ids = np.unique(self.boundary_cells).astype(np.intc)
init_boundary_edge_sync(gpu_dom, boundary_cell_ids)
self._gpu_boundary_info_initialized = True
if self._gpu_all_on_gpu:
# All boundaries are GPU-supported - evaluate entirely on GPU
evaluate_reflective_boundary_gpu(gpu_dom)
evaluate_dirichlet_boundary_gpu(gpu_dom)
evaluate_transmissive_boundary_gpu(gpu_dom)
# Handle transmissive_n_zero_t boundaries (need Python function call for stage)
for B in self._gpu_transmissive_n_zero_t_boundaries:
stage_val = B.get_boundary_values()
try:
stage_val = float(stage_val)
except (TypeError, ValueError):
stage_val = float(stage_val[0])
set_transmissive_n_zero_t_stage(gpu_dom, stage_val)
evaluate_transmissive_n_zero_t_boundary_gpu(gpu_dom)
# Handle Time_boundary (need Python function call for values)
for B in self._gpu_time_boundaries:
q = B.get_boundary_values()
set_time_boundary_values(gpu_dom, float(q[0]), float(q[1]), float(q[2]))
evaluate_time_boundary_gpu(gpu_dom)
# Handle File_boundary / Field_boundary (per-edge values from SWW interpolation)
set_file_boundary_values_from_domain(gpu_dom, self)
evaluate_file_boundary_gpu(gpu_dom)
# Handle Absorbing_wave_boundary (scalar wave stage updated each timestep)
for B in self._gpu_absorbing_wave_boundaries:
value = B.get_boundary_values()
try:
wave_val = float(value)
except (TypeError, ValueError):
wave_val = float(value[0])
set_absorbing_wave_value(gpu_dom, wave_val)
evaluate_absorbing_wave_boundary_gpu(gpu_dom)
# Handle Characteristic_wave_boundary (scalar perturbation updated each timestep)
for B in self._gpu_characteristic_wave_boundaries:
value = B.get_boundary_values()
try:
perturb = float(value)
except (TypeError, ValueError):
perturb = float(value[0])
set_characteristic_wave_value(gpu_dom, perturb)
evaluate_characteristic_wave_boundary_gpu(gpu_dom)
for B in self._gpu_flather_boundaries:
value = B.get_boundary_values()
try:
stage_val = float(value)
except (TypeError, ValueError):
stage_val = float(value[0])
set_flather_value(gpu_dom, stage_val)
evaluate_flather_boundary_gpu(gpu_dom)
else:
# Some boundaries need CPU - sync edge values, evaluate on CPU, sync back
boundary_edge_sync(gpu_dom)
for tag in self.tag_boundary_cells:
B = self.boundary_map[tag]
if B is not None:
B.evaluate_segment(self, self.tag_boundary_cells[tag])
sync_boundary_values(gpu_dom)
else:
# CPU mode - evaluate boundaries on CPU
for tag in self.tag_boundary_cells:
B = self.boundary_map[tag]
if B is None:
continue
boundary_segment_edges = self.tag_boundary_cells[tag]
B.evaluate_segment(self, boundary_segment_edges)
nvtxRangePop()
def compute_forcing_terms(self):
"""If there are any forcing functions driving the system
they should be defined in Domain subclass and appended to
the list self.forcing_terms
"""
# The parameter self.flux_timestep should be updated
# by the forcing_terms to ensure stability but it isn't
# currently.
nvtxRangePush('compute_forcing_terms')
self._ensure_gpu_interface()
if self.multiprocessor_mode == MULTIPROCESSOR_GPU:
# GPU mode: use GPU Manning friction, fall back to CPU for others.
# Forcing terms may be plain functions (with __name__) or callable
# operator objects (Rainfall, Wind_stress, ...) which have none.
for f in self.forcing_terms:
if getattr(f, '__name__', None) == 'manning_friction_semi_implicit':
self.gpu_interface.manning_friction_kernel(self)
else:
# Other forcing terms (rain, etc.) run on CPU
# Need to sync from device, run, sync back
self.gpu_interface.sync_from_device()
f(self)
self.gpu_interface.sync_to_device()
else:
for f in self.forcing_terms:
f(self)
nvtxRangePop()
def _warn_unsupported_mode2_forcing(self):
"""Warn (once) if forcing terms other than Manning friction are present
in mode 2.
The mode-2 C step loop applies forcing in C and only handles Manning
friction, so Python ``forcing_terms`` (e.g. the Rainfall / Wind_stress /
Barometric_pressure forcing-function classes) are NOT applied — they are
silently skipped. Use the equivalent operators instead. This converts
that silent correctness gap into a loud, actionable message.
"""
if getattr(self, '_warned_mode2_forcing', False):
return
self._warned_mode2_forcing = True
ignored = [getattr(f, '__name__', None) or f.__class__.__name__
for f in self.forcing_terms
if getattr(f, '__name__', None) != 'manning_friction_semi_implicit']
if ignored:
import warnings
warnings.warn(
"multiprocessor_mode=2 ('unified') applies forcing in C and only "
"handles Manning friction; these Python forcing terms are NOT "
f"applied and are silently skipped: {ignored}. Use the equivalent "
"operators instead — Rate_operator.rainfall()/inflow(), "
"Wind_stress_operator, Barometric_pressure_operator.",
stacklevel=2)
def set_boundary(self, boundary_map):
"""Associate boundary objects with tagged segments (see base class).
Mode-2 ('unified') captures a device-side boundary classification and
the per-edge Dirichlet/Time/File/... mappings when the GPU interface is
first built. A later set_boundary() — e.g. switching a tag from
Reflective to Dirichlet partway through a simulation — must invalidate
those caches, otherwise the stale boundary keeps being applied on the
device and the new condition is silently ignored.
"""
super().set_boundary(boundary_map)
if self.multiprocessor_mode == MULTIPROCESSOR_GPU and self.gpu_interface is not None:
# Preserve current conserved-quantity state before tearing the
# interface down. In CPU-no-offload mode host arrays alias the
# device; the sync makes this correct for the offload case too.
try:
self.gpu_interface.sync_from_device()
except Exception:
pass
self.gpu_interface = None
if hasattr(self, '_gpu_boundary_info_initialized'):
del self._gpu_boundary_info_initialized
# Rebuild immediately from the new boundary map so subsequent
# mode-2 steps see a valid interface.
self._ensure_gpu_interface()
def distribute_to_vertices_and_edges(self, distribute_to_vertices=True):
""" extrapolate centroid values to vertices and edges"""
nvtxRangePush('distribute_to_vertices_and_edges')
# Build a deferred mode-2 device interface on demand (a default-'unified'
# domain may reach here, e.g. from a test, without going through evolve()).
self._ensure_gpu_interface()
if self.multiprocessor_mode == MULTIPROCESSOR_GPU and self.gpu_interface is not None:
# Output path (yieldsteps / direct reads). This method is NOT on the
# mode-2 stepping path (the C RK loop extrapolates on the device
# itself); it exists to make vertex/edge values readable from Python
# (SWW writer, inundation queries, tests). The gpu edge kernel does
# NOT compute vertex values, so sync centroids from the device and
# run the host (openmp) protect + extrapolate, which produces edges
# AND vertices. The host protect does not affect the device
# trajectory — the next C step re-protects on the device.
self.gpu_interface.sync_from_device()
from .sw_domain_openmp_ext import (protect_new,
extrapolate_second_order_edge_sw)
protect_new(self)
extrapolate_second_order_edge_sw(self, distribute_to_vertices=distribute_to_vertices)
nvtxRangePop()
return
# Do protection step
self.protect_against_infinitesimal_and_negative_heights()
# Do extrapolation step
# Choose the correct extension module
if self.multiprocessor_mode == MULTIPROCESSOR_OPENMP:
from .sw_domain_openmp_ext import extrapolate_second_order_edge_sw
else:
raise Exception('Not implemented')
extrapolate_second_order_edge_sw(self, distribute_to_vertices=distribute_to_vertices)
nvtxRangePop()
def distribute_to_edges(self):
""" extrapolate centroid values edges"""
nvtxRangePush('distribute_to_edges')
# Do protection step
self.protect_against_infinitesimal_and_negative_heights()
# Do extrapolation step
# Choose the correct extension module
if self.multiprocessor_mode == MULTIPROCESSOR_OPENMP:
from .sw_domain_openmp_ext import distribute_to_edges as extrapolate_second_order_edge_sw
extrapolate_second_order_edge_sw(self)
elif self.multiprocessor_mode == MULTIPROCESSOR_GPU:
# change over to cuda routines as developed
#from .sw_domain_simd_ext import extrapolate_second_order_edge_sw
extrapolate_second_order_edge_sw = self.gpu_interface.extrapolate_second_order_edge_sw_kernel
extrapolate_second_order_edge_sw(self)
else:
raise Exception('Not implemented')
nvtxRangePop()
def distribute_edges_to_vertices(self):
"""Distribute edge values to vertices.
This is a wrapper for the C implementation of the distribution
from edges to vertices.
"""
nvtxRangePush('distribute_edges_to_vertices')
if self.multiprocessor_mode == MULTIPROCESSOR_OPENMP:
# Using OpenMP extension
from .sw_domain_openmp_ext import distribute_edges_to_vertices as distribute_edges_to_vertices_ext
elif self.multiprocessor_mode == MULTIPROCESSOR_GPU:
# Using cupy extension
# FIXME SR: Not implemented yet so use OpenMP version
from .sw_domain_openmp_ext import distribute_edges_to_vertices as distribute_edges_to_vertices_ext
# distribute_edges_to_vertices_ext = self.gpu_interface.distribute_edges_to_vertices_kernel
else:
raise Exception('Not implemented')
distribute_edges_to_vertices_ext(self)
nvtxRangePop()
def update_timestep(self, yieldstep, finaltime):
"""Calculate the next timestep to take
"""
# Protect against degenerate timesteps arising from isolated
# triangles
self.apply_protection_against_isolated_degenerate_timesteps()
# disable variable timestepping
if self.fixed_flux_timestep is not None:
self.flux_timestep = self.fixed_flux_timestep
timestep = self.fixed_flux_timestep
else:
# self.timestep is calculated from speed of characteristics
# Apply CFL condition here
timestep = min(self.CFL * self.flux_timestep, self.evolve_max_timestep)
# Record maximal and minimal values of timestep for reporting
self.recorded_max_timestep = max(timestep, self.recorded_max_timestep)
self.recorded_min_timestep = min(timestep, self.recorded_min_timestep)
# Stop if degenerate timestep
if timestep < self.evolve_min_timestep:
msg = 'WARNING: Too small timestep %.16f reached ' \
% timestep
msg += 'even after %d steps of 1 order scheme' \
% self.max_smallsteps
log.info(msg)
timestep = self.evolve_min_timestep # Try enforce min_step
stats = self.timestepping_statistics(track_speeds=True)
log.info(stats)
raise Exception(msg)
# NOTE: Now timestep is redefined. This can lead to a timestep
# being smaller than the self.recorded_min_timestep, which
# confused me (GD).
# The behaviour is good though, since then the
# recorded_min_timestep reflects the mathematical constraints on
# the timestep, EXCEPT the constraint that we yield at the
# required time. Otherwise we would often have very small
# recorded_min_timesteps simply because of we have to yield at a
# given time
# Ensure that final time is not exceeded
if self.relative_finaltime is not None and self.relative_time + timestep > self.relative_finaltime:
timestep = self.relative_finaltime - self.relative_time
# Ensure that model time is aligned with yieldsteps
if self.relative_time + timestep > self.relative_yieldtime:
timestep = self.relative_yieldtime - self.relative_time
self.timestep = timestep
def protect_against_infinitesimal_and_negative_heights(self):
""" Clean up the stage and momentum values to ensure non-negative heights
"""
# nvtxRangePush('protect_new')
# Choose the correct extension module
if self.multiprocessor_mode == MULTIPROCESSOR_OPENMP:
from .sw_domain_openmp_ext import protect_new
elif self.multiprocessor_mode == MULTIPROCESSOR_GPU:
# change over to cuda routines as developed
# # from .sw_domain_simd_ext import protect_new
#from .sw_domain_openmp_ext import protect_new
protect_new = self.gpu_interface.protect_against_infinitesimal_and_negative_heights_kernel
else:
raise Exception('Not implemented')
mass_error = protect_new(self)
# nvtxRangePop()
if mass_error > 0.0 and self.verbose :
#print('Cumulative mass protection: ' + str(mass_error) + ' m^3 ')
# From https://stackoverflow.com/questions/22397261/cant-convert-float-object-to-str-implicitly
print('Cumulative mass protection: {0} m^3'.format(mass_error))
def apply_protection_against_isolated_degenerate_timesteps(self):
if self.protect_against_isolated_degenerate_timesteps is False:
return
# FIXME (Ole): Make this configurable
if num.max(self.max_speed) < 10.0:
return
# Setup 10 bins for speed histogram
from anuga.utilities.numerical_tools import histogram, create_bins
bins = create_bins(self.max_speed, 10)
hist = histogram(self.max_speed, bins)
# Look for characteristic signature
if len(hist) > 1 and hist[-1] > 0 and \
hist[4] == hist[5] == hist[6] == hist[7] == hist[8] == 0:
# Danger of isolated degenerate triangles
# Find triangles in last bin
# FIXME - speed up using numeric package
d = 0
for i in range(self.number_of_triangles):
if self.max_speed[i] > bins[-1]:
msg = 'Time=%f: Ignoring isolated high ' % self.get_time()
msg += 'speed triangle '
msg += '#%d of %d with max speed = %f' \
% (i, self.number_of_triangles, self.max_speed[i])
self.get_quantity('xmomentum').set_values(0.0, indices=[i])
self.get_quantity('ymomentum').set_values(0.0, indices=[i])
self.max_speed[i] = 0.0
d += 1
def update_conserved_quantities(self):
"""Update vectors of conserved quantities using previously
computed fluxes and specified forcing functions.
"""
nvtxRangePush('update_conserved_quantities')
timestep = self.timestep
# Update height based on discontinuous elevation
assert self.get_using_discontinuous_elevation()
if self.multiprocessor_mode == MULTIPROCESSOR_OPENMP:
from .sw_domain_openmp_ext import update_conserved_quantities
elif self.multiprocessor_mode == MULTIPROCESSOR_GPU:
update_conserved_quantities = self.gpu_interface.update_conserved_quantities_kernel
else:
raise Exception('Not implemented')
num_negative_ids = update_conserved_quantities(self, timestep)
if num_negative_ids > 0:
# FIXME: This only warns the first time -- maybe we should warn whenever loss occurs?
import warnings
msg = f'{num_negative_ids} negative cells being set to zero depth, possible loss of conservation. \n' +\
'Consider using domain.report_water_volume_statistics() to check the extent of the problem'
warnings.warn(msg)
nvtxRangePop()
def update_other_quantities(self):
""" There may be a need to calculate some of the other quantities
based on the new values of conserved quantities
"""
return
def update_centroids_of_velocities_and_height(self):
"""Calculate the centroid values of velocities and height based
on the values of the quantities stage and x and y momentum
Assumes that stage and momentum are up to date
Useful for kinematic viscosity calculations
"""
# For shallow water we need to update height xvelocity and yvelocity
#Shortcuts
W = self.quantities['stage']
UH = self.quantities['xmomentum']
VH = self.quantities['ymomentum']
H = self.quantities['height']
Z = self.quantities['elevation']
U = self.quantities['xvelocity']
V = self.quantities['yvelocity']
#print num.min(W.centroid_values)
# Make sure boundary values of conserved quantites
# are consistent with value of functions at centroids
Z.set_boundary_values_from_edges()
# FIXME(Ole): Why are these not necessary?
#W.set_boundary_values_from_edges()
#UH.set_boundary_values_from_edges()
#VH.set_boundary_values_from_edges()
#Aliases
w_C = W.centroid_values
z_C = Z.centroid_values
uh_C = UH.centroid_values
vh_C = VH.centroid_values
u_C = U.centroid_values
v_C = V.centroid_values
h_C = H.centroid_values
w_B = W.boundary_values
z_B = Z.boundary_values
uh_B = UH.boundary_values
vh_B = VH.boundary_values
u_B = U.boundary_values
v_B = V.boundary_values
h_B = H.boundary_values
h_C[:] = w_C-z_C
h_C[:] = num.where(h_C >= 0, h_C , 0.0)
h_B[:] = w_B-z_B
h_B[:] = num.where(h_B >=0, h_B, 0.0)
# Update height values
#H.set_values( num.where(W.centroid_values-Z.centroid_values>=0,
# W.centroid_values-Z.centroid_values, 0.0), location='centroids')
#H.set_boundary_values( num.where(W.boundary_values-Z.boundary_values>=0,
# W.boundary_values-Z.boundary_values, 0.0))
#assert num.min(h_C) >= 0
#assert num.min(h_B) >= 0
H0 = 1.0e-8
#U.set_values(uh_C/(h_C + H0/h_C), location='centroids')
#V.set_values(vh_C/(h_C + H0/h_C), location='centroids')
factor = h_C/(h_C*h_C + H0)
u_C[:] = uh_C*factor
v_C[:] = vh_C*factor
#U.set_boundary_values(uh_B/(h_B + H0/h_B))
#V.set_boundary_values(vh_B/(h_B + H0/h_B))
factor = h_B/(h_B*h_B + H0)
u_B[:] = uh_B*factor
v_B[:] = vh_B*factor
def update_centroids_of_momentum_from_velocity(self):
"""
Calculate the centroid value of x and y momentum from height and velocities.
This method computes the centroid values of x and y momentum (xmomentum and
ymomentum) by multiplying the centroid velocities by the centroid height values.
The method assumes that the centroids of height and velocities are already
up to date.
This is particularly useful for kinematic viscosity calculations where momentum
values at cell centroids are required.
The method updates:
- xmomentum.centroid_values: product of xvelocity and height at centroids
- ymomentum.centroid_values: product of yvelocity and height at centroids
After updating centroid values, the method distributes these values to vertices
and edges via distribute_to_vertices_and_edges().
Notes
-----
This method modifies the centroid_values arrays in-place for both xmomentum
and ymomentum quantities.
See Also
--------
distribute_to_vertices_and_edges : Distribute centroid values to vertices and edges
"""
# For shallow water we need to update height xvelocity and yvelocity
#Shortcuts
UH = self.quantities['xmomentum']
VH = self.quantities['ymomentum']
H = self.quantities['height']
Z = self.quantities['elevation']
U = self.quantities['xvelocity']
V = self.quantities['yvelocity']
#Arrays
u_C = U.centroid_values
v_C = V.centroid_values
uh_C = UH.centroid_values
vh_C = VH.centroid_values
h_C = H.centroid_values
u_B = U.boundary_values
v_B = V.boundary_values
uh_B = UH.boundary_values
vh_B = VH.boundary_values
h_B = H.boundary_values
uh_C[:] = u_C*h_C
vh_C[:] = v_C*h_C
#UH.set_values(u_C*h_C , location='centroids')
#VH.set_values(v_C*h_C , location='centroids')
self.distribute_to_vertices_and_edges()
def evolve(self,
yieldstep: float | None = None,
outputstep: float | None = None,
finaltime: float | DateTime | None = None,
duration: float | None = None,
skip_initial_step: bool = False) -> Iterator[float]:
"""Evolve method from Domain class.
Parameters
----------
yieldstep : float, optional
Yield every yieldstep time period
outputstep : float, optional
Output to sww file every outputstep time period. outputstep should be
an integer multiple of yieldstep.
finaltime : float or datetime, optional
Evolve until finaltime (can be a float in seconds or a datetime object)
duration : float, optional
Evolve for a time of length duration (seconds)
skip_initial_step : bool, optional
Can be used to restart a simulation (not often used).
Notes
-----
If outputstep is None, the output to sww file happens every yieldstep.
If yieldstep is None then simply evolve to finaltime or for a duration.
"""
# Call check integrity here rather than from user scripts
# self.check_integrity()
from time import time as walltime
self.evolve_start_walltime = walltime()
self.last_walltime = self.evolve_start_walltime
from datetime import datetime
if finaltime is not None:
if isinstance(finaltime, datetime):
if finaltime.tzinfo is None:
dt = finaltime.replace(tzinfo=self.timezone)
else:
dt = finaltime
finaltime = dt.timestamp()
else:
finaltime = float(finaltime)
if outputstep is None:
outputstep = yieldstep
if yieldstep is None:
self.output_frequency = 1
else:
msg = f'outputstep ({outputstep}) should be an integer multiple of yieldstep ({yieldstep})'
output_frequency = outputstep/yieldstep
assert float(output_frequency).is_integer(), msg
self.output_frequency = int(output_frequency)
msg = 'Attribute self.beta_w must be in the interval [0, 2]'
assert 0 <= self.beta_w <= 2.0, msg
# Build the mode-2 device interface lazily if it was deferred (a
# default-'unified' domain constructed before boundaries were set). Must
# happen before distribute_to_vertices_and_edges(), which uses the
# interface in mode 2.
self._ensure_gpu_interface()
# Initial update of vertex and edge values before any STORAGE
# and or visualisation.
# This is done again in the initialisation of the Generic_Domain
# evolve loop but we do it here to ensure the values are ok for storage.
self.distribute_to_vertices_and_edges()
if self.store is True and (self.get_relative_time() == 0.0 or self.evolved_called is False):
self.initialise_storage()
# Eagerly run the mode-2 fractional-step setup (Boyd culvert registration
# and CPU-only-operator detection) so its log lines print before the first
# yielded step instead of after the t=0 yield. Cached, so the first
# apply_fractional_steps() does not repeat it.
if self.multiprocessor_mode == MULTIPROCESSOR_GPU and self.gpu_interface is not None:
self._has_cpu_only_fractional_operators()
self._warn_unsupported_mode2_forcing()
#nvtx marker
nvtxRangePush('_evolve_base')
# Call basic machinery from parent class
for t in self._evolve_base(yieldstep=yieldstep,
finaltime=finaltime, duration=duration,
skip_initial_step=skip_initial_step):
walltime = time.time()
#print t , self.get_time()
# Store model data, e.g. for subsequent visualisation
if self.store:
if self.yieldstep_counter%self.output_frequency == 0:
self.store_timestep()
if self.checkpoint:
save_checkpoint=False
if self.checkpoint_step == 0:
if rank() == 0:
if walltime - self.walltime_prev > self.checkpoint_time:
save_checkpoint = True
for cpu in range(size()):
if cpu != rank():
send(save_checkpoint, cpu)
else:
save_checkpoint = receive(0)
elif self.yieldstep_counter%self.checkpoint_step == 0:
save_checkpoint = True
if save_checkpoint:
pickle_name = os.path.join(self.checkpoint_dir,self.get_name())+'_'+str(self.get_time())+'.pickle'
pickle.dump(self, open(pickle_name, 'wb'))
barrier()
self.walltime_prev = time.time()
#print 'Stored Checkpoint File '+pickle_name
# Pass control on to outer loop for more specific actions
yield(t)
self.yieldstep_counter += 1
#nvtx marker
nvtxRangePop()
def initialise_storage(self) -> None:
"""Create and initialise self.writer object for storing data.
Also, save x,y and bed elevation
"""
nvtxRangePush('SWW_file')
# Initialise writer
self.writer = SWW_file(self)
# Store vertices and connectivity
self.writer.store_connectivity()
nvtxRangePop()
def store_timestep(self) -> None:
"""Store time dependent quantities and time.
Precondition:
self.writer has been initialised
"""
nvtxRangePush('store_timestep')
self.writer.store_timestep()
nvtxRangePop()
def sww_merge(self, *args, **kwargs) -> None:
"""Dummy function for sequential algorithms where the sww produced is the final products.
For parallel runs, a similarly named routine in parallel_shallow_water will merge all the
sub domain sww files into a global sww file
:param bool verbose: Flag to produce more output
:param bool delete_old: Flag to delete sub domain sww files after
creating global sww file
"""
pass
def evolve_one_euler_step(self, yieldstep, finaltime):
"""One Euler Time Step
Q^{n+1} = E(h) Q^n
Does not assume that centroid values have been extrapolated to
vertices and edges
"""
# GPU mode: dispatch to C Euler loop (eliminates Python round-trip overhead)
if self.multiprocessor_mode == MULTIPROCESSOR_GPU and self.gpu_interface is not None:
self._evolve_one_euler_step_c(yieldstep, finaltime)
return
# From centroid values calculate edge
self.distribute_to_vertices_and_edges(distribute_to_vertices=False)
# Apply boundary conditions
self.update_boundary()
# Compute fluxes across each element edge
# In MPI parallel mode this involves an allreduce to find global minimal timestep
self.compute_fluxes()
# Compute forcing terms (friction)
self.compute_forcing_terms()
# Update timestep to fit yieldstep and finaltime
self.update_timestep(yieldstep, finaltime)
# Update conserved quantities
self.update_conserved_quantities()
def evolve_one_rk2_step(self, yieldstep, finaltime):
"""One 2nd order RK timestep
Q^{n+1} = 0.5 Q^n + 0.5 E(h)^2 Q^n
Does not assume that centroid values have been extrapolated to
vertices and edges
"""
# GPU mode: use C RK loop (faster) or Python-orchestrated GPU loop
if self.multiprocessor_mode == MULTIPROCESSOR_GPU and self.gpu_interface is not None:
if self.use_c_rk_loop:
self._evolve_one_rk2_step_c(yieldstep, finaltime)
else:
self._evolve_one_rk2_step_gpu(yieldstep, finaltime)
return
# Save initial initial conserved quantities values
self.backup_conserved_quantities() # has C, ported to GPU
#==========================================
# First euler step
#==========================================
# From centroid values calculate edge values
self.distribute_to_vertices_and_edges(distribute_to_vertices=False) # has C, ported to GPU
# Apply boundary conditions
self.update_boundary() # has C, ported to GPU
# Compute fluxes across each element edge
# In MPI parallel mode this involves an allreduce to find global minimal timestep
self.compute_fluxes()
# Compute forcing terms (friction)
self.compute_forcing_terms()
# Update timestep to fit yieldstep and finaltime
self.update_timestep(yieldstep, finaltime) #needs C
# Update centroid values of conserved quantities
self.update_conserved_quantities() # has C, ported to GPU
#===========================
# End of first euler step
#===========================
# Update time
self.set_relative_time(self.get_relative_time() + self.timestep) # needs C
# Update ghosts
if self.ghost_layer_width < 4:
self.update_ghosts() # needs C
#=========================================
# Second Euler step using the same timestep
# calculated in the first step. Might lead to
# stability problems but we have not seen any
# example.
#=========================================
# Update edge values
self.distribute_to_vertices_and_edges(distribute_to_vertices=False)
# Update boundary values
self.update_boundary()
# Compute fluxes across each element edge
# In MPI parallel mode this involves an allreduce to find global minimal timestep
self.compute_fluxes()
# Compute forcing terms (friction)
self.compute_forcing_terms()
# Update conserved quantities using timestep from first step
self.update_conserved_quantities()
#========================================
# Combine initial and final values
# of conserved quantities and cleanup
#========================================
# Combine steps
self.saxpy_conserved_quantities(0.5, 0.5) # has C, not ported
def evolve_one_ader2_step(self, yieldstep, finaltime):
"""One ADER-2 timestep using the local Cauchy-Kovalewski predictor.
Q^{n+1} = Q^n + dt * R(Q^{n+1/2})
Uses the fused edge predictor: edge values are shifted to Q^{n+1/2}
in-place while centroid values remain at Q^n, eliminating the second
extrapolation pass and the backup/saxpy restore pattern.
Single-flux-call variant: the previous step's CFL timestep is reused
for the predictor half-advance. The first step bootstraps with dt=0
(Euler) to establish the initial CFL timestep.
Cost: 1 flux call + 1 extrapolation + 1 edge C-K predictor (after step 1).
Accuracy: 2nd-order in space and time.
"""
# GPU mode: use C loop (faster) or Python-orchestrated GPU loop
if self.multiprocessor_mode == MULTIPROCESSOR_GPU and self.gpu_interface is not None:
if self.use_c_rk_loop:
self._evolve_one_ader2_step_c(yieldstep, finaltime)
else:
self._evolve_one_ader2_step_gpu(yieldstep, finaltime)
return
from .sw_domain_openmp_ext import ader_ck_predictor_edge
# Bootstrap: prev_dt=0 on first call → Euler step to establish CFL dt
if not hasattr(self, '_ader2_prev_dt'):
self._ader2_prev_dt = 0.0
prev_dt = self._ader2_prev_dt
# Extrapolate Q^n centroids → edges (1 pass; centroids untouched)
self.distribute_to_vertices_and_edges(distribute_to_vertices=False)
self.update_boundary()
if prev_dt > 0.0:
# Fused edge predictor: shift edge values to Q^{n+1/2}, Q^n in centroids
ader_ck_predictor_edge(self, prev_dt * 0.5)
# Re-apply boundary conditions to boundary edges
self.update_boundary()
# Single flux call from Q^{n+1/2} edge values (or Q^n on bootstrap step)
self.compute_fluxes()
self.compute_forcing_terms()
# Clip to yieldstep / finaltime / evolve_max_timestep.
# In MPI parallel mode update_timestep does an Allreduce so that
# self.timestep is the global-minimum CFL dt across all ranks.
# _ader2_prev_dt must equal the timestep actually taken so that
# all ranks use the same predictor half-step next iteration;
# saving the local flux_timestep before the Allreduce would give
# each rank a different prev_dt, corrupting ghost-boundary edges.
self.update_timestep(yieldstep, finaltime)
# Record the timestep actually taken as prev_dt for the next predictor.
self._ader2_prev_dt = self.timestep
# Q^n centroids are untouched — update directly (no saxpy restore needed)
self.update_conserved_quantities() # Q^{n+1} = Q^n + dt*R(Q^{n+1/2})
def _evolve_one_ader2_step_gpu(self, yieldstep, finaltime):
"""Python-orchestrated GPU implementation of ADER-2 step.
Fallback when the C ADER-2 loop cannot be used (e.g., unsupported
boundary types). Prefer _evolve_one_ader2_step_c() for better performance.
Uses the fused edge predictor with _ader2_prev_dt to match the CPU path
exactly (single flux call, no backup/restore of centroid values needed).
"""
from anuga.shallow_water.sw_domain_gpu_ext import (
extrapolate_second_order_gpu,
protect_gpu,
compute_fluxes_gpu,
update_conserved_quantities_gpu,
ader_ck_predictor_edge_gpu,
sync_boundary_values,
init_boundary_edge_sync,
boundary_edge_sync,
exchange_ghosts,
evaluate_reflective_boundary_gpu,
evaluate_dirichlet_boundary_gpu,
evaluate_transmissive_boundary_gpu,
set_transmissive_n_zero_t_stage,
evaluate_transmissive_n_zero_t_boundary_gpu,
set_time_boundary_values,
evaluate_time_boundary_gpu,
set_file_boundary_values_from_domain,
evaluate_file_boundary_gpu,
set_absorbing_wave_value,
evaluate_absorbing_wave_boundary_gpu,
set_characteristic_wave_value,
evaluate_characteristic_wave_boundary_gpu,
set_flather_value,
evaluate_flather_boundary_gpu,
)
import numpy as np
gpu_dom = self.gpu_interface.gpu_dom
GPU_BOUNDARY_TYPES = {'Reflective_boundary', 'Dirichlet_boundary', 'Transmissive_boundary',
'Transmissive_n_momentum_zero_t_momentum_set_stage_boundary',
'Time_boundary', 'File_boundary', 'Field_boundary',
'Absorbing_wave_boundary', 'Characteristic_wave_boundary',
'Flather_boundary'}
if not hasattr(self, '_gpu_boundary_info_initialized'):
self._gpu_cpu_tags = []
self._gpu_all_on_gpu = True
cpu_boundary_types = []
self._gpu_transmissive_n_zero_t_boundaries = []
self._gpu_time_boundaries = []
self._gpu_absorbing_wave_boundaries = []
self._gpu_characteristic_wave_boundaries = []
self._gpu_flather_boundaries = []
for tag, B in self.boundary_map.items():
if B is not None:
btype = B.__class__.__name__
if btype not in GPU_BOUNDARY_TYPES:
self._gpu_cpu_tags.append(tag)
self._gpu_all_on_gpu = False
cpu_boundary_types.append((tag, btype))
elif btype == 'Transmissive_n_momentum_zero_t_momentum_set_stage_boundary':
self._gpu_transmissive_n_zero_t_boundaries.append(B)
elif btype == 'Time_boundary':
self._gpu_time_boundaries.append(B)
elif btype == 'Absorbing_wave_boundary':
self._gpu_absorbing_wave_boundaries.append(B)
elif btype == 'Characteristic_wave_boundary':
self._gpu_characteristic_wave_boundaries.append(B)
elif btype == 'Flather_boundary':
self._gpu_flather_boundaries.append(B)
if not self._gpu_all_on_gpu:
boundary_cell_ids = np.unique(self.boundary_cells).astype(np.intc)
init_boundary_edge_sync(gpu_dom, boundary_cell_ids)
print("WARNING: GPU boundary evaluation disabled - falling back to CPU")
print(f" Unsupported boundary types: {cpu_boundary_types}")
self._gpu_boundary_info_initialized = True
# Bootstrap: prev_dt=0 on first call → plain Euler step
if not hasattr(self, '_ader2_prev_dt'):
self._ader2_prev_dt = 0.0
prev_dt = self._ader2_prev_dt
def _eval_boundaries():
if self._gpu_all_on_gpu:
evaluate_reflective_boundary_gpu(gpu_dom)
evaluate_dirichlet_boundary_gpu(gpu_dom)
evaluate_transmissive_boundary_gpu(gpu_dom)
for B in self._gpu_transmissive_n_zero_t_boundaries:
stage_val = B.get_boundary_values()
try:
stage_val = float(stage_val)
except (TypeError, ValueError):
stage_val = float(stage_val[0])
set_transmissive_n_zero_t_stage(gpu_dom, stage_val)
evaluate_transmissive_n_zero_t_boundary_gpu(gpu_dom)
for B in self._gpu_time_boundaries:
q = B.get_boundary_values()
set_time_boundary_values(gpu_dom, float(q[0]), float(q[1]), float(q[2]))
evaluate_time_boundary_gpu(gpu_dom)
set_file_boundary_values_from_domain(gpu_dom, self)
evaluate_file_boundary_gpu(gpu_dom)
for B in self._gpu_absorbing_wave_boundaries:
value = B.get_boundary_values()
try:
wave_val = float(value)
except (TypeError, ValueError):
wave_val = float(value[0])
set_absorbing_wave_value(gpu_dom, wave_val)
evaluate_absorbing_wave_boundary_gpu(gpu_dom)
for B in self._gpu_characteristic_wave_boundaries:
value = B.get_boundary_values()
try:
perturb = float(value)
except (TypeError, ValueError):
perturb = float(value[0])
set_characteristic_wave_value(gpu_dom, perturb)
evaluate_characteristic_wave_boundary_gpu(gpu_dom)
for B in self._gpu_flather_boundaries:
value = B.get_boundary_values()
try:
stage_val = float(value)
except (TypeError, ValueError):
stage_val = float(value[0])
set_flather_value(gpu_dom, stage_val)
evaluate_flather_boundary_gpu(gpu_dom)
else:
boundary_edge_sync(gpu_dom)
for tag in self.tag_boundary_cells:
B = self.boundary_map[tag]
if B is not None:
B.evaluate_segment(self, self.tag_boundary_cells[tag])
sync_boundary_values(gpu_dom)
# --- Step 1: extrapolate Q^n → edges + evaluate boundaries ---
protect_gpu(gpu_dom)
extrapolate_second_order_gpu(gpu_dom)
_eval_boundaries()
if prev_dt > 0.0:
# --- Step 2: fused edge C-K predictor → Q^{n+1/2} edges in-place ---
# (centroid values unchanged — no backup/restore needed)
ader_ck_predictor_edge_gpu(gpu_dom, prev_dt * 0.5)
# Re-apply boundaries to boundary edges
_eval_boundaries()
# --- Step 3: single flux call from Q^{n+1/2} edges ---
self.flux_timestep = compute_fluxes_gpu(gpu_dom)
self.compute_forcing_terms()
# --- Step 4: Allreduce + yieldstep/finaltime clip ---
self.update_timestep(yieldstep, finaltime)
self._ader2_prev_dt = self.timestep
# --- Step 5: update Q^{n+1} = Q^n + timestep * R (no restore needed) ---
update_conserved_quantities_gpu(gpu_dom, self.timestep)
self.set_relative_time(self.get_relative_time() + self.timestep)
# Record the CFL-constrained step (pre yield/final cap), matching legacy
# update_timestep(), rather than the yield-limited step actually taken.
cfl_dt = min(self.CFL * self.flux_timestep, self.evolve_max_timestep)
self.recorded_max_timestep = max(cfl_dt, self.recorded_max_timestep)
self.recorded_min_timestep = min(cfl_dt, self.recorded_min_timestep)
# Post-step ghost exchange — update_ghosts() is a no-op in GPU mode
if self.ghost_layer_width < 4:
exchange_ghosts(gpu_dom)
def _evolve_one_ader2_step_c(self, yieldstep, finaltime):
"""ADER-2 step executed entirely in C - eliminates Python round-trip overhead.
Prefer this over _evolve_one_ader2_step_gpu() for better performance.
Falls back to _evolve_one_ader2_step_gpu() if any boundary requires CPU.
Rate_operators must be applied separately (after this call).
"""
from anuga.shallow_water.sw_domain_gpu_ext import (
evolve_one_ader2_step_gpu,
set_transmissive_n_zero_t_stage,
set_time_boundary_values,
set_file_boundary_values_from_domain,
set_absorbing_wave_value,
set_characteristic_wave_value,
set_flather_value,
)
gpu_dom = self.gpu_interface.gpu_dom
GPU_BOUNDARY_TYPES = {'Reflective_boundary', 'Dirichlet_boundary', 'Transmissive_boundary',
'Transmissive_n_momentum_zero_t_momentum_set_stage_boundary',
'Time_boundary', 'File_boundary', 'Field_boundary',
'Absorbing_wave_boundary', 'Characteristic_wave_boundary'}
if not hasattr(self, '_gpu_boundary_info_initialized'):
self._gpu_cpu_tags = []
self._gpu_all_on_gpu = True
cpu_boundary_types = []
self._gpu_transmissive_n_zero_t_boundaries = []
self._gpu_time_boundaries = []
self._gpu_absorbing_wave_boundaries = []
self._gpu_characteristic_wave_boundaries = []
for tag, B in self.boundary_map.items():
if B is not None:
btype = B.__class__.__name__
if btype not in GPU_BOUNDARY_TYPES:
self._gpu_cpu_tags.append(tag)
self._gpu_all_on_gpu = False
cpu_boundary_types.append((tag, btype))
elif btype == 'Transmissive_n_momentum_zero_t_momentum_set_stage_boundary':
self._gpu_transmissive_n_zero_t_boundaries.append(B)
elif btype == 'Time_boundary':
self._gpu_time_boundaries.append(B)
elif btype == 'Absorbing_wave_boundary':
self._gpu_absorbing_wave_boundaries.append(B)
elif btype == 'Characteristic_wave_boundary':
self._gpu_characteristic_wave_boundaries.append(B)
if not self._gpu_all_on_gpu:
print("WARNING: C ADER-2 loop requires all GPU-supported boundary types")
print(" Falling back to Python-orchestrated GPU loop")
print(f" Unsupported types: {cpu_boundary_types}")
self._gpu_boundary_info_initialized = True
if not self._gpu_all_on_gpu:
return self._evolve_one_ader2_step_gpu(yieldstep, finaltime)
for B in self._gpu_transmissive_n_zero_t_boundaries:
stage_val = B.get_boundary_values()
try:
stage_val = float(stage_val)
except (TypeError, ValueError):
stage_val = float(stage_val[0])
set_transmissive_n_zero_t_stage(gpu_dom, stage_val)
for B in self._gpu_time_boundaries:
q = B.get_boundary_values()
set_time_boundary_values(gpu_dom, float(q[0]), float(q[1]), float(q[2]))
set_file_boundary_values_from_domain(gpu_dom, self)
for B in self._gpu_absorbing_wave_boundaries:
value = B.get_boundary_values()
try:
wave_val = float(value)
except (TypeError, ValueError):
wave_val = float(value[0])
set_absorbing_wave_value(gpu_dom, wave_val)
for B in self._gpu_characteristic_wave_boundaries:
value = B.get_boundary_values()
try:
perturb = float(value)
except (TypeError, ValueError):
perturb = float(value[0])
set_characteristic_wave_value(gpu_dom, perturb)
for B in self._gpu_flather_boundaries:
value = B.get_boundary_values()
try:
stage_val = float(value)
except (TypeError, ValueError):
stage_val = float(value[0])
set_flather_value(gpu_dom, stage_val)
# Time remaining to the next yield boundary. Use the explicitly tracked
# relative_yieldtime (set and incremented by the evolve loop), NOT
# (relative_time % yieldstep): the modulo is floating-point fragile, e.g.
# 0.3 % 0.1 == 0.0999... gives remaining ~1e-17 -> max_timestep ~0 ->
# the step never advances and evolve() spins. Mirrors update_timestep().
relative_yieldtime = getattr(self, 'relative_yieldtime', None)
if relative_yieldtime is not None:
remaining_yieldstep = relative_yieldtime - self.get_relative_time()
else:
remaining_yieldstep = yieldstep
if finaltime is not None:
remaining_finaltime = finaltime - self.get_time()
max_timestep = min(self.evolve_max_timestep, remaining_yieldstep, remaining_finaltime)
else:
max_timestep = min(self.evolve_max_timestep, remaining_yieldstep)
if not hasattr(self, '_ader2_prev_dt'):
self._ader2_prev_dt = 0.0
self.timestep = evolve_one_ader2_step_gpu(gpu_dom, max_timestep, 1, self._ader2_prev_dt)
self._ader2_prev_dt = self.timestep
# Do NOT advance relative_time here — the evolve loop does it after
# apply_fractional_steps(); see _evolve_one_euler_step_c for why.
# Record the CFL-constrained step (pre yield/final cap), matching legacy
# update_timestep(), rather than the yield-limited step actually taken.
cfl_dt = gpu_dom.recorded_flux_timestep
self.recorded_max_timestep = max(cfl_dt, self.recorded_max_timestep)
self.recorded_min_timestep = min(cfl_dt, self.recorded_min_timestep)
# Post-step ghost exchange — update_ghosts() is a no-op in GPU mode
if self.ghost_layer_width < 4:
from anuga.shallow_water.sw_domain_gpu_ext import exchange_ghosts
exchange_ghosts(gpu_dom)
def _evolve_one_rk2_step_gpu(self, yieldstep, finaltime):
"""Python-orchestrated GPU implementation of RK2 step.
This is a fallback for when the C RK2 loop cannot be used (e.g., unsupported
boundary types). Prefer _evolve_one_rk2_step_c() for better performance.
"""
from anuga.shallow_water.sw_domain_gpu_ext import (
backup_conserved_quantities_gpu,
extrapolate_second_order_gpu,
protect_gpu,
compute_fluxes_gpu,
update_conserved_quantities_gpu,
saxpy_conserved_quantities_gpu,
sync_boundary_values,
init_boundary_edge_sync,
boundary_edge_sync,
exchange_ghosts,
evaluate_reflective_boundary_gpu,
evaluate_dirichlet_boundary_gpu,
evaluate_transmissive_boundary_gpu,
set_transmissive_n_zero_t_stage,
evaluate_transmissive_n_zero_t_boundary_gpu,
set_time_boundary_values,
evaluate_time_boundary_gpu,
set_file_boundary_values_from_domain,
evaluate_file_boundary_gpu,
set_absorbing_wave_value,
evaluate_absorbing_wave_boundary_gpu,
set_characteristic_wave_value,
evaluate_characteristic_wave_boundary_gpu,
set_flather_value,
evaluate_flather_boundary_gpu,
)
import numpy as np
gpu_dom = self.gpu_interface.gpu_dom
# Supported GPU boundary types
GPU_BOUNDARY_TYPES = {'Reflective_boundary', 'Dirichlet_boundary', 'Transmissive_boundary',
'Transmissive_n_momentum_zero_t_momentum_set_stage_boundary',
'Time_boundary', 'File_boundary', 'Field_boundary',
'Absorbing_wave_boundary', 'Characteristic_wave_boundary'}
# Lazy init: identify which boundaries need CPU evaluation vs GPU
if not hasattr(self, '_gpu_boundary_info_initialized'):
self._gpu_cpu_tags = []
self._gpu_all_on_gpu = True
cpu_boundary_types = []
self._gpu_transmissive_n_zero_t_boundaries = []
self._gpu_time_boundaries = []
self._gpu_absorbing_wave_boundaries = []
self._gpu_characteristic_wave_boundaries = []
for tag, B in self.boundary_map.items():
if B is not None:
btype = B.__class__.__name__
if btype not in GPU_BOUNDARY_TYPES:
self._gpu_cpu_tags.append(tag)
self._gpu_all_on_gpu = False
cpu_boundary_types.append((tag, btype))
elif btype == 'Transmissive_n_momentum_zero_t_momentum_set_stage_boundary':
self._gpu_transmissive_n_zero_t_boundaries.append(B)
elif btype == 'Time_boundary':
self._gpu_time_boundaries.append(B)
elif btype == 'Absorbing_wave_boundary':
self._gpu_absorbing_wave_boundaries.append(B)
elif btype == 'Characteristic_wave_boundary':
self._gpu_characteristic_wave_boundaries.append(B)
if not self._gpu_all_on_gpu:
boundary_cell_ids = np.unique(self.boundary_cells).astype(np.intc)
init_boundary_edge_sync(gpu_dom, boundary_cell_ids)
print("WARNING: GPU boundary evaluation disabled - falling back to CPU")
print(f" Unsupported boundary types: {cpu_boundary_types}")
self._gpu_boundary_info_initialized = True
# Backup for RK2
backup_conserved_quantities_gpu(gpu_dom)
# ==========================================
# First Euler step
# ==========================================
protect_gpu(gpu_dom)
extrapolate_second_order_gpu(gpu_dom)
# Evaluate boundaries
if self._gpu_all_on_gpu:
evaluate_reflective_boundary_gpu(gpu_dom)
evaluate_dirichlet_boundary_gpu(gpu_dom)
evaluate_transmissive_boundary_gpu(gpu_dom)
for B in self._gpu_transmissive_n_zero_t_boundaries:
stage_val = B.get_boundary_values()
try:
stage_val = float(stage_val)
except (TypeError, ValueError):
stage_val = float(stage_val[0])
set_transmissive_n_zero_t_stage(gpu_dom, stage_val)
evaluate_transmissive_n_zero_t_boundary_gpu(gpu_dom)
for B in self._gpu_time_boundaries:
q = B.get_boundary_values()
set_time_boundary_values(gpu_dom, float(q[0]), float(q[1]), float(q[2]))
evaluate_time_boundary_gpu(gpu_dom)
set_file_boundary_values_from_domain(gpu_dom, self)
evaluate_file_boundary_gpu(gpu_dom)
for B in self._gpu_absorbing_wave_boundaries:
value = B.get_boundary_values()
try:
wave_val = float(value)
except (TypeError, ValueError):
wave_val = float(value[0])
set_absorbing_wave_value(gpu_dom, wave_val)
evaluate_absorbing_wave_boundary_gpu(gpu_dom)
for B in self._gpu_characteristic_wave_boundaries:
value = B.get_boundary_values()
try:
perturb = float(value)
except (TypeError, ValueError):
perturb = float(value[0])
set_characteristic_wave_value(gpu_dom, perturb)
evaluate_characteristic_wave_boundary_gpu(gpu_dom)
for B in self._gpu_flather_boundaries:
value = B.get_boundary_values()
try:
stage_val = float(value)
except (TypeError, ValueError):
stage_val = float(value[0])
set_flather_value(gpu_dom, stage_val)
evaluate_flather_boundary_gpu(gpu_dom)
else:
boundary_edge_sync(gpu_dom)
for tag in self.tag_boundary_cells:
B = self.boundary_map[tag]
if B is not None:
B.evaluate_segment(self, self.tag_boundary_cells[tag])
sync_boundary_values(gpu_dom)
# Compute fluxes
self.flux_timestep = compute_fluxes_gpu(gpu_dom)
# Forcing terms
self.compute_forcing_terms()
# Update timestep
self.update_timestep(yieldstep, finaltime)
# Update conserved quantities
update_conserved_quantities_gpu(gpu_dom, self.timestep)
# End of first Euler step
self.set_relative_time(self.get_relative_time() + self.timestep)
# Ghost exchange (MPI)
if self.ghost_layer_width < 4:
exchange_ghosts(gpu_dom)
# =========================================
# Second Euler step
# =========================================
protect_gpu(gpu_dom)
extrapolate_second_order_gpu(gpu_dom)
# Evaluate boundaries
if self._gpu_all_on_gpu:
evaluate_reflective_boundary_gpu(gpu_dom)
evaluate_dirichlet_boundary_gpu(gpu_dom)
evaluate_transmissive_boundary_gpu(gpu_dom)
for B in self._gpu_transmissive_n_zero_t_boundaries:
stage_val = B.get_boundary_values()
try:
stage_val = float(stage_val)
except (TypeError, ValueError):
stage_val = float(stage_val[0])
set_transmissive_n_zero_t_stage(gpu_dom, stage_val)
evaluate_transmissive_n_zero_t_boundary_gpu(gpu_dom)
for B in self._gpu_time_boundaries:
q = B.get_boundary_values()
set_time_boundary_values(gpu_dom, float(q[0]), float(q[1]), float(q[2]))
evaluate_time_boundary_gpu(gpu_dom)
set_file_boundary_values_from_domain(gpu_dom, self)
evaluate_file_boundary_gpu(gpu_dom)
for B in self._gpu_absorbing_wave_boundaries:
value = B.get_boundary_values()
try:
wave_val = float(value)
except (TypeError, ValueError):
wave_val = float(value[0])
set_absorbing_wave_value(gpu_dom, wave_val)
evaluate_absorbing_wave_boundary_gpu(gpu_dom)
for B in self._gpu_characteristic_wave_boundaries:
value = B.get_boundary_values()
try:
perturb = float(value)
except (TypeError, ValueError):
perturb = float(value[0])
set_characteristic_wave_value(gpu_dom, perturb)
evaluate_characteristic_wave_boundary_gpu(gpu_dom)
for B in self._gpu_flather_boundaries:
value = B.get_boundary_values()
try:
stage_val = float(value)
except (TypeError, ValueError):
stage_val = float(value[0])
set_flather_value(gpu_dom, stage_val)
evaluate_flather_boundary_gpu(gpu_dom)
else:
boundary_edge_sync(gpu_dom)
for tag in self.tag_boundary_cells:
B = self.boundary_map[tag]
if B is not None:
B.evaluate_segment(self, self.tag_boundary_cells[tag])
sync_boundary_values(gpu_dom)
# Compute fluxes (ignore timestep from second step)
compute_fluxes_gpu(gpu_dom)
# Forcing terms
self.compute_forcing_terms()
# Update conserved quantities
update_conserved_quantities_gpu(gpu_dom, self.timestep)
# RK2 averaging
saxpy_conserved_quantities_gpu(gpu_dom, 0.5, 0.5)
# Post-step ghost exchange — update_ghosts() is a no-op in GPU mode
if self.ghost_layer_width < 4:
exchange_ghosts(gpu_dom)
def _evolve_one_euler_step_gpu(self, yieldstep, finaltime):
"""Python-orchestrated GPU Euler (DE0) step.
Fallback for _evolve_one_euler_step_c() when the boundary map contains a
type the C Euler loop cannot evaluate on the device (e.g.
Transmissive_momentum_set_stage_boundary). GPU-supported boundaries are
evaluated on the device; any others are evaluated on the host via
evaluate_segment() and synced back. This keeps DE0 results correct
(identical to legacy) for every boundary type — the same fallback that
rk2/rk3/ader2 already perform. Prefer _evolve_one_euler_step_c() (faster)
when all boundaries are GPU-supported.
"""
from anuga.shallow_water.sw_domain_gpu_ext import (
extrapolate_second_order_gpu,
protect_gpu,
compute_fluxes_gpu,
update_conserved_quantities_gpu,
sync_boundary_values,
init_boundary_edge_sync,
boundary_edge_sync,
exchange_ghosts,
evaluate_reflective_boundary_gpu,
evaluate_dirichlet_boundary_gpu,
evaluate_transmissive_boundary_gpu,
set_transmissive_n_zero_t_stage,
evaluate_transmissive_n_zero_t_boundary_gpu,
set_time_boundary_values,
evaluate_time_boundary_gpu,
set_file_boundary_values_from_domain,
evaluate_file_boundary_gpu,
set_absorbing_wave_value,
evaluate_absorbing_wave_boundary_gpu,
set_characteristic_wave_value,
evaluate_characteristic_wave_boundary_gpu,
)
import numpy as np
gpu_dom = self.gpu_interface.gpu_dom
GPU_BOUNDARY_TYPES = {'Reflective_boundary', 'Dirichlet_boundary', 'Transmissive_boundary',
'Transmissive_n_momentum_zero_t_momentum_set_stage_boundary',
'Time_boundary', 'File_boundary', 'Field_boundary',
'Absorbing_wave_boundary', 'Characteristic_wave_boundary'}
if not hasattr(self, '_gpu_boundary_info_initialized'):
self._gpu_cpu_tags = []
self._gpu_all_on_gpu = True
cpu_boundary_types = []
self._gpu_transmissive_n_zero_t_boundaries = []
self._gpu_time_boundaries = []
self._gpu_absorbing_wave_boundaries = []
self._gpu_characteristic_wave_boundaries = []
for tag, B in self.boundary_map.items():
if B is not None:
btype = B.__class__.__name__
if btype not in GPU_BOUNDARY_TYPES:
self._gpu_cpu_tags.append(tag)
self._gpu_all_on_gpu = False
cpu_boundary_types.append((tag, btype))
elif btype == 'Transmissive_n_momentum_zero_t_momentum_set_stage_boundary':
self._gpu_transmissive_n_zero_t_boundaries.append(B)
elif btype == 'Time_boundary':
self._gpu_time_boundaries.append(B)
elif btype == 'Absorbing_wave_boundary':
self._gpu_absorbing_wave_boundaries.append(B)
elif btype == 'Characteristic_wave_boundary':
self._gpu_characteristic_wave_boundaries.append(B)
if not self._gpu_all_on_gpu:
boundary_cell_ids = np.unique(self.boundary_cells).astype(np.intc)
init_boundary_edge_sync(gpu_dom, boundary_cell_ids)
self._gpu_boundary_info_initialized = True
protect_gpu(gpu_dom)
extrapolate_second_order_gpu(gpu_dom)
if self._gpu_all_on_gpu:
evaluate_reflective_boundary_gpu(gpu_dom)
evaluate_dirichlet_boundary_gpu(gpu_dom)
evaluate_transmissive_boundary_gpu(gpu_dom)
for B in self._gpu_transmissive_n_zero_t_boundaries:
stage_val = B.get_boundary_values()
try:
stage_val = float(stage_val)
except (TypeError, ValueError):
stage_val = float(stage_val[0])
set_transmissive_n_zero_t_stage(gpu_dom, stage_val)
evaluate_transmissive_n_zero_t_boundary_gpu(gpu_dom)
for B in self._gpu_time_boundaries:
q = B.get_boundary_values()
set_time_boundary_values(gpu_dom, float(q[0]), float(q[1]), float(q[2]))
evaluate_time_boundary_gpu(gpu_dom)
set_file_boundary_values_from_domain(gpu_dom, self)
evaluate_file_boundary_gpu(gpu_dom)
for B in self._gpu_absorbing_wave_boundaries:
value = B.get_boundary_values()
try:
wave_val = float(value)
except (TypeError, ValueError):
wave_val = float(value[0])
set_absorbing_wave_value(gpu_dom, wave_val)
evaluate_absorbing_wave_boundary_gpu(gpu_dom)
for B in self._gpu_characteristic_wave_boundaries:
value = B.get_boundary_values()
try:
perturb = float(value)
except (TypeError, ValueError):
perturb = float(value[0])
set_characteristic_wave_value(gpu_dom, perturb)
evaluate_characteristic_wave_boundary_gpu(gpu_dom)
else:
# Host evaluation of any non-GPU boundary type (e.g.
# Transmissive_momentum_set_stage_boundary), then sync edge values
# back to the device for the flux kernel.
boundary_edge_sync(gpu_dom)
for tag in self.tag_boundary_cells:
B = self.boundary_map[tag]
if B is not None:
B.evaluate_segment(self, self.tag_boundary_cells[tag])
sync_boundary_values(gpu_dom)
# Compute fluxes (sets flux_timestep = CFL flux step)
self.flux_timestep = compute_fluxes_gpu(gpu_dom)
# Forcing terms (friction)
self.compute_forcing_terms()
# Update timestep to fit yieldstep/finaltime (also records
# recorded_min/max_timestep from the CFL constraint).
self.update_timestep(yieldstep, finaltime)
# Update conserved quantities
update_conserved_quantities_gpu(gpu_dom, self.timestep)
# Do NOT advance relative_time here — the evolve loop advances it after
# apply_fractional_steps(); see _evolve_one_euler_step_c for why.
# Post-step ghost exchange
if self.ghost_layer_width < 4:
exchange_ghosts(gpu_dom)
def _evolve_one_euler_step_c(self, yieldstep, finaltime):
"""Euler step executed entirely in C - eliminates Python round-trip overhead.
All kernel calls (extrapolation, boundaries, fluxes, forcing, update) happen
in C with MPI timestep reduction also in C. Only one Python->C call per step.
Limitations:
- Only supports GPU-evaluated boundary types
- Rate_operators must be applied separately (after this call)
"""
from anuga.shallow_water.sw_domain_gpu_ext import (
evolve_one_euler_step_gpu,
set_transmissive_n_zero_t_stage,
set_time_boundary_values,
set_file_boundary_values_from_domain,
set_absorbing_wave_value,
set_characteristic_wave_value,
set_flather_value,
exchange_ghosts,
)
gpu_dom = self.gpu_interface.gpu_dom
GPU_BOUNDARY_TYPES = {'Reflective_boundary', 'Dirichlet_boundary', 'Transmissive_boundary',
'Transmissive_n_momentum_zero_t_momentum_set_stage_boundary',
'Time_boundary', 'File_boundary', 'Field_boundary',
'Absorbing_wave_boundary', 'Characteristic_wave_boundary',
'Flather_boundary'}
if not hasattr(self, '_gpu_boundary_info_initialized'):
self._gpu_cpu_tags = []
self._gpu_all_on_gpu = True
cpu_boundary_types = []
self._gpu_transmissive_n_zero_t_boundaries = []
self._gpu_time_boundaries = []
self._gpu_absorbing_wave_boundaries = []
self._gpu_characteristic_wave_boundaries = []
self._gpu_flather_boundaries = []
for tag, B in self.boundary_map.items():
if B is not None:
btype = B.__class__.__name__
if btype not in GPU_BOUNDARY_TYPES:
self._gpu_cpu_tags.append(tag)
self._gpu_all_on_gpu = False
cpu_boundary_types.append((tag, btype))
elif btype == 'Transmissive_n_momentum_zero_t_momentum_set_stage_boundary':
self._gpu_transmissive_n_zero_t_boundaries.append(B)
elif btype == 'Time_boundary':
self._gpu_time_boundaries.append(B)
elif btype == 'Absorbing_wave_boundary':
self._gpu_absorbing_wave_boundaries.append(B)
elif btype == 'Characteristic_wave_boundary':
self._gpu_characteristic_wave_boundaries.append(B)
elif btype == 'Flather_boundary':
self._gpu_flather_boundaries.append(B)
if not self._gpu_all_on_gpu:
print("WARNING: C Euler loop requires all GPU-supported boundary types")
print(" Falling back to Python-orchestrated GPU loop")
print(" Unsupported types: " + str(cpu_boundary_types))
self._gpu_boundary_info_initialized = True
# Boundaries the C loop cannot evaluate on the device (e.g.
# Transmissive_momentum_set_stage_boundary) are handled by the
# Python-orchestrated fallback, which evaluates them on the host and
# syncs — matching rk2/rk3/ader2. Without this, those boundaries are
# silently ignored in DE0 and results diverge from legacy.
if not self._gpu_all_on_gpu:
return self._evolve_one_euler_step_gpu(yieldstep, finaltime)
# Set time-dependent boundary values before calling C function
for B in self._gpu_transmissive_n_zero_t_boundaries:
stage_val = B.get_boundary_values()
try:
stage_val = float(stage_val)
except (TypeError, ValueError):
stage_val = float(stage_val[0])
set_transmissive_n_zero_t_stage(gpu_dom, stage_val)
for B in self._gpu_time_boundaries:
q = B.get_boundary_values()
set_time_boundary_values(gpu_dom, float(q[0]), float(q[1]), float(q[2]))
set_file_boundary_values_from_domain(gpu_dom, self)
for B in self._gpu_absorbing_wave_boundaries:
value = B.get_boundary_values()
try:
wave_val = float(value)
except (TypeError, ValueError):
wave_val = float(value[0])
set_absorbing_wave_value(gpu_dom, wave_val)
for B in self._gpu_characteristic_wave_boundaries:
value = B.get_boundary_values()
try:
perturb = float(value)
except (TypeError, ValueError):
perturb = float(value[0])
set_characteristic_wave_value(gpu_dom, perturb)
for B in self._gpu_flather_boundaries:
value = B.get_boundary_values()
try:
stage_val = float(value)
except (TypeError, ValueError):
stage_val = float(value[0])
set_flather_value(gpu_dom, stage_val)
# Compute max allowed timestep (mirrors update_timestep() logic)
# Time remaining to the next yield boundary. Use the explicitly tracked
# relative_yieldtime (set and incremented by the evolve loop), NOT
# (relative_time % yieldstep): the modulo is floating-point fragile, e.g.
# 0.3 % 0.1 == 0.0999... gives remaining ~1e-17 -> max_timestep ~0 ->
# the step never advances and evolve() spins. Mirrors update_timestep().
relative_yieldtime = getattr(self, 'relative_yieldtime', None)
if relative_yieldtime is not None:
remaining_yieldstep = relative_yieldtime - self.get_relative_time()
else:
remaining_yieldstep = yieldstep
if finaltime is not None:
remaining_finaltime = finaltime - self.get_time()
max_timestep = min(self.evolve_max_timestep, remaining_yieldstep, remaining_finaltime)
else:
max_timestep = min(self.evolve_max_timestep, remaining_yieldstep)
# Execute full Euler step in C (includes MPI timestep reduction)
self.timestep = evolve_one_euler_step_gpu(gpu_dom, max_timestep, 1)
# NOTE: do NOT advance relative_time here. The evolve loop advances it
# (relative_time = initial_relative_time + timestep) AFTER
# apply_fractional_steps(), so advancing it here makes time-dependent
# operators (variable-Q inlet, time-varying rate, ...) see the time one
# step too far — they would evaluate forcing at t+dt instead of t.
# Record the CFL-constrained step (pre yield/final cap), matching legacy
# update_timestep(), rather than the yield-limited step actually taken.
cfl_dt = gpu_dom.recorded_flux_timestep
self.recorded_max_timestep = max(cfl_dt, self.recorded_max_timestep)
self.recorded_min_timestep = min(cfl_dt, self.recorded_min_timestep)
# Post-step ghost exchange — update_ghosts() is a no-op in GPU mode
if self.ghost_layer_width < 4:
exchange_ghosts(gpu_dom)
def _evolve_one_rk2_step_c(self, yieldstep, finaltime):
"""RK2 step executed entirely in C - eliminates Python round-trip overhead.
This is faster than _evolve_one_rk2_step_gpu() because:
- All kernel calls happen in C without Python round-trips
- MPI reduction for timestep happens in C
- Only one Python->C call per RK2 step
Limitations:
- Only supports GPU-evaluated boundary types
- Rate_operators must be applied separately (after this call)
"""
from anuga.shallow_water.sw_domain_gpu_ext import (
evolve_one_rk2_step_gpu,
set_transmissive_n_zero_t_stage,
set_time_boundary_values,
set_file_boundary_values_from_domain,
set_absorbing_wave_value,
set_characteristic_wave_value,
set_flather_value,
)
gpu_dom = self.gpu_interface.gpu_dom
# Supported GPU boundary types
GPU_BOUNDARY_TYPES = {'Reflective_boundary', 'Dirichlet_boundary', 'Transmissive_boundary',
'Transmissive_n_momentum_zero_t_momentum_set_stage_boundary',
'Time_boundary', 'File_boundary', 'Field_boundary',
'Absorbing_wave_boundary', 'Characteristic_wave_boundary',
'Flather_boundary'}
# Lazy init: identify which boundaries need special handling
if not hasattr(self, '_gpu_boundary_info_initialized'):
self._gpu_cpu_tags = []
self._gpu_all_on_gpu = True
cpu_boundary_types = []
self._gpu_transmissive_n_zero_t_boundaries = []
self._gpu_time_boundaries = []
self._gpu_absorbing_wave_boundaries = []
self._gpu_characteristic_wave_boundaries = []
self._gpu_flather_boundaries = []
for tag, B in self.boundary_map.items():
if B is not None:
btype = B.__class__.__name__
if btype not in GPU_BOUNDARY_TYPES:
self._gpu_cpu_tags.append(tag)
self._gpu_all_on_gpu = False
cpu_boundary_types.append((tag, btype))
elif btype == 'Transmissive_n_momentum_zero_t_momentum_set_stage_boundary':
self._gpu_transmissive_n_zero_t_boundaries.append(B)
elif btype == 'Time_boundary':
self._gpu_time_boundaries.append(B)
elif btype == 'Absorbing_wave_boundary':
self._gpu_absorbing_wave_boundaries.append(B)
elif btype == 'Characteristic_wave_boundary':
self._gpu_characteristic_wave_boundaries.append(B)
elif btype == 'Flather_boundary':
self._gpu_flather_boundaries.append(B)
if not self._gpu_all_on_gpu:
print("WARNING: C RK2 loop requires all GPU-supported boundary types")
print(" Falling back to Python-orchestrated GPU loop")
print(f" Unsupported types: {cpu_boundary_types}")
self._gpu_boundary_info_initialized = True
# If any boundary requires CPU, fall back to Python-orchestrated loop
if not self._gpu_all_on_gpu:
return self._evolve_one_rk2_step_gpu(yieldstep, finaltime)
# Set time-dependent boundary values BEFORE calling C function
# Python function calls are cheap (~microseconds)
for B in self._gpu_transmissive_n_zero_t_boundaries:
stage_val = B.get_boundary_values()
try:
stage_val = float(stage_val)
except (TypeError, ValueError):
stage_val = float(stage_val[0])
set_transmissive_n_zero_t_stage(gpu_dom, stage_val)
for B in self._gpu_time_boundaries:
q = B.get_boundary_values()
set_time_boundary_values(gpu_dom, float(q[0]), float(q[1]), float(q[2]))
set_file_boundary_values_from_domain(gpu_dom, self)
for B in self._gpu_absorbing_wave_boundaries:
value = B.get_boundary_values()
try:
wave_val = float(value)
except (TypeError, ValueError):
wave_val = float(value[0])
set_absorbing_wave_value(gpu_dom, wave_val)
for B in self._gpu_characteristic_wave_boundaries:
value = B.get_boundary_values()
try:
perturb = float(value)
except (TypeError, ValueError):
perturb = float(value[0])
set_characteristic_wave_value(gpu_dom, perturb)
for B in self._gpu_flather_boundaries:
value = B.get_boundary_values()
try:
stage_val = float(value)
except (TypeError, ValueError):
stage_val = float(value[0])
set_flather_value(gpu_dom, stage_val)
# Compute max allowed timestep (respecting yieldstep and finaltime)
# This mirrors the logic in update_timestep()
# Time remaining to the next yield boundary. Use the explicitly tracked
# relative_yieldtime (set and incremented by the evolve loop), NOT
# (relative_time % yieldstep): the modulo is floating-point fragile, e.g.
# 0.3 % 0.1 == 0.0999... gives remaining ~1e-17 -> max_timestep ~0 ->
# the step never advances and evolve() spins. Mirrors update_timestep().
relative_yieldtime = getattr(self, 'relative_yieldtime', None)
if relative_yieldtime is not None:
remaining_yieldstep = relative_yieldtime - self.get_relative_time()
else:
remaining_yieldstep = yieldstep
if finaltime is not None:
remaining_finaltime = finaltime - self.get_time()
max_timestep = min(self.evolve_max_timestep, remaining_yieldstep, remaining_finaltime)
else:
max_timestep = min(self.evolve_max_timestep, remaining_yieldstep)
# Execute full RK2 step in C (includes MPI timestep reduction)
# apply_forcing=1 enables Manning friction on GPU
self.timestep = evolve_one_rk2_step_gpu(gpu_dom, max_timestep, 1)
# Do NOT advance relative_time here — the evolve loop does it after
# apply_fractional_steps(); see _evolve_one_euler_step_c for why.
# Record the CFL-constrained step (pre yield/final cap), matching legacy
# update_timestep(), rather than the yield-limited step actually taken.
cfl_dt = gpu_dom.recorded_flux_timestep
self.recorded_max_timestep = max(cfl_dt, self.recorded_max_timestep)
self.recorded_min_timestep = min(cfl_dt, self.recorded_min_timestep)
# Post-step ghost exchange — update_ghosts() is a no-op in GPU mode
if self.ghost_layer_width < 4:
from anuga.shallow_water.sw_domain_gpu_ext import exchange_ghosts
exchange_ghosts(gpu_dom)
def _evolve_one_rk3_step_gpu(self, yieldstep, finaltime):
"""Python-orchestrated GPU implementation of SSP-RK3 step.
This is a fallback for when the C RK3 loop cannot be used (e.g., unsupported
boundary types). Prefer _evolve_one_rk3_step_c() for better performance.
"""
from anuga.shallow_water.sw_domain_gpu_ext import (
backup_conserved_quantities_gpu,
extrapolate_second_order_gpu,
protect_gpu,
compute_fluxes_gpu,
update_conserved_quantities_gpu,
saxpy_conserved_quantities_gpu,
saxpy3_conserved_quantities_gpu,
sync_boundary_values,
init_boundary_edge_sync,
boundary_edge_sync,
exchange_ghosts,
evaluate_reflective_boundary_gpu,
evaluate_dirichlet_boundary_gpu,
evaluate_transmissive_boundary_gpu,
set_transmissive_n_zero_t_stage,
evaluate_transmissive_n_zero_t_boundary_gpu,
set_time_boundary_values,
evaluate_time_boundary_gpu,
set_file_boundary_values_from_domain,
evaluate_file_boundary_gpu,
set_absorbing_wave_value,
evaluate_absorbing_wave_boundary_gpu,
set_characteristic_wave_value,
evaluate_characteristic_wave_boundary_gpu,
set_flather_value,
evaluate_flather_boundary_gpu,
)
import numpy as np
gpu_dom = self.gpu_interface.gpu_dom
# Supported GPU boundary types
GPU_BOUNDARY_TYPES = {'Reflective_boundary', 'Dirichlet_boundary', 'Transmissive_boundary',
'Transmissive_n_momentum_zero_t_momentum_set_stage_boundary',
'Time_boundary', 'File_boundary', 'Field_boundary',
'Absorbing_wave_boundary', 'Characteristic_wave_boundary'}
# Lazy init: identify which boundaries need CPU evaluation vs GPU
if not hasattr(self, '_gpu_boundary_info_initialized'):
self._gpu_cpu_tags = []
self._gpu_all_on_gpu = True
cpu_boundary_types = []
self._gpu_transmissive_n_zero_t_boundaries = []
self._gpu_time_boundaries = []
self._gpu_absorbing_wave_boundaries = []
self._gpu_characteristic_wave_boundaries = []
for tag, B in self.boundary_map.items():
if B is not None:
btype = B.__class__.__name__
if btype not in GPU_BOUNDARY_TYPES:
self._gpu_cpu_tags.append(tag)
self._gpu_all_on_gpu = False
cpu_boundary_types.append((tag, btype))
elif btype == 'Transmissive_n_momentum_zero_t_momentum_set_stage_boundary':
self._gpu_transmissive_n_zero_t_boundaries.append(B)
elif btype == 'Time_boundary':
self._gpu_time_boundaries.append(B)
elif btype == 'Absorbing_wave_boundary':
self._gpu_absorbing_wave_boundaries.append(B)
elif btype == 'Characteristic_wave_boundary':
self._gpu_characteristic_wave_boundaries.append(B)
if not self._gpu_all_on_gpu:
boundary_cell_ids = np.unique(self.boundary_cells).astype(np.intc)
init_boundary_edge_sync(gpu_dom, boundary_cell_ids)
print("WARNING: GPU boundary evaluation disabled - falling back to CPU")
print(f" Unsupported boundary types: {cpu_boundary_types}")
self._gpu_boundary_info_initialized = True
def _eval_boundaries():
"""Evaluate all boundary conditions on GPU (or CPU fallback)."""
if self._gpu_all_on_gpu:
evaluate_reflective_boundary_gpu(gpu_dom)
evaluate_dirichlet_boundary_gpu(gpu_dom)
evaluate_transmissive_boundary_gpu(gpu_dom)
for B in self._gpu_transmissive_n_zero_t_boundaries:
stage_val = B.get_boundary_values()
try:
stage_val = float(stage_val)
except (TypeError, ValueError):
stage_val = float(stage_val[0])
set_transmissive_n_zero_t_stage(gpu_dom, stage_val)
evaluate_transmissive_n_zero_t_boundary_gpu(gpu_dom)
for B in self._gpu_time_boundaries:
q = B.get_boundary_values()
set_time_boundary_values(gpu_dom, float(q[0]), float(q[1]), float(q[2]))
evaluate_time_boundary_gpu(gpu_dom)
set_file_boundary_values_from_domain(gpu_dom, self)
evaluate_file_boundary_gpu(gpu_dom)
for B in self._gpu_absorbing_wave_boundaries:
value = B.get_boundary_values()
try:
wave_val = float(value)
except (TypeError, ValueError):
wave_val = float(value[0])
set_absorbing_wave_value(gpu_dom, wave_val)
evaluate_absorbing_wave_boundary_gpu(gpu_dom)
for B in self._gpu_characteristic_wave_boundaries:
value = B.get_boundary_values()
try:
perturb = float(value)
except (TypeError, ValueError):
perturb = float(value[0])
set_characteristic_wave_value(gpu_dom, perturb)
evaluate_characteristic_wave_boundary_gpu(gpu_dom)
for B in self._gpu_flather_boundaries:
value = B.get_boundary_values()
try:
stage_val = float(value)
except (TypeError, ValueError):
stage_val = float(value[0])
set_flather_value(gpu_dom, stage_val)
evaluate_flather_boundary_gpu(gpu_dom)
else:
boundary_edge_sync(gpu_dom)
for tag in self.tag_boundary_cells:
B = self.boundary_map[tag]
if B is not None:
B.evaluate_segment(self, self.tag_boundary_cells[tag])
sync_boundary_values(gpu_dom)
initial_relative_time = self.get_relative_time()
# Backup Q^n
backup_conserved_quantities_gpu(gpu_dom)
# ==========================================
# Stage 1: Q^(1) = Q^n + h*L(Q^n)
# ==========================================
protect_gpu(gpu_dom)
extrapolate_second_order_gpu(gpu_dom)
_eval_boundaries()
self.flux_timestep = compute_fluxes_gpu(gpu_dom)
# Forcing terms
self.compute_forcing_terms()
# Update timestep
self.update_timestep(yieldstep, finaltime)
update_conserved_quantities_gpu(gpu_dom, self.timestep)
self.set_relative_time(initial_relative_time + self.timestep)
if self.ghost_layer_width < 4:
exchange_ghosts(gpu_dom)
# ==========================================
# Stage 2: Q^(2) = Q^(1) + h*L(Q^(1))
# ==========================================
protect_gpu(gpu_dom)
extrapolate_second_order_gpu(gpu_dom)
_eval_boundaries()
compute_fluxes_gpu(gpu_dom)
self.compute_forcing_terms()
update_conserved_quantities_gpu(gpu_dom, self.timestep)
# Intermediate: Q = 0.25*Q^(2) + 0.75*Q^n
saxpy_conserved_quantities_gpu(gpu_dom, 0.25, 0.75)
self.set_relative_time(initial_relative_time + self.timestep * 0.5)
if self.ghost_layer_width < 4:
exchange_ghosts(gpu_dom)
# ==========================================
# Stage 3: Q^(3) = Q^(1)_mid + h*L(Q^(1)_mid)
# ==========================================
protect_gpu(gpu_dom)
extrapolate_second_order_gpu(gpu_dom)
_eval_boundaries()
compute_fluxes_gpu(gpu_dom)
self.compute_forcing_terms()
update_conserved_quantities_gpu(gpu_dom, self.timestep)
# Final: Q^{n+1} = (2*Q^(3) + Q^n) / 3
saxpy3_conserved_quantities_gpu(gpu_dom, 2.0, 1.0, 3.0)
self.set_relative_time(initial_relative_time + self.timestep)
# Post-step ghost exchange — update_ghosts() is a no-op in GPU mode
if self.ghost_layer_width < 4:
exchange_ghosts(gpu_dom)
def _evolve_one_rk3_step_c(self, yieldstep, finaltime):
"""RK3 step executed entirely in C - eliminates Python round-trip overhead.
This is faster than _evolve_one_rk3_step_gpu() because:
- All kernel calls happen in C without Python round-trips
- MPI reduction for timestep happens in C
- Only one Python->C call per RK3 step
Limitations:
- Only supports GPU-evaluated boundary types
- Rate_operators must be applied separately (after this call)
"""
from anuga.shallow_water.sw_domain_gpu_ext import (
evolve_one_rk3_step_gpu,
set_transmissive_n_zero_t_stage,
set_time_boundary_values,
set_file_boundary_values_from_domain,
set_absorbing_wave_value,
set_characteristic_wave_value,
set_flather_value,
)
gpu_dom = self.gpu_interface.gpu_dom
# Supported GPU boundary types
GPU_BOUNDARY_TYPES = {'Reflective_boundary', 'Dirichlet_boundary', 'Transmissive_boundary',
'Transmissive_n_momentum_zero_t_momentum_set_stage_boundary',
'Time_boundary', 'File_boundary', 'Field_boundary',
'Absorbing_wave_boundary', 'Characteristic_wave_boundary',
'Flather_boundary'}
# Lazy init: identify which boundaries need special handling
if not hasattr(self, '_gpu_boundary_info_initialized'):
self._gpu_cpu_tags = []
self._gpu_all_on_gpu = True
cpu_boundary_types = []
self._gpu_transmissive_n_zero_t_boundaries = []
self._gpu_time_boundaries = []
self._gpu_absorbing_wave_boundaries = []
self._gpu_characteristic_wave_boundaries = []
self._gpu_flather_boundaries = []
for tag, B in self.boundary_map.items():
if B is not None:
btype = B.__class__.__name__
if btype not in GPU_BOUNDARY_TYPES:
self._gpu_cpu_tags.append(tag)
self._gpu_all_on_gpu = False
cpu_boundary_types.append((tag, btype))
elif btype == 'Transmissive_n_momentum_zero_t_momentum_set_stage_boundary':
self._gpu_transmissive_n_zero_t_boundaries.append(B)
elif btype == 'Time_boundary':
self._gpu_time_boundaries.append(B)
elif btype == 'Absorbing_wave_boundary':
self._gpu_absorbing_wave_boundaries.append(B)
elif btype == 'Characteristic_wave_boundary':
self._gpu_characteristic_wave_boundaries.append(B)
elif btype == 'Flather_boundary':
self._gpu_flather_boundaries.append(B)
if not self._gpu_all_on_gpu:
print("WARNING: C RK3 loop requires all GPU-supported boundary types")
print(" Falling back to Python-orchestrated GPU loop")
print(f" Unsupported types: {cpu_boundary_types}")
self._gpu_boundary_info_initialized = True
# If any boundary requires CPU, fall back to Python-orchestrated loop
if not self._gpu_all_on_gpu:
return self._evolve_one_rk3_step_gpu(yieldstep, finaltime)
# Set time-dependent boundary values BEFORE calling C function
for B in self._gpu_transmissive_n_zero_t_boundaries:
stage_val = B.get_boundary_values()
try:
stage_val = float(stage_val)
except (TypeError, ValueError):
stage_val = float(stage_val[0])
set_transmissive_n_zero_t_stage(gpu_dom, stage_val)
for B in self._gpu_time_boundaries:
q = B.get_boundary_values()
set_time_boundary_values(gpu_dom, float(q[0]), float(q[1]), float(q[2]))
set_file_boundary_values_from_domain(gpu_dom, self)
for B in self._gpu_absorbing_wave_boundaries:
value = B.get_boundary_values()
try:
wave_val = float(value)
except (TypeError, ValueError):
wave_val = float(value[0])
set_absorbing_wave_value(gpu_dom, wave_val)
for B in self._gpu_characteristic_wave_boundaries:
value = B.get_boundary_values()
try:
perturb = float(value)
except (TypeError, ValueError):
perturb = float(value[0])
set_characteristic_wave_value(gpu_dom, perturb)
for B in self._gpu_flather_boundaries:
value = B.get_boundary_values()
try:
stage_val = float(value)
except (TypeError, ValueError):
stage_val = float(value[0])
set_flather_value(gpu_dom, stage_val)
# Compute max allowed timestep (respecting yieldstep and finaltime)
# Time remaining to the next yield boundary. Use the explicitly tracked
# relative_yieldtime (set and incremented by the evolve loop), NOT
# (relative_time % yieldstep): the modulo is floating-point fragile, e.g.
# 0.3 % 0.1 == 0.0999... gives remaining ~1e-17 -> max_timestep ~0 ->
# the step never advances and evolve() spins. Mirrors update_timestep().
relative_yieldtime = getattr(self, 'relative_yieldtime', None)
if relative_yieldtime is not None:
remaining_yieldstep = relative_yieldtime - self.get_relative_time()
else:
remaining_yieldstep = yieldstep
if finaltime is not None:
remaining_finaltime = finaltime - self.get_time()
max_timestep = min(self.evolve_max_timestep, remaining_yieldstep, remaining_finaltime)
else:
max_timestep = min(self.evolve_max_timestep, remaining_yieldstep)
# Execute full RK3 step in C (includes MPI timestep reduction)
# apply_forcing=1 enables Manning friction on GPU
self.timestep = evolve_one_rk3_step_gpu(gpu_dom, max_timestep, 1)
# Update internal time tracking
self.set_relative_time(self.get_relative_time() + self.timestep)
# Record the CFL-constrained step (pre yield/final cap), matching legacy
# update_timestep(), rather than the yield-limited step actually taken.
cfl_dt = gpu_dom.recorded_flux_timestep
self.recorded_max_timestep = max(cfl_dt, self.recorded_max_timestep)
self.recorded_min_timestep = min(cfl_dt, self.recorded_min_timestep)
# Post-step ghost exchange — update_ghosts() is a no-op in GPU mode
if self.ghost_layer_width < 4:
from anuga.shallow_water.sw_domain_gpu_ext import exchange_ghosts
exchange_ghosts(gpu_dom)
def evolve_one_rk3_step(self, yieldstep, finaltime):
"""One 3rd order RK timestep
Q^(1) = 3/4 Q^n + 1/4 E(h)^2 Q^n (at time t^n + h/2)
Q^{n+1} = 1/3 Q^n + 2/3 E(h) Q^(1) (at time t^{n+1})
Does not assume that centroid values have been extrapolated to
vertices and edges
"""
# GPU mode: use C RK loop (faster) or Python-orchestrated GPU loop
if self.multiprocessor_mode == MULTIPROCESSOR_GPU and self.gpu_interface is not None:
if self.use_c_rk_loop:
self._evolve_one_rk3_step_c(yieldstep, finaltime)
else:
self._evolve_one_rk3_step_gpu(yieldstep, finaltime)
return
# Save initial initial conserved quantities values
self.backup_conserved_quantities()
initial_relative_time = self.get_relative_time()
######
# First euler step
######
# From centroid values calculate edge values
self.distribute_to_vertices_and_edges(distribute_to_vertices=False)
# Apply boundary conditions
self.update_boundary()
# Compute fluxes across each element edge
# In MPI parallel mode this involves an allreduce to find global minimal timestep
self.compute_fluxes()
# Compute forcing terms (friction)
self.compute_forcing_terms()
# Update timestep to fit yieldstep and finaltime
self.update_timestep(yieldstep, finaltime)
# Update conserved quantities
self.update_conserved_quantities()
#====================================
# End of first euler step
#====================================
# Update time
self.set_relative_time(self.relative_time+ self.timestep)
# Update ghosts
self.update_ghosts()
#============================================
# Second Euler step using the same timestep
# calculated in the first step. Might lead to
# stability problems but we have not seen any
# example.
#============================================
# Update edge values
self.distribute_to_vertices_and_edges(distribute_to_vertices=False)
# Update boundary values
self.update_boundary()
# Compute fluxes across each element edge
# In MPI parallel mode this involves an allreduce to find global minimal timestep
self.compute_fluxes()
# Compute forcing terms (friction)
self.compute_forcing_terms()
# Update conserved quantities using timestep from first step
self.update_conserved_quantities()
#============================================
# End of second euler step
#============================================
#============================================
# Combine steps to obtain intermediate
# solution at time t^n + 0.5 h
#============================================
# Combine steps
self.saxpy_conserved_quantities(0.25, 0.75)
# Set substep time
self.set_relative_time(initial_relative_time + self.timestep * 0.5)
# Update ghosts
self.update_ghosts()
######
# Third Euler step
######
# Update edge values
self.distribute_to_vertices_and_edges(distribute_to_vertices=False)
# Update boundary values
self.update_boundary()
# Compute fluxes across each element edge
# In MPI parallel mode this involves an allreduce to find global minimal timestep
self.compute_fluxes()
# Compute forcing terms (friction)
self.compute_forcing_terms()
# Update conserved quantities using timestep from first step
self.update_conserved_quantities()
#=======================================
# Combine final and initial values
# and cleanup
#=======================================
# self.saxpy_conserved_quantities(2.0/3.0, 1.0/3.0)
# This caused a roundoff error that created negative water heights
# So do this instead!
self.saxpy_conserved_quantities(2.0, 1.0, 3.0)
# Set new time
self.set_relative_time(initial_relative_time + self.timestep)
def backup_conserved_quantities(self):
# Backup conserved_quantities centroid values
if self.multiprocessor_mode == MULTIPROCESSOR_OPENMP:
from .sw_domain_openmp_ext import backup_conserved_quantities
backup_conserved_quantities(self)
elif self.multiprocessor_mode == MULTIPROCESSOR_GPU:
self.gpu_interface.backup_conserved_quantities_kernel(self)
else:
for name in self.conserved_quantities:
Q = self.quantities[name]
Q.backup_centroid_values()
def saxpy_conserved_quantities(self, a, b, c=None):
# saxpy conserved_quantities centroid values with backup values
if self.multiprocessor_mode == MULTIPROCESSOR_OPENMP:
if c is None:
c = 1.0
from .sw_domain_openmp_ext import saxpy_conserved_quantities
saxpy_conserved_quantities(self, a, b, c)
elif self.multiprocessor_mode == MULTIPROCESSOR_GPU:
if c is not None:
from anuga.shallow_water.sw_domain_gpu_ext import saxpy3_conserved_quantities_gpu
saxpy3_conserved_quantities_gpu(self.gpu_interface.gpu_dom, a, b, c)
else:
self.gpu_interface.saxpy_conserved_quantities_kernel(self, a, b)
else:
for name in self.conserved_quantities:
Q = self.quantities[name]
Q.saxpy_centroid_values(a, b)
if c is not None:
Q.centroid_values[:] = Q.centroid_values / c
def _has_cpu_only_fractional_operators(self):
"""Check if any fractional step operators require CPU execution.
Rate_operators with GPU support don't need CPU sync.
boundary_flux_integral_operator is GPU-safe (only reads boundary_flux_sum).
Boyd_box_operator/Boyd_pipe_operator are GPU-safe via GPUCulvertManager.
Inlet_operator with GPU support doesn't need CPU sync.
Result is cached after first call since operators don't change during simulation.
"""
# Check cache first
if hasattr(self, '_cached_has_cpu_only_ops'):
return self._cached_has_cpu_only_ops
from anuga.operators.rate_operators import Rate_operator
from anuga.operators.boundary_flux_integral_operator import boundary_flux_integral_operator
from anuga.structures.inlet_operator import Inlet_operator
from anuga.structures.gpu_culvert_manager import GPUCulvertManager
from anuga.operators.collect_max_quantities_operator import Collect_max_quantities_operator
# Initialize GPU culvert manager for Boyd operators if needed
has_boyd_ops = any(GPUCulvertManager.is_boyd_operator(op)
for op in self.fractional_step_operators)
if has_boyd_ops and self.gpu_culvert_manager is None:
self.gpu_culvert_manager = GPUCulvertManager(self)
self.gpu_culvert_manager.register_all()
result = False
cpu_only_ops = []
for op in self.fractional_step_operators:
op_name = op.__class__.__name__
if isinstance(op, Rate_operator):
# Force GPU initialization if not already done (lazy init causes race with caching)
if hasattr(op, '_init_gpu') and not getattr(op, '_gpu_initialized', False):
op._init_gpu()
# Rate_operator with GPU support doesn't need CPU sync
if hasattr(op, '_gpu_initialized') and op._gpu_initialized:
continue # GPU-accelerated, no sync needed
elif isinstance(op, boundary_flux_integral_operator):
# boundary_flux_integral_operator only reads boundary_flux_sum (small array)
# and accumulates a scalar - doesn't need full centroid sync
continue
elif isinstance(op, Inlet_operator):
# Force GPU initialization if not already done
if hasattr(op, '_init_gpu') and not getattr(op, '_gpu_initialized', False):
op._init_gpu()
if hasattr(op, '_gpu_initialized') and op._gpu_initialized:
continue # GPU-accelerated, no sync needed
elif isinstance(op, Collect_max_quantities_operator):
# GPU kernel reads device-resident quantities and updates device-resident max
# arrays; no host centroid sync needed.
if hasattr(op, '_init_gpu') and not getattr(op, '_gpu_initialized', False):
op._init_gpu()
if hasattr(op, '_gpu_initialized') and op._gpu_initialized:
continue # GPU-accelerated, no sync needed
elif GPUCulvertManager.is_boyd_operator(op):
# Handled by GPUCulvertManager (local + cross-boundary via MPI in C)
if (self.gpu_culvert_manager is not None
and op in self.gpu_culvert_manager.operators):
continue
# All other operators need CPU sync
cpu_only_ops.append(op_name)
result = True
rank = getattr(self, 'processor', 0)
if result:
# Only print GPU sync warning if GPU offload is actually active
if getattr(self, 'gpu_offload_active', True):
print(f"[Rank {rank}] WARNING: CPU-only fractional operators detected (GPU<->CPU sync every RK2 step):")
for name in cpu_only_ops:
print(f" - {name}")
else:
# Only print GPU-safe message if GPU offload is actually active
if rank == 0 and getattr(self, 'gpu_offload_active', True):
print(f"[Rank {rank}] All fractional operators are GPU-safe, no GPU<->CPU sync needed")
import sys; sys.stdout.flush()
self._cached_has_cpu_only_ops = result
return result
def apply_fractional_steps(self):
"""Override to sync GPU data before fractional step operators run.
Boyd culvert operators are handled via GPUCulvertManager (batched,
only 2 GPU sync points) instead of the per-operator Python loop.
"""
gpu_mode = (self.multiprocessor_mode == MULTIPROCESSOR_GPU and self.gpu_interface is not None)
gpu_culverts_active = (gpu_mode and self.gpu_culvert_manager is not None
and self.gpu_culvert_manager.num_culverts > 0)
needs_cpu_sync = False
if gpu_mode:
needs_cpu_sync = self._has_cpu_only_fractional_operators()
if needs_cpu_sync:
self.gpu_interface.sync_from_device()
# Execute all Boyd culverts in one batched GPU cycle
if gpu_culverts_active:
self.gpu_culvert_manager.apply_all()
# Run remaining operators (skip Boyd operators handled by GPU manager)
for operator in self.fractional_step_operators:
if gpu_culverts_active and operator in self.gpu_culvert_manager.operators:
continue # Already handled by GPUCulvertManager
operator()
if gpu_mode and needs_cpu_sync:
self.gpu_interface.sync_to_device()
def update_ghosts(self, quantities=None):
"""Override to use GPU ghost exchange when in GPU mode."""
if self.multiprocessor_mode == MULTIPROCESSOR_GPU and self.gpu_interface is not None:
# GPU RK2 loop handles ghost exchange internally via exchange_ghosts(gpu_dom)
# Don't do another exchange here - it would cause MPI message conflicts
pass
else:
# Fall back to parent implementation
super().update_ghosts(quantities)
def timestepping_statistics(self,
track_speeds: bool = False,
triangle_id: int | None = None,
relative_time: bool = False,
time_unit: str = 'sec',
datetime: bool = False) -> str:
"""Return string with time stepping statistics for printing or logging
Parameters
----------
time_units : str, optional
Time units for reporting. Options are 'sec', 'min', 'hr', 'day'.
datetime : bool, optional
Flag to use timestamp or datetime.
track_speeds : bool, optional
Optional boolean keyword that decides whether to report location of
smallest timestep as well as a histogram and percentile report.
relative_time : bool, optional
Flag to report relative time instead of absolute time.
triangle_id : int, optional
Can be used to specify a particular triangle rather than the one with
the largest speed.
Returns
-------
str
Formatted string with time stepping statistics.
"""
from anuga.config import epsilon, g
# Call basic machinery from parent class
msg = Generic_Domain.timestepping_statistics(self,
track_speeds=track_speeds,
triangle_id=triangle_id,
relative_time=relative_time,
time_unit=time_unit,
datetime=datetime)
if track_speeds is True and self.max_speed is not None:
# qwidth determines the text field used for quantities
qwidth = self.qwidth
# Selected triangle
k = self.k
# Report some derived quantities at vertices, edges and centroid
# specific to the shallow water wave equation
z = self.quantities['elevation']
w = self.quantities['stage']
Vw = w.get_values(location='vertices', indices=[k])[0]
Ew = w.get_values(location='edges', indices=[k])[0]
Cw = w.get_values(location='centroids', indices=[k])
Vz = z.get_values(location='vertices', indices=[k])[0]
Ez = z.get_values(location='edges', indices=[k])[0]
Cz = z.get_values(location='centroids', indices=[k])
name = 'depth'
Vh = Vw-Vz
Eh = Ew-Ez
Ch = Cw-Cz
message = ' %s: vertex_values = %.4f,\t %.4f,\t %.4f\n'\
% (name.ljust(qwidth), Vh[0], Vh[1], Vh[2])
message += ' %s: edge_values = %.4f,\t %.4f,\t %.4f\n'\
% (name.ljust(qwidth), Eh[0], Eh[1], Eh[2])
message += ' %s: centroid_value = %.4f\n'\
% (name.ljust(qwidth), Ch[0])
msg += message
uh = self.quantities['xmomentum']
vh = self.quantities['ymomentum']
Vuh = uh.get_values(location='vertices', indices=[k])[0]
Euh = uh.get_values(location='edges', indices=[k])[0]
Cuh = uh.get_values(location='centroids', indices=[k])
Vvh = vh.get_values(location='vertices', indices=[k])[0]
Evh = vh.get_values(location='edges', indices=[k])[0]
Cvh = vh.get_values(location='centroids', indices=[k])
# Speeds in each direction
Vu = Vuh/(Vh + epsilon)
Eu = Euh/(Eh + epsilon)
Cu = Cuh/(Ch + epsilon)
name = 'U'
message = ' %s: vertex_values = %.4f,\t %.4f,\t %.4f\n' \
% (name.ljust(qwidth), Vu[0], Vu[1], Vu[2])
message += ' %s: edge_values = %.4f,\t %.4f,\t %.4f\n' \
% (name.ljust(qwidth), Eu[0], Eu[1], Eu[2])
message += ' %s: centroid_value = %.4f\n' \
% (name.ljust(qwidth), Cu[0])
msg += message
Vv = Vvh/(Vh + epsilon)
Ev = Evh/(Eh + epsilon)
Cv = Cvh/(Ch + epsilon)
name = 'V'
message = ' %s: vertex_values = %.4f,\t %.4f,\t %.4f\n' \
% (name.ljust(qwidth), Vv[0], Vv[1], Vv[2])
message += ' %s: edge_values = %.4f,\t %.4f,\t %.4f\n' \
% (name.ljust(qwidth), Ev[0], Ev[1], Ev[2])
message += ' %s: centroid_value = %.4f\n'\
%(name.ljust(qwidth), Cv[0])
msg += message
# Froude number in each direction
name = 'Froude (x)'
Vfx = Vu/(num.sqrt(g*Vh + epsilon))
Efx = Eu/(num.sqrt(g*Eh + epsilon))
Cfx = Cu/(num.sqrt(g*Ch + epsilon))
message = ' %s: vertex_values = %.4f,\t %.4f,\t %.4f\n'\
% (name.ljust(qwidth), Vfx[0], Vfx[1], Vfx[2])
message += ' %s: edge_values = %.4f,\t %.4f,\t %.4f\n'\
% (name.ljust(qwidth), Efx[0], Efx[1], Efx[2])
message += ' %s: centroid_value = %.4f\n'\
% (name.ljust(qwidth), Cfx[0])
msg += message
name = 'Froude (y)'
Vfy = Vv/(num.sqrt(g*Vh + epsilon))
Efy = Ev/(num.sqrt(g*Eh + epsilon))
Cfy = Cv/(num.sqrt(g*Ch + epsilon))
message = ' %s: vertex_values = %.4f,\t %.4f,\t %.4f\n'\
% (name.ljust(qwidth), Vfy[0], Vfy[1], Vfy[2])
message += ' %s: edge_values = %.4f,\t %.4f,\t %.4f\n'\
% (name.ljust(qwidth), Efy[0], Efy[1], Efy[2])
message += ' %s: centroid_value = %.4f\n'\
% (name.ljust(qwidth), Cfy[0])
msg += message
return msg
def print_timestepping_statistics(self, *args, **kwargs) -> None:
"""Print time stepping statistics.
Parameters
----------
time_units : str, optional
Time units for reporting. Options are 'sec', 'min', 'hr', 'day'.
datetime : bool, optional
Flag to use timestamp or datetime.
track_speed : bool, optional
Optional boolean keyword that decides whether to report location of
smallest timestep as well as a histogram and percentile report.
relative_time : bool, optional
Flag to report relative time instead of absolute time.
triangle_id : int, optional
Can be used to specify a particular triangle rather than the one with
the largest speed.
"""
msg = self.timestepping_statistics(*args, **kwargs)
print(msg, flush=True)
def compute_boundary_flows(self) -> tuple[dict[str, float], float, float]:
"""Compute boundary flows at current timestep.
Computes the total inflow and outflow across the domain boundary,
as well as the flow across each tagged boundary segment.
Returns
-------
boundary_flows : dict
Flow rates [m^3/s] for each boundary tag
total_boundary_inflow : float
Total inflow across boundary [m^3/s]
total_boundary_outflow : float
Total outflow across boundary [m^3/s]
Notes
-----
These calculations are only approximate since they don't use the
flux calculation used in evolve. For exact computation, see
get_boundary_flux_integral.
"""
# Run through boundary array and compute for each segment
# the normal momentum ((uh, vh) dot normal) times segment length.
# Based on sign accumulate this into boundary_inflow and
# boundary_outflow.
# Compute flows along boundary
uh = self.get_quantity('xmomentum').get_values(location='edges')
vh = self.get_quantity('ymomentum').get_values(location='edges')
# Loop through edges that lie on the boundary and calculate
# flows
boundary_flows = {}
total_boundary_inflow = 0.0
total_boundary_outflow = 0.0
for vol_id, edge_id in self.boundary:
# Compute normal flow across edge. Since normal vector points
# away from triangle, a positive sign means that water
# flows *out* from this triangle.
momentum = [uh[vol_id, edge_id], vh[vol_id, edge_id]]
normal = self.mesh.get_normal(vol_id, edge_id)
length = self.mesh.get_edgelength(vol_id, edge_id)
normal_flow = num.dot(momentum, normal)*length
# Reverse sign so that + is taken to mean inflow
# and - means outflow. This is more intuitive.
edge_flow = -normal_flow
# Tally up inflows and outflows separately
if edge_flow > 0:
# Flow is inflow
total_boundary_inflow += edge_flow
else:
# Flow is outflow
total_boundary_outflow += edge_flow
# Tally up flows by boundary tag
tag = self.boundary[(vol_id, edge_id)]
if tag not in boundary_flows:
boundary_flows[tag] = 0.0
boundary_flows[tag] += edge_flow
return boundary_flows, total_boundary_inflow, total_boundary_outflow
def compute_total_volume(self) -> float:
"""
Compute total volume (m^3) of water in entire domain
"""
return self.get_water_volume()
def volumetric_balance_statistics(self) -> str:
"""Create volumetric balance report suitable for printing or logging.
"""
(boundary_flows, total_boundary_inflow,
total_boundary_outflow) = self.compute_boundary_flows()
message = '---------------------------\n'
message += 'Volumetric balance report:\n'
message += 'Note: Boundary fluxes are not exact\n'
message += 'See get_boundary_flux_integral for exact computation\n'
message += '--------------------------\n'
message += 'Total boundary inflow [m^3/s]: %.2f\n' % total_boundary_inflow
message += 'Total boundary outflow [m^3/s]: %.2f\n' % total_boundary_outflow
message += 'Net boundary flow by tags [m^3/s]\n'
for tag in boundary_flows:
message += ' %s [m^3/s]: %.2f\n' % (tag, boundary_flows[tag])
message += 'Total net boundary flow [m^3/s]: %.2f\n' % \
(total_boundary_inflow + total_boundary_outflow)
message += 'Total volume in domain [m^3]: %.2f\n' % \
self.compute_total_volume()
# The go through explicit forcing update and record the rate of change
# for stage and
# record into forcing_inflow and forcing_outflow. Finally compute
# integral of depth to obtain total volume of domain.
# FIXME(Ole): This part is not yet done.
return message
def print_volumetric_balance_statistics(self) -> None:
print (self.volumetric_balance_statistics())
def report_water_volume_statistics(self, verbose: bool = True,
returnStats: bool = False) -> list[float] | None:
"""
Compute the volume, boundary flux integral, fractional step volume integral, and their difference
If verbose, print a summary
If returnStats, return a list with the volume statistics
"""
from anuga import myid
if(self.compute_fluxes_method != 'DE'):
if(myid == 0):
print('Water_volume_statistics only supported for DE algorithm ')
return
# Compute the volume
Vol = self.get_water_volume()
# Compute the boundary flux integral
fluxIntegral=self.get_boundary_flux_integral()
fracIntegral=self.get_fractional_step_volume_integral()
if(verbose and myid==0):
print(' ')
print(f' Volume V at time {self.get_time()}:', Vol)
print(' Boundary Flux integral BF: ', fluxIntegral)
print(' (rate + inlet) Fractional Step volume integral FS: ', fracIntegral)
print(' V - BF - FS - InitialVolume :', Vol- fluxIntegral -fracIntegral - self.volume_history[0])
print(' ')
if returnStats:
return [Vol, fluxIntegral, fracIntegral]
else:
return
def report_cells_with_small_local_timestep(self, threshold_depth: float | None = None) -> None:
"""
Convenience function to print the locations of cells
with a small local timestep.
Computations are at cell centroids
Useful in models
with complex meshes, to find ways to speed up the model
"""
from anuga.parallel import myid, numprocs
from anuga.config import g, epsilon
if threshold_depth is None:
threshold_depth=self.minimum_allowed_height
uh = self.quantities['xmomentum'].centroid_values
vh = self.quantities['ymomentum'].centroid_values
d = self.quantities['stage'].centroid_values - self.quantities['elevation'].centroid_values
d = num.maximum(d, threshold_depth)
v = ((uh)**2 + (vh)**2)**0.5/d
v = v * (d > threshold_depth)
for i in range(numprocs):
if myid == i:
print(' Processor ', myid)
gravSpeed = (g * d)**0.5
waveSpeed = abs(v) + gravSpeed
localTS = self.radii / num.maximum(waveSpeed, epsilon)
controlling_pt_ind = localTS.argmin()
print(f' * Smallest LocalTS at time {self.get_time()} is approximately: ', localTS[controlling_pt_ind])
print(' -- Location: ', round(self.centroid_coordinates[controlling_pt_ind,0]+self.geo_reference.xllcorner,2),\
round(self.centroid_coordinates[controlling_pt_ind,1]+self.geo_reference.yllcorner,2))
print(' -+ Speed: ', v[controlling_pt_ind])
print(' -* Gravity_wave_speed', gravSpeed[controlling_pt_ind])
print(' ')
barrier()
return
def diagnose_timestep(self,
threshold_dt: float | None = None,
top_n: int = 5,
threshold_depth: float | None = None,
verbose: bool = True) -> dict:
"""Diagnose CFL-limited timestep and NaN/inf in conserved quantities.
Computes the CFL-limiting dt per centroid (radius / wave_speed) and
reports the cells driving the global minimum. Also scans stage,
xmomentum and ymomentum for NaN / inf, which is the usual signature
of an ADER-2 (or any) timestepping blow-up. Intended to be called
from inside an evolve loop (e.g. each yieldstep) to localise where
and when the solver is about to fail.
Parameters
----------
threshold_dt : float, optional
If given, a WARNING is emitted whenever the global min local-dt
falls below this value. Useful to catch the moment dt collapses.
top_n : int
Number of worst (smallest-dt) cells per rank to include in the
returned dict and printed report.
threshold_depth : float, optional
Depths below this are clamped when computing speed, to avoid
division-by-tiny. Defaults to self.minimum_allowed_height.
verbose : bool
If True, print a per-rank report.
Returns
-------
dict
Keys:
time relative simulation time
current_timestep self.timestep (last accepted global dt)
cfl self.CFL setting
local_min_dt smallest cell-local CFL dt on this rank
global_min_dt smallest cell-local CFL dt across all ranks
local_max_speed max wave_speed on this rank
global_max_speed max wave_speed across all ranks
nan_counts {'stage': n, 'xmomentum': n, 'ymomentum': n}
first_nan_cell centroid xy of the first NaN/inf cell on
this rank, or None
worst_cells list of dicts (top_n) with keys
{triangle, x, y, depth, speed, local_dt}
"""
from anuga import numprocs
from anuga.config import g, epsilon
if threshold_depth is None:
threshold_depth = self.minimum_allowed_height
stage_c = self.quantities['stage'].centroid_values
elev_c = self.quantities['elevation'].centroid_values
uh_c = self.quantities['xmomentum'].centroid_values
vh_c = self.quantities['ymomentum'].centroid_values
bad_stage = ~num.isfinite(stage_c)
bad_uh = ~num.isfinite(uh_c)
bad_vh = ~num.isfinite(vh_c)
nan_counts = {
'stage': int(bad_stage.sum()),
'xmomentum': int(bad_uh.sum()),
'ymomentum': int(bad_vh.sum()),
}
first_nan_cell = None
any_bad = bad_stage | bad_uh | bad_vh
if any_bad.any():
i = int(num.argmax(any_bad))
x = self.centroid_coordinates[i, 0] + self.geo_reference.xllcorner
y = self.centroid_coordinates[i, 1] + self.geo_reference.yllcorner
first_nan_cell = (float(x), float(y), i)
# Local CFL dt per cell. Mask NaNs out so argmin/min are meaningful.
d = stage_c - elev_c
d_safe = num.where(num.isfinite(d), num.maximum(d, threshold_depth),
threshold_depth)
speed_h = num.sqrt(uh_c * uh_c + vh_c * vh_c) / d_safe
speed_h = num.where(num.isfinite(speed_h), speed_h, 0.0)
speed_h = speed_h * (d > threshold_depth)
grav_speed = num.sqrt(g * d_safe)
wave_speed = num.abs(speed_h) + grav_speed
local_dt = self.radii / num.maximum(wave_speed, epsilon)
# Don't let NaN cells dominate the argmin
local_dt = num.where(num.isfinite(local_dt), local_dt, num.inf)
local_min_dt = float(local_dt.min())
local_max_speed = float(num.nanmax(wave_speed)) if wave_speed.size else 0.0
order = num.argsort(local_dt)[:max(top_n, 1)]
worst_cells = []
for idx in order:
i = int(idx)
worst_cells.append({
'triangle': i,
'x': float(self.centroid_coordinates[i, 0]
+ self.geo_reference.xllcorner),
'y': float(self.centroid_coordinates[i, 1]
+ self.geo_reference.yllcorner),
'depth': float(d[i]),
'speed': float(wave_speed[i]),
'local_dt': float(local_dt[i]),
})
if numprocs > 1:
from mpi4py import MPI
comm = MPI.COMM_WORLD
global_min_dt = comm.allreduce(local_min_dt, op=MPI.MIN)
global_max_speed = comm.allreduce(local_max_speed, op=MPI.MAX)
global_nan = {
k: comm.allreduce(v, op=MPI.SUM) for k, v in nan_counts.items()
}
else:
global_min_dt = local_min_dt
global_max_speed = local_max_speed
global_nan = dict(nan_counts)
result = {
'time': self.get_time(),
'current_timestep': self.timestep,
'cfl': self.CFL,
'local_min_dt': local_min_dt,
'global_min_dt': global_min_dt,
'local_max_speed': local_max_speed,
'global_max_speed': global_max_speed,
'nan_counts': nan_counts,
'global_nan_counts': global_nan,
'first_nan_cell': first_nan_cell,
'worst_cells': worst_cells,
}
if verbose:
from anuga.parallel import myid
for i in range(numprocs):
if myid == i:
print(f' [diagnose_timestep] rank {myid} t={result["time"]:.3f}')
print(f' domain.timestep={self.timestep:.6e} CFL={self.CFL}')
print(f' local min cell-dt = {local_min_dt:.6e} max wave_speed = {local_max_speed:.4f}')
if numprocs > 1:
print(f' global min cell-dt = {global_min_dt:.6e} max wave_speed = {global_max_speed:.4f}')
total_bad = sum(nan_counts.values())
if total_bad:
print(f' NaN/inf cells: stage={nan_counts["stage"]} '
f'xmom={nan_counts["xmomentum"]} '
f'ymom={nan_counts["ymomentum"]}')
if first_nan_cell is not None:
fx, fy, fi = first_nan_cell
print(f' first bad cell: tri={fi} at ({fx:.2f}, {fy:.2f})')
for c in worst_cells:
print(f' tri {c["triangle"]:>7} '
f'xy=({c["x"]:.2f}, {c["y"]:.2f}) '
f'depth={c["depth"]:.4f} speed={c["speed"]:.4f} '
f'local_dt={c["local_dt"]:.6e}')
if threshold_dt is not None and global_min_dt < threshold_dt:
print(f' WARNING: global min cell-dt {global_min_dt:.3e} '
f'< threshold {threshold_dt:.3e}')
sys.stdout.flush()
barrier()
return result
# =======================================================================
# PETE: NEW METHODS FOR FOR PARALLEL STRUCTURES. Note that we assume the
# first "number_of_full_[nodes|triangles]" are full [nodes|triangles]
# For full triangles it is possible to enquire self.tri_full_flag == True
# =======================================================================
def get_number_of_full_triangles(self, *args, **kwargs) -> int:
return self.number_of_full_triangles
def get_full_centroid_coordinates(self, *args, **kwargs) -> num.ndarray:
C = self.mesh.get_centroid_coordinates(*args, **kwargs)
return C[:self.number_of_full_triangles, :]
def get_full_vertex_coordinates(self, *args, **kwargs) -> num.ndarray:
V = self.mesh.get_vertex_coordinates(*args, **kwargs)
return V[:3*self.number_of_full_triangles,:]
def get_full_triangles(self, *args, **kwargs) -> num.ndarray:
T = self.mesh.get_triangles(*args, **kwargs)
return T[:self.number_of_full_triangles,:]
def get_full_nodes(self, *args, **kwargs) -> num.ndarray:
N = self.mesh.get_nodes(*args, **kwargs)
return N[:self.number_of_full_nodes,:]
def get_tri_map(self) -> num.ndarray | None:
return self.tri_map
def get_inv_tri_map(self) -> num.ndarray | None:
return self.inv_tri_map
# ==============================================================================
# Multiprocessor Mode (1=openmp, 2=cupy (in development))
# ==============================================================================
# User-facing *per-domain* compute mode. This is the only genuinely
# per-domain choice — it selects the internal ``multiprocessor_mode``:
# 'legacy' -> mode 1: sw_domain_openmp_ext solver + serial-Python operators
# 'unified' -> mode 2: the unified gpu_ext C kernels (solver + operators)
# Whether 'unified' runs on CPU or offloads to a GPU is NOT a per-domain
# property: it is a *process-global* OpenMP offload setting controlled by
# the module function ``set_gpu_offload()`` (and only possible on a GPU
# build). On a CPU-only build, 'unified' simply runs CPU-multicore.
COMPUTE_MODES = ('legacy', 'unified')
def _mode2_mpi_available(self) -> bool:
"""True if ``sw_domain_gpu_ext`` was built with real C MPI support.
Mode 2 ('unified') performs its halo exchange at the C level
(``exchange_ghosts``). Without an MPI-enabled build that exchange is a
silent no-op, so a multi-rank 'unified' run would compute wrong results —
the selector falls back to 'legacy' (Python MPI exchange) in that case.
Serial (single-rank) 'unified' needs no MPI and is unaffected.
"""
try:
from anuga.shallow_water import sw_domain_gpu_ext as gpu_ext
return bool(gpu_ext.gpu_has_mpi())
except Exception:
return False
def compute_capabilities(self) -> dict:
"""Report which compute backends this build/run supports.
Returns a dict with:
'gpu_offload' : bool — process can offload mode-2 to a GPU device
(build supports it, device present, offload
not disabled); see :func:`set_gpu_offload`
'num_gpu_devices' : int — number of offload devices visible
'mpi' : bool — gpu_ext built with C MPI ('unified' parallel ok)
'modes' : list — per-domain modes available ('unified' only
when the gpu_ext extension is importable)
"""
try:
from anuga.shallow_water import sw_domain_gpu_ext as gpu_ext # noqa: F401
unified = True
ndev = int(gpu_ext.get_num_gpu_devices())
mpi = bool(gpu_ext.gpu_has_mpi())
except Exception:
unified, ndev, mpi = False, 0, False
modes = ['legacy'] + (['unified'] if unified else [])
return {'gpu_offload': gpu_offload_enabled(), 'num_gpu_devices': ndev,
'mpi': mpi, 'modes': modes}
def set_compute_mode(self, mode: str = 'unified', verbose: bool = False) -> None:
"""Select this domain's compute mode (per-domain).
Parameters
----------
mode : {'legacy', 'unified'}
- ``'legacy'`` — mode 1: the ``sw_domain_openmp_ext`` solver with
serial-Python fractional-step operators.
- ``'unified'`` — mode 2: the unified ``sw_domain_gpu_ext`` C kernels
(solver and operators). Runs CPU-multicore by default; offloads to
a GPU only when GPU offload is enabled process-wide via
:func:`anuga.set_gpu_offload` on a GPU-capable build.
This is a per-domain setting — different domains in one script may use
different modes. Whether 'unified' uses a GPU is a separate, process-wide
decision (see :func:`set_gpu_offload`), because OpenMP target offload is
a process-level runtime setting, not a per-domain one.
Under MPI, 'unified' requires a gpu_ext built with MPI; otherwise this
falls back to 'legacy' (whose Python MPI exchange is correct in parallel)
with a rank-0 warning. The active mode is recorded in
``self.compute_mode``; the original request in
``self.requested_compute_mode``.
"""
import warnings
if mode not in self.COMPUTE_MODES:
raise ValueError(
f"Invalid compute mode {mode!r}. Must be one of {self.COMPUTE_MODES}.")
requested = mode
if mode == 'unified':
# Parallel guard: 'unified' exchanges ghosts at the C level, a silent
# no-op without an MPI-enabled gpu_ext build. Under MPI that gives
# wrong results, so fall back to 'legacy' (Python MPI exchange).
try:
from anuga import numprocs
except Exception:
numprocs = 1
if numprocs > 1 and not self._mode2_mpi_available():
try:
from anuga import myid
except Exception:
myid = 0
if myid == 0:
warnings.warn(
f"compute mode 'unified' selected under MPI ({numprocs} ranks) but "
"this ANUGA build's sw_domain_gpu_ext was compiled without MPI; the "
"C-level ghost exchange would be a silent no-op and give wrong "
"parallel results. Falling back to 'legacy' (mode 1, Python MPI "
"exchange). Rebuild with MPI to run 'unified' in parallel.",
stacklevel=2)
mode = 'legacy'
self.requested_compute_mode = requested
self.compute_mode = mode
if mode == 'legacy':
self.multiprocessor_mode = MULTIPROCESSOR_OPENMP
self.use_c_rk_loop = False
else: # 'unified' -> mode 2 (unified gpu_ext kernels)
self.multiprocessor_mode = MULTIPROCESSOR_GPU
self.use_c_rk_loop = True
# Build the device interface now if boundaries are ready; otherwise
# defer to the first evolve(). Boundaries are typically set AFTER
# construction, so a default-'unified' domain must not require them
# at __init__ time.
if self._boundaries_ready():
self.set_gpu_interface()
if verbose:
print(f"Compute mode: requested {requested!r} -> active {self.compute_mode!r} "
f"(multiprocessor_mode={self.multiprocessor_mode}, "
f"gpu_offload={'on' if gpu_offload_enabled() else 'off'})")
def _boundaries_ready(self) -> bool:
"""True if real boundary objects are set — required to build the gpu_ext
device interface. After distribute() boundary_map may be
``{'exterior': None, 'ghost': None}`` (not None but no real boundaries).
"""
bmap = getattr(self, 'boundary_map', None)
return bool(bmap) and any(b is not None for b in bmap.values())
def _ensure_gpu_interface(self) -> None:
"""Build a deferred mode-2 device interface on demand.
For a default-'unified' domain the interface is built lazily (boundaries
are set after construction). Build it the first time a mode-2 path needs
it. If boundaries are still not set — e.g. a test that pokes the domain
without a full boundary setup — fall back to 'legacy' so the operation
can proceed rather than hit a None gpu_interface.
"""
if self.multiprocessor_mode != MULTIPROCESSOR_GPU or self.gpu_interface is not None:
return
if self._boundaries_ready():
self.set_gpu_interface()
else:
self.set_compute_mode('legacy')
def get_compute_mode(self) -> str:
"""Return the active per-domain compute mode: 'legacy' or 'unified'."""
return getattr(self, 'compute_mode', 'legacy')
def set_multiprocessor_mode(self, multiprocessor_mode: int = 1) -> None:
"""
Set multiprocessor mode (legacy integer API).
1. openmp - Python RK loop (use_c_rk_loop=False)
2. gpu/mpi - C RK loop (use_c_rk_loop=True)
Thin wrapper over :meth:`set_compute_mode`: 1 maps to ``'legacy'``, 2 to
``'unified'``. Whether 'unified' offloads to a GPU is a separate,
process-wide choice — see :func:`anuga.set_gpu_offload`. New code should
prefer :meth:`set_compute_mode`.
"""
if multiprocessor_mode not in [MULTIPROCESSOR_OPENMP, MULTIPROCESSOR_GPU]:
raise ValueError('Invalid multiprocessor mode. Must be one of [1,2] (openmp, gpu/mpi)')
if multiprocessor_mode == MULTIPROCESSOR_OPENMP:
self.set_compute_mode('legacy')
else:
self.set_compute_mode('unified')
@property
def use_c_rk2_loop(self):
"""Deprecated: use use_c_rk_loop instead."""
import warnings
warnings.warn(
"use_c_rk2_loop is deprecated; use use_c_rk_loop instead.",
DeprecationWarning, stacklevel=2)
return self.use_c_rk_loop
@use_c_rk2_loop.setter
def use_c_rk2_loop(self, value):
import warnings
warnings.warn(
"use_c_rk2_loop is deprecated; use use_c_rk_loop instead.",
DeprecationWarning, stacklevel=2)
self.use_c_rk_loop = value
def get_multiprocessor_mode(self) -> int:
"""
Get multiprocessor mode
1. openmp (in development)
2. gpu/mpi (in development)
"""
return self.multiprocessor_mode
[docs]
def set_omp_num_threads(self, omp_num_threads: int | None = None, verbose: bool = True) -> None:
"""Set the OpenMP thread count (process-wide).
OpenMP thread count is a process-level setting, not per-domain. This is a
thin wrapper that delegates to the module-level
:func:`anuga.set_omp_num_threads`; prefer that in new code. Kept for
backward compatibility, and records ``self.omp_num_threads``.
"""
self.omp_num_threads = set_omp_num_threads(omp_num_threads, verbose=verbose)
@property
def gpu(self):
"""
Shortcut to access the GPU interface.
Returns the gpu_interface object which provides FLOP counters and kernel wrappers.
Raises AttributeError if GPU mode is not enabled.
"""
if self.gpu_interface is None:
raise AttributeError(
"GPU interface not initialized. Call domain.set_multiprocessor_mode(2) first."
)
return self.gpu_interface
def set_gpu_interface(self):
if self.multiprocessor_mode == MULTIPROCESSOR_GPU and self.gpu_interface is None:
# Check that boundaries are properly set before GPU initialization
# After distribute(), boundary_map may be {'exterior': None, 'ghost': None}
# which is not None but has no actual boundary objects - this causes silent failures
if self.boundary_map is None:
raise RuntimeError(
"GPU mode requires boundaries to be set before calling set_multiprocessor_mode(2).\n"
"Please call domain.set_boundary({...}) BEFORE domain.set_multiprocessor_mode(2)."
)
has_real_boundary = any(b is not None for b in self.boundary_map.values())
if not has_real_boundary:
raise RuntimeError(
"GPU mode requires boundaries to be set before calling set_multiprocessor_mode(2).\n"
"Please call domain.set_boundary({...}) BEFORE domain.set_multiprocessor_mode(2).\n"
f"Current boundary_map has no boundary objects: {list(self.boundary_map.keys())}"
)
# Try OpenMP target offloading interface first
try:
from .sw_domain_gpu_omp import GPU_OMP_interface
self.gpu_interface = GPU_OMP_interface(self)
self.gpu_interface.setup()
# Only print from rank 0
from anuga import myid, numprocs
omp_num_threads = os.environ.get('OMP_NUM_THREADS', '1')
# Offload is a process-wide decision (set_gpu_offload), resolved
# against the build. False on a CPU-only build: 'unified' is CPU
# multicore, not GPU.
self.gpu_offload_active = gpu_offload_enabled()
if myid == 0:
device_id = self.gpu_interface.gpu_dom.device_id
print('+==============================================================================+')
if not self.gpu_offload_active:
print("| ANUGA compute mode: 'unified' CPU multicore (gpu_ext kernels, no offload) |")
print(f'| OMP_NUM_THREADS={omp_num_threads}')
elif device_id < 0:
print('| WARNING: No GPU devices found, running on CPU via OpenMP target offloading |')
else:
print(f'| GPU interface initialized: {numprocs} GPU(s) using OpenMP target offloading')
print('+==============================================================================+')
return
except Exception as e:
print(f'OpenMP GPU interface not available: {e}')
# Fall back to CUDA/CuPy interface
try:
import cupy as cp
test_cupy_array = cp.array([1,2,3])
from .sw_domain_cuda import GPU_interface
self.gpu_interface = GPU_interface(self)
self.gpu_interface.allocate_gpu_arrays()
self.gpu_interface.compile_gpu_kernels()
return
except Exception:
pass
# No GPU available
from anuga import myid
if myid == 0:
print('+==============================================================================+')
print('| WARNING: GPU not available, falling back to multiprocessor_mode 1 (OpenMP) |')
print('+==============================================================================+')
self.set_multiprocessor_mode(1)
################################################################################
# End of class Shallow Water Domain
################################################################################
# FIXME (Ole): Does this do anything?
def my_update_special_conditions(domain):
pass
if __name__ == "__main__":
pass