Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/google/auth/compute_engine/_metadata.py: 59%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

151 statements  

1# Copyright 2016 Google LLC 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15"""Provides helper methods for talking to the Compute Engine metadata server. 

16 

17See https://cloud.google.com/compute/docs/metadata for more details. 

18""" 

19 

20import datetime 

21import http.client as http_client 

22import json 

23import logging 

24import os 

25from urllib.parse import urljoin 

26 

27import requests 

28 

29from google.auth import _helpers 

30from google.auth import environment_vars 

31from google.auth import exceptions 

32from google.auth import metrics 

33from google.auth import transport 

34from google.auth._exponential_backoff import ExponentialBackoff 

35from google.auth.compute_engine import _mtls 

36 

37 

_LOGGER = logging.getLogger(__name__)

# Default addresses of the GCE metadata server (MDS): its host name and its IP.
_GCE_DEFAULT_MDS_IP = "169.254.169.254"
_GCE_DEFAULT_HOST = "metadata.google.internal"
_GCE_DEFAULT_MDS_HOSTS = [_GCE_DEFAULT_HOST, _GCE_DEFAULT_MDS_IP]

# The metadata host is resolved once, at import time. GCE_METADATA_HOST is the
# current name of the override variable and is consulted first; when it is
# unset or empty, the legacy GCE_METADATA_ROOT variable is used instead, and
# when that is unset too the default host name applies.
_GCE_METADATA_HOST = os.getenv(environment_vars.GCE_METADATA_HOST) or os.getenv(
    environment_vars.GCE_METADATA_ROOT, _GCE_DEFAULT_HOST
)

53 

54 

def _validate_gce_mds_configured_environment():
    """Checks that the configured metadata host is compatible with strict mTLS.

    Nothing is checked unless the parsed MDS mTLS mode is ``STRICT``. In strict
    mode the configured metadata host must be one of the default metadata
    server hosts, because mTLS is only supported against those.

    Raises:
        google.auth.exceptions.MutualTLSChannelError: if strict mode is active
            and the metadata host has been overridden to a non-default value.
    """
    # Outside of strict mode there is nothing to validate.
    if _mtls._parse_mds_mode() != _mtls.MdsMtlsMode.STRICT:
        return

    # Strict mode with a default host is a valid configuration.
    if _GCE_METADATA_HOST in _GCE_DEFAULT_MDS_HOSTS:
        return

    # Strict mode requires mTLS, which would fail against a custom host.
    raise exceptions.MutualTLSChannelError(
        "Mutual TLS is required, but the metadata host has been overridden. "
        "mTLS is only supported when connecting to the default metadata host."
    )

76 

77 

def _get_metadata_root(use_mtls: bool):
    """Builds the root URL of the metadata API on the configured host.

    Args:
        use_mtls (bool): Selects the ``https`` scheme when true and ``http``
            otherwise.

    Returns:
        str: The ``computeMetadata/v1/`` root URL, ending with a slash.
    """
    if use_mtls:
        scheme = "https"
    else:
        scheme = "http"

    return "{}://{}/computeMetadata/v1/".format(scheme, _GCE_METADATA_HOST)

83 

84 

def _get_metadata_ip_root(use_mtls: bool):
    """Builds the metadata server URL addressed by IP.

    The IP is read from the GCE_METADATA_IP environment variable on every
    call, with the default metadata server IP as the fallback.

    Args:
        use_mtls (bool): Selects the ``https`` scheme when true and ``http``
            otherwise.

    Returns:
        str: A URL of the form ``<scheme>://<ip>`` with no trailing path.
    """
    if use_mtls:
        scheme = "https"
    else:
        scheme = "http"

    mds_ip = os.getenv(environment_vars.GCE_METADATA_IP, _GCE_DEFAULT_MDS_IP)
    return "{}://{}".format(scheme, mds_ip)

91 

92 

# Header sent with every metadata request; ``ping`` also expects it to be
# echoed back in the response.
_METADATA_FLAVOR_HEADER = "metadata-flavor"
_METADATA_FLAVOR_VALUE = "Google"
_METADATA_HEADERS = {_METADATA_FLAVOR_HEADER: _METADATA_FLAVOR_VALUE}


def _int_from_env(name, fallback):
    """Reads an integer from the environment variable ``name``.

    Returns ``fallback`` when the variable is unset or its value cannot be
    parsed as an integer.
    """
    try:
        return int(os.getenv(name, fallback))
    except ValueError:  # pragma: NO COVER
        return fallback


# Timeout in seconds to wait for the GCE metadata server when detecting the
# GCE environment.
_METADATA_DEFAULT_TIMEOUT = _int_from_env(environment_vars.GCE_METADATA_TIMEOUT, 3)

# The number of tries to perform when waiting for the GCE metadata server
# when detecting the GCE environment.
_METADATA_DETECT_RETRIES = _int_from_env(
    environment_vars.GCE_METADATA_DETECT_RETRIES, 3
)

# This is used to disable checking for the GCE metadata server and directly
# assuming it's not available. Only the exact value "true" enables it.
_NO_GCE_CHECK = os.getenv(environment_vars.NO_GCE_CHECK) == "true"

# Detect GCE Residency: the product name file is expected to start with this
# prefix on GCE (see ``detect_gce_residency_linux``).
_GOOGLE = "Google"
_GCE_PRODUCT_NAME_FILE = "/sys/class/dmi/id/product_name"

120 

121 

def is_on_gce(request):
    """Determines whether the code is running on Google Compute Engine.

    The metadata server is pinged first. If it does not answer, the check
    falls back to the Linux product-name file; on Windows there is no
    fallback.

    Args:
        request (google.auth.transport.Request): A callable used to make
            HTTP requests.

    Returns:
        bool: True if the code runs on Google Compute Engine, False otherwise.
    """
    # The check can be disabled entirely through the environment.
    if _NO_GCE_CHECK:
        return False

    metadata_server_reachable = ping(request)
    if metadata_server_reachable:
        return True

    # TODO: implement GCE residency detection on Windows
    running_on_windows = os.name == "nt"
    if running_on_windows:
        return False

    # Detect GCE residency on Linux
    return detect_gce_residency_linux()

144 

145 

def detect_gce_residency_linux():
    """Detects Google Compute Engine residency from the product name file.

    Returns:
        bool: True if the product name file could be read and its stripped
            content starts with the expected Google prefix, False otherwise
            (including when the file cannot be read at all).
    """
    try:
        with open(_GCE_PRODUCT_NAME_FILE, "r") as product_name_file:
            product_name = product_name_file.read().strip()
    except Exception:
        # Best effort: any failure to read the file means "not detected".
        return False
    else:
        return product_name.startswith(_GOOGLE)

160 

161 

def _prepare_request_for_mds(request, use_mtls=False) -> None:
    """Prepares a request object for talking to the metadata server.

    When mTLS is enabled, an mTLS adapter is mounted on the request's session
    for every default metadata server host. The request is modified in place;
    nothing is returned. When mTLS is disabled the request is left untouched.

    Args:
        request (google.auth.transport.Request): A callable used to make
            HTTP requests. Its ``session`` attribute is read, and replaced
            with a new ``requests.Session`` if it is falsy.
        use_mtls (bool): Whether to use mTLS for the request.
    """
    # Nothing to prepare for plain HTTP.
    if not use_mtls:
        return

    # The adapter needs a session to be mounted on.
    if not request.session:
        request.session = requests.Session()

    # A single adapter instance is shared by all default GCE metadata hosts.
    mtls_adapter = _mtls.MdsMtlsAdapter()
    for host in _GCE_DEFAULT_MDS_HOSTS:
        request.session.mount(f"https://{host}/", mtls_adapter)

187 

188 

def ping(
    request, timeout=_METADATA_DEFAULT_TIMEOUT, retry_count=_METADATA_DETECT_RETRIES
):
    """Probes the metadata server to see whether it is available.

    The first response received decides the outcome; only transport errors
    trigger another attempt.

    Args:
        request (google.auth.transport.Request): A callable used to make
            HTTP requests.
        timeout (int): How long to wait for the metadata server to respond.
        retry_count (int): How many times to attempt connecting to metadata
            server using above timeout.

    Returns:
        bool: True if the metadata server answered with an OK status and the
            expected metadata flavor header, False otherwise.
    """
    use_mtls = _mtls.should_use_mds_mtls()
    _prepare_request_for_mds(request, use_mtls=use_mtls)

    ping_headers = {
        **_METADATA_HEADERS,
        metrics.API_CLIENT_HEADER: metrics.mds_ping(),
    }

    # NOTE: The explicit ``timeout`` passed below is a workaround. Resolving
    # an unknown host can take 20-30 seconds on some networks; a short
    # timeout avoids that wait, at the cost of possible false negatives when
    # we are on GCE but metadata resolution is particularly slow. The latter
    # case is "unlikely".
    for attempt in ExponentialBackoff(total_attempts=retry_count):
        try:
            response = request(
                url=_get_metadata_ip_root(use_mtls),
                method="GET",
                headers=ping_headers,
                timeout=timeout,
            )

            flavor = response.headers.get(_METADATA_FLAVOR_HEADER)
            status_is_ok = response.status == http_client.OK
            return status_is_ok and flavor == _METADATA_FLAVOR_VALUE

        except exceptions.TransportError as transport_error:
            _LOGGER.warning(
                "Compute Engine Metadata server unavailable on "
                "attempt %s of %s. Reason: %s",
                attempt,
                retry_count,
                transport_error,
            )

    # Every attempt ended in a transport error.
    return False

242 

243 

def get(
    request,
    path,
    root=None,
    params=None,
    recursive=False,
    retry_count=5,
    headers=None,
    return_none_for_not_found_error=False,
    timeout=_METADATA_DEFAULT_TIMEOUT,
):
    """Fetch a resource from the metadata server.

    Args:
        request (google.auth.transport.Request): A callable used to make
            HTTP requests.
        path (str): The resource to retrieve. For example,
            ``'instance/service-accounts/default'``.
        root (Optional[str]): The full path to the metadata server root. If not
            provided, the default root will be used.
        params (Optional[Mapping[str, str]]): A mapping of query parameter
            keys to values. The mapping is copied and never modified.
        recursive (bool): Whether to do a recursive query of metadata. See
            https://cloud.google.com/compute/docs/metadata#aggcontents for more
            details.
        retry_count (int): How many times to attempt connecting to the
            metadata server, each attempt using ``timeout``.
        headers (Optional[Mapping[str, str]]): Headers for the request. They
            are applied on top of the default metadata headers.
        return_none_for_not_found_error (Optional[bool]): If True, returns None
            for 404 error instead of throwing an exception.
        timeout (int): How long to wait, in seconds for the metadata server to respond.

    Returns:
        Union[Mapping, str]: If the metadata server returns JSON, a mapping of
            the decoded JSON is returned. Otherwise, the response content is
            returned as a string.

    Raises:
        google.auth.exceptions.TransportError: if an error occurred while
            retrieving metadata.
        google.auth.exceptions.MutualTLSChannelError: if using mtls and the environment
            configuration is invalid for mTLS (for example, the metadata host
            has been overridden in strict mTLS mode).

    """
    use_mtls = _mtls.should_use_mds_mtls()
    # Prepare the request object for mTLS if needed. The request is modified
    # in place (the mTLS adapter is mounted on its session).
    _prepare_request_for_mds(request, use_mtls=use_mtls)

    if root is None:
        root = _get_metadata_root(use_mtls)

    # mTLS is only supported when connecting to the default metadata host.
    # If we are in strict mode (which requires mTLS), ensure that the metadata host
    # has not been overridden to a non-default host value (which means mTLS will fail).
    _validate_gce_mds_configured_environment()

    base_url = urljoin(root, path)

    # Work on a copy: "recursive" may be added below, and the caller's
    # mapping must not be mutated as a side effect of this call.
    query_params = {} if params is None else dict(params)

    headers_to_use = _METADATA_HEADERS.copy()
    if headers:
        headers_to_use.update(headers)

    if recursive:
        query_params["recursive"] = "true"

    url = _helpers.update_query(base_url, query_params)

    backoff = ExponentialBackoff(total_attempts=retry_count)
    last_exception = None
    for attempt in backoff:
        try:
            response = request(
                url=url, method="GET", headers=headers_to_use, timeout=timeout
            )
        except exceptions.TransportError as e:
            _LOGGER.warning(
                "Compute Engine Metadata server unavailable on "
                "attempt %s of %s. Reason: %s",
                attempt,
                retry_count,
                e,
            )
            last_exception = e
            continue

        # A response was received, so any earlier transport error is stale.
        last_exception = None
        if response.status not in transport.DEFAULT_RETRYABLE_STATUS_CODES:
            break

        _LOGGER.warning(
            "Compute Engine Metadata server unavailable on "
            "attempt %s of %s. Response status: %s",
            attempt,
            retry_count,
            response.status,
        )
    else:
        # All attempts were used up without a non-retryable response. Report
        # whichever failure the final attempt ended with.
        if last_exception:
            raise exceptions.TransportError(
                "Failed to retrieve {} from the Google Compute Engine "
                "metadata service. Compute Engine Metadata server unavailable. "
                "Last exception: {}".format(url, last_exception)
            ) from last_exception
        else:
            error_details = (
                response.data.decode("utf-8")
                if hasattr(response.data, "decode")
                else response.data
            )
            raise exceptions.TransportError(
                "Failed to retrieve {} from the Google Compute Engine "
                "metadata service. Compute Engine Metadata server unavailable. "
                "Response status: {}\nResponse details:\n{}".format(
                    url, response.status, error_details
                )
            )

    content = _helpers.from_bytes(response.data)

    if response.status == http_client.NOT_FOUND and return_none_for_not_found_error:
        return None

    if response.status == http_client.OK:
        if (
            _helpers.parse_content_type(response.headers["content-type"])
            == "application/json"
        ):
            try:
                return json.loads(content)
            except ValueError as caught_exc:
                new_exc = exceptions.TransportError(
                    "Received invalid JSON from the Google Compute Engine "
                    "metadata service: {:.20}".format(content)
                )
                raise new_exc from caught_exc
        else:
            return content

    # Any other status is an error; the response is attached to the exception.
    raise exceptions.TransportError(
        "Failed to retrieve {} from the Google Compute Engine "
        "metadata service. Status: {} Response:\n{}".format(
            url, response.status, response.data
        ),
        response,
    )

393 

394 

def get_project_id(request):
    """Retrieves the Google Cloud project ID from the metadata server.

    Args:
        request (google.auth.transport.Request): A callable used to make
            HTTP requests.

    Returns:
        str: The project ID

    Raises:
        google.auth.exceptions.TransportError: if an error occurred while
            retrieving metadata.
    """
    project_id = get(request, "project/project-id")
    return project_id

410 

411 

def get_universe_domain(request):
    """Retrieves the universe domain from the metadata server.

    Args:
        request (google.auth.transport.Request): A callable used to make
            HTTP requests.

    Returns:
        str: The universe domain value. If the universe domain endpoint is
            not found, or the value it returns is empty, the default value
            ``googleapis.com`` is returned.

    Raises:
        google.auth.exceptions.TransportError: if an error other than
            404 occurs while retrieving metadata.
    """
    # A 404 is reported as None instead of raising, so it can be defaulted.
    reported_domain = get(
        request, "universe/universe-domain", return_none_for_not_found_error=True
    )
    return reported_domain or "googleapis.com"

433 

434 

def get_service_account_info(request, service_account="default"):
    """Retrieves information about a service account from the metadata server.

    Args:
        request (google.auth.transport.Request): A callable used to make
            HTTP requests.
        service_account (str): The string 'default' or a service account email
            address. This determines the service account for which to acquire
            information.

    Returns:
        Mapping: The service account's information, for example::

            {
                'email': '...',
                'scopes': ['scope', ...],
                'aliases': ['default', '...']
            }

    Raises:
        google.auth.exceptions.TransportError: if an error occurred while
            retrieving metadata.
    """
    # See https://cloud.google.com/compute/docs/metadata#aggcontents
    # for more on the use of 'recursive'.
    recursive_query = {"recursive": "true"}
    account_path = "instance/service-accounts/{0}/".format(service_account)
    return get(request, account_path, params=recursive_query)

462 

463 

def get_service_account_token(request, service_account="default", scopes=None):
    """Retrieves an OAuth 2.0 access token for a service account.

    Args:
        request (google.auth.transport.Request): A callable used to make
            HTTP requests.
        service_account (str): The string 'default' or a service account email
            address. This determines the service account for which to acquire
            an access token.
        scopes (Optional[Union[str, List[str]]]): Optional string or list of
            strings with auth scopes. A list is sent as a single
            comma-separated value.

    Returns:
        Tuple[str, datetime]: The access token and its expiration.

    Raises:
        google.auth.exceptions.TransportError: if an error occurred while
            retrieving metadata.
    """
    from google.auth import _agent_identity_utils

    query = {}
    if scopes:
        query["scopes"] = scopes if isinstance(scopes, str) else ",".join(scopes)

    # When an agent identity certificate is present and a bound token should
    # be requested for it, send the certificate fingerprint with the request.
    agent_cert = _agent_identity_utils.get_and_parse_agent_identity_certificate()
    if agent_cert and _agent_identity_utils.should_request_bound_token(agent_cert):
        query[
            "bindCertificateFingerprint"
        ] = _agent_identity_utils.calculate_certificate_fingerprint(agent_cert)

    request_headers = {
        metrics.API_CLIENT_HEADER: metrics.token_request_access_token_mds()
    }
    token_path = "instance/service-accounts/{0}/token".format(service_account)
    token_json = get(request, token_path, params=query, headers=request_headers)

    # The expiry is the current UTC time plus the lifetime reported in the
    # response's "expires_in" field (in seconds).
    received_at = _helpers.utcnow()
    lifetime = datetime.timedelta(seconds=token_json["expires_in"])
    return token_json["access_token"], received_at + lifetime