1"""io-related utilities"""
2
3# Copyright (c) Jupyter Development Team.
4# Distributed under the terms of the Modified BSD License.
5
6import codecs
7import errno
8import os
9import random
10import shutil
11import sys
12from typing import Any
13
14
def unicode_std_stream(stream="stdout"):
    """Get a wrapper to write unicode to stdout/stderr as UTF-8.

    This ignores environment variables and default encodings, to reliably write
    unicode to stdout or stderr.

    ::

        unicode_std_stream().write(u'ł@e¶ŧ←')

    ``stream`` must be ``"stdout"`` or ``"stderr"``; any other value raises
    ``ValueError``.

    Returns a UTF-8 ``codecs`` stream writer around the binary ``buffer`` of
    the selected ``sys`` stream, or the ``sys`` stream itself when it has no
    ``buffer`` attribute.
    """
    if stream not in ("stdout", "stderr"):
        # Explicit check instead of ``assert``: assertions are removed under
        # ``python -O``, which would let any ``sys`` attribute name through.
        msg = f"stream must be 'stdout' or 'stderr', not {stream!r}"
        raise ValueError(msg)
    target = getattr(sys, stream)

    try:
        stream_b = target.buffer
    except AttributeError:
        # sys.stdout/sys.stderr has been replaced by an object without a
        # binary buffer - use it directly
        return target

    return codecs.getwriter("utf-8")(stream_b)
35
36
def unicode_stdin_stream():
    """Return a reader that decodes stdin as UTF-8.

    Environment variables and default encodings are bypassed so that unicode
    is read from stdin reliably.

    ::

        totreat = unicode_stdin_stream().read()

    When ``sys.stdin`` has no ``buffer`` attribute, ``sys.stdin`` itself is
    returned unchanged.
    """
    stdin = sys.stdin
    missing = object()
    raw = getattr(stdin, "buffer", missing)
    if raw is missing:
        # No underlying binary stream to wrap; hand back stdin as-is.
        return stdin

    make_reader = codecs.getreader("utf-8")
    return make_reader(raw)
53
54
class FormatSafeDict(dict[Any, Any]):
    """Dictionary whose missing keys resolve to a ``{key}`` placeholder.

    Looking up a key that is not present returns the key wrapped in braces
    instead of raising ``KeyError``, so ``"{a} {b}".format_map(d)`` leaves
    unknown fields such as ``{b}`` in the output. Missing keys are not
    inserted into the dictionary.
    """

    def __missing__(self, key):
        """Return the placeholder string ``"{<key>}"`` for an absent key."""
        # Built with an f-string rather than ``+`` so that non-string keys
        # produce a placeholder too instead of raising TypeError.
        return f"{{{key}}}"
61
62
# Use errno.ENOLINK when the platform's ``errno`` module defines it; otherwise
# fall back to the value 1998.
ENOLINK = getattr(errno, "ENOLINK", 1998)
67
68
def link(src, dst):
    """Create a hard link at ``dst`` pointing to ``src``.

    Returns 0 on success, otherwise the ``errno`` of the ``OSError`` raised
    by ``os.link``. The special value ``ENOLINK`` is returned when the
    ``os`` module provides no ``link`` function at all.
    """
    if hasattr(os, "link"):
        try:
            os.link(src, dst)
        except OSError as exc:
            # Report the failure as a code instead of propagating it.
            return exc.errno
        return 0
    return ENOLINK
84
85
def link_or_copy(src, dst):
    """Attempts to hardlink ``src`` to ``dst``, copying if the link fails.

    Attempts to maintain the semantics of ``shutil.copy``: when ``dst`` is an
    existing directory, the target becomes a file inside it named after the
    base name of ``src``.

    Because ``os.link`` does not overwrite files, a unique temporary file
    will be used if the target already exists, then that file will be moved
    into place. If creating or moving the temporary file fails, the temporary
    file is removed (best effort) and the error is re-raised.
    """
    if os.path.isdir(dst):
        dst = os.path.join(dst, os.path.basename(src))

    link_errno = link(src, dst)
    if link_errno == 0:
        return

    if link_errno != errno.EEXIST:
        # Either link isn't supported, or the filesystem doesn't support
        # linking, or 'src' and 'dst' are on different filesystems.
        shutil.copy(src, dst)
        return

    # samefile() compares device *and* inode; comparing st_ino alone can
    # mistake unrelated files on different devices for the same file.
    if os.path.samefile(src, dst):
        # dst is already a hard link to the correct file, so we don't need
        # to do anything else. If we try to link and rename the file
        # anyway, we get duplicate files - see http://bugs.python.org/issue21876
        return

    new_dst = dst + f"-temp-{random.randint(1, 16**4):04X}"  # noqa: S311
    try:
        link_or_copy(src, new_dst)
        # os.replace overwrites an existing dst on every platform, whereas
        # os.rename raises on Windows when the destination exists.
        os.replace(new_dst, dst)
    except BaseException:
        # Best-effort cleanup so a failed link/copy/move leaves no temp file.
        try:
            os.remove(new_dst)
        except OSError:
            pass
        raise