# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Helpers for interacting with the job REST APIs from the client.

For queries, there are three cases to consider:

1. jobs.insert: This always returns a job resource.
2. jobs.query, jobCreationMode=JOB_CREATION_REQUIRED:
   This can sometimes return the results inline, but always includes a job ID.
3. jobs.query, jobCreationMode=JOB_CREATION_OPTIONAL:
   This sometimes doesn't create a job at all, instead returning the results.
   For better debugging, an auto-generated query ID is included in the
   response.

Client.query() calls either (1) or (2), depending on what the user provides
for the api_method parameter. query() always returns a QueryJob object, which
can retry the query when the query job fails for a retriable reason.

Client.query_and_wait() calls (3). This returns a RowIterator that may wrap
local results from the response or may wrap a query job containing multiple
pages of results. Even though query_and_wait() waits for the job to complete,
we still need a separate job_retry object, because there are different
predicates for when it is safe to generate a new query ID.
"""

import copy
import functools
import uuid
import textwrap
from typing import Any, Dict, Optional, TYPE_CHECKING, Union
import warnings

import google.api_core.exceptions as core_exceptions
from google.api_core import retry as retries

from google.cloud.bigquery import job
import google.cloud.bigquery.query
from google.cloud.bigquery import table
import google.cloud.bigquery.retry
from google.cloud.bigquery.retry import POLLING_DEFAULT_VALUE

# Avoid circular imports
if TYPE_CHECKING:  # pragma: NO COVER
    from google.cloud.bigquery.client import Client


# The purpose of _TIMEOUT_BUFFER_MILLIS is to allow the server-side timeout to
# happen before the client-side timeout. This is not strictly necessary, as
# the client retries client-side timeouts, but the hope is that by making the
# server-side timeout slightly shorter, we can save the server from some
# unnecessary processing time.
#
# 250 milliseconds is chosen arbitrarily, though it should be about the right
# order of magnitude for network latency and switching delays. It is about the
# amount of time for light to circumnavigate the world twice.
_TIMEOUT_BUFFER_MILLIS = 250
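
# For example, a client-side timeout of 2.0 seconds becomes a server-side
# "timeoutMs" of max(0, 2000 - 250) == 1750 in _to_query_request() below.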


def make_job_id(job_id: Optional[str] = None, prefix: Optional[str] = None) -> str:
    """Construct an ID for a new job.

    Args:
        job_id: the user-provided job ID.
        prefix: the user-provided prefix for a job ID.

    Returns:
        str: A job ID
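
    Examples:
        An illustrative doctest sketch; UUID-based outputs are random, so
        only stable properties are checked:

        >>> make_job_id("my-job-id")
        'my-job-id'
        >>> make_job_id(prefix="job-").startswith("job-")
        True
        >>> len(make_job_id()) == 36  # a bare str(uuid.uuid4())
        True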

    """
    if job_id is not None:
        return job_id
    elif prefix is not None:
        return str(prefix) + str(uuid.uuid4())
    else:
        return str(uuid.uuid4())


def job_config_with_defaults(
    job_config: Optional[job.QueryJobConfig],
    default_job_config: Optional[job.QueryJobConfig],
) -> Optional[job.QueryJobConfig]:
    """Create a copy of `job_config`, replacing unset values with those from
    `default_job_config`.
    """
    if job_config is None:
        return default_job_config

    if default_job_config is None:
        return job_config

    # Both job_config and default_job_config are not None, so make a copy of
    # job_config merged with default_job_config. Anything already explicitly
    # set on job_config should not be replaced.
    return job_config._fill_from_default(default_job_config)
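
# A hedged illustration of the merge semantics above: values explicitly set on
# `job_config` win, and unset values are filled in from `default_job_config`.
#
#   default = job.QueryJobConfig(use_query_cache=False, labels={"team": "a"})
#   override = job.QueryJobConfig(labels={"team": "b"})
#   merged = job_config_with_defaults(override, default)
#   # merged.labels == {"team": "b"}; merged.use_query_cache is False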


def query_jobs_insert(
    client: "Client",
    query: str,
    job_config: Optional[job.QueryJobConfig],
    job_id: Optional[str],
    job_id_prefix: Optional[str],
    location: Optional[str],
    project: str,
    retry: Optional[retries.Retry],
    timeout: Optional[float],
    job_retry: Optional[retries.Retry],
) -> job.QueryJob:
    """Initiate a query using jobs.insert.

    See: https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/insert
    """
    job_id_given = job_id is not None
    job_id_save = job_id
    job_config_save = job_config

    def do_query():
        # Make a copy now, so that the original doesn't get changed by the
        # process below, and to facilitate retry.
        job_config = copy.deepcopy(job_config_save)

        job_id = make_job_id(job_id_save, job_id_prefix)
        job_ref = job._JobReference(job_id, project=project, location=location)
        query_job = job.QueryJob(job_ref, query, client=client, job_config=job_config)

        try:
            query_job._begin(retry=retry, timeout=timeout)
        except core_exceptions.Conflict as create_exc:
            # The thought is that if someone is providing their own job IDs
            # and they get their job ID generation wrong, this could end up
            # returning results for the wrong query. We thus only try to
            # recover if the job ID was not given.
            if job_id_given:
                raise create_exc

            try:
                # Sometimes we get a 404 after a Conflict. In this case, we
                # have pretty high confidence that by retrying the 404, we'll
                # (hopefully) eventually recover the job.
                # https://github.com/googleapis/python-bigquery/issues/2134
                #
                # Allow users who want to completely disable retries to
                # continue to do so by setting retry to None.
                get_job_retry = retry
                if retry is not None:
                    # TODO(tswast): Amend the user's retry object to allow
                    # retrying 404s when there's a public way to do so.
                    # https://github.com/googleapis/python-api-core/issues/796
                    get_job_retry = (
                        google.cloud.bigquery.retry._DEFAULT_GET_JOB_CONFLICT_RETRY
                    )

                query_job = client.get_job(
                    job_id,
                    project=project,
                    location=location,
                    retry=get_job_retry,
                    timeout=google.cloud.bigquery.retry.DEFAULT_GET_JOB_TIMEOUT,
                )
            except core_exceptions.GoogleAPIError:  # (includes RetryError)
                raise
            else:
                return query_job
        else:
            return query_job

    # Allow users who want to completely disable retries to
    # continue to do so by setting job_retry to None.
    if job_retry is not None:
        do_query = google.cloud.bigquery.retry._DEFAULT_QUERY_JOB_INSERT_RETRY(do_query)

    future = do_query()

    # The future might be in a failed state now, but if it's unrecoverable,
    # we'll find out when we ask for its result, at which point we may retry.
    if not job_id_given:
        future._retry_do_query = do_query  # in case we have to retry later
        future._job_retry = job_retry

    return future
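
# A hedged usage sketch (assumes an authenticated Client named `client`): with
# only a job_id_prefix, the generated ID stays retryable, and a Conflict on an
# ID collision can be recovered via get_job as above.
#
#   query_job = query_jobs_insert(
#       client,
#       "SELECT 1",
#       job_config=None,
#       job_id=None,
#       job_id_prefix="helper-",
#       location="US",
#       project=client.project,
#       retry=google.cloud.bigquery.retry.DEFAULT_RETRY,
#       timeout=None,
#       job_retry=google.cloud.bigquery.retry.DEFAULT_JOB_RETRY,
#   )
#   rows = query_job.result()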


def _validate_job_config(request_body: Dict[str, Any], invalid_key: str):
    """Catch common mistakes, such as passing in a *JobConfig object of the
    wrong type.
    """
    if invalid_key in request_body:
        raise ValueError(f"got unexpected key {repr(invalid_key)} in job_config")


def validate_job_retry(job_id: Optional[str], job_retry: Optional[retries.Retry]):
    """Catch common mistakes, such as setting a job_id and a job_retry at the
    same time.
    """
    if job_id is not None and job_retry is not None:
        # TODO(tswast): To avoid breaking changes but still allow a default
        # query job retry, we currently only raise if they explicitly set a
        # job_retry other than the default. In a future version, we may want
        # to avoid this check for DEFAULT_JOB_RETRY and always raise.
        if job_retry is not google.cloud.bigquery.retry.DEFAULT_JOB_RETRY:
            raise TypeError(
                textwrap.dedent(
                    """
                    `job_retry` was provided, but the returned job is
                    not retryable, because a custom `job_id` was
                    provided. To customize the job ID and allow for job
                    retries, set job_id_prefix instead.
                    """
                ).strip()
            )
        else:
            warnings.warn(
                textwrap.dedent(
                    """
                    job_retry must be explicitly set to None if job_id is set.
                    BigQuery cannot retry a failed job by using the exact
                    same ID. Setting job_id without explicitly disabling
                    job_retry will raise an error in the future. To avoid this
                    warning, either use job_id_prefix instead (preferred) or
                    set job_retry=None.
                    """
                ).strip(),
                category=FutureWarning,
                # user code -> client.query / client.query_and_wait -> validate_job_retry
                stacklevel=3,
            )
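
# A hedged illustration of the three outcomes above (`my_custom_retry` is a
# hypothetical user-supplied Retry object):
#
#   validate_job_retry("my-id", my_custom_retry)  # raises TypeError
#   validate_job_retry("my-id", google.cloud.bigquery.retry.DEFAULT_JOB_RETRY)
#   # ...emits a FutureWarning.
#   validate_job_retry("my-id", None)  # OK: job retries explicitly disabled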


def _to_query_request(
    job_config: Optional[job.QueryJobConfig] = None,
    *,
    query: str,
    location: Optional[str] = None,
    timeout: Optional[float] = None,
) -> Dict[str, Any]:
    """Transform from Job resource to QueryRequest resource.

    Most of the keys in job.configuration.query are in common with
    QueryRequest. If any configuration property is set that is not available
    in jobs.query, it will result in a server-side error.
    """
    request_body = copy.copy(job_config.to_api_repr()) if job_config else {}

    _validate_job_config(request_body, job.CopyJob._JOB_TYPE)
    _validate_job_config(request_body, job.ExtractJob._JOB_TYPE)
    _validate_job_config(request_body, job.LoadJob._JOB_TYPE)

    # Move query.* properties to top-level.
    query_config_resource = request_body.pop("query", {})
    request_body.update(query_config_resource)

    # Default to standard SQL.
    request_body.setdefault("useLegacySql", False)

    # Since jobs.query can return results, ensure we use the lossless
    # timestamp format.
    # See: https://github.com/googleapis/python-bigquery/issues/395
    request_body.setdefault("formatOptions", {})
    request_body["formatOptions"]["useInt64Timestamp"] = True  # type: ignore

    if timeout is not None:
        # Subtract a buffer for context switching, network latency, etc.
        request_body["timeoutMs"] = max(0, int(1000 * timeout) - _TIMEOUT_BUFFER_MILLIS)

    if location is not None:
        request_body["location"] = location

    request_body["query"] = query

    return request_body
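
# A hedged worked example of the transformation above, with no job_config:
#
#   _to_query_request(query="SELECT 1", location="US")
#   # -> {
#   #     "useLegacySql": False,
#   #     "formatOptions": {"useInt64Timestamp": True},
#   #     "location": "US",
#   #     "query": "SELECT 1",
#   # }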


def _to_query_job(
    client: "Client",
    query: str,
    request_config: Optional[job.QueryJobConfig],
    query_response: Dict[str, Any],
) -> job.QueryJob:
    job_ref_resource = query_response["jobReference"]
    job_ref = job._JobReference._from_api_repr(job_ref_resource)
    query_job = job.QueryJob(job_ref, query, client=client)
    query_job._properties.setdefault("configuration", {})

    # Not all relevant properties are in the jobs.query response. Populate
    # some expected properties based on the job configuration.
    if request_config is not None:
        query_job._properties["configuration"].update(request_config.to_api_repr())

    query_job._properties["configuration"].setdefault("query", {})
    query_job._properties["configuration"]["query"]["query"] = query
    query_job._properties["configuration"]["query"].setdefault("useLegacySql", False)

    query_job._properties.setdefault("statistics", {})
    query_job._properties["statistics"].setdefault("query", {})
    query_job._properties["statistics"]["query"]["cacheHit"] = query_response.get(
        "cacheHit"
    )
    query_job._properties["statistics"]["query"]["schema"] = query_response.get(
        "schema"
    )
    query_job._properties["statistics"]["query"][
        "totalBytesProcessed"
    ] = query_response.get("totalBytesProcessed")

    # Set errors if any were encountered.
    query_job._properties.setdefault("status", {})
    if "errors" in query_response:
        # Set errors but not errorResult. If there was an error that failed
        # the job, jobs.query behaves like jobs.getQueryResults and returns a
        # non-success HTTP status code.
        errors = query_response["errors"]
        query_job._properties["status"]["errors"] = errors

    # Avoid an extra call to `getQueryResults` if the query has finished.
    job_complete = query_response.get("jobComplete")
    if job_complete:
        query_job._query_results = google.cloud.bigquery.query._QueryResults(
            query_response
        )

    # We want job.result() to refresh the job state, so the conversion is
    # always "PENDING", even if the job is finished.
    query_job._properties["status"]["state"] = "PENDING"

    return query_job
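
# A hedged sketch of the mapping above: given a minimal jobs.query response
# (field values are illustrative),
#
#   response = {
#       "jobReference": {"projectId": "p", "jobId": "j", "location": "US"},
#       "jobComplete": True,
#       "cacheHit": True,
#   }
#   query_job = _to_query_job(client, "SELECT 1", None, response)
#
# the returned job caches its _query_results (skipping one getQueryResults
# call), while _properties["status"]["state"] reads "PENDING" so that
# result() still refreshes the job state.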


def _to_query_path(project: str) -> str:
    return f"/projects/{project}/queries"
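
# For example, _to_query_path("my-project") returns
# "/projects/my-project/queries".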


def query_jobs_query(
    client: "Client",
    query: str,
    job_config: Optional[job.QueryJobConfig],
    location: Optional[str],
    project: str,
    retry: retries.Retry,
    timeout: Optional[float],
    job_retry: Optional[retries.Retry],
) -> job.QueryJob:
    """Initiate a query using jobs.query with jobCreationMode=JOB_CREATION_REQUIRED.

    See: https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/query
    """
    path = _to_query_path(project)
    request_body = _to_query_request(
        query=query, job_config=job_config, location=location, timeout=timeout
    )

    def do_query():
        request_body["requestId"] = make_job_id()
        span_attributes = {"path": path}
        api_response = client._call_api(
            retry,
            span_name="BigQuery.query",
            span_attributes=span_attributes,
            method="POST",
            path=path,
            data=request_body,
            timeout=timeout,
        )
        return _to_query_job(client, query, job_config, api_response)

    future = do_query()

    # The future might be in a failed state now, but if it's unrecoverable,
    # we'll find out when we ask for its result, at which point we may retry.
    future._retry_do_query = do_query  # in case we have to retry later
    future._job_retry = job_retry

    return future
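
# Note on the retry behavior above: do_query() generates a fresh "requestId"
# for every attempt, so a job-level retry issues a brand-new jobs.query
# request rather than replaying the previous one.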


def query_and_wait(
    client: "Client",
    query: str,
    *,
    job_config: Optional[job.QueryJobConfig],
    location: Optional[str],
    project: str,
    api_timeout: Optional[float] = None,
    wait_timeout: Optional[Union[float, object]] = POLLING_DEFAULT_VALUE,
    retry: Optional[retries.Retry],
    job_retry: Optional[retries.Retry],
    page_size: Optional[int] = None,
    max_results: Optional[int] = None,
) -> table.RowIterator:
    """Run the query, wait for it to finish, and return the results.

    Args:
        client:
            BigQuery client to make API calls.
        query (str):
            SQL query to be executed. Defaults to the standard SQL
            dialect. Use the ``job_config`` parameter to change dialects.
        job_config (Optional[google.cloud.bigquery.job.QueryJobConfig]):
            Extra configuration options for the job.
            To override any options that were previously set in
            the ``default_query_job_config`` given to the
            ``Client`` constructor, manually set those options to ``None``,
            or whatever value is preferred.
        location (Optional[str]):
            Location where to run the job. Must match the location of the
            table used in the query as well as the destination table.
        project (Optional[str]):
            Project ID of the project where to run the job. Defaults
            to the client's project.
        api_timeout (Optional[float]):
            The number of seconds to wait for the underlying HTTP transport
            before using ``retry``.
        wait_timeout (Optional[Union[float, object]]):
            The number of seconds to wait for the query to finish. If the
            query doesn't finish before this timeout, the client attempts
            to cancel the query. If unset, the underlying ``Client.get_job()``
            API call has a timeout, but we still wait indefinitely for the
            job to finish.
        retry (Optional[google.api_core.retry.Retry]):
            How to retry the RPC. This only applies to making RPC
            calls. It isn't used to retry failed jobs. This has
            a reasonable default that should only be overridden
            with care.
        job_retry (Optional[google.api_core.retry.Retry]):
            How to retry failed jobs. The default retries
            rate-limit-exceeded errors. Passing ``None`` disables
            job retry. Not all jobs can be retried.
        page_size (Optional[int]):
            The maximum number of rows in each page of results from this
            request. Non-positive values are ignored.
        max_results (Optional[int]):
            The maximum total number of rows from this request.

    Returns:
        google.cloud.bigquery.table.RowIterator:
            Iterator of row data
            :class:`~google.cloud.bigquery.table.Row`-s. During each
            page, the iterator will have the ``total_rows`` attribute
            set, which counts the total number of rows **in the result
            set** (this is distinct from the total number of rows in the
            current page: ``iterator.page.num_items``).

            If the query is a special query that produces no results, e.g.
            a DDL query, an ``_EmptyRowIterator`` instance is returned.

    Raises:
        TypeError:
            If ``job_config`` is not an instance of
            :class:`~google.cloud.bigquery.job.QueryJobConfig`.
    """
    request_body = _to_query_request(
        query=query, job_config=job_config, location=location, timeout=api_timeout
    )

    # Some API parameters aren't supported by the jobs.query API. In these
    # cases, fall back to a jobs.insert call.
    if not _supported_by_jobs_query(request_body):
        return _wait_or_cancel(
            query_jobs_insert(
                client=client,
                query=query,
                job_id=None,
                job_id_prefix=None,
                job_config=job_config,
                location=location,
                project=project,
                retry=retry,
                timeout=api_timeout,
                job_retry=job_retry,
            ),
            api_timeout=api_timeout,
            wait_timeout=wait_timeout,
            retry=retry,
            page_size=page_size,
            max_results=max_results,
        )

    path = _to_query_path(project)

    if page_size is not None and max_results is not None:
        request_body["maxResults"] = min(page_size, max_results)
    elif page_size is not None or max_results is not None:
        request_body["maxResults"] = page_size or max_results
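    # For example, page_size=100 with max_results=10 sends maxResults=10,
    # while page_size=100 alone sends maxResults=100.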

    if client.default_job_creation_mode:
        request_body["jobCreationMode"] = client.default_job_creation_mode

    def do_query():
        request_body["requestId"] = make_job_id()
        span_attributes = {"path": path}

        # For easier testing, handle the retries ourselves.
        if retry is not None:
            response = retry(client._call_api)(
                retry=None,  # We're calling the retry decorator ourselves.
                span_name="BigQuery.query",
                span_attributes=span_attributes,
                method="POST",
                path=path,
                data=request_body,
                timeout=api_timeout,
            )
        else:
            response = client._call_api(
                retry=None,
                span_name="BigQuery.query",
                span_attributes=span_attributes,
                method="POST",
                path=path,
                data=request_body,
                timeout=api_timeout,
            )

        # Even if we run with JOB_CREATION_OPTIONAL, if there are more pages
        # to fetch, there will be a job ID for jobs.getQueryResults.
        query_results = google.cloud.bigquery.query._QueryResults.from_api_repr(
            response
        )
        page_token = query_results.page_token
        more_pages = page_token is not None

        if more_pages or not query_results.complete:
            # TODO(swast): Avoid a call to jobs.get in some cases (few
            # remaining pages) by waiting for the query to finish and calling
            # client._list_rows_from_query_results directly. Need to update
            # RowIterator to fetch destination table via the job ID if needed.
            return _wait_or_cancel(
                _to_query_job(client, query, job_config, response),
                api_timeout=api_timeout,
                wait_timeout=wait_timeout,
                retry=retry,
                page_size=page_size,
                max_results=max_results,
            )

        return table.RowIterator(
            client=client,
            api_request=functools.partial(client._call_api, retry, timeout=api_timeout),
            path=None,
            schema=query_results.schema,
            max_results=max_results,
            page_size=page_size,
            total_rows=query_results.total_rows,
            first_page_response=response,
            location=query_results.location,
            job_id=query_results.job_id,
            query_id=query_results.query_id,
            project=query_results.project,
            num_dml_affected_rows=query_results.num_dml_affected_rows,
            query=query,
            total_bytes_processed=query_results.total_bytes_processed,
        )

    if job_retry is not None:
        return job_retry(do_query)()
    else:
        return do_query()
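
# A hedged usage sketch (assumes an authenticated Client named `client`):
#
#   rows = query_and_wait(
#       client,
#       "SELECT 1 AS x",
#       job_config=None,
#       location="US",
#       project=client.project,
#       retry=google.cloud.bigquery.retry.DEFAULT_RETRY,
#       job_retry=google.cloud.bigquery.retry.DEFAULT_JOB_RETRY,
#   )
#   for row in rows:
#       print(row["x"])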


def _supported_by_jobs_query(request_body: Dict[str, Any]) -> bool:
    """True if jobs.query can be used. False if jobs.insert is needed."""
    request_keys = frozenset(request_body.keys())

    # Per issue https://github.com/googleapis/python-bigquery/issues/1867, use
    # an allowlist here instead of a denylist, because the backend API allows
    # unsupported parameters through without any warning or failure. Keep this
    # set in sync with the fields in QueryRequest:
    # https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/query#QueryRequest
    keys_allowlist = {
        "kind",
        "query",
        "maxResults",
        "defaultDataset",
        "timeoutMs",
        "dryRun",
        "preserveNulls",
        "useQueryCache",
        "useLegacySql",
        "parameterMode",
        "queryParameters",
        "location",
        "formatOptions",
        "connectionProperties",
        "labels",
        "maximumBytesBilled",
        "requestId",
        "createSession",
        "writeIncrementalResults",
    }

    unsupported_keys = request_keys - keys_allowlist
    return len(unsupported_keys) == 0
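
# For example, a request body of {"query": "SELECT 1", "useLegacySql": False}
# is fully covered by the allowlist, so jobs.query can be used. Adding a key
# such as "destinationTable" (only settable via jobs.insert) would make this
# return False, and query_and_wait() would fall back to jobs.insert.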


def _wait_or_cancel(
    job: job.QueryJob,
    api_timeout: Optional[float],
    wait_timeout: Optional[Union[object, float]],
    retry: Optional[retries.Retry],
    page_size: Optional[int],
    max_results: Optional[int],
) -> table.RowIterator:
    """Wait for a job to complete and return the results.

    If we can't return the results within the ``wait_timeout``, try to cancel
    the job.
    """
    try:
        return job.result(
            page_size=page_size,
            max_results=max_results,
            retry=retry,
            timeout=wait_timeout,
        )
    except Exception:
        # Attempt to cancel the job since we can't return the results.
        try:
            job.cancel(retry=retry, timeout=api_timeout)
        except Exception:
            # Don't eat the original exception if cancel fails.
            pass
        raise