1"""Indexer objects for computing start/end window bounds for rolling operations"""
2from __future__ import annotations
3
4from datetime import timedelta
5
6import numpy as np
7
8from pandas._libs.window.indexers import calculate_variable_window_bounds
9from pandas.util._decorators import Appender
10
11from pandas.core.dtypes.common import ensure_platform_int
12
13from pandas.tseries.offsets import Nano
14
# Shared docstring text that is attached, through the ``Appender`` decorator,
# to the ``get_window_bounds`` method of every indexer class in this module.
# NOTE(review): ``window_size`` and ``win_type`` are described below, but none
# of the ``get_window_bounds`` signatures in this file take them as
# parameters -- confirm whether these two entries should remain.
get_window_bounds_doc = """
Computes the bounds of a window.

Parameters
----------
num_values : int, default 0
 number of values that will be aggregated over
window_size : int, default 0
 the number of rows in a window
min_periods : int, default None
 min_periods passed from the top level rolling API
center : bool, default None
 center passed from the top level rolling API
closed : str, default None
 closed passed from the top level rolling API
step : int, default None
 step passed from the top level rolling API
 .. versionadded:: 1.5
win_type : str, default None
 win_type passed from the top level rolling API

Returns
-------
A tuple of ndarray[int64]s, indicating the boundaries of each
window
"""
41
42
class BaseIndexer:
    """
    Common parent of the objects that compute rolling-window boundaries.

    The constructor only records its arguments on the instance; subclasses
    provide the actual computation by overriding ``get_window_bounds``.
    """

    def __init__(
        self, index_array: np.ndarray | None = None, window_size: int = 0, **kwargs
    ) -> None:
        """
        Parameters
        ----------
        index_array : np.ndarray or None, default None
            stored unchanged as ``self.index_array``
        window_size : int, default 0
            stored unchanged as ``self.window_size``
        **kwargs :
            each keyword is set as an instance attribute of the same name so it
            can be read when get_window_bounds is called
        """
        self.index_array = index_array
        self.window_size = window_size
        # Expose every user supplied keyword as a plain instance attribute
        for attr_name in kwargs:
            setattr(self, attr_name, kwargs[attr_name])

    @Appender(get_window_bounds_doc)
    def get_window_bounds(
        self,
        num_values: int = 0,
        min_periods: int | None = None,
        center: bool | None = None,
        closed: str | None = None,
        step: int | None = None,
    ) -> tuple[np.ndarray, np.ndarray]:
        # The base class has no implementation; the documentation text for
        # this method is attached by the Appender decorator above.
        raise NotImplementedError
71
72
class FixedWindowIndexer(BaseIndexer):
    """Window boundaries for windows that all span ``window_size`` rows."""

    @Appender(get_window_bounds_doc)
    def get_window_bounds(
        self,
        num_values: int = 0,
        min_periods: int | None = None,
        center: bool | None = None,
        closed: str | None = None,
        step: int | None = None,
    ) -> tuple[np.ndarray, np.ndarray]:
        # With ``center`` both edges are moved forward by
        # (window_size - 1) // 2 rows; otherwise no shift is applied.
        shift = (self.window_size - 1) // 2 if center else 0

        # Exclusive right edge of each window, one entry per ``step`` rows
        stops = np.arange(1 + shift, num_values + 1 + shift, step, dtype="int64")
        starts = stops - self.window_size

        # "left" and "both" move the left edge back by one row
        if closed in ("left", "both"):
            starts = starts - 1
        # "left" and "neither" move the right edge back by one row
        if closed in ("left", "neither"):
            stops = stops - 1

        # Both edges are limited to the range [0, num_values]
        return np.clip(starts, 0, num_values), np.clip(stops, 0, num_values)
101
102
class VariableWindowIndexer(BaseIndexer):
    """Window boundaries whose length varies per row, namely for time series."""

    @Appender(get_window_bounds_doc)
    def get_window_bounds(
        self,
        num_values: int = 0,
        min_periods: int | None = None,
        center: bool | None = None,
        closed: str | None = None,
        step: int | None = None,
    ) -> tuple[np.ndarray, np.ndarray]:
        # The whole computation is delegated to
        # calculate_variable_window_bounds; this method only forwards the call
        # arguments together with the stored window_size and index_array.
        # ``step`` is accepted but not forwarded.
        #
        # error: Argument 4 to "calculate_variable_window_bounds" has incompatible
        # type "Optional[bool]"; expected "bool"
        # error: Argument 6 to "calculate_variable_window_bounds" has incompatible
        # type "Optional[ndarray]"; expected "ndarray"
        bounds = calculate_variable_window_bounds(
            num_values,
            self.window_size,
            min_periods,
            center,  # type: ignore[arg-type]
            closed,
            self.index_array,  # type: ignore[arg-type]
        )
        return bounds
127
128
class VariableOffsetWindowIndexer(BaseIndexer):
    """Window boundaries derived from a non-fixed offset such as a BusinessDay."""

    def __init__(
        self,
        index_array: np.ndarray | None = None,
        window_size: int = 0,
        index=None,
        offset=None,
        **kwargs,
    ) -> None:
        """
        Parameters
        ----------
        index_array : np.ndarray or None, default None
            forwarded to BaseIndexer
        window_size : int, default 0
            forwarded to BaseIndexer
        index : default None
            stored as ``self.index``; get_window_bounds reads its elements by
            position and subtracts/compares them
        offset : default None
            stored as ``self.offset``; subtracted from each index value to find
            the left edge of that row's window
        **kwargs :
            forwarded to BaseIndexer
        """
        super().__init__(index_array, window_size, **kwargs)
        self.index = index
        self.offset = offset

    @Appender(get_window_bounds_doc)
    def get_window_bounds(
        self,
        num_values: int = 0,
        min_periods: int | None = None,
        center: bool | None = None,
        closed: str | None = None,
        step: int | None = None,
    ) -> tuple[np.ndarray, np.ndarray]:
        if step is not None:
            raise NotImplementedError("step not implemented for variable offset window")
        if num_values <= 0:
            return np.empty(0, dtype="int64"), np.empty(0, dtype="int64")

        if closed is None:
            # if windows is variable, default is 'right', otherwise default is 'both'
            closed = "both" if self.index is None else "right"

        include_right = closed in ("right", "both")
        include_left = closed in ("left", "both")

        # -1 when the last index value is smaller than the first one, else +1;
        # multiplying differences by this lets one set of comparisons serve
        # both orderings
        direction = -1 if self.index[num_values - 1] < self.index[0] else 1

        starts = np.full(num_values, -1, dtype="int64")
        stops = np.full(num_values, -1, dtype="int64")

        # First row: the window begins at position 0 and holds that row only
        # when the right endpoint is closed
        starts[0] = 0
        stops[0] = 1 if include_right else 0

        # starts holds the first position of each slice (inclusive),
        # stops holds the end of each slice (exclusive)
        for row in range(1, num_values):
            right_edge = self.index[row]
            left_edge = right_edge - direction * self.offset

            # A closed left endpoint is obtained by moving the edge back one
            # nanosecond, so a value exactly on the edge passes the strict
            # comparison below
            if include_left:
                left_edge -= Nano(1)

            # Resume from the previous row's start and take the first position
            # that lies past the left edge; fall back to the row itself
            starts[row] = row
            for candidate in range(starts[row - 1], row):
                if (self.index[candidate] - left_edge) * direction > timedelta(0):
                    starts[row] = candidate
                    break

            # The end is either just past the current row or the previous end
            prev_stop_gap = (self.index[stops[row - 1]] - right_edge) * direction
            if prev_stop_gap <= timedelta(0):
                stops[row] = row + 1
            else:
                stops[row] = stops[row - 1]

            # An open right endpoint drops the last position
            if not include_right:
                stops[row] -= 1

        return starts, stops
214
215
class ExpandingIndexer(BaseIndexer):
    """Expanding window bounds, mimicking df.expanding()"""

    @Appender(get_window_bounds_doc)
    def get_window_bounds(
        self,
        num_values: int = 0,
        min_periods: int | None = None,
        center: bool | None = None,
        closed: str | None = None,
        step: int | None = None,
    ) -> tuple[np.ndarray, np.ndarray]:
        # Every window begins at position 0 ...
        lower = np.zeros(num_values, dtype=np.int64)
        # ... and the i-th window ends (exclusively) at position i + 1
        upper = np.arange(1, num_values + 1, dtype=np.int64)
        return lower, upper
232
233
class FixedForwardWindowIndexer(BaseIndexer):
    """
    Fixed-length window boundaries that start at the current row and look forward.

    Examples
    --------
    >>> df = pd.DataFrame({'B': [0, 1, 2, np.nan, 4]})
    >>> df
         B
    0  0.0
    1  1.0
    2  2.0
    3  NaN
    4  4.0

    >>> indexer = pd.api.indexers.FixedForwardWindowIndexer(window_size=2)
    >>> df.rolling(window=indexer, min_periods=1).sum()
         B
    0  1.0
    1  3.0
    2  2.0
    3  4.0
    4  4.0
    """

    @Appender(get_window_bounds_doc)
    def get_window_bounds(
        self,
        num_values: int = 0,
        min_periods: int | None = None,
        center: bool | None = None,
        closed: str | None = None,
        step: int | None = None,
    ) -> tuple[np.ndarray, np.ndarray]:
        # Neither centering nor an explicit ``closed`` side is accepted
        if center:
            raise ValueError("Forward-looking windows can't have center=True")
        if closed is not None:
            raise ValueError(
                "Forward-looking windows don't support setting the closed argument"
            )

        # Without an explicit step a window is produced for every row
        stride = 1 if step is None else step

        starts = np.arange(0, num_values, stride, dtype="int64")
        stops = starts + self.window_size
        # Windows running past the data are cut at num_values; a falsy
        # window_size leaves stops equal to starts, so clipping is skipped
        if self.window_size:
            stops = np.clip(stops, 0, num_values)

        return starts, stops
283
284
class GroupbyIndexer(BaseIndexer):
    """Calculate bounds to compute groupby rolling, mimicking df.groupby().rolling()"""

    def __init__(
        self,
        index_array: np.ndarray | None = None,
        window_size: int | BaseIndexer = 0,
        groupby_indices: dict | None = None,
        window_indexer: type[BaseIndexer] = BaseIndexer,
        indexer_kwargs: dict | None = None,
        **kwargs,
    ) -> None:
        """
        Parameters
        ----------
        index_array : np.ndarray or None
            np.ndarray of the index of the original object that we are performing
            a chained groupby operation over. This index has been pre-sorted relative to
            the groups
        window_size : int or BaseIndexer
            window size during the windowing operation; ignored when
            ``indexer_kwargs`` contains a ``"window_size"`` entry
        groupby_indices : dict or None
            dict of {group label: [positional index of rows belonging to the group]}
        window_indexer : type[BaseIndexer]
            BaseIndexer class (not an instance) determining the start and end
            bounds of each group; it is instantiated once per group
        indexer_kwargs : dict or None
            Custom kwargs to be passed to window_indexer. The dict is copied, so
            the caller's object is not modified. A ``"window_size"`` entry is
            removed from the copy and used as the window size instead
        **kwargs :
            keyword arguments that will be available when get_window_bounds is called
        """
        self.groupby_indices = groupby_indices or {}
        self.window_indexer = window_indexer
        self.indexer_kwargs = indexer_kwargs.copy() if indexer_kwargs else {}
        super().__init__(
            index_array=index_array,
            window_size=self.indexer_kwargs.pop("window_size", window_size),
            **kwargs,
        )

    @Appender(get_window_bounds_doc)
    def get_window_bounds(
        self,
        num_values: int = 0,
        min_periods: int | None = None,
        center: bool | None = None,
        closed: str | None = None,
        step: int | None = None,
    ) -> tuple[np.ndarray, np.ndarray]:
        # 1) For each group, get the indices that belong to the group
        # 2) Use the indices to calculate the start & end bounds of the window
        # 3) Append the window bounds in group order
        start_arrays = []
        end_arrays = []
        window_indices_start = 0
        # Only the positions are needed, the group labels are not used
        for indices in self.groupby_indices.values():
            index_array: np.ndarray | None
            group_size = len(indices)

            if self.index_array is not None:
                index_array = self.index_array.take(ensure_platform_int(indices))
            else:
                index_array = None
            indexer = self.window_indexer(
                index_array=index_array,
                window_size=self.window_size,
                **self.indexer_kwargs,
            )
            start, end = indexer.get_window_bounds(
                group_size, min_periods, center, closed, step
            )
            start = start.astype(np.int64)
            end = end.astype(np.int64)
            assert len(start) == len(
                end
            ), "these should be equal in length from get_window_bounds"
            # Cannot use groupby_indices as they might not be monotonic with the object
            # we're rolling over, so consecutive positions are generated instead.
            # One extra trailing slot is included because windows are sliced
            # like [start, end). Building the range in a single call also works
            # for a group without rows, where indexing the last element would fail.
            window_indices = np.arange(
                window_indices_start,
                window_indices_start + group_size + 1,
                dtype=np.int64,
            )
            window_indices_start += group_size
            # Translate the group-local bounds into positions of the whole object
            start_arrays.append(window_indices.take(ensure_platform_int(start)))
            end_arrays.append(window_indices.take(ensure_platform_int(end)))
        if not start_arrays:
            return np.array([], dtype=np.int64), np.array([], dtype=np.int64)
        return np.concatenate(start_arrays), np.concatenate(end_arrays)
376
377
class ExponentialMovingWindowIndexer(BaseIndexer):
    """Bounds for ewm: a single window covering all the values"""

    @Appender(get_window_bounds_doc)
    def get_window_bounds(
        self,
        num_values: int = 0,
        min_periods: int | None = None,
        center: bool | None = None,
        closed: str | None = None,
        step: int | None = None,
    ) -> tuple[np.ndarray, np.ndarray]:
        # One window only: it starts at position 0 and ends at num_values
        first = np.array([0], dtype=np.int64)
        last = np.array([num_values], dtype=np.int64)
        return first, last