Source code for ogstools.msh2vtu

# Copyright (c) 2012-2024, OpenGeoSys Community (http://www.opengeosys.org)
#            Distributed under a Modified BSD License.
#            See accompanying file LICENSE.txt or
#            http://www.opengeosys.org/project/license
#

# Author: Dominik Kern (TU Bergakademie Freiberg)
import logging
from pathlib import Path
from typing import Any, Union

import meshio
import numpy as np

logger = logging.getLogger(__name__)

# For more info on __all__ see: https://stackoverflow.com/a/35710527/80480
__all__ = [
    "my_remove_orphaned_nodes",
    "print_info",
    "find_cells_at_nodes",
    "find_connected_domain_cells",
    "msh2vtu",
]


[docs] def my_remove_orphaned_nodes(my_mesh: meshio.Mesh) -> None: """Auxiliary function to remove points not belonging to any cell""" # find connected points and derive mapping from all points to them connected_point_index = np.array([]) for cell_block in my_mesh.cells: cell_block_values = cell_block.data connected_point_index = np.concatenate( [connected_point_index, cell_block_values.flatten()] ).astype(int) unique_point_index = np.unique(connected_point_index) old2new = np.zeros(len(my_mesh.points)) for new_index, old_index in enumerate(unique_point_index): old2new[old_index] = int(new_index) # update mesh to remaining points my_mesh.points = my_mesh.points[unique_point_index] # update points output_point_data = {} for pd_key, pd_value in my_mesh.point_data.items(): output_point_data[pd_key] = pd_value[unique_point_index] my_mesh.point_data = output_point_data # update point data output_cell_blocks = [] for cell_block in my_mesh.cells: cell_type = cell_block.type cell_block_values = cell_block.data updated_values = old2new[cell_block_values].astype(int) output_cell_blocks.append(meshio.CellBlock(cell_type, updated_values)) # cell data are not affected by point changes my_mesh.cells = output_cell_blocks # update cells
# print info for mesh: statistics and data field names # function to create node connectivity list, i.e. store for each node (point) to # which element (cell) it belongs
[docs] def find_cells_at_nodes( cells: Any, node_count: int, cell_start_index: int ) -> list[set]: # depending on the numbering of mixed meshes in OGS one may think of an # object-oriented way to add elements (of different type) to node # connectivity # initialize list of sets node_connectivity: list[set] = [set() for _ in range(node_count)] cell_index = cell_start_index for cell in cells: for node in cell: node_connectivity[node].add(cell_index) cell_index += 1 if node_connectivity.count(set()) > 0: unconnected_nodes = [ node for node in range(node_count) if node_connectivity[node] == set() ] logging.info("Points not connected with domain cells:") logging.info(unconnected_nodes) return node_connectivity
# function to find out to which domain elements a boundary element belongs
[docs] def find_connected_domain_cells( boundary_cells_values: Any, domain_cells_at_node: list[set[int]] ) -> tuple[np.ndarray, np.ndarray]: warned_gt1 = False # to avoid flood of warnings warned_lt1 = False # to avoid flood of warnings number_of_boundary_cells = len(boundary_cells_values) # to return unique common connected domain cell to be stored as cell_data # ("bulk_element_id"), if there are more than one do not store anything domain_cells_array = np.zeros(number_of_boundary_cells) domain_cells_number = np.zeros( number_of_boundary_cells ) # number of connected domain_cells for cell_index, cell_values in enumerate( boundary_cells_values ): # cell lists node of which it is comprised connected_domain_cells = [] for node in cell_values: connected_domain_cells.append(domain_cells_at_node[node]) common_domain_cells = set.intersection(*connected_domain_cells) number_of_connected_domain_cells = len(common_domain_cells) domain_cells_number[cell_index] = number_of_connected_domain_cells # there should be one domain cell for each boundary cell, however cells # of boundary dimension may be in the domain (e.g. as sources) if number_of_connected_domain_cells == 1: # assign only one (unique) connected dmain cell domain_cells_array[cell_index] = common_domain_cells.pop() elif number_of_connected_domain_cells < 1 and not warned_lt1: logging.warning( "find_connected_domain_cells: cell %s" " of boundary dimension does not belong to any domain cell!", str(cell_index), ) # domain_cell in domain_cells_array remains zero, as there is no # cell to assign warned_lt1 = True logging.warning( "Possibly more cells may not belong to any domain cell." ) elif not warned_gt1: logging.warning( "find_connected_domain_cells: cell %s" " of boundary dimension belongs to more than one domain cell!" ) # domain_cell in domain_cells_array remains zero, because structure # is 1D and only the boundary case is relevant for further use warned_gt1 = True logging.warning( "Possibly more cells may belong to more than one domain cell." ) return domain_cells_array, domain_cells_number
# TODO: rename
[docs] def msh2vtu( filename: Path, output_path: Path = Path(), output_prefix: str = "", dim: Union[int, list[int]] = 0, delz: bool = False, swapxy: bool = False, reindex: bool = False, keep_ids: bool = False, ascii: bool = False, log_level: Union[int, str] = "DEBUG", ) -> int: """ Convert a gmsh mesh (.msh) to an unstructured grid file (.vtu). Prepares a Gmsh-mesh for use in OGS by extracting domain-, boundary- and physical group-submeshes, and saves them in vtu-format. Note that all mesh entities should belong to physical groups. :param filename: Gmsh mesh file (.msh) as input data :param output_path: Path of output files, defaults to current working dir :param output_prefix: Output files prefix, defaults to basename of inputfile :param dim: Spatial dimension (1, 2 or 3), trying automatic detection, if not given. If multiple dimensions are provided, all elements of these dimensions are embedded in the resulting domain mesh. :param delz: Delete z-coordinate, for 2D-meshes with z=0. Note that vtu-format requires 3D points. :param swapxy: Swap x and y coordinate :param reindex: Renumber physical group / region / Material IDs to be renumbered beginning with zero. :param keep_ids: By default, rename 'gmsh:physical' to 'MaterialIDs' and change type of corresponding cell data to INT32. If True, this is skipped. :param ascii: Save output files (.vtu) in ascii format. :param log_level: Level of log output. Possible values: <https://docs.python.org/3/library/logging.html#levels> :returns: A MeshSeries object """ logging.getLogger().setLevel(log_level) if isinstance(dim, list): assert len(dim) < 3, "Specify at most 3 dim values." filename = Path(filename) # some variable declarations ph_index = 0 # to access physical group id in field data geo_index = 1 # to access geometrical dimension in field data dim0 = 0 dim1 = 1 dim2 = 2 dim3 = 3 # for all points, as the selection goes via the cells and subsequent trim ogs_point_data_key = "bulk_node_ids" # to associate domain points with original points ogs_domain_point_data_key = "original_node_number" available_cell_types = { dim0: {"vertex"}, dim1: {"line", "line3", "line4"}, dim2: { *["triangle", "triangle6", "triangle9", "triangle10"], *["quad", "quad8", "quad9"], }, dim3: { *["tetra", "tetra10"], *["pyramid", "pyramid13", "pyramid15", "pyramid14"], *["wedge", "wedge15", "wedge18"], *["hexahedron", "hexahedron20", "hexahedron27"], *["prism", "prism15", "prism18"], }, } gmsh_physical_cell_data_key = "gmsh:physical" ogs_domain_cell_data_key = "MaterialIDs" ogs_boundary_cell_data_key = "bulk_elem_ids" ogs = not keep_ids # check if input file exists and is in gmsh-format if not filename.is_file(): logging.warning("No input file (mesh) found.") # raise FileNotFoundError return 1 if filename.suffix != ".msh": logging.warning( "Warning, input file seems not to be in gmsh-format (*.msh)" ) # if no parameter given, use same basename as input file output_basename = filename.stem if output_prefix == "" else output_prefix logging.info("Output: %s", output_basename) # read in mesh (be aware of shallow copies, i.e. by reference) mesh: meshio.Mesh = meshio.read(str(filename)) points, point_data = mesh.points, mesh.point_data cells_dict, cell_data, cell_data_dict = ( mesh.cells_dict, mesh.cell_data, mesh.cell_data_dict, ) field_data = mesh.field_data number_of_original_points = len(points) existing_cell_types = set(mesh.cells_dict.keys()) # elements found in mesh logging.info("Original mesh (read)") logging.info(mesh) # check if element types are supported in current version of this script all_available_cell_types: set[str] = set() for cell_types in available_cell_types.values(): all_available_cell_types = all_available_cell_types.union(cell_types) for cell_type in existing_cell_types: if cell_type not in all_available_cell_types: logging.warning("Cell type %s not supported", str(cell_type)) # set spatial dimension of mesh if dim == 0: assert isinstance(dim, int) # automatically detect spatial dimension of mesh _dim = dim0 # initial value for test_dim, test_cell_types in available_cell_types.items(): if ( len(test_cell_types.intersection(existing_cell_types)) and test_dim > dim ): _dim = test_dim logging.info("Detected mesh dimension: %s", str(_dim)) logging.info("##") else: # trust the user _dim = max(dim) if isinstance(dim, list) else dim # delete third dimension if wanted by user if delz: if _dim <= dim2: logging.info("Remove z coordinate of all points.") points = mesh.points[:, :2] else: logging.info( "Mesh seems to be in 3D, z-coordinate cannot be removed. Option" " -z ignored." ) # special case in 2D workflow if swapxy: logging.info("Swapping x- and y-coordinate") points[:, 0], points[:, 1] = points[:, 1], -points[:, 0] # boundary and domain cell types depend on dimension if dim1 <= _dim <= dim3: boundary_dim = _dim - 1 domain_dim = _dim boundary_cell_types = existing_cell_types.intersection( available_cell_types[boundary_dim] ) domain_cell_types = existing_cell_types.intersection( available_cell_types[domain_dim] if isinstance(dim, int) else set().union(*[available_cell_types[d] for d in dim]) ) else: logging.warning("Error, invalid dimension dim=%s!", str(_dim)) return 1 # sys.exit() # Check for existence of physical groups if gmsh_physical_cell_data_key in cell_data: physical_groups_found = True # reconstruct field data, when empty (physical groups may have a number, # but no name) # TODO may there be other field data, than physical groups? if field_data == {}: # detect dimension by cell type for pg_cell_type, pg_cell_data in cell_data_dict[ gmsh_physical_cell_data_key ].items(): if pg_cell_type in available_cell_types[dim0]: pg_dim = dim0 if pg_cell_type in available_cell_types[dim1]: pg_dim = dim1 if pg_cell_type in available_cell_types[dim2]: pg_dim = dim2 if pg_cell_type in available_cell_types[dim3]: pg_dim = dim3 pg_ids = np.unique(pg_cell_data) for pg_id in pg_ids: pg_key = "PhysicalGroup_" + str(pg_id) field_data[pg_key] = np.array([pg_id, pg_dim]) id_list_domains = [ physical_group[ph_index] for physical_group in field_data.values() if physical_group[geo_index] == domain_dim ] id_offset = min(id_list_domains, default=0) if reindex else 0 else: logging.info("No physical groups found.") physical_groups_found = False ############################################################################ # Extract domain mesh, note that meshio 4.3.3. offers # remove_lower_dimensional_cells(), but we want to keep a uniform style for # domain and subdomains. Make sure to use domain_mesh=deepcopy(mesh) in this # case! ############################################################################ all_points = np.copy(points) # copy all, superfluous get deleted later if ogs: # to associate domain points later original_point_numbers = np.arange(number_of_original_points) all_point_data = {} all_point_data[ogs_domain_point_data_key] = np.uint64( original_point_numbers ) else: all_point_data = { key: value[:] for key, value in point_data.items() } # deep copy domain_cells = [] if ogs: domain_cell_data_key = ogs_domain_cell_data_key else: domain_cell_data_key = gmsh_physical_cell_data_key domain_cell_data: dict[str, list[int]] = {} domain_cell_data[domain_cell_data_key] = [] for domain_cell_type in domain_cell_types: # cells domain_cells_values = cells_dict[domain_cell_type] number_of_domain_cells = len(domain_cells_values) domain_cells_block = (domain_cell_type, domain_cells_values) domain_cells.append(domain_cells_block) # cell_data if physical_groups_found: if domain_cell_type in cell_data_dict[gmsh_physical_cell_data_key]: domain_in_physical_group = True else: domain_in_physical_group = False else: domain_in_physical_group = False if domain_in_physical_group: if ogs: domain_cell_data_values = cell_data_dict[ gmsh_physical_cell_data_key ][domain_cell_type] # ogs needs MaterialIDs as int32, possibly beginning with zero # (by id_offset) domain_cell_data_values = np.int32( domain_cell_data_values - id_offset ) else: domain_cell_data_values = cell_data_dict[ gmsh_physical_cell_data_key ][domain_cell_type] else: domain_cell_data_values = np.zeros( (number_of_domain_cells), dtype=int ) logging.info( "Some domain cells are not in a physical group, their" " PhysicalID/MaterialID is set to zero." ) domain_cell_data[domain_cell_data_key].append(domain_cell_data_values) if len(domain_cells): domain_mesh = meshio.Mesh( points=all_points, point_data=all_point_data, cells=domain_cells, cell_data=domain_cell_data, ) my_remove_orphaned_nodes(domain_mesh) if len(domain_mesh.points) != number_of_original_points: logging.warning( "There are nodes out of the domain mesh. If ogs option is set," " then no bulk_node_id can be assigned to these nodes." ) meshio.write( Path(output_path, output_basename + "_domain.vtu"), domain_mesh, binary=not ascii, ) logging.info("Domain mesh (written)") print_info(domain_mesh) if ogs: # store domain node numbers for use as bulk_node_id (point_data) original2domain_point_table = ( np.ones(number_of_original_points) * number_of_original_points ) # initialize with non-existing number --> error when bulk_id for # non-domain mesh should be written for domain_point_index, original_point_index in enumerate( domain_mesh.point_data[ogs_domain_point_data_key] ): original2domain_point_table[ original_point_index ] = domain_point_index # prepare data needed for bulk_elem_id (cell_data), also needed without # ogs option to detect boundaries # node connectivity for a mixed mesh (check for OGS compliance), needed # with and without ogs option to identify boundary cells cell_start_index = 0 domain_cells_at_node: list[set[int]] = [ set() for _ in range(number_of_original_points) ] # initialize list of sets # make a list for each type of domain cells for cell_block in domain_cells: block_domain_cells_at_node = find_cells_at_nodes( cell_block[1], number_of_original_points, cell_start_index ) # later used for boundary mesh and submeshes cell_start_index += len( cell_block[1] ) # assume consecutive cell numbering (as it is written to vtu) # add connectivities of current cell type to entries (sets) of total # connectivity (list of sets) for total_list, block_list in zip( domain_cells_at_node, block_domain_cells_at_node ): total_list.update(block_list) else: logging.info("Empty domain mesh, nothing written to file.") ############################################################################ # Extract boundary mesh ############################################################################ # points, process full list (all points), later trimmed according to cell # selection, deep copy needed because removed_orphaned_nodes() operates on # shallow copy of point_data # copy again, in case previous remove_orphaned_nodes() affected all_points all_points = np.copy(points) if ogs: all_point_data = {} # now containing domain node numbers all_point_data[ogs_point_data_key] = np.uint64( original2domain_point_table ) else: all_point_data = { key: value[:] for key, value in point_data.items() } # deep copy boundary_cells = [] boundary_cell_data: dict[str, list] = {} if ogs: boundary_cell_data_key = ogs_boundary_cell_data_key else: boundary_cell_data_key = gmsh_physical_cell_data_key boundary_cell_data[boundary_cell_data_key] = [] for boundary_cell_type in boundary_cell_types: # preliminary, as there may be cells of boundary dimension inside domain # (i.e. which are no boundary cells) boundary_cells_values = cells_dict[boundary_cell_type] connected_cells, connected_cells_count = ( np.asarray(np.uint64(t)) for t in find_connected_domain_cells( boundary_cells_values, domain_cells_at_node ) ) # a boundary cell is connected with exactly one domain cell boundary_index = connected_cells_count == 1 if not boundary_index.all(): logging.info( "For information, there are cells of boundary dimension not on" " the boundary (e.g. inside domain)." ) multi_connection_index = connected_cells_count > 1 logging.info( "Cells of type %s connected to more than one domain cell:", boundary_cell_type, ) logging.info(boundary_cells_values[multi_connection_index]) zero_connection_index = connected_cells_count < 1 logging.info( "Cells of type %s not connected to any domain cell:", boundary_cell_type, ) logging.info(boundary_cells_values[zero_connection_index]) boundary_cells_values = boundary_cells_values[ boundary_index ] # final boundary cells boundary_cells_block = (boundary_cell_type, boundary_cells_values) boundary_cells.append(boundary_cells_block) # cell_data boundary_in_physical_group = physical_groups_found and ( boundary_cell_type in cell_data_dict[gmsh_physical_cell_data_key] ) if ogs: boundary_cell_data_values = connected_cells[boundary_index] else: if boundary_in_physical_group: boundary_cell_data_values = cell_data_dict[ gmsh_physical_cell_data_key ][boundary_cell_type] else: # cells of specific type number_of_boundary_cells = len(boundary_cells_values) boundary_cell_data_values = np.zeros( (number_of_boundary_cells), dtype=int ) logging.info( "Some boundary cells are not in a physical group, their" " PhysicalID is set to zero." ) boundary_cell_data[boundary_cell_data_key].append( boundary_cell_data_values ) boundary_mesh = meshio.Mesh( points=all_points, point_data=all_point_data, cells=boundary_cells, cell_data=boundary_cell_data, ) if len(boundary_cells): my_remove_orphaned_nodes(boundary_mesh) meshio.write( Path(output_path, output_basename + "_boundary.vtu"), boundary_mesh, binary=not ascii, ) logging.info("Boundary mesh (written)") print_info(boundary_mesh) else: logging.info("No boundary elements detected.") ############################################################################ # Now we want to extract subdomains given by physical groups in gmsh # name=user-defined name of physical group, data=[physical_id, geometry_id] ############################################################################ if not physical_groups_found: return 0 for name, data in field_data.items(): subdomain_dim = data[geo_index] # 0 or 1 or 2 or 3 if dim0 <= subdomain_dim and subdomain_dim <= dim3: subdomain_cell_types = existing_cell_types.intersection( available_cell_types[subdomain_dim] ) else: logging.warning("Invalid dimension found in physical groups.") continue all_points = np.copy(points) # point data, make another copy due to possible changes by previous # actions if ogs: all_point_data = {} all_point_data[ogs_point_data_key] = np.uint64( original2domain_point_table ) else: all_point_data = { key: value[:] for key, value in point_data.items() } # deep copy # cells, cell_data subdomain_cells = [] # list subdomain_cell_data: dict[str, list] = {} # dict if ogs: if subdomain_dim == domain_dim: subdomain_cell_data_key = ogs_domain_cell_data_key elif subdomain_dim == boundary_dim: subdomain_cell_data_key = ogs_boundary_cell_data_key else: # use gmsh, as the requirements from OGS subdomain_cell_data_key = gmsh_physical_cell_data_key else: # same for all dimensions subdomain_cell_data_key = gmsh_physical_cell_data_key subdomain_cell_data[subdomain_cell_data_key] = [] # list # flag to indicate invalid bulk_element_ids, then no cell data will be # written subdomain_cell_data_trouble = False for cell_type in subdomain_cell_types: # cells all_false = np.full( cell_data_dict[gmsh_physical_cell_data_key][cell_type].shape, False, ) selection_index = mesh.cell_sets_dict[name].get( cell_type, all_false ) selection_cells_values = cells_dict[cell_type][selection_index] if len(selection_cells_values): # if there are some data selection_cells_block = (cell_type, selection_cells_values) subdomain_cells.append(selection_cells_block) # cell data if ogs: selection_cell_data_values: Union[np.int32, np.uint64] if subdomain_dim == boundary_dim: ( connected_cells, connected_cells_count, ) = find_connected_domain_cells( selection_cells_values, domain_cells_at_node ) # a boundary cell is connected with one domain cell, # needed to write bulk_elem_id boundary_index = connected_cells_count == 1 selection_cell_data_values = np.uint64(connected_cells) if not boundary_index.all(): logging.info( "In physical group %s" " are bulk_elem_ids not uniquely defined," " e.g. for cells of boundary dimension inside" " the domain, and thus not written. If" " bulk_elem_ids should be written for a" " physical group, then make sure all its" " cells of boundary dimension are located at" " the boundary.", name, ) subdomain_cell_data_trouble = True elif subdomain_dim == domain_dim: selection_cell_data_values = np.int32( cell_data_dict[gmsh_physical_cell_data_key][ cell_type ][selection_index] - id_offset ) else: # any cells of lower dimension than boundary selection_cell_data_values = np.int32( cell_data_dict[gmsh_physical_cell_data_key][ cell_type ][selection_index] ) else: selection_cell_data_values = cell_data_dict[ gmsh_physical_cell_data_key ][cell_type][selection_index] subdomain_cell_data[subdomain_cell_data_key].append( selection_cell_data_values ) outputfilename = output_basename + "_physical_group_" + name + ".vtu" if subdomain_cell_data_trouble: submesh = meshio.Mesh( points=all_points, point_data=all_point_data, cells=subdomain_cells, ) # do not write invalid cell_data else: submesh = meshio.Mesh( points=all_points, point_data=all_point_data, cells=subdomain_cells, cell_data=subdomain_cell_data, ) if len(subdomain_cells): my_remove_orphaned_nodes(submesh) outputfilename = ( output_basename + "_physical_group_" + name + ".vtu" ) meshio.write( Path(output_path, outputfilename), submesh, binary=not ascii ) logging.info("Submesh %s (written)", name) print_info(submesh) else: logging.info("Submesh %s empty (not written)", name) return 0 # successfully finished