Source code for pycmor.core.calendar

"""
Yet another calendar implementation.

This module provides functions for creating date ranges.

The main components of this module are:

- ``year_bounds_major_digits``: generates a list of year ranges (bounds) where each range starts with a specific digit.
- ``date_ranges_from_bounds``: creates a list of date indexes from bounds
- ``date_ranges_from_year_bounds``: creates a list of date indexes from year bounds
- ``simple_ranges_from_bounds``: creates a list of simple ranges from bounds

Examples
--------
>>> year_bounds = year_bounds_major_digits(2000, 2010, 2, 2)
>>> print(year_bounds)
[[2000, 2001], [2002, 2003], [2004, 2005], [2006, 2007], [2008, 2009], [2010, 2010]]
"""

import pendulum
import xarray as xr

from .logging import logger


def year_bounds_major_digits(first, last, step, binning_digit, return_type=int):
    """
    Split the years ``first``..``last`` into bins aligned to a ones digit.

    The years are walked forward from ``first`` until a year whose ones digit
    equals ``binning_digit`` is found. Everything before that year forms a
    (possibly short) leading bin. From the aligned year onwards, bins of
    ``step`` years are emitted; the final bin is clipped to ``last``.

    Parameters
    ----------
    first : int
        The first year in the range.
    last : int
        The last year in the range (inclusive).
    step : int
        The number of years per regular bin. A value below 1 produces a
        single bin running from the aligned year up to ``last``.
    binning_digit : int
        The ones digit (0-9) that the start of every regular bin must have.
    return_type : type, optional
        The type of the elements in the returned list, either ``int`` or
        ``pendulum.DateTime``. Defaults to ``int``. With
        ``pendulum.DateTime`` every bound (start *and* end) is converted to
        January 1st of the respective year.

    Returns
    -------
    list
        A list of ``[start, end]`` lists with inclusive bounds. The list is
        empty when ``first > last``.

    Raises
    ------
    ValueError
        If ``binning_digit`` is not a digit (negative, or 10 and above), or
        if ``return_type`` is neither ``int`` nor ``pendulum.DateTime``.

    Examples
    --------
    >>> year_bounds_major_digits(2000, 2010, 2, 2)
    [[2000, 2001], [2002, 2003], [2004, 2005], [2006, 2007], [2008, 2009], [2010, 2010]]

    >>> year_bounds_major_digits(2000, 2010, 3, 3)
    [[2000, 2002], [2003, 2005], [2006, 2008], [2009, 2010]]

    >>> year_bounds_major_digits(2700, 2720, 10, 0)
    [[2700, 2709], [2710, 2719], [2720, 2720]]

    Notes
    -----
    Two edge cases are handled explicitly:

    - If ``first`` already ends with ``binning_digit`` there is no leading
      bin at all; no inverted ``[first, first - 1]`` bound is emitted.
    - If no year between ``first`` and ``last`` ends with ``binning_digit``,
      the whole span is returned as one bin ``[first, last]`` instead of
      being dropped.
    """
    logger.debug(
        f"Running year_bounds_major_digits({first=}, {last=}, {step=}, {binning_digit=})"
    )
    if binning_digit >= 10:
        raise ValueError("Give a binning_digit less than 10")
    if binning_digit < 0:
        # ``year % 10`` is never negative, so such a digit could never match.
        raise ValueError("Give a non-negative binning_digit")

    bounds = []

    # Phase 1: find the first year (not beyond ``last``) that ends with the
    # binning digit. This takes at most ten steps for a valid digit.
    aligned_start = first
    while aligned_start <= last and aligned_start % 10 != binning_digit:
        aligned_start += 1

    if aligned_start > first:
        # Years before the aligned start form the leading bin. ``min`` clips
        # it to ``last`` when the binning digit was never reached.
        bin_start = first
        bin_end = min(aligned_start - 1, last)
        bounds.append([bin_start, bin_end])
        logger.debug(f"Appending bounds {bin_start=}, {bin_end=}")

    # Phase 2: regular bins of ``step`` years, the last one clipped to ``last``.
    bin_start = aligned_start
    while bin_start <= last:
        bin_end = bin_start + step - 1
        if step < 1 or bin_end > last:
            # A non-positive step cannot advance; emit one bin up to ``last``.
            bin_end = last
        bounds.append([bin_start, bin_end])
        logger.debug(f"Appending bounds {bin_start=}, {bin_end=}")
        bin_start = bin_end + 1

    if return_type is int:
        return [[int(year) for year in bound] for bound in bounds]
    elif return_type is pendulum.DateTime:
        return [
            [pendulum.datetime(int(year), 1, 1) for year in bound]
            for bound in bounds
        ]
    else:
        raise ValueError("return_type must be either int or pendulum.DateTime")
def date_ranges_from_bounds(bounds, freq: str = "M", **kwargs):
    """
    Build one date index per ``(start, end)`` pair using ``xr.date_range``.

    Parameters
    ----------
    bounds : iterable of 2-item sequences
        Every item is unpacked into a start and an end bound, which are
        handed to ``xr.date_range`` as ``start`` and ``end``.
    freq : str, optional
        The frequency passed on to ``xr.date_range``. Defaults to ``"M"``.
    **kwargs :
        Additional keyword arguments forwarded unchanged to
        ``xr.date_range``.

    Returns
    -------
    object or tuple
        If exactly one bound was given, the single ``xr.date_range`` result
        itself (not wrapped). Otherwise a tuple holding one result per
        bound, in input order; this is an empty tuple for empty ``bounds``.
    """
    indexes = tuple(
        xr.date_range(start=lower, end=upper, freq=freq, **kwargs)
        for lower, upper in bounds
    )
    # A lone index is returned bare for the caller's convenience.
    return indexes[0] if len(indexes) == 1 else indexes
def date_ranges_from_year_bounds(year_bounds, freq: str = "M", **kwargs):
    """
    Build date indexes covering whole years for each ``(start, end)`` year pair.

    Each pair of years is expanded to January 1st of the start year and
    December 31st of the end year (as ``pendulum.datetime`` objects), and the
    resulting bounds are delegated to ``date_ranges_from_bounds``.

    Parameters
    ----------
    year_bounds : list of lists or tuples
        A list of 2-item sequences, each containing a start and an end year.
    freq : str, optional
        The frequency passed on to ``date_ranges_from_bounds``. Defaults to
        ``"M"``.
    **kwargs :
        Additional keyword arguments forwarded to ``date_ranges_from_bounds``.

    Returns
    -------
    object or tuple
        Whatever ``date_ranges_from_bounds`` returns for the expanded bounds.
    """

    def _whole_years(first_year, last_year):
        # Span from the first day of the first year to the last day of the last.
        return (
            pendulum.datetime(first_year, 1, 1),
            pendulum.datetime(last_year, 12, 31),
        )

    expanded = [_whole_years(first_year, last_year) for first_year, last_year in year_bounds]
    return date_ranges_from_bounds(expanded, freq, **kwargs)
def simple_ranges_from_bounds(bounds):
    """
    Turn inclusive ``(start, end)`` bounds into ``range`` objects.

    Parameters
    ----------
    bounds : sequence of 2-item sequences
        Each item holds an inclusive start and end value.

    Returns
    -------
    range or list of range
        A bare ``range(start, end + 1)`` when ``bounds`` has exactly one
        item; otherwise a list with one such range per item (an empty list
        for empty ``bounds``).
    """
    if len(bounds) != 1:
        return [range(lower, upper + 1) for lower, upper in bounds]
    # Exactly one bound: hand back the range itself rather than a 1-item list.
    lower, upper = bounds[0]
    return range(lower, upper + 1)
def assign_time_axis(da: xr.DataArray, taxis):
    """
    Attach ``taxis`` to ``da`` as its ``time`` coordinate.

    Parameters
    ----------
    da : xr.DataArray
        The array to receive the time coordinate.
    taxis :
        The values passed to ``assign_coords`` under the name ``time``.

    Returns
    -------
    The result of ``da.assign_coords(time=taxis)``.
    """
    time_coord = {"time": taxis}
    return da.assign_coords(**time_coord)