Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/pikepdf/_io.py: 26%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

57 statements  

1# SPDX-FileCopyrightText: 2023 James R. Barlow 

2# SPDX-License-Identifier: MPL-2.0 

3 

4from __future__ import annotations 

5 

6import os 

7from collections.abc import Generator 

8from contextlib import contextmanager, suppress 

9from io import TextIOBase 

10from os import PathLike 

11from pathlib import Path 

12from shutil import copystat 

13from tempfile import NamedTemporaryFile 

14from typing import IO 

15 

16 

17def check_stream_is_usable(stream: IO) -> None: 

18 """Check that a stream is seekable and binary.""" 

19 if isinstance(stream, TextIOBase): 

20 raise TypeError("stream must be binary (no transcoding) and seekable") 

21 

22 

23def check_different_files(file1: str | PathLike, file2: str | PathLike) -> None: 

24 """Check that two files are different.""" 

25 with suppress(FileNotFoundError): 

26 if Path(file1) == Path(file2) or Path(file1).samefile(Path(file2)): 

27 raise ValueError( 

28 "Cannot overwrite input file. Open the file with " 

29 "pikepdf.open(..., allow_overwriting_input=True) to " 

30 "allow overwriting the input file." 

31 ) 

32 

33 

34@contextmanager 

35def atomic_overwrite(filename: Path) -> Generator[IO[bytes], None, None]: 

36 """Atomically ovewrite a file. 

37 

38 If the destination file does not exist, it is created. If writing fails, 

39 the destination file is deleted. 

40 

41 If the destination file does exist, a temporaryfile is created in the same 

42 directory, and data is written to that file. If writing succeeds, the temporary 

43 file is renamed to the destination file. If writing fails, the temporary file 

44 is deleted and the original destination file is left untouched. 

45 """ 

46 try: 

47 # Try to create the file using exclusive creation mode 

48 stream = filename.open("xb") 

49 except FileExistsError: 

50 pass 

51 else: 

52 # We were able to create the file, so we can use it directly 

53 try: 

54 with stream: 

55 yield stream 

56 except (Exception, KeyboardInterrupt): 

57 # ...but if an error occurs while using it, clean up 

58 with suppress(OSError): 

59 filename.unlink() 

60 raise 

61 return 

62 

63 # If we get here, the file already exists. Use a temporary file, then rename 

64 # it to the destination file if we succeed. Destination file is not touched 

65 # if we fail. 

66 

67 with filename.open("ab") as stream: 

68 pass # Confirm we will be able to write to the indicated destination 

69 

70 tf = None 

71 try: 

72 try: 

73 # First try to create the file in the same directory, so that Path.replace() 

74 # is more likely to be atomic. 

75 tf = NamedTemporaryFile( 

76 dir=filename.parent, prefix=f".pikepdf.{filename.name}", delete=False 

77 ) 

78 except PermissionError: 

79 # If same directory fails, write to stand temporary folder (losing 

80 # atomicity, if different file systems) 

81 tf = NamedTemporaryFile(prefix=f".pikepdf.{filename.name}", delete=False) 

82 yield tf 

83 tf.flush() 

84 tf.close() 

85 with suppress(OSError): 

86 # Copy permissions, create time, etc. from the original 

87 copystat(filename, Path(tf.name)) 

88 try: 

89 Path(tf.name).replace(filename) 

90 except OSError: 

91 if not Path(filename).resolve().samefile(os.devnull): 

92 # If user is writing to /dev/null, ignore permission errors 

93 raise 

94 with suppress(OSError): 

95 # Update modified time of the destination file 

96 filename.touch() 

97 finally: 

98 if tf is not None: 

99 with suppress(OSError): 

100 tf.close() 

101 with suppress(OSError): 

102 Path(tf.name).unlink()