Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/pathlib_abc/__init__.py: 37%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

263 statements  

1""" 

2Protocols for supporting classes in pathlib. 

3""" 

4 

5# This module also provides abstract base classes for rich path objects. 

6# These ABCs are a *private* part of the Python standard library, but they're 

7# made available as a PyPI package called "pathlib-abc". It's possible they'll 

8# become an official part of the standard library in future. 

9# 

10# Three ABCs are provided -- _JoinablePath, _ReadablePath and _WritablePath 

11 

12 

13from abc import ABC, abstractmethod 

14from pathlib_abc._glob import _GlobberBase 

15from pathlib_abc._os import ( 

16 copyfileobj, ensure_different_files, 

17 ensure_distinct_paths, vfsopen, vfspath) 

18from typing import Optional, Protocol, runtime_checkable 

19try: 

20 from io import text_encoding 

21except ImportError: 

22 def text_encoding(encoding): 

23 return encoding 

24 

25 

26__all__ = ['PathParser', 'PathInfo', 'JoinablePath', 'ReadablePath', 'WritablePath', 'vfsopen', 'vfspath'] 

27 

28 

29def _explode_path(path, split): 

30 """ 

31 Split the path into a 2-tuple (anchor, parts), where *anchor* is the 

32 uppermost parent of the path (equivalent to path.parents[-1]), and 

33 *parts* is a reversed list of parts following the anchor. 

34 """ 

35 parent, name = split(path) 

36 names = [] 

37 while path != parent: 

38 names.append(name) 

39 path = parent 

40 parent, name = split(path) 

41 return path, names 

42 

43 

44@runtime_checkable 

45class PathParser(Protocol): 

46 """Protocol for path parsers, which do low-level path manipulation. 

47 

48 Path parsers provide a subset of the os.path API, specifically those 

49 functions needed to provide JoinablePath functionality. Each JoinablePath 

50 subclass references its path parser via a 'parser' class attribute. 

51 """ 

52 

53 sep: str 

54 altsep: Optional[str] 

55 def split(self, path: str) -> tuple[str, str]: ... 

56 def splitext(self, path: str) -> tuple[str, str]: ... 

57 def normcase(self, path: str) -> str: ... 

58 

59 

60@runtime_checkable 

61class PathInfo(Protocol): 

62 """Protocol for path info objects, which support querying the file type. 

63 Methods may return cached results. 

64 """ 

65 def exists(self, *, follow_symlinks: bool = True) -> bool: ... 

66 def is_dir(self, *, follow_symlinks: bool = True) -> bool: ... 

67 def is_file(self, *, follow_symlinks: bool = True) -> bool: ... 

68 def is_symlink(self) -> bool: ... 

69 

70 

71class _PathGlobber(_GlobberBase): 

72 """Provides shell-style pattern matching and globbing for ReadablePath. 

73 """ 

74 

75 @staticmethod 

76 def lexists(path): 

77 return path.info.exists(follow_symlinks=False) 

78 

79 @staticmethod 

80 def scandir(path): 

81 return ((child.info, child.name, child) for child in path.iterdir()) 

82 

83 @staticmethod 

84 def concat_path(path, text): 

85 return path.with_segments(vfspath(path) + text) 

86 

87 stringify_path = staticmethod(vfspath) 

88 

89 

90class JoinablePath(ABC): 

91 """Abstract base class for pure path objects. 

92 

93 This class *does not* provide several magic methods that are defined in 

94 its implementation PurePath. They are: __init__, __fspath__, __bytes__, 

95 __reduce__, __hash__, __eq__, __lt__, __le__, __gt__, __ge__. 

96 """ 

97 __slots__ = () 

98 

99 @property 

100 @abstractmethod 

101 def parser(self): 

102 """Implementation of pathlib._types.Parser used for low-level path 

103 parsing and manipulation. 

104 """ 

105 raise NotImplementedError 

106 

107 @abstractmethod 

108 def with_segments(self, *pathsegments): 

109 """Construct a new path object from any number of path-like objects. 

110 Subclasses may override this method to customize how new path objects 

111 are created from methods like `iterdir()`. 

112 """ 

113 raise NotImplementedError 

114 

115 @abstractmethod 

116 def __vfspath__(self): 

117 """Return the string representation of the path.""" 

118 raise NotImplementedError 

119 

120 @property 

121 def anchor(self): 

122 """The concatenation of the drive and root, or ''.""" 

123 return _explode_path(vfspath(self), self.parser.split)[0] 

124 

125 @property 

126 def name(self): 

127 """The final path component, if any.""" 

128 return self.parser.split(vfspath(self))[1] 

129 

130 @property 

131 def suffix(self): 

132 """ 

133 The final component's last suffix, if any. 

134 

135 This includes the leading period. For example: '.txt' 

136 """ 

137 return self.parser.splitext(self.name)[1] 

138 

139 @property 

140 def suffixes(self): 

141 """ 

142 A list of the final component's suffixes, if any. 

143 

144 These include the leading periods. For example: ['.tar', '.gz'] 

145 """ 

146 split = self.parser.splitext 

147 stem, suffix = split(self.name) 

148 suffixes = [] 

149 while suffix: 

150 suffixes.append(suffix) 

151 stem, suffix = split(stem) 

152 return suffixes[::-1] 

153 

154 @property 

155 def stem(self): 

156 """The final path component, minus its last suffix.""" 

157 return self.parser.splitext(self.name)[0] 

158 

159 def with_name(self, name): 

160 """Return a new path with the file name changed.""" 

161 split = self.parser.split 

162 if split(name)[0]: 

163 raise ValueError(f"Invalid name {name!r}") 

164 path = vfspath(self) 

165 path = path.removesuffix(split(path)[1]) + name 

166 return self.with_segments(path) 

167 

168 def with_stem(self, stem): 

169 """Return a new path with the stem changed.""" 

170 suffix = self.suffix 

171 if not suffix: 

172 return self.with_name(stem) 

173 elif not stem: 

174 # If the suffix is non-empty, we can't make the stem empty. 

175 raise ValueError(f"{self!r} has a non-empty suffix") 

176 else: 

177 return self.with_name(stem + suffix) 

178 

179 def with_suffix(self, suffix): 

180 """Return a new path with the file suffix changed. If the path 

181 has no suffix, add given suffix. If the given suffix is an empty 

182 string, remove the suffix from the path. 

183 """ 

184 stem = self.stem 

185 if not stem: 

186 # If the stem is empty, we can't make the suffix non-empty. 

187 raise ValueError(f"{self!r} has an empty name") 

188 elif suffix and not suffix.startswith('.'): 

189 raise ValueError(f"Invalid suffix {suffix!r}") 

190 else: 

191 return self.with_name(stem + suffix) 

192 

193 @property 

194 def parts(self): 

195 """An object providing sequence-like access to the 

196 components in the filesystem path.""" 

197 anchor, parts = _explode_path(vfspath(self), self.parser.split) 

198 if anchor: 

199 parts.append(anchor) 

200 return tuple(reversed(parts)) 

201 

202 def joinpath(self, *pathsegments): 

203 """Combine this path with one or several arguments, and return a 

204 new path representing either a subpath (if all arguments are relative 

205 paths) or a totally different path (if one of the arguments is 

206 anchored). 

207 """ 

208 return self.with_segments(vfspath(self), *pathsegments) 

209 

210 def __truediv__(self, key): 

211 try: 

212 return self.with_segments(vfspath(self), key) 

213 except TypeError: 

214 return NotImplemented 

215 

216 def __rtruediv__(self, key): 

217 try: 

218 return self.with_segments(key, vfspath(self)) 

219 except TypeError: 

220 return NotImplemented 

221 

222 @property 

223 def parent(self): 

224 """The logical parent of the path.""" 

225 path = vfspath(self) 

226 parent = self.parser.split(path)[0] 

227 if path != parent: 

228 return self.with_segments(parent) 

229 return self 

230 

231 @property 

232 def parents(self): 

233 """A sequence of this path's logical parents.""" 

234 split = self.parser.split 

235 path = vfspath(self) 

236 parent = split(path)[0] 

237 parents = [] 

238 while path != parent: 

239 parents.append(self.with_segments(parent)) 

240 path = parent 

241 parent = split(path)[0] 

242 return tuple(parents) 

243 

244 def relative_to(self, other, *, walk_up=False): 

245 """Return the relative path to another path identified by the passed 

246 arguments. If the operation is not possible (because this is not 

247 related to the other path), raise ValueError. 

248 

249 The *walk_up* parameter controls whether `..` may be used to resolve 

250 the path. 

251 """ 

252 parts = [] 

253 for path in (other,) + other.parents: 

254 if self.is_relative_to(path): 

255 break 

256 elif not walk_up: 

257 raise ValueError(f"{self!r} is not in the subpath of {other!r}") 

258 elif path.name == '..': 

259 raise ValueError(f"'..' segment in {other!r} cannot be walked") 

260 else: 

261 parts.append('..') 

262 else: 

263 raise ValueError(f"{self!r} and {other!r} have different anchors") 

264 return self.with_segments(*parts, *self.parts[len(path.parts):]) 

265 

266 def is_relative_to(self, other): 

267 """Return True if the path is relative to another path or False. 

268 """ 

269 return other == self or other in self.parents 

270 

271 def full_match(self, pattern): 

272 """ 

273 Return True if this path matches the given glob-style pattern. The 

274 pattern is matched against the entire path. 

275 """ 

276 case_sensitive = self.parser.normcase('Aa') == 'Aa' 

277 globber = _PathGlobber(self.parser.sep, case_sensitive, recursive=True) 

278 match = globber.compile(pattern, altsep=self.parser.altsep) 

279 return match(vfspath(self)) is not None 

280 

281 

282class ReadablePath(JoinablePath): 

283 """Abstract base class for readable path objects. 

284 

285 The Path class implements this ABC for local filesystem paths. Users may 

286 create subclasses to implement readable virtual filesystem paths, such as 

287 paths in archive files or on remote storage systems. 

288 """ 

289 __slots__ = () 

290 

291 @property 

292 @abstractmethod 

293 def info(self): 

294 """ 

295 A PathInfo object that exposes the file type and other file attributes 

296 of this path. 

297 """ 

298 raise NotImplementedError 

299 

300 @abstractmethod 

301 def __open_reader__(self): 

302 """ 

303 Open the file pointed to by this path for reading in binary mode and 

304 return a file object. 

305 """ 

306 raise NotImplementedError 

307 

308 def read_bytes(self): 

309 """ 

310 Open the file in bytes mode, read it, and close the file. 

311 """ 

312 with vfsopen(self, mode='rb') as f: 

313 return f.read() 

314 

315 def read_text(self, encoding=None, errors=None, newline=None): 

316 """ 

317 Open the file in text mode, read it, and close the file. 

318 """ 

319 # Call io.text_encoding() here to ensure any warning is raised at an 

320 # appropriate stack level. 

321 encoding = text_encoding(encoding) 

322 with vfsopen(self, mode='r', encoding=encoding, errors=errors, newline=newline) as f: 

323 return f.read() 

324 

325 @abstractmethod 

326 def iterdir(self): 

327 """Yield path objects of the directory contents. 

328 

329 The children are yielded in arbitrary order, and the 

330 special entries '.' and '..' are not included. 

331 """ 

332 raise NotImplementedError 

333 

334 def glob(self, pattern, *, recurse_symlinks=True): 

335 """Iterate over this subtree and yield all existing files (of any 

336 kind, including directories) matching the given relative pattern. 

337 """ 

338 anchor, parts = _explode_path(pattern, self.parser.split) 

339 if anchor: 

340 raise NotImplementedError("Non-relative patterns are unsupported") 

341 elif not parts: 

342 raise ValueError(f"Unacceptable pattern: {pattern!r}") 

343 elif not recurse_symlinks: 

344 raise NotImplementedError("recurse_symlinks=False is unsupported") 

345 case_sensitive = self.parser.normcase('Aa') == 'Aa' 

346 globber = _PathGlobber(self.parser.sep, case_sensitive, recursive=True) 

347 select = globber.selector(parts) 

348 return select(self.joinpath('')) 

349 

350 def walk(self, top_down=True, on_error=None, follow_symlinks=False): 

351 """Walk the directory tree from this directory, similar to os.walk().""" 

352 paths = [self] 

353 while paths: 

354 path = paths.pop() 

355 if isinstance(path, tuple): 

356 yield path 

357 continue 

358 dirnames = [] 

359 filenames = [] 

360 if not top_down: 

361 paths.append((path, dirnames, filenames)) 

362 try: 

363 for child in path.iterdir(): 

364 if child.info.is_dir(follow_symlinks=follow_symlinks): 

365 if not top_down: 

366 paths.append(child) 

367 dirnames.append(child.name) 

368 else: 

369 filenames.append(child.name) 

370 except OSError as error: 

371 if on_error is not None: 

372 on_error(error) 

373 if not top_down: 

374 while not isinstance(paths.pop(), tuple): 

375 pass 

376 continue 

377 if top_down: 

378 yield path, dirnames, filenames 

379 paths += [path.joinpath(d) for d in reversed(dirnames)] 

380 

381 @abstractmethod 

382 def readlink(self): 

383 """ 

384 Return the path to which the symbolic link points. 

385 """ 

386 raise NotImplementedError 

387 

388 def copy(self, target, **kwargs): 

389 """ 

390 Recursively copy this file or directory tree to the given destination. 

391 """ 

392 ensure_distinct_paths(self, target) 

393 target._copy_from(self, **kwargs) 

394 return target.joinpath() # Empty join to ensure fresh metadata. 

395 

396 def copy_into(self, target_dir, **kwargs): 

397 """ 

398 Copy this file or directory tree into the given existing directory. 

399 """ 

400 name = self.name 

401 if not name: 

402 raise ValueError(f"{self!r} has an empty name") 

403 return self.copy(target_dir / name, **kwargs) 

404 

405 

406class WritablePath(JoinablePath): 

407 """Abstract base class for writable path objects. 

408 

409 The Path class implements this ABC for local filesystem paths. Users may 

410 create subclasses to implement writable virtual filesystem paths, such as 

411 paths in archive files or on remote storage systems. 

412 """ 

413 __slots__ = () 

414 

415 @abstractmethod 

416 def symlink_to(self, target, target_is_directory=False): 

417 """ 

418 Make this path a symlink pointing to the target path. 

419 Note the order of arguments (link, target) is the reverse of os.symlink. 

420 """ 

421 raise NotImplementedError 

422 

423 @abstractmethod 

424 def mkdir(self): 

425 """ 

426 Create a new directory at this given path. 

427 """ 

428 raise NotImplementedError 

429 

430 @abstractmethod 

431 def __open_writer__(self, mode): 

432 """ 

433 Open the file pointed to by this path for writing in binary mode and 

434 return a file object. 

435 """ 

436 raise NotImplementedError 

437 

438 def write_bytes(self, data): 

439 """ 

440 Open the file in bytes mode, write to it, and close the file. 

441 """ 

442 # type-check for the buffer interface before truncating the file 

443 view = memoryview(data) 

444 with vfsopen(self, mode='wb') as f: 

445 return f.write(view) 

446 

447 def write_text(self, data, encoding=None, errors=None, newline=None): 

448 """ 

449 Open the file in text mode, write to it, and close the file. 

450 """ 

451 # Call io.text_encoding() here to ensure any warning is raised at an 

452 # appropriate stack level. 

453 encoding = text_encoding(encoding) 

454 if not isinstance(data, str): 

455 raise TypeError('data must be str, not %s' % 

456 data.__class__.__name__) 

457 with vfsopen(self, mode='w', encoding=encoding, errors=errors, newline=newline) as f: 

458 return f.write(data) 

459 

460 def _copy_from(self, source, follow_symlinks=True): 

461 """ 

462 Recursively copy the given path to this path. 

463 """ 

464 stack = [(source, self)] 

465 while stack: 

466 src, dst = stack.pop() 

467 if not follow_symlinks and src.info.is_symlink(): 

468 dst.symlink_to(vfspath(src.readlink()), src.info.is_dir()) 

469 elif src.info.is_dir(): 

470 children = src.iterdir() 

471 dst.mkdir() 

472 for child in children: 

473 stack.append((child, dst.joinpath(child.name))) 

474 else: 

475 ensure_different_files(src, dst) 

476 with vfsopen(src, 'rb') as source_f: 

477 with vfsopen(dst, 'wb') as target_f: 

478 copyfileobj(source_f, target_f) 

479 

480 

481# For tests. 

482_PathParser = PathParser 

483_JoinablePath = JoinablePath 

484_ReadablePath = ReadablePath 

485_WritablePath = WritablePath