1# Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
2#
3# Licensed under the Apache License, Version 2.0 (the "License"). You
4# may not use this file except in compliance with the License. A copy of
5# the License is located at
6#
7# http://aws.amazon.com/apache2.0/
8#
9# or in the "license" file accompanying this file. This file is
10# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
11# ANY KIND, either express or implied. See the License for the specific
12# language governing permissions and limitations under the License.
13"""Binary Event Stream Decoding"""
14
15from binascii import crc32
16from struct import unpack
17
18from botocore.exceptions import EventStreamError
19
# Size in bytes of the message prelude: three unsigned 32-bit fields
# (total_length, headers_length and prelude_crc).
_PRELUDE_LENGTH = 3 * 4
# Largest headers section accepted for a single message: 128 KiB.
_MAX_HEADERS_LENGTH = 128 * 1024
# Largest payload accepted for a single message: 16 MiB.
_MAX_PAYLOAD_LENGTH = 16 * 1024 * 1024
24
25
class ParserError(Exception):
    """Common base class for every error raised while decoding an event stream."""
30
31
class DuplicateHeader(ParserError):
    """Raised when the same header name appears twice in one message."""

    def __init__(self, header):
        super().__init__(f'Duplicate header present: "{header}"')
38
39
class InvalidHeadersLength(ParserError):
    """Raised when a declared headers length is above the allowed maximum."""

    def __init__(self, length):
        super().__init__(
            f'Header length of {length} exceeded the maximum of {_MAX_HEADERS_LENGTH}'
        )
46
47
class InvalidPayloadLength(ParserError):
    """Raised when a computed payload length is above the allowed maximum."""

    def __init__(self, length):
        super().__init__(
            f'Payload length of {length} exceeded the maximum of {_MAX_PAYLOAD_LENGTH}'
        )
54
55
class ChecksumMismatch(ParserError):
    """Raised when a computed CRC32 does not equal the CRC32 that was expected."""

    def __init__(self, expected, calculated):
        # Both values are rendered as zero-padded 8-digit hex numbers.
        super().__init__(
            f'Checksum mismatch: expected 0x{expected:08x}, calculated 0x{calculated:08x}'
        )
62
63
class NoInitialResponseError(ParserError):
    """Raised when the stream does not begin with an initial-response event.

    Either the event stream produced no events at all, or the first event
    it produced had an event type other than initial-response.
    """

    def __init__(self):
        super().__init__('First event was not of the initial-response type')
74
75
class DecodeUtils:
    """Unpacking utility functions used in the decoder.

    All methods on this class take raw bytes and return a tuple containing
    the value parsed from the start of the bytes and the number of bytes
    consumed to parse that value. Multi-byte integers are read in network
    (big-endian) byte order, as selected by the ``!`` prefix of the formats
    below.
    """

    UINT8_BYTE_FORMAT = '!B'
    UINT16_BYTE_FORMAT = '!H'
    UINT32_BYTE_FORMAT = '!I'
    INT8_BYTE_FORMAT = '!b'
    INT16_BYTE_FORMAT = '!h'
    INT32_BYTE_FORMAT = '!i'
    INT64_BYTE_FORMAT = '!q'
    PRELUDE_BYTE_FORMAT = '!III'

    # Maps the byte size of an unsigned length prefix to its unpack format.
    UINT_BYTE_FORMAT = {
        1: UINT8_BYTE_FORMAT,
        2: UINT16_BYTE_FORMAT,
        4: UINT32_BYTE_FORMAT,
    }

    @staticmethod
    def unpack_true(data):
        """Return True without consuming any of the provided bytes.

        :type data: bytes
        :param data: The bytes to parse from. This is ignored in this method.

        :rtype: (bool, int)
        :returns: The tuple (True, 0)
        """
        return True, 0

    @staticmethod
    def unpack_false(data):
        """Return False without consuming any of the provided bytes.

        :type data: bytes
        :param data: The bytes to parse from. This is ignored in this method.

        :rtype: (bool, int)
        :returns: The tuple (False, 0)
        """
        return False, 0

    @staticmethod
    def unpack_uint8(data):
        """Parse an unsigned 8-bit integer from the bytes.

        :type data: bytes
        :param data: The bytes to parse from.

        :rtype: (int, int)
        :returns: A tuple containing the (parsed integer value, bytes consumed)
        :raises struct.error: If ``data`` is empty.
        """
        value = unpack(DecodeUtils.UINT8_BYTE_FORMAT, data[:1])[0]
        return value, 1

    @staticmethod
    def unpack_uint32(data):
        """Parse an unsigned 32-bit integer from the bytes.

        :type data: bytes
        :param data: The bytes to parse from.

        :rtype: (int, int)
        :returns: A tuple containing the (parsed integer value, bytes consumed)
        :raises struct.error: If ``data`` holds fewer than 4 bytes.
        """
        value = unpack(DecodeUtils.UINT32_BYTE_FORMAT, data[:4])[0]
        return value, 4

    @staticmethod
    def unpack_int8(data):
        """Parse a signed 8-bit integer from the bytes.

        :type data: bytes
        :param data: The bytes to parse from.

        :rtype: (int, int)
        :returns: A tuple containing the (parsed integer value, bytes consumed)
        :raises struct.error: If ``data`` is empty.
        """
        value = unpack(DecodeUtils.INT8_BYTE_FORMAT, data[:1])[0]
        return value, 1

    @staticmethod
    def unpack_int16(data):
        """Parse a signed 16-bit integer from the bytes.

        :type data: bytes
        :param data: The bytes to parse from.

        :rtype: (int, int)
        :returns: A tuple containing the (parsed integer value, bytes consumed)
        :raises struct.error: If ``data`` holds fewer than 2 bytes.
        """
        value = unpack(DecodeUtils.INT16_BYTE_FORMAT, data[:2])[0]
        return value, 2

    @staticmethod
    def unpack_int32(data):
        """Parse a signed 32-bit integer from the bytes.

        :type data: bytes
        :param data: The bytes to parse from.

        :rtype: (int, int)
        :returns: A tuple containing the (parsed integer value, bytes consumed)
        :raises struct.error: If ``data`` holds fewer than 4 bytes.
        """
        value = unpack(DecodeUtils.INT32_BYTE_FORMAT, data[:4])[0]
        return value, 4

    @staticmethod
    def unpack_int64(data):
        """Parse a signed 64-bit integer from the bytes.

        :type data: bytes
        :param data: The bytes to parse from.

        :rtype: (int, int)
        :returns: A tuple containing the (parsed integer value, bytes consumed)
        :raises struct.error: If ``data`` holds fewer than 8 bytes.
        """
        value = unpack(DecodeUtils.INT64_BYTE_FORMAT, data[:8])[0]
        return value, 8

    @staticmethod
    def unpack_byte_array(data, length_byte_size=2):
        """Parse a variable length byte array from the bytes.

        The bytes are expected to be in the following format:
        [ length ][0 ... length bytes]
        where length is an unsigned integer represented in the smallest number
        of bytes to hold the maximum length of the array.

        If ``data`` is shorter than the declared length, the returned array
        is truncated to what is available, but the reported number of bytes
        consumed still reflects the declared length.

        :type data: bytes
        :param data: The bytes to parse from.

        :type length_byte_size: int
        :param length_byte_size: The byte size of the preceding integer that
            represents the length of the array. Supported values are 1, 2, and 4.

        :rtype: (bytes, int)
        :returns: A tuple containing the (parsed byte array, bytes consumed).
        :raises KeyError: If ``length_byte_size`` is not 1, 2 or 4.
        :raises struct.error: If ``data`` is too short to hold the length.
        """
        uint_byte_format = DecodeUtils.UINT_BYTE_FORMAT[length_byte_size]
        length = unpack(uint_byte_format, data[:length_byte_size])[0]
        bytes_end = length + length_byte_size
        array_bytes = data[length_byte_size:bytes_end]
        return array_bytes, bytes_end

    @staticmethod
    def unpack_utf8_string(data, length_byte_size=2):
        """Parse a variable length utf-8 string from the bytes.

        The bytes are expected to be in the following format:
        [ length ][0 ... length bytes]
        where length is an unsigned integer represented in the smallest number
        of bytes to hold the maximum length of the array and the following
        bytes are a valid utf-8 string.

        :type data: bytes
        :param data: The bytes to parse from.

        :type length_byte_size: int
        :param length_byte_size: The byte size of the preceding integer that
            represents the length of the array. Supported values are 1, 2, and 4.

        :rtype: (str, int)
        :returns: A tuple containing the (utf-8 string, bytes consumed).
        :raises UnicodeDecodeError: If the string bytes are not valid utf-8.
        """
        array_bytes, consumed = DecodeUtils.unpack_byte_array(
            data, length_byte_size
        )
        return array_bytes.decode('utf-8'), consumed

    @staticmethod
    def unpack_uuid(data):
        """Parse a 16-byte uuid from the bytes.

        :type data: bytes
        :param data: The bytes to parse from.

        :rtype: (bytes, int)
        :returns: A tuple containing the (uuid bytes, bytes consumed).
        """
        return data[:16], 16

    @staticmethod
    def unpack_prelude(data):
        """Parse the prelude for an event stream message from the bytes.

        The prelude for an event stream message has the following format:
        [total_length][header_length][prelude_crc]
        where each field is an unsigned 32-bit integer.

        Like the other ``unpack_*`` helpers, only the leading bytes that make
        up the prelude are read, so ``data`` may extend past the prelude.

        :type data: bytes
        :param data: The bytes to parse from; must start with the prelude.

        :rtype: ((int, int, int), int)
        :returns: A tuple of ((total_length, headers_length, prelude_crc),
            consumed)
        :raises struct.error: If ``data`` is shorter than the prelude.
        """
        # Slice so that trailing bytes after the prelude are tolerated;
        # struct.unpack requires the buffer size to match the format exactly.
        prelude_bytes = data[:_PRELUDE_LENGTH]
        return (
            unpack(DecodeUtils.PRELUDE_BYTE_FORMAT, prelude_bytes),
            _PRELUDE_LENGTH,
        )
282
283
def _validate_checksum(data, checksum, crc=0):
    """Check that the CRC32 of ``data`` (seeded with ``crc``) equals ``checksum``.

    Raises ChecksumMismatch when the two values differ; returns None otherwise.
    """
    expected = checksum
    # Masking with 0xFFFFFFFF pins the result to an unsigned 32-bit value so
    # the comparison is identical on every Python version and platform.
    actual = crc32(data, crc) & 0xFFFFFFFF
    if actual == expected:
        return
    raise ChecksumMismatch(expected, actual)
290
291
class MessagePrelude:
    """The decoded prelude of one event stream message.

    Offsets exposed by this class are measured from the first byte of the
    message, which is laid out as: prelude, headers, payload, and a trailing
    4-byte message CRC.
    """

    def __init__(self, total_length, headers_length, crc):
        self.total_length = total_length
        self.headers_length = headers_length
        self.crc = crc

    @property
    def payload_length(self):
        """Number of payload bytes carried by the message.

        This is what is left of ``total_length`` once the prelude, the
        headers and the 4-byte message CRC are taken away.

        :rtype: int
        :returns: The total payload length.
        """
        non_payload_bytes = _PRELUDE_LENGTH + self.headers_length + 4
        return self.total_length - non_payload_bytes

    @property
    def payload_end(self):
        """Offset of the first byte after the payload.

        Only the trailing 4-byte message CRC follows the payload.

        :rtype: int
        :returns: The byte offset from the beginning of the event stream
            message to the end of the payload.
        """
        message_crc_size = 4
        return self.total_length - message_crc_size

    @property
    def headers_end(self):
        """Offset of the first byte after the headers section.

        :rtype: int
        :returns: The byte offset from the beginning of the event stream
            message to the end of the headers.
        """
        return self.headers_length + _PRELUDE_LENGTH
332
333
class EventStreamMessage:
    """A single, fully decoded event stream message."""

    def __init__(self, prelude, headers, payload, crc):
        self.prelude = prelude
        self.headers = headers
        self.payload = payload
        self.crc = crc

    def to_response_dict(self, status_code=200):
        """Express this message as a response-style dictionary.

        A message whose ``:message-type`` header is ``error`` or ``exception``
        always reports status 400; any other message reports ``status_code``.

        :type status_code: int
        :param status_code: The status code to report for non-error messages.

        :rtype: dict
        :returns: A dict with ``status_code``, ``headers`` and ``body`` keys.
        """
        is_error = self.headers.get(':message-type') in ('error', 'exception')
        return {
            'status_code': 400 if is_error else status_code,
            'headers': self.headers,
            'body': self.payload,
        }
352
353
class EventStreamHeaderParser:
    """Parses the event headers from an event stream message.

    Expects all of the header data upfront and creates a dictionary of headers
    to return. This object can be reused multiple times to parse the headers
    from multiple event stream messages.

    Each header is encoded as a name (utf-8 string with a 1-byte length
    prefix), a 1-byte value type, and a value whose encoding depends on that
    type (see ``_HEADER_TYPE_MAP``).
    """

    # Maps header type to appropriate unpacking function
    # These unpacking functions return the value and the amount unpacked.
    # byte_array and string values use the unpackers' default 2-byte length
    # prefix.
    _HEADER_TYPE_MAP = {
        # boolean_true
        0: DecodeUtils.unpack_true,
        # boolean_false
        1: DecodeUtils.unpack_false,
        # byte
        2: DecodeUtils.unpack_int8,
        # short
        3: DecodeUtils.unpack_int16,
        # integer
        4: DecodeUtils.unpack_int32,
        # long
        5: DecodeUtils.unpack_int64,
        # byte_array
        6: DecodeUtils.unpack_byte_array,
        # string
        7: DecodeUtils.unpack_utf8_string,
        # timestamp
        8: DecodeUtils.unpack_int64,
        # uuid
        9: DecodeUtils.unpack_uuid,
    }

    def __init__(self):
        # Header bytes that have not been consumed yet; set by parse().
        self._data = None

    def parse(self, data):
        """Parses the event stream headers from an event stream message.

        :type data: bytes
        :param data: The bytes that correspond to the headers section of an
            event stream message.

        :rtype: dict
        :returns: A dictionary of header key, value pairs.
        :raises DuplicateHeader: If a header name occurs more than once.
        :raises KeyError: If a header declares a value type that is not in
            ``_HEADER_TYPE_MAP``.
        """
        self._data = data
        return self._parse_headers()

    def _parse_headers(self):
        # Keep reading headers until every byte of the section is consumed.
        headers = {}
        while self._data:
            name, value = self._parse_header()
            if name in headers:
                raise DuplicateHeader(name)
            headers[name] = value
        return headers

    def _parse_header(self):
        name = self._parse_name()
        value = self._parse_value()
        return name, value

    def _parse_name(self):
        # Header names carry a 1-byte length prefix.
        name, consumed = DecodeUtils.unpack_utf8_string(self._data, 1)
        self._advance_data(consumed)
        return name

    def _parse_type(self):
        # Named ``header_type`` rather than ``type`` to avoid shadowing the
        # builtin.
        header_type, consumed = DecodeUtils.unpack_uint8(self._data)
        self._advance_data(consumed)
        return header_type

    def _parse_value(self):
        header_type = self._parse_type()
        value_unpacker = self._HEADER_TYPE_MAP[header_type]
        value, consumed = value_unpacker(self._data)
        self._advance_data(consumed)
        return value

    def _advance_data(self, consumed):
        self._data = self._data[consumed:]
436
437
class EventStreamBuffer:
    """Streaming based event stream buffer

    A buffer class that wraps bytes from an event stream providing parsed
    messages as they become available via an iterable interface.

    Bytes are appended with ``add_data``. Iterating yields every complete
    message currently buffered and stops as soon as the next message is not
    fully available; iteration can be resumed after more data is added.
    """

    def __init__(self):
        # Unconsumed bytes; always begins at the start of a message.
        self._data = b''
        # Prelude of the message at the front of ``_data`` once parsed, or
        # None if it has not been parsed yet.
        self._prelude = None
        self._header_parser = EventStreamHeaderParser()

    def add_data(self, data):
        """Add data to the buffer.

        :type data: bytes
        :param data: The bytes to add to the buffer to be used when parsing
        """
        self._data += data

    def _validate_prelude(self, prelude):
        """Reject preludes whose declared lengths cannot be honored.

        :raises InvalidHeadersLength: If the headers exceed the maximum.
        :raises InvalidPayloadLength: If the payload exceeds the maximum.
        :raises ParserError: If ``total_length`` is too small to hold the
            prelude, the declared headers and the 4-byte message CRC.
        """
        if prelude.headers_length > _MAX_HEADERS_LENGTH:
            raise InvalidHeadersLength(prelude.headers_length)

        if prelude.payload_length > _MAX_PAYLOAD_LENGTH:
            raise InvalidPayloadLength(prelude.payload_length)

        # A negative payload length means total_length is shorter than the
        # prelude + headers + message CRC. The offsets derived from it would
        # then be negative or overlap, so slicing would read the wrong bytes
        # or fail with a struct.error instead of a ParserError.
        if prelude.payload_length < 0:
            raise ParserError(
                f'Total length of {prelude.total_length} is too small for a '
                f'message with {prelude.headers_length} bytes of headers'
            )

    def _parse_prelude(self):
        prelude_bytes = self._data[:_PRELUDE_LENGTH]
        raw_prelude, _ = DecodeUtils.unpack_prelude(prelude_bytes)
        prelude = MessagePrelude(*raw_prelude)
        # The minus 4 removes the prelude crc from the bytes to be checked
        _validate_checksum(prelude_bytes[: _PRELUDE_LENGTH - 4], prelude.crc)
        self._validate_prelude(prelude)
        return prelude

    def _parse_headers(self):
        header_bytes = self._data[_PRELUDE_LENGTH : self._prelude.headers_end]
        return self._header_parser.parse(header_bytes)

    def _parse_payload(self):
        prelude = self._prelude
        payload_bytes = self._data[prelude.headers_end : prelude.payload_end]
        return payload_bytes

    def _parse_message_crc(self):
        prelude = self._prelude
        crc_bytes = self._data[prelude.payload_end : prelude.total_length]
        message_crc, _ = DecodeUtils.unpack_uint32(crc_bytes)
        return message_crc

    def _parse_message_bytes(self):
        # The message CRC covers everything before it. The first 8 prelude
        # bytes are already folded in through the prelude CRC (used as the
        # crc32 seed), so hashing resumes at the prelude CRC field itself.
        message_bytes = self._data[
            _PRELUDE_LENGTH - 4 : self._prelude.payload_end
        ]
        return message_bytes

    def _validate_message_crc(self):
        message_crc = self._parse_message_crc()
        message_bytes = self._parse_message_bytes()
        _validate_checksum(message_bytes, message_crc, crc=self._prelude.crc)
        return message_crc

    def _parse_message(self):
        crc = self._validate_message_crc()
        headers = self._parse_headers()
        payload = self._parse_payload()
        message = EventStreamMessage(self._prelude, headers, payload, crc)
        self._prepare_for_next_message()
        return message

    def _prepare_for_next_message(self):
        # Advance the data and reset the current prelude
        self._data = self._data[self._prelude.total_length :]
        self._prelude = None

    def next(self):
        """Provides the next available message parsed from the stream

        The prelude of a partially received message is parsed and validated
        once and then cached, so later calls only wait for the remainder.

        :rtype: EventStreamMessage
        :returns: The next event stream message
        :raises StopIteration: If a complete message is not buffered yet.
        """
        if len(self._data) < _PRELUDE_LENGTH:
            raise StopIteration()

        if self._prelude is None:
            self._prelude = self._parse_prelude()

        if len(self._data) < self._prelude.total_length:
            raise StopIteration()

        return self._parse_message()

    def __next__(self):
        return self.next()

    def __iter__(self):
        return self
537
538
class EventStream:
    """Wrapper class for an event stream body.

    The underlying streaming body is read chunk by chunk, split into
    individual event stream messages, and each message is parsed and yielded
    through the iterator interface as soon as it is complete.

    The following example uses the S3 select API to get structured data out of
    an object stored in S3 using an event stream.

    **Example:**
    ::
        from botocore.session import Session

        s3 = Session().create_client('s3')
        response = s3.select_object_content(
            Bucket='bucketname',
            Key='keyname',
            ExpressionType='SQL',
            RequestProgress={'Enabled': True},
            Expression="SELECT * FROM S3Object s",
            InputSerialization={'CSV': {}},
            OutputSerialization={'CSV': {}},
        )
        # This is the event stream in the response
        event_stream = response['Payload']
        end_event_received = False
        with open('output', 'wb') as f:
            # Iterate over events in the event stream as they come
            for event in event_stream:
                # If we received a records event, write the data to a file
                if 'Records' in event:
                    data = event['Records']['Payload']
                    f.write(data)
                # If we received a progress event, print the details
                elif 'Progress' in event:
                    print(event['Progress']['Details'])
                # End event indicates that the request finished successfully
                elif 'End' in event:
                    print('Result is complete')
                    end_event_received = True
        if not end_event_received:
            raise Exception("End event not received, request incomplete.")
    """

    def __init__(self, raw_stream, output_shape, parser, operation_name):
        self._raw_stream = raw_stream
        self._output_shape = output_shape
        self._operation_name = operation_name
        self._parser = parser
        # Created eagerly but lazy: nothing is read until it is advanced.
        self._event_generator = self._create_raw_event_generator()

    def __iter__(self):
        # Parse every raw message; empty (falsy) parse results are dropped.
        for raw_message in self._event_generator:
            parsed = self._parse_event(raw_message)
            if not parsed:
                continue
            yield parsed

    def _create_raw_event_generator(self):
        buffer = EventStreamBuffer()
        for chunk in self._raw_stream.stream():
            buffer.add_data(chunk)
            # Drain every message completed by this chunk.
            yield from buffer

    def _parse_event(self, event):
        response_dict = event.to_response_dict()
        parsed_response = self._parser.parse(response_dict, self._output_shape)
        if response_dict['status_code'] != 200:
            raise EventStreamError(parsed_response, self._operation_name)
        return parsed_response

    def get_initial_response(self):
        """Consume the first raw message and return it if it is the initial response.

        The first message is consumed even when it is not an
        initial-response event.

        :raises NoInitialResponseError: If the stream is empty or its first
            message has a different ``:event-type``.
        """
        exhausted = object()
        first_message = next(self._event_generator, exhausted)
        if first_message is not exhausted:
            if first_message.headers.get(':event-type') == 'initial-response':
                return first_message
        raise NoInitialResponseError()

    def close(self):
        """Closes the underlying streaming body."""
        self._raw_stream.close()