Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/pathlib_abc/_os.py: 18%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1"""
2Low-level OS functionality wrappers used by pathlib.
3"""
5from errno import *
6from io import TextIOWrapper
7import os
8import sys
9try:
10 from io import text_encoding
11except ImportError:
12 def text_encoding(encoding):
13 return encoding
14try:
15 import fcntl
16except ImportError:
17 fcntl = None
18try:
19 import posix
20except ImportError:
21 posix = None
22try:
23 import _winapi
24except ImportError:
25 _winapi = None
def _get_copy_blocksize(infd):
    """Choose the chunk size used by the fast-copy loops for descriptor *infd*.

    The size reported by os.fstat() is used so that, ideally, the whole
    file is transferred by a single call, but never less than 8 MiB.  When
    os.fstat() fails with OSError, 128 MiB is used instead.  Callers are
    expected to keep copying until a zero-length transfer signals EOF, so
    a chunk size that differs from the real file size (for instance
    because the file changes while being copied) is harmless.
    """
    minimum = 2 ** 23    # 8 MiB
    fallback = 2 ** 27   # 128 MiB
    ceiling_32bit = 2 ** 30  # 1 GiB

    try:
        file_size = os.fstat(infd).st_size
    except OSError:
        chunk = fallback
    else:
        chunk = max(file_size, minimum)

    # 32-bit architectures can't take arbitrarily large counts; clamp the
    # value to avoid OverflowError (see gh-82500).
    if sys.maxsize < 2 ** 32:
        chunk = min(chunk, ceiling_32bit)
    return chunk
if not (fcntl and hasattr(fcntl, 'FICLONE')):
    # No fcntl module, or it doesn't expose FICLONE: cloning is unavailable.
    _ficlone = None
else:
    def _ficlone(source_fd, target_fd):
        """
        Make target_fd a lightweight copy of source_fd by issuing the
        FICLONE ioctl on the target.  The data blocks are copied only when
        modified, which is known as Copy on Write (CoW), instantaneous
        copy or reflink.
        """
        clone_request = fcntl.FICLONE
        fcntl.ioctl(target_fd, clone_request, source_fd)
if not (posix and hasattr(posix, '_fcopyfile')):
    # No posix module, or it lacks _fcopyfile: this fast path is unavailable.
    _fcopyfile = None
else:
    def _fcopyfile(source_fd, target_fd):
        """
        Copy the content of a regular file from source_fd to target_fd
        with the high-performance fcopyfile(3) syscall (macOS).
        """
        data_only = posix._COPYFILE_DATA
        posix._fcopyfile(source_fd, target_fd, data_only)
if not hasattr(os, 'copy_file_range'):
    _copy_file_range = None
else:
    def _copy_file_range(source_fd, target_fd):
        """
        Copy data from one regular mmap-like fd to another using the
        high-performance copy_file_range(2) syscall, which gives
        filesystems an opportunity to use reflinks or server-side copy.
        This should work on Linux >= 4.5 only.
        """
        chunk_size = _get_copy_blocksize(source_fd)
        written = 0
        # A zero-byte transfer means EOF; until then, advance the
        # destination offset by however much was actually copied.
        while copied := os.copy_file_range(source_fd, target_fd, chunk_size,
                                           offset_dst=written):
            written += copied
if not hasattr(os, 'sendfile'):
    _sendfile = None
else:
    def _sendfile(source_fd, target_fd):
        """Copy data from one regular mmap-like fd to another using the
        high-performance sendfile(2) syscall.
        This should work on Linux >= 2.6.33 only.
        """
        chunk_size = _get_copy_blocksize(source_fd)
        position = 0
        # A zero-byte transfer means EOF; until then, advance the source
        # offset by however much was actually sent.
        while transferred := os.sendfile(target_fd, source_fd, position,
                                         chunk_size):
            position += transferred
if not (_winapi and hasattr(_winapi, 'CopyFile2')):
    # No _winapi module, or it lacks CopyFile2: this helper is unavailable.
    copyfile2 = None
else:
    def copyfile2(source, target):
        """
        Copy the file *source* to *target* using CopyFile2 (Windows only).
        """
        no_flags = 0
        _winapi.CopyFile2(source, target, no_flags)
def copyfileobj(source_f, target_f):
    """
    Copy data from file-like object source_f to file-like object target_f.

    When both objects provide a descriptor through fileno(), the OS-level
    helpers available on this platform are tried in a fixed order:
    _ficlone, _fcopyfile, _copy_file_range, then _sendfile.  The first one
    that succeeds ends the copy.  A helper failing with one of the errno
    values listed for it is skipped in favour of the next one; any other
    OSError is re-raised with filename and filename2 set to the names of
    the two file objects.  If no helper applies, the data is copied with
    plain read() and write() calls.
    """
    try:
        src_fd = source_f.fileno()
        dst_fd = target_f.fileno()
    except Exception:
        # No usable descriptors; fall through to the generic loop below.
        pass
    else:
        # Ordered (helper, errnos-to-ignore) pairs.  A helper is None when
        # the platform doesn't support it.  The errno tuples are built
        # lazily so that each name is only looked up if its helper fails.
        strategies = (
            # OS copy-on-write.
            (_ficlone, lambda: (EBADF, EOPNOTSUPP, ETXTBSY, EXDEV)),
            # OS copy.
            (_fcopyfile, lambda: (EINVAL, ENOTSUP)),
            (_copy_file_range, lambda: (ETXTBSY, EXDEV)),
            (_sendfile, lambda: (ENOTSOCK,)),
        )
        try:
            for fast_copy, ignorable in strategies:
                if not fast_copy:
                    continue
                try:
                    fast_copy(src_fd, dst_fd)
                except OSError as failure:
                    if failure.errno not in ignorable():
                        raise
                else:
                    return
        except OSError as failure:
            # Produce more useful error messages.
            failure.filename = source_f.name
            failure.filename2 = target_f.name
            raise

    # Last resort: copy with fileobj read() and write().
    pull = source_f.read
    push = target_f.write
    chunk_size = 1024 * 1024
    while True:
        chunk = pull(chunk_size)
        if not chunk:
            break
        push(chunk)
def _open_reader(obj):
    """
    Call the __open_reader__ method found on the type of *obj* and return
    its result.  Raise TypeError if the type defines no such method.
    """
    obj_type = type(obj)
    try:
        opener = obj_type.__open_reader__
    except AttributeError:
        message = f"{obj_type.__name__} can't be opened for reading"
        raise TypeError(message) from None
    return opener(obj)
def _open_writer(obj, mode):
    """
    Call the __open_writer__ method found on the type of *obj*, passing it
    *mode*, and return its result.  Raise TypeError if the type defines no
    such method.
    """
    obj_type = type(obj)
    try:
        opener = obj_type.__open_writer__
    except AttributeError:
        message = f"{obj_type.__name__} can't be opened for writing"
        raise TypeError(message) from None
    return opener(obj, mode)
def _open_updater(obj, mode):
    """
    Call the __open_updater__ method found on the type of *obj*, passing
    it *mode*, and return its result.  Raise TypeError if the type defines
    no such method.
    """
    obj_type = type(obj)
    try:
        opener = obj_type.__open_updater__
    except AttributeError:
        message = f"{obj_type.__name__} can't be opened for updating"
        raise TypeError(message) from None
    return opener(obj, mode)
def vfsopen(obj, mode='r', buffering=-1, encoding=None, errors=None,
            newline=None):
    """
    Open the file pointed to by this path and return a file object, as
    the built-in open() function does.

    Unlike the built-in open() function, this function additionally accepts
    'openable' objects, which are objects with any of these special methods:

        __open_reader__()
        __open_writer__(mode)
        __open_updater__(mode)

    '__open_reader__' is called for 'r' mode; '__open_writer__' for 'a', 'w'
    and 'x' modes; and '__open_updater__' for 'r+' and 'w+' modes. If text
    mode is requested, the result is wrapped in an io.TextIOWrapper object.

    Parameters:
        obj: anything accepted by open(), or an 'openable' object.
        mode: an open()-style mode string; text mode unless it contains 'b'.
        buffering: must be left at -1.
        encoding, errors, newline: passed to open(), or to io.TextIOWrapper
            when an openable object is opened in text mode; all three must
            be None in binary mode.

    Raises:
        ValueError: if buffering is customized, if a text-only argument is
            given in binary mode, or if the mode is not supported.  The
            'invalid mode' message shows the mode as the caller passed it.
        TypeError: if obj lacks the special method required by the mode.
    """
    if buffering != -1:
        raise ValueError("buffer size can't be customized")
    text = 'b' not in mode
    if text:
        # Call io.text_encoding() here to ensure any warning is raised at an
        # appropriate stack level.
        encoding = text_encoding(encoding)
    try:
        return open(obj, mode, buffering, encoding, errors, newline)
    except TypeError:
        # open() couldn't handle obj; treat it as an 'openable' object.
        pass
    if not text:
        if encoding is not None:
            raise ValueError("binary mode doesn't take an encoding argument")
        if errors is not None:
            raise ValueError("binary mode doesn't take an errors argument")
        if newline is not None:
            raise ValueError("binary mode doesn't take a newline argument")
    # Drop the 'b'/'t' flags and sort what remains, so 'r+' and '+r' are
    # both handled as '+r'.  The caller's mode is kept untouched so that
    # error messages can quote it verbatim.
    normalized = ''.join(sorted(c for c in mode if c not in 'bt'))
    if normalized == 'r':
        stream = _open_reader(obj)
    elif normalized in ('a', 'w', 'x'):
        stream = _open_writer(obj, normalized)
    elif normalized in ('+r', '+w'):
        # Pass only the letter ('r' or 'w') on to the updater.
        stream = _open_updater(obj, normalized[1])
    else:
        raise ValueError(f'invalid mode: {mode}')
    if text:
        stream = TextIOWrapper(stream, encoding, errors, newline)
    return stream
def vfspath(obj):
    """
    Return the string representation of a virtual path object, obtained
    by calling the __vfspath__ method found on the type of *obj*.  Raise
    TypeError if the type defines no such method.
    """
    obj_type = type(obj)
    try:
        to_string = obj_type.__vfspath__
    except AttributeError:
        type_name = obj_type.__name__
        message = f"expected JoinablePath object, not {type_name}"
        raise TypeError(message) from None
    return to_string(obj)
def ensure_distinct_paths(source, target):
    """
    Raise OSError(EINVAL) if *target* equals *source* or lies within it.
    Return None otherwise.
    """
    # Note: there is no straightforward, foolproof algorithm to determine
    # if one directory is within another (a particularly perverse example
    # would be a single network share mounted in one location via NFS, and
    # in another location via CIFS), so we simply check whether the
    # target path is lexically equal to, or within, the source path.
    if source == target:
        reason = "Source and target are the same path"
    elif source in target.parents:
        reason = "Source path is a parent of target path"
    else:
        return
    failure = OSError(EINVAL, reason)
    failure.filename = vfspath(source)
    failure.filename2 = vfspath(target)
    raise failure
def ensure_different_files(source, target):
    """
    Raise OSError(EINVAL) if both paths refer to the same file.

    The comparison uses the values returned by the `info._file_id`
    callables of both paths.  If either path lacks that attribute, the
    paths themselves are compared instead.  If obtaining or comparing the
    file ids raises OSError or ValueError, the files are treated as
    different.
    """
    try:
        source_id_getter = source.info._file_id
        target_id_getter = target.info._file_id
    except AttributeError:
        # No file identity available: compare the path objects directly.
        differ = source != target
    else:
        try:
            differ = bool(source_id_getter() != target_id_getter())
        except (OSError, ValueError):
            differ = True
    if differ:
        return
    failure = OSError(EINVAL, "Source and target are the same file")
    failure.filename = vfspath(source)
    failure.filename2 = vfspath(target)
    raise failure