# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Helpers for interacting with the job REST APIs from the client.

For queries, there are three cases to consider:

1. jobs.insert: This always returns a job resource.
2. jobs.query, jobCreationMode=JOB_CREATION_REQUIRED:
   This sometimes can return the results inline, but always includes a job ID.
3. jobs.query, jobCreationMode=JOB_CREATION_OPTIONAL:
   This sometimes doesn't create a job at all, instead returning the results.
   For better debugging, an auto-generated query ID is included in the
   response.

Client.query() calls either (1) or (2), depending on what the user provides
for the api_method parameter. query() always returns a QueryJob object, which
can retry the query when the query job fails for a retriable reason.

Client.query_and_wait() calls (3). This returns a RowIterator that may wrap
local results from the response or may wrap a query job containing multiple
pages of results. Even though query_and_wait() waits for the job to complete,
we still need a separate job_retry object because there are different
predicates where it is safe to generate a new query ID.
"""

from __future__ import annotations

import copy
import dataclasses
import datetime
import functools
import textwrap
import uuid
from typing import Any, Callable, Dict, Optional, TYPE_CHECKING, Union
import warnings

import google.api_core.exceptions as core_exceptions
from google.api_core import retry as retries

from google.cloud.bigquery import job
import google.cloud.bigquery.job.query
import google.cloud.bigquery.query
from google.cloud.bigquery import table
import google.cloud.bigquery.retry
from google.cloud.bigquery.retry import POLLING_DEFAULT_VALUE

# Avoid circular imports.
if TYPE_CHECKING:  # pragma: NO COVER
    from google.cloud.bigquery.client import Client


# The purpose of _TIMEOUT_BUFFER_MILLIS is to allow the server-side timeout to
# happen before the client-side timeout. This is not strictly necessary, as the
# client retries client-side timeouts, but the hope is that by making the
# server-side timeout slightly shorter, we can save the server some unnecessary
# processing time.
#
# 250 milliseconds is chosen arbitrarily, though it should be about the right
# order of magnitude for network latency and switching delays. It is about the
# amount of time it takes light to circumnavigate the world twice.
_TIMEOUT_BUFFER_MILLIS = 250
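# Worked example: a client-side ``timeout`` of 10.0 seconds becomes a
# server-side ``timeoutMs`` of int(1000 * 10.0) - 250 = 9750 (see
# _to_query_request below).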



def make_job_id(job_id: Optional[str] = None, prefix: Optional[str] = None) -> str:
    """Construct an ID for a new job.

    Args:
        job_id: the user-provided job ID.
        prefix: the user-provided prefix for a job ID.

    Returns:
        str: A job ID.
    """
    if job_id is not None:
        return job_id
    elif prefix is not None:
        return str(prefix) + str(uuid.uuid4())
    else:
        return str(uuid.uuid4())
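
# Example (illustrative only): an explicit job_id wins, then a prefix, then a
# random UUID.
#
#   make_job_id("my-job")        # -> "my-job"
#   make_job_id(prefix="load-")  # -> "load-" followed by a random UUID
#   make_job_id()                # -> a random UUID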


def job_config_with_defaults(
    job_config: Optional[job.QueryJobConfig],
    default_job_config: Optional[job.QueryJobConfig],
) -> Optional[job.QueryJobConfig]:
    """Create a copy of `job_config`, replacing unset values with those from
    `default_job_config`.
    """
    if job_config is None:
        return default_job_config

    if default_job_config is None:
        return job_config

    # Both job_config and default_job_config are not None, so make a copy of
    # job_config merged with default_job_config. Anything already explicitly
    # set on job_config should not be replaced.
    return job_config._fill_from_default(default_job_config)
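
# Example (illustrative sketch, not part of the API): values set explicitly
# on the per-query config win; unset values fall back to the default config.
#
#   default = job.QueryJobConfig(maximum_bytes_billed=10**9, use_query_cache=False)
#   per_query = job.QueryJobConfig(maximum_bytes_billed=10**8)
#   merged = job_config_with_defaults(per_query, default)
#   # merged.maximum_bytes_billed == 10**8; merged.use_query_cache is False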


def query_jobs_insert(
    client: "Client",
    query: str,
    job_config: Optional[job.QueryJobConfig],
    job_id: Optional[str],
    job_id_prefix: Optional[str],
    location: Optional[str],
    project: str,
    retry: Optional[retries.Retry],
    timeout: Optional[float],
    job_retry: Optional[retries.Retry],
    *,
    callback: Callable = lambda _: None,
) -> job.QueryJob:
    """Initiate a query using jobs.insert.

    See: https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/insert

    Args:
        callback (Callable):
            A callback function used by bigframes to report query progress.
    """
    job_id_given = job_id is not None
    job_id_save = job_id
    job_config_save = job_config
    query_sent_factory = QuerySentEventFactory()

    def do_query():
        # Make a copy now, so that the original doesn't get changed by the
        # process below, and to facilitate retry.
        job_config = copy.deepcopy(job_config_save)

        job_id = make_job_id(job_id_save, job_id_prefix)
        job_ref = job._JobReference(job_id, project=project, location=location)
        query_job = job.QueryJob(job_ref, query, client=client, job_config=job_config)

        try:
            query_job._begin(retry=retry, timeout=timeout)
            if job_config is not None and not job_config.dry_run:
                callback(
                    query_sent_factory(
                        query=query,
                        billing_project=query_job.project,
                        location=query_job.location,
                        job_id=query_job.job_id,
                        request_id=None,
                    )
                )
        except core_exceptions.Conflict as create_exc:
            # The thought is that if someone is providing their own job IDs
            # and gets the job ID generation wrong, this could end up
            # returning results for the wrong query. We thus only try to
            # recover if the job ID was not given.
            if job_id_given:
                raise create_exc

            try:
                # Sometimes we get a 404 after a Conflict. In this case, we
                # have pretty high confidence that by retrying the 404, we'll
                # (hopefully) eventually recover the job.
                # https://github.com/googleapis/python-bigquery/issues/2134
                #
                # Allow users who want to completely disable retries to
                # continue to do so by setting retry to None.
                get_job_retry = retry
                if retry is not None:
                    # TODO(tswast): Amend the user's retry object to allow
                    # retrying 404s when there's a public way to do so.
                    # https://github.com/googleapis/python-api-core/issues/796
                    get_job_retry = (
                        google.cloud.bigquery.retry._DEFAULT_GET_JOB_CONFLICT_RETRY
                    )

                query_job = client.get_job(
                    job_id,
                    project=project,
                    location=location,
                    retry=get_job_retry,
                    timeout=google.cloud.bigquery.retry.DEFAULT_GET_JOB_TIMEOUT,
                )
            except core_exceptions.GoogleAPIError:  # (includes RetryError)
                raise
            else:
                return query_job
        else:
            return query_job

    # Allow users who want to completely disable retries to
    # continue to do so by setting job_retry to None.
    if job_retry is not None:
        do_query = google.cloud.bigquery.retry._DEFAULT_QUERY_JOB_INSERT_RETRY(do_query)

    future = do_query()

    # The future might be in a failed state now, but if it's
    # unrecoverable, we'll find out when we ask for its result, at which
    # point we may retry.
    if not job_id_given:
        future._retry_do_query = do_query  # in case we have to retry later
        future._job_retry = job_retry

    return future


def _validate_job_config(request_body: Dict[str, Any], invalid_key: str):
    """Catch common mistakes, such as passing in a *JobConfig object of the
    wrong type.
    """
    if invalid_key in request_body:
        raise ValueError(f"got unexpected key {repr(invalid_key)} in job_config")


def validate_job_retry(job_id: Optional[str], job_retry: Optional[retries.Retry]):
    """Catch common mistakes, such as setting a job_id and job_retry at the
    same time.
    """
    if job_id is not None and job_retry is not None:
        # TODO(tswast): To avoid breaking changes but still allow a default
        # query job retry, we currently only raise if they explicitly set a
        # job_retry other than the default. In a future version, we may want
        # to avoid this check for DEFAULT_JOB_RETRY and always raise.
        if job_retry is not google.cloud.bigquery.retry.DEFAULT_JOB_RETRY:
            raise TypeError(
                textwrap.dedent(
                    """
                    `job_retry` was provided, but the returned job is
                    not retryable, because a custom `job_id` was
                    provided. To customize the job ID and allow for job
                    retries, set job_id_prefix instead.
                    """
                ).strip()
            )
        else:
            warnings.warn(
                textwrap.dedent(
                    """
                    job_retry must be explicitly set to None if job_id is set.
                    BigQuery cannot retry a failed job by using the exact
                    same ID. Setting job_id without explicitly disabling
                    job_retry will raise an error in the future. To avoid this
                    warning, either use job_id_prefix instead (preferred) or
                    set job_retry=None.
                    """
                ).strip(),
                category=FutureWarning,
                # user code -> client.query / client.query_and_wait -> validate_job_retry
                stacklevel=3,
            )
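
# Example (illustrative; ``my_custom_retry`` is a hypothetical Retry object):
#
#   validate_job_retry("my-job-id", my_custom_retry)  # raises TypeError
#   validate_job_retry(
#       "my-job-id", google.cloud.bigquery.retry.DEFAULT_JOB_RETRY
#   )  # warns FutureWarning
#   validate_job_retry("my-job-id", None)  # OK: job retry explicitly disabled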


def _to_query_request(
    job_config: Optional[job.QueryJobConfig] = None,
    *,
    query: str,
    location: Optional[str] = None,
    timeout: Optional[float] = None,
) -> Dict[str, Any]:
    """Transform from Job resource to QueryRequest resource.

    Most of the keys in job.configuration.query are in common with
    QueryRequest. If any configuration property is set that is not available
    in jobs.query, it will result in a server-side error.
    """
    request_body = copy.copy(job_config.to_api_repr()) if job_config else {}

    _validate_job_config(request_body, job.CopyJob._JOB_TYPE)
    _validate_job_config(request_body, job.ExtractJob._JOB_TYPE)
    _validate_job_config(request_body, job.LoadJob._JOB_TYPE)

    # Move query.* properties to the top level.
    query_config_resource = request_body.pop("query", {})
    request_body.update(query_config_resource)

    # Default to standard SQL.
    request_body.setdefault("useLegacySql", False)

    # Since jobs.query can return results, ensure we use the lossless
    # timestamp format.
    # See: https://github.com/googleapis/python-bigquery/issues/395
    request_body.setdefault("formatOptions", {})
    request_body["formatOptions"]["useInt64Timestamp"] = True  # type: ignore

    if timeout is not None:
        # Subtract a buffer for context switching, network latency, etc.
        request_body["timeoutMs"] = max(0, int(1000 * timeout) - _TIMEOUT_BUFFER_MILLIS)

    if location is not None:
        request_body["location"] = location

    request_body["query"] = query

    return request_body
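
# Example (illustrative): a simple config becomes a flat QueryRequest body,
# with query.* keys hoisted to the top level and the timeout buffer applied.
#
#   _to_query_request(
#       job.QueryJobConfig(use_query_cache=False),
#       query="SELECT 1", location="US", timeout=10.0,
#   )
#   # -> {"useQueryCache": False, "useLegacySql": False,
#   #     "formatOptions": {"useInt64Timestamp": True},
#   #     "timeoutMs": 9750, "location": "US", "query": "SELECT 1"}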


def _to_query_job(
    client: "Client",
    query: str,
    request_config: Optional[job.QueryJobConfig],
    query_response: Dict[str, Any],
) -> job.QueryJob:
    job_ref_resource = query_response["jobReference"]
    job_ref = job._JobReference._from_api_repr(job_ref_resource)
    query_job = job.QueryJob(job_ref, query, client=client)
    query_job._properties.setdefault("configuration", {})

    # Not all relevant properties are in the jobs.query response. Populate
    # some expected properties based on the job configuration.
    if request_config is not None:
        query_job._properties["configuration"].update(request_config.to_api_repr())

    query_job._properties["configuration"].setdefault("query", {})
    query_job._properties["configuration"]["query"]["query"] = query
    query_job._properties["configuration"]["query"].setdefault("useLegacySql", False)

    query_job._properties.setdefault("statistics", {})
    query_job._properties["statistics"].setdefault("query", {})
    query_job._properties["statistics"]["query"]["cacheHit"] = query_response.get(
        "cacheHit"
    )
    query_job._properties["statistics"]["query"]["schema"] = query_response.get(
        "schema"
    )
    query_job._properties["statistics"]["query"][
        "totalBytesProcessed"
    ] = query_response.get("totalBytesProcessed")

    # Set errors if any were encountered.
    query_job._properties.setdefault("status", {})
    if "errors" in query_response:
        # Set errors but not errorResult. If there was an error that failed
        # the job, jobs.query behaves like jobs.getQueryResults and returns a
        # non-success HTTP status code.
        errors = query_response["errors"]
        query_job._properties["status"]["errors"] = errors

    # Avoid an extra call to `getQueryResults` if the query has finished.
    job_complete = query_response.get("jobComplete")
    if job_complete:
        query_job._query_results = google.cloud.bigquery.query._QueryResults(
            query_response
        )

    # We want job.result() to refresh the job state, so the conversion is
    # always "PENDING", even if the job is finished.
    query_job._properties["status"]["state"] = "PENDING"

    return query_job


def _to_query_path(project: str) -> str:
    return f"/projects/{project}/queries"


def query_jobs_query(
    client: "Client",
    query: str,
    job_config: Optional[job.QueryJobConfig],
    location: Optional[str],
    project: str,
    retry: retries.Retry,
    timeout: Optional[float],
    job_retry: Optional[retries.Retry],
) -> job.QueryJob:
    """Initiate a query using jobs.query with jobCreationMode=JOB_CREATION_REQUIRED.

    See: https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/query
    """
    path = _to_query_path(project)
    request_body = _to_query_request(
        query=query, job_config=job_config, location=location, timeout=timeout
    )

    def do_query():
        request_body["requestId"] = make_job_id()
        span_attributes = {"path": path}
        api_response = client._call_api(
            retry,
            span_name="BigQuery.query",
            span_attributes=span_attributes,
            method="POST",
            path=path,
            data=request_body,
            timeout=timeout,
        )
        return _to_query_job(client, query, job_config, api_response)

    future = do_query()

    # The future might be in a failed state now, but if it's
    # unrecoverable, we'll find out when we ask for its result, at which
    # point we may retry.
    future._retry_do_query = do_query  # in case we have to retry later
    future._job_retry = job_retry

    return future


def query_and_wait(
    client: "Client",
    query: str,
    *,
    job_config: Optional[job.QueryJobConfig],
    location: Optional[str],
    project: str,
    api_timeout: Optional[float] = None,
    wait_timeout: Optional[Union[float, object]] = POLLING_DEFAULT_VALUE,
    retry: Optional[retries.Retry],
    job_retry: Optional[retries.Retry],
    page_size: Optional[int] = None,
    max_results: Optional[int] = None,
    callback: Callable = lambda _: None,
) -> table.RowIterator:
    """Run the query, wait for it to finish, and return the results.

    Args:
        client:
            BigQuery client to make API calls.
        query (str):
            SQL query to be executed. Defaults to the standard SQL
            dialect. Use the ``job_config`` parameter to change dialects.
        job_config (Optional[google.cloud.bigquery.job.QueryJobConfig]):
            Extra configuration options for the job.
            To override any options that were previously set in
            the ``default_query_job_config`` given to the
            ``Client`` constructor, manually set those options to ``None``,
            or whatever value is preferred.
        location (Optional[str]):
            Location where to run the job. Must match the location of the
            table used in the query as well as the destination table.
        project (str):
            Project ID of the project in which to run the job.
        api_timeout (Optional[float]):
            The number of seconds to wait for the underlying HTTP transport
            before using ``retry``.
        wait_timeout (Optional[Union[float, object]]):
            The number of seconds to wait for the query to finish. If the
            query doesn't finish before this timeout, the client attempts
            to cancel the query. If unset, the underlying ``Client.get_job()``
            API call has a timeout, but we still wait indefinitely for the
            job to finish.
        retry (Optional[google.api_core.retry.Retry]):
            How to retry the RPC. This only applies to making RPC
            calls. It isn't used to retry failed jobs. This has
            a reasonable default that should only be overridden
            with care.
        job_retry (Optional[google.api_core.retry.Retry]):
            How to retry failed jobs. The default retries
            rate-limit-exceeded errors. Passing ``None`` disables
            job retry. Not all jobs can be retried.
        page_size (Optional[int]):
            The maximum number of rows in each page of results from this
            request. Non-positive values are ignored.
        max_results (Optional[int]):
            The maximum total number of rows from this request.
        callback (Callable):
            A callback function used by bigframes to report query progress.

    Returns:
        google.cloud.bigquery.table.RowIterator:
            Iterator of row data
            :class:`~google.cloud.bigquery.table.Row`-s. During each
            page, the iterator will have the ``total_rows`` attribute
            set, which counts the total number of rows **in the result
            set** (this is distinct from the total number of rows in the
            current page: ``iterator.page.num_items``).

            If the query is a special query that produces no results, e.g.
            a DDL query, an ``_EmptyRowIterator`` instance is returned.

    Raises:
        TypeError:
            If ``job_config`` is not an instance of the
            :class:`~google.cloud.bigquery.job.QueryJobConfig`
            class.
    """
    request_body = _to_query_request(
        query=query, job_config=job_config, location=location, timeout=api_timeout
    )

    # Some API parameters aren't supported by the jobs.query API. In these
    # cases, fall back to a jobs.insert call.
    if not _supported_by_jobs_query(request_body):
        return _wait_or_cancel(
            query_jobs_insert(
                client=client,
                query=query,
                job_id=None,
                job_id_prefix=None,
                job_config=job_config,
                location=location,
                project=project,
                retry=retry,
                timeout=api_timeout,
                job_retry=job_retry,
                callback=callback,
            ),
            api_timeout=api_timeout,
            wait_timeout=wait_timeout,
            retry=retry,
            page_size=page_size,
            max_results=max_results,
            callback=callback,
        )

    path = _to_query_path(project)

    # Use the smaller of page_size and max_results as the size of the first
    # page; if only one of them is set, use whichever is available.
    if page_size is not None and max_results is not None:
        request_body["maxResults"] = min(page_size, max_results)
    elif page_size is not None or max_results is not None:
        request_body["maxResults"] = page_size or max_results
    if client.default_job_creation_mode:
        request_body["jobCreationMode"] = client.default_job_creation_mode

    query_sent_factory = QuerySentEventFactory()

    def do_query():
        request_id = make_job_id()
        request_body["requestId"] = request_id
        span_attributes = {"path": path}

        if "dryRun" not in request_body:
            callback(
                query_sent_factory(
                    query=query,
                    billing_project=project,
                    location=location,
                    job_id=None,
                    request_id=request_id,
                )
            )

        # For easier testing, handle the retries ourselves.
        if retry is not None:
            response = retry(client._call_api)(
                retry=None,  # We're calling the retry decorator ourselves.
                span_name="BigQuery.query",
                span_attributes=span_attributes,
                method="POST",
                path=path,
                data=request_body,
                timeout=api_timeout,
            )
        else:
            response = client._call_api(
                retry=None,
                span_name="BigQuery.query",
                span_attributes=span_attributes,
                method="POST",
                path=path,
                data=request_body,
                timeout=api_timeout,
            )

        # Even if we run with JOB_CREATION_OPTIONAL, if there are more pages
        # to fetch, there will be a job ID for jobs.getQueryResults.
        query_results = google.cloud.bigquery.query._QueryResults.from_api_repr(
            response
        )
        page_token = query_results.page_token
        more_pages = page_token is not None

        if more_pages or not query_results.complete:
            # TODO(swast): Avoid a call to jobs.get in some cases (few
            # remaining pages) by waiting for the query to finish and calling
            # client._list_rows_from_query_results directly. Need to update
            # RowIterator to fetch the destination table via the job ID if
            # needed.
            return _wait_or_cancel(
                _to_query_job(client, query, job_config, response),
                api_timeout=api_timeout,
                wait_timeout=wait_timeout,
                retry=retry,
                page_size=page_size,
                max_results=max_results,
                callback=callback,
            )

        if "dryRun" not in request_body:
            callback(
                QueryFinishedEvent(
                    billing_project=project,
                    location=query_results.location,
                    query_id=query_results.query_id,
                    job_id=query_results.job_id,
                    total_rows=query_results.total_rows,
                    total_bytes_processed=query_results.total_bytes_processed,
                    slot_millis=query_results.slot_millis,
                    destination=None,
                    created=query_results.created,
                    started=query_results.started,
                    ended=query_results.ended,
                )
            )
        return table.RowIterator(
            client=client,
            api_request=functools.partial(client._call_api, retry, timeout=api_timeout),
            path=None,
            schema=query_results.schema,
            max_results=max_results,
            page_size=page_size,
            total_rows=query_results.total_rows,
            first_page_response=response,
            location=query_results.location,
            job_id=query_results.job_id,
            query_id=query_results.query_id,
            project=query_results.project,
            num_dml_affected_rows=query_results.num_dml_affected_rows,
            query=query,
            total_bytes_processed=query_results.total_bytes_processed,
            slot_millis=query_results.slot_millis,
            created=query_results.created,
            started=query_results.started,
            ended=query_results.ended,
        )

    if job_retry is not None:
        return job_retry(do_query)()
    else:
        return do_query()
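
# Example (illustrative): how query_and_wait() sizes the first page above.
#
#   page_size=100, max_results=10    -> request_body["maxResults"] = 10
#   page_size=100, max_results=None  -> request_body["maxResults"] = 100
#   page_size=None, max_results=None -> no "maxResults" key is set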


def _supported_by_jobs_query(request_body: Dict[str, Any]) -> bool:
    """True if jobs.query can be used. False if jobs.insert is needed."""
    request_keys = frozenset(request_body.keys())

    # Per issue: https://github.com/googleapis/python-bigquery/issues/1867
    # use an allowlist here instead of a denylist, because the backend API
    # accepts unsupported parameters without any warning or failure. Keep
    # this set in sync with the fields of QueryRequest:
    # https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/query#QueryRequest
    keys_allowlist = {
        "kind",
        "query",
        "maxResults",
        "defaultDataset",
        "timeoutMs",
        "dryRun",
        "preserveNulls",
        "useQueryCache",
        "useLegacySql",
        "parameterMode",
        "queryParameters",
        "location",
        "formatOptions",
        "connectionProperties",
        "labels",
        "maximumBytesBilled",
        "requestId",
        "createSession",
        "writeIncrementalResults",
    }

    unsupported_keys = request_keys - keys_allowlist
    return len(unsupported_keys) == 0
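
# Example (illustrative): only allowlisted keys may use jobs.query; anything
# else, such as a destination table, forces the jobs.insert fallback.
#
#   _supported_by_jobs_query({"query": "SELECT 1", "useLegacySql": False})
#   # -> True
#   _supported_by_jobs_query({"query": "SELECT 1", "destinationTable": {}})
#   # -> False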


def _wait_or_cancel(
    job: job.QueryJob,
    api_timeout: Optional[float],
    wait_timeout: Optional[Union[object, float]],
    retry: Optional[retries.Retry],
    page_size: Optional[int],
    max_results: Optional[int],
    *,
    callback: Callable = lambda _: None,
) -> table.RowIterator:
    """Wait for a job to complete and return the results.

    If we can't return the results within the ``wait_timeout``, try to cancel
    the job.
    """
    try:
        if not job.dry_run:
            callback(
                QueryReceivedEvent(
                    billing_project=job.project,
                    location=job.location,
                    job_id=job.job_id,
                    statement_type=job.statement_type,
                    state=job.state,
                    query_plan=job.query_plan,
                    created=job.created,
                    started=job.started,
                    ended=job.ended,
                )
            )
        query_results = job.result(
            page_size=page_size,
            max_results=max_results,
            retry=retry,
            timeout=wait_timeout,
        )
        if not job.dry_run:
            callback(
                QueryFinishedEvent(
                    billing_project=job.project,
                    location=query_results.location,
                    query_id=query_results.query_id,
                    job_id=query_results.job_id,
                    total_rows=query_results.total_rows,
                    total_bytes_processed=query_results.total_bytes_processed,
                    slot_millis=query_results.slot_millis,
                    destination=job.destination,
                    created=job.created,
                    started=job.started,
                    ended=job.ended,
                )
            )
        return query_results
    except Exception:
        # Attempt to cancel the job since we can't return the results.
        try:
            job.cancel(retry=retry, timeout=api_timeout)
        except Exception:
            # Don't eat the original exception if cancel fails.
            pass
        raise


@dataclasses.dataclass(frozen=True)
class QueryFinishedEvent:
    """Query finished successfully."""

    billing_project: Optional[str]
    location: Optional[str]
    query_id: Optional[str]
    job_id: Optional[str]
    destination: Optional[table.TableReference]
    total_rows: Optional[int]
    total_bytes_processed: Optional[int]
    slot_millis: Optional[int]
    created: Optional[datetime.datetime]
    started: Optional[datetime.datetime]
    ended: Optional[datetime.datetime]


@dataclasses.dataclass(frozen=True)
class QueryReceivedEvent:
    """Query received and acknowledged by the BigQuery API."""

    billing_project: Optional[str]
    location: Optional[str]
    job_id: Optional[str]
    statement_type: Optional[str]
    state: Optional[str]
    query_plan: Optional[list[google.cloud.bigquery.job.query.QueryPlanEntry]]
    created: Optional[datetime.datetime]
    started: Optional[datetime.datetime]
    ended: Optional[datetime.datetime]


@dataclasses.dataclass(frozen=True)
class QuerySentEvent:
    """Query sent to BigQuery."""

    query: str
    billing_project: Optional[str]
    location: Optional[str]
    job_id: Optional[str]
    request_id: Optional[str]


class QueryRetryEvent(QuerySentEvent):
    """Query sent another time because the previous attempt failed."""


class QuerySentEventFactory:
    """Creates a QuerySentEvent first, then QueryRetryEvent after that."""

    def __init__(self):
        self._event_constructor = QuerySentEvent

    def __call__(self, **kwargs):
        result = self._event_constructor(**kwargs)
        self._event_constructor = QueryRetryEvent
        return result
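
# Example (illustrative): the factory lets progress callbacks distinguish a
# first attempt from a retry of the same query.
#
#   factory = QuerySentEventFactory()
#   first = factory(query="SELECT 1", billing_project=None, location=None,
#                   job_id=None, request_id="request-1")
#   second = factory(query="SELECT 1", billing_project=None, location=None,
#                    job_id=None, request_id="request-2")
#   # type(first) is QuerySentEvent; type(second) is QueryRetryEvent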