Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/pikepdf/_io.py: 26%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# SPDX-FileCopyrightText: 2023 James R. Barlow
2# SPDX-License-Identifier: MPL-2.0
4from __future__ import annotations
6import os
7from collections.abc import Generator
8from contextlib import contextmanager, suppress
9from io import TextIOBase
10from os import PathLike
11from pathlib import Path
12from shutil import copystat
13from tempfile import NamedTemporaryFile
14from typing import IO
def check_stream_is_usable(stream: IO) -> None:
    """Reject streams that are opened in text mode.

    Only text-mode streams (instances of ``TextIOBase``) are detected; no
    explicit seekability test is performed here.

    Raises:
        TypeError: If ``stream`` is a text stream.
    """
    is_text_stream = isinstance(stream, TextIOBase)
    if not is_text_stream:
        return
    raise TypeError("stream must be binary (no transcoding) and seekable")
23def check_different_files(file1: str | PathLike, file2: str | PathLike) -> None:
24 """Check that two files are different."""
25 with suppress(FileNotFoundError):
26 if Path(file1) == Path(file2) or Path(file1).samefile(Path(file2)):
27 raise ValueError(
28 "Cannot overwrite input file. Open the file with "
29 "pikepdf.open(..., allow_overwriting_input=True) to "
30 "allow overwriting the input file."
31 )
@contextmanager
def atomic_overwrite(filename: Path) -> Generator[IO[bytes], None, None]:
    """Atomically overwrite a file.

    When the destination does not exist yet, it is created and yielded
    directly; if the caller's block raises, the destination is deleted again.

    When the destination already exists, a temporary file is created (in the
    same directory when possible) and yielded instead. If the caller's block
    completes, the temporary file is renamed onto the destination. If it
    raises, the temporary file is deleted and the existing destination is left
    untouched.
    """
    new_file = None
    with suppress(FileExistsError):
        # Exclusive creation only succeeds when the destination is absent.
        new_file = filename.open("xb")

    if new_file is not None:
        # Destination is brand new, so the caller can write to it directly...
        try:
            with new_file:
                yield new_file
        except (Exception, KeyboardInterrupt):
            # ...and a failed write must not leave a partial file behind.
            with suppress(OSError):
                filename.unlink()
            raise
        return

    # From here on the destination already exists. Write to a temporary file
    # and rename it over the destination only if the caller succeeds.

    # Opening for append verifies the destination is writable without
    # modifying its contents.
    filename.open("ab").close()

    temp_prefix = f".pikepdf.{filename.name}"
    try:
        # Prefer the destination's own directory, so that Path.replace()
        # is more likely to be atomic.
        scratch = NamedTemporaryFile(
            dir=filename.parent, prefix=temp_prefix, delete=False
        )
    except PermissionError:
        # Fall back to the standard temporary folder (losing atomicity, if
        # that is on a different file system).
        scratch = NamedTemporaryFile(prefix=temp_prefix, delete=False)
    scratch_path = Path(scratch.name)

    try:
        yield scratch
        scratch.flush()
        scratch.close()
        with suppress(OSError):
            # Carry over permissions, timestamps, etc. from the original
            copystat(filename, scratch_path)
        try:
            scratch_path.replace(filename)
        except OSError:
            # Errors are ignored only when the user is writing to the null
            # device; anything else is re-raised.
            writing_to_devnull = Path(filename).resolve().samefile(os.devnull)
            if not writing_to_devnull:
                raise
        with suppress(OSError):
            # copystat() restored the old timestamps; mark the file modified
            filename.touch()
    finally:
        # Best-effort cleanup; after a successful rename the temporary path
        # no longer exists and the unlink error is ignored.
        with suppress(OSError):
            scratch.close()
        with suppress(OSError):
            scratch_path.unlink()