Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/anyio/_core/_tempfile.py: 41%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

188 statements  

1from __future__ import annotations 

2 

3import os 

4import sys 

5import tempfile 

6from collections.abc import Iterable 

7from io import BytesIO, TextIOWrapper 

8from types import TracebackType 

9from typing import ( 

10 TYPE_CHECKING, 

11 Any, 

12 AnyStr, 

13 Generic, 

14 overload, 

15) 

16 

17from .. import to_thread 

18from .._core._fileio import AsyncFile 

19from ..lowlevel import checkpoint_if_cancelled 

20 

21if TYPE_CHECKING: 

22 from _typeshed import OpenBinaryMode, OpenTextMode, ReadableBuffer, WriteableBuffer 

23 

24 

class TemporaryFile(Generic[AnyStr]):
    """
    Asynchronous context manager wrapping :func:`tempfile.TemporaryFile`.

    Entering the context creates the temporary file in a background thread and
    returns it wrapped in an :class:`AsyncFile`. Leaving the context closes that
    asynchronous file.

    :param mode: The mode in which the file is opened. Defaults to "w+b".
    :param buffering: The buffering policy (-1 means the default buffering).
    :param encoding: The encoding used to decode or encode the file. Only applicable in
        text mode.
    :param newline: Controls how universal newlines mode works (only applicable in text
        mode).
    :param suffix: The suffix for the temporary file name.
    :param prefix: The prefix for the temporary file name.
    :param dir: The directory in which the temporary file is created.
    :param errors: The error handling scheme used for encoding/decoding errors.
    """

    _async_file: AsyncFile[AnyStr]

    @overload
    def __init__(
        self: TemporaryFile[bytes],
        mode: OpenBinaryMode = ...,
        buffering: int = ...,
        encoding: str | None = ...,
        newline: str | None = ...,
        suffix: str | None = ...,
        prefix: str | None = ...,
        dir: str | None = ...,
        *,
        errors: str | None = ...,
    ): ...
    @overload
    def __init__(
        self: TemporaryFile[str],
        mode: OpenTextMode,
        buffering: int = ...,
        encoding: str | None = ...,
        newline: str | None = ...,
        suffix: str | None = ...,
        prefix: str | None = ...,
        dir: str | None = ...,
        *,
        errors: str | None = ...,
    ): ...

    def __init__(
        self,
        mode: OpenTextMode | OpenBinaryMode = "w+b",
        buffering: int = -1,
        encoding: str | None = None,
        newline: str | None = None,
        suffix: str | None = None,
        prefix: str | None = None,
        dir: str | None = None,
        *,
        errors: str | None = None,
    ) -> None:
        # Nothing is opened here: the settings are only recorded so that
        # __aenter__ can create the file later.
        self.suffix: str | None = suffix
        self.prefix: str | None = prefix
        self.dir: str | None = dir
        self.mode = mode
        self.buffering = buffering
        self.encoding = encoding
        self.newline = newline
        self.errors = errors

    async def __aenter__(self) -> AsyncFile[AnyStr]:
        """Create the temporary file and return it wrapped in an ``AsyncFile``."""

        def open_tempfile() -> Any:
            # The recorded settings are looked up when this callable runs.
            return tempfile.TemporaryFile(
                mode=self.mode,
                buffering=self.buffering,
                encoding=self.encoding,
                newline=self.newline,
                suffix=self.suffix,
                prefix=self.prefix,
                dir=self.dir,
                errors=self.errors,
            )

        raw_file = await to_thread.run_sync(open_tempfile)
        self._async_file = AsyncFile(raw_file)
        return self._async_file

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc_value: BaseException | None,
        traceback: TracebackType | None,
    ) -> None:
        """Close the wrapped file; the exception details are not inspected."""
        await self._async_file.aclose()

118 

119 

class NamedTemporaryFile(Generic[AnyStr]):
    """
    Asynchronous context manager wrapping :func:`~tempfile.NamedTemporaryFile`.

    Entering the context calls the standard library function in a background thread
    and returns the resulting file wrapped in an :class:`AsyncFile`. Leaving the
    context closes that asynchronous file.

    :param mode: The mode in which the file is opened. Defaults to "w+b".
    :param buffering: The buffering policy (-1 means the default buffering).
    :param encoding: The encoding used to decode or encode the file. Only applicable in
        text mode.
    :param newline: Controls how universal newlines mode works (only applicable in text
        mode).
    :param suffix: The suffix for the temporary file name.
    :param prefix: The prefix for the temporary file name.
    :param dir: The directory in which the temporary file is created.
    :param delete: Whether to delete the file when it is closed.
    :param errors: The error handling scheme used for encoding/decoding errors.
    :param delete_on_close: (Python 3.12+) Whether to delete the file on close.
    """

    _async_file: AsyncFile[AnyStr]

    @overload
    def __init__(
        self: NamedTemporaryFile[bytes],
        mode: OpenBinaryMode = ...,
        buffering: int = ...,
        encoding: str | None = ...,
        newline: str | None = ...,
        suffix: str | None = ...,
        prefix: str | None = ...,
        dir: str | None = ...,
        delete: bool = ...,
        *,
        errors: str | None = ...,
        delete_on_close: bool = ...,
    ): ...
    @overload
    def __init__(
        self: NamedTemporaryFile[str],
        mode: OpenTextMode,
        buffering: int = ...,
        encoding: str | None = ...,
        newline: str | None = ...,
        suffix: str | None = ...,
        prefix: str | None = ...,
        dir: str | None = ...,
        delete: bool = ...,
        *,
        errors: str | None = ...,
        delete_on_close: bool = ...,
    ): ...

    def __init__(
        self,
        mode: OpenBinaryMode | OpenTextMode = "w+b",
        buffering: int = -1,
        encoding: str | None = None,
        newline: str | None = None,
        suffix: str | None = None,
        prefix: str | None = None,
        dir: str | None = None,
        delete: bool = True,
        *,
        errors: str | None = None,
        delete_on_close: bool = True,
    ) -> None:
        # Keyword arguments forwarded verbatim to tempfile.NamedTemporaryFile.
        kwargs: dict[str, Any] = dict(
            mode=mode,
            buffering=buffering,
            encoding=encoding,
            newline=newline,
            suffix=suffix,
            prefix=prefix,
            dir=dir,
            delete=delete,
            errors=errors,
        )
        # delete_on_close is only forwarded on Python 3.12 and later; on older
        # versions the argument is dropped.
        if sys.version_info >= (3, 12):
            kwargs["delete_on_close"] = delete_on_close

        self._params: dict[str, Any] = kwargs

    async def __aenter__(self) -> AsyncFile[AnyStr]:
        """Create the named temporary file and return it as an ``AsyncFile``."""

        def open_named_tempfile() -> Any:
            return tempfile.NamedTemporaryFile(**self._params)

        raw_file = await to_thread.run_sync(open_named_tempfile)
        self._async_file = AsyncFile(raw_file)
        return self._async_file

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc_value: BaseException | None,
        traceback: TracebackType | None,
    ) -> None:
        """Close the wrapped file; the exception details are not inspected."""
        await self._async_file.aclose()

218 

219 

class SpooledTemporaryFile(AsyncFile[AnyStr]):
    """
    An asynchronous spooled temporary file that starts in memory and is spooled to disk.

    This class provides an asynchronous interface to a spooled temporary file, much like
    Python's standard :class:`~tempfile.SpooledTemporaryFile`. It supports asynchronous
    write operations and provides a method to force a rollover to disk.

    While the data lives in the in-memory buffer, operations are performed directly on
    that buffer after a cancellation checkpoint. Once the file has been rolled over,
    every operation is delegated to the :class:`AsyncFile` implementation.

    :param max_size: Maximum size in bytes before the file is rolled over to disk.
    :param mode: The mode in which the file is opened. Defaults to "w+b".
    :param buffering: The buffering policy (-1 means the default buffering).
    :param encoding: The encoding used to decode or encode the file (text mode only).
    :param newline: Controls how universal newlines mode works (text mode only).
    :param suffix: The suffix for the temporary file name.
    :param prefix: The prefix for the temporary file name.
    :param dir: The directory in which the temporary file is created.
    :param errors: The error handling scheme used for encoding/decoding errors.
    """

    # False while the contents are held in memory, True once rollover() has run.
    _rolled: bool = False

    @overload
    def __init__(
        self: SpooledTemporaryFile[bytes],
        max_size: int = ...,
        mode: OpenBinaryMode = ...,
        buffering: int = ...,
        encoding: str | None = ...,
        newline: str | None = ...,
        suffix: str | None = ...,
        prefix: str | None = ...,
        dir: str | None = ...,
        *,
        errors: str | None = ...,
    ): ...
    @overload
    def __init__(
        self: SpooledTemporaryFile[str],
        max_size: int = ...,
        mode: OpenTextMode = ...,
        buffering: int = ...,
        encoding: str | None = ...,
        newline: str | None = ...,
        suffix: str | None = ...,
        prefix: str | None = ...,
        dir: str | None = ...,
        *,
        errors: str | None = ...,
    ): ...

    def __init__(
        self,
        max_size: int = 0,
        mode: OpenBinaryMode | OpenTextMode = "w+b",
        buffering: int = -1,
        encoding: str | None = None,
        newline: str | None = None,
        suffix: str | None = None,
        prefix: str | None = None,
        dir: str | None = None,
        *,
        errors: str | None = None,
    ) -> None:
        # Keyword arguments used by rollover() to create the on-disk file.
        self._tempfile_params: dict[str, Any] = {
            "mode": mode,
            "buffering": buffering,
            "encoding": encoding,
            "newline": newline,
            "suffix": suffix,
            "prefix": prefix,
            "dir": dir,
            "errors": errors,
        }
        self._max_size = max_size
        # Start with an in-memory buffer: raw bytes in binary mode, or a text
        # wrapper around a bytes buffer in text mode.
        if "b" in mode:
            super().__init__(BytesIO())  # type: ignore[arg-type]
        else:
            super().__init__(
                TextIOWrapper(  # type: ignore[arg-type]
                    BytesIO(),
                    encoding=encoding,
                    errors=errors,
                    newline=newline,
                    write_through=True,
                )
            )

    async def aclose(self) -> None:
        """Close the file, closing the in-memory buffer directly if not rolled over."""
        if not self._rolled:
            self._fp.close()
            return

        await super().aclose()

    async def _check(self) -> None:
        """Roll over to disk once the buffer position reaches ``max_size``."""
        if self._rolled or self._fp.tell() < self._max_size:
            return

        await self.rollover()

    async def rollover(self) -> None:
        """
        Move the in-memory contents to a temporary file on disk.

        Does nothing if the file has already been rolled over. The whole buffer is
        copied from its start, so afterwards the file position is wherever that
        write leaves it rather than where it was before the call.

        """
        if self._rolled:
            return

        # Set the flag first so that the write() below is delegated to the
        # AsyncFile implementation and a repeated call returns early.
        self._rolled = True
        buffer = self._fp
        buffer.seek(0)
        self._fp = await to_thread.run_sync(
            lambda: tempfile.TemporaryFile(**self._tempfile_params)
        )
        await self.write(buffer.read())
        buffer.close()

    @property
    def closed(self) -> bool:
        """Whether the current underlying file object is closed."""
        return self._fp.closed

    async def read(self, size: int = -1) -> AnyStr:
        """Read up to ``size`` units from the file (everything if ``size`` is -1)."""
        if not self._rolled:
            await checkpoint_if_cancelled()
            return self._fp.read(size)

        return await super().read(size)  # type: ignore[return-value]

    async def read1(self: SpooledTemporaryFile[bytes], size: int = -1) -> bytes:
        """Call ``read1(size)`` on the underlying binary file."""
        if not self._rolled:
            await checkpoint_if_cancelled()
            return self._fp.read1(size)

        return await super().read1(size)

    async def readline(self) -> AnyStr:
        """Read and return one line from the file."""
        if not self._rolled:
            await checkpoint_if_cancelled()
            return self._fp.readline()

        return await super().readline()  # type: ignore[return-value]

    async def readlines(self) -> list[AnyStr]:
        """Read and return the remaining lines of the file as a list."""
        if not self._rolled:
            await checkpoint_if_cancelled()
            return self._fp.readlines()

        return await super().readlines()  # type: ignore[return-value]

    async def readinto(self: SpooledTemporaryFile[bytes], b: WriteableBuffer) -> int:
        """
        Read bytes into the pre-allocated buffer ``b``.

        :param b: The writable buffer to fill.
        :return: The value returned by the underlying ``readinto()`` call.

        """
        if not self._rolled:
            await checkpoint_if_cancelled()
            # Return here: falling through would read a second time and
            # overwrite the data just placed in ``b``.
            return self._fp.readinto(b)

        return await super().readinto(b)

    async def readinto1(self: SpooledTemporaryFile[bytes], b: WriteableBuffer) -> int:
        """
        Read bytes into the pre-allocated buffer ``b`` using ``readinto1()``.

        :param b: The writable buffer to fill.
        :return: The value returned by the underlying ``readinto1()`` call.

        """
        if not self._rolled:
            await checkpoint_if_cancelled()
            # Same call as the rolled-over branch makes, and return so the
            # buffer is only read once.
            return self._fp.readinto1(b)

        return await super().readinto1(b)

    async def seek(self, offset: int, whence: int | None = os.SEEK_SET) -> int:
        """Change the file position and return the value of the underlying seek."""
        if not self._rolled:
            await checkpoint_if_cancelled()
            return self._fp.seek(offset, whence)

        return await super().seek(offset, whence)

    async def tell(self) -> int:
        """Return the current file position."""
        if not self._rolled:
            await checkpoint_if_cancelled()
            return self._fp.tell()

        return await super().tell()

    async def truncate(self, size: int | None = None) -> int:
        """Truncate the file to ``size`` (the current position if ``None``)."""
        if not self._rolled:
            await checkpoint_if_cancelled()
            return self._fp.truncate(size)

        return await super().truncate(size)

    @overload
    async def write(self: SpooledTemporaryFile[bytes], b: ReadableBuffer) -> int: ...
    @overload
    async def write(self: SpooledTemporaryFile[str], b: str) -> int: ...

    async def write(self, b: ReadableBuffer | str) -> int:
        """
        Asynchronously write data to the spooled temporary file.

        If the file has not yet been rolled over, the data is written synchronously,
        and a rollover is triggered if the size exceeds the maximum size.

        :param b: The data to write.
        :return: The value returned by the underlying ``write()`` call.

        """
        if not self._rolled:
            await checkpoint_if_cancelled()
            result = self._fp.write(b)
            await self._check()
            return result

        return await super().write(b)  # type: ignore[misc]

    @overload
    async def writelines(
        self: SpooledTemporaryFile[bytes], lines: Iterable[ReadableBuffer]
    ) -> None: ...
    @overload
    async def writelines(
        self: SpooledTemporaryFile[str], lines: Iterable[str]
    ) -> None: ...

    async def writelines(self, lines: Iterable[str] | Iterable[ReadableBuffer]) -> None:
        """
        Asynchronously write a list of lines to the spooled temporary file.

        If the file has not yet been rolled over, the lines are written synchronously,
        and a rollover is triggered if the size exceeds the maximum size.

        :param lines: An iterable of lines to write.

        """
        if not self._rolled:
            await checkpoint_if_cancelled()
            result = self._fp.writelines(lines)
            await self._check()
            return result

        return await super().writelines(lines)  # type: ignore[misc]

452 

453 

class TemporaryDirectory(Generic[AnyStr]):
    """
    An asynchronous temporary directory that is created and cleaned up automatically.

    This class provides an asynchronous context manager for creating a temporary
    directory. It wraps Python's standard :class:`~tempfile.TemporaryDirectory` to
    perform directory creation and cleanup operations in a background thread.

    :param suffix: Suffix to be added to the temporary directory name.
    :param prefix: Prefix to be added to the temporary directory name.
    :param dir: The parent directory where the temporary directory is created.
    :param ignore_cleanup_errors: Whether to ignore errors during cleanup
        (Python 3.10+).
    :param delete: Whether to delete the directory upon closing (Python 3.12+).
    """

    def __init__(
        self,
        suffix: AnyStr | None = None,
        prefix: AnyStr | None = None,
        dir: AnyStr | None = None,
        *,
        ignore_cleanup_errors: bool = False,
        delete: bool = True,
    ) -> None:
        self.suffix: AnyStr | None = suffix
        self.prefix: AnyStr | None = prefix
        self.dir: AnyStr | None = dir
        self.ignore_cleanup_errors = ignore_cleanup_errors
        self.delete = delete

        # The wrapped standard library object; stays None until __aenter__ runs.
        self._tempdir: tempfile.TemporaryDirectory | None = None

    async def __aenter__(self) -> AnyStr:
        """
        Create the temporary directory and enter the wrapped context manager.

        :return: The value returned by the wrapped object's ``__enter__()``. It is
            annotated as ``AnyStr`` because the arguments passed through may be
            either ``str`` or ``bytes``.

        """
        params: dict[str, Any] = {
            "suffix": self.suffix,
            "prefix": self.prefix,
            "dir": self.dir,
        }
        # These keyword arguments are only passed on the Python versions that
        # the version checks below allow.
        if sys.version_info >= (3, 10):
            params["ignore_cleanup_errors"] = self.ignore_cleanup_errors

        if sys.version_info >= (3, 12):
            params["delete"] = self.delete

        self._tempdir = await to_thread.run_sync(
            lambda: tempfile.TemporaryDirectory(**params)
        )
        return await to_thread.run_sync(self._tempdir.__enter__)

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc_value: BaseException | None,
        traceback: TracebackType | None,
    ) -> None:
        """Exit the wrapped context manager; a no-op if it was never entered."""
        if self._tempdir is not None:
            await to_thread.run_sync(
                self._tempdir.__exit__, exc_type, exc_value, traceback
            )

    async def cleanup(self) -> None:
        """Call ``cleanup()`` on the wrapped object; a no-op if none was created."""
        if self._tempdir is not None:
            await to_thread.run_sync(self._tempdir.cleanup)

518 

519 

@overload
async def mkstemp(
    suffix: str | None = None,
    prefix: str | None = None,
    dir: str | None = None,
    text: bool = False,
) -> tuple[int, str]: ...


@overload
async def mkstemp(
    suffix: bytes | None = None,
    prefix: bytes | None = None,
    dir: bytes | None = None,
    text: bool = False,
) -> tuple[int, bytes]: ...


async def mkstemp(
    suffix: AnyStr | None = None,
    prefix: AnyStr | None = None,
    dir: AnyStr | None = None,
    text: bool = False,
) -> tuple[int, str | bytes]:
    """
    Asynchronous version of :func:`tempfile.mkstemp`.

    The standard library function is run in a background thread with the given
    arguments passed through unchanged.

    :param suffix: Suffix to be added to the file name.
    :param prefix: Prefix to be added to the file name.
    :param dir: Directory in which the temporary file is created.
    :param text: Whether the file is opened in text mode.
    :return: A tuple containing the file descriptor and the file name.

    """
    fd_and_name = await to_thread.run_sync(
        tempfile.mkstemp, suffix, prefix, dir, text
    )
    return fd_and_name

558 

559 

@overload
async def mkdtemp(
    suffix: str | None = None,
    prefix: str | None = None,
    dir: str | None = None,
) -> str: ...


@overload
async def mkdtemp(
    suffix: bytes | None = None,
    prefix: bytes | None = None,
    dir: bytes | None = None,
) -> bytes: ...


async def mkdtemp(
    suffix: AnyStr | None = None,
    prefix: AnyStr | None = None,
    dir: AnyStr | None = None,
) -> str | bytes:
    """
    Asynchronous version of :func:`tempfile.mkdtemp`.

    The standard library function is run in a background thread with the given
    arguments passed through unchanged.

    :param suffix: Suffix to be added to the directory name.
    :param prefix: Prefix to be added to the directory name.
    :param dir: Parent directory where the temporary directory is created.
    :return: The path of the created temporary directory.

    """
    created_path = await to_thread.run_sync(tempfile.mkdtemp, suffix, prefix, dir)
    return created_path

593 

594 

async def gettempdir() -> str:
    """
    Asynchronous version of :func:`tempfile.gettempdir`.

    The standard library function is run in a background thread.

    :return: The path of the temporary directory as a string.

    """
    tmp_dir = await to_thread.run_sync(tempfile.gettempdir)
    return tmp_dir

605 

606 

async def gettempdirb() -> bytes:
    """
    Asynchronous version of :func:`tempfile.gettempdirb`.

    The standard library function is run in a background thread.

    :return: The path of the temporary directory as bytes.

    """
    tmp_dir = await to_thread.run_sync(tempfile.gettempdirb)
    return tmp_dir