# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Helpers for interacting with the job REST APIs from the client.

For queries, there are three cases to consider:

1. jobs.insert: This always returns a job resource.
2. jobs.query, jobCreationMode=JOB_CREATION_REQUIRED:
   This can sometimes return the results inline, but always includes a job ID.
3. jobs.query, jobCreationMode=JOB_CREATION_OPTIONAL:
   This sometimes doesn't create a job at all, instead returning the results.
   For better debugging, an auto-generated query ID is included in the
   response.

Client.query() calls either (1) or (2), depending on what the user provides
for the api_method parameter. query() always returns a QueryJob object, which
can retry the query when the query job fails for a retriable reason.

Client.query_and_wait() calls (3). This returns a RowIterator that may wrap
local results from the response or may wrap a query job containing multiple
pages of results. Even though query_and_wait() waits for the job to complete,
we still need a separate job_retry object because there are different
predicates for when it is safe to generate a new query ID.
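
A minimal usage sketch of these entry points (assumes a constructed
``Client``; the ``api_method`` argument selects between cases (1) and (2))::

    from google.cloud import bigquery

    client = bigquery.Client()

    # Cases (1) and (2): both return a QueryJob.
    job = client.query("SELECT 1", api_method="INSERT")  # jobs.insert
    job = client.query("SELECT 1", api_method="QUERY")  # jobs.query

    # Case (3): returns a RowIterator, possibly without creating a job.
    rows = client.query_and_wait("SELECT 1")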
"""

from __future__ import annotations

import copy
import dataclasses
import datetime
import functools
import textwrap
import uuid
from typing import Any, Callable, Dict, Optional, TYPE_CHECKING, Union
import warnings

import google.api_core.exceptions as core_exceptions
from google.api_core import retry as retries

from google.cloud.bigquery import enums
from google.cloud.bigquery import job
import google.cloud.bigquery.job.query
import google.cloud.bigquery.query
from google.cloud.bigquery import table
import google.cloud.bigquery.retry
from google.cloud.bigquery.retry import POLLING_DEFAULT_VALUE

# Avoid circular imports
if TYPE_CHECKING:  # pragma: NO COVER
    from google.cloud.bigquery.client import Client

# The purpose of _TIMEOUT_BUFFER_MILLIS is to allow the server-side timeout to
# happen before the client-side timeout. This is not strictly necessary, as the
# client retries client-side timeouts, but the hope is that by making the
# server-side timeout slightly shorter, we can save the server some unnecessary
# processing time.
#
# 250 milliseconds is chosen arbitrarily, though it should be about the right
# order of magnitude for network latency and switching delays. It is about the
# amount of time for light to circumnavigate the world twice.
_TIMEOUT_BUFFER_MILLIS = 250
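
# For example, a 10 second client-side timeout passed to _to_query_request
# (below) becomes a server-side timeout of
# max(0, int(1000 * 10.0) - _TIMEOUT_BUFFER_MILLIS) == 9750 milliseconds.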


def make_job_id(job_id: Optional[str] = None, prefix: Optional[str] = None) -> str:
    """Construct an ID for a new job.

    Args:
        job_id: the user-provided job ID.
        prefix: the user-provided prefix for a job ID.

    Returns:
        str: A job ID
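
    Examples:
        A user-provided ``job_id`` is returned as-is, a ``prefix`` is
        prepended to a random UUID, and with neither argument a bare UUID
        string is returned::

            >>> make_job_id(job_id="my-job")
            'my-job'
            >>> make_job_id(prefix="daily-load-").startswith("daily-load-")
            True
            >>> len(make_job_id())  # a UUID4 string
            36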
    """
    if job_id is not None:
        return job_id
    elif prefix is not None:
        return str(prefix) + str(uuid.uuid4())
    else:
        return str(uuid.uuid4())


def job_config_with_defaults(
    job_config: Optional[job.QueryJobConfig],
    default_job_config: Optional[job.QueryJobConfig],
) -> Optional[job.QueryJobConfig]:
    """Create a copy of `job_config`, replacing unset values with those from
    `default_job_config`.
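
    A sketch of the merge semantics (values set explicitly on ``job_config``
    win; unset ones are filled in from the default)::

        default = job.QueryJobConfig(use_query_cache=False)
        override = job.QueryJobConfig(dry_run=True)
        merged = job_config_with_defaults(override, default)
        assert merged.dry_run is True
        assert merged.use_query_cache is False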
    """
    if job_config is None:
        return default_job_config

    if default_job_config is None:
        return job_config

    # Both job_config and default_job_config are not None, so make a copy of
    # job_config merged with default_job_config. Anything already explicitly
    # set on job_config should not be replaced.
    return job_config._fill_from_default(default_job_config)


def query_jobs_insert(
    client: "Client",
    query: str,
    job_config: Optional[job.QueryJobConfig],
    job_id: Optional[str],
    job_id_prefix: Optional[str],
    location: Optional[str],
    project: str,
    retry: Optional[retries.Retry],
    timeout: Optional[float],
    job_retry: Optional[retries.Retry],
    *,
    callback: Callable = lambda _: None,
) -> job.QueryJob:
    """Initiate a query using jobs.insert.

    See: https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/insert

    Args:
        callback (Callable):
            A callback function used by bigframes to report query progress.
    """
    job_id_given = job_id is not None
    job_id_save = job_id
    job_config_save = job_config
    query_sent_factory = QuerySentEventFactory()

    def do_query():
        # Make a copy now, so that the original doesn't get changed by the
        # process below, and to facilitate retry.
        job_config = copy.deepcopy(job_config_save)

        job_id = make_job_id(job_id_save, job_id_prefix)
        job_ref = job._JobReference(job_id, project=project, location=location)
        query_job = job.QueryJob(job_ref, query, client=client, job_config=job_config)

        try:
            query_job._begin(retry=retry, timeout=timeout)
            if job_config is not None and not job_config.dry_run:
                callback(
                    query_sent_factory(
                        query=query,
                        billing_project=query_job.project,
                        location=query_job.location,
                        job_id=query_job.job_id,
                        request_id=None,
                    )
                )
        except core_exceptions.Conflict as create_exc:
            # The thought is that if someone provides their own job IDs and
            # gets the job ID generation wrong, we could end up returning
            # results for the wrong query. We thus only try to recover if the
            # job ID was not given.
            if job_id_given:
                raise create_exc

            try:
                # Sometimes we get a 404 after a Conflict. In this case, we
                # have pretty high confidence that by retrying the 404, we'll
                # (hopefully) eventually recover the job.
                # https://github.com/googleapis/python-bigquery/issues/2134
                #
                # Allow users who want to completely disable retries to
                # continue to do so by setting retry to None.
                get_job_retry = retry
                if retry is not None:
                    # TODO(tswast): Amend the user's retry object to allow
                    # retrying 404s when there's a public way to do so.
                    # https://github.com/googleapis/python-api-core/issues/796
                    get_job_retry = (
                        google.cloud.bigquery.retry._DEFAULT_GET_JOB_CONFLICT_RETRY
                    )

                query_job = client.get_job(
                    job_id,
                    project=project,
                    location=location,
                    retry=get_job_retry,
                    timeout=google.cloud.bigquery.retry.DEFAULT_GET_JOB_TIMEOUT,
                )
            except core_exceptions.GoogleAPIError:  # (includes RetryError)
                raise
            else:
                return query_job
        else:
            return query_job

    # Allow users who want to completely disable retries to
    # continue to do so by setting job_retry to None.
    if job_retry is not None:
        do_query = google.cloud.bigquery.retry._DEFAULT_QUERY_JOB_INSERT_RETRY(do_query)

    future = do_query()

    # The future might be in a failed state now, but if it's
    # unrecoverable, we'll find out when we ask for its result, at which
    # point, we may retry.
    if not job_id_given:
        future._retry_do_query = do_query  # in case we have to retry later
        future._job_retry = job_retry

    return future


def _validate_job_config(request_body: Dict[str, Any], invalid_key: str):
    """Catch common mistakes, such as passing in a *JobConfig object of the
    wrong type.
    """
    if invalid_key in request_body:
        raise ValueError(f"got unexpected key {repr(invalid_key)} in job_config")


def validate_job_retry(job_id: Optional[str], job_retry: Optional[retries.Retry]):
    """Catch common mistakes, such as setting a job_id and job_retry at the
    same time.
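
    For example, this combination raises, because a job with a fixed,
    user-provided ID can never be retried (``my_custom_retry`` below is an
    illustrative placeholder)::

        validate_job_retry("my-job-id", my_custom_retry)  # raises TypeError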
    """
    if job_id is not None and job_retry is not None:
        # TODO(tswast): To avoid breaking changes but still allow a default
        # query job retry, we currently only raise if they explicitly set a
        # job_retry other than the default. In a future version, we may want to
        # avoid this check for DEFAULT_JOB_RETRY and always raise.
        if job_retry is not google.cloud.bigquery.retry.DEFAULT_JOB_RETRY:
            raise TypeError(
                textwrap.dedent(
                    """
                    `job_retry` was provided, but the returned job is
                    not retryable, because a custom `job_id` was
                    provided. To customize the job ID and allow for job
                    retries, set job_id_prefix instead.
                    """
                ).strip()
            )
        else:
            warnings.warn(
                textwrap.dedent(
                    """
                    job_retry must be explicitly set to None if job_id is set.
                    BigQuery cannot retry a failed job by using the exact
                    same ID. Setting job_id without explicitly disabling
                    job_retry will raise an error in the future. To avoid this
                    warning, either use job_id_prefix instead (preferred) or
                    set job_retry=None.
                    """
                ).strip(),
                category=FutureWarning,
                # user code -> client.query / client.query_and_wait -> validate_job_retry
                stacklevel=3,
            )


def _to_query_request(
    job_config: Optional[job.QueryJobConfig] = None,
    *,
    query: str,
    location: Optional[str] = None,
    timeout: Optional[float] = None,
    timestamp_precision: Optional[enums.TimestampPrecision] = None,
) -> Dict[str, Any]:
    """Transform from Job resource to QueryRequest resource.

    Most of the keys in job.configuration.query are in common with
    QueryRequest. If any configuration property is set that is not available in
    jobs.query, it will result in a server-side error.
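
    A sketch of the transformation (the query.* properties are hoisted to the
    top level and standard SQL is the default)::

        config = job.QueryJobConfig(dry_run=True)
        request = _to_query_request(config, query="SELECT 1")
        assert request["query"] == "SELECT 1"
        assert request["dryRun"] is True
        assert request["useLegacySql"] is False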
    """
    request_body = copy.copy(job_config.to_api_repr()) if job_config else {}

    _validate_job_config(request_body, job.CopyJob._JOB_TYPE)
    _validate_job_config(request_body, job.ExtractJob._JOB_TYPE)
    _validate_job_config(request_body, job.LoadJob._JOB_TYPE)

    # Move query.* properties to top-level.
    query_config_resource = request_body.pop("query", {})
    request_body.update(query_config_resource)

    # Default to standard SQL.
    request_body.setdefault("useLegacySql", False)

    request_body.setdefault("formatOptions", {})

    # Cannot specify both use_int64_timestamp and timestamp_output_format.
    if timestamp_precision == enums.TimestampPrecision.PICOSECOND:
        request_body["formatOptions"]["timestampOutputFormat"] = "ISO8601_STRING"  # type: ignore
    else:
        # Since jobs.query can return results, ensure we use the lossless
        # timestamp format. See: https://github.com/googleapis/python-bigquery/issues/395
        request_body["formatOptions"]["useInt64Timestamp"] = True  # type: ignore

    if timeout is not None:
        # Subtract a buffer for context switching, network latency, etc.
        request_body["timeoutMs"] = max(0, int(1000 * timeout) - _TIMEOUT_BUFFER_MILLIS)

    if location is not None:
        request_body["location"] = location

    request_body["query"] = query

    return request_body


def _to_query_job(
    client: "Client",
    query: str,
    request_config: Optional[job.QueryJobConfig],
    query_response: Dict[str, Any],
) -> job.QueryJob:
    job_ref_resource = query_response["jobReference"]
    job_ref = job._JobReference._from_api_repr(job_ref_resource)
    query_job = job.QueryJob(job_ref, query, client=client)
    query_job._properties.setdefault("configuration", {})

    # Not all relevant properties are in the jobs.query response. Populate some
    # expected properties based on the job configuration.
    if request_config is not None:
        query_job._properties["configuration"].update(request_config.to_api_repr())

    query_job._properties["configuration"].setdefault("query", {})
    query_job._properties["configuration"]["query"]["query"] = query
    query_job._properties["configuration"]["query"].setdefault("useLegacySql", False)

    query_job._properties.setdefault("statistics", {})
    query_job._properties["statistics"].setdefault("query", {})
    query_job._properties["statistics"]["query"]["cacheHit"] = query_response.get(
        "cacheHit"
    )
    query_job._properties["statistics"]["query"]["schema"] = query_response.get(
        "schema"
    )
    query_job._properties["statistics"]["query"][
        "totalBytesProcessed"
    ] = query_response.get("totalBytesProcessed")

    # Set errors if any were encountered.
    query_job._properties.setdefault("status", {})
    if "errors" in query_response:
        # Set errors but not errorResult. If there was an error that failed
        # the job, jobs.query behaves like jobs.getQueryResults and returns a
        # non-success HTTP status code.
        errors = query_response["errors"]
        query_job._properties["status"]["errors"] = errors

    # Avoid an extra call to `getQueryResults` if the query has finished.
    job_complete = query_response.get("jobComplete")
    if job_complete:
        query_job._query_results = google.cloud.bigquery.query._QueryResults(
            query_response
        )

    # We want job.result() to refresh the job state, so the converted job's
    # state is always "PENDING", even if the job has finished.
    query_job._properties["status"]["state"] = "PENDING"

    return query_job


def _to_query_path(project: str) -> str:
    return f"/projects/{project}/queries"


def query_jobs_query(
    client: "Client",
    query: str,
    job_config: Optional[job.QueryJobConfig],
    location: Optional[str],
    project: str,
    retry: retries.Retry,
    timeout: Optional[float],
    job_retry: Optional[retries.Retry],
    timestamp_precision: Optional[enums.TimestampPrecision] = None,
) -> job.QueryJob:
    """Initiate a query using jobs.query with jobCreationMode=JOB_CREATION_REQUIRED.

    See: https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/query
    """
    path = _to_query_path(project)
    request_body = _to_query_request(
        query=query,
        job_config=job_config,
        location=location,
        timeout=timeout,
        timestamp_precision=timestamp_precision,
    )

    def do_query():
        request_body["requestId"] = make_job_id()
        span_attributes = {"path": path}
        api_response = client._call_api(
            retry,
            span_name="BigQuery.query",
            span_attributes=span_attributes,
            method="POST",
            path=path,
            data=request_body,
            timeout=timeout,
        )
        return _to_query_job(client, query, job_config, api_response)

    future = do_query()

    # The future might be in a failed state now, but if it's
    # unrecoverable, we'll find out when we ask for its result, at which
    # point, we may retry.
    future._retry_do_query = do_query  # in case we have to retry later
    future._job_retry = job_retry

    return future


def query_and_wait(
    client: "Client",
    query: str,
    *,
    job_config: Optional[job.QueryJobConfig],
    location: Optional[str],
    project: str,
    api_timeout: Optional[float] = None,
    wait_timeout: Optional[Union[float, object]] = POLLING_DEFAULT_VALUE,
    retry: Optional[retries.Retry],
    job_retry: Optional[retries.Retry],
    page_size: Optional[int] = None,
    max_results: Optional[int] = None,
    callback: Callable = lambda _: None,
) -> table.RowIterator:
    """Run the query, wait for it to finish, and return the results.

    Args:
        client:
            BigQuery client to make API calls.
        query (str):
            SQL query to be executed. Defaults to the standard SQL
            dialect. Use the ``job_config`` parameter to change dialects.
        job_config (Optional[google.cloud.bigquery.job.QueryJobConfig]):
            Extra configuration options for the job.
            To override any options that were previously set in
            the ``default_query_job_config`` given to the
            ``Client`` constructor, manually set those options to ``None``,
            or whatever value is preferred.
        location (Optional[str]):
            Location where to run the job. Must match the location of the
            table used in the query as well as the destination table.
        project (str):
            Project ID of the project where to run the job.
        api_timeout (Optional[float]):
            The number of seconds to wait for the underlying HTTP transport
            before using ``retry``.
        wait_timeout (Optional[Union[float, object]]):
            The number of seconds to wait for the query to finish. If the
            query doesn't finish before this timeout, the client attempts
            to cancel the query. If unset, the underlying Client.get_job() API
            call has a timeout, but we still wait indefinitely for the job to
            finish.
        retry (Optional[google.api_core.retry.Retry]):
            How to retry the RPC. This only applies to making RPC
            calls. It isn't used to retry failed jobs. This has
            a reasonable default that should only be overridden
            with care.
        job_retry (Optional[google.api_core.retry.Retry]):
            How to retry failed jobs. The default retries
            rate-limit-exceeded errors. Passing ``None`` disables
            job retry. Not all jobs can be retried.
        page_size (Optional[int]):
            The maximum number of rows in each page of results from this
            request. Non-positive values are ignored.
        max_results (Optional[int]):
            The maximum total number of rows from this request.
        callback (Callable):
            A callback function used by bigframes to report query progress.

    Returns:
        google.cloud.bigquery.table.RowIterator:
            Iterator of row data
            :class:`~google.cloud.bigquery.table.Row`-s. During each
            page, the iterator will have the ``total_rows`` attribute
            set, which counts the total number of rows **in the result
            set** (this is distinct from the total number of rows in the
            current page: ``iterator.page.num_items``).

            If the query is a special query that produces no results, e.g.
            a DDL query, an ``_EmptyRowIterator`` instance is returned.

    Raises:
        TypeError:
            If ``job_config`` is not an instance of
            :class:`~google.cloud.bigquery.job.QueryJobConfig`
            class.
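
    Example (a sketch; "my-project" is a placeholder project ID, and passing
    ``None`` for ``retry`` and ``job_retry`` disables retries)::

        rows = query_and_wait(
            client,
            "SELECT 17 AS answer",
            job_config=None,
            location="US",
            project="my-project",
            retry=None,
            job_retry=None,
        )
        for row in rows:
            print(row["answer"])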
    """
    request_body = _to_query_request(
        query=query, job_config=job_config, location=location, timeout=api_timeout
    )

    # Some API parameters aren't supported by the jobs.query API. In these
    # cases, fall back to a jobs.insert call.
    if not _supported_by_jobs_query(request_body):
        return _wait_or_cancel(
            query_jobs_insert(
                client=client,
                query=query,
                job_id=None,
                job_id_prefix=None,
                job_config=job_config,
                location=location,
                project=project,
                retry=retry,
                timeout=api_timeout,
                job_retry=job_retry,
                callback=callback,
            ),
            api_timeout=api_timeout,
            wait_timeout=wait_timeout,
            retry=retry,
            page_size=page_size,
            max_results=max_results,
            callback=callback,
        )

    path = _to_query_path(project)

    if page_size is not None and max_results is not None:
        request_body["maxResults"] = min(page_size, max_results)
    elif page_size is not None or max_results is not None:
        request_body["maxResults"] = page_size or max_results
    if client.default_job_creation_mode:
        request_body["jobCreationMode"] = client.default_job_creation_mode

    query_sent_factory = QuerySentEventFactory()

    def do_query():
        request_id = make_job_id()
        request_body["requestId"] = request_id
        span_attributes = {"path": path}

        if "dryRun" not in request_body:
            callback(
                query_sent_factory(
                    query=query,
                    billing_project=project,
                    location=location,
                    job_id=None,
                    request_id=request_id,
                )
            )

        # For easier testing, handle the retries ourselves.
        if retry is not None:
            response = retry(client._call_api)(
                retry=None,  # We're calling the retry decorator ourselves.
                span_name="BigQuery.query",
                span_attributes=span_attributes,
                method="POST",
                path=path,
                data=request_body,
                timeout=api_timeout,
            )
        else:
            response = client._call_api(
                retry=None,
                span_name="BigQuery.query",
                span_attributes=span_attributes,
                method="POST",
                path=path,
                data=request_body,
                timeout=api_timeout,
            )

        # Even if we run with JOB_CREATION_OPTIONAL, if there are more pages
        # to fetch, there will be a job ID for jobs.getQueryResults.
        query_results = google.cloud.bigquery.query._QueryResults.from_api_repr(
            response
        )
        page_token = query_results.page_token
        more_pages = page_token is not None

        if more_pages or not query_results.complete:
            # TODO(swast): Avoid a call to jobs.get in some cases (few
            # remaining pages) by waiting for the query to finish and calling
            # client._list_rows_from_query_results directly. Need to update
            # RowIterator to fetch destination table via the job ID if needed.
            return _wait_or_cancel(
                _to_query_job(client, query, job_config, response),
                api_timeout=api_timeout,
                wait_timeout=wait_timeout,
                retry=retry,
                page_size=page_size,
                max_results=max_results,
                callback=callback,
            )

        if "dryRun" not in request_body:
            callback(
                QueryFinishedEvent(
                    billing_project=project,
                    location=query_results.location,
                    query_id=query_results.query_id,
                    job_id=query_results.job_id,
                    total_rows=query_results.total_rows,
                    total_bytes_processed=query_results.total_bytes_processed,
                    slot_millis=query_results.slot_millis,
                    destination=None,
                    created=query_results.created,
                    started=query_results.started,
                    ended=query_results.ended,
                )
            )
        return table.RowIterator(
            client=client,
            api_request=functools.partial(client._call_api, retry, timeout=api_timeout),
            path=None,
            schema=query_results.schema,
            max_results=max_results,
            page_size=page_size,
            total_rows=query_results.total_rows,
            first_page_response=response,
            location=query_results.location,
            job_id=query_results.job_id,
            query_id=query_results.query_id,
            project=query_results.project,
            num_dml_affected_rows=query_results.num_dml_affected_rows,
            query=query,
            total_bytes_processed=query_results.total_bytes_processed,
            slot_millis=query_results.slot_millis,
            created=query_results.created,
            started=query_results.started,
            ended=query_results.ended,
        )

    if job_retry is not None:
        return job_retry(do_query)()
    else:
        return do_query()


def _supported_by_jobs_query(request_body: Dict[str, Any]) -> bool:
    """True if jobs.query can be used. False if jobs.insert is needed."""
    request_keys = frozenset(request_body.keys())

    # Per issue: https://github.com/googleapis/python-bigquery/issues/1867
    # use an allowlist here instead of a denylist, because the backend API
    # accepts unsupported parameters without any warning or failure. Keep this
    # set in sync with the fields of QueryRequest:
    # https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/query#QueryRequest
    keys_allowlist = {
        "kind",
        "query",
        "maxResults",
        "defaultDataset",
        "timeoutMs",
        "dryRun",
        "preserveNulls",
        "useQueryCache",
        "useLegacySql",
        "parameterMode",
        "queryParameters",
        "location",
        "formatOptions",
        "connectionProperties",
        "labels",
        "maximumBytesBilled",
        "requestId",
        "createSession",
        "writeIncrementalResults",
        "jobTimeoutMs",
        "reservation",
        "maxSlots",
    }

    unsupported_keys = request_keys - keys_allowlist
    return len(unsupported_keys) == 0
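
# For example, a job_config that sets a destination table adds a
# "destinationTable" key to the request body after _to_query_request hoists
# the query.* properties. That key is not in the allowlist above, so
# query_and_wait falls back to jobs.insert for such a query.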


def _wait_or_cancel(
    job: job.QueryJob,
    api_timeout: Optional[float],
    wait_timeout: Optional[Union[object, float]],
    retry: Optional[retries.Retry],
    page_size: Optional[int],
    max_results: Optional[int],
    *,
    callback: Callable = lambda _: None,
) -> table.RowIterator:
    """Wait for a job to complete and return the results.

    If we can't return the results within the ``wait_timeout``, try to cancel
    the job.
    """
    try:
        if not job.dry_run:
            callback(
                QueryReceivedEvent(
                    billing_project=job.project,
                    location=job.location,
                    job_id=job.job_id,
                    statement_type=job.statement_type,
                    state=job.state,
                    query_plan=job.query_plan,
                    created=job.created,
                    started=job.started,
                    ended=job.ended,
                )
            )
        query_results = job.result(
            page_size=page_size,
            max_results=max_results,
            retry=retry,
            timeout=wait_timeout,
        )
        if not job.dry_run:
            callback(
                QueryFinishedEvent(
                    billing_project=job.project,
                    location=query_results.location,
                    query_id=query_results.query_id,
                    job_id=query_results.job_id,
                    total_rows=query_results.total_rows,
                    total_bytes_processed=query_results.total_bytes_processed,
                    slot_millis=query_results.slot_millis,
                    destination=job.destination,
                    created=job.created,
                    started=job.started,
                    ended=job.ended,
                )
            )
        return query_results
    except Exception:
        # Attempt to cancel the job since we can't return the results.
        try:
            job.cancel(retry=retry, timeout=api_timeout)
        except Exception:
            # Don't eat the original exception if cancel fails.
            pass
        raise


@dataclasses.dataclass(frozen=True)
class QueryFinishedEvent:
    """Query finished successfully."""

    billing_project: Optional[str]
    location: Optional[str]
    query_id: Optional[str]
    job_id: Optional[str]
    destination: Optional[table.TableReference]
    total_rows: Optional[int]
    total_bytes_processed: Optional[int]
    slot_millis: Optional[int]
    created: Optional[datetime.datetime]
    started: Optional[datetime.datetime]
    ended: Optional[datetime.datetime]


@dataclasses.dataclass(frozen=True)
class QueryReceivedEvent:
    """Query received and acknowledged by the BigQuery API."""

    billing_project: Optional[str]
    location: Optional[str]
    job_id: Optional[str]
    statement_type: Optional[str]
    state: Optional[str]
    query_plan: Optional[list[google.cloud.bigquery.job.query.QueryPlanEntry]]
    created: Optional[datetime.datetime]
    started: Optional[datetime.datetime]
    ended: Optional[datetime.datetime]


@dataclasses.dataclass(frozen=True)
class QuerySentEvent:
    """Query sent to BigQuery."""

    query: str
    billing_project: Optional[str]
    location: Optional[str]
    job_id: Optional[str]
    request_id: Optional[str]


class QueryRetryEvent(QuerySentEvent):
    """Query sent another time because the previous attempt failed."""


class QuerySentEventFactory:
    """Creates a QuerySentEvent first, then QueryRetryEvent after that."""

    def __init__(self):
        self._event_constructor = QuerySentEvent

    def __call__(self, **kwargs):
        result = self._event_constructor(**kwargs)
        self._event_constructor = QueryRetryEvent
        return result
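

# A short usage sketch: the first call produces a QuerySentEvent; every
# subsequent call (i.e. a retry of the same query) produces a QueryRetryEvent.
# The request IDs below are illustrative placeholders.
#
#   factory = QuerySentEventFactory()
#   first = factory(
#       query="SELECT 1", billing_project=None, location=None,
#       job_id=None, request_id="request-1",
#   )
#   second = factory(
#       query="SELECT 1", billing_project=None, location=None,
#       job_id=None, request_id="request-2",
#   )
#   assert type(first) is QuerySentEvent
#   assert type(second) is QueryRetryEvent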