1from __future__ import annotations
2
3from typing import TYPE_CHECKING
4
5import numpy as np
6
7from pandas._libs import lib
8from pandas._libs.algos import unique_deltas
9from pandas._libs.tslibs import (
10 Timestamp,
11 get_unit_from_dtype,
12 periods_per_day,
13 tz_convert_from_utc,
14)
15from pandas._libs.tslibs.ccalendar import (
16 DAYS,
17 MONTH_ALIASES,
18 MONTH_NUMBERS,
19 MONTHS,
20 int_to_weekday,
21)
22from pandas._libs.tslibs.dtypes import (
23 OFFSET_TO_PERIOD_FREQSTR,
24 freq_to_period_freqstr,
25)
26from pandas._libs.tslibs.fields import (
27 build_field_sarray,
28 month_position_check,
29)
30from pandas._libs.tslibs.offsets import (
31 DateOffset,
32 Day,
33 to_offset,
34)
35from pandas._libs.tslibs.parsing import get_rule_month
36from pandas.util._decorators import cache_readonly
37
38from pandas.core.dtypes.common import is_numeric_dtype
39from pandas.core.dtypes.dtypes import (
40 DatetimeTZDtype,
41 PeriodDtype,
42)
43from pandas.core.dtypes.generic import (
44 ABCIndex,
45 ABCSeries,
46)
47
48from pandas.core.algorithms import unique
49
50if TYPE_CHECKING:
51 from pandas._typing import npt
52
53 from pandas import (
54 DatetimeIndex,
55 Series,
56 TimedeltaIndex,
57 )
58 from pandas.core.arrays.datetimelike import DatetimeLikeArrayMixin
59# --------------------------------------------------------------------
60# Offset related functions
61
# Offset prefixes that can carry a month suffix. Each "<prefix>-<month>"
# spelling is registered below with the same period alias as its bare prefix.
_need_suffix = ["QS", "BQE", "BQS", "YS", "BYE", "BYS"]

for _prefix in _need_suffix:
    for _m in MONTHS:
        key = f"{_prefix}-{_m}"
        OFFSET_TO_PERIOD_FREQSTR[key] = OFFSET_TO_PERIOD_FREQSTR[_prefix]

# Month-suffixed "Y" and "Q" aliases map to themselves.
for _prefix in ["Y", "Q"]:
    for _m in MONTHS:
        _alias = f"{_prefix}-{_m}"
        OFFSET_TO_PERIOD_FREQSTR[_alias] = _alias

# Day-suffixed weekly aliases ("W-<day>") map to themselves.
for _d in DAYS:
    OFFSET_TO_PERIOD_FREQSTR[f"W-{_d}"] = f"W-{_d}"
76
77
78def get_period_alias(offset_str: str) -> str | None:
79 """
80 Alias to closest period strings BQ->Q etc.
81 """
82 return OFFSET_TO_PERIOD_FREQSTR.get(offset_str, None)
83
84
85# ---------------------------------------------------------------------
86# Period codes
87
88
89def infer_freq(
90 index: DatetimeIndex | TimedeltaIndex | Series | DatetimeLikeArrayMixin,
91) -> str | None:
92 """
93 Infer the most likely frequency given the input index.
94
95 Parameters
96 ----------
97 index : DatetimeIndex, TimedeltaIndex, Series or array-like
98 If passed a Series will use the values of the series (NOT THE INDEX).
99
100 Returns
101 -------
102 str or None
103 None if no discernible frequency.
104
105 Raises
106 ------
107 TypeError
108 If the index is not datetime-like.
109 ValueError
110 If there are fewer than three values.
111
112 Examples
113 --------
114 >>> idx = pd.date_range(start='2020/12/01', end='2020/12/30', periods=30)
115 >>> pd.infer_freq(idx)
116 'D'
117 """
118 from pandas.core.api import DatetimeIndex
119
120 if isinstance(index, ABCSeries):
121 values = index._values
122 if not (
123 lib.is_np_dtype(values.dtype, "mM")
124 or isinstance(values.dtype, DatetimeTZDtype)
125 or values.dtype == object
126 ):
127 raise TypeError(
128 "cannot infer freq from a non-convertible dtype "
129 f"on a Series of {index.dtype}"
130 )
131 index = values
132
133 inferer: _FrequencyInferer
134
135 if not hasattr(index, "dtype"):
136 pass
137 elif isinstance(index.dtype, PeriodDtype):
138 raise TypeError(
139 "PeriodIndex given. Check the `freq` attribute "
140 "instead of using infer_freq."
141 )
142 elif lib.is_np_dtype(index.dtype, "m"):
143 # Allow TimedeltaIndex and TimedeltaArray
144 inferer = _TimedeltaFrequencyInferer(index)
145 return inferer.get_freq()
146
147 elif is_numeric_dtype(index.dtype):
148 raise TypeError(
149 f"cannot infer freq from a non-convertible index of dtype {index.dtype}"
150 )
151
152 if not isinstance(index, DatetimeIndex):
153 index = DatetimeIndex(index)
154
155 inferer = _FrequencyInferer(index)
156 return inferer.get_freq()
157
158
class _FrequencyInferer:
    """
    Infer a frequency alias from the spacing of a datetime-like index.

    The constructor stores the index, its integer (``asi8``) values shifted
    to local wall time when the index has a timezone, and the resolution of
    the backing array.  ``get_freq`` then walks through the candidate rules
    (annual, quarterly, monthly, daily/weekly, business daily, week-of-month,
    business hourly, intraday) and returns the first alias that fits.
    """

    def __init__(self, index) -> None:
        self.index = index
        self.i8values = index.asi8

        # get_unit_from_dtype needs the dtype of the backing ndarray, which
        # for tz-aware data is not the same as index.dtype.
        if isinstance(index, ABCIndex):
            # error: Item "ndarray[Any, Any]" of "Union[ExtensionArray,
            # ndarray[Any, Any]]" has no attribute "_ndarray"
            backing = index._data._ndarray  # type: ignore[union-attr]
        else:
            # otherwise we have DTA/TDA
            backing = index._ndarray
        self._creso = get_unit_from_dtype(backing.dtype)

        # The stored values are implicitly UTC; move them into the index's
        # timezone so the date fields reflect local time.
        if getattr(index, "tz", None) is not None:
            self.i8values = tz_convert_from_utc(
                self.i8values, index.tz, reso=self._creso
            )

        if len(index) < 3:
            raise ValueError("Need at least 3 dates to infer frequency")

        self.is_monotonic = (
            self.index._is_monotonic_increasing or self.index._is_monotonic_decreasing
        )

    @cache_readonly
    def deltas(self) -> npt.NDArray[np.int64]:
        """Unique deltas of the (possibly tz-shifted) integer values."""
        return unique_deltas(self.i8values)

    @cache_readonly
    def deltas_asi8(self) -> npt.NDArray[np.int64]:
        """Unique deltas of the index's original ``asi8`` values."""
        # NB: we cannot use self.i8values here because we may have converted
        # the tz in __init__
        return unique_deltas(self.index.asi8)

    @cache_readonly
    def is_unique(self) -> bool:
        """Whether the (possibly tz-shifted) values have a single delta."""
        return len(self.deltas) == 1

    @cache_readonly
    def is_unique_asi8(self) -> bool:
        """Whether the original ``asi8`` values have a single delta."""
        return len(self.deltas_asi8) == 1

    def get_freq(self) -> str | None:
        """
        Find the appropriate frequency string to describe the inferred
        frequency of self.i8values

        Returns
        -------
        str or None
            None when the index is not monotonic, contains duplicates, or
            matches none of the candidate rules.
        """
        if not (self.is_monotonic and self.index._is_unique):
            return None

        smallest = self.deltas[0]
        ppd = periods_per_day(self._creso)
        if smallest and _is_multiple(smallest, ppd):
            # Whole-day spacing: defer to the day-based rules.
            return self._infer_daily_rule()

        # Business hourly, maybe. 17: one day / 65: one weekend
        if self.hour_deltas in ([1, 17], [1, 65], [1, 17, 65]):
            return "bh"

        # Possibly intraday frequency. Here we use the
        # original .asi8 values as the modified values
        # will not work around DST transitions. See #8772
        if not self.is_unique_asi8:
            return None

        step = self.deltas_asi8[0]
        pph = ppd // 24
        ppm = pph // 60
        pps = ppm // 60
        # Coarsest unit first; the first unit that divides the step wins.
        units = (
            ("h", pph),
            ("min", ppm),
            ("s", pps),
            ("ms", pps // 1000),
            ("us", pps // 1_000_000),
        )
        for alias, per_unit in units:
            if _is_multiple(step, per_unit):
                return _maybe_add_count(alias, step / per_unit)
        # Nanoseconds
        return _maybe_add_count("ns", step)

    @cache_readonly
    def day_deltas(self) -> list[int]:
        """Unique deltas divided (true division) by the periods per day."""
        per_day = periods_per_day(self._creso)
        return [delta / per_day for delta in self.deltas]

    @cache_readonly
    def hour_deltas(self) -> list[int]:
        """Unique deltas divided (true division) by the periods per hour."""
        per_hour = periods_per_day(self._creso) // 24
        return [delta / per_hour for delta in self.deltas]

    @cache_readonly
    def fields(self) -> np.ndarray:  # structured array of fields
        """Structured array of date fields built from ``i8values``."""
        return build_field_sarray(self.i8values, reso=self._creso)

    @cache_readonly
    def rep_stamp(self) -> Timestamp:
        """Timestamp built from the first integer value, in the index's unit."""
        return Timestamp(self.i8values[0], unit=self.index.unit)

    def month_position_check(self) -> str | None:
        """Run the module-level ``month_position_check`` on this index."""
        return month_position_check(self.fields, self.index.dayofweek)

    @cache_readonly
    def mdiffs(self) -> npt.NDArray[np.int64]:
        """Unique deltas between consecutive values, counted in months."""
        month_ordinals = self.fields["Y"] * 12 + self.fields["M"]
        return unique_deltas(month_ordinals.astype("i8"))

    @cache_readonly
    def ydiffs(self) -> npt.NDArray[np.int64]:
        """Unique deltas between the year fields of consecutive values."""
        return unique_deltas(self.fields["Y"].astype("i8"))

    def _infer_daily_rule(self) -> str | None:
        """
        Try the day-based rules in order and return the first match.

        Order: annual, quarterly, monthly, plain daily/weekly, business
        daily, week-of-month.  Returns None when nothing matches.
        """
        annual_rule = self._get_annual_rule()
        if annual_rule:
            nyears = self.ydiffs[0]
            month = MONTH_ALIASES[self.rep_stamp.month]
            return _maybe_add_count(f"{annual_rule}-{month}", nyears)

        quarterly_rule = self._get_quarterly_rule()
        if quarterly_rule:
            nquarters = self.mdiffs[0] / 3
            # Map (month % 3) of the representative stamp to the month
            # number used in the alias suffix.
            suffix_month = {0: 12, 2: 11, 1: 10}
            month = MONTH_ALIASES[suffix_month[self.rep_stamp.month % 3]]
            return _maybe_add_count(f"{quarterly_rule}-{month}", nquarters)

        monthly_rule = self._get_monthly_rule()
        if monthly_rule:
            return _maybe_add_count(monthly_rule, self.mdiffs[0])

        if self.is_unique:
            return self._get_daily_rule()

        if self._is_business_daily():
            return "B"

        return self._get_wom_rule() or None

    def _get_daily_rule(self) -> str | None:
        """Return a "D" or "W-<day>" alias (with count) for uniform spacing."""
        ppd = periods_per_day(self._creso)
        days = self.deltas[0] / ppd
        if days % 7 != 0:
            return _maybe_add_count("D", days)

        # Weekly
        wd = int_to_weekday[self.rep_stamp.weekday()]
        return _maybe_add_count(f"W-{wd}", days / 7)

    def _get_annual_rule(self) -> str | None:
        """Return the annual alias prefix, or None if the data is not annual."""
        if len(self.ydiffs) > 1:
            return None

        # Annual data must stay within a single calendar month.
        if len(unique(self.fields["M"])) > 1:
            return None

        position = self.month_position_check()
        return {"cs": "YS", "bs": "BYS", "ce": "YE", "be": "BYE"}.get(position)

    def _get_quarterly_rule(self) -> str | None:
        """Return the quarterly alias prefix, or None if not quarterly."""
        if len(self.mdiffs) > 1:
            return None

        if self.mdiffs[0] % 3 != 0:
            return None

        position = self.month_position_check()
        return {"cs": "QS", "bs": "BQS", "ce": "QE", "be": "BQE"}.get(position)

    def _get_monthly_rule(self) -> str | None:
        """Return the monthly alias, or None if the data is not monthly."""
        if len(self.mdiffs) > 1:
            return None

        position = self.month_position_check()
        return {"cs": "MS", "bs": "BMS", "ce": "ME", "be": "BME"}.get(position)

    def _is_business_daily(self) -> bool:
        """Check whether the values step through weekdays only."""
        # quick check: cannot be business daily
        if self.day_deltas != [1, 3]:
            return False

        # probably business daily, but need to confirm
        start_weekday = self.index[0].weekday()
        ppd = periods_per_day(self._creso)
        day_steps = np.diff(self.i8values) // ppd
        landed_on = (start_weekday + np.cumsum(day_steps)) % 7

        # A 3-day step must land on weekday 0; a 1-day step on weekdays 1-4.
        over_weekend = (landed_on == 0) & (day_steps == 3)
        within_week = (landed_on > 0) & (landed_on <= 4) & (day_steps == 1)
        return bool(np.all(over_weekend | within_week))

    def _get_wom_rule(self) -> str | None:
        """Return a "WOM-<week><day>" alias, or None if it does not apply."""
        weekdays = unique(self.index.weekday)
        if len(weekdays) > 1:
            return None

        week_of_months = unique((self.index.day - 1) // 7)
        # Only attempt to infer up to WOM-4. See #9425
        week_of_months = week_of_months[week_of_months < 4]
        if len(week_of_months) != 1:
            return None

        # get which week
        week = week_of_months[0] + 1
        wd = int_to_weekday[weekdays[0]]

        return f"WOM-{week}{wd}"
409
410
class _TimedeltaFrequencyInferer(_FrequencyInferer):
    """
    Frequency inferer for timedelta data.

    Only the uniform daily/weekly rule is attempted for whole-day spacing;
    the calendar-based rules of the parent class are skipped.
    """

    def _infer_daily_rule(self):
        # Without a single unique delta there is no rule to report.
        return self._get_daily_rule() if self.is_unique else None
415
416
def _is_multiple(us, mult: int) -> bool:
    """Return whether ``us`` is evenly divisible by ``mult``."""
    remainder = us % mult
    return remainder == 0
419
420
def _maybe_add_count(base: str, count: float) -> str:
    """
    Prefix ``base`` with ``count`` unless the count is exactly one.

    Parameters
    ----------
    base : str
        Frequency alias.
    count : float
        Multiple of the alias; must be integral when it is not 1.

    Returns
    -------
    str
        ``base`` itself when ``count == 1``, otherwise ``"<count><base>"``
        with the count rendered as an integer.
    """
    if count == 1:
        return base

    # Callers are expected to pass whole multiples only.
    assert count == int(count)
    count = int(count)
    return f"{count}{base}"
428
429
430# ----------------------------------------------------------------------
431# Frequency comparison
432
433
434def is_subperiod(source, target) -> bool:
435 """
436 Returns True if downsampling is possible between source and target
437 frequencies
438
439 Parameters
440 ----------
441 source : str or DateOffset
442 Frequency converting from
443 target : str or DateOffset
444 Frequency converting to
445
446 Returns
447 -------
448 bool
449 """
450 if target is None or source is None:
451 return False
452 source = _maybe_coerce_freq(source)
453 target = _maybe_coerce_freq(target)
454
455 if _is_annual(target):
456 if _is_quarterly(source):
457 return _quarter_months_conform(
458 get_rule_month(source), get_rule_month(target)
459 )
460 return source in {"D", "C", "B", "M", "h", "min", "s", "ms", "us", "ns"}
461 elif _is_quarterly(target):
462 return source in {"D", "C", "B", "M", "h", "min", "s", "ms", "us", "ns"}
463 elif _is_monthly(target):
464 return source in {"D", "C", "B", "h", "min", "s", "ms", "us", "ns"}
465 elif _is_weekly(target):
466 return source in {target, "D", "C", "B", "h", "min", "s", "ms", "us", "ns"}
467 elif target == "B":
468 return source in {"B", "h", "min", "s", "ms", "us", "ns"}
469 elif target == "C":
470 return source in {"C", "h", "min", "s", "ms", "us", "ns"}
471 elif target == "D":
472 return source in {"D", "h", "min", "s", "ms", "us", "ns"}
473 elif target == "h":
474 return source in {"h", "min", "s", "ms", "us", "ns"}
475 elif target == "min":
476 return source in {"min", "s", "ms", "us", "ns"}
477 elif target == "s":
478 return source in {"s", "ms", "us", "ns"}
479 elif target == "ms":
480 return source in {"ms", "us", "ns"}
481 elif target == "us":
482 return source in {"us", "ns"}
483 elif target == "ns":
484 return source in {"ns"}
485 else:
486 return False
487
488
489def is_superperiod(source, target) -> bool:
490 """
491 Returns True if upsampling is possible between source and target
492 frequencies
493
494 Parameters
495 ----------
496 source : str or DateOffset
497 Frequency converting from
498 target : str or DateOffset
499 Frequency converting to
500
501 Returns
502 -------
503 bool
504 """
505 if target is None or source is None:
506 return False
507 source = _maybe_coerce_freq(source)
508 target = _maybe_coerce_freq(target)
509
510 if _is_annual(source):
511 if _is_annual(target):
512 return get_rule_month(source) == get_rule_month(target)
513
514 if _is_quarterly(target):
515 smonth = get_rule_month(source)
516 tmonth = get_rule_month(target)
517 return _quarter_months_conform(smonth, tmonth)
518 return target in {"D", "C", "B", "M", "h", "min", "s", "ms", "us", "ns"}
519 elif _is_quarterly(source):
520 return target in {"D", "C", "B", "M", "h", "min", "s", "ms", "us", "ns"}
521 elif _is_monthly(source):
522 return target in {"D", "C", "B", "h", "min", "s", "ms", "us", "ns"}
523 elif _is_weekly(source):
524 return target in {source, "D", "C", "B", "h", "min", "s", "ms", "us", "ns"}
525 elif source == "B":
526 return target in {"D", "C", "B", "h", "min", "s", "ms", "us", "ns"}
527 elif source == "C":
528 return target in {"D", "C", "B", "h", "min", "s", "ms", "us", "ns"}
529 elif source == "D":
530 return target in {"D", "C", "B", "h", "min", "s", "ms", "us", "ns"}
531 elif source == "h":
532 return target in {"h", "min", "s", "ms", "us", "ns"}
533 elif source == "min":
534 return target in {"min", "s", "ms", "us", "ns"}
535 elif source == "s":
536 return target in {"s", "ms", "us", "ns"}
537 elif source == "ms":
538 return target in {"ms", "us", "ns"}
539 elif source == "us":
540 return target in {"us", "ns"}
541 elif source == "ns":
542 return target in {"ns"}
543 else:
544 return False
545
546
def _maybe_coerce_freq(code) -> str:
    """
    Normalize a frequency to a rule code string.

    A DateOffset is first converted to its period frequency string.  The
    lowercase intraday codes are returned unchanged; every other code is
    uppercased.

    Parameters
    ----------
    code : str or DateOffset
        Frequency to normalize; must not be None.

    Returns
    -------
    str
    """
    assert code is not None
    if isinstance(code, DateOffset):
        code = freq_to_period_freqstr(1, code.name)

    keep_lowercase = {"h", "min", "s", "ms", "us", "ns"}
    return code if code in keep_lowercase else code.upper()
567
568
def _quarter_months_conform(source: str, target: str) -> bool:
    """
    Return whether two month names have the same ``MONTH_NUMBERS`` value
    modulo 3.
    """
    return MONTH_NUMBERS[source] % 3 == MONTH_NUMBERS[target] % 3
573
574
def _is_annual(rule: str) -> bool:
    """Return whether ``rule`` is "Y" or "Y-<suffix>" (case-insensitive)."""
    # Everything before the first "-" (or the whole string) must be "Y".
    prefix = rule.upper().partition("-")[0]
    return prefix == "Y"
578
579
def _is_quarterly(rule: str) -> bool:
    """
    Return whether ``rule`` is "Q", "Q-<suffix>", or starts with "BQ"
    (case-insensitive).
    """
    normalized = rule.upper()
    if normalized.startswith("BQ"):
        return True
    # Everything before the first "-" (or the whole string) must be "Q".
    return normalized.partition("-")[0] == "Q"
583
584
def _is_monthly(rule: str) -> bool:
    """Return whether ``rule`` is "M" or "BM" (case-insensitive)."""
    normalized = rule.upper()
    return normalized == "M" or normalized == "BM"
588
589
def _is_weekly(rule: str) -> bool:
    """Return whether ``rule`` is "W" or "W-<suffix>" (case-insensitive)."""
    # Everything before the first "-" (or the whole string) must be "W".
    prefix = rule.upper().partition("-")[0]
    return prefix == "W"
593
594
# Public names of this module. "Day" and "to_offset" are not defined here;
# they are imported from pandas._libs.tslibs.offsets and re-exported.
__all__ = [
    "Day",
    "get_period_alias",
    "infer_freq",
    "is_subperiod",
    "is_superperiod",
    "to_offset",
]