1"""Common utility functions for rolling operations"""
2from __future__ import annotations
3
4from collections import defaultdict
5from typing import cast
6
7import numpy as np
8
9from pandas.core.dtypes.generic import (
10 ABCDataFrame,
11 ABCSeries,
12)
13
14from pandas.core.indexes.api import MultiIndex
15
16
def flex_binary_moment(arg1, arg2, f, pairwise: bool = False):
    """
    Apply the binary moment function ``f`` to Series/DataFrame operands.

    The dispatch depends on the operand types:

    * Series and Series: both operands are masked onto a common index with
      ``prep_binary`` and ``f`` is called once.
    * DataFrame and DataFrame, ``pairwise`` false: ``f`` is applied to
      columns that share a label. When both operands are the same object the
      columns are matched by position so duplicate labels are supported.
    * DataFrame and DataFrame, ``pairwise`` true: ``f`` is applied to every
      (``arg1`` column, ``arg2`` column) pair and the pieces are stacked into
      a frame whose rows are a MultiIndex of (result index, ``arg2`` columns)
      and whose columns are ``arg1``'s columns.
    * DataFrame and anything else: ``f`` is applied to each column of
      ``arg1`` against ``arg2``.
    * Non-DataFrame and DataFrame: the operands are swapped and handled as
      the previous case (``pairwise`` is not forwarded; it is unused there).

    Parameters
    ----------
    arg1, arg2 : Series or DataFrame
        The operands.
    f : callable
        Function taking two aligned objects and returning the moment.
    pairwise : bool, default False
        Only consulted when both operands are DataFrames. Evaluated by
        truthiness, so ``np.bool_`` flags behave like builtin ``bool``.

    Returns
    -------
    Whatever ``f`` returns for the Series/Series case, otherwise a DataFrame.

    Raises
    ------
    ValueError
        If, in the non-pairwise case with two distinct DataFrames, either
        frame has non-unique columns.
    TypeError
        If neither operand is a DataFrame and they are not both Series.
        (Previously this recursed until ``RecursionError``.)
    """
    if isinstance(arg1, ABCSeries) and isinstance(arg2, ABCSeries):
        X, Y = prep_binary(arg1, arg2)
        return f(X, Y)

    if not isinstance(arg1, ABCDataFrame):
        if isinstance(arg2, ABCDataFrame):
            # Let the DataFrame drive the column-wise logic below.
            return flex_binary_moment(arg2, arg1, f)
        # Swapping again would just bounce between the two operands forever.
        raise TypeError(
            "flex_binary_moment requires two Series or at least one "
            f"DataFrame; got {type(arg1).__name__} and {type(arg2).__name__}"
        )

    from pandas import DataFrame

    def dataframe_from_int_dict(data, frame_template) -> DataFrame:
        # ``data`` is keyed by column position; translate the positions back
        # into the template's column labels.
        result = DataFrame(data, index=frame_template.index)
        if len(result.columns) > 0:
            result.columns = frame_template.columns[result.columns]
        else:
            result.columns = frame_template.columns.copy()
        return result

    num_arg1_cols = len(arg1.columns)

    if not isinstance(arg2, ABCDataFrame):
        results = {
            i: f(*prep_binary(arg1.iloc[:, i], arg2)) for i in range(num_arg1_cols)
        }
        return dataframe_from_int_dict(results, arg1)

    if not pairwise:
        if arg1 is arg2:
            # special case in order to handle duplicate column names
            results = {
                i: f(arg1.iloc[:, i], arg2.iloc[:, i]) for i in range(num_arg1_cols)
            }
            return dataframe_from_int_dict(results, arg1)

        if not arg1.columns.is_unique:
            raise ValueError("'arg1' columns are not unique")
        if not arg2.columns.is_unique:
            raise ValueError("'arg2' columns are not unique")
        X, Y = arg1.align(arg2, join="outer")
        X, Y = prep_binary(X, Y)
        res_columns = arg1.columns.union(arg2.columns)
        results = {}
        for col in res_columns:
            if col in X and col in Y:
                results[col] = f(X[col], Y[col])
        return DataFrame(results, index=X.index, columns=res_columns)

    # ---- pairwise: every arg1 column against every arg2 column ----
    from pandas import concat

    num_arg2_cols = len(arg2.columns)
    same_frame = arg2 is arg1

    pair_results = defaultdict(dict)
    for i in range(num_arg1_cols):
        for j in range(num_arg2_cols):
            if j < i and same_frame:
                # Symmetric case: reuse the already computed (j, i) result
                pair_results[i][j] = pair_results[j][i]
            else:
                pair_results[i][j] = f(
                    *prep_binary(arg1.iloc[:, i], arg2.iloc[:, j])
                )

    result_index = arg1.index.union(arg2.index)
    if len(result_index):
        # construct result frame: stack the arg2 pieces vertically for each
        # arg1 column, then place those stacks side by side
        result = concat(
            [
                concat(
                    [pair_results[i][j] for j in range(num_arg2_cols)],
                    ignore_index=True,
                )
                for i in range(num_arg1_cols)
            ],
            ignore_index=True,
            axis=1,
        )
        result.columns = arg1.columns

        # set the index and reorder
        if arg2.columns.nlevels > 1:
            # Narrow the type for static checkers via a local name instead of
            # assigning back onto the caller's frame.
            arg2_columns = cast(MultiIndex, arg2.columns)
            # GH 21157: Equivalent to MultiIndex.from_product(
            #  [result_index], <unique combinations of arg2.columns.levels>,
            # )
            # A normal MultiIndex.from_product will produce too many
            # combinations.
            result_level = np.tile(result_index, len(result) // len(result_index))
            arg2_levels = [
                np.repeat(
                    arg2_columns.get_level_values(i),
                    len(result) // len(arg2_columns),
                )
                for i in range(arg2_columns.nlevels)
            ]
            result_names = list(arg2_columns.names) + [result_index.name]
            result.index = MultiIndex.from_arrays(
                [*arg2_levels, result_level], names=result_names
            )
            # GH 34440: move the result-index level to the front
            num_levels = len(result.index.levels)
            new_order = [num_levels - 1] + list(range(num_levels - 1))
            result = result.reorder_levels(new_order).sort_index()
        else:
            # Label rows positionally, sort into (index, column) order, then
            # swap in the real labels.
            result.index = MultiIndex.from_product(
                [range(num_arg2_cols), range(len(result_index))]
            )
            result = result.swaplevel(1, 0).sort_index()
            result.index = MultiIndex.from_product([result_index] + [arg2.columns])
    else:
        # empty result
        # NOTE(review): the non-empty branch labels the columns with
        # arg1.columns; confirm that arg2.columns is intended here.
        result = DataFrame(
            index=MultiIndex(levels=[arg1.index, arg2.columns], codes=[[], []]),
            columns=arg2.columns,
            dtype="float64",
        )

    # reset our index names to arg1 names
    # reset our column names to arg2 names
    # careful not to mutate the original names
    result.columns = result.columns.set_names(arg1.columns.names)
    result.index = result.index.set_names(result_index.names + arg2.columns.names)

    return result
147
148
def zsqrt(x):
    """
    Element-wise square root of ``x`` with entries where ``x < 0`` set to 0.

    NumPy floating-point warnings are suppressed while the root and the
    negative-value mask are computed.
    """
    with np.errstate(all="ignore"):
        roots = np.sqrt(x)
        negative = x < 0

    # A DataFrame mask is reduced through its underlying values; any other
    # input is reduced with its own ``any``.
    if isinstance(x, ABCDataFrame):
        has_negative = negative._values.any()
    else:
        has_negative = negative.any()

    if has_negative:
        roots[negative] = 0

    return roots
162
163
def prep_binary(arg1, arg2):
    """
    Return ``(arg1 + 0 * arg2, arg2 + 0 * arg1)``.

    Adding a zero-scaled copy of the other operand masks out values and, as
    a side effect of the arithmetic, puts both results on a common index.
    """
    return arg1 + 0 * arg2, arg2 + 0 * arg1