Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/yaml/reader.py: 14%

122 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2023-12-08 06:51 +0000

1# This module contains abstractions for the input stream. You don't have to 

2# look further, there is no pretty code. 

3# 

4# We define two classes here. 

5# 

6# Mark(source, line, column) 

7# It's just a record and its only use is producing nice error messages. 

8# Parser does not use it for any other purposes. 

9# 

10# Reader(source, data) 

11# Reader determines the encoding of `data` and converts it to unicode. 

12# Reader provides the following methods and attributes: 

13# reader.peek(index=0) - return the character `index` positions past the current one 

14# reader.forward(length=1) - move the current position forward by `length` characters. 

15# reader.index - the number of the current character. 

16# reader.line, reader.column - the line and the column of the current character. 

17 

18__all__ = ['Reader', 'ReaderError'] 

19 

20from .error import YAMLError, Mark 

21 

22import codecs, re 

23 

class ReaderError(YAMLError):
    """Error raised when input cannot be decoded or holds a disallowed character.

    Attributes:
        name: label of the input source, used in the message.
        position: offset of the offending byte or character in the input.
        character: a ``bytes`` value for a decoding failure; otherwise an
            integer code point (it is formatted with ``%04x``).
        encoding: name of the codec or encoding involved.
        reason: short human-readable explanation.
    """

    def __init__(self, name, position, character, encoding, reason):
        # Hand the arguments to the base initializer so that ``args`` is
        # populated. With an empty ``args`` the exception cannot be rebuilt
        # by pickle/copy, because rebuilding calls __init__ with ``args``.
        # NOTE(review): assumes YAMLError keeps Exception's constructor.
        super().__init__(name, position, character, encoding, reason)
        self.name = name
        self.character = character
        self.position = position
        self.encoding = encoding
        self.reason = reason

    def __str__(self):
        """Return the message; its wording depends on the type of `character`."""
        if isinstance(self.character, bytes):
            # A raw byte that the codec rejected.
            return "'%s' codec can't decode byte #x%02x: %s\n" \
                    " in \"%s\", position %d" \
                    % (self.encoding, ord(self.character), self.reason,
                            self.name, self.position)
        else:
            # A decoded character (given as an integer code point).
            return "unacceptable character #x%04x: %s\n" \
                    " in \"%s\", position %d" \
                    % (self.character, self.reason,
                            self.name, self.position)

44 

class Reader(object):
    """Decode an input source to text and expose it through a moving cursor.

    Reader:
     - determines the data encoding and converts it to a unicode string,
     - checks if characters are in allowed range,
     - appends a NUL character to the end.

    Reader accepts
     - a `bytes` object,
     - a `str` object,
     - a file-like object with its `read` method returning `bytes`,
     - a file-like object with its `read` method returning `str`.
    """

    # Yeah, it's ugly and slow.

    def __init__(self, stream):
        """Set up the cursor state and load the first data from `stream`.

        Raises:
            ReaderError: if the data cannot be decoded or contains a
                character rejected by `check_printable`.
        """
        self.name = None
        self.stream = None          # file-like source, or None for str/bytes
        self.stream_pointer = 0     # amount of data read from the stream so far
        self.eof = True
        self.buffer = ''            # decoded text not yet discarded
        self.pointer = 0            # cursor position inside `buffer`
        self.raw_buffer = None      # data read but not yet decoded/checked
        self.raw_decode = None      # codec decode function; None for text input
        self.encoding = None
        self.index = 0              # characters consumed since the start
        self.line = 0
        self.column = 0
        if isinstance(stream, str):
            self.name = "<unicode string>"
            self.check_printable(stream)
            self.buffer = stream+'\0'
        elif isinstance(stream, bytes):
            self.name = "<byte string>"
            self.raw_buffer = stream
            self.determine_encoding()
        else:
            self.stream = stream
            self.name = getattr(stream, 'name', "<file>")
            self.eof = False
            self.raw_buffer = None
            self.determine_encoding()

    def peek(self, index=0):
        """Return the character `index` positions past the cursor."""
        try:
            return self.buffer[self.pointer+index]
        except IndexError:
            # Not buffered yet: pull in more data and retry once.
            self.update(index+1)
            return self.buffer[self.pointer+index]

    def prefix(self, length=1):
        """Return up to `length` characters starting at the cursor."""
        if self.pointer+length >= len(self.buffer):
            self.update(length)
        return self.buffer[self.pointer:self.pointer+length]

    def forward(self, length=1):
        """Advance the cursor by `length` characters, tracking line and column."""
        # One extra character is requested so that the look-ahead after a
        # '\r' below can read `self.buffer[self.pointer]`.
        if self.pointer+length+1 >= len(self.buffer):
            self.update(length+1)
        while length:
            ch = self.buffer[self.pointer]
            self.pointer += 1
            self.index += 1
            # A '\r' directly followed by '\n' is not counted here; the
            # '\n' that follows produces the line break.
            if ch in '\n\x85\u2028\u2029'  \
                    or (ch == '\r' and self.buffer[self.pointer] != '\n'):
                self.line += 1
                self.column = 0
            elif ch != '\uFEFF':
                # U+FEFF does not advance the column.
                self.column += 1
            length -= 1

    def get_mark(self):
        """Return a Mark for the current position.

        The buffer and pointer are attached only when the input is not a
        file-like stream.
        """
        if self.stream is None:
            return Mark(self.name, self.index, self.line, self.column,
                    self.buffer, self.pointer)
        else:
            return Mark(self.name, self.index, self.line, self.column,
                    None, None)

    def determine_encoding(self):
        """Choose the decoder from a leading UTF-16 BOM, defaulting to UTF-8.

        Text input (a stream whose `read` returns `str`) gets no decoder.
        """
        # Two bytes are enough to recognise either UTF-16 byte order mark.
        while not self.eof and (self.raw_buffer is None or len(self.raw_buffer) < 2):
            self.update_raw()
        if isinstance(self.raw_buffer, bytes):
            if self.raw_buffer.startswith(codecs.BOM_UTF16_LE):
                self.raw_decode = codecs.utf_16_le_decode
                self.encoding = 'utf-16-le'
            elif self.raw_buffer.startswith(codecs.BOM_UTF16_BE):
                self.raw_decode = codecs.utf_16_be_decode
                self.encoding = 'utf-16-be'
            else:
                self.raw_decode = codecs.utf_8_decode
                self.encoding = 'utf-8'
        self.update(1)

    NON_PRINTABLE = re.compile('[^\x09\x0A\x0D\x20-\x7E\x85\xA0-\uD7FF\uE000-\uFFFD\U00010000-\U0010ffff]')
    def check_printable(self, data):
        """Raise ReaderError if `data` holds a character matched by NON_PRINTABLE."""
        match = self.NON_PRINTABLE.search(data)
        if match:
            character = match.group()
            # `data` is about to be appended after the unread part of the buffer.
            position = self.index+(len(self.buffer)-self.pointer)+match.start()
            raise ReaderError(self.name, position, ord(character),
                    'unicode', "special characters are not allowed")

    def update(self, length):
        """Make sure at least `length` characters are buffered past the cursor.

        Consumed characters are dropped from the buffer first. Once the
        input is exhausted a NUL character is appended and `raw_buffer` is
        set to None, which turns later calls into no-ops.

        Raises:
            ReaderError: on undecodable bytes or disallowed characters.
        """
        if self.raw_buffer is None:
            return
        self.buffer = self.buffer[self.pointer:]
        self.pointer = 0
        while len(self.buffer) < length:
            if not self.eof:
                self.update_raw()
            if self.raw_decode is not None:
                try:
                    data, converted = self.raw_decode(self.raw_buffer,
                            'strict', self.eof)
                except UnicodeDecodeError as exc:
                    # Slice instead of indexing: indexing `bytes` yields an
                    # int, which ReaderError.__str__ would report as an
                    # "unacceptable character" rather than a codec failure.
                    character = self.raw_buffer[exc.start:exc.start+1]
                    if self.stream is not None:
                        position = self.stream_pointer-len(self.raw_buffer)+exc.start
                    else:
                        position = exc.start
                    raise ReaderError(self.name, position, character,
                            exc.encoding, exc.reason)
            else:
                # Text input needs no decoding.
                data = self.raw_buffer
                converted = len(data)
            self.check_printable(data)
            self.buffer += data
            self.raw_buffer = self.raw_buffer[converted:]
            if self.eof:
                self.buffer += '\0'
                self.raw_buffer = None
                break

    def update_raw(self, size=4096):
        """Read up to `size` units from the stream into `raw_buffer`.

        An empty read marks end of input.
        """
        data = self.stream.read(size)
        if self.raw_buffer is None:
            self.raw_buffer = data
        else:
            self.raw_buffer += data
        self.stream_pointer += len(data)
        if not data:
            self.eof = True