1# Copyright 2015 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15"""Classes for query jobs."""
16
17import concurrent.futures
18import copy
19import re
20import time
21import typing
22from typing import Any, Dict, Iterable, List, Optional, Union
23
24from google.api_core import exceptions
25from google.api_core import retry as retries
26import requests
27
28from google.cloud.bigquery.dataset import Dataset
29from google.cloud.bigquery.dataset import DatasetListItem
30from google.cloud.bigquery.dataset import DatasetReference
31from google.cloud.bigquery.encryption_configuration import EncryptionConfiguration
32from google.cloud.bigquery.enums import KeyResultStatementKind, DefaultPandasDTypes
33from google.cloud.bigquery.external_config import ExternalConfig
34from google.cloud.bigquery import _helpers
35from google.cloud.bigquery.query import (
36 _query_param_from_api_repr,
37 ArrayQueryParameter,
38 ConnectionProperty,
39 ScalarQueryParameter,
40 StructQueryParameter,
41 UDFResource,
42)
43from google.cloud.bigquery.retry import (
44 DEFAULT_RETRY,
45 DEFAULT_JOB_RETRY,
46 POLLING_DEFAULT_VALUE,
47)
48from google.cloud.bigquery.routine import RoutineReference
49from google.cloud.bigquery.schema import SchemaField
50from google.cloud.bigquery.table import _EmptyRowIterator
51from google.cloud.bigquery.table import RangePartitioning
52from google.cloud.bigquery.table import _table_arg_to_table_ref
53from google.cloud.bigquery.table import TableReference
54from google.cloud.bigquery.table import TimePartitioning
55from google.cloud.bigquery._tqdm_helpers import wait_for_query
56
57from google.cloud.bigquery.job.base import _AsyncJob
58from google.cloud.bigquery.job.base import _JobConfig
59from google.cloud.bigquery.job.base import _JobReference
60
61try:
62 import pandas # type: ignore
63except ImportError:
64 pandas = None
65
66if typing.TYPE_CHECKING: # pragma: NO COVER
67 # Assumption: type checks are only used by library developers and CI environments
68 # that have all optional dependencies installed, thus no conditional imports.
69 import pandas # type: ignore
70 import geopandas # type: ignore
71 import pyarrow # type: ignore
72 from google.cloud import bigquery_storage
73 from google.cloud.bigquery.client import Client
74 from google.cloud.bigquery.table import RowIterator
75
76
77_CONTAINS_ORDER_BY = re.compile(r"ORDER\s+BY", re.IGNORECASE)
78_EXCEPTION_FOOTER_TEMPLATE = "{message}\n\nLocation: {location}\nJob ID: {job_id}\n"
79_TIMEOUT_BUFFER_SECS = 0.1
80
81
82def _contains_order_by(query):
83 """Do we need to preserve the order of the query results?
84
85 This function has known false positives, such as with ordered window
86 functions:
87
88 .. code-block:: sql
89
90 SELECT SUM(x) OVER (
91 window_name
92 PARTITION BY...
93 ORDER BY...
94 window_frame_clause)
95 FROM ...
96
97 This false positive failure case means the behavior will be correct, but
98 downloading results with the BigQuery Storage API may be slower than it
99 otherwise would. This is preferable to the false negative case, where
100 results are expected to be in order but are not (due to parallel reads).
101 """
102 return query and _CONTAINS_ORDER_BY.search(query)
103
104
105def _from_api_repr_query_parameters(resource):
106 return [_query_param_from_api_repr(mapping) for mapping in resource]
107
108
109def _to_api_repr_query_parameters(value):
110 return [query_parameter.to_api_repr() for query_parameter in value]
111
112
113def _from_api_repr_udf_resources(resource):
114 udf_resources = []
115 for udf_mapping in resource:
116 for udf_type, udf_value in udf_mapping.items():
117 udf_resources.append(UDFResource(udf_type, udf_value))
118 return udf_resources
119
120
121def _to_api_repr_udf_resources(value):
122 return [{udf_resource.udf_type: udf_resource.value} for udf_resource in value]
123
124
125def _from_api_repr_table_defs(resource):
126 return {k: ExternalConfig.from_api_repr(v) for k, v in resource.items()}
127
128
129def _to_api_repr_table_defs(value):
130 return {k: ExternalConfig.to_api_repr(v) for k, v in value.items()}
131
132
133class BiEngineReason(typing.NamedTuple):
134 """Reason for BI Engine acceleration failure
135
136 https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#bienginereason
137 """
138
139 code: str = "CODE_UNSPECIFIED"
140
141 reason: str = ""
142
143 @classmethod
144 def from_api_repr(cls, reason: Dict[str, str]) -> "BiEngineReason":
145 return cls(reason.get("code", "CODE_UNSPECIFIED"), reason.get("message", ""))
146
147
148class BiEngineStats(typing.NamedTuple):
149 """Statistics for a BI Engine query
150
151 https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#bienginestatistics
152 """
153
154 mode: str = "ACCELERATION_MODE_UNSPECIFIED"
155 """ Specifies which mode of BI Engine acceleration was performed (if any)
156 """
157
158 reasons: List[BiEngineReason] = []
159 """ Contains explanatory messages in case of DISABLED / PARTIAL acceleration
160 """
161
162 @classmethod
163 def from_api_repr(cls, stats: Dict[str, Any]) -> "BiEngineStats":
164 mode = stats.get("biEngineMode", "ACCELERATION_MODE_UNSPECIFIED")
165 reasons = [
166 BiEngineReason.from_api_repr(r) for r in stats.get("biEngineReasons", [])
167 ]
168 return cls(mode, reasons)
169
170
171class DmlStats(typing.NamedTuple):
172 """Detailed statistics for DML statements.
173
174 https://cloud.google.com/bigquery/docs/reference/rest/v2/DmlStats
175 """
176
177 inserted_row_count: int = 0
178 """Number of inserted rows. Populated by DML INSERT and MERGE statements."""
179
180 deleted_row_count: int = 0
181 """Number of deleted rows. populated by DML DELETE, MERGE and TRUNCATE statements.
182 """
183
184 updated_row_count: int = 0
185 """Number of updated rows. Populated by DML UPDATE and MERGE statements."""
186
187 @classmethod
188 def from_api_repr(cls, stats: Dict[str, str]) -> "DmlStats":
189 # NOTE: The field order here must match the order of fields set at the
190 # class level.
191 api_fields = ("insertedRowCount", "deletedRowCount", "updatedRowCount")
192
193 args = (
194 int(stats.get(api_field, default_val))
195 for api_field, default_val in zip(api_fields, cls.__new__.__defaults__) # type: ignore
196 )
197 return cls(*args)
198
199
200class IndexUnusedReason(typing.NamedTuple):
201 """Reason about why no search index was used in the search query (or sub-query).
202
203 https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#indexunusedreason
204 """
205
206 code: Optional[str] = None
207 """Specifies the high-level reason for the scenario when no search index was used.
208 """
209
210 message: Optional[str] = None
211 """Free form human-readable reason for the scenario when no search index was used.
212 """
213
214 baseTable: Optional[TableReference] = None
215 """Specifies the base table involved in the reason that no search index was used.
216 """
217
218 indexName: Optional[str] = None
219 """Specifies the name of the unused search index, if available."""
220
221 @classmethod
222 def from_api_repr(cls, reason):
223 code = reason.get("code")
224 message = reason.get("message")
225 baseTable = reason.get("baseTable")
226 indexName = reason.get("indexName")
227
228 return cls(code, message, baseTable, indexName)
229
230
231class SearchStats(typing.NamedTuple):
232 """Statistics related to Search Queries. Populated as part of JobStatistics2.
233
234 https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#searchstatistics
235 """
236
237 mode: Optional[str] = None
238 """Indicates the type of search index usage in the entire search query."""
239
240 reason: List[IndexUnusedReason] = []
241 """Reason about why no search index was used in the search query (or sub-query)"""
242
243 @classmethod
244 def from_api_repr(cls, stats: Dict[str, Any]):
245 mode = stats.get("indexUsageMode", None)
246 reason = [
247 IndexUnusedReason.from_api_repr(r)
248 for r in stats.get("indexUnusedReasons", [])
249 ]
250 return cls(mode, reason)
251
252
253class ScriptOptions:
254 """Options controlling the execution of scripts.
255
256 https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#ScriptOptions
257 """
258
259 def __init__(
260 self,
261 statement_timeout_ms: Optional[int] = None,
262 statement_byte_budget: Optional[int] = None,
263 key_result_statement: Optional[KeyResultStatementKind] = None,
264 ):
265 self._properties: Dict[str, Any] = {}
266 self.statement_timeout_ms = statement_timeout_ms
267 self.statement_byte_budget = statement_byte_budget
268 self.key_result_statement = key_result_statement
269
270 @classmethod
271 def from_api_repr(cls, resource: Dict[str, Any]) -> "ScriptOptions":
272 """Factory: construct instance from the JSON repr.
273
274 Args:
275 resource(Dict[str: Any]):
276 ScriptOptions representation returned from API.
277
278 Returns:
279 google.cloud.bigquery.ScriptOptions:
280 ScriptOptions sample parsed from ``resource``.
281 """
282 entry = cls()
283 entry._properties = copy.deepcopy(resource)
284 return entry
285
286 def to_api_repr(self) -> Dict[str, Any]:
287 """Construct the API resource representation."""
288 return copy.deepcopy(self._properties)
289
290 @property
291 def statement_timeout_ms(self) -> Union[int, None]:
292 """Timeout period for each statement in a script."""
293 return _helpers._int_or_none(self._properties.get("statementTimeoutMs"))
294
295 @statement_timeout_ms.setter
296 def statement_timeout_ms(self, value: Union[int, None]):
297 new_value = None if value is None else str(value)
298 self._properties["statementTimeoutMs"] = new_value
299
300 @property
301 def statement_byte_budget(self) -> Union[int, None]:
302 """Limit on the number of bytes billed per statement.
303
304 Exceeding this budget results in an error.
305 """
306 return _helpers._int_or_none(self._properties.get("statementByteBudget"))
307
308 @statement_byte_budget.setter
309 def statement_byte_budget(self, value: Union[int, None]):
310 new_value = None if value is None else str(value)
311 self._properties["statementByteBudget"] = new_value
312
313 @property
314 def key_result_statement(self) -> Union[KeyResultStatementKind, None]:
315 """Determines which statement in the script represents the "key result".
316
317 This is used to populate the schema and query results of the script job.
318 Default is ``KeyResultStatementKind.LAST``.
319 """
320 return self._properties.get("keyResultStatement")
321
322 @key_result_statement.setter
323 def key_result_statement(self, value: Union[KeyResultStatementKind, None]):
324 self._properties["keyResultStatement"] = value
325
326
327class QueryJobConfig(_JobConfig):
328 """Configuration options for query jobs.
329
330 All properties in this class are optional. Values which are :data:`None` ->
331 server defaults. Set properties on the constructed configuration by using
332 the property name as the name of a keyword argument.
333 """
334
335 def __init__(self, **kwargs) -> None:
336 super(QueryJobConfig, self).__init__("query", **kwargs)
337
338 @property
339 def destination_encryption_configuration(self):
340 """google.cloud.bigquery.encryption_configuration.EncryptionConfiguration: Custom
341 encryption configuration for the destination table.
342
343 Custom encryption configuration (e.g., Cloud KMS keys) or :data:`None`
344 if using default encryption.
345
346 See
347 https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobConfigurationQuery.FIELDS.destination_encryption_configuration
348 """
349 prop = self._get_sub_prop("destinationEncryptionConfiguration")
350 if prop is not None:
351 prop = EncryptionConfiguration.from_api_repr(prop)
352 return prop
353
354 @destination_encryption_configuration.setter
355 def destination_encryption_configuration(self, value):
356 api_repr = value
357 if value is not None:
358 api_repr = value.to_api_repr()
359 self._set_sub_prop("destinationEncryptionConfiguration", api_repr)
360
361 @property
362 def allow_large_results(self):
363 """bool: Allow large query results tables (legacy SQL, only)
364
365 See
366 https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobConfigurationQuery.FIELDS.allow_large_results
367 """
368 return self._get_sub_prop("allowLargeResults")
369
370 @allow_large_results.setter
371 def allow_large_results(self, value):
372 self._set_sub_prop("allowLargeResults", value)
373
374 @property
375 def connection_properties(self) -> List[ConnectionProperty]:
376 """Connection properties.
377
378 See
379 https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobConfigurationQuery.FIELDS.connection_properties
380
381 .. versionadded:: 2.29.0
382 """
383 resource = self._get_sub_prop("connectionProperties", [])
384 return [ConnectionProperty.from_api_repr(prop) for prop in resource]
385
386 @connection_properties.setter
387 def connection_properties(self, value: Iterable[ConnectionProperty]):
388 self._set_sub_prop(
389 "connectionProperties",
390 [prop.to_api_repr() for prop in value],
391 )
392
393 @property
394 def create_disposition(self):
395 """google.cloud.bigquery.job.CreateDisposition: Specifies behavior
396 for creating tables.
397
398 See
399 https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobConfigurationQuery.FIELDS.create_disposition
400 """
401 return self._get_sub_prop("createDisposition")
402
403 @create_disposition.setter
404 def create_disposition(self, value):
405 self._set_sub_prop("createDisposition", value)
406
407 @property
408 def create_session(self) -> Optional[bool]:
409 """[Preview] If :data:`True`, creates a new session, where
410 :attr:`~google.cloud.bigquery.job.QueryJob.session_info` will contain a
411 random server generated session id.
412
413 If :data:`False`, runs query with an existing ``session_id`` passed in
414 :attr:`~google.cloud.bigquery.job.QueryJobConfig.connection_properties`,
415 otherwise runs query in non-session mode.
416
417 See
418 https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobConfigurationQuery.FIELDS.create_session
419
420 .. versionadded:: 2.29.0
421 """
422 return self._get_sub_prop("createSession")
423
424 @create_session.setter
425 def create_session(self, value: Optional[bool]):
426 self._set_sub_prop("createSession", value)
427
428 @property
429 def default_dataset(self):
430 """google.cloud.bigquery.dataset.DatasetReference: the default dataset
431 to use for unqualified table names in the query or :data:`None` if not
432 set.
433
434 The ``default_dataset`` setter accepts:
435
436 - a :class:`~google.cloud.bigquery.dataset.Dataset`, or
437 - a :class:`~google.cloud.bigquery.dataset.DatasetReference`, or
438 - a :class:`str` of the fully-qualified dataset ID in standard SQL
439 format. The value must included a project ID and dataset ID
440 separated by ``.``. For example: ``your-project.your_dataset``.
441
442 See
443 https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobConfigurationQuery.FIELDS.default_dataset
444 """
445 prop = self._get_sub_prop("defaultDataset")
446 if prop is not None:
447 prop = DatasetReference.from_api_repr(prop)
448 return prop
449
450 @default_dataset.setter
451 def default_dataset(self, value):
452 if value is None:
453 self._set_sub_prop("defaultDataset", None)
454 return
455
456 if isinstance(value, str):
457 value = DatasetReference.from_string(value)
458
459 if isinstance(value, (Dataset, DatasetListItem)):
460 value = value.reference
461
462 resource = value.to_api_repr()
463 self._set_sub_prop("defaultDataset", resource)
464
465 @property
466 def destination(self):
467 """google.cloud.bigquery.table.TableReference: table where results are
468 written or :data:`None` if not set.
469
470 The ``destination`` setter accepts:
471
472 - a :class:`~google.cloud.bigquery.table.Table`, or
473 - a :class:`~google.cloud.bigquery.table.TableReference`, or
474 - a :class:`str` of the fully-qualified table ID in standard SQL
475 format. The value must included a project ID, dataset ID, and table
476 ID, each separated by ``.``. For example:
477 ``your-project.your_dataset.your_table``.
478
479 .. note::
480
481 Only table ID is passed to the backend, so any configuration
482 in `~google.cloud.bigquery.table.Table` is discarded.
483
484 See
485 https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobConfigurationQuery.FIELDS.destination_table
486 """
487 prop = self._get_sub_prop("destinationTable")
488 if prop is not None:
489 prop = TableReference.from_api_repr(prop)
490 return prop
491
492 @destination.setter
493 def destination(self, value):
494 if value is None:
495 self._set_sub_prop("destinationTable", None)
496 return
497
498 value = _table_arg_to_table_ref(value)
499 resource = value.to_api_repr()
500 self._set_sub_prop("destinationTable", resource)
501
502 @property
503 def dry_run(self):
504 """bool: :data:`True` if this query should be a dry run to estimate
505 costs.
506
507 See
508 https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobConfiguration.FIELDS.dry_run
509 """
510 return self._properties.get("dryRun")
511
512 @dry_run.setter
513 def dry_run(self, value):
514 self._properties["dryRun"] = value
515
516 @property
517 def flatten_results(self):
518 """bool: Flatten nested/repeated fields in results. (Legacy SQL only)
519
520 See
521 https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobConfigurationQuery.FIELDS.flatten_results
522 """
523 return self._get_sub_prop("flattenResults")
524
525 @flatten_results.setter
526 def flatten_results(self, value):
527 self._set_sub_prop("flattenResults", value)
528
529 @property
530 def maximum_billing_tier(self):
531 """int: Deprecated. Changes the billing tier to allow high-compute
532 queries.
533
534 See
535 https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobConfigurationQuery.FIELDS.maximum_billing_tier
536 """
537 return self._get_sub_prop("maximumBillingTier")
538
539 @maximum_billing_tier.setter
540 def maximum_billing_tier(self, value):
541 self._set_sub_prop("maximumBillingTier", value)
542
543 @property
544 def maximum_bytes_billed(self):
545 """int: Maximum bytes to be billed for this job or :data:`None` if not set.
546
547 See
548 https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobConfigurationQuery.FIELDS.maximum_bytes_billed
549 """
550 return _helpers._int_or_none(self._get_sub_prop("maximumBytesBilled"))
551
552 @maximum_bytes_billed.setter
553 def maximum_bytes_billed(self, value):
554 self._set_sub_prop("maximumBytesBilled", str(value))
555
556 @property
557 def priority(self):
558 """google.cloud.bigquery.job.QueryPriority: Priority of the query.
559
560 See
561 https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobConfigurationQuery.FIELDS.priority
562 """
563 return self._get_sub_prop("priority")
564
565 @priority.setter
566 def priority(self, value):
567 self._set_sub_prop("priority", value)
568
569 @property
570 def query_parameters(self):
571 """List[Union[google.cloud.bigquery.query.ArrayQueryParameter, \
572 google.cloud.bigquery.query.ScalarQueryParameter, \
573 google.cloud.bigquery.query.StructQueryParameter]]: list of parameters
574 for parameterized query (empty by default)
575
576 See:
577 https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobConfigurationQuery.FIELDS.query_parameters
578 """
579 prop = self._get_sub_prop("queryParameters", default=[])
580 return _from_api_repr_query_parameters(prop)
581
582 @query_parameters.setter
583 def query_parameters(self, values):
584 self._set_sub_prop("queryParameters", _to_api_repr_query_parameters(values))
585
586 @property
587 def range_partitioning(self):
588 """Optional[google.cloud.bigquery.table.RangePartitioning]:
589 Configures range-based partitioning for destination table.
590
591 .. note::
592 **Beta**. The integer range partitioning feature is in a
593 pre-release state and might change or have limited support.
594
595 Only specify at most one of
596 :attr:`~google.cloud.bigquery.job.LoadJobConfig.time_partitioning` or
597 :attr:`~google.cloud.bigquery.job.LoadJobConfig.range_partitioning`.
598
599 Raises:
600 ValueError:
601 If the value is not
602 :class:`~google.cloud.bigquery.table.RangePartitioning` or
603 :data:`None`.
604 """
605 resource = self._get_sub_prop("rangePartitioning")
606 if resource is not None:
607 return RangePartitioning(_properties=resource)
608
609 @range_partitioning.setter
610 def range_partitioning(self, value):
611 resource = value
612 if isinstance(value, RangePartitioning):
613 resource = value._properties
614 elif value is not None:
615 raise ValueError(
616 "Expected value to be RangePartitioning or None, got {}.".format(value)
617 )
618 self._set_sub_prop("rangePartitioning", resource)
619
620 @property
621 def udf_resources(self):
622 """List[google.cloud.bigquery.query.UDFResource]: user
623 defined function resources (empty by default)
624
625 See:
626 https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobConfigurationQuery.FIELDS.user_defined_function_resources
627 """
628 prop = self._get_sub_prop("userDefinedFunctionResources", default=[])
629 return _from_api_repr_udf_resources(prop)
630
631 @udf_resources.setter
632 def udf_resources(self, values):
633 self._set_sub_prop(
634 "userDefinedFunctionResources", _to_api_repr_udf_resources(values)
635 )
636
637 @property
638 def use_legacy_sql(self):
639 """bool: Use legacy SQL syntax.
640
641 See
642 https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobConfigurationQuery.FIELDS.use_legacy_sql
643 """
644 return self._get_sub_prop("useLegacySql")
645
646 @use_legacy_sql.setter
647 def use_legacy_sql(self, value):
648 self._set_sub_prop("useLegacySql", value)
649
650 @property
651 def use_query_cache(self):
652 """bool: Look for the query result in the cache.
653
654 See
655 https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobConfigurationQuery.FIELDS.use_query_cache
656 """
657 return self._get_sub_prop("useQueryCache")
658
659 @use_query_cache.setter
660 def use_query_cache(self, value):
661 self._set_sub_prop("useQueryCache", value)
662
663 @property
664 def write_disposition(self):
665 """google.cloud.bigquery.job.WriteDisposition: Action that occurs if
666 the destination table already exists.
667
668 See
669 https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobConfigurationQuery.FIELDS.write_disposition
670 """
671 return self._get_sub_prop("writeDisposition")
672
673 @write_disposition.setter
674 def write_disposition(self, value):
675 self._set_sub_prop("writeDisposition", value)
676
677 @property
678 def write_incremental_results(self) -> Optional[bool]:
679 """This is only supported for a SELECT query using a temporary table.
680
681 If set, the query is allowed to write results incrementally to the temporary result
682 table. This may incur a performance penalty. This option cannot be used with Legacy SQL.
683
684 This feature is not generally available.
685 """
686 return self._get_sub_prop("writeIncrementalResults")
687
688 @write_incremental_results.setter
689 def write_incremental_results(self, value):
690 self._set_sub_prop("writeIncrementalResults", value)
691
692 @property
693 def table_definitions(self):
694 """Dict[str, google.cloud.bigquery.external_config.ExternalConfig]:
695 Definitions for external tables or :data:`None` if not set.
696
697 See
698 https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobConfigurationQuery.FIELDS.external_table_definitions
699 """
700 prop = self._get_sub_prop("tableDefinitions")
701 if prop is not None:
702 prop = _from_api_repr_table_defs(prop)
703 return prop
704
705 @table_definitions.setter
706 def table_definitions(self, values):
707 self._set_sub_prop("tableDefinitions", _to_api_repr_table_defs(values))
708
709 @property
710 def time_partitioning(self):
711 """Optional[google.cloud.bigquery.table.TimePartitioning]: Specifies
712 time-based partitioning for the destination table.
713
714 Only specify at most one of
715 :attr:`~google.cloud.bigquery.job.LoadJobConfig.time_partitioning` or
716 :attr:`~google.cloud.bigquery.job.LoadJobConfig.range_partitioning`.
717
718 Raises:
719 ValueError:
720 If the value is not
721 :class:`~google.cloud.bigquery.table.TimePartitioning` or
722 :data:`None`.
723 """
724 prop = self._get_sub_prop("timePartitioning")
725 if prop is not None:
726 prop = TimePartitioning.from_api_repr(prop)
727 return prop
728
729 @time_partitioning.setter
730 def time_partitioning(self, value):
731 api_repr = value
732 if value is not None:
733 api_repr = value.to_api_repr()
734 self._set_sub_prop("timePartitioning", api_repr)
735
736 @property
737 def clustering_fields(self):
738 """Optional[List[str]]: Fields defining clustering for the table
739
740 (Defaults to :data:`None`).
741
742 Clustering fields are immutable after table creation.
743
744 .. note::
745
746 BigQuery supports clustering for both partitioned and
747 non-partitioned tables.
748 """
749 prop = self._get_sub_prop("clustering")
750 if prop is not None:
751 return list(prop.get("fields", ()))
752
753 @clustering_fields.setter
754 def clustering_fields(self, value):
755 """Optional[List[str]]: Fields defining clustering for the table
756
757 (Defaults to :data:`None`).
758 """
759 if value is not None:
760 self._set_sub_prop("clustering", {"fields": value})
761 else:
762 self._del_sub_prop("clustering")
763
764 @property
765 def schema_update_options(self):
766 """List[google.cloud.bigquery.job.SchemaUpdateOption]: Specifies
767 updates to the destination table schema to allow as a side effect of
768 the query job.
769 """
770 return self._get_sub_prop("schemaUpdateOptions")
771
772 @schema_update_options.setter
773 def schema_update_options(self, values):
774 self._set_sub_prop("schemaUpdateOptions", values)
775
776 @property
777 def script_options(self) -> ScriptOptions:
778 """Options controlling the execution of scripts.
779
780 https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#scriptoptions
781 """
782 prop = self._get_sub_prop("scriptOptions")
783 if prop is not None:
784 prop = ScriptOptions.from_api_repr(prop)
785 return prop
786
787 @script_options.setter
788 def script_options(self, value: Union[ScriptOptions, None]):
789 new_value = None if value is None else value.to_api_repr()
790 self._set_sub_prop("scriptOptions", new_value)
791
792 def to_api_repr(self) -> dict:
793 """Build an API representation of the query job config.
794
795 Returns:
796 Dict: A dictionary in the format used by the BigQuery API.
797 """
798 resource = copy.deepcopy(self._properties)
799 # Query parameters have an addition property associated with them
800 # to indicate if the query is using named or positional parameters.
801 query_parameters = resource.get("query", {}).get("queryParameters")
802 if query_parameters:
803 if query_parameters[0].get("name") is None:
804 resource["query"]["parameterMode"] = "POSITIONAL"
805 else:
806 resource["query"]["parameterMode"] = "NAMED"
807
808 return resource
809
810
811class QueryJob(_AsyncJob):
812 """Asynchronous job: query tables.
813
814 Args:
815 job_id (str): the job's ID, within the project belonging to ``client``.
816
817 query (str): SQL query string.
818
819 client (google.cloud.bigquery.client.Client):
820 A client which holds credentials and project configuration
821 for the dataset (which requires a project).
822
823 job_config (Optional[google.cloud.bigquery.job.QueryJobConfig]):
824 Extra configuration options for the query job.
825 """
826
827 _JOB_TYPE = "query"
828 _UDF_KEY = "userDefinedFunctionResources"
829 _CONFIG_CLASS = QueryJobConfig
830
831 def __init__(self, job_id, query, client, job_config=None):
832 super(QueryJob, self).__init__(job_id, client)
833
834 if job_config is not None:
835 self._properties["configuration"] = job_config._properties
836 if self.configuration.use_legacy_sql is None:
837 self.configuration.use_legacy_sql = False
838
839 if query:
840 _helpers._set_sub_prop(
841 self._properties, ["configuration", "query", "query"], query
842 )
843 self._query_results = None
844 self._done_timeout = None
845 self._transport_timeout = None
846
847 @property
848 def allow_large_results(self):
849 """See
850 :attr:`google.cloud.bigquery.job.QueryJobConfig.allow_large_results`.
851 """
852 return self.configuration.allow_large_results
853
854 @property
855 def configuration(self) -> QueryJobConfig:
856 """The configuration for this query job."""
857 return typing.cast(QueryJobConfig, super().configuration)
858
859 @property
860 def connection_properties(self) -> List[ConnectionProperty]:
861 """See
862 :attr:`google.cloud.bigquery.job.QueryJobConfig.connection_properties`.
863
864 .. versionadded:: 2.29.0
865 """
866 return self.configuration.connection_properties
867
868 @property
869 def create_disposition(self):
870 """See
871 :attr:`google.cloud.bigquery.job.QueryJobConfig.create_disposition`.
872 """
873 return self.configuration.create_disposition
874
875 @property
876 def create_session(self) -> Optional[bool]:
877 """See
878 :attr:`google.cloud.bigquery.job.QueryJobConfig.create_session`.
879
880 .. versionadded:: 2.29.0
881 """
882 return self.configuration.create_session
883
884 @property
885 def default_dataset(self):
886 """See
887 :attr:`google.cloud.bigquery.job.QueryJobConfig.default_dataset`.
888 """
889 return self.configuration.default_dataset
890
891 @property
892 def destination(self):
893 """See
894 :attr:`google.cloud.bigquery.job.QueryJobConfig.destination`.
895 """
896 return self.configuration.destination
897
898 @property
899 def destination_encryption_configuration(self):
900 """google.cloud.bigquery.encryption_configuration.EncryptionConfiguration: Custom
901 encryption configuration for the destination table.
902
903 Custom encryption configuration (e.g., Cloud KMS keys) or :data:`None`
904 if using default encryption.
905
906 See
907 :attr:`google.cloud.bigquery.job.QueryJobConfig.destination_encryption_configuration`.
908 """
909 return self.configuration.destination_encryption_configuration
910
911 @property
912 def dry_run(self):
913 """See
914 :attr:`google.cloud.bigquery.job.QueryJobConfig.dry_run`.
915 """
916 return self.configuration.dry_run
917
918 @property
919 def flatten_results(self):
920 """See
921 :attr:`google.cloud.bigquery.job.QueryJobConfig.flatten_results`.
922 """
923 return self.configuration.flatten_results
924
925 @property
926 def priority(self):
927 """See
928 :attr:`google.cloud.bigquery.job.QueryJobConfig.priority`.
929 """
930 return self.configuration.priority
931
932 @property
933 def search_stats(self) -> Optional[SearchStats]:
934 """Returns a SearchStats object."""
935
936 stats = self._job_statistics().get("searchStatistics")
937 if stats is not None:
938 return SearchStats.from_api_repr(stats)
939 return None
940
941 @property
942 def query(self):
943 """str: The query text used in this query job.
944
945 See:
946 https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobConfigurationQuery.FIELDS.query
947 """
948 return _helpers._get_sub_prop(
949 self._properties, ["configuration", "query", "query"]
950 )
951
952 @property
953 def query_id(self) -> Optional[str]:
954 """[Preview] ID of a completed query.
955
956 This ID is auto-generated and not guaranteed to be populated.
957 """
958 query_results = self._query_results
959 return query_results.query_id if query_results is not None else None
960
961 @property
962 def query_parameters(self):
963 """See
964 :attr:`google.cloud.bigquery.job.QueryJobConfig.query_parameters`.
965 """
966 return self.configuration.query_parameters
967
968 @property
969 def udf_resources(self):
970 """See
971 :attr:`google.cloud.bigquery.job.QueryJobConfig.udf_resources`.
972 """
973 return self.configuration.udf_resources
974
975 @property
976 def use_legacy_sql(self):
977 """See
978 :attr:`google.cloud.bigquery.job.QueryJobConfig.use_legacy_sql`.
979 """
980 return self.configuration.use_legacy_sql
981
982 @property
983 def use_query_cache(self):
984 """See
985 :attr:`google.cloud.bigquery.job.QueryJobConfig.use_query_cache`.
986 """
987 return self.configuration.use_query_cache
988
989 @property
990 def write_disposition(self):
991 """See
992 :attr:`google.cloud.bigquery.job.QueryJobConfig.write_disposition`.
993 """
994 return self.configuration.write_disposition
995
996 @property
997 def maximum_billing_tier(self):
998 """See
999 :attr:`google.cloud.bigquery.job.QueryJobConfig.maximum_billing_tier`.
1000 """
1001 return self.configuration.maximum_billing_tier
1002
1003 @property
1004 def maximum_bytes_billed(self):
1005 """See
1006 :attr:`google.cloud.bigquery.job.QueryJobConfig.maximum_bytes_billed`.
1007 """
1008 return self.configuration.maximum_bytes_billed
1009
1010 @property
1011 def range_partitioning(self):
1012 """See
1013 :attr:`google.cloud.bigquery.job.QueryJobConfig.range_partitioning`.
1014 """
1015 return self.configuration.range_partitioning
1016
1017 @property
1018 def table_definitions(self):
1019 """See
1020 :attr:`google.cloud.bigquery.job.QueryJobConfig.table_definitions`.
1021 """
1022 return self.configuration.table_definitions
1023
1024 @property
1025 def time_partitioning(self):
1026 """See
1027 :attr:`google.cloud.bigquery.job.QueryJobConfig.time_partitioning`.
1028 """
1029 return self.configuration.time_partitioning
1030
1031 @property
1032 def clustering_fields(self):
1033 """See
1034 :attr:`google.cloud.bigquery.job.QueryJobConfig.clustering_fields`.
1035 """
1036 return self.configuration.clustering_fields
1037
1038 @property
1039 def schema_update_options(self):
1040 """See
1041 :attr:`google.cloud.bigquery.job.QueryJobConfig.schema_update_options`.
1042 """
1043 return self.configuration.schema_update_options
1044
1045 def to_api_repr(self):
1046 """Generate a resource for :meth:`_begin`."""
1047 # Use to_api_repr to allow for some configuration properties to be set
1048 # automatically.
1049 configuration = self.configuration.to_api_repr()
1050 return {
1051 "jobReference": self._properties["jobReference"],
1052 "configuration": configuration,
1053 }
1054
1055 @classmethod
1056 def from_api_repr(cls, resource: dict, client: "Client") -> "QueryJob":
1057 """Factory: construct a job given its API representation
1058
1059 Args:
1060 resource (Dict): dataset job representation returned from the API
1061
1062 client (google.cloud.bigquery.client.Client):
1063 Client which holds credentials and project
1064 configuration for the dataset.
1065
1066 Returns:
1067 google.cloud.bigquery.job.QueryJob: Job parsed from ``resource``.
1068 """
1069 job_ref_properties = resource.setdefault(
1070 "jobReference", {"projectId": client.project, "jobId": None}
1071 )
1072 job_ref = _JobReference._from_api_repr(job_ref_properties)
1073 job = cls(job_ref, None, client=client)
1074 job._set_properties(resource)
1075 return job
1076
1077 @property
1078 def query_plan(self):
1079 """Return query plan from job statistics, if present.
1080
1081 See:
1082 https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobStatistics2.FIELDS.query_plan
1083
1084 Returns:
1085 List[google.cloud.bigquery.job.QueryPlanEntry]:
1086 mappings describing the query plan, or an empty list
1087 if the query has not yet completed.
1088 """
1089 plan_entries = self._job_statistics().get("queryPlan", ())
1090 return [QueryPlanEntry.from_api_repr(entry) for entry in plan_entries]
1091
1092 @property
1093 def schema(self) -> Optional[List[SchemaField]]:
1094 """The schema of the results.
1095
1096 Present only for successful dry run of non-legacy SQL queries.
1097 """
1098 resource = self._job_statistics().get("schema")
1099 if resource is None:
1100 return None
1101 fields = resource.get("fields", [])
1102 return [SchemaField.from_api_repr(field) for field in fields]
1103
1104 @property
1105 def timeline(self):
1106 """List(TimelineEntry): Return the query execution timeline
1107 from job statistics.
1108 """
1109 raw = self._job_statistics().get("timeline", ())
1110 return [TimelineEntry.from_api_repr(entry) for entry in raw]
1111
1112 @property
1113 def total_bytes_processed(self):
1114 """Return total bytes processed from job statistics, if present.
1115
1116 See:
1117 https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobStatistics2.FIELDS.total_bytes_processed
1118
1119 Returns:
1120 Optional[int]:
1121 Total bytes processed by the job, or None if job is not
1122 yet complete.
1123 """
1124 result = self._job_statistics().get("totalBytesProcessed")
1125 if result is not None:
1126 result = int(result)
1127 return result
1128
1129 @property
1130 def total_bytes_billed(self):
1131 """Return total bytes billed from job statistics, if present.
1132
1133 See:
1134 https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobStatistics2.FIELDS.total_bytes_billed
1135
1136 Returns:
1137 Optional[int]:
1138 Total bytes processed by the job, or None if job is not
1139 yet complete.
1140 """
1141 result = self._job_statistics().get("totalBytesBilled")
1142 if result is not None:
1143 result = int(result)
1144 return result
1145
1146 @property
1147 def billing_tier(self):
1148 """Return billing tier from job statistics, if present.
1149
1150 See:
1151 https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobStatistics2.FIELDS.billing_tier
1152
1153 Returns:
1154 Optional[int]:
1155 Billing tier used by the job, or None if job is not
1156 yet complete.
1157 """
1158 return self._job_statistics().get("billingTier")
1159
1160 @property
1161 def cache_hit(self):
1162 """Return whether or not query results were served from cache.
1163
1164 See:
1165 https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobStatistics2.FIELDS.cache_hit
1166
1167 Returns:
1168 Optional[bool]:
1169 whether the query results were returned from cache, or None
1170 if job is not yet complete.
1171 """
1172 return self._job_statistics().get("cacheHit")
1173
1174 @property
1175 def ddl_operation_performed(self):
1176 """Optional[str]: Return the DDL operation performed.
1177
1178 See:
1179 https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobStatistics2.FIELDS.ddl_operation_performed
1180
1181 """
1182 return self._job_statistics().get("ddlOperationPerformed")
1183
1184 @property
1185 def ddl_target_routine(self):
1186 """Optional[google.cloud.bigquery.routine.RoutineReference]: Return the DDL target routine, present
1187 for CREATE/DROP FUNCTION/PROCEDURE queries.
1188
1189 See:
1190 https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobStatistics2.FIELDS.ddl_target_routine
1191 """
1192 prop = self._job_statistics().get("ddlTargetRoutine")
1193 if prop is not None:
1194 prop = RoutineReference.from_api_repr(prop)
1195 return prop
1196
1197 @property
1198 def ddl_target_table(self):
1199 """Optional[google.cloud.bigquery.table.TableReference]: Return the DDL target table, present
1200 for CREATE/DROP TABLE/VIEW queries.
1201
1202 See:
1203 https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobStatistics2.FIELDS.ddl_target_table
1204 """
1205 prop = self._job_statistics().get("ddlTargetTable")
1206 if prop is not None:
1207 prop = TableReference.from_api_repr(prop)
1208 return prop
1209
1210 @property
1211 def num_dml_affected_rows(self) -> Optional[int]:
1212 """Return the number of DML rows affected by the job.
1213
1214 See:
1215 https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobStatistics2.FIELDS.num_dml_affected_rows
1216
1217 Returns:
1218 Optional[int]:
1219 number of DML rows affected by the job, or None if job is not
1220 yet complete.
1221 """
1222 result = self._job_statistics().get("numDmlAffectedRows")
1223 if result is not None:
1224 result = int(result)
1225 return result
1226
1227 @property
1228 def slot_millis(self):
1229 """Union[int, None]: Slot-milliseconds used by this query job."""
1230 return _helpers._int_or_none(self._job_statistics().get("totalSlotMs"))
1231
1232 @property
1233 def statement_type(self):
1234 """Return statement type from job statistics, if present.
1235
1236 See:
1237 https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobStatistics2.FIELDS.statement_type
1238
1239 Returns:
1240 Optional[str]:
1241 type of statement used by the job, or None if job is not
1242 yet complete.
1243 """
1244 return self._job_statistics().get("statementType")
1245
1246 @property
1247 def referenced_tables(self):
1248 """Return referenced tables from job statistics, if present.
1249
1250 See:
1251 https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobStatistics2.FIELDS.referenced_tables
1252
1253 Returns:
1254 List[Dict]:
1255 mappings describing the query plan, or an empty list
1256 if the query has not yet completed.
1257 """
1258 tables = []
1259 datasets_by_project_name = {}
1260
1261 for table in self._job_statistics().get("referencedTables", ()):
1262 t_project = table["projectId"]
1263
1264 ds_id = table["datasetId"]
1265 t_dataset = datasets_by_project_name.get((t_project, ds_id))
1266 if t_dataset is None:
1267 t_dataset = DatasetReference(t_project, ds_id)
1268 datasets_by_project_name[(t_project, ds_id)] = t_dataset
1269
1270 t_name = table["tableId"]
1271 tables.append(t_dataset.table(t_name))
1272
1273 return tables
1274
1275 @property
1276 def undeclared_query_parameters(self):
1277 """Return undeclared query parameters from job statistics, if present.
1278
1279 See:
1280 https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobStatistics2.FIELDS.undeclared_query_parameters
1281
1282 Returns:
1283 List[Union[ \
1284 google.cloud.bigquery.query.ArrayQueryParameter, \
1285 google.cloud.bigquery.query.ScalarQueryParameter, \
1286 google.cloud.bigquery.query.StructQueryParameter \
1287 ]]:
1288 Undeclared parameters, or an empty list if the query has
1289 not yet completed.
1290 """
1291 parameters = []
1292 undeclared = self._job_statistics().get("undeclaredQueryParameters", ())
1293
1294 for parameter in undeclared:
1295 p_type = parameter["parameterType"]
1296
1297 if "arrayType" in p_type:
1298 klass = ArrayQueryParameter
1299 elif "structTypes" in p_type:
1300 klass = StructQueryParameter
1301 else:
1302 klass = ScalarQueryParameter
1303
1304 parameters.append(klass.from_api_repr(parameter))
1305
1306 return parameters
1307
1308 @property
1309 def estimated_bytes_processed(self):
1310 """Return the estimated number of bytes processed by the query.
1311
1312 See:
1313 https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobStatistics2.FIELDS.estimated_bytes_processed
1314
1315 Returns:
1316 Optional[int]:
1317 number of DML rows affected by the job, or None if job is not
1318 yet complete.
1319 """
1320 result = self._job_statistics().get("estimatedBytesProcessed")
1321 if result is not None:
1322 result = int(result)
1323 return result
1324
1325 @property
1326 def dml_stats(self) -> Optional[DmlStats]:
1327 stats = self._job_statistics().get("dmlStats")
1328 if stats is None:
1329 return None
1330 else:
1331 return DmlStats.from_api_repr(stats)
1332
1333 @property
1334 def bi_engine_stats(self) -> Optional[BiEngineStats]:
1335 stats = self._job_statistics().get("biEngineStatistics")
1336
1337 if stats is None:
1338 return None
1339 else:
1340 return BiEngineStats.from_api_repr(stats)
1341
1342 def _blocking_poll(self, timeout=None, **kwargs):
1343 self._done_timeout = timeout
1344 self._transport_timeout = timeout
1345 super(QueryJob, self)._blocking_poll(timeout=timeout, **kwargs)
1346
1347 @staticmethod
1348 def _format_for_exception(message: str, query: str):
1349 """Format a query for the output in exception message.
1350
1351 Args:
1352 message (str): The original exception message.
1353 query (str): The SQL query to format.
1354
1355 Returns:
1356 str: A formatted query text.
1357 """
1358 template = "{message}\n\n{header}\n\n{ruler}\n{body}\n{ruler}"
1359
1360 lines = query.splitlines() if query is not None else [""]
1361 max_line_len = max(len(line) for line in lines)
1362
1363 header = "-----Query Job SQL Follows-----"
1364 header = "{:^{total_width}}".format(header, total_width=max_line_len + 5)
1365
1366 # Print out a "ruler" above and below the SQL so we can judge columns.
1367 # Left pad for the line numbers (4 digits plus ":").
1368 ruler = " |" + " . |" * (max_line_len // 10)
1369
1370 # Put line numbers next to the SQL.
1371 body = "\n".join(
1372 "{:4}:{}".format(n, line) for n, line in enumerate(lines, start=1)
1373 )
1374
1375 return template.format(message=message, header=header, ruler=ruler, body=body)
1376
1377 def _begin(self, client=None, retry=DEFAULT_RETRY, timeout=None):
1378 """API call: begin the job via a POST request
1379
1380 See
1381 https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/insert
1382
1383 Args:
1384 client (Optional[google.cloud.bigquery.client.Client]):
1385 The client to use. If not passed, falls back to the ``client``
1386 associated with the job object or``NoneType``.
1387 retry (Optional[google.api_core.retry.Retry]):
1388 How to retry the RPC.
1389 timeout (Optional[float]):
1390 The number of seconds to wait for the underlying HTTP transport
1391 before using ``retry``.
1392
1393 Raises:
1394 ValueError: If the job has already begun.
1395 """
1396
1397 try:
1398 super(QueryJob, self)._begin(client=client, retry=retry, timeout=timeout)
1399 except exceptions.GoogleAPICallError as exc:
1400 exc.message = _EXCEPTION_FOOTER_TEMPLATE.format(
1401 message=exc.message, location=self.location, job_id=self.job_id
1402 )
1403 exc.debug_message = self._format_for_exception(exc.message, self.query)
1404 exc.query_job = self
1405 raise
1406
1407 def _reload_query_results(
1408 self,
1409 retry: "retries.Retry" = DEFAULT_RETRY,
1410 timeout: Optional[float] = None,
1411 page_size: int = 0,
1412 start_index: Optional[int] = None,
1413 ):
1414 """Refresh the cached query results unless already cached and complete.
1415
1416 Args:
1417 retry (Optional[google.api_core.retry.Retry]):
1418 How to retry the call that retrieves query results.
1419 timeout (Optional[float]):
1420 The number of seconds to wait for the underlying HTTP transport
1421 before using ``retry``.
1422 page_size (int):
1423 Maximum number of rows in a single response. See maxResults in
1424 the jobs.getQueryResults REST API.
1425 start_index (Optional[int]):
1426 Zero-based index of the starting row. See startIndex in the
1427 jobs.getQueryResults REST API.
1428 """
1429 # Optimization: avoid a call to jobs.getQueryResults if it's already
1430 # been fetched, e.g. from jobs.query first page of results.
1431 if self._query_results and self._query_results.complete:
1432 return
1433
1434 # Since the API to getQueryResults can hang up to the timeout value
1435 # (default of 10 seconds), set the timeout parameter to ensure that
1436 # the timeout from the futures API is respected. See:
1437 # https://github.com/GoogleCloudPlatform/google-cloud-python/issues/4135
1438 timeout_ms = None
1439
1440 # Python_API_core, as part of a major rewrite of the deadline, timeout,
1441 # retry process sets the timeout value as a Python object().
1442 # Our system does not natively handle that and instead expects
1443 # either None or a numeric value. If passed a Python object, convert to
1444 # None.
1445 if type(self._done_timeout) is object: # pragma: NO COVER
1446 self._done_timeout = None
1447
1448 if self._done_timeout is not None: # pragma: NO COVER
1449 # Subtract a buffer for context switching, network latency, etc.
1450 api_timeout = self._done_timeout - _TIMEOUT_BUFFER_SECS
1451 api_timeout = max(min(api_timeout, 10), 0)
1452 self._done_timeout -= api_timeout
1453 self._done_timeout = max(0, self._done_timeout)
1454 timeout_ms = int(api_timeout * 1000)
1455
1456 # If an explicit timeout is not given, fall back to the transport timeout
1457 # stored in _blocking_poll() in the process of polling for job completion.
1458 if timeout is not None:
1459 transport_timeout = timeout
1460 else:
1461 transport_timeout = self._transport_timeout
1462
1463 # Handle PollingJob._DEFAULT_VALUE.
1464 if not isinstance(transport_timeout, (float, int)):
1465 transport_timeout = None
1466
1467 self._query_results = self._client._get_query_results(
1468 self.job_id,
1469 retry,
1470 project=self.project,
1471 timeout_ms=timeout_ms,
1472 location=self.location,
1473 timeout=transport_timeout,
1474 page_size=page_size,
1475 start_index=start_index,
1476 )
1477
1478 def result( # type: ignore # (incompatible with supertype)
1479 self,
1480 page_size: Optional[int] = None,
1481 max_results: Optional[int] = None,
1482 retry: Optional[retries.Retry] = DEFAULT_RETRY,
1483 timeout: Optional[Union[float, object]] = POLLING_DEFAULT_VALUE,
1484 start_index: Optional[int] = None,
1485 job_retry: Optional[retries.Retry] = DEFAULT_JOB_RETRY,
1486 ) -> Union["RowIterator", _EmptyRowIterator]:
1487 """Start the job and wait for it to complete and get the result.
1488
1489 Args:
1490 page_size (Optional[int]):
1491 The maximum number of rows in each page of results from this
1492 request. Non-positive values are ignored.
1493 max_results (Optional[int]):
1494 The maximum total number of rows from this request.
1495 retry (Optional[google.api_core.retry.Retry]):
1496 How to retry the call that retrieves rows. This only
1497 applies to making RPC calls. It isn't used to retry
1498 failed jobs. This has a reasonable default that
1499 should only be overridden with care. If the job state
1500 is ``DONE``, retrying is aborted early even if the
1501 results are not available, as this will not change
1502 anymore.
1503 timeout (Optional[Union[float, \
1504 google.api_core.future.polling.PollingFuture._DEFAULT_VALUE, \
1505 ]]):
1506 The number of seconds to wait for the underlying HTTP transport
1507 before using ``retry``. If ``None``, wait indefinitely
1508 unless an error is returned. If unset, only the
1509 underlying API calls have their default timeouts, but we still
1510 wait indefinitely for the job to finish.
1511 start_index (Optional[int]):
1512 The zero-based index of the starting row to read.
1513 job_retry (Optional[google.api_core.retry.Retry]):
1514 How to retry failed jobs. The default retries
1515 rate-limit-exceeded errors. Passing ``None`` disables
1516 job retry.
1517
1518 Not all jobs can be retried. If ``job_id`` was
1519 provided to the query that created this job, then the
1520 job returned by the query will not be retryable, and
1521 an exception will be raised if non-``None``
1522 non-default ``job_retry`` is also provided.
1523
1524 Returns:
1525 google.cloud.bigquery.table.RowIterator:
1526 Iterator of row data
1527 :class:`~google.cloud.bigquery.table.Row`-s. During each
1528 page, the iterator will have the ``total_rows`` attribute
1529 set, which counts the total number of rows **in the result
1530 set** (this is distinct from the total number of rows in the
1531 current page: ``iterator.page.num_items``).
1532
1533 If the query is a special query that produces no results, e.g.
1534 a DDL query, an ``_EmptyRowIterator`` instance is returned.
1535
1536 Raises:
1537 google.api_core.exceptions.GoogleAPICallError:
1538 If the job failed and retries aren't successful.
1539 concurrent.futures.TimeoutError:
1540 If the job did not complete in the given timeout.
1541 TypeError:
1542 If Non-``None`` and non-default ``job_retry`` is
1543 provided and the job is not retryable.
1544 """
1545 # Note: Since waiting for a query job to finish is more complex than
1546 # refreshing the job state in a loop, we avoid calling the superclass
1547 # in this method.
1548
1549 if self.dry_run:
1550 return _EmptyRowIterator(
1551 project=self.project,
1552 location=self.location,
1553 schema=self.schema,
1554 total_bytes_processed=self.total_bytes_processed,
1555 # Intentionally omit job_id and query_id since this doesn't
1556 # actually correspond to a finished query job.
1557 )
1558
1559 # Setting max_results should be equivalent to setting page_size with
1560 # regards to allowing the user to tune how many results to download
1561 # while we wait for the query to finish. See internal issue:
1562 # 344008814. But if start_index is set, user is trying to access a
1563 # specific page, so we don't need to set page_size. See issue #1950.
1564 if page_size is None and max_results is not None and start_index is None:
1565 page_size = max_results
1566
1567 # When timeout has default sentinel value ``object()``, do not pass
1568 # anything to invoke default timeouts in subsequent calls.
1569 done_kwargs: Dict[str, Union[_helpers.TimeoutType, object]] = {}
1570 reload_query_results_kwargs: Dict[str, Union[_helpers.TimeoutType, object]] = {}
1571 list_rows_kwargs: Dict[str, Union[_helpers.TimeoutType, object]] = {}
1572 if type(timeout) is not object:
1573 done_kwargs["timeout"] = timeout
1574 list_rows_kwargs["timeout"] = timeout
1575 reload_query_results_kwargs["timeout"] = timeout
1576
1577 if page_size is not None:
1578 reload_query_results_kwargs["page_size"] = page_size
1579
1580 if start_index is not None:
1581 reload_query_results_kwargs["start_index"] = start_index
1582
1583 try:
1584 retry_do_query = getattr(self, "_retry_do_query", None)
1585 if retry_do_query is not None:
1586 if job_retry is DEFAULT_JOB_RETRY:
1587 job_retry = self._job_retry # type: ignore
1588 else:
1589 if job_retry is not None and job_retry is not DEFAULT_JOB_RETRY:
1590 raise TypeError(
1591 "`job_retry` was provided, but this job is"
1592 " not retryable, because a custom `job_id` was"
1593 " provided to the query that created this job."
1594 )
1595
1596 restart_query_job = False
1597
1598 def is_job_done():
1599 nonlocal restart_query_job
1600
1601 if restart_query_job:
1602 restart_query_job = False
1603
1604 # The original job has failed. Create a new one.
1605 #
1606 # Note that we won't get here if retry_do_query is
1607 # None, because we won't use a retry.
1608 job = retry_do_query()
1609
1610 # Become the new job:
1611 self.__dict__.clear()
1612 self.__dict__.update(job.__dict__)
1613
1614 # It's possible the job fails again and we'll have to
1615 # retry that too.
1616 self._retry_do_query = retry_do_query
1617 self._job_retry = job_retry
1618
1619 # If the job hasn't been created, create it now. Related:
1620 # https://github.com/googleapis/python-bigquery/issues/1940
1621 if self.state is None:
1622 self._begin(retry=retry, **done_kwargs)
1623
1624 # Refresh the job status with jobs.get because some of the
1625 # exceptions thrown by jobs.getQueryResults like timeout and
1626 # rateLimitExceeded errors are ambiguous. We want to know if
1627 # the query job failed and not just the call to
1628 # jobs.getQueryResults.
1629 if self.done(retry=retry, **done_kwargs):
1630 # If it's already failed, we might as well stop.
1631 job_failed_exception = self.exception()
1632 if job_failed_exception is not None:
1633 # Only try to restart the query job if the job failed for
1634 # a retriable reason. For example, don't restart the query
1635 # if the call to reload the job metadata within self.done()
1636 # timed out.
1637 #
1638 # The `restart_query_job` must only be called after a
1639 # successful call to the `jobs.get` REST API and we
1640 # determine that the job has failed.
1641 #
1642 # The `jobs.get` REST API
1643 # (https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/get)
1644 # is called via `self.done()` which calls
1645 # `self.reload()`.
1646 #
1647 # To determine if the job failed, the `self.exception()`
1648 # is set from `self.reload()` via
1649 # `self._set_properties()`, which translates the
1650 # `Job.status.errorResult` field
1651 # (https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobStatus.FIELDS.error_result)
1652 # into an exception that can be processed by the
1653 # `job_retry` predicate.
1654 restart_query_job = True
1655 raise job_failed_exception
1656 else:
1657 # Make sure that the _query_results are cached so we
1658 # can return a complete RowIterator.
1659 #
1660 # Note: As an optimization, _reload_query_results
1661 # doesn't make any API calls if the query results are
1662 # already cached and have jobComplete=True in the
1663 # response from the REST API. This ensures we aren't
1664 # making any extra API calls if the previous loop
1665 # iteration fetched the finished job.
1666 self._reload_query_results(
1667 retry=retry, **reload_query_results_kwargs
1668 )
1669 return True
1670
1671 # Call jobs.getQueryResults with max results set to 0 just to
1672 # wait for the query to finish. Unlike most methods,
1673 # jobs.getQueryResults hangs as long as it can to ensure we
1674 # know when the query has finished as soon as possible.
1675 self._reload_query_results(retry=retry, **reload_query_results_kwargs)
1676
1677 # Even if the query is finished now according to
1678 # jobs.getQueryResults, we'll want to reload the job status if
1679 # it's not already DONE.
1680 return False
1681
1682 if retry_do_query is not None and job_retry is not None:
1683 is_job_done = job_retry(is_job_done)
1684
1685 # timeout can be a number of seconds, `None`, or a
1686 # `google.api_core.future.polling.PollingFuture._DEFAULT_VALUE`
1687 # sentinel object indicating a default timeout if we choose to add
1688 # one some day. This value can come from our PollingFuture
1689 # superclass and was introduced in
1690 # https://github.com/googleapis/python-api-core/pull/462.
1691 if isinstance(timeout, (float, int)):
1692 remaining_timeout = timeout
1693 else:
1694 # Note: we may need to handle _DEFAULT_VALUE as a separate
1695 # case someday, but even then the best we can do for queries
1696 # is 72+ hours for hyperparameter tuning jobs:
1697 # https://cloud.google.com/bigquery/quotas#query_jobs
1698 #
1699 # The timeout for a multi-statement query is 24+ hours. See:
1700 # https://cloud.google.com/bigquery/quotas#multi_statement_query_limits
1701 remaining_timeout = None
1702
1703 if remaining_timeout is None:
1704 # Since is_job_done() calls jobs.getQueryResults, which is a
1705 # long-running API, don't delay the next request at all.
1706 while not is_job_done():
1707 pass
1708 else:
1709 # Use a monotonic clock since we don't actually care about
1710 # daylight savings or similar, just the elapsed time.
1711 previous_time = time.monotonic()
1712
1713 while not is_job_done():
1714 current_time = time.monotonic()
1715 elapsed_time = current_time - previous_time
1716 remaining_timeout = remaining_timeout - elapsed_time
1717 previous_time = current_time
1718
1719 if remaining_timeout < 0:
1720 raise concurrent.futures.TimeoutError()
1721
1722 except exceptions.GoogleAPICallError as exc:
1723 exc.message = _EXCEPTION_FOOTER_TEMPLATE.format(
1724 message=exc.message, location=self.location, job_id=self.job_id
1725 )
1726 exc.debug_message = self._format_for_exception(exc.message, self.query) # type: ignore
1727 exc.query_job = self # type: ignore
1728 raise
1729 except requests.exceptions.Timeout as exc:
1730 raise concurrent.futures.TimeoutError from exc
1731
1732 # If the query job is complete but there are no query results, this was
1733 # special job, such as a DDL query. Return an empty result set to
1734 # indicate success and avoid calling tabledata.list on a table which
1735 # can't be read (such as a view table).
1736 if self._query_results.total_rows is None:
1737 return _EmptyRowIterator(
1738 location=self.location,
1739 project=self.project,
1740 job_id=self.job_id,
1741 query_id=self.query_id,
1742 schema=self.schema,
1743 num_dml_affected_rows=self._query_results.num_dml_affected_rows,
1744 query=self.query,
1745 total_bytes_processed=self.total_bytes_processed,
1746 slot_millis=self.slot_millis,
1747 )
1748
1749 # We know that there's at least 1 row, so only treat the response from
1750 # jobs.getQueryResults / jobs.query as the first page of the
1751 # RowIterator response if there are any rows in it. This prevents us
1752 # from stopping the iteration early in the cases where we set
1753 # maxResults=0. In that case, we're missing rows and there's no next
1754 # page token.
1755 first_page_response = self._query_results._properties
1756 if "rows" not in first_page_response:
1757 first_page_response = None
1758
1759 rows = self._client._list_rows_from_query_results(
1760 self.job_id,
1761 self.location,
1762 self.project,
1763 self._query_results.schema,
1764 total_rows=self._query_results.total_rows,
1765 destination=self.destination,
1766 page_size=page_size,
1767 max_results=max_results,
1768 start_index=start_index,
1769 retry=retry,
1770 query_id=self.query_id,
1771 first_page_response=first_page_response,
1772 num_dml_affected_rows=self._query_results.num_dml_affected_rows,
1773 query=self.query,
1774 total_bytes_processed=self.total_bytes_processed,
1775 slot_millis=self.slot_millis,
1776 created=self.created,
1777 started=self.started,
1778 ended=self.ended,
1779 **list_rows_kwargs,
1780 )
1781 rows._preserve_order = _contains_order_by(self.query)
1782 return rows
1783
1784 # If changing the signature of this method, make sure to apply the same
1785 # changes to table.RowIterator.to_arrow(), except for the max_results parameter
1786 # that should only exist here in the QueryJob method.
1787 def to_arrow(
1788 self,
1789 progress_bar_type: Optional[str] = None,
1790 bqstorage_client: Optional["bigquery_storage.BigQueryReadClient"] = None,
1791 create_bqstorage_client: bool = True,
1792 max_results: Optional[int] = None,
1793 ) -> "pyarrow.Table":
1794 """[Beta] Create a class:`pyarrow.Table` by loading all pages of a
1795 table or query.
1796
1797 Args:
1798 progress_bar_type (Optional[str]):
1799 If set, use the `tqdm <https://tqdm.github.io/>`_ library to
1800 display a progress bar while the data downloads. Install the
1801 ``tqdm`` package to use this feature.
1802
1803 Possible values of ``progress_bar_type`` include:
1804
1805 ``None``
1806 No progress bar.
1807 ``'tqdm'``
1808 Use the :func:`tqdm.tqdm` function to print a progress bar
1809 to :data:`sys.stdout`.
1810 ``'tqdm_notebook'``
1811 Use the :func:`tqdm.notebook.tqdm` function to display a
1812 progress bar as a Jupyter notebook widget.
1813 ``'tqdm_gui'``
1814 Use the :func:`tqdm.tqdm_gui` function to display a
1815 progress bar as a graphical dialog box.
1816 bqstorage_client (Optional[google.cloud.bigquery_storage_v1.BigQueryReadClient]):
1817 A BigQuery Storage API client. If supplied, use the faster
1818 BigQuery Storage API to fetch rows from BigQuery. This API
1819 is a billable API.
1820
1821 This method requires ``google-cloud-bigquery-storage`` library.
1822
1823 Reading from a specific partition or snapshot is not
1824 currently supported by this method.
1825 create_bqstorage_client (Optional[bool]):
1826 If ``True`` (default), create a BigQuery Storage API client
1827 using the default API settings. The BigQuery Storage API
1828 is a faster way to fetch rows from BigQuery. See the
1829 ``bqstorage_client`` parameter for more information.
1830
1831 This argument does nothing if ``bqstorage_client`` is supplied.
1832
1833 .. versionadded:: 1.24.0
1834
1835 max_results (Optional[int]):
1836 Maximum number of rows to include in the result. No limit by default.
1837
1838 .. versionadded:: 2.21.0
1839
1840 Returns:
1841 pyarrow.Table
1842 A :class:`pyarrow.Table` populated with row data and column
1843 headers from the query results. The column headers are derived
1844 from the destination table's schema.
1845
1846 Raises:
1847 ValueError:
1848 If the :mod:`pyarrow` library cannot be imported.
1849
1850 .. versionadded:: 1.17.0
1851 """
1852 query_result = wait_for_query(self, progress_bar_type, max_results=max_results)
1853 return query_result.to_arrow(
1854 progress_bar_type=progress_bar_type,
1855 bqstorage_client=bqstorage_client,
1856 create_bqstorage_client=create_bqstorage_client,
1857 )
1858
1859 # If changing the signature of this method, make sure to apply the same
1860 # changes to table.RowIterator.to_dataframe(), except for the max_results parameter
1861 # that should only exist here in the QueryJob method.
1862 def to_dataframe(
1863 self,
1864 bqstorage_client: Optional["bigquery_storage.BigQueryReadClient"] = None,
1865 dtypes: Optional[Dict[str, Any]] = None,
1866 progress_bar_type: Optional[str] = None,
1867 create_bqstorage_client: bool = True,
1868 max_results: Optional[int] = None,
1869 geography_as_object: bool = False,
1870 bool_dtype: Union[Any, None] = DefaultPandasDTypes.BOOL_DTYPE,
1871 int_dtype: Union[Any, None] = DefaultPandasDTypes.INT_DTYPE,
1872 float_dtype: Union[Any, None] = None,
1873 string_dtype: Union[Any, None] = None,
1874 date_dtype: Union[Any, None] = DefaultPandasDTypes.DATE_DTYPE,
1875 datetime_dtype: Union[Any, None] = None,
1876 time_dtype: Union[Any, None] = DefaultPandasDTypes.TIME_DTYPE,
1877 timestamp_dtype: Union[Any, None] = None,
1878 range_date_dtype: Union[Any, None] = DefaultPandasDTypes.RANGE_DATE_DTYPE,
1879 range_datetime_dtype: Union[
1880 Any, None
1881 ] = DefaultPandasDTypes.RANGE_DATETIME_DTYPE,
1882 range_timestamp_dtype: Union[
1883 Any, None
1884 ] = DefaultPandasDTypes.RANGE_TIMESTAMP_DTYPE,
1885 ) -> "pandas.DataFrame":
1886 """Return a pandas DataFrame from a QueryJob
1887
1888 Args:
1889 bqstorage_client (Optional[google.cloud.bigquery_storage_v1.BigQueryReadClient]):
1890 A BigQuery Storage API client. If supplied, use the faster
1891 BigQuery Storage API to fetch rows from BigQuery. This
1892 API is a billable API.
1893
1894 This method requires the ``fastavro`` and
1895 ``google-cloud-bigquery-storage`` libraries.
1896
1897 Reading from a specific partition or snapshot is not
1898 currently supported by this method.
1899
1900 dtypes (Optional[Map[str, Union[str, pandas.Series.dtype]]]):
1901 A dictionary of column names pandas ``dtype``s. The provided
1902 ``dtype`` is used when constructing the series for the column
1903 specified. Otherwise, the default pandas behavior is used.
1904
1905 progress_bar_type (Optional[str]):
1906 If set, use the `tqdm <https://tqdm.github.io/>`_ library to
1907 display a progress bar while the data downloads. Install the
1908 ``tqdm`` package to use this feature.
1909
1910 See
1911 :func:`~google.cloud.bigquery.table.RowIterator.to_dataframe`
1912 for details.
1913
1914 .. versionadded:: 1.11.0
1915 create_bqstorage_client (Optional[bool]):
1916 If ``True`` (default), create a BigQuery Storage API client
1917 using the default API settings. The BigQuery Storage API
1918 is a faster way to fetch rows from BigQuery. See the
1919 ``bqstorage_client`` parameter for more information.
1920
1921 This argument does nothing if ``bqstorage_client`` is supplied.
1922
1923 .. versionadded:: 1.24.0
1924
1925 max_results (Optional[int]):
1926 Maximum number of rows to include in the result. No limit by default.
1927
1928 .. versionadded:: 2.21.0
1929
1930 geography_as_object (Optional[bool]):
1931 If ``True``, convert GEOGRAPHY data to :mod:`shapely`
1932 geometry objects. If ``False`` (default), don't cast
1933 geography data to :mod:`shapely` geometry objects.
1934
1935 .. versionadded:: 2.24.0
1936
1937 bool_dtype (Optional[pandas.Series.dtype, None]):
1938 If set, indicate a pandas ExtensionDtype (e.g. ``pandas.BooleanDtype()``)
1939 to convert BigQuery Boolean type, instead of relying on the default
1940 ``pandas.BooleanDtype()``. If you explicitly set the value to ``None``,
1941 then the data type will be ``numpy.dtype("bool")``. BigQuery Boolean
1942 type can be found at:
1943 https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#boolean_type
1944
1945 .. versionadded:: 3.8.0
1946
1947 int_dtype (Optional[pandas.Series.dtype, None]):
1948 If set, indicate a pandas ExtensionDtype (e.g. ``pandas.Int64Dtype()``)
1949 to convert BigQuery Integer types, instead of relying on the default
1950 ``pandas.Int64Dtype()``. If you explicitly set the value to ``None``,
1951 then the data type will be ``numpy.dtype("int64")``. A list of BigQuery
1952 Integer types can be found at:
1953 https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#integer_types
1954
1955 .. versionadded:: 3.8.0
1956
1957 float_dtype (Optional[pandas.Series.dtype, None]):
1958 If set, indicate a pandas ExtensionDtype (e.g. ``pandas.Float32Dtype()``)
1959 to convert BigQuery Float type, instead of relying on the default
1960 ``numpy.dtype("float64")``. If you explicitly set the value to ``None``,
1961 then the data type will be ``numpy.dtype("float64")``. BigQuery Float
1962 type can be found at:
1963 https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#floating_point_types
1964
1965 .. versionadded:: 3.8.0
1966
1967 string_dtype (Optional[pandas.Series.dtype, None]):
1968 If set, indicate a pandas ExtensionDtype (e.g. ``pandas.StringDtype()``) to
1969 convert BigQuery String type, instead of relying on the default
1970 ``numpy.dtype("object")``. If you explicitly set the value to ``None``,
1971 then the data type will be ``numpy.dtype("object")``. BigQuery String
1972 type can be found at:
1973 https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#string_type
1974
1975 .. versionadded:: 3.8.0
1976
1977 date_dtype (Optional[pandas.Series.dtype, None]):
1978 If set, indicate a pandas ExtensionDtype (e.g.
1979 ``pandas.ArrowDtype(pyarrow.date32())``) to convert BigQuery Date
1980 type, instead of relying on the default ``db_dtypes.DateDtype()``.
1981 If you explicitly set the value to ``None``, then the data type will be
1982 ``numpy.dtype("datetime64[ns]")`` or ``object`` if out of bound. BigQuery
1983 Date type can be found at:
1984 https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#date_type
1985
1986 .. versionadded:: 3.10.0
1987
1988 datetime_dtype (Optional[pandas.Series.dtype, None]):
1989 If set, indicate a pandas ExtensionDtype (e.g.
1990 ``pandas.ArrowDtype(pyarrow.timestamp("us"))``) to convert BigQuery Datetime
1991 type, instead of relying on the default ``numpy.dtype("datetime64[ns]``.
1992 If you explicitly set the value to ``None``, then the data type will be
1993 ``numpy.dtype("datetime64[ns]")`` or ``object`` if out of bound. BigQuery
1994 Datetime type can be found at:
1995 https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#datetime_type
1996
1997 .. versionadded:: 3.10.0
1998
1999 time_dtype (Optional[pandas.Series.dtype, None]):
2000 If set, indicate a pandas ExtensionDtype (e.g.
2001 ``pandas.ArrowDtype(pyarrow.time64("us"))``) to convert BigQuery Time
2002 type, instead of relying on the default ``db_dtypes.TimeDtype()``.
2003 If you explicitly set the value to ``None``, then the data type will be
2004 ``numpy.dtype("object")``. BigQuery Time type can be found at:
2005 https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#time_type
2006
2007 .. versionadded:: 3.10.0
2008
2009 timestamp_dtype (Optional[pandas.Series.dtype, None]):
2010 If set, indicate a pandas ExtensionDtype (e.g.
2011 ``pandas.ArrowDtype(pyarrow.timestamp("us", tz="UTC"))``) to convert BigQuery Timestamp
2012 type, instead of relying on the default ``numpy.dtype("datetime64[ns, UTC]")``.
2013 If you explicitly set the value to ``None``, then the data type will be
2014 ``numpy.dtype("datetime64[ns, UTC]")`` or ``object`` if out of bound. BigQuery
2015 Datetime type can be found at:
2016 https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#timestamp_type
2017
2018 .. versionadded:: 3.10.0
2019
2020 range_date_dtype (Optional[pandas.Series.dtype, None]):
2021 If set, indicate a pandas ExtensionDtype, such as:
2022
2023 .. code-block:: python
2024
2025 pandas.ArrowDtype(pyarrow.struct(
2026 [("start", pyarrow.date32()), ("end", pyarrow.date32())]
2027 ))
2028
2029 to convert BigQuery RANGE<DATE> type, instead of relying on
2030 the default ``object``. If you explicitly set the value to
2031 ``None``, the data type will be ``object``. BigQuery Range type
2032 can be found at:
2033 https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#range_type
2034
2035 .. versionadded:: 3.21.0
2036
2037 range_datetime_dtype (Optional[pandas.Series.dtype, None]):
2038 If set, indicate a pandas ExtensionDtype, such as:
2039
2040 .. code-block:: python
2041
2042 pandas.ArrowDtype(pyarrow.struct(
2043 [
2044 ("start", pyarrow.timestamp("us")),
2045 ("end", pyarrow.timestamp("us")),
2046 ]
2047 ))
2048
2049 to convert BigQuery RANGE<DATETIME> type, instead of relying on
2050 the default ``object``. If you explicitly set the value to
2051 ``None``, the data type will be ``object``. BigQuery Range type
2052 can be found at:
2053 https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#range_type
2054
2055 .. versionadded:: 3.21.0
2056
2057 range_timestamp_dtype (Optional[pandas.Series.dtype, None]):
2058 If set, indicate a pandas ExtensionDtype, such as:
2059
2060 .. code-block:: python
2061
2062 pandas.ArrowDtype(pyarrow.struct(
2063 [
2064 ("start", pyarrow.timestamp("us", tz="UTC")),
2065 ("end", pyarrow.timestamp("us", tz="UTC")),
2066 ]
2067 ))
2068
2069 to convert BigQuery RANGE<TIMESTAMP> type, instead of relying
2070 on the default ``object``. If you explicitly set the value to
2071 ``None``, the data type will be ``object``. BigQuery Range type
2072 can be found at:
2073 https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#range_type
2074
2075 .. versionadded:: 3.21.0
2076
2077 Returns:
2078 pandas.DataFrame:
2079 A :class:`~pandas.DataFrame` populated with row data
2080 and column headers from the query results. The column
2081 headers are derived from the destination table's
2082 schema.
2083
2084 Raises:
2085 ValueError:
2086 If the :mod:`pandas` library cannot be imported, or
2087 the :mod:`google.cloud.bigquery_storage_v1` module is
2088 required but cannot be imported. Also if
2089 `geography_as_object` is `True`, but the
2090 :mod:`shapely` library cannot be imported.
2091 """
2092 query_result = wait_for_query(self, progress_bar_type, max_results=max_results)
2093 return query_result.to_dataframe(
2094 bqstorage_client=bqstorage_client,
2095 dtypes=dtypes,
2096 progress_bar_type=progress_bar_type,
2097 create_bqstorage_client=create_bqstorage_client,
2098 geography_as_object=geography_as_object,
2099 bool_dtype=bool_dtype,
2100 int_dtype=int_dtype,
2101 float_dtype=float_dtype,
2102 string_dtype=string_dtype,
2103 date_dtype=date_dtype,
2104 datetime_dtype=datetime_dtype,
2105 time_dtype=time_dtype,
2106 timestamp_dtype=timestamp_dtype,
2107 range_date_dtype=range_date_dtype,
2108 range_datetime_dtype=range_datetime_dtype,
2109 range_timestamp_dtype=range_timestamp_dtype,
2110 )
2111
2112 # If changing the signature of this method, make sure to apply the same
2113 # changes to table.RowIterator.to_dataframe(), except for the max_results parameter
2114 # that should only exist here in the QueryJob method.
2115 def to_geodataframe(
2116 self,
2117 bqstorage_client: Optional["bigquery_storage.BigQueryReadClient"] = None,
2118 dtypes: Optional[Dict[str, Any]] = None,
2119 progress_bar_type: Optional[str] = None,
2120 create_bqstorage_client: bool = True,
2121 max_results: Optional[int] = None,
2122 geography_column: Optional[str] = None,
2123 bool_dtype: Union[Any, None] = DefaultPandasDTypes.BOOL_DTYPE,
2124 int_dtype: Union[Any, None] = DefaultPandasDTypes.INT_DTYPE,
2125 float_dtype: Union[Any, None] = None,
2126 string_dtype: Union[Any, None] = None,
2127 ) -> "geopandas.GeoDataFrame":
2128 """Return a GeoPandas GeoDataFrame from a QueryJob
2129
2130 Args:
2131 bqstorage_client (Optional[google.cloud.bigquery_storage_v1.BigQueryReadClient]):
2132 A BigQuery Storage API client. If supplied, use the faster
2133 BigQuery Storage API to fetch rows from BigQuery. This
2134 API is a billable API.
2135
2136 This method requires the ``fastavro`` and
2137 ``google-cloud-bigquery-storage`` libraries.
2138
2139 Reading from a specific partition or snapshot is not
2140 currently supported by this method.
2141
2142 dtypes (Optional[Map[str, Union[str, pandas.Series.dtype]]]):
2143 A dictionary of column names pandas ``dtype``s. The provided
2144 ``dtype`` is used when constructing the series for the column
2145 specified. Otherwise, the default pandas behavior is used.
2146
2147 progress_bar_type (Optional[str]):
2148 If set, use the `tqdm <https://tqdm.github.io/>`_ library to
2149 display a progress bar while the data downloads. Install the
2150 ``tqdm`` package to use this feature.
2151
2152 See
2153 :func:`~google.cloud.bigquery.table.RowIterator.to_dataframe`
2154 for details.
2155
2156 .. versionadded:: 1.11.0
2157 create_bqstorage_client (Optional[bool]):
2158 If ``True`` (default), create a BigQuery Storage API client
2159 using the default API settings. The BigQuery Storage API
2160 is a faster way to fetch rows from BigQuery. See the
2161 ``bqstorage_client`` parameter for more information.
2162
2163 This argument does nothing if ``bqstorage_client`` is supplied.
2164
2165 .. versionadded:: 1.24.0
2166
2167 max_results (Optional[int]):
2168 Maximum number of rows to include in the result. No limit by default.
2169
2170 .. versionadded:: 2.21.0
2171
2172 geography_column (Optional[str]):
2173 If there are more than one GEOGRAPHY column,
2174 identifies which one to use to construct a GeoPandas
2175 GeoDataFrame. This option can be ommitted if there's
2176 only one GEOGRAPHY column.
2177 bool_dtype (Optional[pandas.Series.dtype, None]):
2178 If set, indicate a pandas ExtensionDtype (e.g. ``pandas.BooleanDtype()``)
2179 to convert BigQuery Boolean type, instead of relying on the default
2180 ``pandas.BooleanDtype()``. If you explicitly set the value to ``None``,
2181 then the data type will be ``numpy.dtype("bool")``. BigQuery Boolean
2182 type can be found at:
2183 https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#boolean_type
2184 int_dtype (Optional[pandas.Series.dtype, None]):
2185 If set, indicate a pandas ExtensionDtype (e.g. ``pandas.Int64Dtype()``)
2186 to convert BigQuery Integer types, instead of relying on the default
2187 ``pandas.Int64Dtype()``. If you explicitly set the value to ``None``,
2188 then the data type will be ``numpy.dtype("int64")``. A list of BigQuery
2189 Integer types can be found at:
2190 https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#integer_types
2191 float_dtype (Optional[pandas.Series.dtype, None]):
2192 If set, indicate a pandas ExtensionDtype (e.g. ``pandas.Float32Dtype()``)
2193 to convert BigQuery Float type, instead of relying on the default
2194 ``numpy.dtype("float64")``. If you explicitly set the value to ``None``,
2195 then the data type will be ``numpy.dtype("float64")``. BigQuery Float
2196 type can be found at:
2197 https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#floating_point_types
2198 string_dtype (Optional[pandas.Series.dtype, None]):
2199 If set, indicate a pandas ExtensionDtype (e.g. ``pandas.StringDtype()``) to
2200 convert BigQuery String type, instead of relying on the default
2201 ``numpy.dtype("object")``. If you explicitly set the value to ``None``,
2202 then the data type will be ``numpy.dtype("object")``. BigQuery String
2203 type can be found at:
2204 https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#string_type
2205
2206 Returns:
2207 geopandas.GeoDataFrame:
2208 A :class:`geopandas.GeoDataFrame` populated with row
2209 data and column headers from the query results. The
2210 column headers are derived from the destination
2211 table's schema.
2212
2213 Raises:
2214 ValueError:
2215 If the :mod:`geopandas` library cannot be imported, or the
2216 :mod:`google.cloud.bigquery_storage_v1` module is
2217 required but cannot be imported.
2218
2219 .. versionadded:: 2.24.0
2220 """
2221 query_result = wait_for_query(self, progress_bar_type, max_results=max_results)
2222 return query_result.to_geodataframe(
2223 bqstorage_client=bqstorage_client,
2224 dtypes=dtypes,
2225 progress_bar_type=progress_bar_type,
2226 create_bqstorage_client=create_bqstorage_client,
2227 geography_column=geography_column,
2228 bool_dtype=bool_dtype,
2229 int_dtype=int_dtype,
2230 float_dtype=float_dtype,
2231 string_dtype=string_dtype,
2232 )
2233
2234 def __iter__(self):
2235 return iter(self.result())
2236
2237
2238class QueryPlanEntryStep(object):
2239 """Map a single step in a query plan entry.
2240
2241 Args:
2242 kind (str): step type.
2243 substeps (List): names of substeps.
2244 """
2245
2246 def __init__(self, kind, substeps):
2247 self.kind = kind
2248 self.substeps = list(substeps)
2249
2250 @classmethod
2251 def from_api_repr(cls, resource: dict) -> "QueryPlanEntryStep":
2252 """Factory: construct instance from the JSON repr.
2253
2254 Args:
2255 resource (Dict): JSON representation of the entry.
2256
2257 Returns:
2258 google.cloud.bigquery.job.QueryPlanEntryStep:
2259 New instance built from the resource.
2260 """
2261 return cls(kind=resource.get("kind"), substeps=resource.get("substeps", ()))
2262
2263 def __eq__(self, other):
2264 if not isinstance(other, self.__class__):
2265 return NotImplemented
2266 return self.kind == other.kind and self.substeps == other.substeps
2267
2268
2269class QueryPlanEntry(object):
2270 """QueryPlanEntry represents a single stage of a query execution plan.
2271
2272 See
2273 https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#ExplainQueryStage
2274 for the underlying API representation within query statistics.
2275 """
2276
2277 def __init__(self):
2278 self._properties = {}
2279
2280 @classmethod
2281 def from_api_repr(cls, resource: dict) -> "QueryPlanEntry":
2282 """Factory: construct instance from the JSON repr.
2283
2284 Args:
2285 resource(Dict[str: object]):
2286 ExplainQueryStage representation returned from API.
2287
2288 Returns:
2289 google.cloud.bigquery.job.QueryPlanEntry:
2290 Query plan entry parsed from ``resource``.
2291 """
2292 entry = cls()
2293 entry._properties = resource
2294 return entry
2295
2296 @property
2297 def name(self):
2298 """Optional[str]: Human-readable name of the stage."""
2299 return self._properties.get("name")
2300
2301 @property
2302 def entry_id(self):
2303 """Optional[str]: Unique ID for the stage within the plan."""
2304 return self._properties.get("id")
2305
2306 @property
2307 def start(self):
2308 """Optional[Datetime]: Datetime when the stage started."""
2309 if self._properties.get("startMs") is None:
2310 return None
2311 return _helpers._datetime_from_microseconds(
2312 int(self._properties.get("startMs")) * 1000.0
2313 )
2314
2315 @property
2316 def end(self):
2317 """Optional[Datetime]: Datetime when the stage ended."""
2318 if self._properties.get("endMs") is None:
2319 return None
2320 return _helpers._datetime_from_microseconds(
2321 int(self._properties.get("endMs")) * 1000.0
2322 )
2323
2324 @property
2325 def input_stages(self):
2326 """List(int): Entry IDs for stages that were inputs for this stage."""
2327 if self._properties.get("inputStages") is None:
2328 return []
2329 return [
2330 _helpers._int_or_none(entry)
2331 for entry in self._properties.get("inputStages")
2332 ]
2333
2334 @property
2335 def parallel_inputs(self):
2336 """Optional[int]: Number of parallel input segments within
2337 the stage.
2338 """
2339 return _helpers._int_or_none(self._properties.get("parallelInputs"))
2340
2341 @property
2342 def completed_parallel_inputs(self):
2343 """Optional[int]: Number of parallel input segments completed."""
2344 return _helpers._int_or_none(self._properties.get("completedParallelInputs"))
2345
2346 @property
2347 def wait_ms_avg(self):
2348 """Optional[int]: Milliseconds the average worker spent waiting to
2349 be scheduled.
2350 """
2351 return _helpers._int_or_none(self._properties.get("waitMsAvg"))
2352
2353 @property
2354 def wait_ms_max(self):
2355 """Optional[int]: Milliseconds the slowest worker spent waiting to
2356 be scheduled.
2357 """
2358 return _helpers._int_or_none(self._properties.get("waitMsMax"))
2359
2360 @property
2361 def wait_ratio_avg(self):
2362 """Optional[float]: Ratio of time the average worker spent waiting
2363 to be scheduled, relative to the longest time spent by any worker in
2364 any stage of the overall plan.
2365 """
2366 return self._properties.get("waitRatioAvg")
2367
2368 @property
2369 def wait_ratio_max(self):
2370 """Optional[float]: Ratio of time the slowest worker spent waiting
2371 to be scheduled, relative to the longest time spent by any worker in
2372 any stage of the overall plan.
2373 """
2374 return self._properties.get("waitRatioMax")
2375
2376 @property
2377 def read_ms_avg(self):
2378 """Optional[int]: Milliseconds the average worker spent reading
2379 input.
2380 """
2381 return _helpers._int_or_none(self._properties.get("readMsAvg"))
2382
2383 @property
2384 def read_ms_max(self):
2385 """Optional[int]: Milliseconds the slowest worker spent reading
2386 input.
2387 """
2388 return _helpers._int_or_none(self._properties.get("readMsMax"))
2389
2390 @property
2391 def read_ratio_avg(self):
2392 """Optional[float]: Ratio of time the average worker spent reading
2393 input, relative to the longest time spent by any worker in any stage
2394 of the overall plan.
2395 """
2396 return self._properties.get("readRatioAvg")
2397
2398 @property
2399 def read_ratio_max(self):
2400 """Optional[float]: Ratio of time the slowest worker spent reading
2401 to be scheduled, relative to the longest time spent by any worker in
2402 any stage of the overall plan.
2403 """
2404 return self._properties.get("readRatioMax")
2405
2406 @property
2407 def compute_ms_avg(self):
2408 """Optional[int]: Milliseconds the average worker spent on CPU-bound
2409 processing.
2410 """
2411 return _helpers._int_or_none(self._properties.get("computeMsAvg"))
2412
2413 @property
2414 def compute_ms_max(self):
2415 """Optional[int]: Milliseconds the slowest worker spent on CPU-bound
2416 processing.
2417 """
2418 return _helpers._int_or_none(self._properties.get("computeMsMax"))
2419
2420 @property
2421 def compute_ratio_avg(self):
2422 """Optional[float]: Ratio of time the average worker spent on
2423 CPU-bound processing, relative to the longest time spent by any
2424 worker in any stage of the overall plan.
2425 """
2426 return self._properties.get("computeRatioAvg")
2427
2428 @property
2429 def compute_ratio_max(self):
2430 """Optional[float]: Ratio of time the slowest worker spent on
2431 CPU-bound processing, relative to the longest time spent by any
2432 worker in any stage of the overall plan.
2433 """
2434 return self._properties.get("computeRatioMax")
2435
2436 @property
2437 def write_ms_avg(self):
2438 """Optional[int]: Milliseconds the average worker spent writing
2439 output data.
2440 """
2441 return _helpers._int_or_none(self._properties.get("writeMsAvg"))
2442
2443 @property
2444 def write_ms_max(self):
2445 """Optional[int]: Milliseconds the slowest worker spent writing
2446 output data.
2447 """
2448 return _helpers._int_or_none(self._properties.get("writeMsMax"))
2449
2450 @property
2451 def write_ratio_avg(self):
2452 """Optional[float]: Ratio of time the average worker spent writing
2453 output data, relative to the longest time spent by any worker in any
2454 stage of the overall plan.
2455 """
2456 return self._properties.get("writeRatioAvg")
2457
2458 @property
2459 def write_ratio_max(self):
2460 """Optional[float]: Ratio of time the slowest worker spent writing
2461 output data, relative to the longest time spent by any worker in any
2462 stage of the overall plan.
2463 """
2464 return self._properties.get("writeRatioMax")
2465
2466 @property
2467 def records_read(self):
2468 """Optional[int]: Number of records read by this stage."""
2469 return _helpers._int_or_none(self._properties.get("recordsRead"))
2470
2471 @property
2472 def records_written(self):
2473 """Optional[int]: Number of records written by this stage."""
2474 return _helpers._int_or_none(self._properties.get("recordsWritten"))
2475
2476 @property
2477 def status(self):
2478 """Optional[str]: status of this stage."""
2479 return self._properties.get("status")
2480
2481 @property
2482 def shuffle_output_bytes(self):
2483 """Optional[int]: Number of bytes written by this stage to
2484 intermediate shuffle.
2485 """
2486 return _helpers._int_or_none(self._properties.get("shuffleOutputBytes"))
2487
2488 @property
2489 def shuffle_output_bytes_spilled(self):
2490 """Optional[int]: Number of bytes written by this stage to
2491 intermediate shuffle and spilled to disk.
2492 """
2493 return _helpers._int_or_none(self._properties.get("shuffleOutputBytesSpilled"))
2494
2495 @property
2496 def steps(self):
2497 """List(QueryPlanEntryStep): List of step operations performed by
2498 each worker in the stage.
2499 """
2500 return [
2501 QueryPlanEntryStep.from_api_repr(step)
2502 for step in self._properties.get("steps", [])
2503 ]
2504
2505 @property
2506 def slot_ms(self):
2507 """Optional[int]: Slot-milliseconds used by the stage."""
2508 return _helpers._int_or_none(self._properties.get("slotMs"))
2509
2510
2511class TimelineEntry(object):
2512 """TimelineEntry represents progress of a query job at a particular
2513 point in time.
2514
2515 See
2516 https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#querytimelinesample
2517 for the underlying API representation within query statistics.
2518 """
2519
2520 def __init__(self):
2521 self._properties = {}
2522
2523 @classmethod
2524 def from_api_repr(cls, resource):
2525 """Factory: construct instance from the JSON repr.
2526
2527 Args:
2528 resource(Dict[str: object]):
2529 QueryTimelineSample representation returned from API.
2530
2531 Returns:
2532 google.cloud.bigquery.TimelineEntry:
2533 Timeline sample parsed from ``resource``.
2534 """
2535 entry = cls()
2536 entry._properties = resource
2537 return entry
2538
2539 @property
2540 def elapsed_ms(self):
2541 """Optional[int]: Milliseconds elapsed since start of query
2542 execution."""
2543 return _helpers._int_or_none(self._properties.get("elapsedMs"))
2544
2545 @property
2546 def active_units(self):
2547 """Optional[int]: Current number of input units being processed
2548 by workers, reported as largest value since the last sample."""
2549 return _helpers._int_or_none(self._properties.get("activeUnits"))
2550
2551 @property
2552 def pending_units(self):
2553 """Optional[int]: Current number of input units remaining for
2554 query stages active at this sample time."""
2555 return _helpers._int_or_none(self._properties.get("pendingUnits"))
2556
2557 @property
2558 def completed_units(self):
2559 """Optional[int]: Current number of input units completed by
2560 this query."""
2561 return _helpers._int_or_none(self._properties.get("completedUnits"))
2562
2563 @property
2564 def slot_millis(self):
2565 """Optional[int]: Cumulative slot-milliseconds consumed by
2566 this query."""
2567 return _helpers._int_or_none(self._properties.get("totalSlotMs"))