Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/google/auth/transport/_mtls_helper.py: 21%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

165 statements  

1# Copyright 2020 Google LLC 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15"""Helper functions for getting mTLS cert and key.""" 

16 

17import json 

18import logging 

19from os import environ, getenv, path 

20import re 

21import subprocess 

22 

23from google.auth import _agent_identity_utils 

24from google.auth import environment_vars 

25from google.auth import exceptions 

26 

27CONTEXT_AWARE_METADATA_PATH = "~/.secureConnect/context_aware_metadata.json" 

28CERTIFICATE_CONFIGURATION_DEFAULT_PATH = "~/.config/gcloud/certificate_config.json" 

29_CERT_PROVIDER_COMMAND = "cert_provider_command" 

30_CERT_REGEX = re.compile( 

31 b"-----BEGIN CERTIFICATE-----.+-----END CERTIFICATE-----\r?\n?", re.DOTALL 

32) 

33 

34# support various format of key files, e.g. 

35# "-----BEGIN PRIVATE KEY-----...", 

36# "-----BEGIN EC PRIVATE KEY-----...", 

37# "-----BEGIN RSA PRIVATE KEY-----..." 

38# "-----BEGIN ENCRYPTED PRIVATE KEY-----" 

39_KEY_REGEX = re.compile( 

40 b"-----BEGIN [A-Z ]*PRIVATE KEY-----.+-----END [A-Z ]*PRIVATE KEY-----\r?\n?", 

41 re.DOTALL, 

42) 

43 

44_LOGGER = logging.getLogger(__name__) 

45 

46 

47_PASSPHRASE_REGEX = re.compile( 

48 b"-----BEGIN PASSPHRASE-----(.+)-----END PASSPHRASE-----", re.DOTALL 

49) 

50 

51# Temporary patch to accomodate incorrect cert config in Cloud Run prod environment. 

52_WELL_KNOWN_CLOUD_RUN_CERT_PATH = ( 

53 "/var/run/secrets/workload-spiffe-credentials/certificates.pem" 

54) 

55_WELL_KNOWN_CLOUD_RUN_KEY_PATH = ( 

56 "/var/run/secrets/workload-spiffe-credentials/private_key.pem" 

57) 

58_INCORRECT_CLOUD_RUN_CERT_PATH = ( 

59 "/var/lib/volumes/certificate/workload-certificates/certificates.pem" 

60) 

61_INCORRECT_CLOUD_RUN_KEY_PATH = ( 

62 "/var/lib/volumes/certificate/workload-certificates/private_key.pem" 

63) 

64 

65 

66def _check_config_path(config_path): 

67 """Checks for config file path. If it exists, returns the absolute path with user expansion; 

68 otherwise returns None. 

69 

70 Args: 

71 config_path (str): The config file path for either context_aware_metadata.json or certificate_config.json for example 

72 

73 Returns: 

74 str: absolute path if exists and None otherwise. 

75 """ 

76 config_path = path.expanduser(config_path) 

77 if not path.exists(config_path): 

78 _LOGGER.debug("%s is not found.", config_path) 

79 return None 

80 return config_path 

81 

82 

83def _load_json_file(path): 

84 """Reads and loads JSON from the given path. Used to read both X509 workload certificate and 

85 secure connect configurations. 

86 

87 Args: 

88 path (str): the path to read from. 

89 

90 Returns: 

91 Dict[str, str]: The JSON stored at the file. 

92 

93 Raises: 

94 google.auth.exceptions.ClientCertError: If failed to parse the file as JSON. 

95 """ 

96 try: 

97 with open(path) as f: 

98 json_data = json.load(f) 

99 except ValueError as caught_exc: 

100 new_exc = exceptions.ClientCertError(caught_exc) 

101 raise new_exc from caught_exc 

102 

103 return json_data 

104 

105 

106def _get_workload_cert_and_key(certificate_config_path=None): 

107 """Read the workload identity cert and key files specified in the certificate config provided. 

108 If no config path is provided, check the environment variable: "GOOGLE_API_CERTIFICATE_CONFIG" 

109 first, then the well known gcloud location: "~/.config/gcloud/certificate_config.json". 

110 

111 Args: 

112 certificate_config_path (string): The certificate config path. If no path is provided, 

113 the environment variable will be checked first, then the well known gcloud location. 

114 

115 Returns: 

116 Tuple[Optional[bytes], Optional[bytes]]: client certificate bytes in PEM format and key 

117 bytes in PEM format. 

118 

119 Raises: 

120 google.auth.exceptions.ClientCertError: if problems occurs when retrieving 

121 the certificate or key information. 

122 """ 

123 

124 cert_path, key_path = _get_workload_cert_and_key_paths(certificate_config_path) 

125 

126 if cert_path is None and key_path is None: 

127 return None, None 

128 

129 return _read_cert_and_key_files(cert_path, key_path) 

130 

131 

132def _get_cert_config_path(certificate_config_path=None): 

133 """Get the certificate configuration path based on the following order: 

134 

135 1: Explicit override, if set 

136 2: Environment variable, if set 

137 3: Well-known location 

138 

139 Returns "None" if the selected config file does not exist. 

140 

141 Args: 

142 certificate_config_path (string): The certificate config path. If provided, the well known 

143 location and environment variable will be ignored. 

144 

145 Returns: 

146 The absolute path of the certificate config file, and None if the file does not exist. 

147 """ 

148 

149 if certificate_config_path is None: 

150 env_path = environ.get(environment_vars.GOOGLE_API_CERTIFICATE_CONFIG, None) 

151 if env_path is not None and env_path != "": 

152 certificate_config_path = env_path 

153 else: 

154 certificate_config_path = CERTIFICATE_CONFIGURATION_DEFAULT_PATH 

155 

156 certificate_config_path = path.expanduser(certificate_config_path) 

157 if not path.exists(certificate_config_path): 

158 return None 

159 return certificate_config_path 

160 

161 

162def _get_workload_cert_and_key_paths(config_path): 

163 absolute_path = _get_cert_config_path(config_path) 

164 if absolute_path is None: 

165 return None, None 

166 

167 data = _load_json_file(absolute_path) 

168 

169 if "cert_configs" not in data: 

170 raise exceptions.ClientCertError( 

171 'Certificate config file {} is in an invalid format, a "cert configs" object is expected'.format( 

172 absolute_path 

173 ) 

174 ) 

175 cert_configs = data["cert_configs"] 

176 

177 if "workload" not in cert_configs: 

178 raise exceptions.ClientCertError( 

179 'Certificate config file {} is in an invalid format, a "workload" cert config is expected'.format( 

180 absolute_path 

181 ) 

182 ) 

183 workload = cert_configs["workload"] 

184 

185 if "cert_path" not in workload: 

186 raise exceptions.ClientCertError( 

187 'Certificate config file {} is in an invalid format, a "cert_path" is expected in the workload cert config'.format( 

188 absolute_path 

189 ) 

190 ) 

191 cert_path = workload["cert_path"] 

192 

193 if "key_path" not in workload: 

194 raise exceptions.ClientCertError( 

195 'Certificate config file {} is in an invalid format, a "key_path" is expected in the workload cert config'.format( 

196 absolute_path 

197 ) 

198 ) 

199 key_path = workload["key_path"] 

200 

201 # == BEGIN Temporary Cloud Run PATCH == 

202 # See https://github.com/googleapis/google-auth-library-python/issues/1881 

203 if (cert_path == _INCORRECT_CLOUD_RUN_CERT_PATH) and ( 

204 key_path == _INCORRECT_CLOUD_RUN_KEY_PATH 

205 ): 

206 if not path.exists(cert_path) and not path.exists(key_path): 

207 _LOGGER.debug( 

208 "Applying Cloud Run certificate path patch. " 

209 "Configured paths not found: %s, %s. " 

210 "Using well-known paths: %s, %s", 

211 cert_path, 

212 key_path, 

213 _WELL_KNOWN_CLOUD_RUN_CERT_PATH, 

214 _WELL_KNOWN_CLOUD_RUN_KEY_PATH, 

215 ) 

216 cert_path = _WELL_KNOWN_CLOUD_RUN_CERT_PATH 

217 key_path = _WELL_KNOWN_CLOUD_RUN_KEY_PATH 

218 # == END Temporary Cloud Run PATCH == 

219 

220 return cert_path, key_path 

221 

222 

223def _read_cert_and_key_files(cert_path, key_path): 

224 cert_data = _read_cert_file(cert_path) 

225 key_data = _read_key_file(key_path) 

226 

227 return cert_data, key_data 

228 

229 

230def _read_cert_file(cert_path): 

231 with open(cert_path, "rb") as cert_file: 

232 cert_data = cert_file.read() 

233 

234 cert_match = re.findall(_CERT_REGEX, cert_data) 

235 if len(cert_match) != 1: 

236 raise exceptions.ClientCertError( 

237 "Certificate file {} is in an invalid format, a single PEM formatted certificate is expected".format( 

238 cert_path 

239 ) 

240 ) 

241 return cert_match[0] 

242 

243 

244def _read_key_file(key_path): 

245 with open(key_path, "rb") as key_file: 

246 key_data = key_file.read() 

247 

248 key_match = re.findall(_KEY_REGEX, key_data) 

249 if len(key_match) != 1: 

250 raise exceptions.ClientCertError( 

251 "Private key file {} is in an invalid format, a single PEM formatted private key is expected".format( 

252 key_path 

253 ) 

254 ) 

255 

256 return key_match[0] 

257 

258 

259def _run_cert_provider_command(command, expect_encrypted_key=False): 

260 """Run the provided command, and return client side mTLS cert, key and 

261 passphrase. 

262 

263 Args: 

264 command (List[str]): cert provider command. 

265 expect_encrypted_key (bool): If encrypted private key is expected. 

266 

267 Returns: 

268 Tuple[bytes, bytes, bytes]: client certificate bytes in PEM format, key 

269 bytes in PEM format and passphrase bytes. 

270 

271 Raises: 

272 google.auth.exceptions.ClientCertError: if problems occurs when running 

273 the cert provider command or generating cert, key and passphrase. 

274 """ 

275 try: 

276 process = subprocess.Popen( 

277 command, stdout=subprocess.PIPE, stderr=subprocess.PIPE 

278 ) 

279 stdout, stderr = process.communicate() 

280 except OSError as caught_exc: 

281 new_exc = exceptions.ClientCertError(caught_exc) 

282 raise new_exc from caught_exc 

283 

284 # Check cert provider command execution error. 

285 if process.returncode != 0: 

286 raise exceptions.ClientCertError( 

287 "Cert provider command returns non-zero status code %s" % process.returncode 

288 ) 

289 

290 # Extract certificate (chain), key and passphrase. 

291 cert_match = re.findall(_CERT_REGEX, stdout) 

292 if len(cert_match) != 1: 

293 raise exceptions.ClientCertError("Client SSL certificate is missing or invalid") 

294 key_match = re.findall(_KEY_REGEX, stdout) 

295 if len(key_match) != 1: 

296 raise exceptions.ClientCertError("Client SSL key is missing or invalid") 

297 passphrase_match = re.findall(_PASSPHRASE_REGEX, stdout) 

298 

299 if expect_encrypted_key: 

300 if len(passphrase_match) != 1: 

301 raise exceptions.ClientCertError("Passphrase is missing or invalid") 

302 if b"ENCRYPTED" not in key_match[0]: 

303 raise exceptions.ClientCertError("Encrypted private key is expected") 

304 return cert_match[0], key_match[0], passphrase_match[0].strip() 

305 

306 if b"ENCRYPTED" in key_match[0]: 

307 raise exceptions.ClientCertError("Encrypted private key is not expected") 

308 if len(passphrase_match) > 0: 

309 raise exceptions.ClientCertError("Passphrase is not expected") 

310 return cert_match[0], key_match[0], None 

311 

312 

313def get_client_ssl_credentials( 

314 generate_encrypted_key=False, 

315 context_aware_metadata_path=CONTEXT_AWARE_METADATA_PATH, 

316 certificate_config_path=None, 

317): 

318 """Returns the client side certificate, private key and passphrase. 

319 

320 We look for certificates and keys with the following order of priority: 

321 1. Certificate and key specified by certificate_config.json. 

322 Currently, only X.509 workload certificates are supported. 

323 2. Certificate and key specified by context aware metadata (i.e. SecureConnect). 

324 

325 Args: 

326 generate_encrypted_key (bool): If set to True, encrypted private key 

327 and passphrase will be generated; otherwise, unencrypted private key 

328 will be generated and passphrase will be None. This option only 

329 affects keys obtained via context_aware_metadata.json. 

330 context_aware_metadata_path (str): The context_aware_metadata.json file path. 

331 certificate_config_path (str): The certificate_config.json file path. 

332 

333 Returns: 

334 Tuple[bool, bytes, bytes, bytes]: 

335 A boolean indicating if cert, key and passphrase are obtained, the 

336 cert bytes and key bytes both in PEM format, and passphrase bytes. 

337 

338 Raises: 

339 google.auth.exceptions.ClientCertError: if problems occurs when getting 

340 the cert, key and passphrase. 

341 """ 

342 

343 # 1. Attempt to retrieve X.509 Workload cert and key. 

344 cert, key = _get_workload_cert_and_key(certificate_config_path) 

345 if cert and key: 

346 return True, cert, key, None 

347 

348 # 2. Check for context aware metadata json 

349 metadata_path = _check_config_path(context_aware_metadata_path) 

350 

351 if metadata_path: 

352 metadata_json = _load_json_file(metadata_path) 

353 

354 if _CERT_PROVIDER_COMMAND not in metadata_json: 

355 raise exceptions.ClientCertError("Cert provider command is not found") 

356 

357 command = metadata_json[_CERT_PROVIDER_COMMAND] 

358 

359 if generate_encrypted_key and "--with_passphrase" not in command: 

360 command.append("--with_passphrase") 

361 

362 # Execute the command. 

363 cert, key, passphrase = _run_cert_provider_command( 

364 command, expect_encrypted_key=generate_encrypted_key 

365 ) 

366 return True, cert, key, passphrase 

367 

368 return False, None, None, None 

369 

370 

371def get_client_cert_and_key(client_cert_callback=None): 

372 """Returns the client side certificate and private key. The function first 

373 tries to get certificate and key from client_cert_callback; if the callback 

374 is None or doesn't provide certificate and key, the function tries application 

375 default SSL credentials. 

376 

377 Args: 

378 client_cert_callback (Optional[Callable[[], (bytes, bytes)]]): An 

379 optional callback which returns client certificate bytes and private 

380 key bytes both in PEM format. 

381 

382 Returns: 

383 Tuple[bool, bytes, bytes]: 

384 A boolean indicating if cert and key are obtained, the cert bytes 

385 and key bytes both in PEM format. 

386 

387 Raises: 

388 google.auth.exceptions.ClientCertError: if problems occurs when getting 

389 the cert and key. 

390 """ 

391 if client_cert_callback: 

392 cert, key = client_cert_callback() 

393 return True, cert, key 

394 

395 has_cert, cert, key, _ = get_client_ssl_credentials(generate_encrypted_key=False) 

396 return has_cert, cert, key 

397 

398 

399def decrypt_private_key(key, passphrase): 

400 """A helper function to decrypt the private key with the given passphrase. 

401 google-auth library doesn't support passphrase protected private key for 

402 mutual TLS channel. This helper function can be used to decrypt the 

403 passphrase protected private key in order to estalish mutual TLS channel. 

404 

405 For example, if you have a function which produces client cert, passphrase 

406 protected private key and passphrase, you can convert it to a client cert 

407 callback function accepted by google-auth:: 

408 

409 from google.auth.transport import _mtls_helper 

410 

411 def your_client_cert_function(): 

412 return cert, encrypted_key, passphrase 

413 

414 # callback accepted by google-auth for mutual TLS channel. 

415 def client_cert_callback(): 

416 cert, encrypted_key, passphrase = your_client_cert_function() 

417 decrypted_key = _mtls_helper.decrypt_private_key(encrypted_key, 

418 passphrase) 

419 return cert, decrypted_key 

420 

421 Args: 

422 key (bytes): The private key bytes in PEM format. 

423 passphrase (bytes): The passphrase bytes. 

424 

425 Returns: 

426 bytes: The decrypted private key in PEM format. 

427 

428 Raises: 

429 ImportError: If pyOpenSSL is not installed. 

430 OpenSSL.crypto.Error: If there is any problem decrypting the private key. 

431 """ 

432 from OpenSSL import crypto 

433 

434 # First convert encrypted_key_bytes to PKey object 

435 pkey = crypto.load_privatekey(crypto.FILETYPE_PEM, key, passphrase=passphrase) 

436 

437 # Then dump the decrypted key bytes 

438 return crypto.dump_privatekey(crypto.FILETYPE_PEM, pkey) 

439 

440 

441def check_use_client_cert(): 

442 """Returns boolean for whether the client certificate should be used for mTLS. 

443 

444 If GOOGLE_API_USE_CLIENT_CERTIFICATE is set to true or false, a corresponding 

445 bool value will be returned. If the value is set to an unexpected string, it 

446 will default to False. 

447 If GOOGLE_API_USE_CLIENT_CERTIFICATE is unset, the value will be inferred 

448 by reading a file pointed at by GOOGLE_API_CERTIFICATE_CONFIG, and verifying 

449 it contains a "workload" section. If so, the function will return True, 

450 otherwise False. 

451 

452 Returns: 

453 bool: Whether the client certificate should be used for mTLS connection. 

454 """ 

455 use_client_cert = getenv("GOOGLE_API_USE_CLIENT_CERTIFICATE") 

456 # Check if the value of GOOGLE_API_USE_CLIENT_CERTIFICATE is set. 

457 if use_client_cert: 

458 return use_client_cert.lower() == "true" 

459 else: 

460 # Check if the value of GOOGLE_API_CERTIFICATE_CONFIG is set. 

461 cert_path = getenv("GOOGLE_API_CERTIFICATE_CONFIG") 

462 if cert_path: 

463 try: 

464 with open(cert_path, "r") as f: 

465 content = json.load(f) 

466 # verify json has workload key 

467 content["cert_configs"]["workload"] 

468 return True 

469 except ( 

470 FileNotFoundError, 

471 OSError, 

472 KeyError, 

473 TypeError, 

474 json.JSONDecodeError, 

475 ) as e: 

476 _LOGGER.debug("error decoding certificate: %s", e) 

477 return False 

478 

479 

480def check_parameters_for_unauthorized_response(cached_cert): 

481 """Returns the cached and current cert fingerprint for reconfiguring mTLS. 

482 

483 Args: 

484 cached_cert(bytes): The cached client certificate. 

485 

486 Returns: 

487 bytes: The client callback cert bytes. 

488 bytes: The client callback key bytes. 

489 str: The base64-encoded SHA256 cached fingerprint. 

490 str: The base64-encoded SHA256 current cert fingerprint. 

491 """ 

492 call_cert_bytes, call_key_bytes = _agent_identity_utils.call_client_cert_callback() 

493 cert_obj = _agent_identity_utils.parse_certificate(call_cert_bytes) 

494 current_cert_fingerprint = _agent_identity_utils.calculate_certificate_fingerprint( 

495 cert_obj 

496 ) 

497 if cached_cert: 

498 cached_fingerprint = _agent_identity_utils.get_cached_cert_fingerprint( 

499 cached_cert 

500 ) 

501 else: 

502 cached_fingerprint = current_cert_fingerprint 

503 return call_cert_bytes, call_key_bytes, cached_fingerprint, current_cert_fingerprint