Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/google/auth/transport/_mtls_helper.py: 25%

76 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2023-12-08 06:40 +0000

1# Copyright 2020 Google LLC 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15"""Helper functions for getting mTLS cert and key.""" 

16 

17import json 

18import logging 

19from os import path 

20import re 

21import subprocess 

22 

23from google.auth import exceptions 

24 

# Default location of the context aware metadata JSON file.
CONTEXT_AWARE_METADATA_PATH = "~/.secureConnect/context_aware_metadata.json"

# Key in the metadata JSON whose value is the cert provider command.
_CERT_PROVIDER_COMMAND = "cert_provider_command"

_LOGGER = logging.getLogger(__name__)

# Matches everything from the first BEGIN CERTIFICATE marker to the last
# END CERTIFICATE marker (the match is greedy and "." also matches newlines),
# plus an optional trailing line ending.
_CERT_REGEX = re.compile(
    b"-----BEGIN CERTIFICATE-----.+-----END CERTIFICATE-----\r?\n?",
    flags=re.DOTALL,
)

# Supports various formats of key files, e.g.
#   "-----BEGIN PRIVATE KEY-----...",
#   "-----BEGIN EC PRIVATE KEY-----...",
#   "-----BEGIN RSA PRIVATE KEY-----...",
#   "-----BEGIN ENCRYPTED PRIVATE KEY-----..."
_KEY_REGEX = re.compile(
    b"-----BEGIN [A-Z ]*PRIVATE KEY-----.+-----END [A-Z ]*PRIVATE KEY-----\r?\n?",
    flags=re.DOTALL,
)

# Captures the bytes found between the BEGIN PASSPHRASE and END PASSPHRASE
# markers (surrounding whitespace included).
_PASSPHRASE_REGEX = re.compile(
    b"-----BEGIN PASSPHRASE-----(.+)-----END PASSPHRASE-----",
    flags=re.DOTALL,
)

47 

48 

def _check_dca_metadata_path(metadata_path):
    """Look for the context aware metadata file.

    Args:
        metadata_path (str): context aware metadata path; a leading "~" is
            expanded to the user's home directory.

    Returns:
        str: the user-expanded path if something exists at that location,
            None otherwise.
    """
    expanded_path = path.expanduser(metadata_path)
    if path.exists(expanded_path):
        return expanded_path

    # A missing metadata file is not an error: the caller simply proceeds
    # without client SSL authentication.
    _LOGGER.debug("%s is not found, skip client SSL authentication.", expanded_path)
    return None

64 

65 

def _read_dca_metadata_file(metadata_path):
    """Read the context aware metadata file and decode it as JSON.

    Args:
        metadata_path (str): context aware metadata path.

    Returns:
        Dict[str, str]: The metadata.

    Raises:
        google.auth.exceptions.ClientCertError: If a ValueError occurs while
            opening the file or parsing its content as JSON.
    """
    try:
        with open(metadata_path) as metadata_file:
            return json.load(metadata_file)
    except ValueError as parse_error:
        # Surface the failure as the library's own error type while keeping
        # the original exception as the cause.
        raise exceptions.ClientCertError(parse_error) from parse_error

86 

87 

def _run_cert_provider_command(command, expect_encrypted_key=False):
    """Execute the cert provider command and parse its standard output into
    the client side mTLS cert, key and passphrase.

    Args:
        command (List[str]): cert provider command.
        expect_encrypted_key (bool): If encrypted private key is expected.

    Returns:
        Tuple[bytes, bytes, bytes]: client certificate bytes in PEM format, key
            bytes in PEM format and passphrase bytes. The passphrase is None
            when ``expect_encrypted_key`` is False.

    Raises:
        google.auth.exceptions.ClientCertError: if the command cannot be
            started, exits with a non-zero status, or its output does not
            contain exactly the expected cert, key and passphrase.
    """
    try:
        process = subprocess.Popen(
            command, stdout=subprocess.PIPE, stderr=subprocess.PIPE
        )
        # stderr is captured but its content is not used.
        output, _ = process.communicate()
    except OSError as launch_error:
        raise exceptions.ClientCertError(launch_error) from launch_error

    # Check cert provider command execution error.
    exit_code = process.returncode
    if exit_code != 0:
        raise exceptions.ClientCertError(
            "Cert provider command returns non-zero status code %s" % exit_code
        )

    # Exactly one certificate (chain) match and one key match are required.
    certs = _CERT_REGEX.findall(output)
    if len(certs) != 1:
        raise exceptions.ClientCertError("Client SSL certificate is missing or invalid")
    keys = _KEY_REGEX.findall(output)
    if len(keys) != 1:
        raise exceptions.ClientCertError("Client SSL key is missing or invalid")
    passphrases = _PASSPHRASE_REGEX.findall(output)

    cert, key = certs[0], keys[0]
    key_is_encrypted = b"ENCRYPTED" in key

    if not expect_encrypted_key:
        # Plain key requested: neither an encrypted key nor a passphrase may
        # appear in the output.
        if key_is_encrypted:
            raise exceptions.ClientCertError("Encrypted private key is not expected")
        if len(passphrases) > 0:
            raise exceptions.ClientCertError("Passphrase is not expected")
        return cert, key, None

    # Encrypted key requested: exactly one passphrase and an encrypted key
    # must both be present.
    if len(passphrases) != 1:
        raise exceptions.ClientCertError("Passphrase is missing or invalid")
    if not key_is_encrypted:
        raise exceptions.ClientCertError("Encrypted private key is expected")
    return cert, key, passphrases[0].strip()

140 

141 

def get_client_ssl_credentials(
    generate_encrypted_key=False,
    context_aware_metadata_path=CONTEXT_AWARE_METADATA_PATH,
):
    """Obtain the client side certificate, private key and passphrase by
    running the cert provider command listed in the context aware metadata.

    Args:
        generate_encrypted_key (bool): If set to True, encrypted private key
            and passphrase will be generated; otherwise, unencrypted private key
            will be generated and passphrase will be None.
        context_aware_metadata_path (str): The context_aware_metadata.json file path.

    Returns:
        Tuple[bool, bytes, bytes, bytes]:
            A boolean indicating if cert, key and passphrase are obtained, the
            cert bytes and key bytes both in PEM format, and passphrase bytes.
            When the metadata file does not exist the result is
            ``(False, None, None, None)``.

    Raises:
        google.auth.exceptions.ClientCertError: if problems occurs when getting
            the cert, key and passphrase.
    """
    resolved_path = _check_dca_metadata_path(context_aware_metadata_path)
    if not resolved_path:
        # No metadata file: nothing to run, report that no credentials exist.
        return False, None, None, None

    metadata = _read_dca_metadata_file(resolved_path)
    if _CERT_PROVIDER_COMMAND not in metadata:
        raise exceptions.ClientCertError("Cert provider command is not found")

    provider_command = metadata[_CERT_PROVIDER_COMMAND]

    # Add the passphrase flag for an encrypted key unless the configured
    # command already carries it.
    flag_missing = generate_encrypted_key and "--with_passphrase" not in provider_command
    if flag_missing:
        provider_command.append("--with_passphrase")

    # Execute the command.
    cert, key, passphrase = _run_cert_provider_command(
        provider_command, expect_encrypted_key=generate_encrypted_key
    )
    return True, cert, key, passphrase

183 

184 

def get_client_cert_and_key(client_cert_callback=None):
    """Obtain the client side certificate and private key.

    A truthy ``client_cert_callback`` is called and its result is returned
    as the cert and key. Otherwise the function falls back to the
    application default SSL credentials (unencrypted key).

    Args:
        client_cert_callback (Optional[Callable[[], (bytes, bytes)]]): An
            optional callback which returns client certificate bytes and private
            key bytes both in PEM format.

    Returns:
        Tuple[bool, bytes, bytes]:
            A boolean indicating if cert and key are obtained, the cert bytes
            and key bytes both in PEM format.

    Raises:
        google.auth.exceptions.ClientCertError: if problems occurs when getting
            the cert and key.
    """
    if not client_cert_callback:
        # No callback supplied: use the default credentials and drop the
        # passphrase slot, which is unused for an unencrypted key.
        found, cert, key, _ = get_client_ssl_credentials(generate_encrypted_key=False)
        return found, cert, key

    cert, key = client_cert_callback()
    return True, cert, key

211 

212 

def decrypt_private_key(key, passphrase):
    """Decrypt a passphrase protected private key.

    google-auth library doesn't support passphrase protected private key for
    mutual TLS channel. This helper function can be used to decrypt the
    passphrase protected private key in order to establish mutual TLS channel.

    For example, if you have a function which produces client cert, passphrase
    protected private key and passphrase, you can convert it to a client cert
    callback function accepted by google-auth::

        from google.auth.transport import _mtls_helper

        def your_client_cert_function():
            return cert, encrypted_key, passphrase

        # callback accepted by google-auth for mutual TLS channel.
        def client_cert_callback():
            cert, encrypted_key, passphrase = your_client_cert_function()
            decrypted_key = _mtls_helper.decrypt_private_key(encrypted_key,
                passphrase)
            return cert, decrypted_key

    Args:
        key (bytes): The private key bytes in PEM format.
        passphrase (bytes): The passphrase bytes.

    Returns:
        bytes: The decrypted private key in PEM format.

    Raises:
        ImportError: If pyOpenSSL is not installed.
        OpenSSL.crypto.Error: If there is any problem decrypting the private key.
    """
    # Imported here rather than at module level, so pyOpenSSL is only needed
    # when this helper is actually called.
    from OpenSSL import crypto

    pem_format = crypto.FILETYPE_PEM

    # Load the encrypted PEM bytes into a PKey object using the passphrase,
    # then serialize that object back to PEM.
    loaded_key = crypto.load_privatekey(pem_format, key, passphrase=passphrase)
    return crypto.dump_privatekey(pem_format, loaded_key)

250 

251 # Then dump the decrypted key bytes 

252 return crypto.dump_privatekey(crypto.FILETYPE_PEM, pkey)