Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/atheris/import_hook.py: 66%

150 statements  

« prev     ^ index     » next       coverage.py v7.0.5, created at 2023-01-17 06:13 +0000

1# Copyright 2021 Google LLC 

2# Copyright 2021 Fraunhofer FKIE 

3# 

4# Licensed under the Apache License, Version 2.0 (the "License"); 

5# you may not use this file except in compliance with the License. 

6# You may obtain a copy of the License at 

7# 

8# http://www.apache.org/licenses/LICENSE-2.0 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, 

12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

13# See the License for the specific language governing permissions and 

14# limitations under the License. 

15"""atheris instruments modules at import-time. 

16 

17The instrument_imports() function temporarily installs an import hook 

18(AtherisMetaPathFinder) in sys.meta_path that employs a custom loader 

19(AtherisSourceFileLoader, AtherisSourcelessFileLoader). 

20""" 

21# _frozen_importlib is a special Py Interpreter library, disable import-error. 

22import _frozen_importlib # type: ignore[import] 

23import _frozen_importlib_external # type: ignore[import] 

24from importlib import abc 

25from importlib import machinery 

26import sys 

27import types 

28from typing import Set, Optional, Sequence, Type, Union, Any 

29from .instrument_bytecode import patch_code 

30 

# Flipped to True once the experimental custom-loader warning has been written,
# so that the warning is only emitted a single time.
_warned_experimental = False

# Qualified names of known loaders that are skipped without any warning.
SKIP_LOADERS = {
    # Google3 loader, implemented in native code, loads other native code.
    "StaticMetaImporter",
    # Google3 loader, implemented in native code, loads other native code as
    # well as Python code.
    "ElfZipImporter",
}

41 

42 

43# TODO(b/207008147) Mypy does not like abc.FileLoader? 

def _should_skip(loader: Any) -> bool:
  """Tells whether modules handled by `loader` must be left alone.

  Args:
    loader: A loader object or a loader class.

  Returns:
    True when the qualified name of `loader` itself, or of its class, is listed
    in SKIP_LOADERS; False otherwise.
  """

  def _is_listed(candidate: Any) -> bool:
    # Only objects that expose a __qualname__ can match an entry of the list.
    return (hasattr(candidate, "__qualname__") and
            candidate.__qualname__ in SKIP_LOADERS)

  # `loader` may be a class (checked directly) or an instance (checked through
  # its class); the class is only inspected when the first check fails.
  return _is_listed(loader) or _is_listed(loader.__class__)

55 

56 

class AtherisMetaPathFinder(abc.MetaPathFinder):
  """Meta path finder that replaces module loaders with Atheris loaders.

  For each import it is asked about, the finder queries the finders that come
  after it in sys.meta_path and, when one of them yields a usable spec, swaps
  the spec's loader for one that instruments the module's bytecode.
  """

  def __init__(self, include_packages: Set[str], exclude_modules: Set[str],
               enable_loader_override: bool, trace_dataflow: bool):
    """Stores the instrumentation settings.

    Args:
      include_packages: If not empty, an allowlist of packages to instrument.
      exclude_modules: A denylist of modules to never instrument. This has
        higher precedence than include_packages.
      enable_loader_override: Use experimental support to instrument bytecode
        loaded from custom loaders.
      trace_dataflow: Whether or not to trace dataflow.
    """
    super().__init__()
    self._include_packages = include_packages
    self._exclude_modules = exclude_modules
    self._trace_dataflow = trace_dataflow
    self._enable_loader_override = enable_loader_override

  def find_spec(
      self,
      fullname: str,
      path: Optional[Sequence[Union[bytes, str]]],
      target: Optional[types.ModuleType] = None
  ) -> Optional[machinery.ModuleSpec]:
    """Returns the module spec if any.

    Args:
      fullname: Fully qualified name of the package.
      path: Parent package's __path__
      target: When passed in, target is a module object that the finder may use
        to make a more educated guess about what spec to return.

    Returns:
      The spec found by the first later finder that provides one with a loader,
      with that loader replaced by an instrumenting one. None if the module is
      excluded, is not in a non-empty include list, belongs to the "atheris"
      package, is handled by a skipped or extension loader, or its loader
      cannot be instrumented.
    """
    if fullname in self._exclude_modules:
      return None

    package_name = fullname.split(".")[0]
    if package_name == "atheris":
      return None
    if self._include_packages and package_name not in self._include_packages:
      return None

    # Try each importer after the Atheris importer until we find an acceptable
    # one.
    found_atheris = False
    for meta in sys.meta_path:
      # Skip any finders before (or including) the Atheris finder.
      if not found_atheris:
        if meta is self:
          found_atheris = True
        continue

      # Check each remaining finder.
      if not hasattr(meta, "find_spec"):
        continue

      spec = meta.find_spec(fullname, path, target)
      if spec is None or spec.loader is None:
        continue

      if _should_skip(spec.loader):
        return None

      if isinstance(spec.loader, machinery.ExtensionFileLoader):
        # An extension, coverage doesn't come from Python.
        return None

      # Written before the loader type is inspected, so it is also printed when
      # a custom loader ends up being skipped below.
      sys.stderr.write(f"INFO: Instrumenting {fullname}\n")
      return self._instrument_spec(spec)

    return None

  def _instrument_spec(
      self, spec: machinery.ModuleSpec) -> Optional[machinery.ModuleSpec]:
    """Replaces spec.loader with an instrumenting loader.

    Args:
      spec: A module spec whose loader is not None.

    Returns:
      The same spec with its loader replaced, or None when the loader is a
      custom one and either loader overriding is disabled or wrapping it
      failed.
    """
    global _warned_experimental

    loader = spec.loader

    # Use normal inheritance for the common cases. This may not be needed
    # (the dynamic case should work for everything), but keep this for as
    # long as that's experimental.
    if isinstance(loader, _frozen_importlib_external.SourceFileLoader):
      spec.loader = AtherisSourceFileLoader(loader.name, loader.path,
                                            self._trace_dataflow)
      return spec

    if isinstance(loader, _frozen_importlib_external.SourcelessFileLoader):
      spec.loader = AtherisSourcelessFileLoader(loader.name, loader.path,
                                                self._trace_dataflow)
      return spec

    # The common case isn't what we have, so wrap the existing loader.
    if not self._enable_loader_override:
      sys.stderr.write("WARNING: Skipping import with custom loader.\n")
      return None

    if not _warned_experimental:
      sys.stderr.write(
          "WARNING: It looks like this module is imported by a "
          "custom loader. Atheris has experimental support for this. "
          "However, it may be incompatible with certain libraries. "
          "If you experience unusual errors or poor coverage "
          "collection, try atheris.instrument_all() instead, add "
          "enable_loader_override=False to instrument_imports(), or "
          "file an issue on GitHub.\n")
      _warned_experimental = True

    try:
      spec.loader = make_dynamic_atheris_loader(spec.loader,
                                                self._trace_dataflow)
    except Exception:  # pylint: disable=broad-except
      # Best effort: a loader that cannot be wrapped is reported and the
      # module is left to the remaining finders, uninstrumented.
      sys.stderr.write("WARNING: This module uses a custom loader that "
                       "prevents it from being instrumented: "
                       f"{spec.loader}\n")
      return None
    return spec

  def invalidate_caches(self) -> None:
    """Delegates cache invalidation to machinery.PathFinder."""
    return machinery.PathFinder.invalidate_caches()

181 

182 

class AtherisSourceFileLoader(_frozen_importlib_external.SourceFileLoader):
  """Source-file loader whose code objects are passed through patch_code."""

  def __init__(self, name: str, path: str, trace_dataflow: bool):
    """Initializes the parent loader and records the dataflow setting.

    Args:
      name: Module name, forwarded to the parent loader.
      path: File path, forwarded to the parent loader.
      trace_dataflow: Forwarded to patch_code when code is loaded.
    """
    super().__init__(name, path)
    self._trace_dataflow = trace_dataflow

  def get_code(self, fullname: str) -> Optional[types.CodeType]:
    """Returns the parent's code object after patching, or None if it has none."""
    original = super().get_code(fullname)
    return (None if original is None else patch_code(original,
                                                     self._trace_dataflow))

197 

198 

class AtherisSourcelessFileLoader(
    _frozen_importlib_external.SourcelessFileLoader):
  """Bytecode-file loader whose code objects are passed through patch_code."""

  def __init__(self, name: str, path: str, trace_dataflow: bool):
    """Initializes the parent loader and records the dataflow setting.

    Args:
      name: Module name, forwarded to the parent loader.
      path: File path, forwarded to the parent loader.
      trace_dataflow: Forwarded to patch_code when code is loaded.
    """
    super().__init__(name, path)
    self._trace_dataflow = trace_dataflow

  def get_code(self, fullname: str) -> Optional[types.CodeType]:
    """Returns the parent's code object after patching, or None if it has none."""
    original = super().get_code(fullname)
    return (None if original is None else patch_code(original,
                                                     self._trace_dataflow))

214 

215 

def make_dynamic_atheris_loader(loader: Any, trace_dataflow: bool) -> Any:
  """Create a loader via 'object inheritance' and return it.

  This technique allows us to override just the get_code function on an
  already-existing loader. This is experimental.

  Args:
    loader: Loader or Loader class.
    trace_dataflow: Whether or not to trace dataflow.

  Returns:
    If `loader` is a class, a subclass of it whose get_code classmethod patches
    the code returned by `loader.get_code`. Otherwise a new object whose class
    derives from the loader's class, carrying a copy of the loader's instance
    attributes and a get_code that patches the parent's result.
  """
  # isinstance (rather than `loader.__class__ is type`) also recognizes loader
  # classes that are built with a custom metaclass.
  if isinstance(loader, type):
    # This is a class with classmethods. Use regular inheritance to override
    # get_code.

    class DynAtherisLoaderClass(loader):  # type: ignore[valid-type, misc]
      """Subclass of a class-based loader with an instrumenting get_code."""

      # Kept on the class: get_code is a classmethod and no instance is ever
      # created, so there is no __init__ that could store the setting.
      _trace_dataflow = trace_dataflow

      @classmethod
      def get_code(cls, fullname: str) -> Optional[types.CodeType]:
        code = loader.get_code(fullname)

        if code is None:
          return None
        return patch_code(code, cls._trace_dataflow)

    return DynAtherisLoaderClass

  # This is an object. We create a new object that's a copy of the existing
  # object but with a custom get_code implementation.
  class DynAtherisLoaderObject(loader.__class__):  # type: ignore[name-defined]
    """Dynamic wrapper over a loader."""

    def __init__(self, trace_dataflow: bool):
      # The parent __init__ is not called; the original loader's state is
      # copied onto the new object after construction instead.
      self._trace_dataflow = trace_dataflow

    def get_code(self, fullname: str) -> Optional[types.CodeType]:
      code = super().get_code(fullname)

      if code is None:
        return None
      return patch_code(code, self._trace_dataflow)

  ret = DynAtherisLoaderObject(trace_dataflow)
  # Copy the loader's instance attributes without clobbering anything the
  # wrapper already set (i.e. _trace_dataflow).
  for key, value in loader.__dict__.items():
    ret.__dict__.setdefault(key, value)

  return ret

266 

267 

class HookManager:
  """Context manager that installs the Atheris import hook on sys.meta_path.

  Entering inserts an AtherisMetaPathFinder unless one is already present.
  Exiting removes only the finder that the matching __enter__ inserted, so a
  nested manager does not tear down a hook owned by an enclosing one.
  """

  def __init__(self, include_packages: Set[str], exclude_modules: Set[str],
               enable_loader_override: bool, trace_dataflow: bool):
    """Stores the settings handed to the finder created on entry.

    Args:
      include_packages: Passed to AtherisMetaPathFinder as include_packages.
      exclude_modules: Passed to AtherisMetaPathFinder as exclude_modules.
      enable_loader_override: Passed to AtherisMetaPathFinder.
      trace_dataflow: Passed to AtherisMetaPathFinder.
    """
    self._include_packages = include_packages
    self._exclude_modules = exclude_modules
    self._enable_loader_override = enable_loader_override
    self._trace_dataflow = trace_dataflow
    # One entry per active __enter__: the finder that call inserted, or None
    # when a finder was already installed and nothing was inserted.
    self._installed: list = []

  def __enter__(self) -> "HookManager":
    """Installs the finder unless sys.meta_path already contains one."""
    if any(isinstance(f, AtherisMetaPathFinder) for f in sys.meta_path):
      self._installed.append(None)
      return self

    # Insert right after any leading BuiltinImporter/FrozenImporter entries.
    leading_importers = (_frozen_importlib.BuiltinImporter,
                         _frozen_importlib.FrozenImporter)
    index = 0
    while (index < len(sys.meta_path) and
           sys.meta_path[index] in leading_importers):
      index += 1

    finder = AtherisMetaPathFinder(self._include_packages,
                                   self._exclude_modules,
                                   self._enable_loader_override,
                                   self._trace_dataflow)
    sys.meta_path.insert(index, finder)
    self._installed.append(finder)
    return self

  def __exit__(self, *args: Any) -> None:
    """Removes the finder inserted by the matching __enter__, if any."""
    finder = self._installed.pop() if self._installed else None
    if finder is None:
      return

    # Remove by identity; the finder may already be gone if something else
    # modified sys.meta_path in the meantime.
    for index, candidate in enumerate(sys.meta_path):
      if candidate is finder:
        del sys.meta_path[index]
        break

306 

307 

def instrument_imports(include: Optional[Sequence[str]] = None,
                       exclude: Optional[Sequence[str]] = None,
                       enable_loader_override: bool = True) -> "HookManager":
  """Returns a context manager that will instrument modules as imported.

  Args:
    include: module names that shall be instrumented. Submodules within these
      packages will be recursively instrumented too. A single str is treated
      as one module name.
    exclude: module names that shall not be instrumented. A single str is
      treated as one module name.
    enable_loader_override: Whether or not to enable the experimental feature of
      instrumenting custom loaders.

  Returns:
    A HookManager. While it is entered (e.g. in a `with` statement), the
    Atheris import hook is installed with dataflow tracing enabled.

  Raises:
    TypeError: If any module name is not a str.
    ValueError: If any module name is a relative path or empty.
  """

  def _as_list(names: Optional[Sequence[str]]) -> list:
    if names is None:
      return []
    # list("pkg") would split the name into characters, so a bare string is
    # wrapped instead.
    if isinstance(names, str):
      return [names]
    return list(names)

  include = _as_list(include)
  exclude = _as_list(exclude)

  for module_name in include + exclude:
    if not isinstance(module_name, str):
      raise TypeError("atheris.instrument_imports() expects names of "
                      "modules of type <str>")
    if not module_name:
      raise ValueError("atheris.instrument_imports(): "
                       "You supplied an empty module name")
    if module_name.startswith("."):
      raise ValueError("atheris.instrument_imports(): Please specify fully "
                       "qualified module names (absolute not relative)")

  # Only the top-level package of each included name is recorded.
  include_packages = {module_name.split(".")[0] for module_name in include}

  return HookManager(
      include_packages,
      set(exclude),
      enable_loader_override,
      trace_dataflow=True)