Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/upath/_stat.py: 31%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

236 statements  

1from __future__ import annotations 

2 

3import os 

4import warnings 

5from collections.abc import Iterator 

6from collections.abc import Mapping 

7from collections.abc import Sequence 

8from datetime import datetime 

9from stat import S_IFDIR 

10from stat import S_IFLNK 

11from stat import S_IFREG 

12from typing import Any 

13 

14__all__ = [ 

15 "UPathStatResult", 

16] 

17 

18 

19def _convert_value_to_timestamp(value: Any) -> int | float: 

20 """Try to convert a datetime-like value to a timestamp.""" 

21 if isinstance(value, (int, float)): 

22 return value 

23 elif isinstance(value, str): 

24 if len(value) == 14: 

25 return datetime.strptime(value, r"%Y%m%d%H%M%S").timestamp() 

26 if value.endswith("Z"): 

27 value = value[:-1] + "+00:00" 

28 return datetime.fromisoformat(value).timestamp() 

29 elif isinstance(value, datetime): 

30 return value.timestamp() 

31 else: 

32 warnings.warn( 

33 f"Cannot convert {value!r} of type {type(value)!r} to a timestamp." 

34 " Please report this at: https://github.com/fsspec/universal_path/issues", 

35 RuntimeWarning, 

36 stacklevel=2, 

37 ) 

38 raise TypeError(f"Cannot convert {value!r} to a timestamp.") 

39 

40 

41def _get_stat_result_extra_fields() -> tuple[str, ...]: 

42 """retrieve the extra fields of the os.stat_result class.""" 

43 # Note: 

44 # The lines below let us provide a dictionary with the additional 

45 # named fields of the stat_result class as keys and the internal 

46 # index of the field as value. 

47 sr = os.stat_result(range(os.stat_result.n_fields)) 

48 rd = sr.__reduce__() 

49 assert isinstance(rd, tuple), "unexpected return os.stat_result.__reduce__" 

50 _, (_, extra) = rd 

51 extra_fields = sorted(extra, key=extra.__getitem__) 

52 return tuple(extra_fields) 

53 

54 

55class UPathStatResult: 

56 """A stat_result compatible class wrapping fsspec info dicts. 

57 

58 **Note**: It is unlikely that you will ever have to instantiate 

59 this class directly. If you want to convert and info dict, 

60 use: `UPathStatResult.from_info(info)` 

61 

62 This object may be accessed either as a tuple of 

63 (mode, ino, dev, nlink, uid, gid, size, atime, mtime, ctime) 

64 or via the attributes st_mode, st_ino, st_dev, st_nlink, st_uid, and so on. 

65 

66 There's an additional method `as_info()` for accessing the info dict. 

67 This is useful to access additional information provided by the file system 

68 implementation, that's not covered by the stat_result tuple. 

69 

70 """ 

71 

72 __slots__ = ("_seq", "_info") 

73 # Note: 

74 # can't derive from os.stat_result at all, and can't derive from 

75 # tuple and have slots. So we duck type the os.stat_result class 

76 

77 # Add the fields and "extra fields" of the os.stat_result class 

78 _fields = ( 

79 "st_mode", 

80 "st_ino", 

81 "st_dev", 

82 "st_nlink", 

83 "st_uid", 

84 "st_gid", 

85 "st_size", 

86 "st_atime", 

87 "st_mtime", 

88 "st_ctime", 

89 ) 

90 _fields_extra = _get_stat_result_extra_fields() 

91 

92 # Provide the n_ attributes of the os.stat_result class for compatibility 

93 n_sequence_fields = len(_fields) 

94 n_fields = len(_fields) + len(_fields_extra) 

95 n_unnamed_fields = len(set(_fields_extra).intersection(_fields)) 

96 

97 if ( 

98 n_fields != os.stat_result.n_fields 

99 or n_sequence_fields != os.stat_result.n_sequence_fields 

100 or n_unnamed_fields != os.stat_result.n_unnamed_fields 

101 ): 

102 warnings.warn( 

103 "UPathStatResult: The assumed number of fields in the" 

104 " stat_result class is not correct. Got: " 

105 f" {_fields!r}, {_fields_extra!r}, {os.stat_result.n_fields}" 

106 " This might cause problems? Please report this issue at:" 

107 " https://github.com/fsspec/universal_path/issues", 

108 RuntimeWarning, 

109 stacklevel=2, 

110 ) 

111 

112 def __init__( 

113 self, 

114 stat_result_seq: Sequence[int], 

115 info_dict: Mapping[str, Any] | None = None, 

116 ) -> None: 

117 """init compatible with os.stat_result 

118 

119 Use `UPathStatResult.from_info(info)` to instantiate from a fsspec info. 

120 """ 

121 seq = tuple(stat_result_seq) 

122 if n := len(seq) < self.n_sequence_fields: 

123 raise TypeError( 

124 f"{self.__name__} takes at least {self.n_fields}-sequence" 

125 " ({n}-sequence given)" 

126 ) 

127 elif n > self.n_fields: 

128 raise TypeError( 

129 f"{self.__name__} takes at most {self.n_fields}-sequence" 

130 " ({n}-sequence given)" 

131 ) 

132 elif self.n_sequence_fields <= n < self.n_sequence_fields: 

133 warnings.warn( 

134 "UPathStatResult: The seq provided more than" 

135 f" {self.n_sequence_fields} items. Ignoring the extra items...", 

136 UserWarning, 

137 stacklevel=2, 

138 ) 

139 self._seq = seq[: self.n_sequence_fields] 

140 self._info = info_dict or {} 

141 

142 def __repr__(self): 

143 cls_name = type(self).__name__ 

144 seq_attrs = ", ".join(map("{0[0]}={0[1]}".format, zip(self._fields, self))) 

145 return f"{cls_name}({seq_attrs}, info={self._info!r})" 

146 

147 def __eq__(self, other): 

148 if not isinstance(other, UPathStatResult): 

149 return NotImplemented 

150 else: 

151 return self._info == other._info 

152 

153 # --- access to the fsspec info dict ------------------------------ 

154 

155 @classmethod 

156 def from_info(cls, info: Mapping[str, Any]) -> UPathStatResult: 

157 """Create a UPathStatResult from a fsspec info dict.""" 

158 # fill all the fallback default values with 0 

159 defaults = [0] * cls.n_sequence_fields 

160 return cls(defaults, info) 

161 

162 def as_info(self) -> Mapping[str, Any]: 

163 """Return the fsspec info dict.""" 

164 return self._info 

165 

166 # --- guaranteed fields ------------------------------------------- 

167 

168 @property 

169 def st_mode(self) -> int: 

170 """protection bits""" 

171 mode = self._info.get("mode") 

172 if isinstance(mode, int): 

173 return mode 

174 elif isinstance(mode, str): 

175 try: 

176 return int(mode, 8) 

177 except ValueError: 

178 pass 

179 

180 type_ = self._info.get("type") 

181 if type_ == "file": 

182 return S_IFREG # see: stat.S_ISREG 

183 elif type_ == "directory": 

184 return S_IFDIR # see: stat.S_ISDIR 

185 

186 if self._info.get("isLink"): 

187 return S_IFLNK # see: stat.S_ISLNK 

188 

189 return self._seq[0] 

190 

191 @property 

192 def st_ino(self) -> int: 

193 """inode""" 

194 ino = self._info.get("ino") 

195 if isinstance(ino, int): 

196 return ino 

197 return self._seq[1] 

198 

199 @property 

200 def st_dev(self) -> int: 

201 """device""" 

202 dev = self._info.get("dev") 

203 if isinstance(dev, int): 

204 return dev 

205 return self._seq[2] 

206 

207 @property 

208 def st_nlink(self) -> int: 

209 """number of hard links""" 

210 nlink = self._info.get("nlink") 

211 if isinstance(nlink, int): 

212 return nlink 

213 return self._seq[3] 

214 

215 @property 

216 def st_uid(self) -> int: 

217 """user ID of owner""" 

218 for key in ["uid", "owner", "uname", "unix.owner"]: 

219 try: 

220 return int(self._info[key]) 

221 except (ValueError, TypeError, KeyError): 

222 pass 

223 return self._seq[4] 

224 

225 @property 

226 def st_gid(self) -> int: 

227 """group ID of owner""" 

228 for key in ["gid", "group", "gname", "unix.group"]: 

229 try: 

230 return int(self._info[key]) 

231 except (ValueError, TypeError, KeyError): 

232 pass 

233 return self._seq[5] 

234 

235 @property 

236 def st_size(self) -> int: 

237 """total size, in bytes""" 

238 try: 

239 return int(self._info["size"]) 

240 except (ValueError, TypeError, KeyError): 

241 return self._seq[6] 

242 

243 @property 

244 def st_atime(self) -> int | float: 

245 """time of last access""" 

246 for key in ["atime", "time", "last_accessed", "accessTime"]: 

247 try: 

248 raw_value = self._info[key] 

249 except KeyError: 

250 continue 

251 try: 

252 return _convert_value_to_timestamp(raw_value) 

253 except (TypeError, ValueError): 

254 pass 

255 return self._seq[7] 

256 

257 @property 

258 def st_mtime(self) -> int | float: 

259 """time of last modification""" 

260 for key in [ 

261 "mtime", 

262 "LastModified", 

263 "last_modified", 

264 "timeModified", 

265 "modificationTime", 

266 "modified_at", 

267 "modify", 

268 ]: 

269 try: 

270 raw_value = self._info[key] 

271 except KeyError: 

272 continue 

273 try: 

274 return _convert_value_to_timestamp(raw_value) 

275 except (TypeError, ValueError): 

276 pass 

277 return self._seq[8] 

278 

279 @property 

280 def st_ctime(self) -> int | float: 

281 """time of last change""" 

282 try: 

283 raw_value = self._info["ctime"] 

284 except KeyError: 

285 pass 

286 else: 

287 try: 

288 return _convert_value_to_timestamp(raw_value) 

289 except (TypeError, ValueError): 

290 pass 

291 return self._seq[9] 

292 

293 @property 

294 def st_birthtime(self) -> int | float: 

295 """time of creation""" 

296 for key in [ 

297 "birthtime", 

298 "created", 

299 "creation_time", 

300 "timeCreated", 

301 "created_at", 

302 ]: 

303 try: 

304 raw_value = self._info[key] 

305 except KeyError: 

306 continue 

307 try: 

308 return _convert_value_to_timestamp(raw_value) 

309 except (TypeError, ValueError): 

310 pass 

311 raise AttributeError("birthtime") 

312 

313 @property 

314 def st_atime_ns(self) -> int: 

315 """time of last access in nanoseconds""" 

316 try: 

317 return int(self._info["atime_ns"]) 

318 except KeyError: 

319 pass 

320 atime = self.st_atime 

321 if isinstance(atime, float): 

322 return int(atime * 1e9) 

323 return atime * 1_000_000_000 

324 

325 @property 

326 def st_mtime_ns(self) -> int: 

327 """time of last modification in nanoseconds""" 

328 try: 

329 return int(self._info["mtime_ns"]) 

330 except KeyError: 

331 pass 

332 mtime = self.st_mtime 

333 if isinstance(mtime, float): 

334 return int(mtime * 1e9) 

335 return mtime * 1_000_000_000 

336 

337 @property 

338 def st_ctime_ns(self) -> int: 

339 """time of last change in nanoseconds""" 

340 try: 

341 return int(self._info["ctime_ns"]) 

342 except KeyError: 

343 pass 

344 ctime = self.st_ctime 

345 if isinstance(ctime, float): 

346 return int(ctime * 1e9) 

347 return ctime * 1_000_000_000 

348 

349 # --- extra fields ------------------------------------------------ 

350 

351 def __getattr__(self, item): 

352 if item in self._fields_extra: 

353 return 0 # fallback default value 

354 raise AttributeError(item) 

355 

356 # --- os.stat_result tuple interface ------------------------------ 

357 

358 def __len__(self) -> int: 

359 return len(self._fields) 

360 

361 def __iter__(self) -> Iterator[int]: 

362 """the sequence interface iterates over the guaranteed fields. 

363 

364 All values are integers. 

365 """ 

366 for field in self._fields: 

367 yield int(getattr(self, field)) 

368 

369 def index(self, value: int, start: int = 0, stop: int | None = None, /) -> int: 

370 """the sequence interface index method.""" 

371 if stop is None: 

372 stop = len(self._seq) 

373 return self._seq.index(value, start, stop) 

374 

375 def count(self, value: int) -> int: 

376 """the sequence interface count method.""" 

377 return self._seq.count(value) 

378 

379 # --- compatibility with the fsspec info dict interface ------------ 

380 

381 def __getitem__(self, item: int | str) -> Any: 

382 if isinstance(item, str): 

383 warnings.warn( 

384 "Access the fsspec info via `.as_info()[key]`", 

385 DeprecationWarning, 

386 stacklevel=2, 

387 ) 

388 return self._info[item] 

389 # we need to go via the attributes and cast to int 

390 attr = self._fields[item] 

391 return int(getattr(self, attr)) 

392 

393 def keys(self): 

394 """compatibility with the fsspec info dict interface.""" 

395 warnings.warn( 

396 "Access the fsspec info via `.as_info().keys()`", 

397 DeprecationWarning, 

398 stacklevel=2, 

399 ) 

400 return self._info.keys() 

401 

402 def values(self): 

403 """compatibility with the fsspec info dict interface.""" 

404 warnings.warn( 

405 "Access the fsspec info via `.as_info().values()`", 

406 DeprecationWarning, 

407 stacklevel=2, 

408 ) 

409 return self._info.values() 

410 

411 def items(self): 

412 """compatibility with the fsspec info dict interface.""" 

413 warnings.warn( 

414 "Access the fsspec info via `.as_info().items()`", 

415 DeprecationWarning, 

416 stacklevel=2, 

417 ) 

418 return self._info.items() 

419 

420 def get(self, key, default=None): 

421 """compatibility with the fsspec info dict interface.""" 

422 warnings.warn( 

423 "Access the fsspec info via `.as_info().get(key, default)`", 

424 DeprecationWarning, 

425 stacklevel=2, 

426 ) 

427 return self._info.get(key, default) 

428 

429 def copy(self): 

430 """compatibility with the fsspec info dict interface.""" 

431 warnings.warn( 

432 "Access the fsspec info via `.as_info().copy()`", 

433 DeprecationWarning, 

434 stacklevel=2, 

435 ) 

436 return self._info.copy()