Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/google/auth/transport/_mtls_helper.py: 20%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

135 statements  

1# Copyright 2020 Google LLC 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15"""Helper functions for getting mTLS cert and key.""" 

16 

17import json 

18import logging 

19from os import environ, path 

20import re 

21import subprocess 

22 

23from google.auth import exceptions 

24 

25CONTEXT_AWARE_METADATA_PATH = "~/.secureConnect/context_aware_metadata.json" 

26CERTIFICATE_CONFIGURATION_DEFAULT_PATH = "~/.config/gcloud/certificate_config.json" 

27_CERTIFICATE_CONFIGURATION_ENV = "GOOGLE_API_CERTIFICATE_CONFIG" 

28_CERT_PROVIDER_COMMAND = "cert_provider_command" 

29_CERT_REGEX = re.compile( 

30 b"-----BEGIN CERTIFICATE-----.+-----END CERTIFICATE-----\r?\n?", re.DOTALL 

31) 

32 

33# support various format of key files, e.g. 

34# "-----BEGIN PRIVATE KEY-----...", 

35# "-----BEGIN EC PRIVATE KEY-----...", 

36# "-----BEGIN RSA PRIVATE KEY-----..." 

37# "-----BEGIN ENCRYPTED PRIVATE KEY-----" 

38_KEY_REGEX = re.compile( 

39 b"-----BEGIN [A-Z ]*PRIVATE KEY-----.+-----END [A-Z ]*PRIVATE KEY-----\r?\n?", 

40 re.DOTALL, 

41) 

42 

43_LOGGER = logging.getLogger(__name__) 

44 

45 

46_PASSPHRASE_REGEX = re.compile( 

47 b"-----BEGIN PASSPHRASE-----(.+)-----END PASSPHRASE-----", re.DOTALL 

48) 

49 

50 

51def _check_config_path(config_path): 

52 """Checks for config file path. If it exists, returns the absolute path with user expansion; 

53 otherwise returns None. 

54 

55 Args: 

56 config_path (str): The config file path for either context_aware_metadata.json or certificate_config.json for example 

57 

58 Returns: 

59 str: absolute path if exists and None otherwise. 

60 """ 

61 config_path = path.expanduser(config_path) 

62 if not path.exists(config_path): 

63 _LOGGER.debug("%s is not found.", config_path) 

64 return None 

65 return config_path 

66 

67 

68def _load_json_file(path): 

69 """Reads and loads JSON from the given path. Used to read both X509 workload certificate and 

70 secure connect configurations. 

71 

72 Args: 

73 path (str): the path to read from. 

74 

75 Returns: 

76 Dict[str, str]: The JSON stored at the file. 

77 

78 Raises: 

79 google.auth.exceptions.ClientCertError: If failed to parse the file as JSON. 

80 """ 

81 try: 

82 with open(path) as f: 

83 json_data = json.load(f) 

84 except ValueError as caught_exc: 

85 new_exc = exceptions.ClientCertError(caught_exc) 

86 raise new_exc from caught_exc 

87 

88 return json_data 

89 

90 

def _get_workload_cert_and_key(certificate_config_path=None):
    """Load the workload identity certificate and key named by a certificate config.

    The config file is located by ``_get_workload_cert_and_key_paths``: when no
    path is given, the "GOOGLE_API_CERTIFICATE_CONFIG" environment variable is
    consulted first, then the well known gcloud location
    "~/.config/gcloud/certificate_config.json".

    Args:
        certificate_config_path (string): The certificate config path. When
            omitted, the environment variable and then the well known gcloud
            location are used.

    Returns:
        Tuple[Optional[bytes], Optional[bytes]]: The client certificate and
            the key, both as PEM bytes, or ``(None, None)`` when no cert path
            and no key path could be determined.

    Raises:
        google.auth.exceptions.ClientCertError: if problems occur when
            retrieving the certificate or key information.
    """
    cert_path, key_path = _get_workload_cert_and_key_paths(certificate_config_path)

    # Only read the files when at least one of the two paths is known.
    if cert_path is not None or key_path is not None:
        return _read_cert_and_key_files(cert_path, key_path)

    return None, None

115 

116 

117def _get_cert_config_path(certificate_config_path=None): 

118 """Get the certificate configuration path based on the following order: 

119 

120 1: Explicit override, if set 

121 2: Environment variable, if set 

122 3: Well-known location 

123 

124 Returns "None" if the selected config file does not exist. 

125 

126 Args: 

127 certificate_config_path (string): The certificate config path. If provided, the well known 

128 location and environment variable will be ignored. 

129 

130 Returns: 

131 The absolute path of the certificate config file, and None if the file does not exist. 

132 """ 

133 

134 if certificate_config_path is None: 

135 env_path = environ.get(_CERTIFICATE_CONFIGURATION_ENV, None) 

136 if env_path is not None and env_path != "": 

137 certificate_config_path = env_path 

138 else: 

139 certificate_config_path = CERTIFICATE_CONFIGURATION_DEFAULT_PATH 

140 

141 certificate_config_path = path.expanduser(certificate_config_path) 

142 if not path.exists(certificate_config_path): 

143 return None 

144 return certificate_config_path 

145 

146 

def _get_workload_cert_and_key_paths(config_path):
    """Look up the workload certificate and key file paths in a certificate config.

    The config file is located with ``_get_cert_config_path`` and is expected
    to have the shape::

        {"cert_configs": {"workload": {"cert_path": ..., "key_path": ...}}}

    Args:
        config_path (Optional[str]): The certificate config path, passed on to
            ``_get_cert_config_path``.

    Returns:
        Tuple[Optional[str], Optional[str]]: The ``cert_path`` and ``key_path``
            values from the workload cert config, or ``(None, None)`` when no
            config file was found.

    Raises:
        google.auth.exceptions.ClientCertError: If the config file is not
            valid JSON or does not have the expected structure.
    """
    absolute_path = _get_cert_config_path(config_path)
    if absolute_path is None:
        return None, None

    def _require_key(container, key, expectation):
        # A container that is not a JSON object (e.g. a string or a list) is
        # as malformed as a missing key. Report both as ClientCertError rather
        # than letting a TypeError escape from the subscript below.
        if not isinstance(container, dict) or key not in container:
            raise exceptions.ClientCertError(
                "Certificate config file {} is in an invalid format, {}".format(
                    absolute_path, expectation
                )
            )
        return container[key]

    data = _load_json_file(absolute_path)

    cert_configs = _require_key(
        data, "cert_configs", 'a "cert configs" object is expected'
    )
    workload = _require_key(
        cert_configs, "workload", 'a "workload" cert config is expected'
    )
    cert_path = _require_key(
        workload,
        "cert_path",
        'a "cert_path" is expected in the workload cert config',
    )
    key_path = _require_key(
        workload,
        "key_path",
        'a "key_path" is expected in the workload cert config',
    )

    return cert_path, key_path

187 

188 

def _read_cert_and_key_files(cert_path, key_path):
    """Read the certificate file, then the key file, and return both contents.

    Args:
        cert_path (str): Path of the PEM certificate file.
        key_path (str): Path of the PEM private key file.

    Returns:
        Tuple[bytes, bytes]: The certificate bytes and the key bytes.
    """
    # Tuple elements are evaluated left to right: certificate first, then key.
    return _read_cert_file(cert_path), _read_key_file(key_path)

194 

195 

def _read_cert_file(cert_path):
    """Read a file and return the single PEM certificate section it contains.

    Args:
        cert_path (str): Path of the certificate file.

    Returns:
        bytes: The part of the file matched by ``_CERT_REGEX``.

    Raises:
        google.auth.exceptions.ClientCertError: If the file does not contain
            exactly one match of ``_CERT_REGEX``.
    """
    with open(cert_path, "rb") as pem_file:
        raw = pem_file.read()

    matches = _CERT_REGEX.findall(raw)
    if len(matches) == 1:
        return matches[0]

    raise exceptions.ClientCertError(
        "Certificate file {} is in an invalid format, a single PEM formatted certificate is expected".format(
            cert_path
        )
    )

208 

209 

def _read_key_file(key_path):
    """Read a file and return the single PEM private key section it contains.

    Args:
        key_path (str): Path of the private key file.

    Returns:
        bytes: The part of the file matched by ``_KEY_REGEX``.

    Raises:
        google.auth.exceptions.ClientCertError: If the file does not contain
            exactly one match of ``_KEY_REGEX``.
    """
    with open(key_path, "rb") as pem_file:
        raw = pem_file.read()

    matches = _KEY_REGEX.findall(raw)
    if len(matches) == 1:
        return matches[0]

    raise exceptions.ClientCertError(
        "Private key file {} is in an invalid format, a single PEM formatted private key is expected".format(
            key_path
        )
    )

223 

224 

def _run_cert_provider_command(command, expect_encrypted_key=False):
    """Execute the cert provider command and parse its standard output.

    The output must contain exactly one certificate section and exactly one
    private key section. A passphrase section must be present exactly once
    when an encrypted key is expected, and must be absent otherwise.

    Args:
        command (List[str]): cert provider command.
        expect_encrypted_key (bool): If encrypted private key is expected.

    Returns:
        Tuple[bytes, bytes, bytes]: client certificate bytes in PEM format, key
            bytes in PEM format and passphrase bytes (None when no encrypted
            key is expected).

    Raises:
        google.auth.exceptions.ClientCertError: if problems occur when running
            the cert provider command or extracting cert, key and passphrase.
    """
    try:
        provider = subprocess.Popen(
            command, stdout=subprocess.PIPE, stderr=subprocess.PIPE
        )
        # stderr is captured so it does not leak to the console; it is unused.
        output, _ = provider.communicate()
    except OSError as caught_exc:
        raise exceptions.ClientCertError(caught_exc) from caught_exc

    # Check cert provider command execution error.
    exit_code = provider.returncode
    if exit_code != 0:
        raise exceptions.ClientCertError(
            "Cert provider command returns non-zero status code %s" % exit_code
        )

    # Extract certificate (chain), key and passphrase.
    certs = _CERT_REGEX.findall(output)
    if len(certs) != 1:
        raise exceptions.ClientCertError("Client SSL certificate is missing or invalid")
    keys = _KEY_REGEX.findall(output)
    if len(keys) != 1:
        raise exceptions.ClientCertError("Client SSL key is missing or invalid")
    passphrases = _PASSPHRASE_REGEX.findall(output)

    cert, key = certs[0], keys[0]
    key_is_encrypted = b"ENCRYPTED" in key

    if not expect_encrypted_key:
        if key_is_encrypted:
            raise exceptions.ClientCertError("Encrypted private key is not expected")
        if passphrases:
            raise exceptions.ClientCertError("Passphrase is not expected")
        return cert, key, None

    if len(passphrases) != 1:
        raise exceptions.ClientCertError("Passphrase is missing or invalid")
    if not key_is_encrypted:
        raise exceptions.ClientCertError("Encrypted private key is expected")
    return cert, key, passphrases[0].strip()

277 

278 

def get_client_ssl_credentials(
    generate_encrypted_key=False,
    context_aware_metadata_path=CONTEXT_AWARE_METADATA_PATH,
    certificate_config_path=CERTIFICATE_CONFIGURATION_DEFAULT_PATH,
):
    """Returns the client side certificate, private key and passphrase.

    We look for certificates and keys with the following order of priority:
        1. Certificate and key specified by certificate_config.json.
            Currently, only X.509 workload certificates are supported.
        2. Certificate and key specified by context aware metadata (i.e. SecureConnect).

    Args:
        generate_encrypted_key (bool): If set to True, encrypted private key
            and passphrase will be generated; otherwise, unencrypted private key
            will be generated and passphrase will be None. This option only
            affects keys obtained via context_aware_metadata.json.
        context_aware_metadata_path (str): The context_aware_metadata.json file path.
        certificate_config_path (str): The certificate_config.json file path.
            When left at its default (or passed as None), the
            "GOOGLE_API_CERTIFICATE_CONFIG" environment variable is consulted
            first and the well known gcloud location second. Any other value
            is used as an explicit override.

    Returns:
        Tuple[bool, bytes, bytes, bytes]:
            A boolean indicating if cert, key and passphrase are obtained, the
            cert bytes and key bytes both in PEM format, and passphrase bytes.

    Raises:
        google.auth.exceptions.ClientCertError: if problems occurs when getting
            the cert, key and passphrase.
    """

    # 1. Check for certificate config json.
    if certificate_config_path in (None, CERTIFICATE_CONFIGURATION_DEFAULT_PATH):
        # No explicit override: resolve through _get_cert_config_path so the
        # environment variable is honoured before the well known location.
        # Checking only the default path would make that lookup unreachable.
        cert_config_path = _get_cert_config_path(None)
    else:
        cert_config_path = _check_config_path(certificate_config_path)

    if cert_config_path:
        # Attempt to retrieve X.509 Workload cert and key.
        cert, key = _get_workload_cert_and_key(cert_config_path)
        if cert and key:
            return True, cert, key, None

    # 2. Check for context aware metadata json
    metadata_path = _check_config_path(context_aware_metadata_path)

    if metadata_path:
        metadata_json = _load_json_file(metadata_path)

        if _CERT_PROVIDER_COMMAND not in metadata_json:
            raise exceptions.ClientCertError("Cert provider command is not found")

        command = metadata_json[_CERT_PROVIDER_COMMAND]

        # Ask the provider for an encrypted key plus passphrase when requested.
        if generate_encrypted_key and "--with_passphrase" not in command:
            command.append("--with_passphrase")

        # Execute the command.
        cert, key, passphrase = _run_cert_provider_command(
            command, expect_encrypted_key=generate_encrypted_key
        )
        return True, cert, key, passphrase

    return False, None, None, None

338 

339 

def get_client_cert_and_key(client_cert_callback=None):
    """Obtain the client side certificate and private key.

    A truthy ``client_cert_callback`` is called and its result is returned
    as-is, flagged as found. Otherwise the application default SSL
    credentials are looked up, requesting an unencrypted key.

    Args:
        client_cert_callback (Optional[Callable[[], (bytes, bytes)]]): An
            optional callback which returns client certificate bytes and private
            key bytes both in PEM format.

    Returns:
        Tuple[bool, bytes, bytes]:
            A boolean indicating if cert and key are obtained, the cert bytes
            and key bytes both in PEM format.

    Raises:
        google.auth.exceptions.ClientCertError: if problems occurs when getting
            the cert and key.
    """
    if not client_cert_callback:
        # The passphrase is always None for an unencrypted key; drop it.
        found, cert, key, _ = get_client_ssl_credentials(generate_encrypted_key=False)
        return found, cert, key

    cert, key = client_cert_callback()
    return True, cert, key

366 

367 

def decrypt_private_key(key, passphrase):
    """Decrypt a passphrase protected private key and return it in PEM format.

    google-auth library doesn't support passphrase protected private key for
    mutual TLS channel. This helper function can be used to decrypt the
    passphrase protected private key in order to establish mutual TLS channel.

    For example, if you have a function which produces client cert, passphrase
    protected private key and passphrase, you can convert it to a client cert
    callback function accepted by google-auth::

        from google.auth.transport import _mtls_helper

        def your_client_cert_function():
            return cert, encrypted_key, passphrase

        # callback accepted by google-auth for mutual TLS channel.
        def client_cert_callback():
            cert, encrypted_key, passphrase = your_client_cert_function()
            decrypted_key = _mtls_helper.decrypt_private_key(encrypted_key,
                passphrase)
            return cert, decrypted_key

    Args:
        key (bytes): The private key bytes in PEM format.
        passphrase (bytes): The passphrase bytes.

    Returns:
        bytes: The decrypted private key in PEM format.

    Raises:
        ImportError: If pyOpenSSL is not installed.
        OpenSSL.crypto.Error: If there is any problem decrypting the private key.
    """
    # Imported here so pyOpenSSL is only required when this helper is called.
    from OpenSSL import crypto

    pem = crypto.FILETYPE_PEM

    # Load the encrypted key bytes into a PKey object, then serialise that
    # object again without encryption.
    private_key = crypto.load_privatekey(pem, key, passphrase=passphrase)
    return crypto.dump_privatekey(pem, private_key)