State I/O

The state I/O module provides functions to save and load MOBIDIC simulation state variables in NetCDF format, enabling simulation restart and state analysis.

Overview

State files store:

  • Grid variables: Capillary water (Wc), gravitational water (Wg), plant water (Wp), surface water (Ws)
  • Network variables: Discharge, lateral inflow for each reach
  • Metadata: Simulation time, grid coordinates, CRS, global attributes
  • CF-1.12 metadata: files follow the NetCDF Climate and Forecast (CF-1.12) conventions

State files enable:

  • Warm start: Resume simulations from saved state
  • State analysis: Examine spatial patterns of soil moisture, discharge
  • Model evaluation: Compare simulated states against observations
  • Ensemble runs: Initialize multiple simulations from different states
  • Large simulations: Automatic file chunking for simulations exceeding size limits
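As a sketch of the state-analysis workflow, the snippet below builds a tiny in-memory dataset with the same layout as a state file (time, y, x) and reduces it. With a real file you would replace the constructed dataset with `xr.open_dataset("states_001.nc")`; the file name here is illustrative.

```python
import numpy as np
import xarray as xr

# Stand-in for a StateWriter output; with a real file use:
#   ds = xr.open_dataset("states_001.nc")
ds = xr.Dataset(
    {"Wc": (["time", "y", "x"], np.linspace(0.0, 1.0, 3 * 4 * 5).reshape(3, 4, 5))},
    coords={"time": np.arange(3), "y": np.arange(4), "x": np.arange(5)},
)

wc_final = ds["Wc"].isel(time=-1)   # capillary water grid at the last timestep
wc_mean = float(ds["Wc"].mean())    # basin-average over all timesteps
```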

Classes and functions

StateWriter

Incremental NetCDF state writer with buffering, flushing, and automatic chunking.

This class manages writing simulation states to NetCDF files across multiple timesteps, with configurable memory buffering, periodic flushing to disk, and automatic file chunking when size limits are reached.

File chunking occurs when the current file reaches max_file_size before a new flush. For effective chunking, flushing must be set to a positive integer (e.g., flush every N timesteps). If flushing=-1 (flush only at end), all data is written in one operation and chunking may not work as expected.

Note: Files may slightly exceed max_file_size by up to one flush's worth of data, since the size check occurs before writing each flush batch.
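To pick flushing and max_file_size together, it helps to estimate the uncompressed size of one flush batch. The grid shape and reach count below are hypothetical; zlib compression (which the writer enables) typically shrinks the on-disk size further, so this is an upper bound per flush.

```python
# Hypothetical dimensions; substitute your own grid and network.
nrows, ncols = 500, 600   # grid shape
n_grid_vars = 4           # Wc, Wg, Wp, Ws
n_reaches = 1235          # discharge + lateral_inflow per reach
flushing = 10             # timesteps buffered per flush

# float64 storage: 8 bytes per value, before compression
bytes_per_step = n_grid_vars * nrows * ncols * 8 + 2 * n_reaches * 8
flush_mb = flushing * bytes_per_step / 1024 / 1024   # ~91.7 MB per flush batch
```

With these numbers, a max_file_size of 500 MB would rotate to a new chunk roughly every five or six flushes (before accounting for compression).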

Parameters:

  output_path (str | Path, required)
      Path to output NetCDF file (will be created/overwritten)
  grid_metadata (dict, required)
      Dictionary with grid metadata (shape, resolution, crs, etc.)
  network_size (int, required)
      Number of reaches in the network
  output_states (OutputStates, required)
      Configuration object specifying which state variables to save
  flushing (int, default -1)
      Flush interval (positive int = flush every N steps; -1 = flush only at end)
  max_file_size (float, default 500.0)
      Maximum file size in MB before a new chunk file is created
  add_metadata (dict | None, default None)
      Additional global attributes (optional)
  reservoir_size (int, default 0)
      Number of reservoirs in the network

Examples:

>>> # With automatic chunking at 500 MB
>>> writer = StateWriter("states.nc", metadata, 1235, config.output_states,
...                      flushing=10, max_file_size=500)
>>> for step in range(num_steps):
...     writer.append_state(state, current_time)
>>> writer.close()
>>> # This may create: states_001.nc, states_002.nc, states_003.nc, etc.
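The chunk-file naming in the example (states_001.nc, states_002.nc, …) follows the pattern stem + zero-padded chunk number + suffix, as implemented in `_get_chunk_path`. A standalone pathlib sketch of that scheme:

```python
from pathlib import Path

def chunk_path(base: str, chunk_number: int) -> Path:
    # states.nc, chunk 2 -> states_002.nc (number zero-padded to 3 digits)
    p = Path(base)
    return p.with_name(f"{p.stem}_{chunk_number:03d}{p.suffix}")

chunk_path("output/states.nc", 2)  # Path("output/states_002.nc")
```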
Source code in mobidic/io/state.py
class StateWriter:
    """
    Incremental NetCDF state writer with buffering, flushing, and automatic chunking.

    This class manages writing simulation states to NetCDF files across multiple
    timesteps, with configurable memory buffering, periodic flushing to disk, and
    automatic file chunking when size limits are reached.

    File chunking occurs when the current file reaches max_file_size before a new flush.
    For effective chunking, flushing must be set to a positive integer (e.g., flush
    every N timesteps). If flushing=-1 (flush only at end), all data is written in
    one operation and chunking may not work as expected.

    Note: Files may slightly exceed max_file_size by up to one flush worth of data,
    since the size check occurs before writing each flush batch.

    Args:
        output_path: Path to output NetCDF file (will be created/overwritten)
        grid_metadata: Dictionary with grid metadata (shape, resolution, crs, etc.)
        network_size: Number of reaches in network
        output_states: Configuration object specifying which state variables to save
        flushing: Flush interval (positive int = every N steps, -1 = only at end)
        max_file_size: Maximum file size in MB before creating a new chunk (default: 500)
        add_metadata: Additional global attributes (optional)

    Examples:
        >>> # With automatic chunking at 500 MB
        >>> writer = StateWriter("states.nc", metadata, 1235, config.output_states,
        ...                      flushing=10, max_file_size=500)
        >>> for step in range(num_steps):
        ...     writer.append_state(state, current_time)
        >>> writer.close()
        >>> # This may create: states_001.nc, states_002.nc, states_003.nc, etc.
    """

    def __init__(
        self,
        output_path: str | Path,
        grid_metadata: dict,
        network_size: int,
        output_states: "OutputStates",  # noqa: F821
        flushing: int = -1,
        max_file_size: float = 500.0,
        add_metadata: dict | None = None,
        reservoir_size: int = 0,
    ):
        """Initialize the state writer and create the NetCDF file."""
        self.base_output_path = Path(output_path)
        self.base_output_path.parent.mkdir(parents=True, exist_ok=True)

        self.grid_metadata = grid_metadata
        self.network_size = network_size
        self.reservoir_size = reservoir_size
        self.output_states = output_states
        self.flushing = flushing
        self.max_file_size_bytes = max_file_size * 1024 * 1024  # Convert MB to bytes
        self.add_metadata = add_metadata or {}

        # Chunking state
        self.current_chunk = 1
        self.output_path = self._get_chunk_path(self.current_chunk)
        self.chunk_files_created = []

        # Remove existing chunk files if present
        self._remove_existing_chunks()

        # Get grid dimensions
        self.nrows, self.ncols = grid_metadata["shape"]
        resolution = grid_metadata["resolution"]
        xllcorner = grid_metadata["xllcorner"]
        yllcorner = grid_metadata["yllcorner"]

        # Create coordinate arrays
        self.x = xllcorner + np.arange(self.ncols) * resolution[0]
        self.y = yllcorner + np.arange(self.nrows) * resolution[1]
        self.reaches = np.arange(network_size)
        self.reservoirs = np.arange(reservoir_size) if reservoir_size > 0 else np.array([])

        # Initialize buffer for states
        self.buffer = []
        self.buffer_times = []
        self.step_count = 0

        # Create the NetCDF file structure
        self._initialize_file()

        logger.debug(
            f"StateWriter initialized: {self.output_path}, flushing={self.flushing}, "
            f"max_file_size={max_file_size:.1f} MB"
        )

    def _get_chunk_path(self, chunk_number: int) -> Path:
        """
        Generate path for a chunk file.

        Args:
            chunk_number: Chunk number (1-indexed)

        Returns:
            Path object with chunk suffix (e.g., states_001.nc)
        """
        stem = self.base_output_path.stem
        suffix = self.base_output_path.suffix
        parent = self.base_output_path.parent
        return parent / f"{stem}_{chunk_number:03d}{suffix}"

    def _remove_existing_chunks(self) -> None:
        """Remove any existing chunk files matching the base path pattern."""
        stem = self.base_output_path.stem
        suffix = self.base_output_path.suffix
        parent = self.base_output_path.parent

        # Find and remove all chunk files
        import glob

        pattern = str(parent / f"{stem}_[0-9][0-9][0-9]{suffix}")
        existing_chunks = glob.glob(pattern)

        for chunk_file in existing_chunks:
            chunk_path = Path(chunk_file)
            logger.info(f"Removing existing chunk file: {chunk_path}")
            chunk_path.unlink()

    def _check_and_rotate_file(self) -> None:
        """Check current file size and rotate to new chunk if needed."""
        if not self.output_path.exists():
            return

        file_size = self.output_path.stat().st_size

        if file_size >= self.max_file_size_bytes:
            # Current chunk exceeded size limit - rotate to new chunk
            file_size_mb = file_size / 1024 / 1024
            logger.info(
                f"File size limit reached ({file_size_mb:.1f} MB >= {self.max_file_size_bytes / 1024 / 1024:.1f} MB). "
                f"Creating new chunk file."
            )

            # Record the completed chunk
            self.chunk_files_created.append(self.output_path)

            # Increment chunk number and update path
            self.current_chunk += 1
            self.output_path = self._get_chunk_path(self.current_chunk)

            logger.info(f"Starting new chunk: {self.output_path}")

    def _initialize_file(self) -> None:
        """Create the NetCDF file with unlimited time dimension."""
        # Create coordinates dictionary
        coords = {
            "x": (["x"], self.x),
            "y": (["y"], self.y),
        }

        # Add reach coordinate if discharge is enabled
        if self.output_states.discharge:
            coords["reach"] = (["reach"], self.reaches)

        # Add reservoir coordinate if reservoir_states is enabled
        if self.output_states.reservoir_states and self.reservoir_size > 0:
            coords["reservoir"] = (["reservoir"], self.reservoirs)

        # Create empty dataset with coordinates only (no time yet)
        self.ds = xr.Dataset(coords=coords)

        # Add coordinate attributes
        self.ds["x"].attrs = {
            "standard_name": "projection_x_coordinate",
            "long_name": "x coordinate of projection",
            "units": "m",
            "axis": "X",
        }
        self.ds["y"].attrs = {
            "standard_name": "projection_y_coordinate",
            "long_name": "y coordinate of projection",
            "units": "m",
            "axis": "Y",
        }
        if "reach" in self.ds.coords:
            self.ds["reach"].attrs = {
                "long_name": "reach index",
                "description": "MOBIDIC reach ID (mobidic_id)",
                "units": "1",
            }
        if "reservoir" in self.ds.coords:
            self.ds["reservoir"].attrs = {
                "long_name": "reservoir index",
                "description": "Reservoir index (0-based)",
                "units": "1",
            }

        # Add grid mapping variable for CRS (CF-1.12 compliance)
        self.ds["crs"] = ([], 0)
        self.ds["crs"].attrs = crs_to_cf_attrs(self.grid_metadata.get("crs"))

        # Add global attributes
        self.ds.attrs["Conventions"] = "CF-1.12"
        self.ds.attrs["title"] = "MOBIDIC simulation states"
        self.ds.attrs["source"] = "MOBIDICpy simulation"
        self.ds.attrs["history"] = f"Created by MOBIDICpy version {__version__} at {datetime.now().isoformat()}"
        self.ds.attrs.update(self.add_metadata)

    def append_state(self, state: "SimulationState", time: datetime) -> None:  # noqa: F821
        """
        Add a state to the buffer and flush if necessary.

        Args:
            state: SimulationState object containing state variables
            time: Current simulation time
        """
        # Import here to avoid circular import
        from mobidic.core.simulation import SimulationState

        # Create a copy of the state to avoid storing references to mutable arrays
        # This is critical because the same state object is reused across timesteps
        # Copy reservoir states if present
        reservoir_states_copy = None
        if state.reservoir_states is not None:
            from copy import deepcopy

            reservoir_states_copy = deepcopy(state.reservoir_states)

        state_copy = SimulationState(
            wc=state.wc.copy(),
            wg=state.wg.copy(),
            wp=state.wp.copy() if state.wp is not None else None,
            ws=state.ws.copy(),
            discharge=state.discharge.copy(),
            lateral_inflow=state.lateral_inflow.copy(),
            reservoir_states=reservoir_states_copy,
        )

        # Add to buffer
        self.buffer.append(state_copy)
        self.buffer_times.append(time)
        self.step_count += 1

        # Check if we need to flush
        should_flush = False
        if self.flushing > 0 and self.step_count % self.flushing == 0:
            should_flush = True

        if should_flush:
            self.flush()
            logger.debug(f"StateWriter flushed at step {self.step_count}")

    def flush(self) -> None:
        """Write buffered states to disk using efficient append mode."""
        if not self.buffer:
            return

        # Check if we need to rotate to a new chunk BEFORE writing
        # This prevents files from exceeding the size limit
        self._check_and_rotate_file()

        logger.debug(f"Flushing {len(self.buffer)} states to {self.output_path}")

        # Convert buffer to arrays with time dimension
        num_buffered = len(self.buffer)

        # Initialize data arrays for this flush
        data_vars = {}

        # Grid variables
        if self.output_states.soil_capillary:
            wc_data = np.array([s.wc for s in self.buffer])
            data_vars["Wc"] = (["time", "y", "x"], wc_data)

        if self.output_states.soil_gravitational:
            wg_data = np.array([s.wg for s in self.buffer])
            data_vars["Wg"] = (["time", "y", "x"], wg_data)

        if self.output_states.soil_plant:
            # Check if any state has wp (plant water)
            has_wp = any(s.wp is not None for s in self.buffer)
            if has_wp:
                wp_data = np.array(
                    [s.wp if s.wp is not None else np.full((self.nrows, self.ncols), np.nan) for s in self.buffer]
                )
                data_vars["Wp"] = (["time", "y", "x"], wp_data)

        if self.output_states.soil_surface:
            ws_data = np.array([s.ws for s in self.buffer])
            data_vars["Ws"] = (["time", "y", "x"], ws_data)

        # Network variables
        if self.output_states.discharge:
            discharge_data = np.array([s.discharge for s in self.buffer])
            lateral_inflow_data = np.array([s.lateral_inflow for s in self.buffer])
            data_vars["discharge"] = (["time", "reach"], discharge_data)
            data_vars["lateral_inflow"] = (["time", "reach"], lateral_inflow_data)

        # Reservoir variables
        if self.output_states.reservoir_states and self.reservoir_size > 0:
            # Check if any state has reservoir_states
            has_reservoirs = any(s.reservoir_states is not None for s in self.buffer)
            if has_reservoirs:
                # Extract reservoir data (volume, stage, inflow, outflow)
                res_volume_data = np.array([[r.volume for r in s.reservoir_states] for s in self.buffer])
                res_stage_data = np.array([[r.stage for r in s.reservoir_states] for s in self.buffer])
                res_inflow_data = np.array([[r.inflow for r in s.reservoir_states] for s in self.buffer])
                res_outflow_data = np.array([[r.outflow for r in s.reservoir_states] for s in self.buffer])

                data_vars["reservoir_volume"] = (["time", "reservoir"], res_volume_data)
                data_vars["reservoir_stage"] = (["time", "reservoir"], res_stage_data)
                data_vars["reservoir_inflow"] = (["time", "reservoir"], res_inflow_data)
                data_vars["reservoir_outflow"] = (["time", "reservoir"], res_outflow_data)

        # Create dataset for this flush
        flush_ds = xr.Dataset(
            data_vars=data_vars,
            coords={
                "time": (["time"], self.buffer_times),
                "x": (["x"], self.x),
                "y": (["y"], self.y),
            },
        )

        # Add reach coordinate if discharge is enabled
        if self.output_states.discharge:
            flush_ds.coords["reach"] = (["reach"], self.reaches)

        # Add reservoir coordinate if reservoir_states is enabled
        if self.output_states.reservoir_states and self.reservoir_size > 0:
            flush_ds.coords["reservoir"] = (["reservoir"], self.reservoirs)

        # Add variable attributes
        if "Wc" in flush_ds:
            flush_ds["Wc"].attrs = {
                "long_name": "Capillary Water Content",
                "units": "m",
                "description": "Water held by capillary forces in soil",
                "grid_mapping": "crs",
            }

        if "Wg" in flush_ds:
            flush_ds["Wg"].attrs = {
                "long_name": "Gravitational Water Content",
                "units": "m",
                "description": "Drainable water in soil large pores",
                "grid_mapping": "crs",
            }

        if "Wp" in flush_ds:
            flush_ds["Wp"].attrs = {
                "long_name": "Plant/Canopy Water Content",
                "units": "m",
                "description": "Water intercepted by vegetation canopy",
                "grid_mapping": "crs",
            }

        if "Ws" in flush_ds:
            flush_ds["Ws"].attrs = {
                "long_name": "Surface Water Content",
                "units": "m",
                "description": "Water in surface depressions",
                "grid_mapping": "crs",
            }

        if "discharge" in flush_ds:
            flush_ds["discharge"].attrs = {
                "long_name": "River Discharge",
                "units": "m3 s-1",
                "description": "Discharge at downstream end of each reach",
            }

        if "lateral_inflow" in flush_ds:
            flush_ds["lateral_inflow"].attrs = {
                "long_name": "Lateral Inflow",
                "units": "m3 s-1",
                "description": "Lateral inflow from hillslope to each reach",
            }

        if "reservoir_volume" in flush_ds:
            flush_ds["reservoir_volume"].attrs = {
                "long_name": "Reservoir Volume",
                "units": "m3",
                "description": "Water volume stored in reservoir",
            }

        if "reservoir_stage" in flush_ds:
            flush_ds["reservoir_stage"].attrs = {
                "long_name": "Reservoir Stage",
                "units": "m",
                "description": "Water level/stage in reservoir",
            }

        if "reservoir_inflow" in flush_ds:
            flush_ds["reservoir_inflow"].attrs = {
                "long_name": "Reservoir Inflow",
                "units": "m3 s-1",
                "description": "Total inflow to reservoir from upstream reaches",
            }

        if "reservoir_outflow" in flush_ds:
            flush_ds["reservoir_outflow"].attrs = {
                "long_name": "Reservoir Outflow",
                "units": "m3 s-1",
                "description": "Total outflow from reservoir (release + withdrawals)",
            }

        # Add time attributes
        flush_ds["time"].attrs = {
            "long_name": "simulation time",
            "axis": "T",
        }

        # Add coordinate attributes
        flush_ds["x"].attrs = {
            "standard_name": "projection_x_coordinate",
            "long_name": "x coordinate of projection",
            "units": "m",
            "axis": "X",
        }
        flush_ds["y"].attrs = {
            "standard_name": "projection_y_coordinate",
            "long_name": "y coordinate of projection",
            "units": "m",
            "axis": "Y",
        }
        if "reach" in flush_ds.coords:
            flush_ds["reach"].attrs = {
                "long_name": "reach index",
                "description": "MOBIDIC reach ID (mobidic_id)",
                "units": "1",
            }

        # Determine write mode: first write or append
        is_first_write = not self.output_path.exists()

        if is_first_write:
            # First write - add CRS variable and global attributes
            flush_ds["crs"] = xr.DataArray(
                data=np.int32(0),  # Explicitly use int32 to avoid type issues
                dims=[],
                attrs=crs_to_cf_attrs(self.grid_metadata.get("crs")),
            )
            flush_ds.attrs["Conventions"] = "CF-1.12"
            flush_ds.attrs["title"] = "MOBIDIC simulation states"
            flush_ds.attrs["source"] = "MOBIDICpy simulation"
            flush_ds.attrs["history"] = f"Created by MOBIDICpy version {__version__} at {datetime.now().isoformat()}"
            flush_ds.attrs.update(self.add_metadata)

            # Write initial file with compression
            encoding = {var: {"zlib": True, "complevel": 4} for var in flush_ds.data_vars if var != "crs"}
            encoding["crs"] = {"dtype": "int32"}  # Explicitly set CRS dtype

            flush_ds.to_netcdf(self.output_path, encoding=encoding, engine="netcdf4", mode="w", unlimited_dims=["time"])
            flush_ds.close()
        else:
            # Append mode - use efficient NetCDF append via netCDF4 library
            # This avoids the slow read-concatenate-write cycle
            import netCDF4 as nc4

            # Open existing file in append mode
            with nc4.Dataset(self.output_path, "a") as nc_file:
                # Get current time dimension length
                time_dim_len = len(nc_file.dimensions["time"])

                # Extend time dimension
                new_time_len = time_dim_len + num_buffered

                # Append time values
                time_var = nc_file.variables["time"]
                for i, t in enumerate(self.buffer_times):
                    time_var[time_dim_len + i] = nc4.date2num(t, time_var.units, time_var.calendar)

                # Append data variables
                for var_name in flush_ds.data_vars:
                    if var_name == "crs":
                        continue  # Skip CRS - it's a scalar

                    nc_var = nc_file.variables[var_name]
                    data = flush_ds[var_name].values

                    # Append along time dimension
                    nc_var[time_dim_len:new_time_len, ...] = data

            flush_ds.close()

        # Clear buffer
        self.buffer = []
        self.buffer_times = []

    def close(self) -> None:
        """Flush any remaining buffered states and close the writer."""
        # Flush remaining buffer
        if self.buffer:
            self.flush()

        # Add the current (last) chunk to the list
        if self.output_path.exists():
            self.chunk_files_created.append(self.output_path)

        # Report summary
        total_chunks = len(self.chunk_files_created)
        if total_chunks == 1:
            logger.success(f"States file saved: {self.chunk_files_created[0]} ({self.step_count} states written)")
            logger.debug(f"File size: {self.chunk_files_created[0].stat().st_size / 1024 / 1024:.2f} MB")
        else:
            logger.success(f"States saved in {total_chunks} chunk files ({self.step_count} states total):")
            total_size_mb = 0
            for chunk_path in self.chunk_files_created:
                size_mb = chunk_path.stat().st_size / 1024 / 1024
                total_size_mb += size_mb
                logger.info(f"  {chunk_path.name}: {size_mb:.2f} MB")
            logger.debug(f"Total size: {total_size_mb:.2f} MB")

    def __enter__(self):
        """Context manager entry."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Context manager exit - ensures file is closed."""
        self.close()
        return False

__enter__()

Context manager entry.

Source code in mobidic/io/state.py
def __enter__(self):
    """Context manager entry."""
    return self

__exit__(exc_type, exc_val, exc_tb)

Context manager exit - ensures file is closed.

Source code in mobidic/io/state.py
def __exit__(self, exc_type, exc_val, exc_tb):
    """Context manager exit - ensures file is closed."""
    self.close()
    return False
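Because __exit__ calls close() and returns False, buffered states are flushed even when the simulation loop raises, and the exception still propagates to the caller. A minimal stdlib analogue of that protocol (a toy class, not the real StateWriter):

```python
class MiniWriter:
    """Toy writer mirroring StateWriter's context-manager behavior."""

    def __init__(self):
        self.closed = False

    def close(self):
        self.closed = True

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()   # cleanup always runs
        return False   # False: do not swallow the exception

w = MiniWriter()
try:
    with w:
        raise ValueError("simulated mid-run failure")
except ValueError:
    pass
# w.closed is True: close() ran despite the error, and the error propagated
```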

__init__(output_path, grid_metadata, network_size, output_states, flushing=-1, max_file_size=500.0, add_metadata=None, reservoir_size=0)

Initialize the state writer and create the NetCDF file.

Source code in mobidic/io/state.py
def __init__(
    self,
    output_path: str | Path,
    grid_metadata: dict,
    network_size: int,
    output_states: "OutputStates",  # noqa: F821
    flushing: int = -1,
    max_file_size: float = 500.0,
    add_metadata: dict | None = None,
    reservoir_size: int = 0,
):
    """Initialize the state writer and create the NetCDF file."""
    self.base_output_path = Path(output_path)
    self.base_output_path.parent.mkdir(parents=True, exist_ok=True)

    self.grid_metadata = grid_metadata
    self.network_size = network_size
    self.reservoir_size = reservoir_size
    self.output_states = output_states
    self.flushing = flushing
    self.max_file_size_bytes = max_file_size * 1024 * 1024  # Convert MB to bytes
    self.add_metadata = add_metadata or {}

    # Chunking state
    self.current_chunk = 1
    self.output_path = self._get_chunk_path(self.current_chunk)
    self.chunk_files_created = []

    # Remove existing chunk files if present
    self._remove_existing_chunks()

    # Get grid dimensions
    self.nrows, self.ncols = grid_metadata["shape"]
    resolution = grid_metadata["resolution"]
    xllcorner = grid_metadata["xllcorner"]
    yllcorner = grid_metadata["yllcorner"]

    # Create coordinate arrays
    self.x = xllcorner + np.arange(self.ncols) * resolution[0]
    self.y = yllcorner + np.arange(self.nrows) * resolution[1]
    self.reaches = np.arange(network_size)
    self.reservoirs = np.arange(reservoir_size) if reservoir_size > 0 else np.array([])

    # Initialize buffer for states
    self.buffer = []
    self.buffer_times = []
    self.step_count = 0

    # Create the NetCDF file structure
    self._initialize_file()

    logger.debug(
        f"StateWriter initialized: {self.output_path}, flushing={self.flushing}, "
        f"max_file_size={max_file_size:.1f} MB"
    )

append_state(state, time)

Add a state to the buffer and flush if necessary.

Parameters:

  state (SimulationState, required)
      SimulationState object containing state variables
  time (datetime, required)
      Current simulation time
Source code in mobidic/io/state.py
def append_state(self, state: "SimulationState", time: datetime) -> None:  # noqa: F821
    """
    Add a state to the buffer and flush if necessary.

    Args:
        state: SimulationState object containing state variables
        time: Current simulation time
    """
    # Import here to avoid circular import
    from mobidic.core.simulation import SimulationState

    # Create a copy of the state to avoid storing references to mutable arrays
    # This is critical because the same state object is reused across timesteps
    # Copy reservoir states if present
    reservoir_states_copy = None
    if state.reservoir_states is not None:
        from copy import deepcopy

        reservoir_states_copy = deepcopy(state.reservoir_states)

    state_copy = SimulationState(
        wc=state.wc.copy(),
        wg=state.wg.copy(),
        wp=state.wp.copy() if state.wp is not None else None,
        ws=state.ws.copy(),
        discharge=state.discharge.copy(),
        lateral_inflow=state.lateral_inflow.copy(),
        reservoir_states=reservoir_states_copy,
    )

    # Add to buffer
    self.buffer.append(state_copy)
    self.buffer_times.append(time)
    self.step_count += 1

    # Check if we need to flush
    should_flush = False
    if self.flushing > 0 and self.step_count % self.flushing == 0:
        should_flush = True

    if should_flush:
        self.flush()
        logger.debug(f"StateWriter flushed at step {self.step_count}")
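The copy in append_state matters because the simulation reuses one state object and mutates its arrays in place; buffering a bare reference would make every buffered snapshot silently point at the same (latest) data. A small numpy illustration of the aliasing hazard:

```python
import numpy as np

state = np.zeros(3)                 # stands in for a reused state array
by_reference, by_copy = [], []
for step in (1, 2):
    state[:] = step                 # in-place update, same object each step
    by_reference.append(state)      # aliased: tracks future mutations
    by_copy.append(state.copy())    # snapshot frozen at this step

# by_reference -> [array([2., 2., 2.]), array([2., 2., 2.])]  (history lost)
# by_copy      -> [array([1., 1., 1.]), array([2., 2., 2.])]  (history kept)
```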

close()

Flush any remaining buffered states and close the writer.

Source code in mobidic/io/state.py
def close(self) -> None:
    """Flush any remaining buffered states and close the writer."""
    # Flush remaining buffer
    if self.buffer:
        self.flush()

    # Add the current (last) chunk to the list
    if self.output_path.exists():
        self.chunk_files_created.append(self.output_path)

    # Report summary
    total_chunks = len(self.chunk_files_created)
    if total_chunks == 1:
        logger.success(f"States file saved: {self.chunk_files_created[0]} ({self.step_count} states written)")
        logger.debug(f"File size: {self.chunk_files_created[0].stat().st_size / 1024 / 1024:.2f} MB")
    else:
        logger.success(f"States saved in {total_chunks} chunk files ({self.step_count} states total):")
        total_size_mb = 0
        for chunk_path in self.chunk_files_created:
            size_mb = chunk_path.stat().st_size / 1024 / 1024
            total_size_mb += size_mb
            logger.info(f"  {chunk_path.name}: {size_mb:.2f} MB")
        logger.debug(f"Total size: {total_size_mb:.2f} MB")
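After close(), a chunked run leaves several files that share spatial coordinates and split along time. Recombining them is sketched below in memory with xr.concat; with files on disk, `xr.open_mfdataset("states_*.nc", combine="by_coords")` does the equivalent (it typically requires dask installed).

```python
import numpy as np
import xarray as xr

# Two stand-ins for consecutive chunk files
chunk1 = xr.Dataset(
    {"discharge": (["time", "reach"], np.ones((2, 3)))},
    coords={"time": [0, 1], "reach": np.arange(3)},
)
chunk2 = xr.Dataset(
    {"discharge": (["time", "reach"], np.full((2, 3), 2.0))},
    coords={"time": [2, 3], "reach": np.arange(3)},
)

combined = xr.concat([chunk1, chunk2], dim="time")  # one continuous series
```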

flush()

Write buffered states to disk using efficient append mode.

Source code in mobidic/io/state.py
def flush(self) -> None:
    """Write buffered states to disk using efficient append mode."""
    if not self.buffer:
        return

    # Check if we need to rotate to a new chunk BEFORE writing
    # This prevents files from exceeding the size limit
    self._check_and_rotate_file()

    logger.debug(f"Flushing {len(self.buffer)} states to {self.output_path}")

    # Convert buffer to arrays with time dimension
    num_buffered = len(self.buffer)

    # Initialize data arrays for this flush
    data_vars = {}

    # Grid variables
    if self.output_states.soil_capillary:
        wc_data = np.array([s.wc for s in self.buffer])
        data_vars["Wc"] = (["time", "y", "x"], wc_data)

    if self.output_states.soil_gravitational:
        wg_data = np.array([s.wg for s in self.buffer])
        data_vars["Wg"] = (["time", "y", "x"], wg_data)

    if self.output_states.soil_plant:
        # Check if any state has wp (plant water)
        has_wp = any(s.wp is not None for s in self.buffer)
        if has_wp:
            wp_data = np.array(
                [s.wp if s.wp is not None else np.full((self.nrows, self.ncols), np.nan) for s in self.buffer]
            )
            data_vars["Wp"] = (["time", "y", "x"], wp_data)

    if self.output_states.soil_surface:
        ws_data = np.array([s.ws for s in self.buffer])
        data_vars["Ws"] = (["time", "y", "x"], ws_data)

    # Network variables
    if self.output_states.discharge:
        discharge_data = np.array([s.discharge for s in self.buffer])
        lateral_inflow_data = np.array([s.lateral_inflow for s in self.buffer])
        data_vars["discharge"] = (["time", "reach"], discharge_data)
        data_vars["lateral_inflow"] = (["time", "reach"], lateral_inflow_data)

    # Reservoir variables
    if self.output_states.reservoir_states and self.reservoir_size > 0:
        # Check if any state has reservoir_states
        has_reservoirs = any(s.reservoir_states is not None for s in self.buffer)
        if has_reservoirs:
            # Extract reservoir data (volume, stage, inflow, outflow)
            res_volume_data = np.array([[r.volume for r in s.reservoir_states] for s in self.buffer])
            res_stage_data = np.array([[r.stage for r in s.reservoir_states] for s in self.buffer])
            res_inflow_data = np.array([[r.inflow for r in s.reservoir_states] for s in self.buffer])
            res_outflow_data = np.array([[r.outflow for r in s.reservoir_states] for s in self.buffer])

            data_vars["reservoir_volume"] = (["time", "reservoir"], res_volume_data)
            data_vars["reservoir_stage"] = (["time", "reservoir"], res_stage_data)
            data_vars["reservoir_inflow"] = (["time", "reservoir"], res_inflow_data)
            data_vars["reservoir_outflow"] = (["time", "reservoir"], res_outflow_data)

    # Create dataset for this flush
    flush_ds = xr.Dataset(
        data_vars=data_vars,
        coords={
            "time": (["time"], self.buffer_times),
            "x": (["x"], self.x),
            "y": (["y"], self.y),
        },
    )

    # Add reach coordinate if discharge is enabled
    if self.output_states.discharge:
        flush_ds.coords["reach"] = (["reach"], self.reaches)

    # Add reservoir coordinate if reservoir_states is enabled
    if self.output_states.reservoir_states and self.reservoir_size > 0:
        flush_ds.coords["reservoir"] = (["reservoir"], self.reservoirs)

    # Add variable attributes
    if "Wc" in flush_ds:
        flush_ds["Wc"].attrs = {
            "long_name": "Capillary Water Content",
            "units": "m",
            "description": "Water held by capillary forces in soil",
            "grid_mapping": "crs",
        }

    if "Wg" in flush_ds:
        flush_ds["Wg"].attrs = {
            "long_name": "Gravitational Water Content",
            "units": "m",
            "description": "Drainable water in soil large pores",
            "grid_mapping": "crs",
        }

    if "Wp" in flush_ds:
        flush_ds["Wp"].attrs = {
            "long_name": "Plant/Canopy Water Content",
            "units": "m",
            "description": "Water intercepted by vegetation canopy",
            "grid_mapping": "crs",
        }

    if "Ws" in flush_ds:
        flush_ds["Ws"].attrs = {
            "long_name": "Surface Water Content",
            "units": "m",
            "description": "Water in surface depressions",
            "grid_mapping": "crs",
        }

    if "discharge" in flush_ds:
        flush_ds["discharge"].attrs = {
            "long_name": "River Discharge",
            "units": "m3 s-1",
            "description": "Discharge at downstream end of each reach",
        }

    if "lateral_inflow" in flush_ds:
        flush_ds["lateral_inflow"].attrs = {
            "long_name": "Lateral Inflow",
            "units": "m3 s-1",
            "description": "Lateral inflow from hillslope to each reach",
        }

    if "reservoir_volume" in flush_ds:
        flush_ds["reservoir_volume"].attrs = {
            "long_name": "Reservoir Volume",
            "units": "m3",
            "description": "Water volume stored in reservoir",
        }

    if "reservoir_stage" in flush_ds:
        flush_ds["reservoir_stage"].attrs = {
            "long_name": "Reservoir Stage",
            "units": "m",
            "description": "Water level/stage in reservoir",
        }

    if "reservoir_inflow" in flush_ds:
        flush_ds["reservoir_inflow"].attrs = {
            "long_name": "Reservoir Inflow",
            "units": "m3 s-1",
            "description": "Total inflow to reservoir from upstream reaches",
        }

    if "reservoir_outflow" in flush_ds:
        flush_ds["reservoir_outflow"].attrs = {
            "long_name": "Reservoir Outflow",
            "units": "m3 s-1",
            "description": "Total outflow from reservoir (release + withdrawals)",
        }

    # Add time attributes
    flush_ds["time"].attrs = {
        "long_name": "simulation time",
        "axis": "T",
    }

    # Add coordinate attributes
    flush_ds["x"].attrs = {
        "standard_name": "projection_x_coordinate",
        "long_name": "x coordinate of projection",
        "units": "m",
        "axis": "X",
    }
    flush_ds["y"].attrs = {
        "standard_name": "projection_y_coordinate",
        "long_name": "y coordinate of projection",
        "units": "m",
        "axis": "Y",
    }
    if "reach" in flush_ds.coords:
        flush_ds["reach"].attrs = {
            "long_name": "reach index",
            "description": "MOBIDIC reach ID (mobidic_id)",
            "units": "1",
        }

    # Determine write mode: first write or append
    is_first_write = not self.output_path.exists()

    if is_first_write:
        # First write - add CRS variable and global attributes
        flush_ds["crs"] = xr.DataArray(
            data=np.int32(0),  # Explicitly use int32 to avoid type issues
            dims=[],
            attrs=crs_to_cf_attrs(self.grid_metadata.get("crs")),
        )
        flush_ds.attrs["Conventions"] = "CF-1.12"
        flush_ds.attrs["title"] = "MOBIDIC simulation states"
        flush_ds.attrs["source"] = "MOBIDICpy simulation"
        flush_ds.attrs["history"] = f"Created by MOBIDICpy version {__version__} at {datetime.now().isoformat()}"
        flush_ds.attrs.update(self.add_metadata)

        # Write initial file with compression
        encoding = {var: {"zlib": True, "complevel": 4} for var in flush_ds.data_vars if var != "crs"}
        encoding["crs"] = {"dtype": "int32"}  # Explicitly set CRS dtype

        flush_ds.to_netcdf(self.output_path, encoding=encoding, engine="netcdf4", mode="w", unlimited_dims=["time"])
        flush_ds.close()
    else:
        # Append mode - use efficient NetCDF append via netCDF4 library
        # This avoids the slow read-concatenate-write cycle
        import netCDF4 as nc4

        # Open existing file in append mode
        with nc4.Dataset(self.output_path, "a") as nc_file:
            # Get current time dimension length
            time_dim_len = len(nc_file.dimensions["time"])

            # Extend time dimension
            new_time_len = time_dim_len + num_buffered

            # Append time values
            time_var = nc_file.variables["time"]
            for i, t in enumerate(self.buffer_times):
                time_var[time_dim_len + i] = nc4.date2num(t, time_var.units, time_var.calendar)

            # Append data variables
            for var_name in flush_ds.data_vars:
                if var_name == "crs":
                    continue  # Skip CRS - it's a scalar

                nc_var = nc_file.variables[var_name]
                data = flush_ds[var_name].values

                # Append along time dimension
                nc_var[time_dim_len:new_time_len, ...] = data

        flush_ds.close()

    # Clear buffer
    self.buffer = []
    self.buffer_times = []

Load simulation state from NetCDF file.

Supports both single-timestep and multi-timestep state files. For multi-timestep files, loads the specified time index (default: last timestep). Automatically detects chunk files (e.g., states_001.nc) when the base path doesn’t exist.

If a state variable is missing from the file, it will be initialized using the initial conditions from the configuration file (if config and gisdata are provided); otherwise it will be initialized with zeros.

Parameters:

Name Type Description Default
input_path str | Path

Path to input NetCDF file (may be chunked as _001.nc, _002.nc, etc.)

required
network_size int

Expected number of reaches in network. Validates consistency between the saved state and current model setup.

required
time_index int

Index of timestep to load (default: -1 = last timestep)

-1
config MOBIDICConfig | None

Optional MOBIDIC configuration for initializing missing variables

None
gisdata GISData | None

Optional GIS data for initializing missing variables

None

Returns:

Type Description
tuple[SimulationState, datetime, dict]

Tuple of (state, time, metadata) where:

  • state: SimulationState object
  • time: Simulation time from file
  • metadata: Grid metadata dictionary

Raises:

Type Description
FileNotFoundError

If input file does not exist

ValueError

If file format is invalid

Examples:

>>> from mobidic.io import load_state
>>> # Load last timestep
>>> state, time, metadata = load_state("states.nc", 1235)
>>> # Load first timestep with config (for missing variables)
>>> state, time, metadata = load_state("states.nc", 1235, time_index=0,
...                                      config=config, gisdata=gisdata)
>>> # Works with chunked files too
>>> state, time, metadata = load_state("states.nc", 1235)  # Auto-loads states_001.nc
Source code in mobidic/io/state.py
def load_state(
    input_path: str | Path,
    network_size: int,
    time_index: int = -1,
    config: "MOBIDICConfig | None" = None,  # noqa: F821
    gisdata: "GISData | None" = None,  # noqa: F821
) -> tuple["SimulationState", datetime, dict]:  # noqa: F821
    """
    Load simulation state from NetCDF file.

    Supports both single-timestep and multi-timestep state files.
    For multi-timestep files, loads the specified time index (default: last timestep).
    Automatically detects chunk files (e.g., states_001.nc) when the base path
    doesn't exist.

    If a state variable is missing from the file, it will be initialized using
    the initial conditions from the configuration file (if config and gisdata
    are provided); otherwise it will be initialized with zeros.

    Args:
        input_path: Path to input NetCDF file (may be chunked as _001.nc, _002.nc, etc.)
        network_size: Expected number of reaches in network. Validates consistency between
                        the saved state and current model setup.
        time_index: Index of timestep to load (default: -1 = last timestep)
        config: Optional MOBIDIC configuration for initializing missing variables
        gisdata: Optional GIS data for initializing missing variables

    Returns:
        Tuple of (state, time, metadata) where:
            - state: SimulationState object
            - time: Simulation time from file
            - metadata: Grid metadata dictionary

    Raises:
        FileNotFoundError: If input file does not exist
        ValueError: If file format is invalid

    Examples:
        >>> from mobidic.io import load_state
        >>> # Load last timestep
        >>> state, time, metadata = load_state("states.nc", 1235)
        >>> # Load first timestep with config (for missing variables)
        >>> state, time, metadata = load_state("states.nc", 1235, time_index=0,
        ...                                      config=config, gisdata=gisdata)
        >>> # Works with chunked files too
        >>> state, time, metadata = load_state("states.nc", 1235)  # Auto-loads states_001.nc
    """
    input_path = Path(input_path)

    # Check if the exact path exists, otherwise look for chunk files
    if not input_path.exists():
        # Try to find chunk files
        stem = input_path.stem
        suffix = input_path.suffix
        parent = input_path.parent

        # Look for first chunk
        first_chunk = parent / f"{stem}_001{suffix}"
        if first_chunk.exists():
            logger.info(f"Base file not found, using chunk file: {first_chunk}")
            input_path = first_chunk
        else:
            raise FileNotFoundError(f"State file not found: {input_path} (also checked for {first_chunk})")

    logger.info(f"Loading simulation state from NetCDF: {input_path}")

    # Load dataset
    ds = xr.open_dataset(input_path)

    # Check if time is a dimension or scalar coordinate
    has_time_dim = "time" in ds.dims
    if has_time_dim:
        num_times = len(ds.time)
        logger.debug(f"Multi-timestep file detected: {num_times} timesteps available")
        # Select the specified timestep
        ds = ds.isel(time=time_index)
        logger.info(f"Loading timestep {time_index} (out of {num_times})")

    # Get grid shape from coordinates (needed for initializing missing variables)
    nrows = len(ds.y)
    ncols = len(ds.x)

    # Check if we can use config-based initialization for missing variables
    can_use_config = config is not None and gisdata is not None

    # Extract state variables (conditionally, since they may not be present)
    # Grid variables: initialize from config if missing and config provided, otherwise use zeros
    if "Wc" in ds:
        wc = ds["Wc"].values
    else:
        if can_use_config:
            logger.warning("Wc not found in state file, initializing from config initial conditions")
            from mobidic.core import constants as const

            # Apply same preprocessing as in Simulation.__init__
            wc0 = gisdata.grids["Wc0"] * config.parameters.multipliers.Wc_factor
            wc0 = np.maximum(wc0, const.W_MIN)

            # Apply Wg_Wc_tr transition if specified
            Wg_Wc_tr = config.parameters.multipliers.Wg_Wc_tr
            if Wg_Wc_tr >= 0:
                wg0 = gisdata.grids["Wg0"] * config.parameters.multipliers.Wg_factor
                wg0 = np.maximum(wg0, const.W_MIN)
                wtot = wc0 + wg0
                wg0 = np.minimum(Wg_Wc_tr * wg0, wtot)
                wc0 = wtot - wg0

            # Apply initial saturation from config
            wcsat = config.initial_conditions.Wcsat
            wc = wc0 * wcsat

            # Set NaN outside domain
            flow_acc = gisdata.grids["flow_acc"]
            wc[np.isnan(flow_acc)] = np.nan
        else:
            logger.warning("Wc not found in state file, initializing with zeros")
            wc = np.zeros((nrows, ncols))

    if "Wg" in ds:
        wg = ds["Wg"].values
    else:
        if can_use_config:
            logger.warning("Wg not found in state file, initializing from config initial conditions")
            from mobidic.core import constants as const

            # Apply same preprocessing as in Simulation.__init__
            wc0 = gisdata.grids["Wc0"] * config.parameters.multipliers.Wc_factor
            wg0 = gisdata.grids["Wg0"] * config.parameters.multipliers.Wg_factor
            wc0 = np.maximum(wc0, const.W_MIN)
            wg0 = np.maximum(wg0, const.W_MIN)

            # Apply Wg_Wc_tr transition if specified
            Wg_Wc_tr = config.parameters.multipliers.Wg_Wc_tr
            if Wg_Wc_tr >= 0:
                wtot = wc0 + wg0
                wg0 = np.minimum(Wg_Wc_tr * wg0, wtot)
                wc0 = wtot - wg0

            # Apply initial saturation from config
            wgsat = config.initial_conditions.Wgsat
            wg = wg0 * wgsat

            # Set NaN outside domain
            flow_acc = gisdata.grids["flow_acc"]
            wg[np.isnan(flow_acc)] = np.nan
        else:
            logger.warning("Wg not found in state file, initializing with zeros")
            wg = np.zeros((nrows, ncols))

    if "Ws" in ds:
        ws = ds["Ws"].values
    else:
        if can_use_config:
            logger.warning("Ws not found in state file, initializing from config initial conditions")
            # Initialize surface water from config
            ws_init = config.initial_conditions.Ws
            ws = np.full((nrows, ncols), ws_init)

            # Set NaN outside domain
            flow_acc = gisdata.grids["flow_acc"]
            ws[np.isnan(flow_acc)] = np.nan
        else:
            logger.warning("Ws not found in state file, initializing with zeros")
            ws = np.zeros((nrows, ncols))

    # Plant water: optional, initialize with zeros if missing
    if "Wp" in ds:
        wp = ds["Wp"].values
    else:
        if can_use_config:
            logger.warning("Wp not found in state file, initializing from config initial conditions")
            # Initialize plant water (currently zeros)
            wp = np.zeros((nrows, ncols))

            # Set NaN outside domain
            flow_acc = gisdata.grids["flow_acc"]
            wp[np.isnan(flow_acc)] = np.nan
        else:
            logger.warning("Wp not found in state file, initializing with zeros")
            wp = np.zeros((nrows, ncols))

    # Network variables: initialize with zeros if missing
    if "discharge" in ds:
        discharge = ds["discharge"].values
        # For multi-timestep files, discharge might be 1D after isel
        # For single-timestep files, it's already 1D
        if discharge.ndim > 1:
            discharge = discharge.flatten()
        # Check array shapes
        if len(discharge) != network_size:
            logger.warning(
                f"Network size mismatch: expected {network_size}, got {len(discharge)}. "
                "This may cause issues if network has changed."
            )
    else:
        logger.warning("discharge not found in state file, initializing with zeros")
        discharge = np.zeros(network_size)

    if "lateral_inflow" in ds:
        lateral_inflow = ds["lateral_inflow"].values
        # For multi-timestep files, lateral_inflow might be 1D after isel
        if lateral_inflow.ndim > 1:
            lateral_inflow = lateral_inflow.flatten()
    else:
        logger.warning("lateral_inflow not found in state file, initializing with zeros")
        lateral_inflow = np.zeros(network_size)

    # Extract time
    time = pd.Timestamp(ds["time"].values).to_pydatetime()

    # Extract grid metadata
    resolution_x = float(ds.x.values[1] - ds.x.values[0]) if len(ds.x) > 1 else float(ds.attrs.get("resolution_x", 1.0))
    resolution_y = float(ds.y.values[1] - ds.y.values[0]) if len(ds.y) > 1 else float(ds.attrs.get("resolution_y", 1.0))
    xllcorner = float(ds.x.values[0])
    yllcorner = float(ds.y.values[0])
    crs = ds["crs"].attrs.get("crs_wkt", "") if "crs" in ds else ""

    metadata = {
        "shape": (nrows, ncols),
        "resolution": (resolution_x, resolution_y),
        "xllcorner": xllcorner,
        "yllcorner": yllcorner,
        "crs": crs,
    }

    # Close dataset
    ds.close()

    # Import here to avoid circular import
    from mobidic.core.simulation import SimulationState

    state = SimulationState(wc, wg, wp, ws, discharge, lateral_inflow)

    logger.success(f"State loaded: time={time.isoformat()}, grid={nrows}x{ncols}, reaches={len(discharge)}")

    return state, time, metadata

Examples

Saving states incrementally with StateWriter

from mobidic.io import StateWriter
from mobidic import load_config, load_gisdata
from datetime import datetime, timedelta

# Load configuration and data
config = load_config("config.yaml")
gisdata = load_gisdata("gisdata.nc", "network.parquet")

# Create StateWriter with flushing every 10 timesteps
with StateWriter(
    output_path="output/states.nc",
    grid_metadata=gisdata.metadata,
    network_size=len(gisdata.network),
    output_states=config.output_states,
    flushing=10,  # Flush every 10 timesteps (-1 = only at end)
    max_file_size=500.0,  # Create new chunk file when reaching 500 MB
    add_metadata={
        "simulation_version": "v1.0",
        "calibration_run": "baseline",
        "notes": "Calibration run with default parameters"
    }
) as writer:
    # Simulation loop
    current_time = datetime(2020, 1, 1)
    dt = timedelta(seconds=900)  # 15-minute timesteps

    for step in range(num_steps):
        # ... run simulation step ...
        # state = perform_simulation_step(...)

        # Append state to file (buffered)
        writer.append_state(state, current_time)
        current_time += dt

    # States are automatically flushed and file closed when exiting context
    # May create: states_001.nc, states_002.nc, states_003.nc, etc.

# Alternatively, manually manage the writer
writer = StateWriter(
    output_path="output/states.nc",
    grid_metadata=gisdata.metadata,
    network_size=len(gisdata.network),
    output_states=config.output_states,
    flushing=-1,  # Only flush at end
    max_file_size=500.0,
)

for step in range(num_steps):
    # ... simulation ...
    writer.append_state(state, current_time)
    current_time += dt

writer.close()

Saving only final state

from mobidic.io import StateWriter
from datetime import datetime

# Create writer with flushing=-1 (only flush at end)
writer = StateWriter(
    output_path="output/state_final.nc",
    grid_metadata=gisdata.metadata,
    network_size=len(gisdata.network),
    output_states=config.output_states,
    flushing=-1,
    add_metadata={"run_type": "final_state"}
)

# Only save the final state
writer.append_state(final_state, datetime(2020, 12, 31, 23, 45))
writer.close()

Loading state for warm start

from mobidic.io import load_state
from mobidic import Simulation, load_config, load_gisdata

# Load last timestep from multi-timestep file (default)
state, time, metadata = load_state(
    input_path="output/states.nc",
    network_size=1235  # Number of reaches in network
)

print(f"Loaded state at {time}")
print(f"Grid shape: {metadata['shape']}")
print(f"Mean capillary water: {state.wc.mean():.3f} m")
print(f"Mean discharge: {state.discharge.mean():.3f} m³/s")

# Load specific timestep (e.g., first timestep)
state_first, time_first, _ = load_state(
    input_path="output/states.nc",
    network_size=1235,
    time_index=0  # 0 = first, -1 = last (default)
)

# Load from single-timestep file
state_final, time_final, _ = load_state(
    input_path="output/state_final.nc",
    network_size=1235
)

# Load from chunked files (automatically detects chunk files)
# Even if you specify "states.nc", it will find and load "states_001.nc"
state_chunk, time_chunk, _ = load_state(
    input_path="output/states.nc",  # Auto-detects states_001.nc
    network_size=1235
)

# Load with config and gisdata for proper missing variable initialization
# If the state file is missing some variables, they will be initialized
# using the initial conditions from the config file. If config/gisdata are not
# provided, missing variables are set to zero.
config = load_config("config.yaml")
gisdata = load_gisdata("gisdata.nc", "network.parquet")

state_with_init, time, metadata = load_state(
    input_path="output/states.nc",
    network_size=1235,
    config=config,      # Optional: for missing variable initialization
    gisdata=gisdata     # Optional: for missing variable initialization
)

# Use state to initialize simulation
sim = Simulation(gisdata, forcing, config)
sim.state = state  # Override initial state

# Resume simulation from this state
results = sim.run("2020-06-15", "2020-12-31")

Handling missing variables

When loading a state file, some variables may be missing (e.g., if the file was saved with selective output). The load_state() function handles this automatically:

Without config/gisdata (default behavior):

# Missing variables are initialized with zeros
state, time, metadata = load_state("output/states.nc", network_size=1235)

# Grid variables (Wc, Wg, Ws, Wp) → initialized with zeros
# Network variables (discharge, lateral_inflow) → initialized with zeros

With config/gisdata (recommended for warm start):

from mobidic import load_config, load_gisdata

config = load_config("config.yaml")
gisdata = load_gisdata("gisdata.nc", "network.parquet")

# Missing variables are initialized using config initial conditions
state, time, metadata = load_state(
    "output/states.nc",
    network_size=1235,
    config=config,
    gisdata=gisdata
)

# Grid variables (Wc, Wg, Ws, Wp) → initialized from config.initial_conditions
#   - Wc: Wc0 × Wcsat (with capacity factors and Wg_Wc_tr transition)
#   - Wg: Wg0 × Wgsat (with capacity factors and Wg_Wc_tr transition)
#   - Ws: config.initial_conditions.Ws
#   - Wp: zeros (with NaN outside domain)
#   - All grid variables: NaN outside domain (where flow_acc is NaN)
# Network variables (discharge, lateral_inflow) → initialized with zeros

Example: Loading a partial state file

# Suppose a state file was saved with only Wc and Wg enabled
# (soil_surface=false in config), but you need Ws for the new simulation

config = load_config("config.yaml")
gisdata = load_gisdata("gisdata.nc", "network.parquet")

# Load state with config - Ws will be initialized from config.initial_conditions.Ws
state, time, metadata = load_state(
    "output/partial_state.nc",
    network_size=1235,
    config=config,
    gisdata=gisdata
)

# Now state.ws is properly initialized (not zeros) and ready for simulation
print(f"Ws initialized from config: {state.ws.mean():.6f} m")

# The Simulation class automatically uses config-based initialization
sim = Simulation(gisdata, forcing, config)
sim.set_initial_state(state_file="output/partial_state.nc")
# Missing variables are automatically initialized using config

This ensures that warm-start simulations always have properly initialized state variables, even when loading from files that don’t contain all variables.

Inspecting state files

import xarray as xr
import matplotlib.pyplot as plt

# Open multi-timestep state file
ds = xr.open_dataset("output/states.nc")

# Examine contents
print(ds)
print(f"\nVariables: {list(ds.data_vars)}")
print(f"Coordinates: {list(ds.coords)}")
print(f"Number of timesteps: {len(ds.time)}")
print(f"Time range: {ds.time.values[0]} to {ds.time.values[-1]}")

# Plot capillary water content at last timestep
if "Wc" in ds:
    fig, ax = plt.subplots(figsize=(10, 8))
    ds["Wc"].isel(time=-1).plot(ax=ax, cmap="Blues", cbar_kwargs={"label": "Wc [m]"})
    ax.set_title(f"Capillary Water Content at {ds.time.values[-1]}")
    plt.savefig("output/Wc_map.png")

# Plot time series of mean soil moisture
if "Wc" in ds:
    mean_wc = ds["Wc"].mean(dim=["x", "y"])
    plt.figure(figsize=(12, 4))
    mean_wc.plot()
    plt.ylabel("Mean Wc [m]")
    plt.title("Mean Capillary Water Content Over Time")
    plt.grid(True)
    plt.savefig("output/Wc_timeseries.png")

# Plot discharge along network at last timestep
if "discharge" in ds:
    plt.figure(figsize=(12, 4))
    plt.plot(ds["reach"], ds["discharge"].isel(time=-1), 'b-', linewidth=0.5)
    plt.xlabel("Reach ID")
    plt.ylabel("Discharge [m³/s]")
    plt.title(f"River Discharge at {ds.time.values[-1]}")
    plt.grid(True)
    plt.savefig("output/discharge_profile.png")

# Plot discharge time series for a specific reach
if "discharge" in ds:
    reach_id = 500  # Example reach
    plt.figure(figsize=(12, 4))
    ds["discharge"].isel(reach=reach_id).plot()
    plt.ylabel("Discharge [m³/s]")
    plt.title(f"Discharge Time Series for Reach {reach_id}")
    plt.grid(True)
    plt.savefig("output/discharge_ts.png")

ds.close()

Comparing states

import xarray as xr
import numpy as np
import matplotlib.pyplot as plt

# Load two state files to compare
ds1 = xr.open_dataset("output/states_run1.nc")
ds2 = xr.open_dataset("output/states_run2.nc")

# Compare capillary water at last timestep
if "Wc" in ds1 and "Wc" in ds2:
    wc1_final = ds1["Wc"].isel(time=-1)
    wc2_final = ds2["Wc"].isel(time=-1)
    diff = wc1_final - wc2_final

    print("Wc difference statistics (final timestep):")
    print(f"  Mean: {float(diff.mean()):.6f} m")
    print(f"  Std: {float(diff.std()):.6f} m")
    print(f"  Max abs diff: {float(np.abs(diff).max()):.6f} m")

    # Plot difference
    fig, ax = plt.subplots(figsize=(10, 8))
    diff.plot(ax=ax, cmap="RdBu_r", center=0)
    ax.set_title("Capillary Water Difference (Run1 - Run2)")
    plt.savefig("output/Wc_diff.png")

# Compare time series
if "Wc" in ds1 and "Wc" in ds2:
    mean_wc1 = ds1["Wc"].mean(dim=["x", "y"])
    mean_wc2 = ds2["Wc"].mean(dim=["x", "y"])

    plt.figure(figsize=(12, 4))
    mean_wc1.plot(label="Run 1")
    mean_wc2.plot(label="Run 2")
    plt.ylabel("Mean Wc [m]")
    plt.title("Mean Capillary Water Content Comparison")
    plt.legend()
    plt.grid(True)
    plt.savefig("output/Wc_comparison.png")

ds1.close()
ds2.close()

Working with chunked state files

When saving very large simulations, StateWriter automatically splits the output into multiple chunk files when the size limit is reached:

from mobidic.io import StateWriter, load_state
from datetime import datetime, timedelta

# Create writer with 500 MB chunk size and regular flushing
# NOTE: For chunking to work effectively, flushing must be > 0
writer = StateWriter(
    output_path="output/states.nc",
    grid_metadata=gisdata.metadata,
    network_size=len(gisdata.network),
    output_states=config.output_states,
    flushing=100,  # Flush every 100 steps (required for chunking)
    max_file_size=500.0,  # 500 MB per chunk
)

# Run long simulation that generates > 500 MB of data
for step in range(10000):
    writer.append_state(state, current_time)
    current_time += dt

writer.close()

# This creates multiple files:
# - states_001.nc (500 MB)
# - states_002.nc (500 MB)
# - states_003.nc (200 MB)

# Loading automatically detects and uses the first chunk
state, time, metadata = load_state(
    input_path="output/states.nc",  # Automatically finds states_001.nc
    network_size=1235
)

# Or load from a specific chunk directly
state_chunk2, time_chunk2, _ = load_state(
    input_path="output/states_002.nc",  # Load from second chunk
    network_size=1235,
    time_index=-1  # Last timestep in this chunk
)

Important notes about chunking:

  • Files may slightly exceed max_file_size by up to one flush worth of data
  • Size check occurs before writing each flush batch
  • For effective chunking, set flushing > 0 (e.g., 10, 50, 100)
  • If flushing=-1 (flush only at end), all data writes in one operation and chunking won’t work as expected
  • Chunk files are numbered sequentially: _001.nc, _002.nc, _003.nc, etc.
  • Existing chunk files are automatically removed when creating a new StateWriter with the same base path
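
To analyze a full chunked run as one continuous time series, the independent chunk files can be concatenated along the time dimension. A minimal sketch, assuming the documented `states_NNN.nc` naming (the two tiny files created here are synthetic stand-ins for real StateWriter output):

```python
import tempfile
from glob import glob
from pathlib import Path

import numpy as np
import xarray as xr

# Two tiny stand-in chunk files; in a real run StateWriter produces these.
tmp = Path(tempfile.mkdtemp())
for i, t0 in enumerate([0, 3], start=1):
    chunk = xr.Dataset(
        {"Wc": (["time", "y", "x"], np.random.rand(3, 2, 2))},
        coords={"time": np.arange(t0, t0 + 3), "y": [0.0, 1.0], "x": [0.0, 1.0]},
    )
    chunk.to_netcdf(tmp / f"states_{i:03d}.nc")

# Zero-padded suffixes mean a lexicographic sort matches temporal order.
chunk_paths = sorted(glob(str(tmp / "states_*.nc")))
combined = xr.concat([xr.open_dataset(p) for p in chunk_paths], dim="time")
print(len(chunk_paths), combined.sizes["time"])  # 2 chunks, 6 timesteps total
```

When dask is installed, `xr.open_mfdataset("states_*.nc", combine="by_coords")` achieves the same in one lazy call, which is preferable for chunks too large to hold in memory.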

Configuration control

State saving is controlled by the configuration file:

output_states:
  discharge: true               # Save river discharge
  soil_capillary: true          # Save capillary water (Wc)
  soil_gravitational: true      # Save gravitational water (Wg)
  soil_plant: true              # Save plant/canopy water (Wp)
  soil_surface: true            # Save surface water (Ws)
  reservoir_states: false       # Save reservoir states (volume, stage, inflow, outflow)
  # Other state outputs (not yet implemented):
  surface_temperature: false
  ground_temperature: false
  aquifer_head: false
  et_prec: false

output_states_settings:
  output_format: "netCDF"       # Format (currently only netCDF)
  output_states: "final"        # "all", "final", or "list"
  output_interval: 3600         # Interval in seconds (for "all")
  output_list: [0, 100, 200]    # Timestep indices (for "list")

File structure

NetCDF state files contain:

Dimensions

  • time: Unlimited dimension for multiple timesteps
  • x: Grid columns
  • y: Grid rows
  • reach: Number of reaches (if discharge enabled)
  • reservoir: Number of reservoirs (if reservoir_states enabled)

Coordinates

  • time(time): Simulation time [datetime64]
  • x(x): X coordinates [m]
  • y(y): Y coordinates [m]
  • reach(reach): Reach indices [dimensionless]
  • reservoir(reservoir): Reservoir indices [dimensionless] (if reservoir_states enabled)

Data variables

  • Wc(time, y, x): Capillary water content [m] (if enabled)
  • Wg(time, y, x): Gravitational water content [m] (if enabled)
  • Wp(time, y, x): Plant/canopy water content [m] (if enabled)
  • Ws(time, y, x): Surface water content [m] (if enabled)
  • discharge(time, reach): River discharge [m³/s] (if enabled)
  • lateral_inflow(time, reach): Lateral inflow to reaches [m³/s] (if enabled)
  • crs(): Grid mapping (CRS metadata, scalar)

Global attributes

  • title: “MOBIDIC simulation states”
  • source: “MOBIDICpy simulation”
  • Conventions: “CF-1.12”
  • history: Creation timestamp with MOBIDICpy version
  • Custom metadata from add_metadata parameter

Chunked files

When a simulation produces very large state files, StateWriter automatically creates multiple chunk files:

  • Naming convention: states_001.nc, states_002.nc, states_003.nc, etc.
  • Size limit: Each chunk is limited to max_file_size MB (default: 500 MB)
  • Independent files: Each chunk is a complete, self-contained NetCDF file with its own timesteps
  • Sequential ordering: Chunks are created in temporal order as the simulation progresses
  • Automatic detection: load_state() automatically finds states_001.nc when you specify states.nc
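The automatic detection step can be sketched as follows. This is a self-contained illustration of the fallback described above, not the library's internals; resolve_state_path is a hypothetical helper, and the real load_state() performs this lookup for you.

```python
import tempfile
from pathlib import Path

def resolve_state_path(path: Path) -> Path:
    """Return the file to load: the base path if it exists, else the first chunk."""
    if path.exists():
        return path
    first_chunk = path.with_name(f"{path.stem}_001{path.suffix}")
    if first_chunk.exists():
        return first_chunk
    raise FileNotFoundError(f"Neither {path} nor {first_chunk} found")

with tempfile.TemporaryDirectory() as d:
    (Path(d) / "states_001.nc").touch()            # simulate a chunked run
    resolved = resolve_state_path(Path(d) / "states.nc")
    print(resolved.name)                           # states_001.nc
```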

Design features

  • CF-1.12 compliant: Follows Climate and Forecast metadata conventions
  • Incremental writing: StateWriter appends states to a single file with an unlimited time dimension
  • Automatic chunking: Splits large outputs into multiple files when size limit is reached (configurable via max_file_size)
  • Memory efficient: Configurable buffering with periodic flushing to disk
  • Fast append mode: Uses netCDF4 library for efficient appending (avoids read-concatenate-write)
  • Compression: zlib compression (level 4) for efficient storage
  • Selective saving: Only configured state variables are saved
  • Context manager support: Automatic resource cleanup with with statement
  • Multi-timestep support: Can load any timestep from multi-timestep files
  • Chunk file detection: load_state() automatically finds chunk files (e.g., _001.nc) when base path doesn’t exist
  • CRS preservation: Coordinate Reference System stored as WKT
  • Robust error handling: Clear warnings for missing data or size mismatches
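The buffering, periodic flushing, and context-manager behavior listed above combine into one pattern, sketched here in pure Python with no NetCDF dependency. BufferedWriter is a hypothetical stand-in for StateWriter: flushed stands in for data written to disk, and the final flush in __exit__ mirrors the automatic cleanup provided by the with statement.

```python
class BufferedWriter:
    """Toy model of buffered, periodically flushed state writing."""

    def __init__(self, flushing=10):
        self.flushing = flushing
        self.buffer = []       # states held in memory
        self.flushed = []      # stand-in for data written to disk

    def append(self, state):
        self.buffer.append(state)
        # Flush every `flushing` timesteps; flushing=-1 defers to the end
        if self.flushing > 0 and len(self.buffer) >= self.flushing:
            self.flush()

    def flush(self):
        self.flushed.extend(self.buffer)
        self.buffer.clear()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        self.flush()           # final flush on exit, even if an error occurred
        return False

with BufferedWriter(flushing=3) as w:
    for t in range(7):
        w.append({"t": t})
print(len(w.flushed))          # 7: two full flushes of 3, plus a final flush of 1
```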

Error handling

The module provides clear error messages for common issues:

from mobidic.io import load_state

try:
    state, time, metadata = load_state("missing_file.nc", 1235)
except FileNotFoundError as e:
    print(f"Error: {e}")

# A network size mismatch emits a clear warning, but the state still loads
state, time, metadata = load_state("state.nc", 5000)  # Wrong network size

References

File format:

  • NetCDF4 with CF-1.12 conventions
  • Uses xarray for reading/writing
  • Compatible with standard NetCDF tools (ncdump, ncview, Panoply)

Related modules:

  • Simulation - Uses state I/O for warm start and final states
  • Report I/O - Time series output (Parquet format)