1from __future__ import annotations
2
3import datetime
4from functools import partial
5from textwrap import dedent
6from typing import TYPE_CHECKING
7
8import numpy as np
9
10from pandas._libs.tslibs import Timedelta
11import pandas._libs.window.aggregations as window_aggregations
12from pandas.util._decorators import doc
13
14from pandas.core.dtypes.common import (
15 is_datetime64_dtype,
16 is_numeric_dtype,
17)
18from pandas.core.dtypes.dtypes import DatetimeTZDtype
19from pandas.core.dtypes.generic import ABCSeries
20from pandas.core.dtypes.missing import isna
21
22from pandas.core import common
23from pandas.core.arrays.datetimelike import dtype_to_unit
24from pandas.core.indexers.objects import (
25 BaseIndexer,
26 ExponentialMovingWindowIndexer,
27 GroupbyIndexer,
28)
29from pandas.core.util.numba_ import (
30 get_jit_arguments,
31 maybe_use_numba,
32)
33from pandas.core.window.common import zsqrt
34from pandas.core.window.doc import (
35 _shared_docs,
36 create_section_header,
37 kwargs_numeric_only,
38 numba_notes,
39 template_header,
40 template_returns,
41 template_see_also,
42 window_agg_numba_parameters,
43)
44from pandas.core.window.numba_ import (
45 generate_numba_ewm_func,
46 generate_numba_ewm_table_func,
47)
48from pandas.core.window.online import (
49 EWMMeanState,
50 generate_online_numba_ewma_func,
51)
52from pandas.core.window.rolling import (
53 BaseWindow,
54 BaseWindowGroupby,
55)
56
57if TYPE_CHECKING:
58 from pandas._typing import (
59 Axis,
60 TimedeltaConvertibleTypes,
61 npt,
62 )
63
64 from pandas import (
65 DataFrame,
66 Series,
67 )
68 from pandas.core.generic import NDFrame
69
70
def get_center_of_mass(
    comass: float | None,
    span: float | None,
    halflife: float | None,
    alpha: float | None,
) -> float:
    """
    Convert a single decay specification into its center-of-mass form.

    Parameters
    ----------
    comass : float or None
        Center of mass; must satisfy ``comass >= 0``.
    span : float or None
        Span; must satisfy ``span >= 1``.
    halflife : float or None
        Half-life; must satisfy ``halflife > 0``.
    alpha : float or None
        Smoothing factor; must satisfy ``0 < alpha <= 1``.

    Returns
    -------
    float
        Center of mass corresponding to whichever argument was supplied.

    Raises
    ------
    ValueError
        If more than one argument is supplied, if none is supplied, or if
        the supplied value is outside its valid domain.
    """
    if common.count_not_none(comass, span, halflife, alpha) > 1:
        raise ValueError("comass, span, halflife, and alpha are mutually exclusive")

    # Every branch checks its own domain, which keeps the implied alpha
    # inside (0, 1].
    if comass is not None:
        if comass < 0:
            raise ValueError("comass must satisfy: comass >= 0")
        return float(comass)

    if span is not None:
        if span < 1:
            raise ValueError("span must satisfy: span >= 1")
        return float((span - 1) / 2)

    if halflife is not None:
        if halflife <= 0:
            raise ValueError("halflife must satisfy: halflife > 0")
        decay = 1 - np.exp(np.log(0.5) / halflife)
        return float(1 / decay - 1)

    if alpha is not None:
        if alpha <= 0 or alpha > 1:
            raise ValueError("alpha must satisfy: 0 < alpha <= 1")
        return float((1 - alpha) / alpha)

    raise ValueError("Must pass one of comass, span, halflife, or alpha")
102
103
104def _calculate_deltas(
105 times: np.ndarray | NDFrame,
106 halflife: float | TimedeltaConvertibleTypes | None,
107) -> npt.NDArray[np.float64]:
108 """
109 Return the diff of the times divided by the half-life. These values are used in
110 the calculation of the ewm mean.
111
112 Parameters
113 ----------
114 times : np.ndarray, Series
115 Times corresponding to the observations. Must be monotonically increasing
116 and ``datetime64[ns]`` dtype.
117 halflife : float, str, timedelta, optional
118 Half-life specifying the decay
119
120 Returns
121 -------
122 np.ndarray
123 Diff of the times divided by the half-life
124 """
125 unit = dtype_to_unit(times.dtype)
126 if isinstance(times, ABCSeries):
127 times = times._values
128 _times = np.asarray(times.view(np.int64), dtype=np.float64)
129 _halflife = float(Timedelta(halflife).as_unit(unit)._value)
130 return np.diff(_times) / _halflife
131
132
133class ExponentialMovingWindow(BaseWindow):
134 r"""
135 Provide exponentially weighted (EW) calculations.
136
137 Exactly one of ``com``, ``span``, ``halflife``, or ``alpha`` must be
138 provided if ``times`` is not provided. If ``times`` is provided,
139 ``halflife`` and one of ``com``, ``span`` or ``alpha`` may be provided.
140
141 Parameters
142 ----------
143 com : float, optional
144 Specify decay in terms of center of mass
145
146 :math:`\alpha = 1 / (1 + com)`, for :math:`com \geq 0`.
147
148 span : float, optional
149 Specify decay in terms of span
150
151 :math:`\alpha = 2 / (span + 1)`, for :math:`span \geq 1`.
152
153 halflife : float, str, timedelta, optional
154 Specify decay in terms of half-life
155
156 :math:`\alpha = 1 - \exp\left(-\ln(2) / halflife\right)`, for
157 :math:`halflife > 0`.
158
159 If ``times`` is specified, a timedelta convertible unit over which an
160 observation decays to half its value. Only applicable to ``mean()``,
161 and halflife value will not apply to the other functions.
162
163 alpha : float, optional
164 Specify smoothing factor :math:`\alpha` directly
165
166 :math:`0 < \alpha \leq 1`.
167
168 min_periods : int, default 0
169 Minimum number of observations in window required to have a value;
170 otherwise, result is ``np.nan``.
171
172 adjust : bool, default True
173 Divide by decaying adjustment factor in beginning periods to account
174 for imbalance in relative weightings (viewing EWMA as a moving average).
175
176 - When ``adjust=True`` (default), the EW function is calculated using weights
177 :math:`w_i = (1 - \alpha)^i`. For example, the EW moving average of the series
178 [:math:`x_0, x_1, ..., x_t`] would be:
179
180 .. math::
181 y_t = \frac{x_t + (1 - \alpha)x_{t-1} + (1 - \alpha)^2 x_{t-2} + ... + (1 -
182 \alpha)^t x_0}{1 + (1 - \alpha) + (1 - \alpha)^2 + ... + (1 - \alpha)^t}
183
184 - When ``adjust=False``, the exponentially weighted function is calculated
185 recursively:
186
187 .. math::
188 \begin{split}
189 y_0 &= x_0\\
190 y_t &= (1 - \alpha) y_{t-1} + \alpha x_t,
191 \end{split}
192 ignore_na : bool, default False
193 Ignore missing values when calculating weights.
194
195 - When ``ignore_na=False`` (default), weights are based on absolute positions.
196 For example, the weights of :math:`x_0` and :math:`x_2` used in calculating
197 the final weighted average of [:math:`x_0`, None, :math:`x_2`] are
198 :math:`(1-\alpha)^2` and :math:`1` if ``adjust=True``, and
199 :math:`(1-\alpha)^2` and :math:`\alpha` if ``adjust=False``.
200
201 - When ``ignore_na=True``, weights are based
202 on relative positions. For example, the weights of :math:`x_0` and :math:`x_2`
203 used in calculating the final weighted average of
204 [:math:`x_0`, None, :math:`x_2`] are :math:`1-\alpha` and :math:`1` if
205 ``adjust=True``, and :math:`1-\alpha` and :math:`\alpha` if ``adjust=False``.
206
207 axis : {0, 1}, default 0
208 If ``0`` or ``'index'``, calculate across the rows.
209
210 If ``1`` or ``'columns'``, calculate across the columns.
211
212 For `Series` this parameter is unused and defaults to 0.
213
214 times : np.ndarray, Series, default None
215
216 Only applicable to ``mean()``.
217
218 Times corresponding to the observations. Must be monotonically increasing and
219 ``datetime64[ns]`` dtype.
220
221 If 1-D array like, a sequence with the same shape as the observations.
222
223 method : str {'single', 'table'}, default 'single'
224 .. versionadded:: 1.4.0
225
226 Execute the rolling operation per single column or row (``'single'``)
227 or over the entire object (``'table'``).
228
229 This argument is only implemented when specifying ``engine='numba'``
230 in the method call.
231
232 Only applicable to ``mean()``
233
234 Returns
235 -------
236 pandas.api.typing.ExponentialMovingWindow
237
238 See Also
239 --------
240 rolling : Provides rolling window calculations.
241 expanding : Provides expanding transformations.
242
243 Notes
244 -----
245 See :ref:`Windowing Operations <window.exponentially_weighted>`
246 for further usage details and examples.
247
248 Examples
249 --------
250 >>> df = pd.DataFrame({'B': [0, 1, 2, np.nan, 4]})
251 >>> df
252 B
253 0 0.0
254 1 1.0
255 2 2.0
256 3 NaN
257 4 4.0
258
259 >>> df.ewm(com=0.5).mean()
260 B
261 0 0.000000
262 1 0.750000
263 2 1.615385
264 3 1.615385
265 4 3.670213
266 >>> df.ewm(alpha=2 / 3).mean()
267 B
268 0 0.000000
269 1 0.750000
270 2 1.615385
271 3 1.615385
272 4 3.670213
273
274 **adjust**
275
276 >>> df.ewm(com=0.5, adjust=True).mean()
277 B
278 0 0.000000
279 1 0.750000
280 2 1.615385
281 3 1.615385
282 4 3.670213
283 >>> df.ewm(com=0.5, adjust=False).mean()
284 B
285 0 0.000000
286 1 0.666667
287 2 1.555556
288 3 1.555556
289 4 3.650794
290
291 **ignore_na**
292
293 >>> df.ewm(com=0.5, ignore_na=True).mean()
294 B
295 0 0.000000
296 1 0.750000
297 2 1.615385
298 3 1.615385
299 4 3.225000
300 >>> df.ewm(com=0.5, ignore_na=False).mean()
301 B
302 0 0.000000
303 1 0.750000
304 2 1.615385
305 3 1.615385
306 4 3.670213
307
308 **times**
309
310 Exponentially weighted mean with weights calculated with a timedelta ``halflife``
311 relative to ``times``.
312
313 >>> times = ['2020-01-01', '2020-01-03', '2020-01-10', '2020-01-15', '2020-01-17']
314 >>> df.ewm(halflife='4 days', times=pd.DatetimeIndex(times)).mean()
315 B
316 0 0.000000
317 1 0.585786
318 2 1.523889
319 3 1.523889
320 4 3.233686
321 """
322
323 _attributes = [
324 "com",
325 "span",
326 "halflife",
327 "alpha",
328 "min_periods",
329 "adjust",
330 "ignore_na",
331 "axis",
332 "times",
333 "method",
334 ]
335
336 def __init__(
337 self,
338 obj: NDFrame,
339 com: float | None = None,
340 span: float | None = None,
341 halflife: float | TimedeltaConvertibleTypes | None = None,
342 alpha: float | None = None,
343 min_periods: int | None = 0,
344 adjust: bool = True,
345 ignore_na: bool = False,
346 axis: Axis = 0,
347 times: np.ndarray | NDFrame | None = None,
348 method: str = "single",
349 *,
350 selection=None,
351 ) -> None:
352 super().__init__(
353 obj=obj,
354 min_periods=1 if min_periods is None else max(int(min_periods), 1),
355 on=None,
356 center=False,
357 closed=None,
358 method=method,
359 axis=axis,
360 selection=selection,
361 )
362 self.com = com
363 self.span = span
364 self.halflife = halflife
365 self.alpha = alpha
366 self.adjust = adjust
367 self.ignore_na = ignore_na
368 self.times = times
369 if self.times is not None:
370 if not self.adjust:
371 raise NotImplementedError("times is not supported with adjust=False.")
372 times_dtype = getattr(self.times, "dtype", None)
373 if not (
374 is_datetime64_dtype(times_dtype)
375 or isinstance(times_dtype, DatetimeTZDtype)
376 ):
377 raise ValueError("times must be datetime64 dtype.")
378 if len(self.times) != len(obj):
379 raise ValueError("times must be the same length as the object.")
380 if not isinstance(self.halflife, (str, datetime.timedelta, np.timedelta64)):
381 raise ValueError("halflife must be a timedelta convertible object")
382 if isna(self.times).any():
383 raise ValueError("Cannot convert NaT values to integer")
384 self._deltas = _calculate_deltas(self.times, self.halflife)
385 # Halflife is no longer applicable when calculating COM
386 # But allow COM to still be calculated if the user passes other decay args
387 if common.count_not_none(self.com, self.span, self.alpha) > 0:
388 self._com = get_center_of_mass(self.com, self.span, None, self.alpha)
389 else:
390 self._com = 1.0
391 else:
392 if self.halflife is not None and isinstance(
393 self.halflife, (str, datetime.timedelta, np.timedelta64)
394 ):
395 raise ValueError(
396 "halflife can only be a timedelta convertible argument if "
397 "times is not None."
398 )
399 # Without times, points are equally spaced
400 self._deltas = np.ones(
401 max(self.obj.shape[self.axis] - 1, 0), dtype=np.float64
402 )
403 self._com = get_center_of_mass(
404 # error: Argument 3 to "get_center_of_mass" has incompatible type
405 # "Union[float, Any, None, timedelta64, signedinteger[_64Bit]]";
406 # expected "Optional[float]"
407 self.com,
408 self.span,
409 self.halflife, # type: ignore[arg-type]
410 self.alpha,
411 )
412
    def _check_window_bounds(
        self, start: np.ndarray, end: np.ndarray, num_vals: int
    ) -> None:
        """
        Accept any window bounds; no validation is performed here.

        Parameters
        ----------
        start : np.ndarray
            Window start positions (unused).
        end : np.ndarray
            Window end positions (unused).
        num_vals : int
            Number of values (unused).
        """
        # ewm algorithms are iterative with each point
        # ExponentialMovingWindowIndexer "bounds" are the entire window
        pass
419
    def _get_window_indexer(self) -> BaseIndexer:
        """
        Return an indexer class that will compute the window start and end bounds

        Returns
        -------
        BaseIndexer
            A newly created ``ExponentialMovingWindowIndexer``.
        """
        return ExponentialMovingWindowIndexer()
425
426 def online(
427 self, engine: str = "numba", engine_kwargs=None
428 ) -> OnlineExponentialMovingWindow:
429 """
430 Return an ``OnlineExponentialMovingWindow`` object to calculate
431 exponentially moving window aggregations in an online method.
432
433 .. versionadded:: 1.3.0
434
435 Parameters
436 ----------
437 engine: str, default ``'numba'``
438 Execution engine to calculate online aggregations.
439 Applies to all supported aggregation methods.
440
441 engine_kwargs : dict, default None
442 Applies to all supported aggregation methods.
443
444 * For ``'numba'`` engine, the engine can accept ``nopython``, ``nogil``
445 and ``parallel`` dictionary keys. The values must either be ``True`` or
446 ``False``. The default ``engine_kwargs`` for the ``'numba'`` engine is
447 ``{{'nopython': True, 'nogil': False, 'parallel': False}}`` and will be
448 applied to the function
449
450 Returns
451 -------
452 OnlineExponentialMovingWindow
453 """
454 return OnlineExponentialMovingWindow(
455 obj=self.obj,
456 com=self.com,
457 span=self.span,
458 halflife=self.halflife,
459 alpha=self.alpha,
460 min_periods=self.min_periods,
461 adjust=self.adjust,
462 ignore_na=self.ignore_na,
463 axis=self.axis,
464 times=self.times,
465 engine=engine,
466 engine_kwargs=engine_kwargs,
467 selection=self._selection,
468 )
469
    @doc(
        _shared_docs["aggregate"],
        see_also=dedent(
            """
        See Also
        --------
        pandas.DataFrame.rolling.aggregate
        """
        ),
        examples=dedent(
            """
        Examples
        --------
        >>> df = pd.DataFrame({"A": [1, 2, 3], "B": [4, 5, 6], "C": [7, 8, 9]})
        >>> df
           A  B  C
        0  1  4  7
        1  2  5  8
        2  3  6  9

        >>> df.ewm(alpha=0.5).mean()
                  A         B         C
        0  1.000000  4.000000  7.000000
        1  1.666667  4.666667  7.666667
        2  2.428571  5.428571  8.428571
        """
        ),
        klass="Series/Dataframe",
        axis="",
    )
    def aggregate(self, func, *args, **kwargs):
        # Defers entirely to the parent implementation. The decorator above
        # fills the shared "aggregate" template with ewm-specific text.
        return super().aggregate(func, *args, **kwargs)
502
503 agg = aggregate
504
505 @doc(
506 template_header,
507 create_section_header("Parameters"),
508 kwargs_numeric_only,
509 window_agg_numba_parameters(),
510 create_section_header("Returns"),
511 template_returns,
512 create_section_header("See Also"),
513 template_see_also,
514 create_section_header("Notes"),
515 numba_notes,
516 create_section_header("Examples"),
517 dedent(
518 """\
519 >>> ser = pd.Series([1, 2, 3, 4])
520 >>> ser.ewm(alpha=.2).mean()
521 0 1.000000
522 1 1.555556
523 2 2.147541
524 3 2.775068
525 dtype: float64
526 """
527 ),
528 window_method="ewm",
529 aggregation_description="(exponential weighted moment) mean",
530 agg_method="mean",
531 )
532 def mean(
533 self,
534 numeric_only: bool = False,
535 engine=None,
536 engine_kwargs=None,
537 ):
538 if maybe_use_numba(engine):
539 if self.method == "single":
540 func = generate_numba_ewm_func
541 else:
542 func = generate_numba_ewm_table_func
543 ewm_func = func(
544 **get_jit_arguments(engine_kwargs),
545 com=self._com,
546 adjust=self.adjust,
547 ignore_na=self.ignore_na,
548 deltas=tuple(self._deltas),
549 normalize=True,
550 )
551 return self._apply(ewm_func, name="mean")
552 elif engine in ("cython", None):
553 if engine_kwargs is not None:
554 raise ValueError("cython engine does not accept engine_kwargs")
555
556 deltas = None if self.times is None else self._deltas
557 window_func = partial(
558 window_aggregations.ewm,
559 com=self._com,
560 adjust=self.adjust,
561 ignore_na=self.ignore_na,
562 deltas=deltas,
563 normalize=True,
564 )
565 return self._apply(window_func, name="mean", numeric_only=numeric_only)
566 else:
567 raise ValueError("engine must be either 'numba' or 'cython'")
568
569 @doc(
570 template_header,
571 create_section_header("Parameters"),
572 kwargs_numeric_only,
573 window_agg_numba_parameters(),
574 create_section_header("Returns"),
575 template_returns,
576 create_section_header("See Also"),
577 template_see_also,
578 create_section_header("Notes"),
579 numba_notes,
580 create_section_header("Examples"),
581 dedent(
582 """\
583 >>> ser = pd.Series([1, 2, 3, 4])
584 >>> ser.ewm(alpha=.2).sum()
585 0 1.000
586 1 2.800
587 2 5.240
588 3 8.192
589 dtype: float64
590 """
591 ),
592 window_method="ewm",
593 aggregation_description="(exponential weighted moment) sum",
594 agg_method="sum",
595 )
596 def sum(
597 self,
598 numeric_only: bool = False,
599 engine=None,
600 engine_kwargs=None,
601 ):
602 if not self.adjust:
603 raise NotImplementedError("sum is not implemented with adjust=False")
604 if maybe_use_numba(engine):
605 if self.method == "single":
606 func = generate_numba_ewm_func
607 else:
608 func = generate_numba_ewm_table_func
609 ewm_func = func(
610 **get_jit_arguments(engine_kwargs),
611 com=self._com,
612 adjust=self.adjust,
613 ignore_na=self.ignore_na,
614 deltas=tuple(self._deltas),
615 normalize=False,
616 )
617 return self._apply(ewm_func, name="sum")
618 elif engine in ("cython", None):
619 if engine_kwargs is not None:
620 raise ValueError("cython engine does not accept engine_kwargs")
621
622 deltas = None if self.times is None else self._deltas
623 window_func = partial(
624 window_aggregations.ewm,
625 com=self._com,
626 adjust=self.adjust,
627 ignore_na=self.ignore_na,
628 deltas=deltas,
629 normalize=False,
630 )
631 return self._apply(window_func, name="sum", numeric_only=numeric_only)
632 else:
633 raise ValueError("engine must be either 'numba' or 'cython'")
634
635 @doc(
636 template_header,
637 create_section_header("Parameters"),
638 dedent(
639 """\
640 bias : bool, default False
641 Use a standard estimation bias correction.
642 """
643 ),
644 kwargs_numeric_only,
645 create_section_header("Returns"),
646 template_returns,
647 create_section_header("See Also"),
648 template_see_also,
649 create_section_header("Examples"),
650 dedent(
651 """\
652 >>> ser = pd.Series([1, 2, 3, 4])
653 >>> ser.ewm(alpha=.2).std()
654 0 NaN
655 1 0.707107
656 2 0.995893
657 3 1.277320
658 dtype: float64
659 """
660 ),
661 window_method="ewm",
662 aggregation_description="(exponential weighted moment) standard deviation",
663 agg_method="std",
664 )
665 def std(self, bias: bool = False, numeric_only: bool = False):
666 if (
667 numeric_only
668 and self._selected_obj.ndim == 1
669 and not is_numeric_dtype(self._selected_obj.dtype)
670 ):
671 # Raise directly so error message says std instead of var
672 raise NotImplementedError(
673 f"{type(self).__name__}.std does not implement numeric_only"
674 )
675 return zsqrt(self.var(bias=bias, numeric_only=numeric_only))
676
677 @doc(
678 template_header,
679 create_section_header("Parameters"),
680 dedent(
681 """\
682 bias : bool, default False
683 Use a standard estimation bias correction.
684 """
685 ),
686 kwargs_numeric_only,
687 create_section_header("Returns"),
688 template_returns,
689 create_section_header("See Also"),
690 template_see_also,
691 create_section_header("Examples"),
692 dedent(
693 """\
694 >>> ser = pd.Series([1, 2, 3, 4])
695 >>> ser.ewm(alpha=.2).var()
696 0 NaN
697 1 0.500000
698 2 0.991803
699 3 1.631547
700 dtype: float64
701 """
702 ),
703 window_method="ewm",
704 aggregation_description="(exponential weighted moment) variance",
705 agg_method="var",
706 )
707 def var(self, bias: bool = False, numeric_only: bool = False):
708 window_func = window_aggregations.ewmcov
709 wfunc = partial(
710 window_func,
711 com=self._com,
712 adjust=self.adjust,
713 ignore_na=self.ignore_na,
714 bias=bias,
715 )
716
717 def var_func(values, begin, end, min_periods):
718 return wfunc(values, begin, end, min_periods, values)
719
720 return self._apply(var_func, name="var", numeric_only=numeric_only)
721
722 @doc(
723 template_header,
724 create_section_header("Parameters"),
725 dedent(
726 """\
727 other : Series or DataFrame , optional
728 If not supplied then will default to self and produce pairwise
729 output.
730 pairwise : bool, default None
731 If False then only matching columns between self and other will be
732 used and the output will be a DataFrame.
733 If True then all pairwise combinations will be calculated and the
734 output will be a MultiIndex DataFrame in the case of DataFrame
735 inputs. In the case of missing elements, only complete pairwise
736 observations will be used.
737 bias : bool, default False
738 Use a standard estimation bias correction.
739 """
740 ),
741 kwargs_numeric_only,
742 create_section_header("Returns"),
743 template_returns,
744 create_section_header("See Also"),
745 template_see_also,
746 create_section_header("Examples"),
747 dedent(
748 """\
749 >>> ser1 = pd.Series([1, 2, 3, 4])
750 >>> ser2 = pd.Series([10, 11, 13, 16])
751 >>> ser1.ewm(alpha=.2).cov(ser2)
752 0 NaN
753 1 0.500000
754 2 1.524590
755 3 3.408836
756 dtype: float64
757 """
758 ),
759 window_method="ewm",
760 aggregation_description="(exponential weighted moment) sample covariance",
761 agg_method="cov",
762 )
763 def cov(
764 self,
765 other: DataFrame | Series | None = None,
766 pairwise: bool | None = None,
767 bias: bool = False,
768 numeric_only: bool = False,
769 ):
770 from pandas import Series
771
772 self._validate_numeric_only("cov", numeric_only)
773
774 def cov_func(x, y):
775 x_array = self._prep_values(x)
776 y_array = self._prep_values(y)
777 window_indexer = self._get_window_indexer()
778 min_periods = (
779 self.min_periods
780 if self.min_periods is not None
781 else window_indexer.window_size
782 )
783 start, end = window_indexer.get_window_bounds(
784 num_values=len(x_array),
785 min_periods=min_periods,
786 center=self.center,
787 closed=self.closed,
788 step=self.step,
789 )
790 result = window_aggregations.ewmcov(
791 x_array,
792 start,
793 end,
794 # error: Argument 4 to "ewmcov" has incompatible type
795 # "Optional[int]"; expected "int"
796 self.min_periods, # type: ignore[arg-type]
797 y_array,
798 self._com,
799 self.adjust,
800 self.ignore_na,
801 bias,
802 )
803 return Series(result, index=x.index, name=x.name, copy=False)
804
805 return self._apply_pairwise(
806 self._selected_obj, other, pairwise, cov_func, numeric_only
807 )
808
809 @doc(
810 template_header,
811 create_section_header("Parameters"),
812 dedent(
813 """\
814 other : Series or DataFrame, optional
815 If not supplied then will default to self and produce pairwise
816 output.
817 pairwise : bool, default None
818 If False then only matching columns between self and other will be
819 used and the output will be a DataFrame.
820 If True then all pairwise combinations will be calculated and the
821 output will be a MultiIndex DataFrame in the case of DataFrame
822 inputs. In the case of missing elements, only complete pairwise
823 observations will be used.
824 """
825 ),
826 kwargs_numeric_only,
827 create_section_header("Returns"),
828 template_returns,
829 create_section_header("See Also"),
830 template_see_also,
831 create_section_header("Examples"),
832 dedent(
833 """\
834 >>> ser1 = pd.Series([1, 2, 3, 4])
835 >>> ser2 = pd.Series([10, 11, 13, 16])
836 >>> ser1.ewm(alpha=.2).corr(ser2)
837 0 NaN
838 1 1.000000
839 2 0.982821
840 3 0.977802
841 dtype: float64
842 """
843 ),
844 window_method="ewm",
845 aggregation_description="(exponential weighted moment) sample correlation",
846 agg_method="corr",
847 )
848 def corr(
849 self,
850 other: DataFrame | Series | None = None,
851 pairwise: bool | None = None,
852 numeric_only: bool = False,
853 ):
854 from pandas import Series
855
856 self._validate_numeric_only("corr", numeric_only)
857
858 def cov_func(x, y):
859 x_array = self._prep_values(x)
860 y_array = self._prep_values(y)
861 window_indexer = self._get_window_indexer()
862 min_periods = (
863 self.min_periods
864 if self.min_periods is not None
865 else window_indexer.window_size
866 )
867 start, end = window_indexer.get_window_bounds(
868 num_values=len(x_array),
869 min_periods=min_periods,
870 center=self.center,
871 closed=self.closed,
872 step=self.step,
873 )
874
875 def _cov(X, Y):
876 return window_aggregations.ewmcov(
877 X,
878 start,
879 end,
880 min_periods,
881 Y,
882 self._com,
883 self.adjust,
884 self.ignore_na,
885 True,
886 )
887
888 with np.errstate(all="ignore"):
889 cov = _cov(x_array, y_array)
890 x_var = _cov(x_array, x_array)
891 y_var = _cov(y_array, y_array)
892 result = cov / zsqrt(x_var * y_var)
893 return Series(result, index=x.index, name=x.name, copy=False)
894
895 return self._apply_pairwise(
896 self._selected_obj, other, pairwise, cov_func, numeric_only
897 )
898
899
class ExponentialMovingWindowGroupby(BaseWindowGroupby, ExponentialMovingWindow):
    """
    Provide an exponential moving window groupby implementation.
    """

    _attributes = ExponentialMovingWindow._attributes + BaseWindowGroupby._attributes

    def __init__(self, obj, *args, _grouper=None, **kwargs) -> None:
        """
        Initialise the window, then realign the time deltas with the groups.

        When ``times`` was supplied and ``obj`` is not empty, ``_deltas`` is
        recomputed from the times taken in group order.
        """
        super().__init__(obj, *args, _grouper=_grouper, **kwargs)

        if obj.empty or self.times is None:
            return

        # sort the times and recalculate the deltas according to the groups
        positions = np.concatenate(list(self._grouper.indices.values()))
        reordered_times = self.times.take(positions)
        self._deltas = _calculate_deltas(reordered_times, self.halflife)

    def _get_window_indexer(self) -> GroupbyIndexer:
        """
        Return an indexer class that will compute the window start and end bounds

        Returns
        -------
        GroupbyIndexer
            Built from the grouper's indices, with
            ``ExponentialMovingWindowIndexer`` as the per-group indexer class.
        """
        return GroupbyIndexer(
            groupby_indices=self._grouper.indices,
            window_indexer=ExponentialMovingWindowIndexer,
        )
931
932
class OnlineExponentialMovingWindow(ExponentialMovingWindow):
    """
    Exponentially weighted window whose mean can be continued with new data.

    The running state is kept in an ``EWMMeanState`` so that later ``mean``
    calls can pass ``update`` and carry on from the last values and weights.
    Only ``mean`` is supported; the other aggregations raise
    ``NotImplementedError``.
    """

    def __init__(
        self,
        obj: NDFrame,
        com: float | None = None,
        span: float | None = None,
        halflife: float | TimedeltaConvertibleTypes | None = None,
        alpha: float | None = None,
        min_periods: int | None = 0,
        adjust: bool = True,
        ignore_na: bool = False,
        axis: Axis = 0,
        times: np.ndarray | NDFrame | None = None,
        engine: str = "numba",
        engine_kwargs: dict[str, bool] | None = None,
        *,
        selection=None,
    ) -> None:
        """
        Validate the online-specific arguments and create the running state.

        Raises
        ------
        NotImplementedError
            If ``times`` is given.
        ValueError
            If ``engine`` does not select numba.
        """
        if times is not None:
            raise NotImplementedError(
                "times is not implemented with online operations."
            )
        super().__init__(
            obj=obj,
            com=com,
            span=span,
            halflife=halflife,
            alpha=alpha,
            min_periods=min_periods,
            adjust=adjust,
            ignore_na=ignore_na,
            axis=axis,
            times=times,
            selection=selection,
        )
        self._mean = EWMMeanState(
            self._com, self.adjust, self.ignore_na, self.axis, obj.shape
        )
        if not maybe_use_numba(engine):
            raise ValueError("'numba' is the only supported engine")
        self.engine = engine
        self.engine_kwargs = engine_kwargs

    def reset(self) -> None:
        """
        Reset the state captured by `update` calls.
        """
        self._mean.reset()

    def aggregate(self, func, *args, **kwargs):
        """Not supported for online windows; always raises."""
        raise NotImplementedError("aggregate is not implemented.")

    def std(self, bias: bool = False, *args, **kwargs):
        """Not supported for online windows; always raises."""
        raise NotImplementedError("std is not implemented.")

    def corr(
        self,
        other: DataFrame | Series | None = None,
        pairwise: bool | None = None,
        numeric_only: bool = False,
    ):
        """Not supported for online windows; always raises."""
        raise NotImplementedError("corr is not implemented.")

    def cov(
        self,
        other: DataFrame | Series | None = None,
        pairwise: bool | None = None,
        bias: bool = False,
        numeric_only: bool = False,
    ):
        """Not supported for online windows; always raises."""
        raise NotImplementedError("cov is not implemented.")

    def var(self, bias: bool = False, numeric_only: bool = False):
        """Not supported for online windows; always raises."""
        raise NotImplementedError("var is not implemented.")

    def mean(self, *args, update=None, update_times=None, **kwargs):
        """
        Calculate an online exponentially weighted mean.

        Parameters
        ----------
        update : DataFrame or Series, default None
            New values to continue calculating the
            exponentially weighted mean from the last values and weights.
            Values should be float64 dtype.

            ``update`` needs to be ``None`` the first time the
            exponentially weighted mean is calculated.

        update_times : Series or 1-D np.ndarray, default None
            New times to continue calculating the
            exponentially weighted mean from the last values and weights.
            If ``None``, values are assumed to be evenly spaced
            in time.
            This feature is currently unsupported.

        Returns
        -------
        DataFrame or Series

        Raises
        ------
        NotImplementedError
            If ``update_times`` is given.
        ValueError
            If ``update`` is given before an initial call without it.

        Examples
        --------
        >>> df = pd.DataFrame({"a": range(5), "b": range(5, 10)})
        >>> online_ewm = df.head(2).ewm(0.5).online()
        >>> online_ewm.mean()
              a     b
        0  0.00  5.00
        1  0.75  5.75
        >>> online_ewm.mean(update=df.tail(3))
                  a         b
        2  1.615385  6.615385
        3  2.550000  7.550000
        4  3.520661  8.520661
        >>> online_ewm.reset()
        >>> online_ewm.mean()
              a     b
        0  0.00  5.00
        1  0.75  5.75
        """
        is_frame = self._selected_obj.ndim == 2
        if update_times is not None:
            raise NotImplementedError("update_times is not implemented.")

        # NOTE(review): the length comes from shape[self.axis - 1], i.e. the
        # axis other than ``self.axis`` for 2-D input -- confirm this is the
        # layout the online kernel expects.
        n_deltas = max(self._selected_obj.shape[self.axis - 1] - 1, 0)
        update_deltas = np.ones(n_deltas, dtype=np.float64)

        if update is None:
            # First pass: run over the stored object and keep every row.
            source = self._selected_obj
            drop_leading = 0
            labels = {"index": source.index}
            if is_frame:
                labels["columns"] = source.columns
            else:
                labels["name"] = source.name
            values = source.astype(np.float64, copy=False).to_numpy()
        else:
            if self._mean.last_ewm is None:
                raise ValueError(
                    "Must call mean with update=None first before passing update"
                )
            # Continuation: the previous result is prepended as a seed row
            # and sliced off again after the kernel has run.
            drop_leading = 1
            labels = {"index": update.index}
            if is_frame:
                seed = self._mean.last_ewm[np.newaxis, :]
                labels["columns"] = update.columns
            else:
                seed = self._mean.last_ewm
                labels["name"] = update.name
            values = np.concatenate((seed, update.to_numpy()))

        ewma_func = generate_online_numba_ewma_func(
            **get_jit_arguments(self.engine_kwargs)
        )
        # The state object is always handed 2-D input; 1-D data becomes a
        # single column.
        kernel_input = values if is_frame else values[:, np.newaxis]
        result = self._mean.run_ewm(
            kernel_input,
            update_deltas,
            self.min_periods,
            ewma_func,
        )
        if not is_frame:
            result = result.squeeze()
        trimmed = result[drop_leading:]
        return self._selected_obj._constructor(trimmed, **labels)