Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/msal/telemetry.py: 50%
48 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:20 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:20 +0000
import uuid
import logging


logger = logging.getLogger(__name__)

# HTTP header names; they are the keys of the dict returned by
# _TelemetryContext.generate_headers().
CLIENT_REQUEST_ID = 'client-request-id'
CLIENT_CURRENT_TELEMETRY = "x-client-current-telemetry"
CLIENT_LAST_TELEMETRY = "x-client-last-telemetry"

# Values accepted as ``refresh_reason`` by _TelemetryContext; the chosen value
# is emitted in the "cache_refresh" field of the current-telemetry header.
NON_SILENT_CALL = 0  # Also the fallback when no refresh_reason is supplied
FORCE_REFRESH = 1
AT_ABSENT = 2
AT_EXPIRED = 3
AT_AGING = 4
RESERVED = 5  # Not referenced by the code shown in this module
def _get_new_correlation_id():
    """Return a freshly generated random (version 4) UUID as a string."""
    return str(uuid.uuid4())
class _TelemetryContext(object):
    """It is used for handling the telemetry context for current OAuth2 "exchange".

    The context reads from and writes to a caller-supplied ``buffer`` mapping,
    always mutating it in place (never rebinding it), and guards those
    accesses with the caller-supplied ``lock``.
    """
    # https://identitydivision.visualstudio.com/DevEx/_git/AuthLibrariesApiReview?path=%2FTelemetry%2FMSALServerSideTelemetry.md&_a=preview

    # Keys used inside the buffer
    _SUCCEEDED = "succeeded"  # Counter incremented by hit_an_access_token()
    _FAILED = "failed"  # List of {"a": api_id, "c": correlation_id, "e": error}
    _FAILURE_SIZE = "failure_size"  # Running text size of the recorded failures

    _CURRENT_HEADER_SIZE_LIMIT = 100
    _LAST_HEADER_SIZE_LIMIT = 350

    def __init__(self, buffer, lock, api_id, correlation_id=None, refresh_reason=None):
        """Bind this context to a telemetry buffer and its lock.

        :param buffer: Mutable dict-like object holding the accumulated
            telemetry. It is only ever mutated in place.
        :param lock: Context manager entered around buffer accesses.
        :param api_id: Identifier of the API being called; emitted in headers.
        :param correlation_id: Optional id for this exchange.
            A new UUID string is generated when it is falsy.
        :param refresh_reason: Optional cache refresh reason.
            Falls back to ``NON_SILENT_CALL`` when it is falsy.
        """
        self._buffer = buffer
        self._lock = lock
        self._api_id = api_id
        self._correlation_id = correlation_id or _get_new_correlation_id()
        self._refresh_reason = refresh_reason or NON_SILENT_CALL
        logger.debug("Generate or reuse correlation_id: %s", self._correlation_id)

    def generate_headers(self):
        """Build the telemetry headers for the next request.

        :return: A dict keyed by ``CLIENT_REQUEST_ID``,
            ``CLIENT_CURRENT_TELEMETRY`` and ``CLIENT_LAST_TELEMETRY``.
            The "current" value is ``4|{api_id},{cache_refresh}|``; the "last"
            value is ``4|{succeeded}|{api_id,correlation_id pairs}|{errors}|``,
            assembled from whatever the buffer currently holds.
        """
        with self._lock:
            current = "4|{api_id},{cache_refresh}|".format(
                api_id=self._api_id, cache_refresh=self._refresh_reason)
            if len(current) > self._CURRENT_HEADER_SIZE_LIMIT:
                # Only warn; the value is still sent as-is
                logger.warning(
                    "Telemetry header greater than %s will be truncated by AAD",
                    self._CURRENT_HEADER_SIZE_LIMIT)
            failures = self._buffer.get(self._FAILED, [])
            return {
                CLIENT_REQUEST_ID: self._correlation_id,
                CLIENT_CURRENT_TELEMETRY: current,
                CLIENT_LAST_TELEMETRY: "4|{succeeded}|{failed_requests}|{errors}|".format(
                    succeeded=self._buffer.get(self._SUCCEEDED, 0),
                    failed_requests=",".join("{a},{c}".format(**f) for f in failures),
                    # str() so that a non-text error already sitting in the
                    # buffer cannot make join() raise TypeError
                    errors=",".join(str(f["e"]) for f in failures),
                    ),
                }

    def hit_an_access_token(self):
        """Increment the buffer's success counter by one."""
        with self._lock:
            self._buffer[self._SUCCEEDED] = self._buffer.get(self._SUCCEEDED, 0) + 1

    def update_telemetry(self, auth_result):
        """Fold the outcome of a request into the buffer.

        :param auth_result: Dict-like response. When it is falsy nothing
            happens. When it contains an "error" key that error is recorded;
            otherwise the buffer is emptied.
        """
        if auth_result:
            with self._lock:
                if "error" in auth_result:
                    self._record_failure(auth_result["error"])
                else:  # Telemetry sent successfully. Reset buffer
                    self._buffer.clear()  # This won't work: self._buffer = {}

    def _record_failure(self, error):
        """Append a failure entry unless it would exceed the size limit.

        This method does not take the lock itself; update_telemetry() calls it
        while already holding the lock.

        :param error: The error to record. It is stored as text, so that the
            stored value is exactly what was measured against the size limit
            and what generate_headers() can join without a TypeError.
        """
        error = str(error)
        simulation = len(",{api_id},{correlation_id},{error}".format(
            api_id=self._api_id, correlation_id=self._correlation_id, error=error))
        if self._buffer.get(self._FAILURE_SIZE, 0) + simulation < self._LAST_HEADER_SIZE_LIMIT:
            self._buffer[self._FAILURE_SIZE] = self._buffer.get(
                self._FAILURE_SIZE, 0) + simulation
            self._buffer.setdefault(self._FAILED, []).append({
                "a": self._api_id, "c": self._correlation_id, "e": error})