1# Copyright 2012-2014 Amazon.com, Inc. or its affiliates. All Rights Reserved.
2#
3# Licensed under the Apache License, Version 2.0 (the "License"). You
4# may not use this file except in compliance with the License. A copy of
5# the License is located at
6#
7# http://aws.amazon.com/apache2.0/
8#
9# or in the "license" file accompanying this file. This file is
10# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
11# ANY KIND, either express or implied. See the License for the specific
12# language governing permissions and limitations under the License.
13
14import base64
15import json
16import logging
17from functools import partial
18from itertools import tee
19
20import jmespath
21
22from botocore.context import with_current_context
23from botocore.exceptions import PaginationError
24from botocore.useragent import register_feature_id
25from botocore.utils import merge_dicts, set_value_from_jmespath
26
27log = logging.getLogger(__name__)
28
29
class TokenEncoder:
    """Encodes dictionaries into opaque strings.

    This for the most part json dumps + base64 encoding, but also supports
    having bytes in the dictionary in addition to the types that json can
    handle by default.

    This is intended for use in encoding pagination tokens, which in some
    cases can be complex structures and / or contain bytes.
    """

    def encode(self, token):
        """Encodes a dictionary to an opaque string.

        :type token: dict
        :param token: A dictionary containing pagination information,
            particularly the service pagination token(s) but also other boto
            metadata.

        :rtype: str
        :returns: An opaque string
        """
        try:
            # Optimistically serialize as-is; the overwhelming majority of
            # tokens are plain JSON-compatible dicts.
            serialized = json.dumps(token)
        except (TypeError, UnicodeDecodeError):
            # The token contains bytes somewhere. Walk the structure,
            # base64-encoding each bytes value and remembering its path.
            transformed, key_paths = self._transform(token, [])
            # Record every encoded path so the decoder can reverse the
            # transformation. No service will ever use this key.
            transformed['boto_encoded_keys'] = key_paths
            serialized = json.dumps(transformed)
        # base64 encode the json string to produce an opaque token string.
        return base64.b64encode(serialized.encode('utf-8')).decode('utf-8')

    def _transform(self, value, path):
        """Recursively base64-encode any bytes within ``value``.

        Returns a tuple of (converted value, list of paths at which a
        bytes value was encoded).
        """
        if isinstance(value, bytes):
            # Leaf bytes value: encode it and report its location.
            return base64.b64encode(value).decode('utf-8'), [path]
        if isinstance(value, dict):
            converted = {}
            paths = []
            for key, item in value.items():
                new_item, found = self._transform(item, path + [key])
                converted[key] = new_item
                paths.extend(found)
            return converted, paths
        if isinstance(value, list):
            converted = []
            paths = []
            for index, item in enumerate(value):
                new_item, found = self._transform(item, path + [index])
                converted.append(new_item)
                paths.extend(found)
            return converted, paths
        # Any other type is already JSON-serializable; pass it through.
        return value, []
106
107
class TokenDecoder:
    """Decodes token strings back into dictionaries.

    This performs the inverse operation to the TokenEncoder, accepting
    opaque strings and decoding them into a useable form.
    """

    def decode(self, token):
        """Decodes an opaque string to a dictionary.

        :type token: str
        :param token: A token string given by the botocore pagination
            interface.

        :rtype: dict
        :returns: A dictionary containing pagination information,
            particularly the service pagination token(s) but also other boto
            metadata.
        """
        payload = json.loads(
            base64.b64decode(token.encode('utf-8')).decode('utf-8')
        )
        # Pop the encoding metadata; it is only needed during decoding.
        key_paths = payload.pop('boto_encoded_keys', None)
        if key_paths is None:
            return payload
        # Each recorded path identifies a value the encoder base64-encoded;
        # walk down to its container and restore the original bytes.
        # jmespath isn't used here because it would be difficult to actually
        # create the jmespath query when taking all of the unknowns of key
        # structure into account. Gross though this is, it is simple and not
        # very error prone.
        for path in key_paths:
            container = payload
            for step in path[:-1]:
                container = container[step]
            leaf = path[-1]
            container[leaf] = base64.b64decode(
                container[leaf].encode('utf-8')
            )
        return payload
174
175
class PaginatorModel:
    """Provides access to a service's pagination configurations."""

    def __init__(self, paginator_config):
        # Only the 'pagination' section of the loaded config is relevant.
        self._paginator_config = paginator_config['pagination']

    def get_paginator(self, operation_name):
        """Return the pagination config for a single operation.

        :raises ValueError: if no paginator is defined for the operation.
        """
        if operation_name not in self._paginator_config:
            raise ValueError(
                f"Paginator for operation does not exist: {operation_name}"
            )
        return self._paginator_config[operation_name]
188
189
class PageIterator:
    """An iterable object to paginate API results.
    Please note it is NOT a python iterator.
    Use ``iter`` to wrap this as a generator.
    """

    def __init__(
        self,
        method,
        input_token,
        output_token,
        more_results,
        result_keys,
        non_aggregate_keys,
        limit_key,
        max_items,
        starting_token,
        page_size,
        op_kwargs,
    ):
        # The client method invoked once per page.
        self._method = method
        # Name(s) of the request parameter(s) that carry the pagination
        # token into the next request.
        self._input_token = input_token
        # Compiled search expression(s) that extract the next token
        # value(s) from a parsed response.
        self._output_token = output_token
        # Optional compiled expression; a falsy search result means no
        # further pages exist.
        self._more_results = more_results
        # Compiled expression(s) locating the paginated result data.
        self._result_keys = result_keys
        # Cap on the total number of items yielded across all pages.
        self._max_items = max_items
        # Name of the request parameter that limits page size.
        self._limit_key = limit_key
        # Opaque encoded token (or deprecated '___'-delimited string)
        # identifying where to resume pagination.
        self._starting_token = starting_token
        self._page_size = page_size
        self._op_kwargs = op_kwargs
        # Populated when iteration stops early; exposed via the
        # ``resume_token`` property so callers can continue later.
        self._resume_token = None
        self._non_aggregate_key_exprs = non_aggregate_keys
        self._non_aggregate_part = {}
        self._token_encoder = TokenEncoder()
        self._token_decoder = TokenDecoder()

    @property
    def result_keys(self):
        # Compiled expression(s) identifying where results live in a page.
        return self._result_keys

    @property
    def resume_token(self):
        """Token to specify to resume pagination."""
        return self._resume_token

    @resume_token.setter
    def resume_token(self, value):
        if not isinstance(value, dict):
            raise ValueError(f"Bad starting token: {value}")

        # The token dict must contain exactly the input-token keys (plus
        # the internal truncation marker, when present) — no more, no less.
        if 'boto_truncate_amount' in value:
            token_keys = sorted(self._input_token + ['boto_truncate_amount'])
        else:
            token_keys = sorted(self._input_token)
        dict_keys = sorted(value.keys())

        if token_keys == dict_keys:
            # Store the token in its opaque, encoded string form.
            self._resume_token = self._token_encoder.encode(value)
        else:
            raise ValueError(f"Bad starting token: {value}")

    @property
    def non_aggregate_part(self):
        # Values recorded from the first page that are not aggregated
        # across pages (see _record_non_aggregate_key_values).
        return self._non_aggregate_part

    def __iter__(self):
        """Make one request per iteration, yielding each (possibly
        truncated) response until tokens are exhausted or max_items is
        reached."""
        current_kwargs = self._op_kwargs
        previous_next_token = None
        next_token = {key: None for key in self._input_token}
        if self._starting_token is not None:
            # If the starting token exists, populate the next_token with the
            # values inside it. This ensures that we have the service's
            # pagination token on hand if we need to truncate after the
            # first response.
            next_token = self._parse_starting_token()[0]
        # The number of items from result_key we've seen so far.
        total_items = 0
        first_request = True
        primary_result_key = self.result_keys[0]
        starting_truncation = 0
        self._inject_starting_params(current_kwargs)
        while True:
            response = self._make_request(current_kwargs)
            parsed = self._extract_parsed_response(response)
            if first_request:
                # The first request is handled differently. We could
                # possibly have a resume/starting token that tells us where
                # to index into the retrieved page.
                if self._starting_token is not None:
                    starting_truncation = self._handle_first_request(
                        parsed, primary_result_key, starting_truncation
                    )
                first_request = False
                self._record_non_aggregate_key_values(parsed)
            else:
                # If this isn't the first request, we have already sliced into
                # the first request and had to make additional requests after.
                # We no longer need to add this to truncation.
                starting_truncation = 0
            current_response = primary_result_key.search(parsed)
            if current_response is None:
                current_response = []
            num_current_response = len(current_response)
            truncate_amount = 0
            if self._max_items is not None:
                truncate_amount = (
                    total_items + num_current_response - self._max_items
                )
            if truncate_amount > 0:
                # Yielding this whole page would exceed max_items: trim the
                # page in place (this also sets the resume token) and stop.
                self._truncate_response(
                    parsed,
                    primary_result_key,
                    truncate_amount,
                    starting_truncation,
                    next_token,
                )
                yield response
                break
            else:
                yield response
                total_items += num_current_response
                next_token = self._get_next_token(parsed)
                if all(t is None for t in next_token.values()):
                    # No token values at all means no further pages.
                    break
                if (
                    self._max_items is not None
                    and total_items == self._max_items
                ):
                    # We're on a page boundary so we can set the current
                    # next token to be the resume token.
                    self.resume_token = next_token
                    break
                if (
                    previous_next_token is not None
                    and previous_next_token == next_token
                ):
                    # A repeated token would loop forever; fail loudly.
                    message = (
                        f"The same next token was received twice: {next_token}"
                    )
                    raise PaginationError(message=message)
                self._inject_token_into_kwargs(current_kwargs, next_token)
                previous_next_token = next_token

    def search(self, expression):
        """Applies a JMESPath expression to a paginator

        Each page of results is searched using the provided JMESPath
        expression. If the result is not a list, it is yielded
        directly. If the result is a list, each element in the result
        is yielded individually (essentially implementing a flatmap in
        which the JMESPath search is the mapping function).

        :type expression: str
        :param expression: JMESPath expression to apply to each page.

        :return: Returns an iterator that yields the individual
            elements of applying a JMESPath expression to each page of
            results.
        """
        compiled = jmespath.compile(expression)
        for page in self:
            results = compiled.search(page)
            if isinstance(results, list):
                yield from results
            else:
                # Yield result directly if it is not a list.
                yield results

    @with_current_context(partial(register_feature_id, 'PAGINATOR'))
    def _make_request(self, current_kwargs):
        """Invoke the underlying operation method with the current kwargs."""
        return self._method(**current_kwargs)

    def _extract_parsed_response(self, response):
        """Return the parsed data for a response; the identity here
        (presumably a hook point for subclasses — confirm with callers)."""
        return response

    def _record_non_aggregate_key_values(self, response):
        """Capture the non-aggregate key values from the first page."""
        non_aggregate_keys = {}
        for expression in self._non_aggregate_key_exprs:
            result = expression.search(response)
            set_value_from_jmespath(
                non_aggregate_keys, expression.expression, result
            )
        self._non_aggregate_part = non_aggregate_keys

    def _inject_starting_params(self, op_kwargs):
        # If the user has specified a starting token we need to
        # inject that into the operation's kwargs.
        if self._starting_token is not None:
            # Don't need to do anything special if there is no starting
            # token specified.
            next_token = self._parse_starting_token()[0]
            self._inject_token_into_kwargs(op_kwargs, next_token)
        if self._page_size is not None:
            # Pass the page size as the parameter name for limiting
            # page size, also known as the limit_key.
            op_kwargs[self._limit_key] = self._page_size

    def _inject_token_into_kwargs(self, op_kwargs, next_token):
        """Copy token values into the request kwargs.

        A value of ``None`` (or the legacy string ``'None'``) means the
        parameter should not be sent, so any stale value is removed.
        """
        for name, token in next_token.items():
            if (token is not None) and (token != 'None'):
                op_kwargs[name] = token
            elif name in op_kwargs:
                del op_kwargs[name]

    def _handle_first_request(
        self, parsed, primary_result_key, starting_truncation
    ):
        """Slice the first page according to the starting token's index.

        Returns the number of items skipped from the front of the
        primary result so later truncation can account for it.
        """
        # If the payload is an array or string, we need to slice into it
        # and only return the truncated amount.
        starting_truncation = self._parse_starting_token()[1]
        all_data = primary_result_key.search(parsed)
        if isinstance(all_data, (list, str)):
            data = all_data[starting_truncation:]
        else:
            data = None
        set_value_from_jmespath(parsed, primary_result_key.expression, data)
        # We also need to truncate any secondary result keys
        # because they were not truncated in the previous last
        # response.
        for token in self.result_keys:
            if token == primary_result_key:
                continue
            sample = token.search(parsed)
            # Blank out the secondary key with a type-appropriate
            # empty value.
            if isinstance(sample, list):
                empty_value = []
            elif isinstance(sample, str):
                empty_value = ''
            elif isinstance(sample, (int, float)):
                empty_value = 0
            else:
                empty_value = None
            set_value_from_jmespath(parsed, token.expression, empty_value)
        return starting_truncation

    def _truncate_response(
        self,
        parsed,
        primary_result_key,
        truncate_amount,
        starting_truncation,
        next_token,
    ):
        """Trim the final page in place and record the resume token."""
        original = primary_result_key.search(parsed)
        if original is None:
            original = []
        amount_to_keep = len(original) - truncate_amount
        truncated = original[:amount_to_keep]
        set_value_from_jmespath(
            parsed, primary_result_key.expression, truncated
        )
        # The issue here is that even though we know how much we've truncated
        # we need to account for this globally including any starting
        # left truncation. For example:
        # Raw response: [0,1,2,3]
        # Starting index: 1
        # Max items: 1
        # Starting left truncation: [1, 2, 3]
        # End right truncation for max items: [1]
        # However, even though we only kept 1, this is post
        # left truncation so the next starting index should be 2, not 1
        # (left_truncation + amount_to_keep).
        next_token['boto_truncate_amount'] = (
            amount_to_keep + starting_truncation
        )
        self.resume_token = next_token

    def _get_next_token(self, parsed):
        """Extract the next pagination token value(s) from a parsed page."""
        if self._more_results is not None:
            # The service explicitly indicates whether more results exist.
            if not self._more_results.search(parsed):
                return {}
        next_tokens = {}
        for output_token, input_key in zip(
            self._output_token, self._input_token
        ):
            next_token = output_token.search(parsed)
            # We do not want to include any empty strings as actual tokens.
            # Treat them as None.
            if next_token:
                next_tokens[input_key] = next_token
            else:
                next_tokens[input_key] = None
        return next_tokens

    def result_key_iters(self):
        """Return one ResultKeyIterator per result key, sharing the pages
        of this iterator via itertools.tee."""
        teed_results = tee(self, len(self.result_keys))
        return [
            ResultKeyIterator(i, result_key)
            for i, result_key in zip(teed_results, self.result_keys)
        ]

    def build_full_result(self):
        """Aggregate every page into a single result dictionary.

        List results are concatenated and numeric/string results are
        added across pages; a ``NextToken`` entry is included when
        iteration stopped before all results were consumed.
        """
        complete_result = {}
        for response in self:
            page = response
            # We want to try to catch operation object pagination
            # and format correctly for those. They come in the form
            # of a tuple of two elements: (http_response, parsed_responsed).
            # We want the parsed_response as that is what the page iterator
            # uses. We can remove it though once operation objects are removed.
            if isinstance(response, tuple) and len(response) == 2:
                page = response[1]
            # We're incrementally building the full response page
            # by page. For each page in the response we need to
            # inject the necessary components from the page
            # into the complete_result.
            for result_expression in self.result_keys:
                # In order to incrementally update a result key
                # we need to search the existing value from complete_result,
                # then we need to search the _current_ page for the
                # current result key value. Then we append the current
                # value onto the existing value, and re-set that value
                # as the new value.
                result_value = result_expression.search(page)
                if result_value is None:
                    continue
                existing_value = result_expression.search(complete_result)
                if existing_value is None:
                    # Set the initial result
                    set_value_from_jmespath(
                        complete_result,
                        result_expression.expression,
                        result_value,
                    )
                    continue
                # Now both result_value and existing_value contain something
                if isinstance(result_value, list):
                    existing_value.extend(result_value)
                elif isinstance(result_value, (int, float, str)):
                    # Modify the existing result with the sum or concatenation
                    set_value_from_jmespath(
                        complete_result,
                        result_expression.expression,
                        existing_value + result_value,
                    )
        merge_dicts(complete_result, self.non_aggregate_part)
        if self.resume_token is not None:
            complete_result['NextToken'] = self.resume_token
        return complete_result

    def _parse_starting_token(self):
        """Decode the starting token into ``(token_dict, truncation_index)``."""
        if self._starting_token is None:
            return None

        # The starting token is a dict passed as a base64 encoded string.
        next_token = self._starting_token
        try:
            next_token = self._token_decoder.decode(next_token)
            index = 0
            if 'boto_truncate_amount' in next_token:
                # Internal marker recording how far into the first page we
                # previously sliced; stripped before use as a service token.
                index = next_token.get('boto_truncate_amount')
                del next_token['boto_truncate_amount']
        except (ValueError, TypeError):
            # The token is not in the new encoded-dict format; fall back
            # to the deprecated '___'-delimited parser.
            next_token, index = self._parse_starting_token_deprecated()
        return next_token, index

    def _parse_starting_token_deprecated(self):
        """
        This handles parsing of old style starting tokens, and attempts to
        coerce them into the new style.
        """
        log.debug(
            "Attempting to fall back to old starting token parser. For "
            f"token: {self._starting_token}"
        )
        if self._starting_token is None:
            return None

        parts = self._starting_token.split('___')
        next_token = []
        index = 0
        if len(parts) == len(self._input_token) + 1:
            try:
                # The trailing part of an old-style token is the
                # truncation index.
                index = int(parts.pop())
            except ValueError:
                # This doesn't look like a valid old-style token, so we're
                # passing it along as an opaque service token.
                parts = [self._starting_token]

        for part in parts:
            if part == 'None':
                next_token.append(None)
            else:
                next_token.append(part)
        return self._convert_deprecated_starting_token(next_token), index

    def _convert_deprecated_starting_token(self, deprecated_token):
        """
        This attempts to convert a deprecated starting token into the new
        style.
        """
        len_deprecated_token = len(deprecated_token)
        len_input_token = len(self._input_token)
        if len_deprecated_token > len_input_token:
            raise ValueError(f"Bad starting token: {self._starting_token}")
        elif len_deprecated_token < len_input_token:
            log.debug(
                "Old format starting token does not contain all input "
                "tokens. Setting the rest, in order, as None."
            )
            for i in range(len_input_token - len_deprecated_token):
                deprecated_token.append(None)
        return dict(zip(self._input_token, deprecated_token))
592
593
class Paginator:
    """Creates page iterators for a paginated operation.

    A ``Paginator`` is built from an operation method, the service's
    pagination configuration, and the operation model.  Its
    :meth:`paginate` method returns a ``PAGE_ITERATOR_CLS`` instance
    that yields one response page per service call.
    """

    PAGE_ITERATOR_CLS = PageIterator

    def __init__(self, method, pagination_config, model):
        self._model = model
        self._method = method
        self._pagination_cfg = pagination_config
        self._output_token = self._get_output_tokens(self._pagination_cfg)
        self._input_token = self._get_input_tokens(self._pagination_cfg)
        self._more_results = self._get_more_results_token(self._pagination_cfg)
        self._non_aggregate_keys = self._get_non_aggregate_keys(
            self._pagination_cfg
        )
        self._result_keys = self._get_result_keys(self._pagination_cfg)
        self._limit_key = self._get_limit_key(self._pagination_cfg)

    @property
    def result_keys(self):
        # Compiled expression(s) identifying the paginated result data.
        return self._result_keys

    def _get_non_aggregate_keys(self, config):
        """Compile the configured non-aggregate key expressions."""
        return [
            jmespath.compile(key)
            for key in config.get('non_aggregate_keys', [])
        ]

    def _get_output_tokens(self, config):
        """Compile the configured output token expression(s).

        ``output_token`` may be a single expression or a list; it is
        normalized to a list of compiled expressions.
        """
        output_token = config['output_token']
        if not isinstance(output_token, list):
            output_token = [output_token]
        # Fix: the loop variable previously shadowed the ``config``
        # parameter; use a distinct name.
        return [jmespath.compile(expression) for expression in output_token]

    def _get_input_tokens(self, config):
        """Return the input token parameter name(s) as a list."""
        # Fix: read from the ``config`` argument for consistency with the
        # other ``_get_*`` helpers (previously this ignored its parameter
        # and reached into ``self._pagination_cfg`` directly).
        input_token = config['input_token']
        if not isinstance(input_token, list):
            input_token = [input_token]
        return input_token

    def _get_more_results_token(self, config):
        """Compile the 'more_results' expression, if one is configured."""
        more_results = config.get('more_results')
        if more_results is not None:
            return jmespath.compile(more_results)

    def _get_result_keys(self, config):
        """Compile the configured result key expression(s), if any."""
        result_key = config.get('result_key')
        if result_key is not None:
            if not isinstance(result_key, list):
                result_key = [result_key]
            result_key = [jmespath.compile(rk) for rk in result_key]
        return result_key

    def _get_limit_key(self, config):
        """Return the request parameter name that limits page size."""
        return config.get('limit_key')

    def paginate(self, **kwargs):
        """Create paginator object for an operation.

        This returns an iterable object. Iterating over
        this object will yield a single page of a response
        at a time.

        """
        page_params = self._extract_paging_params(kwargs)
        return self.PAGE_ITERATOR_CLS(
            self._method,
            self._input_token,
            self._output_token,
            self._more_results,
            self._result_keys,
            self._non_aggregate_keys,
            self._limit_key,
            page_params['MaxItems'],
            page_params['StartingToken'],
            page_params['PageSize'],
            kwargs,
        )

    def _extract_paging_params(self, kwargs):
        """Pop and normalize the ``PaginationConfig`` entry from kwargs.

        :raises PaginationError: if ``PageSize`` is supplied but the
            operation has no ``limit_key`` configured.
        """
        pagination_config = kwargs.pop('PaginationConfig', {})
        max_items = pagination_config.get('MaxItems', None)
        if max_items is not None:
            max_items = int(max_items)
        page_size = pagination_config.get('PageSize', None)
        if page_size is not None:
            if self._limit_key is None:
                raise PaginationError(
                    message="PageSize parameter is not supported for the "
                    "pagination interface for this operation."
                )
            input_members = self._model.input_shape.members
            limit_key_shape = input_members.get(self._limit_key)
            # Some services model the page-size parameter as a string;
            # coerce the user-supplied value to match the modeled type.
            if limit_key_shape.type_name == 'string':
                if not isinstance(page_size, str):
                    page_size = str(page_size)
            else:
                page_size = int(page_size)
        return {
            'MaxItems': max_items,
            'StartingToken': pagination_config.get('StartingToken', None),
            'PageSize': page_size,
        }
698
699
class ResultKeyIterator:
    """Iterates over the results of paginated responses.

    Each iterator is associated with a single result key.
    Iterating over this object will give you each element in
    the result key list.

    :param pages_iterator: An iterator that will give you
        pages of results (a ``PageIterator`` class).
    :param result_key: The JMESPath expression representing
        the result key.

    """

    def __init__(self, pages_iterator, result_key):
        self._pages_iterator = pages_iterator
        self.result_key = result_key

    def __iter__(self):
        for page in self._pages_iterator:
            matches = self.result_key.search(page)
            if matches is None:
                # A page with no value for this key contributes nothing.
                continue
            yield from matches