Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/importlib_resources/_common.py: 52%

101 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2023-12-08 06:51 +0000

1import os 

2import pathlib 

3import tempfile 

4import functools 

5import contextlib 

6import types 

7import importlib 

8import inspect 

9import warnings 

10import itertools 

11 

12from typing import Union, Optional, cast 

13from .abc import ResourceReader, Traversable 

14 

15from ._compat import wrap_spec 

16 

# A package may be supplied either as an imported module object or as a
# module name string.
Package = Union[types.ModuleType, str]

# 'anchor' is the name that replaces the older 'package' parameter; both
# aliases describe the same set of accepted values.
Anchor = Package

19 

20 

def package_to_anchor(func):
    """
    Decorate ``func`` so the legacy ``package`` keyword is accepted in
    place of ``anchor``, emitting a DeprecationWarning when it is used.

    Any other misuse is forwarded so ``func`` raises its own error.

    >>> files('a', 'b')
    Traceback (most recent call last):
    TypeError: files() takes from 0 to 1 positional arguments but 2 were given
    """
    missing = object()

    @functools.wraps(func)
    def wrapper(anchor=missing, package=missing):
        if package is missing:
            # Only the current spelling (or nothing at all) was supplied.
            if anchor is missing:
                return func()
            return func(anchor)

        if anchor is not missing:
            # Both were supplied: pass them on so func reports the
            # TypeError itself.
            return func(anchor, package)

        warnings.warn(
            "First parameter to files is renamed to 'anchor'",
            DeprecationWarning,
            stacklevel=2,
        )
        return func(package)

    return wrapper

49 

50 

@package_to_anchor
def files(anchor: Optional[Anchor] = None) -> Traversable:
    """
    Return the Traversable rooted at ``anchor``.

    The anchor is first turned into a module by ``resolve`` and that
    module is then handed to ``from_package``.
    """
    module = resolve(anchor)
    return from_package(module)

57 

58 

def get_resource_reader(package: types.ModuleType) -> Optional[ResourceReader]:
    """
    Return the resource reader produced by the package's loader, or None
    when the loader has no ``get_resource_reader`` attribute.
    """
    # The attribute is probed directly instead of using an issubclass()
    # check: abc's __subclasscheck__() hook apparently wants to create a
    # weak reference to the object, and zipimport.zipimporter does not
    # support weak references, which results in a TypeError.
    spec = package.__spec__
    factory = getattr(spec.loader, 'get_resource_reader', None)  # type: ignore
    return None if factory is None else factory(spec.name)  # type: ignore

73 

74 

@functools.singledispatch
def resolve(cand: Optional[Anchor]) -> types.ModuleType:
    """
    Fallback overload: the candidate is returned unchanged, typed as a
    module. Overloads registered for other types handle conversion.
    """
    module = cast(types.ModuleType, cand)
    return module

78 

79 

@resolve.register
def _(cand: str) -> types.ModuleType:
    """
    String overload: import the module named by ``cand`` and return it.
    """
    module = importlib.import_module(cand)
    return module

83 

84 

@resolve.register
def _(cand: None) -> types.ModuleType:
    """
    None overload: resolve the ``__name__`` found in the globals of the
    frame returned by ``_infer_caller``.
    """
    caller_frame = _infer_caller()
    caller_name = caller_frame.f_globals['__name__']
    return resolve(caller_name)

88 

89 

90def _infer_caller(): 

91 """ 

92 Walk the stack and find the frame of the first caller not in this module. 

93 """ 

94 

95 def is_this_file(frame_info): 

96 return frame_info.filename == __file__ 

97 

98 def is_wrapper(frame_info): 

99 return frame_info.function == 'wrapper' 

100 

101 not_this_file = itertools.filterfalse(is_this_file, inspect.stack()) 

102 # also exclude 'wrapper' due to singledispatch in the call stack 

103 callers = itertools.filterfalse(is_wrapper, not_this_file) 

104 return next(callers).frame 

105 

106 

def from_package(package: types.ModuleType):
    """
    Return the object produced by ``files()`` on the resource reader that
    the package's wrapped spec loader provides.
    """
    spec = wrap_spec(package)
    return spec.loader.get_resource_reader(spec.name).files()

115 

116 

117@contextlib.contextmanager 

118def _tempfile( 

119 reader, 

120 suffix='', 

121 # gh-93353: Keep a reference to call os.remove() in late Python 

122 # finalization. 

123 *, 

124 _os_remove=os.remove, 

125): 

126 # Not using tempfile.NamedTemporaryFile as it leads to deeper 'try' 

127 # blocks due to the need to close the temporary file to work on Windows 

128 # properly. 

129 fd, raw_path = tempfile.mkstemp(suffix=suffix) 

130 try: 

131 try: 

132 os.write(fd, reader()) 

133 finally: 

134 os.close(fd) 

135 del reader 

136 yield pathlib.Path(raw_path) 

137 finally: 

138 try: 

139 _os_remove(raw_path) 

140 except FileNotFoundError: 

141 pass 

142 

143 

def _temp_file(path):
    """
    Return a ``_tempfile`` context manager that reads its content from
    ``path.read_bytes`` and uses ``path.name`` as the file suffix.
    """
    read_contents = path.read_bytes
    return _tempfile(read_contents, suffix=path.name)

146 

147 

def _is_present_dir(path: Traversable) -> bool:
    """
    Report whether ``path`` is a directory that exists.

    Some Traversables raise an exception (i.e. ``FileNotFoundError``)
    from ``is_dir()`` when the directory is missing; that case is
    reported here as False instead of propagating.
    """
    try:
        return path.is_dir()
    except FileNotFoundError:
        return False

159 

160 

@functools.singledispatch
def as_file(path):
    """
    Given a Traversable object, return a context manager that provides
    that object as a path on the local file system.
    """
    # Directories that exist are replicated as a tree; everything else is
    # handled as a single file.
    if _is_present_dir(path):
        return _temp_dir(path)
    return _temp_file(path)

168 

169 

@as_file.register(pathlib.Path)
@contextlib.contextmanager
def _(path):
    """
    pathlib.Path overload: the path is yielded as-is, with no copy made.
    """
    yield path

177 

178 

179@contextlib.contextmanager 

180def _temp_path(dir: tempfile.TemporaryDirectory): 

181 """ 

182 Wrap tempfile.TemporyDirectory to return a pathlib object. 

183 """ 

184 with dir as result: 

185 yield pathlib.Path(result) 

186 

187 

@contextlib.contextmanager
def _temp_dir(path):
    """
    Given a traversable dir, recursively copy its whole tree into a
    temporary directory and yield the path of the copy.
    """
    assert path.is_dir()
    scratch = tempfile.TemporaryDirectory()
    with _temp_path(scratch) as temp_dir:
        replica = _write_contents(temp_dir, path)
        yield replica

197 

198 

199def _write_contents(target, source): 

200 child = target.joinpath(source.name) 

201 if source.is_dir(): 

202 child.mkdir() 

203 for item in source.iterdir(): 

204 _write_contents(child, item) 

205 else: 

206 child.write_bytes(source.read_bytes()) 

207 return child