Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/pandas/core/arrays/_ranges.py: 11%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

75 statements  

1""" 

2Helper functions to generate range-like data for DatetimeArray 

3(and possibly TimedeltaArray/PeriodArray) 

4""" 

5from __future__ import annotations 

6 

7import numpy as np 

8 

9from pandas._libs.lib import i8max 

10from pandas._libs.tslibs import ( 

11 BaseOffset, 

12 OutOfBoundsDatetime, 

13 Timedelta, 

14 Timestamp, 

15 iNaT, 

16) 

17from pandas._typing import npt 

18 

19 

def generate_regular_range(
    start: Timestamp | Timedelta | None,
    end: Timestamp | Timedelta | None,
    periods: int | None,
    freq: BaseOffset,
    unit: str = "ns",
) -> npt.NDArray[np.int64]:
    """
    Generate a range of dates or timestamps with the spans between dates
    described by the given `freq` DateOffset.

    The range is built from one of three combinations, checked in this order:
    ``start`` and ``end`` (with ``periods`` None), ``start`` and ``periods``,
    or ``end`` and ``periods``.

    Parameters
    ----------
    start : Timedelta, Timestamp or None
        First point of produced date range.
    end : Timedelta, Timestamp or None
        Last point of produced date range.
    periods : int or None
        Number of periods in produced date range.
    freq : Tick
        Describes space between dates in produced date range.
    unit : str, default "ns"
        The resolution the output is meant to represent.

    Returns
    -------
    ndarray[np.int64]
        Representing the given resolution.

    Raises
    ------
    ValueError
        If `freq` cannot be expressed in `unit` without rounding, or if
        none of the supported start/end/periods combinations was supplied.
    """
    istart = start._value if start is not None else None
    iend = end._value if end is not None else None
    freq.nanos  # raises if non-fixed frequency
    td = Timedelta(freq)
    try:
        td = td.as_unit(  # pyright: ignore[reportGeneralTypeIssues]
            unit, round_ok=False
        )
    except ValueError as err:
        raise ValueError(
            f"freq={freq} is incompatible with unit={unit}. "
            "Use a lower freq or a higher unit instead."
        ) from err
    stride = int(td._value)

    if periods is None and istart is not None and iend is not None:
        b = istart
        # cannot just use e = Timestamp(end) + 1 because arange breaks when
        #  stride is too large, see GH10887
        e = b + (iend - b) // stride * stride + stride // 2 + 1
    elif istart is not None and periods is not None:
        b = istart
        e = _generate_range_overflow_safe(b, periods, stride, side="start")
    elif iend is not None and periods is not None:
        # arange excludes its stop value, so step one stride past `end`
        # to make `end` itself the last element.
        e = iend + stride
        b = _generate_range_overflow_safe(e, periods, stride, side="end")
    else:
        raise ValueError(
            "at least 'start' or 'end' should be specified if a 'period' is given."
        )

    with np.errstate(over="raise"):
        # If the range is sufficiently large, np.arange may overflow
        #  and incorrectly return an empty array if not caught.
        try:
            values = np.arange(b, e, stride, dtype=np.int64)
        except FloatingPointError:
            # Fall back to Python's arbitrary-precision range. Unlike stepping
            # until the value *equals* `e`, this also terminates when `e` is
            # not a whole number of strides away from `b` (as in the
            # start/end branch above).
            values = np.array(range(int(b), int(e), stride), dtype=np.int64)
    return values

91 

92 

93def _generate_range_overflow_safe( 

94 endpoint: int, periods: int, stride: int, side: str = "start" 

95) -> int: 

96 """ 

97 Calculate the second endpoint for passing to np.arange, checking 

98 to avoid an integer overflow. Catch OverflowError and re-raise 

99 as OutOfBoundsDatetime. 

100 

101 Parameters 

102 ---------- 

103 endpoint : int 

104 nanosecond timestamp of the known endpoint of the desired range 

105 periods : int 

106 number of periods in the desired range 

107 stride : int 

108 nanoseconds between periods in the desired range 

109 side : {'start', 'end'} 

110 which end of the range `endpoint` refers to 

111 

112 Returns 

113 ------- 

114 other_end : int 

115 

116 Raises 

117 ------ 

118 OutOfBoundsDatetime 

119 """ 

120 # GH#14187 raise instead of incorrectly wrapping around 

121 assert side in ["start", "end"] 

122 

123 i64max = np.uint64(i8max) 

124 msg = f"Cannot generate range with {side}={endpoint} and periods={periods}" 

125 

126 with np.errstate(over="raise"): 

127 # if periods * strides cannot be multiplied within the *uint64* bounds, 

128 # we cannot salvage the operation by recursing, so raise 

129 try: 

130 addend = np.uint64(periods) * np.uint64(np.abs(stride)) 

131 except FloatingPointError as err: 

132 raise OutOfBoundsDatetime(msg) from err 

133 

134 if np.abs(addend) <= i64max: 

135 # relatively easy case without casting concerns 

136 return _generate_range_overflow_safe_signed(endpoint, periods, stride, side) 

137 

138 elif (endpoint > 0 and side == "start" and stride > 0) or ( 

139 endpoint < 0 < stride and side == "end" 

140 ): 

141 # no chance of not-overflowing 

142 raise OutOfBoundsDatetime(msg) 

143 

144 elif side == "end" and endpoint - stride <= i64max < endpoint: 

145 # in _generate_regular_range we added `stride` thereby overflowing 

146 # the bounds. Adjust to fix this. 

147 return _generate_range_overflow_safe( 

148 endpoint - stride, periods - 1, stride, side 

149 ) 

150 

151 # split into smaller pieces 

152 mid_periods = periods // 2 

153 remaining = periods - mid_periods 

154 assert 0 < remaining < periods, (remaining, periods, endpoint, stride) 

155 

156 midpoint = _generate_range_overflow_safe(endpoint, mid_periods, stride, side) 

157 return _generate_range_overflow_safe(midpoint, remaining, stride, side) 

158 

159 

160def _generate_range_overflow_safe_signed( 

161 endpoint: int, periods: int, stride: int, side: str 

162) -> int: 

163 """ 

164 A special case for _generate_range_overflow_safe where `periods * stride` 

165 can be calculated without overflowing int64 bounds. 

166 """ 

167 assert side in ["start", "end"] 

168 if side == "end": 

169 stride *= -1 

170 

171 with np.errstate(over="raise"): 

172 addend = np.int64(periods) * np.int64(stride) 

173 try: 

174 # easy case with no overflows 

175 result = np.int64(endpoint) + addend 

176 if result == iNaT: 

177 # Putting this into a DatetimeArray/TimedeltaArray 

178 # would incorrectly be interpreted as NaT 

179 raise OverflowError 

180 # error: Incompatible return value type (got "signedinteger[_64Bit]", 

181 # expected "int") 

182 return result # type: ignore[return-value] 

183 except (FloatingPointError, OverflowError): 

184 # with endpoint negative and addend positive we risk 

185 # FloatingPointError; with reversed signed we risk OverflowError 

186 pass 

187 

188 # if stride and endpoint had opposite signs, then endpoint + addend 

189 # should never overflow. so they must have the same signs 

190 assert (stride > 0 and endpoint >= 0) or (stride < 0 and endpoint <= 0) 

191 

192 if stride > 0: 

193 # watch out for very special case in which we just slightly 

194 # exceed implementation bounds, but when passing the result to 

195 # np.arange will get a result slightly within the bounds 

196 

197 # error: Incompatible types in assignment (expression has type 

198 # "unsignedinteger[_64Bit]", variable has type "signedinteger[_64Bit]") 

199 result = np.uint64(endpoint) + np.uint64(addend) # type: ignore[assignment] 

200 i64max = np.uint64(i8max) 

201 assert result > i64max 

202 if result <= i64max + np.uint64(stride): 

203 # error: Incompatible return value type (got "unsignedinteger", expected 

204 # "int") 

205 return result # type: ignore[return-value] 

206 

207 raise OutOfBoundsDatetime( 

208 f"Cannot generate range with {side}={endpoint} and periods={periods}" 

209 )