Skip to content

Calibration

The calibration module provides tools for model calibration, global sensitivity analysis, and uncertainty quantification using PEST++ via pyemu.

Installation

Calibration dependencies are optional and must be installed separately:

make install-calib
# or manually:
pip install .[calibration] && get-pestpp :pyemu

Make sure the PEST++ executables (pestpp-glm, pestpp-ies, pestpp-sen) are on the system PATH.

Overview

The calibration workflow is driven by a single YAML configuration file (alongside the main MOBIDIC configuration). The PestSetup class orchestrates all steps: generating PEST++ template and instruction files, running the forward model, and parsing results.

Currently supported PEST++ tools:

| Tool  | Method                      | Use case                                    |
|-------|-----------------------------|---------------------------------------------|
| `glm` | Gauss-Levenberg-Marquardt   | Gradient-based calibration                  |
| `ies` | Iterative Ensemble Smoother | Ensemble-based calibration and uncertainty  |
| `sen` | Sensitivity analysis        | Global sensitivity analysis                 |

Calibration setup

Orchestrator for PEST++ calibration of MOBIDICpy.

Creates a complete PEST++ working directory with all necessary files and provides methods to execute calibration.

Parameters:

| Name           | Type                                  | Description                                                              | Default    |
|----------------|---------------------------------------|--------------------------------------------------------------------------|------------|
| `calib_config` | `CalibrationConfig \| str \| Path`    | Calibration configuration (CalibrationConfig object or path to YAML).    | *required* |
| `base_path`    | `Path \| None`                        | Base directory for resolving relative paths in the config.               | `None`     |
Source code in mobidic/calibration/pest_setup.py
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
class PestSetup:
    """Orchestrator for PEST++ calibration of MOBIDICpy.

    Creates a complete PEST++ working directory with all necessary files
    and provides methods to execute calibration.

    Args:
        calib_config: Calibration configuration (CalibrationConfig object or path to YAML).
        base_path: Base directory for resolving relative paths in the config.
    """

    def __init__(self, calib_config: CalibrationConfig | str | Path, base_path: Path | None = None):
        if isinstance(calib_config, (str, Path)):
            config_path = Path(calib_config)
            self.calib_config = load_calibration_config(config_path)
            # Default the base path to the YAML's directory so relative paths
            # in the config resolve next to the file that declared them.
            self.base_path = base_path or config_path.parent
        else:
            self.calib_config = calib_config
            self.base_path = base_path or Path.cwd()

        self._pest_exe = PEST_TOOL_MAP[self.calib_config.pest_tool]
        self._working_dir = None  # Lazily resolved by the working_dir property
        self._obs_data = None  # Cached observation alignment data
        self._n_obs_per_group = None  # Per-group observation counts, set by setup()

    @property
    def working_dir(self) -> Path:
        """Path to the PEST++ working directory."""
        if self._working_dir is None:
            wd = Path(self.calib_config.working_dir)
            if not wd.is_absolute():
                wd = self.base_path / wd
            self._working_dir = wd
        return self._working_dir

    def setup(self) -> Path:
        """Create complete PEST++ working directory with all files.

        Returns:
            Path to the working directory.
        """

        cc = self.calib_config
        wd = self.working_dir

        logger.info(f"Setting up PEST++ working directory: {wd}")

        # Create working directory
        wd.mkdir(parents=True, exist_ok=True)

        # Resolve MOBIDIC config path
        mobidic_config_path = Path(cc.mobidic_config)
        if not mobidic_config_path.is_absolute():
            mobidic_config_path = self.base_path / mobidic_config_path

        # Load MOBIDIC config to get paths
        from mobidic.config import load_config

        mobidic_config = load_config(mobidic_config_path)
        gisdata_path = Path(mobidic_config.paths.gisdata)
        network_path = Path(mobidic_config.paths.network)

        # Determine forcing path
        if mobidic_config.paths.meteoraster:
            forcing_path = Path(mobidic_config.paths.meteoraster)
        elif cc.use_raster_forcing:
            # Will be generated by initial forward run
            forcing_path = wd / "forcing_raster.nc"
        else:
            forcing_path = Path(mobidic_config.paths.meteodata)

        # Determine calibration period (controls which observations are used)
        if cc.calibration_period:
            calib_start = cc.calibration_period.start_date
            calib_end = cc.calibration_period.end_date
        else:
            # Use first/last observation time
            all_obs = []
            for obs_group in cc.observations:
                obs_df = load_observations(obs_group, self.base_path)
                all_obs.append(obs_df)
            all_times = pd.concat(all_obs)["time"]
            calib_start = str(all_times.min())
            calib_end = str(all_times.max())

        # Determine simulation period (the full window the forward model runs,
        # including warm-up). Defaults to calibration period if not specified.
        if cc.simulation_period:
            sim_start = cc.simulation_period.start_date
            sim_end = cc.simulation_period.end_date
        else:
            sim_start = calib_start
            sim_end = calib_end

        # Validate simulation period against forcing time range
        self._validate_calibration_period(forcing_path, mobidic_config, sim_start, sim_end)

        # Run initial forward run if raster forcing needs generation
        if cc.use_raster_forcing and not forcing_path.exists():
            logger.info("Running initial forward run to generate raster forcing...")
            self._run_initial_forward(mobidic_config_path, mobidic_config, forcing_path, sim_start, sim_end)

        # Load observations and align to simulation timesteps
        # Observations are filtered to calibration period, but aligned to the
        # full simulation time grid (so sim_indices are correct)
        self._load_and_align_observations(sim_start, sim_end, calib_start, calib_end, mobidic_config)

        # Check if routing parameters are being calibrated
        routing_keys = {"parameters.routing.wcel", "parameters.routing.Br0", "parameters.routing.NBr"}
        routing_calibrated = any(p.parameter_key in routing_keys for p in cc.parameters)

        # 1. Generate template file
        generate_template_file(cc, wd / "model_input.csv.tpl")

        # 2. Generate initial model_input.csv
        generate_model_input_csv(cc, wd / "model_input.csv")

        # 3. Generate instruction file (the path it returns is not needed here;
        # the forward runner uses the fixed name "model_output.csv.ins")
        _, obs_names = generate_instruction_file(cc, self._n_obs_per_group, wd / "model_output.csv.ins")

        # 4. Generate forward model runner script
        obs_data_json = json.dumps(self._obs_data)
        observation_reaches = [og.reach_id for og in cc.observations]

        generate_forward_run_script(
            base_config_path=mobidic_config_path,
            gisdata_path=gisdata_path,
            network_path=network_path,
            forcing_path=forcing_path,
            start_date=sim_start,
            end_date=sim_end,
            observation_reaches=observation_reaches,
            obs_data_json=obs_data_json,
            routing_params_calibrated=routing_calibrated,
            output_script_path=wd / "forward_run.py",
        )

        # 5. Build PEST control file via pyemu
        pst = self._build_pst(obs_names, wd)

        # Write .pst file
        pst_path = wd / f"{self.calib_config.case_name}.pst"
        pst_version = self.calib_config.pest_options.get("pst_version", 2)
        pst.write(str(pst_path), version=pst_version)
        logger.info(f"PEST control file written: {pst_path} (version={pst_version})")

        logger.success(f"PEST++ setup complete in {wd}")
        return wd

    def run(self, num_workers: int | None = None, start_manager: bool = True) -> "CalibrationResults":
        """Execute PEST++ with parallel workers.

        Args:
            num_workers: Override number of workers (default: from config or os.cpu_count()).
            start_manager: If True (default), start the manager on this machine (local mode).
                If False, start agents only for cluster mode.

        Returns:
            CalibrationResults with parsed output.
        """
        import pyemu

        cfg = self.calib_config.parallel
        n_workers = num_workers or cfg.num_workers or os.cpu_count()
        is_cluster = cfg.manager_ip is not None and not start_manager

        wd = self.working_dir

        # Create a clean template directory containing only PEST files (no subdirectories).
        # This prevents recursive directory nesting: if worker_dir == worker_root, pyemu
        # copies the growing directory into each new worker, causing exponential growth.
        template_dir = wd / "_template"
        if template_dir.exists():
            shutil.rmtree(template_dir)
        template_dir.mkdir()
        for f in wd.iterdir():
            if f.is_file():
                shutil.copy2(f, template_dir / f.name)

        pst_rel_path = f"{self.calib_config.case_name}.pst"

        if is_cluster:
            logger.info(f"Starting {n_workers} PEST++ agents connecting to {cfg.manager_ip}:{cfg.port}")
            # NOTE(review): `local` is passed the manager IP string here so agents
            # connect to a remote manager — assumes the installed pyemu version
            # accepts a hostname string for `local`; TODO confirm.
            pyemu.os_utils.start_workers(
                worker_dir=str(template_dir),
                exe_rel_path=self._pest_exe,
                pst_rel_path=pst_rel_path,
                num_workers=n_workers,
                worker_root=str(wd),
                port=cfg.port,
                master_dir=None,
                local=cfg.manager_ip,
            )
        else:
            logger.info(f"Starting PEST++ ({self._pest_exe}) with {n_workers} local workers")
            pyemu.os_utils.start_workers(
                worker_dir=str(template_dir),
                exe_rel_path=self._pest_exe,
                pst_rel_path=pst_rel_path,
                num_workers=n_workers,
                worker_root=str(wd),
                port=cfg.port,
                master_dir=str(wd / "master"),
                local=True,
            )

        # Clean up template directory
        shutil.rmtree(template_dir, ignore_errors=True)

        return self.load_results()

    def load_results(self) -> "CalibrationResults":
        """Load results from a completed PEST++ run.

        Returns:
            CalibrationResults parsed from PEST++ output files.
        """
        from mobidic.calibration.results import CalibrationResults

        # Local-manager runs write output under "master"; cluster/agent-only
        # runs leave it directly in the working directory.
        master_dir = self.working_dir / "master"
        if not master_dir.exists():
            master_dir = self.working_dir

        return CalibrationResults.from_pest_output(
            master_dir=master_dir,
            calib_config=self.calib_config,
        )

    def _load_and_align_observations(
        self,
        sim_start: str,
        sim_end: str,
        calib_start: str,
        calib_end: str,
        mobidic_config,
    ) -> None:
        """Load all observations and align to simulation time grid.

        Observations are filtered to the calibration period but aligned to the
        full simulation time grid so that sim_indices correctly reference
        positions in the forward model output (which runs the full simulation).

        Populates self._obs_data and self._n_obs_per_group.
        """
        dt = mobidic_config.simulation.timestep  # timestep in seconds
        sim_times = pd.date_range(start=sim_start, end=sim_end, freq=f"{int(dt)}s")

        self._obs_data = []
        self._n_obs_per_group = {}

        for obs_group in self.calib_config.observations:
            # Filter observations to calibration period (not simulation period)
            obs_df = load_observations(obs_group, self.base_path, calib_start, calib_end)
            # Align to full simulation time grid
            sim_indices, obs_values, obs_times = align_observations_to_simulation(obs_df, sim_times)

            n_obs = len(sim_indices)
            self._n_obs_per_group[obs_group.name] = n_obs

            obs_info = {
                "name": obs_group.name,
                "reach_id": obs_group.reach_id,
                "n_obs": n_obs,
                "sim_indices": sim_indices.tolist(),
                "obs_values": obs_values.tolist(),
            }

            if obs_group.metrics:
                obs_info["metrics"] = [
                    {"metric": m.metric, "target": m.target, "weight": m.weight} for m in obs_group.metrics
                ]

            self._obs_data.append(obs_info)

            logger.info(f"Observation group '{obs_group.name}': {n_obs} matched observations")

    def _build_pst(self, obs_names: list[str], wd: Path):
        """Build the PEST control file (.pst) using pyemu.

        Args:
            obs_names: List of all observation names (currently unused; the
                observation names are rebuilt from self._obs_data — kept for
                call-site stability).
            wd: Working directory (currently unused; kept for call-site stability).

        Returns:
            pyemu.Pst object.
        """
        import pyemu

        cc = self.calib_config

        # Build parameter data
        par_data = []
        par_groups = set()
        for p in cc.parameters:
            par_groups.add(p.par_group)
            par_data.append(
                {
                    "parnme": p.name,
                    # p.transform passes through unchanged (including "fixed")
                    "partrans": p.transform,
                    "parchglim": "factor",
                    "parval1": p.initial_value,
                    "parlbnd": p.lower_bound,
                    "parubnd": p.upper_bound,
                    "pargp": p.par_group,
                    "scale": p.scale,
                    "offset": p.offset,
                    "dercom": 1,
                }
            )
        par_df = pd.DataFrame(par_data)
        par_df.index = par_df["parnme"]

        # Build observation data
        obs_data = []

        for obs_group in cc.observations:
            n_obs = self._n_obs_per_group.get(obs_group.name, 0)
            obs_info = next((d for d in self._obs_data if d["name"] == obs_group.name), None)
            obs_values = obs_info["obs_values"] if obs_info else []

            # Time-series observations
            for i in range(n_obs):
                obs_name = f"{obs_group.name}_{i:04d}"
                obs_val = obs_values[i] if i < len(obs_values) else 0.0
                obs_data.append(
                    {
                        "obsnme": obs_name,
                        "obsval": obs_val,
                        "weight": obs_group.weight,
                        "obgnme": obs_group.name,
                    }
                )

            # Metric pseudo-observations
            if obs_group.metrics:
                for mc in obs_group.metrics:
                    obs_name = f"{obs_group.name}_{mc.metric}"
                    obs_data.append(
                        {
                            "obsnme": obs_name,
                            "obsval": mc.target,
                            "weight": mc.weight,
                            "obgnme": f"{obs_group.name}_metrics",
                        }
                    )

        obs_df = pd.DataFrame(obs_data)
        obs_df.index = obs_df["obsnme"]

        # Create Pst object
        pst = pyemu.Pst.from_par_obs_names(
            par_names=par_df.index.tolist(),
            obs_names=obs_df.index.tolist(),
        )

        # Update parameter data
        pst.parameter_data.loc[par_df.index, "partrans"] = par_df["partrans"]
        pst.parameter_data.loc[par_df.index, "parchglim"] = par_df["parchglim"]
        pst.parameter_data.loc[par_df.index, "parval1"] = par_df["parval1"]
        pst.parameter_data.loc[par_df.index, "parlbnd"] = par_df["parlbnd"]
        pst.parameter_data.loc[par_df.index, "parubnd"] = par_df["parubnd"]
        pst.parameter_data.loc[par_df.index, "pargp"] = par_df["pargp"]
        pst.parameter_data.loc[par_df.index, "scale"] = par_df["scale"]
        pst.parameter_data.loc[par_df.index, "offset"] = par_df["offset"]
        pst.parameter_data.loc[par_df.index, "dercom"] = par_df["dercom"]

        # Update observation data
        pst.observation_data.loc[obs_df.index, "obsval"] = obs_df["obsval"]
        pst.observation_data.loc[obs_df.index, "weight"] = obs_df["weight"]
        pst.observation_data.loc[obs_df.index, "obgnme"] = obs_df["obgnme"]

        # Set model command
        pst.model_command = ["python forward_run.py"]

        # Set template and instruction file pairs
        pst.model_input_data = pd.DataFrame(
            {
                "pest_file": ["model_input.csv.tpl"],
                "model_file": ["model_input.csv"],
            }
        )
        pst.model_output_data = pd.DataFrame(
            {
                "pest_file": ["model_output.csv.ins"],
                "model_file": ["model_output.csv"],
            }
        )

        # Set control data from pest_options (with defaults)
        control_data_keys = {"noptmax", "relparmax", "facparmax"}
        pst.control_data.noptmax = cc.pest_options.get("noptmax", 20)
        pst.control_data.relparmax = cc.pest_options.get("relparmax", 10.0)
        pst.control_data.facparmax = cc.pest_options.get("facparmax", 10.0)
        pst.svd_data.maxsing = len(cc.parameters)

        # Apply additional PEST++ options (skip keys already handled as control_data)
        for key, value in cc.pest_options.items():
            if key in control_data_keys or key == "pst_version":
                continue
            pst.pestpp_options[key] = value

        logger.info(
            f"Built PEST control file: {len(cc.parameters)} parameters, "
            f"{len(obs_data)} observations, tool={self._pest_exe}"
        )
        return pst

    def _validate_calibration_period(self, forcing_path: Path, mobidic_config, start_date: str, end_date: str) -> None:
        """Validate that the simulation period is contained within the forcing time range.

        The simulation period (which includes any warm-up before the calibration
        window) must fit within the available forcing data so that PEST++ forward
        runs can actually be executed.

        Args:
            forcing_path: Path to forcing NetCDF file.
            mobidic_config: Loaded MOBIDICConfig object.
            start_date: Simulation start date string.
            end_date: Simulation end date string.

        Raises:
            ValueError: If the simulation period extends outside the forcing data range.
        """
        sim_start = pd.Timestamp(start_date)
        sim_end = pd.Timestamp(end_date)

        # Determine the forcing time range
        forcing_start = None
        forcing_end = None

        if forcing_path.exists():
            # Load just enough to get the time range (MeteoRaster or MeteoData)
            try:
                from mobidic.preprocessing.meteo_raster import MeteoRaster

                forcing = MeteoRaster.from_netcdf(forcing_path, preload=False)
                forcing_start = forcing.start_date
                forcing_end = forcing.end_date
            except Exception:
                # Fall back to the point-forcing reader if the raster reader fails
                try:
                    from mobidic.preprocessing.meteo_preprocessing import MeteoData

                    forcing = MeteoData.from_netcdf(forcing_path)
                    forcing_start = forcing.start_date
                    forcing_end = forcing.end_date
                except Exception:
                    # Best-effort validation: an unreadable forcing file should
                    # not block setup; the forward run will surface real errors.
                    logger.warning(
                        f"Could not determine forcing time range from {forcing_path}. "
                        "Skipping calibration period validation."
                    )
                    return
        elif not self.calib_config.use_raster_forcing:
            # Forcing file should exist if we're not generating it
            logger.warning(f"Forcing file not found: {forcing_path}. Skipping calibration period validation.")
            return
        else:
            # Raster forcing will be generated — skip validation (it will use the simulation period)
            return

        if forcing_start is None or forcing_end is None:
            return

        # Check: simulation start_date >= forcing start_date
        if sim_start < forcing_start:
            raise ValueError(
                f"Simulation start_date ({start_date}) is before the forcing data start "
                f"({forcing_start}). The simulation period must be contained within the "
                f"available forcing data period."
            )

        # Check: simulation end_date <= forcing end_date
        if sim_end > forcing_end:
            raise ValueError(
                f"Simulation end_date ({end_date}) is after the forcing data end "
                f"({forcing_end}). The simulation period must be contained within the "
                f"available forcing data period."
            )

        logger.info(
            f"Simulation period validated: [{start_date}, {end_date}] is within "
            f"forcing period [{forcing_start}, {forcing_end}]"
        )

    def _run_initial_forward(self, config_path, config, forcing_path, start_date, end_date):
        """Run initial forward simulation to generate raster forcing."""
        from mobidic.core.simulation import Simulation
        from mobidic.preprocessing.io import load_gisdata
        from mobidic.preprocessing.meteo_preprocessing import MeteoData

        gisdata = load_gisdata(config.paths.gisdata, config.paths.network)

        # Enable meteo output so interpolated forcing is saved as raster
        config.output_forcing_data.meteo_data = True

        forcing = MeteoData.from_netcdf(config.paths.meteodata)
        sim = Simulation(gisdata, forcing, config)
        sim.run(start_date, end_date)

        # Copy the generated meteo_forcing.nc to the expected forcing_path
        output_dir = Path(config.paths.output)
        meteo_output = output_dir / "meteo_forcing.nc"
        if meteo_output.exists():
            shutil.copy2(meteo_output, forcing_path)
            logger.info(f"Rasterized forcing saved to {forcing_path}")
        else:
            raise FileNotFoundError(f"Expected rasterized forcing at {meteo_output} but not found")

working_dir property

Path to the PEST++ working directory.

load_results()

Load results from a completed PEST++ run.

Returns:

Type Description
'CalibrationResults'

CalibrationResults parsed from PEST++ output files.

Source code in mobidic/calibration/pest_setup.py
def load_results(self) -> "CalibrationResults":
    """Load results from a completed PEST++ run.

    Prefers the ``master`` subdirectory of the working directory when it
    exists (written by local-manager runs); otherwise parses the working
    directory itself.

    Returns:
        CalibrationResults parsed from PEST++ output files.
    """
    from mobidic.calibration.results import CalibrationResults

    candidate = self.working_dir / "master"
    result_dir = candidate if candidate.exists() else self.working_dir

    return CalibrationResults.from_pest_output(
        master_dir=result_dir,
        calib_config=self.calib_config,
    )

run(num_workers=None, start_manager=True)

Execute PEST++ with parallel workers.

Parameters:

Name Type Description Default
num_workers int | None

Override number of workers (default: from config or os.cpu_count()).

None
start_manager bool

If True (default), start the manager on this machine (local mode). If False, start agents only for cluster mode.

True

Returns:

Type Description
'CalibrationResults'

CalibrationResults with parsed output.

Source code in mobidic/calibration/pest_setup.py
def run(self, num_workers: int | None = None, start_manager: bool = True) -> "CalibrationResults":
    """Execute PEST++ with parallel workers.

    Args:
        num_workers: Override number of workers (default: from config or os.cpu_count()).
        start_manager: If True (default), start the manager on this machine (local mode).
            If False, start agents only for cluster mode.

    Returns:
        CalibrationResults with parsed output.
    """
    import pyemu

    cfg = self.calib_config.parallel
    n_workers = num_workers or cfg.num_workers or os.cpu_count()
    is_cluster = cfg.manager_ip is not None and not start_manager

    wd = self.working_dir

    # Create a clean template directory containing only PEST files (no subdirectories).
    # This prevents recursive directory nesting: if worker_dir == worker_root, pyemu
    # copies the growing directory into each new worker, causing exponential growth.
    template_dir = wd / "_template"
    if template_dir.exists():
        shutil.rmtree(template_dir)
    template_dir.mkdir()
    for entry in wd.iterdir():
        if entry.is_file():
            shutil.copy2(entry, template_dir / entry.name)

    pst_rel_path = f"{self.calib_config.case_name}.pst"

    # Launch arguments common to both cluster and local modes.
    worker_kwargs = {
        "worker_dir": str(template_dir),
        "exe_rel_path": self._pest_exe,
        "pst_rel_path": pst_rel_path,
        "num_workers": n_workers,
        "worker_root": str(wd),
        "port": cfg.port,
    }

    if is_cluster:
        logger.info(f"Starting {n_workers} PEST++ agents connecting to {cfg.manager_ip}:{cfg.port}")
        pyemu.os_utils.start_workers(master_dir=None, local=cfg.manager_ip, **worker_kwargs)
    else:
        logger.info(f"Starting PEST++ ({self._pest_exe}) with {n_workers} local workers")
        pyemu.os_utils.start_workers(master_dir=str(wd / "master"), local=True, **worker_kwargs)

    # Clean up template directory
    shutil.rmtree(template_dir, ignore_errors=True)

    return self.load_results()

setup()

Create complete PEST++ working directory with all files.

Returns:

Type Description
Path

Path to the working directory.

Source code in mobidic/calibration/pest_setup.py
def setup(self) -> Path:
    """Create complete PEST++ working directory with all files.

    Orchestrates, in order: directory creation, MOBIDIC config loading,
    forcing-path resolution, period resolution and validation, an optional
    initial forward run, observation alignment, and generation of the PEST++
    template/instruction/runner files plus the .pst control file.

    Returns:
        Path to the working directory.
    """

    cc = self.calib_config
    wd = self.working_dir

    logger.info(f"Setting up PEST++ working directory: {wd}")

    # Create working directory
    wd.mkdir(parents=True, exist_ok=True)

    # Resolve MOBIDIC config path (relative paths resolve against base_path)
    mobidic_config_path = Path(cc.mobidic_config)
    if not mobidic_config_path.is_absolute():
        mobidic_config_path = self.base_path / mobidic_config_path

    # Load MOBIDIC config to get paths
    from mobidic.config import load_config

    mobidic_config = load_config(mobidic_config_path)
    gisdata_path = Path(mobidic_config.paths.gisdata)
    network_path = Path(mobidic_config.paths.network)

    # Determine forcing path: an explicit meteoraster wins; otherwise a raster
    # may be generated into the working dir, else fall back to point meteodata.
    if mobidic_config.paths.meteoraster:
        forcing_path = Path(mobidic_config.paths.meteoraster)
    elif cc.use_raster_forcing:
        # Will be generated by initial forward run
        forcing_path = wd / "forcing_raster.nc"
    else:
        forcing_path = Path(mobidic_config.paths.meteodata)

    # Determine calibration period (controls which observations are used)
    if cc.calibration_period:
        calib_start = cc.calibration_period.start_date
        calib_end = cc.calibration_period.end_date
    else:
        # Use first/last observation time
        all_obs = []
        for obs_group in cc.observations:
            obs_df = load_observations(obs_group, self.base_path)
            all_obs.append(obs_df)
        all_times = pd.concat(all_obs)["time"]
        calib_start = str(all_times.min())
        calib_end = str(all_times.max())

    # Determine simulation period (the full window the forward model runs,
    # including warm-up). Defaults to calibration period if not specified.
    if cc.simulation_period:
        sim_start = cc.simulation_period.start_date
        sim_end = cc.simulation_period.end_date
    else:
        sim_start = calib_start
        sim_end = calib_end

    # Validate simulation period against forcing time range
    self._validate_calibration_period(forcing_path, mobidic_config, sim_start, sim_end)

    # Run initial forward run if raster forcing needs generation
    if cc.use_raster_forcing and not forcing_path.exists():
        logger.info("Running initial forward run to generate raster forcing...")
        self._run_initial_forward(mobidic_config_path, mobidic_config, forcing_path, sim_start, sim_end)

    # Load observations and align to simulation timesteps
    # Observations are filtered to calibration period, but aligned to the
    # full simulation time grid (so sim_indices are correct)
    self._load_and_align_observations(sim_start, sim_end, calib_start, calib_end, mobidic_config)

    # Check if routing parameters are being calibrated (the forward runner
    # needs to know whether routing must be re-parameterized each run)
    routing_keys = {"parameters.routing.wcel", "parameters.routing.Br0", "parameters.routing.NBr"}
    routing_calibrated = any(p.parameter_key in routing_keys for p in cc.parameters)

    # 1. Generate template file
    generate_template_file(cc, wd / "model_input.csv.tpl")

    # 2. Generate initial model_input.csv
    generate_model_input_csv(cc, wd / "model_input.csv")

    # 3. Generate instruction file
    ins_path, obs_names = generate_instruction_file(cc, self._n_obs_per_group, wd / "model_output.csv.ins")

    # 4. Generate forward model runner script
    obs_data_json = json.dumps(self._obs_data)
    observation_reaches = [og.reach_id for og in cc.observations]

    generate_forward_run_script(
        base_config_path=mobidic_config_path,
        gisdata_path=gisdata_path,
        network_path=network_path,
        forcing_path=forcing_path,
        start_date=sim_start,
        end_date=sim_end,
        observation_reaches=observation_reaches,
        obs_data_json=obs_data_json,
        routing_params_calibrated=routing_calibrated,
        output_script_path=wd / "forward_run.py",
    )

    # 5. Build PEST control file via pyemu
    pst = self._build_pst(obs_names, wd)

    # Write .pst file
    pst_path = wd / f"{self.calib_config.case_name}.pst"
    pst_version = self.calib_config.pest_options.get("pst_version", 2)
    pst.write(str(pst_path), version=pst_version)
    logger.info(f"PEST control file written: {pst_path} (version={pst_version})")

    logger.success(f"PEST++ setup complete in {wd}")
    return wd

Calibration results

Container for parsed PEST++ calibration results.

Provides access to:

- Optimal parameter values
- Objective function history
- Residuals
- Parameter sensitivities (GLM/SEN)
- Ensemble statistics (IES)

Parameters:

Name Type Description Default
master_dir Path

Path to the PEST++ master directory.

required
calib_config CalibrationConfig

Calibration configuration.

required
Source code in mobidic/calibration/results.py
class CalibrationResults:
    """Container for parsed PEST++ calibration results.

    Provides access to:
    - Optimal parameter values
    - Objective function history
    - Residuals
    - Parameter sensitivities (GLM/SEN)
    - Ensemble statistics (IES)

    Args:
        master_dir: Path to the PEST++ master directory.
        calib_config: Calibration configuration.
    """

    def __init__(self, master_dir: Path, calib_config: CalibrationConfig):
        self.master_dir = Path(master_dir)
        self.calib_config = calib_config
        self._pst = None
        self._rec_data = None

    @classmethod
    def from_pest_output(cls, master_dir: Path, calib_config: CalibrationConfig) -> "CalibrationResults":
        """Create CalibrationResults from completed PEST++ output.

        Args:
            master_dir: Path to PEST++ master directory.
            calib_config: Calibration configuration.

        Returns:
            CalibrationResults object.
        """
        return cls(master_dir=master_dir, calib_config=calib_config)

    @property
    def pst(self):
        """Load the PEST control file."""
        if self._pst is None:
            import pyemu

            pst_path = self.master_dir / f"{self.calib_config.case_name}.pst"
            if pst_path.exists():
                self._pst = pyemu.Pst(str(pst_path))
            else:
                logger.warning(f"PST file not found: {pst_path}")
        return self._pst

    def get_optimal_parameters(self) -> dict[str, float]:
        """Get the optimal parameter values from the calibration.

        Returns:
            Dict mapping parameter name to optimal value.
        """
        # Try to read from .par file (final parameter values)
        par_files = sorted(self.master_dir.glob(f"{self.calib_config.case_name}.*.par"))
        if not par_files:
            par_files = sorted(self.master_dir.glob("*.par"))

        if par_files:
            # Use the last .par file
            par_file = par_files[-1]
            logger.info(f"Reading optimal parameters from: {par_file}")
            return self._parse_par_file(par_file)

        # Fallback: read from .pst parameter_data (initial values)
        if self.pst is not None:
            return dict(zip(self.pst.parameter_data.index, self.pst.parameter_data["parval1"]))

        return {}

    def get_objective_function_history(self) -> pd.DataFrame | None:
        """Get objective function values across iterations.

        Returns a DataFrame with at minimum ``iteration`` and ``phi`` columns.
        The source file and meaning of ``phi`` depend on the PEST++ tool:

        - ``glm``: reads ``calibration.iobj`` (CSV); ``phi`` = ``total_phi``.
        - ``ies``: reads ``calibration.phi.actual.csv``; ``phi`` = mean across
          ensemble members. Extra columns ``std`` and one column per member are
          also included.
        - other tools: reads ``calibration.rec``; ``phi`` = total phi extracted
          from the record file.

        Returns:
            DataFrame with iteration number and phi (objective function value),
            or None if the expected file is not found.
        """
        match self.calib_config.pest_tool:
            case "glm":
                iobj_path = self.master_dir / f"{self.calib_config.case_name}.iobj"
                if not iobj_path.exists():
                    logger.warning(f"iobj file not found: {iobj_path}")
                    return None
                return self._parse_iobj_phi(iobj_path)

            case "ies":
                phi_path = self.master_dir / f"{self.calib_config.case_name}.phi.actual.csv"
                if not phi_path.exists():
                    logger.warning(f"IES phi file not found: {phi_path}")
                    return None
                return self._parse_ies_phi(phi_path)

            case _:
                rec_path = self.master_dir / f"{self.calib_config.case_name}.rec"
                if not rec_path.exists():
                    logger.warning(f"Record file not found: {rec_path}")
                    return None
                return self._parse_rec_phi(rec_path)

    def get_residuals(self) -> pd.DataFrame | None:
        """Get observation residuals (simulated - observed) from the final iteration.

        Returns:
            DataFrame with obs_name, observed, simulated, residual, weight columns,
            or None if .rei file not found.
        """
        rei_files = sorted(self.master_dir.glob(f"{self.calib_config.case_name}.*.rei"))
        if not rei_files:
            rei_files = sorted(self.master_dir.glob("*.rei"))

        if not rei_files:
            logger.warning("No .rei (residuals) files found")
            return None

        rei_file = rei_files[-1]
        logger.info(f"Reading residuals from: {rei_file}")
        return self._parse_rei_file(rei_file)

    def get_parameter_sensitivities(self) -> pd.DataFrame | None:
        """Get parameter sensitivities.

        - ``sen``: reads ``{case}.msn`` (Morris sensitivity); returns all columns
          (par_name, n_samples, sen_mean, sen_mean_abs, sen_std_dev).
        - ``glm``: loads the Jacobian (``.jcb`` / ``.jco``) via pyemu and computes
          composite sensitivity (column-wise L2 norm).

        Returns:
            DataFrame with sensitivity information, or None if not available.
        """
        match self.calib_config.pest_tool:
            case "sen":
                msn_path = self.master_dir / f"{self.calib_config.case_name}.msn"
                if not msn_path.exists():
                    logger.warning(f"Morris sensitivity file not found: {msn_path}")
                    return None
                df = pd.read_csv(msn_path)
                logger.info(f"Read Morris sensitivities from {msn_path}: {len(df)} parameters")
                return df

            case "glm":
                jco_path = self.master_dir / f"{self.calib_config.case_name}.jcb"
                if not jco_path.exists():
                    jco_path = self.master_dir / f"{self.calib_config.case_name}.jco"
                if not jco_path.exists():
                    logger.warning(f"Jacobian file not found in {self.master_dir}")
                    return None
                import pyemu

                jco = pyemu.Jco.from_binary(str(jco_path))
                sens = pd.DataFrame(
                    {
                        "parameter": jco.col_names,
                        "sensitivity": np.sqrt(np.asarray((jco.x**2).sum(axis=0)).ravel()),
                    }
                )
                logger.info(f"Computed GLM composite sensitivities from {jco_path}: {len(sens)} parameters")
                return sens.sort_values("sensitivity", ascending=False).reset_index(drop=True)

            case _:
                logger.warning(f"Sensitivity output not supported for pest_tool='{self.calib_config.pest_tool}'")
                return None

    def get_ensemble_results(self) -> dict | None:
        """Get IES ensemble results (prior and posterior).

        Returns:
            Dict with 'prior_parameters', 'posterior_parameters',
            'prior_observations', 'posterior_observations' DataFrames,
            or None if not available.
        """

        results = {}

        # Parameter ensembles
        prior_par = self.master_dir / f"{self.calib_config.case_name}.0.par.csv"
        if prior_par.exists():
            results["prior_parameters"] = pd.read_csv(prior_par, index_col=0)

        # Find last iteration's parameter ensemble
        par_csvs = sorted(self.master_dir.glob(f"{self.calib_config.case_name}.*.par.csv"))
        if par_csvs:
            last_par = par_csvs[-1]
            results["posterior_parameters"] = pd.read_csv(last_par, index_col=0)

        # Observation ensembles
        prior_obs = self.master_dir / f"{self.calib_config.case_name}.0.obs.csv"
        if prior_obs.exists():
            results["prior_observations"] = pd.read_csv(prior_obs, index_col=0)

        obs_csvs = sorted(self.master_dir.glob(f"{self.calib_config.case_name}.*.obs.csv"))
        if obs_csvs:
            last_obs = obs_csvs[-1]
            results["posterior_observations"] = pd.read_csv(last_obs, index_col=0)

        if not results:
            return None
        return results

    def _parse_par_file(self, par_file: Path) -> dict[str, float]:
        """Parse a PEST .par file to extract parameter values."""
        params = {}
        with open(par_file, "r", encoding="utf-8") as f:
            lines = f.readlines()
        # Skip header line ("single point")
        for line in lines[1:]:
            parts = line.strip().split()
            if len(parts) >= 2:
                name = parts[0]
                value = float(parts[1])
                params[name] = value
        return params

    def _parse_ies_phi(self, phi_path: Path) -> pd.DataFrame:
        """Parse objective function history from IES calibration.phi.actual.csv."""
        df = pd.read_csv(phi_path, index_col=0)
        df.index.name = "iteration"
        df = df.reset_index()
        member_cols = [c for c in df.columns if c != "iteration"]
        df["phi"] = df[member_cols].mean(axis=1)
        df["std"] = df[member_cols].std(axis=1)
        logger.info(f"Read IES phi history from {phi_path}: {len(df)} iterations, {len(member_cols)} ensemble members")
        return df

    def _parse_iobj_phi(self, iobj_path: Path) -> pd.DataFrame:
        """Parse objective function history from GLM .iobj CSV file."""
        df = pd.read_csv(iobj_path)
        result = df[["iteration", "total_phi"]].rename(columns={"total_phi": "phi"})
        logger.info(f"Read GLM phi history from {iobj_path}: {len(result)} iterations")
        return result

    def _parse_rec_phi(self, rec_path: Path) -> pd.DataFrame:
        """Parse objective function history from .rec file."""
        iterations = []
        phis = []
        with open(rec_path, "r", encoding="utf-8") as f:
            for line in f:
                line_stripped = line.strip().lower()
                if "starting phi for this iteration" in line_stripped or "total phi" in line_stripped:
                    # Try to extract the phi value
                    parts = line.strip().split()
                    for i, part in enumerate(parts):
                        try:
                            phi = float(part)
                            phis.append(phi)
                            iterations.append(len(phis))
                            break
                        except ValueError:
                            continue

        return pd.DataFrame({"iteration": iterations, "phi": phis})

    def _parse_rei_file(self, rei_file: Path) -> pd.DataFrame:
        """Parse a PEST .rei residuals file."""
        rows = []
        with open(rei_file, "r", encoding="utf-8") as f:
            lines = f.readlines()

        # Find data start (after header)
        data_start = 0
        for i, line in enumerate(lines):
            if line.strip().startswith("Name"):
                data_start = i + 1
                break

        for line in lines[data_start:]:
            parts = line.strip().split()
            if len(parts) >= 6:
                rows.append(
                    {
                        "obs_name": parts[0],
                        "group": parts[1],
                        "observed": float(parts[2]),
                        "simulated": float(parts[3]),
                        "residual": float(parts[4]),
                        "weight": float(parts[5]),
                    }
                )

        return pd.DataFrame(rows) if rows else None

pst property

Load the PEST control file.

from_pest_output(master_dir, calib_config) classmethod

Create CalibrationResults from completed PEST++ output.

Parameters:

Name Type Description Default
master_dir Path

Path to PEST++ master directory.

required
calib_config CalibrationConfig

Calibration configuration.

required

Returns:

Type Description
CalibrationResults

CalibrationResults object.

Source code in mobidic/calibration/results.py
@classmethod
def from_pest_output(cls, master_dir: Path, calib_config: CalibrationConfig) -> "CalibrationResults":
    """Build a results container pointing at a finished PEST++ run.

    Args:
        master_dir: Path to PEST++ master directory.
        calib_config: Calibration configuration.

    Returns:
        CalibrationResults object.
    """
    results = cls(master_dir=master_dir, calib_config=calib_config)
    return results

get_ensemble_results()

Get IES ensemble results (prior and posterior).

Returns:

Type Description
dict | None

Dict with ‘prior_parameters’, ‘posterior_parameters’,

dict | None

‘prior_observations’, ‘posterior_observations’ DataFrames,

dict | None

or None if not available.

Source code in mobidic/calibration/results.py
def get_ensemble_results(self) -> dict | None:
    """Get IES ensemble results (prior and posterior).

    Returns:
        Dict with 'prior_parameters', 'posterior_parameters',
        'prior_observations', 'posterior_observations' DataFrames,
        or None if not available.
    """

    results = {}

    # Parameter ensembles
    prior_par = self.master_dir / f"{self.calib_config.case_name}.0.par.csv"
    if prior_par.exists():
        results["prior_parameters"] = pd.read_csv(prior_par, index_col=0)

    # Find last iteration's parameter ensemble
    par_csvs = sorted(self.master_dir.glob(f"{self.calib_config.case_name}.*.par.csv"))
    if par_csvs:
        last_par = par_csvs[-1]
        results["posterior_parameters"] = pd.read_csv(last_par, index_col=0)

    # Observation ensembles
    prior_obs = self.master_dir / f"{self.calib_config.case_name}.0.obs.csv"
    if prior_obs.exists():
        results["prior_observations"] = pd.read_csv(prior_obs, index_col=0)

    obs_csvs = sorted(self.master_dir.glob(f"{self.calib_config.case_name}.*.obs.csv"))
    if obs_csvs:
        last_obs = obs_csvs[-1]
        results["posterior_observations"] = pd.read_csv(last_obs, index_col=0)

    if not results:
        return None
    return results

get_objective_function_history()

Get objective function values across iterations.

Returns a DataFrame with at minimum iteration and phi columns. The source file and meaning of phi depend on the PEST++ tool:

  • glm: reads {case}.iobj (CSV); phi = total_phi.
  • ies: reads {case}.phi.actual.csv; phi = mean across ensemble members. Extra columns std and one column per member are also included.
  • other tools: reads {case}.rec; phi = total phi extracted from the record file.

Returns:

Type Description
DataFrame | None

DataFrame with iteration number and phi (objective function value),

DataFrame | None

or None if the expected file is not found.

Source code in mobidic/calibration/results.py
def get_objective_function_history(self) -> pd.DataFrame | None:
    """Get objective function values across iterations.

    The returned DataFrame always carries ``iteration`` and ``phi`` columns;
    which file is read and what ``phi`` means depends on the PEST++ tool:

    - ``glm``: the ``.iobj`` CSV, with ``phi`` taken from ``total_phi``.
    - ``ies``: the ``.phi.actual.csv`` file, with ``phi`` the ensemble mean
      (plus ``std`` and one column per member).
    - any other tool: the ``.rec`` record file, scraped for total phi lines.

    Returns:
        DataFrame with iteration number and phi (objective function value),
        or None if the expected file is not found.
    """
    tool = self.calib_config.pest_tool
    case = self.calib_config.case_name

    if tool == "glm":
        iobj_path = self.master_dir / f"{case}.iobj"
        if iobj_path.exists():
            return self._parse_iobj_phi(iobj_path)
        logger.warning(f"iobj file not found: {iobj_path}")
        return None

    if tool == "ies":
        phi_path = self.master_dir / f"{case}.phi.actual.csv"
        if phi_path.exists():
            return self._parse_ies_phi(phi_path)
        logger.warning(f"IES phi file not found: {phi_path}")
        return None

    # Generic fallback for the remaining tools
    rec_path = self.master_dir / f"{case}.rec"
    if rec_path.exists():
        return self._parse_rec_phi(rec_path)
    logger.warning(f"Record file not found: {rec_path}")
    return None

get_optimal_parameters()

Get the optimal parameter values from the calibration.

Returns:

Type Description
dict[str, float]

Dict mapping parameter name to optimal value.

Source code in mobidic/calibration/results.py
def get_optimal_parameters(self) -> dict[str, float]:
    """Get the optimal parameter values from the calibration.

    Returns:
        Dict mapping parameter name to optimal value.
    """

    def _iter_key(path):
        # PEST++ writes per-iteration files "case.<iter>.par"; sort on the
        # embedded iteration number so iteration 10 sorts after iteration 2
        # (plain lexicographic order would not, picking the wrong final file).
        digits = [int(part) for part in path.name.split(".") if part.isdigit()]
        return (digits[0] if digits else -1, path.name)

    # Try to read from .par file (final parameter values)
    par_files = sorted(self.master_dir.glob(f"{self.calib_config.case_name}.*.par"), key=_iter_key)
    if not par_files:
        par_files = sorted(self.master_dir.glob("*.par"), key=_iter_key)

    if par_files:
        # Use the highest-iteration .par file
        par_file = par_files[-1]
        logger.info(f"Reading optimal parameters from: {par_file}")
        return self._parse_par_file(par_file)

    # Fallback: read from .pst parameter_data (initial values)
    if self.pst is not None:
        return dict(zip(self.pst.parameter_data.index, self.pst.parameter_data["parval1"]))

    return {}

get_parameter_sensitivities()

Get parameter sensitivities.

  • sen: reads {case}.msn (Morris sensitivity); returns all columns (par_name, n_samples, sen_mean, sen_mean_abs, sen_std_dev).
  • glm: loads the Jacobian (.jcb / .jco) via pyemu and computes composite sensitivity (column-wise L2 norm).

Returns:

Type Description
DataFrame | None

DataFrame with sensitivity information, or None if not available.

Source code in mobidic/calibration/results.py
def get_parameter_sensitivities(self) -> pd.DataFrame | None:
    """Get parameter sensitivities.

    - ``sen``: the ``{case}.msn`` Morris output is returned as-is
      (par_name, n_samples, sen_mean, sen_mean_abs, sen_std_dev).
    - ``glm``: the Jacobian (``.jcb`` preferred, ``.jco`` fallback) is loaded
      with pyemu and reduced to a composite sensitivity per parameter
      (column-wise L2 norm), sorted descending.

    Returns:
        DataFrame with sensitivity information, or None if not available.
    """
    tool = self.calib_config.pest_tool
    case = self.calib_config.case_name

    if tool == "sen":
        msn_path = self.master_dir / f"{case}.msn"
        if not msn_path.exists():
            logger.warning(f"Morris sensitivity file not found: {msn_path}")
            return None
        df = pd.read_csv(msn_path)
        logger.info(f"Read Morris sensitivities from {msn_path}: {len(df)} parameters")
        return df

    if tool == "glm":
        # Prefer the .jcb binary; fall back to the older .jco form
        jco_path = self.master_dir / f"{case}.jcb"
        if not jco_path.exists():
            jco_path = self.master_dir / f"{case}.jco"
        if not jco_path.exists():
            logger.warning(f"Jacobian file not found in {self.master_dir}")
            return None
        import pyemu

        jco = pyemu.Jco.from_binary(str(jco_path))
        composite = np.sqrt(np.asarray((jco.x**2).sum(axis=0)).ravel())
        sens = pd.DataFrame({"parameter": jco.col_names, "sensitivity": composite})
        logger.info(f"Computed GLM composite sensitivities from {jco_path}: {len(sens)} parameters")
        return sens.sort_values("sensitivity", ascending=False).reset_index(drop=True)

    logger.warning(f"Sensitivity output not supported for pest_tool='{self.calib_config.pest_tool}'")
    return None

get_residuals()

Get observation residuals (simulated - observed) from the final iteration.

Returns:

Type Description
DataFrame | None

DataFrame with obs_name, observed, simulated, residual, weight columns,

DataFrame | None

or None if .rei file not found.

Source code in mobidic/calibration/results.py
def get_residuals(self) -> pd.DataFrame | None:
    """Get observation residuals (simulated - observed) from the final iteration.

    Returns:
        DataFrame with obs_name, observed, simulated, residual, weight columns,
        or None if .rei file not found.
    """

    def _iter_key(path):
        # PEST++ writes per-iteration files "case.<iter>.rei"; sort on the
        # embedded iteration number so iteration 10 follows iteration 2
        # (lexicographic sort would make [-1] pick the wrong final file).
        digits = [int(part) for part in path.name.split(".") if part.isdigit()]
        return (digits[0] if digits else -1, path.name)

    rei_files = sorted(self.master_dir.glob(f"{self.calib_config.case_name}.*.rei"), key=_iter_key)
    if not rei_files:
        rei_files = sorted(self.master_dir.glob("*.rei"), key=_iter_key)

    if not rei_files:
        logger.warning("No .rei (residuals) files found")
        return None

    # Highest-iteration .rei file holds the final residuals
    rei_file = rei_files[-1]
    logger.info(f"Reading residuals from: {rei_file}")
    return self._parse_rei_file(rei_file)

Configuration

Bases: BaseModel

Complete calibration configuration for PEST++ integration.

Source code in mobidic/calibration/config.py
class CalibrationConfig(BaseModel):
    """Complete calibration configuration for PEST++ integration.

    Top-level model loaded from the calibration YAML. It wires together the
    MOBIDIC simulation config, the parameters to calibrate, the observation
    groups, and the PEST++ tool/options. Cross-field constraints are enforced
    by the ``model_validator`` methods below.
    """

    # Path to the MOBIDIC simulation YAML (required)
    mobidic_config: str = Field(..., description="Path to MOBIDIC simulation YAML config file")

    # Optional forward-model window; may extend before calibration_period for warm-up
    simulation_period: Optional[CalibrationPeriod] = Field(
        None,
        description="Simulation period (start_date, end_date) for the forward model. "
        "Can be longer than calibration_period to include warm-up. "
        "Must be contained within the forcing data time range. "
        "If None, defaults to calibration_period.",
    )

    # Optional window restricting which observations PEST++ actually uses
    calibration_period: Optional[CalibrationPeriod] = Field(
        None,
        description="Calibration period (start_date, end_date): only observations within "
        "this window are used by PEST++. Must be contained within simulation_period. "
        "If None, defaults to full observation period.",
    )

    use_raster_forcing: bool = Field(
        False,
        description="If True, make a first forward run to rasterize station forcing for faster subsequent runs",
    )

    # At least one calibration parameter and one observation group are required
    parameters: list[CalibrationParameter] = Field(..., min_length=1, description="List of parameters to calibrate")

    observations: list[ObservationGroup] = Field(..., min_length=1, description="List of observation groups")

    # Which PEST++ executable drives the run (pestpp-glm, pestpp-ies, ...)
    pest_tool: Literal["glm", "ies", "sen", "da", "opt", "mou", "sqp"] = Field("glm", description="PEST++ tool to use")

    # Defaulted by set_case_name_default() when left as None
    case_name: Optional[str] = Field(
        None,
        description="File name prefix for PEST++ output files (e.g. 'calibration', 'sensitivity'). "
        "Defaults to 'sensitivity' when pest_tool='sen', 'calibration' otherwise.",
    )

    # Free-form passthrough of PEST++ control/tool options
    pest_options: Optional[dict] = Field(
        default_factory=dict,
        description="PEST++ and tool-specific options (e.g., noptmax, "
        "relparmax, facparmax, pst_version, ies_num_reals)",
    )

    working_dir: str = Field("pest_run", description="Working directory for PEST++ files")

    parallel: Optional[ParallelConfig] = Field(default_factory=ParallelConfig)

    @model_validator(mode="after")
    def set_case_name_default(self) -> "CalibrationConfig":
        """Default case_name to 'sensitivity' for pestpp-sen, 'calibration' otherwise."""
        if self.case_name is None:
            self.case_name = "sensitivity" if self.pest_tool == "sen" else "calibration"
        return self

    @model_validator(mode="after")
    def check_parameter_names_unique(self) -> "CalibrationConfig":
        """Ensure all parameter names are unique."""
        names = [p.name for p in self.parameters]
        if len(names) != len(set(names)):
            duplicates = [n for n in names if names.count(n) > 1]
            raise ValueError(f"Duplicate parameter names: {set(duplicates)}")
        return self

    @model_validator(mode="after")
    def check_observation_names_unique(self) -> "CalibrationConfig":
        """Ensure all observation group names are unique."""
        names = [o.name for o in self.observations]
        if len(names) != len(set(names)):
            duplicates = [n for n in names if names.count(n) > 1]
            raise ValueError(f"Duplicate observation group names: {set(duplicates)}")
        return self

    @model_validator(mode="after")
    def check_calibration_within_simulation(self) -> "CalibrationConfig":
        """Ensure calibration_period is contained within simulation_period.

        If simulation_period is not set, it defaults to calibration_period at runtime.
        But if both are set, calibration must be within simulation.
        """
        # Only validate when both windows are explicitly configured
        if self.simulation_period is not None and self.calibration_period is not None:
            sim_start = pd.Timestamp(self.simulation_period.start_date)
            sim_end = pd.Timestamp(self.simulation_period.end_date)
            cal_start = pd.Timestamp(self.calibration_period.start_date)
            cal_end = pd.Timestamp(self.calibration_period.end_date)

            if cal_start < sim_start:
                raise ValueError(
                    f"calibration_period.start_date ({self.calibration_period.start_date}) "
                    f"must be >= simulation_period.start_date ({self.simulation_period.start_date})"
                )
            if cal_end > sim_end:
                raise ValueError(
                    f"calibration_period.end_date ({self.calibration_period.end_date}) "
                    f"must be <= simulation_period.end_date ({self.simulation_period.end_date})"
                )
        return self

check_calibration_within_simulation()

Ensure calibration_period is contained within simulation_period.

If simulation_period is not set, it defaults to calibration_period at runtime. But if both are set, calibration must be within simulation.

Source code in mobidic/calibration/config.py
@model_validator(mode="after")
def check_calibration_within_simulation(self) -> "CalibrationConfig":
    """Ensure calibration_period is contained within simulation_period.

    If simulation_period is not set, it defaults to calibration_period at runtime.
    But if both are set, calibration must be within simulation.
    """
    sim = self.simulation_period
    cal = self.calibration_period
    # Nothing to cross-check unless both windows are explicitly configured
    if sim is None or cal is None:
        return self

    if pd.Timestamp(cal.start_date) < pd.Timestamp(sim.start_date):
        raise ValueError(
            f"calibration_period.start_date ({self.calibration_period.start_date}) "
            f"must be >= simulation_period.start_date ({self.simulation_period.start_date})"
        )
    if pd.Timestamp(cal.end_date) > pd.Timestamp(sim.end_date):
        raise ValueError(
            f"calibration_period.end_date ({self.calibration_period.end_date}) "
            f"must be <= simulation_period.end_date ({self.simulation_period.end_date})"
        )
    return self

check_observation_names_unique()

Ensure all observation group names are unique.

Source code in mobidic/calibration/config.py
@model_validator(mode="after")
def check_observation_names_unique(self) -> "CalibrationConfig":
    """Ensure all observation group names are unique."""
    names = [o.name for o in self.observations]
    duplicates = {n for n in names if names.count(n) > 1}
    if duplicates:
        raise ValueError(f"Duplicate observation group names: {duplicates}")
    return self

check_parameter_names_unique()

Ensure all parameter names are unique.

Source code in mobidic/calibration/config.py
@model_validator(mode="after")
def check_parameter_names_unique(self) -> "CalibrationConfig":
    """Ensure all parameter names are unique."""
    names = [p.name for p in self.parameters]
    duplicates = {n for n in names if names.count(n) > 1}
    if duplicates:
        raise ValueError(f"Duplicate parameter names: {duplicates}")
    return self

set_case_name_default()

Default case_name to ‘sensitivity’ for pestpp-sen, ‘calibration’ otherwise.

Source code in mobidic/calibration/config.py
@model_validator(mode="after")
def set_case_name_default(self) -> "CalibrationConfig":
    """Default case_name to 'sensitivity' for pestpp-sen, 'calibration' otherwise."""
    if self.case_name is None:
        if self.pest_tool == "sen":
            self.case_name = "sensitivity"
        else:
            self.case_name = "calibration"
    return self

Bases: BaseModel

A single parameter to be calibrated by PEST++.

Source code in mobidic/calibration/config.py
class CalibrationParameter(BaseModel):
    """A single parameter to be calibrated by PEST++.

    Each instance maps one adjustable value in the MOBIDIC YAML config
    (addressed by ``parameter_key``) to a PEST++ parameter entry with
    bounds, transform, and group metadata.
    """

    # Identifier written into the PEST++ control file (no spaces allowed)
    name: str = Field(..., description="Parameter name (used in PEST++ control file)")
    parameter_key: str = Field(
        ..., description="Dot-notation path into MOBIDIC YAML config (e.g., parameters.multipliers.ks_factor)"
    )
    initial_value: float = Field(..., description="Starting value for optimization")
    lower_bound: float = Field(..., description="Lower bound for parameter")
    upper_bound: float = Field(..., description="Upper bound for parameter")
    # 'fixed' keeps the parameter at initial_value during estimation
    transform: Literal["none", "log", "fixed"] = Field("none", description="Parameter transformation")
    scale: float = Field(1.0, description="Multiplication factor applied by PEST++")
    offset: float = Field(0.0, description="Additive offset applied by PEST++")
    par_group: str = Field("default", description="Parameter group name")

    @field_validator("name")
    @classmethod
    def check_name_no_spaces(cls, v: str) -> str:
        """PEST++ parameter names cannot contain spaces."""
        if " " in v:
            raise ValueError("Parameter name cannot contain spaces")
        return v

    @model_validator(mode="after")
    def check_bounds(self) -> "CalibrationParameter":
        """Validate that lower_bound < upper_bound and initial_value is within bounds."""
        if self.lower_bound >= self.upper_bound:
            raise ValueError(f"lower_bound ({self.lower_bound}) must be less than upper_bound ({self.upper_bound})")
        if not self.lower_bound <= self.initial_value <= self.upper_bound:
            raise ValueError(
                f"initial_value ({self.initial_value}) must be between "
                f"lower_bound ({self.lower_bound}) and upper_bound ({self.upper_bound})"
            )
        # log transform requires a strictly positive domain
        if self.transform == "log" and self.lower_bound <= 0:
            raise ValueError("lower_bound must be positive when transform='log'")
        return self

check_bounds()

Validate that lower_bound < upper_bound and initial_value is within bounds.

Source code in mobidic/calibration/config.py
@model_validator(mode="after")
def check_bounds(self) -> "CalibrationParameter":
    """Validate that lower_bound < upper_bound and initial_value is within bounds."""
    lo, hi = self.lower_bound, self.upper_bound
    if lo >= hi:
        raise ValueError(f"lower_bound ({self.lower_bound}) must be less than upper_bound ({self.upper_bound})")
    if not (lo <= self.initial_value <= hi):
        raise ValueError(
            f"initial_value ({self.initial_value}) must be between "
            f"lower_bound ({self.lower_bound}) and upper_bound ({self.upper_bound})"
        )
    # A log transform needs a strictly positive domain
    if self.transform == "log" and lo <= 0:
        raise ValueError("lower_bound must be positive when transform='log'")
    return self

check_name_no_spaces(v) classmethod

PEST++ parameter names cannot contain spaces.

Source code in mobidic/calibration/config.py
@field_validator("name")
@classmethod
def check_name_no_spaces(cls, v: str) -> str:
    """PEST++ parameter names cannot contain spaces."""
    if v.count(" ") > 0:
        raise ValueError("Parameter name cannot contain spaces")
    return v

Bases: BaseModel

An observation group (e.g., discharge at a specific gauging station).

Source code in mobidic/calibration/config.py
class ObservationGroup(BaseModel):
    """An observation group (e.g., discharge at a specific gauging station)."""

    # Required fields (Field(..., ...)) must be supplied in the YAML config;
    # `weight` and `time_column` have defaults, `metrics` is optional.
    name: str = Field(..., description="PEST++ observation group identifier (prefix for obs names)")
    obs_file: str = Field(..., description="Path to observed data CSV file (relative to calibration config)")
    reach_id: int = Field(..., description="Reach ID (mobidic_id) where observations are located")
    weight: float = Field(1.0, description="Default weight for all observations in this group")
    time_column: str = Field("time", description="Column name for timestamps in obs_file")
    value_column: str = Field(..., description="Column name for observed values in obs_file")
    metrics: Optional[list[MetricConfig]] = Field(None, description="Optional derived metrics as pseudo-observations")

    @field_validator("name")
    @classmethod
    def check_name_no_spaces(cls, v: str) -> str:
        """PEST++ observation names cannot contain spaces."""
        if " " in v:
            raise ValueError("Observation group name cannot contain spaces")
        return v

    @field_validator("weight")
    @classmethod
    def check_weight_non_negative(cls, v: float) -> float:
        """Weights must be non-negative."""
        if v < 0:
            raise ValueError("Weight must be non-negative")
        return v

check_name_no_spaces(v) classmethod

PEST++ observation names cannot contain spaces.

Source code in mobidic/calibration/config.py
@field_validator("name")
@classmethod
def check_name_no_spaces(cls, v: str) -> str:
    """PEST++ observation names cannot contain spaces."""
    if any(ch == " " for ch in v):
        raise ValueError("Observation group name cannot contain spaces")
    return v

check_weight_non_negative(v) classmethod

Weights must be non-negative.

Source code in mobidic/calibration/config.py
@field_validator("weight")
@classmethod
def check_weight_non_negative(cls, v: float) -> float:
    """Weights must be non-negative."""
    is_negative = v < 0
    if is_negative:
        raise ValueError("Weight must be non-negative")
    return v

Bases: BaseModel

Configuration for a derived metric used as pseudo-observation.

Source code in mobidic/calibration/config.py
class MetricConfig(BaseModel):
    """Configuration for a derived metric used as pseudo-observation."""

    # All three fields are set from the calibration YAML; `weight` defaults to 1.0.
    metric: str = Field(..., description="Metric name (nse, nse_log, pbias, peak_error, rmse, kge)")
    target: float = Field(..., description="Target value PEST++ tries to match (e.g., 1.0 for NSE)")
    weight: float = Field(1.0, description="Observation weight for this metric")

    @field_validator("metric")
    @classmethod
    def check_metric_name(cls, v: str) -> str:
        """Validate metric name is supported."""
        # Must stay in sync with the metrics exposed by mobidic.calibration.metrics.
        supported = {"nse", "nse_log", "pbias", "peak_error", "rmse", "kge"}
        if v not in supported:
            raise ValueError(f"Unsupported metric '{v}'. Supported: {sorted(supported)}")
        return v

    @field_validator("weight")
    @classmethod
    def check_weight_non_negative(cls, v: float) -> float:
        """Weights must be non-negative."""
        if v < 0:
            raise ValueError("Weight must be non-negative")
        return v

check_metric_name(v) classmethod

Validate metric name is supported.

Source code in mobidic/calibration/config.py
@field_validator("metric")
@classmethod
def check_metric_name(cls, v: str) -> str:
    """Validate metric name is supported."""
    allowed = {"nse", "nse_log", "pbias", "peak_error", "rmse", "kge"}
    if v in allowed:
        return v
    raise ValueError(f"Unsupported metric '{v}'. Supported: {sorted(allowed)}")

check_weight_non_negative(v) classmethod

Weights must be non-negative.

Source code in mobidic/calibration/config.py
@field_validator("weight")
@classmethod
def check_weight_non_negative(cls, v: float) -> float:
    """Weights must be non-negative."""
    # Reject strictly negative weights; zero is allowed (disables the obs).
    if v < 0:
        raise ValueError("Weight must be non-negative")
    return v

Bases: BaseModel

A date range with start and end dates.

Used for both calibration_period and simulation_period.

Source code in mobidic/calibration/config.py
class CalibrationPeriod(BaseModel):
    """A date range with start and end dates.

    Used for both calibration_period and simulation_period.
    """

    # Dates are kept as strings (validated below) and parsed with pd.Timestamp
    # where needed.
    start_date: str = Field(..., description="Start date (YYYY-MM-DD or YYYY-MM-DD HH:MM:SS)")
    end_date: str = Field(..., description="End date (YYYY-MM-DD or YYYY-MM-DD HH:MM:SS)")

    @field_validator("start_date", "end_date")
    @classmethod
    def check_date_format(cls, v: str) -> str:
        """Validate that date strings are parseable as YYYY-MM-DD or YYYY-MM-DD HH:MM:SS."""
        import re

        # Regex rejects wrong shapes; pd.Timestamp rejects impossible dates
        # (e.g. month 13) that still match the pattern.
        if not re.match(r"^\d{4}-\d{2}-\d{2}( \d{2}:\d{2}:\d{2})?$", v):
            raise ValueError(f"Invalid date format: '{v}'. Expected 'YYYY-MM-DD' or 'YYYY-MM-DD HH:MM:SS'.")
        try:
            pd.Timestamp(v)
        except ValueError as e:
            raise ValueError(f"Invalid date format: '{v}'. Expected 'YYYY-MM-DD' or 'YYYY-MM-DD HH:MM:SS'.") from e
        return v

    @model_validator(mode="after")
    def check_start_before_end(self) -> "CalibrationPeriod":
        """Validate that start_date < end_date."""
        start = pd.Timestamp(self.start_date)
        end = pd.Timestamp(self.end_date)
        if start >= end:
            raise ValueError(f"start_date ({self.start_date}) must be before end_date ({self.end_date})")
        return self

check_date_format(v) classmethod

Validate that date strings are parseable as YYYY-MM-DD or YYYY-MM-DD HH:MM:SS.

Source code in mobidic/calibration/config.py
@field_validator("start_date", "end_date")
@classmethod
def check_date_format(cls, v: str) -> str:
    """Validate that date strings are parseable as YYYY-MM-DD or YYYY-MM-DD HH:MM:SS."""
    import re

    message = f"Invalid date format: '{v}'. Expected 'YYYY-MM-DD' or 'YYYY-MM-DD HH:MM:SS'."
    # Shape check first; then let pandas reject impossible calendar dates.
    if re.match(r"^\d{4}-\d{2}-\d{2}( \d{2}:\d{2}:\d{2})?$", v) is None:
        raise ValueError(message)
    try:
        pd.Timestamp(v)
    except ValueError as exc:
        raise ValueError(message) from exc
    return v

check_start_before_end()

Validate that start_date < end_date.

Source code in mobidic/calibration/config.py
@model_validator(mode="after")
def check_start_before_end(self) -> "CalibrationPeriod":
    """Validate that start_date < end_date."""
    if pd.Timestamp(self.start_date) >= pd.Timestamp(self.end_date):
        raise ValueError(f"start_date ({self.start_date}) must be before end_date ({self.end_date})")
    return self

Load and validate calibration configuration from YAML file.

Parameters:

Name Type Description Default
config_path str | Path

Path to the calibration YAML file.

required

Returns:

Type Description
CalibrationConfig

Validated CalibrationConfig object.

Source code in mobidic/calibration/config.py
def load_calibration_config(config_path: str | Path) -> CalibrationConfig:
    """Load and validate calibration configuration from YAML file.

    Args:
        config_path: Path to the calibration YAML file.

    Returns:
        Validated CalibrationConfig object.

    Raises:
        FileNotFoundError: If the config file does not exist.
        ValueError: If the YAML file is empty or its top level is not a mapping.
    """
    import yaml

    config_path = Path(config_path)
    if not config_path.exists():
        raise FileNotFoundError(f"Calibration config file not found: {config_path}")

    with open(config_path, "r", encoding="utf-8") as f:
        config_dict = yaml.safe_load(f)

    # yaml.safe_load returns None for an empty file, and may return a scalar
    # or list for malformed configs; fail early with a clear message instead
    # of letting CalibrationConfig(**config_dict) raise an opaque TypeError.
    if not isinstance(config_dict, dict):
        raise ValueError(
            f"Calibration config must be a YAML mapping, "
            f"got {type(config_dict).__name__}: {config_path}"
        )

    return CalibrationConfig(**config_dict)

Observations

Load observed data from CSV file for an observation group.

Parameters:

Name Type Description Default
obs_group ObservationGroup

Observation group configuration.

required
base_path Path

Base directory for resolving relative file paths.

required
start_date str | Timestamp | None

Optional start date to filter observations.

None
end_date str | Timestamp | None

Optional end date to filter observations.

None

Returns:

Type Description
DataFrame

DataFrame with ‘time’ (datetime) and ‘value’ columns, sorted by time.

Source code in mobidic/calibration/observation.py
def load_observations(
    obs_group: ObservationGroup,
    base_path: Path,
    start_date: str | pd.Timestamp | None = None,
    end_date: str | pd.Timestamp | None = None,
) -> pd.DataFrame:
    """Load observed data from CSV file for an observation group.

    Args:
        obs_group: Observation group configuration.
        base_path: Base directory for resolving relative file paths.
        start_date: Optional start date to filter observations.
        end_date: Optional end date to filter observations.

    Returns:
        DataFrame with 'time' (datetime) and 'value' columns, sorted by time.
    """
    # Resolve the observation file relative to the calibration config.
    obs_path = Path(obs_group.obs_file)
    if not obs_path.is_absolute():
        obs_path = base_path / obs_path
    if not obs_path.exists():
        raise FileNotFoundError(f"Observation file not found: {obs_path}")

    logger.info(f"Loading observations from: {obs_path}")
    raw = pd.read_csv(obs_path)

    # Fail fast when the configured columns are missing from the CSV.
    if obs_group.time_column not in raw.columns:
        raise ValueError(f"Time column '{obs_group.time_column}' not found in {obs_path}")
    if obs_group.value_column not in raw.columns:
        raise ValueError(f"Value column '{obs_group.value_column}' not found in {obs_path}")

    # Normalize to a two-column frame: parsed timestamps + numeric values
    # (non-numeric entries become NaN and are dropped below).
    series = pd.DataFrame(
        {
            "time": pd.to_datetime(raw[obs_group.time_column]),
            "value": pd.to_numeric(raw[obs_group.value_column], errors="coerce"),
        }
    )
    series = series.dropna(subset=["value"]).sort_values("time").reset_index(drop=True)

    # Restrict to the requested period (both bounds inclusive).
    if start_date is not None:
        series = series[series["time"] >= pd.Timestamp(start_date)]
    if end_date is not None:
        series = series[series["time"] <= pd.Timestamp(end_date)]
    series = series.reset_index(drop=True)

    logger.info(f"Loaded {len(series)} observations for group '{obs_group.name}' (reach {obs_group.reach_id})")

    return series

Align observed data to simulation time steps using nearest-neighbor matching.

Parameters:

Name Type Description Default
obs_df DataFrame

DataFrame with ‘time’ and ‘value’ columns.

required
sim_times list | ndarray | DatetimeIndex

Simulation time stamps.

required
tolerance_seconds float | None

Maximum allowed time difference in seconds for matching. If None, uses half the minimum simulation time step.

None

Returns:

Type Description
tuple[ndarray, ndarray, ndarray]

Tuple of (sim_indices, obs_values, obs_times): - sim_indices: Indices into sim_times where observations match. - obs_values: Observed values at matched times. - obs_times: Observed times at matched times.

Source code in mobidic/calibration/observation.py
def align_observations_to_simulation(
    obs_df: pd.DataFrame,
    sim_times: list | np.ndarray | pd.DatetimeIndex,
    tolerance_seconds: float | None = None,
) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
    """Align observed data to simulation time steps using nearest-neighbor matching.

    Args:
        obs_df: DataFrame with 'time' and 'value' columns.
        sim_times: Simulation time stamps.
        tolerance_seconds: Maximum allowed time difference in seconds for matching.
            If None, uses half the minimum simulation time step.

    Returns:
        Tuple of (sim_indices, obs_values, obs_times):
            - sim_indices: Indices into sim_times where observations match.
            - obs_values: Observed values at matched times.
            - obs_times: Observed times at matched times.
    """
    sim_times = pd.DatetimeIndex(sim_times)

    if tolerance_seconds is None:
        if len(sim_times) > 1:
            dt = (sim_times[1] - sim_times[0]).total_seconds()
            tolerance_seconds = dt / 2
        else:
            tolerance_seconds = 3600  # Default 1 hour

    tolerance = pd.Timedelta(seconds=tolerance_seconds)

    sim_indices = []
    obs_values = []
    obs_times_matched = []

    for _, row in obs_df.iterrows():
        obs_time = row["time"]
        # Find nearest simulation time
        diffs = np.abs(sim_times - obs_time)
        nearest_idx = diffs.argmin()

        if diffs[nearest_idx] <= tolerance:
            sim_indices.append(nearest_idx)
            obs_values.append(row["value"])
            obs_times_matched.append(obs_time)

    return np.array(sim_indices), np.array(obs_values), np.array(obs_times_matched)

Performance metrics

Nash-Sutcliffe Efficiency.

NSE = 1 - sum((sim - obs)^2) / sum((obs - mean(obs))^2) Perfect score: 1.0

Parameters:

Name Type Description Default
simulated ndarray

Simulated values.

required
observed ndarray

Observed values.

required

Returns:

Type Description
float

NSE value (range: -inf to 1.0).

Source code in mobidic/calibration/metrics.py
def nse(simulated: np.ndarray, observed: np.ndarray) -> float:
    """Nash-Sutcliffe Efficiency.

    NSE = 1 - sum((sim - obs)^2) / sum((obs - mean(obs))^2)
    Perfect score: 1.0

    Args:
        simulated: Simulated values.
        observed: Observed values.

    Returns:
        NSE value (range: -inf to 1.0).
    """
    ss_residual = np.sum((simulated - observed) ** 2)
    ss_observed = np.sum((observed - np.mean(observed)) ** 2)
    # Constant observations make the score undefined.
    if ss_observed == 0:
        return np.nan
    return 1.0 - ss_residual / ss_observed

NSE on log-transformed flows (emphasizes low flows).

Parameters:

Name Type Description Default
simulated ndarray

Simulated values.

required
observed ndarray

Observed values.

required
eps float

Small constant to avoid log(0).

1e-06

Returns:

Type Description
float

NSE of log-transformed values.

Source code in mobidic/calibration/metrics.py
def nse_log(simulated: np.ndarray, observed: np.ndarray, eps: float = 1e-6) -> float:
    """NSE on log-transformed flows (emphasizes low flows).

    Args:
        simulated: Simulated values.
        observed: Observed values.
        eps: Small constant to avoid log(0).

    Returns:
        NSE of log-transformed values.
    """
    log_sim = np.log(simulated + eps)
    log_obs = np.log(observed + eps)
    return nse(log_sim, log_obs)

Kling-Gupta Efficiency.

KGE = 1 - sqrt((r - 1)^2 + (alpha - 1)^2 + (beta - 1)^2) where r = correlation, alpha = std(sim)/std(obs), beta = mean(sim)/mean(obs) Perfect score: 1.0

Parameters:

Name Type Description Default
simulated ndarray

Simulated values.

required
observed ndarray

Observed values.

required

Returns:

Type Description
float

KGE value (range: -inf to 1.0).

Source code in mobidic/calibration/metrics.py
def kge(simulated: np.ndarray, observed: np.ndarray) -> float:
    """Kling-Gupta Efficiency.

    KGE = 1 - sqrt((r - 1)^2 + (alpha - 1)^2 + (beta - 1)^2)
    where r = correlation, alpha = std(sim)/std(obs), beta = mean(sim)/mean(obs)
    Perfect score: 1.0

    Args:
        simulated: Simulated values.
        observed: Observed values.

    Returns:
        KGE value (range: -inf to 1.0).
    """
    mean_obs = np.mean(observed)
    std_obs = np.std(observed)
    # Undefined when observations are constant or have zero mean.
    if std_obs == 0 or mean_obs == 0:
        return np.nan

    r = np.corrcoef(simulated, observed)[0, 1]  # linear correlation
    alpha = np.std(simulated) / std_obs         # variability ratio
    beta = np.mean(simulated) / mean_obs        # bias ratio

    return 1.0 - np.sqrt((r - 1) ** 2 + (alpha - 1) ** 2 + (beta - 1) ** 2)

Percent bias as a fraction (not percentage).

pbias = sum(sim - obs) / sum(obs) Perfect score: 0.0

Parameters:

Name Type Description Default
simulated ndarray

Simulated values.

required
observed ndarray

Observed values.

required

Returns:

Type Description
float

Percent bias as fraction (e.g., 0.05 = 5% overestimation).

Source code in mobidic/calibration/metrics.py
def pbias(simulated: np.ndarray, observed: np.ndarray) -> float:
    """Percent bias as a fraction (not percentage).

    pbias = sum(sim - obs) / sum(obs)
    Perfect score: 0.0

    Args:
        simulated: Simulated values.
        observed: Observed values.

    Returns:
        Percent bias as fraction (e.g., 0.05 = 5% overestimation).
    """
    total_obs = np.sum(observed)
    # Zero total observed flow makes the ratio undefined.
    if total_obs == 0:
        return np.nan
    return np.sum(simulated - observed) / total_obs

Root Mean Square Error.

Perfect score: 0.0

Parameters:

Name Type Description Default
simulated ndarray

Simulated values.

required
observed ndarray

Observed values.

required

Returns:

Type Description
float

RMSE value.

Source code in mobidic/calibration/metrics.py
def rmse(simulated: np.ndarray, observed: np.ndarray) -> float:
    """Root Mean Square Error.

    Perfect score: 0.0

    Args:
        simulated: Simulated values.
        observed: Observed values.

    Returns:
        RMSE value.
    """
    squared_errors = np.square(simulated - observed)
    return float(np.sqrt(squared_errors.mean()))

Relative peak error.

peak_error = (max(sim) - max(obs)) / max(obs) Perfect score: 0.0

Parameters:

Name Type Description Default
simulated ndarray

Simulated values.

required
observed ndarray

Observed values.

required

Returns:

Type Description
float

Relative peak error.

Source code in mobidic/calibration/metrics.py
def peak_error(simulated: np.ndarray, observed: np.ndarray) -> float:
    """Relative peak error.

    peak_error = (max(sim) - max(obs)) / max(obs)
    Perfect score: 0.0

    Args:
        simulated: Simulated values.
        observed: Observed values.

    Returns:
        Relative peak error.
    """
    max_obs = np.max(observed)
    # A zero observed peak makes the relative error undefined.
    if max_obs == 0:
        return np.nan
    return (np.max(simulated) - max_obs) / max_obs

Compute multiple metrics for a sim/obs pair.

Parameters:

Name Type Description Default
simulated ndarray

Simulated values.

required
observed ndarray

Observed values.

required
metric_names list[str]

List of metric names to compute.

required

Returns:

Type Description
dict[str, float]

Dict mapping metric name to computed value.

Source code in mobidic/calibration/metrics.py
def compute_metrics(
    simulated: np.ndarray,
    observed: np.ndarray,
    metric_names: list[str],
) -> dict[str, float]:
    """Compute multiple metrics for a sim/obs pair.

    Args:
        simulated: Simulated values.
        observed: Observed values.
        metric_names: List of metric names to compute.

    Returns:
        Dict mapping metric name to computed value.
    """
    # Validate every requested name first (metric functions are pure, so the
    # observable behavior is identical: first unknown name raises).
    for metric_name in metric_names:
        if metric_name not in METRIC_REGISTRY:
            raise ValueError(f"Unknown metric '{metric_name}'. Available: {sorted(METRIC_REGISTRY.keys())}")
    return {
        metric_name: METRIC_REGISTRY[metric_name][0](simulated, observed)
        for metric_name in metric_names
    }

Quick import

from mobidic.calibration import (
    # Setup and results
    PestSetup,
    CalibrationResults,
    # Configuration
    CalibrationConfig,
    CalibrationParameter,
    CalibrationPeriod,
    MetricConfig,
    ObservationGroup,
    load_calibration_config,
    # Observations
    load_observations,
    align_observations_to_simulation,
    # Metrics
    nse,
    nse_log,
    kge,
    pbias,
    rmse,
    peak_error,
    compute_metrics,
)

Workflow example

from pathlib import Path
from mobidic.calibration import PestSetup, load_calibration_config

# Load and validate the calibration YAML configuration
calib_config = load_calibration_config("Arno.calibration.yaml")

# Set up the PEST++ working directory (template/instruction files);
# setup() returns the directory path
pest = PestSetup(calib_config)
working_dir = pest.setup()

# Run calibration with the configured PEST++ tool
results = pest.run()

# Extract results from the PEST++ output files
optimal = results.get_optimal_parameters()
phi = results.get_objective_function_history()
sens = results.get_parameter_sensitivities()

See Examples for complete working scripts.