Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/pandas/core/window/common.py: 11%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

81 statements  

1"""Common utility functions for rolling operations""" 

2from __future__ import annotations 

3 

4from collections import defaultdict 

5from typing import cast 

6 

7import numpy as np 

8 

9from pandas.core.dtypes.generic import ( 

10 ABCDataFrame, 

11 ABCSeries, 

12) 

13 

14from pandas.core.indexes.api import MultiIndex 

15 

16 

def flex_binary_moment(arg1, arg2, f, pairwise: bool = False):
    """
    Apply the binary function ``f`` across Series/DataFrame inputs.

    Parameters
    ----------
    arg1, arg2 : Series or DataFrame
        Operands. If ``arg1`` is not a DataFrame but ``arg2`` is, the two
        are swapped and the call is re-dispatched, so ``f`` then receives
        the DataFrame column as its first argument.
    f : callable
        Called as ``f(x, y)``. Except for the ``arg1 is arg2`` non-pairwise
        case (where raw columns are passed), ``x`` and ``y`` have first been
        run through ``prep_binary``.
    pairwise : bool, default False
        Only consulted when both operands are DataFrames. Falsy: combine
        columns with matching labels. Truthy: combine every column of
        ``arg1`` with every column of ``arg2``.

    Returns
    -------
    object
        * Series with Series: whatever ``f`` returns.
        * DataFrame with DataFrame, not pairwise: a DataFrame with one
          column per result.
        * DataFrame with DataFrame, pairwise: a DataFrame with the columns
          of ``arg1`` and a MultiIndex built from the union of both row
          indexes and the columns of ``arg2``.
        * DataFrame with anything else: a DataFrame with one column per
          column of the DataFrame operand.

    Raises
    ------
    ValueError
        Non-pairwise DataFrame/DataFrame call on two distinct frames where
        either frame has duplicate column labels.
    TypeError
        The operands are neither two Series nor include a DataFrame.
    """
    if isinstance(arg1, ABCSeries) and isinstance(arg2, ABCSeries):
        X, Y = prep_binary(arg1, arg2)
        return f(X, Y)

    elif isinstance(arg1, ABCDataFrame):
        from pandas import DataFrame

        def dataframe_from_int_dict(data, frame_template) -> DataFrame:
            # ``data`` is keyed by column *position*; map the positions back
            # to the template's labels (this is what lets duplicate column
            # names survive).
            result = DataFrame(data, index=frame_template.index)
            if len(result.columns) > 0:
                result.columns = frame_template.columns[result.columns]
            else:
                result.columns = frame_template.columns.copy()
            return result

        results = {}
        if isinstance(arg2, ABCDataFrame):
            # Truthiness instead of ``is False`` / ``is True`` so that a
            # non-``bool`` flag (e.g. ``np.bool_``) cannot fall through both
            # branches and silently return None.
            if not pairwise:
                if arg1 is arg2:
                    # special case in order to handle duplicate column names
                    for i in range(len(arg1.columns)):
                        results[i] = f(arg1.iloc[:, i], arg2.iloc[:, i])
                    return dataframe_from_int_dict(results, arg1)
                else:
                    if not arg1.columns.is_unique:
                        raise ValueError("'arg1' columns are not unique")
                    if not arg2.columns.is_unique:
                        raise ValueError("'arg2' columns are not unique")
                    X, Y = arg1.align(arg2, join="outer")
                    X, Y = prep_binary(X, Y)
                    res_columns = arg1.columns.union(arg2.columns)
                    for col in res_columns:
                        if col in X and col in Y:
                            results[col] = f(X[col], Y[col])
                    return DataFrame(results, index=X.index, columns=res_columns)
            else:
                results = defaultdict(dict)
                for i in range(len(arg1.columns)):
                    for j in range(len(arg2.columns)):
                        if j < i and arg2 is arg1:
                            # Symmetric case
                            results[i][j] = results[j][i]
                        else:
                            results[i][j] = f(
                                *prep_binary(arg1.iloc[:, i], arg2.iloc[:, j])
                            )

                from pandas import concat

                result_index = arg1.index.union(arg2.index)
                if len(result_index):
                    # construct result frame: one column per arg1 column,
                    # the per-arg2-column results stacked vertically
                    result = concat(
                        [
                            concat(
                                [results[i][j] for j in range(len(arg2.columns))],
                                ignore_index=True,
                            )
                            for i in range(len(arg1.columns))
                        ],
                        ignore_index=True,
                        axis=1,
                    )
                    result.columns = arg1.columns

                    # set the index and reorder
                    if arg2.columns.nlevels > 1:
                        # mypy needs to know columns is a MultiIndex, Index
                        # doesn't have levels attribute.  Bind to a local
                        # rather than assigning back onto the caller's frame.
                        arg2_columns = cast(MultiIndex, arg2.columns)
                        # GH 21157: Equivalent to MultiIndex.from_product(
                        #  [result_index], <unique combinations of arg2.columns.levels>,
                        # )
                        # A normal MultiIndex.from_product will produce too many
                        # combinations.
                        result_level = np.tile(
                            result_index, len(result) // len(result_index)
                        )
                        arg2_levels = (
                            np.repeat(
                                arg2_columns.get_level_values(i),
                                len(result) // len(arg2_columns),
                            )
                            for i in range(arg2_columns.nlevels)
                        )
                        result_names = list(arg2_columns.names) + [result_index.name]
                        result.index = MultiIndex.from_arrays(
                            [*arg2_levels, result_level], names=result_names
                        )
                        # GH 34440
                        num_levels = len(result.index.levels)
                        new_order = [num_levels - 1] + list(range(num_levels - 1))
                        result = result.reorder_levels(new_order).sort_index()
                    else:
                        # Sort on positional codes first, then swap in the
                        # real labels once the rows are in their final order.
                        result.index = MultiIndex.from_product(
                            [range(len(arg2.columns)), range(len(result_index))]
                        )
                        result = result.swaplevel(1, 0).sort_index()
                        result.index = MultiIndex.from_product(
                            [result_index] + [arg2.columns]
                        )
                else:
                    # empty result
                    result = DataFrame(
                        index=MultiIndex(
                            levels=[arg1.index, arg2.columns], codes=[[], []]
                        ),
                        columns=arg2.columns,
                        dtype="float64",
                    )

                # reset our index names to arg1 names
                # reset our column names to arg2 names
                # careful not to mutate the original names
                result.columns = result.columns.set_names(arg1.columns.names)
                result.index = result.index.set_names(
                    result_index.names + arg2.columns.names
                )

                return result
        else:
            results = {
                i: f(*prep_binary(arg1.iloc[:, i], arg2))
                for i in range(len(arg1.columns))
            }
            return dataframe_from_int_dict(results, arg1)

    elif isinstance(arg2, ABCDataFrame):
        # Put the DataFrame first and re-dispatch.  ``pairwise`` is not
        # forwarded: after the swap ``arg2`` is never a DataFrame, so the
        # flag would not be consulted.
        return flex_binary_moment(arg2, arg1, f)

    else:
        # Swapping cannot help here; doing so unconditionally would bounce
        # between the two operands until RecursionError.
        raise TypeError(
            "flex_binary_moment requires two Series or at least one DataFrame; "
            f"got {type(arg1).__name__} and {type(arg2).__name__}"
        )

147 

148 

def zsqrt(x):
    """
    Element-wise square root in which negative inputs yield 0.

    Parameters
    ----------
    x : DataFrame or any object supporting ``np.sqrt``, ``x < 0`` and
        boolean-mask assignment on the result

    Returns
    -------
    The output of ``np.sqrt(x)`` with every position where ``x < 0``
    overwritten by 0.
    """
    # Floating-point warnings are suppressed while taking the root and
    # building the mask; the offending positions are replaced below.
    with np.errstate(all="ignore"):
        result = np.sqrt(x)
        mask = x < 0

    # A DataFrame mask is reduced through its underlying ``_values`` array;
    # every other input is reduced with ``.any()`` directly.
    if isinstance(x, ABCDataFrame):
        has_negative = mask._values.any()
    else:
        has_negative = mask.any()

    # Only write when there is something to overwrite.
    if has_negative:
        result[mask] = 0

    return result

162 

163 

def prep_binary(arg1, arg2):
    """
    Return both operands after adding ``0 * other`` to each.

    Parameters
    ----------
    arg1, arg2 : objects supporting ``+`` and multiplication by ``0``

    Returns
    -------
    tuple
        ``(arg1 + 0 * arg2, arg2 + 0 * arg1)``
    """
    # mask out values, this also makes a common index...
    X = arg1 + 0 * arg2
    Y = arg2 + 0 * arg1
    return X, Y

168 return X, Y