Source code for anuga.parallel.sequential_distribute

"""Trying to lump parallel stuff into simpler interface


"""

import numpy as num

from anuga import Domain

from anuga.parallel.distribute_mesh  import send_submesh
from anuga.parallel.distribute_mesh  import rec_submesh
from anuga.parallel.distribute_mesh  import extract_submesh

# Mesh partitioning using Metis
from anuga.parallel.distribute_mesh import build_submesh
from anuga.parallel.distribute_mesh import partition_mesh

from anuga.parallel.parallel_shallow_water import Parallel_domain



class Sequential_distribute:

    def __init__(self, domain, verbose=False, debug=False, parameters=None):

        if debug:
            verbose = True

        self.domain = domain
        self.verbose = verbose
        self.debug = debug
        self.parameters = parameters


    def distribute(self, numprocs=1):

        self.numprocs = numprocs

        domain = self.domain
        verbose = self.verbose
        debug = self.debug
        parameters = self.parameters

        # FIXME: Dummy assignment (until boundaries are refactored to
        # be independent of domains until they are applied)
        bdmap = {}
        for tag in domain.get_boundary_tags():
            bdmap[tag] = None

        domain.set_boundary(bdmap)


        self.domain_name = domain.get_name()
        self.domain_dir = domain.get_datadir()
        self.domain_store = domain.get_store()
        self.domain_store_centroids = domain.get_store_centroids()
        self.domain_minimum_storable_height = domain.minimum_storable_height
        self.domain_flow_algorithm = domain.get_flow_algorithm()
        self.domain_minimum_allowed_height = domain.get_minimum_allowed_height()
        self.domain_georef = domain.geo_reference
        self.domain_quantities_to_be_stored = domain.quantities_to_be_stored
        self.domain_smooth = domain.smooth
        self.domain_low_froude = domain.low_froude
        self.number_of_global_triangles = domain.number_of_triangles
        self.number_of_global_nodes = domain.number_of_nodes
        self.boundary_map = domain.boundary_map


        # Subdivide the mesh
        if verbose: print('sequential_distribute: Subdivide mesh')

        new_mesh, triangles_per_proc, quantities, \
               s2p_map, p2s_map = \
               partition_mesh(domain, numprocs, parameters=parameters, verbose=verbose)


        # Build the mesh that should be assigned to each processor,
        # this includes ghost nodes and the communication pattern
        if verbose: print('sequential_distribute: Build submeshes')
        if verbose: print('sequential_distribute: parameters: ',parameters)

        submesh = build_submesh(new_mesh, quantities, triangles_per_proc,
                                parameters=parameters, verbose=verbose)

        if verbose:
            for p in range(numprocs):
                N = len(submesh['ghost_nodes'][p])
                M = len(submesh['ghost_triangles'][p])
                print('sequential_distribute: There are %d ghost nodes and %d ghost triangles on proc %d'\
                      %(N, M, p))


        self.submesh = submesh
        self.triangles_per_proc = triangles_per_proc
        self.p2s_map =  p2s_map


    def extract_submesh(self, p=0):
        """Build the local mesh for processor p
        """

        submesh = self.submesh
        triangles_per_proc = self.triangles_per_proc
        p2s_map = self.p2s_map
        verbose = self.verbose
        debug = self.debug

        assert p>=0
        assert p<self.numprocs


        points, vertices, boundary, quantities, \
            ghost_recv_dict, full_send_dict, \
            tri_map, node_map, tri_l2g, node_l2g, ghost_layer_width =\
              extract_submesh(submesh, triangles_per_proc, p2s_map, p)


        number_of_full_nodes = len(submesh['full_nodes'][p])
        number_of_full_triangles = len(submesh['full_triangles'][p])


        if debug:
            import pprint
            print(50*"=")
            print('sequential_distribute: NODE_L2G')
            pprint.pprint(node_l2g)

            pprint.pprint(node_l2g[vertices[:,0]])

            print('sequential_distribute: VERTICES')
            pprint.pprint(vertices[:,0])
            # FIXME: new_triangles, new_nodes, original_triangles, tri_l2orig
            # are not available in this scope — these assertions are incomplete

            print('sequential_distribute: POINTS')
            pprint.pprint(points)

            print('sequential_distribute: TRI')
            pprint.pprint(tri_l2g)
            pprint.pprint(p2s_map[tri_l2g])

            print('NODES')
            pprint.pprint(node_map)
            pprint.pprint(node_l2g)

        #tri_l2orig = p2s_map[tri_l2g]

        s2p_map = None
        p2s_map = None

        #------------------------------------------------------------------------
        # Build the parallel domain for this processor using partion structures
        #------------------------------------------------------------------------

        if verbose:
            print('sequential_distribute: P%g, no_full_nodes = %g, no_full_triangles = %g' % (p, number_of_full_nodes, number_of_full_triangles))


        kwargs = {'full_send_dict': full_send_dict,
                'ghost_recv_dict': ghost_recv_dict,
                'number_of_full_nodes': number_of_full_nodes,
                'number_of_full_triangles': number_of_full_triangles,
                'geo_reference': self.domain_georef,
                'number_of_global_triangles':  self.number_of_global_triangles,
                'number_of_global_nodes':  self.number_of_global_nodes,
                'processor':  p,
                'numproc':  self.numprocs,
                's2p_map':  s2p_map,
                'p2s_map':  p2s_map, ## jj added this
                'tri_l2g':  tri_l2g, ## SR added this
                'node_l2g':  node_l2g,
                'ghost_layer_width':  ghost_layer_width}

        boundary_map = self.boundary_map
        domain_name = self.domain_name
        domain_dir = self.domain_dir
        domain_store = self.domain_store
        domain_store_centroids = self.domain_store_centroids
        domain_minimum_storable_height = self.domain_minimum_storable_height
        domain_minimum_allowed_height = self.domain_minimum_allowed_height
        domain_flow_algorithm = self.domain_flow_algorithm
        domain_georef = self.domain_georef
        domain_quantities_to_be_stored = self.domain_quantities_to_be_stored
        domain_smooth = self.domain_smooth
        domain_low_froude = self.domain_low_froude

        tostore = (kwargs, points, vertices, boundary, quantities, \
                   boundary_map, \
                   domain_name, domain_dir, domain_store, domain_store_centroids, \
                   domain_minimum_storable_height, \
                   domain_minimum_allowed_height, domain_flow_algorithm, \
                   domain_georef, domain_quantities_to_be_stored, domain_smooth, \
                   domain_low_froude)

        return tostore


    def _release_submesh_rank(self, p):
        """Null out submesh data for rank p to allow garbage collection.

        Called by sequential_distribute_dump after rank p's files have been
        written.  Once all arrays for rank p are released the GC can recover
        the memory before proceeding to rank p+1.
        """
        submesh = self.submesh
        for key in ('full_nodes', 'full_triangles', 'full_boundary',
                    'ghost_nodes', 'ghost_triangles', 'ghost_commun',
                    'ghost_boundary', 'ghost_layer_width', 'full_commun'):
            lst = submesh.get(key)
            if lst is not None and p < len(lst):
                lst[p] = None
        for k in submesh.get('full_quan', {}):
            submesh['full_quan'][k][p] = None
        for k in submesh.get('ghost_quan', {}):
            submesh['ghost_quan'][k][p] = None




[docs] def sequential_distribute_dump(domain, numprocs=1, verbose=False, partition_dir='.', debug=False, parameters = None): """ Distribute the domain, create parallel domain and pickle result """ import gc import pickle from os.path import join partition = Sequential_distribute(domain, verbose, debug, parameters) if verbose: print('sequential_distribute_dump: Partitioning mesh to %d processes'%numprocs) partition.distribute(numprocs) # Make sure the partition_dir exists if partition_dir != '.': import os import errno try: os.makedirs(partition_dir, exist_ok=True) except OSError as exception: if exception.errno != errno.EEXIST: raise if verbose: print('sequential_distribute_dump: Dumping partitions to %s'%partition_dir) for p in range(0, numprocs): tostore = partition.extract_submesh(p) pickle_name = partition.domain_name + '_P%g_%g.pickle'% (numprocs,p) pickle_name = join(partition_dir,pickle_name) lst = list(tostore) # Write points and triangles to their own files num.save(pickle_name+".np1",tostore[1]) # num.save appends .npy to filename lst[1] = pickle_name+".np1.npy" num.save(pickle_name+".np2",tostore[2]) lst[2] = pickle_name+".np2.npy" # Write each quantity to its own file for k in tostore[4]: num.save(pickle_name+".np4."+k, num.asarray(tostore[4][k])) lst[4][k] = pickle_name+".np4."+k+".npy" with open(pickle_name, 'wb') as f: pickle.dump(tuple(lst), f, protocol=pickle.HIGHEST_PROTOCOL) # Release this rank's submesh data so memory can be reclaimed before # processing the next rank. Without this, all P subdomains' arrays # remain live throughout the entire dump loop. partition._release_submesh_rank(p) del tostore, lst # Run GC every rank — cyclic references inside submesh dicts are not # freed by refcounting alone and must be collected explicitly. gc.collect() gc.collect() return
[docs] def sequential_distribute_load(filename = 'domain', partition_dir = '.', verbose = False): from anuga import myid, numprocs from os.path import join pickle_name = filename+'_P%g_%g.pickle'% (numprocs,myid) pickle_name = join(partition_dir,pickle_name) return sequential_distribute_load_pickle_file(pickle_name, numprocs, verbose = verbose)
def sequential_distribute_load_pickle_file(pickle_name, np=1, verbose = False): """ Open pickle files """ f = open(pickle_name, 'rb') import pickle kwargs, points, vertices, boundary, quantities, boundary_map, \ domain_name, domain_dir, domain_store, domain_store_centroids, \ domain_minimum_storable_height, domain_minimum_allowed_height, \ domain_flow_algorithm, domain_georef, \ domain_quantities_to_be_stored, domain_smooth, \ domain_low_froude = pickle.load(f) f.close() # Note that quantities is a dictionary with quantity name keys and filenames of numpy arrays. # points and vertices are filenames of numpy arrays. These need to be loaded. for k in quantities: quantities[k] = num.load(quantities[k]) points = num.load(points) vertices = num.load(vertices) #--------------------------------------------------------------------------- # Create domain (parallel if np>1) #--------------------------------------------------------------------------- if np>1: domain = Parallel_domain(points, vertices, boundary, **kwargs) else: domain = Domain(points, vertices, boundary, **kwargs) #------------------------------------------------------------------------ # Copy in quantity data #------------------------------------------------------------------------ for q in quantities: domain.set_quantity(q, quantities[q], location='centroids') #------------------------------------------------------------------------ # Transfer boundary conditions to each subdomain #------------------------------------------------------------------------ boundary_map['ghost'] = None # Add binding to ghost boundary domain.set_boundary(boundary_map) #------------------------------------------------------------------------ # Transfer other attributes to each subdomain #------------------------------------------------------------------------ domain.set_name(domain_name) domain.set_datadir(domain_dir) domain.set_flow_algorithm(domain_flow_algorithm) domain.set_low_froude(domain_low_froude) domain.set_store(domain_store) domain.set_store_centroids(domain_store_centroids) domain.set_minimum_storable_height(domain_minimum_storable_height) domain.set_minimum_allowed_height(domain_minimum_allowed_height) domain.geo_reference = domain_georef domain.set_quantities_to_be_stored(domain_quantities_to_be_stored) domain.smooth = domain_smooth return domain # --------------------------------------------------------------------------- # Mesh-only partition save / load (no quantities) # --------------------------------------------------------------------------- def _write_mesh_partition(fname, rank, numprocs, points, vertices, boundary, ghost_recv_dict, full_send_dict, tri_l2g, node_l2g, number_of_full_triangles, number_of_full_nodes, number_of_global_triangles, number_of_global_nodes, ghost_layer_width, geo_ref): """Write one rank's mesh partition to a NetCDF4 file.""" import netCDF4 with netCDF4.Dataset(fname, 'w', format='NETCDF4') as nc: # --- global scalar attributes --- nc.rank = rank nc.numprocs = numprocs nc.number_of_full_triangles = number_of_full_triangles nc.number_of_full_nodes = number_of_full_nodes nc.number_of_global_triangles = number_of_global_triangles nc.number_of_global_nodes = number_of_global_nodes nc.ghost_layer_width = ghost_layer_width geo_ref.write_NetCDF(nc) Nnodes = len(points) Ntri = len(vertices) nc.createDimension('node', Nnodes) nc.createDimension('tri', Ntri) nc.createDimension('two', 2) nc.createDimension('three', 3) # --- mesh arrays --- v = nc.createVariable('points', 'f8', ('node', 'two')) v[:] = points v = nc.createVariable('vertices', 'i4', ('tri', 'three')) v[:] = vertices v = nc.createVariable('tri_l2g', 'i4', ('tri',)) v[:] = tri_l2g v = nc.createVariable('node_l2g', 'i4', ('node',)) v[:] = node_l2g # --- boundary: encode {(tri, edge): tag} as three parallel arrays --- keys = list(boundary.keys()) bnd_tris = num.array([k[0] for k in keys], dtype='i4') bnd_edges = num.array([k[1] for k in keys], dtype='i4') bnd_tags = [boundary[k] for k in keys] Nbnd = len(bnd_tris) max_tag_len = max((len(t) for t in bnd_tags), default=1) nc.createDimension('bnd', Nbnd) nc.createDimension('tag_len', max_tag_len) v = nc.createVariable('boundary_tri', 'i4', ('bnd',)) v[:] = bnd_tris v = nc.createVariable('boundary_edge', 'i4', ('bnd',)) v[:] = bnd_edges tag_var = nc.createVariable('boundary_tag', 'S1', ('bnd', 'tag_len')) for i, tag in enumerate(bnd_tags): padded = tag.ljust(max_tag_len, '\x00') tag_var[i, :] = num.frombuffer(padded.encode('ascii'), dtype='S1') # --- send/recv communication: CSR encoding --- # full_send_dict / ghost_recv_dict: {rank: [local_indices, global_indices]} for prefix, comm_dict in (('send', full_send_dict), ('recv', ghost_recv_dict)): ranks_sorted = sorted(comm_dict.keys()) local_arrays = [comm_dict[r][0] for r in ranks_sorted] global_arrays = [comm_dict[r][1] for r in ranks_sorted] offsets = num.zeros(len(ranks_sorted) + 1, dtype='i4') for i, arr in enumerate(local_arrays): offsets[i + 1] = offsets[i] + len(arr) local_packed = num.concatenate(local_arrays).astype('i4') \ if local_arrays else num.array([], dtype='i4') global_packed = num.concatenate(global_arrays).astype('i4') \ if global_arrays else num.array([], dtype='i4') nr = len(ranks_sorted) nc.createDimension(f'{prefix}_nranks', nr) nc.createDimension(f'{prefix}_nranks_plus1', nr + 1) nc.createDimension(f'{prefix}_total', len(local_packed)) v = nc.createVariable(f'{prefix}_ranks', 'i4', (f'{prefix}_nranks',)) v[:] = num.array(ranks_sorted, dtype='i4') if ranks_sorted \ else num.array([], dtype='i4') v = nc.createVariable(f'{prefix}_offsets', 'i4', (f'{prefix}_nranks_plus1',)) v[:] = offsets lv = nc.createVariable(f'{prefix}_local', 'i4', (f'{prefix}_total',)) gv = nc.createVariable(f'{prefix}_global', 'i4', (f'{prefix}_total',)) if len(local_packed): lv[:] = local_packed gv[:] = global_packed def _release_mesh_submesh_rank(submesh, p): """Null out submesh arrays for rank *p* to allow GC before next rank.""" for key in ('full_nodes', 'full_triangles', 'full_boundary', 'ghost_nodes', 'ghost_triangles', 'ghost_commun', 'ghost_boundary', 'ghost_layer_width', 'full_commun'): lst = submesh.get(key) if lst is not None and p < len(lst): lst[p] = None
[docs] def sequential_mesh_dump(domain, numprocs, partition_dir='.', name=None, verbose=False, parameters=None): """Partition a domain mesh and write one NetCDF4 file per rank. Saves mesh topology and halo structure only — no quantities. Suitable as an offline preprocessing step before large parallel runs. After loading with :func:`sequential_mesh_load` the caller sets initial conditions via ``domain.set_quantity()`` before evolving. Files are written to ``<partition_dir>/<name>_mesh_P<numprocs>_<rank>.nc``. Parameters ---------- domain : Domain or Basic_mesh Source mesh. Quantities present on the domain are ignored. numprocs : int Number of partitions to create. partition_dir : str or path-like Output directory, created if it does not exist. name : str, optional Base name for output files. Defaults to ``domain.get_name()`` when available, otherwise ``'mesh'``. verbose : bool Print progress messages if True. parameters : dict, optional Forwarded to :func:`~anuga.parallel.distribute_mesh.partition_mesh` and :func:`~anuga.parallel.distribute_mesh.build_submesh`. Recognised keys include ``'partition_scheme'`` (``'metis'``, ``'morton'``, or ``'hilbert'``), ``'ghost_layer_width'``, and ``'cache_dir'``. """ import gc import os from anuga.coordinate_transforms.geo_reference import Geo_reference if name is None: name = domain.get_name() if hasattr(domain, 'get_name') else 'mesh' os.makedirs(partition_dir, exist_ok=True) geo_ref = getattr(domain, 'geo_reference', Geo_reference()) number_of_global_triangles = domain.number_of_triangles number_of_global_nodes = domain.number_of_nodes if verbose: print(f'sequential_mesh_dump: partitioning {number_of_global_triangles}' f' triangles across {numprocs} ranks') new_mesh, triangles_per_proc, _, _s2p_map, p2s_map = partition_mesh( domain, numprocs, parameters=parameters, verbose=verbose) if verbose: print('sequential_mesh_dump: building submeshes') submesh = build_submesh(new_mesh, {}, triangles_per_proc, parameters=parameters, verbose=verbose) if verbose: for p in range(numprocs): N = len(submesh['ghost_nodes'][p]) M = len(submesh['ghost_triangles'][p]) print(f'sequential_mesh_dump: rank {p}: ' f'{len(submesh["full_triangles"][p])} full triangles, ' f'{M} ghost triangles, {N} ghost nodes') for p in range(numprocs): points, vertices, boundary, _quantities, ghost_recv_dict, \ full_send_dict, _tri_map, _node_map, tri_l2g, node_l2g, \ ghost_layer_width = \ extract_submesh(submesh, triangles_per_proc, p2s_map, p) number_of_full_triangles = len(submesh['full_triangles'][p]) number_of_full_nodes = len(submesh['full_nodes'][p]) fname = os.path.join(partition_dir, f'{name}_mesh_P{numprocs}_{p}.nc') if verbose: print(f'sequential_mesh_dump: writing {fname}') _write_mesh_partition( fname, p, numprocs, points, vertices, boundary, ghost_recv_dict, full_send_dict, tri_l2g, node_l2g, number_of_full_triangles, number_of_full_nodes, number_of_global_triangles, number_of_global_nodes, ghost_layer_width, geo_ref) _release_mesh_submesh_rank(submesh, p) del points, vertices, boundary, tri_l2g, node_l2g del ghost_recv_dict, full_send_dict gc.collect() gc.collect()
[docs] def sequential_mesh_load(name, partition_dir='.', verbose=False): """Load this rank's mesh partition and return a bare :class:`Parallel_domain`. Reads the NetCDF4 file written by :func:`sequential_mesh_dump` for the calling MPI rank. No quantities are set; call ``domain.set_quantity()`` and ``domain.set_boundary()`` before evolving. Parameters ---------- name : str Base name passed to :func:`sequential_mesh_dump`. partition_dir : str or path-like Directory containing the partition files. verbose : bool Print progress messages if True. Returns ------- Parallel_domain Domain with mesh topology and halo structure initialised. All quantities are zero; boundary conditions are unset (``None``). """ import os import netCDF4 from anuga import myid, numprocs from anuga.coordinate_transforms.geo_reference import Geo_reference fname = os.path.join(partition_dir, f'{name}_mesh_P{numprocs}_{myid}.nc') if verbose: print(f'sequential_mesh_load: rank {myid} reading {fname}') with netCDF4.Dataset(fname, 'r') as nc: # --- scalar metadata --- number_of_full_triangles = int(nc.number_of_full_triangles) number_of_full_nodes = int(nc.number_of_full_nodes) number_of_global_triangles = int(nc.number_of_global_triangles) number_of_global_nodes = int(nc.number_of_global_nodes) ghost_layer_width = int(nc.ghost_layer_width) geo_ref = Geo_reference(NetCDFObject=nc) # --- mesh arrays --- points = num.array(nc['points'][:]) vertices = num.array(nc['vertices'][:]) tri_l2g = num.array(nc['tri_l2g'][:]) node_l2g = num.array(nc['node_l2g'][:]) # --- boundary dict --- bnd_tris = num.array(nc['boundary_tri'][:]) bnd_edges = num.array(nc['boundary_edge'][:]) raw_tags = nc['boundary_tag'][:] bnd_tags = [ b''.join(row).decode('ascii').rstrip('\x00') for row in raw_tags ] boundary = { (int(bnd_tris[i]), int(bnd_edges[i])): bnd_tags[i] for i in range(len(bnd_tris)) } # --- communication dicts (CSR → dict) --- def _read_comm_dict(nc, prefix): ranks = list(nc[f'{prefix}_ranks'][:].astype(int)) offsets = nc[f'{prefix}_offsets'][:].astype(int) local_ = num.array(nc[f'{prefix}_local'][:]) global_ = num.array(nc[f'{prefix}_global'][:]) d = {} for i, r in enumerate(ranks): s, e = offsets[i], offsets[i + 1] d[r] = [local_[s:e], global_[s:e]] return d full_send_dict = _read_comm_dict(nc, 'send') ghost_recv_dict = _read_comm_dict(nc, 'recv') domain = Parallel_domain( points, vertices, boundary, full_send_dict=full_send_dict, ghost_recv_dict=ghost_recv_dict, number_of_full_nodes=number_of_full_nodes, number_of_full_triangles=number_of_full_triangles, number_of_global_triangles=number_of_global_triangles, number_of_global_nodes=number_of_global_nodes, processor=myid, numproc=numprocs, s2p_map=None, p2s_map=None, tri_l2g=tri_l2g, node_l2g=node_l2g, ghost_layer_width=ghost_layer_width, geo_reference=geo_ref, ) # Register all boundary tags with None BCs (caller replaces these). boundary_map = {tag: None for tag in set(boundary.values())} boundary_map['ghost'] = None domain.set_boundary(boundary_map) return domain
# --------------------------------------------------------------------------- # Uniform mesh refinement # --------------------------------------------------------------------------- # ANUGA edge convention: edge j is opposite vertex j in triangle (v0, v1, v2). # Each parent boundary edge j maps to child boundary edges of two children. _EDGE_CHILD_INHERITANCE = { 0: ((1, 0), (2, 0)), # edge 0 (v1-v2) → child1 edge0, child2 edge0 1: ((0, 1), (2, 1)), # edge 1 (v0-v2) → child0 edge1, child2 edge1 2: ((0, 2), (1, 2)), # edge 2 (v0-v1) → child0 edge2, child1 edge2 }
[docs] def uniform_refine_domain(domain, verbose=False): """Uniformly refine a sequential domain; return a quantity-free Domain. Each triangle (v0, v1, v2) is replaced by four children using red refinement (connect edge midpoints): - child 0 = (v0, m01, m02) corner near v0 - child 1 = (m01, v1, m12) corner near v1 - child 2 = (m02, m12, v2) corner near v2 - child 3 = (m01, m12, m02) central where m01, m12, m02 are the midpoints of edges (v0,v1), (v1,v2), (v0,v2). Boundary tags are propagated to the two child edges that inherit each parent boundary edge. Quantities from *domain* are not copied. Parameters ---------- domain : Domain Sequential domain to refine. verbose : bool Print progress messages if True. Returns ------- Domain Refined domain with 4x triangles; no quantities set. """ from anuga import Domain triangles = domain.triangles # (N, 3) node indices nodes = domain.mesh.nodes # (M, 2) coordinates boundary = domain.boundary # {(tri, edge): tag} N = len(triangles) M = len(nodes) if verbose: print(f'uniform_refine_domain: {N} triangles, {M} nodes ' f'→ {4*N} triangles') # Assign a midpoint node ID to every unique edge. edge_to_mid = {} for v0, v1, v2 in triangles: for a, b in ((v0, v1), (v1, v2), (v0, v2)): e = (min(a, b), max(a, b)) if e not in edge_to_mid: edge_to_mid[e] = M + len(edge_to_mid) M_new = M + len(edge_to_mid) new_nodes = num.zeros((M_new, 2)) new_nodes[:M] = nodes for (la, lb), mid_id in edge_to_mid.items(): new_nodes[mid_id] = (nodes[la] + nodes[lb]) * 0.5 new_triangles = num.zeros((4 * N, 3), dtype='i4') for t, (v0, v1, v2) in enumerate(triangles): m01 = edge_to_mid[(min(v0, v1), max(v0, v1))] m12 = edge_to_mid[(min(v1, v2), max(v1, v2))] m02 = edge_to_mid[(min(v0, v2), max(v0, v2))] new_triangles[4*t + 0] = (v0, m01, m02) new_triangles[4*t + 1] = (m01, v1, m12) new_triangles[4*t + 2] = (m02, m12, v2) new_triangles[4*t + 3] = (m01, m12, m02) new_boundary = {} for (tri, edge_j), tag in boundary.items(): for child_idx, child_edge in _EDGE_CHILD_INHERITANCE[edge_j]: new_boundary[(4*tri + child_idx, child_edge)] = tag geo_ref = getattr(domain, 'geo_reference', None) kwargs = {'geo_reference': geo_ref} if geo_ref is not None else {} return Domain(new_nodes, new_triangles, new_boundary, **kwargs)
def _read_partition_nc(fname): """Read one partition NetCDF4 file; return a plain data dict.""" import netCDF4 from anuga.coordinate_transforms.geo_reference import Geo_reference with netCDF4.Dataset(fname, 'r') as nc: rank = int(nc.rank) numprocs = int(nc.numprocs) N_full = int(nc.number_of_full_triangles) M_full = int(nc.number_of_full_nodes) N_global = int(nc.number_of_global_triangles) M_global = int(nc.number_of_global_nodes) glw = int(nc.ghost_layer_width) geo_ref = Geo_reference(NetCDFObject=nc) points = num.array(nc['points'][:]) vertices = num.array(nc['vertices'][:], dtype='i4') tri_l2g = num.array(nc['tri_l2g'][:], dtype='i4') node_l2g = num.array(nc['node_l2g'][:], dtype='i4') bnd_tris = num.array(nc['boundary_tri'][:]) bnd_edges = num.array(nc['boundary_edge'][:]) raw_tags = nc['boundary_tag'][:] bnd_tags = [b''.join(row).decode('ascii').rstrip('\x00') for row in raw_tags] boundary = {(int(bnd_tris[i]), int(bnd_edges[i])): bnd_tags[i] for i in range(len(bnd_tris))} def _read_comm(nc, prefix): ranks_ = list(nc[f'{prefix}_ranks'][:].astype(int)) offsets = nc[f'{prefix}_offsets'][:].astype(int) local_ = num.array(nc[f'{prefix}_local'][:], dtype='i4') global_ = num.array(nc[f'{prefix}_global'][:], dtype='i4') d = {} for i, r in enumerate(ranks_): s, e = offsets[i], offsets[i + 1] d[r] = [local_[s:e], global_[s:e]] return d full_send_dict = _read_comm(nc, 'send') ghost_recv_dict = _read_comm(nc, 'recv') return dict( rank=rank, numprocs=numprocs, points=points, vertices=vertices, tri_l2g=tri_l2g, node_l2g=node_l2g, boundary=boundary, ghost_recv_dict=ghost_recv_dict, full_send_dict=full_send_dict, number_of_full_triangles=N_full, number_of_full_nodes=M_full, number_of_global_triangles=N_global, number_of_global_nodes=M_global, ghost_layer_width=glw, geo_ref=geo_ref, ) def _refine_partition_worker(args): """Refine one partition file (Pass 2 of _refine_one_level). Designed as a module-level function so it can be pickled for ``concurrent.futures.ProcessPoolExecutor``. Global edge data is passed as sorted numpy arrays and looked up with binary search to avoid pickling large Python dicts. Parameters (packed into *args* tuple) -------------------------------------- p : int Partition rank. numprocs : int fname_in, fname_out : str edge_keys : ndarray int64 Encoded global edge keys (ga * M_enc + gb), sorted. mid_global_ids : ndarray int64 Global midpoint IDs in the same order as *edge_keys*. full_rank_arr : ndarray int32 Owning rank for each edge (-1 if not owned by any full triangle). M_enc : int Encoding multiplier (max global node ID + 1). M_global_new, N_global_new : int Post-refinement global counts. verbose : bool """ import numpy as _np (p, numprocs, fname_in, fname_out, edge_keys, mid_global_ids, full_rank_arr, M_enc, M_global_new, N_global_new, verbose) = args data = _read_partition_nc(fname_in) points = data['points'] vertices = data['vertices'] tri_l2g = data['tri_l2g'] node_l2g = data['node_l2g'] boundary = data['boundary'] ghost_recv = data['ghost_recv_dict'] full_send = data['full_send_dict'] N_full = data['number_of_full_triangles'] M_full = data['number_of_full_nodes'] glw = data['ghost_layer_width'] geo_ref = data['geo_ref'] N_local = len(vertices) M_local = len(points) M_ghost = M_local - M_full def _enc(ga, gb): return int(ga) * M_enc + int(gb) def _mid_global(encoded): idx = int(_np.searchsorted(edge_keys, encoded)) return int(mid_global_ids[idx]) def _owner(encoded): idx = int(_np.searchsorted(edge_keys, encoded)) return int(full_rank_arr[idx]) # Collect all unique local edges and their encoded global keys. local_edges_map = {} # (la, lb) → encoded global key for col_a, col_b in ((0, 1), (1, 2), (0, 2)): la_arr = _np.minimum(vertices[:, col_a], vertices[:, col_b]) lb_arr = _np.maximum(vertices[:, col_a], vertices[:, col_b]) ga_arr = _np.minimum(node_l2g[la_arr], node_l2g[lb_arr]) gb_arr = _np.maximum(node_l2g[la_arr], node_l2g[lb_arr]) for k in range(N_local): le = (int(la_arr[k]), int(lb_arr[k])) if le not in local_edges_map: local_edges_map[le] = _enc(ga_arr[k], gb_arr[k]) # Classify: full (owned by this rank) vs ghost midpoints. full_mid_list = [] # [(le, encoded_ge), ...] ghost_mid_list = [] for le, enc_ge in local_edges_map.items(): if _owner(enc_ge) == p: full_mid_list.append((le, enc_ge)) else: ghost_mid_list.append((le, enc_ge)) full_mid_list.sort(key=lambda x: _mid_global(x[1])) ghost_mid_list.sort(key=lambda x: _mid_global(x[1])) n_full_mids = len(full_mid_list) n_ghost_mids = len(ghost_mid_list) M_full_new = M_full + n_full_mids M_local_new = M_local + n_full_mids + n_ghost_mids N_full_new = 4 * N_full # Local midpoint IDs: # 0..M_full-1 original full nodes (unchanged) # M_full..M_full_new-1 new full midpoints # M_full_new..M_full_new+M_ghost-1 original ghost nodes (shifted) # M_full_new+M_ghost.. ghost-only midpoints edge_to_local_mid = {} for k, (le, _enc_ge) in enumerate(full_mid_list): edge_to_local_mid[le] = M_full + k for k, (le, _enc_ge) in enumerate(ghost_mid_list): edge_to_local_mid[le] = M_full_new + M_ghost + k new_points = _np.zeros((M_local_new, 2)) new_points[:M_full] = points[:M_full] for k, ((la, lb), _enc_ge) in enumerate(full_mid_list): new_points[M_full + k] = (points[la] + points[lb]) * 0.5 new_points[M_full_new: M_full_new + M_ghost] = points[M_full:] for k, ((la, lb), _enc_ge) in enumerate(ghost_mid_list): new_points[M_full_new + M_ghost + k] = (points[la] + points[lb]) * 0.5 new_node_l2g = _np.zeros(M_local_new, dtype='i4') new_node_l2g[:M_full] = node_l2g[:M_full] for k, (_le, enc_ge) in enumerate(full_mid_list): new_node_l2g[M_full + k] = _mid_global(enc_ge) new_node_l2g[M_full_new: M_full_new + M_ghost] = node_l2g[M_full:] for k, (_le, enc_ge) in enumerate(ghost_mid_list): new_node_l2g[M_full_new + M_ghost + k] = _mid_global(enc_ge) new_vertices = _np.zeros((4 * N_local, 3), dtype='i4') for t in range(N_local): v0 = int(vertices[t, 0]) v1 = int(vertices[t, 1]) v2 = int(vertices[t, 2]) rv0 = v0 if v0 < M_full else v0 + n_full_mids rv1 = v1 if v1 < M_full else v1 + n_full_mids rv2 = v2 if v2 < M_full else v2 + n_full_mids m01 = edge_to_local_mid[(min(v0, v1), max(v0, v1))] m12 = edge_to_local_mid[(min(v1, v2), max(v1, v2))] m02 = edge_to_local_mid[(min(v0, v2), max(v0, v2))] new_vertices[4*t + 0] = (rv0, m01, m02) new_vertices[4*t + 1] = (m01, rv1, m12) new_vertices[4*t + 2] = (m02, m12, rv2) new_vertices[4*t + 3] = (m01, m12, m02) new_tri_l2g = ( 4 * tri_l2g[:, None] + _np.arange(4, dtype='i4')[None, :] ).flatten().astype('i4') new_boundary = {} for (tri_loc, edge_j), tag in boundary.items(): for child_idx, child_edge in _EDGE_CHILD_INHERITANCE[edge_j]: new_boundary[(4 * tri_loc + child_idx, child_edge)] = tag def _expand_comm(d): out = {} for rank_r, (la_arr, ga_arr) in d.items(): la = _np.asarray(la_arr, dtype='i4') ga = _np.asarray(ga_arr, dtype='i4') c = _np.arange(4, dtype='i4') out[rank_r] = [ (4 * la[:, None] + c[None, :]).flatten(), (4 * ga[:, None] + c[None, :]).flatten(), ] return out new_ghost_recv = _expand_comm(ghost_recv) new_full_send = _expand_comm(full_send) _write_mesh_partition( fname_out, p, numprocs, new_points, new_vertices, new_boundary, new_ghost_recv, new_full_send, new_tri_l2g, new_node_l2g, N_full_new, M_full_new, N_global_new, M_global_new, glw, geo_ref) if verbose: print(f'_refine_one_level: wrote {fname_out} ' f'({N_full_new} full tri, {M_full_new} full nodes)') def _refine_one_level(input_name, numprocs, output_name, input_dir, output_dir, verbose=False, num_workers=1): """Refine all partition files by one level (each triangle → 4 children). Two-pass algorithm: 1. Read all partition files and collect every unique global edge. Determine midpoint ownership: a midpoint is owned by the lowest-rank partition that has the edge in one of its full triangles. 2. For each partition, compute the refined mesh arrays and write a new partition file. When *num_workers* > 1 the per-partition work runs in parallel via ``concurrent.futures.ProcessPoolExecutor``. Parameters ---------- input_name, output_name : str Base names for input/output partition files. numprocs : int input_dir, output_dir : str or path-like verbose : bool num_workers : int Number of worker processes for Pass 2. 1 (default) uses the calling process; N > 1 spawns up to N workers in parallel. """ import os os.makedirs(output_dir, exist_ok=True) # ------------------------------------------------------------------ # Pass 1: collect global edges and assign midpoint ownership. # ------------------------------------------------------------------ all_global_edges = set() edge_full_rank = {} # global_edge → lowest rank with it in a full tri M_global_in = None N_global_in = None for p in range(numprocs): fname = os.path.join(input_dir, f'{input_name}_mesh_P{numprocs}_{p}.nc') if verbose: print(f'_refine_one_level: pass 1 reading {fname}') data = _read_partition_nc(fname) if M_global_in is None: M_global_in = data['number_of_global_nodes'] N_global_in = data['number_of_global_triangles'] vertices_p = data['vertices'] # (N_local, 3) local IDs node_l2g_p = data['node_l2g'] # (M_local,) global IDs N_full_p = data['number_of_full_triangles'] vg = node_l2g_p[vertices_p] # (N_local, 3) global IDs for col_a, col_b in ((0, 1), (1, 2), (0, 2)): ea = num.minimum(vg[:, col_a], vg[:, col_b]) eb = num.maximum(vg[:, col_a], vg[:, col_b]) all_global_edges.update(zip(ea.tolist(), eb.tolist())) # Full-triangle edges → candidate ownership for rank p. vg_f = vg[:N_full_p] for col_a, col_b in ((0, 1), (1, 2), (0, 2)): ea = num.minimum(vg_f[:, col_a], vg_f[:, col_b]) eb = num.maximum(vg_f[:, col_a], vg_f[:, col_b]) for ga, gb in zip(ea.tolist(), eb.tolist()): ge = (ga, gb) if ge not in edge_full_rank or edge_full_rank[ge] > p: edge_full_rank[ge] = p sorted_edges = sorted(all_global_edges) M_global_new = M_global_in + len(sorted_edges) N_global_new = 4 * N_global_in if verbose: print(f'_refine_one_level: global tri {N_global_in}{N_global_new}, ' f'nodes {M_global_in}{M_global_new}') # Build sorted numpy arrays for efficient pickling + binary-search lookup. M_enc = int(M_global_in) + 1 # encoding multiplier (> any node ID) if sorted_edges: _se = num.array(sorted_edges, dtype='i8') edge_keys = _se[:, 0] * M_enc + _se[:, 1] # sorted by construction mid_global_ids = num.arange(len(sorted_edges), dtype='i8') + M_global_in full_rank_arr = num.full(len(sorted_edges), -1, dtype='i4') if edge_full_rank: _fr_enc = num.array( [ga * M_enc + gb for (ga, gb) in edge_full_rank.keys()], dtype='i8') _fr_rk = num.array(list(edge_full_rank.values()), dtype='i4') _idx = num.searchsorted(edge_keys, _fr_enc) full_rank_arr[_idx] = _fr_rk else: edge_keys = num.empty(0, dtype='i8') mid_global_ids = num.empty(0, dtype='i8') full_rank_arr = num.empty(0, dtype='i4') # ------------------------------------------------------------------ # Pass 2: refine each partition (embarrassingly parallel). # ------------------------------------------------------------------ worker_args = [ (p, numprocs, os.path.join(input_dir, f'{input_name}_mesh_P{numprocs}_{p}.nc'), os.path.join(output_dir, f'{output_name}_mesh_P{numprocs}_{p}.nc'), edge_keys, mid_global_ids, full_rank_arr, M_enc, M_global_new, N_global_new, verbose) for p in range(numprocs) ] if num_workers == 1: for args in worker_args: _refine_partition_worker(args) else: import concurrent.futures import multiprocessing import warnings actual_workers = min(num_workers, numprocs) # mpi4py changes the default start method to 'forkserver'. Both # 'forkserver' and 'spawn' re-execute/import __main__ in each # worker, which breaks (RuntimeError or deadlock) when the caller # is an unguarded top-level script — the common ANUGA usage. So # use 'fork' explicitly on POSIX (safe here because workers do no # MPI); fall back to None (system default) on Windows where fork # is unavailable. Python 3.12+ warns that forking a process with # live threads (OpenMP from the anuga extensions) may deadlock; # the workers only call module-level numpy/netCDF code, so # suppress that specific DeprecationWarning. mp_ctx = (multiprocessing.get_context('fork') if hasattr(os, 'fork') else None) with warnings.catch_warnings(): warnings.filterwarnings( 'ignore', message=r'.*use of fork\(\) may lead to deadlocks.*', category=DeprecationWarning) with concurrent.futures.ProcessPoolExecutor( max_workers=actual_workers, mp_context=mp_ctx) as executor: list(executor.map(_refine_partition_worker, worker_args))
[docs] def sequential_mesh_refine(name, numprocs, levels=1, output_name=None, partition_dir='.', output_dir=None, verbose=False, num_workers=1): """Uniformly refine pre-computed partition files by one or more levels. Each refinement level replaces every triangle with four children using red refinement (connect edge midpoints). After *levels* refinements the mesh has 4^levels times as many triangles. The output files can be loaded with :func:`sequential_mesh_load`. Parameters ---------- name : str Base name of the input partition files (written by :func:`sequential_mesh_dump`). numprocs : int Number of partitions. levels : int Number of refinement levels. Default is 1. output_name : str, optional Base name for output files. Defaults to *name*. partition_dir : str Directory containing the input partition files. output_dir : str, optional Directory for output files. Defaults to *partition_dir*. verbose : bool Print progress messages if True. num_workers : int Number of worker processes for the per-partition refinement step. 1 (default) is single-process; N > 1 runs up to N partitions in parallel via ``concurrent.futures.ProcessPoolExecutor``. """ import os import shutil import tempfile if output_name is None: output_name = name if output_dir is None: output_dir = partition_dir if levels <= 0: return os.makedirs(output_dir, exist_ok=True) if levels == 1: _refine_one_level(name, numprocs, output_name, partition_dir, output_dir, verbose=verbose, num_workers=num_workers) return # Multi-level: pipe each level's output into the next via temp dirs. tmp_dirs = [] current_name = name current_dir = partition_dir try: for level in range(levels): is_last = (level == levels - 1) out_name = output_name if is_last else 'refined' out_dir = output_dir if is_last else tempfile.mkdtemp() if not is_last: tmp_dirs.append(out_dir) if verbose: print(f'sequential_mesh_refine: level {level + 1} / {levels}') _refine_one_level(current_name, numprocs, out_name, current_dir, out_dir, verbose=verbose, num_workers=num_workers) current_name = out_name current_dir = out_dir finally: for d in tmp_dirs: shutil.rmtree(d, ignore_errors=True)
[docs] def create_parallel_mesh(domain, numprocs, refinement_levels=0, name=None, partition_dir='.', verbose=False, parameters=None, num_workers=1): """Partition a mesh and optionally refine it offline. Combines :func:`sequential_mesh_dump` and :func:`sequential_mesh_refine` into a single call. A coarse mesh is partitioned on a single process, then each partition is uniformly refined *refinement_levels* times. The resulting files are loaded at run-time with :func:`sequential_mesh_load`. Parameters ---------- domain : Domain or Basic_mesh Sequential domain or lightweight :class:`Basic_mesh` to partition. Quantities present on a Domain are not stored in the partition files. numprocs : int Number of partitions. refinement_levels : int Number of uniform refinement levels. 0 means no refinement (just partition and dump). name : str, optional Base name for the partition files. Defaults to ``domain.get_name()`` when available, otherwise ``'mesh'``. partition_dir : str Output directory for the final partition files. verbose : bool Print progress messages if True. parameters : dict, optional Forwarded to :func:`sequential_mesh_dump`. num_workers : int Number of worker processes for the per-partition refinement step. 1 (default) is single-process; N > 1 uses ``concurrent.futures.ProcessPoolExecutor``. Returns ------- str The *name* used for the partition files (pass to :func:`sequential_mesh_load`). """ import os import tempfile if name is None: name = domain.get_name() if hasattr(domain, 'get_name') else 'mesh' os.makedirs(partition_dir, exist_ok=True) if refinement_levels == 0: sequential_mesh_dump(domain, numprocs, partition_dir=partition_dir, name=name, verbose=verbose, parameters=parameters) else: with tempfile.TemporaryDirectory() as tmpdir: sequential_mesh_dump(domain, numprocs, partition_dir=tmpdir, name=name, verbose=verbose, parameters=parameters) sequential_mesh_refine(name, numprocs, levels=refinement_levels, output_name=name, partition_dir=tmpdir, output_dir=partition_dir, verbose=verbose, num_workers=num_workers) return name