# Copyright 2015 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Client for interacting with the Google BigQuery API."""

from __future__ import absolute_import
from __future__ import division

from collections import abc as collections_abc
import copy
import datetime
import functools
import gzip
import io
import itertools
import json
import math
import os
import tempfile
import typing
from typing import (
    Any,
    Dict,
    IO,
    Iterable,
    Mapping,
    List,
    Optional,
    Sequence,
    Tuple,
    Union,
)
import uuid
import warnings

import requests

from google import resumable_media  # type: ignore
from google.resumable_media.requests import MultipartUpload  # type: ignore
from google.resumable_media.requests import ResumableUpload

import google.api_core.client_options
import google.api_core.exceptions as core_exceptions
from google.api_core.iam import Policy
from google.api_core import page_iterator
from google.api_core import retry as retries
import google.cloud._helpers  # type: ignore
from google.cloud import exceptions  # pytype: disable=import-error
from google.cloud.client import ClientWithProject  # type: ignore # pytype: disable=import-error

try:
    from google.cloud.bigquery_storage_v1.services.big_query_read.client import (
        DEFAULT_CLIENT_INFO as DEFAULT_BQSTORAGE_CLIENT_INFO,
    )
except ImportError:
    DEFAULT_BQSTORAGE_CLIENT_INFO = None  # type: ignore


from google.auth.credentials import Credentials
from google.cloud.bigquery._http import Connection
from google.cloud.bigquery import _job_helpers
from google.cloud.bigquery import _pandas_helpers
from google.cloud.bigquery import _versions_helpers
from google.cloud.bigquery import enums
from google.cloud.bigquery import exceptions as bq_exceptions
from google.cloud.bigquery import job
from google.cloud.bigquery._helpers import _get_sub_prop
from google.cloud.bigquery._helpers import _record_field_to_json
from google.cloud.bigquery._helpers import _str_or_none
from google.cloud.bigquery._helpers import _verify_job_config_type
from google.cloud.bigquery._helpers import _get_bigquery_host
from google.cloud.bigquery._helpers import _DEFAULT_HOST
from google.cloud.bigquery._helpers import _DEFAULT_HOST_TEMPLATE
from google.cloud.bigquery._helpers import _DEFAULT_UNIVERSE
from google.cloud.bigquery._helpers import _validate_universe
from google.cloud.bigquery._helpers import _get_client_universe
from google.cloud.bigquery._helpers import TimeoutType
from google.cloud.bigquery._job_helpers import make_job_id as _make_job_id
from google.cloud.bigquery.dataset import Dataset
from google.cloud.bigquery.dataset import DatasetListItem
from google.cloud.bigquery.dataset import DatasetReference

from google.cloud.bigquery.enums import AutoRowIDs, DatasetView, UpdateMode
from google.cloud.bigquery.format_options import ParquetOptions
from google.cloud.bigquery.job import (
    CopyJob,
    CopyJobConfig,
    ExtractJob,
    ExtractJobConfig,
    LoadJob,
    LoadJobConfig,
    QueryJob,
    QueryJobConfig,
)
from google.cloud.bigquery.model import Model
from google.cloud.bigquery.model import ModelReference
from google.cloud.bigquery.model import _model_arg_to_model_ref
from google.cloud.bigquery.opentelemetry_tracing import create_span
from google.cloud.bigquery.query import _QueryResults
from google.cloud.bigquery.retry import (
    DEFAULT_JOB_RETRY,
    DEFAULT_RETRY,
    DEFAULT_TIMEOUT,
    DEFAULT_GET_JOB_TIMEOUT,
    POLLING_DEFAULT_VALUE,
)
from google.cloud.bigquery.routine import Routine
from google.cloud.bigquery.routine import RoutineReference
from google.cloud.bigquery.schema import SchemaField
from google.cloud.bigquery.table import _table_arg_to_table
from google.cloud.bigquery.table import _table_arg_to_table_ref
from google.cloud.bigquery.table import Table
from google.cloud.bigquery.table import TableListItem
from google.cloud.bigquery.table import TableReference
from google.cloud.bigquery.table import RowIterator

pyarrow = _versions_helpers.PYARROW_VERSIONS.try_import()
pandas = (
    _versions_helpers.PANDAS_VERSIONS.try_import()
)  # mypy check fails because pandas import is outside module, there are type: ignore comments related to this


ResumableTimeoutType = Union[
    None, float, Tuple[float, float]
]  # for resumable media methods

if typing.TYPE_CHECKING:  # pragma: NO COVER
    # os.PathLike is only subscriptable in Python 3.9+, thus shielding with a condition.
    PathType = Union[str, bytes, os.PathLike[str], os.PathLike[bytes]]
_DEFAULT_CHUNKSIZE = 100 * 1024 * 1024  # 100 MB
_MAX_MULTIPART_SIZE = 5 * 1024 * 1024
_DEFAULT_NUM_RETRIES = 6
_BASE_UPLOAD_TEMPLATE = "{host}/upload/bigquery/v2/projects/{project}/jobs?uploadType="
_MULTIPART_URL_TEMPLATE = _BASE_UPLOAD_TEMPLATE + "multipart"
_RESUMABLE_URL_TEMPLATE = _BASE_UPLOAD_TEMPLATE + "resumable"
_GENERIC_CONTENT_TYPE = "*/*"
_READ_LESS_THAN_SIZE = (
    "Size {:d} was specified but the file-like object only had " "{:d} bytes remaining."
)
_NEED_TABLE_ARGUMENT = (
    "The table argument should be a table ID string, Table, or TableReference"
)
_LIST_ROWS_FROM_QUERY_RESULTS_FIELDS = "jobReference,totalRows,pageToken,rows"

# In microbenchmarks, it's been shown that even in ideal conditions (query
# finished, local data), requests to getQueryResults can take 10+ seconds.
# In less-than-ideal situations, the response can take even longer, as it must
# be able to download a full 100+ MB row in that time. Don't let the
# connection timeout before data can be downloaded.
# https://github.com/googleapis/python-bigquery/issues/438
_MIN_GET_QUERY_RESULTS_TIMEOUT = 120

TIMEOUT_HEADER = "X-Server-Timeout"


class Project(object):
    """Wrapper for resource describing a BigQuery project.

    Args:
        project_id (str): Opaque ID of the project

        numeric_id (int): Numeric ID of the project

        friendly_name (str): Display name of the project
    """

    def __init__(self, project_id, numeric_id, friendly_name):
        self.project_id = project_id
        self.numeric_id = numeric_id
        self.friendly_name = friendly_name

    @classmethod
    def from_api_repr(cls, resource):
        """Factory: construct an instance from a resource dict."""
        return cls(resource["id"], resource["numericId"], resource["friendlyName"])


class Client(ClientWithProject):
    """Client to bundle configuration needed for API requests.

    Args:
        project (Optional[str]):
            Project ID for the project which the client acts on behalf of.
            Will be passed when creating a dataset / job. If not passed,
            falls back to the default inferred from the environment.
        credentials (Optional[google.auth.credentials.Credentials]):
            The OAuth2 Credentials to use for this client. If not passed
            (and if no ``_http`` object is passed), falls back to the
            default inferred from the environment.
        _http (Optional[requests.Session]):
            HTTP object to make requests. Can be any object that
            defines ``request()`` with the same interface as
            :meth:`requests.Session.request`. If not passed, an ``_http``
            object is created that is bound to the ``credentials`` for the
            current object.
            This parameter should be considered private, and could change in
            the future.
        location (Optional[str]):
            Default location for jobs / datasets / tables.
        default_query_job_config (Optional[google.cloud.bigquery.job.QueryJobConfig]):
            Default ``QueryJobConfig``.
            Will be merged into job configs passed into the ``query`` method.
        default_load_job_config (Optional[google.cloud.bigquery.job.LoadJobConfig]):
            Default ``LoadJobConfig``.
            Will be merged into job configs passed into the ``load_table_*`` methods.
        client_info (Optional[google.api_core.client_info.ClientInfo]):
            The client info used to send a user-agent string along with API
            requests. If ``None``, then default info will be used. Generally,
            you only need to set this if you're developing your own library
            or partner tool.
        client_options (Optional[Union[google.api_core.client_options.ClientOptions, Dict]]):
            Client options used to set user options on the client. API Endpoint
            should be set through client_options.
        default_job_creation_mode (Optional[str]):
            Sets the default job creation mode used by query methods such as
            query_and_wait(). For lightweight queries, JOB_CREATION_OPTIONAL is
            generally recommended.

    Raises:
        google.auth.exceptions.DefaultCredentialsError:
            Raised if ``credentials`` is not specified and the library fails
            to acquire default credentials.
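
    Example:

        An illustrative construction sketch; the project ID and location are
        placeholders, and both arguments are optional:

        .. code-block:: python

            from google.cloud import bigquery

            client = bigquery.Client(project="your-project", location="US")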
    """

    SCOPE = ("https://www.googleapis.com/auth/cloud-platform",)  # type: ignore
    """The scopes required for authenticating as a BigQuery consumer."""

    def __init__(
        self,
        project: Optional[str] = None,
        credentials: Optional[Credentials] = None,
        _http: Optional[requests.Session] = None,
        location: Optional[str] = None,
        default_query_job_config: Optional[QueryJobConfig] = None,
        default_load_job_config: Optional[LoadJobConfig] = None,
        client_info: Optional[google.api_core.client_info.ClientInfo] = None,
        client_options: Optional[
            Union[google.api_core.client_options.ClientOptions, Dict[str, Any]]
        ] = None,
        default_job_creation_mode: Optional[str] = None,
    ) -> None:
        if client_options is None:
            client_options = {}
        if isinstance(client_options, dict):
            client_options = google.api_core.client_options.from_dict(client_options)
        # assert isinstance(client_options, google.api_core.client_options.ClientOptions)

        super(Client, self).__init__(
            project=project,
            credentials=credentials,
            client_options=client_options,
            _http=_http,
        )

        kw_args: Dict[str, Any] = {"client_info": client_info}
        bq_host = _get_bigquery_host()
        kw_args["api_endpoint"] = bq_host if bq_host != _DEFAULT_HOST else None
        client_universe = None
        if client_options.api_endpoint:
            api_endpoint = client_options.api_endpoint
            kw_args["api_endpoint"] = api_endpoint
        else:
            client_universe = _get_client_universe(client_options)
            if client_universe != _DEFAULT_UNIVERSE:
                kw_args["api_endpoint"] = _DEFAULT_HOST_TEMPLATE.replace(
                    "{UNIVERSE_DOMAIN}", client_universe
                )
        # Ensure credentials and universe are not in conflict.
        if hasattr(self, "_credentials") and client_universe is not None:
            _validate_universe(client_universe, self._credentials)

        self._connection = Connection(self, **kw_args)
        self._location = location
        self._default_load_job_config = copy.deepcopy(default_load_job_config)
        self.default_job_creation_mode = default_job_creation_mode

        # Use property setter so validation can run.
        self.default_query_job_config = default_query_job_config

    @property
    def location(self):
        """Default location for jobs / datasets / tables."""
        return self._location

    @property
    def default_job_creation_mode(self):
        """Default job creation mode used for query execution."""
        return self._default_job_creation_mode

    @default_job_creation_mode.setter
    def default_job_creation_mode(self, value: Optional[str]):
        self._default_job_creation_mode = value

    @property
    def default_query_job_config(self) -> Optional[QueryJobConfig]:
        """Default ``QueryJobConfig`` or ``None``.

        Will be merged into job configs passed into the ``query`` or
        ``query_and_wait`` methods.
        """
        return self._default_query_job_config

    @default_query_job_config.setter
    def default_query_job_config(self, value: Optional[QueryJobConfig]):
        if value is not None:
            _verify_job_config_type(
                value, QueryJobConfig, param_name="default_query_job_config"
            )
        self._default_query_job_config = copy.deepcopy(value)

    @property
    def default_load_job_config(self):
        """Default ``LoadJobConfig``.
        Will be merged into job configs passed into the ``load_table_*`` methods.
        """
        return self._default_load_job_config

    @default_load_job_config.setter
    def default_load_job_config(self, value: LoadJobConfig):
        self._default_load_job_config = copy.deepcopy(value)

    def close(self):
        """Close the underlying transport objects, releasing system resources.

        .. note::

            The client instance can be used for making additional requests even
            after closing, in which case the underlying connections are
            automatically re-created.
        """
        self._http._auth_request.session.close()
        self._http.close()

    def get_service_account_email(
        self,
        project: Optional[str] = None,
        retry: retries.Retry = DEFAULT_RETRY,
        timeout: TimeoutType = DEFAULT_TIMEOUT,
    ) -> str:
        """Get the email address of the project's BigQuery service account

        Example:

            .. code-block:: python

                from google.cloud import bigquery
                client = bigquery.Client()
                client.get_service_account_email()
                # returns an email similar to: my_service_account@my-project.iam.gserviceaccount.com

        Note:
            This is the service account that BigQuery uses to manage tables
            encrypted by a key in KMS.

        Args:
            project (Optional[str]):
                Project ID to use for retrieving service account email.
                Defaults to the client's project.
            retry (Optional[google.api_core.retry.Retry]): How to retry the RPC.
            timeout (Optional[float]):
                The number of seconds to wait for the underlying HTTP transport
                before using ``retry``.

        Returns:
            str:
                service account email address

        """
        if project is None:
            project = self.project
        path = "/projects/%s/serviceAccount" % (project,)
        span_attributes = {"path": path}
        api_response = self._call_api(
            retry,
            span_name="BigQuery.getServiceAccountEmail",
            span_attributes=span_attributes,
            method="GET",
            path=path,
            timeout=timeout,
        )
        return api_response["email"]

    def list_projects(
        self,
        max_results: Optional[int] = None,
        page_token: Optional[str] = None,
        retry: retries.Retry = DEFAULT_RETRY,
        timeout: TimeoutType = DEFAULT_TIMEOUT,
        page_size: Optional[int] = None,
    ) -> page_iterator.Iterator:
        """List projects for the project associated with this client.

        See
        https://cloud.google.com/bigquery/docs/reference/rest/v2/projects/list

        Args:
            max_results (Optional[int]):
                Maximum number of projects to return.
                Defaults to a value set by the API.

            page_token (Optional[str]):
                Token representing a cursor into the projects. If not passed,
                the API will return the first page of projects. The token marks
                the beginning of the iterator to be returned and the value of
                the ``page_token`` can be accessed at ``next_page_token`` of the
                :class:`~google.api_core.page_iterator.HTTPIterator`.

            retry (Optional[google.api_core.retry.Retry]): How to retry the RPC.

            timeout (Optional[float]):
                The number of seconds to wait for the underlying HTTP transport
                before using ``retry``.

            page_size (Optional[int]):
                Maximum number of projects to return in each page.
                Defaults to a value set by the API.

        Returns:
            google.api_core.page_iterator.Iterator:
                Iterator of :class:`~google.cloud.bigquery.client.Project`
                accessible to the current client.
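
        Example:

            An illustrative sketch; it prints the IDs of the projects visible
            to the client's credentials:

            .. code-block:: python

                from google.cloud import bigquery
                client = bigquery.Client()
                for project in client.list_projects(max_results=10):
                    print(project.project_id)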
        """
        span_attributes = {"path": "/projects"}

        def api_request(*args, **kwargs):
            return self._call_api(
                retry,
                span_name="BigQuery.listProjects",
                span_attributes=span_attributes,
                *args,
                timeout=timeout,
                **kwargs,
            )

        return page_iterator.HTTPIterator(
            client=self,
            api_request=api_request,
            path="/projects",
            item_to_value=_item_to_project,
            items_key="projects",
            page_token=page_token,
            max_results=max_results,
            page_size=page_size,
        )

    def list_datasets(
        self,
        project: Optional[str] = None,
        include_all: bool = False,
        filter: Optional[str] = None,
        max_results: Optional[int] = None,
        page_token: Optional[str] = None,
        retry: retries.Retry = DEFAULT_RETRY,
        timeout: TimeoutType = DEFAULT_TIMEOUT,
        page_size: Optional[int] = None,
    ) -> page_iterator.Iterator:
        """List datasets for the project associated with this client.

        See
        https://cloud.google.com/bigquery/docs/reference/rest/v2/datasets/list

        Args:
            project (Optional[str]):
                Project ID to use for retrieving datasets. Defaults to the
                client's project.
            include_all (Optional[bool]):
                True if results include hidden datasets. Defaults to False.
            filter (Optional[str]):
                An expression for filtering the results by label.
                For syntax, see
                https://cloud.google.com/bigquery/docs/reference/rest/v2/datasets/list#body.QUERY_PARAMETERS.filter
            max_results (Optional[int]):
                Maximum number of datasets to return.
            page_token (Optional[str]):
                Token representing a cursor into the datasets. If not passed,
                the API will return the first page of datasets. The token marks
                the beginning of the iterator to be returned and the value of
                the ``page_token`` can be accessed at ``next_page_token`` of the
                :class:`~google.api_core.page_iterator.HTTPIterator`.
            retry (Optional[google.api_core.retry.Retry]):
                How to retry the RPC.
            timeout (Optional[float]):
                The number of seconds to wait for the underlying HTTP transport
                before using ``retry``.
            page_size (Optional[int]):
                Maximum number of datasets to return per page.

        Returns:
            google.api_core.page_iterator.Iterator:
                Iterator of :class:`~google.cloud.bigquery.dataset.DatasetListItem`
                associated with the project.
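
        Example:

            An illustrative sketch; it lists dataset IDs in the client's
            default project:

            .. code-block:: python

                from google.cloud import bigquery
                client = bigquery.Client()
                for dataset in client.list_datasets():
                    print(dataset.dataset_id)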
        """
        extra_params: Dict[str, Any] = {}
        if project is None:
            project = self.project
        if include_all:
            extra_params["all"] = True
        if filter:
            # TODO: consider supporting a dict of label -> value for filter,
            # and converting it into a string here.
            extra_params["filter"] = filter
        path = "/projects/%s/datasets" % (project,)

        span_attributes = {"path": path}

        def api_request(*args, **kwargs):
            return self._call_api(
                retry,
                span_name="BigQuery.listDatasets",
                span_attributes=span_attributes,
                *args,
                timeout=timeout,
                **kwargs,
            )

        return page_iterator.HTTPIterator(
            client=self,
            api_request=api_request,
            path=path,
            item_to_value=_item_to_dataset,
            items_key="datasets",
            page_token=page_token,
            max_results=max_results,
            extra_params=extra_params,
            page_size=page_size,
        )

    def dataset(
        self, dataset_id: str, project: Optional[str] = None
    ) -> DatasetReference:
        """Deprecated: Construct a reference to a dataset.

        .. deprecated:: 1.24.0
            Construct a
            :class:`~google.cloud.bigquery.dataset.DatasetReference` using its
            constructor or use a string where previously a reference object
            was used.

            As of ``google-cloud-bigquery`` version 1.7.0, all client methods
            that take a
            :class:`~google.cloud.bigquery.dataset.DatasetReference` or
            :class:`~google.cloud.bigquery.table.TableReference` also take a
            string in standard SQL format, e.g. ``project.dataset_id`` or
            ``project.dataset_id.table_id``.

        Args:
            dataset_id (str): ID of the dataset.

            project (Optional[str]):
                Project ID for the dataset (defaults to the project of the client).

        Returns:
            google.cloud.bigquery.dataset.DatasetReference:
                a new ``DatasetReference`` instance.
        """
        if project is None:
            project = self.project

        warnings.warn(
            "Client.dataset is deprecated and will be removed in a future version. "
            "Use a string like 'my_project.my_dataset' or a "
            "google.cloud.bigquery.DatasetReference object, instead.",
            PendingDeprecationWarning,
            stacklevel=2,
        )
        return DatasetReference(project, dataset_id)

    def _ensure_bqstorage_client(
        self,
        bqstorage_client: Optional[
            "google.cloud.bigquery_storage.BigQueryReadClient"
        ] = None,
        client_options: Optional[google.api_core.client_options.ClientOptions] = None,
        client_info: Optional[
            "google.api_core.gapic_v1.client_info.ClientInfo"
        ] = DEFAULT_BQSTORAGE_CLIENT_INFO,
    ) -> Optional["google.cloud.bigquery_storage.BigQueryReadClient"]:
        """Create a BigQuery Storage API client using this client's credentials.

        Args:
            bqstorage_client:
                An existing BigQuery Storage client instance. If ``None``, a new
                instance is created and returned.
            client_options:
                Custom options used with a new BigQuery Storage client instance
                if one is created.
            client_info:
                The client info used with a new BigQuery Storage client
                instance if one is created.

        Returns:
            A BigQuery Storage API client.
        """

        try:
            bigquery_storage = _versions_helpers.BQ_STORAGE_VERSIONS.try_import(
                raise_if_error=True
            )
        except bq_exceptions.BigQueryStorageNotFoundError:
            warnings.warn(
                "Cannot create BigQuery Storage client, the dependency "
                "google-cloud-bigquery-storage is not installed."
            )
            return None
        except bq_exceptions.LegacyBigQueryStorageError as exc:
            warnings.warn(
                "Dependency google-cloud-bigquery-storage is outdated: " + str(exc)
            )
            return None

        if bqstorage_client is None:  # pragma: NO COVER
            bqstorage_client = bigquery_storage.BigQueryReadClient(
                credentials=self._credentials,
                client_options=client_options,
                client_info=client_info,  # type: ignore # (None is also accepted)
            )

        return bqstorage_client

    def _dataset_from_arg(self, dataset) -> Union[Dataset, DatasetReference]:
        if isinstance(dataset, str):
            dataset = DatasetReference.from_string(
                dataset, default_project=self.project
            )

        if not isinstance(dataset, (Dataset, DatasetReference)):
            if isinstance(dataset, DatasetListItem):
                dataset = dataset.reference
            else:
                raise TypeError(
                    "dataset must be a Dataset, DatasetReference, DatasetListItem,"
                    " or string"
                )
        return dataset

    def create_dataset(
        self,
        dataset: Union[str, Dataset, DatasetReference, DatasetListItem],
        exists_ok: bool = False,
        retry: retries.Retry = DEFAULT_RETRY,
        timeout: TimeoutType = DEFAULT_TIMEOUT,
    ) -> Dataset:
        """API call: create the dataset via a POST request.

        See
        https://cloud.google.com/bigquery/docs/reference/rest/v2/datasets/insert

        Example:

            .. code-block:: python

                from google.cloud import bigquery
                client = bigquery.Client()
                dataset = bigquery.Dataset('my_project.my_dataset')
                dataset = client.create_dataset(dataset)

        Args:
            dataset (Union[ \
                google.cloud.bigquery.dataset.Dataset, \
                google.cloud.bigquery.dataset.DatasetReference, \
                google.cloud.bigquery.dataset.DatasetListItem, \
                str, \
            ]):
                A :class:`~google.cloud.bigquery.dataset.Dataset` to create.
                If ``dataset`` is a reference, an empty dataset is created
                with the specified ID and client's default location.
            exists_ok (Optional[bool]):
                Defaults to ``False``. If ``True``, ignore "already exists"
                errors when creating the dataset.
            retry (Optional[google.api_core.retry.Retry]):
                How to retry the RPC.
            timeout (Optional[float]):
                The number of seconds to wait for the underlying HTTP transport
                before using ``retry``.

        Returns:
            google.cloud.bigquery.dataset.Dataset:
                A new ``Dataset`` returned from the API.

        Raises:
            google.cloud.exceptions.Conflict:
                If the dataset already exists.
        """
        dataset = self._dataset_from_arg(dataset)
        if isinstance(dataset, DatasetReference):
            dataset = Dataset(dataset)

        path = "/projects/%s/datasets" % (dataset.project,)

        data = dataset.to_api_repr()
        if data.get("location") is None and self.location is not None:
            data["location"] = self.location

        try:
            span_attributes = {"path": path}

            api_response = self._call_api(
                retry,
                span_name="BigQuery.createDataset",
                span_attributes=span_attributes,
                method="POST",
                path=path,
                data=data,
                timeout=timeout,
            )
            return Dataset.from_api_repr(api_response)
        except core_exceptions.Conflict:
            if not exists_ok:
                raise
            return self.get_dataset(dataset.reference, retry=retry)

    def create_routine(
        self,
        routine: Routine,
        exists_ok: bool = False,
        retry: retries.Retry = DEFAULT_RETRY,
        timeout: TimeoutType = DEFAULT_TIMEOUT,
    ) -> Routine:
        """[Beta] Create a routine via a POST request.

        See
        https://cloud.google.com/bigquery/docs/reference/rest/v2/routines/insert

        Args:
            routine (google.cloud.bigquery.routine.Routine):
                A :class:`~google.cloud.bigquery.routine.Routine` to create.
                The dataset that the routine belongs to must already exist.
            exists_ok (Optional[bool]):
                Defaults to ``False``. If ``True``, ignore "already exists"
                errors when creating the routine.
            retry (Optional[google.api_core.retry.Retry]):
                How to retry the RPC.
            timeout (Optional[float]):
                The number of seconds to wait for the underlying HTTP transport
                before using ``retry``.

        Returns:
            google.cloud.bigquery.routine.Routine:
                A new ``Routine`` returned from the service.

        Raises:
            google.cloud.exceptions.Conflict:
                If the routine already exists.
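
        Example:

            An illustrative sketch (the project, dataset, and routine IDs are
            placeholders); it creates a simple SQL scalar function:

            .. code-block:: python

                from google.cloud import bigquery
                client = bigquery.Client()
                routine = bigquery.Routine(
                    "your-project.your_dataset.routine_id",
                    type_="SCALAR_FUNCTION",
                    language="SQL",
                    body="1 + 1",
                )
                routine = client.create_routine(routine)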
        """
        reference = routine.reference
        path = "/projects/{}/datasets/{}/routines".format(
            reference.project, reference.dataset_id
        )
        resource = routine.to_api_repr()
        try:
            span_attributes = {"path": path}
            api_response = self._call_api(
                retry,
                span_name="BigQuery.createRoutine",
                span_attributes=span_attributes,
                method="POST",
                path=path,
                data=resource,
                timeout=timeout,
            )
            return Routine.from_api_repr(api_response)
        except core_exceptions.Conflict:
            if not exists_ok:
                raise
            return self.get_routine(routine.reference, retry=retry)

    def create_table(
        self,
        table: Union[str, Table, TableReference, TableListItem],
        exists_ok: bool = False,
        retry: retries.Retry = DEFAULT_RETRY,
        timeout: TimeoutType = DEFAULT_TIMEOUT,
    ) -> Table:
        """API call: create a table via a POST request

        See
        https://cloud.google.com/bigquery/docs/reference/rest/v2/tables/insert

        Args:
            table (Union[ \
                google.cloud.bigquery.table.Table, \
                google.cloud.bigquery.table.TableReference, \
                google.cloud.bigquery.table.TableListItem, \
                str, \
            ]):
                A :class:`~google.cloud.bigquery.table.Table` to create.
                If ``table`` is a reference, an empty table is created
                with the specified ID. The dataset that the table belongs to
                must already exist.
            exists_ok (Optional[bool]):
                Defaults to ``False``. If ``True``, ignore "already exists"
                errors when creating the table.
            retry (Optional[google.api_core.retry.Retry]):
                How to retry the RPC.
            timeout (Optional[float]):
                The number of seconds to wait for the underlying HTTP transport
                before using ``retry``.

        Returns:
            google.cloud.bigquery.table.Table:
                A new ``Table`` returned from the service.

        Raises:
            google.cloud.exceptions.Conflict:
                If the table already exists.
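
        Example:

            An illustrative sketch (project, dataset, and table IDs are
            placeholders); it creates a table with a single column:

            .. code-block:: python

                from google.cloud import bigquery
                client = bigquery.Client()
                table = bigquery.Table(
                    "your-project.your_dataset.your_table",
                    schema=[bigquery.SchemaField("full_name", "STRING")],
                )
                table = client.create_table(table)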
        """
        table = _table_arg_to_table(table, default_project=self.project)
        dataset_id = table.dataset_id
        path = "/projects/%s/datasets/%s/tables" % (table.project, dataset_id)
        data = table.to_api_repr()
        try:
            span_attributes = {"path": path, "dataset_id": dataset_id}
            api_response = self._call_api(
                retry,
                span_name="BigQuery.createTable",
                span_attributes=span_attributes,
                method="POST",
                path=path,
                data=data,
                timeout=timeout,
            )
            return Table.from_api_repr(api_response)
        except core_exceptions.Conflict:
            if not exists_ok:
                raise
            return self.get_table(table.reference, retry=retry)

    def _call_api(
        self,
        retry,
        span_name=None,
        span_attributes=None,
        job_ref=None,
        headers: Optional[Dict[str, str]] = None,
        **kwargs,
    ):
        kwargs = _add_server_timeout_header(headers, kwargs)
        call = functools.partial(self._connection.api_request, **kwargs)

        if retry:
            call = retry(call)

        if span_name is not None:
            with create_span(
                name=span_name, attributes=span_attributes, client=self, job_ref=job_ref
            ):
                return call()

        return call()

    def get_dataset(
        self,
        dataset_ref: Union[DatasetReference, str],
        retry: retries.Retry = DEFAULT_RETRY,
        timeout: TimeoutType = DEFAULT_TIMEOUT,
        dataset_view: Optional[DatasetView] = None,
    ) -> Dataset:
        """Fetch the dataset referenced by ``dataset_ref``

        Args:
            dataset_ref (Union[ \
                google.cloud.bigquery.dataset.DatasetReference, \
                str, \
            ]):
                A reference to the dataset to fetch from the BigQuery API.
                If a string is passed in, this method attempts to create a
                dataset reference from a string using
                :func:`~google.cloud.bigquery.dataset.DatasetReference.from_string`.
            retry (Optional[google.api_core.retry.Retry]):
                How to retry the RPC.
            timeout (Optional[float]):
                The number of seconds to wait for the underlying HTTP transport
                before using ``retry``.
            dataset_view (Optional[google.cloud.bigquery.enums.DatasetView]):
                Specifies the view that determines which dataset information is
                returned. By default, dataset metadata (e.g. friendlyName, description,
                labels, etc) and ACL information are returned. This argument can
                take on the following possible enum values.

                * :attr:`~google.cloud.bigquery.enums.DatasetView.ACL`:
                  Includes dataset metadata and the ACL.
                * :attr:`~google.cloud.bigquery.enums.DatasetView.FULL`:
                  Includes all dataset metadata, including the ACL and table metadata.
                  This view is not supported by the `datasets.list` API method.
                * :attr:`~google.cloud.bigquery.enums.DatasetView.METADATA`:
                  Includes basic dataset metadata, but not the ACL.
                * :attr:`~google.cloud.bigquery.enums.DatasetView.DATASET_VIEW_UNSPECIFIED`:
                  The server will decide which view to use. Currently defaults to FULL.

        Returns:
            google.cloud.bigquery.dataset.Dataset:
                A ``Dataset`` instance.
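
        Example:

            An illustrative sketch; ``"your-project.your_dataset"`` is a
            placeholder dataset ID:

            .. code-block:: python

                from google.cloud import bigquery
                client = bigquery.Client()
                dataset = client.get_dataset("your-project.your_dataset")
                print(dataset.friendly_name)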
        """
        if isinstance(dataset_ref, str):
            dataset_ref = DatasetReference.from_string(
                dataset_ref, default_project=self.project
            )
        path = dataset_ref.path

        if dataset_view:
            query_params = {"datasetView": dataset_view.value}
        else:
            query_params = {}

        span_attributes = {"path": path}
        api_response = self._call_api(
            retry,
            span_name="BigQuery.getDataset",
            span_attributes=span_attributes,
            method="GET",
            path=path,
            timeout=timeout,
            query_params=query_params,
        )
        return Dataset.from_api_repr(api_response)

    def get_iam_policy(
        self,
        table: Union[Table, TableReference, TableListItem, str],
        requested_policy_version: int = 1,
        retry: retries.Retry = DEFAULT_RETRY,
        timeout: TimeoutType = DEFAULT_TIMEOUT,
    ) -> Policy:
        """Return the access control policy for a table resource.

        Args:
            table (Union[ \
                google.cloud.bigquery.table.Table, \
                google.cloud.bigquery.table.TableReference, \
                google.cloud.bigquery.table.TableListItem, \
                str, \
            ]):
                The table to get the access control policy for.
                If a string is passed in, this method attempts to create a
                table reference from a string using
                :func:`~google.cloud.bigquery.table.TableReference.from_string`.
            requested_policy_version (int):
                Optional. The maximum policy version that will be used to format the policy.

                Only version ``1`` is currently supported.

                See: https://cloud.google.com/bigquery/docs/reference/rest/v2/GetPolicyOptions
            retry (Optional[google.api_core.retry.Retry]):
                How to retry the RPC.
            timeout (Optional[float]):
                The number of seconds to wait for the underlying HTTP transport
                before using ``retry``.

        Returns:
            google.api_core.iam.Policy:
                The access control policy.
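
        Example:

            An illustrative sketch; the fully-qualified table ID is a
            placeholder:

            .. code-block:: python

                from google.cloud import bigquery
                client = bigquery.Client()
                policy = client.get_iam_policy("your-project.your_dataset.your_table")
                for binding in policy.bindings:
                    print(binding["role"], binding["members"])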
        """
        table = _table_arg_to_table_ref(table, default_project=self.project)

        if requested_policy_version != 1:
            raise ValueError("only IAM policy version 1 is supported")

        body = {"options": {"requestedPolicyVersion": 1}}

        path = "{}:getIamPolicy".format(table.path)
        span_attributes = {"path": path}
        response = self._call_api(
            retry,
            span_name="BigQuery.getIamPolicy",
            span_attributes=span_attributes,
            method="POST",
            path=path,
            data=body,
            timeout=timeout,
        )

        return Policy.from_api_repr(response)

    def set_iam_policy(
        self,
        table: Union[Table, TableReference, TableListItem, str],
        policy: Policy,
        updateMask: Optional[str] = None,
        retry: retries.Retry = DEFAULT_RETRY,
        timeout: TimeoutType = DEFAULT_TIMEOUT,
        *,
        fields: Sequence[str] = (),
    ) -> Policy:
        """Set the access control policy for a table resource.

        Args:
            table (Union[ \
                google.cloud.bigquery.table.Table, \
                google.cloud.bigquery.table.TableReference, \
                google.cloud.bigquery.table.TableListItem, \
                str, \
            ]):
                The table to set the access control policy for.
                If a string is passed in, this method attempts to create a
                table reference from a string using
                :func:`~google.cloud.bigquery.table.TableReference.from_string`.
            policy (google.api_core.iam.Policy):
                The access control policy to set.
            updateMask (Optional[str]):
                Mask as defined by
                https://cloud.google.com/bigquery/docs/reference/rest/v2/tables/setIamPolicy#body.request_body.FIELDS.update_mask

                Incompatible with ``fields``.
            retry (Optional[google.api_core.retry.Retry]):
                How to retry the RPC.
            timeout (Optional[float]):
                The number of seconds to wait for the underlying HTTP transport
                before using ``retry``.
            fields (Sequence[str]):
                Which properties to set on the policy. See:
                https://cloud.google.com/bigquery/docs/reference/rest/v2/tables/setIamPolicy#body.request_body.FIELDS.update_mask

                Incompatible with ``updateMask``.

        Returns:
            google.api_core.iam.Policy:
                The updated access control policy.
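
        Example:

            An illustrative sketch; the table ID and the member are
            placeholders:

            .. code-block:: python

                from google.cloud import bigquery
                client = bigquery.Client()
                table_id = "your-project.your_dataset.your_table"
                policy = client.get_iam_policy(table_id)
                policy.bindings.append(
                    {
                        "role": "roles/bigquery.dataViewer",
                        "members": ["user:user@example.com"],
                    }
                )
                policy = client.set_iam_policy(table_id, policy)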
        """
        if updateMask is not None and not fields:
            update_mask = updateMask
        elif updateMask is not None and fields:
            raise ValueError("Cannot set both fields and updateMask")
        elif fields:
            update_mask = ",".join(fields)
        else:
            update_mask = None

        table = _table_arg_to_table_ref(table, default_project=self.project)

        if not isinstance(policy, Policy):
            raise TypeError("policy must be a Policy")

        body = {"policy": policy.to_api_repr()}

        if update_mask is not None:
            body["updateMask"] = update_mask

        path = "{}:setIamPolicy".format(table.path)
        span_attributes = {"path": path}

        response = self._call_api(
            retry,
            span_name="BigQuery.setIamPolicy",
            span_attributes=span_attributes,
            method="POST",
            path=path,
            data=body,
            timeout=timeout,
        )

        return Policy.from_api_repr(response)

    def test_iam_permissions(
        self,
        table: Union[Table, TableReference, TableListItem, str],
        permissions: Sequence[str],
        retry: retries.Retry = DEFAULT_RETRY,
        timeout: TimeoutType = DEFAULT_TIMEOUT,
    ) -> Dict[str, Any]:
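        """Check which of the given IAM permissions the caller has on a table.

        See
        https://cloud.google.com/bigquery/docs/reference/rest/v2/tables/testIamPermissions

        Args:
            table (Union[ \
                google.cloud.bigquery.table.Table, \
                google.cloud.bigquery.table.TableReference, \
                google.cloud.bigquery.table.TableListItem, \
                str, \
            ]):
                The table to check permissions against. If a string is passed
                in, this method attempts to create a table reference from a
                string using
                :func:`~google.cloud.bigquery.table.TableReference.from_string`.
            permissions (Sequence[str]):
                The permissions to test, for example
                ``["bigquery.tables.get"]``.
            retry (Optional[google.api_core.retry.Retry]):
                How to retry the RPC.
            timeout (Optional[float]):
                The number of seconds to wait for the underlying HTTP transport
                before using ``retry``.

        Returns:
            Dict[str, Any]:
                The raw API response, which lists the subset of the requested
                permissions that the caller has.
        """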
        table = _table_arg_to_table_ref(table, default_project=self.project)

        body = {"permissions": permissions}

        path = "{}:testIamPermissions".format(table.path)
        span_attributes = {"path": path}
        response = self._call_api(
            retry,
            span_name="BigQuery.testIamPermissions",
            span_attributes=span_attributes,
            method="POST",
            path=path,
            data=body,
            timeout=timeout,
        )

        return response

    def get_model(
        self,
        model_ref: Union[ModelReference, str],
        retry: retries.Retry = DEFAULT_RETRY,
        timeout: TimeoutType = DEFAULT_TIMEOUT,
    ) -> Model:
        """[Beta] Fetch the model referenced by ``model_ref``.

        Args:
            model_ref (Union[ \
                google.cloud.bigquery.model.ModelReference, \
                str, \
            ]):
                A reference to the model to fetch from the BigQuery API.
                If a string is passed in, this method attempts to create a
                model reference from a string using
                :func:`google.cloud.bigquery.model.ModelReference.from_string`.
            retry (Optional[google.api_core.retry.Retry]):
                How to retry the RPC.
            timeout (Optional[float]):
                The number of seconds to wait for the underlying HTTP transport
                before using ``retry``.

        Returns:
            google.cloud.bigquery.model.Model: A ``Model`` instance.
        """
        if isinstance(model_ref, str):
            model_ref = ModelReference.from_string(
                model_ref, default_project=self.project
            )
        path = model_ref.path
        span_attributes = {"path": path}

        api_response = self._call_api(
            retry,
            span_name="BigQuery.getModel",
            span_attributes=span_attributes,
            method="GET",
            path=path,
            timeout=timeout,
        )
        return Model.from_api_repr(api_response)

    def get_routine(
        self,
        routine_ref: Union[Routine, RoutineReference, str],
        retry: retries.Retry = DEFAULT_RETRY,
        timeout: TimeoutType = DEFAULT_TIMEOUT,
    ) -> Routine:
        """[Beta] Get the routine referenced by ``routine_ref``.

        Args:
            routine_ref (Union[ \
                google.cloud.bigquery.routine.Routine, \
                google.cloud.bigquery.routine.RoutineReference, \
                str, \
            ]):
                A reference to the routine to fetch from the BigQuery API. If
                a string is passed in, this method attempts to create a
                reference from a string using
                :func:`google.cloud.bigquery.routine.RoutineReference.from_string`.
            retry (Optional[google.api_core.retry.Retry]):
                How to retry the API call.
            timeout (Optional[float]):
                The number of seconds to wait for the underlying HTTP transport
                before using ``retry``.

        Returns:
            google.cloud.bigquery.routine.Routine:
                A ``Routine`` instance.
        """
        if isinstance(routine_ref, str):
            routine_ref = RoutineReference.from_string(
                routine_ref, default_project=self.project
            )
        path = routine_ref.path
        span_attributes = {"path": path}
        api_response = self._call_api(
            retry,
            span_name="BigQuery.getRoutine",
            span_attributes=span_attributes,
            method="GET",
            path=path,
            timeout=timeout,
        )
        return Routine.from_api_repr(api_response)

    def get_table(
        self,
        table: Union[Table, TableReference, TableListItem, str],
        retry: retries.Retry = DEFAULT_RETRY,
        timeout: TimeoutType = DEFAULT_TIMEOUT,
    ) -> Table:
        """Fetch the table referenced by ``table``.

        Args:
            table (Union[ \
                google.cloud.bigquery.table.Table, \
                google.cloud.bigquery.table.TableReference, \
                google.cloud.bigquery.table.TableListItem, \
                str, \
            ]):
                A reference to the table to fetch from the BigQuery API.
                If a string is passed in, this method attempts to create a
                table reference from a string using
                :func:`google.cloud.bigquery.table.TableReference.from_string`.
            retry (Optional[google.api_core.retry.Retry]):
                How to retry the RPC.
            timeout (Optional[float]):
                The number of seconds to wait for the underlying HTTP transport
                before using ``retry``.

        Returns:
            google.cloud.bigquery.table.Table:
                A ``Table`` instance.
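
        Example:

            An illustrative sketch; the fully-qualified table ID is a
            placeholder:

            .. code-block:: python

                from google.cloud import bigquery
                client = bigquery.Client()
                table = client.get_table("your-project.your_dataset.your_table")
                print(table.num_rows)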
        """
        table_ref = _table_arg_to_table_ref(table, default_project=self.project)
        path = table_ref.path
        span_attributes = {"path": path}
        api_response = self._call_api(
            retry,
            span_name="BigQuery.getTable",
            span_attributes=span_attributes,
            method="GET",
            path=path,
            timeout=timeout,
        )
        return Table.from_api_repr(api_response)

    def update_dataset(
        self,
        dataset: Dataset,
        fields: Sequence[str],
        retry: retries.Retry = DEFAULT_RETRY,
        timeout: TimeoutType = DEFAULT_TIMEOUT,
        update_mode: Optional[UpdateMode] = None,
    ) -> Dataset:
        """Change some fields of a dataset.

        Use ``fields`` to specify which fields to update. At least one field
        must be provided. If a field is listed in ``fields`` and is ``None`` in
        ``dataset``, it will be deleted.

        For example, to update the default expiration times, specify
        both properties in the ``fields`` argument:

        .. code-block:: python

            bigquery_client.update_dataset(
                dataset,
                [
                    "default_partition_expiration_ms",
                    "default_table_expiration_ms",
                ]
            )

        If ``dataset.etag`` is not ``None``, the update will only
        succeed if the dataset on the server has the same ETag. Thus
        reading a dataset with ``get_dataset``, changing its fields,
        and then passing it to ``update_dataset`` will ensure that the changes
        will only be saved if no modifications to the dataset occurred
        since the read.

        Args:
            dataset (google.cloud.bigquery.dataset.Dataset):
                The dataset to update.
            fields (Sequence[str]):
                The properties of ``dataset`` to change. These are strings
                corresponding to the properties of
                :class:`~google.cloud.bigquery.dataset.Dataset`.
            retry (Optional[google.api_core.retry.Retry]):
                How to retry the RPC.
            timeout (Optional[float]):
                The number of seconds to wait for the underlying HTTP transport
                before using ``retry``.
            update_mode (Optional[google.cloud.bigquery.enums.UpdateMode]):
                Specifies the kind of information to update in a dataset.
                By default, dataset metadata (e.g. friendlyName, description,
                labels, etc) and ACL information are updated. This argument can
                take on the following possible enum values.

                * :attr:`~google.cloud.bigquery.enums.UpdateMode.UPDATE_MODE_UNSPECIFIED`:
                  The default value. Behavior defaults to UPDATE_FULL.
                * :attr:`~google.cloud.bigquery.enums.UpdateMode.UPDATE_METADATA`:
                  Includes metadata information for the dataset, such as friendlyName, description, labels, etc.
                * :attr:`~google.cloud.bigquery.enums.UpdateMode.UPDATE_ACL`:
                  Includes ACL information for the dataset, which defines dataset access for one or more entities.
                * :attr:`~google.cloud.bigquery.enums.UpdateMode.UPDATE_FULL`:
                  Includes both dataset metadata and ACL information.

        Returns:
            google.cloud.bigquery.dataset.Dataset:
                The modified ``Dataset`` instance.
        """
        partial = dataset._build_resource(fields)
        if dataset.etag is not None:
            headers: Optional[Dict[str, str]] = {"If-Match": dataset.etag}
        else:
            headers = None
        path = dataset.path
        span_attributes = {"path": path, "fields": fields}

        if update_mode:
            query_params = {"updateMode": update_mode.value}
        else:
            query_params = {}

        api_response = self._call_api(
            retry,
            span_name="BigQuery.updateDataset",
            span_attributes=span_attributes,
            method="PATCH",
            path=path,
            data=partial,
            headers=headers,
            timeout=timeout,
            query_params=query_params,
        )
        return Dataset.from_api_repr(api_response)

    def update_model(
        self,
        model: Model,
        fields: Sequence[str],
        retry: retries.Retry = DEFAULT_RETRY,
        timeout: TimeoutType = DEFAULT_TIMEOUT,
    ) -> Model:
        """[Beta] Change some fields of a model.

        Use ``fields`` to specify which fields to update. At least one field
        must be provided. If a field is listed in ``fields`` and is ``None``
        in ``model``, the field value will be deleted.

        For example, to update the descriptive properties of the model,
        specify them in the ``fields`` argument:

        .. code-block:: python

            bigquery_client.update_model(
                model, ["description", "friendly_name"]
            )

        If ``model.etag`` is not ``None``, the update will only succeed if
        the model on the server has the same ETag. Thus reading a model with
        ``get_model``, changing its fields, and then passing it to
        ``update_model`` will ensure that the changes will only be saved if
        no modifications to the model occurred since the read.

        Args:
            model (google.cloud.bigquery.model.Model): The model to update.
            fields (Sequence[str]):
                The properties of ``model`` to change. These are strings
                corresponding to the properties of
                :class:`~google.cloud.bigquery.model.Model`.
            retry (Optional[google.api_core.retry.Retry]):
                A description of how to retry the API call.
            timeout (Optional[float]):
                The number of seconds to wait for the underlying HTTP transport
                before using ``retry``.

        Returns:
            google.cloud.bigquery.model.Model:
                The model resource returned from the API call.
        """
        partial = model._build_resource(fields)
        if model.etag:
            headers: Optional[Dict[str, str]] = {"If-Match": model.etag}
        else:
            headers = None
        path = model.path
        span_attributes = {"path": path, "fields": fields}

        api_response = self._call_api(
            retry,
            span_name="BigQuery.updateModel",
            span_attributes=span_attributes,
            method="PATCH",
            path=path,
            data=partial,
            headers=headers,
            timeout=timeout,
        )
        return Model.from_api_repr(api_response)

    def update_routine(
        self,
        routine: Routine,
        fields: Sequence[str],
        retry: retries.Retry = DEFAULT_RETRY,
        timeout: TimeoutType = DEFAULT_TIMEOUT,
    ) -> Routine:
        """[Beta] Change some fields of a routine.

        Use ``fields`` to specify which fields to update. At least one field
        must be provided. If a field is listed in ``fields`` and is ``None``
        in ``routine``, the field value will be deleted.

        For example, to update the description property of the routine,
        specify it in the ``fields`` argument:

        .. code-block:: python

            bigquery_client.update_routine(
                routine, ["description"]
            )

        .. warning::
            During beta, partial updates are not supported. You must provide
            all fields in the resource.

        If :attr:`~google.cloud.bigquery.routine.Routine.etag` is not
        ``None``, the update will only succeed if the resource on the server
        has the same ETag. Thus reading a routine with
        :func:`~google.cloud.bigquery.client.Client.get_routine`, changing
        its fields, and then passing it to this method will ensure that the
        changes will only be saved if no modifications to the resource
        occurred since the read.

        Args:
            routine (google.cloud.bigquery.routine.Routine):
                The routine to update.
            fields (Sequence[str]):
                The fields of ``routine`` to change, spelled as the
                :class:`~google.cloud.bigquery.routine.Routine` properties.
            retry (Optional[google.api_core.retry.Retry]):
                A description of how to retry the API call.
            timeout (Optional[float]):
                The number of seconds to wait for the underlying HTTP transport
                before using ``retry``.

        Returns:
            google.cloud.bigquery.routine.Routine:
                The routine resource returned from the API call.
        """
        partial = routine._build_resource(fields)
        if routine.etag:
            headers: Optional[Dict[str, str]] = {"If-Match": routine.etag}
        else:
            headers = None

        # TODO: remove when routines update supports partial requests.
        partial["routineReference"] = routine.reference.to_api_repr()

        path = routine.path
        span_attributes = {"path": path, "fields": fields}

        api_response = self._call_api(
            retry,
            span_name="BigQuery.updateRoutine",
            span_attributes=span_attributes,
            method="PUT",
            path=path,
            data=partial,
            headers=headers,
            timeout=timeout,
        )
        return Routine.from_api_repr(api_response)

    def update_table(
        self,
        table: Table,
        fields: Sequence[str],
        autodetect_schema: bool = False,
        retry: retries.Retry = DEFAULT_RETRY,
        timeout: TimeoutType = DEFAULT_TIMEOUT,
    ) -> Table:
        """Change some fields of a table.

        Use ``fields`` to specify which fields to update. At least one field
        must be provided. If a field is listed in ``fields`` and is ``None``
        in ``table``, the field value will be deleted.

        For example, to update the descriptive properties of the table,
        specify them in the ``fields`` argument:

        .. code-block:: python

            bigquery_client.update_table(
                table,
                ["description", "friendly_name"]
            )

        If ``table.etag`` is not ``None``, the update will only succeed if
        the table on the server has the same ETag. Thus reading a table with
        ``get_table``, changing its fields, and then passing it to
        ``update_table`` will ensure that the changes will only be saved if
        no modifications to the table occurred since the read.

        Args:
            table (google.cloud.bigquery.table.Table): The table to update.
            fields (Sequence[str]):
                The fields of ``table`` to change, spelled as the
                :class:`~google.cloud.bigquery.table.Table` properties.
            autodetect_schema (bool):
                Specifies if the schema of the table should be autodetected when
                updating the table from the underlying source. Only applicable
                for external tables.
            retry (Optional[google.api_core.retry.Retry]):
                A description of how to retry the API call.
            timeout (Optional[float]):
                The number of seconds to wait for the underlying HTTP transport
                before using ``retry``.

        Returns:
            google.cloud.bigquery.table.Table:
                The table resource returned from the API call.
        """
        partial = table._build_resource(fields)
        if table.etag is not None:
            headers: Optional[Dict[str, str]] = {"If-Match": table.etag}
        else:
            headers = None

        path = table.path
        span_attributes = {"path": path, "fields": fields}

        if autodetect_schema:
            query_params = {"autodetect_schema": True}
        else:
            query_params = {}

        api_response = self._call_api(
            retry,
            span_name="BigQuery.updateTable",
            span_attributes=span_attributes,
            method="PATCH",
            path=path,
            query_params=query_params,
            data=partial,
            headers=headers,
            timeout=timeout,
        )
        return Table.from_api_repr(api_response)

    def list_models(
        self,
        dataset: Union[Dataset, DatasetReference, DatasetListItem, str],
        max_results: Optional[int] = None,
        page_token: Optional[str] = None,
        retry: retries.Retry = DEFAULT_RETRY,
        timeout: TimeoutType = DEFAULT_TIMEOUT,
        page_size: Optional[int] = None,
    ) -> page_iterator.Iterator:
        """[Beta] List models in the dataset.

        See
        https://cloud.google.com/bigquery/docs/reference/rest/v2/models/list

        Args:
            dataset (Union[ \
                google.cloud.bigquery.dataset.Dataset, \
                google.cloud.bigquery.dataset.DatasetReference, \
                google.cloud.bigquery.dataset.DatasetListItem, \
                str, \
            ]):
                A reference to the dataset whose models to list from the
                BigQuery API. If a string is passed in, this method attempts
                to create a dataset reference from a string using
                :func:`google.cloud.bigquery.dataset.DatasetReference.from_string`.
            max_results (Optional[int]):
                Maximum number of models to return. Defaults to a
                value set by the API.
            page_token (Optional[str]):
                Token representing a cursor into the models. If not passed,
                the API will return the first page of models. The token marks
                the beginning of the iterator to be returned and the value of
                the ``page_token`` can be accessed at ``next_page_token`` of the
                :class:`~google.api_core.page_iterator.HTTPIterator`.
            retry (Optional[google.api_core.retry.Retry]):
                How to retry the RPC.
            timeout (Optional[float]):
                The number of seconds to wait for the underlying HTTP transport
                before using ``retry``.
            page_size (Optional[int]):
                Maximum number of models to return per page.
                Defaults to a value set by the API.

        Returns:
            google.api_core.page_iterator.Iterator:
                Iterator of
                :class:`~google.cloud.bigquery.model.Model` contained
                within the requested dataset.
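
        Example:

            An illustrative sketch; the dataset ID is a placeholder:

            .. code-block:: python

                from google.cloud import bigquery
                client = bigquery.Client()
                for model in client.list_models("your-project.your_dataset"):
                    print(model.model_id)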
1571 """
1572 dataset = self._dataset_from_arg(dataset)
1573
1574 path = "%s/models" % dataset.path
1575 span_attributes = {"path": path}
1576
1577 def api_request(*args, **kwargs):
1578 return self._call_api(
1579 retry,
1580 span_name="BigQuery.listModels",
1581 span_attributes=span_attributes,
1582 *args,
1583 timeout=timeout,
1584 **kwargs,
1585 )
1586
1587 result = page_iterator.HTTPIterator(
1588 client=self,
1589 api_request=api_request,
1590 path=path,
1591 item_to_value=_item_to_model,
1592 items_key="models",
1593 page_token=page_token,
1594 max_results=max_results,
1595 page_size=page_size,
1596 )
1597 result.dataset = dataset # type: ignore
1598 return result
1599
1600 def list_routines(
1601 self,
1602 dataset: Union[Dataset, DatasetReference, DatasetListItem, str],
1603 max_results: Optional[int] = None,
1604 page_token: Optional[str] = None,
1605 retry: retries.Retry = DEFAULT_RETRY,
1606 timeout: TimeoutType = DEFAULT_TIMEOUT,
1607 page_size: Optional[int] = None,
1608 ) -> page_iterator.Iterator:
1609 """[Beta] List routines in the dataset.
1610
1611 See
1612 https://cloud.google.com/bigquery/docs/reference/rest/v2/routines/list
1613
1614 Args:
1615 dataset (Union[ \
1616 google.cloud.bigquery.dataset.Dataset, \
1617 google.cloud.bigquery.dataset.DatasetReference, \
1618 google.cloud.bigquery.dataset.DatasetListItem, \
1619 str, \
1620 ]):
1621 A reference to the dataset whose routines to list from the
1622 BigQuery API. If a string is passed in, this method attempts
1623 to create a dataset reference from a string using
1624 :func:`google.cloud.bigquery.dataset.DatasetReference.from_string`.
1625 max_results (Optional[int]):
1626 Maximum number of routines to return. Defaults
1627 to a value set by the API.
1628 page_token (Optional[str]):
1629 Token representing a cursor into the routines. If not passed,
1630 the API will return the first page of routines. The token marks
1631 the beginning of the iterator to be returned and the value of the
1632 ``page_token`` can be accessed at ``next_page_token`` of the
1633 :class:`~google.api_core.page_iterator.HTTPIterator`.
1634 retry (Optional[google.api_core.retry.Retry]):
1635 How to retry the RPC.
1636 timeout (Optional[float]):
1637 The number of seconds to wait for the underlying HTTP transport
1638 before using ``retry``.
1639 page_size (Optional[int]):
1640 Maximum number of routines to return per page.
1641 Defaults to a value set by the API.
1642
1643 Returns:
1644 google.api_core.page_iterator.Iterator:
1645 Iterator of all
1646 :class:`~google.cloud.bigquery.routine.Routine`s contained
1647 within the requested dataset, limited by ``max_results``.
1648 """
1649 dataset = self._dataset_from_arg(dataset)
1650 path = "{}/routines".format(dataset.path)
1651
1652 span_attributes = {"path": path}
1653
1654 def api_request(*args, **kwargs):
1655 return self._call_api(
1656 retry,
1657 span_name="BigQuery.listRoutines",
1658 span_attributes=span_attributes,
1659 *args,
1660 timeout=timeout,
1661 **kwargs,
1662 )
1663
1664 result = page_iterator.HTTPIterator(
1665 client=self,
1666 api_request=api_request,
1667 path=path,
1668 item_to_value=_item_to_routine,
1669 items_key="routines",
1670 page_token=page_token,
1671 max_results=max_results,
1672 page_size=page_size,
1673 )
1674 result.dataset = dataset # type: ignore
1675 return result
1676
1677 def list_tables(
1678 self,
1679 dataset: Union[Dataset, DatasetReference, DatasetListItem, str],
1680 max_results: Optional[int] = None,
1681 page_token: Optional[str] = None,
1682 retry: retries.Retry = DEFAULT_RETRY,
1683 timeout: TimeoutType = DEFAULT_TIMEOUT,
1684 page_size: Optional[int] = None,
1685 ) -> page_iterator.Iterator:
1686 """List tables in the dataset.
1687
1688 See
1689 https://cloud.google.com/bigquery/docs/reference/rest/v2/tables/list
1690
1691 Args:
1692 dataset (Union[ \
1693 google.cloud.bigquery.dataset.Dataset, \
1694 google.cloud.bigquery.dataset.DatasetReference, \
1695 google.cloud.bigquery.dataset.DatasetListItem, \
1696 str, \
1697 ]):
1698 A reference to the dataset whose tables to list from the
1699 BigQuery API. If a string is passed in, this method attempts
1700 to create a dataset reference from a string using
1701 :func:`google.cloud.bigquery.dataset.DatasetReference.from_string`.
1702 max_results (Optional[int]):
1703 Maximum number of tables to return. Defaults
1704 to a value set by the API.
1705 page_token (Optional[str]):
1706 Token representing a cursor into the tables. If not passed,
1707 the API will return the first page of tables. The token marks
1708 the beginning of the iterator to be returned and the value of
1709 the ``page_token`` can be accessed at ``next_page_token`` of the
1710 :class:`~google.api_core.page_iterator.HTTPIterator`.
1711 retry (Optional[google.api_core.retry.Retry]):
1712 How to retry the RPC.
1713 timeout (Optional[float]):
1714 The number of seconds to wait for the underlying HTTP transport
1715 before using ``retry``.
1716 page_size (Optional[int]):
1717 Maximum number of tables to return per page.
1718 Defaults to a value set by the API.
1719
1720 Returns:
1721 google.api_core.page_iterator.Iterator:
1722 Iterator of
1723 :class:`~google.cloud.bigquery.table.TableListItem` contained
1724 within the requested dataset.
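
Example:
    A minimal, illustrative sketch; the project and dataset IDs below are
    placeholders::

        from google.cloud import bigquery

        client = bigquery.Client()
        # "my-project.my_dataset" is a placeholder dataset ID.
        for table in client.list_tables("my-project.my_dataset"):
            print(table.table_id)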
1725 """
1726 dataset = self._dataset_from_arg(dataset)
1727 path = "%s/tables" % dataset.path
1728 span_attributes = {"path": path}
1729
1730 def api_request(*args, **kwargs):
1731 return self._call_api(
1732 retry,
1733 span_name="BigQuery.listTables",
1734 span_attributes=span_attributes,
1735 *args,
1736 timeout=timeout,
1737 **kwargs,
1738 )
1739
1740 result = page_iterator.HTTPIterator(
1741 client=self,
1742 api_request=api_request,
1743 path=path,
1744 item_to_value=_item_to_table,
1745 items_key="tables",
1746 page_token=page_token,
1747 max_results=max_results,
1748 page_size=page_size,
1749 )
1750 result.dataset = dataset # type: ignore
1751 return result
1752
1753 def delete_dataset(
1754 self,
1755 dataset: Union[Dataset, DatasetReference, DatasetListItem, str],
1756 delete_contents: bool = False,
1757 retry: retries.Retry = DEFAULT_RETRY,
1758 timeout: TimeoutType = DEFAULT_TIMEOUT,
1759 not_found_ok: bool = False,
1760 ) -> None:
1761 """Delete a dataset.
1762
1763 See
1764 https://cloud.google.com/bigquery/docs/reference/rest/v2/datasets/delete
1765
1766 Args:
1767 dataset (Union[ \
1768 google.cloud.bigquery.dataset.Dataset, \
1769 google.cloud.bigquery.dataset.DatasetReference, \
1770 google.cloud.bigquery.dataset.DatasetListItem, \
1771 str, \
1772 ]):
1773 A reference to the dataset to delete. If a string is passed
1774 in, this method attempts to create a dataset reference from a
1775 string using
1776 :func:`google.cloud.bigquery.dataset.DatasetReference.from_string`.
1777 delete_contents (Optional[bool]):
1778 If True, delete all the tables in the dataset. If False and
1779 the dataset contains tables, the request will fail.
1780 Default is False.
1781 retry (Optional[google.api_core.retry.Retry]):
1782 How to retry the RPC.
1783 timeout (Optional[float]):
1784 The number of seconds to wait for the underlying HTTP transport
1785 before using ``retry``.
1786 not_found_ok (Optional[bool]):
1787 Defaults to ``False``. If ``True``, ignore "not found" errors
1788 when deleting the dataset.
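
Example:
    A minimal, illustrative sketch that also removes any tables in the
    dataset; the dataset ID below is a placeholder::

        from google.cloud import bigquery

        client = bigquery.Client()
        # "my-project.my_dataset" is a placeholder dataset ID.
        client.delete_dataset(
            "my-project.my_dataset", delete_contents=True, not_found_ok=True
        )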
1789 """
1790 dataset = self._dataset_from_arg(dataset)
1791 params = {}
1792 path = dataset.path
1793 if delete_contents:
1794 params["deleteContents"] = "true"
1795 span_attributes = {"path": path, "deleteContents": delete_contents}
1796 else:
1797 span_attributes = {"path": path}
1798
1799 try:
1800 self._call_api(
1801 retry,
1802 span_name="BigQuery.deleteDataset",
1803 span_attributes=span_attributes,
1804 method="DELETE",
1805 path=path,
1806 query_params=params,
1807 timeout=timeout,
1808 )
1809 except core_exceptions.NotFound:
1810 if not not_found_ok:
1811 raise
1812
1813 def delete_model(
1814 self,
1815 model: Union[Model, ModelReference, str],
1816 retry: retries.Retry = DEFAULT_RETRY,
1817 timeout: TimeoutType = DEFAULT_TIMEOUT,
1818 not_found_ok: bool = False,
1819 ) -> None:
1820 """[Beta] Delete a model
1821
1822 See
1823 https://cloud.google.com/bigquery/docs/reference/rest/v2/models/delete
1824
1825 Args:
1826 model (Union[ \
1827 google.cloud.bigquery.model.Model, \
1828 google.cloud.bigquery.model.ModelReference, \
1829 str, \
1830 ]):
1831 A reference to the model to delete. If a string is passed in,
1832 this method attempts to create a model reference from a
1833 string using
1834 :func:`google.cloud.bigquery.model.ModelReference.from_string`.
1835 retry (Optional[google.api_core.retry.Retry]):
1836 How to retry the RPC.
1837 timeout (Optional[float]):
1838 The number of seconds to wait for the underlying HTTP transport
1839 before using ``retry``.
1840 not_found_ok (Optional[bool]):
1841 Defaults to ``False``. If ``True``, ignore "not found" errors
1842 when deleting the model.
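
Example:
    A minimal, illustrative sketch; the model ID below is a placeholder::

        from google.cloud import bigquery

        client = bigquery.Client()
        # "my-project.my_dataset.my_model" is a placeholder model ID.
        client.delete_model("my-project.my_dataset.my_model", not_found_ok=True)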
1843 """
1844 if isinstance(model, str):
1845 model = ModelReference.from_string(model, default_project=self.project)
1846
1847 if not isinstance(model, (Model, ModelReference)):
1848 raise TypeError("model must be a Model or a ModelReference")
1849
1850 path = model.path
1851 try:
1852 span_attributes = {"path": path}
1853 self._call_api(
1854 retry,
1855 span_name="BigQuery.deleteModel",
1856 span_attributes=span_attributes,
1857 method="DELETE",
1858 path=path,
1859 timeout=timeout,
1860 )
1861 except core_exceptions.NotFound:
1862 if not not_found_ok:
1863 raise
1864
1865 def delete_job_metadata(
1866 self,
1867 job_id: Union[str, LoadJob, CopyJob, ExtractJob, QueryJob],
1868 project: Optional[str] = None,
1869 location: Optional[str] = None,
1870 retry: retries.Retry = DEFAULT_RETRY,
1871 timeout: TimeoutType = DEFAULT_TIMEOUT,
1872 not_found_ok: bool = False,
1873 ):
1874 """[Beta] Delete job metadata from job history.
1875
1876 Note: This does not stop a running job. Use
1877 :func:`~google.cloud.bigquery.client.Client.cancel_job` instead.
1878
1879 Args:
1880 job_id (Union[ \
1881 str, \
1882 LoadJob, \
1883 CopyJob, \
1884 ExtractJob, \
1885 QueryJob \
1886 ]): Job or job identifier.
1887 project (Optional[str]):
1888 ID of the project which owns the job (defaults to the client's project).
1889 location (Optional[str]):
1890 Location where the job was run. Ignored if ``job_id`` is a job
1891 object.
1892 retry (Optional[google.api_core.retry.Retry]):
1893 How to retry the RPC.
1894 timeout (Optional[float]):
1895 The number of seconds to wait for the underlying HTTP transport
1896 before using ``retry``.
1897 not_found_ok (Optional[bool]):
1898 Defaults to ``False``. If ``True``, ignore "not found" errors
1899 when deleting the job.
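
Example:
    A minimal, illustrative sketch; the job ID and location below are
    placeholders::

        from google.cloud import bigquery

        client = bigquery.Client()
        # "my-job-id" and "US" are placeholder values.
        client.delete_job_metadata("my-job-id", location="US", not_found_ok=True)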
1900 """
1901 extra_params = {}
1902
1903 project, location, job_id = _extract_job_reference(
1904 job_id, project=project, location=location
1905 )
1906
1907 if project is None:
1908 project = self.project
1909
1910 if location is None:
1911 location = self.location
1912
1913 # Location is always required for jobs.delete()
1914 extra_params["location"] = location
1915
1916 path = f"/projects/{project}/jobs/{job_id}/delete"
1917
1918 span_attributes = {"path": path, "job_id": job_id, "location": location}
1919
1920 try:
1921 self._call_api(
1922 retry,
1923 span_name="BigQuery.deleteJob",
1924 span_attributes=span_attributes,
1925 method="DELETE",
1926 path=path,
1927 query_params=extra_params,
1928 timeout=timeout,
1929 )
1930 except google.api_core.exceptions.NotFound:
1931 if not not_found_ok:
1932 raise
1933
1934 def delete_routine(
1935 self,
1936 routine: Union[Routine, RoutineReference, str],
1937 retry: retries.Retry = DEFAULT_RETRY,
1938 timeout: TimeoutType = DEFAULT_TIMEOUT,
1939 not_found_ok: bool = False,
1940 ) -> None:
1941 """[Beta] Delete a routine.
1942
1943 See
1944 https://cloud.google.com/bigquery/docs/reference/rest/v2/routines/delete
1945
1946 Args:
1947 routine (Union[ \
1948 google.cloud.bigquery.routine.Routine, \
1949 google.cloud.bigquery.routine.RoutineReference, \
1950 str, \
1951 ]):
1952 A reference to the routine to delete. If a string is passed
1953 in, this method attempts to create a routine reference from a
1954 string using
1955 :func:`google.cloud.bigquery.routine.RoutineReference.from_string`.
1956 retry (Optional[google.api_core.retry.Retry]):
1957 How to retry the RPC.
1958 timeout (Optional[float]):
1959 The number of seconds to wait for the underlying HTTP transport
1960 before using ``retry``.
1961 not_found_ok (Optional[bool]):
1962 Defaults to ``False``. If ``True``, ignore "not found" errors
1963 when deleting the routine.
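
Example:
    A minimal, illustrative sketch; the routine ID below is a placeholder::

        from google.cloud import bigquery

        client = bigquery.Client()
        # "my-project.my_dataset.my_routine" is a placeholder routine ID.
        client.delete_routine(
            "my-project.my_dataset.my_routine", not_found_ok=True
        )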
1964 """
if isinstance(routine, str):
routine = RoutineReference.from_string(
routine, default_project=self.project
)

# Validate the argument type before accessing ``routine.path`` so that an
# unsupported type raises the intended TypeError instead of an AttributeError.
if not isinstance(routine, (Routine, RoutineReference)):
raise TypeError("routine must be a Routine or a RoutineReference")

path = routine.path
1973
1974 try:
1975 span_attributes = {"path": path}
1976 self._call_api(
1977 retry,
1978 span_name="BigQuery.deleteRoutine",
1979 span_attributes=span_attributes,
1980 method="DELETE",
1981 path=path,
1982 timeout=timeout,
1983 )
1984 except core_exceptions.NotFound:
1985 if not not_found_ok:
1986 raise
1987
1988 def delete_table(
1989 self,
1990 table: Union[Table, TableReference, TableListItem, str],
1991 retry: retries.Retry = DEFAULT_RETRY,
1992 timeout: TimeoutType = DEFAULT_TIMEOUT,
1993 not_found_ok: bool = False,
1994 ) -> None:
1995 """Delete a table
1996
1997 See
1998 https://cloud.google.com/bigquery/docs/reference/rest/v2/tables/delete
1999
2000 Args:
2001 table (Union[ \
2002 google.cloud.bigquery.table.Table, \
2003 google.cloud.bigquery.table.TableReference, \
2004 google.cloud.bigquery.table.TableListItem, \
2005 str, \
2006 ]):
2007 A reference to the table to delete. If a string is passed in,
2008 this method attempts to create a table reference from a
2009 string using
2010 :func:`google.cloud.bigquery.table.TableReference.from_string`.
2011 retry (Optional[google.api_core.retry.Retry]):
2012 How to retry the RPC.
2013 timeout (Optional[float]):
2014 The number of seconds to wait for the underlying HTTP transport
2015 before using ``retry``.
2016 not_found_ok (Optional[bool]):
2017 Defaults to ``False``. If ``True``, ignore "not found" errors
2018 when deleting the table.
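
Example:
    A minimal, illustrative sketch; the table ID below is a placeholder::

        from google.cloud import bigquery

        client = bigquery.Client()
        # "my-project.my_dataset.my_table" is a placeholder table ID.
        client.delete_table("my-project.my_dataset.my_table", not_found_ok=True)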
2019 """
2020 table = _table_arg_to_table_ref(table, default_project=self.project)
2021 if not isinstance(table, TableReference):
2022 raise TypeError("Unable to get TableReference for table '{}'".format(table))
2023
2024 try:
2025 path = table.path
2026 span_attributes = {"path": path}
2027 self._call_api(
2028 retry,
2029 span_name="BigQuery.deleteTable",
2030 span_attributes=span_attributes,
2031 method="DELETE",
2032 path=path,
2033 timeout=timeout,
2034 )
2035 except core_exceptions.NotFound:
2036 if not not_found_ok:
2037 raise
2038
2039 def _get_query_results(
2040 self,
2041 job_id: str,
2042 retry: retries.Retry,
2043 project: Optional[str] = None,
2044 timeout_ms: Optional[int] = None,
2045 location: Optional[str] = None,
2046 timeout: TimeoutType = DEFAULT_TIMEOUT,
2047 page_size: int = 0,
2048 start_index: Optional[int] = None,
2049 ) -> _QueryResults:
2050 """Get the query results object for a query job.
2051
2052 Args:
2053 job_id (str): Name of the query job.
2054 retry (google.api_core.retry.Retry):
2055 How to retry the RPC.
2056 project (Optional[str]):
2057 Project ID for the query job (defaults to the project of the client).
2058 timeout_ms (Optional[int]):
Number of milliseconds the API call should wait for the query
2060 to complete before the request times out.
2061 location (Optional[str]): Location of the query job.
2062 timeout (Optional[float]):
2063 The number of seconds to wait for the underlying HTTP transport
2064 before using ``retry``. If set, this connection timeout may be
2065 increased to a minimum value. This prevents retries on what
2066 would otherwise be a successful response.
2067 page_size (Optional[int]):
2068 Maximum number of rows in a single response. See maxResults in
2069 the jobs.getQueryResults REST API.
2070 start_index (Optional[int]):
2071 Zero-based index of the starting row. See startIndex in the
2072 jobs.getQueryResults REST API.
2073
2074 Returns:
2075 google.cloud.bigquery.query._QueryResults:
2076 A new ``_QueryResults`` instance.
2077 """
2078
2079 extra_params: Dict[str, Any] = {"maxResults": page_size}
2080
2081 if timeout is not None:
2082 if not isinstance(timeout, (int, float)):
2083 timeout = _MIN_GET_QUERY_RESULTS_TIMEOUT
2084 else:
2085 timeout = max(timeout, _MIN_GET_QUERY_RESULTS_TIMEOUT)
2086
2087 if page_size > 0:
2088 extra_params["formatOptions.useInt64Timestamp"] = True
2089
2090 if project is None:
2091 project = self.project
2092
2093 if timeout_ms is not None:
2094 extra_params["timeoutMs"] = timeout_ms
2095
2096 if location is None:
2097 location = self.location
2098
2099 if location is not None:
2100 extra_params["location"] = location
2101
2102 if start_index is not None:
2103 extra_params["startIndex"] = start_index
2104
2105 path = "/projects/{}/queries/{}".format(project, job_id)
2106
2107 # This call is typically made in a polling loop that checks whether the
2108 # job is complete (from QueryJob.done(), called ultimately from
2109 # QueryJob.result()). So we don't need to poll here.
2110 span_attributes = {"path": path}
2111 resource = self._call_api(
2112 retry,
2113 span_name="BigQuery.getQueryResults",
2114 span_attributes=span_attributes,
2115 method="GET",
2116 path=path,
2117 query_params=extra_params,
2118 timeout=timeout,
2119 )
2120 return _QueryResults.from_api_repr(resource)
2121
2122 def job_from_resource(
2123 self, resource: dict
2124 ) -> Union[job.CopyJob, job.ExtractJob, job.LoadJob, job.QueryJob, job.UnknownJob]:
2125 """Detect correct job type from resource and instantiate.
2126
2127 Args:
2128 resource (Dict): one job resource from API response
2129
2130 Returns:
2131 Union[job.CopyJob, job.ExtractJob, job.LoadJob, job.QueryJob, job.UnknownJob]:
2132 The job instance, constructed via the resource.
2133 """
2134 config = resource.get("configuration", {})
2135 if "load" in config:
2136 return job.LoadJob.from_api_repr(resource, self)
2137 elif "copy" in config:
2138 return job.CopyJob.from_api_repr(resource, self)
2139 elif "extract" in config:
2140 return job.ExtractJob.from_api_repr(resource, self)
2141 elif "query" in config:
2142 return job.QueryJob.from_api_repr(resource, self)
2143 return job.UnknownJob.from_api_repr(resource, self)
2144
2145 def create_job(
2146 self,
2147 job_config: dict,
2148 retry: retries.Retry = DEFAULT_RETRY,
2149 timeout: TimeoutType = DEFAULT_TIMEOUT,
2150 ) -> Union[job.LoadJob, job.CopyJob, job.ExtractJob, job.QueryJob]:
2151 """Create a new job.
2152
2153 Args:
job_config (dict): Job configuration in the dict representation returned from the API.
2155 retry (Optional[google.api_core.retry.Retry]): How to retry the RPC.
2156 timeout (Optional[float]):
2157 The number of seconds to wait for the underlying HTTP transport
2158 before using ``retry``.
2159
2160 Returns:
2161 Union[ \
2162 google.cloud.bigquery.job.LoadJob, \
2163 google.cloud.bigquery.job.CopyJob, \
2164 google.cloud.bigquery.job.ExtractJob, \
2165 google.cloud.bigquery.job.QueryJob \
2166 ]:
2167 A new job instance.
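
Example:
    An illustrative sketch of creating a query job from an API-style
    configuration dict; the SQL statement is a placeholder::

        from google.cloud import bigquery

        client = bigquery.Client()
        # The configuration below is a minimal, placeholder query configuration.
        query_job = client.create_job(
            {"query": {"query": "SELECT 1", "useLegacySql": False}}
        )
        query_job.result()  # Wait for the job to finish.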
2168 """
2169
2170 if "load" in job_config:
2171 load_job_config = google.cloud.bigquery.job.LoadJobConfig.from_api_repr(
2172 job_config
2173 )
2174 destination = _get_sub_prop(job_config, ["load", "destinationTable"])
2175 source_uris = _get_sub_prop(job_config, ["load", "sourceUris"])
2176 destination = TableReference.from_api_repr(destination)
2177 return self.load_table_from_uri(
2178 source_uris,
2179 destination,
2180 job_config=typing.cast(LoadJobConfig, load_job_config),
2181 retry=retry,
2182 timeout=timeout,
2183 )
2184 elif "copy" in job_config:
2185 copy_job_config = google.cloud.bigquery.job.CopyJobConfig.from_api_repr(
2186 job_config
2187 )
2188 destination = _get_sub_prop(job_config, ["copy", "destinationTable"])
2189 destination = TableReference.from_api_repr(destination)
2190 return self.copy_table(
2191 [], # Source table(s) already in job_config resource.
2192 destination,
2193 job_config=typing.cast(CopyJobConfig, copy_job_config),
2194 retry=retry,
2195 timeout=timeout,
2196 )
2197 elif "extract" in job_config:
2198 extract_job_config = (
2199 google.cloud.bigquery.job.ExtractJobConfig.from_api_repr(job_config)
2200 )
2201 source = _get_sub_prop(job_config, ["extract", "sourceTable"])
2202 if source:
2203 source_type = "Table"
2204 source = TableReference.from_api_repr(source)
2205 else:
2206 source = _get_sub_prop(job_config, ["extract", "sourceModel"])
2207 source_type = "Model"
2208 source = ModelReference.from_api_repr(source)
2209 destination_uris = _get_sub_prop(job_config, ["extract", "destinationUris"])
2210 return self.extract_table(
2211 source,
2212 destination_uris,
2213 job_config=typing.cast(ExtractJobConfig, extract_job_config),
2214 retry=retry,
2215 timeout=timeout,
2216 source_type=source_type,
2217 )
2218 elif "query" in job_config:
2219 query_job_config = google.cloud.bigquery.job.QueryJobConfig.from_api_repr(
2220 job_config
2221 )
2222 query = _get_sub_prop(job_config, ["query", "query"])
2223 return self.query(
2224 query,
2225 job_config=typing.cast(QueryJobConfig, query_job_config),
2226 retry=retry,
2227 timeout=timeout,
2228 )
2229 else:
2230 raise TypeError("Invalid job configuration received.")
2231
2232 def get_job(
2233 self,
2234 job_id: Union[str, job.LoadJob, job.CopyJob, job.ExtractJob, job.QueryJob],
2235 project: Optional[str] = None,
2236 location: Optional[str] = None,
2237 retry: retries.Retry = DEFAULT_RETRY,
2238 timeout: TimeoutType = DEFAULT_GET_JOB_TIMEOUT,
2239 ) -> Union[job.LoadJob, job.CopyJob, job.ExtractJob, job.QueryJob, job.UnknownJob]:
2240 """Fetch a job for the project associated with this client.
2241
2242 See
2243 https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/get
2244
2245 Args:
2246 job_id (Union[ \
2247 str, \
2248 job.LoadJob, \
2249 job.CopyJob, \
2250 job.ExtractJob, \
2251 job.QueryJob \
2252 ]):
2253 Job identifier.
2254 project (Optional[str]):
2255 ID of the project which owns the job (defaults to the client's project).
2256 location (Optional[str]):
2257 Location where the job was run. Ignored if ``job_id`` is a job
2258 object.
2259 retry (Optional[google.api_core.retry.Retry]):
2260 How to retry the RPC.
2261 timeout (Optional[float]):
2262 The number of seconds to wait for the underlying HTTP transport
2263 before using ``retry``.
2264
2265 Returns:
2266 Union[job.LoadJob, job.CopyJob, job.ExtractJob, job.QueryJob, job.UnknownJob]:
2267 Job instance, based on the resource returned by the API.
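
Example:
    A minimal, illustrative sketch; the job ID and location below are
    placeholders::

        from google.cloud import bigquery

        client = bigquery.Client()
        # "my-job-id" and "US" are placeholder values.
        job = client.get_job("my-job-id", location="US")
        print(job.job_type, job.state)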
2268 """
2269 extra_params = {"projection": "full"}
2270
2271 project, location, job_id = _extract_job_reference(
2272 job_id, project=project, location=location
2273 )
2274
2275 if project is None:
2276 project = self.project
2277
2278 if location is None:
2279 location = self.location
2280
2281 if location is not None:
2282 extra_params["location"] = location
2283
2284 path = "/projects/{}/jobs/{}".format(project, job_id)
2285
2286 span_attributes = {"path": path, "job_id": job_id, "location": location}
2287
2288 resource = self._call_api(
2289 retry,
2290 span_name="BigQuery.getJob",
2291 span_attributes=span_attributes,
2292 method="GET",
2293 path=path,
2294 query_params=extra_params,
2295 timeout=timeout,
2296 )
2297
2298 return self.job_from_resource(resource)
2299
2300 def cancel_job(
2301 self,
2302 job_id: str,
2303 project: Optional[str] = None,
2304 location: Optional[str] = None,
2305 retry: retries.Retry = DEFAULT_RETRY,
2306 timeout: TimeoutType = DEFAULT_TIMEOUT,
2307 ) -> Union[job.LoadJob, job.CopyJob, job.ExtractJob, job.QueryJob]:
2308 """Attempt to cancel a job from a job ID.
2309
2310 See
2311 https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/cancel
2312
2313 Args:
2314 job_id (Union[ \
2315 str, \
2316 google.cloud.bigquery.job.LoadJob, \
2317 google.cloud.bigquery.job.CopyJob, \
2318 google.cloud.bigquery.job.ExtractJob, \
2319 google.cloud.bigquery.job.QueryJob \
2320 ]): Job identifier.
2321 project (Optional[str]):
2322 ID of the project which owns the job (defaults to the client's project).
2323 location (Optional[str]):
2324 Location where the job was run. Ignored if ``job_id`` is a job
2325 object.
2326 retry (Optional[google.api_core.retry.Retry]):
2327 How to retry the RPC.
2328 timeout (Optional[float]):
2329 The number of seconds to wait for the underlying HTTP transport
2330 before using ``retry``.
2331
2332 Returns:
2333 Union[ \
2334 google.cloud.bigquery.job.LoadJob, \
2335 google.cloud.bigquery.job.CopyJob, \
2336 google.cloud.bigquery.job.ExtractJob, \
2337 google.cloud.bigquery.job.QueryJob, \
2338 ]:
2339 Job instance, based on the resource returned by the API.
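
Example:
    A minimal, illustrative sketch; the job ID and location below are
    placeholders. Cancellation is best-effort, so the returned job may not
    reach a terminal state immediately::

        from google.cloud import bigquery

        client = bigquery.Client()
        # "my-job-id" and "US" are placeholder values.
        job = client.cancel_job("my-job-id", location="US")
        print(job.state)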
2340 """
2341 extra_params = {"projection": "full"}
2342
2343 project, location, job_id = _extract_job_reference(
2344 job_id, project=project, location=location
2345 )
2346
2347 if project is None:
2348 project = self.project
2349
2350 if location is None:
2351 location = self.location
2352
2353 if location is not None:
2354 extra_params["location"] = location
2355
2356 path = "/projects/{}/jobs/{}/cancel".format(project, job_id)
2357
2358 span_attributes = {"path": path, "job_id": job_id, "location": location}
2359
2360 resource = self._call_api(
2361 retry,
2362 span_name="BigQuery.cancelJob",
2363 span_attributes=span_attributes,
2364 method="POST",
2365 path=path,
2366 query_params=extra_params,
2367 timeout=timeout,
2368 )
2369
2370 job_instance = self.job_from_resource(resource["job"]) # never an UnknownJob
2371
2372 return typing.cast(
2373 Union[job.LoadJob, job.CopyJob, job.ExtractJob, job.QueryJob],
2374 job_instance,
2375 )
2376
2377 def list_jobs(
2378 self,
2379 project: Optional[str] = None,
2380 parent_job: Optional[Union[QueryJob, str]] = None,
2381 max_results: Optional[int] = None,
2382 page_token: Optional[str] = None,
2383 all_users: Optional[bool] = None,
2384 state_filter: Optional[str] = None,
2385 retry: retries.Retry = DEFAULT_RETRY,
2386 timeout: TimeoutType = DEFAULT_TIMEOUT,
2387 min_creation_time: Optional[datetime.datetime] = None,
2388 max_creation_time: Optional[datetime.datetime] = None,
2389 page_size: Optional[int] = None,
2390 ) -> page_iterator.Iterator:
2391 """List jobs for the project associated with this client.
2392
2393 See
2394 https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/list
2395
2396 Args:
2397 project (Optional[str]):
Project ID to use for retrieving jobs. Defaults
2399 to the client's project.
2400 parent_job (Optional[Union[ \
2401 google.cloud.bigquery.job._AsyncJob, \
2402 str, \
2403 ]]):
2404 If set, retrieve only child jobs of the specified parent.
2405 max_results (Optional[int]):
2406 Maximum number of jobs to return.
2407 page_token (Optional[str]):
2408 Opaque marker for the next "page" of jobs. If not
2409 passed, the API will return the first page of jobs. The token
2410 marks the beginning of the iterator to be returned and the
2411 value of the ``page_token`` can be accessed at
2412 ``next_page_token`` of
2413 :class:`~google.api_core.page_iterator.HTTPIterator`.
2414 all_users (Optional[bool]):
2415 If true, include jobs owned by all users in the project.
2416 Defaults to :data:`False`.
2417 state_filter (Optional[str]):
2418 If set, include only jobs matching the given state. One of:
2419 * ``"done"``
2420 * ``"pending"``
2421 * ``"running"``
2422 retry (Optional[google.api_core.retry.Retry]):
2423 How to retry the RPC.
2424 timeout (Optional[float]):
2425 The number of seconds to wait for the underlying HTTP transport
2426 before using ``retry``.
2427 min_creation_time (Optional[datetime.datetime]):
2428 Min value for job creation time. If set, only jobs created
after or at this timestamp are returned. If the datetime has
no time zone, UTC is assumed.
2431 max_creation_time (Optional[datetime.datetime]):
2432 Max value for job creation time. If set, only jobs created
before or at this timestamp are returned. If the datetime has
no time zone, UTC is assumed.
2435 page_size (Optional[int]):
2436 Maximum number of jobs to return per page.
2437
2438 Returns:
2439 google.api_core.page_iterator.Iterator:
2440 Iterable of job instances.
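
Example:
    A minimal, illustrative sketch of listing up to ten running jobs in
    the client's project::

        from google.cloud import bigquery

        client = bigquery.Client()
        for job in client.list_jobs(max_results=10, state_filter="running"):
            print(job.job_id, job.state)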
2441 """
2442 if isinstance(parent_job, job._AsyncJob):
2443 parent_job = parent_job.job_id # pytype: disable=attribute-error
2444
2445 extra_params = {
2446 "allUsers": all_users,
2447 "stateFilter": state_filter,
2448 "minCreationTime": _str_or_none(
2449 google.cloud._helpers._millis_from_datetime(min_creation_time)
2450 ),
2451 "maxCreationTime": _str_or_none(
2452 google.cloud._helpers._millis_from_datetime(max_creation_time)
2453 ),
2454 "projection": "full",
2455 "parentJobId": parent_job,
2456 }
2457
2458 extra_params = {
2459 param: value for param, value in extra_params.items() if value is not None
2460 }
2461
2462 if project is None:
2463 project = self.project
2464
2465 path = "/projects/%s/jobs" % (project,)
2466
2467 span_attributes = {"path": path}
2468
2469 def api_request(*args, **kwargs):
2470 return self._call_api(
2471 retry,
2472 span_name="BigQuery.listJobs",
2473 span_attributes=span_attributes,
2474 *args,
2475 timeout=timeout,
2476 **kwargs,
2477 )
2478
2479 return page_iterator.HTTPIterator(
2480 client=self,
2481 api_request=api_request,
2482 path=path,
2483 item_to_value=_item_to_job,
2484 items_key="jobs",
2485 page_token=page_token,
2486 max_results=max_results,
2487 extra_params=extra_params,
2488 page_size=page_size,
2489 )
2490
2491 def load_table_from_uri(
2492 self,
2493 source_uris: Union[str, Sequence[str]],
2494 destination: Union[Table, TableReference, TableListItem, str],
2495 job_id: Optional[str] = None,
2496 job_id_prefix: Optional[str] = None,
2497 location: Optional[str] = None,
2498 project: Optional[str] = None,
2499 job_config: Optional[LoadJobConfig] = None,
2500 retry: retries.Retry = DEFAULT_RETRY,
2501 timeout: TimeoutType = DEFAULT_TIMEOUT,
2502 ) -> job.LoadJob:
2503 """Starts a job for loading data into a table from Cloud Storage.
2504
2505 See
2506 https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#jobconfigurationload
2507
2508 Args:
2509 source_uris (Union[str, Sequence[str]]):
2510 URIs of data files to be loaded; in format
2511 ``gs://<bucket_name>/<object_name_or_glob>``.
2512 destination (Union[ \
2513 google.cloud.bigquery.table.Table, \
2514 google.cloud.bigquery.table.TableReference, \
2515 google.cloud.bigquery.table.TableListItem, \
2516 str, \
2517 ]):
2518 Table into which data is to be loaded. If a string is passed
2519 in, this method attempts to create a table reference from a
2520 string using
2521 :func:`google.cloud.bigquery.table.TableReference.from_string`.
2522 job_id (Optional[str]): Name of the job.
2523 job_id_prefix (Optional[str]):
2524 The user-provided prefix for a randomly generated job ID.
2525 This parameter will be ignored if a ``job_id`` is also given.
2526 location (Optional[str]):
2527 Location where to run the job. Must match the location of the
2528 destination table.
2529 project (Optional[str]):
ID of the project where the job runs. Defaults
2531 to the client's project.
2532 job_config (Optional[google.cloud.bigquery.job.LoadJobConfig]):
2533 Extra configuration options for the job.
2534 retry (Optional[google.api_core.retry.Retry]):
2535 How to retry the RPC.
2536 timeout (Optional[float]):
2537 The number of seconds to wait for the underlying HTTP transport
2538 before using ``retry``.
2539
2540 Returns:
2541 google.cloud.bigquery.job.LoadJob: A new load job.
2542
2543 Raises:
2544 TypeError:
2545 If ``job_config`` is not an instance of
2546 :class:`~google.cloud.bigquery.job.LoadJobConfig` class.
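
Example:
    An illustrative sketch of loading a CSV file from Cloud Storage; the
    bucket, object, and table IDs below are placeholders::

        from google.cloud import bigquery

        client = bigquery.Client()
        job_config = bigquery.LoadJobConfig(
            source_format=bigquery.SourceFormat.CSV,
            skip_leading_rows=1,
            autodetect=True,
        )
        # The URI and table ID below are placeholders.
        load_job = client.load_table_from_uri(
            "gs://my-bucket/data.csv",
            "my-project.my_dataset.my_table",
            job_config=job_config,
        )
        load_job.result()  # Wait for the load to complete.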
2547 """
2548 job_id = _make_job_id(job_id, job_id_prefix)
2549
2550 if project is None:
2551 project = self.project
2552
2553 if location is None:
2554 location = self.location
2555
2556 job_ref = job._JobReference(job_id, project=project, location=location)
2557
2558 if isinstance(source_uris, str):
2559 source_uris = [source_uris]
2560
2561 destination = _table_arg_to_table_ref(destination, default_project=self.project)
2562
2563 if job_config is not None:
2564 _verify_job_config_type(job_config, LoadJobConfig)
2565 else:
2566 job_config = job.LoadJobConfig()
2567
2568 new_job_config = job_config._fill_from_default(self._default_load_job_config)
2569
2570 load_job = job.LoadJob(job_ref, source_uris, destination, self, new_job_config)
2571 load_job._begin(retry=retry, timeout=timeout)
2572
2573 return load_job
2574
2575 def load_table_from_file(
2576 self,
2577 file_obj: IO[bytes],
2578 destination: Union[Table, TableReference, TableListItem, str],
2579 rewind: bool = False,
2580 size: Optional[int] = None,
2581 num_retries: int = _DEFAULT_NUM_RETRIES,
2582 job_id: Optional[str] = None,
2583 job_id_prefix: Optional[str] = None,
2584 location: Optional[str] = None,
2585 project: Optional[str] = None,
2586 job_config: Optional[LoadJobConfig] = None,
2587 timeout: ResumableTimeoutType = DEFAULT_TIMEOUT,
2588 ) -> job.LoadJob:
2589 """Upload the contents of this table from a file-like object.
2590
2591 Similar to :meth:`load_table_from_uri`, this method creates, starts and
2592 returns a :class:`~google.cloud.bigquery.job.LoadJob`.
2593
2594 Args:
2595 file_obj (IO[bytes]):
2596 A file handle opened in binary mode for reading.
2597 destination (Union[Table, \
2598 TableReference, \
2599 TableListItem, \
2600 str \
2601 ]):
2602 Table into which data is to be loaded. If a string is passed
2603 in, this method attempts to create a table reference from a
2604 string using
2605 :func:`google.cloud.bigquery.table.TableReference.from_string`.
2606 rewind (Optional[bool]):
2607 If True, seek to the beginning of the file handle before
2608 reading the file. Defaults to False.
2609 size (Optional[int]):
The number of bytes to read from the file handle. If size is
``None`` or at least ``_MAX_MULTIPART_SIZE``, a resumable upload is
used. Otherwise, a multipart upload is used.
2613 num_retries (Optional[int]): Number of upload retries. Defaults to 6.
2614 job_id (Optional[str]): Name of the job.
2615 job_id_prefix (Optional[str]):
2616 The user-provided prefix for a randomly generated job ID.
2617 This parameter will be ignored if a ``job_id`` is also given.
2618 location (Optional[str]):
2619 Location where to run the job. Must match the location of the
2620 destination table.
2621 project (Optional[str]):
ID of the project where the job runs. Defaults
2623 to the client's project.
2624 job_config (Optional[LoadJobConfig]):
2625 Extra configuration options for the job.
2626 timeout (Optional[float]):
2627 The number of seconds to wait for the underlying HTTP transport
2628 before using ``retry``. Depending on the retry strategy, a request
2629 may be repeated several times using the same timeout each time.
2630 Defaults to None.
2631
2632 Can also be passed as a tuple (connect_timeout, read_timeout).
2633 See :meth:`requests.Session.request` documentation for details.
2634
2635 Returns:
2636 google.cloud.bigquery.job.LoadJob: A new load job.
2637
2638 Raises:
2639 ValueError:
If ``size`` is not passed in and cannot be determined, or if
the ``file_obj`` is detected to be opened in text mode.
2643
2644 TypeError:
2645 If ``job_config`` is not an instance of
2646 :class:`~google.cloud.bigquery.job.LoadJobConfig` class.
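
Example:
    An illustrative sketch of loading a local CSV file; the file path and
    table ID below are placeholders::

        from google.cloud import bigquery

        client = bigquery.Client()
        job_config = bigquery.LoadJobConfig(
            source_format=bigquery.SourceFormat.CSV, autodetect=True
        )
        # "data.csv" and the table ID below are placeholders.
        with open("data.csv", "rb") as source_file:
            load_job = client.load_table_from_file(
                source_file,
                "my-project.my_dataset.my_table",
                job_config=job_config,
            )
        load_job.result()  # Wait for the load to complete.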
2647 """
2648 job_id = _make_job_id(job_id, job_id_prefix)
2649
2650 if project is None:
2651 project = self.project
2652
2653 if location is None:
2654 location = self.location
2655
2656 destination = _table_arg_to_table_ref(destination, default_project=self.project)
2657 job_ref = job._JobReference(job_id, project=project, location=location)
2658
2659 if job_config is not None:
2660 _verify_job_config_type(job_config, LoadJobConfig)
2661 else:
2662 job_config = job.LoadJobConfig()
2663
2664 new_job_config = job_config._fill_from_default(self._default_load_job_config)
2665
2666 load_job = job.LoadJob(job_ref, None, destination, self, new_job_config)
2667 job_resource = load_job.to_api_repr()
2668
2669 if rewind:
2670 file_obj.seek(0, os.SEEK_SET)
2671
2672 _check_mode(file_obj)
2673
2674 try:
2675 if size is None or size >= _MAX_MULTIPART_SIZE:
2676 response = self._do_resumable_upload(
2677 file_obj, job_resource, num_retries, timeout, project=project
2678 )
2679 else:
2680 response = self._do_multipart_upload(
2681 file_obj, job_resource, size, num_retries, timeout, project=project
2682 )
2683 except resumable_media.InvalidResponse as exc:
2684 raise exceptions.from_http_response(exc.response)
2685
2686 return typing.cast(LoadJob, self.job_from_resource(response.json()))
2687
2688 def load_table_from_dataframe(
2689 self,
2690 dataframe: "pandas.DataFrame", # type: ignore
2691 destination: Union[Table, TableReference, str],
2692 num_retries: int = _DEFAULT_NUM_RETRIES,
2693 job_id: Optional[str] = None,
2694 job_id_prefix: Optional[str] = None,
2695 location: Optional[str] = None,
2696 project: Optional[str] = None,
2697 job_config: Optional[LoadJobConfig] = None,
2698 parquet_compression: str = "snappy",
2699 timeout: ResumableTimeoutType = DEFAULT_TIMEOUT,
2700 ) -> job.LoadJob:
2701 """Upload the contents of a table from a pandas DataFrame.
2702
2703 Similar to :meth:`load_table_from_uri`, this method creates, starts and
2704 returns a :class:`~google.cloud.bigquery.job.LoadJob`.
2705
2706 .. note::
2707
2708 REPEATED fields are NOT supported when using the CSV source format.
2709 They are supported when using the PARQUET source format, but
2710 due to the way they are encoded in the ``parquet`` file,
2711 a mismatch with the existing table schema can occur, so
REPEATED fields are not properly supported when using
``pyarrow<4.0.0`` with the parquet format.
2714
2715 https://github.com/googleapis/python-bigquery/issues/19
2716
2717 Args:
dataframe (pandas.DataFrame):
2719 A :class:`~pandas.DataFrame` containing the data to load.
2720 destination (Union[ \
2721 Table, \
2722 TableReference, \
2723 str \
2724 ]):
2725 The destination table to use for loading the data. If it is an
2726 existing table, the schema of the :class:`~pandas.DataFrame`
2727 must match the schema of the destination table. If the table
2728 does not yet exist, the schema is inferred from the
2729 :class:`~pandas.DataFrame`.
2730
2731 If a string is passed in, this method attempts to create a
2732 table reference from a string using
2733 :func:`google.cloud.bigquery.table.TableReference.from_string`.
2734 num_retries (Optional[int]): Number of upload retries. Defaults to 6.
2735 job_id (Optional[str]): Name of the job.
2736 job_id_prefix (Optional[str]):
2737 The user-provided prefix for a randomly generated
2738 job ID. This parameter will be ignored if a ``job_id`` is
2739 also given.
2740 location (Optional[str]):
2741 Location where to run the job. Must match the location of the
2742 destination table.
2743 project (Optional[str]):
ID of the project where the job runs. Defaults
2745 to the client's project.
2746 job_config (Optional[LoadJobConfig]):
2747 Extra configuration options for the job.
2748
2749 To override the default pandas data type conversions, supply
2750 a value for
2751 :attr:`~google.cloud.bigquery.job.LoadJobConfig.schema` with
2752 column names matching those of the dataframe. The BigQuery
2753 schema is used to determine the correct data type conversion.
2754 Indexes are not loaded.
2755
2756 By default, this method uses the parquet source format. To
2757 override this, supply a value for
2758 :attr:`~google.cloud.bigquery.job.LoadJobConfig.source_format`
2759 with the format name. Currently only
2760 :attr:`~google.cloud.bigquery.job.SourceFormat.CSV` and
2761 :attr:`~google.cloud.bigquery.job.SourceFormat.PARQUET` are
2762 supported.
2763 parquet_compression (Optional[str]):
[Beta] The compression method to use when serializing
``dataframe`` to a temporary parquet file.
2766 Defaults to "snappy".
2767
2768 The argument is directly passed as the ``compression``
2769 argument to the underlying ``pyarrow.parquet.write_table()``
2770 method (the default value "snappy" gets converted to uppercase).
2771 https://arrow.apache.org/docs/python/generated/pyarrow.parquet.write_table.html#pyarrow-parquet-write-table
2772
2773 If the job config schema is missing, the argument is directly
2774 passed as the ``compression`` argument to the underlying
2775 ``DataFrame.to_parquet()`` method.
2776 https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.to_parquet.html#pandas.DataFrame.to_parquet
timeout (Optional[float]):
2778 The number of seconds to wait for the underlying HTTP transport
2779 before using ``retry``. Depending on the retry strategy, a request may
2780 be repeated several times using the same timeout each time.
2781 Defaults to None.
2782
2783 Can also be passed as a tuple (connect_timeout, read_timeout).
2784 See :meth:`requests.Session.request` documentation for details.
2785
2786 Returns:
2787 google.cloud.bigquery.job.LoadJob: A new load job.
2788
2789 Raises:
2790 ValueError:
2791 If a usable parquet engine cannot be found. This method
2792 requires :mod:`pyarrow` to be installed.
2793 TypeError:
2794 If ``job_config`` is not an instance of
2795 :class:`~google.cloud.bigquery.job.LoadJobConfig` class.
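
Example:
    An illustrative sketch; requires :mod:`pandas` and :mod:`pyarrow`, and
    the table ID below is a placeholder::

        import pandas
        from google.cloud import bigquery

        client = bigquery.Client()
        dataframe = pandas.DataFrame({"name": ["one", "two"], "value": [1, 2]})
        # The table ID below is a placeholder.
        load_job = client.load_table_from_dataframe(
            dataframe, "my-project.my_dataset.my_table"
        )
        load_job.result()  # Wait for the load to complete.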
2796 """
2797 job_id = _make_job_id(job_id, job_id_prefix)
2798
2799 if job_config is not None:
2800 _verify_job_config_type(job_config, LoadJobConfig)
2801 else:
2802 job_config = job.LoadJobConfig()
2803
2804 new_job_config = job_config._fill_from_default(self._default_load_job_config)
2805
2806 supported_formats = {job.SourceFormat.CSV, job.SourceFormat.PARQUET}
2807 if new_job_config.source_format is None:
2808 # default value
2809 new_job_config.source_format = job.SourceFormat.PARQUET
2810
2811 if (
2812 new_job_config.source_format == job.SourceFormat.PARQUET
2813 and new_job_config.parquet_options is None
2814 ):
2815 parquet_options = ParquetOptions()
2816 # default value
2817 parquet_options.enable_list_inference = True
2818 new_job_config.parquet_options = parquet_options
2819
2820 if new_job_config.source_format not in supported_formats:
2821 raise ValueError(
2822 "Got unexpected source_format: '{}'. Currently, only PARQUET and CSV are supported".format(
2823 new_job_config.source_format
2824 )
2825 )
2826
2827 if pyarrow is None and new_job_config.source_format == job.SourceFormat.PARQUET:
2828 # pyarrow is now the only supported parquet engine.
2829 raise ValueError("This method requires pyarrow to be installed")
2830
2831 if location is None:
2832 location = self.location
2833
2834 # If table schema is not provided, we try to fetch the existing table
2835 # schema, and check if dataframe schema is compatible with it - except
2836 # for WRITE_TRUNCATE jobs, the existing schema does not matter then.
2837 if (
2838 not new_job_config.schema
2839 and new_job_config.write_disposition != job.WriteDisposition.WRITE_TRUNCATE
2840 ):
2841 try:
2842 table = self.get_table(destination)
2843 except core_exceptions.NotFound:
2844 pass
2845 else:
2846 columns_and_indexes = frozenset(
2847 name
2848 for name, _ in _pandas_helpers.list_columns_and_indexes(dataframe)
2849 )
2850 new_job_config.schema = [
2851 # Field description and policy tags are not needed to
2852 # serialize a data frame.
2853 SchemaField(
2854 field.name,
2855 field.field_type,
2856 mode=field.mode,
2857 fields=field.fields,
2858 )
2859 # schema fields not present in the dataframe are not needed
2860 for field in table.schema
2861 if field.name in columns_and_indexes
2862 ]
2863
2864 new_job_config.schema = _pandas_helpers.dataframe_to_bq_schema(
2865 dataframe, new_job_config.schema
2866 )
2867
2868 if not new_job_config.schema:
2869 # the schema could not be fully detected
2870 warnings.warn(
2871 "Schema could not be detected for all columns. Loading from a "
2872 "dataframe without a schema will be deprecated in the future, "
2873 "please provide a schema.",
2874 PendingDeprecationWarning,
2875 stacklevel=2,
2876 )
2877
2878 tmpfd, tmppath = tempfile.mkstemp(
2879 suffix="_job_{}.{}".format(job_id[:8], new_job_config.source_format.lower())
2880 )
2881 os.close(tmpfd)
2882
2883 try:
2884 if new_job_config.source_format == job.SourceFormat.PARQUET:
2885 if new_job_config.schema:
2886 if parquet_compression == "snappy": # adjust the default value
2887 parquet_compression = parquet_compression.upper()
2888
2889 _pandas_helpers.dataframe_to_parquet(
2890 dataframe,
2891 new_job_config.schema,
2892 tmppath,
2893 parquet_compression=parquet_compression,
2894 parquet_use_compliant_nested_type=True,
2895 )
2896 else:
2897 dataframe.to_parquet(
2898 tmppath,
2899 engine="pyarrow",
2900 compression=parquet_compression,
2901 **(
2902 {"use_compliant_nested_type": True}
2903 if _versions_helpers.PYARROW_VERSIONS.use_compliant_nested_type
2904 else {}
2905 ),
2906 )
2907
2908 else:
2909 dataframe.to_csv(
2910 tmppath,
2911 index=False,
2912 header=False,
2913 encoding="utf-8",
2914 float_format="%.17g",
2915 date_format="%Y-%m-%d %H:%M:%S.%f",
2916 )
2917
2918 with open(tmppath, "rb") as tmpfile:
2919 file_size = os.path.getsize(tmppath)
2920 return self.load_table_from_file(
2921 tmpfile,
2922 destination,
2923 num_retries=num_retries,
2924 rewind=True,
2925 size=file_size,
2926 job_id=job_id,
2927 job_id_prefix=job_id_prefix,
2928 location=location,
2929 project=project,
2930 job_config=new_job_config,
2931 timeout=timeout,
2932 )
2933
2934 finally:
2935 os.remove(tmppath)
2936
2937 def load_table_from_json(
2938 self,
2939 json_rows: Iterable[Dict[str, Any]],
2940 destination: Union[Table, TableReference, TableListItem, str],
2941 num_retries: int = _DEFAULT_NUM_RETRIES,
2942 job_id: Optional[str] = None,
2943 job_id_prefix: Optional[str] = None,
2944 location: Optional[str] = None,
2945 project: Optional[str] = None,
2946 job_config: Optional[LoadJobConfig] = None,
2947 timeout: ResumableTimeoutType = DEFAULT_TIMEOUT,
2948 ) -> job.LoadJob:
2949 """Upload the contents of a table from a JSON string or dict.
2950
2951 Args:
2952 json_rows (Iterable[Dict[str, Any]]):
2953 Row data to be inserted. Keys must match the table schema fields
2954 and values must be JSON-compatible representations.
2955
2956 .. note::
2957
2958 If your data is already a newline-delimited JSON string,
2959 it is best to wrap it into a file-like object and pass it
2960 to :meth:`~google.cloud.bigquery.client.Client.load_table_from_file`::
2961
2962 import io
2963 from google.cloud import bigquery
2964
2965 data = u'{"foo": "bar"}'
2966 data_as_file = io.StringIO(data)
2967
2968 client = bigquery.Client()
2969 client.load_table_from_file(data_as_file, ...)
2970
2971 destination (Union[ \
2972 Table, \
2973 TableReference, \
2974 TableListItem, \
2975 str \
2976 ]):
2977 Table into which data is to be loaded. If a string is passed
2978 in, this method attempts to create a table reference from a
2979 string using
2980 :func:`google.cloud.bigquery.table.TableReference.from_string`.
2981 num_retries (Optional[int]): Number of upload retries. Defaults to 6.
2982 job_id (Optional[str]): Name of the job.
2983 job_id_prefix (Optional[str]):
2984 The user-provided prefix for a randomly generated job ID.
2985 This parameter will be ignored if a ``job_id`` is also given.
2986 location (Optional[str]):
2987 Location where to run the job. Must match the location of the
2988 destination table.
2989 project (Optional[str]):
ID of the project where the job runs. Defaults
2991 to the client's project.
2992 job_config (Optional[LoadJobConfig]):
2993 Extra configuration options for the job. The ``source_format``
2994 setting is always set to
2995 :attr:`~google.cloud.bigquery.job.SourceFormat.NEWLINE_DELIMITED_JSON`.
2996 timeout (Optional[float]):
2997 The number of seconds to wait for the underlying HTTP transport
2998 before using ``retry``. Depending on the retry strategy, a request may
2999 be repeated several times using the same timeout each time.
3000 Defaults to None.
3001
3002 Can also be passed as a tuple (connect_timeout, read_timeout).
3003 See :meth:`requests.Session.request` documentation for details.
3004
3005 Returns:
3006 google.cloud.bigquery.job.LoadJob: A new load job.
3007
3008 Raises:
3009 TypeError:
3010 If ``job_config`` is not an instance of
3011 :class:`~google.cloud.bigquery.job.LoadJobConfig` class.
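
Example:
    An illustrative sketch; the table ID below is a placeholder::

        from google.cloud import bigquery

        client = bigquery.Client()
        rows = [
            {"name": "one", "value": 1},
            {"name": "two", "value": 2},
        ]
        # The table ID below is a placeholder.
        load_job = client.load_table_from_json(rows, "my-project.my_dataset.my_table")
        load_job.result()  # Wait for the load to complete.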
3012 """
3013 job_id = _make_job_id(job_id, job_id_prefix)
3014
3015 if job_config is not None:
3016 _verify_job_config_type(job_config, LoadJobConfig)
3017 else:
3018 job_config = job.LoadJobConfig()
3019
3020 new_job_config = job_config._fill_from_default(self._default_load_job_config)
3021
3022 new_job_config.source_format = job.SourceFormat.NEWLINE_DELIMITED_JSON
3023
# In specific conditions, we check if the table already exists, and/or
# set the autodetect value for the user. For the exact conditions, see the
# table at https://github.com/googleapis/python-bigquery/issues/1228#issuecomment-1910946297
3027 if new_job_config.schema is None and new_job_config.autodetect is None:
3028 if new_job_config.write_disposition in (
3029 job.WriteDisposition.WRITE_TRUNCATE,
3030 job.WriteDisposition.WRITE_EMPTY,
3031 ):
3032 new_job_config.autodetect = True
3033 else:
3034 try:
3035 self.get_table(destination)
3036 except core_exceptions.NotFound:
3037 new_job_config.autodetect = True
3038 else:
3039 new_job_config.autodetect = False
3040
3041 if project is None:
3042 project = self.project
3043
3044 if location is None:
3045 location = self.location
3046
3047 destination = _table_arg_to_table_ref(destination, default_project=self.project)
3048
3049 data_str = "\n".join(json.dumps(item, ensure_ascii=False) for item in json_rows)
3050 encoded_str = data_str.encode()
3051 data_file = io.BytesIO(encoded_str)
3052 return self.load_table_from_file(
3053 data_file,
3054 destination,
3055 size=len(encoded_str),
3056 num_retries=num_retries,
3057 job_id=job_id,
3058 job_id_prefix=job_id_prefix,
3059 location=location,
3060 project=project,
3061 job_config=new_job_config,
3062 timeout=timeout,
3063 )
3064
3065 def _do_resumable_upload(
3066 self,
3067 stream: IO[bytes],
3068 metadata: Mapping[str, str],
3069 num_retries: int,
3070 timeout: Optional[ResumableTimeoutType],
3071 project: Optional[str] = None,
3072 ) -> "requests.Response":
3073 """Perform a resumable upload.
3074
3075 Args:
3076 stream (IO[bytes]): A bytes IO object open for reading.
3077 metadata (Mapping[str, str]): The metadata associated with the upload.
3078 num_retries (int):
3079 Number of upload retries. (Deprecated: This
3080 argument will be removed in a future release.)
3081 timeout (Optional[float]):
3082 The number of seconds to wait for the underlying HTTP transport
3083 before using ``retry``. Depending on the retry strategy, a request may
3084 be repeated several times using the same timeout each time.
3085
3086 Can also be passed as a tuple (connect_timeout, read_timeout).
3087 See :meth:`requests.Session.request` documentation for details.
3088 project (Optional[str]):
ID of the project where the upload runs. Defaults
3090 to the client's project.
3091
3092 Returns:
3093 The "200 OK" response object returned after the final chunk
3094 is uploaded.
3095 """
3096 upload, transport = self._initiate_resumable_upload(
3097 stream, metadata, num_retries, timeout, project=project
3098 )
3099
3100 while not upload.finished:
3101 response = upload.transmit_next_chunk(transport, timeout=timeout)
3102
3103 return response
3104
3105 def _initiate_resumable_upload(
3106 self,
3107 stream: IO[bytes],
3108 metadata: Mapping[str, str],
3109 num_retries: int,
3110 timeout: Optional[ResumableTimeoutType],
3111 project: Optional[str] = None,
3112 ):
3113 """Initiate a resumable upload.
3114
3115 Args:
3116 stream (IO[bytes]): A bytes IO object open for reading.
3117 metadata (Mapping[str, str]): The metadata associated with the upload.
3118 num_retries (int):
3119 Number of upload retries. (Deprecated: This
3120 argument will be removed in a future release.)
3121 timeout (Optional[float]):
3122 The number of seconds to wait for the underlying HTTP transport
3123 before using ``retry``. Depending on the retry strategy, a request may
3124 be repeated several times using the same timeout each time.
3125
3126 Can also be passed as a tuple (connect_timeout, read_timeout).
3127 See :meth:`requests.Session.request` documentation for details.
3128 project (Optional[str]):
ID of the project where the upload runs. Defaults
3130 to the client's project.
3131
3132 Returns:
3133 Tuple:
3134 Pair of
3135
3136 * The :class:`~google.resumable_media.requests.ResumableUpload`
3137 that was created
3138 * The ``transport`` used to initiate the upload.
3139 """
3140 chunk_size = _DEFAULT_CHUNKSIZE
3141 transport = self._http
3142 headers = _get_upload_headers(self._connection.user_agent)
3143
3144 if project is None:
3145 project = self.project
3146 # TODO: Increase the minimum version of google-cloud-core to 1.6.0
3147 # and remove this logic. See:
3148 # https://github.com/googleapis/python-bigquery/issues/509
3149 hostname = (
3150 self._connection.API_BASE_URL
3151 if not hasattr(self._connection, "get_api_base_url_for_mtls")
3152 else self._connection.get_api_base_url_for_mtls()
3153 )
3154 upload_url = _RESUMABLE_URL_TEMPLATE.format(host=hostname, project=project)
3155
3156 # TODO: modify ResumableUpload to take a retry.Retry object
3157 # that it can use for the initial RPC.
3158 upload = ResumableUpload(upload_url, chunk_size, headers=headers)
3159
3160 if num_retries is not None:
3161 upload._retry_strategy = resumable_media.RetryStrategy(
3162 max_retries=num_retries
3163 )
3164
3165 upload.initiate(
3166 transport,
3167 stream,
3168 metadata,
3169 _GENERIC_CONTENT_TYPE,
3170 stream_final=False,
3171 timeout=timeout,
3172 )
3173
3174 return upload, transport
3175
3176 def _do_multipart_upload(
3177 self,
3178 stream: IO[bytes],
3179 metadata: Mapping[str, str],
3180 size: int,
3181 num_retries: int,
3182 timeout: Optional[ResumableTimeoutType],
3183 project: Optional[str] = None,
3184 ):
3185 """Perform a multipart upload.
3186
3187 Args:
3188 stream (IO[bytes]): A bytes IO object open for reading.
3189 metadata (Mapping[str, str]): The metadata associated with the upload.
3190 size (int):
3191 The number of bytes to be uploaded (which will be read
3192 from ``stream``). If not provided, the upload will be
3193 concluded once ``stream`` is exhausted (or :data:`None`).
3194 num_retries (int):
3195 Number of upload retries. (Deprecated: This
3196 argument will be removed in a future release.)
3197 timeout (Optional[float]):
3198 The number of seconds to wait for the underlying HTTP transport
3199 before using ``retry``. Depending on the retry strategy, a request may
3200 be repeated several times using the same timeout each time.
3201
3202 Can also be passed as a tuple (connect_timeout, read_timeout).
3203 See :meth:`requests.Session.request` documentation for details.
3204 project (Optional[str]):
ID of the project where the upload runs. Defaults
3206 to the client's project.
3207
3208 Returns:
3209 requests.Response:
3210 The "200 OK" response object returned after the multipart
3211 upload request.
3212
3213 Raises:
3214 ValueError:
3215 if the ``stream`` has fewer than ``size``
3216 bytes remaining.
3217 """
3218 data = stream.read(size)
3219 if len(data) < size:
3220 msg = _READ_LESS_THAN_SIZE.format(size, len(data))
3221 raise ValueError(msg)
3222
3223 headers = _get_upload_headers(self._connection.user_agent)
3224
3225 if project is None:
3226 project = self.project
3227
3228 # TODO: Increase the minimum version of google-cloud-core to 1.6.0
3229 # and remove this logic. See:
3230 # https://github.com/googleapis/python-bigquery/issues/509
3231 hostname = (
3232 self._connection.API_BASE_URL
3233 if not hasattr(self._connection, "get_api_base_url_for_mtls")
3234 else self._connection.get_api_base_url_for_mtls()
3235 )
3236 upload_url = _MULTIPART_URL_TEMPLATE.format(host=hostname, project=project)
3237 upload = MultipartUpload(upload_url, headers=headers)
3238
3239 if num_retries is not None:
3240 upload._retry_strategy = resumable_media.RetryStrategy(
3241 max_retries=num_retries
3242 )
3243
3244 response = upload.transmit(
3245 self._http, data, metadata, _GENERIC_CONTENT_TYPE, timeout=timeout
3246 )
3247
3248 return response
3249
3250 def copy_table(
3251 self,
3252 sources: Union[
3253 Table,
3254 TableReference,
3255 TableListItem,
3256 str,
3257 Sequence[Union[Table, TableReference, TableListItem, str]],
3258 ],
3259 destination: Union[Table, TableReference, TableListItem, str],
3260 job_id: Optional[str] = None,
3261 job_id_prefix: Optional[str] = None,
3262 location: Optional[str] = None,
3263 project: Optional[str] = None,
3264 job_config: Optional[CopyJobConfig] = None,
3265 retry: retries.Retry = DEFAULT_RETRY,
3266 timeout: TimeoutType = DEFAULT_TIMEOUT,
3267 ) -> job.CopyJob:
3268 """Copy one or more tables to another table.
3269
3270 See
3271 https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#jobconfigurationtablecopy
3272
3273 Args:
3274 sources (Union[ \
3275 google.cloud.bigquery.table.Table, \
3276 google.cloud.bigquery.table.TableReference, \
3277 google.cloud.bigquery.table.TableListItem, \
3278 str, \
3279 Sequence[ \
3280 Union[ \
3281 google.cloud.bigquery.table.Table, \
3282 google.cloud.bigquery.table.TableReference, \
3283 google.cloud.bigquery.table.TableListItem, \
3284 str, \
3285 ] \
3286 ], \
3287 ]):
3288 Table or tables to be copied.
3289 destination (Union[ \
3290 google.cloud.bigquery.table.Table, \
3291 google.cloud.bigquery.table.TableReference, \
3292 google.cloud.bigquery.table.TableListItem, \
3293 str, \
3294 ]):
3295 Table into which data is to be copied.
3296 job_id (Optional[str]): The ID of the job.
3297 job_id_prefix (Optional[str]):
3298 The user-provided prefix for a randomly generated job ID.
3299 This parameter will be ignored if a ``job_id`` is also given.
3300 location (Optional[str]):
3301 Location where to run the job. Must match the location of any
3302 source table as well as the destination table.
3303 project (Optional[str]):
ID of the project where the job runs. Defaults
3305 to the client's project.
3306 job_config (Optional[google.cloud.bigquery.job.CopyJobConfig]):
3307 Extra configuration options for the job.
3308 retry (Optional[google.api_core.retry.Retry]):
3309 How to retry the RPC.
3310 timeout (Optional[float]):
3311 The number of seconds to wait for the underlying HTTP transport
3312 before using ``retry``.
3313
3314 Returns:
3315 google.cloud.bigquery.job.CopyJob: A new copy job instance.
3316
3317 Raises:
3318 TypeError:
3319 If ``job_config`` is not an instance of :class:`~google.cloud.bigquery.job.CopyJobConfig`
3320 class.
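
Example:
    A minimal, illustrative sketch; the source and destination table IDs
    below are placeholders::

        from google.cloud import bigquery

        client = bigquery.Client()
        # The table IDs below are placeholders.
        copy_job = client.copy_table(
            "my-project.my_dataset.source_table",
            "my-project.my_dataset.destination_table",
        )
        copy_job.result()  # Wait for the copy to complete.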
3321 """
3322 job_id = _make_job_id(job_id, job_id_prefix)
3323
3324 if project is None:
3325 project = self.project
3326
3327 if location is None:
3328 location = self.location
3329
3330 job_ref = job._JobReference(job_id, project=project, location=location)
3331
3332 # sources can be one of many different input types. (string, Table,
3333 # TableReference, or a sequence of any of those.) Convert them all to a
3334 # list of TableReferences.
3335 #
3336 # _table_arg_to_table_ref leaves lists unmodified.
3337 sources = _table_arg_to_table_ref(sources, default_project=self.project)
3338
3339 if not isinstance(sources, collections_abc.Sequence):
3340 sources = [sources]
3341
3342 sources = [
3343 _table_arg_to_table_ref(source, default_project=self.project)
3344 for source in sources
3345 ]
3346
3347 destination = _table_arg_to_table_ref(destination, default_project=self.project)
3348
3349 if job_config:
3350 _verify_job_config_type(job_config, google.cloud.bigquery.job.CopyJobConfig)
3351 job_config = copy.deepcopy(job_config)
3352
3353 copy_job = job.CopyJob(
3354 job_ref, sources, destination, client=self, job_config=job_config
3355 )
3356 copy_job._begin(retry=retry, timeout=timeout)
3357
3358 return copy_job
3359
3360 def extract_table(
3361 self,
3362 source: Union[Table, TableReference, TableListItem, Model, ModelReference, str],
3363 destination_uris: Union[str, Sequence[str]],
3364 job_id: Optional[str] = None,
3365 job_id_prefix: Optional[str] = None,
3366 location: Optional[str] = None,
3367 project: Optional[str] = None,
3368 job_config: Optional[ExtractJobConfig] = None,
3369 retry: retries.Retry = DEFAULT_RETRY,
3370 timeout: TimeoutType = DEFAULT_TIMEOUT,
3371 source_type: str = "Table",
3372 ) -> job.ExtractJob:
3373 """Start a job to extract a table into Cloud Storage files.
3374
3375 See
3376 https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#jobconfigurationextract
3377
3378 Args:
3379 source (Union[ \
3380 google.cloud.bigquery.table.Table, \
3381 google.cloud.bigquery.table.TableReference, \
3382 google.cloud.bigquery.table.TableListItem, \
3383 google.cloud.bigquery.model.Model, \
3384 google.cloud.bigquery.model.ModelReference, \
                str, \
3386 ]):
3387 Table or Model to be extracted.
3388 destination_uris (Union[str, Sequence[str]]):
3389 URIs of Cloud Storage file(s) into which table data is to be
3390 extracted; in format
3391 ``gs://<bucket_name>/<object_name_or_glob>``.
3392 job_id (Optional[str]): The ID of the job.
3393 job_id_prefix (Optional[str]):
3394 The user-provided prefix for a randomly generated job ID.
3395 This parameter will be ignored if a ``job_id`` is also given.
3396 location (Optional[str]):
3397 Location where to run the job. Must match the location of the
3398 source table.
            project (Optional[str]):
                ID of the project in which to run the job. Defaults to the
                client's project.
3402 job_config (Optional[google.cloud.bigquery.job.ExtractJobConfig]):
3403 Extra configuration options for the job.
3404 retry (Optional[google.api_core.retry.Retry]):
3405 How to retry the RPC.
3406 timeout (Optional[float]):
3407 The number of seconds to wait for the underlying HTTP transport
3408 before using ``retry``.
            source_type (Optional[str]):
                Type of source to be extracted. ``Table`` or ``Model``.
                Defaults to ``Table``.

        Returns:
3412 google.cloud.bigquery.job.ExtractJob: A new extract job instance.
3413
3414 Raises:
3415 TypeError:
                If ``job_config`` is not an instance of
                :class:`~google.cloud.bigquery.job.ExtractJobConfig`.
3418 ValueError:
                If ``source_type`` is not one of ``Table`` or ``Model``.
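
        Example:

            Illustrative usage; the table ID and bucket name below are
            placeholders:

            .. code-block:: python

                from google.cloud import bigquery

                client = bigquery.Client()
                extract_job = client.extract_table(
                    "my-project.my_dataset.my_table",
                    "gs://my-bucket/exports/my_table-*.csv",
                )
                extract_job.result()  # Wait for the export to complete.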
3420 """
3421 job_id = _make_job_id(job_id, job_id_prefix)
3422
3423 if project is None:
3424 project = self.project
3425
3426 if location is None:
3427 location = self.location
3428
3429 job_ref = job._JobReference(job_id, project=project, location=location)
3430 src = source_type.lower()
3431 if src == "table":
3432 source = _table_arg_to_table_ref(source, default_project=self.project)
3433 elif src == "model":
3434 source = _model_arg_to_model_ref(source, default_project=self.project)
3435 else:
3436 raise ValueError(
3437 "Cannot pass `{}` as a ``source_type``, pass Table or Model".format(
3438 source_type
3439 )
3440 )
3441
3442 if isinstance(destination_uris, str):
3443 destination_uris = [destination_uris]
3444
3445 if job_config:
3446 _verify_job_config_type(
3447 job_config, google.cloud.bigquery.job.ExtractJobConfig
3448 )
3449 job_config = copy.deepcopy(job_config)
3450
3451 extract_job = job.ExtractJob(
3452 job_ref, source, destination_uris, client=self, job_config=job_config
3453 )
3454 extract_job._begin(retry=retry, timeout=timeout)
3455
3456 return extract_job
3457
3458 def query(
3459 self,
3460 query: str,
3461 job_config: Optional[QueryJobConfig] = None,
3462 job_id: Optional[str] = None,
3463 job_id_prefix: Optional[str] = None,
3464 location: Optional[str] = None,
3465 project: Optional[str] = None,
3466 retry: retries.Retry = DEFAULT_RETRY,
3467 timeout: TimeoutType = DEFAULT_TIMEOUT,
3468 job_retry: Optional[retries.Retry] = DEFAULT_JOB_RETRY,
3469 api_method: Union[str, enums.QueryApiMethod] = enums.QueryApiMethod.INSERT,
3470 ) -> job.QueryJob:
3471 """Run a SQL query.
3472
3473 See
3474 https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#jobconfigurationquery
3475
3476 Args:
3477 query (str):
3478 SQL query to be executed. Defaults to the standard SQL
3479 dialect. Use the ``job_config`` parameter to change dialects.
3480 job_config (Optional[google.cloud.bigquery.job.QueryJobConfig]):
3481 Extra configuration options for the job.
3482 To override any options that were previously set in
3483 the ``default_query_job_config`` given to the
3484 ``Client`` constructor, manually set those options to ``None``,
3485 or whatever value is preferred.
3486 job_id (Optional[str]): ID to use for the query job.
3487 job_id_prefix (Optional[str]):
3488 The prefix to use for a randomly generated job ID. This parameter
3489 will be ignored if a ``job_id`` is also given.
3490 location (Optional[str]):
3491 Location where to run the job. Must match the location of the
3492 table used in the query as well as the destination table.
            project (Optional[str]):
                ID of the project in which to run the job. Defaults to the
                client's project.
3496 retry (Optional[google.api_core.retry.Retry]):
3497 How to retry the RPC. This only applies to making RPC
3498 calls. It isn't used to retry failed jobs. This has
3499 a reasonable default that should only be overridden
3500 with care.
3501 timeout (Optional[float]):
3502 The number of seconds to wait for the underlying HTTP transport
3503 before using ``retry``.
3504 job_retry (Optional[google.api_core.retry.Retry]):
3505 How to retry failed jobs. The default retries
3506 rate-limit-exceeded errors. Passing ``None`` disables
3507 job retry.
3508
3509 Not all jobs can be retried. If ``job_id`` is
3510 provided, then the job returned by the query will not
3511 be retryable, and an exception will be raised if a
3512 non-``None`` (and non-default) value for ``job_retry``
3513 is also provided.
3514
3515 Note that errors aren't detected until ``result()`` is
3516 called on the job returned. The ``job_retry``
3517 specified here becomes the default ``job_retry`` for
3518 ``result()``, where it can also be specified.
3519 api_method (Union[str, enums.QueryApiMethod]):
3520 Method with which to start the query job.
3521
3522 See :class:`google.cloud.bigquery.enums.QueryApiMethod` for
3523 details on the difference between the query start methods.
3524
3525 Returns:
3526 google.cloud.bigquery.job.QueryJob: A new query job instance.
3527
3528 Raises:
3529 TypeError:
                If ``job_config`` is not an instance of
                :class:`~google.cloud.bigquery.job.QueryJobConfig`,
                or if both ``job_id`` and a non-``None``, non-default
                ``job_retry`` are provided.
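
        Example:

            Illustrative usage; the query text is a placeholder referencing a
            public dataset:

            .. code-block:: python

                from google.cloud import bigquery

                client = bigquery.Client()
                sql = (
                    "SELECT name"
                    " FROM `bigquery-public-data.usa_names.usa_1910_2013`"
                    " LIMIT 10"
                )
                query_job = client.query(sql)
                for row in query_job.result():  # Waits for the job to finish.
                    print(row["name"])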
3534 """
3535 _job_helpers.validate_job_retry(job_id, job_retry)
3536
3537 job_id_given = job_id is not None
3538 if job_id_given and api_method == enums.QueryApiMethod.QUERY:
3539 raise TypeError(
3540 "`job_id` was provided, but the 'QUERY' `api_method` was requested."
3541 )
3542
3543 if project is None:
3544 project = self.project
3545
3546 if location is None:
3547 location = self.location
3548
3549 if job_config is not None:
3550 _verify_job_config_type(job_config, QueryJobConfig)
3551
3552 job_config = _job_helpers.job_config_with_defaults(
3553 job_config, self._default_query_job_config
3554 )
3555
3556 # Note that we haven't modified the original job_config (or
3557 # _default_query_job_config) up to this point.
3558 if api_method == enums.QueryApiMethod.QUERY:
3559 return _job_helpers.query_jobs_query(
3560 self,
3561 query,
3562 job_config,
3563 location,
3564 project,
3565 retry,
3566 timeout,
3567 job_retry,
3568 )
3569 elif api_method == enums.QueryApiMethod.INSERT:
3570 return _job_helpers.query_jobs_insert(
3571 self,
3572 query,
3573 job_config,
3574 job_id,
3575 job_id_prefix,
3576 location,
3577 project,
3578 retry,
3579 timeout,
3580 job_retry,
3581 )
3582 else:
3583 raise ValueError(f"Got unexpected value for api_method: {repr(api_method)}")
3584
3585 def query_and_wait(
3586 self,
3587 query,
3588 *,
3589 job_config: Optional[QueryJobConfig] = None,
3590 location: Optional[str] = None,
3591 project: Optional[str] = None,
3592 api_timeout: TimeoutType = DEFAULT_TIMEOUT,
3593 wait_timeout: Union[Optional[float], object] = POLLING_DEFAULT_VALUE,
3594 retry: retries.Retry = DEFAULT_RETRY,
3595 job_retry: retries.Retry = DEFAULT_JOB_RETRY,
3596 page_size: Optional[int] = None,
3597 max_results: Optional[int] = None,
3598 ) -> RowIterator:
3599 """Run the query, wait for it to finish, and return the results.
3600
3601 Args:
3602 query (str):
3603 SQL query to be executed. Defaults to the standard SQL
3604 dialect. Use the ``job_config`` parameter to change dialects.
3605 job_config (Optional[google.cloud.bigquery.job.QueryJobConfig]):
3606 Extra configuration options for the job.
3607 To override any options that were previously set in
3608 the ``default_query_job_config`` given to the
3609 ``Client`` constructor, manually set those options to ``None``,
3610 or whatever value is preferred.
3611 location (Optional[str]):
3612 Location where to run the job. Must match the location of the
3613 table used in the query as well as the destination table.
            project (Optional[str]):
                ID of the project in which to run the job. Defaults to the
                client's project.
3617 api_timeout (Optional[float]):
3618 The number of seconds to wait for the underlying HTTP transport
3619 before using ``retry``.
3620 wait_timeout (Optional[Union[float, object]]):
3621 The number of seconds to wait for the query to finish. If the
3622 query doesn't finish before this timeout, the client attempts
3623 to cancel the query. If unset, the underlying REST API calls
3624 have timeouts, but we still wait indefinitely for the job to
3625 finish.
3626 retry (Optional[google.api_core.retry.Retry]):
3627 How to retry the RPC. This only applies to making RPC
3628 calls. It isn't used to retry failed jobs. This has
3629 a reasonable default that should only be overridden
3630 with care.
3631 job_retry (Optional[google.api_core.retry.Retry]):
3632 How to retry failed jobs. The default retries
3633 rate-limit-exceeded errors. Passing ``None`` disables
3634 job retry. Not all jobs can be retried.
3635 page_size (Optional[int]):
3636 The maximum number of rows in each page of results from this
3637 request. Non-positive values are ignored.
3638 max_results (Optional[int]):
3639 The maximum total number of rows from this request.
3640
3641 Returns:
3642 google.cloud.bigquery.table.RowIterator:
3643 Iterator of row data
3644 :class:`~google.cloud.bigquery.table.Row`-s. During each
3645 page, the iterator will have the ``total_rows`` attribute
3646 set, which counts the total number of rows **in the result
3647 set** (this is distinct from the total number of rows in the
3648 current page: ``iterator.page.num_items``).
3649
3650 If the query is a special query that produces no results, e.g.
3651 a DDL query, an ``_EmptyRowIterator`` instance is returned.
3652
3653 Raises:
3654 TypeError:
                If ``job_config`` is not an instance of
                :class:`~google.cloud.bigquery.job.QueryJobConfig`.
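
        Example:

            A minimal sketch; the query text is illustrative:

            .. code-block:: python

                from google.cloud import bigquery

                client = bigquery.Client()
                rows = client.query_and_wait("SELECT 1 AS x")
                for row in rows:
                    print(row["x"])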
3658 """
3659 if project is None:
3660 project = self.project
3661
3662 if location is None:
3663 location = self.location
3664
3665 if job_config is not None:
3666 _verify_job_config_type(job_config, QueryJobConfig)
3667
3668 job_config = _job_helpers.job_config_with_defaults(
3669 job_config, self._default_query_job_config
3670 )
3671
3672 return _job_helpers.query_and_wait(
3673 self,
3674 query,
3675 job_config=job_config,
3676 location=location,
3677 project=project,
3678 api_timeout=api_timeout,
3679 wait_timeout=wait_timeout,
3680 retry=retry,
3681 job_retry=job_retry,
3682 page_size=page_size,
3683 max_results=max_results,
3684 )
3685
3686 def insert_rows(
3687 self,
3688 table: Union[Table, TableReference, str],
3689 rows: Union[Iterable[Tuple], Iterable[Mapping[str, Any]]],
3690 selected_fields: Optional[Sequence[SchemaField]] = None,
3691 **kwargs,
3692 ) -> Sequence[Dict[str, Any]]:
3693 """Insert rows into a table via the streaming API.
3694
3695 See
3696 https://cloud.google.com/bigquery/docs/reference/rest/v2/tabledata/insertAll
3697
3698 BigQuery will reject insertAll payloads that exceed a defined limit (10MB).
3699 Additionally, if a payload vastly exceeds this limit, the request is rejected
3700 by the intermediate architecture, which returns a 413 (Payload Too Large) status code.
3701
3703 See
3704 https://cloud.google.com/bigquery/quotas#streaming_inserts
3705
3706 Args:
3707 table (Union[ \
3708 google.cloud.bigquery.table.Table, \
3709 google.cloud.bigquery.table.TableReference, \
3710 str, \
3711 ]):
3712 The destination table for the row data, or a reference to it.
3713 rows (Union[Sequence[Tuple], Sequence[Dict]]):
3714 Row data to be inserted. If a list of tuples is given, each
3715 tuple should contain data for each schema field on the
3716 current table and in the same order as the schema fields. If
3717 a list of dictionaries is given, the keys must include all
3718 required fields in the schema. Keys which do not correspond
3719 to a field in the schema are ignored.
3720 selected_fields (Sequence[google.cloud.bigquery.schema.SchemaField]):
3721 The fields to return. Required if ``table`` is a
3722 :class:`~google.cloud.bigquery.table.TableReference`.
3723 kwargs (dict):
3724 Keyword arguments to
3725 :meth:`~google.cloud.bigquery.client.Client.insert_rows_json`.
3726
3727 Returns:
            Sequence[Mapping]:
3729 One mapping per row with insert errors: the "index" key
3730 identifies the row, and the "errors" key contains a list of
3731 the mappings describing one or more problems with the row.
3732
        Raises:
            TypeError: if ``rows`` is not a ``Sequence`` or ``Iterator``.
            ValueError: if the table's schema is not set.
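
        Example:

            Illustrative usage; the table ID and field names below are
            placeholders and must match your table's schema:

            .. code-block:: python

                from google.cloud import bigquery

                client = bigquery.Client()
                table = client.get_table("my-project.my_dataset.people")
                errors = client.insert_rows(
                    table,
                    [{"full_name": "Ada Lovelace", "age": 36}],
                )
                if errors:
                    print("Some rows failed to insert:", errors)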
3735 """
3736 if not isinstance(rows, (collections_abc.Sequence, collections_abc.Iterator)):
3737 raise TypeError("rows argument should be a sequence of dicts or tuples")
3738
3739 table = _table_arg_to_table(table, default_project=self.project)
3740
3741 if not isinstance(table, Table):
3742 raise TypeError(_NEED_TABLE_ARGUMENT)
3743
3744 schema = table.schema
3745
3746 # selected_fields can override the table schema.
3747 if selected_fields is not None:
3748 schema = selected_fields
3749
3750 if len(schema) == 0:
3751 raise ValueError(
3752 (
3753 "Could not determine schema for table '{}'. Call client.get_table() "
3754 "or pass in a list of schema fields to the selected_fields argument."
3755 ).format(table)
3756 )
3757
3758 json_rows = [_record_field_to_json(schema, row) for row in rows]
3759
3760 return self.insert_rows_json(table, json_rows, **kwargs)
3761
3762 def insert_rows_from_dataframe(
3763 self,
3764 table: Union[Table, TableReference, str],
3765 dataframe,
3766 selected_fields: Optional[Sequence[SchemaField]] = None,
3767 chunk_size: int = 500,
3768 **kwargs: Dict,
3769 ) -> Sequence[Sequence[dict]]:
3770 """Insert rows into a table from a dataframe via the streaming API.
3771
3772 BigQuery will reject insertAll payloads that exceed a defined limit (10MB).
3773 Additionally, if a payload vastly exceeds this limit, the request is rejected
3774 by the intermediate architecture, which returns a 413 (Payload Too Large) status code.
3775
3776 See
3777 https://cloud.google.com/bigquery/quotas#streaming_inserts
3778
3779 Args:
3780 table (Union[ \
3781 google.cloud.bigquery.table.Table, \
3782 google.cloud.bigquery.table.TableReference, \
3783 str, \
3784 ]):
3785 The destination table for the row data, or a reference to it.
3786 dataframe (pandas.DataFrame):
3787 A :class:`~pandas.DataFrame` containing the data to load. Any
3788 ``NaN`` values present in the dataframe are omitted from the
3789 streaming API request(s).
3790 selected_fields (Sequence[google.cloud.bigquery.schema.SchemaField]):
3791 The fields to return. Required if ``table`` is a
3792 :class:`~google.cloud.bigquery.table.TableReference`.
3793 chunk_size (int):
3794 The number of rows to stream in a single chunk. Must be positive.
3795 kwargs (Dict):
3796 Keyword arguments to
3797 :meth:`~google.cloud.bigquery.client.Client.insert_rows_json`.
3798
3799 Returns:
            Sequence[Sequence[Mapping]]:
3801 A list with insert errors for each insert chunk. Each element
3802 is a list containing one mapping per row with insert errors:
3803 the "index" key identifies the row, and the "errors" key
3804 contains a list of the mappings describing one or more problems
3805 with the row.
3806
3807 Raises:
            ValueError: if the table's schema is not set.
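
        Example:

            Illustrative usage; the table ID and column names below are
            placeholders and must match your table's schema:

            .. code-block:: python

                import pandas

                from google.cloud import bigquery

                client = bigquery.Client()
                table = client.get_table("my-project.my_dataset.people")
                dataframe = pandas.DataFrame(
                    [{"full_name": "Ada Lovelace", "age": 36}]
                )
                chunk_errors = client.insert_rows_from_dataframe(table, dataframe)
                if any(chunk_errors):
                    print("Some rows failed to insert:", chunk_errors)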
3809 """
3810 insert_results = []
3811
3812 chunk_count = int(math.ceil(len(dataframe) / chunk_size))
3813 rows_iter = _pandas_helpers.dataframe_to_json_generator(dataframe)
3814
3815 for _ in range(chunk_count):
3816 rows_chunk = itertools.islice(rows_iter, chunk_size)
3817 result = self.insert_rows(table, rows_chunk, selected_fields, **kwargs)
3818 insert_results.append(result)
3819
3820 return insert_results
3821
3822 def insert_rows_json(
3823 self,
3824 table: Union[Table, TableReference, TableListItem, str],
3825 json_rows: Sequence[Mapping[str, Any]],
3826 row_ids: Union[
3827 Iterable[Optional[str]], AutoRowIDs, None
3828 ] = AutoRowIDs.GENERATE_UUID,
3829 skip_invalid_rows: Optional[bool] = None,
3830 ignore_unknown_values: Optional[bool] = None,
3831 template_suffix: Optional[str] = None,
3832 retry: retries.Retry = DEFAULT_RETRY,
3833 timeout: TimeoutType = DEFAULT_TIMEOUT,
3834 ) -> Sequence[dict]:
3835 """Insert rows into a table without applying local type conversions.
3836
3837 See
3838 https://cloud.google.com/bigquery/docs/reference/rest/v2/tabledata/insertAll
3839
3840 BigQuery will reject insertAll payloads that exceed a defined limit (10MB).
3841 Additionally, if a payload vastly exceeds this limit, the request is rejected
3842 by the intermediate architecture, which returns a 413 (Payload Too Large) status code.
3843
3844 See
3845 https://cloud.google.com/bigquery/quotas#streaming_inserts
3846
3847 Args:
3848 table (Union[ \
                google.cloud.bigquery.table.Table, \
                google.cloud.bigquery.table.TableReference, \
                google.cloud.bigquery.table.TableListItem, \
                str, \
3853 ]):
3854 The destination table for the row data, or a reference to it.
3855 json_rows (Sequence[Dict]):
3856 Row data to be inserted. Keys must match the table schema fields
3857 and values must be JSON-compatible representations.
            row_ids (Union[Iterable[Optional[str]], AutoRowIDs, None]):
3859 Unique IDs, one per row being inserted. An ID can also be
3860 ``None``, indicating that an explicit insert ID should **not**
3861 be used for that row. If the argument is omitted altogether,
3862 unique IDs are created automatically.
3863
3864 .. versionchanged:: 2.21.0
3865 Can also be an iterable, not just a sequence, or an
3866 :class:`AutoRowIDs` enum member.
3867
3868 .. deprecated:: 2.21.0
3869 Passing ``None`` to explicitly request autogenerating insert IDs is
3870 deprecated, use :attr:`AutoRowIDs.GENERATE_UUID` instead.
3871
3872 skip_invalid_rows (Optional[bool]):
3873 Insert all valid rows of a request, even if invalid rows exist.
3874 The default value is ``False``, which causes the entire request
3875 to fail if any invalid rows exist.
3876 ignore_unknown_values (Optional[bool]):
3877 Accept rows that contain values that do not match the schema.
3878 The unknown values are ignored. Default is ``False``, which
3879 treats unknown values as errors.
3880 template_suffix (Optional[str]):
3881 Treat ``name`` as a template table and provide a suffix.
3882 BigQuery will create the table ``<name> + <template_suffix>``
3883 based on the schema of the template table. See
3884 https://cloud.google.com/bigquery/streaming-data-into-bigquery#template-tables
3885 retry (Optional[google.api_core.retry.Retry]):
3886 How to retry the RPC.
3887 timeout (Optional[float]):
3888 The number of seconds to wait for the underlying HTTP transport
3889 before using ``retry``.
3890
3891 Returns:
            Sequence[Mapping]:
3893 One mapping per row with insert errors: the "index" key
3894 identifies the row, and the "errors" key contains a list of
3895 the mappings describing one or more problems with the row.
3896
3897 Raises:
            TypeError: if ``json_rows`` is not a ``Sequence``.
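
        Example:

            Illustrative usage; the table ID and field names below are
            placeholders:

            .. code-block:: python

                from google.cloud import bigquery

                client = bigquery.Client()
                errors = client.insert_rows_json(
                    "my-project.my_dataset.people",
                    [{"full_name": "Ada Lovelace", "age": 36}],
                )
                if errors:
                    print("Some rows failed to insert:", errors)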
3899 """
3900 if not isinstance(
3901 json_rows, (collections_abc.Sequence, collections_abc.Iterator)
3902 ):
3903 raise TypeError("json_rows argument should be a sequence of dicts")
3904 # Convert table to just a reference because unlike insert_rows,
3905 # insert_rows_json doesn't need the table schema. It's not doing any
3906 # type conversions.
3907 table = _table_arg_to_table_ref(table, default_project=self.project)
3908 rows_info: List[Any] = []
3909 data: Dict[str, Any] = {"rows": rows_info}
3910
3911 if row_ids is None:
3912 warnings.warn(
3913 "Passing None for row_ids is deprecated. To explicitly request "
3914 "autogenerated insert IDs, use AutoRowIDs.GENERATE_UUID instead",
3915 category=DeprecationWarning,
3916 )
3917 row_ids = AutoRowIDs.GENERATE_UUID
3918
3919 if not isinstance(row_ids, AutoRowIDs):
3920 try:
3921 row_ids_iter = iter(row_ids)
3922 except TypeError:
3923 msg = "row_ids is neither an iterable nor an AutoRowIDs enum member"
3924 raise TypeError(msg)
3925
3926 for i, row in enumerate(json_rows):
3927 info: Dict[str, Any] = {"json": row}
3928
3929 if row_ids is AutoRowIDs.GENERATE_UUID:
3930 info["insertId"] = str(uuid.uuid4())
3931 elif row_ids is AutoRowIDs.DISABLED:
3932 info["insertId"] = None
3933 else:
3934 try:
3935 insert_id = next(row_ids_iter)
3936 except StopIteration:
3937 msg = f"row_ids did not generate enough IDs, error at index {i}"
3938 raise ValueError(msg)
3939 else:
3940 info["insertId"] = insert_id
3941
3942 rows_info.append(info)
3943
3944 if skip_invalid_rows is not None:
3945 data["skipInvalidRows"] = skip_invalid_rows
3946
3947 if ignore_unknown_values is not None:
3948 data["ignoreUnknownValues"] = ignore_unknown_values
3949
3950 if template_suffix is not None:
3951 data["templateSuffix"] = template_suffix
3952
3953 path = "%s/insertAll" % table.path
3954 # We can always retry, because every row has an insert ID.
3955 span_attributes = {"path": path}
3956 response = self._call_api(
3957 retry,
3958 span_name="BigQuery.insertRowsJson",
3959 span_attributes=span_attributes,
3960 method="POST",
3961 path=path,
3962 data=data,
3963 timeout=timeout,
3964 )
3965 errors = []
3966
3967 for error in response.get("insertErrors", ()):
3968 errors.append({"index": int(error["index"]), "errors": error["errors"]})
3969
3970 return errors
3971
3972 def list_partitions(
3973 self,
3974 table: Union[Table, TableReference, TableListItem, str],
3975 retry: retries.Retry = DEFAULT_RETRY,
3976 timeout: TimeoutType = DEFAULT_TIMEOUT,
3977 ) -> Sequence[str]:
3978 """List the partitions in a table.
3979
3980 Args:
3981 table (Union[ \
3982 google.cloud.bigquery.table.Table, \
3983 google.cloud.bigquery.table.TableReference, \
3984 google.cloud.bigquery.table.TableListItem, \
3985 str, \
3986 ]):
3987 The table or reference from which to get partition info
3988 retry (Optional[google.api_core.retry.Retry]):
3989 How to retry the RPC.
3990 timeout (Optional[float]):
3991 The number of seconds to wait for the underlying HTTP transport
3992 before using ``retry``.
3993 If multiple requests are made under the hood, ``timeout``
3994 applies to each individual request.
3995
3996 Returns:
3997 List[str]:
                A list of the partition ids present in the partitioned table.
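
        Example:

            Illustrative usage; the table ID below is a placeholder for a
            partitioned table:

            .. code-block:: python

                from google.cloud import bigquery

                client = bigquery.Client()
                partition_ids = client.list_partitions(
                    "my-project.my_dataset.partitioned_table"
                )
                print(partition_ids)  # e.g. ['20240101', '20240102', ...]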
3999 """
4000 table = _table_arg_to_table_ref(table, default_project=self.project)
4001 meta_table = self.get_table(
4002 TableReference(
4003 DatasetReference(table.project, table.dataset_id),
4004 "%s$__PARTITIONS_SUMMARY__" % table.table_id,
4005 ),
4006 retry=retry,
4007 timeout=timeout,
4008 )
4009
4010 subset = [col for col in meta_table.schema if col.name == "partition_id"]
4011 return [
4012 row[0]
4013 for row in self.list_rows(
4014 meta_table, selected_fields=subset, retry=retry, timeout=timeout
4015 )
4016 ]
4017
4018 def list_rows(
4019 self,
4020 table: Union[Table, TableListItem, TableReference, str],
4021 selected_fields: Optional[Sequence[SchemaField]] = None,
4022 max_results: Optional[int] = None,
4023 page_token: Optional[str] = None,
4024 start_index: Optional[int] = None,
4025 page_size: Optional[int] = None,
4026 retry: retries.Retry = DEFAULT_RETRY,
4027 timeout: TimeoutType = DEFAULT_TIMEOUT,
4028 ) -> RowIterator:
4029 """List the rows of the table.
4030
4031 See
4032 https://cloud.google.com/bigquery/docs/reference/rest/v2/tabledata/list
4033
4034 .. note::
4035
4036 This method assumes that the provided schema is up-to-date with the
4037 schema as defined on the back-end: if the two schemas are not
4038 identical, the values returned may be incomplete. To ensure that the
4039 local copy of the schema is up-to-date, call ``client.get_table``.
4040
4041 Args:
4042 table (Union[ \
4043 google.cloud.bigquery.table.Table, \
4044 google.cloud.bigquery.table.TableListItem, \
4045 google.cloud.bigquery.table.TableReference, \
4046 str, \
4047 ]):
4048 The table to list, or a reference to it. When the table
4049 object does not contain a schema and ``selected_fields`` is
4050 not supplied, this method calls ``get_table`` to fetch the
4051 table schema.
4052 selected_fields (Sequence[google.cloud.bigquery.schema.SchemaField]):
4053 The fields to return. If not supplied, data for all columns
4054 are downloaded.
4055 max_results (Optional[int]):
4056 Maximum number of rows to return.
4057 page_token (Optional[str]):
                Token representing a cursor into the table's rows.
                If not passed, the API will return the first page of
                rows. The token marks the beginning of the iterator to be
                returned, and it is available as ``next_page_token`` on the
                :class:`~google.cloud.bigquery.table.RowIterator`.
4064 start_index (Optional[int]):
4065 The zero-based index of the starting row to read.
4066 page_size (Optional[int]):
4067 The maximum number of rows in each page of results from this request.
4068 Non-positive values are ignored. Defaults to a sensible value set by the API.
4069 retry (Optional[google.api_core.retry.Retry]):
4070 How to retry the RPC.
4071 timeout (Optional[float]):
4072 The number of seconds to wait for the underlying HTTP transport
4073 before using ``retry``.
4074 If multiple requests are made under the hood, ``timeout``
4075 applies to each individual request.
4076
4077 Returns:
4078 google.cloud.bigquery.table.RowIterator:
4079 Iterator of row data
4080 :class:`~google.cloud.bigquery.table.Row`-s. During each
4081 page, the iterator will have the ``total_rows`` attribute
4082 set, which counts the total number of rows **in the table**
4083 (this is distinct from the total number of rows in the
4084 current page: ``iterator.page.num_items``).
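
        Example:

            Illustrative usage; the table ID below is a placeholder:

            .. code-block:: python

                from google.cloud import bigquery

                client = bigquery.Client()
                rows = client.list_rows("my-project.my_dataset.people", max_results=10)
                for row in rows:
                    print(row)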
4085 """
4086 table = _table_arg_to_table(table, default_project=self.project)
4087
4088 if not isinstance(table, Table):
4089 raise TypeError(_NEED_TABLE_ARGUMENT)
4090
4091 schema = table.schema
4092
4093 # selected_fields can override the table schema.
4094 if selected_fields is not None:
4095 schema = selected_fields
4096
4097 # No schema, but no selected_fields. Assume the developer wants all
4098 # columns, so get the table resource for them rather than failing.
4099 elif len(schema) == 0:
4100 table = self.get_table(table.reference, retry=retry, timeout=timeout)
4101 schema = table.schema
4102
4103 params: Dict[str, Any] = {}
4104 if selected_fields is not None:
4105 params["selectedFields"] = ",".join(field.name for field in selected_fields)
4106 if start_index is not None:
4107 params["startIndex"] = start_index
4108
4109 params["formatOptions.useInt64Timestamp"] = True
4110 row_iterator = RowIterator(
4111 client=self,
4112 api_request=functools.partial(self._call_api, retry, timeout=timeout),
4113 path="%s/data" % (table.path,),
4114 schema=schema,
4115 page_token=page_token,
4116 max_results=max_results,
4117 page_size=page_size,
4118 extra_params=params,
4119 table=table,
4120 # Pass in selected_fields separately from schema so that full
4121 # tables can be fetched without a column filter.
4122 selected_fields=selected_fields,
4123 total_rows=getattr(table, "num_rows", None),
4124 project=table.project,
4125 location=table.location,
4126 )
4127 return row_iterator
4128
4129 def _list_rows_from_query_results(
4130 self,
4131 job_id: str,
4132 location: str,
4133 project: str,
4134 schema: Sequence[SchemaField],
4135 total_rows: Optional[int] = None,
4136 destination: Optional[Union[Table, TableReference, TableListItem, str]] = None,
4137 max_results: Optional[int] = None,
4138 start_index: Optional[int] = None,
4139 page_size: Optional[int] = None,
4140 retry: retries.Retry = DEFAULT_RETRY,
4141 timeout: TimeoutType = DEFAULT_TIMEOUT,
4142 query_id: Optional[str] = None,
4143 first_page_response: Optional[Dict[str, Any]] = None,
4144 num_dml_affected_rows: Optional[int] = None,
4145 query: Optional[str] = None,
4146 total_bytes_processed: Optional[int] = None,
4147 ) -> RowIterator:
4148 """List the rows of a completed query.
4149 See
4150 https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/getQueryResults
4151 Args:
4152 job_id (str):
4153 ID of a query job.
4154 location (str): Location of the query job.
4155 project (str):
4156 ID of the project where the query job was run.
4157 schema (Sequence[google.cloud.bigquery.schema.SchemaField]):
4158 The fields expected in these query results. Used to convert
4159 from JSON to expected Python types.
4160 total_rows (Optional[int]):
4161 Total number of rows in the query results.
4162 destination (Optional[Union[ \
4163 google.cloud.bigquery.table.Table, \
4164 google.cloud.bigquery.table.TableListItem, \
4165 google.cloud.bigquery.table.TableReference, \
4166 str, \
4167 ]]):
4168 Destination table reference. Used to fetch the query results
4169 with the BigQuery Storage API.
4170 max_results (Optional[int]):
4171 Maximum number of rows to return across the whole iterator.
4172 start_index (Optional[int]):
4173 The zero-based index of the starting row to read.
4174 page_size (Optional[int]):
4175 The maximum number of rows in each page of results from this request.
4176 Non-positive values are ignored. Defaults to a sensible value set by the API.
4177 retry (Optional[google.api_core.retry.Retry]):
4178 How to retry the RPC.
4179 timeout (Optional[float]):
4180 The number of seconds to wait for the underlying HTTP transport
                before using ``retry``. If set, this connection timeout is
                raised to at least a library-defined minimum, so that responses
                that are merely slow, but ultimately successful, are not retried.
4184 If multiple requests are made under the hood, ``timeout``
4185 applies to each individual request.
4186 query_id (Optional[str]):
4187 [Preview] ID of a completed query. This ID is auto-generated
4188 and not guaranteed to be populated.
4189 first_page_response (Optional[dict]):
4190 API response for the first page of results (if available).
4191 num_dml_affected_rows (Optional[int]):
4192 If this RowIterator is the result of a DML query, the number of
4193 rows that were affected.
4194 query (Optional[str]):
4195 The query text used.
4196 total_bytes_processed (Optional[int]):
                Total bytes processed from job statistics, if present.
4198
4199 Returns:
4200 google.cloud.bigquery.table.RowIterator:
4201 Iterator of row data
4202 :class:`~google.cloud.bigquery.table.Row`-s.
4203 """
4204 params: Dict[str, Any] = {
4205 "fields": _LIST_ROWS_FROM_QUERY_RESULTS_FIELDS,
4206 "location": location,
4207 }
4208
4209 if timeout is not None:
4210 if not isinstance(timeout, (int, float)):
4211 timeout = _MIN_GET_QUERY_RESULTS_TIMEOUT
4212 else:
4213 timeout = max(timeout, _MIN_GET_QUERY_RESULTS_TIMEOUT)
4214
4215 if start_index is not None:
4216 params["startIndex"] = start_index
4217
4218 params["formatOptions.useInt64Timestamp"] = True
4219 row_iterator = RowIterator(
4220 client=self,
4221 api_request=functools.partial(self._call_api, retry, timeout=timeout),
4222 path=f"/projects/{project}/queries/{job_id}",
4223 schema=schema,
4224 max_results=max_results,
4225 page_size=page_size,
4226 table=destination,
4227 extra_params=params,
4228 total_rows=total_rows,
4229 project=project,
4230 location=location,
4231 job_id=job_id,
4232 query_id=query_id,
4233 first_page_response=first_page_response,
4234 num_dml_affected_rows=num_dml_affected_rows,
4235 query=query,
4236 total_bytes_processed=total_bytes_processed,
4237 )
4238 return row_iterator
4239
4240 def _schema_from_json_file_object(self, file_obj):
4241 """Helper function for schema_from_json that takes a
4242 file object that describes a table schema.
4243
4244 Returns:
4245 List of schema field objects.
4246 """
4247 json_data = json.load(file_obj)
4248 return [SchemaField.from_api_repr(field) for field in json_data]
4249
4250 def _schema_to_json_file_object(self, schema_list, file_obj):
4251 """Helper function for schema_to_json that takes a schema list and file
4252 object and writes the schema list to the file object with json.dump
4253 """
4254 json.dump(schema_list, file_obj, indent=2, sort_keys=True)
4255
4256 def schema_from_json(self, file_or_path: "PathType") -> List[SchemaField]:
        """Read a table schema from a file object or file path containing a
        JSON schema description.
4259
4260 Returns:
4261 List[SchemaField]:
4262 List of :class:`~google.cloud.bigquery.schema.SchemaField` objects.
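
        Example:

            A minimal sketch; ``schema.json`` is a placeholder path whose
            contents are assumed to be a JSON array of field definitions, for
            example ``[{"name": "full_name", "type": "STRING", "mode": "REQUIRED"}]``:

            .. code-block:: python

                from google.cloud import bigquery

                client = bigquery.Client()
                schema = client.schema_from_json("schema.json")
                print(schema)  # A list of SchemaField objects.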
4263 """
4264 if isinstance(file_or_path, io.IOBase):
4265 return self._schema_from_json_file_object(file_or_path)
4266
4267 with open(file_or_path) as file_obj:
4268 return self._schema_from_json_file_object(file_obj)
4269
4270 def schema_to_json(
4271 self, schema_list: Sequence[SchemaField], destination: "PathType"
4272 ):
        """Serialize a list of schema field objects as JSON to a file.

        ``destination`` is a file path or a file object.
4278 """
4279 json_schema_list = [f.to_api_repr() for f in schema_list]
4280
4281 if isinstance(destination, io.IOBase):
4282 return self._schema_to_json_file_object(json_schema_list, destination)
4283
4284 with open(destination, mode="w") as file_obj:
4285 return self._schema_to_json_file_object(json_schema_list, file_obj)
4286
    def __enter__(self):
        """Enter the runtime context: return this client."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Exit the runtime context: close the client and its HTTP sessions."""
        self.close()
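
    # Example: the client can be used as a context manager so that its HTTP
    # sessions are released automatically. A minimal sketch (the project ID and
    # query text are illustrative):
    #
    #     with Client(project="my-project") as client:
    #         rows = client.query_and_wait("SELECT 1")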
4292
4293
4294# pylint: disable=unused-argument
4295def _item_to_project(iterator, resource):
4296 """Convert a JSON project to the native object.
4297
4298 Args:
4299 iterator (google.api_core.page_iterator.Iterator): The iterator that is currently in use.
4300
4301 resource (Dict): An item to be converted to a project.
4302
4303 Returns:
4304 google.cloud.bigquery.client.Project: The next project in the page.
4305 """
4306 return Project.from_api_repr(resource)
4307
4308
4309# pylint: enable=unused-argument
4310
4311
4312def _item_to_dataset(iterator, resource):
4313 """Convert a JSON dataset to the native object.
4314
4315 Args:
4316 iterator (google.api_core.page_iterator.Iterator): The iterator that is currently in use.
4317
4318 resource (Dict): An item to be converted to a dataset.
4319
4320 Returns:
4321 google.cloud.bigquery.dataset.DatasetListItem: The next dataset in the page.
4322 """
4323 return DatasetListItem(resource)
4324
4325
4326def _item_to_job(iterator, resource):
4327 """Convert a JSON job to the native object.
4328
4329 Args:
4330 iterator (google.api_core.page_iterator.Iterator): The iterator that is currently in use.
4331
4332 resource (Dict): An item to be converted to a job.
4333
4334 Returns:
4335 job instance: The next job in the page.
4336 """
4337 return iterator.client.job_from_resource(resource)
4338
4339
4340def _item_to_model(iterator, resource):
4341 """Convert a JSON model to the native object.
4342
4343 Args:
4344 iterator (google.api_core.page_iterator.Iterator):
4345 The iterator that is currently in use.
4346 resource (Dict): An item to be converted to a model.
4347
4348 Returns:
4349 google.cloud.bigquery.model.Model: The next model in the page.
4350 """
4351 return Model.from_api_repr(resource)
4352
4353
4354def _item_to_routine(iterator, resource):
    """Convert a JSON routine to the native object.
4356
4357 Args:
4358 iterator (google.api_core.page_iterator.Iterator):
4359 The iterator that is currently in use.
4360 resource (Dict): An item to be converted to a routine.
4361
4362 Returns:
4363 google.cloud.bigquery.routine.Routine: The next routine in the page.
4364 """
4365 return Routine.from_api_repr(resource)
4366
4367
4368def _item_to_table(iterator, resource):
4369 """Convert a JSON table to the native object.
4370
4371 Args:
4372 iterator (google.api_core.page_iterator.Iterator): The iterator that is currently in use.
4373
4374 resource (Dict): An item to be converted to a table.
4375
4376 Returns:
4377 google.cloud.bigquery.table.Table: The next table in the page.
4378 """
4379 return TableListItem(resource)
4380
4381
4382def _extract_job_reference(job, project=None, location=None):
4383 """Extract fully-qualified job reference from a job-like object.
4384
4385 Args:
        job (Union[ \
            str, \
            google.cloud.bigquery.job.LoadJob, \
            google.cloud.bigquery.job.CopyJob, \
            google.cloud.bigquery.job.ExtractJob, \
            google.cloud.bigquery.job.QueryJob, \
        ]): Job identifier or job object.
        project (Optional[str]):
            Project where the job was run. Ignored if ``job`` is a job
            object.
        location (Optional[str]):
            Location where the job was run. Ignored if ``job`` is a job
            object.
4399
4400 Returns:
4401 Tuple[str, str, str]: ``(project, location, job_id)``
4402 """
4403 if hasattr(job, "job_id"):
4404 project = job.project
4405 job_id = job.job_id
4406 location = job.location
4407 else:
4408 job_id = job
4409
4410 return (project, location, job_id)
4411
4412
4413def _check_mode(stream):
4414 """Check that a stream was opened in read-binary mode.
4415
4416 Args:
4417 stream (IO[bytes]): A bytes IO object open for reading.
4418
4419 Raises:
4420 ValueError:
            if ``stream.mode`` is present and is not one of
            ``rb``, ``r+b``, or ``rb+``.
4423 """
4424 mode = getattr(stream, "mode", None)
4425
4426 if isinstance(stream, gzip.GzipFile):
4427 if mode != gzip.READ: # pytype: disable=module-attr
4428 raise ValueError(
4429 "Cannot upload gzip files opened in write mode: use "
4430 "gzip.GzipFile(filename, mode='rb')"
4431 )
4432 else:
4433 if mode is not None and mode not in ("rb", "r+b", "rb+"):
4434 raise ValueError(
4435 "Cannot upload files opened in text mode: use "
4436 "open(filename, mode='rb') or open(filename, mode='r+b')"
4437 )
4438
4439
4440def _get_upload_headers(user_agent):
4441 """Get the headers for an upload request.
4442
4443 Args:
4444 user_agent (str): The user-agent for requests.
4445
4446 Returns:
4447 Dict: The headers to be used for the request.
4448 """
4449 return {
4450 "Accept": "application/json",
4451 "Accept-Encoding": "gzip, deflate",
4452 "User-Agent": user_agent,
4453 "content-type": "application/json; charset=UTF-8",
4454 }
4455
4456
4457def _add_server_timeout_header(headers: Optional[Dict[str, str]], kwargs):
4458 timeout = kwargs.get("timeout")
4459 if timeout is not None:
4460 if headers is None:
4461 headers = {}
4462 headers[TIMEOUT_HEADER] = str(timeout)
4463
4464 if headers:
4465 kwargs["headers"] = headers
4466
4467 return kwargs