1"""
2Provide a generic structure to support window functions,
3similar to how we have a Groupby object.
4"""
5from __future__ import annotations
6
7import copy
8from datetime import timedelta
9from functools import partial
10import inspect
11from textwrap import dedent
12from typing import (
13 TYPE_CHECKING,
14 Any,
15 Callable,
16 Hashable,
17 Iterator,
18 Sized,
19 cast,
20)
21
22import numpy as np
23
24from pandas._libs.tslibs import (
25 BaseOffset,
26 to_offset,
27)
28import pandas._libs.window.aggregations as window_aggregations
29from pandas._typing import (
30 ArrayLike,
31 Axis,
32 NDFrameT,
33 QuantileInterpolation,
34 WindowingRankType,
35)
36from pandas.compat._optional import import_optional_dependency
37from pandas.errors import DataError
38from pandas.util._decorators import doc
39
40from pandas.core.dtypes.common import (
41 ensure_float64,
42 is_bool,
43 is_integer,
44 is_list_like,
45 is_numeric_dtype,
46 is_scalar,
47 needs_i8_conversion,
48)
49from pandas.core.dtypes.generic import (
50 ABCDataFrame,
51 ABCSeries,
52)
53from pandas.core.dtypes.missing import notna
54
55from pandas.core._numba import executor
56from pandas.core.algorithms import factorize
57from pandas.core.apply import ResamplerWindowApply
58from pandas.core.arrays import ExtensionArray
59from pandas.core.base import SelectionMixin
60import pandas.core.common as com
61from pandas.core.indexers.objects import (
62 BaseIndexer,
63 FixedWindowIndexer,
64 GroupbyIndexer,
65 VariableWindowIndexer,
66)
67from pandas.core.indexes.api import (
68 DatetimeIndex,
69 Index,
70 MultiIndex,
71 PeriodIndex,
72 TimedeltaIndex,
73)
74from pandas.core.reshape.concat import concat
75from pandas.core.util.numba_ import (
76 get_jit_arguments,
77 maybe_use_numba,
78)
79from pandas.core.window.common import (
80 flex_binary_moment,
81 zsqrt,
82)
83from pandas.core.window.doc import (
84 _shared_docs,
85 create_section_header,
86 kwargs_numeric_only,
87 kwargs_scipy,
88 numba_notes,
89 template_header,
90 template_returns,
91 template_see_also,
92 window_agg_numba_parameters,
93 window_apply_parameters,
94)
95from pandas.core.window.numba_ import (
96 generate_manual_numpy_nan_agg_with_axis,
97 generate_numba_apply_func,
98 generate_numba_table_func,
99)
100
101if TYPE_CHECKING:
102 from pandas import (
103 DataFrame,
104 Series,
105 )
106 from pandas.core.generic import NDFrame
107 from pandas.core.groupby.ops import BaseGrouper
108
109
110class BaseWindow(SelectionMixin):
111 """Provides utilities for performing windowing operations."""
112
113 _attributes: list[str] = []
114 exclusions: frozenset[Hashable] = frozenset()
115 _on: Index
116
117 def __init__(
118 self,
119 obj: NDFrame,
120 window=None,
121 min_periods: int | None = None,
122 center: bool | None = False,
123 win_type: str | None = None,
124 axis: Axis = 0,
125 on: str | Index | None = None,
126 closed: str | None = None,
127 step: int | None = None,
128 method: str = "single",
129 *,
130 selection=None,
131 ) -> None:
132 self.obj = obj
133 self.on = on
134 self.closed = closed
135 self.step = step
136 self.window = window
137 self.min_periods = min_periods
138 self.center = center
139 self.win_type = win_type
140 self.axis = obj._get_axis_number(axis) if axis is not None else None
141 self.method = method
142 self._win_freq_i8: int | None = None
143 if self.on is None:
144 if self.axis == 0:
145 self._on = self.obj.index
146 else:
147 # i.e. self.axis == 1
148 self._on = self.obj.columns
149 elif isinstance(self.on, Index):
150 self._on = self.on
151 elif isinstance(self.obj, ABCDataFrame) and self.on in self.obj.columns:
152 self._on = Index(self.obj[self.on])
153 else:
154 raise ValueError(
155 f"invalid on specified as {self.on}, "
156 "must be a column (of DataFrame), an Index or None"
157 )
158
159 self._selection = selection
160 self._validate()
161
162 def _validate(self) -> None:
163 if self.center is not None and not is_bool(self.center):
164 raise ValueError("center must be a boolean")
165 if self.min_periods is not None:
166 if not is_integer(self.min_periods):
167 raise ValueError("min_periods must be an integer")
168 if self.min_periods < 0:
169 raise ValueError("min_periods must be >= 0")
170 if is_integer(self.window) and self.min_periods > self.window:
171 raise ValueError(
172 f"min_periods {self.min_periods} must be <= window {self.window}"
173 )
174 if self.closed is not None and self.closed not in [
175 "right",
176 "both",
177 "left",
178 "neither",
179 ]:
180 raise ValueError("closed must be 'right', 'left', 'both' or 'neither'")
181 if not isinstance(self.obj, (ABCSeries, ABCDataFrame)):
182 raise TypeError(f"invalid type: {type(self)}")
183 if isinstance(self.window, BaseIndexer):
184 # Validate that the passed BaseIndexer subclass has
185 # a get_window_bounds with the correct signature.
186 get_window_bounds_signature = inspect.signature(
187 self.window.get_window_bounds
188 ).parameters.keys()
189 expected_signature = inspect.signature(
190 BaseIndexer().get_window_bounds
191 ).parameters.keys()
192 if get_window_bounds_signature != expected_signature:
193 raise ValueError(
194 f"{type(self.window).__name__} does not implement "
195 f"the correct signature for get_window_bounds"
196 )
197 if self.method not in ["table", "single"]:
198 raise ValueError("method must be 'table' or 'single")
199 if self.step is not None:
200 if not is_integer(self.step):
201 raise ValueError("step must be an integer")
202 if self.step < 0:
203 raise ValueError("step must be >= 0")
204
205 def _check_window_bounds(
206 self, start: np.ndarray, end: np.ndarray, num_vals: int
207 ) -> None:
208 if len(start) != len(end):
209 raise ValueError(
210 f"start ({len(start)}) and end ({len(end)}) bounds must be the "
211 f"same length"
212 )
213 if len(start) != (num_vals + (self.step or 1) - 1) // (self.step or 1):
214 raise ValueError(
215 f"start and end bounds ({len(start)}) must be the same length "
216 f"as the object ({num_vals}) divided by the step ({self.step}) "
217 f"if given and rounded up"
218 )
219
220 def _slice_axis_for_step(self, index: Index, result: Sized | None = None) -> Index:
221 """
222 Slices the index for a given result and the preset step.
223 """
224 return (
225 index
226 if result is None or len(result) == len(index)
227 else index[:: self.step]
228 )
229
230 def _validate_numeric_only(self, name: str, numeric_only: bool) -> None:
231 """
232 Validate numeric_only argument, raising if invalid for the input.
233
234 Parameters
235 ----------
236 name : str
237 Name of the operator (kernel).
238 numeric_only : bool
239 Value passed by user.
240 """
241 if (
242 self._selected_obj.ndim == 1
243 and numeric_only
244 and not is_numeric_dtype(self._selected_obj.dtype)
245 ):
246 raise NotImplementedError(
247 f"{type(self).__name__}.{name} does not implement numeric_only"
248 )
249
250 def _make_numeric_only(self, obj: NDFrameT) -> NDFrameT:
251 """Subset DataFrame to numeric columns.
252
253 Parameters
254 ----------
255 obj : DataFrame
256
257 Returns
258 -------
259 obj subset to numeric-only columns.
260 """
261 result = obj.select_dtypes(include=["number"], exclude=["timedelta"])
262 return result
263
264 def _create_data(self, obj: NDFrameT, numeric_only: bool = False) -> NDFrameT:
265 """
266 Split data into blocks & return conformed data.
267 """
268 # filter out the on from the object
269 if self.on is not None and not isinstance(self.on, Index) and obj.ndim == 2:
270 obj = obj.reindex(columns=obj.columns.difference([self.on]), copy=False)
271 if obj.ndim > 1 and (numeric_only or self.axis == 1):
272 # GH: 20649 in case of mixed dtype and axis=1 we have to convert everything
273 # to float to calculate the complete row at once. We exclude all non-numeric
274 # dtypes.
275 obj = self._make_numeric_only(obj)
276 if self.axis == 1:
277 obj = obj.astype("float64", copy=False)
278 obj._mgr = obj._mgr.consolidate()
279 return obj
280
281 def _gotitem(self, key, ndim, subset=None):
282 """
283 Sub-classes to define. Return a sliced object.
284
285 Parameters
286 ----------
287 key : str / list of selections
288 ndim : {1, 2}
289 requested ndim of result
290 subset : object, default None
291 subset to act on
292 """
293 # create a new object to prevent aliasing
294 if subset is None:
295 subset = self.obj
296
297 # we need to make a shallow copy of ourselves
298 # with the same groupby
299 kwargs = {attr: getattr(self, attr) for attr in self._attributes}
300
301 selection = None
302 if subset.ndim == 2 and (
303 (is_scalar(key) and key in subset) or is_list_like(key)
304 ):
305 selection = key
306 elif subset.ndim == 1 and is_scalar(key) and key == subset.name:
307 selection = key
308
309 new_win = type(self)(subset, selection=selection, **kwargs)
310 return new_win
311
312 def __getattr__(self, attr: str):
313 if attr in self._internal_names_set:
314 return object.__getattribute__(self, attr)
315 if attr in self.obj:
316 return self[attr]
317
318 raise AttributeError(
319 f"'{type(self).__name__}' object has no attribute '{attr}'"
320 )
321
322 def _dir_additions(self):
323 return self.obj._dir_additions()
324
325 def __repr__(self) -> str:
326 """
327 Provide a nice str repr of our rolling object.
328 """
329 attrs_list = (
330 f"{attr_name}={getattr(self, attr_name)}"
331 for attr_name in self._attributes
332 if getattr(self, attr_name, None) is not None and attr_name[0] != "_"
333 )
334 attrs = ",".join(attrs_list)
335 return f"{type(self).__name__} [{attrs}]"
336
337 def __iter__(self) -> Iterator:
338 obj = self._selected_obj.set_axis(self._on)
339 obj = self._create_data(obj)
340 indexer = self._get_window_indexer()
341
342 start, end = indexer.get_window_bounds(
343 num_values=len(obj),
344 min_periods=self.min_periods,
345 center=self.center,
346 closed=self.closed,
347 step=self.step,
348 )
349 self._check_window_bounds(start, end, len(obj))
350
351 for s, e in zip(start, end):
352 result = obj.iloc[slice(s, e)]
353 yield result
354
355 def _prep_values(self, values: ArrayLike) -> np.ndarray:
356 """Convert input to numpy arrays for Cython routines"""
357 if needs_i8_conversion(values.dtype):
358 raise NotImplementedError(
359 f"ops for {type(self).__name__} for this "
360 f"dtype {values.dtype} are not implemented"
361 )
362 # GH #12373 : rolling functions error on float32 data
363 # make sure the data is coerced to float64
364 try:
365 if isinstance(values, ExtensionArray):
366 values = values.to_numpy(np.float64, na_value=np.nan)
367 else:
368 values = ensure_float64(values)
369 except (ValueError, TypeError) as err:
370 raise TypeError(f"cannot handle this type -> {values.dtype}") from err
371
372 # Convert inf to nan for C funcs
373 inf = np.isinf(values)
374 if inf.any():
375 values = np.where(inf, np.nan, values)
376
377 return values
378
379 def _insert_on_column(self, result: DataFrame, obj: DataFrame) -> None:
380 # if we have an 'on' column we want to put it back into
381 # the results in the same location
382 from pandas import Series
383
384 if self.on is not None and not self._on.equals(obj.index):
385 name = self._on.name
386 extra_col = Series(self._on, index=self.obj.index, name=name, copy=False)
387 if name in result.columns:
388 # TODO: sure we want to overwrite results?
389 result[name] = extra_col
390 elif name in result.index.names:
391 pass
392 elif name in self._selected_obj.columns:
393 # insert in the same location as we had in _selected_obj
394 old_cols = self._selected_obj.columns
395 new_cols = result.columns
396 old_loc = old_cols.get_loc(name)
397 overlap = new_cols.intersection(old_cols[:old_loc])
398 new_loc = len(overlap)
399 result.insert(new_loc, name, extra_col)
400 else:
401 # insert at the end
402 result[name] = extra_col
403
404 @property
405 def _index_array(self):
406 # TODO: why do we get here with e.g. MultiIndex?
407 if needs_i8_conversion(self._on.dtype):
408 idx = cast("PeriodIndex | DatetimeIndex | TimedeltaIndex", self._on)
409 return idx.asi8
410 return None
411
412 def _resolve_output(self, out: DataFrame, obj: DataFrame) -> DataFrame:
413 """Validate and finalize result."""
414 if out.shape[1] == 0 and obj.shape[1] > 0:
415 raise DataError("No numeric types to aggregate")
416 if out.shape[1] == 0:
417 return obj.astype("float64")
418
419 self._insert_on_column(out, obj)
420 return out
421
422 def _get_window_indexer(self) -> BaseIndexer:
423 """
424 Return an indexer class that will compute the window start and end bounds
425 """
426 if isinstance(self.window, BaseIndexer):
427 return self.window
428 if self._win_freq_i8 is not None:
429 return VariableWindowIndexer(
430 index_array=self._index_array,
431 window_size=self._win_freq_i8,
432 center=self.center,
433 )
434 return FixedWindowIndexer(window_size=self.window)
435
436 def _apply_series(
437 self, homogeneous_func: Callable[..., ArrayLike], name: str | None = None
438 ) -> Series:
439 """
440 Series version of _apply_blockwise
441 """
442 obj = self._create_data(self._selected_obj)
443
444 if name == "count":
445 # GH 12541: Special case for count where we support date-like types
446 obj = notna(obj).astype(int)
447 try:
448 values = self._prep_values(obj._values)
449 except (TypeError, NotImplementedError) as err:
450 raise DataError("No numeric types to aggregate") from err
451
452 result = homogeneous_func(values)
453 index = self._slice_axis_for_step(obj.index, result)
454 return obj._constructor(result, index=index, name=obj.name)
455
456 def _apply_blockwise(
457 self,
458 homogeneous_func: Callable[..., ArrayLike],
459 name: str,
460 numeric_only: bool = False,
461 ) -> DataFrame | Series:
462 """
463 Apply the given function to the DataFrame broken down into homogeneous
464 sub-frames.
465 """
466 self._validate_numeric_only(name, numeric_only)
467 if self._selected_obj.ndim == 1:
468 return self._apply_series(homogeneous_func, name)
469
470 obj = self._create_data(self._selected_obj, numeric_only)
471 if name == "count":
472 # GH 12541: Special case for count where we support date-like types
473 obj = notna(obj).astype(int)
474 obj._mgr = obj._mgr.consolidate()
475
476 if self.axis == 1:
477 obj = obj.T
478
479 taker = []
480 res_values = []
481 for i, arr in enumerate(obj._iter_column_arrays()):
482 # GH#42736 operate column-wise instead of block-wise
483 # As of 2.0, hfunc will raise for nuisance columns
484 try:
485 arr = self._prep_values(arr)
486 except (TypeError, NotImplementedError) as err:
487 raise DataError(
488 f"Cannot aggregate non-numeric type: {arr.dtype}"
489 ) from err
490 res = homogeneous_func(arr)
491 res_values.append(res)
492 taker.append(i)
493
494 index = self._slice_axis_for_step(
495 obj.index, res_values[0] if len(res_values) > 0 else None
496 )
497 df = type(obj)._from_arrays(
498 res_values,
499 index=index,
500 columns=obj.columns.take(taker),
501 verify_integrity=False,
502 )
503
504 if self.axis == 1:
505 df = df.T
506
507 return self._resolve_output(df, obj)
508
509 def _apply_tablewise(
510 self,
511 homogeneous_func: Callable[..., ArrayLike],
512 name: str | None = None,
513 numeric_only: bool = False,
514 ) -> DataFrame | Series:
515 """
516 Apply the given function to the DataFrame across the entire object
517 """
518 if self._selected_obj.ndim == 1:
519 raise ValueError("method='table' not applicable for Series objects.")
520 obj = self._create_data(self._selected_obj, numeric_only)
521 values = self._prep_values(obj.to_numpy())
522 values = values.T if self.axis == 1 else values
523 result = homogeneous_func(values)
524 result = result.T if self.axis == 1 else result
525 index = self._slice_axis_for_step(obj.index, result)
526 columns = (
527 obj.columns
528 if result.shape[1] == len(obj.columns)
529 else obj.columns[:: self.step]
530 )
531 out = obj._constructor(result, index=index, columns=columns)
532
533 return self._resolve_output(out, obj)
534
535 def _apply_pairwise(
536 self,
537 target: DataFrame | Series,
538 other: DataFrame | Series | None,
539 pairwise: bool | None,
540 func: Callable[[DataFrame | Series, DataFrame | Series], DataFrame | Series],
541 numeric_only: bool,
542 ) -> DataFrame | Series:
543 """
544 Apply the given pairwise function given 2 pandas objects (DataFrame/Series)
545 """
546 target = self._create_data(target, numeric_only)
547 if other is None:
548 other = target
549 # only default unset
550 pairwise = True if pairwise is None else pairwise
551 elif not isinstance(other, (ABCDataFrame, ABCSeries)):
552 raise ValueError("other must be a DataFrame or Series")
553 elif other.ndim == 2 and numeric_only:
554 other = self._make_numeric_only(other)
555
556 return flex_binary_moment(target, other, func, pairwise=bool(pairwise))
557
558 def _apply(
559 self,
560 func: Callable[..., Any],
561 name: str,
562 numeric_only: bool = False,
563 numba_args: tuple[Any, ...] = (),
564 **kwargs,
565 ):
566 """
567 Rolling statistical measure using supplied function.
568
569 Designed to be used with passed-in Cython array-based functions.
570
571 Parameters
572 ----------
573 func : callable function to apply
574 name : str,
575 numba_args : tuple
576 args to be passed when func is a numba func
577 **kwargs
578 additional arguments for rolling function and window function
579
580 Returns
581 -------
582 y : type of input
583 """
584 window_indexer = self._get_window_indexer()
585 min_periods = (
586 self.min_periods
587 if self.min_periods is not None
588 else window_indexer.window_size
589 )
590
591 def homogeneous_func(values: np.ndarray):
592 # calculation function
593
594 if values.size == 0:
595 return values.copy()
596
597 def calc(x):
598 start, end = window_indexer.get_window_bounds(
599 num_values=len(x),
600 min_periods=min_periods,
601 center=self.center,
602 closed=self.closed,
603 step=self.step,
604 )
605 self._check_window_bounds(start, end, len(x))
606
607 return func(x, start, end, min_periods, *numba_args)
608
609 with np.errstate(all="ignore"):
610 result = calc(values)
611
612 return result
613
614 if self.method == "single":
615 return self._apply_blockwise(homogeneous_func, name, numeric_only)
616 else:
617 return self._apply_tablewise(homogeneous_func, name, numeric_only)
618
619 def _numba_apply(
620 self,
621 func: Callable[..., Any],
622 engine_kwargs: dict[str, bool] | None = None,
623 *func_args,
624 ):
625 window_indexer = self._get_window_indexer()
626 min_periods = (
627 self.min_periods
628 if self.min_periods is not None
629 else window_indexer.window_size
630 )
631 obj = self._create_data(self._selected_obj)
632 if self.axis == 1:
633 obj = obj.T
634 values = self._prep_values(obj.to_numpy())
635 if values.ndim == 1:
636 values = values.reshape(-1, 1)
637 start, end = window_indexer.get_window_bounds(
638 num_values=len(values),
639 min_periods=min_periods,
640 center=self.center,
641 closed=self.closed,
642 step=self.step,
643 )
644 self._check_window_bounds(start, end, len(values))
645 aggregator = executor.generate_shared_aggregator(
646 func, **get_jit_arguments(engine_kwargs)
647 )
648 result = aggregator(values, start, end, min_periods, *func_args)
649 result = result.T if self.axis == 1 else result
650 index = self._slice_axis_for_step(obj.index, result)
651 if obj.ndim == 1:
652 result = result.squeeze()
653 out = obj._constructor(result, index=index, name=obj.name)
654 return out
655 else:
656 columns = self._slice_axis_for_step(obj.columns, result.T)
657 out = obj._constructor(result, index=index, columns=columns)
658 return self._resolve_output(out, obj)
659
660 def aggregate(self, func, *args, **kwargs):
661 result = ResamplerWindowApply(self, func, args=args, kwargs=kwargs).agg()
662 if result is None:
663 return self.apply(func, raw=False, args=args, kwargs=kwargs)
664 return result
665
666 agg = aggregate
667
668
669class BaseWindowGroupby(BaseWindow):
670 """
671 Provide the groupby windowing facilities.
672 """
673
674 _grouper: BaseGrouper
675 _as_index: bool
676 _attributes: list[str] = ["_grouper"]
677
678 def __init__(
679 self,
680 obj: DataFrame | Series,
681 *args,
682 _grouper: BaseGrouper,
683 _as_index: bool = True,
684 **kwargs,
685 ) -> None:
686 from pandas.core.groupby.ops import BaseGrouper
687
688 if not isinstance(_grouper, BaseGrouper):
689 raise ValueError("Must pass a BaseGrouper object.")
690 self._grouper = _grouper
691 self._as_index = _as_index
692 # GH 32262: It's convention to keep the grouping column in
693 # groupby.<agg_func>, but unexpected to users in
694 # groupby.rolling.<agg_func>
695 obj = obj.drop(columns=self._grouper.names, errors="ignore")
696 # GH 15354
697 if kwargs.get("step") is not None:
698 raise NotImplementedError("step not implemented for groupby")
699 super().__init__(obj, *args, **kwargs)
700
701 def _apply(
702 self,
703 func: Callable[..., Any],
704 name: str,
705 numeric_only: bool = False,
706 numba_args: tuple[Any, ...] = (),
707 **kwargs,
708 ) -> DataFrame | Series:
709 result = super()._apply(
710 func,
711 name,
712 numeric_only,
713 numba_args,
714 **kwargs,
715 )
716 # Reconstruct the resulting MultiIndex
717 # 1st set of levels = group by labels
718 # 2nd set of levels = original DataFrame/Series index
719 grouped_object_index = self.obj.index
720 grouped_index_name = [*grouped_object_index.names]
721 groupby_keys = copy.copy(self._grouper.names)
722 result_index_names = groupby_keys + grouped_index_name
723
724 drop_columns = [
725 key
726 for key in self._grouper.names
727 if key not in self.obj.index.names or key is None
728 ]
729
730 if len(drop_columns) != len(groupby_keys):
731 # Our result will have still kept the column in the result
732 result = result.drop(columns=drop_columns, errors="ignore")
733
734 codes = self._grouper.codes
735 levels = copy.copy(self._grouper.levels)
736
737 group_indices = self._grouper.indices.values()
738 if group_indices:
739 indexer = np.concatenate(list(group_indices))
740 else:
741 indexer = np.array([], dtype=np.intp)
742 codes = [c.take(indexer) for c in codes]
743
744 # if the index of the original dataframe needs to be preserved, append
745 # this index (but reordered) to the codes/levels from the groupby
746 if grouped_object_index is not None:
747 idx = grouped_object_index.take(indexer)
748 if not isinstance(idx, MultiIndex):
749 idx = MultiIndex.from_arrays([idx])
750 codes.extend(list(idx.codes))
751 levels.extend(list(idx.levels))
752
753 result_index = MultiIndex(
754 levels, codes, names=result_index_names, verify_integrity=False
755 )
756
757 result.index = result_index
758 if not self._as_index:
759 result = result.reset_index(level=list(range(len(groupby_keys))))
760 return result
761
762 def _apply_pairwise(
763 self,
764 target: DataFrame | Series,
765 other: DataFrame | Series | None,
766 pairwise: bool | None,
767 func: Callable[[DataFrame | Series, DataFrame | Series], DataFrame | Series],
768 numeric_only: bool,
769 ) -> DataFrame | Series:
770 """
771 Apply the given pairwise function given 2 pandas objects (DataFrame/Series)
772 """
773 # Manually drop the grouping column first
774 target = target.drop(columns=self._grouper.names, errors="ignore")
775 result = super()._apply_pairwise(target, other, pairwise, func, numeric_only)
776 # 1) Determine the levels + codes of the groupby levels
777 if other is not None and not all(
778 len(group) == len(other) for group in self._grouper.indices.values()
779 ):
780 # GH 42915
781 # len(other) != len(any group), so must reindex (expand) the result
782 # from flex_binary_moment to a "transform"-like result
783 # per groupby combination
784 old_result_len = len(result)
785 result = concat(
786 [
787 result.take(gb_indices).reindex(result.index)
788 for gb_indices in self._grouper.indices.values()
789 ]
790 )
791
792 gb_pairs = (
793 com.maybe_make_list(pair) for pair in self._grouper.indices.keys()
794 )
795 groupby_codes = []
796 groupby_levels = []
797 # e.g. [[1, 2], [4, 5]] as [[1, 4], [2, 5]]
798 for gb_level_pair in map(list, zip(*gb_pairs)):
799 labels = np.repeat(np.array(gb_level_pair), old_result_len)
800 codes, levels = factorize(labels)
801 groupby_codes.append(codes)
802 groupby_levels.append(levels)
803 else:
804 # pairwise=True or len(other) == len(each group), so repeat
805 # the groupby labels by the number of columns in the original object
806 groupby_codes = self._grouper.codes
807 # error: Incompatible types in assignment (expression has type
808 # "List[Index]", variable has type "List[Union[ndarray, Index]]")
809 groupby_levels = self._grouper.levels # type: ignore[assignment]
810
811 group_indices = self._grouper.indices.values()
812 if group_indices:
813 indexer = np.concatenate(list(group_indices))
814 else:
815 indexer = np.array([], dtype=np.intp)
816
817 if target.ndim == 1:
818 repeat_by = 1
819 else:
820 repeat_by = len(target.columns)
821 groupby_codes = [
822 np.repeat(c.take(indexer), repeat_by) for c in groupby_codes
823 ]
824 # 2) Determine the levels + codes of the result from super()._apply_pairwise
825 if isinstance(result.index, MultiIndex):
826 result_codes = list(result.index.codes)
827 result_levels = list(result.index.levels)
828 result_names = list(result.index.names)
829 else:
830 idx_codes, idx_levels = factorize(result.index)
831 result_codes = [idx_codes]
832 result_levels = [idx_levels]
833 result_names = [result.index.name]
834
835 # 3) Create the resulting index by combining 1) + 2)
836 result_codes = groupby_codes + result_codes
837 result_levels = groupby_levels + result_levels
838 result_names = self._grouper.names + result_names
839
840 result_index = MultiIndex(
841 result_levels, result_codes, names=result_names, verify_integrity=False
842 )
843 result.index = result_index
844 return result
845
846 def _create_data(self, obj: NDFrameT, numeric_only: bool = False) -> NDFrameT:
847 """
848 Split data into blocks & return conformed data.
849 """
850 # Ensure the object we're rolling over is monotonically sorted relative
851 # to the groups
852 # GH 36197
853 if not obj.empty:
854 groupby_order = np.concatenate(list(self._grouper.indices.values())).astype(
855 np.int64
856 )
857 obj = obj.take(groupby_order)
858 return super()._create_data(obj, numeric_only)
859
860 def _gotitem(self, key, ndim, subset=None):
861 # we are setting the index on the actual object
862 # here so our index is carried through to the selected obj
863 # when we do the splitting for the groupby
864 if self.on is not None:
865 # GH 43355
866 subset = self.obj.set_index(self._on)
867 return super()._gotitem(key, ndim, subset=subset)
868
869
870class Window(BaseWindow):
871 """
872 Provide rolling window calculations.
873
874 Parameters
875 ----------
876 window : int, timedelta, str, offset, or BaseIndexer subclass
877 Size of the moving window.
878
879 If an integer, the fixed number of observations used for
880 each window.
881
882 If a timedelta, str, or offset, the time period of each window. Each
883 window will be a variable sized based on the observations included in
884 the time-period. This is only valid for datetimelike indexes.
885 To learn more about the offsets & frequency strings, please see `this link
886 <https://pandas.pydata.org/pandas-docs/stable/user_guide/timeseries.html#offset-aliases>`__.
887
888 If a BaseIndexer subclass, the window boundaries
889 based on the defined ``get_window_bounds`` method. Additional rolling
890 keyword arguments, namely ``min_periods``, ``center``, ``closed`` and
891 ``step`` will be passed to ``get_window_bounds``.
892
893 min_periods : int, default None
894 Minimum number of observations in window required to have a value;
895 otherwise, result is ``np.nan``.
896
897 For a window that is specified by an offset, ``min_periods`` will default to 1.
898
899 For a window that is specified by an integer, ``min_periods`` will default
900 to the size of the window.
901
902 center : bool, default False
903 If False, set the window labels as the right edge of the window index.
904
905 If True, set the window labels as the center of the window index.
906
907 win_type : str, default None
908 If ``None``, all points are evenly weighted.
909
910 If a string, it must be a valid `scipy.signal window function
911 <https://docs.scipy.org/doc/scipy/reference/signal.windows.html#module-scipy.signal.windows>`__.
912
913 Certain Scipy window types require additional parameters to be passed
914 in the aggregation function. The additional parameters must match
915 the keywords specified in the Scipy window type method signature.
916
917 on : str, optional
918 For a DataFrame, a column label or Index level on which
919 to calculate the rolling window, rather than the DataFrame's index.
920
921 Provided integer column is ignored and excluded from result since
922 an integer index is not used to calculate the rolling window.
923
924 axis : int or str, default 0
925 If ``0`` or ``'index'``, roll across the rows.
926
927 If ``1`` or ``'columns'``, roll across the columns.
928
929 For `Series` this parameter is unused and defaults to 0.
930
931 closed : str, default None
932 If ``'right'``, the first point in the window is excluded from calculations.
933
934 If ``'left'``, the last point in the window is excluded from calculations.
935
936 If ``'both'``, the no points in the window are excluded from calculations.
937
938 If ``'neither'``, the first and last points in the window are excluded
939 from calculations.
940
941 Default ``None`` (``'right'``).
942
943 .. versionchanged:: 1.2.0
944
945 The closed parameter with fixed windows is now supported.
946
947 step : int, default None
948
949 .. versionadded:: 1.5.0
950
951 Evaluate the window at every ``step`` result, equivalent to slicing as
952 ``[::step]``. ``window`` must be an integer. Using a step argument other
953 than None or 1 will produce a result with a different shape than the input.
954
955 method : str {'single', 'table'}, default 'single'
956
957 .. versionadded:: 1.3.0
958
959 Execute the rolling operation per single column or row (``'single'``)
960 or over the entire object (``'table'``).
961
962 This argument is only implemented when specifying ``engine='numba'``
963 in the method call.
964
965 Returns
966 -------
967 ``Window`` subclass if a ``win_type`` is passed
968
969 ``Rolling`` subclass if ``win_type`` is not passed
970
971 See Also
972 --------
973 expanding : Provides expanding transformations.
974 ewm : Provides exponential weighted functions.
975
976 Notes
977 -----
978 See :ref:`Windowing Operations <window.generic>` for further usage details
979 and examples.
980
981 Examples
982 --------
983 >>> df = pd.DataFrame({'B': [0, 1, 2, np.nan, 4]})
984 >>> df
985 B
986 0 0.0
987 1 1.0
988 2 2.0
989 3 NaN
990 4 4.0
991
992 **window**
993
994 Rolling sum with a window length of 2 observations.
995
996 >>> df.rolling(2).sum()
997 B
998 0 NaN
999 1 1.0
1000 2 3.0
1001 3 NaN
1002 4 NaN
1003
1004 Rolling sum with a window span of 2 seconds.
1005
1006 >>> df_time = pd.DataFrame({'B': [0, 1, 2, np.nan, 4]},
1007 ... index = [pd.Timestamp('20130101 09:00:00'),
1008 ... pd.Timestamp('20130101 09:00:02'),
1009 ... pd.Timestamp('20130101 09:00:03'),
1010 ... pd.Timestamp('20130101 09:00:05'),
1011 ... pd.Timestamp('20130101 09:00:06')])
1012
1013 >>> df_time
1014 B
1015 2013-01-01 09:00:00 0.0
1016 2013-01-01 09:00:02 1.0
1017 2013-01-01 09:00:03 2.0
1018 2013-01-01 09:00:05 NaN
1019 2013-01-01 09:00:06 4.0
1020
1021 >>> df_time.rolling('2s').sum()
1022 B
1023 2013-01-01 09:00:00 0.0
1024 2013-01-01 09:00:02 1.0
1025 2013-01-01 09:00:03 3.0
1026 2013-01-01 09:00:05 NaN
1027 2013-01-01 09:00:06 4.0
1028
1029 Rolling sum with forward looking windows with 2 observations.
1030
1031 >>> indexer = pd.api.indexers.FixedForwardWindowIndexer(window_size=2)
1032 >>> df.rolling(window=indexer, min_periods=1).sum()
1033 B
1034 0 1.0
1035 1 3.0
1036 2 2.0
1037 3 4.0
1038 4 4.0
1039
1040 **min_periods**
1041
1042 Rolling sum with a window length of 2 observations, but only needs a minimum of 1
1043 observation to calculate a value.
1044
1045 >>> df.rolling(2, min_periods=1).sum()
1046 B
1047 0 0.0
1048 1 1.0
1049 2 3.0
1050 3 2.0
1051 4 4.0
1052
1053 **center**
1054
1055 Rolling sum with the result assigned to the center of the window index.
1056
1057 >>> df.rolling(3, min_periods=1, center=True).sum()
1058 B
1059 0 1.0
1060 1 3.0
1061 2 3.0
1062 3 6.0
1063 4 4.0
1064
1065 >>> df.rolling(3, min_periods=1, center=False).sum()
1066 B
1067 0 0.0
1068 1 1.0
1069 2 3.0
1070 3 3.0
1071 4 6.0
1072
1073 **step**
1074
1075 Rolling sum with a window length of 2 observations, minimum of 1 observation to
1076 calculate a value, and a step of 2.
1077
1078 >>> df.rolling(2, min_periods=1, step=2).sum()
1079 B
1080 0 0.0
1081 2 3.0
1082 4 4.0
1083
1084 **win_type**
1085
1086 Rolling sum with a window length of 2, using the Scipy ``'gaussian'``
1087 window type. ``std`` is required in the aggregation function.
1088
1089 >>> df.rolling(2, win_type='gaussian').sum(std=3)
1090 B
1091 0 NaN
1092 1 0.986207
1093 2 2.958621
1094 3 NaN
1095 4 NaN
1096
1097 **on**
1098
1099 Rolling sum with a window length of 2 days.
1100
1101 >>> df = pd.DataFrame({
1102 ... 'A': [pd.to_datetime('2020-01-01'),
1103 ... pd.to_datetime('2020-01-01'),
1104 ... pd.to_datetime('2020-01-02'),],
1105 ... 'B': [1, 2, 3], },
1106 ... index=pd.date_range('2020', periods=3))
1107
1108 >>> df
1109 A B
1110 2020-01-01 2020-01-01 1
1111 2020-01-02 2020-01-01 2
1112 2020-01-03 2020-01-02 3
1113
1114 >>> df.rolling('2D', on='A').sum()
1115 A B
1116 2020-01-01 2020-01-01 1.0
1117 2020-01-02 2020-01-01 3.0
1118 2020-01-03 2020-01-02 6.0
1119 """
1120
1121 _attributes = [
1122 "window",
1123 "min_periods",
1124 "center",
1125 "win_type",
1126 "axis",
1127 "on",
1128 "closed",
1129 "step",
1130 "method",
1131 ]
1132
1133 def _validate(self):
1134 super()._validate()
1135
1136 if not isinstance(self.win_type, str):
1137 raise ValueError(f"Invalid win_type {self.win_type}")
1138 signal = import_optional_dependency(
1139 "scipy.signal.windows", extra="Scipy is required to generate window weight."
1140 )
1141 self._scipy_weight_generator = getattr(signal, self.win_type, None)
1142 if self._scipy_weight_generator is None:
1143 raise ValueError(f"Invalid win_type {self.win_type}")
1144
1145 if isinstance(self.window, BaseIndexer):
1146 raise NotImplementedError(
1147 "BaseIndexer subclasses not implemented with win_types."
1148 )
1149 if not is_integer(self.window) or self.window < 0:
1150 raise ValueError("window must be an integer 0 or greater")
1151
1152 if self.method != "single":
1153 raise NotImplementedError("'single' is the only supported method type.")
1154
1155 def _center_window(self, result: np.ndarray, offset: int) -> np.ndarray:
1156 """
1157 Center the result in the window for weighted rolling aggregations.
1158 """
1159 if offset > 0:
1160 lead_indexer = [slice(offset, None)]
1161 result = np.copy(result[tuple(lead_indexer)])
1162 return result
1163
1164 def _apply(
1165 self,
1166 func: Callable[[np.ndarray, int, int], np.ndarray],
1167 name: str,
1168 numeric_only: bool = False,
1169 numba_args: tuple[Any, ...] = (),
1170 **kwargs,
1171 ):
1172 """
1173 Rolling with weights statistical measure using supplied function.
1174
1175 Designed to be used with passed-in Cython array-based functions.
1176
1177 Parameters
1178 ----------
1179 func : callable function to apply
1180 name : str,
1181 numeric_only : bool, default False
1182 Whether to only operate on bool, int, and float columns
1183 numba_args : tuple
1184 unused
1185 **kwargs
1186 additional arguments for scipy windows if necessary
1187
1188 Returns
1189 -------
1190 y : type of input
1191 """
1192 # "None" not callable [misc]
1193 window = self._scipy_weight_generator( # type: ignore[misc]
1194 self.window, **kwargs
1195 )
1196 offset = (len(window) - 1) // 2 if self.center else 0
1197
1198 def homogeneous_func(values: np.ndarray):
1199 # calculation function
1200
1201 if values.size == 0:
1202 return values.copy()
1203
1204 def calc(x):
1205 additional_nans = np.array([np.nan] * offset)
1206 x = np.concatenate((x, additional_nans))
1207 return func(x, window, self.min_periods or len(window))
1208
1209 with np.errstate(all="ignore"):
1210 # Our weighted aggregations return memoryviews
1211 result = np.asarray(calc(values))
1212
1213 if self.center:
1214 result = self._center_window(result, offset)
1215
1216 return result
1217
1218 return self._apply_blockwise(homogeneous_func, name, numeric_only)[:: self.step]
1219
1220 @doc(
1221 _shared_docs["aggregate"],
1222 see_also=dedent(
1223 """
1224 See Also
1225 --------
1226 pandas.DataFrame.aggregate : Similar DataFrame method.
1227 pandas.Series.aggregate : Similar Series method.
1228 """
1229 ),
1230 examples=dedent(
1231 """
1232 Examples
1233 --------
1234 >>> df = pd.DataFrame({"A": [1, 2, 3], "B": [4, 5, 6], "C": [7, 8, 9]})
1235 >>> df
1236 A B C
1237 0 1 4 7
1238 1 2 5 8
1239 2 3 6 9
1240
1241 >>> df.rolling(2, win_type="boxcar").agg("mean")
1242 A B C
1243 0 NaN NaN NaN
1244 1 1.5 4.5 7.5
1245 2 2.5 5.5 8.5
1246 """
1247 ),
1248 klass="Series/DataFrame",
1249 axis="",
1250 )
1251 def aggregate(self, func, *args, **kwargs):
1252 result = ResamplerWindowApply(self, func, args=args, kwargs=kwargs).agg()
1253 if result is None:
1254 # these must apply directly
1255 result = func(self)
1256
1257 return result
1258
1259 agg = aggregate
1260
1261 @doc(
1262 template_header,
1263 create_section_header("Parameters"),
1264 kwargs_numeric_only,
1265 kwargs_scipy,
1266 create_section_header("Returns"),
1267 template_returns,
1268 create_section_header("See Also"),
1269 template_see_also[:-1],
1270 window_method="rolling",
1271 aggregation_description="weighted window sum",
1272 agg_method="sum",
1273 )
1274 def sum(self, numeric_only: bool = False, **kwargs):
1275 window_func = window_aggregations.roll_weighted_sum
1276 # error: Argument 1 to "_apply" of "Window" has incompatible type
1277 # "Callable[[ndarray, ndarray, int], ndarray]"; expected
1278 # "Callable[[ndarray, int, int], ndarray]"
1279 return self._apply(
1280 window_func, # type: ignore[arg-type]
1281 name="sum",
1282 numeric_only=numeric_only,
1283 **kwargs,
1284 )
1285
1286 @doc(
1287 template_header,
1288 create_section_header("Parameters"),
1289 kwargs_numeric_only,
1290 kwargs_scipy,
1291 create_section_header("Returns"),
1292 template_returns,
1293 create_section_header("See Also"),
1294 template_see_also[:-1],
1295 window_method="rolling",
1296 aggregation_description="weighted window mean",
1297 agg_method="mean",
1298 )
1299 def mean(self, numeric_only: bool = False, **kwargs):
1300 window_func = window_aggregations.roll_weighted_mean
1301 # error: Argument 1 to "_apply" of "Window" has incompatible type
1302 # "Callable[[ndarray, ndarray, int], ndarray]"; expected
1303 # "Callable[[ndarray, int, int], ndarray]"
1304 return self._apply(
1305 window_func, # type: ignore[arg-type]
1306 name="mean",
1307 numeric_only=numeric_only,
1308 **kwargs,
1309 )
1310
1311 @doc(
1312 template_header,
1313 create_section_header("Parameters"),
1314 kwargs_numeric_only,
1315 kwargs_scipy,
1316 create_section_header("Returns"),
1317 template_returns,
1318 create_section_header("See Also"),
1319 template_see_also[:-1],
1320 window_method="rolling",
1321 aggregation_description="weighted window variance",
1322 agg_method="var",
1323 )
1324 def var(self, ddof: int = 1, numeric_only: bool = False, **kwargs):
1325 window_func = partial(window_aggregations.roll_weighted_var, ddof=ddof)
1326 kwargs.pop("name", None)
1327 return self._apply(window_func, name="var", numeric_only=numeric_only, **kwargs)
1328
1329 @doc(
1330 template_header,
1331 create_section_header("Parameters"),
1332 kwargs_numeric_only,
1333 kwargs_scipy,
1334 create_section_header("Returns"),
1335 template_returns,
1336 create_section_header("See Also"),
1337 template_see_also[:-1],
1338 window_method="rolling",
1339 aggregation_description="weighted window standard deviation",
1340 agg_method="std",
1341 )
1342 def std(self, ddof: int = 1, numeric_only: bool = False, **kwargs):
1343 return zsqrt(
1344 self.var(ddof=ddof, name="std", numeric_only=numeric_only, **kwargs)
1345 )
1346
1347
1348class RollingAndExpandingMixin(BaseWindow):
1349 def count(self, numeric_only: bool = False):
1350 window_func = window_aggregations.roll_sum
1351 return self._apply(window_func, name="count", numeric_only=numeric_only)
1352
1353 def apply(
1354 self,
1355 func: Callable[..., Any],
1356 raw: bool = False,
1357 engine: str | None = None,
1358 engine_kwargs: dict[str, bool] | None = None,
1359 args: tuple[Any, ...] | None = None,
1360 kwargs: dict[str, Any] | None = None,
1361 ):
1362 if args is None:
1363 args = ()
1364 if kwargs is None:
1365 kwargs = {}
1366
1367 if not is_bool(raw):
1368 raise ValueError("raw parameter must be `True` or `False`")
1369
1370 numba_args: tuple[Any, ...] = ()
1371 if maybe_use_numba(engine):
1372 if raw is False:
1373 raise ValueError("raw must be `True` when using the numba engine")
1374 numba_args = args
1375 if self.method == "single":
1376 apply_func = generate_numba_apply_func(
1377 func, **get_jit_arguments(engine_kwargs, kwargs)
1378 )
1379 else:
1380 apply_func = generate_numba_table_func(
1381 func, **get_jit_arguments(engine_kwargs, kwargs)
1382 )
1383 elif engine in ("cython", None):
1384 if engine_kwargs is not None:
1385 raise ValueError("cython engine does not accept engine_kwargs")
1386 apply_func = self._generate_cython_apply_func(args, kwargs, raw, func)
1387 else:
1388 raise ValueError("engine must be either 'numba' or 'cython'")
1389
1390 return self._apply(
1391 apply_func,
1392 name="apply",
1393 numba_args=numba_args,
1394 )
1395
1396 def _generate_cython_apply_func(
1397 self,
1398 args: tuple[Any, ...],
1399 kwargs: dict[str, Any],
1400 raw: bool,
1401 function: Callable[..., Any],
1402 ) -> Callable[[np.ndarray, np.ndarray, np.ndarray, int], np.ndarray]:
1403 from pandas import Series
1404
1405 window_func = partial(
1406 window_aggregations.roll_apply,
1407 args=args,
1408 kwargs=kwargs,
1409 raw=raw,
1410 function=function,
1411 )
1412
1413 def apply_func(values, begin, end, min_periods, raw=raw):
1414 if not raw:
1415 # GH 45912
1416 values = Series(values, index=self._on, copy=False)
1417 return window_func(values, begin, end, min_periods)
1418
1419 return apply_func
1420
1421 def sum(
1422 self,
1423 numeric_only: bool = False,
1424 engine: str | None = None,
1425 engine_kwargs: dict[str, bool] | None = None,
1426 ):
1427 if maybe_use_numba(engine):
1428 if self.method == "table":
1429 func = generate_manual_numpy_nan_agg_with_axis(np.nansum)
1430 return self.apply(
1431 func,
1432 raw=True,
1433 engine=engine,
1434 engine_kwargs=engine_kwargs,
1435 )
1436 else:
1437 from pandas.core._numba.kernels import sliding_sum
1438
1439 return self._numba_apply(sliding_sum, engine_kwargs)
1440 window_func = window_aggregations.roll_sum
1441 return self._apply(window_func, name="sum", numeric_only=numeric_only)
1442
1443 def max(
1444 self,
1445 numeric_only: bool = False,
1446 engine: str | None = None,
1447 engine_kwargs: dict[str, bool] | None = None,
1448 ):
1449 if maybe_use_numba(engine):
1450 if self.method == "table":
1451 func = generate_manual_numpy_nan_agg_with_axis(np.nanmax)
1452 return self.apply(
1453 func,
1454 raw=True,
1455 engine=engine,
1456 engine_kwargs=engine_kwargs,
1457 )
1458 else:
1459 from pandas.core._numba.kernels import sliding_min_max
1460
1461 return self._numba_apply(sliding_min_max, engine_kwargs, True)
1462 window_func = window_aggregations.roll_max
1463 return self._apply(window_func, name="max", numeric_only=numeric_only)
1464
1465 def min(
1466 self,
1467 numeric_only: bool = False,
1468 engine: str | None = None,
1469 engine_kwargs: dict[str, bool] | None = None,
1470 ):
1471 if maybe_use_numba(engine):
1472 if self.method == "table":
1473 func = generate_manual_numpy_nan_agg_with_axis(np.nanmin)
1474 return self.apply(
1475 func,
1476 raw=True,
1477 engine=engine,
1478 engine_kwargs=engine_kwargs,
1479 )
1480 else:
1481 from pandas.core._numba.kernels import sliding_min_max
1482
1483 return self._numba_apply(sliding_min_max, engine_kwargs, False)
1484 window_func = window_aggregations.roll_min
1485 return self._apply(window_func, name="min", numeric_only=numeric_only)
1486
1487 def mean(
1488 self,
1489 numeric_only: bool = False,
1490 engine: str | None = None,
1491 engine_kwargs: dict[str, bool] | None = None,
1492 ):
1493 if maybe_use_numba(engine):
1494 if self.method == "table":
1495 func = generate_manual_numpy_nan_agg_with_axis(np.nanmean)
1496 return self.apply(
1497 func,
1498 raw=True,
1499 engine=engine,
1500 engine_kwargs=engine_kwargs,
1501 )
1502 else:
1503 from pandas.core._numba.kernels import sliding_mean
1504
1505 return self._numba_apply(sliding_mean, engine_kwargs)
1506 window_func = window_aggregations.roll_mean
1507 return self._apply(window_func, name="mean", numeric_only=numeric_only)
1508
1509 def median(
1510 self,
1511 numeric_only: bool = False,
1512 engine: str | None = None,
1513 engine_kwargs: dict[str, bool] | None = None,
1514 ):
1515 if maybe_use_numba(engine):
1516 if self.method == "table":
1517 func = generate_manual_numpy_nan_agg_with_axis(np.nanmedian)
1518 else:
1519 func = np.nanmedian
1520
1521 return self.apply(
1522 func,
1523 raw=True,
1524 engine=engine,
1525 engine_kwargs=engine_kwargs,
1526 )
1527 window_func = window_aggregations.roll_median_c
1528 return self._apply(window_func, name="median", numeric_only=numeric_only)
1529
1530 def std(
1531 self,
1532 ddof: int = 1,
1533 numeric_only: bool = False,
1534 engine: str | None = None,
1535 engine_kwargs: dict[str, bool] | None = None,
1536 ):
1537 if maybe_use_numba(engine):
1538 if self.method == "table":
1539 raise NotImplementedError("std not supported with method='table'")
1540 from pandas.core._numba.kernels import sliding_var
1541
1542 return zsqrt(self._numba_apply(sliding_var, engine_kwargs, ddof))
1543 window_func = window_aggregations.roll_var
1544
1545 def zsqrt_func(values, begin, end, min_periods):
1546 return zsqrt(window_func(values, begin, end, min_periods, ddof=ddof))
1547
1548 return self._apply(
1549 zsqrt_func,
1550 name="std",
1551 numeric_only=numeric_only,
1552 )
1553
1554 def var(
1555 self,
1556 ddof: int = 1,
1557 numeric_only: bool = False,
1558 engine: str | None = None,
1559 engine_kwargs: dict[str, bool] | None = None,
1560 ):
1561 if maybe_use_numba(engine):
1562 if self.method == "table":
1563 raise NotImplementedError("var not supported with method='table'")
1564 from pandas.core._numba.kernels import sliding_var
1565
1566 return self._numba_apply(sliding_var, engine_kwargs, ddof)
1567 window_func = partial(window_aggregations.roll_var, ddof=ddof)
1568 return self._apply(
1569 window_func,
1570 name="var",
1571 numeric_only=numeric_only,
1572 )
1573
1574 def skew(self, numeric_only: bool = False):
1575 window_func = window_aggregations.roll_skew
1576 return self._apply(
1577 window_func,
1578 name="skew",
1579 numeric_only=numeric_only,
1580 )
1581
1582 def sem(self, ddof: int = 1, numeric_only: bool = False):
1583 # Raise here so error message says sem instead of std
1584 self._validate_numeric_only("sem", numeric_only)
1585 return self.std(numeric_only=numeric_only) / (
1586 self.count(numeric_only=numeric_only) - ddof
1587 ).pow(0.5)
1588
1589 def kurt(self, numeric_only: bool = False):
1590 window_func = window_aggregations.roll_kurt
1591 return self._apply(
1592 window_func,
1593 name="kurt",
1594 numeric_only=numeric_only,
1595 )
1596
1597 def quantile(
1598 self,
1599 quantile: float,
1600 interpolation: QuantileInterpolation = "linear",
1601 numeric_only: bool = False,
1602 ):
1603 if quantile == 1.0:
1604 window_func = window_aggregations.roll_max
1605 elif quantile == 0.0:
1606 window_func = window_aggregations.roll_min
1607 else:
1608 window_func = partial(
1609 window_aggregations.roll_quantile,
1610 quantile=quantile,
1611 interpolation=interpolation,
1612 )
1613
1614 return self._apply(window_func, name="quantile", numeric_only=numeric_only)
1615
1616 def rank(
1617 self,
1618 method: WindowingRankType = "average",
1619 ascending: bool = True,
1620 pct: bool = False,
1621 numeric_only: bool = False,
1622 ):
1623 window_func = partial(
1624 window_aggregations.roll_rank,
1625 method=method,
1626 ascending=ascending,
1627 percentile=pct,
1628 )
1629
1630 return self._apply(window_func, name="rank", numeric_only=numeric_only)
1631
1632 def cov(
1633 self,
1634 other: DataFrame | Series | None = None,
1635 pairwise: bool | None = None,
1636 ddof: int = 1,
1637 numeric_only: bool = False,
1638 ):
1639 if self.step is not None:
1640 raise NotImplementedError("step not implemented for cov")
1641 self._validate_numeric_only("cov", numeric_only)
1642
1643 from pandas import Series
1644
1645 def cov_func(x, y):
1646 x_array = self._prep_values(x)
1647 y_array = self._prep_values(y)
1648 window_indexer = self._get_window_indexer()
1649 min_periods = (
1650 self.min_periods
1651 if self.min_periods is not None
1652 else window_indexer.window_size
1653 )
1654 start, end = window_indexer.get_window_bounds(
1655 num_values=len(x_array),
1656 min_periods=min_periods,
1657 center=self.center,
1658 closed=self.closed,
1659 step=self.step,
1660 )
1661 self._check_window_bounds(start, end, len(x_array))
1662
1663 with np.errstate(all="ignore"):
1664 mean_x_y = window_aggregations.roll_mean(
1665 x_array * y_array, start, end, min_periods
1666 )
1667 mean_x = window_aggregations.roll_mean(x_array, start, end, min_periods)
1668 mean_y = window_aggregations.roll_mean(y_array, start, end, min_periods)
1669 count_x_y = window_aggregations.roll_sum(
1670 notna(x_array + y_array).astype(np.float64), start, end, 0
1671 )
1672 result = (mean_x_y - mean_x * mean_y) * (count_x_y / (count_x_y - ddof))
1673 return Series(result, index=x.index, name=x.name, copy=False)
1674
1675 return self._apply_pairwise(
1676 self._selected_obj, other, pairwise, cov_func, numeric_only
1677 )
1678
1679 def corr(
1680 self,
1681 other: DataFrame | Series | None = None,
1682 pairwise: bool | None = None,
1683 ddof: int = 1,
1684 numeric_only: bool = False,
1685 ):
1686 if self.step is not None:
1687 raise NotImplementedError("step not implemented for corr")
1688 self._validate_numeric_only("corr", numeric_only)
1689
1690 from pandas import Series
1691
1692 def corr_func(x, y):
1693 x_array = self._prep_values(x)
1694 y_array = self._prep_values(y)
1695 window_indexer = self._get_window_indexer()
1696 min_periods = (
1697 self.min_periods
1698 if self.min_periods is not None
1699 else window_indexer.window_size
1700 )
1701 start, end = window_indexer.get_window_bounds(
1702 num_values=len(x_array),
1703 min_periods=min_periods,
1704 center=self.center,
1705 closed=self.closed,
1706 step=self.step,
1707 )
1708 self._check_window_bounds(start, end, len(x_array))
1709
1710 with np.errstate(all="ignore"):
1711 mean_x_y = window_aggregations.roll_mean(
1712 x_array * y_array, start, end, min_periods
1713 )
1714 mean_x = window_aggregations.roll_mean(x_array, start, end, min_periods)
1715 mean_y = window_aggregations.roll_mean(y_array, start, end, min_periods)
1716 count_x_y = window_aggregations.roll_sum(
1717 notna(x_array + y_array).astype(np.float64), start, end, 0
1718 )
1719 x_var = window_aggregations.roll_var(
1720 x_array, start, end, min_periods, ddof
1721 )
1722 y_var = window_aggregations.roll_var(
1723 y_array, start, end, min_periods, ddof
1724 )
1725 numerator = (mean_x_y - mean_x * mean_y) * (
1726 count_x_y / (count_x_y - ddof)
1727 )
1728 denominator = (x_var * y_var) ** 0.5
1729 result = numerator / denominator
1730 return Series(result, index=x.index, name=x.name, copy=False)
1731
1732 return self._apply_pairwise(
1733 self._selected_obj, other, pairwise, corr_func, numeric_only
1734 )
1735
1736
1737class Rolling(RollingAndExpandingMixin):
1738 _attributes: list[str] = [
1739 "window",
1740 "min_periods",
1741 "center",
1742 "win_type",
1743 "axis",
1744 "on",
1745 "closed",
1746 "step",
1747 "method",
1748 ]
1749
1750 def _validate(self):
1751 super()._validate()
1752
1753 # we allow rolling on a datetimelike index
1754 if (
1755 self.obj.empty
1756 or isinstance(self._on, (DatetimeIndex, TimedeltaIndex, PeriodIndex))
1757 ) and isinstance(self.window, (str, BaseOffset, timedelta)):
1758 self._validate_datetimelike_monotonic()
1759
1760 # this will raise ValueError on non-fixed freqs
1761 try:
1762 freq = to_offset(self.window)
1763 except (TypeError, ValueError) as err:
1764 raise ValueError(
1765 f"passed window {self.window} is not "
1766 "compatible with a datetimelike index"
1767 ) from err
1768 if isinstance(self._on, PeriodIndex):
1769 # error: Incompatible types in assignment (expression has type
1770 # "float", variable has type "Optional[int]")
1771 self._win_freq_i8 = freq.nanos / ( # type: ignore[assignment]
1772 self._on.freq.nanos / self._on.freq.n
1773 )
1774 else:
1775 self._win_freq_i8 = freq.nanos
1776
1777 # min_periods must be an integer
1778 if self.min_periods is None:
1779 self.min_periods = 1
1780
1781 if self.step is not None:
1782 raise NotImplementedError(
1783 "step is not supported with frequency windows"
1784 )
1785
1786 elif isinstance(self.window, BaseIndexer):
1787 # Passed BaseIndexer subclass should handle all other rolling kwargs
1788 pass
1789 elif not is_integer(self.window) or self.window < 0:
1790 raise ValueError("window must be an integer 0 or greater")
1791
1792 def _validate_datetimelike_monotonic(self) -> None:
1793 """
1794 Validate self._on is monotonic (increasing or decreasing) and has
1795 no NaT values for frequency windows.
1796 """
1797 if self._on.hasnans:
1798 self._raise_monotonic_error("values must not have NaT")
1799 if not (self._on.is_monotonic_increasing or self._on.is_monotonic_decreasing):
1800 self._raise_monotonic_error("values must be monotonic")
1801
1802 def _raise_monotonic_error(self, msg: str):
1803 on = self.on
1804 if on is None:
1805 if self.axis == 0:
1806 on = "index"
1807 else:
1808 on = "column"
1809 raise ValueError(f"{on} {msg}")
1810
1811 @doc(
1812 _shared_docs["aggregate"],
1813 see_also=dedent(
1814 """
1815 See Also
1816 --------
1817 pandas.Series.rolling : Calling object with Series data.
1818 pandas.DataFrame.rolling : Calling object with DataFrame data.
1819 """
1820 ),
1821 examples=dedent(
1822 """
1823 Examples
1824 --------
1825 >>> df = pd.DataFrame({"A": [1, 2, 3], "B": [4, 5, 6], "C": [7, 8, 9]})
1826 >>> df
1827 A B C
1828 0 1 4 7
1829 1 2 5 8
1830 2 3 6 9
1831
1832 >>> df.rolling(2).sum()
1833 A B C
1834 0 NaN NaN NaN
1835 1 3.0 9.0 15.0
1836 2 5.0 11.0 17.0
1837
1838 >>> df.rolling(2).agg({"A": "sum", "B": "min"})
1839 A B
1840 0 NaN NaN
1841 1 3.0 4.0
1842 2 5.0 5.0
1843 """
1844 ),
1845 klass="Series/Dataframe",
1846 axis="",
1847 )
1848 def aggregate(self, func, *args, **kwargs):
1849 return super().aggregate(func, *args, **kwargs)
1850
1851 agg = aggregate
1852
1853 @doc(
1854 template_header,
1855 create_section_header("Parameters"),
1856 kwargs_numeric_only,
1857 create_section_header("Returns"),
1858 template_returns,
1859 create_section_header("See Also"),
1860 template_see_also,
1861 create_section_header("Examples"),
1862 dedent(
1863 """
1864 >>> s = pd.Series([2, 3, np.nan, 10])
1865 >>> s.rolling(2).count()
1866 0 NaN
1867 1 2.0
1868 2 1.0
1869 3 1.0
1870 dtype: float64
1871 >>> s.rolling(3).count()
1872 0 NaN
1873 1 NaN
1874 2 2.0
1875 3 2.0
1876 dtype: float64
1877 >>> s.rolling(4).count()
1878 0 NaN
1879 1 NaN
1880 2 NaN
1881 3 3.0
1882 dtype: float64
1883 """
1884 ).replace("\n", "", 1),
1885 window_method="rolling",
1886 aggregation_description="count of non NaN observations",
1887 agg_method="count",
1888 )
1889 def count(self, numeric_only: bool = False):
1890 return super().count(numeric_only)
1891
1892 @doc(
1893 template_header,
1894 create_section_header("Parameters"),
1895 window_apply_parameters,
1896 create_section_header("Returns"),
1897 template_returns,
1898 create_section_header("See Also"),
1899 template_see_also[:-1],
1900 window_method="rolling",
1901 aggregation_description="custom aggregation function",
1902 agg_method="apply",
1903 )
1904 def apply(
1905 self,
1906 func: Callable[..., Any],
1907 raw: bool = False,
1908 engine: str | None = None,
1909 engine_kwargs: dict[str, bool] | None = None,
1910 args: tuple[Any, ...] | None = None,
1911 kwargs: dict[str, Any] | None = None,
1912 ):
1913 return super().apply(
1914 func,
1915 raw=raw,
1916 engine=engine,
1917 engine_kwargs=engine_kwargs,
1918 args=args,
1919 kwargs=kwargs,
1920 )
1921
1922 @doc(
1923 template_header,
1924 create_section_header("Parameters"),
1925 kwargs_numeric_only,
1926 window_agg_numba_parameters(),
1927 create_section_header("Returns"),
1928 template_returns,
1929 create_section_header("See Also"),
1930 template_see_also,
1931 create_section_header("Notes"),
1932 numba_notes,
1933 create_section_header("Examples"),
1934 dedent(
1935 """
1936 >>> s = pd.Series([1, 2, 3, 4, 5])
1937 >>> s
1938 0 1
1939 1 2
1940 2 3
1941 3 4
1942 4 5
1943 dtype: int64
1944
1945 >>> s.rolling(3).sum()
1946 0 NaN
1947 1 NaN
1948 2 6.0
1949 3 9.0
1950 4 12.0
1951 dtype: float64
1952
1953 >>> s.rolling(3, center=True).sum()
1954 0 NaN
1955 1 6.0
1956 2 9.0
1957 3 12.0
1958 4 NaN
1959 dtype: float64
1960
1961 For DataFrame, each sum is computed column-wise.
1962
1963 >>> df = pd.DataFrame({{"A": s, "B": s ** 2}})
1964 >>> df
1965 A B
1966 0 1 1
1967 1 2 4
1968 2 3 9
1969 3 4 16
1970 4 5 25
1971
1972 >>> df.rolling(3).sum()
1973 A B
1974 0 NaN NaN
1975 1 NaN NaN
1976 2 6.0 14.0
1977 3 9.0 29.0
1978 4 12.0 50.0
1979 """
1980 ).replace("\n", "", 1),
1981 window_method="rolling",
1982 aggregation_description="sum",
1983 agg_method="sum",
1984 )
1985 def sum(
1986 self,
1987 numeric_only: bool = False,
1988 engine: str | None = None,
1989 engine_kwargs: dict[str, bool] | None = None,
1990 ):
1991 return super().sum(
1992 numeric_only=numeric_only,
1993 engine=engine,
1994 engine_kwargs=engine_kwargs,
1995 )
1996
1997 @doc(
1998 template_header,
1999 create_section_header("Parameters"),
2000 kwargs_numeric_only,
2001 window_agg_numba_parameters(),
2002 create_section_header("Returns"),
2003 template_returns,
2004 create_section_header("See Also"),
2005 template_see_also,
2006 create_section_header("Notes"),
2007 numba_notes[:-1],
2008 window_method="rolling",
2009 aggregation_description="maximum",
2010 agg_method="max",
2011 )
2012 def max(
2013 self,
2014 numeric_only: bool = False,
2015 *args,
2016 engine: str | None = None,
2017 engine_kwargs: dict[str, bool] | None = None,
2018 **kwargs,
2019 ):
2020 return super().max(
2021 numeric_only=numeric_only,
2022 engine=engine,
2023 engine_kwargs=engine_kwargs,
2024 )
2025
2026 @doc(
2027 template_header,
2028 create_section_header("Parameters"),
2029 kwargs_numeric_only,
2030 window_agg_numba_parameters(),
2031 create_section_header("Returns"),
2032 template_returns,
2033 create_section_header("See Also"),
2034 template_see_also,
2035 create_section_header("Notes"),
2036 numba_notes,
2037 create_section_header("Examples"),
2038 dedent(
2039 """
2040 Performing a rolling minimum with a window size of 3.
2041
2042 >>> s = pd.Series([4, 3, 5, 2, 6])
2043 >>> s.rolling(3).min()
2044 0 NaN
2045 1 NaN
2046 2 3.0
2047 3 2.0
2048 4 2.0
2049 dtype: float64
2050 """
2051 ).replace("\n", "", 1),
2052 window_method="rolling",
2053 aggregation_description="minimum",
2054 agg_method="min",
2055 )
2056 def min(
2057 self,
2058 numeric_only: bool = False,
2059 engine: str | None = None,
2060 engine_kwargs: dict[str, bool] | None = None,
2061 ):
2062 return super().min(
2063 numeric_only=numeric_only,
2064 engine=engine,
2065 engine_kwargs=engine_kwargs,
2066 )
2067
2068 @doc(
2069 template_header,
2070 create_section_header("Parameters"),
2071 kwargs_numeric_only,
2072 window_agg_numba_parameters(),
2073 create_section_header("Returns"),
2074 template_returns,
2075 create_section_header("See Also"),
2076 template_see_also,
2077 create_section_header("Notes"),
2078 numba_notes,
2079 create_section_header("Examples"),
2080 dedent(
2081 """
2082 The below examples will show rolling mean calculations with window sizes of
2083 two and three, respectively.
2084
2085 >>> s = pd.Series([1, 2, 3, 4])
2086 >>> s.rolling(2).mean()
2087 0 NaN
2088 1 1.5
2089 2 2.5
2090 3 3.5
2091 dtype: float64
2092
2093 >>> s.rolling(3).mean()
2094 0 NaN
2095 1 NaN
2096 2 2.0
2097 3 3.0
2098 dtype: float64
2099 """
2100 ).replace("\n", "", 1),
2101 window_method="rolling",
2102 aggregation_description="mean",
2103 agg_method="mean",
2104 )
2105 def mean(
2106 self,
2107 numeric_only: bool = False,
2108 engine: str | None = None,
2109 engine_kwargs: dict[str, bool] | None = None,
2110 ):
2111 return super().mean(
2112 numeric_only=numeric_only,
2113 engine=engine,
2114 engine_kwargs=engine_kwargs,
2115 )
2116
2117 @doc(
2118 template_header,
2119 create_section_header("Parameters"),
2120 kwargs_numeric_only,
2121 window_agg_numba_parameters(),
2122 create_section_header("Returns"),
2123 template_returns,
2124 create_section_header("See Also"),
2125 template_see_also,
2126 create_section_header("Notes"),
2127 numba_notes,
2128 create_section_header("Examples"),
2129 dedent(
2130 """
2131 Compute the rolling median of a series with a window size of 3.
2132
2133 >>> s = pd.Series([0, 1, 2, 3, 4])
2134 >>> s.rolling(3).median()
2135 0 NaN
2136 1 NaN
2137 2 1.0
2138 3 2.0
2139 4 3.0
2140 dtype: float64
2141 """
2142 ).replace("\n", "", 1),
2143 window_method="rolling",
2144 aggregation_description="median",
2145 agg_method="median",
2146 )
2147 def median(
2148 self,
2149 numeric_only: bool = False,
2150 engine: str | None = None,
2151 engine_kwargs: dict[str, bool] | None = None,
2152 ):
2153 return super().median(
2154 numeric_only=numeric_only,
2155 engine=engine,
2156 engine_kwargs=engine_kwargs,
2157 )
2158
2159 @doc(
2160 template_header,
2161 create_section_header("Parameters"),
2162 dedent(
2163 """
2164 ddof : int, default 1
2165 Delta Degrees of Freedom. The divisor used in calculations
2166 is ``N - ddof``, where ``N`` represents the number of elements.
2167 """
2168 ).replace("\n", "", 1),
2169 kwargs_numeric_only,
2170 window_agg_numba_parameters("1.4"),
2171 create_section_header("Returns"),
2172 template_returns,
2173 create_section_header("See Also"),
2174 "numpy.std : Equivalent method for NumPy array.\n",
2175 template_see_also,
2176 create_section_header("Notes"),
2177 dedent(
2178 """
2179 The default ``ddof`` of 1 used in :meth:`Series.std` is different
2180 than the default ``ddof`` of 0 in :func:`numpy.std`.
2181
2182 A minimum of one period is required for the rolling calculation.\n
2183 """
2184 ).replace("\n", "", 1),
2185 create_section_header("Examples"),
2186 dedent(
2187 """
2188 >>> s = pd.Series([5, 5, 6, 7, 5, 5, 5])
2189 >>> s.rolling(3).std()
2190 0 NaN
2191 1 NaN
2192 2 0.577350
2193 3 1.000000
2194 4 1.000000
2195 5 1.154701
2196 6 0.000000
2197 dtype: float64
2198 """
2199 ).replace("\n", "", 1),
2200 window_method="rolling",
2201 aggregation_description="standard deviation",
2202 agg_method="std",
2203 )
2204 def std(
2205 self,
2206 ddof: int = 1,
2207 numeric_only: bool = False,
2208 engine: str | None = None,
2209 engine_kwargs: dict[str, bool] | None = None,
2210 ):
2211 return super().std(
2212 ddof=ddof,
2213 numeric_only=numeric_only,
2214 engine=engine,
2215 engine_kwargs=engine_kwargs,
2216 )
2217
2218 @doc(
2219 template_header,
2220 create_section_header("Parameters"),
2221 dedent(
2222 """
2223 ddof : int, default 1
2224 Delta Degrees of Freedom. The divisor used in calculations
2225 is ``N - ddof``, where ``N`` represents the number of elements.
2226 """
2227 ).replace("\n", "", 1),
2228 kwargs_numeric_only,
2229 window_agg_numba_parameters("1.4"),
2230 create_section_header("Returns"),
2231 template_returns,
2232 create_section_header("See Also"),
2233 "numpy.var : Equivalent method for NumPy array.\n",
2234 template_see_also,
2235 create_section_header("Notes"),
2236 dedent(
2237 """
2238 The default ``ddof`` of 1 used in :meth:`Series.var` is different
2239 than the default ``ddof`` of 0 in :func:`numpy.var`.
2240
2241 A minimum of one period is required for the rolling calculation.\n
2242 """
2243 ).replace("\n", "", 1),
2244 create_section_header("Examples"),
2245 dedent(
2246 """
2247 >>> s = pd.Series([5, 5, 6, 7, 5, 5, 5])
2248 >>> s.rolling(3).var()
2249 0 NaN
2250 1 NaN
2251 2 0.333333
2252 3 1.000000
2253 4 1.000000
2254 5 1.333333
2255 6 0.000000
2256 dtype: float64
2257 """
2258 ).replace("\n", "", 1),
2259 window_method="rolling",
2260 aggregation_description="variance",
2261 agg_method="var",
2262 )
2263 def var(
2264 self,
2265 ddof: int = 1,
2266 numeric_only: bool = False,
2267 engine: str | None = None,
2268 engine_kwargs: dict[str, bool] | None = None,
2269 ):
2270 return super().var(
2271 ddof=ddof,
2272 numeric_only=numeric_only,
2273 engine=engine,
2274 engine_kwargs=engine_kwargs,
2275 )
2276
2277 @doc(
2278 template_header,
2279 create_section_header("Parameters"),
2280 kwargs_numeric_only,
2281 create_section_header("Returns"),
2282 template_returns,
2283 create_section_header("See Also"),
2284 "scipy.stats.skew : Third moment of a probability density.\n",
2285 template_see_also,
2286 create_section_header("Notes"),
2287 "A minimum of three periods is required for the rolling calculation.\n",
2288 window_method="rolling",
2289 aggregation_description="unbiased skewness",
2290 agg_method="skew",
2291 )
2292 def skew(self, numeric_only: bool = False):
2293 return super().skew(numeric_only=numeric_only)
2294
2295 @doc(
2296 template_header,
2297 create_section_header("Parameters"),
2298 dedent(
2299 """
2300 ddof : int, default 1
2301 Delta Degrees of Freedom. The divisor used in calculations
2302 is ``N - ddof``, where ``N`` represents the number of elements.
2303 """
2304 ).replace("\n", "", 1),
2305 kwargs_numeric_only,
2306 create_section_header("Returns"),
2307 template_returns,
2308 create_section_header("See Also"),
2309 template_see_also,
2310 create_section_header("Notes"),
2311 "A minimum of one period is required for the calculation.\n\n",
2312 create_section_header("Examples"),
2313 dedent(
2314 """
2315 >>> s = pd.Series([0, 1, 2, 3])
2316 >>> s.rolling(2, min_periods=1).sem()
2317 0 NaN
2318 1 0.707107
2319 2 0.707107
2320 3 0.707107
2321 dtype: float64
2322 """
2323 ).replace("\n", "", 1),
2324 window_method="rolling",
2325 aggregation_description="standard error of mean",
2326 agg_method="sem",
2327 )
2328 def sem(self, ddof: int = 1, numeric_only: bool = False):
2329 # Raise here so error message says sem instead of std
2330 self._validate_numeric_only("sem", numeric_only)
2331 return self.std(numeric_only=numeric_only) / (
2332 self.count(numeric_only) - ddof
2333 ).pow(0.5)
2334
2335 @doc(
2336 template_header,
2337 create_section_header("Parameters"),
2338 kwargs_numeric_only,
2339 create_section_header("Returns"),
2340 template_returns,
2341 create_section_header("See Also"),
2342 "scipy.stats.kurtosis : Reference SciPy method.\n",
2343 template_see_also,
2344 create_section_header("Notes"),
2345 "A minimum of four periods is required for the calculation.\n\n",
2346 create_section_header("Examples"),
2347 dedent(
2348 """
2349 The example below will show a rolling calculation with a window size of
2350 four matching the equivalent function call using `scipy.stats`.
2351
2352 >>> arr = [1, 2, 3, 4, 999]
2353 >>> import scipy.stats
2354 >>> print(f"{{scipy.stats.kurtosis(arr[:-1], bias=False):.6f}}")
2355 -1.200000
2356 >>> print(f"{{scipy.stats.kurtosis(arr[1:], bias=False):.6f}}")
2357 3.999946
2358 >>> s = pd.Series(arr)
2359 >>> s.rolling(4).kurt()
2360 0 NaN
2361 1 NaN
2362 2 NaN
2363 3 -1.200000
2364 4 3.999946
2365 dtype: float64
2366 """
2367 ).replace("\n", "", 1),
2368 window_method="rolling",
2369 aggregation_description="Fisher's definition of kurtosis without bias",
2370 agg_method="kurt",
2371 )
2372 def kurt(self, numeric_only: bool = False):
2373 return super().kurt(numeric_only=numeric_only)
2374
2375 @doc(
2376 template_header,
2377 create_section_header("Parameters"),
2378 dedent(
2379 """
2380 quantile : float
2381 Quantile to compute. 0 <= quantile <= 1.
2382 interpolation : {{'linear', 'lower', 'higher', 'midpoint', 'nearest'}}
2383 This optional parameter specifies the interpolation method to use,
2384 when the desired quantile lies between two data points `i` and `j`:
2385
2386 * linear: `i + (j - i) * fraction`, where `fraction` is the
2387 fractional part of the index surrounded by `i` and `j`.
2388 * lower: `i`.
2389 * higher: `j`.
2390 * nearest: `i` or `j` whichever is nearest.
2391 * midpoint: (`i` + `j`) / 2.
2392 """
2393 ).replace("\n", "", 1),
2394 kwargs_numeric_only,
2395 create_section_header("Returns"),
2396 template_returns,
2397 create_section_header("See Also"),
2398 template_see_also,
2399 create_section_header("Examples"),
2400 dedent(
2401 """
2402 >>> s = pd.Series([1, 2, 3, 4])
2403 >>> s.rolling(2).quantile(.4, interpolation='lower')
2404 0 NaN
2405 1 1.0
2406 2 2.0
2407 3 3.0
2408 dtype: float64
2409
2410 >>> s.rolling(2).quantile(.4, interpolation='midpoint')
2411 0 NaN
2412 1 1.5
2413 2 2.5
2414 3 3.5
2415 dtype: float64
2416 """
2417 ).replace("\n", "", 1),
2418 window_method="rolling",
2419 aggregation_description="quantile",
2420 agg_method="quantile",
2421 )
2422 def quantile(
2423 self,
2424 quantile: float,
2425 interpolation: QuantileInterpolation = "linear",
2426 numeric_only: bool = False,
2427 ):
2428 return super().quantile(
2429 quantile=quantile,
2430 interpolation=interpolation,
2431 numeric_only=numeric_only,
2432 )
2433
2434 @doc(
2435 template_header,
2436 ".. versionadded:: 1.4.0 \n\n",
2437 create_section_header("Parameters"),
2438 dedent(
2439 """
2440 method : {{'average', 'min', 'max'}}, default 'average'
2441 How to rank the group of records that have the same value (i.e. ties):
2442
2443 * average: average rank of the group
2444 * min: lowest rank in the group
2445 * max: highest rank in the group
2446
2447 ascending : bool, default True
2448 Whether or not the elements should be ranked in ascending order.
2449 pct : bool, default False
2450 Whether or not to display the returned rankings in percentile
2451 form.
2452 """
2453 ).replace("\n", "", 1),
2454 kwargs_numeric_only,
2455 create_section_header("Returns"),
2456 template_returns,
2457 create_section_header("See Also"),
2458 template_see_also,
2459 create_section_header("Examples"),
2460 dedent(
2461 """
2462 >>> s = pd.Series([1, 4, 2, 3, 5, 3])
2463 >>> s.rolling(3).rank()
2464 0 NaN
2465 1 NaN
2466 2 2.0
2467 3 2.0
2468 4 3.0
2469 5 1.5
2470 dtype: float64
2471
2472 >>> s.rolling(3).rank(method="max")
2473 0 NaN
2474 1 NaN
2475 2 2.0
2476 3 2.0
2477 4 3.0
2478 5 2.0
2479 dtype: float64
2480
2481 >>> s.rolling(3).rank(method="min")
2482 0 NaN
2483 1 NaN
2484 2 2.0
2485 3 2.0
2486 4 3.0
2487 5 1.0
2488 dtype: float64
2489 """
2490 ).replace("\n", "", 1),
2491 window_method="rolling",
2492 aggregation_description="rank",
2493 agg_method="rank",
2494 )
2495 def rank(
2496 self,
2497 method: WindowingRankType = "average",
2498 ascending: bool = True,
2499 pct: bool = False,
2500 numeric_only: bool = False,
2501 ):
2502 return super().rank(
2503 method=method,
2504 ascending=ascending,
2505 pct=pct,
2506 numeric_only=numeric_only,
2507 )
2508
2509 @doc(
2510 template_header,
2511 create_section_header("Parameters"),
2512 dedent(
2513 """
2514 other : Series or DataFrame, optional
2515 If not supplied then will default to self and produce pairwise
2516 output.
2517 pairwise : bool, default None
2518 If False then only matching columns between self and other will be
2519 used and the output will be a DataFrame.
2520 If True then all pairwise combinations will be calculated and the
2521 output will be a MultiIndexed DataFrame in the case of DataFrame
2522 inputs. In the case of missing elements, only complete pairwise
2523 observations will be used.
2524 ddof : int, default 1
2525 Delta Degrees of Freedom. The divisor used in calculations
2526 is ``N - ddof``, where ``N`` represents the number of elements.
2527 """
2528 ).replace("\n", "", 1),
2529 kwargs_numeric_only,
2530 create_section_header("Returns"),
2531 template_returns,
2532 create_section_header("See Also"),
2533 template_see_also[:-1],
2534 window_method="rolling",
2535 aggregation_description="sample covariance",
2536 agg_method="cov",
2537 )
2538 def cov(
2539 self,
2540 other: DataFrame | Series | None = None,
2541 pairwise: bool | None = None,
2542 ddof: int = 1,
2543 numeric_only: bool = False,
2544 ):
2545 return super().cov(
2546 other=other,
2547 pairwise=pairwise,
2548 ddof=ddof,
2549 numeric_only=numeric_only,
2550 )
2551
2552 @doc(
2553 template_header,
2554 create_section_header("Parameters"),
2555 dedent(
2556 """
2557 other : Series or DataFrame, optional
2558 If not supplied then will default to self and produce pairwise
2559 output.
2560 pairwise : bool, default None
2561 If False then only matching columns between self and other will be
2562 used and the output will be a DataFrame.
2563 If True then all pairwise combinations will be calculated and the
2564 output will be a MultiIndexed DataFrame in the case of DataFrame
2565 inputs. In the case of missing elements, only complete pairwise
2566 observations will be used.
2567 ddof : int, default 1
2568 Delta Degrees of Freedom. The divisor used in calculations
2569 is ``N - ddof``, where ``N`` represents the number of elements.
2570 """
2571 ).replace("\n", "", 1),
2572 kwargs_numeric_only,
2573 create_section_header("Returns"),
2574 template_returns,
2575 create_section_header("See Also"),
2576 dedent(
2577 """
2578 cov : Similar method to calculate covariance.
2579 numpy.corrcoef : NumPy Pearson's correlation calculation.
2580 """
2581 ).replace("\n", "", 1),
2582 template_see_also,
2583 create_section_header("Notes"),
2584 dedent(
2585 """
2586 This function uses Pearson's definition of correlation
2587 (https://en.wikipedia.org/wiki/Pearson_correlation_coefficient).
2588
2589 When `other` is not specified, the output will be self correlation (e.g.
2590 all 1's), except for :class:`~pandas.DataFrame` inputs with `pairwise`
2591 set to `True`.
2592
2593 Function will return ``NaN`` for correlations of equal valued sequences;
2594 this is the result of a 0/0 division error.
2595
2596 When `pairwise` is set to `False`, only matching columns between `self` and
2597 `other` will be used.
2598
2599 When `pairwise` is set to `True`, the output will be a MultiIndex DataFrame
2600 with the original index on the first level, and the `other` DataFrame
2601 columns on the second level.
2602
2603 In the case of missing elements, only complete pairwise observations
2604 will be used.\n
2605 """
2606 ).replace("\n", "", 1),
2607 create_section_header("Examples"),
2608 dedent(
2609 """
2610 The below example shows a rolling calculation with a window size of
2611 four matching the equivalent function call using :meth:`numpy.corrcoef`.
2612
2613 >>> v1 = [3, 3, 3, 5, 8]
2614 >>> v2 = [3, 4, 4, 4, 8]
2615 >>> # numpy returns a 2X2 array, the correlation coefficient
2616 >>> # is the number at entry [0][1]
2617 >>> print(f"{{np.corrcoef(v1[:-1], v2[:-1])[0][1]:.6f}}")
2618 0.333333
2619 >>> print(f"{{np.corrcoef(v1[1:], v2[1:])[0][1]:.6f}}")
2620 0.916949
2621 >>> s1 = pd.Series(v1)
2622 >>> s2 = pd.Series(v2)
2623 >>> s1.rolling(4).corr(s2)
2624 0 NaN
2625 1 NaN
2626 2 NaN
2627 3 0.333333
2628 4 0.916949
2629 dtype: float64
2630
2631 The below example shows a similar rolling calculation on a
2632 DataFrame using the pairwise option.
2633
2634 >>> matrix = np.array([[51., 35.], [49., 30.], [47., 32.],\
2635 [46., 31.], [50., 36.]])
2636 >>> print(np.corrcoef(matrix[:-1,0], matrix[:-1,1]).round(7))
2637 [[1. 0.6263001]
2638 [0.6263001 1. ]]
2639 >>> print(np.corrcoef(matrix[1:,0], matrix[1:,1]).round(7))
2640 [[1. 0.5553681]
2641 [0.5553681 1. ]]
2642 >>> df = pd.DataFrame(matrix, columns=['X','Y'])
2643 >>> df
2644 X Y
2645 0 51.0 35.0
2646 1 49.0 30.0
2647 2 47.0 32.0
2648 3 46.0 31.0
2649 4 50.0 36.0
2650 >>> df.rolling(4).corr(pairwise=True)
2651 X Y
2652 0 X NaN NaN
2653 Y NaN NaN
2654 1 X NaN NaN
2655 Y NaN NaN
2656 2 X NaN NaN
2657 Y NaN NaN
2658 3 X 1.000000 0.626300
2659 Y 0.626300 1.000000
2660 4 X 1.000000 0.555368
2661 Y 0.555368 1.000000
2662 """
2663 ).replace("\n", "", 1),
2664 window_method="rolling",
2665 aggregation_description="correlation",
2666 agg_method="corr",
2667 )
2668 def corr(
2669 self,
2670 other: DataFrame | Series | None = None,
2671 pairwise: bool | None = None,
2672 ddof: int = 1,
2673 numeric_only: bool = False,
2674 ):
2675 return super().corr(
2676 other=other,
2677 pairwise=pairwise,
2678 ddof=ddof,
2679 numeric_only=numeric_only,
2680 )
2681
2682
2683Rolling.__doc__ = Window.__doc__
2684
2685
2686class RollingGroupby(BaseWindowGroupby, Rolling):
2687 """
2688 Provide a rolling groupby implementation.
2689 """
2690
2691 _attributes = Rolling._attributes + BaseWindowGroupby._attributes
2692
2693 def _get_window_indexer(self) -> GroupbyIndexer:
2694 """
2695 Return an indexer class that will compute the window start and end bounds
2696
2697 Returns
2698 -------
2699 GroupbyIndexer
2700 """
2701 rolling_indexer: type[BaseIndexer]
2702 indexer_kwargs: dict[str, Any] | None = None
2703 index_array = self._index_array
2704 if isinstance(self.window, BaseIndexer):
2705 rolling_indexer = type(self.window)
2706 indexer_kwargs = self.window.__dict__.copy()
2707 assert isinstance(indexer_kwargs, dict) # for mypy
2708 # We'll be using the index of each group later
2709 indexer_kwargs.pop("index_array", None)
2710 window = self.window
2711 elif self._win_freq_i8 is not None:
2712 rolling_indexer = VariableWindowIndexer
2713 # error: Incompatible types in assignment (expression has type
2714 # "int", variable has type "BaseIndexer")
2715 window = self._win_freq_i8 # type: ignore[assignment]
2716 else:
2717 rolling_indexer = FixedWindowIndexer
2718 window = self.window
2719 window_indexer = GroupbyIndexer(
2720 index_array=index_array,
2721 window_size=window,
2722 groupby_indices=self._grouper.indices,
2723 window_indexer=rolling_indexer,
2724 indexer_kwargs=indexer_kwargs,
2725 )
2726 return window_indexer
2727
2728 def _validate_datetimelike_monotonic(self):
2729 """
2730 Validate that each group in self._on is monotonic
2731 """
2732 # GH 46061
2733 if self._on.hasnans:
2734 self._raise_monotonic_error("values must not have NaT")
2735 for group_indices in self._grouper.indices.values():
2736 group_on = self._on.take(group_indices)
2737 if not (
2738 group_on.is_monotonic_increasing or group_on.is_monotonic_decreasing
2739 ):
2740 on = "index" if self.on is None else self.on
2741 raise ValueError(
2742 f"Each group within {on} must be monotonic. "
2743 f"Sort the values in {on} first."
2744 )