Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/google/cloud/bigquery/client.py: 20%
858 statements
« prev ^ index » next coverage.py v7.2.2, created at 2023-03-26 06:07 +0000
« prev ^ index » next coverage.py v7.2.2, created at 2023-03-26 06:07 +0000
1# Copyright 2015 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
15"""Client for interacting with the Google BigQuery API."""
17from __future__ import absolute_import
18from __future__ import division
20from collections import abc as collections_abc
21import copy
22import datetime
23import functools
24import gzip
25import io
26import itertools
27import json
28import math
29import os
30import packaging.version
31import tempfile
32import typing
33from typing import (
34 Any,
35 Dict,
36 IO,
37 Iterable,
38 Mapping,
39 List,
40 Optional,
41 Sequence,
42 Tuple,
43 Union,
44)
45import uuid
46import warnings
48try:
49 import pyarrow # type: ignore
51 _PYARROW_VERSION = packaging.version.parse(pyarrow.__version__)
52except ImportError: # pragma: NO COVER
53 pyarrow = None
55from google import resumable_media # type: ignore
56from google.resumable_media.requests import MultipartUpload # type: ignore
57from google.resumable_media.requests import ResumableUpload
59import google.api_core.client_options
60import google.api_core.exceptions as core_exceptions
61from google.api_core.iam import Policy
62from google.api_core import page_iterator
63from google.api_core import retry as retries
64import google.cloud._helpers # type: ignore
65from google.cloud import exceptions # pytype: disable=import-error
66from google.cloud.client import ClientWithProject # type: ignore # pytype: disable=import-error
68try:
69 from google.cloud.bigquery_storage_v1.services.big_query_read.client import (
70 DEFAULT_CLIENT_INFO as DEFAULT_BQSTORAGE_CLIENT_INFO,
71 )
72except ImportError:
73 DEFAULT_BQSTORAGE_CLIENT_INFO = None # type: ignore
76from google.cloud.bigquery import _job_helpers
77from google.cloud.bigquery._job_helpers import make_job_id as _make_job_id
78from google.cloud.bigquery._helpers import _get_sub_prop
79from google.cloud.bigquery._helpers import _record_field_to_json
80from google.cloud.bigquery._helpers import _str_or_none
81from google.cloud.bigquery._helpers import _verify_job_config_type
82from google.cloud.bigquery._helpers import _get_bigquery_host
83from google.cloud.bigquery._helpers import BQ_STORAGE_VERSIONS
84from google.cloud.bigquery._helpers import _DEFAULT_HOST
85from google.cloud.bigquery._http import Connection
86from google.cloud.bigquery import _pandas_helpers
87from google.cloud.bigquery.dataset import Dataset
88from google.cloud.bigquery.dataset import DatasetListItem
89from google.cloud.bigquery.dataset import DatasetReference
90from google.cloud.bigquery import enums
91from google.cloud.bigquery.enums import AutoRowIDs
92from google.cloud.bigquery.exceptions import LegacyBigQueryStorageError
93from google.cloud.bigquery.opentelemetry_tracing import create_span
94from google.cloud.bigquery import job
95from google.cloud.bigquery.job import (
96 CopyJob,
97 CopyJobConfig,
98 ExtractJob,
99 ExtractJobConfig,
100 LoadJob,
101 LoadJobConfig,
102 QueryJob,
103 QueryJobConfig,
104)
105from google.cloud.bigquery.model import Model
106from google.cloud.bigquery.model import ModelReference
107from google.cloud.bigquery.model import _model_arg_to_model_ref
108from google.cloud.bigquery.query import _QueryResults
109from google.cloud.bigquery.retry import (
110 DEFAULT_JOB_RETRY,
111 DEFAULT_RETRY,
112 DEFAULT_TIMEOUT,
113)
114from google.cloud.bigquery.routine import Routine
115from google.cloud.bigquery.routine import RoutineReference
116from google.cloud.bigquery.schema import SchemaField
117from google.cloud.bigquery.table import _table_arg_to_table
118from google.cloud.bigquery.table import _table_arg_to_table_ref
119from google.cloud.bigquery.table import Table
120from google.cloud.bigquery.table import TableListItem
121from google.cloud.bigquery.table import TableReference
122from google.cloud.bigquery.table import RowIterator
123from google.cloud.bigquery.format_options import ParquetOptions
124from google.cloud.bigquery import _helpers
126TimeoutType = Union[float, None]
127ResumableTimeoutType = Union[
128 None, float, Tuple[float, float]
129] # for resumable media methods
131if typing.TYPE_CHECKING: # pragma: NO COVER
132 # os.PathLike is only subscriptable in Python 3.9+, thus shielding with a condition.
133 PathType = Union[str, bytes, os.PathLike[str], os.PathLike[bytes]]
134 import pandas # type: ignore
135 import requests # required by api-core
137_DEFAULT_CHUNKSIZE = 100 * 1024 * 1024 # 100 MB
138_MAX_MULTIPART_SIZE = 5 * 1024 * 1024
139_DEFAULT_NUM_RETRIES = 6
140_BASE_UPLOAD_TEMPLATE = "{host}/upload/bigquery/v2/projects/{project}/jobs?uploadType="
141_MULTIPART_URL_TEMPLATE = _BASE_UPLOAD_TEMPLATE + "multipart"
142_RESUMABLE_URL_TEMPLATE = _BASE_UPLOAD_TEMPLATE + "resumable"
143_GENERIC_CONTENT_TYPE = "*/*"
144_READ_LESS_THAN_SIZE = (
145 "Size {:d} was specified but the file-like object only had " "{:d} bytes remaining."
146)
147_NEED_TABLE_ARGUMENT = (
148 "The table argument should be a table ID string, Table, or TableReference"
149)
150_LIST_ROWS_FROM_QUERY_RESULTS_FIELDS = "jobReference,totalRows,pageToken,rows"
152# In microbenchmarks, it's been shown that even in ideal conditions (query
153# finished, local data), requests to getQueryResults can take 10+ seconds.
154# In less-than-ideal situations, the response can take even longer, as it must
155# be able to download a full 100+ MB row in that time. Don't let the
156# connection timeout before data can be downloaded.
157# https://github.com/googleapis/python-bigquery/issues/438
158_MIN_GET_QUERY_RESULTS_TIMEOUT = 120
160TIMEOUT_HEADER = "X-Server-Timeout"
162# https://github.com/googleapis/python-bigquery/issues/781#issuecomment-883497414
163_PYARROW_BAD_VERSIONS = frozenset([packaging.version.Version("2.0.0")])
166class Project(object):
167 """Wrapper for resource describing a BigQuery project.
169 Args:
170 project_id (str): Opaque ID of the project
172 numeric_id (int): Numeric ID of the project
174 friendly_name (str): Display name of the project
175 """
177 def __init__(self, project_id, numeric_id, friendly_name):
178 self.project_id = project_id
179 self.numeric_id = numeric_id
180 self.friendly_name = friendly_name
182 @classmethod
183 def from_api_repr(cls, resource):
184 """Factory: construct an instance from a resource dict."""
185 return cls(resource["id"], resource["numericId"], resource["friendlyName"])
188class Client(ClientWithProject):
189 """Client to bundle configuration needed for API requests.
191 Args:
192 project (Optional[str]):
193 Project ID for the project which the client acts on behalf of.
194 Will be passed when creating a dataset / job. If not passed,
195 falls back to the default inferred from the environment.
196 credentials (Optional[google.auth.credentials.Credentials]):
197 The OAuth2 Credentials to use for this client. If not passed
198 (and if no ``_http`` object is passed), falls back to the
199 default inferred from the environment.
200 _http (Optional[requests.Session]):
201 HTTP object to make requests. Can be any object that
202 defines ``request()`` with the same interface as
203 :meth:`requests.Session.request`. If not passed, an ``_http``
204 object is created that is bound to the ``credentials`` for the
205 current object.
206 This parameter should be considered private, and could change in
207 the future.
208 location (Optional[str]):
209 Default location for jobs / datasets / tables.
210 default_query_job_config (Optional[google.cloud.bigquery.job.QueryJobConfig]):
211 Default ``QueryJobConfig``.
212 Will be merged into job configs passed into the ``query`` method.
213 default_load_job_config (Optional[google.cloud.bigquery.job.LoadJobConfig]):
214 Default ``LoadJobConfig``.
215 Will be merged into job configs passed into the ``load_table_*`` methods.
216 client_info (Optional[google.api_core.client_info.ClientInfo]):
217 The client info used to send a user-agent string along with API
218 requests. If ``None``, then default info will be used. Generally,
219 you only need to set this if you're developing your own library
220 or partner tool.
221 client_options (Optional[Union[google.api_core.client_options.ClientOptions, Dict]]):
222 Client options used to set user options on the client. API Endpoint
223 should be set through client_options.
225 Raises:
226 google.auth.exceptions.DefaultCredentialsError:
227 Raised if ``credentials`` is not specified and the library fails
228 to acquire default credentials.
229 """
231 SCOPE = ("https://www.googleapis.com/auth/cloud-platform",) # type: ignore
232 """The scopes required for authenticating as a BigQuery consumer."""
234 def __init__(
235 self,
236 project=None,
237 credentials=None,
238 _http=None,
239 location=None,
240 default_query_job_config=None,
241 default_load_job_config=None,
242 client_info=None,
243 client_options=None,
244 ) -> None:
245 super(Client, self).__init__(
246 project=project,
247 credentials=credentials,
248 client_options=client_options,
249 _http=_http,
250 )
252 kw_args = {"client_info": client_info}
253 bq_host = _get_bigquery_host()
254 kw_args["api_endpoint"] = bq_host if bq_host != _DEFAULT_HOST else None
255 if client_options:
256 if type(client_options) == dict:
257 client_options = google.api_core.client_options.from_dict(
258 client_options
259 )
260 if client_options.api_endpoint:
261 api_endpoint = client_options.api_endpoint
262 kw_args["api_endpoint"] = api_endpoint
264 self._connection = Connection(self, **kw_args)
265 self._location = location
266 self._default_query_job_config = copy.deepcopy(default_query_job_config)
267 self._default_load_job_config = copy.deepcopy(default_load_job_config)
269 @property
270 def location(self):
271 """Default location for jobs / datasets / tables."""
272 return self._location
274 @property
275 def default_query_job_config(self):
276 """Default ``QueryJobConfig``.
277 Will be merged into job configs passed into the ``query`` method.
278 """
279 return self._default_query_job_config
281 @default_query_job_config.setter
282 def default_query_job_config(self, value: QueryJobConfig):
283 self._default_query_job_config = copy.deepcopy(value)
285 @property
286 def default_load_job_config(self):
287 """Default ``LoadJobConfig``.
288 Will be merged into job configs passed into the ``load_table_*`` methods.
289 """
290 return self._default_load_job_config
292 @default_load_job_config.setter
293 def default_load_job_config(self, value: LoadJobConfig):
294 self._default_load_job_config = copy.deepcopy(value)
296 def close(self):
297 """Close the underlying transport objects, releasing system resources.
299 .. note::
301 The client instance can be used for making additional requests even
302 after closing, in which case the underlying connections are
303 automatically re-created.
304 """
305 self._http._auth_request.session.close()
306 self._http.close()
308 def get_service_account_email(
309 self,
310 project: str = None,
311 retry: retries.Retry = DEFAULT_RETRY,
312 timeout: TimeoutType = DEFAULT_TIMEOUT,
313 ) -> str:
314 """Get the email address of the project's BigQuery service account
316 Note:
317 This is the service account that BigQuery uses to manage tables
318 encrypted by a key in KMS.
320 Args:
321 project (Optional[str]):
322 Project ID to use for retreiving service account email.
323 Defaults to the client's project.
324 retry (Optional[google.api_core.retry.Retry]): How to retry the RPC.
325 timeout (Optional[float]):
326 The number of seconds to wait for the underlying HTTP transport
327 before using ``retry``.
329 Returns:
330 str: service account email address
332 Example:
334 >>> from google.cloud import bigquery
335 >>> client = bigquery.Client()
336 >>> client.get_service_account_email()
337 my_service_account@my-project.iam.gserviceaccount.com
339 """
340 if project is None:
341 project = self.project
342 path = "/projects/%s/serviceAccount" % (project,)
343 span_attributes = {"path": path}
344 api_response = self._call_api(
345 retry,
346 span_name="BigQuery.getServiceAccountEmail",
347 span_attributes=span_attributes,
348 method="GET",
349 path=path,
350 timeout=timeout,
351 )
352 return api_response["email"]
354 def list_projects(
355 self,
356 max_results: Optional[int] = None,
357 page_token: str = None,
358 retry: retries.Retry = DEFAULT_RETRY,
359 timeout: TimeoutType = DEFAULT_TIMEOUT,
360 page_size: Optional[int] = None,
361 ) -> page_iterator.Iterator:
362 """List projects for the project associated with this client.
364 See
365 https://cloud.google.com/bigquery/docs/reference/rest/v2/projects/list
367 Args:
368 max_results (Optional[int]):
369 Maximum number of projects to return.
370 Defaults to a value set by the API.
372 page_token (Optional[str]):
373 Token representing a cursor into the projects. If not passed,
374 the API will return the first page of projects. The token marks
375 the beginning of the iterator to be returned and the value of
376 the ``page_token`` can be accessed at ``next_page_token`` of the
377 :class:`~google.api_core.page_iterator.HTTPIterator`.
379 retry (Optional[google.api_core.retry.Retry]): How to retry the RPC.
381 timeout (Optional[float]):
382 The number of seconds to wait for the underlying HTTP transport
383 before using ``retry``.
385 page_size (Optional[int]):
386 Maximum number of projects to return in each page.
387 Defaults to a value set by the API.
389 Returns:
390 google.api_core.page_iterator.Iterator:
391 Iterator of :class:`~google.cloud.bigquery.client.Project`
392 accessible to the current client.
393 """
394 span_attributes = {"path": "/projects"}
396 def api_request(*args, **kwargs):
397 return self._call_api(
398 retry,
399 span_name="BigQuery.listProjects",
400 span_attributes=span_attributes,
401 *args,
402 timeout=timeout,
403 **kwargs,
404 )
406 return page_iterator.HTTPIterator(
407 client=self,
408 api_request=api_request,
409 path="/projects",
410 item_to_value=_item_to_project,
411 items_key="projects",
412 page_token=page_token,
413 max_results=max_results,
414 page_size=page_size,
415 )
417 def list_datasets(
418 self,
419 project: str = None,
420 include_all: bool = False,
421 filter: str = None,
422 max_results: Optional[int] = None,
423 page_token: str = None,
424 retry: retries.Retry = DEFAULT_RETRY,
425 timeout: TimeoutType = DEFAULT_TIMEOUT,
426 page_size: Optional[int] = None,
427 ) -> page_iterator.Iterator:
428 """List datasets for the project associated with this client.
430 See
431 https://cloud.google.com/bigquery/docs/reference/rest/v2/datasets/list
433 Args:
434 project (Optional[str]):
435 Project ID to use for retreiving datasets. Defaults to the
436 client's project.
437 include_all (Optional[bool]):
438 True if results include hidden datasets. Defaults to False.
439 filter (Optional[str]):
440 An expression for filtering the results by label.
441 For syntax, see
442 https://cloud.google.com/bigquery/docs/reference/rest/v2/datasets/list#body.QUERY_PARAMETERS.filter
443 max_results (Optional[int]):
444 Maximum number of datasets to return.
445 page_token (Optional[str]):
446 Token representing a cursor into the datasets. If not passed,
447 the API will return the first page of datasets. The token marks
448 the beginning of the iterator to be returned and the value of
449 the ``page_token`` can be accessed at ``next_page_token`` of the
450 :class:`~google.api_core.page_iterator.HTTPIterator`.
451 retry (Optional[google.api_core.retry.Retry]):
452 How to retry the RPC.
453 timeout (Optional[float]):
454 The number of seconds to wait for the underlying HTTP transport
455 before using ``retry``.
456 page_size (Optional[int]):
457 Maximum number of datasets to return per page.
459 Returns:
460 google.api_core.page_iterator.Iterator:
461 Iterator of :class:`~google.cloud.bigquery.dataset.DatasetListItem`.
462 associated with the project.
463 """
464 extra_params: Dict[str, Any] = {}
465 if project is None:
466 project = self.project
467 if include_all:
468 extra_params["all"] = True
469 if filter:
470 # TODO: consider supporting a dict of label -> value for filter,
471 # and converting it into a string here.
472 extra_params["filter"] = filter
473 path = "/projects/%s/datasets" % (project,)
475 span_attributes = {"path": path}
477 def api_request(*args, **kwargs):
479 return self._call_api(
480 retry,
481 span_name="BigQuery.listDatasets",
482 span_attributes=span_attributes,
483 *args,
484 timeout=timeout,
485 **kwargs,
486 )
488 return page_iterator.HTTPIterator(
489 client=self,
490 api_request=api_request,
491 path=path,
492 item_to_value=_item_to_dataset,
493 items_key="datasets",
494 page_token=page_token,
495 max_results=max_results,
496 extra_params=extra_params,
497 page_size=page_size,
498 )
500 def dataset(self, dataset_id: str, project: str = None) -> DatasetReference:
501 """Deprecated: Construct a reference to a dataset.
503 .. deprecated:: 1.24.0
504 Construct a
505 :class:`~google.cloud.bigquery.dataset.DatasetReference` using its
506 constructor or use a string where previously a reference object
507 was used.
509 As of ``google-cloud-bigquery`` version 1.7.0, all client methods
510 that take a
511 :class:`~google.cloud.bigquery.dataset.DatasetReference` or
512 :class:`~google.cloud.bigquery.table.TableReference` also take a
513 string in standard SQL format, e.g. ``project.dataset_id`` or
514 ``project.dataset_id.table_id``.
516 Args:
517 dataset_id (str): ID of the dataset.
519 project (Optional[str]):
520 Project ID for the dataset (defaults to the project of the client).
522 Returns:
523 google.cloud.bigquery.dataset.DatasetReference:
524 a new ``DatasetReference`` instance.
525 """
526 if project is None:
527 project = self.project
529 warnings.warn(
530 "Client.dataset is deprecated and will be removed in a future version. "
531 "Use a string like 'my_project.my_dataset' or a "
532 "cloud.google.bigquery.DatasetReference object, instead.",
533 PendingDeprecationWarning,
534 stacklevel=2,
535 )
536 return DatasetReference(project, dataset_id)
538 def _ensure_bqstorage_client(
539 self,
540 bqstorage_client: Optional[
541 "google.cloud.bigquery_storage.BigQueryReadClient"
542 ] = None,
543 client_options: Optional[google.api_core.client_options.ClientOptions] = None,
544 client_info: Optional[
545 "google.api_core.gapic_v1.client_info.ClientInfo"
546 ] = DEFAULT_BQSTORAGE_CLIENT_INFO,
547 ) -> Optional["google.cloud.bigquery_storage.BigQueryReadClient"]:
548 """Create a BigQuery Storage API client using this client's credentials.
550 Args:
551 bqstorage_client:
552 An existing BigQuery Storage client instance. If ``None``, a new
553 instance is created and returned.
554 client_options:
555 Custom options used with a new BigQuery Storage client instance if one
556 is created.
557 client_info:
558 The client info used with a new BigQuery Storage client instance if one
559 is created.
561 Returns:
562 A BigQuery Storage API client.
563 """
564 try:
565 from google.cloud import bigquery_storage # type: ignore
566 except ImportError:
567 warnings.warn(
568 "Cannot create BigQuery Storage client, the dependency "
569 "google-cloud-bigquery-storage is not installed."
570 )
571 return None
573 try:
574 BQ_STORAGE_VERSIONS.verify_version()
575 except LegacyBigQueryStorageError as exc:
576 warnings.warn(str(exc))
577 return None
578 if bqstorage_client is None:
579 bqstorage_client = bigquery_storage.BigQueryReadClient(
580 credentials=self._credentials,
581 client_options=client_options,
582 client_info=client_info, # type: ignore # (None is also accepted)
583 )
585 return bqstorage_client
587 def _dataset_from_arg(self, dataset) -> Union[Dataset, DatasetReference]:
588 if isinstance(dataset, str):
589 dataset = DatasetReference.from_string(
590 dataset, default_project=self.project
591 )
593 if not isinstance(dataset, (Dataset, DatasetReference)):
594 if isinstance(dataset, DatasetListItem):
595 dataset = dataset.reference
596 else:
597 raise TypeError(
598 "dataset must be a Dataset, DatasetReference, DatasetListItem,"
599 " or string"
600 )
601 return dataset
603 def create_dataset(
604 self,
605 dataset: Union[str, Dataset, DatasetReference, DatasetListItem],
606 exists_ok: bool = False,
607 retry: retries.Retry = DEFAULT_RETRY,
608 timeout: TimeoutType = DEFAULT_TIMEOUT,
609 ) -> Dataset:
610 """API call: create the dataset via a POST request.
612 See
613 https://cloud.google.com/bigquery/docs/reference/rest/v2/datasets/insert
615 Args:
616 dataset (Union[ \
617 google.cloud.bigquery.dataset.Dataset, \
618 google.cloud.bigquery.dataset.DatasetReference, \
619 google.cloud.bigquery.dataset.DatasetListItem, \
620 str, \
621 ]):
622 A :class:`~google.cloud.bigquery.dataset.Dataset` to create.
623 If ``dataset`` is a reference, an empty dataset is created
624 with the specified ID and client's default location.
625 exists_ok (Optional[bool]):
626 Defaults to ``False``. If ``True``, ignore "already exists"
627 errors when creating the dataset.
628 retry (Optional[google.api_core.retry.Retry]):
629 How to retry the RPC.
630 timeout (Optional[float]):
631 The number of seconds to wait for the underlying HTTP transport
632 before using ``retry``.
634 Returns:
635 google.cloud.bigquery.dataset.Dataset:
636 A new ``Dataset`` returned from the API.
638 Raises:
639 google.cloud.exceptions.Conflict:
640 If the dataset already exists.
642 Example:
644 >>> from google.cloud import bigquery
645 >>> client = bigquery.Client()
646 >>> dataset = bigquery.Dataset('my_project.my_dataset')
647 >>> dataset = client.create_dataset(dataset)
649 """
650 dataset = self._dataset_from_arg(dataset)
651 if isinstance(dataset, DatasetReference):
652 dataset = Dataset(dataset)
654 path = "/projects/%s/datasets" % (dataset.project,)
656 data = dataset.to_api_repr()
657 if data.get("location") is None and self.location is not None:
658 data["location"] = self.location
660 try:
661 span_attributes = {"path": path}
663 api_response = self._call_api(
664 retry,
665 span_name="BigQuery.createDataset",
666 span_attributes=span_attributes,
667 method="POST",
668 path=path,
669 data=data,
670 timeout=timeout,
671 )
672 return Dataset.from_api_repr(api_response)
673 except core_exceptions.Conflict:
674 if not exists_ok:
675 raise
676 return self.get_dataset(dataset.reference, retry=retry)
678 def create_routine(
679 self,
680 routine: Routine,
681 exists_ok: bool = False,
682 retry: retries.Retry = DEFAULT_RETRY,
683 timeout: TimeoutType = DEFAULT_TIMEOUT,
684 ) -> Routine:
685 """[Beta] Create a routine via a POST request.
687 See
688 https://cloud.google.com/bigquery/docs/reference/rest/v2/routines/insert
690 Args:
691 routine (google.cloud.bigquery.routine.Routine):
692 A :class:`~google.cloud.bigquery.routine.Routine` to create.
693 The dataset that the routine belongs to must already exist.
694 exists_ok (Optional[bool]):
695 Defaults to ``False``. If ``True``, ignore "already exists"
696 errors when creating the routine.
697 retry (Optional[google.api_core.retry.Retry]):
698 How to retry the RPC.
699 timeout (Optional[float]):
700 The number of seconds to wait for the underlying HTTP transport
701 before using ``retry``.
703 Returns:
704 google.cloud.bigquery.routine.Routine:
705 A new ``Routine`` returned from the service.
707 Raises:
708 google.cloud.exceptions.Conflict:
709 If the routine already exists.
710 """
711 reference = routine.reference
712 path = "/projects/{}/datasets/{}/routines".format(
713 reference.project, reference.dataset_id
714 )
715 resource = routine.to_api_repr()
716 try:
717 span_attributes = {"path": path}
718 api_response = self._call_api(
719 retry,
720 span_name="BigQuery.createRoutine",
721 span_attributes=span_attributes,
722 method="POST",
723 path=path,
724 data=resource,
725 timeout=timeout,
726 )
727 return Routine.from_api_repr(api_response)
728 except core_exceptions.Conflict:
729 if not exists_ok:
730 raise
731 return self.get_routine(routine.reference, retry=retry)
733 def create_table(
734 self,
735 table: Union[str, Table, TableReference, TableListItem],
736 exists_ok: bool = False,
737 retry: retries.Retry = DEFAULT_RETRY,
738 timeout: TimeoutType = DEFAULT_TIMEOUT,
739 ) -> Table:
740 """API call: create a table via a PUT request
742 See
743 https://cloud.google.com/bigquery/docs/reference/rest/v2/tables/insert
745 Args:
746 table (Union[ \
747 google.cloud.bigquery.table.Table, \
748 google.cloud.bigquery.table.TableReference, \
749 google.cloud.bigquery.table.TableListItem, \
750 str, \
751 ]):
752 A :class:`~google.cloud.bigquery.table.Table` to create.
753 If ``table`` is a reference, an empty table is created
754 with the specified ID. The dataset that the table belongs to
755 must already exist.
756 exists_ok (Optional[bool]):
757 Defaults to ``False``. If ``True``, ignore "already exists"
758 errors when creating the table.
759 retry (Optional[google.api_core.retry.Retry]):
760 How to retry the RPC.
761 timeout (Optional[float]):
762 The number of seconds to wait for the underlying HTTP transport
763 before using ``retry``.
765 Returns:
766 google.cloud.bigquery.table.Table:
767 A new ``Table`` returned from the service.
769 Raises:
770 google.cloud.exceptions.Conflict:
771 If the table already exists.
772 """
773 table = _table_arg_to_table(table, default_project=self.project)
774 dataset_id = table.dataset_id
775 path = "/projects/%s/datasets/%s/tables" % (table.project, dataset_id)
776 data = table.to_api_repr()
777 try:
778 span_attributes = {"path": path, "dataset_id": dataset_id}
779 api_response = self._call_api(
780 retry,
781 span_name="BigQuery.createTable",
782 span_attributes=span_attributes,
783 method="POST",
784 path=path,
785 data=data,
786 timeout=timeout,
787 )
788 return Table.from_api_repr(api_response)
789 except core_exceptions.Conflict:
790 if not exists_ok:
791 raise
792 return self.get_table(table.reference, retry=retry)
794 def _call_api(
795 self,
796 retry,
797 span_name=None,
798 span_attributes=None,
799 job_ref=None,
800 headers: Optional[Dict[str, str]] = None,
801 **kwargs,
802 ):
803 kwargs = _add_server_timeout_header(headers, kwargs)
804 call = functools.partial(self._connection.api_request, **kwargs)
806 if retry:
807 call = retry(call)
809 if span_name is not None:
810 with create_span(
811 name=span_name, attributes=span_attributes, client=self, job_ref=job_ref
812 ):
813 return call()
815 return call()
817 def get_dataset(
818 self,
819 dataset_ref: Union[DatasetReference, str],
820 retry: retries.Retry = DEFAULT_RETRY,
821 timeout: TimeoutType = DEFAULT_TIMEOUT,
822 ) -> Dataset:
823 """Fetch the dataset referenced by ``dataset_ref``
825 Args:
826 dataset_ref (Union[ \
827 google.cloud.bigquery.dataset.DatasetReference, \
828 str, \
829 ]):
830 A reference to the dataset to fetch from the BigQuery API.
831 If a string is passed in, this method attempts to create a
832 dataset reference from a string using
833 :func:`~google.cloud.bigquery.dataset.DatasetReference.from_string`.
834 retry (Optional[google.api_core.retry.Retry]):
835 How to retry the RPC.
836 timeout (Optional[float]):
837 The number of seconds to wait for the underlying HTTP transport
838 before using ``retry``.
840 Returns:
841 google.cloud.bigquery.dataset.Dataset:
842 A ``Dataset`` instance.
843 """
844 if isinstance(dataset_ref, str):
845 dataset_ref = DatasetReference.from_string(
846 dataset_ref, default_project=self.project
847 )
848 path = dataset_ref.path
849 span_attributes = {"path": path}
850 api_response = self._call_api(
851 retry,
852 span_name="BigQuery.getDataset",
853 span_attributes=span_attributes,
854 method="GET",
855 path=path,
856 timeout=timeout,
857 )
858 return Dataset.from_api_repr(api_response)
860 def get_iam_policy(
861 self,
862 table: Union[Table, TableReference, TableListItem, str],
863 requested_policy_version: int = 1,
864 retry: retries.Retry = DEFAULT_RETRY,
865 timeout: TimeoutType = DEFAULT_TIMEOUT,
866 ) -> Policy:
867 table = _table_arg_to_table_ref(table, default_project=self.project)
869 if requested_policy_version != 1:
870 raise ValueError("only IAM policy version 1 is supported")
872 body = {"options": {"requestedPolicyVersion": 1}}
874 path = "{}:getIamPolicy".format(table.path)
875 span_attributes = {"path": path}
876 response = self._call_api(
877 retry,
878 span_name="BigQuery.getIamPolicy",
879 span_attributes=span_attributes,
880 method="POST",
881 path=path,
882 data=body,
883 timeout=timeout,
884 )
886 return Policy.from_api_repr(response)
888 def set_iam_policy(
889 self,
890 table: Union[Table, TableReference, TableListItem, str],
891 policy: Policy,
892 updateMask: str = None,
893 retry: retries.Retry = DEFAULT_RETRY,
894 timeout: TimeoutType = DEFAULT_TIMEOUT,
895 ) -> Policy:
896 table = _table_arg_to_table_ref(table, default_project=self.project)
898 if not isinstance(policy, (Policy)):
899 raise TypeError("policy must be a Policy")
901 body = {"policy": policy.to_api_repr()}
903 if updateMask is not None:
904 body["updateMask"] = updateMask
906 path = "{}:setIamPolicy".format(table.path)
907 span_attributes = {"path": path}
909 response = self._call_api(
910 retry,
911 span_name="BigQuery.setIamPolicy",
912 span_attributes=span_attributes,
913 method="POST",
914 path=path,
915 data=body,
916 timeout=timeout,
917 )
919 return Policy.from_api_repr(response)
921 def test_iam_permissions(
922 self,
923 table: Union[Table, TableReference, TableListItem, str],
924 permissions: Sequence[str],
925 retry: retries.Retry = DEFAULT_RETRY,
926 timeout: TimeoutType = DEFAULT_TIMEOUT,
927 ) -> Dict[str, Any]:
928 table = _table_arg_to_table_ref(table, default_project=self.project)
930 body = {"permissions": permissions}
932 path = "{}:testIamPermissions".format(table.path)
933 span_attributes = {"path": path}
934 response = self._call_api(
935 retry,
936 span_name="BigQuery.testIamPermissions",
937 span_attributes=span_attributes,
938 method="POST",
939 path=path,
940 data=body,
941 timeout=timeout,
942 )
944 return response
946 def get_model(
947 self,
948 model_ref: Union[ModelReference, str],
949 retry: retries.Retry = DEFAULT_RETRY,
950 timeout: TimeoutType = DEFAULT_TIMEOUT,
951 ) -> Model:
952 """[Beta] Fetch the model referenced by ``model_ref``.
954 Args:
955 model_ref (Union[ \
956 google.cloud.bigquery.model.ModelReference, \
957 str, \
958 ]):
959 A reference to the model to fetch from the BigQuery API.
960 If a string is passed in, this method attempts to create a
961 model reference from a string using
962 :func:`google.cloud.bigquery.model.ModelReference.from_string`.
963 retry (Optional[google.api_core.retry.Retry]):
964 How to retry the RPC.
965 timeout (Optional[float]):
966 The number of seconds to wait for the underlying HTTP transport
967 before using ``retry``.
969 Returns:
970 google.cloud.bigquery.model.Model: A ``Model`` instance.
971 """
972 if isinstance(model_ref, str):
973 model_ref = ModelReference.from_string(
974 model_ref, default_project=self.project
975 )
976 path = model_ref.path
977 span_attributes = {"path": path}
979 api_response = self._call_api(
980 retry,
981 span_name="BigQuery.getModel",
982 span_attributes=span_attributes,
983 method="GET",
984 path=path,
985 timeout=timeout,
986 )
987 return Model.from_api_repr(api_response)
989 def get_routine(
990 self,
991 routine_ref: Union[Routine, RoutineReference, str],
992 retry: retries.Retry = DEFAULT_RETRY,
993 timeout: TimeoutType = DEFAULT_TIMEOUT,
994 ) -> Routine:
995 """[Beta] Get the routine referenced by ``routine_ref``.
997 Args:
998 routine_ref (Union[ \
999 google.cloud.bigquery.routine.Routine, \
1000 google.cloud.bigquery.routine.RoutineReference, \
1001 str, \
1002 ]):
1003 A reference to the routine to fetch from the BigQuery API. If
1004 a string is passed in, this method attempts to create a
1005 reference from a string using
1006 :func:`google.cloud.bigquery.routine.RoutineReference.from_string`.
1007 retry (Optional[google.api_core.retry.Retry]):
1008 How to retry the API call.
1009 timeout (Optional[float]):
1010 The number of seconds to wait for the underlying HTTP transport
1011 before using ``retry``.
1013 Returns:
1014 google.cloud.bigquery.routine.Routine:
1015 A ``Routine`` instance.
1016 """
1017 if isinstance(routine_ref, str):
1018 routine_ref = RoutineReference.from_string(
1019 routine_ref, default_project=self.project
1020 )
1021 path = routine_ref.path
1022 span_attributes = {"path": path}
1023 api_response = self._call_api(
1024 retry,
1025 span_name="BigQuery.getRoutine",
1026 span_attributes=span_attributes,
1027 method="GET",
1028 path=path,
1029 timeout=timeout,
1030 )
1031 return Routine.from_api_repr(api_response)
1033 def get_table(
1034 self,
1035 table: Union[Table, TableReference, TableListItem, str],
1036 retry: retries.Retry = DEFAULT_RETRY,
1037 timeout: TimeoutType = DEFAULT_TIMEOUT,
1038 ) -> Table:
1039 """Fetch the table referenced by ``table``.
1041 Args:
1042 table (Union[ \
1043 google.cloud.bigquery.table.Table, \
1044 google.cloud.bigquery.table.TableReference, \
1045 google.cloud.bigquery.table.TableListItem, \
1046 str, \
1047 ]):
1048 A reference to the table to fetch from the BigQuery API.
1049 If a string is passed in, this method attempts to create a
1050 table reference from a string using
1051 :func:`google.cloud.bigquery.table.TableReference.from_string`.
1052 retry (Optional[google.api_core.retry.Retry]):
1053 How to retry the RPC.
1054 timeout (Optional[float]):
1055 The number of seconds to wait for the underlying HTTP transport
1056 before using ``retry``.
1058 Returns:
1059 google.cloud.bigquery.table.Table:
1060 A ``Table`` instance.
1061 """
1062 table_ref = _table_arg_to_table_ref(table, default_project=self.project)
1063 path = table_ref.path
1064 span_attributes = {"path": path}
1065 api_response = self._call_api(
1066 retry,
1067 span_name="BigQuery.getTable",
1068 span_attributes=span_attributes,
1069 method="GET",
1070 path=path,
1071 timeout=timeout,
1072 )
1073 return Table.from_api_repr(api_response)
1075 def update_dataset(
1076 self,
1077 dataset: Dataset,
1078 fields: Sequence[str],
1079 retry: retries.Retry = DEFAULT_RETRY,
1080 timeout: TimeoutType = DEFAULT_TIMEOUT,
1081 ) -> Dataset:
1082 """Change some fields of a dataset.
1084 Use ``fields`` to specify which fields to update. At least one field
1085 must be provided. If a field is listed in ``fields`` and is ``None`` in
1086 ``dataset``, it will be deleted.
1088 If ``dataset.etag`` is not ``None``, the update will only
1089 succeed if the dataset on the server has the same ETag. Thus
1090 reading a dataset with ``get_dataset``, changing its fields,
1091 and then passing it to ``update_dataset`` will ensure that the changes
1092 will only be saved if no modifications to the dataset occurred
1093 since the read.
1095 Args:
1096 dataset (google.cloud.bigquery.dataset.Dataset):
1097 The dataset to update.
1098 fields (Sequence[str]):
1099 The properties of ``dataset`` to change. These are strings
1100 corresponding to the properties of
1101 :class:`~google.cloud.bigquery.dataset.Dataset`.
1103 For example, to update the default expiration times, specify
1104 both properties in the ``fields`` argument:
1106 .. code-block:: python
1108 bigquery_client.update_dataset(
1109 dataset,
1110 [
1111 "default_partition_expiration_ms",
1112 "default_table_expiration_ms",
1113 ]
1114 )
1115 retry (Optional[google.api_core.retry.Retry]):
1116 How to retry the RPC.
1117 timeout (Optional[float]):
1118 The number of seconds to wait for the underlying HTTP transport
1119 before using ``retry``.
1121 Returns:
1122 google.cloud.bigquery.dataset.Dataset:
1123 The modified ``Dataset`` instance.
1124 """
1125 partial = dataset._build_resource(fields)
1126 if dataset.etag is not None:
1127 headers: Optional[Dict[str, str]] = {"If-Match": dataset.etag}
1128 else:
1129 headers = None
1130 path = dataset.path
1131 span_attributes = {"path": path, "fields": fields}
1133 api_response = self._call_api(
1134 retry,
1135 span_name="BigQuery.updateDataset",
1136 span_attributes=span_attributes,
1137 method="PATCH",
1138 path=path,
1139 data=partial,
1140 headers=headers,
1141 timeout=timeout,
1142 )
1143 return Dataset.from_api_repr(api_response)
1145 def update_model(
1146 self,
1147 model: Model,
1148 fields: Sequence[str],
1149 retry: retries.Retry = DEFAULT_RETRY,
1150 timeout: TimeoutType = DEFAULT_TIMEOUT,
1151 ) -> Model:
1152 """[Beta] Change some fields of a model.
1154 Use ``fields`` to specify which fields to update. At least one field
1155 must be provided. If a field is listed in ``fields`` and is ``None``
1156 in ``model``, the field value will be deleted.
1158 If ``model.etag`` is not ``None``, the update will only succeed if
1159 the model on the server has the same ETag. Thus reading a model with
1160 ``get_model``, changing its fields, and then passing it to
1161 ``update_model`` will ensure that the changes will only be saved if
1162 no modifications to the model occurred since the read.
1164 Args:
1165 model (google.cloud.bigquery.model.Model): The model to update.
1166 fields (Sequence[str]):
1167 The properties of ``model`` to change. These are strings
1168 corresponding to the properties of
1169 :class:`~google.cloud.bigquery.model.Model`.
1171 For example, to update the descriptive properties of the model,
1172 specify them in the ``fields`` argument:
1174 .. code-block:: python
1176 bigquery_client.update_model(
1177 model, ["description", "friendly_name"]
1178 )
1179 retry (Optional[google.api_core.retry.Retry]):
1180 A description of how to retry the API call.
1181 timeout (Optional[float]):
1182 The number of seconds to wait for the underlying HTTP transport
1183 before using ``retry``.
1185 Returns:
1186 google.cloud.bigquery.model.Model:
1187 The model resource returned from the API call.
1188 """
1189 partial = model._build_resource(fields)
1190 if model.etag:
1191 headers: Optional[Dict[str, str]] = {"If-Match": model.etag}
1192 else:
1193 headers = None
1194 path = model.path
1195 span_attributes = {"path": path, "fields": fields}
1197 api_response = self._call_api(
1198 retry,
1199 span_name="BigQuery.updateModel",
1200 span_attributes=span_attributes,
1201 method="PATCH",
1202 path=path,
1203 data=partial,
1204 headers=headers,
1205 timeout=timeout,
1206 )
1207 return Model.from_api_repr(api_response)
1209 def update_routine(
1210 self,
1211 routine: Routine,
1212 fields: Sequence[str],
1213 retry: retries.Retry = DEFAULT_RETRY,
1214 timeout: TimeoutType = DEFAULT_TIMEOUT,
1215 ) -> Routine:
1216 """[Beta] Change some fields of a routine.
1218 Use ``fields`` to specify which fields to update. At least one field
1219 must be provided. If a field is listed in ``fields`` and is ``None``
1220 in ``routine``, the field value will be deleted.
1222 .. warning::
1223 During beta, partial updates are not supported. You must provide
1224 all fields in the resource.
1226 If :attr:`~google.cloud.bigquery.routine.Routine.etag` is not
1227 ``None``, the update will only succeed if the resource on the server
1228 has the same ETag. Thus reading a routine with
1229 :func:`~google.cloud.bigquery.client.Client.get_routine`, changing
1230 its fields, and then passing it to this method will ensure that the
1231 changes will only be saved if no modifications to the resource
1232 occurred since the read.
1234 Args:
1235 routine (google.cloud.bigquery.routine.Routine):
1236 The routine to update.
1237 fields (Sequence[str]):
1238 The fields of ``routine`` to change, spelled as the
1239 :class:`~google.cloud.bigquery.routine.Routine` properties.
1241 For example, to update the description property of the routine,
1242 specify it in the ``fields`` argument:
1244 .. code-block:: python
1246 bigquery_client.update_routine(
1247 routine, ["description"]
1248 )
1249 retry (Optional[google.api_core.retry.Retry]):
1250 A description of how to retry the API call.
1251 timeout (Optional[float]):
1252 The number of seconds to wait for the underlying HTTP transport
1253 before using ``retry``.
1255 Returns:
1256 google.cloud.bigquery.routine.Routine:
1257 The routine resource returned from the API call.
1258 """
1259 partial = routine._build_resource(fields)
1260 if routine.etag:
1261 headers: Optional[Dict[str, str]] = {"If-Match": routine.etag}
1262 else:
1263 headers = None
1265 # TODO: remove when routines update supports partial requests.
1266 partial["routineReference"] = routine.reference.to_api_repr()
1268 path = routine.path
1269 span_attributes = {"path": path, "fields": fields}
1271 api_response = self._call_api(
1272 retry,
1273 span_name="BigQuery.updateRoutine",
1274 span_attributes=span_attributes,
1275 method="PUT",
1276 path=path,
1277 data=partial,
1278 headers=headers,
1279 timeout=timeout,
1280 )
1281 return Routine.from_api_repr(api_response)
1283 def update_table(
1284 self,
1285 table: Table,
1286 fields: Sequence[str],
1287 retry: retries.Retry = DEFAULT_RETRY,
1288 timeout: TimeoutType = DEFAULT_TIMEOUT,
1289 ) -> Table:
1290 """Change some fields of a table.
1292 Use ``fields`` to specify which fields to update. At least one field
1293 must be provided. If a field is listed in ``fields`` and is ``None``
1294 in ``table``, the field value will be deleted.
1296 If ``table.etag`` is not ``None``, the update will only succeed if
1297 the table on the server has the same ETag. Thus reading a table with
1298 ``get_table``, changing its fields, and then passing it to
1299 ``update_table`` will ensure that the changes will only be saved if
1300 no modifications to the table occurred since the read.
1302 Args:
1303 table (google.cloud.bigquery.table.Table): The table to update.
1304 fields (Sequence[str]):
1305 The fields of ``table`` to change, spelled as the
1306 :class:`~google.cloud.bigquery.table.Table` properties.
1308 For example, to update the descriptive properties of the table,
1309 specify them in the ``fields`` argument:
1311 .. code-block:: python
1313 bigquery_client.update_table(
1314 table,
1315 ["description", "friendly_name"]
1316 )
1317 retry (Optional[google.api_core.retry.Retry]):
1318 A description of how to retry the API call.
1319 timeout (Optional[float]):
1320 The number of seconds to wait for the underlying HTTP transport
1321 before using ``retry``.
1323 Returns:
1324 google.cloud.bigquery.table.Table:
1325 The table resource returned from the API call.
1326 """
1327 partial = table._build_resource(fields)
1328 if table.etag is not None:
1329 headers: Optional[Dict[str, str]] = {"If-Match": table.etag}
1330 else:
1331 headers = None
1333 path = table.path
1334 span_attributes = {"path": path, "fields": fields}
1336 api_response = self._call_api(
1337 retry,
1338 span_name="BigQuery.updateTable",
1339 span_attributes=span_attributes,
1340 method="PATCH",
1341 path=path,
1342 data=partial,
1343 headers=headers,
1344 timeout=timeout,
1345 )
1346 return Table.from_api_repr(api_response)
1348 def list_models(
1349 self,
1350 dataset: Union[Dataset, DatasetReference, DatasetListItem, str],
1351 max_results: Optional[int] = None,
1352 page_token: str = None,
1353 retry: retries.Retry = DEFAULT_RETRY,
1354 timeout: TimeoutType = DEFAULT_TIMEOUT,
1355 page_size: Optional[int] = None,
1356 ) -> page_iterator.Iterator:
1357 """[Beta] List models in the dataset.
1359 See
1360 https://cloud.google.com/bigquery/docs/reference/rest/v2/models/list
1362 Args:
1363 dataset (Union[ \
1364 google.cloud.bigquery.dataset.Dataset, \
1365 google.cloud.bigquery.dataset.DatasetReference, \
1366 google.cloud.bigquery.dataset.DatasetListItem, \
1367 str, \
1368 ]):
1369 A reference to the dataset whose models to list from the
1370 BigQuery API. If a string is passed in, this method attempts
1371 to create a dataset reference from a string using
1372 :func:`google.cloud.bigquery.dataset.DatasetReference.from_string`.
1373 max_results (Optional[int]):
1374 Maximum number of models to return. Defaults to a
1375 value set by the API.
1376 page_token (Optional[str]):
1377 Token representing a cursor into the models. If not passed,
1378 the API will return the first page of models. The token marks
1379 the beginning of the iterator to be returned and the value of
1380 the ``page_token`` can be accessed at ``next_page_token`` of the
1381 :class:`~google.api_core.page_iterator.HTTPIterator`.
1382 retry (Optional[google.api_core.retry.Retry]):
1383 How to retry the RPC.
1384 timeout (Optional[float]):
1385 The number of seconds to wait for the underlying HTTP transport
1386 before using ``retry``.
1387 page_size (Optional[int]):
1388 Maximum number of models to return per page.
1389 Defaults to a value set by the API.
1391 Returns:
1392 google.api_core.page_iterator.Iterator:
1393 Iterator of
1394 :class:`~google.cloud.bigquery.model.Model` contained
1395 within the requested dataset.
1396 """
1397 dataset = self._dataset_from_arg(dataset)
1399 path = "%s/models" % dataset.path
1400 span_attributes = {"path": path}
1402 def api_request(*args, **kwargs):
1403 return self._call_api(
1404 retry,
1405 span_name="BigQuery.listModels",
1406 span_attributes=span_attributes,
1407 *args,
1408 timeout=timeout,
1409 **kwargs,
1410 )
1412 result = page_iterator.HTTPIterator(
1413 client=self,
1414 api_request=api_request,
1415 path=path,
1416 item_to_value=_item_to_model,
1417 items_key="models",
1418 page_token=page_token,
1419 max_results=max_results,
1420 page_size=page_size,
1421 )
1422 result.dataset = dataset # type: ignore
1423 return result
1425 def list_routines(
1426 self,
1427 dataset: Union[Dataset, DatasetReference, DatasetListItem, str],
1428 max_results: Optional[int] = None,
1429 page_token: str = None,
1430 retry: retries.Retry = DEFAULT_RETRY,
1431 timeout: TimeoutType = DEFAULT_TIMEOUT,
1432 page_size: Optional[int] = None,
1433 ) -> page_iterator.Iterator:
1434 """[Beta] List routines in the dataset.
1436 See
1437 https://cloud.google.com/bigquery/docs/reference/rest/v2/routines/list
1439 Args:
1440 dataset (Union[ \
1441 google.cloud.bigquery.dataset.Dataset, \
1442 google.cloud.bigquery.dataset.DatasetReference, \
1443 google.cloud.bigquery.dataset.DatasetListItem, \
1444 str, \
1445 ]):
1446 A reference to the dataset whose routines to list from the
1447 BigQuery API. If a string is passed in, this method attempts
1448 to create a dataset reference from a string using
1449 :func:`google.cloud.bigquery.dataset.DatasetReference.from_string`.
1450 max_results (Optional[int]):
1451 Maximum number of routines to return. Defaults
1452 to a value set by the API.
1453 page_token (Optional[str]):
1454 Token representing a cursor into the routines. If not passed,
1455 the API will return the first page of routines. The token marks
1456 the beginning of the iterator to be returned and the value of the
1457 ``page_token`` can be accessed at ``next_page_token`` of the
1458 :class:`~google.api_core.page_iterator.HTTPIterator`.
1459 retry (Optional[google.api_core.retry.Retry]):
1460 How to retry the RPC.
1461 timeout (Optional[float]):
1462 The number of seconds to wait for the underlying HTTP transport
1463 before using ``retry``.
1464 page_size (Optional[int]):
1465 Maximum number of routines to return per page.
1466 Defaults to a value set by the API.
1468 Returns:
1469 google.api_core.page_iterator.Iterator:
1470 Iterator of all
1471 :class:`~google.cloud.bigquery.routine.Routine`s contained
1472 within the requested dataset, limited by ``max_results``.
1473 """
1474 dataset = self._dataset_from_arg(dataset)
1475 path = "{}/routines".format(dataset.path)
1477 span_attributes = {"path": path}
1479 def api_request(*args, **kwargs):
1480 return self._call_api(
1481 retry,
1482 span_name="BigQuery.listRoutines",
1483 span_attributes=span_attributes,
1484 *args,
1485 timeout=timeout,
1486 **kwargs,
1487 )
1489 result = page_iterator.HTTPIterator(
1490 client=self,
1491 api_request=api_request,
1492 path=path,
1493 item_to_value=_item_to_routine,
1494 items_key="routines",
1495 page_token=page_token,
1496 max_results=max_results,
1497 page_size=page_size,
1498 )
1499 result.dataset = dataset # type: ignore
1500 return result
1502 def list_tables(
1503 self,
1504 dataset: Union[Dataset, DatasetReference, DatasetListItem, str],
1505 max_results: Optional[int] = None,
1506 page_token: str = None,
1507 retry: retries.Retry = DEFAULT_RETRY,
1508 timeout: TimeoutType = DEFAULT_TIMEOUT,
1509 page_size: Optional[int] = None,
1510 ) -> page_iterator.Iterator:
1511 """List tables in the dataset.
1513 See
1514 https://cloud.google.com/bigquery/docs/reference/rest/v2/tables/list
1516 Args:
1517 dataset (Union[ \
1518 google.cloud.bigquery.dataset.Dataset, \
1519 google.cloud.bigquery.dataset.DatasetReference, \
1520 google.cloud.bigquery.dataset.DatasetListItem, \
1521 str, \
1522 ]):
1523 A reference to the dataset whose tables to list from the
1524 BigQuery API. If a string is passed in, this method attempts
1525 to create a dataset reference from a string using
1526 :func:`google.cloud.bigquery.dataset.DatasetReference.from_string`.
1527 max_results (Optional[int]):
1528 Maximum number of tables to return. Defaults
1529 to a value set by the API.
1530 page_token (Optional[str]):
1531 Token representing a cursor into the tables. If not passed,
1532 the API will return the first page of tables. The token marks
1533 the beginning of the iterator to be returned and the value of
1534 the ``page_token`` can be accessed at ``next_page_token`` of the
1535 :class:`~google.api_core.page_iterator.HTTPIterator`.
1536 retry (Optional[google.api_core.retry.Retry]):
1537 How to retry the RPC.
1538 timeout (Optional[float]):
1539 The number of seconds to wait for the underlying HTTP transport
1540 before using ``retry``.
1541 page_size (Optional[int]):
1542 Maximum number of tables to return per page.
1543 Defaults to a value set by the API.
1545 Returns:
1546 google.api_core.page_iterator.Iterator:
1547 Iterator of
1548 :class:`~google.cloud.bigquery.table.TableListItem` contained
1549 within the requested dataset.
1550 """
1551 dataset = self._dataset_from_arg(dataset)
1552 path = "%s/tables" % dataset.path
1553 span_attributes = {"path": path}
1555 def api_request(*args, **kwargs):
1556 return self._call_api(
1557 retry,
1558 span_name="BigQuery.listTables",
1559 span_attributes=span_attributes,
1560 *args,
1561 timeout=timeout,
1562 **kwargs,
1563 )
1565 result = page_iterator.HTTPIterator(
1566 client=self,
1567 api_request=api_request,
1568 path=path,
1569 item_to_value=_item_to_table,
1570 items_key="tables",
1571 page_token=page_token,
1572 max_results=max_results,
1573 page_size=page_size,
1574 )
1575 result.dataset = dataset # type: ignore
1576 return result
1578 def delete_dataset(
1579 self,
1580 dataset: Union[Dataset, DatasetReference, DatasetListItem, str],
1581 delete_contents: bool = False,
1582 retry: retries.Retry = DEFAULT_RETRY,
1583 timeout: TimeoutType = DEFAULT_TIMEOUT,
1584 not_found_ok: bool = False,
1585 ) -> None:
1586 """Delete a dataset.
1588 See
1589 https://cloud.google.com/bigquery/docs/reference/rest/v2/datasets/delete
1591 Args:
1592 dataset (Union[ \
1593 google.cloud.bigquery.dataset.Dataset, \
1594 google.cloud.bigquery.dataset.DatasetReference, \
1595 google.cloud.bigquery.dataset.DatasetListItem, \
1596 str, \
1597 ]):
1598 A reference to the dataset to delete. If a string is passed
1599 in, this method attempts to create a dataset reference from a
1600 string using
1601 :func:`google.cloud.bigquery.dataset.DatasetReference.from_string`.
1602 delete_contents (Optional[bool]):
1603 If True, delete all the tables in the dataset. If False and
1604 the dataset contains tables, the request will fail.
1605 Default is False.
1606 retry (Optional[google.api_core.retry.Retry]):
1607 How to retry the RPC.
1608 timeout (Optional[float]):
1609 The number of seconds to wait for the underlying HTTP transport
1610 before using ``retry``.
1611 not_found_ok (Optional[bool]):
1612 Defaults to ``False``. If ``True``, ignore "not found" errors
1613 when deleting the dataset.
1614 """
1615 dataset = self._dataset_from_arg(dataset)
1616 params = {}
1617 path = dataset.path
1618 if delete_contents:
1619 params["deleteContents"] = "true"
1620 span_attributes = {"path": path, "deleteContents": delete_contents}
1621 else:
1622 span_attributes = {"path": path}
1624 try:
1625 self._call_api(
1626 retry,
1627 span_name="BigQuery.deleteDataset",
1628 span_attributes=span_attributes,
1629 method="DELETE",
1630 path=path,
1631 query_params=params,
1632 timeout=timeout,
1633 )
1634 except core_exceptions.NotFound:
1635 if not not_found_ok:
1636 raise
1638 def delete_model(
1639 self,
1640 model: Union[Model, ModelReference, str],
1641 retry: retries.Retry = DEFAULT_RETRY,
1642 timeout: TimeoutType = DEFAULT_TIMEOUT,
1643 not_found_ok: bool = False,
1644 ) -> None:
1645 """[Beta] Delete a model
1647 See
1648 https://cloud.google.com/bigquery/docs/reference/rest/v2/models/delete
1650 Args:
1651 model (Union[ \
1652 google.cloud.bigquery.model.Model, \
1653 google.cloud.bigquery.model.ModelReference, \
1654 str, \
1655 ]):
1656 A reference to the model to delete. If a string is passed in,
1657 this method attempts to create a model reference from a
1658 string using
1659 :func:`google.cloud.bigquery.model.ModelReference.from_string`.
1660 retry (Optional[google.api_core.retry.Retry]):
1661 How to retry the RPC.
1662 timeout (Optional[float]):
1663 The number of seconds to wait for the underlying HTTP transport
1664 before using ``retry``.
1665 not_found_ok (Optional[bool]):
1666 Defaults to ``False``. If ``True``, ignore "not found" errors
1667 when deleting the model.
1668 """
1669 if isinstance(model, str):
1670 model = ModelReference.from_string(model, default_project=self.project)
1672 if not isinstance(model, (Model, ModelReference)):
1673 raise TypeError("model must be a Model or a ModelReference")
1675 path = model.path
1676 try:
1677 span_attributes = {"path": path}
1678 self._call_api(
1679 retry,
1680 span_name="BigQuery.deleteModel",
1681 span_attributes=span_attributes,
1682 method="DELETE",
1683 path=path,
1684 timeout=timeout,
1685 )
1686 except core_exceptions.NotFound:
1687 if not not_found_ok:
1688 raise
1690 def delete_job_metadata(
1691 self,
1692 job_id: Union[str, LoadJob, CopyJob, ExtractJob, QueryJob],
1693 project: Optional[str] = None,
1694 location: Optional[str] = None,
1695 retry: retries.Retry = DEFAULT_RETRY,
1696 timeout: TimeoutType = DEFAULT_TIMEOUT,
1697 not_found_ok: bool = False,
1698 ):
1699 """[Beta] Delete job metadata from job history.
1701 Note: This does not stop a running job. Use
1702 :func:`~google.cloud.bigquery.client.Client.cancel_job` instead.
1704 Args:
1705 job_id: Job or job identifier.
1707 Keyword Arguments:
1708 project:
1709 ID of the project which owns the job (defaults to the client's project).
1710 location:
1711 Location where the job was run. Ignored if ``job_id`` is a job
1712 object.
1713 retry:
1714 How to retry the RPC.
1715 timeout:
1716 The number of seconds to wait for the underlying HTTP transport
1717 before using ``retry``.
1718 not_found_ok:
1719 Defaults to ``False``. If ``True``, ignore "not found" errors
1720 when deleting the job.
1721 """
1722 extra_params = {}
1724 project, location, job_id = _extract_job_reference(
1725 job_id, project=project, location=location
1726 )
1728 if project is None:
1729 project = self.project
1731 if location is None:
1732 location = self.location
1734 # Location is always required for jobs.delete()
1735 extra_params["location"] = location
1737 path = f"/projects/{project}/jobs/{job_id}/delete"
1739 span_attributes = {"path": path, "job_id": job_id, "location": location}
1741 try:
1742 self._call_api(
1743 retry,
1744 span_name="BigQuery.deleteJob",
1745 span_attributes=span_attributes,
1746 method="DELETE",
1747 path=path,
1748 query_params=extra_params,
1749 timeout=timeout,
1750 )
1751 except google.api_core.exceptions.NotFound:
1752 if not not_found_ok:
1753 raise
1755 def delete_routine(
1756 self,
1757 routine: Union[Routine, RoutineReference, str],
1758 retry: retries.Retry = DEFAULT_RETRY,
1759 timeout: TimeoutType = DEFAULT_TIMEOUT,
1760 not_found_ok: bool = False,
1761 ) -> None:
1762 """[Beta] Delete a routine.
1764 See
1765 https://cloud.google.com/bigquery/docs/reference/rest/v2/routines/delete
1767 Args:
1768 routine (Union[ \
1769 google.cloud.bigquery.routine.Routine, \
1770 google.cloud.bigquery.routine.RoutineReference, \
1771 str, \
1772 ]):
1773 A reference to the routine to delete. If a string is passed
1774 in, this method attempts to create a routine reference from a
1775 string using
1776 :func:`google.cloud.bigquery.routine.RoutineReference.from_string`.
1777 retry (Optional[google.api_core.retry.Retry]):
1778 How to retry the RPC.
1779 timeout (Optional[float]):
1780 The number of seconds to wait for the underlying HTTP transport
1781 before using ``retry``.
1782 not_found_ok (Optional[bool]):
1783 Defaults to ``False``. If ``True``, ignore "not found" errors
1784 when deleting the routine.
1785 """
1786 if isinstance(routine, str):
1787 routine = RoutineReference.from_string(
1788 routine, default_project=self.project
1789 )
1790 path = routine.path
1792 if not isinstance(routine, (Routine, RoutineReference)):
1793 raise TypeError("routine must be a Routine or a RoutineReference")
1795 try:
1796 span_attributes = {"path": path}
1797 self._call_api(
1798 retry,
1799 span_name="BigQuery.deleteRoutine",
1800 span_attributes=span_attributes,
1801 method="DELETE",
1802 path=path,
1803 timeout=timeout,
1804 )
1805 except core_exceptions.NotFound:
1806 if not not_found_ok:
1807 raise
1809 def delete_table(
1810 self,
1811 table: Union[Table, TableReference, TableListItem, str],
1812 retry: retries.Retry = DEFAULT_RETRY,
1813 timeout: TimeoutType = DEFAULT_TIMEOUT,
1814 not_found_ok: bool = False,
1815 ) -> None:
1816 """Delete a table
1818 See
1819 https://cloud.google.com/bigquery/docs/reference/rest/v2/tables/delete
1821 Args:
1822 table (Union[ \
1823 google.cloud.bigquery.table.Table, \
1824 google.cloud.bigquery.table.TableReference, \
1825 google.cloud.bigquery.table.TableListItem, \
1826 str, \
1827 ]):
1828 A reference to the table to delete. If a string is passed in,
1829 this method attempts to create a table reference from a
1830 string using
1831 :func:`google.cloud.bigquery.table.TableReference.from_string`.
1832 retry (Optional[google.api_core.retry.Retry]):
1833 How to retry the RPC.
1834 timeout (Optional[float]):
1835 The number of seconds to wait for the underlying HTTP transport
1836 before using ``retry``.
1837 not_found_ok (Optional[bool]):
1838 Defaults to ``False``. If ``True``, ignore "not found" errors
1839 when deleting the table.
1840 """
1841 table = _table_arg_to_table_ref(table, default_project=self.project)
1842 if not isinstance(table, TableReference):
1843 raise TypeError("Unable to get TableReference for table '{}'".format(table))
1845 try:
1846 path = table.path
1847 span_attributes = {"path": path}
1848 self._call_api(
1849 retry,
1850 span_name="BigQuery.deleteTable",
1851 span_attributes=span_attributes,
1852 method="DELETE",
1853 path=path,
1854 timeout=timeout,
1855 )
1856 except core_exceptions.NotFound:
1857 if not not_found_ok:
1858 raise
1860 def _get_query_results(
1861 self,
1862 job_id: str,
1863 retry: retries.Retry,
1864 project: str = None,
1865 timeout_ms: Optional[int] = None,
1866 location: str = None,
1867 timeout: TimeoutType = DEFAULT_TIMEOUT,
1868 ) -> _QueryResults:
1869 """Get the query results object for a query job.
1871 Args:
1872 job_id (str): Name of the query job.
1873 retry (google.api_core.retry.Retry):
1874 How to retry the RPC.
1875 project (Optional[str]):
1876 Project ID for the query job (defaults to the project of the client).
1877 timeout_ms (Optional[int]):
1878 Number of milliseconds the the API call should wait for the query
1879 to complete before the request times out.
1880 location (Optional[str]): Location of the query job.
1881 timeout (Optional[float]):
1882 The number of seconds to wait for the underlying HTTP transport
1883 before using ``retry``. If set, this connection timeout may be
1884 increased to a minimum value. This prevents retries on what
1885 would otherwise be a successful response.
1887 Returns:
1888 google.cloud.bigquery.query._QueryResults:
1889 A new ``_QueryResults`` instance.
1890 """
1892 extra_params: Dict[str, Any] = {"maxResults": 0}
1894 if timeout is not None:
1895 timeout = max(timeout, _MIN_GET_QUERY_RESULTS_TIMEOUT)
1897 if project is None:
1898 project = self.project
1900 if timeout_ms is not None:
1901 extra_params["timeoutMs"] = timeout_ms
1903 if location is None:
1904 location = self.location
1906 if location is not None:
1907 extra_params["location"] = location
1909 path = "/projects/{}/queries/{}".format(project, job_id)
1911 # This call is typically made in a polling loop that checks whether the
1912 # job is complete (from QueryJob.done(), called ultimately from
1913 # QueryJob.result()). So we don't need to poll here.
1914 span_attributes = {"path": path}
1915 resource = self._call_api(
1916 retry,
1917 span_name="BigQuery.getQueryResults",
1918 span_attributes=span_attributes,
1919 method="GET",
1920 path=path,
1921 query_params=extra_params,
1922 timeout=timeout,
1923 )
1924 return _QueryResults.from_api_repr(resource)
1926 def job_from_resource(
1927 self, resource: dict
1928 ) -> Union[job.CopyJob, job.ExtractJob, job.LoadJob, job.QueryJob, job.UnknownJob]:
1929 """Detect correct job type from resource and instantiate.
1931 Args:
1932 resource (Dict): one job resource from API response
1934 Returns:
1935 The job instance, constructed via the resource.
1936 """
1937 config = resource.get("configuration", {})
1938 if "load" in config:
1939 return job.LoadJob.from_api_repr(resource, self)
1940 elif "copy" in config:
1941 return job.CopyJob.from_api_repr(resource, self)
1942 elif "extract" in config:
1943 return job.ExtractJob.from_api_repr(resource, self)
1944 elif "query" in config:
1945 return job.QueryJob.from_api_repr(resource, self)
1946 return job.UnknownJob.from_api_repr(resource, self)
1948 def create_job(
1949 self,
1950 job_config: dict,
1951 retry: retries.Retry = DEFAULT_RETRY,
1952 timeout: TimeoutType = DEFAULT_TIMEOUT,
1953 ) -> Union[job.LoadJob, job.CopyJob, job.ExtractJob, job.QueryJob]:
1954 """Create a new job.
1955 Args:
1956 job_config (dict): configuration job representation returned from the API.
1958 Keyword Arguments:
1959 retry (Optional[google.api_core.retry.Retry]):
1960 How to retry the RPC.
1961 timeout (Optional[float]):
1962 The number of seconds to wait for the underlying HTTP transport
1963 before using ``retry``.
1965 Returns:
1966 Union[ \
1967 google.cloud.bigquery.job.LoadJob, \
1968 google.cloud.bigquery.job.CopyJob, \
1969 google.cloud.bigquery.job.ExtractJob, \
1970 google.cloud.bigquery.job.QueryJob \
1971 ]:
1972 A new job instance.
1973 """
1975 if "load" in job_config:
1976 load_job_config = google.cloud.bigquery.job.LoadJobConfig.from_api_repr(
1977 job_config
1978 )
1979 destination = _get_sub_prop(job_config, ["load", "destinationTable"])
1980 source_uris = _get_sub_prop(job_config, ["load", "sourceUris"])
1981 destination = TableReference.from_api_repr(destination)
1982 return self.load_table_from_uri(
1983 source_uris,
1984 destination,
1985 job_config=typing.cast(LoadJobConfig, load_job_config),
1986 retry=retry,
1987 timeout=timeout,
1988 )
1989 elif "copy" in job_config:
1990 copy_job_config = google.cloud.bigquery.job.CopyJobConfig.from_api_repr(
1991 job_config
1992 )
1993 destination = _get_sub_prop(job_config, ["copy", "destinationTable"])
1994 destination = TableReference.from_api_repr(destination)
1995 return self.copy_table(
1996 [], # Source table(s) already in job_config resource.
1997 destination,
1998 job_config=typing.cast(CopyJobConfig, copy_job_config),
1999 retry=retry,
2000 timeout=timeout,
2001 )
2002 elif "extract" in job_config:
2003 extract_job_config = (
2004 google.cloud.bigquery.job.ExtractJobConfig.from_api_repr(job_config)
2005 )
2006 source = _get_sub_prop(job_config, ["extract", "sourceTable"])
2007 if source:
2008 source_type = "Table"
2009 source = TableReference.from_api_repr(source)
2010 else:
2011 source = _get_sub_prop(job_config, ["extract", "sourceModel"])
2012 source_type = "Model"
2013 source = ModelReference.from_api_repr(source)
2014 destination_uris = _get_sub_prop(job_config, ["extract", "destinationUris"])
2015 return self.extract_table(
2016 source,
2017 destination_uris,
2018 job_config=typing.cast(ExtractJobConfig, extract_job_config),
2019 retry=retry,
2020 timeout=timeout,
2021 source_type=source_type,
2022 )
2023 elif "query" in job_config:
2024 query_job_config = google.cloud.bigquery.job.QueryJobConfig.from_api_repr(
2025 job_config
2026 )
2027 query = _get_sub_prop(job_config, ["query", "query"])
2028 return self.query(
2029 query,
2030 job_config=typing.cast(QueryJobConfig, query_job_config),
2031 retry=retry,
2032 timeout=timeout,
2033 )
2034 else:
2035 raise TypeError("Invalid job configuration received.")
2037 def get_job(
2038 self,
2039 job_id: Union[str, job.LoadJob, job.CopyJob, job.ExtractJob, job.QueryJob],
2040 project: str = None,
2041 location: str = None,
2042 retry: retries.Retry = DEFAULT_RETRY,
2043 timeout: TimeoutType = DEFAULT_TIMEOUT,
2044 ) -> Union[job.LoadJob, job.CopyJob, job.ExtractJob, job.QueryJob, job.UnknownJob]:
2045 """Fetch a job for the project associated with this client.
2047 See
2048 https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/get
2050 Args:
2051 job_id:
2052 Job identifier.
2054 Keyword Arguments:
2055 project (Optional[str]):
2056 ID of the project which owns the job (defaults to the client's project).
2057 location (Optional[str]):
2058 Location where the job was run. Ignored if ``job_id`` is a job
2059 object.
2060 retry (Optional[google.api_core.retry.Retry]):
2061 How to retry the RPC.
2062 timeout (Optional[float]):
2063 The number of seconds to wait for the underlying HTTP transport
2064 before using ``retry``.
2066 Returns:
2067 Job instance, based on the resource returned by the API.
2068 """
2069 extra_params = {"projection": "full"}
2071 project, location, job_id = _extract_job_reference(
2072 job_id, project=project, location=location
2073 )
2075 if project is None:
2076 project = self.project
2078 if location is None:
2079 location = self.location
2081 if location is not None:
2082 extra_params["location"] = location
2084 path = "/projects/{}/jobs/{}".format(project, job_id)
2086 span_attributes = {"path": path, "job_id": job_id, "location": location}
2088 resource = self._call_api(
2089 retry,
2090 span_name="BigQuery.getJob",
2091 span_attributes=span_attributes,
2092 method="GET",
2093 path=path,
2094 query_params=extra_params,
2095 timeout=timeout,
2096 )
2098 return self.job_from_resource(resource)
2100 def cancel_job(
2101 self,
2102 job_id: str,
2103 project: str = None,
2104 location: str = None,
2105 retry: retries.Retry = DEFAULT_RETRY,
2106 timeout: TimeoutType = DEFAULT_TIMEOUT,
2107 ) -> Union[job.LoadJob, job.CopyJob, job.ExtractJob, job.QueryJob]:
2108 """Attempt to cancel a job from a job ID.
2110 See
2111 https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/cancel
2113 Args:
2114 job_id (Union[ \
2115 str, \
2116 google.cloud.bigquery.job.LoadJob, \
2117 google.cloud.bigquery.job.CopyJob, \
2118 google.cloud.bigquery.job.ExtractJob, \
2119 google.cloud.bigquery.job.QueryJob \
2120 ]): Job identifier.
2122 Keyword Arguments:
2123 project (Optional[str]):
2124 ID of the project which owns the job (defaults to the client's project).
2125 location (Optional[str]):
2126 Location where the job was run. Ignored if ``job_id`` is a job
2127 object.
2128 retry (Optional[google.api_core.retry.Retry]):
2129 How to retry the RPC.
2130 timeout (Optional[float]):
2131 The number of seconds to wait for the underlying HTTP transport
2132 before using ``retry``.
2134 Returns:
2135 Union[ \
2136 google.cloud.bigquery.job.LoadJob, \
2137 google.cloud.bigquery.job.CopyJob, \
2138 google.cloud.bigquery.job.ExtractJob, \
2139 google.cloud.bigquery.job.QueryJob, \
2140 ]:
2141 Job instance, based on the resource returned by the API.
2142 """
2143 extra_params = {"projection": "full"}
2145 project, location, job_id = _extract_job_reference(
2146 job_id, project=project, location=location
2147 )
2149 if project is None:
2150 project = self.project
2152 if location is None:
2153 location = self.location
2155 if location is not None:
2156 extra_params["location"] = location
2158 path = "/projects/{}/jobs/{}/cancel".format(project, job_id)
2160 span_attributes = {"path": path, "job_id": job_id, "location": location}
2162 resource = self._call_api(
2163 retry,
2164 span_name="BigQuery.cancelJob",
2165 span_attributes=span_attributes,
2166 method="POST",
2167 path=path,
2168 query_params=extra_params,
2169 timeout=timeout,
2170 )
2172 job_instance = self.job_from_resource(resource["job"]) # never an UnknownJob
2174 return typing.cast(
2175 Union[job.LoadJob, job.CopyJob, job.ExtractJob, job.QueryJob],
2176 job_instance,
2177 )
2179 def list_jobs(
2180 self,
2181 project: str = None,
2182 parent_job: Optional[Union[QueryJob, str]] = None,
2183 max_results: Optional[int] = None,
2184 page_token: str = None,
2185 all_users: bool = None,
2186 state_filter: str = None,
2187 retry: retries.Retry = DEFAULT_RETRY,
2188 timeout: TimeoutType = DEFAULT_TIMEOUT,
2189 min_creation_time: datetime.datetime = None,
2190 max_creation_time: datetime.datetime = None,
2191 page_size: Optional[int] = None,
2192 ) -> page_iterator.Iterator:
2193 """List jobs for the project associated with this client.
2195 See
2196 https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/list
2198 Args:
2199 project (Optional[str]):
2200 Project ID to use for retreiving datasets. Defaults
2201 to the client's project.
2202 parent_job (Optional[Union[ \
2203 google.cloud.bigquery.job._AsyncJob, \
2204 str, \
2205 ]]):
2206 If set, retrieve only child jobs of the specified parent.
2207 max_results (Optional[int]):
2208 Maximum number of jobs to return.
2209 page_token (Optional[str]):
2210 Opaque marker for the next "page" of jobs. If not
2211 passed, the API will return the first page of jobs. The token
2212 marks the beginning of the iterator to be returned and the
2213 value of the ``page_token`` can be accessed at
2214 ``next_page_token`` of
2215 :class:`~google.api_core.page_iterator.HTTPIterator`.
2216 all_users (Optional[bool]):
2217 If true, include jobs owned by all users in the project.
2218 Defaults to :data:`False`.
2219 state_filter (Optional[str]):
2220 If set, include only jobs matching the given state. One of:
2221 * ``"done"``
2222 * ``"pending"``
2223 * ``"running"``
2224 retry (Optional[google.api_core.retry.Retry]):
2225 How to retry the RPC.
2226 timeout (Optional[float]):
2227 The number of seconds to wait for the underlying HTTP transport
2228 before using ``retry``.
2229 min_creation_time (Optional[datetime.datetime]):
2230 Min value for job creation time. If set, only jobs created
2231 after or at this timestamp are returned. If the datetime has
2232 no time zone assumes UTC time.
2233 max_creation_time (Optional[datetime.datetime]):
2234 Max value for job creation time. If set, only jobs created
2235 before or at this timestamp are returned. If the datetime has
2236 no time zone assumes UTC time.
2237 page_size (Optional[int]):
2238 Maximum number of jobs to return per page.
2240 Returns:
2241 google.api_core.page_iterator.Iterator:
2242 Iterable of job instances.
2243 """
2244 if isinstance(parent_job, job._AsyncJob):
2245 parent_job = parent_job.job_id # pytype: disable=attribute-error
2247 extra_params = {
2248 "allUsers": all_users,
2249 "stateFilter": state_filter,
2250 "minCreationTime": _str_or_none(
2251 google.cloud._helpers._millis_from_datetime(min_creation_time)
2252 ),
2253 "maxCreationTime": _str_or_none(
2254 google.cloud._helpers._millis_from_datetime(max_creation_time)
2255 ),
2256 "projection": "full",
2257 "parentJobId": parent_job,
2258 }
2260 extra_params = {
2261 param: value for param, value in extra_params.items() if value is not None
2262 }
2264 if project is None:
2265 project = self.project
2267 path = "/projects/%s/jobs" % (project,)
2269 span_attributes = {"path": path}
2271 def api_request(*args, **kwargs):
2272 return self._call_api(
2273 retry,
2274 span_name="BigQuery.listJobs",
2275 span_attributes=span_attributes,
2276 *args,
2277 timeout=timeout,
2278 **kwargs,
2279 )
2281 return page_iterator.HTTPIterator(
2282 client=self,
2283 api_request=api_request,
2284 path=path,
2285 item_to_value=_item_to_job,
2286 items_key="jobs",
2287 page_token=page_token,
2288 max_results=max_results,
2289 extra_params=extra_params,
2290 page_size=page_size,
2291 )
2293 def load_table_from_uri(
2294 self,
2295 source_uris: Union[str, Sequence[str]],
2296 destination: Union[Table, TableReference, TableListItem, str],
2297 job_id: str = None,
2298 job_id_prefix: str = None,
2299 location: str = None,
2300 project: str = None,
2301 job_config: LoadJobConfig = None,
2302 retry: retries.Retry = DEFAULT_RETRY,
2303 timeout: TimeoutType = DEFAULT_TIMEOUT,
2304 ) -> job.LoadJob:
2305 """Starts a job for loading data into a table from Cloud Storage.
2307 See
2308 https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#jobconfigurationload
2310 Args:
2311 source_uris (Union[str, Sequence[str]]):
2312 URIs of data files to be loaded; in format
2313 ``gs://<bucket_name>/<object_name_or_glob>``.
2314 destination (Union[ \
2315 google.cloud.bigquery.table.Table, \
2316 google.cloud.bigquery.table.TableReference, \
2317 google.cloud.bigquery.table.TableListItem, \
2318 str, \
2319 ]):
2320 Table into which data is to be loaded. If a string is passed
2321 in, this method attempts to create a table reference from a
2322 string using
2323 :func:`google.cloud.bigquery.table.TableReference.from_string`.
2325 Keyword Arguments:
2326 job_id (Optional[str]): Name of the job.
2327 job_id_prefix (Optional[str]):
2328 The user-provided prefix for a randomly generated job ID.
2329 This parameter will be ignored if a ``job_id`` is also given.
2330 location (Optional[str]):
2331 Location where to run the job. Must match the location of the
2332 destination table.
2333 project (Optional[str]):
2334 Project ID of the project of where to run the job. Defaults
2335 to the client's project.
2336 job_config (Optional[google.cloud.bigquery.job.LoadJobConfig]):
2337 Extra configuration options for the job.
2338 retry (Optional[google.api_core.retry.Retry]):
2339 How to retry the RPC.
2340 timeout (Optional[float]):
2341 The number of seconds to wait for the underlying HTTP transport
2342 before using ``retry``.
2344 Returns:
2345 google.cloud.bigquery.job.LoadJob: A new load job.
2347 Raises:
2348 TypeError:
2349 If ``job_config`` is not an instance of
2350 :class:`~google.cloud.bigquery.job.LoadJobConfig` class.
2351 """
2352 job_id = _make_job_id(job_id, job_id_prefix)
2354 if project is None:
2355 project = self.project
2357 if location is None:
2358 location = self.location
2360 job_ref = job._JobReference(job_id, project=project, location=location)
2362 if isinstance(source_uris, str):
2363 source_uris = [source_uris]
2365 destination = _table_arg_to_table_ref(destination, default_project=self.project)
2367 if job_config is not None:
2368 _verify_job_config_type(job_config, LoadJobConfig)
2369 else:
2370 job_config = job.LoadJobConfig()
2372 new_job_config = job_config._fill_from_default(self._default_load_job_config)
2374 load_job = job.LoadJob(job_ref, source_uris, destination, self, new_job_config)
2375 load_job._begin(retry=retry, timeout=timeout)
2377 return load_job
2379 def load_table_from_file(
2380 self,
2381 file_obj: IO[bytes],
2382 destination: Union[Table, TableReference, TableListItem, str],
2383 rewind: bool = False,
2384 size: Optional[int] = None,
2385 num_retries: int = _DEFAULT_NUM_RETRIES,
2386 job_id: str = None,
2387 job_id_prefix: str = None,
2388 location: str = None,
2389 project: str = None,
2390 job_config: LoadJobConfig = None,
2391 timeout: ResumableTimeoutType = DEFAULT_TIMEOUT,
2392 ) -> job.LoadJob:
2393 """Upload the contents of this table from a file-like object.
2395 Similar to :meth:`load_table_from_uri`, this method creates, starts and
2396 returns a :class:`~google.cloud.bigquery.job.LoadJob`.
2398 Args:
2399 file_obj:
2400 A file handle opened in binary mode for reading.
2401 destination:
2402 Table into which data is to be loaded. If a string is passed
2403 in, this method attempts to create a table reference from a
2404 string using
2405 :func:`google.cloud.bigquery.table.TableReference.from_string`.
2407 Keyword Arguments:
2408 rewind:
2409 If True, seek to the beginning of the file handle before
2410 reading the file.
2411 size:
2412 The number of bytes to read from the file handle. If size is
2413 ``None`` or large, resumable upload will be used. Otherwise,
2414 multipart upload will be used.
2415 num_retries: Number of upload retries. Defaults to 6.
2416 job_id: Name of the job.
2417 job_id_prefix:
2418 The user-provided prefix for a randomly generated job ID.
2419 This parameter will be ignored if a ``job_id`` is also given.
2420 location:
2421 Location where to run the job. Must match the location of the
2422 destination table.
2423 project:
2424 Project ID of the project of where to run the job. Defaults
2425 to the client's project.
2426 job_config:
2427 Extra configuration options for the job.
2428 timeout:
2429 The number of seconds to wait for the underlying HTTP transport
2430 before using ``retry``. Depending on the retry strategy, a request
2431 may be repeated several times using the same timeout each time.
2433 Can also be passed as a tuple (connect_timeout, read_timeout).
2434 See :meth:`requests.Session.request` documentation for details.
2436 Returns:
2437 google.cloud.bigquery.job.LoadJob: A new load job.
2439 Raises:
2440 ValueError:
2441 If ``size`` is not passed in and can not be determined, or if
2442 the ``file_obj`` can be detected to be a file opened in text
2443 mode.
2445 TypeError:
2446 If ``job_config`` is not an instance of
2447 :class:`~google.cloud.bigquery.job.LoadJobConfig` class.
2448 """
2449 job_id = _make_job_id(job_id, job_id_prefix)
2451 if project is None:
2452 project = self.project
2454 if location is None:
2455 location = self.location
2457 destination = _table_arg_to_table_ref(destination, default_project=self.project)
2458 job_ref = job._JobReference(job_id, project=project, location=location)
2460 if job_config is not None:
2461 _verify_job_config_type(job_config, LoadJobConfig)
2462 else:
2463 job_config = job.LoadJobConfig()
2465 new_job_config = job_config._fill_from_default(self._default_load_job_config)
2467 load_job = job.LoadJob(job_ref, None, destination, self, new_job_config)
2468 job_resource = load_job.to_api_repr()
2470 if rewind:
2471 file_obj.seek(0, os.SEEK_SET)
2473 _check_mode(file_obj)
2475 try:
2476 if size is None or size >= _MAX_MULTIPART_SIZE:
2477 response = self._do_resumable_upload(
2478 file_obj, job_resource, num_retries, timeout, project=project
2479 )
2480 else:
2481 response = self._do_multipart_upload(
2482 file_obj, job_resource, size, num_retries, timeout, project=project
2483 )
2484 except resumable_media.InvalidResponse as exc:
2485 raise exceptions.from_http_response(exc.response)
2487 return typing.cast(LoadJob, self.job_from_resource(response.json()))
2489 def load_table_from_dataframe(
2490 self,
2491 dataframe: "pandas.DataFrame",
2492 destination: Union[Table, TableReference, str],
2493 num_retries: int = _DEFAULT_NUM_RETRIES,
2494 job_id: str = None,
2495 job_id_prefix: str = None,
2496 location: str = None,
2497 project: str = None,
2498 job_config: LoadJobConfig = None,
2499 parquet_compression: str = "snappy",
2500 timeout: ResumableTimeoutType = DEFAULT_TIMEOUT,
2501 ) -> job.LoadJob:
2502 """Upload the contents of a table from a pandas DataFrame.
2504 Similar to :meth:`load_table_from_uri`, this method creates, starts and
2505 returns a :class:`~google.cloud.bigquery.job.LoadJob`.
2507 .. note::
2509 REPEATED fields are NOT supported when using the CSV source format.
2510 They are supported when using the PARQUET source format, but
2511 due to the way they are encoded in the ``parquet`` file,
2512 a mismatch with the existing table schema can occur, so
2513 REPEATED fields are not properly supported when using ``pyarrow<4.0.0``
2514 using the parquet format.
2516 https://github.com/googleapis/python-bigquery/issues/19
2518 Args:
2519 dataframe:
2520 A :class:`~pandas.DataFrame` containing the data to load.
2521 destination:
2522 The destination table to use for loading the data. If it is an
2523 existing table, the schema of the :class:`~pandas.DataFrame`
2524 must match the schema of the destination table. If the table
2525 does not yet exist, the schema is inferred from the
2526 :class:`~pandas.DataFrame`.
2528 If a string is passed in, this method attempts to create a
2529 table reference from a string using
2530 :func:`google.cloud.bigquery.table.TableReference.from_string`.
2532 Keyword Arguments:
2533 num_retries: Number of upload retries.
2534 job_id: Name of the job.
2535 job_id_prefix:
2536 The user-provided prefix for a randomly generated
2537 job ID. This parameter will be ignored if a ``job_id`` is
2538 also given.
2539 location:
2540 Location where to run the job. Must match the location of the
2541 destination table.
2542 project:
2543 Project ID of the project of where to run the job. Defaults
2544 to the client's project.
2545 job_config:
2546 Extra configuration options for the job.
2548 To override the default pandas data type conversions, supply
2549 a value for
2550 :attr:`~google.cloud.bigquery.job.LoadJobConfig.schema` with
2551 column names matching those of the dataframe. The BigQuery
2552 schema is used to determine the correct data type conversion.
2553 Indexes are not loaded.
2555 By default, this method uses the parquet source format. To
2556 override this, supply a value for
2557 :attr:`~google.cloud.bigquery.job.LoadJobConfig.source_format`
2558 with the format name. Currently only
2559 :attr:`~google.cloud.bigquery.job.SourceFormat.CSV` and
2560 :attr:`~google.cloud.bigquery.job.SourceFormat.PARQUET` are
2561 supported.
2562 parquet_compression:
2563 [Beta] The compression method to use if intermittently
2564 serializing ``dataframe`` to a parquet file.
2566 The argument is directly passed as the ``compression``
2567 argument to the underlying ``pyarrow.parquet.write_table()``
2568 method (the default value "snappy" gets converted to uppercase).
2569 https://arrow.apache.org/docs/python/generated/pyarrow.parquet.write_table.html#pyarrow-parquet-write-table
2571 If the job config schema is missing, the argument is directly
2572 passed as the ``compression`` argument to the underlying
2573 ``DataFrame.to_parquet()`` method.
2574 https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.to_parquet.html#pandas.DataFrame.to_parquet
2575 timeout:
2576 The number of seconds to wait for the underlying HTTP transport
2577 before using ``retry``. Depending on the retry strategy, a request may
2578 be repeated several times using the same timeout each time.
2580 Can also be passed as a tuple (connect_timeout, read_timeout).
2581 See :meth:`requests.Session.request` documentation for details.
2583 Returns:
2584 google.cloud.bigquery.job.LoadJob: A new load job.
2586 Raises:
2587 ValueError:
2588 If a usable parquet engine cannot be found. This method
2589 requires :mod:`pyarrow` to be installed.
2590 TypeError:
2591 If ``job_config`` is not an instance of
2592 :class:`~google.cloud.bigquery.job.LoadJobConfig` class.
2593 """
2594 job_id = _make_job_id(job_id, job_id_prefix)
2596 if job_config is not None:
2597 _verify_job_config_type(job_config, LoadJobConfig)
2598 else:
2599 job_config = job.LoadJobConfig()
2601 new_job_config = job_config._fill_from_default(self._default_load_job_config)
2603 supported_formats = {job.SourceFormat.CSV, job.SourceFormat.PARQUET}
2604 if new_job_config.source_format is None:
2605 # default value
2606 new_job_config.source_format = job.SourceFormat.PARQUET
2608 if (
2609 new_job_config.source_format == job.SourceFormat.PARQUET
2610 and new_job_config.parquet_options is None
2611 ):
2612 parquet_options = ParquetOptions()
2613 # default value
2614 parquet_options.enable_list_inference = True
2615 new_job_config.parquet_options = parquet_options
2617 if new_job_config.source_format not in supported_formats:
2618 raise ValueError(
2619 "Got unexpected source_format: '{}'. Currently, only PARQUET and CSV are supported".format(
2620 new_job_config.source_format
2621 )
2622 )
2624 if pyarrow is None and new_job_config.source_format == job.SourceFormat.PARQUET:
2625 # pyarrow is now the only supported parquet engine.
2626 raise ValueError("This method requires pyarrow to be installed")
2628 if location is None:
2629 location = self.location
2631 # If table schema is not provided, we try to fetch the existing table
2632 # schema, and check if dataframe schema is compatible with it - except
2633 # for WRITE_TRUNCATE jobs, the existing schema does not matter then.
2634 if (
2635 not new_job_config.schema
2636 and new_job_config.write_disposition != job.WriteDisposition.WRITE_TRUNCATE
2637 ):
2638 try:
2639 table = self.get_table(destination)
2640 except core_exceptions.NotFound:
2641 pass
2642 else:
2643 columns_and_indexes = frozenset(
2644 name
2645 for name, _ in _pandas_helpers.list_columns_and_indexes(dataframe)
2646 )
2647 new_job_config.schema = [
2648 # Field description and policy tags are not needed to
2649 # serialize a data frame.
2650 SchemaField(
2651 field.name,
2652 field.field_type,
2653 mode=field.mode,
2654 fields=field.fields,
2655 )
2656 # schema fields not present in the dataframe are not needed
2657 for field in table.schema
2658 if field.name in columns_and_indexes
2659 ]
2661 new_job_config.schema = _pandas_helpers.dataframe_to_bq_schema(
2662 dataframe, new_job_config.schema
2663 )
2665 if not new_job_config.schema:
2666 # the schema could not be fully detected
2667 warnings.warn(
2668 "Schema could not be detected for all columns. Loading from a "
2669 "dataframe without a schema will be deprecated in the future, "
2670 "please provide a schema.",
2671 PendingDeprecationWarning,
2672 stacklevel=2,
2673 )
2675 tmpfd, tmppath = tempfile.mkstemp(
2676 suffix="_job_{}.{}".format(job_id[:8], new_job_config.source_format.lower())
2677 )
2678 os.close(tmpfd)
2680 try:
2682 if new_job_config.source_format == job.SourceFormat.PARQUET:
2683 if _PYARROW_VERSION in _PYARROW_BAD_VERSIONS:
2684 msg = (
2685 "Loading dataframe data in PARQUET format with pyarrow "
2686 f"{_PYARROW_VERSION} can result in data corruption. It is "
2687 "therefore *strongly* advised to use a different pyarrow "
2688 "version or a different source format. "
2689 "See: https://github.com/googleapis/python-bigquery/issues/781"
2690 )
2691 warnings.warn(msg, category=RuntimeWarning)
2693 if new_job_config.schema:
2694 if parquet_compression == "snappy": # adjust the default value
2695 parquet_compression = parquet_compression.upper()
2697 _pandas_helpers.dataframe_to_parquet(
2698 dataframe,
2699 new_job_config.schema,
2700 tmppath,
2701 parquet_compression=parquet_compression,
2702 parquet_use_compliant_nested_type=True,
2703 )
2704 else:
2705 dataframe.to_parquet(
2706 tmppath,
2707 engine="pyarrow",
2708 compression=parquet_compression,
2709 **(
2710 {"use_compliant_nested_type": True}
2711 if _helpers.PYARROW_VERSIONS.use_compliant_nested_type
2712 else {}
2713 ),
2714 )
2716 else:
2718 dataframe.to_csv(
2719 tmppath,
2720 index=False,
2721 header=False,
2722 encoding="utf-8",
2723 float_format="%.17g",
2724 date_format="%Y-%m-%d %H:%M:%S.%f",
2725 )
2727 with open(tmppath, "rb") as tmpfile:
2728 file_size = os.path.getsize(tmppath)
2729 return self.load_table_from_file(
2730 tmpfile,
2731 destination,
2732 num_retries=num_retries,
2733 rewind=True,
2734 size=file_size,
2735 job_id=job_id,
2736 job_id_prefix=job_id_prefix,
2737 location=location,
2738 project=project,
2739 job_config=new_job_config,
2740 timeout=timeout,
2741 )
2743 finally:
2744 os.remove(tmppath)
2746 def load_table_from_json(
2747 self,
2748 json_rows: Iterable[Dict[str, Any]],
2749 destination: Union[Table, TableReference, TableListItem, str],
2750 num_retries: int = _DEFAULT_NUM_RETRIES,
2751 job_id: str = None,
2752 job_id_prefix: str = None,
2753 location: str = None,
2754 project: str = None,
2755 job_config: LoadJobConfig = None,
2756 timeout: ResumableTimeoutType = DEFAULT_TIMEOUT,
2757 ) -> job.LoadJob:
2758 """Upload the contents of a table from a JSON string or dict.
2760 Args:
2761 json_rows (Iterable[Dict[str, Any]]):
2762 Row data to be inserted. Keys must match the table schema fields
2763 and values must be JSON-compatible representations.
2765 .. note::
2767 If your data is already a newline-delimited JSON string,
2768 it is best to wrap it into a file-like object and pass it
2769 to :meth:`~google.cloud.bigquery.client.Client.load_table_from_file`::
2771 import io
2772 from google.cloud import bigquery
2774 data = u'{"foo": "bar"}'
2775 data_as_file = io.StringIO(data)
2777 client = bigquery.Client()
2778 client.load_table_from_file(data_as_file, ...)
2780 destination:
2781 Table into which data is to be loaded. If a string is passed
2782 in, this method attempts to create a table reference from a
2783 string using
2784 :func:`google.cloud.bigquery.table.TableReference.from_string`.
2786 Keyword Arguments:
2787 num_retries: Number of upload retries.
2788 job_id: Name of the job.
2789 job_id_prefix:
2790 The user-provided prefix for a randomly generated job ID.
2791 This parameter will be ignored if a ``job_id`` is also given.
2792 location:
2793 Location where to run the job. Must match the location of the
2794 destination table.
2795 project:
2796 Project ID of the project of where to run the job. Defaults
2797 to the client's project.
2798 job_config:
2799 Extra configuration options for the job. The ``source_format``
2800 setting is always set to
2801 :attr:`~google.cloud.bigquery.job.SourceFormat.NEWLINE_DELIMITED_JSON`.
2802 timeout:
2803 The number of seconds to wait for the underlying HTTP transport
2804 before using ``retry``. Depending on the retry strategy, a request may
2805 be repeated several times using the same timeout each time.
2807 Can also be passed as a tuple (connect_timeout, read_timeout).
2808 See :meth:`requests.Session.request` documentation for details.
2810 Returns:
2811 google.cloud.bigquery.job.LoadJob: A new load job.
2813 Raises:
2814 TypeError:
2815 If ``job_config`` is not an instance of
2816 :class:`~google.cloud.bigquery.job.LoadJobConfig` class.
2817 """
2818 job_id = _make_job_id(job_id, job_id_prefix)
2820 if job_config is not None:
2821 _verify_job_config_type(job_config, LoadJobConfig)
2822 else:
2823 job_config = job.LoadJobConfig()
2825 new_job_config = job_config._fill_from_default(self._default_load_job_config)
2827 new_job_config.source_format = job.SourceFormat.NEWLINE_DELIMITED_JSON
2829 if new_job_config.schema is None:
2830 new_job_config.autodetect = True
2832 if project is None:
2833 project = self.project
2835 if location is None:
2836 location = self.location
2838 destination = _table_arg_to_table_ref(destination, default_project=self.project)
2840 data_str = "\n".join(json.dumps(item, ensure_ascii=False) for item in json_rows)
2841 encoded_str = data_str.encode()
2842 data_file = io.BytesIO(encoded_str)
2843 return self.load_table_from_file(
2844 data_file,
2845 destination,
2846 size=len(encoded_str),
2847 num_retries=num_retries,
2848 job_id=job_id,
2849 job_id_prefix=job_id_prefix,
2850 location=location,
2851 project=project,
2852 job_config=new_job_config,
2853 timeout=timeout,
2854 )
2856 def _do_resumable_upload(
2857 self,
2858 stream: IO[bytes],
2859 metadata: Mapping[str, str],
2860 num_retries: int,
2861 timeout: Optional[ResumableTimeoutType],
2862 project: Optional[str] = None,
2863 ) -> "requests.Response":
2864 """Perform a resumable upload.
2866 Args:
2867 stream: A bytes IO object open for reading.
2869 metadata: The metadata associated with the upload.
2871 num_retries:
2872 Number of upload retries. (Deprecated: This
2873 argument will be removed in a future release.)
2875 timeout:
2876 The number of seconds to wait for the underlying HTTP transport
2877 before using ``retry``. Depending on the retry strategy, a request may
2878 be repeated several times using the same timeout each time.
2880 Can also be passed as a tuple (connect_timeout, read_timeout).
2881 See :meth:`requests.Session.request` documentation for details.
2883 project:
2884 Project ID of the project of where to run the upload. Defaults
2885 to the client's project.
2887 Returns:
2888 The "200 OK" response object returned after the final chunk
2889 is uploaded.
2890 """
2891 upload, transport = self._initiate_resumable_upload(
2892 stream, metadata, num_retries, timeout, project=project
2893 )
2895 while not upload.finished:
2896 response = upload.transmit_next_chunk(transport, timeout=timeout)
2898 return response
2900 def _initiate_resumable_upload(
2901 self,
2902 stream: IO[bytes],
2903 metadata: Mapping[str, str],
2904 num_retries: int,
2905 timeout: Optional[ResumableTimeoutType],
2906 project: Optional[str] = None,
2907 ):
2908 """Initiate a resumable upload.
2910 Args:
2911 stream: A bytes IO object open for reading.
2913 metadata: The metadata associated with the upload.
2915 num_retries:
2916 Number of upload retries. (Deprecated: This
2917 argument will be removed in a future release.)
2919 timeout:
2920 The number of seconds to wait for the underlying HTTP transport
2921 before using ``retry``. Depending on the retry strategy, a request may
2922 be repeated several times using the same timeout each time.
2924 Can also be passed as a tuple (connect_timeout, read_timeout).
2925 See :meth:`requests.Session.request` documentation for details.
2927 project:
2928 Project ID of the project of where to run the upload. Defaults
2929 to the client's project.
2931 Returns:
2932 Tuple:
2933 Pair of
2935 * The :class:`~google.resumable_media.requests.ResumableUpload`
2936 that was created
2937 * The ``transport`` used to initiate the upload.
2938 """
2939 chunk_size = _DEFAULT_CHUNKSIZE
2940 transport = self._http
2941 headers = _get_upload_headers(self._connection.user_agent)
2943 if project is None:
2944 project = self.project
2945 # TODO: Increase the minimum version of google-cloud-core to 1.6.0
2946 # and remove this logic. See:
2947 # https://github.com/googleapis/python-bigquery/issues/509
2948 hostname = (
2949 self._connection.API_BASE_URL
2950 if not hasattr(self._connection, "get_api_base_url_for_mtls")
2951 else self._connection.get_api_base_url_for_mtls()
2952 )
2953 upload_url = _RESUMABLE_URL_TEMPLATE.format(host=hostname, project=project)
2955 # TODO: modify ResumableUpload to take a retry.Retry object
2956 # that it can use for the initial RPC.
2957 upload = ResumableUpload(upload_url, chunk_size, headers=headers)
2959 if num_retries is not None:
2960 upload._retry_strategy = resumable_media.RetryStrategy(
2961 max_retries=num_retries
2962 )
2964 upload.initiate(
2965 transport,
2966 stream,
2967 metadata,
2968 _GENERIC_CONTENT_TYPE,
2969 stream_final=False,
2970 timeout=timeout,
2971 )
2973 return upload, transport
2975 def _do_multipart_upload(
2976 self,
2977 stream: IO[bytes],
2978 metadata: Mapping[str, str],
2979 size: int,
2980 num_retries: int,
2981 timeout: Optional[ResumableTimeoutType],
2982 project: Optional[str] = None,
2983 ):
2984 """Perform a multipart upload.
2986 Args:
2987 stream: A bytes IO object open for reading.
2989 metadata: The metadata associated with the upload.
2991 size:
2992 The number of bytes to be uploaded (which will be read
2993 from ``stream``). If not provided, the upload will be
2994 concluded once ``stream`` is exhausted (or :data:`None`).
2996 num_retries:
2997 Number of upload retries. (Deprecated: This
2998 argument will be removed in a future release.)
3000 timeout:
3001 The number of seconds to wait for the underlying HTTP transport
3002 before using ``retry``. Depending on the retry strategy, a request may
3003 be repeated several times using the same timeout each time.
3005 Can also be passed as a tuple (connect_timeout, read_timeout).
3006 See :meth:`requests.Session.request` documentation for details.
3008 project:
3009 Project ID of the project of where to run the upload. Defaults
3010 to the client's project.
3012 Returns:
3013 requests.Response:
3014 The "200 OK" response object returned after the multipart
3015 upload request.
3017 Raises:
3018 ValueError:
3019 if the ``stream`` has fewer than ``size``
3020 bytes remaining.
3021 """
3022 data = stream.read(size)
3023 if len(data) < size:
3024 msg = _READ_LESS_THAN_SIZE.format(size, len(data))
3025 raise ValueError(msg)
3027 headers = _get_upload_headers(self._connection.user_agent)
3029 if project is None:
3030 project = self.project
3032 # TODO: Increase the minimum version of google-cloud-core to 1.6.0
3033 # and remove this logic. See:
3034 # https://github.com/googleapis/python-bigquery/issues/509
3035 hostname = (
3036 self._connection.API_BASE_URL
3037 if not hasattr(self._connection, "get_api_base_url_for_mtls")
3038 else self._connection.get_api_base_url_for_mtls()
3039 )
3040 upload_url = _MULTIPART_URL_TEMPLATE.format(host=hostname, project=project)
3041 upload = MultipartUpload(upload_url, headers=headers)
3043 if num_retries is not None:
3044 upload._retry_strategy = resumable_media.RetryStrategy(
3045 max_retries=num_retries
3046 )
3048 response = upload.transmit(
3049 self._http, data, metadata, _GENERIC_CONTENT_TYPE, timeout=timeout
3050 )
3052 return response
3054 def copy_table(
3055 self,
3056 sources: Union[
3057 Table,
3058 TableReference,
3059 TableListItem,
3060 str,
3061 Sequence[Union[Table, TableReference, TableListItem, str]],
3062 ],
3063 destination: Union[Table, TableReference, TableListItem, str],
3064 job_id: str = None,
3065 job_id_prefix: str = None,
3066 location: str = None,
3067 project: str = None,
3068 job_config: CopyJobConfig = None,
3069 retry: retries.Retry = DEFAULT_RETRY,
3070 timeout: TimeoutType = DEFAULT_TIMEOUT,
3071 ) -> job.CopyJob:
3072 """Copy one or more tables to another table.
3074 See
3075 https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#jobconfigurationtablecopy
3077 Args:
3078 sources (Union[ \
3079 google.cloud.bigquery.table.Table, \
3080 google.cloud.bigquery.table.TableReference, \
3081 google.cloud.bigquery.table.TableListItem, \
3082 str, \
3083 Sequence[ \
3084 Union[ \
3085 google.cloud.bigquery.table.Table, \
3086 google.cloud.bigquery.table.TableReference, \
3087 google.cloud.bigquery.table.TableListItem, \
3088 str, \
3089 ] \
3090 ], \
3091 ]):
3092 Table or tables to be copied.
3093 destination (Union[ \
3094 google.cloud.bigquery.table.Table, \
3095 google.cloud.bigquery.table.TableReference, \
3096 google.cloud.bigquery.table.TableListItem, \
3097 str, \
3098 ]):
3099 Table into which data is to be copied.
3101 Keyword Arguments:
3102 job_id (Optional[str]): The ID of the job.
3103 job_id_prefix (Optional[str]):
3104 The user-provided prefix for a randomly generated job ID.
3105 This parameter will be ignored if a ``job_id`` is also given.
3106 location (Optional[str]):
3107 Location where to run the job. Must match the location of any
3108 source table as well as the destination table.
3109 project (Optional[str]):
3110 Project ID of the project of where to run the job. Defaults
3111 to the client's project.
3112 job_config (Optional[google.cloud.bigquery.job.CopyJobConfig]):
3113 Extra configuration options for the job.
3114 retry (Optional[google.api_core.retry.Retry]):
3115 How to retry the RPC.
3116 timeout (Optional[float]):
3117 The number of seconds to wait for the underlying HTTP transport
3118 before using ``retry``.
3120 Returns:
3121 google.cloud.bigquery.job.CopyJob: A new copy job instance.
3123 Raises:
3124 TypeError:
3125 If ``job_config`` is not an instance of :class:`~google.cloud.bigquery.job.CopyJobConfig`
3126 class.
3127 """
3128 job_id = _make_job_id(job_id, job_id_prefix)
3130 if project is None:
3131 project = self.project
3133 if location is None:
3134 location = self.location
3136 job_ref = job._JobReference(job_id, project=project, location=location)
3138 # sources can be one of many different input types. (string, Table,
3139 # TableReference, or a sequence of any of those.) Convert them all to a
3140 # list of TableReferences.
3141 #
3142 # _table_arg_to_table_ref leaves lists unmodified.
3143 sources = _table_arg_to_table_ref(sources, default_project=self.project)
3145 if not isinstance(sources, collections_abc.Sequence):
3146 sources = [sources]
3148 sources = [
3149 _table_arg_to_table_ref(source, default_project=self.project)
3150 for source in sources
3151 ]
3153 destination = _table_arg_to_table_ref(destination, default_project=self.project)
3155 if job_config:
3156 _verify_job_config_type(job_config, google.cloud.bigquery.job.CopyJobConfig)
3157 job_config = copy.deepcopy(job_config)
3159 copy_job = job.CopyJob(
3160 job_ref, sources, destination, client=self, job_config=job_config
3161 )
3162 copy_job._begin(retry=retry, timeout=timeout)
3164 return copy_job
3166 def extract_table(
3167 self,
3168 source: Union[Table, TableReference, TableListItem, Model, ModelReference, str],
3169 destination_uris: Union[str, Sequence[str]],
3170 job_id: str = None,
3171 job_id_prefix: str = None,
3172 location: str = None,
3173 project: str = None,
3174 job_config: ExtractJobConfig = None,
3175 retry: retries.Retry = DEFAULT_RETRY,
3176 timeout: TimeoutType = DEFAULT_TIMEOUT,
3177 source_type: str = "Table",
3178 ) -> job.ExtractJob:
3179 """Start a job to extract a table into Cloud Storage files.
3181 See
3182 https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#jobconfigurationextract
3184 Args:
3185 source (Union[ \
3186 google.cloud.bigquery.table.Table, \
3187 google.cloud.bigquery.table.TableReference, \
3188 google.cloud.bigquery.table.TableListItem, \
3189 google.cloud.bigquery.model.Model, \
3190 google.cloud.bigquery.model.ModelReference, \
3191 src, \
3192 ]):
3193 Table or Model to be extracted.
3194 destination_uris (Union[str, Sequence[str]]):
3195 URIs of Cloud Storage file(s) into which table data is to be
3196 extracted; in format
3197 ``gs://<bucket_name>/<object_name_or_glob>``.
3199 Keyword Arguments:
3200 job_id (Optional[str]): The ID of the job.
3201 job_id_prefix (Optional[str]):
3202 The user-provided prefix for a randomly generated job ID.
3203 This parameter will be ignored if a ``job_id`` is also given.
3204 location (Optional[str]):
3205 Location where to run the job. Must match the location of the
3206 source table.
3207 project (Optional[str]):
3208 Project ID of the project of where to run the job. Defaults
3209 to the client's project.
3210 job_config (Optional[google.cloud.bigquery.job.ExtractJobConfig]):
3211 Extra configuration options for the job.
3212 retry (Optional[google.api_core.retry.Retry]):
3213 How to retry the RPC.
3214 timeout (Optional[float]):
3215 The number of seconds to wait for the underlying HTTP transport
3216 before using ``retry``.
3217 source_type (Optional[str]):
3218 Type of source to be extracted.``Table`` or ``Model``. Defaults to ``Table``.
3219 Returns:
3220 google.cloud.bigquery.job.ExtractJob: A new extract job instance.
3222 Raises:
3223 TypeError:
3224 If ``job_config`` is not an instance of :class:`~google.cloud.bigquery.job.ExtractJobConfig`
3225 class.
3226 ValueError:
3227 If ``source_type`` is not among ``Table``,``Model``.
3228 """
3229 job_id = _make_job_id(job_id, job_id_prefix)
3231 if project is None:
3232 project = self.project
3234 if location is None:
3235 location = self.location
3237 job_ref = job._JobReference(job_id, project=project, location=location)
3238 src = source_type.lower()
3239 if src == "table":
3240 source = _table_arg_to_table_ref(source, default_project=self.project)
3241 elif src == "model":
3242 source = _model_arg_to_model_ref(source, default_project=self.project)
3243 else:
3244 raise ValueError(
3245 "Cannot pass `{}` as a ``source_type``, pass Table or Model".format(
3246 source_type
3247 )
3248 )
3250 if isinstance(destination_uris, str):
3251 destination_uris = [destination_uris]
3253 if job_config:
3254 _verify_job_config_type(
3255 job_config, google.cloud.bigquery.job.ExtractJobConfig
3256 )
3257 job_config = copy.deepcopy(job_config)
3259 extract_job = job.ExtractJob(
3260 job_ref, source, destination_uris, client=self, job_config=job_config
3261 )
3262 extract_job._begin(retry=retry, timeout=timeout)
3264 return extract_job
3266 def query(
3267 self,
3268 query: str,
3269 job_config: QueryJobConfig = None,
3270 job_id: str = None,
3271 job_id_prefix: str = None,
3272 location: str = None,
3273 project: str = None,
3274 retry: retries.Retry = DEFAULT_RETRY,
3275 timeout: TimeoutType = DEFAULT_TIMEOUT,
3276 job_retry: retries.Retry = DEFAULT_JOB_RETRY,
3277 api_method: Union[str, enums.QueryApiMethod] = enums.QueryApiMethod.INSERT,
3278 ) -> job.QueryJob:
3279 """Run a SQL query.
3281 See
3282 https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#jobconfigurationquery
3284 Args:
3285 query (str):
3286 SQL query to be executed. Defaults to the standard SQL
3287 dialect. Use the ``job_config`` parameter to change dialects.
3289 Keyword Arguments:
3290 job_config (Optional[google.cloud.bigquery.job.QueryJobConfig]):
3291 Extra configuration options for the job.
3292 To override any options that were previously set in
3293 the ``default_query_job_config`` given to the
3294 ``Client`` constructor, manually set those options to ``None``,
3295 or whatever value is preferred.
3296 job_id (Optional[str]): ID to use for the query job.
3297 job_id_prefix (Optional[str]):
3298 The prefix to use for a randomly generated job ID. This parameter
3299 will be ignored if a ``job_id`` is also given.
3300 location (Optional[str]):
3301 Location where to run the job. Must match the location of the
3302 table used in the query as well as the destination table.
3303 project (Optional[str]):
3304 Project ID of the project of where to run the job. Defaults
3305 to the client's project.
3306 retry (Optional[google.api_core.retry.Retry]):
3307 How to retry the RPC. This only applies to making RPC
3308 calls. It isn't used to retry failed jobs. This has
3309 a reasonable default that should only be overridden
3310 with care.
3311 timeout (Optional[float]):
3312 The number of seconds to wait for the underlying HTTP transport
3313 before using ``retry``.
3314 job_retry (Optional[google.api_core.retry.Retry]):
3315 How to retry failed jobs. The default retries
3316 rate-limit-exceeded errors. Passing ``None`` disables
3317 job retry.
3319 Not all jobs can be retried. If ``job_id`` is
3320 provided, then the job returned by the query will not
3321 be retryable, and an exception will be raised if a
3322 non-``None`` (and non-default) value for ``job_retry``
3323 is also provided.
3325 Note that errors aren't detected until ``result()`` is
3326 called on the job returned. The ``job_retry``
3327 specified here becomes the default ``job_retry`` for
3328 ``result()``, where it can also be specified.
3329 api_method (Union[str, enums.QueryApiMethod]):
3330 Method with which to start the query job.
3332 See :class:`google.cloud.bigquery.enums.QueryApiMethod` for
3333 details on the difference between the query start methods.
3335 Returns:
3336 google.cloud.bigquery.job.QueryJob: A new query job instance.
3338 Raises:
3339 TypeError:
3340 If ``job_config`` is not an instance of
3341 :class:`~google.cloud.bigquery.job.QueryJobConfig`
3342 class, or if both ``job_id`` and non-``None`` non-default
3343 ``job_retry`` are provided.
3344 """
3345 job_id_given = job_id is not None
3346 if (
3347 job_id_given
3348 and job_retry is not None
3349 and job_retry is not DEFAULT_JOB_RETRY
3350 ):
3351 raise TypeError(
3352 "`job_retry` was provided, but the returned job is"
3353 " not retryable, because a custom `job_id` was"
3354 " provided."
3355 )
3357 if job_id_given and api_method == enums.QueryApiMethod.QUERY:
3358 raise TypeError(
3359 "`job_id` was provided, but the 'QUERY' `api_method` was requested."
3360 )
3362 if project is None:
3363 project = self.project
3365 if location is None:
3366 location = self.location
3368 if self._default_query_job_config:
3369 if job_config:
3370 _verify_job_config_type(
3371 job_config, google.cloud.bigquery.job.QueryJobConfig
3372 )
3373 # anything that's not defined on the incoming
3374 # that is in the default,
3375 # should be filled in with the default
3376 # the incoming therefore has precedence
3377 #
3378 # Note that _fill_from_default doesn't mutate the receiver
3379 job_config = job_config._fill_from_default(
3380 self._default_query_job_config
3381 )
3382 else:
3383 _verify_job_config_type(
3384 self._default_query_job_config,
3385 google.cloud.bigquery.job.QueryJobConfig,
3386 )
3387 job_config = self._default_query_job_config
3389 # Note that we haven't modified the original job_config (or
3390 # _default_query_job_config) up to this point.
3391 if api_method == enums.QueryApiMethod.QUERY:
3392 return _job_helpers.query_jobs_query(
3393 self,
3394 query,
3395 job_config,
3396 location,
3397 project,
3398 retry,
3399 timeout,
3400 job_retry,
3401 )
3402 elif api_method == enums.QueryApiMethod.INSERT:
3403 return _job_helpers.query_jobs_insert(
3404 self,
3405 query,
3406 job_config,
3407 job_id,
3408 job_id_prefix,
3409 location,
3410 project,
3411 retry,
3412 timeout,
3413 job_retry,
3414 )
3415 else:
3416 raise ValueError(f"Got unexpected value for api_method: {repr(api_method)}")
3418 def insert_rows(
3419 self,
3420 table: Union[Table, TableReference, str],
3421 rows: Union[Iterable[Tuple], Iterable[Mapping[str, Any]]],
3422 selected_fields: Sequence[SchemaField] = None,
3423 **kwargs,
3424 ) -> Sequence[Dict[str, Any]]:
3425 """Insert rows into a table via the streaming API.
3427 See
3428 https://cloud.google.com/bigquery/docs/reference/rest/v2/tabledata/insertAll
3430 BigQuery will reject insertAll payloads that exceed a defined limit (10MB).
3431 Additionally, if a payload vastly exceeds this limit, the request is rejected
3432 by the intermediate architecture, which returns a 413 (Payload Too Large) status code.
3435 See
3436 https://cloud.google.com/bigquery/quotas#streaming_inserts
3438 Args:
3439 table (Union[ \
3440 google.cloud.bigquery.table.Table, \
3441 google.cloud.bigquery.table.TableReference, \
3442 str, \
3443 ]):
3444 The destination table for the row data, or a reference to it.
3445 rows (Union[Sequence[Tuple], Sequence[Dict]]):
3446 Row data to be inserted. If a list of tuples is given, each
3447 tuple should contain data for each schema field on the
3448 current table and in the same order as the schema fields. If
3449 a list of dictionaries is given, the keys must include all
3450 required fields in the schema. Keys which do not correspond
3451 to a field in the schema are ignored.
3452 selected_fields (Sequence[google.cloud.bigquery.schema.SchemaField]):
3453 The fields to return. Required if ``table`` is a
3454 :class:`~google.cloud.bigquery.table.TableReference`.
3455 kwargs (dict):
3456 Keyword arguments to
3457 :meth:`~google.cloud.bigquery.client.Client.insert_rows_json`.
3459 Returns:
3460 Sequence[Mappings]:
3461 One mapping per row with insert errors: the "index" key
3462 identifies the row, and the "errors" key contains a list of
3463 the mappings describing one or more problems with the row.
3465 Raises:
3466 ValueError: if table's schema is not set or `rows` is not a `Sequence`.
3467 """
3468 if not isinstance(rows, (collections_abc.Sequence, collections_abc.Iterator)):
3469 raise TypeError("rows argument should be a sequence of dicts or tuples")
3471 table = _table_arg_to_table(table, default_project=self.project)
3473 if not isinstance(table, Table):
3474 raise TypeError(_NEED_TABLE_ARGUMENT)
3476 schema = table.schema
3478 # selected_fields can override the table schema.
3479 if selected_fields is not None:
3480 schema = selected_fields
3482 if len(schema) == 0:
3483 raise ValueError(
3484 (
3485 "Could not determine schema for table '{}'. Call client.get_table() "
3486 "or pass in a list of schema fields to the selected_fields argument."
3487 ).format(table)
3488 )
3490 json_rows = [_record_field_to_json(schema, row) for row in rows]
3492 return self.insert_rows_json(table, json_rows, **kwargs)
3494 def insert_rows_from_dataframe(
3495 self,
3496 table: Union[Table, TableReference, str],
3497 dataframe,
3498 selected_fields: Sequence[SchemaField] = None,
3499 chunk_size: int = 500,
3500 **kwargs: Dict,
3501 ) -> Sequence[Sequence[dict]]:
3502 """Insert rows into a table from a dataframe via the streaming API.
3504 BigQuery will reject insertAll payloads that exceed a defined limit (10MB).
3505 Additionally, if a payload vastly exceeds this limit, the request is rejected
3506 by the intermediate architecture, which returns a 413 (Payload Too Large) status code.
3508 See
3509 https://cloud.google.com/bigquery/quotas#streaming_inserts
3511 Args:
3512 table (Union[ \
3513 google.cloud.bigquery.table.Table, \
3514 google.cloud.bigquery.table.TableReference, \
3515 str, \
3516 ]):
3517 The destination table for the row data, or a reference to it.
3518 dataframe (pandas.DataFrame):
3519 A :class:`~pandas.DataFrame` containing the data to load. Any
3520 ``NaN`` values present in the dataframe are omitted from the
3521 streaming API request(s).
3522 selected_fields (Sequence[google.cloud.bigquery.schema.SchemaField]):
3523 The fields to return. Required if ``table`` is a
3524 :class:`~google.cloud.bigquery.table.TableReference`.
3525 chunk_size (int):
3526 The number of rows to stream in a single chunk. Must be positive.
3527 kwargs (Dict):
3528 Keyword arguments to
3529 :meth:`~google.cloud.bigquery.client.Client.insert_rows_json`.
3531 Returns:
3532 Sequence[Sequence[Mappings]]:
3533 A list with insert errors for each insert chunk. Each element
3534 is a list containing one mapping per row with insert errors:
3535 the "index" key identifies the row, and the "errors" key
3536 contains a list of the mappings describing one or more problems
3537 with the row.
3539 Raises:
3540 ValueError: if table's schema is not set
3541 """
3542 insert_results = []
3544 chunk_count = int(math.ceil(len(dataframe) / chunk_size))
3545 rows_iter = _pandas_helpers.dataframe_to_json_generator(dataframe)
3547 for _ in range(chunk_count):
3548 rows_chunk = itertools.islice(rows_iter, chunk_size)
3549 result = self.insert_rows(table, rows_chunk, selected_fields, **kwargs)
3550 insert_results.append(result)
3552 return insert_results
3554 def insert_rows_json(
3555 self,
3556 table: Union[Table, TableReference, TableListItem, str],
3557 json_rows: Sequence[Mapping[str, Any]],
3558 row_ids: Union[
3559 Iterable[Optional[str]], AutoRowIDs, None
3560 ] = AutoRowIDs.GENERATE_UUID,
3561 skip_invalid_rows: bool = None,
3562 ignore_unknown_values: bool = None,
3563 template_suffix: str = None,
3564 retry: retries.Retry = DEFAULT_RETRY,
3565 timeout: TimeoutType = DEFAULT_TIMEOUT,
3566 ) -> Sequence[dict]:
3567 """Insert rows into a table without applying local type conversions.
3569 See
3570 https://cloud.google.com/bigquery/docs/reference/rest/v2/tabledata/insertAll
3572 BigQuery will reject insertAll payloads that exceed a defined limit (10MB).
3573 Additionally, if a payload vastly exceeds this limit, the request is rejected
3574 by the intermediate architecture, which returns a 413 (Payload Too Large) status code.
3576 See
3577 https://cloud.google.com/bigquery/quotas#streaming_inserts
3579 Args:
3580 table (Union[ \
3581 google.cloud.bigquery.table.Table \
3582 google.cloud.bigquery.table.TableReference, \
3583 google.cloud.bigquery.table.TableListItem, \
3584 str \
3585 ]):
3586 The destination table for the row data, or a reference to it.
3587 json_rows (Sequence[Dict]):
3588 Row data to be inserted. Keys must match the table schema fields
3589 and values must be JSON-compatible representations.
3590 row_ids (Union[Iterable[str], AutoRowIDs, None]):
3591 Unique IDs, one per row being inserted. An ID can also be
3592 ``None``, indicating that an explicit insert ID should **not**
3593 be used for that row. If the argument is omitted altogether,
3594 unique IDs are created automatically.
3596 .. versionchanged:: 2.21.0
3597 Can also be an iterable, not just a sequence, or an
3598 :class:`AutoRowIDs` enum member.
3600 .. deprecated:: 2.21.0
3601 Passing ``None`` to explicitly request autogenerating insert IDs is
3602 deprecated, use :attr:`AutoRowIDs.GENERATE_UUID` instead.
3604 skip_invalid_rows (Optional[bool]):
3605 Insert all valid rows of a request, even if invalid rows exist.
3606 The default value is ``False``, which causes the entire request
3607 to fail if any invalid rows exist.
3608 ignore_unknown_values (Optional[bool]):
3609 Accept rows that contain values that do not match the schema.
3610 The unknown values are ignored. Default is ``False``, which
3611 treats unknown values as errors.
3612 template_suffix (Optional[str]):
3613 Treat ``name`` as a template table and provide a suffix.
3614 BigQuery will create the table ``<name> + <template_suffix>``
3615 based on the schema of the template table. See
3616 https://cloud.google.com/bigquery/streaming-data-into-bigquery#template-tables
3617 retry (Optional[google.api_core.retry.Retry]):
3618 How to retry the RPC.
3619 timeout (Optional[float]):
3620 The number of seconds to wait for the underlying HTTP transport
3621 before using ``retry``.
3623 Returns:
3624 Sequence[Mappings]:
3625 One mapping per row with insert errors: the "index" key
3626 identifies the row, and the "errors" key contains a list of
3627 the mappings describing one or more problems with the row.
3629 Raises:
3630 TypeError: if `json_rows` is not a `Sequence`.
3631 """
3632 if not isinstance(
3633 json_rows, (collections_abc.Sequence, collections_abc.Iterator)
3634 ):
3635 raise TypeError("json_rows argument should be a sequence of dicts")
3636 # Convert table to just a reference because unlike insert_rows,
3637 # insert_rows_json doesn't need the table schema. It's not doing any
3638 # type conversions.
3639 table = _table_arg_to_table_ref(table, default_project=self.project)
3640 rows_info: List[Any] = []
3641 data: Dict[str, Any] = {"rows": rows_info}
3643 if row_ids is None:
3644 warnings.warn(
3645 "Passing None for row_ids is deprecated. To explicitly request "
3646 "autogenerated insert IDs, use AutoRowIDs.GENERATE_UUID instead",
3647 category=DeprecationWarning,
3648 )
3649 row_ids = AutoRowIDs.GENERATE_UUID
3651 if not isinstance(row_ids, AutoRowIDs):
3652 try:
3653 row_ids_iter = iter(row_ids)
3654 except TypeError:
3655 msg = "row_ids is neither an iterable nor an AutoRowIDs enum member"
3656 raise TypeError(msg)
3658 for i, row in enumerate(json_rows):
3659 info: Dict[str, Any] = {"json": row}
3661 if row_ids is AutoRowIDs.GENERATE_UUID:
3662 info["insertId"] = str(uuid.uuid4())
3663 elif row_ids is AutoRowIDs.DISABLED:
3664 info["insertId"] = None
3665 else:
3666 try:
3667 insert_id = next(row_ids_iter)
3668 except StopIteration:
3669 msg = f"row_ids did not generate enough IDs, error at index {i}"
3670 raise ValueError(msg)
3671 else:
3672 info["insertId"] = insert_id
3674 rows_info.append(info)
3676 if skip_invalid_rows is not None:
3677 data["skipInvalidRows"] = skip_invalid_rows
3679 if ignore_unknown_values is not None:
3680 data["ignoreUnknownValues"] = ignore_unknown_values
3682 if template_suffix is not None:
3683 data["templateSuffix"] = template_suffix
3685 path = "%s/insertAll" % table.path
3686 # We can always retry, because every row has an insert ID.
3687 span_attributes = {"path": path}
3688 response = self._call_api(
3689 retry,
3690 span_name="BigQuery.insertRowsJson",
3691 span_attributes=span_attributes,
3692 method="POST",
3693 path=path,
3694 data=data,
3695 timeout=timeout,
3696 )
3697 errors = []
3699 for error in response.get("insertErrors", ()):
3700 errors.append({"index": int(error["index"]), "errors": error["errors"]})
3702 return errors
3704 def list_partitions(
3705 self,
3706 table: Union[Table, TableReference, TableListItem, str],
3707 retry: retries.Retry = DEFAULT_RETRY,
3708 timeout: TimeoutType = DEFAULT_TIMEOUT,
3709 ) -> Sequence[str]:
3710 """List the partitions in a table.
3712 Args:
3713 table (Union[ \
3714 google.cloud.bigquery.table.Table, \
3715 google.cloud.bigquery.table.TableReference, \
3716 google.cloud.bigquery.table.TableListItem, \
3717 str, \
3718 ]):
3719 The table or reference from which to get partition info
3720 retry (Optional[google.api_core.retry.Retry]):
3721 How to retry the RPC.
3722 timeout (Optional[float]):
3723 The number of seconds to wait for the underlying HTTP transport
3724 before using ``retry``.
3725 If multiple requests are made under the hood, ``timeout``
3726 applies to each individual request.
3728 Returns:
3729 List[str]:
3730 A list of the partition ids present in the partitioned table
3731 """
3732 table = _table_arg_to_table_ref(table, default_project=self.project)
3733 meta_table = self.get_table(
3734 TableReference(
3735 DatasetReference(table.project, table.dataset_id),
3736 "%s$__PARTITIONS_SUMMARY__" % table.table_id,
3737 ),
3738 retry=retry,
3739 timeout=timeout,
3740 )
3742 subset = [col for col in meta_table.schema if col.name == "partition_id"]
3743 return [
3744 row[0]
3745 for row in self.list_rows(
3746 meta_table, selected_fields=subset, retry=retry, timeout=timeout
3747 )
3748 ]
3750 def list_rows(
3751 self,
3752 table: Union[Table, TableListItem, TableReference, str],
3753 selected_fields: Sequence[SchemaField] = None,
3754 max_results: Optional[int] = None,
3755 page_token: str = None,
3756 start_index: Optional[int] = None,
3757 page_size: Optional[int] = None,
3758 retry: retries.Retry = DEFAULT_RETRY,
3759 timeout: TimeoutType = DEFAULT_TIMEOUT,
3760 ) -> RowIterator:
3761 """List the rows of the table.
3763 See
3764 https://cloud.google.com/bigquery/docs/reference/rest/v2/tabledata/list
3766 .. note::
3768 This method assumes that the provided schema is up-to-date with the
3769 schema as defined on the back-end: if the two schemas are not
3770 identical, the values returned may be incomplete. To ensure that the
3771 local copy of the schema is up-to-date, call ``client.get_table``.
3773 Args:
3774 table (Union[ \
3775 google.cloud.bigquery.table.Table, \
3776 google.cloud.bigquery.table.TableListItem, \
3777 google.cloud.bigquery.table.TableReference, \
3778 str, \
3779 ]):
3780 The table to list, or a reference to it. When the table
3781 object does not contain a schema and ``selected_fields`` is
3782 not supplied, this method calls ``get_table`` to fetch the
3783 table schema.
3784 selected_fields (Sequence[google.cloud.bigquery.schema.SchemaField]):
3785 The fields to return. If not supplied, data for all columns
3786 are downloaded.
3787 max_results (Optional[int]):
3788 Maximum number of rows to return.
3789 page_token (Optional[str]):
3790 Token representing a cursor into the table's rows.
3791 If not passed, the API will return the first page of the
3792 rows. The token marks the beginning of the iterator to be
3793 returned and the value of the ``page_token`` can be accessed
3794 at ``next_page_token`` of the
3795 :class:`~google.cloud.bigquery.table.RowIterator`.
3796 start_index (Optional[int]):
3797 The zero-based index of the starting row to read.
3798 page_size (Optional[int]):
3799 The maximum number of rows in each page of results from this request.
3800 Non-positive values are ignored. Defaults to a sensible value set by the API.
3801 retry (Optional[google.api_core.retry.Retry]):
3802 How to retry the RPC.
3803 timeout (Optional[float]):
3804 The number of seconds to wait for the underlying HTTP transport
3805 before using ``retry``.
3806 If multiple requests are made under the hood, ``timeout``
3807 applies to each individual request.
3809 Returns:
3810 google.cloud.bigquery.table.RowIterator:
3811 Iterator of row data
3812 :class:`~google.cloud.bigquery.table.Row`-s. During each
3813 page, the iterator will have the ``total_rows`` attribute
3814 set, which counts the total number of rows **in the table**
3815 (this is distinct from the total number of rows in the
3816 current page: ``iterator.page.num_items``).
3817 """
3818 table = _table_arg_to_table(table, default_project=self.project)
3820 if not isinstance(table, Table):
3821 raise TypeError(_NEED_TABLE_ARGUMENT)
3823 schema = table.schema
3825 # selected_fields can override the table schema.
3826 if selected_fields is not None:
3827 schema = selected_fields
3829 # No schema, but no selected_fields. Assume the developer wants all
3830 # columns, so get the table resource for them rather than failing.
3831 elif len(schema) == 0:
3832 table = self.get_table(table.reference, retry=retry, timeout=timeout)
3833 schema = table.schema
3835 params: Dict[str, Any] = {}
3836 if selected_fields is not None:
3837 params["selectedFields"] = ",".join(field.name for field in selected_fields)
3838 if start_index is not None:
3839 params["startIndex"] = start_index
3841 params["formatOptions.useInt64Timestamp"] = True
3842 row_iterator = RowIterator(
3843 client=self,
3844 api_request=functools.partial(self._call_api, retry, timeout=timeout),
3845 path="%s/data" % (table.path,),
3846 schema=schema,
3847 page_token=page_token,
3848 max_results=max_results,
3849 page_size=page_size,
3850 extra_params=params,
3851 table=table,
3852 # Pass in selected_fields separately from schema so that full
3853 # tables can be fetched without a column filter.
3854 selected_fields=selected_fields,
3855 total_rows=getattr(table, "num_rows", None),
3856 )
3857 return row_iterator
3859 def _list_rows_from_query_results(
3860 self,
3861 job_id: str,
3862 location: str,
3863 project: str,
3864 schema: SchemaField,
3865 total_rows: Optional[int] = None,
3866 destination: Union[Table, TableReference, TableListItem, str] = None,
3867 max_results: Optional[int] = None,
3868 start_index: Optional[int] = None,
3869 page_size: Optional[int] = None,
3870 retry: retries.Retry = DEFAULT_RETRY,
3871 timeout: TimeoutType = DEFAULT_TIMEOUT,
3872 ) -> RowIterator:
3873 """List the rows of a completed query.
3874 See
3875 https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/getQueryResults
3876 Args:
3877 job_id (str):
3878 ID of a query job.
3879 location (str): Location of the query job.
3880 project (str):
3881 ID of the project where the query job was run.
3882 schema (Sequence[google.cloud.bigquery.schema.SchemaField]):
3883 The fields expected in these query results. Used to convert
3884 from JSON to expected Python types.
3885 total_rows (Optional[int]):
3886 Total number of rows in the query results.
3887 destination (Optional[Union[ \
3888 google.cloud.bigquery.table.Table, \
3889 google.cloud.bigquery.table.TableListItem, \
3890 google.cloud.bigquery.table.TableReference, \
3891 str, \
3892 ]]):
3893 Destination table reference. Used to fetch the query results
3894 with the BigQuery Storage API.
3895 max_results (Optional[int]):
3896 Maximum number of rows to return across the whole iterator.
3897 start_index (Optional[int]):
3898 The zero-based index of the starting row to read.
3899 page_size (Optional[int]):
3900 The maximum number of rows in each page of results from this request.
3901 Non-positive values are ignored. Defaults to a sensible value set by the API.
3902 retry (Optional[google.api_core.retry.Retry]):
3903 How to retry the RPC.
3904 timeout (Optional[float]):
3905 The number of seconds to wait for the underlying HTTP transport
3906 before using ``retry``. If set, this connection timeout may be
3907 increased to a minimum value. This prevents retries on what
3908 would otherwise be a successful response.
3909 If multiple requests are made under the hood, ``timeout``
3910 applies to each individual request.
3911 Returns:
3912 google.cloud.bigquery.table.RowIterator:
3913 Iterator of row data
3914 :class:`~google.cloud.bigquery.table.Row`-s.
3915 """
3916 params: Dict[str, Any] = {
3917 "fields": _LIST_ROWS_FROM_QUERY_RESULTS_FIELDS,
3918 "location": location,
3919 }
3921 if timeout is not None:
3922 timeout = max(timeout, _MIN_GET_QUERY_RESULTS_TIMEOUT)
3924 if start_index is not None:
3925 params["startIndex"] = start_index
3927 params["formatOptions.useInt64Timestamp"] = True
3928 row_iterator = RowIterator(
3929 client=self,
3930 api_request=functools.partial(self._call_api, retry, timeout=timeout),
3931 path=f"/projects/{project}/queries/{job_id}",
3932 schema=schema,
3933 max_results=max_results,
3934 page_size=page_size,
3935 table=destination,
3936 extra_params=params,
3937 total_rows=total_rows,
3938 )
3939 return row_iterator
3941 def _schema_from_json_file_object(self, file_obj):
3942 """Helper function for schema_from_json that takes a
3943 file object that describes a table schema.
3945 Returns:
3946 List of schema field objects.
3947 """
3948 json_data = json.load(file_obj)
3949 return [SchemaField.from_api_repr(field) for field in json_data]
3951 def _schema_to_json_file_object(self, schema_list, file_obj):
3952 """Helper function for schema_to_json that takes a schema list and file
3953 object and writes the schema list to the file object with json.dump
3954 """
3955 json.dump(schema_list, file_obj, indent=2, sort_keys=True)
3957 def schema_from_json(self, file_or_path: "PathType"):
3958 """Takes a file object or file path that contains json that describes
3959 a table schema.
3961 Returns:
3962 List of schema field objects.
3963 """
3964 if isinstance(file_or_path, io.IOBase):
3965 return self._schema_from_json_file_object(file_or_path)
3967 with open(file_or_path) as file_obj:
3968 return self._schema_from_json_file_object(file_obj)
3970 def schema_to_json(
3971 self, schema_list: Sequence[SchemaField], destination: "PathType"
3972 ):
3973 """Takes a list of schema field objects.
3975 Serializes the list of schema field objects as json to a file.
3977 Destination is a file path or a file object.
3978 """
3979 json_schema_list = [f.to_api_repr() for f in schema_list]
3981 if isinstance(destination, io.IOBase):
3982 return self._schema_to_json_file_object(json_schema_list, destination)
3984 with open(destination, mode="w") as file_obj:
3985 return self._schema_to_json_file_object(json_schema_list, file_obj)
3987 def __enter__(self):
3988 return self
3990 def __exit__(self, exc_type, exc_value, traceback):
3991 self.close()
3994# pylint: disable=unused-argument
3995def _item_to_project(iterator, resource):
3996 """Convert a JSON project to the native object.
3998 Args:
3999 iterator (google.api_core.page_iterator.Iterator): The iterator that is currently in use.
4001 resource (Dict): An item to be converted to a project.
4003 Returns:
4004 google.cloud.bigquery.client.Project: The next project in the page.
4005 """
4006 return Project.from_api_repr(resource)
4009# pylint: enable=unused-argument
4012def _item_to_dataset(iterator, resource):
4013 """Convert a JSON dataset to the native object.
4015 Args:
4016 iterator (google.api_core.page_iterator.Iterator): The iterator that is currently in use.
4018 resource (Dict): An item to be converted to a dataset.
4020 Returns:
4021 google.cloud.bigquery.dataset.DatasetListItem: The next dataset in the page.
4022 """
4023 return DatasetListItem(resource)
4026def _item_to_job(iterator, resource):
4027 """Convert a JSON job to the native object.
4029 Args:
4030 iterator (google.api_core.page_iterator.Iterator): The iterator that is currently in use.
4032 resource (Dict): An item to be converted to a job.
4034 Returns:
4035 job instance: The next job in the page.
4036 """
4037 return iterator.client.job_from_resource(resource)
4040def _item_to_model(iterator, resource):
4041 """Convert a JSON model to the native object.
4043 Args:
4044 iterator (google.api_core.page_iterator.Iterator):
4045 The iterator that is currently in use.
4046 resource (Dict): An item to be converted to a model.
4048 Returns:
4049 google.cloud.bigquery.model.Model: The next model in the page.
4050 """
4051 return Model.from_api_repr(resource)
4054def _item_to_routine(iterator, resource):
4055 """Convert a JSON model to the native object.
4057 Args:
4058 iterator (google.api_core.page_iterator.Iterator):
4059 The iterator that is currently in use.
4060 resource (Dict): An item to be converted to a routine.
4062 Returns:
4063 google.cloud.bigquery.routine.Routine: The next routine in the page.
4064 """
4065 return Routine.from_api_repr(resource)
4068def _item_to_table(iterator, resource):
4069 """Convert a JSON table to the native object.
4071 Args:
4072 iterator (google.api_core.page_iterator.Iterator): The iterator that is currently in use.
4074 resource (Dict): An item to be converted to a table.
4076 Returns:
4077 google.cloud.bigquery.table.Table: The next table in the page.
4078 """
4079 return TableListItem(resource)
4082def _extract_job_reference(job, project=None, location=None):
4083 """Extract fully-qualified job reference from a job-like object.
4085 Args:
4086 job_id (Union[ \
4087 str, \
4088 google.cloud.bigquery.job.LoadJob, \
4089 google.cloud.bigquery.job.CopyJob, \
4090 google.cloud.bigquery.job.ExtractJob, \
4091 google.cloud.bigquery.job.QueryJob \
4092 ]): Job identifier.
4093 project (Optional[str]):
4094 Project where the job was run. Ignored if ``job_id`` is a job
4095 object.
4096 location (Optional[str]):
4097 Location where the job was run. Ignored if ``job_id`` is a job
4098 object.
4100 Returns:
4101 Tuple[str, str, str]: ``(project, location, job_id)``
4102 """
4103 if hasattr(job, "job_id"):
4104 project = job.project
4105 job_id = job.job_id
4106 location = job.location
4107 else:
4108 job_id = job
4110 return (project, location, job_id)
4113def _check_mode(stream):
4114 """Check that a stream was opened in read-binary mode.
4116 Args:
4117 stream (IO[bytes]): A bytes IO object open for reading.
4119 Raises:
4120 ValueError:
4121 if the ``stream.mode`` is a valid attribute
4122 and is not among ``rb``, ``r+b`` or ``rb+``.
4123 """
4124 mode = getattr(stream, "mode", None)
4126 if isinstance(stream, gzip.GzipFile):
4127 if mode != gzip.READ: # pytype: disable=module-attr
4128 raise ValueError(
4129 "Cannot upload gzip files opened in write mode: use "
4130 "gzip.GzipFile(filename, mode='rb')"
4131 )
4132 else:
4133 if mode is not None and mode not in ("rb", "r+b", "rb+"):
4134 raise ValueError(
4135 "Cannot upload files opened in text mode: use "
4136 "open(filename, mode='rb') or open(filename, mode='r+b')"
4137 )
4140def _get_upload_headers(user_agent):
4141 """Get the headers for an upload request.
4143 Args:
4144 user_agent (str): The user-agent for requests.
4146 Returns:
4147 Dict: The headers to be used for the request.
4148 """
4149 return {
4150 "Accept": "application/json",
4151 "Accept-Encoding": "gzip, deflate",
4152 "User-Agent": user_agent,
4153 "content-type": "application/json; charset=UTF-8",
4154 }
4157def _add_server_timeout_header(headers: Optional[Dict[str, str]], kwargs):
4158 timeout = kwargs.get("timeout")
4159 if timeout is not None:
4160 if headers is None:
4161 headers = {}
4162 headers[TIMEOUT_HEADER] = str(timeout)
4164 if headers:
4165 kwargs["headers"] = headers
4167 return kwargs