Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/importlib_resources/_common.py: 52%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

102 statements  

1import os 

2import pathlib 

3import tempfile 

4import functools 

5import contextlib 

6import types 

7import importlib 

8import inspect 

9import warnings 

10import itertools 

11 

12from typing import Union, Optional, cast 

13from .abc import ResourceReader, Traversable 

14 

# A package is given either as a module object or as a string; strings are
# resolved elsewhere in this module with importlib.import_module().
Package = Union[types.ModuleType, str]
# An anchor accepts exactly the same kinds of value as a package.
Anchor = Package

17 

18 

def package_to_anchor(func):
    """
    Decorate *func* so the legacy ``package`` keyword is accepted in place
    of ``anchor``, emitting a DeprecationWarning when it is used.

    Any other misuse is left for the wrapped function to reject.

    >>> files('a', 'b')
    Traceback (most recent call last):
    TypeError: files() takes from 0 to 1 positional arguments but 2 were given

    Remove this compatibility in Python 3.14.
    """
    # Private sentinel: None is a meaningful argument, so it cannot be
    # used to detect "not supplied".
    undefined = object()

    @functools.wraps(func)
    def wrapper(anchor=undefined, package=undefined):
        anchor_given = anchor is not undefined
        package_given = package is not undefined

        if package_given and anchor_given:
            # Forward both values so the wrapped function raises its own
            # TypeError about the number of arguments.
            return func(anchor, package)

        if package_given:
            warnings.warn(
                "First parameter to files is renamed to 'anchor'",
                DeprecationWarning,
                stacklevel=2,
            )
            return func(package)

        # Only call with an argument when one was actually supplied, so
        # the wrapped function's own default applies otherwise.
        return func(anchor) if anchor_given else func()

    return wrapper

49 

50 

@package_to_anchor
def files(anchor: Optional[Anchor] = None) -> Traversable:
    """
    Resolve *anchor* to a module and return the Traversable for it.
    """
    module = resolve(anchor)
    return from_package(module)

57 

58 

def get_resource_reader(package: types.ModuleType) -> Optional[ResourceReader]:
    """
    Return a resource reader obtained from the package's loader.

    The result is None when the loader exposes no ``get_resource_reader``
    attribute; otherwise it is whatever that callable returns for the
    spec's name.
    """
    # An issubclass() check is deliberately avoided: the abc
    # __subclasscheck__() hook apparently wants to create a weak reference
    # to the object, and zipimport.zipimporter does not support weak
    # references, which would result in a TypeError.
    spec = package.__spec__
    factory = getattr(spec.loader, 'get_resource_reader', None)  # type: ignore[union-attr]
    return None if factory is None else factory(spec.name)  # type: ignore[union-attr]

73 

74 

@functools.singledispatch
def resolve(cand: Optional[Anchor]) -> types.ModuleType:
    """
    Fallback dispatch: hand the candidate back unchanged.

    The cast only informs static type checkers; nothing is converted or
    validated at runtime.
    """
    module = cast(types.ModuleType, cand)
    return module

78 

79 

@resolve.register
def _(cand: str) -> types.ModuleType:
    """
    String dispatch: import the module named by *cand* and return it.
    """
    module = importlib.import_module(cand)
    return module

83 

84 

@resolve.register
def _(cand: None) -> types.ModuleType:
    """
    None dispatch: resolve the ``__name__`` found in the globals of the
    frame reported by ``_infer_caller()``.
    """
    caller_globals = _infer_caller().f_globals
    return resolve(caller_globals['__name__'])

88 

89 

def _infer_caller():
    """
    Walk the stack and find the frame of the first caller not in this module.

    Frames whose function is named 'wrapper' are skipped as well.

    Returns the frame object of the first remaining stack entry.
    Raises StopIteration if every frame on the stack is filtered out.
    """
    # Only filename, function name and the frame itself are consulted, so
    # request no source context: the default (context=1) makes inspect
    # look up source lines for every frame on the stack.
    stack = inspect.stack(context=0)
    # stack[0] is this function's own frame, i.e. this file.
    this_file = stack[0].filename

    def is_this_file(frame_info):
        return frame_info.filename == this_file

    def is_wrapper(frame_info):
        return frame_info.function == 'wrapper'

    not_this_file = itertools.filterfalse(is_this_file, stack)
    # also exclude 'wrapper' due to singledispatch in the call stack
    callers = itertools.filterfalse(is_wrapper, not_this_file)
    return next(callers).frame

106 

107 

def from_package(package: types.ModuleType):
    """
    Return a Traversable object for the given package.

    The package is passed through ``wrap_spec``; the resulting loader is
    asked for a resource reader for the spec's name, and that reader's
    ``files()`` result is returned.
    """
    # deferred for performance (python/cpython#109829)
    from .future.adapters import wrap_spec

    wrapped = wrap_spec(package)
    return wrapped.loader.get_resource_reader(wrapped.name).files()

119 

120 

@contextlib.contextmanager
def _tempfile(
    reader,
    suffix='',
    # gh-93353: Keep a reference to call os.remove() in late Python
    # finalization.
    *,
    _os_remove=os.remove,
):
    """
    Write the bytes produced by ``reader()`` to a new temporary file and
    yield the file's location as a ``pathlib.Path``.

    The file is created with ``tempfile.mkstemp(suffix=suffix)`` and is
    removed when the context exits. A file that is already gone at that
    point is not treated as an error.
    """
    # Not using tempfile.NamedTemporaryFile as it leads to deeper 'try'
    # blocks due to the need to close the temporary file to work on Windows
    # properly.
    fd, raw_path = tempfile.mkstemp(suffix=suffix)
    try:
        try:
            os.write(fd, reader())
        finally:
            # Close the descriptor whether or not the write succeeded.
            os.close(fd)
        # Drop the local reference to the reader before handing control to
        # the caller (presumably so it is not kept alive for the lifetime
        # of the context -- confirm).
        del reader
        yield pathlib.Path(raw_path)
    finally:
        try:
            _os_remove(raw_path)
        except FileNotFoundError:
            pass

146 

147 

def _temp_file(path):
    """
    Return a context manager that copies *path*'s bytes into a temporary
    file whose suffix is ``path.name``.
    """
    read_payload = path.read_bytes
    return _tempfile(read_payload, suffix=path.name)

150 

151 

def _is_present_dir(path: Traversable) -> bool:
    """
    Report whether *path* is a directory that exists.

    Some Traversables implement ``is_dir()`` to raise an exception
    (i.e. ``FileNotFoundError``) when the directory doesn't exist. That
    case is reported here as False instead of propagating.
    """
    try:
        return path.is_dir()
    except FileNotFoundError:
        return False

163 

164 

@functools.singledispatch
def as_file(path):
    """
    Given a Traversable object, return a context manager that provides
    that object as a path on the local file system.

    Directories that are present are replicated as a temporary tree;
    anything else is copied to a temporary file.
    """
    if _is_present_dir(path):
        return _temp_dir(path)
    return _temp_file(path)

172 

173 

@as_file.register(pathlib.Path)
@contextlib.contextmanager
def _(path):
    """
    Degenerate behavior for pathlib.Path objects.

    The path is yielded unchanged: nothing is copied and nothing is
    cleaned up when the context exits.
    """
    yield path

181 

182 

@contextlib.contextmanager
def _temp_path(dir: tempfile.TemporaryDirectory):
    """
    Wrap tempfile.TemporaryDirectory to yield a pathlib.Path.

    The directory's own context manager is entered for the duration of
    this one.
    """
    with dir as dir_name:
        location = pathlib.Path(dir_name)
        yield location

190 

191 

@contextlib.contextmanager
def _temp_dir(path):
    """
    Given a traversable dir, recursively replicate the whole tree
    into a temporary directory and yield the path of the replica.
    """
    assert path.is_dir()
    scratch = tempfile.TemporaryDirectory()
    with _temp_path(scratch) as root:
        replica = _write_contents(root, path)
        yield replica

201 

202 

def _write_contents(target, source):
    """
    Copy *source* into *target* under ``source.name`` and return the
    path of the new child.

    Directories are created and their entries copied recursively;
    anything else is copied as bytes.
    """
    destination = target.joinpath(source.name)
    if not source.is_dir():
        destination.write_bytes(source.read_bytes())
        return destination
    destination.mkdir()
    for entry in source.iterdir():
        _write_contents(destination, entry)
    return destination