1"""
2Helper functions to generate range-like data for DatetimeArray
3(and possibly TimedeltaArray/PeriodArray)
4"""
5from __future__ import annotations
6
7import numpy as np
8
9from pandas._libs.lib import i8max
10from pandas._libs.tslibs import (
11 BaseOffset,
12 OutOfBoundsDatetime,
13 Timedelta,
14 Timestamp,
15 iNaT,
16)
17from pandas._typing import npt
18
19
20def generate_regular_range(
21 start: Timestamp | Timedelta | None,
22 end: Timestamp | Timedelta | None,
23 periods: int | None,
24 freq: BaseOffset,
25 unit: str = "ns",
26) -> npt.NDArray[np.intp]:
27 """
28 Generate a range of dates or timestamps with the spans between dates
29 described by the given `freq` DateOffset.
30
31 Parameters
32 ----------
33 start : Timedelta, Timestamp or None
34 First point of produced date range.
35 end : Timedelta, Timestamp or None
36 Last point of produced date range.
37 periods : int or None
38 Number of periods in produced date range.
39 freq : Tick
40 Describes space between dates in produced date range.
41 unit : str, default "ns"
42 The resolution the output is meant to represent.
43
44 Returns
45 -------
46 ndarray[np.int64]
47 Representing the given resolution.
48 """
49 istart = start._value if start is not None else None
50 iend = end._value if end is not None else None
51 freq.nanos # raises if non-fixed frequency
52 td = Timedelta(freq)
53 try:
54 td = td.as_unit( # pyright: ignore[reportGeneralTypeIssues]
55 unit, round_ok=False
56 )
57 except ValueError as err:
58 raise ValueError(
59 f"freq={freq} is incompatible with unit={unit}. "
60 "Use a lower freq or a higher unit instead."
61 ) from err
62 stride = int(td._value)
63
64 if periods is None and istart is not None and iend is not None:
65 b = istart
66 # cannot just use e = Timestamp(end) + 1 because arange breaks when
67 # stride is too large, see GH10887
68 e = b + (iend - b) // stride * stride + stride // 2 + 1
69 elif istart is not None and periods is not None:
70 b = istart
71 e = _generate_range_overflow_safe(b, periods, stride, side="start")
72 elif iend is not None and periods is not None:
73 e = iend + stride
74 b = _generate_range_overflow_safe(e, periods, stride, side="end")
75 else:
76 raise ValueError(
77 "at least 'start' or 'end' should be specified if a 'period' is given."
78 )
79
80 with np.errstate(over="raise"):
81 # If the range is sufficiently large, np.arange may overflow
82 # and incorrectly return an empty array if not caught.
83 try:
84 values = np.arange(b, e, stride, dtype=np.int64)
85 except FloatingPointError:
86 xdr = [b]
87 while xdr[-1] != e:
88 xdr.append(xdr[-1] + stride)
89 values = np.array(xdr[:-1], dtype=np.int64)
90 return values
91
92
def _generate_range_overflow_safe(
    endpoint: int, periods: int, stride: int, side: str = "start"
) -> int:
    """
    Compute the other endpoint of a range for np.arange without letting
    integer arithmetic wrap around.

    The total span ``periods * abs(stride)`` is first computed as uint64.
    If it fits in int64 the work is delegated to
    ``_generate_range_overflow_safe_signed``; otherwise the request is either
    rejected outright or split into two halves that are resolved recursively.

    Parameters
    ----------
    endpoint : int
        integer timestamp of the known endpoint of the desired range
    periods : int
        number of periods in the desired range
    stride : int
        integer distance between periods in the desired range
    side : {'start', 'end'}
        which end of the range `endpoint` refers to

    Returns
    -------
    other_end : int

    Raises
    ------
    OutOfBoundsDatetime
        When the span overflows uint64 (numpy's FloatingPointError is
        re-raised as this), or when the sign combination makes an overflow
        unavoidable.
    """
    # GH#14187 raise instead of incorrectly wrapping around
    assert side in ["start", "end"]

    int64_ceiling = np.uint64(i8max)
    err_text = f"Cannot generate range with {side}={endpoint} and periods={periods}"

    with np.errstate(over="raise"):
        # A span that does not even fit in uint64 cannot be rescued by
        # splitting the range, so give up immediately.
        try:
            span = np.uint64(periods) * np.uint64(np.abs(stride))
        except FloatingPointError as err:
            raise OutOfBoundsDatetime(err_text) from err

    # Span fits in int64: the signed helper handles it directly.
    if span <= int64_ceiling:
        return _generate_range_overflow_safe_signed(endpoint, periods, stride, side)

    # Span exceeds int64 and we are already moving away from zero with a
    # positive stride: overflow is certain.
    runs_past_max = side == "start" and endpoint > 0 and stride > 0
    runs_past_min = side == "end" and endpoint < 0 and stride > 0
    if runs_past_max or runs_past_min:
        raise OutOfBoundsDatetime(err_text)

    # The caller added one `stride` to the end point and that alone pushed it
    # over the int64 maximum; undo that step and ask for one period fewer.
    if side == "end" and (
        endpoint - stride <= int64_ceiling and int64_ceiling < endpoint
    ):
        return _generate_range_overflow_safe(
            endpoint - stride, periods - 1, stride, side
        )

    # Otherwise walk the range in two halves, each with a smaller span.
    first_half = periods // 2
    second_half = periods - first_half
    assert 0 < second_half < periods, (second_half, periods, endpoint, stride)

    halfway = _generate_range_overflow_safe(endpoint, first_half, stride, side)
    return _generate_range_overflow_safe(halfway, second_half, stride, side)
158
159
def _generate_range_overflow_safe_signed(
    endpoint: int, periods: int, stride: int, side: str
) -> int:
    """
    Variant of _generate_range_overflow_safe for the case where
    `periods * stride` itself fits in int64.

    Returns ``endpoint + periods * stride`` for ``side="start"`` and
    ``endpoint - periods * stride`` for ``side="end"``.  The value comes back
    as a numpy integer scalar: int64 normally, or uint64 in the one tolerated
    case where the result exceeds the int64 maximum by at most one stride.

    Raises
    ------
    OutOfBoundsDatetime
        If the result does not fit, or would equal the NaT sentinel.
    """
    assert side in ["start", "end"]
    # Walking back from the end is the same as walking forward with the
    # stride negated.
    step = -stride if side == "end" else stride

    with np.errstate(over="raise"):
        shift = np.int64(periods) * np.int64(step)

        # First attempt: plain int64 addition.  np.int64(endpoint) raises
        # OverflowError for an out-of-range Python int; the addition raises
        # FloatingPointError on overflow because of the errstate above.
        try:
            candidate = np.int64(endpoint) + shift
        except (FloatingPointError, OverflowError):
            usable = False
        else:
            # Putting iNaT into a DatetimeArray/TimedeltaArray would
            # incorrectly be interpreted as NaT, so treat it as a failure too.
            usable = candidate != iNaT
        if usable:
            return candidate  # type: ignore[return-value]

        # Opposite signs cannot overflow, so reaching this point implies the
        # endpoint and the step point the same way.
        heading_up = step > 0
        assert (heading_up and endpoint >= 0) or (step < 0 and endpoint <= 0)

        if heading_up:
            # Very special case: the result sits just above the int64 maximum,
            # yet np.arange (whose stop is exclusive) will still produce
            # values slightly within the bounds.  Redo the sum as uint64 and
            # accept it when it overshoots by no more than one step.
            widened = np.uint64(endpoint) + np.uint64(shift)
            ceiling = np.uint64(i8max)
            assert widened > ceiling
            if widened <= ceiling + np.uint64(step):
                return widened  # type: ignore[return-value]

    raise OutOfBoundsDatetime(
        f"Cannot generate range with {side}={endpoint} and periods={periods}"
    )