Source code for anuga.shallow_water.shallow_water_domain

"""
Finite-volume computations of the shallow water wave equation.

Title: ANGUA shallow_water_domain - 2D triangular domains for finite-volume
       computations of the shallow water wave equation.


Author: Ole Nielsen, Ole.Nielsen@ga.gov.au
        Stephen Roberts, Stephen.Roberts@anu.edu.au
        Duncan Gray, Duncan.Gray@ga.gov.au
        Gareth Davies, gareth.davies.ga.code@gmail.com

CreationDate: 2004

Description:
    This module contains a specialisation of class Generic_Domain from
    module generic_domain.py consisting of methods specific to the
    Shallow Water Wave Equation

    U_t + E_x + G_y = S

    where

    U = [w, uh, vh]
    E = [uh, u^2h + gh^2/2, uvh]
    G = [vh, uvh, v^2h + gh^2/2]
    S represents source terms forcing the system
    (e.g. gravity, friction, wind stress, ...)

    and _t, _x, _y denote the derivative with respect to t, x and y
    respectively.


    The quantities are

    symbol    variable name    explanation

    x         x                horizontal distance from origin [m]
    y         y                vertical distance from origin [m]
    z         elevation        elevation of bed on which flow is modelled [m]
    h         height           water height above z [m]
    w         stage            absolute water level, w = z+h [m]
    u                          speed in the x direction [m/s]
    v                          speed in the y direction [m/s]
    uh        xmomentum        momentum in the x direction [m^2/s]
    vh        ymomentum        momentum in the y direction [m^2/s]


    eta                        mannings friction coefficient [to appear]
    nu                         wind stress coefficient [to appear]

    The conserved quantities are w, uh, vh

Reference:
    Catastrophic Collapse of Water Supply Reservoirs in Urban Areas,
    Christopher Zoppou and Stephen Roberts,
    Journal of Hydraulic Engineering, vol. 127, No. 7 July 1999

    Hydrodynamic modelling of coastal inundation.
    Nielsen, O., S. Roberts, D. Gray, A. McPherson and A. Hitchman
    In Zerger, A. and Argent, R.M. (eds) MODSIM 2005 International Congress on
    Modelling and Simulation. Modelling and Simulation Society of Australia and
    New Zealand, December 2005, pp. 518-523. ISBN: 0-9758400-2-9.
    http://www.mssanz.org.au/modsim05/papers/nielsen.pdf

    See also: https://anuga.anu.edu.au and https://en.wikipedia.org/wiki/ANUGA_Hydro


Constraints: See license in the user guide
"""

from __future__ import annotations




# Decorator added for profiling
#------------------------------


def profileit(name):
    def inner(func):
        def wrapper(*args, **kwargs):
            import cProfile
            prof = cProfile.Profile()
            retval = prof.runcall(func, *args, **kwargs)

            # Note use of name from outer scope
            print(str(args[1])+'_'+name)
            prof.dump_stats(str(args[1])+'_'+name)
            return retval
        return wrapper
    return inner
#-----------------------------

import numpy as num
import sys
import os
import time
from typing import TYPE_CHECKING
from collections.abc import Callable, Iterator
from numpy.typing import ArrayLike

if TYPE_CHECKING:
    from datetime import datetime as DateTime
    from zoneinfo import ZoneInfo as ZoneInfoType


try:
    from zoneinfo import ZoneInfo
except ImportError:
    from backports.zoneinfo import ZoneInfo

try:
    import dill as pickle
except ImportError:
    import pickle

from anuga.abstract_2d_finite_volumes.generic_domain \
                    import Generic_Domain

from anuga.config import MULTIPROCESSOR_OPENMP, MULTIPROCESSOR_GPU
from anuga.config import LOW_FROUDE_OFF, LOW_FROUDE_1, LOW_FROUDE_2

from anuga.shallow_water.forcing import Cross_section
from anuga.utilities.numerical_tools import mean
from anuga.file.sww import SWW_file

import anuga.utilities.log as log

from anuga.utilities.parallel_abstraction import size, rank, get_processor_name
from anuga.utilities.parallel_abstraction import finalize, send, receive
from anuga.utilities.parallel_abstraction import pypar_available, barrier


#-----------------------------------------------------
# Code for profiling cuda version
#-----------------------------------------------------
def nvtxRangePush(*arg):
    pass
def nvtxRangePop(*arg):
    pass

try:
    from cupy.cuda.nvtx import RangePush as nvtxRangePush
    from cupy.cuda.nvtx import RangePop  as nvtxRangePop
except ImportError:
    pass

try:
    from nvtx import range_push as nvtxRangePush
    from nvtx import range_pop  as nvtxRangePop
except ImportError:
    pass


#-----------------------------------------------------
# Process-global GPU offload control
#
# Whether mode 2 ('unified') offloads to a GPU is a *process-level* OpenMP
# setting (the target-offload runtime ICV), not a per-domain property: a single
# process cannot run one domain on the GPU and another on the CPU with the same
# unified kernels. These module functions own that process-wide decision; the
# per-domain choice (legacy vs unified) lives on Domain.set_compute_mode().
#-----------------------------------------------------

def _gpu_ext_or_none():
    try:
        from anuga.shallow_water import sw_domain_gpu_ext as gpu_ext
        return gpu_ext
    except Exception:
        return None


def gpu_offload_supported() -> bool:
    """True if this build/run can offload mode 2 to a GPU device.

    Reflects build + hardware: False for a CPU-only build (``gpu_offload=false`` /
    ``CPU_ONLY_MODE`` — the standard pip/conda install), when no device is
    present, or when offload was disabled at launch via
    ``OMP_TARGET_OFFLOAD=disabled`` (``gpu_available()`` covers all three). This
    is a static capability and is *not* affected by :func:`set_gpu_offload`.
    """
    ge = _gpu_ext_or_none()
    return bool(ge.gpu_available()) if ge is not None else False


def gpu_offload_enabled() -> bool:
    """Return the resolved process-global offload state for mode 2 ('unified').

    True only when the build supports offload *and* it has not been switched off
    via :func:`set_gpu_offload`. Always False on a CPU-only build.
    """
    if not gpu_offload_supported():
        return False
    ge = _gpu_ext_or_none()
    return bool(ge.get_offload_enabled()) if ge is not None else False


def set_gpu_offload(enable: bool = True, verbose: bool = True) -> bool:
    """Enable or disable GPU offload for mode-2 ('unified') domains, process-wide.

    This is a *process-level* switch, not per-domain: it affects every domain
    that runs in 'unified' mode, because OpenMP target offload is a process-wide
    runtime setting (one process cannot run the same unified kernels on a GPU for
    one domain and on the CPU for another).

    The setting is honoured at GPU-domain init, where arrays are mapped to the
    chosen device. **Call it before building the first 'unified' domain** — once
    a domain's data is mapped to a device, switching this domain's offload would
    leave data and execution on different devices.

    Implemented via the OpenMP default-device ICV (``omp_set_default_device`` /
    a process-global flag in ``sw_domain_gpu_ext``) — robust and re-enableable,
    unlike mutating ``OMP_TARGET_OFFLOAD`` after the runtime has initialised.

    Parameters
    ----------
    enable : bool
        True to offload 'unified' domains to a GPU (GPU build + device required);
        False to force 'unified' to run CPU-multicore.
    verbose : bool
        Print a one-line confirmation.

    Returns
    -------
    bool
        The resolved offload state (:func:`gpu_offload_enabled`). Requesting
        ``enable=True`` on a build without offload support warns and returns
        False — never hard-fails.
    """
    import warnings

    ge = _gpu_ext_or_none()
    was_supported = gpu_offload_supported()

    if enable:
        # Clear any prior disable BEFORE checking capability, so re-enabling is
        # not blocked by our own OMP_TARGET_OFFLOAD=disabled.
        os.environ.pop('OMP_TARGET_OFFLOAD', None)
        if not gpu_offload_supported():
            warnings.warn(
                "set_gpu_offload(True): this ANUGA build has no GPU offload support "
                "(built with gpu_offload=false) or no device is present; 'unified' "
                "domains will run on CPU multicore. Rebuild with -Dgpu_offload=true "
                "and a GPU-capable compiler to enable offload.",
                stacklevel=2)
            enable = False
    elif was_supported:
        # Disabling offload on a GPU build. This forces the unified kernels onto
        # the host, but a GPU build (nvc -mp=gpu,multicore) runs the `omp target`
        # regions on the host through a slow fallback that does NOT scale with
        # threads — so this is for correctness A/B (GPU vs CPU give the same
        # results), NOT for performance. For fast CPU multicore, use a
        # gpu_offload=false (gcc) build instead.
        try:
            from anuga import myid
        except Exception:
            myid = 0
        if myid == 0:
            warnings.warn(
                "set_gpu_offload(False) on a GPU build: 'unified' will run on the "
                "host, but the nvc GPU build's host fallback is slow and does not "
                "scale with threads (use it for correctness checks, not timing). "
                "For fast CPU multicore, build with -Dgpu_offload=false.",
                stacklevel=2)

    if not enable:
        # OMP_TARGET_OFFLOAD=disabled is what actually keeps the `omp target`
        # regions (solver AND operators) on the host. The default-device flag
        # alone does NOT stop the solver offloading, so it must be set here.
        # The OpenMP runtime reads this at its first target region, so call
        # set_gpu_offload() before the first evolve()/domain build.
        os.environ['OMP_TARGET_OFFLOAD'] = 'disabled'

    # Keep the C-side flag consistent so gpu_domain_init picks the host device
    # and the inlet/culvert operators route there too (gpu_compute_device).
    if ge is not None:
        ge.set_offload_enabled(bool(enable))

    state = bool(enable)
    if verbose:
        print(f"GPU offload {'enabled' if state else 'disabled'} "
              f"(process-wide; 'unified' domains run on {'GPU' if state else 'CPU multicore'})")
    return state


def set_omp_num_threads(omp_num_threads: int | None = None, verbose: bool = True) -> int:
    """Set the OpenMP thread count for ANUGA kernels (process-wide).

    ``OMP_NUM_THREADS`` / ``omp_set_num_threads`` controls the whole process, so
    this is a module-level setting, not per-domain — it affects every domain's
    OpenMP regions (both the legacy ``sw_domain_openmp_ext`` solver and the
    unified ``gpu_ext`` kernels). ``Domain.set_omp_num_threads`` delegates here.

    Parameters
    ----------
    omp_num_threads : int or None
        Thread count. If None, use ``OMP_NUM_THREADS`` from the environment,
        defaulting to 1 when unset.
    verbose : bool
        Print a one-line confirmation.

    Returns
    -------
    int
        The thread count applied.
    """
    if omp_num_threads is None:
        omp_num_threads = os.environ.get('OMP_NUM_THREADS', None)
        if verbose:
            print(f'Using OMP_NUM_THREADS from environment: {omp_num_threads}')

    if omp_num_threads is None:
        omp_num_threads = 1  # Default to 1 if not set

    try:
        omp_num_threads = int(omp_num_threads)
    except (ValueError, TypeError):
        raise ValueError('OMP_NUM_THREADS must be an integer')

    # omp_set_num_threads is process-global: one call covers every OpenMP region
    # in the process, so routing through the legacy extension also sets the
    # thread count for the unified gpu_ext kernels.
    from .sw_domain_openmp_ext import set_omp_num_threads as set_omp_num_threads_ext
    set_omp_num_threads_ext(omp_num_threads)
    # Keep the env var consistent so banners / introspection / any subprocess
    # report the same count (the runtime ICV is already set above).
    os.environ['OMP_NUM_THREADS'] = str(omp_num_threads)

    if verbose:
        print(f'Setting omp_num_threads to {omp_num_threads}')
    return omp_num_threads


[docs] class Domain(Generic_Domain): """Object which encapulates the shallow water model This class is a specialization of class Generic_Domain from module generic_domain.py consisting of methods specific to the Shallow Water Wave Equation Shallow Water Wave Equation .. math:: U_t + E_x + G_y = S where .. math:: U = [w, uh, vh]^T .. math:: E = [uh, u^2h + gh^2/2, uvh] .. math:: G = [vh, uvh, v^2h + gh^2/2] S represents source terms forcing the system (e.g. gravity, friction, wind stress, ...) and _t, _x, _y denote the derivative with respect to t, x and y respectively. The quantities are .. list-table:: :widths: 25 25 50 :header-rows: 1 * - symbol - variable name - explanation * - x - x - horizontal distance from origin [m] * - y - y - vertical distance from origin [m] * - z - elevation - elevation of bed on which flow is modelled [m] * - h - height - water height above z [m] * - w - stage - absolute water level, w = z+h [m] * - u - - speed in the x direction [m/s] * - v - - speed in the y direction [m/s] * - uh - xmomentum - momentum in the x direction [m^2/s] * - vh - ymomentum - momentum in the y direction [m^2/s] * - - - * - eta - - mannings friction coefficient [to appear] * - nu - - wind stress coefficient [to appear] The conserved quantities are w, uh, vh """
[docs] def __init__(self, coordinates: ArrayLike | str | None = None, vertices: ArrayLike | None = None, boundary: dict | None = None, tagged_elements: dict | None = None, geo_reference=None, use_inscribed_circle: bool = False, mesh_filename: str | None = None, use_cache: bool = False, verbose: bool = False, conserved_quantities: list[str] | None = None, evolved_quantities: list[str] | None = None, other_quantities: list[str] | None = None, full_send_dict: dict | None = None, ghost_recv_dict: dict | None = None, starttime: float = 0, processor: int = 0, numproc: int = 1, number_of_full_nodes: int | None = None, number_of_full_triangles: int | None = None, ghost_layer_width: int = 2, **kwargs) -> None: """Instantiate a shallow water domain. :param coordinates: vertex locations for the mesh :param vertices: vertex indices defining the triangles of the mesh :param boundary: boundaries of the mesh """ # Define quantities for the shallow_water domain if conserved_quantities is None: conserved_quantities = ['stage', 'xmomentum', 'ymomentum'] if evolved_quantities is None: evolved_quantities = ['stage', 'xmomentum', 'ymomentum'] if other_quantities is None: other_quantities = ['elevation', 'friction', 'height', 'xvelocity', 'yvelocity', 'x', 'y'] # Selective array allocation per quantity type. # Quantities not listed default to 'evolved' (all arrays) for # backward compatibility with user-defined quantities. self._quantity_type_map = { 'stage': 'evolved', 'xmomentum': 'evolved', 'ymomentum': 'evolved', # elevation: centroid + edge only; gradients lazy (erosion operators # trigger allocation via compute_local_gradients when needed) 'elevation': 'edge_diagnostic', 'friction': 'centroid_only', 'height': 'edge_diagnostic', 'xvelocity': 'edge_diagnostic', 'yvelocity': 'edge_diagnostic', 'x': 'coordinate', 'y': 'coordinate', } Generic_Domain.__init__(self, coordinates, vertices, boundary, conserved_quantities, evolved_quantities, other_quantities, tagged_elements, geo_reference, use_inscribed_circle, mesh_filename, use_cache, verbose, full_send_dict, ghost_recv_dict, starttime, processor, numproc, number_of_full_nodes=number_of_full_nodes, number_of_full_triangles=number_of_full_triangles, ghost_layer_width=ghost_layer_width) #------------------------------- # Operator Data Structures #------------------------------- self.fractional_step_operators = [] self.kv_operator = None self.max_quantities_operator = None self.dplotter = None self.gpu_culvert_manager = None # Initialized when GPU mode + Boyd operators #------------------------------- # Set flow defaults #------------------------------- self.set_flow_algorithm() #------------------------------- # Set default multiprocessor mode # 1. Openmp # 2. Cuda #------------------------------- self.gpu_interface = None self.use_c_rk_loop = True # Use C RK loop (faster) vs Python-orchestrated GPU loop # Default compute mode: 'legacy' (mode 1). Set ANUGA_DEFAULT_COMPUTE_MODE=unified # to default new domains to mode 2 (the migration target); the device interface # is then built lazily at first evolve() so construction needs no boundaries. # SERIAL ONLY: under MPI (numprocs > 1) the env opt-in is ignored and domains # stay 'legacy'. Mode-2 parallel is validated for purpose-built setups (it must # be selected explicitly), but is not yet robust as a blanket default for every # parallel test's evolve pattern — defaulting all parallel domains to mode 2 # deadlocks the MPI path. Parallel unified is a later migration step. try: from anuga import numprocs except Exception: numprocs = 1 if (numprocs == 1 and os.environ.get('ANUGA_DEFAULT_COMPUTE_MODE', 'legacy').lower() == 'unified'): self.set_compute_mode('unified') else: self.set_compute_mode('legacy') #------------------------------- # C extension domain structure # Will be setup by setup_domain_openmp_ext #------------------------------- self._Domain_C_struct = None #------------------------------- # If environment variable OMP_NUM_THREADS is not set, # then set to default (1 thread). If a value is given to # the method, then it will override the default. #------------------------------ self.set_omp_num_threads(verbose=False) #------------------------------- # datetime and timezone #------------------------------- self.set_timezone() #------------------------------- # Forcing Terms # # Gravity is now incorporated in # compute_fluxes routine #------------------------------- from .friction import manning_friction_semi_implicit self.forcing_terms.append(manning_friction_semi_implicit) #------------------------------- # Stored output #------------------------------- self.set_store(True) self.set_store_centroids(True) self.set_store_vertices_uniquely(False) self.quantities_to_be_stored = {'elevation': 1, 'friction':1, 'stage': 2, 'xmomentum': 2, 'ymomentum': 2} #------------------------------- # Set up check pointing every n # yieldsteps #------------------------------- self.checkpoint = False self.yieldstep_counter = 0 self.checkpoint_step = 10 #------------------------------- # Useful auxiliary quantity # Set centroid and edge values directly from mesh coordinate arrays # without allocating vertex_values (kept lazy to save memory). # Edge formula matches _interpolate in quantity_openmp.c: # edge[0] = 0.5*(v[1]+v[2]), edge[1] = 0.5*(v[2]+v[0]), # edge[2] = 0.5*(v[0]+v[1]) #------------------------------- n = self.number_of_elements vx = self.vertex_coordinates[:, 0].reshape(n, 3) vy = self.vertex_coordinates[:, 1].reshape(n, 3) qx = self.quantities['x'] qx.centroid_values[:] = (vx[:, 0] + vx[:, 1] + vx[:, 2]) / 3.0 qx.edge_values[:, 0] = 0.5 * (vx[:, 1] + vx[:, 2]) qx.edge_values[:, 1] = 0.5 * (vx[:, 2] + vx[:, 0]) qx.edge_values[:, 2] = 0.5 * (vx[:, 0] + vx[:, 1]) qx.set_boundary_values_from_edges() qy = self.quantities['y'] qy.centroid_values[:] = (vy[:, 0] + vy[:, 1] + vy[:, 2]) / 3.0 qy.edge_values[:, 0] = 0.5 * (vy[:, 1] + vy[:, 2]) qy.edge_values[:, 1] = 0.5 * (vy[:, 2] + vy[:, 0]) qy.edge_values[:, 2] = 0.5 * (vy[:, 0] + vy[:, 1]) qy.set_boundary_values_from_edges() # For riverwalls, we need to know the 'edge_flux_type' for each edge # Edge-flux-type of 0 == Normal edge, with shallow water flux # 1 == riverwall # 2 == ? # etc # Lazy: allocated by _ensure_work_arrays() (no river walls) or by # create_riverwalls() (before evolve). Saves ~108 MB at N=2.25M. self.edge_flux_type = None # (3N,) int — 0=normal, 1=riverwall self.edge_river_wall_counter = None # (3N,) int — per-edge riverwall index self.number_of_riverwall_edges = 0 # Riverwalls -- initialise with dummy values # Presently only works with DE algorithms, will fail otherwise import anuga.structures.riverwall self.riverwallData=anuga.structures.riverwall.RiverWall(self) self.create_riverwalls = self.riverwallData.create_riverwalls ## Keep track of the fluxes through the boundaries ## Only works for DE algorithms at present max_time_substeps=3 # Maximum number of substeps supported by any timestepping method # boundary_flux_sum holds boundary fluxes on each sub-step [unused substeps = 0.] self.boundary_flux_sum=num.array([0.]*max_time_substeps) from anuga.operators.boundary_flux_integral_operator import boundary_flux_integral_operator self.boundary_flux_integral=boundary_flux_integral_operator(self) # Make an integer counting how many times we call compute_fluxes_central -- so we know which substep we are on #self.call=1 # List to store the volumes we computed before self.volume_history=[] # Work arrays actually read by the C extension — allocated lazily on the # first evolve step (via setup_Domain_C_struct → _ensure_work_arrays). self.x_centroid_work = None # (N,) scratch for velocity extrapolation self.y_centroid_work = None # (N,) scratch for velocity extrapolation # The following arrays were historically allocated but are confirmed dead # (C computation never reads them). They remain None (→ NULL in C struct) # for the entire simulation lifetime, saving ~600+ MB at N=2.25M. self.edge_flux_work = None # (9N,) — unused, NULL in C struct self.neigh_work = None # (9N,) — unused, NULL in C struct self.pressuregrad_work = None # (3N,) — unused, NULL in C struct #----------------------------------- # parameters for structures #----------------------------------- self.use_new_velocity_head = False
#------------------------------------------------ # Domain_C_struct is a cdef class with a custom __cinit__, # so Cython will not auto-generate a default pickling protocol for it; # when pickle reaches the Domain object and tries to pickle _Domain_C_struct, # you get TypeError: no default __reduce__ due to non-trivial __cinit__. # So we implement __getstate__ and __setstate__ to exclude it from pickling, # and recreate it lazily when needed. #------------------------------------------------ def __getstate__(self): state = self.__dict__.copy() # Do not pickle the C wrapper; it can be recreated state.pop('_Domain_C_struct', None) # The mode-2 ('unified') GPU interface wraps a cdef GPUDomain with a # non-trivial __cinit__ (not picklable) and holds device handles. Drop # it; _ensure_gpu_interface() rebuilds it lazily after unpickling. state.pop('gpu_interface', None) state.pop('_gpu_boundary_info_initialized', None) return state def __setstate__(self, state): self.__dict__.update(state) # Recreate C wrapper lazily when needed self._Domain_C_struct = None # Force the mode-2 device interface to be rebuilt on demand. self.gpu_interface = None def update_domain_c_struct(self): """Update the C domain structure from the Python Domain object. """ from .sw_domain_openmp_ext import update_Domain_C_struct update_Domain_C_struct(self) def _ensure_work_arrays(self): """Allocate the work arrays actually needed by the C extension. Called by ``setup_Domain_C_struct`` in the Cython extension before the C domain struct is populated. Only arrays that the C computation code actually reads are allocated here — confirmed-dead arrays remain None (passed as NULL to the C struct) for the full simulation lifetime. """ if self.x_centroid_work is not None: return # already allocated N = self.number_of_elements # ---- scratch arrays used by the C extrapolation kernel ---- self.x_centroid_work = num.zeros(N) # (N,) self.y_centroid_work = num.zeros(N) # (N,) # ---- per-element max wave speed (CFL timestep calculation) ---- self.max_speed = num.zeros(N, float) # (N,) # Note: edge_flux_type / edge_river_wall_counter stay None unless # create_riverwalls() is called; all other struct fields remain NULL. #--------------------------------------------------------------- # Plotting methods #--------------------------------------------------------------- def set_plotter(self, *args, **kwargs): """Set the plotter for this domain """ #FIXME SR: Should look into seeing if the triang can use the # triangulation from Domain rather than having two copies if self.dplotter is None: import anuga self.dplotter = anuga.Domain_plotter(self, *args, **kwargs) self.triang = self.dplotter.triang self.stage = self.dplotter.stage self.xmom = self.dplotter.xmom self.ymom = self.dplotter.ymom self.elev = self.dplotter.elev self.friction = self.dplotter.friction self.xvel = self.dplotter.xvel self.yvel = self.dplotter.yvel self.x = self.dplotter.x self.y = self.dplotter.y self.xc = self.dplotter.xc self.yc = self.dplotter.yc self.plot_mesh = self.dplotter.plot_mesh self.save_depth_frame = self.dplotter.save_depth_frame self.plot_depth_frame = self.dplotter.plot_depth_frame self.make_depth_animation = self.dplotter.make_depth_animation self.save_stage_frame = self.dplotter.save_stage_frame self.plot_stage_frame = self.dplotter.plot_stage_frame self.make_stage_animation = self.dplotter.make_stage_animation self.save_speed_frame = self.dplotter.save_speed_frame self.plot_speed_frame = self.dplotter.plot_speed_frame self.make_speed_animation = self.dplotter.make_speed_animation def triplot(self, *args, **kwargs): self.set_plotter() import matplotlib.pyplot as plt fig, ax = plt.subplots() lines = ax.triplot(self.triang, *args, **kwargs) ax.set_xlabel('Easting (m)') ax.set_ylabel('Northing (m)') return fig, ax, lines def tripcolor(self, *args, **kwargs): self.set_plotter() import matplotlib.pyplot as plt fig, ax = plt.subplots() im = ax.tripcolor(self.triang, *args, **kwargs) ax.set_xlabel('Easting (m)') ax.set_ylabel('Northing (m)') return fig, ax, im #============================================================== # Methods to set and get domain parameters # # FIXME SR: These (and other paramters) should be refactored # to save the underlying quantities in np.ndarray(s) for # efficient access in Cython #============================================================== @property def g(self) -> float: """Gravitational acceleration [m/s^2]""" return self._g @g.setter def g(self, value: float): """Set gravitational acceleration [m/s^2]""" self._g = value @property def timestep(self) -> float: """Current timestep [s]""" return self._timestep @timestep.setter def timestep(self, value: float): """Set current timestep [s]""" self._timestep = value @property def flux_timestep(self) -> float: """Current flux timestep [s]""" return self._flux_timestep @flux_timestep.setter def flux_timestep(self, value: float): """Set current flux timestep [s]""" self._flux_timestep = value #============================================================== # Set config defaults #============================================================== def _set_config_defaults(self): """Set the default values in this routine. That way we can inherit class and just redefine the defaults for the new class """ from anuga.config import minimum_storable_height from anuga.config import minimum_allowed_height, maximum_allowed_speed from anuga.config import g from anuga.config import tight_slope_limiters from anuga.config import extrapolate_velocity_second_order from anuga.config import alpha_balance from anuga.config import optimise_dry_cells from anuga.config import use_centroid_velocities from anuga.config import compute_fluxes_method from anuga.config import sloped_mannings_function from anuga.config import low_froude # Early algorithms need elevation to remain continuous self.set_using_discontinuous_elevation(False) self.set_minimum_allowed_height(minimum_allowed_height) self.maximum_allowed_speed = maximum_allowed_speed self.minimum_storable_height = minimum_storable_height self.g = g self.alpha_balance = alpha_balance self.tight_slope_limiters = tight_slope_limiters self.set_low_froude(low_froude) self.set_use_optimise_dry_cells(optimise_dry_cells) self.set_extrapolate_velocity(extrapolate_velocity_second_order) self.use_centroid_velocities = use_centroid_velocities self.set_sloped_mannings_function(sloped_mannings_function) self.set_compute_fluxes_method(compute_fluxes_method) self.set_store_centroids(False) def get_algorithm_parameters(self) -> dict: """ Get the standard parameter that are currently set (as a dictionary) """ parameters = {} parameters['minimum_allowed_height'] = self.minimum_allowed_height parameters['maximum_allowed_speed'] = self.maximum_allowed_speed parameters['minimum_storable_height'] = self.minimum_storable_height parameters['g'] = self.g parameters['alpha_balance'] = self.alpha_balance parameters['tight_slope_limiters'] = self.tight_slope_limiters parameters['optimise_dry_cells'] = self.optimise_dry_cells parameters['low_froude'] = self.low_froude parameters['use_centroid_velocities'] = self.use_centroid_velocities parameters['use_sloped_mannings'] = self.use_sloped_mannings parameters['compute_fluxes_method'] = self.get_compute_fluxes_method() parameters['flow_algorithm'] = self.get_flow_algorithm() parameters['CFL'] = self.get_cfl() parameters['timestepping_method'] = self.get_timestepping_method() parameters['extrapolate_velocity_second_order'] = self.extrapolate_velocity_second_order return parameters def print_algorithm_parameters(self) -> None: """ Print the standard parameters that are curently set (as a dictionary) """ print('#============================') print('# Domain Algorithm Parameters ') print('#============================') from pprint import pprint pprint(self.get_algorithm_parameters(),indent=4) print('#----------------------------') def _set_DE0_defaults(self): """Set up the defaults for running the flow_algorithm "DE0" A 'discontinuous elevation' method """ self._set_config_defaults() self.set_cfl(0.9) self.set_use_kinematic_viscosity(False) #self.timestepping_method='rk2'#'rk3'#'euler'#'rk2' self.set_timestepping_method('euler') self.set_using_discontinuous_elevation(True) self.set_using_centroid_averaging(True) self.set_compute_fluxes_method('DE') # Don't place any restriction on the minimum storable height #self.minimum_storable_height=-99999999999.0 self.minimum_allowed_height=1.0e-12 self.set_default_order(2) self.set_extrapolate_velocity() self.beta_w=0.5 self.beta_w_dry=0.0 self.beta_uh=0.5 self.beta_uh_dry=0.0 self.beta_vh=0.5 self.beta_vh_dry=0.0 #self.set_quantities_to_be_stored({'stage': 2, 'xmomentum': 2, # 'ymomentum': 2, 'elevation': 2, 'height':2}) #self.set_quantities_to_be_stored({'stage': 2, 'xmomentum': 2, # 'ymomentum': 2, 'elevation': 1}) self.set_store_centroids(True) self.optimise_dry_cells=False # We need the edge_coordinates for the extrapolation self.edge_coordinates=self.get_edge_midpoint_coordinates() # By default vertex values are NOT stored uniquely # for storage efficiency. We may override this (but not so important since # centroids are stored anyway # self.set_store_vertices_smoothly(False) self.maximum_allowed_speed=0.0 if self.processor == 0 and self.verbose: print('Domain: Using discontinuous elevation solver DE0') # print('##########################################################################') # print('#') # print('# Using discontinuous elevation solver DE0') # print('#') # print('# First order timestepping') # print('#') # print('# Make sure you use centroid values when reporting on important output quantities') # print('#') # print('##########################################################################') def _set_DE1_defaults(self): """Set up the defaults for running the flow_algorithm "DE1" A 'discontinuous elevation' method """ self._set_config_defaults() self.set_cfl(1.0) self.set_use_kinematic_viscosity(False) #self.timestepping_method='rk2'#'rk3'#'euler'#'rk2' self.set_timestepping_method(2) self.set_using_discontinuous_elevation(True) self.set_using_centroid_averaging(True) self.set_compute_fluxes_method('DE') # Don't place any restriction on the minimum storable height #self.minimum_storable_height=-99999999999.0 self.minimum_allowed_height=1.0e-5 self.set_default_order(2) self.set_extrapolate_velocity() self.beta_w=1.0 self.beta_w_dry=0.0 self.beta_uh=1.0 self.beta_uh_dry=0.0 self.beta_vh=1.0 self.beta_vh_dry=0.0 #self.set_quantities_to_be_stored({'stage': 2, 'xmomentum': 2, # 'ymomentum': 2, 'elevation': 2, 'height':2}) #self.set_quantities_to_be_stored({'stage': 2, 'xmomentum': 2, # 'ymomentum': 2, 'elevation': 1}) self.set_store_centroids(True) self.optimise_dry_cells=False # We need the edge_coordinates for the extrapolation self.edge_coordinates=self.get_edge_midpoint_coordinates() # By default vertex values are NOT stored uniquely # for storage efficiency. We may override this (but not so important since # centroids are stored anyway # self.set_store_vertices_smoothly(False) self.maximum_allowed_speed=0.0 if self.processor == 0 and self.verbose: print('##########################################################################') print('#') print('# Using discontinuous elevation solver DE1 ') print('#') print('# Uses rk2 timestepping') print('#') print('# Make sure you use centroid values when reporting on important output quantities') print('#') print('##########################################################################') def _set_DE_ader2_defaults(self): """Set up the defaults for running the flow_algorithm "DE_ader2" DE1 settings with ADER-2 (Cauchy-Kovalewski) timestepping. """ self._set_config_defaults() self.set_cfl(1.0) self.set_use_kinematic_viscosity(False) self.set_timestepping_method('ader2') self.set_using_discontinuous_elevation(True) self.set_using_centroid_averaging(True) self.set_compute_fluxes_method('DE') self.minimum_allowed_height = 1.0e-5 self.set_default_order(2) self.set_extrapolate_velocity() self.beta_w = 0.5 self.beta_w_dry = 0.0 self.beta_uh = 0.5 self.beta_uh_dry = 0.0 self.beta_vh = 0.5 self.beta_vh_dry = 0.0 self.set_store_centroids(True) self.optimise_dry_cells = False self.edge_coordinates = self.get_edge_midpoint_coordinates() self.maximum_allowed_speed = 0.0 if self.processor == 0 and self.verbose: print('##########################################################################') print('#') print('# Using discontinuous elevation solver DE_ader2') print('#') print('# Uses ADER-2 (Cauchy-Kovalewski) timestepping') print('#') print('# Make sure you use centroid values when reporting on important output quantities') print('#') print('##########################################################################') def _set_DE2_defaults(self): """Set up the defaults for running the flow_algorithm "DE2" A 'discontinuous elevation' method """ self._set_config_defaults() self.set_cfl(1.0) self.set_use_kinematic_viscosity(False) #self.timestepping_method='rk2'#'rk3'#'euler'#'rk2' self.set_timestepping_method(3) self.set_using_discontinuous_elevation(True) self.set_using_centroid_averaging(True) self.set_compute_fluxes_method('DE') # Don't place any restriction on the minimum storable height #self.minimum_storable_height=-99999999999.0 self.minimum_allowed_height=1.0e-5 self.set_default_order(2) self.set_extrapolate_velocity() self.beta_w=1.0 self.beta_w_dry=0.0 self.beta_uh=1.0 self.beta_uh_dry=0.0 self.beta_vh=1.0 self.beta_vh_dry=0.0 #self.set_quantities_to_be_stored({'stage': 2, 'xmomentum': 2, # 'ymomentum': 2, 'elevation': 2, 'height':2}) #self.set_quantities_to_be_stored({'stage': 2, 'xmomentum': 2, # 'ymomentum': 2, 'elevation': 1}) self.set_store_centroids(True) self.optimise_dry_cells=False # We need the edge_coordinates for the extrapolation self.edge_coordinates=self.get_edge_midpoint_coordinates() # By default vertex values are NOT stored uniquely # for storage efficiency. We may override this (but not so important since # centroids are stored anyway # self.set_store_vertices_smoothly(False) self.maximum_allowed_speed=0.0 if self.processor == 0 and self.verbose: print('##########################################################################') print('#') print('# Using discontinuous elevation solver DE2') print('#') print('# Using rk3 timestepping') print('#') print('# Make sure you use centroid values when reporting on important output quantities') print('#') print('##########################################################################') def _set_DE1_7_defaults(self): """Set up the defaults for running the flow_algorithm "DE0_7" A 'discontinuous elevation' method """ self._set_config_defaults() self.set_cfl(1.0) self.set_use_kinematic_viscosity(False) #self.timestepping_method='rk2'#'rk3'#'euler'#'rk2' self.set_timestepping_method(2) self.set_using_discontinuous_elevation(True) self.set_using_centroid_averaging(True) self.set_compute_fluxes_method('DE') # Don't place any restriction on the minimum storable height #self.minimum_storable_height=-99999999999.0 self.minimum_allowed_height=1.0e-12 self.set_default_order(2) self.set_extrapolate_velocity() self.beta_w=0.75 self.beta_w_dry=0.1 self.beta_uh=0.75 self.beta_uh_dry=0.1 self.beta_vh=0.75 self.beta_vh_dry=0.1 #self.set_quantities_to_be_stored({'stage': 2, 'xmomentum': 2, # 'ymomentum': 2, 'elevation': 2, 'height':2}) #self.set_quantities_to_be_stored({'stage': 2, 'xmomentum': 2, # 'ymomentum': 2, 'elevation': 1}) self.set_store_centroids(True) self.optimise_dry_cells=False # We need the edge_coordinates for the extrapolation self.edge_coordinates=self.get_edge_midpoint_coordinates() # By default vertex values are NOT stored uniquely # for storage efficiency. We may override this (but not so important since # centroids are stored anyway # self.set_store_vertices_smoothly(False) self.maximum_allowed_speed=0.0 if self.processor == 0 and self.verbose: print('##########################################################################') print('#') print('# Using discontinuous elevation solver DE1_7 ') print('#') print('# A slightly more diffusive version of DE1, does use rk2 timestepping') print('#') print('# Make sure you use centroid values when reporting on important output quantities') print('#') print('##########################################################################') def _set_DE0_7_defaults(self): """Set up the defaults for running the flow_algorithm "DE3" A 'discontinuous elevation' method """ self._set_config_defaults() self.set_cfl(0.9) self.set_use_kinematic_viscosity(False) #self.timestepping_method='rk2'#'rk3'#'euler'#'rk2' self.set_timestepping_method(1) self.set_using_discontinuous_elevation(True) self.set_using_centroid_averaging(True) self.set_compute_fluxes_method('DE') # Don't place any restriction on the minimum storable height #self.minimum_storable_height=-99999999999.0 self.minimum_allowed_height=1.0e-12 self.set_default_order(2) self.set_extrapolate_velocity() self.beta_w=0.7 self.beta_w_dry=0.1 self.beta_uh=0.7 self.beta_uh_dry=0.1 self.beta_vh=0.7 self.beta_vh_dry=0.1 #self.set_quantities_to_be_stored({'stage': 2, 'xmomentum': 2, # 'ymomentum': 2, 'elevation': 2, 'height':2}) #self.set_quantities_to_be_stored({'stage': 2, 'xmomentum': 2, # 'ymomentum': 2, 'elevation': 1}) self.set_store_centroids(True) self.optimise_dry_cells=False # We need the edge_coordinates for the extrapolation self.edge_coordinates=self.get_edge_midpoint_coordinates() # By default vertex values are NOT stored uniquely # for storage efficiency. We may override this (but not so important since # centroids are stored anyway # self.set_store_vertices_smoothly(False) self.maximum_allowed_speed=0.0 if self.processor == 0 and self.verbose: print('Domain: Using discontinuous elevation solver DE0_7') # print('##########################################################################') # print('#') # print('# Using discontinuous elevation solver DE0_7') # print('#') # print('# A slightly less diffusive version than DE0, uses euler timestepping') # print('#') # print('# Make sure you use centroid values when reporting on important output quantities') # print('#') # print('##########################################################################') def update_special_conditions(self): my_update_special_conditions(self) # Note Padarn 06/12/12: The following line decorates # the set_quantity function to be profiled individually. # Need to uncomment the decorator at top of file. #@profileit("set_quantity.profile")
[docs] def set_quantity(self, name: str, *args, **kwargs) -> None: """Set values for named quantity We have to do something special for 'elevation' otherwise pass through to generic set_quantity """ # if name == 'elevation': # stage_c = self.get_quantity('stage').centroid_values # elev_c = self.get_quantity('elevation').centroid_values # height_c = stage_c - elev_c # Generic_Domain.set_quantity(self, name, *args, **kwargs) # stage_c[:] = elev_c + height_c # else: # Generic_Domain.set_quantity(self, name, *args, **kwargs) Generic_Domain.set_quantity(self, name, *args, **kwargs)
def set_timezone(self, tz: str | ZoneInfoType | None = None) -> None: """Set timezone for domain :param tz: either a timezone object or string We recommend using the ZoneInfo provided by zoneinfo. Default is ZoneInfo('UTC') Example: Set default timezone UTC >>> domain.set_timezone() Example: Set timezone using tsdata string >>> domain.set_timezone('Australia/Syndey') Example: Set timezone using ZoneInfo timezone >>> from zoneinfo import ZoneInfo >>> new_tz = ZoneInfo('Australia/Sydney') >>> domain.set_timezone(new_tz) """ try: from zoneinfo import ZoneInfo except ImportError: from backports.zoneinfo import ZoneInfo if tz is None: new_tz = ZoneInfo('UTC') elif isinstance(tz,str): new_tz = ZoneInfo(tz) elif isinstance(tz, ZoneInfo): new_tz = tz else: msg = "Unknown timezone %s" % tz raise Exception(msg) self.timezone = new_tz def get_timezone(self) -> ZoneInfoType: """Retrieve current domain timezone""" return self.timezone def get_datetime(self, timestamp: float | None = None) -> DateTime: """Retrieve datetime corresponding to current timestamp wrt to domain timezone param: timestamp: return datetime corresponding to given timestamp""" from datetime import datetime try: from datetime import UTC except ImportError: from datetime import timezone UTC = timezone.utc if timestamp is None: timestamp = self.get_time() #utc_datetime = datetime.utcfromtimestamp(timestamp).replace(tzinfo=ZoneInfo('UTC')) utc_datetime = datetime.fromtimestamp(timestamp,UTC) current_dt = utc_datetime.astimezone(self.timezone) return current_dt def set_starttime(self, timestamp: float | DateTime = 0.0) -> None: """Set the starttime for the evolution :param timestamp: Either a float or a datetime object Essentially we use unix time as our absolute time. So time = 0 corresponds to Jan 1st 1970 UTC Use naive datetime which will be localized to the domain timezone or or use zoneinfo.ZoneInfo to set the timezone of datetime. Don't use the tzinfo argument of datetime to set timezone as this does not work! Example: Without setting timezone for the `domain` and the `starttime` then time calculations are all based on UTC. Note the timestamp, which is time in seconds from 1st Jan 1970 UTC. >>> import anuga >>> from zoneinfo import ZoneInfo >>> from datetime import datetime >>> >>> domain = anuga.rectangular_cross_domain(10,10) >>> dt = datetime(2021,3,21,18,30) >>> domain.set_starttime(dt) >>> print(domain.get_datetime(), 'TZ', domain.get_timezone(), 'Timestamp: ', domain.get_time()) 2021-03-21 18:30:00+00:00 TZ UTC Timestamp: 1616351400.0 Example: Setting timezone for the `domain`, then naive `datetime` will be localizes to the `domain` timezone. Note the timestamp, which is time in seconds from 1st Jan 1970 UTC. >>> import anuga >>> from zoneinfo import ZoneInfo >>> from datetime import datetime >>> >>> domain = anuga.rectangular_cross_domain(10,10) >>> AEST = ZoneInfo('Australia/Sydney') >>> domain.set_timezone(AEST) >>> >>> dt = datetime(2021,3,21,18,30) >>> domain.set_starttime(dt) >>> print(domain.get_datetime(), 'TZ', domain.get_timezone(), 'Timestamp: ', domain.get_time()) 2021-03-21 18:30:00+11:00 TZ Australia/Sydney Timestamp: 1616311800.0 Example: Setting timezone for the `domain`, and setting the timezone for the `datetime`. Note the timestamp, which is time in seconds from 1st Jan 1970 UTC is the same as the previous example. >>> import anuga >>> from zoneinfo import ZoneInfo >>> from datetime import datetime >>> >>> domain = anuga.rectangular_cross_domain(10,10) >>> >>> ACST = ZoneInfo('Australia/Adelaide') >>> domain.set_timezone(ACST) >>> >>> AEST = ZoneInfo('Australia/Sydney') >>> dt = datetime(2021,3,21,18,30, tzinfo=AEST) >>> >>> domain.set_starttime(dt) >>> print(domain.get_datetime(), 'TZ', domain.get_timezone(), 'Timestamp: ', domain.get_time()) 2021-03-21 18:00:00+10:30 TZ Australia/Adelaide Timestamp: 1616311800.0 """ from datetime import datetime if self.evolved_called: msg = ('Can\'t change simulation start time once evolve has ' 'been called') raise Exception(msg) if isinstance(timestamp, datetime): if timestamp.tzinfo is None: dt = timestamp.replace(tzinfo=self.timezone) time = dt.timestamp() else: time = timestamp.timestamp() else: time = float(timestamp) self.starttime = time # starttime is now the origin for relative_time self.set_relative_time(0.0) def get_starttime(self, datetime: bool = False) -> float | DateTime: """return starttime, either as timestamp, or as a datetime""" starttime = self.starttime if not datetime: return starttime else: return self.get_datetime(starttime) def set_store(self, flag: bool = True) -> None: """Set whether data saved to sww file. """ self.store = flag def get_store(self) -> bool: """Get whether data saved to sww file. """ return self.store def set_store_centroids(self, flag: bool = True) -> None: """Set whether centroid data is saved to sww file. """ self.store_centroids = flag def get_store_centroids(self) -> bool: """Get whether data saved to sww file. """ return self.store_centroids def set_checkpointing(self, checkpoint: bool = True, checkpoint_dir: str = 'CHECKPOINTS', checkpoint_step: int = 10, checkpoint_time: float | None = None) -> None: """Set up checkpointing. param checkpoint: Default = True. Set to False will turn off checkpointing param checkpoint_dir: Where to store checkpointing files param checkpoint_step: Save checkpoint files after this many yieldsteps param checkpoint_time: If set, over-rides checkpoint_step. save checkpoint files after this amount of walltime """ if checkpoint: from anuga import myid # On processor 0 create checkpoint directory if necessary if myid == 0: if True: if not os.path.exists(checkpoint_dir): os.mkdir(checkpoint_dir) assert os.path.exists(checkpoint_dir) self.checkpoint_dir = checkpoint_dir if checkpoint_time is not None: #import time self.walltime_prev = time.time() self.checkpoint_time = checkpoint_time self.checkpoint_step = 0 else: self.checkpoint_step = checkpoint_step self.checkpoint = True #print(self.checkpoint_dir, self.checkpoint_step) else: self.checkpoint = False def set_sloped_mannings_function(self, flag: bool = True) -> None: """Set mannings friction function to use the sloped wetted area. The flag is tested in the python wrapper mannings_friction_implicit """ if flag: self.use_sloped_mannings = True else: self.use_sloped_mannings = False def set_compute_fluxes_method(self, flag: str = 'original') -> None: """Set method for computing fluxes. Currently original wb_1 wb_2 wb_3 tsunami DE """ compute_fluxes_methods = ['original', 'wb_1', 'wb_2', 'wb_3', 'tsunami', 'DE'] if flag in compute_fluxes_methods: self.compute_fluxes_method = flag else: msg = 'Unknown compute_fluxes_method. \nPossible choices are:\n'+ \ ', '.join(compute_fluxes_methods)+'.' raise Exception(msg) def get_compute_fluxes_method(self) -> str: """Get method for computing fluxes. See set_compute_fluxes_method for possible choices. """ return self.compute_fluxes_method
[docs] def set_flow_algorithm(self, algorithm: str = 'DE0') -> None: """Set combination of slope limiting and time stepping Currently DE0 DE1 DE2 DE0_7 DE1_7 """ algorithm = str(algorithm) # Replace any dots with dashes algorithm = algorithm.replace('.', '_') flow_algorithms = ['DE0', 'DE1', 'DE2', \ 'DE0_7', 'DE1_7', 'DE_ader2'] if algorithm in flow_algorithms: self.flow_algorithm = algorithm else: msg = 'Unknown flow_algorithm. \nPossible choices are:\n'+ \ ', '.join(flow_algorithms)+'.' raise Exception(msg) if self.flow_algorithm == 'DE0': self._set_DE0_defaults() if self.flow_algorithm == 'DE1': self._set_DE1_defaults() if self.flow_algorithm == 'DE2': self._set_DE2_defaults() if self.flow_algorithm == 'DE0_7': self._set_DE0_7_defaults() if self.flow_algorithm == 'DE1_7': self._set_DE1_7_defaults() if self.flow_algorithm == 'DE_ader2': self._set_DE_ader2_defaults()
[docs] def get_flow_algorithm(self) -> str: """ Get method used for timestepping and spatial discretisation """ return self.flow_algorithm
# def set_gravity_method(self): # """Gravity method is determined by the compute_fluxes_method # This is now not used, as gravity is combine in the compute_fluxes method # """ # if self.get_compute_fluxes_method() == 'original': # self.forcing_terms[0] = gravity # elif self.get_compute_fluxes_method() == 'wb_1': # self.forcing_terms[0] = gravity_wb # elif self.get_compute_fluxes_method() == 'wb_2': # self.forcing_terms[0] = gravity # else: # raise Exception('undefined compute_fluxes method') def set_extrapolate_velocity(self, flag: bool = True) -> None: """ Extrapolation routine uses momentum by default, can change to velocity extrapolation which seems to work better. """ if flag is True: self.extrapolate_velocity_second_order = True elif flag is False: self.extrapolate_velocity_second_order = False
[docs] def set_low_froude(self, low_froude: int = 0) -> None: """ For low Froude problems the standard flux calculations can lead to excessive damping. Set low_froude to 1 or 2 for flux calculations which minimize the damping in this case. """ assert low_froude in [LOW_FROUDE_OFF, LOW_FROUDE_1, LOW_FROUDE_2] self.low_froude = low_froude
def set_use_optimise_dry_cells(self, flag: bool = True) -> None: """ Try to optimize calculations where region is dry """ if flag is True: self.optimise_dry_cells = int(True) elif flag is False: self.optimise_dry_cells = int(False) def set_use_kinematic_viscosity(self, flag: bool = True) -> None: from anuga.operators.kinematic_viscosity_operator import Kinematic_viscosity_operator if flag : # Create Operator if necessary if self.kv_operator is None: self.kv_operator = Kinematic_viscosity_operator(self) else: if self.kv_operator is None: return else: # Remove operator from fractional_step_operators self.fractional_step_operators.remove(self.kv_operator) self.kv_operator = None
[docs] def set_collect_max_quantities(self, update_frequency=1, collection_start_time=0., velocity_zero_height=None, store_to_sww=True) -> Collect_max_quantities_operator: """Create (or return existing) Collect_max_quantities_operator on this domain. Tracks running maxima of stage, depth, speed, and momentum magnitude (||(uh, vh)||) over the simulation. Call once before domain.evolve(). Parameters ---------- update_frequency : int Update maxima every this many timesteps (default 1). collection_start_time : float Only collect after this simulation time (default 0). velocity_zero_height : float or None Zero velocity below this depth; defaults to minimum_allowed_height. store_to_sww : bool If True (default), write running maxima to the SWW file every yield step as centroid quantities max_stage_c, max_depth_c, max_speed_c, max_uh_c. Returns ------- Collect_max_quantities_operator """ from anuga.operators.collect_max_quantities_operator import \ Collect_max_quantities_operator if self.max_quantities_operator is None: self.max_quantities_operator = Collect_max_quantities_operator( self, update_frequency=update_frequency, collection_start_time=collection_start_time, velocity_zero_height=velocity_zero_height, store_to_sww=store_to_sww, ) return self.max_quantities_operator
def set_beta(self, beta: float) -> None: """Shorthand to assign one constant value [0,2] to all limiters. 0 Corresponds to first order, where as larger values make use of the second order scheme. """ self.beta_w = beta self.beta_w_dry = beta self.quantities['stage'].beta = beta self.beta_uh = beta self.beta_uh_dry = beta self.quantities['xmomentum'].beta = beta self.beta_vh = beta self.beta_vh_dry = beta self.quantities['ymomentum'].beta = beta def set_betas(self, beta_w: float, beta_w_dry: float, beta_uh: float, beta_uh_dry: float, beta_vh: float, beta_vh_dry: float) -> None: """Assign beta values in the range [0,2] to all limiters. 0 Corresponds to first order, where as larger values make use of the second order scheme. """ self.beta_w = beta_w self.beta_w_dry = beta_w_dry self.quantities['stage'].beta = beta_w self.beta_uh = beta_uh self.beta_uh_dry = beta_uh_dry self.quantities['xmomentum'].beta = beta_uh self.beta_vh = beta_vh self.beta_vh_dry = beta_vh_dry self.quantities['ymomentum'].beta = beta_vh def set_store_vertices_uniquely(self, flag: bool = True, reduction: Callable | None = None) -> None: """Decide whether vertex values should be stored uniquely as computed in the model (True) or whether they should be reduced to one value per vertex using self.reduction (False). """ # FIXME (Ole): how about using the word "continuous vertex values" or # "continuous stage surface" self.smooth = not flag # Reduction operation for get_vertex_values if reduction is None: self.reduction = mean #self.reduction = min #Looks better near steep slopes def set_store_vertices_smoothly(self, flag: bool = True, reduction: Callable | None = None) -> None: """Decide whether vertex values should be stored smoothly (one value per vertex) or uniquely as computed in the model (False). """ # FIXME (Ole): how about using the word "continuous vertex values" or # "continuous stage surface" self.smooth = flag # Reduction operation for get_vertex_values if reduction is None: self.reduction = mean #self.reduction = min #Looks better near steep slopes def set_minimum_storable_height(self, minimum_storable_height: float) -> None: """Set the minimum depth that will be written to an SWW file. minimum_storable_height minimum allowed SWW depth is in meters This is useful for removing thin water layers that seems to be caused by friction creep. """ self.minimum_storable_height = minimum_storable_height def get_minimum_storable_height(self) -> float: return self.minimum_storable_height def set_minimum_allowed_height(self, minimum_allowed_height: float) -> None: """Set minimum depth that will be recognised in the numerical scheme. minimum_allowed_height minimum allowed depth in meters The parameter H0 (Minimal height for flux computation) is also set by this function. """ #FIXME (Ole): rename H0 to minimum_allowed_height_in_flux_computation #FIXME (Ole): Maybe use histogram to identify isolated extreme speeds #and deal with them adaptively similarly to how we used to use 1 order #steps to recover. self.minimum_allowed_height = minimum_allowed_height self.H0 = minimum_allowed_height def get_minimum_allowed_height(self) -> float: return self.minimum_allowed_height def set_maximum_allowed_speed(self, maximum_allowed_speed: float) -> None: """Set the maximum particle speed that is allowed in water shallower than minimum_allowed_height. maximum_allowed_speed This is useful for controlling speeds in very thin layers of water and at the same time allow some movement avoiding pooling of water. """ self.maximum_allowed_speed = maximum_allowed_speed def set_points_file_block_line_size(self, points_file_block_line_size: int) -> None: """ """ self.points_file_block_line_size = points_file_block_line_size # FIXME: Probably obsolete in its curren form def set_quantities_to_be_stored(self, q: dict[str, int] | list[str] | None) -> None: """Specify which quantities will be stored in the SWW file. q must be either: - a dictionary with quantity names - a list of quantity names (for backwards compatibility) - None The format of the dictionary is as follows quantity_name: flag where flag must be either 1 or 2. If flag is 1, the quantity is considered static and will be stored once at the beginning of the simulation in a 1D array. If flag is 2, the quantity is considered time dependent and it will be stored at each yieldstep by appending it to the appropriate 2D array in the sww file. If q is None, storage will be switched off altogether. Once the simulation has started and thw sww file opened, this function will have no effect. The format, where q is a list of names is for backwards compatibility only. It will take the specified quantities to be time dependent and assume 'elevation' to be static regardless. """ if q is None: self.quantities_to_be_stored = {} self.store = False return # Check correctness for quantity_name in q: msg = ('Quantity %s is not a valid conserved quantity' % quantity_name) assert quantity_name in self.quantities, msg assert isinstance(q, dict) self.quantities_to_be_stored = q def get_wet_elements(self, indices: list[int] | num.ndarray | None = None, minimum_height: float | None = None) -> num.ndarray: """Return indices for elements where h > minimum_allowed_height Optional argument: indices is the set of element ids that the operation applies to. Usage: indices = get_wet_elements() Note, centroid values are used for this operation """ # Water depth below which it is considered to be 0 in the model # FIXME (Ole): Allow this to be specified as a keyword argument as well from anuga.config import minimum_allowed_height if minimum_height is None: minimum_height = minimum_allowed_height elevation = self.get_quantity('elevation').\ get_values(location='centroids', indices=indices) stage = self.get_quantity('stage').\ get_values(location='centroids', indices=indices) depth = stage - elevation # Select indices for which depth > 0 wet_indices = num.compress(depth > minimum_height, num.arange(len(depth))) return wet_indices def load_balance_statistics(self, minimum_height: float | None = None) -> dict: """Return load balance statistics for this domain (single-rank version). For a parallel domain use :meth:`Parallel_domain.load_balance_statistics` which gathers across all MPI ranks via Allgather. This serial version returns a dict with length-1 arrays so the interface is identical. Parameters ---------- minimum_height : float, optional Depth threshold for "wet" classification. Defaults to ``anuga.config.minimum_allowed_height``. Returns ------- dict Keys and shapes are the same as the parallel version:: n_full int[1] total triangle count n_ghost int[1] 0 (no ghost triangles in serial) n_wet_full int[1] wet triangle count wet_fraction float[1] n_wet_full / n_full ghost_fraction float[1] 0.0 wall_time float[1] total wall time since evolve() started comm_time float[1] 0.0 reduce_wait_time float[1] 0.0 compute_time float[1] same as wall_time imbalance_ratio float 1.0 wet_compute_corr float nan """ from time import time as walltime from anuga.config import minimum_allowed_height as default_mah if minimum_height is None: minimum_height = default_mah n_full = self.get_number_of_triangles() stage_c = self.get_quantity('stage').centroid_values elev_c = self.get_quantity('elevation').centroid_values n_wet = int(num.sum((stage_c - elev_c) > minimum_height)) w_time = walltime() - self.evolve_start_walltime return { 'n_full': num.array([n_full], dtype=int), 'n_ghost': num.array([0], dtype=int), 'n_wet_full': num.array([n_wet], dtype=int), 'wet_fraction': num.array([n_wet / n_full if n_full > 0 else 0.0]), 'ghost_fraction': num.array([0.0]), 'wall_time': num.array([w_time]), 'comm_time': num.array([0.0]), 'reduce_wait_time': num.array([0.0]), 'compute_time': num.array([w_time]), 'imbalance_ratio': 1.0, 'wet_compute_corr': float('nan'), } def print_load_balance_statistics(self, minimum_height: float | None = None) -> None: """Print a load balance summary to stdout. For a single-process domain this just reports wet fraction and triangle count. The parallel override prints a per-rank table. Parameters ---------- minimum_height : float, optional Passed through to :meth:`load_balance_statistics`. """ stats = self.load_balance_statistics(minimum_height=minimum_height) n = stats['n_full'][0] n_wet = stats['n_wet_full'][0] wf = stats['wet_fraction'][0] w_time = stats['wall_time'][0] print(f"Load balance: triangles={n}, wet={n_wet} ({100*wf:.1f}%), " f"wall_time={w_time:.3f}s") def get_maximum_inundation_elevation(self, indices: list[int] | num.ndarray | None = None, minimum_height: float | None = None) -> float: """Return highest elevation where h > 0 Optional argument: indices is the set of element ids that the operation applies to. minimum_height for testing h > minimum_height Usage: q = get_maximum_inundation_elevation() Note, centroid values are used for this operation """ wet_elements = self.get_wet_elements(indices, minimum_height) return self.get_quantity('elevation').\ get_maximum_value(indices=wet_elements) def get_maximum_inundation_location(self, indices: list[int] | num.ndarray | None = None) -> tuple[float, float]: """Return location of highest elevation where h > 0 Optional argument: indices is the set of element ids that the operation applies to. Usage: q = get_maximum_inundation_location() Note, centroid values are used for this operation """ wet_elements = self.get_wet_elements(indices) return self.get_quantity('elevation').\ get_maximum_location(indices=wet_elements) def get_global_wet_element_count(self, indices: list[int] | num.ndarray | None = None, minimum_height: float | None = None) -> int: """Return total number of wet elements across all MPI ranks. Optional arguments: indices: set of element ids that the operation applies to minimum_height: threshold for considering an element wet Usage: count = get_global_wet_element_count() Note: This performs MPI reduction, so all ranks get the same result. """ from anuga import numprocs wet_indices = self.get_wet_elements(indices, minimum_height) local_count = len(wet_indices) if numprocs == 1: return local_count from mpi4py import MPI global_count = MPI.COMM_WORLD.allreduce(local_count, op=MPI.SUM) return global_count def get_global_max_stage(self, indices: list[int] | num.ndarray | None = None) -> float: """Return maximum stage value across all MPI ranks. Optional argument: indices: set of element ids that the operation applies to Usage: max_stage = get_global_max_stage() Note: This performs MPI reduction, so all ranks get the same result. """ from anuga import numprocs stage = self.get_quantity('stage') local_max = stage.get_maximum_value(indices) if numprocs == 1: return local_max from mpi4py import MPI global_max = MPI.COMM_WORLD.allreduce(local_max, op=MPI.MAX) return global_max def get_global_max_speed(self) -> float: """Return maximum speed across all MPI ranks. Usage: max_speed = get_global_max_speed() Note: This performs MPI reduction, so all ranks get the same result. """ from anuga import numprocs import numpy as num if self.max_speed is None: return 0.0 local_max = num.max(self.max_speed) if numprocs == 1: return local_max from mpi4py import MPI global_max = MPI.COMM_WORLD.allreduce(local_max, op=MPI.MAX) return global_max def get_water_volume(self) -> float: from anuga import numprocs # GPU path: compute volume directly on GPU (avoids expensive D2H sync) if self.multiprocessor_mode == MULTIPROCESSOR_GPU and self.gpu_interface is not None: from anuga.shallow_water.sw_domain_gpu_ext import compute_water_volume_gpu volume = compute_water_volume_gpu(self.gpu_interface.gpu_dom) elif not self.evolved_called: Stage = self.quantities['stage'] Elev = self.quantities['elevation'] h_c = Stage.centroid_values - Elev.centroid_values from anuga import Quantity Height = Quantity(self) Height.set_values(h_c, location='centroids') volume = Height.get_integral() elif self.get_using_discontinuous_elevation(): Height = self.quantities['height'] volume = Height.get_integral() else: Stage = self.quantities['stage'] Elev = self.quantities['elevation'] Height = Stage-Elev volume = Height.get_integral() if numprocs == 1: self.volume_history.append(volume) return volume # Use MPI_Allreduce instead of manual gather-broadcast from mpi4py import MPI water_volume = MPI.COMM_WORLD.allreduce(volume, op=MPI.SUM) self.volume_history.append(water_volume) return water_volume def get_boundary_flux_integral(self) -> float: """Compute the boundary flux integral. Should work in parallel """ from anuga import numprocs if not self.compute_fluxes_method=='DE': msg='Boundary flux integral only supported for DE fluxes '+\ '(because computation of boundary_flux_sum is only implemented there)' raise Exception(msg) flux_integral = self.boundary_flux_integral.boundary_flux_integral[0] if numprocs == 1: return flux_integral # Use MPI_Allreduce instead of manual gather-broadcast from mpi4py import MPI flux_integral = MPI.COMM_WORLD.allreduce(flux_integral, op=MPI.SUM) return flux_integral def get_fractional_step_volume_integral(self) -> float: """Compute the integrated flows from fractional steps. This requires that the fractional step operators update the fractional_step_volume_integral. Should work in parallel """ from anuga import numprocs flux_integral = self.fractional_step_volume_integral if numprocs == 1: return flux_integral # Use MPI_Allreduce instead of manual gather-broadcast from mpi4py import MPI flux_integral = MPI.COMM_WORLD.allreduce(flux_integral, op=MPI.SUM) return flux_integral def get_flow_through_cross_section(self, polyline: ArrayLike, verbose: bool = False) -> float: """Get the total flow through an arbitrary poly line. This is a run-time equivalent of the function with same name in sww_interrogate.py Input: polyline: Representation of desired cross section - it may contain multiple sections allowing for complex shapes. Assume absolute UTM coordinates. Format [[x0, y0], [x1, y1], ...] Output: Q: Total flow [m^3/s] across given segments. """ cross_section = Cross_section(self, polyline, verbose) return cross_section.get_flow_through_cross_section() def get_energy_through_cross_section(self, polyline: ArrayLike, kind: str = 'total', verbose: bool = False) -> float: """Obtain average energy head [m] across specified cross section. Inputs: polyline: Representation of desired cross section - it may contain multiple sections allowing for complex shapes. Assume absolute UTM coordinates. Format [[x0, y0], [x1, y1], ...] kind: Select which energy to compute. Options are 'specific' and 'total' (default) Output: E: Average energy [m] across given segments for all stored times. The average velocity is computed for each triangle intersected by the polyline and averaged weighted by segment lengths. The typical usage of this function would be to get average energy of flow in a channel, and the polyline would then be a cross section perpendicular to the flow. #FIXME (Ole) - need name for this energy reflecting that its dimension is [m]. """ cross_section = Cross_section(self, polyline, verbose) return cross_section.get_energy_through_cross_section(kind) def check_integrity(self) -> None: """ Run integrity checks on shallow water domain. """ Generic_Domain.check_integrity(self) #Check that we are solving the shallow water wave equation msg = 'First conserved quantity must be "stage"' assert self.conserved_quantities[0] == 'stage', msg msg = 'Second conserved quantity must be "xmomentum"' assert self.conserved_quantities[1] == 'xmomentum', msg msg = 'Third conserved quantity must be "ymomentum"' assert self.conserved_quantities[2] == 'ymomentum', msg #@profile def compute_fluxes(self): """Compute fluxes and timestep suitable for all volumes in domain. Compute total flux for each conserved quantity using "flux_function" Fluxes across each edge are scaled by edgelengths and summed up Resulting flux is then scaled by area and stored in explicit_update for each of the three conserved quantities stage, xmomentum and ymomentum The maximal allowable speed computed by the flux_function for each volume is converted to a timestep that must not be exceeded. The minimum of those is computed as the next overall timestep. Post conditions: domain.explicit_update is reset to computed flux values domain.flux_timestep is set to the largest step satisfying all volumes. This wrapper calls the underlying C version of compute fluxes """ # Using Gareth Davies discontinuous elevation scheme # Flux calculation and gravity incorporated in same # procedure nvtxRangePush("compute_fluxes") self._ensure_gpu_interface() # Choose the correct extension module if self.multiprocessor_mode == MULTIPROCESSOR_OPENMP: from .sw_domain_openmp_ext import compute_fluxes_ext_central elif self.multiprocessor_mode == MULTIPROCESSOR_GPU: # change over to cuda routines as developed # from .sw_domain_simd_ext import compute_fluxes_ext_central # FIXME SR: 2023_10_16 currently compute_fluxes and distribute together # is producing incorrect results, but work separately! compute_fluxes_ext_central = self.gpu_interface.compute_fluxes_ext_central_kernel else: raise Exception('Not implemented') timestep = self.evolve_max_timestep self.flux_timestep = compute_fluxes_ext_central(self, timestep) nvtxRangePop() def update_boundary(self): """Go through list of boundary objects and update boundary values for all conserved quantities on boundary. It is assumed that the ordering of conserved quantities is consistent between the domain and the boundary object, i.e. the jth element of vector q must correspond to the jth conserved quantity in domain. """ nvtxRangePush('update_boundary') # GPU mode - use GPU boundary functions if all boundaries are GPU-supported if self.multiprocessor_mode == MULTIPROCESSOR_GPU and self.gpu_interface is not None: from anuga.shallow_water.sw_domain_gpu_ext import ( evaluate_reflective_boundary_gpu, evaluate_dirichlet_boundary_gpu, evaluate_transmissive_boundary_gpu, set_transmissive_n_zero_t_stage, evaluate_transmissive_n_zero_t_boundary_gpu, set_time_boundary_values, evaluate_time_boundary_gpu, set_file_boundary_values_from_domain, evaluate_file_boundary_gpu, set_absorbing_wave_value, evaluate_absorbing_wave_boundary_gpu, set_characteristic_wave_value, evaluate_characteristic_wave_boundary_gpu, set_flather_value, evaluate_flather_boundary_gpu, boundary_edge_sync, sync_boundary_values, init_boundary_edge_sync, ) import numpy as np gpu_dom = self.gpu_interface.gpu_dom # Lazily initialize GPU boundary info GPU_BOUNDARY_TYPES = {'Reflective_boundary', 'Dirichlet_boundary', 'Transmissive_boundary', 'Transmissive_n_momentum_zero_t_momentum_set_stage_boundary', 'Time_boundary', 'File_boundary', 'Field_boundary', 'Absorbing_wave_boundary', 'Characteristic_wave_boundary', 'Flather_external_stage_zero_velocity_boundary'} if not hasattr(self, '_gpu_boundary_info_initialized'): self._gpu_cpu_tags = [] self._gpu_all_on_gpu = True self._gpu_transmissive_n_zero_t_boundaries = [] self._gpu_time_boundaries = [] self._gpu_absorbing_wave_boundaries = [] self._gpu_characteristic_wave_boundaries = [] self._gpu_flather_boundaries = [] for tag, B in self.boundary_map.items(): if B is not None: btype = B.__class__.__name__ if btype not in GPU_BOUNDARY_TYPES: self._gpu_cpu_tags.append(tag) self._gpu_all_on_gpu = False elif btype == 'Transmissive_n_momentum_zero_t_momentum_set_stage_boundary': self._gpu_transmissive_n_zero_t_boundaries.append(B) elif btype == 'Time_boundary': self._gpu_time_boundaries.append(B) elif btype == 'Absorbing_wave_boundary': self._gpu_absorbing_wave_boundaries.append(B) elif btype == 'Characteristic_wave_boundary': self._gpu_characteristic_wave_boundaries.append(B) elif btype == 'Flather_external_stage_zero_velocity_boundary': self._gpu_flather_boundaries.append(B) # Set up boundary edge sync if we have ANY CPU-evaluated boundaries if not self._gpu_all_on_gpu: boundary_cell_ids = np.unique(self.boundary_cells).astype(np.intc) init_boundary_edge_sync(gpu_dom, boundary_cell_ids) self._gpu_boundary_info_initialized = True if self._gpu_all_on_gpu: # All boundaries are GPU-supported - evaluate entirely on GPU evaluate_reflective_boundary_gpu(gpu_dom) evaluate_dirichlet_boundary_gpu(gpu_dom) evaluate_transmissive_boundary_gpu(gpu_dom) # Handle transmissive_n_zero_t boundaries (need Python function call for stage) for B in self._gpu_transmissive_n_zero_t_boundaries: stage_val = B.get_boundary_values() try: stage_val = float(stage_val) except (TypeError, ValueError): stage_val = float(stage_val[0]) set_transmissive_n_zero_t_stage(gpu_dom, stage_val) evaluate_transmissive_n_zero_t_boundary_gpu(gpu_dom) # Handle Time_boundary (need Python function call for values) for B in self._gpu_time_boundaries: q = B.get_boundary_values() set_time_boundary_values(gpu_dom, float(q[0]), float(q[1]), float(q[2])) evaluate_time_boundary_gpu(gpu_dom) # Handle File_boundary / Field_boundary (per-edge values from SWW interpolation) set_file_boundary_values_from_domain(gpu_dom, self) evaluate_file_boundary_gpu(gpu_dom) # Handle Absorbing_wave_boundary (scalar wave stage updated each timestep) for B in self._gpu_absorbing_wave_boundaries: value = B.get_boundary_values() try: wave_val = float(value) except (TypeError, ValueError): wave_val = float(value[0]) set_absorbing_wave_value(gpu_dom, wave_val) evaluate_absorbing_wave_boundary_gpu(gpu_dom) # Handle Characteristic_wave_boundary (scalar perturbation updated each timestep) for B in self._gpu_characteristic_wave_boundaries: value = B.get_boundary_values() try: perturb = float(value) except (TypeError, ValueError): perturb = float(value[0]) set_characteristic_wave_value(gpu_dom, perturb) evaluate_characteristic_wave_boundary_gpu(gpu_dom) for B in self._gpu_flather_boundaries: value = B.get_boundary_values() try: stage_val = float(value) except (TypeError, ValueError): stage_val = float(value[0]) set_flather_value(gpu_dom, stage_val) evaluate_flather_boundary_gpu(gpu_dom) else: # Some boundaries need CPU - sync edge values, evaluate on CPU, sync back boundary_edge_sync(gpu_dom) for tag in self.tag_boundary_cells: B = self.boundary_map[tag] if B is not None: B.evaluate_segment(self, self.tag_boundary_cells[tag]) sync_boundary_values(gpu_dom) else: # CPU mode - evaluate boundaries on CPU for tag in self.tag_boundary_cells: B = self.boundary_map[tag] if B is None: continue boundary_segment_edges = self.tag_boundary_cells[tag] B.evaluate_segment(self, boundary_segment_edges) nvtxRangePop() def compute_forcing_terms(self): """If there are any forcing functions driving the system they should be defined in Domain subclass and appended to the list self.forcing_terms """ # The parameter self.flux_timestep should be updated # by the forcing_terms to ensure stability but it isn't # currently. nvtxRangePush('compute_forcing_terms') self._ensure_gpu_interface() if self.multiprocessor_mode == MULTIPROCESSOR_GPU: # GPU mode: use GPU Manning friction, fall back to CPU for others. # Forcing terms may be plain functions (with __name__) or callable # operator objects (Rainfall, Wind_stress, ...) which have none. for f in self.forcing_terms: if getattr(f, '__name__', None) == 'manning_friction_semi_implicit': self.gpu_interface.manning_friction_kernel(self) else: # Other forcing terms (rain, etc.) run on CPU # Need to sync from device, run, sync back self.gpu_interface.sync_from_device() f(self) self.gpu_interface.sync_to_device() else: for f in self.forcing_terms: f(self) nvtxRangePop() def _warn_unsupported_mode2_forcing(self): """Warn (once) if forcing terms other than Manning friction are present in mode 2. The mode-2 C step loop applies forcing in C and only handles Manning friction, so Python ``forcing_terms`` (e.g. the Rainfall / Wind_stress / Barometric_pressure forcing-function classes) are NOT applied — they are silently skipped. Use the equivalent operators instead. This converts that silent correctness gap into a loud, actionable message. """ if getattr(self, '_warned_mode2_forcing', False): return self._warned_mode2_forcing = True ignored = [getattr(f, '__name__', None) or f.__class__.__name__ for f in self.forcing_terms if getattr(f, '__name__', None) != 'manning_friction_semi_implicit'] if ignored: import warnings warnings.warn( "multiprocessor_mode=2 ('unified') applies forcing in C and only " "handles Manning friction; these Python forcing terms are NOT " f"applied and are silently skipped: {ignored}. Use the equivalent " "operators instead — Rate_operator.rainfall()/inflow(), " "Wind_stress_operator, Barometric_pressure_operator.", stacklevel=2) def set_boundary(self, boundary_map): """Associate boundary objects with tagged segments (see base class). Mode-2 ('unified') captures a device-side boundary classification and the per-edge Dirichlet/Time/File/... mappings when the GPU interface is first built. A later set_boundary() — e.g. switching a tag from Reflective to Dirichlet partway through a simulation — must invalidate those caches, otherwise the stale boundary keeps being applied on the device and the new condition is silently ignored. """ super().set_boundary(boundary_map) if self.multiprocessor_mode == MULTIPROCESSOR_GPU and self.gpu_interface is not None: # Preserve current conserved-quantity state before tearing the # interface down. In CPU-no-offload mode host arrays alias the # device; the sync makes this correct for the offload case too. try: self.gpu_interface.sync_from_device() except Exception: pass self.gpu_interface = None if hasattr(self, '_gpu_boundary_info_initialized'): del self._gpu_boundary_info_initialized # Rebuild immediately from the new boundary map so subsequent # mode-2 steps see a valid interface. self._ensure_gpu_interface() def distribute_to_vertices_and_edges(self, distribute_to_vertices=True): """ extrapolate centroid values to vertices and edges""" nvtxRangePush('distribute_to_vertices_and_edges') # Build a deferred mode-2 device interface on demand (a default-'unified' # domain may reach here, e.g. from a test, without going through evolve()). self._ensure_gpu_interface() if self.multiprocessor_mode == MULTIPROCESSOR_GPU and self.gpu_interface is not None: # Output path (yieldsteps / direct reads). This method is NOT on the # mode-2 stepping path (the C RK loop extrapolates on the device # itself); it exists to make vertex/edge values readable from Python # (SWW writer, inundation queries, tests). The gpu edge kernel does # NOT compute vertex values, so sync centroids from the device and # run the host (openmp) protect + extrapolate, which produces edges # AND vertices. The host protect does not affect the device # trajectory — the next C step re-protects on the device. self.gpu_interface.sync_from_device() from .sw_domain_openmp_ext import (protect_new, extrapolate_second_order_edge_sw) protect_new(self) extrapolate_second_order_edge_sw(self, distribute_to_vertices=distribute_to_vertices) nvtxRangePop() return # Do protection step self.protect_against_infinitesimal_and_negative_heights() # Do extrapolation step # Choose the correct extension module if self.multiprocessor_mode == MULTIPROCESSOR_OPENMP: from .sw_domain_openmp_ext import extrapolate_second_order_edge_sw else: raise Exception('Not implemented') extrapolate_second_order_edge_sw(self, distribute_to_vertices=distribute_to_vertices) nvtxRangePop() def distribute_to_edges(self): """ extrapolate centroid values edges""" nvtxRangePush('distribute_to_edges') # Do protection step self.protect_against_infinitesimal_and_negative_heights() # Do extrapolation step # Choose the correct extension module if self.multiprocessor_mode == MULTIPROCESSOR_OPENMP: from .sw_domain_openmp_ext import distribute_to_edges as extrapolate_second_order_edge_sw extrapolate_second_order_edge_sw(self) elif self.multiprocessor_mode == MULTIPROCESSOR_GPU: # change over to cuda routines as developed #from .sw_domain_simd_ext import extrapolate_second_order_edge_sw extrapolate_second_order_edge_sw = self.gpu_interface.extrapolate_second_order_edge_sw_kernel extrapolate_second_order_edge_sw(self) else: raise Exception('Not implemented') nvtxRangePop() def distribute_edges_to_vertices(self): """Distribute edge values to vertices. This is a wrapper for the C implementation of the distribution from edges to vertices. """ nvtxRangePush('distribute_edges_to_vertices') if self.multiprocessor_mode == MULTIPROCESSOR_OPENMP: # Using OpenMP extension from .sw_domain_openmp_ext import distribute_edges_to_vertices as distribute_edges_to_vertices_ext elif self.multiprocessor_mode == MULTIPROCESSOR_GPU: # Using cupy extension # FIXME SR: Not implemented yet so use OpenMP version from .sw_domain_openmp_ext import distribute_edges_to_vertices as distribute_edges_to_vertices_ext # distribute_edges_to_vertices_ext = self.gpu_interface.distribute_edges_to_vertices_kernel else: raise Exception('Not implemented') distribute_edges_to_vertices_ext(self) nvtxRangePop() def update_timestep(self, yieldstep, finaltime): """Calculate the next timestep to take """ # Protect against degenerate timesteps arising from isolated # triangles self.apply_protection_against_isolated_degenerate_timesteps() # disable variable timestepping if self.fixed_flux_timestep is not None: self.flux_timestep = self.fixed_flux_timestep timestep = self.fixed_flux_timestep else: # self.timestep is calculated from speed of characteristics # Apply CFL condition here timestep = min(self.CFL * self.flux_timestep, self.evolve_max_timestep) # Record maximal and minimal values of timestep for reporting self.recorded_max_timestep = max(timestep, self.recorded_max_timestep) self.recorded_min_timestep = min(timestep, self.recorded_min_timestep) # Stop if degenerate timestep if timestep < self.evolve_min_timestep: msg = 'WARNING: Too small timestep %.16f reached ' \ % timestep msg += 'even after %d steps of 1 order scheme' \ % self.max_smallsteps log.info(msg) timestep = self.evolve_min_timestep # Try enforce min_step stats = self.timestepping_statistics(track_speeds=True) log.info(stats) raise Exception(msg) # NOTE: Now timestep is redefined. This can lead to a timestep # being smaller than the self.recorded_min_timestep, which # confused me (GD). # The behaviour is good though, since then the # recorded_min_timestep reflects the mathematical constraints on # the timestep, EXCEPT the constraint that we yield at the # required time. Otherwise we would often have very small # recorded_min_timesteps simply because of we have to yield at a # given time # Ensure that final time is not exceeded if self.relative_finaltime is not None and self.relative_time + timestep > self.relative_finaltime: timestep = self.relative_finaltime - self.relative_time # Ensure that model time is aligned with yieldsteps if self.relative_time + timestep > self.relative_yieldtime: timestep = self.relative_yieldtime - self.relative_time self.timestep = timestep def protect_against_infinitesimal_and_negative_heights(self): """ Clean up the stage and momentum values to ensure non-negative heights """ # nvtxRangePush('protect_new') # Choose the correct extension module if self.multiprocessor_mode == MULTIPROCESSOR_OPENMP: from .sw_domain_openmp_ext import protect_new elif self.multiprocessor_mode == MULTIPROCESSOR_GPU: # change over to cuda routines as developed # # from .sw_domain_simd_ext import protect_new #from .sw_domain_openmp_ext import protect_new protect_new = self.gpu_interface.protect_against_infinitesimal_and_negative_heights_kernel else: raise Exception('Not implemented') mass_error = protect_new(self) # nvtxRangePop() if mass_error > 0.0 and self.verbose : #print('Cumulative mass protection: ' + str(mass_error) + ' m^3 ') # From https://stackoverflow.com/questions/22397261/cant-convert-float-object-to-str-implicitly print('Cumulative mass protection: {0} m^3'.format(mass_error)) def apply_protection_against_isolated_degenerate_timesteps(self): if self.protect_against_isolated_degenerate_timesteps is False: return # FIXME (Ole): Make this configurable if num.max(self.max_speed) < 10.0: return # Setup 10 bins for speed histogram from anuga.utilities.numerical_tools import histogram, create_bins bins = create_bins(self.max_speed, 10) hist = histogram(self.max_speed, bins) # Look for characteristic signature if len(hist) > 1 and hist[-1] > 0 and \ hist[4] == hist[5] == hist[6] == hist[7] == hist[8] == 0: # Danger of isolated degenerate triangles # Find triangles in last bin # FIXME - speed up using numeric package d = 0 for i in range(self.number_of_triangles): if self.max_speed[i] > bins[-1]: msg = 'Time=%f: Ignoring isolated high ' % self.get_time() msg += 'speed triangle ' msg += '#%d of %d with max speed = %f' \ % (i, self.number_of_triangles, self.max_speed[i]) self.get_quantity('xmomentum').set_values(0.0, indices=[i]) self.get_quantity('ymomentum').set_values(0.0, indices=[i]) self.max_speed[i] = 0.0 d += 1 def update_conserved_quantities(self): """Update vectors of conserved quantities using previously computed fluxes and specified forcing functions. """ nvtxRangePush('update_conserved_quantities') timestep = self.timestep # Update height based on discontinuous elevation assert self.get_using_discontinuous_elevation() if self.multiprocessor_mode == MULTIPROCESSOR_OPENMP: from .sw_domain_openmp_ext import update_conserved_quantities elif self.multiprocessor_mode == MULTIPROCESSOR_GPU: update_conserved_quantities = self.gpu_interface.update_conserved_quantities_kernel else: raise Exception('Not implemented') num_negative_ids = update_conserved_quantities(self, timestep) if num_negative_ids > 0: # FIXME: This only warns the first time -- maybe we should warn whenever loss occurs? import warnings msg = f'{num_negative_ids} negative cells being set to zero depth, possible loss of conservation. \n' +\ 'Consider using domain.report_water_volume_statistics() to check the extent of the problem' warnings.warn(msg) nvtxRangePop() def update_other_quantities(self): """ There may be a need to calculate some of the other quantities based on the new values of conserved quantities """ return def update_centroids_of_velocities_and_height(self): """Calculate the centroid values of velocities and height based on the values of the quantities stage and x and y momentum Assumes that stage and momentum are up to date Useful for kinematic viscosity calculations """ # For shallow water we need to update height xvelocity and yvelocity #Shortcuts W = self.quantities['stage'] UH = self.quantities['xmomentum'] VH = self.quantities['ymomentum'] H = self.quantities['height'] Z = self.quantities['elevation'] U = self.quantities['xvelocity'] V = self.quantities['yvelocity'] #print num.min(W.centroid_values) # Make sure boundary values of conserved quantites # are consistent with value of functions at centroids Z.set_boundary_values_from_edges() # FIXME(Ole): Why are these not necessary? #W.set_boundary_values_from_edges() #UH.set_boundary_values_from_edges() #VH.set_boundary_values_from_edges() #Aliases w_C = W.centroid_values z_C = Z.centroid_values uh_C = UH.centroid_values vh_C = VH.centroid_values u_C = U.centroid_values v_C = V.centroid_values h_C = H.centroid_values w_B = W.boundary_values z_B = Z.boundary_values uh_B = UH.boundary_values vh_B = VH.boundary_values u_B = U.boundary_values v_B = V.boundary_values h_B = H.boundary_values h_C[:] = w_C-z_C h_C[:] = num.where(h_C >= 0, h_C , 0.0) h_B[:] = w_B-z_B h_B[:] = num.where(h_B >=0, h_B, 0.0) # Update height values #H.set_values( num.where(W.centroid_values-Z.centroid_values>=0, # W.centroid_values-Z.centroid_values, 0.0), location='centroids') #H.set_boundary_values( num.where(W.boundary_values-Z.boundary_values>=0, # W.boundary_values-Z.boundary_values, 0.0)) #assert num.min(h_C) >= 0 #assert num.min(h_B) >= 0 H0 = 1.0e-8 #U.set_values(uh_C/(h_C + H0/h_C), location='centroids') #V.set_values(vh_C/(h_C + H0/h_C), location='centroids') factor = h_C/(h_C*h_C + H0) u_C[:] = uh_C*factor v_C[:] = vh_C*factor #U.set_boundary_values(uh_B/(h_B + H0/h_B)) #V.set_boundary_values(vh_B/(h_B + H0/h_B)) factor = h_B/(h_B*h_B + H0) u_B[:] = uh_B*factor v_B[:] = vh_B*factor def update_centroids_of_momentum_from_velocity(self): """ Calculate the centroid value of x and y momentum from height and velocities. This method computes the centroid values of x and y momentum (xmomentum and ymomentum) by multiplying the centroid velocities by the centroid height values. The method assumes that the centroids of height and velocities are already up to date. This is particularly useful for kinematic viscosity calculations where momentum values at cell centroids are required. The method updates: - xmomentum.centroid_values: product of xvelocity and height at centroids - ymomentum.centroid_values: product of yvelocity and height at centroids After updating centroid values, the method distributes these values to vertices and edges via distribute_to_vertices_and_edges(). Notes ----- This method modifies the centroid_values arrays in-place for both xmomentum and ymomentum quantities. See Also -------- distribute_to_vertices_and_edges : Distribute centroid values to vertices and edges """ # For shallow water we need to update height xvelocity and yvelocity #Shortcuts UH = self.quantities['xmomentum'] VH = self.quantities['ymomentum'] H = self.quantities['height'] Z = self.quantities['elevation'] U = self.quantities['xvelocity'] V = self.quantities['yvelocity'] #Arrays u_C = U.centroid_values v_C = V.centroid_values uh_C = UH.centroid_values vh_C = VH.centroid_values h_C = H.centroid_values u_B = U.boundary_values v_B = V.boundary_values uh_B = UH.boundary_values vh_B = VH.boundary_values h_B = H.boundary_values uh_C[:] = u_C*h_C vh_C[:] = v_C*h_C #UH.set_values(u_C*h_C , location='centroids') #VH.set_values(v_C*h_C , location='centroids') self.distribute_to_vertices_and_edges() def evolve(self, yieldstep: float | None = None, outputstep: float | None = None, finaltime: float | DateTime | None = None, duration: float | None = None, skip_initial_step: bool = False) -> Iterator[float]: """Evolve method from Domain class. Parameters ---------- yieldstep : float, optional Yield every yieldstep time period outputstep : float, optional Output to sww file every outputstep time period. outputstep should be an integer multiple of yieldstep. finaltime : float or datetime, optional Evolve until finaltime (can be a float in seconds or a datetime object) duration : float, optional Evolve for a time of length duration (seconds) skip_initial_step : bool, optional Can be used to restart a simulation (not often used). Notes ----- If outputstep is None, the output to sww file happens every yieldstep. If yieldstep is None then simply evolve to finaltime or for a duration. """ # Call check integrity here rather than from user scripts # self.check_integrity() from time import time as walltime self.evolve_start_walltime = walltime() self.last_walltime = self.evolve_start_walltime from datetime import datetime if finaltime is not None: if isinstance(finaltime, datetime): if finaltime.tzinfo is None: dt = finaltime.replace(tzinfo=self.timezone) else: dt = finaltime finaltime = dt.timestamp() else: finaltime = float(finaltime) if outputstep is None: outputstep = yieldstep if yieldstep is None: self.output_frequency = 1 else: msg = f'outputstep ({outputstep}) should be an integer multiple of yieldstep ({yieldstep})' output_frequency = outputstep/yieldstep assert float(output_frequency).is_integer(), msg self.output_frequency = int(output_frequency) msg = 'Attribute self.beta_w must be in the interval [0, 2]' assert 0 <= self.beta_w <= 2.0, msg # Build the mode-2 device interface lazily if it was deferred (a # default-'unified' domain constructed before boundaries were set). Must # happen before distribute_to_vertices_and_edges(), which uses the # interface in mode 2. self._ensure_gpu_interface() # Initial update of vertex and edge values before any STORAGE # and or visualisation. # This is done again in the initialisation of the Generic_Domain # evolve loop but we do it here to ensure the values are ok for storage. self.distribute_to_vertices_and_edges() if self.store is True and (self.get_relative_time() == 0.0 or self.evolved_called is False): self.initialise_storage() # Eagerly run the mode-2 fractional-step setup (Boyd culvert registration # and CPU-only-operator detection) so its log lines print before the first # yielded step instead of after the t=0 yield. Cached, so the first # apply_fractional_steps() does not repeat it. if self.multiprocessor_mode == MULTIPROCESSOR_GPU and self.gpu_interface is not None: self._has_cpu_only_fractional_operators() self._warn_unsupported_mode2_forcing() #nvtx marker nvtxRangePush('_evolve_base') # Call basic machinery from parent class for t in self._evolve_base(yieldstep=yieldstep, finaltime=finaltime, duration=duration, skip_initial_step=skip_initial_step): walltime = time.time() #print t , self.get_time() # Store model data, e.g. for subsequent visualisation if self.store: if self.yieldstep_counter%self.output_frequency == 0: self.store_timestep() if self.checkpoint: save_checkpoint=False if self.checkpoint_step == 0: if rank() == 0: if walltime - self.walltime_prev > self.checkpoint_time: save_checkpoint = True for cpu in range(size()): if cpu != rank(): send(save_checkpoint, cpu) else: save_checkpoint = receive(0) elif self.yieldstep_counter%self.checkpoint_step == 0: save_checkpoint = True if save_checkpoint: pickle_name = os.path.join(self.checkpoint_dir,self.get_name())+'_'+str(self.get_time())+'.pickle' pickle.dump(self, open(pickle_name, 'wb')) barrier() self.walltime_prev = time.time() #print 'Stored Checkpoint File '+pickle_name # Pass control on to outer loop for more specific actions yield(t) self.yieldstep_counter += 1 #nvtx marker nvtxRangePop() def initialise_storage(self) -> None: """Create and initialise self.writer object for storing data. Also, save x,y and bed elevation """ nvtxRangePush('SWW_file') # Initialise writer self.writer = SWW_file(self) # Store vertices and connectivity self.writer.store_connectivity() nvtxRangePop() def store_timestep(self) -> None: """Store time dependent quantities and time. Precondition: self.writer has been initialised """ nvtxRangePush('store_timestep') self.writer.store_timestep() nvtxRangePop() def sww_merge(self, *args, **kwargs) -> None: """Dummy function for sequential algorithms where the sww produced is the final products. For parallel runs, a similarly named routine in parallel_shallow_water will merge all the sub domain sww files into a global sww file :param bool verbose: Flag to produce more output :param bool delete_old: Flag to delete sub domain sww files after creating global sww file """ pass def evolve_one_euler_step(self, yieldstep, finaltime): """One Euler Time Step Q^{n+1} = E(h) Q^n Does not assume that centroid values have been extrapolated to vertices and edges """ # GPU mode: dispatch to C Euler loop (eliminates Python round-trip overhead) if self.multiprocessor_mode == MULTIPROCESSOR_GPU and self.gpu_interface is not None: self._evolve_one_euler_step_c(yieldstep, finaltime) return # From centroid values calculate edge self.distribute_to_vertices_and_edges(distribute_to_vertices=False) # Apply boundary conditions self.update_boundary() # Compute fluxes across each element edge # In MPI parallel mode this involves an allreduce to find global minimal timestep self.compute_fluxes() # Compute forcing terms (friction) self.compute_forcing_terms() # Update timestep to fit yieldstep and finaltime self.update_timestep(yieldstep, finaltime) # Update conserved quantities self.update_conserved_quantities() def evolve_one_rk2_step(self, yieldstep, finaltime): """One 2nd order RK timestep Q^{n+1} = 0.5 Q^n + 0.5 E(h)^2 Q^n Does not assume that centroid values have been extrapolated to vertices and edges """ # GPU mode: use C RK loop (faster) or Python-orchestrated GPU loop if self.multiprocessor_mode == MULTIPROCESSOR_GPU and self.gpu_interface is not None: if self.use_c_rk_loop: self._evolve_one_rk2_step_c(yieldstep, finaltime) else: self._evolve_one_rk2_step_gpu(yieldstep, finaltime) return # Save initial initial conserved quantities values self.backup_conserved_quantities() # has C, ported to GPU #========================================== # First euler step #========================================== # From centroid values calculate edge values self.distribute_to_vertices_and_edges(distribute_to_vertices=False) # has C, ported to GPU # Apply boundary conditions self.update_boundary() # has C, ported to GPU # Compute fluxes across each element edge # In MPI parallel mode this involves an allreduce to find global minimal timestep self.compute_fluxes() # Compute forcing terms (friction) self.compute_forcing_terms() # Update timestep to fit yieldstep and finaltime self.update_timestep(yieldstep, finaltime) #needs C # Update centroid values of conserved quantities self.update_conserved_quantities() # has C, ported to GPU #=========================== # End of first euler step #=========================== # Update time self.set_relative_time(self.get_relative_time() + self.timestep) # needs C # Update ghosts if self.ghost_layer_width < 4: self.update_ghosts() # needs C #========================================= # Second Euler step using the same timestep # calculated in the first step. Might lead to # stability problems but we have not seen any # example. #========================================= # Update edge values self.distribute_to_vertices_and_edges(distribute_to_vertices=False) # Update boundary values self.update_boundary() # Compute fluxes across each element edge # In MPI parallel mode this involves an allreduce to find global minimal timestep self.compute_fluxes() # Compute forcing terms (friction) self.compute_forcing_terms() # Update conserved quantities using timestep from first step self.update_conserved_quantities() #======================================== # Combine initial and final values # of conserved quantities and cleanup #======================================== # Combine steps self.saxpy_conserved_quantities(0.5, 0.5) # has C, not ported def evolve_one_ader2_step(self, yieldstep, finaltime): """One ADER-2 timestep using the local Cauchy-Kovalewski predictor. Q^{n+1} = Q^n + dt * R(Q^{n+1/2}) Uses the fused edge predictor: edge values are shifted to Q^{n+1/2} in-place while centroid values remain at Q^n, eliminating the second extrapolation pass and the backup/saxpy restore pattern. Single-flux-call variant: the previous step's CFL timestep is reused for the predictor half-advance. The first step bootstraps with dt=0 (Euler) to establish the initial CFL timestep. Cost: 1 flux call + 1 extrapolation + 1 edge C-K predictor (after step 1). Accuracy: 2nd-order in space and time. """ # GPU mode: use C loop (faster) or Python-orchestrated GPU loop if self.multiprocessor_mode == MULTIPROCESSOR_GPU and self.gpu_interface is not None: if self.use_c_rk_loop: self._evolve_one_ader2_step_c(yieldstep, finaltime) else: self._evolve_one_ader2_step_gpu(yieldstep, finaltime) return from .sw_domain_openmp_ext import ader_ck_predictor_edge # Bootstrap: prev_dt=0 on first call → Euler step to establish CFL dt if not hasattr(self, '_ader2_prev_dt'): self._ader2_prev_dt = 0.0 prev_dt = self._ader2_prev_dt # Extrapolate Q^n centroids → edges (1 pass; centroids untouched) self.distribute_to_vertices_and_edges(distribute_to_vertices=False) self.update_boundary() if prev_dt > 0.0: # Fused edge predictor: shift edge values to Q^{n+1/2}, Q^n in centroids ader_ck_predictor_edge(self, prev_dt * 0.5) # Re-apply boundary conditions to boundary edges self.update_boundary() # Single flux call from Q^{n+1/2} edge values (or Q^n on bootstrap step) self.compute_fluxes() self.compute_forcing_terms() # Clip to yieldstep / finaltime / evolve_max_timestep. # In MPI parallel mode update_timestep does an Allreduce so that # self.timestep is the global-minimum CFL dt across all ranks. # _ader2_prev_dt must equal the timestep actually taken so that # all ranks use the same predictor half-step next iteration; # saving the local flux_timestep before the Allreduce would give # each rank a different prev_dt, corrupting ghost-boundary edges. self.update_timestep(yieldstep, finaltime) # Record the timestep actually taken as prev_dt for the next predictor. self._ader2_prev_dt = self.timestep # Q^n centroids are untouched — update directly (no saxpy restore needed) self.update_conserved_quantities() # Q^{n+1} = Q^n + dt*R(Q^{n+1/2}) def _evolve_one_ader2_step_gpu(self, yieldstep, finaltime): """Python-orchestrated GPU implementation of ADER-2 step. Fallback when the C ADER-2 loop cannot be used (e.g., unsupported boundary types). Prefer _evolve_one_ader2_step_c() for better performance. Uses the fused edge predictor with _ader2_prev_dt to match the CPU path exactly (single flux call, no backup/restore of centroid values needed). """ from anuga.shallow_water.sw_domain_gpu_ext import ( extrapolate_second_order_gpu, protect_gpu, compute_fluxes_gpu, update_conserved_quantities_gpu, ader_ck_predictor_edge_gpu, sync_boundary_values, init_boundary_edge_sync, boundary_edge_sync, exchange_ghosts, evaluate_reflective_boundary_gpu, evaluate_dirichlet_boundary_gpu, evaluate_transmissive_boundary_gpu, set_transmissive_n_zero_t_stage, evaluate_transmissive_n_zero_t_boundary_gpu, set_time_boundary_values, evaluate_time_boundary_gpu, set_file_boundary_values_from_domain, evaluate_file_boundary_gpu, set_absorbing_wave_value, evaluate_absorbing_wave_boundary_gpu, set_characteristic_wave_value, evaluate_characteristic_wave_boundary_gpu, set_flather_value, evaluate_flather_boundary_gpu, ) import numpy as np gpu_dom = self.gpu_interface.gpu_dom GPU_BOUNDARY_TYPES = {'Reflective_boundary', 'Dirichlet_boundary', 'Transmissive_boundary', 'Transmissive_n_momentum_zero_t_momentum_set_stage_boundary', 'Time_boundary', 'File_boundary', 'Field_boundary', 'Absorbing_wave_boundary', 'Characteristic_wave_boundary', 'Flather_boundary'} if not hasattr(self, '_gpu_boundary_info_initialized'): self._gpu_cpu_tags = [] self._gpu_all_on_gpu = True cpu_boundary_types = [] self._gpu_transmissive_n_zero_t_boundaries = [] self._gpu_time_boundaries = [] self._gpu_absorbing_wave_boundaries = [] self._gpu_characteristic_wave_boundaries = [] self._gpu_flather_boundaries = [] for tag, B in self.boundary_map.items(): if B is not None: btype = B.__class__.__name__ if btype not in GPU_BOUNDARY_TYPES: self._gpu_cpu_tags.append(tag) self._gpu_all_on_gpu = False cpu_boundary_types.append((tag, btype)) elif btype == 'Transmissive_n_momentum_zero_t_momentum_set_stage_boundary': self._gpu_transmissive_n_zero_t_boundaries.append(B) elif btype == 'Time_boundary': self._gpu_time_boundaries.append(B) elif btype == 'Absorbing_wave_boundary': self._gpu_absorbing_wave_boundaries.append(B) elif btype == 'Characteristic_wave_boundary': self._gpu_characteristic_wave_boundaries.append(B) elif btype == 'Flather_boundary': self._gpu_flather_boundaries.append(B) if not self._gpu_all_on_gpu: boundary_cell_ids = np.unique(self.boundary_cells).astype(np.intc) init_boundary_edge_sync(gpu_dom, boundary_cell_ids) print("WARNING: GPU boundary evaluation disabled - falling back to CPU") print(f" Unsupported boundary types: {cpu_boundary_types}") self._gpu_boundary_info_initialized = True # Bootstrap: prev_dt=0 on first call → plain Euler step if not hasattr(self, '_ader2_prev_dt'): self._ader2_prev_dt = 0.0 prev_dt = self._ader2_prev_dt def _eval_boundaries(): if self._gpu_all_on_gpu: evaluate_reflective_boundary_gpu(gpu_dom) evaluate_dirichlet_boundary_gpu(gpu_dom) evaluate_transmissive_boundary_gpu(gpu_dom) for B in self._gpu_transmissive_n_zero_t_boundaries: stage_val = B.get_boundary_values() try: stage_val = float(stage_val) except (TypeError, ValueError): stage_val = float(stage_val[0]) set_transmissive_n_zero_t_stage(gpu_dom, stage_val) evaluate_transmissive_n_zero_t_boundary_gpu(gpu_dom) for B in self._gpu_time_boundaries: q = B.get_boundary_values() set_time_boundary_values(gpu_dom, float(q[0]), float(q[1]), float(q[2])) evaluate_time_boundary_gpu(gpu_dom) set_file_boundary_values_from_domain(gpu_dom, self) evaluate_file_boundary_gpu(gpu_dom) for B in self._gpu_absorbing_wave_boundaries: value = B.get_boundary_values() try: wave_val = float(value) except (TypeError, ValueError): wave_val = float(value[0]) set_absorbing_wave_value(gpu_dom, wave_val) evaluate_absorbing_wave_boundary_gpu(gpu_dom) for B in self._gpu_characteristic_wave_boundaries: value = B.get_boundary_values() try: perturb = float(value) except (TypeError, ValueError): perturb = float(value[0]) set_characteristic_wave_value(gpu_dom, perturb) evaluate_characteristic_wave_boundary_gpu(gpu_dom) for B in self._gpu_flather_boundaries: value = B.get_boundary_values() try: stage_val = float(value) except (TypeError, ValueError): stage_val = float(value[0]) set_flather_value(gpu_dom, stage_val) evaluate_flather_boundary_gpu(gpu_dom) else: boundary_edge_sync(gpu_dom) for tag in self.tag_boundary_cells: B = self.boundary_map[tag] if B is not None: B.evaluate_segment(self, self.tag_boundary_cells[tag]) sync_boundary_values(gpu_dom) # --- Step 1: extrapolate Q^n → edges + evaluate boundaries --- protect_gpu(gpu_dom) extrapolate_second_order_gpu(gpu_dom) _eval_boundaries() if prev_dt > 0.0: # --- Step 2: fused edge C-K predictor → Q^{n+1/2} edges in-place --- # (centroid values unchanged — no backup/restore needed) ader_ck_predictor_edge_gpu(gpu_dom, prev_dt * 0.5) # Re-apply boundaries to boundary edges _eval_boundaries() # --- Step 3: single flux call from Q^{n+1/2} edges --- self.flux_timestep = compute_fluxes_gpu(gpu_dom) self.compute_forcing_terms() # --- Step 4: Allreduce + yieldstep/finaltime clip --- self.update_timestep(yieldstep, finaltime) self._ader2_prev_dt = self.timestep # --- Step 5: update Q^{n+1} = Q^n + timestep * R (no restore needed) --- update_conserved_quantities_gpu(gpu_dom, self.timestep) self.set_relative_time(self.get_relative_time() + self.timestep) # Record the CFL-constrained step (pre yield/final cap), matching legacy # update_timestep(), rather than the yield-limited step actually taken. cfl_dt = min(self.CFL * self.flux_timestep, self.evolve_max_timestep) self.recorded_max_timestep = max(cfl_dt, self.recorded_max_timestep) self.recorded_min_timestep = min(cfl_dt, self.recorded_min_timestep) # Post-step ghost exchange — update_ghosts() is a no-op in GPU mode if self.ghost_layer_width < 4: exchange_ghosts(gpu_dom) def _evolve_one_ader2_step_c(self, yieldstep, finaltime): """ADER-2 step executed entirely in C - eliminates Python round-trip overhead. Prefer this over _evolve_one_ader2_step_gpu() for better performance. Falls back to _evolve_one_ader2_step_gpu() if any boundary requires CPU. Rate_operators must be applied separately (after this call). """ from anuga.shallow_water.sw_domain_gpu_ext import ( evolve_one_ader2_step_gpu, set_transmissive_n_zero_t_stage, set_time_boundary_values, set_file_boundary_values_from_domain, set_absorbing_wave_value, set_characteristic_wave_value, set_flather_value, ) gpu_dom = self.gpu_interface.gpu_dom GPU_BOUNDARY_TYPES = {'Reflective_boundary', 'Dirichlet_boundary', 'Transmissive_boundary', 'Transmissive_n_momentum_zero_t_momentum_set_stage_boundary', 'Time_boundary', 'File_boundary', 'Field_boundary', 'Absorbing_wave_boundary', 'Characteristic_wave_boundary'} if not hasattr(self, '_gpu_boundary_info_initialized'): self._gpu_cpu_tags = [] self._gpu_all_on_gpu = True cpu_boundary_types = [] self._gpu_transmissive_n_zero_t_boundaries = [] self._gpu_time_boundaries = [] self._gpu_absorbing_wave_boundaries = [] self._gpu_characteristic_wave_boundaries = [] for tag, B in self.boundary_map.items(): if B is not None: btype = B.__class__.__name__ if btype not in GPU_BOUNDARY_TYPES: self._gpu_cpu_tags.append(tag) self._gpu_all_on_gpu = False cpu_boundary_types.append((tag, btype)) elif btype == 'Transmissive_n_momentum_zero_t_momentum_set_stage_boundary': self._gpu_transmissive_n_zero_t_boundaries.append(B) elif btype == 'Time_boundary': self._gpu_time_boundaries.append(B) elif btype == 'Absorbing_wave_boundary': self._gpu_absorbing_wave_boundaries.append(B) elif btype == 'Characteristic_wave_boundary': self._gpu_characteristic_wave_boundaries.append(B) if not self._gpu_all_on_gpu: print("WARNING: C ADER-2 loop requires all GPU-supported boundary types") print(" Falling back to Python-orchestrated GPU loop") print(f" Unsupported types: {cpu_boundary_types}") self._gpu_boundary_info_initialized = True if not self._gpu_all_on_gpu: return self._evolve_one_ader2_step_gpu(yieldstep, finaltime) for B in self._gpu_transmissive_n_zero_t_boundaries: stage_val = B.get_boundary_values() try: stage_val = float(stage_val) except (TypeError, ValueError): stage_val = float(stage_val[0]) set_transmissive_n_zero_t_stage(gpu_dom, stage_val) for B in self._gpu_time_boundaries: q = B.get_boundary_values() set_time_boundary_values(gpu_dom, float(q[0]), float(q[1]), float(q[2])) set_file_boundary_values_from_domain(gpu_dom, self) for B in self._gpu_absorbing_wave_boundaries: value = B.get_boundary_values() try: wave_val = float(value) except (TypeError, ValueError): wave_val = float(value[0]) set_absorbing_wave_value(gpu_dom, wave_val) for B in self._gpu_characteristic_wave_boundaries: value = B.get_boundary_values() try: perturb = float(value) except (TypeError, ValueError): perturb = float(value[0]) set_characteristic_wave_value(gpu_dom, perturb) for B in self._gpu_flather_boundaries: value = B.get_boundary_values() try: stage_val = float(value) except (TypeError, ValueError): stage_val = float(value[0]) set_flather_value(gpu_dom, stage_val) # Time remaining to the next yield boundary. Use the explicitly tracked # relative_yieldtime (set and incremented by the evolve loop), NOT # (relative_time % yieldstep): the modulo is floating-point fragile, e.g. # 0.3 % 0.1 == 0.0999... gives remaining ~1e-17 -> max_timestep ~0 -> # the step never advances and evolve() spins. Mirrors update_timestep(). relative_yieldtime = getattr(self, 'relative_yieldtime', None) if relative_yieldtime is not None: remaining_yieldstep = relative_yieldtime - self.get_relative_time() else: remaining_yieldstep = yieldstep if finaltime is not None: remaining_finaltime = finaltime - self.get_time() max_timestep = min(self.evolve_max_timestep, remaining_yieldstep, remaining_finaltime) else: max_timestep = min(self.evolve_max_timestep, remaining_yieldstep) if not hasattr(self, '_ader2_prev_dt'): self._ader2_prev_dt = 0.0 self.timestep = evolve_one_ader2_step_gpu(gpu_dom, max_timestep, 1, self._ader2_prev_dt) self._ader2_prev_dt = self.timestep # Do NOT advance relative_time here — the evolve loop does it after # apply_fractional_steps(); see _evolve_one_euler_step_c for why. # Record the CFL-constrained step (pre yield/final cap), matching legacy # update_timestep(), rather than the yield-limited step actually taken. cfl_dt = gpu_dom.recorded_flux_timestep self.recorded_max_timestep = max(cfl_dt, self.recorded_max_timestep) self.recorded_min_timestep = min(cfl_dt, self.recorded_min_timestep) # Post-step ghost exchange — update_ghosts() is a no-op in GPU mode if self.ghost_layer_width < 4: from anuga.shallow_water.sw_domain_gpu_ext import exchange_ghosts exchange_ghosts(gpu_dom) def _evolve_one_rk2_step_gpu(self, yieldstep, finaltime): """Python-orchestrated GPU implementation of RK2 step. This is a fallback for when the C RK2 loop cannot be used (e.g., unsupported boundary types). Prefer _evolve_one_rk2_step_c() for better performance. """ from anuga.shallow_water.sw_domain_gpu_ext import ( backup_conserved_quantities_gpu, extrapolate_second_order_gpu, protect_gpu, compute_fluxes_gpu, update_conserved_quantities_gpu, saxpy_conserved_quantities_gpu, sync_boundary_values, init_boundary_edge_sync, boundary_edge_sync, exchange_ghosts, evaluate_reflective_boundary_gpu, evaluate_dirichlet_boundary_gpu, evaluate_transmissive_boundary_gpu, set_transmissive_n_zero_t_stage, evaluate_transmissive_n_zero_t_boundary_gpu, set_time_boundary_values, evaluate_time_boundary_gpu, set_file_boundary_values_from_domain, evaluate_file_boundary_gpu, set_absorbing_wave_value, evaluate_absorbing_wave_boundary_gpu, set_characteristic_wave_value, evaluate_characteristic_wave_boundary_gpu, set_flather_value, evaluate_flather_boundary_gpu, ) import numpy as np gpu_dom = self.gpu_interface.gpu_dom # Supported GPU boundary types GPU_BOUNDARY_TYPES = {'Reflective_boundary', 'Dirichlet_boundary', 'Transmissive_boundary', 'Transmissive_n_momentum_zero_t_momentum_set_stage_boundary', 'Time_boundary', 'File_boundary', 'Field_boundary', 'Absorbing_wave_boundary', 'Characteristic_wave_boundary'} # Lazy init: identify which boundaries need CPU evaluation vs GPU if not hasattr(self, '_gpu_boundary_info_initialized'): self._gpu_cpu_tags = [] self._gpu_all_on_gpu = True cpu_boundary_types = [] self._gpu_transmissive_n_zero_t_boundaries = [] self._gpu_time_boundaries = [] self._gpu_absorbing_wave_boundaries = [] self._gpu_characteristic_wave_boundaries = [] for tag, B in self.boundary_map.items(): if B is not None: btype = B.__class__.__name__ if btype not in GPU_BOUNDARY_TYPES: self._gpu_cpu_tags.append(tag) self._gpu_all_on_gpu = False cpu_boundary_types.append((tag, btype)) elif btype == 'Transmissive_n_momentum_zero_t_momentum_set_stage_boundary': self._gpu_transmissive_n_zero_t_boundaries.append(B) elif btype == 'Time_boundary': self._gpu_time_boundaries.append(B) elif btype == 'Absorbing_wave_boundary': self._gpu_absorbing_wave_boundaries.append(B) elif btype == 'Characteristic_wave_boundary': self._gpu_characteristic_wave_boundaries.append(B) if not self._gpu_all_on_gpu: boundary_cell_ids = np.unique(self.boundary_cells).astype(np.intc) init_boundary_edge_sync(gpu_dom, boundary_cell_ids) print("WARNING: GPU boundary evaluation disabled - falling back to CPU") print(f" Unsupported boundary types: {cpu_boundary_types}") self._gpu_boundary_info_initialized = True # Backup for RK2 backup_conserved_quantities_gpu(gpu_dom) # ========================================== # First Euler step # ========================================== protect_gpu(gpu_dom) extrapolate_second_order_gpu(gpu_dom) # Evaluate boundaries if self._gpu_all_on_gpu: evaluate_reflective_boundary_gpu(gpu_dom) evaluate_dirichlet_boundary_gpu(gpu_dom) evaluate_transmissive_boundary_gpu(gpu_dom) for B in self._gpu_transmissive_n_zero_t_boundaries: stage_val = B.get_boundary_values() try: stage_val = float(stage_val) except (TypeError, ValueError): stage_val = float(stage_val[0]) set_transmissive_n_zero_t_stage(gpu_dom, stage_val) evaluate_transmissive_n_zero_t_boundary_gpu(gpu_dom) for B in self._gpu_time_boundaries: q = B.get_boundary_values() set_time_boundary_values(gpu_dom, float(q[0]), float(q[1]), float(q[2])) evaluate_time_boundary_gpu(gpu_dom) set_file_boundary_values_from_domain(gpu_dom, self) evaluate_file_boundary_gpu(gpu_dom) for B in self._gpu_absorbing_wave_boundaries: value = B.get_boundary_values() try: wave_val = float(value) except (TypeError, ValueError): wave_val = float(value[0]) set_absorbing_wave_value(gpu_dom, wave_val) evaluate_absorbing_wave_boundary_gpu(gpu_dom) for B in self._gpu_characteristic_wave_boundaries: value = B.get_boundary_values() try: perturb = float(value) except (TypeError, ValueError): perturb = float(value[0]) set_characteristic_wave_value(gpu_dom, perturb) evaluate_characteristic_wave_boundary_gpu(gpu_dom) for B in self._gpu_flather_boundaries: value = B.get_boundary_values() try: stage_val = float(value) except (TypeError, ValueError): stage_val = float(value[0]) set_flather_value(gpu_dom, stage_val) evaluate_flather_boundary_gpu(gpu_dom) else: boundary_edge_sync(gpu_dom) for tag in self.tag_boundary_cells: B = self.boundary_map[tag] if B is not None: B.evaluate_segment(self, self.tag_boundary_cells[tag]) sync_boundary_values(gpu_dom) # Compute fluxes self.flux_timestep = compute_fluxes_gpu(gpu_dom) # Forcing terms self.compute_forcing_terms() # Update timestep self.update_timestep(yieldstep, finaltime) # Update conserved quantities update_conserved_quantities_gpu(gpu_dom, self.timestep) # End of first Euler step self.set_relative_time(self.get_relative_time() + self.timestep) # Ghost exchange (MPI) if self.ghost_layer_width < 4: exchange_ghosts(gpu_dom) # ========================================= # Second Euler step # ========================================= protect_gpu(gpu_dom) extrapolate_second_order_gpu(gpu_dom) # Evaluate boundaries if self._gpu_all_on_gpu: evaluate_reflective_boundary_gpu(gpu_dom) evaluate_dirichlet_boundary_gpu(gpu_dom) evaluate_transmissive_boundary_gpu(gpu_dom) for B in self._gpu_transmissive_n_zero_t_boundaries: stage_val = B.get_boundary_values() try: stage_val = float(stage_val) except (TypeError, ValueError): stage_val = float(stage_val[0]) set_transmissive_n_zero_t_stage(gpu_dom, stage_val) evaluate_transmissive_n_zero_t_boundary_gpu(gpu_dom) for B in self._gpu_time_boundaries: q = B.get_boundary_values() set_time_boundary_values(gpu_dom, float(q[0]), float(q[1]), float(q[2])) evaluate_time_boundary_gpu(gpu_dom) set_file_boundary_values_from_domain(gpu_dom, self) evaluate_file_boundary_gpu(gpu_dom) for B in self._gpu_absorbing_wave_boundaries: value = B.get_boundary_values() try: wave_val = float(value) except (TypeError, ValueError): wave_val = float(value[0]) set_absorbing_wave_value(gpu_dom, wave_val) evaluate_absorbing_wave_boundary_gpu(gpu_dom) for B in self._gpu_characteristic_wave_boundaries: value = B.get_boundary_values() try: perturb = float(value) except (TypeError, ValueError): perturb = float(value[0]) set_characteristic_wave_value(gpu_dom, perturb) evaluate_characteristic_wave_boundary_gpu(gpu_dom) for B in self._gpu_flather_boundaries: value = B.get_boundary_values() try: stage_val = float(value) except (TypeError, ValueError): stage_val = float(value[0]) set_flather_value(gpu_dom, stage_val) evaluate_flather_boundary_gpu(gpu_dom) else: boundary_edge_sync(gpu_dom) for tag in self.tag_boundary_cells: B = self.boundary_map[tag] if B is not None: B.evaluate_segment(self, self.tag_boundary_cells[tag]) sync_boundary_values(gpu_dom) # Compute fluxes (ignore timestep from second step) compute_fluxes_gpu(gpu_dom) # Forcing terms self.compute_forcing_terms() # Update conserved quantities update_conserved_quantities_gpu(gpu_dom, self.timestep) # RK2 averaging saxpy_conserved_quantities_gpu(gpu_dom, 0.5, 0.5) # Post-step ghost exchange — update_ghosts() is a no-op in GPU mode if self.ghost_layer_width < 4: exchange_ghosts(gpu_dom) def _evolve_one_euler_step_gpu(self, yieldstep, finaltime): """Python-orchestrated GPU Euler (DE0) step. Fallback for _evolve_one_euler_step_c() when the boundary map contains a type the C Euler loop cannot evaluate on the device (e.g. Transmissive_momentum_set_stage_boundary). GPU-supported boundaries are evaluated on the device; any others are evaluated on the host via evaluate_segment() and synced back. This keeps DE0 results correct (identical to legacy) for every boundary type — the same fallback that rk2/rk3/ader2 already perform. Prefer _evolve_one_euler_step_c() (faster) when all boundaries are GPU-supported. """ from anuga.shallow_water.sw_domain_gpu_ext import ( extrapolate_second_order_gpu, protect_gpu, compute_fluxes_gpu, update_conserved_quantities_gpu, sync_boundary_values, init_boundary_edge_sync, boundary_edge_sync, exchange_ghosts, evaluate_reflective_boundary_gpu, evaluate_dirichlet_boundary_gpu, evaluate_transmissive_boundary_gpu, set_transmissive_n_zero_t_stage, evaluate_transmissive_n_zero_t_boundary_gpu, set_time_boundary_values, evaluate_time_boundary_gpu, set_file_boundary_values_from_domain, evaluate_file_boundary_gpu, set_absorbing_wave_value, evaluate_absorbing_wave_boundary_gpu, set_characteristic_wave_value, evaluate_characteristic_wave_boundary_gpu, ) import numpy as np gpu_dom = self.gpu_interface.gpu_dom GPU_BOUNDARY_TYPES = {'Reflective_boundary', 'Dirichlet_boundary', 'Transmissive_boundary', 'Transmissive_n_momentum_zero_t_momentum_set_stage_boundary', 'Time_boundary', 'File_boundary', 'Field_boundary', 'Absorbing_wave_boundary', 'Characteristic_wave_boundary'} if not hasattr(self, '_gpu_boundary_info_initialized'): self._gpu_cpu_tags = [] self._gpu_all_on_gpu = True cpu_boundary_types = [] self._gpu_transmissive_n_zero_t_boundaries = [] self._gpu_time_boundaries = [] self._gpu_absorbing_wave_boundaries = [] self._gpu_characteristic_wave_boundaries = [] for tag, B in self.boundary_map.items(): if B is not None: btype = B.__class__.__name__ if btype not in GPU_BOUNDARY_TYPES: self._gpu_cpu_tags.append(tag) self._gpu_all_on_gpu = False cpu_boundary_types.append((tag, btype)) elif btype == 'Transmissive_n_momentum_zero_t_momentum_set_stage_boundary': self._gpu_transmissive_n_zero_t_boundaries.append(B) elif btype == 'Time_boundary': self._gpu_time_boundaries.append(B) elif btype == 'Absorbing_wave_boundary': self._gpu_absorbing_wave_boundaries.append(B) elif btype == 'Characteristic_wave_boundary': self._gpu_characteristic_wave_boundaries.append(B) if not self._gpu_all_on_gpu: boundary_cell_ids = np.unique(self.boundary_cells).astype(np.intc) init_boundary_edge_sync(gpu_dom, boundary_cell_ids) self._gpu_boundary_info_initialized = True protect_gpu(gpu_dom) extrapolate_second_order_gpu(gpu_dom) if self._gpu_all_on_gpu: evaluate_reflective_boundary_gpu(gpu_dom) evaluate_dirichlet_boundary_gpu(gpu_dom) evaluate_transmissive_boundary_gpu(gpu_dom) for B in self._gpu_transmissive_n_zero_t_boundaries: stage_val = B.get_boundary_values() try: stage_val = float(stage_val) except (TypeError, ValueError): stage_val = float(stage_val[0]) set_transmissive_n_zero_t_stage(gpu_dom, stage_val) evaluate_transmissive_n_zero_t_boundary_gpu(gpu_dom) for B in self._gpu_time_boundaries: q = B.get_boundary_values() set_time_boundary_values(gpu_dom, float(q[0]), float(q[1]), float(q[2])) evaluate_time_boundary_gpu(gpu_dom) set_file_boundary_values_from_domain(gpu_dom, self) evaluate_file_boundary_gpu(gpu_dom) for B in self._gpu_absorbing_wave_boundaries: value = B.get_boundary_values() try: wave_val = float(value) except (TypeError, ValueError): wave_val = float(value[0]) set_absorbing_wave_value(gpu_dom, wave_val) evaluate_absorbing_wave_boundary_gpu(gpu_dom) for B in self._gpu_characteristic_wave_boundaries: value = B.get_boundary_values() try: perturb = float(value) except (TypeError, ValueError): perturb = float(value[0]) set_characteristic_wave_value(gpu_dom, perturb) evaluate_characteristic_wave_boundary_gpu(gpu_dom) else: # Host evaluation of any non-GPU boundary type (e.g. # Transmissive_momentum_set_stage_boundary), then sync edge values # back to the device for the flux kernel. boundary_edge_sync(gpu_dom) for tag in self.tag_boundary_cells: B = self.boundary_map[tag] if B is not None: B.evaluate_segment(self, self.tag_boundary_cells[tag]) sync_boundary_values(gpu_dom) # Compute fluxes (sets flux_timestep = CFL flux step) self.flux_timestep = compute_fluxes_gpu(gpu_dom) # Forcing terms (friction) self.compute_forcing_terms() # Update timestep to fit yieldstep/finaltime (also records # recorded_min/max_timestep from the CFL constraint). self.update_timestep(yieldstep, finaltime) # Update conserved quantities update_conserved_quantities_gpu(gpu_dom, self.timestep) # Do NOT advance relative_time here — the evolve loop advances it after # apply_fractional_steps(); see _evolve_one_euler_step_c for why. # Post-step ghost exchange if self.ghost_layer_width < 4: exchange_ghosts(gpu_dom) def _evolve_one_euler_step_c(self, yieldstep, finaltime): """Euler step executed entirely in C - eliminates Python round-trip overhead. All kernel calls (extrapolation, boundaries, fluxes, forcing, update) happen in C with MPI timestep reduction also in C. Only one Python->C call per step. Limitations: - Only supports GPU-evaluated boundary types - Rate_operators must be applied separately (after this call) """ from anuga.shallow_water.sw_domain_gpu_ext import ( evolve_one_euler_step_gpu, set_transmissive_n_zero_t_stage, set_time_boundary_values, set_file_boundary_values_from_domain, set_absorbing_wave_value, set_characteristic_wave_value, set_flather_value, exchange_ghosts, ) gpu_dom = self.gpu_interface.gpu_dom GPU_BOUNDARY_TYPES = {'Reflective_boundary', 'Dirichlet_boundary', 'Transmissive_boundary', 'Transmissive_n_momentum_zero_t_momentum_set_stage_boundary', 'Time_boundary', 'File_boundary', 'Field_boundary', 'Absorbing_wave_boundary', 'Characteristic_wave_boundary', 'Flather_boundary'} if not hasattr(self, '_gpu_boundary_info_initialized'): self._gpu_cpu_tags = [] self._gpu_all_on_gpu = True cpu_boundary_types = [] self._gpu_transmissive_n_zero_t_boundaries = [] self._gpu_time_boundaries = [] self._gpu_absorbing_wave_boundaries = [] self._gpu_characteristic_wave_boundaries = [] self._gpu_flather_boundaries = [] for tag, B in self.boundary_map.items(): if B is not None: btype = B.__class__.__name__ if btype not in GPU_BOUNDARY_TYPES: self._gpu_cpu_tags.append(tag) self._gpu_all_on_gpu = False cpu_boundary_types.append((tag, btype)) elif btype == 'Transmissive_n_momentum_zero_t_momentum_set_stage_boundary': self._gpu_transmissive_n_zero_t_boundaries.append(B) elif btype == 'Time_boundary': self._gpu_time_boundaries.append(B) elif btype == 'Absorbing_wave_boundary': self._gpu_absorbing_wave_boundaries.append(B) elif btype == 'Characteristic_wave_boundary': self._gpu_characteristic_wave_boundaries.append(B) elif btype == 'Flather_boundary': self._gpu_flather_boundaries.append(B) if not self._gpu_all_on_gpu: print("WARNING: C Euler loop requires all GPU-supported boundary types") print(" Falling back to Python-orchestrated GPU loop") print(" Unsupported types: " + str(cpu_boundary_types)) self._gpu_boundary_info_initialized = True # Boundaries the C loop cannot evaluate on the device (e.g. # Transmissive_momentum_set_stage_boundary) are handled by the # Python-orchestrated fallback, which evaluates them on the host and # syncs — matching rk2/rk3/ader2. Without this, those boundaries are # silently ignored in DE0 and results diverge from legacy. if not self._gpu_all_on_gpu: return self._evolve_one_euler_step_gpu(yieldstep, finaltime) # Set time-dependent boundary values before calling C function for B in self._gpu_transmissive_n_zero_t_boundaries: stage_val = B.get_boundary_values() try: stage_val = float(stage_val) except (TypeError, ValueError): stage_val = float(stage_val[0]) set_transmissive_n_zero_t_stage(gpu_dom, stage_val) for B in self._gpu_time_boundaries: q = B.get_boundary_values() set_time_boundary_values(gpu_dom, float(q[0]), float(q[1]), float(q[2])) set_file_boundary_values_from_domain(gpu_dom, self) for B in self._gpu_absorbing_wave_boundaries: value = B.get_boundary_values() try: wave_val = float(value) except (TypeError, ValueError): wave_val = float(value[0]) set_absorbing_wave_value(gpu_dom, wave_val) for B in self._gpu_characteristic_wave_boundaries: value = B.get_boundary_values() try: perturb = float(value) except (TypeError, ValueError): perturb = float(value[0]) set_characteristic_wave_value(gpu_dom, perturb) for B in self._gpu_flather_boundaries: value = B.get_boundary_values() try: stage_val = float(value) except (TypeError, ValueError): stage_val = float(value[0]) set_flather_value(gpu_dom, stage_val) # Compute max allowed timestep (mirrors update_timestep() logic) # Time remaining to the next yield boundary. Use the explicitly tracked # relative_yieldtime (set and incremented by the evolve loop), NOT # (relative_time % yieldstep): the modulo is floating-point fragile, e.g. # 0.3 % 0.1 == 0.0999... gives remaining ~1e-17 -> max_timestep ~0 -> # the step never advances and evolve() spins. Mirrors update_timestep(). relative_yieldtime = getattr(self, 'relative_yieldtime', None) if relative_yieldtime is not None: remaining_yieldstep = relative_yieldtime - self.get_relative_time() else: remaining_yieldstep = yieldstep if finaltime is not None: remaining_finaltime = finaltime - self.get_time() max_timestep = min(self.evolve_max_timestep, remaining_yieldstep, remaining_finaltime) else: max_timestep = min(self.evolve_max_timestep, remaining_yieldstep) # Execute full Euler step in C (includes MPI timestep reduction) self.timestep = evolve_one_euler_step_gpu(gpu_dom, max_timestep, 1) # NOTE: do NOT advance relative_time here. The evolve loop advances it # (relative_time = initial_relative_time + timestep) AFTER # apply_fractional_steps(), so advancing it here makes time-dependent # operators (variable-Q inlet, time-varying rate, ...) see the time one # step too far — they would evaluate forcing at t+dt instead of t. # Record the CFL-constrained step (pre yield/final cap), matching legacy # update_timestep(), rather than the yield-limited step actually taken. cfl_dt = gpu_dom.recorded_flux_timestep self.recorded_max_timestep = max(cfl_dt, self.recorded_max_timestep) self.recorded_min_timestep = min(cfl_dt, self.recorded_min_timestep) # Post-step ghost exchange — update_ghosts() is a no-op in GPU mode if self.ghost_layer_width < 4: exchange_ghosts(gpu_dom) def _evolve_one_rk2_step_c(self, yieldstep, finaltime): """RK2 step executed entirely in C - eliminates Python round-trip overhead. This is faster than _evolve_one_rk2_step_gpu() because: - All kernel calls happen in C without Python round-trips - MPI reduction for timestep happens in C - Only one Python->C call per RK2 step Limitations: - Only supports GPU-evaluated boundary types - Rate_operators must be applied separately (after this call) """ from anuga.shallow_water.sw_domain_gpu_ext import ( evolve_one_rk2_step_gpu, set_transmissive_n_zero_t_stage, set_time_boundary_values, set_file_boundary_values_from_domain, set_absorbing_wave_value, set_characteristic_wave_value, set_flather_value, ) gpu_dom = self.gpu_interface.gpu_dom # Supported GPU boundary types GPU_BOUNDARY_TYPES = {'Reflective_boundary', 'Dirichlet_boundary', 'Transmissive_boundary', 'Transmissive_n_momentum_zero_t_momentum_set_stage_boundary', 'Time_boundary', 'File_boundary', 'Field_boundary', 'Absorbing_wave_boundary', 'Characteristic_wave_boundary', 'Flather_boundary'} # Lazy init: identify which boundaries need special handling if not hasattr(self, '_gpu_boundary_info_initialized'): self._gpu_cpu_tags = [] self._gpu_all_on_gpu = True cpu_boundary_types = [] self._gpu_transmissive_n_zero_t_boundaries = [] self._gpu_time_boundaries = [] self._gpu_absorbing_wave_boundaries = [] self._gpu_characteristic_wave_boundaries = [] self._gpu_flather_boundaries = [] for tag, B in self.boundary_map.items(): if B is not None: btype = B.__class__.__name__ if btype not in GPU_BOUNDARY_TYPES: self._gpu_cpu_tags.append(tag) self._gpu_all_on_gpu = False cpu_boundary_types.append((tag, btype)) elif btype == 'Transmissive_n_momentum_zero_t_momentum_set_stage_boundary': self._gpu_transmissive_n_zero_t_boundaries.append(B) elif btype == 'Time_boundary': self._gpu_time_boundaries.append(B) elif btype == 'Absorbing_wave_boundary': self._gpu_absorbing_wave_boundaries.append(B) elif btype == 'Characteristic_wave_boundary': self._gpu_characteristic_wave_boundaries.append(B) elif btype == 'Flather_boundary': self._gpu_flather_boundaries.append(B) if not self._gpu_all_on_gpu: print("WARNING: C RK2 loop requires all GPU-supported boundary types") print(" Falling back to Python-orchestrated GPU loop") print(f" Unsupported types: {cpu_boundary_types}") self._gpu_boundary_info_initialized = True # If any boundary requires CPU, fall back to Python-orchestrated loop if not self._gpu_all_on_gpu: return self._evolve_one_rk2_step_gpu(yieldstep, finaltime) # Set time-dependent boundary values BEFORE calling C function # Python function calls are cheap (~microseconds) for B in self._gpu_transmissive_n_zero_t_boundaries: stage_val = B.get_boundary_values() try: stage_val = float(stage_val) except (TypeError, ValueError): stage_val = float(stage_val[0]) set_transmissive_n_zero_t_stage(gpu_dom, stage_val) for B in self._gpu_time_boundaries: q = B.get_boundary_values() set_time_boundary_values(gpu_dom, float(q[0]), float(q[1]), float(q[2])) set_file_boundary_values_from_domain(gpu_dom, self) for B in self._gpu_absorbing_wave_boundaries: value = B.get_boundary_values() try: wave_val = float(value) except (TypeError, ValueError): wave_val = float(value[0]) set_absorbing_wave_value(gpu_dom, wave_val) for B in self._gpu_characteristic_wave_boundaries: value = B.get_boundary_values() try: perturb = float(value) except (TypeError, ValueError): perturb = float(value[0]) set_characteristic_wave_value(gpu_dom, perturb) for B in self._gpu_flather_boundaries: value = B.get_boundary_values() try: stage_val = float(value) except (TypeError, ValueError): stage_val = float(value[0]) set_flather_value(gpu_dom, stage_val) # Compute max allowed timestep (respecting yieldstep and finaltime) # This mirrors the logic in update_timestep() # Time remaining to the next yield boundary. Use the explicitly tracked # relative_yieldtime (set and incremented by the evolve loop), NOT # (relative_time % yieldstep): the modulo is floating-point fragile, e.g. # 0.3 % 0.1 == 0.0999... gives remaining ~1e-17 -> max_timestep ~0 -> # the step never advances and evolve() spins. Mirrors update_timestep(). relative_yieldtime = getattr(self, 'relative_yieldtime', None) if relative_yieldtime is not None: remaining_yieldstep = relative_yieldtime - self.get_relative_time() else: remaining_yieldstep = yieldstep if finaltime is not None: remaining_finaltime = finaltime - self.get_time() max_timestep = min(self.evolve_max_timestep, remaining_yieldstep, remaining_finaltime) else: max_timestep = min(self.evolve_max_timestep, remaining_yieldstep) # Execute full RK2 step in C (includes MPI timestep reduction) # apply_forcing=1 enables Manning friction on GPU self.timestep = evolve_one_rk2_step_gpu(gpu_dom, max_timestep, 1) # Do NOT advance relative_time here — the evolve loop does it after # apply_fractional_steps(); see _evolve_one_euler_step_c for why. # Record the CFL-constrained step (pre yield/final cap), matching legacy # update_timestep(), rather than the yield-limited step actually taken. cfl_dt = gpu_dom.recorded_flux_timestep self.recorded_max_timestep = max(cfl_dt, self.recorded_max_timestep) self.recorded_min_timestep = min(cfl_dt, self.recorded_min_timestep) # Post-step ghost exchange — update_ghosts() is a no-op in GPU mode if self.ghost_layer_width < 4: from anuga.shallow_water.sw_domain_gpu_ext import exchange_ghosts exchange_ghosts(gpu_dom) def _evolve_one_rk3_step_gpu(self, yieldstep, finaltime): """Python-orchestrated GPU implementation of SSP-RK3 step. This is a fallback for when the C RK3 loop cannot be used (e.g., unsupported boundary types). Prefer _evolve_one_rk3_step_c() for better performance. """ from anuga.shallow_water.sw_domain_gpu_ext import ( backup_conserved_quantities_gpu, extrapolate_second_order_gpu, protect_gpu, compute_fluxes_gpu, update_conserved_quantities_gpu, saxpy_conserved_quantities_gpu, saxpy3_conserved_quantities_gpu, sync_boundary_values, init_boundary_edge_sync, boundary_edge_sync, exchange_ghosts, evaluate_reflective_boundary_gpu, evaluate_dirichlet_boundary_gpu, evaluate_transmissive_boundary_gpu, set_transmissive_n_zero_t_stage, evaluate_transmissive_n_zero_t_boundary_gpu, set_time_boundary_values, evaluate_time_boundary_gpu, set_file_boundary_values_from_domain, evaluate_file_boundary_gpu, set_absorbing_wave_value, evaluate_absorbing_wave_boundary_gpu, set_characteristic_wave_value, evaluate_characteristic_wave_boundary_gpu, set_flather_value, evaluate_flather_boundary_gpu, ) import numpy as np gpu_dom = self.gpu_interface.gpu_dom # Supported GPU boundary types GPU_BOUNDARY_TYPES = {'Reflective_boundary', 'Dirichlet_boundary', 'Transmissive_boundary', 'Transmissive_n_momentum_zero_t_momentum_set_stage_boundary', 'Time_boundary', 'File_boundary', 'Field_boundary', 'Absorbing_wave_boundary', 'Characteristic_wave_boundary'} # Lazy init: identify which boundaries need CPU evaluation vs GPU if not hasattr(self, '_gpu_boundary_info_initialized'): self._gpu_cpu_tags = [] self._gpu_all_on_gpu = True cpu_boundary_types = [] self._gpu_transmissive_n_zero_t_boundaries = [] self._gpu_time_boundaries = [] self._gpu_absorbing_wave_boundaries = [] self._gpu_characteristic_wave_boundaries = [] for tag, B in self.boundary_map.items(): if B is not None: btype = B.__class__.__name__ if btype not in GPU_BOUNDARY_TYPES: self._gpu_cpu_tags.append(tag) self._gpu_all_on_gpu = False cpu_boundary_types.append((tag, btype)) elif btype == 'Transmissive_n_momentum_zero_t_momentum_set_stage_boundary': self._gpu_transmissive_n_zero_t_boundaries.append(B) elif btype == 'Time_boundary': self._gpu_time_boundaries.append(B) elif btype == 'Absorbing_wave_boundary': self._gpu_absorbing_wave_boundaries.append(B) elif btype == 'Characteristic_wave_boundary': self._gpu_characteristic_wave_boundaries.append(B) if not self._gpu_all_on_gpu: boundary_cell_ids = np.unique(self.boundary_cells).astype(np.intc) init_boundary_edge_sync(gpu_dom, boundary_cell_ids) print("WARNING: GPU boundary evaluation disabled - falling back to CPU") print(f" Unsupported boundary types: {cpu_boundary_types}") self._gpu_boundary_info_initialized = True def _eval_boundaries(): """Evaluate all boundary conditions on GPU (or CPU fallback).""" if self._gpu_all_on_gpu: evaluate_reflective_boundary_gpu(gpu_dom) evaluate_dirichlet_boundary_gpu(gpu_dom) evaluate_transmissive_boundary_gpu(gpu_dom) for B in self._gpu_transmissive_n_zero_t_boundaries: stage_val = B.get_boundary_values() try: stage_val = float(stage_val) except (TypeError, ValueError): stage_val = float(stage_val[0]) set_transmissive_n_zero_t_stage(gpu_dom, stage_val) evaluate_transmissive_n_zero_t_boundary_gpu(gpu_dom) for B in self._gpu_time_boundaries: q = B.get_boundary_values() set_time_boundary_values(gpu_dom, float(q[0]), float(q[1]), float(q[2])) evaluate_time_boundary_gpu(gpu_dom) set_file_boundary_values_from_domain(gpu_dom, self) evaluate_file_boundary_gpu(gpu_dom) for B in self._gpu_absorbing_wave_boundaries: value = B.get_boundary_values() try: wave_val = float(value) except (TypeError, ValueError): wave_val = float(value[0]) set_absorbing_wave_value(gpu_dom, wave_val) evaluate_absorbing_wave_boundary_gpu(gpu_dom) for B in self._gpu_characteristic_wave_boundaries: value = B.get_boundary_values() try: perturb = float(value) except (TypeError, ValueError): perturb = float(value[0]) set_characteristic_wave_value(gpu_dom, perturb) evaluate_characteristic_wave_boundary_gpu(gpu_dom) for B in self._gpu_flather_boundaries: value = B.get_boundary_values() try: stage_val = float(value) except (TypeError, ValueError): stage_val = float(value[0]) set_flather_value(gpu_dom, stage_val) evaluate_flather_boundary_gpu(gpu_dom) else: boundary_edge_sync(gpu_dom) for tag in self.tag_boundary_cells: B = self.boundary_map[tag] if B is not None: B.evaluate_segment(self, self.tag_boundary_cells[tag]) sync_boundary_values(gpu_dom) initial_relative_time = self.get_relative_time() # Backup Q^n backup_conserved_quantities_gpu(gpu_dom) # ========================================== # Stage 1: Q^(1) = Q^n + h*L(Q^n) # ========================================== protect_gpu(gpu_dom) extrapolate_second_order_gpu(gpu_dom) _eval_boundaries() self.flux_timestep = compute_fluxes_gpu(gpu_dom) # Forcing terms self.compute_forcing_terms() # Update timestep self.update_timestep(yieldstep, finaltime) update_conserved_quantities_gpu(gpu_dom, self.timestep) self.set_relative_time(initial_relative_time + self.timestep) if self.ghost_layer_width < 4: exchange_ghosts(gpu_dom) # ========================================== # Stage 2: Q^(2) = Q^(1) + h*L(Q^(1)) # ========================================== protect_gpu(gpu_dom) extrapolate_second_order_gpu(gpu_dom) _eval_boundaries() compute_fluxes_gpu(gpu_dom) self.compute_forcing_terms() update_conserved_quantities_gpu(gpu_dom, self.timestep) # Intermediate: Q = 0.25*Q^(2) + 0.75*Q^n saxpy_conserved_quantities_gpu(gpu_dom, 0.25, 0.75) self.set_relative_time(initial_relative_time + self.timestep * 0.5) if self.ghost_layer_width < 4: exchange_ghosts(gpu_dom) # ========================================== # Stage 3: Q^(3) = Q^(1)_mid + h*L(Q^(1)_mid) # ========================================== protect_gpu(gpu_dom) extrapolate_second_order_gpu(gpu_dom) _eval_boundaries() compute_fluxes_gpu(gpu_dom) self.compute_forcing_terms() update_conserved_quantities_gpu(gpu_dom, self.timestep) # Final: Q^{n+1} = (2*Q^(3) + Q^n) / 3 saxpy3_conserved_quantities_gpu(gpu_dom, 2.0, 1.0, 3.0) self.set_relative_time(initial_relative_time + self.timestep) # Post-step ghost exchange — update_ghosts() is a no-op in GPU mode if self.ghost_layer_width < 4: exchange_ghosts(gpu_dom) def _evolve_one_rk3_step_c(self, yieldstep, finaltime): """RK3 step executed entirely in C - eliminates Python round-trip overhead. This is faster than _evolve_one_rk3_step_gpu() because: - All kernel calls happen in C without Python round-trips - MPI reduction for timestep happens in C - Only one Python->C call per RK3 step Limitations: - Only supports GPU-evaluated boundary types - Rate_operators must be applied separately (after this call) """ from anuga.shallow_water.sw_domain_gpu_ext import ( evolve_one_rk3_step_gpu, set_transmissive_n_zero_t_stage, set_time_boundary_values, set_file_boundary_values_from_domain, set_absorbing_wave_value, set_characteristic_wave_value, set_flather_value, ) gpu_dom = self.gpu_interface.gpu_dom # Supported GPU boundary types GPU_BOUNDARY_TYPES = {'Reflective_boundary', 'Dirichlet_boundary', 'Transmissive_boundary', 'Transmissive_n_momentum_zero_t_momentum_set_stage_boundary', 'Time_boundary', 'File_boundary', 'Field_boundary', 'Absorbing_wave_boundary', 'Characteristic_wave_boundary', 'Flather_boundary'} # Lazy init: identify which boundaries need special handling if not hasattr(self, '_gpu_boundary_info_initialized'): self._gpu_cpu_tags = [] self._gpu_all_on_gpu = True cpu_boundary_types = [] self._gpu_transmissive_n_zero_t_boundaries = [] self._gpu_time_boundaries = [] self._gpu_absorbing_wave_boundaries = [] self._gpu_characteristic_wave_boundaries = [] self._gpu_flather_boundaries = [] for tag, B in self.boundary_map.items(): if B is not None: btype = B.__class__.__name__ if btype not in GPU_BOUNDARY_TYPES: self._gpu_cpu_tags.append(tag) self._gpu_all_on_gpu = False cpu_boundary_types.append((tag, btype)) elif btype == 'Transmissive_n_momentum_zero_t_momentum_set_stage_boundary': self._gpu_transmissive_n_zero_t_boundaries.append(B) elif btype == 'Time_boundary': self._gpu_time_boundaries.append(B) elif btype == 'Absorbing_wave_boundary': self._gpu_absorbing_wave_boundaries.append(B) elif btype == 'Characteristic_wave_boundary': self._gpu_characteristic_wave_boundaries.append(B) elif btype == 'Flather_boundary': self._gpu_flather_boundaries.append(B) if not self._gpu_all_on_gpu: print("WARNING: C RK3 loop requires all GPU-supported boundary types") print(" Falling back to Python-orchestrated GPU loop") print(f" Unsupported types: {cpu_boundary_types}") self._gpu_boundary_info_initialized = True # If any boundary requires CPU, fall back to Python-orchestrated loop if not self._gpu_all_on_gpu: return self._evolve_one_rk3_step_gpu(yieldstep, finaltime) # Set time-dependent boundary values BEFORE calling C function for B in self._gpu_transmissive_n_zero_t_boundaries: stage_val = B.get_boundary_values() try: stage_val = float(stage_val) except (TypeError, ValueError): stage_val = float(stage_val[0]) set_transmissive_n_zero_t_stage(gpu_dom, stage_val) for B in self._gpu_time_boundaries: q = B.get_boundary_values() set_time_boundary_values(gpu_dom, float(q[0]), float(q[1]), float(q[2])) set_file_boundary_values_from_domain(gpu_dom, self) for B in self._gpu_absorbing_wave_boundaries: value = B.get_boundary_values() try: wave_val = float(value) except (TypeError, ValueError): wave_val = float(value[0]) set_absorbing_wave_value(gpu_dom, wave_val) for B in self._gpu_characteristic_wave_boundaries: value = B.get_boundary_values() try: perturb = float(value) except (TypeError, ValueError): perturb = float(value[0]) set_characteristic_wave_value(gpu_dom, perturb) for B in self._gpu_flather_boundaries: value = B.get_boundary_values() try: stage_val = float(value) except (TypeError, ValueError): stage_val = float(value[0]) set_flather_value(gpu_dom, stage_val) # Compute max allowed timestep (respecting yieldstep and finaltime) # Time remaining to the next yield boundary. Use the explicitly tracked # relative_yieldtime (set and incremented by the evolve loop), NOT # (relative_time % yieldstep): the modulo is floating-point fragile, e.g. # 0.3 % 0.1 == 0.0999... gives remaining ~1e-17 -> max_timestep ~0 -> # the step never advances and evolve() spins. Mirrors update_timestep(). relative_yieldtime = getattr(self, 'relative_yieldtime', None) if relative_yieldtime is not None: remaining_yieldstep = relative_yieldtime - self.get_relative_time() else: remaining_yieldstep = yieldstep if finaltime is not None: remaining_finaltime = finaltime - self.get_time() max_timestep = min(self.evolve_max_timestep, remaining_yieldstep, remaining_finaltime) else: max_timestep = min(self.evolve_max_timestep, remaining_yieldstep) # Execute full RK3 step in C (includes MPI timestep reduction) # apply_forcing=1 enables Manning friction on GPU self.timestep = evolve_one_rk3_step_gpu(gpu_dom, max_timestep, 1) # Update internal time tracking self.set_relative_time(self.get_relative_time() + self.timestep) # Record the CFL-constrained step (pre yield/final cap), matching legacy # update_timestep(), rather than the yield-limited step actually taken. cfl_dt = gpu_dom.recorded_flux_timestep self.recorded_max_timestep = max(cfl_dt, self.recorded_max_timestep) self.recorded_min_timestep = min(cfl_dt, self.recorded_min_timestep) # Post-step ghost exchange — update_ghosts() is a no-op in GPU mode if self.ghost_layer_width < 4: from anuga.shallow_water.sw_domain_gpu_ext import exchange_ghosts exchange_ghosts(gpu_dom) def evolve_one_rk3_step(self, yieldstep, finaltime): """One 3rd order RK timestep Q^(1) = 3/4 Q^n + 1/4 E(h)^2 Q^n (at time t^n + h/2) Q^{n+1} = 1/3 Q^n + 2/3 E(h) Q^(1) (at time t^{n+1}) Does not assume that centroid values have been extrapolated to vertices and edges """ # GPU mode: use C RK loop (faster) or Python-orchestrated GPU loop if self.multiprocessor_mode == MULTIPROCESSOR_GPU and self.gpu_interface is not None: if self.use_c_rk_loop: self._evolve_one_rk3_step_c(yieldstep, finaltime) else: self._evolve_one_rk3_step_gpu(yieldstep, finaltime) return # Save initial initial conserved quantities values self.backup_conserved_quantities() initial_relative_time = self.get_relative_time() ###### # First euler step ###### # From centroid values calculate edge values self.distribute_to_vertices_and_edges(distribute_to_vertices=False) # Apply boundary conditions self.update_boundary() # Compute fluxes across each element edge # In MPI parallel mode this involves an allreduce to find global minimal timestep self.compute_fluxes() # Compute forcing terms (friction) self.compute_forcing_terms() # Update timestep to fit yieldstep and finaltime self.update_timestep(yieldstep, finaltime) # Update conserved quantities self.update_conserved_quantities() #==================================== # End of first euler step #==================================== # Update time self.set_relative_time(self.relative_time+ self.timestep) # Update ghosts self.update_ghosts() #============================================ # Second Euler step using the same timestep # calculated in the first step. Might lead to # stability problems but we have not seen any # example. #============================================ # Update edge values self.distribute_to_vertices_and_edges(distribute_to_vertices=False) # Update boundary values self.update_boundary() # Compute fluxes across each element edge # In MPI parallel mode this involves an allreduce to find global minimal timestep self.compute_fluxes() # Compute forcing terms (friction) self.compute_forcing_terms() # Update conserved quantities using timestep from first step self.update_conserved_quantities() #============================================ # End of second euler step #============================================ #============================================ # Combine steps to obtain intermediate # solution at time t^n + 0.5 h #============================================ # Combine steps self.saxpy_conserved_quantities(0.25, 0.75) # Set substep time self.set_relative_time(initial_relative_time + self.timestep * 0.5) # Update ghosts self.update_ghosts() ###### # Third Euler step ###### # Update edge values self.distribute_to_vertices_and_edges(distribute_to_vertices=False) # Update boundary values self.update_boundary() # Compute fluxes across each element edge # In MPI parallel mode this involves an allreduce to find global minimal timestep self.compute_fluxes() # Compute forcing terms (friction) self.compute_forcing_terms() # Update conserved quantities using timestep from first step self.update_conserved_quantities() #======================================= # Combine final and initial values # and cleanup #======================================= # self.saxpy_conserved_quantities(2.0/3.0, 1.0/3.0) # This caused a roundoff error that created negative water heights # So do this instead! self.saxpy_conserved_quantities(2.0, 1.0, 3.0) # Set new time self.set_relative_time(initial_relative_time + self.timestep) def backup_conserved_quantities(self): # Backup conserved_quantities centroid values if self.multiprocessor_mode == MULTIPROCESSOR_OPENMP: from .sw_domain_openmp_ext import backup_conserved_quantities backup_conserved_quantities(self) elif self.multiprocessor_mode == MULTIPROCESSOR_GPU: self.gpu_interface.backup_conserved_quantities_kernel(self) else: for name in self.conserved_quantities: Q = self.quantities[name] Q.backup_centroid_values() def saxpy_conserved_quantities(self, a, b, c=None): # saxpy conserved_quantities centroid values with backup values if self.multiprocessor_mode == MULTIPROCESSOR_OPENMP: if c is None: c = 1.0 from .sw_domain_openmp_ext import saxpy_conserved_quantities saxpy_conserved_quantities(self, a, b, c) elif self.multiprocessor_mode == MULTIPROCESSOR_GPU: if c is not None: from anuga.shallow_water.sw_domain_gpu_ext import saxpy3_conserved_quantities_gpu saxpy3_conserved_quantities_gpu(self.gpu_interface.gpu_dom, a, b, c) else: self.gpu_interface.saxpy_conserved_quantities_kernel(self, a, b) else: for name in self.conserved_quantities: Q = self.quantities[name] Q.saxpy_centroid_values(a, b) if c is not None: Q.centroid_values[:] = Q.centroid_values / c def _has_cpu_only_fractional_operators(self): """Check if any fractional step operators require CPU execution. Rate_operators with GPU support don't need CPU sync. boundary_flux_integral_operator is GPU-safe (only reads boundary_flux_sum). Boyd_box_operator/Boyd_pipe_operator are GPU-safe via GPUCulvertManager. Inlet_operator with GPU support doesn't need CPU sync. Result is cached after first call since operators don't change during simulation. """ # Check cache first if hasattr(self, '_cached_has_cpu_only_ops'): return self._cached_has_cpu_only_ops from anuga.operators.rate_operators import Rate_operator from anuga.operators.boundary_flux_integral_operator import boundary_flux_integral_operator from anuga.structures.inlet_operator import Inlet_operator from anuga.structures.gpu_culvert_manager import GPUCulvertManager from anuga.operators.collect_max_quantities_operator import Collect_max_quantities_operator # Initialize GPU culvert manager for Boyd operators if needed has_boyd_ops = any(GPUCulvertManager.is_boyd_operator(op) for op in self.fractional_step_operators) if has_boyd_ops and self.gpu_culvert_manager is None: self.gpu_culvert_manager = GPUCulvertManager(self) self.gpu_culvert_manager.register_all() result = False cpu_only_ops = [] for op in self.fractional_step_operators: op_name = op.__class__.__name__ if isinstance(op, Rate_operator): # Force GPU initialization if not already done (lazy init causes race with caching) if hasattr(op, '_init_gpu') and not getattr(op, '_gpu_initialized', False): op._init_gpu() # Rate_operator with GPU support doesn't need CPU sync if hasattr(op, '_gpu_initialized') and op._gpu_initialized: continue # GPU-accelerated, no sync needed elif isinstance(op, boundary_flux_integral_operator): # boundary_flux_integral_operator only reads boundary_flux_sum (small array) # and accumulates a scalar - doesn't need full centroid sync continue elif isinstance(op, Inlet_operator): # Force GPU initialization if not already done if hasattr(op, '_init_gpu') and not getattr(op, '_gpu_initialized', False): op._init_gpu() if hasattr(op, '_gpu_initialized') and op._gpu_initialized: continue # GPU-accelerated, no sync needed elif isinstance(op, Collect_max_quantities_operator): # GPU kernel reads device-resident quantities and updates device-resident max # arrays; no host centroid sync needed. if hasattr(op, '_init_gpu') and not getattr(op, '_gpu_initialized', False): op._init_gpu() if hasattr(op, '_gpu_initialized') and op._gpu_initialized: continue # GPU-accelerated, no sync needed elif GPUCulvertManager.is_boyd_operator(op): # Handled by GPUCulvertManager (local + cross-boundary via MPI in C) if (self.gpu_culvert_manager is not None and op in self.gpu_culvert_manager.operators): continue # All other operators need CPU sync cpu_only_ops.append(op_name) result = True rank = getattr(self, 'processor', 0) if result: # Only print GPU sync warning if GPU offload is actually active if getattr(self, 'gpu_offload_active', True): print(f"[Rank {rank}] WARNING: CPU-only fractional operators detected (GPU<->CPU sync every RK2 step):") for name in cpu_only_ops: print(f" - {name}") else: # Only print GPU-safe message if GPU offload is actually active if rank == 0 and getattr(self, 'gpu_offload_active', True): print(f"[Rank {rank}] All fractional operators are GPU-safe, no GPU<->CPU sync needed") import sys; sys.stdout.flush() self._cached_has_cpu_only_ops = result return result def apply_fractional_steps(self): """Override to sync GPU data before fractional step operators run. Boyd culvert operators are handled via GPUCulvertManager (batched, only 2 GPU sync points) instead of the per-operator Python loop. """ gpu_mode = (self.multiprocessor_mode == MULTIPROCESSOR_GPU and self.gpu_interface is not None) gpu_culverts_active = (gpu_mode and self.gpu_culvert_manager is not None and self.gpu_culvert_manager.num_culverts > 0) needs_cpu_sync = False if gpu_mode: needs_cpu_sync = self._has_cpu_only_fractional_operators() if needs_cpu_sync: self.gpu_interface.sync_from_device() # Execute all Boyd culverts in one batched GPU cycle if gpu_culverts_active: self.gpu_culvert_manager.apply_all() # Run remaining operators (skip Boyd operators handled by GPU manager) for operator in self.fractional_step_operators: if gpu_culverts_active and operator in self.gpu_culvert_manager.operators: continue # Already handled by GPUCulvertManager operator() if gpu_mode and needs_cpu_sync: self.gpu_interface.sync_to_device() def update_ghosts(self, quantities=None): """Override to use GPU ghost exchange when in GPU mode.""" if self.multiprocessor_mode == MULTIPROCESSOR_GPU and self.gpu_interface is not None: # GPU RK2 loop handles ghost exchange internally via exchange_ghosts(gpu_dom) # Don't do another exchange here - it would cause MPI message conflicts pass else: # Fall back to parent implementation super().update_ghosts(quantities) def timestepping_statistics(self, track_speeds: bool = False, triangle_id: int | None = None, relative_time: bool = False, time_unit: str = 'sec', datetime: bool = False) -> str: """Return string with time stepping statistics for printing or logging Parameters ---------- time_units : str, optional Time units for reporting. Options are 'sec', 'min', 'hr', 'day'. datetime : bool, optional Flag to use timestamp or datetime. track_speeds : bool, optional Optional boolean keyword that decides whether to report location of smallest timestep as well as a histogram and percentile report. relative_time : bool, optional Flag to report relative time instead of absolute time. triangle_id : int, optional Can be used to specify a particular triangle rather than the one with the largest speed. Returns ------- str Formatted string with time stepping statistics. """ from anuga.config import epsilon, g # Call basic machinery from parent class msg = Generic_Domain.timestepping_statistics(self, track_speeds=track_speeds, triangle_id=triangle_id, relative_time=relative_time, time_unit=time_unit, datetime=datetime) if track_speeds is True and self.max_speed is not None: # qwidth determines the text field used for quantities qwidth = self.qwidth # Selected triangle k = self.k # Report some derived quantities at vertices, edges and centroid # specific to the shallow water wave equation z = self.quantities['elevation'] w = self.quantities['stage'] Vw = w.get_values(location='vertices', indices=[k])[0] Ew = w.get_values(location='edges', indices=[k])[0] Cw = w.get_values(location='centroids', indices=[k]) Vz = z.get_values(location='vertices', indices=[k])[0] Ez = z.get_values(location='edges', indices=[k])[0] Cz = z.get_values(location='centroids', indices=[k]) name = 'depth' Vh = Vw-Vz Eh = Ew-Ez Ch = Cw-Cz message = ' %s: vertex_values = %.4f,\t %.4f,\t %.4f\n'\ % (name.ljust(qwidth), Vh[0], Vh[1], Vh[2]) message += ' %s: edge_values = %.4f,\t %.4f,\t %.4f\n'\ % (name.ljust(qwidth), Eh[0], Eh[1], Eh[2]) message += ' %s: centroid_value = %.4f\n'\ % (name.ljust(qwidth), Ch[0]) msg += message uh = self.quantities['xmomentum'] vh = self.quantities['ymomentum'] Vuh = uh.get_values(location='vertices', indices=[k])[0] Euh = uh.get_values(location='edges', indices=[k])[0] Cuh = uh.get_values(location='centroids', indices=[k]) Vvh = vh.get_values(location='vertices', indices=[k])[0] Evh = vh.get_values(location='edges', indices=[k])[0] Cvh = vh.get_values(location='centroids', indices=[k]) # Speeds in each direction Vu = Vuh/(Vh + epsilon) Eu = Euh/(Eh + epsilon) Cu = Cuh/(Ch + epsilon) name = 'U' message = ' %s: vertex_values = %.4f,\t %.4f,\t %.4f\n' \ % (name.ljust(qwidth), Vu[0], Vu[1], Vu[2]) message += ' %s: edge_values = %.4f,\t %.4f,\t %.4f\n' \ % (name.ljust(qwidth), Eu[0], Eu[1], Eu[2]) message += ' %s: centroid_value = %.4f\n' \ % (name.ljust(qwidth), Cu[0]) msg += message Vv = Vvh/(Vh + epsilon) Ev = Evh/(Eh + epsilon) Cv = Cvh/(Ch + epsilon) name = 'V' message = ' %s: vertex_values = %.4f,\t %.4f,\t %.4f\n' \ % (name.ljust(qwidth), Vv[0], Vv[1], Vv[2]) message += ' %s: edge_values = %.4f,\t %.4f,\t %.4f\n' \ % (name.ljust(qwidth), Ev[0], Ev[1], Ev[2]) message += ' %s: centroid_value = %.4f\n'\ %(name.ljust(qwidth), Cv[0]) msg += message # Froude number in each direction name = 'Froude (x)' Vfx = Vu/(num.sqrt(g*Vh + epsilon)) Efx = Eu/(num.sqrt(g*Eh + epsilon)) Cfx = Cu/(num.sqrt(g*Ch + epsilon)) message = ' %s: vertex_values = %.4f,\t %.4f,\t %.4f\n'\ % (name.ljust(qwidth), Vfx[0], Vfx[1], Vfx[2]) message += ' %s: edge_values = %.4f,\t %.4f,\t %.4f\n'\ % (name.ljust(qwidth), Efx[0], Efx[1], Efx[2]) message += ' %s: centroid_value = %.4f\n'\ % (name.ljust(qwidth), Cfx[0]) msg += message name = 'Froude (y)' Vfy = Vv/(num.sqrt(g*Vh + epsilon)) Efy = Ev/(num.sqrt(g*Eh + epsilon)) Cfy = Cv/(num.sqrt(g*Ch + epsilon)) message = ' %s: vertex_values = %.4f,\t %.4f,\t %.4f\n'\ % (name.ljust(qwidth), Vfy[0], Vfy[1], Vfy[2]) message += ' %s: edge_values = %.4f,\t %.4f,\t %.4f\n'\ % (name.ljust(qwidth), Efy[0], Efy[1], Efy[2]) message += ' %s: centroid_value = %.4f\n'\ % (name.ljust(qwidth), Cfy[0]) msg += message return msg def print_timestepping_statistics(self, *args, **kwargs) -> None: """Print time stepping statistics. Parameters ---------- time_units : str, optional Time units for reporting. Options are 'sec', 'min', 'hr', 'day'. datetime : bool, optional Flag to use timestamp or datetime. track_speed : bool, optional Optional boolean keyword that decides whether to report location of smallest timestep as well as a histogram and percentile report. relative_time : bool, optional Flag to report relative time instead of absolute time. triangle_id : int, optional Can be used to specify a particular triangle rather than the one with the largest speed. """ msg = self.timestepping_statistics(*args, **kwargs) print(msg, flush=True) def compute_boundary_flows(self) -> tuple[dict[str, float], float, float]: """Compute boundary flows at current timestep. Computes the total inflow and outflow across the domain boundary, as well as the flow across each tagged boundary segment. Returns ------- boundary_flows : dict Flow rates [m^3/s] for each boundary tag total_boundary_inflow : float Total inflow across boundary [m^3/s] total_boundary_outflow : float Total outflow across boundary [m^3/s] Notes ----- These calculations are only approximate since they don't use the flux calculation used in evolve. For exact computation, see get_boundary_flux_integral. """ # Run through boundary array and compute for each segment # the normal momentum ((uh, vh) dot normal) times segment length. # Based on sign accumulate this into boundary_inflow and # boundary_outflow. # Compute flows along boundary uh = self.get_quantity('xmomentum').get_values(location='edges') vh = self.get_quantity('ymomentum').get_values(location='edges') # Loop through edges that lie on the boundary and calculate # flows boundary_flows = {} total_boundary_inflow = 0.0 total_boundary_outflow = 0.0 for vol_id, edge_id in self.boundary: # Compute normal flow across edge. Since normal vector points # away from triangle, a positive sign means that water # flows *out* from this triangle. momentum = [uh[vol_id, edge_id], vh[vol_id, edge_id]] normal = self.mesh.get_normal(vol_id, edge_id) length = self.mesh.get_edgelength(vol_id, edge_id) normal_flow = num.dot(momentum, normal)*length # Reverse sign so that + is taken to mean inflow # and - means outflow. This is more intuitive. edge_flow = -normal_flow # Tally up inflows and outflows separately if edge_flow > 0: # Flow is inflow total_boundary_inflow += edge_flow else: # Flow is outflow total_boundary_outflow += edge_flow # Tally up flows by boundary tag tag = self.boundary[(vol_id, edge_id)] if tag not in boundary_flows: boundary_flows[tag] = 0.0 boundary_flows[tag] += edge_flow return boundary_flows, total_boundary_inflow, total_boundary_outflow def compute_total_volume(self) -> float: """ Compute total volume (m^3) of water in entire domain """ return self.get_water_volume() def volumetric_balance_statistics(self) -> str: """Create volumetric balance report suitable for printing or logging. """ (boundary_flows, total_boundary_inflow, total_boundary_outflow) = self.compute_boundary_flows() message = '---------------------------\n' message += 'Volumetric balance report:\n' message += 'Note: Boundary fluxes are not exact\n' message += 'See get_boundary_flux_integral for exact computation\n' message += '--------------------------\n' message += 'Total boundary inflow [m^3/s]: %.2f\n' % total_boundary_inflow message += 'Total boundary outflow [m^3/s]: %.2f\n' % total_boundary_outflow message += 'Net boundary flow by tags [m^3/s]\n' for tag in boundary_flows: message += ' %s [m^3/s]: %.2f\n' % (tag, boundary_flows[tag]) message += 'Total net boundary flow [m^3/s]: %.2f\n' % \ (total_boundary_inflow + total_boundary_outflow) message += 'Total volume in domain [m^3]: %.2f\n' % \ self.compute_total_volume() # The go through explicit forcing update and record the rate of change # for stage and # record into forcing_inflow and forcing_outflow. Finally compute # integral of depth to obtain total volume of domain. # FIXME(Ole): This part is not yet done. return message def print_volumetric_balance_statistics(self) -> None: print (self.volumetric_balance_statistics()) def report_water_volume_statistics(self, verbose: bool = True, returnStats: bool = False) -> list[float] | None: """ Compute the volume, boundary flux integral, fractional step volume integral, and their difference If verbose, print a summary If returnStats, return a list with the volume statistics """ from anuga import myid if(self.compute_fluxes_method != 'DE'): if(myid == 0): print('Water_volume_statistics only supported for DE algorithm ') return # Compute the volume Vol = self.get_water_volume() # Compute the boundary flux integral fluxIntegral=self.get_boundary_flux_integral() fracIntegral=self.get_fractional_step_volume_integral() if(verbose and myid==0): print(' ') print(f' Volume V at time {self.get_time()}:', Vol) print(' Boundary Flux integral BF: ', fluxIntegral) print(' (rate + inlet) Fractional Step volume integral FS: ', fracIntegral) print(' V - BF - FS - InitialVolume :', Vol- fluxIntegral -fracIntegral - self.volume_history[0]) print(' ') if returnStats: return [Vol, fluxIntegral, fracIntegral] else: return def report_cells_with_small_local_timestep(self, threshold_depth: float | None = None) -> None: """ Convenience function to print the locations of cells with a small local timestep. Computations are at cell centroids Useful in models with complex meshes, to find ways to speed up the model """ from anuga.parallel import myid, numprocs from anuga.config import g, epsilon if threshold_depth is None: threshold_depth=self.minimum_allowed_height uh = self.quantities['xmomentum'].centroid_values vh = self.quantities['ymomentum'].centroid_values d = self.quantities['stage'].centroid_values - self.quantities['elevation'].centroid_values d = num.maximum(d, threshold_depth) v = ((uh)**2 + (vh)**2)**0.5/d v = v * (d > threshold_depth) for i in range(numprocs): if myid == i: print(' Processor ', myid) gravSpeed = (g * d)**0.5 waveSpeed = abs(v) + gravSpeed localTS = self.radii / num.maximum(waveSpeed, epsilon) controlling_pt_ind = localTS.argmin() print(f' * Smallest LocalTS at time {self.get_time()} is approximately: ', localTS[controlling_pt_ind]) print(' -- Location: ', round(self.centroid_coordinates[controlling_pt_ind,0]+self.geo_reference.xllcorner,2),\ round(self.centroid_coordinates[controlling_pt_ind,1]+self.geo_reference.yllcorner,2)) print(' -+ Speed: ', v[controlling_pt_ind]) print(' -* Gravity_wave_speed', gravSpeed[controlling_pt_ind]) print(' ') barrier() return def diagnose_timestep(self, threshold_dt: float | None = None, top_n: int = 5, threshold_depth: float | None = None, verbose: bool = True) -> dict: """Diagnose CFL-limited timestep and NaN/inf in conserved quantities. Computes the CFL-limiting dt per centroid (radius / wave_speed) and reports the cells driving the global minimum. Also scans stage, xmomentum and ymomentum for NaN / inf, which is the usual signature of an ADER-2 (or any) timestepping blow-up. Intended to be called from inside an evolve loop (e.g. each yieldstep) to localise where and when the solver is about to fail. Parameters ---------- threshold_dt : float, optional If given, a WARNING is emitted whenever the global min local-dt falls below this value. Useful to catch the moment dt collapses. top_n : int Number of worst (smallest-dt) cells per rank to include in the returned dict and printed report. threshold_depth : float, optional Depths below this are clamped when computing speed, to avoid division-by-tiny. Defaults to self.minimum_allowed_height. verbose : bool If True, print a per-rank report. Returns ------- dict Keys: time relative simulation time current_timestep self.timestep (last accepted global dt) cfl self.CFL setting local_min_dt smallest cell-local CFL dt on this rank global_min_dt smallest cell-local CFL dt across all ranks local_max_speed max wave_speed on this rank global_max_speed max wave_speed across all ranks nan_counts {'stage': n, 'xmomentum': n, 'ymomentum': n} first_nan_cell centroid xy of the first NaN/inf cell on this rank, or None worst_cells list of dicts (top_n) with keys {triangle, x, y, depth, speed, local_dt} """ from anuga import numprocs from anuga.config import g, epsilon if threshold_depth is None: threshold_depth = self.minimum_allowed_height stage_c = self.quantities['stage'].centroid_values elev_c = self.quantities['elevation'].centroid_values uh_c = self.quantities['xmomentum'].centroid_values vh_c = self.quantities['ymomentum'].centroid_values bad_stage = ~num.isfinite(stage_c) bad_uh = ~num.isfinite(uh_c) bad_vh = ~num.isfinite(vh_c) nan_counts = { 'stage': int(bad_stage.sum()), 'xmomentum': int(bad_uh.sum()), 'ymomentum': int(bad_vh.sum()), } first_nan_cell = None any_bad = bad_stage | bad_uh | bad_vh if any_bad.any(): i = int(num.argmax(any_bad)) x = self.centroid_coordinates[i, 0] + self.geo_reference.xllcorner y = self.centroid_coordinates[i, 1] + self.geo_reference.yllcorner first_nan_cell = (float(x), float(y), i) # Local CFL dt per cell. Mask NaNs out so argmin/min are meaningful. d = stage_c - elev_c d_safe = num.where(num.isfinite(d), num.maximum(d, threshold_depth), threshold_depth) speed_h = num.sqrt(uh_c * uh_c + vh_c * vh_c) / d_safe speed_h = num.where(num.isfinite(speed_h), speed_h, 0.0) speed_h = speed_h * (d > threshold_depth) grav_speed = num.sqrt(g * d_safe) wave_speed = num.abs(speed_h) + grav_speed local_dt = self.radii / num.maximum(wave_speed, epsilon) # Don't let NaN cells dominate the argmin local_dt = num.where(num.isfinite(local_dt), local_dt, num.inf) local_min_dt = float(local_dt.min()) local_max_speed = float(num.nanmax(wave_speed)) if wave_speed.size else 0.0 order = num.argsort(local_dt)[:max(top_n, 1)] worst_cells = [] for idx in order: i = int(idx) worst_cells.append({ 'triangle': i, 'x': float(self.centroid_coordinates[i, 0] + self.geo_reference.xllcorner), 'y': float(self.centroid_coordinates[i, 1] + self.geo_reference.yllcorner), 'depth': float(d[i]), 'speed': float(wave_speed[i]), 'local_dt': float(local_dt[i]), }) if numprocs > 1: from mpi4py import MPI comm = MPI.COMM_WORLD global_min_dt = comm.allreduce(local_min_dt, op=MPI.MIN) global_max_speed = comm.allreduce(local_max_speed, op=MPI.MAX) global_nan = { k: comm.allreduce(v, op=MPI.SUM) for k, v in nan_counts.items() } else: global_min_dt = local_min_dt global_max_speed = local_max_speed global_nan = dict(nan_counts) result = { 'time': self.get_time(), 'current_timestep': self.timestep, 'cfl': self.CFL, 'local_min_dt': local_min_dt, 'global_min_dt': global_min_dt, 'local_max_speed': local_max_speed, 'global_max_speed': global_max_speed, 'nan_counts': nan_counts, 'global_nan_counts': global_nan, 'first_nan_cell': first_nan_cell, 'worst_cells': worst_cells, } if verbose: from anuga.parallel import myid for i in range(numprocs): if myid == i: print(f' [diagnose_timestep] rank {myid} t={result["time"]:.3f}') print(f' domain.timestep={self.timestep:.6e} CFL={self.CFL}') print(f' local min cell-dt = {local_min_dt:.6e} max wave_speed = {local_max_speed:.4f}') if numprocs > 1: print(f' global min cell-dt = {global_min_dt:.6e} max wave_speed = {global_max_speed:.4f}') total_bad = sum(nan_counts.values()) if total_bad: print(f' NaN/inf cells: stage={nan_counts["stage"]} ' f'xmom={nan_counts["xmomentum"]} ' f'ymom={nan_counts["ymomentum"]}') if first_nan_cell is not None: fx, fy, fi = first_nan_cell print(f' first bad cell: tri={fi} at ({fx:.2f}, {fy:.2f})') for c in worst_cells: print(f' tri {c["triangle"]:>7} ' f'xy=({c["x"]:.2f}, {c["y"]:.2f}) ' f'depth={c["depth"]:.4f} speed={c["speed"]:.4f} ' f'local_dt={c["local_dt"]:.6e}') if threshold_dt is not None and global_min_dt < threshold_dt: print(f' WARNING: global min cell-dt {global_min_dt:.3e} ' f'< threshold {threshold_dt:.3e}') sys.stdout.flush() barrier() return result # ======================================================================= # PETE: NEW METHODS FOR FOR PARALLEL STRUCTURES. Note that we assume the # first "number_of_full_[nodes|triangles]" are full [nodes|triangles] # For full triangles it is possible to enquire self.tri_full_flag == True # ======================================================================= def get_number_of_full_triangles(self, *args, **kwargs) -> int: return self.number_of_full_triangles def get_full_centroid_coordinates(self, *args, **kwargs) -> num.ndarray: C = self.mesh.get_centroid_coordinates(*args, **kwargs) return C[:self.number_of_full_triangles, :] def get_full_vertex_coordinates(self, *args, **kwargs) -> num.ndarray: V = self.mesh.get_vertex_coordinates(*args, **kwargs) return V[:3*self.number_of_full_triangles,:] def get_full_triangles(self, *args, **kwargs) -> num.ndarray: T = self.mesh.get_triangles(*args, **kwargs) return T[:self.number_of_full_triangles,:] def get_full_nodes(self, *args, **kwargs) -> num.ndarray: N = self.mesh.get_nodes(*args, **kwargs) return N[:self.number_of_full_nodes,:] def get_tri_map(self) -> num.ndarray | None: return self.tri_map def get_inv_tri_map(self) -> num.ndarray | None: return self.inv_tri_map # ============================================================================== # Multiprocessor Mode (1=openmp, 2=cupy (in development)) # ============================================================================== # User-facing *per-domain* compute mode. This is the only genuinely # per-domain choice — it selects the internal ``multiprocessor_mode``: # 'legacy' -> mode 1: sw_domain_openmp_ext solver + serial-Python operators # 'unified' -> mode 2: the unified gpu_ext C kernels (solver + operators) # Whether 'unified' runs on CPU or offloads to a GPU is NOT a per-domain # property: it is a *process-global* OpenMP offload setting controlled by # the module function ``set_gpu_offload()`` (and only possible on a GPU # build). On a CPU-only build, 'unified' simply runs CPU-multicore. COMPUTE_MODES = ('legacy', 'unified') def _mode2_mpi_available(self) -> bool: """True if ``sw_domain_gpu_ext`` was built with real C MPI support. Mode 2 ('unified') performs its halo exchange at the C level (``exchange_ghosts``). Without an MPI-enabled build that exchange is a silent no-op, so a multi-rank 'unified' run would compute wrong results — the selector falls back to 'legacy' (Python MPI exchange) in that case. Serial (single-rank) 'unified' needs no MPI and is unaffected. """ try: from anuga.shallow_water import sw_domain_gpu_ext as gpu_ext return bool(gpu_ext.gpu_has_mpi()) except Exception: return False def compute_capabilities(self) -> dict: """Report which compute backends this build/run supports. Returns a dict with: 'gpu_offload' : bool — process can offload mode-2 to a GPU device (build supports it, device present, offload not disabled); see :func:`set_gpu_offload` 'num_gpu_devices' : int — number of offload devices visible 'mpi' : bool — gpu_ext built with C MPI ('unified' parallel ok) 'modes' : list — per-domain modes available ('unified' only when the gpu_ext extension is importable) """ try: from anuga.shallow_water import sw_domain_gpu_ext as gpu_ext # noqa: F401 unified = True ndev = int(gpu_ext.get_num_gpu_devices()) mpi = bool(gpu_ext.gpu_has_mpi()) except Exception: unified, ndev, mpi = False, 0, False modes = ['legacy'] + (['unified'] if unified else []) return {'gpu_offload': gpu_offload_enabled(), 'num_gpu_devices': ndev, 'mpi': mpi, 'modes': modes} def set_compute_mode(self, mode: str = 'unified', verbose: bool = False) -> None: """Select this domain's compute mode (per-domain). Parameters ---------- mode : {'legacy', 'unified'} - ``'legacy'`` — mode 1: the ``sw_domain_openmp_ext`` solver with serial-Python fractional-step operators. - ``'unified'`` — mode 2: the unified ``sw_domain_gpu_ext`` C kernels (solver and operators). Runs CPU-multicore by default; offloads to a GPU only when GPU offload is enabled process-wide via :func:`anuga.set_gpu_offload` on a GPU-capable build. This is a per-domain setting — different domains in one script may use different modes. Whether 'unified' uses a GPU is a separate, process-wide decision (see :func:`set_gpu_offload`), because OpenMP target offload is a process-level runtime setting, not a per-domain one. Under MPI, 'unified' requires a gpu_ext built with MPI; otherwise this falls back to 'legacy' (whose Python MPI exchange is correct in parallel) with a rank-0 warning. The active mode is recorded in ``self.compute_mode``; the original request in ``self.requested_compute_mode``. """ import warnings if mode not in self.COMPUTE_MODES: raise ValueError( f"Invalid compute mode {mode!r}. Must be one of {self.COMPUTE_MODES}.") requested = mode if mode == 'unified': # Parallel guard: 'unified' exchanges ghosts at the C level, a silent # no-op without an MPI-enabled gpu_ext build. Under MPI that gives # wrong results, so fall back to 'legacy' (Python MPI exchange). try: from anuga import numprocs except Exception: numprocs = 1 if numprocs > 1 and not self._mode2_mpi_available(): try: from anuga import myid except Exception: myid = 0 if myid == 0: warnings.warn( f"compute mode 'unified' selected under MPI ({numprocs} ranks) but " "this ANUGA build's sw_domain_gpu_ext was compiled without MPI; the " "C-level ghost exchange would be a silent no-op and give wrong " "parallel results. Falling back to 'legacy' (mode 1, Python MPI " "exchange). Rebuild with MPI to run 'unified' in parallel.", stacklevel=2) mode = 'legacy' self.requested_compute_mode = requested self.compute_mode = mode if mode == 'legacy': self.multiprocessor_mode = MULTIPROCESSOR_OPENMP self.use_c_rk_loop = False else: # 'unified' -> mode 2 (unified gpu_ext kernels) self.multiprocessor_mode = MULTIPROCESSOR_GPU self.use_c_rk_loop = True # Build the device interface now if boundaries are ready; otherwise # defer to the first evolve(). Boundaries are typically set AFTER # construction, so a default-'unified' domain must not require them # at __init__ time. if self._boundaries_ready(): self.set_gpu_interface() if verbose: print(f"Compute mode: requested {requested!r} -> active {self.compute_mode!r} " f"(multiprocessor_mode={self.multiprocessor_mode}, " f"gpu_offload={'on' if gpu_offload_enabled() else 'off'})") def _boundaries_ready(self) -> bool: """True if real boundary objects are set — required to build the gpu_ext device interface. After distribute() boundary_map may be ``{'exterior': None, 'ghost': None}`` (not None but no real boundaries). """ bmap = getattr(self, 'boundary_map', None) return bool(bmap) and any(b is not None for b in bmap.values()) def _ensure_gpu_interface(self) -> None: """Build a deferred mode-2 device interface on demand. For a default-'unified' domain the interface is built lazily (boundaries are set after construction). Build it the first time a mode-2 path needs it. If boundaries are still not set — e.g. a test that pokes the domain without a full boundary setup — fall back to 'legacy' so the operation can proceed rather than hit a None gpu_interface. """ if self.multiprocessor_mode != MULTIPROCESSOR_GPU or self.gpu_interface is not None: return if self._boundaries_ready(): self.set_gpu_interface() else: self.set_compute_mode('legacy') def get_compute_mode(self) -> str: """Return the active per-domain compute mode: 'legacy' or 'unified'.""" return getattr(self, 'compute_mode', 'legacy') def set_multiprocessor_mode(self, multiprocessor_mode: int = 1) -> None: """ Set multiprocessor mode (legacy integer API). 1. openmp - Python RK loop (use_c_rk_loop=False) 2. gpu/mpi - C RK loop (use_c_rk_loop=True) Thin wrapper over :meth:`set_compute_mode`: 1 maps to ``'legacy'``, 2 to ``'unified'``. Whether 'unified' offloads to a GPU is a separate, process-wide choice — see :func:`anuga.set_gpu_offload`. New code should prefer :meth:`set_compute_mode`. """ if multiprocessor_mode not in [MULTIPROCESSOR_OPENMP, MULTIPROCESSOR_GPU]: raise ValueError('Invalid multiprocessor mode. Must be one of [1,2] (openmp, gpu/mpi)') if multiprocessor_mode == MULTIPROCESSOR_OPENMP: self.set_compute_mode('legacy') else: self.set_compute_mode('unified') @property def use_c_rk2_loop(self): """Deprecated: use use_c_rk_loop instead.""" import warnings warnings.warn( "use_c_rk2_loop is deprecated; use use_c_rk_loop instead.", DeprecationWarning, stacklevel=2) return self.use_c_rk_loop @use_c_rk2_loop.setter def use_c_rk2_loop(self, value): import warnings warnings.warn( "use_c_rk2_loop is deprecated; use use_c_rk_loop instead.", DeprecationWarning, stacklevel=2) self.use_c_rk_loop = value def get_multiprocessor_mode(self) -> int: """ Get multiprocessor mode 1. openmp (in development) 2. gpu/mpi (in development) """ return self.multiprocessor_mode
[docs] def set_omp_num_threads(self, omp_num_threads: int | None = None, verbose: bool = True) -> None: """Set the OpenMP thread count (process-wide). OpenMP thread count is a process-level setting, not per-domain. This is a thin wrapper that delegates to the module-level :func:`anuga.set_omp_num_threads`; prefer that in new code. Kept for backward compatibility, and records ``self.omp_num_threads``. """ self.omp_num_threads = set_omp_num_threads(omp_num_threads, verbose=verbose)
@property def gpu(self): """ Shortcut to access the GPU interface. Returns the gpu_interface object which provides FLOP counters and kernel wrappers. Raises AttributeError if GPU mode is not enabled. """ if self.gpu_interface is None: raise AttributeError( "GPU interface not initialized. Call domain.set_multiprocessor_mode(2) first." ) return self.gpu_interface def set_gpu_interface(self): if self.multiprocessor_mode == MULTIPROCESSOR_GPU and self.gpu_interface is None: # Check that boundaries are properly set before GPU initialization # After distribute(), boundary_map may be {'exterior': None, 'ghost': None} # which is not None but has no actual boundary objects - this causes silent failures if self.boundary_map is None: raise RuntimeError( "GPU mode requires boundaries to be set before calling set_multiprocessor_mode(2).\n" "Please call domain.set_boundary({...}) BEFORE domain.set_multiprocessor_mode(2)." ) has_real_boundary = any(b is not None for b in self.boundary_map.values()) if not has_real_boundary: raise RuntimeError( "GPU mode requires boundaries to be set before calling set_multiprocessor_mode(2).\n" "Please call domain.set_boundary({...}) BEFORE domain.set_multiprocessor_mode(2).\n" f"Current boundary_map has no boundary objects: {list(self.boundary_map.keys())}" ) # Try OpenMP target offloading interface first try: from .sw_domain_gpu_omp import GPU_OMP_interface self.gpu_interface = GPU_OMP_interface(self) self.gpu_interface.setup() # Only print from rank 0 from anuga import myid, numprocs omp_num_threads = os.environ.get('OMP_NUM_THREADS', '1') # Offload is a process-wide decision (set_gpu_offload), resolved # against the build. False on a CPU-only build: 'unified' is CPU # multicore, not GPU. self.gpu_offload_active = gpu_offload_enabled() if myid == 0: device_id = self.gpu_interface.gpu_dom.device_id print('+==============================================================================+') if not self.gpu_offload_active: print("| ANUGA compute mode: 'unified' CPU multicore (gpu_ext kernels, no offload) |") print(f'| OMP_NUM_THREADS={omp_num_threads}') elif device_id < 0: print('| WARNING: No GPU devices found, running on CPU via OpenMP target offloading |') else: print(f'| GPU interface initialized: {numprocs} GPU(s) using OpenMP target offloading') print('+==============================================================================+') return except Exception as e: print(f'OpenMP GPU interface not available: {e}') # Fall back to CUDA/CuPy interface try: import cupy as cp test_cupy_array = cp.array([1,2,3]) from .sw_domain_cuda import GPU_interface self.gpu_interface = GPU_interface(self) self.gpu_interface.allocate_gpu_arrays() self.gpu_interface.compile_gpu_kernels() return except Exception: pass # No GPU available from anuga import myid if myid == 0: print('+==============================================================================+') print('| WARNING: GPU not available, falling back to multiprocessor_mode 1 (OpenMP) |') print('+==============================================================================+') self.set_multiprocessor_mode(1)
################################################################################ # End of class Shallow Water Domain ################################################################################ # FIXME (Ole): Does this do anything? def my_update_special_conditions(domain): pass if __name__ == "__main__": pass