import copy
import getpass
import os
from importlib.resources import files
from pathlib import Path
import dask # noqa: F401
import pandas as pd
import questionary
import xarray as xr # noqa: F401
import yaml
from dask.distributed import Client
from everett.manager import generate_uppercase_key, get_runtime_config
from prefect import flow, get_run_logger, task
from prefect.futures import wait
from rich.progress import track
from ..data_request.collection import DataRequest
from ..data_request.table import DataRequestTable
from ..data_request.variable import DataRequestVariable
from ..std_lib.global_attributes import GlobalAttributes
from ..std_lib.timeaverage import _frequency_from_approx_interval
from .aux_files import attach_files_to_rule
from .cluster import (
CLUSTER_ADAPT_SUPPORT,
CLUSTER_MAPPINGS,
CLUSTER_SCALE_SUPPORT,
DaskContext,
set_dashboard_link,
)
from .config import PycmorConfig, PycmorConfigManager
from .controlled_vocabularies import ControlledVocabularies
from .factory import create_factory
from .filecache import fc
from .logging import logger
from .pipeline import Pipeline
from .rule import Rule
from .utils import wait_for_workers
from .validate import GENERAL_VALIDATOR, PIPELINES_VALIDATOR, RULES_VALIDATOR
# Default mapping file, located through ``importlib.resources`` inside the
# ``pycmor.data`` package rather than via a path relative to this module.
DIMENSIONLESS_MAPPING_TABLE = files("pycmor.data").joinpath(
    "dimensionless_mappings.yaml"
)
"""Path: The dimenionless unit mapping table, used to recreate meaningful units from
dimensionless fractional values (e.g. 0.001 --> g/kg)"""
[docs]
class CMORizer:
    """
    Build rules and pipelines from configuration and run them.

    On construction the object optionally configures Dask and creates a
    cluster, turns the rule/pipeline configuration into ``Rule`` and
    ``Pipeline`` objects, and attaches data-request tables and variables,
    unit mappings, auxiliary files, controlled vocabularies and global
    attributes to every rule. ``process`` then runs each rule's pipelines
    either serially or in parallel.
    """

    _SUPPORTED_CMOR_VERSIONS = ("CMIP6", "CMIP7")
    """tuple : Supported CMOR versions."""
def __init__(
    self,
    pymor_cfg=None,
    pycmor_cfg=None,  # New parameter name
    general_cfg=None,
    pipelines_cfg=None,
    rules_cfg=None,
    dask_cfg=None,
    inherit_cfg=None,
    **kwargs,
):
    """
    Store the configuration sections and run every post-init step.

    Parameters
    ----------
    pymor_cfg : dict, optional
        Legacy name of ``pycmor_cfg``; only used when ``pycmor_cfg`` is falsy.
    pycmor_cfg : dict, optional
        Settings handed to ``PycmorConfigManager.from_pycmor_cfg``.
    general_cfg : dict, optional
        General settings. ``cmor_version`` is mandatory; the post-init
        steps additionally read ``CMIP_Tables_Dir`` and ``CV_Dir``.
    pipelines_cfg : list, optional
        ``Pipeline`` objects or dictionaries describing them.
    rules_cfg : list, optional
        ``Rule`` objects or dictionaries describing them.
    dask_cfg : dict, optional
        ``distributed`` and ``jobqueue`` sections for the Dask configuration.
    inherit_cfg : dict, optional
        Attribute/value pairs that are set on every rule.
    **kwargs
        Accepted for call compatibility and otherwise ignored.

    Raises
    ------
    ValueError
        If ``cmor_version`` is missing from ``general_cfg`` or is not one
        of ``_SUPPORTED_CMOR_VERSIONS``.
    """
    ################################################################################
    self._general_cfg = general_cfg or {}
    # Use pycmor_cfg if provided, otherwise fall back to pymor_cfg for backward compatibility
    pycmor_cfg = pycmor_cfg or pymor_cfg or {}
    self._pycmor_cfg = PycmorConfigManager.from_pycmor_cfg(pycmor_cfg)
    self._pymor_cfg = self._pycmor_cfg  # For backward compatibility
    self._dask_cfg = dask_cfg or {}
    self._inherit_cfg = inherit_cfg or {}
    self.rules = rules_cfg or []
    self.pipelines = pipelines_cfg or []
    self._cluster = None  # Dask Cluster, might be set up later
    ################################################################################
    # CMOR Version Settings:
    if self._general_cfg.get("cmor_version") is None:
        raise ValueError("cmor_version must be set in the general configuration.")
    self.cmor_version = self._general_cfg["cmor_version"]
    if self.cmor_version not in self._SUPPORTED_CMOR_VERSIONS:
        logger.error(f"CMOR version {self.cmor_version} is not supported.")
        # Fixed attribute name: the misspelt ``_SUPPORTED_CMOR_VERSION`` raised
        # AttributeError here and hid the ValueError below.
        logger.error(f"Supported versions are {self._SUPPORTED_CMOR_VERSIONS}")
        raise ValueError(f"Unsupported CMOR version: {self.cmor_version}")
    ################################################################################
    # Print Out Configuration:
    logger.debug(80 * "#")
    logger.debug("---------------------")
    logger.debug("General Configuration")
    logger.debug("---------------------")
    logger.debug(yaml.dump(self._general_cfg))
    logger.debug("--------------------")
    logger.debug("PyCMOR Configuration:")
    logger.debug("--------------------")
    # This isn't actually the config, it's the "App" object. Everett is weird about this...
    pymor_config = PycmorConfig()
    # NOTE(PG): This variable is for demonstration purposes:
    _pymor_config_dict = {}
    for namespace, key, value, option in get_runtime_config(
        self._pymor_cfg, pymor_config
    ):
        full_key = generate_uppercase_key(key, namespace)
        _pymor_config_dict[full_key] = value
    logger.info(yaml.dump(_pymor_config_dict))
    # Avoid confusion:
    del pymor_config
    logger.info(80 * "#")
    ################################################################################
    ################################################################################
    # NOTE(PG): Curious about the configuration? Add a breakpoint here and print
    #           out the variable _pymor_config_dict to see EVERYTHING that is
    #           available to you in the configuration.
    # breakpoint()
    ################################################################################
    ################################################################################
    # Post_Init:
    if self._pycmor_cfg("enable_dask"):
        logger.debug("Setting up dask configuration...")
        self._post_init_configure_dask()
        logger.debug("...done!")
        logger.debug("Creating dask cluster...")
        self._post_init_create_dask_cluster()
        logger.debug("...done!")
    # The order below matters: later steps read what earlier steps attach
    # to ``self`` and to the rules.
    self._post_init_create_pipelines()
    self._post_init_create_rules()
    self._post_init_create_data_request_tables()
    self._post_init_create_data_request()
    self._post_init_populate_rules_with_tables()
    self._post_init_populate_rules_with_dimensionless_unit_mappings()
    self._post_init_populate_rules_with_aux_files()
    self._post_init_populate_rules_with_data_request_variables()
    self._post_init_create_controlled_vocabularies()
    self._post_init_populate_rules_with_controlled_vocabularies()
    self._post_init_create_global_attributes_on_rules()
    logger.debug("...post-init done!")
    ################################################################################
def __del__(self):
"""Gracefully close the cluster if it exists"""
if self._cluster is not None:
self._cluster.close()
[docs]
@staticmethod
def _ensure_dask_slurm_account(jobqueue_cfg):
slurm_jobqueue_cfg = jobqueue_cfg.get("slurm", {})
if slurm_jobqueue_cfg.get("account") is None:
slurm_jobqueue_cfg["account"] = os.environ.get("SLURM_JOB_ACCOUNT")
return jobqueue_cfg
[docs]
def _post_init_configure_dask(self):
    """
    Push the ``distributed`` and ``jobqueue`` sections into Dask's configuration.

    ``self._dask_cfg`` is reduced to exactly these two sections (the
    ``jobqueue`` one after the SLURM account has been filled in) and then
    merged into ``dask.config.config``.

    See Also
    --------
    https://docs.dask.org/en/stable/configuration.html?highlight=config#directly-within-python
    """
    # Needed to pre-populate config
    import dask.distributed  # noqa: F401
    import dask_jobqueue  # noqa: F401

    jobqueue_section = self._ensure_dask_slurm_account(
        self._dask_cfg.get("jobqueue", {})
    )
    distributed_section = self._dask_cfg.get("distributed", {})
    self._dask_cfg = {
        "distributed": distributed_section,
        "jobqueue": jobqueue_section,
    }

    logger.info("Updating Dask configuration. Changed values will be:")
    logger.info(yaml.dump(self._dask_cfg))
    dask.config.update(dask.config.config, self._dask_cfg)
    logger.info("Dask configuration updated!")
[docs]
def _post_init_create_dask_cluster(self):
    """
    Create the Dask cluster named in the configuration and scale it.

    The cluster class is looked up in ``CLUSTER_MAPPINGS`` under the
    ``dask_cluster`` setting and stored in ``self._cluster``. Depending on
    ``dask_cluster_scaling_mode`` the cluster is scaled adaptively or to a
    fixed number of jobs; if the cluster type does not support the chosen
    mode only a warning is logged. Finally the optional Dask extras
    (currently ``flox``) are imported.

    Raises
    ------
    ValueError
        If ``dask_cluster_scaling_mode`` is neither ``adapt`` nor ``fixed``.
    """
    # FIXME: In the future, we can support PBS, too.
    logger.info("Setting up dask cluster...")
    cluster_name = self._pymor_cfg("dask_cluster")
    ClusterClass = CLUSTER_MAPPINGS[cluster_name]
    self._cluster = ClusterClass()
    set_dashboard_link(self._cluster)
    cluster_scaling_mode = self._pymor_cfg.get("dask_cluster_scaling_mode", "adapt")
    if cluster_scaling_mode == "adapt":
        if CLUSTER_ADAPT_SUPPORT[cluster_name]:
            min_jobs = self._pymor_cfg.get("dask_cluster_scaling_minimum_jobs", 1)
            max_jobs = self._pymor_cfg.get("dask_cluster_scaling_maximum_jobs", 10)
            self._cluster.adapt(minimum_jobs=min_jobs, maximum_jobs=max_jobs)
        else:
            logger.warning(f"{self._cluster} does not support adaptive scaling!")
    elif cluster_scaling_mode == "fixed":
        if CLUSTER_SCALE_SUPPORT[cluster_name]:
            jobs = self._pymor_cfg.get("dask_cluster_scaling_fixed_jobs", 5)
            self._cluster.scale(jobs=jobs)
        else:
            logger.warning(f"{self._cluster} does not support fixed scaling")
    else:
        raise ValueError(
            "You need to specify adapt or fixed for pymor.dask_cluster_scaling_mode"
        )
    # FIXME: Include the gateway option if possible
    # FIXME: Does ``Client`` needs to be available here?
    logger.info(f"Cluster can be found at: {self._cluster=}")
    logger.info(f"Dashboard {self._cluster.dashboard_link}")

    username = getpass.getuser()
    nodename = getattr(os.uname(), "nodename", "UNKNOWN")
    logger.info(
        "To see the dashboards run the following command in your computer's "
        "terminal:\n"
        f"\tpycmor ssh-tunnel --username {username} --compute-node "
        f"{nodename}"
    )

    # One log line per imported extra. The summary is decided on this list
    # alone, so the "no extras" message is reachable when nothing is enabled.
    imported_extras = []
    if self._pymor_cfg.get("enable_flox", True):
        imported_extras.append("...flox...")
        import flox  # noqa: F401
        import flox.xarray  # noqa: F401
    if imported_extras:
        logger.info("Importing Dask Extras...")
        for message in imported_extras:
            logger.info(message)
        logger.info(f"...done! Imported {len(imported_extras)} libraries.")
    else:
        logger.info("No Dask extras specified...")
[docs]
def _post_init_create_data_request_tables(self):
    """
    Load the data request tables found in ``CMIP_Tables_Dir``.

    The table class matching ``self.cmor_version`` builds a mapping from
    the directory; that mapping is stored both as ``self.tables`` and
    under ``self._general_cfg["tables"]``.

    Per the original note, the keys are shortened file names
    (i.e., ``CMIP6_Omon.json`` -> ``Omon``), the same format that is used
    in CMIP6_table_id.json.
    """
    table_cls = create_factory(DataRequestTable).get(self.cmor_version)
    tables_path = Path(self._general_cfg["CMIP_Tables_Dir"])
    loaded_tables = table_cls.table_dict_from_directory(tables_path)
    self._general_cfg["tables"] = loaded_tables
    self.tables = loaded_tables
[docs]
def _post_init_create_data_request(self):
    """
    Build ``self.data_request`` from the directory given by ``CMIP_Tables_Dir``.

    The concrete ``DataRequest`` class is chosen by ``self.cmor_version``.
    """
    tables_location = self._general_cfg["CMIP_Tables_Dir"]
    request_cls = create_factory(DataRequest).get(self.cmor_version)
    self.data_request = request_cls.from_directory(tables_location)
[docs]
def _post_init_populate_rules_with_tables(self):
"""
Populates the rules with the tables in which the variable described by that rule is found.
"""
tables = self._general_cfg["tables"]
for rule in self.rules:
for tbl in tables.values():
if rule.cmor_variable in tbl.variables:
rule.add_table(tbl.table_id)
[docs]
def _post_init_populate_rules_with_data_request_variables(self):
    """
    Attach every data request variable to the rule that matches it.

    Variables without a matching rule are skipped. Afterwards the rules are
    expanded and then reduced to a single data request variable each, with
    ``self._cluster`` made available through ``DaskContext``.
    """
    for request_variable in self.data_request.variables.values():
        target_rule = self.find_matching_rule(request_variable)
        if target_rule is None:
            continue
        if target_rule.data_request_variables == []:
            # Assign a fresh list rather than appending to the existing empty one
            target_rule.data_request_variables = [request_variable]
            continue
        target_rule.data_request_variables.append(request_variable)
    # FIXME: This needs a better name...
    # Cluster might need to be copied:
    with DaskContext.set_cluster(self._cluster):
        self._rules_expand_drvs()
        self._rules_depluralize_drvs()
[docs]
def _post_init_create_controlled_vocabularies(self):
    """
    Load the controlled vocabularies into ``self.controlled_vocabularies``.

    The location is taken from ``self._general_cfg["CV_Dir"]`` and passed
    to the ``load`` method of the ``ControlledVocabularies`` class that
    matches ``self.cmor_version``.
    """
    cv_location = self._general_cfg["CV_Dir"]
    cv_cls = create_factory(ControlledVocabularies).get(self.cmor_version)
    self.controlled_vocabularies = cv_cls.load(cv_location)
[docs]
def _post_init_populate_rules_with_controlled_vocabularies(self):
for rule in self.rules:
rule.controlled_vocabularies = self.controlled_vocabularies
[docs]
def _post_init_populate_rules_with_aux_files(self):
    """Run ``attach_files_to_rule`` on each rule, in order."""
    for current_rule in self.rules:
        attach_files_to_rule(current_rule)
[docs]
def _post_init_populate_rules_with_dimensionless_unit_mappings(self):
    """
    Read the dimensionless unit mappings and attach them to every rule.

    The file is taken from the ``dimensionless_mapping_table`` setting and
    defaults to ``DIMENSIONLESS_MAPPING_TABLE``. If the setting resolves to
    ``None`` a warning is logged and an empty dictionary is used. An empty
    YAML file also results in an empty dictionary.

    All rules share the same mapping object.

    Raises
    ------
    OSError
        If the mapping file cannot be opened (e.g. it does not exist).
    """
    unit_map_file = self._pymor_cfg.get(
        "dimensionless_mapping_table", DIMENSIONLESS_MAPPING_TABLE
    )
    if unit_map_file is None:
        logger.warning("No dimensionless unit mappings file specified!")
        dimensionless_unit_mappings = {}
    else:
        with open(unit_map_file, "r", encoding="utf-8") as f:
            # ``safe_load`` returns None for an empty document; normalise that
            # so rules never receive ``None`` instead of a mapping.
            dimensionless_unit_mappings = yaml.safe_load(f) or {}
    # Add to rules:
    for rule in self.rules:
        rule.dimensionless_unit_mappings = dimensionless_unit_mappings
[docs]
def _match_pipelines_in_rules(self, force=False):
for rule in self.rules:
rule.match_pipelines(self.pipelines, force=force)
[docs]
def find_matching_rule(
    self, data_request_variable: DataRequestVariable
) -> "Rule | None":
    """
    Return the rule whose ``cmor_variable`` equals the variable's ``variable_id``.

    Parameters
    ----------
    data_request_variable : DataRequestVariable
        The variable for which a rule is wanted.

    Returns
    -------
    Rule or None
        The matching rule, or ``None`` if no rule matches. When several
        rules match and ``raise_on_multiple_rules`` is disabled, the first
        match is returned.

    Raises
    ------
    ValueError
        If no rule matches and ``raise_on_no_rule`` is enabled, or if more
        than one rule matches and ``raise_on_multiple_rules`` is enabled
        (the default).
    """
    # (rule attribute, data request variable attribute) pairs that must agree
    attr_criteria = [("cmor_variable", "variable_id")]
    matches = [
        rule
        for rule in self.rules
        if all(
            getattr(rule, r_attr) == getattr(data_request_variable, drv_attr)
            for (r_attr, drv_attr) in attr_criteria
        )
    ]
    if not matches:
        msg = f"No rule found for {data_request_variable}"
        if self._pymor_cfg.get("raise_on_no_rule", False):
            raise ValueError(msg)
        elif self._pymor_cfg.get("warn_on_no_rule", False):
            logger.warning(msg)
        return None
    if len(matches) > 1:
        msg = f"Need only one rule to match to {data_request_variable}. Found {len(matches)}."
        if self._pymor_cfg.get("raise_on_multiple_rules", True):
            raise ValueError(msg)
        else:
            logger.critical(msg)
            logger.critical(
                "\n"
                "This should lead to a program crash! Exception due to:\n"
                ">> pymor_cfg['raise_on_multiple_rules'] = False <<\n"
            )
            logger.warning("Returning the first match.")
    return matches[0]
# FIXME: This needs a better name...
[docs]
def _rules_expand_drvs(self):
    """
    Replace rules that carry several data request variables by their clones.

    A rule with exactly one data request variable is kept unchanged. Any
    other rule is replaced by the result of ``rule.expand_drvs()``. A clone
    that has a ``table_id`` (or otherwise a ``table_name``) attribute is
    kept once for every one of its data request variables whose table id
    is listed there; a clone with neither attribute is always kept.
    ``self.rules`` is replaced by the resulting list.
    """
    expanded = []
    for parent_rule in self.rules:
        if len(parent_rule.data_request_variables) == 1:
            expanded.append(parent_rule)
            continue
        for rule in parent_rule.expand_drvs():
            # Rule has a table_id or a table_name, so it should only
            # match that table
            if hasattr(rule, "table_id"):
                if isinstance(rule.table_id, str):
                    rule.table_id = [rule.table_id]
                logger.info(f"Specified table_id as {rule.table_id=}")
                for drv in rule.data_request_variables:
                    if drv.table_header.table_id not in rule.table_id:
                        continue
                    logger.info(f"Adding rule/table combo for {drv}")
                    expanded.append(rule)
            elif hasattr(rule, "table_name"):
                if isinstance(rule.table_name, str):
                    rule.table_name = [rule.table_name]
                logger.info(f"Specified table_name as {rule.table_name=}")
                for drv in rule.data_request_variables:
                    if drv.table_header.table_id not in rule.table_name:
                        continue
                    logger.info(f"Adding rule/table combo for {drv}")
                    expanded.append(rule)
            else:
                expanded.append(rule)
    self.rules = expanded
[docs]
def _rules_depluralize_drvs(self):
"""Ensures that only one data request variable is assigned to each rule"""
for rule in self.rules:
rule.depluralize_drvs()
[docs]
def _post_init_create_pipelines(self):
    """
    Turn the entries of ``self.pipelines`` into ``Pipeline`` objects.

    ``Pipeline`` instances are kept as they are. Dictionaries receive a
    ``workflow_backend`` entry (defaulting to the
    ``pipeline_workflow_orchestrator`` setting) and are converted with
    ``Pipeline.from_dict``; if a cluster exists it is assigned to the
    newly created pipeline.

    Raises
    ------
    ValueError
        If an entry is neither a ``Pipeline`` nor a ``dict``.
    """
    pipelines = []
    for p in self.pipelines:
        if isinstance(p, Pipeline):
            pipelines.append(p)
        elif isinstance(p, dict):
            p["workflow_backend"] = p.get(
                "workflow_backend",
                self._pymor_cfg("pipeline_workflow_orchestrator"),
            )
            pl = Pipeline.from_dict(p)
            if self._cluster is not None:
                pl.assign_cluster(self._cluster)
            # Keep the instance that received the cluster. Building a second
            # pipeline from the dict here would drop the assignment again.
            pipelines.append(pl)
        else:
            raise ValueError(f"Invalid pipeline configuration for {p}")
    self.pipelines = pipelines
[docs]
def _post_init_create_rules(self):
    """
    Turn the entries of ``self.rules`` into ``Rule`` objects.

    ``Rule`` instances are kept, dictionaries go through ``Rule.from_dict``.
    Afterwards the inherited attributes and a copy of the configuration are
    attached to every rule.

    Raises
    ------
    TypeError
        If an entry is neither a ``Rule`` nor a ``dict``.
    """
    built_rules = []
    for entry in self.rules:
        if isinstance(entry, Rule):
            built_rules.append(entry)
            continue
        if not isinstance(entry, dict):
            raise TypeError("rule must be an instance of Rule or dict")
        built_rules.append(Rule.from_dict(entry))
    self.rules = built_rules
    self._post_init_inherit_rules()
    self._post_init_attach_pymor_config_rules()
[docs]
def _post_init_attach_pymor_config_rules(self):
for rule in self.rules:
# NOTE(PG): **COPY** (don't assign) the configuration to the rule
rule._pycmor_cfg = copy.deepcopy(self._pycmor_cfg)
rule._pymor_cfg = rule._pycmor_cfg # For backward compatibility
[docs]
def _post_init_inherit_rules(self):
for rule_attr, rule_value in self._inherit_cfg.items():
for rule in self.rules:
rule.set(rule_attr, rule_value)
[docs]
def validate(self):
    """
    Run the currently enabled sanity checks on the rules.

    Only the unit check is active at the moment.
    """
    # Checks that are not wired in (yet):
    #   * self._check_rules_for_table()       -- :PS: @PG not defined yet
    #   * self._check_rules_for_output_dir()  -- :PS: @PG not defined yet
    #   * self._check_is_subperiod()
    #       FIXME(PS): Turned off, see GH #59 (https://tinyurl.com/3z7d8uuy)
    logger.debug("Starting validate....")
    self._check_units()
    logger.debug("...done!")
[docs]
def _check_is_subperiod(self):
    """
    Check that each input's frequency is a sub-period of the table frequency.

    The table frequency is derived from the ``approx_interval`` of the
    rule's table header. The data frequency comes from the input collection
    or, if that is ``None``, from the first file of the collection. When a
    collection has neither, the remaining collections of that rule are
    skipped.

    Raises
    ------
    ValueError
        The first mismatch found, after all mismatches have been logged.
    """
    logger.info("checking frequency in netcdf file and in table...")
    problems = []
    for rule in self.rules:
        approx_interval = rule.data_request_variable.table_header.approx_interval
        # is_subperiod from pandas does not support YE or ME notation
        table_freq = _frequency_from_approx_interval(approx_interval).rstrip("E")
        for collection in rule.inputs:
            data_freq = collection.frequency
            if data_freq is None:
                if not collection.files:
                    logger.info("No. input files found. Skipping frequency check.")
                    break
                data_freq = fc.get(collection.files[0]).freq
            if not pd.tseries.frequencies.is_subperiod(data_freq, table_freq):
                problems.append(
                    ValueError(
                        f"Freq in source file {data_freq} is not a subperiod of freq in table {table_freq}."
                    ),
                )
            logger.info(
                f"Frequency of data {data_freq}. Frequency in tables {table_freq}"
            )
    if problems:
        for problem in problems:
            logger.error(problem)
        raise problems[0]
[docs]
def _check_units(self):
    """
    Check that model units and CMOR units are compatible for every rule.

    For the first file of each input collection the model unit (the rule's
    ``model_unit`` or the unit recorded in the file cache) is compared with
    the unit of the rule's data request variable:

    * a missing model unit requires a dimensionless CMOR unit (a scalar
      equal to 1, or ``%``);
    * a dimensionless CMOR unit combined with a non-scalar model unit
      requires an entry in the rule's ``dimensionless_unit_mappings``.

    A collection without files ends the check for that rule.

    Raises
    ------
    ValueError
        The first problem found, after all problems have been logged.
    """
    # TODO (MA): This function needs to be cleaned up if it needs to stay
    # but it will probably be removed soon if we do the validation checks
    # via dryruns of the steps.

    def is_unit_scalar(value):
        """Return True if ``value`` converts to the number 1."""
        if value is None:
            return False
        try:
            x = float(value)
        except (TypeError, ValueError):
            # TypeError covers values that are neither strings nor numbers
            return False
        return (x - 1) == 0

    errors = []
    for rule in self.rules:
        for input_collection in rule.inputs:
            try:
                filename = input_collection.files[0]
            except IndexError:
                break
            model_unit = rule.get("model_unit") or fc.get(filename).units
            cmor_unit = rule.data_request_variable.units
            # NOTE(review): this reads from ``data_request_variables`` (plural)
            # while the line above uses the singular attribute -- confirm
            # which one is intended.
            cmor_variable = rule.data_request_variables.get("cmor_variable")
            if model_unit is None:
                if not (is_unit_scalar(cmor_unit) or cmor_unit == "%"):
                    errors.append(
                        ValueError(
                            f"dimensionless variables must have dimensionless units ({model_unit}  {cmor_unit})"
                        )
                    )
            if is_unit_scalar(cmor_unit):
                if not is_unit_scalar(model_unit):
                    dimless = rule.get("dimensionless_unit_mappings", {})
                    if cmor_unit not in dimless.get(cmor_variable, {}):
                        # Must be an exception instance: ``raise errors[0]``
                        # below fails with TypeError on a plain string.
                        errors.append(
                            ValueError(
                                f"Missing mapping for dimensionless variable {cmor_variable}"
                            )
                        )
    if errors:
        for err in errors:
            logger.error(err)
        raise errors[0]
[docs]
@classmethod
def from_dict(cls, data):
    """
    Build a ``CMORizer`` from a single configuration dictionary.

    The ``general``, ``rules`` and ``pipelines`` sections are validated
    when present. The instance is created from the ``pycmor`` (or legacy
    ``pymor``), ``general``, ``distributed``, ``jobqueue`` and ``inherit``
    sections; rules and pipelines are added afterwards and the rule
    related post-init steps are run again.

    Parameters
    ----------
    data : dict
        The complete configuration.

    Returns
    -------
    CMORizer

    Raises
    ------
    ValueError
        If one of the validated sections does not pass its validator.
    """
    if "general" in data and not GENERAL_VALIDATOR.validate(
        {"general": data["general"]}
    ):
        raise ValueError(GENERAL_VALIDATOR.errors)
    # Use pycmor config if available, otherwise fall back to pymor for backward compatibility
    config_section = data.get("pycmor", data.get("pymor", {}))
    dask_sections = {
        "distributed": data.get("distributed", {}),
        "jobqueue": data.get("jobqueue", {}),
    }
    instance = cls(
        pycmor_cfg=config_section,
        general_cfg=data.get("general", {}),
        dask_cfg=dask_sections,
        inherit_cfg=data.get("inherit", {}),
    )

    if "rules" in data and not RULES_VALIDATOR.validate({"rules": data["rules"]}):
        raise ValueError(RULES_VALIDATOR.errors)
    for rule_spec in data.get("rules", []):
        instance.add_rule(Rule.from_dict(rule_spec))
    instance._post_init_attach_pymor_config_rules()
    instance._post_init_inherit_rules()

    if "pipelines" in data and not PIPELINES_VALIDATOR.validate(
        {"pipelines": data["pipelines"]}
    ):
        raise ValueError(PIPELINES_VALIDATOR.errors)
    for pipeline_spec in data.get("pipelines", []):
        default_backend = instance._pymor_cfg("pipeline_workflow_orchestrator")
        pipeline_spec["workflow_backend"] = pipeline_spec.get(
            "workflow_backend", default_backend
        )
        instance.add_pipeline(Pipeline.from_dict(pipeline_spec))

    # Rules were added after construction, so the rule-related steps run again
    instance._post_init_populate_rules_with_tables()
    instance._post_init_create_data_request()
    instance._post_init_populate_rules_with_data_request_variables()
    instance._post_init_populate_rules_with_dimensionless_unit_mappings()
    instance._post_init_populate_rules_with_aux_files()
    instance._post_init_populate_rules_with_controlled_vocabularies()
    instance._post_init_create_global_attributes_on_rules()
    logger.debug("Object creation done!")
    return instance
[docs]
def add_rule(self, rule):
    """
    Append ``rule`` to ``self.rules``.

    Raises
    ------
    TypeError
        If ``rule`` is not a ``Rule`` instance.
    """
    if isinstance(rule, Rule):
        self.rules.append(rule)
        return
    raise TypeError("rule must be an instance of Rule")
[docs]
def add_pipeline(self, pipeline):
    """
    Append ``pipeline`` to ``self.pipelines``.

    If a cluster exists it is assigned to the pipeline first.

    Raises
    ------
    TypeError
        If ``pipeline`` is not a ``Pipeline`` instance.
    """
    if not isinstance(pipeline, Pipeline):
        raise TypeError("pipeline must be an instance of Pipeline")
    cluster = self._cluster
    if cluster is not None:
        pipeline.assign_cluster(cluster)
    self.pipelines.append(pipeline)
[docs]
def _rule_for_filepath(self, filepath):
filepath = str(filepath)
matching_rules = []
for rule in self.rules:
for pattern in rule.input_patterns:
if pattern.match(filepath):
matching_rules.append(rule)
return matching_rules
[docs]
def _rule_for_cmor_variable(self, cmor_variable):
    """
    Return all rules whose ``cmor_variable`` equals the given one.

    The number of rules found is logged at debug level.
    """
    found = [rule for rule in self.rules if rule.cmor_variable == cmor_variable]
    logger.debug(f"Found {len(found)} rules to apply for {cmor_variable}")
    return found
[docs]
def check_rules_for_table(self, table_name):
    """
    Warn about variables of a table that no rule covers.

    Parameters
    ----------
    table_name : str
        Key into ``self._cmor_tables``.

    Raises
    ------
    ValueError
        If a variable has no rule and ``raise_on_no_rule`` is enabled.
    """
    # NOTE(review): ``_cmor_tables`` is not assigned anywhere in the visible
    # part of this class -- confirm where it is expected to come from.
    uncovered = []
    for cmor_variable in self._cmor_tables[table_name]["variable_entry"]:
        if self._rule_for_cmor_variable(cmor_variable) != []:
            continue
        if self._pymor_cfg.get("raise_on_no_rule", False):
            raise ValueError(f"No rule found for {cmor_variable}")
        # FIXME(PG): This should be handled by the logger automatically
        if self._pymor_cfg.get("warn_on_no_rule", True) and not self._pymor_cfg.get(
            "quiet", True
        ):
            logger.warning(f"No rule found for {cmor_variable}")
        uncovered.append(cmor_variable)
    if uncovered:
        logger.warning("This CMORizer may be incomplete or badly configured!")
        logger.warning(f"Missing rules for >> {len(uncovered)} << variables.")
[docs]
def check_rules_for_output_dir(self, output_dir):
all_files_in_output_dir = [f for f in Path(output_dir).iterdir()]
for rule in self.rules:
# Remove files from list when matching a rule
for filepath in all_files_in_output_dir:
if self._rule_for_filepath(filepath):
all_files_in_output_dir.remove(filepath)
if all_files_in_output_dir:
logger.warning("This CMORizer may be incomplete or badly configured!")
logger.warning(
f"Found >> {len(all_files_in_output_dir)} << files in output dir not matching any rule."
)
if questionary.confirm("Do you want to view these files?").ask():
for filepath in all_files_in_output_dir:
logger.warning(filepath)
[docs]
def process(self, parallel=None):
    """
    Match pipelines to rules and run all rules.

    Parameters
    ----------
    parallel : bool, optional
        Run in parallel if true, serially if false. ``None`` (the default)
        defers to the ``parallel`` setting, which itself defaults to true.

    Returns
    -------
    object
        Whatever ``parallel_process`` or ``serial_process`` returns.
    """
    logger.debug("Process start!")
    self._match_pipelines_in_rules()
    run_parallel = (
        self._pymor_cfg.get("parallel", True) if parallel is None else parallel
    )
    if not run_parallel:
        return self.serial_process()
    logger.debug("Parallel processing...")
    # FIXME(PG): This is mixed up, hard-coding to prefect for now...
    chosen_backend = self._pymor_cfg.get("pipeline_orchestrator", "prefect")
    logger.debug(f"...with {chosen_backend}...")
    return self.parallel_process(backend=chosen_backend)
[docs]
def parallel_process(self, backend="prefect"):
if backend == "prefect":
logger.debug("About to submit _parallel_process_prefect()")
return self._parallel_process_prefect()
elif backend == "dask":
return self._parallel_process_dask()
else:
raise ValueError("Unknown backend for parallel processing")
[docs]
def _parallel_process_prefect(self):
    """
    Submit every rule as a Prefect task inside a dynamically defined flow.

    Returns
    -------
    list
        The objects returned by ``_process_rule.submit`` for each rule,
        after ``wait`` has been called on them.
    """
    # prefect_logger = get_run_logger()
    # logger = prefect_logger
    # @flow(task_runner=DaskTaskRunner(address=self._cluster.scheduler_address))
    logger.debug("Defining dynamically generated prefect workflow...")

    @flow(name="CMORizer Process")
    def dynamic_flow():
        submitted = [self._process_rule.submit(rule) for rule in self.rules]
        wait(submitted)
        return submitted

    logger.debug("...done!")
    logger.debug("About to return dynamic_flow()...")
    # We encapsulate the flow in a context manager to ensure that the
    # Dask cluster is available in the singleton, which could be used
    # during unpickling to reattach it to a Pipeline.
    with DaskContext.set_cluster(self._cluster):
        return dynamic_flow()
[docs]
def _parallel_process_dask(self, external_client=None):
    """
    Submit every rule to a Dask client and gather the results.

    Parameters
    ----------
    external_client : optional
        Client to use. If falsy, a new ``Client`` is created for
        ``self._cluster``.

    Returns
    -------
    list or None
        The gathered results, or ``None`` if ``wait_for_workers`` reported
        failure (an error is logged in that case).
    """
    if external_client:
        client = external_client
    else:
        # The cluster must be passed as the first (``address``) argument.
        # Unknown keyword arguments such as ``cluster=`` are forwarded to a
        # newly created LocalCluster instead of connecting to ours.
        client = Client(self._cluster)
    if wait_for_workers(client, 1):
        futures = [client.submit(self._process_rule, rule) for rule in self.rules]
        results = client.gather(futures)
        logger.success("Processing completed.")
        return results
    logger.error("Timeout reached waiting for dask cluster, sorry...")
    return None
[docs]
def serial_process(self):
    """
    Process the rules one after another, showing a progress bar.

    Returns
    -------
    dict
        Result of ``_process_rule`` for each rule, keyed by ``rule.name``.
    """
    results_by_name = {}
    for current_rule in track(self.rules, description="Processing rules"):
        outcome = self._process_rule(current_rule)
        results_by_name[current_rule.name] = outcome
    logger.success("Processing completed.")
    return results_by_name
[docs]
@flow
def check_prefect(self):
    """Run ``_caching_check`` and log a critical message if it raises."""
    run_logger = get_run_logger()
    try:
        self._caching_check()
    except Exception:
        run_logger.critical("Problem with caching in Prefect detected...")
[docs]
@flow
def _caching_check(self):
    """
    Checks if workflows are possible to be cached.

    Returns
    -------
    dict
        Result of ``_caching_single_rule`` for each rule, keyed by ``rule.name``.
    """
    cached_by_name = {}
    for current_rule in self.rules:
        # del rule._pymor_cfg
        # del rule.data_request_variable
        outcome = self._caching_single_rule(current_rule)
        cached_by_name[current_rule.name] = outcome
    return cached_by_name
[docs]
@staticmethod
@task
def _caching_single_rule(rule):
    """Log the attempt and return a short string naming the rule."""
    logger.info(f"Starting to try caching on {rule}")
    return f"Cached call of {rule.name}"
[docs]
@staticmethod
@task(name="Process rule")
def _process_rule(rule):
    """
    Run all pipelines of ``rule`` in order, feeding each one the previous result.

    Returns
    -------
    object
        The result of the last pipeline, or ``None`` if the rule has no
        pipelines (an error is logged in that case).
    """
    logger.info(f"Starting to process rule {rule}")
    if len(rule.pipelines) <= 0:
        logger.error("No pipeline defined, something is wrong!")
    result = None
    for stage in rule.pipelines:
        logger.info(f"Running {str(stage)}")
        result = stage.run(result, rule)
    return result
[docs]
def _post_init_create_global_attributes_on_rules(self):
    """
    Let every rule create its global attributes.

    The ``GlobalAttributes`` class matching ``self.cmor_version`` is passed
    to ``rule.create_global_attributes``.
    """
    attributes_cls = create_factory(GlobalAttributes).get(self.cmor_version)
    for current_rule in self.rules:
        current_rule.create_global_attributes(attributes_cls)