"""
This module contains functions for handling file-related operations in the pycmor package.
It includes functions for creating filepaths based on given rules and datasets, and for
saving the resulting datasets to the generated filepaths.
Table 2: Precision of time labels used in file names
|---------------+-------------------+-----------------------------------------------|
| Frequency | Precision of time | Notes |
| | label | |
|---------------+-------------------+-----------------------------------------------|
| yr, dec, | “yyyy” | Label with the years recorded in the first |
| yrPt | | and last coordinate values. |
|---------------+-------------------+-----------------------------------------------|
| mon, monC | “yyyyMM” | For “mon”, label with the months recorded in |
| | | the first and last coordinate values; for |
| | | “monC” label with the first and last months |
| | | contributing to the climatology. |
|---------------+-------------------+-----------------------------------------------|
| day | “yyyyMMdd” | Label with the days recorded in the first and |
| | | last coordinate values. |
|---------------+-------------------+-----------------------------------------------|
| 6hr, 3hr, | “yyyyMMddhhmm” | Label 1hrCM files with the beginning of the |
| 1hr, | | first hour and the end of the last hour |
| 1hrCM, 6hrPt, | | contributing to climatology (rounded to the |
| 3hrPt, | | nearest minute); for other frequencies in |
| 1hrPt | | this category, label with the first and last |
| | | time-coordinate values (rounded to the |
| | | nearest minute). |
|---------------+-------------------+-----------------------------------------------|
| subhrPt | “yyyyMMddhhmmss” | Label with the first and last time-coordinate |
| | | values (rounded to the nearest second) |
|---------------+-------------------+-----------------------------------------------|
| fx | Omit time label | This frequency applies to variables that are |
| | | independent of time (“fixed”). |
|---------------+-------------------+-----------------------------------------------|
"""
from pathlib import Path
import pandas as pd
import xarray as xr
from xarray.core.utils import is_scalar
from ..core.logging import logger
from .dataset_helpers import get_time_label, has_time_axis
[docs]
def _filename_time_range(ds, rule) -> str:
"""
Determine the time range used in naming the file.
Parameters
----------
ds : xarray.Dataset
The input dataset.
rule : Rule
The rule object containing information for generating the
filepath.
Returns
-------
str
time_range in filepath.
"""
if not has_time_axis(ds):
return ""
time_label = get_time_label(ds)
if is_scalar(ds[time_label]):
return ""
start = pd.Timestamp(str(ds[time_label].data[0]))
end = pd.Timestamp(str(ds[time_label].data[-1]))
# frequency_str = rule.get("frequency_str")
frequency_str = rule.data_request_variable.frequency
if frequency_str in ("yr", "yrPt", "dec"):
return f"{start:%Y}-{end:%Y}"
if frequency_str in ("mon", "monC", "monPt"):
return f"{start:%Y%m}-{end:%Y%m}"
if frequency_str == "day":
return f"{start:%Y%m%d}-{end:%Y%m%d}"
if frequency_str in ("6hr", "3hr", "1hr", "6hrPt", "3hrPt", "1hrPt", "1hrCM"):
_start = start.round("1min")
_end = end.round("1min")
return f"{_start:%Y%m%d%H%M}-{_end:%Y%m%d%H%M}"
if frequency_str == "subhrPt":
_start = start.round("1s")
_end = end.round("1s")
return f"{_start:%Y%m%d%H%M%S}-{_end:%Y%m%d%H%M%S}"
if frequency_str == "fx":
return ""
else:
raise NotImplementedError(f"No implementation for {frequency_str} yet.")
[docs]
def create_filepath(ds, rule):
"""
Generate a filepath when given an xarray dataset and a rule.
This function generates a filepath for the output file based on
the given dataset and rule. The filepath includes the name,
table_id, institution, source_id, experiment_id, label, grid, and
optionally the start and end time.
Parameters
----------
ds : xarray.Dataset
The input dataset.
rule : Rule
The rule object containing information for generating the
filepath.
Returns
-------
str
The generated filepath.
Notes
-----
The rule object should have the following attributes:
cmor_variable, data_request_variable, variant_label, source_id,
experiment_id, output_directory, and optionally institution.
"""
name = rule.cmor_variable
table_id = rule.data_request_variable.table_header.table_id # Omon
label = rule.variant_label # r1i1p1f1
source_id = rule.source_id # AWI-CM-1-1-MR
experiment_id = rule.experiment_id # historical
out_dir = rule.output_directory # where to save output files
institution = getattr(rule, "institution", "AWI")
grid = rule.grid_label # grid_type
time_range = _filename_time_range(ds, rule)
# check if output sub-directory is needed
enable_output_subdirs = rule._pycmor_cfg.get("enable_output_subdirs", False)
if enable_output_subdirs:
subdirs = rule.ga.subdir_path()
out_dir = f"{out_dir}/{subdirs}"
filepath = f"{out_dir}/{name}_{table_id}_{institution}-{source_id}_{experiment_id}_{label}_{grid}_{time_range}.nc"
Path(filepath).parent.mkdir(parents=True, exist_ok=True)
return filepath
[docs]
def get_offset(rule):
"""convert offset defined on the rule to a timedelta."""
offset = getattr(rule, "adjust_timestamp", None)
if offset is not None:
offset_presets = {
"first": 0,
"start": 0,
"last": 1,
"end": 1,
"mid": 0.5,
"middle": 0.5,
}
offset = offset_presets.get(offset, offset)
try:
offset = float(offset)
except (ValueError, TypeError):
# expect offset to a literal string. Example: "14D"
offset = pd.Timedelta(offset)
else:
# offset is a float value scaled by the approx_interval
approx_interval = float(
rule.data_request_variable.table_header.approx_interval
)
dt = pd.Timedelta(approx_interval, unit="d")
offset = dt * float(offset)
return offset
[docs]
def file_timespan_tail(rule):
"""Grab the last timestamp in each file and return them as a list.
Also account for offset (if any) defined on the rule"""
times = []
try:
options = {"decode_times": xr.coders.CFDatetimeCoder(use_cftime=True)}
except AttributeError:
# in python3.9, xarray does not have coders
options = {"use_cftime": True}
for _input in rule.inputs:
for f in sorted(_input.files):
ds = xr.open_dataset(str(f), **options)
time_label = get_time_label(ds)
if time_label:
times.append(ds[time_label].values[-1])
offset = get_offset(rule)
if offset is not None:
times = xr.CFTimeIndex(times) + offset
times = list(times.values)
return times
[docs]
def split_data_timespan(ds, rule):
"""
Splits the dataset into chunks based on the time axis as defined in the source files.
Parameters
----------
ds : xarray.Dataset
The dataset to split.
rule : Rule
The rule object containing information for generating the
filepath.
Returns
-------
list
A list of datasets, each containing a chunk of the original dataset.
"""
time_cuts = file_timespan_tail(rule)
ncuts = len(time_cuts)
time_label = get_time_label(ds)
if not time_label:
return [ds]
resampled_times = ds[time_label].values
ref = pd.DataFrame(resampled_times, columns=["dateindex"])
ref["mark"] = ncuts
for ind, timecut in enumerate(reversed(time_cuts), start=1):
ref.loc[ref.dateindex < timecut, "mark"] = ncuts - ind
result = []
for _, grp in ref.groupby("mark"):
result.append((grp.dateindex.iloc[0], grp.dateindex.iloc[-1]))
data_chunks = []
for timespan in result:
da = ds.sel({time_label: slice(timespan[0], timespan[-1])})
data_chunks.append(da)
if not data_chunks:
data_chunks.append(ds)
return data_chunks
[docs]
def _save_dataset_with_native_timespan(
da,
rule,
time_label,
time_encoding,
**extra_kwargs,
):
paths = []
datasets = split_data_timespan(da, rule)
for group_ds in datasets:
paths.append(create_filepath(group_ds, rule))
return xr.save_mfdataset(
datasets,
paths,
encoding={time_label: time_encoding},
**extra_kwargs,
)
[docs]
def save_dataset(da: xr.DataArray, rule):
"""
Save dataset to one or more files.
Parameters
----------
da : xr.DataArray
The dataset to be saved.
rule : Rule
The rule object containing information for generating the
filepath.
Returns
-------
None
Notes
-----
If the dataset does not have a time axis, or if the time axis is a scalar,
this function will save the dataset to a single file. Otherwise, it will
split the dataset into chunks based on the time axis and save each chunk
to a separate file.
The filepath will be generated based on the rule object and the time range
of the dataset. The filepath will include the name, table_id, institution,
source_id, experiment_id, label, grid, and optionally the start and end time.
If the dataset needs resampling (i.e., the time axis does not align with the
time frequency specified in the rule object), this function will split the
dataset into chunks based on the time axis and resample each chunk to the
specified frequency. The resampled chunks will then be saved to separate
files.
NOTE: prior to calling this function, call dask.compute() method,
otherwise tasks will progress very slow.
"""
time_dtype = rule._pycmor_cfg("xarray_time_dtype")
time_unlimited = rule._pycmor_cfg("xarray_time_unlimited")
extra_kwargs = {}
if time_unlimited:
extra_kwargs.update({"unlimited_dims": ["time"]})
time_encoding = {"dtype": time_dtype}
time_encoding = {k: v for k, v in time_encoding.items() if v is not None}
if not has_time_axis(da):
filepath = create_filepath(da, rule)
return da.to_netcdf(
filepath,
mode="w",
format="NETCDF4",
)
time_label = get_time_label(da)
if is_scalar(da[time_label]):
filepath = create_filepath(da, rule)
return da.to_netcdf(
filepath,
mode="w",
format="NETCDF4",
encoding={time_label: time_encoding},
**extra_kwargs,
)
if isinstance(da, xr.DataArray):
da = da.to_dataset()
# Not sure about this, maybe it needs to go above, before the is_scalar
# check
if rule._pycmor_cfg("xarray_time_set_standard_name"):
da[time_label].attrs["standard_name"] = "time"
if rule._pycmor_cfg("xarray_time_set_long_name"):
da[time_label].attrs["long_name"] = "time"
if rule._pycmor_cfg("xarray_time_enable_set_axis"):
time_axis_str = rule._pycmor_cfg("xarray_time_taxis_str")
da[time_label].attrs["axis"] = time_axis_str
if rule._pycmor_cfg("xarray_time_remove_fill_value_attr"):
time_encoding["_FillValue"] = None
if not has_time_axis(da):
filepath = create_filepath(da, rule)
return da.to_netcdf(
filepath,
mode="w",
format="NETCDF4",
**extra_kwargs,
)
default_file_timespan = rule._pycmor_cfg("file_timespan")
file_timespan = getattr(rule, "file_timespan", default_file_timespan)
if file_timespan == "file_native":
return _save_dataset_with_native_timespan(
da,
rule,
time_label,
time_encoding,
**extra_kwargs,
)
else:
file_timespan_as_offset = pd.tseries.frequencies.to_offset(file_timespan)
file_timespan_as_dt = (
pd.Timestamp.now() + file_timespan_as_offset - pd.Timestamp.now()
)
approx_interval = float(rule.data_request_variable.table_header.approx_interval)
dt = pd.Timedelta(approx_interval, unit="d")
if file_timespan_as_dt < dt:
logger.warning(
f"file_timespan {file_timespan_as_dt} is smaller than approx_interval {dt}"
"falling back to timespan as defined in the source file"
)
return _save_dataset_with_native_timespan(
da,
rule,
time_label,
time_encoding,
**extra_kwargs,
)
else:
groups = da.resample(time=file_timespan)
paths = []
datasets = []
for group_name, group_ds in groups:
paths.append(create_filepath(group_ds, rule))
datasets.append(group_ds)
return xr.save_mfdataset(
datasets,
paths,
encoding={time_label: time_encoding},
**extra_kwargs,
)