""" Boundary conditions specific to the shallow water wave equation
Title: ANUGA boundaries with dependencies on shallow_water_domain
Author:
Ole Nielsen, Ole.Nielsen@ga.gov.au
Stephen Roberts, Stephen.Roberts@anu.edu.au
Duncan Gray, Duncan.Gray@ga.gov.au
Gareth Davies, gareth.davies.ga.code@gmail.com
CreationDate: 2010
Description::
This module contains boundary functions for ANUGA that are specific to the shallow water Domain class.
Constraints: See GPL license in the user guide
Version: 1.0 ($Revision: 7731 $)
ModifiedBy::
Author: hudson
Date: 2010-05-18 14:54:05 +1000 (Tue, 18 May 2010)
"""
from anuga.abstract_2d_finite_volumes.generic_boundary_conditions\
import Boundary, File_boundary
import numpy as num
import anuga.utilities.log as log
from anuga.fit_interpolate.interpolate import Modeltime_too_late
from anuga.fit_interpolate.interpolate import Modeltime_too_early
from anuga.config import g as gravity
from .shallow_water_ext import rotate
#from anuga.utilities import compile
#if compile.can_use_C_extension('shallow_water_ext.c'):
# # Underlying C implementations can be accessed
# from shallow_water_ext import rotate
#else:
# msg = 'C implementations could not be accessed by %s.\n ' % __file__
# msg += 'Make sure compile_all.py has been run as described in '
# msg += 'the ANUGA installation guide.'
# raise Exception, msg
[docs]class Reflective_boundary(Boundary):
"""Reflective boundary condition object
Reflective boundary returns same conserved quantities as
those present in its neighbour volume but with normal momentum reflected.
"""
[docs] def __init__(self, domain=None):
"""Create boundary condition object
:param domain: domain on which to apply BC
Example:
Set all the tagged boundaries to use the Reflective boundaries
>>> domain = anuga.rectangular_cross_domain(10, 10)
>>> BC = anuga.Reflective_boundary(domain)
>>> domain.set_boundary({'left': BC, 'right': BC, 'top': BC, 'bottom': BC})
"""
Boundary.__init__(self)
if domain is None:
msg = 'Domain must be specified for reflective boundary'
raise Exception(msg)
# Handy shorthands
self.stage = domain.quantities['stage'].edge_values
self.xmom = domain.quantities['xmomentum'].edge_values
self.ymom = domain.quantities['ymomentum'].edge_values
self.normals = domain.normals
self.conserved_quantities = num.zeros(3, float)
def __repr__(self):
return 'Reflective_boundary'
def evaluate(self, vol_id, edge_id):
"""Calculate BC associated to specified edge
:param int vol_id: Triangle ID
:param int edge_id: Edge opposite to Vertex ID
"""
q = self.conserved_quantities
q[0] = self.stage[vol_id, edge_id]
q[1] = self.xmom[vol_id, edge_id]
q[2] = self.ymom[vol_id, edge_id]
normal = self.normals[vol_id, 2*edge_id:2*edge_id+2]
r = rotate(q, normal, direction = 1)
r[1] = -r[1]
q = rotate(r, normal, direction = -1)
return q
def evaluate_segment(self, domain, segment_edges):
"""Apply BC on the boundary edges defined by segment_edges
:param domain: Apply BC on this domain
:param segment_edges: List of boundary cells on which to apply BC
"""
if segment_edges is None:
return
if domain is None:
return
ids = segment_edges
vol_ids = domain.boundary_cells[ids]
edge_ids = domain.boundary_edges[ids]
Stage = domain.quantities['stage']
Elev = domain.quantities['elevation']
Height= domain.quantities['height']
Xmom = domain.quantities['xmomentum']
Ymom = domain.quantities['ymomentum']
Xvel = domain.quantities['xvelocity']
Yvel = domain.quantities['yvelocity']
Normals = domain.normals
#print vol_ids
#print edge_ids
#Normals.reshape((4,3,2))
#print Normals.shape
#print Normals[vol_ids, 2*edge_ids]
#print Normals[vol_ids, 2*edge_ids+1]
n1 = Normals[vol_ids,2*edge_ids]
n2 = Normals[vol_ids,2*edge_ids+1]
# Transfer these quantities to the boundary array
Stage.boundary_values[ids] = Stage.edge_values[vol_ids,edge_ids]
Elev.boundary_values[ids] = Elev.edge_values[vol_ids,edge_ids]
Height.boundary_values[ids] = Height.edge_values[vol_ids,edge_ids]
# Rotate and negate Momemtum
q1 = Xmom.edge_values[vol_ids,edge_ids]
q2 = Ymom.edge_values[vol_ids,edge_ids]
r1 = -q1*n1 - q2*n2
r2 = -q1*n2 + q2*n1
Xmom.boundary_values[ids] = n1*r1 - n2*r2
Ymom.boundary_values[ids] = n2*r1 + n1*r2
# Rotate and negate Velocity
q1 = Xvel.edge_values[vol_ids,edge_ids]
q2 = Yvel.edge_values[vol_ids,edge_ids]
r1 = q1*n1 + q2*n2
r2 = q1*n2 - q2*n1
Xvel.boundary_values[ids] = n1*r1 - n2*r2
Yvel.boundary_values[ids] = n2*r1 + n1*r2
[docs]class Transmissive_momentum_set_stage_boundary(Boundary):
""" Bounday condition object that returns transmissive momentum and sets stage
Returns same momentum conserved quantities as
those present in its neighbour volume.
Sets stage by specifying a function f of time which may either be a
vector function or a scalar function
"""
[docs] def __init__(self, domain=None, function=None):
"""Create boundary condition object.
:param domain: domain on which to apply BC
:param function: function to set stage
Example: Set all the tagged boundaries to use the
>>> domain = anuga.rectangular_cross_domain(10, 10)
>>> def waveform(t):
>>> return sea_level + normalized_amplitude/cosh(t-25)**2
>>> BC = anuga.Transmissive_momentum_set_stage_boundary(domain, waveform)
>>> domain.set_boundary({'left': BC, 'right': BC, 'top': BC, 'bottom': BC})
"""
Boundary.__init__(self)
if domain is None:
msg = 'Domain must be specified for this type boundary'
raise Exception(msg)
if function is None:
msg = 'Function must be specified for this type boundary'
raise Exception(msg)
self.domain = domain
if isinstance(function, (int, float)):
tmp = function
function = lambda t: tmp
self.function = function
def __repr__(self):
""" Return a representation of this object. """
return 'Transmissive_momentum_set_stage_boundary(%s)' % self.domain
def evaluate(self, vol_id, edge_id):
"""Transmissive momentum set stage boundaries return the edge momentum
values of the volume they serve.
vol_id is volume id
edge_id is the edge within the volume
"""
q = self.domain.get_conserved_quantities(vol_id, edge = edge_id)
t = self.domain.get_time()
if hasattr(self.function, 'time'):
# Roll boundary over if time exceeds
while t > self.function.time[-1]:
msg = 'WARNING: domain time %.2f has exceeded' % t
msg += 'time provided in '
msg += 'transmissive_momentum_set_stage_boundary object.\n'
msg += 'I will continue, reusing the object from t==0'
log.critical(msg)
t -= self.function.time[-1]
value = self.function(t)
try:
x = float(value)
except:
x = float(value[0])
q[0] = x
return q
# FIXME: Consider this (taken from File_boundary) to allow
# spatial variation
# if vol_id is not None and edge_id is not None:
# i = self.boundary_indices[ vol_id, edge_id ]
# return self.F(t, point_id = i)
# else:
# return self.F(t)
[docs]class Transmissive_n_momentum_zero_t_momentum_set_stage_boundary(Boundary):
"""Bounday condition object that returns transmissive normal momentum and sets stage
Returns the same normal momentum as that
present in neighbour volume edge. Zero out the tangential momentum.
Sets stage by specifying a function f of time which may either be a
vector function or a scalar function
"""
[docs] def __init__(self, domain=None, function=None, default_boundary=0.0):
"""Create boundary condition object.
:param domain: domain on which to apply BC
:param function: function to set stage
:param float default_boundary:
Example: Set all the tagged boundaries to use the BC
>>> domain = anuga.rectangular_cross_domain(10, 10)
>>> def waveform(t):
>>> return sea_level + normalized_amplitude/cosh(t-25)**2
>>> BC = anuga.Transmissive_n_momentum_zero_t_momentum_set_stage_boundary(domain, waveform)
>>> domain.set_boundary({'left': BC, 'right': BC, 'top': BC, 'bottom': BC})
"""
Boundary.__init__(self)
if domain is None:
msg = 'Domain must be specified for this type boundary'
raise Exception(msg)
if function is None:
msg = 'Function must be specified for this type boundary'
raise Exception(msg)
self.domain = domain
self.function = function
self.default_boundary = default_boundary
def __repr__(self):
""" Return a representation of this instance. """
msg = 'Transmissive_n_momentum_zero_t_momentum_set_stage_boundary'
msg += '(%s)' % self.domain
return msg
def evaluate(self, vol_id, edge_id):
"""Transmissive_n_momentum_zero_t_momentum_set_stage_boundary
return the edge momentum values of the volume they serve.
"""
q = self.domain.get_conserved_quantities(vol_id, edge = edge_id)
normal = self.domain.get_normal(vol_id, edge_id)
## t = self.domain.get_time()
## if hasattr(self.function, 'time'):
## # Roll boundary over if time exceeds
## while t > self.function.time[-1]:
## msg = 'WARNING: domain time %.2f has exceeded' % t
## msg += 'time provided in '
## msg += 'transmissive_momentum_set_stage_boundary object.\n'
## msg += 'I will continue, reusing the object from t==0'
## log.critical(msg)
## t -= self.function.time[-1]
## value = self.function(t)
## try:
## x = float(value)
## except:
## x = float(value[0])
value = self.get_boundary_values()
try:
x = float(value)
except:
x = float(value[0])
q[0] = x
ndotq = (normal[0]*q[1] + normal[1]*q[2])
q[1] = normal[0]*ndotq
q[2] = normal[1]*ndotq
return q
def evaluate_segment(self, domain, segment_edges):
"""Apply BC on the boundary edges defined by segment_edges
:param domain: Apply BC on this domain
:param segment_edges: List of boundary cells on which to apply BC
Vectorized form for speed. Gareth Davies 14/07/2016
"""
Stage = domain.quantities['stage']
#Elev = domain.quantities['elevation']
#Height= domain.quantities['height']
Xmom = domain.quantities['xmomentum']
Ymom = domain.quantities['ymomentum']
ids = segment_edges
vol_ids = domain.boundary_cells[ids]
edge_ids = domain.boundary_edges[ids]
Normals = domain.normals
n1 = Normals[vol_ids,2*edge_ids]
n2 = Normals[vol_ids,2*edge_ids+1]
# Call the boundary function which returns stage
value = self.get_boundary_values()
try:
x = float(value)
except:
x = float(value[0])
# Set stage
Stage.boundary_values[ids] = x
# Compute flux normal to edge
q1 = Xmom.edge_values[vol_ids,edge_ids]
q2 = Ymom.edge_values[vol_ids,edge_ids]
ndotq = n1*q1 + n2*q2
Xmom.boundary_values[ids] = ndotq * n1
Ymom.boundary_values[ids] = ndotq * n2
[docs]class Transmissive_stage_zero_momentum_boundary(Boundary):
"""BC where stage is same as neighbour volume and momentum to zero.
Underlying domain must be specified when boundary is instantiated
"""
[docs] def __init__(self, domain=None):
""" Instantiate a Transmissive (zero momentum) boundary. """
Boundary.__init__(self)
if domain is None:
msg = ('Domain must be specified for '
'Transmissive_stage_zero_momentum boundary')
raise Exception(msg)
self.domain = domain
def __repr__(self):
""" Return a representation of this instance. """
return 'Transmissive_stage_zero_momentum_boundary(%s)' % self.domain
def evaluate(self, vol_id, edge_id):
"""Calculate transmissive (zero momentum) results. """
q = self.domain.get_conserved_quantities(vol_id, edge=edge_id)
q[1] = q[2] = 0.0
return q
[docs]class Time_stage_zero_momentum_boundary(Boundary):
"""Time dependent boundary returns values for stage
conserved quantities as a function of time.
Must specify domain to get access to model time and a function of t
which must return conserved stage quantities as a function time.
Example:
B = Time_stage_zero_momentum_boundary(domain,
function=lambda t: (60<t<3660)*2)
This will produce a boundary condition with is a 2m high square wave
starting 60 seconds into the simulation and lasting one hour.
Momentum applied will be 0 at all times.
"""
[docs] def __init__(self, domain=None,
#f=None, # Should be removed and replaced by function below
function=None,
default_boundary=None,
verbose=False):
Boundary.__init__(self)
self.default_boundary = default_boundary
self.default_boundary_invoked = False # Flag
self.domain = domain
self.verbose = verbose
if domain is None:
raise Exception('You must specify a domain to Time_stage_zero_momemtum_boundary')
if function is None:
raise Exception('You must specify a function to Time_stage_zero_momemtum_boundary')
try:
q = function(0.0)
except Exception as e:
msg = 'Function for time stage boundary could not be executed:\n%s' %e
raise Exception(msg)
try:
q = float(q)
except:
msg = 'Return value from time boundary function could '
msg += 'not be converted into a float.\n'
msg += 'I got %s' %str(q)
raise Exception(msg)
self.f = function
self.domain = domain
def __repr__(self):
return 'Time_stage_zero_momemtum_boundary'
def evaluate(self, vol_id=None, edge_id=None):
return self.get_boundary_values()
def evaluate_segment(self, domain, segment_edges):
if segment_edges is None:
return
if domain is None:
return
ids = segment_edges
vol_ids = domain.boundary_cells[ids]
edge_ids = domain.boundary_edges[ids]
q_bdry = self.get_boundary_values()
#-------------------------------------------------
# Now update boundary values
#-------------------------------------------------
domain.quantities['stage'].boundary_values[ids] = q_bdry
domain.quantities['xmomentum'].boundary_values[ids] = 0.0
domain.quantities['ymomentum'].boundary_values[ids] = 0.0
class Characteristic_stage_boundary(Boundary):
"""Sets the stage via a function and the momentum is determined
via assumption of simple incoming wave (uses Riemann invariant)
Example:
def waveform(t):
return sea_level + normalized_amplitude/cosh(t-25)**2
Bcs = Characteristic_stage_boundary(domain, waveform)
Underlying domain must be specified when boundary is instantiated
"""
def __init__(self, domain=None, function=None, default_stage = 0.0):
""" Instantiate a
Characteristic_stage_boundary.
domain is the domain containing the boundary
function is the function to apply the wave
default_stage is the assumed stage pre the application of wave
"""
#raise Exception('This boundary type is not implemented yet')
Boundary.__init__(self)
if domain is None:
msg = 'Domain must be specified for this type boundary'
raise Exception(msg)
if function is None:
msg = 'Function must be specified for this type boundary'
raise Exception(msg)
self.domain = domain
self.function = function
self.default_stage = default_stage
self.elev = domain.quantities['elevation']
self.stage = domain.quantities['stage']
#self.height = domain.quantities['height']
self.xmom = domain.quantities['xmomentum']
self.ymom = domain.quantities['ymomentum']
def __repr__(self):
""" Return a representation of this instance. """
msg = 'Characteristic_stage_boundary '
msg += '(%s) ' % self.domain
msg += '(%s) ' % self.default_stage
return msg
def evaluate(self, vol_id, edge_id):
"""Calculate reflections (reverse outward momentum).
vol_id
edge_id
"""
t = self.domain.get_time()
value = self.function(t)
try:
w_outside = float(value)
except:
w_outside = float(value[0])
q = num.zeros(len(self.conserved_quantities), float)
q[0] = self.stage.edge_values[vol_id, edge_id]
q[1] = self.xmom.edge_values[vol_id, edge_id]
q[2] = self.ymom.edge_values[vol_id, edge_id]
elev = self.elev.edge_values[vol_id, edge_id]
normal = self.normals[vol_id, 2*edge_id:2*edge_id+2]
uh_inside = normal[0]*q[1] + normal[1]*q[2]
vh_inside = normal[1]*q[1] - normal[0]*q[2]
# use elev as elev both inside and outside
h_outside = max(w_outside - elev,0)
h_inside = max(q[0] - elev, 0)
u_inside = uh_inside/h_inside
sqrt_g = gravity**0.5
sqrt_h_inside = h_inside**0.5
sqrt_h_outside = h_outside**0.5
h_m = (0.5*(sqrt_h_inside+sqrt_h_outside) + u_inside/4.0/sqrt_g )**2
u_m = 0.5*u_inside + sqrt_g*(sqrt_h_inside - sqrt_h_outside)
uh_m = h_m*u_m
vh_m = vh_inside
# if uh_inside > 0.0 then outflow
if uh_inside > 0.0 :
vh_m = vh_inside
else:
vh_m = 0.0
if h_inside == 0.0:
q[0] = w_outside
q[1] = 0.0
q[2] = 0.0
else:
q[0] = h_m + elev
q[1] = uh_m*normal[0] + vh_m*normal[1]
q[2] = uh_m*normal[1] - vh_m*normal[0]
return q
def evaluate_segment(self, domain, segment_edges):
"""Apply BC on the boundary edges defined by
segment_edges
"""
Stage = domain.quantities['stage']
Elev = domain.quantities['elevation']
Xmom = domain.quantities['xmomentum']
Ymom = domain.quantities['ymomentum']
ids = segment_edges
vol_ids = domain.boundary_cells[ids]
edge_ids = domain.boundary_edges[ids]
Normals = domain.normals
n1 = Normals[vol_ids,2*edge_ids]
n2 = Normals[vol_ids,2*edge_ids+1]
# Get stage value
t = self.domain.get_time()
value = self.function(t)
try:
w_outside = float(value)
except:
w_outside = float(value[0])
# Transfer these quantities to the boundary array
Stage.boundary_values[ids] = Stage.edge_values[vol_ids,edge_ids]
Xmom.boundary_values[ids] = Xmom.edge_values[vol_ids,edge_ids]
Ymom.boundary_values[ids] = Ymom.edge_values[vol_ids,edge_ids]
Elev.boundary_values[ids] = Elev.edge_values[vol_ids,edge_ids]
h_inside = num.maximum(Stage.boundary_values[ids]-Elev.boundary_values[ids], 0.0)
w_outside = 0.0*Stage.boundary_values[ids] + w_outside
# Do vectorized operations here
#
# In dry cells, the values will be ....
q0_dry = num.where(Elev.boundary_values[ids] <= w_outside, w_outside, Elev.boundary_values[ids])
q1_dry = 0.0 * Xmom.boundary_values[ids]
q2_dry = 0.0 * Ymom.boundary_values[ids]
#
# and in wet cells, the values will be ...
# (see 'evaluate' method above for more comments on theory,
# in particular we assume subcritical flow and a zero outside velocity)
#
# (note: When cells are dry, this calculation will throw invalid
# values, but such values will never be selected to be returned)
sqrt_g = gravity**0.5
h_inside = num.maximum(Stage.boundary_values[ids] - Elev.boundary_values[ids], 0)
uh_inside = n1 * Xmom.boundary_values[ids] + n2 * Ymom.boundary_values[ids]
vh_inside = n2 * Xmom.boundary_values[ids] - n1 * Ymom.boundary_values[ids]
u_inside = num.where(h_inside>0.0, uh_inside/h_inside, 0.0)
h_outside = num.maximum(w_outside - Elev.boundary_values[ids], 0)
sqrt_h_inside = h_inside**0.5
sqrt_h_outside = h_outside**0.5
h_m = (0.5*(sqrt_h_inside+sqrt_h_outside) + u_inside/4.0/sqrt_g )**2
u_m = 0.5*u_inside + sqrt_g*(sqrt_h_inside - sqrt_h_outside)
uh_m = h_m*u_m
# if uh_inside > 0.0 then outflow
vh_m = num.where(uh_inside > 0.0, vh_inside, 0.0)
w_m = h_m + Elev.boundary_values[ids]
dry_test = num.logical_or(h_inside == 0.0, h_outside == 0.0)
q1 = uh_m*n1 + vh_m*n2
q2 = uh_m*n2 - vh_m*n1
Stage.boundary_values[ids] = num.where(
dry_test,
w_outside,
w_m
)
Xmom.boundary_values[ids] = num.where(
dry_test,
0.0,
q1
)
Ymom.boundary_values[ids] = num.where(
dry_test,
0.0,
q2)
class Dirichlet_discharge_boundary(Boundary):
""" Class for a Dirichlet discharge boundary.
Sets stage (stage0)
Sets momentum (wh0) in the inward normal direction.
Underlying domain must be specified when boundary is instantiated
"""
def __init__(self, domain=None, stage0=None, wh0=None):
Boundary.__init__(self)
""" Instantiate a Dirichlet discharge boundary.
domain underlying domain
stage0 stag
wh0 momentum in the inward normal direction.
"""
if domain is None:
msg = 'Domain must be specified for this type of boundary'
raise Exception(msg)
if stage0 is None:
raise Exception('Stage must be specified for this type of boundary')
if wh0 is None:
wh0 = 0.0
self.domain = domain
self.stage0 = stage0
self.wh0 = wh0
def __repr__(self):
""" Return a representation of this instance. """
return 'Dirichlet_Discharge_boundary(%s)' % self.domain
def evaluate(self, vol_id, edge_id):
"""Set discharge in the (inward) normal direction"""
normal = self.domain.get_normal(vol_id,edge_id)
q = [self.stage0, -self.wh0*normal[0], -self.wh0*normal[1]]
return q
# FIXME: Consider this (taken from File_boundary) to allow
# spatial variation
# if vol_id is not None and edge_id is not None:
# i = self.boundary_indices[ vol_id, edge_id ]
# return self.F(t, point_id = i)
# else:
# return self.F(t)
class Inflow_boundary(Boundary):
"""Apply given flow in m^3/s to boundary segment.
Depth and momentum is derived using Manning's formula.
Underlying domain must be specified when boundary is instantiated
"""
# FIXME (Ole): This is work in progress and definitely not finished.
# The associated test has been disabled
def __init__(self, domain=None, rate=0.0):
Boundary.__init__(self)
if domain is None:
msg = 'Domain must be specified for '
msg += 'Inflow boundary'
raise Exception(msg)
self.domain = domain
# FIXME(Ole): Allow rate to be time dependent as well
self.rate = rate
self.tag = None # Placeholder for tag associated with this object.
def __repr__(self):
return 'Inflow_boundary(%s)' %self.domain
def evaluate(self, vol_id, edge_id):
"""Apply inflow rate at each edge of this boundary
"""
# First find all segments having the same tag is vol_id, edge_id
# This will be done the first time evaluate is called.
if self.tag is None:
boundary = self.domain.boundary
self.tag = boundary[(vol_id, edge_id)]
# Find total length of boundary with this tag
length = 0.0
for v_id, e_id in boundary:
if self.tag == boundary[(v_id, e_id)]:
length += self.domain.mesh.get_edgelength(v_id, e_id)
self.length = length
self.average_momentum = self.rate/length
# Average momentum has now been established across this boundary
# Compute momentum in the inward normal direction
inward_normal = -self.domain.mesh.get_normal(vol_id, edge_id)
xmomentum, ymomentum = self.average_momentum * inward_normal
# Compute depth based on Manning's formula v = 1/n h^{2/3} sqrt(S)
# Where v is velocity, n is manning's coefficient, h is depth
# and S is the slope into the domain.
# Let mu be the momentum (vh), then this equation becomes:
# mu = 1/n h^{5/3} sqrt(S)
# from which we can isolate depth to get
# h = (mu n/sqrt(S) )^{3/5}
slope = 0 # get gradient for this triangle dot normal
epsilon = 1.0e-12
import math
# get manning coef from this triangle
friction = self.domain.get_quantity('friction').get_values(\
location='edges', indices=[vol_id])[0]
mannings_n = friction[edge_id]
if slope > epsilon and mannings_n > epsilon:
depth = pow(self.average_momentum * mannings_n/math.sqrt(slope), \
3.0/5)
else:
depth = 1.0
# Elevation on this edge
z = self.domain.get_quantity('elevation').get_values(\
location='edges', indices=[vol_id])[0]
elevation = z[edge_id]
# Assign conserved quantities and return
q = num.array([elevation + depth, xmomentum, ymomentum], float)
return q
[docs]class Field_boundary(Boundary):
"""Set boundary from given field.
Given field is represented in an sww file containing
values for stage, xmomentum and ymomentum.
Optionally, the user can specify mean_stage to offset the stage provided
in the sww file.
This function is a thin wrapper around the generic File_boundary. The
difference between the File_boundary and Field_boundary is only that the
Field_boundary will allow you to change the level of the stage height when
you read in the boundary condition. This is very useful when running
different tide heights in the same area as you need only to convert one
boundary condition to a SWW file, ideally for tide height of 0 m
(saving disk space). Then you can use Field_boundary to read this SWW file
and change the stage height (tide) on the fly depending on the scenario.
"""
[docs] def __init__(self,
filename,
domain,
mean_stage=0.0,
time_thinning=1,
time_limit=None,
boundary_polygon=None,
default_boundary=None,
use_cache=False,
verbose=False):
"""Constructor
:param filename: Name of sww file containing stage and x/ymomentum
:param domain: pointer to shallow water domain for which the boundary applies
:param mean_stage: The mean water level which will be added to stage derived from the boundary condition
:param time_thinning: Will set how many time steps from the sww file read in will be interpolated to the boundary.
:param default_boundary: This will be used in case model time exceeds that available in the underlying data.
:param time_limit:
:param boundary_polygon:
:param use_cache: True if caching is to be used.
:param verbose: True if this method is to be verbose.
For example if
the sww file has 1 second time steps and is 24 hours
in length it has 86400 time steps. If you set
time_thinning to 1 it will read all these steps.
If you set it to 100 it will read every 100th step eg
only 864 step. This parameter is very useful to increase
the speed of a model run that you are setting up
and testing.
"""
# Create generic file_boundary object
self.file_boundary = File_boundary(filename,
domain,
time_thinning=time_thinning,
time_limit=time_limit,
boundary_polygon=boundary_polygon,
default_boundary=default_boundary,
use_cache=use_cache,
verbose=verbose)
# Record information from File_boundary
self.F = self.file_boundary.F
self.domain = self.file_boundary.domain
# Record mean stage
self.mean_stage = mean_stage
def __repr__(self):
""" Generate a string representation of this instance. """
return 'Field boundary'
def evaluate(self, vol_id=None, edge_id=None):
""" Calculate 'field' boundary results.
vol_id and edge_id are ignored
Return linearly interpolated values based on domain.time
"""
# Evaluate file boundary
q = self.file_boundary.evaluate(vol_id, edge_id)
# Adjust stage
for j, name in enumerate(self.domain.conserved_quantities):
if name == 'stage':
q[j] += self.mean_stage
return q
[docs]class Flather_external_stage_zero_velocity_boundary(Boundary):
"""Boundary condition based on a Flather type approach
Setting the external stage with a function, and a zero external velocity,
The idea is similar (but not identical) to that described on page 239 of
the following article::
Article{blayo05,
Title = {Revisiting open boundary conditions from the point of view of characteristic variables},
Author = {Blayo, E. and Debreu, L.},
Journal = {Ocean Modelling},
Year = {2005},
Pages = {231-252},
Volume = {9},
}
Approach
#. The external (outside boundary) stage is set with a function, the
external velocity is zero, the internal stage and velocity are taken from the
domain values.
#. Some 'characteristic like' variables are computed, depending on whether
the flow is incoming or outgoing. See Blayo and Debreu (2005)
#. The boundary conserved quantities are computed from these characteristic
like variables
This has been useful as a 'weakly reflecting' boundary when the stage should
be approximately specified but allowed to adapt to outgoing waves.
"""
[docs] def __init__(self, domain=None, function=None):
"""Create boundary condition object.
:param domain: The domain on which to apply boundary condition
:param function: Function to apply on the boundary
Example:
.. code:: python
def waveform(t):
return sea_level + normalized_amplitude/cosh(t-25)**2
Bf = Flather_external_stage_zero_velocity_boundary(domain, waveform)
"""
Boundary.__init__(self)
if domain is None:
msg = 'Domain must be specified for this type boundary'
raise Exception(msg)
if function is None:
msg = 'Function must be specified for this type boundary'
raise Exception(msg)
self.domain = domain
self.function = function
def __repr__(self):
""" Return a representation of this instance. """
msg = 'Flather_external_stage_zero_velocity_boundary'
msg += '(%s)' % self.domain
return msg
def evaluate(self, vol_id, edge_id):
"""
"""
q = self.domain.get_conserved_quantities(vol_id, edge = edge_id)
bed = self.domain.quantities['elevation'].centroid_values[vol_id]
depth_inside=max(q[0]-bed,0.0)
dt=self.domain.timestep
normal = self.domain.get_normal(vol_id, edge_id)
t = self.domain.get_time()
value = self.function(t)
try:
stage_outside = float(value)
except:
stage_outside = float(value[0])
if(depth_inside==0.):
q[0] = stage_outside
q[1] = 0.
q[2] = 0.
else:
# Asssume sub-critical flow. Set the values of the characteristics as
# appropriate, depending on whether we have inflow or outflow
# These calculations are based on the paper cited above
sqrt_g_on_depth_inside = (gravity/depth_inside)**0.5
ndotq_inside = (normal[0]*q[1] + normal[1]*q[2]) # momentum perpendicular to the boundary
if(ndotq_inside>0.):
# Outflow (assumed subcritical)
# Compute characteristics using a particular extrapolation
#
# Theory: 2 characteristics coming from inside domain, only
# need to impose one characteristic from outside
#
# w1 = u - sqrt(g/depth)*(Stage_outside) -- uses 'outside' info
w1 = 0. - sqrt_g_on_depth_inside*stage_outside
# w2 = v [velocity parallel to boundary] -- uses 'inside' info
w2 = (+normal[1]*q[1] -normal[0]*q[2])/depth_inside
# w3 = u + sqrt(g/depth)*(Stage_inside) -- uses 'inside info'
w3 = ndotq_inside/depth_inside + sqrt_g_on_depth_inside*q[0]
else:
# Inflow (assumed subcritical)
# Need to set 2 characteristics from outside information
# w1 = u - sqrt(g/depth)*(Stage_outside) -- uses 'outside' info
w1 = 0. - sqrt_g_on_depth_inside*stage_outside
# w2 = v [velocity parallel to boundary] -- uses 'outside' info
w2 = 0.
# w3 = u + sqrt(g/depth)*(Stage_inside) -- uses 'inside info'
w3 = ndotq_inside/depth_inside + sqrt_g_on_depth_inside*q[0]
q[0] = (w3-w1)/(2*sqrt_g_on_depth_inside)
qperp= (w3+w1)/2.*depth_inside
qpar= w2*depth_inside
# So q[1], q[2] = qperp*(normal[0], normal[1]) + qpar*(-normal[1], normal[0])
q[1] = qperp*normal[0] + qpar*normal[1]
q[2] = qperp*normal[1] - qpar*normal[0]
return q
def evaluate_segment(self, domain, segment_edges):
"""Applied in vectorized form for speed. Gareth Davies 14/07/2016
"""
Stage = domain.quantities['stage']
Elev = domain.quantities['elevation']
#Height= domain.quantities['height']
Xmom = domain.quantities['xmomentum']
Ymom = domain.quantities['ymomentum']
ids = segment_edges
vol_ids = domain.boundary_cells[ids]
edge_ids = domain.boundary_edges[ids]
Normals = domain.normals
n1 = Normals[vol_ids,2*edge_ids]
n2 = Normals[vol_ids,2*edge_ids+1]
# Get stage value
t = self.domain.get_time()
value = self.function(t)
try:
stage_outside = float(value)
except:
stage_outside = float(value[0])
# Transfer these quantities to the boundary array
Stage.boundary_values[ids] = Stage.edge_values[vol_ids,edge_ids]
Xmom.boundary_values[ids] = Xmom.edge_values[vol_ids,edge_ids]
Ymom.boundary_values[ids] = Ymom.edge_values[vol_ids,edge_ids]
Elev.boundary_values[ids] = Elev.edge_values[vol_ids,edge_ids]
bed = Elev.centroid_values[vol_ids]
depth_inside = num.maximum(Stage.boundary_values[ids]-bed, 0.0)
stage_outside = 0.0*Stage.boundary_values[ids] + stage_outside
# Do vectorized operations here
#
# In dry cells, the values will be ....
q0_dry = num.where(bed <= stage_outside, stage_outside, Elev.boundary_values[ids])
q1_dry = 0.0 * Xmom.boundary_values[ids]
q2_dry = 0.0 * Ymom.boundary_values[ids]
#
# and in wet cells, the values will be ...
# (see 'evaluate' method above for more comments on theory,
# in particular we assume subcritical flow and a zero outside velocity)
#
# (note: When cells are dry, this calculation will throw invalid
# values, but such values will never be selected to be returned)
sqrt_g_on_depth_inside = (gravity/depth_inside)**0.5
ndotq_inside = (n1 * Xmom.boundary_values[ids] +
n2 * Ymom.boundary_values[ids])
# w1 = u - sqrt(g/depth)*(Stage_outside) -- uses 'outside' info
w1 = 0.0 - sqrt_g_on_depth_inside * stage_outside
# w2 = v [velocity parallel to boundary] -- uses 'inside' or 'outside'
# info as required
w2 = num.where(ndotq_inside > 0.0,
(n2 * Xmom.boundary_values[ids] - n1 * Ymom.boundary_values[ids])/depth_inside,
0.0 * ndotq_inside)
# w3 = u + sqrt(g/depth)*(Stage_inside) -- uses 'inside info'
w3 = ndotq_inside/depth_inside + sqrt_g_on_depth_inside*Stage.boundary_values[ids]
q0_wet = (w3 - w1)/(2.0 * sqrt_g_on_depth_inside)
qperp = (w3 + w1)/2.0 * depth_inside
qpar = w2 * depth_inside
q1_wet = qperp * n1 + qpar * n2
q2_wet = qperp * n2 - qpar * n1
dry_test = num.logical_or(depth_inside == 0.0, stage_outside > bed)
Stage.boundary_values[ids] = num.where(
dry_test,
q0_dry,
q0_wet)
Xmom.boundary_values[ids] = num.where(
dry_test,
q1_dry,
q1_wet)
Ymom.boundary_values[ids] = num.where(
dry_test,
q2_dry,
q2_wet)