Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/importlib_resources/_common.py: 52%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

101 statements  

1import os 

2import pathlib 

3import tempfile 

4import functools 

5import contextlib 

6import types 

7import importlib 

8import inspect 

9import warnings 

10import itertools 

11 

12from typing import Union, Optional, cast 

13from .abc import ResourceReader, Traversable 

14 

15Package = Union[types.ModuleType, str] 

16Anchor = Package 

17 

18 

19def package_to_anchor(func): 

20 """ 

21 Replace 'package' parameter as 'anchor' and warn about the change. 

22 

23 Other errors should fall through. 

24 

25 >>> files('a', 'b') 

26 Traceback (most recent call last): 

27 TypeError: files() takes from 0 to 1 positional arguments but 2 were given 

28 

29 Remove this compatibility in Python 3.14. 

30 """ 

31 undefined = object() 

32 

33 @functools.wraps(func) 

34 def wrapper(anchor=undefined, package=undefined): 

35 if package is not undefined: 

36 if anchor is not undefined: 

37 return func(anchor, package) 

38 warnings.warn( 

39 "First parameter to files is renamed to 'anchor'", 

40 DeprecationWarning, 

41 stacklevel=2, 

42 ) 

43 return func(package) 

44 elif anchor is undefined: 

45 return func() 

46 return func(anchor) 

47 

48 return wrapper 

49 

50 

51@package_to_anchor 

52def files(anchor: Optional[Anchor] = None) -> Traversable: 

53 """ 

54 Get a Traversable resource for an anchor. 

55 """ 

56 return from_package(resolve(anchor)) 

57 

58 

59def get_resource_reader(package: types.ModuleType) -> Optional[ResourceReader]: 

60 """ 

61 Return the package's loader if it's a ResourceReader. 

62 """ 

63 # We can't use 

64 # a issubclass() check here because apparently abc.'s __subclasscheck__() 

65 # hook wants to create a weak reference to the object, but 

66 # zipimport.zipimporter does not support weak references, resulting in a 

67 # TypeError. That seems terrible. 

68 spec = package.__spec__ 

69 reader = getattr(spec.loader, 'get_resource_reader', None) # type: ignore 

70 if reader is None: 

71 return None 

72 return reader(spec.name) # type: ignore 

73 

74 

75@functools.singledispatch 

76def resolve(cand: Optional[Anchor]) -> types.ModuleType: 

77 return cast(types.ModuleType, cand) 

78 

79 

80@resolve.register 

81def _(cand: str) -> types.ModuleType: 

82 return importlib.import_module(cand) 

83 

84 

85@resolve.register 

86def _(cand: None) -> types.ModuleType: 

87 return resolve(_infer_caller().f_globals['__name__']) 

88 

89 

90def _infer_caller(): 

91 """ 

92 Walk the stack and find the frame of the first caller not in this module. 

93 """ 

94 

95 def is_this_file(frame_info): 

96 return frame_info.filename == __file__ 

97 

98 def is_wrapper(frame_info): 

99 return frame_info.function == 'wrapper' 

100 

101 not_this_file = itertools.filterfalse(is_this_file, inspect.stack()) 

102 # also exclude 'wrapper' due to singledispatch in the call stack 

103 callers = itertools.filterfalse(is_wrapper, not_this_file) 

104 return next(callers).frame 

105 

106 

107def from_package(package: types.ModuleType): 

108 """ 

109 Return a Traversable object for the given package. 

110 

111 """ 

112 # deferred for performance (python/cpython#109829) 

113 from .future.adapters import wrap_spec 

114 

115 spec = wrap_spec(package) 

116 reader = spec.loader.get_resource_reader(spec.name) 

117 return reader.files() 

118 

119 

120@contextlib.contextmanager 

121def _tempfile( 

122 reader, 

123 suffix='', 

124 # gh-93353: Keep a reference to call os.remove() in late Python 

125 # finalization. 

126 *, 

127 _os_remove=os.remove, 

128): 

129 # Not using tempfile.NamedTemporaryFile as it leads to deeper 'try' 

130 # blocks due to the need to close the temporary file to work on Windows 

131 # properly. 

132 fd, raw_path = tempfile.mkstemp(suffix=suffix) 

133 try: 

134 try: 

135 os.write(fd, reader()) 

136 finally: 

137 os.close(fd) 

138 del reader 

139 yield pathlib.Path(raw_path) 

140 finally: 

141 try: 

142 _os_remove(raw_path) 

143 except FileNotFoundError: 

144 pass 

145 

146 

147def _temp_file(path): 

148 return _tempfile(path.read_bytes, suffix=path.name) 

149 

150 

151def _is_present_dir(path: Traversable) -> bool: 

152 """ 

153 Some Traversables implement ``is_dir()`` to raise an 

154 exception (i.e. ``FileNotFoundError``) when the 

155 directory doesn't exist. This function wraps that call 

156 to always return a boolean and only return True 

157 if there's a dir and it exists. 

158 """ 

159 with contextlib.suppress(FileNotFoundError): 

160 return path.is_dir() 

161 return False 

162 

163 

164@functools.singledispatch 

165def as_file(path): 

166 """ 

167 Given a Traversable object, return that object as a 

168 path on the local file system in a context manager. 

169 """ 

170 return _temp_dir(path) if _is_present_dir(path) else _temp_file(path) 

171 

172 

173@as_file.register(pathlib.Path) 

174@contextlib.contextmanager 

175def _(path): 

176 """ 

177 Degenerate behavior for pathlib.Path objects. 

178 """ 

179 yield path 

180 

181 

182@contextlib.contextmanager 

183def _temp_path(dir: tempfile.TemporaryDirectory): 

184 """ 

185 Wrap tempfile.TemporyDirectory to return a pathlib object. 

186 """ 

187 with dir as result: 

188 yield pathlib.Path(result) 

189 

190 

191@contextlib.contextmanager 

192def _temp_dir(path): 

193 """ 

194 Given a traversable dir, recursively replicate the whole tree 

195 to the file system in a context manager. 

196 """ 

197 assert path.is_dir() 

198 with _temp_path(tempfile.TemporaryDirectory()) as temp_dir: 

199 yield _write_contents(temp_dir, path) 

200 

201 

202def _write_contents(target, source): 

203 child = target.joinpath(source.name) 

204 if source.is_dir(): 

205 child.mkdir() 

206 for item in source.iterdir(): 

207 _write_contents(child, item) 

208 else: 

209 child.write_bytes(source.read_bytes()) 

210 return child