Coverage for /pythoncovmergedfiles/medio/medio/src/paramiko/paramiko/sftp_file.py: 18%

# Copyright (C) 2003-2007 Robey Pointer <robeypointer@gmail.com>
#
# This file is part of paramiko.
#
# Paramiko is free software; you can redistribute it and/or modify it under the
# terms of the GNU Lesser General Public License as published by the Free
# Software Foundation; either version 2.1 of the License, or (at your option)
# any later version.
#
# Paramiko is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
# A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
# details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with Paramiko; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.

"""
SFTP file object
"""


from binascii import hexlify
from collections import deque
import socket
import threading
import time
from paramiko.common import DEBUG, io_sleep

from paramiko.file import BufferedFile
from paramiko.util import u
from paramiko.sftp import (
    CMD_CLOSE,
    CMD_READ,
    CMD_DATA,
    SFTPError,
    CMD_WRITE,
    CMD_STATUS,
    CMD_FSTAT,
    CMD_ATTRS,
    CMD_FSETSTAT,
    CMD_EXTENDED,
    int64,
)
from paramiko.sftp_attr import SFTPAttributes


class SFTPFile(BufferedFile):
    """
    Proxy object for a file on the remote server, in client mode SFTP.

    Instances of this class may be used as context managers in the same way
    that built-in Python file objects are.
    """

    # Some sftp servers will choke if you send read/write requests larger than
    # this size.
    MAX_REQUEST_SIZE = 32768

    def __init__(self, sftp, handle, mode="r", bufsize=-1):
        BufferedFile.__init__(self)
        self.sftp = sftp
        self.handle = handle
        BufferedFile._set_mode(self, mode, bufsize)
        self.pipelined = False
        self._prefetching = False
        self._prefetch_done = False
        self._prefetch_data = {}
        self._prefetch_extents = {}
        self._prefetch_lock = threading.Lock()
        self._saved_exception = None
        self._reqs = deque()

    def __del__(self):
        self._close(async_=True)

    def close(self):
        """
        Close the file.
        """
        self._close(async_=False)

    def _close(self, async_=False):
        # We allow double-close without signaling an error, because real
        # Python file objects do. However, we must protect against actually
        # sending multiple CMD_CLOSE packets, because after we close our
        # handle, the same handle may be re-allocated by the server, and we
        # may end up mysteriously closing some random other file. (This is
        # especially important because we unconditionally call close() from
        # __del__.)
        if self._closed:
            return
        self.sftp._log(DEBUG, "close({})".format(u(hexlify(self.handle))))
        if self.pipelined:
            self.sftp._finish_responses(self)
        BufferedFile.close(self)
        try:
            if async_:
                # GC'd file handle could be called from an arbitrary thread
                # -- don't wait for a response
                self.sftp._async_request(type(None), CMD_CLOSE, self.handle)
            else:
                self.sftp._request(CMD_CLOSE, self.handle)
        except EOFError:
            # may have outlived the Transport connection
            pass
        except (IOError, socket.error):
            # may have outlived the Transport connection
            pass

    def _data_in_prefetch_requests(self, offset, size):
        k = [
            x for x in list(self._prefetch_extents.values()) if x[0] <= offset
        ]
        if len(k) == 0:
            return False
        k.sort(key=lambda x: x[0])
        buf_offset, buf_size = k[-1]
        if buf_offset + buf_size <= offset:
            # prefetch request ends before this one begins
            return False
        if buf_offset + buf_size >= offset + size:
            # inclusive
            return True
        # well, we have part of the request. see if another chunk has
        # the rest.
        return self._data_in_prefetch_requests(
            buf_offset + buf_size, offset + size - buf_offset - buf_size
        )
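
    # Worked example of the recursion above (illustrative numbers): with
    # outstanding extents (0, 100) and (100, 50), a query for offset=80,
    # size=60 first matches (0, 100), which covers bytes 80-99; the method
    # then recurses with offset=100, size=40, which extent (100, 50) fully
    # covers, so the whole range is accounted for and True is returned.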

    def _data_in_prefetch_buffers(self, offset):
        """
        if a block of data is present in the prefetch buffers, at the given
        offset, return the offset of the relevant prefetch buffer. otherwise,
        return None. this guarantees nothing about the number of bytes
        collected in the prefetch buffer so far.
        """
        k = [i for i in self._prefetch_data.keys() if i <= offset]
        if len(k) == 0:
            return None
        index = max(k)
        buf_offset = offset - index
        if buf_offset >= len(self._prefetch_data[index]):
            # it's not here
            return None
        return index

    def _read_prefetch(self, size):
        """
        read data out of the prefetch buffer, if possible. if the data isn't
        in the buffer, return None. otherwise, behaves like a normal read.
        """
        # while not closed, and haven't fetched past the current position,
        # and haven't reached EOF...
        while True:
            offset = self._data_in_prefetch_buffers(self._realpos)
            if offset is not None:
                break
            if self._prefetch_done or self._closed:
                break
            self.sftp._read_response()
            self._check_exception()
        if offset is None:
            self._prefetching = False
            return None
        prefetch = self._prefetch_data[offset]
        del self._prefetch_data[offset]

        buf_offset = self._realpos - offset
        if buf_offset > 0:
            self._prefetch_data[offset] = prefetch[:buf_offset]
            prefetch = prefetch[buf_offset:]
        if size < len(prefetch):
            self._prefetch_data[self._realpos + size] = prefetch[size:]
            prefetch = prefetch[:size]
        return prefetch

    def _read(self, size):
        size = min(size, self.MAX_REQUEST_SIZE)
        if self._prefetching:
            data = self._read_prefetch(size)
            if data is not None:
                return data
        t, msg = self.sftp._request(
            CMD_READ, self.handle, int64(self._realpos), int(size)
        )
        if t != CMD_DATA:
            raise SFTPError("Expected data")
        return msg.get_string()

    def _write(self, data):
        # may write less than requested if it would exceed max packet size
        chunk = min(len(data), self.MAX_REQUEST_SIZE)
        sftp_async_request = self.sftp._async_request(
            type(None),
            CMD_WRITE,
            self.handle,
            int64(self._realpos),
            data[:chunk],
        )
        self._reqs.append(sftp_async_request)
        if not self.pipelined or (
            len(self._reqs) > 100 and self.sftp.sock.recv_ready()
        ):
            while len(self._reqs):
                req = self._reqs.popleft()
                t, msg = self.sftp._read_response(req)
                if t != CMD_STATUS:
                    raise SFTPError("Expected status")
                # convert_status already called
        return chunk

    def settimeout(self, timeout):
        """
        Set a timeout on read/write operations on the underlying socket or
        ssh `.Channel`.

        :param float timeout:
            seconds to wait for a pending read/write operation before raising
            ``socket.timeout``, or ``None`` for no timeout

        .. seealso:: `.Channel.settimeout`
        """
        self.sftp.sock.settimeout(timeout)

    def gettimeout(self):
        """
        Returns the timeout in seconds (as a `float`) associated with the
        socket or ssh `.Channel` used for this file.

        .. seealso:: `.Channel.gettimeout`
        """
        return self.sftp.sock.gettimeout()
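
    # A brief sketch of the timeout API (assumes `f` is an open SFTPFile;
    # values are illustrative):
    #
    #   f.settimeout(5.0)         # raise socket.timeout after 5 seconds
    #   current = f.gettimeout()  # -> 5.0
    #   f.settimeout(None)        # back to blocking indefinitely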

    def setblocking(self, blocking):
        """
        Set blocking or non-blocking mode on the underlying socket or ssh
        `.Channel`.

        :param int blocking:
            0 to set non-blocking mode; non-0 to set blocking mode.

        .. seealso:: `.Channel.setblocking`
        """
        self.sftp.sock.setblocking(blocking)

    def seekable(self):
        """
        Check if the file supports random access.

        :return:
            `True` if the file supports random access. If `False`,
            :meth:`seek` will raise an exception
        """
        return True

    def seek(self, offset, whence=0):
        """
        Set the file's current position.

        See `file.seek` for details.
        """
        self.flush()
        if whence == self.SEEK_SET:
            self._realpos = self._pos = offset
        elif whence == self.SEEK_CUR:
            self._pos += offset
            self._realpos = self._pos
        else:
            self._realpos = self._pos = self._get_size() + offset
        self._rbuffer = bytes()
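
    # Seek sketch (assumes `f` is an open SFTPFile; offsets illustrative):
    #
    #   f.seek(0, f.SEEK_END)    # jump to end of file
    #   size = f.tell()
    #   f.seek(100)              # absolute position 100
    #   f.seek(-10, f.SEEK_CUR)  # back up 10 bytes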

    def stat(self):
        """
        Retrieve information about this file from the remote system. This is
        exactly like `.SFTPClient.stat`, except that it operates on an
        already-open file.

        :returns:
            an `.SFTPAttributes` object containing attributes about this file.
        """
        t, msg = self.sftp._request(CMD_FSTAT, self.handle)
        if t != CMD_ATTRS:
            raise SFTPError("Expected attributes")
        return SFTPAttributes._from_msg(msg)
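
    # The returned SFTPAttributes exposes the usual stat fields (sketch;
    # assumes `f` is an open SFTPFile):
    #
    #   attrs = f.stat()
    #   print(attrs.st_size, attrs.st_mode, attrs.st_mtime)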

    def chmod(self, mode):
        """
        Change the mode (permissions) of this file. The permissions are
        unix-style and identical to those used by Python's `os.chmod`
        function.

        :param int mode: new permissions
        """
        self.sftp._log(
            DEBUG, "chmod({}, {!r})".format(hexlify(self.handle), mode)
        )
        attr = SFTPAttributes()
        attr.st_mode = mode
        self.sftp._request(CMD_FSETSTAT, self.handle, attr)

    def chown(self, uid, gid):
        """
        Change the owner (``uid``) and group (``gid``) of this file. As with
        Python's `os.chown` function, you must pass both arguments, so if you
        only want to change one, use `stat` first to retrieve the current
        owner and group.

        :param int uid: new owner's uid
        :param int gid: new group id
        """
        self.sftp._log(
            DEBUG,
            "chown({}, {!r}, {!r})".format(hexlify(self.handle), uid, gid),
        )
        attr = SFTPAttributes()
        attr.st_uid, attr.st_gid = uid, gid
        self.sftp._request(CMD_FSETSTAT, self.handle, attr)
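
    # Permission/ownership sketch, including the stat-first pattern the
    # chown docstring recommends (assumes `f` is an open SFTPFile; the uid
    # value is illustrative):
    #
    #   f.chmod(0o644)                 # rw-r--r--
    #   current = f.stat()
    #   f.chown(1000, current.st_gid)  # change owner, keep group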

    def utime(self, times):
        """
        Set the access and modified times of this file. If
        ``times`` is ``None``, then the file's access and modified times are
        set to the current time. Otherwise, ``times`` must be a 2-tuple of
        numbers, of the form ``(atime, mtime)``, which is used to set the
        access and modified times, respectively. This bizarre API is mimicked
        from Python for the sake of consistency -- I apologize.

        :param tuple times:
            ``None`` or a tuple of (access time, modified time) in standard
            internet epoch time (seconds since 01 January 1970 GMT)
        """
        if times is None:
            times = (time.time(), time.time())
        self.sftp._log(
            DEBUG, "utime({}, {!r})".format(hexlify(self.handle), times)
        )
        attr = SFTPAttributes()
        attr.st_atime, attr.st_mtime = times
        self.sftp._request(CMD_FSETSTAT, self.handle, attr)

    def truncate(self, size):
        """
        Change the size of this file. This usually extends
        or shrinks the size of the file, just like the ``truncate()`` method on
        Python file objects.

        :param size: the new size of the file
        """
        self.sftp._log(
            DEBUG, "truncate({}, {!r})".format(hexlify(self.handle), size)
        )
        attr = SFTPAttributes()
        attr.st_size = size
        self.sftp._request(CMD_FSETSTAT, self.handle, attr)
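
    # Timestamp/size sketch (assumes `f` is an open SFTPFile; the epoch
    # values are illustrative):
    #
    #   f.utime(None)                      # touch: atime = mtime = now
    #   f.utime((1700000000, 1700000000))  # explicit (atime, mtime)
    #   f.truncate(0)                      # empty the file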

    def check(self, hash_algorithm, offset=0, length=0, block_size=0):
        """
        Ask the server for a hash of a section of this file. This can be used
        to verify a successful upload or download, or for various rsync-like
        operations.

        The file is hashed from ``offset``, for ``length`` bytes.
        If ``length`` is 0, the remainder of the file is hashed. Thus, if both
        ``offset`` and ``length`` are zero, the entire file is hashed.

        Normally, ``block_size`` will be 0 (the default), and this method will
        return a byte string representing the requested hash (for example, a
        string of length 16 for MD5, or 20 for SHA-1). If a non-zero
        ``block_size`` is given, each chunk of the file (from ``offset`` to
        ``offset + length``) of ``block_size`` bytes is computed as a separate
        hash. The hash results are all concatenated and returned as a single
        string.

        For example, ``check('sha1', 0, 1024, 512)`` will return a string of
        length 40. The first 20 bytes will be the SHA-1 of the first 512 bytes
        of the file, and the last 20 bytes will be the SHA-1 of the next 512
        bytes.

        :param str hash_algorithm:
            the name of the hash algorithm to use (normally ``"sha1"`` or
            ``"md5"``)
        :param offset:
            offset into the file to begin hashing (0 means to start from the
            beginning)
        :param length:
            number of bytes to hash (0 means continue to the end of the file)
        :param int block_size:
            number of bytes to hash per result (must not be less than 256; 0
            means to compute only one hash of the entire segment)
        :return:
            `str` of bytes representing the hash of each block, concatenated
            together

        :raises:
            ``IOError`` -- if the server doesn't support the "check-file"
            extension, or possibly doesn't support the hash algorithm requested

        .. note:: Many (most?) servers don't support this extension yet.

        .. versionadded:: 1.4
        """
        t, msg = self.sftp._request(
            CMD_EXTENDED,
            "check-file",
            self.handle,
            hash_algorithm,
            int64(offset),
            int64(length),
            block_size,
        )
        msg.get_text()  # ext
        msg.get_text()  # alg
        data = msg.get_remainder()
        return data
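
    # Sketch of verifying a download with check-file (assumes `f` is an open
    # SFTPFile on a server that supports the extension -- see the IOError
    # caveat above; `local_bytes` stands for the locally-read contents):
    #
    #   import hashlib
    #   remote_digest = f.check("sha1")  # whole-file SHA-1
    #   local_digest = hashlib.sha1(local_bytes).digest()
    #   assert remote_digest == local_digest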

    def set_pipelined(self, pipelined=True):
        """
        Turn on/off the pipelining of write operations to this file. When
        pipelining is on, paramiko won't wait for the server response after
        each write operation. Instead, they're collected as they come in. At
        the first non-write operation (including `.close`), all remaining
        server responses are collected. This means that if there was an error
        with one of your later writes, an exception might be thrown from within
        `.close` instead of `.write`.

        By default, files are not pipelined.

        :param bool pipelined:
            ``True`` if pipelining should be turned on for this file; ``False``
            otherwise

        .. versionadded:: 1.5
        """
        self.pipelined = pipelined
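
    # Pipelined upload sketch (assumes `sftp` is an open SFTPClient and
    # `blocks` stands for your data chunks; note that a write error may
    # surface from close() instead of write()):
    #
    #   with sftp.open("upload.bin", "wb") as f:
    #       f.set_pipelined(True)
    #       for block in blocks:
    #           f.write(block)
    #   # leaving the with-block closes the file and collects any errors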

    def prefetch(self, file_size=None, max_concurrent_requests=None):
        """
        Pre-fetch the remaining contents of this file in anticipation of future
        `.read` calls. If reading the entire file, pre-fetching can
        dramatically improve the download speed by avoiding roundtrip latency.
        The file's contents are incrementally buffered in a background thread.

        The prefetched data is stored in a buffer until read via the `.read`
        method. Once data has been read, it's removed from the buffer. The
        data may be read in a random order (using `.seek`); chunks of the
        buffer that haven't been read will continue to be buffered.

        :param int file_size:
            When this is ``None`` (the default), this method calls `stat` to
            determine the remote file size. In some situations, doing so can
            cause exceptions or hangs (see `#562
            <https://github.com/paramiko/paramiko/pull/562>`_); as a
            workaround, one may call `stat` explicitly and pass its value in
            via this parameter.
        :param int max_concurrent_requests:
            The maximum number of concurrent read requests to prefetch. See
            `.SFTPClient.get` (its ``max_concurrent_prefetch_requests`` param)
            for details.

        .. versionadded:: 1.5.1
        .. versionchanged:: 1.16.0
            The ``file_size`` parameter was added (with no default value).
        .. versionchanged:: 1.16.1
            The ``file_size`` parameter was made optional for backwards
            compatibility.
        .. versionchanged:: 3.3
            Added ``max_concurrent_requests``.
        """
        if file_size is None:
            file_size = self.stat().st_size

        # queue up async reads for the rest of the file
        chunks = []
        n = self._realpos
        while n < file_size:
            chunk = min(self.MAX_REQUEST_SIZE, file_size - n)
            chunks.append((n, chunk))
            n += chunk
        if len(chunks) > 0:
            self._start_prefetch(chunks, max_concurrent_requests)
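
    # Prefetch sketch for a fast sequential download (assumes `sftp` is an
    # open SFTPClient; see the file_size caveat above):
    #
    #   with sftp.open("big.dat", "rb") as f:
    #       size = f.stat().st_size
    #       f.prefetch(file_size=size, max_concurrent_requests=64)
    #       data = f.read()  # served from the prefetch buffers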

    def readv(self, chunks, max_concurrent_prefetch_requests=None):
        """
        Read a set of blocks from the file by (offset, length). This is more
        efficient than doing a series of `.seek` and `.read` calls, since the
        prefetch machinery is used to retrieve all the requested blocks at
        once.

        :param chunks:
            a list of ``(offset, length)`` tuples indicating which sections of
            the file to read
        :param int max_concurrent_prefetch_requests:
            The maximum number of concurrent read requests to prefetch. See
            `.SFTPClient.get` (its ``max_concurrent_prefetch_requests`` param)
            for details.
        :return: a list of blocks read, in the same order as in ``chunks``

        .. versionadded:: 1.5.4
        .. versionchanged:: 3.3
            Added ``max_concurrent_prefetch_requests``.
        """
        self.sftp._log(
            DEBUG, "readv({}, {!r})".format(hexlify(self.handle), chunks)
        )

        read_chunks = []
        for offset, size in chunks:
            # don't fetch data that's already in the prefetch buffer
            if self._data_in_prefetch_buffers(
                offset
            ) or self._data_in_prefetch_requests(offset, size):
                continue

            # break up anything larger than the max read size
            while size > 0:
                chunk_size = min(size, self.MAX_REQUEST_SIZE)
                read_chunks.append((offset, chunk_size))
                offset += chunk_size
                size -= chunk_size

        self._start_prefetch(read_chunks, max_concurrent_prefetch_requests)
        # now we can just devolve to a bunch of read()s :)
        for x in chunks:
            self.seek(x[0])
            yield self.read(x[1])
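
    # readv sketch: fetch two scattered blocks in one round of requests
    # (assumes `f` is an open SFTPFile; offsets/lengths illustrative, and
    # `process` stands for your own handler):
    #
    #   for block in f.readv([(0, 1024), (1 << 20, 4096)]):
    #       process(block)
    #
    # Note that readv is a generator, so the blocks arrive lazily as you
    # iterate.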

    # ...internals...

    def _get_size(self):
        try:
            return self.stat().st_size
        except Exception:
            return 0

    def _start_prefetch(self, chunks, max_concurrent_requests=None):
        self._prefetching = True
        self._prefetch_done = False

        t = threading.Thread(
            target=self._prefetch_thread,
            args=(chunks, max_concurrent_requests),
        )
        t.daemon = True
        t.start()

    def _prefetch_thread(self, chunks, max_concurrent_requests):
        # do these read requests in a temporary thread because there may be
        # a lot of them, so it may block.
        for offset, length in chunks:
            # Limit the number of concurrent requests in a busy-loop
            if max_concurrent_requests is not None:
                while True:
                    with self._prefetch_lock:
                        pf_len = len(self._prefetch_extents)
                        if pf_len < max_concurrent_requests:
                            break
                    time.sleep(io_sleep)
            num = self.sftp._async_request(
                self, CMD_READ, self.handle, int64(offset), int(length)
            )
            with self._prefetch_lock:
                self._prefetch_extents[num] = (offset, length)

    def _async_response(self, t, msg, num):
        if t == CMD_STATUS:
            # save exception and re-raise it on next file operation
            try:
                self.sftp._convert_status(msg)
            except Exception as e:
                self._saved_exception = e
            return
        if t != CMD_DATA:
            raise SFTPError("Expected data")
        data = msg.get_string()
        while True:
            with self._prefetch_lock:
                # spin if in race with _prefetch_thread
                if num in self._prefetch_extents:
                    offset, length = self._prefetch_extents[num]
                    self._prefetch_data[offset] = data
                    del self._prefetch_extents[num]
                    if len(self._prefetch_extents) == 0:
                        self._prefetch_done = True
                    break

    def _check_exception(self):
        """if there's a saved exception, raise & clear it"""
        if self._saved_exception is not None:
            x = self._saved_exception
            self._saved_exception = None
            raise x