from __future__ import annotations

import abc
from collections import defaultdict
import functools
from functools import partial
import inspect
from typing import (
    TYPE_CHECKING,
    Any,
    Callable,
    Literal,
    cast,
)
import warnings

import numpy as np

from pandas._config import option_context

from pandas._libs import lib
from pandas._libs.internals import BlockValuesRefs
from pandas._typing import (
    AggFuncType,
    AggFuncTypeBase,
    AggFuncTypeDict,
    AggObjType,
    Axis,
    AxisInt,
    NDFrameT,
    npt,
)
from pandas.compat._optional import import_optional_dependency
from pandas.errors import SpecificationError
from pandas.util._decorators import cache_readonly
from pandas.util._exceptions import find_stack_level

from pandas.core.dtypes.cast import is_nested_object
from pandas.core.dtypes.common import (
    is_dict_like,
    is_extension_array_dtype,
    is_list_like,
    is_numeric_dtype,
    is_sequence,
)
from pandas.core.dtypes.dtypes import (
    CategoricalDtype,
    ExtensionDtype,
)
from pandas.core.dtypes.generic import (
    ABCDataFrame,
    ABCNDFrame,
    ABCSeries,
)

from pandas.core._numba.executor import generate_apply_looper
import pandas.core.common as com
from pandas.core.construction import ensure_wrapped_if_datetimelike

if TYPE_CHECKING:
    from collections.abc import (
        Generator,
        Hashable,
        Iterable,
        MutableMapping,
        Sequence,
    )

    from pandas import (
        DataFrame,
        Index,
        Series,
    )
    from pandas.core.groupby import GroupBy
    from pandas.core.resample import Resampler
    from pandas.core.window.rolling import BaseWindow


ResType = dict[int, Any]


def frame_apply(
    obj: DataFrame,
    func: AggFuncType,
    axis: Axis = 0,
    raw: bool = False,
    result_type: str | None = None,
    by_row: Literal[False, "compat"] = "compat",
    engine: str = "python",
    engine_kwargs: dict[str, bool] | None = None,
    args=None,
    kwargs=None,
) -> FrameApply:
    """construct and return a row or column based frame apply object"""
    axis = obj._get_axis_number(axis)
    klass: type[FrameApply]
    if axis == 0:
        klass = FrameRowApply
    elif axis == 1:
        klass = FrameColumnApply

    _, func, _, _ = reconstruct_func(func, **kwargs)
    assert func is not None

    return klass(
        obj,
        func,
        raw=raw,
        result_type=result_type,
        by_row=by_row,
        engine=engine,
        engine_kwargs=engine_kwargs,
        args=args,
        kwargs=kwargs,
    )


class Apply(metaclass=abc.ABCMeta):
    axis: AxisInt

    def __init__(
        self,
        obj: AggObjType,
        func: AggFuncType,
        raw: bool,
        result_type: str | None,
        *,
        by_row: Literal[False, "compat", "_compat"] = "compat",
        engine: str = "python",
        engine_kwargs: dict[str, bool] | None = None,
        args,
        kwargs,
    ) -> None:
        self.obj = obj
        self.raw = raw

        assert by_row is False or by_row in ["compat", "_compat"]
        self.by_row = by_row

        self.args = args or ()
        self.kwargs = kwargs or {}

        self.engine = engine
        self.engine_kwargs = {} if engine_kwargs is None else engine_kwargs

        if result_type not in [None, "reduce", "broadcast", "expand"]:
            raise ValueError(
                "invalid value for result_type, must be one "
                "of {None, 'reduce', 'broadcast', 'expand'}"
            )

        self.result_type = result_type

        self.func = func

    @abc.abstractmethod
    def apply(self) -> DataFrame | Series:
        pass

    @abc.abstractmethod
    def agg_or_apply_list_like(
        self, op_name: Literal["agg", "apply"]
    ) -> DataFrame | Series:
        pass

    @abc.abstractmethod
    def agg_or_apply_dict_like(
        self, op_name: Literal["agg", "apply"]
    ) -> DataFrame | Series:
        pass

    def agg(self) -> DataFrame | Series | None:
        """
        Provide an implementation for the aggregators.

        Returns
        -------
        Result of aggregation, or None if agg cannot be performed by
        this method.
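
        Examples
        --------
        A sketch via the public entry point (assuming a small Series; the
        string is dispatched to the matching method):

        >>> pd.Series([1, 2]).agg("sum")
        3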
180 """
181 obj = self.obj
182 func = self.func
183 args = self.args
184 kwargs = self.kwargs
185
186 if isinstance(func, str):
187 return self.apply_str()
188
189 if is_dict_like(func):
190 return self.agg_dict_like()
191 elif is_list_like(func):
192 # we require a list, but not a 'str'
193 return self.agg_list_like()
194
195 if callable(func):
196 f = com.get_cython_func(func)
197 if f and not args and not kwargs:
198 warn_alias_replacement(obj, func, f)
199 return getattr(obj, f)()
200
201 # caller can react
202 return None
203
204 def transform(self) -> DataFrame | Series:
205 """
206 Transform a DataFrame or Series.
207
208 Returns
209 -------
210 DataFrame or Series
211 Result of applying ``func`` along the given axis of the
212 Series or DataFrame.
213
214 Raises
215 ------
216 ValueError
217 If the transform function fails or does not transform.
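
        Examples
        --------
        A minimal sketch via the public API, assuming a one-column frame;
        the result keeps the original shape and index:

        >>> pd.DataFrame({"a": [1, 2]}).transform(lambda x: x + 1)
           a
        0  2
        1  3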
218 """
219 obj = self.obj
220 func = self.func
221 axis = self.axis
222 args = self.args
223 kwargs = self.kwargs
224
225 is_series = obj.ndim == 1
226
227 if obj._get_axis_number(axis) == 1:
228 assert not is_series
229 return obj.T.transform(func, 0, *args, **kwargs).T
230
231 if is_list_like(func) and not is_dict_like(func):
232 func = cast(list[AggFuncTypeBase], func)
233 # Convert func equivalent dict
234 if is_series:
235 func = {com.get_callable_name(v) or v: v for v in func}
236 else:
237 func = {col: func for col in obj}
238
239 if is_dict_like(func):
240 func = cast(AggFuncTypeDict, func)
241 return self.transform_dict_like(func)
242
243 # func is either str or callable
244 func = cast(AggFuncTypeBase, func)
245 try:
246 result = self.transform_str_or_callable(func)
247 except TypeError:
248 raise
249 except Exception as err:
250 raise ValueError("Transform function failed") from err
251
252 # Functions that transform may return empty Series/DataFrame
253 # when the dtype is not appropriate
254 if (
255 isinstance(result, (ABCSeries, ABCDataFrame))
256 and result.empty
257 and not obj.empty
258 ):
259 raise ValueError("Transform function failed")
260 # error: Argument 1 to "__get__" of "AxisProperty" has incompatible type
261 # "Union[Series, DataFrame, GroupBy[Any], SeriesGroupBy,
262 # DataFrameGroupBy, BaseWindow, Resampler]"; expected "Union[DataFrame,
263 # Series]"
264 if not isinstance(result, (ABCSeries, ABCDataFrame)) or not result.index.equals(
265 obj.index # type: ignore[arg-type]
266 ):
267 raise ValueError("Function did not transform")
268
269 return result
270
271 def transform_dict_like(self, func) -> DataFrame:
272 """
273 Compute transform in the case of a dict-like func
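
        Examples
        --------
        A sketch via the public API, assuming the dict maps column names to
        transforms:

        >>> pd.DataFrame({"a": [-1, 2]}).transform({"a": "abs"})
           a
        0  1
        1  2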
274 """
275 from pandas.core.reshape.concat import concat
276
277 obj = self.obj
278 args = self.args
279 kwargs = self.kwargs
280
281 # transform is currently only for Series/DataFrame
282 assert isinstance(obj, ABCNDFrame)
283
284 if len(func) == 0:
285 raise ValueError("No transform functions were provided")
286
287 func = self.normalize_dictlike_arg("transform", obj, func)
288
289 results: dict[Hashable, DataFrame | Series] = {}
290 for name, how in func.items():
291 colg = obj._gotitem(name, ndim=1)
292 results[name] = colg.transform(how, 0, *args, **kwargs)
293 return concat(results, axis=1)
294
295 def transform_str_or_callable(self, func) -> DataFrame | Series:
296 """
297 Compute transform in the case of a string or callable func
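
        Examples
        --------
        A sketch, assuming a numeric Series; the string resolves to a method
        on the object or to a numpy function:

        >>> pd.Series([1, 4]).transform("sqrt")
        0    1.0
        1    2.0
        dtype: float64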
298 """
299 obj = self.obj
300 args = self.args
301 kwargs = self.kwargs
302
303 if isinstance(func, str):
304 return self._apply_str(obj, func, *args, **kwargs)
305
306 if not args and not kwargs:
307 f = com.get_cython_func(func)
308 if f:
309 warn_alias_replacement(obj, func, f)
310 return getattr(obj, f)()
311
312 # Two possible ways to use a UDF - apply or call directly
313 try:
314 return obj.apply(func, args=args, **kwargs)
315 except Exception:
316 return func(obj, *args, **kwargs)
317
318 def agg_list_like(self) -> DataFrame | Series:
319 """
320 Compute aggregation in the case of a list-like argument.
321
322 Returns
323 -------
324 Result of aggregation.
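
        Examples
        --------
        A sketch via the public API; each function name becomes a label in
        the result:

        >>> pd.Series([1, 2, 3]).agg(["sum", "mean"])
        sum     6.0
        mean    2.0
        dtype: float64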
325 """
326 return self.agg_or_apply_list_like(op_name="agg")
327
328 def compute_list_like(
329 self,
330 op_name: Literal["agg", "apply"],
331 selected_obj: Series | DataFrame,
332 kwargs: dict[str, Any],
333 ) -> tuple[list[Hashable] | Index, list[Any]]:
334 """
335 Compute agg/apply results for like-like input.
336
337 Parameters
338 ----------
339 op_name : {"agg", "apply"}
340 Operation being performed.
341 selected_obj : Series or DataFrame
342 Data to perform operation on.
343 kwargs : dict
344 Keyword arguments to pass to the functions.
345
346 Returns
347 -------
348 keys : list[Hashable] or Index
349 Index labels for result.
350 results : list
351 Data for result. When aggregating with a Series, this can contain any
352 Python objects.
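
        Examples
        --------
        Roughly, for a list-like ``func`` the public call below lands here
        and produces ``keys == ["min", "max"]`` with one reduced value per
        function (a sketch, not the exact internals):

        >>> pd.Series([1, 2, 3]).agg(["min", "max"])
        min    1
        max    3
        dtype: int64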
353 """
354 func = cast(list[AggFuncTypeBase], self.func)
355 obj = self.obj
356
357 results = []
358 keys = []
359
360 # degenerate case
361 if selected_obj.ndim == 1:
362 for a in func:
363 colg = obj._gotitem(selected_obj.name, ndim=1, subset=selected_obj)
364 args = (
365 [self.axis, *self.args]
366 if include_axis(op_name, colg)
367 else self.args
368 )
369 new_res = getattr(colg, op_name)(a, *args, **kwargs)
370 results.append(new_res)
371
372 # make sure we find a good name
373 name = com.get_callable_name(a) or a
374 keys.append(name)
375
376 else:
377 indices = []
378 for index, col in enumerate(selected_obj):
379 colg = obj._gotitem(col, ndim=1, subset=selected_obj.iloc[:, index])
380 args = (
381 [self.axis, *self.args]
382 if include_axis(op_name, colg)
383 else self.args
384 )
385 new_res = getattr(colg, op_name)(func, *args, **kwargs)
386 results.append(new_res)
387 indices.append(index)
388 # error: Incompatible types in assignment (expression has type "Any |
389 # Index", variable has type "list[Any | Callable[..., Any] | str]")
390 keys = selected_obj.columns.take(indices) # type: ignore[assignment]
391
392 return keys, results
393
394 def wrap_results_list_like(
395 self, keys: Iterable[Hashable], results: list[Series | DataFrame]
396 ):
397 from pandas.core.reshape.concat import concat
398
399 obj = self.obj
400
401 try:
402 return concat(results, keys=keys, axis=1, sort=False)
403 except TypeError as err:
404 # we are concatting non-NDFrame objects,
405 # e.g. a list of scalars
406 from pandas import Series
407
408 result = Series(results, index=keys, name=obj.name)
409 if is_nested_object(result):
410 raise ValueError(
411 "cannot combine transform and aggregation operations"
412 ) from err
413 return result
414
415 def agg_dict_like(self) -> DataFrame | Series:
416 """
417 Compute aggregation in the case of a dict-like argument.
418
419 Returns
420 -------
421 Result of aggregation.
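
        Examples
        --------
        A sketch via the public API, assuming a two-column frame; keys select
        columns and label the result:

        >>> pd.DataFrame({"a": [1, 2], "b": [3, 4]}).agg({"a": "min", "b": "max"})
        a    1
        b    4
        dtype: int64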
422 """
423 return self.agg_or_apply_dict_like(op_name="agg")
424
425 def compute_dict_like(
426 self,
427 op_name: Literal["agg", "apply"],
428 selected_obj: Series | DataFrame,
429 selection: Hashable | Sequence[Hashable],
430 kwargs: dict[str, Any],
431 ) -> tuple[list[Hashable], list[Any]]:
432 """
433 Compute agg/apply results for dict-like input.
434
435 Parameters
436 ----------
437 op_name : {"agg", "apply"}
438 Operation being performed.
439 selected_obj : Series or DataFrame
440 Data to perform operation on.
441 selection : hashable or sequence of hashables
442 Used by GroupBy, Window, and Resample if selection is applied to the object.
443 kwargs : dict
444 Keyword arguments to pass to the functions.
445
446 Returns
447 -------
448 keys : list[hashable]
449 Index labels for result.
450 results : list
451 Data for result. When aggregating with a Series, this can contain any
452 Python object.
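
        Examples
        --------
        A sketch via the public API, assuming a Series; dict keys become the
        output labels and values are the functions applied:

        >>> pd.Series([1, 2]).agg({"lo": "min", "hi": "max"})
        lo    1
        hi    2
        dtype: int64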
453 """
454 from pandas.core.groupby.generic import (
455 DataFrameGroupBy,
456 SeriesGroupBy,
457 )
458
459 obj = self.obj
460 is_groupby = isinstance(obj, (DataFrameGroupBy, SeriesGroupBy))
461 func = cast(AggFuncTypeDict, self.func)
462 func = self.normalize_dictlike_arg(op_name, selected_obj, func)
463
464 is_non_unique_col = (
465 selected_obj.ndim == 2
466 and selected_obj.columns.nunique() < len(selected_obj.columns)
467 )
468
469 if selected_obj.ndim == 1:
470 # key only used for output
471 colg = obj._gotitem(selection, ndim=1)
472 results = [getattr(colg, op_name)(how, **kwargs) for _, how in func.items()]
473 keys = list(func.keys())
474 elif not is_groupby and is_non_unique_col:
475 # key used for column selection and output
476 # GH#51099
477 results = []
478 keys = []
479 for key, how in func.items():
480 indices = selected_obj.columns.get_indexer_for([key])
481 labels = selected_obj.columns.take(indices)
482 label_to_indices = defaultdict(list)
483 for index, label in zip(indices, labels):
484 label_to_indices[label].append(index)
485
486 key_data = [
487 getattr(selected_obj._ixs(indice, axis=1), op_name)(how, **kwargs)
488 for label, indices in label_to_indices.items()
489 for indice in indices
490 ]
491
492 keys += [key] * len(key_data)
493 results += key_data
494 else:
495 # key used for column selection and output
496 results = [
497 getattr(obj._gotitem(key, ndim=1), op_name)(how, **kwargs)
498 for key, how in func.items()
499 ]
500 keys = list(func.keys())
501
502 return keys, results

    def wrap_results_dict_like(
        self,
        selected_obj: Series | DataFrame,
        result_index: list[Hashable],
        result_data: list,
    ):
        from pandas import Index
        from pandas.core.reshape.concat import concat

        obj = self.obj

        # Avoid making two isinstance calls in all and any below
        is_ndframe = [isinstance(r, ABCNDFrame) for r in result_data]

        if all(is_ndframe):
            results = dict(zip(result_index, result_data))
            keys_to_use: Iterable[Hashable]
            keys_to_use = [k for k in result_index if not results[k].empty]
            # Have to check if at least one DataFrame is not empty.
            keys_to_use = keys_to_use if keys_to_use != [] else result_index
            if selected_obj.ndim == 2:
                # keys are columns, so we can preserve names
                ktu = Index(keys_to_use)
                ktu._set_names(selected_obj.columns.names)
                keys_to_use = ktu

            axis: AxisInt = 0 if isinstance(obj, ABCSeries) else 1
            result = concat(
                {k: results[k] for k in keys_to_use},
                axis=axis,
                keys=keys_to_use,
            )
        elif any(is_ndframe):
            # There is a mix of NDFrames and scalars
            raise ValueError(
                "cannot perform both aggregation "
                "and transformation operations "
                "simultaneously"
            )
        else:
            from pandas import Series

            # we have a list of scalars
            # GH 36212 use name only if obj is a series
            if obj.ndim == 1:
                obj = cast("Series", obj)
                name = obj.name
            else:
                name = None

            result = Series(result_data, index=result_index, name=name)

        return result

    def apply_str(self) -> DataFrame | Series:
        """
        Compute apply in case of a string.

        Returns
        -------
        result: Series or DataFrame
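
        Examples
        --------
        A sketch via the public API, assuming a plain frame; the string is
        resolved to a method on the object:

        >>> pd.DataFrame({"a": [1, 2]}).apply("sum")
        a    3
        dtype: int64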
565 """
566 # Caller is responsible for checking isinstance(self.f, str)
567 func = cast(str, self.func)
568
569 obj = self.obj
570
571 from pandas.core.groupby.generic import (
572 DataFrameGroupBy,
573 SeriesGroupBy,
574 )
575
576 # Support for `frame.transform('method')`
577 # Some methods (shift, etc.) require the axis argument, others
578 # don't, so inspect and insert if necessary.
579 method = getattr(obj, func, None)
580 if callable(method):
581 sig = inspect.getfullargspec(method)
582 arg_names = (*sig.args, *sig.kwonlyargs)
583 if self.axis != 0 and (
584 "axis" not in arg_names or func in ("corrwith", "skew")
585 ):
586 raise ValueError(f"Operation {func} does not support axis=1")
587 if "axis" in arg_names:
588 if isinstance(obj, (SeriesGroupBy, DataFrameGroupBy)):
589 # Try to avoid FutureWarning for deprecated axis keyword;
590 # If self.axis matches the axis we would get by not passing
591 # axis, we safely exclude the keyword.
592
593 default_axis = 0
594 if func in ["idxmax", "idxmin"]:
595 # DataFrameGroupBy.idxmax, idxmin axis defaults to self.axis,
596 # whereas other axis keywords default to 0
597 default_axis = self.obj.axis
598
599 if default_axis != self.axis:
600 self.kwargs["axis"] = self.axis
601 else:
602 self.kwargs["axis"] = self.axis
603 return self._apply_str(obj, func, *self.args, **self.kwargs)
604
605 def apply_list_or_dict_like(self) -> DataFrame | Series:
606 """
607 Compute apply in case of a list-like or dict-like.
608
609 Returns
610 -------
611 result: Series, DataFrame, or None
612 Result when self.func is a list-like or dict-like, None otherwise.
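
        Examples
        --------
        A sketch via the public API, assuming a Series and a list of names:

        >>> pd.Series([1, 2]).apply(["min", "max"])
        min    1
        max    2
        dtype: int64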
613 """
614
615 if self.engine == "numba":
616 raise NotImplementedError(
617 "The 'numba' engine doesn't support list-like/"
618 "dict likes of callables yet."
619 )
620
621 if self.axis == 1 and isinstance(self.obj, ABCDataFrame):
622 return self.obj.T.apply(self.func, 0, args=self.args, **self.kwargs).T
623
624 func = self.func
625 kwargs = self.kwargs
626
627 if is_dict_like(func):
628 result = self.agg_or_apply_dict_like(op_name="apply")
629 else:
630 result = self.agg_or_apply_list_like(op_name="apply")
631
632 result = reconstruct_and_relabel_result(result, func, **kwargs)
633
634 return result
635
636 def normalize_dictlike_arg(
637 self, how: str, obj: DataFrame | Series, func: AggFuncTypeDict
638 ) -> AggFuncTypeDict:
639 """
640 Handler for dict-like argument.
641
642 Ensures that necessary columns exist if obj is a DataFrame, and
643 that a nested renamer is not passed. Also normalizes to all lists
644 when values consists of a mix of list and non-lists.
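
        For example (a sketch), ``{"A": "mean", "B": ["min", "max"]}`` is
        normalized to ``{"A": ["mean"], "B": ["min", "max"]}``.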
645 """
646 assert how in ("apply", "agg", "transform")
647
648 # Can't use func.values(); wouldn't work for a Series
649 if (
650 how == "agg"
651 and isinstance(obj, ABCSeries)
652 and any(is_list_like(v) for _, v in func.items())
653 ) or (any(is_dict_like(v) for _, v in func.items())):
654 # GH 15931 - deprecation of renaming keys
655 raise SpecificationError("nested renamer is not supported")
656
657 if obj.ndim != 1:
658 # Check for missing columns on a frame
659 from pandas import Index
660
661 cols = Index(list(func.keys())).difference(obj.columns, sort=True)
662 if len(cols) > 0:
663 raise KeyError(f"Column(s) {list(cols)} do not exist")
664
665 aggregator_types = (list, tuple, dict)
666
667 # if we have a dict of any non-scalars
668 # eg. {'A' : ['mean']}, normalize all to
669 # be list-likes
670 # Cannot use func.values() because arg may be a Series
671 if any(isinstance(x, aggregator_types) for _, x in func.items()):
672 new_func: AggFuncTypeDict = {}
673 for k, v in func.items():
674 if not isinstance(v, aggregator_types):
675 new_func[k] = [v]
676 else:
677 new_func[k] = v
678 func = new_func
679 return func
680
681 def _apply_str(self, obj, func: str, *args, **kwargs):
682 """
683 if arg is a string, then try to operate on it:
684 - try to find a function (or attribute) on obj
685 - try to find a numpy function
686 - raise
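
        Examples
        --------
        A sketch of the lookup order via public calls (assuming simple
        objects; this helper is normally reached through e.g. ``agg``):

        >>> pd.DataFrame({"a": [1.0, 2.0]}).agg("mean")  # found on obj
        a    1.5
        dtype: float64
        >>> pd.Series([1.0, 4.0]).agg("sqrt")  # falls back to np.sqrt
        0    1.0
        1    2.0
        dtype: float64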
687 """
688 assert isinstance(func, str)
689
690 if hasattr(obj, func):
691 f = getattr(obj, func)
692 if callable(f):
693 return f(*args, **kwargs)
694
695 # people may aggregate on a non-callable attribute
696 # but don't let them think they can pass args to it
697 assert len(args) == 0
698 assert len([kwarg for kwarg in kwargs if kwarg not in ["axis"]]) == 0
699 return f
700 elif hasattr(np, func) and hasattr(obj, "__array__"):
701 # in particular exclude Window
702 f = getattr(np, func)
703 return f(obj, *args, **kwargs)
704 else:
705 msg = f"'{func}' is not a valid function for '{type(obj).__name__}' object"
706 raise AttributeError(msg)
707
708
709class NDFrameApply(Apply):
710 """
711 Methods shared by FrameApply and SeriesApply but
712 not GroupByApply or ResamplerWindowApply
713 """
714
715 obj: DataFrame | Series
716
717 @property
718 def index(self) -> Index:
719 return self.obj.index
720
721 @property
722 def agg_axis(self) -> Index:
723 return self.obj._get_agg_axis(self.axis)
724
725 def agg_or_apply_list_like(
726 self, op_name: Literal["agg", "apply"]
727 ) -> DataFrame | Series:
728 obj = self.obj
729 kwargs = self.kwargs
730
731 if op_name == "apply":
732 if isinstance(self, FrameApply):
733 by_row = self.by_row
734
735 elif isinstance(self, SeriesApply):
736 by_row = "_compat" if self.by_row else False
737 else:
738 by_row = False
739 kwargs = {**kwargs, "by_row": by_row}
740
741 if getattr(obj, "axis", 0) == 1:
742 raise NotImplementedError("axis other than 0 is not supported")
743
744 keys, results = self.compute_list_like(op_name, obj, kwargs)
745 result = self.wrap_results_list_like(keys, results)
746 return result
747
748 def agg_or_apply_dict_like(
749 self, op_name: Literal["agg", "apply"]
750 ) -> DataFrame | Series:
751 assert op_name in ["agg", "apply"]
752 obj = self.obj
753
754 kwargs = {}
755 if op_name == "apply":
756 by_row = "_compat" if self.by_row else False
757 kwargs.update({"by_row": by_row})
758
759 if getattr(obj, "axis", 0) == 1:
760 raise NotImplementedError("axis other than 0 is not supported")
761
762 selection = None
763 result_index, result_data = self.compute_dict_like(
764 op_name, obj, selection, kwargs
765 )
766 result = self.wrap_results_dict_like(obj, result_index, result_data)
767 return result
768
769
770class FrameApply(NDFrameApply):
771 obj: DataFrame
772
773 def __init__(
774 self,
775 obj: AggObjType,
776 func: AggFuncType,
777 raw: bool,
778 result_type: str | None,
779 *,
780 by_row: Literal[False, "compat"] = False,
781 engine: str = "python",
782 engine_kwargs: dict[str, bool] | None = None,
783 args,
784 kwargs,
785 ) -> None:
786 if by_row is not False and by_row != "compat":
787 raise ValueError(f"by_row={by_row} not allowed")
788 super().__init__(
789 obj,
790 func,
791 raw,
792 result_type,
793 by_row=by_row,
794 engine=engine,
795 engine_kwargs=engine_kwargs,
796 args=args,
797 kwargs=kwargs,
798 )
799
800 # ---------------------------------------------------------------
801 # Abstract Methods
802
803 @property
804 @abc.abstractmethod
805 def result_index(self) -> Index:
806 pass
807
808 @property
809 @abc.abstractmethod
810 def result_columns(self) -> Index:
811 pass
812
813 @property
814 @abc.abstractmethod
815 def series_generator(self) -> Generator[Series, None, None]:
816 pass
817
818 @staticmethod
819 @functools.cache
820 @abc.abstractmethod
821 def generate_numba_apply_func(
822 func, nogil=True, nopython=True, parallel=False
823 ) -> Callable[[npt.NDArray, Index, Index], dict[int, Any]]:
824 pass
825
826 @abc.abstractmethod
827 def apply_with_numba(self):
828 pass

    def validate_values_for_numba(self):
        # Validate that column dtypes are all OK
        for colname, dtype in self.obj.dtypes.items():
            if not is_numeric_dtype(dtype):
                raise ValueError(
                    f"Column {colname} must have a numeric dtype. "
                    f"Found '{dtype}' instead"
                )
            if is_extension_array_dtype(dtype):
                raise ValueError(
                    f"Column {colname} is backed by an extension array, "
                    f"which is not supported by the numba engine."
                )

    @abc.abstractmethod
    def wrap_results_for_axis(
        self, results: ResType, res_index: Index
    ) -> DataFrame | Series:
        pass

    # ---------------------------------------------------------------

    @property
    def res_columns(self) -> Index:
        return self.result_columns

    @property
    def columns(self) -> Index:
        return self.obj.columns

    @cache_readonly
    def values(self):
        return self.obj.values

    def apply(self) -> DataFrame | Series:
        """compute the results"""

        # dispatch to handle list-like or dict-like
        if is_list_like(self.func):
            if self.engine == "numba":
                raise NotImplementedError(
                    "the 'numba' engine doesn't support lists of callables yet"
                )
            return self.apply_list_or_dict_like()

        # all empty
        if len(self.columns) == 0 and len(self.index) == 0:
            return self.apply_empty_result()

        # string dispatch
        if isinstance(self.func, str):
            if self.engine == "numba":
                raise NotImplementedError(
                    "the 'numba' engine doesn't support using "
                    "a string as the callable function"
                )
            return self.apply_str()

        # ufunc
        elif isinstance(self.func, np.ufunc):
            if self.engine == "numba":
                raise NotImplementedError(
                    "the 'numba' engine doesn't support "
                    "using a numpy ufunc as the callable function"
                )
            with np.errstate(all="ignore"):
                results = self.obj._mgr.apply("apply", func=self.func)
            # _constructor will retain self.index and self.columns
            return self.obj._constructor_from_mgr(results, axes=results.axes)

        # broadcasting
        if self.result_type == "broadcast":
            if self.engine == "numba":
                raise NotImplementedError(
                    "the 'numba' engine doesn't support result_type='broadcast'"
                )
            return self.apply_broadcast(self.obj)

        # one axis empty
        elif not all(self.obj.shape):
            return self.apply_empty_result()

        # raw
        elif self.raw:
            return self.apply_raw(engine=self.engine, engine_kwargs=self.engine_kwargs)

        return self.apply_standard()

    def agg(self):
        obj = self.obj
        axis = self.axis

        # TODO: Avoid having to change state
        self.obj = self.obj if self.axis == 0 else self.obj.T
        self.axis = 0

        result = None
        try:
            result = super().agg()
        finally:
            self.obj = obj
            self.axis = axis

        if axis == 1:
            result = result.T if result is not None else result

        if result is None:
            result = self.obj.apply(self.func, axis, args=self.args, **self.kwargs)

        return result

    def apply_empty_result(self):
        """
        we have an empty result; at least 1 axis is 0

        we will try to apply the function to an empty
        series in order to see if this is a reduction function
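
        Examples
        --------
        A sketch, assuming a frame with a column but no rows; a reducing
        function yields a Series of the reduced (empty-input) values:

        >>> pd.DataFrame(columns=["a"]).apply(lambda x: x.sum())
        a    0.0
        dtype: float64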
947 """
948 assert callable(self.func)
949
950 # we are not asked to reduce or infer reduction
951 # so just return a copy of the existing object
952 if self.result_type not in ["reduce", None]:
953 return self.obj.copy()
954
955 # we may need to infer
956 should_reduce = self.result_type == "reduce"
957
958 from pandas import Series
959
960 if not should_reduce:
961 try:
962 if self.axis == 0:
963 r = self.func(
964 Series([], dtype=np.float64), *self.args, **self.kwargs
965 )
966 else:
967 r = self.func(
968 Series(index=self.columns, dtype=np.float64),
969 *self.args,
970 **self.kwargs,
971 )
972 except Exception:
973 pass
974 else:
975 should_reduce = not isinstance(r, Series)
976
977 if should_reduce:
978 if len(self.agg_axis):
979 r = self.func(Series([], dtype=np.float64), *self.args, **self.kwargs)
980 else:
981 r = np.nan
982
983 return self.obj._constructor_sliced(r, index=self.agg_axis)
984 else:
985 return self.obj.copy()
986
987 def apply_raw(self, engine="python", engine_kwargs=None):
988 """apply to the values as a numpy array"""
989
990 def wrap_function(func):
991 """
992 Wrap user supplied function to work around numpy issue.
993
994 see https://github.com/numpy/numpy/issues/8352
995 """
996
997 def wrapper(*args, **kwargs):
998 result = func(*args, **kwargs)
999 if isinstance(result, str):
1000 result = np.array(result, dtype=object)
1001 return result
1002
1003 return wrapper
1004
1005 if engine == "numba":
1006 engine_kwargs = {} if engine_kwargs is None else engine_kwargs
1007
1008 # error: Argument 1 to "__call__" of "_lru_cache_wrapper" has
1009 # incompatible type "Callable[..., Any] | str | list[Callable
1010 # [..., Any] | str] | dict[Hashable,Callable[..., Any] | str |
1011 # list[Callable[..., Any] | str]]"; expected "Hashable"
1012 nb_looper = generate_apply_looper(
1013 self.func, **engine_kwargs # type: ignore[arg-type]
1014 )
1015 result = nb_looper(self.values, self.axis)
1016 # If we made the result 2-D, squeeze it back to 1-D
1017 result = np.squeeze(result)
1018 else:
1019 result = np.apply_along_axis(
1020 wrap_function(self.func),
1021 self.axis,
1022 self.values,
1023 *self.args,
1024 **self.kwargs,
1025 )
1026
1027 # TODO: mixed type case
1028 if result.ndim == 2:
1029 return self.obj._constructor(result, index=self.index, columns=self.columns)
1030 else:
1031 return self.obj._constructor_sliced(result, index=self.agg_axis)

    def apply_broadcast(self, target: DataFrame) -> DataFrame:
        assert callable(self.func)

        result_values = np.empty_like(target.values)

        # length each 1-D result must have along the broadcast axis
        # (the number of rows in ``target``)
        result_compare = target.shape[0]

        for i, col in enumerate(target.columns):
            res = self.func(target[col], *self.args, **self.kwargs)
            ares = np.asarray(res).ndim

            # must be a scalar or 1d
            if ares > 1:
                raise ValueError("too many dims to broadcast")
            if ares == 1:
                # must match return dim
                if result_compare != len(res):
                    raise ValueError("cannot broadcast result")

            result_values[:, i] = res

        # we *always* preserve the original index / columns
        result = self.obj._constructor(
            result_values, index=target.index, columns=target.columns
        )
        return result

    def apply_standard(self):
        if self.engine == "python":
            results, res_index = self.apply_series_generator()
        else:
            results, res_index = self.apply_series_numba()

        # wrap results
        return self.wrap_results(results, res_index)

    def apply_series_generator(self) -> tuple[ResType, Index]:
        assert callable(self.func)

        series_gen = self.series_generator
        res_index = self.result_index

        results = {}

        with option_context("mode.chained_assignment", None):
            for i, v in enumerate(series_gen):
                # ignore SettingWithCopy here in case the user mutates
                results[i] = self.func(v, *self.args, **self.kwargs)
                if isinstance(results[i], ABCSeries):
                    # If we have a view on v, we need to make a copy because
                    # series_generator will swap out the underlying data
                    results[i] = results[i].copy(deep=False)

        return results, res_index

    def apply_series_numba(self):
        if self.engine_kwargs.get("parallel", False):
            raise NotImplementedError(
                "Parallel apply is not supported when raw=False and engine='numba'"
            )
        if not self.obj.index.is_unique or not self.columns.is_unique:
            raise NotImplementedError(
                "The index/columns must be unique when raw=False and engine='numba'"
            )
        self.validate_values_for_numba()
        results = self.apply_with_numba()
        return results, self.result_index

    def wrap_results(self, results: ResType, res_index: Index) -> DataFrame | Series:
        from pandas import Series

        # see if we can infer the results
        if len(results) > 0 and 0 in results and is_sequence(results[0]):
            return self.wrap_results_for_axis(results, res_index)

        # dict of scalars

        # the default dtype of an empty Series is `object`, but this
        # code can be hit by df.mean() where the result should have dtype
        # float64 even if it's an empty Series.
        constructor_sliced = self.obj._constructor_sliced
        if len(results) == 0 and constructor_sliced is Series:
            result = constructor_sliced(results, dtype=np.float64)
        else:
            result = constructor_sliced(results)
        result.index = res_index

        return result

    def apply_str(self) -> DataFrame | Series:
        # Caller is responsible for checking isinstance(self.func, str)
        # TODO: GH#39993 - Avoid special-casing by replacing with lambda
        if self.func == "size":
            # Special-cased because DataFrame.size returns a single scalar
            obj = self.obj
            value = obj.shape[self.axis]
            return obj._constructor_sliced(value, index=self.agg_axis)
        return super().apply_str()


class FrameRowApply(FrameApply):
    axis: AxisInt = 0

    @property
    def series_generator(self) -> Generator[Series, None, None]:
        return (self.obj._ixs(i, axis=1) for i in range(len(self.columns)))

    @staticmethod
    @functools.cache
    def generate_numba_apply_func(
        func, nogil=True, nopython=True, parallel=False
    ) -> Callable[[npt.NDArray, Index, Index], dict[int, Any]]:
        numba = import_optional_dependency("numba")
        from pandas import Series

        # Import helper from extensions to cast string object -> np strings
        # Note: This also has the side effect of loading our numba extensions
        from pandas.core._numba.extensions import maybe_cast_str

        jitted_udf = numba.extending.register_jitable(func)

        # Currently the parallel argument doesn't get passed through here
        # (it's disabled) since the dicts in numba aren't thread-safe.
        @numba.jit(nogil=nogil, nopython=nopython, parallel=parallel)
        def numba_func(values, col_names, df_index):
            results = {}
            for j in range(values.shape[1]):
                # Create the series
                ser = Series(
                    values[:, j], index=df_index, name=maybe_cast_str(col_names[j])
                )
                results[j] = jitted_udf(ser)
            return results

        return numba_func

    def apply_with_numba(self) -> dict[int, Any]:
        nb_func = self.generate_numba_apply_func(
            cast(Callable, self.func), **self.engine_kwargs
        )
        from pandas.core._numba.extensions import set_numba_data

        index = self.obj.index
        if index.dtype == "string":
            index = index.astype(object)

        columns = self.obj.columns
        if columns.dtype == "string":
            columns = columns.astype(object)

        # Convert from numba dict to regular dict
        # Our isinstance checks in the df constructor don't pass for numba's typed dict
        with set_numba_data(index) as index, set_numba_data(columns) as columns:
            res = dict(nb_func(self.values, columns, index))
        return res

    @property
    def result_index(self) -> Index:
        return self.columns

    @property
    def result_columns(self) -> Index:
        return self.index

    def wrap_results_for_axis(
        self, results: ResType, res_index: Index
    ) -> DataFrame | Series:
        """return the results for the rows"""

        if self.result_type == "reduce":
            # e.g. test_apply_dict GH#8735
            res = self.obj._constructor_sliced(results)
            res.index = res_index
            return res

        elif self.result_type is None and all(
            isinstance(x, dict) for x in results.values()
        ):
            # Our operation was a to_dict op e.g.
            # test_apply_dict GH#8735, test_apply_reduce_to_dict GH#25196 #37544
            res = self.obj._constructor_sliced(results)
            res.index = res_index
            return res

        try:
            result = self.obj._constructor(data=results)
        except ValueError as err:
            if "All arrays must be of the same length" in str(err):
                # e.g. result = [[2, 3], [1.5], ['foo', 'bar']]
                # see test_agg_listlike_result GH#29587
                res = self.obj._constructor_sliced(results)
                res.index = res_index
                return res
            else:
                raise

        if not isinstance(results[0], ABCSeries):
            if len(result.index) == len(self.res_columns):
                result.index = self.res_columns

        if len(result.columns) == len(res_index):
            result.columns = res_index

        return result


class FrameColumnApply(FrameApply):
    axis: AxisInt = 1

    def apply_broadcast(self, target: DataFrame) -> DataFrame:
        result = super().apply_broadcast(target.T)
        return result.T

    @property
    def series_generator(self) -> Generator[Series, None, None]:
        values = self.values
        values = ensure_wrapped_if_datetimelike(values)
        assert len(values) > 0

        # We create one Series object, and will swap out the data inside
        # of it.  Kids: don't do this at home.
        ser = self.obj._ixs(0, axis=0)
        mgr = ser._mgr

        is_view = mgr.blocks[0].refs.has_reference()  # type: ignore[union-attr]

        if isinstance(ser.dtype, ExtensionDtype):
            # values will be incorrect for this block
            # TODO(EA2D): special case would be unnecessary with 2D EAs
            obj = self.obj
            for i in range(len(obj)):
                yield obj._ixs(i, axis=0)

        else:
            for arr, name in zip(values, self.index):
                # GH#35462 re-pin mgr in case setitem changed it
                ser._mgr = mgr
                mgr.set_values(arr)
                object.__setattr__(ser, "_name", name)
                if not is_view:
                    # In apply_series_generator we store a shallow copy of the
                    # result, which potentially increases the ref count of this reused
                    # `ser` object (depending on the result of the applied function)
                    # -> if that happened and `ser` is already a copy, then we reset
                    # the refs here to avoid triggering an unnecessary CoW inside the
                    # applied function (https://github.com/pandas-dev/pandas/pull/56212)
                    mgr.blocks[0].refs = BlockValuesRefs(mgr.blocks[0])  # type: ignore[union-attr]
                yield ser

    @staticmethod
    @functools.cache
    def generate_numba_apply_func(
        func, nogil=True, nopython=True, parallel=False
    ) -> Callable[[npt.NDArray, Index, Index], dict[int, Any]]:
        numba = import_optional_dependency("numba")
        from pandas import Series
        from pandas.core._numba.extensions import maybe_cast_str

        jitted_udf = numba.extending.register_jitable(func)

        @numba.jit(nogil=nogil, nopython=nopython, parallel=parallel)
        def numba_func(values, col_names_index, index):
            results = {}
            # Currently the parallel argument doesn't get passed through here
            # (it's disabled) since the dicts in numba aren't thread-safe.
            for i in range(values.shape[0]):
                # Create the series
                # TODO: values corrupted without the copy
                ser = Series(
                    values[i].copy(),
                    index=col_names_index,
                    name=maybe_cast_str(index[i]),
                )
                results[i] = jitted_udf(ser)

            return results

        return numba_func

    def apply_with_numba(self) -> dict[int, Any]:
        nb_func = self.generate_numba_apply_func(
            cast(Callable, self.func), **self.engine_kwargs
        )

        from pandas.core._numba.extensions import set_numba_data

        # Convert from numba dict to regular dict
        # Our isinstance checks in the df constructor don't pass for numba's typed dict
        with set_numba_data(self.obj.index) as index, set_numba_data(
            self.columns
        ) as columns:
            res = dict(nb_func(self.values, columns, index))

        return res

    @property
    def result_index(self) -> Index:
        return self.index

    @property
    def result_columns(self) -> Index:
        return self.columns

    def wrap_results_for_axis(
        self, results: ResType, res_index: Index
    ) -> DataFrame | Series:
        """return the results for the columns"""
        result: DataFrame | Series

        # we have requested to expand
        if self.result_type == "expand":
            result = self.infer_to_same_shape(results, res_index)

        # we have a non-series and don't want inference
        elif not isinstance(results[0], ABCSeries):
            result = self.obj._constructor_sliced(results)
            result.index = res_index

        # we may want to infer results
        else:
            result = self.infer_to_same_shape(results, res_index)

        return result

    def infer_to_same_shape(self, results: ResType, res_index: Index) -> DataFrame:
        """infer the results to the same shape as the input object"""
        result = self.obj._constructor(data=results)
        result = result.T

        # set the index
        result.index = res_index

        # infer dtypes
        result = result.infer_objects(copy=False)

        return result


class SeriesApply(NDFrameApply):
    obj: Series
    axis: AxisInt = 0
    by_row: Literal[False, "compat", "_compat"]  # only relevant for apply()

    def __init__(
        self,
        obj: Series,
        func: AggFuncType,
        *,
        convert_dtype: bool | lib.NoDefault = lib.no_default,
        by_row: Literal[False, "compat", "_compat"] = "compat",
        args,
        kwargs,
    ) -> None:
        if convert_dtype is lib.no_default:
            convert_dtype = True
        else:
            warnings.warn(
                "the convert_dtype parameter is deprecated and will be removed in a "
                "future version. Do ``ser.astype(object).apply()`` "
                "instead if you want ``convert_dtype=False``.",
                FutureWarning,
                stacklevel=find_stack_level(),
            )
        self.convert_dtype = convert_dtype

        super().__init__(
            obj,
            func,
            raw=False,
            result_type=None,
            by_row=by_row,
            args=args,
            kwargs=kwargs,
        )

    def apply(self) -> DataFrame | Series:
        obj = self.obj

        if len(obj) == 0:
            return self.apply_empty_result()

        # dispatch to handle list-like or dict-like
        if is_list_like(self.func):
            return self.apply_list_or_dict_like()

        if isinstance(self.func, str):
            # if we are a string, try to dispatch
            return self.apply_str()

        if self.by_row == "_compat":
            return self.apply_compat()

        # self.func is Callable
        return self.apply_standard()

    def agg(self):
        result = super().agg()
        if result is None:
            obj = self.obj
            func = self.func
            # string, list-like, and dict-like are entirely handled in super
            assert callable(func)

            # GH53325: The setup below is just to keep current behavior while emitting a
            # deprecation message. In the future this will all be replaced with a simple
            # `result = f(self.obj, *self.args, **self.kwargs)`.
            try:
                result = obj.apply(func, args=self.args, **self.kwargs)
            except (ValueError, AttributeError, TypeError):
                result = func(obj, *self.args, **self.kwargs)
            else:
                msg = (
                    f"using {func} in {type(obj).__name__}.agg cannot aggregate and "
                    f"has been deprecated. Use {type(obj).__name__}.transform to "
                    f"keep behavior unchanged."
                )
                warnings.warn(msg, FutureWarning, stacklevel=find_stack_level())

        return result

    def apply_empty_result(self) -> Series:
        obj = self.obj
        return obj._constructor(dtype=obj.dtype, index=obj.index).__finalize__(
            obj, method="apply"
        )

    def apply_compat(self):
        """compat apply method for funcs in listlikes and dictlikes.

        Used for each callable when a list-like or dict-like of callables is
        passed to apply. Needed for compatibility with pandas < 2.1.

        .. versionadded:: 2.1.0
        """
        obj = self.obj
        func = self.func

        if callable(func):
            f = com.get_cython_func(func)
            if f and not self.args and not self.kwargs:
                return obj.apply(func, by_row=False)

        try:
            result = obj.apply(func, by_row="compat")
        except (ValueError, AttributeError, TypeError):
            result = obj.apply(func, by_row=False)
        return result

    def apply_standard(self) -> DataFrame | Series:
        # caller is responsible for ensuring that f is Callable
        func = cast(Callable, self.func)
        obj = self.obj

        if isinstance(func, np.ufunc):
            with np.errstate(all="ignore"):
                return func(obj, *self.args, **self.kwargs)
        elif not self.by_row:
            return func(obj, *self.args, **self.kwargs)

        if self.args or self.kwargs:
            # _map_values does not support args/kwargs
            def curried(x):
                return func(x, *self.args, **self.kwargs)

        else:
            curried = func

        # row-wise access
        # apply doesn't have a `na_action` keyword and for backward compat reasons
        # we need to give `na_action="ignore"` for categorical data.
        # TODO: remove the `na_action="ignore"` when that default has been changed in
        # Categorical (GH51645).
        action = "ignore" if isinstance(obj.dtype, CategoricalDtype) else None
        mapped = obj._map_values(
            mapper=curried, na_action=action, convert=self.convert_dtype
        )

        if len(mapped) and isinstance(mapped[0], ABCSeries):
            # GH#43986 Need to do list(mapped) in order to get treated as nested
            # See also GH#25959 regarding EA support
            return obj._constructor_expanddim(list(mapped), index=obj.index)
        else:
            return obj._constructor(mapped, index=obj.index).__finalize__(
                obj, method="apply"
            )


class GroupByApply(Apply):
    obj: GroupBy | Resampler | BaseWindow

    def __init__(
        self,
        obj: GroupBy[NDFrameT],
        func: AggFuncType,
        *,
        args,
        kwargs,
    ) -> None:
        kwargs = kwargs.copy()
        self.axis = obj.obj._get_axis_number(kwargs.get("axis", 0))
        super().__init__(
            obj,
            func,
            raw=False,
            result_type=None,
            args=args,
            kwargs=kwargs,
        )

    def apply(self):
        raise NotImplementedError

    def transform(self):
        raise NotImplementedError

    def agg_or_apply_list_like(
        self, op_name: Literal["agg", "apply"]
    ) -> DataFrame | Series:
        obj = self.obj
        kwargs = self.kwargs
        if op_name == "apply":
            kwargs = {**kwargs, "by_row": False}

        if getattr(obj, "axis", 0) == 1:
            raise NotImplementedError("axis other than 0 is not supported")

        if obj._selected_obj.ndim == 1:
            # For SeriesGroupBy this matches _obj_with_exclusions
            selected_obj = obj._selected_obj
        else:
            selected_obj = obj._obj_with_exclusions

        # Only set as_index=True on groupby objects, not Window or Resample
        # that inherit from this class.
        with com.temp_setattr(
            obj, "as_index", True, condition=hasattr(obj, "as_index")
        ):
            keys, results = self.compute_list_like(op_name, selected_obj, kwargs)
        result = self.wrap_results_list_like(keys, results)
        return result

    def agg_or_apply_dict_like(
        self, op_name: Literal["agg", "apply"]
    ) -> DataFrame | Series:
        from pandas.core.groupby.generic import (
            DataFrameGroupBy,
            SeriesGroupBy,
        )

        assert op_name in ["agg", "apply"]

        obj = self.obj
        kwargs = {}
        if op_name == "apply":
            by_row = "_compat" if self.by_row else False
            kwargs.update({"by_row": by_row})

        if getattr(obj, "axis", 0) == 1:
            raise NotImplementedError("axis other than 0 is not supported")

        selected_obj = obj._selected_obj
        selection = obj._selection

        is_groupby = isinstance(obj, (DataFrameGroupBy, SeriesGroupBy))

        # Numba Groupby engine/engine-kwargs passthrough
        if is_groupby:
            engine = self.kwargs.get("engine", None)
            engine_kwargs = self.kwargs.get("engine_kwargs", None)
            kwargs.update({"engine": engine, "engine_kwargs": engine_kwargs})

        with com.temp_setattr(
            obj, "as_index", True, condition=hasattr(obj, "as_index")
        ):
            result_index, result_data = self.compute_dict_like(
                op_name, selected_obj, selection, kwargs
            )
        result = self.wrap_results_dict_like(selected_obj, result_index, result_data)
        return result


class ResamplerWindowApply(GroupByApply):
    axis: AxisInt = 0
    obj: Resampler | BaseWindow

    def __init__(
        self,
        obj: Resampler | BaseWindow,
        func: AggFuncType,
        *,
        args,
        kwargs,
    ) -> None:
        super(GroupByApply, self).__init__(
            obj,
            func,
            raw=False,
            result_type=None,
            args=args,
            kwargs=kwargs,
        )

    def apply(self):
        raise NotImplementedError

    def transform(self):
        raise NotImplementedError


def reconstruct_func(
    func: AggFuncType | None, **kwargs
) -> tuple[bool, AggFuncType, tuple[str, ...] | None, npt.NDArray[np.intp] | None]:
1646 """
1647 This is the internal function to reconstruct func given if there is relabeling
1648 or not and also normalize the keyword to get new order of columns.
1649
1650 If named aggregation is applied, `func` will be None, and kwargs contains the
1651 column and aggregation function information to be parsed;
1652 If named aggregation is not applied, `func` is either string (e.g. 'min') or
1653 Callable, or list of them (e.g. ['min', np.max]), or the dictionary of column name
1654 and str/Callable/list of them (e.g. {'A': 'min'}, or {'A': [np.min, lambda x: x]})
1655
1656 If relabeling is True, will return relabeling, reconstructed func, column
1657 names, and the reconstructed order of columns.
1658 If relabeling is False, the columns and order will be None.
1659
1660 Parameters
1661 ----------
1662 func: agg function (e.g. 'min' or Callable) or list of agg functions
1663 (e.g. ['min', np.max]) or dictionary (e.g. {'A': ['min', np.max]}).
1664 **kwargs: dict, kwargs used in is_multi_agg_with_relabel and
1665 normalize_keyword_aggregation function for relabelling
1666
1667 Returns
1668 -------
1669 relabelling: bool, if there is relabelling or not
1670 func: normalized and mangled func
1671 columns: tuple of column names
1672 order: array of columns indices
1673
1674 Examples
1675 --------
1676 >>> reconstruct_func(None, **{"foo": ("col", "min")})
1677 (True, defaultdict(<class 'list'>, {'col': ['min']}), ('foo',), array([0]))
1678
1679 >>> reconstruct_func("min")
1680 (False, 'min', None, None)
1681 """
1682 relabeling = func is None and is_multi_agg_with_relabel(**kwargs)
1683 columns: tuple[str, ...] | None = None
1684 order: npt.NDArray[np.intp] | None = None
1685
1686 if not relabeling:
1687 if isinstance(func, list) and len(func) > len(set(func)):
1688 # GH 28426 will raise error if duplicated function names are used and
1689 # there is no reassigned name
1690 raise SpecificationError(
1691 "Function names must be unique if there is no new column names "
1692 "assigned"
1693 )
1694 if func is None:
1695 # nicer error message
1696 raise TypeError("Must provide 'func' or tuples of '(column, aggfunc).")
1697
1698 if relabeling:
1699 # error: Incompatible types in assignment (expression has type
1700 # "MutableMapping[Hashable, list[Callable[..., Any] | str]]", variable has type
1701 # "Callable[..., Any] | str | list[Callable[..., Any] | str] |
1702 # MutableMapping[Hashable, Callable[..., Any] | str | list[Callable[..., Any] |
1703 # str]] | None")
1704 func, columns, order = normalize_keyword_aggregation( # type: ignore[assignment]
1705 kwargs
1706 )
1707 assert func is not None
1708
1709 return relabeling, func, columns, order


def is_multi_agg_with_relabel(**kwargs) -> bool:
    """
    Check whether kwargs passed to .agg look like multi-agg with relabeling.

    Parameters
    ----------
    **kwargs : dict

    Returns
    -------
    bool

    Examples
    --------
    >>> is_multi_agg_with_relabel(a="max")
    False
    >>> is_multi_agg_with_relabel(a_max=("a", "max"), a_min=("a", "min"))
    True
    >>> is_multi_agg_with_relabel()
    False
    """
    return all(isinstance(v, tuple) and len(v) == 2 for v in kwargs.values()) and (
        len(kwargs) > 0
    )


def normalize_keyword_aggregation(
    kwargs: dict,
) -> tuple[
    MutableMapping[Hashable, list[AggFuncTypeBase]],
    tuple[str, ...],
    npt.NDArray[np.intp],
]:
    """
    Normalize user-provided "named aggregation" kwargs.
    Transforms from the new ``Mapping[str, NamedAgg]`` style kwargs
    to the old ``Dict[str, List[scalar]]``.

    Parameters
    ----------
    kwargs : dict

    Returns
    -------
    aggspec : dict
        The transformed kwargs.
    columns : tuple[str, ...]
        The user-provided keys.
    col_idx_order : List[int]
        List of column indices.

    Examples
    --------
    >>> normalize_keyword_aggregation({"output": ("input", "sum")})
    (defaultdict(<class 'list'>, {'input': ['sum']}), ('output',), array([0]))
    """
    from pandas.core.indexes.base import Index

    # Normalize the aggregation functions as Mapping[column, List[func]],
    # process normally, then fixup the names.
    # TODO: aggspec type: typing.Dict[str, List[AggScalar]]
    aggspec = defaultdict(list)
    order = []
    columns, pairs = list(zip(*kwargs.items()))

    for column, aggfunc in pairs:
        aggspec[column].append(aggfunc)
        order.append((column, com.get_callable_name(aggfunc) or aggfunc))

    # uniquify aggfunc name if duplicated in order list
    uniquified_order = _make_unique_kwarg_list(order)

    # GH 25719: because aggspec may change the order of assigned columns in
    # aggregation, uniquified_aggspec stores the uniquified order list and is
    # compared with uniquified_order based on index
    aggspec_order = [
        (column, com.get_callable_name(aggfunc) or aggfunc)
        for column, aggfuncs in aggspec.items()
        for aggfunc in aggfuncs
    ]
    uniquified_aggspec = _make_unique_kwarg_list(aggspec_order)

    # get the new index of columns by comparison
    col_idx_order = Index(uniquified_aggspec).get_indexer(uniquified_order)
    return aggspec, columns, col_idx_order


def _make_unique_kwarg_list(
    seq: Sequence[tuple[Any, Any]]
) -> Sequence[tuple[Any, Any]]:
    """
    Uniquify aggfunc name of the pairs in the order list

    Examples
    --------
    >>> kwarg_list = [('a', '<lambda>'), ('a', '<lambda>'), ('b', '<lambda>')]
    >>> _make_unique_kwarg_list(kwarg_list)
    [('a', '<lambda>_0'), ('a', '<lambda>_1'), ('b', '<lambda>')]
    """
    return [
        (pair[0], f"{pair[1]}_{seq[:i].count(pair)}") if seq.count(pair) > 1 else pair
        for i, pair in enumerate(seq)
    ]


def relabel_result(
    result: DataFrame | Series,
    func: dict[str, list[Callable | str]],
    columns: Iterable[Hashable],
    order: Iterable[int],
) -> dict[Hashable, Series]:
    """
    Internal function to reorder result if relabelling is True for
    dataframe.agg, and return the reordered result in dict.

    Parameters
    ----------
    result: Result from aggregation
    func: Dict of (column name, funcs)
    columns: New column names for relabelling
    order: New order for relabelling

    Examples
    --------
    >>> from pandas.core.apply import relabel_result
    >>> result = pd.DataFrame(
    ...     {"A": [np.nan, 2, np.nan], "C": [6, np.nan, np.nan], "B": [np.nan, 4, 2.5]},
    ...     index=["max", "mean", "min"]
    ... )
    >>> funcs = {"A": ["max"], "C": ["max"], "B": ["mean", "min"]}
    >>> columns = ("foo", "aab", "bar", "dat")
    >>> order = [0, 1, 2, 3]
    >>> result_in_dict = relabel_result(result, funcs, columns, order)
    >>> pd.DataFrame(result_in_dict, index=columns)
           A    C    B
    foo  2.0  NaN  NaN
    aab  NaN  6.0  NaN
    bar  NaN  NaN  4.0
    dat  NaN  NaN  2.5
    """
    from pandas.core.indexes.base import Index

    reordered_indexes = [
        pair[0] for pair in sorted(zip(columns, order), key=lambda t: t[1])
    ]
    reordered_result_in_dict: dict[Hashable, Series] = {}
    idx = 0

    reorder_mask = not isinstance(result, ABCSeries) and len(result.columns) > 1
    for col, fun in func.items():
        s = result[col].dropna()

        # In the `_aggregate`, the callable names are obtained and used in
        # `result`, and these names are ordered alphabetically. e.g.
        #           C2   C1
        # <lambda>   1  NaN
        # amax     NaN  4.0
        # max      NaN  4.0
        # sum     18.0  6.0
        # Therefore, the order of functions for each column could be shuffled
        # accordingly so need to get the callable name if it is not parsed names, and
        # reorder the aggregated result for each column.
        # e.g. if df.agg(c1=("C2", sum), c2=("C2", lambda x: min(x))), correct order is
        # [sum, <lambda>], but in `result`, it will be [<lambda>, sum], and we need to
        # reorder so that aggregated values map to their functions regarding the order.

        # However, if there is only one column being used for aggregation, there is
        # no need to reorder since the index is not sorted, and it is kept as-is in
        # `func`, e.g.
        #         A
        # min   1.0
        # mean  1.5
        # mean  1.5
        if reorder_mask:
            fun = [
                com.get_callable_name(f) if not isinstance(f, str) else f for f in fun
            ]
            col_idx_order = Index(s.index).get_indexer(fun)
            s = s.iloc[col_idx_order]

        # assign the new user-provided "named aggregation" as index names, and
        # reindex it based on the whole user-provided names.
        s.index = reordered_indexes[idx : idx + len(fun)]
        reordered_result_in_dict[col] = s.reindex(columns, copy=False)
        idx = idx + len(fun)
    return reordered_result_in_dict


def reconstruct_and_relabel_result(result, func, **kwargs) -> DataFrame | Series:
    from pandas import DataFrame

    relabeling, func, columns, order = reconstruct_func(func, **kwargs)

    if relabeling:
        # This is to keep the order of columns occurrence unchanged, and also
        # keep the order of new columns occurrence unchanged

        # For the return values of reconstruct_func, if relabeling is
        # False, columns and order will be None.
        assert columns is not None
        assert order is not None

        result_in_dict = relabel_result(result, func, columns, order)
        result = DataFrame(result_in_dict, index=columns)

    return result


# TODO: Can't use, because mypy doesn't like us setting __name__
#   error: "partial[Any]" has no attribute "__name__"
# the type is:
#   typing.Sequence[Callable[..., ScalarResult]]
#     -> typing.Sequence[Callable[..., ScalarResult]]:


def _mangle_lambda_list(aggfuncs: Sequence[Any]) -> Sequence[Any]:
    """
    Possibly mangle a list of aggfuncs.

    Parameters
    ----------
    aggfuncs : Sequence

    Returns
    -------
    mangled: list-like
        A new AggSpec sequence, where lambdas have been converted
        to have unique names.

    Notes
    -----
    If just one aggfunc is passed, the name will not be mangled.
    """
    if len(aggfuncs) <= 1:
        # don't mangle for .agg([lambda x: .])
        return aggfuncs
    i = 0
    mangled_aggfuncs = []
    for aggfunc in aggfuncs:
        if com.get_callable_name(aggfunc) == "<lambda>":
            aggfunc = partial(aggfunc)
            aggfunc.__name__ = f"<lambda_{i}>"
            i += 1
        mangled_aggfuncs.append(aggfunc)

    return mangled_aggfuncs


def maybe_mangle_lambdas(agg_spec: Any) -> Any:
    """
    Make new lambdas with unique names.

    Parameters
    ----------
    agg_spec : Any
        An argument to GroupBy.agg.
        Non-dict-like `agg_spec` is passed through as-is.
        For dict-like `agg_spec` a new spec is returned
        with name-mangled lambdas.

    Returns
    -------
    mangled : Any
        Same type as the input.

    Examples
    --------
    >>> maybe_mangle_lambdas('sum')
    'sum'
    >>> maybe_mangle_lambdas([lambda: 1, lambda: 2])  # doctest: +SKIP
    [<function __main__.<lambda_0>,
     <function pandas...._make_lambda.<locals>.f(*args, **kwargs)>]
    """
    is_dict = is_dict_like(agg_spec)
    if not (is_dict or is_list_like(agg_spec)):
        return agg_spec
    mangled_aggspec = type(agg_spec)()  # dict or OrderedDict

    if is_dict:
        for key, aggfuncs in agg_spec.items():
            if is_list_like(aggfuncs) and not is_dict_like(aggfuncs):
                mangled_aggfuncs = _mangle_lambda_list(aggfuncs)
            else:
                mangled_aggfuncs = aggfuncs

            mangled_aggspec[key] = mangled_aggfuncs
    else:
        mangled_aggspec = _mangle_lambda_list(agg_spec)

    return mangled_aggspec


def validate_func_kwargs(
    kwargs: dict,
) -> tuple[list[str], list[str | Callable[..., Any]]]:
    """
    Validates types of user-provided "named aggregation" kwargs.
    `TypeError` is raised if aggfunc is not `str` or callable.

    Parameters
    ----------
    kwargs : dict

    Returns
    -------
    columns : List[str]
        List of user-provided keys.
    func : List[Union[str, callable[..., Any]]]
        List of user-provided aggfuncs

    Examples
    --------
    >>> validate_func_kwargs({'one': 'min', 'two': 'max'})
    (['one', 'two'], ['min', 'max'])
    """
    tuple_given_message = "func is expected but received {} in **kwargs."
    columns = list(kwargs)
    func = []
    for col_func in kwargs.values():
        if not (isinstance(col_func, str) or callable(col_func)):
            raise TypeError(tuple_given_message.format(type(col_func).__name__))
        func.append(col_func)
    if not columns:
        no_arg_message = "Must provide 'func' or named aggregation **kwargs."
        raise TypeError(no_arg_message)
    return columns, func


def include_axis(op_name: Literal["agg", "apply"], colg: Series | DataFrame) -> bool:
    return isinstance(colg, ABCDataFrame) or (
        isinstance(colg, ABCSeries) and op_name == "agg"
    )


def warn_alias_replacement(
    obj: AggObjType,
    func: Callable,
    alias: str,
) -> None:
    if alias.startswith("np."):
        full_alias = alias
    else:
        full_alias = f"{type(obj).__name__}.{alias}"
        alias = f'"{alias}"'
    warnings.warn(
        f"The provided callable {func} is currently using "
        f"{full_alias}. In a future version of pandas, "
        f"the provided callable will be used directly. To keep current "
        f"behavior pass the string {alias} instead.",
        category=FutureWarning,
        stacklevel=find_stack_level(),
    )