# Coverage report header (extraction artifact): bleach/_vendor/html5lib/_inputstream.py,
# 15% of 553 statements, coverage.py v7.2.7, created 2023-07-01 06:54 +0000.
1from __future__ import absolute_import, division, unicode_literals
3from six import text_type
4from six.moves import http_client, urllib
6import codecs
7import re
8from io import BytesIO, StringIO
10import webencodings
12from .constants import EOF, spaceCharacters, asciiLetters, asciiUppercase
13from .constants import _ReparseException
14from . import _utils
# Non-unicode versions of constants for use in the pre-parser
spaceCharactersBytes = frozenset([item.encode("ascii") for item in spaceCharacters])
asciiLettersBytes = frozenset([item.encode("ascii") for item in asciiLetters])
asciiUppercaseBytes = frozenset([item.encode("ascii") for item in asciiUppercase])
spacesAngleBrackets = spaceCharactersBytes | frozenset([b">", b"<"])

# Character-class regex source matching codepoints disallowed in HTML text:
# most C0/C1 controls, DEL, and the Unicode noncharacter code points.
invalid_unicode_no_surrogate = "[\u0001-\u0008\u000B\u000E-\u001F\u007F-\u009F\uFDD0-\uFDEF\uFFFE\uFFFF\U0001FFFE\U0001FFFF\U0002FFFE\U0002FFFF\U0003FFFE\U0003FFFF\U0004FFFE\U0004FFFF\U0005FFFE\U0005FFFF\U0006FFFE\U0006FFFF\U0007FFFE\U0007FFFF\U0008FFFE\U0008FFFF\U0009FFFE\U0009FFFF\U000AFFFE\U000AFFFF\U000BFFFE\U000BFFFF\U000CFFFE\U000CFFFF\U000DFFFE\U000DFFFF\U000EFFFE\U000EFFFF\U000FFFFE\U000FFFFF\U0010FFFE\U0010FFFF]"  # noqa

if _utils.supports_lone_surrogates:
    # Use one extra step of indirection and create surrogates with
    # eval. Not using this indirection would introduce an illegal
    # unicode literal on platforms not supporting such lone
    # surrogates.
    assert invalid_unicode_no_surrogate[-1] == "]" and invalid_unicode_no_surrogate.count("]") == 1
    invalid_unicode_re = re.compile(invalid_unicode_no_surrogate[:-1] +
                                    eval('"\\uD800-\\uDFFF"') +  # pylint:disable=eval-used
                                    "]")
else:
    invalid_unicode_re = re.compile(invalid_unicode_no_surrogate)

# Non-BMP noncharacter code points (the xFFFE/xFFFF pair of each astral
# plane); used by the UCS-2 error-reporting path, which must decode
# surrogate pairs itself before it can recognise these values.
non_bmp_invalid_codepoints = {0x1FFFE, 0x1FFFF, 0x2FFFE, 0x2FFFF, 0x3FFFE,
                              0x3FFFF, 0x4FFFE, 0x4FFFF, 0x5FFFE, 0x5FFFF,
                              0x6FFFE, 0x6FFFF, 0x7FFFE, 0x7FFFF, 0x8FFFE,
                              0x8FFFF, 0x9FFFE, 0x9FFFF, 0xAFFFE, 0xAFFFF,
                              0xBFFFE, 0xBFFFF, 0xCFFFE, 0xCFFFF, 0xDFFFE,
                              0xDFFFF, 0xEFFFE, 0xEFFFF, 0xFFFFE, 0xFFFFF,
                              0x10FFFE, 0x10FFFF}

ascii_punctuation_re = re.compile("[\u0009-\u000D\u0020-\u002F\u003A-\u0040\u005C\u005B-\u0060\u007B-\u007E]")

# Cache for charsUntil()
charsUntilRegEx = {}
class BufferedStream(object):
    """Buffering for streams that do not have buffering of their own

    The buffer is implemented as a list of chunks on the assumption that
    joining many strings will be slow since it is O(n**2)
    """

    def __init__(self, stream):
        self.stream = stream
        self.buffer = []
        # [index of the current chunk, byte offset within that chunk]
        self.position = [-1, 0]

    def tell(self):
        """Return the absolute stream offset of the current position."""
        chunk_index, offset = self.position
        return sum(len(chunk) for chunk in self.buffer[:chunk_index]) + offset

    def seek(self, pos):
        """Reposition within already-buffered data (cannot seek forward
        past what has been read)."""
        assert pos <= self._bufferedBytes()
        remaining = pos
        index = 0
        # Walk chunks until the target offset falls inside one of them.
        while len(self.buffer[index]) < remaining:
            remaining -= len(self.buffer[index])
            index += 1
        self.position = [index, remaining]

    def read(self, bytes):
        """Read up to *bytes* bytes, serving from the buffer first and
        falling back to the wrapped stream."""
        if not self.buffer:
            return self._readStream(bytes)
        at_buffer_end = (self.position[0] == len(self.buffer) and
                         self.position[1] == len(self.buffer[-1]))
        if at_buffer_end:
            return self._readStream(bytes)
        return self._readFromBuffer(bytes)

    def _bufferedBytes(self):
        # Total number of bytes held in all buffered chunks.
        return sum(len(chunk) for chunk in self.buffer)

    def _readStream(self, bytes):
        # Pull fresh data from the wrapped stream and remember it so a
        # later seek() can replay it.
        data = self.stream.read(bytes)
        self.buffer.append(data)
        self.position[0] += 1
        self.position[1] = len(data)
        return data

    def _readFromBuffer(self, bytes):
        # Satisfy the request from buffered chunks; any shortfall is
        # topped up from the underlying stream.
        remaining = bytes
        pieces = []
        index, offset = self.position
        while index < len(self.buffer) and remaining != 0:
            assert remaining > 0
            chunk = self.buffer[index]
            available = len(chunk) - offset
            if remaining <= available:
                take = remaining
                self.position = [index, offset + take]
            else:
                take = available
                self.position = [index, len(chunk)]
                index += 1
            pieces.append(chunk[offset:offset + take])
            remaining -= take
            # Every chunk after the first is consumed from its start.
            offset = 0

        if remaining:
            pieces.append(self._readStream(remaining))

        return b"".join(pieces)
def HTMLInputStream(source, **kwargs):
    """Return the appropriate input-stream wrapper for *source*.

    Text sources get an HTMLUnicodeInputStream; byte sources get an
    HTMLBinaryInputStream (which performs encoding detection).
    """
    # Work around Python bug #20007: read(0) closes the connection.
    # http://bugs.python.org/issue20007
    is_http_response = isinstance(source, http_client.HTTPResponse)
    is_wrapped_http = (isinstance(source, urllib.response.addbase) and
                       isinstance(source.fp, http_client.HTTPResponse))
    if is_http_response or is_wrapped_http:
        # HTTP responses are always byte streams; probing with read(0)
        # would trigger the bug above.
        is_unicode = False
    elif hasattr(source, "read"):
        # Probe the file-like object: does it yield text or bytes?
        is_unicode = isinstance(source.read(0), text_type)
    else:
        is_unicode = isinstance(source, text_type)

    if not is_unicode:
        return HTMLBinaryInputStream(source, **kwargs)

    # Encoding-related keyword arguments make no sense for already-decoded
    # input; reject them explicitly.
    encoding_kwargs = [name for name in kwargs if name.endswith("_encoding")]
    if encoding_kwargs:
        raise TypeError("Cannot set an encoding with a unicode input, set %r" % encoding_kwargs)
    return HTMLUnicodeInputStream(source, **kwargs)
class HTMLUnicodeInputStream(object):
    """Provides a unicode stream of characters to the HTMLTokenizer.

    This class takes care of character encoding and removing or replacing
    incorrect byte-sequences and also provides column and line tracking.

    """

    # Number of characters requested from the data stream per readChunk().
    _defaultChunkSize = 10240

    def __init__(self, source):
        """Initialises the HTMLInputStream.

        HTMLInputStream(source, [encoding]) -> Normalized stream from source
        for use by html5lib.

        source can be either a file-object, local filename or a string.

        The optional encoding parameter must be a string that indicates
        the encoding. If specified, that encoding will be used,
        regardless of any BOM or later declaration (such as in a meta
        element)

        """
        if not _utils.supports_lone_surrogates:
            # Such platforms will have already checked for such
            # surrogate errors, so no need to do this checking.
            self.reportCharacterErrors = None
        elif len("\U0010FFFF") == 1:
            # Wide build: astral characters are single code points, so a
            # plain regex scan finds every invalid character.
            self.reportCharacterErrors = self.characterErrorsUCS4
        else:
            # Narrow build: astral characters arrive as surrogate pairs
            # and need pair-aware checking.
            self.reportCharacterErrors = self.characterErrorsUCS2

        # List of where new lines occur
        self.newLines = [0]

        self.charEncoding = (lookupEncoding("utf-8"), "certain")
        self.dataStream = self.openStream(source)

        self.reset()

    def reset(self):
        # Current decoded chunk and the read cursor within it.
        self.chunk = ""
        self.chunkSize = 0
        self.chunkOffset = 0
        # Accumulated "invalid-codepoint" parse errors.
        self.errors = []

        # number of (complete) lines in previous chunks
        self.prevNumLines = 0
        # number of columns in the last line of the previous chunk
        self.prevNumCols = 0

        # Deal with CR LF and surrogates split over chunk boundaries
        self._bufferedCharacter = None

    def openStream(self, source):
        """Produces a file object from source.

        source can be either a file object, local filename or a string.

        """
        # Already a file object
        if hasattr(source, 'read'):
            stream = source
        else:
            stream = StringIO(source)

        return stream

    def _position(self, offset):
        """Return the 0-based (lines, columns) position of *offset* within
        the current chunk, counted from the start of the stream."""
        chunk = self.chunk
        nLines = chunk.count('\n', 0, offset)
        positionLine = self.prevNumLines + nLines
        lastLinePos = chunk.rfind('\n', 0, offset)
        if lastLinePos == -1:
            # No newline before offset in this chunk: we are continuing the
            # last line of the previous chunk.
            positionColumn = self.prevNumCols + offset
        else:
            positionColumn = offset - (lastLinePos + 1)
        return (positionLine, positionColumn)

    def position(self):
        """Returns (line, col) of the current position in the stream."""
        line, col = self._position(self.chunkOffset)
        # Lines are reported 1-based; columns remain 0-based.
        return (line + 1, col)

    def char(self):
        """ Read one character from the stream or queue if available. Return
        EOF when EOF is reached.
        """
        # Read a new chunk from the input stream if necessary
        if self.chunkOffset >= self.chunkSize:
            if not self.readChunk():
                return EOF

        chunkOffset = self.chunkOffset
        char = self.chunk[chunkOffset]
        self.chunkOffset = chunkOffset + 1

        return char

    def readChunk(self, chunkSize=None):
        """Replace the current chunk with the next one from the stream.

        Returns False at end of stream, True otherwise."""
        if chunkSize is None:
            chunkSize = self._defaultChunkSize

        # Fold the outgoing chunk's line/column totals into the running
        # counts so _position() stays correct across chunk boundaries.
        self.prevNumLines, self.prevNumCols = self._position(self.chunkSize)

        self.chunk = ""
        self.chunkSize = 0
        self.chunkOffset = 0

        data = self.dataStream.read(chunkSize)

        # Deal with CR LF and surrogates broken across chunks
        if self._bufferedCharacter:
            data = self._bufferedCharacter + data
            self._bufferedCharacter = None
        elif not data:
            # We have no more data, bye-bye stream
            return False

        if len(data) > 1:
            lastv = ord(data[-1])
            # Hold back a trailing CR (possible first half of CR LF) or a
            # lead surrogate (possible first half of a pair) so it can be
            # joined with the start of the next chunk.
            if lastv == 0x0D or 0xD800 <= lastv <= 0xDBFF:
                self._bufferedCharacter = data[-1]
                data = data[:-1]

        if self.reportCharacterErrors:
            self.reportCharacterErrors(data)

        # Replace invalid characters
        data = data.replace("\r\n", "\n")
        data = data.replace("\r", "\n")

        self.chunk = data
        self.chunkSize = len(data)

        return True

    def characterErrorsUCS4(self, data):
        # One "invalid-codepoint" error per forbidden character found.
        for _ in range(len(invalid_unicode_re.findall(data))):
            self.errors.append("invalid-codepoint")

    def characterErrorsUCS2(self, data):
        # Someone picked the wrong compile option
        # You lose
        skip = False
        for match in invalid_unicode_re.finditer(data):
            if skip:
                # NOTE(review): skip is never reset to False on this path,
                # so once a surrogate pair is seen every later match in this
                # chunk is ignored — looks like an upstream quirk; confirm
                # against html5lib before changing.
                continue
            codepoint = ord(match.group())
            pos = match.start()
            # Pretty sure there should be endianness issues here
            if _utils.isSurrogatePair(data[pos:pos + 2]):
                # We have a surrogate pair!
                char_val = _utils.surrogatePairToCodepoint(data[pos:pos + 2])
                if char_val in non_bmp_invalid_codepoints:
                    self.errors.append("invalid-codepoint")
                skip = True
            elif (codepoint >= 0xD800 and codepoint <= 0xDFFF and
                  pos == len(data) - 1):
                # Lone surrogate at the very end of the chunk.
                self.errors.append("invalid-codepoint")
            else:
                skip = False
                self.errors.append("invalid-codepoint")

    def charsUntil(self, characters, opposite=False):
        """ Returns a string of characters from the stream up to but not
        including any character in 'characters' or EOF. 'characters' must be
        a container that supports the 'in' method and iteration over its
        characters.
        """

        # Use a cache of regexps to find the required characters
        try:
            chars = charsUntilRegEx[(characters, opposite)]
        except KeyError:
            if __debug__:
                for c in characters:
                    assert(ord(c) < 128)
            regex = "".join(["\\x%02x" % ord(c) for c in characters])
            if not opposite:
                # Match runs of characters NOT in the set (negated class).
                regex = "^%s" % regex
            chars = charsUntilRegEx[(characters, opposite)] = re.compile("[%s]+" % regex)

        rv = []

        while True:
            # Find the longest matching prefix
            m = chars.match(self.chunk, self.chunkOffset)
            if m is None:
                # If nothing matched, and it wasn't because we ran out of chunk,
                # then stop
                if self.chunkOffset != self.chunkSize:
                    break
            else:
                end = m.end()
                # If not the whole chunk matched, return everything
                # up to the part that didn't match
                if end != self.chunkSize:
                    rv.append(self.chunk[self.chunkOffset:end])
                    self.chunkOffset = end
                    break
            # If the whole remainder of the chunk matched,
            # use it all and read the next chunk
            rv.append(self.chunk[self.chunkOffset:])
            if not self.readChunk():
                # Reached EOF
                break

        r = "".join(rv)
        return r

    def unget(self, char):
        # Only one character is allowed to be ungotten at once - it must
        # be consumed again before any further call to unget
        if char is not EOF:
            if self.chunkOffset == 0:
                # unget is called quite rarely, so it's a good idea to do
                # more work here if it saves a bit of work in the frequently
                # called char and charsUntil.
                # So, just prepend the ungotten character onto the current
                # chunk:
                self.chunk = char + self.chunk
                self.chunkSize += 1
            else:
                self.chunkOffset -= 1
                assert self.chunk[self.chunkOffset] == char
class HTMLBinaryInputStream(HTMLUnicodeInputStream):
    """Provides a unicode stream of characters to the HTMLTokenizer.

    This class takes care of character encoding and removing or replacing
    incorrect byte-sequences and also provides column and line tracking.

    """

    def __init__(self, source, override_encoding=None, transport_encoding=None,
                 same_origin_parent_encoding=None, likely_encoding=None,
                 default_encoding="windows-1252", useChardet=True):
        """Initialises the HTMLInputStream.

        HTMLInputStream(source, [encoding]) -> Normalized stream from source
        for use by html5lib.

        source can be either a file-object, local filename or a string.

        The optional encoding parameter must be a string that indicates
        the encoding. If specified, that encoding will be used,
        regardless of any BOM or later declaration (such as in a meta
        element)

        """
        # Raw Stream - for unicode objects this will encode to utf-8 and set
        # self.charEncoding as appropriate
        self.rawStream = self.openStream(source)

        HTMLUnicodeInputStream.__init__(self, self.rawStream)

        # Encoding Information
        # Number of bytes to use when looking for a meta element with
        # encoding information
        self.numBytesMeta = 1024
        # Number of bytes to use when using detecting encoding using chardet
        self.numBytesChardet = 100
        # Things from args
        self.override_encoding = override_encoding
        self.transport_encoding = transport_encoding
        self.same_origin_parent_encoding = same_origin_parent_encoding
        self.likely_encoding = likely_encoding
        self.default_encoding = default_encoding

        # Determine encoding
        self.charEncoding = self.determineEncoding(useChardet)
        assert self.charEncoding[0] is not None

        # Call superclass
        self.reset()

    def reset(self):
        # Wrap the raw byte stream in a decoding reader for the detected
        # encoding; undecodable byte sequences are replaced ('replace').
        self.dataStream = self.charEncoding[0].codec_info.streamreader(self.rawStream, 'replace')
        HTMLUnicodeInputStream.reset(self)

    def openStream(self, source):
        """Produces a file object from source.

        source can be either a file object, local filename or a string.

        """
        # Already a file object
        if hasattr(source, 'read'):
            stream = source
        else:
            stream = BytesIO(source)

        try:
            # Encoding detection rewinds the stream, so it must be
            # seekable; wrap it in BufferedStream if it is not.
            stream.seek(stream.tell())
        except Exception:
            stream = BufferedStream(stream)

        return stream

    def determineEncoding(self, chardet=True):
        """Return an (encoding, confidence) pair, trying in order: BOM,
        override, transport, meta prescan, parent document, "likely"
        hint, chardet, and finally the defaults."""
        # BOMs take precedence over everything
        # This will also read past the BOM if present
        charEncoding = self.detectBOM(), "certain"
        if charEncoding[0] is not None:
            return charEncoding

        # If we've been overridden, we've been overridden
        charEncoding = lookupEncoding(self.override_encoding), "certain"
        if charEncoding[0] is not None:
            return charEncoding

        # Now check the transport layer
        charEncoding = lookupEncoding(self.transport_encoding), "certain"
        if charEncoding[0] is not None:
            return charEncoding

        # Look for meta elements with encoding information
        charEncoding = self.detectEncodingMeta(), "tentative"
        if charEncoding[0] is not None:
            return charEncoding

        # Parent document encoding
        charEncoding = lookupEncoding(self.same_origin_parent_encoding), "tentative"
        if charEncoding[0] is not None and not charEncoding[0].name.startswith("utf-16"):
            return charEncoding

        # "likely" encoding
        charEncoding = lookupEncoding(self.likely_encoding), "tentative"
        if charEncoding[0] is not None:
            return charEncoding

        # Guess with chardet, if available
        if chardet:
            try:
                from chardet.universaldetector import UniversalDetector
            except ImportError:
                pass
            else:
                buffers = []
                detector = UniversalDetector()
                while not detector.done:
                    buffer = self.rawStream.read(self.numBytesChardet)
                    assert isinstance(buffer, bytes)
                    if not buffer:
                        break
                    buffers.append(buffer)
                    detector.feed(buffer)
                detector.close()
                encoding = lookupEncoding(detector.result['encoding'])
                # Rewind: the tokenizer must see the bytes chardet consumed.
                self.rawStream.seek(0)
                if encoding is not None:
                    return encoding, "tentative"

        # Try the default encoding
        charEncoding = lookupEncoding(self.default_encoding), "tentative"
        if charEncoding[0] is not None:
            return charEncoding

        # Fallback to html5lib's default if even that hasn't worked
        return lookupEncoding("windows-1252"), "tentative"

    def changeEncoding(self, newEncoding):
        """Switch to *newEncoding* mid-parse (e.g. after a late meta tag),
        raising _ReparseException when the document must be re-parsed
        from the start with the new encoding."""
        assert self.charEncoding[1] != "certain"
        newEncoding = lookupEncoding(newEncoding)
        if newEncoding is None:
            return
        if newEncoding.name in ("utf-16be", "utf-16le"):
            # A mid-parse switch to UTF-16 is replaced with UTF-8.
            newEncoding = lookupEncoding("utf-8")
            assert newEncoding is not None
        elif newEncoding == self.charEncoding[0]:
            self.charEncoding = (self.charEncoding[0], "certain")
        else:
            self.rawStream.seek(0)
            self.charEncoding = (newEncoding, "certain")
            self.reset()
            # NOTE(review): charEncoding was just replaced above, so both
            # placeholders render the *new* encoding — matches upstream;
            # confirm before changing the message.
            raise _ReparseException("Encoding changed from %s to %s" % (self.charEncoding[0], newEncoding))

    def detectBOM(self):
        """Attempts to detect at BOM at the start of the stream. If
        an encoding can be determined from the BOM return the name of the
        encoding otherwise return None"""
        bomDict = {
            codecs.BOM_UTF8: 'utf-8',
            codecs.BOM_UTF16_LE: 'utf-16le', codecs.BOM_UTF16_BE: 'utf-16be',
            codecs.BOM_UTF32_LE: 'utf-32le', codecs.BOM_UTF32_BE: 'utf-32be'
        }

        # Go to beginning of file and read in 4 bytes
        string = self.rawStream.read(4)
        assert isinstance(string, bytes)

        # Try detecting the BOM using bytes from the string
        encoding = bomDict.get(string[:3])  # UTF-8
        seek = 3
        if not encoding:
            # Need to detect UTF-32 before UTF-16
            encoding = bomDict.get(string)  # UTF-32
            seek = 4
            if not encoding:
                encoding = bomDict.get(string[:2])  # UTF-16
                seek = 2

        # Set the read position past the BOM if one was found, otherwise
        # set it to the start of the stream
        if encoding:
            self.rawStream.seek(seek)
            return lookupEncoding(encoding)
        else:
            self.rawStream.seek(0)
            return None

    def detectEncodingMeta(self):
        """Report the encoding declared by the meta element
        """
        # Prescan only the first numBytesMeta bytes, then rewind so the
        # real tokenizer starts from the beginning.
        buffer = self.rawStream.read(self.numBytesMeta)
        assert isinstance(buffer, bytes)
        parser = EncodingParser(buffer)
        self.rawStream.seek(0)
        encoding = parser.getEncoding()

        if encoding is not None and encoding.name in ("utf-16be", "utf-16le"):
            # A meta-declared UTF-16 is mapped to UTF-8.
            encoding = lookupEncoding("utf-8")

        return encoding
class EncodingBytes(bytes):
    """String-like object with an associated position and various extra methods
    If the position is ever greater than the string length then an exception is
    raised"""

    def __new__(self, value):
        # Lower-case the data up front so all later matching is
        # case-insensitive.
        assert isinstance(value, bytes)
        return bytes.__new__(self, value.lower())

    def __init__(self, value):
        # pylint:disable=unused-argument
        # Position starts before the first byte; the first next() call
        # advances to index 0.
        self._position = -1

    def __iter__(self):
        return self

    def __next__(self):
        p = self._position = self._position + 1
        if p >= len(self):
            raise StopIteration
        elif p < 0:
            raise TypeError
        # Slice (not index) so a bytes object, not an int, is returned.
        return self[p:p + 1]

    def next(self):
        # Py2 compat
        return self.__next__()

    def previous(self):
        p = self._position
        if p >= len(self):
            raise StopIteration
        elif p < 0:
            raise TypeError
        self._position = p = p - 1
        return self[p:p + 1]

    def setPosition(self, position):
        # Setting past-the-end raises, preserving the "ran off the end"
        # signal for callers that catch StopIteration.
        if self._position >= len(self):
            raise StopIteration
        self._position = position

    def getPosition(self):
        if self._position >= len(self):
            raise StopIteration
        if self._position >= 0:
            return self._position
        else:
            # Before the first byte (initial state).
            return None

    position = property(getPosition, setPosition)

    def getCurrentByte(self):
        return self[self.position:self.position + 1]

    currentByte = property(getCurrentByte)

    def skip(self, chars=spaceCharactersBytes):
        """Skip past a list of characters"""
        p = self.position  # use property for the error-checking
        while p < len(self):
            c = self[p:p + 1]
            if c not in chars:
                self._position = p
                return c
            p += 1
        self._position = p
        return None

    def skipUntil(self, chars):
        """Advance until a byte in *chars* is found; return it, or None
        at end of data."""
        p = self.position
        while p < len(self):
            c = self[p:p + 1]
            if c in chars:
                self._position = p
                return c
            p += 1
        self._position = p
        return None

    def matchBytes(self, bytes):
        """Look for a sequence of bytes at the start of a string. If the bytes
        are found return True and advance the position to the byte after the
        match. Otherwise return False and leave the position alone"""
        rv = self.startswith(bytes, self.position)
        if rv:
            self.position += len(bytes)
        return rv

    def jumpTo(self, bytes):
        """Look for the next sequence of bytes matching a given sequence. If
        a match is found advance the position to the last byte of the match"""
        try:
            self._position = self.index(bytes, self.position) + len(bytes) - 1
        except ValueError:
            # Not found: signal end-of-data to iteration-style callers.
            raise StopIteration
        return True
class EncodingParser(object):
    """Mini parser for detecting character encoding from meta elements"""

    def __init__(self, data):
        """string - the data to work on for encoding detection"""
        self.data = EncodingBytes(data)
        self.encoding = None

    def getEncoding(self):
        """Scan the buffer for a meta-declared encoding; return the codec
        found (set by handleMeta), or None."""
        if b"<meta" not in self.data:
            # Fast path: no meta tag anywhere, nothing to find.
            return None

        # Handlers tried in order at each "<"; the first matching prefix
        # wins. A handler returning False (or raising StopIteration)
        # terminates the scan.
        methodDispatch = (
            (b"<!--", self.handleComment),
            (b"<meta", self.handleMeta),
            (b"</", self.handlePossibleEndTag),
            (b"<!", self.handleOther),
            (b"<?", self.handleOther),
            (b"<", self.handlePossibleStartTag))
        for _ in self.data:
            keepParsing = True
            try:
                self.data.jumpTo(b"<")
            except StopIteration:
                break
            for key, method in methodDispatch:
                if self.data.matchBytes(key):
                    try:
                        keepParsing = method()
                        break
                    except StopIteration:
                        keepParsing = False
                        break
            if not keepParsing:
                break

        return self.encoding

    def handleComment(self):
        """Skip over comments"""
        return self.data.jumpTo(b"-->")

    def handleMeta(self):
        if self.data.currentByte not in spaceCharactersBytes:
            # if we have <meta not followed by a space so just keep going
            return True
        # We have a valid meta element we want to search for attributes
        hasPragma = False
        pendingEncoding = None
        while True:
            # Try to find the next attribute after the current position
            attr = self.getAttribute()
            if attr is None:
                return True
            else:
                if attr[0] == b"http-equiv":
                    hasPragma = attr[1] == b"content-type"
                    if hasPragma and pendingEncoding is not None:
                        # A content attribute seen earlier in this element
                        # supplied the encoding; the pragma makes it count.
                        self.encoding = pendingEncoding
                        return False
                elif attr[0] == b"charset":
                    tentativeEncoding = attr[1]
                    codec = lookupEncoding(tentativeEncoding)
                    if codec is not None:
                        self.encoding = codec
                        return False
                elif attr[0] == b"content":
                    contentParser = ContentAttrParser(EncodingBytes(attr[1]))
                    tentativeEncoding = contentParser.parse()
                    if tentativeEncoding is not None:
                        codec = lookupEncoding(tentativeEncoding)
                        if codec is not None:
                            if hasPragma:
                                self.encoding = codec
                                return False
                            else:
                                # Remember it in case an http-equiv pragma
                                # appears later in this element.
                                pendingEncoding = codec

    def handlePossibleStartTag(self):
        return self.handlePossibleTag(False)

    def handlePossibleEndTag(self):
        # Consume the "/" that follows "<".
        next(self.data)
        return self.handlePossibleTag(True)

    def handlePossibleTag(self, endTag):
        data = self.data
        if data.currentByte not in asciiLettersBytes:
            # If the next byte is not an ascii letter either ignore this
            # fragment (possible start tag case) or treat it according to
            # handleOther
            if endTag:
                data.previous()
                self.handleOther()
            return True

        c = data.skipUntil(spacesAngleBrackets)
        if c == b"<":
            # return to the first step in the overall "two step" algorithm
            # reprocessing the < byte
            data.previous()
        else:
            # Read all attributes
            attr = self.getAttribute()
            while attr is not None:
                attr = self.getAttribute()
        return True

    def handleOther(self):
        return self.data.jumpTo(b">")

    def getAttribute(self):
        """Return a name,value pair for the next attribute in the stream,
        if one is found, or None"""
        data = self.data
        # Step 1 (skip chars)
        c = data.skip(spaceCharactersBytes | frozenset([b"/"]))
        assert c is None or len(c) == 1
        # Step 2
        if c in (b">", None):
            return None
        # Step 3
        attrName = []
        attrValue = []
        # Step 4 attribute name
        while True:
            if c == b"=" and attrName:
                break
            elif c in spaceCharactersBytes:
                # Step 6!
                c = data.skip()
                break
            elif c in (b"/", b">"):
                # Attribute with no value.
                return b"".join(attrName), b""
            elif c in asciiUppercaseBytes:
                # Names are accumulated lower-cased.
                attrName.append(c.lower())
            elif c is None:
                return None
            else:
                attrName.append(c)
            # Step 5
            c = next(data)
        # Step 7
        if c != b"=":
            data.previous()
            return b"".join(attrName), b""
        # Step 8
        next(data)
        # Step 9
        c = data.skip()
        # Step 10
        if c in (b"'", b'"'):
            # 10.1
            quoteChar = c
            while True:
                # 10.2
                c = next(data)
                # 10.3
                if c == quoteChar:
                    next(data)
                    return b"".join(attrName), b"".join(attrValue)
                # 10.4
                elif c in asciiUppercaseBytes:
                    attrValue.append(c.lower())
                # 10.5
                else:
                    attrValue.append(c)
        elif c == b">":
            return b"".join(attrName), b""
        elif c in asciiUppercaseBytes:
            attrValue.append(c.lower())
        elif c is None:
            return None
        else:
            attrValue.append(c)
        # Step 11
        while True:
            c = next(data)
            if c in spacesAngleBrackets:
                return b"".join(attrName), b"".join(attrValue)
            elif c in asciiUppercaseBytes:
                attrValue.append(c.lower())
            elif c is None:
                return None
            else:
                attrValue.append(c)
class ContentAttrParser(object):
    """Extracts the charset from the value of a meta "content" attribute,
    e.g. b"text/html; charset=utf-8" -> b"utf-8"."""

    def __init__(self, data):
        # Callers pass an EncodingBytes (a bytes subclass), which also
        # provides the position-tracking methods parse() relies on.
        assert isinstance(data, bytes)
        self.data = data

    def parse(self):
        """Return the declared charset as bytes, or None if no usable
        "charset=..." declaration is present."""
        try:
            # Check if the attr name is charset
            # otherwise return
            self.data.jumpTo(b"charset")
            self.data.position += 1
            self.data.skip()
            if not self.data.currentByte == b"=":
                # If there is no = sign keep looking for attrs
                return None
            self.data.position += 1
            self.data.skip()
            # Look for an encoding between matching quote marks
            if self.data.currentByte in (b'"', b"'"):
                quoteMark = self.data.currentByte
                self.data.position += 1
                oldPosition = self.data.position
                if self.data.jumpTo(quoteMark):
                    return self.data[oldPosition:self.data.position]
                else:
                    return None
            else:
                # Unquoted value
                oldPosition = self.data.position
                try:
                    self.data.skipUntil(spaceCharactersBytes)
                    return self.data[oldPosition:self.data.position]
                except StopIteration:
                    # Return the whole remaining value
                    return self.data[oldPosition:]
        except StopIteration:
            # Ran off the end of the data at any point: no charset found.
            return None
def lookupEncoding(encoding):
    """Return the python codec name corresponding to an encoding or None if the
    string doesn't correspond to a valid encoding."""
    if isinstance(encoding, bytes):
        try:
            encoding = encoding.decode("ascii")
        except UnicodeDecodeError:
            # Valid encoding labels are ASCII-only; anything else cannot
            # name an encoding.
            return None

    if encoding is None:
        return None

    try:
        return webencodings.lookup(encoding)
    except AttributeError:
        return None