"""
Rate operators (such as rain)
Constraints: See GPL license in the user guide
Version: 1.0 ($Revision: 7731 $)
"""
__author__="steve"
__date__ ="$09/03/2012 4:46:39 PM$"
from anuga.config import indent
from anuga.config import MULTIPROCESSOR_OPENMP, MULTIPROCESSOR_GPU
import numpy as num
import anuga.utilities.log as log
from anuga.utilities.function_utils import evaluate_temporal_function
from anuga import Quantity
from anuga.operators.base_operator import Operator
from anuga import Region
[docs]
class Rate_operator(Operator):
[docs]
def __init__(self,
domain,
rate=0.0,
factor=1.0,
region=None,
indices=None,
polygon=None,
center=None,
radius=None,
default_rate=0.0,
description = None,
label = None,
logging = False,
verbose = False,
monitor = False):
"""Create a Rate_operator that adds water over a region at a specified rate.
The applied rate is ``rate * factor`` in m/s (depth per second).
For common use-cases prefer the factory constructors:
* ``Rate_operator.rainfall(domain, rate_mm_hr)`` — rainfall in mm/hr
* ``Rate_operator.inflow(domain, rate_m3_s)`` — inflow in m³/s
Parameters
----------
domain : anuga.Domain
The simulation domain.
rate : scalar, callable, Quantity, or ndarray
Rate in m/s (after multiplication by *factor*). May be a scalar,
a function of ``t``, ``(x, y)``, or ``(x, y, t)``, a Quantity,
a numpy array of shape ``(n_triangles,)``, or an xarray DataArray.
factor : scalar or callable(t)
Multiplier applied to *rate* before adding to stage. Use to
convert units (e.g. ``1/(1000*3600)`` for mm/hr → m/s).
region : Region, optional
Pre-built Region. Cannot be combined with *polygon*, *center*,
*radius*, or *indices*.
indices : list, optional
Triangle indices where the rate is applied.
polygon : list of [x, y], optional
Polygon bounding the application area.
center : [x, y], optional
Centre of a circular application area.
radius : float, optional
Radius of the circular application area.
default_rate : scalar or callable(t), optional
Rate to use outside the time interval of the rate function/xarray.
description, label, logging, verbose, monitor :
Passed to the base ``Operator``.
"""
# --------------------------------------------------
# Input validation
# --------------------------------------------------
if isinstance(region, Region) and any(
arg is not None for arg in (polygon, center, radius, indices)):
raise ValueError(
'Rate_operator: cannot specify both a Region object and '
'polygon/center/radius/indices — the Region already encodes location')
if not (rate is None
or isinstance(rate, (int, float, num.ndarray))
or callable(rate)
or hasattr(rate, 'centroid_values')): # Quantity duck-type
raise TypeError(
'Rate_operator: rate must be a number, callable, Quantity, or '
'numpy array; got %s' % type(rate).__name__)
Operator.__init__(self, domain, description, label, logging, verbose)
#-----------------------------------------------------
# Make sure region is actually an instance of a region
# Otherwise create a new region based on the other
# input arguments
#-----------------------------------------------------
if isinstance(region,Region):
region.set_verbose(verbose)
self.region = region
else:
self.region = Region(domain,
indices=indices,
polygon=polygon,
center=center,
radius=radius,
verbose=verbose)
#------------------------------------------
# Local variables
#------------------------------------------
self.indices = self.region.indices
self.set_areas()
self.set_full_indices()
#--------------------------------
# Setting up rate
#--------------------------------
self.rate_callable = False
self.rate_spatial = False
self.rate_xarray = False
#-------------------------------
# Check if rate is actually an xarray.
# Need xarray package installed
#-------------------------------
try:
import xarray
except ImportError as e:
log.debug('xarray not available, xarray rate inputs disabled: %s' % e)
xarray = None
if xarray is not None:
if type(rate) is xarray.core.dataarray.DataArray:
self.rate_xarray = True
xa = rate
rate = 0.0
self._prepare_xarray_rate(xa)
self.set_rate(rate)
self.set_default_rate(default_rate)
self.default_rate_invoked = False # Flag
#------------------------------
# Setting up factor, can be a scalar
# or a function of time.
# Limitation: factor does not currently support time-series files
# or arrays. Only scalars and callables of the form f(t) are
# accepted. Extend set_factor() if file/array support is needed.
#------------------------------
self.factor_callable = False
self.set_factor(factor)
# ----------------
# Mass tracking
#-----------------
self.monitor = monitor
self.cumulative_influx = 0.0
self.local_influx=0.0
self.local_max = 0.0
self.local_min = 0.0
# ----------------
# GPU support
#-----------------
self._gpu_op_id = None # GPU operator ID (set on first GPU call)
self._gpu_initialized = False
self._gpu_rate_array_cache = None # Cached rate array for GPU (avoids recreating every call)
self._gpu_rate_min_cache = None # Cached min value for statistics
self._gpu_rate_max_cache = None # Cached max value for statistics
self._gpu_rate_changed = True # Flag to indicate rate data needs to be transferred to GPU
def _init_gpu(self):
"""Initialize GPU operator for this rate operator."""
if self._gpu_initialized:
return
# Check if domain is in GPU mode
if not hasattr(self.domain, 'multiprocessor_mode') or self.domain.multiprocessor_mode != MULTIPROCESSOR_GPU:
return
# Check if we have a GPU interface
if not hasattr(self.domain, 'gpu_interface') or self.domain.gpu_interface is None:
return
gpu_interface = self.domain.gpu_interface
if not hasattr(gpu_interface, 'gpu_dom') or gpu_interface.gpu_dom is None:
return
# Only support non-spatial, non-xarray rates for GPU
# Spatial rates (x,y or x,y,t functions) and xarray rates need more complex handling
if self.rate_spatial or self.rate_xarray:
return
# Supported rate types: scalar, t, quantity, centroid_array
if self.rate_type not in ('scalar', 't', 'quantity', 'centroid_array'):
return
# Get indices - if None, apply to all elements
if self.indices is None:
indices = num.arange(self.domain.number_of_elements, dtype=num.intc)
areas = self.domain.areas.copy()
elif hasattr(self.indices, '__len__') and len(self.indices) == 0:
# No local elements on this rank (e.g. rainfall polygon doesn't
# overlap this rank's partition). This operator is a no-op here.
# Mark as GPU-initialized so _has_cpu_only_fractional_operators
# doesn't trigger an unnecessary GPU<->CPU sync every timestep.
# __call__ still returns immediately at the empty-indices guard.
self._gpu_initialized = True
return
else:
indices = num.asarray(self.indices, dtype=num.intc)
areas = self.domain.areas[indices].copy()
# Get full indices for mass tracking
if self.full_indices is not None and len(self.full_indices) > 0:
full_indices = num.asarray(self.full_indices, dtype=num.intc)
else:
full_indices = None
from anuga.shallow_water.sw_domain_gpu_ext import init_rate_operator
self._gpu_op_id = init_rate_operator(
gpu_interface.gpu_dom,
indices,
areas.astype(num.float64),
full_indices
)
if self._gpu_op_id < 0:
raise RuntimeError(
f"Failed to register GPU rate operator '{getattr(self, 'label', repr(self))}': "
f"slot limit exceeded (MAX_RATE_OPERATORS=64). "
f"Reduce the number of Rate_operator instances or increase MAX_RATE_OPERATORS in gpu_domain.h."
)
self._gpu_initialized = True
def __call__(self):
"""Apply rate operator to the domain for one timestep.
Adds water (or removes it when the rate is negative) to the stage
quantity of each triangle selected by ``indices``, scaled by the
current timestep and factor.
- If ``indices`` is an empty list, no triangles are modified.
- If ``indices`` is None, all triangles are modified.
- Otherwise only the triangles at the given indices are modified.
Returns
-------
None
Modifies ``domain.quantities['stage']`` (and momentum quantities
when the rate is negative) in place.
"""
if self.indices is not None and len(self.indices) == 0:
return
# Check for GPU execution path
if (hasattr(self.domain, 'multiprocessor_mode') and
self.domain.multiprocessor_mode == MULTIPROCESSOR_GPU and
not self.rate_spatial and
not self.rate_xarray):
# Lazy initialization of GPU operator
if not self._gpu_initialized:
self._init_gpu()
if self._gpu_initialized and self._gpu_op_id is not None and self._gpu_op_id >= 0:
t = self.domain.get_time()
timestep = self.domain.get_timestep()
factor = self.get_factor(t)
# DEBUG: Confirm GPU path taken
#print(f"DEBUG Rate_operator: GPU path, op_id={self._gpu_op_id}, rate_type={self.rate_type}, t={t}, timestep={timestep}, factor={factor}")
# DEBUG: Check rate quantity info
#if self.rate_type == 'quantity':
# print(f"DEBUG Rate_operator: rate quantity name={self.rate.name}, rate object id={id(self.rate)}")
if self.rate_type == 'quantity':
# Quantity type - use array-based GPU kernel
from anuga.shallow_water.sw_domain_gpu_ext import apply_rate_operator_array_gpu
# Use cached rate array if available (avoids expensive array copy every RK2 step)
if self._gpu_rate_array_cache is None:
self._gpu_rate_array_cache = num.ascontiguousarray(self.rate.centroid_values, dtype=num.float64)
# Cache min/max too (avoids iterating 5M elements every call)
if self.indices is None:
self._gpu_rate_min_cache = self._gpu_rate_array_cache.min()
self._gpu_rate_max_cache = self._gpu_rate_array_cache.max()
else:
self._gpu_rate_min_cache = self._gpu_rate_array_cache[self.indices].min()
self._gpu_rate_max_cache = self._gpu_rate_array_cache[self.indices].max()
self._gpu_rate_changed = True # New cache, needs transfer
rate_array = self._gpu_rate_array_cache
# rate_array is full domain size, use_indices_into_rate=1
# Pass rate_changed flag to avoid unnecessary H2D transfer
rate_changed = 1 if self._gpu_rate_changed else 0
self.local_influx = apply_rate_operator_array_gpu(
self.domain.gpu_interface.gpu_dom,
self._gpu_op_id,
rate_array,
1, # use_indices_into_rate
rate_changed,
float(factor),
float(timestep)
)
self._gpu_rate_changed = False # Data transferred, don't repeat
# Use cached min/max for statistics
self.local_max = self._gpu_rate_max_cache * factor
self.local_min = self._gpu_rate_min_cache * factor
elif self.rate_type == 'centroid_array':
# Centroid array type - use array-based GPU kernel
from anuga.shallow_water.sw_domain_gpu_ext import apply_rate_operator_array_gpu
# Use cached rate array if available
if self._gpu_rate_array_cache is None:
self._gpu_rate_array_cache = num.ascontiguousarray(self.rate, dtype=num.float64)
# Cache min/max too
if self.indices is None:
self._gpu_rate_min_cache = self._gpu_rate_array_cache.min()
self._gpu_rate_max_cache = self._gpu_rate_array_cache.max()
else:
self._gpu_rate_min_cache = self._gpu_rate_array_cache[self.indices].min()
self._gpu_rate_max_cache = self._gpu_rate_array_cache[self.indices].max()
self._gpu_rate_changed = True # New cache, needs transfer
rate_array = self._gpu_rate_array_cache
# rate_array is full domain size, use_indices_into_rate=1
# Pass rate_changed flag to avoid unnecessary H2D transfer
rate_changed = 1 if self._gpu_rate_changed else 0
self.local_influx = apply_rate_operator_array_gpu(
self.domain.gpu_interface.gpu_dom,
self._gpu_op_id,
rate_array,
1, # use_indices_into_rate
rate_changed,
float(factor),
float(timestep)
)
self._gpu_rate_changed = False # Data transferred, don't repeat
# Use cached min/max for statistics
self.local_max = self._gpu_rate_max_cache * factor
self.local_min = self._gpu_rate_min_cache * factor
else:
# Scalar or time-dependent rate - use scalar GPU kernel
from anuga.shallow_water.sw_domain_gpu_ext import apply_rate_operator_gpu
rate = self.get_non_spatial_rate(t)
# Handle case where rate is an array (from file_function)
try:
rate_scalar = float(rate)
except (TypeError, ValueError):
rate_scalar = float(rate[0])
self.local_influx = apply_rate_operator_gpu(
self.domain.gpu_interface.gpu_dom,
self._gpu_op_id,
rate_scalar,
float(factor),
float(timestep)
)
# Estimate min/max rate for statistics
self.local_max = rate_scalar * factor if rate_scalar >= 0 else 0.0
self.local_min = rate_scalar * factor if rate_scalar < 0 else 0.0
# Update tracking
self.cumulative_influx += self.local_influx
self.domain.fractional_step_volume_integral += self.local_influx
return
# Fall back to CPU path
if self.rate_xarray:
# setup centroid_array from xarray corresponding to current time
self._update_Q_xarray()
t = self.domain.get_time()
timestep = self.domain.get_timestep()
factor = self.get_factor()
indices = self.indices
if self.rate_spatial:
if indices is None:
x = self.coord_c[:,0]
y = self.coord_c[:,1]
else:
x = self.coord_c[indices,0]
y = self.coord_c[indices,1]
rate = self.get_spatial_rate(x,y,t)
elif self.rate_type == 'quantity':
if indices is None:
rate = self.rate.centroid_values
else:
rate = self.rate.centroid_values[indices]
elif self.rate_type == 'centroid_array':
if indices is None:
rate = self.rate
else:
rate = self.rate[indices]
else:
rate = self.get_non_spatial_rate(t)
factor = self.get_factor(t)
# We need to adjust the momentums if rate < 0 since otherwise
# the xmom and ymom stay the same but height -> 0 which leads to xvel, yvel -> infty
fid = self.full_indices
if num.all(rate >= 0.0):
# Record the local flux for mass conservation tracking
if indices is None:
local_rates = factor*timestep*rate
self.local_influx = (local_rates*self.areas)[fid].sum()
self.stage_c[:] = self.stage_c[:] + local_rates
else:
local_rates = factor*timestep*rate
self.local_influx=(local_rates*self.areas)[fid].sum()
self.stage_c[indices] = self.stage_c[indices] + local_rates
else: # Be more careful if rate < 0
if indices is None:
#self.local_influx=(num.minimum(factor*timestep*rate, self.stage_c[:]-self.elev_c[:])*self.areas)[fid].sum()
#self.stage_c[:] = num.maximum(self.stage_c \
# + factor*rate*timestep, self.elev_c )
self.height_c[:] = self.stage_c[:] - self.elev_c[:]
local_rates = num.maximum(factor*timestep*rate, -self.height_c)
local_factors = num.where(local_rates < 0.0, (local_rates+self.height_c)/(self.height_c+1.0e-10), 1.0)
#print(local_factors, local_rates)
self.local_influx = (local_rates*self.areas)[fid].sum()
self.stage_c[:] = self.stage_c + local_rates
self.xmom_c[:] = self.xmom_c[:]*local_factors
self.ymom_c[:] = self.ymom_c[:]*local_factors
else:
#self.local_influx=(num.minimum(factor*timestep*rate, self.stage_c[indices]-self.elev_c[indices])*self.areas)[fid].sum()
#self.stage_c[indices] = num.maximum(self.stage_c[indices] \
# + factor*rate*timestep, self.elev_c[indices])
#local_rates = num.maximum(factor*timestep*rate, self.elev_c[indices]-self.stage_c[indices])
heights = self.stage_c[indices] - self.elev_c[indices]
local_rates = num.maximum(factor*timestep*rate, -heights)
local_factors = num.where(local_rates < 0.0, (local_rates+heights)/(heights+1.0e-10), 1.0)
#print(local_factors, local_rates, fid)
self.local_influx = (local_rates*self.areas)[fid].sum()
self.stage_c[indices] = self.stage_c[indices] + local_rates
self.xmom_c[indices] = self.xmom_c[indices]*local_factors
self.ymom_c[indices] = self.ymom_c[indices]*local_factors
try:
self.local_max = (local_rates[fid].max()/timestep) if timestep != 0.0 else 0.0
self.local_min = (local_rates[fid].min()/timestep) if timestep != 0.0 else 0.0
except (TypeError, IndexError):
self.local_max = local_rates/timestep if timestep != 0.0 else 0.0
self.local_min = local_rates/timestep if timestep != 0.0 else 0.0
if isinstance(self.local_max, num.ndarray) and self.local_max.size == 0:
self.local_max = 0.0
self.local_min = 0.0
# print(self.local_min, self.local_max)
self.cumulative_influx += self.local_influx
# Update mass inflows from fractional steps
self.domain.fractional_step_volume_integral+=self.local_influx
if self.monitor:
log.info('Local Flux at time %.2f = %f'
% (self.domain.get_time(), self.local_influx))
return
def get_non_spatial_rate(self, t=None):
"""Provide a rate to calculate added volume
"""
if t is None:
t = self.get_time()
assert not self.rate_spatial
rate = evaluate_temporal_function(self.rate, t,
default_right_value=self.default_rate,
default_left_value=self.default_rate)
if rate is None:
msg = ('Attribute rate must be specified in '+self.__name__+
' before attempting to call it')
raise Exception(msg)
return rate
def get_spatial_rate(self, x=None, y=None, t=None):
"""Provide a rate to calculate added volume
only call if self.rate_spatial = True
"""
assert self.rate_spatial
if t is None:
t = self.get_time()
if x is None:
assert y is None
if self.indices is None:
x = self.coord_c[:,0]
y = self.coord_c[:,1]
else:
x = self.coord_c[self.indices,0]
y = self.coord_c[self.indices,1]
assert x is not None
assert y is not None
assert isinstance(t, (int, float))
assert len(x) == len(y)
#print xy
#print t
#print self.rate_type, self.rate_type == 'x,y,t'
if self.rate_type == 'x,y,t':
rate = self.rate(x,y,t)
else:
rate = self.rate(x,y)
return rate
def set_rate(self, rate):
"""Set rate. Can change rate while running
Can be a scalar, numpy array, or a function of t or x,y or x,y,t or a quantity
"""
# Test if rate is a quantity
if isinstance(rate, Quantity):
self.rate_type = 'quantity'
elif isinstance(rate, num.ndarray):
rate_shape = rate.shape
msg = f"The shape {rate_shape} of the input rate "
msg += f"should match (number of triangles,) i.e. ({self.domain.number_of_triangles},)"
assert rate_shape == (self.domain.number_of_triangles,) \
or rate_shape == (self.domain.number_of_triangles, 1), msg
self.rate_type = 'centroid_array'
rate = rate.reshape((-1,))
else:
# Possible types are 'scalar', 't', 'x,y' and 'x,y,t'
from anuga.utilities.function_utils import determine_function_type
self.rate_type = determine_function_type(rate)
self.rate = rate
# Invalidate GPU caches (will be recreated on next GPU call)
# Use hasattr since set_rate() can be called from __init__ before cache is initialized
if hasattr(self, '_gpu_rate_array_cache'):
self._gpu_rate_array_cache = None
self._gpu_rate_min_cache = None
self._gpu_rate_max_cache = None
self._gpu_rate_changed = True # Signal that rate data needs to be transferred to GPU
if self.rate_type == 'scalar':
self.rate_callable = False
self.rate_spatial = False
elif self.rate_type == 'quantity':
self.rate_callable = False
self.rate_spatial = False
elif self.rate_type == 'centroid_array':
self.rate_callable = False
self.rate_spatial = False
elif self.rate_type == 't':
self.rate_callable = True
self.rate_spatial = False
else:
self.rate_callable = True
self.rate_spatial = True
def set_factor(self, factor):
"""Set factor. Can change factor while running
Can be a scalar, a function of t, or an n by 2 numpy array defining a time sequence
"""
if isinstance(factor, num.ndarray):
factor_shape = factor.shape
msg = f"The shape {factor_shape} of the input factor "
msg += "should be (2,n) so that a time function can be constructed"
assert factor_shape[0] == 2, msg
self.factor_type = 'time_sequence'
from scipy.interpolate import interp1d
factor = interp1d(factor[0,:], factor[1,:], kind='zero', bounds_error=False, fill_value = (0.0, 0.0))
from anuga.utilities.function_utils import determine_function_type
self.factor_type = determine_function_type(factor)
self.factor = factor
if self.factor_type == 'scalar':
self.factor_callable = False
elif self.factor_type == 't':
self.factor_callable = True
else:
msg = f'factor must be a scalar or a function of t. It was determined to be a function of {self.factor_type}'
raise Exception(msg)
def get_factor(self, t=None):
"""Provide a factor to calculate added volume
"""
if t is None:
t = self.get_time()
assert isinstance(t, (int, float))
if self.factor_type == 'scalar':
factor = self.factor
else:
factor = self.factor(t)
return factor
def set_areas(self):
if self.indices is None:
self.areas = self.domain.areas
return
if self.indices is not None and len(self.indices) == 0:
self.areas = num.array([])
return
self.areas = self.domain.areas[self.indices]
def set_full_indices(self):
if self.indices is None:
self.full_indices = num.where(self.domain.tri_full_flag ==1)[0]
return
if self.indices is not None and len(self.indices) == 0:
self.full_indices = num.array([], dtype=int)
return
self.full_indices = num.where(self.domain.tri_full_flag[self.indices] == 1)[0]
def get_Q(self, full_only=True):
""" Calculate current overall discharge
"""
# FIXME SR: this does not take into account the zeroing of large negative rates
factor = self.get_factor()
if full_only:
if self.rate_spatial:
rate = self.get_spatial_rate() # rate is an array
fid = self.full_indices
return num.sum(self.areas[fid]*rate[fid])*factor
elif self.rate_type == 'quantity':
rate = self.rate.centroid_values # rate is a quantity
fid = self.full_indices
return num.sum(self.areas[fid]*rate[fid])*factor
elif self.rate_type == 'centroid_array':
rate = self.rate # rate is already a centroid sized array
fid = self.full_indices
return num.sum(self.areas[fid]*rate[fid])*factor
else:
rate = self.get_non_spatial_rate() # rate is a scalar
fid = self.full_indices
return num.sum(self.areas[fid]*rate)*factor
else:
if self.rate_spatial:
rate = self.get_spatial_rate() # rate is an array
return num.sum(self.areas*rate)*factor
elif self.rate_type == 'quantity':
rate = self.rate.centroid_values # rate is a quantity
return num.sum(self.areas*rate)*factor
elif self.rate_type == 'centroid_array':
rate = self.rate # rate is already a centroid sized array
return num.sum(self.areas*rate)*factor
else:
rate = self.get_non_spatial_rate() # rate is a scalar
return num.sum(self.areas*rate)*factor
def set_default_rate(self, default_rate):
"""
Check and store default_rate
"""
msg = ('Default_rate must be either None '
'a scalar, or a function of time.\nI got %s.' % str(default_rate))
assert (default_rate is None or
isinstance(default_rate, (int, float)) or
callable(default_rate)), msg
#------------------------------------------
# Allow longer than data
#------------------------------------------
if default_rate is not None:
# If it is a constant, make it a function
if not callable(default_rate):
tmp = default_rate
default_rate = lambda t: tmp
# Check that default_rate is a function of one argument
try:
default_rate(0.0)
except TypeError:
raise Exception(msg)
self.default_rate = default_rate
# ------------------------------------------------------------------
# Factory constructors
# ------------------------------------------------------------------
@classmethod
def rainfall(cls, domain, rate, polygon=None, region=None,
center=None, radius=None, indices=None,
default_rate=0.0, label=None, description=None,
logging=False, verbose=False, monitor=False):
"""Create a Rate_operator for rainfall.
Parameters
----------
domain : anuga.Domain
rate : scalar, callable(t), or array
Rainfall intensity in **mm/hr**. All other rate forms accepted
by ``Rate_operator`` (callables, arrays) are also supported and
are interpreted as mm/hr.
polygon, region, center, radius, indices :
Location arguments — same as ``Rate_operator.__init__``.
Returns
-------
Rate_operator
Operator with ``factor = 1 / (1000 * 3600)`` so that mm/hr is
converted to m/s automatically.
Examples
--------
>>> rain = Rate_operator.rainfall(domain, rate=10.0) # 10 mm/hr
>>> rain = Rate_operator.rainfall(domain, rate=lambda t: 5.0) # time-varying
"""
MM_HR_TO_M_S = 1.0 / (1000.0 * 3600.0)
return cls(domain, rate=rate, factor=MM_HR_TO_M_S,
polygon=polygon, region=region,
center=center, radius=radius, indices=indices,
default_rate=default_rate, label=label,
description=description, logging=logging,
verbose=verbose, monitor=monitor)
@classmethod
def inflow(cls, domain, rate, polygon=None, region=None,
center=None, radius=None, indices=None,
default_rate=0.0, label=None, description=None,
logging=False, verbose=False, monitor=False):
"""Create a Rate_operator for a volumetric inflow.
Parameters
----------
domain : anuga.Domain
rate : scalar or callable(t)
Volumetric flow rate in **m³/s**. The operator divides by the
total region area so that the net inflow equals *rate* m³/s.
Returns
-------
Rate_operator
Raises
------
ValueError
If the specified region has zero area.
Examples
--------
>>> op = Rate_operator.inflow(domain, rate=0.5, polygon=poly)
>>> op = Rate_operator.inflow(domain, rate=lambda t: 0.1*t)
"""
# Build the operator with rate=0 first to resolve the region/area.
op = cls(domain, rate=0.0, factor=1.0,
polygon=polygon, region=region,
center=center, radius=radius, indices=indices,
default_rate=default_rate, label=label,
description=description, logging=logging,
verbose=verbose, monitor=monitor)
total_area = float(op.areas.sum()) if op.areas is not None and len(op.areas) > 0 else 0.0
if total_area <= 0.0:
raise ValueError(
'Rate_operator.inflow: region has zero area — '
'check that the polygon/center/radius overlaps the domain')
# Convert m³/s → m/s by dividing by region area.
# Use a closure factory so the wrapper has exactly one argument (t),
# which determine_function_type correctly classifies as type 't'.
if callable(rate):
def _make_scaled(fn, area):
def scaled(t):
return fn(t) / area
return scaled
op.set_rate(_make_scaled(rate, total_area))
else:
op.set_rate(rate / total_area)
return op
def _prepare_xarray_rate(self, xa):
import numpy as np
import pandas
# to speed up parallel code it helps to load the xarray
self.xa = xa.load()
self.xa['time'] = pandas.to_datetime(xa['time'], utc=True)
# these are absolute coord (since we haven't implemented offsets)
# Convert to relative coords to domain xllcorner and yllcorner
xllcorner = self.domain.geo_reference.xllcorner
yllcorner = self.domain.geo_reference.yllcorner
self.xy = np.array([self.xa['eastings']-xllcorner, self.xa['northings']-yllcorner]).T
# Determine data timestep from xarray. We assume the timestep is constant, so just test first 2 timeslices.
try:
data_dt = (self.xa['time'][1].values.astype('int64')-self.xa['time'][0].values.astype('int64'))/1.0e9
self.domain.set_evolve_max_timestep(min(data_dt, self.domain.get_evolve_max_timestep()))
except Exception: # if we can't determine the timestep probably means there is just one timeslice so just
pass
from scipy.spatial import KDTree
tree = KDTree(self.xy)
if self.verbose:
print(tree.size, self.xy.shape)
#dd, ii = tree.query(self.domain.centroid_coordinates)
dd, ii = tree.query(self.domain.get_centroid_coordinates(absolute=True))
self.ii = ii
self.previous_Q_ref_time = None
self.previous_Q_numpy = None
def _update_Q_xarray(self):
import pandas
current_utc_datetime64 = pandas.to_datetime(self.domain.get_datetime()).tz_convert('UTC')#.replace(tzinfo=None)
if self.verbose:
print(f"{self.domain.get_time()} {self.domain.get_datetime()}")
print(f"UTC time {current_utc_datetime64} type {type(current_utc_datetime64)} ")
print(self.xa.sel(time=current_utc_datetime64, method='nearest'))
try:
Q_ref = self.xa.sel(time=current_utc_datetime64, method="ffill", tolerance='5m')
Q_ref_time = Q_ref['time'].values
if self.verbose:
print(f"UTC time {current_utc_datetime64} Q_ref time {Q_ref_time} {Q_ref_time == self.previous_Q_ref_time} ")
optimize = True
if optimize:
if Q_ref_time == self.previous_Q_ref_time :
Q_numpy = self.previous_Q_numpy
else:
Q_numpy = Q_ref[self.ii].to_numpy()
self.previous_Q_numpy = Q_numpy
self.previous_Q_ref_time = Q_ref_time
else:
Q_numpy = Q_ref[self.ii].to_numpy()
except Exception:
Q_numpy = self.default_rate
if self.verbose:
print(f"UTC time {current_utc_datetime64} Using default rate Q = {Q_numpy(self.get_time())}")
self.set_rate(rate=Q_numpy)
def parallel_safe(self):
"""Operator is applied independently on each cell and
so is parallel safe.
"""
return True
def statistics(self):
message = 'You need to implement operator statistics for your operator'
return message
def timestepping_statistics(self):
# retrieve data from last __call__ call
message = indent + self.label + f': At time {self.domain.get_time()} Min rate = {self.local_min:.2e} m/s, Max rate = {self.local_max:.2e} m/s, Total Q = {self.cumulative_influx:.2e} m^3'
# if self.rate_spatial:
# rate = self.get_spatial_rate()
# try:
# min_rate = num.min(rate)
# except ValueError:
# min_rate = 0.0
# try:
# max_rate = num.max(rate)
# except ValueError:
# max_rate = 0.0
# Q = self.get_Q()
# message = indent + self.label + ': Min rate = %g m/s, Max rate = %g m/s, Total Q = %g m^3/s'% (min_rate,max_rate, Q)
# elif self.rate_type == 'quantity':
# rate = self.get_non_spatial_rate() # return quantity
# min_rate = rate.get_minimum_value()
# max_rate = rate.get_maximum_value()
# Q = self.get_Q()
# message = indent + self.label + ': Min rate = %g m/s, Max rate = %g m/s, Total Q = %g m^3/s'% (min_rate,max_rate, Q)
# elif self.rate_type == 'centroid_array':
# rate = self.get_non_spatial_rate() # return centroid_array
# min_rate = rate.min()
# max_rate = rate.max()
# Q = self.get_Q()
# message = indent + self.label + ': Min rate = %g m/s, Max rate = %g m/s, Total Q = %g m^3/s'% (min_rate,max_rate, Q)
# else:
# rate = self.get_non_spatial_rate()
# Q = self.get_Q()
# message = indent + self.label + ': Rate = %g m/s, Total Q = %g m^3/s' % (rate, Q)
#print(message)
return message
# ===============================================================================
# Specific Rate Operators for circular region.
# ===============================================================================
class Circular_rate_operator(Rate_operator):
"""
Add water at certain rate (ms^{-1} = vol/Area/sec) over a
circular region
rate can be a function of time.
Other units can be used by using the factor argument.
"""
def __init__(self, domain,
rate=0.0,
factor=1.0,
center=None,
radius=None,
default_rate=None,
verbose=False):
Rate_operator.__init__(self,
domain,
rate=rate,
factor=factor,
center=center,
radius=radius,
default_rate=default_rate,
verbose=verbose)
#===============================================================================
# Specific Rate Operators for polygonal region.
#===============================================================================
class Polygonal_rate_operator(Rate_operator):
"""
Add water at certain rate (ms^{-1} = vol/Area/sec) over a
polygonal region
rate can be a function of time.
Other units can be used by using the factor argument.
"""
def __init__(self, domain,
rate=0.0,
factor=1.0,
polygon=None,
default_rate=None,
verbose=False):
Rate_operator.__init__(self,
domain,
rate=rate,
factor=factor,
polygon=polygon,
default_rate=default_rate,
verbose=verbose)