Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/google/auth/compute_engine/_metadata.py: 61%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

146 statements  

1# Copyright 2016 Google LLC 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15"""Provides helper methods for talking to the Compute Engine metadata server. 

16 

17See https://cloud.google.com/compute/docs/metadata for more details. 

18""" 

19 

20import datetime 

21import http.client as http_client 

22import json 

23import logging 

24import os 

25from urllib.parse import urljoin 

26 

27import requests 

28 

29from google.auth import _helpers 

30from google.auth import environment_vars 

31from google.auth import exceptions 

32from google.auth import metrics 

33from google.auth import transport 

34from google.auth._exponential_backoff import ExponentialBackoff 

35from google.auth.compute_engine import _mtls 

36 

37 

38_LOGGER = logging.getLogger(__name__) 

39 

40_GCE_DEFAULT_MDS_IP = "169.254.169.254" 

41_GCE_DEFAULT_HOST = "metadata.google.internal" 

42_GCE_DEFAULT_MDS_HOSTS = [_GCE_DEFAULT_HOST, _GCE_DEFAULT_MDS_IP] 

43 

44# Environment variable GCE_METADATA_HOST is originally named 

45# GCE_METADATA_ROOT. For compatibility reasons, here it checks 

46# the new variable first; if not set, the system falls back 

47# to the old variable. 

48_GCE_METADATA_HOST = os.getenv(environment_vars.GCE_METADATA_HOST, None) 

49if not _GCE_METADATA_HOST: 

50 _GCE_METADATA_HOST = os.getenv( 

51 environment_vars.GCE_METADATA_ROOT, _GCE_DEFAULT_HOST 

52 ) 

53 

54 

55def _validate_gce_mds_configured_environment(): 

56 """Validates the GCE metadata server environment configuration for mTLS. 

57 

58 mTLS is only supported when connecting to the default metadata server hosts. 

59 If we are in strict mode (which requires mTLS), ensure that the metadata host 

60 has not been overridden to a custom value (which means mTLS will fail). 

61 

62 Raises: 

63 google.auth.exceptions.MutualTLSChannelError: if the environment 

64 configuration is invalid for mTLS. 

65 """ 

66 mode = _mtls._parse_mds_mode() 

67 if mode == _mtls.MdsMtlsMode.STRICT: 

68 # mTLS is only supported when connecting to the default metadata host. 

69 # Raise an exception if we are in strict mode (which requires mTLS) 

70 # but the metadata host has been overridden to a custom MDS. (which means mTLS will fail) 

71 if _GCE_METADATA_HOST not in _GCE_DEFAULT_MDS_HOSTS: 

72 raise exceptions.MutualTLSChannelError( 

73 "Mutual TLS is required, but the metadata host has been overridden. " 

74 "mTLS is only supported when connecting to the default metadata host." 

75 ) 

76 

77 

78def _get_metadata_root(use_mtls: bool): 

79 """Returns the metadata server root URL.""" 

80 

81 scheme = "https" if use_mtls else "http" 

82 return "{}://{}/computeMetadata/v1/".format(scheme, _GCE_METADATA_HOST) 

83 

84 

85def _get_metadata_ip_root(use_mtls: bool): 

86 """Returns the metadata server IP root URL.""" 

87 scheme = "https" if use_mtls else "http" 

88 return "{}://{}".format( 

89 scheme, os.getenv(environment_vars.GCE_METADATA_IP, _GCE_DEFAULT_MDS_IP) 

90 ) 

91 

92 

93_METADATA_FLAVOR_HEADER = "metadata-flavor" 

94_METADATA_FLAVOR_VALUE = "Google" 

95_METADATA_HEADERS = {_METADATA_FLAVOR_HEADER: _METADATA_FLAVOR_VALUE} 

96 

97# Timeout in seconds to wait for the GCE metadata server when detecting the 

98# GCE environment. 

99try: 

100 _METADATA_DEFAULT_TIMEOUT = int(os.getenv("GCE_METADATA_TIMEOUT", 3)) 

101except ValueError: # pragma: NO COVER 

102 _METADATA_DEFAULT_TIMEOUT = 3 

103 

104# Detect GCE Residency 

105_GOOGLE = "Google" 

106_GCE_PRODUCT_NAME_FILE = "/sys/class/dmi/id/product_name" 

107 

108 

109def is_on_gce(request): 

110 """Checks to see if the code runs on Google Compute Engine 

111 

112 Args: 

113 request (google.auth.transport.Request): A callable used to make 

114 HTTP requests. 

115 

116 Returns: 

117 bool: True if the code runs on Google Compute Engine, False otherwise. 

118 """ 

119 if ping(request): 

120 return True 

121 

122 if os.name == "nt": 

123 # TODO: implement GCE residency detection on Windows 

124 return False 

125 

126 # Detect GCE residency on Linux 

127 return detect_gce_residency_linux() 

128 

129 

130def detect_gce_residency_linux(): 

131 """Detect Google Compute Engine residency by smbios check on Linux 

132 

133 Returns: 

134 bool: True if the GCE product name file is detected, False otherwise. 

135 """ 

136 try: 

137 with open(_GCE_PRODUCT_NAME_FILE, "r") as file_obj: 

138 content = file_obj.read().strip() 

139 

140 except Exception: 

141 return False 

142 

143 return content.startswith(_GOOGLE) 

144 

145 

146def _prepare_request_for_mds(request, use_mtls=False) -> None: 

147 """Prepares a request for the metadata server. 

148 

149 This will check if mTLS should be used and mount the mTLS adapter if needed. 

150 

151 Args: 

152 request (google.auth.transport.Request): A callable used to make 

153 HTTP requests. 

154 use_mtls (bool): Whether to use mTLS for the request. 

155 

156 Returns: 

157 google.auth.transport.Request: A request object to use. 

158 If mTLS is enabled, the request will have the mTLS adapter mounted. 

159 Otherwise, the original request will be returned unchanged. 

160 """ 

161 # Only modify the request if mTLS is enabled. 

162 if use_mtls: 

163 # Ensure the request has a session to mount the adapter to. 

164 if not request.session: 

165 request.session = requests.Session() 

166 

167 adapter = _mtls.MdsMtlsAdapter() 

168 # Mount the adapter for all default GCE metadata hosts. 

169 for host in _GCE_DEFAULT_MDS_HOSTS: 

170 request.session.mount(f"https://{host}/", adapter) 

171 

172 

173def ping(request, timeout=_METADATA_DEFAULT_TIMEOUT, retry_count=3): 

174 """Checks to see if the metadata server is available. 

175 

176 Args: 

177 request (google.auth.transport.Request): A callable used to make 

178 HTTP requests. 

179 timeout (int): How long to wait for the metadata server to respond. 

180 retry_count (int): How many times to attempt connecting to metadata 

181 server using above timeout. 

182 

183 Returns: 

184 bool: True if the metadata server is reachable, False otherwise. 

185 """ 

186 use_mtls = _mtls.should_use_mds_mtls() 

187 _prepare_request_for_mds(request, use_mtls=use_mtls) 

188 # NOTE: The explicit ``timeout`` is a workaround. The underlying 

189 # issue is that resolving an unknown host on some networks will take 

190 # 20-30 seconds; making this timeout short fixes the issue, but 

191 # could lead to false negatives in the event that we are on GCE, but 

192 # the metadata resolution was particularly slow. The latter case is 

193 # "unlikely". 

194 headers = _METADATA_HEADERS.copy() 

195 headers[metrics.API_CLIENT_HEADER] = metrics.mds_ping() 

196 

197 backoff = ExponentialBackoff(total_attempts=retry_count) 

198 

199 for attempt in backoff: 

200 try: 

201 response = request( 

202 url=_get_metadata_ip_root(use_mtls), 

203 method="GET", 

204 headers=headers, 

205 timeout=timeout, 

206 ) 

207 

208 metadata_flavor = response.headers.get(_METADATA_FLAVOR_HEADER) 

209 return ( 

210 response.status == http_client.OK 

211 and metadata_flavor == _METADATA_FLAVOR_VALUE 

212 ) 

213 

214 except exceptions.TransportError as e: 

215 _LOGGER.warning( 

216 "Compute Engine Metadata server unavailable on " 

217 "attempt %s of %s. Reason: %s", 

218 attempt, 

219 retry_count, 

220 e, 

221 ) 

222 

223 return False 

224 

225 

226def get( 

227 request, 

228 path, 

229 root=None, 

230 params=None, 

231 recursive=False, 

232 retry_count=5, 

233 headers=None, 

234 return_none_for_not_found_error=False, 

235 timeout=_METADATA_DEFAULT_TIMEOUT, 

236): 

237 """Fetch a resource from the metadata server. 

238 

239 Args: 

240 request (google.auth.transport.Request): A callable used to make 

241 HTTP requests. 

242 path (str): The resource to retrieve. For example, 

243 ``'instance/service-accounts/default'``. 

244 root (Optional[str]): The full path to the metadata server root. If not 

245 provided, the default root will be used. 

246 params (Optional[Mapping[str, str]]): A mapping of query parameter 

247 keys to values. 

248 recursive (bool): Whether to do a recursive query of metadata. See 

249 https://cloud.google.com/compute/docs/metadata#aggcontents for more 

250 details. 

251 retry_count (int): How many times to attempt connecting to metadata 

252 server using above timeout. 

253 headers (Optional[Mapping[str, str]]): Headers for the request. 

254 return_none_for_not_found_error (Optional[bool]): If True, returns None 

255 for 404 error instead of throwing an exception. 

256 timeout (int): How long to wait, in seconds for the metadata server to respond. 

257 

258 Returns: 

259 Union[Mapping, str]: If the metadata server returns JSON, a mapping of 

260 the decoded JSON is returned. Otherwise, the response content is 

261 returned as a string. 

262 

263 Raises: 

264 google.auth.exceptions.TransportError: if an error occurred while 

265 retrieving metadata. 

266 google.auth.exceptions.MutualTLSChannelError: if using mtls and the environment 

267 configuration is invalid for mTLS (for example, the metadata host 

268 has been overridden in strict mTLS mode). 

269 

270 """ 

271 use_mtls = _mtls.should_use_mds_mtls() 

272 # Prepare the request object for mTLS if needed. 

273 # This will create a new request object with the mTLS session. 

274 _prepare_request_for_mds(request, use_mtls=use_mtls) 

275 

276 if root is None: 

277 root = _get_metadata_root(use_mtls) 

278 

279 # mTLS is only supported when connecting to the default metadata host. 

280 # If we are in strict mode (which requires mTLS), ensure that the metadata host 

281 # has not been overridden to a non-default host value (which means mTLS will fail). 

282 _validate_gce_mds_configured_environment() 

283 

284 base_url = urljoin(root, path) 

285 query_params = {} if params is None else params 

286 

287 headers_to_use = _METADATA_HEADERS.copy() 

288 if headers: 

289 headers_to_use.update(headers) 

290 

291 if recursive: 

292 query_params["recursive"] = "true" 

293 

294 url = _helpers.update_query(base_url, query_params) 

295 

296 backoff = ExponentialBackoff(total_attempts=retry_count) 

297 last_exception = None 

298 for attempt in backoff: 

299 try: 

300 response = request( 

301 url=url, method="GET", headers=headers_to_use, timeout=timeout 

302 ) 

303 if response.status in transport.DEFAULT_RETRYABLE_STATUS_CODES: 

304 _LOGGER.warning( 

305 "Compute Engine Metadata server unavailable on " 

306 "attempt %s of %s. Response status: %s", 

307 attempt, 

308 retry_count, 

309 response.status, 

310 ) 

311 last_exception = None 

312 continue 

313 else: 

314 last_exception = None 

315 break 

316 

317 except exceptions.TransportError as e: 

318 _LOGGER.warning( 

319 "Compute Engine Metadata server unavailable on " 

320 "attempt %s of %s. Reason: %s", 

321 attempt, 

322 retry_count, 

323 e, 

324 ) 

325 last_exception = e 

326 else: 

327 if last_exception: 

328 raise exceptions.TransportError( 

329 "Failed to retrieve {} from the Google Compute Engine " 

330 "metadata service. Compute Engine Metadata server unavailable. " 

331 "Last exception: {}".format(url, last_exception) 

332 ) from last_exception 

333 else: 

334 error_details = ( 

335 response.data.decode("utf-8") 

336 if hasattr(response.data, "decode") 

337 else response.data 

338 ) 

339 raise exceptions.TransportError( 

340 "Failed to retrieve {} from the Google Compute Engine " 

341 "metadata service. Compute Engine Metadata server unavailable. " 

342 "Response status: {}\nResponse details:\n{}".format( 

343 url, response.status, error_details 

344 ) 

345 ) 

346 

347 content = _helpers.from_bytes(response.data) 

348 

349 if response.status == http_client.NOT_FOUND and return_none_for_not_found_error: 

350 return None 

351 

352 if response.status == http_client.OK: 

353 if ( 

354 _helpers.parse_content_type(response.headers["content-type"]) 

355 == "application/json" 

356 ): 

357 try: 

358 return json.loads(content) 

359 except ValueError as caught_exc: 

360 new_exc = exceptions.TransportError( 

361 "Received invalid JSON from the Google Compute Engine " 

362 "metadata service: {:.20}".format(content) 

363 ) 

364 raise new_exc from caught_exc 

365 else: 

366 return content 

367 

368 raise exceptions.TransportError( 

369 "Failed to retrieve {} from the Google Compute Engine " 

370 "metadata service. Status: {} Response:\n{}".format( 

371 url, response.status, response.data 

372 ), 

373 response, 

374 ) 

375 

376 

377def get_project_id(request): 

378 """Get the Google Cloud Project ID from the metadata server. 

379 

380 Args: 

381 request (google.auth.transport.Request): A callable used to make 

382 HTTP requests. 

383 

384 Returns: 

385 str: The project ID 

386 

387 Raises: 

388 google.auth.exceptions.TransportError: if an error occurred while 

389 retrieving metadata. 

390 """ 

391 return get(request, "project/project-id") 

392 

393 

394def get_universe_domain(request): 

395 """Get the universe domain value from the metadata server. 

396 

397 Args: 

398 request (google.auth.transport.Request): A callable used to make 

399 HTTP requests. 

400 

401 Returns: 

402 str: The universe domain value. If the universe domain endpoint is not 

403 not found, return the default value, which is googleapis.com 

404 

405 Raises: 

406 google.auth.exceptions.TransportError: if an error other than 

407 404 occurs while retrieving metadata. 

408 """ 

409 universe_domain = get( 

410 request, "universe/universe-domain", return_none_for_not_found_error=True 

411 ) 

412 if not universe_domain: 

413 return "googleapis.com" 

414 return universe_domain 

415 

416 

417def get_service_account_info(request, service_account="default"): 

418 """Get information about a service account from the metadata server. 

419 

420 Args: 

421 request (google.auth.transport.Request): A callable used to make 

422 HTTP requests. 

423 service_account (str): The string 'default' or a service account email 

424 address. The determines which service account for which to acquire 

425 information. 

426 

427 Returns: 

428 Mapping: The service account's information, for example:: 

429 

430 { 

431 'email': '...', 

432 'scopes': ['scope', ...], 

433 'aliases': ['default', '...'] 

434 } 

435 

436 Raises: 

437 google.auth.exceptions.TransportError: if an error occurred while 

438 retrieving metadata. 

439 """ 

440 path = "instance/service-accounts/{0}/".format(service_account) 

441 # See https://cloud.google.com/compute/docs/metadata#aggcontents 

442 # for more on the use of 'recursive'. 

443 return get(request, path, params={"recursive": "true"}) 

444 

445 

446def get_service_account_token(request, service_account="default", scopes=None): 

447 """Get the OAuth 2.0 access token for a service account. 

448 

449 Args: 

450 request (google.auth.transport.Request): A callable used to make 

451 HTTP requests. 

452 service_account (str): The string 'default' or a service account email 

453 address. The determines which service account for which to acquire 

454 an access token. 

455 scopes (Optional[Union[str, List[str]]]): Optional string or list of 

456 strings with auth scopes. 

457 Returns: 

458 Tuple[str, datetime]: The access token and its expiration. 

459 

460 Raises: 

461 google.auth.exceptions.TransportError: if an error occurred while 

462 retrieving metadata. 

463 """ 

464 from google.auth import _agent_identity_utils 

465 

466 params = {} 

467 if scopes: 

468 if not isinstance(scopes, str): 

469 scopes = ",".join(scopes) 

470 params["scopes"] = scopes 

471 

472 cert = _agent_identity_utils.get_and_parse_agent_identity_certificate() 

473 if cert: 

474 if _agent_identity_utils.should_request_bound_token(cert): 

475 fingerprint = _agent_identity_utils.calculate_certificate_fingerprint(cert) 

476 params["bindCertificateFingerprint"] = fingerprint 

477 

478 metrics_header = { 

479 metrics.API_CLIENT_HEADER: metrics.token_request_access_token_mds() 

480 } 

481 

482 path = "instance/service-accounts/{0}/token".format(service_account) 

483 token_json = get(request, path, params=params, headers=metrics_header) 

484 token_expiry = _helpers.utcnow() + datetime.timedelta( 

485 seconds=token_json["expires_in"] 

486 ) 

487 return token_json["access_token"], token_expiry