Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/pikepdf/_io.py: 29%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

49 statements  

1# SPDX-FileCopyrightText: 2023 James R. Barlow 

2# SPDX-License-Identifier: MPL-2.0 

3 

4from __future__ import annotations 

5 

6from collections.abc import Generator 

7from contextlib import contextmanager, suppress 

8from io import TextIOBase 

9from os import PathLike 

10from pathlib import Path 

11from shutil import copystat 

12from tempfile import NamedTemporaryFile 

13from typing import IO 

14 

15 

def check_stream_is_usable(stream: IO) -> None:
    """Reject streams that are opened in text mode.

    The only property verified here is that ``stream`` is not a
    ``TextIOBase`` instance; seekability is not tested by this function
    even though the error message mentions it.

    Raises:
        TypeError: If ``stream`` is a text (transcoding) stream.
    """
    is_text_stream = isinstance(stream, TextIOBase)
    if not is_text_stream:
        return
    raise TypeError("stream must be binary (no transcoding) and seekable")

20 

21 

def check_different_files(file1: str | PathLike, file2: str | PathLike) -> None:
    """Raise if both arguments refer to the same file.

    The paths are first compared as ``Path`` objects; if they differ
    textually, ``Path.samefile`` is used to compare the files they point to.

    Raises:
        ValueError: If the two paths are equal or resolve to the same file.
    """
    source, target = Path(file1), Path(file2)
    if source != target:
        try:
            if not source.samefile(target):
                return
        except FileNotFoundError:
            # samefile() could not find one of the paths; the failure is
            # ignored and the paths are treated as different.
            return
    raise ValueError(
        "Cannot overwrite input file. Open the file with "
        "pikepdf.open(..., allow_overwriting_input=True) to "
        "allow overwriting the input file."
    )

31 

32 

@contextmanager
def atomic_overwrite(filename: Path) -> Generator[IO[bytes], None, None]:
    """Yield a binary stream whose contents replace ``filename`` on success.

    Two cases are handled:

    * ``filename`` does not exist yet: it is created with exclusive mode
      and yielded directly. If the ``with`` body raises, the newly created
      file is removed and the exception propagates.
    * ``filename`` already exists: a temporary file is created in the same
      directory and yielded instead. When the ``with`` body completes, the
      original's stat information is copied to the temporary file (best
      effort), the temporary file is renamed over ``filename``, and the
      destination is touched (best effort). If the body raises, the
      temporary file is removed and ``filename`` is left as it was.
    """
    try:
        # Exclusive creation only succeeds when nothing exists at the path
        created = filename.open("xb")
    except FileExistsError:
        created = None

    if created is not None:
        # Nothing existed before, so write straight into the new file
        try:
            with created:
                yield created
        except (Exception, KeyboardInterrupt):
            # Do not leave a partially written file behind
            with suppress(OSError):
                filename.unlink()
            raise
        return

    # The destination already exists. Opening it in append mode verifies
    # that it is writable without modifying its contents.
    with filename.open("ab"):
        pass

    scratch = NamedTemporaryFile(
        dir=filename.parent, prefix=f".pikepdf.{filename.name}", delete=False
    )
    scratch_path = Path(scratch.name)
    try:
        yield scratch
        scratch.flush()
        scratch.close()
        with suppress(OSError):
            # Carry over permissions, timestamps, etc. from the original
            copystat(filename, scratch_path)
        scratch_path.replace(filename)
        with suppress(OSError):
            # copystat carried over the old timestamps; refresh the mtime
            filename.touch()
    finally:
        # Runs on success and on failure. After a successful rename the
        # temporary path no longer exists, so the unlink error is ignored.
        with suppress(OSError):
            scratch.close()
        with suppress(OSError):
            scratch_path.unlink()