Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/jose/utils.py: 48%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

62 statements  

1import base64 

2import re 

3import struct 

4 

5# Piggyback of the backends implementation of the function that converts a long 

6# to a bytes stream. Some plumbing is necessary to have the signatures match. 

7try: 

8 from cryptography.utils import int_to_bytes as _long_to_bytes 

9 

10 def long_to_bytes(n, blocksize=0): 

11 return _long_to_bytes(n, blocksize or None) 

12 

13except ImportError: 

14 from ecdsa.ecdsa import int_to_string as _long_to_bytes 

15 

16 def long_to_bytes(n, blocksize=0): 

17 ret = _long_to_bytes(n) 

18 if blocksize == 0: 

19 return ret 

20 else: 

21 assert len(ret) <= blocksize 

22 padding = blocksize - len(ret) 

23 return b"\x00" * padding + ret 

24 

25 

26def long_to_base64(data, size=0): 

27 return base64.urlsafe_b64encode(long_to_bytes(data, size)).strip(b"=") 

28 

29 

30def int_arr_to_long(arr): 

31 return int("".join(["%02x" % byte for byte in arr]), 16) 

32 

33 

34def base64_to_long(data): 

35 if isinstance(data, str): 

36 data = data.encode("ascii") 

37 

38 # urlsafe_b64decode will happily convert b64encoded data 

39 _d = base64.urlsafe_b64decode(bytes(data) + b"==") 

40 return int_arr_to_long(struct.unpack("%sB" % len(_d), _d)) 

41 

42 

43def calculate_at_hash(access_token, hash_alg): 

44 """Helper method for calculating an access token 

45 hash, as described in http://openid.net/specs/openid-connect-core-1_0.html#CodeIDToken 

46 

47 Its value is the base64url encoding of the left-most half of the hash of the octets 

48 of the ASCII representation of the access_token value, where the hash algorithm 

49 used is the hash algorithm used in the alg Header Parameter of the ID Token's JOSE 

50 Header. For instance, if the alg is RS256, hash the access_token value with SHA-256, 

51 then take the left-most 128 bits and base64url encode them. The at_hash value is a 

52 case sensitive string. 

53 

54 Args: 

55 access_token (str): An access token string. 

56 hash_alg (callable): A callable returning a hash object, e.g. hashlib.sha256 

57 

58 """ 

59 hash_digest = hash_alg(access_token.encode("utf-8")).digest() 

60 cut_at = int(len(hash_digest) / 2) 

61 truncated = hash_digest[:cut_at] 

62 at_hash = base64url_encode(truncated) 

63 return at_hash.decode("utf-8") 

64 

65 

66def base64url_decode(input): 

67 """Helper method to base64url_decode a string. 

68 

69 Args: 

70 input (bytes): A base64url_encoded string (bytes) to decode. 

71 

72 """ 

73 rem = len(input) % 4 

74 

75 if rem > 0: 

76 input += b"=" * (4 - rem) 

77 

78 return base64.urlsafe_b64decode(input) 

79 

80 

81def base64url_encode(input): 

82 """Helper method to base64url_encode a string. 

83 

84 Args: 

85 input (bytes): A base64url_encoded string (bytes) to encode. 

86 

87 """ 

88 return base64.urlsafe_b64encode(input).replace(b"=", b"") 

89 

90 

91def timedelta_total_seconds(delta): 

92 """Helper method to determine the total number of seconds 

93 from a timedelta. 

94 

95 Args: 

96 delta (timedelta): A timedelta to convert to seconds. 

97 """ 

98 return delta.days * 24 * 60 * 60 + delta.seconds 

99 

100 

101def ensure_binary(s): 

102 """Coerce **s** to bytes.""" 

103 

104 if isinstance(s, bytes): 

105 return s 

106 if isinstance(s, str): 

107 return s.encode("utf-8", "strict") 

108 raise TypeError(f"not expecting type '{type(s)}'") 

109 

110 

111# The following was copied from PyJWT: 

112# https://github.com/jpadilla/pyjwt/commit/9c528670c455b8d948aff95ed50e22940d1ad3fc 

113# Based on: 

114# https://github.com/hynek/pem/blob/7ad94db26b0bc21d10953f5dbad3acfdfacf57aa/src/pem/_core.py#L224-L252 

115_PEMS = { 

116 b"CERTIFICATE", 

117 b"TRUSTED CERTIFICATE", 

118 b"PRIVATE KEY", 

119 b"PUBLIC KEY", 

120 b"ENCRYPTED PRIVATE KEY", 

121 b"OPENSSH PRIVATE KEY", 

122 b"DSA PRIVATE KEY", 

123 b"RSA PRIVATE KEY", 

124 b"RSA PUBLIC KEY", 

125 b"EC PRIVATE KEY", 

126 b"DH PARAMETERS", 

127 b"NEW CERTIFICATE REQUEST", 

128 b"CERTIFICATE REQUEST", 

129 b"SSH2 PUBLIC KEY", 

130 b"SSH2 ENCRYPTED PRIVATE KEY", 

131 b"X509 CRL", 

132} 

133_PEM_RE = re.compile( 

134 b"----[- ]BEGIN (" + b"|".join(re.escape(pem) for pem in _PEMS) + b")[- ]----", 

135) 

136 

137 

138def is_pem_format(key: bytes) -> bool: 

139 return bool(_PEM_RE.search(key)) 

140 

141 

142# Based on 

143# https://github.com/pyca/cryptography/blob/bcb70852d577b3f490f015378c75cba74986297b 

144# /src/cryptography/hazmat/primitives/serialization/ssh.py#L40-L46 

145_CERT_SUFFIX = b"-cert-v01@openssh.com" 

146_SSH_PUBKEY_RC = re.compile(rb"\A(\S+)[ \t]+(\S+)") 

147_SSH_KEY_FORMATS = [ 

148 b"ssh-ed25519", 

149 b"ssh-rsa", 

150 b"ssh-dss", 

151 b"ecdsa-sha2-nistp256", 

152 b"ecdsa-sha2-nistp384", 

153 b"ecdsa-sha2-nistp521", 

154] 

155 

156 

157def is_ssh_key(key: bytes) -> bool: 

158 if any(string_value in key for string_value in _SSH_KEY_FORMATS): 

159 return True 

160 ssh_pubkey_match = _SSH_PUBKEY_RC.match(key) 

161 if ssh_pubkey_match: 

162 key_type = ssh_pubkey_match.group(1) 

163 if _CERT_SUFFIX == key_type[-len(_CERT_SUFFIX) :]: 

164 return True 

165 return False