1from contextlib import suppress
2from io import TextIOWrapper
3
4from . import abc
5
6
7class SpecLoaderAdapter:
8 """
9 Adapt a package spec to adapt the underlying loader.
10 """
11
12 def __init__(self, spec, adapter=lambda spec: spec.loader):
13 self.spec = spec
14 self.loader = adapter(spec)
15
16 def __getattr__(self, name):
17 return getattr(self.spec, name)
18
19
20class TraversableResourcesLoader:
21 """
22 Adapt a loader to provide TraversableResources.
23 """
24
25 def __init__(self, spec):
26 self.spec = spec
27
28 def get_resource_reader(self, name):
29 return CompatibilityFiles(self.spec)._native()
30
31
32def _io_wrapper(file, mode='r', *args, **kwargs):
33 if mode == 'r':
34 return TextIOWrapper(file, *args, **kwargs)
35 elif mode == 'rb':
36 return file
37 raise ValueError(f"Invalid mode value '{mode}', only 'r' and 'rb' are supported")
38
39
40class CompatibilityFiles:
41 """
42 Adapter for an existing or non-existent resource reader
43 to provide a compatibility .files().
44 """
45
46 class SpecPath(abc.Traversable):
47 """
48 Path tied to a module spec.
49 Can be read and exposes the resource reader children.
50 """
51
52 def __init__(self, spec, reader):
53 self._spec = spec
54 self._reader = reader
55
56 def iterdir(self):
57 if not self._reader:
58 return iter(())
59 return iter(
60 CompatibilityFiles.ChildPath(self._reader, path)
61 for path in self._reader.contents()
62 )
63
64 def is_file(self):
65 return False
66
67 is_dir = is_file
68
69 def joinpath(self, other):
70 if not self._reader:
71 return CompatibilityFiles.OrphanPath(other)
72 return CompatibilityFiles.ChildPath(self._reader, other)
73
74 @property
75 def name(self):
76 return self._spec.name
77
78 def open(self, mode='r', *args, **kwargs):
79 return _io_wrapper(self._reader.open_resource(None), mode, *args, **kwargs)
80
81 class ChildPath(abc.Traversable):
82 """
83 Path tied to a resource reader child.
84 Can be read but doesn't expose any meaningful children.
85 """
86
87 def __init__(self, reader, name):
88 self._reader = reader
89 self._name = name
90
91 def iterdir(self):
92 return iter(())
93
94 def is_file(self):
95 return self._reader.is_resource(self.name)
96
97 def is_dir(self):
98 return not self.is_file()
99
100 def joinpath(self, other):
101 return CompatibilityFiles.OrphanPath(self.name, other)
102
103 @property
104 def name(self):
105 return self._name
106
107 def open(self, mode='r', *args, **kwargs):
108 return _io_wrapper(
109 self._reader.open_resource(self.name), mode, *args, **kwargs
110 )
111
112 class OrphanPath(abc.Traversable):
113 """
114 Orphan path, not tied to a module spec or resource reader.
115 Can't be read and doesn't expose any meaningful children.
116 """
117
118 def __init__(self, *path_parts):
119 if len(path_parts) < 1:
120 raise ValueError('Need at least one path part to construct a path')
121 self._path = path_parts
122
123 def iterdir(self):
124 return iter(())
125
126 def is_file(self):
127 return False
128
129 is_dir = is_file
130
131 def joinpath(self, other):
132 return CompatibilityFiles.OrphanPath(*self._path, other)
133
134 @property
135 def name(self):
136 return self._path[-1]
137
138 def open(self, mode='r', *args, **kwargs):
139 raise FileNotFoundError("Can't open orphan path")
140
141 def __init__(self, spec):
142 self.spec = spec
143
144 @property
145 def _reader(self):
146 with suppress(AttributeError):
147 return self.spec.loader.get_resource_reader(self.spec.name)
148
149 def _native(self):
150 """
151 Return the native reader if it supports files().
152 """
153 reader = self._reader
154 return reader if hasattr(reader, 'files') else self
155
156 def __getattr__(self, attr):
157 return getattr(self._reader, attr)
158
159 def files(self):
160 return CompatibilityFiles.SpecPath(self.spec, self._reader)
161
162
163def wrap_spec(package):
164 """
165 Construct a package spec with traversable compatibility
166 on the spec/loader/reader.
167 """
168 return SpecLoaderAdapter(package.__spec__, TraversableResourcesLoader)