# Copyright 2015 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Classes for query jobs."""

import concurrent.futures
import copy
import re
import time
import typing
from typing import Any, Dict, Iterable, List, Optional, Union

from google.api_core import exceptions
from google.api_core import retry as retries
import requests

from google.cloud.bigquery.dataset import Dataset
from google.cloud.bigquery.dataset import DatasetListItem
from google.cloud.bigquery.dataset import DatasetReference
from google.cloud.bigquery.encryption_configuration import EncryptionConfiguration
from google.cloud.bigquery.enums import KeyResultStatementKind, DefaultPandasDTypes
from google.cloud.bigquery.external_config import ExternalConfig
from google.cloud.bigquery import _helpers
from google.cloud.bigquery.query import (
    _query_param_from_api_repr,
    ArrayQueryParameter,
    ConnectionProperty,
    ScalarQueryParameter,
    StructQueryParameter,
    UDFResource,
)
from google.cloud.bigquery.retry import (
    DEFAULT_RETRY,
    DEFAULT_JOB_RETRY,
    POLLING_DEFAULT_VALUE,
)
from google.cloud.bigquery.routine import RoutineReference
from google.cloud.bigquery.schema import SchemaField
from google.cloud.bigquery.table import _EmptyRowIterator
from google.cloud.bigquery.table import RangePartitioning
from google.cloud.bigquery.table import _table_arg_to_table_ref
from google.cloud.bigquery.table import TableReference
from google.cloud.bigquery.table import TimePartitioning
from google.cloud.bigquery._tqdm_helpers import wait_for_query

from google.cloud.bigquery.job.base import _AsyncJob
from google.cloud.bigquery.job.base import _JobConfig
from google.cloud.bigquery.job.base import _JobReference

try:
    import pandas  # type: ignore
except ImportError:
    pandas = None

if typing.TYPE_CHECKING:  # pragma: NO COVER
    # Assumption: type checks are only used by library developers and CI environments
    # that have all optional dependencies installed, thus no conditional imports.
    import pandas  # type: ignore
    import geopandas  # type: ignore
    import pyarrow  # type: ignore
    from google.cloud import bigquery_storage
    from google.cloud.bigquery.client import Client
    from google.cloud.bigquery.table import RowIterator


_CONTAINS_ORDER_BY = re.compile(r"ORDER\s+BY", re.IGNORECASE)
_EXCEPTION_FOOTER_TEMPLATE = "{message}\n\nLocation: {location}\nJob ID: {job_id}\n"
_TIMEOUT_BUFFER_SECS = 0.1


def _contains_order_by(query):
    """Do we need to preserve the order of the query results?

    This function has known false positives, such as with ordered window
    functions:

    .. code-block:: sql

       SELECT SUM(x) OVER (
           window_name
           PARTITION BY...
           ORDER BY...
           window_frame_clause)
       FROM ...

    This false positive failure case means the behavior will be correct, but
    downloading results with the BigQuery Storage API may be slower than it
    otherwise would be. This is preferable to the false negative case, where
    results are expected to be in order but are not (due to parallel reads).
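
    Example (illustrative; the regex match is truthy whenever ``ORDER BY``
    appears anywhere in the text, even inside a window clause)::

        assert _contains_order_by("SELECT x FROM t ORDER BY x")
        assert not _contains_order_by("SELECT x FROM t")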
101 """
102 return query and _CONTAINS_ORDER_BY.search(query)
103
104
105def _from_api_repr_query_parameters(resource):
106 return [_query_param_from_api_repr(mapping) for mapping in resource]
107
108
109def _to_api_repr_query_parameters(value):
110 return [query_parameter.to_api_repr() for query_parameter in value]
111
112
113def _from_api_repr_udf_resources(resource):
114 udf_resources = []
115 for udf_mapping in resource:
116 for udf_type, udf_value in udf_mapping.items():
117 udf_resources.append(UDFResource(udf_type, udf_value))
118 return udf_resources
119
120
121def _to_api_repr_udf_resources(value):
122 return [{udf_resource.udf_type: udf_resource.value} for udf_resource in value]
123
124
125def _from_api_repr_table_defs(resource):
126 return {k: ExternalConfig.from_api_repr(v) for k, v in resource.items()}
127
128
129def _to_api_repr_table_defs(value):
130 return {k: ExternalConfig.to_api_repr(v) for k, v in value.items()}
131
132
133class BiEngineReason(typing.NamedTuple):
134 """Reason for BI Engine acceleration failure
135
136 https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#bienginereason
137 """
138
139 code: str = "CODE_UNSPECIFIED"
140
141 reason: str = ""
142
143 @classmethod
144 def from_api_repr(cls, reason: Dict[str, str]) -> "BiEngineReason":
145 return cls(reason.get("code", "CODE_UNSPECIFIED"), reason.get("message", ""))
146
147
148class BiEngineStats(typing.NamedTuple):
149 """Statistics for a BI Engine query
150
151 https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#bienginestatistics
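
    Example (illustrative sketch of parsing the raw API statistics; the
    reason code shown is just one possible value)::

        stats = BiEngineStats.from_api_repr(
            {
                "biEngineMode": "PARTIAL",
                "biEngineReasons": [{"code": "UNSUPPORTED_SQL_TEXT"}],
            }
        )
        assert stats.mode == "PARTIAL"
        assert stats.reasons[0].code == "UNSUPPORTED_SQL_TEXT"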
152 """
153
154 mode: str = "ACCELERATION_MODE_UNSPECIFIED"
155 """ Specifies which mode of BI Engine acceleration was performed (if any)
156 """
157
158 reasons: List[BiEngineReason] = []
159 """ Contains explanatory messages in case of DISABLED / PARTIAL acceleration
160 """
161
162 @classmethod
163 def from_api_repr(cls, stats: Dict[str, Any]) -> "BiEngineStats":
164 mode = stats.get("biEngineMode", "ACCELERATION_MODE_UNSPECIFIED")
165 reasons = [
166 BiEngineReason.from_api_repr(r) for r in stats.get("biEngineReasons", [])
167 ]
168 return cls(mode, reasons)
169
170
171class DmlStats(typing.NamedTuple):
172 """Detailed statistics for DML statements.
173
174 https://cloud.google.com/bigquery/docs/reference/rest/v2/DmlStats
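
    Example (illustrative; row counts arrive as strings in the API response
    and are converted to integers, with absent fields defaulting to 0)::

        stats = DmlStats.from_api_repr({"insertedRowCount": "12"})
        assert stats.inserted_row_count == 12
        assert stats.deleted_row_count == 0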
175 """
176
177 inserted_row_count: int = 0
178 """Number of inserted rows. Populated by DML INSERT and MERGE statements."""
179
180 deleted_row_count: int = 0
181 """Number of deleted rows. populated by DML DELETE, MERGE and TRUNCATE statements.
182 """

    updated_row_count: int = 0
    """Number of updated rows. Populated by DML UPDATE and MERGE statements."""

    @classmethod
    def from_api_repr(cls, stats: Dict[str, str]) -> "DmlStats":
        # NOTE: The field order here must match the order of fields set at the
        # class level.
        api_fields = ("insertedRowCount", "deletedRowCount", "updatedRowCount")

        args = (
            int(stats.get(api_field, default_val))
            for api_field, default_val in zip(api_fields, cls.__new__.__defaults__)  # type: ignore
        )
        return cls(*args)


class IndexUnusedReason(typing.NamedTuple):
    """Reason about why no search index was used in the search query (or sub-query).

    https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#indexunusedreason
    """

    code: Optional[str] = None
    """Specifies the high-level reason for the scenario when no search index was used.
    """

    message: Optional[str] = None
    """Free form human-readable reason for the scenario when no search index was used.
    """

    baseTable: Optional[TableReference] = None
    """Specifies the base table involved in the reason that no search index was used.
    """

    indexName: Optional[str] = None
    """Specifies the name of the unused search index, if available."""

    @classmethod
    def from_api_repr(cls, reason):
        code = reason.get("code")
        message = reason.get("message")
        baseTable = reason.get("baseTable")
        indexName = reason.get("indexName")

        return cls(code, message, baseTable, indexName)


class SearchStats(typing.NamedTuple):
    """Statistics related to Search Queries. Populated as part of JobStatistics2.

    https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#searchstatistics
    """

    mode: Optional[str] = None
    """Indicates the type of search index usage in the entire search query."""

    reason: List[IndexUnusedReason] = []
    """Reason about why no search index was used in the search query (or sub-query)"""

    @classmethod
    def from_api_repr(cls, stats: Dict[str, Any]):
        mode = stats.get("indexUsageMode", None)
        reason = [
            IndexUnusedReason.from_api_repr(r)
            for r in stats.get("indexUnusedReasons", [])
        ]
        return cls(mode, reason)


class ScriptOptions:
    """Options controlling the execution of scripts.

    https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#ScriptOptions
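
    Example (illustrative sketch; attaches the options to a
    :class:`QueryJobConfig` before running a multi-statement script)::

        options = ScriptOptions(
            statement_timeout_ms=60_000,
            key_result_statement=KeyResultStatementKind.LAST,
        )
        config = QueryJobConfig()
        config.script_options = options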
257 """
258
259 def __init__(
260 self,
261 statement_timeout_ms: Optional[int] = None,
262 statement_byte_budget: Optional[int] = None,
263 key_result_statement: Optional[KeyResultStatementKind] = None,
264 ):
265 self._properties: Dict[str, Any] = {}
266 self.statement_timeout_ms = statement_timeout_ms
267 self.statement_byte_budget = statement_byte_budget
268 self.key_result_statement = key_result_statement
269
270 @classmethod
271 def from_api_repr(cls, resource: Dict[str, Any]) -> "ScriptOptions":
272 """Factory: construct instance from the JSON repr.
273
274 Args:
            resource (Dict[str, Any]):
                ScriptOptions representation returned from the API.

        Returns:
            google.cloud.bigquery.ScriptOptions:
                ScriptOptions instance parsed from ``resource``.
281 """
282 entry = cls()
283 entry._properties = copy.deepcopy(resource)
284 return entry
285
286 def to_api_repr(self) -> Dict[str, Any]:
287 """Construct the API resource representation."""
288 return copy.deepcopy(self._properties)
289
290 @property
291 def statement_timeout_ms(self) -> Union[int, None]:
292 """Timeout period for each statement in a script."""
293 return _helpers._int_or_none(self._properties.get("statementTimeoutMs"))
294
295 @statement_timeout_ms.setter
296 def statement_timeout_ms(self, value: Union[int, None]):
297 new_value = None if value is None else str(value)
298 self._properties["statementTimeoutMs"] = new_value
299
300 @property
301 def statement_byte_budget(self) -> Union[int, None]:
302 """Limit on the number of bytes billed per statement.
303
304 Exceeding this budget results in an error.
305 """
306 return _helpers._int_or_none(self._properties.get("statementByteBudget"))
307
308 @statement_byte_budget.setter
309 def statement_byte_budget(self, value: Union[int, None]):
310 new_value = None if value is None else str(value)
311 self._properties["statementByteBudget"] = new_value
312
313 @property
314 def key_result_statement(self) -> Union[KeyResultStatementKind, None]:
315 """Determines which statement in the script represents the "key result".
316
317 This is used to populate the schema and query results of the script job.
318 Default is ``KeyResultStatementKind.LAST``.
319 """
320 return self._properties.get("keyResultStatement")
321
322 @key_result_statement.setter
323 def key_result_statement(self, value: Union[KeyResultStatementKind, None]):
324 self._properties["keyResultStatement"] = value
325
326
327class QueryJobConfig(_JobConfig):
328 """Configuration options for query jobs.
329
    All properties in this class are optional. Properties left as :data:`None`
    use the server-side defaults. Set properties on the constructed
    configuration by using the property name as the name of a keyword argument.
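
    Example (illustrative sketch of a couple of common settings; any property
    can equally be set after construction)::

        config = QueryJobConfig(
            use_query_cache=False,
            maximum_bytes_billed=10**10,
        )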
333 """
334
335 def __init__(self, **kwargs) -> None:
336 super(QueryJobConfig, self).__init__("query", **kwargs)
337
338 @property
339 def destination_encryption_configuration(self):
340 """google.cloud.bigquery.encryption_configuration.EncryptionConfiguration: Custom
341 encryption configuration for the destination table.
342
343 Custom encryption configuration (e.g., Cloud KMS keys) or :data:`None`
344 if using default encryption.
345
346 See
347 https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobConfigurationQuery.FIELDS.destination_encryption_configuration
348 """
349 prop = self._get_sub_prop("destinationEncryptionConfiguration")
350 if prop is not None:
351 prop = EncryptionConfiguration.from_api_repr(prop)
352 return prop
353
354 @destination_encryption_configuration.setter
355 def destination_encryption_configuration(self, value):
356 api_repr = value
357 if value is not None:
358 api_repr = value.to_api_repr()
359 self._set_sub_prop("destinationEncryptionConfiguration", api_repr)
360
361 @property
362 def allow_large_results(self):
363 """bool: Allow large query results tables (legacy SQL, only)
364
365 See
366 https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobConfigurationQuery.FIELDS.allow_large_results
367 """
368 return self._get_sub_prop("allowLargeResults")
369
370 @allow_large_results.setter
371 def allow_large_results(self, value):
372 self._set_sub_prop("allowLargeResults", value)
373
374 @property
375 def connection_properties(self) -> List[ConnectionProperty]:
376 """Connection properties.
377
378 See
379 https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobConfigurationQuery.FIELDS.connection_properties
380
381 .. versionadded:: 2.29.0
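
        Example (illustrative sketch; attaches a query to an existing session,
        assuming ``session_id`` was captured from a previous job's
        ``session_info``)::

            config = QueryJobConfig()
            config.connection_properties = [
                ConnectionProperty(key="session_id", value=session_id),
            ]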
382 """
383 resource = self._get_sub_prop("connectionProperties", [])
384 return [ConnectionProperty.from_api_repr(prop) for prop in resource]
385
386 @connection_properties.setter
387 def connection_properties(self, value: Iterable[ConnectionProperty]):
388 self._set_sub_prop(
389 "connectionProperties",
390 [prop.to_api_repr() for prop in value],
391 )
392
393 @property
394 def create_disposition(self):
395 """google.cloud.bigquery.job.CreateDisposition: Specifies behavior
396 for creating tables.
397
398 See
399 https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobConfigurationQuery.FIELDS.create_disposition
400 """
401 return self._get_sub_prop("createDisposition")
402
403 @create_disposition.setter
404 def create_disposition(self, value):
405 self._set_sub_prop("createDisposition", value)
406
407 @property
408 def create_session(self) -> Optional[bool]:
409 """[Preview] If :data:`True`, creates a new session, where
410 :attr:`~google.cloud.bigquery.job.QueryJob.session_info` will contain a
411 random server generated session id.
412
413 If :data:`False`, runs query with an existing ``session_id`` passed in
414 :attr:`~google.cloud.bigquery.job.QueryJobConfig.connection_properties`,
415 otherwise runs query in non-session mode.
416
417 See
418 https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobConfigurationQuery.FIELDS.create_session
419
420 .. versionadded:: 2.29.0
421 """
422 return self._get_sub_prop("createSession")
423
424 @create_session.setter
425 def create_session(self, value: Optional[bool]):
426 self._set_sub_prop("createSession", value)
427
428 @property
429 def default_dataset(self):
430 """google.cloud.bigquery.dataset.DatasetReference: the default dataset
431 to use for unqualified table names in the query or :data:`None` if not
432 set.
433
434 The ``default_dataset`` setter accepts:
435
436 - a :class:`~google.cloud.bigquery.dataset.Dataset`, or
437 - a :class:`~google.cloud.bigquery.dataset.DatasetReference`, or
438 - a :class:`str` of the fully-qualified dataset ID in standard SQL
          format. The value must include a project ID and dataset ID
          separated by ``.``. For example: ``your-project.your_dataset``.

        See
        https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobConfigurationQuery.FIELDS.default_dataset
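
        Example (illustrative sketch; both assignments below set the same
        underlying value)::

            config = QueryJobConfig()
            config.default_dataset = "your-project.your_dataset"
            config.default_dataset = DatasetReference(
                "your-project", "your_dataset"
            )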
444 """
445 prop = self._get_sub_prop("defaultDataset")
446 if prop is not None:
447 prop = DatasetReference.from_api_repr(prop)
448 return prop
449
450 @default_dataset.setter
451 def default_dataset(self, value):
452 if value is None:
453 self._set_sub_prop("defaultDataset", None)
454 return
455
456 if isinstance(value, str):
457 value = DatasetReference.from_string(value)
458
459 if isinstance(value, (Dataset, DatasetListItem)):
460 value = value.reference
461
462 resource = value.to_api_repr()
463 self._set_sub_prop("defaultDataset", resource)
464
465 @property
466 def destination(self):
467 """google.cloud.bigquery.table.TableReference: table where results are
468 written or :data:`None` if not set.
469
470 The ``destination`` setter accepts:
471
472 - a :class:`~google.cloud.bigquery.table.Table`, or
473 - a :class:`~google.cloud.bigquery.table.TableReference`, or
474 - a :class:`str` of the fully-qualified table ID in standard SQL
          format. The value must include a project ID, dataset ID, and table
          ID, each separated by ``.``. For example:
          ``your-project.your_dataset.your_table``.

        .. note::

            Only table ID is passed to the backend, so any configuration
            in :class:`~google.cloud.bigquery.table.Table` is discarded.

        See
        https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobConfigurationQuery.FIELDS.destination_table
        """
        prop = self._get_sub_prop("destinationTable")
        if prop is not None:
            prop = TableReference.from_api_repr(prop)
        return prop

    @destination.setter
    def destination(self, value):
        if value is None:
            self._set_sub_prop("destinationTable", None)
            return

        value = _table_arg_to_table_ref(value)
        resource = value.to_api_repr()
        self._set_sub_prop("destinationTable", resource)

    @property
    def dry_run(self):
        """bool: :data:`True` if this query should be a dry run to estimate
        costs.

        See
        https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobConfiguration.FIELDS.dry_run
        """
        return self._properties.get("dryRun")

    @dry_run.setter
    def dry_run(self, value):
        self._properties["dryRun"] = value

    @property
    def flatten_results(self):
        """bool: Flatten nested/repeated fields in results. (Legacy SQL only)

        See
        https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobConfigurationQuery.FIELDS.flatten_results
        """
        return self._get_sub_prop("flattenResults")

    @flatten_results.setter
    def flatten_results(self, value):
        self._set_sub_prop("flattenResults", value)

    @property
    def maximum_billing_tier(self):
        """int: Deprecated. Changes the billing tier to allow high-compute
        queries.

        See
        https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobConfigurationQuery.FIELDS.maximum_billing_tier
        """
        return self._get_sub_prop("maximumBillingTier")

    @maximum_billing_tier.setter
    def maximum_billing_tier(self, value):
        self._set_sub_prop("maximumBillingTier", value)

    @property
    def maximum_bytes_billed(self):
        """int: Maximum bytes to be billed for this job or :data:`None` if not set.

        See
        https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobConfigurationQuery.FIELDS.maximum_bytes_billed
        """
        return _helpers._int_or_none(self._get_sub_prop("maximumBytesBilled"))

    @maximum_bytes_billed.setter
    def maximum_bytes_billed(self, value):
        self._set_sub_prop("maximumBytesBilled", str(value))

    @property
    def priority(self):
        """google.cloud.bigquery.job.QueryPriority: Priority of the query.

        See
        https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobConfigurationQuery.FIELDS.priority
        """
        return self._get_sub_prop("priority")

    @priority.setter
    def priority(self, value):
        self._set_sub_prop("priority", value)

    @property
    def query_parameters(self):
        """List[Union[google.cloud.bigquery.query.ArrayQueryParameter, \
        google.cloud.bigquery.query.ScalarQueryParameter, \
        google.cloud.bigquery.query.StructQueryParameter]]: list of parameters
        for parameterized query (empty by default)

        See:
        https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobConfigurationQuery.FIELDS.query_parameters
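
        Example (illustrative sketch using a named scalar parameter)::

            config = QueryJobConfig()
            config.query_parameters = [
                ScalarQueryParameter("corpus", "STRING", "romeoandjuliet"),
            ]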
578 """
579 prop = self._get_sub_prop("queryParameters", default=[])
580 return _from_api_repr_query_parameters(prop)
581
582 @query_parameters.setter
583 def query_parameters(self, values):
584 self._set_sub_prop("queryParameters", _to_api_repr_query_parameters(values))
585
586 @property
587 def range_partitioning(self):
588 """Optional[google.cloud.bigquery.table.RangePartitioning]:
589 Configures range-based partitioning for destination table.
590
591 .. note::
592 **Beta**. The integer range partitioning feature is in a
593 pre-release state and might change or have limited support.
594
595 Only specify at most one of
596 :attr:`~google.cloud.bigquery.job.LoadJobConfig.time_partitioning` or
597 :attr:`~google.cloud.bigquery.job.LoadJobConfig.range_partitioning`.
598
599 Raises:
600 ValueError:
601 If the value is not
602 :class:`~google.cloud.bigquery.table.RangePartitioning` or
603 :data:`None`.
604 """
605 resource = self._get_sub_prop("rangePartitioning")
606 if resource is not None:
607 return RangePartitioning(_properties=resource)
608
609 @range_partitioning.setter
610 def range_partitioning(self, value):
611 resource = value
612 if isinstance(value, RangePartitioning):
613 resource = value._properties
614 elif value is not None:
615 raise ValueError(
616 "Expected value to be RangePartitioning or None, got {}.".format(value)
617 )
618 self._set_sub_prop("rangePartitioning", resource)
619
620 @property
621 def udf_resources(self):
622 """List[google.cloud.bigquery.query.UDFResource]: user
623 defined function resources (empty by default)
624
625 See:
626 https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobConfigurationQuery.FIELDS.user_defined_function_resources
627 """
628 prop = self._get_sub_prop("userDefinedFunctionResources", default=[])
629 return _from_api_repr_udf_resources(prop)
630
631 @udf_resources.setter
632 def udf_resources(self, values):
633 self._set_sub_prop(
634 "userDefinedFunctionResources", _to_api_repr_udf_resources(values)
635 )
636
637 @property
638 def use_legacy_sql(self):
639 """bool: Use legacy SQL syntax.
640
641 See
642 https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobConfigurationQuery.FIELDS.use_legacy_sql
643 """
644 return self._get_sub_prop("useLegacySql")
645
646 @use_legacy_sql.setter
647 def use_legacy_sql(self, value):
648 self._set_sub_prop("useLegacySql", value)
649
650 @property
651 def use_query_cache(self):
652 """bool: Look for the query result in the cache.
653
654 See
655 https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobConfigurationQuery.FIELDS.use_query_cache
656 """
657 return self._get_sub_prop("useQueryCache")
658
659 @use_query_cache.setter
660 def use_query_cache(self, value):
661 self._set_sub_prop("useQueryCache", value)
662
663 @property
664 def write_disposition(self):
665 """google.cloud.bigquery.job.WriteDisposition: Action that occurs if
666 the destination table already exists.
667
668 See
669 https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobConfigurationQuery.FIELDS.write_disposition
670 """
671 return self._get_sub_prop("writeDisposition")
672
673 @write_disposition.setter
674 def write_disposition(self, value):
675 self._set_sub_prop("writeDisposition", value)
676
677 @property
678 def write_incremental_results(self) -> Optional[bool]:
679 """This is only supported for a SELECT query using a temporary table.
680
681 If set, the query is allowed to write results incrementally to the temporary result
682 table. This may incur a performance penalty. This option cannot be used with Legacy SQL.
683
684 This feature is not generally available.
685 """
686 return self._get_sub_prop("writeIncrementalResults")
687
688 @write_incremental_results.setter
689 def write_incremental_results(self, value):
690 self._set_sub_prop("writeIncrementalResults", value)
691
692 @property
693 def table_definitions(self):
694 """Dict[str, google.cloud.bigquery.external_config.ExternalConfig]:
695 Definitions for external tables or :data:`None` if not set.
696
697 See
698 https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobConfigurationQuery.FIELDS.external_table_definitions
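
        Example (illustrative sketch; ``gcs_table`` is a hypothetical name
        that the query can then reference as if it were a real table)::

            external_config = ExternalConfig("CSV")
            external_config.source_uris = ["gs://your-bucket/data.csv"]
            config = QueryJobConfig()
            config.table_definitions = {"gcs_table": external_config}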
699 """
700 prop = self._get_sub_prop("tableDefinitions")
701 if prop is not None:
702 prop = _from_api_repr_table_defs(prop)
703 return prop
704
705 @table_definitions.setter
706 def table_definitions(self, values):
707 self._set_sub_prop("tableDefinitions", _to_api_repr_table_defs(values))
708
709 @property
710 def time_partitioning(self):
711 """Optional[google.cloud.bigquery.table.TimePartitioning]: Specifies
712 time-based partitioning for the destination table.
713
714 Only specify at most one of
715 :attr:`~google.cloud.bigquery.job.LoadJobConfig.time_partitioning` or
716 :attr:`~google.cloud.bigquery.job.LoadJobConfig.range_partitioning`.
717
718 Raises:
719 ValueError:
720 If the value is not
721 :class:`~google.cloud.bigquery.table.TimePartitioning` or
722 :data:`None`.
723 """
724 prop = self._get_sub_prop("timePartitioning")
725 if prop is not None:
726 prop = TimePartitioning.from_api_repr(prop)
727 return prop
728
729 @time_partitioning.setter
730 def time_partitioning(self, value):
731 api_repr = value
732 if value is not None:
733 api_repr = value.to_api_repr()
734 self._set_sub_prop("timePartitioning", api_repr)
735
736 @property
737 def clustering_fields(self):
738 """Optional[List[str]]: Fields defining clustering for the table
739
740 (Defaults to :data:`None`).
741
742 Clustering fields are immutable after table creation.
743
744 .. note::
745
746 BigQuery supports clustering for both partitioned and
747 non-partitioned tables.
748 """
749 prop = self._get_sub_prop("clustering")
750 if prop is not None:
751 return list(prop.get("fields", ()))
752
753 @clustering_fields.setter
754 def clustering_fields(self, value):
755 """Optional[List[str]]: Fields defining clustering for the table
756
757 (Defaults to :data:`None`).
758 """
759 if value is not None:
760 self._set_sub_prop("clustering", {"fields": value})
761 else:
762 self._del_sub_prop("clustering")
763
764 @property
765 def schema_update_options(self):
766 """List[google.cloud.bigquery.job.SchemaUpdateOption]: Specifies
767 updates to the destination table schema to allow as a side effect of
768 the query job.
769 """
770 return self._get_sub_prop("schemaUpdateOptions")
771
772 @schema_update_options.setter
773 def schema_update_options(self, values):
774 self._set_sub_prop("schemaUpdateOptions", values)
775
776 @property
777 def script_options(self) -> ScriptOptions:
778 """Options controlling the execution of scripts.
779
780 https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#scriptoptions
781 """
782 prop = self._get_sub_prop("scriptOptions")
783 if prop is not None:
784 prop = ScriptOptions.from_api_repr(prop)
785 return prop
786
787 @script_options.setter
788 def script_options(self, value: Union[ScriptOptions, None]):
789 new_value = None if value is None else value.to_api_repr()
790 self._set_sub_prop("scriptOptions", new_value)
791
792 def to_api_repr(self) -> dict:
793 """Build an API representation of the query job config.
794
795 Returns:
796 Dict: A dictionary in the format used by the BigQuery API.
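
        Example (illustrative; ``parameterMode`` is derived from whether the
        first query parameter has a name)::

            config = QueryJobConfig()
            config.query_parameters = [
                ScalarQueryParameter("corpus", "STRING", "romeoandjuliet")
            ]
            assert config.to_api_repr()["query"]["parameterMode"] == "NAMED"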
797 """
798 resource = copy.deepcopy(self._properties)
        # Query parameters have an additional property associated with them
        # to indicate if the query is using named or positional parameters.
        query_parameters = resource.get("query", {}).get("queryParameters")
        if query_parameters:
            if query_parameters[0].get("name") is None:
                resource["query"]["parameterMode"] = "POSITIONAL"
            else:
                resource["query"]["parameterMode"] = "NAMED"

        return resource


class QueryJob(_AsyncJob):
    """Asynchronous job: query tables.

    Args:
        job_id (str): the job's ID, within the project belonging to ``client``.

        query (str): SQL query string.

        client (google.cloud.bigquery.client.Client):
            A client which holds credentials and project configuration
            for the dataset (which requires a project).

        job_config (Optional[google.cloud.bigquery.job.QueryJobConfig]):
            Extra configuration options for the query job.
    """

    _JOB_TYPE = "query"
    _UDF_KEY = "userDefinedFunctionResources"
    _CONFIG_CLASS = QueryJobConfig

    def __init__(self, job_id, query, client, job_config=None):
        super(QueryJob, self).__init__(job_id, client)

        if job_config is not None:
            self._properties["configuration"] = job_config._properties
        if self.configuration.use_legacy_sql is None:
            self.configuration.use_legacy_sql = False

        if query:
            _helpers._set_sub_prop(
                self._properties, ["configuration", "query", "query"], query
            )
        self._query_results = None
        self._done_timeout = None
        self._transport_timeout = None

    @property
    def allow_large_results(self):
        """See
        :attr:`google.cloud.bigquery.job.QueryJobConfig.allow_large_results`.
        """
        return self.configuration.allow_large_results

    @property
    def configuration(self) -> QueryJobConfig:
        """The configuration for this query job."""
        return typing.cast(QueryJobConfig, super().configuration)

    @property
    def connection_properties(self) -> List[ConnectionProperty]:
        """See
        :attr:`google.cloud.bigquery.job.QueryJobConfig.connection_properties`.

        .. versionadded:: 2.29.0
        """
        return self.configuration.connection_properties

    @property
    def create_disposition(self):
        """See
        :attr:`google.cloud.bigquery.job.QueryJobConfig.create_disposition`.
        """
        return self.configuration.create_disposition

    @property
    def create_session(self) -> Optional[bool]:
        """See
        :attr:`google.cloud.bigquery.job.QueryJobConfig.create_session`.

        .. versionadded:: 2.29.0
        """
        return self.configuration.create_session

    @property
    def default_dataset(self):
        """See
        :attr:`google.cloud.bigquery.job.QueryJobConfig.default_dataset`.
        """
        return self.configuration.default_dataset

    @property
    def destination(self):
        """See
        :attr:`google.cloud.bigquery.job.QueryJobConfig.destination`.
        """
        return self.configuration.destination

    @property
    def destination_encryption_configuration(self):
        """google.cloud.bigquery.encryption_configuration.EncryptionConfiguration: Custom
        encryption configuration for the destination table.

        Custom encryption configuration (e.g., Cloud KMS keys) or :data:`None`
        if using default encryption.

        See
        :attr:`google.cloud.bigquery.job.QueryJobConfig.destination_encryption_configuration`.
        """
        return self.configuration.destination_encryption_configuration

    @property
    def dry_run(self):
        """See
        :attr:`google.cloud.bigquery.job.QueryJobConfig.dry_run`.
        """
        return self.configuration.dry_run

    @property
    def flatten_results(self):
        """See
        :attr:`google.cloud.bigquery.job.QueryJobConfig.flatten_results`.
        """
        return self.configuration.flatten_results

    @property
    def priority(self):
        """See
        :attr:`google.cloud.bigquery.job.QueryJobConfig.priority`.
        """
        return self.configuration.priority

    @property
    def search_stats(self) -> Optional[SearchStats]:
        """Returns a SearchStats object."""

        stats = self._job_statistics().get("searchStatistics")
        if stats is not None:
            return SearchStats.from_api_repr(stats)
        return None

    @property
    def query(self):
        """str: The query text used in this query job.

        See:
        https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobConfigurationQuery.FIELDS.query
        """
        return _helpers._get_sub_prop(
            self._properties, ["configuration", "query", "query"]
        )

    @property
    def query_id(self) -> Optional[str]:
        """[Preview] ID of a completed query.

        This ID is auto-generated and not guaranteed to be populated.
        """
        query_results = self._query_results
        return query_results.query_id if query_results is not None else None

    @property
    def query_parameters(self):
        """See
        :attr:`google.cloud.bigquery.job.QueryJobConfig.query_parameters`.
        """
        return self.configuration.query_parameters

    @property
    def udf_resources(self):
        """See
        :attr:`google.cloud.bigquery.job.QueryJobConfig.udf_resources`.
        """
        return self.configuration.udf_resources

    @property
    def use_legacy_sql(self):
        """See
        :attr:`google.cloud.bigquery.job.QueryJobConfig.use_legacy_sql`.
        """
        return self.configuration.use_legacy_sql

    @property
    def use_query_cache(self):
        """See
        :attr:`google.cloud.bigquery.job.QueryJobConfig.use_query_cache`.
        """
        return self.configuration.use_query_cache

    @property
    def write_disposition(self):
        """See
        :attr:`google.cloud.bigquery.job.QueryJobConfig.write_disposition`.
        """
        return self.configuration.write_disposition

    @property
    def maximum_billing_tier(self):
        """See
        :attr:`google.cloud.bigquery.job.QueryJobConfig.maximum_billing_tier`.
        """
        return self.configuration.maximum_billing_tier

    @property
    def maximum_bytes_billed(self):
        """See
        :attr:`google.cloud.bigquery.job.QueryJobConfig.maximum_bytes_billed`.
        """
        return self.configuration.maximum_bytes_billed

    @property
    def range_partitioning(self):
        """See
        :attr:`google.cloud.bigquery.job.QueryJobConfig.range_partitioning`.
        """
        return self.configuration.range_partitioning

    @property
    def table_definitions(self):
        """See
        :attr:`google.cloud.bigquery.job.QueryJobConfig.table_definitions`.
        """
        return self.configuration.table_definitions

    @property
    def time_partitioning(self):
        """See
        :attr:`google.cloud.bigquery.job.QueryJobConfig.time_partitioning`.
        """
        return self.configuration.time_partitioning

    @property
    def clustering_fields(self):
        """See
        :attr:`google.cloud.bigquery.job.QueryJobConfig.clustering_fields`.
        """
        return self.configuration.clustering_fields

    @property
    def schema_update_options(self):
        """See
        :attr:`google.cloud.bigquery.job.QueryJobConfig.schema_update_options`.
        """
        return self.configuration.schema_update_options

    def to_api_repr(self):
        """Generate a resource for :meth:`_begin`."""
        # Use to_api_repr to allow for some configuration properties to be set
        # automatically.
        configuration = self.configuration.to_api_repr()
        return {
            "jobReference": self._properties["jobReference"],
            "configuration": configuration,
        }

    @classmethod
    def from_api_repr(cls, resource: dict, client: "Client") -> "QueryJob":
        """Factory: construct a job given its API representation

        Args:
            resource (Dict): job resource representation returned from the API

            client (google.cloud.bigquery.client.Client):
                Client which holds credentials and project
                configuration for the dataset.

        Returns:
            google.cloud.bigquery.job.QueryJob: Job parsed from ``resource``.
        """
        job_ref_properties = resource.setdefault(
            "jobReference", {"projectId": client.project, "jobId": None}
        )
        job_ref = _JobReference._from_api_repr(job_ref_properties)
        job = cls(job_ref, None, client=client)
        job._set_properties(resource)
        return job

    @property
    def query_plan(self):
        """Return query plan from job statistics, if present.

        See:
        https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobStatistics2.FIELDS.query_plan

        Returns:
            List[google.cloud.bigquery.job.QueryPlanEntry]:
                mappings describing the query plan, or an empty list
                if the query has not yet completed.
        """
        plan_entries = self._job_statistics().get("queryPlan", ())
        return [QueryPlanEntry.from_api_repr(entry) for entry in plan_entries]

    @property
    def schema(self) -> Optional[List[SchemaField]]:
        """The schema of the results.

        Present only for successful dry run of non-legacy SQL queries.
        """
        resource = self._job_statistics().get("schema")
        if resource is None:
            return None
        fields = resource.get("fields", [])
        return [SchemaField.from_api_repr(field) for field in fields]

    @property
    def timeline(self):
        """List[TimelineEntry]: Return the query execution timeline
        from job statistics.
        """
        raw = self._job_statistics().get("timeline", ())
        return [TimelineEntry.from_api_repr(entry) for entry in raw]

    @property
    def total_bytes_processed(self):
        """Return total bytes processed from job statistics, if present.

        See:
        https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobStatistics2.FIELDS.total_bytes_processed

        Returns:
            Optional[int]:
                Total bytes processed by the job, or None if job is not
                yet complete.
        """
        result = self._job_statistics().get("totalBytesProcessed")
        if result is not None:
            result = int(result)
        return result

    @property
    def total_bytes_billed(self):
        """Return total bytes billed from job statistics, if present.

        See:
        https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobStatistics2.FIELDS.total_bytes_billed

        Returns:
            Optional[int]:
                Total bytes billed for the job, or None if job is not
                yet complete.
1140 """
1141 result = self._job_statistics().get("totalBytesBilled")
1142 if result is not None:
1143 result = int(result)
1144 return result
1145
1146 @property
1147 def billing_tier(self):
1148 """Return billing tier from job statistics, if present.
1149
1150 See:
1151 https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobStatistics2.FIELDS.billing_tier
1152
1153 Returns:
1154 Optional[int]:
1155 Billing tier used by the job, or None if job is not
1156 yet complete.
1157 """
1158 return self._job_statistics().get("billingTier")
1159
1160 @property
1161 def cache_hit(self):
1162 """Return whether or not query results were served from cache.
1163
1164 See:
1165 https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobStatistics2.FIELDS.cache_hit
1166
1167 Returns:
1168 Optional[bool]:
1169 whether the query results were returned from cache, or None
1170 if job is not yet complete.
1171 """
1172 return self._job_statistics().get("cacheHit")
1173
1174 @property
1175 def ddl_operation_performed(self):
1176 """Optional[str]: Return the DDL operation performed.
1177
1178 See:
1179 https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobStatistics2.FIELDS.ddl_operation_performed
1180
1181 """
1182 return self._job_statistics().get("ddlOperationPerformed")
1183
1184 @property
1185 def ddl_target_routine(self):
1186 """Optional[google.cloud.bigquery.routine.RoutineReference]: Return the DDL target routine, present
1187 for CREATE/DROP FUNCTION/PROCEDURE queries.
1188
1189 See:
1190 https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobStatistics2.FIELDS.ddl_target_routine
1191 """
1192 prop = self._job_statistics().get("ddlTargetRoutine")
1193 if prop is not None:
1194 prop = RoutineReference.from_api_repr(prop)
1195 return prop
1196
1197 @property
1198 def ddl_target_table(self):
1199 """Optional[google.cloud.bigquery.table.TableReference]: Return the DDL target table, present
1200 for CREATE/DROP TABLE/VIEW queries.
1201
1202 See:
1203 https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobStatistics2.FIELDS.ddl_target_table
1204 """
1205 prop = self._job_statistics().get("ddlTargetTable")
1206 if prop is not None:
1207 prop = TableReference.from_api_repr(prop)
1208 return prop
1209
1210 @property
1211 def num_dml_affected_rows(self) -> Optional[int]:
1212 """Return the number of DML rows affected by the job.
1213
1214 See:
1215 https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobStatistics2.FIELDS.num_dml_affected_rows
1216
1217 Returns:
1218 Optional[int]:
1219 number of DML rows affected by the job, or None if job is not
1220 yet complete.
1221 """
1222 result = self._job_statistics().get("numDmlAffectedRows")
1223 if result is not None:
1224 result = int(result)
1225 return result
1226
1227 @property
1228 def slot_millis(self):
1229 """Union[int, None]: Slot-milliseconds used by this query job."""
1230 return _helpers._int_or_none(self._job_statistics().get("totalSlotMs"))
1231
1232 @property
1233 def statement_type(self):
1234 """Return statement type from job statistics, if present.
1235
1236 See:
1237 https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobStatistics2.FIELDS.statement_type
1238
1239 Returns:
1240 Optional[str]:
1241 type of statement used by the job, or None if job is not
1242 yet complete.
1243 """
1244 return self._job_statistics().get("statementType")
1245
1246 @property
1247 def referenced_tables(self):
1248 """Return referenced tables from job statistics, if present.
1249
1250 See:
1251 https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobStatistics2.FIELDS.referenced_tables
1252
        Returns:
            List[google.cloud.bigquery.table.TableReference]:
                References to tables used by the query, or an empty list
                if the query has not yet completed.
1257 """
1258 tables = []
1259 datasets_by_project_name = {}
1260
1261 for table in self._job_statistics().get("referencedTables", ()):
1262 t_project = table["projectId"]
1263
1264 ds_id = table["datasetId"]
1265 t_dataset = datasets_by_project_name.get((t_project, ds_id))
1266 if t_dataset is None:
1267 t_dataset = DatasetReference(t_project, ds_id)
1268 datasets_by_project_name[(t_project, ds_id)] = t_dataset
1269
1270 t_name = table["tableId"]
1271 tables.append(t_dataset.table(t_name))
1272
1273 return tables
1274
1275 @property
1276 def undeclared_query_parameters(self):
1277 """Return undeclared query parameters from job statistics, if present.
1278
1279 See:
1280 https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobStatistics2.FIELDS.undeclared_query_parameters
1281
1282 Returns:
1283 List[Union[ \
1284 google.cloud.bigquery.query.ArrayQueryParameter, \
1285 google.cloud.bigquery.query.ScalarQueryParameter, \
1286 google.cloud.bigquery.query.StructQueryParameter \
1287 ]]:
1288 Undeclared parameters, or an empty list if the query has
1289 not yet completed.
1290 """
1291 parameters = []
1292 undeclared = self._job_statistics().get("undeclaredQueryParameters", ())
1293
1294 for parameter in undeclared:
1295 p_type = parameter["parameterType"]
1296
1297 if "arrayType" in p_type:
1298 klass = ArrayQueryParameter
1299 elif "structTypes" in p_type:
1300 klass = StructQueryParameter
1301 else:
1302 klass = ScalarQueryParameter
1303
1304 parameters.append(klass.from_api_repr(parameter))
1305
1306 return parameters
1307
1308 @property
1309 def estimated_bytes_processed(self):
1310 """Return the estimated number of bytes processed by the query.
1311
1312 See:
1313 https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobStatistics2.FIELDS.estimated_bytes_processed
1314
        Returns:
            Optional[int]:
                estimated number of bytes processed by the job, or None if
                job is not yet complete.
1319 """
1320 result = self._job_statistics().get("estimatedBytesProcessed")
1321 if result is not None:
1322 result = int(result)
1323 return result
1324
1325 @property
1326 def dml_stats(self) -> Optional[DmlStats]:
1327 stats = self._job_statistics().get("dmlStats")
1328 if stats is None:
1329 return None
1330 else:
1331 return DmlStats.from_api_repr(stats)
1332
1333 @property
1334 def bi_engine_stats(self) -> Optional[BiEngineStats]:
1335 stats = self._job_statistics().get("biEngineStatistics")
1336
1337 if stats is None:
1338 return None
1339 else:
1340 return BiEngineStats.from_api_repr(stats)
1341
1342 def _blocking_poll(self, timeout=None, **kwargs):
1343 self._done_timeout = timeout
1344 self._transport_timeout = timeout
1345 super(QueryJob, self)._blocking_poll(timeout=timeout, **kwargs)
1346
1347 @staticmethod
1348 def _format_for_exception(message: str, query: str):
1349 """Format a query for the output in exception message.
1350
1351 Args:
1352 message (str): The original exception message.
1353 query (str): The SQL query to format.
1354
1355 Returns:
1356 str: A formatted query text.
1357 """
1358 template = "{message}\n\n{header}\n\n{ruler}\n{body}\n{ruler}"
1359
1360 lines = query.splitlines() if query is not None else [""]
1361 max_line_len = max(len(line) for line in lines)
1362
1363 header = "-----Query Job SQL Follows-----"
1364 header = "{:^{total_width}}".format(header, total_width=max_line_len + 5)
1365
1366 # Print out a "ruler" above and below the SQL so we can judge columns.
1367 # Left pad for the line numbers (4 digits plus ":").
1368 ruler = " |" + " . |" * (max_line_len // 10)
1369
1370 # Put line numbers next to the SQL.
1371 body = "\n".join(
1372 "{:4}:{}".format(n, line) for n, line in enumerate(lines, start=1)
1373 )
1374
1375 return template.format(message=message, header=header, ruler=ruler, body=body)
1376
1377 def _begin(self, client=None, retry=DEFAULT_RETRY, timeout=None):
1378 """API call: begin the job via a POST request
1379
1380 See
1381 https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/insert
1382
1383 Args:
            client (Optional[google.cloud.bigquery.client.Client]):
                The client to use. If not passed, falls back to the ``client``
                associated with the job object.
            retry (Optional[google.api_core.retry.Retry]):
                How to retry the RPC.
            timeout (Optional[float]):
                The number of seconds to wait for the underlying HTTP transport
                before using ``retry``.

        Raises:
            ValueError: If the job has already begun.
        """

        try:
            super(QueryJob, self)._begin(client=client, retry=retry, timeout=timeout)
        except exceptions.GoogleAPICallError as exc:
            exc.message = _EXCEPTION_FOOTER_TEMPLATE.format(
                message=exc.message, location=self.location, job_id=self.job_id
            )
            exc.debug_message = self._format_for_exception(exc.message, self.query)
            exc.query_job = self
            raise

    def _reload_query_results(
        self,
        retry: "retries.Retry" = DEFAULT_RETRY,
        timeout: Optional[float] = None,
        page_size: int = 0,
        start_index: Optional[int] = None,
    ):
        """Refresh the cached query results unless already cached and complete.

        Args:
            retry (Optional[google.api_core.retry.Retry]):
                How to retry the call that retrieves query results.
            timeout (Optional[float]):
                The number of seconds to wait for the underlying HTTP transport
                before using ``retry``.
            page_size (int):
                Maximum number of rows in a single response. See maxResults in
                the jobs.getQueryResults REST API.
            start_index (Optional[int]):
                Zero-based index of the starting row. See startIndex in the
                jobs.getQueryResults REST API.
        """
        # Optimization: avoid a call to jobs.getQueryResults if it's already
        # been fetched, e.g. from jobs.query first page of results.
        if self._query_results and self._query_results.complete:
            return

        # Since the API to getQueryResults can hang up to the timeout value
        # (default of 10 seconds), set the timeout parameter to ensure that
        # the timeout from the futures API is respected. See:
        # https://github.com/GoogleCloudPlatform/google-cloud-python/issues/4135
        timeout_ms = None

        # google-api-core, as part of a major rewrite of the deadline, timeout,
        # and retry process, sets the timeout value to a Python ``object()``
        # sentinel. Our system does not natively handle that and instead
        # expects either None or a numeric value. If passed a Python object,
        # convert to None.
        if type(self._done_timeout) is object:  # pragma: NO COVER
            self._done_timeout = None

        if self._done_timeout is not None:  # pragma: NO COVER
            # Subtract a buffer for context switching, network latency, etc.
            api_timeout = self._done_timeout - _TIMEOUT_BUFFER_SECS
            api_timeout = max(min(api_timeout, 10), 0)
            self._done_timeout -= api_timeout
            self._done_timeout = max(0, self._done_timeout)
            timeout_ms = int(api_timeout * 1000)

        # If an explicit timeout is not given, fall back to the transport timeout
        # stored in _blocking_poll() in the process of polling for job completion.
        if timeout is not None:
            transport_timeout = timeout
        else:
            transport_timeout = self._transport_timeout

        # Handle PollingJob._DEFAULT_VALUE.
        if not isinstance(transport_timeout, (float, int)):
            transport_timeout = None

        self._query_results = self._client._get_query_results(
            self.job_id,
            retry,
            project=self.project,
            timeout_ms=timeout_ms,
            location=self.location,
            timeout=transport_timeout,
            page_size=page_size,
            start_index=start_index,
        )

    def result(  # type: ignore  # (incompatible with supertype)
        self,
        page_size: Optional[int] = None,
        max_results: Optional[int] = None,
        retry: Optional[retries.Retry] = DEFAULT_RETRY,
        timeout: Optional[Union[float, object]] = POLLING_DEFAULT_VALUE,
        start_index: Optional[int] = None,
        job_retry: Optional[retries.Retry] = DEFAULT_JOB_RETRY,
    ) -> Union["RowIterator", _EmptyRowIterator]:
        """Start the job and wait for it to complete and get the result.

        Args:
            page_size (Optional[int]):
                The maximum number of rows in each page of results from this
                request. Non-positive values are ignored.
            max_results (Optional[int]):
                The maximum total number of rows from this request.
            retry (Optional[google.api_core.retry.Retry]):
                How to retry the call that retrieves rows. This only
                applies to making RPC calls. It isn't used to retry
                failed jobs. This has a reasonable default that
                should only be overridden with care. If the job state
                is ``DONE``, retrying is aborted early even if the
                results are not available, as this will not change
                anymore.
            timeout (Optional[Union[float, \
                google.api_core.future.polling.PollingFuture._DEFAULT_VALUE, \
            ]]):
                The number of seconds to wait for the underlying HTTP transport
                before using ``retry``. If ``None``, wait indefinitely
                unless an error is returned. If unset, only the
                underlying API calls have their default timeouts, but we still
                wait indefinitely for the job to finish.
            start_index (Optional[int]):
                The zero-based index of the starting row to read.
            job_retry (Optional[google.api_core.retry.Retry]):
                How to retry failed jobs. The default retries
                rate-limit-exceeded errors. Passing ``None`` disables
                job retry.

                Not all jobs can be retried. If ``job_id`` was
                provided to the query that created this job, then the
                job returned by the query will not be retryable, and
                an exception will be raised if non-``None``
                non-default ``job_retry`` is also provided.

        Returns:
            google.cloud.bigquery.table.RowIterator:
                Iterator of row data
                :class:`~google.cloud.bigquery.table.Row`-s. During each
                page, the iterator will have the ``total_rows`` attribute
                set, which counts the total number of rows **in the result
                set** (this is distinct from the total number of rows in the
                current page: ``iterator.page.num_items``).

                If the query is a special query that produces no results, e.g.
                a DDL query, an ``_EmptyRowIterator`` instance is returned.

        Raises:
            google.api_core.exceptions.GoogleAPICallError:
                If the job failed and retries aren't successful.
            concurrent.futures.TimeoutError:
                If the job did not complete in the given timeout.
            TypeError:
                If a non-``None`` and non-default ``job_retry`` is
                provided and the job is not retryable.
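
        Example (illustrative sketch; assumes ``client`` is an existing
        :class:`~google.cloud.bigquery.client.Client` instance)::

            job = client.query("SELECT name FROM `your-project.your_dataset.t`")
            for row in job.result(page_size=100, timeout=300.0):
                print(row["name"])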
1544 """
1545 # Note: Since waiting for a query job to finish is more complex than
1546 # refreshing the job state in a loop, we avoid calling the superclass
1547 # in this method.
1548
1549 if self.dry_run:
1550 return _EmptyRowIterator(
1551 project=self.project,
1552 location=self.location,
1553 # Intentionally omit job_id and query_id since this doesn't
1554 # actually correspond to a finished query job.
1555 )
1556
1557 # Setting max_results should be equivalent to setting page_size with
1558 # regards to allowing the user to tune how many results to download
1559 # while we wait for the query to finish. See internal issue:
1560 # 344008814. But if start_index is set, user is trying to access a
1561 # specific page, so we don't need to set page_size. See issue #1950.
1562 if page_size is None and max_results is not None and start_index is None:
1563 page_size = max_results
1564
1565 # When timeout has default sentinel value ``object()``, do not pass
1566 # anything to invoke default timeouts in subsequent calls.
1567 done_kwargs: Dict[str, Union[_helpers.TimeoutType, object]] = {}
1568 reload_query_results_kwargs: Dict[str, Union[_helpers.TimeoutType, object]] = {}
1569 list_rows_kwargs: Dict[str, Union[_helpers.TimeoutType, object]] = {}
1570 if type(timeout) is not object:
1571 done_kwargs["timeout"] = timeout
1572 list_rows_kwargs["timeout"] = timeout
1573 reload_query_results_kwargs["timeout"] = timeout
1574
1575 if page_size is not None:
1576 reload_query_results_kwargs["page_size"] = page_size
1577
1578 if start_index is not None:
1579 reload_query_results_kwargs["start_index"] = start_index
1580
1581 try:
1582 retry_do_query = getattr(self, "_retry_do_query", None)
1583 if retry_do_query is not None:
1584 if job_retry is DEFAULT_JOB_RETRY:
1585 job_retry = self._job_retry # type: ignore
1586 else:
1587 if job_retry is not None and job_retry is not DEFAULT_JOB_RETRY:
1588 raise TypeError(
1589 "`job_retry` was provided, but this job is"
1590 " not retryable, because a custom `job_id` was"
1591 " provided to the query that created this job."
1592 )
1593
1594 restart_query_job = False
1595
1596 def is_job_done():
1597 nonlocal restart_query_job
1598
1599 if restart_query_job:
1600 restart_query_job = False
1601
1602 # The original job has failed. Create a new one.
1603 #
1604 # Note that we won't get here if retry_do_query is
1605 # None, because we won't use a retry.
1606 job = retry_do_query()
1607
1608 # Become the new job:
1609 self.__dict__.clear()
1610 self.__dict__.update(job.__dict__)
1611
1612 # It's possible the job fails again and we'll have to
1613 # retry that too.
1614 self._retry_do_query = retry_do_query
1615 self._job_retry = job_retry
1616
1617 # If the job hasn't been created, create it now. Related:
1618 # https://github.com/googleapis/python-bigquery/issues/1940
1619 if self.state is None:
1620 self._begin(retry=retry, **done_kwargs)
1621
1622 # Refresh the job status with jobs.get because some of the
1623 # exceptions thrown by jobs.getQueryResults like timeout and
1624 # rateLimitExceeded errors are ambiguous. We want to know if
1625 # the query job failed and not just the call to
1626 # jobs.getQueryResults.
1627 if self.done(retry=retry, **done_kwargs):
1628 # If it's already failed, we might as well stop.
1629 job_failed_exception = self.exception()
1630 if job_failed_exception is not None:
1631 # Only try to restart the query job if the job failed for
1632 # a retriable reason. For example, don't restart the query
1633 # if the call to reload the job metadata within self.done()
1634 # timed out.
1635 #
1636 # The `restart_query_job` must only be called after a
1637 # successful call to the `jobs.get` REST API and we
1638 # determine that the job has failed.
1639 #
1640 # The `jobs.get` REST API
1641 # (https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/get)
1642 # is called via `self.done()` which calls
1643 # `self.reload()`.
1644 #
1645 # To determine if the job failed, the `self.exception()`
1646 # is set from `self.reload()` via
1647 # `self._set_properties()`, which translates the
1648 # `Job.status.errorResult` field
1649 # (https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobStatus.FIELDS.error_result)
1650 # into an exception that can be processed by the
1651 # `job_retry` predicate.
1652 restart_query_job = True
1653 raise job_failed_exception
1654 else:
1655 # Make sure that the _query_results are cached so we
1656 # can return a complete RowIterator.
1657 #
1658 # Note: As an optimization, _reload_query_results
1659 # doesn't make any API calls if the query results are
1660 # already cached and have jobComplete=True in the
1661 # response from the REST API. This ensures we aren't
1662 # making any extra API calls if the previous loop
1663 # iteration fetched the finished job.
1664 self._reload_query_results(
1665 retry=retry, **reload_query_results_kwargs
1666 )
1667 return True
1668
1669 # Call jobs.getQueryResults with max results set to 0 just to
1670 # wait for the query to finish. Unlike most methods,
1671 # jobs.getQueryResults hangs as long as it can to ensure we
1672 # know when the query has finished as soon as possible.
1673 self._reload_query_results(retry=retry, **reload_query_results_kwargs)
1674
1675 # Even if the query is finished now according to
1676 # jobs.getQueryResults, we'll want to reload the job status if
1677 # it's not already DONE.
1678 return False
1679
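            # Wrapping the polling closure in the job retry decorator means a
            # retriable job failure raised inside is_job_done() re-invokes it,
            # which re-issues the query via the restart_query_job logic above.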
1680 if retry_do_query is not None and job_retry is not None:
1681 is_job_done = job_retry(is_job_done)
1682
1683 # timeout can be a number of seconds, `None`, or a
1684 # `google.api_core.future.polling.PollingFuture._DEFAULT_VALUE`
1685 # sentinel object indicating a default timeout if we choose to add
1686 # one some day. This value can come from our PollingFuture
1687 # superclass and was introduced in
1688 # https://github.com/googleapis/python-api-core/pull/462.
1689 if isinstance(timeout, (float, int)):
1690 remaining_timeout = timeout
1691 else:
1692 # Note: we may need to handle _DEFAULT_VALUE as a separate
1693 # case someday, but even then the best we can do for queries
1694 # is 72+ hours for hyperparameter tuning jobs:
1695 # https://cloud.google.com/bigquery/quotas#query_jobs
1696 #
1697 # The timeout for a multi-statement query is 24+ hours. See:
1698 # https://cloud.google.com/bigquery/quotas#multi_statement_query_limits
1699 remaining_timeout = None
1700
1701 if remaining_timeout is None:
1702 # Since is_job_done() calls jobs.getQueryResults, which is a
1703 # long-running API, don't delay the next request at all.
1704 while not is_job_done():
1705 pass
1706 else:
1707 # Use a monotonic clock since we don't actually care about
1708 # daylight savings or similar, just the elapsed time.
1709 previous_time = time.monotonic()
1710
1711 while not is_job_done():
1712 current_time = time.monotonic()
1713 elapsed_time = current_time - previous_time
1714 remaining_timeout = remaining_timeout - elapsed_time
1715 previous_time = current_time
1716
1717 if remaining_timeout < 0:
1718 raise concurrent.futures.TimeoutError()
1719
1720 except exceptions.GoogleAPICallError as exc:
1721 exc.message = _EXCEPTION_FOOTER_TEMPLATE.format(
1722 message=exc.message, location=self.location, job_id=self.job_id
1723 )
1724 exc.debug_message = self._format_for_exception(exc.message, self.query) # type: ignore
1725 exc.query_job = self # type: ignore
1726 raise
1727 except requests.exceptions.Timeout as exc:
1728 raise concurrent.futures.TimeoutError from exc
1729
        # If the query job is complete but there are no query results, this was
        # a special job, such as a DDL query. Return an empty result set to
        # indicate success and avoid calling tabledata.list on a table which
        # can't be read (such as a view table).
1734 if self._query_results.total_rows is None:
1735 return _EmptyRowIterator(
1736 location=self.location,
1737 project=self.project,
1738 job_id=self.job_id,
1739 query_id=self.query_id,
1740 num_dml_affected_rows=self._query_results.num_dml_affected_rows,
1741 )
1742
1743 # We know that there's at least 1 row, so only treat the response from
1744 # jobs.getQueryResults / jobs.query as the first page of the
1745 # RowIterator response if there are any rows in it. This prevents us
1746 # from stopping the iteration early in the cases where we set
1747 # maxResults=0. In that case, we're missing rows and there's no next
1748 # page token.
1749 first_page_response = self._query_results._properties
1750 if "rows" not in first_page_response:
1751 first_page_response = None
1752
1753 rows = self._client._list_rows_from_query_results(
1754 self.job_id,
1755 self.location,
1756 self.project,
1757 self._query_results.schema,
1758 total_rows=self._query_results.total_rows,
1759 destination=self.destination,
1760 page_size=page_size,
1761 max_results=max_results,
1762 start_index=start_index,
1763 retry=retry,
1764 query_id=self.query_id,
1765 first_page_response=first_page_response,
1766 num_dml_affected_rows=self._query_results.num_dml_affected_rows,
1767 query=self.query,
1768 total_bytes_processed=self.total_bytes_processed,
1769 slot_millis=self.slot_millis,
1770 **list_rows_kwargs,
1771 )
1772 rows._preserve_order = _contains_order_by(self.query)
1773 return rows
1774
1775 # If changing the signature of this method, make sure to apply the same
1776 # changes to table.RowIterator.to_arrow(), except for the max_results parameter
1777 # that should only exist here in the QueryJob method.
1778 def to_arrow(
1779 self,
1780 progress_bar_type: Optional[str] = None,
1781 bqstorage_client: Optional["bigquery_storage.BigQueryReadClient"] = None,
1782 create_bqstorage_client: bool = True,
1783 max_results: Optional[int] = None,
1784 ) -> "pyarrow.Table":
        """[Beta] Create a :class:`pyarrow.Table` by loading all pages of a
        table or query.
1787
1788 Args:
1789 progress_bar_type (Optional[str]):
1790 If set, use the `tqdm <https://tqdm.github.io/>`_ library to
1791 display a progress bar while the data downloads. Install the
1792 ``tqdm`` package to use this feature.
1793
1794 Possible values of ``progress_bar_type`` include:
1795
1796 ``None``
1797 No progress bar.
1798 ``'tqdm'``
1799 Use the :func:`tqdm.tqdm` function to print a progress bar
1800 to :data:`sys.stdout`.
1801 ``'tqdm_notebook'``
1802 Use the :func:`tqdm.notebook.tqdm` function to display a
1803 progress bar as a Jupyter notebook widget.
1804 ``'tqdm_gui'``
1805 Use the :func:`tqdm.tqdm_gui` function to display a
1806 progress bar as a graphical dialog box.
1807 bqstorage_client (Optional[google.cloud.bigquery_storage_v1.BigQueryReadClient]):
1808 A BigQuery Storage API client. If supplied, use the faster
1809 BigQuery Storage API to fetch rows from BigQuery. This API
1810 is a billable API.
1811
                This method requires the ``google-cloud-bigquery-storage`` library.
1813
1814 Reading from a specific partition or snapshot is not
1815 currently supported by this method.
1816 create_bqstorage_client (Optional[bool]):
1817 If ``True`` (default), create a BigQuery Storage API client
1818 using the default API settings. The BigQuery Storage API
1819 is a faster way to fetch rows from BigQuery. See the
1820 ``bqstorage_client`` parameter for more information.
1821
1822 This argument does nothing if ``bqstorage_client`` is supplied.
1823
1824 .. versionadded:: 1.24.0
1825
1826 max_results (Optional[int]):
1827 Maximum number of rows to include in the result. No limit by default.
1828
1829 .. versionadded:: 2.21.0
1830
1831 Returns:
1832 pyarrow.Table
1833 A :class:`pyarrow.Table` populated with row data and column
1834 headers from the query results. The column headers are derived
1835 from the destination table's schema.
1836
1837 Raises:
1838 ValueError:
1839 If the :mod:`pyarrow` library cannot be imported.
1840
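        Examples:
            A minimal usage sketch, assuming ``pyarrow`` is installed and
            default credentials are available (the query text is
            illustrative):

            .. code-block:: python

                from google.cloud import bigquery

                client = bigquery.Client()
                job = client.query("SELECT 17 AS answer")
                arrow_table = job.to_arrow()  # waits for the query to finish
                print(arrow_table.num_rows, arrow_table.column_names)
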
1841 .. versionadded:: 1.17.0
1842 """
1843 query_result = wait_for_query(self, progress_bar_type, max_results=max_results)
1844 return query_result.to_arrow(
1845 progress_bar_type=progress_bar_type,
1846 bqstorage_client=bqstorage_client,
1847 create_bqstorage_client=create_bqstorage_client,
1848 )
1849
1850 # If changing the signature of this method, make sure to apply the same
1851 # changes to table.RowIterator.to_dataframe(), except for the max_results parameter
1852 # that should only exist here in the QueryJob method.
1853 def to_dataframe(
1854 self,
1855 bqstorage_client: Optional["bigquery_storage.BigQueryReadClient"] = None,
1856 dtypes: Optional[Dict[str, Any]] = None,
1857 progress_bar_type: Optional[str] = None,
1858 create_bqstorage_client: bool = True,
1859 max_results: Optional[int] = None,
1860 geography_as_object: bool = False,
1861 bool_dtype: Union[Any, None] = DefaultPandasDTypes.BOOL_DTYPE,
1862 int_dtype: Union[Any, None] = DefaultPandasDTypes.INT_DTYPE,
1863 float_dtype: Union[Any, None] = None,
1864 string_dtype: Union[Any, None] = None,
1865 date_dtype: Union[Any, None] = DefaultPandasDTypes.DATE_DTYPE,
1866 datetime_dtype: Union[Any, None] = None,
1867 time_dtype: Union[Any, None] = DefaultPandasDTypes.TIME_DTYPE,
1868 timestamp_dtype: Union[Any, None] = None,
1869 range_date_dtype: Union[Any, None] = DefaultPandasDTypes.RANGE_DATE_DTYPE,
1870 range_datetime_dtype: Union[
1871 Any, None
1872 ] = DefaultPandasDTypes.RANGE_DATETIME_DTYPE,
1873 range_timestamp_dtype: Union[
1874 Any, None
1875 ] = DefaultPandasDTypes.RANGE_TIMESTAMP_DTYPE,
1876 ) -> "pandas.DataFrame":
        """Return a pandas DataFrame from a QueryJob.
1878
1879 Args:
1880 bqstorage_client (Optional[google.cloud.bigquery_storage_v1.BigQueryReadClient]):
1881 A BigQuery Storage API client. If supplied, use the faster
1882 BigQuery Storage API to fetch rows from BigQuery. This
1883 API is a billable API.
1884
1885 This method requires the ``fastavro`` and
1886 ``google-cloud-bigquery-storage`` libraries.
1887
1888 Reading from a specific partition or snapshot is not
1889 currently supported by this method.
1890
1891 dtypes (Optional[Map[str, Union[str, pandas.Series.dtype]]]):
                A dictionary of column names and pandas ``dtype``s. The provided
1893 ``dtype`` is used when constructing the series for the column
1894 specified. Otherwise, the default pandas behavior is used.
1895
1896 progress_bar_type (Optional[str]):
1897 If set, use the `tqdm <https://tqdm.github.io/>`_ library to
1898 display a progress bar while the data downloads. Install the
1899 ``tqdm`` package to use this feature.
1900
1901 See
1902 :func:`~google.cloud.bigquery.table.RowIterator.to_dataframe`
1903 for details.
1904
1905 .. versionadded:: 1.11.0
1906 create_bqstorage_client (Optional[bool]):
1907 If ``True`` (default), create a BigQuery Storage API client
1908 using the default API settings. The BigQuery Storage API
1909 is a faster way to fetch rows from BigQuery. See the
1910 ``bqstorage_client`` parameter for more information.
1911
1912 This argument does nothing if ``bqstorage_client`` is supplied.
1913
1914 .. versionadded:: 1.24.0
1915
1916 max_results (Optional[int]):
1917 Maximum number of rows to include in the result. No limit by default.
1918
1919 .. versionadded:: 2.21.0
1920
1921 geography_as_object (Optional[bool]):
1922 If ``True``, convert GEOGRAPHY data to :mod:`shapely`
1923 geometry objects. If ``False`` (default), don't cast
1924 geography data to :mod:`shapely` geometry objects.
1925
1926 .. versionadded:: 2.24.0
1927
1928 bool_dtype (Optional[pandas.Series.dtype, None]):
1929 If set, indicate a pandas ExtensionDtype (e.g. ``pandas.BooleanDtype()``)
1930 to convert BigQuery Boolean type, instead of relying on the default
1931 ``pandas.BooleanDtype()``. If you explicitly set the value to ``None``,
1932 then the data type will be ``numpy.dtype("bool")``. BigQuery Boolean
1933 type can be found at:
1934 https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#boolean_type
1935
1936 .. versionadded:: 3.8.0
1937
1938 int_dtype (Optional[pandas.Series.dtype, None]):
1939 If set, indicate a pandas ExtensionDtype (e.g. ``pandas.Int64Dtype()``)
1940 to convert BigQuery Integer types, instead of relying on the default
1941 ``pandas.Int64Dtype()``. If you explicitly set the value to ``None``,
1942 then the data type will be ``numpy.dtype("int64")``. A list of BigQuery
1943 Integer types can be found at:
1944 https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#integer_types
1945
1946 .. versionadded:: 3.8.0
1947
1948 float_dtype (Optional[pandas.Series.dtype, None]):
1949 If set, indicate a pandas ExtensionDtype (e.g. ``pandas.Float32Dtype()``)
1950 to convert BigQuery Float type, instead of relying on the default
1951 ``numpy.dtype("float64")``. If you explicitly set the value to ``None``,
1952 then the data type will be ``numpy.dtype("float64")``. BigQuery Float
1953 type can be found at:
1954 https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#floating_point_types
1955
1956 .. versionadded:: 3.8.0
1957
1958 string_dtype (Optional[pandas.Series.dtype, None]):
1959 If set, indicate a pandas ExtensionDtype (e.g. ``pandas.StringDtype()``) to
1960 convert BigQuery String type, instead of relying on the default
1961 ``numpy.dtype("object")``. If you explicitly set the value to ``None``,
1962 then the data type will be ``numpy.dtype("object")``. BigQuery String
1963 type can be found at:
1964 https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#string_type
1965
1966 .. versionadded:: 3.8.0
1967
1968 date_dtype (Optional[pandas.Series.dtype, None]):
1969 If set, indicate a pandas ExtensionDtype (e.g.
1970 ``pandas.ArrowDtype(pyarrow.date32())``) to convert BigQuery Date
1971 type, instead of relying on the default ``db_dtypes.DateDtype()``.
                If you explicitly set the value to ``None``, then the data type will be
                ``numpy.dtype("datetime64[ns]")`` or ``object`` if out of bounds. BigQuery
1974 Date type can be found at:
1975 https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#date_type
1976
1977 .. versionadded:: 3.10.0
1978
1979 datetime_dtype (Optional[pandas.Series.dtype, None]):
1980 If set, indicate a pandas ExtensionDtype (e.g.
1981 ``pandas.ArrowDtype(pyarrow.timestamp("us"))``) to convert BigQuery Datetime
                type, instead of relying on the default ``numpy.dtype("datetime64[ns]")``.
                If you explicitly set the value to ``None``, then the data type will be
                ``numpy.dtype("datetime64[ns]")`` or ``object`` if out of bounds. BigQuery
1985 Datetime type can be found at:
1986 https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#datetime_type
1987
1988 .. versionadded:: 3.10.0
1989
1990 time_dtype (Optional[pandas.Series.dtype, None]):
1991 If set, indicate a pandas ExtensionDtype (e.g.
1992 ``pandas.ArrowDtype(pyarrow.time64("us"))``) to convert BigQuery Time
1993 type, instead of relying on the default ``db_dtypes.TimeDtype()``.
1994 If you explicitly set the value to ``None``, then the data type will be
1995 ``numpy.dtype("object")``. BigQuery Time type can be found at:
1996 https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#time_type
1997
1998 .. versionadded:: 3.10.0
1999
2000 timestamp_dtype (Optional[pandas.Series.dtype, None]):
2001 If set, indicate a pandas ExtensionDtype (e.g.
2002 ``pandas.ArrowDtype(pyarrow.timestamp("us", tz="UTC"))``) to convert BigQuery Timestamp
2003 type, instead of relying on the default ``numpy.dtype("datetime64[ns, UTC]")``.
                If you explicitly set the value to ``None``, then the data type will be
                ``numpy.dtype("datetime64[ns, UTC]")`` or ``object`` if out of bounds. BigQuery
                Timestamp type can be found at:
2007 https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#timestamp_type
2008
2009 .. versionadded:: 3.10.0
2010
2011 range_date_dtype (Optional[pandas.Series.dtype, None]):
2012 If set, indicate a pandas ExtensionDtype, such as:
2013
2014 .. code-block:: python
2015
2016 pandas.ArrowDtype(pyarrow.struct(
2017 [("start", pyarrow.date32()), ("end", pyarrow.date32())]
2018 ))
2019
2020 to convert BigQuery RANGE<DATE> type, instead of relying on
2021 the default ``object``. If you explicitly set the value to
2022 ``None``, the data type will be ``object``. BigQuery Range type
2023 can be found at:
2024 https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#range_type
2025
2026 .. versionadded:: 3.21.0
2027
2028 range_datetime_dtype (Optional[pandas.Series.dtype, None]):
2029 If set, indicate a pandas ExtensionDtype, such as:
2030
2031 .. code-block:: python
2032
2033 pandas.ArrowDtype(pyarrow.struct(
2034 [
2035 ("start", pyarrow.timestamp("us")),
2036 ("end", pyarrow.timestamp("us")),
2037 ]
2038 ))
2039
2040 to convert BigQuery RANGE<DATETIME> type, instead of relying on
2041 the default ``object``. If you explicitly set the value to
2042 ``None``, the data type will be ``object``. BigQuery Range type
2043 can be found at:
2044 https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#range_type
2045
2046 .. versionadded:: 3.21.0
2047
2048 range_timestamp_dtype (Optional[pandas.Series.dtype, None]):
2049 If set, indicate a pandas ExtensionDtype, such as:
2050
2051 .. code-block:: python
2052
2053 pandas.ArrowDtype(pyarrow.struct(
2054 [
2055 ("start", pyarrow.timestamp("us", tz="UTC")),
2056 ("end", pyarrow.timestamp("us", tz="UTC")),
2057 ]
2058 ))
2059
2060 to convert BigQuery RANGE<TIMESTAMP> type, instead of relying
2061 on the default ``object``. If you explicitly set the value to
2062 ``None``, the data type will be ``object``. BigQuery Range type
2063 can be found at:
2064 https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#range_type
2065
2066 .. versionadded:: 3.21.0
2067
2068 Returns:
2069 pandas.DataFrame:
2070 A :class:`~pandas.DataFrame` populated with row data
2071 and column headers from the query results. The column
2072 headers are derived from the destination table's
2073 schema.
2074
2075 Raises:
2076 ValueError:
2077 If the :mod:`pandas` library cannot be imported, or
2078 the :mod:`google.cloud.bigquery_storage_v1` module is
2079 required but cannot be imported. Also if
2080 `geography_as_object` is `True`, but the
2081 :mod:`shapely` library cannot be imported.
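
        Examples:
            A minimal sketch, assuming ``pandas`` and ``db-dtypes`` are
            installed and default credentials are available (the query text
            is illustrative):

            .. code-block:: python

                from google.cloud import bigquery

                client = bigquery.Client()
                job = client.query("SELECT 1 AS x, 'a' AS y")
                df = job.to_dataframe()  # waits for the query to finish
                print(df.dtypes)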
2082 """
2083 query_result = wait_for_query(self, progress_bar_type, max_results=max_results)
2084 return query_result.to_dataframe(
2085 bqstorage_client=bqstorage_client,
2086 dtypes=dtypes,
2087 progress_bar_type=progress_bar_type,
2088 create_bqstorage_client=create_bqstorage_client,
2089 geography_as_object=geography_as_object,
2090 bool_dtype=bool_dtype,
2091 int_dtype=int_dtype,
2092 float_dtype=float_dtype,
2093 string_dtype=string_dtype,
2094 date_dtype=date_dtype,
2095 datetime_dtype=datetime_dtype,
2096 time_dtype=time_dtype,
2097 timestamp_dtype=timestamp_dtype,
2098 range_date_dtype=range_date_dtype,
2099 range_datetime_dtype=range_datetime_dtype,
2100 range_timestamp_dtype=range_timestamp_dtype,
2101 )
2102
2103 # If changing the signature of this method, make sure to apply the same
    # changes to table.RowIterator.to_geodataframe(), except for the max_results parameter
2105 # that should only exist here in the QueryJob method.
2106 def to_geodataframe(
2107 self,
2108 bqstorage_client: Optional["bigquery_storage.BigQueryReadClient"] = None,
2109 dtypes: Optional[Dict[str, Any]] = None,
2110 progress_bar_type: Optional[str] = None,
2111 create_bqstorage_client: bool = True,
2112 max_results: Optional[int] = None,
2113 geography_column: Optional[str] = None,
2114 bool_dtype: Union[Any, None] = DefaultPandasDTypes.BOOL_DTYPE,
2115 int_dtype: Union[Any, None] = DefaultPandasDTypes.INT_DTYPE,
2116 float_dtype: Union[Any, None] = None,
2117 string_dtype: Union[Any, None] = None,
2118 ) -> "geopandas.GeoDataFrame":
        """Return a GeoPandas GeoDataFrame from a QueryJob.
2120
2121 Args:
2122 bqstorage_client (Optional[google.cloud.bigquery_storage_v1.BigQueryReadClient]):
2123 A BigQuery Storage API client. If supplied, use the faster
2124 BigQuery Storage API to fetch rows from BigQuery. This
2125 API is a billable API.
2126
2127 This method requires the ``fastavro`` and
2128 ``google-cloud-bigquery-storage`` libraries.
2129
2130 Reading from a specific partition or snapshot is not
2131 currently supported by this method.
2132
2133 dtypes (Optional[Map[str, Union[str, pandas.Series.dtype]]]):
                A dictionary of column names and pandas ``dtype``s. The provided
2135 ``dtype`` is used when constructing the series for the column
2136 specified. Otherwise, the default pandas behavior is used.
2137
2138 progress_bar_type (Optional[str]):
2139 If set, use the `tqdm <https://tqdm.github.io/>`_ library to
2140 display a progress bar while the data downloads. Install the
2141 ``tqdm`` package to use this feature.
2142
2143 See
2144 :func:`~google.cloud.bigquery.table.RowIterator.to_dataframe`
2145 for details.
2146
2147 .. versionadded:: 1.11.0
2148 create_bqstorage_client (Optional[bool]):
2149 If ``True`` (default), create a BigQuery Storage API client
2150 using the default API settings. The BigQuery Storage API
2151 is a faster way to fetch rows from BigQuery. See the
2152 ``bqstorage_client`` parameter for more information.
2153
2154 This argument does nothing if ``bqstorage_client`` is supplied.
2155
2156 .. versionadded:: 1.24.0
2157
2158 max_results (Optional[int]):
2159 Maximum number of rows to include in the result. No limit by default.
2160
2161 .. versionadded:: 2.21.0
2162
2163 geography_column (Optional[str]):
                If there is more than one GEOGRAPHY column, identifies
                which one to use to construct a GeoPandas GeoDataFrame.
                This option can be omitted if there's only one GEOGRAPHY
                column.
2168 bool_dtype (Optional[pandas.Series.dtype, None]):
2169 If set, indicate a pandas ExtensionDtype (e.g. ``pandas.BooleanDtype()``)
2170 to convert BigQuery Boolean type, instead of relying on the default
2171 ``pandas.BooleanDtype()``. If you explicitly set the value to ``None``,
2172 then the data type will be ``numpy.dtype("bool")``. BigQuery Boolean
2173 type can be found at:
2174 https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#boolean_type
2175 int_dtype (Optional[pandas.Series.dtype, None]):
2176 If set, indicate a pandas ExtensionDtype (e.g. ``pandas.Int64Dtype()``)
2177 to convert BigQuery Integer types, instead of relying on the default
2178 ``pandas.Int64Dtype()``. If you explicitly set the value to ``None``,
2179 then the data type will be ``numpy.dtype("int64")``. A list of BigQuery
2180 Integer types can be found at:
2181 https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#integer_types
2182 float_dtype (Optional[pandas.Series.dtype, None]):
2183 If set, indicate a pandas ExtensionDtype (e.g. ``pandas.Float32Dtype()``)
2184 to convert BigQuery Float type, instead of relying on the default
2185 ``numpy.dtype("float64")``. If you explicitly set the value to ``None``,
2186 then the data type will be ``numpy.dtype("float64")``. BigQuery Float
2187 type can be found at:
2188 https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#floating_point_types
2189 string_dtype (Optional[pandas.Series.dtype, None]):
2190 If set, indicate a pandas ExtensionDtype (e.g. ``pandas.StringDtype()``) to
2191 convert BigQuery String type, instead of relying on the default
2192 ``numpy.dtype("object")``. If you explicitly set the value to ``None``,
2193 then the data type will be ``numpy.dtype("object")``. BigQuery String
2194 type can be found at:
2195 https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#string_type
2196
2197 Returns:
2198 geopandas.GeoDataFrame:
2199 A :class:`geopandas.GeoDataFrame` populated with row
2200 data and column headers from the query results. The
2201 column headers are derived from the destination
2202 table's schema.
2203
2204 Raises:
2205 ValueError:
2206 If the :mod:`geopandas` library cannot be imported, or the
2207 :mod:`google.cloud.bigquery_storage_v1` module is
2208 required but cannot be imported.
2209
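        Examples:
            A minimal sketch, assuming ``geopandas`` is installed and default
            credentials are available (the query text is illustrative):

            .. code-block:: python

                from google.cloud import bigquery

                client = bigquery.Client()
                job = client.query(
                    "SELECT ST_GEOGPOINT(-122.084, 37.422) AS location"
                )
                gdf = job.to_geodataframe()  # waits for the query to finish
                print(gdf.crs)
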
2210 .. versionadded:: 2.24.0
2211 """
2212 query_result = wait_for_query(self, progress_bar_type, max_results=max_results)
2213 return query_result.to_geodataframe(
2214 bqstorage_client=bqstorage_client,
2215 dtypes=dtypes,
2216 progress_bar_type=progress_bar_type,
2217 create_bqstorage_client=create_bqstorage_client,
2218 geography_column=geography_column,
2219 bool_dtype=bool_dtype,
2220 int_dtype=int_dtype,
2221 float_dtype=float_dtype,
2222 string_dtype=string_dtype,
2223 )
2224
2225 def __iter__(self):
2226 return iter(self.result())
2227
2228
2229class QueryPlanEntryStep(object):
2230 """Map a single step in a query plan entry.
2231
2232 Args:
2233 kind (str): step type.
2234 substeps (List): names of substeps.
2235 """
2236
2237 def __init__(self, kind, substeps):
2238 self.kind = kind
2239 self.substeps = list(substeps)
2240
2241 @classmethod
2242 def from_api_repr(cls, resource: dict) -> "QueryPlanEntryStep":
2243 """Factory: construct instance from the JSON repr.
2244
2245 Args:
2246 resource (Dict): JSON representation of the entry.
2247
2248 Returns:
2249 google.cloud.bigquery.job.QueryPlanEntryStep:
2250 New instance built from the resource.
2251 """
2252 return cls(kind=resource.get("kind"), substeps=resource.get("substeps", ()))
2253
2254 def __eq__(self, other):
2255 if not isinstance(other, self.__class__):
2256 return NotImplemented
2257 return self.kind == other.kind and self.substeps == other.substeps
2258
2259
2260class QueryPlanEntry(object):
2261 """QueryPlanEntry represents a single stage of a query execution plan.
2262
2263 See
2264 https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#ExplainQueryStage
2265 for the underlying API representation within query statistics.
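
    Example:
        A short sketch of inspecting a finished job's plan via the
        ``QueryJob.query_plan`` property:

        .. code-block:: python

            for stage in job.query_plan:  # job: a completed QueryJob
                print(stage.name, stage.slot_ms, stage.records_read)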
2266 """
2267
2268 def __init__(self):
2269 self._properties = {}
2270
2271 @classmethod
2272 def from_api_repr(cls, resource: dict) -> "QueryPlanEntry":
2273 """Factory: construct instance from the JSON repr.
2274
2275 Args:
            resource (Dict[str, object]):
2277 ExplainQueryStage representation returned from API.
2278
2279 Returns:
2280 google.cloud.bigquery.job.QueryPlanEntry:
2281 Query plan entry parsed from ``resource``.
2282 """
2283 entry = cls()
2284 entry._properties = resource
2285 return entry
2286
2287 @property
2288 def name(self):
2289 """Optional[str]: Human-readable name of the stage."""
2290 return self._properties.get("name")
2291
2292 @property
2293 def entry_id(self):
2294 """Optional[str]: Unique ID for the stage within the plan."""
2295 return self._properties.get("id")
2296
2297 @property
2298 def start(self):
2299 """Optional[Datetime]: Datetime when the stage started."""
2300 if self._properties.get("startMs") is None:
2301 return None
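        # startMs is reported by the API in milliseconds, but the helper
        # expects microseconds, hence the factor of 1000.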
2302 return _helpers._datetime_from_microseconds(
2303 int(self._properties.get("startMs")) * 1000.0
2304 )
2305
2306 @property
2307 def end(self):
2308 """Optional[Datetime]: Datetime when the stage ended."""
2309 if self._properties.get("endMs") is None:
2310 return None
2311 return _helpers._datetime_from_microseconds(
2312 int(self._properties.get("endMs")) * 1000.0
2313 )
2314
2315 @property
2316 def input_stages(self):
2317 """List(int): Entry IDs for stages that were inputs for this stage."""
2318 if self._properties.get("inputStages") is None:
2319 return []
2320 return [
2321 _helpers._int_or_none(entry)
2322 for entry in self._properties.get("inputStages")
2323 ]
2324
2325 @property
2326 def parallel_inputs(self):
2327 """Optional[int]: Number of parallel input segments within
2328 the stage.
2329 """
2330 return _helpers._int_or_none(self._properties.get("parallelInputs"))
2331
2332 @property
2333 def completed_parallel_inputs(self):
2334 """Optional[int]: Number of parallel input segments completed."""
2335 return _helpers._int_or_none(self._properties.get("completedParallelInputs"))
2336
2337 @property
2338 def wait_ms_avg(self):
2339 """Optional[int]: Milliseconds the average worker spent waiting to
2340 be scheduled.
2341 """
2342 return _helpers._int_or_none(self._properties.get("waitMsAvg"))
2343
2344 @property
2345 def wait_ms_max(self):
2346 """Optional[int]: Milliseconds the slowest worker spent waiting to
2347 be scheduled.
2348 """
2349 return _helpers._int_or_none(self._properties.get("waitMsMax"))
2350
2351 @property
2352 def wait_ratio_avg(self):
2353 """Optional[float]: Ratio of time the average worker spent waiting
2354 to be scheduled, relative to the longest time spent by any worker in
2355 any stage of the overall plan.
2356 """
2357 return self._properties.get("waitRatioAvg")
2358
2359 @property
2360 def wait_ratio_max(self):
2361 """Optional[float]: Ratio of time the slowest worker spent waiting
2362 to be scheduled, relative to the longest time spent by any worker in
2363 any stage of the overall plan.
2364 """
2365 return self._properties.get("waitRatioMax")
2366
2367 @property
2368 def read_ms_avg(self):
2369 """Optional[int]: Milliseconds the average worker spent reading
2370 input.
2371 """
2372 return _helpers._int_or_none(self._properties.get("readMsAvg"))
2373
2374 @property
2375 def read_ms_max(self):
2376 """Optional[int]: Milliseconds the slowest worker spent reading
2377 input.
2378 """
2379 return _helpers._int_or_none(self._properties.get("readMsMax"))
2380
2381 @property
2382 def read_ratio_avg(self):
2383 """Optional[float]: Ratio of time the average worker spent reading
2384 input, relative to the longest time spent by any worker in any stage
2385 of the overall plan.
2386 """
2387 return self._properties.get("readRatioAvg")
2388
2389 @property
2390 def read_ratio_max(self):
        """Optional[float]: Ratio of time the slowest worker spent reading
        input, relative to the longest time spent by any worker in any stage
        of the overall plan.
2394 """
2395 return self._properties.get("readRatioMax")
2396
2397 @property
2398 def compute_ms_avg(self):
2399 """Optional[int]: Milliseconds the average worker spent on CPU-bound
2400 processing.
2401 """
2402 return _helpers._int_or_none(self._properties.get("computeMsAvg"))
2403
2404 @property
2405 def compute_ms_max(self):
2406 """Optional[int]: Milliseconds the slowest worker spent on CPU-bound
2407 processing.
2408 """
2409 return _helpers._int_or_none(self._properties.get("computeMsMax"))
2410
2411 @property
2412 def compute_ratio_avg(self):
2413 """Optional[float]: Ratio of time the average worker spent on
2414 CPU-bound processing, relative to the longest time spent by any
2415 worker in any stage of the overall plan.
2416 """
2417 return self._properties.get("computeRatioAvg")
2418
2419 @property
2420 def compute_ratio_max(self):
2421 """Optional[float]: Ratio of time the slowest worker spent on
2422 CPU-bound processing, relative to the longest time spent by any
2423 worker in any stage of the overall plan.
2424 """
2425 return self._properties.get("computeRatioMax")
2426
2427 @property
2428 def write_ms_avg(self):
2429 """Optional[int]: Milliseconds the average worker spent writing
2430 output data.
2431 """
2432 return _helpers._int_or_none(self._properties.get("writeMsAvg"))
2433
2434 @property
2435 def write_ms_max(self):
2436 """Optional[int]: Milliseconds the slowest worker spent writing
2437 output data.
2438 """
2439 return _helpers._int_or_none(self._properties.get("writeMsMax"))
2440
2441 @property
2442 def write_ratio_avg(self):
2443 """Optional[float]: Ratio of time the average worker spent writing
2444 output data, relative to the longest time spent by any worker in any
2445 stage of the overall plan.
2446 """
2447 return self._properties.get("writeRatioAvg")
2448
2449 @property
2450 def write_ratio_max(self):
2451 """Optional[float]: Ratio of time the slowest worker spent writing
2452 output data, relative to the longest time spent by any worker in any
2453 stage of the overall plan.
2454 """
2455 return self._properties.get("writeRatioMax")
2456
2457 @property
2458 def records_read(self):
2459 """Optional[int]: Number of records read by this stage."""
2460 return _helpers._int_or_none(self._properties.get("recordsRead"))
2461
2462 @property
2463 def records_written(self):
2464 """Optional[int]: Number of records written by this stage."""
2465 return _helpers._int_or_none(self._properties.get("recordsWritten"))
2466
2467 @property
2468 def status(self):
        """Optional[str]: Status of this stage."""
2470 return self._properties.get("status")
2471
2472 @property
2473 def shuffle_output_bytes(self):
2474 """Optional[int]: Number of bytes written by this stage to
2475 intermediate shuffle.
2476 """
2477 return _helpers._int_or_none(self._properties.get("shuffleOutputBytes"))
2478
2479 @property
2480 def shuffle_output_bytes_spilled(self):
2481 """Optional[int]: Number of bytes written by this stage to
2482 intermediate shuffle and spilled to disk.
2483 """
2484 return _helpers._int_or_none(self._properties.get("shuffleOutputBytesSpilled"))
2485
2486 @property
2487 def steps(self):
2488 """List(QueryPlanEntryStep): List of step operations performed by
2489 each worker in the stage.
2490 """
2491 return [
2492 QueryPlanEntryStep.from_api_repr(step)
2493 for step in self._properties.get("steps", [])
2494 ]
2495
2496 @property
2497 def slot_ms(self):
2498 """Optional[int]: Slot-milliseconds used by the stage."""
2499 return _helpers._int_or_none(self._properties.get("slotMs"))
2500
2501
2502class TimelineEntry(object):
2503 """TimelineEntry represents progress of a query job at a particular
2504 point in time.
2505
2506 See
2507 https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#querytimelinesample
2508 for the underlying API representation within query statistics.
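
    Example:
        A short sketch of reading progress samples from a completed job via
        the ``QueryJob.timeline`` property:

        .. code-block:: python

            for sample in job.timeline:  # job: a completed QueryJob
                print(sample.elapsed_ms, sample.completed_units)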
2509 """
2510
2511 def __init__(self):
2512 self._properties = {}
2513
2514 @classmethod
2515 def from_api_repr(cls, resource):
2516 """Factory: construct instance from the JSON repr.
2517
2518 Args:
            resource (Dict[str, object]):
2520 QueryTimelineSample representation returned from API.
2521
2522 Returns:
2523 google.cloud.bigquery.TimelineEntry:
2524 Timeline sample parsed from ``resource``.
2525 """
2526 entry = cls()
2527 entry._properties = resource
2528 return entry
2529
2530 @property
2531 def elapsed_ms(self):
2532 """Optional[int]: Milliseconds elapsed since start of query
2533 execution."""
2534 return _helpers._int_or_none(self._properties.get("elapsedMs"))
2535
2536 @property
2537 def active_units(self):
2538 """Optional[int]: Current number of input units being processed
        by workers, reported as the largest value since the last sample."""
2540 return _helpers._int_or_none(self._properties.get("activeUnits"))
2541
2542 @property
2543 def pending_units(self):
2544 """Optional[int]: Current number of input units remaining for
2545 query stages active at this sample time."""
2546 return _helpers._int_or_none(self._properties.get("pendingUnits"))
2547
2548 @property
2549 def completed_units(self):
2550 """Optional[int]: Current number of input units completed by
2551 this query."""
2552 return _helpers._int_or_none(self._properties.get("completedUnits"))
2553
2554 @property
2555 def slot_millis(self):
2556 """Optional[int]: Cumulative slot-milliseconds consumed by
2557 this query."""
2558 return _helpers._int_or_none(self._properties.get("totalSlotMs"))