
# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


"""Helpers for interacting with the job REST APIs from the client.

For queries, there are three cases to consider:

1. jobs.insert: This always returns a job resource.
2. jobs.query, jobCreationMode=JOB_CREATION_REQUIRED:
   This can sometimes return the results inline, but always includes a job ID.
3. jobs.query, jobCreationMode=JOB_CREATION_OPTIONAL:
   This sometimes doesn't create a job at all, instead returning the results
   directly. For better debugging, an auto-generated query ID is included in
   the response.

Client.query() calls either (1) or (2), depending on what the user provides
for the api_method parameter. query() always returns a QueryJob object, which
can retry the query when the query job fails for a retriable reason.

Client.query_and_wait() calls (3). This returns a RowIterator that may wrap
local results from the response or may wrap a query job containing multiple
pages of results. Even though query_and_wait() waits for the job to complete,
we still need a separate job_retry object, because the predicates that decide
when it is safe to generate a new query ID are different.
"""


from __future__ import annotations

import copy
import dataclasses
import datetime
import functools
import textwrap
import uuid
from typing import Any, Callable, Dict, Optional, TYPE_CHECKING, Union
import warnings

import google.api_core.exceptions as core_exceptions
from google.api_core import retry as retries

from google.cloud.bigquery import enums
from google.cloud.bigquery import job
import google.cloud.bigquery.job.query
import google.cloud.bigquery.query
from google.cloud.bigquery import table
import google.cloud.bigquery.retry
from google.cloud.bigquery.retry import POLLING_DEFAULT_VALUE


# Avoid circular imports
if TYPE_CHECKING:  # pragma: NO COVER
    from google.cloud.bigquery.client import Client


# The purpose of _TIMEOUT_BUFFER_MILLIS is to allow the server-side timeout to
# happen before the client-side timeout. This is not strictly necessary, as the
# client retries client-side timeouts, but the hope is that by making the
# server-side timeout slightly shorter, we can save the server some unnecessary
# processing time.
#
# 250 milliseconds is chosen arbitrarily, though it should be about the right
# order of magnitude for network latency and switching delays. It is about the
# amount of time for light to circumnavigate the world twice.
_TIMEOUT_BUFFER_MILLIS = 250



def make_job_id(job_id: Optional[str] = None, prefix: Optional[str] = None) -> str:
    """Construct an ID for a new job.

    Args:
        job_id: the user-provided job ID.
        prefix: the user-provided prefix for a job ID.

    Returns:
        str: A job ID.
    """
    if job_id is not None:
        return job_id
    elif prefix is not None:
        return str(prefix) + str(uuid.uuid4())
    else:
        return str(uuid.uuid4())
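
# Illustrative behavior (a sketch, not executed at import time):
#
#     make_job_id("my-job")             # -> "my-job" (used as-is)
#     make_job_id(prefix="daily_etl_")  # -> "daily_etl_" + a random UUID4
#     make_job_id()                     # -> a random UUID4 string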



def job_config_with_defaults(
    job_config: Optional[job.QueryJobConfig],
    default_job_config: Optional[job.QueryJobConfig],
) -> Optional[job.QueryJobConfig]:
    """Create a copy of `job_config`, replacing unset values with those from
    `default_job_config`.
    """
    if job_config is None:
        return default_job_config

    if default_job_config is None:
        return job_config

    # Both job_config and default_job_config are not None, so make a copy of
    # job_config merged with default_job_config. Anything already explicitly
    # set on job_config should not be replaced.
    return job_config._fill_from_default(default_job_config)
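
# Illustrative merge behavior (a sketch; ``_fill_from_default`` is a private
# QueryJobConfig helper, so only the observable result is shown):
#
#     defaults = job.QueryJobConfig(maximum_bytes_billed=10**10)
#     explicit = job.QueryJobConfig(use_query_cache=False)
#     merged = job_config_with_defaults(explicit, defaults)
#     # merged.use_query_cache == False        (explicit value kept)
#     # merged.maximum_bytes_billed == 10**10  (filled in from the default)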



def query_jobs_insert(
    client: "Client",
    query: str,
    job_config: Optional[job.QueryJobConfig],
    job_id: Optional[str],
    job_id_prefix: Optional[str],
    location: Optional[str],
    project: str,
    retry: Optional[retries.Retry],
    timeout: Optional[float],
    job_retry: Optional[retries.Retry],
    *,
    callback: Callable = lambda _: None,
) -> job.QueryJob:
    """Initiate a query using jobs.insert.

    See: https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/insert

    Args:
        callback (Callable):
            A callback function used by bigframes to report query progress.
    """
    job_id_given = job_id is not None
    job_id_save = job_id
    job_config_save = job_config
    query_sent_factory = QuerySentEventFactory()

    def do_query():
        # Make a copy now, so that the original doesn't get changed by the
        # process below, and to facilitate retry.
        job_config = copy.deepcopy(job_config_save)

        job_id = make_job_id(job_id_save, job_id_prefix)
        job_ref = job._JobReference(job_id, project=project, location=location)
        query_job = job.QueryJob(job_ref, query, client=client, job_config=job_config)

        try:
            query_job._begin(retry=retry, timeout=timeout)
            if job_config is not None and not job_config.dry_run:
                callback(
                    query_sent_factory(
                        query=query,
                        billing_project=query_job.project,
                        location=query_job.location,
                        job_id=query_job.job_id,
                        request_id=None,
                    )
                )
        except core_exceptions.Conflict as create_exc:
            # The thought is that if someone is providing their own job IDs
            # and they get their job ID generation wrong, this could end up
            # returning results for the wrong query. We thus only try to
            # recover if the job ID was not given.
            if job_id_given:
                raise create_exc

            try:
                # Sometimes we get a 404 after a Conflict. In this case, we
                # have pretty high confidence that by retrying the 404, we'll
                # (hopefully) eventually recover the job.
                # https://github.com/googleapis/python-bigquery/issues/2134
                #
                # Allow users who want to completely disable retries to
                # continue to do so by setting retry to None.
                get_job_retry = retry
                if retry is not None:
                    # TODO(tswast): Amend the user's retry object with allowing
                    # 404 to retry when there's a public way to do so.
                    # https://github.com/googleapis/python-api-core/issues/796
                    get_job_retry = (
                        google.cloud.bigquery.retry._DEFAULT_GET_JOB_CONFLICT_RETRY
                    )

                query_job = client.get_job(
                    job_id,
                    project=project,
                    location=location,
                    retry=get_job_retry,
                    timeout=google.cloud.bigquery.retry.DEFAULT_GET_JOB_TIMEOUT,
                )
            except core_exceptions.GoogleAPIError:  # (includes RetryError)
                raise
            else:
                return query_job
        else:
            return query_job

    # Allow users who want to completely disable retries to
    # continue to do so by setting job_retry to None.
    if job_retry is not None:
        do_query = google.cloud.bigquery.retry._DEFAULT_QUERY_JOB_INSERT_RETRY(do_query)

    future = do_query()

    # The future might be in a failed state now, but if it's
    # unrecoverable, we'll find out when we ask for its result, at which
    # point, we may retry.
    if not job_id_given:
        future._retry_do_query = do_query  # in case we have to retry later
        future._job_retry = job_retry

    return future
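
# Illustrative usage via the public client (a sketch, assuming the documented
# ``api_method`` and ``job_id_prefix`` parameters): a prefix keeps job IDs
# recognizable while still letting the retry path above generate a fresh ID
# on each attempt.
#
#     client.query(sql, job_id_prefix="daily_etl_", api_method="INSERT")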



def _validate_job_config(request_body: Dict[str, Any], invalid_key: str):
    """Catch common mistakes, such as passing in a *JobConfig object of the
    wrong type.
    """
    if invalid_key in request_body:
        raise ValueError(f"got unexpected key {repr(invalid_key)} in job_config")
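
# Illustrative failure mode (a sketch): a LoadJobConfig serializes to
# {"load": {...}}, so its "load" key (job.LoadJob._JOB_TYPE) is caught here
# when it is mistakenly passed to a query method.
#
#     _validate_job_config(job.LoadJobConfig().to_api_repr(), "load")
#     # -> ValueError: got unexpected key 'load' in job_config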



def validate_job_retry(job_id: Optional[str], job_retry: Optional[retries.Retry]):
    """Catch common mistakes, such as setting a job_id and job_retry at the
    same time.
    """
    if job_id is not None and job_retry is not None:
        # TODO(tswast): To avoid breaking changes but still allow a default
        # query job retry, we currently only raise if they explicitly set a
        # job_retry other than the default. In a future version, we may want
        # to avoid this check for DEFAULT_JOB_RETRY and always raise.
        if job_retry is not google.cloud.bigquery.retry.DEFAULT_JOB_RETRY:
            raise TypeError(
                textwrap.dedent(
                    """
                    `job_retry` was provided, but the returned job is
                    not retryable, because a custom `job_id` was
                    provided. To customize the job ID and allow for job
                    retries, set job_id_prefix instead.
                    """
                ).strip()
            )
        else:
            warnings.warn(
                textwrap.dedent(
                    """
                    job_retry must be explicitly set to None if job_id is set.
                    BigQuery cannot retry a failed job by using the exact
                    same ID. Setting job_id without explicitly disabling
                    job_retry will raise an error in the future. To avoid this
                    warning, either use job_id_prefix instead (preferred) or
                    set job_retry=None.
                    """
                ).strip(),
                category=FutureWarning,
                # user code -> client.query / client.query_and_wait -> validate_job_retry
                stacklevel=3,
            )
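
# Illustrative outcomes (a sketch; DEFAULT_JOB_RETRY refers to
# google.cloud.bigquery.retry.DEFAULT_JOB_RETRY):
#
#     validate_job_retry("my-job", some_custom_retry)  # -> TypeError
#     validate_job_retry("my-job", DEFAULT_JOB_RETRY)  # -> FutureWarning
#     validate_job_retry("my-job", None)               # -> no error, no warning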



def _to_query_request(
    job_config: Optional[job.QueryJobConfig] = None,
    *,
    query: str,
    location: Optional[str] = None,
    timeout: Optional[float] = None,
    timestamp_precision: Optional[enums.TimestampPrecision] = None,
) -> Dict[str, Any]:
    """Transform from Job resource to QueryRequest resource.

    Most of the keys in job.configuration.query are in common with
    QueryRequest. If any configuration property is set that is not available
    in jobs.query, it will result in a server-side error.
    """
    request_body = copy.copy(job_config.to_api_repr()) if job_config else {}

    _validate_job_config(request_body, job.CopyJob._JOB_TYPE)
    _validate_job_config(request_body, job.ExtractJob._JOB_TYPE)
    _validate_job_config(request_body, job.LoadJob._JOB_TYPE)

    # Move query.* properties to top-level.
    query_config_resource = request_body.pop("query", {})
    request_body.update(query_config_resource)

    # Default to standard SQL.
    request_body.setdefault("useLegacySql", False)

    request_body.setdefault("formatOptions", {})

    # Cannot specify both useInt64Timestamp and timestampOutputFormat.
    if timestamp_precision == enums.TimestampPrecision.PICOSECOND:
        request_body["formatOptions"]["timestampOutputFormat"] = "ISO8601_STRING"  # type: ignore
    else:
        # Since jobs.query can return results, ensure we use the lossless
        # timestamp format. See:
        # https://github.com/googleapis/python-bigquery/issues/395
        request_body["formatOptions"]["useInt64Timestamp"] = True  # type: ignore

    if timeout is not None:
        # Subtract a buffer for context switching, network latency, etc.
        request_body["timeoutMs"] = max(0, int(1000 * timeout) - _TIMEOUT_BUFFER_MILLIS)

    if location is not None:
        request_body["location"] = location

    request_body["query"] = query

    return request_body
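
# Illustrative transformation (a sketch of the deterministic parts):
#
#     _to_query_request(query="SELECT 1", timeout=10.0)
#     # -> {"useLegacySql": False,
#     #     "formatOptions": {"useInt64Timestamp": True},
#     #     "timeoutMs": 9750,  # 10.0 s * 1000 - _TIMEOUT_BUFFER_MILLIS (250)
#     #     "query": "SELECT 1"}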



def _to_query_job(
    client: "Client",
    query: str,
    request_config: Optional[job.QueryJobConfig],
    query_response: Dict[str, Any],
) -> job.QueryJob:
    job_ref_resource = query_response["jobReference"]
    job_ref = job._JobReference._from_api_repr(job_ref_resource)
    query_job = job.QueryJob(job_ref, query, client=client)
    query_job._properties.setdefault("configuration", {})

    # Not all relevant properties are in the jobs.query response. Populate
    # some expected properties based on the job configuration.
    if request_config is not None:
        query_job._properties["configuration"].update(request_config.to_api_repr())

    query_job._properties["configuration"].setdefault("query", {})
    query_job._properties["configuration"]["query"]["query"] = query
    query_job._properties["configuration"]["query"].setdefault("useLegacySql", False)

    query_job._properties.setdefault("statistics", {})
    query_job._properties["statistics"].setdefault("query", {})
    query_job._properties["statistics"]["query"]["cacheHit"] = query_response.get(
        "cacheHit"
    )
    query_job._properties["statistics"]["query"]["schema"] = query_response.get(
        "schema"
    )
    query_job._properties["statistics"]["query"][
        "totalBytesProcessed"
    ] = query_response.get("totalBytesProcessed")

    # Set errors if any were encountered.
    query_job._properties.setdefault("status", {})
    if "errors" in query_response:
        # Set errors but not errorResult. If there was an error that failed
        # the job, jobs.query behaves like jobs.getQueryResults and returns a
        # non-success HTTP status code.
        errors = query_response["errors"]
        query_job._properties["status"]["errors"] = errors

    # Avoid an extra call to `getQueryResults` if the query has finished.
    job_complete = query_response.get("jobComplete")
    if job_complete:
        query_job._query_results = google.cloud.bigquery.query._QueryResults(
            query_response
        )

    # We want job.result() to refresh the job state, so the converted job's
    # state is always "PENDING", even if the job is finished.
    query_job._properties["status"]["state"] = "PENDING"

    return query_job



def _to_query_path(project: str) -> str:
    return f"/projects/{project}/queries"



def query_jobs_query(
    client: "Client",
    query: str,
    job_config: Optional[job.QueryJobConfig],
    location: Optional[str],
    project: str,
    retry: retries.Retry,
    timeout: Optional[float],
    job_retry: Optional[retries.Retry],
    timestamp_precision: Optional[enums.TimestampPrecision] = None,
) -> job.QueryJob:
    """Initiate a query using jobs.query with jobCreationMode=JOB_CREATION_REQUIRED.

    See: https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/query
    """
    path = _to_query_path(project)
    request_body = _to_query_request(
        query=query,
        job_config=job_config,
        location=location,
        timeout=timeout,
        timestamp_precision=timestamp_precision,
    )

    def do_query():
        request_body["requestId"] = make_job_id()
        span_attributes = {"path": path}
        api_response = client._call_api(
            retry,
            span_name="BigQuery.query",
            span_attributes=span_attributes,
            method="POST",
            path=path,
            data=request_body,
            timeout=timeout,
        )
        return _to_query_job(client, query, job_config, api_response)

    future = do_query()

    # The future might be in a failed state now, but if it's
    # unrecoverable, we'll find out when we ask for its result, at which
    # point, we may retry.
    future._retry_do_query = do_query  # in case we have to retry later
    future._job_retry = job_retry

    return future



def query_and_wait(
    client: "Client",
    query: str,
    *,
    job_config: Optional[job.QueryJobConfig],
    location: Optional[str],
    project: str,
    api_timeout: Optional[float] = None,
    wait_timeout: Optional[Union[float, object]] = POLLING_DEFAULT_VALUE,
    retry: Optional[retries.Retry],
    job_retry: Optional[retries.Retry],
    page_size: Optional[int] = None,
    max_results: Optional[int] = None,
    callback: Callable = lambda _: None,
) -> table.RowIterator:
    """Run the query, wait for it to finish, and return the results.

    Args:
        client:
            BigQuery client to make API calls.
        query (str):
            SQL query to be executed. Defaults to the standard SQL
            dialect. Use the ``job_config`` parameter to change dialects.
        job_config (Optional[google.cloud.bigquery.job.QueryJobConfig]):
            Extra configuration options for the job.
            To override any options that were previously set in
            the ``default_query_job_config`` given to the
            ``Client`` constructor, manually set those options to ``None``,
            or whatever value is preferred.
        location (Optional[str]):
            Location where to run the job. Must match the location of the
            table used in the query as well as the destination table.
        project (str):
            Project ID of the project where the job runs.
        api_timeout (Optional[float]):
            The number of seconds to wait for the underlying HTTP transport
            before using ``retry``.
        wait_timeout (Optional[Union[float, object]]):
            The number of seconds to wait for the query to finish. If the
            query doesn't finish before this timeout, the client attempts
            to cancel the query. If unset, the underlying Client.get_job() API
            call has a timeout, but we still wait indefinitely for the job to
            finish.
        retry (Optional[google.api_core.retry.Retry]):
            How to retry the RPC. This only applies to making RPC
            calls. It isn't used to retry failed jobs. This has
            a reasonable default that should only be overridden
            with care.
        job_retry (Optional[google.api_core.retry.Retry]):
            How to retry failed jobs. The default retries
            rate-limit-exceeded errors. Passing ``None`` disables
            job retry. Not all jobs can be retried.
        page_size (Optional[int]):
            The maximum number of rows in each page of results from this
            request. Non-positive values are ignored.
        max_results (Optional[int]):
            The maximum total number of rows from this request.
        callback (Callable):
            A callback function used by bigframes to report query progress.

    Returns:
        google.cloud.bigquery.table.RowIterator:
            Iterator of row data
            :class:`~google.cloud.bigquery.table.Row`-s. During each
            page, the iterator will have the ``total_rows`` attribute
            set, which counts the total number of rows **in the result
            set** (this is distinct from the total number of rows in the
            current page: ``iterator.page.num_items``).

            If the query is a special query that produces no results, e.g.
            a DDL query, an ``_EmptyRowIterator`` instance is returned.

    Raises:
        TypeError:
            If ``job_config`` is not an instance of the
            :class:`~google.cloud.bigquery.job.QueryJobConfig`
            class.
    """
    request_body = _to_query_request(
        query=query, job_config=job_config, location=location, timeout=api_timeout
    )

    # Some API parameters aren't supported by the jobs.query API. In these
    # cases, fall back to a jobs.insert call.
    if not _supported_by_jobs_query(request_body):
        return _wait_or_cancel(
            query_jobs_insert(
                client=client,
                query=query,
                job_id=None,
                job_id_prefix=None,
                job_config=job_config,
                location=location,
                project=project,
                retry=retry,
                timeout=api_timeout,
                job_retry=job_retry,
                callback=callback,
            ),
            api_timeout=api_timeout,
            wait_timeout=wait_timeout,
            retry=retry,
            page_size=page_size,
            max_results=max_results,
            callback=callback,
        )

    path = _to_query_path(project)

    if page_size is not None and max_results is not None:
        request_body["maxResults"] = min(page_size, max_results)
    elif page_size is not None or max_results is not None:
        request_body["maxResults"] = page_size or max_results
    if client.default_job_creation_mode:
        request_body["jobCreationMode"] = client.default_job_creation_mode

    query_sent_factory = QuerySentEventFactory()

    def do_query():
        request_id = make_job_id()
        request_body["requestId"] = request_id
        span_attributes = {"path": path}

        if "dryRun" not in request_body:
            callback(
                query_sent_factory(
                    query=query,
                    billing_project=project,
                    location=location,
                    job_id=None,
                    request_id=request_id,
                )
            )

        # For easier testing, handle the retries ourselves.
        if retry is not None:
            response = retry(client._call_api)(
                retry=None,  # We're calling the retry decorator ourselves.
                span_name="BigQuery.query",
                span_attributes=span_attributes,
                method="POST",
                path=path,
                data=request_body,
                timeout=api_timeout,
            )
        else:
            response = client._call_api(
                retry=None,
                span_name="BigQuery.query",
                span_attributes=span_attributes,
                method="POST",
                path=path,
                data=request_body,
                timeout=api_timeout,
            )

        # Even if we run with JOB_CREATION_OPTIONAL, if there are more pages
        # to fetch, there will be a job ID for jobs.getQueryResults.
        query_results = google.cloud.bigquery.query._QueryResults.from_api_repr(
            response
        )
        page_token = query_results.page_token
        more_pages = page_token is not None

        if more_pages or not query_results.complete:
            # TODO(swast): Avoid a call to jobs.get in some cases (few
            # remaining pages) by waiting for the query to finish and calling
            # client._list_rows_from_query_results directly. Need to update
            # RowIterator to fetch destination table via the job ID if needed.
            return _wait_or_cancel(
                _to_query_job(client, query, job_config, response),
                api_timeout=api_timeout,
                wait_timeout=wait_timeout,
                retry=retry,
                page_size=page_size,
                max_results=max_results,
                callback=callback,
            )

        if "dryRun" not in request_body:
            callback(
                QueryFinishedEvent(
                    billing_project=project,
                    location=query_results.location,
                    query_id=query_results.query_id,
                    job_id=query_results.job_id,
                    total_rows=query_results.total_rows,
                    total_bytes_processed=query_results.total_bytes_processed,
                    slot_millis=query_results.slot_millis,
                    destination=None,
                    created=query_results.created,
                    started=query_results.started,
                    ended=query_results.ended,
                )
            )
        return table.RowIterator(
            client=client,
            api_request=functools.partial(client._call_api, retry, timeout=api_timeout),
            path=None,
            schema=query_results.schema,
            max_results=max_results,
            page_size=page_size,
            total_rows=query_results.total_rows,
            first_page_response=response,
            location=query_results.location,
            job_id=query_results.job_id,
            query_id=query_results.query_id,
            project=query_results.project,
            num_dml_affected_rows=query_results.num_dml_affected_rows,
            query=query,
            total_bytes_processed=query_results.total_bytes_processed,
            slot_millis=query_results.slot_millis,
            created=query_results.created,
            started=query_results.started,
            ended=query_results.ended,
        )

    if job_retry is not None:
        return job_retry(do_query)()
    else:
        return do_query()
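
# Illustrative usage via the public client (a sketch; the table name is a
# hypothetical placeholder):
#
#     rows = client.query_and_wait("SELECT name FROM dataset.table LIMIT 10")
#     for row in rows:  # RowIterator, whether or not a job was created
#         print(row["name"])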



def _supported_by_jobs_query(request_body: Dict[str, Any]) -> bool:
    """True if jobs.query can be used. False if jobs.insert is needed."""
    request_keys = frozenset(request_body.keys())

    # Per https://github.com/googleapis/python-bigquery/issues/1867, use an
    # allowlist here instead of a denylist, because the backend API accepts
    # unsupported parameters without any warning or failure. Keep this set in
    # sync with the fields of QueryRequest:
    # https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/query#QueryRequest
    keys_allowlist = {
        "kind",
        "query",
        "maxResults",
        "defaultDataset",
        "timeoutMs",
        "dryRun",
        "preserveNulls",
        "useQueryCache",
        "useLegacySql",
        "parameterMode",
        "queryParameters",
        "location",
        "formatOptions",
        "connectionProperties",
        "labels",
        "maximumBytesBilled",
        "requestId",
        "createSession",
        "writeIncrementalResults",
        "jobTimeoutMs",
        "reservation",
        "maxSlots",
    }

    unsupported_keys = request_keys - keys_allowlist
    return len(unsupported_keys) == 0
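
# Illustrative check (a sketch): setting a destination table puts
# "destinationTable" in the request body, which is absent from the allowlist,
# so query_and_wait() falls back to jobs.insert for that query.
#
#     _supported_by_jobs_query({"query": "SELECT 1"})                          # True
#     _supported_by_jobs_query({"query": "SELECT 1", "destinationTable": {}})  # False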



def _wait_or_cancel(
    job: job.QueryJob,
    api_timeout: Optional[float],
    wait_timeout: Optional[Union[object, float]],
    retry: Optional[retries.Retry],
    page_size: Optional[int],
    max_results: Optional[int],
    *,
    callback: Callable = lambda _: None,
) -> table.RowIterator:
    """Wait for a job to complete and return the results.

    If we can't return the results within the ``wait_timeout``, try to cancel
    the job.
    """
    try:
        if not job.dry_run:
            callback(
                QueryReceivedEvent(
                    billing_project=job.project,
                    location=job.location,
                    job_id=job.job_id,
                    statement_type=job.statement_type,
                    state=job.state,
                    query_plan=job.query_plan,
                    created=job.created,
                    started=job.started,
                    ended=job.ended,
                )
            )
        query_results = job.result(
            page_size=page_size,
            max_results=max_results,
            retry=retry,
            timeout=wait_timeout,
        )
        if not job.dry_run:
            callback(
                QueryFinishedEvent(
                    billing_project=job.project,
                    location=query_results.location,
                    query_id=query_results.query_id,
                    job_id=query_results.job_id,
                    total_rows=query_results.total_rows,
                    total_bytes_processed=query_results.total_bytes_processed,
                    slot_millis=query_results.slot_millis,
                    destination=job.destination,
                    created=job.created,
                    started=job.started,
                    ended=job.ended,
                )
            )
        return query_results
    except Exception:
        # Attempt to cancel the job since we can't return the results.
        try:
            job.cancel(retry=retry, timeout=api_timeout)
        except Exception:
            # Don't eat the original exception if cancel fails.
            pass
        raise



@dataclasses.dataclass(frozen=True)
class QueryFinishedEvent:
    """Query finished successfully."""

    billing_project: Optional[str]
    location: Optional[str]
    query_id: Optional[str]
    job_id: Optional[str]
    destination: Optional[table.TableReference]
    total_rows: Optional[int]
    total_bytes_processed: Optional[int]
    slot_millis: Optional[int]
    created: Optional[datetime.datetime]
    started: Optional[datetime.datetime]
    ended: Optional[datetime.datetime]


@dataclasses.dataclass(frozen=True)
class QueryReceivedEvent:
    """Query received and acknowledged by the BigQuery API."""

    billing_project: Optional[str]
    location: Optional[str]
    job_id: Optional[str]
    statement_type: Optional[str]
    state: Optional[str]
    query_plan: Optional[list[google.cloud.bigquery.job.query.QueryPlanEntry]]
    created: Optional[datetime.datetime]
    started: Optional[datetime.datetime]
    ended: Optional[datetime.datetime]


@dataclasses.dataclass(frozen=True)
class QuerySentEvent:
    """Query sent to BigQuery."""

    query: str
    billing_project: Optional[str]
    location: Optional[str]
    job_id: Optional[str]
    request_id: Optional[str]


class QueryRetryEvent(QuerySentEvent):
    """Query sent another time because the previous attempt failed."""


class QuerySentEventFactory:
    """Creates a QuerySentEvent first, then QueryRetryEvent after that."""

    def __init__(self):
        self._event_constructor = QuerySentEvent

    def __call__(self, **kwargs):
        result = self._event_constructor(**kwargs)
        self._event_constructor = QueryRetryEvent
        return result
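
# Illustrative factory behavior (a sketch, not executed at import time): the
# first call constructs a QuerySentEvent; every later call on the same
# factory instance (e.g. from a retry loop) constructs a QueryRetryEvent.
#
#     factory = QuerySentEventFactory()
#     first = factory(query="SELECT 1", billing_project=None, location=None,
#                     job_id=None, request_id="abc")
#     second = factory(query="SELECT 1", billing_project=None, location=None,
#                      job_id=None, request_id="def")
#     assert type(first) is QuerySentEvent
#     assert type(second) is QueryRetryEvent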