1from __future__ import annotations
2
3import abc
4from collections import defaultdict
5from contextlib import nullcontext
6from functools import partial
7import inspect
8from typing import (
9 TYPE_CHECKING,
10 Any,
11 Callable,
12 ContextManager,
13 DefaultDict,
14 Dict,
15 Hashable,
16 Iterable,
17 Iterator,
18 List,
19 Sequence,
20 cast,
21)
22
23import numpy as np
24
25from pandas._config import option_context
26
27from pandas._libs import lib
28from pandas._typing import (
29 AggFuncType,
30 AggFuncTypeBase,
31 AggFuncTypeDict,
32 AggObjType,
33 Axis,
34 AxisInt,
35 NDFrameT,
36 npt,
37)
38from pandas.errors import SpecificationError
39from pandas.util._decorators import cache_readonly
40
41from pandas.core.dtypes.cast import is_nested_object
42from pandas.core.dtypes.common import (
43 is_dict_like,
44 is_extension_array_dtype,
45 is_list_like,
46 is_sequence,
47)
48from pandas.core.dtypes.generic import (
49 ABCDataFrame,
50 ABCNDFrame,
51 ABCSeries,
52)
53
54from pandas.core.algorithms import safe_sort
55from pandas.core.base import SelectionMixin
56import pandas.core.common as com
57from pandas.core.construction import ensure_wrapped_if_datetimelike
58
59if TYPE_CHECKING:
60 from pandas import (
61 DataFrame,
62 Index,
63 Series,
64 )
65 from pandas.core.groupby import GroupBy
66 from pandas.core.resample import Resampler
67 from pandas.core.window.rolling import BaseWindow
68
69
# Result container used by FrameApply.apply_series_generator:
# positional index of the column/row -> raw result of applying ``self.f``.
ResType = Dict[int, Any]
71
72
def frame_apply(
    obj: DataFrame,
    func: AggFuncType,
    axis: Axis = 0,
    raw: bool = False,
    result_type: str | None = None,
    args=None,
    kwargs=None,
) -> FrameApply:
    """
    Construct the FrameApply subclass appropriate for ``axis``.

    ``axis=0`` applies ``func`` column-by-column (FrameRowApply);
    ``axis=1`` applies it row-by-row (FrameColumnApply).
    """
    # _get_axis_number validates the axis and normalizes it to 0 or 1
    axis_num = obj._get_axis_number(axis)
    klass: type[FrameApply] = FrameRowApply if axis_num == 0 else FrameColumnApply
    return klass(
        obj,
        func,
        raw=raw,
        result_type=result_type,
        args=args,
        kwargs=kwargs,
    )
98
99
class Apply(metaclass=abc.ABCMeta):
    """
    Abstract base class for the apply/agg/transform machinery.

    Wraps the target pandas object together with the user-supplied
    ``func`` and its ``args``/``kwargs``; concrete subclasses
    (FrameApply, SeriesApply, GroupByApply, ResamplerWindowApply)
    implement :meth:`apply` for their particular object type.
    """

    # operating axis; concrete subclasses pin this to 0 or 1
    axis: AxisInt

    def __init__(
        self,
        obj: AggObjType,
        func,
        raw: bool,
        result_type: str | None,
        args,
        kwargs,
    ) -> None:
        self.obj = obj
        self.raw = raw
        # normalize missing args/kwargs to empty containers
        self.args = args or ()
        self.kwargs = kwargs or {}

        if result_type not in [None, "reduce", "broadcast", "expand"]:
            raise ValueError(
                "invalid value for result_type, must be one "
                "of {None, 'reduce', 'broadcast', 'expand'}"
            )

        self.result_type = result_type

        # curry if needed
        if (
            (kwargs or args)
            and not isinstance(func, (np.ufunc, str))
            and not is_list_like(func)
        ):

            def f(x):
                return func(x, *args, **kwargs)

        else:
            f = func

        # orig_f keeps the uncurried user callable (used for dispatch
        # decisions); f is the version that actually gets invoked
        self.orig_f: AggFuncType = func
        self.f: AggFuncType = f

    @abc.abstractmethod
    def apply(self) -> DataFrame | Series:
        pass

    def agg(self) -> DataFrame | Series | None:
        """
        Provide an implementation for the aggregators.

        Returns
        -------
        Result of aggregation, or None if agg cannot be performed by
        this method.
        """
        obj = self.obj
        arg = self.f
        args = self.args
        kwargs = self.kwargs

        if isinstance(arg, str):
            return self.apply_str()

        if is_dict_like(arg):
            return self.agg_dict_like()
        elif is_list_like(arg):
            # we require a list, but not a 'str'
            return self.agg_list_like()

        if callable(arg):
            # prefer the cython-optimized named method when one exists
            # and no extra arguments were supplied
            f = com.get_cython_func(arg)
            if f and not args and not kwargs:
                return getattr(obj, f)()

        # caller can react
        return None

    def transform(self) -> DataFrame | Series:
        """
        Transform a DataFrame or Series.

        Returns
        -------
        DataFrame or Series
            Result of applying ``func`` along the given axis of the
            Series or DataFrame.

        Raises
        ------
        ValueError
            If the transform function fails or does not transform.
        """
        obj = self.obj
        func = self.orig_f
        axis = self.axis
        args = self.args
        kwargs = self.kwargs

        is_series = obj.ndim == 1

        # axis=1 is handled by transposing, transforming on axis=0,
        # and transposing back
        if obj._get_axis_number(axis) == 1:
            assert not is_series
            return obj.T.transform(func, 0, *args, **kwargs).T

        if is_list_like(func) and not is_dict_like(func):
            func = cast(List[AggFuncTypeBase], func)
            # Convert func equivalent dict
            if is_series:
                func = {com.get_callable_name(v) or v: v for v in func}
            else:
                func = {col: func for col in obj}

        if is_dict_like(func):
            func = cast(AggFuncTypeDict, func)
            return self.transform_dict_like(func)

        # func is either str or callable
        func = cast(AggFuncTypeBase, func)
        try:
            result = self.transform_str_or_callable(func)
        except TypeError:
            raise
        except Exception as err:
            raise ValueError("Transform function failed") from err

        # Functions that transform may return empty Series/DataFrame
        # when the dtype is not appropriate
        if (
            isinstance(result, (ABCSeries, ABCDataFrame))
            and result.empty
            and not obj.empty
        ):
            raise ValueError("Transform function failed")
        # error: Argument 1 to "__get__" of "AxisProperty" has incompatible type
        # "Union[Series, DataFrame, GroupBy[Any], SeriesGroupBy,
        # DataFrameGroupBy, BaseWindow, Resampler]"; expected "Union[DataFrame,
        # Series]"
        if not isinstance(result, (ABCSeries, ABCDataFrame)) or not result.index.equals(
            obj.index  # type:ignore[arg-type]
        ):
            raise ValueError("Function did not transform")

        return result

    def transform_dict_like(self, func):
        """
        Compute transform in the case of a dict-like func
        """
        from pandas.core.reshape.concat import concat

        obj = self.obj
        args = self.args
        kwargs = self.kwargs

        # transform is currently only for Series/DataFrame
        assert isinstance(obj, ABCNDFrame)

        if len(func) == 0:
            raise ValueError("No transform functions were provided")

        func = self.normalize_dictlike_arg("transform", obj, func)

        # transform each selected column independently, then glue the
        # pieces back together column-wise
        results: dict[Hashable, DataFrame | Series] = {}
        for name, how in func.items():
            colg = obj._gotitem(name, ndim=1)
            results[name] = colg.transform(how, 0, *args, **kwargs)
        return concat(results, axis=1)

    def transform_str_or_callable(self, func) -> DataFrame | Series:
        """
        Compute transform in the case of a string or callable func
        """
        obj = self.obj
        args = self.args
        kwargs = self.kwargs

        if isinstance(func, str):
            return self._try_aggregate_string_function(obj, func, *args, **kwargs)

        if not args and not kwargs:
            f = com.get_cython_func(func)
            if f:
                return getattr(obj, f)()

        # Two possible ways to use a UDF - apply or call directly
        try:
            return obj.apply(func, args=args, **kwargs)
        except Exception:
            return func(obj, *args, **kwargs)

    def agg_list_like(self) -> DataFrame | Series:
        """
        Compute aggregation in the case of a list-like argument.

        Returns
        -------
        Result of aggregation.
        """
        from pandas.core.groupby.generic import (
            DataFrameGroupBy,
            SeriesGroupBy,
        )
        from pandas.core.reshape.concat import concat

        obj = self.obj
        arg = cast(List[AggFuncTypeBase], self.f)

        if getattr(obj, "axis", 0) == 1:
            raise NotImplementedError("axis other than 0 is not supported")

        if not isinstance(obj, SelectionMixin):
            # i.e. obj is Series or DataFrame
            selected_obj = obj
        elif obj._selected_obj.ndim == 1:
            # For SeriesGroupBy this matches _obj_with_exclusions
            selected_obj = obj._selected_obj
        else:
            selected_obj = obj._obj_with_exclusions

        results = []
        keys = []

        is_groupby = isinstance(obj, (DataFrameGroupBy, SeriesGroupBy))
        context_manager: ContextManager
        if is_groupby:
            # When as_index=False, we combine all results using indices
            # and adjust index after
            context_manager = com.temp_setattr(obj, "as_index", True)
        else:
            context_manager = nullcontext()
        with context_manager:
            # degenerate case
            if selected_obj.ndim == 1:
                # one column: apply each aggregator in turn to that column
                for a in arg:
                    colg = obj._gotitem(selected_obj.name, ndim=1, subset=selected_obj)
                    if isinstance(colg, (ABCSeries, ABCDataFrame)):
                        new_res = colg.aggregate(
                            a, self.axis, *self.args, **self.kwargs
                        )
                    else:
                        new_res = colg.aggregate(a, *self.args, **self.kwargs)
                    results.append(new_res)

                    # make sure we find a good name
                    name = com.get_callable_name(a) or a
                    keys.append(name)

            else:
                # multiple columns: apply the whole list of aggregators
                # to each column
                indices = []
                for index, col in enumerate(selected_obj):
                    colg = obj._gotitem(col, ndim=1, subset=selected_obj.iloc[:, index])
                    if isinstance(colg, (ABCSeries, ABCDataFrame)):
                        new_res = colg.aggregate(
                            arg, self.axis, *self.args, **self.kwargs
                        )
                    else:
                        new_res = colg.aggregate(arg, *self.args, **self.kwargs)
                    results.append(new_res)
                    indices.append(index)
                keys = selected_obj.columns.take(indices)

        try:
            return concat(results, keys=keys, axis=1, sort=False)
        except TypeError as err:
            # we are concatting non-NDFrame objects,
            # e.g. a list of scalars
            from pandas import Series

            result = Series(results, index=keys, name=obj.name)
            if is_nested_object(result):
                raise ValueError(
                    "cannot combine transform and aggregation operations"
                ) from err
            return result

    def agg_dict_like(self) -> DataFrame | Series:
        """
        Compute aggregation in the case of a dict-like argument.

        Returns
        -------
        Result of aggregation.
        """
        from pandas import Index
        from pandas.core.groupby.generic import (
            DataFrameGroupBy,
            SeriesGroupBy,
        )
        from pandas.core.reshape.concat import concat

        obj = self.obj
        arg = cast(AggFuncTypeDict, self.f)

        if getattr(obj, "axis", 0) == 1:
            raise NotImplementedError("axis other than 0 is not supported")

        if not isinstance(obj, SelectionMixin):
            # i.e. obj is Series or DataFrame
            selected_obj = obj
            selection = None
        else:
            selected_obj = obj._selected_obj
            selection = obj._selection

        arg = self.normalize_dictlike_arg("agg", selected_obj, arg)

        is_groupby = isinstance(obj, (DataFrameGroupBy, SeriesGroupBy))
        context_manager: ContextManager
        if is_groupby:
            # When as_index=False, we combine all results using indices
            # and adjust index after
            context_manager = com.temp_setattr(obj, "as_index", True)
        else:
            context_manager = nullcontext()
        with context_manager:
            if selected_obj.ndim == 1:
                # key only used for output
                colg = obj._gotitem(selection, ndim=1)
                results = {key: colg.agg(how) for key, how in arg.items()}
            else:
                # key used for column selection and output
                results = {
                    key: obj._gotitem(key, ndim=1).agg(how) for key, how in arg.items()
                }

        # set the final keys
        keys = list(arg.keys())

        # Avoid making two isinstance calls in all and any below
        is_ndframe = [isinstance(r, ABCNDFrame) for r in results.values()]

        # combine results
        if all(is_ndframe):
            keys_to_use: Iterable[Hashable]
            keys_to_use = [k for k in keys if not results[k].empty]
            # Have to check, if at least one DataFrame is not empty.
            keys_to_use = keys_to_use if keys_to_use != [] else keys
            if selected_obj.ndim == 2:
                # keys are columns, so we can preserve names
                ktu = Index(keys_to_use)
                ktu._set_names(selected_obj.columns.names)
                keys_to_use = ktu

            axis: AxisInt = 0 if isinstance(obj, ABCSeries) else 1
            result = concat(
                {k: results[k] for k in keys_to_use},
                axis=axis,
                keys=keys_to_use,
            )
        elif any(is_ndframe):
            # There is a mix of NDFrames and scalars
            raise ValueError(
                "cannot perform both aggregation "
                "and transformation operations "
                "simultaneously"
            )
        else:
            from pandas import Series

            # we have a dict of scalars
            # GH 36212 use name only if obj is a series
            if obj.ndim == 1:
                obj = cast("Series", obj)
                name = obj.name
            else:
                name = None

            result = Series(results, name=name)

        return result

    def apply_str(self) -> DataFrame | Series:
        """
        Compute apply in case of a string.

        Returns
        -------
        result: Series or DataFrame
        """
        # Caller is responsible for checking isinstance(self.f, str)
        f = cast(str, self.f)

        obj = self.obj

        # Support for `frame.transform('method')`
        # Some methods (shift, etc.) require the axis argument, others
        # don't, so inspect and insert if necessary.
        func = getattr(obj, f, None)
        if callable(func):
            sig = inspect.getfullargspec(func)
            arg_names = (*sig.args, *sig.kwonlyargs)
            if self.axis != 0 and (
                "axis" not in arg_names or f in ("corrwith", "skew")
            ):
                raise ValueError(f"Operation {f} does not support axis=1")
            if "axis" in arg_names:
                self.kwargs["axis"] = self.axis
        return self._try_aggregate_string_function(obj, f, *self.args, **self.kwargs)

    def apply_multiple(self) -> DataFrame | Series:
        """
        Compute apply in case of a list-like or dict-like.

        Returns
        -------
        result: Series, DataFrame, or None
            Result when self.f is a list-like or dict-like, None otherwise.
        """
        return self.obj.aggregate(self.f, self.axis, *self.args, **self.kwargs)

    def normalize_dictlike_arg(
        self, how: str, obj: DataFrame | Series, func: AggFuncTypeDict
    ) -> AggFuncTypeDict:
        """
        Handler for dict-like argument.

        Ensures that necessary columns exist if obj is a DataFrame, and
        that a nested renamer is not passed. Also normalizes to all lists
        when values consists of a mix of list and non-lists.
        """
        assert how in ("apply", "agg", "transform")

        # Can't use func.values(); wouldn't work for a Series
        if (
            how == "agg"
            and isinstance(obj, ABCSeries)
            and any(is_list_like(v) for _, v in func.items())
        ) or (any(is_dict_like(v) for _, v in func.items())):
            # GH 15931 - deprecation of renaming keys
            raise SpecificationError("nested renamer is not supported")

        if obj.ndim != 1:
            # Check for missing columns on a frame
            cols = set(func.keys()) - set(obj.columns)
            if len(cols) > 0:
                cols_sorted = list(safe_sort(list(cols)))
                raise KeyError(f"Column(s) {cols_sorted} do not exist")

        aggregator_types = (list, tuple, dict)

        # if we have a dict of any non-scalars
        # eg. {'A' : ['mean']}, normalize all to
        # be list-likes
        # Cannot use func.values() because arg may be a Series
        if any(isinstance(x, aggregator_types) for _, x in func.items()):
            new_func: AggFuncTypeDict = {}
            for k, v in func.items():
                if not isinstance(v, aggregator_types):
                    new_func[k] = [v]
                else:
                    new_func[k] = v
            func = new_func
        return func

    def _try_aggregate_string_function(self, obj, arg: str, *args, **kwargs):
        """
        if arg is a string, then try to operate on it:
        - try to find a function (or attribute) on ourselves
        - try to find a numpy function
        - raise
        """
        assert isinstance(arg, str)

        f = getattr(obj, arg, None)
        if f is not None:
            if callable(f):
                return f(*args, **kwargs)

            # people may try to aggregate on a non-callable attribute
            # but don't let them think they can pass args to it
            assert len(args) == 0
            assert len([kwarg for kwarg in kwargs if kwarg not in ["axis"]]) == 0
            return f

        f = getattr(np, arg, None)
        if f is not None and hasattr(obj, "__array__"):
            # in particular exclude Window
            return f(obj, *args, **kwargs)

        raise AttributeError(
            f"'{arg}' is not a valid function for '{type(obj).__name__}' object"
        )
581
582
class NDFrameApply(Apply):
    """
    Methods shared by FrameApply and SeriesApply but
    not GroupByApply or ResamplerWindowApply
    """

    obj: DataFrame | Series

    @property
    def index(self) -> Index:
        # axis labels of the wrapped object
        return self.obj.index

    @property
    def agg_axis(self) -> Index:
        # labels along which aggregation results are indexed
        return self.obj._get_agg_axis(self.axis)
598
599
class FrameApply(NDFrameApply):
    """
    Base class for DataFrame.apply machinery.

    FrameRowApply (axis=0) and FrameColumnApply (axis=1) supply the
    abstract properties/methods declared below.
    """

    obj: DataFrame

    # ---------------------------------------------------------------
    # Abstract Methods

    @property
    @abc.abstractmethod
    def result_index(self) -> Index:
        pass

    @property
    @abc.abstractmethod
    def result_columns(self) -> Index:
        pass

    @property
    @abc.abstractmethod
    def series_generator(self) -> Iterator[Series]:
        pass

    @abc.abstractmethod
    def wrap_results_for_axis(
        self, results: ResType, res_index: Index
    ) -> DataFrame | Series:
        pass

    # ---------------------------------------------------------------

    @property
    def res_columns(self) -> Index:
        return self.result_columns

    @property
    def columns(self) -> Index:
        return self.obj.columns

    @cache_readonly
    def values(self):
        return self.obj.values

    @cache_readonly
    def dtypes(self) -> Series:
        return self.obj.dtypes

    def apply(self) -> DataFrame | Series:
        """compute the results"""
        # dispatch to agg
        if is_list_like(self.f):
            return self.apply_multiple()

        # all empty
        if len(self.columns) == 0 and len(self.index) == 0:
            return self.apply_empty_result()

        # string dispatch
        if isinstance(self.f, str):
            return self.apply_str()

        # ufunc
        elif isinstance(self.f, np.ufunc):
            with np.errstate(all="ignore"):
                results = self.obj._mgr.apply("apply", func=self.f)
            # _constructor will retain self.index and self.columns
            return self.obj._constructor(data=results)

        # broadcasting
        if self.result_type == "broadcast":
            return self.apply_broadcast(self.obj)

        # one axis empty
        elif not all(self.obj.shape):
            return self.apply_empty_result()

        # raw
        elif self.raw:
            return self.apply_raw()

        return self.apply_standard()

    def agg(self):
        """Aggregate; always runs on axis=0 by transposing first if needed."""
        obj = self.obj
        axis = self.axis

        # TODO: Avoid having to change state
        self.obj = self.obj if self.axis == 0 else self.obj.T
        self.axis = 0

        result = None
        try:
            result = super().agg()
        finally:
            # restore original state even if aggregation raised
            self.obj = obj
            self.axis = axis

        if axis == 1:
            result = result.T if result is not None else result

        if result is None:
            # super().agg() could not handle it; fall back to a plain apply
            result = self.obj.apply(self.orig_f, axis, args=self.args, **self.kwargs)

        return result

    def apply_empty_result(self):
        """
        we have an empty result; at least 1 axis is 0

        we will try to apply the function to an empty
        series in order to see if this is a reduction function
        """
        assert callable(self.f)

        # we are not asked to reduce or infer reduction
        # so just return a copy of the existing object
        if self.result_type not in ["reduce", None]:
            return self.obj.copy()

        # we may need to infer
        should_reduce = self.result_type == "reduce"

        from pandas import Series

        if not should_reduce:
            # probe: apply the function to an empty Series and see
            # whether it returns a scalar (=> reduction)
            try:
                if self.axis == 0:
                    r = self.f(Series([], dtype=np.float64))
                else:
                    r = self.f(Series(index=self.columns, dtype=np.float64))
            except Exception:
                pass
            else:
                should_reduce = not isinstance(r, Series)

        if should_reduce:
            if len(self.agg_axis):
                r = self.f(Series([], dtype=np.float64))
            else:
                r = np.nan

            return self.obj._constructor_sliced(r, index=self.agg_axis)
        else:
            return self.obj.copy()

    def apply_raw(self):
        """apply to the values as a numpy array"""

        def wrap_function(func):
            """
            Wrap user supplied function to work around numpy issue.

            see https://github.com/numpy/numpy/issues/8352
            """

            def wrapper(*args, **kwargs):
                result = func(*args, **kwargs)
                if isinstance(result, str):
                    result = np.array(result, dtype=object)
                return result

            return wrapper

        result = np.apply_along_axis(wrap_function(self.f), self.axis, self.values)

        # TODO: mixed type case
        if result.ndim == 2:
            return self.obj._constructor(result, index=self.index, columns=self.columns)
        else:
            return self.obj._constructor_sliced(result, index=self.agg_axis)

    def apply_broadcast(self, target: DataFrame) -> DataFrame:
        """Apply ``self.f`` per column, broadcasting each result over rows."""
        assert callable(self.f)

        result_values = np.empty_like(target.values)

        # axis which we want to compare compliance
        result_compare = target.shape[0]

        for i, col in enumerate(target.columns):
            res = self.f(target[col])
            ares = np.asarray(res).ndim

            # must be a scalar or 1d
            if ares > 1:
                raise ValueError("too many dims to broadcast")
            if ares == 1:
                # must match return dim
                if result_compare != len(res):
                    raise ValueError("cannot broadcast result")

            result_values[:, i] = res

        # we *always* preserve the original index / columns
        result = self.obj._constructor(
            result_values, index=target.index, columns=target.columns
        )
        return result

    def apply_standard(self):
        results, res_index = self.apply_series_generator()

        # wrap results
        return self.wrap_results(results, res_index)

    def apply_series_generator(self) -> tuple[ResType, Index]:
        """Apply ``self.f`` to each generated Series; collect raw results."""
        assert callable(self.f)

        series_gen = self.series_generator
        res_index = self.result_index

        results = {}

        with option_context("mode.chained_assignment", None):
            for i, v in enumerate(series_gen):
                # ignore SettingWithCopy here in case the user mutates
                results[i] = self.f(v)
                if isinstance(results[i], ABCSeries):
                    # If we have a view on v, we need to make a copy because
                    # series_generator will swap out the underlying data
                    results[i] = results[i].copy(deep=False)

        return results, res_index

    def wrap_results(self, results: ResType, res_index: Index) -> DataFrame | Series:
        from pandas import Series

        # see if we can infer the results
        if len(results) > 0 and 0 in results and is_sequence(results[0]):
            return self.wrap_results_for_axis(results, res_index)

        # dict of scalars

        # the default dtype of an empty Series is `object`, but this
        # code can be hit by df.mean() where the result should have dtype
        # float64 even if it's an empty Series.
        constructor_sliced = self.obj._constructor_sliced
        if len(results) == 0 and constructor_sliced is Series:
            result = constructor_sliced(results, dtype=np.float64)
        else:
            result = constructor_sliced(results)
        result.index = res_index

        return result

    def apply_str(self) -> DataFrame | Series:
        # Caller is responsible for checking isinstance(self.f, str)
        # TODO: GH#39993 - Avoid special-casing by replacing with lambda
        if self.f == "size":
            # Special-cased because DataFrame.size returns a single scalar
            obj = self.obj
            value = obj.shape[self.axis]
            return obj._constructor_sliced(value, index=self.agg_axis)
        return super().apply_str()
852
853
class FrameRowApply(FrameApply):
    """
    DataFrame.apply with axis=0: ``func`` is applied to each column,
    iterated as a Series.
    """

    axis: AxisInt = 0

    @property
    def series_generator(self):
        # yield each column as a Series
        return (self.obj._ixs(i, axis=1) for i in range(len(self.columns)))

    @property
    def result_index(self) -> Index:
        return self.columns

    @property
    def result_columns(self) -> Index:
        return self.index

    def wrap_results_for_axis(
        self, results: ResType, res_index: Index
    ) -> DataFrame | Series:
        """return the results for the rows"""

        if self.result_type == "reduce":
            # e.g. test_apply_dict GH#8735
            res = self.obj._constructor_sliced(results)
            res.index = res_index
            return res

        elif self.result_type is None and all(
            isinstance(x, dict) for x in results.values()
        ):
            # Our operation was a to_dict op e.g.
            # test_apply_dict GH#8735, test_apply_reduce_to_dict GH#25196 #37544
            res = self.obj._constructor_sliced(results)
            res.index = res_index
            return res

        try:
            result = self.obj._constructor(data=results)
        except ValueError as err:
            if "All arrays must be of the same length" in str(err):
                # e.g. result = [[2, 3], [1.5], ['foo', 'bar']]
                # see test_agg_listlike_result GH#29587
                res = self.obj._constructor_sliced(results)
                res.index = res_index
                return res
            else:
                raise

        if not isinstance(results[0], ABCSeries):
            if len(result.index) == len(self.res_columns):
                result.index = self.res_columns

        if len(result.columns) == len(res_index):
            result.columns = res_index

        return result
909
910
class FrameColumnApply(FrameApply):
    """
    DataFrame.apply with axis=1: ``func`` is applied to each row,
    iterated as a Series.
    """

    axis: AxisInt = 1

    def apply_broadcast(self, target: DataFrame) -> DataFrame:
        # broadcast over the transpose, then transpose back
        result = super().apply_broadcast(target.T)
        return result.T

    @property
    def series_generator(self):
        values = self.values
        values = ensure_wrapped_if_datetimelike(values)
        assert len(values) > 0

        # We create one Series object, and will swap out the data inside
        # of it. Kids: don't do this at home.
        ser = self.obj._ixs(0, axis=0)
        mgr = ser._mgr

        if is_extension_array_dtype(ser.dtype):
            # values will be incorrect for this block
            # TODO(EA2D): special case would be unnecessary with 2D EAs
            obj = self.obj
            for i in range(len(obj)):
                yield obj._ixs(i, axis=0)

        else:
            for arr, name in zip(values, self.index):
                # GH#35462 re-pin mgr in case setitem changed it
                ser._mgr = mgr
                mgr.set_values(arr)
                object.__setattr__(ser, "_name", name)
                yield ser

    @property
    def result_index(self) -> Index:
        return self.index

    @property
    def result_columns(self) -> Index:
        return self.columns

    def wrap_results_for_axis(
        self, results: ResType, res_index: Index
    ) -> DataFrame | Series:
        """return the results for the columns"""
        result: DataFrame | Series

        # we have requested to expand
        if self.result_type == "expand":
            result = self.infer_to_same_shape(results, res_index)

        # we have a non-series and don't want inference
        elif not isinstance(results[0], ABCSeries):
            result = self.obj._constructor_sliced(results)
            result.index = res_index

        # we may want to infer results
        else:
            result = self.infer_to_same_shape(results, res_index)

        return result

    def infer_to_same_shape(self, results: ResType, res_index: Index) -> DataFrame:
        """infer the results to the same shape as the input object"""
        result = self.obj._constructor(data=results)
        result = result.T

        # set the index
        result.index = res_index

        # infer dtypes
        result = result.infer_objects(copy=False)

        return result
985
986
class SeriesApply(NDFrameApply):
    """
    Apply/agg implementation for Series.
    """

    obj: Series
    axis: AxisInt = 0

    def __init__(
        self,
        obj: Series,
        func: AggFuncType,
        convert_dtype: bool,
        args,
        kwargs,
    ) -> None:
        # whether lib.map_infer should try to find a better dtype for the
        # mapped result (see apply_standard)
        self.convert_dtype = convert_dtype

        super().__init__(
            obj,
            func,
            raw=False,
            result_type=None,
            args=args,
            kwargs=kwargs,
        )

    def apply(self) -> DataFrame | Series:
        obj = self.obj

        if len(obj) == 0:
            return self.apply_empty_result()

        # dispatch to agg
        if is_list_like(self.f):
            return self.apply_multiple()

        if isinstance(self.f, str):
            # if we are a string, try to dispatch
            return self.apply_str()

        # self.f is Callable
        return self.apply_standard()

    def agg(self):
        result = super().agg()
        if result is None:
            f = self.f
            kwargs = self.kwargs

            # string, list-like, and dict-like are entirely handled in super
            assert callable(f)

            # we can be called from an inner function which
            # passes this meta-data
            kwargs.pop("_level", None)

            # try a regular apply, this evaluates lambdas
            # row-by-row; however if the lambda is expecting a Series
            # expression, e.g.: lambda x: x-x.quantile(0.25)
            # this will fail, so we can try a vectorized evaluation

            # we cannot FIRST try the vectorized evaluation, because
            # then .agg and .apply would have different semantics if the
            # operation is actually defined on the Series, e.g. str
            try:
                result = self.obj.apply(f)
            except (ValueError, AttributeError, TypeError):
                result = f(self.obj)

        return result

    def apply_empty_result(self) -> Series:
        # empty input -> empty output with the same dtype/index
        obj = self.obj
        return obj._constructor(dtype=obj.dtype, index=obj.index).__finalize__(
            obj, method="apply"
        )

    def apply_standard(self) -> DataFrame | Series:
        # caller is responsible for ensuring that f is Callable
        f = cast(Callable, self.f)
        obj = self.obj

        with np.errstate(all="ignore"):
            if isinstance(f, np.ufunc):
                return f(obj)

            # row-wise access
            if is_extension_array_dtype(obj.dtype) and hasattr(obj._values, "map"):
                # GH#23179 some EAs do not have `map`
                mapped = obj._values.map(f)
            else:
                values = obj.astype(object)._values
                mapped = lib.map_infer(
                    values,
                    f,
                    convert=self.convert_dtype,
                )

        if len(mapped) and isinstance(mapped[0], ABCSeries):
            # GH#43986 Need to do list(mapped) in order to get treated as nested
            # See also GH#25959 regarding EA support
            return obj._constructor_expanddim(list(mapped), index=obj.index)
        else:
            return obj._constructor(mapped, index=obj.index).__finalize__(
                obj, method="apply"
            )
1090
1091
class GroupByApply(Apply):
    """
    Apply wrapper for GroupBy objects.

    Only the aggregation machinery inherited from ``Apply`` is used;
    ``apply`` and ``transform`` are not implemented here.
    """

    def __init__(
        self,
        obj: GroupBy[NDFrameT],
        func: AggFuncType,
        args,
        kwargs,
    ) -> None:
        # copy kwargs so later mutation does not leak back to the caller
        kwargs = kwargs.copy()
        # resolve axis against the object the groupby wraps
        self.axis = obj.obj._get_axis_number(kwargs.get("axis", 0))
        super().__init__(
            obj,
            func,
            raw=False,
            result_type=None,
            args=args,
            kwargs=kwargs,
        )

    def apply(self):
        raise NotImplementedError

    def transform(self):
        raise NotImplementedError
1116
1117
class ResamplerWindowApply(Apply):
    """
    Apply wrapper for Resampler and window (rolling/expanding) objects.

    Only the aggregation machinery inherited from ``Apply`` is used;
    ``apply`` and ``transform`` are not implemented here.
    """

    axis: AxisInt = 0
    obj: Resampler | BaseWindow

    def __init__(
        self,
        obj: Resampler | BaseWindow,
        func: AggFuncType,
        args,
        kwargs,
    ) -> None:
        super().__init__(
            obj,
            func,
            raw=False,
            result_type=None,
            args=args,
            kwargs=kwargs,
        )

    def apply(self):
        raise NotImplementedError

    def transform(self):
        raise NotImplementedError
1143
1144
def reconstruct_func(
    func: AggFuncType | None, **kwargs
) -> tuple[bool, AggFuncType | None, list[str] | None, npt.NDArray[np.intp] | None]:
    """
    Reconstruct ``func`` depending on whether named-aggregation relabeling
    is in play, and normalize the keywords into the new column order.

    With named aggregation, ``func`` is None and ``kwargs`` maps output
    column names to ``(column, aggfunc)`` tuples; those are unpacked into
    a plain func dict plus the relabeled column names and ordering.
    Without it, ``func`` is passed through unchanged (after validation)
    and columns/order are None.

    Parameters
    ----------
    func: agg function (e.g. 'min' or Callable) or list of agg functions
    (e.g. ['min', np.max]) or dictionary (e.g. {'A': ['min', np.max]}).
    **kwargs: dict, kwargs used in is_multi_agg_with_relabel and
    normalize_keyword_aggregation function for relabelling

    Returns
    -------
    relabelling: bool, if there is relabelling or not
    func: normalized and mangled func
    columns: list of column names
    order: array of columns indices

    Examples
    --------
    >>> reconstruct_func(None, **{"foo": ("col", "min")})
    (True, defaultdict(<class 'list'>, {'col': ['min']}), ('foo',), array([0]))

    >>> reconstruct_func("min")
    (False, 'min', None, None)
    """
    relabeling = func is None and is_multi_agg_with_relabel(**kwargs)

    if relabeling:
        # named aggregation: unpack kwargs into func/columns/order
        new_func, columns, order = normalize_keyword_aggregation(kwargs)
        return relabeling, new_func, columns, order

    # not relabeling: validate the explicit func
    if isinstance(func, list) and len(func) > len(set(func)):
        # GH 28426 will raise error if duplicated function names are used and
        # there is no reassigned name
        raise SpecificationError(
            "Function names must be unique if there is no new column names "
            "assigned"
        )
    if func is None:
        # nicer error message
        raise TypeError("Must provide 'func' or tuples of '(column, aggfunc).")

    return relabeling, func, None, None
1204
1205
def is_multi_agg_with_relabel(**kwargs) -> bool:
    """
    Check whether kwargs passed to .agg look like multi-agg with relabeling.

    Every value must be a 2-tuple of ``(column, aggfunc)``, and at least one
    kwarg must be given.

    Parameters
    ----------
    **kwargs : dict

    Returns
    -------
    bool

    Examples
    --------
    >>> is_multi_agg_with_relabel(a="max")
    False
    >>> is_multi_agg_with_relabel(a_max=("a", "max"), a_min=("a", "min"))
    True
    >>> is_multi_agg_with_relabel()
    False
    """
    if not kwargs:
        return False
    return all(
        isinstance(value, tuple) and len(value) == 2 for value in kwargs.values()
    )
1230
1231
def normalize_keyword_aggregation(
    kwargs: dict,
) -> tuple[dict, list[str], npt.NDArray[np.intp]]:
    """
    Normalize user-provided "named aggregation" kwargs.

    Transforms from the new ``Mapping[str, NamedAgg]`` style kwargs
    to the old Dict[str, List[scalar]]].

    Parameters
    ----------
    kwargs : dict

    Returns
    -------
    aggspec : dict
        The transformed kwargs.
    columns : List[str]
        The user-provided keys.
    col_idx_order : List[int]
        List of columns indices.

    Examples
    --------
    >>> normalize_keyword_aggregation({"output": ("input", "sum")})
    (defaultdict(<class 'list'>, {'input': ['sum']}), ('output',), array([0]))
    """
    from pandas.core.indexes.base import Index

    # TODO: aggspec type: typing.Dict[str, List[AggScalar]]
    # May be hitting https://github.com/python/mypy/issues/5958
    # saying it doesn't have an attribute __name__
    aggspec: DefaultDict = defaultdict(list)
    columns, pairs = list(zip(*kwargs.items()))

    # Group the aggfuncs by the column they act on, preserving the
    # user-provided kwarg order.
    for target_col, aggfunc in pairs:
        aggspec[target_col].append(aggfunc)
    requested = [
        (target_col, com.get_callable_name(aggfunc) or aggfunc)
        for target_col, aggfunc in pairs
    ]

    # uniquify aggfunc name if duplicated in the requested list
    uniquified_order = _make_unique_kwarg_list(requested)

    # GH 25719: grouping by column in aggspec can shuffle the order in which
    # aggregated columns come back, so flatten aggspec into (column, funcname)
    # pairs in *its* order, uniquify identically, and match by position.
    flattened_spec = [
        (target_col, com.get_callable_name(aggfunc) or aggfunc)
        for target_col, aggfuncs in aggspec.items()
        for aggfunc in aggfuncs
    ]
    uniquified_aggspec = _make_unique_kwarg_list(flattened_spec)

    # get the new index of columns by comparison
    col_idx_order = Index(uniquified_aggspec).get_indexer(uniquified_order)
    return aggspec, columns, col_idx_order
1289
1290
1291def _make_unique_kwarg_list(
1292 seq: Sequence[tuple[Any, Any]]
1293) -> Sequence[tuple[Any, Any]]:
1294 """
1295 Uniquify aggfunc name of the pairs in the order list
1296
1297 Examples:
1298 --------
1299 >>> kwarg_list = [('a', '<lambda>'), ('a', '<lambda>'), ('b', '<lambda>')]
1300 >>> _make_unique_kwarg_list(kwarg_list)
1301 [('a', '<lambda>_0'), ('a', '<lambda>_1'), ('b', '<lambda>')]
1302 """
1303 return [
1304 (pair[0], f"{pair[1]}_{seq[:i].count(pair)}") if seq.count(pair) > 1 else pair
1305 for i, pair in enumerate(seq)
1306 ]
1307
1308
def relabel_result(
    result: DataFrame | Series,
    func: dict[str, list[Callable | str]],
    columns: Iterable[Hashable],
    order: Iterable[int],
) -> dict[Hashable, Series]:
    """
    Internal function to reorder result if relabelling is True for
    dataframe.agg, and return the reordered result in dict.

    Parameters
    ----------
    result : DataFrame or Series
        Result from aggregation.
    func : dict
        Dict of (column name, funcs), as produced by normalize_keyword_aggregation.
    columns : Iterable[Hashable]
        New column names for relabelling (user-provided kwarg keys).
    order : Iterable[int]
        New order for relabelling.

    Examples:
    ---------
    >>> result = DataFrame({"A": [np.nan, 2, np.nan],
    ...       "C": [6, np.nan, np.nan], "B": [np.nan, 4, 2.5]})  # doctest: +SKIP
    >>> funcs = {"A": ["max"], "C": ["max"], "B": ["mean", "min"]}
    >>> columns = ("foo", "aab", "bar", "dat")
    >>> order = [0, 1, 2, 3]
    >>> _relabel_result(result, func, columns, order)  # doctest: +SKIP
    dict(A=Series([2.0, NaN, NaN, NaN], index=["foo", "aab", "bar", "dat"]),
         C=Series([NaN, 6.0, NaN, NaN], index=["foo", "aab", "bar", "dat"]),
         B=Series([NaN, NaN, 2.5, 4.0], index=["foo", "aab", "bar", "dat"]))
    """
    from pandas.core.indexes.base import Index

    # User-provided output names, sorted into the requested output order.
    reordered_indexes = [
        pair[0] for pair in sorted(zip(columns, order), key=lambda t: t[1])
    ]
    reordered_result_in_dict: dict[Hashable, Series] = {}
    # Running offset into reordered_indexes; advanced by len(fun) per column.
    idx = 0

    # Reordering is only needed when more than one source column was
    # aggregated (see the single-column note below).
    reorder_mask = not isinstance(result, ABCSeries) and len(result.columns) > 1
    for col, fun in func.items():
        s = result[col].dropna()

        # In the `_aggregate`, the callable names are obtained and used in `result`, and
        # these names are ordered alphabetically. e.g.
        #           C2   C1
        # <lambda>   1  NaN
        # amax     NaN  4.0
        # max      NaN  4.0
        # sum     18.0  6.0
        # Therefore, the order of functions for each column could be shuffled
        # accordingly so need to get the callable name if it is not parsed names, and
        # reorder the aggregated result for each column.
        # e.g. if df.agg(c1=("C2", sum), c2=("C2", lambda x: min(x))), correct order is
        # [sum, <lambda>], but in `result`, it will be [<lambda>, sum], and we need to
        # reorder so that aggregated values map to their functions regarding the order.

        # However there is only one column being used for aggregation, not need to
        # reorder since the index is not sorted, and keep as is in `funcs`, e.g.
        #         A
        # min   1.0
        # mean  1.5
        # mean  1.5
        if reorder_mask:
            fun = [
                com.get_callable_name(f) if not isinstance(f, str) else f for f in fun
            ]
            col_idx_order = Index(s.index).get_indexer(fun)
            s = s[col_idx_order]

        # assign the new user-provided "named aggregation" as index names, and reindex
        # it based on the whole user-provided names.
        s.index = reordered_indexes[idx : idx + len(fun)]
        reordered_result_in_dict[col] = s.reindex(columns, copy=False)
        idx = idx + len(fun)
    return reordered_result_in_dict
1383
1384
1385# TODO: Can't use, because mypy doesn't like us setting __name__
1386# error: "partial[Any]" has no attribute "__name__"
1387# the type is:
1388# typing.Sequence[Callable[..., ScalarResult]]
1389# -> typing.Sequence[Callable[..., ScalarResult]]:
1390
1391
1392def _managle_lambda_list(aggfuncs: Sequence[Any]) -> Sequence[Any]:
1393 """
1394 Possibly mangle a list of aggfuncs.
1395
1396 Parameters
1397 ----------
1398 aggfuncs : Sequence
1399
1400 Returns
1401 -------
1402 mangled: list-like
1403 A new AggSpec sequence, where lambdas have been converted
1404 to have unique names.
1405
1406 Notes
1407 -----
1408 If just one aggfunc is passed, the name will not be mangled.
1409 """
1410 if len(aggfuncs) <= 1:
1411 # don't mangle for .agg([lambda x: .])
1412 return aggfuncs
1413 i = 0
1414 mangled_aggfuncs = []
1415 for aggfunc in aggfuncs:
1416 if com.get_callable_name(aggfunc) == "<lambda>":
1417 aggfunc = partial(aggfunc)
1418 aggfunc.__name__ = f"<lambda_{i}>"
1419 i += 1
1420 mangled_aggfuncs.append(aggfunc)
1421
1422 return mangled_aggfuncs
1423
1424
def maybe_mangle_lambdas(agg_spec: Any) -> Any:
    """
    Make new lambdas with unique names.

    Parameters
    ----------
    agg_spec : Any
        An argument to GroupBy.agg.
        Non-dict-like `agg_spec` are pass through as is.
        For dict-like `agg_spec` a new spec is returned
        with name-mangled lambdas.

    Returns
    -------
    mangled : Any
        Same type as the input.

    Examples
    --------
    >>> maybe_mangle_lambdas('sum')
    'sum'
    >>> maybe_mangle_lambdas([lambda: 1, lambda: 2])  # doctest: +SKIP
    [<function __main__.<lambda_0>,
     <function pandas...._make_lambda.<locals>.f(*args, **kwargs)>]
    """
    if is_dict_like(agg_spec):
        # Preserve the input mapping type (dict or OrderedDict).
        mangled_spec = type(agg_spec)()
        for key, aggfuncs in agg_spec.items():
            if is_list_like(aggfuncs) and not is_dict_like(aggfuncs):
                mangled_spec[key] = _managle_lambda_list(aggfuncs)
            else:
                mangled_spec[key] = aggfuncs
        return mangled_spec

    if is_list_like(agg_spec):
        return _managle_lambda_list(agg_spec)

    # Scalars (e.g. 'sum' or a single callable) pass through untouched.
    return agg_spec
1467
1468
def validate_func_kwargs(
    kwargs: dict,
) -> tuple[list[str], list[str | Callable[..., Any]]]:
    """
    Validates types of user-provided "named aggregation" kwargs.

    `TypeError` is raised if aggfunc is not `str` or callable, or if no
    kwargs were provided at all.

    Parameters
    ----------
    kwargs : dict

    Returns
    -------
    columns : List[str]
        List of user-provided keys.
    func : List[Union[str, callable[...,Any]]]
        List of user-provided aggfuncs

    Examples
    --------
    >>> validate_func_kwargs({'one': 'min', 'two': 'max'})
    (['one', 'two'], ['min', 'max'])
    """
    # Guard clause: fail fast on empty kwargs instead of discovering it
    # after the validation loop.
    if not kwargs:
        raise TypeError("Must provide 'func' or named aggregation **kwargs.")

    tuple_given_message = "func is expected but received {} in **kwargs."
    columns = list(kwargs)
    func: list[str | Callable[..., Any]] = []
    for col_func in kwargs.values():
        if not (isinstance(col_func, str) or callable(col_func)):
            raise TypeError(tuple_given_message.format(type(col_func).__name__))
        func.append(col_func)
    return columns, func