Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/google/auth/transport/_mtls_helper.py: 21%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

129 statements  

1# Copyright 2020 Google LLC 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15"""Helper functions for getting mTLS cert and key.""" 

16 

17import json 

18import logging 

19from os import environ, path 

20import re 

21import subprocess 

22 

23from google.auth import exceptions 

24 

25CONTEXT_AWARE_METADATA_PATH = "~/.secureConnect/context_aware_metadata.json" 

26_CERTIFICATE_CONFIGURATION_DEFAULT_PATH = "~/.config/gcloud/certificate_config.json" 

27_CERTIFICATE_CONFIGURATION_ENV = "GOOGLE_API_CERTIFICATE_CONFIG" 

28_CERT_PROVIDER_COMMAND = "cert_provider_command" 

29_CERT_REGEX = re.compile( 

30 b"-----BEGIN CERTIFICATE-----.+-----END CERTIFICATE-----\r?\n?", re.DOTALL 

31) 

32 

33# support various format of key files, e.g. 

34# "-----BEGIN PRIVATE KEY-----...", 

35# "-----BEGIN EC PRIVATE KEY-----...", 

36# "-----BEGIN RSA PRIVATE KEY-----..." 

37# "-----BEGIN ENCRYPTED PRIVATE KEY-----" 

38_KEY_REGEX = re.compile( 

39 b"-----BEGIN [A-Z ]*PRIVATE KEY-----.+-----END [A-Z ]*PRIVATE KEY-----\r?\n?", 

40 re.DOTALL, 

41) 

42 

43_LOGGER = logging.getLogger(__name__) 

44 

45 

46_PASSPHRASE_REGEX = re.compile( 

47 b"-----BEGIN PASSPHRASE-----(.+)-----END PASSPHRASE-----", re.DOTALL 

48) 

49 

50 

51def _check_dca_metadata_path(metadata_path): 

52 """Checks for context aware metadata. If it exists, returns the absolute path; 

53 otherwise returns None. 

54 

55 Args: 

56 metadata_path (str): context aware metadata path. 

57 

58 Returns: 

59 str: absolute path if exists and None otherwise. 

60 """ 

61 metadata_path = path.expanduser(metadata_path) 

62 if not path.exists(metadata_path): 

63 _LOGGER.debug("%s is not found, skip client SSL authentication.", metadata_path) 

64 return None 

65 return metadata_path 

66 

67 

68def _load_json_file(path): 

69 """Reads and loads JSON from the given path. Used to read both X509 workload certificate and 

70 secure connect configurations. 

71 

72 Args: 

73 path (str): the path to read from. 

74 

75 Returns: 

76 Dict[str, str]: The JSON stored at the file. 

77 

78 Raises: 

79 google.auth.exceptions.ClientCertError: If failed to parse the file as JSON. 

80 """ 

81 try: 

82 with open(path) as f: 

83 json_data = json.load(f) 

84 except ValueError as caught_exc: 

85 new_exc = exceptions.ClientCertError(caught_exc) 

86 raise new_exc from caught_exc 

87 

88 return json_data 

89 

90 

91def _get_workload_cert_and_key(certificate_config_path=None): 

92 """Read the workload identity cert and key files specified in the certificate config provided. 

93 If no config path is provided, check the environment variable: "GOOGLE_API_CERTIFICATE_CONFIG" 

94 first, then the well known gcloud location: "~/.config/gcloud/certificate_config.json". 

95 

96 Args: 

97 certificate_config_path (string): The certificate config path. If no path is provided, 

98 the environment variable will be checked first, then the well known gcloud location. 

99 

100 Returns: 

101 Tuple[Optional[bytes], Optional[bytes]]: client certificate bytes in PEM format and key 

102 bytes in PEM format. 

103 

104 Raises: 

105 google.auth.exceptions.ClientCertError: if problems occurs when retrieving 

106 the certificate or key information. 

107 """ 

108 

109 cert_path, key_path = _get_workload_cert_and_key_paths(certificate_config_path) 

110 

111 if cert_path is None and key_path is None: 

112 return None, None 

113 

114 return _read_cert_and_key_files(cert_path, key_path) 

115 

116 

117def _get_cert_config_path(certificate_config_path=None): 

118 """Get the certificate configuration path based on the following order: 

119 

120 1: Explicit override, if set 

121 2: Environment variable, if set 

122 3: Well-known location 

123 

124 Returns "None" if the selected config file does not exist. 

125 

126 Args: 

127 certificate_config_path (string): The certificate config path. If provided, the well known 

128 location and environment variable will be ignored. 

129 

130 Returns: 

131 The absolute path of the certificate config file, and None if the file does not exist. 

132 """ 

133 

134 if certificate_config_path is None: 

135 env_path = environ.get(_CERTIFICATE_CONFIGURATION_ENV, None) 

136 if env_path is not None and env_path != "": 

137 certificate_config_path = env_path 

138 else: 

139 certificate_config_path = _CERTIFICATE_CONFIGURATION_DEFAULT_PATH 

140 

141 certificate_config_path = path.expanduser(certificate_config_path) 

142 if not path.exists(certificate_config_path): 

143 return None 

144 return certificate_config_path 

145 

146 

147def _get_workload_cert_and_key_paths(config_path): 

148 absolute_path = _get_cert_config_path(config_path) 

149 if absolute_path is None: 

150 return None, None 

151 

152 data = _load_json_file(absolute_path) 

153 

154 if "cert_configs" not in data: 

155 raise exceptions.ClientCertError( 

156 'Certificate config file {} is in an invalid format, a "cert configs" object is expected'.format( 

157 absolute_path 

158 ) 

159 ) 

160 cert_configs = data["cert_configs"] 

161 

162 if "workload" not in cert_configs: 

163 raise exceptions.ClientCertError( 

164 'Certificate config file {} is in an invalid format, a "workload" cert config is expected'.format( 

165 absolute_path 

166 ) 

167 ) 

168 workload = cert_configs["workload"] 

169 

170 if "cert_path" not in workload: 

171 raise exceptions.ClientCertError( 

172 'Certificate config file {} is in an invalid format, a "cert_path" is expected in the workload cert config'.format( 

173 absolute_path 

174 ) 

175 ) 

176 cert_path = workload["cert_path"] 

177 

178 if "key_path" not in workload: 

179 raise exceptions.ClientCertError( 

180 'Certificate config file {} is in an invalid format, a "key_path" is expected in the workload cert config'.format( 

181 absolute_path 

182 ) 

183 ) 

184 key_path = workload["key_path"] 

185 

186 return cert_path, key_path 

187 

188 

189def _read_cert_and_key_files(cert_path, key_path): 

190 cert_data = _read_cert_file(cert_path) 

191 key_data = _read_key_file(key_path) 

192 

193 return cert_data, key_data 

194 

195 

196def _read_cert_file(cert_path): 

197 with open(cert_path, "rb") as cert_file: 

198 cert_data = cert_file.read() 

199 

200 cert_match = re.findall(_CERT_REGEX, cert_data) 

201 if len(cert_match) != 1: 

202 raise exceptions.ClientCertError( 

203 "Certificate file {} is in an invalid format, a single PEM formatted certificate is expected".format( 

204 cert_path 

205 ) 

206 ) 

207 return cert_match[0] 

208 

209 

210def _read_key_file(key_path): 

211 with open(key_path, "rb") as key_file: 

212 key_data = key_file.read() 

213 

214 key_match = re.findall(_KEY_REGEX, key_data) 

215 if len(key_match) != 1: 

216 raise exceptions.ClientCertError( 

217 "Private key file {} is in an invalid format, a single PEM formatted private key is expected".format( 

218 key_path 

219 ) 

220 ) 

221 

222 return key_match[0] 

223 

224 

225def _run_cert_provider_command(command, expect_encrypted_key=False): 

226 """Run the provided command, and return client side mTLS cert, key and 

227 passphrase. 

228 

229 Args: 

230 command (List[str]): cert provider command. 

231 expect_encrypted_key (bool): If encrypted private key is expected. 

232 

233 Returns: 

234 Tuple[bytes, bytes, bytes]: client certificate bytes in PEM format, key 

235 bytes in PEM format and passphrase bytes. 

236 

237 Raises: 

238 google.auth.exceptions.ClientCertError: if problems occurs when running 

239 the cert provider command or generating cert, key and passphrase. 

240 """ 

241 try: 

242 process = subprocess.Popen( 

243 command, stdout=subprocess.PIPE, stderr=subprocess.PIPE 

244 ) 

245 stdout, stderr = process.communicate() 

246 except OSError as caught_exc: 

247 new_exc = exceptions.ClientCertError(caught_exc) 

248 raise new_exc from caught_exc 

249 

250 # Check cert provider command execution error. 

251 if process.returncode != 0: 

252 raise exceptions.ClientCertError( 

253 "Cert provider command returns non-zero status code %s" % process.returncode 

254 ) 

255 

256 # Extract certificate (chain), key and passphrase. 

257 cert_match = re.findall(_CERT_REGEX, stdout) 

258 if len(cert_match) != 1: 

259 raise exceptions.ClientCertError("Client SSL certificate is missing or invalid") 

260 key_match = re.findall(_KEY_REGEX, stdout) 

261 if len(key_match) != 1: 

262 raise exceptions.ClientCertError("Client SSL key is missing or invalid") 

263 passphrase_match = re.findall(_PASSPHRASE_REGEX, stdout) 

264 

265 if expect_encrypted_key: 

266 if len(passphrase_match) != 1: 

267 raise exceptions.ClientCertError("Passphrase is missing or invalid") 

268 if b"ENCRYPTED" not in key_match[0]: 

269 raise exceptions.ClientCertError("Encrypted private key is expected") 

270 return cert_match[0], key_match[0], passphrase_match[0].strip() 

271 

272 if b"ENCRYPTED" in key_match[0]: 

273 raise exceptions.ClientCertError("Encrypted private key is not expected") 

274 if len(passphrase_match) > 0: 

275 raise exceptions.ClientCertError("Passphrase is not expected") 

276 return cert_match[0], key_match[0], None 

277 

278 

279def get_client_ssl_credentials( 

280 generate_encrypted_key=False, 

281 context_aware_metadata_path=CONTEXT_AWARE_METADATA_PATH, 

282): 

283 """Returns the client side certificate, private key and passphrase. 

284 

285 Args: 

286 generate_encrypted_key (bool): If set to True, encrypted private key 

287 and passphrase will be generated; otherwise, unencrypted private key 

288 will be generated and passphrase will be None. 

289 context_aware_metadata_path (str): The context_aware_metadata.json file path. 

290 

291 Returns: 

292 Tuple[bool, bytes, bytes, bytes]: 

293 A boolean indicating if cert, key and passphrase are obtained, the 

294 cert bytes and key bytes both in PEM format, and passphrase bytes. 

295 

296 Raises: 

297 google.auth.exceptions.ClientCertError: if problems occurs when getting 

298 the cert, key and passphrase. 

299 """ 

300 metadata_path = _check_dca_metadata_path(context_aware_metadata_path) 

301 

302 if metadata_path: 

303 metadata_json = _load_json_file(metadata_path) 

304 

305 if _CERT_PROVIDER_COMMAND not in metadata_json: 

306 raise exceptions.ClientCertError("Cert provider command is not found") 

307 

308 command = metadata_json[_CERT_PROVIDER_COMMAND] 

309 

310 if generate_encrypted_key and "--with_passphrase" not in command: 

311 command.append("--with_passphrase") 

312 

313 # Execute the command. 

314 cert, key, passphrase = _run_cert_provider_command( 

315 command, expect_encrypted_key=generate_encrypted_key 

316 ) 

317 return True, cert, key, passphrase 

318 

319 return False, None, None, None 

320 

321 

322def get_client_cert_and_key(client_cert_callback=None): 

323 """Returns the client side certificate and private key. The function first 

324 tries to get certificate and key from client_cert_callback; if the callback 

325 is None or doesn't provide certificate and key, the function tries application 

326 default SSL credentials. 

327 

328 Args: 

329 client_cert_callback (Optional[Callable[[], (bytes, bytes)]]): An 

330 optional callback which returns client certificate bytes and private 

331 key bytes both in PEM format. 

332 

333 Returns: 

334 Tuple[bool, bytes, bytes]: 

335 A boolean indicating if cert and key are obtained, the cert bytes 

336 and key bytes both in PEM format. 

337 

338 Raises: 

339 google.auth.exceptions.ClientCertError: if problems occurs when getting 

340 the cert and key. 

341 """ 

342 if client_cert_callback: 

343 cert, key = client_cert_callback() 

344 return True, cert, key 

345 

346 has_cert, cert, key, _ = get_client_ssl_credentials(generate_encrypted_key=False) 

347 return has_cert, cert, key 

348 

349 

350def decrypt_private_key(key, passphrase): 

351 """A helper function to decrypt the private key with the given passphrase. 

352 google-auth library doesn't support passphrase protected private key for 

353 mutual TLS channel. This helper function can be used to decrypt the 

354 passphrase protected private key in order to estalish mutual TLS channel. 

355 

356 For example, if you have a function which produces client cert, passphrase 

357 protected private key and passphrase, you can convert it to a client cert 

358 callback function accepted by google-auth:: 

359 

360 from google.auth.transport import _mtls_helper 

361 

362 def your_client_cert_function(): 

363 return cert, encrypted_key, passphrase 

364 

365 # callback accepted by google-auth for mutual TLS channel. 

366 def client_cert_callback(): 

367 cert, encrypted_key, passphrase = your_client_cert_function() 

368 decrypted_key = _mtls_helper.decrypt_private_key(encrypted_key, 

369 passphrase) 

370 return cert, decrypted_key 

371 

372 Args: 

373 key (bytes): The private key bytes in PEM format. 

374 passphrase (bytes): The passphrase bytes. 

375 

376 Returns: 

377 bytes: The decrypted private key in PEM format. 

378 

379 Raises: 

380 ImportError: If pyOpenSSL is not installed. 

381 OpenSSL.crypto.Error: If there is any problem decrypting the private key. 

382 """ 

383 from OpenSSL import crypto 

384 

385 # First convert encrypted_key_bytes to PKey object 

386 pkey = crypto.load_privatekey(crypto.FILETYPE_PEM, key, passphrase=passphrase) 

387 

388 # Then dump the decrypted key bytes 

389 return crypto.dump_privatekey(crypto.FILETYPE_PEM, pkey)