Source code for anuga.structures.inlet_operator


import anuga
import numpy
from . import inlet

import warnings

[docs] class Inlet_operator(anuga.Operator): """Inlet Operator - add water to an inlet. Sets up the geometry of problem Inherit from this class (and overwrite discharge_routine method for specific subclasses) """
[docs] def __init__(self, domain, region, Q = 0.0, velocity = None, zero_velocity = False, default = 0.0, description = None, label = None, logging = False, verbose = False): """Inlet Operator - add water to a domain via an inlet. :param domain: Specify domain :param region: Apply Inlet flow over a region (which can be a Region, Polygon or line) :param Q: function(t) or scalar discharge (m^3/s) :param velocity: Optional [u,v] to set velocity of applied discharge :param zero_velocity: If set to True, velocity of inlet region set to 0 :param default: If outside time domain of the Q function, use this default discharge :param description: Describe the Inlet_operator :param label: Give Inlet_operator a label (name) :param verbose: Provide verbose output Example: >>> inflow_region = anuga.Region(domain, center=[0.0,0.0], radius=1.0) >>> inflow = anuga.Inlet_operator(domain, inflow_region, Q = lambda t : 1 + 0.5*math.sin(t/60)) """ anuga.Operator.__init__(self, domain, description, label, logging, verbose) self.inlet = inlet.Inlet(self.domain, region, verbose= verbose) # constant or function of time, m^3/s self.Q = Q if velocity is not None: assert len(velocity)==2 self.velocity = velocity self.zero_velocity = zero_velocity self.applied_Q = 0.0 self.total_applied_volume = 0.0 self.total_requested_volume = 0.0 self.set_default(default) self.activate_logging() # GPU state (lazy init on first __call__) self._gpu_op_id = None self._gpu_initialized = False
def _init_gpu(self): """Initialize GPU inlet operator (lazy, called on first __call__ in GPU mode).""" try: from anuga.shallow_water import sw_domain_gpu_ext as gpu_ext gpu_dom = self.domain.gpu_interface.gpu_dom tri_indices = numpy.ascontiguousarray( self.inlet.triangle_indices, dtype=numpy.intc) areas = numpy.ascontiguousarray( self.inlet.get_areas(), dtype=numpy.float64) op_id = gpu_ext.init_inlet_operator(gpu_dom, tri_indices, areas) if op_id < 0: raise RuntimeError( f"Failed to register GPU inlet operator '{getattr(self, 'label', repr(self))}': " f"slot limit exceeded (MAX_INLET_OPERATORS=32). " f"Reduce the number of Inlet_operator instances or increase MAX_INLET_OPERATORS in gpu_domain.h." ) self._gpu_op_id = op_id self._gpu_initialized = True except RuntimeError: raise except Exception as e: raise RuntimeError(f"GPU inlet operator init failed: {e}") from e def _call_gpu(self): """GPU path for __call__ - transfers only inlet data (~6KB).""" from anuga.shallow_water import sw_domain_gpu_ext as gpu_ext gpu_dom = self.domain.gpu_interface.gpu_dom op_id = self._gpu_op_id timestep = self.domain.get_timestep() t = self.domain.get_time() # Get current volume from GPU (small reduction, returns scalar) current_volume = gpu_ext.inlet_get_volume_gpu(gpu_dom, op_id) total_area = self.inlet.area assert current_volume >= 0.0 # Compute Q on CPU (scalar, cheap) Q1 = self.update_Q(t) Q2 = self.update_Q(t + timestep) Q = 0.5 * (Q1 + Q2) volume = Q * timestep self.applied_Q = Q # Get velocities from GPU (small D2H) vel_u, vel_v = gpu_ext.inlet_get_velocities_gpu(gpu_dom, op_id) has_velocity = 1 if self.velocity is not None else 0 ext_vel_u = self.velocity[0] if has_velocity else 0.0 ext_vel_v = self.velocity[1] if has_velocity else 0.0 zero_vel = 1 if self.zero_velocity else 0 # Apply on GPU (handles all 3 cases) actual_volume = gpu_ext.inlet_apply_gpu( gpu_dom, op_id, volume, current_volume, total_area, vel_u, vel_v, has_velocity, ext_vel_u, ext_vel_v, zero_vel) # Update tracking variables self.total_requested_volume += volume if volume >= 0.0: self.domain.fractional_step_volume_integral += volume elif current_volume + volume >= 0.0: self.domain.fractional_step_volume_integral += volume else: self.applied_Q = -current_volume / timestep self.domain.fractional_step_volume_integral -= current_volume self.total_applied_volume += actual_volume def __call__(self): # GPU path: skip full domain sync, transfer only inlet data if getattr(self.domain, 'multiprocessor_mode', 0) == 2: if not self._gpu_initialized: self._init_gpu() if self._gpu_initialized: return self._call_gpu() timestep = self.domain.get_timestep() #print('Timestep', timestep) t = self.domain.get_time() # Need to run global command on all processors current_volume = self.inlet.get_total_water_volume() total_area = self.inlet.get_area() #print(current_volume) #print(total_area) assert current_volume >= 0.0 Q1 = self.update_Q(t) Q2 = self.update_Q(t + timestep) #print Q1,Q2 Q = 0.5*(Q1+Q2) volume = Q*timestep #print(volume) #print volume #print Q, volume # store last discharge self.applied_Q = Q #print(self.domain.fractional_step_volume_integral) u,v = self.inlet.get_velocities() # Distribute positive volume so as to obtain flat surface otherwise # just pull water off to have a uniform depth. if volume >= 0.0 : #print('volume>=0.0') self.inlet.set_stages_evenly(volume) self.domain.fractional_step_volume_integral+=volume self.total_requested_volume += volume if self.velocity is not None: depths = self.inlet.get_depths() self.inlet.set_xmoms(depths*self.velocity[0]) self.inlet.set_ymoms(depths*self.velocity[1]) else: depths = self.inlet.get_depths() self.inlet.set_xmoms(depths*u) self.inlet.set_ymoms(depths*v) if self.zero_velocity: self.inlet.set_xmoms(0.0) self.inlet.set_ymoms(0.0) elif current_volume + volume >= 0.0 : depth = (current_volume + volume)/total_area self.inlet.set_depths(depth) self.total_requested_volume += volume self.domain.fractional_step_volume_integral+=volume if self.velocity is not None: depths = self.inlet.get_depths() self.inlet.set_xmoms(depths*self.velocity[0]) self.inlet.set_ymoms(depths*self.velocity[1]) else: depths = self.inlet.get_depths() self.inlet.set_xmoms(depths*u) self.inlet.set_ymoms(depths*v) if self.zero_velocity: self.inlet.set_xmoms(0.0) self.inlet.set_ymoms(0.0) else: #extracting too much water! self.inlet.set_depths(0.0) self.total_requested_volume += volume volume = -current_volume self.applied_Q = -current_volume/timestep self.domain.fractional_step_volume_integral-=current_volume self.applied_Q = - current_volume/timestep self.inlet.set_xmoms(0.0) self.inlet.set_ymoms(0.0) self.total_applied_volume += volume def update_Q(self, t): """Allowing local modifications of Q """ from anuga.fit_interpolate.interpolate import Modeltime_too_early, Modeltime_too_late if callable(self.Q): try: Q = self.Q(t) except Modeltime_too_early as e: Q = self.get_default(t,err_msg=e) except Modeltime_too_late as e: Q = self.get_default(t,err_msg=e) else: Q = self.Q return Q def statistics(self): message = '=====================================\n' message += 'Inlet Operator: %s\n' % self.label message += '=====================================\n' message += 'Description\n' message += '%s' % self.description message += '\n' inlet = self.inlet message += '-------------------------------------\n' message += 'Inlet\n' message += '-------------------------------------\n' message += 'inlet triangle indices and centres\n' message += '%s' % inlet.triangle_indices message += '\n' message += '%s' % self.domain.get_centroid_coordinates()[inlet.triangle_indices] message += '\n' message += 'region\n' message += '%s' % inlet message += '\n' message += '=====================================\n' return message def timestepping_statistics(self): message = '---------------------------\n' message += 'Inlet report for %s:\n' % self.label message += '--------------------------\n' message += 'Q [m^3/s]: %.2f\n' % self.applied_Q message += 'Total volume [m^3]: %.2f\n' % self.total_applied_volume return message def print_timestepping_statisitics(self): message = self.timestepping_statistics() print(message) def set_default(self, default=None): """ Either leave default as None or change it into a function""" if default is not None: # If it is a constant, make it a function if not callable(default): tmp = default default = lambda t: tmp # Check that default_rate is a function of one argument try: default(0.0) except TypeError: msg = "could not call default" raise Exception(msg) self.default = default self.default_invoked = False def get_default(self,t, err_msg=' '): """ Call get_default only if exception Modeltime_too_late(msg) has been raised """ # Pass control to default rate function value = self.default(t) if self.default_invoked is False: # Issue warning the first time msg = ('%s\n' 'Instead I will use the default rate: %s\n' 'Note: Further warnings will be suppressed' % (str(err_msg), str(self.default(t)))) warnings.warn(msg) # FIXME (Ole): Replace this crude flag with # Python's ability to print warnings only once. # See http://docs.python.org/lib/warning-filter.html self.default_invoked = True return value def set_Q(self, Q): self.Q = Q def get_Q(self): return self.Q def get_inlet(self): return self.inlet def get_applied_Q(self): return self.applied_Q def get_total_applied_volume(self): return self.total_applied_volume