Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/pathlib_abc/_os.py: 18%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

190 statements  

1""" 

2Low-level OS functionality wrappers used by pathlib. 

3""" 

4 

5from errno import * 

6from io import TextIOWrapper 

7import os 

8import sys 

9try: 

10 from io import text_encoding 

11except ImportError: 

12 def text_encoding(encoding): 

13 return encoding 

14try: 

15 import fcntl 

16except ImportError: 

17 fcntl = None 

18try: 

19 import posix 

20except ImportError: 

21 posix = None 

22try: 

23 import _winapi 

24except ImportError: 

25 _winapi = None 

26 

27 

28def _get_copy_blocksize(infd): 

29 """Determine blocksize for fastcopying on Linux. 

30 Hopefully the whole file will be copied in a single call. 

31 The copying itself should be performed in a loop 'till EOF is 

32 reached (0 return) so a blocksize smaller or bigger than the actual 

33 file size should not make any difference, also in case the file 

34 content changes while being copied. 

35 """ 

36 try: 

37 blocksize = max(os.fstat(infd).st_size, 2 ** 23) # min 8 MiB 

38 except OSError: 

39 blocksize = 2 ** 27 # 128 MiB 

40 # On 32-bit architectures truncate to 1 GiB to avoid OverflowError, 

41 # see gh-82500. 

42 if sys.maxsize < 2 ** 32: 

43 blocksize = min(blocksize, 2 ** 30) 

44 return blocksize 

45 

46 

47if fcntl and hasattr(fcntl, 'FICLONE'): 

48 def _ficlone(source_fd, target_fd): 

49 """ 

50 Perform a lightweight copy of two files, where the data blocks are 

51 copied only when modified. This is known as Copy on Write (CoW), 

52 instantaneous copy or reflink. 

53 """ 

54 fcntl.ioctl(target_fd, fcntl.FICLONE, source_fd) 

55else: 

56 _ficlone = None 

57 

58 

59if posix and hasattr(posix, '_fcopyfile'): 

60 def _fcopyfile(source_fd, target_fd): 

61 """ 

62 Copy a regular file content using high-performance fcopyfile(3) 

63 syscall (macOS). 

64 """ 

65 posix._fcopyfile(source_fd, target_fd, posix._COPYFILE_DATA) 

66else: 

67 _fcopyfile = None 

68 

69 

70if hasattr(os, 'copy_file_range'): 

71 def _copy_file_range(source_fd, target_fd): 

72 """ 

73 Copy data from one regular mmap-like fd to another by using a 

74 high-performance copy_file_range(2) syscall that gives filesystems 

75 an opportunity to implement the use of reflinks or server-side 

76 copy. 

77 This should work on Linux >= 4.5 only. 

78 """ 

79 blocksize = _get_copy_blocksize(source_fd) 

80 offset = 0 

81 while True: 

82 sent = os.copy_file_range(source_fd, target_fd, blocksize, 

83 offset_dst=offset) 

84 if sent == 0: 

85 break # EOF 

86 offset += sent 

87else: 

88 _copy_file_range = None 

89 

90 

91if hasattr(os, 'sendfile'): 

92 def _sendfile(source_fd, target_fd): 

93 """Copy data from one regular mmap-like fd to another by using 

94 high-performance sendfile(2) syscall. 

95 This should work on Linux >= 2.6.33 only. 

96 """ 

97 blocksize = _get_copy_blocksize(source_fd) 

98 offset = 0 

99 while True: 

100 sent = os.sendfile(target_fd, source_fd, offset, blocksize) 

101 if sent == 0: 

102 break # EOF 

103 offset += sent 

104else: 

105 _sendfile = None 

106 

107 

108if _winapi and hasattr(_winapi, 'CopyFile2'): 

109 def copyfile2(source, target): 

110 """ 

111 Copy from one file to another using CopyFile2 (Windows only). 

112 """ 

113 _winapi.CopyFile2(source, target, 0) 

114else: 

115 copyfile2 = None 

116 

117 

118def copyfileobj(source_f, target_f): 

119 """ 

120 Copy data from file-like object source_f to file-like object target_f. 

121 """ 

122 try: 

123 source_fd = source_f.fileno() 

124 target_fd = target_f.fileno() 

125 except Exception: 

126 pass # Fall through to generic code. 

127 else: 

128 try: 

129 # Use OS copy-on-write where available. 

130 if _ficlone: 

131 try: 

132 _ficlone(source_fd, target_fd) 

133 return 

134 except OSError as err: 

135 if err.errno not in (EBADF, EOPNOTSUPP, ETXTBSY, EXDEV): 

136 raise err 

137 

138 # Use OS copy where available. 

139 if _fcopyfile: 

140 try: 

141 _fcopyfile(source_fd, target_fd) 

142 return 

143 except OSError as err: 

144 if err.errno not in (EINVAL, ENOTSUP): 

145 raise err 

146 if _copy_file_range: 

147 try: 

148 _copy_file_range(source_fd, target_fd) 

149 return 

150 except OSError as err: 

151 if err.errno not in (ETXTBSY, EXDEV): 

152 raise err 

153 if _sendfile: 

154 try: 

155 _sendfile(source_fd, target_fd) 

156 return 

157 except OSError as err: 

158 if err.errno != ENOTSOCK: 

159 raise err 

160 except OSError as err: 

161 # Produce more useful error messages. 

162 err.filename = source_f.name 

163 err.filename2 = target_f.name 

164 raise err 

165 

166 # Last resort: copy with fileobj read() and write(). 

167 read_source = source_f.read 

168 write_target = target_f.write 

169 while buf := read_source(1024 * 1024): 

170 write_target(buf) 

171 

172 

173def _open_reader(obj): 

174 cls = type(obj) 

175 try: 

176 open_reader = cls.__open_reader__ 

177 except AttributeError: 

178 cls_name = cls.__name__ 

179 raise TypeError(f"{cls_name} can't be opened for reading") from None 

180 else: 

181 return open_reader(obj) 

182 

183 

184def _open_writer(obj, mode): 

185 cls = type(obj) 

186 try: 

187 open_writer = cls.__open_writer__ 

188 except AttributeError: 

189 cls_name = cls.__name__ 

190 raise TypeError(f"{cls_name} can't be opened for writing") from None 

191 else: 

192 return open_writer(obj, mode) 

193 

194 

195def _open_updater(obj, mode): 

196 cls = type(obj) 

197 try: 

198 open_updater = cls.__open_updater__ 

199 except AttributeError: 

200 cls_name = cls.__name__ 

201 raise TypeError(f"{cls_name} can't be opened for updating") from None 

202 else: 

203 return open_updater(obj, mode) 

204 

205 

206def vfsopen(obj, mode='r', buffering=-1, encoding=None, errors=None, 

207 newline=None): 

208 """ 

209 Open the file pointed to by this path and return a file object, as 

210 the built-in open() function does. 

211 

212 Unlike the built-in open() function, this function additionally accepts 

213 'openable' objects, which are objects with any of these special methods: 

214 

215 __open_reader__() 

216 __open_writer__(mode) 

217 __open_updater__(mode) 

218 

219 '__open_reader__' is called for 'r' mode; '__open_writer__' for 'a', 'w' 

220 and 'x' modes; and '__open_updater__' for 'r+' and 'w+' modes. If text 

221 mode is requested, the result is wrapped in an io.TextIOWrapper object. 

222 """ 

223 if buffering != -1: 

224 raise ValueError("buffer size can't be customized") 

225 text = 'b' not in mode 

226 if text: 

227 # Call io.text_encoding() here to ensure any warning is raised at an 

228 # appropriate stack level. 

229 encoding = text_encoding(encoding) 

230 try: 

231 return open(obj, mode, buffering, encoding, errors, newline) 

232 except TypeError: 

233 pass 

234 if not text: 

235 if encoding is not None: 

236 raise ValueError("binary mode doesn't take an encoding argument") 

237 if errors is not None: 

238 raise ValueError("binary mode doesn't take an errors argument") 

239 if newline is not None: 

240 raise ValueError("binary mode doesn't take a newline argument") 

241 mode = ''.join(sorted(c for c in mode if c not in 'bt')) 

242 if mode == 'r': 

243 stream = _open_reader(obj) 

244 elif mode in ('a', 'w', 'x'): 

245 stream = _open_writer(obj, mode) 

246 elif mode in ('+r', '+w'): 

247 stream = _open_updater(obj, mode[1]) 

248 else: 

249 raise ValueError(f'invalid mode: {mode}') 

250 if text: 

251 stream = TextIOWrapper(stream, encoding, errors, newline) 

252 return stream 

253 

254 

255def vfspath(obj): 

256 """ 

257 Return the string representation of a virtual path object. 

258 """ 

259 cls = type(obj) 

260 try: 

261 vfspath_method = cls.__vfspath__ 

262 except AttributeError: 

263 cls_name = cls.__name__ 

264 raise TypeError(f"expected JoinablePath object, not {cls_name}") from None 

265 else: 

266 return vfspath_method(obj) 

267 

268 

269def ensure_distinct_paths(source, target): 

270 """ 

271 Raise OSError(EINVAL) if the other path is within this path. 

272 """ 

273 # Note: there is no straightforward, foolproof algorithm to determine 

274 # if one directory is within another (a particularly perverse example 

275 # would be a single network share mounted in one location via NFS, and 

276 # in another location via CIFS), so we simply checks whether the 

277 # other path is lexically equal to, or within, this path. 

278 if source == target: 

279 err = OSError(EINVAL, "Source and target are the same path") 

280 elif source in target.parents: 

281 err = OSError(EINVAL, "Source path is a parent of target path") 

282 else: 

283 return 

284 err.filename = vfspath(source) 

285 err.filename2 = vfspath(target) 

286 raise err 

287 

288 

289def ensure_different_files(source, target): 

290 """ 

291 Raise OSError(EINVAL) if both paths refer to the same file. 

292 """ 

293 try: 

294 source_file_id = source.info._file_id 

295 target_file_id = target.info._file_id 

296 except AttributeError: 

297 if source != target: 

298 return 

299 else: 

300 try: 

301 if source_file_id() != target_file_id(): 

302 return 

303 except (OSError, ValueError): 

304 return 

305 err = OSError(EINVAL, "Source and target are the same file") 

306 err.filename = vfspath(source) 

307 err.filename2 = vfspath(target) 

308 raise err