Source code for pycmor.std_lib.files

"""
This module contains functions for handling file-related operations in the pycmor package.
It includes functions for creating filepaths based on given rules and datasets, and for
saving the resulting datasets to the generated filepaths.



Table 2: Precision of time labels used in file names
|---------------+-------------------+-----------------------------------------------|
| Frequency     | Precision of time | Notes                                         |
|               | label             |                                               |
|---------------+-------------------+-----------------------------------------------|
| yr, dec,      | “yyyy”            | Label with the years recorded in the first    |
| yrPt          |                   | and last coordinate values.                   |
|---------------+-------------------+-----------------------------------------------|
| mon, monC     | “yyyyMM”          | For “mon”, label with the months recorded in  |
|               |                   | the first and last coordinate values; for     |
|               |                   | “monC” label with the first and last months   |
|               |                   | contributing to the climatology.              |
|---------------+-------------------+-----------------------------------------------|
| day           | “yyyyMMdd”        | Label with the days recorded in the first and |
|               |                   | last coordinate values.                       |
|---------------+-------------------+-----------------------------------------------|
| 6hr, 3hr,     | “yyyyMMddhhmm”    | Label 1hrCM files with the beginning of the   |
| 1hr,          |                   | first hour and the end of the last hour       |
| 1hrCM, 6hrPt, |                   | contributing to climatology (rounded to the   |
| 3hrPt,        |                   | nearest minute); for other frequencies in     |
| 1hrPt         |                   | this category, label with the first and last  |
|               |                   | time-coordinate values (rounded to the        |
|               |                   | nearest minute).                              |
|---------------+-------------------+-----------------------------------------------|
| subhrPt       | “yyyyMMddhhmmss”  | Label with the first and last time-coordinate |
|               |                   | values (rounded to the nearest second)        |
|---------------+-------------------+-----------------------------------------------|
| fx            | Omit time label   | This frequency applies to variables that are  |
|               |                   | independent of time (“fixed”).                |
|---------------+-------------------+-----------------------------------------------|

"""

from pathlib import Path

import pandas as pd
import xarray as xr
from xarray.core.utils import is_scalar

from ..core.logging import logger
from .dataset_helpers import get_time_label, has_time_axis


[docs] def _filename_time_range(ds, rule) -> str: """ Determine the time range used in naming the file. Parameters ---------- ds : xarray.Dataset The input dataset. rule : Rule The rule object containing information for generating the filepath. Returns ------- str time_range in filepath. """ if not has_time_axis(ds): return "" time_label = get_time_label(ds) if is_scalar(ds[time_label]): return "" start = pd.Timestamp(str(ds[time_label].data[0])) end = pd.Timestamp(str(ds[time_label].data[-1])) # frequency_str = rule.get("frequency_str") frequency_str = rule.data_request_variable.frequency if frequency_str in ("yr", "yrPt", "dec"): return f"{start:%Y}-{end:%Y}" if frequency_str in ("mon", "monC", "monPt"): return f"{start:%Y%m}-{end:%Y%m}" if frequency_str == "day": return f"{start:%Y%m%d}-{end:%Y%m%d}" if frequency_str in ("6hr", "3hr", "1hr", "6hrPt", "3hrPt", "1hrPt", "1hrCM"): _start = start.round("1min") _end = end.round("1min") return f"{_start:%Y%m%d%H%M}-{_end:%Y%m%d%H%M}" if frequency_str == "subhrPt": _start = start.round("1s") _end = end.round("1s") return f"{_start:%Y%m%d%H%M%S}-{_end:%Y%m%d%H%M%S}" if frequency_str == "fx": return "" else: raise NotImplementedError(f"No implementation for {frequency_str} yet.")
[docs] def create_filepath(ds, rule): """ Generate a filepath when given an xarray dataset and a rule. This function generates a filepath for the output file based on the given dataset and rule. The filepath includes the name, table_id, institution, source_id, experiment_id, label, grid, and optionally the start and end time. Parameters ---------- ds : xarray.Dataset The input dataset. rule : Rule The rule object containing information for generating the filepath. Returns ------- str The generated filepath. Notes ----- The rule object should have the following attributes: cmor_variable, data_request_variable, variant_label, source_id, experiment_id, output_directory, and optionally institution. """ name = rule.cmor_variable table_id = rule.data_request_variable.table_header.table_id # Omon label = rule.variant_label # r1i1p1f1 source_id = rule.source_id # AWI-CM-1-1-MR experiment_id = rule.experiment_id # historical out_dir = rule.output_directory # where to save output files institution = getattr(rule, "institution", "AWI") grid = rule.grid_label # grid_type time_range = _filename_time_range(ds, rule) # check if output sub-directory is needed enable_output_subdirs = rule._pycmor_cfg.get("enable_output_subdirs", False) if enable_output_subdirs: subdirs = rule.ga.subdir_path() out_dir = f"{out_dir}/{subdirs}" filepath = f"{out_dir}/{name}_{table_id}_{institution}-{source_id}_{experiment_id}_{label}_{grid}_{time_range}.nc" Path(filepath).parent.mkdir(parents=True, exist_ok=True) return filepath
[docs] def get_offset(rule): """convert offset defined on the rule to a timedelta.""" offset = getattr(rule, "adjust_timestamp", None) if offset is not None: offset_presets = { "first": 0, "start": 0, "last": 1, "end": 1, "mid": 0.5, "middle": 0.5, } offset = offset_presets.get(offset, offset) try: offset = float(offset) except (ValueError, TypeError): # expect offset to a literal string. Example: "14D" offset = pd.Timedelta(offset) else: # offset is a float value scaled by the approx_interval approx_interval = float( rule.data_request_variable.table_header.approx_interval ) dt = pd.Timedelta(approx_interval, unit="d") offset = dt * float(offset) return offset
[docs] def file_timespan_tail(rule): """Grab the last timestamp in each file and return them as a list. Also account for offset (if any) defined on the rule""" times = [] try: options = {"decode_times": xr.coders.CFDatetimeCoder(use_cftime=True)} except AttributeError: # in python3.9, xarray does not have coders options = {"use_cftime": True} for _input in rule.inputs: for f in sorted(_input.files): ds = xr.open_dataset(str(f), **options) time_label = get_time_label(ds) if time_label: times.append(ds[time_label].values[-1]) offset = get_offset(rule) if offset is not None: times = xr.CFTimeIndex(times) + offset times = list(times.values) return times
[docs] def split_data_timespan(ds, rule): """ Splits the dataset into chunks based on the time axis as defined in the source files. Parameters ---------- ds : xarray.Dataset The dataset to split. rule : Rule The rule object containing information for generating the filepath. Returns ------- list A list of datasets, each containing a chunk of the original dataset. """ time_cuts = file_timespan_tail(rule) ncuts = len(time_cuts) time_label = get_time_label(ds) if not time_label: return [ds] resampled_times = ds[time_label].values ref = pd.DataFrame(resampled_times, columns=["dateindex"]) ref["mark"] = ncuts for ind, timecut in enumerate(reversed(time_cuts), start=1): ref.loc[ref.dateindex < timecut, "mark"] = ncuts - ind result = [] for _, grp in ref.groupby("mark"): result.append((grp.dateindex.iloc[0], grp.dateindex.iloc[-1])) data_chunks = [] for timespan in result: da = ds.sel({time_label: slice(timespan[0], timespan[-1])}) data_chunks.append(da) if not data_chunks: data_chunks.append(ds) return data_chunks
[docs] def _save_dataset_with_native_timespan( da, rule, time_label, time_encoding, **extra_kwargs, ): paths = [] datasets = split_data_timespan(da, rule) for group_ds in datasets: paths.append(create_filepath(group_ds, rule)) return xr.save_mfdataset( datasets, paths, encoding={time_label: time_encoding}, **extra_kwargs, )
[docs] def save_dataset(da: xr.DataArray, rule): """ Save dataset to one or more files. Parameters ---------- da : xr.DataArray The dataset to be saved. rule : Rule The rule object containing information for generating the filepath. Returns ------- None Notes ----- If the dataset does not have a time axis, or if the time axis is a scalar, this function will save the dataset to a single file. Otherwise, it will split the dataset into chunks based on the time axis and save each chunk to a separate file. The filepath will be generated based on the rule object and the time range of the dataset. The filepath will include the name, table_id, institution, source_id, experiment_id, label, grid, and optionally the start and end time. If the dataset needs resampling (i.e., the time axis does not align with the time frequency specified in the rule object), this function will split the dataset into chunks based on the time axis and resample each chunk to the specified frequency. The resampled chunks will then be saved to separate files. NOTE: prior to calling this function, call dask.compute() method, otherwise tasks will progress very slow. """ time_dtype = rule._pycmor_cfg("xarray_time_dtype") time_unlimited = rule._pycmor_cfg("xarray_time_unlimited") extra_kwargs = {} if time_unlimited: extra_kwargs.update({"unlimited_dims": ["time"]}) time_encoding = {"dtype": time_dtype} time_encoding = {k: v for k, v in time_encoding.items() if v is not None} if not has_time_axis(da): filepath = create_filepath(da, rule) return da.to_netcdf( filepath, mode="w", format="NETCDF4", ) time_label = get_time_label(da) if is_scalar(da[time_label]): filepath = create_filepath(da, rule) return da.to_netcdf( filepath, mode="w", format="NETCDF4", encoding={time_label: time_encoding}, **extra_kwargs, ) if isinstance(da, xr.DataArray): da = da.to_dataset() # Not sure about this, maybe it needs to go above, before the is_scalar # check if rule._pycmor_cfg("xarray_time_set_standard_name"): da[time_label].attrs["standard_name"] = "time" if rule._pycmor_cfg("xarray_time_set_long_name"): da[time_label].attrs["long_name"] = "time" if rule._pycmor_cfg("xarray_time_enable_set_axis"): time_axis_str = rule._pycmor_cfg("xarray_time_taxis_str") da[time_label].attrs["axis"] = time_axis_str if rule._pycmor_cfg("xarray_time_remove_fill_value_attr"): time_encoding["_FillValue"] = None if not has_time_axis(da): filepath = create_filepath(da, rule) return da.to_netcdf( filepath, mode="w", format="NETCDF4", **extra_kwargs, ) default_file_timespan = rule._pycmor_cfg("file_timespan") file_timespan = getattr(rule, "file_timespan", default_file_timespan) if file_timespan == "file_native": return _save_dataset_with_native_timespan( da, rule, time_label, time_encoding, **extra_kwargs, ) else: file_timespan_as_offset = pd.tseries.frequencies.to_offset(file_timespan) file_timespan_as_dt = ( pd.Timestamp.now() + file_timespan_as_offset - pd.Timestamp.now() ) approx_interval = float(rule.data_request_variable.table_header.approx_interval) dt = pd.Timedelta(approx_interval, unit="d") if file_timespan_as_dt < dt: logger.warning( f"file_timespan {file_timespan_as_dt} is smaller than approx_interval {dt}" "falling back to timespan as defined in the source file" ) return _save_dataset_with_native_timespan( da, rule, time_label, time_encoding, **extra_kwargs, ) else: groups = da.resample(time=file_timespan) paths = [] datasets = [] for group_name, group_ds in groups: paths.append(create_filepath(group_ds, rule)) datasets.append(group_ds) return xr.save_mfdataset( datasets, paths, encoding={time_label: time_encoding}, **extra_kwargs, )