Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/scipy/_lib/_testutils.py: 22%

121 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2023-12-12 06:31 +0000

1""" 

2Generic test utilities. 

3 

4""" 

5 

6import os 

7import re 

8import sys 

9import numpy as np 

10import inspect 

11 

12 

13__all__ = ['PytestTester', 'check_free_memory', '_TestPythranFunc'] 

14 

15 

class FPUModeChangeWarning(RuntimeWarning):
    """
    Warning category for reporting a change of the FPU mode.

    It is a plain `RuntimeWarning` subclass and adds no behaviour of its own,
    so it can be filtered or caught separately from other runtime warnings.
    """

19 

20 

class PytestTester:
    """
    Pytest test runner entry point.

    An instance is bound to the name of an already-imported package and,
    when called, assembles a pytest command line for that package and runs
    it through ``pytest.main``.
    """

    def __init__(self, module_name):
        # Name used to look the package up in ``sys.modules`` at call time.
        self.module_name = module_name

    def __call__(self, label="fast", verbose=1, extra_argv=None, doctests=False,
                 coverage=False, tests=None, parallel=None):
        """
        Run the tests and report whether pytest exited with code 0.

        Parameters
        ----------
        label : str
            ``"fast"`` selects ``-m "not slow"``, ``"full"`` adds no marker
            filter, any other value is passed verbatim to ``-m``.
        verbose : int
            Values above 1 add ``-v`` repeated ``verbose - 1`` times.
        extra_argv : iterable of str, optional
            Additional arguments appended to the pytest command line.
        doctests : bool
            Not supported; a true value raises `ValueError`.
        coverage : bool
            If true, ``--cov=<package path>`` is added.
        tests : iterable of str, optional
            Targets handed to ``--pyargs``; defaults to the bound module name.
        parallel : int, optional
            Values above 1 add ``-n <parallel>`` when pytest-xdist is found,
            otherwise a warning is issued and the run stays serial.

        Returns
        -------
        bool
            True if the pytest exit code equals 0.
        """
        import pytest

        # Resolve the package first: an unknown module name fails here,
        # before any argument validation happens.
        package = sys.modules[self.module_name]
        package_dir = os.path.abspath(package.__path__[0])

        argv = ['--showlocals', '--tb=short']

        if doctests:
            raise ValueError("Doctests not supported")

        if extra_argv:
            argv.extend(extra_argv)

        verbosity = int(verbose) if verbose else 0
        if verbosity > 1:
            argv.append("-" + "v" * (verbosity - 1))

        if coverage:
            argv.append("--cov=" + package_dir)

        if label != "full":
            # "fast" is shorthand for deselecting tests marked as slow.
            marker = "not slow" if label == "fast" else label
            argv.extend(["-m", marker])

        if tests is None:
            tests = [self.module_name]

        if parallel is not None and parallel > 1:
            if _pytest_has_xdist():
                argv.extend(['-n', str(parallel)])
            else:
                import warnings
                warnings.warn('Could not run tests in parallel because '
                              'pytest-xdist plugin is not available.')

        argv.append('--pyargs')
        argv.extend(tests)

        try:
            exit_code = pytest.main(argv)
        except SystemExit as exc:
            # Treat an exit request as the resulting status code.
            exit_code = exc.code

        return exit_code == 0

74 

75 

class _TestPythranFunc:
    '''
    These are situations that can be tested in our pythran tests:
    - A function with multiple array arguments and then
      other positional and keyword arguments.
    - A function with array-like keywords (e.g. `def somefunc(x0, x1=None)`).
    Note: list/tuple input is not yet tested!

    `self.arguments`: A dictionary whose key is the index of the argument and
                      whose value is tuple(array value, all supported dtypes)
    `self.partialfunc`: A function used to freeze the non-array arguments
                        that are of no interest in the original function

    The test methods call `self.pythranfunc`, which is not set here and has
    to be provided by whoever uses this class.
    '''
    ALL_INTEGER = [np.int8, np.int16, np.int32, np.int64, np.intc, np.intp]
    ALL_FLOAT = [np.float32, np.float64]
    ALL_COMPLEX = [np.complex64, np.complex128]

    def setup_method(self):
        # Reset the per-test state to empty placeholders.
        self.arguments = {}
        self.partialfunc = None
        self.expected = None

    def get_optional_args(self, func):
        # Map every parameter of `func` that has a default to that default;
        # used for testing keywords.
        parameters = inspect.signature(func).parameters
        return {
            name: param.default
            for name, param in parameters.items()
            if param.default is not inspect.Parameter.empty
        }

    def get_max_dtype_list_length(self):
        # Longest supported-dtypes list over all arguments (0 when empty).
        return max(
            (len(entry[1]) for entry in self.arguments.values()),
            default=0,
        )

    def get_dtype(self, dtype_list, dtype_idx):
        # Pick the dtype at `dtype_idx`, clamping indices past the end
        # to the last entry of `dtype_list`.
        last = len(dtype_list) - 1
        return dtype_list[min(dtype_idx, last)]

    def test_all_dtypes(self):
        # One call per dtype position; arguments with shorter dtype lists
        # keep using their last dtype.
        for type_idx in range(self.get_max_dtype_list_length()):
            converted = [
                entry[0].astype(self.get_dtype(entry[1], type_idx))
                for entry in self.arguments.values()
            ]
            self.pythranfunc(*converted)

    def test_views(self):
        # Reversing twice keeps the element order but passes a view
        # instead of the stored array.
        views = [entry[0][::-1][::-1] for entry in self.arguments.values()]
        self.pythranfunc(*views)

    def test_strided(self):
        # Duplicate every element along axis 0, then take every second one:
        # same values, but read through a step-2 slice.
        strided = [
            np.repeat(entry[0], 2, axis=0)[::2]
            for entry in self.arguments.values()
        ]
        self.pythranfunc(*strided)

146 

147 

def _pytest_has_xdist():
    """
    Report whether the pytest-xdist plugin (module ``xdist``), which
    provides parallel test runs, can be found.

    Returns True when an import spec for ``xdist`` exists, False otherwise.
    """
    # Look the module up without importing it, otherwise pytest emits warnings.
    import importlib.util

    xdist_spec = importlib.util.find_spec('xdist')
    return xdist_spec is not None

155 

156 

def check_free_memory(free_mb):
    """
    Skip the running test via ``pytest.skip`` unless *free_mb* MB of memory
    is available.

    The available amount is taken from the ``SCIPY_AVAILABLE_MEM``
    environment variable when it is set (parsed with `_parse_size`), and
    from `_get_mem_available` otherwise. If neither source yields a value
    the test is skipped as well.
    """
    import pytest

    if 'SCIPY_AVAILABLE_MEM' in os.environ:
        # An explicit setting takes precedence over probing the system.
        env_value = os.environ['SCIPY_AVAILABLE_MEM']
        available = _parse_size(env_value)
        skip_reason = '{0} MB memory required, but environment SCIPY_AVAILABLE_MEM={1}'.format(
            free_mb, env_value)
    else:
        available = _get_mem_available()
        if available is None:
            pytest.skip("Could not determine available memory; set SCIPY_AVAILABLE_MEM "
                        "variable to free memory in MB to run the test.")
        skip_reason = '{0} MB memory required, but {1} MB available'.format(
            free_mb, available/1e6)

    # `available` is in bytes while the requirement is given in MB.
    if available < free_mb * 1e6:
        pytest.skip(skip_reason)

177 

178 

def _parse_size(size_str):
    """
    Convert a size string such as ``"512"``, ``"2 Gb"`` or ``"16kib"``
    into a number of bytes.

    The string is an unsigned integer optionally followed by a unit suffix;
    surrounding and separating whitespace is ignored. The suffix is matched
    case-insensitively. A bare number is interpreted as megabytes.

    Parameters
    ----------
    size_str : str
        Text to parse.

    Returns
    -------
    float
        The size in bytes.

    Raises
    ------
    ValueError
        If `size_str` is not an integer followed by a known suffix.
    """
    # Keys are lower-case so that the lookup agrees with the
    # case-insensitive regular expression below.
    suffixes = {'': 1e6,
                'b': 1.0,
                'k': 1e3, 'm': 1e6, 'g': 1e9, 't': 1e12,
                'kb': 1e3, 'mb': 1e6, 'gb': 1e9, 'tb': 1e12,
                'kib': 1024.0, 'mib': 1024.0**2, 'gib': 1024.0**3,
                'tib': 1024.0**4}
    m = re.match(r'^\s*(\d+)\s*({0})\s*$'.format('|'.join(suffixes)),
                 size_str,
                 re.I)
    if not m:
        raise ValueError("Invalid size string")

    # Normalise the captured suffix: the pattern accepts any letter case.
    return float(m.group(1)) * suffixes[m.group(2).lower()]

192 

193 

def _get_mem_available():
    """
    Get information about memory available, not counting swap.

    ``psutil.virtual_memory().available`` is returned when psutil can be
    used. Otherwise, on Linux, ``/proc/meminfo`` is read: every line's
    second field is scaled by 1e3 and stored under the lower-cased first
    field, and either ``memavailable`` or ``memfree + cached`` is returned.
    On any other platform the result is None.
    """
    try:
        import psutil
        return psutil.virtual_memory().available
    except (ImportError, AttributeError):
        # psutil is missing or lacks the needed attribute; use the
        # platform-specific fallback below.
        pass

    if not sys.platform.startswith('linux'):
        return None

    with open('/proc/meminfo', 'r') as meminfo:
        rows = (line.split() for line in meminfo)
        info = {fields[0].strip(':').lower(): float(fields[1]) * 1e3
                for fields in rows}

    if 'memavailable' not in info:
        return info['memfree'] + info['cached']

    # Linux >= 3.14
    return info['memavailable']