Source code for anuga.operators.kinematic_viscosity_operator


from anuga import Quantity
from anuga.utilities.sparse import Sparse, Sparse_CSR
from anuga.utilities.cg_solve import conjugate_gradient, Stats, ConvergenceError
from anuga.utilities.cg_ext import cg_solve_c_precon, jacobi_precon_c
import anuga.abstract_2d_finite_volumes.neighbour_mesh as neighbour_mesh
from anuga import Dirichlet_boundary
import numpy as num
from . import kinematic_viscosity_operator_ext
import anuga.utilities.log as log

from anuga.operators.base_operator import Operator



[docs] class Kinematic_viscosity_operator(Operator): """ Class for setting up structures and matrices for kinematic viscosity differential operator using centroid values. As an anuga operator, when the __call__ method is called one step of the parabolic step is applied. In particular the x and y velocities are updated using du/dt = div( h grad u ) dv/dt = div( h grad v ) """
[docs] def __init__(self, domain, diffusivity='height', use_triangle_areas=True, add_safety = False, verbose=False): if verbose: log.info('Kinematic Viscosity: Beginning Initialisation') Operator.__init__(self,domain) #Expose the domain attributes self.mesh = self.domain.mesh self.boundary = domain.boundary self.boundary_enumeration = domain.boundary_enumeration self.diffusivity = diffusivity # Setup a quantity as diffusivity if diffusivity is None: self.diffusivity = Quantity(self.domain) self.diffusivity.set_values(1.0) self.diffusivity.set_boundary_values(1.0) if isinstance(diffusivity, (int, float)): self.diffusivity = Quantity(self.domain) self.diffusivity.set_values(diffusivity) self.diffusivity.set_boundary_values(diffusivity) if isinstance(diffusivity, str): self.diffusivity = self.domain.get_quantity(diffusivity) self.add_safety = add_safety self.smooth = 0.1 assert isinstance(self.diffusivity, Quantity) # n: total local triangles (full + ghost in parallel; all in serial) # n_full: full (owned) triangles only self.n = len(self.domain) self.n_full = domain.number_of_full_triangles self.dt = 0.0 #Need to set to domain.timestep self.dt_apply = 0.0 self.boundary_len = len(self.domain.boundary) self.tot_len = self.n + self.boundary_len self.verbose = verbose #Geometric Information if verbose: log.info('Kinematic Viscosity: Building geometric structure') self.geo_structure_indices = num.zeros((self.n, 3), int) self.geo_structure_values = num.zeros((self.n, 3), float) # Only needs to built once, doesn't change kinematic_viscosity_operator_ext.build_geo_structure(self) # Setup type of scaling self.set_triangle_areas(use_triangle_areas) # FIXME SR: should this really be a matrix? temp = Sparse(self.n, self.n) for i in range(self.n): temp[i, i] = 1.0 / self.mesh.areas[i] self.triangle_areas = Sparse_CSR(temp) # FIXME SR: More to do with solving equation self.qty_considered = 1 #1 or 2 (uh or vh respectively) #Sparse_CSR.data self.operator_data = num.zeros((4 * self.n, ), float) #Sparse_CSR.colind self.operator_colind = num.zeros((4 * self.n, ), int) #Sparse_CSR.rowptr (4 entries in every row, we know this already) = [0,4,8,...,4*n] self.operator_rowptr = 4 * num.arange(self.n + 1) # Build matrix self.elliptic_matrix [A B] self.build_elliptic_matrix(self.diffusivity) self.boundary_term = num.zeros((self.n, ), float) self.parabolic = False #Are we doing a parabolic solve at the moment? self.u_stats = None self.v_stats = None # Pre-allocate workspace for the distributed CG matvec self._kv_V = num.zeros(self.tot_len, dtype=float) if verbose: log.info('Kinematic Viscosity: Initialisation Done')
def parallel_safe(self): return True def __call__(self): """ Parabolic update of x and y velocity (I + dt (div d grad) ) U^{n+1} = U^{n} """ domain = self.domain self.dt = self.dt + domain.get_timestep() if self.dt < self.dt_apply: return # Setup initial values of velocity domain.update_centroids_of_velocities_and_height() # diffusivity if self.add_safety: d = self.diffusivity + 0.1 else: d = self.diffusivity assert num.all(d.centroid_values >= 0.0) # Quantities to solve # Boundary values are implied from BC set in advection part of code u = domain.quantities['xvelocity'] v = domain.quantities['yvelocity'] #Update operator using current height self.update_elliptic_matrix(d) (u, self.u_stats) = self.parabolic_solve(u, u, d, u_out=u, update_matrix=False, output_stats=True) (v, self.v_stats) = self.parabolic_solve(v, v, d, u_out=v, update_matrix=False, output_stats=True) # Update the conserved quantities domain.update_centroids_of_momentum_from_velocity() self.dt = 0.0 def statistics(self): message = 'Kinematic_viscosity_operator ' return message def timestepping_statistics(self): from anuga import indent message = indent+'Kinematic Viscosity Operator: \n' if self.u_stats is not None: message += indent + indent + 'u: ' + self.u_stats.__str__() +'\n' if self.v_stats is not None: message += indent + indent + 'v: ' + self.v_stats.__str__() return message def set_triangle_areas(self,flag=True): self.apply_triangle_areas = flag def set_parabolic_solve(self,flag): self.parabolic = flag def build_elliptic_matrix(self, a): """ Builds matrix representing div ( a grad ) which has the form [ A B ] """ #Arrays self.operator_data, self.operator_colind, self.operator_rowptr # are setup via this call kinematic_viscosity_operator_ext.build_elliptic_matrix(self, \ a.centroid_values, \ a.boundary_values) self.elliptic_matrix = Sparse_CSR(None, \ self.operator_data, self.operator_colind, self.operator_rowptr, \ self.n, self.tot_len) def update_elliptic_matrix(self, a=None): """ Updates the data values of matrix representing div ( a grad ) If a is None then we set a = quantity which is set to 1 """ #Array self.operator_data is changed by this call, which should flow # through to the Sparse_CSR matrix. if a is None: a = Quantity(self.domain) a.set_values(1.0) a.set_boundary_values(1.0) kinematic_viscosity_operator_ext.update_elliptic_matrix(self, \ a.centroid_values, \ a.boundary_values) def update_elliptic_boundary_term(self, boundary): if isinstance(boundary, Quantity): self._update_elliptic_boundary_term(boundary.boundary_values) elif isinstance(boundary, num.ndarray): self._update_elliptic_boundary_term(boundary) else: raise TypeError('expecting quantity or numpy array') def _update_elliptic_boundary_term(self, b): """ Operator has form [A B] and u = [ u_1 ; b] u_1 associated with centroid values of u u_2 associated with boundary_values of u This procedure calculates B u_2 which can be calculated as [A B] [ 0 ; b] Assumes that update_elliptic_matrix has just been run. """ n = self.n tot_len = self.tot_len X = num.zeros((tot_len,), float) X[n:] = b self.boundary_term[:] = self.elliptic_matrix * X #Tidy up if self.apply_triangle_areas: self.boundary_term[:] = self.triangle_areas * self.boundary_term def elliptic_multiply(self, input, output=None): if isinstance(input, Quantity): assert isinstance(output, Quantity) or output is None output = self._elliptic_multiply_quantity(input, output) elif isinstance(input, num.ndarray): assert isinstance(output, num.ndarray) or output is None output = self._elliptic_multiply_array(input, output) else: raise TypeError('expecting quantity or numpy array') return output def _elliptic_multiply_quantity(self, quantity_in, quantity_out): """ Assumes that update_elliptic_matrix has been run """ if quantity_out is None: quantity_out = Quantity(self.domain) array_in = quantity_in.centroid_values array_out = quantity_out.centroid_values X = self._elliptic_multiply_array(array_in, array_out) quantity_out.set_values(X, location = 'centroids') return quantity_out def _elliptic_multiply_array(self, array_in, array_out): """ calculates [A B] [array_in ; 0] """ n = self.n tot_len = self.tot_len V = num.zeros((tot_len,), float) assert len(array_in) == n if array_out is None: array_out = num.zeros_like(array_in) V[0:n] = array_in V[n:] = 0.0 if self.apply_triangle_areas: V[0:n] = self.triangle_areas * V[0:n] array_out[:] = self.elliptic_matrix * V return array_out def parabolic_multiply(self, input, output=None): if isinstance(input, Quantity): assert isinstance(output, Quantity) or output is None output = self._parabolic_multiply_quantity(input, output) elif isinstance(input, num.ndarray): assert isinstance(output, num.ndarray) or output is None output = self._parabolic_multiply_array(input, output) else: raise TypeError('expecting quantity or numpy array') return output def _parabolic_multiply_quantity(self, quantity_in, quantity_out): """ Assumes that update_elliptic_matrix has been run """ if quantity_out is None: quantity_out = Quantity(self.domain) array_in = quantity_in.centroid_values array_out = quantity_out.centroid_values X = self._parabolic_multiply_array(array_in, array_out) quantity_out.set_values(X, location = 'centroids') return quantity_out def _parabolic_multiply_array(self, array_in, array_out): """ calculates ( [ I 0 ; 0 0] + dt [A B] ) [array_in ; 0] """ n = self.n tot_len = self.tot_len V = num.zeros((tot_len,), float) assert len(array_in) == n if array_out is None: array_out = num.zeros_like(array_in) V[0:n] = array_in V[n:] = 0.0 if self.apply_triangle_areas: V[0:n] = self.triangle_areas * V[0:n] array_out[:] = array_in - self.dt * (self.elliptic_matrix * V) return array_out def __mul__(self, vector): #Vector if self.parabolic: R = self.parabolic_multiply(vector) else: #include_boundary=False is this is *only* used for cg_solve() R = self.elliptic_multiply(vector) return R def __rmul__(self, other): #Right multiply with scalar try: other = float(other) except (ValueError, TypeError): msg = 'Sparse matrix can only "right-multiply" onto a scalar' raise TypeError(msg) else: new = self.elliptic_matrix * other return new def elliptic_solve(self, u_in, b, a = None, u_out = None, update_matrix=True, \ imax=10000, tol=1.0e-8, atol=1.0e-8, iprint=None, output_stats=False): """ Solving div ( a grad u ) = b u | boundary = g u_in, u_out, f anf g are Quantity objects Dirichlet BC g encoded into u_in boundary_values Initial guess for iterative scheme is given by centroid values of u_in Centroid values of a and b provide diffusivity and rhs Solution u is retruned in u_out """ if u_out is None: u_out = Quantity(self.domain) if update_matrix : self.update_elliptic_matrix(a) self.update_elliptic_boundary_term(u_in) # Pull out arrays and a matrix operator A = self rhs = b.centroid_values - self.boundary_term x0 = u_in.centroid_values x, stats = conjugate_gradient(A,rhs,x0,imax=imax, tol=tol, atol=atol, iprint=iprint, output_stats=True) u_out.set_values(x, location='centroids') u_out.set_boundary_values(u_in.boundary_values) if output_stats: return u_out, stats else: return u_out # ------------------------------------------------------------------ # Distributed (MPI) CG helpers # ------------------------------------------------------------------ def _exchange_ghost_vector(self, v): """In-place ghost exchange of vector v (length n = n_full + n_ghost). Full-triangle values at v[Idf] are sent to neighbouring ranks and placed into v[Idg] (ghost indices) on each receiving rank. Uses MPI tag 198 to avoid collisions with the existing tag-123 path. """ domain = self.domain if domain.numproc == 1: return import mpi4py import anuga.utilities.parallel_abstraction as pypar recv_bufs = {} recv_reqs = [] for recv_proc, recv_entry in domain.ghost_recv_dict.items(): Idg = recv_entry[0] buf = num.empty(len(Idg), dtype=float) recv_bufs[recv_proc] = (Idg, buf) recv_reqs.append(pypar.comm.Irecv(buf, recv_proc, tag=198)) send_bufs = [] send_reqs = [] for send_proc, send_entry in domain.full_send_dict.items(): Idf = send_entry[0] buf = v[Idf].copy() # contiguous copy — MPI must not alias with recv send_bufs.append(buf) send_reqs.append(pypar.comm.Isend(buf, send_proc, tag=198)) mpi4py.MPI.Request.Waitall(recv_reqs + send_reqs) for Idg, buf in recv_bufs.values(): v[Idg] = buf def _distributed_dot(self, a, b): """Global dot product: sum of local dot(a, b) across all ranks.""" local = num.array([num.dot(a, b)]) if self.domain.numproc == 1: return local[0] import anuga.utilities.parallel_abstraction as pypar from mpi4py.MPI import SUM result = num.zeros(1, dtype=float) pypar.comm.Allreduce(local, result, op=SUM) return result[0] def _parabolic_matvec_distributed(self, d_full): """Compute P * d_full for the n_full owned triangles. d_full : 1-D array of length n_full (search direction on full cells). Returns array of length n_full: result = d_full - dt * (A_{nn+ghost} * T^{-1} * d_extended)[:n_full] where d_extended = [d_full; d_ghost] after ghost exchange. """ n = self.n # full + ghost n_full = self.n_full tot_len = self.tot_len # Reuse pre-allocated workspace of length tot_len V = self._kv_V V[:n_full] = d_full V[n_full:n] = 0.0 # ghost portion — will be filled by exchange V[n:] = 0.0 # boundary virtual nodes always zero here # Fill ghost values of the search direction self._exchange_ghost_vector(V) # operates on V[0:n] # Apply T^{-1} (divide by triangle areas) for all local triangles if self.apply_triangle_areas: V[:n] = V[:n] / self.mesh.areas # element-wise, shape (n,) # V[n:] stays zero (boundary virtual nodes) # SpMV: elliptic_matrix is (n x tot_len); take only first n_full rows AV = self.elliptic_matrix * V # length n return d_full - self.dt * AV[:n_full] def _parabolic_solve_distributed(self, rhs_full, x0_full, imax, tol, atol): """Distributed CG: solve P x = rhs for x on n_full owned triangles. Dot products are reduced globally via Allreduce. Each SpMV exchanges ghost values of the search direction so that off-diagonal entries coupling full triangles to ghost triangles are correct. Returns x of length n_full. """ n_full = self.n_full x = x0_full.copy() r = rhs_full - self._parabolic_matvec_distributed(x) d = r.copy() rTr = self._distributed_dot(r, r) rTr0 = rTr if rTr0 == 0.0: return x for i in range(1, imax + 1): q = self._parabolic_matvec_distributed(d) dTq = self._distributed_dot(d, q) alpha = rTr / dTq x = x + alpha * d r = r - alpha * q rTr_new = self._distributed_dot(r, r) if rTr_new <= tol**2 * rTr0 or rTr_new <= atol**2: break bt = rTr_new / rTr d = r + bt * d rTr = rTr_new else: raise ConvergenceError( 'parabolic_solve (distributed): CG did not converge ' f'after {imax} iterations; rTr={rTr_new:.3e} rTr0={rTr0:.3e}') return x # ------------------------------------------------------------------ # _build_parabolic_csr — used only in the serial (single-rank) path # ------------------------------------------------------------------ def _build_parabolic_csr(self): """Return P = I - dt * A_{nn} * T^{-1} as a Sparse_CSR (n x n). Only interior columns (colind < n) of the elliptic matrix contribute; boundary columns are handled via boundary_term in the rhs. If apply_triangle_areas is False, T^{-1} is omitted (identity scaling). """ n = self.n dt = self.dt areas = self.mesh.areas op_data = self.operator_data # shape (4n,) op_colind = self.operator_colind # shape (4n,), some entries may be >= n # Row index for every entry (exactly 4 entries per row) row_idx = num.repeat(num.arange(n, dtype=num.int64), 4) # Keep only entries whose column is an interior triangle interior = op_colind < n i_rows = row_idx[interior] i_cols = op_colind[interior].astype(num.int64) if self.apply_triangle_areas: i_vals = -dt * op_data[interior] / areas[i_cols] else: i_vals = -dt * op_data[interior].copy() # Add identity on the diagonal i_vals[i_rows == i_cols] += 1.0 counts = num.bincount(i_rows, minlength=n).astype(num.int64) rowptr = num.zeros(n + 1, dtype=num.int64) num.cumsum(counts, out=rowptr[1:]) return Sparse_CSR(None, i_vals, i_cols, rowptr, n, n) def parabolic_solve(self, u_in, b, a = None, u_out = None, update_matrix=True, \ output_stats=False, use_dt_tol=True, iprint=None, imax=10000): """ Solve for u in the equation ( I + dt div a grad ) u = b u | boundary = g u_in, u_out, f anf g are Quantity objects Dirichlet BC g encoded into u_in boundary_values Initial guess for iterative scheme is given by centroid values of u_in Centroid values of a and b provide diffusivity and rhs Solution u is retruned in u_out """ if use_dt_tol: tol = min(self.dt, 1.0e-5) atol = min(self.dt, 1.0e-5) else: tol = 1.0e-5 atol = 1.0e-5 if u_out is None: u_out = Quantity(self.domain) if update_matrix: self.update_elliptic_matrix(a) self.update_elliptic_boundary_term(u_in) rhs = num.ascontiguousarray(b.centroid_values + self.dt * self.boundary_term) x0 = u_in.centroid_values.copy() if self.domain.numproc > 1: # ---------------------------------------------------------- # Distributed path: CG with ghost exchange + global Allreduce # ---------------------------------------------------------- n_full = self.n_full x = self._parabolic_solve_distributed( rhs[:n_full], x0[:n_full], imax, tol, atol) # Only update owned (full) triangles; ghosts are refreshed by # the domain's normal ghost exchange in the next timestep. u_out.centroid_values[:n_full] = x else: # ---------------------------------------------------------- # Serial path: materialise P and call the OpenMP C CG # ---------------------------------------------------------- P = self._build_parabolic_csr() precon = num.empty(self.n, dtype=float) jacobi_precon_c(P, precon) err = cg_solve_c_precon(P, x0, rhs, imax, tol, atol, 1, precon) if err == -1: raise ConvergenceError('parabolic_solve: conjugate gradient did not converge') u_out.set_values(x0, location='centroids') u_out.set_boundary_values(u_in.boundary_values) if output_stats: stats = Stats() stats.iter = 0 stats.rTr = 0.0 stats.rTr0 = 0.0 stats.dx = 0.0 stats.x = num.linalg.norm(u_out.centroid_values[:self.n_full]) stats.x0 = num.linalg.norm(x0[:self.n_full]) return u_out, stats else: return u_out