1"""
2masked_accumulations.py is for accumulation algorithms using a mask-based approach
3for missing values.
4"""
5
6from __future__ import annotations
7
8from typing import (
9 TYPE_CHECKING,
10 Callable,
11)
12
13import numpy as np
14
15if TYPE_CHECKING:
16 from pandas._typing import npt
17
18
def _cum_func(
    func: Callable,
    values: np.ndarray,
    mask: npt.NDArray[np.bool_],
    *,
    skipna: bool = True,
):
    """
    Accumulations for 1D masked array.

    We will modify values in place to replace NAs with the appropriate fill value.

    Parameters
    ----------
    func : np.cumsum, np.cumprod, np.maximum.accumulate, np.minimum.accumulate
    values : np.ndarray
        Numpy array with the values (can be of any dtype that support the
        operation).
    mask : np.ndarray
        Boolean numpy array (True values indicate missing values).
    skipna : bool, default True
        Whether to skip NA. If False, every position at or after the first
        missing value is marked as missing in the returned mask.

    Returns
    -------
    tuple of (np.ndarray, np.ndarray)
        The accumulated values (a new array produced by ``func``) and the
        mask to pair with them. With ``skipna=True`` the mask is the object
        that was passed in; otherwise it is a new, propagated mask.

    Raises
    ------
    NotImplementedError
        If ``values`` is not of float, integer or boolean dtype, or if
        ``func`` is not one of the supported accumulations.
    """
    kind = values.dtype.kind
    if kind == "f":
        # -inf / +inf are the true identities of max / min for floats.
        # Using finfo.min / finfo.max instead would overwrite a genuine
        # -inf / +inf that follows a missing value.
        lowest, highest = -np.inf, np.inf
    elif kind in "iu":
        int_info = np.iinfo(values.dtype.type)
        lowest, highest = int_info.min, int_info.max
    elif kind == "b":
        # Max value of bool is 1, but since we are setting into a boolean
        # array, 255 is fine as well. Min value has to be 0 when setting
        # into the boolean array.
        bool_info = np.iinfo(np.uint8)
        lowest, highest = bool_info.min, bool_info.max
    else:
        raise NotImplementedError(
            f"No masked accumulation defined for dtype {values.dtype.type}"
        )

    # Identity element of each supported accumulation: writing it into the
    # missing slots leaves the running result of the valid entries untouched.
    identities = {
        np.cumprod: 1,
        np.maximum.accumulate: lowest,
        np.cumsum: 0,
        np.minimum.accumulate: highest,
    }
    try:
        fill_value = identities[func]
    except KeyError as err:
        raise NotImplementedError(
            f"No accumulation for {func} implemented on BaseMaskedArray"
        ) from err

    # In-place write: the caller's array is modified at the masked positions.
    values[mask] = fill_value

    if not skipna:
        # A running maximum over booleans is a running "or": once an NA is
        # seen, all later positions become NA as well.
        mask = np.maximum.accumulate(mask)

    values = func(values)
    return values, mask
75
76
def cumsum(values: np.ndarray, mask: npt.NDArray[np.bool_], *, skipna: bool = True):
    """
    Cumulative sum of a 1D masked array.

    Thin wrapper that forwards to ``_cum_func`` with ``np.cumsum``. Missing
    positions in ``values`` are overwritten in place before accumulating.

    Parameters
    ----------
    values : np.ndarray
        Numpy array with the values.
    mask : np.ndarray
        Boolean numpy array (True values indicate missing values).
    skipna : bool, default True
        Whether to skip NA.

    Returns
    -------
    tuple
        The accumulated values and the accompanying mask.
    """
    summed, new_mask = _cum_func(np.cumsum, values, mask, skipna=skipna)
    return summed, new_mask
79
80
def cumprod(values: np.ndarray, mask: npt.NDArray[np.bool_], *, skipna: bool = True):
    """
    Cumulative product of a 1D masked array.

    Thin wrapper that forwards to ``_cum_func`` with ``np.cumprod``. Missing
    positions in ``values`` are overwritten in place before accumulating.

    Parameters
    ----------
    values : np.ndarray
        Numpy array with the values.
    mask : np.ndarray
        Boolean numpy array (True values indicate missing values).
    skipna : bool, default True
        Whether to skip NA.

    Returns
    -------
    tuple
        The accumulated values and the accompanying mask.
    """
    product, new_mask = _cum_func(np.cumprod, values, mask, skipna=skipna)
    return product, new_mask
83
84
def cummin(values: np.ndarray, mask: npt.NDArray[np.bool_], *, skipna: bool = True):
    """
    Cumulative minimum of a 1D masked array.

    Thin wrapper that forwards to ``_cum_func`` with
    ``np.minimum.accumulate``. Missing positions in ``values`` are
    overwritten in place with a fill value chosen by ``_cum_func``.

    Parameters
    ----------
    values : np.ndarray
        Numpy array with the values.
    mask : np.ndarray
        Boolean numpy array (True values indicate missing values).
    skipna : bool, default True
        Whether to skip NA.

    Returns
    -------
    tuple
        The accumulated values and the accompanying mask.
    """
    running_min = np.minimum.accumulate
    lowest, new_mask = _cum_func(running_min, values, mask, skipna=skipna)
    return lowest, new_mask
87
88
def cummax(values: np.ndarray, mask: npt.NDArray[np.bool_], *, skipna: bool = True):
    """
    Cumulative maximum of a 1D masked array.

    Thin wrapper that forwards to ``_cum_func`` with
    ``np.maximum.accumulate``. Missing positions in ``values`` are
    overwritten in place with a fill value chosen by ``_cum_func``.

    Parameters
    ----------
    values : np.ndarray
        Numpy array with the values.
    mask : np.ndarray
        Boolean numpy array (True values indicate missing values).
    skipna : bool, default True
        Whether to skip NA.

    Returns
    -------
    tuple
        The accumulated values and the accompanying mask.
    """
    running_max = np.maximum.accumulate
    highest, new_mask = _cum_func(running_max, values, mask, skipna=skipna)
    return highest, new_mask