Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/google/auth/transport/_mtls_helper.py: 25%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

175 statements  

1# Copyright 2020 Google LLC 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15"""Helper functions for getting mTLS cert and key.""" 

16 

17import json 

18import logging 

19from os import environ, getenv, path 

20import re 

21import subprocess 

22 

23from google.auth import _agent_identity_utils 

24from google.auth import environment_vars 

25from google.auth import exceptions 

26 

# Module-level logger used by the helpers in this file.
_LOGGER = logging.getLogger(__name__)

# Default location of the SecureConnect context aware metadata file.
CONTEXT_AWARE_METADATA_PATH = "~/.secureConnect/context_aware_metadata.json"

# Default gcloud config path, to be used with path.expanduser for cross-platform compatibility.
CERTIFICATE_CONFIGURATION_DEFAULT_PATH = "~/.config/gcloud/certificate_config.json"

# Key of the context aware metadata JSON that holds the cert provider command.
_CERT_PROVIDER_COMMAND = "cert_provider_command"

# Temporary patch to accommodate incorrect cert config in Cloud Run prod environment.
_INCORRECT_CLOUD_RUN_CERT_PATH = (
    "/var/lib/volumes/certificate/workload-certificates/certificates.pem"
)
_INCORRECT_CLOUD_RUN_KEY_PATH = (
    "/var/lib/volumes/certificate/workload-certificates/private_key.pem"
)
_WELL_KNOWN_CLOUD_RUN_CERT_PATH = (
    "/var/run/secrets/workload-spiffe-credentials/certificates.pem"
)
_WELL_KNOWN_CLOUD_RUN_KEY_PATH = (
    "/var/run/secrets/workload-spiffe-credentials/private_key.pem"
)

# The patterns below are greedy and use DOTALL, so each one matches from the
# first BEGIN marker to the last END marker (plus an optional trailing newline).
_CERT_REGEX = re.compile(
    b"-----BEGIN CERTIFICATE-----.+-----END CERTIFICATE-----\r?\n?", re.DOTALL
)

# support various format of key files, e.g.
#     "-----BEGIN PRIVATE KEY-----...",
#     "-----BEGIN EC PRIVATE KEY-----...",
#     "-----BEGIN RSA PRIVATE KEY-----..."
#     "-----BEGIN ENCRYPTED PRIVATE KEY-----"
_KEY_REGEX = re.compile(
    b"-----BEGIN [A-Z ]*PRIVATE KEY-----.+-----END [A-Z ]*PRIVATE KEY-----\r?\n?",
    re.DOTALL,
)

# Captures the text between the PASSPHRASE markers (group 1).
_PASSPHRASE_REGEX = re.compile(
    b"-----BEGIN PASSPHRASE-----(.+)-----END PASSPHRASE-----", re.DOTALL
)

66 

67 

def _check_config_path(config_path):
    """Expand a config file path and report whether it exists.

    Args:
        config_path (str): The config file path, for example the location of
            context_aware_metadata.json or certificate_config.json. A leading
            "~" is expanded to the user's home directory.

    Returns:
        Optional[str]: The user-expanded path if it exists, None otherwise.
    """
    expanded_path = path.expanduser(config_path)
    if path.exists(expanded_path):
        return expanded_path

    # A missing file is an expected situation, so it is only logged at debug level.
    _LOGGER.debug("%s is not found.", expanded_path)
    return None

83 

84 

def _load_json_file(path):
    """Open the file at ``path`` and parse its content as JSON.

    Used to read both the X509 workload certificate configuration and the
    SecureConnect configuration.

    Args:
        path (str): the path to read from.

    Returns:
        Dict[str, str]: The JSON stored at the file.

    Raises:
        google.auth.exceptions.ClientCertError: If failed to parse the file as JSON.
    """
    try:
        with open(path) as json_file:
            return json.load(json_file)
    except ValueError as caught_exc:
        # Only parse failures (ValueError) are converted; errors from open()
        # itself are not caught here and propagate unchanged.
        raise exceptions.ClientCertError(caught_exc) from caught_exc

106 

107 

def _get_workload_cert_and_key(
    certificate_config_path=None, include_context_aware=True
):
    """Read the workload identity cert and key named by the certificate config.

    The cert and key file locations are resolved by
    ``_get_workload_cert_and_key_paths`` and the files are then read by
    ``_read_cert_and_key_files``.

    Args:
        certificate_config_path (string): The certificate config path. If no path is
            provided, the lookup order implemented by ``_get_cert_config_path`` applies.
        include_context_aware (bool): If context aware metadata path should be checked
            for the SecureConnect mTLS configuration.

    Returns:
        Tuple[Optional[bytes], Optional[bytes]]: client certificate bytes in PEM format
            and key bytes in PEM format, or (None, None) when no workload cert and
            key paths could be resolved.

    Raises:
        google.auth.exceptions.ClientCertError: if problems occurs when retrieving
            the certificate or key information.
    """
    workload_paths = _get_workload_cert_and_key_paths(
        certificate_config_path, include_context_aware
    )
    workload_cert_path, workload_key_path = workload_paths

    # Files are read as soon as at least one of the two paths is known.
    if workload_cert_path is not None or workload_key_path is not None:
        return _read_cert_and_key_files(workload_cert_path, workload_key_path)

    return None, None

138 

139 

def _get_cert_config_path(certificate_config_path=None, include_context_aware=True):
    """Resolve the certificate configuration path.

    Candidates are considered in this order:

    1: Explicit override (``certificate_config_path``), if set
    2: GOOGLE_API_CERTIFICATE_CONFIG environment variable, if set and non-empty
    3: CLOUDSDK_CONTEXT_AWARE_CERTIFICATE_CONFIG_FILE_PATH environment variable,
       if set and non-empty and ``include_context_aware`` is true
    4: Well-known gcloud location

    Args:
        certificate_config_path (string): The certificate config path. If provided, the
            well known location and environment variables will be ignored.
        include_context_aware (bool): If context aware metadata path should be checked
            for the SecureConnect mTLS configuration.

    Returns:
        The user-expanded path of the selected certificate config file, or None if
        that file does not exist.
    """
    if certificate_config_path is None:
        primary = environ.get(environment_vars.GOOGLE_API_CERTIFICATE_CONFIG, None)
        if primary:
            certificate_config_path = primary
        else:
            fallback = environ.get(
                environment_vars.CLOUDSDK_CONTEXT_AWARE_CERTIFICATE_CONFIG_FILE_PATH,
                None,
            )
            certificate_config_path = (
                fallback
                if include_context_aware and fallback
                else CERTIFICATE_CONFIGURATION_DEFAULT_PATH
            )

    # Only the selected candidate is checked; there is no fall-through to the
    # next candidate when the selected file is missing.
    expanded_path = path.expanduser(certificate_config_path)
    return expanded_path if path.exists(expanded_path) else None

177 

178 

def _get_workload_cert_and_key_paths(config_path, include_context_aware=True):
    """Look up the workload cert and key file paths in the certificate config.

    Args:
        config_path (Optional[str]): The certificate config path, forwarded to
            ``_get_cert_config_path``.
        include_context_aware (bool): Forwarded to ``_get_cert_config_path``.

    Returns:
        Tuple[Optional[str], Optional[str]]: The cert path and key path read from
            the ``cert_configs.workload`` section, or (None, None) when the config
            file does not exist or the workload fields are missing.

    Raises:
        google.auth.exceptions.ClientCertError: if the config file has no
            "cert_configs" entry, or cannot be parsed as JSON.
    """
    config_file = _get_cert_config_path(config_path, include_context_aware)
    if config_file is None:
        return None, None

    config = _load_json_file(config_file)
    if "cert_configs" not in config:
        message = 'Certificate config file {} is in an invalid format, a "cert configs" object is expected'.format(
            config_file
        )
        raise exceptions.ClientCertError(message)
    cert_configs = config["cert_configs"]

    # We return None, None if the expected workload fields are not present.
    # The certificate config might be present for other types of connections (e.g. gECC),
    # and we want to gracefully fallback to testing other mTLS configurations
    # like SecureConnect instead of throwing an exception.
    if "workload" not in cert_configs:
        return None, None
    workload = cert_configs["workload"]
    if "cert_path" not in workload or "key_path" not in workload:
        return None, None
    cert_path, key_path = workload["cert_path"], workload["key_path"]

    # == BEGIN Temporary Cloud Run PATCH ==
    # See https://github.com/googleapis/google-auth-library-python/issues/1881
    # The well-known paths are substituted only when the config names exactly the
    # known-incorrect pair AND neither of those files exists.
    configured_pair = (cert_path, key_path)
    incorrect_pair = (_INCORRECT_CLOUD_RUN_CERT_PATH, _INCORRECT_CLOUD_RUN_KEY_PATH)
    if configured_pair == incorrect_pair and not (
        path.exists(cert_path) or path.exists(key_path)
    ):
        _LOGGER.debug(
            "Applying Cloud Run certificate path patch. "
            "Configured paths not found: %s, %s. "
            "Using well-known paths: %s, %s",
            cert_path,
            key_path,
            _WELL_KNOWN_CLOUD_RUN_CERT_PATH,
            _WELL_KNOWN_CLOUD_RUN_KEY_PATH,
        )
        cert_path = _WELL_KNOWN_CLOUD_RUN_CERT_PATH
        key_path = _WELL_KNOWN_CLOUD_RUN_KEY_PATH
    # == END Temporary Cloud Run PATCH ==

    return cert_path, key_path

231 

232 

def _read_cert_and_key_files(cert_path, key_path):
    """Read the certificate file and then the private key file.

    Args:
        cert_path (str): Path of the PEM certificate file.
        key_path (str): Path of the PEM private key file.

    Returns:
        Tuple[bytes, bytes]: The certificate bytes and the key bytes, as
            returned by ``_read_cert_file`` and ``_read_key_file``.
    """
    # Tuple elements are evaluated left to right: certificate first, then key.
    return _read_cert_file(cert_path), _read_key_file(key_path)

238 

239 

def _read_cert_file(cert_path):
    """Read a PEM certificate file and return the matched certificate bytes.

    Args:
        cert_path (str): Path of the certificate file.

    Returns:
        bytes: The single span of the file matched by ``_CERT_REGEX``.

    Raises:
        google.auth.exceptions.ClientCertError: if the file content does not
            yield exactly one match.
    """
    with open(cert_path, "rb") as pem_file:
        pem_bytes = pem_file.read()

    matches = _CERT_REGEX.findall(pem_bytes)
    if len(matches) == 1:
        return matches[0]

    raise exceptions.ClientCertError(
        "Certificate file {} is in an invalid format, a single PEM formatted certificate is expected".format(
            cert_path
        )
    )

252 

253 

def _read_key_file(key_path):
    """Read a PEM private key file and return the matched key bytes.

    Args:
        key_path (str): Path of the private key file.

    Returns:
        bytes: The single span of the file matched by ``_KEY_REGEX``.

    Raises:
        google.auth.exceptions.ClientCertError: if the file content does not
            yield exactly one match.
    """
    with open(key_path, "rb") as pem_file:
        pem_bytes = pem_file.read()

    matches = _KEY_REGEX.findall(pem_bytes)
    if len(matches) == 1:
        return matches[0]

    raise exceptions.ClientCertError(
        "Private key file {} is in an invalid format, a single PEM formatted private key is expected".format(
            key_path
        )
    )

267 

268 

def _run_cert_provider_command(command, expect_encrypted_key=False):
    """Run the cert provider command and parse cert, key and passphrase from
    its standard output.

    Args:
        command (List[str]): cert provider command.
        expect_encrypted_key (bool): If encrypted private key is expected.

    Returns:
        Tuple[bytes, bytes, bytes]: client certificate bytes in PEM format, key
            bytes in PEM format and passphrase bytes. The passphrase is None
            when ``expect_encrypted_key`` is False.

    Raises:
        google.auth.exceptions.ClientCertError: if problems occurs when running
            the cert provider command or generating cert, key and passphrase.
    """
    try:
        provider = subprocess.Popen(
            command, stdout=subprocess.PIPE, stderr=subprocess.PIPE
        )
        # stderr is captured but not inspected.
        output, _ = provider.communicate()
    except OSError as caught_exc:
        raise exceptions.ClientCertError(caught_exc) from caught_exc

    # Check cert provider command execution error.
    if provider.returncode != 0:
        raise exceptions.ClientCertError(
            "Cert provider command returns non-zero status code %s" % provider.returncode
        )

    # Extract certificate (chain), key and passphrase.
    certs = _CERT_REGEX.findall(output)
    if len(certs) != 1:
        raise exceptions.ClientCertError("Client SSL certificate is missing or invalid")
    keys = _KEY_REGEX.findall(output)
    if len(keys) != 1:
        raise exceptions.ClientCertError("Client SSL key is missing or invalid")
    passphrases = _PASSPHRASE_REGEX.findall(output)

    cert_bytes, key_bytes = certs[0], keys[0]
    key_is_encrypted = b"ENCRYPTED" in key_bytes

    if not expect_encrypted_key:
        # Plain key requested: neither an encrypted key nor a passphrase may appear.
        if key_is_encrypted:
            raise exceptions.ClientCertError("Encrypted private key is not expected")
        if len(passphrases) > 0:
            raise exceptions.ClientCertError("Passphrase is not expected")
        return cert_bytes, key_bytes, None

    # Encrypted key requested: exactly one passphrase and an encrypted key are required.
    if len(passphrases) != 1:
        raise exceptions.ClientCertError("Passphrase is missing or invalid")
    if not key_is_encrypted:
        raise exceptions.ClientCertError("Encrypted private key is expected")
    return cert_bytes, key_bytes, passphrases[0].strip()

321 

322 

def get_client_ssl_credentials(
    generate_encrypted_key=False,
    context_aware_metadata_path=CONTEXT_AWARE_METADATA_PATH,
    certificate_config_path=None,
):
    """Returns the client side certificate, private key and passphrase.

    Sources are tried in the following order of priority:
        1. Certificate and key specified by certificate_config.json.
            Currently, only X.509 workload certificates are supported.
        2. Certificate and key specified by context aware metadata (i.e. SecureConnect).

    Args:
        generate_encrypted_key (bool): If set to True, encrypted private key
            and passphrase will be generated; otherwise, unencrypted private key
            will be generated and passphrase will be None. This option only
            affects keys obtained via context_aware_metadata.json.
        context_aware_metadata_path (str): The context_aware_metadata.json file path.
        certificate_config_path (str): The certificate_config.json file path.

    Returns:
        Tuple[bool, bytes, bytes, bytes]:
            A boolean indicating if cert, key and passphrase are obtained, the
            cert bytes and key bytes both in PEM format, and passphrase bytes.
            When nothing is found the result is (False, None, None, None).

    Raises:
        google.auth.exceptions.ClientCertError: if problems occurs when getting
            the cert, key and passphrase.
    """
    # 1. Attempt to retrieve X.509 Workload cert and key.
    workload_cert, workload_key = _get_workload_cert_and_key(certificate_config_path)
    if workload_cert and workload_key:
        # Workload credentials never carry a passphrase.
        return True, workload_cert, workload_key, None

    # 2. Check for context aware metadata json
    metadata_path = _check_config_path(context_aware_metadata_path)
    if not metadata_path:
        return False, None, None, None

    metadata_json = _load_json_file(metadata_path)
    if _CERT_PROVIDER_COMMAND not in metadata_json:
        raise exceptions.ClientCertError("Cert provider command is not found")

    provider_command = metadata_json[_CERT_PROVIDER_COMMAND]
    # Ask the provider for a passphrase-protected key unless the flag is already there.
    if generate_encrypted_key and "--with_passphrase" not in provider_command:
        provider_command.append("--with_passphrase")

    # Execute the command.
    cert, key, passphrase = _run_cert_provider_command(
        provider_command, expect_encrypted_key=generate_encrypted_key
    )
    return True, cert, key, passphrase

379 

380 

def get_client_cert_and_key(client_cert_callback=None):
    """Returns the client side certificate and private key.

    When ``client_cert_callback`` is provided, its result is returned as-is
    with a True flag. Otherwise the application default SSL credentials are
    looked up through ``get_client_ssl_credentials``.

    Args:
        client_cert_callback (Optional[Callable[[], (bytes, bytes)]]): An
            optional callback which returns client certificate bytes and private
            key bytes both in PEM format.

    Returns:
        Tuple[bool, bytes, bytes]:
            A boolean indicating if cert and key are obtained, the cert bytes
            and key bytes both in PEM format.

    Raises:
        google.auth.exceptions.ClientCertError: if problems occurs when getting
            the cert and key.
    """
    if not client_cert_callback:
        # No callback: fall back to the default credentials, unencrypted key.
        found, cert, key, _ = get_client_ssl_credentials(generate_encrypted_key=False)
        return found, cert, key

    cert, key = client_cert_callback()
    return True, cert, key

407 

408 

def decrypt_private_key(key, passphrase):
    """Decrypt a passphrase protected private key.

    google-auth library doesn't support passphrase protected private key for
    mutual TLS channel. This helper function can be used to decrypt the
    passphrase protected private key in order to establish mutual TLS channel.

    For example, if you have a function which produces client cert, passphrase
    protected private key and passphrase, you can convert it to a client cert
    callback function accepted by google-auth::

        from google.auth.transport import _mtls_helper

        def your_client_cert_function():
            return cert, encrypted_key, passphrase

        # callback accepted by google-auth for mutual TLS channel.
        def client_cert_callback():
            cert, encrypted_key, passphrase = your_client_cert_function()
            decrypted_key = _mtls_helper.decrypt_private_key(encrypted_key,
                passphrase)
            return cert, decrypted_key

    Args:
        key (bytes): The private key bytes in PEM format.
        passphrase (bytes): The passphrase bytes.

    Returns:
        bytes: The decrypted private key in PEM format.

    Raises:
        ImportError: If pyOpenSSL is not installed.
        OpenSSL.crypto.Error: If there is any problem decrypting the private key.
    """
    # Imported lazily so that pyOpenSSL is only required when this helper is used.
    from OpenSSL import crypto

    pem = crypto.FILETYPE_PEM
    # Load the encrypted key into a PKey object, then dump it back without encryption.
    return crypto.dump_privatekey(
        pem, crypto.load_privatekey(pem, key, passphrase=passphrase)
    )

449 

450 

def check_use_client_cert():
    """Returns boolean for whether the client certificate should be used for mTLS.

    The decision is made in two steps:

    1. If GOOGLE_API_USE_CLIENT_CERTIFICATE (or, when that is unset or empty,
       CLOUDSDK_CONTEXT_AWARE_USE_CLIENT_CERTIFICATE) has a non-empty value,
       the result is True only when that value equals "true" (case-insensitive).
       Any other string yields False.
    2. Otherwise the value is inferred by reading the file pointed at by
       GOOGLE_API_CERTIFICATE_CONFIG (or, when that is unset or empty,
       CLOUDSDK_CONTEXT_AWARE_CERTIFICATE_CONFIG_FILE_PATH) and verifying it
       contains a "workload" section under "cert_configs". If so, the function
       returns True, otherwise False.

    Returns:
        bool: Whether the client certificate should be used for mTLS connection.
    """
    use_client_cert = getenv(environment_vars.GOOGLE_API_USE_CLIENT_CERTIFICATE)
    if not use_client_cert:
        use_client_cert = getenv(
            environment_vars.CLOUDSDK_CONTEXT_AWARE_USE_CLIENT_CERTIFICATE
        )

    # An explicit, non-empty setting always wins over inference.
    if use_client_cert:
        return use_client_cert.lower() == "true"

    # Treat an empty GOOGLE_API_CERTIFICATE_CONFIG like an unset one so that the
    # gcloud fallback variable is still consulted, matching how empty values are
    # handled for the use-client-certificate variables above.
    cert_path = getenv(environment_vars.GOOGLE_API_CERTIFICATE_CONFIG)
    if not cert_path:
        cert_path = getenv(
            environment_vars.CLOUDSDK_CONTEXT_AWARE_CERTIFICATE_CONFIG_FILE_PATH
        )
    if not cert_path:
        return False

    try:
        with open(cert_path, "r") as f:
            content = json.load(f)
        # verify json has workload key
        content["cert_configs"]["workload"]
    except (OSError, ValueError, KeyError, TypeError) as e:
        # Best-effort probe: any unreadable or malformed config means "do not use".
        # OSError covers FileNotFoundError; ValueError covers json.JSONDecodeError
        # as well as UnicodeDecodeError raised while decoding a non-text file.
        _LOGGER.debug("error decoding certificate: %s", e)
        return False
    return True

498 

499 

def check_parameters_for_unauthorized_response(cached_cert):
    """Returns the current cert and key plus the cached and current cert
    fingerprints, for reconfiguring mTLS.

    Args:
        cached_cert(bytes): The cached client certificate. When falsy, the
            current fingerprint is reported as the cached fingerprint too.

    Returns:
        bytes: The client callback cert bytes.
        bytes: The client callback key bytes.
        str: The cached cert fingerprint, as produced by
            ``_agent_identity_utils.get_cached_cert_fingerprint``.
        str: The current cert fingerprint, as produced by
            ``_agent_identity_utils.calculate_certificate_fingerprint``.
    """
    cert_bytes, key_bytes = call_client_cert_callback()

    current_fingerprint = _agent_identity_utils.calculate_certificate_fingerprint(
        _agent_identity_utils.parse_certificate(cert_bytes)
    )
    cached_fingerprint = (
        _agent_identity_utils.get_cached_cert_fingerprint(cached_cert)
        if cached_cert
        else current_fingerprint
    )
    return cert_bytes, key_bytes, cached_fingerprint, current_fingerprint

524 

525 

def call_client_cert_callback():
    """Fetches the client SSL credentials and returns the certificate and key.

    Returns:
        Tuple[bytes, bytes]: The cert bytes and key bytes reported by
            ``get_client_ssl_credentials``.
    """
    credentials = get_client_ssl_credentials(generate_encrypted_key=True)
    # The "found" flag and the passphrase are discarded.
    # NOTE(review): an encrypted key is requested but the passphrase is dropped;
    # confirm callers do not need it to use the returned key.
    _, cert_bytes, key_bytes, _ = credentials
    return cert_bytes, key_bytes