Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/botocore/eventstream.py: 40%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

245 statements  

1# Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved. 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"). You 

4# may not use this file except in compliance with the License. A copy of 

5# the License is located at 

6# 

7# http://aws.amazon.com/apache2.0/ 

8# 

9# or in the "license" file accompanying this file. This file is 

10# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF 

11# ANY KIND, either express or implied. See the License for the specific 

12# language governing permissions and limitations under the License. 

13"""Binary Event Stream Decoding""" 

14 

15from binascii import crc32 

16from struct import unpack 

17 

18from botocore.exceptions import EventStreamError 

19 

20# byte length of the prelude (total_length + header_length + prelude_crc) 

21_PRELUDE_LENGTH = 12 

22_MAX_HEADERS_LENGTH = 128 * 1024 # 128 Kb 

23_MAX_PAYLOAD_LENGTH = 16 * 1024**2 # 16 Mb 

24 

25 

class ParserError(Exception):
    """Root of every error raised while decoding a binary event stream."""

30 

31 

class DuplicateHeader(ParserError):
    """Raised when one event carries the same header name more than once.

    :param header: The header name that was seen twice.
    """

    def __init__(self, header):
        super().__init__(f'Duplicate header present: "{header}"')

38 

39 

class InvalidHeadersLength(ParserError):
    """Raised when a prelude declares a headers section above the limit.

    :param length: The declared headers length, in bytes.
    """

    def __init__(self, length):
        super().__init__(
            f'Header length of {length} exceeded the maximum of {_MAX_HEADERS_LENGTH}'
        )

46 

47 

class InvalidPayloadLength(ParserError):
    """Raised when a prelude implies a payload above the size limit.

    :param length: The computed payload length, in bytes.
    """

    def __init__(self, length):
        super().__init__(
            f'Payload length of {length} exceeded the maximum of {_MAX_PAYLOAD_LENGTH}'
        )

54 

55 

class ChecksumMismatch(ParserError):
    """Raised when a computed checksum differs from the one in the stream.

    :param expected: The checksum read from the stream.
    :param calculated: The checksum computed over the received bytes.
    """

    def __init__(self, expected, calculated):
        super().__init__(
            f'Checksum mismatch: expected 0x{expected:08x}, calculated 0x{calculated:08x}'
        )

62 

63 

class NoInitialResponseError(ParserError):
    """Raised when the stream does not open with an initial-response event.

    This covers both an empty event stream and a stream whose first event
    has some other ``:event-type``.
    """

    def __init__(self):
        super().__init__('First event was not of the initial-response type')

74 

75 

class DecodeUtils:
    """Unpacking utility functions used in the decoder.

    All methods on this class take raw bytes and return a tuple containing
    the value parsed from the bytes and the number of bytes consumed to parse
    that value. Values are read from the start of ``data``; bytes beyond the
    ones needed are ignored, so callers may pass a longer buffer.

    Every struct format below uses the ``!`` prefix, i.e. big-endian
    (network byte order) with no padding.
    """

    UINT8_BYTE_FORMAT = '!B'
    UINT16_BYTE_FORMAT = '!H'
    UINT32_BYTE_FORMAT = '!I'
    INT8_BYTE_FORMAT = '!b'
    INT16_BYTE_FORMAT = '!h'
    INT32_BYTE_FORMAT = '!i'
    INT64_BYTE_FORMAT = '!q'
    PRELUDE_BYTE_FORMAT = '!III'

    # uint byte size to unpack format
    UINT_BYTE_FORMAT = {
        1: UINT8_BYTE_FORMAT,
        2: UINT16_BYTE_FORMAT,
        4: UINT32_BYTE_FORMAT,
    }

    @staticmethod
    def unpack_true(data):
        """This method consumes none of the provided bytes and returns True.

        :type data: bytes
        :param data: The bytes to parse from. This is ignored in this method.

        :rtype: (bool, int)
        :returns: The tuple (True, 0)
        """
        return True, 0

    @staticmethod
    def unpack_false(data):
        """This method consumes none of the provided bytes and returns False.

        :type data: bytes
        :param data: The bytes to parse from. This is ignored in this method.

        :rtype: (bool, int)
        :returns: The tuple (False, 0)
        """
        return False, 0

    @staticmethod
    def unpack_uint8(data):
        """Parse an unsigned 8-bit integer from the bytes.

        :type data: bytes
        :param data: The bytes to parse from.

        :rtype: (int, int)
        :returns: A tuple containing the (parsed integer value, bytes consumed)

        :raises struct.error: If ``data`` is empty.
        """
        value = unpack(DecodeUtils.UINT8_BYTE_FORMAT, data[:1])[0]
        return value, 1

    @staticmethod
    def unpack_uint32(data):
        """Parse an unsigned 32-bit integer from the bytes.

        :type data: bytes
        :param data: The bytes to parse from.

        :rtype: (int, int)
        :returns: A tuple containing the (parsed integer value, bytes consumed)

        :raises struct.error: If ``data`` holds fewer than 4 bytes.
        """
        value = unpack(DecodeUtils.UINT32_BYTE_FORMAT, data[:4])[0]
        return value, 4

    @staticmethod
    def unpack_int8(data):
        """Parse a signed 8-bit integer from the bytes.

        :type data: bytes
        :param data: The bytes to parse from.

        :rtype: (int, int)
        :returns: A tuple containing the (parsed integer value, bytes consumed)

        :raises struct.error: If ``data`` is empty.
        """
        value = unpack(DecodeUtils.INT8_BYTE_FORMAT, data[:1])[0]
        return value, 1

    @staticmethod
    def unpack_int16(data):
        """Parse a signed 16-bit integer from the bytes.

        :type data: bytes
        :param data: The bytes to parse from.

        :rtype: (int, int)
        :returns: A tuple containing the (parsed integer value, bytes consumed)

        :raises struct.error: If ``data`` holds fewer than 2 bytes.
        """
        value = unpack(DecodeUtils.INT16_BYTE_FORMAT, data[:2])[0]
        return value, 2

    @staticmethod
    def unpack_int32(data):
        """Parse a signed 32-bit integer from the bytes.

        :type data: bytes
        :param data: The bytes to parse from.

        :rtype: (int, int)
        :returns: A tuple containing the (parsed integer value, bytes consumed)

        :raises struct.error: If ``data`` holds fewer than 4 bytes.
        """
        value = unpack(DecodeUtils.INT32_BYTE_FORMAT, data[:4])[0]
        return value, 4

    @staticmethod
    def unpack_int64(data):
        """Parse a signed 64-bit integer from the bytes.

        :type data: bytes
        :param data: The bytes to parse from.

        :rtype: (int, int)
        :returns: A tuple containing the (parsed integer value, bytes consumed)

        :raises struct.error: If ``data`` holds fewer than 8 bytes.
        """
        value = unpack(DecodeUtils.INT64_BYTE_FORMAT, data[:8])[0]
        return value, 8

    @staticmethod
    def unpack_byte_array(data, length_byte_size=2):
        """Parse a variable length byte array from the bytes.

        The bytes are expected to be in the following format:
            [ length ][0 ... length bytes]
        where length is an unsigned integer represented in the smallest number
        of bytes to hold the maximum length of the array.

        NOTE: if ``data`` is shorter than the declared length, the returned
        array is silently truncated while the consumed count still reflects
        the declared length.

        :type data: bytes
        :param data: The bytes to parse from.

        :type length_byte_size: int
        :param length_byte_size: The byte size of the preceding integer that
            represents the length of the array. Supported values are 1, 2, and 4.

        :rtype: (bytes, int)
        :returns: A tuple containing the (parsed byte array, bytes consumed).

        :raises KeyError: If ``length_byte_size`` is not 1, 2 or 4.
        """
        uint_byte_format = DecodeUtils.UINT_BYTE_FORMAT[length_byte_size]
        length = unpack(uint_byte_format, data[:length_byte_size])[0]
        bytes_end = length + length_byte_size
        array_bytes = data[length_byte_size:bytes_end]
        return array_bytes, bytes_end

    @staticmethod
    def unpack_utf8_string(data, length_byte_size=2):
        """Parse a variable length utf-8 string from the bytes.

        The bytes are expected to be in the following format:
            [ length ][0 ... length bytes]
        where length is an unsigned integer represented in the smallest number
        of bytes to hold the maximum length of the array and the following
        bytes are a valid utf-8 string.

        :type data: bytes
        :param data: The bytes to parse from.

        :type length_byte_size: int
        :param length_byte_size: The byte size of the preceding integer that
            represents the length of the array. Supported values are 1, 2, and 4.

        :rtype: (str, int)
        :returns: A tuple containing the (utf-8 string, bytes consumed).

        :raises UnicodeDecodeError: If the array bytes are not valid utf-8.
        """
        array_bytes, consumed = DecodeUtils.unpack_byte_array(
            data, length_byte_size
        )
        return array_bytes.decode('utf-8'), consumed

    @staticmethod
    def unpack_uuid(data):
        """Parse a 16-byte uuid from the bytes.

        :type data: bytes
        :param data: The bytes to parse from.

        :rtype: (bytes, int)
        :returns: A tuple containing the (uuid bytes, bytes consumed).
        """
        return data[:16], 16

    @staticmethod
    def unpack_prelude(data):
        """Parse the prelude for an event stream message from the bytes.

        The prelude for an event stream message has the following format:
            [total_length][header_length][prelude_crc]
        where each field is an unsigned 32-bit integer.

        :type data: bytes
        :param data: The bytes to parse from. Only the first
            ``_PRELUDE_LENGTH`` bytes are read; trailing bytes are ignored,
            consistent with the other unpack methods on this class.

        :rtype: ((int, int, int), int)
        :returns: A tuple of ((total_length, headers_length, prelude_crc),
            consumed)

        :raises struct.error: If ``data`` holds fewer than
            ``_PRELUDE_LENGTH`` bytes.
        """
        # Slice first: struct.unpack requires an exact-size buffer, so an
        # unsliced longer buffer would raise instead of being parsed.
        prelude_bytes = data[:_PRELUDE_LENGTH]
        return (
            unpack(DecodeUtils.PRELUDE_BYTE_FORMAT, prelude_bytes),
            _PRELUDE_LENGTH,
        )

282 

283 

284def _validate_checksum(data, checksum, crc=0): 

285 # To generate the same numeric value across all Python versions and 

286 # platforms use crc32(data) & 0xffffffff. 

287 computed_checksum = crc32(data, crc) & 0xFFFFFFFF 

288 if checksum != computed_checksum: 

289 raise ChecksumMismatch(checksum, computed_checksum) 

290 

291 

class MessagePrelude:
    """The fixed-size prelude that opens each event stream message.

    :param total_length: Length of the entire message, in bytes.
    :param headers_length: Length of the headers section, in bytes.
    :param crc: CRC of the prelude's first two fields.
    """

    def __init__(self, total_length, headers_length, crc):
        self.total_length = total_length
        self.headers_length = headers_length
        self.crc = crc

    @property
    def payload_length(self):
        """Number of payload bytes in the message.

        This is the total length minus the prelude, the headers and the
        trailing 4-byte message CRC.

        :rtype: int
        """
        return self.total_length - (_PRELUDE_LENGTH + self.headers_length + 4)

    @property
    def payload_end(self):
        """Offset, from the message start, one past the last payload byte.

        The last 4 bytes of the message hold the message CRC.

        :rtype: int
        """
        trailing_crc_size = 4
        return self.total_length - trailing_crc_size

    @property
    def headers_end(self):
        """Offset, from the message start, one past the last header byte.

        :rtype: int
        """
        return self.headers_length + _PRELUDE_LENGTH

332 

333 

class EventStreamMessage:
    """One decoded event stream message.

    :param prelude: The message's prelude.
    :param headers: Dict of decoded header name -> value.
    :param payload: The raw payload bytes.
    :param crc: The message CRC read from the stream.
    """

    def __init__(self, prelude, headers, payload, crc):
        self.prelude = prelude
        self.headers = headers
        self.payload = payload
        self.crc = crc

    def to_response_dict(self, status_code=200):
        """Build a response-style dict for this message.

        A ``:message-type`` header of ``error`` or ``exception`` forces the
        status code to 400; otherwise ``status_code`` is used unchanged.

        :rtype: dict
        :returns: Dict with ``status_code``, ``headers`` and ``body`` keys.
        """
        is_failure = self.headers.get(':message-type') in ('error', 'exception')
        return {
            'status_code': 400 if is_failure else status_code,
            'headers': self.headers,
            'body': self.payload,
        }

352 

353 

class EventStreamHeaderParser:
    """Decode the headers section of an event stream message into a dict.

    The complete header bytes must be supplied up front. Each call to
    :meth:`parse` replaces the internal buffer, so one parser instance can
    be reused for the headers of many messages.
    """

    # Header value type id -> unpacker. Every unpacker returns a
    # (value, bytes_consumed) tuple.
    _HEADER_TYPE_MAP = {
        0: DecodeUtils.unpack_true,  # boolean_true
        1: DecodeUtils.unpack_false,  # boolean_false
        2: DecodeUtils.unpack_int8,  # byte
        3: DecodeUtils.unpack_int16,  # short
        4: DecodeUtils.unpack_int32,  # integer
        5: DecodeUtils.unpack_int64,  # long
        6: DecodeUtils.unpack_byte_array,  # byte_array
        7: DecodeUtils.unpack_utf8_string,  # string
        8: DecodeUtils.unpack_int64,  # timestamp
        9: DecodeUtils.unpack_uuid,  # uuid
    }

    def __init__(self):
        self._data = None

    def parse(self, data):
        """Decode every header contained in ``data``.

        :type data: bytes
        :param data: The headers section of an event stream message.

        :rtype: dict
        :returns: Mapping of header name to decoded value.

        :raises DuplicateHeader: If a header name appears more than once.
        """
        self._data = data
        return self._parse_headers()

    def _parse_headers(self):
        # Consume (name, value) pairs until the buffer is exhausted.
        decoded = {}
        while self._data:
            key, val = self._parse_header()
            if key in decoded:
                raise DuplicateHeader(key)
            decoded[key] = val
        return decoded

    def _parse_header(self):
        # Tuple elements evaluate left to right: name first, then value.
        return self._parse_name(), self._parse_value()

    def _parse_name(self):
        # Header names carry a 1-byte length prefix.
        header_name, size = DecodeUtils.unpack_utf8_string(self._data, 1)
        self._advance_data(size)
        return header_name

    def _parse_type(self):
        header_type, size = DecodeUtils.unpack_uint8(self._data)
        self._advance_data(size)
        return header_type

    def _parse_value(self):
        unpacker = self._HEADER_TYPE_MAP[self._parse_type()]
        header_value, size = unpacker(self._data)
        self._advance_data(size)
        return header_value

    def _advance_data(self, consumed):
        # Drop the bytes that have already been decoded.
        self._data = self._data[consumed:]

436 

437 

class EventStreamBuffer:
    """Streaming based event stream buffer

    A buffer class that wraps bytes from an event stream providing parsed
    messages as they become available via an iterable interface.

    Each message is laid out as::

        [prelude][headers][payload][message_crc (4 bytes)]

    where the prelude is ``[total_length][headers_length][prelude_crc]``.
    Iteration raises StopIteration whenever the buffered bytes do not yet
    hold a complete message; parsing state is kept, so iteration can be
    resumed after more bytes are supplied with :meth:`add_data`.
    """

    def __init__(self):
        # Bytes received but not yet consumed by a parsed message.
        self._data = b''
        # Prelude of the message being assembled; None until it is parsed.
        self._prelude = None
        self._header_parser = EventStreamHeaderParser()

    def add_data(self, data):
        """Add data to the buffer.

        :type data: bytes
        :param data: The bytes to add to the buffer to be used when parsing
        """
        self._data += data

    def _validate_prelude(self, prelude):
        """Reject preludes whose declared lengths cannot form a valid message.

        :raises InvalidHeadersLength: If the headers exceed the maximum.
        :raises InvalidPayloadLength: If the payload exceeds the maximum.
        :raises ParserError: If ``total_length`` is too small to hold the
            prelude, the declared headers and the trailing message CRC.
        """
        if prelude.headers_length > _MAX_HEADERS_LENGTH:
            raise InvalidHeadersLength(prelude.headers_length)

        if prelude.payload_length > _MAX_PAYLOAD_LENGTH:
            raise InvalidPayloadLength(prelude.payload_length)

        # A negative payload length means total_length cannot contain the
        # prelude, headers and message CRC. Without this check the headers
        # slice would overlap the message CRC and could extend into the
        # following message's bytes.
        if prelude.payload_length < 0:
            raise ParserError(
                f'Total length of {prelude.total_length} is too small for '
                f'headers length of {prelude.headers_length}'
            )

    def _parse_prelude(self):
        prelude_bytes = self._data[:_PRELUDE_LENGTH]
        raw_prelude, _ = DecodeUtils.unpack_prelude(prelude_bytes)
        prelude = MessagePrelude(*raw_prelude)
        # The minus 4 removes the prelude crc from the bytes to be checked
        _validate_checksum(prelude_bytes[: _PRELUDE_LENGTH - 4], prelude.crc)
        self._validate_prelude(prelude)
        return prelude

    def _parse_headers(self):
        header_bytes = self._data[_PRELUDE_LENGTH : self._prelude.headers_end]
        return self._header_parser.parse(header_bytes)

    def _parse_payload(self):
        prelude = self._prelude
        payload_bytes = self._data[prelude.headers_end : prelude.payload_end]
        return payload_bytes

    def _parse_message_crc(self):
        prelude = self._prelude
        crc_bytes = self._data[prelude.payload_end : prelude.total_length]
        message_crc, _ = DecodeUtils.unpack_uint32(crc_bytes)
        return message_crc

    def _parse_message_bytes(self):
        # The minus 4 includes the prelude crc to the bytes to be checked
        message_bytes = self._data[
            _PRELUDE_LENGTH - 4 : self._prelude.payload_end
        ]
        return message_bytes

    def _validate_message_crc(self):
        # Seeding the CRC with the (already validated) prelude CRC makes the
        # message CRC cover every byte that precedes it.
        message_crc = self._parse_message_crc()
        message_bytes = self._parse_message_bytes()
        _validate_checksum(message_bytes, message_crc, crc=self._prelude.crc)
        return message_crc

    def _parse_message(self):
        crc = self._validate_message_crc()
        headers = self._parse_headers()
        payload = self._parse_payload()
        message = EventStreamMessage(self._prelude, headers, payload, crc)
        self._prepare_for_next_message()
        return message

    def _prepare_for_next_message(self):
        # Advance the data and reset the current prelude
        self._data = self._data[self._prelude.total_length :]
        self._prelude = None

    def next(self):
        """Provides the next available message parsed from the stream

        :rtype: EventStreamMessage
        :returns: The next event stream message

        :raises StopIteration: If the buffer does not yet hold a full message.
        :raises ParserError: If the buffered bytes are malformed (bad
            checksum, out-of-range lengths, duplicate headers).
        """
        if len(self._data) < _PRELUDE_LENGTH:
            raise StopIteration()

        if self._prelude is None:
            self._prelude = self._parse_prelude()

        if len(self._data) < self._prelude.total_length:
            raise StopIteration()

        return self._parse_message()

    def __next__(self):
        return self.next()

    def __iter__(self):
        return self

537 

538 

class EventStream:
    """Wrapper class for an event stream body.

    Wraps the underlying streaming body, decodes it into individual events
    and yields each parsed event through the iterator interface as soon as
    it is complete.

    The following example uses the S3 select API to get structured data out of
    an object stored in S3 using an event stream.

    **Example:**
    ::
        from botocore.session import Session

        s3 = Session().create_client('s3')
        response = s3.select_object_content(
            Bucket='bucketname',
            Key='keyname',
            ExpressionType='SQL',
            RequestProgress={'Enabled': True},
            Expression="SELECT * FROM S3Object s",
            InputSerialization={'CSV': {}},
            OutputSerialization={'CSV': {}},
        )
        # This is the event stream in the response
        event_stream = response['Payload']
        end_event_received = False
        with open('output', 'wb') as f:
            # Iterate over events in the event stream as they come
            for event in event_stream:
                # If we received a records event, write the data to a file
                if 'Records' in event:
                    data = event['Records']['Payload']
                    f.write(data)
                # If we received a progress event, print the details
                elif 'Progress' in event:
                    print(event['Progress']['Details'])
                # End event indicates that the request finished successfully
                elif 'End' in event:
                    print('Result is complete')
                    end_event_received = True
        if not end_event_received:
            raise Exception("End event not received, request incomplete.")
    """

    def __init__(self, raw_stream, output_shape, parser, operation_name):
        self._raw_stream = raw_stream
        self._output_shape = output_shape
        self._operation_name = operation_name
        self._parser = parser
        # Created once: every iteration (and get_initial_response) draws
        # from this same generator.
        self._event_generator = self._create_raw_event_generator()

    def __iter__(self):
        # Parse each raw message and skip those that parse to a falsy value.
        parsed_events = map(self._parse_event, self._event_generator)
        yield from filter(None, parsed_events)

    def _create_raw_event_generator(self):
        # Feed each chunk into the buffer, then drain every complete message
        # it now holds before reading the next chunk.
        buffer = EventStreamBuffer()
        for chunk in self._raw_stream.stream():
            buffer.add_data(chunk)
            yield from buffer

    def _parse_event(self, event):
        response_dict = event.to_response_dict()
        parsed_response = self._parser.parse(response_dict, self._output_shape)
        if response_dict['status_code'] != 200:
            raise EventStreamError(parsed_response, self._operation_name)
        return parsed_response

    def get_initial_response(self):
        """Consume the first raw event and return it if it is an initial response.

        The first event is consumed from the stream either way.

        :raises NoInitialResponseError: If the stream is empty or its first
            event's ``:event-type`` header is not ``initial-response``.
        """
        first_event = next(self._event_generator, None)
        if first_event is not None:
            if first_event.headers.get(':event-type') == 'initial-response':
                return first_event
        raise NoInitialResponseError()

    def close(self):
        """Closes the underlying streaming body."""
        self._raw_stream.close()