1"""
2Helper functions to generate range-like data for DatetimeArray
3(and possibly TimedeltaArray/PeriodArray)
4"""
5from __future__ import annotations
6
7from typing import TYPE_CHECKING
8
9import numpy as np
10
11from pandas._libs.lib import i8max
12from pandas._libs.tslibs import (
13 BaseOffset,
14 OutOfBoundsDatetime,
15 Timedelta,
16 Timestamp,
17 iNaT,
18)
19
20if TYPE_CHECKING:
21 from pandas._typing import npt
22
23
def generate_regular_range(
    start: Timestamp | Timedelta | None,
    end: Timestamp | Timedelta | None,
    periods: int | None,
    freq: BaseOffset,
    unit: str = "ns",
) -> npt.NDArray[np.intp]:
    """
    Generate a range of dates or timestamps with the spans between dates
    described by the given `freq` DateOffset.

    The inputs are resolved in this order: ``start`` and ``end`` when
    ``periods`` is None; otherwise ``start`` and ``periods`` (``end`` is
    then ignored); otherwise ``end`` and ``periods``.

    Parameters
    ----------
    start : Timedelta, Timestamp or None
        First point of produced date range.
    end : Timedelta, Timestamp or None
        Last point of produced date range.
    periods : int or None
        Number of periods in produced date range.
    freq : Tick
        Describes space between dates in produced date range. Must be a
        fixed frequency; accessing ``freq.nanos`` raises otherwise.
    unit : str, default "ns"
        The resolution the output is meant to represent.

    Returns
    -------
    ndarray[np.int64]
        Representing the given resolution.

    Raises
    ------
    ValueError
        If `freq` cannot be expressed in `unit` without rounding, or if
        the combination of `start`, `end` and `periods` does not determine
        a range.
    OutOfBoundsDatetime
        If the endpoint implied by `periods` cannot be computed without
        overflowing.
    """
    istart = start._value if start is not None else None
    iend = end._value if end is not None else None
    freq.nanos  # raises if non-fixed frequency
    td = Timedelta(freq)
    b: int
    e: int
    try:
        td = td.as_unit(unit, round_ok=False)
    except ValueError as err:
        raise ValueError(
            f"freq={freq} is incompatible with unit={unit}. "
            "Use a lower freq or a higher unit instead."
        ) from err
    stride = int(td._value)

    if periods is None and istart is not None and iend is not None:
        b = istart
        # cannot just use e = Timestamp(end) + 1 because arange breaks when
        # stride is too large, see GH10887
        e = b + (iend - b) // stride * stride + stride // 2 + 1
    elif istart is not None and periods is not None:
        b = istart
        e = _generate_range_overflow_safe(b, periods, stride, side="start")
    elif iend is not None and periods is not None:
        # half-open interval: step one stride past `end` so that `end`
        # itself is included in the output
        e = iend + stride
        b = _generate_range_overflow_safe(e, periods, stride, side="end")
    else:
        raise ValueError(
            "at least 'start' or 'end' should be specified if a 'period' is given."
        )

    with np.errstate(over="raise"):
        # If the range is sufficiently large, np.arange may overflow
        # and incorrectly return an empty array if not caught.
        try:
            values = np.arange(b, e, stride, dtype=np.int64)
        except FloatingPointError:
            # Rebuild the same half-open sequence with Python's unbounded
            # ints. `range` stops once `e` is reached or passed, so this
            # terminates even when `e - b` is not a multiple of `stride`
            # (as in the start/end branch above) or when the range is empty.
            values = np.fromiter(range(b, e, stride), dtype=np.int64)
    return values
95
96
97def _generate_range_overflow_safe(
98 endpoint: int, periods: int, stride: int, side: str = "start"
99) -> int:
100 """
101 Calculate the second endpoint for passing to np.arange, checking
102 to avoid an integer overflow. Catch OverflowError and re-raise
103 as OutOfBoundsDatetime.
104
105 Parameters
106 ----------
107 endpoint : int
108 nanosecond timestamp of the known endpoint of the desired range
109 periods : int
110 number of periods in the desired range
111 stride : int
112 nanoseconds between periods in the desired range
113 side : {'start', 'end'}
114 which end of the range `endpoint` refers to
115
116 Returns
117 -------
118 other_end : int
119
120 Raises
121 ------
122 OutOfBoundsDatetime
123 """
124 # GH#14187 raise instead of incorrectly wrapping around
125 assert side in ["start", "end"]
126
127 i64max = np.uint64(i8max)
128 msg = f"Cannot generate range with {side}={endpoint} and periods={periods}"
129
130 with np.errstate(over="raise"):
131 # if periods * strides cannot be multiplied within the *uint64* bounds,
132 # we cannot salvage the operation by recursing, so raise
133 try:
134 addend = np.uint64(periods) * np.uint64(np.abs(stride))
135 except FloatingPointError as err:
136 raise OutOfBoundsDatetime(msg) from err
137
138 if np.abs(addend) <= i64max:
139 # relatively easy case without casting concerns
140 return _generate_range_overflow_safe_signed(endpoint, periods, stride, side)
141
142 elif (endpoint > 0 and side == "start" and stride > 0) or (
143 endpoint < 0 < stride and side == "end"
144 ):
145 # no chance of not-overflowing
146 raise OutOfBoundsDatetime(msg)
147
148 elif side == "end" and endpoint - stride <= i64max < endpoint:
149 # in _generate_regular_range we added `stride` thereby overflowing
150 # the bounds. Adjust to fix this.
151 return _generate_range_overflow_safe(
152 endpoint - stride, periods - 1, stride, side
153 )
154
155 # split into smaller pieces
156 mid_periods = periods // 2
157 remaining = periods - mid_periods
158 assert 0 < remaining < periods, (remaining, periods, endpoint, stride)
159
160 midpoint = int(_generate_range_overflow_safe(endpoint, mid_periods, stride, side))
161 return _generate_range_overflow_safe(midpoint, remaining, stride, side)
162
163
164def _generate_range_overflow_safe_signed(
165 endpoint: int, periods: int, stride: int, side: str
166) -> int:
167 """
168 A special case for _generate_range_overflow_safe where `periods * stride`
169 can be calculated without overflowing int64 bounds.
170 """
171 assert side in ["start", "end"]
172 if side == "end":
173 stride *= -1
174
175 with np.errstate(over="raise"):
176 addend = np.int64(periods) * np.int64(stride)
177 try:
178 # easy case with no overflows
179 result = np.int64(endpoint) + addend
180 if result == iNaT:
181 # Putting this into a DatetimeArray/TimedeltaArray
182 # would incorrectly be interpreted as NaT
183 raise OverflowError
184 return int(result)
185 except (FloatingPointError, OverflowError):
186 # with endpoint negative and addend positive we risk
187 # FloatingPointError; with reversed signed we risk OverflowError
188 pass
189
190 # if stride and endpoint had opposite signs, then endpoint + addend
191 # should never overflow. so they must have the same signs
192 assert (stride > 0 and endpoint >= 0) or (stride < 0 and endpoint <= 0)
193
194 if stride > 0:
195 # watch out for very special case in which we just slightly
196 # exceed implementation bounds, but when passing the result to
197 # np.arange will get a result slightly within the bounds
198
199 uresult = np.uint64(endpoint) + np.uint64(addend)
200 i64max = np.uint64(i8max)
201 assert uresult > i64max
202 if uresult <= i64max + np.uint64(stride):
203 return int(uresult)
204
205 raise OutOfBoundsDatetime(
206 f"Cannot generate range with {side}={endpoint} and periods={periods}"
207 )