Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.9/dist-packages/pandas/core/groupby/ops.py: 42% (515 statements)

1""" 

2Provide classes to perform the groupby aggregate operations. 

3 

4These are not exposed to the user and provide implementations of the grouping 

5operations, primarily in cython. These classes (BaseGrouper and BinGrouper) 

6are contained *in* the SeriesGroupBy and DataFrameGroupBy objects. 

7""" 

8from __future__ import annotations 

9 

10import collections 

11import functools 

12from typing import ( 

13 TYPE_CHECKING, 

14 Callable, 

15 Generic, 

16 final, 

17) 

18 

19import numpy as np 

20 

21from pandas._libs import ( 

22 NaT, 

23 lib, 

24) 

25import pandas._libs.groupby as libgroupby 

26from pandas._typing import ( 

27 ArrayLike, 

28 AxisInt, 

29 NDFrameT, 

30 Shape, 

31 npt, 

32) 

33from pandas.errors import AbstractMethodError 

34from pandas.util._decorators import cache_readonly 

35 

36from pandas.core.dtypes.cast import ( 

37 maybe_cast_pointwise_result, 

38 maybe_downcast_to_dtype, 

39) 

40from pandas.core.dtypes.common import ( 

41 ensure_float64, 

42 ensure_int64, 

43 ensure_platform_int, 

44 ensure_uint64, 

45 is_1d_only_ea_dtype, 

46) 

47from pandas.core.dtypes.missing import ( 

48 isna, 

49 maybe_fill, 

50) 

51 

52from pandas.core.frame import DataFrame 

53from pandas.core.groupby import grouper 

54from pandas.core.indexes.api import ( 

55 CategoricalIndex, 

56 Index, 

57 MultiIndex, 

58 ensure_index, 

59) 

60from pandas.core.series import Series 

61from pandas.core.sorting import ( 

62 compress_group_index, 

63 decons_obs_group_ids, 

64 get_flattened_list, 

65 get_group_index, 

66 get_group_index_sorter, 

67 get_indexer_dict, 

68) 

69 

70if TYPE_CHECKING: 

71 from collections.abc import ( 

72 Hashable, 

73 Iterator, 

74 Sequence, 

75 ) 

76 

77 from pandas.core.generic import NDFrame 

78 

79 

80def check_result_array(obj, dtype) -> None: 

81 # Our operation is supposed to be an aggregation/reduction. If 

82 # it returns an ndarray, this likely means an invalid operation has 

83 # been passed. See test_apply_without_aggregation, test_agg_must_agg 

84 if isinstance(obj, np.ndarray): 

85 if dtype != object: 

86 # If it is object dtype, the function can be a reduction/aggregation 

87 # and still return an ndarray e.g. test_agg_over_numpy_arrays 

88 raise ValueError("Must produce aggregated value") 

89 
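# Illustrative sketch, not part of the original module: how the reduction
# check behaves. The inputs below are assumptions chosen for demonstration.
#
# >>> check_result_array(np.array([1, 2]), np.dtype("int64"))
# Traceback (most recent call last):
#     ...
# ValueError: Must produce aggregated value
# >>> check_result_array(3, np.dtype("int64"))  # a scalar result passes silently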


def extract_result(res):
    """
    Extract the result object; it might be a 0-dim ndarray,
    a len-1 ndarray, or a scalar.
    """
    if hasattr(res, "_values"):
        # Preserve EA
        res = res._values
        if res.ndim == 1 and len(res) == 1:
            # see test_agg_lambda_with_timezone, test_resampler_grouper.py::test_apply
            res = res[0]
    return res
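
# A hedged illustration (assumed inputs, not from the original source) of the
# shapes extract_result handles:
#
# >>> extract_result(np.float64(3.0))  # scalars pass through unchanged
# 3.0
# >>> extract_result(Series([7]))      # len-1 -> unwrapped to the scalar 7
# 7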



class WrappedCythonOp:
    """
    Dispatch logic for functions defined in _libs.groupby

    Parameters
    ----------
    kind : str
        Whether the operation is an aggregate or transform.
    how : str
        Operation name, e.g. "mean".
    has_dropped_na : bool
        True precisely when dropna=True and the grouper contains a null value.
    """

    # Functions for which we do _not_ attempt to cast the cython result
    # back to the original dtype.
    cast_blocklist = frozenset(
        ["any", "all", "rank", "count", "size", "idxmin", "idxmax"]
    )

    def __init__(self, kind: str, how: str, has_dropped_na: bool) -> None:
        self.kind = kind
        self.how = how
        self.has_dropped_na = has_dropped_na

    _CYTHON_FUNCTIONS: dict[str, dict] = {
        "aggregate": {
            "any": functools.partial(libgroupby.group_any_all, val_test="any"),
            "all": functools.partial(libgroupby.group_any_all, val_test="all"),
            "sum": "group_sum",
            "prod": "group_prod",
            "idxmin": functools.partial(libgroupby.group_idxmin_idxmax, name="idxmin"),
            "idxmax": functools.partial(libgroupby.group_idxmin_idxmax, name="idxmax"),
            "min": "group_min",
            "max": "group_max",
            "mean": "group_mean",
            "median": "group_median_float64",
            "var": "group_var",
            "std": functools.partial(libgroupby.group_var, name="std"),
            "sem": functools.partial(libgroupby.group_var, name="sem"),
            "skew": "group_skew",
            "first": "group_nth",
            "last": "group_last",
            "ohlc": "group_ohlc",
        },
        "transform": {
            "cumprod": "group_cumprod",
            "cumsum": "group_cumsum",
            "cummin": "group_cummin",
            "cummax": "group_cummax",
            "rank": "group_rank",
        },
    }

    _cython_arity = {"ohlc": 4}  # OHLC produces 4 output columns

    @classmethod
    def get_kind_from_how(cls, how: str) -> str:
        if how in cls._CYTHON_FUNCTIONS["aggregate"]:
            return "aggregate"
        return "transform"
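
    # A hedged illustration (not in the original source) of the dispatch-table
    # lookup above:
    #
    # >>> WrappedCythonOp.get_kind_from_how("mean")
    # 'aggregate'
    # >>> WrappedCythonOp.get_kind_from_how("cumsum")
    # 'transform'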


    # Note: we make this a classmethod and pass kind+how so that caching
    # works at the class level and not the instance level
    @classmethod
    @functools.cache
    def _get_cython_function(
        cls, kind: str, how: str, dtype: np.dtype, is_numeric: bool
    ):
        dtype_str = dtype.name
        ftype = cls._CYTHON_FUNCTIONS[kind][how]

        # see if there is a fused-type version of function
        # only valid for numeric
        if callable(ftype):
            f = ftype
        else:
            f = getattr(libgroupby, ftype)
        if is_numeric:
            return f
        elif dtype == np.dtype(object):
            if how in ["median", "cumprod"]:
                # no fused types -> no __signatures__
                raise NotImplementedError(
                    f"function is not implemented for this dtype: "
                    f"[how->{how},dtype->{dtype_str}]"
                )
            elif how in ["std", "sem", "idxmin", "idxmax"]:
                # We have a partial object that does not have __signatures__
                return f
            elif how == "skew":
                # _get_cython_vals will convert to float64
                pass
            elif "object" not in f.__signatures__:
                # raise NotImplementedError here rather than TypeError later
                raise NotImplementedError(
                    f"function is not implemented for this dtype: "
                    f"[how->{how},dtype->{dtype_str}]"
                )
            return f
        else:
            raise NotImplementedError(
                "This should not be reached. Please report a bug at "
                "github.com/pandas-dev/pandas/",
                dtype,
            )

    def _get_cython_vals(self, values: np.ndarray) -> np.ndarray:
        """
        Cast numeric dtypes to float64 for functions that only support that.

        Parameters
        ----------
        values : np.ndarray

        Returns
        -------
        values : np.ndarray
        """
        how = self.how

        if how in ["median", "std", "sem", "skew"]:
            # median only has a float64 implementation
            # We should only get here with is_numeric, as non-numeric cases
            # should raise in _get_cython_function
            values = ensure_float64(values)

        elif values.dtype.kind in "iu":
            if how in ["var", "mean"] or (
                self.kind == "transform" and self.has_dropped_na
            ):
                # the has_dropped_na check is needed for
                # test_null_group_str_transformer: the result may still
                # include NaN, so we have to cast
                values = ensure_float64(values)

            elif how in ["sum", "ohlc", "prod", "cumsum", "cumprod"]:
                # Avoid overflow during group op
                if values.dtype.kind == "i":
                    values = ensure_int64(values)
                else:
                    values = ensure_uint64(values)

        return values

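    # A minimal sketch (assumed inputs, not from the original source) of the
    # casting rules above: small ints are widened against overflow for "sum",
    # and int inputs become float64 for "mean" since the result can be
    # fractional.
    #
    # >>> op = WrappedCythonOp(kind="aggregate", how="sum", has_dropped_na=False)
    # >>> op._get_cython_vals(np.array([1, 2], dtype=np.int8)).dtype
    # dtype('int64')
    # >>> op = WrappedCythonOp(kind="aggregate", how="mean", has_dropped_na=False)
    # >>> op._get_cython_vals(np.array([1, 2], dtype=np.int64)).dtype
    # dtype('float64')
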

    def _get_output_shape(self, ngroups: int, values: np.ndarray) -> Shape:
        how = self.how
        kind = self.kind

        arity = self._cython_arity.get(how, 1)

        out_shape: Shape
        if how == "ohlc":
            out_shape = (ngroups, arity)
        elif arity > 1:
            raise NotImplementedError(
                "arity of more than 1 is not supported for the 'how' argument"
            )
        elif kind == "transform":
            out_shape = values.shape
        else:
            out_shape = (ngroups,) + values.shape[1:]
        return out_shape

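    # A hedged sketch (toy shapes, not from the original source): "ohlc" is
    # the one arity-4 op, so each group yields four columns, while a plain
    # aggregation collapses the group axis to ngroups.
    #
    # >>> op = WrappedCythonOp(kind="aggregate", how="ohlc", has_dropped_na=False)
    # >>> op._get_output_shape(3, np.zeros((1, 10)))
    # (3, 4)
    # >>> op = WrappedCythonOp(kind="aggregate", how="sum", has_dropped_na=False)
    # >>> op._get_output_shape(3, np.zeros((1, 10)))
    # (3, 10)
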

    def _get_out_dtype(self, dtype: np.dtype) -> np.dtype:
        how = self.how

        if how == "rank":
            out_dtype = "float64"
        elif how in ["idxmin", "idxmax"]:
            # The Cython implementation only produces the row number; we'll take
            # from the index using this in post-processing
            out_dtype = "intp"
        else:
            if dtype.kind in "iufcb":
                out_dtype = f"{dtype.kind}{dtype.itemsize}"
            else:
                out_dtype = "object"
        return np.dtype(out_dtype)

    def _get_result_dtype(self, dtype: np.dtype) -> np.dtype:
        """
        Get the desired dtype of a result based on the
        input dtype and how it was computed.

        Parameters
        ----------
        dtype : np.dtype

        Returns
        -------
        np.dtype
            The desired dtype of the result.
        """
        how = self.how

        if how in ["sum", "cumsum", "prod", "cumprod"]:
            if dtype == np.dtype(bool):
                return np.dtype(np.int64)
        elif how in ["mean", "median", "var", "std", "sem"]:
            if dtype.kind in "fc":
                return dtype
            elif dtype.kind in "iub":
                return np.dtype(np.float64)
        return dtype

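    # A hedged illustration (assumed dtypes, not from the original source):
    # sums of booleans count, hence int64; means of ints can be fractional,
    # hence float64.
    #
    # >>> op = WrappedCythonOp(kind="aggregate", how="sum", has_dropped_na=False)
    # >>> op._get_result_dtype(np.dtype(bool))
    # dtype('int64')
    # >>> op = WrappedCythonOp(kind="aggregate", how="mean", has_dropped_na=False)
    # >>> op._get_result_dtype(np.dtype("int32"))
    # dtype('float64')
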

    @final
    def _cython_op_ndim_compat(
        self,
        values: np.ndarray,
        *,
        min_count: int,
        ngroups: int,
        comp_ids: np.ndarray,
        mask: npt.NDArray[np.bool_] | None = None,
        result_mask: npt.NDArray[np.bool_] | None = None,
        **kwargs,
    ) -> np.ndarray:
        if values.ndim == 1:
            # expand to 2d, dispatch, then squeeze if appropriate
            values2d = values[None, :]
            if mask is not None:
                mask = mask[None, :]
            if result_mask is not None:
                result_mask = result_mask[None, :]
            res = self._call_cython_op(
                values2d,
                min_count=min_count,
                ngroups=ngroups,
                comp_ids=comp_ids,
                mask=mask,
                result_mask=result_mask,
                **kwargs,
            )
            if res.shape[0] == 1:
                return res[0]

            # otherwise we have OHLC
            return res.T

        return self._call_cython_op(
            values,
            min_count=min_count,
            ngroups=ngroups,
            comp_ids=comp_ids,
            mask=mask,
            result_mask=result_mask,
            **kwargs,
        )

    @final
    def _call_cython_op(
        self,
        values: np.ndarray,  # np.ndarray[ndim=2]
        *,
        min_count: int,
        ngroups: int,
        comp_ids: np.ndarray,
        mask: npt.NDArray[np.bool_] | None,
        result_mask: npt.NDArray[np.bool_] | None,
        **kwargs,
    ) -> np.ndarray:  # np.ndarray[ndim=2]
        orig_values = values

        dtype = values.dtype
        is_numeric = dtype.kind in "iufcb"

        is_datetimelike = dtype.kind in "mM"

        if is_datetimelike:
            values = values.view("int64")
            is_numeric = True
        elif dtype.kind == "b":
            values = values.view("uint8")
        if values.dtype == "float16":
            values = values.astype(np.float32)

        if self.how in ["any", "all"]:
            if mask is None:
                mask = isna(values)
            if dtype == object:
                if kwargs["skipna"]:
                    # GH#37501: don't raise on pd.NA when skipna=True
                    if mask.any():
                        # mask on original values computed separately
                        values = values.copy()
                        values[mask] = True
            values = values.astype(bool, copy=False).view(np.int8)
            is_numeric = True

        values = values.T
        if mask is not None:
            mask = mask.T
            if result_mask is not None:
                result_mask = result_mask.T

        out_shape = self._get_output_shape(ngroups, values)
        func = self._get_cython_function(self.kind, self.how, values.dtype, is_numeric)
        values = self._get_cython_vals(values)
        out_dtype = self._get_out_dtype(values.dtype)

        result = maybe_fill(np.empty(out_shape, dtype=out_dtype))
        if self.kind == "aggregate":
            counts = np.zeros(ngroups, dtype=np.int64)
            if self.how in [
                "idxmin",
                "idxmax",
                "min",
                "max",
                "mean",
                "last",
                "first",
                "sum",
            ]:
                func(
                    out=result,
                    counts=counts,
                    values=values,
                    labels=comp_ids,
                    min_count=min_count,
                    mask=mask,
                    result_mask=result_mask,
                    is_datetimelike=is_datetimelike,
                    **kwargs,
                )
            elif self.how in ["sem", "std", "var", "ohlc", "prod", "median"]:
                if self.how in ["std", "sem"]:
                    kwargs["is_datetimelike"] = is_datetimelike
                func(
                    result,
                    counts,
                    values,
                    comp_ids,
                    min_count=min_count,
                    mask=mask,
                    result_mask=result_mask,
                    **kwargs,
                )
            elif self.how in ["any", "all"]:
                func(
                    out=result,
                    values=values,
                    labels=comp_ids,
                    mask=mask,
                    result_mask=result_mask,
                    **kwargs,
                )
                result = result.astype(bool, copy=False)
            elif self.how in ["skew"]:
                func(
                    out=result,
                    counts=counts,
                    values=values,
                    labels=comp_ids,
                    mask=mask,
                    result_mask=result_mask,
                    **kwargs,
                )
                if dtype == object:
                    result = result.astype(object)

            else:
                raise NotImplementedError(f"{self.how} is not implemented")
        else:
            # TODO: min_count
            if self.how != "rank":
                # TODO: should rank take result_mask?
                kwargs["result_mask"] = result_mask
            func(
                out=result,
                values=values,
                labels=comp_ids,
                ngroups=ngroups,
                is_datetimelike=is_datetimelike,
                mask=mask,
                **kwargs,
            )

        if self.kind == "aggregate" and self.how not in ["idxmin", "idxmax"]:
            # i.e. counts is defined. Locations where count<min_count
            # need to have the result set to np.nan, which may require casting,
            # see GH#40767. idxmin/idxmax are handled specially via post-processing
            if result.dtype.kind in "iu" and not is_datetimelike:
                # if the op keeps the int dtypes, we have to use 0
                cutoff = max(0 if self.how in ["sum", "prod"] else 1, min_count)
                empty_groups = counts < cutoff
                if empty_groups.any():
                    if result_mask is not None:
                        assert result_mask[empty_groups].all()
                    else:
                        # Note: this conversion could be lossy, see GH#40767
                        result = result.astype("float64")
                        result[empty_groups] = np.nan

        result = result.T

        if self.how not in self.cast_blocklist:
            # e.g. if we are int64 and need to restore to datetime64/timedelta64
            # "rank" is the only member of cast_blocklist we get here
            # Casting only needed for float16, bool, datetimelike,
            # and self.how in ["sum", "prod", "ohlc", "cumprod"]
            res_dtype = self._get_result_dtype(orig_values.dtype)
            op_result = maybe_downcast_to_dtype(result, res_dtype)
        else:
            op_result = result

        return op_result

    @final
    def _validate_axis(self, axis: AxisInt, values: ArrayLike) -> None:
        if values.ndim > 2:
            raise NotImplementedError("number of dimensions is currently limited to 2")
        if values.ndim == 2:
            assert axis == 1, axis
        elif not is_1d_only_ea_dtype(values.dtype):
            # Note: it is *not* the case that axis is always 0 for 1-dim values,
            # as we can have 1D ExtensionArrays that we need to treat as 2D
            assert axis == 0

    @final
    def cython_operation(
        self,
        *,
        values: ArrayLike,
        axis: AxisInt,
        min_count: int = -1,
        comp_ids: np.ndarray,
        ngroups: int,
        **kwargs,
    ) -> ArrayLike:
        """
        Call our cython function, with appropriate pre- and post- processing.
        """
        self._validate_axis(axis, values)

        if not isinstance(values, np.ndarray):
            # i.e. ExtensionArray
            return values._groupby_op(
                how=self.how,
                has_dropped_na=self.has_dropped_na,
                min_count=min_count,
                ngroups=ngroups,
                ids=comp_ids,
                **kwargs,
            )

        return self._cython_op_ndim_compat(
            values,
            min_count=min_count,
            ngroups=ngroups,
            comp_ids=comp_ids,
            mask=None,
            **kwargs,
        )

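# End-to-end sketch of WrappedCythonOp (assumed toy data, not from the
# original source): a "mean" aggregation over two groups. comp_ids assigns
# each row to a group; ngroups is the number of observed groups. Under these
# assumptions the cython kernel should return the per-group means.
#
# >>> op = WrappedCythonOp(kind="aggregate", how="mean", has_dropped_na=False)
# >>> values = np.array([1.0, 2.0, 3.0, 4.0])
# >>> comp_ids = np.array([0, 0, 1, 1], dtype=np.intp)
# >>> op.cython_operation(values=values, axis=0, comp_ids=comp_ids, ngroups=2)
# array([1.5, 3.5])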



class BaseGrouper:
    """
    This is an internal Grouper class, which actually holds
    the generated groups.

    Parameters
    ----------
    axis : Index
    groupings : Sequence[Grouping]
        all the grouping instances to handle in this grouper; e.g., when
        grouping by a list of groupers, the full list is passed here
    sort : bool, default True
        whether this grouper will give sorted result or not
    dropna : bool, default True
        whether null group keys are dropped

    """

    axis: Index

    def __init__(
        self,
        axis: Index,
        groupings: Sequence[grouper.Grouping],
        sort: bool = True,
        dropna: bool = True,
    ) -> None:
        assert isinstance(axis, Index), axis

        self.axis = axis
        self._groupings: list[grouper.Grouping] = list(groupings)
        self._sort = sort
        self.dropna = dropna

    @property
    def groupings(self) -> list[grouper.Grouping]:
        return self._groupings

    @property
    def shape(self) -> Shape:
        return tuple(ping.ngroups for ping in self.groupings)

    def __iter__(self) -> Iterator[Hashable]:
        return iter(self.indices)

    @property
    def nkeys(self) -> int:
        return len(self.groupings)

    def get_iterator(
        self, data: NDFrameT, axis: AxisInt = 0
    ) -> Iterator[tuple[Hashable, NDFrameT]]:
        """
        Groupby iterator

        Returns
        -------
        Generator yielding sequence of (name, subsetted object)
        for each group
        """
        splitter = self._get_splitter(data, axis=axis)
        keys = self.group_keys_seq
        yield from zip(keys, splitter)

    @final
    def _get_splitter(self, data: NDFrame, axis: AxisInt = 0) -> DataSplitter:
        """
        Returns
        -------
        Generator yielding subsetted objects
        """
        ids, _, ngroups = self.group_info
        return _get_splitter(
            data,
            ids,
            ngroups,
            sorted_ids=self._sorted_ids,
            sort_idx=self._sort_idx,
            axis=axis,
        )

    @final
    @cache_readonly
    def group_keys_seq(self):
        if len(self.groupings) == 1:
            return self.levels[0]
        else:
            ids, _, ngroups = self.group_info

            # provide "flattened" iterator for multi-group setting
            return get_flattened_list(ids, ngroups, self.levels, self.codes)

    @cache_readonly
    def indices(self) -> dict[Hashable, npt.NDArray[np.intp]]:
        """dict {group name -> group indices}"""
        if len(self.groupings) == 1 and isinstance(self.result_index, CategoricalIndex):
            # This shows unused categories in indices GH#38642
            return self.groupings[0].indices
        codes_list = [ping.codes for ping in self.groupings]
        keys = [ping._group_index for ping in self.groupings]
        return get_indexer_dict(codes_list, keys)

    @final
    def result_ilocs(self) -> npt.NDArray[np.intp]:
        """
        Get the original integer locations of result_index in the input.
        """
        # Original indices are where group_index would go via sorting.
        # But when dropna is True, we need to remove null values while
        # accounting for any gaps that then occur because of them.
        group_index = get_group_index(
            self.codes, self.shape, sort=self._sort, xnull=True
        )
        group_index, _ = compress_group_index(group_index, sort=self._sort)

        if self.has_dropped_na:
            mask = np.where(group_index >= 0)
            # Count how many gaps are caused by previous null values for each position
            null_gaps = np.cumsum(group_index == -1)[mask]
            group_index = group_index[mask]

        result = get_group_index_sorter(group_index, self.ngroups)

        if self.has_dropped_na:
            # Shift by the number of prior null gaps
            result += np.take(null_gaps, result)

        return result
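
    # A hedged numeric trace of the dropna bookkeeping above (assumed codes,
    # not from the original source). With one grouping whose codes are
    # [0, -1, 1, 0], the compressed group_index is [0, -1, 1, 0]; masking
    # keeps positions [0, 2, 3] with null_gaps = cumsum(== -1)[mask] =
    # [0, 1, 1]. Sorting the surviving ids [0, 1, 0] gives [0, 2, 1]
    # (positions within the masked array), and adding the prior-null shifts
    # null_gaps.take([0, 2, 1]) = [0, 1, 1] restores the original row
    # numbers: [0, 3, 2].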


    @final
    @property
    def codes(self) -> list[npt.NDArray[np.signedinteger]]:
        return [ping.codes for ping in self.groupings]

    @property
    def levels(self) -> list[Index]:
        return [ping._group_index for ping in self.groupings]

    @property
    def names(self) -> list[Hashable]:
        return [ping.name for ping in self.groupings]

    @final
    def size(self) -> Series:
        """
        Compute group sizes.
        """
        ids, _, ngroups = self.group_info
        out: np.ndarray | list
        if ngroups:
            out = np.bincount(ids[ids != -1], minlength=ngroups)
        else:
            out = []
        return Series(out, index=self.result_index, dtype="int64", copy=False)
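
    # A minimal sketch of the bincount step (assumed ids, not from the
    # original source): dropped-null rows carry id -1 and are excluded
    # before counting.
    #
    # >>> ids = np.array([0, 0, 1, -1, 1, 1])
    # >>> np.bincount(ids[ids != -1], minlength=2)
    # array([2, 3])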


    @cache_readonly
    def groups(self) -> dict[Hashable, np.ndarray]:
        """dict {group name -> group labels}"""
        if len(self.groupings) == 1:
            return self.groupings[0].groups
        else:
            to_groupby = []
            for ping in self.groupings:
                gv = ping.grouping_vector
                if not isinstance(gv, BaseGrouper):
                    to_groupby.append(gv)
                else:
                    to_groupby.append(gv.groupings[0].grouping_vector)
            index = MultiIndex.from_arrays(to_groupby)
            return self.axis.groupby(index)

    @final
    @cache_readonly
    def is_monotonic(self) -> bool:
        # return if my group orderings are monotonic
        return Index(self.group_info[0]).is_monotonic_increasing

    @final
    @cache_readonly
    def has_dropped_na(self) -> bool:
        """
        Whether grouper has null value(s) that are dropped.
        """
        return bool((self.group_info[0] < 0).any())

    @cache_readonly
    def group_info(self) -> tuple[npt.NDArray[np.intp], npt.NDArray[np.intp], int]:
        comp_ids, obs_group_ids = self._get_compressed_codes()

        ngroups = len(obs_group_ids)
        comp_ids = ensure_platform_int(comp_ids)

        return comp_ids, obs_group_ids, ngroups

    @cache_readonly
    def codes_info(self) -> npt.NDArray[np.intp]:
        # return the codes of items in original grouped axis
        ids, _, _ = self.group_info
        return ids

    @final
    def _get_compressed_codes(
        self,
    ) -> tuple[npt.NDArray[np.signedinteger], npt.NDArray[np.intp]]:
        # The first returned ndarray may have any signed integer dtype
        if len(self.groupings) > 1:
            group_index = get_group_index(self.codes, self.shape, sort=True, xnull=True)
            return compress_group_index(group_index, sort=self._sort)
            # FIXME: compress_group_index's second return value is int64, not intp

        ping = self.groupings[0]
        return ping.codes, np.arange(len(ping._group_index), dtype=np.intp)

    @final
    @cache_readonly
    def ngroups(self) -> int:
        return len(self.result_index)

    @property
    def reconstructed_codes(self) -> list[npt.NDArray[np.intp]]:
        codes = self.codes
        ids, obs_ids, _ = self.group_info
        return decons_obs_group_ids(ids, obs_ids, self.shape, codes, xnull=True)

    @cache_readonly
    def result_index(self) -> Index:
        if len(self.groupings) == 1:
            return self.groupings[0]._result_index.rename(self.names[0])

        codes = self.reconstructed_codes
        levels = [ping._result_index for ping in self.groupings]
        return MultiIndex(
            levels=levels, codes=codes, verify_integrity=False, names=self.names
        )

    @final
    def get_group_levels(self) -> list[ArrayLike]:
        # Note: only called from _insert_inaxis_grouper, which
        # is only called for BaseGrouper, never for BinGrouper
        if len(self.groupings) == 1:
            return [self.groupings[0]._group_arraylike]

        name_list = []
        for ping, codes in zip(self.groupings, self.reconstructed_codes):
            codes = ensure_platform_int(codes)
            levels = ping._group_arraylike.take(codes)

            name_list.append(levels)

        return name_list

    # ------------------------------------------------------------
    # Aggregation functions

    @final
    def _cython_operation(
        self,
        kind: str,
        values,
        how: str,
        axis: AxisInt,
        min_count: int = -1,
        **kwargs,
    ) -> ArrayLike:
        """
        Returns the values of a cython operation.
        """
        assert kind in ["transform", "aggregate"]

        cy_op = WrappedCythonOp(kind=kind, how=how, has_dropped_na=self.has_dropped_na)

        ids, _, _ = self.group_info
        ngroups = self.ngroups
        return cy_op.cython_operation(
            values=values,
            axis=axis,
            min_count=min_count,
            comp_ids=ids,
            ngroups=ngroups,
            **kwargs,
        )

    @final
    def agg_series(
        self, obj: Series, func: Callable, preserve_dtype: bool = False
    ) -> ArrayLike:
        """
        Parameters
        ----------
        obj : Series
        func : function taking a Series and returning a scalar-like
        preserve_dtype : bool
            Whether the aggregation is known to be dtype-preserving.

        Returns
        -------
        np.ndarray or ExtensionArray
        """

        if not isinstance(obj._values, np.ndarray):
            # we can preserve a little bit more aggressively with EA dtype
            # because maybe_cast_pointwise_result will do a try/except
            # with _from_sequence. NB we are assuming here that _from_sequence
            # is sufficiently strict that it casts appropriately.
            preserve_dtype = True

        result = self._aggregate_series_pure_python(obj, func)

        npvalues = lib.maybe_convert_objects(result, try_float=False)
        if preserve_dtype:
            out = maybe_cast_pointwise_result(npvalues, obj.dtype, numeric_only=True)
        else:
            out = npvalues
        return out

    @final
    def _aggregate_series_pure_python(
        self, obj: Series, func: Callable
    ) -> npt.NDArray[np.object_]:
        _, _, ngroups = self.group_info

        result = np.empty(ngroups, dtype="O")
        initialized = False

        splitter = self._get_splitter(obj, axis=0)

        for i, group in enumerate(splitter):
            res = func(group)
            res = extract_result(res)

            if not initialized:
                # We only do this validation on the first iteration
                check_result_array(res, group.dtype)
                initialized = True

            result[i] = res

        return result

    @final
    def apply_groupwise(
        self, f: Callable, data: DataFrame | Series, axis: AxisInt = 0
    ) -> tuple[list, bool]:
        mutated = False
        splitter = self._get_splitter(data, axis=axis)
        group_keys = self.group_keys_seq
        result_values = []

        # This calls DataSplitter.__iter__
        zipped = zip(group_keys, splitter)

        for key, group in zipped:
            # Pinning name is needed for
            # test_group_apply_once_per_group,
            # test_inconsistent_return_type, test_set_group_name,
            # test_group_name_available_in_inference_pass,
            # test_groupby_multi_timezone
            object.__setattr__(group, "name", key)

            # group might be modified
            group_axes = group.axes
            res = f(group)
            if not mutated and not _is_indexed_like(res, group_axes, axis):
                mutated = True
            result_values.append(res)
        # getattr pattern for __name__ is needed for functools.partial objects
        if len(group_keys) == 0 and getattr(f, "__name__", None) in [
            "skew",
            "sum",
            "prod",
        ]:
            # If group_keys is empty, then no function calls have been made,
            # so we will not have raised even if this is an invalid dtype.
            # So do one dummy call here to raise the appropriate TypeError.
            f(data.iloc[:0])

        return result_values, mutated

    # ------------------------------------------------------------
    # Methods for sorting subsets of our GroupBy's object

    @final
    @cache_readonly
    def _sort_idx(self) -> npt.NDArray[np.intp]:
        # Counting sort indexer
        ids, _, ngroups = self.group_info
        return get_group_index_sorter(ids, ngroups)

    @final
    @cache_readonly
    def _sorted_ids(self) -> npt.NDArray[np.intp]:
        ids, _, _ = self.group_info
        return ids.take(self._sort_idx)


class BinGrouper(BaseGrouper):
    """
    This is an internal Grouper class.

    Parameters
    ----------
    bins : np.ndarray[np.int64]
        the right-edge positions in the grouped axis that split it into bins
    binlabels : Index
        the label for each bin
    indexer : np.ndarray[np.intp], optional
        the indexer created by Grouper; some groupers (TimeGrouper) sort
        their axis, and their group_info is sorted accordingly, so the
        indexer is needed to reorder

    Examples
    --------
    bins: [2, 4, 6, 8, 10]
    binlabels: DatetimeIndex(['2005-01-01', '2005-01-03',
                              '2005-01-05', '2005-01-07', '2005-01-09'],
                             dtype='datetime64[ns]', freq='2D')

    the group_info, which contains the label index of each item on the
    grouped axis, the index of each label in the label list, and the
    number of groups, is

    (array([0, 0, 1, 1, 2, 2, 3, 3, 4, 4]), array([0, 1, 2, 3, 4]), 5)

    meaning that the grouped axis has 10 items which fall into 5 labels:
    the first and second items belong to the first label, the third and
    fourth items belong to the second label, and so on.

    """

    bins: npt.NDArray[np.int64]
    binlabels: Index

    def __init__(
        self,
        bins,
        binlabels,
        indexer=None,
    ) -> None:
        self.bins = ensure_int64(bins)
        self.binlabels = ensure_index(binlabels)
        self.indexer = indexer

        # These lengths must match, otherwise we could call agg_series
        # with empty self.bins, which would raise later.
        assert len(self.binlabels) == len(self.bins)

    @cache_readonly
    def groups(self):
        """dict {group name -> group labels}"""
        # this is mainly for compat
        # GH 3881
        result = {
            key: value
            for key, value in zip(self.binlabels, self.bins)
            if key is not NaT
        }
        return result

    @property
    def nkeys(self) -> int:
        # still matches len(self.groupings), but we can hard-code
        return 1

    @cache_readonly
    def codes_info(self) -> npt.NDArray[np.intp]:
        # return the codes of items in original grouped axis
        ids, _, _ = self.group_info
        if self.indexer is not None:
            sorter = np.lexsort((ids, self.indexer))
            ids = ids[sorter]
        return ids

    def get_iterator(self, data: NDFrame, axis: AxisInt = 0):
        """
        Groupby iterator

        Returns
        -------
        Generator yielding sequence of (name, subsetted object)
        for each group
        """
        if axis == 0:
            slicer = lambda start, edge: data.iloc[start:edge]
        else:
            slicer = lambda start, edge: data.iloc[:, start:edge]

        length = len(data.axes[axis])

        start = 0
        for edge, label in zip(self.bins, self.binlabels):
            if label is not NaT:
                yield label, slicer(start, edge)
            start = edge

        if start < length:
            yield self.binlabels[-1], slicer(start, None)

    @cache_readonly
    def indices(self):
        indices = collections.defaultdict(list)

        i = 0
        for label, bin in zip(self.binlabels, self.bins):
            if i < bin:
                if label is not NaT:
                    indices[label] = list(range(i, bin))
                i = bin
        return indices

    @cache_readonly
    def group_info(self) -> tuple[npt.NDArray[np.intp], npt.NDArray[np.intp], int]:
        ngroups = self.ngroups
        obs_group_ids = np.arange(ngroups, dtype=np.intp)
        rep = np.diff(np.r_[0, self.bins])

        rep = ensure_platform_int(rep)
        if ngroups == len(self.bins):
            comp_ids = np.repeat(np.arange(ngroups), rep)
        else:
            comp_ids = np.repeat(np.r_[-1, np.arange(ngroups)], rep)

        return (
            ensure_platform_int(comp_ids),
            obs_group_ids,
            ngroups,
        )
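
    # A hedged trace of the comp_ids construction above (toy bins, not from
    # the original source): bins hold right edges, so their first differences
    # are the bin lengths, and repeating each group id by its length labels
    # every row. When a leading NaT bin was dropped from result_index
    # (ngroups < len(bins)), its rows are labelled -1 instead.
    #
    # >>> bins = np.array([2, 4])
    # >>> rep = np.diff(np.r_[0, bins])  # bin lengths: [2, 2]
    # >>> np.repeat(np.arange(2), rep)
    # array([0, 0, 1, 1])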


    @cache_readonly
    def reconstructed_codes(self) -> list[np.ndarray]:
        # get unique result indices, and prepend 0 as groupby starts from the first
        return [np.r_[0, np.flatnonzero(self.bins[1:] != self.bins[:-1]) + 1]]

    @cache_readonly
    def result_index(self) -> Index:
        if len(self.binlabels) != 0 and isna(self.binlabels[0]):
            return self.binlabels[1:]

        return self.binlabels

    @property
    def levels(self) -> list[Index]:
        return [self.binlabels]

    @property
    def names(self) -> list[Hashable]:
        return [self.binlabels.name]

    @property
    def groupings(self) -> list[grouper.Grouping]:
        lev = self.binlabels
        codes = self.group_info[0]
        labels = lev.take(codes)
        ping = grouper.Grouping(
            labels, labels, in_axis=False, level=None, uniques=lev._values
        )
        return [ping]


def _is_indexed_like(obj, axes, axis: AxisInt) -> bool:
    if isinstance(obj, Series):
        if len(axes) > 1:
            return False
        return obj.axes[axis].equals(axes[axis])
    elif isinstance(obj, DataFrame):
        return obj.axes[axis].equals(axes[axis])

    return False


# ----------------------------------------------------------------------
# Splitting / application


class DataSplitter(Generic[NDFrameT]):
    def __init__(
        self,
        data: NDFrameT,
        labels: npt.NDArray[np.intp],
        ngroups: int,
        *,
        sort_idx: npt.NDArray[np.intp],
        sorted_ids: npt.NDArray[np.intp],
        axis: AxisInt = 0,
    ) -> None:
        self.data = data
        self.labels = ensure_platform_int(labels)  # _should_ already be np.intp
        self.ngroups = ngroups

        self._slabels = sorted_ids
        self._sort_idx = sort_idx

        self.axis = axis
        assert isinstance(axis, int), axis

    def __iter__(self) -> Iterator:
        sdata = self._sorted_data

        if self.ngroups == 0:
            # we are inside a generator; rather than raise StopIteration
            # we merely return to signal the end
            return

        starts, ends = lib.generate_slices(self._slabels, self.ngroups)

        for start, end in zip(starts, ends):
            yield self._chop(sdata, slice(start, end))

    @cache_readonly
    def _sorted_data(self) -> NDFrameT:
        return self.data.take(self._sort_idx, axis=self.axis)

    def _chop(self, sdata, slice_obj: slice) -> NDFrame:
        raise AbstractMethodError(self)


class SeriesSplitter(DataSplitter):
    def _chop(self, sdata: Series, slice_obj: slice) -> Series:
        # fastpath equivalent to `sdata.iloc[slice_obj]`
        mgr = sdata._mgr.get_slice(slice_obj)
        ser = sdata._constructor_from_mgr(mgr, axes=mgr.axes)
        ser._name = sdata.name
        return ser.__finalize__(sdata, method="groupby")


class FrameSplitter(DataSplitter):
    def _chop(self, sdata: DataFrame, slice_obj: slice) -> DataFrame:
        # Fastpath equivalent to:
        # if self.axis == 0:
        #     return sdata.iloc[slice_obj]
        # else:
        #     return sdata.iloc[:, slice_obj]
        mgr = sdata._mgr.get_slice(slice_obj, axis=1 - self.axis)
        df = sdata._constructor_from_mgr(mgr, axes=mgr.axes)
        return df.__finalize__(sdata, method="groupby")


def _get_splitter(
    data: NDFrame,
    labels: npt.NDArray[np.intp],
    ngroups: int,
    *,
    sort_idx: npt.NDArray[np.intp],
    sorted_ids: npt.NDArray[np.intp],
    axis: AxisInt = 0,
) -> DataSplitter:
    if isinstance(data, Series):
        klass: type[DataSplitter] = SeriesSplitter
    else:
        # i.e. DataFrame
        klass = FrameSplitter

    return klass(
        data, labels, ngroups, sort_idx=sort_idx, sorted_ids=sorted_ids, axis=axis
    )

1208 )