1"""
2Provide classes to perform the groupby aggregate operations.
3
4These are not exposed to the user and provide implementations of the grouping
5operations, primarily in cython. These classes (BaseGrouper and BinGrouper)
6are contained *in* the SeriesGroupBy and DataFrameGroupBy objects.
7"""
8from __future__ import annotations
9
10import collections
11import functools
12from typing import (
13 TYPE_CHECKING,
14 Callable,
15 Generic,
16 final,
17)
18
19import numpy as np
20
21from pandas._libs import (
22 NaT,
23 lib,
24)
25import pandas._libs.groupby as libgroupby
26from pandas._typing import (
27 ArrayLike,
28 AxisInt,
29 NDFrameT,
30 Shape,
31 npt,
32)
33from pandas.errors import AbstractMethodError
34from pandas.util._decorators import cache_readonly
35
36from pandas.core.dtypes.cast import (
37 maybe_cast_pointwise_result,
38 maybe_downcast_to_dtype,
39)
40from pandas.core.dtypes.common import (
41 ensure_float64,
42 ensure_int64,
43 ensure_platform_int,
44 ensure_uint64,
45 is_1d_only_ea_dtype,
46)
47from pandas.core.dtypes.missing import (
48 isna,
49 maybe_fill,
50)
51
52from pandas.core.frame import DataFrame
53from pandas.core.groupby import grouper
54from pandas.core.indexes.api import (
55 CategoricalIndex,
56 Index,
57 MultiIndex,
58 ensure_index,
59)
60from pandas.core.series import Series
61from pandas.core.sorting import (
62 compress_group_index,
63 decons_obs_group_ids,
64 get_flattened_list,
65 get_group_index,
66 get_group_index_sorter,
67 get_indexer_dict,
68)
69
70if TYPE_CHECKING:
71 from collections.abc import (
72 Hashable,
73 Iterator,
74 Sequence,
75 )
76
77 from pandas.core.generic import NDFrame
78
79
80def check_result_array(obj, dtype) -> None:
81 # Our operation is supposed to be an aggregation/reduction. If
82 # it returns an ndarray, this likely means an invalid operation has
83 # been passed. See test_apply_without_aggregation, test_agg_must_agg
84 if isinstance(obj, np.ndarray):
85 if dtype != object:
86 # If it is object dtype, the function can be a reduction/aggregation
87 # and still return an ndarray e.g. test_agg_over_numpy_arrays
88 raise ValueError("Must produce aggregated value")
89
90
91def extract_result(res):
92 """
93 Extract the result object, it might be a 0-dim ndarray
94 or a len-1 0-dim, or a scalar
95 """
96 if hasattr(res, "_values"):
97 # Preserve EA
98 res = res._values
99 if res.ndim == 1 and len(res) == 1:
100 # see test_agg_lambda_with_timezone, test_resampler_grouper.py::test_apply
101 res = res[0]
102 return res
103
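# Illustrative sketch of ``extract_result`` (comments only, nothing runs at
# import time; values are hypothetical):
#
#   extract_result(Series([7]))       # -> 7; the length-1 array is unwrapped
#   extract_result(np.float64(1.5))   # -> 1.5; scalars pass through unchanged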

class WrappedCythonOp:
    """
    Dispatch logic for functions defined in _libs.groupby

    Parameters
    ----------
    kind : str
        Whether the operation is an aggregate or transform.
    how : str
        Operation name, e.g. "mean".
    has_dropped_na : bool
        True precisely when dropna=True and the grouper contains a null value.
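
    Examples
    --------
    Illustrative only; this is an internal class, not public API.

    >>> WrappedCythonOp.get_kind_from_how("cumsum")
    'transform'
    >>> WrappedCythonOp.get_kind_from_how("sum")
    'aggregate'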
    """

    # Functions for which we do _not_ attempt to cast the cython result
    # back to the original dtype.
    cast_blocklist = frozenset(
        ["any", "all", "rank", "count", "size", "idxmin", "idxmax"]
    )

    def __init__(self, kind: str, how: str, has_dropped_na: bool) -> None:
        self.kind = kind
        self.how = how
        self.has_dropped_na = has_dropped_na

    _CYTHON_FUNCTIONS: dict[str, dict] = {
        "aggregate": {
            "any": functools.partial(libgroupby.group_any_all, val_test="any"),
            "all": functools.partial(libgroupby.group_any_all, val_test="all"),
            "sum": "group_sum",
            "prod": "group_prod",
            "idxmin": functools.partial(libgroupby.group_idxmin_idxmax, name="idxmin"),
            "idxmax": functools.partial(libgroupby.group_idxmin_idxmax, name="idxmax"),
            "min": "group_min",
            "max": "group_max",
            "mean": "group_mean",
            "median": "group_median_float64",
            "var": "group_var",
            "std": functools.partial(libgroupby.group_var, name="std"),
            "sem": functools.partial(libgroupby.group_var, name="sem"),
            "skew": "group_skew",
            "first": "group_nth",
            "last": "group_last",
            "ohlc": "group_ohlc",
        },
        "transform": {
            "cumprod": "group_cumprod",
            "cumsum": "group_cumsum",
            "cummin": "group_cummin",
            "cummax": "group_cummax",
            "rank": "group_rank",
        },
    }

    _cython_arity = {"ohlc": 4}  # OHLC

    @classmethod
    def get_kind_from_how(cls, how: str) -> str:
        if how in cls._CYTHON_FUNCTIONS["aggregate"]:
            return "aggregate"
        return "transform"

    # Note: we make this a classmethod and pass kind+how so that caching
    # works at the class level and not the instance level
    @classmethod
    @functools.cache
    def _get_cython_function(
        cls, kind: str, how: str, dtype: np.dtype, is_numeric: bool
    ):
        dtype_str = dtype.name
        ftype = cls._CYTHON_FUNCTIONS[kind][how]

        # see if there is a fused-type version of function
        # only valid for numeric
        if callable(ftype):
            f = ftype
        else:
            f = getattr(libgroupby, ftype)
        if is_numeric:
            return f
        elif dtype == np.dtype(object):
            if how in ["median", "cumprod"]:
                # no fused types -> no __signatures__
                raise NotImplementedError(
                    f"function is not implemented for this dtype: "
                    f"[how->{how},dtype->{dtype_str}]"
                )
            elif how in ["std", "sem", "idxmin", "idxmax"]:
                # We have a partial object that does not have __signatures__
                return f
            elif how == "skew":
                # _get_cython_vals will convert to float64
                pass
            elif "object" not in f.__signatures__:
                # raise NotImplementedError here rather than TypeError later
                raise NotImplementedError(
                    f"function is not implemented for this dtype: "
                    f"[how->{how},dtype->{dtype_str}]"
                )
            return f
        else:
            raise NotImplementedError(
                "This should not be reached. Please report a bug at "
                "github.com/pandas-dev/pandas/",
                dtype,
            )

    def _get_cython_vals(self, values: np.ndarray) -> np.ndarray:
        """
        Cast numeric dtypes to float64 for functions that only support that.

        Parameters
        ----------
        values : np.ndarray

        Returns
        -------
        values : np.ndarray
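
        Examples
        --------
        A sketch of the casting rules (illustrative; internal API):

        >>> op = WrappedCythonOp(kind="aggregate", how="mean", has_dropped_na=False)
        >>> op._get_cython_vals(np.array([1, 2], dtype=np.int32)).dtype
        dtype('float64')
        >>> op = WrappedCythonOp(kind="aggregate", how="sum", has_dropped_na=False)
        >>> op._get_cython_vals(np.array([1, 2], dtype=np.int32)).dtype
        dtype('int64')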
        """
        how = self.how

        if how in ["median", "std", "sem", "skew"]:
            # median only has a float64 implementation
            # We should only get here with is_numeric, as non-numeric cases
            # should raise in _get_cython_function
            values = ensure_float64(values)

        elif values.dtype.kind in "iu":
            if how in ["var", "mean"] or (
                self.kind == "transform" and self.has_dropped_na
            ):
                # has_dropped_na check needed for test_null_group_str_transformer
                # result may still include NaN, so we have to cast
                values = ensure_float64(values)

            elif how in ["sum", "ohlc", "prod", "cumsum", "cumprod"]:
                # Avoid overflow during group op
                if values.dtype.kind == "i":
                    values = ensure_int64(values)
                else:
                    values = ensure_uint64(values)

        return values

    def _get_output_shape(self, ngroups: int, values: np.ndarray) -> Shape:
        how = self.how
        kind = self.kind

        arity = self._cython_arity.get(how, 1)

        out_shape: Shape
        if how == "ohlc":
            out_shape = (ngroups, arity)
        elif arity > 1:
            raise NotImplementedError(
                "arity of more than 1 is not supported for the 'how' argument"
            )
        elif kind == "transform":
            out_shape = values.shape
        else:
            out_shape = (ngroups,) + values.shape[1:]
        return out_shape
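
    # Shape sketch (comments only; hypothetical ngroups=3):
    #   how="ohlc"                -> (3, 4)  one row per group, o/h/l/c columns
    #   how="sum", values (N, k)  -> (3, k)  one aggregated row per group
    #   kind="transform", any how -> values.shape, unchanged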

    def _get_out_dtype(self, dtype: np.dtype) -> np.dtype:
        how = self.how

        if how == "rank":
            out_dtype = "float64"
        elif how in ["idxmin", "idxmax"]:
            # The Cython implementation only produces the row number; we
            # take the actual value from the index in post-processing
            out_dtype = "intp"
        else:
            if dtype.kind in "iufcb":
                out_dtype = f"{dtype.kind}{dtype.itemsize}"
            else:
                out_dtype = "object"
        return np.dtype(out_dtype)

    def _get_result_dtype(self, dtype: np.dtype) -> np.dtype:
        """
        Get the desired dtype of a result based on the
        input dtype and how it was computed.

        Parameters
        ----------
        dtype : np.dtype

        Returns
        -------
        np.dtype
            The desired dtype of the result.
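
        Examples
        --------
        Illustrative only; internal API.

        >>> op = WrappedCythonOp(kind="aggregate", how="sum", has_dropped_na=False)
        >>> op._get_result_dtype(np.dtype(bool))
        dtype('int64')
        >>> op = WrappedCythonOp(kind="aggregate", how="mean", has_dropped_na=False)
        >>> op._get_result_dtype(np.dtype(np.int64))
        dtype('float64')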
        """
        how = self.how

        if how in ["sum", "cumsum", "prod", "cumprod"]:
            if dtype == np.dtype(bool):
                return np.dtype(np.int64)
        elif how in ["mean", "median", "var", "std", "sem"]:
            if dtype.kind in "fc":
                return dtype
            elif dtype.kind in "iub":
                return np.dtype(np.float64)
        return dtype

    @final
    def _cython_op_ndim_compat(
        self,
        values: np.ndarray,
        *,
        min_count: int,
        ngroups: int,
        comp_ids: np.ndarray,
        mask: npt.NDArray[np.bool_] | None = None,
        result_mask: npt.NDArray[np.bool_] | None = None,
        **kwargs,
    ) -> np.ndarray:
        if values.ndim == 1:
            # expand to 2d, dispatch, then squeeze if appropriate
            values2d = values[None, :]
            if mask is not None:
                mask = mask[None, :]
            if result_mask is not None:
                result_mask = result_mask[None, :]
            res = self._call_cython_op(
                values2d,
                min_count=min_count,
                ngroups=ngroups,
                comp_ids=comp_ids,
                mask=mask,
                result_mask=result_mask,
                **kwargs,
            )
            if res.shape[0] == 1:
                return res[0]

            # otherwise we have OHLC
            return res.T

        return self._call_cython_op(
            values,
            min_count=min_count,
            ngroups=ngroups,
            comp_ids=comp_ids,
            mask=mask,
            result_mask=result_mask,
            **kwargs,
        )
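
    # Dimension-compat sketch (comments only): 1-D input is promoted to a
    # single 2-D row so the 2-D Cython kernels can be reused:
    #   values   -> shape (N,)
    #   values2d -> shape (1, N) via values[None, :]
    #   res      -> shape (1, ngroups) for an aggregation, squeezed back to
    #               1-D via res[0]; only "ohlc" yields more than one row and
    #               is returned transposed instead.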

    @final
    def _call_cython_op(
        self,
        values: np.ndarray,  # np.ndarray[ndim=2]
        *,
        min_count: int,
        ngroups: int,
        comp_ids: np.ndarray,
        mask: npt.NDArray[np.bool_] | None,
        result_mask: npt.NDArray[np.bool_] | None,
        **kwargs,
    ) -> np.ndarray:  # np.ndarray[ndim=2]
        orig_values = values

        dtype = values.dtype
        is_numeric = dtype.kind in "iufcb"

        is_datetimelike = dtype.kind in "mM"

        if is_datetimelike:
            values = values.view("int64")
            is_numeric = True
        elif dtype.kind == "b":
            values = values.view("uint8")
        if values.dtype == "float16":
            values = values.astype(np.float32)

        if self.how in ["any", "all"]:
            if mask is None:
                mask = isna(values)
            if dtype == object:
                if kwargs["skipna"]:
                    # GH#37501: don't raise on pd.NA when skipna=True
                    if mask.any():
                        # mask on original values computed separately
                        values = values.copy()
                        values[mask] = True
            values = values.astype(bool, copy=False).view(np.int8)
            is_numeric = True

        values = values.T
        if mask is not None:
            mask = mask.T
        if result_mask is not None:
            result_mask = result_mask.T

        out_shape = self._get_output_shape(ngroups, values)
        func = self._get_cython_function(self.kind, self.how, values.dtype, is_numeric)
        values = self._get_cython_vals(values)
        out_dtype = self._get_out_dtype(values.dtype)

        result = maybe_fill(np.empty(out_shape, dtype=out_dtype))
        if self.kind == "aggregate":
            counts = np.zeros(ngroups, dtype=np.int64)
            if self.how in [
                "idxmin",
                "idxmax",
                "min",
                "max",
                "mean",
                "last",
                "first",
                "sum",
            ]:
                func(
                    out=result,
                    counts=counts,
                    values=values,
                    labels=comp_ids,
                    min_count=min_count,
                    mask=mask,
                    result_mask=result_mask,
                    is_datetimelike=is_datetimelike,
                    **kwargs,
                )
            elif self.how in ["sem", "std", "var", "ohlc", "prod", "median"]:
                if self.how in ["std", "sem"]:
                    kwargs["is_datetimelike"] = is_datetimelike
                func(
                    result,
                    counts,
                    values,
                    comp_ids,
                    min_count=min_count,
                    mask=mask,
                    result_mask=result_mask,
                    **kwargs,
                )
            elif self.how in ["any", "all"]:
                func(
                    out=result,
                    values=values,
                    labels=comp_ids,
                    mask=mask,
                    result_mask=result_mask,
                    **kwargs,
                )
                result = result.astype(bool, copy=False)
            elif self.how in ["skew"]:
                func(
                    out=result,
                    counts=counts,
                    values=values,
                    labels=comp_ids,
                    mask=mask,
                    result_mask=result_mask,
                    **kwargs,
                )
                if dtype == object:
                    result = result.astype(object)

            else:
                raise NotImplementedError(f"{self.how} is not implemented")
        else:
            # TODO: min_count
            if self.how != "rank":
                # TODO: should rank take result_mask?
                kwargs["result_mask"] = result_mask
            func(
                out=result,
                values=values,
                labels=comp_ids,
                ngroups=ngroups,
                is_datetimelike=is_datetimelike,
                mask=mask,
                **kwargs,
            )

        if self.kind == "aggregate" and self.how not in ["idxmin", "idxmax"]:
            # i.e. counts is defined. Locations where count<min_count
            # need to have the result set to np.nan, which may require casting,
            # see GH#40767. idxmin/idxmax are handled specially via post-processing
            if result.dtype.kind in "iu" and not is_datetimelike:
                # if the op keeps the int dtypes, we have to use 0
                cutoff = max(0 if self.how in ["sum", "prod"] else 1, min_count)
                empty_groups = counts < cutoff
                if empty_groups.any():
                    if result_mask is not None:
                        assert result_mask[empty_groups].all()
                    else:
                        # Note: this conversion could be lossy, see GH#40767
                        result = result.astype("float64")
                        result[empty_groups] = np.nan

        result = result.T

        if self.how not in self.cast_blocklist:
            # e.g. if we are int64 and need to restore to datetime64/timedelta64
            # "rank" is the only member of cast_blocklist we get here
            # Casting only needed for float16, bool, datetimelike,
            # and self.how in ["sum", "prod", "ohlc", "cumprod"]
            res_dtype = self._get_result_dtype(orig_values.dtype)
            op_result = maybe_downcast_to_dtype(result, res_dtype)
        else:
            op_result = result

        return op_result

    @final
    def _validate_axis(self, axis: AxisInt, values: ArrayLike) -> None:
        if values.ndim > 2:
            raise NotImplementedError("number of dimensions is currently limited to 2")
        if values.ndim == 2:
            assert axis == 1, axis
        elif not is_1d_only_ea_dtype(values.dtype):
            # Note: it is *not* the case that axis is always 0 for 1-dim values,
            # as we can have 1D ExtensionArrays that we need to treat as 2D
            assert axis == 0

    @final
    def cython_operation(
        self,
        *,
        values: ArrayLike,
        axis: AxisInt,
        min_count: int = -1,
        comp_ids: np.ndarray,
        ngroups: int,
        **kwargs,
    ) -> ArrayLike:
        """
        Call our cython function, with appropriate pre- and post-processing.
        """
        self._validate_axis(axis, values)

        if not isinstance(values, np.ndarray):
            # i.e. ExtensionArray
            return values._groupby_op(
                how=self.how,
                has_dropped_na=self.has_dropped_na,
                min_count=min_count,
                ngroups=ngroups,
                ids=comp_ids,
                **kwargs,
            )

        return self._cython_op_ndim_compat(
            values,
            min_count=min_count,
            ngroups=ngroups,
            comp_ids=comp_ids,
            mask=None,
            **kwargs,
        )


class BaseGrouper:
    """
    This is an internal Grouper class, which actually holds
    the generated groups.

    Parameters
    ----------
    axis : Index
    groupings : Sequence[Grouping]
        all the grouping instances to handle in this grouper;
        e.g. when grouping by a list of keys there is one Grouping per key
    sort : bool, default True
        whether this grouper will give a sorted result or not

    """

    axis: Index

    def __init__(
        self,
        axis: Index,
        groupings: Sequence[grouper.Grouping],
        sort: bool = True,
        dropna: bool = True,
    ) -> None:
        assert isinstance(axis, Index), axis

        self.axis = axis
        self._groupings: list[grouper.Grouping] = list(groupings)
        self._sort = sort
        self.dropna = dropna

    @property
    def groupings(self) -> list[grouper.Grouping]:
        return self._groupings

    @property
    def shape(self) -> Shape:
        return tuple(ping.ngroups for ping in self.groupings)

    def __iter__(self) -> Iterator[Hashable]:
        return iter(self.indices)

    @property
    def nkeys(self) -> int:
        return len(self.groupings)

    def get_iterator(
        self, data: NDFrameT, axis: AxisInt = 0
    ) -> Iterator[tuple[Hashable, NDFrameT]]:
        """
        Groupby iterator

        Returns
        -------
        Generator yielding a (name, subsetted object)
        pair for each group
        """
        splitter = self._get_splitter(data, axis=axis)
        keys = self.group_keys_seq
        yield from zip(keys, splitter)

    @final
    def _get_splitter(self, data: NDFrame, axis: AxisInt = 0) -> DataSplitter:
        """
        Returns
        -------
        DataSplitter
            Iterating over this yields the subsetted objects, one per group.
        """
        ids, _, ngroups = self.group_info
        return _get_splitter(
            data,
            ids,
            ngroups,
            sorted_ids=self._sorted_ids,
            sort_idx=self._sort_idx,
            axis=axis,
        )

    @final
    @cache_readonly
    def group_keys_seq(self):
        if len(self.groupings) == 1:
            return self.levels[0]
        else:
            ids, _, ngroups = self.group_info

            # provide "flattened" iterator for multi-group setting
            return get_flattened_list(ids, ngroups, self.levels, self.codes)

    @cache_readonly
    def indices(self) -> dict[Hashable, npt.NDArray[np.intp]]:
        """dict {group name -> group indices}"""
        if len(self.groupings) == 1 and isinstance(
            self.result_index, CategoricalIndex
        ):
            # This shows unused categories in indices GH#38642
            return self.groupings[0].indices
        codes_list = [ping.codes for ping in self.groupings]
        keys = [ping._group_index for ping in self.groupings]
        return get_indexer_dict(codes_list, keys)

    @final
    def result_ilocs(self) -> npt.NDArray[np.intp]:
        """
        Get the original integer locations of result_index in the input.
        """
        # Original indices are where group_index would go via sorting.
        # But when dropna is true, we need to remove null values while accounting for
        # any gaps that then occur because of them.
        group_index = get_group_index(
            self.codes, self.shape, sort=self._sort, xnull=True
        )
        group_index, _ = compress_group_index(group_index, sort=self._sort)

        if self.has_dropped_na:
            mask = np.where(group_index >= 0)
            # Count how many gaps are caused by previous null values for each position
            null_gaps = np.cumsum(group_index == -1)[mask]
            group_index = group_index[mask]

        result = get_group_index_sorter(group_index, self.ngroups)

        if self.has_dropped_na:
            # Shift by the number of prior null gaps
            result += np.take(null_gaps, result)

        return result
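
    # result_ilocs sketch (comments only; hypothetical single grouping): for
    # keys ['b', 'a', 'b'] with sort=True the compressed codes are [1, 0, 1],
    # and the counting sort gives [1, 0, 2]: row 1 ('a') first, then rows 0
    # and 2 ('b').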

    @final
    @property
    def codes(self) -> list[npt.NDArray[np.signedinteger]]:
        return [ping.codes for ping in self.groupings]

    @property
    def levels(self) -> list[Index]:
        return [ping._group_index for ping in self.groupings]

    @property
    def names(self) -> list[Hashable]:
        return [ping.name for ping in self.groupings]

    @final
    def size(self) -> Series:
        """
        Compute group sizes.
        """
        ids, _, ngroups = self.group_info
        out: np.ndarray | list
        if ngroups:
            out = np.bincount(ids[ids != -1], minlength=ngroups)
        else:
            out = []
        return Series(out, index=self.result_index, dtype="int64", copy=False)

    @cache_readonly
    def groups(self) -> dict[Hashable, np.ndarray]:
        """dict {group name -> group labels}"""
        if len(self.groupings) == 1:
            return self.groupings[0].groups
        else:
            to_groupby = []
            for ping in self.groupings:
                gv = ping.grouping_vector
                if not isinstance(gv, BaseGrouper):
                    to_groupby.append(gv)
                else:
                    to_groupby.append(gv.groupings[0].grouping_vector)
            index = MultiIndex.from_arrays(to_groupby)
            return self.axis.groupby(index)

    @final
    @cache_readonly
    def is_monotonic(self) -> bool:
        # return whether my group orderings are monotonic
        return Index(self.group_info[0]).is_monotonic_increasing

    @final
    @cache_readonly
    def has_dropped_na(self) -> bool:
        """
        Whether grouper has null value(s) that are dropped.
        """
        return bool((self.group_info[0] < 0).any())

    @cache_readonly
    def group_info(self) -> tuple[npt.NDArray[np.intp], npt.NDArray[np.intp], int]:
        comp_ids, obs_group_ids = self._get_compressed_codes()

        ngroups = len(obs_group_ids)
        comp_ids = ensure_platform_int(comp_ids)

        return comp_ids, obs_group_ids, ngroups
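
    # group_info sketch (comments only; hypothetical single grouping over
    # ['b', 'a', 'b'] with sort=True):
    #   comp_ids      -> array([1, 0, 1])  group number of each axis item
    #   obs_group_ids -> array([0, 1])     observed group ids, in result order
    #   ngroups       -> 2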

    @cache_readonly
    def codes_info(self) -> npt.NDArray[np.intp]:
        # return the codes of items in original grouped axis
        ids, _, _ = self.group_info
        return ids

    @final
    def _get_compressed_codes(
        self,
    ) -> tuple[npt.NDArray[np.signedinteger], npt.NDArray[np.intp]]:
        # The first returned ndarray may have any signed integer dtype
        if len(self.groupings) > 1:
            group_index = get_group_index(self.codes, self.shape, sort=True, xnull=True)
            return compress_group_index(group_index, sort=self._sort)
            # FIXME: compress_group_index's second return value is int64, not intp

        ping = self.groupings[0]
        return ping.codes, np.arange(len(ping._group_index), dtype=np.intp)

    @final
    @cache_readonly
    def ngroups(self) -> int:
        return len(self.result_index)

    @property
    def reconstructed_codes(self) -> list[npt.NDArray[np.intp]]:
        codes = self.codes
        ids, obs_ids, _ = self.group_info
        return decons_obs_group_ids(ids, obs_ids, self.shape, codes, xnull=True)

    @cache_readonly
    def result_index(self) -> Index:
        if len(self.groupings) == 1:
            return self.groupings[0]._result_index.rename(self.names[0])

        codes = self.reconstructed_codes
        levels = [ping._result_index for ping in self.groupings]
        return MultiIndex(
            levels=levels, codes=codes, verify_integrity=False, names=self.names
        )

    @final
    def get_group_levels(self) -> list[ArrayLike]:
        # Note: only called from _insert_inaxis_grouper, which
        # is only called for BaseGrouper, never for BinGrouper
        if len(self.groupings) == 1:
            return [self.groupings[0]._group_arraylike]

        name_list = []
        for ping, codes in zip(self.groupings, self.reconstructed_codes):
            codes = ensure_platform_int(codes)
            levels = ping._group_arraylike.take(codes)

            name_list.append(levels)

        return name_list

    # ------------------------------------------------------------
    # Aggregation functions

    @final
    def _cython_operation(
        self,
        kind: str,
        values,
        how: str,
        axis: AxisInt,
        min_count: int = -1,
        **kwargs,
    ) -> ArrayLike:
        """
        Returns the values of a cython operation.
        """
        assert kind in ["transform", "aggregate"]

        cy_op = WrappedCythonOp(kind=kind, how=how, has_dropped_na=self.has_dropped_na)

        ids, _, _ = self.group_info
        ngroups = self.ngroups
        return cy_op.cython_operation(
            values=values,
            axis=axis,
            min_count=min_count,
            comp_ids=ids,
            ngroups=ngroups,
            **kwargs,
        )
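
    # End-to-end sketch (comments only; internal API, values hypothetical):
    # for a BaseGrouper over ['a', 'b', 'a'] and 1-D float64 values,
    #   grouper._cython_operation("aggregate", values, "sum", axis=0)
    # dispatches to libgroupby.group_sum with comp_ids [0, 1, 0] and
    # ngroups=2, returning one aggregated value per group.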

    @final
    def agg_series(
        self, obj: Series, func: Callable, preserve_dtype: bool = False
    ) -> ArrayLike:
        """
        Parameters
        ----------
        obj : Series
        func : function taking a Series and returning a scalar-like
        preserve_dtype : bool
            Whether the aggregation is known to be dtype-preserving.

        Returns
        -------
        np.ndarray or ExtensionArray
        """

        if not isinstance(obj._values, np.ndarray):
            # we can preserve a little bit more aggressively with EA dtype
            # because maybe_cast_pointwise_result will do a try/except
            # with _from_sequence. NB we are assuming here that _from_sequence
            # is sufficiently strict that it casts appropriately.
            preserve_dtype = True

        result = self._aggregate_series_pure_python(obj, func)

        npvalues = lib.maybe_convert_objects(result, try_float=False)
        if preserve_dtype:
            out = maybe_cast_pointwise_result(npvalues, obj.dtype, numeric_only=True)
        else:
            out = npvalues
        return out
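
    # agg_series sketch (comments only; values hypothetical): each group is
    # materialized as a Series and passed to ``func``, e.g.
    #   grouper.agg_series(ser, lambda s: s.max() - s.min())
    # collects one scalar per group into an object ndarray, which
    # maybe_convert_objects then tightens to a concrete dtype where possible.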

    @final
    def _aggregate_series_pure_python(
        self, obj: Series, func: Callable
    ) -> npt.NDArray[np.object_]:
        _, _, ngroups = self.group_info

        result = np.empty(ngroups, dtype="O")
        initialized = False

        splitter = self._get_splitter(obj, axis=0)

        for i, group in enumerate(splitter):
            res = func(group)
            res = extract_result(res)

            if not initialized:
                # We only do this validation on the first iteration
                check_result_array(res, group.dtype)
                initialized = True

            result[i] = res

        return result

    @final
    def apply_groupwise(
        self, f: Callable, data: DataFrame | Series, axis: AxisInt = 0
    ) -> tuple[list, bool]:
        mutated = False
        splitter = self._get_splitter(data, axis=axis)
        group_keys = self.group_keys_seq
        result_values = []

        # This calls DataSplitter.__iter__
        zipped = zip(group_keys, splitter)

        for key, group in zipped:
            # Pinning name is needed for
            # test_group_apply_once_per_group,
            # test_inconsistent_return_type, test_set_group_name,
            # test_group_name_available_in_inference_pass,
            # test_groupby_multi_timezone
            object.__setattr__(group, "name", key)

            # group might be modified
            group_axes = group.axes
            res = f(group)
            if not mutated and not _is_indexed_like(res, group_axes, axis):
                mutated = True
            result_values.append(res)
        # getattr pattern for __name__ is needed for functools.partial objects
        if len(group_keys) == 0 and getattr(f, "__name__", None) in [
            "skew",
            "sum",
            "prod",
        ]:
            # If group_keys is empty, then no function calls have been made,
            # so we will not have raised even if this is an invalid dtype.
            # So do one dummy call here to raise appropriate TypeError.
            f(data.iloc[:0])

        return result_values, mutated

    # ------------------------------------------------------------
    # Methods for sorting subsets of our GroupBy's object

    @final
    @cache_readonly
    def _sort_idx(self) -> npt.NDArray[np.intp]:
        # Counting sort indexer
        ids, _, ngroups = self.group_info
        return get_group_index_sorter(ids, ngroups)

    @final
    @cache_readonly
    def _sorted_ids(self) -> npt.NDArray[np.intp]:
        ids, _, _ = self.group_info
        return ids.take(self._sort_idx)


class BinGrouper(BaseGrouper):
    """
    This is an internal Grouper class.

    Parameters
    ----------
    bins : np.ndarray[np.int64]
        the split positions along the grouped axis; bins[i] is the
        (exclusive) end position of the items belonging to binlabels[i]
    binlabels : Index
        the label for each bin
    indexer : np.ndarray[np.intp], optional
        the indexer created by Grouper; some groupers (TimeGrouper) will
        sort their axis, and their group_info is then also sorted, so the
        indexer is needed to restore the original order

    Examples
    --------
    bins: [2, 4, 6, 8, 10]
    binlabels: DatetimeIndex(['2005-01-01', '2005-01-03',
        '2005-01-05', '2005-01-07', '2005-01-09'],
        dtype='datetime64[ns]', freq='2D')

    the resulting group_info, which contains the group number of each item
    in the grouped axis, the index of each label in the label list, and the
    number of groups, is

    (array([0, 0, 1, 1, 2, 2, 3, 3, 4, 4]), array([0, 1, 2, 3, 4]), 5)

    meaning that the grouped axis has 10 items falling into 5 labels; the
    first and second items belong to the first label, the third and fourth
    items belong to the second label, and so on

    """

    bins: npt.NDArray[np.int64]
    binlabels: Index

    def __init__(
        self,
        bins,
        binlabels,
        indexer=None,
    ) -> None:
        self.bins = ensure_int64(bins)
        self.binlabels = ensure_index(binlabels)
        self.indexer = indexer

        # These lengths must match, otherwise we could call agg_series
        # with empty self.bins, which would raise later.
        assert len(self.binlabels) == len(self.bins)

    @cache_readonly
    def groups(self):
        """dict {group name -> group labels}"""
        # this is mainly for compat
        # GH 3881
        result = {
            key: value
            for key, value in zip(self.binlabels, self.bins)
            if key is not NaT
        }
        return result

    @property
    def nkeys(self) -> int:
        # still matches len(self.groupings), but we can hard-code
        return 1

    @cache_readonly
    def codes_info(self) -> npt.NDArray[np.intp]:
        # return the codes of items in original grouped axis
        ids, _, _ = self.group_info
        if self.indexer is not None:
            sorter = np.lexsort((ids, self.indexer))
            ids = ids[sorter]
        return ids

    def get_iterator(self, data: NDFrame, axis: AxisInt = 0):
        """
        Groupby iterator

        Returns
        -------
        Generator yielding a (name, subsetted object)
        pair for each group
        """
        if axis == 0:
            slicer = lambda start, edge: data.iloc[start:edge]
        else:
            slicer = lambda start, edge: data.iloc[:, start:edge]

        length = len(data.axes[axis])

        start = 0
        for edge, label in zip(self.bins, self.binlabels):
            if label is not NaT:
                yield label, slicer(start, edge)
            start = edge

        if start < length:
            yield self.binlabels[-1], slicer(start, None)

    @cache_readonly
    def indices(self):
        indices = collections.defaultdict(list)

        i = 0
        for label, bin in zip(self.binlabels, self.bins):
            if i < bin:
                if label is not NaT:
                    indices[label] = list(range(i, bin))
                i = bin
        return indices
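
    # indices sketch (comments only; hypothetical bins): with bins [2, 4] and
    # binlabels ['x', 'y'], items 0-1 fall in bin 'x' and items 2-3 in bin
    # 'y', giving {'x': [0, 1], 'y': [2, 3]}.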

    @cache_readonly
    def group_info(self) -> tuple[npt.NDArray[np.intp], npt.NDArray[np.intp], int]:
        ngroups = self.ngroups
        obs_group_ids = np.arange(ngroups, dtype=np.intp)
        rep = np.diff(np.r_[0, self.bins])

        rep = ensure_platform_int(rep)
        if ngroups == len(self.bins):
            comp_ids = np.repeat(np.arange(ngroups), rep)
        else:
            comp_ids = np.repeat(np.r_[-1, np.arange(ngroups)], rep)

        return (
            ensure_platform_int(comp_ids),
            obs_group_ids,
            ngroups,
        )
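
    # group_info sketch (comments only; hypothetical bins): bins [2, 4] give
    # rep = diff([0, 2, 4]) = [2, 2], so comp_ids = repeat([0, 1], [2, 2]) =
    # [0, 0, 1, 1], with obs_group_ids [0, 1] and ngroups 2 (the -1 branch is
    # taken only when a leading NaT label was dropped from result_index).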

    @cache_readonly
    def reconstructed_codes(self) -> list[np.ndarray]:
        # get unique result indices, and prepend 0 as groupby starts from the first
        return [np.r_[0, np.flatnonzero(self.bins[1:] != self.bins[:-1]) + 1]]

    @cache_readonly
    def result_index(self) -> Index:
        if len(self.binlabels) != 0 and isna(self.binlabels[0]):
            return self.binlabels[1:]

        return self.binlabels

    @property
    def levels(self) -> list[Index]:
        return [self.binlabels]

    @property
    def names(self) -> list[Hashable]:
        return [self.binlabels.name]

    @property
    def groupings(self) -> list[grouper.Grouping]:
        lev = self.binlabels
        codes = self.group_info[0]
        labels = lev.take(codes)
        ping = grouper.Grouping(
            labels, labels, in_axis=False, level=None, uniques=lev._values
        )
        return [ping]


def _is_indexed_like(obj, axes, axis: AxisInt) -> bool:
    if isinstance(obj, Series):
        if len(axes) > 1:
            return False
        return obj.axes[axis].equals(axes[axis])
    elif isinstance(obj, DataFrame):
        return obj.axes[axis].equals(axes[axis])

    return False


# ----------------------------------------------------------------------
# Splitting / application


class DataSplitter(Generic[NDFrameT]):
    def __init__(
        self,
        data: NDFrameT,
        labels: npt.NDArray[np.intp],
        ngroups: int,
        *,
        sort_idx: npt.NDArray[np.intp],
        sorted_ids: npt.NDArray[np.intp],
        axis: AxisInt = 0,
    ) -> None:
        self.data = data
        self.labels = ensure_platform_int(labels)  # _should_ already be np.intp
        self.ngroups = ngroups

        self._slabels = sorted_ids
        self._sort_idx = sort_idx

        self.axis = axis
        assert isinstance(axis, int), axis

    def __iter__(self) -> Iterator:
        sdata = self._sorted_data

        if self.ngroups == 0:
            # we are inside a generator; rather than raising StopIteration
            # we merely return to signal the end
            return

        starts, ends = lib.generate_slices(self._slabels, self.ngroups)

        for start, end in zip(starts, ends):
            yield self._chop(sdata, slice(start, end))
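
    # Slicing sketch (comments only): with sorted labels [0, 0, 1, 2, 2],
    # lib.generate_slices yields starts [0, 2, 3] and ends [2, 3, 5], so each
    # group is a contiguous slice of the sorted data.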

    @cache_readonly
    def _sorted_data(self) -> NDFrameT:
        return self.data.take(self._sort_idx, axis=self.axis)

    def _chop(self, sdata, slice_obj: slice) -> NDFrame:
        raise AbstractMethodError(self)


class SeriesSplitter(DataSplitter):
    def _chop(self, sdata: Series, slice_obj: slice) -> Series:
        # fastpath equivalent to `sdata.iloc[slice_obj]`
        mgr = sdata._mgr.get_slice(slice_obj)
        ser = sdata._constructor_from_mgr(mgr, axes=mgr.axes)
        ser._name = sdata.name
        return ser.__finalize__(sdata, method="groupby")


class FrameSplitter(DataSplitter):
    def _chop(self, sdata: DataFrame, slice_obj: slice) -> DataFrame:
        # Fastpath equivalent to:
        # if self.axis == 0:
        #     return sdata.iloc[slice_obj]
        # else:
        #     return sdata.iloc[:, slice_obj]
        mgr = sdata._mgr.get_slice(slice_obj, axis=1 - self.axis)
        df = sdata._constructor_from_mgr(mgr, axes=mgr.axes)
        return df.__finalize__(sdata, method="groupby")


def _get_splitter(
    data: NDFrame,
    labels: npt.NDArray[np.intp],
    ngroups: int,
    *,
    sort_idx: npt.NDArray[np.intp],
    sorted_ids: npt.NDArray[np.intp],
    axis: AxisInt = 0,
) -> DataSplitter:
    if isinstance(data, Series):
        klass: type[DataSplitter] = SeriesSplitter
    else:
        # i.e. DataFrame
        klass = FrameSplitter

    return klass(
        data, labels, ngroups, sort_idx=sort_idx, sorted_ids=sorted_ids, axis=axis
    )