Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/google/auth/compute_engine/_metadata.py: 62%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

142 statements  

1# Copyright 2016 Google LLC 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15"""Provides helper methods for talking to the Compute Engine metadata server. 

16 

17See https://cloud.google.com/compute/docs/metadata for more details. 

18""" 

19 

20import datetime 

21import http.client as http_client 

22import json 

23import logging 

24import os 

25from urllib.parse import urljoin 

26 

27import requests 

28 

29from google.auth import _helpers 

30from google.auth import environment_vars 

31from google.auth import exceptions 

32from google.auth import metrics 

33from google.auth import transport 

34from google.auth._exponential_backoff import ExponentialBackoff 

35from google.auth.compute_engine import _mtls 

36 

37 

_LOGGER = logging.getLogger(__name__)

# Default addresses of the Compute Engine metadata server: the link-local IP
# and the internal DNS name.
_GCE_DEFAULT_MDS_IP = "169.254.169.254"
_GCE_DEFAULT_HOST = "metadata.google.internal"
_GCE_DEFAULT_MDS_HOSTS = [_GCE_DEFAULT_HOST, _GCE_DEFAULT_MDS_IP]

# GCE_METADATA_HOST was originally named GCE_METADATA_ROOT. For compatibility
# the new variable is consulted first; when it is unset or empty the old
# variable is used, and when that is unset too the default host name applies.
_GCE_METADATA_HOST = os.getenv(environment_vars.GCE_METADATA_HOST) or os.getenv(
    environment_vars.GCE_METADATA_ROOT, _GCE_DEFAULT_HOST
)

53 

54 

def _validate_gce_mds_configured_environment():
    """Checks that the configured metadata host is compatible with strict mTLS.

    mTLS is only supported against the default metadata server hosts. When the
    parsed MDS mode is strict (mTLS required) and the metadata host has been
    overridden to a value outside the default hosts, the connection could not
    succeed, so an error is raised up front.

    Raises:
        google.auth.exceptions.MutualTLSChannelError: if strict mode is active
            and the metadata host is not one of the default hosts.
    """
    strict_mode = _mtls._parse_mds_mode() == _mtls.MdsMtlsMode.STRICT
    if not strict_mode:
        # mTLS is not mandatory, so any host is acceptable.
        return

    if _GCE_METADATA_HOST in _GCE_DEFAULT_MDS_HOSTS:
        return

    # Strict mode with a custom metadata host: mTLS would fail.
    raise exceptions.MutualTLSChannelError(
        "Mutual TLS is required, but the metadata host has been overridden. "
        "mTLS is only supported when connecting to the default metadata host."
    )

76 

77 

def _get_metadata_root(use_mtls: bool):
    """Builds the root URL of the metadata server's v1 API.

    Args:
        use_mtls (bool): When true the URL uses the ``https`` scheme,
            otherwise ``http``.

    Returns:
        str: The root URL, built from the configured metadata host and ending
            in ``/computeMetadata/v1/``.
    """
    if use_mtls:
        scheme = "https"
    else:
        scheme = "http"

    template = "{}://{}/computeMetadata/v1/"
    return template.format(scheme, _GCE_METADATA_HOST)

83 

84 

def _get_metadata_ip_root(use_mtls: bool):
    """Builds the root URL that addresses the metadata server by IP.

    Args:
        use_mtls (bool): When true the URL uses the ``https`` scheme,
            otherwise ``http``.

    Returns:
        str: ``<scheme>://<ip>``, where the IP comes from the GCE_METADATA_IP
            environment variable and falls back to the default MDS IP.
    """
    metadata_ip = os.getenv(environment_vars.GCE_METADATA_IP, _GCE_DEFAULT_MDS_IP)
    scheme = "https" if use_mtls else "http"

    template = "{}://{}"
    return template.format(scheme, metadata_ip)

91 

92 

# Header (and its expected value) sent with metadata requests; ``ping`` also
# checks for it on the response.
_METADATA_FLAVOR_HEADER = "metadata-flavor"
_METADATA_FLAVOR_VALUE = "Google"
_METADATA_HEADERS = {_METADATA_FLAVOR_HEADER: _METADATA_FLAVOR_VALUE}

# Timeout in seconds to wait for the GCE metadata server when detecting the
# GCE environment. Starts at 3 and is replaced by GCE_METADATA_TIMEOUT when
# that variable holds a valid integer.
_METADATA_DEFAULT_TIMEOUT = 3
try:
    _METADATA_DEFAULT_TIMEOUT = int(
        os.getenv("GCE_METADATA_TIMEOUT", _METADATA_DEFAULT_TIMEOUT)
    )
except ValueError:  # pragma: NO COVER
    # Unparseable value: keep the default assigned above.
    pass

# Detect GCE Residency
_GOOGLE = "Google"
_GCE_PRODUCT_NAME_FILE = "/sys/class/dmi/id/product_name"

107 

108 

def is_on_gce(request):
    """Determines whether the code is running on Google Compute Engine.

    The metadata server is pinged first. If it does not answer, the function
    falls back to the Linux product-name check, except on Windows where no
    fallback exists yet.

    Args:
        request (google.auth.transport.Request): A callable used to make
            HTTP requests.

    Returns:
        bool: True if the code runs on Google Compute Engine, False otherwise.
    """
    if ping(request):
        return True

    on_windows = os.name == "nt"
    # TODO: implement GCE residency detection on Windows
    return False if on_windows else detect_gce_residency_linux()

128 

129 

def detect_gce_residency_linux():
    """Detects Google Compute Engine residency via the Linux product-name file.

    Returns:
        bool: True if the product name file could be read and its stripped
            content starts with "Google", False otherwise (including when
            reading the file fails for any reason).
    """
    try:
        with open(_GCE_PRODUCT_NAME_FILE, "r") as product_file:
            product_name = product_file.read().strip()
    except Exception:
        # Best effort: an unreadable or missing file means "not on GCE".
        on_gce = False
    else:
        on_gce = product_name.startswith(_GOOGLE)

    return on_gce

144 

145 

def _prepare_request_for_mds(request, use_mtls=False) -> None:
    """Prepares a request for the metadata server, in place.

    When mTLS is requested, an mTLS adapter is mounted on the request's
    session for every default GCE metadata host, creating the session first
    if the request does not have one. When mTLS is not requested the request
    is left untouched.

    Args:
        request (google.auth.transport.Request): A callable used to make
            HTTP requests. When ``use_mtls`` is true its ``session``
            attribute is read and may be replaced.
        use_mtls (bool): Whether to use mTLS for the request.

    Returns:
        None: The request object is modified in place; nothing is returned.
    """
    # Only modify the request if mTLS is enabled.
    if not use_mtls:
        return

    # Ensure the request has a session to mount the adapter to.
    if not request.session:
        request.session = requests.Session()

    # One adapter instance is shared by all default GCE metadata hosts.
    adapter = _mtls.MdsMtlsAdapter()
    for host in _GCE_DEFAULT_MDS_HOSTS:
        request.session.mount(f"https://{host}/", adapter)

171 

172 

def ping(request, timeout=_METADATA_DEFAULT_TIMEOUT, retry_count=3):
    """Probes the metadata server to see whether it is available.

    Args:
        request (google.auth.transport.Request): A callable used to make
            HTTP requests.
        timeout (int): How long to wait for the metadata server to respond.
        retry_count (int): How many times to attempt connecting to metadata
            server using above timeout.

    Returns:
        bool: True if the metadata server answered with status 200 and the
            expected metadata-flavor header, False otherwise (including when
            every attempt failed with a transport error).
    """
    use_mtls = _mtls.should_use_mds_mtls()
    _prepare_request_for_mds(request, use_mtls=use_mtls)

    ping_headers = dict(_METADATA_HEADERS)
    ping_headers[metrics.API_CLIENT_HEADER] = metrics.mds_ping()

    # NOTE: The explicit ``timeout`` is a workaround. The underlying
    #       issue is that resolving an unknown host on some networks will take
    #       20-30 seconds; making this timeout short fixes the issue, but
    #       could lead to false negatives in the event that we are on GCE, but
    #       the metadata resolution was particularly slow. The latter case is
    #       "unlikely".
    for attempt in ExponentialBackoff(total_attempts=retry_count):
        try:
            response = request(
                url=_get_metadata_ip_root(use_mtls),
                method="GET",
                headers=ping_headers,
                timeout=timeout,
            )
            flavor = response.headers.get(_METADATA_FLAVOR_HEADER)
            # The first response received decides the outcome; only
            # transport errors lead to another attempt.
            return (
                response.status == http_client.OK
                and flavor == _METADATA_FLAVOR_VALUE
            )
        except exceptions.TransportError as caught_exc:
            _LOGGER.warning(
                "Compute Engine Metadata server unavailable on "
                "attempt %s of %s. Reason: %s",
                attempt,
                retry_count,
                caught_exc,
            )

    # Every attempt ended in a transport error.
    return False

224 

225 

def get(
    request,
    path,
    root=None,
    params=None,
    recursive=False,
    retry_count=5,
    headers=None,
    return_none_for_not_found_error=False,
    timeout=_METADATA_DEFAULT_TIMEOUT,
):
    """Fetch a resource from the metadata server.

    Args:
        request (google.auth.transport.Request): A callable used to make
            HTTP requests.
        path (str): The resource to retrieve. For example,
            ``'instance/service-accounts/default'``.
        root (Optional[str]): The full path to the metadata server root. If not
            provided, the default root will be used.
        params (Optional[Mapping[str, str]]): A mapping of query parameter
            keys to values. The mapping is copied and never modified.
        recursive (bool): Whether to do a recursive query of metadata. See
            https://cloud.google.com/compute/docs/metadata#aggcontents for more
            details.
        retry_count (int): How many times to attempt connecting to metadata
            server using above timeout.
        headers (Optional[Mapping[str, str]]): Headers for the request. They
            are merged over the default metadata headers.
        return_none_for_not_found_error (Optional[bool]): If True, returns None
            for 404 error instead of throwing an exception.
        timeout (int): How long to wait, in seconds for the metadata server to respond.

    Returns:
        Union[Mapping, str]: If the metadata server returns JSON, a mapping of
            the decoded JSON is returned. Otherwise, the response content is
            returned as a string.

    Raises:
        google.auth.exceptions.TransportError: if an error occurred while
            retrieving metadata.
        google.auth.exceptions.MutualTLSChannelError: if using mtls and the environment
            configuration is invalid for mTLS (for example, the metadata host
            has been overridden in strict mTLS mode).

    """
    use_mtls = _mtls.should_use_mds_mtls()
    # Prepare the request object for mTLS if needed. The request is modified
    # in place (the mTLS adapter is mounted on its session).
    _prepare_request_for_mds(request, use_mtls=use_mtls)

    if root is None:
        root = _get_metadata_root(use_mtls)

    # mTLS is only supported when connecting to the default metadata host.
    # If we are in strict mode (which requires mTLS), ensure that the metadata host
    # has not been overridden to a non-default host value (which means mTLS will fail).
    _validate_gce_mds_configured_environment()

    base_url = urljoin(root, path)
    # Work on a copy so that adding "recursive" below never leaks into the
    # mapping owned by the caller.
    query_params = {} if params is None else dict(params)

    headers_to_use = _METADATA_HEADERS.copy()
    if headers:
        headers_to_use.update(headers)

    if recursive:
        query_params["recursive"] = "true"

    url = _helpers.update_query(base_url, query_params)

    backoff = ExponentialBackoff(total_attempts=retry_count)
    failure_reason = None
    for attempt in backoff:
        try:
            response = request(
                url=url, method="GET", headers=headers_to_use, timeout=timeout
            )
            if response.status in transport.DEFAULT_RETRYABLE_STATUS_CODES:
                _LOGGER.warning(
                    "Compute Engine Metadata server unavailable on "
                    "attempt %s of %s. Response status: %s",
                    attempt,
                    retry_count,
                    response.status,
                )
                # Remember the response body so the final error can report it.
                failure_reason = (
                    response.data.decode("utf-8")
                    if hasattr(response.data, "decode")
                    else response.data
                )
                continue
            else:
                # Non-retryable status: stop retrying and interpret it below.
                break

        except exceptions.TransportError as e:
            _LOGGER.warning(
                "Compute Engine Metadata server unavailable on "
                "attempt %s of %s. Reason: %s",
                attempt,
                retry_count,
                e,
            )
            failure_reason = e
    else:
        # The loop ran out of attempts without reaching ``break``.
        raise exceptions.TransportError(
            "Failed to retrieve {} from the Google Compute Engine "
            "metadata service. Compute Engine Metadata server unavailable due to {}".format(
                url, failure_reason
            )
        )

    content = _helpers.from_bytes(response.data)

    if response.status == http_client.NOT_FOUND and return_none_for_not_found_error:
        return None

    if response.status == http_client.OK:
        if (
            _helpers.parse_content_type(response.headers["content-type"])
            == "application/json"
        ):
            try:
                return json.loads(content)
            except ValueError as caught_exc:
                new_exc = exceptions.TransportError(
                    "Received invalid JSON from the Google Compute Engine "
                    "metadata service: {:.20}".format(content)
                )
                raise new_exc from caught_exc
        else:
            return content

    # Any other status is reported as a transport error.
    raise exceptions.TransportError(
        "Failed to retrieve {} from the Google Compute Engine "
        "metadata service. Status: {} Response:\n{}".format(
            url, response.status, response.data
        ),
        response,
    )

365 

366 

def get_project_id(request):
    """Fetches the Google Cloud project ID from the metadata server.

    Args:
        request (google.auth.transport.Request): A callable used to make
            HTTP requests.

    Returns:
        str: The project ID

    Raises:
        google.auth.exceptions.TransportError: if an error occurred while
            retrieving metadata.
    """
    project_id_path = "project/project-id"
    return get(request, project_id_path)

382 

383 

def get_universe_domain(request):
    """Fetches the universe domain value from the metadata server.

    Args:
        request (google.auth.transport.Request): A callable used to make
            HTTP requests.

    Returns:
        str: The universe domain value. If the universe domain endpoint is
            not found (or yields an empty value), the default value
            ``googleapis.com`` is returned.

    Raises:
        google.auth.exceptions.TransportError: if an error other than
            404 occurs while retrieving metadata.
    """
    domain = get(
        request, "universe/universe-domain", return_none_for_not_found_error=True
    )
    # ``None`` (404) and empty values both fall back to the default domain.
    return domain or "googleapis.com"

405 

406 

def get_service_account_info(request, service_account="default"):
    """Fetches information about a service account from the metadata server.

    Args:
        request (google.auth.transport.Request): A callable used to make
            HTTP requests.
        service_account (str): The string 'default' or a service account email
            address. This determines which service account to acquire
            information for.

    Returns:
        Mapping: The service account's information, for example::

            {
                'email': '...',
                'scopes': ['scope', ...],
                'aliases': ['default', '...']
            }

    Raises:
        google.auth.exceptions.TransportError: if an error occurred while
            retrieving metadata.
    """
    # See https://cloud.google.com/compute/docs/metadata#aggcontents
    # for more on the use of 'recursive'.
    recursive_query = {"recursive": "true"}
    info_path = "instance/service-accounts/{0}/".format(service_account)
    return get(request, info_path, params=recursive_query)

434 

435 

def get_service_account_token(request, service_account="default", scopes=None):
    """Fetches an OAuth 2.0 access token for a service account.

    Args:
        request (google.auth.transport.Request): A callable used to make
            HTTP requests.
        service_account (str): The string 'default' or a service account email
            address. This determines which service account to acquire
            an access token for.
        scopes (Optional[Union[str, List[str]]]): Optional string or list of
            strings with auth scopes. A list is sent as a comma-separated
            string.

    Returns:
        Tuple[str, datetime]: The access token and its expiration.

    Raises:
        google.auth.exceptions.TransportError: if an error occurred while
            retrieving metadata.
    """
    from google.auth import _agent_identity_utils

    params = {}
    if scopes:
        params["scopes"] = scopes if isinstance(scopes, str) else ",".join(scopes)

    # When an agent identity certificate is present and a bound token should
    # be requested for it, pass the certificate fingerprint along.
    cert = _agent_identity_utils.get_and_parse_agent_identity_certificate()
    if cert and _agent_identity_utils.should_request_bound_token(cert):
        params[
            "bindCertificateFingerprint"
        ] = _agent_identity_utils.calculate_certificate_fingerprint(cert)

    metrics_header = {
        metrics.API_CLIENT_HEADER: metrics.token_request_access_token_mds()
    }

    token_path = "instance/service-accounts/{0}/token".format(service_account)
    token_json = get(request, token_path, params=params, headers=metrics_header)

    # The expiry is computed relative to the current UTC time.
    expiry = _helpers.utcnow() + datetime.timedelta(seconds=token_json["expires_in"])
    return token_json["access_token"], expiry