Source code for mf6adj.pm

import logging
import pathlib as pl
import types
from datetime import datetime
from typing import Any, List, Optional, Union

import h5py
import numpy as np
import pandas as pd
import scipy.sparse as sparse
from scipy.sparse.csgraph import reverse_cuthill_mckee
from scipy.sparse.linalg import (
    LinearOperator,
    bicgstab,
    cg,
    gmres,
    lgmres,
    lsqr,
    spilu,
    spsolve,
    svds,
)

from .utils.utils import SolverCallback
from .utils.utils_fileio import _write_group_to_hdf
from .utils.utils_logger import _LoggerUtil
from .utils.utils_modflow import (
    get_mf6_bound_dict,
)

PathLike = Union[str, pl.Path]


[docs] class PerfMeasRecord: """Single record from a performance-measure block. Parameters ---------- kper : int Zero-based stress period. kstp : int Zero-based time step. inode : int Zero-based node number. pm_type : str Either ``head`` or a boundary package name from the GWF name file. pm_form : str Either ``direct`` or ``residual``. weight : float Weight value. obsval : float Observed counterpart; used only for ``residual`` form. k : int, optional Zero-based layer index for structured-grid reporting. i : int, optional Zero-based row index for structured-grid reporting. j : int, optional Zero-based column index for structured-grid reporting. """ def __init__( self, kper: int, kstp: int, inode: int, pm_type: str, pm_form: str, weight: float, obsval: float, k: Optional[int] = None, i: Optional[int] = None, j: Optional[int] = None, ): """Initialize a performance-measure record. Parameters ---------- kper : int Zero-based stress period. kstp : int Zero-based time step. inode : int Zero-based node number. pm_type : str Performance-measure type. pm_form : str Performance-measure form. weight : float Weight value. obsval : float Observed value used for residual measures. k : int, optional Zero-based layer index for structured-grid reporting. i : int, optional Zero-based row index for structured-grid reporting. j : int, optional Zero-based column index for structured-grid reporting. """ self._kper = int(kper) self._kstp = int(kstp) self.kperkstp = (self._kper, self._kstp) if isinstance(inode, np.int64): inode = int(inode) elif isinstance(inode, np.ndarray): inode = int(inode[0]) self.inode = int(inode) self._k = None if k is not None: self._k = int(k) self._i = None if i is not None: self._i = int(i) self._j = None if j is not None: self._j = int(j) self.weight = float(weight) self.obsval = float(obsval) self.pm_type = pm_type.lower().strip() self.pm_form = pm_form.lower().strip() if self.pm_form not in ["direct", "residual"]: raise Exception( "PerfMeasRecord.pm_form must be 'direct' or 'residual', " + f"not '{self.pm_form}'" ) def __repr__(self) -> str: """Return a compact string representation of the record. Returns ------- str Summary string containing time-step, node, type, and form fields. """ s = ( f"kperkstp:{self.kperkstp}, inode:{self.inode}, " + f"k:{self._k}, type:{self.pm_type}, form:{self.pm_form}" ) if self._i is not None: s += f", i:{self._i}" if self._j is not None: s += f", j:{self._j}" return s
[docs] class PerfMeas: """Performance-measure container and adjoint-solve helper. Parameters ---------- pm_name : str Name of the performance measure. pm_entries : list[PerfMeasRecord] Performance-measure entries. logging_level : str or int, optional Logging level. logger : logging.Logger, optional Logger instance. If omitted, a new logger is created. """ def __init__( self, pm_name: str, pm_entries: List[PerfMeasRecord], logging_level: Union[int, str] = "INFO", logger: Optional[_LoggerUtil] = None, ): """Initialize a performance-measure container. Parameters ---------- pm_name : str Name of the performance measure. pm_entries : list[PerfMeasRecord] Performance-measure entries. logging_level : int or str, optional Logging level to use when creating a new logger. logger : logging.Logger, optional Existing logger utility. If omitted, a new logger is created. """ self._name = pm_name.lower().strip() self._entries = pm_entries if logger is None: logger_name = f"{self.__class__.__name__}-{self.name}" logger_filename = f"{self.name}.log" self.logger = _LoggerUtil( logger_name, logging_level, logger_filename, ) else: self.new_logger = False self.logger = logger @property def name(self) -> str: """Return the performance measure name. Returns ------- str Performance-measure name. """ return str(self._name) def _performance_measure_forward(self, head_dict, sp_package_dict) -> float: """Calculate the forward value of the performance measure. This helper is used during perturbation testing to evaluate the forward-response value associated with the configured entries. Parameters ---------- head_dict : dict Mapping from ``(kper, kstp)`` tuples to simulated head arrays. sp_package_dict : dict Nested mapping of stress-package results produced by :meth:`Mf6Adj.solve_forward_model` when ``pert_save=True``. Returns ------- float Scalar forward value for the performance measure. Direct measures are accumulated linearly and residual measures are accumulated as weighted squared residuals. """ result = 0.0 for pfr in self._entries: if pfr.pm_type == "head": if pfr.pm_form == "direct": result += pfr.weight * head_dict[pfr.kperkstp][pfr.inode] elif pfr.pm_form == "residual": result += ( pfr.weight * (head_dict[pfr.kperkstp][pfr.inode] - pfr.obsval) ) ** 2 else: raise Exception("something is wrong") else: for gwf_ptype, bnd_d in sp_package_dict.items(): for kk, kk_list in bnd_d.items(): if kk == pfr.kperkstp: for kk_d in kk_list: if ( kk_d["node"] == pfr.inode + 1 and kk_d["packagename"] == pfr.pm_type ): if pfr.pm_form == "direct": result += pfr.weight * kk_d["simval"] elif pfr.pm_form == "residual": result += ( pfr.weight * (kk_d["simval"] - pfr.obsval) ) ** 2 return result def _setup_block_jacobi_preconditioner(self, amat, block_size=5000): """Setup a block Jacobi preconditioner Parameters ---------- amat : scipy.sparse.spmatrix Sparse matrix for which to create the block Jacobi preconditioner. block_size : int Size of the blocks for the block Jacobi preconditioner. Returns ------- scipy.sparse.linalg.LinearOperator Linear operator representing the block Jacobi preconditioner. """ n = amat.shape[0] self.logger.logger.debug( f"Setup block Jacobi preconditioner with {block_size:,} block size " + f"for matrix with {n:,} rows" ) # 1. Compute RCM permutation and reorder matrix # This clusters non-zeros to make the diagonal blocks more "meaningful" perm = reverse_cuthill_mckee(amat) inv_perm = np.argsort(perm) # Used to "undo" the permutation after solving amat_rcm = amat[perm, :][:, perm].tocsc() # 2. Define block boundaries block_indices = [] for i in range(0, n, block_size): end = min(i + block_size, n) block_indices.append((i, end)) self.logger.logger.debug( "Number of blocks for block Jacobi preconditioner: " + f"{len(block_indices):,}" ) # 3. Pre-factorize diagonal blocks using SPILU ilu_solvers = [] for start, end in block_indices: block = amat_rcm[start:end, start:end] # spilu requires CSC format and usually benefits # from fill_factor adjustments try: ilu_solvers.append(spilu(block, fill_factor=10.0)) except RuntimeError: # Fallback if a block is singular or problematic self.logger.logger.debug( f"Warning: Block {start}:{end} is singular, using simpler solve." ) ilu_solvers.append(None) # 4. Define M^-1 * v def block_jacobi_solve(v): # Reorder input vector to RCM space v_rcm = v[perm] res_rcm = np.zeros_like(v_rcm) for i, (start, end) in enumerate(block_indices): if ilu_solvers[i] is not None: res_rcm[start:end] = ilu_solvers[i].solve(v_rcm[start:end]) else: # Basic identity fallback for failed blocks res_rcm[start:end] = v_rcm[start:end] # Return to original ordering return res_rcm[inv_perm] return LinearOperator((n, n), matvec=block_jacobi_solve) def _setup_point_jacobi_preconditioner(self, amat): """Setup the point Jacobi preconditioner Parameters ---------- amat : scipy.sparse.spmatrix Sparse matrix for which to create the Jacobi preconditioner. Returns ------- scipy.sparse.linalg.LinearOperator Linear operator representing the Jacobi preconditioner. """ self.logger.logger.debug("Setup Jacobi preconditioner") d = amat.diagonal() d_inv = 1.0 / d def point_jacobi_solve(v): return d_inv * v return LinearOperator( amat.shape, matvec=point_jacobi_solve, ) def _setup_jacobi_preconditioner( self, amat, jacobi_type="point", precon_kwargs=None, ): """Setup a Jacobi preconditioner Parameters ---------- amat : scipy.sparse.spmatrix Sparse matrix for which to create the Jacobi preconditioner. jacobi_type : str, optional Type of Jacobi preconditioner to use. Supported values are "point" and "block". precon_kwargs : dict, optional Keyword arguments passed to the Jacobi preconditioner setup method. Returns ------- scipy.sparse.linalg.LinearOperator Linear operator representing the Jacobi preconditioner. """ if jacobi_type == "point": return self._setup_point_jacobi_preconditioner(amat) elif jacobi_type == "block": block_size = (precon_kwargs or {}).get("block_size") precon_kwargs = ( {"block_size": int(block_size)} if block_size is not None else {} ) return self._setup_block_jacobi_preconditioner( amat, **precon_kwargs, )
[docs] def solve_adjoint( self, hdf5_forward_solution_fname: PathLike, hdf5_adjoint_solution_fname: Optional[PathLike] = None, skip_solve: bool = False, csv_summary: bool = False, linear_solver=None, linear_solver_kwargs: Optional[dict] = None, use_precon: bool = True, jacobi_preconditioner: Optional[str] = None, precon_kwargs: Optional[dict] = None, singular_test: bool = False, tikhonov: float = 0.0, dvclose: Optional[float] = 1e-6, rclose: Optional[float] = 1e-3, dvscale: bool = False, ) -> pd.DataFrame: """Solve for the adjoint state for the performance measure. This method writes an adjoint-solution HDF5 file and, when requested, writes a CSV summary file. Parameters ---------- hdf5_forward_solution_fname : PathLike HDF5 file created by :meth:`Mf6Adj.solve_forward_model` containing the forward-solution information needed for the adjoint solve. hdf5_adjoint_solution_fname : PathLike, optional HDF5 file to write the adjoint solution. If omitted, a default name based on the performance-measure name is used. If the target file already exists, it is removed before writing. skip_solve : bool, optional Skip the adjoint solve for time steps with no performance-measure entries. csv_summary : bool, optional Write a CSV summary of the sensitivity information beside the adjoint HDF5 output file. linear_solver : str or callable, optional Sparse linear solver to use. Supported string values are ``"direct"``, ``"bicgstab"``, ``"cg"``, ``"gmres"``, ``"lgmres"``, and ``"lsqr"``. linear_solver_kwargs : dict, optional Keyword arguments passed to the configured linear solver callable. use_precon : bool, optional Use a preconditioner with the iterative linear solver. jacobi_preconditioner : str, optional Use a Jacobi preconditioner with the iterative linear solver. If ``None``, the ILU preconditioner is used. Supported string values are ``"point"`` and ``"block"``. precon_kwargs : dict, optional Keyword arguments passed to the preconditioner setup. For the default ILU preconditioner, these are passed to ``spilu``. When ``jacobi_preconditioner="block"``, the ``block_size`` key sets the block size. singular_test : bool, optional Test for a singular matrix and apply Tikhonov regularization when needed. tikhonov : float, optional Tikhonov regularization value. dvclose : float, optional Custom convergence criterion based on the maximum absolute change between consecutive iterates. rclose : float, optional Custom convergence criterion based on the maximum absolute residual. dvscale : bool, optional Scale lambda and the right-hand side to improve iterative solver convergence for large lambda values. Returns ------- DataFrame Summary of composite sensitivity information. """ supported_iterative_solvers = ( "bicgstab", "cg", "gmres", "lgmres", "lsqr", ) supported_jacobi_preconditioners = ( "point", "block", ) if jacobi_preconditioner is not None: if jacobi_preconditioner not in supported_jacobi_preconditioners: raise Exception( ( "Invalid 'jacobi_preconditioner' value: " + f"{jacobi_preconditioner!r}. " + "Supported values are None, " + ", ".join(f"'{s}'" for s in supported_jacobi_preconditioners) ) ) else: self.logger.logger.info( f"Using '{jacobi_preconditioner}' Jacobi preconditioner" ) use_precon = True linear_solver_kwargs = ( {} if linear_solver_kwargs is None else dict(linear_solver_kwargs) ) precon_kwargs = {} if precon_kwargs is None else dict(precon_kwargs) adj_start = datetime.now() self.logger.logger.info(f"Starting solve_adjoint at {adj_start}") hdf5_forward_solution_fname = pl.Path(hdf5_forward_solution_fname) try: hdf = h5py.File(hdf5_forward_solution_fname, "r") except Exception as e: raise Exception( ( f"error opening hdf5 file '{hdf5_forward_solution_fname}' " + f"for PerfMeas {self._name}: {e!s}" ) ) if hdf5_adjoint_solution_fname is None: hdf5_adjoint_solution_fname = hdf5_forward_solution_fname.parent / ( f"adjoint_solution_{self._name}_" + f"{hdf5_forward_solution_fname.name}" ) else: hdf5_adjoint_solution_fname = pl.Path(hdf5_adjoint_solution_fname) if hdf5_adjoint_solution_fname.exists(): self.logger.logger.warning( ( "Removing existing adjoint solution " + f"file '{hdf5_adjoint_solution_fname}'" ) ) hdf5_adjoint_solution_fname.unlink() adf = h5py.File(hdf5_adjoint_solution_fname, "w") keys = list(hdf.keys()) gwf_package_dict = dict(hdf["gwf_info"].attrs.items()) sol_keys = [k for k in keys if k.startswith("solution")] sol_keys.sort() if len(sol_keys) == 0: raise Exception("no 'solution' keys found") kperkstp = [ (kper, kstp) for kper, kstp in zip(hdf["aux"]["kper"], hdf["aux"]["kstp"]) ] if len(kperkstp) != len(sol_keys): raise Exception( ( f"number of solution datasets ({len(sol_keys)}) != number " + f"of kper,kstp entries ({len(kperkstp)})" ) ) kk_sol_map = {} for kk in kperkstp: sol = None for s in sol_keys: skper, skstp = hdf[s].attrs["kper"], hdf[s].attrs["kstp"] # print(skper, skstp) if skper == kk[0] and skstp == kk[1]: sol = s break if sol is None: raise Exception(f"no solution dataset found for kper,kstp:{kk!s}") kk_sol_map[kk] = sol nnodes = hdf["gwf_info"]["nnodes"][:] nodeuser = hdf["gwf_info"]["nodeuser"][:] nodereduced = hdf["gwf_info"]["nodereduced"][:] if len(nodeuser) == 1: nodeuser = np.arange(nnodes[0], dtype=int) if len(nodereduced) == 1: nodereduced = None lamb = np.zeros(nnodes) grid_shape = None if "nrow" in hdf["gwf_info"].keys(): grid_shape = ( int(hdf["gwf_info"]["nlay"][0]), int(hdf["gwf_info"]["nrow"][0]), int(hdf["gwf_info"]["ncol"][0]), ) self.logger.logger.info(f"Structured grid found, shape: {grid_shape}") ia = hdf["gwf_info"]["ia"][:] ja = hdf["gwf_info"]["ja"][:] ihc = hdf["gwf_info"]["ihc"][:] jas = hdf["gwf_info"]["jas"][:] cl1 = hdf["gwf_info"]["cl1"][:] cl2 = hdf["gwf_info"]["cl2"][:] hwva = hdf["gwf_info"]["hwva"][:] top = hdf["gwf_info"]["top"][:] bot = hdf["gwf_info"]["bot"][:] icelltype = hdf["gwf_info"]["icelltype"][:] ihighcellsat = hdf["gwf_info"]["ihighcellsat"][0] comp_k33_sens = np.zeros(nnodes) comp_k_sens = np.zeros(nnodes) comp_ss_sens = None has_sto = hdf[sol_keys[0]].attrs["has_sto"] if has_sto: comp_ss_sens = np.zeros(nnodes) has_flux_pm = False for entry in self._entries: if entry.pm_type != "head": has_flux_pm = True break comp_welq_sens = np.zeros(nnodes) comp_rch_sens = np.zeros((nnodes)) bnd_dict = get_mf6_bound_dict() comp_bnd_results = {} for ptype, pnames in gwf_package_dict.items(): if ptype in bnd_dict: for pname in pnames: for idx, aname in bnd_dict[ptype].items(): comp_bnd_results[pname + "_" + aname] = np.zeros(nnodes) for itime, kk in enumerate(kperkstp[::-1]): if skip_solve and not self.__pm_available(kk): continue data = {} kper_start = datetime.now() msg = ( "Starting adjoint solve for PerfMeas: " + f"{self._name} (kper, kstp) ({int(kk[0] + 1)}, {int(kk[1] + 1)})" ) self.logger.logger.info(msg) # if itime == 0: # print(f"starting adjoint solve for PerfMeas: {self._name}") sol_key = kk_sol_map[kk] if sol_key in adf: raise Exception( f"solution key '{sol_key}' already in adjoint hdf5 file" ) start = datetime.now() self.logger.logger.debug("Forming rhs") self.logger.logger.debug("Calculate dfdh") dfdh = self.__dfdh(kk, hdf[sol_key]) self.logger.logger.debug( "Calculating dfdh took: " + f"{(datetime.now() - start).total_seconds()} seconds" ) data["dfdh"] = dfdh iss = hdf[sol_key]["iss"][0] if iss == 0: # transient # get the derv of RHS WRT head drhsdh = hdf[sol_key]["drhsdh"][:] data["drhsdh"] = drhsdh rhs = (drhsdh * lamb) - dfdh else: rhs = -dfdh self.logger.logger.debug( "Formulating rhs took: " + f"{(datetime.now() - start).total_seconds()} seconds" ) start = datetime.now() self.logger.logger.debug("Taking transpose of amat") amat = hdf[sol_key]["amat"][:] head = hdf[sol_key]["head"][:] residual = hdf[sol_key]["residual"][:] amat = sparse.csr_matrix( (amat.copy()[: ja.shape[0]], ja.copy(), ia.copy()), shape=(len(ia) - 1, len(ia) - 1), ) amat = amat.transpose() self.logger.logger.debug( ( "Transpose of amat took: " + f"{(datetime.now() - start).total_seconds()} seconds" ) ) start = datetime.now() self.logger.logger.info("Solving for lambda") is_newton = hdf[sol_key].attrs["is_newton"] self.logger.logger.debug(f"Newton-Raphson: {is_newton}") if linear_solver is None: if head.shape[0] < 50000: linear_solver = "direct" else: linear_solver = "bicgstab" self.logger.logger.debug(f"Linear solver: {linear_solver}") if linear_solver == "direct": _linear_solver = spsolve if not linear_solver_kwargs: _linear_solver_kwargs = {"use_umfpack": True} else: _linear_solver_kwargs = linear_solver_kwargs elif linear_solver in supported_iterative_solvers: if not linear_solver_kwargs: if linear_solver == "lsqr": _linear_solver_kwargs = { "btol": 1e-6, "atol": 1e-6, "iter_lim": 5000, } else: _linear_solver_kwargs = { "rtol": 1e-6, "atol": 1e-6, "maxiter": 200, } else: _linear_solver_kwargs = linear_solver_kwargs if linear_solver == "bicgstab": _linear_solver = bicgstab elif linear_solver == "cg": if is_newton: self.logger.logger.warning( "GWF model is using the Newton-Raphson formulation. " + f"Switching from '{linear_solver}' to 'bicgstab' " + "linear solver." ) linear_solver = "bicgstab" _linear_solver = bicgstab else: _linear_solver = cg elif linear_solver == "lgmres": _linear_solver = lgmres elif linear_solver == "gmres": _linear_solver = gmres elif linear_solver == "lsqr": _linear_solver = lsqr else: raise Exception( "unrecognized iterative 'linear_solver' value: " + f"'{linear_solver}', " + f"should be ({', '.join(supported_iterative_solvers)})." ) else: if not isinstance(linear_solver, types.FunctionType): raise Exception( f"specified linear_solver ('{linear_solver}') must be a " + " function if it not a supported iterative solver type. " + "Supported iterative solver types are " + f"({', '.join(supported_iterative_solvers)})." ) _linear_solver = linear_solver _linear_solver_kwargs = linear_solver_kwargs # test if a singular matrix if singular_test: self.logger.logger.info("Testing if transpose of amat is singular") is_singular = self.__is_singular(amat) if is_singular: self.logger.logger.info("Transpose of amat is singular") if tikhonov <= 0.0: tikhonov = 1e-6 # add tikhonov regularization if tikhonov > 0.0: self.logger.logger.info(f"Adding Tikhonov regularization ({tikhonov})") sign = np.sign(amat.diagonal()) mult = sign[0] I = sparse.eye(amat.shape[0], format=amat.format) amat = amat + mult * tikhonov * I # set up preconditioner m = None if linear_solver not in ("direct", "lsqr"): if use_precon: self.logger.logger.debug("Setup preconditioner") if jacobi_preconditioner is not None: m = self._setup_jacobi_preconditioner( amat, jacobi_preconditioner, precon_kwargs, ) else: if not precon_kwargs: _precon_kwargs = { "drop_tol": 1e-4, "fill_factor": 10, "drop_rule": "basic,area", } else: _precon_kwargs = precon_kwargs try: amat_ilu = spilu(amat, **_precon_kwargs) m = LinearOperator( (head.shape[0], head.shape[0]), amat_ilu.solve, ) except Exception as e: msg = ( "Failed to form preconditioner - " + f"using Jacobi preconditioned {linear_solver} " + "solver and reset maxiter to " + f"{_linear_solver_kwargs['maxiter']}" ) self.logger.logger.info(msg) error_lines = str(e).splitlines() msg = "" for line in error_lines: msg += f" {line}." self.logger.logger.debug(msg) # generate a simple Jacobi preconditioner # and increase maxiter if "maxiter" in _linear_solver_kwargs: _linear_solver_kwargs["maxiter"] += 10000 else: _linear_solver_kwargs["maxiter"] = 10000 m = self._setup_jacobi_preconditioner( amat, jacobi_type="block", precon_kwargs=_precon_kwargs, ) _linear_solver_kwargs["M"] = m if linear_solver != "direct" and ( dvclose is not None or rclose is not None ): _linear_solver_kwargs["atol"] = 0.0 _linear_solver_kwargs["rtol"] = 0.0 self.logger.logger.info( f"Solving with {linear_solver} " + f"with solver options: {_linear_solver_kwargs}" ) # solve the system of equations if linear_solver == "direct": lamb = _linear_solver(amat, rhs, **_linear_solver_kwargs) else: if dvscale: idx_max = np.abs(lamb).argmax() scale = float(np.abs(lamb[idx_max])) if scale > 1e-30: self.logger.logger.debug(f"Scaling lambda and rhs ({scale})") lamb /= scale rhs /= scale try: solver_cb = SolverCallback( logger=self.logger, A=amat, b=rhs, dvclose=dvclose, rclose=rclose, ) if linear_solver == "gmres": lamb, info = _linear_solver( amat, rhs, x0=lamb, callback=solver_cb, callback_type="x", **_linear_solver_kwargs, ) elif linear_solver == "lsqr": lamb, info = _linear_solver( amat, rhs, damp=tikhonov, x0=lamb, **_linear_solver_kwargs, ) else: lamb, info = _linear_solver( amat, rhs, x0=lamb, callback=solver_cb, **_linear_solver_kwargs, ) except StopIteration as e: self.logger.logger.info(e) lamb, info = solver_cb.xold, 0 if linear_solver in supported_iterative_solvers: # info = lamb[1] # lamb = lamb[0] if dvscale: if scale > 1e-30: self.logger.logger.debug(f"Unscaling lambda and rhs ({scale})") lamb *= scale rhs *= scale residual = rhs - amat @ lamb residual_2norm = np.linalg.norm(residual) residual_infnorm = np.linalg.norm(residual, ord=np.inf) self.logger.logger.info( ( f"Solver return code: {info} " + f"iterations: {solver_cb.niter} " + f"solver norms: L2 ({residual_2norm}) " + f"infinity ({residual_infnorm})" ) ) if info < 0: self.logger.logger.error("Solver parameter breakdown") elif info > 0: self.logger.logger.warning("Solver convergence not achieved") if np.any(np.isnan(lamb)): self.logger.logger.warning( ( f"Adjoint states for pm {self.name} contain nans " + f"at (kper,kstp) ({int(kk[0] + 1)}, {int(kk[1] + 1)})" ) ) self.logger.logger.info( ( "Solving for lambda took: " + f"{(datetime.now() - start).total_seconds()} seconds" ) ) is_newton = hdf[sol_key].attrs["is_newton"] # zero out the adj state for chd nodes chd_nodelist = [] if "chd6" in gwf_package_dict: for pname in gwf_package_dict["chd6"]: nodelist = list(hdf[sol_key][pname]["nodelist"][:] - 1) chd_nodelist.extend(nodelist) chd_nodelist = np.array(chd_nodelist, dtype=int) lamb[chd_nodelist] = 0.0 start = datetime.now() self.logger.logger.debug("Formulating lam_dresdk_h") k_sens, k33_sens = self.__lam_dresdk_h( is_newton, ihighcellsat, lamb, hdf[sol_key]["sat"][:], head, ihc, ia, ja, jas, cl1, cl2, hwva, top, bot, icelltype, hdf[sol_key]["k11"][:], hdf[sol_key]["k33"][:], ) data["k11"] = k_sens data["k33"] = k33_sens comp_k_sens += k_sens comp_k33_sens += k33_sens self.logger.logger.debug( ( "Formulating lam_dresdk_h took: " + f"{(datetime.now() - start).total_seconds()} seconds" ) ) if has_sto: start = datetime.now() self.logger.logger.debug("Formulating storage") if iss == 0: ss_sens = lamb * hdf[sol_key]["dresdss_h"][:] else: ss_sens = np.zeros_like(lamb) data["ss"] = ss_sens comp_ss_sens += ss_sens self.logger.logger.debug( ( "Formulating storage took: " + f"{(datetime.now() - start).total_seconds()} seconds" ) ) data["wel6_q"] = lamb comp_welq_sens += lamb comp_rch_sens += lamb data["rch6_recharge"] = lamb for ptype, pnames in gwf_package_dict.items(): if ptype == "chd6": continue if ptype in bnd_dict: for pname in pnames: if pname not in hdf[sol_key]: continue start = datetime.now() self.logger.logger.debug(f"Formulating {ptype}, {pname}") sp_bnd_dict = { "bound": hdf[sol_key][pname]["bound"][:], "node": hdf[sol_key][pname]["nodelist"][:], } sens_level, sens_cond = self.__lam_drhs_dbnd( lamb, head, sp_bnd_dict, has_flux_pm ) comp_bnd_results[pname + "_" + bnd_dict[ptype][0]] += sens_level data[pname + "_" + bnd_dict[ptype][0]] = sens_level if len(bnd_dict[ptype]) > 1: comp_bnd_results[pname + "_" + bnd_dict[ptype][1]] += ( sens_cond ) data[pname + "_" + bnd_dict[ptype][1]] = sens_cond self.logger.logger.debug( ( f"Formulating {ptype}, {pname} took: " + f"{(datetime.now() - start).total_seconds()} " + "seconds" ) ) data["lambda"] = lamb data["head"] = hdf[sol_key]["head"][:] msg = ( "Adjoint solve took: " + f"{(datetime.now() - kper_start).total_seconds()!s} " + f"seconds to solve adjoint solution for PerfMeas: {self._name} " + f"(kper, kstp) ({int(kk[0] + 1)}, {int(kk[1] + 1)})" ) self.logger.logger.info(msg) if self.logger.isDebugLogger: self.logger.logger.debug("Adding amat, rhs, and residual to hdf file") data["amat"] = amat data["rhs"] = rhs data["residual"] = residual self.logger.logger.info("Write group to hdf file") _write_group_to_hdf( adf, sol_key, data, nodeuser=nodeuser, grid_shape=grid_shape, nodereduced=nodereduced, logger=self.logger.logger, ) self.logger.logger.info("Formulate composite sensitivities") data = {} data["k11"] = comp_k_sens data["k33"] = comp_k33_sens if has_sto: data["ss"] = comp_ss_sens data["wel6_q"] = comp_welq_sens data["rch6_recharge"] = comp_rch_sens for name, vals in comp_bnd_results.items(): data[name] = vals self.logger.logger.info("Writing composite sensitivities") _write_group_to_hdf( adf, "composite", data, nodeuser=nodeuser, grid_shape=grid_shape, nodereduced=nodereduced, logger=self.logger.logger, ) adf.close() hdf.close() df = pd.DataFrame( { "k11": comp_k_sens, "k33": comp_k33_sens, "wel6_q": comp_welq_sens, "rch6_recharge": comp_rch_sens, }, index=nodeuser + 1, ) for name, vals in comp_bnd_results.items(): df[name] = vals if has_sto: df["ss"] = comp_ss_sens df.index.name = "node" if csv_summary: csv_path = hdf5_adjoint_solution_fname.with_suffix(".csv") df.to_csv(csv_path) kper, kstp = kk dtsec = (datetime.now() - adj_start).total_seconds() line = ( "Adjoint solve took: " + f"{dtsec:10.5g} seconds " + f"for pm '{self._name}' at (kper,kstp) ({int(kper + 1)}, {int(kstp + 1)})" ) self.logger.logger.info(line) return df
def __dconddhk(self, k1, k2, cl1, cl2, width, height1, height2) -> float: """Return the derivative of intercell conductance with respect to ``K``. The expression follows the MODFLOW-style conductance formulation for a two-cell connection and is used when building hydraulic-conductivity sensitivities. Parameters ---------- k1 : float Hydraulic conductivity for connection 1. k2 : float Hydraulic conductivity for connection 2. cl1 : float Length of connection 1. cl2 : float Length of connection 2. width : float Connection width. height1 : float Saturated height of connection 1. height2 : float Saturated height of connection 2. Returns ------- float Derivative of connection conductance with respect to hydraulic conductivity. """ # todo: upstream weighting - could use height1 and height2 to check... # todo: vertically staggered d = (width * cl1 * height1 * (height2**2) * (k2**2)) / ( ((cl2 * height1 * k1) + (cl1 * height2 * k2)) ** 2 ) return d def __smooth_sat_simple(self, sat) -> float: """Smooth saturation using the MODFLOW 6 sigmoid-style function. Parameters ---------- sat : float Saturation value. Returns ------- float Smoothed saturation bounded between 0 and 1. """ satomega = 1.0e-6 A_omega = 1 / (1 - satomega) s_sat = 1.0 if sat < 0: s_sat = 0 elif sat >= 0 and sat < satomega: s_sat = (A_omega / (2 * satomega)) * sat**2 elif sat >= satomega and sat < 1 - satomega: s_sat = A_omega * sat + 0.5 * (1 - A_omega) elif sat >= 1 - satomega and sat < 1: s_sat = 1 - (A_omega / (2 * satomega)) * ((1 - sat) ** 2) return s_sat def __d_smooth_sat_dh_simple(self, sat, top, bot) -> float: """Return the derivative of smoothed saturation with respect to head. Parameters ---------- sat : float Saturation. top : float Cell top elevation. bot : float Cell bottom elevation. Returns ------- float Derivative of the smoothed saturation curve with respect to head. """ satomega = 1.0e-6 A_omega = 1 / (1 - satomega) d_s_sat_dh = 0.0 if sat >= 0 and sat < satomega: d_s_sat_dh = (A_omega / satomega) * sat / (top - bot) elif sat >= satomega and sat < 1 - satomega: d_s_sat_dh = A_omega / (top - bot) elif sat >= 1 - satomega and sat < 1: d_s_sat_dh = (A_omega / satomega) * (1 - sat) / (top - bot) return d_s_sat_dh def __cell_sat(self, top, bot, h) -> float: """Return cell saturation from cell top, bottom, and head. Parameters ---------- top : float Cell top elevation. bot : float Cell bottom elevation. h : float Cell head. Returns ------- float Cell saturation clipped to the interval ``[0, 1]``. """ if h > top: sat = 1.0 elif h < bot: sat = 0.0 else: sat = (h - bot) / (top - bot) return sat def __smooth_sat(self, ihighcellsat, top1, top2, bot1, bot2, h1, h2) -> float: """Return smoothed saturation for the upstream cell. Parameters ---------- ihighcellsat : int Whether to use the highest cell bottom when calculating saturation. top1 : float Top elevation of node 1. top2 : float Top elevation of node 2. bot1 : float Bottom elevation of node 1. bot2 : float Bottom elevation of node 2. h1 : float Head of node 1. h2 : float Head of node 2. Returns ------- float Smoothed saturation of the upstream node selected from the connection pair. """ bot = None if ihighcellsat != 0: if (abs(bot1) - abs(bot2)) > 1e-2: bot = max(bot1, bot2) if h1 > h2: if bot is None: sat = self.__cell_sat(top1, bot1, h1) else: sat = self.__cell_sat(top1, bot, h1) else: if bot is None: sat = self.__cell_sat(top2, bot2, h2) else: sat = self.__cell_sat(top2, bot, h2) return self.__smooth_sat_simple(sat) def __d_smooth_sat_dh(self, sat, h1, h2, top, bot) -> float: """Return the derivative of smoothed saturation with respect to upstream head. Parameters ---------- sat : float Saturation. h1 : float Head of node 1. h2 : float Head of node 2. top : float Upstream cell top elevation. bot : float Upstream cell bottom elevation. Returns ------- float Derivative of smoothed saturation with respect to the upstream head. Returns zero when ``h1`` is not the upstream head. """ value = 0.0 if h1 >= h2: value = self.__d_smooth_sat_dh_simple(sat, top, bot) return value def __lam_dresdk_h( self, is_newton, ihighcellsat, lamb, sat, head, ihc, ia, ja, jas, cl1, cl2, hwva, top, bot, icelltype, k11, k33, ) -> tuple[np.ndarray, np.ndarray]: """Return adjoint-weighted residual derivatives. Derivatives are with respect to ``k11`` and ``k33``. Parameters ---------- is_newton : bool Whether Newton terms are active. ihighcellsat : int Whether to use the highest cell bottom when calculating saturation. lamb : ndarray Adjoint state array. sat : ndarray Saturation array. head : ndarray Head array. ihc : ndarray Horizontal connection indicator array. ia : ndarray Connection index array in compressed sparse row format. ja : ndarray Connection array in compressed sparse row format. jas : ndarray Full connectivity array. cl1 : ndarray Connection length array for connection 1. cl2 : ndarray Connection length array for connection 2. hwva : ndarray Horizontal-width/vertical-area array. top : ndarray Top elevations. bot : ndarray Bottom elevations. icelltype : ndarray Convertible-cell type indicator array. k11 : ndarray Horizontal hydraulic conductivity array. k33 : ndarray Vertical hydraulic conductivity array. Returns ------- tuple[ndarray, ndarray] Adjoint-weighted residual derivatives with respect to ``k11`` and ``k33``. """ iac = np.array([ia[i + 1] - ia[i] for i in range(len(ia) - 1)]) # array of number of connections per node (size nodes) sat_mod = sat.copy() sat_mod[icelltype == 0] = 1.0 height = top - bot result33 = np.zeros_like(head) result = np.zeros_like(head) for node, (offset, ncon) in enumerate(zip(ia, iac)): sum1 = 0.0 sum2 = 0.0 height1 = height[node] for ii in range(offset + 1, offset + ncon): mnode = ja[ii] height2 = height[mnode] jj = jas[ii] if jj < 0: raise Exception() iihc = ihc[jj] if iihc == 0: # vertical con dconddk33 = self.__dconddhk( k33[node], k33[mnode], 0.5 * height1, 0.5 * height2, hwva[jj], 1.0, 1.0, ) t2 = ( dconddk33 * (head[mnode] - head[node]) * (lamb[node] - lamb[mnode]) ) sum1 += t2 else: # TODO: check if one cell is convertible (??is this required??) if is_newton: dconddk = self.__dconddhk( k11[node], k11[mnode], cl1[jj], cl2[jj], hwva[jj], height1, height2, ) SF = self.__smooth_sat( ihighcellsat, top[node], top[mnode], bot[node], bot[mnode], head[node], head[mnode], ) else: dconddk = self.__dconddhk( k11[node], k11[mnode], cl1[jj], cl2[jj], hwva[jj], height1 * sat_mod[node], height2 * sat_mod[mnode], ) SF = 1.0 t1 = ( SF * dconddk * (head[mnode] - head[node]) * (lamb[node] - lamb[mnode]) ) sum2 += t1 result33[node] = sum1 result[node] = sum2 return result, result33 def __lam_drhs_dbnd( self, lamb: np.ndarray, head: np.ndarray, sp_dict: dict, has_flux_pm: bool, ) -> tuple[np.ndarray, np.ndarray]: """Return adjoint-weighted derivatives with respect to boundary terms. The ``sp_dict["bound"]`` array is interpreted positionally with ``bound[0]`` as boundary head and optional ``bound[1]`` as boundary conductance. Parameters ---------- lamb : ndarray Adjoint state array. head : ndarray Head array. sp_dict : dict Stress-package data for a single time step containing at least ``node`` and ``bound`` arrays. has_flux_pm : bool Whether the performance measure has a direct flux contribution in addition to the adjoint-weighted response. Returns ------- tuple[ndarray, ndarray] Two arrays containing derivatives with respect to the head-like boundary term and the conductance-like boundary term, respectively. """ result_head = np.zeros_like(lamb) result_cond = np.zeros_like(lamb) # for id in sp_dict: for node, bound in zip(sp_dict["node"], sp_dict["bound"]): n = node - 1 boundcond = 1e10 if len(bound) > 1: boundcond = bound[1] # the second item in bound should be cond result_head[n] = lamb[n] * boundcond # Add the direct effect if has_flux_pm: result_head[n] += boundcond # the first item in bound should be head lam_drhs_dcond = lamb[n] * bound[0] lam_dadcond_h = -1.0 * lamb[n] * head[n] result_cond[n] = lam_drhs_dcond + lam_dadcond_h # Add the direct effect if has_flux_pm: result_cond[n] += bound[0] - head[n] return result_head, result_cond def __pm_available(self, kk) -> bool: """Return whether a performance measure entry exists for a given time. Parameters ---------- kk : tuple Zero-based ``(kper, kstp)`` tuple. Returns ------- bool True if a performance measure entry exists, otherwise False. """ relevant_entries = [p for p in self._entries if p.kperkstp == kk] return len(relevant_entries) > 0 def __dfdh(self, kk, sol_dataset) -> np.ndarray: """Return the derivative of the performance measure with respect to head. Parameters ---------- kk : tuple Zero-based stress period and time step. sol_dataset : h5py.Group Forward-solution HDF5 group for one time step. The group must contain a ``head`` dataset and, for flux performance measures, package subgroups with ``nodelist`` and ``hcof`` datasets. Returns ------- ndarray Derivative of the performance measure with respect to head at the requested time step. """ # 1. Load data once. Avoid [:] if sol_dataset is already in memory, # but for HDF5, reading once into a local variable is best. head = sol_dataset["head"][:] dfdh = np.zeros_like(head) # 2. Cache 'hcof' and 'nodelist' to avoid repeated disk I/O # Also pre-build a map for O(1) index lookups cached_maps = {} # 3. Filter entries first to reduce iterations relevant_entries = [p for p in self._entries if p.kperkstp == kk] for pfr in relevant_entries: if pfr.pm_type == "head": if pfr.pm_form == "direct": dfdh[pfr.inode] = pfr.weight elif pfr.pm_form == "residual": # Scalar math is fine here, but make sure head is a numpy array dfdh[pfr.inode] = 2.0 * pfr.weight * (head[pfr.inode] - pfr.obsval) else: # Handle non-head types with caching if pfr.pm_type not in cached_maps: # Store hcof and a mapping of inode -> index hcof = sol_dataset[pfr.pm_type]["hcof"][:] nodes = sol_dataset[pfr.pm_type]["nodelist"][:] - 1 # Dict comprehension is much faster than np.where inside a loop node_to_idx = {node: i for i, node in enumerate(nodes)} cached_maps[pfr.pm_type] = (hcof, node_to_idx) hcof_arr, node_map = cached_maps[pfr.pm_type] # Fast O(1) lookup if pfr.inode in node_map: idx = node_map[pfr.inode] dfdh[pfr.inode] = hcof_arr[idx] return dfdh def __get_value_from_gwf(self, gwf_name, pak_name, prop_name, gwf) -> Any: """Return a copy of a quantity from the MODFLOW 6 API. Parameters ---------- gwf_name : str Name of the groundwater-flow instance. pak_name : str Package name from the GWF name file. prop_name : str Property name in ``pak_name``. gwf : modflowapi.ModflowApi MODFLOW 6 groundwater-flow API instance. Returns ------- Any Requested quantity copied from the MODFLOW 6 API. """ addr = gwf.get_var_address(prop_name, gwf_name, pak_name) return gwf.get_value(addr) def __is_singular(self, A, tol=1e-10) -> bool: """Return whether a matrix is singular or numerically ill-conditioned. Parameters ---------- A : scipy.sparse.spmatrix or LinearOperator Matrix to evaluate. tol : float, optional Absolute tolerance used to test the smallest singular value. Returns ------- bool True if the matrix is singular or effectively singular. """ # svds computes the k largest or smallest singular values. # To check for singularity, we need the smallest ones. # 'SM' specifies "smallest magnitude". # We request only one singular value (k=1) for efficiency. try: # Note: svds works on square or rectangular matrices # We need to ensure k is less than min(M, N) for svds to work min_dim = min(A.shape) if min_dim == 0: return True # Empty matrix is singular k = max(1, min(5, min_dim - 1)) # Get a few smallest, ensure k >= 1 # Use 'SM' to find smallest magnitude singular values # svds returns U, s, Vh _, s, _ = svds(A, k=k, which="SM") # Check if the smallest singular value is close to zero smallest_s_value = np.min(s) return np.isclose(smallest_s_value, 0.0, atol=tol) except RuntimeError: # svds might raise a RuntimeError for very difficult # cases (e.g., convergence issues) return True # Assume singular or ill-conditioned if solver fails