Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/upath/_stat.py: 33%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1from __future__ import annotations
3import os
4import warnings
5from datetime import datetime
6from stat import S_IFDIR
7from stat import S_IFLNK
8from stat import S_IFREG
9from typing import Any
10from typing import Iterator
11from typing import Mapping
12from typing import Sequence
14__all__ = [
15 "UPathStatResult",
16]
def _convert_value_to_timestamp(value: Any) -> int | float:
    """Convert a datetime-like value to a POSIX timestamp.

    Numbers are returned unchanged, ``datetime`` instances are converted
    via ``datetime.timestamp()`` and strings are parsed as ISO 8601 (a
    trailing ``"Z"`` is rewritten to ``"+00:00"`` before parsing).

    Raises:
        TypeError: if ``value`` is of any other type; a ``RuntimeWarning``
            asking for a bug report is emitted first.
        ValueError: if a string cannot be parsed by
            ``datetime.fromisoformat``.
    """
    if isinstance(value, (int, float)):
        return value

    if isinstance(value, datetime):
        return value.timestamp()

    if isinstance(value, str):
        # Spell the "Z" (UTC) suffix as an explicit offset before parsing.
        iso_text = value[:-1] + "+00:00" if value.endswith("Z") else value
        return datetime.fromisoformat(iso_text).timestamp()

    # Unsupported type: warn so users report it, then signal the failure.
    warnings.warn(
        f"Cannot convert {value!r} of type {type(value)!r} to a timestamp."
        " Please report this at: https://github.com/fsspec/universal_path/issues",
        RuntimeWarning,
        stacklevel=2,
    )
    raise TypeError(f"Cannot convert {value!r} to a timestamp.")
def _get_stat_result_extra_fields() -> tuple[str, ...]:
    """Return the names of the extra fields of ``os.stat_result``.

    The names are ordered by the internal index of the field.
    """
    # Note:
    #   Building a stat_result from ``range(n_fields)`` stores each field's
    #   own index as its value. ``__reduce__`` then hands back a dict that
    #   maps every additional named field to that index, which is used as
    #   the sort key below.
    probe = os.stat_result(range(os.stat_result.n_fields))
    reduced = probe.__reduce__()
    assert isinstance(reduced, tuple), "unexpected return os.stat_result.__reduce__"
    _factory, (_sequence_part, index_by_name) = reduced
    return tuple(sorted(index_by_name, key=index_by_name.get))
class UPathStatResult:
    """A stat_result compatible class wrapping fsspec info dicts.

    **Note**: It is unlikely that you will ever have to instantiate
    this class directly. If you want to convert an info dict,
    use: `UPathStatResult.from_info(info)`

    This object may be accessed either as a tuple of
    (mode, ino, dev, nlink, uid, gid, size, atime, mtime, ctime)
    or via the attributes st_mode, st_ino, st_dev, st_nlink, st_uid, and so on.

    There's an additional method `as_info()` for accessing the info dict.
    This is useful to access additional information provided by the file system
    implementation, that's not covered by the stat_result tuple.

    """

    __slots__ = ("_seq", "_info")
    # Note:
    #   can't derive from os.stat_result at all, and can't derive from
    #   tuple and have slots. So we duck type the os.stat_result class

    # Add the fields and "extra fields" of the os.stat_result class
    _fields = (
        "st_mode",
        "st_ino",
        "st_dev",
        "st_nlink",
        "st_uid",
        "st_gid",
        "st_size",
        "st_atime",
        "st_mtime",
        "st_ctime",
    )
    _fields_extra = _get_stat_result_extra_fields()

    # Provide the n_ attributes of the os.stat_result class for compatibility
    n_sequence_fields = len(_fields)
    n_fields = len(_fields) + len(_fields_extra)
    n_unnamed_fields = len(set(_fields_extra).intersection(_fields))

    # Sanity check at class creation: warn if our assumptions about the
    # layout of os.stat_result do not hold on this interpreter/platform.
    if (
        n_fields != os.stat_result.n_fields
        or n_sequence_fields != os.stat_result.n_sequence_fields
        or n_unnamed_fields != os.stat_result.n_unnamed_fields
    ):
        warnings.warn(
            "UPathStatResult: The assumed number of fields in the"
            " stat_result class is not correct. Got: "
            f" {_fields!r}, {_fields_extra!r}, {os.stat_result.n_fields}"
            " This might cause problems? Please report this issue at:"
            " https://github.com/fsspec/universal_path/issues",
            RuntimeWarning,
            stacklevel=2,
        )

    def __init__(
        self,
        stat_result_seq: Sequence[int],
        info_dict: Mapping[str, Any] | None = None,
    ) -> None:
        """init compatible with os.stat_result

        Use `UPathStatResult.from_info(info)` to instantiate from a fsspec info.

        Parameters:
            stat_result_seq: fallback values for the sequence fields. Must
                provide between `n_sequence_fields` and `n_fields` items;
                items beyond `n_sequence_fields` are ignored with a warning.
            info_dict: the fsspec info dict; the `st_*` properties prefer
                values found here over the fallback sequence.

        Raises:
            TypeError: if `stat_result_seq` has fewer than
                `n_sequence_fields` or more than `n_fields` items.
        """
        seq = tuple(stat_result_seq)
        # Compute the length first: `n := len(seq) < x` would bind a bool,
        # because the walrus operator has the lowest precedence.
        n = len(seq)
        # Instances have no `__name__`; take the name from the class.
        cls_name = type(self).__name__
        if n < self.n_sequence_fields:
            raise TypeError(
                f"{cls_name} takes at least {self.n_sequence_fields}-sequence"
                f" ({n}-sequence given)"
            )
        elif n > self.n_fields:
            raise TypeError(
                f"{cls_name} takes at most {self.n_fields}-sequence"
                f" ({n}-sequence given)"
            )
        elif n > self.n_sequence_fields:
            warnings.warn(
                "UPathStatResult: The seq provided more than"
                f" {self.n_sequence_fields} items. Ignoring the extra items...",
                UserWarning,
                stacklevel=2,
            )
        self._seq = seq[: self.n_sequence_fields]
        self._info = info_dict or {}

    def __repr__(self):
        cls_name = type(self).__name__
        seq_attrs = ", ".join(map("{0[0]}={0[1]}".format, zip(self._fields, self)))
        return f"{cls_name}({seq_attrs}, info={self._info!r})"

    def __eq__(self, other):
        # Equality is defined by the wrapped info dicts only.
        if not isinstance(other, UPathStatResult):
            return NotImplemented
        else:
            return self._info == other._info

    # --- access to the fsspec info dict ------------------------------

    @classmethod
    def from_info(cls, info: Mapping[str, Any]) -> UPathStatResult:
        """Create a UPathStatResult from a fsspec info dict."""
        # fill all the fallback default values with 0
        defaults = [0] * cls.n_sequence_fields
        return cls(defaults, info)

    def as_info(self) -> Mapping[str, Any]:
        """Return the fsspec info dict."""
        return self._info

    # --- guaranteed fields -------------------------------------------

    @property
    def st_mode(self) -> int:
        """protection bits"""
        mode = self._info.get("mode")
        if isinstance(mode, int):
            return mode
        elif isinstance(mode, str):
            # string modes are interpreted as octal, e.g. "0644"
            try:
                return int(mode, 8)
            except ValueError:
                pass

        # no usable "mode": derive the file type bits from other info keys
        type_ = self._info.get("type")
        if type_ == "file":
            return S_IFREG  # see: stat.S_ISREG
        elif type_ == "directory":
            return S_IFDIR  # see: stat.S_ISDIR

        if self._info.get("isLink"):
            return S_IFLNK  # see: stat.S_ISLNK

        return self._seq[0]

    @property
    def st_ino(self) -> int:
        """inode"""
        ino = self._info.get("ino")
        if isinstance(ino, int):
            return ino
        return self._seq[1]

    @property
    def st_dev(self) -> int:
        """device"""
        dev = self._info.get("dev")
        if isinstance(dev, int):
            return dev
        return self._seq[2]

    @property
    def st_nlink(self) -> int:
        """number of hard links"""
        nlink = self._info.get("nlink")
        if isinstance(nlink, int):
            return nlink
        return self._seq[3]

    @property
    def st_uid(self) -> int:
        """user ID of owner"""
        # the first key that is present and int-convertible wins
        for key in ["uid", "owner", "uname", "unix.owner"]:
            try:
                return int(self._info[key])
            except (ValueError, TypeError, KeyError):
                pass
        return self._seq[4]

    @property
    def st_gid(self) -> int:
        """group ID of owner"""
        # the first key that is present and int-convertible wins
        for key in ["gid", "group", "gname", "unix.group"]:
            try:
                return int(self._info[key])
            except (ValueError, TypeError, KeyError):
                pass
        return self._seq[5]

    @property
    def st_size(self) -> int:
        """total size, in bytes"""
        try:
            return int(self._info["size"])
        except (ValueError, TypeError, KeyError):
            return self._seq[6]

    @property
    def st_atime(self) -> int | float:
        """time of last access"""
        for key in ["atime", "time", "last_accessed", "accessTime"]:
            try:
                raw_value = self._info[key]
            except KeyError:
                continue
            try:
                return _convert_value_to_timestamp(raw_value)
            except (TypeError, ValueError):
                # unconvertible value: try the next candidate key
                pass
        return self._seq[7]

    @property
    def st_mtime(self) -> int | float:
        """time of last modification"""
        for key in [
            "mtime",
            "LastModified",
            "last_modified",
            "timeModified",
            "modificationTime",
            "modified_at",
        ]:
            try:
                raw_value = self._info[key]
            except KeyError:
                continue
            try:
                return _convert_value_to_timestamp(raw_value)
            except (TypeError, ValueError):
                # unconvertible value: try the next candidate key
                pass
        return self._seq[8]

    @property
    def st_ctime(self) -> int | float:
        """time of last change"""
        try:
            raw_value = self._info["ctime"]
        except KeyError:
            pass
        else:
            try:
                return _convert_value_to_timestamp(raw_value)
            except (TypeError, ValueError):
                pass
        return self._seq[9]

    @property
    def st_birthtime(self) -> int | float:
        """time of creation"""
        for key in [
            "birthtime",
            "created",
            "creation_time",
            "timeCreated",
            "created_at",
        ]:
            try:
                raw_value = self._info[key]
            except KeyError:
                continue
            try:
                return _convert_value_to_timestamp(raw_value)
            except (TypeError, ValueError):
                # unconvertible value: try the next candidate key
                pass
        # An AttributeError raised inside a property makes Python fall back
        # to `__getattr__`, which decides between the extra-field default
        # and a real AttributeError.
        raise AttributeError("birthtime")

    # --- extra fields ------------------------------------------------

    def __getattr__(self, item):
        # Only reached when normal attribute lookup fails.
        if item in self._fields_extra:
            return 0  # fallback default value
        raise AttributeError(item)

    # --- os.stat_result tuple interface ------------------------------

    def __len__(self) -> int:
        return len(self._fields)

    def __iter__(self) -> Iterator[int]:
        """the sequence interface iterates over the guaranteed fields.

        All values are integers.
        """
        for field in self._fields:
            yield int(getattr(self, field))

    def index(self, value: int, start: int = 0, stop: int | None = None, /) -> int:
        """the sequence interface index method.

        Searches the same values that iteration yields, so the result
        agrees with `tuple(self).index(value, start, stop)`.

        Raises:
            ValueError: if `value` is not present in the searched range.
        """
        values = tuple(self)
        if stop is None:
            stop = len(values)
        return values.index(value, start, stop)

    def count(self, value: int) -> int:
        """the sequence interface count method.

        Counts over the same values that iteration yields.
        """
        return tuple(self).count(value)

    # --- compatibility with the fsspec info dict interface ------------

    def __getitem__(self, item: int | str | slice) -> Any:
        """Return a sequence field by index or slice.

        String keys are looked up in the fsspec info dict (deprecated, a
        `DeprecationWarning` is emitted).
        """
        if isinstance(item, str):
            warnings.warn(
                "Access the fsspec info via `.as_info()[key]`",
                DeprecationWarning,
                stacklevel=2,
            )
            return self._info[item]
        if isinstance(item, slice):
            # slicing is supported like on os.stat_result
            return tuple(self)[item]
        # we need to go via the attributes and cast to int
        attr = self._fields[item]
        return int(getattr(self, attr))

    def keys(self):
        """compatibility with the fsspec info dict interface."""
        warnings.warn(
            "Access the fsspec info via `.as_info().keys()`",
            DeprecationWarning,
            stacklevel=2,
        )
        return self._info.keys()

    def values(self):
        """compatibility with the fsspec info dict interface."""
        warnings.warn(
            "Access the fsspec info via `.as_info().values()`",
            DeprecationWarning,
            stacklevel=2,
        )
        return self._info.values()

    def items(self):
        """compatibility with the fsspec info dict interface."""
        warnings.warn(
            "Access the fsspec info via `.as_info().items()`",
            DeprecationWarning,
            stacklevel=2,
        )
        return self._info.items()

    def get(self, key, default=None):
        """compatibility with the fsspec info dict interface."""
        warnings.warn(
            "Access the fsspec info via `.as_info().get(key, default)`",
            DeprecationWarning,
            stacklevel=2,
        )
        return self._info.get(key, default)

    def copy(self):
        """compatibility with the fsspec info dict interface."""
        warnings.warn(
            "Access the fsspec info via `.as_info().copy()`",
            DeprecationWarning,
            stacklevel=2,
        )
        return self._info.copy()