Source code for anuga.shallow_water.boundaries

""" Boundary conditions specific to the shallow water wave equation

Title: ANUGA boundaries with dependencies on shallow_water_domain
Author:
    Ole Nielsen, Ole.Nielsen@ga.gov.au
    Stephen Roberts, Stephen.Roberts@anu.edu.au
    Duncan Gray, Duncan.Gray@ga.gov.au
    Gareth Davies, gareth.davies.ga.code@gmail.com
CreationDate: 2010
Description::
    This module contains boundary functions for ANUGA that are specific to the shallow water Domain class.
Constraints: See GPL license in the user guide
Version: 1.0 ($Revision: 7731 $)
ModifiedBy::
    Author: hudson
    Date: 2010-05-18 14:54:05 +1000 (Tue, 18 May 2010)

"""

from anuga.abstract_2d_finite_volumes.generic_boundary_conditions\
     import Boundary, File_boundary
import numpy as np

import anuga.utilities.log as log
from anuga.fit_interpolate.interpolate import Modeltime_too_late
from anuga.fit_interpolate.interpolate import Modeltime_too_early
from anuga.config import g as gravity

from anuga.shallow_water.sw_domain_openmp_ext import rotate, evaluate_reflective_segment

try:
    from numba import jit
except ImportError:
    def jit(nopython=True):
        """Dummy decorator for numba"""
        def dummy_decorator(func):
            return func
        return dummy_decorator


x = np.arange(100).reshape(10, 10)


[docs] class Reflective_boundary(Boundary): """Reflective boundary condition. Returns the same conserved quantities as the neighbour volume edge but with the normal momentum component negated, so the net mass flux through the boundary is zero (wall / no-slip analogue for the shallow-water equations). Parameters ---------- domain : anuga.Domain The domain to which this boundary is attached. Examples -------- >>> import anuga >>> domain = anuga.rectangular_cross_domain(10, 10) >>> Br = anuga.Reflective_boundary(domain) >>> domain.set_boundary({'left': Br, 'right': Br, 'top': Br, 'bottom': Br}) """ def __init__(self, domain=None): """Initialise a reflective boundary. Parameters ---------- domain : anuga.Domain The domain to which this boundary is attached. Raises ------ Exception If *domain* is ``None``. """ Boundary.__init__(self) if domain is None: msg = 'Domain must be specified for reflective boundary' raise Exception(msg) # Handy shorthands self.stage = domain.quantities['stage'].edge_values self.xmom = domain.quantities['xmomentum'].edge_values self.ymom = domain.quantities['ymomentum'].edge_values self.normals = domain.normals self.conserved_quantities = np.zeros(3, float) def __repr__(self): return 'Reflective_boundary' def evaluate(self, vol_id, edge_id): """Calculate BC associated to specified edge :param int vol_id: Triangle ID :param int edge_id: Edge opposite to Vertex ID """ q = self.conserved_quantities q[0] = self.stage[vol_id, edge_id] q[1] = self.xmom[vol_id, edge_id] q[2] = self.ymom[vol_id, edge_id] normal = self.normals[vol_id, 2*edge_id:2*edge_id+2] r = rotate(q, normal, direction = 1) r[1] = -r[1] q = rotate(r, normal, direction = -1) return q @jit(nopython=True) def evaluate_segment(self, domain, segment_edges): """Apply BC on the boundary edges defined by segment_edges :param domain: Apply BC on this domain :param segment_edges: List of boundary cells on which to apply BC """ if segment_edges is None: return if domain is None: return ids = segment_edges vol_ids = domain.boundary_cells[ids] edge_ids = domain.boundary_edges[ids] Stage = domain.quantities['stage'] Elev = domain.quantities['elevation'] Height= domain.quantities['height'] Xmom = domain.quantities['xmomentum'] Ymom = domain.quantities['ymomentum'] Xvel = domain.quantities['xvelocity'] Yvel = domain.quantities['yvelocity'] Normals = domain.normals #print vol_ids #print edge_ids #Normals.reshape((4,3,2)) #print Normals.shape #print Normals[vol_ids, 2*edge_ids] #print Normals[vol_ids, 2*edge_ids+1] n1 = Normals[vol_ids,2*edge_ids] n2 = Normals[vol_ids,2*edge_ids+1] # Transfer these quantities to the boundary array Stage.boundary_values[ids] = Stage.edge_values[vol_ids,edge_ids] Elev.boundary_values[ids] = Elev.edge_values[vol_ids,edge_ids] Height.boundary_values[ids] = Height.edge_values[vol_ids,edge_ids] # Rotate and negate Momemtum q1 = Xmom.edge_values[vol_ids,edge_ids] q2 = Ymom.edge_values[vol_ids,edge_ids] r1 = -q1*n1 - q2*n2 r2 = -q1*n2 + q2*n1 Xmom.boundary_values[ids] = n1*r1 - n2*r2 Ymom.boundary_values[ids] = n2*r1 + n1*r2 # Rotate and negate Velocity q1 = Xvel.edge_values[vol_ids,edge_ids] q2 = Yvel.edge_values[vol_ids,edge_ids] r1 = q1*n1 + q2*n2 r2 = q1*n2 - q2*n1 Xvel.boundary_values[ids] = n1*r1 - n2*r2 Yvel.boundary_values[ids] = n2*r1 + n1*r2 # TODO JLGV, reflective boundary condition needs openmp version # this one first def evaluate_segment(self, domain, segment_edges): """Apply BC on the boundary edges defined by segment_edges :param domain: Apply BC on this domain :param segment_edges: List of boundary cells on which to apply BC """ if segment_edges is None: return if domain is None: return ids = segment_edges vol_ids = domain.boundary_cells[ids] edge_ids = domain.boundary_edges[ids] ids_array = np.array(ids, dtype=np.int64) evaluate_reflective_segment(domain, ids_array, vol_ids, edge_ids)
class Transmissive_momentum_set_stage_boundary(Boundary): """Transmissive momentum boundary with prescribed stage. Returns the same momentum conserved quantities as the neighbour volume edge (transmissive / open condition) while overriding stage with a caller-supplied scalar function of time. Parameters ---------- domain : anuga.Domain The domain to which this boundary is attached. function : callable A function ``f(t)`` returning the stage value at time *t*. May also be a scalar ``int`` or ``float``, in which case the stage is held constant. Examples -------- >>> import anuga >>> domain = anuga.rectangular_cross_domain(10, 10) >>> BC = anuga.Transmissive_momentum_set_stage_boundary(domain, lambda t: 0.5) >>> domain.set_boundary({'left': BC, 'right': BC, 'top': BC, 'bottom': BC}) """ def __init__(self, domain=None, function=None): """Initialise a transmissive-momentum / set-stage boundary. Parameters ---------- domain : anuga.Domain The domain to which this boundary is attached. function : callable or float Stage function ``f(t)`` or a constant value. Raises ------ Exception If *domain* or *function* is ``None``. """ Boundary.__init__(self) if domain is None: msg = 'Domain must be specified for this type boundary' raise Exception(msg) if function is None: msg = 'Function must be specified for this type boundary' raise Exception(msg) self.domain = domain if isinstance(function, (int, float)): tmp = function function = lambda t: tmp self.function = function def __repr__(self): """ Return a representation of this object. """ return 'Transmissive_momentum_set_stage_boundary(%s)' % self.domain def evaluate(self, vol_id, edge_id): """Transmissive momentum set stage boundaries return the edge momentum values of the volume they serve. vol_id is volume id edge_id is the edge within the volume """ q = self.domain.get_conserved_quantities(vol_id, edge = edge_id) t = self.domain.get_time() if hasattr(self.function, 'time'): # Roll boundary over if time exceeds while t > self.function.time[-1]: msg = 'WARNING: domain time %.2f has exceeded' % t msg += 'time provided in ' msg += 'transmissive_momentum_set_stage_boundary object.\n' msg += 'I will continue, reusing the object from t==0' log.info(msg) t -= self.function.time[-1] value = self.function(t) try: x = float(value) except (ValueError, TypeError): x = float(value[0]) q[0] = x return q # FIXME: Consider this (taken from File_boundary) to allow # spatial variation # if vol_id is not None and edge_id is not None: # i = self.boundary_indices[ vol_id, edge_id ] # return self.F(t, point_id = i) # else: # return self.F(t)
[docs] class Transmissive_n_momentum_zero_t_momentum_set_stage_boundary(Boundary): """Transmissive normal momentum, zero tangential momentum, prescribed stage. Returns the same momentum component normal to the boundary as the neighbour volume edge. The tangential momentum component is zeroed. Stage is set by a caller-supplied function of time. Parameters ---------- domain : anuga.Domain The domain to which this boundary is attached. function : callable A function ``f(t)`` returning the stage at time *t*. default_boundary : float, optional Stage value returned when model time exceeds the range of *function*. Default is ``0.0``. Examples -------- >>> import anuga >>> domain = anuga.rectangular_cross_domain(10, 10) >>> BC = anuga.Transmissive_n_momentum_zero_t_momentum_set_stage_boundary( ... domain, lambda t: 0.5) >>> domain.set_boundary({'left': BC, 'right': BC, 'top': BC, 'bottom': BC}) """ def __init__(self, domain=None, function=None, default_boundary=0.0): """Initialise the boundary. Parameters ---------- domain : anuga.Domain The domain to which this boundary is attached. function : callable Stage function ``f(t)``. default_boundary : float, optional Fallback stage when model time is out of range. Default ``0.0``. Raises ------ Exception If *domain* or *function* is ``None``. """ Boundary.__init__(self) if domain is None: msg = 'Domain must be specified for this type boundary' raise Exception(msg) if function is None: msg = 'Function must be specified for this type boundary' raise Exception(msg) self.domain = domain self.function = function self.default_boundary = default_boundary def __repr__(self): """ Return a representation of this instance. """ msg = 'Transmissive_n_momentum_zero_t_momentum_set_stage_boundary' msg += '(%s)' % self.domain return msg def evaluate(self, vol_id, edge_id): """Transmissive_n_momentum_zero_t_momentum_set_stage_boundary return the edge momentum values of the volume they serve. """ q = self.domain.get_conserved_quantities(vol_id, edge = edge_id) normal = self.domain.get_normal(vol_id, edge_id) value = self.get_boundary_values() try: x = float(value) except (ValueError, TypeError): x = float(value[0]) q[0] = x ndotq = (normal[0]*q[1] + normal[1]*q[2]) q[1] = normal[0]*ndotq q[2] = normal[1]*ndotq return q # TODO JLGV, needs openmp version def evaluate_segment(self, domain, segment_edges): """Apply BC on the boundary edges defined by segment_edges :param domain: Apply BC on this domain :param segment_edges: List of boundary cells on which to apply BC Vectorized form for speed. Gareth Davies 14/07/2016 """ Stage = domain.quantities['stage'] Elev = domain.quantities['elevation'] Height= domain.quantities['height'] Xmom = domain.quantities['xmomentum'] Ymom = domain.quantities['ymomentum'] ids = segment_edges vol_ids = domain.boundary_cells[ids] edge_ids = domain.boundary_edges[ids] Normals = domain.normals n1 = Normals[vol_ids,2*edge_ids] n2 = Normals[vol_ids,2*edge_ids+1] # Call the boundary function which returns stage value = self.get_boundary_values() try: x = float(value) except (ValueError, TypeError): x = float(value[0]) # Set stage Stage.boundary_values[ids] = x # Compute flux normal to edge q1 = Xmom.edge_values[vol_ids,edge_ids] q2 = Ymom.edge_values[vol_ids,edge_ids] ndotq = n1*q1 + n2*q2 Xmom.boundary_values[ids] = ndotq * n1 Ymom.boundary_values[ids] = ndotq * n2
class Transmissive_stage_zero_momentum_boundary(Boundary): """Transmissive stage with zero momentum boundary. Copies the stage from the neighbour volume edge while forcing both momentum components to zero. Useful as a simple open boundary where water level is inherited from the interior but no momentum is imposed. Parameters ---------- domain : anuga.Domain The domain to which this boundary is attached. Examples -------- >>> import anuga >>> domain = anuga.rectangular_cross_domain(10, 10) >>> Bt = anuga.Transmissive_stage_zero_momentum_boundary(domain) >>> domain.set_boundary({'left': Bt, 'right': Bt, 'top': Bt, 'bottom': Bt}) """ def __init__(self, domain=None): """Initialise a transmissive-stage / zero-momentum boundary. Parameters ---------- domain : anuga.Domain The domain to which this boundary is attached. Raises ------ Exception If *domain* is ``None``. """ Boundary.__init__(self) if domain is None: msg = ('Domain must be specified for ' 'Transmissive_stage_zero_momentum boundary') raise Exception(msg) self.domain = domain def __repr__(self): """ Return a representation of this instance. """ return 'Transmissive_stage_zero_momentum_boundary(%s)' % self.domain def evaluate(self, vol_id, edge_id): """Calculate transmissive (zero momentum) results. """ q = self.domain.get_conserved_quantities(vol_id, edge=edge_id) q[1] = q[2] = 0.0 return q class Time_stage_zero_momentum_boundary(Boundary): """Time-varying stage boundary with zero momentum. Sets stage as a scalar function of time while holding both momentum components at zero. Parameters ---------- domain : anuga.Domain The domain to which this boundary is attached. function : callable A function ``f(t)`` returning the stage value at model time *t*. Must be convertible to a scalar float. default_boundary : float or None, optional Stage value returned when model time is out of the function's range. ``None`` means raise an exception on out-of-range. verbose : bool, optional If ``True``, emit informational messages. Default ``False``. Examples -------- >>> import anuga >>> domain = anuga.rectangular_cross_domain(10, 10) >>> # 2 m square wave from t=60 s to t=3660 s, zero elsewhere >>> Bt = anuga.Time_stage_zero_momentum_boundary( ... domain, function=lambda t: (60 < t < 3660) * 2.0) >>> domain.set_boundary({'left': Bt, 'right': Bt, 'top': Bt, 'bottom': Bt}) """ def __init__(self, domain=None, #f=None, # Should be removed and replaced by function below function=None, default_boundary=None, verbose=False): """Initialise a time-stage / zero-momentum boundary. Parameters ---------- domain : anuga.Domain The domain to which this boundary is attached. function : callable Stage function ``f(t)``. default_boundary : float or None, optional Fallback stage when model time is out of range. verbose : bool, optional Verbosity flag. Default ``False``. Raises ------ Exception If *domain* or *function* is ``None``, or if *function* cannot be evaluated at ``t=0`` or does not return a scalar float. """ Boundary.__init__(self) self.default_boundary = default_boundary self.default_boundary_invoked = False # Flag self.domain = domain self.verbose = verbose if domain is None: raise Exception('You must specify a domain to Time_stage_zero_momemtum_boundary') if function is None: raise Exception('You must specify a function to Time_stage_zero_momemtum_boundary') try: q = function(0.0) except Exception as e: msg = 'Function for time stage boundary could not be executed:\n%s' %e raise Exception(msg) try: q = float(q) except (ValueError, TypeError): msg = 'Return value from time boundary function could ' msg += 'not be converted into a float.\n' msg += 'I got %s' %str(q) raise Exception(msg) self.f = function self.domain = domain def __repr__(self): return 'Time_stage_zero_momemtum_boundary' def evaluate(self, vol_id=None, edge_id=None): return self.get_boundary_values() def evaluate_segment(self, domain, segment_edges): if segment_edges is None: return if domain is None: return ids = segment_edges vol_ids = domain.boundary_cells[ids] edge_ids = domain.boundary_edges[ids] q_bdry = self.get_boundary_values() #------------------------------------------------- # Now update boundary values #------------------------------------------------- domain.quantities['stage'].boundary_values[ids] = q_bdry domain.quantities['xmomentum'].boundary_values[ids] = 0.0 domain.quantities['ymomentum'].boundary_values[ids] = 0.0 class Characteristic_stage_boundary(Boundary): """Stage boundary using characteristic (Riemann-invariant) extrapolation. Sets the exterior stage via a function of time. Momentum at the boundary is determined from a characteristic decomposition, giving a weakly reflecting open boundary: outgoing waves leave cleanly while incoming waves are set by the prescribed stage. Parameters ---------- domain : anuga.Domain The domain to which this boundary is attached. function : callable A function ``f(t)`` returning the exterior stage at model time *t*. default_stage : float, optional Ambient stage assumed before the wave arrives. Default ``0.0``. Examples -------- >>> import anuga >>> domain = anuga.rectangular_cross_domain(10, 10) >>> Bcs = anuga.Characteristic_stage_boundary(domain, lambda t: 0.1) >>> domain.set_boundary({'left': Bcs, 'right': Bcs, 'top': Bcs, 'bottom': Bcs}) """ def __init__(self, domain=None, function=None, default_stage=0.0): """Initialise a characteristic-stage boundary. Parameters ---------- domain : anuga.Domain The domain to which this boundary is attached. function : callable Exterior stage function ``f(t)``. default_stage : float, optional Ambient stage assumed before the wave. Default ``0.0``. Raises ------ Exception If *domain* or *function* is ``None``. """ #raise Exception('This boundary type is not implemented yet') Boundary.__init__(self) if domain is None: msg = 'Domain must be specified for this type boundary' raise Exception(msg) if function is None: msg = 'Function must be specified for this type boundary' raise Exception(msg) self.domain = domain self.function = function self.default_stage = default_stage self.elev = domain.quantities['elevation'] self.stage = domain.quantities['stage'] #self.height = domain.quantities['height'] self.xmom = domain.quantities['xmomentum'] self.ymom = domain.quantities['ymomentum'] def __repr__(self): """ Return a representation of this instance. """ msg = 'Characteristic_stage_boundary ' msg += '(%s) ' % self.domain msg += '(%s) ' % self.default_stage return msg def evaluate(self, vol_id, edge_id): """Calculate reflections (reverse outward momentum). vol_id edge_id """ t = self.domain.get_time() value = self.function(t) try: w_outside = float(value) except (ValueError, TypeError): w_outside = float(value[0]) q = np.zeros(len(self.conserved_quantities), float) q[0] = self.stage.edge_values[vol_id, edge_id] q[1] = self.xmom.edge_values[vol_id, edge_id] q[2] = self.ymom.edge_values[vol_id, edge_id] elev = self.elev.edge_values[vol_id, edge_id] normal = self.normals[vol_id, 2*edge_id:2*edge_id+2] uh_inside = normal[0]*q[1] + normal[1]*q[2] vh_inside = normal[1]*q[1] - normal[0]*q[2] # use elev as elev both inside and outside h_outside = max(w_outside - elev,0) h_inside = max(q[0] - elev, 0) u_inside = uh_inside/h_inside sqrt_g = gravity**0.5 sqrt_h_inside = h_inside**0.5 sqrt_h_outside = h_outside**0.5 h_m = (0.5*(sqrt_h_inside+sqrt_h_outside) + u_inside/4.0/sqrt_g )**2 u_m = 0.5*u_inside + sqrt_g*(sqrt_h_inside - sqrt_h_outside) uh_m = h_m*u_m vh_m = vh_inside # if uh_inside > 0.0 then outflow if uh_inside > 0.0 : vh_m = vh_inside else: vh_m = 0.0 if h_inside == 0.0: q[0] = w_outside q[1] = 0.0 q[2] = 0.0 else: q[0] = h_m + elev q[1] = uh_m*normal[0] + vh_m*normal[1] q[2] = uh_m*normal[1] - vh_m*normal[0] return q def evaluate_segment(self, domain, segment_edges): """Apply BC on the boundary edges defined by segment_edges """ Stage = domain.quantities['stage'] Elev = domain.quantities['elevation'] Xmom = domain.quantities['xmomentum'] Ymom = domain.quantities['ymomentum'] ids = segment_edges vol_ids = domain.boundary_cells[ids] edge_ids = domain.boundary_edges[ids] Normals = domain.normals n1 = Normals[vol_ids,2*edge_ids] n2 = Normals[vol_ids,2*edge_ids+1] # Get stage value t = self.domain.get_time() value = self.function(t) try: w_outside = float(value) except (ValueError, TypeError): w_outside = float(value[0]) # Transfer these quantities to the boundary array Stage.boundary_values[ids] = Stage.edge_values[vol_ids,edge_ids] Xmom.boundary_values[ids] = Xmom.edge_values[vol_ids,edge_ids] Ymom.boundary_values[ids] = Ymom.edge_values[vol_ids,edge_ids] Elev.boundary_values[ids] = Elev.edge_values[vol_ids,edge_ids] h_inside = np.maximum(Stage.boundary_values[ids]-Elev.boundary_values[ids], 0.0) w_outside = 0.0*Stage.boundary_values[ids] + w_outside # Do vectorized operations here # # In dry cells, the values will be .... q0_dry = np.where(Elev.boundary_values[ids] <= w_outside, w_outside, Elev.boundary_values[ids]) q1_dry = 0.0 * Xmom.boundary_values[ids] q2_dry = 0.0 * Ymom.boundary_values[ids] # # and in wet cells, the values will be ... # (see 'evaluate' method above for more comments on theory, # in particular we assume subcritical flow and a zero outside velocity) # # (note: When cells are dry, this calculation will throw invalid # values, but such values will never be selected to be returned) sqrt_g = gravity**0.5 h_inside = np.maximum(Stage.boundary_values[ids] - Elev.boundary_values[ids], 0) uh_inside = n1 * Xmom.boundary_values[ids] + n2 * Ymom.boundary_values[ids] vh_inside = n2 * Xmom.boundary_values[ids] - n1 * Ymom.boundary_values[ids] u_inside = np.divide(uh_inside, h_inside, out=np.zeros_like(uh_inside), where=h_inside > 0.0) h_outside = np.maximum(w_outside - Elev.boundary_values[ids], 0) sqrt_h_inside = h_inside**0.5 sqrt_h_outside = h_outside**0.5 h_m = (0.5*(sqrt_h_inside+sqrt_h_outside) + u_inside/4.0/sqrt_g )**2 u_m = 0.5*u_inside + sqrt_g*(sqrt_h_inside - sqrt_h_outside) uh_m = h_m*u_m # if uh_inside > 0.0 then outflow vh_m = np.where(uh_inside > 0.0, vh_inside, 0.0) w_m = h_m + Elev.boundary_values[ids] dry_test = np.logical_or(h_inside == 0.0, h_outside == 0.0) q1 = uh_m*n1 + vh_m*n2 q2 = uh_m*n2 - vh_m*n1 Stage.boundary_values[ids] = np.where( dry_test, w_outside, w_m ) Xmom.boundary_values[ids] = np.where( dry_test, 0.0, q1 ) Ymom.boundary_values[ids] = np.where( dry_test, 0.0, q2) class Dirichlet_discharge_boundary(Boundary): """Dirichlet boundary with prescribed stage and inward-normal discharge. Sets stage to a constant *stage0* and momentum in the inward-normal direction to *wh0*. The tangential momentum is always zero. Parameters ---------- domain : anuga.Domain The domain to which this boundary is attached. stage0 : float Prescribed water stage (m). wh0 : float, optional Momentum magnitude in the inward-normal direction (m^2/s). Default is ``0.0`` (stage only, no imposed flow). Examples -------- >>> import anuga >>> domain = anuga.rectangular_cross_domain(10, 10) >>> Bd = anuga.Dirichlet_discharge_boundary(domain, stage0=1.0, wh0=0.5) >>> domain.set_boundary({'left': Bd, 'right': Bd, 'top': Bd, 'bottom': Bd}) """ def __init__(self, domain=None, stage0=None, wh0=None): """Initialise a Dirichlet discharge boundary. Parameters ---------- domain : anuga.Domain The domain to which this boundary is attached. stage0 : float Prescribed stage. wh0 : float, optional Inward-normal momentum. Default ``0.0``. Raises ------ Exception If *domain* or *stage0* is ``None``. """ Boundary.__init__(self) if domain is None: msg = 'Domain must be specified for this type of boundary' raise Exception(msg) if stage0 is None: raise Exception('Stage must be specified for this type of boundary') if wh0 is None: wh0 = 0.0 self.domain = domain self.stage0 = stage0 self.wh0 = wh0 def __repr__(self): """ Return a representation of this instance. """ return 'Dirichlet_Discharge_boundary(%s)' % self.domain def evaluate(self, vol_id, edge_id): """Set discharge in the (inward) normal direction""" normal = self.domain.get_normal(vol_id,edge_id) q = [self.stage0, -self.wh0*normal[0], -self.wh0*normal[1]] return q # FIXME: Consider this (taken from File_boundary) to allow # spatial variation # if vol_id is not None and edge_id is not None: # i = self.boundary_indices[ vol_id, edge_id ] # return self.F(t, point_id = i) # else: # return self.F(t) class Inflow_boundary(Boundary): """Inflow boundary that imposes a volumetric flow rate. Distributes the prescribed flow (m^3/s) uniformly along the boundary segment. Depth and momentum are derived from Manning's formula using the local bed gradient and friction coefficient. .. note:: This class is work in progress and the associated test is disabled. Parameters ---------- domain : anuga.Domain The domain to which this boundary is attached. rate : float, optional Total volumetric inflow rate in m^3/s. Default ``0.0``. Examples -------- >>> import anuga >>> domain = anuga.rectangular_cross_domain(10, 10) >>> Bi = anuga.Inflow_boundary(domain, rate=1.0) >>> domain.set_boundary({'left': Bi, 'right': Bi, 'top': Bi, 'bottom': Bi}) """ # FIXME (Ole): This is work in progress and definitely not finished. # The associated test has been disabled def __init__(self, domain=None, rate=0.0): """Initialise an inflow boundary. Parameters ---------- domain : anuga.Domain The domain to which this boundary is attached. rate : float, optional Volumetric inflow rate (m^3/s). Default ``0.0``. Raises ------ Exception If *domain* is ``None``. """ Boundary.__init__(self) if domain is None: msg = 'Domain must be specified for ' msg += 'Inflow boundary' raise Exception(msg) self.domain = domain # FIXME(Ole): Allow rate to be time dependent as well self.rate = rate self.tag = None # Placeholder for tag associated with this object. def __repr__(self): return 'Inflow_boundary(%s)' %self.domain def evaluate(self, vol_id, edge_id): """Apply inflow rate at each edge of this boundary """ # First find all segments having the same tag is vol_id, edge_id # This will be done the first time evaluate is called. if self.tag is None: boundary = self.domain.boundary self.tag = boundary[(vol_id, edge_id)] # Find total length of boundary with this tag length = 0.0 for v_id, e_id in boundary: if self.tag == boundary[(v_id, e_id)]: length += self.domain.mesh.get_edgelength(v_id, e_id) self.length = length self.average_momentum = self.rate/length # Average momentum has now been established across this boundary # Compute momentum in the inward normal direction inward_normal = -self.domain.mesh.get_normal(vol_id, edge_id) xmomentum, ymomentum = self.average_momentum * inward_normal # Compute depth based on Manning's formula v = 1/n h^{2/3} sqrt(S) # Where v is velocity, n is manning's coefficient, h is depth # and S is the slope into the domain. # Let mu be the momentum (vh), then this equation becomes: # mu = 1/n h^{5/3} sqrt(S) # from which we can isolate depth to get # h = (mu n/sqrt(S) )^{3/5} slope = 0 # get gradient for this triangle dot normal epsilon = 1.0e-12 import math # get manning coef from this triangle friction = self.domain.get_quantity('friction').get_values(\ location='edges', indices=[vol_id])[0] mannings_n = friction[edge_id] if slope > epsilon and mannings_n > epsilon: depth = pow(self.average_momentum * mannings_n/math.sqrt(slope), \ 3.0/5) else: depth = 1.0 # Elevation on this edge z = self.domain.get_quantity('elevation').get_values(\ location='edges', indices=[vol_id])[0] elevation = z[edge_id] # Assign conserved quantities and return q = np.array([elevation + depth, xmomentum, ymomentum], float) return q
[docs] class Field_boundary(Boundary): """Boundary condition driven by an SWW field file with optional stage offset. Reads stage, x-momentum and y-momentum time series from an SWW file and applies them as a boundary condition, linearly interpolating in time. An optional *mean_stage* offset can be added to the stage values, which avoids regenerating the SWW file when running at different tide levels. This is a thin wrapper around :class:`File_boundary`; the only difference is the *mean_stage* offset capability. Parameters ---------- filename : str Path to the SWW file containing stage and momentum time series. domain : anuga.Domain The domain to which this boundary is attached. mean_stage : float, optional Constant offset added to the stage read from the file. Useful for running at different tidal datums without recreating the SWW file. Default ``0.0``. time_thinning : int, optional Read every *time_thinning*-th time step from the file. Larger values speed up model setup at the cost of temporal resolution. Default ``1`` (all steps). time_limit : float or None, optional Stop reading the file after this time (seconds). ``None`` means read to the end. boundary_polygon : list or None, optional Clip the SWW points to this polygon. ``None`` means use all points. default_boundary : float or None, optional Stage returned when model time exceeds the file's time range. ``None`` raises an exception on out-of-range. use_cache : bool, optional Cache the interpolated field function. Default ``False``. verbose : bool, optional Emit progress messages. Default ``False``. Examples -------- >>> import anuga >>> domain = anuga.rectangular_cross_domain(10, 10) >>> Bf = anuga.Field_boundary('boundary.sww', domain, mean_stage=0.5) >>> domain.set_boundary({'left': Bf, 'right': Bf, 'top': Bf, 'bottom': Bf}) """ def __init__(self, filename, domain, mean_stage=0.0, time_thinning=1, time_limit=None, boundary_polygon=None, default_boundary=None, use_cache=False, verbose=False): """Initialise a field boundary. Parameters ---------- filename : str Path to the SWW file. domain : anuga.Domain The domain to which this boundary is attached. mean_stage : float, optional Stage offset (m). Default ``0.0``. time_thinning : int, optional Step stride when reading time steps. Default ``1``. time_limit : float or None, optional Maximum time to read from file. boundary_polygon : list or None, optional Polygon used to clip SWW points. default_boundary : float or None, optional Fallback stage when model time is out of range. use_cache : bool, optional Enable caching. Default ``False``. verbose : bool, optional Verbosity flag. Default ``False``. """ # Create generic file_boundary object self.file_boundary = File_boundary(filename, domain, time_thinning=time_thinning, time_limit=time_limit, boundary_polygon=boundary_polygon, default_boundary=default_boundary, use_cache=use_cache, verbose=verbose) # Record information from File_boundary self.F = self.file_boundary.F self.domain = self.file_boundary.domain # Record mean stage self.mean_stage = mean_stage def __repr__(self): """ Generate a string representation of this instance. """ return 'Field boundary' def evaluate(self, vol_id=None, edge_id=None): """ Calculate 'field' boundary results. vol_id and edge_id are ignored Return linearly interpolated values based on domain.time """ # Evaluate file boundary q = self.file_boundary.evaluate(vol_id, edge_id) # Adjust stage for j, name in enumerate(self.domain.conserved_quantities): if name == 'stage': q[j] += self.mean_stage return q
[docs] class Flather_external_stage_zero_velocity_boundary(Boundary): """Weakly-reflecting open boundary using a Flather-type characteristic approach. Sets the exterior stage via a function of time and assumes zero exterior velocity. Interior values are taken from the domain. The boundary conserved quantities are then computed from characteristic-like variables, making this boundary weakly reflecting — outgoing waves leave with minimal spurious reflection while incoming wave forcing is prescribed. The approach is similar (but not identical) to that described on page 239 of: .. code-block:: bibtex @Article{blayo05, title = {Revisiting open boundary conditions from the point of view of characteristic variables}, author = {Blayo, E. and Debreu, L.}, journal = {Ocean Modelling}, year = {2005}, volume = {9}, pages = {231--252}, } Algorithm --------- 1. The exterior stage is set from *function(t)*; exterior velocity is zero; interior stage and velocity are taken from the domain edge values. 2. Characteristic-like variables are computed depending on whether flow is incoming or outgoing (see Blayo & Debreu 2005). 3. The boundary conserved quantities (stage, x-momentum, y-momentum) are recovered from these characteristic variables. Parameters ---------- domain : anuga.Domain The domain to which this boundary is attached. function : callable A function ``f(t)`` returning the exterior stage at model time *t*. Typically a :func:`~anuga.file_function` time series. default_boundary : float, optional Stage value returned when model time exceeds the range of *function* (e.g. when a file-function time series ends). ``0.0`` corresponds to ambient sea level / no wave forcing. Default ``0.0``. Examples -------- >>> import anuga >>> domain = anuga.rectangular_cross_domain(10, 10) >>> Bf = anuga.Flather_external_stage_zero_velocity_boundary( ... domain, lambda t: 0.1, default_boundary=0.0) >>> domain.set_boundary({'left': Bf, 'right': Bf, 'top': Bf, 'bottom': Bf}) """ def __init__(self, domain=None, function=None, default_boundary=0.0): """Initialise a Flather-type open boundary. Parameters ---------- domain : anuga.Domain The domain to which this boundary is attached. function : callable Exterior stage function ``f(t)``. default_boundary : float, optional Fallback stage when model time is out of range. Default ``0.0``. Raises ------ Exception If *domain* or *function* is ``None``. """ Boundary.__init__(self) if domain is None: msg = 'Domain must be specified for this type boundary' raise Exception(msg) if function is None: msg = 'Function must be specified for this type boundary' raise Exception(msg) self.domain = domain self.function = function self.default_boundary = default_boundary def __repr__(self): """ Return a representation of this instance. """ msg = 'Flather_external_stage_zero_velocity_boundary' msg += '(%s)' % self.domain return msg def evaluate(self, vol_id, edge_id): """ """ q = self.domain.get_conserved_quantities(vol_id, edge = edge_id) bed = self.domain.quantities['elevation'].centroid_values[vol_id] depth_inside=max(q[0]-bed,0.0) dt=self.domain.timestep normal = self.domain.get_normal(vol_id, edge_id) t = self.domain.get_time() value = self.get_boundary_values(t) try: stage_outside = float(value) except (ValueError, TypeError): stage_outside = float(value[0]) if(depth_inside==0.): q[0] = stage_outside q[1] = 0. q[2] = 0. else: # Asssume sub-critical flow. Set the values of the characteristics as # appropriate, depending on whether we have inflow or outflow # These calculations are based on the paper cited above sqrt_g_on_depth_inside = (gravity/depth_inside)**0.5 ndotq_inside = (normal[0]*q[1] + normal[1]*q[2]) # momentum perpendicular to the boundary if(ndotq_inside>0.): # Outflow (assumed subcritical) # Compute characteristics using a particular extrapolation # # Theory: 2 characteristics coming from inside domain, only # need to impose one characteristic from outside # # w1 = u - sqrt(g/depth)*(Stage_outside) -- uses 'outside' info w1 = 0. - sqrt_g_on_depth_inside*stage_outside # w2 = v [velocity parallel to boundary] -- uses 'inside' info w2 = (+normal[1]*q[1] -normal[0]*q[2])/depth_inside # w3 = u + sqrt(g/depth)*(Stage_inside) -- uses 'inside info' w3 = ndotq_inside/depth_inside + sqrt_g_on_depth_inside*q[0] else: # Inflow (assumed subcritical) # Need to set 2 characteristics from outside information # w1 = u - sqrt(g/depth)*(Stage_outside) -- uses 'outside' info w1 = 0. - sqrt_g_on_depth_inside*stage_outside # w2 = v [velocity parallel to boundary] -- uses 'outside' info w2 = 0. # w3 = u + sqrt(g/depth)*(Stage_inside) -- uses 'inside info' w3 = ndotq_inside/depth_inside + sqrt_g_on_depth_inside*q[0] q[0] = (w3-w1)/(2*sqrt_g_on_depth_inside) qperp= (w3+w1)/2.*depth_inside qpar= w2*depth_inside # So q[1], q[2] = qperp*(normal[0], normal[1]) + qpar*(-normal[1], normal[0]) q[1] = qperp*normal[0] + qpar*normal[1] q[2] = qperp*normal[1] - qpar*normal[0] return q def evaluate_segment(self, domain, segment_edges): """Applied in vectorized form for speed. Gareth Davies 14/07/2016 """ Stage = domain.quantities['stage'] Elev = domain.quantities['elevation'] #Height= domain.quantities['height'] Xmom = domain.quantities['xmomentum'] Ymom = domain.quantities['ymomentum'] ids = segment_edges vol_ids = domain.boundary_cells[ids] edge_ids = domain.boundary_edges[ids] Normals = domain.normals n1 = Normals[vol_ids,2*edge_ids] n2 = Normals[vol_ids,2*edge_ids+1] # Get stage value t = self.domain.get_time() value = self.get_boundary_values(t) try: stage_outside = float(value) except (ValueError, TypeError): stage_outside = float(value[0]) # Transfer these quantities to the boundary array Stage.boundary_values[ids] = Stage.edge_values[vol_ids,edge_ids] Xmom.boundary_values[ids] = Xmom.edge_values[vol_ids,edge_ids] Ymom.boundary_values[ids] = Ymom.edge_values[vol_ids,edge_ids] Elev.boundary_values[ids] = Elev.edge_values[vol_ids,edge_ids] bed = Elev.centroid_values[vol_ids] depth_inside = np.maximum(Stage.boundary_values[ids]-bed, 0.0) stage_outside = 0.0*Stage.boundary_values[ids] + stage_outside # Do vectorized operations here # # In dry cells, the values will be .... q0_dry = np.where(bed <= stage_outside, stage_outside, Elev.boundary_values[ids]) q1_dry = 0.0 * Xmom.boundary_values[ids] q2_dry = 0.0 * Ymom.boundary_values[ids] # # and in wet cells, the values will be ... # (see 'evaluate' method above for more comments on theory, # in particular we assume subcritical flow and a zero outside velocity) # # (note: When cells are dry, this calculation will throw invalid # values, but such values will never be selected to be returned) with np.errstate(invalid='ignore', divide='ignore'): sqrt_g_on_depth_inside = (gravity/depth_inside)**0.5 ndotq_inside = (n1 * Xmom.boundary_values[ids] + n2 * Ymom.boundary_values[ids]) # w1 = u - sqrt(g/depth)*(Stage_outside) -- uses 'outside' info w1 = 0.0 - sqrt_g_on_depth_inside * stage_outside # w2 = v [velocity parallel to boundary] -- uses 'inside' or 'outside' # info as required w2 = np.where(ndotq_inside > 0.0, (n2 * Xmom.boundary_values[ids] - n1 * Ymom.boundary_values[ids])/depth_inside, 0.0 * ndotq_inside) # w3 = u + sqrt(g/depth)*(Stage_inside) -- uses 'inside info' w3 = ndotq_inside/depth_inside + sqrt_g_on_depth_inside*Stage.boundary_values[ids] q0_wet = (w3 - w1)/(2.0 * sqrt_g_on_depth_inside) qperp = (w3 + w1)/2.0 * depth_inside qpar = w2 * depth_inside q1_wet = qperp * n1 + qpar * n2 q2_wet = qperp * n2 - qpar * n1 dry_test = np.logical_or(depth_inside == 0.0, stage_outside > bed) Stage.boundary_values[ids] = np.where( dry_test, q0_dry, q0_wet) Xmom.boundary_values[ids] = np.where( dry_test, q1_dry, q1_wet) Ymom.boundary_values[ids] = np.where( dry_test, q2_dry, q2_wet)
[docs] class Absorbing_wave_boundary(Boundary): """Active-absorption open boundary with prescribed incoming wave. Simultaneously prescribes an incoming wave and absorbs outgoing (reflected) waves by setting the ghost-cell stage to:: stage_ghost = 2 * wave(t) - stage_interior so that the boundary face always sees exactly ``wave(t)`` regardless of what is propagating back from inside the domain. The ghost normal momentum is set to ``ghost_depth × interior_velocity`` (velocity- preserving transmissive): this keeps the ghost velocity bounded when ghost depth is small (e.g. after bed-clamping) and gives the correct incoming-wave energy flux. Zero ghost momentum would halve the incoming wave amplitude. Tangential momentum is zeroed. When the ghost cell is dry (bed-clamped to zero depth), both momentum components are set to zero to avoid a pathological dry-ghost state in the Riemann solver. This is the numerical equivalent of an *active-absorption wavemaker* used in physical wave-tank experiments: the paddle continuously adjusts to cancel any reflected wave arriving at the boundary while still generating the desired incoming signal. .. note:: Absorption efficiency depends on the phase of the standing-wave cycle. At a standing-wave antinode (``u ≈ 0`` at the boundary) the net momentum flux is zero and energy drains slowly over multiple wave periods. For highly reflective short-domain problems (e.g. the Okushiri benchmark) this is still preferable to a pure Flather condition, which can exhibit 2× stage amplification at the boundary gauge. Parameters ---------- domain : anuga.Domain The domain to which this boundary is attached. function : callable A function ``f(t)`` returning the prescribed stage at model time *t*. Typically a :func:`~anuga.file_function` time series. default_boundary : float, optional Stage returned when model time exceeds the range of *function*. Defaults to ``0.0``. Examples -------- >>> import anuga >>> domain = anuga.rectangular_cross_domain(10, 10) >>> Ba = anuga.Absorbing_wave_boundary(domain, lambda t: 0.1 * (t < 5)) >>> domain.set_boundary({'left': Ba, 'right': Ba, 'top': Ba, 'bottom': Ba}) """ def __init__(self, domain=None, function=None, default_boundary=0.0): """Initialise an active-absorption open boundary. Parameters ---------- domain : anuga.Domain The domain to which this boundary is attached. function : callable Prescribed stage function ``f(t)``. default_boundary : float, optional Fallback stage when model time is out of range. Default ``0.0``. Raises ------ Exception If *domain* or *function* is ``None``. """ Boundary.__init__(self) if domain is None: raise Exception('Domain must be specified for this type boundary') if function is None: raise Exception('Function must be specified for this type boundary') self.domain = domain self.function = function self.default_boundary = default_boundary def __repr__(self): return 'Absorbing_wave_boundary(%s)' % self.domain def evaluate(self, vol_id, edge_id): """Return ghost-cell state for active-absorption boundary.""" q = self.domain.get_conserved_quantities(vol_id, edge=edge_id) bed = self.domain.quantities['elevation'].centroid_values[vol_id] depth_inside = max(q[0] - bed, 0.0) t = self.domain.get_time() value = self.get_boundary_values(t) try: wave = float(value) except (ValueError, TypeError): wave = float(value[0]) normal = self.domain.get_normal(vol_id, edge_id) if depth_inside == 0.0: # Dry interior: prescribe wave stage directly q[0] = wave q[1] = 0.0 q[2] = 0.0 else: # Active absorption: ghost stage ensures face = wave(t). # Clamp to bed so the ghost depth is never negative — this # can occur when a large reflected wave makes stage_interior # exceed 2*wave(t). ghost_stage = max(2.0 * wave - q[0], bed) ghost_depth = ghost_stage - bed # Interior velocity (bounded by depth_inside > 0 guard above) u_int = q[1] / depth_inside v_int = q[2] / depth_inside q[0] = ghost_stage if ghost_depth > 0.0: # Ghost momentum = ghost_depth × interior_velocity (normal # component only, tangential zeroed). Preserving velocity # (not raw momentum) keeps the ghost velocity bounded when # ghost_depth << interior_depth; raw transmissive momentum # (ghost_hu = interior_hu) causes ghost velocity → ∞ as # ghost_depth → 0. For deep boundaries (e.g. Okushiri) # ghost_depth ≈ interior_depth and the result is identical. ndotu = normal[0] * u_int + normal[1] * v_int q[1] = ghost_depth * ndotu * normal[0] q[2] = ghost_depth * ndotu * normal[1] else: q[1] = 0.0 q[2] = 0.0 return q def evaluate_segment(self, domain, segment_edges): """Vectorised form for speed.""" Stage = domain.quantities['stage'] Elev = domain.quantities['elevation'] Xmom = domain.quantities['xmomentum'] Ymom = domain.quantities['ymomentum'] ids = segment_edges vol_ids = domain.boundary_cells[ids] edge_ids = domain.boundary_edges[ids] Normals = domain.normals n1 = Normals[vol_ids, 2 * edge_ids] n2 = Normals[vol_ids, 2 * edge_ids + 1] t = self.domain.get_time() value = self.get_boundary_values(t) try: wave_val = float(value) except (ValueError, TypeError): wave_val = float(value[0]) stage_interior = Stage.edge_values[vol_ids, edge_ids] bed = Elev.centroid_values[vol_ids] depth_inside = np.maximum(stage_interior - bed, 0.0) # Active absorption ghost stage; clamp to bed to prevent negative # ghost depth when a large reflected wave makes stage_interior > 2*wave raw_ghost = np.where(depth_inside > 0.0, 2.0 * wave_val - stage_interior, wave_val) stage_ghost = np.maximum(raw_ghost, bed) ghost_depth = stage_ghost - bed Stage.boundary_values[ids] = stage_ghost # Ghost momentum = ghost_depth × interior_velocity (normal component # only, tangential zeroed). Preserving velocity (not raw momentum) # keeps ghost velocity bounded when ghost_depth → 0 (bed-clamped); # raw transmissive momentum (ghost_hu = interior_hu) would cause # ghost velocity → ∞. Zero ghost velocity when either side is dry. both_wet = (depth_inside > 0.0) & (ghost_depth > 0.0) safe_depth = np.where(depth_inside > 0.0, depth_inside, 1.0) q1 = Xmom.edge_values[vol_ids, edge_ids] q2 = Ymom.edge_values[vol_ids, edge_ids] u_int = q1 / safe_depth v_int = q2 / safe_depth ndotu = n1 * u_int + n2 * v_int Xmom.boundary_values[ids] = np.where(both_wet, ghost_depth * ndotu * n1, 0.0) Ymom.boundary_values[ids] = np.where(both_wet, ghost_depth * ndotu * n2, 0.0)
[docs] class Characteristic_wave_boundary(Boundary): """Nonlinear characteristic open boundary with prescribed incoming wave. Prescribes the incoming Riemann invariant from a wave perturbation and extrapolates the outgoing Riemann invariant from the interior, solving the characteristic equations exactly (no linearisation). The ghost state is derived from:: c_ghost = (v_n_int + 2*c_int - 2*c0 + 4*c_wave) / 4 v_n_ghost = c0 - 2*c_wave + v_n_int/2 + c_int h_ghost = c_ghost**2 / g stage_ghost = h_ghost + bed where *c0* = sqrt(g*h0) is the background wave speed (computed from ``background_stage`` at each cell), *c_wave* = sqrt(g*h_wave) is the prescribed wave speed, and *v_n* is the outward-normal velocity. The wave function returns a **perturbation** from ``background_stage`` (not an absolute stage). This is important: the background depth h0 = ``background_stage`` − bed is used to set the incoming Riemann invariant assuming no outgoing wave at the exterior. Compared to :class:`Absorbing_wave_boundary`: * Better for large-amplitude waves (η ~ h) where linearisation error in the Flather form is significant. * The face stage is not guaranteed to equal wave(t) exactly; the characteristic condition controls energy transport rather than stage directly. * For small-amplitude waves (η ≪ h) both classes give similar results. Parameters ---------- domain : anuga.Domain The domain to which this boundary is attached. function : callable ``f(t)`` returning the stage *perturbation* at time *t*. background_stage : float, optional Still-water stage around which the perturbation is measured. Defaults to ``0.0`` (sea level). default_boundary : float, optional Perturbation returned when model time is out of range. Default ``0.0``. """ def __init__(self, domain=None, function=None, background_stage=0.0, default_boundary=0.0): Boundary.__init__(self) if domain is None: raise Exception('Domain must be specified for this type boundary') if function is None: raise Exception('Function must be specified for this type boundary') self.domain = domain self.function = function self.background_stage = float(background_stage) self.default_boundary = default_boundary def __repr__(self): return 'Characteristic_wave_boundary(%s)' % self.domain def evaluate(self, vol_id, edge_id): """Return ghost-cell state from nonlinear characteristic decomposition.""" q = self.domain.get_conserved_quantities(vol_id, edge=edge_id) bed = self.domain.quantities['elevation'].centroid_values[vol_id] depth_inside = max(q[0] - bed, 0.0) normal = self.domain.get_normal(vol_id, edge_id) t = self.domain.get_time() value = self.get_boundary_values(t) try: perturb = float(value) except (ValueError, TypeError): perturb = float(value[0]) stage_wave = self.background_stage + perturb h0 = max(self.background_stage - bed, 0.0) c0 = (gravity * max(h0, 1e-10)) ** 0.5 h_wave = max(stage_wave - bed, 0.0) c_wave = (gravity * max(h_wave, 1e-10)) ** 0.5 if depth_inside == 0.0: q[0] = stage_wave q[1] = 0.0 q[2] = 0.0 else: u_int = q[1] / depth_inside v_int = q[2] / depth_inside c_int = (gravity * depth_inside) ** 0.5 v_n_int = normal[0] * u_int + normal[1] * v_int c_ghost = max((v_n_int + 2.0*c_int - 2.0*c0 + 4.0*c_wave) / 4.0, 0.0) v_n_ghost = c0 - 2.0*c_wave + 0.5*v_n_int + c_int h_ghost = c_ghost**2 / gravity q[0] = h_ghost + bed if c_ghost > 0.0: q[1] = h_ghost * v_n_ghost * normal[0] q[2] = h_ghost * v_n_ghost * normal[1] else: q[1] = 0.0 q[2] = 0.0 return q def evaluate_segment(self, domain, segment_edges): """Vectorised form for speed.""" Stage = domain.quantities['stage'] Elev = domain.quantities['elevation'] Xmom = domain.quantities['xmomentum'] Ymom = domain.quantities['ymomentum'] ids = segment_edges vol_ids = domain.boundary_cells[ids] edge_ids = domain.boundary_edges[ids] Normals = domain.normals n1 = Normals[vol_ids, 2 * edge_ids] n2 = Normals[vol_ids, 2 * edge_ids + 1] t = self.domain.get_time() value = self.get_boundary_values(t) try: perturb = float(value) except (ValueError, TypeError): perturb = float(value[0]) bed = Elev.centroid_values[vol_ids] stage_wave = self.background_stage + perturb # Background wave speed from still-water depth at each cell h0 = np.maximum(self.background_stage - bed, 0.0) c0 = np.sqrt(gravity * np.maximum(h0, 1e-10)) # Prescribed wave h_wave = np.maximum(stage_wave - bed, 0.0) c_wave = np.sqrt(gravity * np.maximum(h_wave, 1e-10)) # Interior state stage_interior = Stage.edge_values[vol_ids, edge_ids] depth_inside = np.maximum(stage_interior - bed, 0.0) safe_depth = np.where(depth_inside > 0.0, depth_inside, 1.0) q1 = Xmom.edge_values[vol_ids, edge_ids] q2 = Ymom.edge_values[vol_ids, edge_ids] c_int = np.sqrt(gravity * safe_depth) v_n_int = (n1 * q1 + n2 * q2) / safe_depth # Nonlinear characteristic solution c_ghost = np.maximum((v_n_int + 2.0*c_int - 2.0*c0 + 4.0*c_wave) / 4.0, 0.0) v_n_ghost = c0 - 2.0*c_wave + 0.5*v_n_int + c_int h_ghost = c_ghost**2 / gravity stage_ghost = h_ghost + bed wet = depth_inside > 0.0 Stage.boundary_values[ids] = np.where(wet, stage_ghost, stage_wave) Xmom.boundary_values[ids] = np.where(wet & (c_ghost > 0.0), h_ghost * v_n_ghost * n1, 0.0) Ymom.boundary_values[ids] = np.where(wet & (c_ghost > 0.0), h_ghost * v_n_ghost * n2, 0.0)