Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/jwt/utils.py: 32%

74 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2023-12-08 06:05 +0000

1import base64 

2import binascii 

3import re 

4from typing import Union 

5 

6try: 

7 from cryptography.hazmat.primitives.asymmetric.ec import EllipticCurve 

8 from cryptography.hazmat.primitives.asymmetric.utils import ( 

9 decode_dss_signature, 

10 encode_dss_signature, 

11 ) 

12except ModuleNotFoundError: 

13 pass 

14 

15 

16def force_bytes(value: Union[bytes, str]) -> bytes: 

17 if isinstance(value, str): 

18 return value.encode("utf-8") 

19 elif isinstance(value, bytes): 

20 return value 

21 else: 

22 raise TypeError("Expected a string value") 

23 

24 

25def base64url_decode(input: Union[bytes, str]) -> bytes: 

26 input_bytes = force_bytes(input) 

27 

28 rem = len(input_bytes) % 4 

29 

30 if rem > 0: 

31 input_bytes += b"=" * (4 - rem) 

32 

33 return base64.urlsafe_b64decode(input_bytes) 

34 

35 

36def base64url_encode(input: bytes) -> bytes: 

37 return base64.urlsafe_b64encode(input).replace(b"=", b"") 

38 

39 

40def to_base64url_uint(val: int) -> bytes: 

41 if val < 0: 

42 raise ValueError("Must be a positive integer") 

43 

44 int_bytes = bytes_from_int(val) 

45 

46 if len(int_bytes) == 0: 

47 int_bytes = b"\x00" 

48 

49 return base64url_encode(int_bytes) 

50 

51 

52def from_base64url_uint(val: Union[bytes, str]) -> int: 

53 data = base64url_decode(force_bytes(val)) 

54 return int.from_bytes(data, byteorder="big") 

55 

56 

57def number_to_bytes(num: int, num_bytes: int) -> bytes: 

58 padded_hex = "%0*x" % (2 * num_bytes, num) 

59 return binascii.a2b_hex(padded_hex.encode("ascii")) 

60 

61 

62def bytes_to_number(string: bytes) -> int: 

63 return int(binascii.b2a_hex(string), 16) 

64 

65 

66def bytes_from_int(val: int) -> bytes: 

67 remaining = val 

68 byte_length = 0 

69 

70 while remaining != 0: 

71 remaining >>= 8 

72 byte_length += 1 

73 

74 return val.to_bytes(byte_length, "big", signed=False) 

75 

76 

77def der_to_raw_signature(der_sig: bytes, curve: "EllipticCurve") -> bytes: 

78 num_bits = curve.key_size 

79 num_bytes = (num_bits + 7) // 8 

80 

81 r, s = decode_dss_signature(der_sig) 

82 

83 return number_to_bytes(r, num_bytes) + number_to_bytes(s, num_bytes) 

84 

85 

86def raw_to_der_signature(raw_sig: bytes, curve: "EllipticCurve") -> bytes: 

87 num_bits = curve.key_size 

88 num_bytes = (num_bits + 7) // 8 

89 

90 if len(raw_sig) != 2 * num_bytes: 

91 raise ValueError("Invalid signature") 

92 

93 r = bytes_to_number(raw_sig[:num_bytes]) 

94 s = bytes_to_number(raw_sig[num_bytes:]) 

95 

96 return bytes(encode_dss_signature(r, s)) 

97 

98 

99# Based on https://github.com/hynek/pem/blob/7ad94db26b0bc21d10953f5dbad3acfdfacf57aa/src/pem/_core.py#L224-L252 

100_PEMS = { 

101 b"CERTIFICATE", 

102 b"TRUSTED CERTIFICATE", 

103 b"PRIVATE KEY", 

104 b"PUBLIC KEY", 

105 b"ENCRYPTED PRIVATE KEY", 

106 b"OPENSSH PRIVATE KEY", 

107 b"DSA PRIVATE KEY", 

108 b"RSA PRIVATE KEY", 

109 b"RSA PUBLIC KEY", 

110 b"EC PRIVATE KEY", 

111 b"DH PARAMETERS", 

112 b"NEW CERTIFICATE REQUEST", 

113 b"CERTIFICATE REQUEST", 

114 b"SSH2 PUBLIC KEY", 

115 b"SSH2 ENCRYPTED PRIVATE KEY", 

116 b"X509 CRL", 

117} 

118 

119_PEM_RE = re.compile( 

120 b"----[- ]BEGIN (" 

121 + b"|".join(_PEMS) 

122 + b""")[- ]----\r? 

123.+?\r? 

124----[- ]END \\1[- ]----\r?\n?""", 

125 re.DOTALL, 

126) 

127 

128 

129def is_pem_format(key: bytes) -> bool: 

130 return bool(_PEM_RE.search(key)) 

131 

132 

133# Based on https://github.com/pyca/cryptography/blob/bcb70852d577b3f490f015378c75cba74986297b/src/cryptography/hazmat/primitives/serialization/ssh.py#L40-L46 

134_CERT_SUFFIX = b"-cert-v01@openssh.com" 

135_SSH_PUBKEY_RC = re.compile(rb"\A(\S+)[ \t]+(\S+)") 

136_SSH_KEY_FORMATS = [ 

137 b"ssh-ed25519", 

138 b"ssh-rsa", 

139 b"ssh-dss", 

140 b"ecdsa-sha2-nistp256", 

141 b"ecdsa-sha2-nistp384", 

142 b"ecdsa-sha2-nistp521", 

143] 

144 

145 

146def is_ssh_key(key: bytes) -> bool: 

147 if any(string_value in key for string_value in _SSH_KEY_FORMATS): 

148 return True 

149 

150 ssh_pubkey_match = _SSH_PUBKEY_RC.match(key) 

151 if ssh_pubkey_match: 

152 key_type = ssh_pubkey_match.group(1) 

153 if _CERT_SUFFIX == key_type[-len(_CERT_SUFFIX) :]: 

154 return True 

155 

156 return False