Source code for nrv.fmod.FEM.fenics_utils._FEMSimulation

"""
NRV-:class:`.FEMSimulation` handling.
"""

import os
import time
import numpy as np
import sys

from dolfinx import default_scalar_type
from dolfinx.fem import (
    Constant,
    Function,
    functionspace,
    assemble_scalar,
    dirichletbc,
    form,
    locate_dofs_topological,
)
from dolfinx.fem.petsc import LinearProblem
from mpi4py.MPI import COMM_WORLD, COMM_SELF, SUM
from petsc4py.PETSc import ScalarType, Viewer
from ufl import (
    Measure,
    TestFunction,
    TrialFunction,
    avg,
    inner,
    nabla_grad,
    CellDiameter,
)
from basix.ufl import element, mixed_element


from ....backend._log_interface import pass_info, rise_error, rise_warning
from ....backend._file_handler import rmv_ext
from ....utils._units import S, V, m
from ._fenics_materials import fenics_material
from ._FEMParameters import FEMParameters
from ._FEMResults import read_gmsh, FEMResults

# Lists of available solvers and conditioners
# go to https://petsc4py.readthedocs.io/en/stable/manual/ksp/ for more info

ksp_type_list = [
    "cg",
    "pipecg",
    "chebyshev",
    "groppcg",
    "pipecgrr",
    "cgne",
    "fcg",
    "cgls",
    "pipefcg",
    "nash",
    "stcg",
    "stcg",
    "qcg",
    "bicg",
    "bcgs",
    "ibcgs",
    "fbcgs",
    "fbcgsr",
    "symmlq",
    "bcgsl",
    "minres",
    "gmres",
    "fgmres",
    "dgmres",
    "pgmres",
    "pipefgmres",
    "lgmres",
    "cr",
    "gcr",
    "pipecr",
    "pipegcr",
    "fetidp",
    "cgs",
    "tfqmr",
    "tcqmr",
    "lsqr",
    "tsirm",
    "python",
    "preonly",
]

pc_type_list = [
    "lu",
    "ilu",
    "gamg",
    "jacobi",
    "sor",
    "eisenstat",
    "icc",
    "asm",
    "gasm",
    "bddc",
    "ksp",
    "composite",
    "cholesky",
    "none",
    "shell",
    "hypre",
]


[docs] class FEMSimulation(FEMParameters): r""" Class usefull to solve the Static/Quasi-Static electrical current problem using FEM with FEniCSx algorithms (https://fenicsproject.org). .. math:: \nabla \mathbf{j}(\mathbf{r}) = 0 .. math:: \mathbf{j}(\mathbf{r}) = \mathbf{\sigma} (\mathbf{r}) \nabla V (\mathbf{r}), \forall \mathbf{r} \in \Omega Where $\\Omega$ is the simulation space, $\\bf{j}$ the the current density and $V$ the electrical potential The problem parameters (domains and boundaries condition) can be define using FEMParameters methods Contains methods to setup the matrix sytstem, to solve it and to access the results. Parameters ---------- D : int dim of the mesh, by default 3 NB: only 3 is implemented mesh_file : str mesh directory and file name: by default "" mesh : None or MshCreator if not None, (MshCreator) from which the mesh sould be used, by default None data : str, dict or FEMParameters if not None, load FEMParameters attribute from data, by default None elem :tupple (str, int) if None, ("Lagrange", 1), else (element type, element order), by default None ummesh : bool if True the scale of mesh space dimensions should be (um), else (m), by default True Usefull to link the update materials conductivity as in NRV conductivities are in S/m but NerveMshCreator space scale is um) comm : int The MPI communicator to use for mesh creation, by default COMM_SELF rank : int The rank the Gmsh model is initialized on, by default 0 """
[docs] def __init__( self, D=3, mesh_file="", mesh=None, data=None, elem=None, ummesh=True, comm=COMM_SELF, rank=0, ): """ initialisation of the FEMSimulation """ super().__init__(D=D, mesh_file=mesh_file, data=data) if elem is not None: self.elem = elem else: self.elem = ("Lagrange", 1) self.multi_elem = None # Mesh and Meshtag self.mesh = mesh if self.mesh is None: self.mesh = self.mesh_file self.domain = None self.subdomains = None self.boundaries = None # Space and mesure self.V = None self.V_DG = None self.dS = None self.ds = None self.dx = None if ummesh: self.mat_unit = "S/um" else: self.mat_unit = "S/m" # Multimesh parmeters self.Nspace = 1 # Bilinear and linear forms self.a = None self.L = None self.mat_list_ID = [] self.sigma_list = [] self.mat_list = [] self.mat_list_space = [] self.mixedvouts = [] # gather fen_mat name of all domain and internal layer self.mat_map = {} self.Neuman_list = {} # Solver parameters self.petsc_opt = { "ksp_type": "cg", "pc_type": "ilu", "ksp_rtol": 1e-4, "ksp_atol": 1e-7, "ksp_max_it": 1000, } self.cg_problem = None self.dg_problem = None self.u = None self.v = None self.mixedvout = None self.vout = None self.result = None # Mcore attributes self.comm = comm self.rank = rank # Timmers self.solving_timer = 0 self.bcs = [] # added for overzriting false option self.file = [] self.data_status = True self.domain_status = False self.material_map_satus = False self.dirichlet_BC_status = False self.neumann_BC_status = False self.bilinear_form_status = False self.jump_status = False self.linear_form_status = False self.setup_status = False self.solve_status = False self.to_merge = True
##################################################### ########### SimParameter methods overload ########### #####################################################
[docs] def add_inboundary( self, mesh_domain, in_domains, thickness, mat_pty=None, mat_file=None, mat_perm=None, ID=None, ): """ Add or update one internal boundary layer in the simulation definition. """ super().add_inboundary( mesh_domain, in_domains, thickness, mat_pty, mat_file, mat_perm, ID, ) if self.setup_status: if mesh_domain in self.mat_map: self.mat_map[mesh_domain].update_mat(mat_pty) else: rise_warning( "Domain not added: new domain cannot be added between 2 simulations,\ (set domain before simulation or create a new one)" )
[docs] def add_domain( self, mesh_domain, mat_pty=None, mat_file=None, mat_perm=None, ID=None ): """ Add or update one material domain in the simulation definition. """ super().add_domain(mesh_domain, mat_pty, mat_file, mat_perm, ID) if self.setup_status: if mesh_domain in self.mat_map: self.mat_map[mesh_domain].update_mat(mat_pty) else: rise_warning( "Domain not added: new domain cannot be added between 2 simulations,\ (set domain before simulation or create a new one)" )
##################################################### ############ setup the matrix sytstem ############# #####################################################
[docs] def setup_sim(self, **kwargs): """ setup Bilinear form, Linear form and boundary conditions using paramters and kwargs to set the variable defined Neumann boundary conditions (see FEMParameters.add_boundary) If FEMSimulation already defined, can be used to modify variable defined NBC """ t0 = time.time() if self.data_status: self.args = kwargs pass_info("Static/Quasi-Static electrical current problem") if self.D == 1: rise_error("1D not implemented yet") else: # Initialize the domain if not self.domain_status: self.__init_domain() # DIRICHLET BOUNDARY CONDITIONS if not self.dirichlet_BC_status: self.__set_dirichlet_BC() # DEFINING THE VARIATIONAL PROBLEM self.v = TrialFunction(self.V) self.u = TestFunction(self.V) # defining the bilinear form a(vout,u) if not self.bilinear_form_status: self.__set_bilinear_form() # defining the linear form L(u) # NB1: in the case of the current-problem, the source term is nul if not self.linear_form_status: self.__set_linear_form() # NEUMANN BOUNDARY CONDITIONS self.__set_neumann_BC() self.setup_status = True self.solving_timer += time.time() - t0 else: rise_warning("no parameters are set, Sim can not be setup")
def __init_domain(self): """ internal use only: """ if not self.domain_status: # RECOVERING THE GEOMETRY self.domain, self.subdomains, self.boundaries = read_gmsh( self.mesh, comm=self.comm, rank=self.rank, gdim=self.D ) # SPACE FOR INTEGRATION if self.inbound: self.Nspace = self.Ninboundaries + 1 ME = [ element( self.elem[0], self.domain.basix_cell(), self.elem[1], dtype=ScalarType, ) for _ in range(self.Nspace) ] self.multi_elem = mixed_element(ME) else: self.multi_elem = self.elem self.V = functionspace(self.domain, self.multi_elem) # MEASURES FOR INTEGRATION self.dx = Measure("dx", domain=self.domain, subdomain_data=self.subdomains) self.ds = Measure("ds", domain=self.domain, subdomain_data=self.boundaries) self.dS = Measure("dS", domain=self.domain, subdomain_data=self.boundaries) self.domain_status = True def __set_dirichlet_BC(self): """ internal use only: set the Dirichlet boundary condition from parameters NB: DBC cannot be change between simulations """ self.bcs = [] for i in self.dboundaries_list: bound = self.dboundaries_list[i] condition = bound["condition"] if condition.lower() == "dirichlet": value = Constant(self.domain, ScalarType(float(bound["value"]))) label = self.boundaries.find(int(bound["mesh_domain"])) if not self.inbound: dofs = locate_dofs_topological( self.V, self.domain.topology.dim - 1, label ) self.bcs.append(dirichletbc(value, dofs, self.V)) else: i_space = self.get_space_of_domain(bound["mesh_domain_3D"]) dofs = locate_dofs_topological( self.V.sub(i_space), self.domain.topology.dim - 1, label ) self.bcs.append(dirichletbc(value, dofs, self.V.sub(i_space))) if not self.bcs: rise_warning( "no Dirichlet Condition implemented on the Computed Field, Simulation maybe unsuccessful" ) self.dirichlet_BC_status = True def __set_neumann_BC(self): """ internal use only: set the Neuman boundary condition from parameters or update variable between to simulation NB: Only variable NBC (see FEMParameters.add_boundary) can be changed between simulations """ if not self.neumann_BC_status: for i_bound in self.nboundaries_list: bound = self.nboundaries_list[i_bound] condition = bound["condition"] if condition.lower() in "neumann": dom = int(bound["mesh_domain"]) if "value" in bound: self.Neuman_list[i_bound] = Constant( self.domain, ScalarType(bound["value"]) ) elif "variable" in bound: self.Neuman_list[i_bound] = Constant( self.domain, ScalarType(self.args[bound["variable"]]) ) else: rise_error( "A Neuman Boundary condition must be associated with a value or variable" ) if not self.inbound: self.L = self.L + self.Neuman_list[i_bound] * self.u * self.ds( dom ) else: i_space = self.get_space_of_domain(bound["mesh_domain_3D"]) self.L = self.L + self.Neuman_list[i_bound] * self.u[ i_space ] * self.ds(dom) self.neumann_BC_status = True else: for i_bound in self.nboundaries_list: bound = self.nboundaries_list[i_bound] condition = bound["condition"] if condition.lower() in "neumann" and "variable" in bound: if bound["variable"] in self.args: self.Neuman_list[i_bound].value = self.args[bound["variable"]] def __set_bilinear_form(self): """ internal use only: set the bilinear form a(vout, u) from the parameters """ pass_info("FEN4NRV: setup the bilinear form") self.__set_material_map() for i_space in range(self.Nspace): for i_domain in self.domains_list: i_mat = self.get_mixedspace_domain(i_space=i_space, i_domain=i_domain) if self.a is None: self.a = self.__get_static_component(i_domain, i_mat, i_space) else: self.a += self.__get_static_component(i_domain, i_mat, i_space) if self.inbound: self.__set_jump() self.bilinear_form_status = True def __get_static_component(self, i_dom, i_mat, i_space): r""" Set a static componnent of the bimlinear form: .. math:: a = \nablav[i_{space}] \sigma[i_mat] \nabla u[i_{space}] dx(i_{dom}) Parameters ---------- i_dom : int id of the domain on which the component should be set i_mat : int id of the material corresponding domain on which the component should be set i_space : int id of the i_space """ if not self.inbound: return inner( nabla_grad(self.v), self.mat_map[i_mat].sigma_fen * nabla_grad(self.u), ) * self.dx(i_dom) else: return inner( nabla_grad(self.v[i_space]), self.mat_map[i_mat].sigma_fen * nabla_grad(self.u[i_space]), ) * self.dx(i_dom) def __set_jump(self): """ internal use only: set the jump for all internale boundary to mimic the thin layers """ for i_ibound in self.inboundaries_list: in_space, out_space = self.get_spaces_of_ibound(i_ibound) local_thickness = Constant( self.domain, ScalarType(self.inboundaries_list[i_ibound]["thickness"]) ) jmp_v = avg(self.v[out_space]) - avg(self.v[in_space]) jmp_u = avg(self.u[out_space]) - avg(self.u[in_space]) self.a += ( self.mat_map[i_ibound].sigma_fen / local_thickness * jmp_u * jmp_v * self.dS(i_ibound) ) self.jump_status = True def __set_material_map(self): """ internal use only: build a dictionnary mat_map containing a material for every domain and layer """ if not self.material_map_satus: if self.mat_unit == "S/um": UN = S / m else: UN = 1 for dom, pty in self.mat_pty_map.items(): self.mat_map[dom] = fenics_material(pty) self.mat_map[dom].update_fenics_sigma( domain=self.domain, elem=self.elem, UN=UN, id=dom ) self.material_map_satus = True def __set_linear_form(self): """ internal use only: set the linear form L(u) from the parameters """ pass_info("FEN4NRV: setup the linear form") # Check if quicker without c_0 = Constant(self.domain, ScalarType(0.0)) if not self.inbound: self.L = c_0 * self.u * self.dx else: for i_space in range(self.Nspace): self.L = c_0 * self.u[i_space] * self.dx self.linear_form_status = True ##################################################### ################# Solve the sytstem ################# #####################################################
[docs] def get_solver_opt(self): """ get krylov solver options """ return self.petsc_opt
[docs] def set_solver_opt( self, ksp_type=None, pc_type=None, ksp_rtol=None, ksp_atol=None, ksp_max_it=None, **kwargs, ): """ set krylov solver options Parameters ---------- ksp_type : str value to set for ksp_type (solver type), if None don"t change, None by default pc_type : str value to set for pc_type (preconditioner type), if None don"t change, None by default list of possible ksp_rtol : float value to set for ksp_rtol (relative tolerance), if None don"t change, None by default ksp_atol : float value to set for ksp_atol (absolute tolerance), if None don"t change, None by default ksp_max_it : int value to set for ksp_max_it (max number of iterations), if None don"t change, None by default """ if ksp_type is not None: if ksp_type in ksp_type_list: self.petsc_opt["ksp_type"] = ksp_type else: rise_warning(ksp_type + " not set, should be in:\n" + ksp_type_list) if pc_type is not None: if pc_type in pc_type_list: self.petsc_opt["pc_type"] = pc_type else: rise_warning(pc_type + " not set, should be in:\n" + pc_type_list) if ksp_rtol is not None: self.petsc_opt["ksp_rtol"] = ksp_rtol if ksp_atol is not None: self.petsc_opt["ksp_atol"] = ksp_atol if ksp_max_it is not None: self.petsc_opt["ksp_max_it"] = ksp_max_it for key in kwargs: if "pc" in key and self.petsc_opt["pc_type"] in key: self.petsc_opt[key] = kwargs[key] elif "ksp" in key and self.petsc_opt["ksp_type"] in key: self.petsc_opt[key] = kwargs[key] else: rise_warning(key + " is not a valid solver option not set")
[docs] def set_result_merging(self, to_merge=None): """ Enable, disable, or toggle the merging of mixed-space solutions. Parameters ---------- to_merge : bool | None, optional Explicit merge state. If ``None``, toggle the current behavior. """ if isinstance(to_merge, bool): self.to_merge = to_merge else: self.to_merge = not self.to_merge
[docs] def solve(self, overwrite=False): """ Assemble and solve the linear problem Parameters ---------- overwrite : bool if true modify the existing sim_res value, else create a new one. by default False Returns ------- self.results : FEMResults FEMResults containing the result of the resulting field of the FEM simulation """ t0 = time.time() pass_info("FEN4NRV: solving electrical potential") if self.cg_problem is None: self.mixedvout = Function(self.V) self.cg_problem = LinearProblem( self.a, self.L, bcs=self.bcs, petsc_options=self.petsc_opt ) self.mixedvout.x.array[:] = self.cg_problem.solve().x.array self.solve_status = True if self.inbound and self.to_merge: V_sol = self.__merge_mixed_solutions() else: self.vout = self.mixedvout.copy() V_sol = self.V # return simulation result self.result = FEMResults() if not overwrite: vout = Function(V_sol) vout.x.array[:] = self.vout.x.array[:] else: vout = self.vout self.result.set_sim_result( mesh_file=self.mesh_file, domain=self.domain, elem=self.multi_elem, V=V_sol, vout=vout, comm=self.domain.comm, ) self.solving_timer += time.time() - t0 pass_info("FEN4NRV: solved in " + str(self.solving_timer) + " s") return self.result
def __merge_mixed_solutions(self): """ Merge mixed-space solutions into a discontinuous scalar field. Returns ------- Any Function space associated with the merged solution. """ if self.dg_problem is None: self.mixedvouts = self.mixedvout.split() self.V_DG = functionspace( self.domain, ("Discontinuous Lagrange", self.elem[1]) ) u_, v_ = TrialFunction(self.V_DG), TestFunction(self.V_DG) adg = u_ * v_ * self.dx Ldg = 0 for i_domain in self.domainsID: i_space = self.get_space_of_domain(i_domain) Ldg += v_ * self.mixedvout[i_space] * self.dx(i_domain) self.dg_problem = LinearProblem( adg, Ldg, bcs=[], petsc_options=self.petsc_opt ) else: mixedvouts = self.mixedvout.split() for i in range(len(mixedvouts)): self.mixedvouts[i].x.array[:] = mixedvouts[i].x.array self.vout = self.dg_problem.solve() return self.V_DG
[docs] def compute_conductance(self, order=None): """ Export the conductivity distribution as FEM result objects. Parameters ---------- order : int | None, optional DG interpolation order. Defaults to the simulation element order. Returns ------- list[FEMResults] Conductivity fields, one per mixed space when relevant. """ self.__init_domain() self.__set_material_map() V_sigma = self.V od = order or self.elem[1] V_sigma = functionspace(self.domain, ("DG", od)) sigma_out = [] for i_space in range(self.Nspace): sigma = Function(V_sigma) for i_domain in self.domainsID: dom_cells = self.subdomains.find(i_domain) dom_dofs = locate_dofs_topological( V_sigma, self.domain.topology.dim - 1, dom_cells ) i_mat = self.get_mixedspace_domain(i_space=i_space, i_domain=i_domain) mat = self.mat_map[i_mat].mat if mat.is_isotropic(): val = np.full_like(dom_dofs, mat.sigma, dtype=default_scalar_type) elif not mat.is_func: # val = mat.sigma_fen.value _sig = sum(mat.sigma**2) ** 0.5 val = np.full_like(dom_dofs, _sig, dtype=default_scalar_type) elif mat.is_func: sig_ = Function(V_sigma) sig_.interpolate(mat.sigma_func) val = sig_.x.array[dom_dofs] sigma.x.array[dom_dofs] = val self.sigma_results = FEMResults() self.sigma_results.set_sim_result( mesh_file=self.mesh_file, domain=self.domain, elem=self.multi_elem, V=V_sigma, vout=sigma, comm=self.domain.comm, ) sigma_out += [self.sigma_results] return sigma_out
##################################################### ################ Access the results ################# #####################################################
[docs] def solver_info(self, txt_fname="solver.txt"): """ Dump the PETSc solver information to a text file and echo it to the log. Parameters ---------- txt_fname : str, optional Output text filename. """ solver = self.cg_problem.solver viewer = Viewer().createASCII(txt_fname) solver.view(viewer) solver_output = open(txt_fname, "r") for line in solver_output.readlines(): pass_info(line)
[docs] def solve_and_save_sim(self, filename, save=True): """ Solve the simulation if needed and save the resulting field. Parameters ---------- filename : str Output filename. save : bool, optional Reserved flag for API compatibility. Returns ------- FEMResults Stored simulation result. """ if not self.solve_status: self.solve() fname = rmv_ext(filename) self.result.save(fname) return self.result
[docs] def get_timers(self): """ Return the accumulated solving time. Returns ------- float Solving timer in seconds. """ return self.solving_timer
[docs] def visualize_mesh(self): """ Open the mesh file in Gmsh. """ os.system("gmsh " + self.mesh_file + ".msh")
[docs] def get_surface(self, dom_id): """ Compute the measure of a boundary surface. Parameters ---------- dom_id : int Boundary domain identifier. Returns ------- float Surface measure. """ S = assemble_scalar(form(1 * self.ds(dom_id))) if self.comm == COMM_WORLD: S = self.comm.reduce(S, op=SUM, root=0) S = self.comm.bcast(S, root=0) return S
[docs] def get_domain_potential(self, dom_id, dim=2, space=0, surf=None): """ Compute the average potential over a boundary or volume domain. Parameters ---------- dom_id : int Domain identifier. dim : int, optional Integration dimension: ``2`` for surfaces, ``3`` for volumes. space : int, optional Mixed-space index used when results are not merged. surf : float | None, optional Precomputed domain measure. If omitted, it is evaluated internally. Returns ------- float Average potential over the selected domain. """ if dim == 2: do = self.ds elif dim == 3: do = self.dx if surf is None: surf = self.get_surface(dom_id) if self.to_merge: v_surf = assemble_scalar(form(self.vout * do(dom_id))) else: v_surf = assemble_scalar(form(self.vout[space] * do(dom_id))) return v_surf / surf