Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/pexpect/expect.py: 8%

232 statements  

« prev     ^ index     » next       coverage.py v7.4.4, created at 2024-04-20 06:09 +0000

1import time 

2 

3from .exceptions import EOF, TIMEOUT 

4 

5class Expecter(object): 

6 def __init__(self, spawn, searcher, searchwindowsize=-1): 

7 self.spawn = spawn 

8 self.searcher = searcher 

9 # A value of -1 means to use the figure from spawn, which should 

10 # be None or a positive number. 

11 if searchwindowsize == -1: 

12 searchwindowsize = spawn.searchwindowsize 

13 self.searchwindowsize = searchwindowsize 

14 self.lookback = None 

15 if hasattr(searcher, 'longest_string'): 

16 self.lookback = searcher.longest_string 

17 

18 def do_search(self, window, freshlen): 

19 spawn = self.spawn 

20 searcher = self.searcher 

21 if freshlen > len(window): 

22 freshlen = len(window) 

23 index = searcher.search(window, freshlen, self.searchwindowsize) 

24 if index >= 0: 

25 spawn._buffer = spawn.buffer_type() 

26 spawn._buffer.write(window[searcher.end:]) 

27 spawn.before = spawn._before.getvalue()[ 

28 0:-(len(window) - searcher.start)] 

29 spawn._before = spawn.buffer_type() 

30 spawn._before.write(window[searcher.end:]) 

31 spawn.after = window[searcher.start:searcher.end] 

32 spawn.match = searcher.match 

33 spawn.match_index = index 

34 # Found a match 

35 return index 

36 elif self.searchwindowsize or self.lookback: 

37 maintain = self.searchwindowsize or self.lookback 

38 if spawn._buffer.tell() > maintain: 

39 spawn._buffer = spawn.buffer_type() 

40 spawn._buffer.write(window[-maintain:]) 

41 

42 def existing_data(self): 

43 # First call from a new call to expect_loop or expect_async. 

44 # self.searchwindowsize may have changed. 

45 # Treat all data as fresh. 

46 spawn = self.spawn 

47 before_len = spawn._before.tell() 

48 buf_len = spawn._buffer.tell() 

49 freshlen = before_len 

50 if before_len > buf_len: 

51 if not self.searchwindowsize: 

52 spawn._buffer = spawn.buffer_type() 

53 window = spawn._before.getvalue() 

54 spawn._buffer.write(window) 

55 elif buf_len < self.searchwindowsize: 

56 spawn._buffer = spawn.buffer_type() 

57 spawn._before.seek( 

58 max(0, before_len - self.searchwindowsize)) 

59 window = spawn._before.read() 

60 spawn._buffer.write(window) 

61 else: 

62 spawn._buffer.seek(max(0, buf_len - self.searchwindowsize)) 

63 window = spawn._buffer.read() 

64 else: 

65 if self.searchwindowsize: 

66 spawn._buffer.seek(max(0, buf_len - self.searchwindowsize)) 

67 window = spawn._buffer.read() 

68 else: 

69 window = spawn._buffer.getvalue() 

70 return self.do_search(window, freshlen) 

71 

72 def new_data(self, data): 

73 # A subsequent call, after a call to existing_data. 

74 spawn = self.spawn 

75 freshlen = len(data) 

76 spawn._before.write(data) 

77 if not self.searchwindowsize: 

78 if self.lookback: 

79 # search lookback + new data. 

80 old_len = spawn._buffer.tell() 

81 spawn._buffer.write(data) 

82 spawn._buffer.seek(max(0, old_len - self.lookback)) 

83 window = spawn._buffer.read() 

84 else: 

85 # copy the whole buffer (really slow for large datasets). 

86 spawn._buffer.write(data) 

87 window = spawn.buffer 

88 else: 

89 if len(data) >= self.searchwindowsize or not spawn._buffer.tell(): 

90 window = data[-self.searchwindowsize:] 

91 spawn._buffer = spawn.buffer_type() 

92 spawn._buffer.write(window[-self.searchwindowsize:]) 

93 else: 

94 spawn._buffer.write(data) 

95 new_len = spawn._buffer.tell() 

96 spawn._buffer.seek(max(0, new_len - self.searchwindowsize)) 

97 window = spawn._buffer.read() 

98 return self.do_search(window, freshlen) 

99 

100 def eof(self, err=None): 

101 spawn = self.spawn 

102 

103 spawn.before = spawn._before.getvalue() 

104 spawn._buffer = spawn.buffer_type() 

105 spawn._before = spawn.buffer_type() 

106 spawn.after = EOF 

107 index = self.searcher.eof_index 

108 if index >= 0: 

109 spawn.match = EOF 

110 spawn.match_index = index 

111 return index 

112 else: 

113 spawn.match = None 

114 spawn.match_index = None 

115 msg = str(spawn) 

116 msg += '\nsearcher: %s' % self.searcher 

117 if err is not None: 

118 msg = str(err) + '\n' + msg 

119 

120 exc = EOF(msg) 

121 exc.__cause__ = None # in Python 3.x we can use "raise exc from None" 

122 raise exc 

123 

124 def timeout(self, err=None): 

125 spawn = self.spawn 

126 

127 spawn.before = spawn._before.getvalue() 

128 spawn.after = TIMEOUT 

129 index = self.searcher.timeout_index 

130 if index >= 0: 

131 spawn.match = TIMEOUT 

132 spawn.match_index = index 

133 return index 

134 else: 

135 spawn.match = None 

136 spawn.match_index = None 

137 msg = str(spawn) 

138 msg += '\nsearcher: %s' % self.searcher 

139 if err is not None: 

140 msg = str(err) + '\n' + msg 

141 

142 exc = TIMEOUT(msg) 

143 exc.__cause__ = None # in Python 3.x we can use "raise exc from None" 

144 raise exc 

145 

146 def errored(self): 

147 spawn = self.spawn 

148 spawn.before = spawn._before.getvalue() 

149 spawn.after = None 

150 spawn.match = None 

151 spawn.match_index = None 

152 

153 def expect_loop(self, timeout=-1): 

154 """Blocking expect""" 

155 spawn = self.spawn 

156 

157 if timeout is not None: 

158 end_time = time.time() + timeout 

159 

160 try: 

161 idx = self.existing_data() 

162 if idx is not None: 

163 return idx 

164 while True: 

165 # No match at this point 

166 if (timeout is not None) and (timeout < 0): 

167 return self.timeout() 

168 # Still have time left, so read more data 

169 incoming = spawn.read_nonblocking(spawn.maxread, timeout) 

170 if self.spawn.delayafterread is not None: 

171 time.sleep(self.spawn.delayafterread) 

172 idx = self.new_data(incoming) 

173 # Keep reading until exception or return. 

174 if idx is not None: 

175 return idx 

176 if timeout is not None: 

177 timeout = end_time - time.time() 

178 except EOF as e: 

179 return self.eof(e) 

180 except TIMEOUT as e: 

181 return self.timeout(e) 

182 except: 

183 self.errored() 

184 raise 

185 

186 

187class searcher_string(object): 

188 '''This is a plain string search helper for the spawn.expect_any() method. 

189 This helper class is for speed. For more powerful regex patterns 

190 see the helper class, searcher_re. 

191 

192 Attributes: 

193 

194 eof_index - index of EOF, or -1 

195 timeout_index - index of TIMEOUT, or -1 

196 

197 After a successful match by the search() method the following attributes 

198 are available: 

199 

200 start - index into the buffer, first byte of match 

201 end - index into the buffer, first byte after match 

202 match - the matching string itself 

203 

204 ''' 

205 

206 def __init__(self, strings): 

207 '''This creates an instance of searcher_string. This argument 'strings' 

208 may be a list; a sequence of strings; or the EOF or TIMEOUT types. ''' 

209 

210 self.eof_index = -1 

211 self.timeout_index = -1 

212 self._strings = [] 

213 self.longest_string = 0 

214 for n, s in enumerate(strings): 

215 if s is EOF: 

216 self.eof_index = n 

217 continue 

218 if s is TIMEOUT: 

219 self.timeout_index = n 

220 continue 

221 self._strings.append((n, s)) 

222 if len(s) > self.longest_string: 

223 self.longest_string = len(s) 

224 

225 def __str__(self): 

226 '''This returns a human-readable string that represents the state of 

227 the object.''' 

228 

229 ss = [(ns[0], ' %d: %r' % ns) for ns in self._strings] 

230 ss.append((-1, 'searcher_string:')) 

231 if self.eof_index >= 0: 

232 ss.append((self.eof_index, ' %d: EOF' % self.eof_index)) 

233 if self.timeout_index >= 0: 

234 ss.append((self.timeout_index, 

235 ' %d: TIMEOUT' % self.timeout_index)) 

236 ss.sort() 

237 ss = list(zip(*ss))[1] 

238 return '\n'.join(ss) 

239 

240 def search(self, buffer, freshlen, searchwindowsize=None): 

241 '''This searches 'buffer' for the first occurrence of one of the search 

242 strings. 'freshlen' must indicate the number of bytes at the end of 

243 'buffer' which have not been searched before. It helps to avoid 

244 searching the same, possibly big, buffer over and over again. 

245 

246 See class spawn for the 'searchwindowsize' argument. 

247 

248 If there is a match this returns the index of that string, and sets 

249 'start', 'end' and 'match'. Otherwise, this returns -1. ''' 

250 

251 first_match = None 

252 

253 # 'freshlen' helps a lot here. Further optimizations could 

254 # possibly include: 

255 # 

256 # using something like the Boyer-Moore Fast String Searching 

257 # Algorithm; pre-compiling the search through a list of 

258 # strings into something that can scan the input once to 

259 # search for all N strings; realize that if we search for 

260 # ['bar', 'baz'] and the input is '...foo' we need not bother 

261 # rescanning until we've read three more bytes. 

262 # 

263 # Sadly, I don't know enough about this interesting topic. /grahn 

264 

265 for index, s in self._strings: 

266 if searchwindowsize is None: 

267 # the match, if any, can only be in the fresh data, 

268 # or at the very end of the old data 

269 offset = -(freshlen + len(s)) 

270 else: 

271 # better obey searchwindowsize 

272 offset = -searchwindowsize 

273 n = buffer.find(s, offset) 

274 if n >= 0 and (first_match is None or n < first_match): 

275 first_match = n 

276 best_index, best_match = index, s 

277 if first_match is None: 

278 return -1 

279 self.match = best_match 

280 self.start = first_match 

281 self.end = self.start + len(self.match) 

282 return best_index 

283 

284 

285class searcher_re(object): 

286 '''This is regular expression string search helper for the 

287 spawn.expect_any() method. This helper class is for powerful 

288 pattern matching. For speed, see the helper class, searcher_string. 

289 

290 Attributes: 

291 

292 eof_index - index of EOF, or -1 

293 timeout_index - index of TIMEOUT, or -1 

294 

295 After a successful match by the search() method the following attributes 

296 are available: 

297 

298 start - index into the buffer, first byte of match 

299 end - index into the buffer, first byte after match 

300 match - the re.match object returned by a successful re.search 

301 

302 ''' 

303 

304 def __init__(self, patterns): 

305 '''This creates an instance that searches for 'patterns' Where 

306 'patterns' may be a list or other sequence of compiled regular 

307 expressions, or the EOF or TIMEOUT types.''' 

308 

309 self.eof_index = -1 

310 self.timeout_index = -1 

311 self._searches = [] 

312 for n, s in enumerate(patterns): 

313 if s is EOF: 

314 self.eof_index = n 

315 continue 

316 if s is TIMEOUT: 

317 self.timeout_index = n 

318 continue 

319 self._searches.append((n, s)) 

320 

321 def __str__(self): 

322 '''This returns a human-readable string that represents the state of 

323 the object.''' 

324 

325 #ss = [(n, ' %d: re.compile("%s")' % 

326 # (n, repr(s.pattern))) for n, s in self._searches] 

327 ss = list() 

328 for n, s in self._searches: 

329 ss.append((n, ' %d: re.compile(%r)' % (n, s.pattern))) 

330 ss.append((-1, 'searcher_re:')) 

331 if self.eof_index >= 0: 

332 ss.append((self.eof_index, ' %d: EOF' % self.eof_index)) 

333 if self.timeout_index >= 0: 

334 ss.append((self.timeout_index, ' %d: TIMEOUT' % 

335 self.timeout_index)) 

336 ss.sort() 

337 ss = list(zip(*ss))[1] 

338 return '\n'.join(ss) 

339 

340 def search(self, buffer, freshlen, searchwindowsize=None): 

341 '''This searches 'buffer' for the first occurrence of one of the regular 

342 expressions. 'freshlen' must indicate the number of bytes at the end of 

343 'buffer' which have not been searched before. 

344 

345 See class spawn for the 'searchwindowsize' argument. 

346 

347 If there is a match this returns the index of that string, and sets 

348 'start', 'end' and 'match'. Otherwise, returns -1.''' 

349 

350 first_match = None 

351 # 'freshlen' doesn't help here -- we cannot predict the 

352 # length of a match, and the re module provides no help. 

353 if searchwindowsize is None: 

354 searchstart = 0 

355 else: 

356 searchstart = max(0, len(buffer) - searchwindowsize) 

357 for index, s in self._searches: 

358 match = s.search(buffer, searchstart) 

359 if match is None: 

360 continue 

361 n = match.start() 

362 if first_match is None or n < first_match: 

363 first_match = n 

364 the_match = match 

365 best_index = index 

366 if first_match is None: 

367 return -1 

368 self.start = first_match 

369 self.match = the_match 

370 self.end = self.match.end() 

371 return best_index