Source code for anuga.operators.rate_operators

"""
Rate operators (such as rain)

Constraints: See GPL license in the user guide
Version: 1.0 ($Revision: 7731 $)
"""
__author__="steve"
__date__ ="$09/03/2012 4:46:39 PM$"



from anuga.config import indent
from anuga.config import MULTIPROCESSOR_OPENMP, MULTIPROCESSOR_GPU
import numpy as num
import anuga.utilities.log as log
from anuga.utilities.function_utils import evaluate_temporal_function


from anuga import Quantity
from anuga.operators.base_operator import Operator
from anuga import Region

[docs] class Rate_operator(Operator):
[docs] def __init__(self, domain, rate=0.0, factor=1.0, region=None, indices=None, polygon=None, center=None, radius=None, default_rate=0.0, description = None, label = None, logging = False, verbose = False, monitor = False): """ Create a Rate_operator that adds water over a region at a specified rate (ms^{-1} = vol/Area/sec) Parameters specifying locaton of operator :param region: Region object where water applied :param indices: List of triangles where water applied :param polygon: List of [x,y] points specifying a polygon where water applied :param center: [x.y] point of circle where water applied :param radius: radius of circle where water applied Parameters specifying rate :param rate: scalar, function of (t), (x,y), or (x,y,t), or a Quantity, a numpy array of size (number_of_triangles), or an xarray with rate at points and time :param factor: scalar, function of t, or 2 by n numpy array time sequence, used to specify conversion from rate argument to m/s :param default_rate: use this rate if outside time interval of rate function or xarray Parameters involving communication :param description: :param label: :param logging: :param verbose: :param monitor: """ Operator.__init__(self, domain, description, label, logging, verbose) #----------------------------------------------------- # Make sure region is actually an instance of a region # Otherwise create a new region based on the other # input arguments #----------------------------------------------------- if isinstance(region,Region): region.set_verbose(verbose) self.region = region else: self.region = Region(domain, indices=indices, polygon=polygon, center=center, radius=radius, verbose=verbose) #------------------------------------------ # Local variables #------------------------------------------ self.indices = self.region.indices self.set_areas() self.set_full_indices() #-------------------------------- # Setting up rate #-------------------------------- self.rate_callable = False self.rate_spatial = False self.rate_xarray = False #------------------------------- # Check if rate is actually an xarray. # Need xarray package installed #------------------------------- try: import xarray except ImportError as e: log.debug('xarray not available, xarray rate inputs disabled: %s' % e) xarray = None if xarray is not None: if type(rate) is xarray.core.dataarray.DataArray: self.rate_xarray = True xa = rate rate = 0.0 self._prepare_xarray_rate(xa) self.set_rate(rate) self.set_default_rate(default_rate) self.default_rate_invoked = False # Flag #------------------------------ # Setting up factor, can be a scalar # or a function of time. # Limitation: factor does not currently support time-series files # or arrays. Only scalars and callables of the form f(t) are # accepted. Extend set_factor() if file/array support is needed. #------------------------------ self.factor_callable = False self.set_factor(factor) # ---------------- # Mass tracking #----------------- self.monitor = monitor self.cumulative_influx = 0.0 self.local_influx=0.0 self.local_max = 0.0 self.local_min = 0.0
def __call__(self): """Apply rate operator to the domain for one timestep. Adds water (or removes it when the rate is negative) to the stage quantity of each triangle selected by ``indices``, scaled by the current timestep and factor. - If ``indices`` is an empty list, no triangles are modified. - If ``indices`` is None, all triangles are modified. - Otherwise only the triangles at the given indices are modified. Returns ------- None Modifies ``domain.quantities['stage']`` (and momentum quantities when the rate is negative) in place. """ if self.indices is not None and len(self.indices) == 0: return if self.rate_xarray: # setup centroid_array from xarray corresponding to current time self._update_Q_xarray() t = self.domain.get_time() timestep = self.domain.get_timestep() factor = self.get_factor() indices = self.indices if self.rate_spatial: if indices is None: x = self.coord_c[:,0] y = self.coord_c[:,1] else: x = self.coord_c[indices,0] y = self.coord_c[indices,1] rate = self.get_spatial_rate(x,y,t) elif self.rate_type == 'quantity': if indices is None: rate = self.rate.centroid_values else: rate = self.rate.centroid_values[indices] elif self.rate_type == 'centroid_array': if indices is None: rate = self.rate else: rate = self.rate[indices] else: rate = self.get_non_spatial_rate(t) factor = self.get_factor(t) # We need to adjust the momentums if rate < 0 since otherwise # the xmom and ymom stay the same but height -> 0 which leads to xvel, yvel -> infty fid = self.full_indices if num.all(rate >= 0.0): # Record the local flux for mass conservation tracking if indices is None: local_rates = factor*timestep*rate self.local_influx = (local_rates*self.areas)[fid].sum() self.stage_c[:] = self.stage_c[:] + local_rates else: local_rates = factor*timestep*rate self.local_influx=(local_rates*self.areas)[fid].sum() self.stage_c[indices] = self.stage_c[indices] + local_rates else: # Be more careful if rate < 0 if indices is None: #self.local_influx=(num.minimum(factor*timestep*rate, self.stage_c[:]-self.elev_c[:])*self.areas)[fid].sum() #self.stage_c[:] = num.maximum(self.stage_c \ # + factor*rate*timestep, self.elev_c ) self.height_c[:] = self.stage_c[:] - self.elev_c[:] local_rates = num.maximum(factor*timestep*rate, -self.height_c) local_factors = num.where(local_rates < 0.0, (local_rates+self.height_c)/(self.height_c+1.0e-10), 1.0) #print(local_factors, local_rates) self.local_influx = (local_rates*self.areas)[fid].sum() self.stage_c[:] = self.stage_c + local_rates self.xmom_c[:] = self.xmom_c[:]*local_factors self.ymom_c[:] = self.ymom_c[:]*local_factors else: #self.local_influx=(num.minimum(factor*timestep*rate, self.stage_c[indices]-self.elev_c[indices])*self.areas)[fid].sum() #self.stage_c[indices] = num.maximum(self.stage_c[indices] \ # + factor*rate*timestep, self.elev_c[indices]) #local_rates = num.maximum(factor*timestep*rate, self.elev_c[indices]-self.stage_c[indices]) heights = self.stage_c[indices] - self.elev_c[indices] local_rates = num.maximum(factor*timestep*rate, -heights) local_factors = num.where(local_rates < 0.0, (local_rates+heights)/(heights+1.0e-10), 1.0) #print(local_factors, local_rates, fid) self.local_influx = (local_rates*self.areas)[fid].sum() self.stage_c[indices] = self.stage_c[indices] + local_rates self.xmom_c[indices] = self.xmom_c[indices]*local_factors self.ymom_c[indices] = self.ymom_c[indices]*local_factors try: self.local_max = (local_rates[fid].max()/timestep) self.local_min = (local_rates[fid].min()/timestep) except (TypeError, IndexError): self.local_max = local_rates/timestep self.local_min = local_rates/timestep if isinstance(self.local_max, num.ndarray) and self.local_max.size == 0: self.local_max = 0.0 self.local_min = 0.0 # print(self.local_min, self.local_max) self.cumulative_influx += self.local_influx # Update mass inflows from fractional steps self.domain.fractional_step_volume_integral+=self.local_influx if self.monitor: log.critical('Local Flux at time %.2f = %f' % (self.domain.get_time(), self.local_influx)) return def get_non_spatial_rate(self, t=None): """Provide a rate to calculate added volume """ if t is None: t = self.get_time() assert not self.rate_spatial rate = evaluate_temporal_function(self.rate, t, default_right_value=self.default_rate, default_left_value=self.default_rate) if rate is None: msg = ('Attribute rate must be specified in '+self.__name__+ ' before attempting to call it') raise Exception(msg) return rate def get_spatial_rate(self, x=None, y=None, t=None): """Provide a rate to calculate added volume only call if self.rate_spatial = True """ assert self.rate_spatial if t is None: t = self.get_time() if x is None: assert y is None if self.indices is None: x = self.coord_c[:,0] y = self.coord_c[:,1] else: x = self.coord_c[self.indices,0] y = self.coord_c[self.indices,1] assert x is not None assert y is not None assert isinstance(t, (int, float)) assert len(x) == len(y) #print xy #print t #print self.rate_type, self.rate_type == 'x,y,t' if self.rate_type == 'x,y,t': rate = self.rate(x,y,t) else: rate = self.rate(x,y) return rate def set_rate(self, rate): """Set rate. Can change rate while running Can be a scalar, numpy array, or a function of t or x,y or x,y,t or a quantity """ # Test if rate is a quantity if isinstance(rate, Quantity): self.rate_type = 'quantity' elif isinstance(rate, num.ndarray): rate_shape = rate.shape msg = f"The shape {rate_shape} of the input rate " msg += f"should match (number of triangles,) i.e. ({self.domain.number_of_triangles},)" assert rate_shape == (self.domain.number_of_triangles,) \ or rate_shape == (self.domain.number_of_triangles, 1), msg self.rate_type = 'centroid_array' rate = rate.reshape((-1,)) else: # Possible types are 'scalar', 't', 'x,y' and 'x,y,t' from anuga.utilities.function_utils import determine_function_type self.rate_type = determine_function_type(rate) self.rate = rate if self.rate_type == 'scalar': self.rate_callable = False self.rate_spatial = False elif self.rate_type == 'quantity': self.rate_callable = False self.rate_spatial = False elif self.rate_type == 'centroid_array': self.rate_callable = False self.rate_spatial = False elif self.rate_type == 't': self.rate_callable = True self.rate_spatial = False else: self.rate_callable = True self.rate_spatial = True def set_factor(self, factor): """Set factor. Can change factor while running Can be a scalar, a function of t, or an n by 2 numpy array defining a time sequence """ if isinstance(factor, num.ndarray): factor_shape = factor.shape msg = f"The shape {factor_shape} of the input factor " msg += f"should be (2,n) so that a time function can be constructed" assert factor_shape[0] == 2, msg self.factor_type = 'time_sequence' from scipy.interpolate import interp1d factor = interp1d(factor[0,:], factor[1,:], kind='zero', bounds_error=False, fill_value = (0.0, 0.0)) from anuga.utilities.function_utils import determine_function_type self.factor_type = determine_function_type(factor) self.factor = factor if self.factor_type == 'scalar': self.factor_callable = False elif self.factor_type == 't': self.factor_callable = True else: msg = f'factor must be a scalar or a function of t. It was determined to be a function of {self.factor_type}' raise Exception(msg) def get_factor(self, t=None): """Provide a factor to calculate added volume """ if t is None: t = self.get_time() assert isinstance(t, (int, float)) if self.factor_type == 'scalar': factor = self.factor else: factor = self.factor(t) return factor def set_areas(self): if self.indices is None: self.areas = self.domain.areas return if self.indices is not None and len(self.indices) == 0: self.areas = num.array([]) return self.areas = self.domain.areas[self.indices] def set_full_indices(self): if self.indices is None: self.full_indices = num.where(self.domain.tri_full_flag ==1)[0] return if self.indices is not None and len(self.indices) == 0: self.full_indices = num.array([], dtype=int) return self.full_indices = num.where(self.domain.tri_full_flag[self.indices] == 1)[0] def get_Q(self, full_only=True): """ Calculate current overall discharge """ # FIXME SR: this does not take into account the zeroing of large negative rates factor = self.get_factor() if full_only: if self.rate_spatial: rate = self.get_spatial_rate() # rate is an array fid = self.full_indices return num.sum(self.areas[fid]*rate[fid])*factor elif self.rate_type == 'quantity': rate = self.rate.centroid_values # rate is a quantity fid = self.full_indices return num.sum(self.areas[fid]*rate[fid])*factor elif self.rate_type == 'centroid_array': rate = self.rate # rate is already a centroid sized array fid = self.full_indices return num.sum(self.areas[fid]*rate[fid])*factor else: rate = self.get_non_spatial_rate() # rate is a scalar fid = self.full_indices return num.sum(self.areas[fid]*rate)*factor else: if self.rate_spatial: rate = self.get_spatial_rate() # rate is an array return num.sum(self.areas*rate)*factor elif self.rate_type == 'quantity': rate = self.rate.centroid_values # rate is a quantity return num.sum(self.areas*rate)*factor elif self.rate_type == 'centroid_array': rate = self.rate # rate is already a centroid sized array return num.sum(self.areas*rate)*factor else: rate = self.get_non_spatial_rate() # rate is a scalar return num.sum(self.areas*rate)*factor def set_default_rate(self, default_rate): """ Check and store default_rate """ msg = ('Default_rate must be either None ' 'a scalar, or a function of time.\nI got %s.' % str(default_rate)) assert (default_rate is None or isinstance(default_rate, (int, float)) or callable(default_rate)), msg #------------------------------------------ # Allow longer than data #------------------------------------------ if default_rate is not None: # If it is a constant, make it a function if not callable(default_rate): tmp = default_rate default_rate = lambda t: tmp # Check that default_rate is a function of one argument try: default_rate(0.0) except TypeError: raise Exception(msg) self.default_rate = default_rate def _prepare_xarray_rate(self, xa): import numpy as np import pandas # to speed up parallel code it helps to load the xarray self.xa = xa.load() self.xa['time'] = pandas.to_datetime(xa['time'], utc=True) # these are absolute coord (since we haven't implemented offsets) # Convert to relative coords to domain xllcorner and yllcorner xllcorner = self.domain.geo_reference.xllcorner yllcorner = self.domain.geo_reference.yllcorner self.xy = np.array([self.xa['eastings']-xllcorner, self.xa['northings']-yllcorner]).T # Determine data timestep from xarray. We assume the timestep is constant, so just test first 2 timeslices. try: data_dt = (self.xa['time'][1].values.astype('int64')-self.xa['time'][0].values.astype('int64'))/1.0e9 self.domain.set_evolve_max_timestep(min(data_dt, self.domain.get_evolve_max_timestep())) except Exception: # if we can't determine the timestep probably means there is just one timeslice so just pass from scipy.spatial import KDTree tree = KDTree(self.xy) if self.verbose: print(tree.size, self.xy.shape) #dd, ii = tree.query(self.domain.centroid_coordinates) dd, ii = tree.query(self.domain.get_centroid_coordinates(absolute=True)) self.ii = ii self.previous_Q_ref_time = None self.previous_Q_numpy = None def _update_Q_xarray(self): import pandas current_utc_datetime64 = pandas.to_datetime(self.domain.get_datetime()).tz_convert('UTC')#.replace(tzinfo=None) if self.verbose: print(f"{self.domain.get_time()} {self.domain.get_datetime()}") print(f"UTC time {current_utc_datetime64} type {type(current_utc_datetime64)} ") print(self.xa.sel(time=current_utc_datetime64, method='nearest')) try: Q_ref = self.xa.sel(time=current_utc_datetime64, method="ffill", tolerance='5m') Q_ref_time = Q_ref['time'].values if self.verbose: print(f"UTC time {current_utc_datetime64} Q_ref time {Q_ref_time} {Q_ref_time == self.previous_Q_ref_time} ") optimize = True if optimize: if Q_ref_time == self.previous_Q_ref_time : Q_numpy = self.previous_Q_numpy else: Q_numpy = Q_ref[self.ii].to_numpy() self.previous_Q_numpy = Q_numpy self.previous_Q_ref_time = Q_ref_time else: Q_numpy = Q_ref[self.ii].to_numpy() except Exception: Q_numpy = self.default_rate if self.verbose: print(f"UTC time {current_utc_datetime64} Using default rate Q = {Q_numpy(self.get_time())}") self.set_rate(rate=Q_numpy) def parallel_safe(self): """Operator is applied independently on each cell and so is parallel safe. """ return True def statistics(self): message = 'You need to implement operator statistics for your operator' return message def timestepping_statistics(self): # retrieve data from last __call__ call message = indent + self.label + f': At time {self.domain.get_time()} Min rate = {self.local_min:.2e} m/s, Max rate = {self.local_max:.2e} m/s, Total Q = {self.cumulative_influx:.2e} m^3' # if self.rate_spatial: # rate = self.get_spatial_rate() # try: # min_rate = num.min(rate) # except ValueError: # min_rate = 0.0 # try: # max_rate = num.max(rate) # except ValueError: # max_rate = 0.0 # Q = self.get_Q() # message = indent + self.label + ': Min rate = %g m/s, Max rate = %g m/s, Total Q = %g m^3/s'% (min_rate,max_rate, Q) # elif self.rate_type == 'quantity': # rate = self.get_non_spatial_rate() # return quantity # min_rate = rate.get_minimum_value() # max_rate = rate.get_maximum_value() # Q = self.get_Q() # message = indent + self.label + ': Min rate = %g m/s, Max rate = %g m/s, Total Q = %g m^3/s'% (min_rate,max_rate, Q) # elif self.rate_type == 'centroid_array': # rate = self.get_non_spatial_rate() # return centroid_array # min_rate = rate.min() # max_rate = rate.max() # Q = self.get_Q() # message = indent + self.label + ': Min rate = %g m/s, Max rate = %g m/s, Total Q = %g m^3/s'% (min_rate,max_rate, Q) # else: # rate = self.get_non_spatial_rate() # Q = self.get_Q() # message = indent + self.label + ': Rate = %g m/s, Total Q = %g m^3/s' % (rate, Q) #print(message) return message
# =============================================================================== # Specific Rate Operators for circular region. # =============================================================================== class Circular_rate_operator(Rate_operator): """ Add water at certain rate (ms^{-1} = vol/Area/sec) over a circular region rate can be a function of time. Other units can be used by using the factor argument. """ def __init__(self, domain, rate=0.0, factor=1.0, center=None, radius=None, default_rate=None, verbose=False): Rate_operator.__init__(self, domain, rate=rate, factor=factor, center=center, radius=radius, default_rate=default_rate, verbose=verbose) #=============================================================================== # Specific Rate Operators for polygonal region. #=============================================================================== class Polygonal_rate_operator(Rate_operator): """ Add water at certain rate (ms^{-1} = vol/Area/sec) over a polygonal region rate can be a function of time. Other units can be used by using the factor argument. """ def __init__(self, domain, rate=0.0, factor=1.0, polygon=None, default_rate=None, verbose=False): Rate_operator.__init__(self, domain, rate=rate, factor=factor, polygon=polygon, default_rate=default_rate, verbose=verbose)