Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/msal/telemetry.py: 50%

48 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-06-07 06:20 +0000

1import uuid 

2import logging 

3 

4 

logger = logging.getLogger(__name__)

# Keys of the header dict returned by _TelemetryContext.generate_headers().
CLIENT_REQUEST_ID = 'client-request-id'
CLIENT_CURRENT_TELEMETRY = "x-client-current-telemetry"
CLIENT_LAST_TELEMETRY = "x-client-last-telemetry"

# Values for the `refresh_reason` argument of _TelemetryContext; the chosen
# value is emitted as the cache-refresh field of the current-telemetry header.
# NOTE(review): the per-value meanings are implied by the names only (e.g.
# "AT" presumably stands for access token) -- confirm against the telemetry
# spec before relying on them.
NON_SILENT_CALL = 0  # Also the fallback when no refresh_reason is supplied
FORCE_REFRESH = 1
AT_ABSENT = 2
AT_EXPIRED = 3
AT_AGING = 4
RESERVED = 5

16 

17 

def _get_new_correlation_id():
    """Return a freshly generated random (version 4) UUID as a string."""
    return str(uuid.uuid4())

20 

21 

class _TelemetryContext(object):
    """It is used for handling the telemetry context for current OAuth2 "exchange".

    An instance carries the api id, correlation id and refresh reason of one
    exchange, and reads/writes success and failure counters in a buffer that
    is handed in by the caller. All buffer access made through the public
    methods happens while holding the caller-supplied lock.
    """
    # https://identitydivision.visualstudio.com/DevEx/_git/AuthLibrariesApiReview?path=%2FTelemetry%2FMSALServerSideTelemetry.md&_a=preview

    # Keys used inside the buffer.
    _SUCCEEDED = "succeeded"  # int: counter bumped by hit_an_access_token()
    _FAILED = "failed"  # list of {"a": api_id, "c": correlation_id, "e": error}
    _FAILURE_SIZE = "failure_size"  # int: running size of the recorded failures

    _CURRENT_HEADER_SIZE_LIMIT = 100
    _LAST_HEADER_SIZE_LIMIT = 350

    def __init__(self, buffer, lock, api_id, correlation_id=None, refresh_reason=None):
        """Bind this context to a telemetry buffer.

        :param buffer: Mutable mapping holding the telemetry state. It is
            mutated in place (never rebound), so the caller keeps seeing
            every update.
        :param lock: Context manager guarding access to ``buffer``.
        :param api_id: Identifier of the API being called; emitted verbatim
            in the telemetry headers.
        :param correlation_id: Optional id to reuse; a new one is generated
            when it is omitted or falsy.
        :param refresh_reason: Optional cache-refresh code; falls back to
            ``NON_SILENT_CALL`` when omitted or falsy.
        """
        self._buffer = buffer
        self._lock = lock
        self._api_id = api_id
        self._correlation_id = correlation_id or _get_new_correlation_id()
        self._refresh_reason = refresh_reason or NON_SILENT_CALL
        logger.debug("Generate or reuse correlation_id: %s", self._correlation_id)

    def generate_headers(self):
        """Build the telemetry headers for the current request.

        :return: A dict with three entries: the correlation id, the
            "current" telemetry (``4|api_id,cache_refresh|``) and the "last"
            telemetry (``4|succeeded|api_id,correlation_id pairs|errors|``)
            assembled from the buffer.
        """
        with self._lock:
            current = "4|{api_id},{cache_refresh}|".format(
                api_id=self._api_id, cache_refresh=self._refresh_reason)
            if len(current) > self._CURRENT_HEADER_SIZE_LIMIT:
                # Only warn; the value is still sent as-is.
                logger.warning(
                    "Telemetry header greater than %s will be truncated by AAD",
                    self._CURRENT_HEADER_SIZE_LIMIT)
            failures = self._buffer.get(self._FAILED, [])
            return {
                CLIENT_REQUEST_ID: self._correlation_id,
                CLIENT_CURRENT_TELEMETRY: current,
                CLIENT_LAST_TELEMETRY: "4|{succeeded}|{failed_requests}|{errors}|".format(
                    succeeded=self._buffer.get(self._SUCCEEDED, 0),
                    failed_requests=",".join("{a},{c}".format(**f) for f in failures),
                    # str.join() rejects non-str items; the buffer is supplied
                    # by the caller, so stringify defensively here as well.
                    errors=",".join(str(f["e"]) for f in failures),
                    )
                }

    def hit_an_access_token(self):
        """Increment the success counter stored in the buffer."""
        with self._lock:
            self._buffer[self._SUCCEEDED] = self._buffer.get(self._SUCCEEDED, 0) + 1

    def update_telemetry(self, auth_result):
        """Fold the outcome of an exchange into the buffer.

        :param auth_result: Mapping describing the outcome. A falsy value is
            ignored. When it contains an ``"error"`` key that error is
            recorded as a failure; otherwise the buffer is reset.
        """
        if auth_result:
            with self._lock:
                if "error" in auth_result:
                    self._record_failure(auth_result["error"])
                else:  # Telemetry sent successfully. Reset buffer
                    # Clear in place so the caller's object is emptied.
                    self._buffer.clear()  # This won't work: self._buffer = {}

    def _record_failure(self, error):
        """Append one failure entry to the buffer if it still fits.

        The caller is expected to hold the lock (``update_telemetry`` does).
        The entry is silently dropped once the accumulated size would reach
        ``_LAST_HEADER_SIZE_LIMIT``.

        :param error: Error code of the failed exchange. It is stored as a
            string so that ``generate_headers`` can always join it.
        """
        # A non-string error would otherwise make every later
        # generate_headers() call raise TypeError until the buffer is cleared.
        error = str(error)
        entry_size = len(",{api_id},{correlation_id},{error}".format(
            api_id=self._api_id, correlation_id=self._correlation_id, error=error))
        recorded_size = self._buffer.get(self._FAILURE_SIZE, 0)
        if recorded_size + entry_size < self._LAST_HEADER_SIZE_LIMIT:
            self._buffer[self._FAILURE_SIZE] = recorded_size + entry_size
            self._buffer.setdefault(self._FAILED, []).append({
                "a": self._api_id, "c": self._correlation_id, "e": error})

78