import os
import sys
from importlib import resources
from typing import List
import pkg_resources
import rich_click as click
import yaml
from click_loguru import ClickLoguru
from dask.distributed import Client
from rich.traceback import install as rich_traceback_install
from streamlit.web import cli as stcli
from . import _version
from .core import caching
from .core.cmorizer import CMORizer
from .core.filecache import fc
from .core.logging import add_report_logger, logger
from .core.ssh_tunnel import ssh_tunnel_cli
from .core.validate import GENERAL_VALIDATOR, PIPELINES_VALIDATOR, RULES_VALIDATOR
from .dev import utils as dev_utils
from .fesom_1p4.nodes_to_levels import convert
from .scripts.update_dimensionless_mappings import update_dimensionless_mappings
MAX_FRAMES = int(
os.environ.get(
"PYCMOR_ERROR_MAX_FRAMES", os.environ.get("PYMOR_ERROR_MAX_FRAMES", 3)
)
)
"""
str: The maximum number of frames to show in the traceback if there is an error. Default to 3
"""
# install rich traceback
rich_traceback_install(show_locals=True, max_frames=MAX_FRAMES)
VERSION = _version.get_versions()["version"]
# global constants
LOG_FILE_RETENTION = 3
NAME = "pycmor"
# define the CLI
click_loguru = ClickLoguru(
NAME,
VERSION,
retention=LOG_FILE_RETENTION,
# log_dir_parent="tests/data/logs",
timer_log_level="info",
)
# FIXME(PG): Doesn't work as intended :-(
[docs]
def pymor_cli_group(func):
"""
Decorator to add the click_loguru logging options to a click group
"""
func = click_loguru.logging_options(func)
func = click.group()(func)
func = click_loguru.stash_subcommand()(func)
func = click.version_option(
version=VERSION, prog_name="PyCMOR - Makes CMOR Simple"
)(func)
return func
[docs]
def find_subcommands():
"""
Finds CLI Subcommands for installed plugins in both legacy and new groups.
"""
groups = ["pycmor.cli_subcommands", "pymor.cli_subcommands"]
discovered_subcommands = {}
for group in groups:
for entry_point in pkg_resources.iter_entry_points(group):
discovered_subcommands[entry_point.name] = {
"plugin_name": entry_point.module_name.split(".")[0],
"callable": entry_point.load(),
}
return discovered_subcommands
@click_loguru.logging_options
@click.group(name="pycmor", help="PyCMOR - Makes CMOR Simple")
@click_loguru.stash_subcommand()
@click.version_option(version=VERSION, prog_name=NAME)
def cli(verbose, quiet, logfile, profile_mem):
return 0
################################################################################
################################################################################
################################################################################
################################################################################
# Direct Commands
################################################################################
@cli.command()
@click_loguru.init_logger()
@click.argument("config_file", type=click.Path(exists=True))
def process(config_file):
# NOTE(PG): The ``init_logger`` decorator above removes *ALL* previously configured loggers,
# so we need to re-create the report logger here. Paul does not like this at all.
add_report_logger()
logger.info(f"Processing {config_file}")
with open(config_file, "r") as f:
cfg = yaml.safe_load(f)
cmorizer = CMORizer.from_dict(cfg)
client = Client(cmorizer._cluster) # noqa: F841
cmorizer.process()
@cli.command()
@click_loguru.init_logger()
@click.argument("config_file", type=click.Path(exists=True))
def prefect_check(config_file):
add_report_logger()
logger.info(f"Checking prefect with dummy flow using {config_file}")
with open(config_file, "r") as f:
cfg = yaml.safe_load(f)
cmorizer = CMORizer.from_dict(cfg)
client = Client(cmorizer._cluster) # noqa: F841
cmorizer.check_prefect()
@cli.command()
@click_loguru.init_logger()
def table_explorer():
logger.info("Launching table explorer...")
try:
with resources.path("pycmor", "webapp.py") as webapp_path:
sys.argv = ["streamlit", "run", str(webapp_path)]
stcli.main()
return
except Exception:
pass
with resources.path("pymor", "webapp.py") as webapp_path:
sys.argv = ["streamlit", "run", str(webapp_path)]
stcli.main()
################################################################################
# SUBCOMMANDS
################################################################################
@click_loguru.logging_options
@click.group()
@click_loguru.stash_subcommand()
@click.version_option(version=VERSION, prog_name=NAME)
def validate(verbose, quiet, logfile, profile_mem):
return 0
@click_loguru.logging_options
@click.group()
@click_loguru.stash_subcommand()
@click.version_option(version=VERSION, prog_name=NAME)
def develop(verbose, quiet, logfile, profile_mem):
return 0
@click_loguru.logging_options
@click.group()
@click_loguru.stash_subcommand()
@click.version_option(version=VERSION, prog_name=NAME)
def cache(verbose, quiet, logfile, profile_mem):
return 0
@click.group()
def scripts():
"""Various utility scripts for Pycmor."""
return 0
################################################################################
################################################################################
################################################################################
# COMMANDS FOR develop
################################################################################
@develop.command()
@click_loguru.logging_options
@click_loguru.init_logger()
@click.argument("directory", type=click.Path(exists=True))
@click.argument("output_file", type=click.File("w"), required=False, default=None)
def ls(directory, output_file, verbose, quiet, logfile, profile_mem):
yaml_str = dev_utils.ls_to_yaml(directory)
# Append to beginning of output file
if output_file is not None:
output_file.write(f"# Created with: pycmor develop ls {directory}\n")
output_file.write(yaml_str)
return 0
################################################################################
################################################################################
################################################################################
################################################################################
# COMMANDS FOR validate
################################################################################
@validate.command()
@click_loguru.logging_options
@click_loguru.init_logger()
@click.argument("config_file", type=click.Path(exists=True))
def config(config_file, verbose, quiet, logfile, profile_mem):
logger.info(f"Checking if a CMORizer can be built from {config_file}")
with open(config_file, "r") as f:
cfg = yaml.safe_load(f)
if "pipelines" in cfg:
pipelines = cfg["pipelines"]
PIPELINES_VALIDATOR.validate({"pipelines": pipelines})
if "rules" in cfg:
rules = cfg["rules"]
RULES_VALIDATOR.validate({"rules": rules})
if "general" in cfg:
general = cfg["general"]
GENERAL_VALIDATOR.validate({"general": general})
if not any(
[
PIPELINES_VALIDATOR.errors,
RULES_VALIDATOR.errors,
GENERAL_VALIDATOR.errors,
]
):
logger.success(
f"Configuration {config_file} is valid for general settings, rules, and pipelines!"
)
for key, error in {
**GENERAL_VALIDATOR.errors,
**PIPELINES_VALIDATOR.errors,
**RULES_VALIDATOR.errors,
}.items():
logger.error(f"{key}: {error}")
@validate.command()
@click_loguru.logging_options
@click_loguru.init_logger()
@click.argument("config_file", type=click.Path(exists=True))
@click.argument("table_name", type=click.STRING)
def table(config_file, table_name, verbose, quiet, logfile, profile_mem):
logger.info(f"Processing {config_file}")
with open(config_file, "r") as f:
cfg = yaml.safe_load(f)
cmorizer = CMORizer.from_dict(cfg)
cmorizer.check_rules_for_table(table_name)
@validate.command()
@click_loguru.logging_options
@click_loguru.init_logger()
@click.argument("config_file", type=click.Path(exists=True))
@click.argument("output_dir", type=click.STRING)
def directory(config_file, output_dir, verbose, quiet, logfile, profile_mem):
logger.info(f"Processing {config_file}")
with open(config_file, "r") as f:
cfg = yaml.safe_load(f)
cmorizer = CMORizer.from_dict(cfg)
cmorizer.check_rules_for_output_dir(output_dir)
################################################################################
################################################################################
################################################################################
################################################################################
# COMMANDS FOR scripts
################################################################################
@scripts.group()
def fesom1():
pass
fesom1.add_command(convert, name="nodes-to-levels")
# Add scripts commands
scripts.add_command(update_dimensionless_mappings)
################################################################################
################################################################################
################################################################################
################################################################################
# COMMANDS FOR cache
################################################################################
@cache.command()
@click_loguru.logging_options
@click_loguru.init_logger()
@click.argument(
"cache_dir",
default=f"{os.environ['HOME']}/.prefect/storage/",
type=click.Path(exists=True, dir_okay=True),
)
def inspect_prefect_global(cache_dir, verbose, quiet, logfile, profile_mem):
"""Print information about items in Prefect's storage cache"""
logger.info(f"Inspecting Prefect Cache at {cache_dir}")
caching.inspect_cache(cache_dir)
return 0
@cache.command()
@click_loguru.logging_options
@click_loguru.init_logger()
@click.argument(
"result",
type=click.Path(exists=True),
)
def inspect_prefect_result(result, verbose, quiet, logfile, profile_mem):
obj = caching.inspect_result(result)
logger.info(obj)
return 0
@cache.command()
@click_loguru.logging_options
@click.argument("files", type=click.Path(exists=True), nargs=-1)
def populate_cache(files: List, verbose, quiet, logfile, profile_mem):
fc.add_files(files)
fc.save()
################################################################################
################################################################################
################################################################################
################################################################################
# Imported subcommands
################################################################################
cli.add_command(ssh_tunnel_cli, name="ssh-tunnel")
cli.add_command(scripts)
################################################################################
################################################################################
# Defined subcommands
################################################################################
cli.add_command(develop)
cli.add_command(validate)
cli.add_command(cache)
################################################################################
################################################################################
################################################################################
[docs]
def main():
for entry_point_name, entry_point in find_subcommands().items():
cli.add_command(entry_point["callable"], name=entry_point_name)
# Prefer new env var prefix, but keep backward compatibility
cli(auto_envvar_prefix="PYCMOR")
if __name__ == "__main__":
sys.exit(main())