1# Copyright 2015 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15"""Client for interacting with the Google BigQuery API."""
16
17from __future__ import absolute_import
18from __future__ import annotations
19from __future__ import division
20
21from collections import abc as collections_abc
22import copy
23import datetime
24import functools
25import gzip
26import io
27import itertools
28import json
29import math
30import os
31import tempfile
32import typing
33from typing import (
34 Any,
35 Callable,
36 Dict,
37 IO,
38 Iterable,
39 Mapping,
40 List,
41 Optional,
42 Sequence,
43 Tuple,
44 Union,
45)
46import uuid
47import warnings
48
49import requests
50
51from google import resumable_media # type: ignore
52from google.resumable_media.requests import MultipartUpload # type: ignore
53from google.resumable_media.requests import ResumableUpload
54
55import google.api_core.client_options
56import google.api_core.exceptions as core_exceptions
57from google.api_core.iam import Policy
58from google.api_core import page_iterator
59from google.api_core import retry as retries
60import google.cloud._helpers # type: ignore
61from google.cloud import exceptions # pytype: disable=import-error
62from google.cloud.client import ClientWithProject # type: ignore # pytype: disable=import-error
63
64try:
65 from google.cloud.bigquery_storage_v1.services.big_query_read.client import (
66 DEFAULT_CLIENT_INFO as DEFAULT_BQSTORAGE_CLIENT_INFO,
67 )
68except ImportError:
69 DEFAULT_BQSTORAGE_CLIENT_INFO = None # type: ignore
70
71
72from google.auth.credentials import Credentials
73from google.cloud.bigquery._http import Connection
74from google.cloud.bigquery import _job_helpers
75from google.cloud.bigquery import _pandas_helpers
76from google.cloud.bigquery import _versions_helpers
77from google.cloud.bigquery import enums
78from google.cloud.bigquery import exceptions as bq_exceptions
79from google.cloud.bigquery import job
80from google.cloud.bigquery._helpers import _get_sub_prop
81from google.cloud.bigquery._helpers import _record_field_to_json
82from google.cloud.bigquery._helpers import _str_or_none
83from google.cloud.bigquery._helpers import _verify_job_config_type
84from google.cloud.bigquery._helpers import _get_bigquery_host
85from google.cloud.bigquery._helpers import _DEFAULT_HOST
86from google.cloud.bigquery._helpers import _DEFAULT_HOST_TEMPLATE
87from google.cloud.bigquery._helpers import _DEFAULT_UNIVERSE
88from google.cloud.bigquery._helpers import _validate_universe
89from google.cloud.bigquery._helpers import _get_client_universe
90from google.cloud.bigquery._helpers import TimeoutType
91from google.cloud.bigquery._job_helpers import make_job_id as _make_job_id
92from google.cloud.bigquery.dataset import Dataset
93from google.cloud.bigquery.dataset import DatasetListItem
94from google.cloud.bigquery.dataset import DatasetReference
95
96from google.cloud.bigquery.enums import AutoRowIDs, DatasetView, UpdateMode
97from google.cloud.bigquery.format_options import ParquetOptions
98from google.cloud.bigquery.job import (
99 CopyJob,
100 CopyJobConfig,
101 ExtractJob,
102 ExtractJobConfig,
103 LoadJob,
104 LoadJobConfig,
105 QueryJob,
106 QueryJobConfig,
107)
108from google.cloud.bigquery.model import Model
109from google.cloud.bigquery.model import ModelReference
110from google.cloud.bigquery.model import _model_arg_to_model_ref
111from google.cloud.bigquery.opentelemetry_tracing import create_span
112from google.cloud.bigquery.query import _QueryResults
113from google.cloud.bigquery.retry import (
114 DEFAULT_JOB_RETRY,
115 DEFAULT_RETRY,
116 DEFAULT_TIMEOUT,
117 DEFAULT_GET_JOB_TIMEOUT,
118 POLLING_DEFAULT_VALUE,
119)
120from google.cloud.bigquery.routine import Routine
121from google.cloud.bigquery.routine import RoutineReference
122from google.cloud.bigquery.schema import SchemaField
123from google.cloud.bigquery.table import _table_arg_to_table
124from google.cloud.bigquery.table import _table_arg_to_table_ref
125from google.cloud.bigquery.table import Table
126from google.cloud.bigquery.table import TableListItem
127from google.cloud.bigquery.table import TableReference
128from google.cloud.bigquery.table import RowIterator
129
130pyarrow = _versions_helpers.PYARROW_VERSIONS.try_import()
# pandas is an optional dependency resolved at runtime via ``try_import``, so
# mypy cannot type-check it here; related ``type: ignore`` comments appear
# where pandas is used.
pandas = _versions_helpers.PANDAS_VERSIONS.try_import()
134
135
136ResumableTimeoutType = Union[
137 None, float, Tuple[float, float]
138] # for resumable media methods
139
140if typing.TYPE_CHECKING: # pragma: NO COVER
141 # os.PathLike is only subscriptable in Python 3.9+, thus shielding with a condition.
142 PathType = Union[str, bytes, os.PathLike[str], os.PathLike[bytes], io.IOBase]
143_DEFAULT_CHUNKSIZE = 100 * 1024 * 1024 # 100 MB
144_MAX_MULTIPART_SIZE = 5 * 1024 * 1024
145_DEFAULT_NUM_RETRIES = 6
146_BASE_UPLOAD_TEMPLATE = "{host}/upload/bigquery/v2/projects/{project}/jobs?uploadType="
147_MULTIPART_URL_TEMPLATE = _BASE_UPLOAD_TEMPLATE + "multipart"
148_RESUMABLE_URL_TEMPLATE = _BASE_UPLOAD_TEMPLATE + "resumable"
149_GENERIC_CONTENT_TYPE = "*/*"
150_READ_LESS_THAN_SIZE = (
151 "Size {:d} was specified but the file-like object only had " "{:d} bytes remaining."
152)
153_NEED_TABLE_ARGUMENT = (
154 "The table argument should be a table ID string, Table, or TableReference"
155)
156_LIST_ROWS_FROM_QUERY_RESULTS_FIELDS = "jobReference,totalRows,pageToken,rows"
157
158# In microbenchmarks, it's been shown that even in ideal conditions (query
159# finished, local data), requests to getQueryResults can take 10+ seconds.
160# In less-than-ideal situations, the response can take even longer, as it must
161# be able to download a full 100+ MB row in that time. Don't let the
162# connection timeout before data can be downloaded.
163# https://github.com/googleapis/python-bigquery/issues/438
164_MIN_GET_QUERY_RESULTS_TIMEOUT = 120
165
166TIMEOUT_HEADER = "X-Server-Timeout"
167
168
169class Project(object):
170 """Wrapper for resource describing a BigQuery project.
171
172 Args:
173 project_id (str): Opaque ID of the project
174
175 numeric_id (int): Numeric ID of the project
176
177 friendly_name (str): Display name of the project
178 """
179
180 def __init__(self, project_id, numeric_id, friendly_name):
181 self.project_id = project_id
182 self.numeric_id = numeric_id
183 self.friendly_name = friendly_name
184
185 @classmethod
186 def from_api_repr(cls, resource):
187 """Factory: construct an instance from a resource dict."""
188 return cls(resource["id"], resource["numericId"], resource["friendlyName"])
189
190
191class Client(ClientWithProject):
192 """Client to bundle configuration needed for API requests.
193
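    Example:

        A minimal construction sketch; it assumes Application Default
        Credentials are available, and the project ID and location shown
        are placeholders:

        .. code-block:: python

            from google.cloud import bigquery

            client = bigquery.Client(project="my-project", location="US")
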
194 Args:
195 project (Optional[str]):
196 Project ID for the project which the client acts on behalf of.
197 Will be passed when creating a dataset / job. If not passed,
198 falls back to the default inferred from the environment.
199 credentials (Optional[google.auth.credentials.Credentials]):
200 The OAuth2 Credentials to use for this client. If not passed
201 (and if no ``_http`` object is passed), falls back to the
202 default inferred from the environment.
203 _http (Optional[requests.Session]):
204 HTTP object to make requests. Can be any object that
205 defines ``request()`` with the same interface as
206 :meth:`requests.Session.request`. If not passed, an ``_http``
207 object is created that is bound to the ``credentials`` for the
208 current object.
209 This parameter should be considered private, and could change in
210 the future.
211 location (Optional[str]):
212 Default location for jobs / datasets / tables.
213 default_query_job_config (Optional[google.cloud.bigquery.job.QueryJobConfig]):
214 Default ``QueryJobConfig``.
215 Will be merged into job configs passed into the ``query`` method.
216 default_load_job_config (Optional[google.cloud.bigquery.job.LoadJobConfig]):
217 Default ``LoadJobConfig``.
218 Will be merged into job configs passed into the ``load_table_*`` methods.
219 client_info (Optional[google.api_core.client_info.ClientInfo]):
220 The client info used to send a user-agent string along with API
221 requests. If ``None``, then default info will be used. Generally,
222 you only need to set this if you're developing your own library
223 or partner tool.
224 client_options (Optional[Union[google.api_core.client_options.ClientOptions, Dict]]):
225 Client options used to set user options on the client. API Endpoint
226 should be set through client_options.
227 default_job_creation_mode (Optional[str]):
228 Sets the default job creation mode used by query methods such as
229 query_and_wait(). For lightweight queries, JOB_CREATION_OPTIONAL is
230 generally recommended.
231
232 Raises:
233 google.auth.exceptions.DefaultCredentialsError:
234 Raised if ``credentials`` is not specified and the library fails
235 to acquire default credentials.
236 """
237
238 SCOPE = ("https://www.googleapis.com/auth/cloud-platform",) # type: ignore
239 """The scopes required for authenticating as a BigQuery consumer."""
240
241 def __init__(
242 self,
243 project: Optional[str] = None,
244 credentials: Optional[Credentials] = None,
245 _http: Optional[requests.Session] = None,
246 location: Optional[str] = None,
247 default_query_job_config: Optional[QueryJobConfig] = None,
248 default_load_job_config: Optional[LoadJobConfig] = None,
249 client_info: Optional[google.api_core.client_info.ClientInfo] = None,
250 client_options: Optional[
251 Union[google.api_core.client_options.ClientOptions, Dict[str, Any]]
252 ] = None,
253 default_job_creation_mode: Optional[str] = None,
254 ) -> None:
255 if client_options is None:
256 client_options = {}
257 if isinstance(client_options, dict):
258 client_options = google.api_core.client_options.from_dict(client_options)
259 # assert isinstance(client_options, google.api_core.client_options.ClientOptions)
260
261 super(Client, self).__init__(
262 project=project,
263 credentials=credentials,
264 client_options=client_options,
265 _http=_http,
266 )
267
268 kw_args: Dict[str, Any] = {"client_info": client_info}
269 bq_host = _get_bigquery_host()
270 kw_args["api_endpoint"] = bq_host if bq_host != _DEFAULT_HOST else None
271 client_universe = None
272 if client_options.api_endpoint:
273 api_endpoint = client_options.api_endpoint
274 kw_args["api_endpoint"] = api_endpoint
275 else:
276 client_universe = _get_client_universe(client_options)
277 if client_universe != _DEFAULT_UNIVERSE:
278 kw_args["api_endpoint"] = _DEFAULT_HOST_TEMPLATE.replace(
279 "{UNIVERSE_DOMAIN}", client_universe
280 )
281 # Ensure credentials and universe are not in conflict.
282 if hasattr(self, "_credentials") and client_universe is not None:
283 _validate_universe(client_universe, self._credentials)
284
285 self._connection = Connection(self, **kw_args)
286 self._location = location
287 self._default_load_job_config = copy.deepcopy(default_load_job_config)
288 self.default_job_creation_mode = default_job_creation_mode
289
290 # Use property setter so validation can run.
291 self.default_query_job_config = default_query_job_config
292
293 @property
294 def location(self):
295 """Default location for jobs / datasets / tables."""
296 return self._location
297
298 @property
299 def default_job_creation_mode(self):
300 """Default job creation mode used for query execution."""
301 return self._default_job_creation_mode
302
303 @default_job_creation_mode.setter
304 def default_job_creation_mode(self, value: Optional[str]):
305 self._default_job_creation_mode = value
306
307 @property
308 def default_query_job_config(self) -> Optional[QueryJobConfig]:
309 """Default ``QueryJobConfig`` or ``None``.
310
311 Will be merged into job configs passed into the ``query`` or
312 ``query_and_wait`` methods.
313 """
314 return self._default_query_job_config
315
316 @default_query_job_config.setter
317 def default_query_job_config(self, value: Optional[QueryJobConfig]):
318 if value is not None:
319 _verify_job_config_type(
320 value, QueryJobConfig, param_name="default_query_job_config"
321 )
322 self._default_query_job_config = copy.deepcopy(value)
323
324 @property
325 def default_load_job_config(self):
326 """Default ``LoadJobConfig``.
327 Will be merged into job configs passed into the ``load_table_*`` methods.
328 """
329 return self._default_load_job_config
330
331 @default_load_job_config.setter
332 def default_load_job_config(self, value: LoadJobConfig):
333 self._default_load_job_config = copy.deepcopy(value)
334
335 def close(self):
336 """Close the underlying transport objects, releasing system resources.
337
338 .. note::
339
340 The client instance can be used for making additional requests even
341 after closing, in which case the underlying connections are
342 automatically re-created.
343 """
344 self._http._auth_request.session.close()
345 self._http.close()
346
347 def get_service_account_email(
348 self,
349 project: Optional[str] = None,
350 retry: retries.Retry = DEFAULT_RETRY,
351 timeout: TimeoutType = DEFAULT_TIMEOUT,
352 ) -> str:
353 """Get the email address of the project's BigQuery service account
354
355 Example:
356
357 .. code-block:: python
358
359 from google.cloud import bigquery
360 client = bigquery.Client()
361 client.get_service_account_email()
362 # returns an email similar to: my_service_account@my-project.iam.gserviceaccount.com
363
364 Note:
365 This is the service account that BigQuery uses to manage tables
366 encrypted by a key in KMS.
367
368 Args:
369 project (Optional[str]):
                Project ID to use for retrieving the service account email.
371 Defaults to the client's project.
372 retry (Optional[google.api_core.retry.Retry]): How to retry the RPC.
373 timeout (Optional[float]):
374 The number of seconds to wait for the underlying HTTP transport
375 before using ``retry``.
376
377 Returns:
378 str:
379 service account email address
380
381 """
382 if project is None:
383 project = self.project
384 path = "/projects/%s/serviceAccount" % (project,)
385 span_attributes = {"path": path}
386 api_response = self._call_api(
387 retry,
388 span_name="BigQuery.getServiceAccountEmail",
389 span_attributes=span_attributes,
390 method="GET",
391 path=path,
392 timeout=timeout,
393 )
394 return api_response["email"]
395
396 def list_projects(
397 self,
398 max_results: Optional[int] = None,
399 page_token: Optional[str] = None,
400 retry: retries.Retry = DEFAULT_RETRY,
401 timeout: TimeoutType = DEFAULT_TIMEOUT,
402 page_size: Optional[int] = None,
403 ) -> page_iterator.Iterator:
404 """List projects for the project associated with this client.
405
406 See
407 https://cloud.google.com/bigquery/docs/reference/rest/v2/projects/list
408
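        Example:

            A minimal iteration sketch (assumes the credentials are allowed
            to list projects):

            .. code-block:: python

                from google.cloud import bigquery

                client = bigquery.Client()
                for project in client.list_projects():
                    print(project.project_id)
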
409 Args:
410 max_results (Optional[int]):
411 Maximum number of projects to return.
412 Defaults to a value set by the API.
413
414 page_token (Optional[str]):
415 Token representing a cursor into the projects. If not passed,
416 the API will return the first page of projects. The token marks
417 the beginning of the iterator to be returned and the value of
418 the ``page_token`` can be accessed at ``next_page_token`` of the
419 :class:`~google.api_core.page_iterator.HTTPIterator`.
420
421 retry (Optional[google.api_core.retry.Retry]): How to retry the RPC.
422
423 timeout (Optional[float]):
424 The number of seconds to wait for the underlying HTTP transport
425 before using ``retry``.
426
427 page_size (Optional[int]):
428 Maximum number of projects to return in each page.
429 Defaults to a value set by the API.
430
431 Returns:
432 google.api_core.page_iterator.Iterator:
433 Iterator of :class:`~google.cloud.bigquery.client.Project`
434 accessible to the current client.
435 """
436 span_attributes = {"path": "/projects"}
437
438 def api_request(*args, **kwargs):
439 return self._call_api(
440 retry,
441 span_name="BigQuery.listProjects",
442 span_attributes=span_attributes,
443 *args,
444 timeout=timeout,
445 **kwargs,
446 )
447
448 return page_iterator.HTTPIterator(
449 client=self,
450 api_request=api_request,
451 path="/projects",
452 item_to_value=_item_to_project,
453 items_key="projects",
454 page_token=page_token,
455 max_results=max_results,
456 page_size=page_size,
457 )
458
459 def list_datasets(
460 self,
461 project: Optional[str] = None,
462 include_all: bool = False,
463 filter: Optional[str] = None,
464 max_results: Optional[int] = None,
465 page_token: Optional[str] = None,
466 retry: retries.Retry = DEFAULT_RETRY,
467 timeout: TimeoutType = DEFAULT_TIMEOUT,
468 page_size: Optional[int] = None,
469 ) -> page_iterator.Iterator:
470 """List datasets for the project associated with this client.
471
472 See
473 https://cloud.google.com/bigquery/docs/reference/rest/v2/datasets/list
474
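        Example:

            A minimal iteration sketch (``my_project`` is a placeholder
            project ID):

            .. code-block:: python

                from google.cloud import bigquery

                client = bigquery.Client()
                for dataset in client.list_datasets(project="my_project"):
                    print(dataset.dataset_id)
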
475 Args:
476 project (Optional[str]):
                Project ID to use for retrieving datasets. Defaults to the
478 client's project.
479 include_all (Optional[bool]):
                If ``True``, include hidden datasets in the results. Defaults to ``False``.
481 filter (Optional[str]):
482 An expression for filtering the results by label.
483 For syntax, see
484 https://cloud.google.com/bigquery/docs/reference/rest/v2/datasets/list#body.QUERY_PARAMETERS.filter
485 max_results (Optional[int]):
486 Maximum number of datasets to return.
487 page_token (Optional[str]):
488 Token representing a cursor into the datasets. If not passed,
489 the API will return the first page of datasets. The token marks
490 the beginning of the iterator to be returned and the value of
491 the ``page_token`` can be accessed at ``next_page_token`` of the
492 :class:`~google.api_core.page_iterator.HTTPIterator`.
493 retry (Optional[google.api_core.retry.Retry]):
494 How to retry the RPC.
495 timeout (Optional[float]):
496 The number of seconds to wait for the underlying HTTP transport
497 before using ``retry``.
498 page_size (Optional[int]):
499 Maximum number of datasets to return per page.
500
501 Returns:
502 google.api_core.page_iterator.Iterator:
                Iterator of :class:`~google.cloud.bigquery.dataset.DatasetListItem`
                associated with the project.
505 """
506 extra_params: Dict[str, Any] = {}
507 if project is None:
508 project = self.project
509 if include_all:
510 extra_params["all"] = True
511 if filter:
512 # TODO: consider supporting a dict of label -> value for filter,
513 # and converting it into a string here.
514 extra_params["filter"] = filter
515 path = "/projects/%s/datasets" % (project,)
516
517 span_attributes = {"path": path}
518
519 def api_request(*args, **kwargs):
520 return self._call_api(
521 retry,
522 span_name="BigQuery.listDatasets",
523 span_attributes=span_attributes,
524 *args,
525 timeout=timeout,
526 **kwargs,
527 )
528
529 return page_iterator.HTTPIterator(
530 client=self,
531 api_request=api_request,
532 path=path,
533 item_to_value=_item_to_dataset,
534 items_key="datasets",
535 page_token=page_token,
536 max_results=max_results,
537 extra_params=extra_params,
538 page_size=page_size,
539 )
540
541 def dataset(
542 self, dataset_id: str, project: Optional[str] = None
543 ) -> DatasetReference:
544 """Deprecated: Construct a reference to a dataset.
545
546 .. deprecated:: 1.24.0
547 Construct a
548 :class:`~google.cloud.bigquery.dataset.DatasetReference` using its
549 constructor or use a string where previously a reference object
550 was used.
551
552 As of ``google-cloud-bigquery`` version 1.7.0, all client methods
553 that take a
554 :class:`~google.cloud.bigquery.dataset.DatasetReference` or
555 :class:`~google.cloud.bigquery.table.TableReference` also take a
556 string in standard SQL format, e.g. ``project.dataset_id`` or
557 ``project.dataset_id.table_id``.
558
559 Args:
560 dataset_id (str): ID of the dataset.
561
562 project (Optional[str]):
563 Project ID for the dataset (defaults to the project of the client).
564
565 Returns:
566 google.cloud.bigquery.dataset.DatasetReference:
567 a new ``DatasetReference`` instance.
568 """
569 if project is None:
570 project = self.project
571
572 warnings.warn(
573 "Client.dataset is deprecated and will be removed in a future version. "
574 "Use a string like 'my_project.my_dataset' or a "
575 "cloud.google.bigquery.DatasetReference object, instead.",
576 PendingDeprecationWarning,
577 stacklevel=2,
578 )
579 return DatasetReference(project, dataset_id)
580
581 def _ensure_bqstorage_client(
582 self,
583 bqstorage_client: Optional[
584 "google.cloud.bigquery_storage.BigQueryReadClient"
585 ] = None,
586 client_options: Optional[google.api_core.client_options.ClientOptions] = None,
587 client_info: Optional[
588 "google.api_core.gapic_v1.client_info.ClientInfo"
589 ] = DEFAULT_BQSTORAGE_CLIENT_INFO,
590 ) -> Optional["google.cloud.bigquery_storage.BigQueryReadClient"]:
591 """Create a BigQuery Storage API client using this client's credentials.
592
593 Args:
594 bqstorage_client:
595 An existing BigQuery Storage client instance. If ``None``, a new
596 instance is created and returned.
597 client_options:
598 Custom options used with a new BigQuery Storage client instance
599 if one is created.
600 client_info:
601 The client info used with a new BigQuery Storage client
602 instance if one is created.
603
604 Returns:
605 A BigQuery Storage API client.
606 """
607
608 try:
609 bigquery_storage = _versions_helpers.BQ_STORAGE_VERSIONS.try_import(
610 raise_if_error=True
611 )
612 except bq_exceptions.BigQueryStorageNotFoundError:
613 warnings.warn(
614 "Cannot create BigQuery Storage client, the dependency "
615 "google-cloud-bigquery-storage is not installed."
616 )
617 return None
618 except bq_exceptions.LegacyBigQueryStorageError as exc:
619 warnings.warn(
620 "Dependency google-cloud-bigquery-storage is outdated: " + str(exc)
621 )
622 return None
623
624 if bqstorage_client is None: # pragma: NO COVER
625 bqstorage_client = bigquery_storage.BigQueryReadClient(
626 credentials=self._credentials,
627 client_options=client_options,
628 client_info=client_info, # type: ignore # (None is also accepted)
629 )
630
631 return bqstorage_client
632
633 def _dataset_from_arg(self, dataset) -> Union[Dataset, DatasetReference]:
634 if isinstance(dataset, str):
635 dataset = DatasetReference.from_string(
636 dataset, default_project=self.project
637 )
638
639 if not isinstance(dataset, (Dataset, DatasetReference)):
640 if isinstance(dataset, DatasetListItem):
641 dataset = dataset.reference
642 else:
643 raise TypeError(
644 "dataset must be a Dataset, DatasetReference, DatasetListItem,"
645 " or string"
646 )
647 return dataset
648
649 def create_dataset(
650 self,
651 dataset: Union[str, Dataset, DatasetReference, DatasetListItem],
652 exists_ok: bool = False,
653 retry: retries.Retry = DEFAULT_RETRY,
654 timeout: TimeoutType = DEFAULT_TIMEOUT,
655 ) -> Dataset:
656 """API call: create the dataset via a POST request.
657
658
659 See
660 https://cloud.google.com/bigquery/docs/reference/rest/v2/datasets/insert
661
662 Example:
663
664 .. code-block:: python
665
666 from google.cloud import bigquery
667 client = bigquery.Client()
668 dataset = bigquery.Dataset('my_project.my_dataset')
669 dataset = client.create_dataset(dataset)
670
671 Args:
672 dataset (Union[ \
673 google.cloud.bigquery.dataset.Dataset, \
674 google.cloud.bigquery.dataset.DatasetReference, \
675 google.cloud.bigquery.dataset.DatasetListItem, \
676 str, \
677 ]):
678 A :class:`~google.cloud.bigquery.dataset.Dataset` to create.
679 If ``dataset`` is a reference, an empty dataset is created
680 with the specified ID and client's default location.
681 exists_ok (Optional[bool]):
682 Defaults to ``False``. If ``True``, ignore "already exists"
683 errors when creating the dataset.
684 retry (Optional[google.api_core.retry.Retry]):
685 How to retry the RPC.
686 timeout (Optional[float]):
687 The number of seconds to wait for the underlying HTTP transport
688 before using ``retry``.
689
690 Returns:
691 google.cloud.bigquery.dataset.Dataset:
692 A new ``Dataset`` returned from the API.
693
694 Raises:
695 google.cloud.exceptions.Conflict:
696 If the dataset already exists.
697 """
698 dataset = self._dataset_from_arg(dataset)
699 if isinstance(dataset, DatasetReference):
700 dataset = Dataset(dataset)
701
702 path = "/projects/%s/datasets" % (dataset.project,)
703
704 data = dataset.to_api_repr()
705 if data.get("location") is None and self.location is not None:
706 data["location"] = self.location
707
708 try:
709 span_attributes = {"path": path}
710
711 api_response = self._call_api(
712 retry,
713 span_name="BigQuery.createDataset",
714 span_attributes=span_attributes,
715 method="POST",
716 path=path,
717 data=data,
718 timeout=timeout,
719 )
720 return Dataset.from_api_repr(api_response)
721 except core_exceptions.Conflict:
722 if not exists_ok:
723 raise
724 return self.get_dataset(dataset.reference, retry=retry)
725
726 def create_routine(
727 self,
728 routine: Routine,
729 exists_ok: bool = False,
730 retry: retries.Retry = DEFAULT_RETRY,
731 timeout: TimeoutType = DEFAULT_TIMEOUT,
732 ) -> Routine:
733 """[Beta] Create a routine via a POST request.
734
735 See
736 https://cloud.google.com/bigquery/docs/reference/rest/v2/routines/insert
737
738 Args:
739 routine (google.cloud.bigquery.routine.Routine):
740 A :class:`~google.cloud.bigquery.routine.Routine` to create.
741 The dataset that the routine belongs to must already exist.
742 exists_ok (Optional[bool]):
743 Defaults to ``False``. If ``True``, ignore "already exists"
744 errors when creating the routine.
745 retry (Optional[google.api_core.retry.Retry]):
746 How to retry the RPC.
747 timeout (Optional[float]):
748 The number of seconds to wait for the underlying HTTP transport
749 before using ``retry``.
750
751 Returns:
752 google.cloud.bigquery.routine.Routine:
753 A new ``Routine`` returned from the service.
754
755 Raises:
756 google.cloud.exceptions.Conflict:
757 If the routine already exists.
758 """
759 reference = routine.reference
760 path = "/projects/{}/datasets/{}/routines".format(
761 reference.project, reference.dataset_id
762 )
763 resource = routine.to_api_repr()
764 try:
765 span_attributes = {"path": path}
766 api_response = self._call_api(
767 retry,
768 span_name="BigQuery.createRoutine",
769 span_attributes=span_attributes,
770 method="POST",
771 path=path,
772 data=resource,
773 timeout=timeout,
774 )
775 return Routine.from_api_repr(api_response)
776 except core_exceptions.Conflict:
777 if not exists_ok:
778 raise
779 return self.get_routine(routine.reference, retry=retry)
780
781 def create_table(
782 self,
783 table: Union[str, Table, TableReference, TableListItem],
784 exists_ok: bool = False,
785 retry: retries.Retry = DEFAULT_RETRY,
786 timeout: TimeoutType = DEFAULT_TIMEOUT,
787 ) -> Table:
788 """API call: create a table via a PUT request
789
790 See
791 https://cloud.google.com/bigquery/docs/reference/rest/v2/tables/insert
792
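        Example:

            A minimal sketch that creates an empty table with a one-column
            schema (the table ID is a placeholder):

            .. code-block:: python

                from google.cloud import bigquery

                client = bigquery.Client()
                table = bigquery.Table(
                    "my_project.my_dataset.my_table",
                    schema=[bigquery.SchemaField("full_name", "STRING")],
                )
                table = client.create_table(table)
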
793 Args:
794 table (Union[ \
795 google.cloud.bigquery.table.Table, \
796 google.cloud.bigquery.table.TableReference, \
797 google.cloud.bigquery.table.TableListItem, \
798 str, \
799 ]):
800 A :class:`~google.cloud.bigquery.table.Table` to create.
801 If ``table`` is a reference, an empty table is created
802 with the specified ID. The dataset that the table belongs to
803 must already exist.
804 exists_ok (Optional[bool]):
805 Defaults to ``False``. If ``True``, ignore "already exists"
806 errors when creating the table.
807 retry (Optional[google.api_core.retry.Retry]):
808 How to retry the RPC.
809 timeout (Optional[float]):
810 The number of seconds to wait for the underlying HTTP transport
811 before using ``retry``.
812
813 Returns:
814 google.cloud.bigquery.table.Table:
815 A new ``Table`` returned from the service.
816
817 Raises:
818 google.cloud.exceptions.Conflict:
819 If the table already exists.
820 """
821 table = _table_arg_to_table(table, default_project=self.project)
822 dataset_id = table.dataset_id
823 path = "/projects/%s/datasets/%s/tables" % (table.project, dataset_id)
824 data = table.to_api_repr()
825 try:
826 span_attributes = {"path": path, "dataset_id": dataset_id}
827 api_response = self._call_api(
828 retry,
829 span_name="BigQuery.createTable",
830 span_attributes=span_attributes,
831 method="POST",
832 path=path,
833 data=data,
834 timeout=timeout,
835 )
836 return Table.from_api_repr(api_response)
837 except core_exceptions.Conflict:
838 if not exists_ok:
839 raise
840 return self.get_table(table.reference, retry=retry)
841
842 def _call_api(
843 self,
844 retry,
845 span_name=None,
846 span_attributes=None,
847 job_ref=None,
848 headers: Optional[Dict[str, str]] = None,
849 **kwargs,
850 ):
851 kwargs = _add_server_timeout_header(headers, kwargs)
852 call = functools.partial(self._connection.api_request, **kwargs)
853
854 if retry:
855 call = retry(call)
856
857 if span_name is not None:
858 with create_span(
859 name=span_name, attributes=span_attributes, client=self, job_ref=job_ref
860 ):
861 return call()
862
863 return call()
864
865 def get_dataset(
866 self,
867 dataset_ref: Union[DatasetReference, str],
868 retry: retries.Retry = DEFAULT_RETRY,
869 timeout: TimeoutType = DEFAULT_TIMEOUT,
870 dataset_view: Optional[DatasetView] = None,
871 ) -> Dataset:
872 """Fetch the dataset referenced by ``dataset_ref``
873
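        Example:

            A minimal sketch (the dataset ID is a placeholder):

            .. code-block:: python

                from google.cloud import bigquery

                client = bigquery.Client()
                dataset = client.get_dataset("my_project.my_dataset")
                print(dataset.friendly_name)
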
874 Args:
875 dataset_ref (Union[ \
876 google.cloud.bigquery.dataset.DatasetReference, \
877 str, \
878 ]):
879 A reference to the dataset to fetch from the BigQuery API.
880 If a string is passed in, this method attempts to create a
881 dataset reference from a string using
882 :func:`~google.cloud.bigquery.dataset.DatasetReference.from_string`.
883 retry (Optional[google.api_core.retry.Retry]):
884 How to retry the RPC.
885 timeout (Optional[float]):
886 The number of seconds to wait for the underlying HTTP transport
887 before using ``retry``.
888 dataset_view (Optional[google.cloud.bigquery.enums.DatasetView]):
889 Specifies the view that determines which dataset information is
890 returned. By default, dataset metadata (e.g. friendlyName, description,
891 labels, etc) and ACL information are returned. This argument can
892 take on the following possible enum values.
893
894 * :attr:`~google.cloud.bigquery.enums.DatasetView.ACL`:
895 Includes dataset metadata and the ACL.
896 * :attr:`~google.cloud.bigquery.enums.DatasetView.FULL`:
897 Includes all dataset metadata, including the ACL and table metadata.
898 This view is not supported by the `datasets.list` API method.
899 * :attr:`~google.cloud.bigquery.enums.DatasetView.METADATA`:
900 Includes basic dataset metadata, but not the ACL.
901 * :attr:`~google.cloud.bigquery.enums.DatasetView.DATASET_VIEW_UNSPECIFIED`:
                The server will decide which view to use. Currently defaults to FULL.

        Returns:
904 google.cloud.bigquery.dataset.Dataset:
905 A ``Dataset`` instance.
906 """
907 if isinstance(dataset_ref, str):
908 dataset_ref = DatasetReference.from_string(
909 dataset_ref, default_project=self.project
910 )
911 path = dataset_ref.path
912
913 if dataset_view:
914 query_params = {"datasetView": dataset_view.value}
915 else:
916 query_params = {}
917
918 span_attributes = {"path": path}
919 api_response = self._call_api(
920 retry,
921 span_name="BigQuery.getDataset",
922 span_attributes=span_attributes,
923 method="GET",
924 path=path,
925 timeout=timeout,
926 query_params=query_params,
927 )
928 return Dataset.from_api_repr(api_response)
929
930 def get_iam_policy(
931 self,
932 table: Union[Table, TableReference, TableListItem, str],
933 requested_policy_version: int = 1,
934 retry: retries.Retry = DEFAULT_RETRY,
935 timeout: TimeoutType = DEFAULT_TIMEOUT,
936 ) -> Policy:
937 """Return the access control policy for a table resource.
938
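        Example:

            A minimal sketch that reads a table's policy and grants a role
            via :meth:`~google.cloud.bigquery.client.Client.set_iam_policy`.
            The table ID, role, and member are placeholders, and the
            list-of-bindings usage assumes the
            :class:`google.api_core.iam.Policy` bindings interface:

            .. code-block:: python

                from google.cloud import bigquery

                client = bigquery.Client()
                table_id = "my_project.my_dataset.my_table"

                policy = client.get_iam_policy(table_id)
                policy.bindings.append(
                    {
                        "role": "roles/bigquery.dataViewer",
                        "members": {"user:alice@example.com"},
                    }
                )
                policy = client.set_iam_policy(table_id, policy)
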
939 Args:
940 table (Union[ \
941 google.cloud.bigquery.table.Table, \
942 google.cloud.bigquery.table.TableReference, \
943 google.cloud.bigquery.table.TableListItem, \
944 str, \
945 ]):
946 The table to get the access control policy for.
947 If a string is passed in, this method attempts to create a
948 table reference from a string using
949 :func:`~google.cloud.bigquery.table.TableReference.from_string`.
950 requested_policy_version (int):
951 Optional. The maximum policy version that will be used to format the policy.
952
953 Only version ``1`` is currently supported.
954
955 See: https://cloud.google.com/bigquery/docs/reference/rest/v2/GetPolicyOptions
956 retry (Optional[google.api_core.retry.Retry]):
957 How to retry the RPC.
958 timeout (Optional[float]):
959 The number of seconds to wait for the underlying HTTP transport
960 before using ``retry``.
961
962 Returns:
963 google.api_core.iam.Policy:
964 The access control policy.
965 """
966 table = _table_arg_to_table_ref(table, default_project=self.project)
967
968 if requested_policy_version != 1:
969 raise ValueError("only IAM policy version 1 is supported")
970
971 body = {"options": {"requestedPolicyVersion": 1}}
972
973 path = "{}:getIamPolicy".format(table.path)
974 span_attributes = {"path": path}
975 response = self._call_api(
976 retry,
977 span_name="BigQuery.getIamPolicy",
978 span_attributes=span_attributes,
979 method="POST",
980 path=path,
981 data=body,
982 timeout=timeout,
983 )
984
985 return Policy.from_api_repr(response)
986
987 def set_iam_policy(
988 self,
989 table: Union[Table, TableReference, TableListItem, str],
990 policy: Policy,
991 updateMask: Optional[str] = None,
992 retry: retries.Retry = DEFAULT_RETRY,
993 timeout: TimeoutType = DEFAULT_TIMEOUT,
994 *,
995 fields: Sequence[str] = (),
996 ) -> Policy:
997 """Return the access control policy for a table resource.
998
999 Args:
1000 table (Union[ \
1001 google.cloud.bigquery.table.Table, \
1002 google.cloud.bigquery.table.TableReference, \
1003 google.cloud.bigquery.table.TableListItem, \
1004 str, \
1005 ]):
                The table to set the access control policy for.
1007 If a string is passed in, this method attempts to create a
1008 table reference from a string using
1009 :func:`~google.cloud.bigquery.table.TableReference.from_string`.
1010 policy (google.api_core.iam.Policy):
1011 The access control policy to set.
1012 updateMask (Optional[str]):
1013 Mask as defined by
1014 https://cloud.google.com/bigquery/docs/reference/rest/v2/tables/setIamPolicy#body.request_body.FIELDS.update_mask
1015
1016 Incompatible with ``fields``.
1017 retry (Optional[google.api_core.retry.Retry]):
1018 How to retry the RPC.
1019 timeout (Optional[float]):
1020 The number of seconds to wait for the underlying HTTP transport
1021 before using ``retry``.
1022 fields (Sequence[str]):
1023 Which properties to set on the policy. See:
1024 https://cloud.google.com/bigquery/docs/reference/rest/v2/tables/setIamPolicy#body.request_body.FIELDS.update_mask
1025
1026 Incompatible with ``updateMask``.
1027
1028 Returns:
1029 google.api_core.iam.Policy:
1030 The updated access control policy.
1031 """
1032 if updateMask is not None and not fields:
1033 update_mask = updateMask
1034 elif updateMask is not None and fields:
1035 raise ValueError("Cannot set both fields and updateMask")
1036 elif fields:
1037 update_mask = ",".join(fields)
1038 else:
1039 update_mask = None
1040
1041 table = _table_arg_to_table_ref(table, default_project=self.project)
1042
        if not isinstance(policy, Policy):
1044 raise TypeError("policy must be a Policy")
1045
1046 body = {"policy": policy.to_api_repr()}
1047
1048 if update_mask is not None:
1049 body["updateMask"] = update_mask
1050
1051 path = "{}:setIamPolicy".format(table.path)
1052 span_attributes = {"path": path}
1053
1054 response = self._call_api(
1055 retry,
1056 span_name="BigQuery.setIamPolicy",
1057 span_attributes=span_attributes,
1058 method="POST",
1059 path=path,
1060 data=body,
1061 timeout=timeout,
1062 )
1063
1064 return Policy.from_api_repr(response)
1065
1066 def test_iam_permissions(
1067 self,
1068 table: Union[Table, TableReference, TableListItem, str],
1069 permissions: Sequence[str],
1070 retry: retries.Retry = DEFAULT_RETRY,
1071 timeout: TimeoutType = DEFAULT_TIMEOUT,
1072 ) -> Dict[str, Any]:
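        """Report whether the caller has the given permissions on a table.

        See
        https://cloud.google.com/bigquery/docs/reference/rest/v2/tables/testIamPermissions

        Args:
            table (Union[ \
                google.cloud.bigquery.table.Table, \
                google.cloud.bigquery.table.TableReference, \
                google.cloud.bigquery.table.TableListItem, \
                str, \
            ]):
                The table to check permissions against.
                If a string is passed in, this method attempts to create a
                table reference from a string using
                :func:`~google.cloud.bigquery.table.TableReference.from_string`.
            permissions (Sequence[str]):
                The permissions to test, for example
                ``["bigquery.tables.getData"]``.
            retry (Optional[google.api_core.retry.Retry]):
                How to retry the RPC.
            timeout (Optional[float]):
                The number of seconds to wait for the underlying HTTP transport
                before using ``retry``.

        Returns:
            Dict[str, Any]:
                The API response; the permissions held by the caller are
                listed under its ``"permissions"`` key.
        """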
1073 table = _table_arg_to_table_ref(table, default_project=self.project)
1074
1075 body = {"permissions": permissions}
1076
1077 path = "{}:testIamPermissions".format(table.path)
1078 span_attributes = {"path": path}
1079 response = self._call_api(
1080 retry,
1081 span_name="BigQuery.testIamPermissions",
1082 span_attributes=span_attributes,
1083 method="POST",
1084 path=path,
1085 data=body,
1086 timeout=timeout,
1087 )
1088
1089 return response
1090
1091 def get_model(
1092 self,
1093 model_ref: Union[ModelReference, str],
1094 retry: retries.Retry = DEFAULT_RETRY,
1095 timeout: TimeoutType = DEFAULT_TIMEOUT,
1096 ) -> Model:
1097 """[Beta] Fetch the model referenced by ``model_ref``.
1098
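        Example:

            A minimal sketch (the model ID is a placeholder):

            .. code-block:: python

                from google.cloud import bigquery

                client = bigquery.Client()
                model = client.get_model("my_project.my_dataset.my_model")
                print(model.model_type)
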
1099 Args:
1100 model_ref (Union[ \
1101 google.cloud.bigquery.model.ModelReference, \
1102 str, \
1103 ]):
1104 A reference to the model to fetch from the BigQuery API.
1105 If a string is passed in, this method attempts to create a
1106 model reference from a string using
1107 :func:`google.cloud.bigquery.model.ModelReference.from_string`.
1108 retry (Optional[google.api_core.retry.Retry]):
1109 How to retry the RPC.
1110 timeout (Optional[float]):
1111 The number of seconds to wait for the underlying HTTP transport
1112 before using ``retry``.
1113
1114 Returns:
1115 google.cloud.bigquery.model.Model: A ``Model`` instance.
1116 """
1117 if isinstance(model_ref, str):
1118 model_ref = ModelReference.from_string(
1119 model_ref, default_project=self.project
1120 )
1121 path = model_ref.path
1122 span_attributes = {"path": path}
1123
1124 api_response = self._call_api(
1125 retry,
1126 span_name="BigQuery.getModel",
1127 span_attributes=span_attributes,
1128 method="GET",
1129 path=path,
1130 timeout=timeout,
1131 )
1132 return Model.from_api_repr(api_response)
1133
1134 def get_routine(
1135 self,
1136 routine_ref: Union[Routine, RoutineReference, str],
1137 retry: retries.Retry = DEFAULT_RETRY,
1138 timeout: TimeoutType = DEFAULT_TIMEOUT,
1139 ) -> Routine:
1140 """[Beta] Get the routine referenced by ``routine_ref``.
1141
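        Example:

            A minimal sketch (the routine ID is a placeholder):

            .. code-block:: python

                from google.cloud import bigquery

                client = bigquery.Client()
                routine = client.get_routine("my_project.my_dataset.my_routine")
                print(routine.type_)
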
1142 Args:
1143 routine_ref (Union[ \
1144 google.cloud.bigquery.routine.Routine, \
1145 google.cloud.bigquery.routine.RoutineReference, \
1146 str, \
1147 ]):
1148 A reference to the routine to fetch from the BigQuery API. If
1149 a string is passed in, this method attempts to create a
1150 reference from a string using
1151 :func:`google.cloud.bigquery.routine.RoutineReference.from_string`.
1152 retry (Optional[google.api_core.retry.Retry]):
1153 How to retry the API call.
1154 timeout (Optional[float]):
1155 The number of seconds to wait for the underlying HTTP transport
1156 before using ``retry``.
1157
1158 Returns:
1159 google.cloud.bigquery.routine.Routine:
1160 A ``Routine`` instance.
1161 """
1162 if isinstance(routine_ref, str):
1163 routine_ref = RoutineReference.from_string(
1164 routine_ref, default_project=self.project
1165 )
1166 path = routine_ref.path
1167 span_attributes = {"path": path}
1168 api_response = self._call_api(
1169 retry,
1170 span_name="BigQuery.getRoutine",
1171 span_attributes=span_attributes,
1172 method="GET",
1173 path=path,
1174 timeout=timeout,
1175 )
1176 return Routine.from_api_repr(api_response)
1177
1178 def get_table(
1179 self,
1180 table: Union[Table, TableReference, TableListItem, str],
1181 retry: retries.Retry = DEFAULT_RETRY,
1182 timeout: TimeoutType = DEFAULT_TIMEOUT,
1183 ) -> Table:
1184 """Fetch the table referenced by ``table``.
1185
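        Example:

            A minimal sketch (the table ID is a placeholder):

            .. code-block:: python

                from google.cloud import bigquery

                client = bigquery.Client()
                table = client.get_table("my_project.my_dataset.my_table")
                print(table.num_rows)
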
1186 Args:
1187 table (Union[ \
1188 google.cloud.bigquery.table.Table, \
1189 google.cloud.bigquery.table.TableReference, \
1190 google.cloud.bigquery.table.TableListItem, \
1191 str, \
1192 ]):
1193 A reference to the table to fetch from the BigQuery API.
1194 If a string is passed in, this method attempts to create a
1195 table reference from a string using
1196 :func:`google.cloud.bigquery.table.TableReference.from_string`.
1197 retry (Optional[google.api_core.retry.Retry]):
1198 How to retry the RPC.
1199 timeout (Optional[float]):
1200 The number of seconds to wait for the underlying HTTP transport
1201 before using ``retry``.
1202
1203 Returns:
1204 google.cloud.bigquery.table.Table:
1205 A ``Table`` instance.
1206 """
1207 table_ref = _table_arg_to_table_ref(table, default_project=self.project)
1208 path = table_ref.path
1209 span_attributes = {"path": path}
1210 api_response = self._call_api(
1211 retry,
1212 span_name="BigQuery.getTable",
1213 span_attributes=span_attributes,
1214 method="GET",
1215 path=path,
1216 timeout=timeout,
1217 )
1218 return Table.from_api_repr(api_response)
1219
1220 def update_dataset(
1221 self,
1222 dataset: Dataset,
1223 fields: Sequence[str],
1224 retry: retries.Retry = DEFAULT_RETRY,
1225 timeout: TimeoutType = DEFAULT_TIMEOUT,
1226 update_mode: Optional[UpdateMode] = None,
1227 ) -> Dataset:
1228 """Change some fields of a dataset.
1229
1230 Use ``fields`` to specify which fields to update. At least one field
1231 must be provided. If a field is listed in ``fields`` and is ``None`` in
1232 ``dataset``, it will be deleted.
1233
1234 For example, to update the default expiration times, specify
1235 both properties in the ``fields`` argument:
1236
1237 .. code-block:: python
1238
1239 bigquery_client.update_dataset(
1240 dataset,
1241 [
1242 "default_partition_expiration_ms",
1243 "default_table_expiration_ms",
1244 ]
1245 )
1246
1247 If ``dataset.etag`` is not ``None``, the update will only
1248 succeed if the dataset on the server has the same ETag. Thus
1249 reading a dataset with ``get_dataset``, changing its fields,
1250 and then passing it to ``update_dataset`` will ensure that the changes
1251 will only be saved if no modifications to the dataset occurred
1252 since the read.
1253
1254 Args:
1255 dataset (google.cloud.bigquery.dataset.Dataset):
1256 The dataset to update.
1257 fields (Sequence[str]):
1258 The properties of ``dataset`` to change. These are strings
1259 corresponding to the properties of
1260 :class:`~google.cloud.bigquery.dataset.Dataset`.
1261 retry (Optional[google.api_core.retry.Retry]):
1262 How to retry the RPC.
1263 timeout (Optional[float]):
1264 The number of seconds to wait for the underlying HTTP transport
1265 before using ``retry``.
1266 update_mode (Optional[google.cloud.bigquery.enums.UpdateMode]):
1267 Specifies the kind of information to update in a dataset.
1268 By default, dataset metadata (e.g. friendlyName, description,
1269 labels, etc) and ACL information are updated. This argument can
1270 take on the following possible enum values.
1271
                * :attr:`~google.cloud.bigquery.enums.UpdateMode.UPDATE_MODE_UNSPECIFIED`:
1273 The default value. Behavior defaults to UPDATE_FULL.
1274 * :attr:`~google.cloud.bigquery.enums.UpdateMode.UPDATE_METADATA`:
1275 Includes metadata information for the dataset, such as friendlyName, description, labels, etc.
1276 * :attr:`~google.cloud.bigquery.enums.UpdateMode.UPDATE_ACL`:
1277 Includes ACL information for the dataset, which defines dataset access for one or more entities.
1278 * :attr:`~google.cloud.bigquery.enums.UpdateMode.UPDATE_FULL`:
1279 Includes both dataset metadata and ACL information.
1280
1281 Returns:
1282 google.cloud.bigquery.dataset.Dataset:
1283 The modified ``Dataset`` instance.
1284 """
1285 partial = dataset._build_resource(fields)
1286 if dataset.etag is not None:
1287 headers: Optional[Dict[str, str]] = {"If-Match": dataset.etag}
1288 else:
1289 headers = None
1290 path = dataset.path
1291 span_attributes = {"path": path, "fields": fields}
1292
1293 if update_mode:
1294 query_params = {"updateMode": update_mode.value}
1295 else:
1296 query_params = {}
1297
1298 api_response = self._call_api(
1299 retry,
1300 span_name="BigQuery.updateDataset",
1301 span_attributes=span_attributes,
1302 method="PATCH",
1303 path=path,
1304 data=partial,
1305 headers=headers,
1306 timeout=timeout,
1307 query_params=query_params,
1308 )
1309 return Dataset.from_api_repr(api_response)
1310
1311 def update_model(
1312 self,
1313 model: Model,
1314 fields: Sequence[str],
1315 retry: retries.Retry = DEFAULT_RETRY,
1316 timeout: TimeoutType = DEFAULT_TIMEOUT,
1317 ) -> Model:
1318 """[Beta] Change some fields of a model.
1319
1320 Use ``fields`` to specify which fields to update. At least one field
1321 must be provided. If a field is listed in ``fields`` and is ``None``
1322 in ``model``, the field value will be deleted.
1323
1324 For example, to update the descriptive properties of the model,
1325 specify them in the ``fields`` argument:
1326
1327 .. code-block:: python
1328
1329 bigquery_client.update_model(
1330 model, ["description", "friendly_name"]
1331 )
1332
1333 If ``model.etag`` is not ``None``, the update will only succeed if
1334 the model on the server has the same ETag. Thus reading a model with
1335 ``get_model``, changing its fields, and then passing it to
1336 ``update_model`` will ensure that the changes will only be saved if
1337 no modifications to the model occurred since the read.
1338
1339 Args:
1340 model (google.cloud.bigquery.model.Model): The model to update.
1341 fields (Sequence[str]):
1342 The properties of ``model`` to change. These are strings
1343 corresponding to the properties of
1344 :class:`~google.cloud.bigquery.model.Model`.
1345 retry (Optional[google.api_core.retry.Retry]):
1346 A description of how to retry the API call.
1347 timeout (Optional[float]):
1348 The number of seconds to wait for the underlying HTTP transport
1349 before using ``retry``.
1350
1351 Returns:
1352 google.cloud.bigquery.model.Model:
1353 The model resource returned from the API call.
1354 """
1355 partial = model._build_resource(fields)
1356 if model.etag:
1357 headers: Optional[Dict[str, str]] = {"If-Match": model.etag}
1358 else:
1359 headers = None
1360 path = model.path
1361 span_attributes = {"path": path, "fields": fields}
1362
1363 api_response = self._call_api(
1364 retry,
1365 span_name="BigQuery.updateModel",
1366 span_attributes=span_attributes,
1367 method="PATCH",
1368 path=path,
1369 data=partial,
1370 headers=headers,
1371 timeout=timeout,
1372 )
1373 return Model.from_api_repr(api_response)
1374
1375 def update_routine(
1376 self,
1377 routine: Routine,
1378 fields: Sequence[str],
1379 retry: retries.Retry = DEFAULT_RETRY,
1380 timeout: TimeoutType = DEFAULT_TIMEOUT,
1381 ) -> Routine:
1382 """[Beta] Change some fields of a routine.
1383
1384 Use ``fields`` to specify which fields to update. At least one field
1385 must be provided. If a field is listed in ``fields`` and is ``None``
1386 in ``routine``, the field value will be deleted.
1387
1388 For example, to update the description property of the routine,
1389 specify it in the ``fields`` argument:
1390
1391 .. code-block:: python
1392
1393 bigquery_client.update_routine(
1394 routine, ["description"]
1395 )
1396
1397 .. warning::
1398 During beta, partial updates are not supported. You must provide
1399 all fields in the resource.
1400
1401 If :attr:`~google.cloud.bigquery.routine.Routine.etag` is not
1402 ``None``, the update will only succeed if the resource on the server
1403 has the same ETag. Thus reading a routine with
1404 :func:`~google.cloud.bigquery.client.Client.get_routine`, changing
1405 its fields, and then passing it to this method will ensure that the
1406 changes will only be saved if no modifications to the resource
1407 occurred since the read.
1408
1409 Args:
1410 routine (google.cloud.bigquery.routine.Routine):
1411 The routine to update.
1412 fields (Sequence[str]):
1413 The fields of ``routine`` to change, spelled as the
1414 :class:`~google.cloud.bigquery.routine.Routine` properties.
1415 retry (Optional[google.api_core.retry.Retry]):
1416 A description of how to retry the API call.
1417 timeout (Optional[float]):
1418 The number of seconds to wait for the underlying HTTP transport
1419 before using ``retry``.
1420
1421 Returns:
1422 google.cloud.bigquery.routine.Routine:
1423 The routine resource returned from the API call.
1424 """
1425 partial = routine._build_resource(fields)
1426 if routine.etag:
1427 headers: Optional[Dict[str, str]] = {"If-Match": routine.etag}
1428 else:
1429 headers = None
1430
1431 # TODO: remove when routines update supports partial requests.
1432 partial["routineReference"] = routine.reference.to_api_repr()
1433
1434 path = routine.path
1435 span_attributes = {"path": path, "fields": fields}
1436
1437 api_response = self._call_api(
1438 retry,
1439 span_name="BigQuery.updateRoutine",
1440 span_attributes=span_attributes,
1441 method="PUT",
1442 path=path,
1443 data=partial,
1444 headers=headers,
1445 timeout=timeout,
1446 )
1447 return Routine.from_api_repr(api_response)
1448
1449 def update_table(
1450 self,
1451 table: Table,
1452 fields: Sequence[str],
1453 autodetect_schema: bool = False,
1454 retry: retries.Retry = DEFAULT_RETRY,
1455 timeout: TimeoutType = DEFAULT_TIMEOUT,
1456 ) -> Table:
1457 """Change some fields of a table.
1458
1459 Use ``fields`` to specify which fields to update. At least one field
1460 must be provided. If a field is listed in ``fields`` and is ``None``
1461 in ``table``, the field value will be deleted.
1462
1463 For example, to update the descriptive properties of the table,
1464 specify them in the ``fields`` argument:
1465
1466 .. code-block:: python
1467
1468 bigquery_client.update_table(
1469 table,
1470 ["description", "friendly_name"]
1471 )
1472
1473 If ``table.etag`` is not ``None``, the update will only succeed if
1474 the table on the server has the same ETag. Thus reading a table with
1475 ``get_table``, changing its fields, and then passing it to
1476 ``update_table`` will ensure that the changes will only be saved if
1477 no modifications to the table occurred since the read.
1478
1479 Args:
1480 table (google.cloud.bigquery.table.Table): The table to update.
1481 fields (Sequence[str]):
1482 The fields of ``table`` to change, spelled as the
1483 :class:`~google.cloud.bigquery.table.Table` properties.
1484 autodetect_schema (bool):
1485 Specifies if the schema of the table should be autodetected when
1486 updating the table from the underlying source. Only applicable
1487 for external tables.
1488 retry (Optional[google.api_core.retry.Retry]):
1489 A description of how to retry the API call.
1490 timeout (Optional[float]):
1491 The number of seconds to wait for the underlying HTTP transport
1492 before using ``retry``.
1493
1494 Returns:
1495 google.cloud.bigquery.table.Table:
1496 The table resource returned from the API call.
1497 """
1498 partial = table._build_resource(fields)
1499 if table.etag is not None:
1500 headers: Optional[Dict[str, str]] = {"If-Match": table.etag}
1501 else:
1502 headers = None
1503
1504 path = table.path
1505 span_attributes = {"path": path, "fields": fields}
1506
1507 if autodetect_schema:
1508 query_params = {"autodetect_schema": True}
1509 else:
1510 query_params = {}
1511
1512 api_response = self._call_api(
1513 retry,
1514 span_name="BigQuery.updateTable",
1515 span_attributes=span_attributes,
1516 method="PATCH",
1517 path=path,
1518 query_params=query_params,
1519 data=partial,
1520 headers=headers,
1521 timeout=timeout,
1522 )
1523 return Table.from_api_repr(api_response)
1524
1525 def list_models(
1526 self,
1527 dataset: Union[Dataset, DatasetReference, DatasetListItem, str],
1528 max_results: Optional[int] = None,
1529 page_token: Optional[str] = None,
1530 retry: retries.Retry = DEFAULT_RETRY,
1531 timeout: TimeoutType = DEFAULT_TIMEOUT,
1532 page_size: Optional[int] = None,
1533 ) -> page_iterator.Iterator:
1534 """[Beta] List models in the dataset.
1535
1536 See
1537 https://cloud.google.com/bigquery/docs/reference/rest/v2/models/list
1538
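        Example:

            A minimal iteration sketch (the dataset ID is a placeholder):

            .. code-block:: python

                from google.cloud import bigquery

                client = bigquery.Client()
                for model in client.list_models("my_project.my_dataset"):
                    print(model.model_id)
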
1539 Args:
1540 dataset (Union[ \
1541 google.cloud.bigquery.dataset.Dataset, \
1542 google.cloud.bigquery.dataset.DatasetReference, \
1543 google.cloud.bigquery.dataset.DatasetListItem, \
1544 str, \
1545 ]):
1546 A reference to the dataset whose models to list from the
1547 BigQuery API. If a string is passed in, this method attempts
1548 to create a dataset reference from a string using
1549 :func:`google.cloud.bigquery.dataset.DatasetReference.from_string`.
1550 max_results (Optional[int]):
1551 Maximum number of models to return. Defaults to a
1552 value set by the API.
1553 page_token (Optional[str]):
1554 Token representing a cursor into the models. If not passed,
1555 the API will return the first page of models. The token marks
1556 the beginning of the iterator to be returned and the value of
1557 the ``page_token`` can be accessed at ``next_page_token`` of the
1558 :class:`~google.api_core.page_iterator.HTTPIterator`.
1559 retry (Optional[google.api_core.retry.Retry]):
1560 How to retry the RPC.
1561 timeout (Optional[float]):
1562 The number of seconds to wait for the underlying HTTP transport
1563 before using ``retry``.
1564 page_size (Optional[int]):
1565 Maximum number of models to return per page.
1566 Defaults to a value set by the API.
1567
1568 Returns:
1569 google.api_core.page_iterator.Iterator:
1570 Iterator of
1571 :class:`~google.cloud.bigquery.model.Model` contained
1572 within the requested dataset.
1573 """
1574 dataset = self._dataset_from_arg(dataset)
1575
1576 path = "%s/models" % dataset.path
1577 span_attributes = {"path": path}
1578
1579 def api_request(*args, **kwargs):
1580 return self._call_api(
1581 retry,
1582 span_name="BigQuery.listModels",
1583 span_attributes=span_attributes,
1584 *args,
1585 timeout=timeout,
1586 **kwargs,
1587 )
1588
1589 result = page_iterator.HTTPIterator(
1590 client=self,
1591 api_request=api_request,
1592 path=path,
1593 item_to_value=_item_to_model,
1594 items_key="models",
1595 page_token=page_token,
1596 max_results=max_results,
1597 page_size=page_size,
1598 )
1599 result.dataset = dataset # type: ignore
1600 return result
1601
1602 def list_routines(
1603 self,
1604 dataset: Union[Dataset, DatasetReference, DatasetListItem, str],
1605 max_results: Optional[int] = None,
1606 page_token: Optional[str] = None,
1607 retry: retries.Retry = DEFAULT_RETRY,
1608 timeout: TimeoutType = DEFAULT_TIMEOUT,
1609 page_size: Optional[int] = None,
1610 ) -> page_iterator.Iterator:
1611 """[Beta] List routines in the dataset.
1612
1613 See
1614 https://cloud.google.com/bigquery/docs/reference/rest/v2/routines/list
1615
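        Example:

            A minimal iteration sketch (the dataset ID is a placeholder):

            .. code-block:: python

                from google.cloud import bigquery

                client = bigquery.Client()
                for routine in client.list_routines("my_project.my_dataset"):
                    print(routine.reference.routine_id)
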
1616 Args:
1617 dataset (Union[ \
1618 google.cloud.bigquery.dataset.Dataset, \
1619 google.cloud.bigquery.dataset.DatasetReference, \
1620 google.cloud.bigquery.dataset.DatasetListItem, \
1621 str, \
1622 ]):
1623 A reference to the dataset whose routines to list from the
1624 BigQuery API. If a string is passed in, this method attempts
1625 to create a dataset reference from a string using
1626 :func:`google.cloud.bigquery.dataset.DatasetReference.from_string`.
1627 max_results (Optional[int]):
1628 Maximum number of routines to return. Defaults
1629 to a value set by the API.
1630 page_token (Optional[str]):
1631 Token representing a cursor into the routines. If not passed,
1632 the API will return the first page of routines. The token marks
1633 the beginning of the iterator to be returned and the value of the
1634 ``page_token`` can be accessed at ``next_page_token`` of the
1635 :class:`~google.api_core.page_iterator.HTTPIterator`.
1636 retry (Optional[google.api_core.retry.Retry]):
1637 How to retry the RPC.
1638 timeout (Optional[float]):
1639 The number of seconds to wait for the underlying HTTP transport
1640 before using ``retry``.
1641 page_size (Optional[int]):
1642 Maximum number of routines to return per page.
1643 Defaults to a value set by the API.
1644
1645 Returns:
1646 google.api_core.page_iterator.Iterator:
1647 Iterator of all
1648 :class:`~google.cloud.bigquery.routine.Routine`s contained
1649 within the requested dataset, limited by ``max_results``.
1650 """
1651 dataset = self._dataset_from_arg(dataset)
1652 path = "{}/routines".format(dataset.path)
1653
1654 span_attributes = {"path": path}
1655
1656 def api_request(*args, **kwargs):
1657 return self._call_api(
1658 retry,
1659 span_name="BigQuery.listRoutines",
1660 span_attributes=span_attributes,
1661 *args,
1662 timeout=timeout,
1663 **kwargs,
1664 )
1665
1666 result = page_iterator.HTTPIterator(
1667 client=self,
1668 api_request=api_request,
1669 path=path,
1670 item_to_value=_item_to_routine,
1671 items_key="routines",
1672 page_token=page_token,
1673 max_results=max_results,
1674 page_size=page_size,
1675 )
1676 result.dataset = dataset # type: ignore
1677 return result
1678
1679 def list_tables(
1680 self,
1681 dataset: Union[Dataset, DatasetReference, DatasetListItem, str],
1682 max_results: Optional[int] = None,
1683 page_token: Optional[str] = None,
1684 retry: retries.Retry = DEFAULT_RETRY,
1685 timeout: TimeoutType = DEFAULT_TIMEOUT,
1686 page_size: Optional[int] = None,
1687 ) -> page_iterator.Iterator:
1688 """List tables in the dataset.
1689
1690 See
1691 https://cloud.google.com/bigquery/docs/reference/rest/v2/tables/list
1692
1693 Args:
1694 dataset (Union[ \
1695 google.cloud.bigquery.dataset.Dataset, \
1696 google.cloud.bigquery.dataset.DatasetReference, \
1697 google.cloud.bigquery.dataset.DatasetListItem, \
1698 str, \
1699 ]):
1700 A reference to the dataset whose tables to list from the
1701 BigQuery API. If a string is passed in, this method attempts
1702 to create a dataset reference from a string using
1703 :func:`google.cloud.bigquery.dataset.DatasetReference.from_string`.
1704 max_results (Optional[int]):
1705 Maximum number of tables to return. Defaults
1706 to a value set by the API.
1707 page_token (Optional[str]):
1708 Token representing a cursor into the tables. If not passed,
1709 the API will return the first page of tables. The token marks
1710 the beginning of the iterator to be returned and the value of
1711 the ``page_token`` can be accessed at ``next_page_token`` of the
1712 :class:`~google.api_core.page_iterator.HTTPIterator`.
1713 retry (Optional[google.api_core.retry.Retry]):
1714 How to retry the RPC.
1715 timeout (Optional[float]):
1716 The number of seconds to wait for the underlying HTTP transport
1717 before using ``retry``.
1718 page_size (Optional[int]):
1719 Maximum number of tables to return per page.
1720 Defaults to a value set by the API.
1721
1722 Returns:
1723 google.api_core.page_iterator.Iterator:
1724 Iterator of
1725 :class:`~google.cloud.bigquery.table.TableListItem` contained
1726 within the requested dataset.
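
        Example:

            An illustrative sketch; ``my_dataset`` is a placeholder for an
            existing dataset ID::

                from google.cloud import bigquery

                client = bigquery.Client()

                # Print the ID of every table in the dataset.
                for table in client.list_tables("my_dataset"):
                    print(table.table_id)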
1727 """
1728 dataset = self._dataset_from_arg(dataset)
1729 path = "%s/tables" % dataset.path
1730 span_attributes = {"path": path}
1731
1732 def api_request(*args, **kwargs):
1733 return self._call_api(
1734 retry,
1735 span_name="BigQuery.listTables",
1736 span_attributes=span_attributes,
1737 *args,
1738 timeout=timeout,
1739 **kwargs,
1740 )
1741
1742 result = page_iterator.HTTPIterator(
1743 client=self,
1744 api_request=api_request,
1745 path=path,
1746 item_to_value=_item_to_table,
1747 items_key="tables",
1748 page_token=page_token,
1749 max_results=max_results,
1750 page_size=page_size,
1751 )
1752 result.dataset = dataset # type: ignore
1753 return result
1754
1755 def delete_dataset(
1756 self,
1757 dataset: Union[Dataset, DatasetReference, DatasetListItem, str],
1758 delete_contents: bool = False,
1759 retry: retries.Retry = DEFAULT_RETRY,
1760 timeout: TimeoutType = DEFAULT_TIMEOUT,
1761 not_found_ok: bool = False,
1762 ) -> None:
1763 """Delete a dataset.
1764
1765 See
1766 https://cloud.google.com/bigquery/docs/reference/rest/v2/datasets/delete
1767
1768 Args:
1769 dataset (Union[ \
1770 google.cloud.bigquery.dataset.Dataset, \
1771 google.cloud.bigquery.dataset.DatasetReference, \
1772 google.cloud.bigquery.dataset.DatasetListItem, \
1773 str, \
1774 ]):
1775 A reference to the dataset to delete. If a string is passed
1776 in, this method attempts to create a dataset reference from a
1777 string using
1778 :func:`google.cloud.bigquery.dataset.DatasetReference.from_string`.
1779 delete_contents (Optional[bool]):
1780 If True, delete all the tables in the dataset. If False and
1781 the dataset contains tables, the request will fail.
1782 Default is False.
1783 retry (Optional[google.api_core.retry.Retry]):
1784 How to retry the RPC.
1785 timeout (Optional[float]):
1786 The number of seconds to wait for the underlying HTTP transport
1787 before using ``retry``.
1788 not_found_ok (Optional[bool]):
1789 Defaults to ``False``. If ``True``, ignore "not found" errors
1790 when deleting the dataset.
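
        Example:

            An illustrative sketch; ``my_dataset`` is a placeholder dataset ID::

                from google.cloud import bigquery

                client = bigquery.Client()

                # Delete the dataset and any tables it contains, ignoring
                # the error raised if it does not exist.
                client.delete_dataset(
                    "my_dataset", delete_contents=True, not_found_ok=True
                )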
1791 """
1792 dataset = self._dataset_from_arg(dataset)
1793 params = {}
1794 path = dataset.path
1795 if delete_contents:
1796 params["deleteContents"] = "true"
1797 span_attributes = {"path": path, "deleteContents": delete_contents}
1798 else:
1799 span_attributes = {"path": path}
1800
1801 try:
1802 self._call_api(
1803 retry,
1804 span_name="BigQuery.deleteDataset",
1805 span_attributes=span_attributes,
1806 method="DELETE",
1807 path=path,
1808 query_params=params,
1809 timeout=timeout,
1810 )
1811 except core_exceptions.NotFound:
1812 if not not_found_ok:
1813 raise
1814
1815 def delete_model(
1816 self,
1817 model: Union[Model, ModelReference, str],
1818 retry: retries.Retry = DEFAULT_RETRY,
1819 timeout: TimeoutType = DEFAULT_TIMEOUT,
1820 not_found_ok: bool = False,
1821 ) -> None:
        """[Beta] Delete a model.
1823
1824 See
1825 https://cloud.google.com/bigquery/docs/reference/rest/v2/models/delete
1826
1827 Args:
1828 model (Union[ \
1829 google.cloud.bigquery.model.Model, \
1830 google.cloud.bigquery.model.ModelReference, \
1831 str, \
1832 ]):
1833 A reference to the model to delete. If a string is passed in,
1834 this method attempts to create a model reference from a
1835 string using
1836 :func:`google.cloud.bigquery.model.ModelReference.from_string`.
1837 retry (Optional[google.api_core.retry.Retry]):
1838 How to retry the RPC.
1839 timeout (Optional[float]):
1840 The number of seconds to wait for the underlying HTTP transport
1841 before using ``retry``.
1842 not_found_ok (Optional[bool]):
1843 Defaults to ``False``. If ``True``, ignore "not found" errors
1844 when deleting the model.
1845 """
1846 if isinstance(model, str):
1847 model = ModelReference.from_string(model, default_project=self.project)
1848
1849 if not isinstance(model, (Model, ModelReference)):
1850 raise TypeError("model must be a Model or a ModelReference")
1851
1852 path = model.path
1853 try:
1854 span_attributes = {"path": path}
1855 self._call_api(
1856 retry,
1857 span_name="BigQuery.deleteModel",
1858 span_attributes=span_attributes,
1859 method="DELETE",
1860 path=path,
1861 timeout=timeout,
1862 )
1863 except core_exceptions.NotFound:
1864 if not not_found_ok:
1865 raise
1866
1867 def delete_job_metadata(
1868 self,
1869 job_id: Union[str, LoadJob, CopyJob, ExtractJob, QueryJob],
1870 project: Optional[str] = None,
1871 location: Optional[str] = None,
1872 retry: retries.Retry = DEFAULT_RETRY,
1873 timeout: TimeoutType = DEFAULT_TIMEOUT,
1874 not_found_ok: bool = False,
1875 ):
1876 """[Beta] Delete job metadata from job history.
1877
1878 Note: This does not stop a running job. Use
1879 :func:`~google.cloud.bigquery.client.Client.cancel_job` instead.
1880
1881 Args:
1882 job_id (Union[ \
1883 str, \
1884 LoadJob, \
1885 CopyJob, \
1886 ExtractJob, \
1887 QueryJob \
1888 ]): Job or job identifier.
1889 project (Optional[str]):
1890 ID of the project which owns the job (defaults to the client's project).
1891 location (Optional[str]):
1892 Location where the job was run. Ignored if ``job_id`` is a job
1893 object.
1894 retry (Optional[google.api_core.retry.Retry]):
1895 How to retry the RPC.
1896 timeout (Optional[float]):
1897 The number of seconds to wait for the underlying HTTP transport
1898 before using ``retry``.
1899 not_found_ok (Optional[bool]):
1900 Defaults to ``False``. If ``True``, ignore "not found" errors
1901 when deleting the job.
1902 """
1903 extra_params = {}
1904
1905 project, location, job_id = _extract_job_reference(
1906 job_id, project=project, location=location
1907 )
1908
1909 if project is None:
1910 project = self.project
1911
1912 if location is None:
1913 location = self.location
1914
1915 # Location is always required for jobs.delete()
1916 extra_params["location"] = location
1917
1918 path = f"/projects/{project}/jobs/{job_id}/delete"
1919
1920 span_attributes = {"path": path, "job_id": job_id, "location": location}
1921
1922 try:
1923 self._call_api(
1924 retry,
1925 span_name="BigQuery.deleteJob",
1926 span_attributes=span_attributes,
1927 method="DELETE",
1928 path=path,
1929 query_params=extra_params,
1930 timeout=timeout,
1931 )
1932 except google.api_core.exceptions.NotFound:
1933 if not not_found_ok:
1934 raise
1935
1936 def delete_routine(
1937 self,
1938 routine: Union[Routine, RoutineReference, str],
1939 retry: retries.Retry = DEFAULT_RETRY,
1940 timeout: TimeoutType = DEFAULT_TIMEOUT,
1941 not_found_ok: bool = False,
1942 ) -> None:
1943 """[Beta] Delete a routine.
1944
1945 See
1946 https://cloud.google.com/bigquery/docs/reference/rest/v2/routines/delete
1947
1948 Args:
1949 routine (Union[ \
1950 google.cloud.bigquery.routine.Routine, \
1951 google.cloud.bigquery.routine.RoutineReference, \
1952 str, \
1953 ]):
1954 A reference to the routine to delete. If a string is passed
1955 in, this method attempts to create a routine reference from a
1956 string using
1957 :func:`google.cloud.bigquery.routine.RoutineReference.from_string`.
1958 retry (Optional[google.api_core.retry.Retry]):
1959 How to retry the RPC.
1960 timeout (Optional[float]):
1961 The number of seconds to wait for the underlying HTTP transport
1962 before using ``retry``.
1963 not_found_ok (Optional[bool]):
1964 Defaults to ``False``. If ``True``, ignore "not found" errors
1965 when deleting the routine.
1966 """
1967 if isinstance(routine, str):
1968 routine = RoutineReference.from_string(
1969 routine, default_project=self.project
1970 )
        if not isinstance(routine, (Routine, RoutineReference)):
            raise TypeError("routine must be a Routine or a RoutineReference")

        path = routine.path
1975
1976 try:
1977 span_attributes = {"path": path}
1978 self._call_api(
1979 retry,
1980 span_name="BigQuery.deleteRoutine",
1981 span_attributes=span_attributes,
1982 method="DELETE",
1983 path=path,
1984 timeout=timeout,
1985 )
1986 except core_exceptions.NotFound:
1987 if not not_found_ok:
1988 raise
1989
1990 def delete_table(
1991 self,
1992 table: Union[Table, TableReference, TableListItem, str],
1993 retry: retries.Retry = DEFAULT_RETRY,
1994 timeout: TimeoutType = DEFAULT_TIMEOUT,
1995 not_found_ok: bool = False,
1996 ) -> None:
        """Delete a table.
1998
1999 See
2000 https://cloud.google.com/bigquery/docs/reference/rest/v2/tables/delete
2001
2002 Args:
2003 table (Union[ \
2004 google.cloud.bigquery.table.Table, \
2005 google.cloud.bigquery.table.TableReference, \
2006 google.cloud.bigquery.table.TableListItem, \
2007 str, \
2008 ]):
2009 A reference to the table to delete. If a string is passed in,
2010 this method attempts to create a table reference from a
2011 string using
2012 :func:`google.cloud.bigquery.table.TableReference.from_string`.
2013 retry (Optional[google.api_core.retry.Retry]):
2014 How to retry the RPC.
2015 timeout (Optional[float]):
2016 The number of seconds to wait for the underlying HTTP transport
2017 before using ``retry``.
2018 not_found_ok (Optional[bool]):
2019 Defaults to ``False``. If ``True``, ignore "not found" errors
2020 when deleting the table.
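
        Example:

            An illustrative sketch; the table ID is a placeholder::

                from google.cloud import bigquery

                client = bigquery.Client()
                client.delete_table("my_dataset.my_table", not_found_ok=True)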
2021 """
2022 table = _table_arg_to_table_ref(table, default_project=self.project)
2023 if not isinstance(table, TableReference):
2024 raise TypeError("Unable to get TableReference for table '{}'".format(table))
2025
2026 try:
2027 path = table.path
2028 span_attributes = {"path": path}
2029 self._call_api(
2030 retry,
2031 span_name="BigQuery.deleteTable",
2032 span_attributes=span_attributes,
2033 method="DELETE",
2034 path=path,
2035 timeout=timeout,
2036 )
2037 except core_exceptions.NotFound:
2038 if not not_found_ok:
2039 raise
2040
2041 def _get_query_results(
2042 self,
2043 job_id: str,
2044 retry: retries.Retry,
2045 project: Optional[str] = None,
2046 timeout_ms: Optional[int] = None,
2047 location: Optional[str] = None,
2048 timeout: TimeoutType = DEFAULT_TIMEOUT,
2049 page_size: int = 0,
2050 start_index: Optional[int] = None,
2051 ) -> _QueryResults:
2052 """Get the query results object for a query job.
2053
2054 Args:
2055 job_id (str): Name of the query job.
2056 retry (google.api_core.retry.Retry):
2057 How to retry the RPC.
2058 project (Optional[str]):
2059 Project ID for the query job (defaults to the project of the client).
2060 timeout_ms (Optional[int]):
                Number of milliseconds the API call should wait for the query
2062 to complete before the request times out.
2063 location (Optional[str]): Location of the query job.
2064 timeout (Optional[float]):
2065 The number of seconds to wait for the underlying HTTP transport
2066 before using ``retry``. If set, this connection timeout may be
2067 increased to a minimum value. This prevents retries on what
2068 would otherwise be a successful response.
2069 page_size (Optional[int]):
2070 Maximum number of rows in a single response. See maxResults in
2071 the jobs.getQueryResults REST API.
2072 start_index (Optional[int]):
2073 Zero-based index of the starting row. See startIndex in the
2074 jobs.getQueryResults REST API.
2075
2076 Returns:
2077 google.cloud.bigquery.query._QueryResults:
2078 A new ``_QueryResults`` instance.
2079 """
2080
2081 extra_params: Dict[str, Any] = {"maxResults": page_size}
2082
2083 if timeout is not None:
2084 if not isinstance(timeout, (int, float)):
2085 timeout = _MIN_GET_QUERY_RESULTS_TIMEOUT
2086 else:
2087 timeout = max(timeout, _MIN_GET_QUERY_RESULTS_TIMEOUT)
2088
2089 if page_size > 0:
2090 extra_params["formatOptions.useInt64Timestamp"] = True
2091
2092 if project is None:
2093 project = self.project
2094
2095 if timeout_ms is not None:
2096 extra_params["timeoutMs"] = timeout_ms
2097
2098 if location is None:
2099 location = self.location
2100
2101 if location is not None:
2102 extra_params["location"] = location
2103
2104 if start_index is not None:
2105 extra_params["startIndex"] = start_index
2106
2107 path = "/projects/{}/queries/{}".format(project, job_id)
2108
2109 # This call is typically made in a polling loop that checks whether the
2110 # job is complete (from QueryJob.done(), called ultimately from
2111 # QueryJob.result()). So we don't need to poll here.
2112 span_attributes = {"path": path}
2113 resource = self._call_api(
2114 retry,
2115 span_name="BigQuery.getQueryResults",
2116 span_attributes=span_attributes,
2117 method="GET",
2118 path=path,
2119 query_params=extra_params,
2120 timeout=timeout,
2121 )
2122 return _QueryResults.from_api_repr(resource)
2123
2124 def job_from_resource(
2125 self, resource: dict
2126 ) -> Union[job.CopyJob, job.ExtractJob, job.LoadJob, job.QueryJob, job.UnknownJob]:
2127 """Detect correct job type from resource and instantiate.
2128
2129 Args:
2130 resource (Dict): one job resource from API response
2131
2132 Returns:
2133 Union[job.CopyJob, job.ExtractJob, job.LoadJob, job.QueryJob, job.UnknownJob]:
2134 The job instance, constructed via the resource.
2135 """
2136 config = resource.get("configuration", {})
2137 if "load" in config:
2138 return job.LoadJob.from_api_repr(resource, self)
2139 elif "copy" in config:
2140 return job.CopyJob.from_api_repr(resource, self)
2141 elif "extract" in config:
2142 return job.ExtractJob.from_api_repr(resource, self)
2143 elif "query" in config:
2144 return job.QueryJob.from_api_repr(resource, self)
2145 return job.UnknownJob.from_api_repr(resource, self)
2146
2147 def create_job(
2148 self,
2149 job_config: dict,
2150 retry: retries.Retry = DEFAULT_RETRY,
2151 timeout: TimeoutType = DEFAULT_TIMEOUT,
2152 ) -> Union[job.LoadJob, job.CopyJob, job.ExtractJob, job.QueryJob]:
2153 """Create a new job.
2154
2155 Args:
            job_config (dict): Job configuration in the dict representation returned by the API.
2157 retry (Optional[google.api_core.retry.Retry]): How to retry the RPC.
2158 timeout (Optional[float]):
2159 The number of seconds to wait for the underlying HTTP transport
2160 before using ``retry``.
2161
2162 Returns:
2163 Union[ \
2164 google.cloud.bigquery.job.LoadJob, \
2165 google.cloud.bigquery.job.CopyJob, \
2166 google.cloud.bigquery.job.ExtractJob, \
2167 google.cloud.bigquery.job.QueryJob \
2168 ]:
2169 A new job instance.
2170 """
2171
2172 if "load" in job_config:
2173 load_job_config = google.cloud.bigquery.job.LoadJobConfig.from_api_repr(
2174 job_config
2175 )
2176 destination = _get_sub_prop(job_config, ["load", "destinationTable"])
2177 source_uris = _get_sub_prop(job_config, ["load", "sourceUris"])
2178 destination = TableReference.from_api_repr(destination)
2179 return self.load_table_from_uri(
2180 source_uris,
2181 destination,
2182 job_config=typing.cast(LoadJobConfig, load_job_config),
2183 retry=retry,
2184 timeout=timeout,
2185 )
2186 elif "copy" in job_config:
2187 copy_job_config = google.cloud.bigquery.job.CopyJobConfig.from_api_repr(
2188 job_config
2189 )
2190 destination = _get_sub_prop(job_config, ["copy", "destinationTable"])
2191 destination = TableReference.from_api_repr(destination)
2192 return self.copy_table(
2193 [], # Source table(s) already in job_config resource.
2194 destination,
2195 job_config=typing.cast(CopyJobConfig, copy_job_config),
2196 retry=retry,
2197 timeout=timeout,
2198 )
2199 elif "extract" in job_config:
2200 extract_job_config = (
2201 google.cloud.bigquery.job.ExtractJobConfig.from_api_repr(job_config)
2202 )
2203 source = _get_sub_prop(job_config, ["extract", "sourceTable"])
2204 if source:
2205 source_type = "Table"
2206 source = TableReference.from_api_repr(source)
2207 else:
2208 source = _get_sub_prop(job_config, ["extract", "sourceModel"])
2209 source_type = "Model"
2210 source = ModelReference.from_api_repr(source)
2211 destination_uris = _get_sub_prop(job_config, ["extract", "destinationUris"])
2212 return self.extract_table(
2213 source,
2214 destination_uris,
2215 job_config=typing.cast(ExtractJobConfig, extract_job_config),
2216 retry=retry,
2217 timeout=timeout,
2218 source_type=source_type,
2219 )
2220 elif "query" in job_config:
2221 query_job_config = google.cloud.bigquery.job.QueryJobConfig.from_api_repr(
2222 job_config
2223 )
2224 query = _get_sub_prop(job_config, ["query", "query"])
2225 return self.query(
2226 query,
2227 job_config=typing.cast(QueryJobConfig, query_job_config),
2228 retry=retry,
2229 timeout=timeout,
2230 )
2231 else:
2232 raise TypeError("Invalid job configuration received.")
2233
2234 def get_job(
2235 self,
2236 job_id: Union[str, job.LoadJob, job.CopyJob, job.ExtractJob, job.QueryJob],
2237 project: Optional[str] = None,
2238 location: Optional[str] = None,
2239 retry: retries.Retry = DEFAULT_RETRY,
2240 timeout: TimeoutType = DEFAULT_GET_JOB_TIMEOUT,
2241 ) -> Union[job.LoadJob, job.CopyJob, job.ExtractJob, job.QueryJob, job.UnknownJob]:
2242 """Fetch a job for the project associated with this client.
2243
2244 See
2245 https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/get
2246
2247 Args:
2248 job_id (Union[ \
2249 str, \
2250 job.LoadJob, \
2251 job.CopyJob, \
2252 job.ExtractJob, \
2253 job.QueryJob \
2254 ]):
2255 Job identifier.
2256 project (Optional[str]):
2257 ID of the project which owns the job (defaults to the client's project).
2258 location (Optional[str]):
2259 Location where the job was run. Ignored if ``job_id`` is a job
2260 object.
2261 retry (Optional[google.api_core.retry.Retry]):
2262 How to retry the RPC.
2263 timeout (Optional[float]):
2264 The number of seconds to wait for the underlying HTTP transport
2265 before using ``retry``.
2266
2267 Returns:
2268 Union[job.LoadJob, job.CopyJob, job.ExtractJob, job.QueryJob, job.UnknownJob]:
2269 Job instance, based on the resource returned by the API.
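
        Example:

            An illustrative sketch; the job ID and location are placeholders::

                from google.cloud import bigquery

                client = bigquery.Client()

                job = client.get_job("existing_job_id", location="US")
                print(job.job_type, job.state)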
2270 """
2271 extra_params = {"projection": "full"}
2272
2273 project, location, job_id = _extract_job_reference(
2274 job_id, project=project, location=location
2275 )
2276
2277 if project is None:
2278 project = self.project
2279
2280 if location is None:
2281 location = self.location
2282
2283 if location is not None:
2284 extra_params["location"] = location
2285
2286 path = "/projects/{}/jobs/{}".format(project, job_id)
2287
2288 span_attributes = {"path": path, "job_id": job_id, "location": location}
2289
2290 resource = self._call_api(
2291 retry,
2292 span_name="BigQuery.getJob",
2293 span_attributes=span_attributes,
2294 method="GET",
2295 path=path,
2296 query_params=extra_params,
2297 timeout=timeout,
2298 )
2299
2300 return self.job_from_resource(resource)
2301
2302 def cancel_job(
2303 self,
2304 job_id: str,
2305 project: Optional[str] = None,
2306 location: Optional[str] = None,
2307 retry: retries.Retry = DEFAULT_RETRY,
2308 timeout: TimeoutType = DEFAULT_TIMEOUT,
2309 ) -> Union[job.LoadJob, job.CopyJob, job.ExtractJob, job.QueryJob]:
2310 """Attempt to cancel a job from a job ID.
2311
2312 See
2313 https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/cancel
2314
2315 Args:
2316 job_id (Union[ \
2317 str, \
2318 google.cloud.bigquery.job.LoadJob, \
2319 google.cloud.bigquery.job.CopyJob, \
2320 google.cloud.bigquery.job.ExtractJob, \
2321 google.cloud.bigquery.job.QueryJob \
2322 ]): Job identifier.
2323 project (Optional[str]):
2324 ID of the project which owns the job (defaults to the client's project).
2325 location (Optional[str]):
2326 Location where the job was run. Ignored if ``job_id`` is a job
2327 object.
2328 retry (Optional[google.api_core.retry.Retry]):
2329 How to retry the RPC.
2330 timeout (Optional[float]):
2331 The number of seconds to wait for the underlying HTTP transport
2332 before using ``retry``.
2333
2334 Returns:
2335 Union[ \
2336 google.cloud.bigquery.job.LoadJob, \
2337 google.cloud.bigquery.job.CopyJob, \
2338 google.cloud.bigquery.job.ExtractJob, \
2339 google.cloud.bigquery.job.QueryJob, \
2340 ]:
2341 Job instance, based on the resource returned by the API.
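
        Example:

            An illustrative sketch; the job ID and location are placeholders::

                from google.cloud import bigquery

                client = bigquery.Client()

                job = client.cancel_job("running_job_id", location="US")
                print(job.state)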
2342 """
2343 extra_params = {"projection": "full"}
2344
2345 project, location, job_id = _extract_job_reference(
2346 job_id, project=project, location=location
2347 )
2348
2349 if project is None:
2350 project = self.project
2351
2352 if location is None:
2353 location = self.location
2354
2355 if location is not None:
2356 extra_params["location"] = location
2357
2358 path = "/projects/{}/jobs/{}/cancel".format(project, job_id)
2359
2360 span_attributes = {"path": path, "job_id": job_id, "location": location}
2361
2362 resource = self._call_api(
2363 retry,
2364 span_name="BigQuery.cancelJob",
2365 span_attributes=span_attributes,
2366 method="POST",
2367 path=path,
2368 query_params=extra_params,
2369 timeout=timeout,
2370 )
2371
2372 job_instance = self.job_from_resource(resource["job"]) # never an UnknownJob
2373
2374 return typing.cast(
2375 Union[job.LoadJob, job.CopyJob, job.ExtractJob, job.QueryJob],
2376 job_instance,
2377 )
2378
2379 def list_jobs(
2380 self,
2381 project: Optional[str] = None,
2382 parent_job: Optional[Union[QueryJob, str]] = None,
2383 max_results: Optional[int] = None,
2384 page_token: Optional[str] = None,
2385 all_users: Optional[bool] = None,
2386 state_filter: Optional[str] = None,
2387 retry: retries.Retry = DEFAULT_RETRY,
2388 timeout: TimeoutType = DEFAULT_TIMEOUT,
2389 min_creation_time: Optional[datetime.datetime] = None,
2390 max_creation_time: Optional[datetime.datetime] = None,
2391 page_size: Optional[int] = None,
2392 ) -> page_iterator.Iterator:
2393 """List jobs for the project associated with this client.
2394
2395 See
2396 https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/list
2397
2398 Args:
2399 project (Optional[str]):
                Project ID to use for retrieving jobs. Defaults
2401 to the client's project.
2402 parent_job (Optional[Union[ \
2403 google.cloud.bigquery.job._AsyncJob, \
2404 str, \
2405 ]]):
2406 If set, retrieve only child jobs of the specified parent.
2407 max_results (Optional[int]):
2408 Maximum number of jobs to return.
2409 page_token (Optional[str]):
2410 Opaque marker for the next "page" of jobs. If not
2411 passed, the API will return the first page of jobs. The token
2412 marks the beginning of the iterator to be returned and the
2413 value of the ``page_token`` can be accessed at
2414 ``next_page_token`` of
2415 :class:`~google.api_core.page_iterator.HTTPIterator`.
2416 all_users (Optional[bool]):
2417 If true, include jobs owned by all users in the project.
2418 Defaults to :data:`False`.
2419 state_filter (Optional[str]):
2420 If set, include only jobs matching the given state. One of:
2421 * ``"done"``
2422 * ``"pending"``
2423 * ``"running"``
2424 retry (Optional[google.api_core.retry.Retry]):
2425 How to retry the RPC.
2426 timeout (Optional[float]):
2427 The number of seconds to wait for the underlying HTTP transport
2428 before using ``retry``.
2429 min_creation_time (Optional[datetime.datetime]):
2430 Min value for job creation time. If set, only jobs created
2431 after or at this timestamp are returned. If the datetime has
                no time zone, UTC is assumed.
2433 max_creation_time (Optional[datetime.datetime]):
2434 Max value for job creation time. If set, only jobs created
2435 before or at this timestamp are returned. If the datetime has
                no time zone, UTC is assumed.
2437 page_size (Optional[int]):
2438 Maximum number of jobs to return per page.
2439
2440 Returns:
2441 google.api_core.page_iterator.Iterator:
2442 Iterable of job instances.
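
        Example:

            An illustrative sketch; the filters shown are optional::

                from google.cloud import bigquery

                client = bigquery.Client()

                # List up to ten jobs in the client's project that are
                # still running.
                for job in client.list_jobs(max_results=10, state_filter="running"):
                    print(job.job_id, job.state)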
2443 """
2444 if isinstance(parent_job, job._AsyncJob):
2445 parent_job = parent_job.job_id # pytype: disable=attribute-error
2446
2447 extra_params = {
2448 "allUsers": all_users,
2449 "stateFilter": state_filter,
2450 "minCreationTime": _str_or_none(
2451 google.cloud._helpers._millis_from_datetime(min_creation_time)
2452 ),
2453 "maxCreationTime": _str_or_none(
2454 google.cloud._helpers._millis_from_datetime(max_creation_time)
2455 ),
2456 "projection": "full",
2457 "parentJobId": parent_job,
2458 }
2459
2460 extra_params = {
2461 param: value for param, value in extra_params.items() if value is not None
2462 }
2463
2464 if project is None:
2465 project = self.project
2466
2467 path = "/projects/%s/jobs" % (project,)
2468
2469 span_attributes = {"path": path}
2470
2471 def api_request(*args, **kwargs):
2472 return self._call_api(
2473 retry,
2474 span_name="BigQuery.listJobs",
2475 span_attributes=span_attributes,
2476 *args,
2477 timeout=timeout,
2478 **kwargs,
2479 )
2480
2481 return page_iterator.HTTPIterator(
2482 client=self,
2483 api_request=api_request,
2484 path=path,
2485 item_to_value=_item_to_job,
2486 items_key="jobs",
2487 page_token=page_token,
2488 max_results=max_results,
2489 extra_params=extra_params,
2490 page_size=page_size,
2491 )
2492
2493 def load_table_from_uri(
2494 self,
2495 source_uris: Union[str, Sequence[str]],
2496 destination: Union[Table, TableReference, TableListItem, str],
2497 job_id: Optional[str] = None,
2498 job_id_prefix: Optional[str] = None,
2499 location: Optional[str] = None,
2500 project: Optional[str] = None,
2501 job_config: Optional[LoadJobConfig] = None,
2502 retry: retries.Retry = DEFAULT_RETRY,
2503 timeout: TimeoutType = DEFAULT_TIMEOUT,
2504 ) -> job.LoadJob:
2505 """Starts a job for loading data into a table from Cloud Storage.
2506
2507 See
2508 https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#jobconfigurationload
2509
2510 Args:
2511 source_uris (Union[str, Sequence[str]]):
2512 URIs of data files to be loaded; in format
2513 ``gs://<bucket_name>/<object_name_or_glob>``.
2514 destination (Union[ \
2515 google.cloud.bigquery.table.Table, \
2516 google.cloud.bigquery.table.TableReference, \
2517 google.cloud.bigquery.table.TableListItem, \
2518 str, \
2519 ]):
2520 Table into which data is to be loaded. If a string is passed
2521 in, this method attempts to create a table reference from a
2522 string using
2523 :func:`google.cloud.bigquery.table.TableReference.from_string`.
2524 job_id (Optional[str]): Name of the job.
2525 job_id_prefix (Optional[str]):
2526 The user-provided prefix for a randomly generated job ID.
2527 This parameter will be ignored if a ``job_id`` is also given.
2528 location (Optional[str]):
2529 Location where to run the job. Must match the location of the
2530 destination table.
2531 project (Optional[str]):
                Project ID of the project where the job should run. Defaults
2533 to the client's project.
2534 job_config (Optional[google.cloud.bigquery.job.LoadJobConfig]):
2535 Extra configuration options for the job.
2536 retry (Optional[google.api_core.retry.Retry]):
2537 How to retry the RPC.
2538 timeout (Optional[float]):
2539 The number of seconds to wait for the underlying HTTP transport
2540 before using ``retry``.
2541
2542 Returns:
2543 google.cloud.bigquery.job.LoadJob: A new load job.
2544
2545 Raises:
2546 TypeError:
2547 If ``job_config`` is not an instance of
2548 :class:`~google.cloud.bigquery.job.LoadJobConfig` class.
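
        Example:

            An illustrative sketch; the bucket, object, and table IDs are
            placeholders::

                from google.cloud import bigquery

                client = bigquery.Client()

                job_config = bigquery.LoadJobConfig(
                    source_format=bigquery.SourceFormat.CSV,
                    skip_leading_rows=1,
                )
                load_job = client.load_table_from_uri(
                    "gs://my-bucket/data.csv",
                    "my_dataset.my_table",
                    job_config=job_config,
                )
                load_job.result()  # Wait for the load to complete.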
2549 """
2550 job_id = _make_job_id(job_id, job_id_prefix)
2551
2552 if project is None:
2553 project = self.project
2554
2555 if location is None:
2556 location = self.location
2557
2558 job_ref = job._JobReference(job_id, project=project, location=location)
2559
2560 if isinstance(source_uris, str):
2561 source_uris = [source_uris]
2562
2563 destination = _table_arg_to_table_ref(destination, default_project=self.project)
2564
2565 if job_config is not None:
2566 _verify_job_config_type(job_config, LoadJobConfig)
2567 else:
2568 job_config = job.LoadJobConfig()
2569
2570 new_job_config = job_config._fill_from_default(self._default_load_job_config)
2571
2572 load_job = job.LoadJob(job_ref, source_uris, destination, self, new_job_config)
2573 load_job._begin(retry=retry, timeout=timeout)
2574
2575 return load_job
2576
2577 def load_table_from_file(
2578 self,
2579 file_obj: IO[bytes],
2580 destination: Union[Table, TableReference, TableListItem, str],
2581 rewind: bool = False,
2582 size: Optional[int] = None,
2583 num_retries: int = _DEFAULT_NUM_RETRIES,
2584 job_id: Optional[str] = None,
2585 job_id_prefix: Optional[str] = None,
2586 location: Optional[str] = None,
2587 project: Optional[str] = None,
2588 job_config: Optional[LoadJobConfig] = None,
2589 timeout: ResumableTimeoutType = DEFAULT_TIMEOUT,
2590 ) -> job.LoadJob:
        """Upload the contents of a table from a file-like object.
2592
2593 Similar to :meth:`load_table_from_uri`, this method creates, starts and
2594 returns a :class:`~google.cloud.bigquery.job.LoadJob`.
2595
2596 Args:
2597 file_obj (IO[bytes]):
2598 A file handle opened in binary mode for reading.
2599 destination (Union[Table, \
2600 TableReference, \
2601 TableListItem, \
2602 str \
2603 ]):
2604 Table into which data is to be loaded. If a string is passed
2605 in, this method attempts to create a table reference from a
2606 string using
2607 :func:`google.cloud.bigquery.table.TableReference.from_string`.
2608 rewind (Optional[bool]):
2609 If True, seek to the beginning of the file handle before
2610 reading the file. Defaults to False.
2611 size (Optional[int]):
                The number of bytes to read from the file handle. If size is
                ``None`` or at least ``_MAX_MULTIPART_SIZE``, a resumable upload
                is used. Otherwise, a multipart upload is used.
2615 num_retries (Optional[int]): Number of upload retries. Defaults to 6.
2616 job_id (Optional[str]): Name of the job.
2617 job_id_prefix (Optional[str]):
2618 The user-provided prefix for a randomly generated job ID.
2619 This parameter will be ignored if a ``job_id`` is also given.
2620 location (Optional[str]):
2621 Location where to run the job. Must match the location of the
2622 destination table.
2623 project (Optional[str]):
                Project ID of the project where the job should run. Defaults
2625 to the client's project.
2626 job_config (Optional[LoadJobConfig]):
2627 Extra configuration options for the job.
2628 timeout (Optional[float]):
2629 The number of seconds to wait for the underlying HTTP transport
2630 before using ``retry``. Depending on the retry strategy, a request
2631 may be repeated several times using the same timeout each time.
2632 Defaults to None.
2633
2634 Can also be passed as a tuple (connect_timeout, read_timeout).
2635 See :meth:`requests.Session.request` documentation for details.
2636
2637 Returns:
2638 google.cloud.bigquery.job.LoadJob: A new load job.
2639
2640 Raises:
2641 ValueError:
2642 If ``size`` is not passed in and can not be determined, or if
2643 the ``file_obj`` can be detected to be a file opened in text
2644 mode.
2645
2646 TypeError:
2647 If ``job_config`` is not an instance of
2648 :class:`~google.cloud.bigquery.job.LoadJobConfig` class.
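
        Example:

            An illustrative sketch; the file path and table ID are placeholders::

                from google.cloud import bigquery

                client = bigquery.Client()

                job_config = bigquery.LoadJobConfig(
                    source_format=bigquery.SourceFormat.CSV, autodetect=True
                )
                with open("data.csv", "rb") as source_file:
                    job = client.load_table_from_file(
                        source_file, "my_dataset.my_table", job_config=job_config
                    )
                job.result()  # Wait for the load to complete.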
2649 """
2650 job_id = _make_job_id(job_id, job_id_prefix)
2651
2652 if project is None:
2653 project = self.project
2654
2655 if location is None:
2656 location = self.location
2657
2658 destination = _table_arg_to_table_ref(destination, default_project=self.project)
2659 job_ref = job._JobReference(job_id, project=project, location=location)
2660
2661 if job_config is not None:
2662 _verify_job_config_type(job_config, LoadJobConfig)
2663 else:
2664 job_config = job.LoadJobConfig()
2665
2666 new_job_config = job_config._fill_from_default(self._default_load_job_config)
2667
2668 load_job = job.LoadJob(job_ref, None, destination, self, new_job_config)
2669 job_resource = load_job.to_api_repr()
2670
2671 if rewind:
2672 file_obj.seek(0, os.SEEK_SET)
2673
2674 _check_mode(file_obj)
2675
2676 try:
2677 if size is None or size >= _MAX_MULTIPART_SIZE:
2678 response = self._do_resumable_upload(
2679 file_obj, job_resource, num_retries, timeout, project=project
2680 )
2681 else:
2682 response = self._do_multipart_upload(
2683 file_obj, job_resource, size, num_retries, timeout, project=project
2684 )
2685 except resumable_media.InvalidResponse as exc:
2686 raise exceptions.from_http_response(exc.response)
2687
2688 return typing.cast(LoadJob, self.job_from_resource(response.json()))
2689
2690 def load_table_from_dataframe(
2691 self,
2692 dataframe: "pandas.DataFrame", # type: ignore
2693 destination: Union[Table, TableReference, str],
2694 num_retries: int = _DEFAULT_NUM_RETRIES,
2695 job_id: Optional[str] = None,
2696 job_id_prefix: Optional[str] = None,
2697 location: Optional[str] = None,
2698 project: Optional[str] = None,
2699 job_config: Optional[LoadJobConfig] = None,
2700 parquet_compression: str = "snappy",
2701 timeout: ResumableTimeoutType = DEFAULT_TIMEOUT,
2702 ) -> job.LoadJob:
2703 """Upload the contents of a table from a pandas DataFrame.
2704
2705 Similar to :meth:`load_table_from_uri`, this method creates, starts and
2706 returns a :class:`~google.cloud.bigquery.job.LoadJob`.
2707
2708 .. note::
2709
2710 REPEATED fields are NOT supported when using the CSV source format.
2711 They are supported when using the PARQUET source format, but
2712 due to the way they are encoded in the ``parquet`` file,
2713 a mismatch with the existing table schema can occur, so
            REPEATED fields are not properly supported with ``pyarrow<4.0.0``
            when using the parquet format.
2716
2717 https://github.com/googleapis/python-bigquery/issues/19
2718
2719 Args:
            dataframe (pandas.DataFrame):
2721 A :class:`~pandas.DataFrame` containing the data to load.
2722 destination (Union[ \
2723 Table, \
2724 TableReference, \
2725 str \
2726 ]):
2727 The destination table to use for loading the data. If it is an
2728 existing table, the schema of the :class:`~pandas.DataFrame`
2729 must match the schema of the destination table. If the table
2730 does not yet exist, the schema is inferred from the
2731 :class:`~pandas.DataFrame`.
2732
2733 If a string is passed in, this method attempts to create a
2734 table reference from a string using
2735 :func:`google.cloud.bigquery.table.TableReference.from_string`.
2736 num_retries (Optional[int]): Number of upload retries. Defaults to 6.
2737 job_id (Optional[str]): Name of the job.
2738 job_id_prefix (Optional[str]):
2739 The user-provided prefix for a randomly generated
2740 job ID. This parameter will be ignored if a ``job_id`` is
2741 also given.
2742 location (Optional[str]):
2743 Location where to run the job. Must match the location of the
2744 destination table.
2745 project (Optional[str]):
                Project ID of the project where the job should run. Defaults
2747 to the client's project.
2748 job_config (Optional[LoadJobConfig]):
2749 Extra configuration options for the job.
2750
2751 To override the default pandas data type conversions, supply
2752 a value for
2753 :attr:`~google.cloud.bigquery.job.LoadJobConfig.schema` with
2754 column names matching those of the dataframe. The BigQuery
2755 schema is used to determine the correct data type conversion.
2756 Indexes are not loaded.
2757
2758 By default, this method uses the parquet source format. To
2759 override this, supply a value for
2760 :attr:`~google.cloud.bigquery.job.LoadJobConfig.source_format`
2761 with the format name. Currently only
2762 :attr:`~google.cloud.bigquery.job.SourceFormat.CSV` and
2763 :attr:`~google.cloud.bigquery.job.SourceFormat.PARQUET` are
2764 supported.
2765 parquet_compression (Optional[str]):
                [Beta] The compression method to use when serializing
                ``dataframe`` to an intermediate parquet file.
2768 Defaults to "snappy".
2769
2770 The argument is directly passed as the ``compression``
2771 argument to the underlying ``pyarrow.parquet.write_table()``
2772 method (the default value "snappy" gets converted to uppercase).
2773 https://arrow.apache.org/docs/python/generated/pyarrow.parquet.write_table.html#pyarrow-parquet-write-table
2774
2775 If the job config schema is missing, the argument is directly
2776 passed as the ``compression`` argument to the underlying
2777 ``DataFrame.to_parquet()`` method.
2778 https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.to_parquet.html#pandas.DataFrame.to_parquet
            timeout (Optional[float]):
2780 The number of seconds to wait for the underlying HTTP transport
2781 before using ``retry``. Depending on the retry strategy, a request may
2782 be repeated several times using the same timeout each time.
2783 Defaults to None.
2784
2785 Can also be passed as a tuple (connect_timeout, read_timeout).
2786 See :meth:`requests.Session.request` documentation for details.
2787
2788 Returns:
2789 google.cloud.bigquery.job.LoadJob: A new load job.
2790
2791 Raises:
2792 ValueError:
2793 If a usable parquet engine cannot be found. This method
2794 requires :mod:`pyarrow` to be installed.
2795 TypeError:
2796 If ``job_config`` is not an instance of
2797 :class:`~google.cloud.bigquery.job.LoadJobConfig` class.
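
        Example:

            An illustrative sketch; it assumes :mod:`pandas` and :mod:`pyarrow`
            are installed, and the table ID is a placeholder::

                import pandas
                from google.cloud import bigquery

                client = bigquery.Client()

                dataframe = pandas.DataFrame(
                    {"name": ["Ada", "Grace"], "age": [36, 45]}
                )
                job = client.load_table_from_dataframe(
                    dataframe, "my_dataset.my_table"
                )
                job.result()  # Wait for the load to complete.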
2798 """
2799 job_id = _make_job_id(job_id, job_id_prefix)
2800
2801 if job_config is not None:
2802 _verify_job_config_type(job_config, LoadJobConfig)
2803 else:
2804 job_config = job.LoadJobConfig()
2805
2806 new_job_config = job_config._fill_from_default(self._default_load_job_config)
2807
2808 supported_formats = {job.SourceFormat.CSV, job.SourceFormat.PARQUET}
2809 if new_job_config.source_format is None:
2810 # default value
2811 new_job_config.source_format = job.SourceFormat.PARQUET
2812
2813 if (
2814 new_job_config.source_format == job.SourceFormat.PARQUET
2815 and new_job_config.parquet_options is None
2816 ):
2817 parquet_options = ParquetOptions()
2818 # default value
2819 parquet_options.enable_list_inference = True
2820 new_job_config.parquet_options = parquet_options
2821
2822 if new_job_config.source_format not in supported_formats:
2823 raise ValueError(
2824 "Got unexpected source_format: '{}'. Currently, only PARQUET and CSV are supported".format(
2825 new_job_config.source_format
2826 )
2827 )
2828
2829 if pyarrow is None and new_job_config.source_format == job.SourceFormat.PARQUET:
2830 # pyarrow is now the only supported parquet engine.
2831 raise ValueError("This method requires pyarrow to be installed")
2832
2833 if location is None:
2834 location = self.location
2835
2836 # If table schema is not provided, we try to fetch the existing table
2837 # schema, and check if dataframe schema is compatible with it - except
2838 # for WRITE_TRUNCATE jobs, the existing schema does not matter then.
2839 if (
2840 not new_job_config.schema
2841 and new_job_config.write_disposition != job.WriteDisposition.WRITE_TRUNCATE
2842 ):
2843 try:
2844 table = self.get_table(destination)
2845 except core_exceptions.NotFound:
2846 pass
2847 else:
2848 columns_and_indexes = frozenset(
2849 name
2850 for name, _ in _pandas_helpers.list_columns_and_indexes(dataframe)
2851 )
2852 new_job_config.schema = [
2853 # Field description and policy tags are not needed to
2854 # serialize a data frame.
2855 SchemaField(
2856 field.name,
2857 field.field_type,
2858 mode=field.mode,
2859 fields=field.fields,
2860 )
2861 # schema fields not present in the dataframe are not needed
2862 for field in table.schema
2863 if field.name in columns_and_indexes
2864 ]
2865
2866 new_job_config.schema = _pandas_helpers.dataframe_to_bq_schema(
2867 dataframe, new_job_config.schema
2868 )
2869
2870 if not new_job_config.schema:
2871 # the schema could not be fully detected
2872 warnings.warn(
2873 "Schema could not be detected for all columns. Loading from a "
2874 "dataframe without a schema will be deprecated in the future, "
2875 "please provide a schema.",
2876 PendingDeprecationWarning,
2877 stacklevel=2,
2878 )
2879
2880 tmpfd, tmppath = tempfile.mkstemp(
2881 suffix="_job_{}.{}".format(job_id[:8], new_job_config.source_format.lower())
2882 )
2883 os.close(tmpfd)
2884
2885 try:
2886 if new_job_config.source_format == job.SourceFormat.PARQUET:
2887 if new_job_config.schema:
2888 if parquet_compression == "snappy": # adjust the default value
2889 parquet_compression = parquet_compression.upper()
2890
2891 _pandas_helpers.dataframe_to_parquet(
2892 dataframe,
2893 new_job_config.schema,
2894 tmppath,
2895 parquet_compression=parquet_compression,
2896 parquet_use_compliant_nested_type=True,
2897 )
2898 else:
2899 dataframe.to_parquet(
2900 tmppath,
2901 engine="pyarrow",
2902 compression=parquet_compression,
2903 **(
2904 {"use_compliant_nested_type": True}
2905 if _versions_helpers.PYARROW_VERSIONS.use_compliant_nested_type
2906 else {}
2907 ),
2908 )
2909
2910 else:
2911 dataframe.to_csv(
2912 tmppath,
2913 index=False,
2914 header=False,
2915 encoding="utf-8",
2916 float_format="%.17g",
2917 date_format="%Y-%m-%d %H:%M:%S.%f",
2918 )
2919
2920 with open(tmppath, "rb") as tmpfile:
2921 file_size = os.path.getsize(tmppath)
2922 return self.load_table_from_file(
2923 tmpfile,
2924 destination,
2925 num_retries=num_retries,
2926 rewind=True,
2927 size=file_size,
2928 job_id=job_id,
2929 job_id_prefix=job_id_prefix,
2930 location=location,
2931 project=project,
2932 job_config=new_job_config,
2933 timeout=timeout,
2934 )
2935
2936 finally:
2937 os.remove(tmppath)
2938
2939 def load_table_from_json(
2940 self,
2941 json_rows: Iterable[Dict[str, Any]],
2942 destination: Union[Table, TableReference, TableListItem, str],
2943 num_retries: int = _DEFAULT_NUM_RETRIES,
2944 job_id: Optional[str] = None,
2945 job_id_prefix: Optional[str] = None,
2946 location: Optional[str] = None,
2947 project: Optional[str] = None,
2948 job_config: Optional[LoadJobConfig] = None,
2949 timeout: ResumableTimeoutType = DEFAULT_TIMEOUT,
2950 ) -> job.LoadJob:
        """Upload the contents of a table from JSON-compatible row dicts.
2952
2953 Args:
2954 json_rows (Iterable[Dict[str, Any]]):
2955 Row data to be inserted. Keys must match the table schema fields
2956 and values must be JSON-compatible representations.
2957
2958 .. note::
2959
2960 If your data is already a newline-delimited JSON string,
2961 it is best to wrap it into a file-like object and pass it
2962 to :meth:`~google.cloud.bigquery.client.Client.load_table_from_file`::
2963
2964 import io
2965 from google.cloud import bigquery
2966
2967 data = u'{"foo": "bar"}'
2968 data_as_file = io.StringIO(data)
2969
2970 client = bigquery.Client()
2971 client.load_table_from_file(data_as_file, ...)
2972
2973 destination (Union[ \
2974 Table, \
2975 TableReference, \
2976 TableListItem, \
2977 str \
2978 ]):
2979 Table into which data is to be loaded. If a string is passed
2980 in, this method attempts to create a table reference from a
2981 string using
2982 :func:`google.cloud.bigquery.table.TableReference.from_string`.
2983 num_retries (Optional[int]): Number of upload retries. Defaults to 6.
2984 job_id (Optional[str]): Name of the job.
2985 job_id_prefix (Optional[str]):
2986 The user-provided prefix for a randomly generated job ID.
2987 This parameter will be ignored if a ``job_id`` is also given.
2988 location (Optional[str]):
2989 Location where to run the job. Must match the location of the
2990 destination table.
2991 project (Optional[str]):
                Project ID of the project where the job should run. Defaults
2993 to the client's project.
2994 job_config (Optional[LoadJobConfig]):
2995 Extra configuration options for the job. The ``source_format``
2996 setting is always set to
2997 :attr:`~google.cloud.bigquery.job.SourceFormat.NEWLINE_DELIMITED_JSON`.
2998 timeout (Optional[float]):
2999 The number of seconds to wait for the underlying HTTP transport
3000 before using ``retry``. Depending on the retry strategy, a request may
3001 be repeated several times using the same timeout each time.
3002 Defaults to None.
3003
3004 Can also be passed as a tuple (connect_timeout, read_timeout).
3005 See :meth:`requests.Session.request` documentation for details.
3006
3007 Returns:
3008 google.cloud.bigquery.job.LoadJob: A new load job.
3009
3010 Raises:
3011 TypeError:
3012 If ``job_config`` is not an instance of
3013 :class:`~google.cloud.bigquery.job.LoadJobConfig` class.
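
        Example:

            An illustrative sketch; the table ID is a placeholder::

                from google.cloud import bigquery

                client = bigquery.Client()

                rows = [
                    {"name": "Ada", "age": 36},
                    {"name": "Grace", "age": 45},
                ]
                job = client.load_table_from_json(rows, "my_dataset.my_table")
                job.result()  # Wait for the load to complete.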
3014 """
3015 job_id = _make_job_id(job_id, job_id_prefix)
3016
3017 if job_config is not None:
3018 _verify_job_config_type(job_config, LoadJobConfig)
3019 else:
3020 job_config = job.LoadJobConfig()
3021
3022 new_job_config = job_config._fill_from_default(self._default_load_job_config)
3023
3024 new_job_config.source_format = job.SourceFormat.NEWLINE_DELIMITED_JSON
3025
        # In specific conditions, we check whether the table already exists, and/or
        # set the autodetect value for the user. For the exact conditions, see the
        # table at https://github.com/googleapis/python-bigquery/issues/1228#issuecomment-1910946297
3029 if new_job_config.schema is None and new_job_config.autodetect is None:
3030 if new_job_config.write_disposition in (
3031 job.WriteDisposition.WRITE_TRUNCATE,
3032 job.WriteDisposition.WRITE_EMPTY,
3033 ):
3034 new_job_config.autodetect = True
3035 else:
3036 try:
3037 self.get_table(destination)
3038 except core_exceptions.NotFound:
3039 new_job_config.autodetect = True
3040 else:
3041 new_job_config.autodetect = False
3042
3043 if project is None:
3044 project = self.project
3045
3046 if location is None:
3047 location = self.location
3048
3049 destination = _table_arg_to_table_ref(destination, default_project=self.project)
3050
3051 data_str = "\n".join(json.dumps(item, ensure_ascii=False) for item in json_rows)
3052 encoded_str = data_str.encode()
3053 data_file = io.BytesIO(encoded_str)
3054 return self.load_table_from_file(
3055 data_file,
3056 destination,
3057 size=len(encoded_str),
3058 num_retries=num_retries,
3059 job_id=job_id,
3060 job_id_prefix=job_id_prefix,
3061 location=location,
3062 project=project,
3063 job_config=new_job_config,
3064 timeout=timeout,
3065 )
3066
3067 def _do_resumable_upload(
3068 self,
3069 stream: IO[bytes],
3070 metadata: Mapping[str, str],
3071 num_retries: int,
3072 timeout: Optional[ResumableTimeoutType],
3073 project: Optional[str] = None,
3074 ) -> "requests.Response":
3075 """Perform a resumable upload.
3076
3077 Args:
3078 stream (IO[bytes]): A bytes IO object open for reading.
3079 metadata (Mapping[str, str]): The metadata associated with the upload.
3080 num_retries (int):
3081 Number of upload retries. (Deprecated: This
3082 argument will be removed in a future release.)
3083 timeout (Optional[float]):
3084 The number of seconds to wait for the underlying HTTP transport
3085 before using ``retry``. Depending on the retry strategy, a request may
3086 be repeated several times using the same timeout each time.
3087
3088 Can also be passed as a tuple (connect_timeout, read_timeout).
3089 See :meth:`requests.Session.request` documentation for details.
3090 project (Optional[str]):
                Project ID of the project where the upload should run. Defaults
3092 to the client's project.
3093
3094 Returns:
3095 The "200 OK" response object returned after the final chunk
3096 is uploaded.
3097 """
3098 upload, transport = self._initiate_resumable_upload(
3099 stream, metadata, num_retries, timeout, project=project
3100 )
3101
3102 while not upload.finished:
3103 response = upload.transmit_next_chunk(transport, timeout=timeout)
3104
3105 return response
3106
3107 def _initiate_resumable_upload(
3108 self,
3109 stream: IO[bytes],
3110 metadata: Mapping[str, str],
3111 num_retries: int,
3112 timeout: Optional[ResumableTimeoutType],
3113 project: Optional[str] = None,
3114 ):
3115 """Initiate a resumable upload.
3116
3117 Args:
3118 stream (IO[bytes]): A bytes IO object open for reading.
3119 metadata (Mapping[str, str]): The metadata associated with the upload.
3120 num_retries (int):
3121 Number of upload retries. (Deprecated: This
3122 argument will be removed in a future release.)
3123 timeout (Optional[float]):
3124 The number of seconds to wait for the underlying HTTP transport
3125 before using ``retry``. Depending on the retry strategy, a request may
3126 be repeated several times using the same timeout each time.
3127
3128 Can also be passed as a tuple (connect_timeout, read_timeout).
3129 See :meth:`requests.Session.request` documentation for details.
3130 project (Optional[str]):
                Project ID of the project where the upload should run. Defaults
3132 to the client's project.
3133
3134 Returns:
3135 Tuple:
3136 Pair of
3137
3138 * The :class:`~google.resumable_media.requests.ResumableUpload`
3139 that was created
3140 * The ``transport`` used to initiate the upload.
3141 """
3142 chunk_size = _DEFAULT_CHUNKSIZE
3143 transport = self._http
3144 headers = _get_upload_headers(self._connection.user_agent)
3145
3146 if project is None:
3147 project = self.project
3148 # TODO: Increase the minimum version of google-cloud-core to 1.6.0
3149 # and remove this logic. See:
3150 # https://github.com/googleapis/python-bigquery/issues/509
3151 hostname = (
3152 self._connection.API_BASE_URL
3153 if not hasattr(self._connection, "get_api_base_url_for_mtls")
3154 else self._connection.get_api_base_url_for_mtls()
3155 )
3156 upload_url = _RESUMABLE_URL_TEMPLATE.format(host=hostname, project=project)
3157
3158 # TODO: modify ResumableUpload to take a retry.Retry object
3159 # that it can use for the initial RPC.
3160 upload = ResumableUpload(upload_url, chunk_size, headers=headers)
3161
3162 if num_retries is not None:
3163 upload._retry_strategy = resumable_media.RetryStrategy(
3164 max_retries=num_retries
3165 )
3166
3167 upload.initiate(
3168 transport,
3169 stream,
3170 metadata,
3171 _GENERIC_CONTENT_TYPE,
3172 stream_final=False,
3173 timeout=timeout,
3174 )
3175
3176 return upload, transport
3177
3178 def _do_multipart_upload(
3179 self,
3180 stream: IO[bytes],
3181 metadata: Mapping[str, str],
3182 size: int,
3183 num_retries: int,
3184 timeout: Optional[ResumableTimeoutType],
3185 project: Optional[str] = None,
3186 ):
3187 """Perform a multipart upload.
3188
3189 Args:
3190 stream (IO[bytes]): A bytes IO object open for reading.
3191 metadata (Mapping[str, str]): The metadata associated with the upload.
3192 size (int):
                The number of bytes to be uploaded (which will be read
                from ``stream``). ``stream`` must have at least this many
                bytes remaining.
3196 num_retries (int):
3197 Number of upload retries. (Deprecated: This
3198 argument will be removed in a future release.)
3199 timeout (Optional[float]):
3200 The number of seconds to wait for the underlying HTTP transport
3201 before using ``retry``. Depending on the retry strategy, a request may
3202 be repeated several times using the same timeout each time.
3203
3204 Can also be passed as a tuple (connect_timeout, read_timeout).
3205 See :meth:`requests.Session.request` documentation for details.
3206 project (Optional[str]):
                Project ID of the project where the upload should run. Defaults
3208 to the client's project.
3209
3210 Returns:
3211 requests.Response:
3212 The "200 OK" response object returned after the multipart
3213 upload request.
3214
3215 Raises:
3216 ValueError:
3217 if the ``stream`` has fewer than ``size``
3218 bytes remaining.
3219 """
3220 data = stream.read(size)
3221 if len(data) < size:
3222 msg = _READ_LESS_THAN_SIZE.format(size, len(data))
3223 raise ValueError(msg)
3224
3225 headers = _get_upload_headers(self._connection.user_agent)
3226
3227 if project is None:
3228 project = self.project
3229
3230 # TODO: Increase the minimum version of google-cloud-core to 1.6.0
3231 # and remove this logic. See:
3232 # https://github.com/googleapis/python-bigquery/issues/509
3233 hostname = (
3234 self._connection.API_BASE_URL
3235 if not hasattr(self._connection, "get_api_base_url_for_mtls")
3236 else self._connection.get_api_base_url_for_mtls()
3237 )
3238 upload_url = _MULTIPART_URL_TEMPLATE.format(host=hostname, project=project)
3239 upload = MultipartUpload(upload_url, headers=headers)
3240
3241 if num_retries is not None:
3242 upload._retry_strategy = resumable_media.RetryStrategy(
3243 max_retries=num_retries
3244 )
3245
3246 response = upload.transmit(
3247 self._http, data, metadata, _GENERIC_CONTENT_TYPE, timeout=timeout
3248 )
3249
3250 return response
3251
3252 def copy_table(
3253 self,
3254 sources: Union[
3255 Table,
3256 TableReference,
3257 TableListItem,
3258 str,
3259 Sequence[Union[Table, TableReference, TableListItem, str]],
3260 ],
3261 destination: Union[Table, TableReference, TableListItem, str],
3262 job_id: Optional[str] = None,
3263 job_id_prefix: Optional[str] = None,
3264 location: Optional[str] = None,
3265 project: Optional[str] = None,
3266 job_config: Optional[CopyJobConfig] = None,
3267 retry: retries.Retry = DEFAULT_RETRY,
3268 timeout: TimeoutType = DEFAULT_TIMEOUT,
3269 ) -> job.CopyJob:
3270 """Copy one or more tables to another table.
3271
3272 See
3273 https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#jobconfigurationtablecopy
3274
3275 Args:
3276 sources (Union[ \
3277 google.cloud.bigquery.table.Table, \
3278 google.cloud.bigquery.table.TableReference, \
3279 google.cloud.bigquery.table.TableListItem, \
3280 str, \
3281 Sequence[ \
3282 Union[ \
3283 google.cloud.bigquery.table.Table, \
3284 google.cloud.bigquery.table.TableReference, \
3285 google.cloud.bigquery.table.TableListItem, \
3286 str, \
3287 ] \
3288 ], \
3289 ]):
3290 Table or tables to be copied.
3291 destination (Union[ \
3292 google.cloud.bigquery.table.Table, \
3293 google.cloud.bigquery.table.TableReference, \
3294 google.cloud.bigquery.table.TableListItem, \
3295 str, \
3296 ]):
3297 Table into which data is to be copied.
3298 job_id (Optional[str]): The ID of the job.
3299 job_id_prefix (Optional[str]):
3300 The user-provided prefix for a randomly generated job ID.
3301 This parameter will be ignored if a ``job_id`` is also given.
3302 location (Optional[str]):
3303 Location where to run the job. Must match the location of any
3304 source table as well as the destination table.
3305 project (Optional[str]):
                Project ID of the project where the job should run. Defaults
3307 to the client's project.
3308 job_config (Optional[google.cloud.bigquery.job.CopyJobConfig]):
3309 Extra configuration options for the job.
3310 retry (Optional[google.api_core.retry.Retry]):
3311 How to retry the RPC.
3312 timeout (Optional[float]):
3313 The number of seconds to wait for the underlying HTTP transport
3314 before using ``retry``.
3315
3316 Returns:
3317 google.cloud.bigquery.job.CopyJob: A new copy job instance.
3318
3319 Raises:
3320 TypeError:
3321 If ``job_config`` is not an instance of :class:`~google.cloud.bigquery.job.CopyJobConfig`
3322 class.
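
        Example:

            An illustrative sketch; the source and destination table IDs are
            placeholders::

                from google.cloud import bigquery

                client = bigquery.Client()

                copy_job = client.copy_table(
                    "my_dataset.source_table", "my_dataset.destination_table"
                )
                copy_job.result()  # Wait for the copy to complete.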
3323 """
3324 job_id = _make_job_id(job_id, job_id_prefix)
3325
3326 if project is None:
3327 project = self.project
3328
3329 if location is None:
3330 location = self.location
3331
3332 job_ref = job._JobReference(job_id, project=project, location=location)
3333
3334 # sources can be one of many different input types. (string, Table,
3335 # TableReference, or a sequence of any of those.) Convert them all to a
3336 # list of TableReferences.
3337 #
3338 # _table_arg_to_table_ref leaves lists unmodified.
3339 sources = _table_arg_to_table_ref(sources, default_project=self.project)
3340
3341 if not isinstance(sources, collections_abc.Sequence):
3342 sources = [sources]
3343
3344 sources = [
3345 _table_arg_to_table_ref(source, default_project=self.project)
3346 for source in sources
3347 ]
3348
3349 destination = _table_arg_to_table_ref(destination, default_project=self.project)
3350
3351 if job_config:
3352 _verify_job_config_type(job_config, google.cloud.bigquery.job.CopyJobConfig)
3353 job_config = copy.deepcopy(job_config)
3354
3355 copy_job = job.CopyJob(
3356 job_ref, sources, destination, client=self, job_config=job_config
3357 )
3358 copy_job._begin(retry=retry, timeout=timeout)
3359
3360 return copy_job
3361
3362 def extract_table(
3363 self,
3364 source: Union[Table, TableReference, TableListItem, Model, ModelReference, str],
3365 destination_uris: Union[str, Sequence[str]],
3366 job_id: Optional[str] = None,
3367 job_id_prefix: Optional[str] = None,
3368 location: Optional[str] = None,
3369 project: Optional[str] = None,
3370 job_config: Optional[ExtractJobConfig] = None,
3371 retry: retries.Retry = DEFAULT_RETRY,
3372 timeout: TimeoutType = DEFAULT_TIMEOUT,
3373 source_type: str = "Table",
3374 ) -> job.ExtractJob:
3375 """Start a job to extract a table into Cloud Storage files.
3376
3377 See
3378 https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#jobconfigurationextract
3379
3380 Args:
3381 source (Union[ \
3382 google.cloud.bigquery.table.Table, \
3383 google.cloud.bigquery.table.TableReference, \
3384 google.cloud.bigquery.table.TableListItem, \
3385 google.cloud.bigquery.model.Model, \
3386 google.cloud.bigquery.model.ModelReference, \
3387                 str, \
3388 ]):
3389 Table or Model to be extracted.
3390 destination_uris (Union[str, Sequence[str]]):
3391 URIs of Cloud Storage file(s) into which table data is to be
3392 extracted; in format
3393 ``gs://<bucket_name>/<object_name_or_glob>``.
3394 job_id (Optional[str]): The ID of the job.
3395 job_id_prefix (Optional[str]):
3396 The user-provided prefix for a randomly generated job ID.
3397 This parameter will be ignored if a ``job_id`` is also given.
3398 location (Optional[str]):
3399 Location where to run the job. Must match the location of the
3400 source table.
3401 project (Optional[str]):
3402                 Project ID of the project in which to run the job. Defaults
3403                 to the client's project.
3404 job_config (Optional[google.cloud.bigquery.job.ExtractJobConfig]):
3405 Extra configuration options for the job.
3406 retry (Optional[google.api_core.retry.Retry]):
3407 How to retry the RPC.
3408 timeout (Optional[float]):
3409 The number of seconds to wait for the underlying HTTP transport
3410 before using ``retry``.
3411 source_type (Optional[str]):
3412                 Type of source to be extracted: ``Table`` or ``Model``. Defaults to ``Table``.
3413 Returns:
3414 google.cloud.bigquery.job.ExtractJob: A new extract job instance.
3415
3416 Raises:
3417 TypeError:
3418 If ``job_config`` is not an instance of :class:`~google.cloud.bigquery.job.ExtractJobConfig`
3419 class.
3420 ValueError:
3421                 If ``source_type`` is not one of ``Table`` or ``Model``.
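
        Example (a minimal sketch; the table ID and Cloud Storage bucket are
        illustrative and assumed to exist)::

            from google.cloud import bigquery

            client = bigquery.Client()

            # The destination URI pattern below is an illustrative placeholder.
            extract_job = client.extract_table(
                "my_project.my_dataset.my_table",
                "gs://my-bucket/exports/my_table-*.csv",
            )
            extract_job.result()  # Wait for the export to complete.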
3422 """
3423 job_id = _make_job_id(job_id, job_id_prefix)
3424
3425 if project is None:
3426 project = self.project
3427
3428 if location is None:
3429 location = self.location
3430
3431 job_ref = job._JobReference(job_id, project=project, location=location)
3432 src = source_type.lower()
3433 if src == "table":
3434 source = _table_arg_to_table_ref(source, default_project=self.project)
3435 elif src == "model":
3436 source = _model_arg_to_model_ref(source, default_project=self.project)
3437 else:
3438 raise ValueError(
3439 "Cannot pass `{}` as a ``source_type``, pass Table or Model".format(
3440 source_type
3441 )
3442 )
3443
3444 if isinstance(destination_uris, str):
3445 destination_uris = [destination_uris]
3446
3447 if job_config:
3448 _verify_job_config_type(
3449 job_config, google.cloud.bigquery.job.ExtractJobConfig
3450 )
3451 job_config = copy.deepcopy(job_config)
3452
3453 extract_job = job.ExtractJob(
3454 job_ref, source, destination_uris, client=self, job_config=job_config
3455 )
3456 extract_job._begin(retry=retry, timeout=timeout)
3457
3458 return extract_job
3459
3460 def query(
3461 self,
3462 query: str,
3463 job_config: Optional[QueryJobConfig] = None,
3464 job_id: Optional[str] = None,
3465 job_id_prefix: Optional[str] = None,
3466 location: Optional[str] = None,
3467 project: Optional[str] = None,
3468 retry: retries.Retry = DEFAULT_RETRY,
3469 timeout: TimeoutType = DEFAULT_TIMEOUT,
3470 job_retry: Optional[retries.Retry] = DEFAULT_JOB_RETRY,
3471 api_method: Union[str, enums.QueryApiMethod] = enums.QueryApiMethod.INSERT,
3472 *,
3473 timestamp_precision: Optional[enums.TimestampPrecision] = None,
3474 ) -> job.QueryJob:
3475 """Run a SQL query.
3476
3477 See
3478 https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#jobconfigurationquery
3479
3480 Args:
3481 query (str):
3482 SQL query to be executed. Defaults to the standard SQL
3483 dialect. Use the ``job_config`` parameter to change dialects.
3484 job_config (Optional[google.cloud.bigquery.job.QueryJobConfig]):
3485 Extra configuration options for the job.
3486 To override any options that were previously set in
3487 the ``default_query_job_config`` given to the
3488 ``Client`` constructor, manually set those options to ``None``,
3489 or whatever value is preferred.
3490 job_id (Optional[str]): ID to use for the query job.
3491 job_id_prefix (Optional[str]):
3492 The prefix to use for a randomly generated job ID. This parameter
3493 will be ignored if a ``job_id`` is also given.
3494 location (Optional[str]):
3495 Location where to run the job. Must match the location of the
3496 table used in the query as well as the destination table.
3497 project (Optional[str]):
3498                 Project ID of the project in which to run the job. Defaults
3499                 to the client's project.
3500 retry (Optional[google.api_core.retry.Retry]):
3501 How to retry the RPC. This only applies to making RPC
3502 calls. It isn't used to retry failed jobs. This has
3503 a reasonable default that should only be overridden
3504 with care.
3505 timeout (Optional[float]):
3506 The number of seconds to wait for the underlying HTTP transport
3507 before using ``retry``.
3508 job_retry (Optional[google.api_core.retry.Retry]):
3509 How to retry failed jobs. The default retries
3510 rate-limit-exceeded errors. Passing ``None`` disables
3511 job retry.
3512
3513 Not all jobs can be retried. If ``job_id`` is
3514 provided, then the job returned by the query will not
3515 be retryable, and an exception will be raised if a
3516 non-``None`` (and non-default) value for ``job_retry``
3517 is also provided.
3518
3519 Note that errors aren't detected until ``result()`` is
3520 called on the job returned. The ``job_retry``
3521 specified here becomes the default ``job_retry`` for
3522 ``result()``, where it can also be specified.
3523 api_method (Union[str, enums.QueryApiMethod]):
3524 Method with which to start the query job. By default,
3525 the jobs.insert API is used for starting a query.
3526
3527 See :class:`google.cloud.bigquery.enums.QueryApiMethod` for
3528 details on the difference between the query start methods.
3529 timestamp_precision (Optional[enums.TimestampPrecision]):
3530 [Private Preview] If set to `enums.TimestampPrecision.PICOSECOND`,
3531 timestamp columns of picosecond precision will be returned with
3532                 full precision. Otherwise, they are truncated to microsecond
3533                 precision. Only applies when api_method == `enums.QueryApiMethod.QUERY`.
3534
3535 Returns:
3536 google.cloud.bigquery.job.QueryJob: A new query job instance.
3537
3538 Raises:
3539 TypeError:
3540 If ``job_config`` is not an instance of
3541 :class:`~google.cloud.bigquery.job.QueryJobConfig`
3542 class, or if both ``job_id`` and non-``None`` non-default
3543 ``job_retry`` are provided.
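
        Example (a minimal sketch; the public dataset queried here is only
        illustrative)::

            from google.cloud import bigquery

            client = bigquery.Client()

            # The query text is an illustrative example against a public dataset.
            query_job = client.query(
                "SELECT name, SUM(number) AS total"
                " FROM `bigquery-public-data.usa_names.usa_1910_2013`"
                " GROUP BY name ORDER BY total DESC LIMIT 10"
            )
            for row in query_job.result():  # Wait for the job to complete.
                print(row["name"], row["total"])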
3544 """
3545 _job_helpers.validate_job_retry(job_id, job_retry)
3546
3547 job_id_given = job_id is not None
3548 if job_id_given and api_method == enums.QueryApiMethod.QUERY:
3549 raise TypeError(
3550 "`job_id` was provided, but the 'QUERY' `api_method` was requested."
3551 )
3552
3553 if (
3554 timestamp_precision == enums.TimestampPrecision.PICOSECOND
3555 and api_method != enums.QueryApiMethod.QUERY
3556 ):
3557 raise ValueError(
3558 "Picosecond Timestamp is only supported when `api_method "
3559 "== enums.QueryApiMethod.QUERY`."
3560 )
3561
3562 if project is None:
3563 project = self.project
3564
3565 if location is None:
3566 location = self.location
3567
3568 if job_config is not None:
3569 _verify_job_config_type(job_config, QueryJobConfig)
3570
3571 job_config = _job_helpers.job_config_with_defaults(
3572 job_config, self._default_query_job_config
3573 )
3574
3575 # Note that we haven't modified the original job_config (or
3576 # _default_query_job_config) up to this point.
3577 if api_method == enums.QueryApiMethod.QUERY:
3578 return _job_helpers.query_jobs_query(
3579 self,
3580 query,
3581 job_config,
3582 location,
3583 project,
3584 retry,
3585 timeout,
3586 job_retry,
3587 timestamp_precision=timestamp_precision,
3588 )
3589 elif api_method == enums.QueryApiMethod.INSERT:
3590 return _job_helpers.query_jobs_insert(
3591 self,
3592 query,
3593 job_config,
3594 job_id,
3595 job_id_prefix,
3596 location,
3597 project,
3598 retry,
3599 timeout,
3600 job_retry,
3601 )
3602 else:
3603 raise ValueError(f"Got unexpected value for api_method: {repr(api_method)}")
3604
3605 def query_and_wait(
3606 self,
3607 query,
3608 *,
3609 job_config: Optional[QueryJobConfig] = None,
3610 location: Optional[str] = None,
3611 project: Optional[str] = None,
3612 api_timeout: TimeoutType = DEFAULT_TIMEOUT,
3613 wait_timeout: Union[Optional[float], object] = POLLING_DEFAULT_VALUE,
3614 retry: retries.Retry = DEFAULT_RETRY,
3615 job_retry: retries.Retry = DEFAULT_JOB_RETRY,
3616 page_size: Optional[int] = None,
3617 max_results: Optional[int] = None,
3618 ) -> RowIterator:
3619 """Run the query, wait for it to finish, and return the results.
3620
3621 Args:
3622 query (str):
3623 SQL query to be executed. Defaults to the standard SQL
3624 dialect. Use the ``job_config`` parameter to change dialects.
3625 job_config (Optional[google.cloud.bigquery.job.QueryJobConfig]):
3626 Extra configuration options for the job.
3627 To override any options that were previously set in
3628 the ``default_query_job_config`` given to the
3629 ``Client`` constructor, manually set those options to ``None``,
3630 or whatever value is preferred.
3631 location (Optional[str]):
3632 Location where to run the job. Must match the location of the
3633 table used in the query as well as the destination table.
3634 project (Optional[str]):
3635                 Project ID of the project in which to run the job. Defaults
3636                 to the client's project.
3637 api_timeout (Optional[float]):
3638 The number of seconds to wait for the underlying HTTP transport
3639 before using ``retry``.
3640 wait_timeout (Optional[Union[float, object]]):
3641 The number of seconds to wait for the query to finish. If the
3642 query doesn't finish before this timeout, the client attempts
3643 to cancel the query. If unset, the underlying REST API calls
3644 have timeouts, but we still wait indefinitely for the job to
3645 finish.
3646 retry (Optional[google.api_core.retry.Retry]):
3647 How to retry the RPC. This only applies to making RPC
3648 calls. It isn't used to retry failed jobs. This has
3649 a reasonable default that should only be overridden
3650 with care.
3651 job_retry (Optional[google.api_core.retry.Retry]):
3652 How to retry failed jobs. The default retries
3653 rate-limit-exceeded errors. Passing ``None`` disables
3654 job retry. Not all jobs can be retried.
3655 page_size (Optional[int]):
3656 The maximum number of rows in each page of results from the
3657 initial jobs.query request. Non-positive values are ignored.
3658 max_results (Optional[int]):
3659 The maximum total number of rows from this request.
3660
3661 Returns:
3662 google.cloud.bigquery.table.RowIterator:
3663 Iterator of row data
3664 :class:`~google.cloud.bigquery.table.Row`-s. During each
3665 page, the iterator will have the ``total_rows`` attribute
3666 set, which counts the total number of rows **in the result
3667 set** (this is distinct from the total number of rows in the
3668 current page: ``iterator.page.num_items``).
3669
3670 If the query is a special query that produces no results, e.g.
3671 a DDL query, an ``_EmptyRowIterator`` instance is returned.
3672
3673 Raises:
3674 TypeError:
3675 If ``job_config`` is not an instance of
3676 :class:`~google.cloud.bigquery.job.QueryJobConfig`
3677 class.
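
        Example (a minimal sketch; the public dataset queried here is only
        illustrative)::

            from google.cloud import bigquery

            client = bigquery.Client()

            # The query text is an illustrative example against a public dataset.
            rows = client.query_and_wait(
                "SELECT country_name"
                " FROM `bigquery-public-data.utility_us.country_code_iso`"
            )
            for row in rows:
                print(row["country_name"])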
3678 """
3679 return self._query_and_wait_bigframes(
3680 query,
3681 job_config=job_config,
3682 location=location,
3683 project=project,
3684 api_timeout=api_timeout,
3685 wait_timeout=wait_timeout,
3686 retry=retry,
3687 job_retry=job_retry,
3688 page_size=page_size,
3689 max_results=max_results,
3690 )
3691
3692 def _query_and_wait_bigframes(
3693 self,
3694 query,
3695 *,
3696 job_config: Optional[QueryJobConfig] = None,
3697 location: Optional[str] = None,
3698 project: Optional[str] = None,
3699 api_timeout: TimeoutType = DEFAULT_TIMEOUT,
3700 wait_timeout: Union[Optional[float], object] = POLLING_DEFAULT_VALUE,
3701 retry: retries.Retry = DEFAULT_RETRY,
3702 job_retry: retries.Retry = DEFAULT_JOB_RETRY,
3703 page_size: Optional[int] = None,
3704 max_results: Optional[int] = None,
3705 callback: Callable = lambda _: None,
3706 ) -> RowIterator:
3707 """See query_and_wait.
3708
3709 This method has an extra callback parameter, which is used by bigframes
3710 to create better progress bars.
3711 """
3712 if project is None:
3713 project = self.project
3714
3715 if location is None:
3716 location = self.location
3717
3718 if job_config is not None:
3719 _verify_job_config_type(job_config, QueryJobConfig)
3720
3721 job_config = _job_helpers.job_config_with_defaults(
3722 job_config, self._default_query_job_config
3723 )
3724
3725 return _job_helpers.query_and_wait(
3726 self,
3727 query,
3728 job_config=job_config,
3729 location=location,
3730 project=project,
3731 api_timeout=api_timeout,
3732 wait_timeout=wait_timeout,
3733 retry=retry,
3734 job_retry=job_retry,
3735 page_size=page_size,
3736 max_results=max_results,
3737 callback=callback,
3738 )
3739
3740 def insert_rows(
3741 self,
3742 table: Union[Table, TableReference, str],
3743 rows: Union[Iterable[Tuple], Iterable[Mapping[str, Any]]],
3744 selected_fields: Optional[Sequence[SchemaField]] = None,
3745 **kwargs,
3746 ) -> Sequence[Dict[str, Any]]:
3747 """Insert rows into a table via the streaming API.
3748
3749 See
3750 https://cloud.google.com/bigquery/docs/reference/rest/v2/tabledata/insertAll
3751
3752 BigQuery will reject insertAll payloads that exceed a defined limit (10MB).
3753 Additionally, if a payload vastly exceeds this limit, the request is rejected
3754 by the intermediate architecture, which returns a 413 (Payload Too Large) status code.
3755
3756
3757 See
3758 https://cloud.google.com/bigquery/quotas#streaming_inserts
3759
3760 Args:
3761 table (Union[ \
3762 google.cloud.bigquery.table.Table, \
3763 google.cloud.bigquery.table.TableReference, \
3764 str, \
3765 ]):
3766 The destination table for the row data, or a reference to it.
3767 rows (Union[Sequence[Tuple], Sequence[Dict]]):
3768 Row data to be inserted. If a list of tuples is given, each
3769 tuple should contain data for each schema field on the
3770 current table and in the same order as the schema fields. If
3771 a list of dictionaries is given, the keys must include all
3772 required fields in the schema. Keys which do not correspond
3773 to a field in the schema are ignored.
3774 selected_fields (Sequence[google.cloud.bigquery.schema.SchemaField]):
3775 The fields to return. Required if ``table`` is a
3776 :class:`~google.cloud.bigquery.table.TableReference`.
3777 kwargs (dict):
3778 Keyword arguments to
3779 :meth:`~google.cloud.bigquery.client.Client.insert_rows_json`.
3780
3781 Returns:
3782 Sequence[Mappings]:
3783 One mapping per row with insert errors: the "index" key
3784 identifies the row, and the "errors" key contains a list of
3785 the mappings describing one or more problems with the row.
3786
3787 Raises:
3788             ValueError: if the table's schema is not set.
            TypeError: if ``rows`` is not a sequence of dicts or tuples.
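
        Example (a minimal sketch; the table ID and its two-column schema are
        illustrative)::

            from google.cloud import bigquery

            client = bigquery.Client()

            # get_table() fetches the schema used to serialize the rows.
            # The table ID and row values below are illustrative.
            table = client.get_table("my_project.my_dataset.my_table")
            rows = [
                ("Alice", 30),
                ("Bob", 25),
            ]
            errors = client.insert_rows(table, rows)
            if errors:
                print("Rows with errors:", errors)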
3789 """
3790 if not isinstance(rows, (collections_abc.Sequence, collections_abc.Iterator)):
3791 raise TypeError("rows argument should be a sequence of dicts or tuples")
3792
3793 table = _table_arg_to_table(table, default_project=self.project)
3794
3795 if not isinstance(table, Table):
3796 raise TypeError(_NEED_TABLE_ARGUMENT)
3797
3798 schema = table.schema
3799
3800 # selected_fields can override the table schema.
3801 if selected_fields is not None:
3802 schema = selected_fields
3803
3804 if len(schema) == 0:
3805 raise ValueError(
3806 (
3807 "Could not determine schema for table '{}'. Call client.get_table() "
3808 "or pass in a list of schema fields to the selected_fields argument."
3809 ).format(table)
3810 )
3811
3812 json_rows = [_record_field_to_json(schema, row) for row in rows]
3813
3814 return self.insert_rows_json(table, json_rows, **kwargs)
3815
3816 def insert_rows_from_dataframe(
3817 self,
3818 table: Union[Table, TableReference, str],
3819 dataframe,
3820 selected_fields: Optional[Sequence[SchemaField]] = None,
3821 chunk_size: int = 500,
3822 **kwargs: Dict,
3823 ) -> Sequence[Sequence[dict]]:
3824 """Insert rows into a table from a dataframe via the streaming API.
3825
3826 BigQuery will reject insertAll payloads that exceed a defined limit (10MB).
3827 Additionally, if a payload vastly exceeds this limit, the request is rejected
3828 by the intermediate architecture, which returns a 413 (Payload Too Large) status code.
3829
3830 See
3831 https://cloud.google.com/bigquery/quotas#streaming_inserts
3832
3833 Args:
3834 table (Union[ \
3835 google.cloud.bigquery.table.Table, \
3836 google.cloud.bigquery.table.TableReference, \
3837 str, \
3838 ]):
3839 The destination table for the row data, or a reference to it.
3840 dataframe (pandas.DataFrame):
3841 A :class:`~pandas.DataFrame` containing the data to load. Any
3842 ``NaN`` values present in the dataframe are omitted from the
3843 streaming API request(s).
3844 selected_fields (Sequence[google.cloud.bigquery.schema.SchemaField]):
3845 The fields to return. Required if ``table`` is a
3846 :class:`~google.cloud.bigquery.table.TableReference`.
3847 chunk_size (int):
3848 The number of rows to stream in a single chunk. Must be positive.
3849 kwargs (Dict):
3850 Keyword arguments to
3851 :meth:`~google.cloud.bigquery.client.Client.insert_rows_json`.
3852
3853 Returns:
3854 Sequence[Sequence[Mappings]]:
3855 A list with insert errors for each insert chunk. Each element
3856 is a list containing one mapping per row with insert errors:
3857 the "index" key identifies the row, and the "errors" key
3858 contains a list of the mappings describing one or more problems
3859 with the row.
3860
3861 Raises:
3862 ValueError: if table's schema is not set
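
        Example (a minimal sketch; the table ID and dataframe columns are
        illustrative)::

            import pandas
            from google.cloud import bigquery

            client = bigquery.Client()

            # Column names are illustrative and must match the table schema.
            table = client.get_table("my_project.my_dataset.my_table")
            dataframe = pandas.DataFrame(
                {"full_name": ["Alice", "Bob"], "age": [30, 25]}
            )
            chunk_errors = client.insert_rows_from_dataframe(table, dataframe)
            for errors in chunk_errors:
                if errors:
                    print("Rows with errors:", errors)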
3863 """
3864 insert_results = []
3865
3866 chunk_count = int(math.ceil(len(dataframe) / chunk_size))
3867 rows_iter = _pandas_helpers.dataframe_to_json_generator(dataframe)
3868
3869 for _ in range(chunk_count):
3870 rows_chunk = itertools.islice(rows_iter, chunk_size)
3871 result = self.insert_rows(table, rows_chunk, selected_fields, **kwargs)
3872 insert_results.append(result)
3873
3874 return insert_results
3875
3876 def insert_rows_json(
3877 self,
3878 table: Union[Table, TableReference, TableListItem, str],
3879 json_rows: Sequence[Mapping[str, Any]],
3880 row_ids: Union[
3881 Iterable[Optional[str]], AutoRowIDs, None
3882 ] = AutoRowIDs.GENERATE_UUID,
3883 skip_invalid_rows: Optional[bool] = None,
3884 ignore_unknown_values: Optional[bool] = None,
3885 template_suffix: Optional[str] = None,
3886 retry: retries.Retry = DEFAULT_RETRY,
3887 timeout: TimeoutType = DEFAULT_TIMEOUT,
3888 ) -> Sequence[dict]:
3889 """Insert rows into a table without applying local type conversions.
3890
3891 See
3892 https://cloud.google.com/bigquery/docs/reference/rest/v2/tabledata/insertAll
3893
3894 BigQuery will reject insertAll payloads that exceed a defined limit (10MB).
3895 Additionally, if a payload vastly exceeds this limit, the request is rejected
3896 by the intermediate architecture, which returns a 413 (Payload Too Large) status code.
3897
3898 See
3899 https://cloud.google.com/bigquery/quotas#streaming_inserts
3900
3901 Args:
3902 table (Union[ \
3903                 google.cloud.bigquery.table.Table, \
3904 google.cloud.bigquery.table.TableReference, \
3905 google.cloud.bigquery.table.TableListItem, \
3906                 str, \
3907 ]):
3908 The destination table for the row data, or a reference to it.
3909 json_rows (Sequence[Dict]):
3910 Row data to be inserted. Keys must match the table schema fields
3911 and values must be JSON-compatible representations.
3912 row_ids (Union[Iterable[str], AutoRowIDs, None]):
3913 Unique IDs, one per row being inserted. An ID can also be
3914 ``None``, indicating that an explicit insert ID should **not**
3915 be used for that row. If the argument is omitted altogether,
3916 unique IDs are created automatically.
3917
3918 .. versionchanged:: 2.21.0
3919 Can also be an iterable, not just a sequence, or an
3920 :class:`AutoRowIDs` enum member.
3921
3922 .. deprecated:: 2.21.0
3923 Passing ``None`` to explicitly request autogenerating insert IDs is
3924 deprecated, use :attr:`AutoRowIDs.GENERATE_UUID` instead.
3925
3926 skip_invalid_rows (Optional[bool]):
3927 Insert all valid rows of a request, even if invalid rows exist.
3928 The default value is ``False``, which causes the entire request
3929 to fail if any invalid rows exist.
3930 ignore_unknown_values (Optional[bool]):
3931 Accept rows that contain values that do not match the schema.
3932 The unknown values are ignored. Default is ``False``, which
3933 treats unknown values as errors.
3934 template_suffix (Optional[str]):
3935 Treat ``name`` as a template table and provide a suffix.
3936 BigQuery will create the table ``<name> + <template_suffix>``
3937 based on the schema of the template table. See
3938 https://cloud.google.com/bigquery/streaming-data-into-bigquery#template-tables
3939 retry (Optional[google.api_core.retry.Retry]):
3940 How to retry the RPC.
3941 timeout (Optional[float]):
3942 The number of seconds to wait for the underlying HTTP transport
3943 before using ``retry``.
3944
3945 Returns:
3946 Sequence[Mappings]:
3947 One mapping per row with insert errors: the "index" key
3948 identifies the row, and the "errors" key contains a list of
3949 the mappings describing one or more problems with the row.
3950
3951 Raises:
3952 TypeError: if `json_rows` is not a `Sequence`.
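
        Example (a minimal sketch; the table ID and field names are
        illustrative)::

            from google.cloud import bigquery

            client = bigquery.Client()

            # Field names are illustrative and must match the table schema.
            rows_to_insert = [
                {"full_name": "Alice", "age": 30},
                {"full_name": "Bob", "age": 25},
            ]
            errors = client.insert_rows_json(
                "my_project.my_dataset.my_table", rows_to_insert
            )
            if errors:
                print("Rows with errors:", errors)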
3953 """
3954 if not isinstance(
3955 json_rows, (collections_abc.Sequence, collections_abc.Iterator)
3956 ):
3957 raise TypeError("json_rows argument should be a sequence of dicts")
3958 # Convert table to just a reference because unlike insert_rows,
3959 # insert_rows_json doesn't need the table schema. It's not doing any
3960 # type conversions.
3961 table = _table_arg_to_table_ref(table, default_project=self.project)
3962 rows_info: List[Any] = []
3963 data: Dict[str, Any] = {"rows": rows_info}
3964
3965 if row_ids is None:
3966 warnings.warn(
3967 "Passing None for row_ids is deprecated. To explicitly request "
3968 "autogenerated insert IDs, use AutoRowIDs.GENERATE_UUID instead",
3969 category=DeprecationWarning,
3970 )
3971 row_ids = AutoRowIDs.GENERATE_UUID
3972
3973 if not isinstance(row_ids, AutoRowIDs):
3974 try:
3975 row_ids_iter = iter(row_ids)
3976 except TypeError:
3977 msg = "row_ids is neither an iterable nor an AutoRowIDs enum member"
3978 raise TypeError(msg)
3979
3980 for i, row in enumerate(json_rows):
3981 info: Dict[str, Any] = {"json": row}
3982
3983 if row_ids is AutoRowIDs.GENERATE_UUID:
3984 info["insertId"] = str(uuid.uuid4())
3985 elif row_ids is AutoRowIDs.DISABLED:
3986 info["insertId"] = None
3987 else:
3988 try:
3989 insert_id = next(row_ids_iter)
3990 except StopIteration:
3991 msg = f"row_ids did not generate enough IDs, error at index {i}"
3992 raise ValueError(msg)
3993 else:
3994 info["insertId"] = insert_id
3995
3996 rows_info.append(info)
3997
3998 if skip_invalid_rows is not None:
3999 data["skipInvalidRows"] = skip_invalid_rows
4000
4001 if ignore_unknown_values is not None:
4002 data["ignoreUnknownValues"] = ignore_unknown_values
4003
4004 if template_suffix is not None:
4005 data["templateSuffix"] = template_suffix
4006
4007 path = "%s/insertAll" % table.path
4008 # We can always retry, because every row has an insert ID.
4009 span_attributes = {"path": path}
4010 response = self._call_api(
4011 retry,
4012 span_name="BigQuery.insertRowsJson",
4013 span_attributes=span_attributes,
4014 method="POST",
4015 path=path,
4016 data=data,
4017 timeout=timeout,
4018 )
4019 errors = []
4020
4021 for error in response.get("insertErrors", ()):
4022 errors.append({"index": int(error["index"]), "errors": error["errors"]})
4023
4024 return errors
4025
4026 def list_partitions(
4027 self,
4028 table: Union[Table, TableReference, TableListItem, str],
4029 retry: retries.Retry = DEFAULT_RETRY,
4030 timeout: TimeoutType = DEFAULT_TIMEOUT,
4031 ) -> Sequence[str]:
4032 """List the partitions in a table.
4033
4034 Args:
4035 table (Union[ \
4036 google.cloud.bigquery.table.Table, \
4037 google.cloud.bigquery.table.TableReference, \
4038 google.cloud.bigquery.table.TableListItem, \
4039 str, \
4040 ]):
4041 The table or reference from which to get partition info
4042 retry (Optional[google.api_core.retry.Retry]):
4043 How to retry the RPC.
4044 timeout (Optional[float]):
4045 The number of seconds to wait for the underlying HTTP transport
4046 before using ``retry``.
4047 If multiple requests are made under the hood, ``timeout``
4048 applies to each individual request.
4049
4050 Returns:
4051 List[str]:
4052 A list of the partition ids present in the partitioned table
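
        Example (a minimal sketch; assumes an existing partitioned table with
        an illustrative ID)::

            from google.cloud import bigquery

            client = bigquery.Client()

            # The table ID below is an illustrative placeholder.
            partition_ids = client.list_partitions(
                "my_project.my_dataset.my_partitioned_table"
            )
            print(partition_ids)  # For example: ['20240101', '20240102']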
4053 """
4054 table = _table_arg_to_table_ref(table, default_project=self.project)
4055 meta_table = self.get_table(
4056 TableReference(
4057 DatasetReference(table.project, table.dataset_id),
4058 "%s$__PARTITIONS_SUMMARY__" % table.table_id,
4059 ),
4060 retry=retry,
4061 timeout=timeout,
4062 )
4063
4064 subset = [col for col in meta_table.schema if col.name == "partition_id"]
4065 return [
4066 row[0]
4067 for row in self.list_rows(
4068 meta_table, selected_fields=subset, retry=retry, timeout=timeout
4069 )
4070 ]
4071
4072 def list_rows(
4073 self,
4074 table: Union[Table, TableListItem, TableReference, str],
4075 selected_fields: Optional[Sequence[SchemaField]] = None,
4076 max_results: Optional[int] = None,
4077 page_token: Optional[str] = None,
4078 start_index: Optional[int] = None,
4079 page_size: Optional[int] = None,
4080 retry: retries.Retry = DEFAULT_RETRY,
4081 timeout: TimeoutType = DEFAULT_TIMEOUT,
4082 *,
4083 timestamp_precision: Optional[enums.TimestampPrecision] = None,
4084 ) -> RowIterator:
4085 """List the rows of the table.
4086
4087 See
4088 https://cloud.google.com/bigquery/docs/reference/rest/v2/tabledata/list
4089
4090 .. note::
4091
4092 This method assumes that the provided schema is up-to-date with the
4093 schema as defined on the back-end: if the two schemas are not
4094 identical, the values returned may be incomplete. To ensure that the
4095 local copy of the schema is up-to-date, call ``client.get_table``.
4096
4097 Args:
4098 table (Union[ \
4099 google.cloud.bigquery.table.Table, \
4100 google.cloud.bigquery.table.TableListItem, \
4101 google.cloud.bigquery.table.TableReference, \
4102 str, \
4103 ]):
4104 The table to list, or a reference to it. When the table
4105 object does not contain a schema and ``selected_fields`` is
4106 not supplied, this method calls ``get_table`` to fetch the
4107 table schema.
4108 selected_fields (Sequence[google.cloud.bigquery.schema.SchemaField]):
4109 The fields to return. If not supplied, data for all columns
4110 are downloaded.
4111 max_results (Optional[int]):
4112 Maximum number of rows to return.
4113 page_token (Optional[str]):
4114 Token representing a cursor into the table's rows.
4115 If not passed, the API will return the first page of the
4116 rows. The token marks the beginning of the iterator to be
4117 returned and the value of the ``page_token`` can be accessed
4118 at ``next_page_token`` of the
4119 :class:`~google.cloud.bigquery.table.RowIterator`.
4120 start_index (Optional[int]):
4121 The zero-based index of the starting row to read.
4122 page_size (Optional[int]):
4123 The maximum number of rows in each page of results from this request.
4124 Non-positive values are ignored. Defaults to a sensible value set by the API.
4125 retry (Optional[google.api_core.retry.Retry]):
4126 How to retry the RPC.
4127 timeout (Optional[float]):
4128 The number of seconds to wait for the underlying HTTP transport
4129 before using ``retry``.
4130 If multiple requests are made under the hood, ``timeout``
4131 applies to each individual request.
4132 timestamp_precision (Optional[enums.TimestampPrecision]):
4133 [Private Preview] If set to `enums.TimestampPrecision.PICOSECOND`,
4134 timestamp columns of picosecond precision will be returned with
4135                 full precision. Otherwise, they are truncated to microsecond
4136                 precision.
4137
4138 Returns:
4139 google.cloud.bigquery.table.RowIterator:
4140 Iterator of row data
4141 :class:`~google.cloud.bigquery.table.Row`-s. During each
4142 page, the iterator will have the ``total_rows`` attribute
4143 set, which counts the total number of rows **in the table**
4144 (this is distinct from the total number of rows in the
4145 current page: ``iterator.page.num_items``).
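
        Example (a minimal sketch; the table ID is illustrative)::

            from google.cloud import bigquery

            client = bigquery.Client()

            # The table ID below is an illustrative placeholder.
            rows = client.list_rows(
                "my_project.my_dataset.my_table", max_results=10
            )
            for row in rows:
                print(row)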
4146 """
4147 table = _table_arg_to_table(table, default_project=self.project)
4148
4149 if not isinstance(table, Table):
4150 raise TypeError(_NEED_TABLE_ARGUMENT)
4151
4152 schema = table.schema
4153
4154 # selected_fields can override the table schema.
4155 if selected_fields is not None:
4156 schema = selected_fields
4157
4158 # No schema, but no selected_fields. Assume the developer wants all
4159 # columns, so get the table resource for them rather than failing.
4160 elif len(schema) == 0:
4161 table = self.get_table(table.reference, retry=retry, timeout=timeout)
4162 schema = table.schema
4163
4164 params: Dict[str, Any] = {}
4165 if selected_fields is not None:
4166 params["selectedFields"] = ",".join(field.name for field in selected_fields)
4167 if start_index is not None:
4168 params["startIndex"] = start_index
4169
4170 # Cannot specify both use_int64_timestamp and timestamp_output_format.
4171 if timestamp_precision == enums.TimestampPrecision.PICOSECOND:
4172 params["formatOptions.timestampOutputFormat"] = "ISO8601_STRING"
4173 else:
4174 params["formatOptions.useInt64Timestamp"] = True
4175
4176 row_iterator = RowIterator(
4177 client=self,
4178 api_request=functools.partial(self._call_api, retry, timeout=timeout),
4179 path="%s/data" % (table.path,),
4180 schema=schema,
4181 page_token=page_token,
4182 max_results=max_results,
4183 page_size=page_size,
4184 extra_params=params,
4185 table=table,
4186 # Pass in selected_fields separately from schema so that full
4187 # tables can be fetched without a column filter.
4188 selected_fields=selected_fields,
4189 total_rows=getattr(table, "num_rows", None),
4190 project=table.project,
4191 location=table.location,
4192 )
4193 return row_iterator
4194
4195 def _list_rows_from_query_results(
4196 self,
4197 job_id: str,
4198 location: str,
4199 project: str,
4200 schema: Sequence[SchemaField],
4201 total_rows: Optional[int] = None,
4202 destination: Optional[Union[Table, TableReference, TableListItem, str]] = None,
4203 max_results: Optional[int] = None,
4204 start_index: Optional[int] = None,
4205 page_size: Optional[int] = None,
4206 retry: retries.Retry = DEFAULT_RETRY,
4207 timeout: TimeoutType = DEFAULT_TIMEOUT,
4208 query_id: Optional[str] = None,
4209 first_page_response: Optional[Dict[str, Any]] = None,
4210 num_dml_affected_rows: Optional[int] = None,
4211 query: Optional[str] = None,
4212 total_bytes_processed: Optional[int] = None,
4213 slot_millis: Optional[int] = None,
4214 created: Optional[datetime.datetime] = None,
4215 started: Optional[datetime.datetime] = None,
4216 ended: Optional[datetime.datetime] = None,
4217 ) -> RowIterator:
4218 """List the rows of a completed query.
4219 See
4220 https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/getQueryResults
4221 Args:
4222 job_id (str):
4223 ID of a query job.
4224 location (str): Location of the query job.
4225 project (str):
4226 ID of the project where the query job was run.
4227 schema (Sequence[google.cloud.bigquery.schema.SchemaField]):
4228 The fields expected in these query results. Used to convert
4229 from JSON to expected Python types.
4230 total_rows (Optional[int]):
4231 Total number of rows in the query results.
4232 destination (Optional[Union[ \
4233 google.cloud.bigquery.table.Table, \
4234 google.cloud.bigquery.table.TableListItem, \
4235 google.cloud.bigquery.table.TableReference, \
4236 str, \
4237 ]]):
4238 Destination table reference. Used to fetch the query results
4239 with the BigQuery Storage API.
4240 max_results (Optional[int]):
4241 Maximum number of rows to return across the whole iterator.
4242 start_index (Optional[int]):
4243 The zero-based index of the starting row to read.
4244 page_size (Optional[int]):
4245 The maximum number of rows in each page of results from this request.
4246 Non-positive values are ignored. Defaults to a sensible value set by the API.
4247 retry (Optional[google.api_core.retry.Retry]):
4248 How to retry the RPC.
4249 timeout (Optional[float]):
4250 The number of seconds to wait for the underlying HTTP transport
4251                 before using ``retry``. If set, this connection timeout is
4252                 raised to at least a minimum value, which prevents retrying
4253                 what would otherwise be a successful response.
4254 If multiple requests are made under the hood, ``timeout``
4255 applies to each individual request.
4256 query_id (Optional[str]):
4257 [Preview] ID of a completed query. This ID is auto-generated
4258 and not guaranteed to be populated.
4259 first_page_response (Optional[dict]):
4260 API response for the first page of results (if available).
4261 num_dml_affected_rows (Optional[int]):
4262 If this RowIterator is the result of a DML query, the number of
4263 rows that were affected.
4264 query (Optional[str]):
4265 The query text used.
4266 total_bytes_processed (Optional[int]):
4267                 Total bytes processed from job statistics, if present.
4268 slot_millis (Optional[int]):
4269                 Number of slot-milliseconds the user is actually billed for.
4270 created (Optional[datetime.datetime]):
4271 Datetime at which the job was created.
4272 started (Optional[datetime.datetime]):
4273 Datetime at which the job was started.
4274 ended (Optional[datetime.datetime]):
4275 Datetime at which the job finished.
4276
4277 Returns:
4278 google.cloud.bigquery.table.RowIterator:
4279 Iterator of row data
4280 :class:`~google.cloud.bigquery.table.Row`-s.
4281 """
4282 params: Dict[str, Any] = {
4283 "fields": _LIST_ROWS_FROM_QUERY_RESULTS_FIELDS,
4284 "location": location,
4285 }
4286
4287 if timeout is not None:
4288 if not isinstance(timeout, (int, float)):
4289 timeout = _MIN_GET_QUERY_RESULTS_TIMEOUT
4290 else:
4291 timeout = max(timeout, _MIN_GET_QUERY_RESULTS_TIMEOUT)
4292
4293 if start_index is not None:
4294 params["startIndex"] = start_index
4295
4296 params["formatOptions.useInt64Timestamp"] = True
4297 row_iterator = RowIterator(
4298 client=self,
4299 api_request=functools.partial(self._call_api, retry, timeout=timeout),
4300 path=f"/projects/{project}/queries/{job_id}",
4301 schema=schema,
4302 max_results=max_results,
4303 page_size=page_size,
4304 table=destination,
4305 extra_params=params,
4306 total_rows=total_rows,
4307 project=project,
4308 location=location,
4309 job_id=job_id,
4310 query_id=query_id,
4311 first_page_response=first_page_response,
4312 num_dml_affected_rows=num_dml_affected_rows,
4313 query=query,
4314 total_bytes_processed=total_bytes_processed,
4315 slot_millis=slot_millis,
4316 created=created,
4317 started=started,
4318 ended=ended,
4319 )
4320 return row_iterator
4321
4322 def _schema_from_json_file_object(self, file_obj):
4323 """Helper function for schema_from_json that takes a
4324 file object that describes a table schema.
4325
4326 Returns:
4327 List of schema field objects.
4328 """
4329 json_data = json.load(file_obj)
4330 return [SchemaField.from_api_repr(field) for field in json_data]
4331
4332 def _schema_to_json_file_object(self, schema_list, file_obj):
4333 """Helper function for schema_to_json that takes a schema list and file
4334 object and writes the schema list to the file object with json.dump
4335 """
4336 json.dump(schema_list, file_obj, indent=2, sort_keys=True)
4337
4338 def schema_from_json(self, file_or_path: "PathType") -> List[SchemaField]:
4339         """Takes a file object or file path containing JSON that describes
4340         a table schema.
4341
4342 Returns:
4343 List[SchemaField]:
4344 List of :class:`~google.cloud.bigquery.schema.SchemaField` objects.
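
        Example (a minimal sketch; ``schema.json`` is an illustrative path)::

            from google.cloud import bigquery

            client = bigquery.Client()

            # The file is expected to hold a JSON array of field definitions,
            # for example:
            # [{"name": "full_name", "type": "STRING", "mode": "REQUIRED"}]
            schema = client.schema_from_json("schema.json")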
4345 """
4346 if isinstance(file_or_path, io.IOBase):
4347 return self._schema_from_json_file_object(file_or_path)
4348
4349 with open(file_or_path) as file_obj:
4350 return self._schema_from_json_file_object(file_obj)
4351
4352 def schema_to_json(
4353 self, schema_list: Sequence[SchemaField], destination: "PathType"
4354 ):
4355 """Takes a list of schema field objects.
4356
4357         Serializes the list of schema field objects as JSON to a file.
4358
4359         ``destination`` may be a file path or a file object.
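
        Example (a minimal sketch; the field list and output path are
        illustrative)::

            from google.cloud import bigquery

            client = bigquery.Client()

            # The schema fields and output path below are illustrative.
            schema = [
                bigquery.SchemaField("full_name", "STRING", mode="REQUIRED"),
                bigquery.SchemaField("age", "INTEGER", mode="NULLABLE"),
            ]
            client.schema_to_json(schema, "schema.json")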
4360 """
4361 json_schema_list = [f.to_api_repr() for f in schema_list]
4362
4363 if isinstance(destination, io.IOBase):
4364 return self._schema_to_json_file_object(json_schema_list, destination)
4365
4366 with open(destination, mode="w") as file_obj:
4367 return self._schema_to_json_file_object(json_schema_list, file_obj)
4368
4369 def __enter__(self):
4370 return self
4371
4372 def __exit__(self, exc_type, exc_value, traceback):
4373 self.close()
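
    # Usage sketch (illustrative): the client can also be used as a context
    # manager, which calls ``close()`` on exit to release the underlying HTTP
    # transport:
    #
    #     with Client() as client:
    #         client.query_and_wait("SELECT 1")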
4374
4375
4376# pylint: disable=unused-argument
4377def _item_to_project(iterator, resource):
4378 """Convert a JSON project to the native object.
4379
4380 Args:
4381 iterator (google.api_core.page_iterator.Iterator): The iterator that is currently in use.
4382
4383 resource (Dict): An item to be converted to a project.
4384
4385 Returns:
4386 google.cloud.bigquery.client.Project: The next project in the page.
4387 """
4388 return Project.from_api_repr(resource)
4389
4390
4391# pylint: enable=unused-argument
4392
4393
4394def _item_to_dataset(iterator, resource):
4395 """Convert a JSON dataset to the native object.
4396
4397 Args:
4398 iterator (google.api_core.page_iterator.Iterator): The iterator that is currently in use.
4399
4400 resource (Dict): An item to be converted to a dataset.
4401
4402 Returns:
4403 google.cloud.bigquery.dataset.DatasetListItem: The next dataset in the page.
4404 """
4405 return DatasetListItem(resource)
4406
4407
4408def _item_to_job(iterator, resource):
4409 """Convert a JSON job to the native object.
4410
4411 Args:
4412 iterator (google.api_core.page_iterator.Iterator): The iterator that is currently in use.
4413
4414 resource (Dict): An item to be converted to a job.
4415
4416 Returns:
4417 job instance: The next job in the page.
4418 """
4419 return iterator.client.job_from_resource(resource)
4420
4421
4422def _item_to_model(iterator, resource):
4423 """Convert a JSON model to the native object.
4424
4425 Args:
4426 iterator (google.api_core.page_iterator.Iterator):
4427 The iterator that is currently in use.
4428 resource (Dict): An item to be converted to a model.
4429
4430 Returns:
4431 google.cloud.bigquery.model.Model: The next model in the page.
4432 """
4433 return Model.from_api_repr(resource)
4434
4435
4436def _item_to_routine(iterator, resource):
4437     """Convert a JSON routine to the native object.
4438
4439 Args:
4440 iterator (google.api_core.page_iterator.Iterator):
4441 The iterator that is currently in use.
4442 resource (Dict): An item to be converted to a routine.
4443
4444 Returns:
4445 google.cloud.bigquery.routine.Routine: The next routine in the page.
4446 """
4447 return Routine.from_api_repr(resource)
4448
4449
4450def _item_to_table(iterator, resource):
4451 """Convert a JSON table to the native object.
4452
4453 Args:
4454 iterator (google.api_core.page_iterator.Iterator): The iterator that is currently in use.
4455
4456 resource (Dict): An item to be converted to a table.
4457
4458 Returns:
4459 google.cloud.bigquery.table.Table: The next table in the page.
4460 """
4461 return TableListItem(resource)
4462
4463
4464def _extract_job_reference(job, project=None, location=None):
4465 """Extract fully-qualified job reference from a job-like object.
4466
4467 Args:
4468         job (Union[ \
4469 str, \
4470 google.cloud.bigquery.job.LoadJob, \
4471 google.cloud.bigquery.job.CopyJob, \
4472 google.cloud.bigquery.job.ExtractJob, \
4473 google.cloud.bigquery.job.QueryJob \
4474 ]): Job identifier.
4475 project (Optional[str]):
4476             Project where the job was run. Ignored if ``job`` is a job
4477 object.
4478 location (Optional[str]):
4479             Location where the job was run. Ignored if ``job`` is a job
4480 object.
4481
4482 Returns:
4483 Tuple[str, str, str]: ``(project, location, job_id)``
4484 """
4485 if hasattr(job, "job_id"):
4486 project = job.project
4487 job_id = job.job_id
4488 location = job.location
4489 else:
4490 job_id = job
4491
4492 return (project, location, job_id)
4493
4494
4495def _check_mode(stream):
4496 """Check that a stream was opened in read-binary mode.
4497
4498 Args:
4499 stream (IO[bytes]): A bytes IO object open for reading.
4500
4501 Raises:
4502 ValueError:
4503             if ``stream`` has a ``mode`` attribute whose value
4504             is not one of ``rb``, ``r+b`` or ``rb+``.
4505 """
4506 mode = getattr(stream, "mode", None)
4507
4508 if isinstance(stream, gzip.GzipFile):
4509 if mode != gzip.READ: # pytype: disable=module-attr
4510 raise ValueError(
4511 "Cannot upload gzip files opened in write mode: use "
4512 "gzip.GzipFile(filename, mode='rb')"
4513 )
4514 else:
4515 if mode is not None and mode not in ("rb", "r+b", "rb+"):
4516 raise ValueError(
4517 "Cannot upload files opened in text mode: use "
4518 "open(filename, mode='rb') or open(filename, mode='r+b')"
4519 )
4520
4521
4522def _get_upload_headers(user_agent):
4523 """Get the headers for an upload request.
4524
4525 Args:
4526 user_agent (str): The user-agent for requests.
4527
4528 Returns:
4529 Dict: The headers to be used for the request.
4530 """
4531 return {
4532 "Accept": "application/json",
4533 "Accept-Encoding": "gzip, deflate",
4534 "User-Agent": user_agent,
4535 "content-type": "application/json; charset=UTF-8",
4536 }
4537
4538
4539def _add_server_timeout_header(headers: Optional[Dict[str, str]], kwargs):
4540 timeout = kwargs.get("timeout")
4541 if timeout is not None:
4542 if headers is None:
4543 headers = {}
4544 headers[TIMEOUT_HEADER] = str(timeout)
4545
4546 if headers:
4547 kwargs["headers"] = headers
4548
4549 return kwargs