1"""Indexer objects for computing start/end window bounds for rolling operations"""
2from __future__ import annotations
3
4from datetime import timedelta
5
6import numpy as np
7
8from pandas._libs.tslibs import BaseOffset
9from pandas._libs.window.indexers import calculate_variable_window_bounds
10from pandas.util._decorators import Appender
11
12from pandas.core.dtypes.common import ensure_platform_int
13
14from pandas.core.indexes.datetimes import DatetimeIndex
15
16from pandas.tseries.offsets import Nano
17
# Shared numpydoc text for ``get_window_bounds``. It is attached to every
# indexer's ``get_window_bounds`` method in this module via ``Appender``, so it
# must only describe parameters that those methods actually accept.
get_window_bounds_doc = """
Computes the bounds of a window.

Parameters
----------
num_values : int, default 0
    number of values that will be aggregated over
min_periods : int, default None
    min_periods passed from the top level rolling API
center : bool, default None
    center passed from the top level rolling API
closed : str, default None
    closed passed from the top level rolling API
step : int, default None
    step passed from the top level rolling API

    .. versionadded:: 1.5

Returns
-------
tuple of (ndarray[int64], ndarray[int64])
    The start and end boundaries of each window.
"""
44
45
46class BaseIndexer:
47 """
48 Base class for window bounds calculations.
49
50 Examples
51 --------
52 >>> from pandas.api.indexers import BaseIndexer
53 >>> class CustomIndexer(BaseIndexer):
54 ... def get_window_bounds(self, num_values, min_periods, center, closed, step):
55 ... start = np.empty(num_values, dtype=np.int64)
56 ... end = np.empty(num_values, dtype=np.int64)
57 ... for i in range(num_values):
58 ... start[i] = i
59 ... end[i] = i + self.window_size
60 ... return start, end
61 >>> df = pd.DataFrame({"values": range(5)})
62 >>> indexer = CustomIndexer(window_size=2)
63 >>> df.rolling(indexer).sum()
64 values
65 0 1.0
66 1 3.0
67 2 5.0
68 3 7.0
69 4 4.0
70 """
71
72 def __init__(
73 self, index_array: np.ndarray | None = None, window_size: int = 0, **kwargs
74 ) -> None:
75 self.index_array = index_array
76 self.window_size = window_size
77 # Set user defined kwargs as attributes that can be used in get_window_bounds
78 for key, value in kwargs.items():
79 setattr(self, key, value)
80
81 @Appender(get_window_bounds_doc)
82 def get_window_bounds(
83 self,
84 num_values: int = 0,
85 min_periods: int | None = None,
86 center: bool | None = None,
87 closed: str | None = None,
88 step: int | None = None,
89 ) -> tuple[np.ndarray, np.ndarray]:
90 raise NotImplementedError
91
92
93class FixedWindowIndexer(BaseIndexer):
94 """Creates window boundaries that are of fixed length."""
95
96 @Appender(get_window_bounds_doc)
97 def get_window_bounds(
98 self,
99 num_values: int = 0,
100 min_periods: int | None = None,
101 center: bool | None = None,
102 closed: str | None = None,
103 step: int | None = None,
104 ) -> tuple[np.ndarray, np.ndarray]:
105 if center or self.window_size == 0:
106 offset = (self.window_size - 1) // 2
107 else:
108 offset = 0
109
110 end = np.arange(1 + offset, num_values + 1 + offset, step, dtype="int64")
111 start = end - self.window_size
112 if closed in ["left", "both"]:
113 start -= 1
114 if closed in ["left", "neither"]:
115 end -= 1
116
117 end = np.clip(end, 0, num_values)
118 start = np.clip(start, 0, num_values)
119
120 return start, end
121
122
123class VariableWindowIndexer(BaseIndexer):
124 """Creates window boundaries that are of variable length, namely for time series."""
125
126 @Appender(get_window_bounds_doc)
127 def get_window_bounds(
128 self,
129 num_values: int = 0,
130 min_periods: int | None = None,
131 center: bool | None = None,
132 closed: str | None = None,
133 step: int | None = None,
134 ) -> tuple[np.ndarray, np.ndarray]:
135 # error: Argument 4 to "calculate_variable_window_bounds" has incompatible
136 # type "Optional[bool]"; expected "bool"
137 # error: Argument 6 to "calculate_variable_window_bounds" has incompatible
138 # type "Optional[ndarray]"; expected "ndarray"
139 return calculate_variable_window_bounds(
140 num_values,
141 self.window_size,
142 min_periods,
143 center, # type: ignore[arg-type]
144 closed,
145 self.index_array, # type: ignore[arg-type]
146 )
147
148
149class VariableOffsetWindowIndexer(BaseIndexer):
150 """
151 Calculate window boundaries based on a non-fixed offset such as a BusinessDay.
152
153 Examples
154 --------
155 >>> from pandas.api.indexers import VariableOffsetWindowIndexer
156 >>> df = pd.DataFrame(range(10), index=pd.date_range("2020", periods=10))
157 >>> offset = pd.offsets.BDay(1)
158 >>> indexer = VariableOffsetWindowIndexer(index=df.index, offset=offset)
159 >>> df
160 0
161 2020-01-01 0
162 2020-01-02 1
163 2020-01-03 2
164 2020-01-04 3
165 2020-01-05 4
166 2020-01-06 5
167 2020-01-07 6
168 2020-01-08 7
169 2020-01-09 8
170 2020-01-10 9
171 >>> df.rolling(indexer).sum()
172 0
173 2020-01-01 0.0
174 2020-01-02 1.0
175 2020-01-03 2.0
176 2020-01-04 3.0
177 2020-01-05 7.0
178 2020-01-06 12.0
179 2020-01-07 6.0
180 2020-01-08 7.0
181 2020-01-09 8.0
182 2020-01-10 9.0
183 """
184
185 def __init__(
186 self,
187 index_array: np.ndarray | None = None,
188 window_size: int = 0,
189 index: DatetimeIndex | None = None,
190 offset: BaseOffset | None = None,
191 **kwargs,
192 ) -> None:
193 super().__init__(index_array, window_size, **kwargs)
194 if not isinstance(index, DatetimeIndex):
195 raise ValueError("index must be a DatetimeIndex.")
196 self.index = index
197 if not isinstance(offset, BaseOffset):
198 raise ValueError("offset must be a DateOffset-like object.")
199 self.offset = offset
200
201 @Appender(get_window_bounds_doc)
202 def get_window_bounds(
203 self,
204 num_values: int = 0,
205 min_periods: int | None = None,
206 center: bool | None = None,
207 closed: str | None = None,
208 step: int | None = None,
209 ) -> tuple[np.ndarray, np.ndarray]:
210 if step is not None:
211 raise NotImplementedError("step not implemented for variable offset window")
212 if num_values <= 0:
213 return np.empty(0, dtype="int64"), np.empty(0, dtype="int64")
214
215 # if windows is variable, default is 'right', otherwise default is 'both'
216 if closed is None:
217 closed = "right" if self.index is not None else "both"
218
219 right_closed = closed in ["right", "both"]
220 left_closed = closed in ["left", "both"]
221
222 if self.index[num_values - 1] < self.index[0]:
223 index_growth_sign = -1
224 else:
225 index_growth_sign = 1
226 offset_diff = index_growth_sign * self.offset
227
228 start = np.empty(num_values, dtype="int64")
229 start.fill(-1)
230 end = np.empty(num_values, dtype="int64")
231 end.fill(-1)
232
233 start[0] = 0
234
235 # right endpoint is closed
236 if right_closed:
237 end[0] = 1
238 # right endpoint is open
239 else:
240 end[0] = 0
241
242 zero = timedelta(0)
243 # start is start of slice interval (including)
244 # end is end of slice interval (not including)
245 for i in range(1, num_values):
246 end_bound = self.index[i]
247 start_bound = end_bound - offset_diff
248
249 # left endpoint is closed
250 if left_closed:
251 start_bound -= Nano(1)
252
253 # advance the start bound until we are
254 # within the constraint
255 start[i] = i
256 for j in range(start[i - 1], i):
257 start_diff = (self.index[j] - start_bound) * index_growth_sign
258 if start_diff > zero:
259 start[i] = j
260 break
261
262 # end bound is previous end
263 # or current index
264 end_diff = (self.index[end[i - 1]] - end_bound) * index_growth_sign
265 if end_diff == zero and not right_closed:
266 end[i] = end[i - 1] + 1
267 elif end_diff <= zero:
268 end[i] = i + 1
269 else:
270 end[i] = end[i - 1]
271
272 # right endpoint is open
273 if not right_closed:
274 end[i] -= 1
275
276 return start, end
277
278
279class ExpandingIndexer(BaseIndexer):
280 """Calculate expanding window bounds, mimicking df.expanding()"""
281
282 @Appender(get_window_bounds_doc)
283 def get_window_bounds(
284 self,
285 num_values: int = 0,
286 min_periods: int | None = None,
287 center: bool | None = None,
288 closed: str | None = None,
289 step: int | None = None,
290 ) -> tuple[np.ndarray, np.ndarray]:
291 return (
292 np.zeros(num_values, dtype=np.int64),
293 np.arange(1, num_values + 1, dtype=np.int64),
294 )
295
296
297class FixedForwardWindowIndexer(BaseIndexer):
298 """
299 Creates window boundaries for fixed-length windows that include the current row.
300
301 Examples
302 --------
303 >>> df = pd.DataFrame({'B': [0, 1, 2, np.nan, 4]})
304 >>> df
305 B
306 0 0.0
307 1 1.0
308 2 2.0
309 3 NaN
310 4 4.0
311
312 >>> indexer = pd.api.indexers.FixedForwardWindowIndexer(window_size=2)
313 >>> df.rolling(window=indexer, min_periods=1).sum()
314 B
315 0 1.0
316 1 3.0
317 2 2.0
318 3 4.0
319 4 4.0
320 """
321
322 @Appender(get_window_bounds_doc)
323 def get_window_bounds(
324 self,
325 num_values: int = 0,
326 min_periods: int | None = None,
327 center: bool | None = None,
328 closed: str | None = None,
329 step: int | None = None,
330 ) -> tuple[np.ndarray, np.ndarray]:
331 if center:
332 raise ValueError("Forward-looking windows can't have center=True")
333 if closed is not None:
334 raise ValueError(
335 "Forward-looking windows don't support setting the closed argument"
336 )
337 if step is None:
338 step = 1
339
340 start = np.arange(0, num_values, step, dtype="int64")
341 end = start + self.window_size
342 if self.window_size:
343 end = np.clip(end, 0, num_values)
344
345 return start, end
346
347
348class GroupbyIndexer(BaseIndexer):
349 """Calculate bounds to compute groupby rolling, mimicking df.groupby().rolling()"""
350
351 def __init__(
352 self,
353 index_array: np.ndarray | None = None,
354 window_size: int | BaseIndexer = 0,
355 groupby_indices: dict | None = None,
356 window_indexer: type[BaseIndexer] = BaseIndexer,
357 indexer_kwargs: dict | None = None,
358 **kwargs,
359 ) -> None:
360 """
361 Parameters
362 ----------
363 index_array : np.ndarray or None
364 np.ndarray of the index of the original object that we are performing
365 a chained groupby operation over. This index has been pre-sorted relative to
366 the groups
367 window_size : int or BaseIndexer
368 window size during the windowing operation
369 groupby_indices : dict or None
370 dict of {group label: [positional index of rows belonging to the group]}
371 window_indexer : BaseIndexer
372 BaseIndexer class determining the start and end bounds of each group
373 indexer_kwargs : dict or None
374 Custom kwargs to be passed to window_indexer
375 **kwargs :
376 keyword arguments that will be available when get_window_bounds is called
377 """
378 self.groupby_indices = groupby_indices or {}
379 self.window_indexer = window_indexer
380 self.indexer_kwargs = indexer_kwargs.copy() if indexer_kwargs else {}
381 super().__init__(
382 index_array=index_array,
383 window_size=self.indexer_kwargs.pop("window_size", window_size),
384 **kwargs,
385 )
386
387 @Appender(get_window_bounds_doc)
388 def get_window_bounds(
389 self,
390 num_values: int = 0,
391 min_periods: int | None = None,
392 center: bool | None = None,
393 closed: str | None = None,
394 step: int | None = None,
395 ) -> tuple[np.ndarray, np.ndarray]:
396 # 1) For each group, get the indices that belong to the group
397 # 2) Use the indices to calculate the start & end bounds of the window
398 # 3) Append the window bounds in group order
399 start_arrays = []
400 end_arrays = []
401 window_indices_start = 0
402 for key, indices in self.groupby_indices.items():
403 index_array: np.ndarray | None
404
405 if self.index_array is not None:
406 index_array = self.index_array.take(ensure_platform_int(indices))
407 else:
408 index_array = self.index_array
409 indexer = self.window_indexer(
410 index_array=index_array,
411 window_size=self.window_size,
412 **self.indexer_kwargs,
413 )
414 start, end = indexer.get_window_bounds(
415 len(indices), min_periods, center, closed, step
416 )
417 start = start.astype(np.int64)
418 end = end.astype(np.int64)
419 assert len(start) == len(
420 end
421 ), "these should be equal in length from get_window_bounds"
422 # Cannot use groupby_indices as they might not be monotonic with the object
423 # we're rolling over
424 window_indices = np.arange(
425 window_indices_start, window_indices_start + len(indices)
426 )
427 window_indices_start += len(indices)
428 # Extend as we'll be slicing window like [start, end)
429 window_indices = np.append(window_indices, [window_indices[-1] + 1]).astype(
430 np.int64, copy=False
431 )
432 start_arrays.append(window_indices.take(ensure_platform_int(start)))
433 end_arrays.append(window_indices.take(ensure_platform_int(end)))
434 if len(start_arrays) == 0:
435 return np.array([], dtype=np.int64), np.array([], dtype=np.int64)
436 start = np.concatenate(start_arrays)
437 end = np.concatenate(end_arrays)
438 return start, end
439
440
441class ExponentialMovingWindowIndexer(BaseIndexer):
442 """Calculate ewm window bounds (the entire window)"""
443
444 @Appender(get_window_bounds_doc)
445 def get_window_bounds(
446 self,
447 num_values: int = 0,
448 min_periods: int | None = None,
449 center: bool | None = None,
450 closed: str | None = None,
451 step: int | None = None,
452 ) -> tuple[np.ndarray, np.ndarray]:
453 return np.array([0], dtype=np.int64), np.array([num_values], dtype=np.int64)