Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/nbconvert/utils/io.py: 28%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

58 statements  

1"""io-related utilities""" 

2 

3# Copyright (c) Jupyter Development Team. 

4# Distributed under the terms of the Modified BSD License. 

5 

6import codecs 

7import errno 

8import os 

9import random 

10import shutil 

11import sys 

12from typing import Any 

13 

14 

def unicode_std_stream(stream="stdout"):
    """Return a writer that sends unicode text to stdout/stderr as UTF-8.

    Environment variables and default encodings are bypassed by wrapping the
    underlying binary buffer of the chosen stream in a UTF-8 writer.

    ::

        unicode_std_stream().write(u'ł@e¶ŧ←')

    ``stream`` names the ``sys`` attribute to wrap and must be ``"stdout"``
    or ``"stderr"``.
    """
    assert stream in ("stdout", "stderr")
    text_stream = getattr(sys, stream)

    try:
        binary_stream = text_stream.buffer
    except AttributeError:
        # The sys stream was swapped for an object without a binary buffer;
        # hand it back unchanged.
        return text_stream
    else:
        utf8_writer = codecs.getwriter("utf-8")
        return utf8_writer(binary_stream)

35 

36 

def unicode_stdin_stream():
    """Return a reader that decodes stdin as UTF-8.

    Environment variables and default encodings are bypassed by wrapping the
    binary buffer behind ``sys.stdin`` in a UTF-8 reader.

    ::

        totreat = unicode_stdin_stream().read()
    """
    text_stream = sys.stdin
    try:
        binary_stream = text_stream.buffer
    except AttributeError:
        # sys.stdin exposes no binary buffer; use it as-is.
        return text_stream
    else:
        utf8_reader = codecs.getreader("utf-8")
        return utf8_reader(binary_stream)

53 

54 

class FormatSafeDict(dict[Any, Any]):
    """Dictionary that yields a ``{key}`` placeholder for missing keys.

    Intended for use with ``str.format_map`` so that fields without a value
    are left in the output as their original ``{name}`` placeholder instead
    of raising ``KeyError``.
    """

    def __missing__(self, key):
        """Return the placeholder text ``{key}`` for a key that is absent.

        The key is rendered with ``str()`` formatting, so non-string keys
        produce a placeholder too rather than failing on concatenation.
        The dictionary itself is not modified.
        """
        return f"{{{key}}}"

61 

62 

# Use the platform's ENOLINK errno when the errno module defines it;
# otherwise fall back to a fixed stand-in value.
ENOLINK = getattr(errno, "ENOLINK", 1998)

67 

68 

def link(src, dst):
    """Create a hard link from ``src`` to ``dst`` and report the outcome.

    Returns 0 when the link was created, otherwise the ``errno`` of the
    ``OSError`` raised by ``os.link``.

    The special errno ``ENOLINK`` is returned when ``os.link`` is not
    available on this platform.
    """
    if not hasattr(os, "link"):
        return ENOLINK

    try:
        os.link(src, dst)
    except OSError as exc:
        return exc.errno
    return 0

84 

85 

def link_or_copy(src, dst):
    """Attempts to hardlink ``src`` to ``dst``, copying if the link fails.

    Attempts to maintain the semantics of ``shutil.copy``: when ``dst`` is an
    existing directory, the target becomes a file inside it named after the
    base name of ``src``.

    Because ``os.link`` does not overwrite files, a temporary file with a
    random suffix is created next to the target if the target already
    exists, then that file is moved into place over the target.

    Returns ``None``. Errors from ``os.stat``, ``shutil.copy`` and the final
    move propagate to the caller.
    """
    if os.path.isdir(dst):
        dst = os.path.join(dst, os.path.basename(src))

    link_errno = link(src, dst)
    if link_errno == 0:
        return

    if link_errno != errno.EEXIST:
        # Either link isn't supported, or the filesystem doesn't support
        # linking, or 'src' and 'dst' are on different filesystems.
        shutil.copy(src, dst)
        return

    # samefile compares device *and* inode; inode numbers alone are only
    # unique within one filesystem, so comparing st_ino by itself could
    # mistake an unrelated file on another device for an existing link.
    if os.path.samefile(src, dst):
        # dst is already a hard link to the correct file, so we don't need
        # to do anything else. If we try to link and rename the file
        # anyway, we get duplicate files - see http://bugs.python.org/issue21876
        return

    new_dst = dst + f"-temp-{random.randint(1, 16**4):04X}"  # noqa: S311
    try:
        link_or_copy(src, new_dst)
        # os.replace overwrites an existing destination on every platform,
        # whereas os.rename refuses to do so on Windows.
        os.replace(new_dst, dst)
    except BaseException:
        # Best-effort cleanup of the temporary file before re-raising.
        try:
            os.remove(new_dst)
        except OSError:
            pass
        raise