Coverage for /pythoncovmergedfiles/medio/medio/src/paramiko/paramiko/file.py: 16%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
# Copyright (C) 2003-2007 Robey Pointer <robeypointer@gmail.com>
#
# This file is part of paramiko.
#
# Paramiko is free software; you can redistribute it and/or modify it under the
# terms of the GNU Lesser General Public License as published by the Free
# Software Foundation; either version 2.1 of the License, or (at your option)
# any later version.
#
# Paramiko is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
# A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
# details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with Paramiko; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
18from io import BytesIO
20from paramiko.common import (
21 linefeed_byte_value,
22 crlf,
23 cr_byte,
24 linefeed_byte,
25 cr_byte_value,
26)
28from paramiko.util import ClosingContextManager, u
class BufferedFile(ClosingContextManager):
    """
    Reusable base class to implement Python-style file buffering around a
    simpler stream.

    Subclasses provide the raw I/O by overriding `_read`, `_write` and (for
    append mode) `_get_size`, and call `_set_mode` once to configure the
    access mode and buffering flags.
    """

    _DEFAULT_BUFSIZE = 8192

    SEEK_SET = 0
    SEEK_CUR = 1
    SEEK_END = 2

    # Bit flags stored in ``self._flags``; set by `_set_mode`.
    FLAG_READ = 0x1
    FLAG_WRITE = 0x2
    FLAG_APPEND = 0x4
    FLAG_BINARY = 0x10
    FLAG_BUFFERED = 0x20
    FLAG_LINE_BUFFERED = 0x40
    FLAG_UNIVERSAL_NEWLINE = 0x80

    def __init__(self):
        """
        Create an unconfigured file object: no mode flags set, empty read
        and write buffers, position zero.
        """
        self.newlines = None
        self._flags = 0
        self._bufsize = self._DEFAULT_BUFSIZE
        self._wbuffer = BytesIO()
        self._rbuffer = b""
        # True when the previous readline() ended on a bare '\r' that may
        # still be followed by a '\n' in data not yet read.
        self._at_trailing_cr = False
        self._closed = False
        # pos - position within the file, according to the user
        # realpos - position according the OS
        # (these may be different because we buffer for line reading)
        self._pos = self._realpos = 0
        # size only matters for seekable files
        self._size = 0

    def __del__(self):
        self.close()

    def __iter__(self):
        """
        Returns an iterator that can be used to iterate over the lines in this
        file. This iterator happens to return the file itself, since a file is
        its own iterator.

        :raises: ``ValueError`` -- if the file is closed.
        """
        if self._closed:
            raise ValueError("I/O operation on closed file")
        return self

    def close(self):
        """
        Close the file. Future read and write operations will fail.

        Any buffered write data is flushed first. The file is marked closed
        even if that flush raises, so a failed flush cannot leave the object
        looking open; the exception still propagates to the caller.
        """
        try:
            self.flush()
        finally:
            self._closed = True

    def flush(self):
        """
        Write out any data in the write buffer. This may do nothing if write
        buffering is not turned on.
        """
        self._write_all(self._wbuffer.getvalue())
        # Only reached when everything was written; start a fresh buffer.
        self._wbuffer = BytesIO()

    def __next__(self):
        """
        Returns the next line from the input, or raises ``StopIteration``
        when EOF is hit. Unlike python file objects, it's okay to mix
        calls to `.next` and `.readline`.

        :raises: ``StopIteration`` -- when the end of the file is reached.

        :returns:
            a line (`str`, or `bytes` if the file was opened in binary mode)
            read from the file.
        """
        line = self.readline()
        if not line:
            raise StopIteration
        return line

    def readable(self):
        """
        Check if the file can be read from.

        :returns:
            `True` if the file can be read from. If `False`, `read` will raise
            an exception.
        """
        return (self._flags & self.FLAG_READ) == self.FLAG_READ

    def writable(self):
        """
        Check if the file can be written to.

        :returns:
            `True` if the file can be written to. If `False`, `write` will
            raise an exception.
        """
        return (self._flags & self.FLAG_WRITE) == self.FLAG_WRITE

    def seekable(self):
        """
        Check if the file supports random access.

        :returns:
            `True` if the file supports random access. If `False`, `seek` will
            raise an exception.
        """
        return False

    def readinto(self, buff):
        """
        Read up to ``len(buff)`` bytes into ``bytearray`` *buff* and return the
        number of bytes read.

        :returns:
            The number of bytes read.
        """
        data = self.read(len(buff))
        buff[: len(data)] = data
        return len(data)

    def read(self, size=None):
        """
        Read at most ``size`` bytes from the file (less if we hit the end of
        the file first). If the ``size`` argument is negative or omitted,
        read all the remaining data in the file.

        .. note::
            ``'b'`` mode flag is ignored (``self.FLAG_BINARY`` in
            ``self._flags``), because SSH treats all files as binary, since we
            have no idea what encoding the file is in, or even if the file is
            text data.

        :param int size: maximum number of bytes to read
        :returns:
            data read from the file (as bytes), or an empty string if EOF was
            encountered immediately
        :raises: ``IOError`` -- if the file is closed or not open for reading.
        """
        if self._closed:
            raise IOError("File is closed")
        if not (self._flags & self.FLAG_READ):
            raise IOError("File is not open for reading")
        if (size is None) or (size < 0):
            # go for broke: drain the read buffer, then the stream until EOF.
            result = bytearray(self._rbuffer)
            self._rbuffer = b""
            self._pos += len(result)
            while True:
                try:
                    new_data = self._read(self._DEFAULT_BUFSIZE)
                except EOFError:
                    new_data = None
                if not new_data:
                    break
                result.extend(new_data)
                self._realpos += len(new_data)
                self._pos += len(new_data)
            return bytes(result)
        if size <= len(self._rbuffer):
            # The request can be served entirely from buffered data.
            result = self._rbuffer[:size]
            self._rbuffer = self._rbuffer[size:]
            self._pos += len(result)
            return result
        while len(self._rbuffer) < size:
            read_size = size - len(self._rbuffer)
            if self._flags & self.FLAG_BUFFERED:
                # When buffering, read ahead by at least one buffer's worth.
                read_size = max(self._bufsize, read_size)
            try:
                new_data = self._read(read_size)
            except EOFError:
                new_data = None
            if not new_data:
                break
            self._rbuffer += new_data
            self._realpos += len(new_data)
        # Hand back up to ``size`` bytes; any read-ahead stays buffered.
        result = self._rbuffer[:size]
        self._rbuffer = self._rbuffer[size:]
        self._pos += len(result)
        return result

    def readline(self, size=None):
        """
        Read one entire line from the file. A trailing newline character is
        kept in the string (but may be absent when a file ends with an
        incomplete line). If the size argument is present and non-negative, it
        is a maximum byte count (including the trailing newline) and an
        incomplete line may be returned. An empty string is returned only when
        EOF is encountered immediately.

        .. note::
            Unlike stdio's ``fgets``, the returned string contains null
            characters (``'\\0'``) if they occurred in the input.

        :param int size: maximum length of returned string.
        :returns:
            next line of the file, or an empty string if the end of the
            file has been reached.

            If the file was opened in binary (``'b'``) mode: bytes are returned
            Else: the encoding of the file is assumed to be UTF-8 and character
            strings (`str`) are returned
        :raises: ``IOError`` -- if the file is closed or not open for reading.
        """
        # it's almost silly how complex this function is.
        if self._closed:
            raise IOError("File is closed")
        if not (self._flags & self.FLAG_READ):
            raise IOError("File not open for reading")
        line = self._rbuffer
        truncated = False
        while True:
            if (
                self._at_trailing_cr
                and self._flags & self.FLAG_UNIVERSAL_NEWLINE
                and len(line) > 0
            ):
                # edge case: the newline may be '\r\n' and we may have read
                # only the first '\r' last time.
                if line[0] == linefeed_byte_value:
                    line = line[1:]
                    self._record_newline(crlf)
                else:
                    self._record_newline(cr_byte)
                self._at_trailing_cr = False
            # check size before looking for a linefeed, in case we already have
            # enough.
            if (size is not None) and (size >= 0):
                if len(line) >= size:
                    # truncate line; the overflow goes back to the buffer
                    self._rbuffer = line[size:]
                    line = line[:size]
                    truncated = True
                    break
                n = size - len(line)
            else:
                n = self._bufsize
            if linefeed_byte in line or (
                self._flags & self.FLAG_UNIVERSAL_NEWLINE and cr_byte in line
            ):
                break
            try:
                new_data = self._read(n)
            except EOFError:
                new_data = None
            if (new_data is None) or (len(new_data) == 0):
                # EOF: return whatever partial line has been accumulated.
                self._rbuffer = b""
                self._pos += len(line)
                return line if self._flags & self.FLAG_BINARY else u(line)
            line += new_data
            self._realpos += len(new_data)
        # find the newline
        pos = line.find(linefeed_byte)
        if self._flags & self.FLAG_UNIVERSAL_NEWLINE:
            # a '\r' that comes before any '\n' terminates the line instead
            rpos = line.find(cr_byte)
            if (rpos >= 0) and (rpos < pos or pos < 0):
                pos = rpos
        if pos == -1:
            # we couldn't find a newline in the truncated string, return it
            self._pos += len(line)
            return line if self._flags & self.FLAG_BINARY else u(line)
        xpos = pos + 1
        if (
            line[pos] == cr_byte_value
            and xpos < len(line)
            and line[xpos] == linefeed_byte_value
        ):
            # '\r\n' pair: consume both bytes as a single terminator
            xpos += 1
        # if the string was truncated, _rbuffer needs to have the string after
        # the newline character plus the truncated part of the line we stored
        # earlier in _rbuffer
        if truncated:
            self._rbuffer = line[xpos:] + self._rbuffer
        else:
            self._rbuffer = line[xpos:]

        lf = line[pos:xpos]
        # whatever terminator was found, the returned line ends in '\n'
        line = line[:pos] + linefeed_byte
        if (len(self._rbuffer) == 0) and (lf == cr_byte):
            # we could read the line up to a '\r' and there could still be a
            # '\n' following that we read next time. note that and eat it.
            self._at_trailing_cr = True
        else:
            self._record_newline(lf)
        # NOTE(review): _pos advances by the length of the *returned* line,
        # so a '\r\n' terminator counts as one byte here.
        self._pos += len(line)
        return line if self._flags & self.FLAG_BINARY else u(line)

    def readlines(self, sizehint=None):
        """
        Read all remaining lines using `readline` and return them as a list.
        If the optional ``sizehint`` argument is present, instead of reading up
        to EOF, whole lines totalling approximately sizehint bytes (possibly
        after rounding up to an internal buffer size) are read.

        :param int sizehint: desired maximum number of bytes to read.
        :returns: list of lines read from the file.
        """
        lines = []
        byte_count = 0
        while True:
            line = self.readline()
            if not line:
                break
            lines.append(line)
            byte_count += len(line)
            if (sizehint is not None) and (byte_count >= sizehint):
                break
        return lines

    def seek(self, offset, whence=0):
        """
        Set the file's current position, like stdio's ``fseek``. Not all file
        objects support seeking.

        .. note::
            If a file is opened in append mode (``'a'`` or ``'a+'``), any seek
            operations will be undone at the next write (as the file position
            will move back to the end of the file).

        :param int offset:
            position to move to within the file, relative to ``whence``.
        :param int whence:
            type of movement: 0 = absolute; 1 = relative to the current
            position; 2 = relative to the end of the file.

        :raises: ``IOError`` -- if the file doesn't support random access.
        """
        raise IOError("File does not support seeking.")

    def tell(self):
        """
        Return the file's current position. This may not be accurate or
        useful if the underlying file doesn't support random access, or was
        opened in append mode.

        :returns: file position (`number <int>` of bytes).
        """
        return self._pos

    def write(self, data):
        """
        Write data to the file. If write buffering is on (``bufsize`` was
        specified and non-zero), some or all of the data may not actually be
        written yet. (Use `flush` or `close` to force buffered data to be
        written out.)

        :param data: ``str``/``bytes`` data to write
        :raises: ``IOError`` -- if the file is closed or not open for writing.
        """
        if isinstance(data, str):
            # Accept text and encode as utf-8 for compatibility only.
            data = data.encode("utf-8")
        if self._closed:
            raise IOError("File is closed")
        if not (self._flags & self.FLAG_WRITE):
            raise IOError("File not open for writing")
        if not (self._flags & self.FLAG_BUFFERED):
            # Unbuffered: hand the data straight to the stream.
            self._write_all(data)
            return
        self._wbuffer.write(data)
        if self._flags & self.FLAG_LINE_BUFFERED:
            # only scan the new data for linefeed, to avoid wasting time.
            last_newline_pos = data.rfind(linefeed_byte)
            if last_newline_pos >= 0:
                wbuf = self._wbuffer.getvalue()
                # translate the offset within ``data`` to one within ``wbuf``
                last_newline_pos += len(wbuf) - len(data)
                self._write_all(wbuf[: last_newline_pos + 1])
                # keep only the incomplete trailing line buffered
                self._wbuffer = BytesIO()
                self._wbuffer.write(wbuf[last_newline_pos + 1 :])
                return
        # even if we're line buffering, if the buffer has grown past the
        # buffer size, force a flush.
        if self._wbuffer.tell() >= self._bufsize:
            self.flush()

    def writelines(self, sequence):
        """
        Write a sequence of strings to the file. The sequence can be any
        iterable object producing strings, typically a list of strings. (The
        name is intended to match `readlines`; `writelines` does not add line
        separators.)

        :param sequence: an iterable sequence of strings.
        """
        for line in sequence:
            self.write(line)

    def xreadlines(self):
        """
        Return the file itself so it can be iterated line by line. This is a
        deprecated file interface that predates Python iterator support.

        Unlike ``iter(f)``, this does not check whether the file is closed.
        """
        return self

    @property
    def closed(self):
        """`True` once `close` has been called on this file."""
        return self._closed

    # ...overrides...

    def _read(self, size):
        """
        (subclass override)
        Read data from the stream. Return ``None`` or raise ``EOFError`` to
        indicate EOF.
        """
        raise EOFError()

    def _write(self, data):
        """
        (subclass override)
        Write data into the stream.

        `_write_all` uses the return value as the number of bytes consumed.
        """
        raise IOError("write not implemented")

    def _get_size(self):
        """
        (subclass override)
        Return the size of the file. This is called from within `_set_mode`
        if the file is opened in append mode, so the file position can be
        tracked and `seek` and `tell` will work correctly. If the file is
        a stream that can't be randomly accessed, you don't need to override
        this method,
        """
        return 0

    # ...internals...

    def _set_mode(self, mode="r", bufsize=-1):
        """
        Subclasses call this method to initialize the BufferedFile.

        :param str mode:
            characters selecting behavior: ``r`` read, ``w`` write, ``+``
            read and write, ``a`` append (implies write), ``b`` binary,
            ``U`` universal newlines.
        :param int bufsize:
            negative or ``0`` for unbuffered, ``1`` for line buffered, larger
            values for a buffer of that size.
        """
        # set bufsize in any event, because it's used for readline().
        self._bufsize = self._DEFAULT_BUFSIZE
        if bufsize < 0:
            # do no buffering by default, because otherwise writes will get
            # buffered in a way that will probably confuse people.
            bufsize = 0
        if bufsize == 1:
            # apparently, line buffering only affects writes. reads are only
            # buffered if you call readline (directly or indirectly: iterating
            # over a file will indirectly call readline).
            self._flags |= self.FLAG_BUFFERED | self.FLAG_LINE_BUFFERED
        elif bufsize > 1:
            self._bufsize = bufsize
            self._flags |= self.FLAG_BUFFERED
            self._flags &= ~self.FLAG_LINE_BUFFERED
        elif bufsize == 0:
            # unbuffered
            self._flags &= ~(self.FLAG_BUFFERED | self.FLAG_LINE_BUFFERED)

        if ("r" in mode) or ("+" in mode):
            self._flags |= self.FLAG_READ
        if ("w" in mode) or ("+" in mode):
            self._flags |= self.FLAG_WRITE
        if "a" in mode:
            self._flags |= self.FLAG_WRITE | self.FLAG_APPEND
            # appends start at the current end of the file
            self._size = self._get_size()
            self._pos = self._realpos = self._size
        if "b" in mode:
            self._flags |= self.FLAG_BINARY
        if "U" in mode:
            self._flags |= self.FLAG_UNIVERSAL_NEWLINE
            # built-in file objects have this attribute to store which kinds of
            # line terminations they've seen:
            # <http://www.python.org/doc/current/lib/built-in-funcs.html>
            self.newlines = None

    def _write_all(self, raw_data):
        """
        Write all of ``raw_data`` to the stream, calling `_write` repeatedly
        until nothing is left, and keep the position counters up to date.
        """
        # the underlying stream may be something that does partial writes (like
        # a socket).
        data = memoryview(raw_data)
        while len(data) > 0:
            # NOTE(review): a `_write` that returns 0 makes no progress and
            # this loop would never terminate.
            count = self._write(data)
            data = data[count:]
            if self._flags & self.FLAG_APPEND:
                # in append mode every write lands at the end of the file
                self._size += count
                self._pos = self._realpos = self._size
            else:
                self._pos += count
                self._realpos += count
        return None

    def _record_newline(self, newline):
        """
        Remember that line terminator ``newline`` was seen, in the
        ``newlines`` attribute. Does nothing unless universal-newline mode
        is enabled.
        """
        # silliness about tracking what kinds of newlines we've seen.
        # i don't understand why it can be None, a string, or a tuple, instead
        # of just always being a tuple, but we'll emulate that behavior anyway.
        if not (self._flags & self.FLAG_UNIVERSAL_NEWLINE):
            return
        if self.newlines is None:
            # first terminator seen: stored bare
            self.newlines = newline
        elif self.newlines != newline and isinstance(self.newlines, bytes):
            # second distinct terminator: promote to a tuple
            self.newlines = (self.newlines, newline)
        elif newline not in self.newlines:
            self.newlines += (newline,)