Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/joblib/func_inspect.py: 11%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

184 statements  

1""" 

2My own variation on function-specific inspect-like features. 

3""" 

4 

5# Author: Gael Varoquaux <gael dot varoquaux at normalesup dot org> 

6# Copyright (c) 2009 Gael Varoquaux 

7# License: BSD Style, 3 clauses. 

8 

9import collections 

10import inspect 

11import os 

12import re 

13import warnings 

14from itertools import islice 

15from tokenize import open as open_py_source 

16 

17from .logger import pformat 

18 

19full_argspec_fields = ( 

20 "args varargs varkw defaults kwonlyargs kwonlydefaults annotations" 

21) 

22full_argspec_type = collections.namedtuple("FullArgSpec", full_argspec_fields) 

23 

24 

25def get_func_code(func): 

26 """Attempts to retrieve a reliable function code hash. 

27 

28 The reason we don't use inspect.getsource is that it caches the 

29 source, whereas we want this to be modified on the fly when the 

30 function is modified. 

31 

32 Returns 

33 ------- 

34 func_code: string 

35 The function code 

36 source_file: string 

37 The path to the file in which the function is defined. 

38 first_line: int 

39 The first line of the code in the source file. 

40 

41 Notes 

42 ------ 

43 This function does a bit more magic than inspect, and is thus 

44 more robust. 

45 """ 

46 source_file = None 

47 try: 

48 code = func.__code__ 

49 source_file = code.co_filename 

50 if not os.path.exists(source_file): 

51 # Use inspect for lambda functions and functions defined in an 

52 # interactive shell, or in doctests 

53 source_code = "".join(inspect.getsourcelines(func)[0]) 

54 line_no = 1 

55 if source_file.startswith("<doctest "): 

56 source_file, line_no = re.match( 

57 r"\<doctest (.*\.rst)\[(.*)\]\>", source_file 

58 ).groups() 

59 line_no = int(line_no) 

60 source_file = "<doctest %s>" % source_file 

61 return source_code, source_file, line_no 

62 # Try to retrieve the source code. 

63 with open_py_source(source_file) as source_file_obj: 

64 first_line = code.co_firstlineno 

65 # All the lines after the function definition: 

66 source_lines = list(islice(source_file_obj, first_line - 1, None)) 

67 return "".join(inspect.getblock(source_lines)), source_file, first_line 

68 except: # noqa: E722 

69 # If the source code fails, we use the hash. This is fragile and 

70 # might change from one session to another. 

71 if hasattr(func, "__code__"): 

72 # Python 3.X 

73 return str(func.__code__.__hash__()), source_file, -1 

74 else: 

75 # Weird objects like numpy ufunc don't have __code__ 

76 # This is fragile, as quite often the id of the object is 

77 # in the repr, so it might not persist across sessions, 

78 # however it will work for ufuncs. 

79 return repr(func), source_file, -1 

80 

81 

82def _clean_win_chars(string): 

83 """Windows cannot encode some characters in filename.""" 

84 import urllib 

85 

86 if hasattr(urllib, "quote"): 

87 quote = urllib.quote 

88 else: 

89 # In Python 3, quote is elsewhere 

90 import urllib.parse 

91 

92 quote = urllib.parse.quote 

93 for char in ("<", ">", "!", ":", "\\"): 

94 string = string.replace(char, quote(char)) 

95 return string 

96 

97 

98def get_func_name(func, resolv_alias=True, win_characters=True): 

99 """Return the function import path (as a list of module names), and 

100 a name for the function. 

101 

102 Parameters 

103 ---------- 

104 func: callable 

105 The func to inspect 

106 resolv_alias: boolean, optional 

107 If true, possible local aliases are indicated. 

108 win_characters: boolean, optional 

109 If true, substitute special characters using urllib.quote 

110 This is useful in Windows, as it cannot encode some filenames 

111 """ 

112 if hasattr(func, "__module__"): 

113 module = func.__module__ 

114 else: 

115 try: 

116 module = inspect.getmodule(func) 

117 except TypeError: 

118 if hasattr(func, "__class__"): 

119 module = func.__class__.__module__ 

120 else: 

121 module = "unknown" 

122 if module is None: 

123 # Happens in doctests, eg 

124 module = "" 

125 if module == "__main__": 

126 try: 

127 filename = os.path.abspath(inspect.getsourcefile(func)) 

128 except: # noqa: E722 

129 filename = None 

130 if filename is not None: 

131 # mangling of full path to filename 

132 parts = filename.split(os.sep) 

133 if parts[-1].startswith("<ipython-input"): 

134 # We're in a IPython (or notebook) session. parts[-1] comes 

135 # from func.__code__.co_filename and is of the form 

136 # <ipython-input-N-XYZ>, where: 

137 # - N is the cell number where the function was defined 

138 # - XYZ is a hash representing the function's code (and name). 

139 # It will be consistent across sessions and kernel restarts, 

140 # and will change if the function's code/name changes 

141 # We remove N so that cache is properly hit if the cell where 

142 # the func is defined is re-exectuted. 

143 # The XYZ hash should avoid collisions between functions with 

144 # the same name, both within the same notebook but also across 

145 # notebooks 

146 split = parts[-1].split("-") 

147 parts[-1] = "-".join(split[:2] + split[3:]) 

148 elif len(parts) > 2 and parts[-2].startswith("ipykernel_"): 

149 # In a notebook session (ipykernel). Filename seems to be 'xyz' 

150 # of above. parts[-2] has the structure ipykernel_XXXXXX where 

151 # XXXXXX is a six-digit number identifying the current run (?). 

152 # If we split it off, the function again has the same 

153 # identifier across runs. 

154 parts[-2] = "ipykernel" 

155 filename = "-".join(parts) 

156 if filename.endswith(".py"): 

157 filename = filename[:-3] 

158 module = module + "-" + filename 

159 module = module.split(".") 

160 if hasattr(func, "func_name"): 

161 name = func.func_name 

162 elif hasattr(func, "__name__"): 

163 name = func.__name__ 

164 else: 

165 name = "unknown" 

166 # Hack to detect functions not defined at the module-level 

167 if resolv_alias: 

168 # TODO: Maybe add a warning here? 

169 if hasattr(func, "func_globals") and name in func.func_globals: 

170 if func.func_globals[name] is not func: 

171 name = "%s-alias" % name 

172 if hasattr(func, "__qualname__") and func.__qualname__ != name: 

173 # Extend the module name in case of nested functions to avoid 

174 # (module, name) collisions 

175 module.extend(func.__qualname__.split(".")[:-1]) 

176 if inspect.ismethod(func): 

177 # We need to add the name of the class 

178 if hasattr(func, "im_class"): 

179 klass = func.im_class 

180 module.append(klass.__name__) 

181 if os.name == "nt" and win_characters: 

182 # Windows can't encode certain characters in filenames 

183 name = _clean_win_chars(name) 

184 module = [_clean_win_chars(s) for s in module] 

185 return module, name 

186 

187 

188def _signature_str(function_name, arg_sig): 

189 """Helper function to output a function signature""" 

190 return "{}{}".format(function_name, arg_sig) 

191 

192 

193def _function_called_str(function_name, args, kwargs): 

194 """Helper function to output a function call""" 

195 template_str = "{0}({1}, {2})" 

196 

197 args_str = repr(args)[1:-1] 

198 kwargs_str = ", ".join("%s=%s" % (k, v) for k, v in kwargs.items()) 

199 return template_str.format(function_name, args_str, kwargs_str) 

200 

201 

202def filter_args(func, ignore_lst, args=(), kwargs=dict()): 

203 """Filters the given args and kwargs using a list of arguments to 

204 ignore, and a function specification. 

205 

206 Parameters 

207 ---------- 

208 func: callable 

209 Function giving the argument specification 

210 ignore_lst: list of strings 

211 List of arguments to ignore (either a name of an argument 

212 in the function spec, or '*', or '**') 

213 *args: list 

214 Positional arguments passed to the function. 

215 **kwargs: dict 

216 Keyword arguments passed to the function 

217 

218 Returns 

219 ------- 

220 filtered_args: list 

221 List of filtered positional and keyword arguments. 

222 """ 

223 args = list(args) 

224 if isinstance(ignore_lst, str): 

225 # Catch a common mistake 

226 raise ValueError( 

227 "ignore_lst must be a list of parameters to ignore " 

228 f"{ignore_lst} (type {type(ignore_lst)}) was given" 

229 ) 

230 # Special case for functools.partial objects 

231 if not inspect.ismethod(func) and not inspect.isfunction(func): 

232 if ignore_lst: 

233 warnings.warn( 

234 f"Cannot inspect object {func}, ignore list will not work.", 

235 stacklevel=2, 

236 ) 

237 return {"*": args, "**": kwargs} 

238 arg_sig = inspect.signature(func) 

239 arg_names = [] 

240 arg_kwonlyargs = [] 

241 arg_varargs = None 

242 arg_varkw = None 

243 arg_dict = dict() 

244 for param in arg_sig.parameters.values(): 

245 if param.kind is param.POSITIONAL_OR_KEYWORD: 

246 arg_names.append(param.name) 

247 elif param.kind is param.KEYWORD_ONLY: 

248 arg_kwonlyargs.append(param.name) 

249 elif param.kind is param.VAR_POSITIONAL: 

250 arg_varargs = param.name 

251 elif param.kind is param.VAR_KEYWORD: 

252 arg_varkw = param.name 

253 if param.default is not param.empty: 

254 arg_dict[param.name] = param.default 

255 if inspect.ismethod(func): 

256 # First argument is 'self', it has been removed by Python 

257 # we need to add it back: 

258 args = [ 

259 func.__self__, 

260 ] + args 

261 # func is an instance method, inspect.signature(func) does not 

262 # include self, we need to fetch it from the class method, i.e 

263 # func.__func__ 

264 class_method_sig = inspect.signature(func.__func__) 

265 self_name = next(iter(class_method_sig.parameters)) 

266 arg_names = [self_name] + arg_names 

267 # XXX: Maybe I need an inspect.isbuiltin to detect C-level methods, such 

268 # as on ndarrays. 

269 

270 _, name = get_func_name(func, resolv_alias=False) 

271 

272 # Check for positional argument of the function, that can be 

273 # passed either as positional or keyword arguments in the call 

274 for arg_position, arg_name in enumerate(arg_names): 

275 if arg_position < len(args): 

276 # given as positional 

277 arg_dict[arg_name] = args[arg_position] 

278 if arg_name in kwargs: 

279 raise ValueError( 

280 f"Argument {arg_name} was given both as positional and as keyword" 

281 f" for {_signature_str(name, arg_sig)}:\n" 

282 f" {_function_called_str(name, args, kwargs)} was called." 

283 ) 

284 elif arg_name in kwargs: 

285 # given as keyword 

286 arg_dict[arg_name] = kwargs[arg_name] 

287 elif arg_name not in arg_dict: 

288 # Missing argument 

289 raise ValueError( 

290 f"Wrong number of arguments for {_signature_str(name, arg_sig)}:\n" 

291 f" {_function_called_str(name, args, kwargs)} was called." 

292 ) 

293 

294 # If more positional arguments are given, they correspond 

295 # to *args, store them in vargs 

296 if len(args) > len(arg_names): 

297 if arg_varargs is None: 

298 raise ValueError( 

299 f"Too many arguments for {_signature_str(name, arg_sig)}:\n" 

300 f" {_function_called_str(name, args, kwargs)} was called." 

301 ) 

302 arg_dict["*"] = args[len(arg_names) :] 

303 elif arg_varargs is not None: 

304 arg_dict["*"] = [] 

305 

306 # Check keyword only arguments 

307 for arg_name in arg_kwonlyargs: 

308 if arg_name in kwargs: 

309 arg_dict[arg_name] = kwargs[arg_name] 

310 elif arg_name not in arg_dict: 

311 # required keyword only argument that is missing, 

312 # raise an error 

313 raise ValueError( 

314 f"Wrong number of arguments for {_signature_str(name, arg_sig)}:\n" 

315 f" {_function_called_str(name, args, kwargs)} was called." 

316 ) 

317 

318 # If more keyword arguments are given, store them in varkwargs 

319 varkwargs = {k: v for k, v in kwargs.items() if k not in arg_dict} 

320 if arg_varkw is not None: 

321 arg_dict["**"] = varkwargs 

322 

323 elif varkwargs: 

324 raise ValueError( 

325 f"Too many keyword arguments for {_signature_str(name, arg_sig)}:\n" 

326 f" {_function_called_str(name, args, kwargs)} was called." 

327 ) 

328 

329 # Now remove the arguments to be ignored 

330 for item in ignore_lst: 

331 if item in arg_dict: 

332 arg_dict.pop(item) 

333 else: 

334 raise ValueError( 

335 f"Ignore list: argument '{item}' is not defined for " 

336 f"function {_signature_str(name, arg_sig)}" 

337 ) 

338 # XXX: Return a sorted list of pairs? 

339 return arg_dict 

340 

341 

342def _format_arg(arg): 

343 formatted_arg = pformat(arg, indent=2) 

344 if len(formatted_arg) > 1500: 

345 formatted_arg = "%s..." % formatted_arg[:700] 

346 return formatted_arg 

347 

348 

349def format_signature(func, *args, **kwargs): 

350 # XXX: Should this use inspect.formatargvalues/formatargspec? 

351 module, name = get_func_name(func) 

352 module = [m for m in module if m] 

353 if module: 

354 module.append(name) 

355 module_path = ".".join(module) 

356 else: 

357 module_path = name 

358 arg_str = list() 

359 previous_length = 0 

360 for arg in args: 

361 formatted_arg = _format_arg(arg) 

362 if previous_length > 80: 

363 formatted_arg = "\n%s" % formatted_arg 

364 previous_length = len(formatted_arg) 

365 arg_str.append(formatted_arg) 

366 arg_str.extend(["%s=%s" % (v, _format_arg(i)) for v, i in kwargs.items()]) 

367 arg_str = ", ".join(arg_str) 

368 

369 signature = "%s(%s)" % (name, arg_str) 

370 return module_path, signature 

371 

372 

373def format_call(func, args, kwargs, object_name="Memory"): 

374 """Returns a nicely formatted statement displaying the function 

375 call with the given arguments. 

376 """ 

377 path, signature = format_signature(func, *args, **kwargs) 

378 msg = "%s\n[%s] Calling %s...\n%s" % (80 * "_", object_name, path, signature) 

379 return msg 

380 # XXX: Not using logging framework 

381 # self.debug(msg)