Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.9/dist-packages/pandas/tseries/frequencies.py: 24%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

313 statements  

1from __future__ import annotations 

2 

3from typing import TYPE_CHECKING 

4 

5import numpy as np 

6 

7from pandas._libs import lib 

8from pandas._libs.algos import unique_deltas 

9from pandas._libs.tslibs import ( 

10 Timestamp, 

11 get_unit_from_dtype, 

12 periods_per_day, 

13 tz_convert_from_utc, 

14) 

15from pandas._libs.tslibs.ccalendar import ( 

16 DAYS, 

17 MONTH_ALIASES, 

18 MONTH_NUMBERS, 

19 MONTHS, 

20 int_to_weekday, 

21) 

22from pandas._libs.tslibs.dtypes import ( 

23 OFFSET_TO_PERIOD_FREQSTR, 

24 freq_to_period_freqstr, 

25) 

26from pandas._libs.tslibs.fields import ( 

27 build_field_sarray, 

28 month_position_check, 

29) 

30from pandas._libs.tslibs.offsets import ( 

31 DateOffset, 

32 Day, 

33 to_offset, 

34) 

35from pandas._libs.tslibs.parsing import get_rule_month 

36from pandas.util._decorators import cache_readonly 

37 

38from pandas.core.dtypes.common import is_numeric_dtype 

39from pandas.core.dtypes.dtypes import ( 

40 DatetimeTZDtype, 

41 PeriodDtype, 

42) 

43from pandas.core.dtypes.generic import ( 

44 ABCIndex, 

45 ABCSeries, 

46) 

47 

48from pandas.core.algorithms import unique 

49 

50if TYPE_CHECKING: 

51 from pandas._typing import npt 

52 

53 from pandas import ( 

54 DatetimeIndex, 

55 Series, 

56 TimedeltaIndex, 

57 ) 

58 from pandas.core.arrays.datetimelike import DatetimeLikeArrayMixin 

59# -------------------------------------------------------------------- 

60# Offset related functions 

61 

62_need_suffix = ["QS", "BQE", "BQS", "YS", "BYE", "BYS"] 

63 

64for _prefix in _need_suffix: 

65 for _m in MONTHS: 

66 key = f"{_prefix}-{_m}" 

67 OFFSET_TO_PERIOD_FREQSTR[key] = OFFSET_TO_PERIOD_FREQSTR[_prefix] 

68 

69for _prefix in ["Y", "Q"]: 

70 for _m in MONTHS: 

71 _alias = f"{_prefix}-{_m}" 

72 OFFSET_TO_PERIOD_FREQSTR[_alias] = _alias 

73 

74for _d in DAYS: 

75 OFFSET_TO_PERIOD_FREQSTR[f"W-{_d}"] = f"W-{_d}" 

76 

77 

78def get_period_alias(offset_str: str) -> str | None: 

79 """ 

80 Alias to closest period strings BQ->Q etc. 

81 """ 

82 return OFFSET_TO_PERIOD_FREQSTR.get(offset_str, None) 

83 

84 

85# --------------------------------------------------------------------- 

86# Period codes 

87 

88 

89def infer_freq( 

90 index: DatetimeIndex | TimedeltaIndex | Series | DatetimeLikeArrayMixin, 

91) -> str | None: 

92 """ 

93 Infer the most likely frequency given the input index. 

94 

95 Parameters 

96 ---------- 

97 index : DatetimeIndex, TimedeltaIndex, Series or array-like 

98 If passed a Series will use the values of the series (NOT THE INDEX). 

99 

100 Returns 

101 ------- 

102 str or None 

103 None if no discernible frequency. 

104 

105 Raises 

106 ------ 

107 TypeError 

108 If the index is not datetime-like. 

109 ValueError 

110 If there are fewer than three values. 

111 

112 Examples 

113 -------- 

114 >>> idx = pd.date_range(start='2020/12/01', end='2020/12/30', periods=30) 

115 >>> pd.infer_freq(idx) 

116 'D' 

117 """ 

118 from pandas.core.api import DatetimeIndex 

119 

120 if isinstance(index, ABCSeries): 

121 values = index._values 

122 if not ( 

123 lib.is_np_dtype(values.dtype, "mM") 

124 or isinstance(values.dtype, DatetimeTZDtype) 

125 or values.dtype == object 

126 ): 

127 raise TypeError( 

128 "cannot infer freq from a non-convertible dtype " 

129 f"on a Series of {index.dtype}" 

130 ) 

131 index = values 

132 

133 inferer: _FrequencyInferer 

134 

135 if not hasattr(index, "dtype"): 

136 pass 

137 elif isinstance(index.dtype, PeriodDtype): 

138 raise TypeError( 

139 "PeriodIndex given. Check the `freq` attribute " 

140 "instead of using infer_freq." 

141 ) 

142 elif lib.is_np_dtype(index.dtype, "m"): 

143 # Allow TimedeltaIndex and TimedeltaArray 

144 inferer = _TimedeltaFrequencyInferer(index) 

145 return inferer.get_freq() 

146 

147 elif is_numeric_dtype(index.dtype): 

148 raise TypeError( 

149 f"cannot infer freq from a non-convertible index of dtype {index.dtype}" 

150 ) 

151 

152 if not isinstance(index, DatetimeIndex): 

153 index = DatetimeIndex(index) 

154 

155 inferer = _FrequencyInferer(index) 

156 return inferer.get_freq() 

157 

158 

class _FrequencyInferer:
    """
    Infer a frequency string from the i8 (epoch-integer) values of a
    datetime-like index.

    Not sure if I can avoid the state machine here.
    """

    def __init__(self, index) -> None:
        self.index = index
        # int64 view of the index values (epoch units at the index's resolution)
        self.i8values = index.asi8

        # For get_unit_from_dtype we need the dtype to the underlying ndarray,
        # which for tz-aware is not the same as index.dtype
        if isinstance(index, ABCIndex):
            # error: Item "ndarray[Any, Any]" of "Union[ExtensionArray,
            # ndarray[Any, Any]]" has no attribute "_ndarray"
            self._creso = get_unit_from_dtype(
                index._data._ndarray.dtype  # type: ignore[union-attr]
            )
        else:
            # otherwise we have DTA/TDA
            self._creso = get_unit_from_dtype(index._ndarray.dtype)

        # This moves the values, which are implicitly in UTC, to the
        # the timezone so they are in local time
        if hasattr(index, "tz"):
            if index.tz is not None:
                self.i8values = tz_convert_from_utc(
                    self.i8values, index.tz, reso=self._creso
                )

        # Fewer than 3 points cannot distinguish a frequency from coincidence.
        if len(index) < 3:
            raise ValueError("Need at least 3 dates to infer frequency")

        self.is_monotonic = (
            self.index._is_monotonic_increasing or self.index._is_monotonic_decreasing
        )

    @cache_readonly
    def deltas(self) -> npt.NDArray[np.int64]:
        # Unique consecutive differences of the (possibly tz-localized) values.
        return unique_deltas(self.i8values)

    @cache_readonly
    def deltas_asi8(self) -> npt.NDArray[np.int64]:
        # NB: we cannot use self.i8values here because we may have converted
        # the tz in __init__
        return unique_deltas(self.index.asi8)

    @cache_readonly
    def is_unique(self) -> bool:
        # True when all consecutive spacings are identical (localized values).
        return len(self.deltas) == 1

    @cache_readonly
    def is_unique_asi8(self) -> bool:
        # Same uniqueness check on the raw UTC-based values.
        return len(self.deltas_asi8) == 1

    def get_freq(self) -> str | None:
        """
        Find the appropriate frequency string to describe the inferred
        frequency of self.i8values

        Returns
        -------
        str or None
        """
        # A frequency only makes sense for a strictly monotonic, unique index.
        if not self.is_monotonic or not self.index._is_unique:
            return None

        delta = self.deltas[0]
        ppd = periods_per_day(self._creso)
        if delta and _is_multiple(delta, ppd):
            # Spacing is a whole number of days -> daily or coarser rule.
            return self._infer_daily_rule()

        # Business hourly, maybe. 17: one day / 65: one weekend
        if self.hour_deltas in ([1, 17], [1, 65], [1, 17, 65]):
            return "bh"

        # Possibly intraday frequency. Here we use the
        # original .asi8 values as the modified values
        # will not work around DST transitions. See #8772
        if not self.is_unique_asi8:
            return None

        delta = self.deltas_asi8[0]
        # Periods per hour / minute / second at this resolution.
        pph = ppd // 24
        ppm = pph // 60
        pps = ppm // 60
        if _is_multiple(delta, pph):
            # Hours
            return _maybe_add_count("h", delta / pph)
        elif _is_multiple(delta, ppm):
            # Minutes
            return _maybe_add_count("min", delta / ppm)
        elif _is_multiple(delta, pps):
            # Seconds
            return _maybe_add_count("s", delta / pps)
        elif _is_multiple(delta, (pps // 1000)):
            # Milliseconds
            return _maybe_add_count("ms", delta / (pps // 1000))
        elif _is_multiple(delta, (pps // 1_000_000)):
            # Microseconds
            return _maybe_add_count("us", delta / (pps // 1_000_000))
        else:
            # Nanoseconds
            return _maybe_add_count("ns", delta)

    @cache_readonly
    def day_deltas(self) -> list[float]:
        # Deltas expressed in days; true division can yield fractions,
        # hence list[float] rather than list[int].
        ppd = periods_per_day(self._creso)
        return [x / ppd for x in self.deltas]

    @cache_readonly
    def hour_deltas(self) -> list[float]:
        # Deltas expressed in hours (may be fractional).
        pph = periods_per_day(self._creso) // 24
        return [x / pph for x in self.deltas]

    @cache_readonly
    def fields(self) -> np.ndarray:  # structured array of fields
        return build_field_sarray(self.i8values, reso=self._creso)

    @cache_readonly
    def rep_stamp(self) -> Timestamp:
        # Representative timestamp: the first value of the index.
        return Timestamp(self.i8values[0], unit=self.index.unit)

    def month_position_check(self) -> str | None:
        # Classify whether dates sit at calendar/business month starts or ends.
        return month_position_check(self.fields, self.index.dayofweek)

    @cache_readonly
    def mdiffs(self) -> npt.NDArray[np.int64]:
        # Unique month-count differences between consecutive dates.
        nmonths = self.fields["Y"] * 12 + self.fields["M"]
        return unique_deltas(nmonths.astype("i8"))

    @cache_readonly
    def ydiffs(self) -> npt.NDArray[np.int64]:
        # Unique year differences between consecutive dates.
        return unique_deltas(self.fields["Y"].astype("i8"))

    def _infer_daily_rule(self) -> str | None:
        # Try calendar rules from coarsest to finest: annual, quarterly,
        # monthly, then daily/weekly, business-daily, and week-of-month.
        annual_rule = self._get_annual_rule()
        if annual_rule:
            nyears = self.ydiffs[0]
            month = MONTH_ALIASES[self.rep_stamp.month]
            alias = f"{annual_rule}-{month}"
            return _maybe_add_count(alias, nyears)

        quarterly_rule = self._get_quarterly_rule()
        if quarterly_rule:
            nquarters = self.mdiffs[0] / 3
            # Map month-of-quarter (month % 3) to the quarter-ending month alias.
            mod_dict = {0: 12, 2: 11, 1: 10}
            month = MONTH_ALIASES[mod_dict[self.rep_stamp.month % 3]]
            alias = f"{quarterly_rule}-{month}"
            return _maybe_add_count(alias, nquarters)

        monthly_rule = self._get_monthly_rule()
        if monthly_rule:
            return _maybe_add_count(monthly_rule, self.mdiffs[0])

        if self.is_unique:
            return self._get_daily_rule()

        if self._is_business_daily():
            return "B"

        wom_rule = self._get_wom_rule()
        if wom_rule:
            return wom_rule

        return None

    def _get_daily_rule(self) -> str | None:
        # Uniform spacing: a multiple of 7 days is weekly, else plain daily.
        ppd = periods_per_day(self._creso)
        days = self.deltas[0] / ppd
        if days % 7 == 0:
            # Weekly
            wd = int_to_weekday[self.rep_stamp.weekday()]
            alias = f"W-{wd}"
            return _maybe_add_count(alias, days / 7)
        else:
            return _maybe_add_count("D", days)

    def _get_annual_rule(self) -> str | None:
        # Annual requires a single year-step and a single calendar month.
        if len(self.ydiffs) > 1:
            return None

        if len(unique(self.fields["M"])) > 1:
            return None

        pos_check = self.month_position_check()

        if pos_check is None:
            return None
        else:
            return {"cs": "YS", "bs": "BYS", "ce": "YE", "be": "BYE"}.get(pos_check)

    def _get_quarterly_rule(self) -> str | None:
        # Quarterly requires one month-step that is a multiple of 3.
        if len(self.mdiffs) > 1:
            return None

        if not self.mdiffs[0] % 3 == 0:
            return None

        pos_check = self.month_position_check()

        if pos_check is None:
            return None
        else:
            return {"cs": "QS", "bs": "BQS", "ce": "QE", "be": "BQE"}.get(pos_check)

    def _get_monthly_rule(self) -> str | None:
        # Monthly requires a single month-step between consecutive dates.
        if len(self.mdiffs) > 1:
            return None
        pos_check = self.month_position_check()

        if pos_check is None:
            return None
        else:
            return {"cs": "MS", "bs": "BMS", "ce": "ME", "be": "BME"}.get(pos_check)

    def _is_business_daily(self) -> bool:
        # quick check: cannot be business daily
        if self.day_deltas != [1, 3]:
            return False

        # probably business daily, but need to confirm
        first_weekday = self.index[0].weekday()
        shifts = np.diff(self.i8values)
        ppd = periods_per_day(self._creso)
        shifts = np.floor_divide(shifts, ppd)
        weekdays = np.mod(first_weekday + np.cumsum(shifts), 7)

        # Valid iff every 3-day gap lands on a Monday (weekend skip) and
        # every 1-day gap lands on Tue-Fri.
        return bool(
            np.all(
                ((weekdays == 0) & (shifts == 3))
                | ((weekdays > 0) & (weekdays <= 4) & (shifts == 1))
            )
        )

    def _get_wom_rule(self) -> str | None:
        # Week-of-month: same weekday and same week-of-month for every date.
        weekdays = unique(self.index.weekday)
        if len(weekdays) > 1:
            return None

        week_of_months = unique((self.index.day - 1) // 7)
        # Only attempt to infer up to WOM-4. See #9425
        week_of_months = week_of_months[week_of_months < 4]
        if len(week_of_months) == 0 or len(week_of_months) > 1:
            return None

        # get which week
        week = week_of_months[0] + 1
        wd = int_to_weekday[weekdays[0]]

        return f"WOM-{week}{wd}"

409 

410 

class _TimedeltaFrequencyInferer(_FrequencyInferer):
    """Frequency inferer for timedelta-like values (no calendar structure)."""

    def _infer_daily_rule(self):
        # Timedeltas carry no annual/quarterly/monthly anchoring, so only a
        # uniform daily/weekly spacing can be reported.
        if not self.is_unique:
            return None
        return self._get_daily_rule()

415 

416 

417def _is_multiple(us, mult: int) -> bool: 

418 return us % mult == 0 

419 

420 

421def _maybe_add_count(base: str, count: float) -> str: 

422 if count != 1: 

423 assert count == int(count) 

424 count = int(count) 

425 return f"{count}{base}" 

426 else: 

427 return base 

428 

429 

430# ---------------------------------------------------------------------- 

431# Frequency comparison 

432 

433 

434def is_subperiod(source, target) -> bool: 

435 """ 

436 Returns True if downsampling is possible between source and target 

437 frequencies 

438 

439 Parameters 

440 ---------- 

441 source : str or DateOffset 

442 Frequency converting from 

443 target : str or DateOffset 

444 Frequency converting to 

445 

446 Returns 

447 ------- 

448 bool 

449 """ 

450 if target is None or source is None: 

451 return False 

452 source = _maybe_coerce_freq(source) 

453 target = _maybe_coerce_freq(target) 

454 

455 if _is_annual(target): 

456 if _is_quarterly(source): 

457 return _quarter_months_conform( 

458 get_rule_month(source), get_rule_month(target) 

459 ) 

460 return source in {"D", "C", "B", "M", "h", "min", "s", "ms", "us", "ns"} 

461 elif _is_quarterly(target): 

462 return source in {"D", "C", "B", "M", "h", "min", "s", "ms", "us", "ns"} 

463 elif _is_monthly(target): 

464 return source in {"D", "C", "B", "h", "min", "s", "ms", "us", "ns"} 

465 elif _is_weekly(target): 

466 return source in {target, "D", "C", "B", "h", "min", "s", "ms", "us", "ns"} 

467 elif target == "B": 

468 return source in {"B", "h", "min", "s", "ms", "us", "ns"} 

469 elif target == "C": 

470 return source in {"C", "h", "min", "s", "ms", "us", "ns"} 

471 elif target == "D": 

472 return source in {"D", "h", "min", "s", "ms", "us", "ns"} 

473 elif target == "h": 

474 return source in {"h", "min", "s", "ms", "us", "ns"} 

475 elif target == "min": 

476 return source in {"min", "s", "ms", "us", "ns"} 

477 elif target == "s": 

478 return source in {"s", "ms", "us", "ns"} 

479 elif target == "ms": 

480 return source in {"ms", "us", "ns"} 

481 elif target == "us": 

482 return source in {"us", "ns"} 

483 elif target == "ns": 

484 return source in {"ns"} 

485 else: 

486 return False 

487 

488 

489def is_superperiod(source, target) -> bool: 

490 """ 

491 Returns True if upsampling is possible between source and target 

492 frequencies 

493 

494 Parameters 

495 ---------- 

496 source : str or DateOffset 

497 Frequency converting from 

498 target : str or DateOffset 

499 Frequency converting to 

500 

501 Returns 

502 ------- 

503 bool 

504 """ 

505 if target is None or source is None: 

506 return False 

507 source = _maybe_coerce_freq(source) 

508 target = _maybe_coerce_freq(target) 

509 

510 if _is_annual(source): 

511 if _is_annual(target): 

512 return get_rule_month(source) == get_rule_month(target) 

513 

514 if _is_quarterly(target): 

515 smonth = get_rule_month(source) 

516 tmonth = get_rule_month(target) 

517 return _quarter_months_conform(smonth, tmonth) 

518 return target in {"D", "C", "B", "M", "h", "min", "s", "ms", "us", "ns"} 

519 elif _is_quarterly(source): 

520 return target in {"D", "C", "B", "M", "h", "min", "s", "ms", "us", "ns"} 

521 elif _is_monthly(source): 

522 return target in {"D", "C", "B", "h", "min", "s", "ms", "us", "ns"} 

523 elif _is_weekly(source): 

524 return target in {source, "D", "C", "B", "h", "min", "s", "ms", "us", "ns"} 

525 elif source == "B": 

526 return target in {"D", "C", "B", "h", "min", "s", "ms", "us", "ns"} 

527 elif source == "C": 

528 return target in {"D", "C", "B", "h", "min", "s", "ms", "us", "ns"} 

529 elif source == "D": 

530 return target in {"D", "C", "B", "h", "min", "s", "ms", "us", "ns"} 

531 elif source == "h": 

532 return target in {"h", "min", "s", "ms", "us", "ns"} 

533 elif source == "min": 

534 return target in {"min", "s", "ms", "us", "ns"} 

535 elif source == "s": 

536 return target in {"s", "ms", "us", "ns"} 

537 elif source == "ms": 

538 return target in {"ms", "us", "ns"} 

539 elif source == "us": 

540 return target in {"us", "ns"} 

541 elif source == "ns": 

542 return target in {"ns"} 

543 else: 

544 return False 

545 

546 

547def _maybe_coerce_freq(code) -> str: 

548 """we might need to coerce a code to a rule_code 

549 and uppercase it 

550 

551 Parameters 

552 ---------- 

553 source : str or DateOffset 

554 Frequency converting from 

555 

556 Returns 

557 ------- 

558 str 

559 """ 

560 assert code is not None 

561 if isinstance(code, DateOffset): 

562 code = freq_to_period_freqstr(1, code.name) 

563 if code in {"h", "min", "s", "ms", "us", "ns"}: 

564 return code 

565 else: 

566 return code.upper() 

567 

568 

569def _quarter_months_conform(source: str, target: str) -> bool: 

570 snum = MONTH_NUMBERS[source] 

571 tnum = MONTH_NUMBERS[target] 

572 return snum % 3 == tnum % 3 

573 

574 

575def _is_annual(rule: str) -> bool: 

576 rule = rule.upper() 

577 return rule == "Y" or rule.startswith("Y-") 

578 

579 

580def _is_quarterly(rule: str) -> bool: 

581 rule = rule.upper() 

582 return rule == "Q" or rule.startswith(("Q-", "BQ")) 

583 

584 

585def _is_monthly(rule: str) -> bool: 

586 rule = rule.upper() 

587 return rule in ("M", "BM") 

588 

589 

590def _is_weekly(rule: str) -> bool: 

591 rule = rule.upper() 

592 return rule == "W" or rule.startswith("W-") 

593 

594 

# Names re-exported as the public API of this module.
__all__ = [
    "Day",
    "get_period_alias",
    "infer_freq",
    "is_subperiod",
    "is_superperiod",
    "to_offset",
]