Skip to content

Preprocessing workflow

The preprocessing module provides a high-level function that runs the complete MOBIDIC preprocessing, from raw GIS data to inputs that can be directly used in the simulation.

Overview

The preprocessing workflow handles:

  1. Loading and validating configuration
  2. Reading all raster data (DTM, flow direction, soil parameters, etc.)
  3. Processing the river network (topology, ordering, routing parameters)
  4. Computing hillslope-reach mapping
  5. Processing reservoirs (optional: polygons, stage-storage curves, regulation curves/schedules)
  6. Organizing all data into a consolidated GISData object

This is the recommended entry point for most users, as it handles all preprocessing steps automatically based on the configuration file.

Functions

Main preprocessing function

mobidic.preprocessing.preprocessor.run_preprocessing(config)

Run complete preprocessing workflow.

This function orchestrates the entire preprocessing pipeline: 1. Load raster grids (DTM, flow direction, soil parameters, etc.) 2. Apply grid decimation if needed (decimation factor > 1) 3. Convert flow direction to MOBIDIC notation 4. Process river network (topology, Strahler ordering, routing parameters) 5. Compute hillslope cells for each reach 6. Map hillslope cells to reaches

Parameters:

Name Type Description Default
config MOBIDICConfig

MOBIDIC configuration with preprocessing settings

required

Returns:

Type Description
GISData

GISData object containing all preprocessed spatial data

Examples:

>>> from mobidic import load_config, run_preprocessing
>>> config = load_config("Arno.yaml")
>>> gisdata = run_preprocessing(config)
>>> gisdata.save("Arno_gisdata.nc", "Arno_network.parquet")
Source code in mobidic/preprocessing/preprocessor.py
def run_preprocessing(config: MOBIDICConfig) -> GISData:
    """Run complete preprocessing workflow.

    This function orchestrates the entire preprocessing pipeline:
    1. Load raster grids (DTM, flow direction, soil parameters, etc.)
    2. Apply grid decimation if needed (decimation factor > 1)
    3. Convert flow direction to MOBIDIC notation
    4. Process river network (topology, Strahler ordering, routing parameters)
    5. Compute hillslope cells for each reach
    6. Map hillslope cells to reaches

    Args:
        config: MOBIDIC configuration with preprocessing settings

    Returns:
        GISData object containing all preprocessed spatial data

    Examples:
        >>> from mobidic import load_config, run_preprocessing
        >>> config = load_config("Arno.yaml")
        >>> gisdata = run_preprocessing(config)
        >>> gisdata.save("Arno_gisdata.nc", "Arno_network.parquet")
    """
    logger.info("=" * 80)
    logger.info(f"MOBIDICpy v{__version__} - PREPROCESSING")
    logger.info("=" * 80)
    logger.info(f"Basin: {config.basin.id}")
    logger.info(f"Parameter set: {config.basin.paramset_id}")
    logger.info("")

    # Step 1: Load required raster grids
    logger.info("Step 1/7: Loading raster grids")
    logger.info("-" * 80)

    grids = {}
    metadata = {}

    # Load DTM (required)
    logger.debug(f"Loading DTM from {config.raster_files.dtm}")
    dtm_result = grid_to_matrix(config.raster_files.dtm)
    grids["dtm"] = dtm_result["data"]
    xllcorner = dtm_result["xllcorner"]
    yllcorner = dtm_result["yllcorner"]
    cellsize = dtm_result["cellsize"]
    crs = dtm_result["crs"]

    # Store metadata extracted from DTM
    metadata["xllcorner"] = xllcorner
    metadata["yllcorner"] = yllcorner
    metadata["cellsize"] = cellsize
    metadata["shape"] = grids["dtm"].shape
    metadata["resolution"] = (cellsize, cellsize)
    metadata["nodata"] = np.nan
    metadata["crs"] = crs

    # Load flow direction (required)
    logger.debug(f"Loading flow direction from {config.raster_files.flow_dir}")
    flow_dir_result = grid_to_matrix(config.raster_files.flow_dir)
    grids["flow_dir"] = flow_dir_result["data"]

    # Load flow accumulation (required)
    logger.debug(f"Loading flow accumulation from {config.raster_files.flow_acc}")
    flow_acc_result = grid_to_matrix(config.raster_files.flow_acc)
    flow_acc_data = flow_acc_result["data"]
    if np.any((flow_acc_data[~np.isnan(flow_acc_data)] < 1)):
        flow_acc_data[~np.isnan(flow_acc_data)] += 1
        logger.info("Flow accumulation values were < 1 and have been incremented by 1.")
    grids["flow_acc"] = flow_acc_data

    # Get index of the cell with maximum flow accumulation
    max_acc_idx = np.unravel_index(np.nanargmax(flow_acc_data), flow_acc_data.shape)

    # Set outlet value to -1 in the flow direction grid
    grids["flow_dir"][max_acc_idx] = -1

    # Load soil parameters (required)
    logger.debug(f"Loading Wc0 from {config.raster_files.Wc0}")
    wc0_result = grid_to_matrix(config.raster_files.Wc0)
    grids["Wc0"] = wc0_result["data"] * 0.001  # Convert from mm to m

    logger.debug(f"Loading Wg0 from {config.raster_files.Wg0}")
    wg0_result = grid_to_matrix(config.raster_files.Wg0)
    grids["Wg0"] = wg0_result["data"] * 0.001  # Convert from mm to m

    logger.debug(f"Loading ks from {config.raster_files.ks}")
    ks_result = grid_to_matrix(config.raster_files.ks)
    grids["ks"] = ks_result["data"] * 0.001 / 3600  # Convert from mm/h to m/s

    # Check ranges of ks values (ks_min and ks_max in mm/h from config)
    ks_min = config.parameters.soil.ks_min
    ks_max = config.parameters.soil.ks_max
    if ks_min is not None:
        ks_min = ks_min * 0.001 / 3600
        if np.any(grids["ks"] < ks_min):
            logger.warning(f"Some ks values are below the minimum threshold: {ks_min} m/s")

    if ks_max is not None:
        ks_max = ks_max * 0.001 / 3600
        if np.any(grids["ks"] > ks_max):
            logger.warning(f"Some ks values are above the maximum threshold: {ks_max} m/s")

    if ks_min is not None and ks_max is not None:
        grids["ks"] = np.clip(grids["ks"], ks_min, ks_max)

    # Load optional raster files
    optional_rasters = {
        "kf": config.raster_files.kf,
        "CH": config.raster_files.CH,
        "Alb": config.raster_files.Alb,
        "Ma": config.raster_files.Ma,
        "Mf": config.raster_files.Mf,
        "gamma": config.raster_files.gamma,
        "kappa": config.raster_files.kappa,
        "beta": config.raster_files.beta,
        "alpha": config.raster_files.alpha,
    }

    for name, path in optional_rasters.items():
        if path is not None:
            logger.debug(f"Loading {name} from {path}")
            result = grid_to_matrix(path)
            grids[name] = result["data"]
        else:
            logger.debug(f"Using default value for {name} (no raster provided)")
            # Create grid filled with default parameter value
            default_value = _get_default_parameter_value(name, config)
            grids[name] = np.full_like(grids["dtm"], default_value, dtype=float)

    # Corine Land Cover (CLC) raster: optional categorical grid; when absent the
    # simulation falls back to the scalar parameters.soil.Kc. We keep it out of
    # the numeric optional_rasters loop because it carries class codes and must
    # not be filled with a default flux value nor averaged during decimation.
    if config.raster_files.CLC is not None:
        logger.debug(f"Loading CLC from {config.raster_files.CLC}")
        clc_result = grid_to_matrix(config.raster_files.CLC)
        grids["CLC"] = clc_result["data"]
    else:
        logger.debug("No CLC raster provided; Kc will default to parameters.soil.Kc")

    # Calculate alpsur (surface routing parameter based on slope)
    # From buildgis_mysql_include.m lines 647-651
    logger.debug("Calculating alpsur from DTM slopes")
    slopes = _calculate_slopes(grids["dtm"], cellsize, pmin=1e-5)
    alpsur = np.sqrt(slopes)
    alpsur = alpsur / np.nanmean(alpsur)
    grids["alpsur"] = alpsur

    logger.success(f"Loaded {len(grids)} raster grids. Shape: {grids['dtm'].shape}, cellsize: {cellsize} m")
    logger.info("")

    # Step 2: Apply grid decimation if needed
    logger.info("Step 2/7: Applying grid decimation")
    logger.info("-" * 80)

    decimation_factor = config.simulation.decimation

    if decimation_factor > 1:
        logger.info(f"Decimating grids by factor {decimation_factor}")

        # Decimate most grids using simple averaging. flow_dir/flow_acc follow
        # a dedicated path below; CLC holds discrete class codes so averaging
        # is skipped and the upper-left sub-cell of each block is taken.
        categorical_grids = {"CLC"}
        for name in grids.keys():
            if name in ("flow_dir", "flow_acc"):
                continue
            if name in categorical_grids:
                logger.debug(f"Decimating {name} (nearest)")
                grids[name] = grids[name][::decimation_factor, ::decimation_factor]
                continue
            logger.debug(f"Decimating {name}")
            grids[name] = decimate_raster(grids[name], decimation_factor)

        # Decimate flow direction and accumulation
        logger.debug("Decimating flow_dir and flow_acc")
        grids["flow_dir"], grids["flow_acc"] = decimate_flow_direction(
            grids["flow_dir"], grids["flow_acc"], decimation_factor
        )

        # Update metadata
        metadata["shape"] = grids["dtm"].shape
        metadata["resolution"] = (
            metadata["resolution"][0] * decimation_factor,
            metadata["resolution"][1] * decimation_factor,
        )

        logger.success(f"Grid decimation complete. New shape: {metadata['shape']}")
    else:
        logger.info("No grid decimation applied (decimation factor = 1)")

    logger.info("")

    # Step 3: Convert flow direction to MOBIDIC notation
    logger.info("Step 3/7: Converting flow direction to MOBIDIC notation")
    logger.info("-" * 80)

    flow_dir_type = config.raster_settings.flow_dir_type

    grids["flow_dir"] = convert_to_mobidic_notation(grids["flow_dir"], from_notation=flow_dir_type)

    logger.success("Flow direction conversion complete")
    logger.info("")

    # Step 4: Process river network
    logger.info("Step 4/7: Processing river network")
    logger.info("-" * 80)

    shapefile_path = config.vector_files.river_network.shp

    routing_params = {
        "wcel": config.parameters.routing.wcel,
        "Br0": config.parameters.routing.Br0,
        "NBr": config.parameters.routing.NBr,
        "n_Man": config.parameters.routing.n_Man,
    }

    network = process_river_network(
        shapefile_path=shapefile_path,
        join_single_tributaries=True,
        routing_params=routing_params,
    )

    logger.info("")

    # Step 5: Compute hillslope cells for each reach
    logger.info("Step 5/7: Computing hillslope cells")
    logger.info("-" * 80)

    network = compute_hillslope_cells(
        network=network,
        grid_array=grids["flow_dir"],
        xllcorner=xllcorner,
        yllcorner=yllcorner,
        cellsize=metadata["resolution"][0],
        densify_step=10.0,
    )

    logger.info("")

    # Step 6: Map hillslope cells to reaches
    logger.info("Step 6/7: Mapping hillslope to reaches")
    logger.info("-" * 80)

    hillslope_reach_map = map_hillslope_to_reach(
        network=network,
        flowdir_array=grids["flow_dir"],
    )

    logger.info("")

    # Step 7: Process reservoirs (optional)
    logger.info("Step 7/7: Processing reservoirs")
    logger.info("-" * 80)

    reservoirs = None
    if config.parameters.reservoirs.res_shape is not None:
        try:
            reservoirs = process_reservoirs(
                shapefile_path=config.parameters.reservoirs.res_shape,
                stage_storage_path=config.parameters.reservoirs.stage_storage,
                regulation_curves_path=config.parameters.reservoirs.regulation_curves,
                regulation_schedule_path=config.parameters.reservoirs.regulation_schedule,
                initial_volumes_path=config.initial_conditions.reservoir_volumes,
                network=network,
                grid_shape=grids["dtm"].shape,
                xllcorner=xllcorner,
                yllcorner=yllcorner,
                cellsize=metadata["resolution"][0],
            )
        except Exception as e:
            logger.error(f"Failed to process reservoirs: {e}")
            logger.warning("Continuing without reservoirs")
            reservoirs = None
    else:
        logger.info("No reservoirs configured, skipping")

    gisdata = GISData(
        grids=grids,
        metadata=metadata,
        network=network,
        hillslope_reach_map=hillslope_reach_map,
        config=config,
        reservoirs=reservoirs,
    )

    logger.info("")
    logger.success("Preprocessing completed successfully")
    logger.info("")

    return gisdata

Reservoir preprocessing function

mobidic.preprocessing.reservoirs.process_reservoirs(shapefile_path, stage_storage_path, regulation_curves_path, regulation_schedule_path, initial_volumes_path, network, grid_shape, xllcorner, yllcorner, cellsize)

Process reservoir data from input files.

This function orchestrates the complete reservoir preprocessing: 1. Read reservoir polygons from shapefile 2. Load stage-storage curves 3. Load regulation curves 4. Load regulation schedules 5. Load initial volumes 6. Map reservoirs to grid and network

Translated from MATLAB: buildgis_mysql_include.m (lines 594-740)

Parameters:

Name Type Description Default
shapefile_path str | Path

Path to reservoir polygon shapefile

required
stage_storage_path str | Path

Path to stage-storage CSV

required
regulation_curves_path str | Path

Path to regulation curves CSV

required
regulation_schedule_path str | Path

Path to regulation schedule CSV

required
initial_volumes_path Optional[str | Path]

Path to CSV with initial volumes (columns: ‘reservoir_id’, ‘volume_m3’). If None, initial volumes are auto-calculated as 100% capacity (volume at z_max)

required
network GeoDataFrame

Processed river network GeoDataFrame

required
grid_shape tuple[int, int]

Shape of computational grid (nrows, ncols)

required
xllcorner float

X coordinate of lower-left corner [m]

required
yllcorner float

Y coordinate of lower-left corner [m]

required
cellsize float

Grid cell size [m]

required

Returns:

Type Description
Reservoirs

Reservoirs object with processed data

Source code in mobidic/preprocessing/reservoirs.py
def process_reservoirs(
    shapefile_path: str | Path,
    stage_storage_path: str | Path,
    regulation_curves_path: str | Path,
    regulation_schedule_path: str | Path,
    initial_volumes_path: Optional[str | Path],
    network: gpd.GeoDataFrame,
    grid_shape: tuple[int, int],
    xllcorner: float,
    yllcorner: float,
    cellsize: float,
) -> Reservoirs:
    """Process reservoir data from input files.

    This function orchestrates the complete reservoir preprocessing:
    1. Read reservoir polygons from shapefile
    2. Load stage-storage curves
    3. Load regulation curves
    4. Load regulation schedules
    5. Load initial volumes
    6. Map reservoirs to grid and network

    Translated from MATLAB: buildgis_mysql_include.m (lines 594-740)

    Args:
        shapefile_path: Path to reservoir polygon shapefile
        stage_storage_path: Path to stage-storage CSV
        regulation_curves_path: Path to regulation curves CSV
        regulation_schedule_path: Path to regulation schedule CSV
        initial_volumes_path: Path to CSV with initial volumes (columns: 'reservoir_id', 'volume_m3').
            If None, initial volumes are auto-calculated as 100% capacity (volume at z_max)
        network: Processed river network GeoDataFrame
        grid_shape: Shape of computational grid (nrows, ncols)
        xllcorner: X coordinate of lower-left corner [m]
        yllcorner: Y coordinate of lower-left corner [m]
        cellsize: Grid cell size [m]

    Returns:
        Reservoirs object with processed data
    """
    logger.info("Processing reservoir data")
    logger.info("-" * 80)

    # Read reservoir polygons
    logger.debug(f"Reading reservoir shapefile: {shapefile_path}")
    res_polygons = gpd.read_file(shapefile_path)
    logger.info(f"Found {len(res_polygons)} reservoirs in shapefile")

    if len(res_polygons) == 0:
        logger.warning("No reservoirs found in shapefile")
        return Reservoirs([], metadata={"crs": res_polygons.crs})

    # Read stage-storage curves
    logger.debug(f"Reading stage-storage curves: {stage_storage_path}")
    stage_storage = pd.read_csv(stage_storage_path)

    # Read regulation curves
    logger.debug(f"Reading regulation curves: {regulation_curves_path}")
    regulation_curves = pd.read_csv(regulation_curves_path)

    # Read regulation schedule
    logger.debug(f"Reading regulation schedule: {regulation_schedule_path}")
    regulation_schedule = pd.read_csv(regulation_schedule_path)
    regulation_schedule["start_date"] = pd.to_datetime(regulation_schedule["start_date"])
    regulation_schedule["end_date"] = pd.to_datetime(regulation_schedule["end_date"])

    # Read initial volumes (optional)
    if initial_volumes_path is not None:
        logger.debug(f"Reading initial volumes: {initial_volumes_path}")
        initial_volumes = pd.read_csv(initial_volumes_path)
        volumes_dict = dict(zip(initial_volumes["reservoir_id"], initial_volumes["volume_m3"]))
        logger.info(f"Loaded initial volumes for {len(volumes_dict)} reservoirs from CSV")
    else:
        volumes_dict = {}
        logger.info("No initial volumes CSV provided, will auto-calculate from stage-storage curves")

    # Process each reservoir
    reservoirs = []
    for _, res_row in res_polygons.iterrows():
        res_id = res_row["id"]
        logger.debug(f"Processing reservoir {res_id}: {res_row.get('name', 'Unnamed')}")

        # Get reservoir polygon geometry
        polygon = res_row.geometry

        # Get z_max from shapefile
        z_max = res_row["zmax"]

        # Get stage-storage curve for this reservoir
        ss_curve = stage_storage[stage_storage["reservoir_id"] == res_id].copy()
        ss_curve = ss_curve.sort_values("stage_m")

        if len(ss_curve) == 0:
            logger.warning(f"No stage-storage curve found for reservoir {res_id}, skipping")
            continue

        # Drop reservoir_id column as it's used only for mapping
        ss_curve = ss_curve.drop(columns=["reservoir_id"])

        # Get initial volume: try CSV first, then auto-calculate from curve
        initial_volume = volumes_dict.get(res_id)
        if initial_volume is None:
            # Auto-calculate from stage-storage curve
            if len(ss_curve) > 0:
                initial_volume = _interpolate_volume_at_stage(ss_curve, z_max)
                logger.debug(
                    f"Auto-calculated initial volume for reservoir {res_id}: "
                    f"{initial_volume:.0f}m³ at z_max={z_max:.2f}m"
                )
            else:
                logger.warning(f"No stage-storage curve available for reservoir {res_id}, using 0m³")
                initial_volume = 0.0

        # Get regulation curves for this reservoir
        reg_curves_data = regulation_curves[regulation_curves["reservoir_id"] == res_id]
        reg_names = reg_curves_data["regulation_name"].unique()

        # Get regulation schedule for this reservoir
        reg_schedule = regulation_schedule[regulation_schedule["reservoir_id"] == res_id].copy()
        reg_schedule = reg_schedule.sort_values("start_date")

        # Build regulation curves arrays
        n_periods = len(reg_schedule)
        if n_periods == 0:
            logger.warning(f"No regulation schedule found for reservoir {res_id}, skipping")
            continue

        # Find maximum number of points across all regulation curves
        max_points = 0
        reg_curves_dict = {}
        for reg_name in reg_names:
            curve = reg_curves_data[reg_curves_data["regulation_name"] == reg_name].copy()
            curve = curve.sort_values("stage_m")
            reg_curves_dict[reg_name] = curve
            max_points = max(max_points, len(curve))

        # Build lawH and lawQ arrays (n_periods × max_points)
        lawH = np.full((n_periods, max_points), np.nan)
        lawQ = np.full((n_periods, max_points), np.nan)
        period_times = []

        for i, (_, sched_row) in enumerate(reg_schedule.iterrows()):
            reg_name = sched_row["regulation_name"]
            period_times.append(sched_row["start_date"])

            if reg_name in reg_curves_dict:
                curve = reg_curves_dict[reg_name]
                n_pts = len(curve)
                lawH[i, :n_pts] = curve["stage_m"].values
                lawQ[i, :n_pts] = curve["discharge_m3s"].values
            else:
                logger.warning(f"Regulation curve '{reg_name}' not found for reservoir {res_id}")

        # Rasterize polygon to get basin pixels
        logger.debug(f"Rasterizing reservoir {res_id} polygon")
        basin_pixels = _rasterize_reservoir_polygon(polygon, grid_shape, xllcorner, yllcorner, cellsize)
        logger.debug(f"Reservoir {res_id} basin: {len(basin_pixels)} cells")

        # Create temporary reservoir object for reach finding
        temp_reservoir = Reservoir(id=res_id, z_max=z_max, geometry=polygon)

        # Find inlet and outlet reaches
        logger.debug(f"Finding inlet/outlet reaches for reservoir {res_id}")
        inlet_reaches, outlet_reach = _find_reservoir_reaches(temp_reservoir, network)

        if outlet_reach >= 0:
            logger.debug(f"Reservoir {res_id} outlet reach: {outlet_reach}")
            if len(inlet_reaches) > 0:
                logger.debug(f"Reservoir {res_id} inlet reaches: {inlet_reaches}")
        else:
            logger.warning(f"No outlet reach found for reservoir {res_id}")

        # Convert arrays to dicts for storage (use zero-padded string keys for Parquet compatibility)
        period_times_dict = {f"{i:03d}": pd.Timestamp(t).isoformat() for i, t in enumerate(period_times)}
        stage_discharge_h_dict = {f"{i:03d}": lawH[i, :].tolist() for i in range(n_periods)}
        stage_discharge_q_dict = {f"{i:03d}": lawQ[i, :].tolist() for i in range(n_periods)}

        # Create Reservoir object
        reservoir = Reservoir(
            id=res_id,  # Use original shapefile id
            z_max=z_max,
            basin_pixels=basin_pixels,
            inlet_reaches=inlet_reaches,
            outlet_reach=outlet_reach,
            period_times=period_times_dict,
            stage_discharge_h=stage_discharge_h_dict,
            stage_discharge_q=stage_discharge_q_dict,
            initial_volume=initial_volume,
            geometry=polygon,
            name=res_row.get("name", ""),
            date_start=pd.to_datetime(res_row.get("date_start", None)),
            stage_storage_curve=ss_curve,
        )

        reservoirs.append(reservoir)

    logger.success(f"Processed {len(reservoirs)} reservoirs")

    return Reservoirs(reservoirs, metadata={"crs": res_polygons.crs})

Workflow stages

The preprocessing pipeline consists of up to seven stages (stages 6-7 are optional if reservoirs are configured):

Stage 1: configuration loading

Loads and validates the YAML configuration file:

config = load_config(config_path)

All subsequent steps are driven by paths and parameters in this configuration.

Stage 2: raster data loading

Reads all raster files specified in the configuration:

  • Required rasters: DTM, flow direction, flow accumulation
  • Soil parameters: Wc0, Wg0, ks, and optional kf
  • Energy parameters: CH, Alb (if energy balance enabled)
  • Flow coefficients: gamma, kappa, beta, alpha (if provided as rasters)

All rasters are: - Validated for consistent shape, resolution, and CRS - Converted to numpy arrays with NaN for nodata - Stored in the GISData.grids dictionary

Stage 3: river network processing

Processes the river network shapefile:

network = process_river_network(
    shapefile_path=config.vector_files.river_network.shp,
    join_single_tributaries=True,
    routing_params={
        "wcel": config.parameters.routing.wcel,
        "Br0": config.parameters.routing.Br0,
        "NBr": config.parameters.routing.NBr,
        "n_Man": config.parameters.routing.n_Man,
    }
)

This step: - Builds network topology - Enforces binary tree structure - Computes Strahler ordering - Calculates routing parameters - Determines calculation order

Stage 4: hillslope-reach mapping

Maps hillslope grid cells to river reaches:

network = compute_hillslope_cells(network, grid_path)
reach_map = map_hillslope_to_reach(network, flowdir_path, flow_dir_type)

This establishes the connection between the distributed grid and the river network for lateral inflow routing.

Stage 5: data consolidation

Packages everything into a GISData object:

gisdata = GISData()
gisdata.grids = all_grid_data
gisdata.network = processed_network
gisdata.metadata = spatial_reference_info

The GISData object can then be saved for reuse or passed directly to the simulation.

Stage 6: reservoir preprocessing (optional)

If reservoirs are configured (config.parameters.reservoirs.res_shape is set), process reservoir data:

reservoirs = process_reservoirs(
    res_shape_path=config.parameters.reservoirs.res_shape,
    stage_storage_path=config.parameters.reservoirs.stage_storage,
    regulation_curves_path=config.parameters.reservoirs.regulation_curves,
    regulation_schedule_path=config.parameters.reservoirs.regulation_schedule,
    initial_volumes_path=config.initial_conditions.reservoir_volumes,
    grid_transform=gisdata.metadata['transform'],
    grid_shape=gisdata.metadata['shape'],
    network=gisdata.network,
)
gisdata.reservoirs = reservoirs

This step: - Reads reservoir polygon shapefile - Rasterizes polygons to identify basin pixels - Loads stage-storage curves from CSV - Loads regulation curves and schedules from CSV - Identifies inlet/outlet reaches by network topology - Auto-calculates initial volumes from z_max if not provided - Consolidates all reservoir data into Reservoirs container

Stage 7: reservoir I/O (optional)

Save/load processed reservoir data:

# Save reservoirs to GeoParquet
gisdata.save(
    gisdata_path=config.paths.gisdata,
    network_path=config.paths.network,
    reservoirs_path=config.paths.reservoirs,
)

# Load reservoirs from GeoParquet
gisdata = GISData.load(
    gisdata_path=config.paths.gisdata,
    network_path=config.paths.network,
    reservoirs_path=config.paths.reservoirs,
)

Complete example

from mobidic import load_config, run_preprocessing

# Load configuration
config = load_config("config.yaml")

# Run complete preprocessing
gisdata = run_preprocessing(config)

# Inspect results
print(f"Basin: {config.basin.id}")
print(f"Grid shape: {gisdata.metadata['shape']}")
print(f"Resolution: {gisdata.metadata['resolution']} m")
print(f"CRS: {gisdata.metadata['crs']}")
print(f"Grid variables: {list(gisdata.grids.keys())}")
print(f"Network reaches: {len(gisdata.network)}")
print(f"Strahler orders: {sorted(gisdata.network['strahler_order'].unique())}")

# Save for later use
gisdata.save(
    gisdata_path=config.paths.gisdata,
    network_path=config.paths.network
)

Note: To configure logging behavior (level, output file, etc.), see the Logging section in the Configuration reference.

Configuration requirements

The preprocessing workflow requires the following configuration sections:

Required paths

paths:
  meteodata: path/to/meteo.nc
  gisdata: path/to/gisdata.nc      # Output path
  network: path/to/network.parquet # Output path
  states: path/to/states/          # For simulation states
  output: path/to/output/          # For simulation outputs

Required vector files

vector_files:
  river_network:
    shp: path/to/river_network.shp
    id_field: REACH_ID  # Optional, for tracking original IDs

Required raster files

raster_files:
  dtm: path/to/dtm.tif
  flow_dir: path/to/flowdir.tif
  flow_acc: path/to/flowacc.tif
  Wc0: path/to/wc0.tif  # Capillary capacity
  Wg0: path/to/wg0.tif  # Gravitational capacity
  ks: path/to/ks.tif    # Hydraulic conductivity

Required raster settings

raster_settings:
  flow_dir_type: Grass  # or Arc

Required routing parameters

parameters:
  routing:
    method: Linear
    wcel: 5.0      # Wave celerity (m/s)
    Br0: 1.0       # Base channel width (m)
    NBr: 1.5       # Channel width exponent
    n_Man: 0.03    # Manning's n (s/m^(1/3))

Optional reservoir parameters

parameters:
  reservoirs:
    res_shape: path/to/reservoirs.shp                    # Reservoir polygon shapefile
    stage_storage: path/to/stage_storage.csv             # Stage-storage curves
    regulation_curves: path/to/regulation_curves.csv     # Stage-discharge curves
    regulation_schedule: path/to/regulation_schedule.csv # Regulation period schedule

initial_conditions:
  reservoir_volumes: path/to/initial_volumes.csv  # Optional (auto-calculated if omitted)

paths:
  reservoirs: path/to/reservoirs.parquet  # Output path for consolidated reservoir data

output_states:
  reservoir_states: true  # Enable reservoir state output

CSV file formats:

  • stage_storage.csv: Columns: reservoir_id, stage_m, volume_m3
  • regulation_curves.csv: Columns: reservoir_id, regulation_name, stage_m, discharge_m3s
  • regulation_schedule.csv: Columns: reservoir_id, start_date, end_date, regulation_name
  • initial_volumes.csv: Columns: reservoir_id, volume_m3 (optional, defaults to auto-calculation from z_max)

See the sample configuration for a complete example.

Error handling

The preprocessing workflow performs comprehensive validation:

Configuration validation

  • All required paths and parameters are present
  • Numeric parameters are within valid ranges
  • File paths exist (if validation enabled)

Spatial consistency

  • All rasters have the same shape
  • All rasters have the same resolution
  • All rasters have compatible CRS
  • Network CRS matches raster CRS (or is reprojected)

Data quality

  • Rasters have valid nodata handling
  • Network has no topological errors
  • Flow direction is valid (no invalid direction codes)
  • Flow accumulation is consistent with flow direction

Advanced usage

Skipping preprocessing stages

If you already have some preprocessed data, you can skip certain stages and load existing files:

from mobidic import load_network, load_gisdata, GISData

# Option 1: Load complete preprocessed data
gisdata = load_gisdata("existing_gisdata.nc", "existing_network.parquet")

# Option 2: Load network separately
gisdata = GISData()
# ... load grids manually
gisdata.network = load_network("existing_network.parquet")

Integration with simulation

After preprocessing, the GISData object is ready for simulation:

from mobidic import run_preprocessing, run_simulation

# Preprocessing
gisdata = run_preprocessing(config)

# Simulation (future implementation)
results = run_simulation(config, gisdata)

The simulation will access: - gisdata.grids: Spatially-distributed parameters - gisdata.network: River network topology and parameters - gisdata.metadata: Spatial reference information