Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/pexpect/spawnbase.py: 19%
225 statements
« prev ^ index » next coverage.py v7.2.2, created at 2023-03-26 06:07 +0000
« prev ^ index » next coverage.py v7.2.2, created at 2023-03-26 06:07 +0000
1from io import StringIO, BytesIO
2import codecs
3import os
4import sys
5import re
6import errno
7from .exceptions import ExceptionPexpect, EOF, TIMEOUT
8from .expect import Expecter, searcher_string, searcher_re
# True when the running interpreter is Python 3 or newer.
PY3 = sys.version_info[0] >= 3

# Native text type: ``str`` on Python 3, ``unicode`` on Python 2.  The
# ``unicode`` name is only evaluated on Python 2, where it exists.
if PY3:
    text_type = str
else:
    text_type = unicode  # analysis:ignore
class _NullCoder(object):
    """Identity codec: hands data back unchanged.

    Exposes ``encode``/``decode`` with the same call shape as the
    incremental coders from :mod:`codecs`, so it can stand in for them
    when no encoding is configured.
    """

    @staticmethod
    def encode(b, final=False):
        """Return *b* as-is; *final* is accepted but not used."""
        return b

    @staticmethod
    def decode(b, final=False):
        """Return *b* as-is; *final* is accepted but not used."""
        return b
class SpawnBase(object):
    """A base class providing the backwards-compatible spawn API for Pexpect.

    This should not be instantiated directly: use :class:`pexpect.spawn` or
    :class:`pexpect.fdpexpect.fdspawn`.
    """
    encoding = None
    pid = None
    flag_eof = False

    def __init__(self, timeout=30, maxread=2000, searchwindowsize=None,
                 logfile=None, encoding=None, codec_errors='strict'):
        """Initialise the shared spawn state.

        :param timeout: default timeout, stored as ``self.timeout``; the
            expect methods fall back to it when called with ``timeout=-1``.
        :param maxread: max bytes to read at one time into the buffer.
        :param searchwindowsize: stored as ``self.searchwindowsize``; data
            before this point is preserved, but not searched.
        :param logfile: optional file-like object; :meth:`_log` writes every
            logged string to it and flushes it.
        :param encoding: ``None`` selects bytes mode; otherwise the codec
            name used to build the incremental encoder and decoder
            (unicode mode).
        :param codec_errors: error handler passed to the incremental
            encoder and decoder in unicode mode.
        """
        self.stdin = sys.stdin
        self.stdout = sys.stdout
        self.stderr = sys.stderr

        self.searcher = None
        self.ignorecase = False
        self.before = None
        self.after = None
        self.match = None
        self.match_index = None
        self.terminated = True
        self.exitstatus = None
        self.signalstatus = None
        # status returned by os.waitpid
        self.status = None
        # the child file descriptor is initially closed
        self.child_fd = -1
        self.timeout = timeout
        self.delimiter = EOF
        self.logfile = logfile
        # input from child (read_nonblocking)
        self.logfile_read = None
        # output to send (send, sendline)
        self.logfile_send = None
        # max bytes to read at one time into buffer
        self.maxread = maxread
        # Data before searchwindowsize point is preserved, but not searched.
        self.searchwindowsize = searchwindowsize
        # Delay used before sending data to child. Time in seconds.
        # Set this to None to skip the time.sleep() call completely.
        self.delaybeforesend = 0.05
        # Used by close() to give kernel time to update process status.
        # Time in seconds.
        self.delayafterclose = 0.1
        # Used by terminate() to give kernel time to update process status.
        # Time in seconds.
        self.delayafterterminate = 0.1
        # Delay in seconds to sleep after each call to read_nonblocking().
        # Set this to None to skip the time.sleep() call completely: that
        # would restore the behavior from pexpect-2.0 (for performance
        # reasons or because you don't want to release Python's global
        # interpreter lock).
        self.delayafterread = 0.0001
        self.softspace = False
        self.name = '<' + repr(self) + '>'
        self.closed = True

        # Unicode interface
        self.encoding = encoding
        self.codec_errors = codec_errors
        if encoding is None:
            # bytes mode (accepts some unicode for backwards compatibility)
            self._encoder = self._decoder = _NullCoder()
            self.string_type = bytes
            self.buffer_type = BytesIO
            self.crlf = b'\r\n'
            if PY3:
                self.allowed_string_types = (bytes, str)
                self.linesep = os.linesep.encode('ascii')

                def write_to_stdout(b):
                    try:
                        return sys.stdout.buffer.write(b)
                    except AttributeError:
                        # If stdout has been replaced, it may not have .buffer
                        return sys.stdout.write(b.decode('ascii', 'replace'))
                self.write_to_stdout = write_to_stdout
            else:
                self.allowed_string_types = (basestring,)  # analysis:ignore
                self.linesep = os.linesep
                self.write_to_stdout = sys.stdout.write
        else:
            # unicode mode
            self._encoder = codecs.getincrementalencoder(encoding)(codec_errors)
            self._decoder = codecs.getincrementaldecoder(encoding)(codec_errors)
            self.string_type = text_type
            self.buffer_type = StringIO
            self.crlf = u'\r\n'
            self.allowed_string_types = (text_type, )
            if PY3:
                self.linesep = os.linesep
            else:
                self.linesep = os.linesep.decode('ascii')
            # This can handle unicode in both Python 2 and 3
            self.write_to_stdout = sys.stdout.write
        # storage for async transport
        self.async_pw_transport = None
        # This is the read buffer. See maxread.
        self._buffer = self.buffer_type()
        # The buffer may be trimmed for efficiency reasons.  This is the
        # untrimmed buffer, used to create the before attribute.
        self._before = self.buffer_type()

    def _log(self, s, direction):
        """Write *s* to the configured log files.

        ``self.logfile`` (if set) always receives *s*.  In addition,
        ``self.logfile_send`` is used when *direction* is ``'send'`` and
        ``self.logfile_read`` otherwise.  Each file is flushed after the
        write.
        """
        if self.logfile is not None:
            self.logfile.write(s)
            self.logfile.flush()
        second_log = self.logfile_send if (direction == 'send') else self.logfile_read
        if second_log is not None:
            second_log.write(s)
            second_log.flush()

    # For backwards compatibility, in bytes mode (when encoding is None)
    # unicode is accepted for send and expect. Unicode mode is strictly unicode
    # only.
    def _coerce_expect_string(self, s):
        """In bytes mode, ASCII-encode a non-bytes pattern; else return *s*."""
        if self.encoding is None and not isinstance(s, bytes):
            return s.encode('ascii')
        return s

    def _coerce_send_string(self, s):
        """In bytes mode, UTF-8-encode non-bytes data; else return *s*."""
        if self.encoding is None and not isinstance(s, bytes):
            return s.encode('utf-8')
        return s

    def _get_buffer(self):
        """Return the full contents of the read buffer."""
        return self._buffer.getvalue()

    def _set_buffer(self, value):
        """Replace the read buffer with a fresh one containing *value*."""
        self._buffer = self.buffer_type()
        self._buffer.write(value)

    # This property is provided for backwards compatibility (self.buffer used
    # to be a string/bytes object)
    buffer = property(_get_buffer, _set_buffer)

    def read_nonblocking(self, size=1, timeout=None):
        """This reads data from the file descriptor.

        This is a simple implementation suitable for a regular file. Subclasses using ptys or pipes should override it.

        The timeout parameter is ignored.

        The data read is passed through the decoder, logged with direction
        ``'read'`` and returned.  Raises EOF (and sets ``flag_eof``) when
        ``os.read`` fails with ``EIO`` or returns an empty bytes object; any
        other ``OSError`` is re-raised.
        """

        try:
            s = os.read(self.child_fd, size)
        except OSError as err:
            if err.args[0] == errno.EIO:
                # Linux-style EOF
                self.flag_eof = True
                raise EOF('End Of File (EOF). Exception style platform.')
            raise
        if s == b'':
            # BSD-style EOF
            self.flag_eof = True
            raise EOF('End Of File (EOF). Empty string style platform.')

        s = self._decoder.decode(s, final=False)
        self._log(s, 'read')
        return s

    def _pattern_type_err(self, pattern):
        """Raise TypeError describing *pattern* and the accepted types."""
        goodtypes = ', '.join([str(ast) for ast in self.allowed_string_types])
        raise TypeError(
            'got {badtype} ({badobj!r}) as pattern, must be one'
            ' of: {goodtypes}, pexpect.EOF, pexpect.TIMEOUT'.format(
                badtype=type(pattern),
                badobj=pattern,
                goodtypes=goodtypes))

    @staticmethod
    def _resolve_async_kw(async_, kw):
        """Return the effective async flag for the expect methods.

        The legacy keyword ``async`` (passed through ``**kw``) overrides
        *async_* and is removed from *kw*.  Any keyword left in *kw* after
        that is unknown and raises TypeError.
        """
        if 'async' in kw:
            async_ = kw.pop('async')
        if kw:
            raise TypeError("Unknown keyword arguments: {}".format(kw))
        return async_

    def compile_pattern_list(self, patterns):
        '''This compiles a pattern-string or a list of pattern-strings.
        Patterns must be a StringType, EOF, TIMEOUT, SRE_Pattern, or a list
        (or tuple) of those. Patterns may also be None which results in an
        empty list (you might do this if waiting for an EOF or TIMEOUT
        condition without expecting any pattern).

        This is used by expect() when calling expect_list(). Thus expect() is
        nothing more than::

             cpl = self.compile_pattern_list(pl)
             return self.expect_list(cpl, timeout)

        If you are using expect() within a loop it may be more
        efficient to compile the patterns first and then call expect_list().
        This avoids calls in a loop to compile_pattern_list()::

             cpl = self.compile_pattern_list(my_pattern)
             while some_condition:
                ...
                i = self.expect_list(cpl, timeout)
                ...
        '''

        if patterns is None:
            return []
        # A tuple is treated like a list; anything else is a single pattern.
        if not isinstance(patterns, (list, tuple)):
            patterns = [patterns]

        # Allow dot to match \n
        compile_flags = re.DOTALL
        if self.ignorecase:
            compile_flags = compile_flags | re.IGNORECASE
        # Type of a compiled regular expression, computed once for the loop.
        compiled_re_type = type(re.compile(''))
        compiled_pattern_list = []
        for p in patterns:
            if isinstance(p, self.allowed_string_types):
                p = self._coerce_expect_string(p)
                compiled_pattern_list.append(re.compile(p, compile_flags))
            elif p is EOF:
                compiled_pattern_list.append(EOF)
            elif p is TIMEOUT:
                compiled_pattern_list.append(TIMEOUT)
            elif isinstance(p, compiled_re_type):
                compiled_pattern_list.append(p)
            else:
                self._pattern_type_err(p)
        return compiled_pattern_list

    def expect(self, pattern, timeout=-1, searchwindowsize=-1, async_=False, **kw):
        '''This seeks through the stream until a pattern is matched. The
        pattern is overloaded and may take several types. The pattern can be a
        StringType, EOF, a compiled re, or a list of any of those types.
        Strings will be compiled to re types. This returns the index into the
        pattern list. If the pattern was not a list this returns index 0 on a
        successful match. This may raise exceptions for EOF or TIMEOUT. To
        avoid the EOF or TIMEOUT exceptions add EOF or TIMEOUT to the pattern
        list. That will cause expect to match an EOF or TIMEOUT condition
        instead of raising an exception.

        If you pass a list of patterns and more than one matches, the first
        match in the stream is chosen. If more than one pattern matches at that
        point, the leftmost in the pattern list is chosen. For example::

            # the input is 'foobar'
            index = p.expect(['bar', 'foo', 'foobar'])
            # returns 1('foo') even though 'foobar' is a "better" match

        Please note, however, that buffering can affect this behavior, since
        input arrives in unpredictable chunks. For example::

            # the input is 'foobar'
            index = p.expect(['foobar', 'foo'])
            # returns 0('foobar') if all input is available at once,
            # but returns 1('foo') if parts of the final 'bar' arrive late

        When a match is found for the given pattern, the class instance
        attribute *match* becomes an re.MatchObject result.  Should an EOF
        or TIMEOUT pattern match, then the match attribute will be an instance
        of that exception class.  The pairing before and after class
        instance attributes are views of the data preceding and following
        the matching pattern.  On general exception, class attribute
        *before* is all data received up to the exception, while *match* and
        *after* attributes are value None.

        When the keyword argument timeout is -1 (default), then TIMEOUT will
        raise after the default value specified by the class timeout
        attribute. When None, TIMEOUT will not be raised and may block
        indefinitely until match.

        When the keyword argument searchwindowsize is -1 (default), then the
        value specified by the class maxread attribute is used.

        A list entry may be EOF or TIMEOUT instead of a string. This will
        catch these exceptions and return the index of the list entry instead
        of raising the exception. The attribute 'after' will be set to the
        exception type. The attribute 'match' will be None. This allows you to
        write code like this::

                index = p.expect(['good', 'bad', pexpect.EOF, pexpect.TIMEOUT])
                if index == 0:
                    do_something()
                elif index == 1:
                    do_something_else()
                elif index == 2:
                    do_some_other_thing()
                elif index == 3:
                    do_something_completely_different()

        instead of code like this::

                try:
                    index = p.expect(['good', 'bad'])
                    if index == 0:
                        do_something()
                    elif index == 1:
                        do_something_else()
                except EOF:
                    do_some_other_thing()
                except TIMEOUT:
                    do_something_completely_different()

        These two forms are equivalent. It all depends on what you want. You
        can also just expect the EOF if you are waiting for all output of a
        child to finish. For example::

                p = pexpect.spawn('/bin/ls')
                p.expect(pexpect.EOF)
                print(p.before)

        If you are trying to optimize for speed then see expect_list().

        On Python 3.4, or Python 3.3 with asyncio installed, passing
        ``async_=True`` will make this return an :mod:`asyncio` coroutine,
        which you can yield from to get the same result that this method would
        normally give directly. So, inside a coroutine, you can replace this code::

            index = p.expect(patterns)

        With this non-blocking form::

            index = yield from p.expect(patterns, async_=True)

        The legacy keyword ``async`` is still accepted in place of ``async_``;
        any other extra keyword argument raises TypeError.
        '''
        async_ = self._resolve_async_kw(async_, kw)

        compiled_pattern_list = self.compile_pattern_list(pattern)
        return self.expect_list(compiled_pattern_list,
                                timeout, searchwindowsize, async_)

    def expect_list(self, pattern_list, timeout=-1, searchwindowsize=-1,
                    async_=False, **kw):
        '''This takes a list of compiled regular expressions and returns the
        index into the pattern_list that matched the child output. The list may
        also contain EOF or TIMEOUT(which are not compiled regular
        expressions). This method is similar to the expect() method except that
        expect_list() does not recompile the pattern list on every call. This
        may help if you are trying to optimize for speed, otherwise just use
        the expect() method.  This is called by expect().


        Like :meth:`expect`, passing ``async_=True`` will make this return an
        asyncio coroutine.
        '''
        if timeout == -1:
            timeout = self.timeout
        async_ = self._resolve_async_kw(async_, kw)

        exp = Expecter(self, searcher_re(pattern_list), searchwindowsize)
        if async_:
            from ._async import expect_async
            return expect_async(exp, timeout)
        else:
            return exp.expect_loop(timeout)

    def expect_exact(self, pattern_list, timeout=-1, searchwindowsize=-1,
                     async_=False, **kw):

        '''This is similar to expect(), but uses plain string matching instead
        of compiled regular expressions in 'pattern_list'. The 'pattern_list'
        may be a string; a list or other sequence of strings; or TIMEOUT and
        EOF.

        This call might be faster than expect() for two reasons: string
        searching is faster than RE matching and it is possible to limit the
        search to just the end of the input buffer.

        This method is also useful when you don't want to have to worry about
        escaping regular expression characters that you want to match.

        Like :meth:`expect`, passing ``async_=True`` will make this return an
        asyncio coroutine.
        '''
        if timeout == -1:
            timeout = self.timeout
        async_ = self._resolve_async_kw(async_, kw)

        # EOF and TIMEOUT are sentinels, so they are recognised by identity,
        # the same way compile_pattern_list() does it.
        if (isinstance(pattern_list, self.allowed_string_types) or
                pattern_list is TIMEOUT or pattern_list is EOF):
            pattern_list = [pattern_list]

        def prepare_pattern(pattern):
            if pattern is TIMEOUT or pattern is EOF:
                return pattern
            if isinstance(pattern, self.allowed_string_types):
                return self._coerce_expect_string(pattern)
            self._pattern_type_err(pattern)

        try:
            pattern_list = iter(pattern_list)
        except TypeError:
            self._pattern_type_err(pattern_list)
        pattern_list = [prepare_pattern(p) for p in pattern_list]

        exp = Expecter(self, searcher_string(pattern_list), searchwindowsize)
        if async_:
            from ._async import expect_async
            return expect_async(exp, timeout)
        else:
            return exp.expect_loop(timeout)

    def expect_loop(self, searcher, timeout=-1, searchwindowsize=-1):
        '''This is the common loop used inside expect. The 'searcher' should be
        an instance of searcher_re or searcher_string, which describes how and
        what to search for in the input.

        See expect() for other arguments, return value and exceptions. '''

        # Resolve the "use the class default" marker here, exactly as
        # expect_list() and expect_exact() do, so that -1 never reaches the
        # Expecter as a literal timeout value.
        if timeout == -1:
            timeout = self.timeout

        exp = Expecter(self, searcher, searchwindowsize)
        return exp.expect_loop(timeout)

    def read(self, size=-1):
        '''This reads at most "size" bytes from the file (less if the read hits
        EOF before obtaining size bytes). If the size argument is negative or
        omitted, read all data until EOF is reached. The bytes are returned as
        a string object. An empty string is returned when EOF is encountered
        immediately. '''

        if size == 0:
            return self.string_type()
        if size < 0:
            # delimiter default is EOF
            self.expect(self.delimiter)
            return self.before

        # I could have done this more directly by not using expect(), but
        # I deliberately decided to couple read() to expect() so that
        # I would catch any bugs early and ensure consistent behavior.
        # It's a little less efficient, but there is less for me to
        # worry about if I have to later modify read() or expect().
        # Only positive sizes reach this point: zero and negative sizes
        # have already returned above.
        cre = re.compile(self._coerce_expect_string('.{%d}' % size), re.DOTALL)
        # delimiter default is EOF
        index = self.expect([cre, self.delimiter])
        if index == 0:
            ### FIXME self.before should be ''. Should I assert this?
            return self.after
        return self.before

    def readline(self, size=-1):
        '''This reads and returns one entire line. The newline at the end of
        line is returned as part of the string, unless the file ends without a
        newline. An empty string is returned if EOF is encountered immediately.
        This looks for a newline as a CR/LF pair (\\r\\n) even on UNIX because
        this is what the pseudotty device returns. So contrary to what you may
        expect you will receive newlines as \\r\\n.

        If the size argument is 0 then an empty string is returned. In all
        other cases the size argument is ignored, which is not standard
        behavior for a file-like object. '''

        if size == 0:
            return self.string_type()
        # delimiter default is EOF
        index = self.expect([self.crlf, self.delimiter])
        if index == 0:
            return self.before + self.crlf
        else:
            return self.before

    def __iter__(self):
        '''This is to support iterators over a file-like object.
        '''
        return iter(self.readline, self.string_type())

    def readlines(self, sizehint=-1):
        '''This reads until EOF using readline() and returns a list containing
        the lines thus read. The optional 'sizehint' argument is ignored.
        Remember, because this reads until EOF that means the child
        process should have closed its stdout. If you run this method on
        a child that is still running with its stdout open then this
        method will block until it times out.'''

        lines = []
        while True:
            line = self.readline()
            if not line:
                break
            lines.append(line)
        return lines

    def fileno(self):
        '''Expose file descriptor for a file-like interface
        '''
        return self.child_fd

    def flush(self):
        '''This does nothing. It is here to support the interface for a
        File-like object. '''
        pass

    def isatty(self):
        """Overridden in subclass using tty"""
        return False

    # For 'with spawn(...) as child:'
    def __enter__(self):
        """Return this object for use as the ``with`` target."""
        return self

    def __exit__(self, etype, evalue, tb):
        """Close the spawn object on leaving the ``with`` block."""
        # We rely on subclasses to implement close(). If they don't, it's not
        # clear what a context manager should do.
        self.close()