Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/google/auth/transport/_mtls_helper.py: 26%

77 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-06-06 06:03 +0000

1# Copyright 2020 Google LLC 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15"""Helper functions for getting mTLS cert and key.""" 

16 

17import json 

18import logging 

19from os import path 

20import re 

21import subprocess 

22 

23import six 

24 

25from google.auth import exceptions 

26 

# Default location of the context aware metadata file. The leading "~" is
# expanded to the user's home directory by _check_dca_metadata_path.
27CONTEXT_AWARE_METADATA_PATH = "~/.secureConnect/context_aware_metadata.json" 

# Key in the metadata JSON whose value is the cert provider command.
28_CERT_PROVIDER_COMMAND = "cert_provider_command" 

# Matches a PEM certificate in the cert provider command output. With
# re.DOTALL the greedy ".+" also spans newlines, so a chain of several
# certificates is captured as one match running from the first BEGIN marker
# to the last END marker. A trailing "\r", "\n" or "\r\n" is included if present.
29_CERT_REGEX = re.compile( 

30 b"-----BEGIN CERTIFICATE-----.+-----END CERTIFICATE-----\r?\n?", re.DOTALL 

31) 

32 

33# Support various formats of key files, e.g. 

34# "-----BEGIN PRIVATE KEY-----...", 

35# "-----BEGIN EC PRIVATE KEY-----...", 

36# "-----BEGIN RSA PRIVATE KEY-----..." 

37# "-----BEGIN ENCRYPTED PRIVATE KEY-----" 

# The "[A-Z ]*" prefix is what admits the EC / RSA / ENCRYPTED variants.
38_KEY_REGEX = re.compile( 

39 b"-----BEGIN [A-Z ]*PRIVATE KEY-----.+-----END [A-Z ]*PRIVATE KEY-----\r?\n?", 

40 re.DOTALL, 

41) 

42 

# Module-level logger named after this module.
43_LOGGER = logging.getLogger(__name__) 

44 

45 

# Matches the passphrase section of the command output. The single capture
# group holds the text between the markers (surrounding whitespace included;
# _run_cert_provider_command strips it).
46_PASSPHRASE_REGEX = re.compile( 

47 b"-----BEGIN PASSPHRASE-----(.+)-----END PASSPHRASE-----", re.DOTALL 

48) 

49 

50 

def _check_dca_metadata_path(metadata_path):
    """Resolve the context aware metadata path and confirm that it exists.

    A leading ``~`` in ``metadata_path`` is expanded to the user's home
    directory before the existence check is made.

    Args:
        metadata_path (str): context aware metadata path.

    Returns:
        Optional[str]: the user-expanded path if it exists on disk,
            otherwise None.
    """
    expanded_path = path.expanduser(metadata_path)
    if path.exists(expanded_path):
        return expanded_path

    # A missing file is not an error: it only means client SSL
    # authentication is not configured on this machine.
    _LOGGER.debug("%s is not found, skip client SSL authentication.", expanded_path)
    return None

66 

67 

def _read_dca_metadata_file(metadata_path):
    """Parse the context aware metadata JSON file at the given path.

    Args:
        metadata_path (str): context aware metadata path.

    Returns:
        Dict[str, str]: The deserialized JSON content of the file. Callers
            in this module treat it as a mapping.

    Raises:
        google.auth.exceptions.ClientCertError: If the file content cannot be
            parsed as JSON. Only ``ValueError`` is translated; errors raised
            while opening the file propagate unchanged.
    """
    try:
        with open(metadata_path) as metadata_file:
            parsed = json.load(metadata_file)
    except ValueError as parse_error:
        # Re-raise as the library's own error type, keeping the original
        # exception as the cause.
        six.raise_from(exceptions.ClientCertError(parse_error), parse_error)
    else:
        return parsed

88 

89 

def _run_cert_provider_command(command, expect_encrypted_key=False):
    """Execute the cert provider command and parse its standard output.

    The command's stdout is searched for exactly one certificate (or
    certificate chain) section and exactly one private key section, plus a
    passphrase section when an encrypted key is expected. The command's
    stderr is captured but not inspected.

    Args:
        command (List[str]): cert provider command.
        expect_encrypted_key (bool): If encrypted private key is expected.

    Returns:
        Tuple[bytes, bytes, bytes]: client certificate bytes in PEM format,
            key bytes in PEM format and passphrase bytes. The passphrase is
            None when ``expect_encrypted_key`` is false.

    Raises:
        google.auth.exceptions.ClientCertError: if the command cannot be
            started, exits with a non-zero status, or its output does not
            contain the expected certificate, key and passphrase sections.
    """
    try:
        provider = subprocess.Popen(
            command, stdout=subprocess.PIPE, stderr=subprocess.PIPE
        )
        output, _ = provider.communicate()
    except OSError as launch_error:
        six.raise_from(exceptions.ClientCertError(launch_error), launch_error)

    # A non-zero exit status means the provider itself reported a failure.
    exit_code = provider.returncode
    if exit_code != 0:
        raise exceptions.ClientCertError(
            "Cert provider command returns non-zero status code %s" % exit_code
        )

    # Pull the certificate (chain), key and passphrase sections out of stdout.
    certs = _CERT_REGEX.findall(output)
    if len(certs) != 1:
        raise exceptions.ClientCertError("Client SSL certificate is missing or invalid")
    keys = _KEY_REGEX.findall(output)
    if len(keys) != 1:
        raise exceptions.ClientCertError("Client SSL key is missing or invalid")
    passphrases = _PASSPHRASE_REGEX.findall(output)

    cert, key = certs[0], keys[0]
    key_is_encrypted = b"ENCRYPTED" in key

    if not expect_encrypted_key:
        # Plain key requested: neither an encrypted key nor a passphrase
        # may appear in the output.
        if key_is_encrypted:
            raise exceptions.ClientCertError("Encrypted private key is not expected")
        if passphrases:
            raise exceptions.ClientCertError("Passphrase is not expected")
        return cert, key, None

    # Encrypted key requested: exactly one passphrase and an encrypted key
    # are both required.
    if len(passphrases) != 1:
        raise exceptions.ClientCertError("Passphrase is missing or invalid")
    if not key_is_encrypted:
        raise exceptions.ClientCertError("Encrypted private key is expected")
    return cert, key, passphrases[0].strip()

142 

143 

def get_client_ssl_credentials(
    generate_encrypted_key=False,
    context_aware_metadata_path=CONTEXT_AWARE_METADATA_PATH,
):
    """Obtain the client certificate, private key and passphrase.

    The cert provider command is read from the context aware metadata file
    and executed. If the metadata file does not exist, no command is run.

    Args:
        generate_encrypted_key (bool): If set to True, encrypted private key
            and passphrase will be generated; otherwise, unencrypted private key
            will be generated and passphrase will be None.
        context_aware_metadata_path (str): The context_aware_metadata.json file path.

    Returns:
        Tuple[bool, bytes, bytes, bytes]:
            A boolean indicating if cert, key and passphrase are obtained, the
            cert bytes and key bytes both in PEM format, and passphrase bytes.
            When the metadata file is absent the result is
            ``(False, None, None, None)``.

    Raises:
        google.auth.exceptions.ClientCertError: if the metadata has no cert
            provider command, or a problem occurs when getting the cert,
            key and passphrase.
    """
    resolved_path = _check_dca_metadata_path(context_aware_metadata_path)
    if not resolved_path:
        return False, None, None, None

    metadata = _read_dca_metadata_file(resolved_path)
    if _CERT_PROVIDER_COMMAND not in metadata:
        raise exceptions.ClientCertError("Cert provider command is not found")

    provider_command = metadata[_CERT_PROVIDER_COMMAND]

    # Ask the provider for an encrypted key unless the configured command
    # already carries the flag.
    if generate_encrypted_key and "--with_passphrase" not in provider_command:
        provider_command.append("--with_passphrase")

    cert, key, passphrase = _run_cert_provider_command(
        provider_command, expect_encrypted_key=generate_encrypted_key
    )
    return True, cert, key, passphrase

185 

186 

def get_client_cert_and_key(client_cert_callback=None):
    """Obtain the client certificate and private key.

    When ``client_cert_callback`` is provided, it is called and its result is
    returned as-is together with ``True``; no check is made on the values it
    returns. When no callback is provided, the application default SSL
    credentials are looked up with an unencrypted key requested.

    Args:
        client_cert_callback (Optional[Callable[[], (bytes, bytes)]]): An
            optional callback which returns client certificate bytes and private
            key bytes both in PEM format.

    Returns:
        Tuple[bool, bytes, bytes]:
            A boolean indicating if cert and key are obtained, the cert bytes
            and key bytes both in PEM format.

    Raises:
        google.auth.exceptions.ClientCertError: if a problem occurs when
            getting the cert and key.
    """
    if not client_cert_callback:
        # No callback: fall back to the default credentials and drop the
        # passphrase slot, which is unused for unencrypted keys.
        found, cert, key, _ = get_client_ssl_credentials(generate_encrypted_key=False)
        return found, cert, key

    cert, key = client_cert_callback()
    return True, cert, key

213 

214 

def decrypt_private_key(key, passphrase):
    """Decrypt a passphrase protected private key.

    google-auth library doesn't support passphrase protected private key for
    mutual TLS channel. This helper function can be used to decrypt the
    passphrase protected private key in order to establish mutual TLS channel.

    For example, if you have a function which produces client cert, passphrase
    protected private key and passphrase, you can convert it to a client cert
    callback function accepted by google-auth::

        from google.auth.transport import _mtls_helper

        def your_client_cert_function():
            return cert, encrypted_key, passphrase

        # callback accepted by google-auth for mutual TLS channel.
        def client_cert_callback():
            cert, encrypted_key, passphrase = your_client_cert_function()
            decrypted_key = _mtls_helper.decrypt_private_key(encrypted_key,
                passphrase)
            return cert, decrypted_key

    Args:
        key (bytes): The private key bytes in PEM format.
        passphrase (bytes): The passphrase bytes.

    Returns:
        bytes: The decrypted private key in PEM format.

    Raises:
        ImportError: If pyOpenSSL is not installed.
        OpenSSL.crypto.Error: If there is any problem decrypting the private key.
    """
    # Imported here so pyOpenSSL is only required when this helper is called.
    from OpenSSL import crypto

    pem_format = crypto.FILETYPE_PEM

    # Load the encrypted PEM into a PKey object using the passphrase, then
    # serialize it back to PEM without encryption.
    private_key = crypto.load_privatekey(pem_format, key, passphrase=passphrase)
    return crypto.dump_privatekey(pem_format, private_key)

252 

253 # Then dump the decrypted key bytes 

254 return crypto.dump_privatekey(crypto.FILETYPE_PEM, pkey)