Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/atheris/import_hook.py: 66%
150 statements
« prev ^ index » next coverage.py v7.0.5, created at 2023-01-17 06:13 +0000
« prev ^ index » next coverage.py v7.0.5, created at 2023-01-17 06:13 +0000
1# Copyright 2021 Google LLC
2# Copyright 2021 Fraunhofer FKIE
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
"""atheris instruments modules at import-time.

The instrument_imports() function returns a context manager that temporarily
installs an import hook (AtherisMetaPathFinder) in sys.meta_path that employs
a custom loader (AtherisSourceFileLoader, AtherisSourcelessFileLoader).
"""
21# _frozen_importlib is a special Py Interpreter library, disable import-error.
22import _frozen_importlib # type: ignore[import]
23import _frozen_importlib_external # type: ignore[import]
24from importlib import abc
25from importlib import machinery
26import sys
27import types
28from typing import Set, Optional, Sequence, Type, Union, Any
29from .instrument_bytecode import patch_code
# Flipped to True the first time the experimental custom-loader warning is
# printed, so that the warning is only emitted once.
_warned_experimental = False

# Names of known loaders whose modules are skipped without any warning.
SKIP_LOADERS = {
    # Google3 loader, implemented in native code, loads other native code.
    "StaticMetaImporter",
    # Google3 loader, implemented in native code, loads other native code as
    # well as Python code.
    "ElfZipImporter",
}
43# TODO(b/207008147) Mypy does not like abc.FileLoader?
def _should_skip(loader: Any) -> bool:
  """Reports whether modules provided by this loader must be ignored.

  The loader is matched by name against SKIP_LOADERS twice: once through its
  own __qualname__ (present when the loader is itself a class) and once
  through the __qualname__ of its class (the case for loader instances).

  Args:
    loader: The loader taken from a module spec.

  Returns:
    True if either name is listed in SKIP_LOADERS, False otherwise.
  """
  for name_holder in (loader, loader.__class__):
    # A missing __qualname__ yields None, which is never in SKIP_LOADERS.
    if getattr(name_holder, "__qualname__", None) in SKIP_LOADERS:
      return True
  return False
class AtherisMetaPathFinder(abc.MetaPathFinder):
  """Finds and loads package metapaths with Atheris loaders.

  The finder does not locate modules itself. It asks the finders that come
  after it in sys.meta_path for a spec and, when the module qualifies for
  instrumentation, replaces the spec's loader with an Atheris loader.
  """

  def __init__(self, include_packages: Set[str], exclude_modules: Set[str],
               enable_loader_override: bool, trace_dataflow: bool):
    """Finds and loads package metapaths with Atheris loaders.

    Args:
      include_packages: If not empty, an allowlist of packages to instrument.
      exclude_modules: A denylist of modules to never instrument. This has
        higher precedent than include_packages.
      enable_loader_override: Use experimental support to instrument bytecode
        loaded from custom loaders.
      trace_dataflow: Whether or not to trace dataflow.
    """
    super().__init__()
    self._include_packages = include_packages
    self._exclude_modules = exclude_modules
    self._trace_dataflow = trace_dataflow
    self._enable_loader_override = enable_loader_override

  def find_spec(
      self,
      fullname: str,
      path: Optional[Sequence[Union[bytes, str]]],
      target: Optional[types.ModuleType] = None
  ) -> Optional[machinery.ModuleSpec]:
    """Returns the module spec if any.

    Args:
      fullname: Fully qualified name of the package.
      path: Parent package's __path__
      target: When passed in, target is a module object that the finder may use
        to make a more educated guess about what spec to return.

    Returns:
      The ModuleSpec if found, not excluded, and included if any are included.
      None in every other case, including when this finder is not present in
      sys.meta_path (no later finder is consulted then).
    """
    if fullname in self._exclude_modules:
      return None

    # Filtering is done on the top-level package name only.
    package_name = fullname.split(".")[0]
    if package_name == "atheris":
      return None
    if self._include_packages and package_name not in self._include_packages:
      return None

    # Try each importer after the Atheris importer until we find an acceptable
    # one.
    found_atheris = False
    for meta in sys.meta_path:
      # Skip any finders before (or including) the Atheris finder.
      if not found_atheris:
        if meta is self:
          found_atheris = True
        continue

      if not hasattr(meta, "find_spec"):
        continue

      spec = meta.find_spec(fullname, path, target)
      if spec is None or spec.loader is None:
        continue

      # The first finder that produces a usable spec decides the outcome;
      # later finders are never asked.
      return self._instrument_spec(fullname, spec)

    return None

  def _instrument_spec(
      self, fullname: str,
      spec: machinery.ModuleSpec) -> Optional[machinery.ModuleSpec]:
    """Swaps the loader of `spec` for an instrumenting Atheris loader.

    Args:
      fullname: Fully qualified module name; only used in log messages.
      spec: A spec with a non-None loader. Its loader is replaced in place.

    Returns:
      `spec` with an Atheris loader installed, or None if the module must not
      or cannot be instrumented.
    """
    global _warned_experimental

    if _should_skip(spec.loader):
      return None

    if isinstance(spec.loader, machinery.ExtensionFileLoader):
      # An extension, coverage doesn't come from Python
      return None

    sys.stderr.write(f"INFO: Instrumenting {fullname}\n")

    # Use normal inheritance for the common cases. This may not be needed
    # (the dynamic case should work for everything), but keep this for as
    # long as that's experimental.
    if isinstance(spec.loader, _frozen_importlib_external.SourceFileLoader):
      spec.loader = AtherisSourceFileLoader(spec.loader.name,
                                            spec.loader.path,
                                            self._trace_dataflow)
      return spec

    if isinstance(spec.loader,
                  _frozen_importlib_external.SourcelessFileLoader):
      spec.loader = AtherisSourcelessFileLoader(spec.loader.name,
                                                spec.loader.path,
                                                self._trace_dataflow)
      return spec

    # The common case isn't what we have, so wrap an existing object
    # via composition.
    if not self._enable_loader_override:
      sys.stderr.write("WARNING: Skipping import with custom loader.\n")
      return None

    if not _warned_experimental:
      sys.stderr.write(
          "WARNING: It looks like this module is imported by a "
          "custom loader. Atheris has experimental support for this. "
          "However, it may be incompatible with certain libraries. "
          "If you experience unusual errors or poor coverage "
          "collection, try atheris.instrument_all() instead, add "
          "enable_loader_override=False to instrument_imports(), or "
          "file an issue on GitHub.\n")
      _warned_experimental = True

    try:
      spec.loader = make_dynamic_atheris_loader(spec.loader,
                                                self._trace_dataflow)
    except Exception:  # pylint: disable=broad-except
      # Best effort: a loader that cannot be wrapped only costs coverage for
      # this module. spec.loader is still the original loader here.
      sys.stderr.write("WARNING: This module uses a custom loader that "
                       "prevents it from being instrumented: "
                       f"{spec.loader}\n")
      return None
    return spec

  def invalidate_caches(self) -> None:
    """Delegates cache invalidation to machinery.PathFinder."""
    machinery.PathFinder.invalidate_caches()
class AtherisSourceFileLoader(_frozen_importlib_external.SourceFileLoader):
  """Source file loader whose code objects are patched by Atheris."""

  def __init__(self, name: str, path: str, trace_dataflow: bool):
    """Initializes the loader.

    Args:
      name: Module name, forwarded to SourceFileLoader.
      path: File path, forwarded to SourceFileLoader.
      trace_dataflow: Passed on to patch_code for every loaded code object.
    """
    super().__init__(name, path)
    self._trace_dataflow = trace_dataflow

  def get_code(self, fullname: str) -> Optional[types.CodeType]:
    """Returns the module's code object after running it through patch_code.

    Args:
      fullname: Fully qualified name of the module to load.

    Returns:
      The patched code object, or None if the parent loader produced none.
    """
    unpatched = super().get_code(fullname)
    if unpatched is not None:
      return patch_code(unpatched, self._trace_dataflow)
    return None
class AtherisSourcelessFileLoader(
    _frozen_importlib_external.SourcelessFileLoader):
  """Sourceless (bytecode) file loader whose code is patched by Atheris."""

  def __init__(self, name: str, path: str, trace_dataflow: bool):
    """Initializes the loader.

    Args:
      name: Module name, forwarded to SourcelessFileLoader.
      path: File path, forwarded to SourcelessFileLoader.
      trace_dataflow: Passed on to patch_code for every loaded code object.
    """
    super().__init__(name, path)
    self._trace_dataflow = trace_dataflow

  def get_code(self, fullname: str) -> Optional[types.CodeType]:
    """Returns the module's code object after running it through patch_code.

    Args:
      fullname: Fully qualified name of the module to load.

    Returns:
      The patched code object, or None if the parent loader produced none.
    """
    unpatched = super().get_code(fullname)
    if unpatched is not None:
      return patch_code(unpatched, self._trace_dataflow)
    return None
def make_dynamic_atheris_loader(loader: Any, trace_dataflow: bool) -> Any:
  """Create a loader via 'object inheritance' and return it.

  This technique allows us to override just the get_code function on an
  already-existing object loader. This is experimental.

  Args:
    loader: Loader or Loader class.
    trace_dataflow: Whether or not to trace dataflow.

  Returns:
    If `loader` is a class: a subclass of it whose get_code classmethod
    patches the code returned by `loader.get_code`. Otherwise: a new instance
    of a dynamically created subclass of `loader`'s class, carrying a copy of
    `loader`'s instance attributes, whose get_code patches the code returned
    by the parent implementation.
  """
  # isinstance() also recognizes classes built by a custom metaclass, which a
  # `loader.__class__ is type` comparison would misclassify as instances.
  if isinstance(loader, type):
    # This is a class with classmethods. Use regular inheritance to override
    # get_code.

    class DynAtherisLoaderClass(loader):  # type: ignore[valid-type, misc]
      """Dynamic subclass of a class-based loader."""

      # get_code is a classmethod and no instance is ever created, so the
      # setting has to live on the class; without it, get_code fails with
      # AttributeError on cls._trace_dataflow.
      _trace_dataflow = trace_dataflow

      @classmethod
      def get_code(cls, fullname: str) -> Optional[types.CodeType]:
        code = loader.get_code(fullname)

        if code is None:
          return None
        return patch_code(code, cls._trace_dataflow)

    return DynAtherisLoaderClass

  # This is an object. We create a new object that's a copy of the existing
  # object but with a custom get_code implementation.
  class DynAtherisLoaderObject(loader.__class__):  # type: ignore[name-defined]
    """Dynamic wrapper over a loader."""

    def __init__(self, trace_dataflow: bool):
      # The parent __init__ is not called; the wrapped loader's instance
      # attributes are copied onto the new object below instead.
      self._trace_dataflow = trace_dataflow

    def get_code(self, fullname: str) -> Optional[types.CodeType]:
      code = super().get_code(fullname)

      if code is None:
        return None
      return patch_code(code, self._trace_dataflow)

  ret = DynAtherisLoaderObject(trace_dataflow)
  for key, value in vars(loader).items():
    # Attributes already set on the wrapper (_trace_dataflow) take precedence.
    ret.__dict__.setdefault(key, value)

  return ret
class HookManager:
  """Context manager that installs and removes the Atheris import hook."""

  def __init__(self, include_packages: Set[str], exclude_modules: Set[str],
               enable_loader_override: bool, trace_dataflow: bool):
    """Stores the settings handed to AtherisMetaPathFinder on entry.

    Args:
      include_packages: Forwarded to AtherisMetaPathFinder.
      exclude_modules: Forwarded to AtherisMetaPathFinder.
      enable_loader_override: Forwarded to AtherisMetaPathFinder.
      trace_dataflow: Forwarded to AtherisMetaPathFinder.
    """
    self._trace_dataflow = trace_dataflow
    self._enable_loader_override = enable_loader_override
    self._exclude_modules = exclude_modules
    self._include_packages = include_packages

  def __enter__(self) -> "HookManager":
    """Inserts an AtherisMetaPathFinder into sys.meta_path.

    Nothing is inserted when sys.meta_path already holds such a finder. The
    new finder is placed right after the leading run of BuiltinImporter and
    FrozenImporter entries.

    Returns:
      This manager.
    """
    already_hooked = any(
        isinstance(finder, AtherisMetaPathFinder) for finder in sys.meta_path)
    if already_hooked:
      return self

    interpreter_finders = (_frozen_importlib.BuiltinImporter,
                           _frozen_importlib.FrozenImporter)
    # Default to appending; overwritten by the index of the first entry that
    # is not one of the interpreter's own importers.
    position = len(sys.meta_path)
    for index, finder in enumerate(sys.meta_path):
      if finder not in interpreter_finders:
        position = index
        break

    hook = AtherisMetaPathFinder(self._include_packages, self._exclude_modules,
                                 self._enable_loader_override,
                                 self._trace_dataflow)
    sys.meta_path.insert(position, hook)
    return self

  def __exit__(self, *args: Any) -> None:
    """Removes every AtherisMetaPathFinder from sys.meta_path."""
    # Slice assignment keeps mutating the same list object in place.
    sys.meta_path[:] = [
        finder for finder in sys.meta_path
        if not isinstance(finder, AtherisMetaPathFinder)
    ]
def instrument_imports(include: Optional[Sequence[str]] = None,
                       exclude: Optional[Sequence[str]] = None,
                       enable_loader_override: bool = True) -> "HookManager":
  """Returns a context manager that will instrument modules as imported.

  Args:
    include: module names that shall be instrumented. Submodules within these
      packages will be recursively instrumented too. Only the top-level
      package of each name is kept, so "a.b" selects all of package "a". A
      single bare string is accepted and treated as one module name.
    exclude: module names that shall not be instrumented. A single bare string
      is accepted and treated as one module name.
    enable_loader_override: Whether or not to enable the experimental feature of
      instrumenting custom loaders.

  Returns:
    A HookManager, to be used in a `with` statement, configured with the
    given names and with dataflow tracing enabled.

  Raises:
    TypeError: If any module name is not a str.
    ValueError: If any module name is a relative path or empty.
  """
  # A bare str is itself a sequence of str: list("pkg") would silently turn it
  # into ["p", "k", "g"]. Treat it as the single module name it was meant as.
  if isinstance(include, str):
    include = [include]
  if isinstance(exclude, str):
    exclude = [exclude]

  include = [] if include is None else list(include)
  exclude = [] if exclude is None else list(exclude)

  for module_name in include + exclude:
    if not isinstance(module_name, str):
      raise TypeError("atheris.instrument_imports() expects names of "
                      "modules of type <str>")
    if not module_name:
      raise ValueError("atheris.instrument_imports(): "
                       "You supplied an empty module name")
    if module_name.startswith("."):
      raise ValueError("atheris.instrument_imports(): Please specify fully "
                       "qualified module names (absolute not relative)")

  # Inclusion is decided per top-level package, so drop any submodule part.
  include_packages = {module_name.split(".")[0] for module_name in include}

  return HookManager(
      include_packages,
      set(exclude),
      enable_loader_override,
      trace_dataflow=True)