Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/pandas/core/indexers/objects.py: 27%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

130 statements  

1"""Indexer objects for computing start/end window bounds for rolling operations""" 

2from __future__ import annotations 

3 

4from datetime import timedelta 

5 

6import numpy as np 

7 

8from pandas._libs.window.indexers import calculate_variable_window_bounds 

9from pandas.util._decorators import Appender 

10 

11from pandas.core.dtypes.common import ensure_platform_int 

12 

13from pandas.tseries.offsets import Nano 

14 

# Shared numpydoc text that the indexer classes in this module attach to their
# ``get_window_bounds`` methods through ``@Appender(get_window_bounds_doc)``.
# NOTE(review): ``window_size`` and ``win_type`` are documented here but are not
# parameters of the ``get_window_bounds`` signatures in this file
# (``window_size`` is an instance attribute) -- confirm whether they should stay.
get_window_bounds_doc = """
Computes the bounds of a window.

Parameters
----------
num_values : int, default 0
    number of values that will be aggregated over
window_size : int, default 0
    the number of rows in a window
min_periods : int, default None
    min_periods passed from the top level rolling API
center : bool, default None
    center passed from the top level rolling API
closed : str, default None
    closed passed from the top level rolling API
step : int, default None
    step passed from the top level rolling API
    .. versionadded:: 1.5
win_type : str, default None
    win_type passed from the top level rolling API

Returns
-------
A tuple of ndarray[int64]s, indicating the boundaries of each
window
"""

41 

42 

class BaseIndexer:
    """
    Common parent for objects that compute rolling-window start/end bounds.

    The constructor only records its arguments as attributes; subclasses
    supply the actual computation by overriding ``get_window_bounds``.
    """

    def __init__(
        self, index_array: np.ndarray | None = None, window_size: int = 0, **kwargs
    ) -> None:
        """
        Parameters
        ----------
        index_array : np.ndarray or None, default None
            stored unchanged as ``self.index_array``
        window_size : int, default 0
            stored unchanged as ``self.window_size``
        **kwargs :
            keyword arguments that will be available when get_window_bounds is called
        """
        self.index_array, self.window_size = index_array, window_size
        # Every extra keyword becomes an instance attribute so that a
        # subclass's get_window_bounds can read it back from ``self``.
        for name in kwargs:
            setattr(self, name, kwargs[name])

    # No docstring of its own: the shared text is attached by the decorator.
    @Appender(get_window_bounds_doc)
    def get_window_bounds(
        self,
        num_values: int = 0,
        min_periods: int | None = None,
        center: bool | None = None,
        closed: str | None = None,
        step: int | None = None,
    ) -> tuple[np.ndarray, np.ndarray]:
        # The base class has no window definition; subclasses must override.
        raise NotImplementedError

71 

72 

class FixedWindowIndexer(BaseIndexer):
    """Window bounds for windows that span a constant number of rows."""

    # No docstring of its own: the shared text is attached by the decorator.
    @Appender(get_window_bounds_doc)
    def get_window_bounds(
        self,
        num_values: int = 0,
        min_periods: int | None = None,
        center: bool | None = None,
        closed: str | None = None,
        step: int | None = None,
    ) -> tuple[np.ndarray, np.ndarray]:
        # A centered window is shifted forward by half its width (rounded down).
        shift = (self.window_size - 1) // 2 if center else 0

        # Exclusive end position of each window, one entry per ``step`` rows.
        stop = np.arange(1 + shift, num_values + 1 + shift, step, dtype="int64")
        begin = stop - self.window_size

        # "left"/"both" pull the start back by one row;
        # "left"/"neither" pull the exclusive end back by one row.
        if closed in ("left", "both"):
            begin -= 1
        if closed in ("left", "neither"):
            stop -= 1

        # Bounds that ran off either edge are pinned to [0, num_values].
        return np.clip(begin, 0, num_values), np.clip(stop, 0, num_values)

101 

102 

class VariableWindowIndexer(BaseIndexer):
    """Window bounds whose length varies from row to row, namely for time series."""

    # No docstring of its own: the shared text is attached by the decorator.
    @Appender(get_window_bounds_doc)
    def get_window_bounds(
        self,
        num_values: int = 0,
        min_periods: int | None = None,
        center: bool | None = None,
        closed: str | None = None,
        step: int | None = None,
    ) -> tuple[np.ndarray, np.ndarray]:
        # ``step`` is part of the common signature but is not forwarded to the
        # compiled helper below.
        window_size = self.window_size
        index_array = self.index_array

        # error: Argument 4 to "calculate_variable_window_bounds" has incompatible
        # type "Optional[bool]"; expected "bool"
        # error: Argument 6 to "calculate_variable_window_bounds" has incompatible
        # type "Optional[ndarray]"; expected "ndarray"
        bounds = calculate_variable_window_bounds(
            num_values,
            window_size,
            min_periods,
            center,  # type: ignore[arg-type]
            closed,
            index_array,  # type: ignore[arg-type]
        )
        return bounds

127 

128 

class VariableOffsetWindowIndexer(BaseIndexer):
    """
    Calculate window boundaries based on a non-fixed offset such as a BusinessDay.

    For every row ``i`` the window reaches back from ``index[i]`` by ``offset``;
    which endpoints belong to the window is controlled by ``closed``.
    """

    def __init__(
        self,
        index_array: np.ndarray | None = None,
        window_size: int = 0,
        index=None,
        offset=None,
        **kwargs,
    ) -> None:
        """
        Parameters
        ----------
        index_array : np.ndarray or None, default None
            forwarded to ``BaseIndexer.__init__``
        window_size : int, default 0
            forwarded to ``BaseIndexer.__init__``
        index : default None
            positionally indexable sequence of time points; must be set before
            ``get_window_bounds`` is called with ``num_values > 0``.
            NOTE(review): presumably a DatetimeIndex -- elements have ``offset``
            and ``Nano(1)`` subtracted from them and their differences are
            compared with ``timedelta(0)``; confirm against callers.
        offset : default None
            offset subtracted from each index value to find the window start;
            must be set whenever more than one row is processed
        **kwargs :
            keyword arguments that will be available when get_window_bounds is called
        """
        super().__init__(index_array, window_size, **kwargs)
        self.index = index
        self.offset = offset

    # No docstring of its own: the shared text is attached by the decorator.
    @Appender(get_window_bounds_doc)
    def get_window_bounds(
        self,
        num_values: int = 0,
        min_periods: int | None = None,
        center: bool | None = None,
        closed: str | None = None,
        step: int | None = None,
    ) -> tuple[np.ndarray, np.ndarray]:
        # ``min_periods`` and ``center`` are accepted for signature parity but
        # are not used by this indexer.
        if step is not None:
            raise NotImplementedError("step not implemented for variable offset window")
        if num_values <= 0:
            return np.empty(0, dtype="int64"), np.empty(0, dtype="int64")

        # Both attributes default to None in __init__.  Fail with an explicit
        # message instead of the opaque "'NoneType' object is not
        # subscriptable" / operand errors the code below would otherwise hit.
        # TypeError is kept so the exception type callers see is unchanged.
        if self.index is None:
            raise TypeError(
                "VariableOffsetWindowIndexer requires `index` to compute window bounds"
            )
        if num_values > 1 and self.offset is None:
            # With a single row the loop below never runs, so ``offset`` is
            # only needed from the second row on.
            raise TypeError(
                "VariableOffsetWindowIndexer requires `offset` to compute window bounds"
            )

        # ``index`` is known to be set here, so an unspecified ``closed``
        # means the variable-window default, 'right'.
        if closed is None:
            closed = "right"

        right_closed = closed in ["right", "both"]
        left_closed = closed in ["left", "both"]

        # -1 when the index is decreasing, so that the comparisons below can
        # be written once for both directions.
        if self.index[num_values - 1] < self.index[0]:
            index_growth_sign = -1
        else:
            index_growth_sign = 1

        start = np.full(num_values, -1, dtype="int64")
        end = np.full(num_values, -1, dtype="int64")

        start[0] = 0

        # right endpoint is closed
        if right_closed:
            end[0] = 1
        # right endpoint is open
        else:
            end[0] = 0

        # start is start of slice interval (including)
        # end is end of slice interval (not including)
        for i in range(1, num_values):
            end_bound = self.index[i]
            start_bound = self.index[i] - index_growth_sign * self.offset

            # left endpoint is closed
            if left_closed:
                start_bound -= Nano(1)

            # advance the start bound until we are
            # within the constraint
            start[i] = i
            for j in range(start[i - 1], i):
                if (self.index[j] - start_bound) * index_growth_sign > timedelta(0):
                    start[i] = j
                    break

            # end bound is previous end
            # or current index
            if (self.index[end[i - 1]] - end_bound) * index_growth_sign <= timedelta(0):
                end[i] = i + 1
            else:
                end[i] = end[i - 1]

            # right endpoint is open
            if not right_closed:
                end[i] -= 1

        return start, end

214 

215 

class ExpandingIndexer(BaseIndexer):
    """Window bounds that always begin at row 0, mimicking df.expanding()"""

    # No docstring of its own: the shared text is attached by the decorator.
    @Appender(get_window_bounds_doc)
    def get_window_bounds(
        self,
        num_values: int = 0,
        min_periods: int | None = None,
        center: bool | None = None,
        closed: str | None = None,
        step: int | None = None,
    ) -> tuple[np.ndarray, np.ndarray]:
        # Only ``num_values`` is used: window i covers rows [0, i + 1).
        window_starts = np.zeros(num_values, dtype=np.int64)
        window_ends = np.arange(1, num_values + 1, dtype=np.int64)
        return window_starts, window_ends

232 

233 

class FixedForwardWindowIndexer(BaseIndexer):
    """
    Fixed-length windows that start at the current row and look forward.

    Examples
    --------
    >>> df = pd.DataFrame({'B': [0, 1, 2, np.nan, 4]})
    >>> df
         B
    0  0.0
    1  1.0
    2  2.0
    3  NaN
    4  4.0

    >>> indexer = pd.api.indexers.FixedForwardWindowIndexer(window_size=2)
    >>> df.rolling(window=indexer, min_periods=1).sum()
         B
    0  1.0
    1  3.0
    2  2.0
    3  4.0
    4  4.0
    """

    # No docstring of its own: the shared text is attached by the decorator.
    @Appender(get_window_bounds_doc)
    def get_window_bounds(
        self,
        num_values: int = 0,
        min_periods: int | None = None,
        center: bool | None = None,
        closed: str | None = None,
        step: int | None = None,
    ) -> tuple[np.ndarray, np.ndarray]:
        # Centering and explicit closure are rejected for forward windows.
        if center:
            raise ValueError("Forward-looking windows can't have center=True")
        if closed is not None:
            raise ValueError(
                "Forward-looking windows don't support setting the closed argument"
            )

        stride = 1 if step is None else step

        # Window i begins at its own row and nominally runs window_size rows.
        begin = np.arange(0, num_values, stride, dtype="int64")
        stop = begin + self.window_size
        # Clipping is skipped when window_size is falsy (e.g. 0).
        if self.window_size:
            stop = np.clip(stop, 0, num_values)

        return begin, stop

283 

284 

class GroupbyIndexer(BaseIndexer):
    """Calculate bounds to compute groupby rolling, mimicking df.groupby().rolling()"""

    def __init__(
        self,
        index_array: np.ndarray | None = None,
        window_size: int | BaseIndexer = 0,
        groupby_indices: dict | None = None,
        window_indexer: type[BaseIndexer] = BaseIndexer,
        indexer_kwargs: dict | None = None,
        **kwargs,
    ) -> None:
        """
        Parameters
        ----------
        index_array : np.ndarray or None
            np.ndarray of the index of the original object that we are performing
            a chained groupby operation over. This index has been pre-sorted relative to
            the groups
        window_size : int or BaseIndexer
            window size during the windowing operation; overridden by a
            ``"window_size"`` entry in ``indexer_kwargs`` when one is present
        groupby_indices : dict or None
            dict of {group label: [positional index of rows belonging to the group]}
        window_indexer : BaseIndexer
            BaseIndexer class determining the start and end bounds of each group
        indexer_kwargs : dict or None
            Custom kwargs to be passed to window_indexer
        **kwargs :
            keyword arguments that will be available when get_window_bounds is called
        """
        self.groupby_indices = groupby_indices or {}
        self.window_indexer = window_indexer
        # Work on a copy: "window_size" is popped below and the caller's dict
        # must not be modified.
        self.indexer_kwargs = indexer_kwargs.copy() if indexer_kwargs else {}
        super().__init__(
            index_array=index_array,
            window_size=self.indexer_kwargs.pop("window_size", window_size),
            **kwargs,
        )

    # No docstring of its own: the shared text is attached by the decorator.
    @Appender(get_window_bounds_doc)
    def get_window_bounds(
        self,
        num_values: int = 0,
        min_periods: int | None = None,
        center: bool | None = None,
        closed: str | None = None,
        step: int | None = None,
    ) -> tuple[np.ndarray, np.ndarray]:
        # ``num_values`` is not used here: each group's own length is passed
        # to its per-group indexer instead.
        #
        # 1) For each group, get the indices that belong to the group
        # 2) Use the indices to calculate the start & end bounds of the window
        # 3) Append the window bounds in group order
        start_arrays = []
        end_arrays = []
        window_indices_start = 0
        for indices in self.groupby_indices.values():
            group_size = len(indices)
            index_array: np.ndarray | None

            if self.index_array is not None:
                index_array = self.index_array.take(ensure_platform_int(indices))
            else:
                index_array = self.index_array
            indexer = self.window_indexer(
                index_array=index_array,
                window_size=self.window_size,
                **self.indexer_kwargs,
            )
            start, end = indexer.get_window_bounds(
                group_size, min_periods, center, closed, step
            )
            start = start.astype(np.int64)
            end = end.astype(np.int64)
            assert len(start) == len(
                end
            ), "these should be equal in length from get_window_bounds"
            # Cannot use groupby_indices as they might not be monotonic with the object
            # we're rolling over
            #
            # One extra trailing position is included because windows are
            # sliced like [start, end).  Building the range in one call
            # (instead of appending ``last + 1``) also works for a group
            # with no rows, which previously raised IndexError.
            window_indices = np.arange(
                window_indices_start,
                window_indices_start + group_size + 1,
                dtype=np.int64,
            )
            window_indices_start += group_size
            # Translate the group-relative bounds into overall positions.
            start_arrays.append(window_indices.take(ensure_platform_int(start)))
            end_arrays.append(window_indices.take(ensure_platform_int(end)))
        if not start_arrays:
            return np.array([], dtype=np.int64), np.array([], dtype=np.int64)
        return np.concatenate(start_arrays), np.concatenate(end_arrays)

376 

377 

class ExponentialMovingWindowIndexer(BaseIndexer):
    """Calculate ewm window bounds: one window spanning all the values"""

    # No docstring of its own: the shared text is attached by the decorator.
    @Appender(get_window_bounds_doc)
    def get_window_bounds(
        self,
        num_values: int = 0,
        min_periods: int | None = None,
        center: bool | None = None,
        closed: str | None = None,
        step: int | None = None,
    ) -> tuple[np.ndarray, np.ndarray]:
        # A single [0, num_values) window; the other arguments are unused.
        window_start = np.array([0], dtype=np.int64)
        window_end = np.array([num_values], dtype=np.int64)
        return window_start, window_end

390 return np.array([0], dtype=np.int64), np.array([num_values], dtype=np.int64)