1# Copyright 2015 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15"""Classes for query jobs."""
16
17import concurrent.futures
18import copy
19import re
20import time
21import typing
22from typing import Any, Dict, Iterable, List, Optional, Union
23
24from google.api_core import exceptions
25from google.api_core import retry as retries
26import requests
27
28from google.cloud.bigquery.dataset import Dataset
29from google.cloud.bigquery.dataset import DatasetListItem
30from google.cloud.bigquery.dataset import DatasetReference
31from google.cloud.bigquery.encryption_configuration import EncryptionConfiguration
32from google.cloud.bigquery.enums import KeyResultStatementKind, DefaultPandasDTypes
33from google.cloud.bigquery.external_config import ExternalConfig
34from google.cloud.bigquery import _helpers
35from google.cloud.bigquery.query import (
36 _query_param_from_api_repr,
37 ArrayQueryParameter,
38 ConnectionProperty,
39 ScalarQueryParameter,
40 StructQueryParameter,
41 UDFResource,
42)
43from google.cloud.bigquery.retry import (
44 DEFAULT_RETRY,
45 DEFAULT_JOB_RETRY,
46 POLLING_DEFAULT_VALUE,
47)
48from google.cloud.bigquery.routine import RoutineReference
49from google.cloud.bigquery.schema import SchemaField
50from google.cloud.bigquery.table import _EmptyRowIterator
51from google.cloud.bigquery.table import RangePartitioning
52from google.cloud.bigquery.table import _table_arg_to_table_ref
53from google.cloud.bigquery.table import TableReference
54from google.cloud.bigquery.table import TimePartitioning
55from google.cloud.bigquery._tqdm_helpers import wait_for_query
56
57from google.cloud.bigquery.job.base import _AsyncJob
58from google.cloud.bigquery.job.base import _JobConfig
59from google.cloud.bigquery.job.base import _JobReference
60
61try:
62 import pandas # type: ignore
63except ImportError:
64 pandas = None
65
66if typing.TYPE_CHECKING: # pragma: NO COVER
67 # Assumption: type checks are only used by library developers and CI environments
68 # that have all optional dependencies installed, thus no conditional imports.
69 import pandas # type: ignore
70 import geopandas # type: ignore
71 import pyarrow # type: ignore
72 from google.cloud import bigquery_storage
73 from google.cloud.bigquery.client import Client
74 from google.cloud.bigquery.table import RowIterator
75
76
77_CONTAINS_ORDER_BY = re.compile(r"ORDER\s+BY", re.IGNORECASE)
78_EXCEPTION_FOOTER_TEMPLATE = "{message}\n\nLocation: {location}\nJob ID: {job_id}\n"
79_TIMEOUT_BUFFER_SECS = 0.1
80
81
82def _contains_order_by(query):
83 """Do we need to preserve the order of the query results?
84
85 This function has known false positives, such as with ordered window
86 functions:
87
88 .. code-block:: sql
89
90 SELECT SUM(x) OVER (
91 window_name
92 PARTITION BY...
93 ORDER BY...
94 window_frame_clause)
95 FROM ...
96
97 This false positive failure case means the behavior will be correct, but
98 downloading results with the BigQuery Storage API may be slower than it
99 otherwise would. This is preferable to the false negative case, where
100 results are expected to be in order but are not (due to parallel reads).
101 """
102 return query and _CONTAINS_ORDER_BY.search(query)
103
104
105def _from_api_repr_query_parameters(resource):
106 return [_query_param_from_api_repr(mapping) for mapping in resource]
107
108
109def _to_api_repr_query_parameters(value):
110 return [query_parameter.to_api_repr() for query_parameter in value]
111
112
113def _from_api_repr_udf_resources(resource):
114 udf_resources = []
115 for udf_mapping in resource:
116 for udf_type, udf_value in udf_mapping.items():
117 udf_resources.append(UDFResource(udf_type, udf_value))
118 return udf_resources
119
120
121def _to_api_repr_udf_resources(value):
122 return [{udf_resource.udf_type: udf_resource.value} for udf_resource in value]
123
124
125def _from_api_repr_table_defs(resource):
126 return {k: ExternalConfig.from_api_repr(v) for k, v in resource.items()}
127
128
129def _to_api_repr_table_defs(value):
130 return {k: ExternalConfig.to_api_repr(v) for k, v in value.items()}
131
132
133class BiEngineReason(typing.NamedTuple):
134 """Reason for BI Engine acceleration failure
135
136 https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#bienginereason
137 """
138
139 code: str = "CODE_UNSPECIFIED"
140
141 reason: str = ""
142
143 @classmethod
144 def from_api_repr(cls, reason: Dict[str, str]) -> "BiEngineReason":
145 return cls(reason.get("code", "CODE_UNSPECIFIED"), reason.get("message", ""))
146
147
148class BiEngineStats(typing.NamedTuple):
149 """Statistics for a BI Engine query
150
151 https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#bienginestatistics
152 """
153
154 mode: str = "ACCELERATION_MODE_UNSPECIFIED"
155 """ Specifies which mode of BI Engine acceleration was performed (if any)
156 """
157
158 reasons: List[BiEngineReason] = []
159 """ Contains explanatory messages in case of DISABLED / PARTIAL acceleration
160 """
161
162 @classmethod
163 def from_api_repr(cls, stats: Dict[str, Any]) -> "BiEngineStats":
164 mode = stats.get("biEngineMode", "ACCELERATION_MODE_UNSPECIFIED")
165 reasons = [
166 BiEngineReason.from_api_repr(r) for r in stats.get("biEngineReasons", [])
167 ]
168 return cls(mode, reasons)
169
170
171class DmlStats(typing.NamedTuple):
172 """Detailed statistics for DML statements.
173
174 https://cloud.google.com/bigquery/docs/reference/rest/v2/DmlStats
175 """
176
177 inserted_row_count: int = 0
178 """Number of inserted rows. Populated by DML INSERT and MERGE statements."""
179
180 deleted_row_count: int = 0
181 """Number of deleted rows. populated by DML DELETE, MERGE and TRUNCATE statements.
182 """
183
184 updated_row_count: int = 0
185 """Number of updated rows. Populated by DML UPDATE and MERGE statements."""
186
187 @classmethod
188 def from_api_repr(cls, stats: Dict[str, str]) -> "DmlStats":
189 # NOTE: The field order here must match the order of fields set at the
190 # class level.
191 api_fields = ("insertedRowCount", "deletedRowCount", "updatedRowCount")
192
193 args = (
194 int(stats.get(api_field, default_val))
195 for api_field, default_val in zip(api_fields, cls.__new__.__defaults__) # type: ignore
196 )
197 return cls(*args)
198
199
200class IncrementalResultStats:
201 """IncrementalResultStats provides information about incremental query execution."""
202
203 def __init__(self):
204 self._properties = {}
205
206 @classmethod
207 def from_api_repr(cls, resource) -> "IncrementalResultStats":
208 """Factory: construct instance from the JSON repr.
209
210 Args:
211 resource(Dict[str: object]):
212 IncrementalResultStats representation returned from API.
213
214 Returns:
215 google.cloud.bigquery.job.IncrementalResultStats:
216 stats parsed from ``resource``.
217 """
218 entry = cls()
219 entry._properties = resource
220 return entry
221
222 @property
223 def disabled_reason(self):
224 """Optional[string]: Reason why incremental results were not
225 written by the query.
226 """
227 return _helpers._str_or_none(self._properties.get("disabledReason"))
228
229 @property
230 def result_set_last_replace_time(self):
231 """Optional[datetime]: The time at which the result table's contents
232 were completely replaced. May be absent if no results have been written
233 or the query has completed."""
234 from google.cloud._helpers import _rfc3339_nanos_to_datetime
235
236 value = self._properties.get("resultSetLastReplaceTime")
237 if value:
238 try:
239 return _rfc3339_nanos_to_datetime(value)
240 except ValueError:
241 pass
242 return None
243
244 @property
245 def result_set_last_modify_time(self):
246 """Optional[datetime]: The time at which the result table's contents
247 were modified. May be absent if no results have been written or the
248 query has completed."""
249 from google.cloud._helpers import _rfc3339_nanos_to_datetime
250
251 value = self._properties.get("resultSetLastModifyTime")
252 if value:
253 try:
254 return _rfc3339_nanos_to_datetime(value)
255 except ValueError:
256 pass
257 return None
258
259
260class IndexUnusedReason(typing.NamedTuple):
261 """Reason about why no search index was used in the search query (or sub-query).
262
263 https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#indexunusedreason
264 """
265
266 code: Optional[str] = None
267 """Specifies the high-level reason for the scenario when no search index was used.
268 """
269
270 message: Optional[str] = None
271 """Free form human-readable reason for the scenario when no search index was used.
272 """
273
274 baseTable: Optional[TableReference] = None
275 """Specifies the base table involved in the reason that no search index was used.
276 """
277
278 indexName: Optional[str] = None
279 """Specifies the name of the unused search index, if available."""
280
281 @classmethod
282 def from_api_repr(cls, reason):
283 code = reason.get("code")
284 message = reason.get("message")
285 baseTable = reason.get("baseTable")
286 indexName = reason.get("indexName")
287
288 return cls(code, message, baseTable, indexName)
289
290
291class SearchStats(typing.NamedTuple):
292 """Statistics related to Search Queries. Populated as part of JobStatistics2.
293
294 https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#searchstatistics
295 """
296
297 mode: Optional[str] = None
298 """Indicates the type of search index usage in the entire search query."""
299
300 reason: List[IndexUnusedReason] = []
301 """Reason about why no search index was used in the search query (or sub-query)"""
302
303 @classmethod
304 def from_api_repr(cls, stats: Dict[str, Any]):
305 mode = stats.get("indexUsageMode", None)
306 reason = [
307 IndexUnusedReason.from_api_repr(r)
308 for r in stats.get("indexUnusedReasons", [])
309 ]
310 return cls(mode, reason)
311
312
313class ScriptOptions:
314 """Options controlling the execution of scripts.
315
316 https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#ScriptOptions
317 """
318
319 def __init__(
320 self,
321 statement_timeout_ms: Optional[int] = None,
322 statement_byte_budget: Optional[int] = None,
323 key_result_statement: Optional[KeyResultStatementKind] = None,
324 ):
325 self._properties: Dict[str, Any] = {}
326 self.statement_timeout_ms = statement_timeout_ms
327 self.statement_byte_budget = statement_byte_budget
328 self.key_result_statement = key_result_statement
329
330 @classmethod
331 def from_api_repr(cls, resource: Dict[str, Any]) -> "ScriptOptions":
332 """Factory: construct instance from the JSON repr.
333
334 Args:
335 resource(Dict[str: Any]):
336 ScriptOptions representation returned from API.
337
338 Returns:
339 google.cloud.bigquery.ScriptOptions:
340 ScriptOptions sample parsed from ``resource``.
341 """
342 entry = cls()
343 entry._properties = copy.deepcopy(resource)
344 return entry
345
346 def to_api_repr(self) -> Dict[str, Any]:
347 """Construct the API resource representation."""
348 return copy.deepcopy(self._properties)
349
350 @property
351 def statement_timeout_ms(self) -> Union[int, None]:
352 """Timeout period for each statement in a script."""
353 return _helpers._int_or_none(self._properties.get("statementTimeoutMs"))
354
355 @statement_timeout_ms.setter
356 def statement_timeout_ms(self, value: Union[int, None]):
357 new_value = None if value is None else str(value)
358 self._properties["statementTimeoutMs"] = new_value
359
360 @property
361 def statement_byte_budget(self) -> Union[int, None]:
362 """Limit on the number of bytes billed per statement.
363
364 Exceeding this budget results in an error.
365 """
366 return _helpers._int_or_none(self._properties.get("statementByteBudget"))
367
368 @statement_byte_budget.setter
369 def statement_byte_budget(self, value: Union[int, None]):
370 new_value = None if value is None else str(value)
371 self._properties["statementByteBudget"] = new_value
372
373 @property
374 def key_result_statement(self) -> Union[KeyResultStatementKind, None]:
375 """Determines which statement in the script represents the "key result".
376
377 This is used to populate the schema and query results of the script job.
378 Default is ``KeyResultStatementKind.LAST``.
379 """
380 return self._properties.get("keyResultStatement")
381
382 @key_result_statement.setter
383 def key_result_statement(self, value: Union[KeyResultStatementKind, None]):
384 self._properties["keyResultStatement"] = value
385
386
387class QueryJobConfig(_JobConfig):
388 """Configuration options for query jobs.
389
390 All properties in this class are optional. Values which are :data:`None` ->
391 server defaults. Set properties on the constructed configuration by using
392 the property name as the name of a keyword argument.
393 """
394
395 def __init__(self, **kwargs) -> None:
396 super(QueryJobConfig, self).__init__("query", **kwargs)
397
398 @property
399 def destination_encryption_configuration(self):
400 """google.cloud.bigquery.encryption_configuration.EncryptionConfiguration: Custom
401 encryption configuration for the destination table.
402
403 Custom encryption configuration (e.g., Cloud KMS keys) or :data:`None`
404 if using default encryption.
405
406 See
407 https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobConfigurationQuery.FIELDS.destination_encryption_configuration
408 """
409 prop = self._get_sub_prop("destinationEncryptionConfiguration")
410 if prop is not None:
411 prop = EncryptionConfiguration.from_api_repr(prop)
412 return prop
413
414 @destination_encryption_configuration.setter
415 def destination_encryption_configuration(self, value):
416 api_repr = value
417 if value is not None:
418 api_repr = value.to_api_repr()
419 self._set_sub_prop("destinationEncryptionConfiguration", api_repr)
420
421 @property
422 def allow_large_results(self):
423 """bool: Allow large query results tables (legacy SQL, only)
424
425 See
426 https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobConfigurationQuery.FIELDS.allow_large_results
427 """
428 return self._get_sub_prop("allowLargeResults")
429
430 @allow_large_results.setter
431 def allow_large_results(self, value):
432 self._set_sub_prop("allowLargeResults", value)
433
434 @property
435 def connection_properties(self) -> List[ConnectionProperty]:
436 """Connection properties.
437
438 See
439 https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobConfigurationQuery.FIELDS.connection_properties
440
441 .. versionadded:: 2.29.0
442 """
443 resource = self._get_sub_prop("connectionProperties", [])
444 return [ConnectionProperty.from_api_repr(prop) for prop in resource]
445
446 @connection_properties.setter
447 def connection_properties(self, value: Iterable[ConnectionProperty]):
448 self._set_sub_prop(
449 "connectionProperties",
450 [prop.to_api_repr() for prop in value],
451 )
452
453 @property
454 def create_disposition(self):
455 """google.cloud.bigquery.job.CreateDisposition: Specifies behavior
456 for creating tables.
457
458 See
459 https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobConfigurationQuery.FIELDS.create_disposition
460 """
461 return self._get_sub_prop("createDisposition")
462
463 @create_disposition.setter
464 def create_disposition(self, value):
465 self._set_sub_prop("createDisposition", value)
466
467 @property
468 def create_session(self) -> Optional[bool]:
469 """[Preview] If :data:`True`, creates a new session, where
470 :attr:`~google.cloud.bigquery.job.QueryJob.session_info` will contain a
471 random server generated session id.
472
473 If :data:`False`, runs query with an existing ``session_id`` passed in
474 :attr:`~google.cloud.bigquery.job.QueryJobConfig.connection_properties`,
475 otherwise runs query in non-session mode.
476
477 See
478 https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobConfigurationQuery.FIELDS.create_session
479
480 .. versionadded:: 2.29.0
481 """
482 return self._get_sub_prop("createSession")
483
484 @create_session.setter
485 def create_session(self, value: Optional[bool]):
486 self._set_sub_prop("createSession", value)
487
488 @property
489 def default_dataset(self):
490 """google.cloud.bigquery.dataset.DatasetReference: the default dataset
491 to use for unqualified table names in the query or :data:`None` if not
492 set.
493
494 The ``default_dataset`` setter accepts:
495
496 - a :class:`~google.cloud.bigquery.dataset.Dataset`, or
497 - a :class:`~google.cloud.bigquery.dataset.DatasetReference`, or
498 - a :class:`str` of the fully-qualified dataset ID in standard SQL
499 format. The value must included a project ID and dataset ID
500 separated by ``.``. For example: ``your-project.your_dataset``.
501
502 See
503 https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobConfigurationQuery.FIELDS.default_dataset
504 """
505 prop = self._get_sub_prop("defaultDataset")
506 if prop is not None:
507 prop = DatasetReference.from_api_repr(prop)
508 return prop
509
510 @default_dataset.setter
511 def default_dataset(self, value):
512 if value is None:
513 self._set_sub_prop("defaultDataset", None)
514 return
515
516 if isinstance(value, str):
517 value = DatasetReference.from_string(value)
518
519 if isinstance(value, (Dataset, DatasetListItem)):
520 value = value.reference
521
522 resource = value.to_api_repr()
523 self._set_sub_prop("defaultDataset", resource)
524
525 @property
526 def destination(self):
527 """google.cloud.bigquery.table.TableReference: table where results are
528 written or :data:`None` if not set.
529
530 The ``destination`` setter accepts:
531
532 - a :class:`~google.cloud.bigquery.table.Table`, or
533 - a :class:`~google.cloud.bigquery.table.TableReference`, or
534 - a :class:`str` of the fully-qualified table ID in standard SQL
535 format. The value must included a project ID, dataset ID, and table
536 ID, each separated by ``.``. For example:
537 ``your-project.your_dataset.your_table``.
538
539 .. note::
540
541 Only table ID is passed to the backend, so any configuration
542 in `~google.cloud.bigquery.table.Table` is discarded.
543
544 See
545 https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobConfigurationQuery.FIELDS.destination_table
546 """
547 prop = self._get_sub_prop("destinationTable")
548 if prop is not None:
549 prop = TableReference.from_api_repr(prop)
550 return prop
551
552 @destination.setter
553 def destination(self, value):
554 if value is None:
555 self._set_sub_prop("destinationTable", None)
556 return
557
558 value = _table_arg_to_table_ref(value)
559 resource = value.to_api_repr()
560 self._set_sub_prop("destinationTable", resource)
561
562 @property
563 def dry_run(self):
564 """bool: :data:`True` if this query should be a dry run to estimate
565 costs.
566
567 See
568 https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobConfiguration.FIELDS.dry_run
569 """
570 return self._properties.get("dryRun")
571
572 @dry_run.setter
573 def dry_run(self, value):
574 self._properties["dryRun"] = value
575
576 @property
577 def flatten_results(self):
578 """bool: Flatten nested/repeated fields in results. (Legacy SQL only)
579
580 See
581 https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobConfigurationQuery.FIELDS.flatten_results
582 """
583 return self._get_sub_prop("flattenResults")
584
585 @flatten_results.setter
586 def flatten_results(self, value):
587 self._set_sub_prop("flattenResults", value)
588
589 @property
590 def maximum_billing_tier(self):
591 """int: Deprecated. Changes the billing tier to allow high-compute
592 queries.
593
594 See
595 https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobConfigurationQuery.FIELDS.maximum_billing_tier
596 """
597 return self._get_sub_prop("maximumBillingTier")
598
599 @maximum_billing_tier.setter
600 def maximum_billing_tier(self, value):
601 self._set_sub_prop("maximumBillingTier", value)
602
603 @property
604 def maximum_bytes_billed(self):
605 """int: Maximum bytes to be billed for this job or :data:`None` if not set.
606
607 See
608 https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobConfigurationQuery.FIELDS.maximum_bytes_billed
609 """
610 return _helpers._int_or_none(self._get_sub_prop("maximumBytesBilled"))
611
612 @maximum_bytes_billed.setter
613 def maximum_bytes_billed(self, value):
614 self._set_sub_prop("maximumBytesBilled", str(value))
615
616 @property
617 def priority(self):
618 """google.cloud.bigquery.job.QueryPriority: Priority of the query.
619
620 See
621 https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobConfigurationQuery.FIELDS.priority
622 """
623 return self._get_sub_prop("priority")
624
625 @priority.setter
626 def priority(self, value):
627 self._set_sub_prop("priority", value)
628
629 @property
630 def query_parameters(self):
631 """List[Union[google.cloud.bigquery.query.ArrayQueryParameter, \
632 google.cloud.bigquery.query.ScalarQueryParameter, \
633 google.cloud.bigquery.query.StructQueryParameter]]: list of parameters
634 for parameterized query (empty by default)
635
636 See:
637 https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobConfigurationQuery.FIELDS.query_parameters
638 """
639 prop = self._get_sub_prop("queryParameters", default=[])
640 return _from_api_repr_query_parameters(prop)
641
642 @query_parameters.setter
643 def query_parameters(self, values):
644 self._set_sub_prop("queryParameters", _to_api_repr_query_parameters(values))
645
646 @property
647 def range_partitioning(self):
648 """Optional[google.cloud.bigquery.table.RangePartitioning]:
649 Configures range-based partitioning for destination table.
650
651 .. note::
652 **Beta**. The integer range partitioning feature is in a
653 pre-release state and might change or have limited support.
654
655 Only specify at most one of
656 :attr:`~google.cloud.bigquery.job.LoadJobConfig.time_partitioning` or
657 :attr:`~google.cloud.bigquery.job.LoadJobConfig.range_partitioning`.
658
659 Raises:
660 ValueError:
661 If the value is not
662 :class:`~google.cloud.bigquery.table.RangePartitioning` or
663 :data:`None`.
664 """
665 resource = self._get_sub_prop("rangePartitioning")
666 if resource is not None:
667 return RangePartitioning(_properties=resource)
668
669 @range_partitioning.setter
670 def range_partitioning(self, value):
671 resource = value
672 if isinstance(value, RangePartitioning):
673 resource = value._properties
674 elif value is not None:
675 raise ValueError(
676 "Expected value to be RangePartitioning or None, got {}.".format(value)
677 )
678 self._set_sub_prop("rangePartitioning", resource)
679
680 @property
681 def udf_resources(self):
682 """List[google.cloud.bigquery.query.UDFResource]: user
683 defined function resources (empty by default)
684
685 See:
686 https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobConfigurationQuery.FIELDS.user_defined_function_resources
687 """
688 prop = self._get_sub_prop("userDefinedFunctionResources", default=[])
689 return _from_api_repr_udf_resources(prop)
690
691 @udf_resources.setter
692 def udf_resources(self, values):
693 self._set_sub_prop(
694 "userDefinedFunctionResources", _to_api_repr_udf_resources(values)
695 )
696
697 @property
698 def use_legacy_sql(self):
699 """bool: Use legacy SQL syntax.
700
701 See
702 https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobConfigurationQuery.FIELDS.use_legacy_sql
703 """
704 return self._get_sub_prop("useLegacySql")
705
706 @use_legacy_sql.setter
707 def use_legacy_sql(self, value):
708 self._set_sub_prop("useLegacySql", value)
709
710 @property
711 def use_query_cache(self):
712 """bool: Look for the query result in the cache.
713
714 See
715 https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobConfigurationQuery.FIELDS.use_query_cache
716 """
717 return self._get_sub_prop("useQueryCache")
718
719 @use_query_cache.setter
720 def use_query_cache(self, value):
721 self._set_sub_prop("useQueryCache", value)
722
723 @property
724 def write_disposition(self):
725 """google.cloud.bigquery.job.WriteDisposition: Action that occurs if
726 the destination table already exists.
727
728 See
729 https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobConfigurationQuery.FIELDS.write_disposition
730 """
731 return self._get_sub_prop("writeDisposition")
732
733 @write_disposition.setter
734 def write_disposition(self, value):
735 self._set_sub_prop("writeDisposition", value)
736
737 @property
738 def write_incremental_results(self) -> Optional[bool]:
739 """This is only supported for a SELECT query using a temporary table.
740
741 If set, the query is allowed to write results incrementally to the temporary result
742 table. This may incur a performance penalty. This option cannot be used with Legacy SQL.
743
744 This feature is not generally available.
745 """
746 return self._get_sub_prop("writeIncrementalResults")
747
748 @write_incremental_results.setter
749 def write_incremental_results(self, value):
750 self._set_sub_prop("writeIncrementalResults", value)
751
752 @property
753 def table_definitions(self):
754 """Dict[str, google.cloud.bigquery.external_config.ExternalConfig]:
755 Definitions for external tables or :data:`None` if not set.
756
757 See
758 https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobConfigurationQuery.FIELDS.external_table_definitions
759 """
760 prop = self._get_sub_prop("tableDefinitions")
761 if prop is not None:
762 prop = _from_api_repr_table_defs(prop)
763 return prop
764
765 @table_definitions.setter
766 def table_definitions(self, values):
767 self._set_sub_prop("tableDefinitions", _to_api_repr_table_defs(values))
768
769 @property
770 def time_partitioning(self):
771 """Optional[google.cloud.bigquery.table.TimePartitioning]: Specifies
772 time-based partitioning for the destination table.
773
774 Only specify at most one of
775 :attr:`~google.cloud.bigquery.job.LoadJobConfig.time_partitioning` or
776 :attr:`~google.cloud.bigquery.job.LoadJobConfig.range_partitioning`.
777
778 Raises:
779 ValueError:
780 If the value is not
781 :class:`~google.cloud.bigquery.table.TimePartitioning` or
782 :data:`None`.
783 """
784 prop = self._get_sub_prop("timePartitioning")
785 if prop is not None:
786 prop = TimePartitioning.from_api_repr(prop)
787 return prop
788
789 @time_partitioning.setter
790 def time_partitioning(self, value):
791 api_repr = value
792 if value is not None:
793 api_repr = value.to_api_repr()
794 self._set_sub_prop("timePartitioning", api_repr)
795
796 @property
797 def clustering_fields(self):
798 """Optional[List[str]]: Fields defining clustering for the table
799
800 (Defaults to :data:`None`).
801
802 Clustering fields are immutable after table creation.
803
804 .. note::
805
806 BigQuery supports clustering for both partitioned and
807 non-partitioned tables.
808 """
809 prop = self._get_sub_prop("clustering")
810 if prop is not None:
811 return list(prop.get("fields", ()))
812
813 @clustering_fields.setter
814 def clustering_fields(self, value):
815 """Optional[List[str]]: Fields defining clustering for the table
816
817 (Defaults to :data:`None`).
818 """
819 if value is not None:
820 self._set_sub_prop("clustering", {"fields": value})
821 else:
822 self._del_sub_prop("clustering")
823
824 @property
825 def schema_update_options(self):
826 """List[google.cloud.bigquery.job.SchemaUpdateOption]: Specifies
827 updates to the destination table schema to allow as a side effect of
828 the query job.
829 """
830 return self._get_sub_prop("schemaUpdateOptions")
831
832 @schema_update_options.setter
833 def schema_update_options(self, values):
834 self._set_sub_prop("schemaUpdateOptions", values)
835
836 @property
837 def script_options(self) -> ScriptOptions:
838 """Options controlling the execution of scripts.
839
840 https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#scriptoptions
841 """
842 prop = self._get_sub_prop("scriptOptions")
843 if prop is not None:
844 prop = ScriptOptions.from_api_repr(prop)
845 return prop
846
847 @script_options.setter
848 def script_options(self, value: Union[ScriptOptions, None]):
849 new_value = None if value is None else value.to_api_repr()
850 self._set_sub_prop("scriptOptions", new_value)
851
852 def to_api_repr(self) -> dict:
853 """Build an API representation of the query job config.
854
855 Returns:
856 Dict: A dictionary in the format used by the BigQuery API.
857 """
858 resource = copy.deepcopy(self._properties)
859 # Query parameters have an addition property associated with them
860 # to indicate if the query is using named or positional parameters.
861 query_parameters = resource.get("query", {}).get("queryParameters")
862 if query_parameters:
863 if query_parameters[0].get("name") is None:
864 resource["query"]["parameterMode"] = "POSITIONAL"
865 else:
866 resource["query"]["parameterMode"] = "NAMED"
867
868 return resource
869
870
871class QueryJob(_AsyncJob):
872 """Asynchronous job: query tables.
873
874 Args:
875 job_id (str): the job's ID, within the project belonging to ``client``.
876
877 query (str): SQL query string.
878
879 client (google.cloud.bigquery.client.Client):
880 A client which holds credentials and project configuration
881 for the dataset (which requires a project).
882
883 job_config (Optional[google.cloud.bigquery.job.QueryJobConfig]):
884 Extra configuration options for the query job.
885 """
886
887 _JOB_TYPE = "query"
888 _UDF_KEY = "userDefinedFunctionResources"
889 _CONFIG_CLASS = QueryJobConfig
890
891 def __init__(self, job_id, query, client, job_config=None):
892 super(QueryJob, self).__init__(job_id, client)
893
894 if job_config is not None:
895 self._properties["configuration"] = job_config._properties
896 if self.configuration.use_legacy_sql is None:
897 self.configuration.use_legacy_sql = False
898
899 if query:
900 _helpers._set_sub_prop(
901 self._properties, ["configuration", "query", "query"], query
902 )
903 self._query_results = None
904 self._done_timeout = None
905 self._transport_timeout = None
906
907 @property
908 def allow_large_results(self):
909 """See
910 :attr:`google.cloud.bigquery.job.QueryJobConfig.allow_large_results`.
911 """
912 return self.configuration.allow_large_results
913
914 @property
915 def configuration(self) -> QueryJobConfig:
916 """The configuration for this query job."""
917 return typing.cast(QueryJobConfig, super().configuration)
918
919 @property
920 def connection_properties(self) -> List[ConnectionProperty]:
921 """See
922 :attr:`google.cloud.bigquery.job.QueryJobConfig.connection_properties`.
923
924 .. versionadded:: 2.29.0
925 """
926 return self.configuration.connection_properties
927
928 @property
929 def create_disposition(self):
930 """See
931 :attr:`google.cloud.bigquery.job.QueryJobConfig.create_disposition`.
932 """
933 return self.configuration.create_disposition
934
935 @property
936 def create_session(self) -> Optional[bool]:
937 """See
938 :attr:`google.cloud.bigquery.job.QueryJobConfig.create_session`.
939
940 .. versionadded:: 2.29.0
941 """
942 return self.configuration.create_session
943
944 @property
945 def default_dataset(self):
946 """See
947 :attr:`google.cloud.bigquery.job.QueryJobConfig.default_dataset`.
948 """
949 return self.configuration.default_dataset
950
951 @property
952 def destination(self):
953 """See
954 :attr:`google.cloud.bigquery.job.QueryJobConfig.destination`.
955 """
956 return self.configuration.destination
957
958 @property
959 def destination_encryption_configuration(self):
960 """google.cloud.bigquery.encryption_configuration.EncryptionConfiguration: Custom
961 encryption configuration for the destination table.
962
963 Custom encryption configuration (e.g., Cloud KMS keys) or :data:`None`
964 if using default encryption.
965
966 See
967 :attr:`google.cloud.bigquery.job.QueryJobConfig.destination_encryption_configuration`.
968 """
969 return self.configuration.destination_encryption_configuration
970
971 @property
972 def dry_run(self):
973 """See
974 :attr:`google.cloud.bigquery.job.QueryJobConfig.dry_run`.
975 """
976 return self.configuration.dry_run
977
978 @property
979 def flatten_results(self):
980 """See
981 :attr:`google.cloud.bigquery.job.QueryJobConfig.flatten_results`.
982 """
983 return self.configuration.flatten_results
984
985 @property
986 def priority(self):
987 """See
988 :attr:`google.cloud.bigquery.job.QueryJobConfig.priority`.
989 """
990 return self.configuration.priority
991
992 @property
993 def search_stats(self) -> Optional[SearchStats]:
994 """Returns a SearchStats object."""
995
996 stats = self._job_statistics().get("searchStatistics")
997 if stats is not None:
998 return SearchStats.from_api_repr(stats)
999 return None
1000
1001 @property
1002 def query(self):
1003 """str: The query text used in this query job.
1004
1005 See:
1006 https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobConfigurationQuery.FIELDS.query
1007 """
1008 return _helpers._get_sub_prop(
1009 self._properties, ["configuration", "query", "query"]
1010 )
1011
1012 @property
1013 def query_id(self) -> Optional[str]:
1014 """[Preview] ID of a completed query.
1015
1016 This ID is auto-generated and not guaranteed to be populated.
1017 """
1018 query_results = self._query_results
1019 return query_results.query_id if query_results is not None else None
1020
1021 @property
1022 def query_parameters(self):
1023 """See
1024 :attr:`google.cloud.bigquery.job.QueryJobConfig.query_parameters`.
1025 """
1026 return self.configuration.query_parameters
1027
1028 @property
1029 def udf_resources(self):
1030 """See
1031 :attr:`google.cloud.bigquery.job.QueryJobConfig.udf_resources`.
1032 """
1033 return self.configuration.udf_resources
1034
1035 @property
1036 def use_legacy_sql(self):
1037 """See
1038 :attr:`google.cloud.bigquery.job.QueryJobConfig.use_legacy_sql`.
1039 """
1040 return self.configuration.use_legacy_sql
1041
1042 @property
1043 def use_query_cache(self):
1044 """See
1045 :attr:`google.cloud.bigquery.job.QueryJobConfig.use_query_cache`.
1046 """
1047 return self.configuration.use_query_cache
1048
1049 @property
1050 def write_disposition(self):
1051 """See
1052 :attr:`google.cloud.bigquery.job.QueryJobConfig.write_disposition`.
1053 """
1054 return self.configuration.write_disposition
1055
1056 @property
1057 def maximum_billing_tier(self):
1058 """See
1059 :attr:`google.cloud.bigquery.job.QueryJobConfig.maximum_billing_tier`.
1060 """
1061 return self.configuration.maximum_billing_tier
1062
1063 @property
1064 def maximum_bytes_billed(self):
1065 """See
1066 :attr:`google.cloud.bigquery.job.QueryJobConfig.maximum_bytes_billed`.
1067 """
1068 return self.configuration.maximum_bytes_billed
1069
1070 @property
1071 def range_partitioning(self):
1072 """See
1073 :attr:`google.cloud.bigquery.job.QueryJobConfig.range_partitioning`.
1074 """
1075 return self.configuration.range_partitioning
1076
1077 @property
1078 def table_definitions(self):
1079 """See
1080 :attr:`google.cloud.bigquery.job.QueryJobConfig.table_definitions`.
1081 """
1082 return self.configuration.table_definitions
1083
1084 @property
1085 def time_partitioning(self):
1086 """See
1087 :attr:`google.cloud.bigquery.job.QueryJobConfig.time_partitioning`.
1088 """
1089 return self.configuration.time_partitioning
1090
1091 @property
1092 def clustering_fields(self):
1093 """See
1094 :attr:`google.cloud.bigquery.job.QueryJobConfig.clustering_fields`.
1095 """
1096 return self.configuration.clustering_fields
1097
1098 @property
1099 def schema_update_options(self):
1100 """See
1101 :attr:`google.cloud.bigquery.job.QueryJobConfig.schema_update_options`.
1102 """
1103 return self.configuration.schema_update_options
1104
1105 def to_api_repr(self):
1106 """Generate a resource for :meth:`_begin`."""
1107 # Use to_api_repr to allow for some configuration properties to be set
1108 # automatically.
1109 configuration = self.configuration.to_api_repr()
1110 return {
1111 "jobReference": self._properties["jobReference"],
1112 "configuration": configuration,
1113 }
1114
1115 @classmethod
1116 def from_api_repr(cls, resource: dict, client: "Client") -> "QueryJob":
1117 """Factory: construct a job given its API representation
1118
1119 Args:
1120 resource (Dict): dataset job representation returned from the API
1121
1122 client (google.cloud.bigquery.client.Client):
1123 Client which holds credentials and project
1124 configuration for the dataset.
1125
1126 Returns:
1127 google.cloud.bigquery.job.QueryJob: Job parsed from ``resource``.
1128 """
1129 job_ref_properties = resource.setdefault(
1130 "jobReference", {"projectId": client.project, "jobId": None}
1131 )
1132 job_ref = _JobReference._from_api_repr(job_ref_properties)
1133 job = cls(job_ref, None, client=client)
1134 job._set_properties(resource)
1135 return job
1136
1137 @property
1138 def query_plan(self):
1139 """Return query plan from job statistics, if present.
1140
1141 See:
1142 https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobStatistics2.FIELDS.query_plan
1143
1144 Returns:
1145 List[google.cloud.bigquery.job.QueryPlanEntry]:
1146 mappings describing the query plan, or an empty list
1147 if the query has not yet completed.
1148 """
1149 plan_entries = self._job_statistics().get("queryPlan", ())
1150 return [QueryPlanEntry.from_api_repr(entry) for entry in plan_entries]
1151
1152 @property
1153 def schema(self) -> Optional[List[SchemaField]]:
1154 """The schema of the results.
1155
1156 Present only for successful dry run of non-legacy SQL queries.
1157 """
1158 resource = self._job_statistics().get("schema")
1159 if resource is None:
1160 return None
1161 fields = resource.get("fields", [])
1162 return [SchemaField.from_api_repr(field) for field in fields]
1163
1164 @property
1165 def timeline(self):
1166 """List(TimelineEntry): Return the query execution timeline
1167 from job statistics.
1168 """
1169 raw = self._job_statistics().get("timeline", ())
1170 return [TimelineEntry.from_api_repr(entry) for entry in raw]
1171
1172 @property
1173 def total_bytes_processed(self):
1174 """Return total bytes processed from job statistics, if present.
1175
1176 See:
1177 https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobStatistics2.FIELDS.total_bytes_processed
1178
1179 Returns:
1180 Optional[int]:
1181 Total bytes processed by the job, or None if job is not
1182 yet complete.
1183 """
1184 result = self._job_statistics().get("totalBytesProcessed")
1185 if result is not None:
1186 result = int(result)
1187 return result
1188
1189 @property
1190 def total_bytes_billed(self):
1191 """Return total bytes billed from job statistics, if present.
1192
1193 See:
1194 https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobStatistics2.FIELDS.total_bytes_billed
1195
1196 Returns:
1197 Optional[int]:
1198 Total bytes processed by the job, or None if job is not
1199 yet complete.
1200 """
1201 result = self._job_statistics().get("totalBytesBilled")
1202 if result is not None:
1203 result = int(result)
1204 return result
1205
1206 @property
1207 def billing_tier(self):
1208 """Return billing tier from job statistics, if present.
1209
1210 See:
1211 https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobStatistics2.FIELDS.billing_tier
1212
1213 Returns:
1214 Optional[int]:
1215 Billing tier used by the job, or None if job is not
1216 yet complete.
1217 """
1218 return self._job_statistics().get("billingTier")
1219
1220 @property
1221 def cache_hit(self):
1222 """Return whether or not query results were served from cache.
1223
1224 See:
1225 https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobStatistics2.FIELDS.cache_hit
1226
1227 Returns:
1228 Optional[bool]:
1229 whether the query results were returned from cache, or None
1230 if job is not yet complete.
1231 """
1232 return self._job_statistics().get("cacheHit")
1233
1234 @property
1235 def ddl_operation_performed(self):
1236 """Optional[str]: Return the DDL operation performed.
1237
1238 See:
1239 https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobStatistics2.FIELDS.ddl_operation_performed
1240
1241 """
1242 return self._job_statistics().get("ddlOperationPerformed")
1243
1244 @property
1245 def ddl_target_routine(self):
1246 """Optional[google.cloud.bigquery.routine.RoutineReference]: Return the DDL target routine, present
1247 for CREATE/DROP FUNCTION/PROCEDURE queries.
1248
1249 See:
1250 https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobStatistics2.FIELDS.ddl_target_routine
1251 """
1252 prop = self._job_statistics().get("ddlTargetRoutine")
1253 if prop is not None:
1254 prop = RoutineReference.from_api_repr(prop)
1255 return prop
1256
1257 @property
1258 def ddl_target_table(self):
1259 """Optional[google.cloud.bigquery.table.TableReference]: Return the DDL target table, present
1260 for CREATE/DROP TABLE/VIEW queries.
1261
1262 See:
1263 https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobStatistics2.FIELDS.ddl_target_table
1264 """
1265 prop = self._job_statistics().get("ddlTargetTable")
1266 if prop is not None:
1267 prop = TableReference.from_api_repr(prop)
1268 return prop
1269
1270 @property
1271 def num_dml_affected_rows(self) -> Optional[int]:
1272 """Return the number of DML rows affected by the job.
1273
1274 See:
1275 https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobStatistics2.FIELDS.num_dml_affected_rows
1276
1277 Returns:
1278 Optional[int]:
1279 number of DML rows affected by the job, or None if job is not
1280 yet complete.
1281 """
1282 result = self._job_statistics().get("numDmlAffectedRows")
1283 if result is not None:
1284 result = int(result)
1285 return result
1286
1287 @property
1288 def slot_millis(self):
1289 """Union[int, None]: Slot-milliseconds used by this query job."""
1290 return _helpers._int_or_none(self._job_statistics().get("totalSlotMs"))
1291
1292 @property
1293 def statement_type(self):
1294 """Return statement type from job statistics, if present.
1295
1296 See:
1297 https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobStatistics2.FIELDS.statement_type
1298
1299 Returns:
1300 Optional[str]:
1301 type of statement used by the job, or None if job is not
1302 yet complete.
1303 """
1304 return self._job_statistics().get("statementType")
1305
1306 @property
1307 def referenced_tables(self):
1308 """Return referenced tables from job statistics, if present.
1309
1310 See:
1311 https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobStatistics2.FIELDS.referenced_tables
1312
1313 Returns:
1314 List[Dict]:
1315 mappings describing the query plan, or an empty list
1316 if the query has not yet completed.
1317 """
1318 tables = []
1319 datasets_by_project_name = {}
1320
1321 for table in self._job_statistics().get("referencedTables", ()):
1322 t_project = table["projectId"]
1323
1324 ds_id = table["datasetId"]
1325 t_dataset = datasets_by_project_name.get((t_project, ds_id))
1326 if t_dataset is None:
1327 t_dataset = DatasetReference(t_project, ds_id)
1328 datasets_by_project_name[(t_project, ds_id)] = t_dataset
1329
1330 t_name = table["tableId"]
1331 tables.append(t_dataset.table(t_name))
1332
1333 return tables
1334
1335 @property
1336 def undeclared_query_parameters(self):
1337 """Return undeclared query parameters from job statistics, if present.
1338
1339 See:
1340 https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobStatistics2.FIELDS.undeclared_query_parameters
1341
1342 Returns:
1343 List[Union[ \
1344 google.cloud.bigquery.query.ArrayQueryParameter, \
1345 google.cloud.bigquery.query.ScalarQueryParameter, \
1346 google.cloud.bigquery.query.StructQueryParameter \
1347 ]]:
1348 Undeclared parameters, or an empty list if the query has
1349 not yet completed.
1350 """
1351 parameters = []
1352 undeclared = self._job_statistics().get("undeclaredQueryParameters", ())
1353
1354 for parameter in undeclared:
1355 p_type = parameter["parameterType"]
1356
1357 if "arrayType" in p_type:
1358 klass = ArrayQueryParameter
1359 elif "structTypes" in p_type:
1360 klass = StructQueryParameter
1361 else:
1362 klass = ScalarQueryParameter
1363
1364 parameters.append(klass.from_api_repr(parameter))
1365
1366 return parameters
1367
1368 @property
1369 def estimated_bytes_processed(self):
1370 """Return the estimated number of bytes processed by the query.
1371
1372 See:
1373 https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobStatistics2.FIELDS.estimated_bytes_processed
1374
1375 Returns:
1376 Optional[int]:
1377 number of DML rows affected by the job, or None if job is not
1378 yet complete.
1379 """
1380 result = self._job_statistics().get("estimatedBytesProcessed")
1381 if result is not None:
1382 result = int(result)
1383 return result
1384
1385 @property
1386 def dml_stats(self) -> Optional[DmlStats]:
1387 stats = self._job_statistics().get("dmlStats")
1388 if stats is None:
1389 return None
1390 else:
1391 return DmlStats.from_api_repr(stats)
1392
1393 @property
1394 def bi_engine_stats(self) -> Optional[BiEngineStats]:
1395 stats = self._job_statistics().get("biEngineStatistics")
1396
1397 if stats is None:
1398 return None
1399 else:
1400 return BiEngineStats.from_api_repr(stats)
1401
1402 @property
1403 def incremental_result_stats(self) -> Optional[IncrementalResultStats]:
1404 stats = self._job_statistics().get("incrementalResultStats")
1405 if stats is None:
1406 return None
1407 return IncrementalResultStats.from_api_repr(stats)
1408
1409 def _blocking_poll(self, timeout=None, **kwargs):
1410 self._done_timeout = timeout
1411 self._transport_timeout = timeout
1412 super(QueryJob, self)._blocking_poll(timeout=timeout, **kwargs)
1413
1414 @staticmethod
1415 def _format_for_exception(message: str, query: str):
1416 """Format a query for the output in exception message.
1417
1418 Args:
1419 message (str): The original exception message.
1420 query (str): The SQL query to format.
1421
1422 Returns:
1423 str: A formatted query text.
1424 """
1425 template = "{message}\n\n{header}\n\n{ruler}\n{body}\n{ruler}"
1426
1427 lines = query.splitlines() if query is not None else [""]
1428 max_line_len = max(len(line) for line in lines)
1429
1430 header = "-----Query Job SQL Follows-----"
1431 header = "{:^{total_width}}".format(header, total_width=max_line_len + 5)
1432
1433 # Print out a "ruler" above and below the SQL so we can judge columns.
1434 # Left pad for the line numbers (4 digits plus ":").
1435 ruler = " |" + " . |" * (max_line_len // 10)
1436
1437 # Put line numbers next to the SQL.
1438 body = "\n".join(
1439 "{:4}:{}".format(n, line) for n, line in enumerate(lines, start=1)
1440 )
1441
1442 return template.format(message=message, header=header, ruler=ruler, body=body)
1443
1444 def _begin(self, client=None, retry=DEFAULT_RETRY, timeout=None):
1445 """API call: begin the job via a POST request
1446
1447 See
1448 https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/insert
1449
1450 Args:
1451 client (Optional[google.cloud.bigquery.client.Client]):
1452 The client to use. If not passed, falls back to the ``client``
1453 associated with the job object or``NoneType``.
1454 retry (Optional[google.api_core.retry.Retry]):
1455 How to retry the RPC.
1456 timeout (Optional[float]):
1457 The number of seconds to wait for the underlying HTTP transport
1458 before using ``retry``.
1459
1460 Raises:
1461 ValueError: If the job has already begun.
1462 """
1463
1464 try:
1465 super(QueryJob, self)._begin(client=client, retry=retry, timeout=timeout)
1466 except exceptions.GoogleAPICallError as exc:
1467 exc.message = _EXCEPTION_FOOTER_TEMPLATE.format(
1468 message=exc.message, location=self.location, job_id=self.job_id
1469 )
1470 exc.debug_message = self._format_for_exception(exc.message, self.query)
1471 exc.query_job = self
1472 raise
1473
1474 def _reload_query_results(
1475 self,
1476 retry: "retries.Retry" = DEFAULT_RETRY,
1477 timeout: Optional[float] = None,
1478 page_size: int = 0,
1479 start_index: Optional[int] = None,
1480 ):
1481 """Refresh the cached query results unless already cached and complete.
1482
1483 Args:
1484 retry (Optional[google.api_core.retry.Retry]):
1485 How to retry the call that retrieves query results.
1486 timeout (Optional[float]):
1487 The number of seconds to wait for the underlying HTTP transport
1488 before using ``retry``.
1489 page_size (int):
1490 Maximum number of rows in a single response. See maxResults in
1491 the jobs.getQueryResults REST API.
1492 start_index (Optional[int]):
1493 Zero-based index of the starting row. See startIndex in the
1494 jobs.getQueryResults REST API.
1495 """
1496 # Optimization: avoid a call to jobs.getQueryResults if it's already
1497 # been fetched, e.g. from jobs.query first page of results.
1498 if self._query_results and self._query_results.complete:
1499 return
1500
1501 # Since the API to getQueryResults can hang up to the timeout value
1502 # (default of 10 seconds), set the timeout parameter to ensure that
1503 # the timeout from the futures API is respected. See:
1504 # https://github.com/GoogleCloudPlatform/google-cloud-python/issues/4135
1505 timeout_ms = None
1506
1507 # Python_API_core, as part of a major rewrite of the deadline, timeout,
1508 # retry process sets the timeout value as a Python object().
1509 # Our system does not natively handle that and instead expects
1510 # either None or a numeric value. If passed a Python object, convert to
1511 # None.
1512 if type(self._done_timeout) is object: # pragma: NO COVER
1513 self._done_timeout = None
1514
1515 if self._done_timeout is not None: # pragma: NO COVER
1516 # Subtract a buffer for context switching, network latency, etc.
1517 api_timeout = self._done_timeout - _TIMEOUT_BUFFER_SECS
1518 api_timeout = max(min(api_timeout, 10), 0)
1519 self._done_timeout -= api_timeout
1520 self._done_timeout = max(0, self._done_timeout)
1521 timeout_ms = int(api_timeout * 1000)
1522
1523 # If an explicit timeout is not given, fall back to the transport timeout
1524 # stored in _blocking_poll() in the process of polling for job completion.
1525 if timeout is not None:
1526 transport_timeout = timeout
1527 else:
1528 transport_timeout = self._transport_timeout
1529
1530 # Handle PollingJob._DEFAULT_VALUE.
1531 if not isinstance(transport_timeout, (float, int)):
1532 transport_timeout = None
1533
1534 self._query_results = self._client._get_query_results(
1535 self.job_id,
1536 retry,
1537 project=self.project,
1538 timeout_ms=timeout_ms,
1539 location=self.location,
1540 timeout=transport_timeout,
1541 page_size=page_size,
1542 start_index=start_index,
1543 )
1544
1545 def result( # type: ignore # (incompatible with supertype)
1546 self,
1547 page_size: Optional[int] = None,
1548 max_results: Optional[int] = None,
1549 retry: Optional[retries.Retry] = DEFAULT_RETRY,
1550 timeout: Optional[Union[float, object]] = POLLING_DEFAULT_VALUE,
1551 start_index: Optional[int] = None,
1552 job_retry: Optional[retries.Retry] = DEFAULT_JOB_RETRY,
1553 ) -> Union["RowIterator", _EmptyRowIterator]:
1554 """Start the job and wait for it to complete and get the result.
1555
1556 Args:
1557 page_size (Optional[int]):
1558 The maximum number of rows in each page of results from this
1559 request. Non-positive values are ignored.
1560 max_results (Optional[int]):
1561 The maximum total number of rows from this request.
1562 retry (Optional[google.api_core.retry.Retry]):
1563 How to retry the call that retrieves rows. This only
1564 applies to making RPC calls. It isn't used to retry
1565 failed jobs. This has a reasonable default that
1566 should only be overridden with care. If the job state
1567 is ``DONE``, retrying is aborted early even if the
1568 results are not available, as this will not change
1569 anymore.
1570 timeout (Optional[Union[float, \
1571 google.api_core.future.polling.PollingFuture._DEFAULT_VALUE, \
1572 ]]):
1573 The number of seconds to wait for the underlying HTTP transport
1574 before using ``retry``. If ``None``, wait indefinitely
1575 unless an error is returned. If unset, only the
1576 underlying API calls have their default timeouts, but we still
1577 wait indefinitely for the job to finish.
1578 start_index (Optional[int]):
1579 The zero-based index of the starting row to read.
1580 job_retry (Optional[google.api_core.retry.Retry]):
1581 How to retry failed jobs. The default retries
1582 rate-limit-exceeded errors. Passing ``None`` disables
1583 job retry.
1584
1585 Not all jobs can be retried. If ``job_id`` was
1586 provided to the query that created this job, then the
1587 job returned by the query will not be retryable, and
1588 an exception will be raised if non-``None``
1589 non-default ``job_retry`` is also provided.
1590
1591 Returns:
1592 google.cloud.bigquery.table.RowIterator:
1593 Iterator of row data
1594 :class:`~google.cloud.bigquery.table.Row`-s. During each
1595 page, the iterator will have the ``total_rows`` attribute
1596 set, which counts the total number of rows **in the result
1597 set** (this is distinct from the total number of rows in the
1598 current page: ``iterator.page.num_items``).
1599
1600 If the query is a special query that produces no results, e.g.
1601 a DDL query, an ``_EmptyRowIterator`` instance is returned.
1602
1603 Raises:
1604 google.api_core.exceptions.GoogleAPICallError:
1605 If the job failed and retries aren't successful.
1606 concurrent.futures.TimeoutError:
1607 If the job did not complete in the given timeout.
1608 TypeError:
1609 If Non-``None`` and non-default ``job_retry`` is
1610 provided and the job is not retryable.
1611 """
1612 # Note: Since waiting for a query job to finish is more complex than
1613 # refreshing the job state in a loop, we avoid calling the superclass
1614 # in this method.
1615
1616 if self.dry_run:
1617 return _EmptyRowIterator(
1618 project=self.project,
1619 location=self.location,
1620 schema=self.schema,
1621 total_bytes_processed=self.total_bytes_processed,
1622 # Intentionally omit job_id and query_id since this doesn't
1623 # actually correspond to a finished query job.
1624 )
1625
1626 # Setting max_results should be equivalent to setting page_size with
1627 # regards to allowing the user to tune how many results to download
1628 # while we wait for the query to finish. See internal issue:
1629 # 344008814. But if start_index is set, user is trying to access a
1630 # specific page, so we don't need to set page_size. See issue #1950.
1631 if page_size is None and max_results is not None and start_index is None:
1632 page_size = max_results
1633
1634 # When timeout has default sentinel value ``object()``, do not pass
1635 # anything to invoke default timeouts in subsequent calls.
1636 done_kwargs: Dict[str, Union[_helpers.TimeoutType, object]] = {}
1637 reload_query_results_kwargs: Dict[str, Union[_helpers.TimeoutType, object]] = {}
1638 list_rows_kwargs: Dict[str, Union[_helpers.TimeoutType, object]] = {}
1639 if type(timeout) is not object:
1640 done_kwargs["timeout"] = timeout
1641 list_rows_kwargs["timeout"] = timeout
1642 reload_query_results_kwargs["timeout"] = timeout
1643
1644 if page_size is not None:
1645 reload_query_results_kwargs["page_size"] = page_size
1646
1647 if start_index is not None:
1648 reload_query_results_kwargs["start_index"] = start_index
1649
1650 try:
1651 retry_do_query = getattr(self, "_retry_do_query", None)
1652 if retry_do_query is not None:
1653 if job_retry is DEFAULT_JOB_RETRY:
1654 job_retry = self._job_retry # type: ignore
1655 else:
1656 if job_retry is not None and job_retry is not DEFAULT_JOB_RETRY:
1657 raise TypeError(
1658 "`job_retry` was provided, but this job is"
1659 " not retryable, because a custom `job_id` was"
1660 " provided to the query that created this job."
1661 )
1662
1663 restart_query_job = False
1664
1665 def is_job_done():
1666 nonlocal restart_query_job
1667
1668 if restart_query_job:
1669 restart_query_job = False
1670
1671 # The original job has failed. Create a new one.
1672 #
1673 # Note that we won't get here if retry_do_query is
1674 # None, because we won't use a retry.
1675 job = retry_do_query()
1676
1677 # Become the new job:
1678 self.__dict__.clear()
1679 self.__dict__.update(job.__dict__)
1680
1681 # It's possible the job fails again and we'll have to
1682 # retry that too.
1683 self._retry_do_query = retry_do_query
1684 self._job_retry = job_retry
1685
1686 # If the job hasn't been created, create it now. Related:
1687 # https://github.com/googleapis/python-bigquery/issues/1940
1688 if self.state is None:
1689 self._begin(retry=retry, **done_kwargs)
1690
1691 # Refresh the job status with jobs.get because some of the
1692 # exceptions thrown by jobs.getQueryResults like timeout and
1693 # rateLimitExceeded errors are ambiguous. We want to know if
1694 # the query job failed and not just the call to
1695 # jobs.getQueryResults.
1696 if self.done(retry=retry, **done_kwargs):
1697 # If it's already failed, we might as well stop.
1698 job_failed_exception = self.exception()
1699 if job_failed_exception is not None:
1700 # Only try to restart the query job if the job failed for
1701 # a retriable reason. For example, don't restart the query
1702 # if the call to reload the job metadata within self.done()
1703 # timed out.
1704 #
1705 # The `restart_query_job` must only be called after a
1706 # successful call to the `jobs.get` REST API and we
1707 # determine that the job has failed.
1708 #
1709 # The `jobs.get` REST API
1710 # (https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/get)
1711 # is called via `self.done()` which calls
1712 # `self.reload()`.
1713 #
1714 # To determine if the job failed, the `self.exception()`
1715 # is set from `self.reload()` via
1716 # `self._set_properties()`, which translates the
1717 # `Job.status.errorResult` field
1718 # (https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobStatus.FIELDS.error_result)
1719 # into an exception that can be processed by the
1720 # `job_retry` predicate.
1721 restart_query_job = True
1722 raise job_failed_exception
1723 else:
1724 # Make sure that the _query_results are cached so we
1725 # can return a complete RowIterator.
1726 #
1727 # Note: As an optimization, _reload_query_results
1728 # doesn't make any API calls if the query results are
1729 # already cached and have jobComplete=True in the
1730 # response from the REST API. This ensures we aren't
1731 # making any extra API calls if the previous loop
1732 # iteration fetched the finished job.
1733 self._reload_query_results(
1734 retry=retry, **reload_query_results_kwargs
1735 )
1736 return True
1737
1738 # Call jobs.getQueryResults with max results set to 0 just to
1739 # wait for the query to finish. Unlike most methods,
1740 # jobs.getQueryResults hangs as long as it can to ensure we
1741 # know when the query has finished as soon as possible.
1742 self._reload_query_results(retry=retry, **reload_query_results_kwargs)
1743
1744 # Even if the query is finished now according to
1745 # jobs.getQueryResults, we'll want to reload the job status if
1746 # it's not already DONE.
1747 return False
1748
1749 if retry_do_query is not None and job_retry is not None:
1750 is_job_done = job_retry(is_job_done)
1751
1752 # timeout can be a number of seconds, `None`, or a
1753 # `google.api_core.future.polling.PollingFuture._DEFAULT_VALUE`
1754 # sentinel object indicating a default timeout if we choose to add
1755 # one some day. This value can come from our PollingFuture
1756 # superclass and was introduced in
1757 # https://github.com/googleapis/python-api-core/pull/462.
1758 if isinstance(timeout, (float, int)):
1759 remaining_timeout = timeout
1760 else:
1761 # Note: we may need to handle _DEFAULT_VALUE as a separate
1762 # case someday, but even then the best we can do for queries
1763 # is 72+ hours for hyperparameter tuning jobs:
1764 # https://cloud.google.com/bigquery/quotas#query_jobs
1765 #
1766 # The timeout for a multi-statement query is 24+ hours. See:
1767 # https://cloud.google.com/bigquery/quotas#multi_statement_query_limits
1768 remaining_timeout = None
1769
1770 if remaining_timeout is None:
1771 # Since is_job_done() calls jobs.getQueryResults, which is a
1772 # long-running API, don't delay the next request at all.
1773 while not is_job_done():
1774 pass
1775 else:
1776 # Use a monotonic clock since we don't actually care about
1777 # daylight savings or similar, just the elapsed time.
1778 previous_time = time.monotonic()
1779
1780 while not is_job_done():
1781 current_time = time.monotonic()
1782 elapsed_time = current_time - previous_time
1783 remaining_timeout = remaining_timeout - elapsed_time
1784 previous_time = current_time
1785
1786 if remaining_timeout < 0:
1787 raise concurrent.futures.TimeoutError()
1788
1789 except exceptions.GoogleAPICallError as exc:
1790 exc.message = _EXCEPTION_FOOTER_TEMPLATE.format(
1791 message=exc.message, location=self.location, job_id=self.job_id
1792 )
1793 exc.debug_message = self._format_for_exception(exc.message, self.query) # type: ignore
1794 exc.query_job = self # type: ignore
1795 raise
1796 except requests.exceptions.Timeout as exc:
1797 raise concurrent.futures.TimeoutError from exc
1798
1799 # If the query job is complete but there are no query results, this was
1800 # special job, such as a DDL query. Return an empty result set to
1801 # indicate success and avoid calling tabledata.list on a table which
1802 # can't be read (such as a view table).
1803 if self._query_results.total_rows is None:
1804 return _EmptyRowIterator(
1805 location=self.location,
1806 project=self.project,
1807 job_id=self.job_id,
1808 query_id=self.query_id,
1809 schema=self.schema,
1810 num_dml_affected_rows=self._query_results.num_dml_affected_rows,
1811 query=self.query,
1812 total_bytes_processed=self.total_bytes_processed,
1813 slot_millis=self.slot_millis,
1814 )
1815
1816 # We know that there's at least 1 row, so only treat the response from
1817 # jobs.getQueryResults / jobs.query as the first page of the
1818 # RowIterator response if there are any rows in it. This prevents us
1819 # from stopping the iteration early in the cases where we set
1820 # maxResults=0. In that case, we're missing rows and there's no next
1821 # page token.
1822 first_page_response = self._query_results._properties
1823 if "rows" not in first_page_response:
1824 first_page_response = None
1825
1826 rows = self._client._list_rows_from_query_results(
1827 self.job_id,
1828 self.location,
1829 self.project,
1830 self._query_results.schema,
1831 total_rows=self._query_results.total_rows,
1832 destination=self.destination,
1833 page_size=page_size,
1834 max_results=max_results,
1835 start_index=start_index,
1836 retry=retry,
1837 query_id=self.query_id,
1838 first_page_response=first_page_response,
1839 num_dml_affected_rows=self._query_results.num_dml_affected_rows,
1840 query=self.query,
1841 total_bytes_processed=self.total_bytes_processed,
1842 slot_millis=self.slot_millis,
1843 created=self.created,
1844 started=self.started,
1845 ended=self.ended,
1846 **list_rows_kwargs,
1847 )
1848 rows._preserve_order = _contains_order_by(self.query)
1849 return rows
1850
1851 # If changing the signature of this method, make sure to apply the same
1852 # changes to table.RowIterator.to_arrow(), except for the max_results parameter
1853 # that should only exist here in the QueryJob method.
1854 def to_arrow(
1855 self,
1856 progress_bar_type: Optional[str] = None,
1857 bqstorage_client: Optional["bigquery_storage.BigQueryReadClient"] = None,
1858 create_bqstorage_client: bool = True,
1859 max_results: Optional[int] = None,
1860 ) -> "pyarrow.Table":
1861 """[Beta] Create a class:`pyarrow.Table` by loading all pages of a
1862 table or query.
1863
1864 Args:
1865 progress_bar_type (Optional[str]):
1866 If set, use the `tqdm <https://tqdm.github.io/>`_ library to
1867 display a progress bar while the data downloads. Install the
1868 ``tqdm`` package to use this feature.
1869
1870 Possible values of ``progress_bar_type`` include:
1871
1872 ``None``
1873 No progress bar.
1874 ``'tqdm'``
1875 Use the :func:`tqdm.tqdm` function to print a progress bar
1876 to :data:`sys.stdout`.
1877 ``'tqdm_notebook'``
1878 Use the :func:`tqdm.notebook.tqdm` function to display a
1879 progress bar as a Jupyter notebook widget.
1880 ``'tqdm_gui'``
1881 Use the :func:`tqdm.tqdm_gui` function to display a
1882 progress bar as a graphical dialog box.
1883 bqstorage_client (Optional[google.cloud.bigquery_storage_v1.BigQueryReadClient]):
1884 A BigQuery Storage API client. If supplied, use the faster
1885 BigQuery Storage API to fetch rows from BigQuery. This API
1886 is a billable API.
1887
1888 This method requires ``google-cloud-bigquery-storage`` library.
1889
1890 Reading from a specific partition or snapshot is not
1891 currently supported by this method.
1892 create_bqstorage_client (Optional[bool]):
1893 If ``True`` (default), create a BigQuery Storage API client
1894 using the default API settings. The BigQuery Storage API
1895 is a faster way to fetch rows from BigQuery. See the
1896 ``bqstorage_client`` parameter for more information.
1897
1898 This argument does nothing if ``bqstorage_client`` is supplied.
1899
1900 .. versionadded:: 1.24.0
1901
1902 max_results (Optional[int]):
1903 Maximum number of rows to include in the result. No limit by default.
1904
1905 .. versionadded:: 2.21.0
1906
1907 Returns:
1908 pyarrow.Table
1909 A :class:`pyarrow.Table` populated with row data and column
1910 headers from the query results. The column headers are derived
1911 from the destination table's schema.
1912
1913 Raises:
1914 ValueError:
1915 If the :mod:`pyarrow` library cannot be imported.
1916
1917 .. versionadded:: 1.17.0
1918 """
1919 query_result = wait_for_query(self, progress_bar_type, max_results=max_results)
1920 return query_result.to_arrow(
1921 progress_bar_type=progress_bar_type,
1922 bqstorage_client=bqstorage_client,
1923 create_bqstorage_client=create_bqstorage_client,
1924 )
1925
1926 # If changing the signature of this method, make sure to apply the same
1927 # changes to table.RowIterator.to_dataframe(), except for the max_results parameter
1928 # that should only exist here in the QueryJob method.
1929 def to_dataframe(
1930 self,
1931 bqstorage_client: Optional["bigquery_storage.BigQueryReadClient"] = None,
1932 dtypes: Optional[Dict[str, Any]] = None,
1933 progress_bar_type: Optional[str] = None,
1934 create_bqstorage_client: bool = True,
1935 max_results: Optional[int] = None,
1936 geography_as_object: bool = False,
1937 bool_dtype: Union[Any, None] = DefaultPandasDTypes.BOOL_DTYPE,
1938 int_dtype: Union[Any, None] = DefaultPandasDTypes.INT_DTYPE,
1939 float_dtype: Union[Any, None] = None,
1940 string_dtype: Union[Any, None] = None,
1941 date_dtype: Union[Any, None] = DefaultPandasDTypes.DATE_DTYPE,
1942 datetime_dtype: Union[Any, None] = None,
1943 time_dtype: Union[Any, None] = DefaultPandasDTypes.TIME_DTYPE,
1944 timestamp_dtype: Union[Any, None] = None,
1945 range_date_dtype: Union[Any, None] = DefaultPandasDTypes.RANGE_DATE_DTYPE,
1946 range_datetime_dtype: Union[
1947 Any, None
1948 ] = DefaultPandasDTypes.RANGE_DATETIME_DTYPE,
1949 range_timestamp_dtype: Union[
1950 Any, None
1951 ] = DefaultPandasDTypes.RANGE_TIMESTAMP_DTYPE,
1952 ) -> "pandas.DataFrame":
1953 """Return a pandas DataFrame from a QueryJob
1954
1955 Args:
1956 bqstorage_client (Optional[google.cloud.bigquery_storage_v1.BigQueryReadClient]):
1957 A BigQuery Storage API client. If supplied, use the faster
1958 BigQuery Storage API to fetch rows from BigQuery. This
1959 API is a billable API.
1960
1961 This method requires the ``fastavro`` and
1962 ``google-cloud-bigquery-storage`` libraries.
1963
1964 Reading from a specific partition or snapshot is not
1965 currently supported by this method.
1966
1967 dtypes (Optional[Map[str, Union[str, pandas.Series.dtype]]]):
1968 A dictionary of column names pandas ``dtype``s. The provided
1969 ``dtype`` is used when constructing the series for the column
1970 specified. Otherwise, the default pandas behavior is used.
1971
1972 progress_bar_type (Optional[str]):
1973 If set, use the `tqdm <https://tqdm.github.io/>`_ library to
1974 display a progress bar while the data downloads. Install the
1975 ``tqdm`` package to use this feature.
1976
1977 See
1978 :func:`~google.cloud.bigquery.table.RowIterator.to_dataframe`
1979 for details.
1980
1981 .. versionadded:: 1.11.0
1982 create_bqstorage_client (Optional[bool]):
1983 If ``True`` (default), create a BigQuery Storage API client
1984 using the default API settings. The BigQuery Storage API
1985 is a faster way to fetch rows from BigQuery. See the
1986 ``bqstorage_client`` parameter for more information.
1987
1988 This argument does nothing if ``bqstorage_client`` is supplied.
1989
1990 .. versionadded:: 1.24.0
1991
1992 max_results (Optional[int]):
1993 Maximum number of rows to include in the result. No limit by default.
1994
1995 .. versionadded:: 2.21.0
1996
1997 geography_as_object (Optional[bool]):
1998 If ``True``, convert GEOGRAPHY data to :mod:`shapely`
1999 geometry objects. If ``False`` (default), don't cast
2000 geography data to :mod:`shapely` geometry objects.
2001
2002 .. versionadded:: 2.24.0
2003
2004 bool_dtype (Optional[pandas.Series.dtype, None]):
2005 If set, indicate a pandas ExtensionDtype (e.g. ``pandas.BooleanDtype()``)
2006 to convert BigQuery Boolean type, instead of relying on the default
2007 ``pandas.BooleanDtype()``. If you explicitly set the value to ``None``,
2008 then the data type will be ``numpy.dtype("bool")``. BigQuery Boolean
2009 type can be found at:
2010 https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#boolean_type
2011
2012 .. versionadded:: 3.8.0
2013
2014 int_dtype (Optional[pandas.Series.dtype, None]):
2015 If set, indicate a pandas ExtensionDtype (e.g. ``pandas.Int64Dtype()``)
2016 to convert BigQuery Integer types, instead of relying on the default
2017 ``pandas.Int64Dtype()``. If you explicitly set the value to ``None``,
2018 then the data type will be ``numpy.dtype("int64")``. A list of BigQuery
2019 Integer types can be found at:
2020 https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#integer_types
2021
2022 .. versionadded:: 3.8.0
2023
2024 float_dtype (Optional[pandas.Series.dtype, None]):
2025 If set, indicate a pandas ExtensionDtype (e.g. ``pandas.Float32Dtype()``)
2026 to convert BigQuery Float type, instead of relying on the default
2027 ``numpy.dtype("float64")``. If you explicitly set the value to ``None``,
2028 then the data type will be ``numpy.dtype("float64")``. BigQuery Float
2029 type can be found at:
2030 https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#floating_point_types
2031
2032 .. versionadded:: 3.8.0
2033
2034 string_dtype (Optional[pandas.Series.dtype, None]):
2035 If set, indicate a pandas ExtensionDtype (e.g. ``pandas.StringDtype()``) to
2036 convert BigQuery String type, instead of relying on the default
2037 ``numpy.dtype("object")``. If you explicitly set the value to ``None``,
2038 then the data type will be ``numpy.dtype("object")``. BigQuery String
2039 type can be found at:
2040 https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#string_type
2041
2042 .. versionadded:: 3.8.0
2043
2044 date_dtype (Optional[pandas.Series.dtype, None]):
2045 If set, indicate a pandas ExtensionDtype (e.g.
2046 ``pandas.ArrowDtype(pyarrow.date32())``) to convert BigQuery Date
2047 type, instead of relying on the default ``db_dtypes.DateDtype()``.
2048 If you explicitly set the value to ``None``, then the data type will be
2049 ``numpy.dtype("datetime64[ns]")`` or ``object`` if out of bound. BigQuery
2050 Date type can be found at:
2051 https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#date_type
2052
2053 .. versionadded:: 3.10.0
2054
2055 datetime_dtype (Optional[pandas.Series.dtype, None]):
2056 If set, indicate a pandas ExtensionDtype (e.g.
2057 ``pandas.ArrowDtype(pyarrow.timestamp("us"))``) to convert BigQuery Datetime
2058 type, instead of relying on the default ``numpy.dtype("datetime64[ns]``.
2059 If you explicitly set the value to ``None``, then the data type will be
2060 ``numpy.dtype("datetime64[ns]")`` or ``object`` if out of bound. BigQuery
2061 Datetime type can be found at:
2062 https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#datetime_type
2063
2064 .. versionadded:: 3.10.0
2065
2066 time_dtype (Optional[pandas.Series.dtype, None]):
2067 If set, indicate a pandas ExtensionDtype (e.g.
2068 ``pandas.ArrowDtype(pyarrow.time64("us"))``) to convert BigQuery Time
2069 type, instead of relying on the default ``db_dtypes.TimeDtype()``.
2070 If you explicitly set the value to ``None``, then the data type will be
2071 ``numpy.dtype("object")``. BigQuery Time type can be found at:
2072 https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#time_type
2073
2074 .. versionadded:: 3.10.0
2075
2076 timestamp_dtype (Optional[pandas.Series.dtype, None]):
2077 If set, indicate a pandas ExtensionDtype (e.g.
2078 ``pandas.ArrowDtype(pyarrow.timestamp("us", tz="UTC"))``) to convert BigQuery Timestamp
2079 type, instead of relying on the default ``numpy.dtype("datetime64[ns, UTC]")``.
2080 If you explicitly set the value to ``None``, then the data type will be
2081 ``numpy.dtype("datetime64[ns, UTC]")`` or ``object`` if out of bound. BigQuery
2082 Datetime type can be found at:
2083 https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#timestamp_type
2084
2085 .. versionadded:: 3.10.0
2086
2087 range_date_dtype (Optional[pandas.Series.dtype, None]):
2088 If set, indicate a pandas ExtensionDtype, such as:
2089
2090 .. code-block:: python
2091
2092 pandas.ArrowDtype(pyarrow.struct(
2093 [("start", pyarrow.date32()), ("end", pyarrow.date32())]
2094 ))
2095
2096 to convert BigQuery RANGE<DATE> type, instead of relying on
2097 the default ``object``. If you explicitly set the value to
2098 ``None``, the data type will be ``object``. BigQuery Range type
2099 can be found at:
2100 https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#range_type
2101
2102 .. versionadded:: 3.21.0
2103
2104 range_datetime_dtype (Optional[pandas.Series.dtype, None]):
2105 If set, indicate a pandas ExtensionDtype, such as:
2106
2107 .. code-block:: python
2108
2109 pandas.ArrowDtype(pyarrow.struct(
2110 [
2111 ("start", pyarrow.timestamp("us")),
2112 ("end", pyarrow.timestamp("us")),
2113 ]
2114 ))
2115
2116 to convert BigQuery RANGE<DATETIME> type, instead of relying on
2117 the default ``object``. If you explicitly set the value to
2118 ``None``, the data type will be ``object``. BigQuery Range type
2119 can be found at:
2120 https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#range_type
2121
2122 .. versionadded:: 3.21.0
2123
2124 range_timestamp_dtype (Optional[pandas.Series.dtype, None]):
2125 If set, indicate a pandas ExtensionDtype, such as:
2126
2127 .. code-block:: python
2128
2129 pandas.ArrowDtype(pyarrow.struct(
2130 [
2131 ("start", pyarrow.timestamp("us", tz="UTC")),
2132 ("end", pyarrow.timestamp("us", tz="UTC")),
2133 ]
2134 ))
2135
2136 to convert BigQuery RANGE<TIMESTAMP> type, instead of relying
2137 on the default ``object``. If you explicitly set the value to
2138 ``None``, the data type will be ``object``. BigQuery Range type
2139 can be found at:
2140 https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#range_type
2141
2142 .. versionadded:: 3.21.0
2143
2144 Returns:
2145 pandas.DataFrame:
2146 A :class:`~pandas.DataFrame` populated with row data
2147 and column headers from the query results. The column
2148 headers are derived from the destination table's
2149 schema.
2150
2151 Raises:
2152 ValueError:
2153 If the :mod:`pandas` library cannot be imported, or
2154 the :mod:`google.cloud.bigquery_storage_v1` module is
2155 required but cannot be imported. Also if
2156 `geography_as_object` is `True`, but the
2157 :mod:`shapely` library cannot be imported.
2158 """
2159 query_result = wait_for_query(self, progress_bar_type, max_results=max_results)
2160 return query_result.to_dataframe(
2161 bqstorage_client=bqstorage_client,
2162 dtypes=dtypes,
2163 progress_bar_type=progress_bar_type,
2164 create_bqstorage_client=create_bqstorage_client,
2165 geography_as_object=geography_as_object,
2166 bool_dtype=bool_dtype,
2167 int_dtype=int_dtype,
2168 float_dtype=float_dtype,
2169 string_dtype=string_dtype,
2170 date_dtype=date_dtype,
2171 datetime_dtype=datetime_dtype,
2172 time_dtype=time_dtype,
2173 timestamp_dtype=timestamp_dtype,
2174 range_date_dtype=range_date_dtype,
2175 range_datetime_dtype=range_datetime_dtype,
2176 range_timestamp_dtype=range_timestamp_dtype,
2177 )
2178
2179 # If changing the signature of this method, make sure to apply the same
2180 # changes to table.RowIterator.to_dataframe(), except for the max_results parameter
2181 # that should only exist here in the QueryJob method.
2182 def to_geodataframe(
2183 self,
2184 bqstorage_client: Optional["bigquery_storage.BigQueryReadClient"] = None,
2185 dtypes: Optional[Dict[str, Any]] = None,
2186 progress_bar_type: Optional[str] = None,
2187 create_bqstorage_client: bool = True,
2188 max_results: Optional[int] = None,
2189 geography_column: Optional[str] = None,
2190 bool_dtype: Union[Any, None] = DefaultPandasDTypes.BOOL_DTYPE,
2191 int_dtype: Union[Any, None] = DefaultPandasDTypes.INT_DTYPE,
2192 float_dtype: Union[Any, None] = None,
2193 string_dtype: Union[Any, None] = None,
2194 ) -> "geopandas.GeoDataFrame":
2195 """Return a GeoPandas GeoDataFrame from a QueryJob
2196
2197 Args:
2198 bqstorage_client (Optional[google.cloud.bigquery_storage_v1.BigQueryReadClient]):
2199 A BigQuery Storage API client. If supplied, use the faster
2200 BigQuery Storage API to fetch rows from BigQuery. This
2201 API is a billable API.
2202
2203 This method requires the ``fastavro`` and
2204 ``google-cloud-bigquery-storage`` libraries.
2205
2206 Reading from a specific partition or snapshot is not
2207 currently supported by this method.
2208
2209 dtypes (Optional[Map[str, Union[str, pandas.Series.dtype]]]):
2210 A dictionary of column names pandas ``dtype``s. The provided
2211 ``dtype`` is used when constructing the series for the column
2212 specified. Otherwise, the default pandas behavior is used.
2213
2214 progress_bar_type (Optional[str]):
2215 If set, use the `tqdm <https://tqdm.github.io/>`_ library to
2216 display a progress bar while the data downloads. Install the
2217 ``tqdm`` package to use this feature.
2218
2219 See
2220 :func:`~google.cloud.bigquery.table.RowIterator.to_dataframe`
2221 for details.
2222
2223 .. versionadded:: 1.11.0
2224 create_bqstorage_client (Optional[bool]):
2225 If ``True`` (default), create a BigQuery Storage API client
2226 using the default API settings. The BigQuery Storage API
2227 is a faster way to fetch rows from BigQuery. See the
2228 ``bqstorage_client`` parameter for more information.
2229
2230 This argument does nothing if ``bqstorage_client`` is supplied.
2231
2232 .. versionadded:: 1.24.0
2233
2234 max_results (Optional[int]):
2235 Maximum number of rows to include in the result. No limit by default.
2236
2237 .. versionadded:: 2.21.0
2238
2239 geography_column (Optional[str]):
2240 If there are more than one GEOGRAPHY column,
2241 identifies which one to use to construct a GeoPandas
2242 GeoDataFrame. This option can be ommitted if there's
2243 only one GEOGRAPHY column.
2244 bool_dtype (Optional[pandas.Series.dtype, None]):
2245 If set, indicate a pandas ExtensionDtype (e.g. ``pandas.BooleanDtype()``)
2246 to convert BigQuery Boolean type, instead of relying on the default
2247 ``pandas.BooleanDtype()``. If you explicitly set the value to ``None``,
2248 then the data type will be ``numpy.dtype("bool")``. BigQuery Boolean
2249 type can be found at:
2250 https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#boolean_type
2251 int_dtype (Optional[pandas.Series.dtype, None]):
2252 If set, indicate a pandas ExtensionDtype (e.g. ``pandas.Int64Dtype()``)
2253 to convert BigQuery Integer types, instead of relying on the default
2254 ``pandas.Int64Dtype()``. If you explicitly set the value to ``None``,
2255 then the data type will be ``numpy.dtype("int64")``. A list of BigQuery
2256 Integer types can be found at:
2257 https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#integer_types
2258 float_dtype (Optional[pandas.Series.dtype, None]):
2259 If set, indicate a pandas ExtensionDtype (e.g. ``pandas.Float32Dtype()``)
2260 to convert BigQuery Float type, instead of relying on the default
2261 ``numpy.dtype("float64")``. If you explicitly set the value to ``None``,
2262 then the data type will be ``numpy.dtype("float64")``. BigQuery Float
2263 type can be found at:
2264 https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#floating_point_types
2265 string_dtype (Optional[pandas.Series.dtype, None]):
2266 If set, indicate a pandas ExtensionDtype (e.g. ``pandas.StringDtype()``) to
2267 convert BigQuery String type, instead of relying on the default
2268 ``numpy.dtype("object")``. If you explicitly set the value to ``None``,
2269 then the data type will be ``numpy.dtype("object")``. BigQuery String
2270 type can be found at:
2271 https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#string_type
2272
2273 Returns:
2274 geopandas.GeoDataFrame:
2275 A :class:`geopandas.GeoDataFrame` populated with row
2276 data and column headers from the query results. The
2277 column headers are derived from the destination
2278 table's schema.
2279
2280 Raises:
2281 ValueError:
2282 If the :mod:`geopandas` library cannot be imported, or the
2283 :mod:`google.cloud.bigquery_storage_v1` module is
2284 required but cannot be imported.
2285
2286 .. versionadded:: 2.24.0
2287 """
2288 query_result = wait_for_query(self, progress_bar_type, max_results=max_results)
2289 return query_result.to_geodataframe(
2290 bqstorage_client=bqstorage_client,
2291 dtypes=dtypes,
2292 progress_bar_type=progress_bar_type,
2293 create_bqstorage_client=create_bqstorage_client,
2294 geography_column=geography_column,
2295 bool_dtype=bool_dtype,
2296 int_dtype=int_dtype,
2297 float_dtype=float_dtype,
2298 string_dtype=string_dtype,
2299 )
2300
2301 def __iter__(self):
2302 return iter(self.result())
2303
2304
2305class QueryPlanEntryStep(object):
2306 """Map a single step in a query plan entry.
2307
2308 Args:
2309 kind (str): step type.
2310 substeps (List): names of substeps.
2311 """
2312
2313 def __init__(self, kind, substeps):
2314 self.kind = kind
2315 self.substeps = list(substeps)
2316
2317 @classmethod
2318 def from_api_repr(cls, resource: dict) -> "QueryPlanEntryStep":
2319 """Factory: construct instance from the JSON repr.
2320
2321 Args:
2322 resource (Dict): JSON representation of the entry.
2323
2324 Returns:
2325 google.cloud.bigquery.job.QueryPlanEntryStep:
2326 New instance built from the resource.
2327 """
2328 return cls(kind=resource.get("kind"), substeps=resource.get("substeps", ()))
2329
2330 def __eq__(self, other):
2331 if not isinstance(other, self.__class__):
2332 return NotImplemented
2333 return self.kind == other.kind and self.substeps == other.substeps
2334
2335
2336class QueryPlanEntry(object):
2337 """QueryPlanEntry represents a single stage of a query execution plan.
2338
2339 See
2340 https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#ExplainQueryStage
2341 for the underlying API representation within query statistics.
2342 """
2343
2344 def __init__(self):
2345 self._properties = {}
2346
2347 @classmethod
2348 def from_api_repr(cls, resource: dict) -> "QueryPlanEntry":
2349 """Factory: construct instance from the JSON repr.
2350
2351 Args:
2352 resource(Dict[str: object]):
2353 ExplainQueryStage representation returned from API.
2354
2355 Returns:
2356 google.cloud.bigquery.job.QueryPlanEntry:
2357 Query plan entry parsed from ``resource``.
2358 """
2359 entry = cls()
2360 entry._properties = resource
2361 return entry
2362
2363 @property
2364 def name(self):
2365 """Optional[str]: Human-readable name of the stage."""
2366 return self._properties.get("name")
2367
2368 @property
2369 def entry_id(self):
2370 """Optional[str]: Unique ID for the stage within the plan."""
2371 return self._properties.get("id")
2372
2373 @property
2374 def start(self):
2375 """Optional[Datetime]: Datetime when the stage started."""
2376 if self._properties.get("startMs") is None:
2377 return None
2378 return _helpers._datetime_from_microseconds(
2379 int(self._properties.get("startMs")) * 1000.0
2380 )
2381
2382 @property
2383 def end(self):
2384 """Optional[Datetime]: Datetime when the stage ended."""
2385 if self._properties.get("endMs") is None:
2386 return None
2387 return _helpers._datetime_from_microseconds(
2388 int(self._properties.get("endMs")) * 1000.0
2389 )
2390
2391 @property
2392 def input_stages(self):
2393 """List(int): Entry IDs for stages that were inputs for this stage."""
2394 if self._properties.get("inputStages") is None:
2395 return []
2396 return [
2397 _helpers._int_or_none(entry)
2398 for entry in self._properties.get("inputStages")
2399 ]
2400
2401 @property
2402 def parallel_inputs(self):
2403 """Optional[int]: Number of parallel input segments within
2404 the stage.
2405 """
2406 return _helpers._int_or_none(self._properties.get("parallelInputs"))
2407
2408 @property
2409 def completed_parallel_inputs(self):
2410 """Optional[int]: Number of parallel input segments completed."""
2411 return _helpers._int_or_none(self._properties.get("completedParallelInputs"))
2412
2413 @property
2414 def wait_ms_avg(self):
2415 """Optional[int]: Milliseconds the average worker spent waiting to
2416 be scheduled.
2417 """
2418 return _helpers._int_or_none(self._properties.get("waitMsAvg"))
2419
2420 @property
2421 def wait_ms_max(self):
2422 """Optional[int]: Milliseconds the slowest worker spent waiting to
2423 be scheduled.
2424 """
2425 return _helpers._int_or_none(self._properties.get("waitMsMax"))
2426
2427 @property
2428 def wait_ratio_avg(self):
2429 """Optional[float]: Ratio of time the average worker spent waiting
2430 to be scheduled, relative to the longest time spent by any worker in
2431 any stage of the overall plan.
2432 """
2433 return self._properties.get("waitRatioAvg")
2434
2435 @property
2436 def wait_ratio_max(self):
2437 """Optional[float]: Ratio of time the slowest worker spent waiting
2438 to be scheduled, relative to the longest time spent by any worker in
2439 any stage of the overall plan.
2440 """
2441 return self._properties.get("waitRatioMax")
2442
2443 @property
2444 def read_ms_avg(self):
2445 """Optional[int]: Milliseconds the average worker spent reading
2446 input.
2447 """
2448 return _helpers._int_or_none(self._properties.get("readMsAvg"))
2449
2450 @property
2451 def read_ms_max(self):
2452 """Optional[int]: Milliseconds the slowest worker spent reading
2453 input.
2454 """
2455 return _helpers._int_or_none(self._properties.get("readMsMax"))
2456
2457 @property
2458 def read_ratio_avg(self):
2459 """Optional[float]: Ratio of time the average worker spent reading
2460 input, relative to the longest time spent by any worker in any stage
2461 of the overall plan.
2462 """
2463 return self._properties.get("readRatioAvg")
2464
2465 @property
2466 def read_ratio_max(self):
2467 """Optional[float]: Ratio of time the slowest worker spent reading
2468 to be scheduled, relative to the longest time spent by any worker in
2469 any stage of the overall plan.
2470 """
2471 return self._properties.get("readRatioMax")
2472
2473 @property
2474 def compute_ms_avg(self):
2475 """Optional[int]: Milliseconds the average worker spent on CPU-bound
2476 processing.
2477 """
2478 return _helpers._int_or_none(self._properties.get("computeMsAvg"))
2479
2480 @property
2481 def compute_ms_max(self):
2482 """Optional[int]: Milliseconds the slowest worker spent on CPU-bound
2483 processing.
2484 """
2485 return _helpers._int_or_none(self._properties.get("computeMsMax"))
2486
2487 @property
2488 def compute_ratio_avg(self):
2489 """Optional[float]: Ratio of time the average worker spent on
2490 CPU-bound processing, relative to the longest time spent by any
2491 worker in any stage of the overall plan.
2492 """
2493 return self._properties.get("computeRatioAvg")
2494
2495 @property
2496 def compute_ratio_max(self):
2497 """Optional[float]: Ratio of time the slowest worker spent on
2498 CPU-bound processing, relative to the longest time spent by any
2499 worker in any stage of the overall plan.
2500 """
2501 return self._properties.get("computeRatioMax")
2502
2503 @property
2504 def write_ms_avg(self):
2505 """Optional[int]: Milliseconds the average worker spent writing
2506 output data.
2507 """
2508 return _helpers._int_or_none(self._properties.get("writeMsAvg"))
2509
2510 @property
2511 def write_ms_max(self):
2512 """Optional[int]: Milliseconds the slowest worker spent writing
2513 output data.
2514 """
2515 return _helpers._int_or_none(self._properties.get("writeMsMax"))
2516
2517 @property
2518 def write_ratio_avg(self):
2519 """Optional[float]: Ratio of time the average worker spent writing
2520 output data, relative to the longest time spent by any worker in any
2521 stage of the overall plan.
2522 """
2523 return self._properties.get("writeRatioAvg")
2524
2525 @property
2526 def write_ratio_max(self):
2527 """Optional[float]: Ratio of time the slowest worker spent writing
2528 output data, relative to the longest time spent by any worker in any
2529 stage of the overall plan.
2530 """
2531 return self._properties.get("writeRatioMax")
2532
2533 @property
2534 def records_read(self):
2535 """Optional[int]: Number of records read by this stage."""
2536 return _helpers._int_or_none(self._properties.get("recordsRead"))
2537
2538 @property
2539 def records_written(self):
2540 """Optional[int]: Number of records written by this stage."""
2541 return _helpers._int_or_none(self._properties.get("recordsWritten"))
2542
2543 @property
2544 def status(self):
2545 """Optional[str]: status of this stage."""
2546 return self._properties.get("status")
2547
2548 @property
2549 def shuffle_output_bytes(self):
2550 """Optional[int]: Number of bytes written by this stage to
2551 intermediate shuffle.
2552 """
2553 return _helpers._int_or_none(self._properties.get("shuffleOutputBytes"))
2554
2555 @property
2556 def shuffle_output_bytes_spilled(self):
2557 """Optional[int]: Number of bytes written by this stage to
2558 intermediate shuffle and spilled to disk.
2559 """
2560 return _helpers._int_or_none(self._properties.get("shuffleOutputBytesSpilled"))
2561
2562 @property
2563 def steps(self):
2564 """List(QueryPlanEntryStep): List of step operations performed by
2565 each worker in the stage.
2566 """
2567 return [
2568 QueryPlanEntryStep.from_api_repr(step)
2569 for step in self._properties.get("steps", [])
2570 ]
2571
2572 @property
2573 def slot_ms(self):
2574 """Optional[int]: Slot-milliseconds used by the stage."""
2575 return _helpers._int_or_none(self._properties.get("slotMs"))
2576
2577
2578class TimelineEntry(object):
2579 """TimelineEntry represents progress of a query job at a particular
2580 point in time.
2581
2582 See
2583 https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#querytimelinesample
2584 for the underlying API representation within query statistics.
2585 """
2586
2587 def __init__(self):
2588 self._properties = {}
2589
2590 @classmethod
2591 def from_api_repr(cls, resource):
2592 """Factory: construct instance from the JSON repr.
2593
2594 Args:
2595 resource(Dict[str: object]):
2596 QueryTimelineSample representation returned from API.
2597
2598 Returns:
2599 google.cloud.bigquery.TimelineEntry:
2600 Timeline sample parsed from ``resource``.
2601 """
2602 entry = cls()
2603 entry._properties = resource
2604 return entry
2605
2606 @property
2607 def elapsed_ms(self):
2608 """Optional[int]: Milliseconds elapsed since start of query
2609 execution."""
2610 return _helpers._int_or_none(self._properties.get("elapsedMs"))
2611
2612 @property
2613 def active_units(self):
2614 """Optional[int]: Current number of input units being processed
2615 by workers, reported as largest value since the last sample."""
2616 return _helpers._int_or_none(self._properties.get("activeUnits"))
2617
2618 @property
2619 def pending_units(self):
2620 """Optional[int]: Current number of input units remaining for
2621 query stages active at this sample time."""
2622 return _helpers._int_or_none(self._properties.get("pendingUnits"))
2623
2624 @property
2625 def completed_units(self):
2626 """Optional[int]: Current number of input units completed by
2627 this query."""
2628 return _helpers._int_or_none(self._properties.get("completedUnits"))
2629
2630 @property
2631 def slot_millis(self):
2632 """Optional[int]: Cumulative slot-milliseconds consumed by
2633 this query."""
2634 return _helpers._int_or_none(self._properties.get("totalSlotMs"))