Source code for mf6adj.adj

import logging
import pathlib as pl
import shutil
import uuid
from collections.abc import Callable
from datetime import datetime
from typing import Optional, Union

import flopy
import h5py
import modflowapi
import numpy as np
import pandas as pd

from .pm import PerfMeas, PerfMeasRecord
from .utils.utils import _utils_cd
from .utils.utils_fileio import _write_group_to_hdf
from .utils.utils_logger import _LoggerUtil
from .utils.utils_modflow import (
    get_lrc,
    get_mf6_bound_dict,
    get_model_names_from_mfsim,
    get_package_names_from_gwfname,
    get_ptr_from_gwf,
    has_sto_iconvert,
)
from .utils.utils_pm_read import (
    get_adj_parse_context,
    read_adj_file,
)

DT_FMT = "%Y-%m-%d %H:%M:%S"
PathLike = Union[str, pl.Path]


[docs] class Mf6Adj: """MODFLOW 6 adjoint solver interface. Parameters ---------- adj_filename : str Adjoint input filename. lib_name : str MODFLOW 6 shared library path. logging_level : str or int, optional Logging level (``DEBUG``, ``INFO``, ``WARNING``, ``ERROR``, or ``CRITICAL``). logging_filename : str or pl.Path, optional Optional log filename. If omitted, logging is limited to the console. working_directory : str or pl.Path, optional Working directory. If omitted, the current directory is used. """ def __init__( self, adj_filename: str, lib_name: str, logging_level: Union[int, str] = "INFO", logging_filename: Optional[PathLike] = None, working_directory: Optional[PathLike] = None, ): """Initialize the MODFLOW 6 adjoint helper.""" if working_directory is None: working_directory = pl.Path(".").resolve() self.working_directory = pl.Path(working_directory).resolve() with _utils_cd(self.working_directory): adj_filename = pl.Path(adj_filename) if not adj_filename.is_file(): raise Exception(f"adj_filename '{adj_filename}' not found") self.adj_filename = adj_filename # setup logger logger_name = f"{self.__class__.__name__}-{adj_filename.stem}" self.logger = _LoggerUtil( logger_name, logging_level, logging_filename, ) # process the flow model # make sure the lib exists if not pl.Path(lib_name).exists(): self.logger.logger.warning( f"MODFLOW 6 library file '{lib_name}' not found...continuing..." ) # find the model name self._gwf_model_dict, namfile_dict = get_model_names_from_mfsim(".") if len(self._gwf_model_dict) != 1: raise Exception("only one model is currently supported") self._gwf_name = next(iter(self._gwf_model_dict.keys())) self._gwf_namfile = namfile_dict[self._gwf_name] self._gwf_package_dict = get_package_names_from_gwfname(self._gwf_namfile) if self._gwf_model_dict[self._gwf_name] != "gwf6": raise Exception( f"model is not a gwf6 type: {self._gwf_model_dict[self._gwf_name]}" ) if "dis6" in self._gwf_package_dict: self.logger.logger.info("Structured grid found") is_structured = True unstructured_type = None elif "disv6" in self._gwf_package_dict: self.logger.logger.info("Unstructured disv grid found") is_structured = False unstructured_type = "disv" elif "disu6" in self._gwf_package_dict: self.logger.logger.info("Unstructured disu grid found") is_structured = False unstructured_type = "disu" else: raise Exception("gwf6 model discretization is not dis, disu, or disv.") self._gwf = None self._lib_name = lib_name self._flow_dir = "." self._gwf = self._initialize_gwf(lib_name, self._flow_dir) self._gwf_version = self._get_gwf_version() self._hdf5_name = None self._structured_mg = None self.is_structured = is_structured self.unstructured_type = unstructured_type self._shape = None if self.is_structured: nlay = self._gwf.get_value( self._gwf.get_var_address("NLAY", self._gwf_name.upper(), "DIS") )[0] nrow = self._gwf.get_value( self._gwf.get_var_address("NROW", self._gwf_name.upper(), "DIS") )[0] ncol = self._gwf.get_value( self._gwf.get_var_address("NCOL", self._gwf_name.upper(), "DIS") )[0] self._structured_mg = flopy.discretization.StructuredGrid( nrow=nrow, ncol=ncol, nlay=nlay ) self._shape = (nlay, nrow, ncol) self._performance_measures = [] self._read_adj_file() self._gwf_package_types = [ "chd6", "wel6", "ghb6", "riv6", "drn6", "sfr6", "rch6", "recha6", "evt6", ] self._gwf_boundary_attr_dict = { "chd6": ["head"], "ghb6": ["bhead", "cond"], "riv6": ["stage", "cond"], "drn6": ["elev", "cond"], "wel6": ["q"], "rch6": ["recharge"], } def _add_performance_measure( self, pm_name: str, pm_entries: list[PerfMeasRecord] ) -> None: """Validate and register a parsed performance measure. Parameters ---------- pm_name : str Performance-measure name. pm_entries : list[PerfMeasRecord] Entries that make up the performance measure. Notes ----- A performance measure must contain at least one entry, cannot mix ``direct`` and ``residual`` forms, and cannot mix ``head`` entries with flux-package entries. """ if len(pm_entries) == 0: raise Exception(f"no entries found for PM {pm_name}") pm_types = {entry.pm_type for entry in pm_entries} pm_forms = {entry.pm_form for entry in pm_entries} if len(pm_forms) > 1: raise Exception( "performance measure" + f"{pm_name} has mixed 'pm_forms' ({pm_forms}), " + "this is not supported" ) if next(iter(pm_types)) != "head" and next(iter(pm_forms)) != "direct": raise Exception( "performance measure" + pm_name + " has a flux 'pm_form' and is a " + "residual 'pm_type', this is not supported" ) if pm_name in [pm._name for pm in self._performance_measures]: raise Exception(f"PM {pm_name} multiply defined") self._performance_measures.append( PerfMeas( pm_name, pm_entries, self.logger.level, self.logger, ) ) def _read_adj_file(self) -> None: """Load performance-measure definitions from the adjoint input file. This method resets the in-memory performance-measure list, gathers the MODFLOW 6 parse context, and delegates the file parsing to ``utils_pm_read.read_adj_file``. Returns ------- None Parsed performance measures are stored on ``self._performance_measures``. """ # clear any existing PMs self._performance_measures = [] self.logger.logger.info(f"Processing adjoint file: {self.adj_filename}") nuser, nstp, nper, ncpl = get_adj_parse_context( gwf=self._gwf, gwf_name=self._gwf_name, is_structured=self.is_structured, unstructured_type=self.unstructured_type, ) self._hdf5_name = read_adj_file( adj_filename=self.adj_filename, nuser=nuser, nstp=nstp, nper=nper, ncpl=ncpl, is_structured=self.is_structured, unstructured_type=self.unstructured_type, shape=self._shape, gwf_package_dict=self._gwf_package_dict, record_factory=PerfMeasRecord, add_performance_measure=self._add_performance_measure, current_hdf5_name=self._hdf5_name, logger=self.logger.logger, ) def _open_hdf(self, tag: Optional[PathLike]) -> h5py.File: """Open an HDF5 file for writing. Parameters ---------- tag : str or PathLike, optional Filename or filename prefix. Returns ------- h5py.File Open HDF5 file handle. The resolved filename is also stored on ``self._hdf5_name``. """ if tag is None: fname = ( self._gwf_name + "_" + datetime.now().strftime("%Y-%m-%d_%H-%M-%S") + ".hd5" ) else: fname = tag fname = pl.Path(fname) self._hdf5_name = fname if fname.exists(): fname.unlink() f = h5py.File(fname, "w") return f def _add_gwf_info_to_hdf(self, hdf: h5py.File) -> None: """Add model structure and metadata to an HDF5 file. Parameters ---------- hdf : h5py.File Open HDF5 file handle. Notes ----- The written group includes grid geometry, connectivity arrays, hydraulic-property arrays, and selected MODFLOW 6 metadata needed by later adjoint-processing steps. Returns ------- None This method writes data in place to the supplied HDF5 handle. """ gwf_name = self._gwf_name gwf = self._gwf has_sto = has_sto_iconvert(gwf) data_dict = {} dis_pak = "DIS" ihc = get_ptr_from_gwf(gwf_name, "CON", "IHC", gwf) data_dict["ihc"] = ihc ia = get_ptr_from_gwf(gwf_name, "CON", "IA", gwf) - 1 data_dict["ia"] = ia ja = get_ptr_from_gwf(gwf_name, "CON", "JA", gwf) - 1 data_dict["ja"] = ja jas = get_ptr_from_gwf(gwf_name, "CON", "JAS", gwf) - 1 data_dict["jas"] = jas cl1 = get_ptr_from_gwf(gwf_name, "CON", "CL1", gwf) data_dict["cl1"] = cl1 cl2 = get_ptr_from_gwf(gwf_name, "CON", "CL2", gwf) data_dict["cl2"] = cl2 hwva = get_ptr_from_gwf(gwf_name, "CON", "HWVA", gwf) data_dict["hwva"] = hwva top = get_ptr_from_gwf(gwf_name, dis_pak, "TOP", gwf) data_dict["top"] = top bot = get_ptr_from_gwf(gwf_name, dis_pak, "BOT", gwf) data_dict["bot"] = bot iac = np.array([ia[i + 1] - ia[i] for i in range(len(ia) - 1)]) data_dict["iac"] = iac icelltype = get_ptr_from_gwf(gwf_name, "NPF", "ICELLTYPE", gwf) data_dict["icelltype"] = icelltype if self._gwf_version > "6.6.3": ihighcellsat = get_ptr_from_gwf( gwf_name, "NPF", "IHIGHCELLSAT", gwf, ) else: ihighcellsat = np.array([0], dtype=int) ihighcellsat_value = int(np.asarray(ihighcellsat).ravel()[0]) if ihighcellsat_value != 0: self.logger.logger.info("HIGHEST_CELL_SATURATION option specified") data_dict["ihighcellsat"] = ihighcellsat area = get_ptr_from_gwf(gwf_name, dis_pak, "AREA", gwf) data_dict["area"] = area if has_sto: iconvert = get_ptr_from_gwf(gwf_name, "STO", "ICONVERT", gwf) data_dict["iconvert"] = iconvert storage = get_ptr_from_gwf(gwf_name, "STO", "SS", gwf) data_dict["storage"] = storage sy = get_ptr_from_gwf(gwf_name, "STO", "SY", gwf) data_dict["sy"] = sy nodeuser = get_ptr_from_gwf(gwf_name, dis_pak, "NODEUSER", gwf) - 1 data_dict["nodeuser"] = nodeuser nodereduced = get_ptr_from_gwf(gwf_name, dis_pak, "NODEREDUCED", gwf) - 1 data_dict["nodereduced"] = nodereduced ndim = get_ptr_from_gwf(gwf_name, dis_pak, "NDIM", gwf) data_dict["ndim"] = ndim nnodes = get_ptr_from_gwf(gwf_name, "CON", "NODES", gwf) data_dict["nnodes"] = nnodes idomain = get_ptr_from_gwf(gwf_name, "DIS", "IDOMAIN", gwf) data_dict["idomain"] = idomain if self.is_structured: nlay = get_ptr_from_gwf(gwf_name, dis_pak, "NLAY", gwf) data_dict["nlay"] = nlay nrow = get_ptr_from_gwf(gwf_name, dis_pak, "NROW", gwf) data_dict["nrow"] = nrow ncol = get_ptr_from_gwf(gwf_name, dis_pak, "NCOL", gwf) data_dict["ncol"] = ncol _write_group_to_hdf( hdf, "gwf_info", data_dict, attr_dict=self._gwf_package_dict, logger=self.logger.logger, ) def _dresdss_h( self, head: np.ndarray, head_old: np.ndarray, dt: float, sat: np.ndarray, sat_old: np.ndarray, ) -> np.ndarray: """Return the storage contribution to the residual derivative. This term represents the derivative of the transient groundwater-flow residual with respect to specific storage for the current time step. It accounts for partially saturated conditions by forcing confined cells (``ICONVERT == 0``) to remain fully saturated in the calculation. Parameters ---------- head : ndarray Current heads. head_old : ndarray Heads from the previous solution step. dt : float Length of the current solution step in model time. sat : ndarray Current saturation. sat_old : ndarray Saturation from the previous solution step. Returns ------- ndarray Per-cell derivative term used later in the adjoint sensitivity calculation for specific storage. """ top = get_ptr_from_gwf(self._gwf_name, "DIS", "TOP", self._gwf) bot = get_ptr_from_gwf(self._gwf_name, "DIS", "BOT", self._gwf) area = get_ptr_from_gwf(self._gwf_name, "DIS", "AREA", self._gwf) iconvert = get_ptr_from_gwf(self._gwf_name, "STO", "ICONVERT", self._gwf) # handle iconvert sat_mod = sat.copy() sat_mod[iconvert == 0] = 1.0 sat_old_mod = sat_old.copy() sat_old_mod[iconvert == 0] = 1.0 height = top - bot # result = np.zeros_like(head) dSC1 = area * height result = ( (dSC1 / dt) * (sat_old_mod * head_old - sat_mod * head) + (dSC1 / dt) * bot * (sat_mod - sat_old_mod) + (dSC1 / (2.0 * dt)) * height * (sat_mod**2 - sat_old_mod**2) ) # zero out dry cells result[head <= bot] = 0.0 result[head_old <= bot] = 0.0 return result def _drhsdh( self, dt: float, sat_old: np.ndarray, ) -> np.ndarray: """Return the head derivative of the storage-related right-hand side. The derivative combines confined-storage and specific-yield terms for the previous time step. Specific-yield contributions are suppressed in fully saturated cells. Parameters ---------- dt : float Length of the current solution step in model time. sat_old : ndarray Saturation from the previous solution step. Returns ------- ndarray Per-cell derivative of the storage-related right-hand side with respect to head. """ area = get_ptr_from_gwf(self._gwf_name, "DIS", "AREA", self._gwf) # specific storage top = get_ptr_from_gwf(self._gwf_name, "DIS", "TOP", self._gwf) bot = get_ptr_from_gwf(self._gwf_name, "DIS", "BOT", self._gwf) storage = get_ptr_from_gwf(self._gwf_name, "STO", "SS", self._gwf) # specific yield iconvert = get_ptr_from_gwf(self._gwf_name, "STO", "ICONVERT", self._gwf) sy = get_ptr_from_gwf(self._gwf_name, "STO", "SY", self._gwf) sat_old_mod = sat_old.copy() sat_old_mod[iconvert == 0] = 1.0 sy_mod = sy.copy() sy_mod[sat_old_mod == 1.0] = 0.0 # calculate drhsdh drhsdh = -1.0 * area * (storage * (top - bot) + sy_mod) / dt return drhsdh
[docs] def solve_forward_model( self, verbose: bool = True, force_k_update: bool = False, sp_pert_dict: dict | None = None, pert_save: bool = False, hdf5_name: PathLike | None = None, solve_func_ptr: Callable[[modflowapi.ModflowApi], None] | None = None, presolve_func_ptr: Callable[[modflowapi.ModflowApi], None] | None = None, postsolve_func_ptr: Callable[[modflowapi.ModflowApi], None] | None = None, ) -> tuple[dict, dict] | None: """Solve the forward model and store adjoint inputs in HDF5. The forward model is run for all MODFLOW 6 time steps, and the solution components needed for the adjoint solve are harvested and written to the HDF5 file. Parameters ---------- verbose : bool, optional Whether to report progress to stdout. force_k_update : bool, optional Force MODFLOW 6 to reprocess the ``K`` and ``K33`` arrays. Used only during perturbation testing. sp_pert_dict : dict, optional Perturbed boundary information used during perturbation testing. pert_save : bool, optional Save additional information for perturbation testing. hdf5_name : PathLike, optional Output HDF5 filename for forward-solution components. If omitted, a generic time-stamped filename is created. solve_func_ptr : callable, optional Callback invoked with the ``modflowapi.ModflowApi`` instance before each solver iteration. If omitted, no callback is run. presolve_func_ptr : callable, optional Callback invoked with the ``modflowapi.ModflowApi`` instance at the start of each time step, before the solve loop begins. If omitted, no callback is run. postsolve_func_ptr : callable, optional Callback invoked with the ``modflowapi.ModflowApi`` instance after each time step is finalized. If omitted, no callback is run. Returns ------- tuple[dict, dict] or None Perturbation-testing data when ``pert_save`` is ``True``; otherwise ``None``. """ with _utils_cd(self.working_directory): if self._gwf is None: raise Exception("gwf is None") if hdf5_name is not None: self._hdf5_name = hdf5_name fhd = self._open_hdf(self._hdf5_name) sim_start = datetime.now() self.logger.logger.info("Starting flow solution") # get current sim time ctime = self._gwf.get_current_time() # get ending sim time etime = self._gwf.get_end_time() # max number of iterations max_iter = self._gwf.get_value(self._gwf.get_var_address("MXITER", "SLN_1")) # let's do it! num_fails = 0 sat_old = None visited = [] ctimes = [] dts = [] kpers, kstps = [], [] nnode = self._gwf.get_value( self._gwf.get_var_address("NODES", self._gwf_name, "DIS") )[0] is_newton = self._gwf.get_value( self._gwf.get_var_address("INEWTON", self._gwf_name) )[0] has_sto = False if has_sto_iconvert(self._gwf): has_sto = True sp_package_data = None head_dict = None if pert_save: sp_package_data = {} head_dict = {} while ctime < etime: sol_start = datetime.now() # the length of this sim time dt = self._gwf.get_time_step() # prep the current time step self._gwf.prepare_time_step(dt) kiter = 0 # prep to solve stress_period = self._gwf.get_value( self._gwf.get_var_address("KPER", "TDIS") )[0] time_step = self._gwf.get_value( self._gwf.get_var_address("KSTP", "TDIS") )[0] kper, kstp = stress_period - 1, time_step - 1 kperkstp = (kper, kstp) # this is to force mf6 to update cond sat using the k11 and k33 arrays # which is needed for the perturbation testing if kper == 0 and kstp == 0 and force_k_update: kchangeper = self._gwf.get_value_ptr( self._gwf.get_var_address("KCHANGEPER", self._gwf_name, "NPF") ) kchangestp = self._gwf.get_value_ptr( self._gwf.get_var_address("KCHANGESTP", self._gwf_name, "NPF") ) kchangestp[0] = time_step kchangeper[0] = stress_period nodekchange = self._gwf.get_value_ptr( self._gwf.get_var_address("NODEKCHANGE", self._gwf_name, "NPF") ) nodekchange[:] = 1 # apply any boundary condition perturbation info if sp_pert_dict is not None: if sp_pert_dict["kperkstp"] == kperkstp: for pert_item in self._gwf_boundary_attr_dict[ sp_pert_dict["packagetype"] ]: if pert_item not in sp_pert_dict: self.logger.logger.info( f"pert_item '{pert_item}' not in sp_pert_dict" ) continue addr = [ pert_item.upper(), self._gwf_name, sp_pert_dict["packagename"].upper(), ] wbaddr = self._gwf.get_var_address(*addr) bnd_ptr = self._gwf.get_value_ptr(wbaddr) wbaddr = self._gwf.get_var_address( "NODELIST", self._gwf_name, sp_pert_dict["packagename"].upper(), ) nodelist = self._gwf.get_value_ptr(wbaddr) idx = np.where(nodelist == sp_pert_dict["node"])[0] if idx.shape[0] == 0: print(nodelist) raise Exception( "sp pert dict node not found :" + str(sp_pert_dict) ) bnd_ptr[idx] = sp_pert_dict[pert_item] if presolve_func_ptr is not None: presolve_func_ptr(self._gwf) self._gwf.prepare_solve(1) if sat_old is None: sat_old = self._gwf.get_value( self._gwf.get_var_address("SAT", self._gwf_name, "NPF") ) # solve until converged while kiter < max_iter: if solve_func_ptr is not None: solve_func_ptr(self._gwf) convg = self._gwf.solve(1) if convg: td = (datetime.now() - sol_start).total_seconds() / 60.0 if verbose: self.logger.logger.info( f"Flow (stress period,time step) ({stress_period}," + f"{time_step}) converged in {kiter} iters, took " + f"{td:10.5G} mins" ) break kiter += 1 if not convg: td = (datetime.now() - sol_start).total_seconds() / 60.0 if verbose: self.logger.logger.info( f"Flow stress period,time step {stress_period},{time_step} " + f"did not converge, {kiter} iters, took {td:10.5G} mins" ) num_fails += 1 try: self._gwf.finalize_solve(1) except Exception as e: print(f"{e}\n\nCould not execute finalize_solve()") self._gwf.finalize_time_step() if postsolve_func_ptr is not None: postsolve_func_ptr(self._gwf) # update current sim time ctime = self._gwf.get_current_time() dt1 = self._gwf.get_time_step() ctimes.append(ctime) dts.append(dt1) kpers.append(kper) kstps.append(kstp) if kperkstp in visited: raise Exception(f"{kperkstp} already visited") visited.append(kperkstp) amat = self._gwf.get_value( self._gwf.get_var_address("AMAT", "SLN_1") ).copy() data_dict = {"amat": amat} residual = self._gwf.get_value( self._gwf.get_var_address("D", "SLN_1", "IMSLINEAR") ).copy() data_dict["residual"] = residual head = self._gwf.get_value( self._gwf.get_var_address("X", self._gwf_name.upper()) )[:nnode] data_dict["head"] = head if pert_save: head_dict[kperkstp] = head head_old = self._gwf.get_value( self._gwf.get_var_address("XOLD", self._gwf_name.upper()) )[:nnode] data_dict["head_old"] = head_old k11 = self._gwf.get_value( self._gwf.get_var_address("K11", self._gwf_name.upper(), "NPF") ) data_dict["k11"] = k11 k33 = self._gwf.get_value( self._gwf.get_var_address("K33", self._gwf_name.upper(), "NPF") ) data_dict["k33"] = k33 condsat = self._gwf.get_value( self._gwf.get_var_address("CONDSAT", self._gwf_name.upper(), "NPF") ) data_dict["condsat"] = condsat iss = self._gwf.get_value( self._gwf.get_var_address("ISS", self._gwf_name.upper()) ) data_dict["iss"] = iss sat = self._gwf.get_value( self._gwf.get_var_address("SAT", self._gwf_name, "NPF") ) data_dict["sat"] = sat data_dict["sat_old"] = sat_old sat_old = sat.copy() if has_sto: # has storage dresdss_h = self._dresdss_h(head, head_old, dt1, sat, sat_old) data_dict["dresdss_h"] = dresdss_h drhsdh = self._drhsdh(dt1, sat_old) data_dict["drhsdh"] = drhsdh else: data_dict["drhsdh"] = np.zeros_like(sat_old) for package_type in self._gwf_package_types: if package_type in self._gwf_package_dict: if pert_save and package_type not in sp_package_data: sp_package_data[package_type] = {} for tag in self._gwf_package_dict[package_type]: nbound = self._gwf.get_value( self._gwf.get_var_address( "NBOUND", self._gwf_name, tag.upper() ) )[0] if nbound > 0: if ( pert_save and kperkstp in sp_package_data[package_type] ): if len(self._gwf_package_dict[package_type]) == 1: raise Exception( f"kperkstp '{kperkstp}' already in " + "sp_package_data" ) else: pass elif pert_save: sp_package_data[package_type][kperkstp] = [] nodelist = self._gwf.get_value( self._gwf.get_var_address( "NODELIST", self._gwf_name, tag.upper() ) ) bound = self._gwf.get_value( self._gwf.get_var_address( "BOUND", self._gwf_name, tag.upper() ) ) hcof = self._gwf.get_value( self._gwf.get_var_address( "HCOF", self._gwf_name, tag.upper() ) ) rhs = self._gwf.get_value( self._gwf.get_var_address( "RHS", self._gwf_name, tag.upper() ) ) simvals = self._gwf.get_value( self._gwf.get_var_address( "SIMVALS", self._gwf_name, tag.upper() ) ) bnd_attrs = {} if package_type in self._gwf_boundary_attr_dict: fill_bound = False if bound.size == 0: bound = np.zeros( ( len(nodelist), len( self._gwf_boundary_attr_dict[ package_type ] ), ) ) fill_bound = True for i, attr in enumerate( self._gwf_boundary_attr_dict[package_type] ): vals = self._gwf.get_value( self._gwf.get_var_address( attr.upper(), self._gwf_name, tag.upper(), ) ) bnd_attrs[attr] = vals if fill_bound: bound[:, i] = vals if package_type == "sfr6": tag = self._gwf_package_dict[package_type][0] stage = self._gwf.get_value( self._gwf.get_var_address( "STAGE", self._gwf_name, tag.upper() ) ) bound[:, 0] = stage bound[:, 1] = -1.0 * hcof if pert_save: for i in range(nbound): # note bound is an array! pak_data = { "node": nodelist[i], "bound": bound[i], "hcof": hcof[i], "rhs": rhs[i], "packagename": tag, "simval": simvals[i], } for key, val in bnd_attrs.items(): pak_data[key] = val[i] sp_package_data[package_type][kperkstp].append( pak_data ) data_dict[tag] = { "ptype": package_type, "nodelist": nodelist, "bound": bound, "hcof": hcof, "rhs": rhs, "simvals": simvals, } for key, val in bnd_attrs.items(): assert key not in data_dict[tag], ( f"boundary attribute '{key}' already in " + f"data dict for {tag}" ) data_dict[tag][key] = val attr_dict = { "ctime": ctime, "dt": dt1, "kper": kper, "kstp": kstp, "is_newton": is_newton, "has_sto": has_sto, } _write_group_to_hdf( fhd, group_name=f"solution_kper:{kper:05d}_kstp:{kstp:05d}", data_dict=data_dict, attr_dict=attr_dict, logger=self.logger.logger, ) sim_end = datetime.now() td = (sim_end - sim_start).total_seconds() / 60.0 if verbose: self.logger.logger.info( f"Flow solution finished and took {td:10.5G} minutes" ) if num_fails > 0: self.logger.logger.info( f"Flow solution failed to converge {num_fails} times" ) _write_group_to_hdf( fhd, "aux", {"totime": ctimes, "dt": dts, "kper": kpers, "kstp": kstps}, logger=self.logger.logger, ) self._add_gwf_info_to_hdf(fhd) fhd.close() if pert_save: return head_dict, sp_package_data
[docs] def solve_adjoint( self, hdf5_adjoint_solution_fname: Optional[PathLike] = None, skip_solve: bool = False, csv_summary: bool = False, linear_solver=None, linear_solver_kwargs: dict = {}, use_precon: bool = True, jacobi_preconditioner: Optional[str] = None, precon_kwargs: dict = {}, singular_test: bool = False, tikhonov: float = 0.0, dvclose: Optional[float] = 1e-6, rclose: Optional[float] = 1e-3, dvscale: bool = False, ) -> dict[str, pd.DataFrame]: """Solve for the adjoint state, one performance measure at a time. Parameters ---------- hdf5_adjoint_solution_fname : PathLike, optional HDF5 file to write the adjoint solution. If omitted, a default name based on the performance-measure name is used. skip_solve : bool, optional Skip the adjoint solve for time steps with no performance-measure entries. csv_summary : bool, optional Write a CSV summary of the sensitivity information. linear_solver : str or callable, optional Sparse linear solver to use. If ``None``, either a direct solver or ``bicgstab`` is selected based on model size. If a string, supported values are ``"direct"`` and ``"bicgstab"``. A compatible solver callable may also be supplied. linear_solver_kwargs : dict, optional Keyword arguments passed to ``linear_solver``. use_precon : bool, optional Use a preconditioner with the iterative linear solver. jacobi_preconditioner : str, optional Use Jacobi preconditioner with the iterative linear solver. If ``None``, the ILU preconditioner will be used. If a string, supported values are ``"point"`` and ``"block"``. precon_kwargs : dict, optional Keyword arguments passed to the preconditioner setup. For the default ILU preconditioner, these are passed to ``spilu``. When ``jacobi_preconditioner="block"``, the ``block_size`` key sets the block size. singular_test : bool, optional Test for a singular matrix and apply Tikhonov regularization when needed. tikhonov : float, optional Tikhonov regularization value. dvclose : float, optional Custom convergence criterion based on the maximum absolute change between consecutive iterates. rclose : float, optional Custom convergence criterion based on the maximum absolute residual. dvscale : bool, optional Scale lambda and the right-hand side to improve iterative solver convergence for large lambda values. Returns ------- dict[str, DataFrame] Composite sensitivity summaries keyed by performance-measure name. """ generate_name = hdf5_adjoint_solution_fname is None dfs = {} with _utils_cd(self.working_directory): if self._hdf5_name is None or not pl.Path(self._hdf5_name).exists(): raise Exception("need to call solve_forward_model() first") for pm in self._performance_measures: if generate_name: hdf5_name = pl.Path(self._hdf5_name) path = hdf5_name.parent extension = hdf5_name.suffix hdf5_adjoint_solution_fname = ( path / f"adjoint_solution_{pm.name}{extension}" ) df = pm.solve_adjoint( hdf5_forward_solution_fname=self._hdf5_name, hdf5_adjoint_solution_fname=hdf5_adjoint_solution_fname, skip_solve=skip_solve, csv_summary=csv_summary, linear_solver=linear_solver, linear_solver_kwargs=linear_solver_kwargs, jacobi_preconditioner=jacobi_preconditioner, use_precon=use_precon, precon_kwargs=precon_kwargs, singular_test=singular_test, tikhonov=tikhonov, dvclose=dvclose, rclose=rclose, dvscale=dvscale, ) dfs[pm.name] = df return dfs
def _initialize_gwf(self, lib_name: str, sim_ws: PathLike) -> modflowapi.ModflowApi: """Initialize the MODFLOW 6 API. Parameters ---------- lib_name : str MODFLOW 6 shared library filename. sim_ws : PathLike Simulation directory containing the shared library file. Returns ------- modflowapi.ModflowApi Initialized MODFLOW 6 API instance for the requested simulation workspace. """ # instantiate the flow model api if self._gwf is not None: self._gwf.finalize() self._gwf = None sim_ws = pl.Path(sim_ws) gwf = modflowapi.ModflowApi( str(sim_ws / lib_name), working_directory=str(sim_ws) ) gwf.initialize() return gwf def _get_gwf_version(self) -> str: """Return the MODFLOW 6 version number. Returns ------- str MODFLOW 6 version string reported by the API. """ version = self._gwf.get_version() self.logger.logger.info(f"MODFLOW 6 version: {version}") return version
[docs] def finalize(self) -> None: """Close the API and file handles.""" self.logger.logger.info(f"Finalizing {self.__class__.__name__}") try: self._gwf.finalize() except Exception as e: print(f"{e}\n\nCould not execute finalize()") self._gwf = None
[docs] def perturbation_method(self, pert_mult: float = 1.01) -> pd.DataFrame: """Calculate finite-difference perturbation sensitivities. This utility perturbs active parameter entries one at a time, reruns the forward simulation, and estimates sensitivity using a forward-difference quotient. The resulting sensitivities can be compared directly with adjoint-based sensitivities for verification during development. Parameters ---------- pert_mult : float, default 1.01 Multiplicative perturbation factor applied to each parameter value. The perturbation increment is computed as ``epsilon = value * (pert_mult - 1.0)``. Returns ------- DataFrame Dataframe with one row per perturbation and columns containing perturbation metadata (for example parameter, index, epsilon, and node/layer labels where applicable) plus finite-difference sensitivities for each configured performance measure. """ working_directory = pl.Path(self.working_directory) self._gwf = self._initialize_gwf(self._lib_name, working_directory) self._gwf_version = self._get_gwf_version() gwf_name = self._gwf_name.upper() org_head, org_sp_package_data = self.solve_forward_model(pert_save=True) # tot = 0 # for d in org_sp_package_data["ghb6"][(0, 0)]: # # print(d) # tot += d["simval"] base_results = { pm.name: pm._performance_measure_forward(org_head, org_sp_package_data) for pm in self._performance_measures } assert len(base_results) == len(self._performance_measures) addr = ["NODEUSER", gwf_name, "DIS"] wbaddr = self._gwf.get_var_address(*addr) nuser = self._gwf.get_value(wbaddr) - 1 if len(nuser) == 1: nuser = np.arange(org_head[next(iter(org_head.keys()))].shape[0], dtype=int) kijs = None nlay = 1 if self.is_structured or self.unstructured_type == "disv": addr = ["NLAY", gwf_name, "DIS"] wbaddr = self._gwf.get_var_address(*addr) nlay = self._gwf.get_value(wbaddr)[0] if self.is_structured: kijs = get_lrc(self._shape, list(nuser)) kijs = dict(zip(nuser, kijs)) def _compute_perturbation_results( pert_head: dict, pert_sp_dict: dict, epsilon: float, ) -> dict[str, float]: """Return finite-difference perturbation responses for each measure. Parameters ---------- pert_head : dict Perturbed head results by time step. pert_sp_dict : dict Perturbed stress-package data by time step. epsilon : float Perturbation magnitude. Returns ------- dict[str, float] Perturbation responses indexed by performance-measure name. """ return { pm.name: ( pm._performance_measure_forward(pert_head, pert_sp_dict) - base_results[pm.name] ) / epsilon for pm in self._performance_measures } def _add_spatial_labels(df: pd.DataFrame) -> pd.DataFrame: """Add structured-grid spatial labels to a perturbation summary. Parameters ---------- df : DataFrame Perturbation summary indexed by node number. Returns ------- DataFrame Input summary with optional ``k``, ``i``, and ``j`` columns. """ if kijs is not None: for idx, lab in zip([0, 1, 2], ["k", "i", "j"]): df.loc[:, lab] = df.index.map(lambda x: kijs[x][idx]) return df dfs = [] # boundary condition perturbations _ = get_mf6_bound_dict() for paktype, pdict in org_sp_package_data.items(): if paktype == "chd6": continue pert_items = self._gwf_boundary_attr_dict[paktype] epsilons = [] node_ids = [] names = [] pert_results_dict = {pm.name: [] for pm in self._performance_measures} self.logger.logger.info(f"Running perturbations for {paktype}") for kk, infolist in pdict.items(): for ibnd, infodict in enumerate(infolist): for pert_item in pert_items: new_bound = infodict[pert_item].copy() delt = new_bound * pert_mult epsilon = delt - new_bound epsilons.append(epsilon) new_bound = delt pakname = infodict["packagename"] pert_dict = { "kperkstp": kk, "packagename": pakname, "node": infodict["node"], pert_item: new_bound, "packagetype": paktype, } self._gwf = self._initialize_gwf( self._lib_name, working_directory, ) pert_head, pert_sp_dict = self.solve_forward_model( verbose=False, sp_pert_dict=pert_dict, pert_save=True ) pert_results = _compute_perturbation_results( pert_head, pert_sp_dict, epsilon, ) for pm_name, result in pert_results.items(): pert_results_dict[pm_name].append(result) node_ids.append(infodict["node"]) if paktype == "wel6": names.append("wel6_q") elif paktype == "rch6": names.append("rch6_recharge") else: names.append(pakname + "_" + pert_item + f"_{ibnd}") if not epsilons: continue df = pd.DataFrame(pert_results_dict) df.loc[:, "node"] = node_ids df.loc[:, "epsilon"] = epsilons df.loc[:, "addr"] = names df.index = df.pop("node") - 1 df = df.loc[df.index != -1, :] df.index = df.index.map(lambda x: nuser[x]) df.index.name = "node" df = _add_spatial_labels(df) agg_map = dict.fromkeys(pert_results_dict, "sum") for col in ["epsilon", "k", "i", "j"]: if col in df.columns: agg_map[col] = "first" gdf = ( df.reset_index() .groupby(["node", "addr"], as_index=False) .agg(agg_map) .set_index("node") ) dfs.append(gdf) # property perturbations addresses = [["K11", gwf_name, "NPF"]] if nlay > 1: addresses.append(["K33", gwf_name, "NPF"]) has_sto = False if has_sto_iconvert(self._gwf): has_sto = True wbaddr = self._gwf.get_var_address(*addresses[0]) inodes = self._gwf.get_value_ptr(wbaddr).shape[0] for addr in addresses: self.logger.logger.info(f"Running perturbations for {addr}") pert_results_dict = {pm.name: [] for pm in self._performance_measures} wbaddr = self._gwf.get_var_address(*addr) epsilons = [] for inode in range(inodes): self._gwf = self._initialize_gwf(self._lib_name, self.working_directory) pert_arr = self._gwf.get_value_ptr(wbaddr) org = pert_arr[inode] delt = org * pert_mult epsilon = delt - pert_arr[inode] epsilons.append(epsilon) pert_arr[inode] = delt pert_head, pert_sp_dict = self.solve_forward_model( verbose=False, force_k_update=True, pert_save=True ) pert_results = _compute_perturbation_results( pert_head, pert_sp_dict, epsilon, ) for pm_name, result in pert_results.items(): pert_results_dict[pm_name].append(result) df = pd.DataFrame(pert_results_dict) df.index = [nuser[inode] for inode in range(inodes)] df.index.name = "node" df.loc[:, "epsilon"] = epsilons df = _add_spatial_labels(df) tag = "_".join(addr).lower() df.loc[:, "addr"] = tag dfs.append(df) if has_sto: test_dir = working_directory / "_pert_temp" if pl.Path(test_dir).exists(): shutil.rmtree(test_dir) sim = flopy.mf6.MFSimulation.load(sim_ws=working_directory) gwf = sim.get_model() ss = gwf.sto.ss.array.copy().flatten() # this is an attempt to make sure we aren't using "layered" gwf.sto.ss = ss sim.set_sim_path(test_dir) sim.set_all_data_external() sim.write_simulation() ss_arr_name = pl.Path(test_dir) / f"{gwf.name}.sto_ss.txt" if not ss_arr_name.exists(): raise Exception( "couldn't find ss_arr_name '{0}' needed for BS super hack" ) self.logger.logger.info( "Running manual flopy based perturbations for sto ss" ) pert_results_dict = {pm.name: [] for pm in self._performance_measures} epsilons = [] for inode in range(inodes): arr_node = nuser[inode] pert_arr = ss.copy() org = ss[arr_node] delt = org * pert_mult epsilon = delt - pert_arr[arr_node] epsilons.append(epsilon) pert_arr[arr_node] = delt # reset the ss property np.savetxt(ss_arr_name, pert_arr.flatten(), fmt="%15.6E") self._gwf = self._initialize_gwf(self._lib_name, test_dir) pert_head, pert_sp_dict = self.solve_forward_model( verbose=False, force_k_update=True, pert_save=True ) pert_results = _compute_perturbation_results( pert_head, pert_sp_dict, epsilon, ) for pm_name, result in pert_results.items(): pert_results_dict[pm_name].append(result) df = pd.DataFrame(pert_results_dict) df.index = [nuser[inode] for inode in range(inodes)] df.index.name = "node" df.loc[:, "epsilon"] = epsilons df = _add_spatial_labels(df) tag = "sto_ss" df.loc[:, "addr"] = tag dfs.append(df) df = pd.concat(dfs) df.index = df.index.values + 1 df.index.name = "node" df.sort_index(inplace=True) df.to_csv(working_directory / "pert_results.csv") return df