Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/botocore/eventstream.py: 40%
245 statements
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:51 +0000
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:51 +0000
1# Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
2#
3# Licensed under the Apache License, Version 2.0 (the "License"). You
4# may not use this file except in compliance with the License. A copy of
5# the License is located at
6#
7# http://aws.amazon.com/apache2.0/
8#
9# or in the "license" file accompanying this file. This file is
10# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
11# ANY KIND, either express or implied. See the License for the specific
12# language governing permissions and limitations under the License.
13"""Binary Event Stream Decoding """
15from binascii import crc32
16from struct import unpack
18from botocore.exceptions import EventStreamError
# Size in bytes of a message prelude: three big-endian uint32 fields
# (total_length, headers_length, prelude_crc).
_PRELUDE_LENGTH = 3 * 4
# Largest headers section accepted for a single message: 128 KiB.
_MAX_HEADERS_LENGTH = 128 * 2**10
# Largest payload accepted for a single message: 16 MiB.
_MAX_PAYLOAD_LENGTH = 16 * 2**20
class ParserError(Exception):
    """Root of every exception raised while decoding a binary event stream."""
class DuplicateHeader(ParserError):
    """Raised when one event message carries the same header name twice.

    :param header: The repeated header name, quoted in the error message.
    """

    def __init__(self, header):
        super().__init__(f'Duplicate header present: "{header!s}"')
class InvalidHeadersLength(ParserError):
    """Raised when a prelude declares more header bytes than are allowed.

    :param length: The declared headers length, reported alongside the
        module-wide ``_MAX_HEADERS_LENGTH`` limit.
    """

    def __init__(self, length):
        super().__init__(
            f'Header length of {length} exceeded the maximum of '
            f'{_MAX_HEADERS_LENGTH}'
        )
class InvalidPayloadLength(ParserError):
    """Raised when a prelude implies more payload bytes than are allowed.

    :param length: The computed payload length, reported alongside the
        module-wide ``_MAX_PAYLOAD_LENGTH`` limit.
    """

    def __init__(self, length):
        super().__init__(
            f'Payload length of {length} exceeded the maximum of '
            f'{_MAX_PAYLOAD_LENGTH}'
        )
class ChecksumMismatch(ParserError):
    """Raised when a computed CRC differs from the one carried in the data.

    Both values are rendered as zero-padded 8-digit hexadecimal numbers.

    :param expected: The checksum read from the message.
    :param calculated: The checksum computed over the received bytes.
    """

    def __init__(self, expected, calculated):
        super().__init__(
            f'Checksum mismatch: expected 0x{expected:08x}, '
            f'calculated 0x{calculated:08x}'
        )
class NoInitialResponseError(ParserError):
    """Raised when a stream does not open with an initial-response event.

    This covers both a stream that yields no events at all and a stream
    whose first event has some other ``:event-type``.
    """

    def __init__(self):
        super().__init__('First event was not of the initial-response type')
class DecodeUtils:
    """Primitive decoders used by the event stream parser.

    Every ``unpack_*`` helper takes raw bytes and returns a two-item tuple
    ``(value, consumed)``. ``consumed`` is the number of leading bytes of the
    input that the decoded value occupies. All multi-byte integers are read
    in network (big-endian) byte order.
    """

    UINT8_BYTE_FORMAT = '!B'
    UINT16_BYTE_FORMAT = '!H'
    UINT32_BYTE_FORMAT = '!I'
    INT8_BYTE_FORMAT = '!b'
    INT16_BYTE_FORMAT = '!h'
    INT32_BYTE_FORMAT = '!i'
    INT64_BYTE_FORMAT = '!q'
    PRELUDE_BYTE_FORMAT = '!III'

    # Width in bytes of an unsigned length prefix -> struct format to read it.
    UINT_BYTE_FORMAT = {
        1: UINT8_BYTE_FORMAT,
        2: UINT16_BYTE_FORMAT,
        4: UINT32_BYTE_FORMAT,
    }

    @staticmethod
    def unpack_true(data):
        """Decode a boolean-true header value, which has no body bytes.

        :type data: bytes
        :param data: Ignored.

        :rtype: (bool, int)
        :returns: Always ``(True, 0)``.
        """
        result = (True, 0)
        return result

    @staticmethod
    def unpack_false(data):
        """Decode a boolean-false header value, which has no body bytes.

        :type data: bytes
        :param data: Ignored.

        :rtype: (bool, int)
        :returns: Always ``(False, 0)``.
        """
        result = (False, 0)
        return result

    @staticmethod
    def unpack_uint8(data):
        """Read an unsigned 8-bit integer from the start of ``data``.

        :type data: bytes
        :param data: The bytes to decode from.

        :rtype: (int, int)
        :returns: ``(value, 1)``.
        """
        (number,) = unpack(DecodeUtils.UINT8_BYTE_FORMAT, data[:1])
        return number, 1

    @staticmethod
    def unpack_uint32(data):
        """Read an unsigned 32-bit integer from the start of ``data``.

        :type data: bytes
        :param data: The bytes to decode from.

        :rtype: (int, int)
        :returns: ``(value, 4)``.
        """
        (number,) = unpack(DecodeUtils.UINT32_BYTE_FORMAT, data[:4])
        return number, 4

    @staticmethod
    def unpack_int8(data):
        """Read a signed 8-bit integer from the start of ``data``.

        :type data: bytes
        :param data: The bytes to decode from.

        :rtype: (int, int)
        :returns: ``(value, 1)``.
        """
        (number,) = unpack(DecodeUtils.INT8_BYTE_FORMAT, data[:1])
        return number, 1

    @staticmethod
    def unpack_int16(data):
        """Read a signed 16-bit integer from the start of ``data``.

        :type data: bytes
        :param data: The bytes to decode from.

        :rtype: (int, int)
        :returns: ``(value, 2)``.
        """
        (number,) = unpack(DecodeUtils.INT16_BYTE_FORMAT, data[:2])
        return number, 2

    @staticmethod
    def unpack_int32(data):
        """Read a signed 32-bit integer from the start of ``data``.

        :type data: bytes
        :param data: The bytes to decode from.

        :rtype: (int, int)
        :returns: ``(value, 4)``.
        """
        (number,) = unpack(DecodeUtils.INT32_BYTE_FORMAT, data[:4])
        return number, 4

    @staticmethod
    def unpack_int64(data):
        """Read a signed 64-bit integer from the start of ``data``.

        :type data: bytes
        :param data: The bytes to decode from.

        :rtype: (int, int)
        :returns: ``(value, 8)``.
        """
        (number,) = unpack(DecodeUtils.INT64_BYTE_FORMAT, data[:8])
        return number, 8

    @staticmethod
    def unpack_byte_array(data, length_byte_size=2):
        """Read a length-prefixed byte array from the start of ``data``.

        Expected layout: ``[length][length bytes of content]``. The prefix is
        an unsigned integer that is ``length_byte_size`` bytes wide.

        :type data: bytes
        :param data: The bytes to decode from.

        :type length_byte_size: int
        :param length_byte_size: Width of the length prefix in bytes; one of
            1, 2 or 4.

        :rtype: (bytes, int)
        :returns: ``(content, prefix width + declared length)``.
        """
        prefix_format = DecodeUtils.UINT_BYTE_FORMAT[length_byte_size]
        (declared_length,) = unpack(prefix_format, data[:length_byte_size])
        end_offset = length_byte_size + declared_length
        return data[length_byte_size:end_offset], end_offset

    @staticmethod
    def unpack_utf8_string(data, length_byte_size=2):
        """Read a length-prefixed UTF-8 string from the start of ``data``.

        Uses the same ``[length][content]`` layout as ``unpack_byte_array``
        and then decodes the content as UTF-8.

        :type data: bytes
        :param data: The bytes to decode from.

        :type length_byte_size: int
        :param length_byte_size: Width of the length prefix in bytes; one of
            1, 2 or 4.

        :rtype: (str, int)
        :returns: ``(decoded string, bytes consumed)``.
        """
        raw_text, used = DecodeUtils.unpack_byte_array(data, length_byte_size)
        return raw_text.decode('utf-8'), used

    @staticmethod
    def unpack_uuid(data):
        """Read a 16-byte UUID from the start of ``data``.

        :type data: bytes
        :param data: The bytes to decode from.

        :rtype: (bytes, int)
        :returns: ``(the first 16 bytes, 16)``.
        """
        uuid_bytes = data[0:16]
        return uuid_bytes, 16

    @staticmethod
    def unpack_prelude(data):
        """Read the prelude of an event stream message.

        The prelude is ``[total_length][headers_length][prelude_crc]``, where
        each field is an unsigned 32-bit integer.

        :type data: bytes
        :param data: Exactly the prelude bytes.

        :rtype: ((int, int, int), int)
        :returns: ``((total_length, headers_length, prelude_crc), consumed)``.
        """
        fields = unpack(DecodeUtils.PRELUDE_BYTE_FORMAT, data)
        return fields, _PRELUDE_LENGTH
def _validate_checksum(data, checksum, crc=0):
    """Check that the CRC32 of ``data`` matches ``checksum``.

    :param data: Bytes to checksum.
    :param checksum: The expected unsigned 32-bit CRC value.
    :param crc: Running CRC to continue from, passed as the starting value of
        ``crc32``. Defaults to 0.
    :raises ChecksumMismatch: If the computed value differs from ``checksum``.
    """
    # Masking with 0xFFFFFFFF gives the same unsigned value on every Python
    # version and platform.
    actual = crc32(data, crc) & 0xFFFFFFFF
    if actual == checksum:
        return
    raise ChecksumMismatch(checksum, actual)
class MessagePrelude:
    """The decoded prelude at the front of an event stream message.

    :param total_length: Byte length of the whole message.
    :param headers_length: Byte length of the headers section.
    :param crc: CRC32 of the first eight prelude bytes.
    """

    def __init__(self, total_length, headers_length, crc):
        self.total_length = total_length
        self.headers_length = headers_length
        self.crc = crc

    @property
    def payload_length(self):
        """Number of payload bytes in the message.

        This is the span between the end of the headers and the start of the
        trailing 4-byte message CRC.

        :rtype: int
        """
        return self.payload_end - self.headers_end

    @property
    def payload_end(self):
        """Offset, from the start of the message, where the payload stops.

        Only the 4-byte message CRC follows this offset.

        :rtype: int
        """
        return self.total_length - 4

    @property
    def headers_end(self):
        """Offset, from the start of the message, where the headers stop.

        :rtype: int
        """
        return self.headers_length + _PRELUDE_LENGTH
class EventStreamMessage:
    """One fully decoded event stream message.

    :param prelude: The message's ``MessagePrelude``.
    :param headers: Mapping of header name to decoded header value.
    :param payload: Raw payload bytes.
    :param crc: The message CRC read from the end of the message.
    """

    def __init__(self, prelude, headers, payload, crc):
        self.prelude = prelude
        self.headers = headers
        self.payload = payload
        self.crc = crc

    def to_response_dict(self, status_code=200):
        """Shape the message like an HTTP response for the response parser.

        :param status_code: Status to report for ordinary messages.
        :returns: A dict with ``status_code``, ``headers`` and ``body``. The
            ``status_code`` is forced to 400 when the ``:message-type`` header
            is ``'error'`` or ``'exception'``.
        """
        is_failure = self.headers.get(':message-type') in ('error', 'exception')
        return {
            'status_code': 400 if is_failure else status_code,
            'headers': self.headers,
            'body': self.payload,
        }
class EventStreamHeaderParser:
    """Decode the headers section of an event stream message into a dict.

    The whole headers section is passed to ``parse`` at once. One instance
    can be reused to parse the headers of many messages one after another.
    Each header is laid out as a 1-byte-length-prefixed UTF-8 name, a 1-byte
    type tag, and then a value whose encoding depends on the tag.
    """

    # Header type tag -> decoder returning (value, bytes consumed).
    _HEADER_TYPE_MAP = {
        0: DecodeUtils.unpack_true,  # boolean true
        1: DecodeUtils.unpack_false,  # boolean false
        2: DecodeUtils.unpack_int8,  # byte
        3: DecodeUtils.unpack_int16,  # short
        4: DecodeUtils.unpack_int32,  # integer
        5: DecodeUtils.unpack_int64,  # long
        6: DecodeUtils.unpack_byte_array,  # byte array
        7: DecodeUtils.unpack_utf8_string,  # string
        8: DecodeUtils.unpack_int64,  # timestamp
        9: DecodeUtils.unpack_uuid,  # uuid
    }

    def __init__(self):
        # Unconsumed header bytes of the message currently being parsed.
        self._data = None

    def parse(self, data):
        """Decode every header in ``data``.

        :type data: bytes
        :param data: The headers section of one event stream message.

        :rtype: dict
        :returns: Header names mapped to their decoded values.
        :raises DuplicateHeader: If a header name occurs more than once.
        """
        self._data = data
        return self._parse_headers()

    def _parse_headers(self):
        # Consume headers until no bytes remain.
        parsed = {}
        while self._data:
            header_name, header_value = self._parse_header()
            if header_name in parsed:
                raise DuplicateHeader(header_name)
            parsed[header_name] = header_value
        return parsed

    def _parse_header(self):
        # Tuple items evaluate left to right, so the name is read before the value.
        return self._parse_name(), self._parse_value()

    def _parse_name(self):
        header_name, used = DecodeUtils.unpack_utf8_string(self._data, 1)
        self._advance_data(used)
        return header_name

    def _parse_type(self):
        header_type, used = DecodeUtils.unpack_uint8(self._data)
        self._advance_data(used)
        return header_type

    def _parse_value(self):
        unpacker = self._HEADER_TYPE_MAP[self._parse_type()]
        header_value, used = unpacker(self._data)
        self._advance_data(used)
        return header_value

    def _advance_data(self, consumed):
        # Drop the bytes the last decoder used.
        self._data = self._data[consumed:]
class EventStreamBuffer:
    """Streaming based event stream buffer.

    A buffer class that wraps bytes from an event stream providing parsed
    messages as they become available via an iterable interface. Raw bytes
    are appended with ``add_data``. Iterating yields every complete message
    currently buffered and then raises ``StopIteration``; iteration can be
    resumed after more bytes have been added.
    """

    def __init__(self):
        # Bytes received but not yet consumed by a parsed message.
        self._data = b''
        # Prelude of the message currently being assembled. It is cached so
        # that it is parsed and validated only once while the rest of the
        # message is still arriving.
        self._prelude = None
        self._header_parser = EventStreamHeaderParser()

    def add_data(self, data):
        """Add data to the buffer.

        :type data: bytes
        :param data: The bytes to add to the buffer to be used when parsing
        """
        self._data += data

    def _validate_prelude(self, prelude):
        """Reject preludes whose declared lengths are out of range.

        :raises InvalidHeadersLength: If the headers exceed
            ``_MAX_HEADERS_LENGTH``.
        :raises InvalidPayloadLength: If the payload exceeds
            ``_MAX_PAYLOAD_LENGTH``.
        :raises ParserError: If ``total_length`` is too small to contain the
            prelude, the declared headers and the trailing message CRC.
        """
        if prelude.headers_length > _MAX_HEADERS_LENGTH:
            raise InvalidHeadersLength(prelude.headers_length)

        if prelude.payload_length > _MAX_PAYLOAD_LENGTH:
            raise InvalidPayloadLength(prelude.payload_length)

        # Without this check, a negative payload length makes the slice
        # offsets below nonsensical. For example, total_length=0 makes
        # _parse_message_crc unpack an empty slice and raise struct.error
        # instead of a ParserError. This check runs after the max checks so
        # those existing error classifications are unchanged.
        if prelude.payload_length < 0:
            raise ParserError(
                'Total length of {} is too small to hold the prelude, '
                '{} bytes of headers, and the message CRC'.format(
                    prelude.total_length,
                    prelude.headers_length,
                )
            )

    def _parse_prelude(self):
        prelude_bytes = self._data[:_PRELUDE_LENGTH]
        raw_prelude, _ = DecodeUtils.unpack_prelude(prelude_bytes)
        prelude = MessagePrelude(*raw_prelude)
        self._validate_prelude(prelude)
        # The minus 4 removes the prelude crc from the bytes to be checked
        _validate_checksum(prelude_bytes[: _PRELUDE_LENGTH - 4], prelude.crc)
        return prelude

    def _parse_headers(self):
        header_bytes = self._data[_PRELUDE_LENGTH : self._prelude.headers_end]
        return self._header_parser.parse(header_bytes)

    def _parse_payload(self):
        prelude = self._prelude
        payload_bytes = self._data[prelude.headers_end : prelude.payload_end]
        return payload_bytes

    def _parse_message_crc(self):
        prelude = self._prelude
        crc_bytes = self._data[prelude.payload_end : prelude.total_length]
        message_crc, _ = DecodeUtils.unpack_uint32(crc_bytes)
        return message_crc

    def _parse_message_bytes(self):
        # The minus 4 includes the prelude crc to the bytes to be checked
        message_bytes = self._data[
            _PRELUDE_LENGTH - 4 : self._prelude.payload_end
        ]
        return message_bytes

    def _validate_message_crc(self):
        # The message CRC continues from the prelude CRC and covers the
        # bytes from the prelude CRC field through the end of the payload.
        message_crc = self._parse_message_crc()
        message_bytes = self._parse_message_bytes()
        _validate_checksum(message_bytes, message_crc, crc=self._prelude.crc)
        return message_crc

    def _parse_message(self):
        # The CRC is verified before the headers and payload are decoded.
        crc = self._validate_message_crc()
        headers = self._parse_headers()
        payload = self._parse_payload()
        message = EventStreamMessage(self._prelude, headers, payload, crc)
        self._prepare_for_next_message()
        return message

    def _prepare_for_next_message(self):
        # Advance the data and reset the current prelude
        self._data = self._data[self._prelude.total_length :]
        self._prelude = None

    def next(self):
        """Provides the next available message parsed from the stream

        :rtype: EventStreamMessage
        :returns: The next event stream message
        :raises StopIteration: If the buffer does not yet hold a complete
            message.
        """
        if len(self._data) < _PRELUDE_LENGTH:
            raise StopIteration()

        # The prelude is parsed once per message and stays cached until the
        # rest of the message has arrived.
        if self._prelude is None:
            self._prelude = self._parse_prelude()

        if len(self._data) < self._prelude.total_length:
            raise StopIteration()

        return self._parse_message()

    def __next__(self):
        return self.next()

    def __iter__(self):
        return self
class EventStream:
    """Wrapper class for an event stream body.

    Wraps the underlying streaming body, decodes it into individual events,
    and yields each parsed event through the iterator interface as soon as
    it becomes available.

    The following example uses the S3 select API to get structured data out of
    an object stored in S3 using an event stream.

    **Example:**
    ::
        from botocore.session import Session

        s3 = Session().create_client('s3')
        response = s3.select_object_content(
            Bucket='bucketname',
            Key='keyname',
            ExpressionType='SQL',
            RequestProgress={'Enabled': True},
            Expression="SELECT * FROM S3Object s",
            InputSerialization={'CSV': {}},
            OutputSerialization={'CSV': {}},
        )
        # This is the event stream in the response
        event_stream = response['Payload']
        end_event_received = False
        with open('output', 'wb') as f:
            # Iterate over events in the event stream as they come
            for event in event_stream:
                # If we received a records event, write the data to a file
                if 'Records' in event:
                    data = event['Records']['Payload']
                    f.write(data)
                # If we received a progress event, print the details
                elif 'Progress' in event:
                    print(event['Progress']['Details'])
                # End event indicates that the request finished successfully
                elif 'End' in event:
                    print('Result is complete')
                    end_event_received = True
        if not end_event_received:
            raise Exception("End event not received, request incomplete.")
    """

    def __init__(self, raw_stream, output_shape, parser, operation_name):
        self._raw_stream = raw_stream
        self._output_shape = output_shape
        self._operation_name = operation_name
        self._parser = parser
        # This is a generator object, so nothing is read from raw_stream
        # until it is first advanced.
        self._event_generator = self._create_raw_event_generator()

    def __iter__(self):
        # Parse every raw message lazily and drop falsy parse results.
        parsed_events = map(self._parse_event, self._event_generator)
        yield from filter(None, parsed_events)

    def _create_raw_event_generator(self):
        # Feed each chunk of the body into one decoder and yield every
        # message that the chunk completes.
        decoder = EventStreamBuffer()
        for chunk in self._raw_stream.stream():
            decoder.add_data(chunk)
            yield from decoder

    def _parse_event(self, event):
        # Error/exception messages are reported with status 400 by
        # to_response_dict and turned into EventStreamError here.
        response_dict = event.to_response_dict()
        parsed = self._parser.parse(response_dict, self._output_shape)
        if response_dict['status_code'] != 200:
            raise EventStreamError(parsed, self._operation_name)
        return parsed

    def get_initial_response(self):
        """Consume and return the first raw event if it is an initial-response.

        :raises NoInitialResponseError: If the stream has no events or its
            first event has a different ``:event-type`` header. The first
            event is consumed in either case.
        """
        no_event = object()
        first_event = next(self._event_generator, no_event)
        if (
            first_event is not no_event
            and first_event.headers.get(':event-type') == 'initial-response'
        ):
            return first_event
        raise NoInitialResponseError()

    def close(self):
        """Closes the underlying streaming body."""
        self._raw_stream.close()