Coverage for /pythoncovmergedfiles/medio/medio/src/paramiko/paramiko/buffered_pipe.py: 20%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# Copyright (C) 2006-2007 Robey Pointer <robeypointer@gmail.com>
2#
3# This file is part of paramiko.
4#
5# Paramiko is free software; you can redistribute it and/or modify it under the
6# terms of the GNU Lesser General Public License as published by the Free
7# Software Foundation; either version 2.1 of the License, or (at your option)
8# any later version.
9#
10# Paramiko is distributed in the hope that it will be useful, but WITHOUT ANY
11# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
12# A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
13# details.
14#
15# You should have received a copy of the GNU Lesser General Public License
16# along with Paramiko; if not, write to the Free Software Foundation, Inc.,
17# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
19"""
20Attempt to generalize the "feeder" part of a `.Channel`: an object which can be
21read from and closed, but is reading from a buffer fed by another thread. The
22read operations are blocking and can have a timeout set.
23"""
25import array
26import threading
27import time
28from paramiko.util import b
31class PipeTimeout(IOError):
32 """
33 Indicates that a timeout was reached on a read from a `.BufferedPipe`.
34 """
36 pass
39class BufferedPipe:
40 """
41 A buffer that obeys normal read (with timeout) & close semantics for a
42 file or socket, but is fed data from another thread. This is used by
43 `.Channel`.
44 """
46 def __init__(self):
47 self._lock = threading.Lock()
48 self._cv = threading.Condition(self._lock)
49 self._event = None
50 self._buffer = array.array("B")
51 self._closed = False
53 def _buffer_frombytes(self, data):
54 self._buffer.frombytes(data)
56 def _buffer_tobytes(self, limit=None):
57 return self._buffer[:limit].tobytes()
59 def set_event(self, event):
60 """
61 Set an event on this buffer. When data is ready to be read (or the
62 buffer has been closed), the event will be set. When no data is
63 ready, the event will be cleared.
65 :param threading.Event event: the event to set/clear
66 """
67 self._lock.acquire()
68 try:
69 self._event = event
70 # Make sure the event starts in `set` state if we appear to already
71 # be closed; otherwise, if we start in `clear` state & are closed,
72 # nothing will ever call `.feed` and the event (& OS pipe, if we're
73 # wrapping one - see `Channel.fileno`) will permanently stay in
74 # `clear`, causing deadlock if e.g. `select`ed upon.
75 if self._closed or len(self._buffer) > 0:
76 event.set()
77 else:
78 event.clear()
79 finally:
80 self._lock.release()
82 def feed(self, data):
83 """
84 Feed new data into this pipe. This method is assumed to be called
85 from a separate thread, so synchronization is done.
87 :param data: the data to add, as a ``str`` or ``bytes``
88 """
89 self._lock.acquire()
90 try:
91 if self._event is not None:
92 self._event.set()
93 self._buffer_frombytes(b(data))
94 self._cv.notify_all()
95 finally:
96 self._lock.release()
98 def read_ready(self):
99 """
100 Returns true if data is buffered and ready to be read from this
101 feeder. A ``False`` result does not mean that the feeder has closed;
102 it means you may need to wait before more data arrives.
104 :return:
105 ``True`` if a `read` call would immediately return at least one
106 byte; ``False`` otherwise.
107 """
108 self._lock.acquire()
109 try:
110 if len(self._buffer) == 0:
111 return False
112 return True
113 finally:
114 self._lock.release()
116 def read(self, nbytes, timeout=None):
117 """
118 Read data from the pipe. The return value is a string representing
119 the data received. The maximum amount of data to be received at once
120 is specified by ``nbytes``. If a string of length zero is returned,
121 the pipe has been closed.
123 The optional ``timeout`` argument can be a nonnegative float expressing
124 seconds, or ``None`` for no timeout. If a float is given, a
125 `.PipeTimeout` will be raised if the timeout period value has elapsed
126 before any data arrives.
128 :param int nbytes: maximum number of bytes to read
129 :param float timeout:
130 maximum seconds to wait (or ``None``, the default, to wait forever)
131 :return: the read data, as a ``str`` or ``bytes``
133 :raises:
134 `.PipeTimeout` -- if a timeout was specified and no data was ready
135 before that timeout
136 """
137 out = bytes()
138 self._lock.acquire()
139 try:
140 if len(self._buffer) == 0:
141 if self._closed:
142 return out
143 # should we block?
144 if timeout == 0.0:
145 raise PipeTimeout()
146 # loop here in case we get woken up but a different thread has
147 # grabbed everything in the buffer.
148 while (len(self._buffer) == 0) and not self._closed:
149 then = time.time()
150 self._cv.wait(timeout)
151 if timeout is not None:
152 timeout -= time.time() - then
153 if timeout <= 0.0:
154 raise PipeTimeout()
156 # something's in the buffer and we have the lock!
157 if len(self._buffer) <= nbytes:
158 out = self._buffer_tobytes()
159 del self._buffer[:]
160 if (self._event is not None) and not self._closed:
161 self._event.clear()
162 else:
163 out = self._buffer_tobytes(nbytes)
164 del self._buffer[:nbytes]
165 finally:
166 self._lock.release()
168 return out
170 def empty(self):
171 """
172 Clear out the buffer and return all data that was in it.
174 :return:
175 any data that was in the buffer prior to clearing it out, as a
176 `str`
177 """
178 self._lock.acquire()
179 try:
180 out = self._buffer_tobytes()
181 del self._buffer[:]
182 if (self._event is not None) and not self._closed:
183 self._event.clear()
184 return out
185 finally:
186 self._lock.release()
188 def close(self):
189 """
190 Close this pipe object. Future calls to `read` after the buffer
191 has been emptied will return immediately with an empty string.
192 """
193 self._lock.acquire()
194 try:
195 self._closed = True
196 self._cv.notify_all()
197 if self._event is not None:
198 self._event.set()
199 finally:
200 self._lock.release()
202 def __len__(self):
203 """
204 Return the number of bytes buffered.
206 :return: number (`int`) of bytes buffered
207 """
208 self._lock.acquire()
209 try:
210 return len(self._buffer)
211 finally:
212 self._lock.release()