Source code for pycmor.fesom_1p4.nodes_to_levels

#!/usr/bin/env python3
"""
This module contains a function to convert FESOM 1.4 output data stored in
the dimensions (nodes, time) to the dimensions (nodes, levels, time).

You can include it in your Pipeline by using the step::

    pycmor.fesom1.nodes_to_levels

This script can also be used stand-alone::

    $ pycmor scripts fesom1 nodes_to_levels mesh in_path out_path [variable]

The argument ``[variable]`` defaults to ``"temp"``.
"""
import os

import numpy as np
import rich_click as click
import xarray as xr
from dask.diagnostics import ProgressBar

from .load_mesh_data import ind_for_depth, load_mesh


[docs] def open_dataset(filepath): """Open a dataset with Xarray.""" return xr.open_dataset(filepath, engine="netcdf4", decode_times=False)
[docs] def save_dataset(ds, filepath, compute=True): """Save an Xarray dataset to a NetCDF file.""" print(f"Saving {filepath}") with ProgressBar(): ds.to_netcdf(filepath, mode="w", format="NETCDF4", compute=compute)
[docs] def process_dataset(input_path, output_path, processor, skip=False): """ General framework for loading, processing, and saving datasets. Parameters: input_path (str): Path to the input file. output_path (str): Path to the output file. processor (function): Function that takes an Xarray dataset and returns a processed one. skip (bool): Whether to skip processing if the output file exists. """ if skip and os.path.isfile(output_path): print(f"File {output_path} exists. Skipping.") return ds_in = open_dataset(input_path) ds_out = processor(ds_in) save_dataset(ds_out, output_path) ds_in.close()
[docs] def interpolate_to_levels(data, mesh, indices): """ Interpolates unstructured data onto depth levels for FESOM. Parameters: data (np.ndarray): Input data for a single time step. mesh (object): FESOM mesh object. indices (dict): Precomputed depth and mask indices. Returns: np.ndarray: Interpolated data (depth, ncells). """ level_data = np.full((len(mesh.zlevs), mesh.n2d), np.nan) for i, (ind_depth, ind_noempty, ind_empty) in enumerate( zip( indices["ind_depth_all"], indices["ind_noempty_all"], indices["ind_empty_all"], ) ): level_data[i, ind_noempty] = data[ind_depth[ind_noempty]] level_data[i, ind_empty] = np.nan # Fill missing values return level_data
[docs] def interpolate_dataarray(ds_in, mesh, indices): """ Applies depth-level interpolation across an entire dataset. Parameters: ds_in (xarray.DataArray): Input dataset. mesh (object): FESOM mesh object. variable (str): Variable name to interpolate. indices (dict): Precomputed depth and mask indices. Returns: xarray.Dataset: Dataset with interpolated values. """ variable = ds_in.name # Apply interpolation per time step level_data = xr.apply_ufunc( interpolate_to_levels, # data_var, ds_in, input_core_dims=[["nodes_3d"]], output_core_dims=[["depth", "ncells"]], vectorize=True, dask="parallelized", kwargs={"mesh": mesh, "indices": indices}, output_dtypes=[np.float32], output_sizes={"depth": len(mesh.zlevs), "ncells": mesh.n2d}, ) # Build the output dataset coords = { "time": ds_in["time"], "depth": ("depth", mesh.zlevs), "ncells": ("ncells", np.arange(mesh.n2d)), } attrs = ds_in.attrs.copy() attrs.update( { "grid_type": "unstructured", "description": f"{variable} interpolated to FESOM levels", } ) # FIXME(PG): This works but is hard to read. Level_data is already an xr.DataArray, so # we need to get out the "data" attribute again... ds_out = xr.Dataset( {variable: (["time", "depth", "ncells"], level_data.data, attrs)}, coords=coords, ) # Add metadata for time and depth ds_out["time"].attrs = ds_in["time"].attrs ds_out["depth"].attrs = { "units": "m", "long_name": "depth", "positive": "down", "axis": "Z", } return ds_out
[docs] def interpolate_dataset(ds_in, variable, mesh, indices): """Interpolate Dataset -> Interpolate DataArray converter function""" da_in = ds_in[variable] return interpolate_dataarray(da_in, mesh, indices)
[docs] def nodes_to_levels(data, rule): mesh = rule.mesh_path mesh = load_mesh(mesh) indices = indicies_from_mesh(mesh) data = interpolate_dataarray(data, mesh, indices) return data
# FIXME(PG): This should be... a method of the mesh object??
[docs] def indicies_from_mesh(mesh): # Precompute depth indices indices = { "ind_depth_all": [], "ind_noempty_all": [], "ind_empty_all": [], } for zlev in mesh.zlevs: ind_depth, ind_noempty, ind_empty = ind_for_depth(zlev, mesh) indices["ind_depth_all"].append(ind_depth) indices["ind_noempty_all"].append(ind_noempty) indices["ind_empty_all"].append(ind_empty) return indices
[docs] def _convert(meshpath, ipath, opath, variable, ncore, skip): """Main CLI for FESOM unstructured-to-structured conversion.""" mesh = load_mesh(meshpath, usepickle=False, usejoblib=True) indices = indicies_from_mesh(mesh) # Define a reusable processor function def processor(ds_in): return interpolate_dataset(ds_in, variable, mesh, indices) # Process all input files for ifile in ipath: ofile = os.path.join(opath, f"{os.path.basename(ifile)[:-3]}_levels.nc") process_dataset(ifile, ofile, processor, skip=skip)
@click.command() @click.argument("meshpath", type=click.Path(exists=True), required=True) @click.argument("ipath", nargs=-1, type=click.Path(exists=True), required=True) @click.argument("opath", nargs=1, required=False, default="./") @click.argument("variable", nargs=1, required=False, default="temp") @click.option( "--ncore", "-n", default=2, help="Number of cores (parallel processes)", show_default=True, ) @click.option( "--skip", "-s", is_flag=True, help="Skip the calculation if the output file already exists.", ) def convert(meshpath, ipath, opath, variable, ncore, skip): """Main CLI for FESOM unstructured-to-structured conversion.""" _convert(meshpath, ipath, opath, variable, ncore, skip) if __name__ == "__main__": convert()