"""
This module contains functions for creating, loading and manipulating a file cache.
The file cache is a CSV file that contains a pandas DataFrame with the following columns:
- ``variable``: The name of the variable in the file.
- ``freq``: The frequency of the variable in the file.
- ``start``: The start time of the variable in the file.
- ``end``: The end time of the variable in the file.
- ``timespan``: The timespan of the variable in the file.
- ``steps``: The number of time steps in the variable in the file.
- ``units``: The units of the variable in the file.
- ``filename``: The filename of the file.
- ``filesize``: The file size of the file in bytes.
- ``mtime``: The last modified time of the file in seconds since the epoch.
- ``checksum``: The imohash checksum of the file.
- ``filepath``: The absolute path to the file.
The file cache can be used to quickly select files from the cache that have a
specific variable, frequency, start date, end date, timespan, number of time
steps, units, filename, file size, last modified time, checksum, or absolute
path.
The file cache is stored in the following location by default:
$HOME/.cache/pymor_filecache.csv
The file cache can be loaded and saved using the following functions:
.. code-block:: python
>>> from pycmor.core.filecache import Filecache
>>> cache = Filecache.load()
>>> cache.save()
Collect metadata about files by adding them to the cache with either of the following methods:
`cache.add_file` or `cache.add_files`
.. code-block:: python
>>> filepath = "tests/data/test_experiments/my_expid/outdata/fesom/volo.nc"
>>> cache.add_file(filepath)
>>> # adding multiple files at once
>>> cache.add_files(["tests/data/dummy_data/random1.nc", "tests/data/dummy_data/random2.nc"])
You can access the metadata of a file in the cache using the `get` method:
.. code-block:: python
>>> filepath = "tests/data/test_experiments/my_expid/outdata/fesom/volo.nc"
>>> # alternatively, `get` adds the file to the cache (if needed) and returns its metadata
>>> cache.get(filepath) # doctest: +ELLIPSIS
filepath tests/data/test_experiments/my_expid/outdata/f...
filename volo.nc
checksum imohash:c8047bbd7e292dbe54a6387611f500c4
filesize 584
mtime ...
start 1951-01-02 00:00:00
end 1951-01-13 00:00:00
timespan 11 days, 0:00:00
freq D
steps 12
variable volo
units m3
Name: 0, dtype: object
For an overview of the cached data, use the `summary` method. It returns a
pandas DataFrame with one column summarizing each of the variables in the cache.
The fields include the frequency, start date, end date, timespan, number of
files in the collection for this variable, number of time steps of the first
file, and the total size of the files in bytes.
.. code-block:: python
>>> cache.summary()
variable seq volo
freq D D
start 0001-01-01 00:00:00 1951-01-02 00:00:00
end 0001-01-11 00:00:00 1951-01-13 00:00:00
timespan 10 days 00:00:00 11 days 00:00:00
nfiles 2 1
steps 11 12
size 2120 584
To use a subset of the collection for a given variable, use the `select_range`
method. It returns a new Filecache limited to the files whose start and end
dates both lie within the given range.
.. code-block:: python
>>> c = cache.select_range(variable="tas", start="1850-01-01", end="1900-01-01")
"""
import atexit
import datetime
import io
import os
import shutil
from pathlib import Path
from typing import List, Optional, Union
import numpy as np
import pandas as pd
import xarray as xr
from imohash import hashfile
from tqdm.contrib.concurrent import process_map
from .infer_freq import infer_frequency
# Default on-disk location of the cache CSV; the leading "~" is expanded with
# Path.expanduser() wherever the path is opened.
CACHE_FILE: str = "~/.cache/pymor_filecache.csv"
class Filecache:
    """
    Table of per-file metadata (one row per file), backed by a pandas DataFrame.

    The columns are listed in ``_fields`` and described in the module
    docstring. :meth:`load` reads the table from the CSV file at
    ``CACHE_FILE`` and :meth:`save` writes it back when it was modified.
    """

    # Canonical column names of the cache DataFrame.
    _fields = "variable freq start end timespan steps units filename filesize mtime checksum filepath".split()

    def __init__(self, cache: Optional[pd.DataFrame] = None):
        """
        Parameters
        ----------
        cache : pd.DataFrame, optional
            A pandas DataFrame with columns corresponding to the fields of the file cache.
            If not provided, an empty DataFrame is created.

        Attributes
        ----------
        df : pd.DataFrame
            A pandas DataFrame containing the file cache.
        cache_meta : str
            The ``#``-prefixed metadata line that :meth:`save` writes as the
            first line of the CSV file. :meth:`load` replaces it with the line
            read from disk.
        """
        if cache is None:
            cache = pd.DataFrame([], columns=self._fields)
        self.df: pd.DataFrame = cache
        # Set whenever the DataFrame changes; ``save`` only writes when True.
        self._new_record = False
        # Default metadata so that ``save`` also works on instances that were
        # not created by ``load`` (previously an AttributeError).
        self.cache_meta: str = self._default_meta()

    @staticmethod
    def _default_meta() -> str:
        """Return a fresh metadata line ``#<today>;1ME`` terminated by a newline."""
        today = datetime.datetime.now().strftime("%Y-%m-%d")
        # NOTE(review): "1ME" looks like a pandas offset alias used as a cache
        # check frequency; nothing in this class interprets it — confirm.
        checkfreq = "1ME"
        return f"#{today};{checkfreq}\n"

    @classmethod
    def load(cls) -> "Filecache":
        """
        Load the file cache from the default location (``CACHE_FILE``).

        The file and its parent directory are created (empty) if they do not
        exist yet. If the first line starts with ``#`` it is kept as the cache
        metadata line; otherwise a fresh one dated today is generated.

        Returns
        -------
        Filecache
            A Filecache wrapping the cached records (empty if the file is empty).
        """
        path = Path(CACHE_FILE).expanduser()
        if not path.exists():
            path.parent.mkdir(exist_ok=True, parents=True)
            path.touch()
        with path.open() as f:
            first_line = f.readline().strip()
        if first_line.startswith("#"):
            meta_string = first_line.rstrip() + "\n"
        else:
            # No metadata recorded in the file yet; start one dated today.
            meta_string = cls._default_meta()
        data = None
        if path.stat().st_size > 0:
            try:
                data = pd.read_csv(str(path), comment="#")
            except pd.errors.EmptyDataError:
                # Only comment/blank lines and no CSV header: treat as empty.
                data = None
        obj = cls(data)
        obj.cache_meta = meta_string
        return obj

    def save(self) -> None:
        """
        Save the file cache to the default location (``CACHE_FILE``).

        Nothing is written unless the cache was modified since it was loaded
        (or last saved). The metadata line is written first, followed by the
        DataFrame as CSV without its index.
        """
        if not self._new_record:
            return
        # Render everything in memory first, so that a failure in ``to_csv``
        # cannot leave a truncated cache file behind.
        buf = io.StringIO()
        buf.write(self.cache_meta)
        self.df.to_csv(buf, index=False)
        path = Path(CACHE_FILE).expanduser()
        path.parent.mkdir(exist_ok=True, parents=True)
        buf.seek(0)
        with open(path, "w") as f:
            shutil.copyfileobj(buf, f)
        self._new_record = False

    def _add_file(self, filename: str) -> None:
        """
        Internal method to add a file to the cache; same as :meth:`add_file`.

        Delegates to :meth:`add_file` so that the record is also marked for
        saving and no private pandas API (``DataFrame._append``) is used.
        """
        self.add_file(filename)

    def add_file(self, filename: str) -> None:
        """
        Add a file to the cache.

        Only adds a file if no file with the same name (base name, ignoring
        the directory) already exists in the cache.

        Parameters
        ----------
        filename : str
            The path to the file to add.
        """
        name = Path(filename).name
        if name in self.df.filename.values:
            return
        record = self._make_record(filename).to_frame().T
        self._new_record = True
        if self.df.empty:
            self.df = record
        else:
            self.df = pd.concat([self.df, record], ignore_index=True)
        # ``_make_record`` ran before this record was part of ``self.df``; now
        # that it is, retry the directory-based inference including it.
        self._fill_missing_freq(record)

    def add_files(self, files: List[str]) -> None:
        """
        Add a list of files to the cache.

        Only adds files whose path is not already in the cache (compared on
        the full path as given). Duplicate paths in ``files`` are processed
        once. Records are built in parallel with ``process_map``.

        Parameters
        ----------
        files : list of str
            List of paths to the files to add.
        """
        known = set(self.df.filepath.values)
        new_files = [
            path
            for path in dict.fromkeys(os.fspath(f) for f in files)
            if path not in known
        ]
        if not new_files:
            print("No new files found")
            return
        self._new_record = True
        records = process_map(
            self._make_record,
            new_files,
            chunksize=5,
            max_workers=10,
            unit="files",
        )
        new_df = pd.DataFrame(records)
        if self.df.empty:
            self.df = new_df
        else:
            self.df = pd.concat([self.df, new_df], ignore_index=True)
        # Workers run on copies of this cache in other processes: they do not
        # see each other's records and their group frequency updates are lost.
        # Redo the directory-based inference here with all files present.
        self._fill_missing_freq(new_df)

    def _fill_missing_freq(self, records: pd.DataFrame) -> None:
        """Retry group-based frequency inference for ``records`` rows without a frequency."""
        missing = records[records["freq"].isna()]
        groups = dict.fromkeys(
            (os.path.dirname(str(path)), variable)
            for path, variable in zip(missing["filepath"], missing["variable"])
        )
        for dirname, variable in groups:
            self._infer_freq_for_group(dirname, variable)

    def infer_freq(self, filename: str) -> Optional[str]:
        """
        Return the frequency of ``filename``, inferring it if it is not cached.

        Inference uses the start dates of all cached files of the same
        variable in the same directory; the result is stored for this file.

        Parameters
        ----------
        filename : str
            Path (or name) of the file; see :meth:`get`.

        Returns
        -------
        str or None
            The frequency, or whatever ``infer_frequency`` returns.

        Raises
        ------
        FileNotFoundError
            If the file is neither in the cache nor on disk.
        """
        info = self.get(filename)
        if info is None:
            raise FileNotFoundError(f"{filename} is not in the cache and does not exist")
        # Missing frequencies are None in fresh records but NaN after a CSV
        # round trip, so ``notna`` is required rather than ``is not None``.
        if pd.notna(info["freq"]):
            return info["freq"]
        filepath = info["filepath"]
        dirname = os.path.dirname(str(filepath))
        # we need variable records from this directory only.
        group = self.df[self._group_mask(dirname, info["variable"])]
        dates = [pd.Timestamp(d) for d in group.start.sort_values().values]
        freq = infer_frequency(dates, log=True)
        # Update the cache with the inferred frequency
        filename_mask = self.df.filepath == filepath
        if filename_mask.any():
            self._set_freq(filename_mask, freq)
        return freq

    def _make_record(self, filename: str) -> pd.Series:
        """
        Internal method to create a record from a file.

        Parameters
        ----------
        filename : str
            The path to the file to create a record from.

        Returns
        -------
        pd.Series
            A pandas Series containing the metadata of the file.
        """
        record = {}
        record["filepath"] = filename
        record["filename"] = os.path.basename(filename)
        # file checksum
        record["checksum"] = f"imohash:{hashfile(filename, hexdigest=True)}"
        # file stats
        st = os.stat(filename)
        record["filesize"] = st.st_size
        record["mtime"] = st.st_mtime
        # The context manager closes the dataset even if reading it fails.
        with xr.open_dataset(filename, use_cftime=True) as ds:
            t = ds.time.to_pandas()
            record["start"] = str(t.iloc[0])
            record["end"] = str(t.iloc[-1])
            record["timespan"] = str(t.iloc[-1] - t.iloc[0])
            # Try to infer frequency from this file's time steps first
            record["freq"] = self._infer_freq_from_file(filename, ds, t)
            record["steps"] = t.size
            # The last data variable (and its units) describes the file.
            record["variable"] = list(ds.data_vars.keys())[-1]
            record["units"] = [
                val.attrs.get("units") for val in ds.data_vars.values()
            ][-1]
        return pd.Series(record)

    def _infer_freq_from_file(
        self, filename: str, ds: "xr.Dataset", time_series: pd.Series
    ) -> Optional[str]:
        """
        Infer frequency from a file's time steps, with fallback to multi-file approach.

        Parameters
        ----------
        filename : str
            Path to the file being processed
        ds : xr.Dataset
            The opened xarray dataset
        time_series : pd.Series
            The time coordinate as pandas Series

        Returns
        -------
        str or None
            The inferred frequency, or None if unable to determine
        """
        # Convert to pandas Timestamps. Anything with ``strftime`` (cftime
        # dates, but also datetime / pd.Timestamp) goes through a string.
        try:
            if hasattr(time_series.iloc[0], "strftime"):
                timestamps = [
                    pd.Timestamp(t.strftime("%Y-%m-%d %H:%M:%S")) for t in time_series
                ]
            else:
                timestamps = [pd.Timestamp(t) for t in time_series]
        except Exception:
            # Not representable as pandas Timestamps: single-file inference is
            # impossible, but the directory fallback may still succeed (e.g. by
            # reusing a frequency already known for the group).
            timestamps = []
        # Strategy 1: Try to infer from single file if it has enough time steps (>2)
        if len(timestamps) > 2:
            try:
                # Don't log for single file attempts
                freq = infer_frequency(timestamps, log=False)
            except Exception:
                freq = None
            if freq is not None:
                return freq
        # Strategy 2: Fallback to multi-file approach
        return self._infer_freq_from_directory(filename, ds)

    def _infer_freq_from_directory(self, filename: str, ds: "xr.Dataset") -> Optional[str]:
        """
        Infer frequency by collecting time steps from all files with same variable in same directory.

        Parameters
        ----------
        filename : str
            Path to the current file
        ds : xr.Dataset
            The opened xarray dataset

        Returns
        -------
        str or None
            The inferred frequency, or None if unable to determine
        """
        try:
            dirname = os.path.dirname(filename)
            # Same choice as ``_make_record`` (last data variable) so that the
            # lookup matches the ``variable`` stored for this file.
            variable = list(ds.data_vars.keys())[-1]
        except (IndexError, TypeError):
            return None
        return self._infer_freq_for_group(dirname, variable)

    def _infer_freq_for_group(self, dirname: str, variable: str) -> Optional[str]:
        """
        Infer the frequency of the cached files of ``variable`` in ``dirname``.

        Uses only cached start/end values (no file I/O). On success the
        frequency is written to every file of the group.

        Returns
        -------
        str or None
            The frequency, or None if it cannot be determined.
        """
        try:
            group = self.df[self._group_mask(dirname, variable)]
            # Early termination: reuse a frequency already known for the group.
            known = group["freq"].dropna()
            if not known.empty:
                freq = known.iloc[0]
                self._update_freq_for_group(dirname, variable, freq)
                return freq
            if len(group) < 2:  # Need at least 2 files for multi-file inference
                return None
            timestamps = []
            for row in group.itertuples(index=False):
                try:
                    start_ts = pd.Timestamp(row.start)
                    end_ts = pd.Timestamp(row.end)
                except (ValueError, TypeError):
                    continue
                timestamps.append(start_ts)
                # Two-step files contribute both steps; other files are
                # represented by their first step only.
                if row.steps == 2:
                    timestamps.append(end_ts)
            if len(timestamps) <= 2:
                return None
            timestamps.sort()
            freq = infer_frequency(timestamps, log=True)
            if freq is not None:
                self._update_freq_for_group(dirname, variable, freq)
            return freq
        except Exception:
            # Best effort: any failure simply means "frequency unknown".
            return None

    def _group_mask(self, dirname: str, variable: str) -> pd.Series:
        """
        Boolean mask of rows for ``variable`` whose file lives directly in ``dirname``.

        The parent directory is compared exactly; a prefix match would also
        select subdirectories and siblings such as ``exp10`` for ``exp1``.
        """
        parent_dirs = self.df["filepath"].astype(str).map(os.path.dirname)
        return (parent_dirs == dirname) & (self.df["variable"] == variable)

    def _set_freq(self, mask: pd.Series, freq: str) -> None:
        """Write ``freq`` into the rows selected by ``mask`` and mark the cache modified."""
        # A ``freq`` column read from CSV with no values at all is float (all
        # NaN); make it object so string frequencies can be stored cleanly.
        if self.df["freq"].dtype != object:
            self.df["freq"] = self.df["freq"].astype(object)
        self.df.loc[mask, "freq"] = freq
        self._new_record = True

    def _update_freq_for_group(self, dirname: str, variable: str, freq: str) -> None:
        """
        Update frequency for all files with same variable in same directory.

        Parameters
        ----------
        dirname : str
            Directory path
        variable : str
            Variable name
        freq : str
            Inferred frequency
        """
        mask = self._group_mask(dirname, variable)
        if mask.any():
            self._set_freq(mask, freq)

    def summary(self, variable=None) -> pd.DataFrame:
        """
        Return a summary of the cached files, grouped by variable.

        Parameters
        ----------
        variable : str, optional
            If given, return only the summary of this variable.

        Returns
        -------
        pd.DataFrame or pd.Series
            One column per variable (a single Series if ``variable`` is given)
            with the rows:
            - `freq`: the frequency of the first file of the group (str)
            - `start`: the smallest start value, compared as strings (str)
            - `end`: the largest end value, compared as strings (str)
            - `timespan`: ``end - start`` (str)
            - `nfiles`: the number of files (int)
            - `steps`: the number of steps of the first file of the group (int)
            - `size`: the total size of the files in bytes (int)

        Raises
        ------
        ValueError
            If ``variable`` is given but not present in the cache.
        """

        def _summary(df: pd.DataFrame) -> pd.Series:
            d = {}
            d["freq"] = df.freq.iloc[0]
            d["start"] = start = df.start.min()
            d["end"] = end = df.end.max()
            d["timespan"] = str(pd.Timestamp(end) - pd.Timestamp(start))
            d["nfiles"] = df.shape[0]
            d["steps"] = df.steps.iloc[0]
            d["size"] = df.filesize.sum()
            return pd.Series(d)

        info = self.df.groupby(["variable"]).apply(_summary, include_groups=False)
        info = info.T
        if variable:
            if variable in info.columns:
                return info[variable]
            raise ValueError(
                f"Variable not found. Possible variables: {list(info.columns)}"
            )
        return info

    def details(self) -> pd.DataFrame:
        """Return the underlying cache DataFrame (not a copy)."""
        return self.df

    def variables(self) -> List[str]:
        """
        Return a list of unique variable names in the cache, in order of first appearance.

        Returns
        -------
        list
            A list of unique variable names in the cache.
        """
        return self.df.variable.unique().tolist()

    def frequency(
        self, *, filename: Optional[str] = None, variable: Optional[str] = None
    ) -> str:
        """
        Return the frequency of a variable or a file.

        Parameters
        ----------
        filename : str, optional
            The path to the file to get the frequency from (matched by base name).
        variable : str, optional
            The variable to get the frequency from. Takes precedence over ``filename``.

        Returns
        -------
        str or pd.Series or dict
            The frequency of the variable or file; a Series if several distinct
            frequencies match. With neither argument, a ``{variable: freq}`` dict.
        """
        if filename is None and variable is None:
            return dict(self.df[["variable", "freq"]].drop_duplicates().values.tolist())
        if variable:
            return (
                self.df[self.df.variable == variable]["freq"]
                .drop_duplicates()
                .squeeze()
            )
        if filename:
            name = Path(filename).name
            return (
                (self.df[self.df.filename == name])["freq"].drop_duplicates().squeeze()
            )

    def show_range(self, *, variable: Optional[str] = None) -> pd.DataFrame:
        """
        Return the start and end dates of the cached files.

        Parameters
        ----------
        variable : str, optional
            The variable to filter the results by.

        Returns
        -------
        pd.DataFrame
            A pandas DataFrame containing the start and end dates of the cached files.
        """
        df = self.df
        if variable:
            df = self.df[self.df.variable == variable]
        return df[["start", "end"]]

    def select_range(
        self,
        *,
        start: Optional[Union[str, pd.Timestamp]] = None,
        end: Optional[Union[str, pd.Timestamp]] = None,
        variable: Optional[str] = None,
    ) -> "Filecache":
        """
        Select the files in the cache that have a time range within the given start and end dates.

        Parameters
        ----------
        start : str or pd.Timestamp, optional
            The start date of the time range. If None, the start date of the first file is used.
        end : str or pd.Timestamp, optional
            The end date of the time range. If None, the end date of the last file is used.
        variable : str, optional
            The variable to filter the results by.

        Returns
        -------
        Filecache
            A new Filecache object containing (a copy of) the selected files.
            Returned in every case, including when no range is given.
        """
        df = self.df
        if variable:
            df = self.df[self.df.variable == variable]
        if start is None and end is None:
            return Filecache(df.copy())
        _start = df["start"].apply(pd.Timestamp)
        _end = df["end"].apply(pd.Timestamp)
        start = pd.Timestamp(start) if start else _start.min()
        end = pd.Timestamp(end) if end else _end.max()
        df = df[(_start >= start) & (_end <= end)]
        return Filecache(df.copy())

    def validate_range(
        self,
        *,
        start: Optional[Union[str, pd.Timestamp]] = None,
        end: Optional[Union[str, pd.Timestamp]] = None,
        variable: Optional[str] = None,
    ) -> bool:
        """
        Validate the given time range.

        Parameters
        ----------
        start : str or pd.Timestamp, optional
            Must not be earlier than the earliest cached start date.
        end : str or pd.Timestamp, optional
            Must not be later than the latest cached end date.
        variable : str, optional
            The variable to filter the results by.

        Returns
        -------
        bool
            True if the given time range is valid.

        Raises
        ------
        AssertionError
            If ``variable`` is not in the cache.
        ValueError
            If the given time range is out-of-bounds.
        """
        df = self.df
        if variable:
            known_variables = self.variables()
            assert (
                variable in known_variables
            ), f"{variable} is not in {known_variables}"
            df = self.df[self.df.variable == variable]
        if start:
            start_ts = pd.Timestamp(start)
            _start = df["start"].apply(pd.Timestamp)
            if not start_ts >= _start.min():
                raise ValueError(
                    f"Start date {start} is out-of-bounds. Valid range: {_start.min()} - {_start.max()}"
                )
        if end:
            end_ts = pd.Timestamp(end)
            _end = df["end"].apply(pd.Timestamp)
            if not end_ts <= _end.max():
                raise ValueError(
                    f"End date {end} is out-of-bounds. Valid range: {_end.min()} - {_end.max()}"
                )
        return True

    def files(
        self, *, variable: Optional[str] = None, fullpath: bool = True
    ) -> List[str]:
        """
        Return the list of files in the cache.

        Parameters
        ----------
        variable : str, optional
            The variable to filter the results by.
        fullpath : bool
            If True, return the stored path of each file. If False, return the
            filename only.

        Returns
        -------
        list of str
            The list of files in the cache.
        """
        df = self.df
        if variable:
            df = self.df[self.df.variable == variable]
        col = "filepath" if fullpath else "filename"
        return df[col].tolist()

    def get(self, filename):
        """
        Return the record for the given filename from the cache.

        Records are matched on the exact base name of ``filename``.

        Parameters
        ----------
        filename : str
            The path to the file to get the record for.

        Returns
        -------
        pd.Series or None
            The (first) matching record, or None if the file is neither cached
            nor present on disk.

        Notes
        -----
        If the filename is not in the cache and the file exists, it is added
        to the cache and the record is returned.
        """
        name = Path(filename).name
        # Exact comparison: ``str.contains`` did a regex substring search, so
        # e.g. "a.n" matched "a.nc" and names with "(" or "[" could fail.
        matches = self.df[self.df.filename == name]
        if matches.empty:
            if not Path(filename).exists():
                return None
            self.add_file(filename)
            matches = self.df[self.df.filename == name]
            if matches.empty:
                return None
        return matches.iloc[0]
# Module-wide cache instance used by ``register_cache`` and saved at exit by ``_save``.
# NOTE: ``Filecache.load()`` creates the cache file if it is missing, so merely
# importing this module touches the filesystem.
fc: Filecache = Filecache.load()
@atexit.register
def _save():
    """
    Save the module-level file cache ``fc`` when the interpreter exits.

    Registered via ``atexit.register`` used as a decorator (it returns the
    function unchanged). ``fc.save()`` only writes to disk when records were
    added or modified.
    """
    fc.save()
def register_cache(ds):
    """
    Register a dataset's source file in the module-level file cache ``fc``.

    Intended as a preprocessing step for ``xr.open_mfdataset`` (its
    ``preprocess`` callback): the file the dataset was read from is added to
    the cache and the dataset is returned unchanged.

    Parameters
    ----------
    ds : xarray.Dataset
        The dataset to register. The source filename is read from
        ``ds.encoding["source"]``.

    Returns
    -------
    xr.Dataset
        ``ds`` itself, unmodified.

    Raises
    ------
    KeyError
        If ``ds.encoding`` has no ``"source"`` entry.
    """
    filename = ds.encoding["source"]
    fc.add_file(filename)
    return ds