Source code for pycmor.core.cluster

"""
This module contains the functions to manage the Dask cluster.
"""

from contextlib import contextmanager

import dask
from dask.distributed import LocalCluster
from dask_jobqueue import SLURMCluster

from .logging import logger

CLUSTER_MAPPINGS = {
    "local": LocalCluster,
    "slurm": SLURMCluster,
}
CLUSTER_SCALE_SUPPORT = {"local": False, "slurm": True}
CLUSTER_ADAPT_SUPPORT = {"local": False, "slurm": True}






[docs] class DaskContext: """ Global singleton to store the current Dask cluster. This class ensures that there is only one active Dask cluster at any given time. It provides methods to set and retrieve the current cluster. Examples -------- Setting a Dask cluster: >>> from dask.distributed import LocalCluster >>> cluster = LocalCluster() >>> with DaskContext.set_cluster(cluster): ... # Perform operations with the active cluster ... active_cluster = DaskContext.get_cluster() ... print(active_cluster) # Outputs the current cluster LocalCluster(...) Retrieving the current Dask cluster: >>> try: ... active_cluster = DaskContext.get_cluster() ... except RuntimeError as e: ... print(e) No active Dask cluster in context! """ _current_cluster = None
[docs] @classmethod @contextmanager def set_cluster(cls, cluster): logger.debug(f"Setting Dask cluster {cluster=} in context!") cls._current_cluster = cluster try: yield finally: logger.debug("Removing Dask cluster from context!") cls._current_cluster = None
[docs] @classmethod def get_cluster(cls): if cls._current_cluster is None: raise RuntimeError("No active Dask cluster in context!") return cls._current_cluster