pycmor.core package

Submodules#

pycmor.core.aux_files module#

Auxiliary files that can be attached to a Rule

class pycmor.core.aux_files.AuxiliaryFile(name, path, loader=None, loader_args=None, loader_kwargs=None)[source]#

Bases: object

A class to represent an auxiliary file.

name#

The name of the file.

Type:

str

path#

The path to the file.

Type:

str

loader#

A callable to load the file.

Type:

callable, optional

loader_args#

Arguments to pass to the loader.

Type:

list, optional

loader_kwargs#

Keyword arguments to pass to the loader.

Type:

dict, optional

load():

Loads the file using the specified loader or reads the file content.

from_dict(d):

Creates an AuxiliaryFile instance from a dictionary.

classmethod from_dict(d)[source]#

Creates an AuxiliaryFile instance from a dictionary.

Parameters:

d (dict) – A dictionary containing the attributes of the AuxiliaryFile.

Returns:

An instance of AuxiliaryFile.

Return type:

AuxiliaryFile

load()[source]#

Loads the file using the specified loader or reads the file content.

Returns:

  • str – The content of the file if no loader is specified.

  • object – The result of the loader if a loader is specified.
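The two return paths of load() can be illustrated with a small stand-in class. This is only a sketch under assumptions: AuxFileSketch and load_json are hypothetical names, and the way the loader receives the path and extra arguments is a guess, not confirmed pycmor behaviour.

```python
import json
import tempfile
from pathlib import Path


class AuxFileSketch:
    """Hypothetical stand-in mirroring the documented AuxiliaryFile attributes."""

    def __init__(self, name, path, loader=None, loader_args=None, loader_kwargs=None):
        self.name = name
        self.path = path
        self.loader = loader
        self.loader_args = loader_args or []
        self.loader_kwargs = loader_kwargs or {}

    @classmethod
    def from_dict(cls, d):
        # Assumption: dictionary keys match the constructor argument names.
        return cls(**d)

    def load(self):
        if self.loader is None:
            # No loader given: return the raw text content of the file.
            return Path(self.path).read_text()
        # Assumption: the path is passed first, followed by the extra arguments.
        return self.loader(self.path, *self.loader_args, **self.loader_kwargs)


def load_json(path, **kwargs):
    """Example loader (hypothetical): parse the file as JSON."""
    with open(path) as f:
        return json.load(f, **kwargs)


with tempfile.TemporaryDirectory() as tmp:
    grid = Path(tmp) / "grid.json"
    grid.write_text('{"nodes": 3}')
    # Without a loader the content comes back as a string.
    raw = AuxFileSketch("grid", str(grid)).load()
    # With a loader the loader's result is returned instead.
    parsed = AuxFileSketch.from_dict(
        {"name": "grid", "path": str(grid), "loader": load_json}
    ).load()
```

Here raw is the JSON text and parsed is the dictionary produced by the loader.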

pycmor.core.aux_files.attach_files_to_rule(rule)[source]#

Attaches extra files to the rule

Mutates:

rule – The Rule object is modified to include the loaded auxiliary files

pycmor.core.caching module#

This module contains the functions that are used to cache the results of the tasks.

pycmor.core.caching.generate_cache_key(task, inputs)[source]#

Generate a cache key for the task

pycmor.core.caching.inspect_cache(cache_dir='~/.prefect/storage')[source]#
pycmor.core.caching.inspect_result(result)[source]#
pycmor.core.caching.manual_checkpoint(data, rule)[source]#

Manually insert a checkpoint in the flow

pycmor.core.calendar module#

Yet another calendar implementation.

This module provides functions for creating date ranges.

The main components of this module are:

  • year_bounds_major_digits: generates a list of year ranges (bounds) where each range starts with a specific digit.

  • date_ranges_from_bounds: creates a list of date indexes from bounds

  • date_ranges_from_year_bounds: creates a list of date indexes from year bounds

  • simple_ranges_from_bounds: creates a list of simple ranges from bounds

Examples

>>> year_bounds = year_bounds_major_digits(2000, 2010, 2, 2)
>>> print(year_bounds)
[[2000, 2001], [2002, 2003], [2004, 2005], [2006, 2007], [2008, 2009], [2010, 2010]]
pycmor.core.calendar.assign_time_axis(da: DataArray, taxis)[source]#
pycmor.core.calendar.date_ranges_from_bounds(bounds, freq: str = 'M', **kwargs)[source]#

Create a list of date indexes from a list of start and end bounds.

Parameters:
  • bounds (list of tuple of str or datetime-like) – A list of strings or datetime-like tuples each containing a start and end bound.

  • freq (str, optional) – The frequency of the periods. Defaults to one month.

  • **kwargs – Additional keyword arguments to pass to the date_range function.

Returns:

A tuple containing instances of the class for each provided bound.

Return type:

tuple

Examples

>>> bounds = [("2020-01-01", "2020-12-31")]
>>> date_ranges_from_bounds(bounds, freq="M")
DatetimeIndex(['2020-01-31', '2020-02-29', ..., '2020-12-31'], dtype='datetime64[ns]', freq='ME')
pycmor.core.calendar.date_ranges_from_year_bounds(year_bounds, freq: str = 'M', **kwargs)[source]#

Create a list of date indexes from a list of year bounds.

Parameters:
  • year_bounds (list of lists or tuples) – A list of lists, each containing a start and end year.

  • freq (str, optional) – The frequency of the periods. Defaults to one month.

  • **kwargs – Additional keyword arguments to pass to the date_range function.

pycmor.core.calendar.simple_ranges_from_bounds(bounds)[source]#

Create a list of simple ranges from a list of bounds.

pycmor.core.calendar.year_bounds_major_digits(first, last, step, binning_digit, return_type=<class 'int'>)[source]#

Generate year ranges with a specific first digit.

This function generates a list of year ranges (bounds) where each range starts with a specific digit (binning_digit). The ranges are generated from a given start year (first) to an end year (last) with a specific step size.

Parameters:
  • first (int) – The first year in the range.

  • last (int) – The last year in the range.

  • step (int) – The step size for the range.

  • binning_digit (int) – The digit that each range should start with.

  • return_type (type, optional) – The type of the elements in the returned list, either int or pendulum.DateTime. Defaults to int.

Returns:

A list of lists where each inner list is a range of years.

Return type:

list

Raises:

ValueError – If the binning_digit is greater than 10.

Examples

>>> year_bounds_major_digits(2000, 2010, 2, 2)
[[2000, 2001], [2002, 2003], [2004, 2005], [2006, 2007], [2008, 2009], [2010, 2010]]
>>> year_bounds_major_digits(2000, 2010, 3, 3)
[[2000, 2002], [2003, 2005], [2006, 2008], [2009, 2010]]

Notes

This function uses a while loop to iterate through the years from first to last. It checks the ones digit of the current year and compares it with the binning_digit to determine the start of a new range. If the first range is undersized (i.e., the binning_digit is in the ones digit of the first few years), the function will continue to increment the current year until it hits the binning_digit. If the first range is not undersized, the function will continue to increment the current year until it hits the next binning_digit. Once a range is completed, it is appended to the bounds list and the process continues until the last year is reached.
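The binning idea can be sketched in a few lines. This is a simplified re-creation and not the library's implementation: year_bounds_sketch is a hypothetical name, the digit check (0 to 9) is an assumption, and results may differ from year_bounds_major_digits for inputs other than the two documented examples.

```python
def year_bounds_sketch(first, last, step, binning_digit):
    """Simplified re-creation of the binning idea; not the library's code."""
    if not 0 <= binning_digit <= 9:
        # Assumption: only a single ones digit (0-9) is meaningful here.
        raise ValueError("binning_digit must be a single digit")
    # Find the first year whose ones digit equals binning_digit.
    aligned = first
    while aligned <= last and aligned % 10 != binning_digit:
        aligned += 1
    bounds = []
    if aligned > first:
        # Leading range that ends just before the first aligned year.
        bounds.append([first, min(aligned - 1, last)])
    start = aligned
    while start <= last:
        # Ranges of `step` years, with the final one truncated at `last`.
        bounds.append([start, min(start + step - 1, last)])
        start += step
    return bounds


two_year_bins = year_bounds_sketch(2000, 2010, 2, 2)
three_year_bins = year_bounds_sketch(2000, 2010, 3, 3)
```

For these two calls the sketch yields the same lists as the documented examples above.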

pycmor.core.cluster module#

This module contains the functions to manage the Dask cluster.

class pycmor.core.cluster.DaskContext[source]#

Bases: object

Global singleton to store the current Dask cluster.

This class ensures that there is only one active Dask cluster at any given time. It provides methods to set and retrieve the current cluster.

Examples

Setting a Dask cluster:

>>> from dask.distributed import LocalCluster
>>> cluster = LocalCluster()
>>> with DaskContext.set_cluster(cluster):
...     # Perform operations with the active cluster
...     active_cluster = DaskContext.get_cluster()
...     print(active_cluster)  # Outputs the current cluster
LocalCluster(...)

Retrieving the current Dask cluster:

>>> try:
...     active_cluster = DaskContext.get_cluster()
... except RuntimeError as e:
...     print(e)
No active Dask cluster in context!

_current_cluster = None#
classmethod get_cluster()[source]#
classmethod set_cluster(cluster)[source]#

Checks whether the default user configuration for the dashboard link is valid. If the configuration is invalid, it tries to catch the following errors:

  • KeyError: ‘JUPYTERHUB_SERVICE_PREFIX’ -> The dashboard link is not valid because the cluster was not launched from JupyterHub. In this case, the default dashboard link is set to ‘http://{host}:8787’.

Parameters:

cluster (dask_jobqueue.SLURMCluster) – The Dask cluster to set the dashboard link.
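The singleton behaviour described for DaskContext (one class-level slot, a context manager to set it, and a RuntimeError when nothing is set) can be sketched with the standard library. ClusterContextSketch is a hypothetical name, clearing the slot on exit is an assumption, and the dashboard-link handling is left out.

```python
from contextlib import contextmanager


class ClusterContextSketch:
    """Hypothetical stand-in for the documented singleton behaviour."""

    _current_cluster = None

    @classmethod
    @contextmanager
    def set_cluster(cls, cluster):
        # Remember the cluster for the duration of the with-block.
        cls._current_cluster = cluster
        try:
            yield cluster
        finally:
            # Assumption: the slot is cleared again when the block exits.
            cls._current_cluster = None

    @classmethod
    def get_cluster(cls):
        if cls._current_cluster is None:
            raise RuntimeError("No active Dask cluster in context!")
        return cls._current_cluster


fake_cluster = object()  # placeholder instead of a real Dask cluster
with ClusterContextSketch.set_cluster(fake_cluster):
    inside = ClusterContextSketch.get_cluster()

try:
    ClusterContextSketch.get_cluster()
    outside = None
except RuntimeError as e:
    outside = str(e)
```

Inside the with-block the placeholder is returned; outside it, the lookup fails with the error message.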

pycmor.core.cmorizer module#

class pycmor.core.cmorizer.CMORizer(pymor_cfg=None, pycmor_cfg=None, general_cfg=None, pipelines_cfg=None, rules_cfg=None, dask_cfg=None, inherit_cfg=None, **kwargs)[source]#

Bases: object

_SUPPORTED_CMOR_VERSIONS = ('CMIP6', 'CMIP7')#

Supported CMOR versions.

Type:

tuple

_caching_check()[source]#

Checks whether the workflows can be cached

static _caching_single_rule(rule)[source]#
_check_is_subperiod()[source]#
_check_units()[source]#
static _ensure_dask_slurm_account(jobqueue_cfg)[source]#
_match_pipelines_in_rules(force=False)[source]#
_parallel_process_dask(external_client=None)[source]#
_parallel_process_prefect()[source]#
_post_init_attach_pymor_config_rules()[source]#
_post_init_configure_dask()[source]#

Sets up configuration for Dask-Distributed

See also

https://docs.dask.org/en/stable/configuration.html?highlight=config#directly-within-python

_post_init_create_controlled_vocabularies()[source]#

Reads the controlled vocabularies from the directory tree rooted at <tables_dir>/CMIP6_CVs and stores them in the controlled_vocabularies attribute. This is done after the rules have been populated with the tables and data request variables, which may be used to lookup the controlled vocabularies.

_post_init_create_dask_cluster()[source]#
_post_init_create_data_request()[source]#

Creates a DataRequest object from the tables directory.

_post_init_create_data_request_tables()[source]#

Loads all the tables from the table directory as a mapping object. A shortened version of the filename (e.g., CMIP6_Omon.json -> Omon) is used as the mapping key. The same key format is used in CMIP6_table_id.json.
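The key shortening can be sketched as follows. table_key is a hypothetical helper, and the rule that the prefix ends at the first underscore is an assumption derived only from the CMIP6_Omon.json example.

```python
from pathlib import Path


def table_key(filename):
    """Derive a short mapping key, e.g. CMIP6_Omon.json -> Omon."""
    stem = Path(filename).stem  # drop the directory and the ".json" suffix
    # Assumption: the prefix is everything up to the first underscore.
    _, _, short = stem.partition("_")
    return short or stem


# Illustrative file names; no files are read in this sketch.
table_files = ["tables/CMIP6_Omon.json", "tables/CMIP6_Amon.json"]
tables = {table_key(f): f for f in table_files}
```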

_post_init_create_global_attributes_on_rules()[source]#
_post_init_create_pipelines()[source]#
_post_init_create_rules()[source]#
_post_init_inherit_rules()[source]#
_post_init_populate_rules_with_aux_files()[source]#

Attaches auxiliary files to the rules

_post_init_populate_rules_with_controlled_vocabularies()[source]#
_post_init_populate_rules_with_data_request_variables()[source]#
_post_init_populate_rules_with_dimensionless_unit_mappings()[source]#

Reads the dimensionless unit mappings from a configuration file and updates the rules with these mappings.

This method reads the dimensionless unit mappings from a file specified in the configuration. If the file is not specified or does not exist, an empty dictionary is used. The mappings are then added to each rule in the rules attribute.

Parameters:

None

Return type:

None

_post_init_populate_rules_with_tables()[source]#

Populates the rules with the tables in which the variable described by that rule is found.

static _process_rule(rule)[source]#
_rule_for_cmor_variable(cmor_variable)[source]#
_rule_for_filepath(filepath)[source]#
_rules_depluralize_drvs()[source]#

Ensures that only one data request variable is assigned to each rule

_rules_expand_drvs()[source]#
add_pipeline(pipeline)[source]#
add_rule(rule)[source]#
check_prefect()[source]#
check_rules_for_output_dir(output_dir)[source]#
check_rules_for_table(table_name)[source]#
find_matching_rule(data_request_variable: DataRequestVariable) Rule[source]#
classmethod from_dict(data)[source]#
parallel_process(backend='prefect')[source]#
process(parallel=None)[source]#
serial_process()[source]#
validate()[source]#

Validates whether the files are suitable for use with the pipeline requirements.

pycmor.core.cmorizer.DIMENSIONLESS_MAPPING_TABLE = PosixPath('/home/docs/checkouts/readthedocs.org/user_builds/pymor/checkouts/stable/src/pycmor/data/dimensionless_mappings.yaml')#

The dimensionless unit mapping table, used to recreate meaningful units from dimensionless fractional values (e.g. 0.001 -> g/kg)

Type:

Path

pycmor.core.config module#

This module defines the configuration hierarchy for the pycmor application, using everett’s everett.manager.ConfigManager. The configuration hierarchy is as follows (lowest to highest priority):

  1. Hardcoded defaults

  2. User configuration file

  3. Run-specific configuration

  4. Environment variables

  5. Command-line switches

The configuration hierarchy is defined in the from_pycmor_cfg class method and cannot be modified outside the class. You should initialize a PycmorConfigManager object (probably in your CMORizer) and retrieve config values by calling it with the config key as an argument.
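The priority order of the five layers can be illustrated with collections.ChainMap from the standard library. This only demonstrates the lookup order; it is not how everett or PycmorConfigManager is implemented, and every value below is made up for illustration.

```python
from collections import ChainMap

# Each layer is a plain dict; all values here are illustrative.
defaults = {"xarray_engine": "netcdf4", "parallel": "yes"}
user_file = {"xarray_engine": "h5netcdf"}
run_specific = {"parallel": "no"}
environment = {}
command_line = {"xarray_engine": "zarr"}

# ChainMap searches its maps left to right, so the highest-priority
# layer goes first and the hardcoded defaults go last.
config = ChainMap(command_line, environment, run_specific, user_file, defaults)

engine = config["xarray_engine"]  # set on the command line
parallel = config["parallel"]  # set in the run-specific layer
```

A key is taken from the highest-priority layer that defines it, so the command-line value wins for xarray_engine and the run-specific value wins for parallel.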

User Configuration File#

You can define global configuration options in a user configuration file. The files found at these locations will be used, in highest to lowest priority order:

  1. ${PYCMOR_CONFIG_FILE}

  2. ${XDG_CONFIG_HOME}/pycmor.yaml

  3. ${XDG_CONFIG_HOME}/pycmor/pycmor.yaml

  4. ~/.pycmor.yaml

Note that the ${XDG_CONFIG_HOME} environment variable defaults to ~/.config if it is not set.
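The search order can be sketched with the standard library. candidate_config_files is a hypothetical helper that only builds the list of locations described above; how pycmor merges the files it finds is not shown.

```python
import os
from pathlib import Path


def candidate_config_files(environ):
    """Return candidate user config files, highest priority first."""
    # XDG_CONFIG_HOME falls back to ~/.config when it is not set.
    xdg_home = environ.get("XDG_CONFIG_HOME") or "~/.config"
    candidates = []
    if environ.get("PYCMOR_CONFIG_FILE"):
        candidates.append(environ["PYCMOR_CONFIG_FILE"])
    candidates += [
        f"{xdg_home}/pycmor.yaml",
        f"{xdg_home}/pycmor/pycmor.yaml",
        "~/.pycmor.yaml",
    ]
    return [Path(os.path.expanduser(c)) for c in candidates]


# Illustrative environment; a real call would pass os.environ instead.
paths = candidate_config_files({"XDG_CONFIG_HOME": "/tmp/xdg"})
# Keep only the locations that actually exist, still in priority order.
existing = [p for p in paths if p.is_file()]
```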

Configuration Options#

You can configure the following:

component pycmor.core.config.PycmorConfig#

Configuration summary:

Setting                                    Parser                                            Required?
-----------------------------------------  ------------------------------------------------  ---------
PYCMOR_DASK_CLUSTER                        <ChoiceOf(str, ['local', 'slurm'])>               No
PYCMOR_DASK_CLUSTER_SCALING_FIXED_JOBS     int                                               No
PYCMOR_DASK_CLUSTER_SCALING_MAXIMUM_JOBS   int                                               No
PYCMOR_DASK_CLUSTER_SCALING_MINIMUM_JOBS   int                                               No
PYCMOR_DASK_CLUSTER_SCALING_MODE           <ChoiceOf(str, ['adapt', 'fixed'])>               No
PYCMOR_DIMENSIONLESS_MAPPING_TABLE         str                                               No
PYCMOR_ENABLE_DASK                         pycmor.core.config._parse_bool                    No
PYCMOR_ENABLE_FLOX                         pycmor.core.config._parse_bool                    No
PYCMOR_ENABLE_OUTPUT_SUBDIRS               pycmor.core.config._parse_bool                    No
PYCMOR_FILE_TIMESPAN                       str                                               No
PYCMOR_PARALLEL                            pycmor.core.config._parse_bool                    No
PYCMOR_PARALLEL_BACKEND                    str                                               No
PYCMOR_PIPELINE_WORKFLOW_ORCHESTRATOR      <ChoiceOf(str, ['native', 'prefect'])>            No
PYCMOR_PREFECT_TASK_RUNNER                 <ChoiceOf(str, ['thread_pool', 'dask'])>          No
PYCMOR_QUIET                               pycmor.core.config._parse_bool                    No
PYCMOR_RAISE_ON_NO_RULE                    pycmor.core.config._parse_bool                    No
PYCMOR_WARN_ON_NO_RULE                     pycmor.core.config._parse_bool                    No
PYCMOR_XARRAY_DEFAULT_MISSING_VALUE        float                                             No
PYCMOR_XARRAY_ENGINE                       <ChoiceOf(str, ['netcdf4', 'h5netcdf', 'zarr'])>  No
PYCMOR_XARRAY_SKIP_UNIT_ATTR_FROM_DRV      pycmor.core.config._parse_bool                    No
PYCMOR_XARRAY_TIME_DTYPE                   <ChoiceOf(str, ['float64', 'datetime64[ns]'])>    No
PYCMOR_XARRAY_TIME_ENABLE_SET_AXIS         pycmor.core.config._parse_bool                    No
PYCMOR_XARRAY_TIME_REMOVE_FILL_VALUE_ATTR  pycmor.core.config._parse_bool                    No
PYCMOR_XARRAY_TIME_SET_LONG_NAME           pycmor.core.config._parse_bool                    No
PYCMOR_XARRAY_TIME_SET_STANDARD_NAME       pycmor.core.config._parse_bool                    No
PYCMOR_XARRAY_TIME_TAXIS_STR               str                                               No
PYCMOR_XARRAY_TIME_UNLIMITED               pycmor.core.config._parse_bool                    No

Configuration options:

PYCMOR_DASK_CLUSTER#
Parser:

<ChoiceOf(str, ['local', 'slurm'])>

Default:

“local”

Required:

No

Dask cluster to use. See: https://docs.dask.org/en/stable/deploying.html

PYCMOR_DASK_CLUSTER_SCALING_FIXED_JOBS#
Parser:

int

Default:

“5”

Required:

No

Number of jobs to create for Jobqueue-backed Dask Cluster

PYCMOR_DASK_CLUSTER_SCALING_MAXIMUM_JOBS#
Parser:

int

Default:

“10”

Required:

No

Maximum number of jobs to create for Jobqueue-backed Dask Clusters (adaptive)

PYCMOR_DASK_CLUSTER_SCALING_MINIMUM_JOBS#
Parser:

int

Default:

“1”

Required:

No

Minimum number of jobs to create for Jobqueue-backed Dask Clusters (adaptive)

PYCMOR_DASK_CLUSTER_SCALING_MODE#
Parser:

<ChoiceOf(str, ['adapt', 'fixed'])>

Default:

“adapt”

Required:

No

Flexible dask cluster scaling

PYCMOR_DIMENSIONLESS_MAPPING_TABLE#
Parser:

str

Default:

“/home/docs/checkouts/readthedocs.org/user_builds/pymor/checkouts/stable/src/pycmor/data/dimensionless_mappings.yaml”

Required:

No

Where the dimensionless unit mapping table is defined.

PYCMOR_ENABLE_DASK#
Parser:

pycmor.core.config._parse_bool

Default:

“yes”

Required:

No

Whether to enable Dask-based processing

PYCMOR_ENABLE_FLOX#
Parser:

pycmor.core.config._parse_bool

Default:

“yes”

Required:

No

Whether to enable flox for group-by operation. See: https://flox.readthedocs.io/en/latest/

PYCMOR_ENABLE_OUTPUT_SUBDIRS#
Parser:

pycmor.core.config._parse_bool

Default:

“no”

Required:

No

Whether to create subdirectories under output_dir when saving data-sets.

PYCMOR_FILE_TIMESPAN#
Parser:

str

Default:

“1YS”

Required:

No

Default timespan for grouping output files together.

Use the special flag 'file_native' to use the same grouping as in the input files. Otherwise, use a pandas-flavoured string, see: https://tinyurl.com/38wxf8px

PYCMOR_PARALLEL#
Parser:

pycmor.core.config._parse_bool

Default:

“yes”

Required:

No

Whether to run in parallel.

PYCMOR_PARALLEL_BACKEND#
Parser:

str

Default:

“dask”

Required:

No

Which parallel backend to use.

PYCMOR_PIPELINE_WORKFLOW_ORCHESTRATOR#
Parser:

<ChoiceOf(str, ['native', 'prefect'])>

Default:

“prefect”

Required:

No

Which workflow orchestrator to use for running pipelines

PYCMOR_PREFECT_TASK_RUNNER#
Parser:

<ChoiceOf(str, ['thread_pool', 'dask'])>

Default:

“thread_pool”

Required:

No

Which runner to use for Prefect flows.

PYCMOR_QUIET#
Parser:

pycmor.core.config._parse_bool

Default:

“False”

Required:

No

Whether to suppress output.

PYCMOR_RAISE_ON_NO_RULE#
Parser:

pycmor.core.config._parse_bool

Default:

“no”

Required:

No

Whether or not to raise an error if no rule is found for every single DataRequestVariable

PYCMOR_WARN_ON_NO_RULE#
Parser:

pycmor.core.config._parse_bool

Default:

“yes”

Required:

No

Whether or not to issue a warning if no rule is found for every single DataRequestVariable

PYCMOR_XARRAY_DEFAULT_MISSING_VALUE#
Parser:

float

Default:

“1e+30”

Required:

No

Which missing value to use for xarray. Default is 1e30.

PYCMOR_XARRAY_ENGINE#
Parser:

<ChoiceOf(str, ['netcdf4', 'h5netcdf', 'zarr'])>

Default:

“netcdf4”

Required:

No

Which engine to use for xarray.

PYCMOR_XARRAY_SKIP_UNIT_ATTR_FROM_DRV#
Parser:

pycmor.core.config._parse_bool

Default:

“yes”

Required:

No

Whether to skip setting the unit attribute from the DataRequestVariable; this can be handled via Pint instead.

PYCMOR_XARRAY_TIME_DTYPE#
Parser:

<ChoiceOf(str, ['float64', 'datetime64[ns]'])>

Default:

“float64”

Required:

No

The dtype to use for time axis in xarray.

PYCMOR_XARRAY_TIME_ENABLE_SET_AXIS#
Parser:

pycmor.core.config._parse_bool

Default:

“yes”

Required:

No

Whether to enable setting the axis for the time axis in xarray.

PYCMOR_XARRAY_TIME_REMOVE_FILL_VALUE_ATTR#
Parser:

pycmor.core.config._parse_bool

Default:

“yes”

Required:

No

Whether to remove the fill_value attribute from the time axis in xarray.

PYCMOR_XARRAY_TIME_SET_LONG_NAME#
Parser:

pycmor.core.config._parse_bool

Default:

“yes”

Required:

No

Whether to set the long name for the time axis in xarray.

PYCMOR_XARRAY_TIME_SET_STANDARD_NAME#
Parser:

pycmor.core.config._parse_bool

Default:

“yes”

Required:

No

Whether to set the standard name for the time axis in xarray.

PYCMOR_XARRAY_TIME_TAXIS_STR#
Parser:

str

Default:

“T”

Required:

No

Which axis to set for the time axis in xarray.

PYCMOR_XARRAY_TIME_UNLIMITED#
Parser:

pycmor.core.config._parse_bool

Default:

“yes”

Required:

No

Whether the time axis is unlimited in xarray.

Usage#

Here are some examples of how to use the configuration manager:

>>> pycmor_cfg = {}
>>> config = PycmorConfigManager.from_pycmor_cfg(pycmor_cfg)

>>> engine = config("xarray_engine")
>>> print(f"Using xarray backend: {engine}")
Using xarray backend: netcdf4

>>> parallel = config("parallel")
>>> print(f"Running in parallel: {parallel}")
Running in parallel: True

You can define a user file at ${XDG_CONFIG_HOME}/pycmor/pycmor.yaml:

>>> import pathlib
>>> import yaml
>>> cfg_file = pathlib.Path("~/.config/pycmor/pycmor.yaml").expanduser()
>>> cfg_file.parent.mkdir(parents=True, exist_ok=True)
>>> cfg_to_dump = {"xarray_engine": "zarr"}
>>> with open(cfg_file, "w") as f:
...     yaml.dump(cfg_to_dump, f)
>>> config = PycmorConfigManager.from_pycmor_cfg()
>>> engine = config("xarray_engine")
>>> print(f"Using xarray backend: {engine}")
Using xarray backend: zarr

class pycmor.core.config.PycmorConfig[source]#

Bases: object

class Config[source]#

Bases: object

dask_cluster = <everett.manager.Option object>#
dask_cluster_scaling_fixed_jobs = <everett.manager.Option object>#
dask_cluster_scaling_maximum_jobs = <everett.manager.Option object>#
dask_cluster_scaling_minimum_jobs = <everett.manager.Option object>#
dask_cluster_scaling_mode = <everett.manager.Option object>#
dimensionless_mapping_table = <everett.manager.Option object>#
enable_dask = <everett.manager.Option object>#
enable_flox = <everett.manager.Option object>#
enable_output_subdirs = <everett.manager.Option object>#
file_timespan = <everett.manager.Option object>#
parallel = <everett.manager.Option object>#
parallel_backend = <everett.manager.Option object>#
pipeline_workflow_orchestrator = <everett.manager.Option object>#
prefect_task_runner = <everett.manager.Option object>#
quiet = <everett.manager.Option object>#
raise_on_no_rule = <everett.manager.Option object>#
warn_on_no_rule = <everett.manager.Option object>#
xarray_default_missing_value = <everett.manager.Option object>#
xarray_engine = <everett.manager.Option object>#
xarray_skip_unit_attr_from_drv = <everett.manager.Option object>#
xarray_time_dtype = <everett.manager.Option object>#
xarray_time_enable_set_axis = <everett.manager.Option object>#
xarray_time_remove_fill_value_attr = <everett.manager.Option object>#
xarray_time_set_long_name = <everett.manager.Option object>#
xarray_time_set_standard_name = <everett.manager.Option object>#
xarray_time_taxis_str = <everett.manager.Option object>#
xarray_time_unlimited = <everett.manager.Option object>#
class pycmor.core.config.PycmorConfigManager(environments: list[Any], doc: str = '', msg_builder: Callable = <function build_msg>, with_override: bool = True)[source]#

Bases: ConfigManager

Custom ConfigManager for Pycmor, with a predefined hierarchy and support for injecting run-specific configuration.

_CONFIG_FILES = ['/home/docs/.config/pycmor.yaml', '/home/docs/.config/pycmor/pycmor.yaml', '/home/docs/.pycmor.yaml', '/home/docs/.config/pymor.yaml', '/home/docs/.config/pymor/pymor.yaml', '/home/docs/.pymor.yaml']#

The list of configuration files to check for user configuration.

Type:

List[str]

_XDG_CONFIG_HOME = '~/.config'#

The XDG configuration directory.

Type:

str

clone()[source]#
classmethod from_pycmor_cfg(run_specific_cfg=None)[source]#

Create a PycmorConfigManager with the appropriate hierarchy.

Parameters:

run_specific_cfg (dict) – Optional. Overrides specific values for this run.

classmethod from_pymor_cfg(run_specific_cfg=None)#
get(key, default=None, parser=None)[source]#

Get a configuration value by key, with a default value.

Parameters:
  • key (str) – The configuration key to get.

  • default (Any) – The default value to return if the key is not found.

  • parser (Callable) – Optional. A callable to parse the configuration value.

Returns:

The configuration value.

Return type:

Any

pycmor.core.config.PymorConfig#

alias of PycmorConfig

pycmor.core.config.PymorConfigManager#

alias of PycmorConfigManager

pycmor.core.config._parse_bool(value)[source]#
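The boolean options above use string defaults such as "yes", "no", and "False", so the parser has to map such strings to Python booleans. A minimal sketch of that idea follows; parse_bool_sketch is a hypothetical name and the exact set of accepted spellings is an assumption, not the documented behaviour of _parse_bool.

```python
# Assumption: these spellings are accepted; the real parser may differ.
_TRUE = {"yes", "y", "true", "t", "1", "on"}
_FALSE = {"no", "n", "false", "f", "0", "off"}


def parse_bool_sketch(value):
    """Convert a string such as "yes" or "False" to a bool."""
    if isinstance(value, bool):
        return value
    text = str(value).strip().lower()
    if text in _TRUE:
        return True
    if text in _FALSE:
        return False
    raise ValueError(f"Cannot interpret {value!r} as a boolean")


enable_dask = parse_bool_sketch("yes")
quiet = parse_bool_sketch("False")
```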

pycmor.core.controlled_vocabularies module#

Controlled vocabularies for CMIP6

class pycmor.core.controlled_vocabularies.CMIP6ControlledVocabularies(json_files)[source]#

Bases: ControlledVocabularies

Controlled vocabularies for CMIP6

_registry = {}#
static dict_from_json_file(path)[source]#

Load a json file into a dictionary object

Parameters:

path (str) – Path to the json file to load

Raises:

ValueError – If the file cannot be loaded

classmethod from_directory(directory)[source]#

Create a new ControlledVocabularies object from a directory of json files

Parameters:

directory (str) – Path to the directory containing the json files

classmethod load(table_dir=None)[source]#

Load the controlled vocabularies from the CMIP6_CVs directory

classmethod load_from_git(tag: str = '6.2.58.64')[source]#

Load the controlled vocabularies from the git repository

Parameters:

tag (str) – The git tag to use. Default is 6.2.58.64. If tag is None, the main branch is used.

Returns:

A new ControlledVocabularies object, behaves like a dictionary.

Return type:

ControlledVocabularies

print_experiment_ids()[source]#

Print experiment ids with start and end years and parent experiment ids

class pycmor.core.controlled_vocabularies.CMIP7ControlledVocabularies[source]#

Bases: ControlledVocabularies

_registry = {}#
class pycmor.core.controlled_vocabularies.ControlledVocabularies[source]#

Bases: dict

_registry = {'CMIP6': <class 'pycmor.core.controlled_vocabularies.CMIP6ControlledVocabularies'>, 'CMIP7': <class 'pycmor.core.controlled_vocabularies.CMIP7ControlledVocabularies'>}#
classmethod from_directory(directory: str) ControlledVocabularies[source]#

Create ControlledVocabularies from a directory of CV files

classmethod load(table_dir: str) ControlledVocabularies[source]#

Load the ControlledVocabularies using the default method

classmethod load_from_git(tag: str) ControlledVocabularies[source]#

Load the ControlledVocabularies from the git repository

pycmor.core.externals module#

pycmor.core.factory module#

class pycmor.core.factory.MetaFactory(name, bases, class_dict)[source]#

Bases: type

_registry = {}#
pycmor.core.factory.create_factory(klass)[source]#

Factory factory
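The _registry attributes shown for MetaFactory and for the ControlledVocabularies classes suggest a registry pattern in which subclasses are recorded on their base class. One way such a registry could be built with a metaclass is sketched below. All names (RegistryMeta, Vocabulary, the key attribute, create) are hypothetical, and the real MetaFactory may derive its keys and construct objects differently.

```python
class RegistryMeta(type):
    """Metaclass that records each subclass in its base class's registry."""

    def __init__(cls, name, bases, class_dict):
        super().__init__(name, bases, class_dict)
        cls._registry = {}  # every class gets its own, initially empty registry
        for base in bases:
            if isinstance(base, RegistryMeta):
                # Assumption: the key is a class-level `key` attribute,
                # falling back to the class name.
                base._registry[class_dict.get("key", name)] = cls


class Vocabulary(dict, metaclass=RegistryMeta):
    @classmethod
    def create(cls, key, *args, **kwargs):
        # Look up the registered subclass and instantiate it.
        return cls._registry[key](*args, **kwargs)


class CMIP6Vocabulary(Vocabulary):
    key = "CMIP6"


class CMIP7Vocabulary(Vocabulary):
    key = "CMIP7"


vocab = Vocabulary.create("CMIP7")
```

With this layout the base class ends up with both subclasses in its registry, while each subclass keeps an empty registry of its own.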

pycmor.core.filecache module#

This module contains functions for creating, loading and manipulating a file cache.

The file cache is a CSV file that contains a pandas DataFrame with the following columns:

  • variable: The name of the variable in the file.

  • freq: The frequency of the variable in the file.

  • start: The start time of the variable in the file.

  • end: The end time of the variable in the file.

  • timespan: The timespan of the variable in the file.

  • steps: The number of time steps in the variable in the file.

  • units: The units of the variable in the file.

  • filename: The filename of the file.

  • filesize: The file size of the file in bytes.

  • mtime: The last modified time of the file in seconds since the epoch.

  • checksum: The imohash checksum of the file.

  • filepath: The absolute path to the file.
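The file-system columns of such a record (filename, filesize, mtime, filepath) can be gathered with the standard library alone, as sketched below. basic_record and add_record are hypothetical helpers; the remaining columns need the dataset itself and the imohash checksum, which are not reproduced here. The duplicate check follows the rule stated for add_file: a file is added only if no file with the same name is already cached.

```python
import os
import tempfile


def basic_record(path):
    """Collect the file-system columns of a cache record (stdlib only)."""
    stat = os.stat(path)
    return {
        "filename": os.path.basename(path),
        "filesize": stat.st_size,  # bytes
        "mtime": stat.st_mtime,  # seconds since the epoch
        "filepath": os.path.abspath(path),
    }


def add_record(records, path):
    """Append a record unless a file with the same name is already cached."""
    name = os.path.basename(path)
    if any(r["filename"] == name for r in records):
        return False
    records.append(basic_record(path))
    return True


with tempfile.TemporaryDirectory() as tmp:
    # A dummy 584-byte file stands in for a real NetCDF file.
    sample = os.path.join(tmp, "volo.nc")
    with open(sample, "wb") as f:
        f.write(b"\0" * 584)
    records = []
    first_add = add_record(records, sample)
    second_add = add_record(records, sample)
```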

The file cache can be used to quickly select files from the cache that have a specific variable, frequency, start date, end date, timespan, number of time steps, units, filename, file size, last modified time, checksum, or absolute path.

The file cache is stored in the following location by default:

$HOME/.config/pymor_filecache.csv

The file cache can be loaded and saved using the following functions:

>>> from pycmor.core.filecache import Filecache
>>> cache = Filecache.load()
>>> cache.save()

Collect metadata about one or more files by adding them to the cache with cache.add_file or cache.add_files:

>>> filepath = "tests/data/test_experiments/my_expid/outdata/fesom/volo.nc"
>>> cache.add_file(filepath)
>>> # adding multiple files at once
>>> cache.add_files(["tests/data/dummy_data/random1.nc", "tests/data/dummy_data/random2.nc"])

You can access the metadata of a file in the cache using the get method:

>>> filepath = "tests/data/test_experiments/my_expid/outdata/fesom/volo.nc"
>>> # an alternative way to add a file to the cache and get its metadata is the `get` method
>>> cache.get(filepath)
filepath    tests/data/test_experiments/my_expid/outdata/f...
filename                                              volo.nc
checksum             imohash:c8047bbd7e292dbe54a6387611f500c4
filesize                                                  584
mtime                                                     ...
start                                     1951-01-02 00:00:00
end                                       1951-01-13 00:00:00
timespan                                     11 days, 0:00:00
freq                                                        D
steps                                                      12
variable                                                 volo
units                                                      m3
Name: 0, dtype: object

For an overview of the cached data, use the summary method. It returns a pandas DataFrame summarizing each variable in the cache. The fields include the variable name, frequency, start date, end date, timespan, and number of files in the collection for this variable.

>>> cache.summary()
variable                  seq                 volo
freq                        D                    D
start     0001-01-01 00:00:00  1951-01-02 00:00:00
end       0001-01-11 00:00:00  1951-01-13 00:00:00
timespan     10 days 00:00:00     11 days 00:00:00
nfiles                      2                    1
steps                      11                   12
size                     2120                  584

To use a subset of the collection for a given variable, use the select_range method. This limits the files in the cache to those that are within the given range.

class pycmor.core.filecache.Filecache(cache: DataFrame | None = None)[source]#

Bases: object

_add_file(filename: str) None[source]#

Internal method to add a file to the cache.

Only adds a file if no file with the same name already exists in the cache.

_fields = ['variable', 'freq', 'start', 'end', 'timespan', 'steps', 'units', 'filename', 'filesize', 'mtime', 'checksum', 'filepath']#
_infer_freq_from_directory(filename: str, ds: Dataset) str[source]#

Infer frequency by collecting time steps from all files with same variable in same directory. Optimized to avoid redundant file I/O and O(N²) behavior.

Parameters:
  • filename (str) – Path to the current file

  • ds (xr.Dataset) – The opened xarray dataset

Returns:

The inferred frequency, or None if unable to determine

Return type:

str or None

_infer_freq_from_file(filename: str, ds: Dataset, time_series: Series) str[source]#

Infer frequency from a file’s time steps, with fallback to multi-file approach.

Parameters:
  • filename (str) – Path to the file being processed

  • ds (xr.Dataset) – The opened xarray dataset

  • time_series (pd.Series) – The time coordinate as pandas Series

Returns:

The inferred frequency, or None if unable to determine

Return type:

str or None

_make_record(filename: str) Series[source]#

Internal method to create a record from a file.

Parameters:

filename (str) – The path to the file to create a record from.

Returns:

A pandas Series containing the metadata of the file.

Return type:

pd.Series

_update_freq_for_group(dirname: str, variable: str, freq: str) None[source]#

Update frequency for all files with same variable in same directory.

Parameters:
  • dirname (str) – Directory path

  • variable (str) – Variable name

  • freq (str) – Inferred frequency

add_file(filename: str) None[source]#

Add a file to the cache.

Only adds a file if no file with the same name already exists in the cache.

Parameters:

filename (str) – The path to the file to add.

add_files(files: List[str]) None[source]#

Add a list of files to the cache.

Only adds a file if no file with the same name already exists in the cache.

Parameters:

files (list of str) – List of paths to the files to add.

details() DataFrame[source]#
files(*, variable: str | None = None, fullpath: bool = True) List[str][source]#

Return the list of files in the cache.

Parameters:
  • variable (str, optional) – The variable to filter the results by.

  • fullpath (bool) – If True, return the full path to each file. If False, return the filename only.

Returns:

The list of files in the cache.

Return type:

list of str

frequency(*, filename: str | None = None, variable: str | None = None) str[source]#

Return the frequency of a variable or a file.

Parameters:
  • filename (str, optional) – The path to the file to get the frequency from.

  • variable (str, optional) – The variable to get the frequency from.

Returns:

The frequency of the variable or file.

Return type:

str

get(filename)[source]#

Return the record for the given filename from the cache.

Parameters:

filename (str) – The path to the file to get the record for.

Returns:

The record for the given filename from the cache.

Return type:

pd.DataFrame

Notes

If the filename is not in the cache and the file exists, it is added to the cache and the record is returned.

infer_freq(filename: str)[source]#
classmethod load()[source]#

Load the file cache from the default location.

Returns:

A pandas DataFrame containing the file cache.

Return type:

pd.DataFrame

save() None[source]#

Save the file cache to the default location.

select_range(*, start: str | Timestamp | None = None, end: str | Timestamp | None = None, variable: str | None = None) Filecache[source]#

Select the files in the cache that have a time range within the given start and end dates.

Parameters:
  • start (str or pd.Timestamp, optional) – The start date of the time range. If None, the start date of the first file is used.

  • end (str or pd.Timestamp, optional) – The end date of the time range. If None, the end date of the last file is used.

  • variable (str, optional) – The variable to filter the results by.

Returns:

A new Filecache object containing the selected files.

Return type:

Filecache
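The selection rule can be sketched with the standard library. The record layout (file name, first timestamp, last timestamp) and the helper `select_within` are assumptions made for illustration; the real method operates on the cache's own table and returns a Filecache.

```python
from datetime import datetime

# Hypothetical cache records: (filename, first timestamp, last timestamp)
records = [
    ("tas_2000.nc", datetime(2000, 1, 1), datetime(2000, 12, 31)),
    ("tas_2001.nc", datetime(2001, 1, 1), datetime(2001, 12, 31)),
    ("tas_2002.nc", datetime(2002, 1, 1), datetime(2002, 12, 31)),
]


def select_within(records, start=None, end=None):
    """Keep records whose time span lies inside [start, end]."""
    # A missing bound falls back to the overall extent of the records,
    # mirroring "the start date of the first file" / "end date of the last".
    lo = min(r[1] for r in records) if start is None else start
    hi = max(r[2] for r in records) if end is None else end
    return [r for r in records if r[1] >= lo and r[2] <= hi]


selected = select_within(records, start=datetime(2001, 1, 1))
print([name for name, _, _ in selected])
```

With only `start` given, the upper bound defaults to the end of the last file, so the 2001 and 2002 files are kept.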

show_range(*, variable: str | None = None) DataFrame[source]#

Return the start and end dates of the cached files.

Parameters:

variable (str, optional) – The variable to filter the results by.

Returns:

A pandas DataFrame containing the start and end dates of the cached files.

Return type:

pd.DataFrame

summary(variable=None) DataFrame[source]#

Return a summary of the cached files.

Parameters:

None

Returns:

  • pd.DataFrame – A pandas DataFrame containing the summary of the cached files. The summary includes the following information:

      - freq: the frequency of the files (str)
      - start: the start date of the files (str)
      - end: the end date of the files (str)
      - timespan: the timespan of the files (str)
      - nfiles: the number of files (int)
      - steps: the number of steps in the files (int)
      - size: the total size of the files (int)

  • The summary is grouped by the variable name of the files.

validate_range(*, start: str | Timestamp | None = None, end: str | Timestamp | None = None, variable: str | None = None) bool[source]#

Validate the given time range.

Parameters:
  • start (str or pd.Timestamp, optional) – The start date of the time range. If None, the start date of the first file is used.

  • end (str or pd.Timestamp, optional) – The end date of the time range. If None, the end date of the last file is used.

  • variable (str, optional) – The variable to filter the results by.

Returns:

True if the given time range is valid, False otherwise.

Return type:

bool

Raises:

ValueError – If the given time range is out-of-bounds.

variables() List[str][source]#

Return a list of unique variable names in the cache.

Parameters:

None

Returns:

A list of unique variable names in the cache.

Return type:

list

pycmor.core.filecache._save()[source]#

Perform the save operation on the file cache.

This function is registered to execute at program exit using atexit.register. It triggers the save method of the fc object, which saves the file cache.

pycmor.core.filecache.register_cache(ds)[source]#

Register a dataset in the file cache. Use this as a preprocessing step with xr.open_mfdataset.

Parameters:

ds (xarray.Dataset) – The dataset to register. The source filename is extracted from the dataset’s encoding and added to the cache.

Return type:

xr.Dataset

pycmor.core.frequency module#

This module defines the Frequency class and the TimeMethods Enum.

The Frequency class represents a frequency with a name, an approximate interval, and a time method. The TimeMethods Enum represents various time methods declared in CMIP.

Examples

Creating a Frequency instance:

>>> freq = Frequency("day", 1.0)
>>> print(freq.name)
day
>>> print(freq.approx_interval)
1.0
>>> print(freq.time_method)
TimeMethods.MEAN

Comparing two Frequency instances:

>>> freq1 = Frequency("day", 1.0)
>>> freq2 = Frequency("hr", 1.0/24)
>>> print(freq1 > freq2)
True

Getting a Frequency instance for a given name:

>>> freq = Frequency.for_name("day")
>>> print(freq.name)
day
pycmor.core.frequency.CMIP_FREQUENCIES = {'3hr': 0.125, '6hrLev': 0.25, '6hrPlev': 0.25, '6hrPlevPt': 0.25, 'AERday': 1.0, 'AERhr': 0.041666666666666664, 'AERmon': 30.0, 'AERmonZ': 30.0, 'Amon': 30.0, 'CF3hr': 0.125, 'CFday': 1.0, 'CFmon': 30.0, 'E3hr': 0.125, 'E3hrPt': 0.125, 'E6hrZ': 0.25, 'Eday': 1.0, 'EdayZ': 1.0, 'Emon': 30.0, 'EmonZ': 30.0, 'Eyr': 365.0, 'ImonAnt': 30.0, 'ImonGre': 30.0, 'IyrAnt': 365.0, 'IyrGre': 365.0, 'LImon': 30.0, 'Lmon': 30.0, 'Oclim': 30.0, 'Oday': 1.0, 'Odec': 3650.0, 'Omon': 30.0, 'Oyr': 365.0, 'SIday': 1.0, 'SImon': 30.0, 'day': 1.0}#

A dictionary mapping CMIP6 frequency names to the number of days in that frequency.

Type:

dict

class pycmor.core.frequency.Frequency(name, approx_interval, time_method=TimeMethods.MEAN)[source]#

Bases: object

Representation of a frequency.

name#

The name of the frequency.

Type:

str

approx_interval#

The approximate interval of the frequency.

Type:

float

time_method#

The time method of the frequency.

Type:

TimeMethods

classmethod for_name(n)[source]#

Get a Frequency instance for a given name.

Parameters:

n (str) – The name of the frequency.

Returns:

The Frequency instance for the given name.

Return type:

Frequency

Raises:

ValueError – If no Frequency instance can be determined for the given name.
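To make the ordering and lookup behaviour concrete, here is a small stand-in class. It is a sketch under assumptions: the class name `Freq` is hypothetical, ordering is assumed to follow `approx_interval` (consistent with the comparison example above), and the lookup is assumed to be backed by a table like `CMIP_FREQUENCIES`.

```python
from dataclasses import dataclass
from functools import total_ordering

# A few of the CMIP_FREQUENCIES values (interval length in days)
INTERVALS = {"3hr": 0.125, "day": 1.0, "Amon": 30.0, "Oyr": 365.0}


@total_ordering
@dataclass(eq=False)
class Freq:
    name: str
    approx_interval: float

    def __eq__(self, other):
        return self.approx_interval == other.approx_interval

    def __lt__(self, other):
        return self.approx_interval < other.approx_interval

    @classmethod
    def for_name(cls, n):
        # Unknown names surface as ValueError, as documented for for_name.
        try:
            return cls(n, INTERVALS[n])
        except KeyError:
            raise ValueError(f"Cannot determine frequency for {n!r}") from None


print(Freq.for_name("day") > Freq.for_name("3hr"))
```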

class pycmor.core.frequency.TimeMethods(*values)[source]#

Bases: Enum

Various time methods declared in CMIP

CLIMATOLOGY = 'CLIMATOLOGY'#
INSTANTANEOUS = 'INSTANTANEOUS'#
MEAN = 'MEAN'#
NONE = 'NONE'#

pycmor.core.gather_inputs module#

Functionality for gathering possible inputs from a user directory

class pycmor.core.gather_inputs.InputFileCollection(path, pattern, frequency=None, time_dim_name=None)[source]#

Bases: object

property files#
classmethod from_dict(d)[source]#
pycmor.core.gather_inputs._PATTERN_ENV_VAR_NAME_ADDRS = ['/pycmor/pattern_env_var_name', '/pymor/pattern_env_var_name']#

Addresses in the YAML file for the env var name used for the pattern (new, legacy).

Type:

list[str]

pycmor.core.gather_inputs._PATTERN_ENV_VAR_NAME_DEFAULTS = ['PYCMOR_INPUT_PATTERN', 'PYMOR_INPUT_PATTERN']#

Defaults for env var name (new, legacy).

Type:

list[str]

pycmor.core.gather_inputs._PATTERN_ENV_VAR_VALUE_ADDRS = ['/pycmor/pattern_env_var_value', '/pymor/pattern_env_var_value']#

Addresses in the YAML file for the env var value (new, legacy).

Type:

list[str]

pycmor.core.gather_inputs._PATTERN_ENV_VAR_VALUE_DEFAULT = '.*'#

Default value for the environment variable’s value to be used if not set.

Type:

str

pycmor.core.gather_inputs._files_to_string(files: List[Path], sep=',') str[source]#

Converts a list of pathlib.Path objects to a string.

Parameters:
  • files (list) – A list of pathlib.Path objects.

  • sep (str) – The separator to use between the paths. Defaults to a comma.

Returns:

A string representation of the list of files.

Return type:

str

pycmor.core.gather_inputs._filter_by_year(files: List[Path], fpattern: Pattern, year_start: int, year_end: int) List[Path][source]#

Filters a list of files by the year in their name.

Parameters:
  • files (list of pathlib.Path) – A list of files to filter.

  • fpattern (re.Pattern) – The regular expression pattern to match the files.

  • year_start (int) – The start year to filter by.

  • year_end (int) – The end year to filter by.
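A year filter of this kind can be sketched as below. The assumptions are that the pattern carries a named group `year`, that it is matched against the file name only, and that both bounds are inclusive; none of these details are confirmed by the description above.

```python
import re
from pathlib import PurePosixPath


def filter_by_year(files, fpattern, year_start, year_end):
    """Keep files whose name yields a year within [year_start, year_end]."""
    kept = []
    for f in files:
        m = fpattern.match(f.name)
        # Files that do not match the pattern are dropped silently here.
        if m and year_start <= int(m.group("year")) <= year_end:
            kept.append(f)
    return kept


fpattern = re.compile(r"tas_(?P<year>\d{4})\.nc")
files = [PurePosixPath(f"/data/tas_{y}.nc") for y in (1999, 2000, 2001, 2002)]
kept = filter_by_year(files, fpattern, 2000, 2001)
print([f.name for f in kept])
```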

pycmor.core.gather_inputs._input_files_in_path(path: Path, pattern: Pattern) list[source]#

Get a list of files in a directory that match a pattern.

This function takes a directory path and a regular expression pattern. It then returns a list of all files in the directory that match the pattern.

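Parameters:
  • path (pathlib.Path) – The directory to search.

  • pattern (re.Pattern) – The regular expression pattern to match the files against.

Returns: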

A list of files in the directory that match the pattern.

Return type:

list
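The directory scan can be approximated with `pathlib` and `re`. This sketch assumes a non-recursive listing matched against file names, and it sorts the result for reproducibility; the helper name `input_files_in_path` is illustrative.

```python
import re
import tempfile
from pathlib import Path


def input_files_in_path(path, pattern):
    """Return the regular files directly under ``path`` whose name matches."""
    return sorted(
        p for p in path.iterdir() if p.is_file() and pattern.match(p.name)
    )


# Build a throwaway directory so the example does not depend on real data.
with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    for name in ("tas_2000.nc", "tas_2001.nc", "notes.txt"):
        (root / name).touch()
    pattern = re.compile(r"tas_\d{4}\.nc")
    found = [p.name for p in input_files_in_path(root, pattern)]

print(found)
```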

pycmor.core.gather_inputs._input_pattern_from_env(config: dict) Pattern[source]#

Get the input pattern from the environment variable.

This function retrieves the name of the environment variable from the configuration dictionary using the dpath library. It then gets the value of this environment variable, which is expected to be a regular expression pattern. This pattern is then compiled and returned.

Parameters:

config (dict) – The configuration dictionary. This dictionary should contain the keys pattern_env_var_name and pattern_env_value_default, which are used to locate the environment variable name and default value respectively. If not given, these default to PYCMOR_INPUT_PATTERN and .* respectively. The legacy PYMOR_INPUT_PATTERN is also supported.

Returns:

The compiled regular expression pattern.

Return type:

re.Pattern

Examples

>>> config_bare = { "pycmor": {} }
>>> config_only_env_name = {
...     "pycmor": {
...         'pattern_env_var_name': 'CMOR_PATTERN',
...     }
... }
>>> config_only_env_value = {
...     "pymor": {
...         'pattern_env_var_default': 'test*nc',
...   }
... }
>>> pattern = _input_pattern_from_env(config_bare)
>>> pattern
re.compile('.*')
>>> bool(pattern.match('test'))
True
>>> os.environ["CMOR_PATTERN"] = "test*nc"
>>> pattern = _input_pattern_from_env(config_only_env_name)
>>> pattern
re.compile('test*nc')
>>> bool(pattern.match('test'))
False
>>> del os.environ["CMOR_PATTERN"]
>>> pattern = _input_pattern_from_env(config_only_env_value)
>>> pattern
re.compile('.*')
>>> bool(pattern.match('test'))
True
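The `False` result for `'test*nc'` in the examples above follows from the value being compiled as a regular expression, not a shell glob. The sketch below contrasts the two readings; using `fnmatch.translate` to convert a glob is a suggestion for users, not something this function is documented to do.

```python
import fnmatch
import re

# As a regex, "test*nc" means: "tes", then zero or more "t", then "nc".
as_regex = re.compile("test*nc")
print(bool(as_regex.match("tesnc")))
print(bool(as_regex.match("test_file.nc")))

# A shell-style glob has to be translated to a regex first.
as_glob = re.compile(fnmatch.translate("test*nc"))
print(bool(as_glob.match("test_file.nc")))
```

When the environment variable is meant as a glob, translating it (or writing `test.*nc` directly) gives the intended match.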

Filters out symbolic links from a list of pathlib.Path objects.

Parameters:

files (list) – A list of pathlib.Path objects.

Returns:

A list of pathlib.Path objects excluding any symbolic links.

Return type:

list

Raises:

TypeError – If any element in the input list is not a pathlib.Path object.

Examples

>>> from pathlib import Path
>>> files = [Path('/path/to/file1'), Path('/path/to/file2')]
>>> paths = _resolve_symlinks(files)
>>> [str(p) for p in paths]  # Convert to strings for doctest
['/path/to/file1', '/path/to/file2']
pycmor.core.gather_inputs._sort_by_year(files: List[Path], fpattern: Pattern) List[Path][source]#

Sorts a list of files by the year in their name.

pycmor.core.gather_inputs._validate_rule_has_marked_regex(rule: dict, required_marks: List[str] = ['year']) bool[source]#

Validates that a rule has a marked regular expression.

This function takes a rule dictionary and a list of required marks. It then checks that the rule has a regular expression pattern that has been marked with all of the required marks.

Parameters:
  • rule (dict) – The rule dictionary.

  • required_marks (list) – A list of strings representing the required marks.

Returns:

True if the rule has a marked regular expression, False otherwise.

Return type:

bool

Examples

>>> rule = { 'pattern': 'test(?P<year>[0-9]{4})' }
>>> _validate_rule_has_marked_regex(rule)
True
>>> rule = { 'pattern': 'test' }
>>> _validate_rule_has_marked_regex(rule)
False
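One way to perform such a check is through the `groupindex` mapping of a compiled pattern, which lists its named groups. This is a sketch of the idea with a hypothetical helper `has_marks`; the actual validation logic may differ.

```python
import re


def has_marks(rule, required_marks=("year",)):
    """Return True if rule['pattern'] defines every required named group."""
    pattern = rule.get("pattern")
    if pattern is None:
        return False
    # groupindex maps each (?P<name>...) group to its group number.
    groups = re.compile(pattern).groupindex
    return all(mark in groups for mark in required_marks)


print(has_marks({"pattern": "test(?P<year>[0-9]{4})"}))
print(has_marks({"pattern": "test"}))
```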
pycmor.core.gather_inputs.gather_inputs(config: dict) dict[source]#

Gather possible inputs from a user directory.

This function takes a configuration dictionary, collects the files in the directory that match the pattern specified in the configuration as pathlib.Path objects, and returns the configuration with those input files added.

Parameters:

config (dict) – The configuration dictionary. This dictionary should contain the keys pattern_env_var_name and pattern_env_value_default, which are used to locate the environment variable name and default value respectively. If not given, these default to PYCMOR_INPUT_PATTERN (or the legacy PYMOR_INPUT_PATTERN) and .* respectively.

Returns:

The configuration dictionary with the input files added.

Return type:

dict

Deprecated: Use load_mfdataset in your pipeline instead!

pycmor.core.gather_inputs.load_mfdataset(data, rule_spec)[source]#

Load a dataset from a list of files using xarray.

Parameters:
  • data (Any) – Data in the pipeline flow thus far.

  • rule_spec (Rule) – Rule being handled

pycmor.core.infer_freq module#

class pycmor.core.infer_freq.DatasetFrequencyAccessor(ds)[source]#

Bases: object

check_resolution(target_approx_interval, time_dim=None, **kwargs)[source]#

Check if the time resolution is fine enough for resampling.

Parameters:
  • target_approx_interval (float) – Expected interval in days for the target frequency

  • time_dim (str, optional) – Name of the time dimension. If None, automatically detects the time dimension using get_time_label. Defaults to None.

  • **kwargs – Additional arguments passed to check_resolution.

Returns:

Dictionary containing the inferred interval, comparison status, and validity for resampling.

Return type:

dict

infer_frequency(time_dim=None, **kwargs)[source]#

Infer time frequency from datetime-like array, returning pandas-style frequency strings.

Parameters:
  • time_dim (str, optional) – Name of the time dimension in the Dataset. If None, automatically detects the time dimension using get_time_label. Defaults to None.

  • **kwargs – Additional arguments passed to infer_frequency.

Returns:

Inferred frequency string (e.g., ‘M’) or (freq, delta, step, is_exact, status) if return_metadata=True.

Return type:

str or FrequencyResult

resample_safe(target_approx_interval=None, freq_str=None, time_dim=None, calendar='standard', method='mean', tolerance=0.01, **resample_kwargs)[source]#

Safely resample dataset time series data after checking temporal resolution.

Users can specify the target frequency in two ways:

  1. Provide target_approx_interval (float in days), which will be converted to freq_str.

  2. Provide freq_str (pandas frequency string), which is used directly for resampling.

If both are provided, freq_str takes precedence for resampling, and target_approx_interval is used for validation.

Parameters:
  • target_approx_interval (float, optional) – Expected interval in days for the target frequency. If provided without freq_str, this will be converted to an appropriate frequency string. If provided with freq_str, this is used for validation only.

  • freq_str (str, optional) – Target frequency string (e.g., ‘M’ for monthly, ‘3H’ for 3-hourly). If provided, this takes precedence for resampling operations.

  • time_dim (str, optional) – Name of the time dimension. If None, automatically detects the time dimension using get_time_label. Defaults to None.

  • calendar (str, optional) – Calendar type, by default “standard”

  • method (str or dict, optional) – Resampling method, by default “mean”

  • tolerance (float, optional) – Tolerance for time interval comparison, by default 0.01

  • **resample_kwargs – Additional arguments passed to xarray’s resample

Returns:

Resampled dataset

Return type:

xarray.Dataset

Raises:

ValueError – If neither target_approx_interval nor freq_str is provided, or if the time resolution is too coarse for the target frequency

Examples

# Using approximate interval (will be converted to frequency string)
dataset.timefreq.resample_safe(target_approx_interval=30.0)  # ~monthly

# Using frequency string directly
dataset.timefreq.resample_safe(freq_str='3M')  # 3-monthly

# Using both (freq_str used for resampling, target_approx_interval for validation)
dataset.timefreq.resample_safe(target_approx_interval=90.0, freq_str='3M')

class pycmor.core.infer_freq.FrequencyResult(frequency, delta_days, step, is_exact, status)#

Bases: tuple

_asdict()#

Return a new dict which maps field names to their values.

_field_defaults = {}#
_fields = ('frequency', 'delta_days', 'step', 'is_exact', 'status')#
classmethod _make(iterable)#

Make a new FrequencyResult object from a sequence or iterable

_replace(**kwds)#

Return a new FrequencyResult object replacing specified fields with new values

delta_days#

Alias for field number 1

frequency#

Alias for field number 0

is_exact#

Alias for field number 3

status#

Alias for field number 4

step#

Alias for field number 2
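Because FrequencyResult is a named tuple, it can be read by field name, by position, or by unpacking. The sketch below rebuilds an equivalent type with the field names listed above; the field values, including the status string, are made up for illustration.

```python
from collections import namedtuple

# Same field names as documented above; rebuilt here so the example is
# self-contained.
FrequencyResult = namedtuple(
    "FrequencyResult", ["frequency", "delta_days", "step", "is_exact", "status"]
)

result = FrequencyResult("M", 30.0, 1, False, "valid")  # hypothetical values

print(result.frequency)  # access by field name
print(result[1])  # access by position (delta_days)
freq, delta, step, is_exact, status = result  # plain tuple unpacking
print(result._asdict()["status"])
```

`_replace` returns a new tuple rather than modifying the original, so results can be passed around without defensive copies.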

class pycmor.core.infer_freq.TimeFrequencyAccessor(xarray_obj)[source]#

Bases: object

check_resolution(target_approx_interval, calendar='standard', strict=True, tolerance=0.01, log=True, time_dim=None)[source]#

Check if the time resolution is fine enough for resampling.

Parameters:
  • target_approx_interval (float) – Expected interval in days for the target frequency

  • calendar (str, optional) – Calendar type, by default “standard”

  • strict (bool, optional) – If True, performs additional checks for irregular time series and returns a status message. Defaults to True.

  • tolerance (float, optional) – Tolerance for time interval comparison, by default 0.01

  • log (bool, optional) – If True, logs the results of the frequency check. Defaults to True.

  • time_dim (str, optional) – Name of the time dimension. If None, automatically detects the time dimension using get_time_label. Defaults to None.

Returns:

Dictionary containing the inferred interval, comparison status, and validity for resampling.

Return type:

dict

infer_frequency(strict=False, calendar='standard', log=True, time_dim=None, return_metadata=True)[source]#

Infer time frequency from datetime-like array, returning pandas-style frequency strings.

Parameters:
  • strict (bool, optional) – If True, performs additional checks for irregular time series and returns a status message. Defaults to False.

  • calendar (str, optional) – Calendar type to use for cftime objects. Defaults to “standard”.

  • log (bool, optional) – If True, logs the results of the frequency check. Defaults to True.

  • time_dim (str, optional) – Name of the time dimension in the DataArray. If None, automatically detects the time dimension using get_time_label. Defaults to None.

  • return_metadata (bool, optional) – If True, returns (freq, delta, step, is_exact, status) instead of just the frequency string. Defaults to True.

Returns:

Inferred frequency string (e.g., ‘M’) or (freq, delta, step, is_exact, status) if return_metadata=True.

Return type:

str or FrequencyResult

resample_safe(target_approx_interval=None, freq_str=None, calendar='standard', method='mean', time_dim=None, tolerance=0.01, **resample_kwargs)[source]#

Safely resample time series data after checking temporal resolution.

Users can specify the target frequency in two ways:

  1. Provide target_approx_interval (float in days), which will be converted to freq_str.

  2. Provide freq_str (pandas frequency string), which is used directly for resampling.

If both are provided, freq_str takes precedence for resampling, and target_approx_interval is used for validation.

Parameters:
  • target_approx_interval (float, optional) – Expected interval in days for the target frequency. If provided without freq_str, this will be converted to an appropriate frequency string. If provided with freq_str, this is used for validation only.

  • freq_str (str, optional) – Target frequency string (e.g., ‘M’ for monthly, ‘3H’ for 3-hourly). If provided, this takes precedence for resampling operations.

  • calendar (str, optional) – Calendar type, by default “standard”

  • method (str or dict, optional) – Resampling method, by default “mean”

  • time_dim (str, optional) – Name of the time dimension. If None, automatically detects the time dimension using get_time_label. Defaults to None.

  • tolerance (float, optional) – Tolerance for time interval comparison, by default 0.01

  • **resample_kwargs – Additional arguments passed to xarray’s resample

Returns:

Resampled data

Return type:

xarray.DataArray

Raises:

ValueError – If neither target_approx_interval nor freq_str is provided, or if the time resolution is too coarse for the target frequency

Examples

# Using approximate interval (will be converted to frequency string)
data.timefreq.resample_safe(target_approx_interval=30.0)  # ~monthly

# Using frequency string directly
data.timefreq.resample_safe(freq_str='3M')  # 3-monthly

# Using both (freq_str used for resampling, target_approx_interval for validation)
data.timefreq.resample_safe(target_approx_interval=90.0, freq_str='3M')

pycmor.core.infer_freq._convert_cftime_to_ordinals(times_values)[source]#

Convert cftime objects to ordinal values.

pycmor.core.infer_freq._convert_numeric_timestamps_to_ordinals(times_values)[source]#

Convert numeric timestamps (e.g., numpy.datetime64) to ordinal values.

pycmor.core.infer_freq._convert_standard_datetime_to_ordinals(times_values)[source]#

Convert standard datetime objects to ordinal values.

pycmor.core.infer_freq._convert_times_to_ordinals(times_values)[source]#

Convert various datetime types to ordinal values for frequency analysis.

This function handles three main datetime types:

  1. cftime objects (with calendar attribute)

  2. Standard datetime objects (with toordinal method)

  3. Numeric timestamps (numpy.datetime64, etc.)

Parameters:

times_values (array-like) – Array of datetime-like objects

Returns:

Array of ordinal values representing the datetime objects

Return type:

np.ndarray

pycmor.core.infer_freq._infer_frequency_core(times, tol=0.05, return_metadata=False, strict=False, calendar='standard', log=False)[source]#

Infer time frequency from datetime-like array, returning pandas-style frequency strings.

Parameters:
  • times (array-like) – List of datetime-like objects (cftime or datetime64).

  • tol (float, optional) – Tolerance for delta comparisons (in days). Defaults to 0.05.

  • return_metadata (bool, optional) – If True, returns (frequency, median_delta, step, is_exact, status) instead of just the frequency string. Defaults to False.

  • strict (bool, optional) – If True, performs additional checks for irregular time series and returns a status message. Defaults to False.

  • calendar (str, optional) – Calendar type to use for cftime objects. Defaults to “standard”.

  • log (bool, optional) – If True, logs the results of the frequency check. Defaults to False.

Returns:

Inferred frequency string (e.g., ‘M’) or (freq, delta, step, is_exact, status) if return_metadata=True.

Return type:

str or FrequencyResult

pycmor.core.infer_freq.approx_interval_to_frequency_str(approx_interval, tolerance=0.1)[source]#

Convert an approximate interval in days to a pandas-style frequency string.

This function uses algorithmic logic to determine the most appropriate frequency string based on common time patterns, rather than hardcoded mappings. It handles sub-daily, daily, weekly, monthly, and yearly frequencies intelligently.

Parameters:
  • approx_interval (float) – Approximate interval in days

  • tolerance (float, optional) – Relative tolerance for matching standard frequencies, by default 0.1 (10%)

Returns:

Pandas-style frequency string (e.g., ‘D’, ‘M’, ‘3M’, ‘Y’) or None for time-invariant data (0.0 days)

Return type:

str or None

Examples

>>> approx_interval_to_frequency_str(1.0)  # Daily
'D'
>>> approx_interval_to_frequency_str(30.0)  # Monthly
'M'
>>> approx_interval_to_frequency_str(91.3)  # 3-Monthly (approx)
'3M'
>>> approx_interval_to_frequency_str(365.0)  # Yearly
'Y'
>>> approx_interval_to_frequency_str(0.041667)  # Hourly
'H'
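To give a feel for what "algorithmic" matching with a relative tolerance can look like, here is a deliberately simplified sketch. It is not the library's algorithm: the helper `interval_to_freq`, the base lengths (365, 30, 1 and 1/24 days) and the longest-first search order are all assumptions, and weekly frequencies are not handled specially.

```python
def interval_to_freq(approx_interval, tolerance=0.1):
    """Map an interval in days to a pandas-style code (simplified sketch)."""
    if approx_interval == 0:
        return None  # time-invariant data
    # (base length in days, code), tried from longest to shortest
    for base, code in ((365.0, "Y"), (30.0, "M"), (1.0, "D"), (1 / 24, "H")):
        n = round(approx_interval / base)
        # Accept n multiples of the base if within the relative tolerance.
        if n >= 1 and abs(approx_interval - n * base) <= tolerance * n * base:
            return code if n == 1 else f"{n}{code}"
    return None


for days in (1.0, 30.0, 91.3, 365.0, 0.041667):
    print(days, interval_to_freq(days))
```

For the five intervals used in the examples above, this sketch yields the same strings as the documented outputs, which is the only behaviour it tries to reproduce.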
pycmor.core.infer_freq.infer_frequency(times, return_metadata=False, strict=False, calendar='standard', log=False)[source]#

Infer time frequency from datetime-like array, returning pandas-style frequency strings.

Parameters:
  • times (array-like) – List of datetime-like objects (cftime or datetime64).

  • return_metadata (bool, optional) – If True, returns (frequency, median_delta, step, is_exact, status) instead of just the frequency string. Defaults to False.

  • strict (bool, optional) – If True, performs additional checks for irregular time series and returns a status message. Defaults to False.

  • calendar (str, optional) – Calendar type to use for cftime objects. Defaults to “standard”.

  • log (bool, optional) – If True, logs the results of the frequency check. Defaults to False.

Returns:

Inferred frequency string (e.g., ‘M’) or (freq, delta, step, is_exact, status) if return_metadata=True.

Return type:

str or FrequencyResult

pycmor.core.infer_freq.is_resolution_fine_enough(times, target_approx_interval, calendar='standard', strict=True, tolerance=0.01, log=True)[source]#

Determines if the temporal resolution of a time series is sufficient for resampling.

Parameters:
  • times (list or array-like) – Array of datetime-like objects representing the time series.

  • target_approx_interval (float) – Expected interval in days for the target frequency.

  • calendar (str, optional) – Calendar type to use for cftime objects, by default “standard”.

  • strict (bool, optional) – If True, performs additional checks for irregular time series and includes status messages. Defaults to True.

  • tolerance (float, optional) – Tolerance for comparing time intervals. Defaults to 0.01.

  • log (bool, optional) – If True, logs the results of the frequency check. Defaults to True.

Returns:

Contains the inferred interval, comparison status, validity for resampling, and status message.

Return type:

dict

Notes

The function infers the frequency using infer_frequency and compares it against the target interval, considering the specified tolerance. The result includes a status indicating whether the time series is suitable for resampling.
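The comparison described in the notes can be sketched with the standard library. Here the inferred interval is taken to be the median spacing between consecutive timestamps, and "fine enough" is taken to mean that this spacing does not exceed the target interval plus the tolerance. Both choices, the helper name, and the dictionary keys are assumptions rather than the library's definitions.

```python
from datetime import datetime, timedelta
from statistics import median


def resolution_fine_enough(times, target_approx_interval, tolerance=0.01):
    """Compare the median spacing of ``times`` (in days) with a target."""
    deltas = [
        (later - earlier).total_seconds() / 86400.0
        for earlier, later in zip(times, times[1:])
    ]
    inferred = median(deltas)
    return {
        "inferred_interval": inferred,  # hypothetical key names
        "is_valid_for_resampling": inferred <= target_approx_interval + tolerance,
    }


daily = [datetime(2000, 1, 1) + timedelta(days=i) for i in range(10)]
print(resolution_fine_enough(daily, 30.0))  # daily data, monthly target
print(resolution_fine_enough(daily, 0.125))  # daily data, 3-hourly target
```

Daily data can be aggregated to a monthly target but cannot be resampled to a 3-hourly one, which is the distinction the check is meant to catch.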

pycmor.core.infer_freq.log_frequency_check(name, freq, delta, step, exact, status, strict=False)[source]#

Log the results of the frequency check.

pycmor.core.logging module#

pycmor.core.logging.add_report_logger()[source]#
pycmor.core.logging.add_to_report_log(func)[source]#

Decorator for logging to the report log

pycmor.core.logging.report_filter(record)[source]#

Checks if the record should be added to the report log or not

pycmor.core.logging.showwarning(message, *args, **kwargs)[source]#

Set up warnings to use logger
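Routing warnings through a logger is usually done by replacing `warnings.showwarning`. The sketch below uses the standard `logging` module and an in-memory handler so the effect is visible; the logger name and handler are hypothetical, and pycmor's own logger setup may differ.

```python
import logging
import warnings

logger = logging.getLogger("report_sketch")  # hypothetical logger name
logger.setLevel(logging.WARNING)


class ListHandler(logging.Handler):
    """Collect log messages in memory so the routing can be inspected."""

    def __init__(self):
        super().__init__()
        self.messages = []

    def emit(self, record):
        self.messages.append(record.getMessage())


handler = ListHandler()
logger.addHandler(handler)


def showwarning(message, *args, **kwargs):
    # Forward the warning text to the logger instead of writing to stderr.
    logger.warning(str(message))


with warnings.catch_warnings():
    warnings.simplefilter("always")
    warnings.showwarning = showwarning  # restored when the block exits
    warnings.warn("units attribute missing")

print(handler.messages)
```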

pycmor.core.pipeline module#

Pipeline of the data processing steps.

class pycmor.core.pipeline.DefaultPipeline(name='FrozenPipeline', **kwargs)[source]#

Bases: FrozenPipeline

The DefaultPipeline class is a subclass of the FrozenPipeline class. It is designed to be a general-purpose pipeline for data processing. It includes steps for loading data and handling unit conversion. The specific steps are fixed and cannot be customized; only the name of the pipeline can be customized.

Parameters:

name (str, optional) – The name of the pipeline. If not provided, it defaults to “pycmor.pipeline.DefaultPipeline”.

NAME = 'pycmor.pipeline.DefaultPipeline'#
STEPS = ('pycmor.core.gather_inputs.load_mfdataset', 'pycmor.std_lib.generic.get_variable', 'pycmor.std_lib.timeaverage.timeavg', 'pycmor.std_lib.units.handle_unit_conversion', 'pycmor.std_lib.global_attributes.set_global_attributes', 'pycmor.std_lib.variable_attributes.set_variable_attributes', 'pycmor.core.caching.manual_checkpoint', 'pycmor.std_lib.generic.trigger_compute', 'pycmor.std_lib.generic.show_data', 'pycmor.std_lib.files.save_dataset')#
class pycmor.core.pipeline.FrozenPipeline(name='FrozenPipeline', **kwargs)[source]#

Bases: Pipeline

The FrozenPipeline class is a subclass of the Pipeline class. It is designed to have a fixed set of steps that cannot be modified, hence the term “frozen”. The specific steps are defined as a class-level constant and cannot be customized, only the name of the pipeline can be customized.

Parameters:
  • *args – Variable length argument list. Not used in this class, but included for compatibility with parent.

  • name (str, optional) – The name of the pipeline. If not provided, it defaults to None.

STEPS#

A tuple containing the steps of the pipeline. This is a class-level attribute and cannot be modified.

Type:

tuple

NAME = 'FrozenPipeline'#
STEPS = ()#
property steps#
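The "frozen" idea, a class-level tuple exposed through a read-only property, can be sketched as follows. The class names and step strings are illustrative, and the real FrozenPipeline may enforce immutability differently.

```python
class FrozenSketch:
    """Steps live in a class-level tuple and are exposed read-only."""

    STEPS = ()

    @property
    def steps(self):
        return self.STEPS


class TestingSketch(FrozenSketch):
    # Subclasses fix their steps by overriding the class constant.
    STEPS = ("load", "compute", "save")


p = TestingSketch()
print(p.steps)

try:
    p.steps = ()  # a property without a setter rejects assignment
except AttributeError as err:
    print("cannot modify steps:", err)
```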
class pycmor.core.pipeline.Pipeline(*args, name=None, workflow_backend=None, cache_policy=None, dask_cluster=None, cache_expiration=None)[source]#

Bases: object

_prefectize_steps()[source]#
_run_native(data, rule_spec)[source]#
_run_prefect(data, rule_spec)[source]#
assign_cluster(cluster)[source]#
classmethod from_callable_strings(step_strings: list, name=None, **kwargs)[source]#
classmethod from_dict(data)[source]#
classmethod from_list(steps, name=None, **kwargs)[source]#
classmethod from_qualname_list(qualnames: list, name=None, **kwargs)[source]#
static on_completion(flow, flowrun, state)[source]#
static on_failure(flow, flowrun, state)[source]#
run(data, rule_spec)[source]#
property steps#
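Pipelines are described by dotted names of callables (see the STEPS tuples in this module), and each step receives the data so far together with the rule. A minimal sketch of both ideas follows. The helper names `resolve` and `run_steps` are hypothetical, and it is assumed that every step returns the data for the next step; the Prefect-backed execution path is not modelled.

```python
import importlib


def resolve(qualname):
    """Turn 'package.module.func' into the callable it names."""
    module_name, _, attr = qualname.rpartition(".")
    return getattr(importlib.import_module(module_name), attr)


def run_steps(steps, data, rule_spec):
    """Call each step as step(data, rule_spec), threading the result through."""
    for step in steps:
        data = step(data, rule_spec)
    return data


# Two toy steps with the (data, rule_spec) signature
def add_offset(data, rule_spec):
    return data + rule_spec["offset"]


def double(data, rule_spec):
    return data * 2


result = run_steps([add_offset, double], 1, {"offset": 2})
print(result)
print(resolve("math.sqrt")(9.0))
```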
class pycmor.core.pipeline.TestingPipeline(name='FrozenPipeline', **kwargs)[source]#

Bases: FrozenPipeline

The TestingPipeline class is a subclass of the FrozenPipeline class. It is designed for testing purposes. It includes steps for loading fake data, performing a logic step, and saving data. The specific steps are fixed and cannot be customized; only the name of the pipeline can be customized.

Parameters:

name (str, optional) – The name of the pipeline. If not provided, it defaults to “pycmor.pipeline.TestingPipeline”.

Warning

An internet connection is required to run this pipeline, as the load_data step fetches data from the internet.

NAME = 'pycmor.pipeline.TestingPipeline'#
STEPS = ('pycmor.std_lib.generic.dummy_load_data', 'pycmor.std_lib.generic.dummy_logic_step', 'pycmor.std_lib.generic.dummy_save_data')#

pycmor.core.plugins module#

pycmor.core.rule module#

class pycmor.core.rule.Rule(*, name: str = None, inputs: List[dict] = None, cmor_variable: str, pipelines: List[Pipeline] = None, tables: List[DataRequestTable] = None, data_request_variables: List[DataRequestVariable] = None, **kwargs)[source]#

Bases: object

add_data_request_variable(drv)[source]#

Add a data request variable to the rule.

add_input(inp_dict)[source]#

Add an input collection to the rule.

add_table(tbl)[source]#

Add a table to the rule

clone()[source]#

Creates a copy of this rule object as it is currently configured.

create_global_attributes(GlobalAttributesClass)[source]#
depluralize_drvs()[source]#

Depluralizes Data Request Variables to just a single entry

expand_drvs()[source]#

Depluralize the rule by creating a new rule for each DataRequestVariable.

This method clones the current rule object for each DataRequestVariable (drv) it contains. For each cloned rule, it also clones the corresponding drv and sets its tables, frequencies, cell_methods, and cell_measures attributes to the individual elements from the original drv. The cloned drv is then set as the only drv of the cloned rule. The method returns a list of all these cloned rules.

Returns:

A list of cloned rule objects, each containing a single DataRequestVariable.

Return type:

list

classmethod from_dict(data)[source]#

Build a rule object from a dictionary

The dictionary should have the following keys: “inputs”, “cmor_variable”, “pipelines”. Note that the "inputs" key should contain a list of dictionaries that can be used to build InputFileCollection objects. The "pipelines" key should contain a list of dictionaries that can be used to build Pipeline objects, and the cmor_variable is just a string.

Parameters:

data (dict) – A dictionary containing the rule data.

classmethod from_yaml(yaml_str)[source]#

Wrapper around from_dict for initializing from YAML

get(key, default=None)[source]#

Gets an attribute from the Rule object

Useful for passing the Rule object to other functions that may not know the current structure, e.g. when calling Pipeline steps.

Parameters:
  • key (str) – The name of the attribute to get.

  • default (Any, optional) – The value to return if the attribute does not exist.

Returns:

value – The value of the attribute, or the default value if the attribute does not exist.

Return type:

Any

global_attributes_set_on_rule()[source]#
property input_patterns#

Return a list of compiled regex patterns for the input files.

match_pipelines(pipelines, force=False)[source]#

Match the pipelines in the rule with the pipelines in the configuration. The pipelines should be a list of pipeline instances that can be matched with the rule’s required pipelines.

Parameters:
  • pipelines (list of pipeline.Pipeline) – Available pipelines to use

  • force (bool, optional) – If True, the pipelines will be remapped even if they were already mapped.

Mutates:

self.pipelines (list of str –> list of pipeline.Pipeline objects) – self.pipelines will be converted from a list of strings to a list of Pipeline objects. The order of the pipelines will be preserved.

remove_data_request_variable(drv)[source]#

Remove a data request variable from the rule.

remove_table(tbl)[source]#

Remove a table from the rule

set(key, value, force=False, warn=True)[source]#

Set a new attribute for the object.

Parameters:
  • key (str) – The name of the attribute to set.

  • value (Any) – The value to set for the attribute.

  • force (bool, optional) – If True, the attribute will be overwritten if it already exists. If False (default), an existing attribute is not overwritten; whether a warning is issued or an AttributeError is raised is controlled by warn.

  • warn (bool, optional) – If True (default) a warning will be issued if the attribute already exists, and it will not be overwritten. If False, an AttributeError will be raised if the attribute already exists.

Returns:

value – The value that was set on the object. The attribute is assigned in the same way as with setattr.

Return type:

Any

Raises:

AttributeError – If the attribute already exists and force and warn are both False.
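
The documented get and set behaviour can be sketched with a small stand-in class. AttrBag is hypothetical and is not the Rule class; what is returned when an existing attribute is kept (here: the existing value) is an assumption.

```python
import warnings


class AttrBag:
    """Hypothetical stand-in for Rule; only the documented get/set contract."""

    def get(self, key, default=None):
        # Same contract as dict.get, but backed by attributes.
        return getattr(self, key, default)

    def set(self, key, value, force=False, warn=True):
        if hasattr(self, key) and not force:
            if warn:
                # The existing attribute is kept and the caller is warned.
                warnings.warn(f"Attribute {key!r} already exists, not overwriting")
                return getattr(self, key)  # assumption: return the kept value
            raise AttributeError(f"Attribute {key!r} already exists")
        setattr(self, key, value)
        return value


bag = AttrBag()
first = bag.set("units", "K")
with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    bag.set("units", "degC")           # warns, keeps "K"
kept = bag.get("units")
bag.set("units", "degC", force=True)   # overwrites
replaced = bag.get("units")
print(first, kept, replaced, len(caught))  # K K degC 1
```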

pycmor.core.ssh_tunnel module#

pycmor.core.time_utils module#

Time-related utility functions for working with xarray datasets and coordinates.

This module provides utilities for:
  • Detecting datetime types in arrays

  • Finding time coordinates in xarray objects

  • Checking for time axes in datasets

pycmor.core.time_utils.get_time_label(ds)[source]#

Determines the name of the coordinate in the dataset that can serve as a time label.

Parameters:

ds (xarray.Dataset or xarray.DataArray) – The dataset containing coordinates to check for a time label.

Returns:

The name of the coordinate that is a datetime type and can serve as a time label, or None if no such coordinate is found.

Return type:

str or None

Example

>>> import xarray as xr
>>> import pandas as pd
>>> import numpy as np
>>> from pycmor.core.time_utils import get_time_label
>>> ds = xr.Dataset(
...     {'temperature': (['time'], [20, 21, 22])},
...     coords={'time': pd.date_range('2000-01-01', periods=3)}
... )
>>> get_time_label(ds)
'time'
>>> da = xr.DataArray(np.ones(3), coords={'T': ('T', pd.date_range('2000-01-01', periods=3))})
>>> get_time_label(da)
'T'
>>> # The following does not have a valid time coordinate, expected to return None
>>> ds_no_time = xr.Dataset({'temperature': (['x'], [20, 21, 22])}, coords={'x': [1, 2, 3]})
>>> get_time_label(ds_no_time) is None
True

pycmor.core.time_utils.has_time_axis(ds) → bool[source]#

Checks if the given dataset has a time axis.

Parameters:

ds (xarray.Dataset or xarray.DataArray) – The dataset to check for a time axis.

Returns:

True if the dataset has a time axis, False otherwise.

Return type:

bool

pycmor.core.time_utils.is_cftime_type(arr: ndarray) → bool[source]#

Checks if array elements are cftime objects

pycmor.core.time_utils.is_datetime_type(arr: ndarray) → bool[source]#

Checks if array elements are datetime objects or cftime objects

pycmor.core.utils module#

Various utility functions needed around the package

pycmor.core.utils.can_be_partialized(func: callable, open_arg: str, arg_list: list, kwargs_dict: dict) → bool[source]#

Checks if a function can be reasonably partialized with a single argument open.

Parameters:
  • func (callable) – The function to be partially applied.

  • open_arg (str) – The name of the argument that should remain open in the partial function.

  • arg_list (list) – The list of arguments that will be passed to the partial function.

  • kwargs_dict (dict) – The dictionary of keyword arguments that will be passed to the partial function.

Returns:

True if the function can be partially applied with a single argument open, False otherwise.

Return type:

bool
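
One way to perform such a check is with inspect.signature. The sketch below is illustrative only and is not pycmor's implementation: can_partialize and scale are hypothetical names, and the rule that every other parameter without a default must already be supplied is an assumption about what "reasonably partialized" means.

```python
import inspect


def can_partialize(func, open_arg, arg_list, kwargs_dict):
    """Illustrative check: would exactly open_arg remain to be supplied?"""
    signature = inspect.signature(func)
    params = signature.parameters
    if open_arg not in params or open_arg in kwargs_dict:
        return False
    try:
        # bind_partial tolerates missing arguments but rejects unknown
        # or duplicated ones.
        bound = signature.bind_partial(*arg_list, **kwargs_dict)
    except TypeError:
        return False
    if open_arg in bound.arguments:
        return False  # the open argument was filled positionally
    # Every other parameter without a default must already be supplied.
    for name, param in params.items():
        if name == open_arg or name in bound.arguments:
            continue
        if param.kind in (param.VAR_POSITIONAL, param.VAR_KEYWORD):
            continue
        if param.default is param.empty:
            return False
    return True


def scale(data, factor, offset=0):
    return data * factor + offset


print(can_partialize(scale, "data", [], {"factor": 2}))  # True
print(can_partialize(scale, "data", [], {}))             # False
```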

pycmor.core.utils.download_json_tables_from_url(url: str, filenames: list)[source]#

Downloads JSON tables from a raw git URL

Parameters:
  • url (str) – The URL to download the JSON tables from.

  • filenames (list) – The names of the JSON table files to download.

Returns:

The directory where the JSON tables were downloaded.

Return type:

str

pycmor.core.utils.generate_partial_function(func: callable, open_arg: str, *args, **kwargs)[source]#

Reduces func to a partial function by fixing all but the argument named by open_arg.

Parameters:
  • func (callable) – The function to be partially applied.

  • open_arg (str) – The name of the argument that should remain open in the partial function.

  • *args – Positional arguments to be passed to the partial function.

  • **kwargs – Keyword arguments to be passed to the partial function.

Returns:

The partial function with the specified arguments fixed.

Return type:

callable
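
The idea can be illustrated with functools.partial. This is a sketch under assumptions, not the function documented above: make_partial and convert are hypothetical names, and rejecting a fixed value for the open argument is an assumed design choice.

```python
from functools import partial


def make_partial(func, open_arg, *args, **kwargs):
    """Fix the given arguments so that only open_arg remains to be passed."""
    if open_arg in kwargs:
        raise ValueError(f"{open_arg!r} must stay open, but a value was given")
    return partial(func, *args, **kwargs)


def convert(data, factor, offset=0):
    return data * factor + offset


double_plus_one = make_partial(convert, "data", factor=2, offset=1)
print(double_plus_one(10))  # 21
```

The fixed values are passed by keyword here so that no positional value can land in the open slot.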

pycmor.core.utils.get_callable(name)[source]#

Get a callable from a string. First tries a standard import, then entry points, then a script.

pycmor.core.utils.get_callable_by_name(name)[source]#

Get a callable by its name.

This function takes a string that represents the fully qualified name of a callable object (i.e., a function or a method), and returns the actual callable object. The name should be in the format ‘module.submodule.callable’. If the callable does not exist, this function will raise an AttributeError.

Parameters:

name (str) – The fully qualified name of the callable to be retrieved. It should be in the format ‘module.submodule.callable’.

Returns:

The callable object that corresponds to the given name.

Return type:

callable

Raises:
  • ImportError – If the module or submodule specified in the name does not exist.

  • AttributeError – If the callable specified in the name does not exist in the given module or submodule.
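
Such a lookup is commonly built on importlib. The following is a minimal sketch, not the source of this function; callable_from_name is a hypothetical name, and the TypeError for non-callable objects is an assumption.

```python
import importlib


def callable_from_name(name):
    """Resolve 'module.submodule.callable' to the object it names."""
    module_name, _, attr_name = name.rpartition(".")
    if not module_name:
        raise ImportError(f"{name!r} has no module part")
    module = importlib.import_module(module_name)  # ImportError if missing
    obj = getattr(module, attr_name)               # AttributeError if missing
    if not callable(obj):
        # Assumption: a non-callable attribute is rejected.
        raise TypeError(f"{name!r} is not callable")
    return obj


sqrt = callable_from_name("math.sqrt")
print(sqrt(9.0))  # 3.0
```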

pycmor.core.utils.get_callable_by_script(step_signature)[source]#

pycmor.core.utils.get_entrypoint_by_name(name, group='pycmor.steps')[source]#

Get an entry point by its name.

This function takes a string that represents the name of an entry point in a given group, and returns the actual entry point object. If the entry point does not exist, this function will raise a ValueError.

Parameters:
  • name (str) – The name of the entry point to be retrieved.

  • group (str) – The group that the entry point belongs to.

Returns:

The entry point object that corresponds to the given name.

Return type:

EntryPoint

Raises:

ValueError – If the entry point specified by the name does not exist in the given group.
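
A lookup with this contract can be sketched with the standard library's importlib.metadata. find_entry_point is a hypothetical name, and the branch on select is there only because the return type of entry_points() differs between Python versions.

```python
from importlib.metadata import entry_points


def find_entry_point(name, group="pycmor.steps"):
    """Return the entry point called name in group, or raise ValueError."""
    all_eps = entry_points()
    if hasattr(all_eps, "select"):
        # Python 3.10+: entry points can be filtered by group.
        candidates = all_eps.select(group=group)
    else:
        # Python 3.8 and 3.9: a plain dict keyed by group name.
        candidates = all_eps.get(group, [])
    for ep in candidates:
        if ep.name == name:
            return ep
    raise ValueError(f"No entry point {name!r} in group {group!r}")


try:
    find_entry_point("no_such_step")
except ValueError as err:
    print(err)
```

Calling load() on the returned EntryPoint imports and returns the object it refers to.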

pycmor.core.utils.get_function_from_script(script_path: str, function_name: str)[source]#

Get a function from a Python script.

This function takes the path to a Python script and the name of a function defined in that script, and returns the actual function object. If the script does not exist or the function is not defined in the script, this function will raise an ImportError.

Parameters:
  • script_path (str) – The path to the Python script where the function is defined.

  • function_name (str) – The name of the function to be retrieved.

Returns:

The function object that corresponds to the given name in the specified script.

Return type:

callable

Raises:

ImportError – If the script does not exist or the function is not defined in the script.
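
Loading a function from a file path is usually done with importlib.util. The sketch below shows the technique under the documented error contract; function_from_script, my_steps.py and double are hypothetical names, and this is not claimed to be how pycmor implements it.

```python
import importlib.util
import tempfile
from pathlib import Path


def function_from_script(script_path, function_name):
    """Import the script as a module and return one of its functions."""
    path = Path(script_path)
    if not path.is_file():
        raise ImportError(f"Script not found: {path}")
    spec = importlib.util.spec_from_file_location(path.stem, path)
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)  # runs the script's top-level code
    try:
        return getattr(module, function_name)
    except AttributeError:
        raise ImportError(f"{function_name!r} is not defined in {path}") from None


with tempfile.TemporaryDirectory() as tmp:
    script = Path(tmp) / "my_steps.py"
    script.write_text("def double(x):\n    return 2 * x\n")
    double = function_from_script(script, "double")
    result = double(21)
print(result)  # 42
```

Because exec_module runs the whole script, any top-level side effects in the script happen at load time.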

pycmor.core.utils.git_url_to_api_url(git_url, path='', branch='main')[source]#

Convert a GitHub URL to the GitHub API URL for accessing directory contents.

Parameters:
  • git_url (str) – the original GitHub repository URL.

  • path (str) – the path to the directory within the repository (default: “”).

  • branch (str) – the branch or commit hash to target (default: main).

Returns:

the API URL.

Return type:

str
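
For orientation, the GitHub REST API serves directory listings at /repos/{owner}/{repo}/contents/{path} with the branch given as the ref query parameter. The sketch below builds such a URL; to_contents_api_url and the example repository are hypothetical, and whether pycmor produces exactly this string is an assumption.

```python
from urllib.parse import quote, urlparse


def to_contents_api_url(git_url, path="", branch="main"):
    """Turn https://github.com/<owner>/<repo> into a contents API URL."""
    parts = urlparse(git_url).path.strip("/").split("/")
    if len(parts) < 2:
        raise ValueError(f"Cannot find owner and repository in {git_url!r}")
    owner, repo = parts[0], parts[1]
    if repo.endswith(".git"):
        repo = repo[: -len(".git")]
    url = f"https://api.github.com/repos/{owner}/{repo}/contents"
    if path:
        url += "/" + quote(path.strip("/"))
    return f"{url}?ref={quote(branch, safe='')}"


print(to_contents_api_url("https://github.com/example-org/example-repo", "Tables"))
# https://api.github.com/repos/example-org/example-repo/contents/Tables?ref=main
```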

pycmor.core.utils.list_files_in_directory(git_url, directory_path, branch='main')[source]#

Get a list of file names in a directory from a GitHub repository.

Parameters:
  • git_url (str) – the GitHub repository URL.

  • directory_path (str) – the path to the directory in the repository.

  • branch (str) – the branch or commit hash to target (default: main).

Returns:

filenames in the directory.

Return type:

list of str

pycmor.core.utils.wait_for_workers(client, n_workers, timeout=600)[source]#

Wait for a specific number of workers to be available.

Parameters:
  • client (distributed.Client) – The Dask client

  • n_workers (int) – The number of workers to wait for

  • timeout (int) – Maximum time to wait in seconds

Returns:

True if the required number of workers are available, False if timeout occurred

Return type:

bool
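
The wait-with-timeout pattern can be sketched without a cluster. In this illustration count_workers is a hypothetical callable standing in for a query to the Dask scheduler, and poll_interval is an added parameter that the documented signature does not have.

```python
import time


def wait_until_enough(count_workers, n_workers, timeout=600, poll_interval=1.0):
    """Poll count_workers() until it reports n_workers or the timeout passes."""
    deadline = time.monotonic() + timeout
    while True:
        if count_workers() >= n_workers:
            return True
        if time.monotonic() >= deadline:
            return False
        time.sleep(poll_interval)


# Simulated cluster that gains one worker per poll.
state = {"workers": 0}


def fake_count():
    state["workers"] += 1
    return state["workers"]


print(wait_until_enough(fake_count, 3, timeout=5, poll_interval=0.01))  # True
```

time.monotonic is used for the deadline so that system clock adjustments cannot shorten or extend the wait.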

pycmor.core.validate module#

Provides validation of user configuration files by checking against a schema.

class pycmor.core.validate.DirectoryAwareValidator(*args, **kwargs)[source]#

Bases: Validator

A Validator that can check if a field is a directory.

_types_from_methods = ()#
_validate_is_directory(is_directory, field, value)[source]#

Checks if a string can be a pathlib.Path object.

The rule’s arguments are validated against this schema: {'type': 'boolean'}

checkers = ()#
coercers = ()#
default_setters = ()#
normalization_rules = {'coerce': {'oneof': [{'type': 'callable'}, {'schema': {'oneof': [{'type': 'callable'}, {'allowed': (), 'type': 'string'}]}, 'type': 'list'}, {'allowed': (), 'type': 'string'}]}, 'default': {'nullable': True}, 'default_setter': {'oneof': [{'type': 'callable'}, {'allowed': (), 'type': 'string'}]}, 'purge_unknown': {'type': 'boolean'}, 'rename': {'type': 'hashable'}, 'rename_handler': {'oneof': [{'type': 'callable'}, {'schema': {'oneof': [{'type': 'callable'}, {'allowed': (), 'type': 'string'}]}, 'type': 'list'}, {'allowed': (), 'type': 'string'}]}}#
rules = {'allof': {'logical': 'allof', 'type': 'list'}, 'allow_unknown': {'oneof': [{'type': 'boolean'}, {'check_with': 'bulk_schema', 'type': ['dict', 'string']}]}, 'allowed': {'type': 'container'}, 'anyof': {'logical': 'anyof', 'type': 'list'}, 'check_with': {'oneof': [{'type': 'callable'}, {'schema': {'oneof': [{'type': 'callable'}, {'allowed': (), 'type': 'string'}]}, 'type': 'list'}, {'allowed': (), 'type': 'string'}]}, 'coerce': {'oneof': [{'type': 'callable'}, {'schema': {'oneof': [{'type': 'callable'}, {'allowed': (), 'type': 'string'}]}, 'type': 'list'}, {'allowed': (), 'type': 'string'}]}, 'contains': {'empty': False}, 'default': {'nullable': True}, 'default_setter': {'oneof': [{'type': 'callable'}, {'allowed': (), 'type': 'string'}]}, 'dependencies': {'check_with': 'dependencies', 'type': ('dict', 'hashable', 'list')}, 'empty': {'type': 'boolean'}, 'excludes': {'schema': {'type': 'hashable'}, 'type': ('hashable', 'list')}, 'forbidden': {'type': 'list'}, 'is_directory': {'type': 'boolean'}, 'items': {'check_with': 'items', 'type': 'list'}, 'keysrules': {'check_with': 'bulk_schema', 'forbidden': ['rename', 'rename_handler'], 'type': ['dict', 'string']}, 'max': {'nullable': False}, 'maxlength': {'type': 'integer'}, 'meta': {}, 'min': {'nullable': False}, 'minlength': {'type': 'integer'}, 'noneof': {'logical': 'noneof', 'type': 'list'}, 'nullable': {'type': 'boolean'}, 'oneof': {'logical': 'oneof', 'type': 'list'}, 'purge_unknown': {'type': 'boolean'}, 'readonly': {'type': 'boolean'}, 'regex': {'type': 'string'}, 'rename': {'type': 'hashable'}, 'rename_handler': {'oneof': [{'type': 'callable'}, {'schema': {'oneof': [{'type': 'callable'}, {'allowed': (), 'type': 'string'}]}, 'type': 'list'}, {'allowed': (), 'type': 'string'}]}, 'require_all': {'type': 'boolean'}, 'required': {'type': 'boolean'}, 'schema': {'anyof': [{'check_with': 'schema'}, {'check_with': 'bulk_schema'}], 'type': ['dict', 'string']}, 'type': {'check_with': 'type', 'type': ['string', 
'list']}, 'valuesrules': {'check_with': 'bulk_schema', 'forbidden': ['rename', 'rename_handler'], 'type': ['dict', 'string']}}#
validation_rules = {'allof': {'logical': 'allof', 'type': 'list'}, 'allow_unknown': {'oneof': [{'type': 'boolean'}, {'check_with': 'bulk_schema', 'type': ['dict', 'string']}]}, 'allowed': {'type': 'container'}, 'anyof': {'logical': 'anyof', 'type': 'list'}, 'check_with': {'oneof': [{'type': 'callable'}, {'schema': {'oneof': [{'type': 'callable'}, {'allowed': (), 'type': 'string'}]}, 'type': 'list'}, {'allowed': (), 'type': 'string'}]}, 'contains': {'empty': False}, 'dependencies': {'check_with': 'dependencies', 'type': ('dict', 'hashable', 'list')}, 'empty': {'type': 'boolean'}, 'excludes': {'schema': {'type': 'hashable'}, 'type': ('hashable', 'list')}, 'forbidden': {'type': 'list'}, 'is_directory': {'type': 'boolean'}, 'items': {'check_with': 'items', 'type': 'list'}, 'keysrules': {'check_with': 'bulk_schema', 'forbidden': ['rename', 'rename_handler'], 'type': ['dict', 'string']}, 'max': {'nullable': False}, 'maxlength': {'type': 'integer'}, 'meta': {}, 'min': {'nullable': False}, 'minlength': {'type': 'integer'}, 'noneof': {'logical': 'noneof', 'type': 'list'}, 'nullable': {'type': 'boolean'}, 'oneof': {'logical': 'oneof', 'type': 'list'}, 'readonly': {'type': 'boolean'}, 'regex': {'type': 'string'}, 'require_all': {'type': 'boolean'}, 'required': {'type': 'boolean'}, 'schema': {'anyof': [{'check_with': 'schema'}, {'check_with': 'bulk_schema'}], 'type': ['dict', 'string']}, 'type': {'check_with': 'type', 'type': ['string', 'list']}, 'valuesrules': {'check_with': 'bulk_schema', 'forbidden': ['rename', 'rename_handler'], 'type': ['dict', 'string']}}#
pycmor.core.validate.GENERAL_SCHEMA = {'general': {'allow_unknown': True, 'schema': {'CMIP_Tables_Dir': {'is_directory': True, 'required': True, 'type': 'string'}, 'CV_Dir': {'is_directory': True, 'required': True, 'type': 'string'}, 'cmor_version': {'allowed': ['CMIP6', 'CMIP7'], 'required': True, 'type': 'string'}}, 'type': 'dict'}}#

Schema for validating general configuration.

Type:

dict
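
As an illustration of what this schema accepts, the sketch below shows a general section and a hand-written check of the same constraints. It does not use the validator classes documented here: check_general is a hypothetical helper, the paths are placeholders, and the is_directory rule is reduced to a string check.

```python
REQUIRED_KEYS = ("CMIP_Tables_Dir", "CV_Dir", "cmor_version")  # 'required': True
ALLOWED_VERSIONS = ("CMIP6", "CMIP7")                          # 'allowed' list


def check_general(general):
    """Return a list of problems; an empty list means no problem was found."""
    problems = [f"missing required key: {key}" for key in REQUIRED_KEYS
                if key not in general]
    for key, value in general.items():
        if key in REQUIRED_KEYS and not isinstance(value, str):
            problems.append(f"{key} must be a string")
    version = general.get("cmor_version")
    if isinstance(version, str) and version not in ALLOWED_VERSIONS:
        problems.append(f"cmor_version must be one of {list(ALLOWED_VERSIONS)}")
    return problems


config = {
    "general": {
        "cmor_version": "CMIP6",
        "CMIP_Tables_Dir": "/path/to/cmor-tables/Tables",  # placeholder path
        "CV_Dir": "/path/to/CVs",                          # placeholder path
        "name": "my-run",  # extra keys pass because allow_unknown is True
    }
}
print(check_general(config["general"]))  # []
```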

pycmor.core.validate.GENERAL_VALIDATOR = <pycmor.core.validate.GeneralSectionValidator object>#

Validator for general configuration.

Type:

Validator

class pycmor.core.validate.GeneralSectionValidator(*args, **kwargs)[source]#

Bases: DirectoryAwareValidator

A Validator for the general section of the configuration file

_types_from_methods = ()#
checkers = ()#
coercers = ()#
default_setters = ()#
normalization_rules = {'coerce': {'oneof': [{'type': 'callable'}, {'schema': {'oneof': [{'type': 'callable'}, {'allowed': (), 'type': 'string'}]}, 'type': 'list'}, {'allowed': (), 'type': 'string'}]}, 'default': {'nullable': True}, 'default_setter': {'oneof': [{'type': 'callable'}, {'allowed': (), 'type': 'string'}]}, 'purge_unknown': {'type': 'boolean'}, 'rename': {'type': 'hashable'}, 'rename_handler': {'oneof': [{'type': 'callable'}, {'schema': {'oneof': [{'type': 'callable'}, {'allowed': (), 'type': 'string'}]}, 'type': 'list'}, {'allowed': (), 'type': 'string'}]}}#
rules = {'allof': {'logical': 'allof', 'type': 'list'}, 'allow_unknown': {'oneof': [{'type': 'boolean'}, {'check_with': 'bulk_schema', 'type': ['dict', 'string']}]}, 'allowed': {'type': 'container'}, 'anyof': {'logical': 'anyof', 'type': 'list'}, 'check_with': {'oneof': [{'type': 'callable'}, {'schema': {'oneof': [{'type': 'callable'}, {'allowed': (), 'type': 'string'}]}, 'type': 'list'}, {'allowed': (), 'type': 'string'}]}, 'coerce': {'oneof': [{'type': 'callable'}, {'schema': {'oneof': [{'type': 'callable'}, {'allowed': (), 'type': 'string'}]}, 'type': 'list'}, {'allowed': (), 'type': 'string'}]}, 'contains': {'empty': False}, 'default': {'nullable': True}, 'default_setter': {'oneof': [{'type': 'callable'}, {'allowed': (), 'type': 'string'}]}, 'dependencies': {'check_with': 'dependencies', 'type': ('dict', 'hashable', 'list')}, 'empty': {'type': 'boolean'}, 'excludes': {'schema': {'type': 'hashable'}, 'type': ('hashable', 'list')}, 'forbidden': {'type': 'list'}, 'is_directory': {'type': 'boolean'}, 'items': {'check_with': 'items', 'type': 'list'}, 'keysrules': {'check_with': 'bulk_schema', 'forbidden': ['rename', 'rename_handler'], 'type': ['dict', 'string']}, 'max': {'nullable': False}, 'maxlength': {'type': 'integer'}, 'meta': {}, 'min': {'nullable': False}, 'minlength': {'type': 'integer'}, 'noneof': {'logical': 'noneof', 'type': 'list'}, 'nullable': {'type': 'boolean'}, 'oneof': {'logical': 'oneof', 'type': 'list'}, 'purge_unknown': {'type': 'boolean'}, 'readonly': {'type': 'boolean'}, 'regex': {'type': 'string'}, 'rename': {'type': 'hashable'}, 'rename_handler': {'oneof': [{'type': 'callable'}, {'schema': {'oneof': [{'type': 'callable'}, {'allowed': (), 'type': 'string'}]}, 'type': 'list'}, {'allowed': (), 'type': 'string'}]}, 'require_all': {'type': 'boolean'}, 'required': {'type': 'boolean'}, 'schema': {'anyof': [{'check_with': 'schema'}, {'check_with': 'bulk_schema'}], 'type': ['dict', 'string']}, 'type': {'check_with': 'type', 'type': ['string', 
'list']}, 'valuesrules': {'check_with': 'bulk_schema', 'forbidden': ['rename', 'rename_handler'], 'type': ['dict', 'string']}}#
validation_rules = {'allof': {'logical': 'allof', 'type': 'list'}, 'allow_unknown': {'oneof': [{'type': 'boolean'}, {'check_with': 'bulk_schema', 'type': ['dict', 'string']}]}, 'allowed': {'type': 'container'}, 'anyof': {'logical': 'anyof', 'type': 'list'}, 'check_with': {'oneof': [{'type': 'callable'}, {'schema': {'oneof': [{'type': 'callable'}, {'allowed': (), 'type': 'string'}]}, 'type': 'list'}, {'allowed': (), 'type': 'string'}]}, 'contains': {'empty': False}, 'dependencies': {'check_with': 'dependencies', 'type': ('dict', 'hashable', 'list')}, 'empty': {'type': 'boolean'}, 'excludes': {'schema': {'type': 'hashable'}, 'type': ('hashable', 'list')}, 'forbidden': {'type': 'list'}, 'is_directory': {'type': 'boolean'}, 'items': {'check_with': 'items', 'type': 'list'}, 'keysrules': {'check_with': 'bulk_schema', 'forbidden': ['rename', 'rename_handler'], 'type': ['dict', 'string']}, 'max': {'nullable': False}, 'maxlength': {'type': 'integer'}, 'meta': {}, 'min': {'nullable': False}, 'minlength': {'type': 'integer'}, 'noneof': {'logical': 'noneof', 'type': 'list'}, 'nullable': {'type': 'boolean'}, 'oneof': {'logical': 'oneof', 'type': 'list'}, 'readonly': {'type': 'boolean'}, 'regex': {'type': 'string'}, 'require_all': {'type': 'boolean'}, 'required': {'type': 'boolean'}, 'schema': {'anyof': [{'check_with': 'schema'}, {'check_with': 'bulk_schema'}], 'type': ['dict', 'string']}, 'type': {'check_with': 'type', 'type': ['string', 'list']}, 'valuesrules': {'check_with': 'bulk_schema', 'forbidden': ['rename', 'rename_handler'], 'type': ['dict', 'string']}}#
pycmor.core.validate.PIPELINES_SCHEMA = {'pipelines': {'schema': {'schema': {'name': {'required': False, 'type': 'string'}, 'steps': {'excludes': 'uses', 'schema': {'is_qualname_or_script': True, 'type': 'string'}, 'type': 'list'}, 'uses': {'excludes': 'steps', 'type': 'string'}}, 'type': 'dict'}, 'type': 'list'}}#

Schema for validating pipelines configuration.

Type:

dict
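
In this schema, steps and uses exclude each other, so a pipeline entry provides one or the other. The sketch below shows two entries of each kind and a simple conflict check; all module and class names in it are hypothetical, and find_conflicts is not part of pycmor.

```python
pipelines = [
    # Explicit list of steps, each given as a qualified name.
    {"name": "custom",
     "steps": ["my_package.steps.load_data", "my_package.steps.save_data"]},
    # Reference to a predefined pipeline instead of listing steps.
    {"name": "predefined", "uses": "my_package.pipelines.DefaultPipeline"},
]


def find_conflicts(pipelines):
    """Return the indices of entries that set both 'steps' and 'uses'."""
    return [i for i, p in enumerate(pipelines) if "steps" in p and "uses" in p]


print(find_conflicts(pipelines))  # []
bad = pipelines + [{"steps": ["a.b"], "uses": "c.D"}]
print(find_conflicts(bad))        # [2]
```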

pycmor.core.validate.PIPELINES_VALIDATOR = <pycmor.core.validate.PipelineSectionValidator object>#

Validator for pipelines configuration.

Type:

Validator

class pycmor.core.validate.PipelineSectionValidator(*args, **kwargs)[source]#

Bases: Validator

Validator for pipeline configuration.

_types_from_methods = ()#
_validate(document)[source]#
_validate_is_qualname_or_script(is_qualname, field, value)[source]#

Test if a string is a Python qualname.

The rule’s arguments are validated against this schema: {'type': 'boolean'}

checkers = ()#
coercers = ()#
default_setters = ()#
normalization_rules = {'coerce': {'oneof': [{'type': 'callable'}, {'schema': {'oneof': [{'type': 'callable'}, {'allowed': (), 'type': 'string'}]}, 'type': 'list'}, {'allowed': (), 'type': 'string'}]}, 'default': {'nullable': True}, 'default_setter': {'oneof': [{'type': 'callable'}, {'allowed': (), 'type': 'string'}]}, 'purge_unknown': {'type': 'boolean'}, 'rename': {'type': 'hashable'}, 'rename_handler': {'oneof': [{'type': 'callable'}, {'schema': {'oneof': [{'type': 'callable'}, {'allowed': (), 'type': 'string'}]}, 'type': 'list'}, {'allowed': (), 'type': 'string'}]}}#
rules = {'allof': {'logical': 'allof', 'type': 'list'}, 'allow_unknown': {'oneof': [{'type': 'boolean'}, {'check_with': 'bulk_schema', 'type': ['dict', 'string']}]}, 'allowed': {'type': 'container'}, 'anyof': {'logical': 'anyof', 'type': 'list'}, 'check_with': {'oneof': [{'type': 'callable'}, {'schema': {'oneof': [{'type': 'callable'}, {'allowed': (), 'type': 'string'}]}, 'type': 'list'}, {'allowed': (), 'type': 'string'}]}, 'coerce': {'oneof': [{'type': 'callable'}, {'schema': {'oneof': [{'type': 'callable'}, {'allowed': (), 'type': 'string'}]}, 'type': 'list'}, {'allowed': (), 'type': 'string'}]}, 'contains': {'empty': False}, 'default': {'nullable': True}, 'default_setter': {'oneof': [{'type': 'callable'}, {'allowed': (), 'type': 'string'}]}, 'dependencies': {'check_with': 'dependencies', 'type': ('dict', 'hashable', 'list')}, 'empty': {'type': 'boolean'}, 'excludes': {'schema': {'type': 'hashable'}, 'type': ('hashable', 'list')}, 'forbidden': {'type': 'list'}, 'is_qualname_or_script': {'type': 'boolean'}, 'items': {'check_with': 'items', 'type': 'list'}, 'keysrules': {'check_with': 'bulk_schema', 'forbidden': ['rename', 'rename_handler'], 'type': ['dict', 'string']}, 'max': {'nullable': False}, 'maxlength': {'type': 'integer'}, 'meta': {}, 'min': {'nullable': False}, 'minlength': {'type': 'integer'}, 'noneof': {'logical': 'noneof', 'type': 'list'}, 'nullable': {'type': 'boolean'}, 'oneof': {'logical': 'oneof', 'type': 'list'}, 'purge_unknown': {'type': 'boolean'}, 'readonly': {'type': 'boolean'}, 'regex': {'type': 'string'}, 'rename': {'type': 'hashable'}, 'rename_handler': {'oneof': [{'type': 'callable'}, {'schema': {'oneof': [{'type': 'callable'}, {'allowed': (), 'type': 'string'}]}, 'type': 'list'}, {'allowed': (), 'type': 'string'}]}, 'require_all': {'type': 'boolean'}, 'required': {'type': 'boolean'}, 'schema': {'anyof': [{'check_with': 'schema'}, {'check_with': 'bulk_schema'}], 'type': ['dict', 'string']}, 'type': {'check_with': 'type', 'type': ['string', 
'list']}, 'valuesrules': {'check_with': 'bulk_schema', 'forbidden': ['rename', 'rename_handler'], 'type': ['dict', 'string']}}#
validation_rules = {'allof': {'logical': 'allof', 'type': 'list'}, 'allow_unknown': {'oneof': [{'type': 'boolean'}, {'check_with': 'bulk_schema', 'type': ['dict', 'string']}]}, 'allowed': {'type': 'container'}, 'anyof': {'logical': 'anyof', 'type': 'list'}, 'check_with': {'oneof': [{'type': 'callable'}, {'schema': {'oneof': [{'type': 'callable'}, {'allowed': (), 'type': 'string'}]}, 'type': 'list'}, {'allowed': (), 'type': 'string'}]}, 'contains': {'empty': False}, 'dependencies': {'check_with': 'dependencies', 'type': ('dict', 'hashable', 'list')}, 'empty': {'type': 'boolean'}, 'excludes': {'schema': {'type': 'hashable'}, 'type': ('hashable', 'list')}, 'forbidden': {'type': 'list'}, 'is_qualname_or_script': {'type': 'boolean'}, 'items': {'check_with': 'items', 'type': 'list'}, 'keysrules': {'check_with': 'bulk_schema', 'forbidden': ['rename', 'rename_handler'], 'type': ['dict', 'string']}, 'max': {'nullable': False}, 'maxlength': {'type': 'integer'}, 'meta': {}, 'min': {'nullable': False}, 'minlength': {'type': 'integer'}, 'noneof': {'logical': 'noneof', 'type': 'list'}, 'nullable': {'type': 'boolean'}, 'oneof': {'logical': 'oneof', 'type': 'list'}, 'readonly': {'type': 'boolean'}, 'regex': {'type': 'string'}, 'require_all': {'type': 'boolean'}, 'required': {'type': 'boolean'}, 'schema': {'anyof': [{'check_with': 'schema'}, {'check_with': 'bulk_schema'}], 'type': ['dict', 'string']}, 'type': {'check_with': 'type', 'type': ['string', 'list']}, 'valuesrules': {'check_with': 'bulk_schema', 'forbidden': ['rename', 'rename_handler'], 'type': ['dict', 'string']}}#
pycmor.core.validate.RULES_SCHEMA = {'rules': {'schema': {'allow_unknown': True, 'schema': {'adjust_timestamp': {'required': False, 'type': 'string'}, 'array_order': {'required': False, 'type': 'list'}, 'cmor_unit': {'required': False, 'type': 'string'}, 'cmor_variable': {'required': True, 'type': 'string'}, 'description': {'required': False, 'type': 'string'}, 'enabled': {'required': False, 'type': 'boolean'}, 'experiment_id': {'required': True, 'type': 'string'}, 'file_timespan': {'required': False, 'type': 'string'}, 'further_info_url': {'required': False, 'type': 'string'}, 'grid_label': {'required': True, 'type': 'string'}, 'input_source': {'allowed': ['xr_tutorial'], 'required': False, 'type': 'string'}, 'input_type': {'allowed': ['xr.DataArray', 'xr.Dataset'], 'required': False, 'type': 'string'}, 'inputs': {'required': True, 'schema': {'schema': {'path': {'required': True, 'type': 'string'}, 'pattern': {'required': True, 'type': 'string'}}, 'type': 'dict'}, 'type': 'list'}, 'instition_id': {'required': False, 'type': 'string'}, 'model_component': {'required': True, 'type': 'string'}, 'model_unit': {'required': False, 'type': 'string'}, 'model_variable': {'required': False, 'type': 'string'}, 'name': {'required': False, 'type': 'string'}, 'output_directory': {'is_directory': True, 'required': True, 'type': 'string'}, 'pipelines': {'schema': {'type': 'string'}, 'type': 'list'}, 'source_id': {'required': True, 'type': 'string'}, 'variant_label': {'regex': '^r\\d+i\\d+p\\d+f\\d+$', 'required': True, 'type': 'string'}}, 'type': 'dict'}, 'type': 'list'}}#

Schema for validating rules configuration.

Type:

dict
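
To make the schema concrete, the sketch below shows a rule carrying only the keys marked as required, and applies the variant_label pattern with the re module. All values are illustrative placeholders, not taken from a real configuration.

```python
import re

# Pattern copied from the 'variant_label' entry of RULES_SCHEMA.
VARIANT_LABEL = re.compile(r"^r\d+i\d+p\d+f\d+$")

# A rule with only the keys marked 'required': True (placeholder values).
rule = {
    "cmor_variable": "tas",
    "experiment_id": "piControl",
    "grid_label": "gn",
    "inputs": [{"path": "/path/to/model/output", "pattern": r"tas_.*\.nc"}],
    "model_component": "atmos",
    "output_directory": "/path/to/cmorized",
    "source_id": "EXAMPLE-MODEL",
    "variant_label": "r1i1p1f1",
}

for label in ["r1i1p1f1", "r10i2p1f3", "r1i1p1", "R1i1p1f1"]:
    print(label, bool(VARIANT_LABEL.match(label)))
# r1i1p1f1 True
# r10i2p1f3 True
# r1i1p1 False
# R1i1p1f1 False
```

The pattern requires all four indices (r, i, p, f) in lowercase, each followed by at least one digit.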

class pycmor.core.validate.RuleSectionValidator(*args, **kwargs)[source]#

Bases: DirectoryAwareValidator

Validator for rules configuration.

_types_from_methods = ()#
checkers = ()#
coercers = ()#
default_setters = ()#
normalization_rules = {'coerce': {'oneof': [{'type': 'callable'}, {'schema': {'oneof': [{'type': 'callable'}, {'allowed': (), 'type': 'string'}]}, 'type': 'list'}, {'allowed': (), 'type': 'string'}]}, 'default': {'nullable': True}, 'default_setter': {'oneof': [{'type': 'callable'}, {'allowed': (), 'type': 'string'}]}, 'purge_unknown': {'type': 'boolean'}, 'rename': {'type': 'hashable'}, 'rename_handler': {'oneof': [{'type': 'callable'}, {'schema': {'oneof': [{'type': 'callable'}, {'allowed': (), 'type': 'string'}]}, 'type': 'list'}, {'allowed': (), 'type': 'string'}]}}#
rules = {'allof': {'logical': 'allof', 'type': 'list'}, 'allow_unknown': {'oneof': [{'type': 'boolean'}, {'check_with': 'bulk_schema', 'type': ['dict', 'string']}]}, 'allowed': {'type': 'container'}, 'anyof': {'logical': 'anyof', 'type': 'list'}, 'check_with': {'oneof': [{'type': 'callable'}, {'schema': {'oneof': [{'type': 'callable'}, {'allowed': (), 'type': 'string'}]}, 'type': 'list'}, {'allowed': (), 'type': 'string'}]}, 'coerce': {'oneof': [{'type': 'callable'}, {'schema': {'oneof': [{'type': 'callable'}, {'allowed': (), 'type': 'string'}]}, 'type': 'list'}, {'allowed': (), 'type': 'string'}]}, 'contains': {'empty': False}, 'default': {'nullable': True}, 'default_setter': {'oneof': [{'type': 'callable'}, {'allowed': (), 'type': 'string'}]}, 'dependencies': {'check_with': 'dependencies', 'type': ('dict', 'hashable', 'list')}, 'empty': {'type': 'boolean'}, 'excludes': {'schema': {'type': 'hashable'}, 'type': ('hashable', 'list')}, 'forbidden': {'type': 'list'}, 'is_directory': {'type': 'boolean'}, 'items': {'check_with': 'items', 'type': 'list'}, 'keysrules': {'check_with': 'bulk_schema', 'forbidden': ['rename', 'rename_handler'], 'type': ['dict', 'string']}, 'max': {'nullable': False}, 'maxlength': {'type': 'integer'}, 'meta': {}, 'min': {'nullable': False}, 'minlength': {'type': 'integer'}, 'noneof': {'logical': 'noneof', 'type': 'list'}, 'nullable': {'type': 'boolean'}, 'oneof': {'logical': 'oneof', 'type': 'list'}, 'purge_unknown': {'type': 'boolean'}, 'readonly': {'type': 'boolean'}, 'regex': {'type': 'string'}, 'rename': {'type': 'hashable'}, 'rename_handler': {'oneof': [{'type': 'callable'}, {'schema': {'oneof': [{'type': 'callable'}, {'allowed': (), 'type': 'string'}]}, 'type': 'list'}, {'allowed': (), 'type': 'string'}]}, 'require_all': {'type': 'boolean'}, 'required': {'type': 'boolean'}, 'schema': {'anyof': [{'check_with': 'schema'}, {'check_with': 'bulk_schema'}], 'type': ['dict', 'string']}, 'type': {'check_with': 'type', 'type': ['string', 
'list']}, 'valuesrules': {'check_with': 'bulk_schema', 'forbidden': ['rename', 'rename_handler'], 'type': ['dict', 'string']}}#
validation_rules = {'allof': {'logical': 'allof', 'type': 'list'}, 'allow_unknown': {'oneof': [{'type': 'boolean'}, {'check_with': 'bulk_schema', 'type': ['dict', 'string']}]}, 'allowed': {'type': 'container'}, 'anyof': {'logical': 'anyof', 'type': 'list'}, 'check_with': {'oneof': [{'type': 'callable'}, {'schema': {'oneof': [{'type': 'callable'}, {'allowed': (), 'type': 'string'}]}, 'type': 'list'}, {'allowed': (), 'type': 'string'}]}, 'contains': {'empty': False}, 'dependencies': {'check_with': 'dependencies', 'type': ('dict', 'hashable', 'list')}, 'empty': {'type': 'boolean'}, 'excludes': {'schema': {'type': 'hashable'}, 'type': ('hashable', 'list')}, 'forbidden': {'type': 'list'}, 'is_directory': {'type': 'boolean'}, 'items': {'check_with': 'items', 'type': 'list'}, 'keysrules': {'check_with': 'bulk_schema', 'forbidden': ['rename', 'rename_handler'], 'type': ['dict', 'string']}, 'max': {'nullable': False}, 'maxlength': {'type': 'integer'}, 'meta': {}, 'min': {'nullable': False}, 'minlength': {'type': 'integer'}, 'noneof': {'logical': 'noneof', 'type': 'list'}, 'nullable': {'type': 'boolean'}, 'oneof': {'logical': 'oneof', 'type': 'list'}, 'readonly': {'type': 'boolean'}, 'regex': {'type': 'string'}, 'require_all': {'type': 'boolean'}, 'required': {'type': 'boolean'}, 'schema': {'anyof': [{'check_with': 'schema'}, {'check_with': 'bulk_schema'}], 'type': ['dict', 'string']}, 'type': {'check_with': 'type', 'type': ['string', 'list']}, 'valuesrules': {'check_with': 'bulk_schema', 'forbidden': ['rename', 'rename_handler'], 'type': ['dict', 'string']}}#