1# Copyright 2015 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15"""Classes for query jobs."""
16
17import concurrent.futures
18import copy
19import re
20import time
21import typing
22from typing import Any, Dict, Iterable, List, Optional, Union
23
24from google.api_core import exceptions
25from google.api_core import retry as retries
26import requests
27
28from google.cloud.bigquery.dataset import Dataset
29from google.cloud.bigquery.dataset import DatasetListItem
30from google.cloud.bigquery.dataset import DatasetReference
31from google.cloud.bigquery.encryption_configuration import EncryptionConfiguration
32from google.cloud.bigquery.enums import KeyResultStatementKind, DefaultPandasDTypes
33from google.cloud.bigquery.external_config import ExternalConfig
34from google.cloud.bigquery import _helpers
35from google.cloud.bigquery.query import (
36 _query_param_from_api_repr,
37 ArrayQueryParameter,
38 ConnectionProperty,
39 ScalarQueryParameter,
40 StructQueryParameter,
41 UDFResource,
42)
43from google.cloud.bigquery.retry import (
44 DEFAULT_RETRY,
45 DEFAULT_JOB_RETRY,
46 POLLING_DEFAULT_VALUE,
47)
48from google.cloud.bigquery.routine import RoutineReference
49from google.cloud.bigquery.schema import SchemaField
50from google.cloud.bigquery.table import _EmptyRowIterator
51from google.cloud.bigquery.table import RangePartitioning
52from google.cloud.bigquery.table import _table_arg_to_table_ref
53from google.cloud.bigquery.table import TableReference
54from google.cloud.bigquery.table import TimePartitioning
55from google.cloud.bigquery._tqdm_helpers import wait_for_query
56
57from google.cloud.bigquery.job.base import _AsyncJob
58from google.cloud.bigquery.job.base import _JobConfig
59from google.cloud.bigquery.job.base import _JobReference
60
61try:
62 import pandas # type: ignore
63except ImportError:
64 pandas = None
65
66if typing.TYPE_CHECKING: # pragma: NO COVER
67 # Assumption: type checks are only used by library developers and CI environments
68 # that have all optional dependencies installed, thus no conditional imports.
69 import pandas # type: ignore
70 import geopandas # type: ignore
71 import pyarrow # type: ignore
72 from google.cloud import bigquery_storage
73 from google.cloud.bigquery.client import Client
74 from google.cloud.bigquery.table import RowIterator
75
76
# Case-insensitive pattern matching ``ORDER``, one or more whitespace
# characters, then ``BY``; searched by ``_contains_order_by``.
_CONTAINS_ORDER_BY = re.compile(r"ORDER\s+BY", re.IGNORECASE)
# Format template with ``message``, ``location`` and ``job_id`` placeholders.
_EXCEPTION_FOOTER_TEMPLATE = "{message}\n\nLocation: {location}\nJob ID: {job_id}\n"
# NOTE(review): presumably a slack, in seconds, applied to timeouts -- its
# use is not visible in this part of the file; confirm against callers.
_TIMEOUT_BUFFER_SECS = 0.1
80
81
def _contains_order_by(query):
    """Tell whether the order of the query results has to be preserved.

    The check is a plain pattern search, so it has known false positives,
    for example ordered window functions:

    .. code-block:: sql

       SELECT SUM(x) OVER (
           window_name
           PARTITION BY...
           ORDER BY...
           window_frame_clause)
       FROM ...

    A false positive keeps the behavior correct; it only means that
    downloading results with the BigQuery Storage API may be slower than it
    otherwise would be. That is preferable to a false negative, where
    results are expected to be in order but are not (due to parallel reads).

    Returns:
        ``query`` itself when it is falsy (``None`` or empty), otherwise the
        result of searching it for ``ORDER BY`` (a match object or ``None``).
    """
    if not query:
        # Hand the falsy value back unchanged, exactly as ``query and ...`` does.
        return query
    return _CONTAINS_ORDER_BY.search(query)
103
104
def _from_api_repr_query_parameters(resource):
    """Convert API query-parameter mappings into parameter objects.

    Args:
        resource: Iterable of mappings; each one is passed to
            ``_query_param_from_api_repr``.

    Returns:
        list: The converted parameters, in input order.
    """
    parameters = []
    for mapping in resource:
        parameters.append(_query_param_from_api_repr(mapping))
    return parameters
107
108
def _to_api_repr_query_parameters(value):
    """Serialize query parameters to their API representations.

    Args:
        value: Iterable of objects exposing ``to_api_repr()``.

    Returns:
        list: One API representation per parameter, in input order.
    """
    serialized = []
    for parameter in value:
        serialized.append(parameter.to_api_repr())
    return serialized
111
112
def _from_api_repr_udf_resources(resource):
    """Convert API UDF resource mappings into ``UDFResource`` objects.

    Args:
        resource: Iterable of mappings. Every ``(type, value)`` item of every
            mapping yields one ``UDFResource``.

    Returns:
        list: The resulting ``UDFResource`` objects, flattened in input order.
    """
    return [
        UDFResource(kind, payload)
        for mapping in resource
        for kind, payload in mapping.items()
    ]
119
120
def _to_api_repr_udf_resources(value):
    """Serialize UDF resources into single-entry API mappings.

    Args:
        value: Iterable of objects with ``udf_type`` and ``value`` attributes.

    Returns:
        list: One ``{udf_type: value}`` mapping per resource, in input order.
    """
    serialized = []
    for resource in value:
        serialized.append({resource.udf_type: resource.value})
    return serialized
123
124
def _from_api_repr_table_defs(resource):
    """Convert a mapping of API table definitions into ``ExternalConfig`` objects.

    Args:
        resource: Mapping of table name to the API representation of its
            external configuration.

    Returns:
        dict: Same keys, with each value run through
        ``ExternalConfig.from_api_repr``.
    """
    definitions = {}
    for name, api_repr in resource.items():
        definitions[name] = ExternalConfig.from_api_repr(api_repr)
    return definitions
127
128
def _to_api_repr_table_defs(value):
    """Serialize a mapping of external table configurations.

    Args:
        value: Mapping of table name to external configuration.

    Returns:
        dict: Same keys, with each value run through
        ``ExternalConfig.to_api_repr``.
    """
    serialized = {}
    for name, config in value.items():
        serialized[name] = ExternalConfig.to_api_repr(config)
    return serialized
131
132
class BiEngineReason(typing.NamedTuple):
    """A single reason why BI Engine acceleration failed.

    https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#bienginereason
    """

    code: str = "CODE_UNSPECIFIED"

    reason: str = ""

    @classmethod
    def from_api_repr(cls, reason: Dict[str, str]) -> "BiEngineReason":
        """Build an instance from its API mapping.

        Args:
            reason: Mapping read for the ``"code"`` and ``"message"`` keys.
                A missing key falls back to ``"CODE_UNSPECIFIED"`` and ``""``
                respectively.

        Returns:
            BiEngineReason: ``code`` from ``"code"``, ``reason`` from
            ``"message"``.
        """
        reason_code = reason.get("code", "CODE_UNSPECIFIED")
        message = reason.get("message", "")
        return cls(code=reason_code, reason=message)
146
147
class BiEngineStats(typing.NamedTuple):
    """BI Engine statistics of a query.

    https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#bienginestatistics
    """

    mode: str = "ACCELERATION_MODE_UNSPECIFIED"
    """ Specifies which mode of BI Engine acceleration was performed (if any)
    """

    # NOTE: this default list object is shared by every instance created
    # without an explicit ``reasons`` argument; do not mutate it in place.
    reasons: List[BiEngineReason] = []
    """ Contains explanatory messages in case of DISABLED / PARTIAL acceleration
    """

    @classmethod
    def from_api_repr(cls, stats: Dict[str, Any]) -> "BiEngineStats":
        """Build an instance from its API mapping.

        Args:
            stats: Mapping read for ``"biEngineMode"`` (default
                ``"ACCELERATION_MODE_UNSPECIFIED"``) and ``"biEngineReasons"``
                (default: no reasons).

        Returns:
            BiEngineStats: The mode plus a fresh list with one
            ``BiEngineReason`` per entry of ``"biEngineReasons"``.
        """
        acceleration_mode = stats.get("biEngineMode", "ACCELERATION_MODE_UNSPECIFIED")
        parsed_reasons = []
        for raw_reason in stats.get("biEngineReasons", []):
            parsed_reasons.append(BiEngineReason.from_api_repr(raw_reason))
        return cls(mode=acceleration_mode, reasons=parsed_reasons)
169
170
class DmlStats(typing.NamedTuple):
    """Row-level statistics reported for DML statements.

    https://cloud.google.com/bigquery/docs/reference/rest/v2/DmlStats
    """

    inserted_row_count: int = 0
    """Number of inserted rows. Populated by DML INSERT and MERGE statements."""

    deleted_row_count: int = 0
    """Number of deleted rows. populated by DML DELETE, MERGE and TRUNCATE statements.
    """

    updated_row_count: int = 0
    """Number of updated rows. Populated by DML UPDATE and MERGE statements."""

    @classmethod
    def from_api_repr(cls, stats: Dict[str, str]) -> "DmlStats":
        """Build an instance from its API mapping.

        Args:
            stats: Mapping read for ``"insertedRowCount"``,
                ``"deletedRowCount"`` and ``"updatedRowCount"``. Present
                values are converted with ``int``; a missing key takes the
                default of the corresponding tuple field.

        Returns:
            DmlStats: The three counts as integers.
        """
        # NOTE: these API keys are listed in the same order as the fields
        # declared on the class, so position ``i`` lines up with default ``i``.
        api_names = ("insertedRowCount", "deletedRowCount", "updatedRowCount")
        field_defaults = cls.__new__.__defaults__  # type: ignore

        counts = []
        for position, api_name in enumerate(api_names):
            raw_count = stats.get(api_name, field_defaults[position])
            counts.append(int(raw_count))
        return cls(*counts)
198
199
class IndexUnusedReason(typing.NamedTuple):
    """Why no search index was used in a search query (or sub-query).

    https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#indexunusedreason
    """

    code: Optional[str] = None
    """Specifies the high-level reason for the scenario when no search index was used.
    """

    message: Optional[str] = None
    """Free form human-readable reason for the scenario when no search index was used.
    """

    baseTable: Optional[TableReference] = None
    """Specifies the base table involved in the reason that no search index was used.
    """

    indexName: Optional[str] = None
    """Specifies the name of the unused search index, if available."""

    @classmethod
    def from_api_repr(cls, reason):
        """Build an instance from its API mapping.

        Args:
            reason: Mapping read for the ``"code"``, ``"message"``,
                ``"baseTable"`` and ``"indexName"`` keys; a missing key
                yields :data:`None` for that field.

        Returns:
            IndexUnusedReason: The four values exactly as found in ``reason``.
        """
        # NOTE(review): ``baseTable`` is stored as received from ``reason``;
        # it is not converted into a ``TableReference`` here.
        return cls(
            code=reason.get("code"),
            message=reason.get("message"),
            baseTable=reason.get("baseTable"),
            indexName=reason.get("indexName"),
        )
229
230
class SearchStats(typing.NamedTuple):
    """Search-query statistics. Populated as part of JobStatistics2.

    https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#searchstatistics
    """

    mode: Optional[str] = None
    """Indicates the type of search index usage in the entire search query."""

    # NOTE: this default list object is shared by every instance created
    # without an explicit ``reason`` argument; do not mutate it in place.
    reason: List[IndexUnusedReason] = []
    """Reason about why no search index was used in the search query (or sub-query)"""

    @classmethod
    def from_api_repr(cls, stats: Dict[str, Any]):
        """Build an instance from its API mapping.

        Args:
            stats: Mapping read for ``"indexUsageMode"`` (default
                :data:`None`) and ``"indexUnusedReasons"`` (default: no
                reasons).

        Returns:
            SearchStats: The usage mode plus a fresh list with one
            ``IndexUnusedReason`` per entry of ``"indexUnusedReasons"``.
        """
        usage_mode = stats.get("indexUsageMode", None)
        unused_reasons = []
        for raw_reason in stats.get("indexUnusedReasons", []):
            unused_reasons.append(IndexUnusedReason.from_api_repr(raw_reason))
        return cls(mode=usage_mode, reason=unused_reasons)
251
252
class ScriptOptions:
    """Settings that control how scripts are executed.

    https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#ScriptOptions
    """

    def __init__(
        self,
        statement_timeout_ms: Optional[int] = None,
        statement_byte_budget: Optional[int] = None,
        key_result_statement: Optional[KeyResultStatementKind] = None,
    ):
        # Backing store keyed by the API field names; the property setters
        # below fill it in.
        self._properties: Dict[str, Any] = {}
        self.statement_timeout_ms = statement_timeout_ms
        self.statement_byte_budget = statement_byte_budget
        self.key_result_statement = key_result_statement

    @classmethod
    def from_api_repr(cls, resource: Dict[str, Any]) -> "ScriptOptions":
        """Factory: construct an instance from the JSON representation.

        Args:
            resource(Dict[str: Any]):
                ScriptOptions representation returned from API.

        Returns:
            google.cloud.bigquery.ScriptOptions:
                ScriptOptions holding a deep copy of ``resource``.
        """
        options = cls()
        # Replace (not merge) the properties set by ``__init__``.
        options._properties = copy.deepcopy(resource)
        return options

    def to_api_repr(self) -> Dict[str, Any]:
        """Return a deep copy of the API resource representation."""
        return copy.deepcopy(self._properties)

    @property
    def statement_timeout_ms(self) -> Union[int, None]:
        """Timeout period for each statement in a script."""
        raw_timeout = self._properties.get("statementTimeoutMs")
        return _helpers._int_or_none(raw_timeout)

    @statement_timeout_ms.setter
    def statement_timeout_ms(self, value: Union[int, None]):
        # Stored as a string unless the value is being cleared.
        if value is None:
            self._properties["statementTimeoutMs"] = None
        else:
            self._properties["statementTimeoutMs"] = str(value)

    @property
    def statement_byte_budget(self) -> Union[int, None]:
        """Limit on the number of bytes billed per statement.

        Exceeding this budget results in an error.
        """
        raw_budget = self._properties.get("statementByteBudget")
        return _helpers._int_or_none(raw_budget)

    @statement_byte_budget.setter
    def statement_byte_budget(self, value: Union[int, None]):
        # Stored as a string unless the value is being cleared.
        if value is None:
            self._properties["statementByteBudget"] = None
        else:
            self._properties["statementByteBudget"] = str(value)

    @property
    def key_result_statement(self) -> Union[KeyResultStatementKind, None]:
        """Determines which statement in the script represents the "key result".

        This is used to populate the schema and query results of the script job.
        Default is ``KeyResultStatementKind.LAST``.
        """
        properties = self._properties
        return properties.get("keyResultStatement")

    @key_result_statement.setter
    def key_result_statement(self, value: Union[KeyResultStatementKind, None]):
        # Stored exactly as given, :data:`None` included.
        properties = self._properties
        properties["keyResultStatement"] = value
325
326
class QueryJobConfig(_JobConfig):
    """Configuration options for query jobs.

    All properties in this class are optional. Values which are :data:`None` ->
    server defaults. Set properties on the constructed configuration by using
    the property name as the name of a keyword argument.
    """

    def __init__(self, **kwargs) -> None:
        super(QueryJobConfig, self).__init__("query", **kwargs)

    @property
    def destination_encryption_configuration(self):
        """google.cloud.bigquery.encryption_configuration.EncryptionConfiguration: Custom
        encryption configuration for the destination table.

        Custom encryption configuration (e.g., Cloud KMS keys) or :data:`None`
        if using default encryption.

        See
        https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobConfigurationQuery.FIELDS.destination_encryption_configuration
        """
        prop = self._get_sub_prop("destinationEncryptionConfiguration")
        if prop is not None:
            prop = EncryptionConfiguration.from_api_repr(prop)
        return prop

    @destination_encryption_configuration.setter
    def destination_encryption_configuration(self, value):
        api_repr = value
        if value is not None:
            api_repr = value.to_api_repr()
        self._set_sub_prop("destinationEncryptionConfiguration", api_repr)

    @property
    def allow_large_results(self):
        """bool: Allow large query results tables (legacy SQL, only)

        See
        https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobConfigurationQuery.FIELDS.allow_large_results
        """
        return self._get_sub_prop("allowLargeResults")

    @allow_large_results.setter
    def allow_large_results(self, value):
        self._set_sub_prop("allowLargeResults", value)

    @property
    def connection_properties(self) -> List[ConnectionProperty]:
        """Connection properties.

        See
        https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobConfigurationQuery.FIELDS.connection_properties

        .. versionadded:: 2.29.0
        """
        resource = self._get_sub_prop("connectionProperties", [])
        return [ConnectionProperty.from_api_repr(prop) for prop in resource]

    @connection_properties.setter
    def connection_properties(self, value: Iterable[ConnectionProperty]):
        self._set_sub_prop(
            "connectionProperties",
            [prop.to_api_repr() for prop in value],
        )

    @property
    def create_disposition(self):
        """google.cloud.bigquery.job.CreateDisposition: Specifies behavior
        for creating tables.

        See
        https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobConfigurationQuery.FIELDS.create_disposition
        """
        return self._get_sub_prop("createDisposition")

    @create_disposition.setter
    def create_disposition(self, value):
        self._set_sub_prop("createDisposition", value)

    @property
    def create_session(self) -> Optional[bool]:
        """[Preview] If :data:`True`, creates a new session, where
        :attr:`~google.cloud.bigquery.job.QueryJob.session_info` will contain a
        random server generated session id.

        If :data:`False`, runs query with an existing ``session_id`` passed in
        :attr:`~google.cloud.bigquery.job.QueryJobConfig.connection_properties`,
        otherwise runs query in non-session mode.

        See
        https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobConfigurationQuery.FIELDS.create_session

        .. versionadded:: 2.29.0
        """
        return self._get_sub_prop("createSession")

    @create_session.setter
    def create_session(self, value: Optional[bool]):
        self._set_sub_prop("createSession", value)

    @property
    def default_dataset(self):
        """google.cloud.bigquery.dataset.DatasetReference: the default dataset
        to use for unqualified table names in the query or :data:`None` if not
        set.

        The ``default_dataset`` setter accepts:

        - a :class:`~google.cloud.bigquery.dataset.Dataset`, or
        - a :class:`~google.cloud.bigquery.dataset.DatasetReference`, or
        - a :class:`str` of the fully-qualified dataset ID in standard SQL
          format. The value must include a project ID and dataset ID
          separated by ``.``. For example: ``your-project.your_dataset``.

        See
        https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobConfigurationQuery.FIELDS.default_dataset
        """
        prop = self._get_sub_prop("defaultDataset")
        if prop is not None:
            prop = DatasetReference.from_api_repr(prop)
        return prop

    @default_dataset.setter
    def default_dataset(self, value):
        if value is None:
            self._set_sub_prop("defaultDataset", None)
            return

        if isinstance(value, str):
            value = DatasetReference.from_string(value)

        # Full dataset objects are reduced to their reference before being
        # serialized.
        if isinstance(value, (Dataset, DatasetListItem)):
            value = value.reference

        resource = value.to_api_repr()
        self._set_sub_prop("defaultDataset", resource)

    @property
    def destination(self):
        """google.cloud.bigquery.table.TableReference: table where results are
        written or :data:`None` if not set.

        The ``destination`` setter accepts:

        - a :class:`~google.cloud.bigquery.table.Table`, or
        - a :class:`~google.cloud.bigquery.table.TableReference`, or
        - a :class:`str` of the fully-qualified table ID in standard SQL
          format. The value must include a project ID, dataset ID, and table
          ID, each separated by ``.``. For example:
          ``your-project.your_dataset.your_table``.

        .. note::

            Only table ID is passed to the backend, so any configuration
            in `~google.cloud.bigquery.table.Table` is discarded.

        See
        https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobConfigurationQuery.FIELDS.destination_table
        """
        prop = self._get_sub_prop("destinationTable")
        if prop is not None:
            prop = TableReference.from_api_repr(prop)
        return prop

    @destination.setter
    def destination(self, value):
        if value is None:
            self._set_sub_prop("destinationTable", None)
            return

        value = _table_arg_to_table_ref(value)
        resource = value.to_api_repr()
        self._set_sub_prop("destinationTable", resource)

    @property
    def dry_run(self):
        """bool: :data:`True` if this query should be a dry run to estimate
        costs.

        See
        https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobConfiguration.FIELDS.dry_run
        """
        # ``dryRun`` is read from the top level of the properties, not from
        # the query-specific sub-properties used by the other accessors.
        return self._properties.get("dryRun")

    @dry_run.setter
    def dry_run(self, value):
        self._properties["dryRun"] = value

    @property
    def flatten_results(self):
        """bool: Flatten nested/repeated fields in results. (Legacy SQL only)

        See
        https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobConfigurationQuery.FIELDS.flatten_results
        """
        return self._get_sub_prop("flattenResults")

    @flatten_results.setter
    def flatten_results(self, value):
        self._set_sub_prop("flattenResults", value)

    @property
    def maximum_billing_tier(self):
        """int: Deprecated. Changes the billing tier to allow high-compute
        queries.

        See
        https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobConfigurationQuery.FIELDS.maximum_billing_tier
        """
        return self._get_sub_prop("maximumBillingTier")

    @maximum_billing_tier.setter
    def maximum_billing_tier(self, value):
        self._set_sub_prop("maximumBillingTier", value)

    @property
    def maximum_bytes_billed(self):
        """int: Maximum bytes to be billed for this job or :data:`None` if not set.

        The setter stores the value as a string; assigning :data:`None`
        stores :data:`None`.

        See
        https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobConfigurationQuery.FIELDS.maximum_bytes_billed
        """
        return _helpers._int_or_none(self._get_sub_prop("maximumBytesBilled"))

    @maximum_bytes_billed.setter
    def maximum_bytes_billed(self, value):
        # ``None`` must stay ``None``: ``str(None)`` would store the literal
        # text "None", which is not a valid integer string.
        api_value = None if value is None else str(value)
        self._set_sub_prop("maximumBytesBilled", api_value)

    @property
    def priority(self):
        """google.cloud.bigquery.job.QueryPriority: Priority of the query.

        See
        https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobConfigurationQuery.FIELDS.priority
        """
        return self._get_sub_prop("priority")

    @priority.setter
    def priority(self, value):
        self._set_sub_prop("priority", value)

    @property
    def query_parameters(self):
        """List[Union[google.cloud.bigquery.query.ArrayQueryParameter, \
        google.cloud.bigquery.query.ScalarQueryParameter, \
        google.cloud.bigquery.query.StructQueryParameter]]: list of parameters
        for parameterized query (empty by default)

        See:
        https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobConfigurationQuery.FIELDS.query_parameters
        """
        prop = self._get_sub_prop("queryParameters", default=[])
        return _from_api_repr_query_parameters(prop)

    @query_parameters.setter
    def query_parameters(self, values):
        self._set_sub_prop("queryParameters", _to_api_repr_query_parameters(values))

    @property
    def range_partitioning(self):
        """Optional[google.cloud.bigquery.table.RangePartitioning]:
        Configures range-based partitioning for destination table.

        .. note::
            **Beta**. The integer range partitioning feature is in a
            pre-release state and might change or have limited support.

        Only specify at most one of
        :attr:`~google.cloud.bigquery.job.QueryJobConfig.time_partitioning` or
        :attr:`~google.cloud.bigquery.job.QueryJobConfig.range_partitioning`.

        Raises:
            ValueError:
                If the value is not
                :class:`~google.cloud.bigquery.table.RangePartitioning` or
                :data:`None`.
        """
        resource = self._get_sub_prop("rangePartitioning")
        if resource is not None:
            return RangePartitioning(_properties=resource)
        return None

    @range_partitioning.setter
    def range_partitioning(self, value):
        resource = value
        if isinstance(value, RangePartitioning):
            resource = value._properties
        elif value is not None:
            raise ValueError(
                "Expected value to be RangePartitioning or None, got {}.".format(value)
            )
        self._set_sub_prop("rangePartitioning", resource)

    @property
    def udf_resources(self):
        """List[google.cloud.bigquery.query.UDFResource]: user
        defined function resources (empty by default)

        See:
        https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobConfigurationQuery.FIELDS.user_defined_function_resources
        """
        prop = self._get_sub_prop("userDefinedFunctionResources", default=[])
        return _from_api_repr_udf_resources(prop)

    @udf_resources.setter
    def udf_resources(self, values):
        self._set_sub_prop(
            "userDefinedFunctionResources", _to_api_repr_udf_resources(values)
        )

    @property
    def use_legacy_sql(self):
        """bool: Use legacy SQL syntax.

        See
        https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobConfigurationQuery.FIELDS.use_legacy_sql
        """
        return self._get_sub_prop("useLegacySql")

    @use_legacy_sql.setter
    def use_legacy_sql(self, value):
        self._set_sub_prop("useLegacySql", value)

    @property
    def use_query_cache(self):
        """bool: Look for the query result in the cache.

        See
        https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobConfigurationQuery.FIELDS.use_query_cache
        """
        return self._get_sub_prop("useQueryCache")

    @use_query_cache.setter
    def use_query_cache(self, value):
        self._set_sub_prop("useQueryCache", value)

    @property
    def write_disposition(self):
        """google.cloud.bigquery.job.WriteDisposition: Action that occurs if
        the destination table already exists.

        See
        https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobConfigurationQuery.FIELDS.write_disposition
        """
        return self._get_sub_prop("writeDisposition")

    @write_disposition.setter
    def write_disposition(self, value):
        self._set_sub_prop("writeDisposition", value)

    @property
    def write_incremental_results(self) -> Optional[bool]:
        """This is only supported for a SELECT query using a temporary table.

        If set, the query is allowed to write results incrementally to the temporary result
        table. This may incur a performance penalty. This option cannot be used with Legacy SQL.

        This feature is not generally available.
        """
        return self._get_sub_prop("writeIncrementalResults")

    @write_incremental_results.setter
    def write_incremental_results(self, value):
        self._set_sub_prop("writeIncrementalResults", value)

    @property
    def table_definitions(self):
        """Dict[str, google.cloud.bigquery.external_config.ExternalConfig]:
        Definitions for external tables or :data:`None` if not set.

        See
        https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobConfigurationQuery.FIELDS.external_table_definitions
        """
        prop = self._get_sub_prop("tableDefinitions")
        if prop is not None:
            prop = _from_api_repr_table_defs(prop)
        return prop

    @table_definitions.setter
    def table_definitions(self, values):
        self._set_sub_prop("tableDefinitions", _to_api_repr_table_defs(values))

    @property
    def time_partitioning(self):
        """Optional[google.cloud.bigquery.table.TimePartitioning]: Specifies
        time-based partitioning for the destination table.

        Only specify at most one of
        :attr:`~google.cloud.bigquery.job.QueryJobConfig.time_partitioning` or
        :attr:`~google.cloud.bigquery.job.QueryJobConfig.range_partitioning`.

        The setter accepts :data:`None` or an object providing
        ``to_api_repr()``; it performs no further type validation.
        """
        prop = self._get_sub_prop("timePartitioning")
        if prop is not None:
            prop = TimePartitioning.from_api_repr(prop)
        return prop

    @time_partitioning.setter
    def time_partitioning(self, value):
        api_repr = value
        if value is not None:
            api_repr = value.to_api_repr()
        self._set_sub_prop("timePartitioning", api_repr)

    @property
    def clustering_fields(self):
        """Optional[List[str]]: Fields defining clustering for the table

        (Defaults to :data:`None`).

        Clustering fields are immutable after table creation.

        .. note::

           BigQuery supports clustering for both partitioned and
           non-partitioned tables.
        """
        prop = self._get_sub_prop("clustering")
        if prop is not None:
            return list(prop.get("fields", ()))
        return None

    @clustering_fields.setter
    def clustering_fields(self, value):
        """Optional[List[str]]: Fields defining clustering for the table

        (Defaults to :data:`None`).
        """
        if value is not None:
            self._set_sub_prop("clustering", {"fields": value})
        else:
            # Clearing removes the whole ``clustering`` entry instead of
            # storing ``None`` under it.
            self._del_sub_prop("clustering")

    @property
    def schema_update_options(self):
        """List[google.cloud.bigquery.job.SchemaUpdateOption]: Specifies
        updates to the destination table schema to allow as a side effect of
        the query job.
        """
        return self._get_sub_prop("schemaUpdateOptions")

    @schema_update_options.setter
    def schema_update_options(self, values):
        self._set_sub_prop("schemaUpdateOptions", values)

    @property
    def script_options(self) -> Optional[ScriptOptions]:
        """Options controlling the execution of scripts, or :data:`None` if
        not set.

        https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#scriptoptions
        """
        prop = self._get_sub_prop("scriptOptions")
        if prop is not None:
            prop = ScriptOptions.from_api_repr(prop)
        return prop

    @script_options.setter
    def script_options(self, value: Union[ScriptOptions, None]):
        new_value = None if value is None else value.to_api_repr()
        self._set_sub_prop("scriptOptions", new_value)

    def to_api_repr(self) -> dict:
        """Build an API representation of the query job config.

        Returns:
            Dict: A dictionary in the format used by the BigQuery API. It is
            a deep copy of the stored properties, with ``parameterMode``
            added when query parameters are present.
        """
        resource = copy.deepcopy(self._properties)
        # Query parameters have an additional property associated with them
        # to indicate if the query is using named or positional parameters.
        # The mode is decided from the first parameter only.
        query_parameters = resource.get("query", {}).get("queryParameters")
        if query_parameters:
            if query_parameters[0].get("name") is None:
                resource["query"]["parameterMode"] = "POSITIONAL"
            else:
                resource["query"]["parameterMode"] = "NAMED"

        return resource
809
810
811class QueryJob(_AsyncJob):
812 """Asynchronous job: query tables.
813
814 Args:
815 job_id (str): the job's ID, within the project belonging to ``client``.
816
817 query (str): SQL query string.
818
819 client (google.cloud.bigquery.client.Client):
820 A client which holds credentials and project configuration
821 for the dataset (which requires a project).
822
823 job_config (Optional[google.cloud.bigquery.job.QueryJobConfig]):
824 Extra configuration options for the query job.
825 """
826
827 _JOB_TYPE = "query"
828 _UDF_KEY = "userDefinedFunctionResources"
829 _CONFIG_CLASS = QueryJobConfig
830
 def __init__(self, job_id, query, client, job_config=None):
     """Set up the job's configuration and query-related state.

     Args:
         job_id (str): the job's ID; forwarded to the base initializer.
         query (str): SQL query string; stored only when truthy.
         client (google.cloud.bigquery.client.Client):
             client forwarded to the base initializer.
         job_config (Optional[google.cloud.bigquery.job.QueryJobConfig]):
             Extra configuration options for the query job.
     """
     super(QueryJob, self).__init__(job_id, client)

     if job_config is not None:
         # The config's own ``_properties`` mapping is adopted as the job
         # configuration; no copy is made here.
         self._properties["configuration"] = job_config._properties
     if self.configuration.use_legacy_sql is None:
         # An unset dialect is pinned to ``False`` (not legacy SQL).
         self.configuration.use_legacy_sql = False

     if query:
         _helpers._set_sub_prop(
             self._properties, ["configuration", "query", "query"], query
         )
     # Populated later; all three start out unset.
     self._query_results = None
     self._done_timeout = None
     self._transport_timeout = None
846
847 @property
848 def allow_large_results(self):
849 """See
850 :attr:`google.cloud.bigquery.job.QueryJobConfig.allow_large_results`.
851 """
852 return self.configuration.allow_large_results
853
854 @property
855 def configuration(self) -> QueryJobConfig:
856 """The configuration for this query job."""
857 return typing.cast(QueryJobConfig, super().configuration)
858
859 @property
860 def connection_properties(self) -> List[ConnectionProperty]:
861 """See
862 :attr:`google.cloud.bigquery.job.QueryJobConfig.connection_properties`.
863
864 .. versionadded:: 2.29.0
865 """
866 return self.configuration.connection_properties
867
868 @property
869 def create_disposition(self):
870 """See
871 :attr:`google.cloud.bigquery.job.QueryJobConfig.create_disposition`.
872 """
873 return self.configuration.create_disposition
874
875 @property
876 def create_session(self) -> Optional[bool]:
877 """See
878 :attr:`google.cloud.bigquery.job.QueryJobConfig.create_session`.
879
880 .. versionadded:: 2.29.0
881 """
882 return self.configuration.create_session
883
884 @property
885 def default_dataset(self):
886 """See
887 :attr:`google.cloud.bigquery.job.QueryJobConfig.default_dataset`.
888 """
889 return self.configuration.default_dataset
890
891 @property
892 def destination(self):
893 """See
894 :attr:`google.cloud.bigquery.job.QueryJobConfig.destination`.
895 """
896 return self.configuration.destination
897
898 @property
899 def destination_encryption_configuration(self):
900 """google.cloud.bigquery.encryption_configuration.EncryptionConfiguration: Custom
901 encryption configuration for the destination table.
902
903 Custom encryption configuration (e.g., Cloud KMS keys) or :data:`None`
904 if using default encryption.
905
906 See
907 :attr:`google.cloud.bigquery.job.QueryJobConfig.destination_encryption_configuration`.
908 """
909 return self.configuration.destination_encryption_configuration
910
911 @property
912 def dry_run(self):
913 """See
914 :attr:`google.cloud.bigquery.job.QueryJobConfig.dry_run`.
915 """
916 return self.configuration.dry_run
917
918 @property
919 def flatten_results(self):
920 """See
921 :attr:`google.cloud.bigquery.job.QueryJobConfig.flatten_results`.
922 """
923 return self.configuration.flatten_results
924
925 @property
926 def priority(self):
927 """See
928 :attr:`google.cloud.bigquery.job.QueryJobConfig.priority`.
929 """
930 return self.configuration.priority
931
932 @property
933 def search_stats(self) -> Optional[SearchStats]:
934 """Returns a SearchStats object."""
935
936 stats = self._job_statistics().get("searchStatistics")
937 if stats is not None:
938 return SearchStats.from_api_repr(stats)
939 return None
940
941 @property
942 def query(self):
943 """str: The query text used in this query job.
944
945 See:
946 https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobConfigurationQuery.FIELDS.query
947 """
948 return _helpers._get_sub_prop(
949 self._properties, ["configuration", "query", "query"]
950 )
951
952 @property
953 def query_id(self) -> Optional[str]:
954 """[Preview] ID of a completed query.
955
956 This ID is auto-generated and not guaranteed to be populated.
957 """
958 query_results = self._query_results
959 return query_results.query_id if query_results is not None else None
960
961 @property
962 def query_parameters(self):
963 """See
964 :attr:`google.cloud.bigquery.job.QueryJobConfig.query_parameters`.
965 """
966 return self.configuration.query_parameters
967
968 @property
969 def udf_resources(self):
970 """See
971 :attr:`google.cloud.bigquery.job.QueryJobConfig.udf_resources`.
972 """
973 return self.configuration.udf_resources
974
975 @property
976 def use_legacy_sql(self):
977 """See
978 :attr:`google.cloud.bigquery.job.QueryJobConfig.use_legacy_sql`.
979 """
980 return self.configuration.use_legacy_sql
981
982 @property
983 def use_query_cache(self):
984 """See
985 :attr:`google.cloud.bigquery.job.QueryJobConfig.use_query_cache`.
986 """
987 return self.configuration.use_query_cache
988
989 @property
990 def write_disposition(self):
991 """See
992 :attr:`google.cloud.bigquery.job.QueryJobConfig.write_disposition`.
993 """
994 return self.configuration.write_disposition
995
996 @property
997 def maximum_billing_tier(self):
998 """See
999 :attr:`google.cloud.bigquery.job.QueryJobConfig.maximum_billing_tier`.
1000 """
1001 return self.configuration.maximum_billing_tier
1002
1003 @property
1004 def maximum_bytes_billed(self):
1005 """See
1006 :attr:`google.cloud.bigquery.job.QueryJobConfig.maximum_bytes_billed`.
1007 """
1008 return self.configuration.maximum_bytes_billed
1009
1010 @property
1011 def range_partitioning(self):
1012 """See
1013 :attr:`google.cloud.bigquery.job.QueryJobConfig.range_partitioning`.
1014 """
1015 return self.configuration.range_partitioning
1016
1017 @property
1018 def table_definitions(self):
1019 """See
1020 :attr:`google.cloud.bigquery.job.QueryJobConfig.table_definitions`.
1021 """
1022 return self.configuration.table_definitions
1023
1024 @property
1025 def time_partitioning(self):
1026 """See
1027 :attr:`google.cloud.bigquery.job.QueryJobConfig.time_partitioning`.
1028 """
1029 return self.configuration.time_partitioning
1030
1031 @property
1032 def clustering_fields(self):
1033 """See
1034 :attr:`google.cloud.bigquery.job.QueryJobConfig.clustering_fields`.
1035 """
1036 return self.configuration.clustering_fields
1037
1038 @property
1039 def schema_update_options(self):
1040 """See
1041 :attr:`google.cloud.bigquery.job.QueryJobConfig.schema_update_options`.
1042 """
1043 return self.configuration.schema_update_options
1044
1045 def to_api_repr(self):
1046 """Generate a resource for :meth:`_begin`."""
1047 # Use to_api_repr to allow for some configuration properties to be set
1048 # automatically.
1049 configuration = self.configuration.to_api_repr()
1050 return {
1051 "jobReference": self._properties["jobReference"],
1052 "configuration": configuration,
1053 }
1054
1055 @classmethod
1056 def from_api_repr(cls, resource: dict, client: "Client") -> "QueryJob":
1057 """Factory: construct a job given its API representation
1058
1059 Args:
1060 resource (Dict): dataset job representation returned from the API
1061
1062 client (google.cloud.bigquery.client.Client):
1063 Client which holds credentials and project
1064 configuration for the dataset.
1065
1066 Returns:
1067 google.cloud.bigquery.job.QueryJob: Job parsed from ``resource``.
1068 """
1069 job_ref_properties = resource.setdefault(
1070 "jobReference", {"projectId": client.project, "jobId": None}
1071 )
1072 job_ref = _JobReference._from_api_repr(job_ref_properties)
1073 job = cls(job_ref, None, client=client)
1074 job._set_properties(resource)
1075 return job
1076
1077 @property
1078 def query_plan(self):
1079 """Return query plan from job statistics, if present.
1080
1081 See:
1082 https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobStatistics2.FIELDS.query_plan
1083
1084 Returns:
1085 List[google.cloud.bigquery.job.QueryPlanEntry]:
1086 mappings describing the query plan, or an empty list
1087 if the query has not yet completed.
1088 """
1089 plan_entries = self._job_statistics().get("queryPlan", ())
1090 return [QueryPlanEntry.from_api_repr(entry) for entry in plan_entries]
1091
1092 @property
1093 def schema(self) -> Optional[List[SchemaField]]:
1094 """The schema of the results.
1095
1096 Present only for successful dry run of non-legacy SQL queries.
1097 """
1098 resource = self._job_statistics().get("schema")
1099 if resource is None:
1100 return None
1101 fields = resource.get("fields", [])
1102 return [SchemaField.from_api_repr(field) for field in fields]
1103
1104 @property
1105 def timeline(self):
1106 """List(TimelineEntry): Return the query execution timeline
1107 from job statistics.
1108 """
1109 raw = self._job_statistics().get("timeline", ())
1110 return [TimelineEntry.from_api_repr(entry) for entry in raw]
1111
1112 @property
1113 def total_bytes_processed(self):
1114 """Return total bytes processed from job statistics, if present.
1115
1116 See:
1117 https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobStatistics2.FIELDS.total_bytes_processed
1118
1119 Returns:
1120 Optional[int]:
1121 Total bytes processed by the job, or None if job is not
1122 yet complete.
1123 """
1124 result = self._job_statistics().get("totalBytesProcessed")
1125 if result is not None:
1126 result = int(result)
1127 return result
1128
1129 @property
1130 def total_bytes_billed(self):
1131 """Return total bytes billed from job statistics, if present.
1132
1133 See:
1134 https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobStatistics2.FIELDS.total_bytes_billed
1135
1136 Returns:
1137 Optional[int]:
1138 Total bytes processed by the job, or None if job is not
1139 yet complete.
1140 """
1141 result = self._job_statistics().get("totalBytesBilled")
1142 if result is not None:
1143 result = int(result)
1144 return result
1145
1146 @property
1147 def billing_tier(self):
1148 """Return billing tier from job statistics, if present.
1149
1150 See:
1151 https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobStatistics2.FIELDS.billing_tier
1152
1153 Returns:
1154 Optional[int]:
1155 Billing tier used by the job, or None if job is not
1156 yet complete.
1157 """
1158 return self._job_statistics().get("billingTier")
1159
1160 @property
1161 def cache_hit(self):
1162 """Return whether or not query results were served from cache.
1163
1164 See:
1165 https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobStatistics2.FIELDS.cache_hit
1166
1167 Returns:
1168 Optional[bool]:
1169 whether the query results were returned from cache, or None
1170 if job is not yet complete.
1171 """
1172 return self._job_statistics().get("cacheHit")
1173
1174 @property
1175 def ddl_operation_performed(self):
1176 """Optional[str]: Return the DDL operation performed.
1177
1178 See:
1179 https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobStatistics2.FIELDS.ddl_operation_performed
1180
1181 """
1182 return self._job_statistics().get("ddlOperationPerformed")
1183
1184 @property
1185 def ddl_target_routine(self):
1186 """Optional[google.cloud.bigquery.routine.RoutineReference]: Return the DDL target routine, present
1187 for CREATE/DROP FUNCTION/PROCEDURE queries.
1188
1189 See:
1190 https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobStatistics2.FIELDS.ddl_target_routine
1191 """
1192 prop = self._job_statistics().get("ddlTargetRoutine")
1193 if prop is not None:
1194 prop = RoutineReference.from_api_repr(prop)
1195 return prop
1196
1197 @property
1198 def ddl_target_table(self):
1199 """Optional[google.cloud.bigquery.table.TableReference]: Return the DDL target table, present
1200 for CREATE/DROP TABLE/VIEW queries.
1201
1202 See:
1203 https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobStatistics2.FIELDS.ddl_target_table
1204 """
1205 prop = self._job_statistics().get("ddlTargetTable")
1206 if prop is not None:
1207 prop = TableReference.from_api_repr(prop)
1208 return prop
1209
1210 @property
1211 def num_dml_affected_rows(self) -> Optional[int]:
1212 """Return the number of DML rows affected by the job.
1213
1214 See:
1215 https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobStatistics2.FIELDS.num_dml_affected_rows
1216
1217 Returns:
1218 Optional[int]:
1219 number of DML rows affected by the job, or None if job is not
1220 yet complete.
1221 """
1222 result = self._job_statistics().get("numDmlAffectedRows")
1223 if result is not None:
1224 result = int(result)
1225 return result
1226
1227 @property
1228 def slot_millis(self):
1229 """Union[int, None]: Slot-milliseconds used by this query job."""
1230 return _helpers._int_or_none(self._job_statistics().get("totalSlotMs"))
1231
1232 @property
1233 def statement_type(self):
1234 """Return statement type from job statistics, if present.
1235
1236 See:
1237 https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobStatistics2.FIELDS.statement_type
1238
1239 Returns:
1240 Optional[str]:
1241 type of statement used by the job, or None if job is not
1242 yet complete.
1243 """
1244 return self._job_statistics().get("statementType")
1245
1246 @property
1247 def referenced_tables(self):
1248 """Return referenced tables from job statistics, if present.
1249
1250 See:
1251 https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobStatistics2.FIELDS.referenced_tables
1252
1253 Returns:
1254 List[Dict]:
1255 mappings describing the query plan, or an empty list
1256 if the query has not yet completed.
1257 """
1258 tables = []
1259 datasets_by_project_name = {}
1260
1261 for table in self._job_statistics().get("referencedTables", ()):
1262 t_project = table["projectId"]
1263
1264 ds_id = table["datasetId"]
1265 t_dataset = datasets_by_project_name.get((t_project, ds_id))
1266 if t_dataset is None:
1267 t_dataset = DatasetReference(t_project, ds_id)
1268 datasets_by_project_name[(t_project, ds_id)] = t_dataset
1269
1270 t_name = table["tableId"]
1271 tables.append(t_dataset.table(t_name))
1272
1273 return tables
1274
1275 @property
1276 def undeclared_query_parameters(self):
1277 """Return undeclared query parameters from job statistics, if present.
1278
1279 See:
1280 https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobStatistics2.FIELDS.undeclared_query_parameters
1281
1282 Returns:
1283 List[Union[ \
1284 google.cloud.bigquery.query.ArrayQueryParameter, \
1285 google.cloud.bigquery.query.ScalarQueryParameter, \
1286 google.cloud.bigquery.query.StructQueryParameter \
1287 ]]:
1288 Undeclared parameters, or an empty list if the query has
1289 not yet completed.
1290 """
1291 parameters = []
1292 undeclared = self._job_statistics().get("undeclaredQueryParameters", ())
1293
1294 for parameter in undeclared:
1295 p_type = parameter["parameterType"]
1296
1297 if "arrayType" in p_type:
1298 klass = ArrayQueryParameter
1299 elif "structTypes" in p_type:
1300 klass = StructQueryParameter
1301 else:
1302 klass = ScalarQueryParameter
1303
1304 parameters.append(klass.from_api_repr(parameter))
1305
1306 return parameters
1307
1308 @property
1309 def estimated_bytes_processed(self):
1310 """Return the estimated number of bytes processed by the query.
1311
1312 See:
1313 https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobStatistics2.FIELDS.estimated_bytes_processed
1314
1315 Returns:
1316 Optional[int]:
1317 number of DML rows affected by the job, or None if job is not
1318 yet complete.
1319 """
1320 result = self._job_statistics().get("estimatedBytesProcessed")
1321 if result is not None:
1322 result = int(result)
1323 return result
1324
1325 @property
1326 def dml_stats(self) -> Optional[DmlStats]:
1327 stats = self._job_statistics().get("dmlStats")
1328 if stats is None:
1329 return None
1330 else:
1331 return DmlStats.from_api_repr(stats)
1332
1333 @property
1334 def bi_engine_stats(self) -> Optional[BiEngineStats]:
1335 stats = self._job_statistics().get("biEngineStatistics")
1336
1337 if stats is None:
1338 return None
1339 else:
1340 return BiEngineStats.from_api_repr(stats)
1341
1342 def _blocking_poll(self, timeout=None, **kwargs):
1343 self._done_timeout = timeout
1344 self._transport_timeout = timeout
1345 super(QueryJob, self)._blocking_poll(timeout=timeout, **kwargs)
1346
1347 @staticmethod
1348 def _format_for_exception(message: str, query: str):
1349 """Format a query for the output in exception message.
1350
1351 Args:
1352 message (str): The original exception message.
1353 query (str): The SQL query to format.
1354
1355 Returns:
1356 str: A formatted query text.
1357 """
1358 template = "{message}\n\n{header}\n\n{ruler}\n{body}\n{ruler}"
1359
1360 lines = query.splitlines() if query is not None else [""]
1361 max_line_len = max(len(line) for line in lines)
1362
1363 header = "-----Query Job SQL Follows-----"
1364 header = "{:^{total_width}}".format(header, total_width=max_line_len + 5)
1365
1366 # Print out a "ruler" above and below the SQL so we can judge columns.
1367 # Left pad for the line numbers (4 digits plus ":").
1368 ruler = " |" + " . |" * (max_line_len // 10)
1369
1370 # Put line numbers next to the SQL.
1371 body = "\n".join(
1372 "{:4}:{}".format(n, line) for n, line in enumerate(lines, start=1)
1373 )
1374
1375 return template.format(message=message, header=header, ruler=ruler, body=body)
1376
1377 def _begin(self, client=None, retry=DEFAULT_RETRY, timeout=None):
1378 """API call: begin the job via a POST request
1379
1380 See
1381 https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/insert
1382
1383 Args:
1384 client (Optional[google.cloud.bigquery.client.Client]):
1385 The client to use. If not passed, falls back to the ``client``
1386 associated with the job object or``NoneType``.
1387 retry (Optional[google.api_core.retry.Retry]):
1388 How to retry the RPC.
1389 timeout (Optional[float]):
1390 The number of seconds to wait for the underlying HTTP transport
1391 before using ``retry``.
1392
1393 Raises:
1394 ValueError: If the job has already begun.
1395 """
1396
1397 try:
1398 super(QueryJob, self)._begin(client=client, retry=retry, timeout=timeout)
1399 except exceptions.GoogleAPICallError as exc:
1400 exc.message = _EXCEPTION_FOOTER_TEMPLATE.format(
1401 message=exc.message, location=self.location, job_id=self.job_id
1402 )
1403 exc.debug_message = self._format_for_exception(exc.message, self.query)
1404 exc.query_job = self
1405 raise
1406
1407 def _reload_query_results(
1408 self,
1409 retry: "retries.Retry" = DEFAULT_RETRY,
1410 timeout: Optional[float] = None,
1411 page_size: int = 0,
1412 start_index: Optional[int] = None,
1413 ):
1414 """Refresh the cached query results unless already cached and complete.
1415
1416 Args:
1417 retry (Optional[google.api_core.retry.Retry]):
1418 How to retry the call that retrieves query results.
1419 timeout (Optional[float]):
1420 The number of seconds to wait for the underlying HTTP transport
1421 before using ``retry``.
1422 page_size (int):
1423 Maximum number of rows in a single response. See maxResults in
1424 the jobs.getQueryResults REST API.
1425 start_index (Optional[int]):
1426 Zero-based index of the starting row. See startIndex in the
1427 jobs.getQueryResults REST API.
1428 """
1429 # Optimization: avoid a call to jobs.getQueryResults if it's already
1430 # been fetched, e.g. from jobs.query first page of results.
1431 if self._query_results and self._query_results.complete:
1432 return
1433
1434 # Since the API to getQueryResults can hang up to the timeout value
1435 # (default of 10 seconds), set the timeout parameter to ensure that
1436 # the timeout from the futures API is respected. See:
1437 # https://github.com/GoogleCloudPlatform/google-cloud-python/issues/4135
1438 timeout_ms = None
1439
1440 # Python_API_core, as part of a major rewrite of the deadline, timeout,
1441 # retry process sets the timeout value as a Python object().
1442 # Our system does not natively handle that and instead expects
1443 # either None or a numeric value. If passed a Python object, convert to
1444 # None.
1445 if type(self._done_timeout) is object: # pragma: NO COVER
1446 self._done_timeout = None
1447
1448 if self._done_timeout is not None: # pragma: NO COVER
1449 # Subtract a buffer for context switching, network latency, etc.
1450 api_timeout = self._done_timeout - _TIMEOUT_BUFFER_SECS
1451 api_timeout = max(min(api_timeout, 10), 0)
1452 self._done_timeout -= api_timeout
1453 self._done_timeout = max(0, self._done_timeout)
1454 timeout_ms = int(api_timeout * 1000)
1455
1456 # If an explicit timeout is not given, fall back to the transport timeout
1457 # stored in _blocking_poll() in the process of polling for job completion.
1458 if timeout is not None:
1459 transport_timeout = timeout
1460 else:
1461 transport_timeout = self._transport_timeout
1462
1463 # Handle PollingJob._DEFAULT_VALUE.
1464 if not isinstance(transport_timeout, (float, int)):
1465 transport_timeout = None
1466
1467 self._query_results = self._client._get_query_results(
1468 self.job_id,
1469 retry,
1470 project=self.project,
1471 timeout_ms=timeout_ms,
1472 location=self.location,
1473 timeout=transport_timeout,
1474 page_size=page_size,
1475 start_index=start_index,
1476 )
1477
    def result(  # type: ignore  # (incompatible with supertype)
        self,
        page_size: Optional[int] = None,
        max_results: Optional[int] = None,
        retry: Optional[retries.Retry] = DEFAULT_RETRY,
        timeout: Optional[Union[float, object]] = POLLING_DEFAULT_VALUE,
        start_index: Optional[int] = None,
        job_retry: Optional[retries.Retry] = DEFAULT_JOB_RETRY,
    ) -> Union["RowIterator", _EmptyRowIterator]:
        """Start the job and wait for it to complete and get the result.

        For a dry-run job no waiting happens at all: an empty row iterator is
        returned immediately.

        Args:
            page_size (Optional[int]):
                The maximum number of rows in each page of results from this
                request. Non-positive values are ignored. When omitted and
                ``max_results`` is given without ``start_index``, it defaults
                to ``max_results``.
            max_results (Optional[int]):
                The maximum total number of rows from this request.
            retry (Optional[google.api_core.retry.Retry]):
                How to retry the call that retrieves rows. This only
                applies to making RPC calls. It isn't used to retry
                failed jobs. This has a reasonable default that
                should only be overridden with care. If the job state
                is ``DONE``, retrying is aborted early even if the
                results are not available, as this will not change
                anymore.
            timeout (Optional[Union[float, \
                google.api_core.future.polling.PollingFuture._DEFAULT_VALUE, \
            ]]):
                The number of seconds to wait for the underlying HTTP transport
                before using ``retry``. If ``None``, wait indefinitely
                unless an error is returned. If unset, only the
                underlying API calls have their default timeouts, but we still
                wait indefinitely for the job to finish.
            start_index (Optional[int]):
                The zero-based index of the starting row to read.
            job_retry (Optional[google.api_core.retry.Retry]):
                How to retry failed jobs.  The default retries
                rate-limit-exceeded errors. Passing ``None`` disables
                job retry.

                Not all jobs can be retried.  If ``job_id`` was
                provided to the query that created this job, then the
                job returned by the query will not be retryable, and
                an exception will be raised if non-``None``
                non-default ``job_retry`` is also provided.

        Returns:
            google.cloud.bigquery.table.RowIterator:
                Iterator of row data
                :class:`~google.cloud.bigquery.table.Row`-s. During each
                page, the iterator will have the ``total_rows`` attribute
                set, which counts the total number of rows **in the result
                set** (this is distinct from the total number of rows in the
                current page: ``iterator.page.num_items``).

                If the query is a special query that produces no results, e.g.
                a DDL query, an ``_EmptyRowIterator`` instance is returned.

        Raises:
            google.api_core.exceptions.GoogleAPICallError:
                If the job failed and retries aren't successful. The
                exception's message is extended with the job location and ID,
                and ``debug_message`` / ``query_job`` attributes are attached.
            concurrent.futures.TimeoutError:
                If the job did not complete in the given timeout, or if the
                underlying HTTP request timed out.
            TypeError:
                If Non-``None`` and non-default ``job_retry`` is
                provided and the job is not retryable.
        """
        # Note: Since waiting for a query job to finish is more complex than
        # refreshing the job state in a loop, we avoid calling the superclass
        # in this method.

        if self.dry_run:
            return _EmptyRowIterator(
                project=self.project,
                location=self.location,
                # Intentionally omit job_id and query_id since this doesn't
                # actually correspond to a finished query job.
            )

        # Setting max_results should be equivalent to setting page_size with
        # regards to allowing the user to tune how many results to download
        # while we wait for the query to finish. See internal issue:
        # 344008814. But if start_index is set, user is trying to access a
        # specific page, so we don't need to set page_size. See issue #1950.
        if page_size is None and max_results is not None and start_index is None:
            page_size = max_results

        # When timeout has default sentinel value ``object()``, do not pass
        # anything to invoke default timeouts in subsequent calls.
        done_kwargs: Dict[str, Union[_helpers.TimeoutType, object]] = {}
        reload_query_results_kwargs: Dict[str, Union[_helpers.TimeoutType, object]] = {}
        list_rows_kwargs: Dict[str, Union[_helpers.TimeoutType, object]] = {}
        if type(timeout) is not object:
            done_kwargs["timeout"] = timeout
            list_rows_kwargs["timeout"] = timeout
            reload_query_results_kwargs["timeout"] = timeout

        if page_size is not None:
            reload_query_results_kwargs["page_size"] = page_size

        if start_index is not None:
            reload_query_results_kwargs["start_index"] = start_index

        try:
            # ``_retry_do_query`` is only present on jobs that can be
            # re-created; its absence means the job is not retryable.
            retry_do_query = getattr(self, "_retry_do_query", None)
            if retry_do_query is not None:
                if job_retry is DEFAULT_JOB_RETRY:
                    job_retry = self._job_retry  # type: ignore
            else:
                if job_retry is not None and job_retry is not DEFAULT_JOB_RETRY:
                    raise TypeError(
                        "`job_retry` was provided, but this job is"
                        " not retryable, because a custom `job_id` was"
                        " provided to the query that created this job."
                    )

            # Set by is_job_done() right before it raises the job's failure,
            # so that the next invocation (made by job_retry) starts a new job.
            restart_query_job = False

            def is_job_done():
                nonlocal restart_query_job

                if restart_query_job:
                    restart_query_job = False

                    # The original job has failed. Create a new one.
                    #
                    # Note that we won't get here if retry_do_query is
                    # None, because we won't use a retry.
                    job = retry_do_query()

                    # Become the new job:
                    self.__dict__.clear()
                    self.__dict__.update(job.__dict__)

                    # It's possible the job fails again and we'll have to
                    # retry that too.
                    self._retry_do_query = retry_do_query
                    self._job_retry = job_retry

                # If the job hasn't been created, create it now. Related:
                # https://github.com/googleapis/python-bigquery/issues/1940
                if self.state is None:
                    self._begin(retry=retry, **done_kwargs)

                # Refresh the job status with jobs.get because some of the
                # exceptions thrown by jobs.getQueryResults like timeout and
                # rateLimitExceeded errors are ambiguous. We want to know if
                # the query job failed and not just the call to
                # jobs.getQueryResults.
                if self.done(retry=retry, **done_kwargs):
                    # If it's already failed, we might as well stop.
                    job_failed_exception = self.exception()
                    if job_failed_exception is not None:
                        # Only try to restart the query job if the job failed for
                        # a retriable reason. For example, don't restart the query
                        # if the call to reload the job metadata within self.done()
                        # timed out.
                        #
                        # The `restart_query_job` must only be called after a
                        # successful call to the `jobs.get` REST API and we
                        # determine that the job has failed.
                        #
                        # The `jobs.get` REST API
                        # (https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/get)
                        #  is called via `self.done()` which calls
                        # `self.reload()`.
                        #
                        # To determine if the job failed, the `self.exception()`
                        # is set from `self.reload()` via
                        # `self._set_properties()`, which translates the
                        # `Job.status.errorResult` field
                        # (https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobStatus.FIELDS.error_result)
                        # into an exception that can be processed by the
                        # `job_retry` predicate.
                        restart_query_job = True
                        raise job_failed_exception
                    else:
                        # Make sure that the _query_results are cached so we
                        # can return a complete RowIterator.
                        #
                        # Note: As an optimization, _reload_query_results
                        # doesn't make any API calls if the query results are
                        # already cached and have jobComplete=True in the
                        # response from the REST API. This ensures we aren't
                        # making any extra API calls if the previous loop
                        # iteration fetched the finished job.
                        self._reload_query_results(
                            retry=retry, **reload_query_results_kwargs
                        )
                        return True

                # Call jobs.getQueryResults with max results set to 0 just to
                # wait for the query to finish. Unlike most methods,
                # jobs.getQueryResults hangs as long as it can to ensure we
                # know when the query has finished as soon as possible.
                self._reload_query_results(retry=retry, **reload_query_results_kwargs)

                # Even if the query is finished now according to
                # jobs.getQueryResults, we'll want to reload the job status if
                # it's not already DONE.
                return False

            # Wrap the polling step in the job-level retry only when the job
            # can actually be re-created and job retries are enabled.
            if retry_do_query is not None and job_retry is not None:
                is_job_done = job_retry(is_job_done)

            # timeout can be a number of seconds, `None`, or a
            # `google.api_core.future.polling.PollingFuture._DEFAULT_VALUE`
            # sentinel object indicating a default timeout if we choose to add
            # one some day. This value can come from our PollingFuture
            # superclass and was introduced in
            # https://github.com/googleapis/python-api-core/pull/462.
            if isinstance(timeout, (float, int)):
                remaining_timeout = timeout
            else:
                # Note: we may need to handle _DEFAULT_VALUE as a separate
                # case someday, but even then the best we can do for queries
                # is 72+ hours for hyperparameter tuning jobs:
                # https://cloud.google.com/bigquery/quotas#query_jobs
                #
                # The timeout for a multi-statement query is 24+ hours. See:
                # https://cloud.google.com/bigquery/quotas#multi_statement_query_limits
                remaining_timeout = None

            if remaining_timeout is None:
                # Since is_job_done() calls jobs.getQueryResults, which is a
                # long-running API, don't delay the next request at all.
                while not is_job_done():
                    pass
            else:
                # Use a monotonic clock since we don't actually care about
                # daylight savings or similar, just the elapsed time.
                previous_time = time.monotonic()

                while not is_job_done():
                    current_time = time.monotonic()
                    elapsed_time = current_time - previous_time
                    remaining_timeout = remaining_timeout - elapsed_time
                    previous_time = current_time

                    if remaining_timeout < 0:
                        raise concurrent.futures.TimeoutError()

        except exceptions.GoogleAPICallError as exc:
            # Annotate the error with the job location/ID and the formatted
            # SQL before re-raising it unchanged in type.
            exc.message = _EXCEPTION_FOOTER_TEMPLATE.format(
                message=exc.message, location=self.location, job_id=self.job_id
            )
            exc.debug_message = self._format_for_exception(exc.message, self.query)  # type: ignore
            exc.query_job = self  # type: ignore
            raise
        except requests.exceptions.Timeout as exc:
            # Surface transport-level timeouts as the futures timeout type.
            raise concurrent.futures.TimeoutError from exc

        # If the query job is complete but there are no query results, this was
        # a special job, such as a DDL query. Return an empty result set to
        # indicate success and avoid calling tabledata.list on a table which
        # can't be read (such as a view table).
        if self._query_results.total_rows is None:
            return _EmptyRowIterator(
                location=self.location,
                project=self.project,
                job_id=self.job_id,
                query_id=self.query_id,
                num_dml_affected_rows=self._query_results.num_dml_affected_rows,
            )

        # We know that there's at least 1 row, so only treat the response from
        # jobs.getQueryResults / jobs.query as the first page of the
        # RowIterator response if there are any rows in it. This prevents us
        # from stopping the iteration early in the cases where we set
        # maxResults=0. In that case, we're missing rows and there's no next
        # page token.
        first_page_response = self._query_results._properties
        if "rows" not in first_page_response:
            first_page_response = None

        rows = self._client._list_rows_from_query_results(
            self.job_id,
            self.location,
            self.project,
            self._query_results.schema,
            total_rows=self._query_results.total_rows,
            destination=self.destination,
            page_size=page_size,
            max_results=max_results,
            start_index=start_index,
            retry=retry,
            query_id=self.query_id,
            first_page_response=first_page_response,
            num_dml_affected_rows=self._query_results.num_dml_affected_rows,
            query=self.query,
            total_bytes_processed=self.total_bytes_processed,
            **list_rows_kwargs,
        )
        # Flag on the iterator whether the query text contains an ORDER BY,
        # as detected by _contains_order_by().
        rows._preserve_order = _contains_order_by(self.query)
        return rows
1773
1774 # If changing the signature of this method, make sure to apply the same
1775 # changes to table.RowIterator.to_arrow(), except for the max_results parameter
1776 # that should only exist here in the QueryJob method.
1777 def to_arrow(
1778 self,
1779 progress_bar_type: Optional[str] = None,
1780 bqstorage_client: Optional["bigquery_storage.BigQueryReadClient"] = None,
1781 create_bqstorage_client: bool = True,
1782 max_results: Optional[int] = None,
1783 ) -> "pyarrow.Table":
1784 """[Beta] Create a class:`pyarrow.Table` by loading all pages of a
1785 table or query.
1786
1787 Args:
1788 progress_bar_type (Optional[str]):
1789 If set, use the `tqdm <https://tqdm.github.io/>`_ library to
1790 display a progress bar while the data downloads. Install the
1791 ``tqdm`` package to use this feature.
1792
1793 Possible values of ``progress_bar_type`` include:
1794
1795 ``None``
1796 No progress bar.
1797 ``'tqdm'``
1798 Use the :func:`tqdm.tqdm` function to print a progress bar
1799 to :data:`sys.stdout`.
1800 ``'tqdm_notebook'``
1801 Use the :func:`tqdm.notebook.tqdm` function to display a
1802 progress bar as a Jupyter notebook widget.
1803 ``'tqdm_gui'``
1804 Use the :func:`tqdm.tqdm_gui` function to display a
1805 progress bar as a graphical dialog box.
1806 bqstorage_client (Optional[google.cloud.bigquery_storage_v1.BigQueryReadClient]):
1807 A BigQuery Storage API client. If supplied, use the faster
1808 BigQuery Storage API to fetch rows from BigQuery. This API
1809 is a billable API.
1810
1811 This method requires ``google-cloud-bigquery-storage`` library.
1812
1813 Reading from a specific partition or snapshot is not
1814 currently supported by this method.
1815 create_bqstorage_client (Optional[bool]):
1816 If ``True`` (default), create a BigQuery Storage API client
1817 using the default API settings. The BigQuery Storage API
1818 is a faster way to fetch rows from BigQuery. See the
1819 ``bqstorage_client`` parameter for more information.
1820
1821 This argument does nothing if ``bqstorage_client`` is supplied.
1822
1823 .. versionadded:: 1.24.0
1824
1825 max_results (Optional[int]):
1826 Maximum number of rows to include in the result. No limit by default.
1827
1828 .. versionadded:: 2.21.0
1829
1830 Returns:
1831 pyarrow.Table
1832 A :class:`pyarrow.Table` populated with row data and column
1833 headers from the query results. The column headers are derived
1834 from the destination table's schema.
1835
1836 Raises:
1837 ValueError:
1838 If the :mod:`pyarrow` library cannot be imported.
1839
1840 .. versionadded:: 1.17.0
1841 """
1842 query_result = wait_for_query(self, progress_bar_type, max_results=max_results)
1843 return query_result.to_arrow(
1844 progress_bar_type=progress_bar_type,
1845 bqstorage_client=bqstorage_client,
1846 create_bqstorage_client=create_bqstorage_client,
1847 )
1848
1849 # If changing the signature of this method, make sure to apply the same
1850 # changes to table.RowIterator.to_dataframe(), except for the max_results parameter
1851 # that should only exist here in the QueryJob method.
1852 def to_dataframe(
1853 self,
1854 bqstorage_client: Optional["bigquery_storage.BigQueryReadClient"] = None,
1855 dtypes: Optional[Dict[str, Any]] = None,
1856 progress_bar_type: Optional[str] = None,
1857 create_bqstorage_client: bool = True,
1858 max_results: Optional[int] = None,
1859 geography_as_object: bool = False,
1860 bool_dtype: Union[Any, None] = DefaultPandasDTypes.BOOL_DTYPE,
1861 int_dtype: Union[Any, None] = DefaultPandasDTypes.INT_DTYPE,
1862 float_dtype: Union[Any, None] = None,
1863 string_dtype: Union[Any, None] = None,
1864 date_dtype: Union[Any, None] = DefaultPandasDTypes.DATE_DTYPE,
1865 datetime_dtype: Union[Any, None] = None,
1866 time_dtype: Union[Any, None] = DefaultPandasDTypes.TIME_DTYPE,
1867 timestamp_dtype: Union[Any, None] = None,
1868 range_date_dtype: Union[Any, None] = DefaultPandasDTypes.RANGE_DATE_DTYPE,
1869 range_datetime_dtype: Union[
1870 Any, None
1871 ] = DefaultPandasDTypes.RANGE_DATETIME_DTYPE,
1872 range_timestamp_dtype: Union[
1873 Any, None
1874 ] = DefaultPandasDTypes.RANGE_TIMESTAMP_DTYPE,
1875 ) -> "pandas.DataFrame":
1876 """Return a pandas DataFrame from a QueryJob
1877
1878 Args:
1879 bqstorage_client (Optional[google.cloud.bigquery_storage_v1.BigQueryReadClient]):
1880 A BigQuery Storage API client. If supplied, use the faster
1881 BigQuery Storage API to fetch rows from BigQuery. This
1882 API is a billable API.
1883
1884 This method requires the ``fastavro`` and
1885 ``google-cloud-bigquery-storage`` libraries.
1886
1887 Reading from a specific partition or snapshot is not
1888 currently supported by this method.
1889
1890 dtypes (Optional[Map[str, Union[str, pandas.Series.dtype]]]):
1891 A dictionary of column names pandas ``dtype``s. The provided
1892 ``dtype`` is used when constructing the series for the column
1893 specified. Otherwise, the default pandas behavior is used.
1894
1895 progress_bar_type (Optional[str]):
1896 If set, use the `tqdm <https://tqdm.github.io/>`_ library to
1897 display a progress bar while the data downloads. Install the
1898 ``tqdm`` package to use this feature.
1899
1900 See
1901 :func:`~google.cloud.bigquery.table.RowIterator.to_dataframe`
1902 for details.
1903
1904 .. versionadded:: 1.11.0
1905 create_bqstorage_client (Optional[bool]):
1906 If ``True`` (default), create a BigQuery Storage API client
1907 using the default API settings. The BigQuery Storage API
1908 is a faster way to fetch rows from BigQuery. See the
1909 ``bqstorage_client`` parameter for more information.
1910
1911 This argument does nothing if ``bqstorage_client`` is supplied.
1912
1913 .. versionadded:: 1.24.0
1914
1915 max_results (Optional[int]):
1916 Maximum number of rows to include in the result. No limit by default.
1917
1918 .. versionadded:: 2.21.0
1919
1920 geography_as_object (Optional[bool]):
1921 If ``True``, convert GEOGRAPHY data to :mod:`shapely`
1922 geometry objects. If ``False`` (default), don't cast
1923 geography data to :mod:`shapely` geometry objects.
1924
1925 .. versionadded:: 2.24.0
1926
1927 bool_dtype (Optional[pandas.Series.dtype, None]):
1928 If set, indicate a pandas ExtensionDtype (e.g. ``pandas.BooleanDtype()``)
1929 to convert BigQuery Boolean type, instead of relying on the default
1930 ``pandas.BooleanDtype()``. If you explicitly set the value to ``None``,
1931 then the data type will be ``numpy.dtype("bool")``. BigQuery Boolean
1932 type can be found at:
1933 https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#boolean_type
1934
1935 .. versionadded:: 3.8.0
1936
1937 int_dtype (Optional[pandas.Series.dtype, None]):
1938 If set, indicate a pandas ExtensionDtype (e.g. ``pandas.Int64Dtype()``)
1939 to convert BigQuery Integer types, instead of relying on the default
1940 ``pandas.Int64Dtype()``. If you explicitly set the value to ``None``,
1941 then the data type will be ``numpy.dtype("int64")``. A list of BigQuery
1942 Integer types can be found at:
1943 https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#integer_types
1944
1945 .. versionadded:: 3.8.0
1946
1947 float_dtype (Optional[pandas.Series.dtype, None]):
1948 If set, indicate a pandas ExtensionDtype (e.g. ``pandas.Float32Dtype()``)
1949 to convert BigQuery Float type, instead of relying on the default
1950 ``numpy.dtype("float64")``. If you explicitly set the value to ``None``,
1951 then the data type will be ``numpy.dtype("float64")``. BigQuery Float
1952 type can be found at:
1953 https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#floating_point_types
1954
1955 .. versionadded:: 3.8.0
1956
1957 string_dtype (Optional[pandas.Series.dtype, None]):
1958 If set, indicate a pandas ExtensionDtype (e.g. ``pandas.StringDtype()``) to
1959 convert BigQuery String type, instead of relying on the default
1960 ``numpy.dtype("object")``. If you explicitly set the value to ``None``,
1961 then the data type will be ``numpy.dtype("object")``. BigQuery String
1962 type can be found at:
1963 https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#string_type
1964
1965 .. versionadded:: 3.8.0
1966
1967 date_dtype (Optional[pandas.Series.dtype, None]):
1968 If set, indicate a pandas ExtensionDtype (e.g.
1969 ``pandas.ArrowDtype(pyarrow.date32())``) to convert BigQuery Date
1970 type, instead of relying on the default ``db_dtypes.DateDtype()``.
1971 If you explicitly set the value to ``None``, then the data type will be
1972 ``numpy.dtype("datetime64[ns]")`` or ``object`` if out of bound. BigQuery
1973 Date type can be found at:
1974 https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#date_type
1975
1976 .. versionadded:: 3.10.0
1977
1978 datetime_dtype (Optional[pandas.Series.dtype, None]):
1979 If set, indicate a pandas ExtensionDtype (e.g.
1980 ``pandas.ArrowDtype(pyarrow.timestamp("us"))``) to convert BigQuery Datetime
1981 type, instead of relying on the default ``numpy.dtype("datetime64[ns]``.
1982 If you explicitly set the value to ``None``, then the data type will be
1983 ``numpy.dtype("datetime64[ns]")`` or ``object`` if out of bound. BigQuery
1984 Datetime type can be found at:
1985 https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#datetime_type
1986
1987 .. versionadded:: 3.10.0
1988
1989 time_dtype (Optional[pandas.Series.dtype, None]):
1990 If set, indicate a pandas ExtensionDtype (e.g.
1991 ``pandas.ArrowDtype(pyarrow.time64("us"))``) to convert BigQuery Time
1992 type, instead of relying on the default ``db_dtypes.TimeDtype()``.
1993 If you explicitly set the value to ``None``, then the data type will be
1994 ``numpy.dtype("object")``. BigQuery Time type can be found at:
1995 https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#time_type
1996
1997 .. versionadded:: 3.10.0
1998
1999 timestamp_dtype (Optional[pandas.Series.dtype, None]):
2000 If set, indicate a pandas ExtensionDtype (e.g.
2001 ``pandas.ArrowDtype(pyarrow.timestamp("us", tz="UTC"))``) to convert BigQuery Timestamp
2002 type, instead of relying on the default ``numpy.dtype("datetime64[ns, UTC]")``.
2003 If you explicitly set the value to ``None``, then the data type will be
2004 ``numpy.dtype("datetime64[ns, UTC]")`` or ``object`` if out of bound. BigQuery
2005 Datetime type can be found at:
2006 https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#timestamp_type
2007
2008 .. versionadded:: 3.10.0
2009
2010 range_date_dtype (Optional[pandas.Series.dtype, None]):
2011 If set, indicate a pandas ExtensionDtype, such as:
2012
2013 .. code-block:: python
2014
2015 pandas.ArrowDtype(pyarrow.struct(
2016 [("start", pyarrow.date32()), ("end", pyarrow.date32())]
2017 ))
2018
2019 to convert BigQuery RANGE<DATE> type, instead of relying on
2020 the default ``object``. If you explicitly set the value to
2021 ``None``, the data type will be ``object``. BigQuery Range type
2022 can be found at:
2023 https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#range_type
2024
2025 .. versionadded:: 3.21.0
2026
2027 range_datetime_dtype (Optional[pandas.Series.dtype, None]):
2028 If set, indicate a pandas ExtensionDtype, such as:
2029
2030 .. code-block:: python
2031
2032 pandas.ArrowDtype(pyarrow.struct(
2033 [
2034 ("start", pyarrow.timestamp("us")),
2035 ("end", pyarrow.timestamp("us")),
2036 ]
2037 ))
2038
2039 to convert BigQuery RANGE<DATETIME> type, instead of relying on
2040 the default ``object``. If you explicitly set the value to
2041 ``None``, the data type will be ``object``. BigQuery Range type
2042 can be found at:
2043 https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#range_type
2044
2045 .. versionadded:: 3.21.0
2046
2047 range_timestamp_dtype (Optional[pandas.Series.dtype, None]):
2048 If set, indicate a pandas ExtensionDtype, such as:
2049
2050 .. code-block:: python
2051
2052 pandas.ArrowDtype(pyarrow.struct(
2053 [
2054 ("start", pyarrow.timestamp("us", tz="UTC")),
2055 ("end", pyarrow.timestamp("us", tz="UTC")),
2056 ]
2057 ))
2058
2059 to convert BigQuery RANGE<TIMESTAMP> type, instead of relying
2060 on the default ``object``. If you explicitly set the value to
2061 ``None``, the data type will be ``object``. BigQuery Range type
2062 can be found at:
2063 https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#range_type
2064
2065 .. versionadded:: 3.21.0
2066
2067 Returns:
2068 pandas.DataFrame:
2069 A :class:`~pandas.DataFrame` populated with row data
2070 and column headers from the query results. The column
2071 headers are derived from the destination table's
2072 schema.
2073
2074 Raises:
2075 ValueError:
2076 If the :mod:`pandas` library cannot be imported, or
2077 the :mod:`google.cloud.bigquery_storage_v1` module is
2078 required but cannot be imported. Also if
2079 `geography_as_object` is `True`, but the
2080 :mod:`shapely` library cannot be imported.
2081 """
2082 query_result = wait_for_query(self, progress_bar_type, max_results=max_results)
2083 return query_result.to_dataframe(
2084 bqstorage_client=bqstorage_client,
2085 dtypes=dtypes,
2086 progress_bar_type=progress_bar_type,
2087 create_bqstorage_client=create_bqstorage_client,
2088 geography_as_object=geography_as_object,
2089 bool_dtype=bool_dtype,
2090 int_dtype=int_dtype,
2091 float_dtype=float_dtype,
2092 string_dtype=string_dtype,
2093 date_dtype=date_dtype,
2094 datetime_dtype=datetime_dtype,
2095 time_dtype=time_dtype,
2096 timestamp_dtype=timestamp_dtype,
2097 range_date_dtype=range_date_dtype,
2098 range_datetime_dtype=range_datetime_dtype,
2099 range_timestamp_dtype=range_timestamp_dtype,
2100 )
2101
    # If changing the signature of this method, make sure to apply the same
    # changes to table.RowIterator.to_geodataframe(), except for the max_results
    # parameter that should only exist here in the QueryJob method.
2105 def to_geodataframe(
2106 self,
2107 bqstorage_client: Optional["bigquery_storage.BigQueryReadClient"] = None,
2108 dtypes: Optional[Dict[str, Any]] = None,
2109 progress_bar_type: Optional[str] = None,
2110 create_bqstorage_client: bool = True,
2111 max_results: Optional[int] = None,
2112 geography_column: Optional[str] = None,
2113 bool_dtype: Union[Any, None] = DefaultPandasDTypes.BOOL_DTYPE,
2114 int_dtype: Union[Any, None] = DefaultPandasDTypes.INT_DTYPE,
2115 float_dtype: Union[Any, None] = None,
2116 string_dtype: Union[Any, None] = None,
2117 ) -> "geopandas.GeoDataFrame":
2118 """Return a GeoPandas GeoDataFrame from a QueryJob
2119
2120 Args:
2121 bqstorage_client (Optional[google.cloud.bigquery_storage_v1.BigQueryReadClient]):
2122 A BigQuery Storage API client. If supplied, use the faster
2123 BigQuery Storage API to fetch rows from BigQuery. This
2124 API is a billable API.
2125
2126 This method requires the ``fastavro`` and
2127 ``google-cloud-bigquery-storage`` libraries.
2128
2129 Reading from a specific partition or snapshot is not
2130 currently supported by this method.
2131
2132 dtypes (Optional[Map[str, Union[str, pandas.Series.dtype]]]):
2133 A dictionary of column names pandas ``dtype``s. The provided
2134 ``dtype`` is used when constructing the series for the column
2135 specified. Otherwise, the default pandas behavior is used.
2136
2137 progress_bar_type (Optional[str]):
2138 If set, use the `tqdm <https://tqdm.github.io/>`_ library to
2139 display a progress bar while the data downloads. Install the
2140 ``tqdm`` package to use this feature.
2141
2142 See
2143 :func:`~google.cloud.bigquery.table.RowIterator.to_dataframe`
2144 for details.
2145
2146 .. versionadded:: 1.11.0
2147 create_bqstorage_client (Optional[bool]):
2148 If ``True`` (default), create a BigQuery Storage API client
2149 using the default API settings. The BigQuery Storage API
2150 is a faster way to fetch rows from BigQuery. See the
2151 ``bqstorage_client`` parameter for more information.
2152
2153 This argument does nothing if ``bqstorage_client`` is supplied.
2154
2155 .. versionadded:: 1.24.0
2156
2157 max_results (Optional[int]):
2158 Maximum number of rows to include in the result. No limit by default.
2159
2160 .. versionadded:: 2.21.0
2161
2162 geography_column (Optional[str]):
2163 If there are more than one GEOGRAPHY column,
2164 identifies which one to use to construct a GeoPandas
2165 GeoDataFrame. This option can be ommitted if there's
2166 only one GEOGRAPHY column.
2167 bool_dtype (Optional[pandas.Series.dtype, None]):
2168 If set, indicate a pandas ExtensionDtype (e.g. ``pandas.BooleanDtype()``)
2169 to convert BigQuery Boolean type, instead of relying on the default
2170 ``pandas.BooleanDtype()``. If you explicitly set the value to ``None``,
2171 then the data type will be ``numpy.dtype("bool")``. BigQuery Boolean
2172 type can be found at:
2173 https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#boolean_type
2174 int_dtype (Optional[pandas.Series.dtype, None]):
2175 If set, indicate a pandas ExtensionDtype (e.g. ``pandas.Int64Dtype()``)
2176 to convert BigQuery Integer types, instead of relying on the default
2177 ``pandas.Int64Dtype()``. If you explicitly set the value to ``None``,
2178 then the data type will be ``numpy.dtype("int64")``. A list of BigQuery
2179 Integer types can be found at:
2180 https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#integer_types
2181 float_dtype (Optional[pandas.Series.dtype, None]):
2182 If set, indicate a pandas ExtensionDtype (e.g. ``pandas.Float32Dtype()``)
2183 to convert BigQuery Float type, instead of relying on the default
2184 ``numpy.dtype("float64")``. If you explicitly set the value to ``None``,
2185 then the data type will be ``numpy.dtype("float64")``. BigQuery Float
2186 type can be found at:
2187 https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#floating_point_types
2188 string_dtype (Optional[pandas.Series.dtype, None]):
2189 If set, indicate a pandas ExtensionDtype (e.g. ``pandas.StringDtype()``) to
2190 convert BigQuery String type, instead of relying on the default
2191 ``numpy.dtype("object")``. If you explicitly set the value to ``None``,
2192 then the data type will be ``numpy.dtype("object")``. BigQuery String
2193 type can be found at:
2194 https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#string_type
2195
2196 Returns:
2197 geopandas.GeoDataFrame:
2198 A :class:`geopandas.GeoDataFrame` populated with row
2199 data and column headers from the query results. The
2200 column headers are derived from the destination
2201 table's schema.
2202
2203 Raises:
2204 ValueError:
2205 If the :mod:`geopandas` library cannot be imported, or the
2206 :mod:`google.cloud.bigquery_storage_v1` module is
2207 required but cannot be imported.
2208
2209 .. versionadded:: 2.24.0
2210 """
2211 query_result = wait_for_query(self, progress_bar_type, max_results=max_results)
2212 return query_result.to_geodataframe(
2213 bqstorage_client=bqstorage_client,
2214 dtypes=dtypes,
2215 progress_bar_type=progress_bar_type,
2216 create_bqstorage_client=create_bqstorage_client,
2217 geography_column=geography_column,
2218 bool_dtype=bool_dtype,
2219 int_dtype=int_dtype,
2220 float_dtype=float_dtype,
2221 string_dtype=string_dtype,
2222 )
2223
2224 def __iter__(self):
2225 return iter(self.result())
2226
2227
class QueryPlanEntryStep(object):
    """Map a single step in a query plan entry.

    Two steps compare equal when both their ``kind`` and their ``substeps``
    are equal.

    Args:
        kind (str): step type.
        substeps (Iterable): names of substeps; copied into a new list.
    """

    def __init__(self, kind, substeps):
        self.kind = kind
        # Always store a fresh list, whatever iterable the caller passed.
        self.substeps = list(substeps)

    @classmethod
    def from_api_repr(cls, resource: dict) -> "QueryPlanEntryStep":
        """Factory: construct instance from the JSON repr.

        Args:
            resource (Dict): JSON representation of the entry.

        Returns:
            google.cloud.bigquery.job.QueryPlanEntryStep:
                New instance built from the resource. A ``substeps`` value
                that is missing or ``None`` becomes an empty list.
        """
        # ``or ()`` covers an explicit ``"substeps": None`` as well as a
        # missing key; ``list(None)`` would otherwise raise ``TypeError``.
        return cls(
            kind=resource.get("kind"),
            substeps=resource.get("substeps") or (),
        )

    def __eq__(self, other):
        # Defer to the other operand for unrelated types.
        if not isinstance(other, self.__class__):
            return NotImplemented
        return self.kind == other.kind and self.substeps == other.substeps

    def __repr__(self):
        return "{}(kind={!r}, substeps={!r})".format(
            type(self).__name__, self.kind, self.substeps
        )
2257
2258
class QueryPlanEntry(object):
    """A single stage of a query execution plan.

    The stage is kept as the raw resource mapping in ``_properties``; each
    property below reads one key from it.

    See
    https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#ExplainQueryStage
    for the underlying API representation within query statistics.
    """

    def __init__(self):
        self._properties = {}

    @classmethod
    def from_api_repr(cls, resource: dict) -> "QueryPlanEntry":
        """Factory: construct instance from the JSON repr.

        Args:
            resource(Dict[str: object]):
                ExplainQueryStage representation returned from API. The
                mapping is stored as-is, not copied.

        Returns:
            google.cloud.bigquery.job.QueryPlanEntry:
                Query plan entry parsed from ``resource``.
        """
        stage = cls()
        stage._properties = resource
        return stage

    def _int_property(self, key):
        """Pass the raw value stored under ``key`` through
        ``_helpers._int_or_none``.
        """
        return _helpers._int_or_none(self._properties.get(key))

    @property
    def name(self):
        """Optional[str]: Human-readable name of the stage."""
        return self._properties.get("name")

    @property
    def entry_id(self):
        """Optional[str]: Unique ID for the stage within the plan."""
        return self._properties.get("id")

    @property
    def start(self):
        """Optional[Datetime]: Datetime when the stage started."""
        start_ms = self._properties.get("startMs")
        if start_ms is None:
            return None
        # Scale by 1000 because the helper takes microseconds.
        return _helpers._datetime_from_microseconds(int(start_ms) * 1000.0)

    @property
    def end(self):
        """Optional[Datetime]: Datetime when the stage ended."""
        end_ms = self._properties.get("endMs")
        if end_ms is None:
            return None
        # Scale by 1000 because the helper takes microseconds.
        return _helpers._datetime_from_microseconds(int(end_ms) * 1000.0)

    @property
    def input_stages(self):
        """List(int): Entry IDs for stages that were inputs for this stage."""
        stage_ids = self._properties.get("inputStages")
        if stage_ids is None:
            return []
        return [_helpers._int_or_none(stage_id) for stage_id in stage_ids]

    @property
    def parallel_inputs(self):
        """Optional[int]: Number of parallel input segments within
        the stage.
        """
        return self._int_property("parallelInputs")

    @property
    def completed_parallel_inputs(self):
        """Optional[int]: Number of parallel input segments completed."""
        return self._int_property("completedParallelInputs")

    @property
    def wait_ms_avg(self):
        """Optional[int]: Milliseconds the average worker spent waiting to
        be scheduled.
        """
        return self._int_property("waitMsAvg")

    @property
    def wait_ms_max(self):
        """Optional[int]: Milliseconds the slowest worker spent waiting to
        be scheduled.
        """
        return self._int_property("waitMsMax")

    @property
    def wait_ratio_avg(self):
        """Optional[float]: Time the average worker spent waiting to be
        scheduled, as a ratio of the longest time spent by any worker in
        any stage of the overall plan.
        """
        return self._properties.get("waitRatioAvg")

    @property
    def wait_ratio_max(self):
        """Optional[float]: Time the slowest worker spent waiting to be
        scheduled, as a ratio of the longest time spent by any worker in
        any stage of the overall plan.
        """
        return self._properties.get("waitRatioMax")

    @property
    def read_ms_avg(self):
        """Optional[int]: Milliseconds the average worker spent reading
        input.
        """
        return self._int_property("readMsAvg")

    @property
    def read_ms_max(self):
        """Optional[int]: Milliseconds the slowest worker spent reading
        input.
        """
        return self._int_property("readMsMax")

    @property
    def read_ratio_avg(self):
        """Optional[float]: Time the average worker spent reading input,
        as a ratio of the longest time spent by any worker in any stage of
        the overall plan.
        """
        return self._properties.get("readRatioAvg")

    @property
    def read_ratio_max(self):
        """Optional[float]: Time the slowest worker spent reading input,
        as a ratio of the longest time spent by any worker in any stage of
        the overall plan.
        """
        return self._properties.get("readRatioMax")

    @property
    def compute_ms_avg(self):
        """Optional[int]: Milliseconds the average worker spent on CPU-bound
        processing.
        """
        return self._int_property("computeMsAvg")

    @property
    def compute_ms_max(self):
        """Optional[int]: Milliseconds the slowest worker spent on CPU-bound
        processing.
        """
        return self._int_property("computeMsMax")

    @property
    def compute_ratio_avg(self):
        """Optional[float]: Time the average worker spent on CPU-bound
        processing, as a ratio of the longest time spent by any worker in
        any stage of the overall plan.
        """
        return self._properties.get("computeRatioAvg")

    @property
    def compute_ratio_max(self):
        """Optional[float]: Time the slowest worker spent on CPU-bound
        processing, as a ratio of the longest time spent by any worker in
        any stage of the overall plan.
        """
        return self._properties.get("computeRatioMax")

    @property
    def write_ms_avg(self):
        """Optional[int]: Milliseconds the average worker spent writing
        output data.
        """
        return self._int_property("writeMsAvg")

    @property
    def write_ms_max(self):
        """Optional[int]: Milliseconds the slowest worker spent writing
        output data.
        """
        return self._int_property("writeMsMax")

    @property
    def write_ratio_avg(self):
        """Optional[float]: Time the average worker spent writing output
        data, as a ratio of the longest time spent by any worker in any
        stage of the overall plan.
        """
        return self._properties.get("writeRatioAvg")

    @property
    def write_ratio_max(self):
        """Optional[float]: Time the slowest worker spent writing output
        data, as a ratio of the longest time spent by any worker in any
        stage of the overall plan.
        """
        return self._properties.get("writeRatioMax")

    @property
    def records_read(self):
        """Optional[int]: Number of records read by this stage."""
        return self._int_property("recordsRead")

    @property
    def records_written(self):
        """Optional[int]: Number of records written by this stage."""
        return self._int_property("recordsWritten")

    @property
    def status(self):
        """Optional[str]: status of this stage."""
        return self._properties.get("status")

    @property
    def shuffle_output_bytes(self):
        """Optional[int]: Number of bytes written by this stage to
        intermediate shuffle.
        """
        return self._int_property("shuffleOutputBytes")

    @property
    def shuffle_output_bytes_spilled(self):
        """Optional[int]: Number of bytes written by this stage to
        intermediate shuffle and spilled to disk.
        """
        return self._int_property("shuffleOutputBytesSpilled")

    @property
    def steps(self):
        """List(QueryPlanEntryStep): List of step operations performed by
        each worker in the stage.
        """
        raw_steps = self._properties.get("steps", [])
        return [QueryPlanEntryStep.from_api_repr(raw_step) for raw_step in raw_steps]

    @property
    def slot_ms(self):
        """Optional[int]: Slot-milliseconds used by the stage."""
        return self._int_property("slotMs")
2499
2500
class TimelineEntry(object):
    """Progress of a query job at a particular point in time.

    The sample is kept as the raw resource mapping in ``_properties``; each
    property below reads one key from it.

    See
    https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#querytimelinesample
    for the underlying API representation within query statistics.
    """

    def __init__(self):
        self._properties = {}

    @classmethod
    def from_api_repr(cls, resource):
        """Factory: construct instance from the JSON repr.

        Args:
            resource(Dict[str: object]):
                QueryTimelineSample representation returned from API. The
                mapping is stored as-is, not copied.

        Returns:
            google.cloud.bigquery.TimelineEntry:
                Timeline sample parsed from ``resource``.
        """
        sample = cls()
        sample._properties = resource
        return sample

    def _int_property(self, key):
        """Pass the raw value stored under ``key`` through
        ``_helpers._int_or_none``.
        """
        return _helpers._int_or_none(self._properties.get(key))

    @property
    def elapsed_ms(self):
        """Optional[int]: Milliseconds elapsed since start of query
        execution."""
        return self._int_property("elapsedMs")

    @property
    def active_units(self):
        """Optional[int]: Current number of input units being processed
        by workers, reported as largest value since the last sample."""
        return self._int_property("activeUnits")

    @property
    def pending_units(self):
        """Optional[int]: Current number of input units remaining for
        query stages active at this sample time."""
        return self._int_property("pendingUnits")

    @property
    def completed_units(self):
        """Optional[int]: Current number of input units completed by
        this query."""
        return self._int_property("completedUnits")

    @property
    def slot_millis(self):
        """Optional[int]: Cumulative slot-milliseconds consumed by
        this query (read from the ``totalSlotMs`` key)."""
        return self._int_property("totalSlotMs")