1"""
2masked_accumulations.py is for accumulation algorithms using a mask-based approach
3for missing values.
4"""
5
6from __future__ import annotations
7
8from typing import Callable
9
10import numpy as np
11
12from pandas._typing import npt
13
14from pandas.core.dtypes.common import (
15 is_bool_dtype,
16 is_float_dtype,
17 is_integer_dtype,
18)
19
20
def _cum_func(
    func: Callable,
    values: np.ndarray,
    mask: npt.NDArray[np.bool_],
    *,
    skipna: bool = True,
):
    """
    Accumulations for 1D masked array.

    We will modify values in place to replace NAs with the appropriate fill
    value, i.e. a value that leaves the running result of ``func`` unchanged.

    Parameters
    ----------
    func : np.cumsum, np.cumprod, np.maximum.accumulate, np.minimum.accumulate
    values : np.ndarray
        Numpy array with the values (can be of any dtype that support the
        operation). Entries where ``mask`` is True are overwritten in place.
    mask : np.ndarray
        Boolean numpy array (True values indicate missing values).
    skipna : bool, default True
        Whether to skip NA. If False, every position at or after the first
        missing value is marked as missing in the returned mask.

    Returns
    -------
    tuple of (np.ndarray, np.ndarray)
        The result of ``func(values)`` and the mask that goes with it.

    Raises
    ------
    NotImplementedError
        If the dtype of ``values`` is not float, integer or bool, or if
        ``func`` is not one of the supported accumulations.
    """
    lowest: float | int
    highest: float | int
    if is_float_dtype(values):
        # Use the infinities, not np.finfo(...).min/max: the finfo extremes
        # are finite, so a genuine -inf (cummax) or +inf (cummin) located at
        # or after a masked slot would otherwise be replaced by the fill
        # value in an unmasked position of the result.
        lowest, highest = -np.inf, np.inf
    elif is_integer_dtype(values):
        int_info = np.iinfo(values.dtype.type)
        lowest, highest = int_info.min, int_info.max
    elif is_bool_dtype(values):
        # Max value of bool is 1, but since we are setting into a boolean
        # array, 255 is fine as well. Min value has to be 0 when setting
        # into the boolean array.
        uint8_info = np.iinfo(np.uint8)
        lowest, highest = uint8_info.min, uint8_info.max
    else:
        raise NotImplementedError(
            f"No masked accumulation defined for dtype {values.dtype.type}"
        )

    # Identity element of each supported accumulation.
    fill_values = {
        np.cumsum: 0,
        np.cumprod: 1,
        np.maximum.accumulate: lowest,
        np.minimum.accumulate: highest,
    }
    try:
        fill_value = fill_values[func]
    except KeyError:
        # The KeyError is an implementation detail of the lookup; do not
        # chain it into the user-facing error.
        raise NotImplementedError(
            f"No accumulation for {func} implemented on BaseMaskedArray"
        ) from None

    values[mask] = fill_value

    if not skipna:
        # Once a missing value has been seen, all later results are missing.
        mask = np.maximum.accumulate(mask)

    values = func(values)
    return values, mask
77
78
def cumsum(values: np.ndarray, mask: npt.NDArray[np.bool_], *, skipna: bool = True):
    """
    Cumulative sum of a 1D masked array.

    Delegates to ``_cum_func`` with ``np.cumsum``. Note that ``values`` is
    modified in place at the positions where ``mask`` is True.

    Parameters
    ----------
    values : np.ndarray
        Numpy array with the values.
    mask : np.ndarray
        Boolean numpy array (True values indicate missing values).
    skipna : bool, default True
        Whether to skip NA.

    Returns
    -------
    tuple
        The accumulated values and the corresponding mask.
    """
    accumulate = np.cumsum
    return _cum_func(accumulate, values, mask, skipna=skipna)
81
82
def cumprod(values: np.ndarray, mask: npt.NDArray[np.bool_], *, skipna: bool = True):
    """
    Cumulative product of a 1D masked array.

    Delegates to ``_cum_func`` with ``np.cumprod``. Note that ``values`` is
    modified in place at the positions where ``mask`` is True.

    Parameters
    ----------
    values : np.ndarray
        Numpy array with the values.
    mask : np.ndarray
        Boolean numpy array (True values indicate missing values).
    skipna : bool, default True
        Whether to skip NA.

    Returns
    -------
    tuple
        The accumulated values and the corresponding mask.
    """
    accumulate = np.cumprod
    return _cum_func(accumulate, values, mask, skipna=skipna)
85
86
def cummin(values: np.ndarray, mask: npt.NDArray[np.bool_], *, skipna: bool = True):
    """
    Cumulative minimum of a 1D masked array.

    Delegates to ``_cum_func`` with ``np.minimum.accumulate``. Note that
    ``values`` is modified in place at the positions where ``mask`` is True.

    Parameters
    ----------
    values : np.ndarray
        Numpy array with the values.
    mask : np.ndarray
        Boolean numpy array (True values indicate missing values).
    skipna : bool, default True
        Whether to skip NA.

    Returns
    -------
    tuple
        The accumulated values and the corresponding mask.
    """
    accumulate = np.minimum.accumulate
    return _cum_func(accumulate, values, mask, skipna=skipna)
89
90
def cummax(values: np.ndarray, mask: npt.NDArray[np.bool_], *, skipna: bool = True):
    """
    Cumulative maximum of a 1D masked array.

    Delegates to ``_cum_func`` with ``np.maximum.accumulate``. Note that
    ``values`` is modified in place at the positions where ``mask`` is True.

    Parameters
    ----------
    values : np.ndarray
        Numpy array with the values.
    mask : np.ndarray
        Boolean numpy array (True values indicate missing values).
    skipna : bool, default True
        Whether to skip NA.

    Returns
    -------
    tuple
        The accumulated values and the corresponding mask.
    """
    accumulate = np.maximum.accumulate
    return _cum_func(accumulate, values, mask, skipna=skipna)