Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/importlib_resources/readers.py: 46%

84 statements  

« prev     ^ index     » next       coverage.py v7.2.2, created at 2023-03-26 06:12 +0000

1import collections 

2import itertools 

3import pathlib 

4import operator 

5 

6from . import abc 

7 

8from ._itertools import only 

9from ._compat import ZipPath 

10 

11 

def remove_duplicates(items):
    """
    Return an iterator over ``items`` with repeated values dropped.

    Each value is kept at the position of its first occurrence.
    ``items`` is consumed in full before the iterator is returned.
    """
    first_seen = collections.OrderedDict()
    for item in items:
        # Only the keys matter; a key that is already present keeps
        # the slot it was given the first time it was encountered.
        first_seen.setdefault(item, None)
    return iter(first_seen)

14 

15 

class FileReader(abc.TraversableResources):
    """
    Resource reader rooted at the directory that contains the
    file named by ``loader.path``.
    """

    def __init__(self, loader):
        # Resources are looked up next to the file the loader points at.
        loaded_file = pathlib.Path(loader.path)
        self.path = loaded_file.parent

    def resource_path(self, resource):
        """
        Give back the file system location of ``resource`` as a
        string, so that `resources.path()` does not need to create
        a temporary copy.
        """
        location = self.path.joinpath(resource)
        return str(location)

    def files(self):
        """Return the root directory (a ``pathlib.Path``)."""
        return self.path

30 

31 

class ZipReader(abc.TraversableResources):
    """
    Resource reader that exposes resources through a ``ZipPath``
    built from the loader's archive and a computed prefix.
    """

    def __init__(self, loader, module):
        # Only the last dotted component of the module name is used.
        leaf_name = module.rpartition('.')[2]
        # Normalize Windows-style separators before extending the prefix.
        base_prefix = loader.prefix.replace('\\', '/')
        self.prefix = base_prefix + leaf_name + '/'
        self.archive = loader.archive

    def open_resource(self, resource):
        """
        Open ``resource`` via the parent implementation, translating
        a ``KeyError`` into ``FileNotFoundError``.
        """
        try:
            stream = super().open_resource(resource)
        except KeyError as exc:
            raise FileNotFoundError(exc.args[0])
        return stream

    def is_resource(self, path):
        """
        Report whether ``path`` names an existing file.

        The extra ``exists()`` check works around
        `zipfile.Path.is_file` returning true for paths that
        do not exist.
        """
        candidate = self.files().joinpath(path)
        looks_like_file = candidate.is_file()
        return looks_like_file and candidate.exists()

    def files(self):
        """Return a ``ZipPath`` for this reader's archive and prefix."""
        return ZipPath(self.archive, self.prefix)

54 

55 

class MultiplexedPath(abc.Traversable):
    """
    Present several Traversable objects as a single, merged
    Traversable.

    Intended for namespace packages, where one package name may be
    backed by more than one location.
    """

    def __init__(self, *paths):
        unique_paths = remove_duplicates(paths)
        self._paths = [pathlib.Path(entry) for entry in unique_paths]
        if not self._paths:
            message = 'MultiplexedPath must contain at least one path'
            raise FileNotFoundError(message)
        if any(not entry.is_dir() for entry in self._paths):
            raise NotADirectoryError('MultiplexedPath only supports directories')

    def iterdir(self):
        """
        Iterate over the children of every underlying path, yielding
        one entry per distinct child name.
        """
        name_of = operator.attrgetter('name')
        listings = (location.iterdir() for location in self._paths)
        # groupby only merges adjacent items, hence the sort by name first.
        ordered = sorted(itertools.chain.from_iterable(listings), key=name_of)
        grouped = itertools.groupby(ordered, key=name_of)
        same_named = (members for _, members in grouped)
        return map(self._follow, same_named)

    def read_bytes(self):
        raise FileNotFoundError(f'{self} is not a file')

    def read_text(self, *args, **kwargs):
        raise FileNotFoundError(f'{self} is not a file')

    def is_dir(self):
        return True

    def is_file(self):
        return False

    def joinpath(self, *descendants):
        """
        Resolve ``descendants`` through the parent implementation,
        falling back to the first underlying path on TraversalError.
        """
        try:
            resolved = super().joinpath(*descendants)
        except abc.TraversalError:
            # Some segment could not be resolved (a directory is
            # missing). Hand back a path that will simply not exist.
            primary = self._paths[0]
            return primary.joinpath(*descendants)
        return resolved

    @classmethod
    def _follow(cls, children):
        """
        Collapse a group of same-named children into one object.

        A lone child is returned unchanged. Several children are
        wrapped in a new MultiplexedPath, unless one of them is not
        a directory, in which case the first child is returned.
        """
        # Each attempt below needs its own pass over ``children``,
        # so split the iterable into three independent iterators.
        for_merge, for_single, for_first = itertools.tee(children, 3)

        try:
            return only(for_single)
        except ValueError:
            try:
                return cls(*for_merge)
            except NotADirectoryError:
                return next(for_first)

    def open(self, *args, **kwargs):
        raise FileNotFoundError(f'{self} is not a file')

    @property
    def name(self):
        """Name of the first underlying path."""
        primary = self._paths[0]
        return primary.name

    def __repr__(self):
        quoted = [f"'{path}'" for path in self._paths]
        paths = ', '.join(quoted)
        return f'MultiplexedPath({paths})'

127 

128 

class NamespaceReader(abc.TraversableResources):
    """
    Resource reader whose root is a MultiplexedPath built from the
    entries of ``namespace_path``.
    """

    def __init__(self, namespace_path):
        # The argument is accepted only when its string form
        # mentions 'NamespacePath'.
        is_namespace_path = 'NamespacePath' in str(namespace_path)
        if not is_namespace_path:
            raise ValueError('Invalid path')
        locations = list(namespace_path)
        self.path = MultiplexedPath(*locations)

    def resource_path(self, resource):
        """
        Give back the file system location of ``resource`` as a
        string, so that `resources.path()` does not need to create
        a temporary copy.
        """
        location = self.path.joinpath(resource)
        return str(location)

    def files(self):
        """Return the merged root path."""
        return self.path