Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/pexpect/spawnbase.py: 19%

225 statements  

« prev     ^ index     » next       coverage.py v7.2.2, created at 2023-03-26 06:07 +0000

1from io import StringIO, BytesIO 

2import codecs 

3import os 

4import sys 

5import re 

6import errno 

7from .exceptions import ExceptionPexpect, EOF, TIMEOUT 

8from .expect import Expecter, searcher_string, searcher_re 

9 

# True when the running interpreter is Python 3 or newer.
PY3 = sys.version_info >= (3,)

# The interpreter's native text type: ``str`` on Python 3, ``unicode`` on
# Python 2 (the name ``unicode`` is only evaluated on Python 2).
text_type = str if PY3 else unicode

12 

13class _NullCoder(object): 

14 """Pass bytes through unchanged.""" 

15 @staticmethod 

16 def encode(b, final=False): 

17 return b 

18 

19 @staticmethod 

20 def decode(b, final=False): 

21 return b 

22 

class SpawnBase(object):
    """A base class providing the backwards-compatible spawn API for Pexpect.

    This should not be instantiated directly: use :class:`pexpect.spawn` or
    :class:`pexpect.fdpexpect.fdspawn`.
    """
    # Class-level defaults; ``encoding`` is reassigned per instance in
    # __init__.
    encoding = None
    pid = None
    flag_eof = False

    def __init__(self, timeout=30, maxread=2000, searchwindowsize=None,
                 logfile=None, encoding=None, codec_errors='strict'):
        """Initialise the default state shared by all spawn classes.

        :param timeout: default timeout stored on the instance; the expect
            methods fall back to it when called with ``timeout=-1``.
        :param maxread: max bytes to read at one time into the buffer.
        :param searchwindowsize: stored as ``self.searchwindowsize``; data
            before this point is preserved, but not searched.
        :param logfile: optional file-like object that receives everything
            passed to :meth:`_log`.
        :param encoding: ``None`` selects bytes mode; any other value is
            handed to :mod:`codecs` to build the incremental encoder and
            decoder and selects unicode mode.
        :param codec_errors: error handling scheme passed to the incremental
            encoder and decoder in unicode mode.
        """
        self.stdin = sys.stdin
        self.stdout = sys.stdout
        self.stderr = sys.stderr

        self.searcher = None
        self.ignorecase = False
        self.before = None
        self.after = None
        self.match = None
        self.match_index = None
        self.terminated = True
        self.exitstatus = None
        self.signalstatus = None
        # status returned by os.waitpid
        self.status = None
        # the child file descriptor is initially closed
        self.child_fd = -1
        self.timeout = timeout
        self.delimiter = EOF
        self.logfile = logfile
        # input from child (read_nonblocking)
        self.logfile_read = None
        # output to send (send, sendline)
        self.logfile_send = None
        # max bytes to read at one time into buffer
        self.maxread = maxread
        # Data before searchwindowsize point is preserved, but not searched.
        self.searchwindowsize = searchwindowsize
        # Delay used before sending data to child. Time in seconds.
        # Set this to None to skip the time.sleep() call completely.
        self.delaybeforesend = 0.05
        # Used by close() to give kernel time to update process status.
        # Time in seconds.
        self.delayafterclose = 0.1
        # Used by terminate() to give kernel time to update process status.
        # Time in seconds.
        self.delayafterterminate = 0.1
        # Delay in seconds to sleep after each call to read_nonblocking().
        # Set this to None to skip the time.sleep() call completely: that
        # would restore the behavior from pexpect-2.0 (for performance
        # reasons or because you don't want to release Python's global
        # interpreter lock).
        self.delayafterread = 0.0001
        self.softspace = False
        self.name = '<' + repr(self) + '>'
        self.closed = True

        # Unicode interface
        self.encoding = encoding
        self.codec_errors = codec_errors
        if encoding is None:
            # bytes mode (accepts some unicode for backwards compatibility)
            self._encoder = self._decoder = _NullCoder()
            self.string_type = bytes
            self.buffer_type = BytesIO
            self.crlf = b'\r\n'
            if PY3:
                self.allowed_string_types = (bytes, str)
                self.linesep = os.linesep.encode('ascii')

                def write_to_stdout(b):
                    try:
                        return sys.stdout.buffer.write(b)
                    except AttributeError:
                        # If stdout has been replaced, it may not have .buffer
                        return sys.stdout.write(b.decode('ascii', 'replace'))
                self.write_to_stdout = write_to_stdout
            else:
                self.allowed_string_types = (basestring,)  # analysis:ignore
                self.linesep = os.linesep
                self.write_to_stdout = sys.stdout.write
        else:
            # unicode mode
            self._encoder = codecs.getincrementalencoder(encoding)(codec_errors)
            self._decoder = codecs.getincrementaldecoder(encoding)(codec_errors)
            self.string_type = text_type
            self.buffer_type = StringIO
            self.crlf = u'\r\n'
            self.allowed_string_types = (text_type, )
            if PY3:
                self.linesep = os.linesep
            else:
                self.linesep = os.linesep.decode('ascii')
            # This can handle unicode in both Python 2 and 3
            self.write_to_stdout = sys.stdout.write
        # storage for async transport
        self.async_pw_transport = None
        # This is the read buffer. See maxread.
        self._buffer = self.buffer_type()
        # The buffer may be trimmed for efficiency reasons. This is the
        # untrimmed buffer, used to create the before attribute.
        self._before = self.buffer_type()

    def _log(self, s, direction):
        """Write *s* to the configured log files and flush them.

        ``self.logfile`` (when set) receives everything. In addition,
        ``self.logfile_send`` is used when *direction* is ``'send'`` and
        ``self.logfile_read`` for any other direction.
        """
        if self.logfile is not None:
            self.logfile.write(s)
            self.logfile.flush()
        second_log = self.logfile_send if (direction == 'send') else self.logfile_read
        if second_log is not None:
            second_log.write(s)
            second_log.flush()

    # For backwards compatibility, in bytes mode (when encoding is None)
    # unicode is accepted for send and expect. Unicode mode is strictly unicode
    # only.
    def _coerce_expect_string(self, s):
        """Return *s* ASCII-encoded when in bytes mode and *s* is not bytes."""
        if self.encoding is None and not isinstance(s, bytes):
            return s.encode('ascii')
        return s

    def _coerce_send_string(self, s):
        """Return *s* UTF-8-encoded when in bytes mode and *s* is not bytes."""
        if self.encoding is None and not isinstance(s, bytes):
            return s.encode('utf-8')
        return s

    def _get_buffer(self):
        """Return the full contents of the internal read buffer."""
        return self._buffer.getvalue()

    def _set_buffer(self, value):
        """Replace the internal read buffer with one holding *value*."""
        self._buffer = self.buffer_type()
        self._buffer.write(value)

    # This property is provided for backwards compatibility (self.buffer used
    # to be a string/bytes object)
    buffer = property(_get_buffer, _set_buffer)

    def read_nonblocking(self, size=1, timeout=None):
        """This reads data from the file descriptor.

        This is a simple implementation suitable for a regular file.
        Subclasses using ptys or pipes should override it.

        The timeout parameter is ignored.

        Returns the data read, passed through ``self._decoder`` and logged
        with direction ``'read'``. Raises EOF (and sets ``flag_eof``) when
        os.read() fails with EIO or returns an empty result; any other
        OSError is re-raised unchanged.
        """

        try:
            s = os.read(self.child_fd, size)
        except OSError as err:
            # Use the errno attribute rather than args[0]: it is also safe
            # for an OSError that carries no arguments.
            if err.errno == errno.EIO:
                # Linux-style EOF
                self.flag_eof = True
                raise EOF('End Of File (EOF). Exception style platform.')
            raise
        if s == b'':
            # BSD-style EOF
            self.flag_eof = True
            raise EOF('End Of File (EOF). Empty string style platform.')

        s = self._decoder.decode(s, final=False)
        self._log(s, 'read')
        return s

    def _pattern_type_err(self, pattern):
        """Raise TypeError describing *pattern* and the accepted types."""
        goodtypes = ', '.join([str(allowed)
                               for allowed in self.allowed_string_types])
        raise TypeError(
            'got {badtype} ({badobj!r}) as pattern, must be one'
            ' of: {goodtypes}, pexpect.EOF, pexpect.TIMEOUT'.format(
                badtype=type(pattern),
                badobj=pattern,
                goodtypes=goodtypes))

    @staticmethod
    def _resolve_async(async_, kw):
        """Return the effective async flag for an expect-style call.

        A legacy ``async`` entry in *kw* takes precedence over *async_* and
        is removed from *kw*. Anything left in *kw* afterwards is an
        unsupported keyword and raises TypeError.
        """
        if 'async' in kw:
            async_ = kw.pop('async')
        if kw:
            raise TypeError("Unknown keyword arguments: {}".format(kw))
        return async_

    def compile_pattern_list(self, patterns):
        """This compiles a pattern-string or a list of pattern-strings.
        Patterns must be a StringType, EOF, TIMEOUT, SRE_Pattern, or a list of
        those. Patterns may also be None which results in an empty list (you
        might do this if waiting for an EOF or TIMEOUT condition without
        expecting any pattern).

        This is used by expect() when calling expect_list(). Thus expect() is
        nothing more than::

             cpl = self.compile_pattern_list(pl)
             return self.expect_list(cpl, timeout)

        If you are using expect() within a loop it may be more
        efficient to compile the patterns first and then call expect_list().
        This avoids calls in a loop to compile_pattern_list()::

             cpl = self.compile_pattern_list(my_pattern)
             while some_condition:
                ...
                i = self.expect_list(cpl, timeout)
                ...
        """

        if patterns is None:
            return []
        if not isinstance(patterns, list):
            patterns = [patterns]

        # Allow dot to match \n
        compile_flags = re.DOTALL
        if self.ignorecase:
            compile_flags = compile_flags | re.IGNORECASE
        # Look up the compiled-pattern type once, not once per pattern.
        compiled_re_type = type(re.compile(''))
        compiled_pattern_list = []
        for p in patterns:
            if isinstance(p, self.allowed_string_types):
                p = self._coerce_expect_string(p)
                compiled_pattern_list.append(re.compile(p, compile_flags))
            elif p is EOF:
                compiled_pattern_list.append(EOF)
            elif p is TIMEOUT:
                compiled_pattern_list.append(TIMEOUT)
            elif isinstance(p, compiled_re_type):
                compiled_pattern_list.append(p)
            else:
                self._pattern_type_err(p)
        return compiled_pattern_list

    def expect(self, pattern, timeout=-1, searchwindowsize=-1, async_=False, **kw):
        """This seeks through the stream until a pattern is matched. The
        pattern is overloaded and may take several types. The pattern can be a
        StringType, EOF, a compiled re, or a list of any of those types.
        Strings will be compiled to re types. This returns the index into the
        pattern list. If the pattern was not a list this returns index 0 on a
        successful match. This may raise exceptions for EOF or TIMEOUT. To
        avoid the EOF or TIMEOUT exceptions add EOF or TIMEOUT to the pattern
        list. That will cause expect to match an EOF or TIMEOUT condition
        instead of raising an exception.

        If you pass a list of patterns and more than one matches, the first
        match in the stream is chosen. If more than one pattern matches at that
        point, the leftmost in the pattern list is chosen. For example::

            # the input is 'foobar'
            index = p.expect(['bar', 'foo', 'foobar'])
            # returns 1('foo') even though 'foobar' is a "better" match

        Please note, however, that buffering can affect this behavior, since
        input arrives in unpredictable chunks. For example::

            # the input is 'foobar'
            index = p.expect(['foobar', 'foo'])
            # returns 0('foobar') if all input is available at once,
            # but returns 1('foo') if parts of the final 'bar' arrive late

        When a match is found for the given pattern, the class instance
        attribute *match* becomes an re.MatchObject result. Should an EOF
        or TIMEOUT pattern match, then the match attribute will be an instance
        of that exception class. The pairing before and after class
        instance attributes are views of the data preceding and following
        the matching pattern. On general exception, class attribute
        *before* is all data received up to the exception, while *match* and
        *after* attributes are value None.

        When the keyword argument timeout is -1 (default), then TIMEOUT will
        raise after the default value specified by the class timeout
        attribute. When None, TIMEOUT will not be raised and may block
        indefinitely until match.

        When the keyword argument searchwindowsize is -1 (default), then the
        value specified by the class maxread attribute is used.

        A list entry may be EOF or TIMEOUT instead of a string. This will
        catch these exceptions and return the index of the list entry instead
        of raising the exception. The attribute 'after' will be set to the
        exception type. The attribute 'match' will be None. This allows you to
        write code like this::

                index = p.expect(['good', 'bad', pexpect.EOF, pexpect.TIMEOUT])
                if index == 0:
                    do_something()
                elif index == 1:
                    do_something_else()
                elif index == 2:
                    do_some_other_thing()
                elif index == 3:
                    do_something_completely_different()

        instead of code like this::

                try:
                    index = p.expect(['good', 'bad'])
                    if index == 0:
                        do_something()
                    elif index == 1:
                        do_something_else()
                except EOF:
                    do_some_other_thing()
                except TIMEOUT:
                    do_something_completely_different()

        These two forms are equivalent. It all depends on what you want. You
        can also just expect the EOF if you are waiting for all output of a
        child to finish. For example::

                p = pexpect.spawn('/bin/ls')
                p.expect(pexpect.EOF)
                print(p.before)

        If you are trying to optimize for speed then see expect_list().

        On Python 3.4, or Python 3.3 with asyncio installed, passing
        ``async_=True`` will make this return an :mod:`asyncio` coroutine,
        which you can yield from to get the same result that this method would
        normally give directly. So, inside a coroutine, you can replace this code::

            index = p.expect(patterns)

        With this non-blocking form::

            index = yield from p.expect(patterns, async_=True)
        """
        async_ = self._resolve_async(async_, kw)

        compiled_pattern_list = self.compile_pattern_list(pattern)
        return self.expect_list(compiled_pattern_list,
                                timeout, searchwindowsize, async_)

    def expect_list(self, pattern_list, timeout=-1, searchwindowsize=-1,
                    async_=False, **kw):
        """This takes a list of compiled regular expressions and returns the
        index into the pattern_list that matched the child output. The list may
        also contain EOF or TIMEOUT(which are not compiled regular
        expressions). This method is similar to the expect() method except that
        expect_list() does not recompile the pattern list on every call. This
        may help if you are trying to optimize for speed, otherwise just use
        the expect() method.  This is called by expect().


        Like :meth:`expect`, passing ``async_=True`` will make this return an
        asyncio coroutine.
        """
        if timeout == -1:
            timeout = self.timeout
        async_ = self._resolve_async(async_, kw)

        exp = Expecter(self, searcher_re(pattern_list), searchwindowsize)
        if async_:
            from ._async import expect_async
            return expect_async(exp, timeout)
        else:
            return exp.expect_loop(timeout)

    def expect_exact(self, pattern_list, timeout=-1, searchwindowsize=-1,
                     async_=False, **kw):

        """This is similar to expect(), but uses plain string matching instead
        of compiled regular expressions in 'pattern_list'. The 'pattern_list'
        may be a string; a list or other sequence of strings; or TIMEOUT and
        EOF.

        This call might be faster than expect() for two reasons: string
        searching is faster than RE matching and it is possible to limit the
        search to just the end of the input buffer.

        This method is also useful when you don't want to have to worry about
        escaping regular expression characters that you want to match.

        Like :meth:`expect`, passing ``async_=True`` will make this return an
        asyncio coroutine.
        """
        if timeout == -1:
            timeout = self.timeout
        async_ = self._resolve_async(async_, kw)

        if (isinstance(pattern_list, self.allowed_string_types) or
                pattern_list in (TIMEOUT, EOF)):
            pattern_list = [pattern_list]

        def prepare_pattern(pattern):
            if pattern in (TIMEOUT, EOF):
                return pattern
            if isinstance(pattern, self.allowed_string_types):
                return self._coerce_expect_string(pattern)
            self._pattern_type_err(pattern)

        try:
            pattern_list = iter(pattern_list)
        except TypeError:
            self._pattern_type_err(pattern_list)
        pattern_list = [prepare_pattern(p) for p in pattern_list]

        exp = Expecter(self, searcher_string(pattern_list), searchwindowsize)
        if async_:
            from ._async import expect_async
            return expect_async(exp, timeout)
        else:
            return exp.expect_loop(timeout)

    def expect_loop(self, searcher, timeout=-1, searchwindowsize=-1):
        """This is the common loop used inside expect.  The 'searcher' should be
        an instance of searcher_re or searcher_string, which describes how and
        what to search for in the input.

        See expect() for other arguments, return value and exceptions. """

        # Resolve the "use the instance default" sentinel here, exactly as
        # expect_list() and expect_exact() do; otherwise the raw -1 would be
        # handed to the Expecter as if it were a real timeout value.
        if timeout == -1:
            timeout = self.timeout

        exp = Expecter(self, searcher, searchwindowsize)
        return exp.expect_loop(timeout)

    def read(self, size=-1):
        """This reads at most "size" bytes from the file (less if the read hits
        EOF before obtaining size bytes). If the size argument is negative or
        omitted, read all data until EOF is reached. The bytes are returned as
        a string object. An empty string is returned when EOF is encountered
        immediately. """

        if size == 0:
            return self.string_type()
        if size < 0:
            # delimiter default is EOF
            self.expect(self.delimiter)
            return self.before

        # I could have done this more directly by not using expect(), but
        # I deliberately decided to couple read() to expect() so that
        # I would catch any bugs early and ensure consistent behavior.
        # It's a little less efficient, but there is less for me to
        # worry about if I have to later modify read() or expect().
        # Negative and zero sizes returned above, so size is positive here.
        cre = re.compile(self._coerce_expect_string('.{%d}' % size), re.DOTALL)
        # delimiter default is EOF
        index = self.expect([cre, self.delimiter])
        if index == 0:
            ### FIXME self.before should be ''. Should I assert this?
            return self.after
        return self.before

    def readline(self, size=-1):
        """This reads and returns one entire line. The newline at the end of
        line is returned as part of the string, unless the file ends without a
        newline. An empty string is returned if EOF is encountered immediately.
        This looks for a newline as a CR/LF pair (\\r\\n) even on UNIX because
        this is what the pseudotty device returns. So contrary to what you may
        expect you will receive newlines as \\r\\n.

        If the size argument is 0 then an empty string is returned. In all
        other cases the size argument is ignored, which is not standard
        behavior for a file-like object. """

        if size == 0:
            return self.string_type()
        # delimiter default is EOF
        index = self.expect([self.crlf, self.delimiter])
        if index == 0:
            return self.before + self.crlf
        else:
            return self.before

    def __iter__(self):
        """This is to support iterators over a file-like object.
        """
        return iter(self.readline, self.string_type())

    def readlines(self, sizehint=-1):
        """This reads until EOF using readline() and returns a list containing
        the lines thus read. The optional 'sizehint' argument is ignored.
        Remember, because this reads until EOF that means the child
        process should have closed its stdout. If you run this method on
        a child that is still running with its stdout open then this
        method will block until it times out."""

        lines = []
        while True:
            line = self.readline()
            if not line:
                break
            lines.append(line)
        return lines

    def fileno(self):
        """Expose file descriptor for a file-like interface
        """
        return self.child_fd

    def flush(self):
        """This does nothing. It is here to support the interface for a
        File-like object. """
        pass

    def isatty(self):
        """Overridden in subclass using tty"""
        return False

    # For 'with spawn(...) as child:'
    def __enter__(self):
        """Return the instance itself as the context-manager target."""
        return self

    def __exit__(self, etype, evalue, tb):
        """Close the spawn object when the ``with`` block is left."""
        # We rely on subclasses to implement close(). If they don't, it's not
        # clear what a context manager should do.
        self.close()

523 # We rely on subclasses to implement close(). If they don't, it's not 

524 # clear what a context manager should do. 

525 self.close()