1"""
2Provide a generic structure to support window functions,
3similar to how we have a Groupby object.
4"""
5from __future__ import annotations
6
7import copy
8from datetime import timedelta
9from functools import partial
10import inspect
11from textwrap import dedent
12from typing import (
13 TYPE_CHECKING,
14 Any,
15 Callable,
16 Literal,
17)
18
19import numpy as np
20
21from pandas._libs.tslibs import (
22 BaseOffset,
23 Timedelta,
24 to_offset,
25)
26import pandas._libs.window.aggregations as window_aggregations
27from pandas.compat._optional import import_optional_dependency
28from pandas.errors import DataError
29from pandas.util._decorators import (
30 deprecate_kwarg,
31 doc,
32)
33
34from pandas.core.dtypes.common import (
35 ensure_float64,
36 is_bool,
37 is_integer,
38 is_numeric_dtype,
39 needs_i8_conversion,
40)
41from pandas.core.dtypes.dtypes import ArrowDtype
42from pandas.core.dtypes.generic import (
43 ABCDataFrame,
44 ABCSeries,
45)
46from pandas.core.dtypes.missing import notna
47
48from pandas.core._numba import executor
49from pandas.core.algorithms import factorize
50from pandas.core.apply import ResamplerWindowApply
51from pandas.core.arrays import ExtensionArray
52from pandas.core.base import SelectionMixin
53import pandas.core.common as com
54from pandas.core.indexers.objects import (
55 BaseIndexer,
56 FixedWindowIndexer,
57 GroupbyIndexer,
58 VariableWindowIndexer,
59)
60from pandas.core.indexes.api import (
61 DatetimeIndex,
62 Index,
63 MultiIndex,
64 PeriodIndex,
65 TimedeltaIndex,
66)
67from pandas.core.reshape.concat import concat
68from pandas.core.util.numba_ import (
69 get_jit_arguments,
70 maybe_use_numba,
71)
72from pandas.core.window.common import (
73 flex_binary_moment,
74 zsqrt,
75)
76from pandas.core.window.doc import (
77 _shared_docs,
78 create_section_header,
79 kwargs_numeric_only,
80 kwargs_scipy,
81 numba_notes,
82 template_header,
83 template_returns,
84 template_see_also,
85 window_agg_numba_parameters,
86 window_apply_parameters,
87)
88from pandas.core.window.numba_ import (
89 generate_manual_numpy_nan_agg_with_axis,
90 generate_numba_apply_func,
91 generate_numba_table_func,
92)
93
94if TYPE_CHECKING:
95 from collections.abc import (
96 Hashable,
97 Iterator,
98 Sized,
99 )
100
101 from pandas._typing import (
102 ArrayLike,
103 Axis,
104 NDFrameT,
105 QuantileInterpolation,
106 WindowingRankType,
107 npt,
108 )
109
110 from pandas import (
111 DataFrame,
112 Series,
113 )
114 from pandas.core.generic import NDFrame
115 from pandas.core.groupby.ops import BaseGrouper
116
117from pandas.core.arrays.datetimelike import dtype_to_unit
118
119
class BaseWindow(SelectionMixin):
    """Provides utilities for performing windowing operations."""

    # Names of the constructor arguments that ``_gotitem`` copies onto a new
    # window object and that ``__repr__`` displays.
    _attributes: list[str] = []
    exclusions: frozenset[Hashable] = frozenset()
    # The index the window is computed against (see ``__init__``).
    _on: Index

    def __init__(
        self,
        obj: NDFrame,
        window=None,
        min_periods: int | None = None,
        center: bool | None = False,
        win_type: str | None = None,
        axis: Axis = 0,
        on: str | Index | None = None,
        closed: str | None = None,
        step: int | None = None,
        method: str = "single",
        *,
        selection=None,
    ) -> None:
        """
        Store the windowing configuration, resolve ``on`` and validate.

        Parameters
        ----------
        obj : Series or DataFrame
            Object to compute the windows over.
        window : int, BaseIndexer or other, default None
            Window specification; integers and ``BaseIndexer`` instances are
            the forms interpreted by this class.
        min_periods : int, optional
            Minimum number of observations; must be a non-negative integer
            not larger than an integer ``window``.
        center : bool, default False
        win_type : str, optional
        axis : int or str, default 0
            Resolved through ``obj._get_axis_number`` unless it is None.
        on : str, Index or None, default None
            ``None`` uses the axis labels of ``obj``; an ``Index`` is used
            as-is; otherwise it must be a column label of a DataFrame.
        closed : {'right', 'both', 'left', 'neither'}, optional
        step : int, optional
            Must be a non-negative integer.
        method : {'single', 'table'}, default 'single'
        selection : optional
            Stored as ``_selection`` for the selection machinery.

        Raises
        ------
        ValueError
            If ``on`` cannot be resolved or any argument fails ``_validate``.
        TypeError
            If ``obj`` is neither a Series nor a DataFrame.
        """
        self.obj = obj
        self.on = on
        self.closed = closed
        self.step = step
        self.window = window
        self.min_periods = min_periods
        self.center = center
        self.win_type = win_type
        self.axis = obj._get_axis_number(axis) if axis is not None else None
        self.method = method
        self._win_freq_i8: int | None = None
        if self.on is None:
            if self.axis == 0:
                self._on = self.obj.index
            else:
                # i.e. self.axis == 1
                self._on = self.obj.columns
        elif isinstance(self.on, Index):
            self._on = self.on
        elif isinstance(self.obj, ABCDataFrame) and self.on in self.obj.columns:
            self._on = Index(self.obj[self.on])
        else:
            raise ValueError(
                f"invalid on specified as {self.on}, "
                "must be a column (of DataFrame), an Index or None"
            )

        self._selection = selection
        self._validate()

    def _validate(self) -> None:
        """
        Check the constructor arguments for type and range errors.

        Raises
        ------
        ValueError
            For an invalid ``center``, ``min_periods``, ``closed``, ``method``
            or ``step``, or a ``BaseIndexer`` whose ``get_window_bounds``
            parameter names differ from the base class.
        TypeError
            If ``obj`` is neither a Series nor a DataFrame.
        """
        if self.center is not None and not is_bool(self.center):
            raise ValueError("center must be a boolean")
        if self.min_periods is not None:
            if not is_integer(self.min_periods):
                raise ValueError("min_periods must be an integer")
            if self.min_periods < 0:
                raise ValueError("min_periods must be >= 0")
            if is_integer(self.window) and self.min_periods > self.window:
                raise ValueError(
                    f"min_periods {self.min_periods} must be <= window {self.window}"
                )
        if self.closed is not None and self.closed not in [
            "right",
            "both",
            "left",
            "neither",
        ]:
            raise ValueError("closed must be 'right', 'left', 'both' or 'neither'")
        if not isinstance(self.obj, (ABCSeries, ABCDataFrame)):
            # Report the type of the rejected input, not of the window object.
            raise TypeError(f"invalid type: {type(self.obj)}")
        if isinstance(self.window, BaseIndexer):
            # Validate that the passed BaseIndexer subclass has
            # a get_window_bounds with the correct signature.
            # Only the parameter names are compared.
            get_window_bounds_signature = inspect.signature(
                self.window.get_window_bounds
            ).parameters.keys()
            expected_signature = inspect.signature(
                BaseIndexer().get_window_bounds
            ).parameters.keys()
            if get_window_bounds_signature != expected_signature:
                raise ValueError(
                    f"{type(self.window).__name__} does not implement "
                    f"the correct signature for get_window_bounds"
                )
        if self.method not in ["table", "single"]:
            raise ValueError("method must be 'table' or 'single'")
        if self.step is not None:
            if not is_integer(self.step):
                raise ValueError("step must be an integer")
            if self.step < 0:
                raise ValueError("step must be >= 0")

    def _check_window_bounds(
        self, start: np.ndarray, end: np.ndarray, num_vals: int
    ) -> None:
        """
        Verify that window bounds have a length consistent with the data.

        ``start`` and ``end`` must have equal length, and that length must be
        ``num_vals`` divided by the step and rounded up. A step of ``None``
        or ``0`` is treated as 1 here.

        Raises
        ------
        ValueError
            If either length check fails.
        """
        if len(start) != len(end):
            raise ValueError(
                f"start ({len(start)}) and end ({len(end)}) bounds must be the "
                f"same length"
            )
        if len(start) != (num_vals + (self.step or 1) - 1) // (self.step or 1):
            raise ValueError(
                f"start and end bounds ({len(start)}) must be the same length "
                f"as the object ({num_vals}) divided by the step ({self.step}) "
                f"if given and rounded up"
            )

    def _slice_axis_for_step(self, index: Index, result: Sized | None = None) -> Index:
        """
        Slices the index for a given result and the preset step.

        The index is returned unchanged when ``result`` is None or already has
        the same length as ``index``; otherwise ``index[::self.step]``.
        """
        return (
            index
            if result is None or len(result) == len(index)
            else index[:: self.step]
        )

    def _validate_numeric_only(self, name: str, numeric_only: bool) -> None:
        """
        Validate numeric_only argument, raising if invalid for the input.

        Parameters
        ----------
        name : str
            Name of the operator (kernel).
        numeric_only : bool
            Value passed by user.

        Raises
        ------
        NotImplementedError
            If the selected object is 1-dimensional with a non-numeric dtype
            and ``numeric_only`` is truthy.
        """
        if (
            self._selected_obj.ndim == 1
            and numeric_only
            and not is_numeric_dtype(self._selected_obj.dtype)
        ):
            raise NotImplementedError(
                f"{type(self).__name__}.{name} does not implement numeric_only"
            )

    def _make_numeric_only(self, obj: NDFrameT) -> NDFrameT:
        """Subset DataFrame to numeric columns.

        Parameters
        ----------
        obj : DataFrame

        Returns
        -------
        obj subset to numeric-only columns (timedelta columns are excluded).
        """
        result = obj.select_dtypes(include=["number"], exclude=["timedelta"])
        return result

    def _create_data(self, obj: NDFrameT, numeric_only: bool = False) -> NDFrameT:
        """
        Split data into blocks & return conformed data.

        Drops the ``on`` column when ``on`` is a column label of a 2-D object,
        and subsets to numeric columns when ``numeric_only`` is set or the
        window runs along axis 1.
        """
        # filter out the on from the object
        if self.on is not None and not isinstance(self.on, Index) and obj.ndim == 2:
            obj = obj.reindex(columns=obj.columns.difference([self.on]), copy=False)
        if obj.ndim > 1 and (numeric_only or self.axis == 1):
            # GH: 20649 in case of mixed dtype and axis=1 we have to convert everything
            # to float to calculate the complete row at once. We exclude all non-numeric
            # dtypes.
            obj = self._make_numeric_only(obj)
            if self.axis == 1:
                obj = obj.astype("float64", copy=False)
                obj._mgr = obj._mgr.consolidate()
        return obj

    def _gotitem(self, key, ndim, subset=None):
        """
        Sub-classes to define. Return a sliced object.

        Parameters
        ----------
        key : str / list of selections
        ndim : {1, 2}
            requested ndim of result
        subset : object, default None
            subset to act on
        """
        # create a new object to prevent aliasing
        if subset is None:
            subset = self.obj

        # we need to make a shallow copy of ourselves
        # with the same groupby
        kwargs = {attr: getattr(self, attr) for attr in self._attributes}

        selection = self._infer_selection(key, subset)
        new_win = type(self)(subset, selection=selection, **kwargs)
        return new_win

    def __getattr__(self, attr: str):
        """
        Resolve unknown attributes as selections on the underlying object.

        Raises
        ------
        AttributeError
            If ``attr`` is neither an internal name nor contained in ``obj``.
        """
        if attr in self._internal_names_set:
            return object.__getattribute__(self, attr)
        if attr in self.obj:
            return self[attr]

        raise AttributeError(
            f"'{type(self).__name__}' object has no attribute '{attr}'"
        )

    def _dir_additions(self):
        """Delegate to the ``_dir_additions`` of the underlying object."""
        return self.obj._dir_additions()

    def __repr__(self) -> str:
        """
        Provide a nice str repr of our rolling object.

        Lists every entry of ``_attributes`` that is not None and whose name
        does not start with an underscore.
        """
        attrs_list = (
            f"{attr_name}={getattr(self, attr_name)}"
            for attr_name in self._attributes
            if getattr(self, attr_name, None) is not None and attr_name[0] != "_"
        )
        attrs = ",".join(attrs_list)
        return f"{type(self).__name__} [{attrs}]"

    def __iter__(self) -> Iterator:
        """Yield the slice of the selected object covered by each window."""
        obj = self._selected_obj.set_axis(self._on)
        obj = self._create_data(obj)
        indexer = self._get_window_indexer()

        start, end = indexer.get_window_bounds(
            num_values=len(obj),
            min_periods=self.min_periods,
            center=self.center,
            closed=self.closed,
            step=self.step,
        )
        self._check_window_bounds(start, end, len(obj))

        for s, e in zip(start, end):
            result = obj.iloc[slice(s, e)]
            yield result

    def _prep_values(self, values: ArrayLike) -> np.ndarray:
        """
        Convert input to numpy arrays for Cython routines

        Raises
        ------
        NotImplementedError
            If ``needs_i8_conversion`` is true for the dtype.
        TypeError
            If the values cannot be converted to float64.
        """
        if needs_i8_conversion(values.dtype):
            raise NotImplementedError(
                f"ops for {type(self).__name__} for this "
                f"dtype {values.dtype} are not implemented"
            )
        # GH #12373 : rolling functions error on float32 data
        # make sure the data is coerced to float64
        try:
            if isinstance(values, ExtensionArray):
                values = values.to_numpy(np.float64, na_value=np.nan)
            else:
                values = ensure_float64(values)
        except (ValueError, TypeError) as err:
            raise TypeError(f"cannot handle this type -> {values.dtype}") from err

        # Convert inf to nan for C funcs
        inf = np.isinf(values)
        if inf.any():
            values = np.where(inf, np.nan, values)

        return values

    def _insert_on_column(self, result: DataFrame, obj: DataFrame) -> None:
        """
        Put the ``on`` column back into ``result`` (modified in place).

        Nothing happens when ``on`` is None or ``_on`` equals ``obj.index``.
        """
        # if we have an 'on' column we want to put it back into
        # the results in the same location
        from pandas import Series

        if self.on is not None and not self._on.equals(obj.index):
            name = self._on.name
            extra_col = Series(self._on, index=self.obj.index, name=name, copy=False)
            if name in result.columns:
                # TODO: sure we want to overwrite results?
                result[name] = extra_col
            elif name in result.index.names:
                pass
            elif name in self._selected_obj.columns:
                # insert in the same location as we had in _selected_obj
                old_cols = self._selected_obj.columns
                new_cols = result.columns
                old_loc = old_cols.get_loc(name)
                # number of result columns that preceded ``name`` originally
                overlap = new_cols.intersection(old_cols[:old_loc])
                new_loc = len(overlap)
                result.insert(new_loc, name, extra_col)
            else:
                # insert at the end
                result[name] = extra_col

    @property
    def _index_array(self) -> npt.NDArray[np.int64] | None:
        """
        Integer view of ``_on`` for datetime-like indexes, else None.
        """
        # TODO: why do we get here with e.g. MultiIndex?
        if isinstance(self._on, (PeriodIndex, DatetimeIndex, TimedeltaIndex)):
            return self._on.asi8
        elif isinstance(self._on.dtype, ArrowDtype) and self._on.dtype.kind in "mM":
            return self._on.to_numpy(dtype=np.int64)
        return None

    def _resolve_output(self, out: DataFrame, obj: DataFrame) -> DataFrame:
        """
        Validate and finalize result.

        Raises
        ------
        DataError
            If ``out`` has no columns although ``obj`` has some.
        """
        if out.shape[1] == 0 and obj.shape[1] > 0:
            raise DataError("No numeric types to aggregate")
        if out.shape[1] == 0:
            return obj.astype("float64")

        self._insert_on_column(out, obj)
        return out

    def _get_window_indexer(self) -> BaseIndexer:
        """
        Return an indexer class that will compute the window start and end bounds
        """
        if isinstance(self.window, BaseIndexer):
            return self.window
        if self._win_freq_i8 is not None:
            return VariableWindowIndexer(
                index_array=self._index_array,
                window_size=self._win_freq_i8,
                center=self.center,
            )
        return FixedWindowIndexer(window_size=self.window)

    def _apply_series(
        self, homogeneous_func: Callable[..., ArrayLike], name: str | None = None
    ) -> Series:
        """
        Series version of _apply_columnwise

        Raises
        ------
        DataError
            If the values cannot be prepared for the kernel.
        """
        obj = self._create_data(self._selected_obj)

        if name == "count":
            # GH 12541: Special case for count where we support date-like types
            obj = notna(obj).astype(int)
        try:
            values = self._prep_values(obj._values)
        except (TypeError, NotImplementedError) as err:
            raise DataError("No numeric types to aggregate") from err

        result = homogeneous_func(values)
        index = self._slice_axis_for_step(obj.index, result)
        return obj._constructor(result, index=index, name=obj.name)

    def _apply_columnwise(
        self,
        homogeneous_func: Callable[..., ArrayLike],
        name: str,
        numeric_only: bool = False,
    ) -> DataFrame | Series:
        """
        Apply the given function to the DataFrame broken down into homogeneous
        sub-frames.

        A 1-dimensional selection is dispatched to ``_apply_series``.

        Raises
        ------
        DataError
            If a column cannot be prepared for the kernel.
        """
        self._validate_numeric_only(name, numeric_only)
        if self._selected_obj.ndim == 1:
            return self._apply_series(homogeneous_func, name)

        obj = self._create_data(self._selected_obj, numeric_only)
        if name == "count":
            # GH 12541: Special case for count where we support date-like types
            obj = notna(obj).astype(int)
            obj._mgr = obj._mgr.consolidate()

        if self.axis == 1:
            obj = obj.T

        taker = []
        res_values = []
        for i, arr in enumerate(obj._iter_column_arrays()):
            # GH#42736 operate column-wise instead of block-wise
            # As of 2.0, hfunc will raise for nuisance columns
            try:
                arr = self._prep_values(arr)
            except (TypeError, NotImplementedError) as err:
                raise DataError(
                    f"Cannot aggregate non-numeric type: {arr.dtype}"
                ) from err
            res = homogeneous_func(arr)
            res_values.append(res)
            taker.append(i)

        index = self._slice_axis_for_step(
            obj.index, res_values[0] if len(res_values) > 0 else None
        )
        df = type(obj)._from_arrays(
            res_values,
            index=index,
            columns=obj.columns.take(taker),
            verify_integrity=False,
        )

        if self.axis == 1:
            df = df.T

        return self._resolve_output(df, obj)

    def _apply_tablewise(
        self,
        homogeneous_func: Callable[..., ArrayLike],
        name: str | None = None,
        numeric_only: bool = False,
    ) -> DataFrame | Series:
        """
        Apply the given function to the DataFrame across the entire object

        Raises
        ------
        ValueError
            If the selected object is 1-dimensional.
        """
        if self._selected_obj.ndim == 1:
            raise ValueError("method='table' not applicable for Series objects.")
        obj = self._create_data(self._selected_obj, numeric_only)
        values = self._prep_values(obj.to_numpy())
        values = values.T if self.axis == 1 else values
        result = homogeneous_func(values)
        result = result.T if self.axis == 1 else result
        index = self._slice_axis_for_step(obj.index, result)
        columns = (
            obj.columns
            if result.shape[1] == len(obj.columns)
            else obj.columns[:: self.step]
        )
        out = obj._constructor(result, index=index, columns=columns)

        return self._resolve_output(out, obj)

    def _apply_pairwise(
        self,
        target: DataFrame | Series,
        other: DataFrame | Series | None,
        pairwise: bool | None,
        func: Callable[[DataFrame | Series, DataFrame | Series], DataFrame | Series],
        numeric_only: bool,
    ) -> DataFrame | Series:
        """
        Apply the given pairwise function given 2 pandas objects (DataFrame/Series)

        When ``other`` is None the target is paired with itself and
        ``pairwise`` defaults to True.

        Raises
        ------
        ValueError
            If ``other`` is given but is not a DataFrame or Series.
        """
        target = self._create_data(target, numeric_only)
        if other is None:
            other = target
            # only default unset
            pairwise = True if pairwise is None else pairwise
        elif not isinstance(other, (ABCDataFrame, ABCSeries)):
            raise ValueError("other must be a DataFrame or Series")
        elif other.ndim == 2 and numeric_only:
            other = self._make_numeric_only(other)

        return flex_binary_moment(target, other, func, pairwise=bool(pairwise))

    def _apply(
        self,
        func: Callable[..., Any],
        name: str,
        numeric_only: bool = False,
        numba_args: tuple[Any, ...] = (),
        **kwargs,
    ):
        """
        Rolling statistical measure using supplied function.

        Designed to be used with passed-in Cython array-based functions.

        Parameters
        ----------
        func : callable function to apply
            Called as ``func(values, start, end, min_periods, *numba_args)``.
        name : str,
        numeric_only : bool, default False
        numba_args : tuple
            args to be passed when func is a numba func
        **kwargs
            additional arguments for rolling function and window function

        Returns
        -------
        y : type of input
        """
        window_indexer = self._get_window_indexer()
        # fall back to the indexer's window size when min_periods is unset
        min_periods = (
            self.min_periods
            if self.min_periods is not None
            else window_indexer.window_size
        )

        def homogeneous_func(values: np.ndarray):
            # calculation function

            if values.size == 0:
                return values.copy()

            def calc(x):
                start, end = window_indexer.get_window_bounds(
                    num_values=len(x),
                    min_periods=min_periods,
                    center=self.center,
                    closed=self.closed,
                    step=self.step,
                )
                self._check_window_bounds(start, end, len(x))

                return func(x, start, end, min_periods, *numba_args)

            with np.errstate(all="ignore"):
                result = calc(values)

            return result

        if self.method == "single":
            return self._apply_columnwise(homogeneous_func, name, numeric_only)
        else:
            return self._apply_tablewise(homogeneous_func, name, numeric_only)

    def _numba_apply(
        self,
        func: Callable[..., Any],
        engine_kwargs: dict[str, bool] | None = None,
        **func_kwargs,
    ):
        """
        Apply ``func`` over the windows through the shared numba aggregator.

        Parameters
        ----------
        func : callable
            Kernel handed to ``executor.generate_shared_aggregator``.
        engine_kwargs : dict, optional
            Forwarded to ``get_jit_arguments``.
        **func_kwargs
            Forwarded to the generated aggregator.

        Returns
        -------
        Series or DataFrame
        """
        window_indexer = self._get_window_indexer()
        min_periods = (
            self.min_periods
            if self.min_periods is not None
            else window_indexer.window_size
        )
        obj = self._create_data(self._selected_obj)
        if self.axis == 1:
            obj = obj.T
        values = self._prep_values(obj.to_numpy())
        if values.ndim == 1:
            # the aggregator is always fed a 2-D array
            values = values.reshape(-1, 1)
        start, end = window_indexer.get_window_bounds(
            num_values=len(values),
            min_periods=min_periods,
            center=self.center,
            closed=self.closed,
            step=self.step,
        )
        self._check_window_bounds(start, end, len(values))
        # For now, map everything to float to match the Cython impl
        # even though it is wrong
        # TODO: Could preserve correct dtypes in future
        # xref #53214
        dtype_mapping = executor.float_dtype_mapping
        aggregator = executor.generate_shared_aggregator(
            func,
            dtype_mapping,
            is_grouped_kernel=False,
            **get_jit_arguments(engine_kwargs),
        )
        result = aggregator(
            values.T, start=start, end=end, min_periods=min_periods, **func_kwargs
        ).T
        result = result.T if self.axis == 1 else result
        index = self._slice_axis_for_step(obj.index, result)
        if obj.ndim == 1:
            result = result.squeeze()
            out = obj._constructor(result, index=index, name=obj.name)
            return out
        else:
            columns = self._slice_axis_for_step(obj.columns, result.T)
            out = obj._constructor(result, index=index, columns=columns)
            return self._resolve_output(out, obj)

    def aggregate(self, func, *args, **kwargs):
        """
        Aggregate using ``func``.

        Tries ``ResamplerWindowApply(...).agg()`` first and falls back to
        ``self.apply(func, raw=False, ...)`` when that returns None.
        """
        result = ResamplerWindowApply(self, func, args=args, kwargs=kwargs).agg()
        if result is None:
            return self.apply(func, raw=False, args=args, kwargs=kwargs)
        return result

    agg = aggregate
681
682
class BaseWindowGroupby(BaseWindow):
    """
    Provide the groupby windowing facilities.
    """

    # Grouper describing the groups the windows are computed within.
    _grouper: BaseGrouper
    # When False, ``_apply`` moves the group-key levels back into columns.
    _as_index: bool
    _attributes: list[str] = ["_grouper"]

    def __init__(
        self,
        obj: DataFrame | Series,
        *args,
        _grouper: BaseGrouper,
        _as_index: bool = True,
        **kwargs,
    ) -> None:
        """
        Parameters
        ----------
        obj : DataFrame or Series
            Object to window over; columns named like the grouping keys are
            dropped before the base class is initialised.
        *args
            Forwarded to ``BaseWindow.__init__``.
        _grouper : BaseGrouper
        _as_index : bool, default True
        **kwargs
            Forwarded to ``BaseWindow.__init__``.

        Raises
        ------
        ValueError
            If ``_grouper`` is not a ``BaseGrouper``.
        NotImplementedError
            If a ``step`` other than None is passed by keyword.
        """
        from pandas.core.groupby.ops import BaseGrouper

        if not isinstance(_grouper, BaseGrouper):
            raise ValueError("Must pass a BaseGrouper object.")
        self._grouper = _grouper
        self._as_index = _as_index
        # GH 32262: It's convention to keep the grouping column in
        # groupby.<agg_func>, but unexpected to users in
        # groupby.rolling.<agg_func>
        obj = obj.drop(columns=self._grouper.names, errors="ignore")
        # GH 15354
        if kwargs.get("step") is not None:
            raise NotImplementedError("step not implemented for groupby")
        super().__init__(obj, *args, **kwargs)

    def _apply(
        self,
        func: Callable[..., Any],
        name: str,
        numeric_only: bool = False,
        numba_args: tuple[Any, ...] = (),
        **kwargs,
    ) -> DataFrame | Series:
        """
        Run the base ``_apply`` and relabel the result with a MultiIndex made
        of the group-key levels followed by the levels of the original index.
        """
        result = super()._apply(
            func,
            name,
            numeric_only,
            numba_args,
            **kwargs,
        )
        # Reconstruct the resulting MultiIndex
        # 1st set of levels = group by labels
        # 2nd set of levels = original DataFrame/Series index
        grouped_object_index = self.obj.index
        grouped_index_name = [*grouped_object_index.names]
        groupby_keys = copy.copy(self._grouper.names)
        result_index_names = groupby_keys + grouped_index_name

        drop_columns = [
            key
            for key in self._grouper.names
            if key not in self.obj.index.names or key is None
        ]

        if len(drop_columns) != len(groupby_keys):
            # Our result will have still kept the column in the result
            result = result.drop(columns=drop_columns, errors="ignore")

        codes = self._grouper.codes
        levels = copy.copy(self._grouper.levels)

        group_indices = self._grouper.indices.values()
        if group_indices:
            indexer = np.concatenate(list(group_indices))
        else:
            # np.concatenate rejects an empty sequence
            indexer = np.array([], dtype=np.intp)
        codes = [c.take(indexer) for c in codes]

        # if the index of the original dataframe needs to be preserved, append
        # this index (but reordered) to the codes/levels from the groupby
        if grouped_object_index is not None:
            idx = grouped_object_index.take(indexer)
            if not isinstance(idx, MultiIndex):
                idx = MultiIndex.from_arrays([idx])
            codes.extend(list(idx.codes))
            levels.extend(list(idx.levels))

        result_index = MultiIndex(
            levels, codes, names=result_index_names, verify_integrity=False
        )

        result.index = result_index
        if not self._as_index:
            result = result.reset_index(level=list(range(len(groupby_keys))))
        return result

    def _apply_pairwise(
        self,
        target: DataFrame | Series,
        other: DataFrame | Series | None,
        pairwise: bool | None,
        func: Callable[[DataFrame | Series, DataFrame | Series], DataFrame | Series],
        numeric_only: bool,
    ) -> DataFrame | Series:
        """
        Apply the given pairwise function given 2 pandas objects (DataFrame/Series)

        The result of the base implementation is relabelled with a MultiIndex
        whose leading levels are the group keys.
        """
        # Manually drop the grouping column first
        target = target.drop(columns=self._grouper.names, errors="ignore")
        result = super()._apply_pairwise(target, other, pairwise, func, numeric_only)
        # 1) Determine the levels + codes of the groupby levels
        if other is not None and not all(
            len(group) == len(other) for group in self._grouper.indices.values()
        ):
            # GH 42915
            # len(other) != len(any group), so must reindex (expand) the result
            # from flex_binary_moment to a "transform"-like result
            # per groupby combination
            old_result_len = len(result)
            result = concat(
                [
                    result.take(gb_indices).reindex(result.index)
                    for gb_indices in self._grouper.indices.values()
                ]
            )

            gb_pairs = (
                com.maybe_make_list(pair) for pair in self._grouper.indices.keys()
            )
            groupby_codes = []
            groupby_levels = []
            # e.g. [[1, 2], [4, 5]] as [[1, 4], [2, 5]]
            for gb_level_pair in map(list, zip(*gb_pairs)):
                labels = np.repeat(np.array(gb_level_pair), old_result_len)
                codes, levels = factorize(labels)
                groupby_codes.append(codes)
                groupby_levels.append(levels)
        else:
            # pairwise=True or len(other) == len(each group), so repeat
            # the groupby labels by the number of columns in the original object
            groupby_codes = self._grouper.codes
            # error: Incompatible types in assignment (expression has type
            # "List[Index]", variable has type "List[Union[ndarray, Index]]")
            groupby_levels = self._grouper.levels  # type: ignore[assignment]

            group_indices = self._grouper.indices.values()
            if group_indices:
                indexer = np.concatenate(list(group_indices))
            else:
                indexer = np.array([], dtype=np.intp)

            if target.ndim == 1:
                repeat_by = 1
            else:
                repeat_by = len(target.columns)
            groupby_codes = [
                np.repeat(c.take(indexer), repeat_by) for c in groupby_codes
            ]
        # 2) Determine the levels + codes of the result from super()._apply_pairwise
        if isinstance(result.index, MultiIndex):
            result_codes = list(result.index.codes)
            result_levels = list(result.index.levels)
            result_names = list(result.index.names)
        else:
            idx_codes, idx_levels = factorize(result.index)
            result_codes = [idx_codes]
            result_levels = [idx_levels]
            result_names = [result.index.name]

        # 3) Create the resulting index by combining 1) + 2)
        result_codes = groupby_codes + result_codes
        result_levels = groupby_levels + result_levels
        result_names = self._grouper.names + result_names

        result_index = MultiIndex(
            result_levels, result_codes, names=result_names, verify_integrity=False
        )
        result.index = result_index
        return result

    def _create_data(self, obj: NDFrameT, numeric_only: bool = False) -> NDFrameT:
        """
        Split data into blocks & return conformed data.

        A non-empty ``obj`` is first reordered so that the rows of each group
        are contiguous, in the order of ``self._grouper.indices``. If the
        grouper holds no groups at all, the reordered object is empty.
        """
        # Ensure the object we're rolling over is monotonically sorted relative
        # to the groups
        # GH 36197
        if not obj.empty:
            group_indices = self._grouper.indices.values()
            if group_indices:
                groupby_order = np.concatenate(list(group_indices)).astype(np.int64)
            else:
                # No group owns any row; np.concatenate would raise on an
                # empty sequence, so take nothing instead.
                groupby_order = np.array([], dtype=np.int64)
            obj = obj.take(groupby_order)
        return super()._create_data(obj, numeric_only)

    def _gotitem(self, key, ndim, subset=None):
        """
        Return a sliced window object.

        When ``on`` is set, the subset handed to the base implementation is
        ``self.obj`` re-indexed by ``self._on`` rather than the passed
        ``subset``.
        """
        # we are setting the index on the actual object
        # here so our index is carried through to the selected obj
        # when we do the splitting for the groupby
        if self.on is not None:
            # GH 43355
            subset = self.obj.set_index(self._on)
        return super()._gotitem(key, ndim, subset=subset)
882
883
884class Window(BaseWindow):
885 """
886 Provide rolling window calculations.
887
888 Parameters
889 ----------
890 window : int, timedelta, str, offset, or BaseIndexer subclass
891 Size of the moving window.
892
893 If an integer, the fixed number of observations used for
894 each window.
895
896 If a timedelta, str, or offset, the time period of each window. Each
897 window will be a variable sized based on the observations included in
898 the time-period. This is only valid for datetimelike indexes.
899 To learn more about the offsets & frequency strings, please see `this link
900 <https://pandas.pydata.org/pandas-docs/stable/user_guide/timeseries.html#offset-aliases>`__.
901
902 If a BaseIndexer subclass, the window boundaries
903 based on the defined ``get_window_bounds`` method. Additional rolling
904 keyword arguments, namely ``min_periods``, ``center``, ``closed`` and
905 ``step`` will be passed to ``get_window_bounds``.
906
907 min_periods : int, default None
908 Minimum number of observations in window required to have a value;
909 otherwise, result is ``np.nan``.
910
911 For a window that is specified by an offset, ``min_periods`` will default to 1.
912
913 For a window that is specified by an integer, ``min_periods`` will default
914 to the size of the window.
915
916 center : bool, default False
917 If False, set the window labels as the right edge of the window index.
918
919 If True, set the window labels as the center of the window index.
920
921 win_type : str, default None
922 If ``None``, all points are evenly weighted.
923
924 If a string, it must be a valid `scipy.signal window function
925 <https://docs.scipy.org/doc/scipy/reference/signal.windows.html#module-scipy.signal.windows>`__.
926
927 Certain Scipy window types require additional parameters to be passed
928 in the aggregation function. The additional parameters must match
929 the keywords specified in the Scipy window type method signature.
930
931 on : str, optional
932 For a DataFrame, a column label or Index level on which
933 to calculate the rolling window, rather than the DataFrame's index.
934
935 Provided integer column is ignored and excluded from result since
936 an integer index is not used to calculate the rolling window.
937
938 axis : int or str, default 0
939 If ``0`` or ``'index'``, roll across the rows.
940
941 If ``1`` or ``'columns'``, roll across the columns.
942
943 For `Series` this parameter is unused and defaults to 0.
944
945 .. deprecated:: 2.1.0
946
947 The axis keyword is deprecated. For ``axis=1``,
948 transpose the DataFrame first instead.
949
950 closed : str, default None
951 If ``'right'``, the first point in the window is excluded from calculations.
952
953 If ``'left'``, the last point in the window is excluded from calculations.
954
955 If ``'both'``, the no points in the window are excluded from calculations.
956
957 If ``'neither'``, the first and last points in the window are excluded
958 from calculations.
959
960 Default ``None`` (``'right'``).
961
962 step : int, default None
963
964 .. versionadded:: 1.5.0
965
966 Evaluate the window at every ``step`` result, equivalent to slicing as
967 ``[::step]``. ``window`` must be an integer. Using a step argument other
968 than None or 1 will produce a result with a different shape than the input.
969
970 method : str {'single', 'table'}, default 'single'
971
972 .. versionadded:: 1.3.0
973
974 Execute the rolling operation per single column or row (``'single'``)
975 or over the entire object (``'table'``).
976
977 This argument is only implemented when specifying ``engine='numba'``
978 in the method call.
979
980 Returns
981 -------
982 pandas.api.typing.Window or pandas.api.typing.Rolling
983 An instance of Window is returned if ``win_type`` is passed. Otherwise,
984 an instance of Rolling is returned.
985
986 See Also
987 --------
988 expanding : Provides expanding transformations.
989 ewm : Provides exponential weighted functions.
990
991 Notes
992 -----
993 See :ref:`Windowing Operations <window.generic>` for further usage details
994 and examples.
995
996 Examples
997 --------
998 >>> df = pd.DataFrame({'B': [0, 1, 2, np.nan, 4]})
999 >>> df
1000 B
1001 0 0.0
1002 1 1.0
1003 2 2.0
1004 3 NaN
1005 4 4.0
1006
1007 **window**
1008
1009 Rolling sum with a window length of 2 observations.
1010
1011 >>> df.rolling(2).sum()
1012 B
1013 0 NaN
1014 1 1.0
1015 2 3.0
1016 3 NaN
1017 4 NaN
1018
1019 Rolling sum with a window span of 2 seconds.
1020
1021 >>> df_time = pd.DataFrame({'B': [0, 1, 2, np.nan, 4]},
1022 ... index=[pd.Timestamp('20130101 09:00:00'),
1023 ... pd.Timestamp('20130101 09:00:02'),
1024 ... pd.Timestamp('20130101 09:00:03'),
1025 ... pd.Timestamp('20130101 09:00:05'),
1026 ... pd.Timestamp('20130101 09:00:06')])
1027
1028 >>> df_time
1029 B
1030 2013-01-01 09:00:00 0.0
1031 2013-01-01 09:00:02 1.0
1032 2013-01-01 09:00:03 2.0
1033 2013-01-01 09:00:05 NaN
1034 2013-01-01 09:00:06 4.0
1035
1036 >>> df_time.rolling('2s').sum()
1037 B
1038 2013-01-01 09:00:00 0.0
1039 2013-01-01 09:00:02 1.0
1040 2013-01-01 09:00:03 3.0
1041 2013-01-01 09:00:05 NaN
1042 2013-01-01 09:00:06 4.0
1043
1044 Rolling sum with forward looking windows with 2 observations.
1045
1046 >>> indexer = pd.api.indexers.FixedForwardWindowIndexer(window_size=2)
1047 >>> df.rolling(window=indexer, min_periods=1).sum()
1048 B
1049 0 1.0
1050 1 3.0
1051 2 2.0
1052 3 4.0
1053 4 4.0
1054
1055 **min_periods**
1056
1057 Rolling sum with a window length of 2 observations, but only needs a minimum of 1
1058 observation to calculate a value.
1059
1060 >>> df.rolling(2, min_periods=1).sum()
1061 B
1062 0 0.0
1063 1 1.0
1064 2 3.0
1065 3 2.0
1066 4 4.0
1067
1068 **center**
1069
1070 Rolling sum with the result assigned to the center of the window index.
1071
1072 >>> df.rolling(3, min_periods=1, center=True).sum()
1073 B
1074 0 1.0
1075 1 3.0
1076 2 3.0
1077 3 6.0
1078 4 4.0
1079
1080 >>> df.rolling(3, min_periods=1, center=False).sum()
1081 B
1082 0 0.0
1083 1 1.0
1084 2 3.0
1085 3 3.0
1086 4 6.0
1087
1088 **step**
1089
1090 Rolling sum with a window length of 2 observations, minimum of 1 observation to
1091 calculate a value, and a step of 2.
1092
1093 >>> df.rolling(2, min_periods=1, step=2).sum()
1094 B
1095 0 0.0
1096 2 3.0
1097 4 4.0
1098
1099 **win_type**
1100
1101 Rolling sum with a window length of 2, using the Scipy ``'gaussian'``
1102 window type. ``std`` is required in the aggregation function.
1103
1104 >>> df.rolling(2, win_type='gaussian').sum(std=3)
1105 B
1106 0 NaN
1107 1 0.986207
1108 2 2.958621
1109 3 NaN
1110 4 NaN
1111
1112 **on**
1113
1114 Rolling sum with a window length of 2 days.
1115
1116 >>> df = pd.DataFrame({
1117 ... 'A': [pd.to_datetime('2020-01-01'),
1118 ... pd.to_datetime('2020-01-01'),
1119 ... pd.to_datetime('2020-01-02'),],
1120 ... 'B': [1, 2, 3], },
1121 ... index=pd.date_range('2020', periods=3))
1122
1123 >>> df
1124 A B
1125 2020-01-01 2020-01-01 1
1126 2020-01-02 2020-01-01 2
1127 2020-01-03 2020-01-02 3
1128
1129 >>> df.rolling('2D', on='A').sum()
1130 A B
1131 2020-01-01 2020-01-01 1.0
1132 2020-01-02 2020-01-01 3.0
1133 2020-01-03 2020-01-02 6.0
1134 """
1135
    # Names of the settings carried by this window type; ``Rolling`` further down
    # in this module declares the same list.
    # NOTE(review): presumably consumed by the base class (e.g. for repr or when
    # re-creating the object) -- confirm against BaseWindow.
    _attributes = [
        "window",
        "min_periods",
        "center",
        "win_type",
        "axis",
        "on",
        "closed",
        "step",
        "method",
    ]
1147
1148 def _validate(self):
1149 super()._validate()
1150
1151 if not isinstance(self.win_type, str):
1152 raise ValueError(f"Invalid win_type {self.win_type}")
1153 signal = import_optional_dependency(
1154 "scipy.signal.windows", extra="Scipy is required to generate window weight."
1155 )
1156 self._scipy_weight_generator = getattr(signal, self.win_type, None)
1157 if self._scipy_weight_generator is None:
1158 raise ValueError(f"Invalid win_type {self.win_type}")
1159
1160 if isinstance(self.window, BaseIndexer):
1161 raise NotImplementedError(
1162 "BaseIndexer subclasses not implemented with win_types."
1163 )
1164 if not is_integer(self.window) or self.window < 0:
1165 raise ValueError("window must be an integer 0 or greater")
1166
1167 if self.method != "single":
1168 raise NotImplementedError("'single' is the only supported method type.")
1169
1170 def _center_window(self, result: np.ndarray, offset: int) -> np.ndarray:
1171 """
1172 Center the result in the window for weighted rolling aggregations.
1173 """
1174 if offset > 0:
1175 lead_indexer = [slice(offset, None)]
1176 result = np.copy(result[tuple(lead_indexer)])
1177 return result
1178
1179 def _apply(
1180 self,
1181 func: Callable[[np.ndarray, int, int], np.ndarray],
1182 name: str,
1183 numeric_only: bool = False,
1184 numba_args: tuple[Any, ...] = (),
1185 **kwargs,
1186 ):
1187 """
1188 Rolling with weights statistical measure using supplied function.
1189
1190 Designed to be used with passed-in Cython array-based functions.
1191
1192 Parameters
1193 ----------
1194 func : callable function to apply
1195 name : str,
1196 numeric_only : bool, default False
1197 Whether to only operate on bool, int, and float columns
1198 numba_args : tuple
1199 unused
1200 **kwargs
1201 additional arguments for scipy windows if necessary
1202
1203 Returns
1204 -------
1205 y : type of input
1206 """
1207 # "None" not callable [misc]
1208 window = self._scipy_weight_generator( # type: ignore[misc]
1209 self.window, **kwargs
1210 )
1211 offset = (len(window) - 1) // 2 if self.center else 0
1212
1213 def homogeneous_func(values: np.ndarray):
1214 # calculation function
1215
1216 if values.size == 0:
1217 return values.copy()
1218
1219 def calc(x):
1220 additional_nans = np.array([np.nan] * offset)
1221 x = np.concatenate((x, additional_nans))
1222 return func(
1223 x,
1224 window,
1225 self.min_periods if self.min_periods is not None else len(window),
1226 )
1227
1228 with np.errstate(all="ignore"):
1229 # Our weighted aggregations return memoryviews
1230 result = np.asarray(calc(values))
1231
1232 if self.center:
1233 result = self._center_window(result, offset)
1234
1235 return result
1236
1237 return self._apply_columnwise(homogeneous_func, name, numeric_only)[
1238 :: self.step
1239 ]
1240
1241 @doc(
1242 _shared_docs["aggregate"],
1243 see_also=dedent(
1244 """
1245 See Also
1246 --------
1247 pandas.DataFrame.aggregate : Similar DataFrame method.
1248 pandas.Series.aggregate : Similar Series method.
1249 """
1250 ),
1251 examples=dedent(
1252 """
1253 Examples
1254 --------
1255 >>> df = pd.DataFrame({"A": [1, 2, 3], "B": [4, 5, 6], "C": [7, 8, 9]})
1256 >>> df
1257 A B C
1258 0 1 4 7
1259 1 2 5 8
1260 2 3 6 9
1261
1262 >>> df.rolling(2, win_type="boxcar").agg("mean")
1263 A B C
1264 0 NaN NaN NaN
1265 1 1.5 4.5 7.5
1266 2 2.5 5.5 8.5
1267 """
1268 ),
1269 klass="Series/DataFrame",
1270 axis="",
1271 )
1272 def aggregate(self, func, *args, **kwargs):
1273 result = ResamplerWindowApply(self, func, args=args, kwargs=kwargs).agg()
1274 if result is None:
1275 # these must apply directly
1276 result = func(self)
1277
1278 return result
1279
1280 agg = aggregate
1281
1282 @doc(
1283 template_header,
1284 create_section_header("Parameters"),
1285 kwargs_numeric_only,
1286 kwargs_scipy,
1287 create_section_header("Returns"),
1288 template_returns,
1289 create_section_header("See Also"),
1290 template_see_also,
1291 create_section_header("Examples"),
1292 dedent(
1293 """\
1294 >>> ser = pd.Series([0, 1, 5, 2, 8])
1295
1296 To get an instance of :class:`~pandas.core.window.rolling.Window` we need
1297 to pass the parameter `win_type`.
1298
1299 >>> type(ser.rolling(2, win_type='gaussian'))
1300 <class 'pandas.core.window.rolling.Window'>
1301
1302 In order to use the `SciPy` Gaussian window we need to provide the parameters
1303 `M` and `std`. The parameter `M` corresponds to 2 in our example.
1304 We pass the second parameter `std` as a parameter of the following method
1305 (`sum` in this case):
1306
1307 >>> ser.rolling(2, win_type='gaussian').sum(std=3)
1308 0 NaN
1309 1 0.986207
1310 2 5.917243
1311 3 6.903450
1312 4 9.862071
1313 dtype: float64
1314 """
1315 ),
1316 window_method="rolling",
1317 aggregation_description="weighted window sum",
1318 agg_method="sum",
1319 )
1320 def sum(self, numeric_only: bool = False, **kwargs):
1321 window_func = window_aggregations.roll_weighted_sum
1322 # error: Argument 1 to "_apply" of "Window" has incompatible type
1323 # "Callable[[ndarray, ndarray, int], ndarray]"; expected
1324 # "Callable[[ndarray, int, int], ndarray]"
1325 return self._apply(
1326 window_func, # type: ignore[arg-type]
1327 name="sum",
1328 numeric_only=numeric_only,
1329 **kwargs,
1330 )
1331
1332 @doc(
1333 template_header,
1334 create_section_header("Parameters"),
1335 kwargs_numeric_only,
1336 kwargs_scipy,
1337 create_section_header("Returns"),
1338 template_returns,
1339 create_section_header("See Also"),
1340 template_see_also,
1341 create_section_header("Examples"),
1342 dedent(
1343 """\
1344 >>> ser = pd.Series([0, 1, 5, 2, 8])
1345
1346 To get an instance of :class:`~pandas.core.window.rolling.Window` we need
1347 to pass the parameter `win_type`.
1348
1349 >>> type(ser.rolling(2, win_type='gaussian'))
1350 <class 'pandas.core.window.rolling.Window'>
1351
1352 In order to use the `SciPy` Gaussian window we need to provide the parameters
1353 `M` and `std`. The parameter `M` corresponds to 2 in our example.
1354 We pass the second parameter `std` as a parameter of the following method:
1355
1356 >>> ser.rolling(2, win_type='gaussian').mean(std=3)
1357 0 NaN
1358 1 0.5
1359 2 3.0
1360 3 3.5
1361 4 5.0
1362 dtype: float64
1363 """
1364 ),
1365 window_method="rolling",
1366 aggregation_description="weighted window mean",
1367 agg_method="mean",
1368 )
1369 def mean(self, numeric_only: bool = False, **kwargs):
1370 window_func = window_aggregations.roll_weighted_mean
1371 # error: Argument 1 to "_apply" of "Window" has incompatible type
1372 # "Callable[[ndarray, ndarray, int], ndarray]"; expected
1373 # "Callable[[ndarray, int, int], ndarray]"
1374 return self._apply(
1375 window_func, # type: ignore[arg-type]
1376 name="mean",
1377 numeric_only=numeric_only,
1378 **kwargs,
1379 )
1380
1381 @doc(
1382 template_header,
1383 create_section_header("Parameters"),
1384 kwargs_numeric_only,
1385 kwargs_scipy,
1386 create_section_header("Returns"),
1387 template_returns,
1388 create_section_header("See Also"),
1389 template_see_also,
1390 create_section_header("Examples"),
1391 dedent(
1392 """\
1393 >>> ser = pd.Series([0, 1, 5, 2, 8])
1394
1395 To get an instance of :class:`~pandas.core.window.rolling.Window` we need
1396 to pass the parameter `win_type`.
1397
1398 >>> type(ser.rolling(2, win_type='gaussian'))
1399 <class 'pandas.core.window.rolling.Window'>
1400
1401 In order to use the `SciPy` Gaussian window we need to provide the parameters
1402 `M` and `std`. The parameter `M` corresponds to 2 in our example.
1403 We pass the second parameter `std` as a parameter of the following method:
1404
1405 >>> ser.rolling(2, win_type='gaussian').var(std=3)
1406 0 NaN
1407 1 0.5
1408 2 8.0
1409 3 4.5
1410 4 18.0
1411 dtype: float64
1412 """
1413 ),
1414 window_method="rolling",
1415 aggregation_description="weighted window variance",
1416 agg_method="var",
1417 )
1418 def var(self, ddof: int = 1, numeric_only: bool = False, **kwargs):
1419 window_func = partial(window_aggregations.roll_weighted_var, ddof=ddof)
1420 kwargs.pop("name", None)
1421 return self._apply(window_func, name="var", numeric_only=numeric_only, **kwargs)
1422
1423 @doc(
1424 template_header,
1425 create_section_header("Parameters"),
1426 kwargs_numeric_only,
1427 kwargs_scipy,
1428 create_section_header("Returns"),
1429 template_returns,
1430 create_section_header("See Also"),
1431 template_see_also,
1432 create_section_header("Examples"),
1433 dedent(
1434 """\
1435 >>> ser = pd.Series([0, 1, 5, 2, 8])
1436
1437 To get an instance of :class:`~pandas.core.window.rolling.Window` we need
1438 to pass the parameter `win_type`.
1439
1440 >>> type(ser.rolling(2, win_type='gaussian'))
1441 <class 'pandas.core.window.rolling.Window'>
1442
1443 In order to use the `SciPy` Gaussian window we need to provide the parameters
1444 `M` and `std`. The parameter `M` corresponds to 2 in our example.
1445 We pass the second parameter `std` as a parameter of the following method:
1446
1447 >>> ser.rolling(2, win_type='gaussian').std(std=3)
1448 0 NaN
1449 1 0.707107
1450 2 2.828427
1451 3 2.121320
1452 4 4.242641
1453 dtype: float64
1454 """
1455 ),
1456 window_method="rolling",
1457 aggregation_description="weighted window standard deviation",
1458 agg_method="std",
1459 )
1460 def std(self, ddof: int = 1, numeric_only: bool = False, **kwargs):
1461 return zsqrt(
1462 self.var(ddof=ddof, name="std", numeric_only=numeric_only, **kwargs)
1463 )
1464
1465
class RollingAndExpandingMixin(BaseWindow):
    """
    Aggregation methods shared by the window classes deriving from this mixin.

    Every public method picks a kernel (a routine from ``window_aggregations``
    or a numba kernel) and passes it to ``self._apply``, ``self._numba_apply``
    or ``self._apply_pairwise``.
    """

    def count(self, numeric_only: bool = False):
        """Dispatch the ``roll_sum`` kernel under the name ``"count"``."""
        # NOTE(review): a sum only yields a count if ``_apply`` turns the data
        # into non-null indicators for name="count" -- confirm in BaseWindow.
        return self._apply(
            window_aggregations.roll_sum, name="count", numeric_only=numeric_only
        )

    def apply(
        self,
        func: Callable[..., Any],
        raw: bool = False,
        engine: Literal["cython", "numba"] | None = None,
        engine_kwargs: dict[str, bool] | None = None,
        args: tuple[Any, ...] | None = None,
        kwargs: dict[str, Any] | None = None,
    ):
        """
        Apply a user supplied function to every window.

        Parameters
        ----------
        func : callable
            Function evaluated per window.
        raw : bool, default False
            Must be a boolean, and ``True`` when the numba engine is used.
        engine : {'cython', 'numba'} or None
            Execution engine.
        engine_kwargs : dict or None
            Only accepted together with the numba engine.
        args : tuple or None
            Positional arguments for ``func``; ``None`` means no arguments.
        kwargs : dict or None
            Keyword arguments for ``func``; ``None`` means no arguments.

        Raises
        ------
        ValueError
            If ``raw`` is not a boolean, ``raw`` is ``False`` with the numba
            engine, ``engine_kwargs`` is given with the cython engine, or
            ``engine`` is not recognised.
        """
        args = () if args is None else args
        kwargs = {} if kwargs is None else kwargs

        if not is_bool(raw):
            raise ValueError("raw parameter must be `True` or `False`")

        if maybe_use_numba(engine):
            if raw is False:
                raise ValueError("raw must be `True` when using the numba engine")
            # Only the numba path hands ``args`` to ``_apply``.
            numba_args: tuple[Any, ...] = args
            make_numba_func = (
                generate_numba_apply_func
                if self.method == "single"
                else generate_numba_table_func
            )
            apply_func = make_numba_func(
                func, **get_jit_arguments(engine_kwargs, kwargs)
            )
        elif engine in ("cython", None):
            if engine_kwargs is not None:
                raise ValueError("cython engine does not accept engine_kwargs")
            # The cython wrapper already binds ``args``/``kwargs`` itself.
            numba_args = ()
            apply_func = self._generate_cython_apply_func(args, kwargs, raw, func)
        else:
            raise ValueError("engine must be either 'numba' or 'cython'")

        return self._apply(apply_func, name="apply", numba_args=numba_args)

    def _generate_cython_apply_func(
        self,
        args: tuple[Any, ...],
        kwargs: dict[str, Any],
        raw: bool | np.bool_,
        function: Callable[..., Any],
    ) -> Callable[[np.ndarray, np.ndarray, np.ndarray, int], np.ndarray]:
        """
        Build the callable that runs ``function`` through ``roll_apply``.

        The returned function takes ``(values, begin, end, min_periods)``; when
        ``raw`` is falsy it wraps ``values`` in a Series indexed by ``self._on``
        before calling ``roll_apply``.
        """
        from pandas import Series

        bound_roll_apply = partial(
            window_aggregations.roll_apply,
            args=args,
            kwargs=kwargs,
            raw=raw,
            function=function,
        )

        def apply_func(values, begin, end, min_periods, raw=raw):
            if raw:
                return bound_roll_apply(values, begin, end, min_periods)
            # GH 45912
            as_series = Series(values, index=self._on, copy=False)
            return bound_roll_apply(as_series, begin, end, min_periods)

        return apply_func

    def sum(
        self,
        numeric_only: bool = False,
        engine: Literal["cython", "numba"] | None = None,
        engine_kwargs: dict[str, bool] | None = None,
    ):
        """Windowed sum via cython ``roll_sum`` or the numba kernels."""
        if not maybe_use_numba(engine):
            return self._apply(
                window_aggregations.roll_sum, name="sum", numeric_only=numeric_only
            )
        if self.method == "table":
            table_func = generate_manual_numpy_nan_agg_with_axis(np.nansum)
            return self.apply(
                table_func, raw=True, engine=engine, engine_kwargs=engine_kwargs
            )

        from pandas.core._numba.kernels import sliding_sum

        return self._numba_apply(sliding_sum, engine_kwargs)

    def max(
        self,
        numeric_only: bool = False,
        engine: Literal["cython", "numba"] | None = None,
        engine_kwargs: dict[str, bool] | None = None,
    ):
        """Windowed maximum via cython ``roll_max`` or the numba kernels."""
        if not maybe_use_numba(engine):
            return self._apply(
                window_aggregations.roll_max, name="max", numeric_only=numeric_only
            )
        if self.method == "table":
            table_func = generate_manual_numpy_nan_agg_with_axis(np.nanmax)
            return self.apply(
                table_func, raw=True, engine=engine, engine_kwargs=engine_kwargs
            )

        from pandas.core._numba.kernels import sliding_min_max

        return self._numba_apply(sliding_min_max, engine_kwargs, is_max=True)

    def min(
        self,
        numeric_only: bool = False,
        engine: Literal["cython", "numba"] | None = None,
        engine_kwargs: dict[str, bool] | None = None,
    ):
        """Windowed minimum via cython ``roll_min`` or the numba kernels."""
        if not maybe_use_numba(engine):
            return self._apply(
                window_aggregations.roll_min, name="min", numeric_only=numeric_only
            )
        if self.method == "table":
            table_func = generate_manual_numpy_nan_agg_with_axis(np.nanmin)
            return self.apply(
                table_func, raw=True, engine=engine, engine_kwargs=engine_kwargs
            )

        from pandas.core._numba.kernels import sliding_min_max

        return self._numba_apply(sliding_min_max, engine_kwargs, is_max=False)

    def mean(
        self,
        numeric_only: bool = False,
        engine: Literal["cython", "numba"] | None = None,
        engine_kwargs: dict[str, bool] | None = None,
    ):
        """Windowed mean via cython ``roll_mean`` or the numba kernels."""
        if not maybe_use_numba(engine):
            return self._apply(
                window_aggregations.roll_mean, name="mean", numeric_only=numeric_only
            )
        if self.method == "table":
            table_func = generate_manual_numpy_nan_agg_with_axis(np.nanmean)
            return self.apply(
                table_func, raw=True, engine=engine, engine_kwargs=engine_kwargs
            )

        from pandas.core._numba.kernels import sliding_mean

        return self._numba_apply(sliding_mean, engine_kwargs)

    def median(
        self,
        numeric_only: bool = False,
        engine: Literal["cython", "numba"] | None = None,
        engine_kwargs: dict[str, bool] | None = None,
    ):
        """Windowed median via cython ``roll_median_c`` or ``np.nanmedian``."""
        if not maybe_use_numba(engine):
            return self._apply(
                window_aggregations.roll_median_c,
                name="median",
                numeric_only=numeric_only,
            )

        # Both numba variants go through ``apply``; only the wrapped function differs.
        if self.method == "table":
            median_func = generate_manual_numpy_nan_agg_with_axis(np.nanmedian)
        else:
            median_func = np.nanmedian
        return self.apply(
            median_func, raw=True, engine=engine, engine_kwargs=engine_kwargs
        )

    def std(
        self,
        ddof: int = 1,
        numeric_only: bool = False,
        engine: Literal["cython", "numba"] | None = None,
        engine_kwargs: dict[str, bool] | None = None,
    ):
        """Windowed standard deviation: ``zsqrt`` of the windowed variance."""
        if maybe_use_numba(engine):
            if self.method == "table":
                raise NotImplementedError("std not supported with method='table'")
            from pandas.core._numba.kernels import sliding_var

            variance = self._numba_apply(sliding_var, engine_kwargs, ddof=ddof)
            return zsqrt(variance)

        roll_var = window_aggregations.roll_var

        def zsqrt_func(values, begin, end, min_periods):
            return zsqrt(roll_var(values, begin, end, min_periods, ddof=ddof))

        return self._apply(zsqrt_func, name="std", numeric_only=numeric_only)

    def var(
        self,
        ddof: int = 1,
        numeric_only: bool = False,
        engine: Literal["cython", "numba"] | None = None,
        engine_kwargs: dict[str, bool] | None = None,
    ):
        """Windowed variance via cython ``roll_var`` or numba ``sliding_var``."""
        if maybe_use_numba(engine):
            if self.method == "table":
                raise NotImplementedError("var not supported with method='table'")
            from pandas.core._numba.kernels import sliding_var

            return self._numba_apply(sliding_var, engine_kwargs, ddof=ddof)

        return self._apply(
            partial(window_aggregations.roll_var, ddof=ddof),
            name="var",
            numeric_only=numeric_only,
        )

    def skew(self, numeric_only: bool = False):
        """Windowed skewness via the ``roll_skew`` kernel."""
        return self._apply(
            window_aggregations.roll_skew, name="skew", numeric_only=numeric_only
        )

    def sem(self, ddof: int = 1, numeric_only: bool = False):
        """Windowed ``std`` divided by the square root of ``count - ddof``."""
        # Raise here so error message says sem instead of std
        self._validate_numeric_only("sem", numeric_only)
        # NOTE(review): ``ddof`` only adjusts the count in the denominator;
        # ``std`` runs with its own default ``ddof`` -- confirm this is intended.
        deviation = self.std(numeric_only=numeric_only)
        adjusted_count = self.count(numeric_only=numeric_only) - ddof
        return deviation / adjusted_count.pow(0.5)

    def kurt(self, numeric_only: bool = False):
        """Windowed kurtosis via the ``roll_kurt`` kernel."""
        return self._apply(
            window_aggregations.roll_kurt, name="kurt", numeric_only=numeric_only
        )

    def quantile(
        self,
        q: float,
        interpolation: QuantileInterpolation = "linear",
        numeric_only: bool = False,
    ):
        """Windowed quantile; ``q`` of 1.0 and 0.0 use the max and min kernels."""
        if q == 1.0:
            kernel = window_aggregations.roll_max
        elif q == 0.0:
            kernel = window_aggregations.roll_min
        else:
            kernel = partial(
                window_aggregations.roll_quantile,
                quantile=q,
                interpolation=interpolation,
            )
        return self._apply(kernel, name="quantile", numeric_only=numeric_only)

    def rank(
        self,
        method: WindowingRankType = "average",
        ascending: bool = True,
        pct: bool = False,
        numeric_only: bool = False,
    ):
        """Windowed rank via the ``roll_rank`` kernel (``pct`` -> ``percentile``)."""
        rank_kernel = partial(
            window_aggregations.roll_rank,
            method=method,
            ascending=ascending,
            percentile=pct,
        )
        return self._apply(rank_kernel, name="rank", numeric_only=numeric_only)

    def cov(
        self,
        other: DataFrame | Series | None = None,
        pairwise: bool | None = None,
        ddof: int = 1,
        numeric_only: bool = False,
    ):
        """
        Windowed covariance, evaluated pairwise through ``_apply_pairwise``.

        Raises
        ------
        NotImplementedError
            If ``step`` is set.
        """
        if self.step is not None:
            raise NotImplementedError("step not implemented for cov")
        self._validate_numeric_only("cov", numeric_only)

        from pandas import Series

        def cov_func(x, y):
            left = self._prep_values(x)
            right = self._prep_values(y)
            indexer = self._get_window_indexer()
            min_periods = self.min_periods
            if min_periods is None:
                min_periods = indexer.window_size
            start, end = indexer.get_window_bounds(
                num_values=len(left),
                min_periods=min_periods,
                center=self.center,
                closed=self.closed,
                step=self.step,
            )
            self._check_window_bounds(start, end, len(left))

            roll_mean = window_aggregations.roll_mean
            with np.errstate(all="ignore"):
                mean_of_product = roll_mean(left * right, start, end, min_periods)
                mean_left = roll_mean(left, start, end, min_periods)
                mean_right = roll_mean(right, start, end, min_periods)
                # Observations counted are those where both inputs are non-null.
                pair_count = window_aggregations.roll_sum(
                    notna(left + right).astype(np.float64), start, end, 0
                )
                result = (mean_of_product - mean_left * mean_right) * (
                    pair_count / (pair_count - ddof)
                )
            return Series(result, index=x.index, name=x.name, copy=False)

        return self._apply_pairwise(
            self._selected_obj, other, pairwise, cov_func, numeric_only
        )

    def corr(
        self,
        other: DataFrame | Series | None = None,
        pairwise: bool | None = None,
        ddof: int = 1,
        numeric_only: bool = False,
    ):
        """
        Windowed correlation, evaluated pairwise through ``_apply_pairwise``.

        Raises
        ------
        NotImplementedError
            If ``step`` is set.
        """
        if self.step is not None:
            raise NotImplementedError("step not implemented for corr")
        self._validate_numeric_only("corr", numeric_only)

        from pandas import Series

        def corr_func(x, y):
            left = self._prep_values(x)
            right = self._prep_values(y)
            indexer = self._get_window_indexer()
            min_periods = self.min_periods
            if min_periods is None:
                min_periods = indexer.window_size
            start, end = indexer.get_window_bounds(
                num_values=len(left),
                min_periods=min_periods,
                center=self.center,
                closed=self.closed,
                step=self.step,
            )
            self._check_window_bounds(start, end, len(left))

            roll_mean = window_aggregations.roll_mean
            roll_var = window_aggregations.roll_var
            with np.errstate(all="ignore"):
                mean_of_product = roll_mean(left * right, start, end, min_periods)
                mean_left = roll_mean(left, start, end, min_periods)
                mean_right = roll_mean(right, start, end, min_periods)
                # Observations counted are those where both inputs are non-null.
                pair_count = window_aggregations.roll_sum(
                    notna(left + right).astype(np.float64), start, end, 0
                )
                var_left = roll_var(left, start, end, min_periods, ddof)
                var_right = roll_var(right, start, end, min_periods, ddof)
                covariance = (mean_of_product - mean_left * mean_right) * (
                    pair_count / (pair_count - ddof)
                )
                spread = (var_left * var_right) ** 0.5
                result = covariance / spread
            return Series(result, index=x.index, name=x.name, copy=False)

        return self._apply_pairwise(
            self._selected_obj, other, pairwise, corr_func, numeric_only
        )
1853
1854
1855class Rolling(RollingAndExpandingMixin):
    # Names of the settings carried by this window type; ``Window`` earlier in
    # this module declares the same entries.
    # NOTE(review): presumably consumed by the base class (e.g. for repr or when
    # re-creating the object) -- confirm against BaseWindow.
    _attributes: list[str] = [
        "window",
        "min_periods",
        "center",
        "win_type",
        "axis",
        "on",
        "closed",
        "step",
        "method",
    ]
1867
1868 def _validate(self):
1869 super()._validate()
1870
1871 # we allow rolling on a datetimelike index
1872 if (
1873 self.obj.empty
1874 or isinstance(self._on, (DatetimeIndex, TimedeltaIndex, PeriodIndex))
1875 or (isinstance(self._on.dtype, ArrowDtype) and self._on.dtype.kind in "mM")
1876 ) and isinstance(self.window, (str, BaseOffset, timedelta)):
1877 self._validate_datetimelike_monotonic()
1878
1879 # this will raise ValueError on non-fixed freqs
1880 try:
1881 freq = to_offset(self.window)
1882 except (TypeError, ValueError) as err:
1883 raise ValueError(
1884 f"passed window {self.window} is not "
1885 "compatible with a datetimelike index"
1886 ) from err
1887 if isinstance(self._on, PeriodIndex):
1888 # error: Incompatible types in assignment (expression has type
1889 # "float", variable has type "Optional[int]")
1890 self._win_freq_i8 = freq.nanos / ( # type: ignore[assignment]
1891 self._on.freq.nanos / self._on.freq.n
1892 )
1893 else:
1894 try:
1895 unit = dtype_to_unit(self._on.dtype) # type: ignore[arg-type]
1896 except TypeError:
1897 # if not a datetime dtype, eg for empty dataframes
1898 unit = "ns"
1899 self._win_freq_i8 = Timedelta(freq.nanos).as_unit(unit)._value
1900
1901 # min_periods must be an integer
1902 if self.min_periods is None:
1903 self.min_periods = 1
1904
1905 if self.step is not None:
1906 raise NotImplementedError(
1907 "step is not supported with frequency windows"
1908 )
1909
1910 elif isinstance(self.window, BaseIndexer):
1911 # Passed BaseIndexer subclass should handle all other rolling kwargs
1912 pass
1913 elif not is_integer(self.window) or self.window < 0:
1914 raise ValueError("window must be an integer 0 or greater")
1915
1916 def _validate_datetimelike_monotonic(self) -> None:
1917 """
1918 Validate self._on is monotonic (increasing or decreasing) and has
1919 no NaT values for frequency windows.
1920 """
1921 if self._on.hasnans:
1922 self._raise_monotonic_error("values must not have NaT")
1923 if not (self._on.is_monotonic_increasing or self._on.is_monotonic_decreasing):
1924 self._raise_monotonic_error("values must be monotonic")
1925
1926 def _raise_monotonic_error(self, msg: str):
1927 on = self.on
1928 if on is None:
1929 if self.axis == 0:
1930 on = "index"
1931 else:
1932 on = "column"
1933 raise ValueError(f"{on} {msg}")
1934
1935 @doc(
1936 _shared_docs["aggregate"],
1937 see_also=dedent(
1938 """
1939 See Also
1940 --------
1941 pandas.Series.rolling : Calling object with Series data.
1942 pandas.DataFrame.rolling : Calling object with DataFrame data.
1943 """
1944 ),
1945 examples=dedent(
1946 """
1947 Examples
1948 --------
1949 >>> df = pd.DataFrame({"A": [1, 2, 3], "B": [4, 5, 6], "C": [7, 8, 9]})
1950 >>> df
1951 A B C
1952 0 1 4 7
1953 1 2 5 8
1954 2 3 6 9
1955
1956 >>> df.rolling(2).sum()
1957 A B C
1958 0 NaN NaN NaN
1959 1 3.0 9.0 15.0
1960 2 5.0 11.0 17.0
1961
1962 >>> df.rolling(2).agg({"A": "sum", "B": "min"})
1963 A B
1964 0 NaN NaN
1965 1 3.0 4.0
1966 2 5.0 5.0
1967 """
1968 ),
        klass="Series/DataFrame",
1970 axis="",
1971 )
    def aggregate(self, func, *args, **kwargs):
        # Pure pass-through to the parent ``aggregate``; this override carries
        # the rolling-specific documentation attached by the ``@doc`` decorator.
        return super().aggregate(func, *args, **kwargs)
1974
1975 agg = aggregate
1976
1977 @doc(
1978 template_header,
1979 create_section_header("Parameters"),
1980 kwargs_numeric_only,
1981 create_section_header("Returns"),
1982 template_returns,
1983 create_section_header("See Also"),
1984 template_see_also,
1985 create_section_header("Examples"),
1986 dedent(
1987 """
1988 >>> s = pd.Series([2, 3, np.nan, 10])
1989 >>> s.rolling(2).count()
1990 0 NaN
1991 1 2.0
1992 2 1.0
1993 3 1.0
1994 dtype: float64
1995 >>> s.rolling(3).count()
1996 0 NaN
1997 1 NaN
1998 2 2.0
1999 3 2.0
2000 dtype: float64
2001 >>> s.rolling(4).count()
2002 0 NaN
2003 1 NaN
2004 2 NaN
2005 3 3.0
2006 dtype: float64
2007 """
2008 ).replace("\n", "", 1),
2009 window_method="rolling",
2010 aggregation_description="count of non NaN observations",
2011 agg_method="count",
2012 )
    def count(self, numeric_only: bool = False):
        # Pure pass-through to the parent ``count`` (``numeric_only`` is passed
        # positionally); this override carries the rolling-specific
        # documentation attached by the ``@doc`` decorator.
        return super().count(numeric_only)
2015
2016 @doc(
2017 template_header,
2018 create_section_header("Parameters"),
2019 window_apply_parameters,
2020 create_section_header("Returns"),
2021 template_returns,
2022 create_section_header("See Also"),
2023 template_see_also,
2024 create_section_header("Examples"),
2025 dedent(
2026 """\
2027 >>> ser = pd.Series([1, 6, 5, 4])
2028 >>> ser.rolling(2).apply(lambda s: s.sum() - s.min())
2029 0 NaN
2030 1 6.0
2031 2 6.0
2032 3 5.0
2033 dtype: float64
2034 """
2035 ),
2036 window_method="rolling",
2037 aggregation_description="custom aggregation function",
2038 agg_method="apply",
2039 )
    def apply(
        self,
        func: Callable[..., Any],
        raw: bool = False,
        engine: Literal["cython", "numba"] | None = None,
        engine_kwargs: dict[str, bool] | None = None,
        args: tuple[Any, ...] | None = None,
        kwargs: dict[str, Any] | None = None,
    ):
        # Pure pass-through: every argument is forwarded unchanged to the parent
        # ``apply``, which performs the validation and engine dispatch. This
        # override carries the rolling-specific documentation attached by the
        # ``@doc`` decorator.
        return super().apply(
            func,
            raw=raw,
            engine=engine,
            engine_kwargs=engine_kwargs,
            args=args,
            kwargs=kwargs,
        )
2057
2058 @doc(
2059 template_header,
2060 create_section_header("Parameters"),
2061 kwargs_numeric_only,
2062 window_agg_numba_parameters(),
2063 create_section_header("Returns"),
2064 template_returns,
2065 create_section_header("See Also"),
2066 template_see_also,
2067 create_section_header("Notes"),
2068 numba_notes,
2069 create_section_header("Examples"),
2070 dedent(
2071 """
2072 >>> s = pd.Series([1, 2, 3, 4, 5])
2073 >>> s
2074 0 1
2075 1 2
2076 2 3
2077 3 4
2078 4 5
2079 dtype: int64
2080
2081 >>> s.rolling(3).sum()
2082 0 NaN
2083 1 NaN
2084 2 6.0
2085 3 9.0
2086 4 12.0
2087 dtype: float64
2088
2089 >>> s.rolling(3, center=True).sum()
2090 0 NaN
2091 1 6.0
2092 2 9.0
2093 3 12.0
2094 4 NaN
2095 dtype: float64
2096
2097 For DataFrame, each sum is computed column-wise.
2098
2099 >>> df = pd.DataFrame({{"A": s, "B": s ** 2}})
2100 >>> df
2101 A B
2102 0 1 1
2103 1 2 4
2104 2 3 9
2105 3 4 16
2106 4 5 25
2107
2108 >>> df.rolling(3).sum()
2109 A B
2110 0 NaN NaN
2111 1 NaN NaN
2112 2 6.0 14.0
2113 3 9.0 29.0
2114 4 12.0 50.0
2115 """
2116 ).replace("\n", "", 1),
2117 window_method="rolling",
2118 aggregation_description="sum",
2119 agg_method="sum",
2120 )
    def sum(
        self,
        numeric_only: bool = False,
        engine: Literal["cython", "numba"] | None = None,
        engine_kwargs: dict[str, bool] | None = None,
    ):
        # Pure pass-through to the parent ``sum``; this override carries the
        # rolling-specific documentation attached by the ``@doc`` decorator.
        return super().sum(
            numeric_only=numeric_only,
            engine=engine,
            engine_kwargs=engine_kwargs,
        )
2132
2133 @doc(
2134 template_header,
2135 create_section_header("Parameters"),
2136 kwargs_numeric_only,
2137 window_agg_numba_parameters(),
2138 create_section_header("Returns"),
2139 template_returns,
2140 create_section_header("See Also"),
2141 template_see_also,
2142 create_section_header("Notes"),
2143 numba_notes,
2144 create_section_header("Examples"),
2145 dedent(
2146 """\
2147 >>> ser = pd.Series([1, 2, 3, 4])
2148 >>> ser.rolling(2).max()
2149 0 NaN
2150 1 2.0
2151 2 3.0
2152 3 4.0
2153 dtype: float64
2154 """
2155 ),
2156 window_method="rolling",
2157 aggregation_description="maximum",
2158 agg_method="max",
2159 )
    def max(
        self,
        numeric_only: bool = False,
        *args,
        engine: Literal["cython", "numba"] | None = None,
        engine_kwargs: dict[str, bool] | None = None,
        **kwargs,
    ):
        # Pass-through to the parent ``max``; this override carries the
        # rolling-specific documentation attached by the ``@doc`` decorator.
        # NOTE(review): ``*args`` and ``**kwargs`` are accepted but never used or
        # forwarded, and they make ``engine``/``engine_kwargs`` keyword-only
        # here, unlike the sibling ``sum``/``min``/``mean`` overrides -- confirm
        # whether this is intentional (e.g. kept for backward compatibility).
        return super().max(
            numeric_only=numeric_only,
            engine=engine,
            engine_kwargs=engine_kwargs,
        )
2173
2174 @doc(
2175 template_header,
2176 create_section_header("Parameters"),
2177 kwargs_numeric_only,
2178 window_agg_numba_parameters(),
2179 create_section_header("Returns"),
2180 template_returns,
2181 create_section_header("See Also"),
2182 template_see_also,
2183 create_section_header("Notes"),
2184 numba_notes,
2185 create_section_header("Examples"),
2186 dedent(
2187 """
2188 Performing a rolling minimum with a window size of 3.
2189
2190 >>> s = pd.Series([4, 3, 5, 2, 6])
2191 >>> s.rolling(3).min()
2192 0 NaN
2193 1 NaN
2194 2 3.0
2195 3 2.0
2196 4 2.0
2197 dtype: float64
2198 """
2199 ).replace("\n", "", 1),
2200 window_method="rolling",
2201 aggregation_description="minimum",
2202 agg_method="min",
2203 )
2204 def min(
2205 self,
2206 numeric_only: bool = False,
2207 engine: Literal["cython", "numba"] | None = None,
2208 engine_kwargs: dict[str, bool] | None = None,
2209 ):
2210 return super().min(
2211 numeric_only=numeric_only,
2212 engine=engine,
2213 engine_kwargs=engine_kwargs,
2214 )
2215
2216 @doc(
2217 template_header,
2218 create_section_header("Parameters"),
2219 kwargs_numeric_only,
2220 window_agg_numba_parameters(),
2221 create_section_header("Returns"),
2222 template_returns,
2223 create_section_header("See Also"),
2224 template_see_also,
2225 create_section_header("Notes"),
2226 numba_notes,
2227 create_section_header("Examples"),
2228 dedent(
2229 """
2230 The below examples will show rolling mean calculations with window sizes of
2231 two and three, respectively.
2232
2233 >>> s = pd.Series([1, 2, 3, 4])
2234 >>> s.rolling(2).mean()
2235 0 NaN
2236 1 1.5
2237 2 2.5
2238 3 3.5
2239 dtype: float64
2240
2241 >>> s.rolling(3).mean()
2242 0 NaN
2243 1 NaN
2244 2 2.0
2245 3 3.0
2246 dtype: float64
2247 """
2248 ).replace("\n", "", 1),
2249 window_method="rolling",
2250 aggregation_description="mean",
2251 agg_method="mean",
2252 )
2253 def mean(
2254 self,
2255 numeric_only: bool = False,
2256 engine: Literal["cython", "numba"] | None = None,
2257 engine_kwargs: dict[str, bool] | None = None,
2258 ):
2259 return super().mean(
2260 numeric_only=numeric_only,
2261 engine=engine,
2262 engine_kwargs=engine_kwargs,
2263 )
2264
2265 @doc(
2266 template_header,
2267 create_section_header("Parameters"),
2268 kwargs_numeric_only,
2269 window_agg_numba_parameters(),
2270 create_section_header("Returns"),
2271 template_returns,
2272 create_section_header("See Also"),
2273 template_see_also,
2274 create_section_header("Notes"),
2275 numba_notes,
2276 create_section_header("Examples"),
2277 dedent(
2278 """
2279 Compute the rolling median of a series with a window size of 3.
2280
2281 >>> s = pd.Series([0, 1, 2, 3, 4])
2282 >>> s.rolling(3).median()
2283 0 NaN
2284 1 NaN
2285 2 1.0
2286 3 2.0
2287 4 3.0
2288 dtype: float64
2289 """
2290 ).replace("\n", "", 1),
2291 window_method="rolling",
2292 aggregation_description="median",
2293 agg_method="median",
2294 )
2295 def median(
2296 self,
2297 numeric_only: bool = False,
2298 engine: Literal["cython", "numba"] | None = None,
2299 engine_kwargs: dict[str, bool] | None = None,
2300 ):
2301 return super().median(
2302 numeric_only=numeric_only,
2303 engine=engine,
2304 engine_kwargs=engine_kwargs,
2305 )
2306
2307 @doc(
2308 template_header,
2309 create_section_header("Parameters"),
2310 dedent(
2311 """
2312 ddof : int, default 1
2313 Delta Degrees of Freedom. The divisor used in calculations
2314 is ``N - ddof``, where ``N`` represents the number of elements.
2315 """
2316 ).replace("\n", "", 1),
2317 kwargs_numeric_only,
2318 window_agg_numba_parameters("1.4"),
2319 create_section_header("Returns"),
2320 template_returns,
2321 create_section_header("See Also"),
2322 "numpy.std : Equivalent method for NumPy array.\n",
2323 template_see_also,
2324 create_section_header("Notes"),
2325 dedent(
2326 """
2327 The default ``ddof`` of 1 used in :meth:`Series.std` is different
2328 than the default ``ddof`` of 0 in :func:`numpy.std`.
2329
2330 A minimum of one period is required for the rolling calculation.\n
2331 """
2332 ).replace("\n", "", 1),
2333 create_section_header("Examples"),
2334 dedent(
2335 """
2336 >>> s = pd.Series([5, 5, 6, 7, 5, 5, 5])
2337 >>> s.rolling(3).std()
2338 0 NaN
2339 1 NaN
2340 2 0.577350
2341 3 1.000000
2342 4 1.000000
2343 5 1.154701
2344 6 0.000000
2345 dtype: float64
2346 """
2347 ).replace("\n", "", 1),
2348 window_method="rolling",
2349 aggregation_description="standard deviation",
2350 agg_method="std",
2351 )
2352 def std(
2353 self,
2354 ddof: int = 1,
2355 numeric_only: bool = False,
2356 engine: Literal["cython", "numba"] | None = None,
2357 engine_kwargs: dict[str, bool] | None = None,
2358 ):
2359 return super().std(
2360 ddof=ddof,
2361 numeric_only=numeric_only,
2362 engine=engine,
2363 engine_kwargs=engine_kwargs,
2364 )
2365
2366 @doc(
2367 template_header,
2368 create_section_header("Parameters"),
2369 dedent(
2370 """
2371 ddof : int, default 1
2372 Delta Degrees of Freedom. The divisor used in calculations
2373 is ``N - ddof``, where ``N`` represents the number of elements.
2374 """
2375 ).replace("\n", "", 1),
2376 kwargs_numeric_only,
2377 window_agg_numba_parameters("1.4"),
2378 create_section_header("Returns"),
2379 template_returns,
2380 create_section_header("See Also"),
2381 "numpy.var : Equivalent method for NumPy array.\n",
2382 template_see_also,
2383 create_section_header("Notes"),
2384 dedent(
2385 """
2386 The default ``ddof`` of 1 used in :meth:`Series.var` is different
2387 than the default ``ddof`` of 0 in :func:`numpy.var`.
2388
2389 A minimum of one period is required for the rolling calculation.\n
2390 """
2391 ).replace("\n", "", 1),
2392 create_section_header("Examples"),
2393 dedent(
2394 """
2395 >>> s = pd.Series([5, 5, 6, 7, 5, 5, 5])
2396 >>> s.rolling(3).var()
2397 0 NaN
2398 1 NaN
2399 2 0.333333
2400 3 1.000000
2401 4 1.000000
2402 5 1.333333
2403 6 0.000000
2404 dtype: float64
2405 """
2406 ).replace("\n", "", 1),
2407 window_method="rolling",
2408 aggregation_description="variance",
2409 agg_method="var",
2410 )
2411 def var(
2412 self,
2413 ddof: int = 1,
2414 numeric_only: bool = False,
2415 engine: Literal["cython", "numba"] | None = None,
2416 engine_kwargs: dict[str, bool] | None = None,
2417 ):
2418 return super().var(
2419 ddof=ddof,
2420 numeric_only=numeric_only,
2421 engine=engine,
2422 engine_kwargs=engine_kwargs,
2423 )
2424
2425 @doc(
2426 template_header,
2427 create_section_header("Parameters"),
2428 kwargs_numeric_only,
2429 create_section_header("Returns"),
2430 template_returns,
2431 create_section_header("See Also"),
2432 "scipy.stats.skew : Third moment of a probability density.\n",
2433 template_see_also,
2434 create_section_header("Notes"),
2435 dedent(
2436 """
2437 A minimum of three periods is required for the rolling calculation.\n
2438 """
2439 ),
2440 create_section_header("Examples"),
2441 dedent(
2442 """\
2443 >>> ser = pd.Series([1, 5, 2, 7, 15, 6])
2444 >>> ser.rolling(3).skew().round(6)
2445 0 NaN
2446 1 NaN
2447 2 1.293343
2448 3 -0.585583
2449 4 0.670284
2450 5 1.652317
2451 dtype: float64
2452 """
2453 ),
2454 window_method="rolling",
2455 aggregation_description="unbiased skewness",
2456 agg_method="skew",
2457 )
2458 def skew(self, numeric_only: bool = False):
2459 return super().skew(numeric_only=numeric_only)
2460
2461 @doc(
2462 template_header,
2463 create_section_header("Parameters"),
2464 dedent(
2465 """
2466 ddof : int, default 1
2467 Delta Degrees of Freedom. The divisor used in calculations
2468 is ``N - ddof``, where ``N`` represents the number of elements.
2469 """
2470 ).replace("\n", "", 1),
2471 kwargs_numeric_only,
2472 create_section_header("Returns"),
2473 template_returns,
2474 create_section_header("See Also"),
2475 template_see_also,
2476 create_section_header("Notes"),
2477 "A minimum of one period is required for the calculation.\n\n",
2478 create_section_header("Examples"),
2479 dedent(
2480 """
2481 >>> s = pd.Series([0, 1, 2, 3])
2482 >>> s.rolling(2, min_periods=1).sem()
2483 0 NaN
2484 1 0.707107
2485 2 0.707107
2486 3 0.707107
2487 dtype: float64
2488 """
2489 ).replace("\n", "", 1),
2490 window_method="rolling",
2491 aggregation_description="standard error of mean",
2492 agg_method="sem",
2493 )
2494 def sem(self, ddof: int = 1, numeric_only: bool = False):
2495 # Raise here so error message says sem instead of std
2496 self._validate_numeric_only("sem", numeric_only)
2497 return self.std(numeric_only=numeric_only) / (
2498 self.count(numeric_only) - ddof
2499 ).pow(0.5)
2500
2501 @doc(
2502 template_header,
2503 create_section_header("Parameters"),
2504 kwargs_numeric_only,
2505 create_section_header("Returns"),
2506 template_returns,
2507 create_section_header("See Also"),
2508 "scipy.stats.kurtosis : Reference SciPy method.\n",
2509 template_see_also,
2510 create_section_header("Notes"),
2511 "A minimum of four periods is required for the calculation.\n\n",
2512 create_section_header("Examples"),
2513 dedent(
2514 """
2515 The example below will show a rolling calculation with a window size of
2516 four matching the equivalent function call using `scipy.stats`.
2517
2518 >>> arr = [1, 2, 3, 4, 999]
2519 >>> import scipy.stats
2520 >>> print(f"{{scipy.stats.kurtosis(arr[:-1], bias=False):.6f}}")
2521 -1.200000
2522 >>> print(f"{{scipy.stats.kurtosis(arr[1:], bias=False):.6f}}")
2523 3.999946
2524 >>> s = pd.Series(arr)
2525 >>> s.rolling(4).kurt()
2526 0 NaN
2527 1 NaN
2528 2 NaN
2529 3 -1.200000
2530 4 3.999946
2531 dtype: float64
2532 """
2533 ).replace("\n", "", 1),
2534 window_method="rolling",
2535 aggregation_description="Fisher's definition of kurtosis without bias",
2536 agg_method="kurt",
2537 )
2538 def kurt(self, numeric_only: bool = False):
2539 return super().kurt(numeric_only=numeric_only)
2540
2541 @doc(
2542 template_header,
2543 create_section_header("Parameters"),
2544 dedent(
2545 """
2546 quantile : float
2547 Quantile to compute. 0 <= quantile <= 1.
2548
2549 .. deprecated:: 2.1.0
2550 This will be renamed to 'q' in a future version.
2551 interpolation : {{'linear', 'lower', 'higher', 'midpoint', 'nearest'}}
2552 This optional parameter specifies the interpolation method to use,
2553 when the desired quantile lies between two data points `i` and `j`:
2554
2555 * linear: `i + (j - i) * fraction`, where `fraction` is the
2556 fractional part of the index surrounded by `i` and `j`.
2557 * lower: `i`.
2558 * higher: `j`.
2559 * nearest: `i` or `j` whichever is nearest.
2560 * midpoint: (`i` + `j`) / 2.
2561 """
2562 ).replace("\n", "", 1),
2563 kwargs_numeric_only,
2564 create_section_header("Returns"),
2565 template_returns,
2566 create_section_header("See Also"),
2567 template_see_also,
2568 create_section_header("Examples"),
2569 dedent(
2570 """
2571 >>> s = pd.Series([1, 2, 3, 4])
2572 >>> s.rolling(2).quantile(.4, interpolation='lower')
2573 0 NaN
2574 1 1.0
2575 2 2.0
2576 3 3.0
2577 dtype: float64
2578
2579 >>> s.rolling(2).quantile(.4, interpolation='midpoint')
2580 0 NaN
2581 1 1.5
2582 2 2.5
2583 3 3.5
2584 dtype: float64
2585 """
2586 ).replace("\n", "", 1),
2587 window_method="rolling",
2588 aggregation_description="quantile",
2589 agg_method="quantile",
2590 )
2591 @deprecate_kwarg(old_arg_name="quantile", new_arg_name="q")
2592 def quantile(
2593 self,
2594 q: float,
2595 interpolation: QuantileInterpolation = "linear",
2596 numeric_only: bool = False,
2597 ):
2598 return super().quantile(
2599 q=q,
2600 interpolation=interpolation,
2601 numeric_only=numeric_only,
2602 )
2603
2604 @doc(
2605 template_header,
2606 ".. versionadded:: 1.4.0 \n\n",
2607 create_section_header("Parameters"),
2608 dedent(
2609 """
2610 method : {{'average', 'min', 'max'}}, default 'average'
2611 How to rank the group of records that have the same value (i.e. ties):
2612
2613 * average: average rank of the group
2614 * min: lowest rank in the group
2615 * max: highest rank in the group
2616
2617 ascending : bool, default True
2618 Whether or not the elements should be ranked in ascending order.
2619 pct : bool, default False
2620 Whether or not to display the returned rankings in percentile
2621 form.
2622 """
2623 ).replace("\n", "", 1),
2624 kwargs_numeric_only,
2625 create_section_header("Returns"),
2626 template_returns,
2627 create_section_header("See Also"),
2628 template_see_also,
2629 create_section_header("Examples"),
2630 dedent(
2631 """
2632 >>> s = pd.Series([1, 4, 2, 3, 5, 3])
2633 >>> s.rolling(3).rank()
2634 0 NaN
2635 1 NaN
2636 2 2.0
2637 3 2.0
2638 4 3.0
2639 5 1.5
2640 dtype: float64
2641
2642 >>> s.rolling(3).rank(method="max")
2643 0 NaN
2644 1 NaN
2645 2 2.0
2646 3 2.0
2647 4 3.0
2648 5 2.0
2649 dtype: float64
2650
2651 >>> s.rolling(3).rank(method="min")
2652 0 NaN
2653 1 NaN
2654 2 2.0
2655 3 2.0
2656 4 3.0
2657 5 1.0
2658 dtype: float64
2659 """
2660 ).replace("\n", "", 1),
2661 window_method="rolling",
2662 aggregation_description="rank",
2663 agg_method="rank",
2664 )
2665 def rank(
2666 self,
2667 method: WindowingRankType = "average",
2668 ascending: bool = True,
2669 pct: bool = False,
2670 numeric_only: bool = False,
2671 ):
2672 return super().rank(
2673 method=method,
2674 ascending=ascending,
2675 pct=pct,
2676 numeric_only=numeric_only,
2677 )
2678
2679 @doc(
2680 template_header,
2681 create_section_header("Parameters"),
2682 dedent(
2683 """
2684 other : Series or DataFrame, optional
2685 If not supplied then will default to self and produce pairwise
2686 output.
2687 pairwise : bool, default None
2688 If False then only matching columns between self and other will be
2689 used and the output will be a DataFrame.
2690 If True then all pairwise combinations will be calculated and the
2691 output will be a MultiIndexed DataFrame in the case of DataFrame
2692 inputs. In the case of missing elements, only complete pairwise
2693 observations will be used.
2694 ddof : int, default 1
2695 Delta Degrees of Freedom. The divisor used in calculations
2696 is ``N - ddof``, where ``N`` represents the number of elements.
2697 """
2698 ).replace("\n", "", 1),
2699 kwargs_numeric_only,
2700 create_section_header("Returns"),
2701 template_returns,
2702 create_section_header("See Also"),
2703 template_see_also,
2704 create_section_header("Examples"),
2705 dedent(
2706 """\
2707 >>> ser1 = pd.Series([1, 2, 3, 4])
2708 >>> ser2 = pd.Series([1, 4, 5, 8])
2709 >>> ser1.rolling(2).cov(ser2)
2710 0 NaN
2711 1 1.5
2712 2 0.5
2713 3 1.5
2714 dtype: float64
2715 """
2716 ),
2717 window_method="rolling",
2718 aggregation_description="sample covariance",
2719 agg_method="cov",
2720 )
2721 def cov(
2722 self,
2723 other: DataFrame | Series | None = None,
2724 pairwise: bool | None = None,
2725 ddof: int = 1,
2726 numeric_only: bool = False,
2727 ):
2728 return super().cov(
2729 other=other,
2730 pairwise=pairwise,
2731 ddof=ddof,
2732 numeric_only=numeric_only,
2733 )
2734
2735 @doc(
2736 template_header,
2737 create_section_header("Parameters"),
2738 dedent(
2739 """
2740 other : Series or DataFrame, optional
2741 If not supplied then will default to self and produce pairwise
2742 output.
2743 pairwise : bool, default None
2744 If False then only matching columns between self and other will be
2745 used and the output will be a DataFrame.
2746 If True then all pairwise combinations will be calculated and the
2747 output will be a MultiIndexed DataFrame in the case of DataFrame
2748 inputs. In the case of missing elements, only complete pairwise
2749 observations will be used.
2750 ddof : int, default 1
2751 Delta Degrees of Freedom. The divisor used in calculations
2752 is ``N - ddof``, where ``N`` represents the number of elements.
2753 """
2754 ).replace("\n", "", 1),
2755 kwargs_numeric_only,
2756 create_section_header("Returns"),
2757 template_returns,
2758 create_section_header("See Also"),
2759 dedent(
2760 """
2761 cov : Similar method to calculate covariance.
2762 numpy.corrcoef : NumPy Pearson's correlation calculation.
2763 """
2764 ).replace("\n", "", 1),
2765 template_see_also,
2766 create_section_header("Notes"),
2767 dedent(
2768 """
2769 This function uses Pearson's definition of correlation
2770 (https://en.wikipedia.org/wiki/Pearson_correlation_coefficient).
2771
2772 When `other` is not specified, the output will be self correlation (e.g.
2773 all 1's), except for :class:`~pandas.DataFrame` inputs with `pairwise`
2774 set to `True`.
2775
2776 Function will return ``NaN`` for correlations of equal valued sequences;
2777 this is the result of a 0/0 division error.
2778
2779 When `pairwise` is set to `False`, only matching columns between `self` and
2780 `other` will be used.
2781
2782 When `pairwise` is set to `True`, the output will be a MultiIndex DataFrame
2783 with the original index on the first level, and the `other` DataFrame
2784 columns on the second level.
2785
2786 In the case of missing elements, only complete pairwise observations
2787 will be used.\n
2788 """
2789 ).replace("\n", "", 1),
2790 create_section_header("Examples"),
2791 dedent(
2792 """
2793 The below example shows a rolling calculation with a window size of
2794 four matching the equivalent function call using :meth:`numpy.corrcoef`.
2795
2796 >>> v1 = [3, 3, 3, 5, 8]
2797 >>> v2 = [3, 4, 4, 4, 8]
2798 >>> np.corrcoef(v1[:-1], v2[:-1])
2799 array([[1. , 0.33333333],
2800 [0.33333333, 1. ]])
2801 >>> np.corrcoef(v1[1:], v2[1:])
2802 array([[1. , 0.9169493],
2803 [0.9169493, 1. ]])
2804 >>> s1 = pd.Series(v1)
2805 >>> s2 = pd.Series(v2)
2806 >>> s1.rolling(4).corr(s2)
2807 0 NaN
2808 1 NaN
2809 2 NaN
2810 3 0.333333
2811 4 0.916949
2812 dtype: float64
2813
2814 The below example shows a similar rolling calculation on a
2815 DataFrame using the pairwise option.
2816
2817 >>> matrix = np.array([[51., 35.],
2818 ... [49., 30.],
2819 ... [47., 32.],
2820 ... [46., 31.],
2821 ... [50., 36.]])
2822 >>> np.corrcoef(matrix[:-1, 0], matrix[:-1, 1])
2823 array([[1. , 0.6263001],
2824 [0.6263001, 1. ]])
2825 >>> np.corrcoef(matrix[1:, 0], matrix[1:, 1])
2826 array([[1. , 0.55536811],
2827 [0.55536811, 1. ]])
2828 >>> df = pd.DataFrame(matrix, columns=['X', 'Y'])
2829 >>> df
2830 X Y
2831 0 51.0 35.0
2832 1 49.0 30.0
2833 2 47.0 32.0
2834 3 46.0 31.0
2835 4 50.0 36.0
2836 >>> df.rolling(4).corr(pairwise=True)
2837 X Y
2838 0 X NaN NaN
2839 Y NaN NaN
2840 1 X NaN NaN
2841 Y NaN NaN
2842 2 X NaN NaN
2843 Y NaN NaN
2844 3 X 1.000000 0.626300
2845 Y 0.626300 1.000000
2846 4 X 1.000000 0.555368
2847 Y 0.555368 1.000000
2848 """
2849 ).replace("\n", "", 1),
2850 window_method="rolling",
2851 aggregation_description="correlation",
2852 agg_method="corr",
2853 )
2854 def corr(
2855 self,
2856 other: DataFrame | Series | None = None,
2857 pairwise: bool | None = None,
2858 ddof: int = 1,
2859 numeric_only: bool = False,
2860 ):
2861 return super().corr(
2862 other=other,
2863 pairwise=pairwise,
2864 ddof=ddof,
2865 numeric_only=numeric_only,
2866 )
2867
2868
# Reuse Window's class docstring as the class docstring of Rolling.
Rolling.__doc__ = Window.__doc__
2870
2871
class RollingGroupby(BaseWindowGroupby, Rolling):
    """
    Provide a rolling groupby implementation.
    """

    _attributes = Rolling._attributes + BaseWindowGroupby._attributes

    def _get_window_indexer(self) -> GroupbyIndexer:
        """
        Build the indexer that computes window start and end bounds per group.

        The per-group indexer type is picked from the window specification:
        the type of ``self.window`` when it is a ``BaseIndexer``,
        ``VariableWindowIndexer`` when ``self._win_freq_i8`` is set, and
        ``FixedWindowIndexer`` otherwise.

        Returns
        -------
        GroupbyIndexer
        """
        rolling_indexer: type[BaseIndexer]
        indexer_kwargs: dict[str, Any] | None = None
        if isinstance(self.window, BaseIndexer):
            rolling_indexer = type(self.window)
            # Carry over the custom indexer's attributes except "index_array":
            # we'll be using the index of each group later
            indexer_kwargs = {
                attr: value
                for attr, value in self.window.__dict__.items()
                if attr != "index_array"
            }
            window = self.window
        elif self._win_freq_i8 is not None:
            rolling_indexer = VariableWindowIndexer
            # error: Incompatible types in assignment (expression has type
            # "int", variable has type "BaseIndexer")
            window = self._win_freq_i8  # type: ignore[assignment]
        else:
            rolling_indexer = FixedWindowIndexer
            window = self.window
        return GroupbyIndexer(
            index_array=self._index_array,
            window_size=window,
            groupby_indices=self._grouper.indices,
            window_indexer=rolling_indexer,
            indexer_kwargs=indexer_kwargs,
        )

    def _validate_datetimelike_monotonic(self):
        """
        Validate that each group in self._on is monotonic.

        Raises
        ------
        ValueError
            If the values of any group are neither monotonic increasing nor
            monotonic decreasing.
        """
        # GH 46061
        if self._on.hasnans:
            self._raise_monotonic_error("values must not have NaT")
        for positions in self._grouper.indices.values():
            group_values = self._on.take(positions)
            if (
                group_values.is_monotonic_increasing
                or group_values.is_monotonic_decreasing
            ):
                continue
            # Name the offending axis in the message: the index unless an
            # explicit ``on`` was given.
            on = "index" if self.on is None else self.on
            raise ValueError(
                f"Each group within {on} must be monotonic. "
                f"Sort the values in {on} first."
            )