Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/botocore/hooks.py: 57%
254 statements
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:51 +0000
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:51 +0000
1# Copyright 2012-2014 Amazon.com, Inc. or its affiliates. All Rights Reserved.
2#
3# Licensed under the Apache License, Version 2.0 (the "License"). You
4# may not use this file except in compliance with the License. A copy of
5# the License is located at
6#
7# http://aws.amazon.com/apache2.0/
8#
9# or in the "license" file accompanying this file. This file is
10# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
11# ANY KIND, either express or implied. See the License for the specific
12# language governing permissions and limitations under the License.
13import copy
14import logging
15from collections import deque, namedtuple
17from botocore.compat import accepts_kwargs
18from botocore.utils import EVENT_ALIASES
20logger = logging.getLogger(__name__)
# Values stored on a trie node are kept in three ordered buckets.
_NodeList = namedtuple('NodeList', ['first', 'middle', 'last'])
# Positional indexes of the three buckets inside a ``NodeList``.
_FIRST = 0
_MIDDLE = 1
_LAST = 2


class NodeList(_NodeList):
    """A ``(first, middle, last)`` triple of lists.

    ``copy.copy()`` on an instance duplicates each of the three buckets
    (one level deep) so that appending to or removing from the copy's
    buckets does not affect the source instance.  The items held in the
    buckets are not themselves copied.
    """

    def __copy__(self):
        # Iterating a namedtuple yields its fields in declaration order,
        # i.e. first, middle, last.
        return NodeList(*(copy.copy(bucket) for bucket in self))
def first_non_none_response(responses, default=None):
    """Return the first handler response that is not ``None``.

    Intended for the ``(handler, handler_response)`` pairs produced when
    an event is emitted, when only the first meaningful return value of
    the handlers is of interest.  Example usage::

        print(first_non_none_response([(func1, None), (func2, 'foo'),
                                       (func3, 'bar')]))
        # This will print 'foo'

    :type responses: list of tuples
    :param responses: Pairs of ``(handler, handler_response)``; only the
        element at index 1 of each entry is inspected.

    :param default: Value returned when every response is ``None`` (or
        ``responses`` is empty).

    :return: The first non-``None`` response, otherwise ``default``.

    """
    return next(
        (entry[1] for entry in responses if entry[1] is not None),
        default,
    )
class BaseEventHooks:
    """Common interface and handler validation for event hook systems.

    The public ``register*`` methods validate the handler and then
    delegate to ``_register``, ``_register_first`` or ``_register_last``.
    This base class does not define those three methods, so a concrete
    subclass has to provide them.  ``emit`` and ``unregister`` are no-op
    defaults here.
    """

    def emit(self, event_name, **kwargs):
        """Call all handlers subscribed to an event.

        :type event_name: str
        :param event_name: The name of the event to emit.

        :type **kwargs: dict
        :param **kwargs: Arbitrary kwargs to pass through to the
            subscribed handlers.  The ``event_name`` will be injected
            into the kwargs so it's not necessary to add this to **kwargs.

        :rtype: list of tuples
        :return: A list of ``(handler_func, handler_func_return_value)``.
            The base implementation has no handlers and returns ``[]``.

        """
        return []

    def register(
        self, event_name, handler, unique_id=None, unique_id_uses_count=False
    ):
        """Register an event handler for a given event.

        If a ``unique_id`` is given, the handler will not be registered
        if a handler with the ``unique_id`` has already been registered.

        Handlers are called in the order they have been registered.
        Note handlers can also be registered with ``register_first()``
        and ``register_last()``.  All handlers registered with
        ``register_first()`` are called before handlers registered
        with ``register()`` which are called before handlers registered
        with ``register_last()``.

        :raises ValueError: If ``handler`` is not callable or does not
            accept ``**kwargs``.

        """
        self._verify_and_register(
            event_name,
            handler,
            unique_id,
            register_method=self._register,
            unique_id_uses_count=unique_id_uses_count,
        )

    def register_first(
        self, event_name, handler, unique_id=None, unique_id_uses_count=False
    ):
        """Register an event handler to be called first for an event.

        All event handlers registered with ``register_first()`` will
        be called before handlers registered with ``register()`` and
        ``register_last()``.

        :raises ValueError: If ``handler`` is not callable or does not
            accept ``**kwargs``.

        """
        self._verify_and_register(
            event_name,
            handler,
            unique_id,
            register_method=self._register_first,
            unique_id_uses_count=unique_id_uses_count,
        )

    def register_last(
        self, event_name, handler, unique_id=None, unique_id_uses_count=False
    ):
        """Register an event handler to be called last for an event.

        All event handlers registered with ``register_last()`` will be called
        after handlers registered with ``register_first()`` and ``register()``.

        :raises ValueError: If ``handler`` is not callable or does not
            accept ``**kwargs``.

        """
        self._verify_and_register(
            event_name,
            handler,
            unique_id,
            register_method=self._register_last,
            unique_id_uses_count=unique_id_uses_count,
        )

    def _verify_and_register(
        self,
        event_name,
        handler,
        unique_id,
        register_method,
        unique_id_uses_count,
    ):
        """Validate ``handler`` and hand it to ``register_method``.

        ``register_method`` is called positionally as
        ``register_method(event_name, handler, unique_id,
        unique_id_uses_count)``.
        """
        self._verify_is_callable(handler)
        self._verify_accept_kwargs(handler)
        register_method(event_name, handler, unique_id, unique_id_uses_count)

    def unregister(
        self,
        event_name,
        handler=None,
        unique_id=None,
        unique_id_uses_count=False,
    ):
        """Unregister an event handler for a given event.

        If no ``unique_id`` was given during registration, then the
        first instance of the event handler is removed (if the event
        handler has been registered multiple times).

        The base implementation does nothing.

        """
        pass

    def _verify_is_callable(self, func):
        """Raise ``ValueError`` unless ``func`` is callable."""
        if not callable(func):
            # An f-string is used instead of ``"%s" % func`` because the
            # latter raises TypeError when ``func`` is a tuple, masking
            # the intended ValueError.
            raise ValueError(f"Event handler {func} must be callable.")

    def _verify_accept_kwargs(self, func):
        """Verify that a callable accepts ``**kwargs``.

        :type func: callable
        :param func: A callable object.

        :raises ValueError: If ``accepts_kwargs(func)`` is false.

        :returns: True if ``func`` was verified to accept kwargs.  False
            if ``accepts_kwargs`` raised ``TypeError`` (presumably because
            the callable could not be inspected); the handler is then
            accepted without verification.

        """
        try:
            if not accepts_kwargs(func):
                raise ValueError(
                    f"Event handler {func} must accept keyword "
                    f"arguments (**kwargs)"
                )
        except TypeError:
            return False
        return True
class HierarchicalEmitter(BaseEventHooks):
    """Event emitter whose handlers are looked up by dotted-name prefix.

    Handlers are stored in a ``_PrefixTrie`` keyed by event name.  Emitting
    an event calls every handler returned by the trie's ``prefix_search``
    for that name.  Lookups are memoised per event name and the memo is
    discarded whenever registrations change.
    """

    def __init__(self):
        # A cache of event name to handler list.  It is replaced with a
        # fresh dict whenever a handler is registered or unregistered.
        self._lookup_cache = {}
        self._handlers = _PrefixTrie()
        # This is used to ensure that unique_id's are only
        # registered once.  Maps unique_id -> {'handler': ...} with an
        # additional 'count' key when the id was registered with
        # ``unique_id_uses_count=True``.
        self._unique_id_handlers = {}

    def _emit(self, event_name, kwargs, stop_on_response=False):
        """
        Emit an event with optional keyword arguments.

        :type event_name: string
        :param event_name: Name of the event
        :type kwargs: dict
        :param kwargs: Arguments to be passed to the handler functions.
            An ``event_name`` entry is added to this dict in place.
        :type stop_on_response: boolean
        :param stop_on_response: Whether to stop on the first non-None
                                 response. If False, then all handlers
                                 will be called. This is especially useful
                                 to handlers which mutate data and then
                                 want to stop propagation of the event.
        :rtype: list
        :return: List of (handler, response) tuples from all processed
                 handlers.
        """
        # Invoke the event handlers from most specific
        # to least specific, each time stripping off a dot.
        handlers_to_call = self._lookup_cache.get(event_name)
        if handlers_to_call is None:
            handlers_to_call = self._handlers.prefix_search(event_name)
            self._lookup_cache[event_name] = handlers_to_call
        elif not handlers_to_call:
            # Short circuit and return an empty response if we have
            # no handlers to call.  This is the common case where
            # for the majority of signals, nothing is listening.
            return []
        kwargs['event_name'] = event_name
        responses = []
        for handler in handlers_to_call:
            logger.debug('Event %s: calling handler %s', event_name, handler)
            response = handler(**kwargs)
            responses.append((handler, response))
            if stop_on_response and response is not None:
                return responses
        return responses

    def emit(self, event_name, **kwargs):
        """
        Emit an event by name with arguments passed as keyword args.

            >>> responses = emitter.emit(
            ...     'my-event.service.operation', arg1='one', arg2='two')

        :rtype: list
        :return: List of (handler, response) tuples from all processed
                 handlers.
        """
        return self._emit(event_name, kwargs)

    def emit_until_response(self, event_name, **kwargs):
        """
        Emit an event by name with arguments passed as keyword args,
        until the first non-``None`` response is received. This
        method prevents subsequent handlers from being invoked.

            >>> handler, response = emitter.emit_until_response(
                'my-event.service.operation', arg1='one', arg2='two')

        :rtype: tuple
        :return: The last ``(handler, response)`` tuple processed.  This is
                 the first one whose response is not ``None`` if there is
                 one; if every handler returned ``None`` it is the final
                 handler paired with ``None``.  ``(None, None)`` is
                 returned when no handler was called.
        """
        responses = self._emit(event_name, kwargs, stop_on_response=True)
        if responses:
            return responses[-1]
        else:
            return (None, None)

    def _register(
        self, event_name, handler, unique_id=None, unique_id_uses_count=False
    ):
        """Register ``handler`` in the middle section."""
        self._register_section(
            event_name,
            handler,
            unique_id,
            unique_id_uses_count,
            section=_MIDDLE,
        )

    def _register_first(
        self, event_name, handler, unique_id=None, unique_id_uses_count=False
    ):
        """Register ``handler`` in the first section."""
        self._register_section(
            event_name,
            handler,
            unique_id,
            unique_id_uses_count,
            section=_FIRST,
        )

    def _register_last(
        self, event_name, handler, unique_id=None, unique_id_uses_count=False
    ):
        """Register ``handler`` in the last section."""
        self._register_section(
            event_name, handler, unique_id, unique_id_uses_count, section=_LAST
        )

    @staticmethod
    def _counter_mismatch(unique_id, call_kind, registered_with_counter):
        """Build the error for a counter/no-counter usage mismatch.

        :param call_kind: ``'register'`` or ``'unregister'``; names the
            kind of call in the message.
        :param registered_with_counter: Whether the *initial* registration
            of ``unique_id`` used a counter.
        """
        if registered_with_counter:
            return ValueError(
                f"Initial registration of unique id {unique_id} was "
                f"specified to use a counter. Subsequent {call_kind} "
                f"calls to unique id must specify use of a counter "
                f"as well."
            )
        return ValueError(
            f"Initial registration of unique id {unique_id} was "
            f"specified to not use a counter. Subsequent {call_kind} "
            f"calls to unique id must specify not to use a counter "
            f"as well."
        )

    def _register_section(
        self, event_name, handler, unique_id, unique_id_uses_count, section
    ):
        """Store ``handler`` under ``event_name`` in the given section.

        With a ``unique_id`` the handler is only added to the trie on the
        first registration of that id; later registrations either bump the
        usage counter (counted ids) or are ignored (uncounted ids).

        :raises ValueError: If ``unique_id`` is already registered and
            ``unique_id_uses_count`` disagrees with how it was first
            registered.
        """
        if unique_id is None:
            self._handlers.append_item(event_name, handler, section=section)
        elif unique_id in self._unique_id_handlers:
            # We've already registered a handler using this unique_id
            # so we don't need to register it again.
            registered = self._unique_id_handlers[unique_id]
            count = registered.get('count')
            if unique_id_uses_count:
                if not count:
                    raise self._counter_mismatch(
                        unique_id, 'register', registered_with_counter=False
                    )
                registered['count'] = count + 1
            else:
                if count:
                    raise self._counter_mismatch(
                        unique_id, 'register', registered_with_counter=True
                    )
                # Nothing changed, so the lookup cache stays valid.
                return
        else:
            # Note that the trie knows nothing about the unique
            # id.  We track uniqueness in this class via the
            # _unique_id_handlers.
            self._handlers.append_item(event_name, handler, section=section)
            unique_id_handler_item = {'handler': handler}
            if unique_id_uses_count:
                unique_id_handler_item['count'] = 1
            self._unique_id_handlers[unique_id] = unique_id_handler_item
        # Super simple caching strategy for now, if we change the registrations
        # clear the cache.  This has the opportunity for smarter invalidations.
        self._lookup_cache = {}

    def unregister(
        self,
        event_name,
        handler=None,
        unique_id=None,
        unique_id_uses_count=False,
    ):
        """Remove a handler registered for ``event_name``.

        When ``unique_id`` is given, the handler recorded for that id is
        the one removed and the ``handler`` argument is ignored.  For
        counted ids the handler is only removed once the count reaches
        one; earlier calls just decrement the count.  Unknown ids and
        handlers/keys missing from the trie are silently ignored.

        :raises ValueError: If ``unique_id`` is registered and
            ``unique_id_uses_count`` disagrees with how it was first
            registered.
        """
        if unique_id is not None:
            try:
                registered = self._unique_id_handlers[unique_id]
            except KeyError:
                # There's no handler matching that unique_id so we have
                # nothing to unregister.
                return
            count = registered.get('count')
            if unique_id_uses_count:
                if count is None:
                    raise self._counter_mismatch(
                        unique_id, 'unregister', registered_with_counter=False
                    )
                elif count == 1:
                    handler = self._unique_id_handlers.pop(unique_id)[
                        'handler'
                    ]
                else:
                    registered['count'] = count - 1
                    return
            else:
                if count:
                    raise self._counter_mismatch(
                        unique_id, 'unregister', registered_with_counter=True
                    )
                handler = self._unique_id_handlers.pop(unique_id)['handler']
        try:
            self._handlers.remove_item(event_name, handler)
            self._lookup_cache = {}
        except ValueError:
            pass

    def __copy__(self):
        """Return an emitter with independent registrations.

        The handler trie is copied via its own ``__copy__``.  The
        unique-id bookkeeping is copied one level deeper than a plain
        ``copy.copy`` so the per-id ``count`` of the copy and of the
        source can change independently.  Handler callables themselves
        are shared, not copied.
        """
        new_instance = self.__class__()
        new_state = self.__dict__.copy()
        new_state['_handlers'] = copy.copy(self._handlers)
        # Each value is a small mutable dict holding the usage count; a
        # shallow copy of the outer dict would leave those shared, so a
        # counted register/unregister on one emitter would corrupt the
        # other's count.
        new_state['_unique_id_handlers'] = {
            unique_id: dict(entry)
            for unique_id, entry in self._unique_id_handlers.items()
        }
        new_instance.__dict__ = new_state
        return new_instance
class EventAliaser(BaseEventHooks):
    """Wrap an emitter and translate aliased event names.

    Every public call rewrites ``event_name`` through the alias mapping and
    then forwards to the wrapped emitter's method of the same name.
    """

    def __init__(self, event_emitter, event_aliases=None):
        """
        :param event_emitter: The emitter that calls are forwarded to.
        :param event_aliases: Mapping of old event name part to new event
            name part.  ``EVENT_ALIASES`` is used when this is ``None``.
        """
        if event_aliases is None:
            event_aliases = EVENT_ALIASES
        self._event_aliases = event_aliases
        # Memo of event name -> aliased event name.
        self._alias_name_cache = {}
        self._emitter = event_emitter

    def emit(self, event_name, **kwargs):
        """Emit the aliased form of ``event_name`` on the wrapped emitter."""
        aliased_event_name = self._alias_event_name(event_name)
        return self._emitter.emit(aliased_event_name, **kwargs)

    def emit_until_response(self, event_name, **kwargs):
        """Forward to the wrapped emitter's ``emit_until_response``."""
        aliased_event_name = self._alias_event_name(event_name)
        return self._emitter.emit_until_response(aliased_event_name, **kwargs)

    def register(
        self, event_name, handler, unique_id=None, unique_id_uses_count=False
    ):
        """Register ``handler`` under the aliased form of ``event_name``."""
        aliased_event_name = self._alias_event_name(event_name)
        return self._emitter.register(
            aliased_event_name, handler, unique_id, unique_id_uses_count
        )

    def register_first(
        self, event_name, handler, unique_id=None, unique_id_uses_count=False
    ):
        """Forward to ``register_first`` with the aliased event name."""
        aliased_event_name = self._alias_event_name(event_name)
        return self._emitter.register_first(
            aliased_event_name, handler, unique_id, unique_id_uses_count
        )

    def register_last(
        self, event_name, handler, unique_id=None, unique_id_uses_count=False
    ):
        """Forward to ``register_last`` with the aliased event name."""
        aliased_event_name = self._alias_event_name(event_name)
        return self._emitter.register_last(
            aliased_event_name, handler, unique_id, unique_id_uses_count
        )

    def unregister(
        self,
        event_name,
        handler=None,
        unique_id=None,
        unique_id_uses_count=False,
    ):
        """Forward to ``unregister`` with the aliased event name."""
        aliased_event_name = self._alias_event_name(event_name)
        return self._emitter.unregister(
            aliased_event_name, handler, unique_id, unique_id_uses_count
        )

    def _alias_event_name(self, event_name):
        """Return ``event_name`` with the first applicable alias applied.

        Aliases are tried in the mapping's iteration order and only the
        first one that actually rewrites the name is applied.  Results
        (including "no alias applies") are memoised per event name.
        """
        if event_name in self._alias_name_cache:
            return self._alias_name_cache[event_name]

        # Safe to split once up front: the list is only mutated by the
        # alias that matches, and we return right after that.
        event_parts = event_name.split('.')
        for old_part, new_part in self._event_aliases.items():
            # We can't simply do a string replace for everything, otherwise we
            # might end up translating substrings that we never intended to
            # translate.  When there aren't any dots in the old event name
            # part, then we can quickly replace the item in the list if it's
            # there.
            if '.' not in old_part:
                try:
                    # Theoretically a given event name could have the same part
                    # repeated, but in practice this doesn't happen
                    event_parts[event_parts.index(old_part)] = new_part
                except ValueError:
                    continue

            # If there's dots in the name, it gets more complicated.  Now we
            # have to replace multiple sections of the original event.
            elif old_part in event_name:
                old_parts = old_part.split('.')
                # The substring test above is only a cheap pre-filter: it
                # also hits when the alias straddles section boundaries
                # (e.g. 'b.c' in 'ab.cd').  Keep looking at the remaining
                # aliases unless whole sections were really replaced.
                if not self._replace_subsection(
                    event_parts, old_parts, new_part
                ):
                    continue
            else:
                continue

            new_name = '.'.join(event_parts)
            logger.debug(
                "Changing event name from %s to %s", event_name, new_name
            )
            self._alias_name_cache[event_name] = new_name
            return new_name

        self._alias_name_cache[event_name] = event_name
        return event_name

    def _replace_subsection(self, sections, old_parts, new_part):
        """Replace the first run of ``old_parts`` in ``sections`` in place.

        :param sections: List of event name parts; mutated on a match.
        :param old_parts: List of consecutive parts to look for.
        :param new_part: Single part substituted for the whole run.
        :returns: True if a replacement was made, otherwise False.
        """
        width = len(old_parts)
        for start in range(len(sections)):
            if sections[start : start + width] == old_parts:
                sections[start : start + width] = [new_part]
                return True
        return False

    def __copy__(self):
        """Return a new aliaser over copies of the emitter and alias map."""
        return self.__class__(
            copy.copy(self._emitter), copy.copy(self._event_aliases)
        )
class _PrefixTrie:
    """Specialized prefix trie that handles wildcards.

    The prefixes in this case are based on dot separated
    names so 'foo.bar.baz' is::

        foo -> bar -> baz

    Wildcard support just means that having a key such as 'foo.bar.*.baz' will
    be matched with a call to ``prefix_search(key='foo.bar.ANYTHING.baz')``.

    You can think of this prefix trie as the equivalent as defaultdict(list),
    except that it can do prefix searches:

        foo.bar.baz -> A
        foo.bar     -> B
        foo         -> C

    Calling ``prefix_search('foo.bar.baz')`` will return [A + B + C], from
    most specific to least specific.

    """

    def __init__(self):
        # Each dictionary can be thought of as a node, where a node
        # has values associated with the node, and children is a link
        # to more nodes.  So 'foo.bar' would have a 'foo' node with
        # a 'bar' node as a child of foo.
        # {'foo': {'children': {'bar': {...}}}}.
        self._root = {'chunk': None, 'children': {}, 'values': None}

    def append_item(self, key, value, section=_MIDDLE):
        """Add an item to a key.

        If a value is already associated with that key, the new
        value is appended to the list for the key.

        :param key: Dot separated name; missing nodes are created.
        :param value: The item to store.
        :param section: Index of the ``NodeList`` bucket to append to.
        """
        current = self._root
        for part in key.split('.'):
            children = current['children']
            if part not in children:
                children[part] = {
                    'chunk': part,
                    'values': None,
                    'children': {},
                }
            current = children[part]
        if current['values'] is None:
            current['values'] = NodeList([], [], [])
        current['values'][section].append(value)

    def prefix_search(self, key):
        """Collect all items that are prefixes of key.

        Prefix in this case are delineated by '.' characters so
        'foo.bar.baz' is a 3 chunk sequence of 3 "prefixes" (
        "foo", "bar", and "baz").

        :returns: A ``deque`` of the collected items.
        """
        collected = deque()
        key_parts = key.split('.')
        self._get_items(self._root, key_parts, collected, 0)
        return collected

    def _get_items(self, starting_node, key_parts, collected, starting_index):
        """Walk the trie along ``key_parts`` and fill ``collected``.

        ``collected`` is a ``deque`` that is extended in place.
        """
        stack = [(starting_node, starting_index)]
        key_parts_len = len(key_parts)
        # Traverse down the nodes, where at each level we add the
        # next part from key_parts as well as the wildcard element '*'.
        # This means for each node we see we potentially add two more
        # elements to our stack.
        while stack:
            current_node, index = stack.pop()
            if current_node['values']:
                # We're using extendleft because we want
                # the values associated with the node furthest
                # from the root to come before nodes closer
                # to the root.  extendleft() also adds its items
                # in right-left order so .extendleft([1, 2, 3])
                # will result in final_list = [3, 2, 1], which is
                # why we reverse the lists.
                node_list = current_node['values']
                complete_order = (
                    node_list.first + node_list.middle + node_list.last
                )
                collected.extendleft(reversed(complete_order))
            if index != key_parts_len:
                children = current_node['children']
                directs = children.get(key_parts[index])
                wildcard = children.get('*')
                next_index = index + 1
                # The direct match is pushed last so it is popped (and
                # therefore processed) before the wildcard branch.
                if wildcard is not None:
                    stack.append((wildcard, next_index))
                if directs is not None:
                    stack.append((directs, next_index))

    def remove_item(self, key, value):
        """Remove an item associated with a key.

        Only the first occurrence of ``value`` is removed, searching the
        first, middle and last buckets in that order.  A ``value`` that is
        not stored under an existing key is ignored.

        :raises ValueError: If the key does not exist in the trie, or only
            exists as an intermediate node that never had items appended.

        """
        key_parts = key.split('.')
        self._remove_item(self._root, key_parts, value, index=0)

    @staticmethod
    def _is_prunable(node):
        """Return True if ``node`` has no children and holds no values."""
        if node['children']:
            return False
        values = node['values']
        # A NodeList is a 3-tuple and therefore always truthy, even when
        # all of its buckets are empty, so the buckets are checked
        # individually.
        return values is None or not any(values)

    def _remove_item(self, current_node, key_parts, value, index):
        """Recursive worker for ``remove_item``.

        Descends one key part per call, removes ``value`` at the final
        part, and prunes emptied leaf nodes while unwinding.
        """
        if current_node is None or index >= len(key_parts):
            return
        part = key_parts[index]
        next_node = current_node['children'].get(part)
        if next_node is None:
            raise ValueError(f"key is not in trie: {'.'.join(key_parts)}")
        self._remove_item(next_node, key_parts, value, index + 1)
        if index == len(key_parts) - 1:
            node_list = next_node['values']
            if node_list is None:
                # The node only exists as part of the path to a longer
                # key; nothing was ever stored under this exact key.
                raise ValueError(
                    f"key is not in trie: {'.'.join(key_parts)}"
                )
            for bucket in node_list:
                if value in bucket:
                    bucket.remove(value)
                    break
        if self._is_prunable(next_node):
            # Then this is a leaf node with no values so
            # we can just delete this link from the parent node.
            # This makes subsequent search faster in the case
            # where a key does not exist.
            del current_node['children'][part]

    def __copy__(self):
        # The fact that we're using a nested dict under the covers
        # is an implementation detail, and the user shouldn't have
        # to know that they'd normally need a deepcopy so we expose
        # __copy__ instead of __deepcopy__.
        new_copy = self.__class__()
        copied_attrs = self._recursive_copy(self.__dict__)
        new_copy.__dict__ = copied_attrs
        return new_copy

    def _recursive_copy(self, node):
        """Return a structural copy of ``node`` (a nested dict).

        Nested dicts are copied recursively and ``NodeList`` values via
        ``copy.copy``; every other value is reused as is.
        """
        # We can't use copy.deepcopy because we actually only want to copy
        # the structure of the trie, not the handlers themselves.
        # Each node has a chunk, children, and values.
        copied_node = {}
        for key, value in node.items():
            if isinstance(value, NodeList):
                copied_node[key] = copy.copy(value)
            elif isinstance(value, dict):
                copied_node[key] = self._recursive_copy(value)
            else:
                copied_node[key] = value
        return copied_node