Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/bleach/_vendor/html5lib/_inputstream.py: 15%

553 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-07-01 06:54 +0000

1from __future__ import absolute_import, division, unicode_literals 

2 

3from six import text_type 

4from six.moves import http_client, urllib 

5 

6import codecs 

7import re 

8from io import BytesIO, StringIO 

9 

10import webencodings 

11 

12from .constants import EOF, spaceCharacters, asciiLetters, asciiUppercase 

13from .constants import _ReparseException 

14from . import _utils 

15 

16# Non-unicode versions of constants for use in the pre-parser 

# Non-unicode versions of constants for use in the pre-parser
spaceCharactersBytes = frozenset([item.encode("ascii") for item in spaceCharacters])
asciiLettersBytes = frozenset([item.encode("ascii") for item in asciiLetters])
asciiUppercaseBytes = frozenset([item.encode("ascii") for item in asciiUppercase])
# Bytes that end a tag name during the encoding pre-scan: whitespace plus "<"/">".
spacesAngleBrackets = spaceCharactersBytes | frozenset([b">", b"<"])


# Character class of code points reported as parse errors: C0/C1 controls
# (minus the ones HTML permits), DEL, and the Unicode noncharacters
# (U+FDD0..U+FDEF plus U+xFFFE/U+xFFFF in every plane).  Surrogates are
# appended separately below, only where the platform allows them in str.
invalid_unicode_no_surrogate = "[\u0001-\u0008\u000B\u000E-\u001F\u007F-\u009F\uFDD0-\uFDEF\uFFFE\uFFFF\U0001FFFE\U0001FFFF\U0002FFFE\U0002FFFF\U0003FFFE\U0003FFFF\U0004FFFE\U0004FFFF\U0005FFFE\U0005FFFF\U0006FFFE\U0006FFFF\U0007FFFE\U0007FFFF\U0008FFFE\U0008FFFF\U0009FFFE\U0009FFFF\U000AFFFE\U000AFFFF\U000BFFFE\U000BFFFF\U000CFFFE\U000CFFFF\U000DFFFE\U000DFFFF\U000EFFFE\U000EFFFF\U000FFFFE\U000FFFFF\U0010FFFE\U0010FFFF]"  # noqa

if _utils.supports_lone_surrogates:
    # Use one extra step of indirection and create surrogates with
    # eval. Not using this indirection would introduce an illegal
    # unicode literal on platforms not supporting such lone
    # surrogates.
    assert invalid_unicode_no_surrogate[-1] == "]" and invalid_unicode_no_surrogate.count("]") == 1
    invalid_unicode_re = re.compile(invalid_unicode_no_surrogate[:-1] +
                                    eval('"\\uD800-\\uDFFF"') +  # pylint:disable=eval-used
                                    "]")
else:
    invalid_unicode_re = re.compile(invalid_unicode_no_surrogate)

# Noncharacters outside the Basic Multilingual Plane; used by the UCS-2
# error-reporting path to classify decoded surrogate pairs.
non_bmp_invalid_codepoints = {0x1FFFE, 0x1FFFF, 0x2FFFE, 0x2FFFF, 0x3FFFE,
                              0x3FFFF, 0x4FFFE, 0x4FFFF, 0x5FFFE, 0x5FFFF,
                              0x6FFFE, 0x6FFFF, 0x7FFFE, 0x7FFFF, 0x8FFFE,
                              0x8FFFF, 0x9FFFE, 0x9FFFF, 0xAFFFE, 0xAFFFF,
                              0xBFFFE, 0xBFFFF, 0xCFFFE, 0xCFFFF, 0xDFFFE,
                              0xDFFFF, 0xEFFFE, 0xEFFFF, 0xFFFFE, 0xFFFFF,
                              0x10FFFE, 0x10FFFF}

# ASCII whitespace and punctuation, matched as single characters.
ascii_punctuation_re = re.compile("[\u0009-\u000D\u0020-\u002F\u003A-\u0040\u005C\u005B-\u0060\u007B-\u007E]")

# Cache for charsUntil()
charsUntilRegEx = {}

49 

50 

class BufferedStream(object):
    """Wrap an unbuffered stream so already-read data can be re-read.

    Consumed data is retained as a list of byte chunks (a list, because
    repeatedly concatenating strings would be O(n**2)).  The current read
    location is ``self.position == [chunk_index, offset_within_chunk]``;
    ``[-1, 0]`` means nothing has been read yet.
    """

    def __init__(self, stream):
        self.stream = stream
        self.buffer = []
        self.position = [-1, 0]  # chunk number, offset

    def tell(self):
        """Return the absolute byte offset of the current read position."""
        chunk_index, offset = self.position
        return sum(len(chunk) for chunk in self.buffer[:chunk_index]) + offset

    def seek(self, pos):
        """Move the read position to absolute offset *pos*.

        Only positions inside the already-buffered data are valid.
        """
        assert pos <= self._bufferedBytes()
        remaining = pos
        index = 0
        while len(self.buffer[index]) < remaining:
            remaining -= len(self.buffer[index])
            index += 1
        self.position = [index, remaining]

    def read(self, bytes):
        """Read up to *bytes* bytes, replaying buffered data first."""
        if not self.buffer:
            return self._readStream(bytes)
        at_buffer_end = (self.position[0] == len(self.buffer) and
                         self.position[1] == len(self.buffer[-1]))
        if at_buffer_end:
            return self._readStream(bytes)
        return self._readFromBuffer(bytes)

    def _bufferedBytes(self):
        # Total number of bytes currently held in the buffer.
        return sum(len(item) for item in self.buffer)

    def _readStream(self, bytes):
        # Pull fresh data from the wrapped stream, remember it, and move
        # the position to the end of the new chunk.
        data = self.stream.read(bytes)
        self.buffer.append(data)
        self.position[0] += 1
        self.position[1] = len(data)
        return data

    def _readFromBuffer(self, bytes):
        # Serve as much as possible from buffered chunks; whatever is still
        # missing afterwards is fetched from the underlying stream.
        wanted = bytes
        pieces = []
        chunk_index, chunk_offset = self.position
        while chunk_index < len(self.buffer) and wanted != 0:
            assert wanted > 0
            chunk = self.buffer[chunk_index]
            available = len(chunk) - chunk_offset
            if wanted <= available:
                take = wanted
                self.position = [chunk_index, chunk_offset + take]
            else:
                take = available
                self.position = [chunk_index, len(chunk)]
                chunk_index += 1
            pieces.append(chunk[chunk_offset:chunk_offset + take])
            wanted -= take
            # Every chunk after the first is consumed from its start.
            chunk_offset = 0

        if wanted:
            pieces.append(self._readStream(wanted))

        return b"".join(pieces)

123 

124 

def HTMLInputStream(source, **kwargs):
    """Return a unicode or binary input stream appropriate for *source*."""
    # Work around Python bug #20007: read(0) closes the connection.
    # http://bugs.python.org/issue20007
    if (isinstance(source, http_client.HTTPResponse) or
            # Also check for addinfourl wrapping HTTPResponse
            (isinstance(source, urllib.response.addbase) and
             isinstance(source.fp, http_client.HTTPResponse))):
        is_text = False
    elif hasattr(source, "read"):
        # Probe a file-like object by reading zero characters/bytes.
        is_text = isinstance(source.read(0), text_type)
    else:
        is_text = isinstance(source, text_type)

    if not is_text:
        return HTMLBinaryInputStream(source, **kwargs)

    # Encoding keywords make no sense for already-decoded input.
    offending = [name for name in kwargs if name.endswith("_encoding")]
    if offending:
        raise TypeError("Cannot set an encoding with a unicode input, set %r" % offending)
    return HTMLUnicodeInputStream(source, **kwargs)

146 

147 

class HTMLUnicodeInputStream(object):
    """Provides a unicode stream of characters to the HTMLTokenizer.

    This class takes care of character encoding and removing or replacing
    incorrect byte-sequences and also provides column and line tracking.

    """

    # Number of characters pulled from the underlying stream per readChunk().
    _defaultChunkSize = 10240

    def __init__(self, source):
        """Initialises the HTMLInputStream.

        HTMLInputStream(source, [encoding]) -> Normalized stream from source
        for use by html5lib.

        source can be either a file-object, local filename or a string.

        The optional encoding parameter must be a string that indicates
        the encoding.  If specified, that encoding will be used,
        regardless of any BOM or later declaration (such as in a meta
        element)

        """

        if not _utils.supports_lone_surrogates:
            # Such platforms will have already checked for such
            # surrogate errors, so no need to do this checking.
            self.reportCharacterErrors = None
        elif len("\U0010FFFF") == 1:
            # Wide (UCS-4) build: each code point is one character.
            self.reportCharacterErrors = self.characterErrorsUCS4
        else:
            # Narrow (UCS-2) build: astral code points appear as surrogate pairs.
            self.reportCharacterErrors = self.characterErrorsUCS2

        # List of where new lines occur
        self.newLines = [0]

        # Unicode input is by definition already decoded.
        self.charEncoding = (lookupEncoding("utf-8"), "certain")
        self.dataStream = self.openStream(source)

        self.reset()

    def reset(self):
        """Reset all chunk and position state to the start of the stream."""
        self.chunk = ""
        self.chunkSize = 0
        self.chunkOffset = 0
        self.errors = []

        # number of (complete) lines in previous chunks
        self.prevNumLines = 0
        # number of columns in the last line of the previous chunk
        self.prevNumCols = 0

        # Deal with CR LF and surrogates split over chunk boundaries
        self._bufferedCharacter = None

    def openStream(self, source):
        """Produces a file object from source.

        source can be either a file object, local filename or a string.

        """
        # Already a file object
        if hasattr(source, 'read'):
            stream = source
        else:
            stream = StringIO(source)

        return stream

    def _position(self, offset):
        # Translate *offset* (an index into the current chunk) into an
        # absolute (line, column) pair, using the totals carried over from
        # previous chunks.
        chunk = self.chunk
        nLines = chunk.count('\n', 0, offset)
        positionLine = self.prevNumLines + nLines
        lastLinePos = chunk.rfind('\n', 0, offset)
        if lastLinePos == -1:
            # No newline in this chunk before offset: continue the previous line.
            positionColumn = self.prevNumCols + offset
        else:
            positionColumn = offset - (lastLinePos + 1)
        return (positionLine, positionColumn)

    def position(self):
        """Returns (line, col) of the current position in the stream."""
        line, col = self._position(self.chunkOffset)
        # Lines are reported 1-based; columns stay 0-based.
        return (line + 1, col)

    def char(self):
        """ Read one character from the stream or queue if available. Return
            EOF when EOF is reached.
        """
        # Read a new chunk from the input stream if necessary
        if self.chunkOffset >= self.chunkSize:
            if not self.readChunk():
                return EOF

        chunkOffset = self.chunkOffset
        char = self.chunk[chunkOffset]
        self.chunkOffset = chunkOffset + 1

        return char

    def readChunk(self, chunkSize=None):
        """Refill self.chunk from the data stream.

        Returns False (and leaves an empty chunk) at end of stream,
        True otherwise.
        """
        if chunkSize is None:
            chunkSize = self._defaultChunkSize

        # Fold the chunk we are about to discard into the running line/column
        # totals before replacing it.
        self.prevNumLines, self.prevNumCols = self._position(self.chunkSize)

        self.chunk = ""
        self.chunkSize = 0
        self.chunkOffset = 0

        data = self.dataStream.read(chunkSize)

        # Deal with CR LF and surrogates broken across chunks
        if self._bufferedCharacter:
            data = self._bufferedCharacter + data
            self._bufferedCharacter = None
        elif not data:
            # We have no more data, bye-bye stream
            return False

        if len(data) > 1:
            # If the chunk ends in a CR or a high surrogate, hold that final
            # character back so it can pair with the start of the next chunk.
            lastv = ord(data[-1])
            if lastv == 0x0D or 0xD800 <= lastv <= 0xDBFF:
                self._bufferedCharacter = data[-1]
                data = data[:-1]

        if self.reportCharacterErrors:
            self.reportCharacterErrors(data)

        # Replace invalid characters
        data = data.replace("\r\n", "\n")
        data = data.replace("\r", "\n")

        self.chunk = data
        self.chunkSize = len(data)

        return True

    def characterErrorsUCS4(self, data):
        """Record one invalid-codepoint error per disallowed character."""
        for _ in range(len(invalid_unicode_re.findall(data))):
            self.errors.append("invalid-codepoint")

    def characterErrorsUCS2(self, data):
        # Someone picked the wrong compile option
        # You lose
        # NOTE(review): once `skip` is set True after a surrogate pair, the
        # `continue` branch never resets it, so later matches in the same
        # chunk appear to be skipped too — matches upstream html5lib; confirm
        # intent before changing (this path only runs on narrow builds).
        skip = False
        for match in invalid_unicode_re.finditer(data):
            if skip:
                continue
            codepoint = ord(match.group())
            pos = match.start()
            # Pretty sure there should be endianness issues here
            if _utils.isSurrogatePair(data[pos:pos + 2]):
                # We have a surrogate pair!
                char_val = _utils.surrogatePairToCodepoint(data[pos:pos + 2])
                if char_val in non_bmp_invalid_codepoints:
                    self.errors.append("invalid-codepoint")
                skip = True
            elif (codepoint >= 0xD800 and codepoint <= 0xDFFF and
                  pos == len(data) - 1):
                # Lone high/low surrogate at the very end of the data.
                self.errors.append("invalid-codepoint")
            else:
                skip = False
                self.errors.append("invalid-codepoint")

    def charsUntil(self, characters, opposite=False):
        """ Returns a string of characters from the stream up to but not
        including any character in 'characters' or EOF. 'characters' must be
        a container that supports the 'in' method and iteration over its
        characters.
        """

        # Use a cache of regexps to find the required characters
        try:
            chars = charsUntilRegEx[(characters, opposite)]
        except KeyError:
            if __debug__:
                for c in characters:
                    assert(ord(c) < 128)
            regex = "".join(["\\x%02x" % ord(c) for c in characters])
            if not opposite:
                # "^" inside the character class negates it: match everything
                # that is NOT in `characters`.
                regex = "^%s" % regex
            chars = charsUntilRegEx[(characters, opposite)] = re.compile("[%s]+" % regex)

        rv = []

        while True:
            # Find the longest matching prefix
            m = chars.match(self.chunk, self.chunkOffset)
            if m is None:
                # If nothing matched, and it wasn't because we ran out of chunk,
                # then stop
                if self.chunkOffset != self.chunkSize:
                    break
            else:
                end = m.end()
                # If not the whole chunk matched, return everything
                # up to the part that didn't match
                if end != self.chunkSize:
                    rv.append(self.chunk[self.chunkOffset:end])
                    self.chunkOffset = end
                    break
            # If the whole remainder of the chunk matched,
            # use it all and read the next chunk
            rv.append(self.chunk[self.chunkOffset:])
            if not self.readChunk():
                # Reached EOF
                break

        r = "".join(rv)
        return r

    def unget(self, char):
        # Only one character is allowed to be ungotten at once - it must
        # be consumed again before any further call to unget
        if char is not EOF:
            if self.chunkOffset == 0:
                # unget is called quite rarely, so it's a good idea to do
                # more work here if it saves a bit of work in the frequently
                # called char and charsUntil.
                # So, just prepend the ungotten character onto the current
                # chunk:
                self.chunk = char + self.chunk
                self.chunkSize += 1
            else:
                self.chunkOffset -= 1
                assert self.chunk[self.chunkOffset] == char

377 

class HTMLBinaryInputStream(HTMLUnicodeInputStream):
    """Provides a unicode stream of characters to the HTMLTokenizer.

    This class takes care of character encoding and removing or replacing
    incorrect byte-sequences and also provides column and line tracking.

    """

    def __init__(self, source, override_encoding=None, transport_encoding=None,
                 same_origin_parent_encoding=None, likely_encoding=None,
                 default_encoding="windows-1252", useChardet=True):
        """Initialises the HTMLInputStream.

        HTMLInputStream(source, [encoding]) -> Normalized stream from source
        for use by html5lib.

        source can be either a file-object, local filename or a string.

        The optional encoding parameter must be a string that indicates
        the encoding.  If specified, that encoding will be used,
        regardless of any BOM or later declaration (such as in a meta
        element)

        """
        # Raw Stream - for unicode objects this will encode to utf-8 and set
        # self.charEncoding as appropriate
        self.rawStream = self.openStream(source)

        HTMLUnicodeInputStream.__init__(self, self.rawStream)

        # Encoding Information
        # Number of bytes to use when looking for a meta element with
        # encoding information
        self.numBytesMeta = 1024
        # Number of bytes to use when using detecting encoding using chardet
        self.numBytesChardet = 100
        # Things from args
        self.override_encoding = override_encoding
        self.transport_encoding = transport_encoding
        self.same_origin_parent_encoding = same_origin_parent_encoding
        self.likely_encoding = likely_encoding
        self.default_encoding = default_encoding

        # Determine encoding
        self.charEncoding = self.determineEncoding(useChardet)
        assert self.charEncoding[0] is not None

        # Call superclass
        self.reset()

    def reset(self):
        # Re-wrap the raw byte stream with a decoder for the chosen encoding;
        # undecodable bytes become U+FFFD ('replace').
        self.dataStream = self.charEncoding[0].codec_info.streamreader(self.rawStream, 'replace')
        HTMLUnicodeInputStream.reset(self)

    def openStream(self, source):
        """Produces a file object from source.

        source can be either a file object, local filename or a string.

        """
        # Already a file object
        if hasattr(source, 'read'):
            stream = source
        else:
            stream = BytesIO(source)

        # Encoding detection requires a seekable stream; wrap anything that
        # cannot seek in BufferedStream.
        try:
            stream.seek(stream.tell())
        except Exception:
            stream = BufferedStream(stream)

        return stream

    def determineEncoding(self, chardet=True):
        """Return an (encoding, confidence) pair for the raw stream.

        Sources are consulted in the precedence order defined by the HTML
        encoding-sniffing algorithm: BOM, override, transport layer, meta
        prescan, same-origin parent, likely encoding, chardet guess, the
        caller-supplied default, then windows-1252.
        """
        # BOMs take precedence over everything
        # This will also read past the BOM if present
        charEncoding = self.detectBOM(), "certain"
        if charEncoding[0] is not None:
            return charEncoding

        # If we've been overridden, we've been overridden
        charEncoding = lookupEncoding(self.override_encoding), "certain"
        if charEncoding[0] is not None:
            return charEncoding

        # Now check the transport layer
        charEncoding = lookupEncoding(self.transport_encoding), "certain"
        if charEncoding[0] is not None:
            return charEncoding

        # Look for meta elements with encoding information
        charEncoding = self.detectEncodingMeta(), "tentative"
        if charEncoding[0] is not None:
            return charEncoding

        # Parent document encoding
        charEncoding = lookupEncoding(self.same_origin_parent_encoding), "tentative"
        if charEncoding[0] is not None and not charEncoding[0].name.startswith("utf-16"):
            return charEncoding

        # "likely" encoding
        charEncoding = lookupEncoding(self.likely_encoding), "tentative"
        if charEncoding[0] is not None:
            return charEncoding

        # Guess with chardet, if available
        if chardet:
            try:
                from chardet.universaldetector import UniversalDetector
            except ImportError:
                pass
            else:
                buffers = []
                detector = UniversalDetector()
                while not detector.done:
                    buffer = self.rawStream.read(self.numBytesChardet)
                    assert isinstance(buffer, bytes)
                    if not buffer:
                        break
                    buffers.append(buffer)
                    detector.feed(buffer)
                detector.close()
                encoding = lookupEncoding(detector.result['encoding'])
                # Rewind: detection consumed part of the stream.
                self.rawStream.seek(0)
                if encoding is not None:
                    return encoding, "tentative"

        # Try the default encoding
        charEncoding = lookupEncoding(self.default_encoding), "tentative"
        if charEncoding[0] is not None:
            return charEncoding

        # Fallback to html5lib's default if even that hasn't worked
        return lookupEncoding("windows-1252"), "tentative"

    def changeEncoding(self, newEncoding):
        """Switch to *newEncoding* mid-parse, re-reading from the start.

        Raises _ReparseException when the encoding actually changes so the
        caller restarts parsing with the new decoder.
        """
        assert self.charEncoding[1] != "certain"
        newEncoding = lookupEncoding(newEncoding)
        if newEncoding is None:
            return
        if newEncoding.name in ("utf-16be", "utf-16le"):
            # Per the spec a late utf-16 declaration is treated as utf-8.
            # NOTE(review): after this remap the method falls through without
            # updating self.charEncoding or reparsing — matches upstream
            # html5lib; confirm intent before changing.
            newEncoding = lookupEncoding("utf-8")
            assert newEncoding is not None
        elif newEncoding == self.charEncoding[0]:
            # Same encoding, just upgrade the confidence.
            self.charEncoding = (self.charEncoding[0], "certain")
        else:
            self.rawStream.seek(0)
            self.charEncoding = (newEncoding, "certain")
            self.reset()
            # NOTE(review): self.charEncoding was just reassigned above, so
            # both %s values render the new encoding — matches upstream.
            raise _ReparseException("Encoding changed from %s to %s" % (self.charEncoding[0], newEncoding))

    def detectBOM(self):
        """Attempts to detect at BOM at the start of the stream. If
        an encoding can be determined from the BOM return the name of the
        encoding otherwise return None"""
        bomDict = {
            codecs.BOM_UTF8: 'utf-8',
            codecs.BOM_UTF16_LE: 'utf-16le', codecs.BOM_UTF16_BE: 'utf-16be',
            codecs.BOM_UTF32_LE: 'utf-32le', codecs.BOM_UTF32_BE: 'utf-32be'
        }

        # Go to beginning of file and read in 4 bytes
        string = self.rawStream.read(4)
        assert isinstance(string, bytes)

        # Try detecting the BOM using bytes from the string
        encoding = bomDict.get(string[:3])  # UTF-8
        seek = 3
        if not encoding:
            # Need to detect UTF-32 before UTF-16
            encoding = bomDict.get(string)  # UTF-32
            seek = 4
            if not encoding:
                encoding = bomDict.get(string[:2])  # UTF-16
                seek = 2

        # Set the read position past the BOM if one was found, otherwise
        # set it to the start of the stream
        if encoding:
            self.rawStream.seek(seek)
            return lookupEncoding(encoding)
        else:
            self.rawStream.seek(0)
            return None

    def detectEncodingMeta(self):
        """Report the encoding declared by the meta element
        """
        # Prescan only the first numBytesMeta bytes, then rewind.
        buffer = self.rawStream.read(self.numBytesMeta)
        assert isinstance(buffer, bytes)
        parser = EncodingParser(buffer)
        self.rawStream.seek(0)
        encoding = parser.getEncoding()

        # A meta-declared utf-16 is treated as utf-8, as the spec requires.
        if encoding is not None and encoding.name in ("utf-16be", "utf-16le"):
            encoding = lookupEncoding("utf-8")

        return encoding

576 

577 

class EncodingBytes(bytes):
    """String-like object with an associated position and various extra methods
    If the position is ever greater than the string length then an exception is
    raised"""
    def __new__(self, value):
        # `self` here is actually the class (bytes.__new__ idiom).  The value
        # is lowercased once so all later matching is case-insensitive.
        assert isinstance(value, bytes)
        return bytes.__new__(self, value.lower())

    def __init__(self, value):
        # pylint:disable=unused-argument
        # Position starts before the first byte; the first __next__ lands on 0.
        self._position = -1

    def __iter__(self):
        return self

    def __next__(self):
        # Advance the position by one and return the byte there (as a
        # length-1 bytes object).  StopIteration past the end; TypeError if
        # the position somehow went negative.
        p = self._position = self._position + 1
        if p >= len(self):
            raise StopIteration
        elif p < 0:
            raise TypeError
        return self[p:p + 1]

    def next(self):
        # Py2 compat
        return self.__next__()

    def previous(self):
        """Step the position back one byte and return the byte that was current."""
        p = self._position
        if p >= len(self):
            raise StopIteration
        elif p < 0:
            raise TypeError
        self._position = p = p - 1
        return self[p:p + 1]

    def setPosition(self, position):
        # Setting is refused (StopIteration) once the position has run off
        # the end of the data.
        if self._position >= len(self):
            raise StopIteration
        self._position = position

    def getPosition(self):
        if self._position >= len(self):
            raise StopIteration
        if self._position >= 0:
            return self._position
        else:
            # Before the first next(): no meaningful position yet.
            return None

    position = property(getPosition, setPosition)

    def getCurrentByte(self):
        # Slicing (rather than indexing) keeps the result a bytes object.
        return self[self.position:self.position + 1]

    currentByte = property(getCurrentByte)

    def skip(self, chars=spaceCharactersBytes):
        """Skip past a list of characters"""
        p = self.position  # use property for the error-checking
        while p < len(self):
            c = self[p:p + 1]
            if c not in chars:
                self._position = p
                return c
            p += 1
        self._position = p
        return None

    def skipUntil(self, chars):
        """Advance until a byte in *chars* is found; return it or None at EOF."""
        p = self.position
        while p < len(self):
            c = self[p:p + 1]
            if c in chars:
                self._position = p
                return c
            p += 1
        self._position = p
        return None

    def matchBytes(self, bytes):
        """Look for a sequence of bytes at the start of a string. If the bytes
        are found return True and advance the position to the byte after the
        match. Otherwise return False and leave the position alone"""
        rv = self.startswith(bytes, self.position)
        if rv:
            self.position += len(bytes)
        return rv

    def jumpTo(self, bytes):
        """Look for the next sequence of bytes matching a given sequence. If
        a match is found advance the position to the last byte of the match"""
        try:
            self._position = self.index(bytes, self.position) + len(bytes) - 1
        except ValueError:
            # No match anywhere ahead: signal end-of-data to the caller.
            raise StopIteration
        return True

674 

675 

class EncodingParser(object):
    """Mini parser for detecting character encoding from meta elements"""

    def __init__(self, data):
        """string - the data to work on for encoding detection"""
        self.data = EncodingBytes(data)
        self.encoding = None

    def getEncoding(self):
        """Run the prescan and return the detected encoding, or None."""
        # Fast path: no <meta at all means nothing to find.
        if b"<meta" not in self.data:
            return None

        # Dispatch table, checked in order; more specific prefixes first.
        methodDispatch = (
            (b"<!--", self.handleComment),
            (b"<meta", self.handleMeta),
            (b"</", self.handlePossibleEndTag),
            (b"<!", self.handleOther),
            (b"<?", self.handleOther),
            (b"<", self.handlePossibleStartTag))
        for _ in self.data:
            keepParsing = True
            try:
                self.data.jumpTo(b"<")
            except StopIteration:
                break
            for key, method in methodDispatch:
                if self.data.matchBytes(key):
                    try:
                        # Handlers return False once an encoding is found
                        # (or on other reasons to stop).
                        keepParsing = method()
                        break
                    except StopIteration:
                        keepParsing = False
                        break
            if not keepParsing:
                break

        return self.encoding

    def handleComment(self):
        """Skip over comments"""
        return self.data.jumpTo(b"-->")

    def handleMeta(self):
        if self.data.currentByte not in spaceCharactersBytes:
            # if we have <meta not followed by a space so just keep going
            return True
        # We have a valid meta element we want to search for attributes
        hasPragma = False
        pendingEncoding = None
        while True:
            # Try to find the next attribute after the current position
            attr = self.getAttribute()
            if attr is None:
                return True
            else:
                if attr[0] == b"http-equiv":
                    hasPragma = attr[1] == b"content-type"
                    if hasPragma and pendingEncoding is not None:
                        # A content attribute seen earlier becomes valid now
                        # that the pragma has been confirmed.
                        self.encoding = pendingEncoding
                        return False
                elif attr[0] == b"charset":
                    tentativeEncoding = attr[1]
                    codec = lookupEncoding(tentativeEncoding)
                    if codec is not None:
                        self.encoding = codec
                        return False
                elif attr[0] == b"content":
                    contentParser = ContentAttrParser(EncodingBytes(attr[1]))
                    tentativeEncoding = contentParser.parse()
                    if tentativeEncoding is not None:
                        codec = lookupEncoding(tentativeEncoding)
                        if codec is not None:
                            if hasPragma:
                                self.encoding = codec
                                return False
                            else:
                                # Remember it in case http-equiv shows up later.
                                pendingEncoding = codec

    def handlePossibleStartTag(self):
        return self.handlePossibleTag(False)

    def handlePossibleEndTag(self):
        # Consume the byte after "</" before inspecting it.
        next(self.data)
        return self.handlePossibleTag(True)

    def handlePossibleTag(self, endTag):
        data = self.data
        if data.currentByte not in asciiLettersBytes:
            # If the next byte is not an ascii letter either ignore this
            # fragment (possible start tag case) or treat it according to
            # handleOther
            if endTag:
                data.previous()
                self.handleOther()
            return True

        c = data.skipUntil(spacesAngleBrackets)
        if c == b"<":
            # return to the first step in the overall "two step" algorithm
            # reprocessing the < byte
            data.previous()
        else:
            # Read all attributes
            attr = self.getAttribute()
            while attr is not None:
                attr = self.getAttribute()
        return True

    def handleOther(self):
        """Skip to the end of a declaration/processing instruction."""
        return self.data.jumpTo(b">")

    def getAttribute(self):
        """Return a name,value pair for the next attribute in the stream,
        if one is found, or None"""
        data = self.data
        # Step 1 (skip chars)
        c = data.skip(spaceCharactersBytes | frozenset([b"/"]))
        assert c is None or len(c) == 1
        # Step 2
        if c in (b">", None):
            return None
        # Step 3
        attrName = []
        attrValue = []
        # Step 4 attribute name
        while True:
            if c == b"=" and attrName:
                break
            elif c in spaceCharactersBytes:
                # Step 6!
                c = data.skip()
                break
            elif c in (b"/", b">"):
                return b"".join(attrName), b""
            elif c in asciiUppercaseBytes:
                # Attribute names are lowercased for comparison.
                attrName.append(c.lower())
            elif c is None:
                return None
            else:
                attrName.append(c)
            # Step 5
            c = next(data)
        # Step 7
        if c != b"=":
            data.previous()
            return b"".join(attrName), b""
        # Step 8
        next(data)
        # Step 9
        c = data.skip()
        # Step 10
        if c in (b"'", b'"'):
            # 10.1
            quoteChar = c
            while True:
                # 10.2
                c = next(data)
                # 10.3
                if c == quoteChar:
                    next(data)
                    return b"".join(attrName), b"".join(attrValue)
                # 10.4
                elif c in asciiUppercaseBytes:
                    attrValue.append(c.lower())
                # 10.5
                else:
                    attrValue.append(c)
        elif c == b">":
            return b"".join(attrName), b""
        elif c in asciiUppercaseBytes:
            attrValue.append(c.lower())
        elif c is None:
            return None
        else:
            attrValue.append(c)
        # Step 11: unquoted value runs until whitespace or an angle bracket.
        while True:
            c = next(data)
            if c in spacesAngleBrackets:
                return b"".join(attrName), b"".join(attrValue)
            elif c in asciiUppercaseBytes:
                attrValue.append(c.lower())
            elif c is None:
                return None
            else:
                attrValue.append(c)

862 

863 

class ContentAttrParser(object):
    """Extract the charset parameter from a meta content attribute value."""

    def __init__(self, data):
        # data is expected to be an EncodingBytes (a bytes subclass), so the
        # isinstance check below still passes.
        assert isinstance(data, bytes)
        self.data = data

    def parse(self):
        """Return the declared charset as bytes, or None if absent/malformed."""
        try:
            # Check if the attr name is charset
            # otherwise return
            self.data.jumpTo(b"charset")
            self.data.position += 1
            self.data.skip()
            if not self.data.currentByte == b"=":
                # If there is no = sign keep looking for attrs
                return None
            self.data.position += 1
            self.data.skip()
            # Look for an encoding between matching quote marks
            if self.data.currentByte in (b'"', b"'"):
                quoteMark = self.data.currentByte
                self.data.position += 1
                oldPosition = self.data.position
                if self.data.jumpTo(quoteMark):
                    return self.data[oldPosition:self.data.position]
                else:
                    return None
            else:
                # Unquoted value
                oldPosition = self.data.position
                try:
                    self.data.skipUntil(spaceCharactersBytes)
                    return self.data[oldPosition:self.data.position]
                except StopIteration:
                    # Return the whole remaining value
                    return self.data[oldPosition:]
        except StopIteration:
            # jumpTo/skip ran off the end of the data: nothing usable found.
            return None

901 

902 

def lookupEncoding(encoding):
    """Return the python codec name corresponding to an encoding or None if the
    string doesn't correspond to a valid encoding."""
    # Labels may arrive as bytes; they must be pure ASCII to be meaningful.
    if isinstance(encoding, bytes):
        try:
            encoding = encoding.decode("ascii")
        except UnicodeDecodeError:
            return None

    if encoding is None:
        return None

    try:
        return webencodings.lookup(encoding)
    except AttributeError:
        # Non-string input makes webencodings choke; treat as unknown.
        return None