Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/pikepdf/_io.py: 29%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# SPDX-FileCopyrightText: 2023 James R. Barlow
2# SPDX-License-Identifier: MPL-2.0
4from __future__ import annotations
6from collections.abc import Generator
7from contextlib import contextmanager, suppress
8from io import TextIOBase
9from os import PathLike
10from pathlib import Path
11from shutil import copystat
12from tempfile import NamedTemporaryFile
13from typing import IO
def check_stream_is_usable(stream: IO) -> None:
    """Reject streams that operate in text mode.

    Only the text/binary distinction is verified here: any object that is an
    instance of ``io.TextIOBase`` is refused because it transcodes data.
    Seekability is mentioned in the error message but is not tested by this
    function.

    Args:
        stream: The stream object to validate.

    Raises:
        TypeError: If ``stream`` is a text-mode stream.
    """
    if not isinstance(stream, TextIOBase):
        # Anything that is not a text wrapper is accepted as-is.
        return
    raise TypeError("stream must be binary (no transcoding) and seekable")
22def check_different_files(file1: str | PathLike, file2: str | PathLike) -> None:
23 """Check that two files are different."""
24 with suppress(FileNotFoundError):
25 if Path(file1) == Path(file2) or Path(file1).samefile(Path(file2)):
26 raise ValueError(
27 "Cannot overwrite input file. Open the file with "
28 "pikepdf.open(..., allow_overwriting_input=True) to "
29 "allow overwriting the input file."
30 )
@contextmanager
def atomic_overwrite(filename: Path) -> Generator[IO[bytes], None, None]:
    """Atomically overwrite a file.

    If the destination does not exist, it is created with exclusive-creation
    mode and yielded directly. If an ``Exception`` or ``KeyboardInterrupt``
    escapes the managed block, the newly created file is deleted (best effort)
    and the error is re-raised.

    If the destination already exists, a temporary file is created in the same
    directory and yielded instead. When the managed block finishes normally,
    the temporary file is flushed, closed, given the original's stat metadata
    (best effort), and renamed over the destination, whose modification time
    is then refreshed (best effort). If the block fails, the temporary file is
    removed and the original destination is left untouched.

    Args:
        filename: Path of the file to write.

    Yields:
        A binary file object that the caller writes the new contents to.
    """
    new_file = None
    try:
        # Exclusive creation succeeds only when nothing exists at the path yet.
        new_file = filename.open("xb")
    except FileExistsError:
        pass  # Destination exists; handled by the temporary-file path below.

    if new_file is not None:
        # Fresh file: write straight into it, removing it again on failure.
        try:
            with new_file:
                yield new_file
        except (Exception, KeyboardInterrupt):
            with suppress(OSError):
                filename.unlink()
            raise
        return

    # Existing destination. Opening for append verifies write access up front
    # without altering the current contents.
    with filename.open("ab"):
        pass

    # Same directory as the destination so the final rename stays on one
    # filesystem.
    scratch = NamedTemporaryFile(
        dir=filename.parent, prefix=f".pikepdf.{filename.name}", delete=False
    )
    scratch_path = Path(scratch.name)
    try:
        yield scratch
        scratch.flush()
        scratch.close()
        with suppress(OSError):
            # Carry over permissions, timestamps, etc. from the original.
            copystat(filename, scratch_path)
        scratch_path.replace(filename)
        with suppress(OSError):
            # copystat restored the old mtime; mark the file as just modified.
            filename.touch()
    finally:
        # Runs on success and failure alike. After a successful replace the
        # temporary path no longer exists, so the unlink error is suppressed.
        with suppress(OSError):
            scratch.close()
        with suppress(OSError):
            scratch_path.unlink()