Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.9/dist-packages/pandas/core/indexers/objects.py: 26%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

142 statements  

1"""Indexer objects for computing start/end window bounds for rolling operations""" 

2from __future__ import annotations 

3 

4from datetime import timedelta 

5 

6import numpy as np 

7 

8from pandas._libs.tslibs import BaseOffset 

9from pandas._libs.window.indexers import calculate_variable_window_bounds 

10from pandas.util._decorators import Appender 

11 

12from pandas.core.dtypes.common import ensure_platform_int 

13 

14from pandas.core.indexes.datetimes import DatetimeIndex 

15 

16from pandas.tseries.offsets import Nano 

17 

# Shared numpydoc text. Each ``get_window_bounds`` implementation visible in
# this file attaches it to its own docstring via ``@Appender``.
get_window_bounds_doc = """
Computes the bounds of a window.

Parameters
----------
num_values : int, default 0
    number of values that will be aggregated over
window_size : int, default 0
    the number of rows in a window
min_periods : int, default None
    min_periods passed from the top level rolling API
center : bool, default None
    center passed from the top level rolling API
closed : str, default None
    closed passed from the top level rolling API
step : int, default None
    step passed from the top level rolling API
    .. versionadded:: 1.5
win_type : str, default None
    win_type passed from the top level rolling API

Returns
-------
A tuple of ndarray[int64]s, indicating the boundaries of each
window
"""

44 

45 

46class BaseIndexer: 

47 """ 

48 Base class for window bounds calculations. 

49 

50 Examples 

51 -------- 

52 >>> from pandas.api.indexers import BaseIndexer 

53 >>> class CustomIndexer(BaseIndexer): 

54 ... def get_window_bounds(self, num_values, min_periods, center, closed, step): 

55 ... start = np.empty(num_values, dtype=np.int64) 

56 ... end = np.empty(num_values, dtype=np.int64) 

57 ... for i in range(num_values): 

58 ... start[i] = i 

59 ... end[i] = i + self.window_size 

60 ... return start, end 

61 >>> df = pd.DataFrame({"values": range(5)}) 

62 >>> indexer = CustomIndexer(window_size=2) 

63 >>> df.rolling(indexer).sum() 

64 values 

65 0 1.0 

66 1 3.0 

67 2 5.0 

68 3 7.0 

69 4 4.0 

70 """ 

71 

72 def __init__( 

73 self, index_array: np.ndarray | None = None, window_size: int = 0, **kwargs 

74 ) -> None: 

75 self.index_array = index_array 

76 self.window_size = window_size 

77 # Set user defined kwargs as attributes that can be used in get_window_bounds 

78 for key, value in kwargs.items(): 

79 setattr(self, key, value) 

80 

81 @Appender(get_window_bounds_doc) 

82 def get_window_bounds( 

83 self, 

84 num_values: int = 0, 

85 min_periods: int | None = None, 

86 center: bool | None = None, 

87 closed: str | None = None, 

88 step: int | None = None, 

89 ) -> tuple[np.ndarray, np.ndarray]: 

90 raise NotImplementedError 

91 

92 

93class FixedWindowIndexer(BaseIndexer): 

94 """Creates window boundaries that are of fixed length.""" 

95 

96 @Appender(get_window_bounds_doc) 

97 def get_window_bounds( 

98 self, 

99 num_values: int = 0, 

100 min_periods: int | None = None, 

101 center: bool | None = None, 

102 closed: str | None = None, 

103 step: int | None = None, 

104 ) -> tuple[np.ndarray, np.ndarray]: 

105 if center or self.window_size == 0: 

106 offset = (self.window_size - 1) // 2 

107 else: 

108 offset = 0 

109 

110 end = np.arange(1 + offset, num_values + 1 + offset, step, dtype="int64") 

111 start = end - self.window_size 

112 if closed in ["left", "both"]: 

113 start -= 1 

114 if closed in ["left", "neither"]: 

115 end -= 1 

116 

117 end = np.clip(end, 0, num_values) 

118 start = np.clip(start, 0, num_values) 

119 

120 return start, end 

121 

122 

123class VariableWindowIndexer(BaseIndexer): 

124 """Creates window boundaries that are of variable length, namely for time series.""" 

125 

126 @Appender(get_window_bounds_doc) 

127 def get_window_bounds( 

128 self, 

129 num_values: int = 0, 

130 min_periods: int | None = None, 

131 center: bool | None = None, 

132 closed: str | None = None, 

133 step: int | None = None, 

134 ) -> tuple[np.ndarray, np.ndarray]: 

135 # error: Argument 4 to "calculate_variable_window_bounds" has incompatible 

136 # type "Optional[bool]"; expected "bool" 

137 # error: Argument 6 to "calculate_variable_window_bounds" has incompatible 

138 # type "Optional[ndarray]"; expected "ndarray" 

139 return calculate_variable_window_bounds( 

140 num_values, 

141 self.window_size, 

142 min_periods, 

143 center, # type: ignore[arg-type] 

144 closed, 

145 self.index_array, # type: ignore[arg-type] 

146 ) 

147 

148 

149class VariableOffsetWindowIndexer(BaseIndexer): 

150 """ 

151 Calculate window boundaries based on a non-fixed offset such as a BusinessDay. 

152 

153 Examples 

154 -------- 

155 >>> from pandas.api.indexers import VariableOffsetWindowIndexer 

156 >>> df = pd.DataFrame(range(10), index=pd.date_range("2020", periods=10)) 

157 >>> offset = pd.offsets.BDay(1) 

158 >>> indexer = VariableOffsetWindowIndexer(index=df.index, offset=offset) 

159 >>> df 

160 0 

161 2020-01-01 0 

162 2020-01-02 1 

163 2020-01-03 2 

164 2020-01-04 3 

165 2020-01-05 4 

166 2020-01-06 5 

167 2020-01-07 6 

168 2020-01-08 7 

169 2020-01-09 8 

170 2020-01-10 9 

171 >>> df.rolling(indexer).sum() 

172 0 

173 2020-01-01 0.0 

174 2020-01-02 1.0 

175 2020-01-03 2.0 

176 2020-01-04 3.0 

177 2020-01-05 7.0 

178 2020-01-06 12.0 

179 2020-01-07 6.0 

180 2020-01-08 7.0 

181 2020-01-09 8.0 

182 2020-01-10 9.0 

183 """ 

184 

185 def __init__( 

186 self, 

187 index_array: np.ndarray | None = None, 

188 window_size: int = 0, 

189 index: DatetimeIndex | None = None, 

190 offset: BaseOffset | None = None, 

191 **kwargs, 

192 ) -> None: 

193 super().__init__(index_array, window_size, **kwargs) 

194 if not isinstance(index, DatetimeIndex): 

195 raise ValueError("index must be a DatetimeIndex.") 

196 self.index = index 

197 if not isinstance(offset, BaseOffset): 

198 raise ValueError("offset must be a DateOffset-like object.") 

199 self.offset = offset 

200 

201 @Appender(get_window_bounds_doc) 

202 def get_window_bounds( 

203 self, 

204 num_values: int = 0, 

205 min_periods: int | None = None, 

206 center: bool | None = None, 

207 closed: str | None = None, 

208 step: int | None = None, 

209 ) -> tuple[np.ndarray, np.ndarray]: 

210 if step is not None: 

211 raise NotImplementedError("step not implemented for variable offset window") 

212 if num_values <= 0: 

213 return np.empty(0, dtype="int64"), np.empty(0, dtype="int64") 

214 

215 # if windows is variable, default is 'right', otherwise default is 'both' 

216 if closed is None: 

217 closed = "right" if self.index is not None else "both" 

218 

219 right_closed = closed in ["right", "both"] 

220 left_closed = closed in ["left", "both"] 

221 

222 if self.index[num_values - 1] < self.index[0]: 

223 index_growth_sign = -1 

224 else: 

225 index_growth_sign = 1 

226 offset_diff = index_growth_sign * self.offset 

227 

228 start = np.empty(num_values, dtype="int64") 

229 start.fill(-1) 

230 end = np.empty(num_values, dtype="int64") 

231 end.fill(-1) 

232 

233 start[0] = 0 

234 

235 # right endpoint is closed 

236 if right_closed: 

237 end[0] = 1 

238 # right endpoint is open 

239 else: 

240 end[0] = 0 

241 

242 zero = timedelta(0) 

243 # start is start of slice interval (including) 

244 # end is end of slice interval (not including) 

245 for i in range(1, num_values): 

246 end_bound = self.index[i] 

247 start_bound = end_bound - offset_diff 

248 

249 # left endpoint is closed 

250 if left_closed: 

251 start_bound -= Nano(1) 

252 

253 # advance the start bound until we are 

254 # within the constraint 

255 start[i] = i 

256 for j in range(start[i - 1], i): 

257 start_diff = (self.index[j] - start_bound) * index_growth_sign 

258 if start_diff > zero: 

259 start[i] = j 

260 break 

261 

262 # end bound is previous end 

263 # or current index 

264 end_diff = (self.index[end[i - 1]] - end_bound) * index_growth_sign 

265 if end_diff == zero and not right_closed: 

266 end[i] = end[i - 1] + 1 

267 elif end_diff <= zero: 

268 end[i] = i + 1 

269 else: 

270 end[i] = end[i - 1] 

271 

272 # right endpoint is open 

273 if not right_closed: 

274 end[i] -= 1 

275 

276 return start, end 

277 

278 

279class ExpandingIndexer(BaseIndexer): 

280 """Calculate expanding window bounds, mimicking df.expanding()""" 

281 

282 @Appender(get_window_bounds_doc) 

283 def get_window_bounds( 

284 self, 

285 num_values: int = 0, 

286 min_periods: int | None = None, 

287 center: bool | None = None, 

288 closed: str | None = None, 

289 step: int | None = None, 

290 ) -> tuple[np.ndarray, np.ndarray]: 

291 return ( 

292 np.zeros(num_values, dtype=np.int64), 

293 np.arange(1, num_values + 1, dtype=np.int64), 

294 ) 

295 

296 

297class FixedForwardWindowIndexer(BaseIndexer): 

298 """ 

299 Creates window boundaries for fixed-length windows that include the current row. 

300 

301 Examples 

302 -------- 

303 >>> df = pd.DataFrame({'B': [0, 1, 2, np.nan, 4]}) 

304 >>> df 

305 B 

306 0 0.0 

307 1 1.0 

308 2 2.0 

309 3 NaN 

310 4 4.0 

311 

312 >>> indexer = pd.api.indexers.FixedForwardWindowIndexer(window_size=2) 

313 >>> df.rolling(window=indexer, min_periods=1).sum() 

314 B 

315 0 1.0 

316 1 3.0 

317 2 2.0 

318 3 4.0 

319 4 4.0 

320 """ 

321 

322 @Appender(get_window_bounds_doc) 

323 def get_window_bounds( 

324 self, 

325 num_values: int = 0, 

326 min_periods: int | None = None, 

327 center: bool | None = None, 

328 closed: str | None = None, 

329 step: int | None = None, 

330 ) -> tuple[np.ndarray, np.ndarray]: 

331 if center: 

332 raise ValueError("Forward-looking windows can't have center=True") 

333 if closed is not None: 

334 raise ValueError( 

335 "Forward-looking windows don't support setting the closed argument" 

336 ) 

337 if step is None: 

338 step = 1 

339 

340 start = np.arange(0, num_values, step, dtype="int64") 

341 end = start + self.window_size 

342 if self.window_size: 

343 end = np.clip(end, 0, num_values) 

344 

345 return start, end 

346 

347 

348class GroupbyIndexer(BaseIndexer): 

349 """Calculate bounds to compute groupby rolling, mimicking df.groupby().rolling()""" 

350 

351 def __init__( 

352 self, 

353 index_array: np.ndarray | None = None, 

354 window_size: int | BaseIndexer = 0, 

355 groupby_indices: dict | None = None, 

356 window_indexer: type[BaseIndexer] = BaseIndexer, 

357 indexer_kwargs: dict | None = None, 

358 **kwargs, 

359 ) -> None: 

360 """ 

361 Parameters 

362 ---------- 

363 index_array : np.ndarray or None 

364 np.ndarray of the index of the original object that we are performing 

365 a chained groupby operation over. This index has been pre-sorted relative to 

366 the groups 

367 window_size : int or BaseIndexer 

368 window size during the windowing operation 

369 groupby_indices : dict or None 

370 dict of {group label: [positional index of rows belonging to the group]} 

371 window_indexer : BaseIndexer 

372 BaseIndexer class determining the start and end bounds of each group 

373 indexer_kwargs : dict or None 

374 Custom kwargs to be passed to window_indexer 

375 **kwargs : 

376 keyword arguments that will be available when get_window_bounds is called 

377 """ 

378 self.groupby_indices = groupby_indices or {} 

379 self.window_indexer = window_indexer 

380 self.indexer_kwargs = indexer_kwargs.copy() if indexer_kwargs else {} 

381 super().__init__( 

382 index_array=index_array, 

383 window_size=self.indexer_kwargs.pop("window_size", window_size), 

384 **kwargs, 

385 ) 

386 

387 @Appender(get_window_bounds_doc) 

388 def get_window_bounds( 

389 self, 

390 num_values: int = 0, 

391 min_periods: int | None = None, 

392 center: bool | None = None, 

393 closed: str | None = None, 

394 step: int | None = None, 

395 ) -> tuple[np.ndarray, np.ndarray]: 

396 # 1) For each group, get the indices that belong to the group 

397 # 2) Use the indices to calculate the start & end bounds of the window 

398 # 3) Append the window bounds in group order 

399 start_arrays = [] 

400 end_arrays = [] 

401 window_indices_start = 0 

402 for key, indices in self.groupby_indices.items(): 

403 index_array: np.ndarray | None 

404 

405 if self.index_array is not None: 

406 index_array = self.index_array.take(ensure_platform_int(indices)) 

407 else: 

408 index_array = self.index_array 

409 indexer = self.window_indexer( 

410 index_array=index_array, 

411 window_size=self.window_size, 

412 **self.indexer_kwargs, 

413 ) 

414 start, end = indexer.get_window_bounds( 

415 len(indices), min_periods, center, closed, step 

416 ) 

417 start = start.astype(np.int64) 

418 end = end.astype(np.int64) 

419 assert len(start) == len( 

420 end 

421 ), "these should be equal in length from get_window_bounds" 

422 # Cannot use groupby_indices as they might not be monotonic with the object 

423 # we're rolling over 

424 window_indices = np.arange( 

425 window_indices_start, window_indices_start + len(indices) 

426 ) 

427 window_indices_start += len(indices) 

428 # Extend as we'll be slicing window like [start, end) 

429 window_indices = np.append(window_indices, [window_indices[-1] + 1]).astype( 

430 np.int64, copy=False 

431 ) 

432 start_arrays.append(window_indices.take(ensure_platform_int(start))) 

433 end_arrays.append(window_indices.take(ensure_platform_int(end))) 

434 if len(start_arrays) == 0: 

435 return np.array([], dtype=np.int64), np.array([], dtype=np.int64) 

436 start = np.concatenate(start_arrays) 

437 end = np.concatenate(end_arrays) 

438 return start, end 

439 

440 

441class ExponentialMovingWindowIndexer(BaseIndexer): 

442 """Calculate ewm window bounds (the entire window)""" 

443 

444 @Appender(get_window_bounds_doc) 

445 def get_window_bounds( 

446 self, 

447 num_values: int = 0, 

448 min_periods: int | None = None, 

449 center: bool | None = None, 

450 closed: str | None = None, 

451 step: int | None = None, 

452 ) -> tuple[np.ndarray, np.ndarray]: 

453 return np.array([0], dtype=np.int64), np.array([num_values], dtype=np.int64)