Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/upath/_stat.py: 31%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1from __future__ import annotations
3import os
4import warnings
5from collections.abc import Iterator
6from collections.abc import Mapping
7from collections.abc import Sequence
8from datetime import datetime
9from stat import S_IFDIR
10from stat import S_IFLNK
11from stat import S_IFREG
12from typing import Any
14__all__ = [
15 "UPathStatResult",
16]
19def _convert_value_to_timestamp(value: Any) -> int | float:
20 """Try to convert a datetime-like value to a timestamp."""
21 if isinstance(value, (int, float)):
22 return value
23 elif isinstance(value, str):
24 if len(value) == 14:
25 return datetime.strptime(value, r"%Y%m%d%H%M%S").timestamp()
26 if value.endswith("Z"):
27 value = value[:-1] + "+00:00"
28 return datetime.fromisoformat(value).timestamp()
29 elif isinstance(value, datetime):
30 return value.timestamp()
31 else:
32 warnings.warn(
33 f"Cannot convert {value!r} of type {type(value)!r} to a timestamp."
34 " Please report this at: https://github.com/fsspec/universal_path/issues",
35 RuntimeWarning,
36 stacklevel=2,
37 )
38 raise TypeError(f"Cannot convert {value!r} to a timestamp.")
41def _get_stat_result_extra_fields() -> tuple[str, ...]:
42 """retrieve the extra fields of the os.stat_result class."""
43 # Note:
44 # The lines below let us provide a dictionary with the additional
45 # named fields of the stat_result class as keys and the internal
46 # index of the field as value.
47 sr = os.stat_result(range(os.stat_result.n_fields))
48 rd = sr.__reduce__()
49 assert isinstance(rd, tuple), "unexpected return os.stat_result.__reduce__"
50 _, (_, extra) = rd
51 extra_fields = sorted(extra, key=extra.__getitem__)
52 return tuple(extra_fields)
class UPathStatResult:
    """A stat_result compatible class wrapping fsspec info dicts.

    **Note**: It is unlikely that you will ever have to instantiate
    this class directly. If you want to convert an info dict,
    use: `UPathStatResult.from_info(info)`

    This object may be accessed either as a tuple of
    (mode, ino, dev, nlink, uid, gid, size, atime, mtime, ctime)
    or via the attributes st_mode, st_ino, st_dev, st_nlink, st_uid, and so on.

    There's an additional method `as_info()` for accessing the info dict.
    This is useful to access additional information provided by the file system
    implementation, that's not covered by the stat_result tuple.

    """

    __slots__ = ("_seq", "_info")
    # Note:
    #   can't derive from os.stat_result at all, and can't derive from
    #   tuple and have slots. So we duck type the os.stat_result class

    # Add the fields and "extra fields" of the os.stat_result class
    _fields = (
        "st_mode",
        "st_ino",
        "st_dev",
        "st_nlink",
        "st_uid",
        "st_gid",
        "st_size",
        "st_atime",
        "st_mtime",
        "st_ctime",
    )
    _fields_extra = _get_stat_result_extra_fields()

    # Provide the n_ attributes of the os.stat_result class for compatibility
    n_sequence_fields = len(_fields)
    n_fields = len(_fields) + len(_fields_extra)
    n_unnamed_fields = len(set(_fields_extra).intersection(_fields))

    # Sanity check at class creation: warn if our bookkeeping disagrees
    # with what the running interpreter's os.stat_result reports.
    if (
        n_fields != os.stat_result.n_fields
        or n_sequence_fields != os.stat_result.n_sequence_fields
        or n_unnamed_fields != os.stat_result.n_unnamed_fields
    ):
        warnings.warn(
            "UPathStatResult: The assumed number of fields in the"
            " stat_result class is not correct. Got: "
            f" {_fields!r}, {_fields_extra!r}, {os.stat_result.n_fields}"
            " This might cause problems? Please report this issue at:"
            " https://github.com/fsspec/universal_path/issues",
            RuntimeWarning,
            stacklevel=2,
        )

    def __init__(
        self,
        stat_result_seq: Sequence[int],
        info_dict: Mapping[str, Any] | None = None,
    ) -> None:
        """init compatible with os.stat_result

        Use `UPathStatResult.from_info(info)` to instantiate from a fsspec info.

        Args:
            stat_result_seq: fallback values for the sequence fields. Must
                provide at least ``n_sequence_fields`` and at most
                ``n_fields`` items; items beyond ``n_sequence_fields`` are
                dropped with a ``UserWarning``.
            info_dict: the fsspec info mapping. A falsy value (``None`` or
                an empty mapping) is replaced by a new empty dict.

        Raises:
            TypeError: if the sequence is too short or too long.
        """
        seq = tuple(stat_result_seq)
        # Compute the length first: `n := len(seq) < limit` would bind the
        # boolean result of the comparison to `n`, not the length.
        n = len(seq)
        # `__name__` lives on the class, not on the instance.
        cls_name = type(self).__name__
        if n < self.n_sequence_fields:
            raise TypeError(
                f"{cls_name} takes at least {self.n_sequence_fields}-sequence"
                f" ({n}-sequence given)"
            )
        if n > self.n_fields:
            raise TypeError(
                f"{cls_name} takes at most {self.n_fields}-sequence"
                f" ({n}-sequence given)"
            )
        if n > self.n_sequence_fields:
            warnings.warn(
                "UPathStatResult: The seq provided more than"
                f" {self.n_sequence_fields} items. Ignoring the extra items...",
                UserWarning,
                stacklevel=2,
            )
        self._seq = seq[: self.n_sequence_fields]
        self._info = info_dict or {}

    def __repr__(self):
        cls_name = type(self).__name__
        seq_attrs = ", ".join(
            f"{name}={value}" for name, value in zip(self._fields, self)
        )
        return f"{cls_name}({seq_attrs}, info={self._info!r})"

    def __eq__(self, other):
        # Equality is decided by the wrapped info dicts only.
        if not isinstance(other, UPathStatResult):
            return NotImplemented
        return self._info == other._info

    # --- access to the fsspec info dict ------------------------------

    @classmethod
    def from_info(cls, info: Mapping[str, Any]) -> UPathStatResult:
        """Create a UPathStatResult from a fsspec info dict."""
        # fill all the fallback default values with 0
        defaults = [0] * cls.n_sequence_fields
        return cls(defaults, info)

    def as_info(self) -> Mapping[str, Any]:
        """Return the fsspec info dict."""
        return self._info

    # --- internal helpers --------------------------------------------

    def _timestamp_from_info(self, keys: Sequence[str]) -> int | float | None:
        """Return the first info value under *keys* that converts to a timestamp.

        Keys are tried in order. Missing keys and values that fail to
        convert (``TypeError`` / ``ValueError``) are skipped. Returns
        ``None`` when no key yields a timestamp.
        """
        for key in keys:
            try:
                raw_value = self._info[key]
            except KeyError:
                continue
            try:
                return _convert_value_to_timestamp(raw_value)
            except (TypeError, ValueError):
                pass
        return None

    def _time_ns(self, ns_key: str, seconds_attr: str) -> int:
        """Return a nanosecond timestamp.

        Uses ``info[ns_key]`` when it is present and convertible to int,
        otherwise derives the value from the seconds attribute named
        *seconds_attr*.
        """
        try:
            return int(self._info[ns_key])
        except (KeyError, ValueError, TypeError):
            # same tolerance as st_size / st_uid / st_gid: an unusable
            # value falls through to the seconds-based computation
            pass
        seconds = getattr(self, seconds_attr)
        if isinstance(seconds, float):
            return int(seconds * 1e9)
        return seconds * 1_000_000_000

    # --- guaranteed fields -------------------------------------------

    @property
    def st_mode(self) -> int:
        """protection bits"""
        mode = self._info.get("mode")
        if isinstance(mode, int):
            return mode
        elif isinstance(mode, str):
            # string modes are interpreted as octal
            try:
                return int(mode, 8)
            except ValueError:
                pass

        # no usable "mode": derive a file-type value from the info dict
        type_ = self._info.get("type")
        if type_ == "file":
            return S_IFREG  # see: stat.S_ISREG
        elif type_ == "directory":
            return S_IFDIR  # see: stat.S_ISDIR

        if self._info.get("isLink"):
            return S_IFLNK  # see: stat.S_ISLNK

        return self._seq[0]

    @property
    def st_ino(self) -> int:
        """inode"""
        ino = self._info.get("ino")
        if isinstance(ino, int):
            return ino
        return self._seq[1]

    @property
    def st_dev(self) -> int:
        """device"""
        dev = self._info.get("dev")
        if isinstance(dev, int):
            return dev
        return self._seq[2]

    @property
    def st_nlink(self) -> int:
        """number of hard links"""
        nlink = self._info.get("nlink")
        if isinstance(nlink, int):
            return nlink
        return self._seq[3]

    @property
    def st_uid(self) -> int:
        """user ID of owner"""
        # first key whose value converts to int wins
        for key in ("uid", "owner", "uname", "unix.owner"):
            try:
                return int(self._info[key])
            except (ValueError, TypeError, KeyError):
                pass
        return self._seq[4]

    @property
    def st_gid(self) -> int:
        """group ID of owner"""
        # first key whose value converts to int wins
        for key in ("gid", "group", "gname", "unix.group"):
            try:
                return int(self._info[key])
            except (ValueError, TypeError, KeyError):
                pass
        return self._seq[5]

    @property
    def st_size(self) -> int:
        """total size, in bytes"""
        try:
            return int(self._info["size"])
        except (ValueError, TypeError, KeyError):
            return self._seq[6]

    @property
    def st_atime(self) -> int | float:
        """time of last access"""
        timestamp = self._timestamp_from_info(
            ("atime", "time", "last_accessed", "accessTime")
        )
        return self._seq[7] if timestamp is None else timestamp

    @property
    def st_mtime(self) -> int | float:
        """time of last modification"""
        timestamp = self._timestamp_from_info(
            (
                "mtime",
                "LastModified",
                "last_modified",
                "timeModified",
                "modificationTime",
                "modified_at",
                "modify",
            )
        )
        return self._seq[8] if timestamp is None else timestamp

    @property
    def st_ctime(self) -> int | float:
        """time of last change"""
        timestamp = self._timestamp_from_info(("ctime",))
        return self._seq[9] if timestamp is None else timestamp

    @property
    def st_birthtime(self) -> int | float:
        """time of creation"""
        timestamp = self._timestamp_from_info(
            (
                "birthtime",
                "created",
                "creation_time",
                "timeCreated",
                "created_at",
            )
        )
        if timestamp is None:
            # there is no sequence fallback for this field
            raise AttributeError("birthtime")
        return timestamp

    @property
    def st_atime_ns(self) -> int:
        """time of last access in nanoseconds"""
        return self._time_ns("atime_ns", "st_atime")

    @property
    def st_mtime_ns(self) -> int:
        """time of last modification in nanoseconds"""
        return self._time_ns("mtime_ns", "st_mtime")

    @property
    def st_ctime_ns(self) -> int:
        """time of last change in nanoseconds"""
        return self._time_ns("ctime_ns", "st_ctime")

    # --- extra fields ------------------------------------------------

    def __getattr__(self, item):
        # Only reached when normal attribute lookup fails.
        if item in self._fields_extra:
            return 0  # fallback default value
        raise AttributeError(item)

    # --- os.stat_result tuple interface ------------------------------

    def __len__(self) -> int:
        return len(self._fields)

    def __iter__(self) -> Iterator[int]:
        """the sequence interface iterates over the guaranteed fields.

        All values are integers.
        """
        for field in self._fields:
            yield int(getattr(self, field))

    def index(self, value: int, start: int = 0, stop: int | None = None, /) -> int:
        """the sequence interface index method.

        Searches the same integer values that iteration and integer
        indexing expose, not the internal fallback sequence.

        Raises:
            ValueError: if *value* is not found in ``[start, stop)``.
        """
        values = tuple(self)
        if stop is None:
            stop = len(values)
        return values.index(value, start, stop)

    def count(self, value: int) -> int:
        """the sequence interface count method.

        Counts over the same integer values that iteration exposes.
        """
        return tuple(self).count(value)

    # --- compatibility with the fsspec info dict interface ------------

    def __getitem__(self, item: int | str) -> Any:
        """Index by position or slice (stat fields), or by key (deprecated).

        * ``str``: looked up in the info dict, with a ``DeprecationWarning``.
        * ``slice``: a tuple of the selected fields, each cast to ``int``.
        * ``int``: the field at that position, cast to ``int``.
        """
        if isinstance(item, str):
            warnings.warn(
                "Access the fsspec info via `.as_info()[key]`",
                DeprecationWarning,
                stacklevel=2,
            )
            return self._info[item]
        if isinstance(item, slice):
            return tuple(int(getattr(self, name)) for name in self._fields[item])
        # we need to go via the attributes and cast to int
        attr = self._fields[item]
        return int(getattr(self, attr))

    def keys(self):
        """compatibility with the fsspec info dict interface."""
        warnings.warn(
            "Access the fsspec info via `.as_info().keys()`",
            DeprecationWarning,
            stacklevel=2,
        )
        return self._info.keys()

    def values(self):
        """compatibility with the fsspec info dict interface."""
        warnings.warn(
            "Access the fsspec info via `.as_info().values()`",
            DeprecationWarning,
            stacklevel=2,
        )
        return self._info.values()

    def items(self):
        """compatibility with the fsspec info dict interface."""
        warnings.warn(
            "Access the fsspec info via `.as_info().items()`",
            DeprecationWarning,
            stacklevel=2,
        )
        return self._info.items()

    def get(self, key, default=None):
        """compatibility with the fsspec info dict interface."""
        warnings.warn(
            "Access the fsspec info via `.as_info().get(key, default)`",
            DeprecationWarning,
            stacklevel=2,
        )
        return self._info.get(key, default)

    def copy(self):
        """compatibility with the fsspec info dict interface."""
        warnings.warn(
            "Access the fsspec info via `.as_info().copy()`",
            DeprecationWarning,
            stacklevel=2,
        )
        return self._info.copy()