Source code for ogstools.materiallib.core.material

# SPDX-FileCopyrightText: Copyright (c) OpenGeoSys Community (opengeosys.org)
# SPDX-License-Identifier: BSD-3-Clause

from __future__ import annotations

import copy
import logging
import re
import warnings
from collections.abc import Iterator, Mapping
from pathlib import Path
from typing import Any

import yaml

from ogstools._internal import deprecated
from ogstools.materiallib.schema.required_properties import (
    required_property_names,
)

from .property import MaterialProperty

logger = logging.getLogger(__name__)


[docs] class Material(Mapping[str, MaterialProperty]): """ Represents a single material. - Can be constructed directly from YAML raw data. - Provides access to all properties. - Supports filtering by process schemas or property names. """ __hash__ = None # type: ignore[assignment] # Mutable with __eq__
[docs] def __init__(self, name: str, raw_data: dict[str, Any]): self.name = name self.raw = raw_data # full YAML (e.g. for debugging or export) self.properties: list[MaterialProperty] = [] self._parse_properties()
[docs] @classmethod def from_file(cls, file_path: str | Path) -> Material | None: """Create a Material from a YAML file or return None if invalid.""" with Path(file_path).open(encoding="utf-8") as file: raw_data = yaml.safe_load(file) if not isinstance(raw_data, dict): logger.debug("Skipping invalid YAML file: %s", file_path) return None name = raw_data.get("name") if not isinstance(name, str): logger.debug( "Skipping YAML file without valid 'name': %s", file_path ) return None return cls(name=name, raw_data=raw_data)
[docs] def to_file(self, file_path: str | Path) -> None: """Write this Material to a YAML file.""" output_data = dict(self.raw) output_data["name"] = self.name with Path(file_path).open("w", encoding="utf-8") as file: yaml.safe_dump(output_data, file, sort_keys=False)
def _parse_properties(self) -> None: block = self.raw.get("properties", {}) if not block: logger.debug("Material %s has no properties", self.name) for prop_name, entries in block.items(): for entry in entries if isinstance(entries, list) else [entries]: type_ = entry.get( "type", "Constant" ) # TODO: Error if 'type' not found value = entry.get("value", None) extra = { k: v for k, v in entry.items() if k not in ("type", "value") } self.properties.append( MaterialProperty( name=prop_name, type_=type_, value=value, **extra ) )
[docs] def __getitem__(self, key: str) -> MaterialProperty: for p in self.properties: if p.name == key: return p msg = ( f"No property with name {key} found. Available properties are: " + ", ".join(self) ) raise KeyError(msg)
def __iter__(self) -> Iterator[str]: return iter(dict.fromkeys(p.name for p in self.properties)) def __len__(self) -> int: return len(self.properties) def __bool__(self) -> bool: return bool(self.name) @staticmethod def _raw_from_properties( name: str, properties: list[MaterialProperty] ) -> dict: "raw yaml data dict with lists if multiple entries have same names" raw_block: dict[str, list[dict[str, Any]]] = {} for p in properties: entry = {"type": p.type, "value": p.value, **p.extra} raw_block.setdefault(p.name, []).append(entry) return {"name": name, "properties": raw_block} @property def property_names(self) -> list[str]: """Returns a list of all property names of this material.""" return list(self)
[docs] @deprecated(""": use mat[key] instead.""") def get_property(self, key: str) -> MaterialProperty: warnings.warn( "get_property() is deprecated, use mat[key] instead.", DeprecationWarning, stacklevel=2, ) return self[key]
[docs] def filter_process(self, process_schema: dict[str, Any]) -> None: """ Filter self, to only contain properties required by a given process. """ allowed = required_property_names(process_schema) self.filter_properties(allowed)
[docs] def filter_properties( self, allowed: set[str] | str, key: str = "name" ) -> None: """ Filter self, to only contain properties in 'allowed', preserving all extra fields (e.g. scope, unit). :param allowed: values to filter for :param key: attribute to filter for (e.g. 'name' or 'type') """ if isinstance(allowed, str): allowed = {allowed} filtered_props = [p for p in self.properties if p.get(key) in allowed] logger.debug( "Material %s: filtered %d/%d properties (%s)", self.name, len(filtered_props), len(self.properties), ", ".join(p.name for p in filtered_props), ) self.properties = filtered_props self.raw = Material._raw_from_properties(self.name, filtered_props)
@property def duplicates(self) -> list[MaterialProperty]: "Returns all material properties with multiple definitions." prop_names = [p.name for p in self.properties] dupe_names = [x for x in self.property_names if prop_names.count(x) > 1] return [p for p in self.properties if p.name in dupe_names] def _filter( self, selection: dict[str, dict[str, str | re.Pattern]] ) -> None: "Reduce properties by the given selection." if len(selection) == 0: return def matching(value: str | re.Pattern, text: str) -> bool: if isinstance(value, re.Pattern): return re.search(value, text) is not None return text == value pick: list[MaterialProperty] = [] for name, restrictions in selection.items(): filtered = [ p for p in self.properties if matching(name, p.name) and all(matching(v, p.get(k)) for k, v in restrictions.items()) ] pick += filtered others = [ p for p in self.properties if not all(matching(name, p.name) for name in selection) ] self.properties = sorted(others + pick, key=lambda p: p.name) self.raw = Material._raw_from_properties(self.name, self.properties) def __eq__(self, other: object) -> bool: if not isinstance(other, Material): return NotImplemented def sort_key(d: dict) -> list: return sorted((k, str(v)) for k, v in d.items()) return self.name == other.name and sorted( [p.to_dict() for p in self.properties], key=sort_key ) == sorted([p.to_dict() for p in other.properties], key=sort_key)
[docs] def copy( self, selection: dict[str, dict[str, str | re.Pattern]] | None = None ) -> Material: """Return a deep copy, optionally with a filtered selection. :param selection: Maps restrictions to different properties. They will be only present in the resulting copy, if the properties named in selection adhere to the given constraint. The values can be regular expressions. Shape: `{"propertynames": {"attributes": "values"}}` Example: `{"saturation": {"type": re.compile("SaturationVan.*")}, "density": {"type": "Constant", "source": re.compile(".*2018.*")}}` """ new_mat = copy.deepcopy(self) if selection is not None: new_mat._filter(selection) return new_mat
def __repr__(self) -> str: return ( f"<Material '{self.name}' with {len(self.properties)} properties>" ) def __str__(self) -> str: lines = [repr(self)] for p in self.properties: lines.append(f" {p}") return "\n".join(lines)