Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/botocore/eventstream.py: 40%

245 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2023-12-08 06:51 +0000

1# Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved. 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"). You 

4# may not use this file except in compliance with the License. A copy of 

5# the License is located at 

6# 

7# http://aws.amazon.com/apache2.0/ 

8# 

9# or in the "license" file accompanying this file. This file is 

10# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF 

11# ANY KIND, either express or implied. See the License for the specific 

12# language governing permissions and limitations under the License. 

13"""Binary Event Stream Decoding """ 

14 

15from binascii import crc32 

16from struct import unpack 

17 

18from botocore.exceptions import EventStreamError 

19 

# The prelude is three 4-byte unsigned fields (total_length, headers_length,
# prelude_crc), so it always occupies 3 * 4 = 12 bytes.
_PRELUDE_LENGTH = 12
# Upper bounds checked against every decoded prelude.
_MAX_HEADERS_LENGTH = 128 * 1024  # 128 KiB
_MAX_PAYLOAD_LENGTH = 16 * 1024 * 1024  # 16 MiB

24 

25 

class ParserError(Exception):
    """Common base for every error raised while decoding event stream bytes."""

30 

31 

class DuplicateHeader(ParserError):
    """Raised when one header name occurs more than once in a single event."""

    def __init__(self, header):
        # ``header`` is the offending header name; it is quoted in the message.
        super().__init__('Duplicate header present: "%s"' % header)

38 

39 

class InvalidHeadersLength(ParserError):
    """Raised when a prelude declares a headers section above the allowed size."""

    def __init__(self, length):
        # ``length`` is the declared headers length, reported alongside the limit.
        template = 'Header length of {} exceeded the maximum of {}'
        super().__init__(template.format(length, _MAX_HEADERS_LENGTH))

49 

50 

class InvalidPayloadLength(ParserError):
    """Raised when a prelude implies a payload above the allowed size."""

    def __init__(self, length):
        # ``length`` is the computed payload length, reported alongside the limit.
        template = 'Payload length of {} exceeded the maximum of {}'
        super().__init__(template.format(length, _MAX_PAYLOAD_LENGTH))

60 

61 

class ChecksumMismatch(ParserError):
    """Raised when a locally computed checksum differs from the expected one."""

    def __init__(self, expected, calculated):
        # Both values are rendered as zero-padded 8-digit hexadecimal numbers.
        template = 'Checksum mismatch: expected 0x{:08x}, calculated 0x{:08x}'
        super().__init__(template.format(expected, calculated))

73 

74 

class NoInitialResponseError(ParserError):
    """Raised when a stream does not begin with an ``initial-response`` event.

    Both an empty stream and a stream whose first event carries a different
    event type lead to this error.
    """

    def __init__(self):
        super().__init__('First event was not of the initial-response type')

85 

86 

class DecodeUtils:
    """Unpacking utility functions used in the decoder.

    All methods on this class take raw bytes and return a tuple containing
    the value parsed from the bytes and the number of bytes consumed to parse
    that value. Integer formats use the ``!`` (network, big-endian) prefix.
    Every method reads only the prefix of ``data`` it needs, so callers may
    pass a longer buffer.
    """

    UINT8_BYTE_FORMAT = '!B'
    UINT16_BYTE_FORMAT = '!H'
    UINT32_BYTE_FORMAT = '!I'
    INT8_BYTE_FORMAT = '!b'
    INT16_BYTE_FORMAT = '!h'
    INT32_BYTE_FORMAT = '!i'
    INT64_BYTE_FORMAT = '!q'
    PRELUDE_BYTE_FORMAT = '!III'

    # uint byte size to unpack format
    UINT_BYTE_FORMAT = {
        1: UINT8_BYTE_FORMAT,
        2: UINT16_BYTE_FORMAT,
        4: UINT32_BYTE_FORMAT,
    }

    @staticmethod
    def unpack_true(data):
        """Consume none of the provided bytes and return True.

        :type data: bytes
        :param data: The bytes to parse from. This is ignored in this method.

        :rtype: (bool, int)
        :returns: The tuple (True, 0)
        """
        return True, 0

    @staticmethod
    def unpack_false(data):
        """Consume none of the provided bytes and return False.

        :type data: bytes
        :param data: The bytes to parse from. This is ignored in this method.

        :rtype: (bool, int)
        :returns: The tuple (False, 0)
        """
        return False, 0

    @staticmethod
    def unpack_uint8(data):
        """Parse an unsigned 8-bit integer from the bytes.

        :type data: bytes
        :param data: The bytes to parse from.

        :rtype: (int, int)
        :returns: A tuple containing the (parsed integer value, bytes consumed)
        """
        value = unpack(DecodeUtils.UINT8_BYTE_FORMAT, data[:1])[0]
        return value, 1

    @staticmethod
    def unpack_uint32(data):
        """Parse an unsigned 32-bit integer from the bytes.

        :type data: bytes
        :param data: The bytes to parse from.

        :rtype: (int, int)
        :returns: A tuple containing the (parsed integer value, bytes consumed)
        """
        value = unpack(DecodeUtils.UINT32_BYTE_FORMAT, data[:4])[0]
        return value, 4

    @staticmethod
    def unpack_int8(data):
        """Parse a signed 8-bit integer from the bytes.

        :type data: bytes
        :param data: The bytes to parse from.

        :rtype: (int, int)
        :returns: A tuple containing the (parsed integer value, bytes consumed)
        """
        value = unpack(DecodeUtils.INT8_BYTE_FORMAT, data[:1])[0]
        return value, 1

    @staticmethod
    def unpack_int16(data):
        """Parse a signed 16-bit integer from the bytes.

        :type data: bytes
        :param data: The bytes to parse from.

        :rtype: (int, int)
        :returns: A tuple containing the (parsed integer value, bytes consumed)
        """
        value = unpack(DecodeUtils.INT16_BYTE_FORMAT, data[:2])[0]
        return value, 2

    @staticmethod
    def unpack_int32(data):
        """Parse a signed 32-bit integer from the bytes.

        :type data: bytes
        :param data: The bytes to parse from.

        :rtype: (int, int)
        :returns: A tuple containing the (parsed integer value, bytes consumed)
        """
        value = unpack(DecodeUtils.INT32_BYTE_FORMAT, data[:4])[0]
        return value, 4

    @staticmethod
    def unpack_int64(data):
        """Parse a signed 64-bit integer from the bytes.

        :type data: bytes
        :param data: The bytes to parse from.

        :rtype: (int, int)
        :returns: A tuple containing the (parsed integer value, bytes consumed)
        """
        value = unpack(DecodeUtils.INT64_BYTE_FORMAT, data[:8])[0]
        return value, 8

    @staticmethod
    def unpack_byte_array(data, length_byte_size=2):
        """Parse a variable length byte array from the bytes.

        The bytes are expected to be in the following format:
            [ length ][0 ... length bytes]
        where length is an unsigned integer represented in the smallest number
        of bytes to hold the maximum length of the array.

        :type data: bytes
        :param data: The bytes to parse from.

        :type length_byte_size: int
        :param length_byte_size: The byte size of the preceding integer that
            represents the length of the array. Supported values are 1, 2, and 4.

        :rtype: (bytes, int)
        :returns: A tuple containing the (parsed byte array, bytes consumed).
            NOTE(review): if ``data`` holds fewer bytes than the declared
            length, the slice silently returns a shorter array while the
            consumed count still reflects the declared length.

        :raises KeyError: If ``length_byte_size`` is not 1, 2 or 4.
        """
        uint_byte_format = DecodeUtils.UINT_BYTE_FORMAT[length_byte_size]
        length = unpack(uint_byte_format, data[:length_byte_size])[0]
        bytes_end = length + length_byte_size
        array_bytes = data[length_byte_size:bytes_end]
        return array_bytes, bytes_end

    @staticmethod
    def unpack_utf8_string(data, length_byte_size=2):
        """Parse a variable length utf-8 string from the bytes.

        The bytes are expected to be in the following format:
            [ length ][0 ... length bytes]
        where length is an unsigned integer represented in the smallest number
        of bytes to hold the maximum length of the array and the following
        bytes are a valid utf-8 string.

        :type data: bytes
        :param data: The bytes to parse from.

        :type length_byte_size: int
        :param length_byte_size: The byte size of the preceding integer that
            represents the length of the array. Supported values are 1, 2, and 4.

        :rtype: (str, int)
        :returns: A tuple containing the (utf-8 string, bytes consumed).

        :raises UnicodeDecodeError: If the array bytes are not valid utf-8.
        """
        array_bytes, consumed = DecodeUtils.unpack_byte_array(
            data, length_byte_size
        )
        return array_bytes.decode('utf-8'), consumed

    @staticmethod
    def unpack_uuid(data):
        """Parse a 16-byte uuid from the bytes.

        :type data: bytes
        :param data: The bytes to parse from.

        :rtype: (bytes, int)
        :returns: A tuple containing the (uuid bytes, bytes consumed).
        """
        return data[:16], 16

    @staticmethod
    def unpack_prelude(data):
        """Parse the prelude for an event stream message from the bytes.

        The prelude for an event stream message has the following format:
            [total_length][header_length][prelude_crc]
        where each field is an unsigned 32-bit integer.

        :type data: bytes
        :param data: The bytes to parse from. Only the first
            ``_PRELUDE_LENGTH`` bytes are read, so a longer buffer is
            accepted, as with the other unpack helpers.

        :rtype: ((int, int, int), int)
        :returns: A tuple of ((total_length, headers_length, prelude_crc),
            consumed)

        :raises struct.error: If fewer than ``_PRELUDE_LENGTH`` bytes are given.
        """
        # Slice like every other unpacker; unpack('!III', ...) requires
        # exactly 12 bytes and would reject any longer buffer.
        fields = unpack(DecodeUtils.PRELUDE_BYTE_FORMAT, data[:_PRELUDE_LENGTH])
        return fields, _PRELUDE_LENGTH

293 

294 

def _validate_checksum(data, checksum, crc=0):
    """Check that the CRC32 of ``data`` equals ``checksum``.

    :param data: The bytes to checksum.
    :param checksum: The expected unsigned 32-bit checksum.
    :param crc: Running CRC value to continue from (0 starts a new CRC).
    :raises ChecksumMismatch: If the calculated value differs from ``checksum``.
    """
    # Masking to 32 bits keeps the result unsigned and identical on every
    # Python version and platform.
    calculated = crc32(data, crc) & 0xFFFFFFFF
    if calculated == checksum:
        return
    raise ChecksumMismatch(checksum, calculated)

301 

302 

class MessagePrelude:
    """The fixed-size fields that open an event stream message."""

    def __init__(self, total_length, headers_length, crc):
        # Byte length of the whole message, including the prelude and the
        # trailing 4-byte message CRC.
        self.total_length = total_length
        # Byte length of the headers section that follows the prelude.
        self.headers_length = headers_length
        # Checksum transmitted with the prelude.
        self.crc = crc

    @property
    def headers_end(self):
        """Offset, from the start of the message, where the headers stop.

        :rtype: int
        :returns: Prelude length plus the declared headers length.
        """
        return self.headers_length + _PRELUDE_LENGTH

    @property
    def payload_end(self):
        """Offset, from the start of the message, where the payload stops.

        The final 4 bytes of a message hold the message CRC, so the payload
        ends 4 bytes before ``total_length``.

        :rtype: int
        :returns: ``total_length - 4``.
        """
        return self.total_length - 4

    @property
    def payload_length(self):
        """Number of payload bytes between the headers and the message CRC.

        :rtype: int
        :returns: The distance from ``headers_end`` to ``payload_end``.
        """
        return self.payload_end - self.headers_end

343 

344 

class EventStreamMessage:
    """A decoded event stream message: prelude, headers, payload and CRC."""

    def __init__(self, prelude, headers, payload, crc):
        self.prelude = prelude
        self.headers = headers
        self.payload = payload
        self.crc = crc

    def to_response_dict(self, status_code=200):
        """Express this message as a response dictionary.

        :param status_code: Status code to report for ordinary messages.
        :returns: A dict with ``status_code``, ``headers`` and ``body`` keys.
            The status code is forced to 400 when the ``:message-type``
            header is ``error`` or ``exception``.
        """
        if self.headers.get(':message-type') in ('error', 'exception'):
            status_code = 400
        return {
            'status_code': status_code,
            'headers': self.headers,
            'body': self.payload,
        }

363 

364 

class EventStreamHeaderParser:
    """Decode the headers section of an event stream message into a dict.

    The complete header bytes must be supplied up front. A single instance
    may be reused to parse the headers of many messages.
    """

    # Header type id -> DecodeUtils function decoding that type's value.
    # Each function returns a (value, bytes_consumed) tuple.
    _HEADER_TYPE_MAP = {
        0: DecodeUtils.unpack_true,  # boolean_true
        1: DecodeUtils.unpack_false,  # boolean_false
        2: DecodeUtils.unpack_int8,  # byte
        3: DecodeUtils.unpack_int16,  # short
        4: DecodeUtils.unpack_int32,  # integer
        5: DecodeUtils.unpack_int64,  # long
        6: DecodeUtils.unpack_byte_array,  # byte_array
        7: DecodeUtils.unpack_utf8_string,  # string
        8: DecodeUtils.unpack_int64,  # timestamp
        9: DecodeUtils.unpack_uuid,  # uuid
    }

    def __init__(self):
        # Header bytes not yet consumed by the parse in progress.
        self._data = None

    def parse(self, data):
        """Decode every header contained in ``data``.

        :type data: bytes
        :param data: The headers section of one event stream message.

        :rtype: dict
        :returns: Mapping of header name to decoded value.

        :raises DuplicateHeader: If a header name appears more than once.
        :raises KeyError: If a header uses a type id missing from
            ``_HEADER_TYPE_MAP``.
        """
        self._data = data
        return self._parse_headers()

    def _parse_headers(self):
        # Consume name/value pairs until no header bytes remain.
        parsed = {}
        while self._data:
            key, val = self._parse_header()
            if key in parsed:
                raise DuplicateHeader(key)
            parsed[key] = val
        return parsed

    def _parse_header(self):
        # The tuple is evaluated left to right: name first, then value.
        return self._parse_name(), self._parse_value()

    def _parse_name(self):
        # Header names carry a 1-byte length prefix.
        name, used = DecodeUtils.unpack_utf8_string(self._data, 1)
        self._advance_data(used)
        return name

    def _parse_type(self):
        # A single unsigned byte selects the value decoder.
        header_type, used = DecodeUtils.unpack_uint8(self._data)
        self._advance_data(used)
        return header_type

    def _parse_value(self):
        unpacker = self._HEADER_TYPE_MAP[self._parse_type()]
        value, used = unpacker(self._data)
        self._advance_data(used)
        return value

    def _advance_data(self, consumed):
        self._data = self._data[consumed:]

447 

448 

class EventStreamBuffer:
    """Streaming based event stream buffer

    A buffer class that wraps bytes from an event stream providing parsed
    messages as they become available via an iterable interface.
    """

    def __init__(self):
        # Bytes received but not yet consumed by a completely parsed message.
        self._data = b''
        # Prelude of the message currently being assembled, once it is read.
        self._prelude = None
        self._header_parser = EventStreamHeaderParser()

    def add_data(self, data):
        """Add data to the buffer.

        :type data: bytes
        :param data: The bytes to add to the buffer to be used when parsing
        """
        self._data += data

    def _validate_prelude(self, prelude):
        """Reject a prelude whose declared lengths are out of bounds.

        :raises InvalidHeadersLength: If the headers exceed the maximum.
        :raises InvalidPayloadLength: If the payload exceeds the maximum.
        :raises ParserError: If ``total_length`` is too small to contain the
            prelude, the declared headers and the 4-byte message CRC.
        """
        if prelude.headers_length > _MAX_HEADERS_LENGTH:
            raise InvalidHeadersLength(prelude.headers_length)

        if prelude.payload_length > _MAX_PAYLOAD_LENGTH:
            raise InvalidPayloadLength(prelude.payload_length)

        # A negative payload length means headers_end lies beyond
        # payload_end. The header slice would then run past the message
        # CRC, possibly into the next message, reading bytes that the
        # message checksum does not cover.
        if prelude.payload_length < 0:
            raise ParserError(
                'Message length of {} is too short for {} bytes of '
                'headers'.format(prelude.total_length, prelude.headers_length)
            )

    def _parse_prelude(self):
        prelude_bytes = self._data[:_PRELUDE_LENGTH]
        raw_prelude, _ = DecodeUtils.unpack_prelude(prelude_bytes)
        prelude = MessagePrelude(*raw_prelude)
        # Verify the prelude CRC before trusting its length fields, so a
        # corrupted prelude is reported as a checksum failure.
        # The minus 4 removes the prelude crc from the bytes to be checked.
        _validate_checksum(prelude_bytes[: _PRELUDE_LENGTH - 4], prelude.crc)
        self._validate_prelude(prelude)
        return prelude

    def _parse_headers(self):
        header_bytes = self._data[_PRELUDE_LENGTH : self._prelude.headers_end]
        return self._header_parser.parse(header_bytes)

    def _parse_payload(self):
        prelude = self._prelude
        payload_bytes = self._data[prelude.headers_end : prelude.payload_end]
        return payload_bytes

    def _parse_message_crc(self):
        # The message CRC is stored in the last 4 bytes of the message.
        prelude = self._prelude
        crc_bytes = self._data[prelude.payload_end : prelude.total_length]
        message_crc, _ = DecodeUtils.unpack_uint32(crc_bytes)
        return message_crc

    def _parse_message_bytes(self):
        # The minus 4 includes the prelude crc in the bytes to be checked.
        message_bytes = self._data[
            _PRELUDE_LENGTH - 4 : self._prelude.payload_end
        ]
        return message_bytes

    def _validate_message_crc(self):
        # The message CRC continues from the prelude CRC value.
        message_crc = self._parse_message_crc()
        message_bytes = self._parse_message_bytes()
        _validate_checksum(message_bytes, message_crc, crc=self._prelude.crc)
        return message_crc

    def _parse_message(self):
        crc = self._validate_message_crc()
        headers = self._parse_headers()
        payload = self._parse_payload()
        message = EventStreamMessage(self._prelude, headers, payload, crc)
        self._prepare_for_next_message()
        return message

    def _prepare_for_next_message(self):
        # Advance the data and reset the current prelude
        self._data = self._data[self._prelude.total_length :]
        self._prelude = None

    def next(self):
        """Provides the next available message parsed from the stream

        :rtype: EventStreamMessage
        :returns: The next event stream message

        :raises StopIteration: When not enough bytes are buffered yet for a
            complete message.
        :raises ParserError: When the buffered bytes are malformed.
        """
        if len(self._data) < _PRELUDE_LENGTH:
            raise StopIteration()

        if self._prelude is None:
            self._prelude = self._parse_prelude()

        if len(self._data) < self._prelude.total_length:
            raise StopIteration()

        return self._parse_message()

    def __next__(self):
        return self.next()

    def __iter__(self):
        return self

548 

549 

class EventStream:
    """Wrapper class for an event stream body.

    This wraps the underlying streaming body, parsing it into individual
    events and yielding them through the iterator interface as they arrive.

    The following example uses the S3 select API to get structured data out of
    an object stored in S3 using an event stream.

    **Example:**
    ::
        from botocore.session import Session

        s3 = Session().create_client('s3')
        response = s3.select_object_content(
            Bucket='bucketname',
            Key='keyname',
            ExpressionType='SQL',
            RequestProgress={'Enabled': True},
            Expression="SELECT * FROM S3Object s",
            InputSerialization={'CSV': {}},
            OutputSerialization={'CSV': {}},
        )
        # This is the event stream in the response
        event_stream = response['Payload']
        end_event_received = False
        with open('output', 'wb') as f:
            # Iterate over events in the event stream as they come
            for event in event_stream:
                # If we received a records event, write the data to a file
                if 'Records' in event:
                    data = event['Records']['Payload']
                    f.write(data)
                # If we received a progress event, print the details
                elif 'Progress' in event:
                    print(event['Progress']['Details'])
                # End event indicates that the request finished successfully
                elif 'End' in event:
                    print('Result is complete')
                    end_event_received = True
        if not end_event_received:
            raise Exception("End event not received, request incomplete.")
    """

    def __init__(self, raw_stream, output_shape, parser, operation_name):
        self._raw_stream = raw_stream
        self._output_shape = output_shape
        self._operation_name = operation_name
        self._parser = parser
        # A generator object; its body runs lazily on first iteration.
        self._event_generator = self._create_raw_event_generator()

    def __iter__(self):
        # Parse each raw event and skip any that parse to a falsy value.
        for raw_event in self._event_generator:
            parsed = self._parse_event(raw_event)
            if not parsed:
                continue
            yield parsed

    def _create_raw_event_generator(self):
        # Feed each chunk of the raw stream into one shared buffer and emit
        # every message that becomes complete.
        buffer = EventStreamBuffer()
        for raw_chunk in self._raw_stream.stream():
            buffer.add_data(raw_chunk)
            yield from buffer

    def _parse_event(self, event):
        response = event.to_response_dict()
        parsed = self._parser.parse(response, self._output_shape)
        if response['status_code'] != 200:
            raise EventStreamError(parsed, self._operation_name)
        return parsed

    def get_initial_response(self):
        """Return the first raw event if it is an ``initial-response`` event.

        The first event is consumed from the stream either way.

        :raises NoInitialResponseError: If the stream is empty or its first
            event has a different ``:event-type`` header.
        """
        missing = object()
        first_event = next(self._event_generator, missing)
        if (
            first_event is not missing
            and first_event.headers.get(':event-type') == 'initial-response'
        ):
            return first_event
        raise NoInitialResponseError()

    def close(self):
        """Closes the underlying streaming body."""
        self._raw_stream.close()