Source code for nrv.fmod.FEM.fenics_utils._FEMResults

"""
NRV-:class:`.FEMResults` handling.
"""

import gmsh
import numpy as np
import scipy
from time import perf_counter

from dolfinx.fem import Expression, Function, functionspace
from dolfinx.io.gmshio import model_to_mesh
from dolfinx.io.utils import XDMFFile, VTXWriter, VTKFile
from dolfinx.cpp.mesh import entities_to_geometry
from dolfinx.geometry import (
    bb_tree,
    compute_colliding_cells,
    compute_collisions_points,
    compute_closest_entity,
    compute_distance_gjk,
    create_midpoint_tree,
)
from mpi4py.MPI import COMM_WORLD


from ....backend._file_handler import rmv_ext
from ....backend._log_interface import rise_error, rise_warning
from ....backend._parameters import parameters
from ....backend._NRV_Class import NRV_class
from ..mesh_creator._MshCreator import (
    is_MshCreator,
    clear_gmsh,
)


###############
## Functions ##
###############
[docs] def is_sim_res(result): """ check if an object is a FEMResults, return True if yes, else False Parameters ---------- result : object object to test Returns ------- bool True it the type is a FEMResults object """ return isinstance(result, FEMResults)
[docs] def save_sim_res_list(sim_res_list, fname, dt=1.0): r""" save a list of SimResults in a .bp folder which can be open with PrarView Parameters ---------- sim_res_list : list(FEMResults) list of :class:`.FEMResults` to be saved fname : str File name and path ftype : str - if True .bp folder, else saved in a .xdmf file. dt : float time step between each results Warning ------- For this function to work dolfinx and ParaView must be up to date: - dolfinx $\\geq$ 0.8.0 - ParaView $\\geq$ 5.12.0 """ N_list = len(sim_res_list) fname = rmv_ext(fname) + ".bp" vcp = sim_res_list[0].vout vtxf = VTXWriter(sim_res_list[0].domain.comm, fname, vcp) for i in range(N_list): vcp = sim_res_list[i].vout vtxf.write(i * dt) vtxf.close()
[docs] def read_gmsh(mesh, comm=COMM_WORLD, rank=0, gdim=3): """ Given a mesh_file or a MeshCreator returns mesh cell_tags and facet_tags (see dolfinx.io.gmshio.model_to_mesh for more details) NB: copy of dolfinx.io.gmshio.read_from_msh verbose from gmsh Parameters ---------- mesh_file : str, Path and name of mesh file NB: extention is not required but must be a .msh file comm : tupple (str, int) The MPI communicator to use for mesh creation rank : tupple (str, int) The rank the Gmsh model is initialized on gdim : int dimension of the mesh, by default 3 Returns ------- output : tuple(3) (domain, cell_tag, facet_tag) """ if isinstance(mesh, str): mesh_file = rmv_ext(mesh) + ".msh" clear_gmsh() gmsh.initialize() gmsh.option.setNumber("General.Verbosity", 2) gmsh.model.add("Mesh from file") gmsh.merge(mesh_file) output = model_to_mesh(gmsh.model, comm=comm, rank=rank, gdim=gdim) gmsh.finalize() elif is_MshCreator(mesh): output = model_to_mesh(mesh.model, comm=comm, rank=rank, gdim=gdim) else: rise_error("mesh should be either a filename or a MeshCreator") return output
[docs] def domain_from_meshfile(mesh_file): """ return only the domain from mesh_file Parameters ---------- mesh_file : str Path and name of mesh file NB: extention is not required but must be a .msh file Returns ------- output : domain of the mesh contain in the mesh_file """ return read_gmsh(mesh_file)[0]
[docs] def V_from_meshfile(mesh_file, elem=("Lagrange", 1)): """ Build a function space directly from a mesh file. Parameters ---------- mesh_file : str Mesh filename. elem : tuple, optional Finite-element family and order. Returns ------- dolfinx.fem.FunctionSpace Function space defined on the loaded mesh. """ mesh = domain_from_meshfile(mesh_file) V = functionspace(mesh, elem) return V
[docs] def closest_point_in_mesh(mesh, point, tree, tdim, midpoint_tree): """ Find the closest point on a mesh entity when a query point lies outside the mesh. Parameters ---------- mesh : Any Mesh object. point : array-like Query point coordinates. tree : Any Bounding-box tree for the mesh. tdim : int Topological dimension. midpoint_tree : Any Midpoint tree used by the nearest-entity search. Returns ------- tuple Closest entity identifier and closest point coordinates. """ points = np.reshape(point, (1, 3)) entity = compute_closest_entity(tree, midpoint_tree, mesh, points) mesh_geom = mesh.geometry.x geom_dofs = entities_to_geometry(mesh._cpp_object, tdim, entity, False) mesh_nodes = mesh_geom[geom_dofs][0] displacement = compute_distance_gjk(points, mesh_nodes) return entity, points[0] - displacement
[docs] class FEMResults(NRV_class): """ Result of a FEMSimulation. Store the resulting function space giving the possibility to apply basic mathematical operations on multiple results """
[docs] def __init__( self, mesh_file="", domain=None, elem=("Lagrange", 1), V=None, vout=None, comm=COMM_WORLD, ): """ initialisation of the FEMParameters: Parameters ---------- mesh_file :str mesh directory and file name: by default "" domain : None or mesh mesh domain on which the result is defined, by default None elem :tupple (str, int) if None, ("Lagrange", 1), else (element type, element order), by default None V : None or dolfinx.fem.functionspace functionspace on which the result is defined, by default None vout : None or dolfinx.fem.Function Function resulting from the FEMSimulation, by default None comm :int The MPI communicator to use for mesh creation, by default MPI.COMM_WORLD """ super().__init__() self.type = "simresult" self.mesh_file = mesh_file self.domain = domain self.V = V self.elem = elem self.vout = vout self.comm = comm # For eval() serialized calls acceleration self.is_evaluated = False self.tdim = None self.n_entities_local = None self.entities = None self.midpoint_tree = None self.tree = None
[docs] def set_sim_result( self, mesh_file="", domain=None, V=None, elem=None, vout=None, comm=None ): """ Update the FEM result metadata and underlying function objects. """ if mesh_file != "": self.mesh_file = mesh_file if domain is not None: self.domain = domain if elem is not None: self.elem = elem if V is not None: self.V = V if vout is not None: self.vout = vout self.comm = comm
[docs] def save(self, file, ftype="vtx", overwrite=True, t=0.0): """ Save the FEM result to disk. Parameters ---------- file : str Output filename without mandatory extension. ftype : str, optional Storage format, such as ``"vtx"``, ``"xdmf"``, or the legacy ``.sres`` format. overwrite : bool, optional Control mesh rewriting for XDMF output. t : float, optional Time stamp written with transient outputs. """ fname = rmv_ext(file) if ftype.lower() == "vtx": # and dfx_utd: fname += ".bp" with VTXWriter(self.domain.comm, fname, self.vout, "bp4") as f: f.write(t) elif ftype == "xdmf": fname += ".xdmf" with XDMFFile(self.comm, fname, "w") as file: if not overwrite: file.parameters.update( {"functions_share_mesh": True, "rewrite_function_mesh": False} ) else: file.write_mesh(self.domain) file.write_function(self.vout) else: fname += ".sres" mdict = { "mesh_file": self.mesh_file, "element": self.elem, "vout": self.vector, } scipy.io.savemat(fname, mdict) return mdict
[docs] def load(self, file): """ Load a FEM result from the legacy ``.sres`` format. Parameters ---------- file : str Input filename without mandatory extension. """ fname = rmv_ext(file) + ".sres" mdict = scipy.io.loadmat(fname) self.mesh_file = mdict["mesh_file"][0] self.elem = (mdict["element"][0].strip(), int(mdict["element"][1])) if self.domain is None: self.domain = domain_from_meshfile(self.mesh_file) self.V = functionspace(self.domain, self.elem) self.vout = Function(self.V) self.vout.x.array[:] = mdict["vout"]
[docs] def save_sim_result(self, file, ftype="vtx", overwrite=True): """ Deprecated alias of :meth:`save`. """ rise_warning("save_sim_result is a deprecated method use save") self.save(file=file, ftype=ftype, overwrite=overwrite)
[docs] def load_sim_result(self, data="sim_result.json"): """ Deprecated alias of :meth:`load`. """ rise_warning("load_sim_result is a deprecated method use load") # self.load(data=data) # FIXME: data does not exist yet in load method self.load(file=data)
############# ## methods ## ############# @property def vector(self): """ Raw coefficient vector of the stored FEM function. Returns ------- np.ndarray Underlying array of degrees of freedom. """ return self.vout.x.array
[docs] def aline_V(self, res2): """ Change the function space of the result to aline with the result of another FEMResults Parameters ---------- res2 : FEMResults result to aline with """ if is_sim_res(res2): if self.mesh_file == res2.mesh_file: _vout = Function(res2.V) _vout.interpolate(self.vout) self.vout = _vout self.V = res2.V else: rise_error( "To aline mesh function reslults must have the same meshfile" ) else: rise_error("Mesh function alinment must be done with FEMResults")
[docs] def clone_res(self): """ Create an empty FEMResults shell sharing the same mesh/function-space metadata. Returns ------- FEMResults Cloned result container without copying field values. """ return FEMResults( mesh_file=self.mesh_file, V=self.V, domain=self.domain, elem=self.elem, comm=self.comm, )
[docs] def eval(self, X, is_multi_proc=False): """ Eval the result field at X position """ X = np.array(X) N = len(X) cells = [] points_on_proc = [] if self.is_evaluated is False: self.is_evaluated = True self.tdim = self.domain.geometry.dim self.n_entities_local = ( self.domain.topology.index_map(self.tdim).size_local + self.domain.topology.index_map(self.tdim).num_ghosts ) self.entities = np.arange(self.n_entities_local, dtype=np.int32) self.midpoint_tree = create_midpoint_tree( self.domain, self.tdim, self.entities ) self.tree = bb_tree(self.domain, self.tdim) # Find cells whose bounding-box collide with the the points cells_candidates = compute_collisions_points(self.tree, X) # Choose one of the cells that contains the point cells_colliding = compute_colliding_cells(self.domain, cells_candidates, X) for i in range(N): cell = cells_colliding.links(i) # point not in the mesh if len(cell) == 0: cell, x_closest = closest_point_in_mesh( self.domain, X[i], self.tree, self.tdim, self.midpoint_tree ) rise_warning( X[i], " not found in mesh, value of ", x_closest, " reused" ) # compute_colliding_cells(self.domain, cells_candidates, X) cells += [cell[0]] else: cells += [cell[0]] values = self.vout.eval(X, cells) if N > 1: values = values[:, 0] return values
##################### ## special methods ## ##################### def __abs__(self): """ Return a result containing the absolute value of the field coefficients. """ res = self.clone_res() if self.vout is not None: res.vout = Function(self.V) res.vout.x.array[:] = abs(self.vout.x.array) return res def __neg__(self): """ Return a result containing the negated field coefficients. """ res = self.clone_res() if self.vout is not None: res.vout = Function(self.V) res.vout.x.array[:] = -self.vout.x.array return res def __add__(self, b): """ Add another result or scalar to this FEM field. """ if is_sim_res(b): self.aline_V(b) B = b.vout.x.array else: B = b A = self.vout.x.array C = self.clone_res() C.vout = Function(self.V) C.vout.x.array[:] = A + B return C def __sub__(self, b): """ Subtract another result or scalar from this FEM field. """ if is_sim_res(b): self.aline_V(b) B = b.vout.x.array else: B = b A = self.vout.x.array C = self.clone_res() C.vout = Function(self.V) C.vout.x.array[:] = A - B return C def __mul__(self, b): """ Multiply this FEM field by another result or scalar. """ if is_sim_res(b): self.aline_V(b) B = b.vout.x.array else: B = b A = self.vout.x.array C = self.clone_res() C.vout = Function(self.V) C.vout.x.array[:] = A * B return C def __radd__(self, b): """ Right-handed addition with another result or scalar. """ if is_sim_res(b): self.aline_V(b) B = b.vout.x.array else: B = b A = self.vout.x.array C = self.clone_res() C.vout = Function(self.V) C.vout.x.array[:] = B + A return C def __rsub__(self, b): """ Right-handed subtraction with another result or scalar. """ if is_sim_res(b): self.aline_V(b) B = b.vout.x.array else: B = b A = self.vout.x.array C = self.clone_res() C.vout = Function(self.V) C.vout.x.array[:] = B - A return C def __rmul__(self, b): """ Right-handed multiplication with another result or scalar. """ if is_sim_res(b): self.aline_V(b) B = b.vout.x.array else: B = b A = self.vout.x.array C = self.clone_res() C.vout = Function(self.V) C.vout.x.array[:] = B * A return C def __eq__(self, b): """ Compare two FEM results for mesh compatibility and coefficient equality. """ if is_sim_res(b): if b.mesh_file == self.mesh_file: if np.allclose(self.vector, b.vector): return True return False def __ne__(self, b): # self != b """ Return the logical negation of :meth:`__eq__`. """ return not self == b