1# Copyright 2015 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15"""Classes for query jobs."""
16
17import concurrent.futures
18import copy
19import re
20import time
21import typing
22from typing import Any, Dict, Iterable, List, Optional, Union
23
24from google.api_core import exceptions
25from google.api_core import retry as retries
26import requests
27
28from google.cloud.bigquery.dataset import Dataset
29from google.cloud.bigquery.dataset import DatasetListItem
30from google.cloud.bigquery.dataset import DatasetReference
31from google.cloud.bigquery.encryption_configuration import EncryptionConfiguration
32from google.cloud.bigquery.enums import KeyResultStatementKind, DefaultPandasDTypes
33from google.cloud.bigquery.external_config import ExternalConfig
34from google.cloud.bigquery import _helpers
35from google.cloud.bigquery.query import (
36 _query_param_from_api_repr,
37 ArrayQueryParameter,
38 ConnectionProperty,
39 ScalarQueryParameter,
40 StructQueryParameter,
41 UDFResource,
42)
43from google.cloud.bigquery.retry import (
44 DEFAULT_RETRY,
45 DEFAULT_JOB_RETRY,
46 POLLING_DEFAULT_VALUE,
47)
48from google.cloud.bigquery.routine import RoutineReference
49from google.cloud.bigquery.schema import SchemaField
50from google.cloud.bigquery.table import _EmptyRowIterator
51from google.cloud.bigquery.table import RangePartitioning
52from google.cloud.bigquery.table import _table_arg_to_table_ref
53from google.cloud.bigquery.table import TableReference
54from google.cloud.bigquery.table import TimePartitioning
55from google.cloud.bigquery._tqdm_helpers import wait_for_query
56
57from google.cloud.bigquery.job.base import _AsyncJob
58from google.cloud.bigquery.job.base import _JobConfig
59from google.cloud.bigquery.job.base import _JobReference
60
61try:
62 import pandas # type: ignore
63except ImportError:
64 pandas = None
65
66if typing.TYPE_CHECKING: # pragma: NO COVER
67 # Assumption: type checks are only used by library developers and CI environments
68 # that have all optional dependencies installed, thus no conditional imports.
69 import pandas # type: ignore
70 import geopandas # type: ignore
71 import pyarrow # type: ignore
72 from google.cloud import bigquery_storage
73 from google.cloud.bigquery.client import Client
74 from google.cloud.bigquery.table import RowIterator
75
76
77_CONTAINS_ORDER_BY = re.compile(r"ORDER\s+BY", re.IGNORECASE)
78_EXCEPTION_FOOTER_TEMPLATE = "{message}\n\nLocation: {location}\nJob ID: {job_id}\n"
79_TIMEOUT_BUFFER_SECS = 0.1
80
81
82def _contains_order_by(query):
83 """Do we need to preserve the order of the query results?
84
85 This function has known false positives, such as with ordered window
86 functions:
87
88 .. code-block:: sql
89
90 SELECT SUM(x) OVER (
91 window_name
92 PARTITION BY...
93 ORDER BY...
94 window_frame_clause)
95 FROM ...
96
97 This false positive failure case means the behavior will be correct, but
98 downloading results with the BigQuery Storage API may be slower than it
99 otherwise would. This is preferable to the false negative case, where
100 results are expected to be in order but are not (due to parallel reads).
101 """
102 return query and _CONTAINS_ORDER_BY.search(query)
103
104
105def _from_api_repr_query_parameters(resource):
106 return [_query_param_from_api_repr(mapping) for mapping in resource]
107
108
109def _to_api_repr_query_parameters(value):
110 return [query_parameter.to_api_repr() for query_parameter in value]
111
112
113def _from_api_repr_udf_resources(resource):
114 udf_resources = []
115 for udf_mapping in resource:
116 for udf_type, udf_value in udf_mapping.items():
117 udf_resources.append(UDFResource(udf_type, udf_value))
118 return udf_resources
119
120
121def _to_api_repr_udf_resources(value):
122 return [{udf_resource.udf_type: udf_resource.value} for udf_resource in value]
123
124
125def _from_api_repr_table_defs(resource):
126 return {k: ExternalConfig.from_api_repr(v) for k, v in resource.items()}
127
128
129def _to_api_repr_table_defs(value):
130 return {k: ExternalConfig.to_api_repr(v) for k, v in value.items()}
131
132
133class BiEngineReason(typing.NamedTuple):
134 """Reason for BI Engine acceleration failure
135
136 https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#bienginereason
137 """
138
139 code: str = "CODE_UNSPECIFIED"
140
141 reason: str = ""
142
143 @classmethod
144 def from_api_repr(cls, reason: Dict[str, str]) -> "BiEngineReason":
145 return cls(reason.get("code", "CODE_UNSPECIFIED"), reason.get("message", ""))
146
147
148class BiEngineStats(typing.NamedTuple):
149 """Statistics for a BI Engine query
150
151 https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#bienginestatistics
152 """
153
154 mode: str = "ACCELERATION_MODE_UNSPECIFIED"
155 """ Specifies which mode of BI Engine acceleration was performed (if any)
156 """
157
158 reasons: List[BiEngineReason] = []
159 """ Contains explanatory messages in case of DISABLED / PARTIAL acceleration
160 """
161
162 @classmethod
163 def from_api_repr(cls, stats: Dict[str, Any]) -> "BiEngineStats":
164 mode = stats.get("biEngineMode", "ACCELERATION_MODE_UNSPECIFIED")
165 reasons = [
166 BiEngineReason.from_api_repr(r) for r in stats.get("biEngineReasons", [])
167 ]
168 return cls(mode, reasons)
169
170
171class DmlStats(typing.NamedTuple):
172 """Detailed statistics for DML statements.
173
174 https://cloud.google.com/bigquery/docs/reference/rest/v2/DmlStats
175 """
176
177 inserted_row_count: int = 0
178 """Number of inserted rows. Populated by DML INSERT and MERGE statements."""
179
180 deleted_row_count: int = 0
181 """Number of deleted rows. populated by DML DELETE, MERGE and TRUNCATE statements.
182 """
183
184 updated_row_count: int = 0
185 """Number of updated rows. Populated by DML UPDATE and MERGE statements."""
186
187 @classmethod
188 def from_api_repr(cls, stats: Dict[str, str]) -> "DmlStats":
189 # NOTE: The field order here must match the order of fields set at the
190 # class level.
191 api_fields = ("insertedRowCount", "deletedRowCount", "updatedRowCount")
192
193 args = (
194 int(stats.get(api_field, default_val))
195 for api_field, default_val in zip(api_fields, cls.__new__.__defaults__) # type: ignore
196 )
197 return cls(*args)
198
199
200class IncrementalResultStats:
201 """IncrementalResultStats provides information about incremental query execution."""
202
203 def __init__(self):
204 self._properties = {}
205
206 @classmethod
207 def from_api_repr(cls, resource) -> "IncrementalResultStats":
208 """Factory: construct instance from the JSON repr.
209
210 Args:
211 resource(Dict[str: object]):
212 IncrementalResultStats representation returned from API.
213
214 Returns:
215 google.cloud.bigquery.job.IncrementalResultStats:
216 stats parsed from ``resource``.
217 """
218 entry = cls()
219 entry._properties = resource
220 return entry
221
222 @property
223 def disabled_reason(self):
224 """Optional[string]: Reason why incremental results were not
225 written by the query.
226 """
227 return _helpers._str_or_none(self._properties.get("disabledReason"))
228
229 @property
230 def result_set_last_replace_time(self):
231 """Optional[datetime]: The time at which the result table's contents
232 were completely replaced. May be absent if no results have been written
233 or the query has completed."""
234 from google.cloud._helpers import _rfc3339_nanos_to_datetime
235
236 value = self._properties.get("resultSetLastReplaceTime")
237 if value:
238 try:
239 return _rfc3339_nanos_to_datetime(value)
240 except ValueError:
241 pass
242 return None
243
244 @property
245 def result_set_last_modify_time(self):
246 """Optional[datetime]: The time at which the result table's contents
247 were modified. May be absent if no results have been written or the
248 query has completed."""
249 from google.cloud._helpers import _rfc3339_nanos_to_datetime
250
251 value = self._properties.get("resultSetLastModifyTime")
252 if value:
253 try:
254 return _rfc3339_nanos_to_datetime(value)
255 except ValueError:
256 pass
257 return None
258
259
260class IndexUnusedReason(typing.NamedTuple):
261 """Reason about why no search index was used in the search query (or sub-query).
262
263 https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#indexunusedreason
264 """
265
266 code: Optional[str] = None
267 """Specifies the high-level reason for the scenario when no search index was used.
268 """
269
270 message: Optional[str] = None
271 """Free form human-readable reason for the scenario when no search index was used.
272 """
273
274 baseTable: Optional[TableReference] = None
275 """Specifies the base table involved in the reason that no search index was used.
276 """
277
278 indexName: Optional[str] = None
279 """Specifies the name of the unused search index, if available."""
280
281 @classmethod
282 def from_api_repr(cls, reason):
283 code = reason.get("code")
284 message = reason.get("message")
285 baseTable = reason.get("baseTable")
286 indexName = reason.get("indexName")
287
288 return cls(code, message, baseTable, indexName)
289
290
291class SearchStats(typing.NamedTuple):
292 """Statistics related to Search Queries. Populated as part of JobStatistics2.
293
294 https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#searchstatistics
295 """
296
297 mode: Optional[str] = None
298 """Indicates the type of search index usage in the entire search query."""
299
300 reason: List[IndexUnusedReason] = []
301 """Reason about why no search index was used in the search query (or sub-query)"""
302
303 @classmethod
304 def from_api_repr(cls, stats: Dict[str, Any]):
305 mode = stats.get("indexUsageMode", None)
306 reason = [
307 IndexUnusedReason.from_api_repr(r)
308 for r in stats.get("indexUnusedReasons", [])
309 ]
310 return cls(mode, reason)
311
312
313class ScriptOptions:
314 """Options controlling the execution of scripts.
315
316 https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#ScriptOptions
317 """
318
319 def __init__(
320 self,
321 statement_timeout_ms: Optional[int] = None,
322 statement_byte_budget: Optional[int] = None,
323 key_result_statement: Optional[KeyResultStatementKind] = None,
324 ):
325 self._properties: Dict[str, Any] = {}
326 self.statement_timeout_ms = statement_timeout_ms
327 self.statement_byte_budget = statement_byte_budget
328 self.key_result_statement = key_result_statement
329
330 @classmethod
331 def from_api_repr(cls, resource: Dict[str, Any]) -> "ScriptOptions":
332 """Factory: construct instance from the JSON repr.
333
334 Args:
335 resource(Dict[str: Any]):
336 ScriptOptions representation returned from API.
337
338 Returns:
339 google.cloud.bigquery.ScriptOptions:
340 ScriptOptions sample parsed from ``resource``.
341 """
342 entry = cls()
343 entry._properties = copy.deepcopy(resource)
344 return entry
345
346 def to_api_repr(self) -> Dict[str, Any]:
347 """Construct the API resource representation."""
348 return copy.deepcopy(self._properties)
349
350 @property
351 def statement_timeout_ms(self) -> Union[int, None]:
352 """Timeout period for each statement in a script."""
353 return _helpers._int_or_none(self._properties.get("statementTimeoutMs"))
354
355 @statement_timeout_ms.setter
356 def statement_timeout_ms(self, value: Union[int, None]):
357 new_value = None if value is None else str(value)
358 self._properties["statementTimeoutMs"] = new_value
359
360 @property
361 def statement_byte_budget(self) -> Union[int, None]:
362 """Limit on the number of bytes billed per statement.
363
364 Exceeding this budget results in an error.
365 """
366 return _helpers._int_or_none(self._properties.get("statementByteBudget"))
367
368 @statement_byte_budget.setter
369 def statement_byte_budget(self, value: Union[int, None]):
370 new_value = None if value is None else str(value)
371 self._properties["statementByteBudget"] = new_value
372
373 @property
374 def key_result_statement(self) -> Union[KeyResultStatementKind, None]:
375 """Determines which statement in the script represents the "key result".
376
377 This is used to populate the schema and query results of the script job.
378 Default is ``KeyResultStatementKind.LAST``.
379 """
380 return self._properties.get("keyResultStatement")
381
382 @key_result_statement.setter
383 def key_result_statement(self, value: Union[KeyResultStatementKind, None]):
384 self._properties["keyResultStatement"] = value
385
386
387class QueryJobConfig(_JobConfig):
388 """Configuration options for query jobs.
389
390 All properties in this class are optional. Values which are :data:`None` ->
391 server defaults. Set properties on the constructed configuration by using
392 the property name as the name of a keyword argument.
393 """
394
395 def __init__(self, **kwargs) -> None:
396 super(QueryJobConfig, self).__init__("query", **kwargs)
397
398 @property
399 def destination_encryption_configuration(self):
400 """google.cloud.bigquery.encryption_configuration.EncryptionConfiguration: Custom
401 encryption configuration for the destination table.
402
403 Custom encryption configuration (e.g., Cloud KMS keys) or :data:`None`
404 if using default encryption.
405
406 See
407 https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobConfigurationQuery.FIELDS.destination_encryption_configuration
408 """
409 prop = self._get_sub_prop("destinationEncryptionConfiguration")
410 if prop is not None:
411 prop = EncryptionConfiguration.from_api_repr(prop)
412 return prop
413
414 @destination_encryption_configuration.setter
415 def destination_encryption_configuration(self, value):
416 api_repr = value
417 if value is not None:
418 api_repr = value.to_api_repr()
419 self._set_sub_prop("destinationEncryptionConfiguration", api_repr)
420
421 @property
422 def allow_large_results(self):
423 """bool: Allow large query results tables (legacy SQL, only)
424
425 See
426 https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobConfigurationQuery.FIELDS.allow_large_results
427 """
428 return self._get_sub_prop("allowLargeResults")
429
430 @allow_large_results.setter
431 def allow_large_results(self, value):
432 self._set_sub_prop("allowLargeResults", value)
433
434 @property
435 def connection_properties(self) -> List[ConnectionProperty]:
436 """Connection properties.
437
438 See
439 https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobConfigurationQuery.FIELDS.connection_properties
440
441 .. versionadded:: 2.29.0
442 """
443 resource = self._get_sub_prop("connectionProperties", [])
444 return [ConnectionProperty.from_api_repr(prop) for prop in resource]
445
446 @connection_properties.setter
447 def connection_properties(self, value: Iterable[ConnectionProperty]):
448 self._set_sub_prop(
449 "connectionProperties",
450 [prop.to_api_repr() for prop in value],
451 )
452
453 @property
454 def create_disposition(self):
455 """google.cloud.bigquery.job.CreateDisposition: Specifies behavior
456 for creating tables.
457
458 See
459 https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobConfigurationQuery.FIELDS.create_disposition
460 """
461 return self._get_sub_prop("createDisposition")
462
463 @create_disposition.setter
464 def create_disposition(self, value):
465 self._set_sub_prop("createDisposition", value)
466
467 @property
468 def create_session(self) -> Optional[bool]:
469 """[Preview] If :data:`True`, creates a new session, where
470 :attr:`~google.cloud.bigquery.job.QueryJob.session_info` will contain a
471 random server generated session id.
472
473 If :data:`False`, runs query with an existing ``session_id`` passed in
474 :attr:`~google.cloud.bigquery.job.QueryJobConfig.connection_properties`,
475 otherwise runs query in non-session mode.
476
477 See
478 https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobConfigurationQuery.FIELDS.create_session
479
480 .. versionadded:: 2.29.0
481 """
482 return self._get_sub_prop("createSession")
483
484 @create_session.setter
485 def create_session(self, value: Optional[bool]):
486 self._set_sub_prop("createSession", value)
487
488 @property
489 def default_dataset(self):
490 """google.cloud.bigquery.dataset.DatasetReference: the default dataset
491 to use for unqualified table names in the query or :data:`None` if not
492 set.
493
494 The ``default_dataset`` setter accepts:
495
496 - a :class:`~google.cloud.bigquery.dataset.Dataset`, or
497 - a :class:`~google.cloud.bigquery.dataset.DatasetReference`, or
498 - a :class:`str` of the fully-qualified dataset ID in standard SQL
499 format. The value must included a project ID and dataset ID
500 separated by ``.``. For example: ``your-project.your_dataset``.
501
502 See
503 https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobConfigurationQuery.FIELDS.default_dataset
504 """
505 prop = self._get_sub_prop("defaultDataset")
506 if prop is not None:
507 prop = DatasetReference.from_api_repr(prop)
508 return prop
509
510 @default_dataset.setter
511 def default_dataset(self, value):
512 if value is None:
513 self._set_sub_prop("defaultDataset", None)
514 return
515
516 if isinstance(value, str):
517 value = DatasetReference.from_string(value)
518
519 if isinstance(value, (Dataset, DatasetListItem)):
520 value = value.reference
521
522 resource = value.to_api_repr()
523 self._set_sub_prop("defaultDataset", resource)
524
525 @property
526 def destination(self):
527 """google.cloud.bigquery.table.TableReference: table where results are
528 written or :data:`None` if not set.
529
530 The ``destination`` setter accepts:
531
532 - a :class:`~google.cloud.bigquery.table.Table`, or
533 - a :class:`~google.cloud.bigquery.table.TableReference`, or
534 - a :class:`str` of the fully-qualified table ID in standard SQL
535 format. The value must included a project ID, dataset ID, and table
536 ID, each separated by ``.``. For example:
537 ``your-project.your_dataset.your_table``.
538
539 .. note::
540
541 Only table ID is passed to the backend, so any configuration
542 in `~google.cloud.bigquery.table.Table` is discarded.
543
544 See
545 https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobConfigurationQuery.FIELDS.destination_table
546 """
547 prop = self._get_sub_prop("destinationTable")
548 if prop is not None:
549 prop = TableReference.from_api_repr(prop)
550 return prop
551
552 @destination.setter
553 def destination(self, value):
554 if value is None:
555 self._set_sub_prop("destinationTable", None)
556 return
557
558 value = _table_arg_to_table_ref(value)
559 resource = value.to_api_repr()
560 self._set_sub_prop("destinationTable", resource)
561
562 @property
563 def dry_run(self):
564 """bool: :data:`True` if this query should be a dry run to estimate
565 costs.
566
567 See
568 https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobConfiguration.FIELDS.dry_run
569 """
570 return self._properties.get("dryRun")
571
572 @dry_run.setter
573 def dry_run(self, value):
574 self._properties["dryRun"] = value
575
576 @property
577 def flatten_results(self):
578 """bool: Flatten nested/repeated fields in results. (Legacy SQL only)
579
580 See
581 https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobConfigurationQuery.FIELDS.flatten_results
582 """
583 return self._get_sub_prop("flattenResults")
584
585 @flatten_results.setter
586 def flatten_results(self, value):
587 self._set_sub_prop("flattenResults", value)
588
589 @property
590 def maximum_billing_tier(self):
591 """int: Deprecated. Changes the billing tier to allow high-compute
592 queries.
593
594 See
595 https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobConfigurationQuery.FIELDS.maximum_billing_tier
596 """
597 return self._get_sub_prop("maximumBillingTier")
598
599 @maximum_billing_tier.setter
600 def maximum_billing_tier(self, value):
601 self._set_sub_prop("maximumBillingTier", value)
602
603 @property
604 def maximum_bytes_billed(self):
605 """int: Maximum bytes to be billed for this job or :data:`None` if not set.
606
607 See
608 https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobConfigurationQuery.FIELDS.maximum_bytes_billed
609 """
610 return _helpers._int_or_none(self._get_sub_prop("maximumBytesBilled"))
611
612 @maximum_bytes_billed.setter
613 def maximum_bytes_billed(self, value):
614 self._set_sub_prop("maximumBytesBilled", str(value))
615
616 @property
617 def priority(self):
618 """google.cloud.bigquery.job.QueryPriority: Priority of the query.
619
620 See
621 https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobConfigurationQuery.FIELDS.priority
622 """
623 return self._get_sub_prop("priority")
624
625 @priority.setter
626 def priority(self, value):
627 self._set_sub_prop("priority", value)
628
629 @property
630 def query_parameters(self):
631 """List[Union[google.cloud.bigquery.query.ArrayQueryParameter, \
632 google.cloud.bigquery.query.ScalarQueryParameter, \
633 google.cloud.bigquery.query.StructQueryParameter]]: list of parameters
634 for parameterized query (empty by default)
635
636 See:
637 https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobConfigurationQuery.FIELDS.query_parameters
638 """
639 prop = self._get_sub_prop("queryParameters", default=[])
640 return _from_api_repr_query_parameters(prop)
641
642 @query_parameters.setter
643 def query_parameters(self, values):
644 self._set_sub_prop("queryParameters", _to_api_repr_query_parameters(values))
645
646 @property
647 def range_partitioning(self):
648 """Optional[google.cloud.bigquery.table.RangePartitioning]:
649 Configures range-based partitioning for destination table.
650
651 .. note::
652 **Beta**. The integer range partitioning feature is in a
653 pre-release state and might change or have limited support.
654
655 Only specify at most one of
656 :attr:`~google.cloud.bigquery.job.LoadJobConfig.time_partitioning` or
657 :attr:`~google.cloud.bigquery.job.LoadJobConfig.range_partitioning`.
658
659 Raises:
660 ValueError:
661 If the value is not
662 :class:`~google.cloud.bigquery.table.RangePartitioning` or
663 :data:`None`.
664 """
665 resource = self._get_sub_prop("rangePartitioning")
666 if resource is not None:
667 return RangePartitioning(_properties=resource)
668
669 @range_partitioning.setter
670 def range_partitioning(self, value):
671 resource = value
672 if isinstance(value, RangePartitioning):
673 resource = value._properties
674 elif value is not None:
675 raise ValueError(
676 "Expected value to be RangePartitioning or None, got {}.".format(value)
677 )
678 self._set_sub_prop("rangePartitioning", resource)
679
680 @property
681 def udf_resources(self):
682 """List[google.cloud.bigquery.query.UDFResource]: user
683 defined function resources (empty by default)
684
685 See:
686 https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobConfigurationQuery.FIELDS.user_defined_function_resources
687 """
688 prop = self._get_sub_prop("userDefinedFunctionResources", default=[])
689 return _from_api_repr_udf_resources(prop)
690
691 @udf_resources.setter
692 def udf_resources(self, values):
693 self._set_sub_prop(
694 "userDefinedFunctionResources", _to_api_repr_udf_resources(values)
695 )
696
697 @property
698 def use_legacy_sql(self):
699 """bool: Use legacy SQL syntax.
700
701 See
702 https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobConfigurationQuery.FIELDS.use_legacy_sql
703 """
704 return self._get_sub_prop("useLegacySql")
705
706 @use_legacy_sql.setter
707 def use_legacy_sql(self, value):
708 self._set_sub_prop("useLegacySql", value)
709
710 @property
711 def use_query_cache(self):
712 """bool: Look for the query result in the cache.
713
714 See
715 https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobConfigurationQuery.FIELDS.use_query_cache
716 """
717 return self._get_sub_prop("useQueryCache")
718
719 @use_query_cache.setter
720 def use_query_cache(self, value):
721 self._set_sub_prop("useQueryCache", value)
722
723 @property
724 def write_disposition(self):
725 """google.cloud.bigquery.job.WriteDisposition: Action that occurs if
726 the destination table already exists.
727
728 See
729 https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobConfigurationQuery.FIELDS.write_disposition
730 """
731 return self._get_sub_prop("writeDisposition")
732
733 @write_disposition.setter
734 def write_disposition(self, value):
735 self._set_sub_prop("writeDisposition", value)
736
737 @property
738 def write_incremental_results(self) -> Optional[bool]:
739 """This is only supported for a SELECT query using a temporary table.
740
741 If set, the query is allowed to write results incrementally to the temporary result
742 table. This may incur a performance penalty. This option cannot be used with Legacy SQL.
743
744 This feature is not generally available.
745 """
746 return self._get_sub_prop("writeIncrementalResults")
747
748 @write_incremental_results.setter
749 def write_incremental_results(self, value):
750 self._set_sub_prop("writeIncrementalResults", value)
751
752 @property
753 def table_definitions(self):
754 """Dict[str, google.cloud.bigquery.external_config.ExternalConfig]:
755 Definitions for external tables or :data:`None` if not set.
756
757 See
758 https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobConfigurationQuery.FIELDS.external_table_definitions
759 """
760 prop = self._get_sub_prop("tableDefinitions")
761 if prop is not None:
762 prop = _from_api_repr_table_defs(prop)
763 return prop
764
765 @table_definitions.setter
766 def table_definitions(self, values):
767 self._set_sub_prop("tableDefinitions", _to_api_repr_table_defs(values))
768
769 @property
770 def time_partitioning(self):
771 """Optional[google.cloud.bigquery.table.TimePartitioning]: Specifies
772 time-based partitioning for the destination table.
773
774 Only specify at most one of
775 :attr:`~google.cloud.bigquery.job.LoadJobConfig.time_partitioning` or
776 :attr:`~google.cloud.bigquery.job.LoadJobConfig.range_partitioning`.
777
778 Raises:
779 ValueError:
780 If the value is not
781 :class:`~google.cloud.bigquery.table.TimePartitioning` or
782 :data:`None`.
783 """
784 prop = self._get_sub_prop("timePartitioning")
785 if prop is not None:
786 prop = TimePartitioning.from_api_repr(prop)
787 return prop
788
789 @time_partitioning.setter
790 def time_partitioning(self, value):
791 api_repr = value
792 if value is not None:
793 api_repr = value.to_api_repr()
794 self._set_sub_prop("timePartitioning", api_repr)
795
796 @property
797 def clustering_fields(self):
798 """Optional[List[str]]: Fields defining clustering for the table
799
800 (Defaults to :data:`None`).
801
802 Clustering fields are immutable after table creation.
803
804 .. note::
805
806 BigQuery supports clustering for both partitioned and
807 non-partitioned tables.
808 """
809 prop = self._get_sub_prop("clustering")
810 if prop is not None:
811 return list(prop.get("fields", ()))
812
813 @clustering_fields.setter
814 def clustering_fields(self, value):
815 """Optional[List[str]]: Fields defining clustering for the table
816
817 (Defaults to :data:`None`).
818 """
819 if value is not None:
820 self._set_sub_prop("clustering", {"fields": value})
821 else:
822 self._del_sub_prop("clustering")
823
824 @property
825 def schema_update_options(self):
826 """List[google.cloud.bigquery.job.SchemaUpdateOption]: Specifies
827 updates to the destination table schema to allow as a side effect of
828 the query job.
829 """
830 return self._get_sub_prop("schemaUpdateOptions")
831
832 @schema_update_options.setter
833 def schema_update_options(self, values):
834 self._set_sub_prop("schemaUpdateOptions", values)
835
836 @property
837 def script_options(self) -> ScriptOptions:
838 """Options controlling the execution of scripts.
839
840 https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#scriptoptions
841 """
842 prop = self._get_sub_prop("scriptOptions")
843 if prop is not None:
844 prop = ScriptOptions.from_api_repr(prop)
845 return prop
846
847 @script_options.setter
848 def script_options(self, value: Union[ScriptOptions, None]):
849 new_value = None if value is None else value.to_api_repr()
850 self._set_sub_prop("scriptOptions", new_value)
851
852 def to_api_repr(self) -> dict:
853 """Build an API representation of the query job config.
854
855 Returns:
856 Dict: A dictionary in the format used by the BigQuery API.
857 """
858 resource = copy.deepcopy(self._properties)
859 # Query parameters have an addition property associated with them
860 # to indicate if the query is using named or positional parameters.
861 query_parameters = resource.get("query", {}).get("queryParameters")
862 if query_parameters:
863 if query_parameters[0].get("name") is None:
864 resource["query"]["parameterMode"] = "POSITIONAL"
865 else:
866 resource["query"]["parameterMode"] = "NAMED"
867
868 return resource
869
870
871class QueryJob(_AsyncJob):
872 """Asynchronous job: query tables.
873
874 Args:
875 job_id (str): the job's ID, within the project belonging to ``client``.
876
877 query (str): SQL query string.
878
879 client (google.cloud.bigquery.client.Client):
880 A client which holds credentials and project configuration
881 for the dataset (which requires a project).
882
883 job_config (Optional[google.cloud.bigquery.job.QueryJobConfig]):
884 Extra configuration options for the query job.
885 """
886
887 _JOB_TYPE = "query"
888 _UDF_KEY = "userDefinedFunctionResources"
889 _CONFIG_CLASS = QueryJobConfig
890
891 def __init__(self, job_id, query, client, job_config=None):
892 super(QueryJob, self).__init__(job_id, client)
893
894 if job_config is not None:
895 self._properties["configuration"] = job_config._properties
896 if self.configuration.use_legacy_sql is None:
897 self.configuration.use_legacy_sql = False
898
899 if query:
900 _helpers._set_sub_prop(
901 self._properties, ["configuration", "query", "query"], query
902 )
903 self._query_results = None
904 self._done_timeout = None
905 self._transport_timeout = None
906
907 @property
908 def allow_large_results(self):
909 """See
910 :attr:`google.cloud.bigquery.job.QueryJobConfig.allow_large_results`.
911 """
912 return self.configuration.allow_large_results
913
914 @property
915 def configuration(self) -> QueryJobConfig:
916 """The configuration for this query job."""
917 return typing.cast(QueryJobConfig, super().configuration)
918
919 @property
920 def connection_properties(self) -> List[ConnectionProperty]:
921 """See
922 :attr:`google.cloud.bigquery.job.QueryJobConfig.connection_properties`.
923
924 .. versionadded:: 2.29.0
925 """
926 return self.configuration.connection_properties
927
928 @property
929 def create_disposition(self):
930 """See
931 :attr:`google.cloud.bigquery.job.QueryJobConfig.create_disposition`.
932 """
933 return self.configuration.create_disposition
934
935 @property
936 def create_session(self) -> Optional[bool]:
937 """See
938 :attr:`google.cloud.bigquery.job.QueryJobConfig.create_session`.
939
940 .. versionadded:: 2.29.0
941 """
942 return self.configuration.create_session
943
944 @property
945 def default_dataset(self):
946 """See
947 :attr:`google.cloud.bigquery.job.QueryJobConfig.default_dataset`.
948 """
949 return self.configuration.default_dataset
950
951 @property
952 def destination(self):
953 """See
954 :attr:`google.cloud.bigquery.job.QueryJobConfig.destination`.
955 """
956 return self.configuration.destination
957
958 @property
959 def destination_encryption_configuration(self):
960 """google.cloud.bigquery.encryption_configuration.EncryptionConfiguration: Custom
961 encryption configuration for the destination table.
962
963 Custom encryption configuration (e.g., Cloud KMS keys) or :data:`None`
964 if using default encryption.
965
966 See
967 :attr:`google.cloud.bigquery.job.QueryJobConfig.destination_encryption_configuration`.
968 """
969 return self.configuration.destination_encryption_configuration
970
971 @property
972 def dry_run(self):
973 """See
974 :attr:`google.cloud.bigquery.job.QueryJobConfig.dry_run`.
975 """
976 return self.configuration.dry_run
977
978 @property
979 def flatten_results(self):
980 """See
981 :attr:`google.cloud.bigquery.job.QueryJobConfig.flatten_results`.
982 """
983 return self.configuration.flatten_results
984
985 @property
986 def priority(self):
987 """See
988 :attr:`google.cloud.bigquery.job.QueryJobConfig.priority`.
989 """
990 return self.configuration.priority
991
992 @property
993 def search_stats(self) -> Optional[SearchStats]:
994 """Returns a SearchStats object."""
995
996 stats = self._job_statistics().get("searchStatistics")
997 if stats is not None:
998 return SearchStats.from_api_repr(stats)
999 return None
1000
1001 @property
1002 def query(self):
1003 """str: The query text used in this query job.
1004
1005 See:
1006 https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobConfigurationQuery.FIELDS.query
1007 """
1008 return _helpers._get_sub_prop(
1009 self._properties, ["configuration", "query", "query"]
1010 )
1011
1012 @property
1013 def query_id(self) -> Optional[str]:
1014 """[Preview] ID of a completed query.
1015
1016 This ID is auto-generated and not guaranteed to be populated.
1017 """
1018 query_results = self._query_results
1019 return query_results.query_id if query_results is not None else None
1020
1021 @property
1022 def query_parameters(self):
1023 """See
1024 :attr:`google.cloud.bigquery.job.QueryJobConfig.query_parameters`.
1025 """
1026 return self.configuration.query_parameters
1027
1028 @property
1029 def udf_resources(self):
1030 """See
1031 :attr:`google.cloud.bigquery.job.QueryJobConfig.udf_resources`.
1032 """
1033 return self.configuration.udf_resources
1034
1035 @property
1036 def use_legacy_sql(self):
1037 """See
1038 :attr:`google.cloud.bigquery.job.QueryJobConfig.use_legacy_sql`.
1039 """
1040 return self.configuration.use_legacy_sql
1041
1042 @property
1043 def use_query_cache(self):
1044 """See
1045 :attr:`google.cloud.bigquery.job.QueryJobConfig.use_query_cache`.
1046 """
1047 return self.configuration.use_query_cache
1048
1049 @property
1050 def write_disposition(self):
1051 """See
1052 :attr:`google.cloud.bigquery.job.QueryJobConfig.write_disposition`.
1053 """
1054 return self.configuration.write_disposition
1055
1056 @property
1057 def maximum_billing_tier(self):
1058 """See
1059 :attr:`google.cloud.bigquery.job.QueryJobConfig.maximum_billing_tier`.
1060 """
1061 return self.configuration.maximum_billing_tier
1062
1063 @property
1064 def maximum_bytes_billed(self):
1065 """See
1066 :attr:`google.cloud.bigquery.job.QueryJobConfig.maximum_bytes_billed`.
1067 """
1068 return self.configuration.maximum_bytes_billed
1069
1070 @property
1071 def range_partitioning(self):
1072 """See
1073 :attr:`google.cloud.bigquery.job.QueryJobConfig.range_partitioning`.
1074 """
1075 return self.configuration.range_partitioning
1076
1077 @property
1078 def table_definitions(self):
1079 """See
1080 :attr:`google.cloud.bigquery.job.QueryJobConfig.table_definitions`.
1081 """
1082 return self.configuration.table_definitions
1083
1084 @property
1085 def time_partitioning(self):
1086 """See
1087 :attr:`google.cloud.bigquery.job.QueryJobConfig.time_partitioning`.
1088 """
1089 return self.configuration.time_partitioning
1090
1091 @property
1092 def clustering_fields(self):
1093 """See
1094 :attr:`google.cloud.bigquery.job.QueryJobConfig.clustering_fields`.
1095 """
1096 return self.configuration.clustering_fields
1097
1098 @property
1099 def schema_update_options(self):
1100 """See
1101 :attr:`google.cloud.bigquery.job.QueryJobConfig.schema_update_options`.
1102 """
1103 return self.configuration.schema_update_options
1104
1105 def to_api_repr(self):
1106 """Generate a resource for :meth:`_begin`."""
1107 # Use to_api_repr to allow for some configuration properties to be set
1108 # automatically.
1109 configuration = self.configuration.to_api_repr()
1110 return {
1111 "jobReference": self._properties["jobReference"],
1112 "configuration": configuration,
1113 }
1114
1115 @classmethod
1116 def from_api_repr(cls, resource: dict, client: "Client") -> "QueryJob":
1117 """Factory: construct a job given its API representation
1118
1119 Args:
1120 resource (Dict): dataset job representation returned from the API
1121
1122 client (google.cloud.bigquery.client.Client):
1123 Client which holds credentials and project
1124 configuration for the dataset.
1125
1126 Returns:
1127 google.cloud.bigquery.job.QueryJob: Job parsed from ``resource``.
1128 """
1129 job_ref_properties = resource.setdefault(
1130 "jobReference", {"projectId": client.project, "jobId": None}
1131 )
1132 job_ref = _JobReference._from_api_repr(job_ref_properties)
1133 job = cls(job_ref, None, client=client)
1134 job._set_properties(resource)
1135 return job
1136
1137 @property
1138 def query_plan(self):
1139 """Return query plan from job statistics, if present.
1140
1141 See:
1142 https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobStatistics2.FIELDS.query_plan
1143
1144 Returns:
1145 List[google.cloud.bigquery.job.QueryPlanEntry]:
1146 mappings describing the query plan, or an empty list
1147 if the query has not yet completed.
1148 """
1149 plan_entries = self._job_statistics().get("queryPlan", ())
1150 return [QueryPlanEntry.from_api_repr(entry) for entry in plan_entries]
1151
1152 @property
1153 def schema(self) -> Optional[List[SchemaField]]:
1154 """The schema of the results.
1155
1156 Present only for successful dry run of non-legacy SQL queries.
1157 """
1158 resource = self._job_statistics().get("schema")
1159 if resource is None:
1160 return None
1161 fields = resource.get("fields", [])
1162 return [SchemaField.from_api_repr(field) for field in fields]
1163
1164 @property
1165 def timeline(self):
1166 """List(TimelineEntry): Return the query execution timeline
1167 from job statistics.
1168 """
1169 raw = self._job_statistics().get("timeline", ())
1170 return [TimelineEntry.from_api_repr(entry) for entry in raw]
1171
1172 @property
1173 def total_bytes_processed(self):
1174 """Return total bytes processed from job statistics, if present.
1175
1176 See:
1177 https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobStatistics2.FIELDS.total_bytes_processed
1178
1179 Returns:
1180 Optional[int]:
1181 Total bytes processed by the job, or None if job is not
1182 yet complete.
1183 """
1184 result = self._job_statistics().get("totalBytesProcessed")
1185 if result is not None:
1186 result = int(result)
1187 return result
1188
1189 @property
1190 def total_bytes_billed(self):
1191 """Return total bytes billed from job statistics, if present.
1192
1193 See:
1194 https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobStatistics2.FIELDS.total_bytes_billed
1195
1196 Returns:
1197 Optional[int]:
1198 Total bytes processed by the job, or None if job is not
1199 yet complete.
1200 """
1201 result = self._job_statistics().get("totalBytesBilled")
1202 if result is not None:
1203 result = int(result)
1204 return result
1205
1206 @property
1207 def billing_tier(self):
1208 """Return billing tier from job statistics, if present.
1209
1210 See:
1211 https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobStatistics2.FIELDS.billing_tier
1212
1213 Returns:
1214 Optional[int]:
1215 Billing tier used by the job, or None if job is not
1216 yet complete.
1217 """
1218 return self._job_statistics().get("billingTier")
1219
1220 @property
1221 def cache_hit(self):
1222 """Return whether or not query results were served from cache.
1223
1224 See:
1225 https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobStatistics2.FIELDS.cache_hit
1226
1227 Returns:
1228 Optional[bool]:
1229 whether the query results were returned from cache, or None
1230 if job is not yet complete.
1231 """
1232 return self._job_statistics().get("cacheHit")
1233
1234 @property
1235 def ddl_operation_performed(self):
1236 """Optional[str]: Return the DDL operation performed.
1237
1238 See:
1239 https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobStatistics2.FIELDS.ddl_operation_performed
1240
1241 """
1242 return self._job_statistics().get("ddlOperationPerformed")
1243
1244 @property
1245 def ddl_target_routine(self):
1246 """Optional[google.cloud.bigquery.routine.RoutineReference]: Return the DDL target routine, present
1247 for CREATE/DROP FUNCTION/PROCEDURE queries.
1248
1249 See:
1250 https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobStatistics2.FIELDS.ddl_target_routine
1251 """
1252 prop = self._job_statistics().get("ddlTargetRoutine")
1253 if prop is not None:
1254 prop = RoutineReference.from_api_repr(prop)
1255 return prop
1256
1257 @property
1258 def ddl_target_table(self):
1259 """Optional[google.cloud.bigquery.table.TableReference]: Return the DDL target table, present
1260 for CREATE/DROP TABLE/VIEW queries.
1261
1262 See:
1263 https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobStatistics2.FIELDS.ddl_target_table
1264 """
1265 prop = self._job_statistics().get("ddlTargetTable")
1266 if prop is not None:
1267 prop = TableReference.from_api_repr(prop)
1268 return prop
1269
1270 @property
1271 def num_dml_affected_rows(self) -> Optional[int]:
1272 """Return the number of DML rows affected by the job.
1273
1274 See:
1275 https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobStatistics2.FIELDS.num_dml_affected_rows
1276
1277 Returns:
1278 Optional[int]:
1279 number of DML rows affected by the job, or None if job is not
1280 yet complete.
1281 """
1282 result = self._job_statistics().get("numDmlAffectedRows")
1283 if result is not None:
1284 result = int(result)
1285 return result
1286
1287 @property
1288 def slot_millis(self):
1289 """Union[int, None]: Slot-milliseconds used by this query job."""
1290 return _helpers._int_or_none(self._job_statistics().get("totalSlotMs"))
1291
1292 @property
1293 def statement_type(self):
1294 """Return statement type from job statistics, if present.
1295
1296 See:
1297 https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobStatistics2.FIELDS.statement_type
1298
1299 Returns:
1300 Optional[str]:
1301 type of statement used by the job, or None if job is not
1302 yet complete.
1303 """
1304 return self._job_statistics().get("statementType")
1305
1306 @property
1307 def referenced_tables(self):
1308 """Return referenced tables from job statistics, if present.
1309
1310 See:
1311 https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobStatistics2.FIELDS.referenced_tables
1312
1313 Returns:
1314 List[Dict]:
1315 mappings describing the query plan, or an empty list
1316 if the query has not yet completed.
1317 """
1318 tables = []
1319 datasets_by_project_name = {}
1320
1321 for table in self._job_statistics().get("referencedTables", ()):
1322 t_project = table["projectId"]
1323
1324 ds_id = table["datasetId"]
1325 t_dataset = datasets_by_project_name.get((t_project, ds_id))
1326 if t_dataset is None:
1327 t_dataset = DatasetReference(t_project, ds_id)
1328 datasets_by_project_name[(t_project, ds_id)] = t_dataset
1329
1330 t_name = table["tableId"]
1331 tables.append(t_dataset.table(t_name))
1332
1333 return tables
1334
1335 @property
1336 def undeclared_query_parameters(self):
1337 """Return undeclared query parameters from job statistics, if present.
1338
1339 See:
1340 https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobStatistics2.FIELDS.undeclared_query_parameters
1341
1342 Returns:
1343 List[Union[ \
1344 google.cloud.bigquery.query.ArrayQueryParameter, \
1345 google.cloud.bigquery.query.ScalarQueryParameter, \
1346 google.cloud.bigquery.query.StructQueryParameter \
1347 ]]:
1348 Undeclared parameters, or an empty list if the query has
1349 not yet completed.
1350 """
1351 parameters = []
1352 undeclared = self._job_statistics().get("undeclaredQueryParameters", ())
1353
1354 for parameter in undeclared:
1355 p_type = parameter["parameterType"]
1356
1357 if "arrayType" in p_type:
1358 klass = ArrayQueryParameter
1359 elif "structTypes" in p_type:
1360 klass = StructQueryParameter
1361 else:
1362 klass = ScalarQueryParameter
1363
1364 parameters.append(klass.from_api_repr(parameter))
1365
1366 return parameters
1367
1368 @property
1369 def estimated_bytes_processed(self):
1370 """Return the estimated number of bytes processed by the query.
1371
1372 See:
1373 https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobStatistics2.FIELDS.estimated_bytes_processed
1374
1375 Returns:
1376 Optional[int]:
1377 number of DML rows affected by the job, or None if job is not
1378 yet complete.
1379 """
1380 result = self._job_statistics().get("estimatedBytesProcessed")
1381 if result is not None:
1382 result = int(result)
1383 return result
1384
1385 @property
1386 def dml_stats(self) -> Optional[DmlStats]:
1387 stats = self._job_statistics().get("dmlStats")
1388 if stats is None:
1389 return None
1390 else:
1391 return DmlStats.from_api_repr(stats)
1392
1393 @property
1394 def bi_engine_stats(self) -> Optional[BiEngineStats]:
1395 stats = self._job_statistics().get("biEngineStatistics")
1396
1397 if stats is None:
1398 return None
1399 else:
1400 return BiEngineStats.from_api_repr(stats)
1401
1402 @property
1403 def incremental_result_stats(self) -> Optional[IncrementalResultStats]:
1404 stats = self._job_statistics().get("incrementalResultStats")
1405 if stats is None:
1406 return None
1407 return IncrementalResultStats.from_api_repr(stats)
1408
1409 def _blocking_poll(self, timeout=None, **kwargs):
1410 self._done_timeout = timeout
1411 self._transport_timeout = timeout
1412 super(QueryJob, self)._blocking_poll(timeout=timeout, **kwargs)
1413
1414 @staticmethod
1415 def _format_for_exception(message: str, query: str):
1416 """Format a query for the output in exception message.
1417
1418 Args:
1419 message (str): The original exception message.
1420 query (str): The SQL query to format.
1421
1422 Returns:
1423 str: A formatted query text.
1424 """
1425 template = "{message}\n\n{header}\n\n{ruler}\n{body}\n{ruler}"
1426
1427 lines = query.splitlines() if query is not None else [""]
1428 max_line_len = max(len(line) for line in lines)
1429
1430 header = "-----Query Job SQL Follows-----"
1431 header = "{:^{total_width}}".format(header, total_width=max_line_len + 5)
1432
1433 # Print out a "ruler" above and below the SQL so we can judge columns.
1434 # Left pad for the line numbers (4 digits plus ":").
1435 ruler = " |" + " . |" * (max_line_len // 10)
1436
1437 # Put line numbers next to the SQL.
1438 body = "\n".join(
1439 "{:4}:{}".format(n, line) for n, line in enumerate(lines, start=1)
1440 )
1441
1442 return template.format(message=message, header=header, ruler=ruler, body=body)
1443
1444 def _begin(self, client=None, retry=DEFAULT_RETRY, timeout=None):
1445 """API call: begin the job via a POST request
1446
1447 See
1448 https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/insert
1449
1450 Args:
1451 client (Optional[google.cloud.bigquery.client.Client]):
1452 The client to use. If not passed, falls back to the ``client``
1453 associated with the job object or``NoneType``.
1454 retry (Optional[google.api_core.retry.Retry]):
1455 How to retry the RPC.
1456 timeout (Optional[float]):
1457 The number of seconds to wait for the underlying HTTP transport
1458 before using ``retry``.
1459
1460 Raises:
1461 ValueError: If the job has already begun.
1462 """
1463
1464 try:
1465 super(QueryJob, self)._begin(client=client, retry=retry, timeout=timeout)
1466 except exceptions.GoogleAPICallError as exc:
1467 exc.message = _EXCEPTION_FOOTER_TEMPLATE.format(
1468 message=exc.message, location=self.location, job_id=self.job_id
1469 )
1470 exc.debug_message = self._format_for_exception(exc.message, self.query)
1471 exc.query_job = self
1472 raise
1473
1474 def _reload_query_results(
1475 self,
1476 retry: "retries.Retry" = DEFAULT_RETRY,
1477 timeout: Optional[float] = None,
1478 page_size: int = 0,
1479 start_index: Optional[int] = None,
1480 ):
1481 """Refresh the cached query results unless already cached and complete.
1482
1483 Args:
1484 retry (Optional[google.api_core.retry.Retry]):
1485 How to retry the call that retrieves query results.
1486 timeout (Optional[float]):
1487 The number of seconds to wait for the underlying HTTP transport
1488 before using ``retry``.
1489 page_size (int):
1490 Maximum number of rows in a single response. See maxResults in
1491 the jobs.getQueryResults REST API.
1492 start_index (Optional[int]):
1493 Zero-based index of the starting row. See startIndex in the
1494 jobs.getQueryResults REST API.
1495 """
1496 # Optimization: avoid a call to jobs.getQueryResults if it's already
1497 # been fetched, e.g. from jobs.query first page of results.
1498 if self._query_results and self._query_results.complete:
1499 return
1500
1501 # Since the API to getQueryResults can hang up to the timeout value
1502 # (default of 10 seconds), set the timeout parameter to ensure that
1503 # the timeout from the futures API is respected. See:
1504 # https://github.com/GoogleCloudPlatform/google-cloud-python/issues/4135
1505 timeout_ms = None
1506
1507 # Python_API_core, as part of a major rewrite of the deadline, timeout,
1508 # retry process sets the timeout value as a Python object().
1509 # Our system does not natively handle that and instead expects
1510 # either None or a numeric value. If passed a Python object, convert to
1511 # None.
1512 if type(self._done_timeout) is object: # pragma: NO COVER
1513 self._done_timeout = None
1514
1515 if self._done_timeout is not None: # pragma: NO COVER
1516 # Subtract a buffer for context switching, network latency, etc.
1517 api_timeout = self._done_timeout - _TIMEOUT_BUFFER_SECS
1518 api_timeout = max(min(api_timeout, 10), 0)
1519 self._done_timeout -= api_timeout
1520 self._done_timeout = max(0, self._done_timeout)
1521 timeout_ms = int(api_timeout * 1000)
1522
1523 # If an explicit timeout is not given, fall back to the transport timeout
1524 # stored in _blocking_poll() in the process of polling for job completion.
1525 if timeout is not None:
1526 transport_timeout = timeout
1527 else:
1528 transport_timeout = self._transport_timeout
1529
1530 # Handle PollingJob._DEFAULT_VALUE.
1531 if not isinstance(transport_timeout, (float, int)):
1532 transport_timeout = None
1533
1534 self._query_results = self._client._get_query_results(
1535 self.job_id,
1536 retry,
1537 project=self.project,
1538 timeout_ms=timeout_ms,
1539 location=self.location,
1540 timeout=transport_timeout,
1541 page_size=page_size,
1542 start_index=start_index,
1543 )
1544
1545 def result( # type: ignore # (incompatible with supertype)
1546 self,
1547 page_size: Optional[int] = None,
1548 max_results: Optional[int] = None,
1549 retry: Optional[retries.Retry] = DEFAULT_RETRY,
1550 timeout: Optional[Union[float, object]] = POLLING_DEFAULT_VALUE,
1551 start_index: Optional[int] = None,
1552 job_retry: Optional[retries.Retry] = DEFAULT_JOB_RETRY,
1553 ) -> Union["RowIterator", _EmptyRowIterator]:
1554 """Start the job and wait for it to complete and get the result.
1555
1556 Args:
1557 page_size (Optional[int]):
1558 The maximum number of rows in each page of results from this
1559 request. Non-positive values are ignored.
1560 max_results (Optional[int]):
1561 The maximum total number of rows from this request.
1562 retry (Optional[google.api_core.retry.Retry]):
1563 How to retry the call that retrieves rows. This only
1564 applies to making RPC calls. It isn't used to retry
1565 failed jobs. This has a reasonable default that
1566 should only be overridden with care. If the job state
1567 is ``DONE``, retrying is aborted early even if the
1568 results are not available, as this will not change
1569 anymore.
1570 timeout (Optional[Union[float, \
1571 google.api_core.future.polling.PollingFuture._DEFAULT_VALUE, \
1572 ]]):
1573 The number of seconds to wait for the underlying HTTP transport
1574 before using ``retry``. If ``None``, wait indefinitely
1575 unless an error is returned. If unset, only the
1576 underlying API calls have their default timeouts, but we still
1577 wait indefinitely for the job to finish.
1578 start_index (Optional[int]):
1579 The zero-based index of the starting row to read.
1580 job_retry (Optional[google.api_core.retry.Retry]):
1581 How to retry failed jobs. The default retries
1582 rate-limit-exceeded errors. Passing ``None`` disables
1583 job retry.
1584
1585 Not all jobs can be retried. If ``job_id`` was
1586 provided to the query that created this job, then the
1587 job returned by the query will not be retryable, and
1588 an exception will be raised if non-``None``
1589 non-default ``job_retry`` is also provided.
1590
1591 Returns:
1592 google.cloud.bigquery.table.RowIterator:
1593 Iterator of row data
1594 :class:`~google.cloud.bigquery.table.Row`-s. During each
1595 page, the iterator will have the ``total_rows`` attribute
1596 set, which counts the total number of rows **in the result
1597 set** (this is distinct from the total number of rows in the
1598 current page: ``iterator.page.num_items``).
1599
1600 If the query is a special query that produces no results, e.g.
1601 a DDL query, an ``_EmptyRowIterator`` instance is returned.
1602
1603 Raises:
1604 google.api_core.exceptions.GoogleAPICallError:
1605 If the job failed and retries aren't successful.
1606 concurrent.futures.TimeoutError:
1607 If the job did not complete in the given timeout.
1608 TypeError:
1609 If Non-``None`` and non-default ``job_retry`` is
1610 provided and the job is not retryable.
1611 """
1612 # Note: Since waiting for a query job to finish is more complex than
1613 # refreshing the job state in a loop, we avoid calling the superclass
1614 # in this method.
1615
1616 if self.dry_run:
1617 return _EmptyRowIterator(
1618 project=self.project,
1619 location=self.location,
1620 schema=self.schema,
1621 total_bytes_processed=self.total_bytes_processed,
1622 # Intentionally omit job_id and query_id since this doesn't
1623 # actually correspond to a finished query job.
1624 )
1625
1626 # Setting max_results should be equivalent to setting page_size with
1627 # regards to allowing the user to tune how many results to download
1628 # while we wait for the query to finish. See internal issue:
1629 # 344008814. But if start_index is set, user is trying to access a
1630 # specific page, so we don't need to set page_size. See issue #1950.
1631 if page_size is None and max_results is not None and start_index is None:
1632 page_size = max_results
1633
1634 # When timeout has default sentinel value ``object()``, do not pass
1635 # anything to invoke default timeouts in subsequent calls.
1636 done_kwargs: Dict[str, Union[_helpers.TimeoutType, object]] = {}
1637 reload_query_results_kwargs: Dict[str, Union[_helpers.TimeoutType, object]] = {}
1638 list_rows_kwargs: Dict[str, Union[_helpers.TimeoutType, object]] = {}
1639 if type(timeout) is not object:
1640 done_kwargs["timeout"] = timeout
1641 list_rows_kwargs["timeout"] = timeout
1642 reload_query_results_kwargs["timeout"] = timeout
1643
1644 if page_size is not None:
1645 reload_query_results_kwargs["page_size"] = page_size
1646
1647 if start_index is not None:
1648 reload_query_results_kwargs["start_index"] = start_index
1649
1650 try:
1651 retry_do_query = getattr(self, "_retry_do_query", None)
1652 if retry_do_query is not None:
1653 if job_retry is DEFAULT_JOB_RETRY:
1654 job_retry = self._job_retry # type: ignore
1655 else:
1656 if job_retry is not None and job_retry is not DEFAULT_JOB_RETRY:
1657 raise TypeError(
1658 "`job_retry` was provided, but this job is"
1659 " not retryable, because a custom `job_id` was"
1660 " provided to the query that created this job."
1661 )
1662
1663 restart_query_job = False
1664
1665 def is_job_done():
1666 nonlocal restart_query_job
1667
1668 if restart_query_job:
1669 restart_query_job = False
1670
1671 # The original job has failed. Create a new one.
1672 #
1673 # Note that we won't get here if retry_do_query is
1674 # None, because we won't use a retry.
1675 job = retry_do_query()
1676
1677 # Become the new job:
1678 self.__dict__.clear()
1679 self.__dict__.update(job.__dict__)
1680
1681 # It's possible the job fails again and we'll have to
1682 # retry that too.
1683 self._retry_do_query = retry_do_query
1684 self._job_retry = job_retry
1685
1686 # If the job hasn't been created, create it now. Related:
1687 # https://github.com/googleapis/python-bigquery/issues/1940
1688 if self.state is None:
1689 self._begin(retry=retry, **done_kwargs)
1690
1691 # Refresh the job status with jobs.get because some of the
1692 # exceptions thrown by jobs.getQueryResults like timeout and
1693 # rateLimitExceeded errors are ambiguous. We want to know if
1694 # the query job failed and not just the call to
1695 # jobs.getQueryResults.
1696 if self.done(retry=retry, **done_kwargs):
1697 # If it's already failed, we might as well stop.
1698 job_failed_exception = self.exception()
1699 if job_failed_exception is not None:
1700 # Only try to restart the query job if the job failed for
1701 # a retriable reason. For example, don't restart the query
1702 # if the call to reload the job metadata within self.done()
1703 # timed out.
1704 #
1705 # The `restart_query_job` must only be called after a
1706 # successful call to the `jobs.get` REST API and we
1707 # determine that the job has failed.
1708 #
1709 # The `jobs.get` REST API
1710 # (https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/get)
1711 # is called via `self.done()` which calls
1712 # `self.reload()`.
1713 #
1714 # To determine if the job failed, the `self.exception()`
1715 # is set from `self.reload()` via
1716 # `self._set_properties()`, which translates the
1717 # `Job.status.errorResult` field
1718 # (https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobStatus.FIELDS.error_result)
1719 # into an exception that can be processed by the
1720 # `job_retry` predicate.
1721 restart_query_job = True
1722 raise job_failed_exception
1723 else:
1724 # Make sure that the _query_results are cached so we
1725 # can return a complete RowIterator.
1726 #
1727 # Note: As an optimization, _reload_query_results
1728 # doesn't make any API calls if the query results are
1729 # already cached and have jobComplete=True in the
1730 # response from the REST API. This ensures we aren't
1731 # making any extra API calls if the previous loop
1732 # iteration fetched the finished job.
1733 self._reload_query_results(
1734 retry=retry, **reload_query_results_kwargs
1735 )
1736 return True
1737
1738 # Call jobs.getQueryResults with max results set to 0 just to
1739 # wait for the query to finish. Unlike most methods,
1740 # jobs.getQueryResults hangs as long as it can to ensure we
1741 # know when the query has finished as soon as possible.
1742 self._reload_query_results(retry=retry, **reload_query_results_kwargs)
1743
1744 # Even if the query is finished now according to
1745 # jobs.getQueryResults, we'll want to reload the job status if
1746 # it's not already DONE.
1747 return False
1748
1749 if retry_do_query is not None and job_retry is not None:
1750 is_job_done = job_retry(is_job_done)
1751
1752 # timeout can be a number of seconds, `None`, or a
1753 # `google.api_core.future.polling.PollingFuture._DEFAULT_VALUE`
1754 # sentinel object indicating a default timeout if we choose to add
1755 # one some day. This value can come from our PollingFuture
1756 # superclass and was introduced in
1757 # https://github.com/googleapis/python-api-core/pull/462.
1758 if isinstance(timeout, (float, int)):
1759 remaining_timeout = timeout
1760 else:
1761 # Note: we may need to handle _DEFAULT_VALUE as a separate
1762 # case someday, but even then the best we can do for queries
1763 # is 72+ hours for hyperparameter tuning jobs:
1764 # https://cloud.google.com/bigquery/quotas#query_jobs
1765 #
1766 # The timeout for a multi-statement query is 24+ hours. See:
1767 # https://cloud.google.com/bigquery/quotas#multi_statement_query_limits
1768 remaining_timeout = None
1769
1770 if remaining_timeout is None:
1771 # Since is_job_done() calls jobs.getQueryResults, which is a
1772 # long-running API, don't delay the next request at all.
1773 while not is_job_done():
1774 pass
1775 else:
1776 # Use a monotonic clock since we don't actually care about
1777 # daylight savings or similar, just the elapsed time.
1778 previous_time = time.monotonic()
1779
1780 while not is_job_done():
1781 current_time = time.monotonic()
1782 elapsed_time = current_time - previous_time
1783 remaining_timeout = remaining_timeout - elapsed_time
1784 previous_time = current_time
1785
1786 if remaining_timeout < 0:
1787 raise concurrent.futures.TimeoutError()
1788
1789 except exceptions.GoogleAPICallError as exc:
1790 exc.message = _EXCEPTION_FOOTER_TEMPLATE.format(
1791 message=exc.message, location=self.location, job_id=self.job_id
1792 )
1793 exc.debug_message = self._format_for_exception(exc.message, self.query) # type: ignore
1794 exc.query_job = self # type: ignore
1795 raise
1796 except requests.exceptions.Timeout as exc:
1797 raise concurrent.futures.TimeoutError from exc
1798
1799 # If the query job is complete but there are no query results, this was
1800 # special job, such as a DDL query. Return an empty result set to
1801 # indicate success and avoid calling tabledata.list on a table which
1802 # can't be read (such as a view table).
1803 if self._query_results.total_rows is None:
1804 return _EmptyRowIterator(
1805 location=self.location,
1806 project=self.project,
1807 job_id=self.job_id,
1808 query_id=self.query_id,
1809 schema=self.schema,
1810 num_dml_affected_rows=self._query_results.num_dml_affected_rows,
1811 query=self.query,
1812 total_bytes_processed=self.total_bytes_processed,
1813 slot_millis=self.slot_millis,
1814 )
1815
1816 # We know that there's at least 1 row, so only treat the response from
1817 # jobs.getQueryResults / jobs.query as the first page of the
1818 # RowIterator response if there are any rows in it. This prevents us
1819 # from stopping the iteration early in the cases where we set
1820 # maxResults=0. In that case, we're missing rows and there's no next
1821 # page token.
1822 first_page_response = self._query_results._properties
1823 if "rows" not in first_page_response:
1824 first_page_response = None
1825
1826 rows = self._client._list_rows_from_query_results(
1827 self.job_id,
1828 self.location,
1829 self.project,
1830 self._query_results.schema,
1831 total_rows=self._query_results.total_rows,
1832 destination=self.destination,
1833 page_size=page_size,
1834 max_results=max_results,
1835 start_index=start_index,
1836 retry=retry,
1837 query_id=self.query_id,
1838 first_page_response=first_page_response,
1839 num_dml_affected_rows=self._query_results.num_dml_affected_rows,
1840 query=self.query,
1841 total_bytes_processed=self.total_bytes_processed,
1842 slot_millis=self.slot_millis,
1843 created=self.created,
1844 started=self.started,
1845 ended=self.ended,
1846 **list_rows_kwargs,
1847 )
1848 rows._preserve_order = _contains_order_by(self.query)
1849 return rows
1850
1851 # If changing the signature of this method, make sure to apply the same
1852 # changes to table.RowIterator.to_arrow(), except for the max_results parameter
1853 # that should only exist here in the QueryJob method.
1854 def to_arrow(
1855 self,
1856 progress_bar_type: Optional[str] = None,
1857 bqstorage_client: Optional["bigquery_storage.BigQueryReadClient"] = None,
1858 create_bqstorage_client: bool = True,
1859 max_results: Optional[int] = None,
1860 timeout: Optional[float] = None,
1861 ) -> "pyarrow.Table":
1862 """[Beta] Create a class:`pyarrow.Table` by loading all pages of a
1863 table or query.
1864
1865 Args:
1866 progress_bar_type (Optional[str]):
1867 If set, use the `tqdm <https://tqdm.github.io/>`_ library to
1868 display a progress bar while the data downloads. Install the
1869 ``tqdm`` package to use this feature.
1870
1871 Possible values of ``progress_bar_type`` include:
1872
1873 ``None``
1874 No progress bar.
1875 ``'tqdm'``
1876 Use the :func:`tqdm.tqdm` function to print a progress bar
1877 to :data:`sys.stdout`.
1878 ``'tqdm_notebook'``
1879 Use the :func:`tqdm.notebook.tqdm` function to display a
1880 progress bar as a Jupyter notebook widget.
1881 ``'tqdm_gui'``
1882 Use the :func:`tqdm.tqdm_gui` function to display a
1883 progress bar as a graphical dialog box.
1884 bqstorage_client (Optional[google.cloud.bigquery_storage_v1.BigQueryReadClient]):
1885 A BigQuery Storage API client. If supplied, use the faster
1886 BigQuery Storage API to fetch rows from BigQuery. This API
1887 is a billable API.
1888
1889 This method requires ``google-cloud-bigquery-storage`` library.
1890
1891 Reading from a specific partition or snapshot is not
1892 currently supported by this method.
1893 create_bqstorage_client (Optional[bool]):
1894 If ``True`` (default), create a BigQuery Storage API client
1895 using the default API settings. The BigQuery Storage API
1896 is a faster way to fetch rows from BigQuery. See the
1897 ``bqstorage_client`` parameter for more information.
1898
1899 This argument does nothing if ``bqstorage_client`` is supplied.
1900
1901 .. versionadded:: 1.24.0
1902
1903 max_results (Optional[int]):
1904 Maximum number of rows to include in the result. No limit by default.
1905
1906 .. versionadded:: 2.21.0
1907
1908 timeout (Optional[float]):
1909 The number of seconds to wait for the underlying download to complete.
1910 If ``None``, wait indefinitely.
1911
1912 Returns:
1913 pyarrow.Table
1914 A :class:`pyarrow.Table` populated with row data and column
1915 headers from the query results. The column headers are derived
1916 from the destination table's schema.
1917
1918 Raises:
1919 ValueError:
1920 If the :mod:`pyarrow` library cannot be imported.
1921
1922 .. versionadded:: 1.17.0
1923 """
1924 query_result = wait_for_query(self, progress_bar_type, max_results=max_results)
1925 return query_result.to_arrow(
1926 progress_bar_type=progress_bar_type,
1927 bqstorage_client=bqstorage_client,
1928 create_bqstorage_client=create_bqstorage_client,
1929 timeout=timeout,
1930 )
1931
1932 # If changing the signature of this method, make sure to apply the same
1933 # changes to table.RowIterator.to_dataframe(), except for the max_results parameter
1934 # that should only exist here in the QueryJob method.
1935 def to_dataframe(
1936 self,
1937 bqstorage_client: Optional["bigquery_storage.BigQueryReadClient"] = None,
1938 dtypes: Optional[Dict[str, Any]] = None,
1939 progress_bar_type: Optional[str] = None,
1940 create_bqstorage_client: bool = True,
1941 max_results: Optional[int] = None,
1942 geography_as_object: bool = False,
1943 bool_dtype: Union[Any, None] = DefaultPandasDTypes.BOOL_DTYPE,
1944 int_dtype: Union[Any, None] = DefaultPandasDTypes.INT_DTYPE,
1945 float_dtype: Union[Any, None] = None,
1946 string_dtype: Union[Any, None] = None,
1947 date_dtype: Union[Any, None] = DefaultPandasDTypes.DATE_DTYPE,
1948 datetime_dtype: Union[Any, None] = None,
1949 time_dtype: Union[Any, None] = DefaultPandasDTypes.TIME_DTYPE,
1950 timestamp_dtype: Union[Any, None] = None,
1951 range_date_dtype: Union[Any, None] = DefaultPandasDTypes.RANGE_DATE_DTYPE,
1952 range_datetime_dtype: Union[
1953 Any, None
1954 ] = DefaultPandasDTypes.RANGE_DATETIME_DTYPE,
1955 range_timestamp_dtype: Union[
1956 Any, None
1957 ] = DefaultPandasDTypes.RANGE_TIMESTAMP_DTYPE,
1958 timeout: Optional[float] = None,
1959 ) -> "pandas.DataFrame":
1960 """Return a pandas DataFrame from a QueryJob
1961
1962 Args:
1963 bqstorage_client (Optional[google.cloud.bigquery_storage_v1.BigQueryReadClient]):
1964 A BigQuery Storage API client. If supplied, use the faster
1965 BigQuery Storage API to fetch rows from BigQuery. This
1966 API is a billable API.
1967
1968 This method requires the ``fastavro`` and
1969 ``google-cloud-bigquery-storage`` libraries.
1970
1971 Reading from a specific partition or snapshot is not
1972 currently supported by this method.
1973
1974 dtypes (Optional[Map[str, Union[str, pandas.Series.dtype]]]):
1975 A dictionary of column names pandas ``dtype``s. The provided
1976 ``dtype`` is used when constructing the series for the column
1977 specified. Otherwise, the default pandas behavior is used.
1978
1979 progress_bar_type (Optional[str]):
1980 If set, use the `tqdm <https://tqdm.github.io/>`_ library to
1981 display a progress bar while the data downloads. Install the
1982 ``tqdm`` package to use this feature.
1983
1984 See
1985 :func:`~google.cloud.bigquery.table.RowIterator.to_dataframe`
1986 for details.
1987
1988 .. versionadded:: 1.11.0
1989 create_bqstorage_client (Optional[bool]):
1990 If ``True`` (default), create a BigQuery Storage API client
1991 using the default API settings. The BigQuery Storage API
1992 is a faster way to fetch rows from BigQuery. See the
1993 ``bqstorage_client`` parameter for more information.
1994
1995 This argument does nothing if ``bqstorage_client`` is supplied.
1996
1997 .. versionadded:: 1.24.0
1998
1999 max_results (Optional[int]):
2000 Maximum number of rows to include in the result. No limit by default.
2001
2002 .. versionadded:: 2.21.0
2003
2004 geography_as_object (Optional[bool]):
2005 If ``True``, convert GEOGRAPHY data to :mod:`shapely`
2006 geometry objects. If ``False`` (default), don't cast
2007 geography data to :mod:`shapely` geometry objects.
2008
2009 .. versionadded:: 2.24.0
2010
2011 bool_dtype (Optional[pandas.Series.dtype, None]):
2012 If set, indicate a pandas ExtensionDtype (e.g. ``pandas.BooleanDtype()``)
2013 to convert BigQuery Boolean type, instead of relying on the default
2014 ``pandas.BooleanDtype()``. If you explicitly set the value to ``None``,
2015 then the data type will be ``numpy.dtype("bool")``. BigQuery Boolean
2016 type can be found at:
2017 https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#boolean_type
2018
2019 .. versionadded:: 3.8.0
2020
2021 int_dtype (Optional[pandas.Series.dtype, None]):
2022 If set, indicate a pandas ExtensionDtype (e.g. ``pandas.Int64Dtype()``)
2023 to convert BigQuery Integer types, instead of relying on the default
2024 ``pandas.Int64Dtype()``. If you explicitly set the value to ``None``,
2025 then the data type will be ``numpy.dtype("int64")``. A list of BigQuery
2026 Integer types can be found at:
2027 https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#integer_types
2028
2029 .. versionadded:: 3.8.0
2030
2031 float_dtype (Optional[pandas.Series.dtype, None]):
2032 If set, indicate a pandas ExtensionDtype (e.g. ``pandas.Float32Dtype()``)
2033 to convert BigQuery Float type, instead of relying on the default
2034 ``numpy.dtype("float64")``. If you explicitly set the value to ``None``,
2035 then the data type will be ``numpy.dtype("float64")``. BigQuery Float
2036 type can be found at:
2037 https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#floating_point_types
2038
2039 .. versionadded:: 3.8.0
2040
2041 string_dtype (Optional[pandas.Series.dtype, None]):
2042 If set, indicate a pandas ExtensionDtype (e.g. ``pandas.StringDtype()``) to
2043 convert BigQuery String type, instead of relying on the default
2044 ``numpy.dtype("object")``. If you explicitly set the value to ``None``,
2045 then the data type will be ``numpy.dtype("object")``. BigQuery String
2046 type can be found at:
2047 https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#string_type
2048
2049 .. versionadded:: 3.8.0
2050
2051 date_dtype (Optional[pandas.Series.dtype, None]):
2052 If set, indicate a pandas ExtensionDtype (e.g.
2053 ``pandas.ArrowDtype(pyarrow.date32())``) to convert BigQuery Date
2054 type, instead of relying on the default ``db_dtypes.DateDtype()``.
2055 If you explicitly set the value to ``None``, then the data type will be
2056 ``numpy.dtype("datetime64[ns]")`` or ``object`` if out of bound. BigQuery
2057 Date type can be found at:
2058 https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#date_type
2059
2060 .. versionadded:: 3.10.0
2061
2062 datetime_dtype (Optional[pandas.Series.dtype, None]):
2063 If set, indicate a pandas ExtensionDtype (e.g.
2064 ``pandas.ArrowDtype(pyarrow.timestamp("us"))``) to convert BigQuery Datetime
2065 type, instead of relying on the default ``numpy.dtype("datetime64[ns]``.
2066 If you explicitly set the value to ``None``, then the data type will be
2067 ``numpy.dtype("datetime64[ns]")`` or ``object`` if out of bound. BigQuery
2068 Datetime type can be found at:
2069 https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#datetime_type
2070
2071 .. versionadded:: 3.10.0
2072
2073 time_dtype (Optional[pandas.Series.dtype, None]):
2074 If set, indicate a pandas ExtensionDtype (e.g.
2075 ``pandas.ArrowDtype(pyarrow.time64("us"))``) to convert BigQuery Time
2076 type, instead of relying on the default ``db_dtypes.TimeDtype()``.
2077 If you explicitly set the value to ``None``, then the data type will be
2078 ``numpy.dtype("object")``. BigQuery Time type can be found at:
2079 https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#time_type
2080
2081 .. versionadded:: 3.10.0
2082
2083 timestamp_dtype (Optional[pandas.Series.dtype, None]):
2084 If set, indicate a pandas ExtensionDtype (e.g.
2085 ``pandas.ArrowDtype(pyarrow.timestamp("us", tz="UTC"))``) to convert BigQuery Timestamp
2086 type, instead of relying on the default ``numpy.dtype("datetime64[ns, UTC]")``.
2087 If you explicitly set the value to ``None``, then the data type will be
2088 ``numpy.dtype("datetime64[ns, UTC]")`` or ``object`` if out of bound. BigQuery
2089 Datetime type can be found at:
2090 https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#timestamp_type
2091
2092 .. versionadded:: 3.10.0
2093
2094 range_date_dtype (Optional[pandas.Series.dtype, None]):
2095 If set, indicate a pandas ExtensionDtype, such as:
2096
2097 .. code-block:: python
2098
2099 pandas.ArrowDtype(pyarrow.struct(
2100 [("start", pyarrow.date32()), ("end", pyarrow.date32())]
2101 ))
2102
2103 to convert BigQuery RANGE<DATE> type, instead of relying on
2104 the default ``object``. If you explicitly set the value to
2105 ``None``, the data type will be ``object``. BigQuery Range type
2106 can be found at:
2107 https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#range_type
2108
2109 .. versionadded:: 3.21.0
2110
2111 range_datetime_dtype (Optional[pandas.Series.dtype, None]):
2112 If set, indicate a pandas ExtensionDtype, such as:
2113
2114 .. code-block:: python
2115
2116 pandas.ArrowDtype(pyarrow.struct(
2117 [
2118 ("start", pyarrow.timestamp("us")),
2119 ("end", pyarrow.timestamp("us")),
2120 ]
2121 ))
2122
2123 to convert BigQuery RANGE<DATETIME> type, instead of relying on
2124 the default ``object``. If you explicitly set the value to
2125 ``None``, the data type will be ``object``. BigQuery Range type
2126 can be found at:
2127 https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#range_type
2128
2129 .. versionadded:: 3.21.0
2130
2131 range_timestamp_dtype (Optional[pandas.Series.dtype, None]):
2132 If set, indicate a pandas ExtensionDtype, such as:
2133
2134 .. code-block:: python
2135
2136 pandas.ArrowDtype(pyarrow.struct(
2137 [
2138 ("start", pyarrow.timestamp("us", tz="UTC")),
2139 ("end", pyarrow.timestamp("us", tz="UTC")),
2140 ]
2141 ))
2142
2143 to convert BigQuery RANGE<TIMESTAMP> type, instead of relying
2144 on the default ``object``. If you explicitly set the value to
2145 ``None``, the data type will be ``object``. BigQuery Range type
2146 can be found at:
2147 https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#range_type
2148
2149 .. versionadded:: 3.21.0
2150
2151 timeout (Optional[float]):
2152 The number of seconds to wait for the underlying download to complete.
2153 If ``None``, wait indefinitely.
2154
2155 Returns:
2156 pandas.DataFrame:
2157 A :class:`~pandas.DataFrame` populated with row data
2158 and column headers from the query results. The column
2159 headers are derived from the destination table's
2160 schema.
2161
2162 Raises:
2163 ValueError:
2164 If the :mod:`pandas` library cannot be imported, or
2165 the :mod:`google.cloud.bigquery_storage_v1` module is
2166 required but cannot be imported. Also if
2167 `geography_as_object` is `True`, but the
2168 :mod:`shapely` library cannot be imported.
2169 """
2170 query_result = wait_for_query(self, progress_bar_type, max_results=max_results)
2171 return query_result.to_dataframe(
2172 bqstorage_client=bqstorage_client,
2173 dtypes=dtypes,
2174 progress_bar_type=progress_bar_type,
2175 create_bqstorage_client=create_bqstorage_client,
2176 geography_as_object=geography_as_object,
2177 bool_dtype=bool_dtype,
2178 int_dtype=int_dtype,
2179 float_dtype=float_dtype,
2180 string_dtype=string_dtype,
2181 date_dtype=date_dtype,
2182 datetime_dtype=datetime_dtype,
2183 time_dtype=time_dtype,
2184 timestamp_dtype=timestamp_dtype,
2185 range_date_dtype=range_date_dtype,
2186 range_datetime_dtype=range_datetime_dtype,
2187 range_timestamp_dtype=range_timestamp_dtype,
2188 timeout=timeout,
2189 )
2190
2191 # If changing the signature of this method, make sure to apply the same
2192 # changes to table.RowIterator.to_dataframe(), except for the max_results parameter
2193 # that should only exist here in the QueryJob method.
2194 def to_geodataframe(
2195 self,
2196 bqstorage_client: Optional["bigquery_storage.BigQueryReadClient"] = None,
2197 dtypes: Optional[Dict[str, Any]] = None,
2198 progress_bar_type: Optional[str] = None,
2199 create_bqstorage_client: bool = True,
2200 max_results: Optional[int] = None,
2201 geography_column: Optional[str] = None,
2202 bool_dtype: Union[Any, None] = DefaultPandasDTypes.BOOL_DTYPE,
2203 int_dtype: Union[Any, None] = DefaultPandasDTypes.INT_DTYPE,
2204 float_dtype: Union[Any, None] = None,
2205 string_dtype: Union[Any, None] = None,
2206 timeout: Optional[float] = None,
2207 ) -> "geopandas.GeoDataFrame":
2208 """Return a GeoPandas GeoDataFrame from a QueryJob
2209
2210 Args:
2211 bqstorage_client (Optional[google.cloud.bigquery_storage_v1.BigQueryReadClient]):
2212 A BigQuery Storage API client. If supplied, use the faster
2213 BigQuery Storage API to fetch rows from BigQuery. This
2214 API is a billable API.
2215
2216 This method requires the ``fastavro`` and
2217 ``google-cloud-bigquery-storage`` libraries.
2218
2219 Reading from a specific partition or snapshot is not
2220 currently supported by this method.
2221
2222 dtypes (Optional[Map[str, Union[str, pandas.Series.dtype]]]):
2223 A dictionary of column names pandas ``dtype``s. The provided
2224 ``dtype`` is used when constructing the series for the column
2225 specified. Otherwise, the default pandas behavior is used.
2226
2227 progress_bar_type (Optional[str]):
2228 If set, use the `tqdm <https://tqdm.github.io/>`_ library to
2229 display a progress bar while the data downloads. Install the
2230 ``tqdm`` package to use this feature.
2231
2232 See
2233 :func:`~google.cloud.bigquery.table.RowIterator.to_dataframe`
2234 for details.
2235
2236 .. versionadded:: 1.11.0
2237 create_bqstorage_client (Optional[bool]):
2238 If ``True`` (default), create a BigQuery Storage API client
2239 using the default API settings. The BigQuery Storage API
2240 is a faster way to fetch rows from BigQuery. See the
2241 ``bqstorage_client`` parameter for more information.
2242
2243 This argument does nothing if ``bqstorage_client`` is supplied.
2244
2245 .. versionadded:: 1.24.0
2246
2247 max_results (Optional[int]):
2248 Maximum number of rows to include in the result. No limit by default.
2249
2250 .. versionadded:: 2.21.0
2251
2252 geography_column (Optional[str]):
2253 If there are more than one GEOGRAPHY column,
2254 identifies which one to use to construct a GeoPandas
2255 GeoDataFrame. This option can be ommitted if there's
2256 only one GEOGRAPHY column.
2257 bool_dtype (Optional[pandas.Series.dtype, None]):
2258 If set, indicate a pandas ExtensionDtype (e.g. ``pandas.BooleanDtype()``)
2259 to convert BigQuery Boolean type, instead of relying on the default
2260 ``pandas.BooleanDtype()``. If you explicitly set the value to ``None``,
2261 then the data type will be ``numpy.dtype("bool")``. BigQuery Boolean
2262 type can be found at:
2263 https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#boolean_type
2264 int_dtype (Optional[pandas.Series.dtype, None]):
2265 If set, indicate a pandas ExtensionDtype (e.g. ``pandas.Int64Dtype()``)
2266 to convert BigQuery Integer types, instead of relying on the default
2267 ``pandas.Int64Dtype()``. If you explicitly set the value to ``None``,
2268 then the data type will be ``numpy.dtype("int64")``. A list of BigQuery
2269 Integer types can be found at:
2270 https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#integer_types
2271 float_dtype (Optional[pandas.Series.dtype, None]):
2272 If set, indicate a pandas ExtensionDtype (e.g. ``pandas.Float32Dtype()``)
2273 to convert BigQuery Float type, instead of relying on the default
2274 ``numpy.dtype("float64")``. If you explicitly set the value to ``None``,
2275 then the data type will be ``numpy.dtype("float64")``. BigQuery Float
2276 type can be found at:
2277 https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#floating_point_types
2278 string_dtype (Optional[pandas.Series.dtype, None]):
2279 If set, indicate a pandas ExtensionDtype (e.g. ``pandas.StringDtype()``) to
2280 convert BigQuery String type, instead of relying on the default
2281 ``numpy.dtype("object")``. If you explicitly set the value to ``None``,
2282 then the data type will be ``numpy.dtype("object")``. BigQuery String
2283 type can be found at:
2284 https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#string_type
2285 timeout (Optional[float]):
2286 The number of seconds to wait for the underlying download to complete.
2287 If ``None``, wait indefinitely.
2288
2289 Returns:
2290 geopandas.GeoDataFrame:
2291 A :class:`geopandas.GeoDataFrame` populated with row
2292 data and column headers from the query results. The
2293 column headers are derived from the destination
2294 table's schema.
2295
2296 Raises:
2297 ValueError:
2298 If the :mod:`geopandas` library cannot be imported, or the
2299 :mod:`google.cloud.bigquery_storage_v1` module is
2300 required but cannot be imported.
2301
2302 .. versionadded:: 2.24.0
2303 """
2304 query_result = wait_for_query(self, progress_bar_type, max_results=max_results)
2305 return query_result.to_geodataframe(
2306 bqstorage_client=bqstorage_client,
2307 dtypes=dtypes,
2308 progress_bar_type=progress_bar_type,
2309 create_bqstorage_client=create_bqstorage_client,
2310 geography_column=geography_column,
2311 bool_dtype=bool_dtype,
2312 int_dtype=int_dtype,
2313 float_dtype=float_dtype,
2314 string_dtype=string_dtype,
2315 timeout=timeout,
2316 )
2317
2318 def __iter__(self):
2319 return iter(self.result())
2320
2321
2322class QueryPlanEntryStep(object):
2323 """Map a single step in a query plan entry.
2324
2325 Args:
2326 kind (str): step type.
2327 substeps (List): names of substeps.
2328 """
2329
2330 def __init__(self, kind, substeps):
2331 self.kind = kind
2332 self.substeps = list(substeps)
2333
2334 @classmethod
2335 def from_api_repr(cls, resource: dict) -> "QueryPlanEntryStep":
2336 """Factory: construct instance from the JSON repr.
2337
2338 Args:
2339 resource (Dict): JSON representation of the entry.
2340
2341 Returns:
2342 google.cloud.bigquery.job.QueryPlanEntryStep:
2343 New instance built from the resource.
2344 """
2345 return cls(kind=resource.get("kind"), substeps=resource.get("substeps", ()))
2346
2347 def __eq__(self, other):
2348 if not isinstance(other, self.__class__):
2349 return NotImplemented
2350 return self.kind == other.kind and self.substeps == other.substeps
2351
2352
2353class QueryPlanEntry(object):
2354 """QueryPlanEntry represents a single stage of a query execution plan.
2355
2356 See
2357 https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#ExplainQueryStage
2358 for the underlying API representation within query statistics.
2359 """
2360
2361 def __init__(self):
2362 self._properties = {}
2363
2364 @classmethod
2365 def from_api_repr(cls, resource: dict) -> "QueryPlanEntry":
2366 """Factory: construct instance from the JSON repr.
2367
2368 Args:
2369 resource(Dict[str: object]):
2370 ExplainQueryStage representation returned from API.
2371
2372 Returns:
2373 google.cloud.bigquery.job.QueryPlanEntry:
2374 Query plan entry parsed from ``resource``.
2375 """
2376 entry = cls()
2377 entry._properties = resource
2378 return entry
2379
2380 @property
2381 def name(self):
2382 """Optional[str]: Human-readable name of the stage."""
2383 return self._properties.get("name")
2384
2385 @property
2386 def entry_id(self):
2387 """Optional[str]: Unique ID for the stage within the plan."""
2388 return self._properties.get("id")
2389
2390 @property
2391 def start(self):
2392 """Optional[Datetime]: Datetime when the stage started."""
2393 if self._properties.get("startMs") is None:
2394 return None
2395 return _helpers._datetime_from_microseconds(
2396 int(self._properties.get("startMs")) * 1000.0
2397 )
2398
2399 @property
2400 def end(self):
2401 """Optional[Datetime]: Datetime when the stage ended."""
2402 if self._properties.get("endMs") is None:
2403 return None
2404 return _helpers._datetime_from_microseconds(
2405 int(self._properties.get("endMs")) * 1000.0
2406 )
2407
2408 @property
2409 def input_stages(self):
2410 """List(int): Entry IDs for stages that were inputs for this stage."""
2411 if self._properties.get("inputStages") is None:
2412 return []
2413 return [
2414 _helpers._int_or_none(entry)
2415 for entry in self._properties.get("inputStages")
2416 ]
2417
2418 @property
2419 def parallel_inputs(self):
2420 """Optional[int]: Number of parallel input segments within
2421 the stage.
2422 """
2423 return _helpers._int_or_none(self._properties.get("parallelInputs"))
2424
2425 @property
2426 def completed_parallel_inputs(self):
2427 """Optional[int]: Number of parallel input segments completed."""
2428 return _helpers._int_or_none(self._properties.get("completedParallelInputs"))
2429
2430 @property
2431 def wait_ms_avg(self):
2432 """Optional[int]: Milliseconds the average worker spent waiting to
2433 be scheduled.
2434 """
2435 return _helpers._int_or_none(self._properties.get("waitMsAvg"))
2436
2437 @property
2438 def wait_ms_max(self):
2439 """Optional[int]: Milliseconds the slowest worker spent waiting to
2440 be scheduled.
2441 """
2442 return _helpers._int_or_none(self._properties.get("waitMsMax"))
2443
2444 @property
2445 def wait_ratio_avg(self):
2446 """Optional[float]: Ratio of time the average worker spent waiting
2447 to be scheduled, relative to the longest time spent by any worker in
2448 any stage of the overall plan.
2449 """
2450 return self._properties.get("waitRatioAvg")
2451
2452 @property
2453 def wait_ratio_max(self):
2454 """Optional[float]: Ratio of time the slowest worker spent waiting
2455 to be scheduled, relative to the longest time spent by any worker in
2456 any stage of the overall plan.
2457 """
2458 return self._properties.get("waitRatioMax")
2459
2460 @property
2461 def read_ms_avg(self):
2462 """Optional[int]: Milliseconds the average worker spent reading
2463 input.
2464 """
2465 return _helpers._int_or_none(self._properties.get("readMsAvg"))
2466
2467 @property
2468 def read_ms_max(self):
2469 """Optional[int]: Milliseconds the slowest worker spent reading
2470 input.
2471 """
2472 return _helpers._int_or_none(self._properties.get("readMsMax"))
2473
2474 @property
2475 def read_ratio_avg(self):
2476 """Optional[float]: Ratio of time the average worker spent reading
2477 input, relative to the longest time spent by any worker in any stage
2478 of the overall plan.
2479 """
2480 return self._properties.get("readRatioAvg")
2481
2482 @property
2483 def read_ratio_max(self):
2484 """Optional[float]: Ratio of time the slowest worker spent reading
2485 to be scheduled, relative to the longest time spent by any worker in
2486 any stage of the overall plan.
2487 """
2488 return self._properties.get("readRatioMax")
2489
2490 @property
2491 def compute_ms_avg(self):
2492 """Optional[int]: Milliseconds the average worker spent on CPU-bound
2493 processing.
2494 """
2495 return _helpers._int_or_none(self._properties.get("computeMsAvg"))
2496
2497 @property
2498 def compute_ms_max(self):
2499 """Optional[int]: Milliseconds the slowest worker spent on CPU-bound
2500 processing.
2501 """
2502 return _helpers._int_or_none(self._properties.get("computeMsMax"))
2503
2504 @property
2505 def compute_ratio_avg(self):
2506 """Optional[float]: Ratio of time the average worker spent on
2507 CPU-bound processing, relative to the longest time spent by any
2508 worker in any stage of the overall plan.
2509 """
2510 return self._properties.get("computeRatioAvg")
2511
2512 @property
2513 def compute_ratio_max(self):
2514 """Optional[float]: Ratio of time the slowest worker spent on
2515 CPU-bound processing, relative to the longest time spent by any
2516 worker in any stage of the overall plan.
2517 """
2518 return self._properties.get("computeRatioMax")
2519
2520 @property
2521 def write_ms_avg(self):
2522 """Optional[int]: Milliseconds the average worker spent writing
2523 output data.
2524 """
2525 return _helpers._int_or_none(self._properties.get("writeMsAvg"))
2526
2527 @property
2528 def write_ms_max(self):
2529 """Optional[int]: Milliseconds the slowest worker spent writing
2530 output data.
2531 """
2532 return _helpers._int_or_none(self._properties.get("writeMsMax"))
2533
2534 @property
2535 def write_ratio_avg(self):
2536 """Optional[float]: Ratio of time the average worker spent writing
2537 output data, relative to the longest time spent by any worker in any
2538 stage of the overall plan.
2539 """
2540 return self._properties.get("writeRatioAvg")
2541
2542 @property
2543 def write_ratio_max(self):
2544 """Optional[float]: Ratio of time the slowest worker spent writing
2545 output data, relative to the longest time spent by any worker in any
2546 stage of the overall plan.
2547 """
2548 return self._properties.get("writeRatioMax")
2549
2550 @property
2551 def records_read(self):
2552 """Optional[int]: Number of records read by this stage."""
2553 return _helpers._int_or_none(self._properties.get("recordsRead"))
2554
2555 @property
2556 def records_written(self):
2557 """Optional[int]: Number of records written by this stage."""
2558 return _helpers._int_or_none(self._properties.get("recordsWritten"))
2559
2560 @property
2561 def status(self):
2562 """Optional[str]: status of this stage."""
2563 return self._properties.get("status")
2564
2565 @property
2566 def shuffle_output_bytes(self):
2567 """Optional[int]: Number of bytes written by this stage to
2568 intermediate shuffle.
2569 """
2570 return _helpers._int_or_none(self._properties.get("shuffleOutputBytes"))
2571
2572 @property
2573 def shuffle_output_bytes_spilled(self):
2574 """Optional[int]: Number of bytes written by this stage to
2575 intermediate shuffle and spilled to disk.
2576 """
2577 return _helpers._int_or_none(self._properties.get("shuffleOutputBytesSpilled"))
2578
2579 @property
2580 def steps(self):
2581 """List(QueryPlanEntryStep): List of step operations performed by
2582 each worker in the stage.
2583 """
2584 return [
2585 QueryPlanEntryStep.from_api_repr(step)
2586 for step in self._properties.get("steps", [])
2587 ]
2588
2589 @property
2590 def slot_ms(self):
2591 """Optional[int]: Slot-milliseconds used by the stage."""
2592 return _helpers._int_or_none(self._properties.get("slotMs"))
2593
2594
2595class TimelineEntry(object):
2596 """TimelineEntry represents progress of a query job at a particular
2597 point in time.
2598
2599 See
2600 https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#querytimelinesample
2601 for the underlying API representation within query statistics.
2602 """
2603
2604 def __init__(self):
2605 self._properties = {}
2606
2607 @classmethod
2608 def from_api_repr(cls, resource):
2609 """Factory: construct instance from the JSON repr.
2610
2611 Args:
2612 resource(Dict[str: object]):
2613 QueryTimelineSample representation returned from API.
2614
2615 Returns:
2616 google.cloud.bigquery.TimelineEntry:
2617 Timeline sample parsed from ``resource``.
2618 """
2619 entry = cls()
2620 entry._properties = resource
2621 return entry
2622
2623 @property
2624 def elapsed_ms(self):
2625 """Optional[int]: Milliseconds elapsed since start of query
2626 execution."""
2627 return _helpers._int_or_none(self._properties.get("elapsedMs"))
2628
2629 @property
2630 def active_units(self):
2631 """Optional[int]: Current number of input units being processed
2632 by workers, reported as largest value since the last sample."""
2633 return _helpers._int_or_none(self._properties.get("activeUnits"))
2634
2635 @property
2636 def pending_units(self):
2637 """Optional[int]: Current number of input units remaining for
2638 query stages active at this sample time."""
2639 return _helpers._int_or_none(self._properties.get("pendingUnits"))
2640
2641 @property
2642 def completed_units(self):
2643 """Optional[int]: Current number of input units completed by
2644 this query."""
2645 return _helpers._int_or_none(self._properties.get("completedUnits"))
2646
2647 @property
2648 def slot_millis(self):
2649 """Optional[int]: Cumulative slot-milliseconds consumed by
2650 this query."""
2651 return _helpers._int_or_none(self._properties.get("totalSlotMs"))