Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/upath/_stat.py: 33%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

204 statements  

1from __future__ import annotations 

2 

3import os 

4import warnings 

5from datetime import datetime 

6from stat import S_IFDIR 

7from stat import S_IFLNK 

8from stat import S_IFREG 

9from typing import Any 

10from typing import Iterator 

11from typing import Mapping 

12from typing import Sequence 

13 

14__all__ = [ 

15 "UPathStatResult", 

16] 

17 

18 

def _convert_value_to_timestamp(value: Any) -> int | float:
    """Turn a number, ISO-8601 string or ``datetime`` into a timestamp.

    Numbers are handed back unchanged. Strings are parsed with
    ``datetime.fromisoformat`` after a trailing ``"Z"`` has been rewritten
    to ``"+00:00"``. ``datetime`` objects are converted via ``.timestamp()``.

    Raises:
        TypeError: for any other type; a ``RuntimeWarning`` asking the user
            to report the value is emitted first.
        ValueError: propagated from ``datetime.fromisoformat`` when a string
            cannot be parsed.
    """
    if isinstance(value, datetime):
        return value.timestamp()

    if isinstance(value, (int, float)):
        # already numeric, nothing to convert
        return value

    if isinstance(value, str):
        # swap the "Z" suffix for an explicit UTC offset before parsing
        iso_text = value[:-1] + "+00:00" if value.endswith("Z") else value
        return datetime.fromisoformat(iso_text).timestamp()

    # unsupported type: warn (so it gets reported) and then fail
    warnings.warn(
        f"Cannot convert {value!r} of type {type(value)!r} to a timestamp."
        " Please report this at: https://github.com/fsspec/universal_path/issues",
        RuntimeWarning,
        stacklevel=2,
    )
    raise TypeError(f"Cannot convert {value!r} to a timestamp.")

37 

38 

def _get_stat_result_extra_fields() -> tuple[str, ...]:
    """Return the names of the extra fields of ``os.stat_result``.

    The names are ordered by the internal index of each field.
    """
    # Build a stat_result whose n-th slot holds the value n. Its __reduce__
    # output carries a mapping of extra field name -> slot value, which lets
    # us read back each extra field's internal index.
    probe = os.stat_result(range(os.stat_result.n_fields))
    reduced = probe.__reduce__()
    assert isinstance(reduced, tuple), "unexpected return os.stat_result.__reduce__"
    _constructor, constructor_args = reduced
    _visible_values, index_by_name = constructor_args
    ordered_names = sorted(index_by_name, key=lambda name: index_by_name[name])
    return tuple(ordered_names)

51 

52 

class UPathStatResult:
    """A stat_result compatible class wrapping fsspec info dicts.

    **Note**: It is unlikely that you will ever have to instantiate
    this class directly. If you want to convert an info dict,
    use: `UPathStatResult.from_info(info)`

    This object may be accessed either as a tuple of
      (mode, ino, dev, nlink, uid, gid, size, atime, mtime, ctime)
    or via the attributes st_mode, st_ino, st_dev, st_nlink, st_uid, and so on.

    There's an additional method `as_info()` for accessing the info dict.
    This is useful to access additional information provided by the file system
    implementation, that's not covered by the stat_result tuple.

    """

    __slots__ = ("_seq", "_info")
    # Note:
    #   can't derive from os.stat_result at all, and can't derive from
    #   tuple and have slots. So we duck type the os.stat_result class

    # Add the fields and "extra fields" of the os.stat_result class
    _fields = (
        "st_mode",
        "st_ino",
        "st_dev",
        "st_nlink",
        "st_uid",
        "st_gid",
        "st_size",
        "st_atime",
        "st_mtime",
        "st_ctime",
    )
    _fields_extra = _get_stat_result_extra_fields()

    # Provide the n_ attributes of the os.stat_result class for compatibility
    n_sequence_fields = len(_fields)
    n_fields = len(_fields) + len(_fields_extra)
    n_unnamed_fields = len(set(_fields_extra).intersection(_fields))

    # Sanity check at class creation: warn if our bookkeeping disagrees with
    # what the running interpreter reports for os.stat_result.
    if (
        n_fields != os.stat_result.n_fields
        or n_sequence_fields != os.stat_result.n_sequence_fields
        or n_unnamed_fields != os.stat_result.n_unnamed_fields
    ):
        warnings.warn(
            "UPathStatResult: The assumed number of fields in the"
            " stat_result class is not correct. Got: "
            f" {_fields!r}, {_fields_extra!r}, {os.stat_result.n_fields}"
            " This might cause problems? Please report this issue at:"
            " https://github.com/fsspec/universal_path/issues",
            RuntimeWarning,
            stacklevel=2,
        )

    def __init__(
        self,
        stat_result_seq: Sequence[int],
        info_dict: Mapping[str, Any] | None = None,
    ) -> None:
        """init compatible with os.stat_result

        Use `UPathStatResult.from_info(info)` to instantiate from a fsspec info.

        Args:
            stat_result_seq: fallback values for the sequence fields. Must
                hold between `n_sequence_fields` and `n_fields` items; items
                past the first `n_sequence_fields` are dropped with a
                `UserWarning`.
            info_dict: the fsspec info mapping; a falsy value is replaced by
                an empty dict.

        Raises:
            TypeError: if the sequence is shorter than `n_sequence_fields`
                or longer than `n_fields`.
        """
        seq = tuple(stat_result_seq)
        # Compute the length on its own line: `n := len(seq) < x` would bind
        # the boolean result of the comparison, not the length.
        n = len(seq)
        # Instances have no `__name__`; the class does.
        cls_name = type(self).__name__
        if n < self.n_sequence_fields:
            raise TypeError(
                f"{cls_name} takes at least {self.n_sequence_fields}-sequence"
                f" ({n}-sequence given)"
            )
        elif n > self.n_fields:
            raise TypeError(
                f"{cls_name} takes at most {self.n_fields}-sequence"
                f" ({n}-sequence given)"
            )
        elif n > self.n_sequence_fields:
            warnings.warn(
                "UPathStatResult: The seq provided more than"
                f" {self.n_sequence_fields} items. Ignoring the extra items...",
                UserWarning,
                stacklevel=2,
            )
        self._seq = seq[: self.n_sequence_fields]
        self._info = info_dict or {}

    def __repr__(self):
        """Show the sequence fields as `name=value` pairs plus the info dict."""
        cls_name = type(self).__name__
        seq_attrs = ", ".join(map("{0[0]}={0[1]}".format, zip(self._fields, self)))
        return f"{cls_name}({seq_attrs}, info={self._info!r})"

    def __eq__(self, other):
        """Two stat results are equal when their info dicts are equal."""
        if not isinstance(other, UPathStatResult):
            return NotImplemented
        else:
            return self._info == other._info

    # --- access to the fsspec info dict ------------------------------

    @classmethod
    def from_info(cls, info: Mapping[str, Any]) -> UPathStatResult:
        """Create a UPathStatResult from a fsspec info dict."""
        # fill all the fallback default values with 0
        defaults = [0] * cls.n_sequence_fields
        return cls(defaults, info)

    def as_info(self) -> Mapping[str, Any]:
        """Return the fsspec info dict."""
        return self._info

    # --- guaranteed fields -------------------------------------------

    @property
    def st_mode(self) -> int:
        """protection bits"""
        mode = self._info.get("mode")
        if isinstance(mode, int):
            return mode
        elif isinstance(mode, str):
            # string modes are interpreted as octal, e.g. "755"
            try:
                return int(mode, 8)
            except ValueError:
                pass

        # no usable "mode": derive the file-type bits from other info keys
        type_ = self._info.get("type")
        if type_ == "file":
            return S_IFREG  # see: stat.S_ISREG
        elif type_ == "directory":
            return S_IFDIR  # see: stat.S_ISDIR

        if self._info.get("isLink"):
            return S_IFLNK  # see: stat.S_ISLNK

        return self._seq[0]

    @property
    def st_ino(self) -> int:
        """inode"""
        ino = self._info.get("ino")
        if isinstance(ino, int):
            return ino
        return self._seq[1]

    @property
    def st_dev(self) -> int:
        """device"""
        dev = self._info.get("dev")
        if isinstance(dev, int):
            return dev
        return self._seq[2]

    @property
    def st_nlink(self) -> int:
        """number of hard links"""
        nlink = self._info.get("nlink")
        if isinstance(nlink, int):
            return nlink
        return self._seq[3]

    @property
    def st_uid(self) -> int:
        """user ID of owner"""
        # first key whose value converts to int wins
        for key in ["uid", "owner", "uname", "unix.owner"]:
            try:
                return int(self._info[key])
            except (ValueError, TypeError, KeyError):
                pass
        return self._seq[4]

    @property
    def st_gid(self) -> int:
        """group ID of owner"""
        # first key whose value converts to int wins
        for key in ["gid", "group", "gname", "unix.group"]:
            try:
                return int(self._info[key])
            except (ValueError, TypeError, KeyError):
                pass
        return self._seq[5]

    @property
    def st_size(self) -> int:
        """total size, in bytes"""
        try:
            return int(self._info["size"])
        except (ValueError, TypeError, KeyError):
            return self._seq[6]

    @property
    def st_atime(self) -> int | float:
        """time of last access"""
        for key in ["atime", "time", "last_accessed", "accessTime"]:
            try:
                raw_value = self._info[key]
            except KeyError:
                continue
            try:
                return _convert_value_to_timestamp(raw_value)
            except (TypeError, ValueError):
                # unconvertible value: try the next candidate key
                pass
        return self._seq[7]

    @property
    def st_mtime(self) -> int | float:
        """time of last modification"""
        for key in [
            "mtime",
            "LastModified",
            "last_modified",
            "timeModified",
            "modificationTime",
            "modified_at",
        ]:
            try:
                raw_value = self._info[key]
            except KeyError:
                continue
            try:
                return _convert_value_to_timestamp(raw_value)
            except (TypeError, ValueError):
                # unconvertible value: try the next candidate key
                pass
        return self._seq[8]

    @property
    def st_ctime(self) -> int | float:
        """time of last change"""
        try:
            raw_value = self._info["ctime"]
        except KeyError:
            pass
        else:
            try:
                return _convert_value_to_timestamp(raw_value)
            except (TypeError, ValueError):
                pass
        return self._seq[9]

    @property
    def st_birthtime(self) -> int | float:
        """time of creation

        Raises AttributeError when no candidate key yields a timestamp.
        """
        for key in [
            "birthtime",
            "created",
            "creation_time",
            "timeCreated",
            "created_at",
        ]:
            try:
                raw_value = self._info[key]
            except KeyError:
                continue
            try:
                return _convert_value_to_timestamp(raw_value)
            except (TypeError, ValueError):
                # unconvertible value: try the next candidate key
                pass
        raise AttributeError("birthtime")

    # --- extra fields ------------------------------------------------

    def __getattr__(self, item):
        """Fallback for attributes not found by normal lookup.

        Names listed in `_fields_extra` resolve to 0; anything else raises
        AttributeError.
        """
        if item in self._fields_extra:
            return 0  # fallback default value
        raise AttributeError(item)

    # --- os.stat_result tuple interface ------------------------------

    def __len__(self) -> int:
        """Number of sequence fields."""
        return len(self._fields)

    def __iter__(self) -> Iterator[int]:
        """the sequence interface iterates over the guaranteed fields.

        All values are integers.
        """
        for field in self._fields:
            yield int(getattr(self, field))

    def index(self, value: int, start: int = 0, stop: int | None = None, /) -> int:
        """the sequence interface index method.

        Searches the same integer values that iteration yields, so the
        result agrees with `tuple(self).index(...)`.
        """
        # Materialize the exposed values; `_seq` only holds the fallbacks.
        values = tuple(self)
        if stop is None:
            stop = len(values)
        return values.index(value, start, stop)

    def count(self, value: int) -> int:
        """the sequence interface count method.

        Counts over the same integer values that iteration yields.
        """
        return tuple(self).count(value)

    # --- compatibility with the fsspec info dict interface ------------

    def __getitem__(self, item: int | slice | str) -> Any:
        """Index like a tuple (int or slice) or, deprecated, like the info dict.

        String keys emit a DeprecationWarning and are looked up in the info
        dict. Integer indices return the matching field cast to int. Slices
        return a tuple of the selected field values.
        """
        if isinstance(item, str):
            warnings.warn(
                "Access the fsspec info via `.as_info()[key]`",
                DeprecationWarning,
                stacklevel=2,
            )
            return self._info[item]
        if isinstance(item, slice):
            # os.stat_result supports slicing; mirror that on the int values
            return tuple(self)[item]
        # we need to go via the attributes and cast to int
        attr = self._fields[item]
        return int(getattr(self, attr))

    def keys(self):
        """compatibility with the fsspec info dict interface."""
        warnings.warn(
            "Access the fsspec info via `.as_info().keys()`",
            DeprecationWarning,
            stacklevel=2,
        )
        return self._info.keys()

    def values(self):
        """compatibility with the fsspec info dict interface."""
        warnings.warn(
            "Access the fsspec info via `.as_info().values()`",
            DeprecationWarning,
            stacklevel=2,
        )
        return self._info.values()

    def items(self):
        """compatibility with the fsspec info dict interface."""
        warnings.warn(
            "Access the fsspec info via `.as_info().items()`",
            DeprecationWarning,
            stacklevel=2,
        )
        return self._info.items()

    def get(self, key, default=None):
        """compatibility with the fsspec info dict interface."""
        warnings.warn(
            "Access the fsspec info via `.as_info().get(key, default)`",
            DeprecationWarning,
            stacklevel=2,
        )
        return self._info.get(key, default)

    def copy(self):
        """compatibility with the fsspec info dict interface."""
        warnings.warn(
            "Access the fsspec info via `.as_info().copy()`",
            DeprecationWarning,
            stacklevel=2,
        )
        return self._info.copy()