Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/botocore/monitoring.py: 33%
221 statements
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:51 +0000
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:51 +0000
1# Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
2#
3# Licensed under the Apache License, Version 2.0 (the "License"). You
4# may not use this file except in compliance with the License. A copy of
5# the License is located at
6#
7# http://aws.amazon.com/apache2.0/
8#
9# or in the "license" file accompanying this file. This file is
10# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
11# ANY KIND, either express or implied. See the License for the specific
12# language governing permissions and limitations under the License.
13import json
14import logging
15import re
16import time
18from botocore.compat import ensure_bytes, ensure_unicode, urlparse
19from botocore.retryhandler import EXCEPTION_MAP as RETRYABLE_EXCEPTIONS
21logger = logging.getLogger(__name__)
24class Monitor:
25 _EVENTS_TO_REGISTER = [
26 'before-parameter-build',
27 'request-created',
28 'response-received',
29 'after-call',
30 'after-call-error',
31 ]
33 def __init__(self, adapter, publisher):
34 """Abstraction for monitoring clients API calls
36 :param adapter: An adapter that takes event emitter events
37 and produces monitor events
39 :param publisher: A publisher for generated monitor events
40 """
41 self._adapter = adapter
42 self._publisher = publisher
44 def register(self, event_emitter):
45 """Register an event emitter to the monitor"""
46 for event_to_register in self._EVENTS_TO_REGISTER:
47 event_emitter.register_last(event_to_register, self.capture)
49 def capture(self, event_name, **payload):
50 """Captures an incoming event from the event emitter
52 It will feed an event emitter event to the monitor's adaptor to create
53 a monitor event and then publish that event to the monitor's publisher.
54 """
55 try:
56 monitor_event = self._adapter.feed(event_name, payload)
57 if monitor_event:
58 self._publisher.publish(monitor_event)
59 except Exception as e:
60 logger.debug(
61 'Exception %s raised by client monitor in handling event %s',
62 e,
63 event_name,
64 exc_info=True,
65 )
68class MonitorEventAdapter:
69 def __init__(self, time=time.time):
70 """Adapts event emitter events to produce monitor events
72 :type time: callable
73 :param time: A callable that produces the current time
74 """
75 self._time = time
77 def feed(self, emitter_event_name, emitter_payload):
78 """Feed an event emitter event to generate a monitor event
80 :type emitter_event_name: str
81 :param emitter_event_name: The name of the event emitted
83 :type emitter_payload: dict
84 :param emitter_payload: The payload to associated to the event
85 emitted
87 :rtype: BaseMonitorEvent
88 :returns: A monitor event based on the event emitter events
89 fired
90 """
91 return self._get_handler(emitter_event_name)(**emitter_payload)
93 def _get_handler(self, event_name):
94 return getattr(
95 self, '_handle_' + event_name.split('.')[0].replace('-', '_')
96 )
98 def _handle_before_parameter_build(self, model, context, **kwargs):
99 context['current_api_call_event'] = APICallEvent(
100 service=model.service_model.service_id,
101 operation=model.wire_name,
102 timestamp=self._get_current_time(),
103 )
105 def _handle_request_created(self, request, **kwargs):
106 context = request.context
107 new_attempt_event = context[
108 'current_api_call_event'
109 ].new_api_call_attempt(timestamp=self._get_current_time())
110 new_attempt_event.request_headers = request.headers
111 new_attempt_event.url = request.url
112 context['current_api_call_attempt_event'] = new_attempt_event
114 def _handle_response_received(
115 self, parsed_response, context, exception, **kwargs
116 ):
117 attempt_event = context.pop('current_api_call_attempt_event')
118 attempt_event.latency = self._get_latency(attempt_event)
119 if parsed_response is not None:
120 attempt_event.http_status_code = parsed_response[
121 'ResponseMetadata'
122 ]['HTTPStatusCode']
123 attempt_event.response_headers = parsed_response[
124 'ResponseMetadata'
125 ]['HTTPHeaders']
126 attempt_event.parsed_error = parsed_response.get('Error')
127 else:
128 attempt_event.wire_exception = exception
129 return attempt_event
131 def _handle_after_call(self, context, parsed, **kwargs):
132 context['current_api_call_event'].retries_exceeded = parsed[
133 'ResponseMetadata'
134 ].get('MaxAttemptsReached', False)
135 return self._complete_api_call(context)
137 def _handle_after_call_error(self, context, exception, **kwargs):
138 # If the after-call-error was emitted and the error being raised
139 # was a retryable connection error, then the retries must have exceeded
140 # for that exception as this event gets emitted **after** retries
141 # happen.
142 context[
143 'current_api_call_event'
144 ].retries_exceeded = self._is_retryable_exception(exception)
145 return self._complete_api_call(context)
147 def _is_retryable_exception(self, exception):
148 return isinstance(
149 exception, tuple(RETRYABLE_EXCEPTIONS['GENERAL_CONNECTION_ERROR'])
150 )
152 def _complete_api_call(self, context):
153 call_event = context.pop('current_api_call_event')
154 call_event.latency = self._get_latency(call_event)
155 return call_event
157 def _get_latency(self, event):
158 return self._get_current_time() - event.timestamp
160 def _get_current_time(self):
161 return int(self._time() * 1000)
164class BaseMonitorEvent:
165 def __init__(self, service, operation, timestamp):
166 """Base monitor event
168 :type service: str
169 :param service: A string identifying the service associated to
170 the event
172 :type operation: str
173 :param operation: A string identifying the operation of service
174 associated to the event
176 :type timestamp: int
177 :param timestamp: Epoch time in milliseconds from when the event began
178 """
179 self.service = service
180 self.operation = operation
181 self.timestamp = timestamp
183 def __repr__(self):
184 return f'{self.__class__.__name__}({self.__dict__!r})'
186 def __eq__(self, other):
187 if isinstance(other, self.__class__):
188 return self.__dict__ == other.__dict__
189 return False
192class APICallEvent(BaseMonitorEvent):
193 def __init__(
194 self,
195 service,
196 operation,
197 timestamp,
198 latency=None,
199 attempts=None,
200 retries_exceeded=False,
201 ):
202 """Monitor event for a single API call
204 This event corresponds to a single client method call, which includes
205 every HTTP requests attempt made in order to complete the client call
207 :type service: str
208 :param service: A string identifying the service associated to
209 the event
211 :type operation: str
212 :param operation: A string identifying the operation of service
213 associated to the event
215 :type timestamp: int
216 :param timestamp: Epoch time in milliseconds from when the event began
218 :type latency: int
219 :param latency: The time in milliseconds to complete the client call
221 :type attempts: list
222 :param attempts: The list of APICallAttempts associated to the
223 APICall
225 :type retries_exceeded: bool
226 :param retries_exceeded: True if API call exceeded retries. False
227 otherwise
228 """
229 super().__init__(
230 service=service, operation=operation, timestamp=timestamp
231 )
232 self.latency = latency
233 self.attempts = attempts
234 if attempts is None:
235 self.attempts = []
236 self.retries_exceeded = retries_exceeded
238 def new_api_call_attempt(self, timestamp):
239 """Instantiates APICallAttemptEvent associated to the APICallEvent
241 :type timestamp: int
242 :param timestamp: Epoch time in milliseconds to associate to the
243 APICallAttemptEvent
244 """
245 attempt_event = APICallAttemptEvent(
246 service=self.service, operation=self.operation, timestamp=timestamp
247 )
248 self.attempts.append(attempt_event)
249 return attempt_event
252class APICallAttemptEvent(BaseMonitorEvent):
253 def __init__(
254 self,
255 service,
256 operation,
257 timestamp,
258 latency=None,
259 url=None,
260 http_status_code=None,
261 request_headers=None,
262 response_headers=None,
263 parsed_error=None,
264 wire_exception=None,
265 ):
266 """Monitor event for a single API call attempt
268 This event corresponds to a single HTTP request attempt in completing
269 the entire client method call.
271 :type service: str
272 :param service: A string identifying the service associated to
273 the event
275 :type operation: str
276 :param operation: A string identifying the operation of service
277 associated to the event
279 :type timestamp: int
280 :param timestamp: Epoch time in milliseconds from when the HTTP request
281 started
283 :type latency: int
284 :param latency: The time in milliseconds to complete the HTTP request
285 whether it succeeded or failed
287 :type url: str
288 :param url: The URL the attempt was sent to
290 :type http_status_code: int
291 :param http_status_code: The HTTP status code of the HTTP response
292 if there was a response
294 :type request_headers: dict
295 :param request_headers: The HTTP headers sent in making the HTTP
296 request
298 :type response_headers: dict
299 :param response_headers: The HTTP headers returned in the HTTP response
300 if there was a response
302 :type parsed_error: dict
303 :param parsed_error: The error parsed if the service returned an
304 error back
306 :type wire_exception: Exception
307 :param wire_exception: The exception raised in sending the HTTP
308 request (i.e. ConnectionError)
309 """
310 super().__init__(
311 service=service, operation=operation, timestamp=timestamp
312 )
313 self.latency = latency
314 self.url = url
315 self.http_status_code = http_status_code
316 self.request_headers = request_headers
317 self.response_headers = response_headers
318 self.parsed_error = parsed_error
319 self.wire_exception = wire_exception
322class CSMSerializer:
323 _MAX_CLIENT_ID_LENGTH = 255
324 _MAX_EXCEPTION_CLASS_LENGTH = 128
325 _MAX_ERROR_CODE_LENGTH = 128
326 _MAX_USER_AGENT_LENGTH = 256
327 _MAX_MESSAGE_LENGTH = 512
328 _RESPONSE_HEADERS_TO_EVENT_ENTRIES = {
329 'x-amzn-requestid': 'XAmznRequestId',
330 'x-amz-request-id': 'XAmzRequestId',
331 'x-amz-id-2': 'XAmzId2',
332 }
333 _AUTH_REGEXS = {
334 'v4': re.compile(
335 r'AWS4-HMAC-SHA256 '
336 r'Credential=(?P<access_key>\w+)/\d+/'
337 r'(?P<signing_region>[a-z0-9-]+)/'
338 ),
339 's3': re.compile(r'AWS (?P<access_key>\w+):'),
340 }
341 _SERIALIZEABLE_EVENT_PROPERTIES = [
342 'service',
343 'operation',
344 'timestamp',
345 'attempts',
346 'latency',
347 'retries_exceeded',
348 'url',
349 'request_headers',
350 'http_status_code',
351 'response_headers',
352 'parsed_error',
353 'wire_exception',
354 ]
356 def __init__(self, csm_client_id):
357 """Serializes monitor events to CSM (Client Side Monitoring) format
359 :type csm_client_id: str
360 :param csm_client_id: The application identifier to associate
361 to the serialized events
362 """
363 self._validate_client_id(csm_client_id)
364 self.csm_client_id = csm_client_id
366 def _validate_client_id(self, csm_client_id):
367 if len(csm_client_id) > self._MAX_CLIENT_ID_LENGTH:
368 raise ValueError(
369 f'The value provided for csm_client_id: {csm_client_id} exceeds '
370 f'the maximum length of {self._MAX_CLIENT_ID_LENGTH} characters'
371 )
373 def serialize(self, event):
374 """Serializes a monitor event to the CSM format
376 :type event: BaseMonitorEvent
377 :param event: The event to serialize to bytes
379 :rtype: bytes
380 :returns: The CSM serialized form of the event
381 """
382 event_dict = self._get_base_event_dict(event)
383 event_type = self._get_event_type(event)
384 event_dict['Type'] = event_type
385 for attr in self._SERIALIZEABLE_EVENT_PROPERTIES:
386 value = getattr(event, attr, None)
387 if value is not None:
388 getattr(self, '_serialize_' + attr)(
389 value, event_dict, event_type=event_type
390 )
391 return ensure_bytes(json.dumps(event_dict, separators=(',', ':')))
393 def _get_base_event_dict(self, event):
394 return {
395 'Version': 1,
396 'ClientId': self.csm_client_id,
397 }
399 def _serialize_service(self, service, event_dict, **kwargs):
400 event_dict['Service'] = service
402 def _serialize_operation(self, operation, event_dict, **kwargs):
403 event_dict['Api'] = operation
405 def _serialize_timestamp(self, timestamp, event_dict, **kwargs):
406 event_dict['Timestamp'] = timestamp
408 def _serialize_attempts(self, attempts, event_dict, **kwargs):
409 event_dict['AttemptCount'] = len(attempts)
410 if attempts:
411 self._add_fields_from_last_attempt(event_dict, attempts[-1])
413 def _add_fields_from_last_attempt(self, event_dict, last_attempt):
414 if last_attempt.request_headers:
415 # It does not matter which attempt to use to grab the region
416 # for the ApiCall event, but SDKs typically do the last one.
417 region = self._get_region(last_attempt.request_headers)
418 if region is not None:
419 event_dict['Region'] = region
420 event_dict['UserAgent'] = self._get_user_agent(
421 last_attempt.request_headers
422 )
423 if last_attempt.http_status_code is not None:
424 event_dict['FinalHttpStatusCode'] = last_attempt.http_status_code
425 if last_attempt.parsed_error is not None:
426 self._serialize_parsed_error(
427 last_attempt.parsed_error, event_dict, 'ApiCall'
428 )
429 if last_attempt.wire_exception is not None:
430 self._serialize_wire_exception(
431 last_attempt.wire_exception, event_dict, 'ApiCall'
432 )
434 def _serialize_latency(self, latency, event_dict, event_type):
435 if event_type == 'ApiCall':
436 event_dict['Latency'] = latency
437 elif event_type == 'ApiCallAttempt':
438 event_dict['AttemptLatency'] = latency
440 def _serialize_retries_exceeded(
441 self, retries_exceeded, event_dict, **kwargs
442 ):
443 event_dict['MaxRetriesExceeded'] = 1 if retries_exceeded else 0
445 def _serialize_url(self, url, event_dict, **kwargs):
446 event_dict['Fqdn'] = urlparse(url).netloc
448 def _serialize_request_headers(
449 self, request_headers, event_dict, **kwargs
450 ):
451 event_dict['UserAgent'] = self._get_user_agent(request_headers)
452 if self._is_signed(request_headers):
453 event_dict['AccessKey'] = self._get_access_key(request_headers)
454 region = self._get_region(request_headers)
455 if region is not None:
456 event_dict['Region'] = region
457 if 'X-Amz-Security-Token' in request_headers:
458 event_dict['SessionToken'] = request_headers[
459 'X-Amz-Security-Token'
460 ]
462 def _serialize_http_status_code(
463 self, http_status_code, event_dict, **kwargs
464 ):
465 event_dict['HttpStatusCode'] = http_status_code
467 def _serialize_response_headers(
468 self, response_headers, event_dict, **kwargs
469 ):
470 for header, entry in self._RESPONSE_HEADERS_TO_EVENT_ENTRIES.items():
471 if header in response_headers:
472 event_dict[entry] = response_headers[header]
474 def _serialize_parsed_error(
475 self, parsed_error, event_dict, event_type, **kwargs
476 ):
477 field_prefix = 'Final' if event_type == 'ApiCall' else ''
478 event_dict[field_prefix + 'AwsException'] = self._truncate(
479 parsed_error['Code'], self._MAX_ERROR_CODE_LENGTH
480 )
481 event_dict[field_prefix + 'AwsExceptionMessage'] = self._truncate(
482 parsed_error['Message'], self._MAX_MESSAGE_LENGTH
483 )
485 def _serialize_wire_exception(
486 self, wire_exception, event_dict, event_type, **kwargs
487 ):
488 field_prefix = 'Final' if event_type == 'ApiCall' else ''
489 event_dict[field_prefix + 'SdkException'] = self._truncate(
490 wire_exception.__class__.__name__, self._MAX_EXCEPTION_CLASS_LENGTH
491 )
492 event_dict[field_prefix + 'SdkExceptionMessage'] = self._truncate(
493 str(wire_exception), self._MAX_MESSAGE_LENGTH
494 )
496 def _get_event_type(self, event):
497 if isinstance(event, APICallEvent):
498 return 'ApiCall'
499 elif isinstance(event, APICallAttemptEvent):
500 return 'ApiCallAttempt'
502 def _get_access_key(self, request_headers):
503 auth_val = self._get_auth_value(request_headers)
504 _, auth_match = self._get_auth_match(auth_val)
505 return auth_match.group('access_key')
507 def _get_region(self, request_headers):
508 if not self._is_signed(request_headers):
509 return None
510 auth_val = self._get_auth_value(request_headers)
511 signature_version, auth_match = self._get_auth_match(auth_val)
512 if signature_version != 'v4':
513 return None
514 return auth_match.group('signing_region')
516 def _get_user_agent(self, request_headers):
517 return self._truncate(
518 ensure_unicode(request_headers.get('User-Agent', '')),
519 self._MAX_USER_AGENT_LENGTH,
520 )
522 def _is_signed(self, request_headers):
523 return 'Authorization' in request_headers
525 def _get_auth_value(self, request_headers):
526 return ensure_unicode(request_headers['Authorization'])
528 def _get_auth_match(self, auth_val):
529 for signature_version, regex in self._AUTH_REGEXS.items():
530 match = regex.match(auth_val)
531 if match:
532 return signature_version, match
533 return None, None
535 def _truncate(self, text, max_length):
536 if len(text) > max_length:
537 logger.debug(
538 'Truncating following value to maximum length of ' '%s: %s',
539 text,
540 max_length,
541 )
542 return text[:max_length]
543 return text
546class SocketPublisher:
547 _MAX_MONITOR_EVENT_LENGTH = 8 * 1024
549 def __init__(self, socket, host, port, serializer):
550 """Publishes monitor events to a socket
552 :type socket: socket.socket
553 :param socket: The socket object to use to publish events
555 :type host: string
556 :param host: The host to send events to
558 :type port: integer
559 :param port: The port on the host to send events to
561 :param serializer: The serializer to use to serialize the event
562 to a form that can be published to the socket. This must
563 have a `serialize()` method that accepts a monitor event
564 and return bytes
565 """
566 self._socket = socket
567 self._address = (host, port)
568 self._serializer = serializer
570 def publish(self, event):
571 """Publishes a specified monitor event
573 :type event: BaseMonitorEvent
574 :param event: The monitor event to be sent
575 over the publisher's socket to the desired address.
576 """
577 serialized_event = self._serializer.serialize(event)
578 if len(serialized_event) > self._MAX_MONITOR_EVENT_LENGTH:
579 logger.debug(
580 'Serialized event of size %s exceeds the maximum length '
581 'allowed: %s. Not sending event to socket.',
582 len(serialized_event),
583 self._MAX_MONITOR_EVENT_LENGTH,
584 )
585 return
586 self._socket.sendto(serialized_event, self._address)