Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/pexpect/expect.py: 8%
232 statements
« prev ^ index » next coverage.py v7.4.4, created at 2024-04-20 06:09 +0000
« prev ^ index » next coverage.py v7.4.4, created at 2024-04-20 06:09 +0000
1import time
3from .exceptions import EOF, TIMEOUT
5class Expecter(object):
6 def __init__(self, spawn, searcher, searchwindowsize=-1):
7 self.spawn = spawn
8 self.searcher = searcher
9 # A value of -1 means to use the figure from spawn, which should
10 # be None or a positive number.
11 if searchwindowsize == -1:
12 searchwindowsize = spawn.searchwindowsize
13 self.searchwindowsize = searchwindowsize
14 self.lookback = None
15 if hasattr(searcher, 'longest_string'):
16 self.lookback = searcher.longest_string
18 def do_search(self, window, freshlen):
19 spawn = self.spawn
20 searcher = self.searcher
21 if freshlen > len(window):
22 freshlen = len(window)
23 index = searcher.search(window, freshlen, self.searchwindowsize)
24 if index >= 0:
25 spawn._buffer = spawn.buffer_type()
26 spawn._buffer.write(window[searcher.end:])
27 spawn.before = spawn._before.getvalue()[
28 0:-(len(window) - searcher.start)]
29 spawn._before = spawn.buffer_type()
30 spawn._before.write(window[searcher.end:])
31 spawn.after = window[searcher.start:searcher.end]
32 spawn.match = searcher.match
33 spawn.match_index = index
34 # Found a match
35 return index
36 elif self.searchwindowsize or self.lookback:
37 maintain = self.searchwindowsize or self.lookback
38 if spawn._buffer.tell() > maintain:
39 spawn._buffer = spawn.buffer_type()
40 spawn._buffer.write(window[-maintain:])
42 def existing_data(self):
43 # First call from a new call to expect_loop or expect_async.
44 # self.searchwindowsize may have changed.
45 # Treat all data as fresh.
46 spawn = self.spawn
47 before_len = spawn._before.tell()
48 buf_len = spawn._buffer.tell()
49 freshlen = before_len
50 if before_len > buf_len:
51 if not self.searchwindowsize:
52 spawn._buffer = spawn.buffer_type()
53 window = spawn._before.getvalue()
54 spawn._buffer.write(window)
55 elif buf_len < self.searchwindowsize:
56 spawn._buffer = spawn.buffer_type()
57 spawn._before.seek(
58 max(0, before_len - self.searchwindowsize))
59 window = spawn._before.read()
60 spawn._buffer.write(window)
61 else:
62 spawn._buffer.seek(max(0, buf_len - self.searchwindowsize))
63 window = spawn._buffer.read()
64 else:
65 if self.searchwindowsize:
66 spawn._buffer.seek(max(0, buf_len - self.searchwindowsize))
67 window = spawn._buffer.read()
68 else:
69 window = spawn._buffer.getvalue()
70 return self.do_search(window, freshlen)
72 def new_data(self, data):
73 # A subsequent call, after a call to existing_data.
74 spawn = self.spawn
75 freshlen = len(data)
76 spawn._before.write(data)
77 if not self.searchwindowsize:
78 if self.lookback:
79 # search lookback + new data.
80 old_len = spawn._buffer.tell()
81 spawn._buffer.write(data)
82 spawn._buffer.seek(max(0, old_len - self.lookback))
83 window = spawn._buffer.read()
84 else:
85 # copy the whole buffer (really slow for large datasets).
86 spawn._buffer.write(data)
87 window = spawn.buffer
88 else:
89 if len(data) >= self.searchwindowsize or not spawn._buffer.tell():
90 window = data[-self.searchwindowsize:]
91 spawn._buffer = spawn.buffer_type()
92 spawn._buffer.write(window[-self.searchwindowsize:])
93 else:
94 spawn._buffer.write(data)
95 new_len = spawn._buffer.tell()
96 spawn._buffer.seek(max(0, new_len - self.searchwindowsize))
97 window = spawn._buffer.read()
98 return self.do_search(window, freshlen)
100 def eof(self, err=None):
101 spawn = self.spawn
103 spawn.before = spawn._before.getvalue()
104 spawn._buffer = spawn.buffer_type()
105 spawn._before = spawn.buffer_type()
106 spawn.after = EOF
107 index = self.searcher.eof_index
108 if index >= 0:
109 spawn.match = EOF
110 spawn.match_index = index
111 return index
112 else:
113 spawn.match = None
114 spawn.match_index = None
115 msg = str(spawn)
116 msg += '\nsearcher: %s' % self.searcher
117 if err is not None:
118 msg = str(err) + '\n' + msg
120 exc = EOF(msg)
121 exc.__cause__ = None # in Python 3.x we can use "raise exc from None"
122 raise exc
124 def timeout(self, err=None):
125 spawn = self.spawn
127 spawn.before = spawn._before.getvalue()
128 spawn.after = TIMEOUT
129 index = self.searcher.timeout_index
130 if index >= 0:
131 spawn.match = TIMEOUT
132 spawn.match_index = index
133 return index
134 else:
135 spawn.match = None
136 spawn.match_index = None
137 msg = str(spawn)
138 msg += '\nsearcher: %s' % self.searcher
139 if err is not None:
140 msg = str(err) + '\n' + msg
142 exc = TIMEOUT(msg)
143 exc.__cause__ = None # in Python 3.x we can use "raise exc from None"
144 raise exc
146 def errored(self):
147 spawn = self.spawn
148 spawn.before = spawn._before.getvalue()
149 spawn.after = None
150 spawn.match = None
151 spawn.match_index = None
153 def expect_loop(self, timeout=-1):
154 """Blocking expect"""
155 spawn = self.spawn
157 if timeout is not None:
158 end_time = time.time() + timeout
160 try:
161 idx = self.existing_data()
162 if idx is not None:
163 return idx
164 while True:
165 # No match at this point
166 if (timeout is not None) and (timeout < 0):
167 return self.timeout()
168 # Still have time left, so read more data
169 incoming = spawn.read_nonblocking(spawn.maxread, timeout)
170 if self.spawn.delayafterread is not None:
171 time.sleep(self.spawn.delayafterread)
172 idx = self.new_data(incoming)
173 # Keep reading until exception or return.
174 if idx is not None:
175 return idx
176 if timeout is not None:
177 timeout = end_time - time.time()
178 except EOF as e:
179 return self.eof(e)
180 except TIMEOUT as e:
181 return self.timeout(e)
182 except:
183 self.errored()
184 raise
187class searcher_string(object):
188 '''This is a plain string search helper for the spawn.expect_any() method.
189 This helper class is for speed. For more powerful regex patterns
190 see the helper class, searcher_re.
192 Attributes:
194 eof_index - index of EOF, or -1
195 timeout_index - index of TIMEOUT, or -1
197 After a successful match by the search() method the following attributes
198 are available:
200 start - index into the buffer, first byte of match
201 end - index into the buffer, first byte after match
202 match - the matching string itself
204 '''
206 def __init__(self, strings):
207 '''This creates an instance of searcher_string. This argument 'strings'
208 may be a list; a sequence of strings; or the EOF or TIMEOUT types. '''
210 self.eof_index = -1
211 self.timeout_index = -1
212 self._strings = []
213 self.longest_string = 0
214 for n, s in enumerate(strings):
215 if s is EOF:
216 self.eof_index = n
217 continue
218 if s is TIMEOUT:
219 self.timeout_index = n
220 continue
221 self._strings.append((n, s))
222 if len(s) > self.longest_string:
223 self.longest_string = len(s)
225 def __str__(self):
226 '''This returns a human-readable string that represents the state of
227 the object.'''
229 ss = [(ns[0], ' %d: %r' % ns) for ns in self._strings]
230 ss.append((-1, 'searcher_string:'))
231 if self.eof_index >= 0:
232 ss.append((self.eof_index, ' %d: EOF' % self.eof_index))
233 if self.timeout_index >= 0:
234 ss.append((self.timeout_index,
235 ' %d: TIMEOUT' % self.timeout_index))
236 ss.sort()
237 ss = list(zip(*ss))[1]
238 return '\n'.join(ss)
240 def search(self, buffer, freshlen, searchwindowsize=None):
241 '''This searches 'buffer' for the first occurrence of one of the search
242 strings. 'freshlen' must indicate the number of bytes at the end of
243 'buffer' which have not been searched before. It helps to avoid
244 searching the same, possibly big, buffer over and over again.
246 See class spawn for the 'searchwindowsize' argument.
248 If there is a match this returns the index of that string, and sets
249 'start', 'end' and 'match'. Otherwise, this returns -1. '''
251 first_match = None
253 # 'freshlen' helps a lot here. Further optimizations could
254 # possibly include:
255 #
256 # using something like the Boyer-Moore Fast String Searching
257 # Algorithm; pre-compiling the search through a list of
258 # strings into something that can scan the input once to
259 # search for all N strings; realize that if we search for
260 # ['bar', 'baz'] and the input is '...foo' we need not bother
261 # rescanning until we've read three more bytes.
262 #
263 # Sadly, I don't know enough about this interesting topic. /grahn
265 for index, s in self._strings:
266 if searchwindowsize is None:
267 # the match, if any, can only be in the fresh data,
268 # or at the very end of the old data
269 offset = -(freshlen + len(s))
270 else:
271 # better obey searchwindowsize
272 offset = -searchwindowsize
273 n = buffer.find(s, offset)
274 if n >= 0 and (first_match is None or n < first_match):
275 first_match = n
276 best_index, best_match = index, s
277 if first_match is None:
278 return -1
279 self.match = best_match
280 self.start = first_match
281 self.end = self.start + len(self.match)
282 return best_index
285class searcher_re(object):
286 '''This is regular expression string search helper for the
287 spawn.expect_any() method. This helper class is for powerful
288 pattern matching. For speed, see the helper class, searcher_string.
290 Attributes:
292 eof_index - index of EOF, or -1
293 timeout_index - index of TIMEOUT, or -1
295 After a successful match by the search() method the following attributes
296 are available:
298 start - index into the buffer, first byte of match
299 end - index into the buffer, first byte after match
300 match - the re.match object returned by a successful re.search
302 '''
304 def __init__(self, patterns):
305 '''This creates an instance that searches for 'patterns' Where
306 'patterns' may be a list or other sequence of compiled regular
307 expressions, or the EOF or TIMEOUT types.'''
309 self.eof_index = -1
310 self.timeout_index = -1
311 self._searches = []
312 for n, s in enumerate(patterns):
313 if s is EOF:
314 self.eof_index = n
315 continue
316 if s is TIMEOUT:
317 self.timeout_index = n
318 continue
319 self._searches.append((n, s))
321 def __str__(self):
322 '''This returns a human-readable string that represents the state of
323 the object.'''
325 #ss = [(n, ' %d: re.compile("%s")' %
326 # (n, repr(s.pattern))) for n, s in self._searches]
327 ss = list()
328 for n, s in self._searches:
329 ss.append((n, ' %d: re.compile(%r)' % (n, s.pattern)))
330 ss.append((-1, 'searcher_re:'))
331 if self.eof_index >= 0:
332 ss.append((self.eof_index, ' %d: EOF' % self.eof_index))
333 if self.timeout_index >= 0:
334 ss.append((self.timeout_index, ' %d: TIMEOUT' %
335 self.timeout_index))
336 ss.sort()
337 ss = list(zip(*ss))[1]
338 return '\n'.join(ss)
340 def search(self, buffer, freshlen, searchwindowsize=None):
341 '''This searches 'buffer' for the first occurrence of one of the regular
342 expressions. 'freshlen' must indicate the number of bytes at the end of
343 'buffer' which have not been searched before.
345 See class spawn for the 'searchwindowsize' argument.
347 If there is a match this returns the index of that string, and sets
348 'start', 'end' and 'match'. Otherwise, returns -1.'''
350 first_match = None
351 # 'freshlen' doesn't help here -- we cannot predict the
352 # length of a match, and the re module provides no help.
353 if searchwindowsize is None:
354 searchstart = 0
355 else:
356 searchstart = max(0, len(buffer) - searchwindowsize)
357 for index, s in self._searches:
358 match = s.search(buffer, searchstart)
359 if match is None:
360 continue
361 n = match.start()
362 if first_match is None or n < first_match:
363 first_match = n
364 the_match = match
365 best_index = index
366 if first_match is None:
367 return -1
368 self.start = first_match
369 self.match = the_match
370 self.end = self.match.end()
371 return best_index