Coverage for /pythoncovmergedfiles/medio/medio/src/paramiko/paramiko/sftp_file.py: 18%

234 statements  

# Copyright (C) 2003-2007 Robey Pointer <robeypointer@gmail.com>
#
# This file is part of paramiko.
#
# Paramiko is free software; you can redistribute it and/or modify it under the
# terms of the GNU Lesser General Public License as published by the Free
# Software Foundation; either version 2.1 of the License, or (at your option)
# any later version.
#
# Paramiko is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
# A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
# details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with Paramiko; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.

"""
SFTP file object
"""


from binascii import hexlify
from collections import deque
import socket
import threading
import time
from paramiko.common import DEBUG, io_sleep

from paramiko.file import BufferedFile
from paramiko.util import u
from paramiko.sftp import (
    CMD_CLOSE,
    CMD_READ,
    CMD_DATA,
    SFTPError,
    CMD_WRITE,
    CMD_STATUS,
    CMD_FSTAT,
    CMD_ATTRS,
    CMD_FSETSTAT,
    CMD_EXTENDED,
    int64,
)
from paramiko.sftp_attr import SFTPAttributes


class SFTPFile(BufferedFile):
    """
    Proxy object for a file on the remote server, in client mode SFTP.

    Instances of this class may be used as context managers in the same way
    that built-in Python file objects are.
    """

    # Some sftp servers will choke if you send read/write requests larger than
    # this size.
    MAX_REQUEST_SIZE = 32768
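
    # Illustrative usage sketch (not part of paramiko): per the class
    # docstring, SFTPFile instances -- normally obtained from
    # SFTPClient.open() -- can be used as context managers so the remote
    # handle is closed automatically. ``sftp_client`` and the remote path
    # below are assumed names.
    #
    #   with sftp_client.open("/tmp/example.txt", "wb") as f:
    #       f.write(b"hello\n")
    #   # handle is closed here, just like a built-in file object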

    def __init__(self, sftp, handle, mode="r", bufsize=-1):
        BufferedFile.__init__(self)
        self.sftp = sftp
        self.handle = handle
        BufferedFile._set_mode(self, mode, bufsize)
        self.pipelined = False
        self._prefetching = False
        self._prefetch_done = False
        self._prefetch_data = {}
        self._prefetch_extents = {}
        self._prefetch_lock = threading.Lock()
        self._saved_exception = None
        self._reqs = deque()

    def __del__(self):
        self._close(async_=True)

    def close(self):
        """
        Close the file.
        """
        self._close(async_=False)

    def _close(self, async_=False):
        # We allow double-close without signaling an error, because real
        # Python file objects do. However, we must protect against actually
        # sending multiple CMD_CLOSE packets, because after we close our
        # handle, the same handle may be re-allocated by the server, and we
        # may end up mysteriously closing some random other file. (This is
        # especially important because we unconditionally call close() from
        # __del__.)
        if self._closed:
            return
        self.sftp._log(DEBUG, "close({})".format(u(hexlify(self.handle))))
        if self.pipelined:
            self.sftp._finish_responses(self)
        BufferedFile.close(self)
        try:
            if async_:
                # GC'd file handle could be called from an arbitrary thread
                # -- don't wait for a response
                self.sftp._async_request(type(None), CMD_CLOSE, self.handle)
            else:
                self.sftp._request(CMD_CLOSE, self.handle)
        except EOFError:
            # may have outlived the Transport connection
            pass
        except (IOError, socket.error):
            # may have outlived the Transport connection
            pass

    def _data_in_prefetch_requests(self, offset, size):
        k = [
            x for x in list(self._prefetch_extents.values()) if x[0] <= offset
        ]
        if len(k) == 0:
            return False
        k.sort(key=lambda x: x[0])
        buf_offset, buf_size = k[-1]
        if buf_offset + buf_size <= offset:
            # prefetch request ends before this one begins
            return False
        if buf_offset + buf_size >= offset + size:
            # inclusive
            return True
        # well, we have part of the request. see if another chunk has
        # the rest.
        return self._data_in_prefetch_requests(
            buf_offset + buf_size, offset + size - buf_offset - buf_size
        )

    def _data_in_prefetch_buffers(self, offset):
        """
        if a block of data is present in the prefetch buffers, at the given
        offset, return the offset of the relevant prefetch buffer. otherwise,
        return None. this guarantees nothing about the number of bytes
        collected in the prefetch buffer so far.
        """
        k = [i for i in self._prefetch_data.keys() if i <= offset]
        if len(k) == 0:
            return None
        index = max(k)
        buf_offset = offset - index
        if buf_offset >= len(self._prefetch_data[index]):
            # it's not here
            return None
        return index

    def _read_prefetch(self, size):
        """
        read data out of the prefetch buffer, if possible. if the data isn't
        in the buffer, return None. otherwise, behaves like a normal read.
        """
        # while not closed, and haven't fetched past the current position,
        # and haven't reached EOF...
        while True:
            offset = self._data_in_prefetch_buffers(self._realpos)
            if offset is not None:
                break
            if self._prefetch_done or self._closed:
                break
            self.sftp._read_response()
            self._check_exception()
        if offset is None:
            self._prefetching = False
            return None
        prefetch = self._prefetch_data[offset]
        del self._prefetch_data[offset]

        buf_offset = self._realpos - offset
        if buf_offset > 0:
            self._prefetch_data[offset] = prefetch[:buf_offset]
            prefetch = prefetch[buf_offset:]
        if size < len(prefetch):
            self._prefetch_data[self._realpos + size] = prefetch[size:]
            prefetch = prefetch[:size]
        return prefetch

    def _read(self, size):
        size = min(size, self.MAX_REQUEST_SIZE)
        if self._prefetching:
            data = self._read_prefetch(size)
            if data is not None:
                return data
        t, msg = self.sftp._request(
            CMD_READ, self.handle, int64(self._realpos), int(size)
        )
        if t != CMD_DATA:
            raise SFTPError("Expected data")
        return msg.get_string()

    def _write(self, data):
        # may write less than requested if it would exceed max packet size
        chunk = min(len(data), self.MAX_REQUEST_SIZE)
        sftp_async_request = self.sftp._async_request(
            type(None),
            CMD_WRITE,
            self.handle,
            int64(self._realpos),
            data[:chunk],
        )
        self._reqs.append(sftp_async_request)
        if not self.pipelined or (
            len(self._reqs) > 100 and self.sftp.sock.recv_ready()
        ):
            while len(self._reqs):
                req = self._reqs.popleft()
                t, msg = self.sftp._read_response(req)
                if t != CMD_STATUS:
                    raise SFTPError("Expected status")
                # convert_status already called
        return chunk

    def settimeout(self, timeout):
        """
        Set a timeout on read/write operations on the underlying socket or
        ssh `.Channel`.

        :param float timeout:
            seconds to wait for a pending read/write operation before raising
            ``socket.timeout``, or ``None`` for no timeout

        .. seealso:: `.Channel.settimeout`
        """
        self.sftp.sock.settimeout(timeout)

    def gettimeout(self):
        """
        Returns the timeout in seconds (as a `float`) associated with the
        socket or ssh `.Channel` used for this file.

        .. seealso:: `.Channel.gettimeout`
        """
        return self.sftp.sock.gettimeout()

    def setblocking(self, blocking):
        """
        Set blocking or non-blocking mode on the underlying socket or ssh
        `.Channel`.

        :param int blocking:
            0 to set non-blocking mode; non-0 to set blocking mode.

        .. seealso:: `.Channel.setblocking`
        """
        self.sftp.sock.setblocking(blocking)
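
    # Illustrative sketch (assumed names): apply a timeout so a stalled
    # server raises socket.timeout instead of blocking forever, or switch to
    # non-blocking mode entirely.
    #
    #   f = sftp_client.open("/var/log/big.log", "rb")
    #   f.settimeout(10.0)            # raise socket.timeout after 10s idle
    #   assert f.gettimeout() == 10.0
    #   f.setblocking(0)              # or: non-blocking mode, no timeout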

    def seekable(self):
        """
        Check if the file supports random access.

        :return:
            `True` if the file supports random access. If `False`,
            :meth:`seek` will raise an exception
        """
        return True

    def seek(self, offset, whence=0):
        """
        Set the file's current position.

        See `file.seek` for details.
        """
        self.flush()
        if whence == self.SEEK_SET:
            self._realpos = self._pos = offset
        elif whence == self.SEEK_CUR:
            self._pos += offset
            self._realpos = self._pos
        else:
            self._realpos = self._pos = self._get_size() + offset
        self._rbuffer = bytes()
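
    # Illustrative sketch (assumed file object ``f``): random access works
    # like a local file, using the SEEK_* constants inherited from
    # BufferedFile.
    #
    #   f.seek(0, f.SEEK_END)         # jump to the end of the file
    #   size = f.tell()
    #   f.seek(-128, f.SEEK_END)      # then re-read the final 128 bytes
    #   tail = f.read(128)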

    def stat(self):
        """
        Retrieve information about this file from the remote system. This is
        exactly like `.SFTPClient.stat`, except that it operates on an
        already-open file.

        :returns:
            an `.SFTPAttributes` object containing attributes about this file.
        """
        t, msg = self.sftp._request(CMD_FSTAT, self.handle)
        if t != CMD_ATTRS:
            raise SFTPError("Expected attributes")
        return SFTPAttributes._from_msg(msg)
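
    # Illustrative sketch (assumed file object ``f``): stat() on an open
    # handle returns an SFTPAttributes object, e.g. to learn the size before
    # deciding whether to prefetch.
    #
    #   attrs = f.stat()
    #   print(attrs.st_size, attrs.st_mtime)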

    def chmod(self, mode):
        """
        Change the mode (permissions) of this file. The permissions are
        unix-style and identical to those used by Python's `os.chmod`
        function.

        :param int mode: new permissions
        """
        self.sftp._log(
            DEBUG, "chmod({}, {!r})".format(hexlify(self.handle), mode)
        )
        attr = SFTPAttributes()
        attr.st_mode = mode
        self.sftp._request(CMD_FSETSTAT, self.handle, attr)

    def chown(self, uid, gid):
        """
        Change the owner (``uid``) and group (``gid``) of this file. As with
        Python's `os.chown` function, you must pass both arguments, so if you
        only want to change one, use `stat` first to retrieve the current
        owner and group.

        :param int uid: new owner's uid
        :param int gid: new group id
        """
        self.sftp._log(
            DEBUG,
            "chown({}, {!r}, {!r})".format(hexlify(self.handle), uid, gid),
        )
        attr = SFTPAttributes()
        attr.st_uid, attr.st_gid = uid, gid
        self.sftp._request(CMD_FSETSTAT, self.handle, attr)
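
    # Illustrative sketch (assumed names): because chown() needs both values,
    # read the current attributes first when changing only the owner.
    #
    #   current = f.stat()
    #   f.chown(1000, current.st_gid)     # new uid, keep the existing gid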

    def utime(self, times):
        """
        Set the access and modified times of this file. If
        ``times`` is ``None``, then the file's access and modified times are
        set to the current time. Otherwise, ``times`` must be a 2-tuple of
        numbers, of the form ``(atime, mtime)``, which is used to set the
        access and modified times, respectively. This bizarre API is mimicked
        from Python for the sake of consistency -- I apologize.

        :param tuple times:
            ``None`` or a tuple of (access time, modified time) in standard
            internet epoch time (seconds since 01 January 1970 GMT)
        """
        if times is None:
            times = (time.time(), time.time())
        self.sftp._log(
            DEBUG, "utime({}, {!r})".format(hexlify(self.handle), times)
        )
        attr = SFTPAttributes()
        attr.st_atime, attr.st_mtime = times
        self.sftp._request(CMD_FSETSTAT, self.handle, attr)
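
    # Illustrative sketch (assumed file object ``f``): pass an explicit
    # (atime, mtime) pair in epoch seconds, or None for "now".
    #
    #   import time
    #   f.utime((time.time(), time.time() - 3600))    # mtime one hour ago
    #   f.utime(None)                                  # both set to current time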

    def truncate(self, size):
        """
        Change the size of this file. This usually extends
        or shrinks the size of the file, just like the ``truncate()`` method on
        Python file objects.

        :param size: the new size of the file
        """
        self.sftp._log(
            DEBUG, "truncate({}, {!r})".format(hexlify(self.handle), size)
        )
        attr = SFTPAttributes()
        attr.st_size = size
        self.sftp._request(CMD_FSETSTAT, self.handle, attr)

    def check(self, hash_algorithm, offset=0, length=0, block_size=0):
        """
        Ask the server for a hash of a section of this file. This can be used
        to verify a successful upload or download, or for various rsync-like
        operations.

        The file is hashed from ``offset``, for ``length`` bytes.
        If ``length`` is 0, the remainder of the file is hashed. Thus, if both
        ``offset`` and ``length`` are zero, the entire file is hashed.

        Normally, ``block_size`` will be 0 (the default), and this method will
        return a byte string representing the requested hash (for example, a
        string of length 16 for MD5, or 20 for SHA-1). If a non-zero
        ``block_size`` is given, each chunk of the file (from ``offset`` to
        ``offset + length``) of ``block_size`` bytes is computed as a separate
        hash. The hash results are all concatenated and returned as a single
        string.

        For example, ``check('sha1', 0, 1024, 512)`` will return a string of
        length 40. The first 20 bytes will be the SHA-1 of the first 512 bytes
        of the file, and the last 20 bytes will be the SHA-1 of the next 512
        bytes.

        :param str hash_algorithm:
            the name of the hash algorithm to use (normally ``"sha1"`` or
            ``"md5"``)
        :param offset:
            offset into the file to begin hashing (0 means to start from the
            beginning)
        :param length:
            number of bytes to hash (0 means continue to the end of the file)
        :param int block_size:
            number of bytes to hash per result (must not be less than 256; 0
            means to compute only one hash of the entire segment)
        :return:
            `str` of bytes representing the hash of each block, concatenated
            together

        :raises:
            ``IOError`` -- if the server doesn't support the "check-file"
            extension, or possibly doesn't support the hash algorithm
            requested

        .. note:: Many (most?) servers don't support this extension yet.

        .. versionadded:: 1.4
        """
        t, msg = self.sftp._request(
            CMD_EXTENDED,
            "check-file",
            self.handle,
            hash_algorithm,
            int64(offset),
            int64(length),
            block_size,
        )
        msg.get_text()  # ext
        msg.get_text()  # alg
        data = msg.get_remainder()
        return data
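
    # Illustrative sketch (assumed names; only works against servers that
    # implement the "check-file" extension): verify an upload by comparing
    # the server-side hash with a locally computed digest.
    #
    #   import hashlib
    #   with open("local.bin", "rb") as lf:
    #       local_md5 = hashlib.md5(lf.read()).digest()
    #   with sftp_client.open("/remote/local.bin", "rb") as rf:
    #       remote_md5 = rf.check("md5")    # hash of the entire remote file
    #   assert remote_md5 == local_md5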

    def set_pipelined(self, pipelined=True):
        """
        Turn on/off the pipelining of write operations to this file. When
        pipelining is on, paramiko won't wait for the server response after
        each write operation. Instead, they're collected as they come in. At
        the first non-write operation (including `.close`), all remaining
        server responses are collected. This means that if there was an error
        with one of your later writes, an exception might be thrown from
        within `.close` instead of `.write`.

        By default, files are not pipelined.

        :param bool pipelined:
            ``True`` if pipelining should be turned on for this file;
            ``False`` otherwise

        .. versionadded:: 1.5
        """
        self.pipelined = pipelined
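
    # Illustrative sketch (assumed names): with pipelining on, write() does
    # not wait for each status reply, so a deferred write error may surface
    # from close() when the context manager exits.
    #
    #   with sftp_client.open("/remote/out.bin", "wb") as f:
    #       f.set_pipelined(True)
    #       for block in blocks:    # ``blocks``: an assumed iterable of bytes
    #           f.write(block)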

    def prefetch(self, file_size=None, max_concurrent_requests=None):
        """
        Pre-fetch the remaining contents of this file in anticipation of
        future `.read` calls. If reading the entire file, pre-fetching can
        dramatically improve the download speed by avoiding roundtrip latency.
        The file's contents are incrementally buffered in a background thread.

        The prefetched data is stored in a buffer until read via the `.read`
        method. Once data has been read, it's removed from the buffer. The
        data may be read in a random order (using `.seek`); chunks of the
        buffer that haven't been read will continue to be buffered.

        :param int file_size:
            When this is ``None`` (the default), this method calls `stat` to
            determine the remote file size. In some situations, doing so can
            cause exceptions or hangs (see `#562
            <https://github.com/paramiko/paramiko/pull/562>`_); as a
            workaround, one may call `stat` explicitly and pass its value in
            via this parameter.
        :param int max_concurrent_requests:
            The maximum number of concurrent read requests to prefetch. See
            `.SFTPClient.get` (its ``max_concurrent_prefetch_requests`` param)
            for details.

        .. versionadded:: 1.5.1
        .. versionchanged:: 1.16.0
            The ``file_size`` parameter was added (with no default value).
        .. versionchanged:: 1.16.1
            The ``file_size`` parameter was made optional for backwards
            compatibility.
        .. versionchanged:: 3.3
            Added ``max_concurrent_requests``.
        """
        if file_size is None:
            file_size = self.stat().st_size

        # queue up async reads for the rest of the file
        chunks = []
        n = self._realpos
        while n < file_size:
            chunk = min(self.MAX_REQUEST_SIZE, file_size - n)
            chunks.append((n, chunk))
            n += chunk
        if len(chunks) > 0:
            self._start_prefetch(chunks, max_concurrent_requests)
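
    # Illustrative sketch (assumed names): prefetch the whole file before a
    # sequential read to hide round-trip latency; passing file_size avoids
    # the extra stat() call described above.
    #
    #   size = sftp_client.stat("/remote/big.iso").st_size
    #   with sftp_client.open("/remote/big.iso", "rb") as f:
    #       f.prefetch(file_size=size, max_concurrent_requests=64)
    #       data = f.read()    # served largely from the prefetch buffers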

    def readv(self, chunks, max_concurrent_prefetch_requests=None):
        """
        Read a set of blocks from the file by (offset, length). This is more
        efficient than doing a series of `.seek` and `.read` calls, since the
        prefetch machinery is used to retrieve all the requested blocks at
        once.

        :param chunks:
            a list of ``(offset, length)`` tuples indicating which sections of
            the file to read
        :param int max_concurrent_prefetch_requests:
            The maximum number of concurrent read requests to prefetch. See
            `.SFTPClient.get` (its ``max_concurrent_prefetch_requests`` param)
            for details.
        :return: a list of blocks read, in the same order as in ``chunks``

        .. versionadded:: 1.5.4
        .. versionchanged:: 3.3
            Added ``max_concurrent_prefetch_requests``.
        """
        self.sftp._log(
            DEBUG, "readv({}, {!r})".format(hexlify(self.handle), chunks)
        )

        read_chunks = []
        for offset, size in chunks:
            # don't fetch data that's already in the prefetch buffer
            if self._data_in_prefetch_buffers(
                offset
            ) or self._data_in_prefetch_requests(offset, size):
                continue

            # break up anything larger than the max read size
            while size > 0:
                chunk_size = min(size, self.MAX_REQUEST_SIZE)
                read_chunks.append((offset, chunk_size))
                offset += chunk_size
                size -= chunk_size

        self._start_prefetch(read_chunks, max_concurrent_prefetch_requests)
        # now we can just devolve to a bunch of read()s :)
        for x in chunks:
            self.seek(x[0])
            yield self.read(x[1])
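
    # Illustrative sketch (assumed names): readv() yields the requested
    # blocks in order, issuing all of the underlying reads up front.
    #
    #   wanted = [(0, 16), (1024, 256), (8192, 64)]    # (offset, length)
    #   with sftp_client.open("/remote/archive.dat", "rb") as f:
    #       for block in f.readv(wanted):
    #           handle_block(block)    # ``handle_block``: assumed callback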

    # ...internals...

    def _get_size(self):
        try:
            return self.stat().st_size
        except:
            return 0

    def _start_prefetch(self, chunks, max_concurrent_requests=None):
        self._prefetching = True
        self._prefetch_done = False

        t = threading.Thread(
            target=self._prefetch_thread,
            args=(chunks, max_concurrent_requests),
        )
        t.daemon = True
        t.start()

    def _prefetch_thread(self, chunks, max_concurrent_requests):
        # do these read requests in a temporary thread because there may be
        # a lot of them, so it may block.
        for offset, length in chunks:
            # Limit the number of concurrent requests in a busy-loop
            if max_concurrent_requests is not None:
                while True:
                    with self._prefetch_lock:
                        pf_len = len(self._prefetch_extents)
                        if pf_len < max_concurrent_requests:
                            break
                    time.sleep(io_sleep)

            num = self.sftp._async_request(
                self, CMD_READ, self.handle, int64(offset), int(length)
            )
            with self._prefetch_lock:
                self._prefetch_extents[num] = (offset, length)

    def _async_response(self, t, msg, num):
        if t == CMD_STATUS:
            # save exception and re-raise it on next file operation
            try:
                self.sftp._convert_status(msg)
            except Exception as e:
                self._saved_exception = e
            return
        if t != CMD_DATA:
            raise SFTPError("Expected data")
        data = msg.get_string()
        while True:
            with self._prefetch_lock:
                # spin if in race with _prefetch_thread
                if num in self._prefetch_extents:
                    offset, length = self._prefetch_extents[num]
                    self._prefetch_data[offset] = data
                    del self._prefetch_extents[num]
                    if len(self._prefetch_extents) == 0:
                        self._prefetch_done = True
                    break

    def _check_exception(self):
        """if there's a saved exception, raise & clear it"""
        if self._saved_exception is not None:
            x = self._saved_exception
            self._saved_exception = None
            raise x