Coverage for /pythoncovmergedfiles/medio/medio/src/paramiko/paramiko/buffered_pipe.py: 20%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

89 statements  

1# Copyright (C) 2006-2007 Robey Pointer <robeypointer@gmail.com> 

2# 

3# This file is part of paramiko. 

4# 

5# Paramiko is free software; you can redistribute it and/or modify it under the 

6# terms of the GNU Lesser General Public License as published by the Free 

7# Software Foundation; either version 2.1 of the License, or (at your option) 

8# any later version. 

9# 

10# Paramiko is distributed in the hope that it will be useful, but WITHOUT ANY 

11# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR 

12# A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more 

13# details. 

14# 

15# You should have received a copy of the GNU Lesser General Public License 

16# along with Paramiko; if not, write to the Free Software Foundation, Inc., 

17# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. 

18 

19""" 

20Attempt to generalize the "feeder" part of a `.Channel`: an object which can be 

21read from and closed, but is reading from a buffer fed by another thread. The 

22read operations are blocking and can have a timeout set. 

23""" 

24 

25import array 

26import threading 

27import time 

28from paramiko.util import b 

29 

30 

class PipeTimeout(IOError):
    """
    Raised when a read from a `.BufferedPipe` reaches its timeout.
    """

37 

38 

class BufferedPipe:
    """
    A buffer that obeys normal read (with timeout) & close semantics for a
    file or socket, but is fed data from another thread. This is used by
    `.Channel`.

    Every public method takes the internal lock before touching the buffer,
    the closed flag or the optional event.
    """

    def __init__(self):
        self._lock = threading.Lock()
        # The condition shares ``_lock``, so a reader waiting on it releases
        # the very lock that guards the buffer.
        self._cv = threading.Condition(self._lock)
        # Optional ``threading.Event`` mirroring "readable or closed".
        self._event = None
        # Unsigned-byte array holding data that has been fed but not yet read.
        self._buffer = array.array("B")
        self._closed = False

    def _buffer_frombytes(self, data):
        """Append the bytes in ``data`` to the internal buffer."""
        self._buffer.frombytes(data)

    def _buffer_tobytes(self, limit=None):
        """
        Return the first ``limit`` buffered bytes (all of them when ``limit``
        is ``None``) as ``bytes``, without removing them from the buffer.
        """
        return self._buffer[:limit].tobytes()

    def set_event(self, event):
        """
        Set an event on this buffer. When data is ready to be read (or the
        buffer has been closed), the event will be set. When no data is
        ready, the event will be cleared.

        :param threading.Event event: the event to set/clear
        """
        with self._lock:
            self._event = event
            # Make sure the event starts in `set` state if we appear to already
            # be closed; otherwise, if we start in `clear` state & are closed,
            # nothing will ever call `.feed` and the event (& OS pipe, if we're
            # wrapping one - see `Channel.fileno`) will permanently stay in
            # `clear`, causing deadlock if e.g. `select`ed upon.
            if self._closed or len(self._buffer) > 0:
                event.set()
            else:
                event.clear()

    def feed(self, data):
        """
        Feed new data into this pipe. This method is assumed to be called
        from a separate thread, so synchronization is done.

        :param data: the data to add, as a ``str`` or ``bytes``
        """
        # ``b`` is imported from ``paramiko.util``; presumably it coerces
        # ``str`` to ``bytes`` (confirm there). It runs before any state is
        # touched so that a failure in it cannot leave the event set with no
        # data behind it.
        payload = b(data)
        with self._lock:
            if self._event is not None:
                self._event.set()
            self._buffer_frombytes(payload)
            self._cv.notify_all()

    def read_ready(self):
        """
        Returns true if data is buffered and ready to be read from this
        feeder. A ``False`` result does not mean that the feeder has closed;
        it means you may need to wait before more data arrives.

        :return:
            ``True`` if a `read` call would immediately return at least one
            byte; ``False`` otherwise.
        """
        with self._lock:
            return len(self._buffer) > 0

    def read(self, nbytes, timeout=None):
        """
        Read data from the pipe. The return value is a ``bytes`` object
        holding the data received. The maximum amount of data to be received
        at once is specified by ``nbytes``. For a positive ``nbytes``, an
        empty result means the pipe has been closed.

        The optional ``timeout`` argument can be a nonnegative float expressing
        seconds, or ``None`` for no timeout. If a float is given, a
        `.PipeTimeout` will be raised if the timeout period value has elapsed
        before any data arrives.

        :param int nbytes: maximum number of bytes to read
        :param float timeout:
            maximum seconds to wait (or ``None``, the default, to wait forever)
        :return: the read data, as ``bytes``

        :raises:
            `.PipeTimeout` -- if a timeout was specified and no data was ready
            before that timeout
        """
        with self._lock:
            if len(self._buffer) == 0:
                if self._closed:
                    return bytes()
                # should we block?
                if timeout == 0.0:
                    raise PipeTimeout()
                # Track an absolute deadline on the monotonic clock so that
                # wall-clock adjustments cannot shorten or stretch the wait.
                deadline = None
                if timeout is not None:
                    deadline = time.monotonic() + timeout
                # loop here in case we get woken up but a different thread has
                # grabbed everything in the buffer.
                while len(self._buffer) == 0 and not self._closed:
                    remaining = None
                    if deadline is not None:
                        remaining = deadline - time.monotonic()
                        if remaining <= 0.0:
                            raise PipeTimeout()
                    # The buffer/closed state is re-checked by the loop
                    # condition before the deadline is, so data (or a close)
                    # that lands right at the deadline is still delivered.
                    self._cv.wait(remaining)

            # something's in the buffer (or we are closed) and we have the
            # lock!
            if len(self._buffer) <= nbytes:
                out = self._buffer_tobytes()
                del self._buffer[:]
                # Buffer drained: nothing left to read, so clear the event
                # unless the pipe is closed (a closed pipe stays "ready").
                if self._event is not None and not self._closed:
                    self._event.clear()
            else:
                out = self._buffer_tobytes(nbytes)
                del self._buffer[:nbytes]
            return out

    def empty(self):
        """
        Clear out the buffer and return all data that was in it.

        :return:
            any data that was in the buffer prior to clearing it out, as
            ``bytes``
        """
        with self._lock:
            out = self._buffer_tobytes()
            del self._buffer[:]
            if self._event is not None and not self._closed:
                self._event.clear()
            return out

    def close(self):
        """
        Close this pipe object. Future calls to `read` after the buffer
        has been emptied will return immediately with an empty ``bytes``.
        """
        with self._lock:
            self._closed = True
            # Wake every blocked reader so it can observe the closed flag.
            self._cv.notify_all()
            if self._event is not None:
                self._event.set()

    def __len__(self):
        """
        Return the number of bytes buffered.

        :return: number (`int`) of bytes buffered
        """
        with self._lock:
            return len(self._buffer)