Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/botocore/hooks.py: 57%

254 statements  

coverage.py v7.3.2, created at 2023-12-08 06:51 +0000

# Copyright 2012-2014 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
#     http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
import copy
import logging
from collections import deque, namedtuple

from botocore.compat import accepts_kwargs
from botocore.utils import EVENT_ALIASES

logger = logging.getLogger(__name__)

_NodeList = namedtuple('NodeList', ['first', 'middle', 'last'])
_FIRST = 0
_MIDDLE = 1
_LAST = 2


class NodeList(_NodeList):
    def __copy__(self):
        first_copy = copy.copy(self.first)
        middle_copy = copy.copy(self.middle)
        last_copy = copy.copy(self.last)
        copied = NodeList(first_copy, middle_copy, last_copy)
        return copied

def first_non_none_response(responses, default=None):
    """Find first non None response in a list of tuples.

    This function can be used to find the first non None response from
    handlers connected to an event. This is useful if you are interested
    in the returned responses from event handlers. Example usage::

        print(first_non_none_response([(func1, None), (func2, 'foo'),
                                       (func3, 'bar')]))
        # This will print 'foo'

    :type responses: list of tuples
    :param responses: The responses from the ``EventHooks.emit`` method.
        This is a list of tuples, and each tuple is
        (handler, handler_response).

    :param default: If no non-None responses are found, then this default
        value will be returned.

    :return: The first non-None response in the list of tuples.

    """
    for response in responses:
        if response[1] is not None:
            return response[1]
    return default
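As a quick illustration (a sketch, not part of hooks.py; the event name and handlers below are made up), the ``(handler, response)`` tuples returned by ``HierarchicalEmitter.emit`` can be fed straight into ``first_non_none_response``:

# Illustrative sketch only -- not part of botocore/hooks.py.
from botocore.hooks import HierarchicalEmitter, first_non_none_response

def returns_nothing(**kwargs):
    return None

def returns_value(**kwargs):
    return 'use-this'

emitter = HierarchicalEmitter()
emitter.register('before-send.s3.PutObject', returns_nothing)
emitter.register('before-send.s3.PutObject', returns_value)

responses = emitter.emit('before-send.s3.PutObject')
print(first_non_none_response(responses, default='fallback'))  # 'use-this'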

class BaseEventHooks:
    def emit(self, event_name, **kwargs):
        """Call all handlers subscribed to an event.

        :type event_name: str
        :param event_name: The name of the event to emit.

        :type **kwargs: dict
        :param **kwargs: Arbitrary kwargs to pass through to the
            subscribed handlers. The ``event_name`` will be injected
            into the kwargs so it's not necessary to add this to **kwargs.

        :rtype: list of tuples
        :return: A list of ``(handler_func, handler_func_return_value)``

        """
        return []

    def register(
        self, event_name, handler, unique_id=None, unique_id_uses_count=False
    ):
        """Register an event handler for a given event.

        If a ``unique_id`` is given, the handler will not be registered
        if a handler with the ``unique_id`` has already been registered.

        Handlers are called in the order they have been registered.
        Note handlers can also be registered with ``register_first()``
        and ``register_last()``. All handlers registered with
        ``register_first()`` are called before handlers registered
        with ``register()`` which are called before handlers registered
        with ``register_last()``.

        """
        self._verify_and_register(
            event_name,
            handler,
            unique_id,
            register_method=self._register,
            unique_id_uses_count=unique_id_uses_count,
        )

    def register_first(
        self, event_name, handler, unique_id=None, unique_id_uses_count=False
    ):
        """Register an event handler to be called first for an event.

        All event handlers registered with ``register_first()`` will
        be called before handlers registered with ``register()`` and
        ``register_last()``.

        """
        self._verify_and_register(
            event_name,
            handler,
            unique_id,
            register_method=self._register_first,
            unique_id_uses_count=unique_id_uses_count,
        )

    def register_last(
        self, event_name, handler, unique_id=None, unique_id_uses_count=False
    ):
        """Register an event handler to be called last for an event.

        All event handlers registered with ``register_last()`` will be called
        after handlers registered with ``register_first()`` and ``register()``.

        """
        self._verify_and_register(
            event_name,
            handler,
            unique_id,
            register_method=self._register_last,
            unique_id_uses_count=unique_id_uses_count,
        )

    def _verify_and_register(
        self,
        event_name,
        handler,
        unique_id,
        register_method,
        unique_id_uses_count,
    ):
        self._verify_is_callable(handler)
        self._verify_accept_kwargs(handler)
        register_method(event_name, handler, unique_id, unique_id_uses_count)

    def unregister(
        self,
        event_name,
        handler=None,
        unique_id=None,
        unique_id_uses_count=False,
    ):
        """Unregister an event handler for a given event.

        If no ``unique_id`` was given during registration, then the
        first instance of the event handler is removed (if the event
        handler has been registered multiple times).

        """
        pass

    def _verify_is_callable(self, func):
        if not callable(func):
            raise ValueError("Event handler %s must be callable." % func)

    def _verify_accept_kwargs(self, func):
        """Verifies a callable accepts kwargs

        :type func: callable
        :param func: A callable object.

        :returns: True, if ``func`` accepts kwargs, otherwise False.

        """
        try:
            if not accepts_kwargs(func):
                raise ValueError(
                    f"Event handler {func} must accept keyword "
                    f"arguments (**kwargs)"
                )
        except TypeError:
            return False
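The ordering contract described above can be seen with the concrete ``HierarchicalEmitter`` defined below. A minimal sketch (the event name and handlers are made up for illustration):

# Illustrative sketch only -- not part of botocore/hooks.py.
from botocore.hooks import HierarchicalEmitter

calls = []
emitter = HierarchicalEmitter()
emitter.register('creating-client-class', lambda **kwargs: calls.append('middle'))
emitter.register_first('creating-client-class', lambda **kwargs: calls.append('first'))
emitter.register_last('creating-client-class', lambda **kwargs: calls.append('last'))

emitter.emit('creating-client-class')
print(calls)  # ['first', 'middle', 'last']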

class HierarchicalEmitter(BaseEventHooks):
    def __init__(self):
        # We keep a reference to the handlers for quick
        # read only access (we never modify self._handlers).
        # A cache of event name to handler list.
        self._lookup_cache = {}
        self._handlers = _PrefixTrie()
        # This is used to ensure that unique_id's are only
        # registered once.
        self._unique_id_handlers = {}

    def _emit(self, event_name, kwargs, stop_on_response=False):
        """
        Emit an event with optional keyword arguments.

        :type event_name: string
        :param event_name: Name of the event
        :type kwargs: dict
        :param kwargs: Arguments to be passed to the handler functions.
        :type stop_on_response: boolean
        :param stop_on_response: Whether to stop on the first non-None
                                 response. If False, then all handlers
                                 will be called. This is especially useful
                                 for handlers which mutate data and then
                                 want to stop propagation of the event.
        :rtype: list
        :return: List of (handler, response) tuples from all processed
                 handlers.
        """
        responses = []
        # Invoke the event handlers from most specific
        # to least specific, each time stripping off a dot.
        handlers_to_call = self._lookup_cache.get(event_name)
        if handlers_to_call is None:
            handlers_to_call = self._handlers.prefix_search(event_name)
            self._lookup_cache[event_name] = handlers_to_call
        elif not handlers_to_call:
            # Short circuit and return an empty response if we have
            # no handlers to call. This is the common case: for the
            # majority of signals, nothing is listening.
            return []
        kwargs['event_name'] = event_name
        for handler in handlers_to_call:
            logger.debug('Event %s: calling handler %s', event_name, handler)
            response = handler(**kwargs)
            responses.append((handler, response))
            if stop_on_response and response is not None:
                return responses
        return responses

    def emit(self, event_name, **kwargs):
        """
        Emit an event by name with arguments passed as keyword args.

        >>> responses = emitter.emit(
        ...     'my-event.service.operation', arg1='one', arg2='two')

        :rtype: list
        :return: List of (handler, response) tuples from all processed
                 handlers.
        """
        return self._emit(event_name, kwargs)

    def emit_until_response(self, event_name, **kwargs):
        """
        Emit an event by name with arguments passed as keyword args,
        until the first non-``None`` response is received. This
        method prevents subsequent handlers from being invoked.

        >>> handler, response = emitter.emit_until_response(
        ...     'my-event.service.operation', arg1='one', arg2='two')

        :rtype: tuple
        :return: The first (handler, response) tuple where the response
                 is not ``None``, otherwise (``None``, ``None``).
        """
        responses = self._emit(event_name, kwargs, stop_on_response=True)
        if responses:
            return responses[-1]
        else:
            return (None, None)
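To make the dispatch behavior concrete, here is a small sketch (illustrative only; the event name and handlers are made up). Handlers registered on a dot-separated prefix of the emitted event name are also invoked, most specific first, and ``emit_until_response`` stops at the first non-None value:

# Illustrative sketch only -- not part of botocore/hooks.py.
from botocore.hooks import HierarchicalEmitter

def specific(**kwargs):
    return 'handled-by-specific'

def general(**kwargs):
    return None  # a handler that only observes

emitter = HierarchicalEmitter()
emitter.register('before-call.s3.PutObject', specific)
emitter.register('before-call', general)

print([resp for _, resp in emitter.emit('before-call.s3.PutObject')])
# ['handled-by-specific', None]
print(emitter.emit_until_response('before-call.s3.PutObject'))
# (<function specific ...>, 'handled-by-specific')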

    def _register(
        self, event_name, handler, unique_id=None, unique_id_uses_count=False
    ):
        self._register_section(
            event_name,
            handler,
            unique_id,
            unique_id_uses_count,
            section=_MIDDLE,
        )

    def _register_first(
        self, event_name, handler, unique_id=None, unique_id_uses_count=False
    ):
        self._register_section(
            event_name,
            handler,
            unique_id,
            unique_id_uses_count,
            section=_FIRST,
        )

    def _register_last(
        self, event_name, handler, unique_id, unique_id_uses_count=False
    ):
        self._register_section(
            event_name, handler, unique_id, unique_id_uses_count, section=_LAST
        )

    def _register_section(
        self, event_name, handler, unique_id, unique_id_uses_count, section
    ):
        if unique_id is not None:
            if unique_id in self._unique_id_handlers:
                # We've already registered a handler using this unique_id
                # so we don't need to register it again.
                count = self._unique_id_handlers[unique_id].get('count', None)
                if unique_id_uses_count:
                    if not count:
                        raise ValueError(
                            "Initial registration of unique id %s was "
                            "specified to use a counter. Subsequent register "
                            "calls to unique id must specify use of a counter "
                            "as well." % unique_id
                        )
                    else:
                        self._unique_id_handlers[unique_id]['count'] += 1
                else:
                    if count:
                        raise ValueError(
                            "Initial registration of unique id %s was "
                            "specified to not use a counter. Subsequent "
                            "register calls to unique id must specify not to "
                            "use a counter as well." % unique_id
                        )
                return
            else:
                # Note that the trie knows nothing about the unique
                # id. We track uniqueness in this class via the
                # _unique_id_handlers.
                self._handlers.append_item(
                    event_name, handler, section=section
                )
                unique_id_handler_item = {'handler': handler}
                if unique_id_uses_count:
                    unique_id_handler_item['count'] = 1
                self._unique_id_handlers[unique_id] = unique_id_handler_item
        else:
            self._handlers.append_item(event_name, handler, section=section)
        # Super simple caching strategy for now: if the registrations change,
        # clear the cache. This leaves room for smarter invalidation later.
        self._lookup_cache = {}

    def unregister(
        self,
        event_name,
        handler=None,
        unique_id=None,
        unique_id_uses_count=False,
    ):
        if unique_id is not None:
            try:
                count = self._unique_id_handlers[unique_id].get('count', None)
            except KeyError:
                # There's no handler matching that unique_id so we have
                # nothing to unregister.
                return
            if unique_id_uses_count:
                if count is None:
                    raise ValueError(
                        "Initial registration of unique id %s was specified to "
                        "use a counter. Subsequent unregister calls to unique "
                        "id must specify use of a counter as well." % unique_id
                    )
                elif count == 1:
                    handler = self._unique_id_handlers.pop(unique_id)[
                        'handler'
                    ]
                else:
                    self._unique_id_handlers[unique_id]['count'] -= 1
                    return
            else:
                if count:
                    raise ValueError(
                        "Initial registration of unique id %s was specified "
                        "to not use a counter. Subsequent unregister calls "
                        "to unique id must specify not to use a counter as "
                        "well." % unique_id
                    )
                handler = self._unique_id_handlers.pop(unique_id)['handler']
        try:
            self._handlers.remove_item(event_name, handler)
            self._lookup_cache = {}
        except ValueError:
            pass
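A short sketch of the ``unique_id`` counting behavior (illustrative only; the event name, handler, and unique_id below are made up). With ``unique_id_uses_count=True``, repeated registrations under the same id only increment a counter, and the handler stays registered until it has been unregistered the same number of times:

# Illustrative sketch only -- not part of botocore/hooks.py.
from botocore.hooks import HierarchicalEmitter

def handler(**kwargs):
    return 'ok'

emitter = HierarchicalEmitter()
emitter.register('needs-retry.s3', handler, unique_id='retry-check',
                 unique_id_uses_count=True)
emitter.register('needs-retry.s3', handler, unique_id='retry-check',
                 unique_id_uses_count=True)   # count becomes 2; no second copy

emitter.unregister('needs-retry.s3', unique_id='retry-check',
                   unique_id_uses_count=True)
print(len(emitter.emit('needs-retry.s3')))    # 1 -- still registered

emitter.unregister('needs-retry.s3', unique_id='retry-check',
                   unique_id_uses_count=True)
print(len(emitter.emit('needs-retry.s3')))    # 0 -- fully removed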

    def __copy__(self):
        new_instance = self.__class__()
        new_state = self.__dict__.copy()
        new_state['_handlers'] = copy.copy(self._handlers)
        new_state['_unique_id_handlers'] = copy.copy(self._unique_id_handlers)
        new_instance.__dict__ = new_state
        return new_instance

class EventAliaser(BaseEventHooks):
    def __init__(self, event_emitter, event_aliases=None):
        self._event_aliases = event_aliases
        if event_aliases is None:
            self._event_aliases = EVENT_ALIASES
        self._alias_name_cache = {}
        self._emitter = event_emitter

    def emit(self, event_name, **kwargs):
        aliased_event_name = self._alias_event_name(event_name)
        return self._emitter.emit(aliased_event_name, **kwargs)

    def emit_until_response(self, event_name, **kwargs):
        aliased_event_name = self._alias_event_name(event_name)
        return self._emitter.emit_until_response(aliased_event_name, **kwargs)

    def register(
        self, event_name, handler, unique_id=None, unique_id_uses_count=False
    ):
        aliased_event_name = self._alias_event_name(event_name)
        return self._emitter.register(
            aliased_event_name, handler, unique_id, unique_id_uses_count
        )

    def register_first(
        self, event_name, handler, unique_id=None, unique_id_uses_count=False
    ):
        aliased_event_name = self._alias_event_name(event_name)
        return self._emitter.register_first(
            aliased_event_name, handler, unique_id, unique_id_uses_count
        )

    def register_last(
        self, event_name, handler, unique_id=None, unique_id_uses_count=False
    ):
        aliased_event_name = self._alias_event_name(event_name)
        return self._emitter.register_last(
            aliased_event_name, handler, unique_id, unique_id_uses_count
        )

    def unregister(
        self,
        event_name,
        handler=None,
        unique_id=None,
        unique_id_uses_count=False,
    ):
        aliased_event_name = self._alias_event_name(event_name)
        return self._emitter.unregister(
            aliased_event_name, handler, unique_id, unique_id_uses_count
        )

    def _alias_event_name(self, event_name):
        if event_name in self._alias_name_cache:
            return self._alias_name_cache[event_name]

        for old_part, new_part in self._event_aliases.items():
            # We can't simply do a string replace for everything, otherwise we
            # might end up translating substrings that we never intended to
            # translate. When there aren't any dots in the old event name
            # part, then we can quickly replace the item in the list if it's
            # there.
            event_parts = event_name.split('.')
            if '.' not in old_part:
                try:
                    # Theoretically a given event name could have the same
                    # part repeated, but in practice this doesn't happen.
                    event_parts[event_parts.index(old_part)] = new_part
                except ValueError:
                    continue

            # If there are dots in the name, it gets more complicated. Now we
            # have to replace multiple sections of the original event.
            elif old_part in event_name:
                old_parts = old_part.split('.')
                self._replace_subsection(event_parts, old_parts, new_part)
            else:
                continue

            new_name = '.'.join(event_parts)
            logger.debug(
                f"Changing event name from {event_name} to {new_name}"
            )
            self._alias_name_cache[event_name] = new_name
            return new_name

        self._alias_name_cache[event_name] = event_name
        return event_name

    def _replace_subsection(self, sections, old_parts, new_part):
        for i in range(len(sections)):
            if (
                sections[i] == old_parts[0]
                and sections[i : i + len(old_parts)] == old_parts
            ):
                sections[i : i + len(old_parts)] = [new_part]
                return

    def __copy__(self):
        return self.__class__(
            copy.copy(self._emitter), copy.copy(self._event_aliases)
        )
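A minimal sketch of how the aliasing wrapper behaves (illustrative only; the alias mapping below is made up, whereas botocore's real mapping comes from ``botocore.utils.EVENT_ALIASES``). Matching dot-separated parts of the event name are rewritten before the call is delegated to the wrapped emitter:

# Illustrative sketch only -- not part of botocore/hooks.py.
from botocore.hooks import EventAliaser, HierarchicalEmitter

emitter = HierarchicalEmitter()
aliaser = EventAliaser(emitter, event_aliases={'old-name': 'new-name'})

seen = []
aliaser.register('before-call.old-name',
                 lambda **kwargs: seen.append(kwargs['event_name']))
aliaser.emit('before-call.old-name')
print(seen)  # ['before-call.new-name'] -- both sides were translated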

class _PrefixTrie:
    """Specialized prefix trie that handles wildcards.

    The prefixes in this case are based on dot separated
    names so 'foo.bar.baz' is::

        foo -> bar -> baz

    Wildcard support just means that having a key such as 'foo.bar.*.baz' will
    be matched with a call to ``prefix_search(key='foo.bar.ANYTHING.baz')``.

    You can think of this prefix trie as the equivalent of defaultdict(list),
    except that it can do prefix searches:

        foo.bar.baz -> A
        foo.bar -> B
        foo -> C

    Calling ``prefix_search('foo.bar.baz')`` will return [A + B + C], from
    most specific to least specific.

    """

    def __init__(self):
        # Each dictionary can be thought of as a node, where a node
        # has values associated with the node, and children is a link
        # to more nodes. So 'foo.bar' would have a 'foo' node with
        # a 'bar' node as a child of foo.
        # {'foo': {'children': {'bar': {...}}}}.
        self._root = {'chunk': None, 'children': {}, 'values': None}

    def append_item(self, key, value, section=_MIDDLE):
        """Add an item to a key.

        If a value is already associated with that key, the new
        value is appended to the list for the key.
        """
        key_parts = key.split('.')
        current = self._root
        for part in key_parts:
            if part not in current['children']:
                new_child = {'chunk': part, 'values': None, 'children': {}}
                current['children'][part] = new_child
                current = new_child
            else:
                current = current['children'][part]
        if current['values'] is None:
            current['values'] = NodeList([], [], [])
        current['values'][section].append(value)

    def prefix_search(self, key):
        """Collect all items that are prefixes of key.

        Prefixes in this case are delineated by '.' characters so
        'foo.bar.baz' is a 3 chunk sequence of 3 "prefixes"
        ("foo", "bar", and "baz").

        """
        collected = deque()
        key_parts = key.split('.')
        current = self._root
        self._get_items(current, key_parts, collected, 0)
        return collected
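A quick sketch of the trie's behavior (illustrative only; ``_PrefixTrie`` is a private helper of this module, and the keys and values here are made up):

# Illustrative sketch only -- not part of botocore/hooks.py.
from botocore.hooks import _PrefixTrie

trie = _PrefixTrie()
trie.append_item('foo', 'C')
trie.append_item('foo.bar', 'B')
trie.append_item('foo.bar.baz', 'A')

# Values are returned most specific first.
print(list(trie.prefix_search('foo.bar.baz')))        # ['A', 'B', 'C']

# A '*' chunk in a stored key matches any single chunk in the search key.
trie.append_item('foo.*.qux', 'W')
print(list(trie.prefix_search('foo.anything.qux')))   # ['W', 'C']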

    def _get_items(self, starting_node, key_parts, collected, starting_index):
        stack = [(starting_node, starting_index)]
        key_parts_len = len(key_parts)
        # Traverse down the nodes, where at each level we add the
        # next part from key_parts as well as the wildcard element '*'.
        # This means for each node we see we potentially add two more
        # elements to our stack.
        while stack:
            current_node, index = stack.pop()
            if current_node['values']:
                # We're using extendleft because we want
                # the values associated with the node furthest
                # from the root to come before nodes closer
                # to the root. extendleft() also adds its items
                # in right-left order so .extendleft([1, 2, 3])
                # will result in final_list = [3, 2, 1], which is
                # why we reverse the lists.
                node_list = current_node['values']
                complete_order = (
                    node_list.first + node_list.middle + node_list.last
                )
                collected.extendleft(reversed(complete_order))
            if not index == key_parts_len:
                children = current_node['children']
                directs = children.get(key_parts[index])
                wildcard = children.get('*')
                next_index = index + 1
                if wildcard is not None:
                    stack.append((wildcard, next_index))
                if directs is not None:
                    stack.append((directs, next_index))

    def remove_item(self, key, value):
        """Remove an item associated with a key.

        If the value is not associated with the key a ``ValueError``
        will be raised. If the key does not exist in the trie, a
        ``ValueError`` will be raised.

        """
        key_parts = key.split('.')
        current = self._root
        self._remove_item(current, key_parts, value, index=0)

    def _remove_item(self, current_node, key_parts, value, index):
        if current_node is None:
            return
        elif index < len(key_parts):
            next_node = current_node['children'].get(key_parts[index])
            if next_node is not None:
                self._remove_item(next_node, key_parts, value, index + 1)
                if index == len(key_parts) - 1:
                    node_list = next_node['values']
                    if value in node_list.first:
                        node_list.first.remove(value)
                    elif value in node_list.middle:
                        node_list.middle.remove(value)
                    elif value in node_list.last:
                        node_list.last.remove(value)
                if not next_node['children'] and not next_node['values']:
                    # Then this is a leaf node with no values so
                    # we can just delete this link from the parent node.
                    # This makes subsequent search faster in the case
                    # where a key does not exist.
                    del current_node['children'][key_parts[index]]
            else:
                raise ValueError(
                    f"key is not in trie: {'.'.join(key_parts)}"
                )

    def __copy__(self):
        # The fact that we're using a nested dict under the covers
        # is an implementation detail, and the user shouldn't have
        # to know that they'd normally need a deepcopy so we expose
        # __copy__ instead of __deepcopy__.
        new_copy = self.__class__()
        copied_attrs = self._recursive_copy(self.__dict__)
        new_copy.__dict__ = copied_attrs
        return new_copy

    def _recursive_copy(self, node):
        # We can't use copy.deepcopy because we actually only want to copy
        # the structure of the trie, not the handlers themselves.
        # Each node has a chunk, children, and values.
        copied_node = {}
        for key, value in node.items():
            if isinstance(value, NodeList):
                copied_node[key] = copy.copy(value)
            elif isinstance(value, dict):
                copied_node[key] = self._recursive_copy(value)
            else:
                copied_node[key] = value
        return copied_node
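Finally, a sketch of why the module defines these ``__copy__`` hooks (illustrative only; the event name and handlers are made up): ``copy.copy()`` on an emitter yields an instance whose registrations can diverge from the original's, without deep-copying the handlers themselves:

# Illustrative sketch only -- not part of botocore/hooks.py.
import copy
from botocore.hooks import HierarchicalEmitter

original = HierarchicalEmitter()
original.register('after-call.s3', lambda **kwargs: 'from-original')

clone = copy.copy(original)
clone.register('after-call.s3', lambda **kwargs: 'from-clone')

print(len(original.emit('after-call.s3')))  # 1 -- clone's handler is not shared
print(len(clone.emit('after-call.s3')))     # 2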