Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/importlib_resources/readers.py: 46%
84 statements
« prev ^ index » next coverage.py v7.2.2, created at 2023-03-26 06:03 +0000
« prev ^ index » next coverage.py v7.2.2, created at 2023-03-26 06:03 +0000
1import collections
2import itertools
3import pathlib
4import operator
6from . import abc
8from ._itertools import only
9from ._compat import ZipPath
12def remove_duplicates(items):
13 return iter(collections.OrderedDict.fromkeys(items))
16class FileReader(abc.TraversableResources):
17 def __init__(self, loader):
18 self.path = pathlib.Path(loader.path).parent
20 def resource_path(self, resource):
21 """
22 Return the file system path to prevent
23 `resources.path()` from creating a temporary
24 copy.
25 """
26 return str(self.path.joinpath(resource))
28 def files(self):
29 return self.path
32class ZipReader(abc.TraversableResources):
33 def __init__(self, loader, module):
34 _, _, name = module.rpartition('.')
35 self.prefix = loader.prefix.replace('\\', '/') + name + '/'
36 self.archive = loader.archive
38 def open_resource(self, resource):
39 try:
40 return super().open_resource(resource)
41 except KeyError as exc:
42 raise FileNotFoundError(exc.args[0])
44 def is_resource(self, path):
45 """
46 Workaround for `zipfile.Path.is_file` returning true
47 for non-existent paths.
48 """
49 target = self.files().joinpath(path)
50 return target.is_file() and target.exists()
52 def files(self):
53 return ZipPath(self.archive, self.prefix)
56class MultiplexedPath(abc.Traversable):
57 """
58 Given a series of Traversable objects, implement a merged
59 version of the interface across all objects. Useful for
60 namespace packages which may be multihomed at a single
61 name.
62 """
64 def __init__(self, *paths):
65 self._paths = list(map(pathlib.Path, remove_duplicates(paths)))
66 if not self._paths:
67 message = 'MultiplexedPath must contain at least one path'
68 raise FileNotFoundError(message)
69 if not all(path.is_dir() for path in self._paths):
70 raise NotADirectoryError('MultiplexedPath only supports directories')
72 def iterdir(self):
73 children = (child for path in self._paths for child in path.iterdir())
74 by_name = operator.attrgetter('name')
75 groups = itertools.groupby(sorted(children, key=by_name), key=by_name)
76 return map(self._follow, (locs for name, locs in groups))
78 def read_bytes(self):
79 raise FileNotFoundError(f'{self} is not a file')
81 def read_text(self, *args, **kwargs):
82 raise FileNotFoundError(f'{self} is not a file')
84 def is_dir(self):
85 return True
87 def is_file(self):
88 return False
90 def joinpath(self, *descendants):
91 try:
92 return super().joinpath(*descendants)
93 except abc.TraversalError:
94 # One of the paths did not resolve (a directory does not exist).
95 # Just return something that will not exist.
96 return self._paths[0].joinpath(*descendants)
98 @classmethod
99 def _follow(cls, children):
100 """
101 Construct a MultiplexedPath if needed.
103 If children contains a sole element, return it.
104 Otherwise, return a MultiplexedPath of the items.
105 Unless one of the items is not a Directory, then return the first.
106 """
107 subdirs, one_dir, one_file = itertools.tee(children, 3)
109 try:
110 return only(one_dir)
111 except ValueError:
112 try:
113 return cls(*subdirs)
114 except NotADirectoryError:
115 return next(one_file)
117 def open(self, *args, **kwargs):
118 raise FileNotFoundError(f'{self} is not a file')
120 @property
121 def name(self):
122 return self._paths[0].name
124 def __repr__(self):
125 paths = ', '.join(f"'{path}'" for path in self._paths)
126 return f'MultiplexedPath({paths})'
129class NamespaceReader(abc.TraversableResources):
130 def __init__(self, namespace_path):
131 if 'NamespacePath' not in str(namespace_path):
132 raise ValueError('Invalid path')
133 self.path = MultiplexedPath(*list(namespace_path))
135 def resource_path(self, resource):
136 """
137 Return the file system path to prevent
138 `resources.path()` from creating a temporary
139 copy.
140 """
141 return str(self.path.joinpath(resource))
143 def files(self):
144 return self.path