Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/pandas/tseries/frequencies.py: 23%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

307 statements  

1from __future__ import annotations 

2 

3import numpy as np 

4 

5from pandas._libs.algos import unique_deltas 

6from pandas._libs.tslibs import ( 

7 Timestamp, 

8 get_unit_from_dtype, 

9 periods_per_day, 

10 tz_convert_from_utc, 

11) 

12from pandas._libs.tslibs.ccalendar import ( 

13 DAYS, 

14 MONTH_ALIASES, 

15 MONTH_NUMBERS, 

16 MONTHS, 

17 int_to_weekday, 

18) 

19from pandas._libs.tslibs.fields import ( 

20 build_field_sarray, 

21 month_position_check, 

22) 

23from pandas._libs.tslibs.offsets import ( 

24 DateOffset, 

25 Day, 

26 to_offset, 

27) 

28from pandas._libs.tslibs.parsing import get_rule_month 

29from pandas._typing import npt 

30from pandas.util._decorators import cache_readonly 

31 

32from pandas.core.dtypes.common import ( 

33 is_datetime64_dtype, 

34 is_numeric_dtype, 

35 is_period_dtype, 

36 is_timedelta64_dtype, 

37) 

38from pandas.core.dtypes.generic import ( 

39 ABCIndex, 

40 ABCSeries, 

41) 

42 

43from pandas.core.algorithms import unique 

44 

45# --------------------------------------------------------------------- 

46# Offset names ("time rules") and related functions 

47 

# Mapping from offset rule codes to their closest period alias (BQ -> Q,
# EOM -> M, ...).  Anchored variants are added below.
_offset_to_period_map = {
    "WEEKDAY": "D",
    "EOM": "M",
    "BM": "M",
    "BQS": "Q",
    "QS": "Q",
    "BQ": "Q",
    "BA": "A",
    "AS": "A",
    "BAS": "A",
    "MS": "M",
    "D": "D",
    "C": "C",
    "B": "B",
    "T": "T",
    "S": "S",
    "L": "L",
    "U": "U",
    "N": "N",
    "H": "H",
    "Q": "Q",
    "A": "A",
    "W": "W",
    "M": "M",
    "Y": "A",
    "BY": "A",
    "YS": "A",
    "BYS": "A",
}

_need_suffix = ["QS", "BQ", "BQS", "YS", "AS", "BY", "BA", "BYS", "BAS"]

# Month-anchored variants (e.g. "QS-JAN") map to the same period alias as
# their unanchored prefix.
for _prefix in _need_suffix:
    _offset_to_period_map.update(
        {f"{_prefix}-{_m}": _offset_to_period_map[_prefix] for _m in MONTHS}
    )

# Month-anchored annual/quarterly aliases and weekday-anchored weekly
# aliases map to themselves.
_offset_to_period_map.update(
    {f"{_p}-{_m}": f"{_p}-{_m}" for _p in ("A", "Q") for _m in MONTHS}
)
_offset_to_period_map.update({f"W-{_d}": f"W-{_d}" for _d in DAYS})

92 

93 

def get_period_alias(offset_str: str) -> str | None:
    """
    Alias to closest period strings BQ->Q etc.
    """
    # dict.get returns None by default when the alias is unknown.
    return _offset_to_period_map.get(offset_str)

99 

100 

101# --------------------------------------------------------------------- 

102# Period codes 

103 

104 

def infer_freq(index) -> str | None:
    """
    Infer the most likely frequency given the input index.

    Parameters
    ----------
    index : DatetimeIndex or TimedeltaIndex
      If passed a Series will use the values of the series (NOT THE INDEX).

    Returns
    -------
    str or None
        None if no discernible frequency.

    Raises
    ------
    TypeError
        If the index is not datetime-like.
    ValueError
        If there are fewer than three values.

    Examples
    --------
    >>> idx = pd.date_range(start='2020/12/01', end='2020/12/30', periods=30)
    >>> pd.infer_freq(idx)
    'D'
    """
    # Imported locally rather than at module level, presumably to avoid an
    # import cycle with pandas.core — TODO confirm.
    from pandas.core.api import (
        DatetimeIndex,
        Index,
    )

    if isinstance(index, ABCSeries):
        # Use the Series values, never its index.
        values = index._values
        if not (
            is_datetime64_dtype(values)
            or is_timedelta64_dtype(values)
            or values.dtype == object
        ):
            raise TypeError(
                "cannot infer freq from a non-convertible dtype "
                f"on a Series of {index.dtype}"
            )
        index = values

    inferer: _FrequencyInferer

    if not hasattr(index, "dtype"):
        # No dtype (e.g. a plain list-like): fall through to the
        # DatetimeIndex conversion below.
        pass
    elif is_period_dtype(index.dtype):
        raise TypeError(
            "PeriodIndex given. Check the `freq` attribute "
            "instead of using infer_freq."
        )
    elif is_timedelta64_dtype(index.dtype):
        # Allow TimedeltaIndex and TimedeltaArray
        inferer = _TimedeltaFrequencyInferer(index)
        return inferer.get_freq()

    if isinstance(index, Index) and not isinstance(index, DatetimeIndex):
        if is_numeric_dtype(index):
            raise TypeError(
                f"cannot infer freq from a non-convertible index of dtype {index.dtype}"
            )
        index = index._values

    # Everything remaining is coerced to a DatetimeIndex for inference.
    if not isinstance(index, DatetimeIndex):
        index = DatetimeIndex(index)

    inferer = _FrequencyInferer(index)
    return inferer.get_freq()

176 

177 

class _FrequencyInferer:
    """
    Infer the frequency of a datetime-like index.

    Works on the int64 representation of the values, trying rules from
    coarsest (annual) to finest (nanoseconds).
    """

    def __init__(self, index) -> None:
        self.index = index
        # int64 representation of the values (unit given by self._creso).
        self.i8values = index.asi8

        # For get_unit_from_dtype we need the dtype to the underlying ndarray,
        # which for tz-aware is not the same as index.dtype
        if isinstance(index, ABCIndex):
            # error: Item "ndarray[Any, Any]" of "Union[ExtensionArray,
            # ndarray[Any, Any]]" has no attribute "_ndarray"
            self._creso = get_unit_from_dtype(
                index._data._ndarray.dtype  # type: ignore[union-attr]
            )
        else:
            # otherwise we have DTA/TDA
            self._creso = get_unit_from_dtype(index._ndarray.dtype)

        # This moves the values, which are implicitly in UTC, to the
        # the timezone so they are in local time
        if hasattr(index, "tz"):
            if index.tz is not None:
                self.i8values = tz_convert_from_utc(
                    self.i8values, index.tz, reso=self._creso
                )

        if len(index) < 3:
            raise ValueError("Need at least 3 dates to infer frequency")

        # Monotonic in either direction is acceptable for inference.
        self.is_monotonic = (
            self.index._is_monotonic_increasing or self.index._is_monotonic_decreasing
        )

    @cache_readonly
    def deltas(self) -> npt.NDArray[np.int64]:
        # Unique consecutive differences of the (possibly tz-localized) values.
        return unique_deltas(self.i8values)

    @cache_readonly
    def deltas_asi8(self) -> npt.NDArray[np.int64]:
        # NB: we cannot use self.i8values here because we may have converted
        # the tz in __init__
        return unique_deltas(self.index.asi8)

    @cache_readonly
    def is_unique(self) -> bool:
        # True when all consecutive spacings are identical.
        return len(self.deltas) == 1

    @cache_readonly
    def is_unique_asi8(self) -> bool:
        # Same check on the unconverted (UTC) values.
        return len(self.deltas_asi8) == 1

    def get_freq(self) -> str | None:
        """
        Find the appropriate frequency string to describe the inferred
        frequency of self.i8values

        Returns
        -------
        str or None
        """
        if not self.is_monotonic or not self.index._is_unique:
            return None

        delta = self.deltas[0]
        ppd = periods_per_day(self._creso)
        if delta and _is_multiple(delta, ppd):
            # Spacing is a whole number of days: daily or coarser rule.
            return self._infer_daily_rule()

        # Business hourly, maybe. 17: one day / 65: one weekend
        if self.hour_deltas in ([1, 17], [1, 65], [1, 17, 65]):
            return "BH"

        # Possibly intraday frequency. Here we use the
        # original .asi8 values as the modified values
        # will not work around DST transitions. See #8772
        if not self.is_unique_asi8:
            return None

        delta = self.deltas_asi8[0]
        # periods per hour / minute / second, derived from periods per day.
        pph = ppd // 24
        ppm = pph // 60
        pps = ppm // 60
        if _is_multiple(delta, pph):
            # Hours
            return _maybe_add_count("H", delta / pph)
        elif _is_multiple(delta, ppm):
            # Minutes
            return _maybe_add_count("T", delta / ppm)
        elif _is_multiple(delta, pps):
            # Seconds
            return _maybe_add_count("S", delta / pps)
        elif _is_multiple(delta, (pps // 1000)):
            # Milliseconds
            return _maybe_add_count("L", delta / (pps // 1000))
        elif _is_multiple(delta, (pps // 1_000_000)):
            # Microseconds
            return _maybe_add_count("U", delta / (pps // 1_000_000))
        else:
            # Nanoseconds
            return _maybe_add_count("N", delta)

    @cache_readonly
    def day_deltas(self) -> list[int]:
        # Unique spacings expressed in days.
        # NOTE(review): true division yields floats despite the list[int]
        # annotation; comparisons against int literals still hold.
        ppd = periods_per_day(self._creso)
        return [x / ppd for x in self.deltas]

    @cache_readonly
    def hour_deltas(self) -> list[int]:
        # Unique spacings expressed in hours (same float caveat as day_deltas).
        pph = periods_per_day(self._creso) // 24
        return [x / pph for x in self.deltas]

    @cache_readonly
    def fields(self) -> np.ndarray:  # structured array of fields
        return build_field_sarray(self.i8values, reso=self._creso)

    @cache_readonly
    def rep_stamp(self) -> Timestamp:
        # Representative (first) timestamp; anchors month/weekday aliases.
        return Timestamp(self.i8values[0])

    def month_position_check(self) -> str | None:
        # "cs"/"bs"/"ce"/"be": calendar/business start/end position.
        return month_position_check(self.fields, self.index.dayofweek)

    @cache_readonly
    def mdiffs(self) -> npt.NDArray[np.int64]:
        # Unique month-count differences between consecutive values.
        nmonths = self.fields["Y"] * 12 + self.fields["M"]
        return unique_deltas(nmonths.astype("i8"))

    @cache_readonly
    def ydiffs(self) -> npt.NDArray[np.int64]:
        # Unique year differences between consecutive values.
        return unique_deltas(self.fields["Y"].astype("i8"))

    def _infer_daily_rule(self) -> str | None:
        # Try rules from coarsest to finest: annual, quarterly, monthly,
        # then daily/weekly, business-daily, and week-of-month.
        annual_rule = self._get_annual_rule()
        if annual_rule:
            nyears = self.ydiffs[0]
            month = MONTH_ALIASES[self.rep_stamp.month]
            alias = f"{annual_rule}-{month}"
            return _maybe_add_count(alias, nyears)

        quarterly_rule = self._get_quarterly_rule()
        if quarterly_rule:
            nquarters = self.mdiffs[0] / 3
            # Map rep_stamp.month % 3 to the anchor month of the cycle.
            mod_dict = {0: 12, 2: 11, 1: 10}
            month = MONTH_ALIASES[mod_dict[self.rep_stamp.month % 3]]
            alias = f"{quarterly_rule}-{month}"
            return _maybe_add_count(alias, nquarters)

        monthly_rule = self._get_monthly_rule()
        if monthly_rule:
            return _maybe_add_count(monthly_rule, self.mdiffs[0])

        if self.is_unique:
            return self._get_daily_rule()

        if self._is_business_daily():
            return "B"

        wom_rule = self._get_wom_rule()
        if wom_rule:
            return wom_rule

        return None

    def _get_daily_rule(self) -> str | None:
        # Requires a single unique delta (checked by the caller).
        ppd = periods_per_day(self._creso)
        days = self.deltas[0] / ppd
        if days % 7 == 0:
            # Weekly
            wd = int_to_weekday[self.rep_stamp.weekday()]
            alias = f"W-{wd}"
            return _maybe_add_count(alias, days / 7)
        else:
            return _maybe_add_count("D", days)

    def _get_annual_rule(self) -> str | None:
        if len(self.ydiffs) > 1:
            return None

        # All values must share the same month.
        if len(unique(self.fields["M"])) > 1:
            return None

        pos_check = self.month_position_check()

        if pos_check is None:
            return None
        else:
            return {"cs": "AS", "bs": "BAS", "ce": "A", "be": "BA"}.get(pos_check)

    def _get_quarterly_rule(self) -> str | None:
        if len(self.mdiffs) > 1:
            return None

        # Spacing must be a whole number of quarters.
        if not self.mdiffs[0] % 3 == 0:
            return None

        pos_check = self.month_position_check()

        if pos_check is None:
            return None
        else:
            return {"cs": "QS", "bs": "BQS", "ce": "Q", "be": "BQ"}.get(pos_check)

    def _get_monthly_rule(self) -> str | None:
        if len(self.mdiffs) > 1:
            return None
        pos_check = self.month_position_check()

        if pos_check is None:
            return None
        else:
            return {"cs": "MS", "bs": "BMS", "ce": "M", "be": "BM"}.get(pos_check)

    def _is_business_daily(self) -> bool:
        # quick check: cannot be business daily
        if self.day_deltas != [1, 3]:
            return False

        # probably business daily, but need to confirm
        first_weekday = self.index[0].weekday()
        shifts = np.diff(self.i8values)
        ppd = periods_per_day(self._creso)
        shifts = np.floor_divide(shifts, ppd)
        weekdays = np.mod(first_weekday + np.cumsum(shifts), 7)

        # Each step is either Fri->Mon (3-day jump landing on Monday) or a
        # 1-day step landing on Tue..Fri.
        return bool(
            np.all(
                ((weekdays == 0) & (shifts == 3))
                | ((weekdays > 0) & (weekdays <= 4) & (shifts == 1))
            )
        )

    def _get_wom_rule(self) -> str | None:
        # Week-of-month rule (e.g. "WOM-3FRI"): all values share a single
        # weekday and a single week-of-month position.
        weekdays = unique(self.index.weekday)
        if len(weekdays) > 1:
            return None

        week_of_months = unique((self.index.day - 1) // 7)
        # Only attempt to infer up to WOM-4. See #9425
        week_of_months = week_of_months[week_of_months < 4]
        if len(week_of_months) == 0 or len(week_of_months) > 1:
            return None

        # get which week
        week = week_of_months[0] + 1
        wd = int_to_weekday[weekdays[0]]

        return f"WOM-{week}{wd}"

428 

429 

class _TimedeltaFrequencyInferer(_FrequencyInferer):
    """
    Frequency inferer for timedelta-like values.

    Timedeltas carry no calendar, so only an evenly-spaced daily-style
    rule can be inferred; the calendar-based rules of the parent class
    (annual/quarterly/monthly/business) are skipped.
    """

    def _infer_daily_rule(self):
        # A rule exists only when every consecutive delta is identical.
        if self.is_unique:
            return self._get_daily_rule()
        return None

434 

435 

436def _is_multiple(us, mult: int) -> bool: 

437 return us % mult == 0 

438 

439 

440def _maybe_add_count(base: str, count: float) -> str: 

441 if count != 1: 

442 assert count == int(count) 

443 count = int(count) 

444 return f"{count}{base}" 

445 else: 

446 return base 

447 

448 

449# ---------------------------------------------------------------------- 

450# Frequency comparison 

451 

452 

def is_subperiod(source, target) -> bool:
    """
    Returns True if downsampling is possible between source and target
    frequencies

    Parameters
    ----------
    source : str or DateOffset
        Frequency converting from
    target : str or DateOffset
        Frequency converting to

    Returns
    -------
    bool
    """
    if target is None or source is None:
        return False
    source = _maybe_coerce_freq(source)
    target = _maybe_coerce_freq(target)

    # Calendar-anchored targets need special handling.
    if _is_annual(target):
        if _is_quarterly(source):
            # Quarterly -> annual only when the quarter cycle lines up.
            return _quarter_months_conform(
                get_rule_month(source), get_rule_month(target)
            )
        return source in {"D", "C", "B", "M", "H", "T", "S", "L", "U", "N"}
    if _is_quarterly(target):
        return source in {"D", "C", "B", "M", "H", "T", "S", "L", "U", "N"}
    if _is_monthly(target):
        return source in {"D", "C", "B", "H", "T", "S", "L", "U", "N"}
    if _is_weekly(target):
        return source in {target, "D", "C", "B", "H", "T", "S", "L", "U", "N"}

    # Fixed-frequency targets: each maps to a static set of valid sources.
    valid_sources = {
        "B": {"B", "H", "T", "S", "L", "U", "N"},
        "C": {"C", "H", "T", "S", "L", "U", "N"},
        "D": {"D", "H", "T", "S", "L", "U", "N"},
        "H": {"H", "T", "S", "L", "U", "N"},
        "T": {"T", "S", "L", "U", "N"},
        "S": {"S", "L", "U", "N"},
        "L": {"L", "U", "N"},
        "U": {"U", "N"},
        "N": {"N"},
    }
    allowed = valid_sources.get(target)
    return allowed is not None and source in allowed

507 

508 

def is_superperiod(source, target) -> bool:
    """
    Returns True if upsampling is possible between source and target
    frequencies

    Parameters
    ----------
    source : str or DateOffset
        Frequency converting from
    target : str or DateOffset
        Frequency converting to

    Returns
    -------
    bool
    """
    if target is None or source is None:
        return False
    source = _maybe_coerce_freq(source)
    target = _maybe_coerce_freq(target)

    # Calendar-anchored sources need special handling.
    if _is_annual(source):
        if _is_annual(target):
            # Annual -> annual only with the same anchor month.
            return get_rule_month(source) == get_rule_month(target)
        if _is_quarterly(target):
            # Annual -> quarterly only when the quarter cycle lines up.
            return _quarter_months_conform(
                get_rule_month(source), get_rule_month(target)
            )
        return target in {"D", "C", "B", "M", "H", "T", "S", "L", "U", "N"}
    if _is_quarterly(source):
        return target in {"D", "C", "B", "M", "H", "T", "S", "L", "U", "N"}
    if _is_monthly(source):
        return target in {"D", "C", "B", "H", "T", "S", "L", "U", "N"}
    if _is_weekly(source):
        return target in {source, "D", "C", "B", "H", "T", "S", "L", "U", "N"}

    # Fixed-frequency sources: each maps to a static set of valid targets.
    valid_targets = {
        "B": {"D", "C", "B", "H", "T", "S", "L", "U", "N"},
        "C": {"D", "C", "B", "H", "T", "S", "L", "U", "N"},
        "D": {"D", "C", "B", "H", "T", "S", "L", "U", "N"},
        "H": {"H", "T", "S", "L", "U", "N"},
        "T": {"T", "S", "L", "U", "N"},
        "S": {"S", "L", "U", "N"},
        "L": {"L", "U", "N"},
        "U": {"U", "N"},
        "N": {"N"},
    }
    allowed = valid_targets.get(source)
    return allowed is not None and target in allowed

565 

566 

def _maybe_coerce_freq(code) -> str:
    """
    Coerce a DateOffset to its rule_code (if needed) and uppercase it.

    Parameters
    ----------
    code : str or DateOffset
        Frequency converting from

    Returns
    -------
    str
    """
    assert code is not None
    rule = code.rule_code if isinstance(code, DateOffset) else code
    return rule.upper()

584 

585 

def _quarter_months_conform(source: str, target: str) -> bool:
    """True when the two anchor months belong to the same quarterly cycle."""
    # Months three apart anchor the same quarter cycle.
    return MONTH_NUMBERS[source] % 3 == MONTH_NUMBERS[target] % 3

590 

591 

592def _is_annual(rule: str) -> bool: 

593 rule = rule.upper() 

594 return rule == "A" or rule.startswith("A-") 

595 

596 

597def _is_quarterly(rule: str) -> bool: 

598 rule = rule.upper() 

599 return rule == "Q" or rule.startswith("Q-") or rule.startswith("BQ") 

600 

601 

602def _is_monthly(rule: str) -> bool: 

603 rule = rule.upper() 

604 return rule in ("M", "BM") 

605 

606 

607def _is_weekly(rule: str) -> bool: 

608 rule = rule.upper() 

609 return rule == "W" or rule.startswith("W-") 

610 

611 

# Public API of this module; "Day" and "to_offset" are re-exported from
# pandas._libs.tslibs.offsets.
__all__ = [
    "Day",
    "get_period_alias",
    "infer_freq",
    "is_subperiod",
    "is_superperiod",
    "to_offset",
]

619]