1from __future__ import annotations
2
3import datetime
4from functools import partial
5from textwrap import dedent
6from typing import TYPE_CHECKING
7
8import numpy as np
9
10from pandas._libs.tslibs import Timedelta
11import pandas._libs.window.aggregations as window_aggregations
12from pandas._typing import (
13 Axis,
14 TimedeltaConvertibleTypes,
15)
16
17if TYPE_CHECKING:
18 from pandas import DataFrame, Series
19 from pandas.core.generic import NDFrame
20
21from pandas.util._decorators import doc
22
23from pandas.core.dtypes.common import (
24 is_datetime64_ns_dtype,
25 is_numeric_dtype,
26)
27from pandas.core.dtypes.missing import isna
28
29from pandas.core import common
30from pandas.core.indexers.objects import (
31 BaseIndexer,
32 ExponentialMovingWindowIndexer,
33 GroupbyIndexer,
34)
35from pandas.core.util.numba_ import (
36 get_jit_arguments,
37 maybe_use_numba,
38)
39from pandas.core.window.common import zsqrt
40from pandas.core.window.doc import (
41 _shared_docs,
42 create_section_header,
43 kwargs_numeric_only,
44 numba_notes,
45 template_header,
46 template_returns,
47 template_see_also,
48 window_agg_numba_parameters,
49)
50from pandas.core.window.numba_ import (
51 generate_numba_ewm_func,
52 generate_numba_ewm_table_func,
53)
54from pandas.core.window.online import (
55 EWMMeanState,
56 generate_online_numba_ewma_func,
57)
58from pandas.core.window.rolling import (
59 BaseWindow,
60 BaseWindowGroupby,
61)
62
63
def get_center_of_mass(
    comass: float | None,
    span: float | None,
    halflife: float | None,
    alpha: float | None,
) -> float:
    """
    Convert a single decay specification into a center of mass.

    Parameters
    ----------
    comass : float, optional
        Center of mass; returned unchanged (as ``float``) after validation.
    span : float, optional
        Span, converted as ``(span - 1) / 2``.
    halflife : float, optional
        Half-life, converted through ``decay = 1 - exp(log(0.5) / halflife)``
        and ``1 / decay - 1``.
    alpha : float, optional
        Smoothing factor, converted as ``(1 - alpha) / alpha``.

    Returns
    -------
    float
        The equivalent center of mass.

    Raises
    ------
    ValueError
        If more than one argument is given, if none is given, or if the
        given value fails its domain check.
    """
    if common.count_not_none(comass, span, halflife, alpha) > 1:
        raise ValueError("comass, span, halflife, and alpha are mutually exclusive")

    # Each branch validates its own domain; together the checks keep the
    # implied smoothing factor within 0 < alpha <= 1.
    if comass is not None:
        if comass < 0:
            raise ValueError("comass must satisfy: comass >= 0")
        return float(comass)

    if span is not None:
        if span < 1:
            raise ValueError("span must satisfy: span >= 1")
        return float((span - 1) / 2)

    if halflife is not None:
        if halflife <= 0:
            raise ValueError("halflife must satisfy: halflife > 0")
        decay = 1 - np.exp(np.log(0.5) / halflife)
        return float(1 / decay - 1)

    if alpha is not None:
        if alpha <= 0 or alpha > 1:
            raise ValueError("alpha must satisfy: 0 < alpha <= 1")
        return float((1 - alpha) / alpha)

    raise ValueError("Must pass one of comass, span, halflife, or alpha")
95
96
97def _calculate_deltas(
98 times: np.ndarray | NDFrame,
99 halflife: float | TimedeltaConvertibleTypes | None,
100) -> np.ndarray:
101 """
102 Return the diff of the times divided by the half-life. These values are used in
103 the calculation of the ewm mean.
104
105 Parameters
106 ----------
107 times : np.ndarray, Series
108 Times corresponding to the observations. Must be monotonically increasing
109 and ``datetime64[ns]`` dtype.
110 halflife : float, str, timedelta, optional
111 Half-life specifying the decay
112
113 Returns
114 -------
115 np.ndarray
116 Diff of the times divided by the half-life
117 """
118 _times = np.asarray(times.view(np.int64), dtype=np.float64)
119 # TODO: generalize to non-nano?
120 _halflife = float(Timedelta(halflife).as_unit("ns")._value)
121 return np.diff(_times) / _halflife
122
123
124class ExponentialMovingWindow(BaseWindow):
125 r"""
126 Provide exponentially weighted (EW) calculations.
127
128 Exactly one of ``com``, ``span``, ``halflife``, or ``alpha`` must be
129 provided if ``times`` is not provided. If ``times`` is provided,
130 ``halflife`` and one of ``com``, ``span`` or ``alpha`` may be provided.
131
132 Parameters
133 ----------
134 com : float, optional
135 Specify decay in terms of center of mass
136
137 :math:`\alpha = 1 / (1 + com)`, for :math:`com \geq 0`.
138
139 span : float, optional
140 Specify decay in terms of span
141
142 :math:`\alpha = 2 / (span + 1)`, for :math:`span \geq 1`.
143
144 halflife : float, str, timedelta, optional
145 Specify decay in terms of half-life
146
147 :math:`\alpha = 1 - \exp\left(-\ln(2) / halflife\right)`, for
148 :math:`halflife > 0`.
149
150 If ``times`` is specified, a timedelta convertible unit over which an
151 observation decays to half its value. Only applicable to ``mean()``,
152 and halflife value will not apply to the other functions.
153
154 .. versionadded:: 1.1.0
155
156 alpha : float, optional
157 Specify smoothing factor :math:`\alpha` directly
158
159 :math:`0 < \alpha \leq 1`.
160
161 min_periods : int, default 0
162 Minimum number of observations in window required to have a value;
163 otherwise, result is ``np.nan``.
164
165 adjust : bool, default True
166 Divide by decaying adjustment factor in beginning periods to account
167 for imbalance in relative weightings (viewing EWMA as a moving average).
168
169 - When ``adjust=True`` (default), the EW function is calculated using weights
170 :math:`w_i = (1 - \alpha)^i`. For example, the EW moving average of the series
171 [:math:`x_0, x_1, ..., x_t`] would be:
172
173 .. math::
174 y_t = \frac{x_t + (1 - \alpha)x_{t-1} + (1 - \alpha)^2 x_{t-2} + ... + (1 -
175 \alpha)^t x_0}{1 + (1 - \alpha) + (1 - \alpha)^2 + ... + (1 - \alpha)^t}
176
177 - When ``adjust=False``, the exponentially weighted function is calculated
178 recursively:
179
180 .. math::
181 \begin{split}
182 y_0 &= x_0\\
183 y_t &= (1 - \alpha) y_{t-1} + \alpha x_t,
184 \end{split}
185 ignore_na : bool, default False
186 Ignore missing values when calculating weights.
187
188 - When ``ignore_na=False`` (default), weights are based on absolute positions.
189 For example, the weights of :math:`x_0` and :math:`x_2` used in calculating
190 the final weighted average of [:math:`x_0`, None, :math:`x_2`] are
191 :math:`(1-\alpha)^2` and :math:`1` if ``adjust=True``, and
192 :math:`(1-\alpha)^2` and :math:`\alpha` if ``adjust=False``.
193
194 - When ``ignore_na=True``, weights are based
195 on relative positions. For example, the weights of :math:`x_0` and :math:`x_2`
196 used in calculating the final weighted average of
197 [:math:`x_0`, None, :math:`x_2`] are :math:`1-\alpha` and :math:`1` if
198 ``adjust=True``, and :math:`1-\alpha` and :math:`\alpha` if ``adjust=False``.
199
200 axis : {0, 1}, default 0
201 If ``0`` or ``'index'``, calculate across the rows.
202
203 If ``1`` or ``'columns'``, calculate across the columns.
204
205 For `Series` this parameter is unused and defaults to 0.
206
207 times : np.ndarray, Series, default None
208
209 .. versionadded:: 1.1.0
210
211 Only applicable to ``mean()``.
212
213 Times corresponding to the observations. Must be monotonically increasing and
214 ``datetime64[ns]`` dtype.
215
216 If 1-D array like, a sequence with the same shape as the observations.
217
218 method : str {'single', 'table'}, default 'single'
219 .. versionadded:: 1.4.0
220
221 Execute the rolling operation per single column or row (``'single'``)
222 or over the entire object (``'table'``).
223
224 This argument is only implemented when specifying ``engine='numba'``
225 in the method call.
226
227 Only applicable to ``mean()``
228
229 Returns
230 -------
231 ``ExponentialMovingWindow`` subclass
232
233 See Also
234 --------
235 rolling : Provides rolling window calculations.
236 expanding : Provides expanding transformations.
237
238 Notes
239 -----
240 See :ref:`Windowing Operations <window.exponentially_weighted>`
241 for further usage details and examples.
242
243 Examples
244 --------
245 >>> df = pd.DataFrame({'B': [0, 1, 2, np.nan, 4]})
246 >>> df
247 B
248 0 0.0
249 1 1.0
250 2 2.0
251 3 NaN
252 4 4.0
253
254 >>> df.ewm(com=0.5).mean()
255 B
256 0 0.000000
257 1 0.750000
258 2 1.615385
259 3 1.615385
260 4 3.670213
261 >>> df.ewm(alpha=2 / 3).mean()
262 B
263 0 0.000000
264 1 0.750000
265 2 1.615385
266 3 1.615385
267 4 3.670213
268
269 **adjust**
270
271 >>> df.ewm(com=0.5, adjust=True).mean()
272 B
273 0 0.000000
274 1 0.750000
275 2 1.615385
276 3 1.615385
277 4 3.670213
278 >>> df.ewm(com=0.5, adjust=False).mean()
279 B
280 0 0.000000
281 1 0.666667
282 2 1.555556
283 3 1.555556
284 4 3.650794
285
286 **ignore_na**
287
288 >>> df.ewm(com=0.5, ignore_na=True).mean()
289 B
290 0 0.000000
291 1 0.750000
292 2 1.615385
293 3 1.615385
294 4 3.225000
295 >>> df.ewm(com=0.5, ignore_na=False).mean()
296 B
297 0 0.000000
298 1 0.750000
299 2 1.615385
300 3 1.615385
301 4 3.670213
302
303 **times**
304
305 Exponentially weighted mean with weights calculated with a timedelta ``halflife``
306 relative to ``times``.
307
308 >>> times = ['2020-01-01', '2020-01-03', '2020-01-10', '2020-01-15', '2020-01-17']
309 >>> df.ewm(halflife='4 days', times=pd.DatetimeIndex(times)).mean()
310 B
311 0 0.000000
312 1 0.585786
313 2 1.523889
314 3 1.523889
315 4 3.233686
316 """
317
318 _attributes = [
319 "com",
320 "span",
321 "halflife",
322 "alpha",
323 "min_periods",
324 "adjust",
325 "ignore_na",
326 "axis",
327 "times",
328 "method",
329 ]
330
331 def __init__(
332 self,
333 obj: NDFrame,
334 com: float | None = None,
335 span: float | None = None,
336 halflife: float | TimedeltaConvertibleTypes | None = None,
337 alpha: float | None = None,
338 min_periods: int | None = 0,
339 adjust: bool = True,
340 ignore_na: bool = False,
341 axis: Axis = 0,
342 times: np.ndarray | NDFrame | None = None,
343 method: str = "single",
344 *,
345 selection=None,
346 ) -> None:
347 super().__init__(
348 obj=obj,
349 min_periods=1 if min_periods is None else max(int(min_periods), 1),
350 on=None,
351 center=False,
352 closed=None,
353 method=method,
354 axis=axis,
355 selection=selection,
356 )
357 self.com = com
358 self.span = span
359 self.halflife = halflife
360 self.alpha = alpha
361 self.adjust = adjust
362 self.ignore_na = ignore_na
363 self.times = times
364 if self.times is not None:
365 if not self.adjust:
366 raise NotImplementedError("times is not supported with adjust=False.")
367 if not is_datetime64_ns_dtype(self.times):
368 raise ValueError("times must be datetime64[ns] dtype.")
369 if len(self.times) != len(obj):
370 raise ValueError("times must be the same length as the object.")
371 if not isinstance(self.halflife, (str, datetime.timedelta, np.timedelta64)):
372 raise ValueError("halflife must be a timedelta convertible object")
373 if isna(self.times).any():
374 raise ValueError("Cannot convert NaT values to integer")
375 self._deltas = _calculate_deltas(self.times, self.halflife)
376 # Halflife is no longer applicable when calculating COM
377 # But allow COM to still be calculated if the user passes other decay args
378 if common.count_not_none(self.com, self.span, self.alpha) > 0:
379 self._com = get_center_of_mass(self.com, self.span, None, self.alpha)
380 else:
381 self._com = 1.0
382 else:
383 if self.halflife is not None and isinstance(
384 self.halflife, (str, datetime.timedelta, np.timedelta64)
385 ):
386 raise ValueError(
387 "halflife can only be a timedelta convertible argument if "
388 "times is not None."
389 )
390 # Without times, points are equally spaced
391 self._deltas = np.ones(
392 max(self.obj.shape[self.axis] - 1, 0), dtype=np.float64
393 )
394 self._com = get_center_of_mass(
395 # error: Argument 3 to "get_center_of_mass" has incompatible type
396 # "Union[float, Any, None, timedelta64, signedinteger[_64Bit]]";
397 # expected "Optional[float]"
398 self.com,
399 self.span,
400 self.halflife, # type: ignore[arg-type]
401 self.alpha,
402 )
403
404 def _check_window_bounds(
405 self, start: np.ndarray, end: np.ndarray, num_vals: int
406 ) -> None:
407 # emw algorithms are iterative with each point
408 # ExponentialMovingWindowIndexer "bounds" are the entire window
409 pass
410
411 def _get_window_indexer(self) -> BaseIndexer:
412 """
413 Return an indexer class that will compute the window start and end bounds
414 """
415 return ExponentialMovingWindowIndexer()
416
417 def online(
418 self, engine: str = "numba", engine_kwargs=None
419 ) -> OnlineExponentialMovingWindow:
420 """
421 Return an ``OnlineExponentialMovingWindow`` object to calculate
422 exponentially moving window aggregations in an online method.
423
424 .. versionadded:: 1.3.0
425
426 Parameters
427 ----------
428 engine: str, default ``'numba'``
429 Execution engine to calculate online aggregations.
430 Applies to all supported aggregation methods.
431
432 engine_kwargs : dict, default None
433 Applies to all supported aggregation methods.
434
435 * For ``'numba'`` engine, the engine can accept ``nopython``, ``nogil``
436 and ``parallel`` dictionary keys. The values must either be ``True`` or
437 ``False``. The default ``engine_kwargs`` for the ``'numba'`` engine is
438 ``{{'nopython': True, 'nogil': False, 'parallel': False}}`` and will be
439 applied to the function
440
441 Returns
442 -------
443 OnlineExponentialMovingWindow
444 """
445 return OnlineExponentialMovingWindow(
446 obj=self.obj,
447 com=self.com,
448 span=self.span,
449 halflife=self.halflife,
450 alpha=self.alpha,
451 min_periods=self.min_periods,
452 adjust=self.adjust,
453 ignore_na=self.ignore_na,
454 axis=self.axis,
455 times=self.times,
456 engine=engine,
457 engine_kwargs=engine_kwargs,
458 selection=self._selection,
459 )
460
    # The documentation of ``aggregate`` is produced by ``@doc`` from the
    # shared "aggregate" template plus the EWM-specific sections given here,
    # so the function body deliberately carries no docstring of its own.
    @doc(
        _shared_docs["aggregate"],
        see_also=dedent(
            """
        See Also
        --------
        pandas.DataFrame.rolling.aggregate
        """
        ),
        examples=dedent(
            """
        Examples
        --------
        >>> df = pd.DataFrame({"A": [1, 2, 3], "B": [4, 5, 6], "C": [7, 8, 9]})
        >>> df
           A  B  C
        0  1  4  7
        1  2  5  8
        2  3  6  9

        >>> df.ewm(alpha=0.5).mean()
                  A         B         C
        0  1.000000  4.000000  7.000000
        1  1.666667  4.666667  7.666667
        2  2.428571  5.428571  8.428571
        """
        ),
        klass="Series/Dataframe",
        axis="",
    )
    def aggregate(self, func, *args, **kwargs):
        # Pure delegation: all arguments are passed through to the parent
        # implementation unchanged.
        return super().aggregate(func, *args, **kwargs)

    # ``agg`` is an alias bound to the same function object as ``aggregate``.
    agg = aggregate
495
496 @doc(
497 template_header,
498 create_section_header("Parameters"),
499 kwargs_numeric_only,
500 window_agg_numba_parameters(),
501 create_section_header("Returns"),
502 template_returns,
503 create_section_header("See Also"),
504 template_see_also,
505 create_section_header("Notes"),
506 numba_notes.replace("\n", "", 1),
507 window_method="ewm",
508 aggregation_description="(exponential weighted moment) mean",
509 agg_method="mean",
510 )
511 def mean(
512 self,
513 numeric_only: bool = False,
514 engine=None,
515 engine_kwargs=None,
516 ):
517 if maybe_use_numba(engine):
518 if self.method == "single":
519 func = generate_numba_ewm_func
520 else:
521 func = generate_numba_ewm_table_func
522 ewm_func = func(
523 **get_jit_arguments(engine_kwargs),
524 com=self._com,
525 adjust=self.adjust,
526 ignore_na=self.ignore_na,
527 deltas=tuple(self._deltas),
528 normalize=True,
529 )
530 return self._apply(ewm_func, name="mean")
531 elif engine in ("cython", None):
532 if engine_kwargs is not None:
533 raise ValueError("cython engine does not accept engine_kwargs")
534
535 deltas = None if self.times is None else self._deltas
536 window_func = partial(
537 window_aggregations.ewm,
538 com=self._com,
539 adjust=self.adjust,
540 ignore_na=self.ignore_na,
541 deltas=deltas,
542 normalize=True,
543 )
544 return self._apply(window_func, name="mean", numeric_only=numeric_only)
545 else:
546 raise ValueError("engine must be either 'numba' or 'cython'")
547
548 @doc(
549 template_header,
550 create_section_header("Parameters"),
551 kwargs_numeric_only,
552 window_agg_numba_parameters(),
553 create_section_header("Returns"),
554 template_returns,
555 create_section_header("See Also"),
556 template_see_also,
557 create_section_header("Notes"),
558 numba_notes.replace("\n", "", 1),
559 window_method="ewm",
560 aggregation_description="(exponential weighted moment) sum",
561 agg_method="sum",
562 )
563 def sum(
564 self,
565 numeric_only: bool = False,
566 engine=None,
567 engine_kwargs=None,
568 ):
569 if not self.adjust:
570 raise NotImplementedError("sum is not implemented with adjust=False")
571 if maybe_use_numba(engine):
572 if self.method == "single":
573 func = generate_numba_ewm_func
574 else:
575 func = generate_numba_ewm_table_func
576 ewm_func = func(
577 **get_jit_arguments(engine_kwargs),
578 com=self._com,
579 adjust=self.adjust,
580 ignore_na=self.ignore_na,
581 deltas=tuple(self._deltas),
582 normalize=False,
583 )
584 return self._apply(ewm_func, name="sum")
585 elif engine in ("cython", None):
586 if engine_kwargs is not None:
587 raise ValueError("cython engine does not accept engine_kwargs")
588
589 deltas = None if self.times is None else self._deltas
590 window_func = partial(
591 window_aggregations.ewm,
592 com=self._com,
593 adjust=self.adjust,
594 ignore_na=self.ignore_na,
595 deltas=deltas,
596 normalize=False,
597 )
598 return self._apply(window_func, name="sum", numeric_only=numeric_only)
599 else:
600 raise ValueError("engine must be either 'numba' or 'cython'")
601
602 @doc(
603 template_header,
604 create_section_header("Parameters"),
605 dedent(
606 """
607 bias : bool, default False
608 Use a standard estimation bias correction.
609 """
610 ).replace("\n", "", 1),
611 kwargs_numeric_only,
612 create_section_header("Returns"),
613 template_returns,
614 create_section_header("See Also"),
615 template_see_also[:-1],
616 window_method="ewm",
617 aggregation_description="(exponential weighted moment) standard deviation",
618 agg_method="std",
619 )
620 def std(self, bias: bool = False, numeric_only: bool = False):
621 if (
622 numeric_only
623 and self._selected_obj.ndim == 1
624 and not is_numeric_dtype(self._selected_obj.dtype)
625 ):
626 # Raise directly so error message says std instead of var
627 raise NotImplementedError(
628 f"{type(self).__name__}.std does not implement numeric_only"
629 )
630 return zsqrt(self.var(bias=bias, numeric_only=numeric_only))
631
632 @doc(
633 template_header,
634 create_section_header("Parameters"),
635 dedent(
636 """
637 bias : bool, default False
638 Use a standard estimation bias correction.
639 """
640 ).replace("\n", "", 1),
641 kwargs_numeric_only,
642 create_section_header("Returns"),
643 template_returns,
644 create_section_header("See Also"),
645 template_see_also[:-1],
646 window_method="ewm",
647 aggregation_description="(exponential weighted moment) variance",
648 agg_method="var",
649 )
650 def var(self, bias: bool = False, numeric_only: bool = False):
651 window_func = window_aggregations.ewmcov
652 wfunc = partial(
653 window_func,
654 com=self._com,
655 adjust=self.adjust,
656 ignore_na=self.ignore_na,
657 bias=bias,
658 )
659
660 def var_func(values, begin, end, min_periods):
661 return wfunc(values, begin, end, min_periods, values)
662
663 return self._apply(var_func, name="var", numeric_only=numeric_only)
664
665 @doc(
666 template_header,
667 create_section_header("Parameters"),
668 dedent(
669 """
670 other : Series or DataFrame , optional
671 If not supplied then will default to self and produce pairwise
672 output.
673 pairwise : bool, default None
674 If False then only matching columns between self and other will be
675 used and the output will be a DataFrame.
676 If True then all pairwise combinations will be calculated and the
677 output will be a MultiIndex DataFrame in the case of DataFrame
678 inputs. In the case of missing elements, only complete pairwise
679 observations will be used.
680 bias : bool, default False
681 Use a standard estimation bias correction.
682 """
683 ).replace("\n", "", 1),
684 kwargs_numeric_only,
685 create_section_header("Returns"),
686 template_returns,
687 create_section_header("See Also"),
688 template_see_also[:-1],
689 window_method="ewm",
690 aggregation_description="(exponential weighted moment) sample covariance",
691 agg_method="cov",
692 )
693 def cov(
694 self,
695 other: DataFrame | Series | None = None,
696 pairwise: bool | None = None,
697 bias: bool = False,
698 numeric_only: bool = False,
699 ):
700 from pandas import Series
701
702 self._validate_numeric_only("cov", numeric_only)
703
704 def cov_func(x, y):
705 x_array = self._prep_values(x)
706 y_array = self._prep_values(y)
707 window_indexer = self._get_window_indexer()
708 min_periods = (
709 self.min_periods
710 if self.min_periods is not None
711 else window_indexer.window_size
712 )
713 start, end = window_indexer.get_window_bounds(
714 num_values=len(x_array),
715 min_periods=min_periods,
716 center=self.center,
717 closed=self.closed,
718 step=self.step,
719 )
720 result = window_aggregations.ewmcov(
721 x_array,
722 start,
723 end,
724 # error: Argument 4 to "ewmcov" has incompatible type
725 # "Optional[int]"; expected "int"
726 self.min_periods, # type: ignore[arg-type]
727 y_array,
728 self._com,
729 self.adjust,
730 self.ignore_na,
731 bias,
732 )
733 return Series(result, index=x.index, name=x.name, copy=False)
734
735 return self._apply_pairwise(
736 self._selected_obj, other, pairwise, cov_func, numeric_only
737 )
738
739 @doc(
740 template_header,
741 create_section_header("Parameters"),
742 dedent(
743 """
744 other : Series or DataFrame, optional
745 If not supplied then will default to self and produce pairwise
746 output.
747 pairwise : bool, default None
748 If False then only matching columns between self and other will be
749 used and the output will be a DataFrame.
750 If True then all pairwise combinations will be calculated and the
751 output will be a MultiIndex DataFrame in the case of DataFrame
752 inputs. In the case of missing elements, only complete pairwise
753 observations will be used.
754 """
755 ).replace("\n", "", 1),
756 kwargs_numeric_only,
757 create_section_header("Returns"),
758 template_returns,
759 create_section_header("See Also"),
760 template_see_also[:-1],
761 window_method="ewm",
762 aggregation_description="(exponential weighted moment) sample correlation",
763 agg_method="corr",
764 )
765 def corr(
766 self,
767 other: DataFrame | Series | None = None,
768 pairwise: bool | None = None,
769 numeric_only: bool = False,
770 ):
771 from pandas import Series
772
773 self._validate_numeric_only("corr", numeric_only)
774
775 def cov_func(x, y):
776 x_array = self._prep_values(x)
777 y_array = self._prep_values(y)
778 window_indexer = self._get_window_indexer()
779 min_periods = (
780 self.min_periods
781 if self.min_periods is not None
782 else window_indexer.window_size
783 )
784 start, end = window_indexer.get_window_bounds(
785 num_values=len(x_array),
786 min_periods=min_periods,
787 center=self.center,
788 closed=self.closed,
789 step=self.step,
790 )
791
792 def _cov(X, Y):
793 return window_aggregations.ewmcov(
794 X,
795 start,
796 end,
797 min_periods,
798 Y,
799 self._com,
800 self.adjust,
801 self.ignore_na,
802 True,
803 )
804
805 with np.errstate(all="ignore"):
806 cov = _cov(x_array, y_array)
807 x_var = _cov(x_array, x_array)
808 y_var = _cov(y_array, y_array)
809 result = cov / zsqrt(x_var * y_var)
810 return Series(result, index=x.index, name=x.name, copy=False)
811
812 return self._apply_pairwise(
813 self._selected_obj, other, pairwise, cov_func, numeric_only
814 )
815
816
class ExponentialMovingWindowGroupby(BaseWindowGroupby, ExponentialMovingWindow):
    """
    Provide an exponential moving window groupby implementation.
    """

    _attributes = ExponentialMovingWindow._attributes + BaseWindowGroupby._attributes

    def __init__(self, obj, *args, _grouper=None, **kwargs) -> None:
        super().__init__(obj, *args, _grouper=_grouper, **kwargs)

        # Nothing to recompute for an empty object or when no times are given
        if obj.empty or self.times is None:
            return

        # sort the times and recalculate the deltas according to the groups
        positions_per_group = list(self._grouper.indices.values())
        groupby_order = np.concatenate(positions_per_group)
        times_in_group_order = self.times.take(groupby_order)
        self._deltas = _calculate_deltas(times_in_group_order, self.halflife)

    def _get_window_indexer(self) -> GroupbyIndexer:
        """
        Build the indexer that computes the window start and end bounds.

        Returns
        -------
        GroupbyIndexer
            Configured with the grouper's indices and the
            ``ExponentialMovingWindowIndexer`` class.
        """
        return GroupbyIndexer(
            groupby_indices=self._grouper.indices,
            window_indexer=ExponentialMovingWindowIndexer,
        )
848
849
class OnlineExponentialMovingWindow(ExponentialMovingWindow):
    """
    Exponentially weighted window whose mean can be continued with new data.

    The state between ``mean`` calls is kept in an ``EWMMeanState`` and is
    cleared by ``reset``. ``mean`` is the only aggregation implemented here;
    ``aggregate``, ``std``, ``corr``, ``cov`` and ``var`` raise
    ``NotImplementedError``.
    """

    def __init__(
        self,
        obj: NDFrame,
        com: float | None = None,
        span: float | None = None,
        halflife: float | TimedeltaConvertibleTypes | None = None,
        alpha: float | None = None,
        min_periods: int | None = 0,
        adjust: bool = True,
        ignore_na: bool = False,
        axis: Axis = 0,
        times: np.ndarray | NDFrame | None = None,
        engine: str = "numba",
        engine_kwargs: dict[str, bool] | None = None,
        *,
        selection=None,
    ) -> None:
        # Raises NotImplementedError when ``times`` is given and ValueError
        # when ``engine`` does not resolve to numba.
        if times is not None:
            raise NotImplementedError(
                "times is not implemented with online operations."
            )
        super().__init__(
            obj=obj,
            com=com,
            span=span,
            halflife=halflife,
            alpha=alpha,
            min_periods=min_periods,
            adjust=adjust,
            ignore_na=ignore_na,
            axis=axis,
            times=times,
            selection=selection,
        )
        # Running state that is shared between successive ``mean`` calls
        self._mean = EWMMeanState(
            self._com, self.adjust, self.ignore_na, self.axis, obj.shape
        )
        if maybe_use_numba(engine):
            self.engine = engine
            self.engine_kwargs = engine_kwargs
        else:
            raise ValueError("'numba' is the only supported engine")

    def reset(self) -> None:
        """
        Reset the state captured by `update` calls.
        """
        self._mean.reset()

    def aggregate(self, func, *args, **kwargs):
        """Not available for online windows; always raises."""
        raise NotImplementedError("aggregate is not implemented.")

    def std(self, bias: bool = False, *args, **kwargs):
        """Not available for online windows; always raises."""
        raise NotImplementedError("std is not implemented.")

    def corr(
        self,
        other: DataFrame | Series | None = None,
        pairwise: bool | None = None,
        numeric_only: bool = False,
    ):
        """Not available for online windows; always raises."""
        raise NotImplementedError("corr is not implemented.")

    def cov(
        self,
        other: DataFrame | Series | None = None,
        pairwise: bool | None = None,
        bias: bool = False,
        numeric_only: bool = False,
    ):
        """Not available for online windows; always raises."""
        raise NotImplementedError("cov is not implemented.")

    def var(self, bias: bool = False, numeric_only: bool = False):
        """Not available for online windows; always raises."""
        raise NotImplementedError("var is not implemented.")

    def mean(self, *args, update=None, update_times=None, **kwargs):
        """
        Calculate an online exponentially weighted mean.

        Parameters
        ----------
        update : DataFrame or Series, default None
            New values to continue calculating the
            exponentially weighted mean from the last values and weights.
            Values should be float64 dtype.

            ``update`` needs to be ``None`` the first time the
            exponentially weighted mean is calculated.

        update_times : Series or 1-D np.ndarray, default None
            New times to continue calculating the
            exponentially weighted mean from the last values and weights.
            If ``None``, values are assumed to be evenly spaced
            in time.
            This feature is currently unsupported.

        Returns
        -------
        DataFrame or Series

        Raises
        ------
        NotImplementedError
            If ``update_times`` is passed.
        ValueError
            If ``update`` is passed before a first call with ``update=None``.

        Examples
        --------
        >>> df = pd.DataFrame({"a": range(5), "b": range(5, 10)})
        >>> online_ewm = df.head(2).ewm(0.5).online()
        >>> online_ewm.mean()
              a     b
        0  0.00  5.00
        1  0.75  5.75
        >>> online_ewm.mean(update=df.tail(3))
                  a         b
        2  1.615385  6.615385
        3  2.550000  7.550000
        4  3.520661  8.520661
        >>> online_ewm.reset()
        >>> online_ewm.mean()
              a     b
        0  0.00  5.00
        1  0.75  5.75
        """
        result_kwargs = {}
        is_frame = self._selected_obj.ndim == 2
        if update_times is not None:
            raise NotImplementedError("update_times is not implemented.")
        # Evenly spaced observations: every delta is 1.
        # NOTE(review): the length is taken from ``shape[self.axis - 1]``,
        # not ``shape[self.axis]`` -- confirm this matches what the online
        # kernel expects.
        update_deltas = np.ones(
            max(self._selected_obj.shape[self.axis - 1] - 1, 0), dtype=np.float64
        )
        if update is not None:
            if self._mean.last_ewm is None:
                raise ValueError(
                    "Must call mean with update=None first before passing update"
                )
            # The previous mean is prepended as a seed row; ``result_from``
            # removes that row from the output again.
            result_from = 1
            result_kwargs["index"] = update.index
            if is_frame:
                last_value = self._mean.last_ewm[np.newaxis, :]
                result_kwargs["columns"] = update.columns
            else:
                last_value = self._mean.last_ewm
                result_kwargs["name"] = update.name
            np_array = np.concatenate((last_value, update.to_numpy()))
        else:
            result_from = 0
            result_kwargs["index"] = self._selected_obj.index
            if is_frame:
                result_kwargs["columns"] = self._selected_obj.columns
            else:
                result_kwargs["name"] = self._selected_obj.name
            np_array = self._selected_obj.astype(np.float64).to_numpy()
        ewma_func = generate_online_numba_ewma_func(
            **get_jit_arguments(self.engine_kwargs)
        )
        # 1-D input gets a column axis so the kernel always sees 2-D data
        result = self._mean.run_ewm(
            np_array if is_frame else np_array[:, np.newaxis],
            update_deltas,
            self.min_periods,
            ewma_func,
        )
        if not is_frame:
            # Remove only the column axis added above. A bare ``squeeze()``
            # would also collapse a single-row result to a 0-d array, on
            # which the row slice below raises IndexError.
            # (assumes ``run_ewm`` returns an array shaped like its input)
            result = result.squeeze(axis=1)
        result = result[result_from:]
        result = self._selected_obj._constructor(result, **result_kwargs)
        return result