
Preprocessing workflow

The preprocessing module provides a high-level function that runs the complete MOBIDIC preprocessing workflow, turning raw GIS data into inputs that can be used directly in the simulation.

Overview

The preprocessing workflow handles:

  1. Loading and validating configuration
  2. Reading all raster data (DTM, flow direction, soil parameters, etc.)
  3. Processing the river network (topology, ordering, routing parameters)
  4. Computing hillslope-reach mapping
  5. Processing reservoirs (optional: polygons, stage-storage curves, regulation curves/schedules)
  6. Organizing all data into a consolidated GISData object

This is the recommended entry point for most users, as it handles all preprocessing steps automatically based on the configuration file.

Functions

Main preprocessing function

Run complete preprocessing workflow.

This function orchestrates the entire preprocessing pipeline:

  1. Load raster grids (DTM, flow direction, soil parameters, etc.)
  2. Apply grid decimation if needed (decimation factor > 1)
  3. Convert flow direction to MOBIDIC notation
  4. Process river network (topology, Strahler ordering, routing parameters)
  5. Compute hillslope cells for each reach
  6. Map hillslope cells to reaches

Parameters:

  • config (MOBIDICConfig): MOBIDIC configuration with preprocessing settings. Required.

Returns:

  • GISData: GISData object containing all preprocessed spatial data.

Examples:

>>> from mobidic import load_config, run_preprocessing
>>> config = load_config("Arno.yaml")
>>> gisdata = run_preprocessing(config)
>>> gisdata.save("Arno_gisdata.nc", "Arno_network.parquet")
Source code in mobidic/preprocessing/preprocessor.py
def run_preprocessing(config: MOBIDICConfig) -> GISData:
    """Run complete preprocessing workflow.

    This function orchestrates the entire preprocessing pipeline:
    1. Load raster grids (DTM, flow direction, soil parameters, etc.)
    2. Apply grid decimation if needed (decimation factor > 1)
    3. Convert flow direction to MOBIDIC notation
    4. Process river network (topology, Strahler ordering, routing parameters)
    5. Compute hillslope cells for each reach
    6. Map hillslope cells to reaches

    Args:
        config: MOBIDIC configuration with preprocessing settings

    Returns:
        GISData object containing all preprocessed spatial data

    Examples:
        >>> from mobidic import load_config, run_preprocessing
        >>> config = load_config("Arno.yaml")
        >>> gisdata = run_preprocessing(config)
        >>> gisdata.save("Arno_gisdata.nc", "Arno_network.parquet")
    """
    logger.info("=" * 80)
    logger.info(f"MOBIDICpy v{__version__} - PREPROCESSING")
    logger.info("=" * 80)
    logger.info(f"Basin: {config.basin.id}")
    logger.info(f"Parameter set: {config.basin.paramset_id}")
    logger.info("")

    # Step 1: Load required raster grids
    logger.info("Step 1/7: Loading raster grids")
    logger.info("-" * 80)

    grids = {}
    metadata = {}

    # Load DTM (required)
    logger.debug(f"Loading DTM from {config.raster_files.dtm}")
    dtm_result = grid_to_matrix(config.raster_files.dtm)
    grids["dtm"] = dtm_result["data"]
    xllcorner = dtm_result["xllcorner"]
    yllcorner = dtm_result["yllcorner"]
    cellsize = dtm_result["cellsize"]
    crs = dtm_result["crs"]

    # Store metadata extracted from DTM
    metadata["xllcorner"] = xllcorner
    metadata["yllcorner"] = yllcorner
    metadata["cellsize"] = cellsize
    metadata["shape"] = grids["dtm"].shape
    metadata["resolution"] = (cellsize, cellsize)
    metadata["nodata"] = np.nan
    metadata["crs"] = crs

    # Load flow direction (required)
    logger.debug(f"Loading flow direction from {config.raster_files.flow_dir}")
    flow_dir_result = grid_to_matrix(config.raster_files.flow_dir)
    grids["flow_dir"] = flow_dir_result["data"]

    # Load flow accumulation (required)
    logger.debug(f"Loading flow accumulation from {config.raster_files.flow_acc}")
    flow_acc_result = grid_to_matrix(config.raster_files.flow_acc)
    flow_acc_data = flow_acc_result["data"]
    if np.any((flow_acc_data[~np.isnan(flow_acc_data)] < 1)):
        flow_acc_data[~np.isnan(flow_acc_data)] += 1
        logger.info("Flow accumulation values were < 1 and have been incremented by 1.")
    grids["flow_acc"] = flow_acc_data

    # Get index of the cell with maximum flow accumulation
    max_acc_idx = np.unravel_index(np.nanargmax(flow_acc_data), flow_acc_data.shape)

    # Set outlet value to -1 in the flow direction grid
    grids["flow_dir"][max_acc_idx] = -1

    # Load soil parameters (required)
    logger.debug(f"Loading Wc0 from {config.raster_files.Wc0}")
    wc0_result = grid_to_matrix(config.raster_files.Wc0)
    grids["Wc0"] = wc0_result["data"] * 0.001  # Convert from mm to m

    logger.debug(f"Loading Wg0 from {config.raster_files.Wg0}")
    wg0_result = grid_to_matrix(config.raster_files.Wg0)
    grids["Wg0"] = wg0_result["data"] * 0.001  # Convert from mm to m

    logger.debug(f"Loading ks from {config.raster_files.ks}")
    ks_result = grid_to_matrix(config.raster_files.ks)
    grids["ks"] = ks_result["data"] * 0.001 / 3600  # Convert from mm/h to m/s

    # Check ranges of ks values (ks_min and ks_max in mm/h from config)
    ks_min = config.parameters.soil.ks_min
    ks_max = config.parameters.soil.ks_max
    if ks_min is not None:
        ks_min = ks_min * 0.001 / 3600
        if np.any(grids["ks"] < ks_min):
            logger.warning(f"Some ks values are below the minimum threshold: {ks_min} m/s")

    if ks_max is not None:
        ks_max = ks_max * 0.001 / 3600
        if np.any(grids["ks"] > ks_max):
            logger.warning(f"Some ks values are above the maximum threshold: {ks_max} m/s")

    if ks_min is not None and ks_max is not None:
        grids["ks"] = np.clip(grids["ks"], ks_min, ks_max)

    # Load optional raster files
    optional_rasters = {
        "kf": config.raster_files.kf,
        "CH": config.raster_files.CH,
        "Alb": config.raster_files.Alb,
        "Ma": config.raster_files.Ma,
        "Mf": config.raster_files.Mf,
        "gamma": config.raster_files.gamma,
        "kappa": config.raster_files.kappa,
        "beta": config.raster_files.beta,
        "alpha": config.raster_files.alpha,
    }

    for name, path in optional_rasters.items():
        if path is not None:
            logger.debug(f"Loading {name} from {path}")
            result = grid_to_matrix(path)
            grids[name] = result["data"]
        else:
            logger.debug(f"Using default value for {name} (no raster provided)")
            # Create grid filled with default parameter value
            default_value = _get_default_parameter_value(name, config)
            grids[name] = np.full_like(grids["dtm"], default_value, dtype=float)

    # Calculate alpsur (surface routing parameter based on slope)
    # From buildgis_mysql_include.m lines 647-651
    logger.debug("Calculating alpsur from DTM slopes")
    slopes = _calculate_slopes(grids["dtm"], cellsize, pmin=1e-5)
    alpsur = np.sqrt(slopes)
    alpsur = alpsur / np.nanmean(alpsur)
    grids["alpsur"] = alpsur

    logger.success(f"Loaded {len(grids)} raster grids. Shape: {grids['dtm'].shape}, cellsize: {cellsize} m")
    logger.info("")

    # Step 2: Apply grid decimation if needed
    logger.info("Step 2/7: Applying grid decimation")
    logger.info("-" * 80)

    decimation_factor = config.simulation.decimation

    if decimation_factor > 1:
        logger.info(f"Decimating grids by factor {decimation_factor}")

        # Decimate most grids using simple averaging
        for name in grids.keys():
            if name not in ["flow_dir", "flow_acc"]:
                logger.debug(f"Decimating {name}")
                grids[name] = decimate_raster(grids[name], decimation_factor)

        # Decimate flow direction and accumulation
        logger.debug("Decimating flow_dir and flow_acc")
        grids["flow_dir"], grids["flow_acc"] = decimate_flow_direction(
            grids["flow_dir"], grids["flow_acc"], decimation_factor
        )

        # Update metadata
        metadata["shape"] = grids["dtm"].shape
        metadata["resolution"] = (
            metadata["resolution"][0] * decimation_factor,
            metadata["resolution"][1] * decimation_factor,
        )

        logger.success(f"Grid decimation complete. New shape: {metadata['shape']}")
    else:
        logger.info("No grid decimation applied (decimation factor = 1)")

    logger.info("")

    # Step 3: Convert flow direction to MOBIDIC notation
    logger.info("Step 3/7: Converting flow direction to MOBIDIC notation")
    logger.info("-" * 80)

    flow_dir_type = config.raster_settings.flow_dir_type

    grids["flow_dir"] = convert_to_mobidic_notation(grids["flow_dir"], from_notation=flow_dir_type)

    logger.success("Flow direction conversion complete")
    logger.info("")

    # Step 4: Process river network
    logger.info("Step 4/7: Processing river network")
    logger.info("-" * 80)

    shapefile_path = config.vector_files.river_network.shp

    routing_params = {
        "wcel": config.parameters.routing.wcel,
        "Br0": config.parameters.routing.Br0,
        "NBr": config.parameters.routing.NBr,
        "n_Man": config.parameters.routing.n_Man,
    }

    network = process_river_network(
        shapefile_path=shapefile_path,
        join_single_tributaries=True,
        routing_params=routing_params,
    )

    logger.info("")

    # Step 5: Compute hillslope cells for each reach
    logger.info("Step 5/7: Computing hillslope cells")
    logger.info("-" * 80)

    network = compute_hillslope_cells(
        network=network,
        grid_array=grids["flow_dir"],
        xllcorner=xllcorner,
        yllcorner=yllcorner,
        cellsize=metadata["resolution"][0],
        densify_step=10.0,
    )

    logger.info("")

    # Step 6: Map hillslope cells to reaches
    logger.info("Step 6/7: Mapping hillslope to reaches")
    logger.info("-" * 80)

    hillslope_reach_map = map_hillslope_to_reach(
        network=network,
        flowdir_array=grids["flow_dir"],
    )

    logger.info("")

    # Step 7: Process reservoirs (optional)
    logger.info("Step 7/7: Processing reservoirs")
    logger.info("-" * 80)

    reservoirs = None
    if config.parameters.reservoirs.res_shape is not None:
        try:
            reservoirs = process_reservoirs(
                shapefile_path=config.parameters.reservoirs.res_shape,
                stage_storage_path=config.parameters.reservoirs.stage_storage,
                regulation_curves_path=config.parameters.reservoirs.regulation_curves,
                regulation_schedule_path=config.parameters.reservoirs.regulation_schedule,
                initial_volumes_path=config.initial_conditions.reservoir_volumes,
                network=network,
                grid_shape=grids["dtm"].shape,
                xllcorner=xllcorner,
                yllcorner=yllcorner,
                cellsize=metadata["resolution"][0],
            )
        except Exception as e:
            logger.error(f"Failed to process reservoirs: {e}")
            logger.warning("Continuing without reservoirs")
            reservoirs = None
    else:
        logger.info("No reservoirs configured, skipping")

    gisdata = GISData(
        grids=grids,
        metadata=metadata,
        network=network,
        hillslope_reach_map=hillslope_reach_map,
        config=config,
        reservoirs=reservoirs,
    )

    logger.info("")
    logger.success("Preprocessing completed successfully")
    logger.info("")

    return gisdata

Reservoir preprocessing function

Process reservoir data from input files.

This function orchestrates the complete reservoir preprocessing:

  1. Read reservoir polygons from shapefile
  2. Load stage-storage curves
  3. Load regulation curves
  4. Load regulation schedules
  5. Load initial volumes
  6. Map reservoirs to grid and network

Translated from MATLAB: buildgis_mysql_include.m (lines 594-740)

Parameters:

  • shapefile_path (str | Path): Path to reservoir polygon shapefile. Required.
  • stage_storage_path (str | Path): Path to stage-storage CSV. Required.
  • regulation_curves_path (str | Path): Path to regulation curves CSV. Required.
  • regulation_schedule_path (str | Path): Path to regulation schedule CSV. Required.
  • initial_volumes_path (Optional[str | Path]): Path to CSV with initial volumes (columns: 'reservoir_id', 'volume_m3'). If None, initial volumes are auto-calculated as 100% capacity (volume at z_max). Required.
  • network (GeoDataFrame): Processed river network GeoDataFrame. Required.
  • grid_shape (tuple[int, int]): Shape of computational grid (nrows, ncols). Required.
  • xllcorner (float): X coordinate of lower-left corner [m]. Required.
  • yllcorner (float): Y coordinate of lower-left corner [m]. Required.
  • cellsize (float): Grid cell size [m]. Required.

Returns:

  • Reservoirs: Reservoirs object with processed data.

Source code in mobidic/preprocessing/reservoirs.py
def process_reservoirs(
    shapefile_path: str | Path,
    stage_storage_path: str | Path,
    regulation_curves_path: str | Path,
    regulation_schedule_path: str | Path,
    initial_volumes_path: Optional[str | Path],
    network: gpd.GeoDataFrame,
    grid_shape: tuple[int, int],
    xllcorner: float,
    yllcorner: float,
    cellsize: float,
) -> Reservoirs:
    """Process reservoir data from input files.

    This function orchestrates the complete reservoir preprocessing:
    1. Read reservoir polygons from shapefile
    2. Load stage-storage curves
    3. Load regulation curves
    4. Load regulation schedules
    5. Load initial volumes
    6. Map reservoirs to grid and network

    Translated from MATLAB: buildgis_mysql_include.m (lines 594-740)

    Args:
        shapefile_path: Path to reservoir polygon shapefile
        stage_storage_path: Path to stage-storage CSV
        regulation_curves_path: Path to regulation curves CSV
        regulation_schedule_path: Path to regulation schedule CSV
        initial_volumes_path: Path to CSV with initial volumes (columns: 'reservoir_id', 'volume_m3').
            If None, initial volumes are auto-calculated as 100% capacity (volume at z_max)
        network: Processed river network GeoDataFrame
        grid_shape: Shape of computational grid (nrows, ncols)
        xllcorner: X coordinate of lower-left corner [m]
        yllcorner: Y coordinate of lower-left corner [m]
        cellsize: Grid cell size [m]

    Returns:
        Reservoirs object with processed data
    """
    logger.info("Processing reservoir data")
    logger.info("-" * 80)

    # Read reservoir polygons
    logger.debug(f"Reading reservoir shapefile: {shapefile_path}")
    res_polygons = gpd.read_file(shapefile_path)
    logger.info(f"Found {len(res_polygons)} reservoirs in shapefile")

    if len(res_polygons) == 0:
        logger.warning("No reservoirs found in shapefile")
        return Reservoirs([], metadata={"crs": res_polygons.crs})

    # Read stage-storage curves
    logger.debug(f"Reading stage-storage curves: {stage_storage_path}")
    stage_storage = pd.read_csv(stage_storage_path)

    # Read regulation curves
    logger.debug(f"Reading regulation curves: {regulation_curves_path}")
    regulation_curves = pd.read_csv(regulation_curves_path)

    # Read regulation schedule
    logger.debug(f"Reading regulation schedule: {regulation_schedule_path}")
    regulation_schedule = pd.read_csv(regulation_schedule_path)
    regulation_schedule["start_date"] = pd.to_datetime(regulation_schedule["start_date"])
    regulation_schedule["end_date"] = pd.to_datetime(regulation_schedule["end_date"])

    # Read initial volumes (optional)
    if initial_volumes_path is not None:
        logger.debug(f"Reading initial volumes: {initial_volumes_path}")
        initial_volumes = pd.read_csv(initial_volumes_path)
        volumes_dict = dict(zip(initial_volumes["reservoir_id"], initial_volumes["volume_m3"]))
        logger.info(f"Loaded initial volumes for {len(volumes_dict)} reservoirs from CSV")
    else:
        volumes_dict = {}
        logger.info("No initial volumes CSV provided, will auto-calculate from stage-storage curves")

    # Process each reservoir
    reservoirs = []
    for _, res_row in res_polygons.iterrows():
        res_id = res_row["id"]
        logger.debug(f"Processing reservoir {res_id}: {res_row.get('name', 'Unnamed')}")

        # Get reservoir polygon geometry
        polygon = res_row.geometry

        # Get z_max from shapefile
        z_max = res_row["zmax"]

        # Get stage-storage curve for this reservoir
        ss_curve = stage_storage[stage_storage["reservoir_id"] == res_id].copy()
        ss_curve = ss_curve.sort_values("stage_m")

        if len(ss_curve) == 0:
            logger.warning(f"No stage-storage curve found for reservoir {res_id}, skipping")
            continue

        # Drop reservoir_id column as it's used only for mapping
        ss_curve = ss_curve.drop(columns=["reservoir_id"])

        # Get initial volume: try CSV first, then auto-calculate from curve
        initial_volume = volumes_dict.get(res_id)
        if initial_volume is None:
            # Auto-calculate from stage-storage curve
            if len(ss_curve) > 0:
                initial_volume = _interpolate_volume_at_stage(ss_curve, z_max)
                logger.debug(
                    f"Auto-calculated initial volume for reservoir {res_id}: "
                    f"{initial_volume:.0f}m³ at z_max={z_max:.2f}m"
                )
            else:
                logger.warning(f"No stage-storage curve available for reservoir {res_id}, using 0m³")
                initial_volume = 0.0

        # Get regulation curves for this reservoir
        reg_curves_data = regulation_curves[regulation_curves["reservoir_id"] == res_id]
        reg_names = reg_curves_data["regulation_name"].unique()

        # Get regulation schedule for this reservoir
        reg_schedule = regulation_schedule[regulation_schedule["reservoir_id"] == res_id].copy()
        reg_schedule = reg_schedule.sort_values("start_date")

        # Build regulation curves arrays
        n_periods = len(reg_schedule)
        if n_periods == 0:
            logger.warning(f"No regulation schedule found for reservoir {res_id}, skipping")
            continue

        # Find maximum number of points across all regulation curves
        max_points = 0
        reg_curves_dict = {}
        for reg_name in reg_names:
            curve = reg_curves_data[reg_curves_data["regulation_name"] == reg_name].copy()
            curve = curve.sort_values("stage_m")
            reg_curves_dict[reg_name] = curve
            max_points = max(max_points, len(curve))

        # Build lawH and lawQ arrays (n_periods × max_points)
        lawH = np.full((n_periods, max_points), np.nan)
        lawQ = np.full((n_periods, max_points), np.nan)
        period_times = []

        for i, (_, sched_row) in enumerate(reg_schedule.iterrows()):
            reg_name = sched_row["regulation_name"]
            period_times.append(sched_row["start_date"])

            if reg_name in reg_curves_dict:
                curve = reg_curves_dict[reg_name]
                n_pts = len(curve)
                lawH[i, :n_pts] = curve["stage_m"].values
                lawQ[i, :n_pts] = curve["discharge_m3s"].values
            else:
                logger.warning(f"Regulation curve '{reg_name}' not found for reservoir {res_id}")

        # Rasterize polygon to get basin pixels
        logger.debug(f"Rasterizing reservoir {res_id} polygon")
        basin_pixels = _rasterize_reservoir_polygon(polygon, grid_shape, xllcorner, yllcorner, cellsize)
        logger.debug(f"Reservoir {res_id} basin: {len(basin_pixels)} cells")

        # Create temporary reservoir object for reach finding
        temp_reservoir = Reservoir(id=res_id, z_max=z_max, geometry=polygon)

        # Find inlet and outlet reaches
        logger.debug(f"Finding inlet/outlet reaches for reservoir {res_id}")
        inlet_reaches, outlet_reach = _find_reservoir_reaches(temp_reservoir, network)

        if outlet_reach >= 0:
            logger.debug(f"Reservoir {res_id} outlet reach: {outlet_reach}")
            if len(inlet_reaches) > 0:
                logger.debug(f"Reservoir {res_id} inlet reaches: {inlet_reaches}")
        else:
            logger.warning(f"No outlet reach found for reservoir {res_id}")

        # Convert arrays to dicts for storage (use zero-padded string keys for Parquet compatibility)
        period_times_dict = {f"{i:03d}": pd.Timestamp(t).isoformat() for i, t in enumerate(period_times)}
        stage_discharge_h_dict = {f"{i:03d}": lawH[i, :].tolist() for i in range(n_periods)}
        stage_discharge_q_dict = {f"{i:03d}": lawQ[i, :].tolist() for i in range(n_periods)}

        # Create Reservoir object
        reservoir = Reservoir(
            id=res_id,  # Use original shapefile id
            z_max=z_max,
            basin_pixels=basin_pixels,
            inlet_reaches=inlet_reaches,
            outlet_reach=outlet_reach,
            period_times=period_times_dict,
            stage_discharge_h=stage_discharge_h_dict,
            stage_discharge_q=stage_discharge_q_dict,
            initial_volume=initial_volume,
            geometry=polygon,
            name=res_row.get("name", ""),
            date_start=pd.to_datetime(res_row.get("date_start", None)),
            stage_storage_curve=ss_curve,
        )

        reservoirs.append(reservoir)

    logger.success(f"Processed {len(reservoirs)} reservoirs")

    return Reservoirs(reservoirs, metadata={"crs": res_polygons.crs})

Workflow stages

The preprocessing pipeline consists of up to seven stages; stages 6 and 7 run only when reservoirs are configured:

Stage 1: configuration loading

Loads and validates the YAML configuration file:

config = load_config(config_path)

All subsequent steps are driven by paths and parameters in this configuration.

Stage 2: raster data loading

Reads all raster files specified in the configuration:

  • Required rasters: DTM, flow direction, flow accumulation
  • Soil parameters: Wc0, Wg0, ks, and optional kf
  • Energy parameters: CH, Alb (if energy balance enabled)
  • Flow coefficients: gamma, kappa, beta, alpha (if provided as rasters)

All rasters are:

  • Validated for consistent shape, resolution, and CRS
  • Converted to numpy arrays with NaN for nodata
  • Stored in the GISData.grids dictionary
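
For reference, the nodata-to-NaN conversion performed during loading can be sketched in plain numpy. The helper name and the nodata value of -9999 below are illustrative assumptions, not the actual grid_to_matrix implementation:

```python
import numpy as np

def to_float_with_nan(raw: np.ndarray, nodata: float) -> np.ndarray:
    """Convert a raw raster band to float, replacing the nodata value with NaN."""
    data = raw.astype(float)
    data[data == nodata] = np.nan
    return data

# Hypothetical 2x3 band where -9999 marks nodata
raw = np.array([[10.0, -9999.0, 12.0],
                [13.0, 14.0, -9999.0]])
dtm = to_float_with_nan(raw, nodata=-9999.0)
print(int(np.isnan(dtm).sum()))  # 2
```

Once nodata is NaN, downstream steps can rely on np.nanmean, np.nanargmax, and similar NaN-aware reductions, as the source code above does.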

Stage 3: river network processing

Processes the river network shapefile:

network = process_river_network(
    shapefile_path=config.vector_files.river_network.shp,
    join_single_tributaries=True,
    routing_params={
        "wcel": config.parameters.routing.wcel,
        "Br0": config.parameters.routing.Br0,
        "NBr": config.parameters.routing.NBr,
        "n_Man": config.parameters.routing.n_Man,
    }
)

This step:

  • Builds network topology
  • Enforces binary tree structure
  • Computes Strahler ordering
  • Calculates routing parameters
  • Determines calculation order

Stage 4: hillslope-reach mapping

Maps hillslope grid cells to river reaches:

network = compute_hillslope_cells(
    network=network,
    grid_array=grids["flow_dir"],
    xllcorner=xllcorner,
    yllcorner=yllcorner,
    cellsize=cellsize,
    densify_step=10.0,
)
reach_map = map_hillslope_to_reach(
    network=network,
    flowdir_array=grids["flow_dir"],
)

This establishes the connection between the distributed grid and the river network for lateral inflow routing.
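
To illustrate why this mapping matters, lateral inflow can be aggregated per reach once every cell knows which reach it drains to. This sketch assumes, purely for illustration, that the mapping is a 2D integer array of reach indices with -1 marking cells outside any hillslope; the actual return type of map_hillslope_to_reach may differ:

```python
import numpy as np

# Hypothetical reach map: each cell holds the index of the reach it drains to,
# with -1 for cells that belong to no reach (e.g. outside the basin).
reach_map = np.array([[0, 0, 1],
                      [1, 1, -1]])
# Hypothetical lateral runoff produced by each cell [m3/s]
runoff = np.array([[0.2, 0.3, 0.1],
                   [0.4, 0.5, 9.9]])

valid = reach_map >= 0
inflow_per_reach = np.bincount(reach_map[valid], weights=runoff[valid], minlength=2)
print(inflow_per_reach)  # reach 0: 0.5, reach 1: 1.0
```

The np.bincount aggregation is O(n) over the grid, which keeps per-timestep routing cheap even for large basins.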

Stage 5: data consolidation

Packages everything into a GISData object:

gisdata = GISData(
    grids=grids,
    metadata=metadata,
    network=network,
    hillslope_reach_map=hillslope_reach_map,
    config=config,
    reservoirs=None,  # filled in stage 6 if reservoirs are configured
)

The GISData object can then be saved for reuse or passed directly to the simulation.

Stage 6: reservoir preprocessing (optional)

If reservoirs are configured (config.parameters.reservoirs.res_shape is set), process reservoir data:

reservoirs = process_reservoirs(
    shapefile_path=config.parameters.reservoirs.res_shape,
    stage_storage_path=config.parameters.reservoirs.stage_storage,
    regulation_curves_path=config.parameters.reservoirs.regulation_curves,
    regulation_schedule_path=config.parameters.reservoirs.regulation_schedule,
    initial_volumes_path=config.initial_conditions.reservoir_volumes,
    network=gisdata.network,
    grid_shape=gisdata.metadata["shape"],
    xllcorner=gisdata.metadata["xllcorner"],
    yllcorner=gisdata.metadata["yllcorner"],
    cellsize=gisdata.metadata["resolution"][0],
)
gisdata.reservoirs = reservoirs

This step:

  • Reads the reservoir polygon shapefile
  • Rasterizes polygons to identify basin pixels
  • Loads stage-storage curves from CSV
  • Loads regulation curves and schedules from CSV
  • Identifies inlet/outlet reaches by network topology
  • Auto-calculates initial volumes from z_max if not provided
  • Consolidates all reservoir data into a Reservoirs container
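
The auto-calculation of initial volumes amounts to interpolating the stage-storage curve at z_max. The internal _interpolate_volume_at_stage helper is not shown above; the sketch below is an illustrative guess at its behavior using np.interp:

```python
import numpy as np
import pandas as pd

# Hypothetical stage-storage curve for one reservoir (monotone in stage)
ss_curve = pd.DataFrame({
    "stage_m": [100.0, 105.0, 110.0],
    "volume_m3": [0.0, 2.0e6, 6.0e6],
})

def volume_at_stage(curve: pd.DataFrame, stage: float) -> float:
    """Linearly interpolate storage volume at a given stage."""
    return float(np.interp(stage, curve["stage_m"], curve["volume_m3"]))

z_max = 107.5
initial_volume = volume_at_stage(ss_curve, z_max)
print(initial_volume)  # 4000000.0
```

Sorting the curve by stage_m beforehand (as process_reservoirs does) matters here, because np.interp assumes monotonically increasing x values.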

Stage 7: reservoir I/O (optional)

Save/load processed reservoir data:

# Save reservoirs to GeoParquet
gisdata.save(
    gisdata_path=config.paths.gisdata,
    network_path=config.paths.network,
    reservoirs_path=config.paths.reservoirs,
)

# Load reservoirs from GeoParquet
gisdata = GISData.load(
    gisdata_path=config.paths.gisdata,
    network_path=config.paths.network,
    reservoirs_path=config.paths.reservoirs,
)

Complete example

from mobidic import load_config, run_preprocessing

# Load configuration
config = load_config("config.yaml")

# Run complete preprocessing
gisdata = run_preprocessing(config)

# Inspect results
print(f"Basin: {config.basin.id}")
print(f"Grid shape: {gisdata.metadata['shape']}")
print(f"Resolution: {gisdata.metadata['resolution']} m")
print(f"CRS: {gisdata.metadata['crs']}")
print(f"Grid variables: {list(gisdata.grids.keys())}")
print(f"Network reaches: {len(gisdata.network)}")
print(f"Strahler orders: {sorted(gisdata.network['strahler_order'].unique())}")

# Save for later use
gisdata.save(
    gisdata_path=config.paths.gisdata,
    network_path=config.paths.network
)

Note: To configure logging behavior (level, output file, etc.), see the Logging section in the Configuration reference.

Configuration requirements

The preprocessing workflow requires the following configuration sections:

Required paths

paths:
  meteodata: path/to/meteo.nc
  gisdata: path/to/gisdata.nc      # Output path
  network: path/to/network.parquet # Output path
  states: path/to/states/          # For simulation states
  output: path/to/output/          # For simulation outputs

Required vector files

vector_files:
  river_network:
    shp: path/to/river_network.shp
    id_field: REACH_ID  # Optional, for tracking original IDs

Required raster files

raster_files:
  dtm: path/to/dtm.tif
  flow_dir: path/to/flowdir.tif
  flow_acc: path/to/flowacc.tif
  Wc0: path/to/wc0.tif  # Capillary capacity
  Wg0: path/to/wg0.tif  # Gravitational capacity
  ks: path/to/ks.tif    # Hydraulic conductivity

Required raster settings

raster_settings:
  flow_dir_type: Grass  # or Arc

Required routing parameters

parameters:
  routing:
    method: Linear
    wcel: 5.0      # Wave celerity (m/s)
    Br0: 1.0       # Base channel width (m)
    NBr: 1.5       # Channel width exponent
    n_Man: 0.03    # Manning's n (s/m^(1/3))
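
Under the hood, load_config parses and validates this YAML. The raw structure can be read with plain PyYAML, as a simplified sketch that skips the validation load_config performs (PyYAML availability is assumed):

```python
import yaml

# Inline copy of the routing section above, parsed without validation
config_text = """
parameters:
  routing:
    method: Linear
    wcel: 5.0
    Br0: 1.0
    NBr: 1.5
    n_Man: 0.03
"""
cfg = yaml.safe_load(config_text)
routing = cfg["parameters"]["routing"]
print(routing["wcel"], routing["n_Man"])  # 5.0 0.03
```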

Optional reservoir parameters

parameters:
  reservoirs:
    res_shape: path/to/reservoirs.shp                    # Reservoir polygon shapefile
    stage_storage: path/to/stage_storage.csv             # Stage-storage curves
    regulation_curves: path/to/regulation_curves.csv     # Stage-discharge curves
    regulation_schedule: path/to/regulation_schedule.csv # Regulation period schedule

initial_conditions:
  reservoir_volumes: path/to/initial_volumes.csv  # Optional (auto-calculated if omitted)

paths:
  reservoirs: path/to/reservoirs.parquet  # Output path for consolidated reservoir data

output_states:
  reservoir_states: true  # Enable reservoir state output

CSV file formats:

  • stage_storage.csv: Columns: reservoir_id, stage_m, volume_m3
  • regulation_curves.csv: Columns: reservoir_id, regulation_name, stage_m, discharge_m3s
  • regulation_schedule.csv: Columns: reservoir_id, start_date, end_date, regulation_name
  • initial_volumes.csv: Columns: reservoir_id, volume_m3 (optional, defaults to auto-calculation from z_max)
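
Before running preprocessing it can be useful to sanity-check these CSVs against the expected columns. The helper below is not part of MOBIDIC; it is a small illustrative check using pandas, with column names taken from the list above:

```python
import io
import pandas as pd

REQUIRED_COLUMNS = {
    "stage_storage": ["reservoir_id", "stage_m", "volume_m3"],
    "regulation_schedule": ["reservoir_id", "start_date", "end_date", "regulation_name"],
}

def check_columns(df: pd.DataFrame, kind: str) -> None:
    """Raise if the DataFrame is missing any column required for this CSV kind."""
    missing = set(REQUIRED_COLUMNS[kind]) - set(df.columns)
    if missing:
        raise ValueError(f"{kind} CSV is missing columns: {sorted(missing)}")

csv_text = "reservoir_id,stage_m,volume_m3\n1,100.0,0.0\n1,105.0,2000000.0\n"
df = pd.read_csv(io.StringIO(csv_text))
check_columns(df, "stage_storage")  # passes silently
print(len(df))  # 2
```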

See the sample configuration for a complete example.

Error handling

The preprocessing workflow performs comprehensive validation:

Configuration validation

  • All required paths and parameters are present
  • Numeric parameters are within valid ranges
  • File paths exist (if validation enabled)

Spatial consistency

  • All rasters have the same shape
  • All rasters have the same resolution
  • All rasters have compatible CRS
  • Network CRS matches raster CRS (or is reprojected)

Data quality

  • Rasters have valid nodata handling
  • Network has no topological errors
  • Flow direction is valid (no invalid direction codes)
  • Flow accumulation is consistent with flow direction
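
A minimal sketch of two such checks, consistent raster shapes and valid flow-direction codes, is shown below. The helper and the set of valid MOBIDIC direction codes (assumed here to be -1 for the outlet plus 1-8) are illustrative, not the library's actual validators:

```python
import numpy as np

def check_grids(grids: dict, valid_dirs: set) -> None:
    """Basic consistency checks: equal raster shapes and valid flow-direction codes."""
    shapes = {name: g.shape for name, g in grids.items()}
    if len(set(shapes.values())) != 1:
        raise ValueError(f"Inconsistent raster shapes: {shapes}")
    fd = grids["flow_dir"]
    codes = set(np.unique(fd[~np.isnan(fd)]).astype(int).tolist())
    bad = codes - valid_dirs
    if bad:
        raise ValueError(f"Invalid flow-direction codes: {sorted(bad)}")

grids = {
    "dtm": np.ones((2, 2)),
    "flow_dir": np.array([[1.0, 2.0], [8.0, -1.0]]),
}
check_grids(grids, valid_dirs={-1, 1, 2, 3, 4, 5, 6, 7, 8})
print("ok")
```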

Advanced usage

Skipping preprocessing stages

If you already have some preprocessed data, you can skip certain stages and load existing files:

from mobidic import load_network, load_gisdata, GISData

# Option 1: Load complete preprocessed data
gisdata = load_gisdata("existing_gisdata.nc", "existing_network.parquet")

# Option 2: Load network separately
gisdata = GISData()
# ... load grids manually
gisdata.network = load_network("existing_network.parquet")

Integration with simulation

After preprocessing, the GISData object is ready for simulation:

from mobidic import run_preprocessing, run_simulation

# Preprocessing
gisdata = run_preprocessing(config)

# Simulation (future implementation)
results = run_simulation(config, gisdata)

The simulation will access:

  • gisdata.grids: Spatially-distributed parameters
  • gisdata.network: River network topology and parameters
  • gisdata.metadata: Spatial reference information