1# Copyright 2021 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15"""Helpers for interacting with the job REST APIs from the client.
16
17For queries, there are three cases to consider:
18
191. jobs.insert: This always returns a job resource.
202. jobs.query, jobCreationMode=JOB_CREATION_REQUIRED:
21 This sometimes can return the results inline, but always includes a job ID.
223. jobs.query, jobCreationMode=JOB_CREATION_OPTIONAL:
23 This sometimes doesn't create a job at all, instead returning the results.
24 For better debugging, an auto-generated query ID is included in the
25 response.
26
27Client.query() calls either (1) or (2), depending on what the user provides
28for the api_method parameter. query() always returns a QueryJob object, which
29can retry the query when the query job fails for a retriable reason.
30
31Client.query_and_wait() calls (3). This returns a RowIterator that may wrap
32local results from the response or may wrap a query job containing multiple
33pages of results. Even though query_and_wait() waits for the job to complete,
34we still need a separate job_retry object because there are different
35predicates where it is safe to generate a new query ID.
36"""
37
38from __future__ import annotations
39
40import copy
41import dataclasses
42import datetime
43import functools
44import uuid
45import textwrap
46from typing import Any, Callable, Dict, Optional, TYPE_CHECKING, Union
47import warnings
48
49import google.api_core.exceptions as core_exceptions
50from google.api_core import retry as retries
51
52from google.cloud.bigquery import job
53import google.cloud.bigquery.job.query
54import google.cloud.bigquery.query
55from google.cloud.bigquery import table
56import google.cloud.bigquery.retry
57from google.cloud.bigquery.retry import POLLING_DEFAULT_VALUE
58
59# Avoid circular imports
60if TYPE_CHECKING: # pragma: NO COVER
61 from google.cloud.bigquery.client import Client
62
63
64# The purpose of _TIMEOUT_BUFFER_MILLIS is to allow the server-side timeout to
65# happen before the client-side timeout. This is not strictly necessary, as the
66# client retries client-side timeouts, but the hope by making the server-side
67# timeout slightly shorter is that it can save the server from some unncessary
68# processing time.
69#
70# 250 milliseconds is chosen arbitrarily, though should be about the right
71# order of magnitude for network latency and switching delays. It is about the
72# amount of time for light to circumnavigate the world twice.
73_TIMEOUT_BUFFER_MILLIS = 250
74
75
76def make_job_id(job_id: Optional[str] = None, prefix: Optional[str] = None) -> str:
77 """Construct an ID for a new job.
78
79 Args:
80 job_id: the user-provided job ID.
81 prefix: the user-provided prefix for a job ID.
82
83 Returns:
84 str: A job ID
85 """
86 if job_id is not None:
87 return job_id
88 elif prefix is not None:
89 return str(prefix) + str(uuid.uuid4())
90 else:
91 return str(uuid.uuid4())
92
93
94def job_config_with_defaults(
95 job_config: Optional[job.QueryJobConfig],
96 default_job_config: Optional[job.QueryJobConfig],
97) -> Optional[job.QueryJobConfig]:
98 """Create a copy of `job_config`, replacing unset values with those from
99 `default_job_config`.
100 """
101 if job_config is None:
102 return default_job_config
103
104 if default_job_config is None:
105 return job_config
106
107 # Both job_config and default_job_config are not None, so make a copy of
108 # job_config merged with default_job_config. Anything already explicitly
109 # set on job_config should not be replaced.
110 return job_config._fill_from_default(default_job_config)
111
112
113def query_jobs_insert(
114 client: "Client",
115 query: str,
116 job_config: Optional[job.QueryJobConfig],
117 job_id: Optional[str],
118 job_id_prefix: Optional[str],
119 location: Optional[str],
120 project: str,
121 retry: Optional[retries.Retry],
122 timeout: Optional[float],
123 job_retry: Optional[retries.Retry],
124 *,
125 callback: Callable = lambda _: None,
126) -> job.QueryJob:
127 """Initiate a query using jobs.insert.
128
129 See: https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/insert
130
131 Args:
132 callback (Callable):
133 A callback function used by bigframes to report query progress.
134 """
135 job_id_given = job_id is not None
136 job_id_save = job_id
137 job_config_save = job_config
138 query_sent_factory = QuerySentEventFactory()
139
140 def do_query():
141 # Make a copy now, so that original doesn't get changed by the process
142 # below and to facilitate retry
143 job_config = copy.deepcopy(job_config_save)
144
145 job_id = make_job_id(job_id_save, job_id_prefix)
146 job_ref = job._JobReference(job_id, project=project, location=location)
147 query_job = job.QueryJob(job_ref, query, client=client, job_config=job_config)
148
149 try:
150 query_job._begin(retry=retry, timeout=timeout)
151 if job_config is not None and not job_config.dry_run:
152 callback(
153 query_sent_factory(
154 query=query,
155 billing_project=query_job.project,
156 location=query_job.location,
157 job_id=query_job.job_id,
158 request_id=None,
159 )
160 )
161 except core_exceptions.Conflict as create_exc:
162 # The thought is if someone is providing their own job IDs and they get
163 # their job ID generation wrong, this could end up returning results for
164 # the wrong query. We thus only try to recover if job ID was not given.
165 if job_id_given:
166 raise create_exc
167
168 try:
169 # Sometimes we get a 404 after a Conflict. In this case, we
170 # have pretty high confidence that by retrying the 404, we'll
171 # (hopefully) eventually recover the job.
172 # https://github.com/googleapis/python-bigquery/issues/2134
173 #
174 # Allow users who want to completely disable retries to
175 # continue to do so by setting retry to None.
176 get_job_retry = retry
177 if retry is not None:
178 # TODO(tswast): Amend the user's retry object with allowing
179 # 404 to retry when there's a public way to do so.
180 # https://github.com/googleapis/python-api-core/issues/796
181 get_job_retry = (
182 google.cloud.bigquery.retry._DEFAULT_GET_JOB_CONFLICT_RETRY
183 )
184
185 query_job = client.get_job(
186 job_id,
187 project=project,
188 location=location,
189 retry=get_job_retry,
190 timeout=google.cloud.bigquery.retry.DEFAULT_GET_JOB_TIMEOUT,
191 )
192 except core_exceptions.GoogleAPIError: # (includes RetryError)
193 raise
194 else:
195 return query_job
196 else:
197 return query_job
198
199 # Allow users who want to completely disable retries to
200 # continue to do so by setting job_retry to None.
201 if job_retry is not None:
202 do_query = google.cloud.bigquery.retry._DEFAULT_QUERY_JOB_INSERT_RETRY(do_query)
203
204 future = do_query()
205
206 # The future might be in a failed state now, but if it's
207 # unrecoverable, we'll find out when we ask for it's result, at which
208 # point, we may retry.
209 if not job_id_given:
210 future._retry_do_query = do_query # in case we have to retry later
211 future._job_retry = job_retry
212
213 return future
214
215
216def _validate_job_config(request_body: Dict[str, Any], invalid_key: str):
217 """Catch common mistakes, such as passing in a *JobConfig object of the
218 wrong type.
219 """
220 if invalid_key in request_body:
221 raise ValueError(f"got unexpected key {repr(invalid_key)} in job_config")
222
223
224def validate_job_retry(job_id: Optional[str], job_retry: Optional[retries.Retry]):
225 """Catch common mistakes, such as setting a job_id and job_retry at the same
226 time.
227 """
228 if job_id is not None and job_retry is not None:
229 # TODO(tswast): To avoid breaking changes but still allow a default
230 # query job retry, we currently only raise if they explicitly set a
231 # job_retry other than the default. In a future version, we may want to
232 # avoid this check for DEFAULT_JOB_RETRY and always raise.
233 if job_retry is not google.cloud.bigquery.retry.DEFAULT_JOB_RETRY:
234 raise TypeError(
235 textwrap.dedent(
236 """
237 `job_retry` was provided, but the returned job is
238 not retryable, because a custom `job_id` was
239 provided. To customize the job ID and allow for job
240 retries, set job_id_prefix, instead.
241 """
242 ).strip()
243 )
244 else:
245 warnings.warn(
246 textwrap.dedent(
247 """
248 job_retry must be explicitly set to None if job_id is set.
249 BigQuery cannot retry a failed job by using the exact
250 same ID. Setting job_id without explicitly disabling
251 job_retry will raise an error in the future. To avoid this
252 warning, either use job_id_prefix instead (preferred) or
253 set job_retry=None.
254 """
255 ).strip(),
256 category=FutureWarning,
257 # user code -> client.query / client.query_and_wait -> validate_job_retry
258 stacklevel=3,
259 )
260
261
262def _to_query_request(
263 job_config: Optional[job.QueryJobConfig] = None,
264 *,
265 query: str,
266 location: Optional[str] = None,
267 timeout: Optional[float] = None,
268) -> Dict[str, Any]:
269 """Transform from Job resource to QueryRequest resource.
270
271 Most of the keys in job.configuration.query are in common with
272 QueryRequest. If any configuration property is set that is not available in
273 jobs.query, it will result in a server-side error.
274 """
275 request_body = copy.copy(job_config.to_api_repr()) if job_config else {}
276
277 _validate_job_config(request_body, job.CopyJob._JOB_TYPE)
278 _validate_job_config(request_body, job.ExtractJob._JOB_TYPE)
279 _validate_job_config(request_body, job.LoadJob._JOB_TYPE)
280
281 # Move query.* properties to top-level.
282 query_config_resource = request_body.pop("query", {})
283 request_body.update(query_config_resource)
284
285 # Default to standard SQL.
286 request_body.setdefault("useLegacySql", False)
287
288 # Since jobs.query can return results, ensure we use the lossless timestamp
289 # format. See: https://github.com/googleapis/python-bigquery/issues/395
290 request_body.setdefault("formatOptions", {})
291 request_body["formatOptions"]["useInt64Timestamp"] = True # type: ignore
292
293 if timeout is not None:
294 # Subtract a buffer for context switching, network latency, etc.
295 request_body["timeoutMs"] = max(0, int(1000 * timeout) - _TIMEOUT_BUFFER_MILLIS)
296
297 if location is not None:
298 request_body["location"] = location
299
300 request_body["query"] = query
301
302 return request_body
303
304
305def _to_query_job(
306 client: "Client",
307 query: str,
308 request_config: Optional[job.QueryJobConfig],
309 query_response: Dict[str, Any],
310) -> job.QueryJob:
311 job_ref_resource = query_response["jobReference"]
312 job_ref = job._JobReference._from_api_repr(job_ref_resource)
313 query_job = job.QueryJob(job_ref, query, client=client)
314 query_job._properties.setdefault("configuration", {})
315
316 # Not all relevant properties are in the jobs.query response. Populate some
317 # expected properties based on the job configuration.
318 if request_config is not None:
319 query_job._properties["configuration"].update(request_config.to_api_repr())
320
321 query_job._properties["configuration"].setdefault("query", {})
322 query_job._properties["configuration"]["query"]["query"] = query
323 query_job._properties["configuration"]["query"].setdefault("useLegacySql", False)
324
325 query_job._properties.setdefault("statistics", {})
326 query_job._properties["statistics"].setdefault("query", {})
327 query_job._properties["statistics"]["query"]["cacheHit"] = query_response.get(
328 "cacheHit"
329 )
330 query_job._properties["statistics"]["query"]["schema"] = query_response.get(
331 "schema"
332 )
333 query_job._properties["statistics"]["query"][
334 "totalBytesProcessed"
335 ] = query_response.get("totalBytesProcessed")
336
337 # Set errors if any were encountered.
338 query_job._properties.setdefault("status", {})
339 if "errors" in query_response:
340 # Set errors but not errorResult. If there was an error that failed
341 # the job, jobs.query behaves like jobs.getQueryResults and returns a
342 # non-success HTTP status code.
343 errors = query_response["errors"]
344 query_job._properties["status"]["errors"] = errors
345
346 # Avoid an extra call to `getQueryResults` if the query has finished.
347 job_complete = query_response.get("jobComplete")
348 if job_complete:
349 query_job._query_results = google.cloud.bigquery.query._QueryResults(
350 query_response
351 )
352
353 # We want job.result() to refresh the job state, so the conversion is
354 # always "PENDING", even if the job is finished.
355 query_job._properties["status"]["state"] = "PENDING"
356
357 return query_job
358
359
360def _to_query_path(project: str) -> str:
361 return f"/projects/{project}/queries"
362
363
364def query_jobs_query(
365 client: "Client",
366 query: str,
367 job_config: Optional[job.QueryJobConfig],
368 location: Optional[str],
369 project: str,
370 retry: retries.Retry,
371 timeout: Optional[float],
372 job_retry: Optional[retries.Retry],
373) -> job.QueryJob:
374 """Initiate a query using jobs.query with jobCreationMode=JOB_CREATION_REQUIRED.
375
376 See: https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/query
377 """
378 path = _to_query_path(project)
379 request_body = _to_query_request(
380 query=query, job_config=job_config, location=location, timeout=timeout
381 )
382
383 def do_query():
384 request_body["requestId"] = make_job_id()
385 span_attributes = {"path": path}
386 api_response = client._call_api(
387 retry,
388 span_name="BigQuery.query",
389 span_attributes=span_attributes,
390 method="POST",
391 path=path,
392 data=request_body,
393 timeout=timeout,
394 )
395 return _to_query_job(client, query, job_config, api_response)
396
397 future = do_query()
398
399 # The future might be in a failed state now, but if it's
400 # unrecoverable, we'll find out when we ask for it's result, at which
401 # point, we may retry.
402 future._retry_do_query = do_query # in case we have to retry later
403 future._job_retry = job_retry
404
405 return future
406
407
408def query_and_wait(
409 client: "Client",
410 query: str,
411 *,
412 job_config: Optional[job.QueryJobConfig],
413 location: Optional[str],
414 project: str,
415 api_timeout: Optional[float] = None,
416 wait_timeout: Optional[Union[float, object]] = POLLING_DEFAULT_VALUE,
417 retry: Optional[retries.Retry],
418 job_retry: Optional[retries.Retry],
419 page_size: Optional[int] = None,
420 max_results: Optional[int] = None,
421 callback: Callable = lambda _: None,
422) -> table.RowIterator:
423 """Run the query, wait for it to finish, and return the results.
424
425
426 Args:
427 client:
428 BigQuery client to make API calls.
429 query (str):
430 SQL query to be executed. Defaults to the standard SQL
431 dialect. Use the ``job_config`` parameter to change dialects.
432 job_config (Optional[google.cloud.bigquery.job.QueryJobConfig]):
433 Extra configuration options for the job.
434 To override any options that were previously set in
435 the ``default_query_job_config`` given to the
436 ``Client`` constructor, manually set those options to ``None``,
437 or whatever value is preferred.
438 location (Optional[str]):
439 Location where to run the job. Must match the location of the
440 table used in the query as well as the destination table.
441 project (str):
442 Project ID of the project of where to run the job.
443 api_timeout (Optional[float]):
444 The number of seconds to wait for the underlying HTTP transport
445 before using ``retry``.
446 wait_timeout (Optional[Union[float, object]]):
447 The number of seconds to wait for the query to finish. If the
448 query doesn't finish before this timeout, the client attempts
449 to cancel the query. If unset, the underlying Client.get_job() API
450 call has timeout, but we still wait indefinitely for the job to
451 finish.
452 retry (Optional[google.api_core.retry.Retry]):
453 How to retry the RPC. This only applies to making RPC
454 calls. It isn't used to retry failed jobs. This has
455 a reasonable default that should only be overridden
456 with care.
457 job_retry (Optional[google.api_core.retry.Retry]):
458 How to retry failed jobs. The default retries
459 rate-limit-exceeded errors. Passing ``None`` disables
460 job retry. Not all jobs can be retried.
461 page_size (Optional[int]):
462 The maximum number of rows in each page of results from this
463 request. Non-positive values are ignored.
464 max_results (Optional[int]):
465 The maximum total number of rows from this request.
466 callback (Callable):
467 A callback function used by bigframes to report query progress.
468
469 Returns:
470 google.cloud.bigquery.table.RowIterator:
471 Iterator of row data
472 :class:`~google.cloud.bigquery.table.Row`-s. During each
473 page, the iterator will have the ``total_rows`` attribute
474 set, which counts the total number of rows **in the result
475 set** (this is distinct from the total number of rows in the
476 current page: ``iterator.page.num_items``).
477
478 If the query is a special query that produces no results, e.g.
479 a DDL query, an ``_EmptyRowIterator`` instance is returned.
480
481 Raises:
482 TypeError:
483 If ``job_config`` is not an instance of
484 :class:`~google.cloud.bigquery.job.QueryJobConfig`
485 class.
486 """
487 request_body = _to_query_request(
488 query=query, job_config=job_config, location=location, timeout=api_timeout
489 )
490
491 # Some API parameters aren't supported by the jobs.query API. In these
492 # cases, fallback to a jobs.insert call.
493 if not _supported_by_jobs_query(request_body):
494 return _wait_or_cancel(
495 query_jobs_insert(
496 client=client,
497 query=query,
498 job_id=None,
499 job_id_prefix=None,
500 job_config=job_config,
501 location=location,
502 project=project,
503 retry=retry,
504 timeout=api_timeout,
505 job_retry=job_retry,
506 callback=callback,
507 ),
508 api_timeout=api_timeout,
509 wait_timeout=wait_timeout,
510 retry=retry,
511 page_size=page_size,
512 max_results=max_results,
513 callback=callback,
514 )
515
516 path = _to_query_path(project)
517
518 if page_size is not None and max_results is not None:
519 request_body["maxResults"] = min(page_size, max_results)
520 elif page_size is not None or max_results is not None:
521 request_body["maxResults"] = page_size or max_results
522 if client.default_job_creation_mode:
523 request_body["jobCreationMode"] = client.default_job_creation_mode
524
525 query_sent_factory = QuerySentEventFactory()
526
527 def do_query():
528 request_id = make_job_id()
529 request_body["requestId"] = request_id
530 span_attributes = {"path": path}
531
532 if "dryRun" not in request_body:
533 callback(
534 query_sent_factory(
535 query=query,
536 billing_project=project,
537 location=location,
538 job_id=None,
539 request_id=request_id,
540 )
541 )
542
543 # For easier testing, handle the retries ourselves.
544 if retry is not None:
545 response = retry(client._call_api)(
546 retry=None, # We're calling the retry decorator ourselves.
547 span_name="BigQuery.query",
548 span_attributes=span_attributes,
549 method="POST",
550 path=path,
551 data=request_body,
552 timeout=api_timeout,
553 )
554 else:
555 response = client._call_api(
556 retry=None,
557 span_name="BigQuery.query",
558 span_attributes=span_attributes,
559 method="POST",
560 path=path,
561 data=request_body,
562 timeout=api_timeout,
563 )
564
565 # Even if we run with JOB_CREATION_OPTIONAL, if there are more pages
566 # to fetch, there will be a job ID for jobs.getQueryResults.
567 query_results = google.cloud.bigquery.query._QueryResults.from_api_repr(
568 response
569 )
570 page_token = query_results.page_token
571 more_pages = page_token is not None
572
573 if more_pages or not query_results.complete:
574 # TODO(swast): Avoid a call to jobs.get in some cases (few
575 # remaining pages) by waiting for the query to finish and calling
576 # client._list_rows_from_query_results directly. Need to update
577 # RowIterator to fetch destination table via the job ID if needed.
578 return _wait_or_cancel(
579 _to_query_job(client, query, job_config, response),
580 api_timeout=api_timeout,
581 wait_timeout=wait_timeout,
582 retry=retry,
583 page_size=page_size,
584 max_results=max_results,
585 callback=callback,
586 )
587
588 if "dryRun" not in request_body:
589 callback(
590 QueryFinishedEvent(
591 billing_project=project,
592 location=query_results.location,
593 query_id=query_results.query_id,
594 job_id=query_results.job_id,
595 total_rows=query_results.total_rows,
596 total_bytes_processed=query_results.total_bytes_processed,
597 slot_millis=query_results.slot_millis,
598 destination=None,
599 created=query_results.created,
600 started=query_results.started,
601 ended=query_results.ended,
602 )
603 )
604 return table.RowIterator(
605 client=client,
606 api_request=functools.partial(client._call_api, retry, timeout=api_timeout),
607 path=None,
608 schema=query_results.schema,
609 max_results=max_results,
610 page_size=page_size,
611 total_rows=query_results.total_rows,
612 first_page_response=response,
613 location=query_results.location,
614 job_id=query_results.job_id,
615 query_id=query_results.query_id,
616 project=query_results.project,
617 num_dml_affected_rows=query_results.num_dml_affected_rows,
618 query=query,
619 total_bytes_processed=query_results.total_bytes_processed,
620 slot_millis=query_results.slot_millis,
621 created=query_results.created,
622 started=query_results.started,
623 ended=query_results.ended,
624 )
625
626 if job_retry is not None:
627 return job_retry(do_query)()
628 else:
629 return do_query()
630
631
632def _supported_by_jobs_query(request_body: Dict[str, Any]) -> bool:
633 """True if jobs.query can be used. False if jobs.insert is needed."""
634 request_keys = frozenset(request_body.keys())
635
636 # Per issue: https://github.com/googleapis/python-bigquery/issues/1867
637 # use an allowlist here instead of a denylist because the backend API allows
638 # unsupported parameters without any warning or failure. Instead, keep this
639 # set in sync with those in QueryRequest:
640 # https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/query#QueryRequest
641 keys_allowlist = {
642 "kind",
643 "query",
644 "maxResults",
645 "defaultDataset",
646 "timeoutMs",
647 "dryRun",
648 "preserveNulls",
649 "useQueryCache",
650 "useLegacySql",
651 "parameterMode",
652 "queryParameters",
653 "location",
654 "formatOptions",
655 "connectionProperties",
656 "labels",
657 "maximumBytesBilled",
658 "requestId",
659 "createSession",
660 "writeIncrementalResults",
661 }
662
663 unsupported_keys = request_keys - keys_allowlist
664 return len(unsupported_keys) == 0
665
666
667def _wait_or_cancel(
668 job: job.QueryJob,
669 api_timeout: Optional[float],
670 wait_timeout: Optional[Union[object, float]],
671 retry: Optional[retries.Retry],
672 page_size: Optional[int],
673 max_results: Optional[int],
674 *,
675 callback: Callable = lambda _: None,
676) -> table.RowIterator:
677 """Wait for a job to complete and return the results.
678
679 If we can't return the results within the ``wait_timeout``, try to cancel
680 the job.
681 """
682 try:
683 if not job.dry_run:
684 callback(
685 QueryReceivedEvent(
686 billing_project=job.project,
687 location=job.location,
688 job_id=job.job_id,
689 statement_type=job.statement_type,
690 state=job.state,
691 query_plan=job.query_plan,
692 created=job.created,
693 started=job.started,
694 ended=job.ended,
695 )
696 )
697 query_results = job.result(
698 page_size=page_size,
699 max_results=max_results,
700 retry=retry,
701 timeout=wait_timeout,
702 )
703 if not job.dry_run:
704 callback(
705 QueryFinishedEvent(
706 billing_project=job.project,
707 location=query_results.location,
708 query_id=query_results.query_id,
709 job_id=query_results.job_id,
710 total_rows=query_results.total_rows,
711 total_bytes_processed=query_results.total_bytes_processed,
712 slot_millis=query_results.slot_millis,
713 destination=job.destination,
714 created=job.created,
715 started=job.started,
716 ended=job.ended,
717 )
718 )
719 return query_results
720 except Exception:
721 # Attempt to cancel the job since we can't return the results.
722 try:
723 job.cancel(retry=retry, timeout=api_timeout)
724 except Exception:
725 # Don't eat the original exception if cancel fails.
726 pass
727 raise
728
729
730@dataclasses.dataclass(frozen=True)
731class QueryFinishedEvent:
732 """Query finished successfully."""
733
734 billing_project: Optional[str]
735 location: Optional[str]
736 query_id: Optional[str]
737 job_id: Optional[str]
738 destination: Optional[table.TableReference]
739 total_rows: Optional[int]
740 total_bytes_processed: Optional[int]
741 slot_millis: Optional[int]
742 created: Optional[datetime.datetime]
743 started: Optional[datetime.datetime]
744 ended: Optional[datetime.datetime]
745
746
747@dataclasses.dataclass(frozen=True)
748class QueryReceivedEvent:
749 """Query received and acknowledged by the BigQuery API."""
750
751 billing_project: Optional[str]
752 location: Optional[str]
753 job_id: Optional[str]
754 statement_type: Optional[str]
755 state: Optional[str]
756 query_plan: Optional[list[google.cloud.bigquery.job.query.QueryPlanEntry]]
757 created: Optional[datetime.datetime]
758 started: Optional[datetime.datetime]
759 ended: Optional[datetime.datetime]
760
761
762@dataclasses.dataclass(frozen=True)
763class QuerySentEvent:
764 """Query sent to BigQuery."""
765
766 query: str
767 billing_project: Optional[str]
768 location: Optional[str]
769 job_id: Optional[str]
770 request_id: Optional[str]
771
772
773class QueryRetryEvent(QuerySentEvent):
774 """Query sent another time because the previous attempt failed."""
775
776
777class QuerySentEventFactory:
778 """Creates a QuerySentEvent first, then QueryRetryEvent after that."""
779
780 def __init__(self):
781 self._event_constructor = QuerySentEvent
782
783 def __call__(self, **kwargs):
784 result = self._event_constructor(**kwargs)
785 self._event_constructor = QueryRetryEvent
786 return result