Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/pandas/core/window/ewm.py: 25%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

227 statements  

1from __future__ import annotations 

2 

3import datetime 

4from functools import partial 

5from textwrap import dedent 

6from typing import TYPE_CHECKING 

7 

8import numpy as np 

9 

10from pandas._libs.tslibs import Timedelta 

11import pandas._libs.window.aggregations as window_aggregations 

12from pandas._typing import ( 

13 Axis, 

14 TimedeltaConvertibleTypes, 

15) 

16 

17if TYPE_CHECKING: 

18 from pandas import DataFrame, Series 

19 from pandas.core.generic import NDFrame 

20 

21from pandas.util._decorators import doc 

22 

23from pandas.core.dtypes.common import ( 

24 is_datetime64_ns_dtype, 

25 is_numeric_dtype, 

26) 

27from pandas.core.dtypes.missing import isna 

28 

29from pandas.core import common 

30from pandas.core.indexers.objects import ( 

31 BaseIndexer, 

32 ExponentialMovingWindowIndexer, 

33 GroupbyIndexer, 

34) 

35from pandas.core.util.numba_ import ( 

36 get_jit_arguments, 

37 maybe_use_numba, 

38) 

39from pandas.core.window.common import zsqrt 

40from pandas.core.window.doc import ( 

41 _shared_docs, 

42 create_section_header, 

43 kwargs_numeric_only, 

44 numba_notes, 

45 template_header, 

46 template_returns, 

47 template_see_also, 

48 window_agg_numba_parameters, 

49) 

50from pandas.core.window.numba_ import ( 

51 generate_numba_ewm_func, 

52 generate_numba_ewm_table_func, 

53) 

54from pandas.core.window.online import ( 

55 EWMMeanState, 

56 generate_online_numba_ewma_func, 

57) 

58from pandas.core.window.rolling import ( 

59 BaseWindow, 

60 BaseWindowGroupby, 

61) 

62 

63 

64def get_center_of_mass( 

65 comass: float | None, 

66 span: float | None, 

67 halflife: float | None, 

68 alpha: float | None, 

69) -> float: 

70 valid_count = common.count_not_none(comass, span, halflife, alpha) 

71 if valid_count > 1: 

72 raise ValueError("comass, span, halflife, and alpha are mutually exclusive") 

73 

74 # Convert to center of mass; domain checks ensure 0 < alpha <= 1 

75 if comass is not None: 

76 if comass < 0: 

77 raise ValueError("comass must satisfy: comass >= 0") 

78 elif span is not None: 

79 if span < 1: 

80 raise ValueError("span must satisfy: span >= 1") 

81 comass = (span - 1) / 2 

82 elif halflife is not None: 

83 if halflife <= 0: 

84 raise ValueError("halflife must satisfy: halflife > 0") 

85 decay = 1 - np.exp(np.log(0.5) / halflife) 

86 comass = 1 / decay - 1 

87 elif alpha is not None: 

88 if alpha <= 0 or alpha > 1: 

89 raise ValueError("alpha must satisfy: 0 < alpha <= 1") 

90 comass = (1 - alpha) / alpha 

91 else: 

92 raise ValueError("Must pass one of comass, span, halflife, or alpha") 

93 

94 return float(comass) 

95 

96 

97def _calculate_deltas( 

98 times: np.ndarray | NDFrame, 

99 halflife: float | TimedeltaConvertibleTypes | None, 

100) -> np.ndarray: 

101 """ 

102 Return the diff of the times divided by the half-life. These values are used in 

103 the calculation of the ewm mean. 

104 

105 Parameters 

106 ---------- 

107 times : np.ndarray, Series 

108 Times corresponding to the observations. Must be monotonically increasing 

109 and ``datetime64[ns]`` dtype. 

110 halflife : float, str, timedelta, optional 

111 Half-life specifying the decay 

112 

113 Returns 

114 ------- 

115 np.ndarray 

116 Diff of the times divided by the half-life 

117 """ 

118 _times = np.asarray(times.view(np.int64), dtype=np.float64) 

119 # TODO: generalize to non-nano? 

120 _halflife = float(Timedelta(halflife).as_unit("ns")._value) 

121 return np.diff(_times) / _halflife 

122 

123 

124class ExponentialMovingWindow(BaseWindow): 

125 r""" 

126 Provide exponentially weighted (EW) calculations. 

127 

128 Exactly one of ``com``, ``span``, ``halflife``, or ``alpha`` must be 

129 provided if ``times`` is not provided. If ``times`` is provided, 

130 ``halflife`` and one of ``com``, ``span`` or ``alpha`` may be provided. 

131 

132 Parameters 

133 ---------- 

134 com : float, optional 

135 Specify decay in terms of center of mass 

136 

137 :math:`\alpha = 1 / (1 + com)`, for :math:`com \geq 0`. 

138 

139 span : float, optional 

140 Specify decay in terms of span 

141 

142 :math:`\alpha = 2 / (span + 1)`, for :math:`span \geq 1`. 

143 

144 halflife : float, str, timedelta, optional 

145 Specify decay in terms of half-life 

146 

147 :math:`\alpha = 1 - \exp\left(-\ln(2) / halflife\right)`, for 

148 :math:`halflife > 0`. 

149 

150 If ``times`` is specified, a timedelta convertible unit over which an 

151 observation decays to half its value. Only applicable to ``mean()``, 

152 and halflife value will not apply to the other functions. 

153 

154 .. versionadded:: 1.1.0 

155 

156 alpha : float, optional 

157 Specify smoothing factor :math:`\alpha` directly 

158 

159 :math:`0 < \alpha \leq 1`. 

160 

161 min_periods : int, default 0 

162 Minimum number of observations in window required to have a value; 

163 otherwise, result is ``np.nan``. 

164 

165 adjust : bool, default True 

166 Divide by decaying adjustment factor in beginning periods to account 

167 for imbalance in relative weightings (viewing EWMA as a moving average). 

168 

169 - When ``adjust=True`` (default), the EW function is calculated using weights 

170 :math:`w_i = (1 - \alpha)^i`. For example, the EW moving average of the series 

171 [:math:`x_0, x_1, ..., x_t`] would be: 

172 

173 .. math:: 

174 y_t = \frac{x_t + (1 - \alpha)x_{t-1} + (1 - \alpha)^2 x_{t-2} + ... + (1 - 

175 \alpha)^t x_0}{1 + (1 - \alpha) + (1 - \alpha)^2 + ... + (1 - \alpha)^t} 

176 

177 - When ``adjust=False``, the exponentially weighted function is calculated 

178 recursively: 

179 

180 .. math:: 

181 \begin{split} 

182 y_0 &= x_0\\ 

183 y_t &= (1 - \alpha) y_{t-1} + \alpha x_t, 

184 \end{split} 

185 ignore_na : bool, default False 

186 Ignore missing values when calculating weights. 

187 

188 - When ``ignore_na=False`` (default), weights are based on absolute positions. 

189 For example, the weights of :math:`x_0` and :math:`x_2` used in calculating 

190 the final weighted average of [:math:`x_0`, None, :math:`x_2`] are 

191 :math:`(1-\alpha)^2` and :math:`1` if ``adjust=True``, and 

192 :math:`(1-\alpha)^2` and :math:`\alpha` if ``adjust=False``. 

193 

194 - When ``ignore_na=True``, weights are based 

195 on relative positions. For example, the weights of :math:`x_0` and :math:`x_2` 

196 used in calculating the final weighted average of 

197 [:math:`x_0`, None, :math:`x_2`] are :math:`1-\alpha` and :math:`1` if 

198 ``adjust=True``, and :math:`1-\alpha` and :math:`\alpha` if ``adjust=False``. 

199 

200 axis : {0, 1}, default 0 

201 If ``0`` or ``'index'``, calculate across the rows. 

202 

203 If ``1`` or ``'columns'``, calculate across the columns. 

204 

205 For `Series` this parameter is unused and defaults to 0. 

206 

207 times : np.ndarray, Series, default None 

208 

209 .. versionadded:: 1.1.0 

210 

211 Only applicable to ``mean()``. 

212 

213 Times corresponding to the observations. Must be monotonically increasing and 

214 ``datetime64[ns]`` dtype. 

215 

216 If 1-D array like, a sequence with the same shape as the observations. 

217 

218 method : str {'single', 'table'}, default 'single' 

219 .. versionadded:: 1.4.0 

220 

221 Execute the rolling operation per single column or row (``'single'``) 

222 or over the entire object (``'table'``). 

223 

224 This argument is only implemented when specifying ``engine='numba'`` 

225 in the method call. 

226 

227 Only applicable to ``mean()`` 

228 

229 Returns 

230 ------- 

231 ``ExponentialMovingWindow`` subclass 

232 

233 See Also 

234 -------- 

235 rolling : Provides rolling window calculations. 

236 expanding : Provides expanding transformations. 

237 

238 Notes 

239 ----- 

240 See :ref:`Windowing Operations <window.exponentially_weighted>` 

241 for further usage details and examples. 

242 

243 Examples 

244 -------- 

245 >>> df = pd.DataFrame({'B': [0, 1, 2, np.nan, 4]}) 

246 >>> df 

247 B 

248 0 0.0 

249 1 1.0 

250 2 2.0 

251 3 NaN 

252 4 4.0 

253 

254 >>> df.ewm(com=0.5).mean() 

255 B 

256 0 0.000000 

257 1 0.750000 

258 2 1.615385 

259 3 1.615385 

260 4 3.670213 

261 >>> df.ewm(alpha=2 / 3).mean() 

262 B 

263 0 0.000000 

264 1 0.750000 

265 2 1.615385 

266 3 1.615385 

267 4 3.670213 

268 

269 **adjust** 

270 

271 >>> df.ewm(com=0.5, adjust=True).mean() 

272 B 

273 0 0.000000 

274 1 0.750000 

275 2 1.615385 

276 3 1.615385 

277 4 3.670213 

278 >>> df.ewm(com=0.5, adjust=False).mean() 

279 B 

280 0 0.000000 

281 1 0.666667 

282 2 1.555556 

283 3 1.555556 

284 4 3.650794 

285 

286 **ignore_na** 

287 

288 >>> df.ewm(com=0.5, ignore_na=True).mean() 

289 B 

290 0 0.000000 

291 1 0.750000 

292 2 1.615385 

293 3 1.615385 

294 4 3.225000 

295 >>> df.ewm(com=0.5, ignore_na=False).mean() 

296 B 

297 0 0.000000 

298 1 0.750000 

299 2 1.615385 

300 3 1.615385 

301 4 3.670213 

302 

303 **times** 

304 

305 Exponentially weighted mean with weights calculated with a timedelta ``halflife`` 

306 relative to ``times``. 

307 

308 >>> times = ['2020-01-01', '2020-01-03', '2020-01-10', '2020-01-15', '2020-01-17'] 

309 >>> df.ewm(halflife='4 days', times=pd.DatetimeIndex(times)).mean() 

310 B 

311 0 0.000000 

312 1 0.585786 

313 2 1.523889 

314 3 1.523889 

315 4 3.233686 

316 """ 

317 

318 _attributes = [ 

319 "com", 

320 "span", 

321 "halflife", 

322 "alpha", 

323 "min_periods", 

324 "adjust", 

325 "ignore_na", 

326 "axis", 

327 "times", 

328 "method", 

329 ] 

330 

331 def __init__( 

332 self, 

333 obj: NDFrame, 

334 com: float | None = None, 

335 span: float | None = None, 

336 halflife: float | TimedeltaConvertibleTypes | None = None, 

337 alpha: float | None = None, 

338 min_periods: int | None = 0, 

339 adjust: bool = True, 

340 ignore_na: bool = False, 

341 axis: Axis = 0, 

342 times: np.ndarray | NDFrame | None = None, 

343 method: str = "single", 

344 *, 

345 selection=None, 

346 ) -> None: 

347 super().__init__( 

348 obj=obj, 

349 min_periods=1 if min_periods is None else max(int(min_periods), 1), 

350 on=None, 

351 center=False, 

352 closed=None, 

353 method=method, 

354 axis=axis, 

355 selection=selection, 

356 ) 

357 self.com = com 

358 self.span = span 

359 self.halflife = halflife 

360 self.alpha = alpha 

361 self.adjust = adjust 

362 self.ignore_na = ignore_na 

363 self.times = times 

364 if self.times is not None: 

365 if not self.adjust: 

366 raise NotImplementedError("times is not supported with adjust=False.") 

367 if not is_datetime64_ns_dtype(self.times): 

368 raise ValueError("times must be datetime64[ns] dtype.") 

369 if len(self.times) != len(obj): 

370 raise ValueError("times must be the same length as the object.") 

371 if not isinstance(self.halflife, (str, datetime.timedelta, np.timedelta64)): 

372 raise ValueError("halflife must be a timedelta convertible object") 

373 if isna(self.times).any(): 

374 raise ValueError("Cannot convert NaT values to integer") 

375 self._deltas = _calculate_deltas(self.times, self.halflife) 

376 # Halflife is no longer applicable when calculating COM 

377 # But allow COM to still be calculated if the user passes other decay args 

378 if common.count_not_none(self.com, self.span, self.alpha) > 0: 

379 self._com = get_center_of_mass(self.com, self.span, None, self.alpha) 

380 else: 

381 self._com = 1.0 

382 else: 

383 if self.halflife is not None and isinstance( 

384 self.halflife, (str, datetime.timedelta, np.timedelta64) 

385 ): 

386 raise ValueError( 

387 "halflife can only be a timedelta convertible argument if " 

388 "times is not None." 

389 ) 

390 # Without times, points are equally spaced 

391 self._deltas = np.ones( 

392 max(self.obj.shape[self.axis] - 1, 0), dtype=np.float64 

393 ) 

394 self._com = get_center_of_mass( 

395 # error: Argument 3 to "get_center_of_mass" has incompatible type 

396 # "Union[float, Any, None, timedelta64, signedinteger[_64Bit]]"; 

397 # expected "Optional[float]" 

398 self.com, 

399 self.span, 

400 self.halflife, # type: ignore[arg-type] 

401 self.alpha, 

402 ) 

403 

404 def _check_window_bounds( 

405 self, start: np.ndarray, end: np.ndarray, num_vals: int 

406 ) -> None: 

407 # emw algorithms are iterative with each point 

408 # ExponentialMovingWindowIndexer "bounds" are the entire window 

409 pass 

410 

411 def _get_window_indexer(self) -> BaseIndexer: 

412 """ 

413 Return an indexer class that will compute the window start and end bounds 

414 """ 

415 return ExponentialMovingWindowIndexer() 

416 

417 def online( 

418 self, engine: str = "numba", engine_kwargs=None 

419 ) -> OnlineExponentialMovingWindow: 

420 """ 

421 Return an ``OnlineExponentialMovingWindow`` object to calculate 

422 exponentially moving window aggregations in an online method. 

423 

424 .. versionadded:: 1.3.0 

425 

426 Parameters 

427 ---------- 

428 engine: str, default ``'numba'`` 

429 Execution engine to calculate online aggregations. 

430 Applies to all supported aggregation methods. 

431 

432 engine_kwargs : dict, default None 

433 Applies to all supported aggregation methods. 

434 

435 * For ``'numba'`` engine, the engine can accept ``nopython``, ``nogil`` 

436 and ``parallel`` dictionary keys. The values must either be ``True`` or 

437 ``False``. The default ``engine_kwargs`` for the ``'numba'`` engine is 

438 ``{{'nopython': True, 'nogil': False, 'parallel': False}}`` and will be 

439 applied to the function 

440 

441 Returns 

442 ------- 

443 OnlineExponentialMovingWindow 

444 """ 

445 return OnlineExponentialMovingWindow( 

446 obj=self.obj, 

447 com=self.com, 

448 span=self.span, 

449 halflife=self.halflife, 

450 alpha=self.alpha, 

451 min_periods=self.min_periods, 

452 adjust=self.adjust, 

453 ignore_na=self.ignore_na, 

454 axis=self.axis, 

455 times=self.times, 

456 engine=engine, 

457 engine_kwargs=engine_kwargs, 

458 selection=self._selection, 

459 ) 

460 

461 @doc( 

462 _shared_docs["aggregate"], 

463 see_also=dedent( 

464 """ 

465 See Also 

466 -------- 

467 pandas.DataFrame.rolling.aggregate 

468 """ 

469 ), 

470 examples=dedent( 

471 """ 

472 Examples 

473 -------- 

474 >>> df = pd.DataFrame({"A": [1, 2, 3], "B": [4, 5, 6], "C": [7, 8, 9]}) 

475 >>> df 

476 A B C 

477 0 1 4 7 

478 1 2 5 8 

479 2 3 6 9 

480 

481 >>> df.ewm(alpha=0.5).mean() 

482 A B C 

483 0 1.000000 4.000000 7.000000 

484 1 1.666667 4.666667 7.666667 

485 2 2.428571 5.428571 8.428571 

486 """ 

487 ), 

488 klass="Series/Dataframe", 

489 axis="", 

490 ) 

491 def aggregate(self, func, *args, **kwargs): 

492 return super().aggregate(func, *args, **kwargs) 

493 

494 agg = aggregate 

495 

496 @doc( 

497 template_header, 

498 create_section_header("Parameters"), 

499 kwargs_numeric_only, 

500 window_agg_numba_parameters(), 

501 create_section_header("Returns"), 

502 template_returns, 

503 create_section_header("See Also"), 

504 template_see_also, 

505 create_section_header("Notes"), 

506 numba_notes.replace("\n", "", 1), 

507 window_method="ewm", 

508 aggregation_description="(exponential weighted moment) mean", 

509 agg_method="mean", 

510 ) 

511 def mean( 

512 self, 

513 numeric_only: bool = False, 

514 engine=None, 

515 engine_kwargs=None, 

516 ): 

517 if maybe_use_numba(engine): 

518 if self.method == "single": 

519 func = generate_numba_ewm_func 

520 else: 

521 func = generate_numba_ewm_table_func 

522 ewm_func = func( 

523 **get_jit_arguments(engine_kwargs), 

524 com=self._com, 

525 adjust=self.adjust, 

526 ignore_na=self.ignore_na, 

527 deltas=tuple(self._deltas), 

528 normalize=True, 

529 ) 

530 return self._apply(ewm_func, name="mean") 

531 elif engine in ("cython", None): 

532 if engine_kwargs is not None: 

533 raise ValueError("cython engine does not accept engine_kwargs") 

534 

535 deltas = None if self.times is None else self._deltas 

536 window_func = partial( 

537 window_aggregations.ewm, 

538 com=self._com, 

539 adjust=self.adjust, 

540 ignore_na=self.ignore_na, 

541 deltas=deltas, 

542 normalize=True, 

543 ) 

544 return self._apply(window_func, name="mean", numeric_only=numeric_only) 

545 else: 

546 raise ValueError("engine must be either 'numba' or 'cython'") 

547 

548 @doc( 

549 template_header, 

550 create_section_header("Parameters"), 

551 kwargs_numeric_only, 

552 window_agg_numba_parameters(), 

553 create_section_header("Returns"), 

554 template_returns, 

555 create_section_header("See Also"), 

556 template_see_also, 

557 create_section_header("Notes"), 

558 numba_notes.replace("\n", "", 1), 

559 window_method="ewm", 

560 aggregation_description="(exponential weighted moment) sum", 

561 agg_method="sum", 

562 ) 

563 def sum( 

564 self, 

565 numeric_only: bool = False, 

566 engine=None, 

567 engine_kwargs=None, 

568 ): 

569 if not self.adjust: 

570 raise NotImplementedError("sum is not implemented with adjust=False") 

571 if maybe_use_numba(engine): 

572 if self.method == "single": 

573 func = generate_numba_ewm_func 

574 else: 

575 func = generate_numba_ewm_table_func 

576 ewm_func = func( 

577 **get_jit_arguments(engine_kwargs), 

578 com=self._com, 

579 adjust=self.adjust, 

580 ignore_na=self.ignore_na, 

581 deltas=tuple(self._deltas), 

582 normalize=False, 

583 ) 

584 return self._apply(ewm_func, name="sum") 

585 elif engine in ("cython", None): 

586 if engine_kwargs is not None: 

587 raise ValueError("cython engine does not accept engine_kwargs") 

588 

589 deltas = None if self.times is None else self._deltas 

590 window_func = partial( 

591 window_aggregations.ewm, 

592 com=self._com, 

593 adjust=self.adjust, 

594 ignore_na=self.ignore_na, 

595 deltas=deltas, 

596 normalize=False, 

597 ) 

598 return self._apply(window_func, name="sum", numeric_only=numeric_only) 

599 else: 

600 raise ValueError("engine must be either 'numba' or 'cython'") 

601 

602 @doc( 

603 template_header, 

604 create_section_header("Parameters"), 

605 dedent( 

606 """ 

607 bias : bool, default False 

608 Use a standard estimation bias correction. 

609 """ 

610 ).replace("\n", "", 1), 

611 kwargs_numeric_only, 

612 create_section_header("Returns"), 

613 template_returns, 

614 create_section_header("See Also"), 

615 template_see_also[:-1], 

616 window_method="ewm", 

617 aggregation_description="(exponential weighted moment) standard deviation", 

618 agg_method="std", 

619 ) 

620 def std(self, bias: bool = False, numeric_only: bool = False): 

621 if ( 

622 numeric_only 

623 and self._selected_obj.ndim == 1 

624 and not is_numeric_dtype(self._selected_obj.dtype) 

625 ): 

626 # Raise directly so error message says std instead of var 

627 raise NotImplementedError( 

628 f"{type(self).__name__}.std does not implement numeric_only" 

629 ) 

630 return zsqrt(self.var(bias=bias, numeric_only=numeric_only)) 

631 

632 @doc( 

633 template_header, 

634 create_section_header("Parameters"), 

635 dedent( 

636 """ 

637 bias : bool, default False 

638 Use a standard estimation bias correction. 

639 """ 

640 ).replace("\n", "", 1), 

641 kwargs_numeric_only, 

642 create_section_header("Returns"), 

643 template_returns, 

644 create_section_header("See Also"), 

645 template_see_also[:-1], 

646 window_method="ewm", 

647 aggregation_description="(exponential weighted moment) variance", 

648 agg_method="var", 

649 ) 

650 def var(self, bias: bool = False, numeric_only: bool = False): 

651 window_func = window_aggregations.ewmcov 

652 wfunc = partial( 

653 window_func, 

654 com=self._com, 

655 adjust=self.adjust, 

656 ignore_na=self.ignore_na, 

657 bias=bias, 

658 ) 

659 

660 def var_func(values, begin, end, min_periods): 

661 return wfunc(values, begin, end, min_periods, values) 

662 

663 return self._apply(var_func, name="var", numeric_only=numeric_only) 

664 

665 @doc( 

666 template_header, 

667 create_section_header("Parameters"), 

668 dedent( 

669 """ 

670 other : Series or DataFrame , optional 

671 If not supplied then will default to self and produce pairwise 

672 output. 

673 pairwise : bool, default None 

674 If False then only matching columns between self and other will be 

675 used and the output will be a DataFrame. 

676 If True then all pairwise combinations will be calculated and the 

677 output will be a MultiIndex DataFrame in the case of DataFrame 

678 inputs. In the case of missing elements, only complete pairwise 

679 observations will be used. 

680 bias : bool, default False 

681 Use a standard estimation bias correction. 

682 """ 

683 ).replace("\n", "", 1), 

684 kwargs_numeric_only, 

685 create_section_header("Returns"), 

686 template_returns, 

687 create_section_header("See Also"), 

688 template_see_also[:-1], 

689 window_method="ewm", 

690 aggregation_description="(exponential weighted moment) sample covariance", 

691 agg_method="cov", 

692 ) 

693 def cov( 

694 self, 

695 other: DataFrame | Series | None = None, 

696 pairwise: bool | None = None, 

697 bias: bool = False, 

698 numeric_only: bool = False, 

699 ): 

700 from pandas import Series 

701 

702 self._validate_numeric_only("cov", numeric_only) 

703 

704 def cov_func(x, y): 

705 x_array = self._prep_values(x) 

706 y_array = self._prep_values(y) 

707 window_indexer = self._get_window_indexer() 

708 min_periods = ( 

709 self.min_periods 

710 if self.min_periods is not None 

711 else window_indexer.window_size 

712 ) 

713 start, end = window_indexer.get_window_bounds( 

714 num_values=len(x_array), 

715 min_periods=min_periods, 

716 center=self.center, 

717 closed=self.closed, 

718 step=self.step, 

719 ) 

720 result = window_aggregations.ewmcov( 

721 x_array, 

722 start, 

723 end, 

724 # error: Argument 4 to "ewmcov" has incompatible type 

725 # "Optional[int]"; expected "int" 

726 self.min_periods, # type: ignore[arg-type] 

727 y_array, 

728 self._com, 

729 self.adjust, 

730 self.ignore_na, 

731 bias, 

732 ) 

733 return Series(result, index=x.index, name=x.name, copy=False) 

734 

735 return self._apply_pairwise( 

736 self._selected_obj, other, pairwise, cov_func, numeric_only 

737 ) 

738 

739 @doc( 

740 template_header, 

741 create_section_header("Parameters"), 

742 dedent( 

743 """ 

744 other : Series or DataFrame, optional 

745 If not supplied then will default to self and produce pairwise 

746 output. 

747 pairwise : bool, default None 

748 If False then only matching columns between self and other will be 

749 used and the output will be a DataFrame. 

750 If True then all pairwise combinations will be calculated and the 

751 output will be a MultiIndex DataFrame in the case of DataFrame 

752 inputs. In the case of missing elements, only complete pairwise 

753 observations will be used. 

754 """ 

755 ).replace("\n", "", 1), 

756 kwargs_numeric_only, 

757 create_section_header("Returns"), 

758 template_returns, 

759 create_section_header("See Also"), 

760 template_see_also[:-1], 

761 window_method="ewm", 

762 aggregation_description="(exponential weighted moment) sample correlation", 

763 agg_method="corr", 

764 ) 

765 def corr( 

766 self, 

767 other: DataFrame | Series | None = None, 

768 pairwise: bool | None = None, 

769 numeric_only: bool = False, 

770 ): 

771 from pandas import Series 

772 

773 self._validate_numeric_only("corr", numeric_only) 

774 

775 def cov_func(x, y): 

776 x_array = self._prep_values(x) 

777 y_array = self._prep_values(y) 

778 window_indexer = self._get_window_indexer() 

779 min_periods = ( 

780 self.min_periods 

781 if self.min_periods is not None 

782 else window_indexer.window_size 

783 ) 

784 start, end = window_indexer.get_window_bounds( 

785 num_values=len(x_array), 

786 min_periods=min_periods, 

787 center=self.center, 

788 closed=self.closed, 

789 step=self.step, 

790 ) 

791 

792 def _cov(X, Y): 

793 return window_aggregations.ewmcov( 

794 X, 

795 start, 

796 end, 

797 min_periods, 

798 Y, 

799 self._com, 

800 self.adjust, 

801 self.ignore_na, 

802 True, 

803 ) 

804 

805 with np.errstate(all="ignore"): 

806 cov = _cov(x_array, y_array) 

807 x_var = _cov(x_array, x_array) 

808 y_var = _cov(y_array, y_array) 

809 result = cov / zsqrt(x_var * y_var) 

810 return Series(result, index=x.index, name=x.name, copy=False) 

811 

812 return self._apply_pairwise( 

813 self._selected_obj, other, pairwise, cov_func, numeric_only 

814 ) 

815 

816 

817class ExponentialMovingWindowGroupby(BaseWindowGroupby, ExponentialMovingWindow): 

818 """ 

819 Provide an exponential moving window groupby implementation. 

820 """ 

821 

822 _attributes = ExponentialMovingWindow._attributes + BaseWindowGroupby._attributes 

823 

824 def __init__(self, obj, *args, _grouper=None, **kwargs) -> None: 

825 super().__init__(obj, *args, _grouper=_grouper, **kwargs) 

826 

827 if not obj.empty and self.times is not None: 

828 # sort the times and recalculate the deltas according to the groups 

829 groupby_order = np.concatenate(list(self._grouper.indices.values())) 

830 self._deltas = _calculate_deltas( 

831 self.times.take(groupby_order), 

832 self.halflife, 

833 ) 

834 

835 def _get_window_indexer(self) -> GroupbyIndexer: 

836 """ 

837 Return an indexer class that will compute the window start and end bounds 

838 

839 Returns 

840 ------- 

841 GroupbyIndexer 

842 """ 

843 window_indexer = GroupbyIndexer( 

844 groupby_indices=self._grouper.indices, 

845 window_indexer=ExponentialMovingWindowIndexer, 

846 ) 

847 return window_indexer 

848 

849 

850class OnlineExponentialMovingWindow(ExponentialMovingWindow): 

851 def __init__( 

852 self, 

853 obj: NDFrame, 

854 com: float | None = None, 

855 span: float | None = None, 

856 halflife: float | TimedeltaConvertibleTypes | None = None, 

857 alpha: float | None = None, 

858 min_periods: int | None = 0, 

859 adjust: bool = True, 

860 ignore_na: bool = False, 

861 axis: Axis = 0, 

862 times: np.ndarray | NDFrame | None = None, 

863 engine: str = "numba", 

864 engine_kwargs: dict[str, bool] | None = None, 

865 *, 

866 selection=None, 

867 ) -> None: 

868 if times is not None: 

869 raise NotImplementedError( 

870 "times is not implemented with online operations." 

871 ) 

872 super().__init__( 

873 obj=obj, 

874 com=com, 

875 span=span, 

876 halflife=halflife, 

877 alpha=alpha, 

878 min_periods=min_periods, 

879 adjust=adjust, 

880 ignore_na=ignore_na, 

881 axis=axis, 

882 times=times, 

883 selection=selection, 

884 ) 

885 self._mean = EWMMeanState( 

886 self._com, self.adjust, self.ignore_na, self.axis, obj.shape 

887 ) 

888 if maybe_use_numba(engine): 

889 self.engine = engine 

890 self.engine_kwargs = engine_kwargs 

891 else: 

892 raise ValueError("'numba' is the only supported engine") 

893 

894 def reset(self) -> None: 

895 """ 

896 Reset the state captured by `update` calls. 

897 """ 

898 self._mean.reset() 

899 

900 def aggregate(self, func, *args, **kwargs): 

901 raise NotImplementedError("aggregate is not implemented.") 

902 

903 def std(self, bias: bool = False, *args, **kwargs): 

904 raise NotImplementedError("std is not implemented.") 

905 

906 def corr( 

907 self, 

908 other: DataFrame | Series | None = None, 

909 pairwise: bool | None = None, 

910 numeric_only: bool = False, 

911 ): 

912 raise NotImplementedError("corr is not implemented.") 

913 

914 def cov( 

915 self, 

916 other: DataFrame | Series | None = None, 

917 pairwise: bool | None = None, 

918 bias: bool = False, 

919 numeric_only: bool = False, 

920 ): 

921 raise NotImplementedError("cov is not implemented.") 

922 

923 def var(self, bias: bool = False, numeric_only: bool = False): 

924 raise NotImplementedError("var is not implemented.") 

925 

926 def mean(self, *args, update=None, update_times=None, **kwargs): 

927 """ 

928 Calculate an online exponentially weighted mean. 

929 

930 Parameters 

931 ---------- 

932 update: DataFrame or Series, default None 

933 New values to continue calculating the 

934 exponentially weighted mean from the last values and weights. 

935 Values should be float64 dtype. 

936 

937 ``update`` needs to be ``None`` the first time the 

938 exponentially weighted mean is calculated. 

939 

940 update_times: Series or 1-D np.ndarray, default None 

941 New times to continue calculating the 

942 exponentially weighted mean from the last values and weights. 

943 If ``None``, values are assumed to be evenly spaced 

944 in time. 

945 This feature is currently unsupported. 

946 

947 Returns 

948 ------- 

949 DataFrame or Series 

950 

951 Examples 

952 -------- 

953 >>> df = pd.DataFrame({"a": range(5), "b": range(5, 10)}) 

954 >>> online_ewm = df.head(2).ewm(0.5).online() 

955 >>> online_ewm.mean() 

956 a b 

957 0 0.00 5.00 

958 1 0.75 5.75 

959 >>> online_ewm.mean(update=df.tail(3)) 

960 a b 

961 2 1.615385 6.615385 

962 3 2.550000 7.550000 

963 4 3.520661 8.520661 

964 >>> online_ewm.reset() 

965 >>> online_ewm.mean() 

966 a b 

967 0 0.00 5.00 

968 1 0.75 5.75 

969 """ 

970 result_kwargs = {} 

971 is_frame = self._selected_obj.ndim == 2 

972 if update_times is not None: 

973 raise NotImplementedError("update_times is not implemented.") 

974 update_deltas = np.ones( 

975 max(self._selected_obj.shape[self.axis - 1] - 1, 0), dtype=np.float64 

976 ) 

977 if update is not None: 

978 if self._mean.last_ewm is None: 

979 raise ValueError( 

980 "Must call mean with update=None first before passing update" 

981 ) 

982 result_from = 1 

983 result_kwargs["index"] = update.index 

984 if is_frame: 

985 last_value = self._mean.last_ewm[np.newaxis, :] 

986 result_kwargs["columns"] = update.columns 

987 else: 

988 last_value = self._mean.last_ewm 

989 result_kwargs["name"] = update.name 

990 np_array = np.concatenate((last_value, update.to_numpy())) 

991 else: 

992 result_from = 0 

993 result_kwargs["index"] = self._selected_obj.index 

994 if is_frame: 

995 result_kwargs["columns"] = self._selected_obj.columns 

996 else: 

997 result_kwargs["name"] = self._selected_obj.name 

998 np_array = self._selected_obj.astype(np.float64).to_numpy() 

999 ewma_func = generate_online_numba_ewma_func( 

1000 **get_jit_arguments(self.engine_kwargs) 

1001 ) 

1002 result = self._mean.run_ewm( 

1003 np_array if is_frame else np_array[:, np.newaxis], 

1004 update_deltas, 

1005 self.min_periods, 

1006 ewma_func, 

1007 ) 

1008 if not is_frame: 

1009 result = result.squeeze() 

1010 result = result[result_from:] 

1011 result = self._selected_obj._constructor(result, **result_kwargs) 

1012 return result