1# Copyright 2015 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15"""Client for interacting with the Google BigQuery API."""
16
17from __future__ import absolute_import
18from __future__ import annotations
19from __future__ import division
20
21from collections import abc as collections_abc
22import copy
23import datetime
24import functools
25import gzip
26import io
27import itertools
28import json
29import math
30import os
31import tempfile
32import typing
33from typing import (
34 Any,
35 Callable,
36 Dict,
37 IO,
38 Iterable,
39 Mapping,
40 List,
41 Optional,
42 Sequence,
43 Tuple,
44 Union,
45)
46import uuid
47import warnings
48
49import requests
50
51from google import resumable_media # type: ignore
52from google.resumable_media.requests import MultipartUpload # type: ignore
53from google.resumable_media.requests import ResumableUpload
54
55import google.api_core.client_options
56import google.api_core.exceptions as core_exceptions
57from google.api_core.iam import Policy
58from google.api_core import page_iterator
59from google.api_core import retry as retries
60import google.cloud._helpers # type: ignore
61from google.cloud import exceptions # pytype: disable=import-error
62from google.cloud.client import ClientWithProject # type: ignore # pytype: disable=import-error
63
64try:
65 from google.cloud.bigquery_storage_v1.services.big_query_read.client import (
66 DEFAULT_CLIENT_INFO as DEFAULT_BQSTORAGE_CLIENT_INFO,
67 )
68except ImportError:
69 DEFAULT_BQSTORAGE_CLIENT_INFO = None # type: ignore
70
71
72from google.auth.credentials import Credentials
73from google.cloud.bigquery._http import Connection
74from google.cloud.bigquery import _job_helpers
75from google.cloud.bigquery import _pandas_helpers
76from google.cloud.bigquery import _versions_helpers
77from google.cloud.bigquery import enums
78from google.cloud.bigquery import exceptions as bq_exceptions
79from google.cloud.bigquery import job
80from google.cloud.bigquery._helpers import _get_sub_prop
81from google.cloud.bigquery._helpers import _record_field_to_json
82from google.cloud.bigquery._helpers import _str_or_none
83from google.cloud.bigquery._helpers import _verify_job_config_type
84from google.cloud.bigquery._helpers import _get_bigquery_host
85from google.cloud.bigquery._helpers import _DEFAULT_HOST
86from google.cloud.bigquery._helpers import _DEFAULT_HOST_TEMPLATE
87from google.cloud.bigquery._helpers import _DEFAULT_UNIVERSE
88from google.cloud.bigquery._helpers import _validate_universe
89from google.cloud.bigquery._helpers import _get_client_universe
90from google.cloud.bigquery._helpers import TimeoutType
91from google.cloud.bigquery._job_helpers import make_job_id as _make_job_id
92from google.cloud.bigquery.dataset import Dataset
93from google.cloud.bigquery.dataset import DatasetListItem
94from google.cloud.bigquery.dataset import DatasetReference
95
96from google.cloud.bigquery.enums import AutoRowIDs, DatasetView, UpdateMode
97from google.cloud.bigquery.format_options import ParquetOptions
98from google.cloud.bigquery.job import (
99 CopyJob,
100 CopyJobConfig,
101 ExtractJob,
102 ExtractJobConfig,
103 LoadJob,
104 LoadJobConfig,
105 QueryJob,
106 QueryJobConfig,
107)
108from google.cloud.bigquery.model import Model
109from google.cloud.bigquery.model import ModelReference
110from google.cloud.bigquery.model import _model_arg_to_model_ref
111from google.cloud.bigquery.opentelemetry_tracing import create_span
112from google.cloud.bigquery.query import _QueryResults
113from google.cloud.bigquery.retry import (
114 DEFAULT_JOB_RETRY,
115 DEFAULT_RETRY,
116 DEFAULT_TIMEOUT,
117 DEFAULT_GET_JOB_TIMEOUT,
118 POLLING_DEFAULT_VALUE,
119)
120from google.cloud.bigquery.routine import Routine
121from google.cloud.bigquery.routine import RoutineReference
122from google.cloud.bigquery.schema import SchemaField
123from google.cloud.bigquery.table import _table_arg_to_table
124from google.cloud.bigquery.table import _table_arg_to_table_ref
125from google.cloud.bigquery.table import Table
126from google.cloud.bigquery.table import TableListItem
127from google.cloud.bigquery.table import TableReference
128from google.cloud.bigquery.table import RowIterator
129
130pyarrow = _versions_helpers.PYARROW_VERSIONS.try_import()
131pandas = (
132 _versions_helpers.PANDAS_VERSIONS.try_import()
133) # mypy check fails because pandas import is outside module, there are type: ignore comments related to this
134
135
136ResumableTimeoutType = Union[
137 None, float, Tuple[float, float]
138] # for resumable media methods
139
140if typing.TYPE_CHECKING: # pragma: NO COVER
141 # os.PathLike is only subscriptable in Python 3.9+, thus shielding with a condition.
142 PathType = Union[str, bytes, os.PathLike[str], os.PathLike[bytes]]
143_DEFAULT_CHUNKSIZE = 100 * 1024 * 1024 # 100 MB
144_MAX_MULTIPART_SIZE = 5 * 1024 * 1024
145_DEFAULT_NUM_RETRIES = 6
146_BASE_UPLOAD_TEMPLATE = "{host}/upload/bigquery/v2/projects/{project}/jobs?uploadType="
147_MULTIPART_URL_TEMPLATE = _BASE_UPLOAD_TEMPLATE + "multipart"
148_RESUMABLE_URL_TEMPLATE = _BASE_UPLOAD_TEMPLATE + "resumable"
149_GENERIC_CONTENT_TYPE = "*/*"
150_READ_LESS_THAN_SIZE = (
151 "Size {:d} was specified but the file-like object only had " "{:d} bytes remaining."
152)
153_NEED_TABLE_ARGUMENT = (
154 "The table argument should be a table ID string, Table, or TableReference"
155)
156_LIST_ROWS_FROM_QUERY_RESULTS_FIELDS = "jobReference,totalRows,pageToken,rows"
157
158# In microbenchmarks, it's been shown that even in ideal conditions (query
159# finished, local data), requests to getQueryResults can take 10+ seconds.
160# In less-than-ideal situations, the response can take even longer, as it must
161# be able to download a full 100+ MB row in that time. Don't let the
162# connection timeout before data can be downloaded.
163# https://github.com/googleapis/python-bigquery/issues/438
164_MIN_GET_QUERY_RESULTS_TIMEOUT = 120
165
166TIMEOUT_HEADER = "X-Server-Timeout"
167
168
169class Project(object):
170 """Wrapper for resource describing a BigQuery project.
171
172 Args:
173 project_id (str): Opaque ID of the project
174
175 numeric_id (int): Numeric ID of the project
176
177 friendly_name (str): Display name of the project
178 """
179
180 def __init__(self, project_id, numeric_id, friendly_name):
181 self.project_id = project_id
182 self.numeric_id = numeric_id
183 self.friendly_name = friendly_name
184
185 @classmethod
186 def from_api_repr(cls, resource):
187 """Factory: construct an instance from a resource dict."""
188 return cls(resource["id"], resource["numericId"], resource["friendlyName"])
189
190
191class Client(ClientWithProject):
192 """Client to bundle configuration needed for API requests.
193
194 Args:
195 project (Optional[str]):
196 Project ID for the project which the client acts on behalf of.
197 Will be passed when creating a dataset / job. If not passed,
198 falls back to the default inferred from the environment.
199 credentials (Optional[google.auth.credentials.Credentials]):
200 The OAuth2 Credentials to use for this client. If not passed
201 (and if no ``_http`` object is passed), falls back to the
202 default inferred from the environment.
203 _http (Optional[requests.Session]):
204 HTTP object to make requests. Can be any object that
205 defines ``request()`` with the same interface as
206 :meth:`requests.Session.request`. If not passed, an ``_http``
207 object is created that is bound to the ``credentials`` for the
208 current object.
209 This parameter should be considered private, and could change in
210 the future.
211 location (Optional[str]):
212 Default location for jobs / datasets / tables.
213 default_query_job_config (Optional[google.cloud.bigquery.job.QueryJobConfig]):
214 Default ``QueryJobConfig``.
215 Will be merged into job configs passed into the ``query`` method.
216 default_load_job_config (Optional[google.cloud.bigquery.job.LoadJobConfig]):
217 Default ``LoadJobConfig``.
218 Will be merged into job configs passed into the ``load_table_*`` methods.
219 client_info (Optional[google.api_core.client_info.ClientInfo]):
220 The client info used to send a user-agent string along with API
221 requests. If ``None``, then default info will be used. Generally,
222 you only need to set this if you're developing your own library
223 or partner tool.
224 client_options (Optional[Union[google.api_core.client_options.ClientOptions, Dict]]):
225 Client options used to set user options on the client. API Endpoint
226 should be set through client_options.
227 default_job_creation_mode (Optional[str]):
228 Sets the default job creation mode used by query methods such as
229 query_and_wait(). For lightweight queries, JOB_CREATION_OPTIONAL is
230 generally recommended.
231
232 Raises:
233 google.auth.exceptions.DefaultCredentialsError:
234 Raised if ``credentials`` is not specified and the library fails
235 to acquire default credentials.
236 """
237
238 SCOPE = ("https://www.googleapis.com/auth/cloud-platform",) # type: ignore
239 """The scopes required for authenticating as a BigQuery consumer."""
240
241 def __init__(
242 self,
243 project: Optional[str] = None,
244 credentials: Optional[Credentials] = None,
245 _http: Optional[requests.Session] = None,
246 location: Optional[str] = None,
247 default_query_job_config: Optional[QueryJobConfig] = None,
248 default_load_job_config: Optional[LoadJobConfig] = None,
249 client_info: Optional[google.api_core.client_info.ClientInfo] = None,
250 client_options: Optional[
251 Union[google.api_core.client_options.ClientOptions, Dict[str, Any]]
252 ] = None,
253 default_job_creation_mode: Optional[str] = None,
254 ) -> None:
255 if client_options is None:
256 client_options = {}
257 if isinstance(client_options, dict):
258 client_options = google.api_core.client_options.from_dict(client_options)
259 # assert isinstance(client_options, google.api_core.client_options.ClientOptions)
260
261 super(Client, self).__init__(
262 project=project,
263 credentials=credentials,
264 client_options=client_options,
265 _http=_http,
266 )
267
268 kw_args: Dict[str, Any] = {"client_info": client_info}
269 bq_host = _get_bigquery_host()
270 kw_args["api_endpoint"] = bq_host if bq_host != _DEFAULT_HOST else None
271 client_universe = None
272 if client_options.api_endpoint:
273 api_endpoint = client_options.api_endpoint
274 kw_args["api_endpoint"] = api_endpoint
275 else:
276 client_universe = _get_client_universe(client_options)
277 if client_universe != _DEFAULT_UNIVERSE:
278 kw_args["api_endpoint"] = _DEFAULT_HOST_TEMPLATE.replace(
279 "{UNIVERSE_DOMAIN}", client_universe
280 )
281 # Ensure credentials and universe are not in conflict.
282 if hasattr(self, "_credentials") and client_universe is not None:
283 _validate_universe(client_universe, self._credentials)
284
285 self._connection = Connection(self, **kw_args)
286 self._location = location
287 self._default_load_job_config = copy.deepcopy(default_load_job_config)
288 self.default_job_creation_mode = default_job_creation_mode
289
290 # Use property setter so validation can run.
291 self.default_query_job_config = default_query_job_config
292
293 @property
294 def location(self):
295 """Default location for jobs / datasets / tables."""
296 return self._location
297
298 @property
299 def default_job_creation_mode(self):
300 """Default job creation mode used for query execution."""
301 return self._default_job_creation_mode
302
303 @default_job_creation_mode.setter
304 def default_job_creation_mode(self, value: Optional[str]):
305 self._default_job_creation_mode = value
306
307 @property
308 def default_query_job_config(self) -> Optional[QueryJobConfig]:
309 """Default ``QueryJobConfig`` or ``None``.
310
311 Will be merged into job configs passed into the ``query`` or
312 ``query_and_wait`` methods.
313 """
314 return self._default_query_job_config
315
316 @default_query_job_config.setter
317 def default_query_job_config(self, value: Optional[QueryJobConfig]):
318 if value is not None:
319 _verify_job_config_type(
320 value, QueryJobConfig, param_name="default_query_job_config"
321 )
322 self._default_query_job_config = copy.deepcopy(value)
323
324 @property
325 def default_load_job_config(self):
326 """Default ``LoadJobConfig``.
327 Will be merged into job configs passed into the ``load_table_*`` methods.
328 """
329 return self._default_load_job_config
330
331 @default_load_job_config.setter
332 def default_load_job_config(self, value: LoadJobConfig):
333 self._default_load_job_config = copy.deepcopy(value)
334
335 def close(self):
336 """Close the underlying transport objects, releasing system resources.
337
338 .. note::
339
340 The client instance can be used for making additional requests even
341 after closing, in which case the underlying connections are
342 automatically re-created.
343 """
344 self._http._auth_request.session.close()
345 self._http.close()
346
347 def get_service_account_email(
348 self,
349 project: Optional[str] = None,
350 retry: retries.Retry = DEFAULT_RETRY,
351 timeout: TimeoutType = DEFAULT_TIMEOUT,
352 ) -> str:
353 """Get the email address of the project's BigQuery service account
354
355 Example:
356
357 .. code-block:: python
358
359 from google.cloud import bigquery
360 client = bigquery.Client()
361 client.get_service_account_email()
362 # returns an email similar to: my_service_account@my-project.iam.gserviceaccount.com
363
364 Note:
365 This is the service account that BigQuery uses to manage tables
366 encrypted by a key in KMS.
367
368 Args:
369 project (Optional[str]):
370 Project ID to use for retreiving service account email.
371 Defaults to the client's project.
372 retry (Optional[google.api_core.retry.Retry]): How to retry the RPC.
373 timeout (Optional[float]):
374 The number of seconds to wait for the underlying HTTP transport
375 before using ``retry``.
376
377 Returns:
378 str:
379 service account email address
380
381 """
382 if project is None:
383 project = self.project
384 path = "/projects/%s/serviceAccount" % (project,)
385 span_attributes = {"path": path}
386 api_response = self._call_api(
387 retry,
388 span_name="BigQuery.getServiceAccountEmail",
389 span_attributes=span_attributes,
390 method="GET",
391 path=path,
392 timeout=timeout,
393 )
394 return api_response["email"]
395
396 def list_projects(
397 self,
398 max_results: Optional[int] = None,
399 page_token: Optional[str] = None,
400 retry: retries.Retry = DEFAULT_RETRY,
401 timeout: TimeoutType = DEFAULT_TIMEOUT,
402 page_size: Optional[int] = None,
403 ) -> page_iterator.Iterator:
404 """List projects for the project associated with this client.
405
406 See
407 https://cloud.google.com/bigquery/docs/reference/rest/v2/projects/list
408
409 Args:
410 max_results (Optional[int]):
411 Maximum number of projects to return.
412 Defaults to a value set by the API.
413
414 page_token (Optional[str]):
415 Token representing a cursor into the projects. If not passed,
416 the API will return the first page of projects. The token marks
417 the beginning of the iterator to be returned and the value of
418 the ``page_token`` can be accessed at ``next_page_token`` of the
419 :class:`~google.api_core.page_iterator.HTTPIterator`.
420
421 retry (Optional[google.api_core.retry.Retry]): How to retry the RPC.
422
423 timeout (Optional[float]):
424 The number of seconds to wait for the underlying HTTP transport
425 before using ``retry``.
426
427 page_size (Optional[int]):
428 Maximum number of projects to return in each page.
429 Defaults to a value set by the API.
430
431 Returns:
432 google.api_core.page_iterator.Iterator:
433 Iterator of :class:`~google.cloud.bigquery.client.Project`
434 accessible to the current client.
435 """
436 span_attributes = {"path": "/projects"}
437
438 def api_request(*args, **kwargs):
439 return self._call_api(
440 retry,
441 span_name="BigQuery.listProjects",
442 span_attributes=span_attributes,
443 *args,
444 timeout=timeout,
445 **kwargs,
446 )
447
448 return page_iterator.HTTPIterator(
449 client=self,
450 api_request=api_request,
451 path="/projects",
452 item_to_value=_item_to_project,
453 items_key="projects",
454 page_token=page_token,
455 max_results=max_results,
456 page_size=page_size,
457 )
458
459 def list_datasets(
460 self,
461 project: Optional[str] = None,
462 include_all: bool = False,
463 filter: Optional[str] = None,
464 max_results: Optional[int] = None,
465 page_token: Optional[str] = None,
466 retry: retries.Retry = DEFAULT_RETRY,
467 timeout: TimeoutType = DEFAULT_TIMEOUT,
468 page_size: Optional[int] = None,
469 ) -> page_iterator.Iterator:
470 """List datasets for the project associated with this client.
471
472 See
473 https://cloud.google.com/bigquery/docs/reference/rest/v2/datasets/list
474
475 Args:
476 project (Optional[str]):
477 Project ID to use for retreiving datasets. Defaults to the
478 client's project.
479 include_all (Optional[bool]):
480 True if results include hidden datasets. Defaults to False.
481 filter (Optional[str]):
482 An expression for filtering the results by label.
483 For syntax, see
484 https://cloud.google.com/bigquery/docs/reference/rest/v2/datasets/list#body.QUERY_PARAMETERS.filter
485 max_results (Optional[int]):
486 Maximum number of datasets to return.
487 page_token (Optional[str]):
488 Token representing a cursor into the datasets. If not passed,
489 the API will return the first page of datasets. The token marks
490 the beginning of the iterator to be returned and the value of
491 the ``page_token`` can be accessed at ``next_page_token`` of the
492 :class:`~google.api_core.page_iterator.HTTPIterator`.
493 retry (Optional[google.api_core.retry.Retry]):
494 How to retry the RPC.
495 timeout (Optional[float]):
496 The number of seconds to wait for the underlying HTTP transport
497 before using ``retry``.
498 page_size (Optional[int]):
499 Maximum number of datasets to return per page.
500
501 Returns:
502 google.api_core.page_iterator.Iterator:
503 Iterator of :class:`~google.cloud.bigquery.dataset.DatasetListItem`.
504 associated with the project.
505 """
506 extra_params: Dict[str, Any] = {}
507 if project is None:
508 project = self.project
509 if include_all:
510 extra_params["all"] = True
511 if filter:
512 # TODO: consider supporting a dict of label -> value for filter,
513 # and converting it into a string here.
514 extra_params["filter"] = filter
515 path = "/projects/%s/datasets" % (project,)
516
517 span_attributes = {"path": path}
518
519 def api_request(*args, **kwargs):
520 return self._call_api(
521 retry,
522 span_name="BigQuery.listDatasets",
523 span_attributes=span_attributes,
524 *args,
525 timeout=timeout,
526 **kwargs,
527 )
528
529 return page_iterator.HTTPIterator(
530 client=self,
531 api_request=api_request,
532 path=path,
533 item_to_value=_item_to_dataset,
534 items_key="datasets",
535 page_token=page_token,
536 max_results=max_results,
537 extra_params=extra_params,
538 page_size=page_size,
539 )
540
541 def dataset(
542 self, dataset_id: str, project: Optional[str] = None
543 ) -> DatasetReference:
544 """Deprecated: Construct a reference to a dataset.
545
546 .. deprecated:: 1.24.0
547 Construct a
548 :class:`~google.cloud.bigquery.dataset.DatasetReference` using its
549 constructor or use a string where previously a reference object
550 was used.
551
552 As of ``google-cloud-bigquery`` version 1.7.0, all client methods
553 that take a
554 :class:`~google.cloud.bigquery.dataset.DatasetReference` or
555 :class:`~google.cloud.bigquery.table.TableReference` also take a
556 string in standard SQL format, e.g. ``project.dataset_id`` or
557 ``project.dataset_id.table_id``.
558
559 Args:
560 dataset_id (str): ID of the dataset.
561
562 project (Optional[str]):
563 Project ID for the dataset (defaults to the project of the client).
564
565 Returns:
566 google.cloud.bigquery.dataset.DatasetReference:
567 a new ``DatasetReference`` instance.
568 """
569 if project is None:
570 project = self.project
571
572 warnings.warn(
573 "Client.dataset is deprecated and will be removed in a future version. "
574 "Use a string like 'my_project.my_dataset' or a "
575 "cloud.google.bigquery.DatasetReference object, instead.",
576 PendingDeprecationWarning,
577 stacklevel=2,
578 )
579 return DatasetReference(project, dataset_id)
580
581 def _ensure_bqstorage_client(
582 self,
583 bqstorage_client: Optional[
584 "google.cloud.bigquery_storage.BigQueryReadClient"
585 ] = None,
586 client_options: Optional[google.api_core.client_options.ClientOptions] = None,
587 client_info: Optional[
588 "google.api_core.gapic_v1.client_info.ClientInfo"
589 ] = DEFAULT_BQSTORAGE_CLIENT_INFO,
590 ) -> Optional["google.cloud.bigquery_storage.BigQueryReadClient"]:
591 """Create a BigQuery Storage API client using this client's credentials.
592
593 Args:
594 bqstorage_client:
595 An existing BigQuery Storage client instance. If ``None``, a new
596 instance is created and returned.
597 client_options:
598 Custom options used with a new BigQuery Storage client instance
599 if one is created.
600 client_info:
601 The client info used with a new BigQuery Storage client
602 instance if one is created.
603
604 Returns:
605 A BigQuery Storage API client.
606 """
607
608 try:
609 bigquery_storage = _versions_helpers.BQ_STORAGE_VERSIONS.try_import(
610 raise_if_error=True
611 )
612 except bq_exceptions.BigQueryStorageNotFoundError:
613 warnings.warn(
614 "Cannot create BigQuery Storage client, the dependency "
615 "google-cloud-bigquery-storage is not installed."
616 )
617 return None
618 except bq_exceptions.LegacyBigQueryStorageError as exc:
619 warnings.warn(
620 "Dependency google-cloud-bigquery-storage is outdated: " + str(exc)
621 )
622 return None
623
624 if bqstorage_client is None: # pragma: NO COVER
625 bqstorage_client = bigquery_storage.BigQueryReadClient(
626 credentials=self._credentials,
627 client_options=client_options,
628 client_info=client_info, # type: ignore # (None is also accepted)
629 )
630
631 return bqstorage_client
632
633 def _dataset_from_arg(self, dataset) -> Union[Dataset, DatasetReference]:
634 if isinstance(dataset, str):
635 dataset = DatasetReference.from_string(
636 dataset, default_project=self.project
637 )
638
639 if not isinstance(dataset, (Dataset, DatasetReference)):
640 if isinstance(dataset, DatasetListItem):
641 dataset = dataset.reference
642 else:
643 raise TypeError(
644 "dataset must be a Dataset, DatasetReference, DatasetListItem,"
645 " or string"
646 )
647 return dataset
648
649 def create_dataset(
650 self,
651 dataset: Union[str, Dataset, DatasetReference, DatasetListItem],
652 exists_ok: bool = False,
653 retry: retries.Retry = DEFAULT_RETRY,
654 timeout: TimeoutType = DEFAULT_TIMEOUT,
655 ) -> Dataset:
656 """API call: create the dataset via a POST request.
657
658
659 See
660 https://cloud.google.com/bigquery/docs/reference/rest/v2/datasets/insert
661
662 Example:
663
664 .. code-block:: python
665
666 from google.cloud import bigquery
667 client = bigquery.Client()
668 dataset = bigquery.Dataset('my_project.my_dataset')
669 dataset = client.create_dataset(dataset)
670
671 Args:
672 dataset (Union[ \
673 google.cloud.bigquery.dataset.Dataset, \
674 google.cloud.bigquery.dataset.DatasetReference, \
675 google.cloud.bigquery.dataset.DatasetListItem, \
676 str, \
677 ]):
678 A :class:`~google.cloud.bigquery.dataset.Dataset` to create.
679 If ``dataset`` is a reference, an empty dataset is created
680 with the specified ID and client's default location.
681 exists_ok (Optional[bool]):
682 Defaults to ``False``. If ``True``, ignore "already exists"
683 errors when creating the dataset.
684 retry (Optional[google.api_core.retry.Retry]):
685 How to retry the RPC.
686 timeout (Optional[float]):
687 The number of seconds to wait for the underlying HTTP transport
688 before using ``retry``.
689
690 Returns:
691 google.cloud.bigquery.dataset.Dataset:
692 A new ``Dataset`` returned from the API.
693
694 Raises:
695 google.cloud.exceptions.Conflict:
696 If the dataset already exists.
697 """
698 dataset = self._dataset_from_arg(dataset)
699 if isinstance(dataset, DatasetReference):
700 dataset = Dataset(dataset)
701
702 path = "/projects/%s/datasets" % (dataset.project,)
703
704 data = dataset.to_api_repr()
705 if data.get("location") is None and self.location is not None:
706 data["location"] = self.location
707
708 try:
709 span_attributes = {"path": path}
710
711 api_response = self._call_api(
712 retry,
713 span_name="BigQuery.createDataset",
714 span_attributes=span_attributes,
715 method="POST",
716 path=path,
717 data=data,
718 timeout=timeout,
719 )
720 return Dataset.from_api_repr(api_response)
721 except core_exceptions.Conflict:
722 if not exists_ok:
723 raise
724 return self.get_dataset(dataset.reference, retry=retry)
725
726 def create_routine(
727 self,
728 routine: Routine,
729 exists_ok: bool = False,
730 retry: retries.Retry = DEFAULT_RETRY,
731 timeout: TimeoutType = DEFAULT_TIMEOUT,
732 ) -> Routine:
733 """[Beta] Create a routine via a POST request.
734
735 See
736 https://cloud.google.com/bigquery/docs/reference/rest/v2/routines/insert
737
738 Args:
739 routine (google.cloud.bigquery.routine.Routine):
740 A :class:`~google.cloud.bigquery.routine.Routine` to create.
741 The dataset that the routine belongs to must already exist.
742 exists_ok (Optional[bool]):
743 Defaults to ``False``. If ``True``, ignore "already exists"
744 errors when creating the routine.
745 retry (Optional[google.api_core.retry.Retry]):
746 How to retry the RPC.
747 timeout (Optional[float]):
748 The number of seconds to wait for the underlying HTTP transport
749 before using ``retry``.
750
751 Returns:
752 google.cloud.bigquery.routine.Routine:
753 A new ``Routine`` returned from the service.
754
755 Raises:
756 google.cloud.exceptions.Conflict:
757 If the routine already exists.
758 """
759 reference = routine.reference
760 path = "/projects/{}/datasets/{}/routines".format(
761 reference.project, reference.dataset_id
762 )
763 resource = routine.to_api_repr()
764 try:
765 span_attributes = {"path": path}
766 api_response = self._call_api(
767 retry,
768 span_name="BigQuery.createRoutine",
769 span_attributes=span_attributes,
770 method="POST",
771 path=path,
772 data=resource,
773 timeout=timeout,
774 )
775 return Routine.from_api_repr(api_response)
776 except core_exceptions.Conflict:
777 if not exists_ok:
778 raise
779 return self.get_routine(routine.reference, retry=retry)
780
781 def create_table(
782 self,
783 table: Union[str, Table, TableReference, TableListItem],
784 exists_ok: bool = False,
785 retry: retries.Retry = DEFAULT_RETRY,
786 timeout: TimeoutType = DEFAULT_TIMEOUT,
787 ) -> Table:
788 """API call: create a table via a PUT request
789
790 See
791 https://cloud.google.com/bigquery/docs/reference/rest/v2/tables/insert
792
793 Args:
794 table (Union[ \
795 google.cloud.bigquery.table.Table, \
796 google.cloud.bigquery.table.TableReference, \
797 google.cloud.bigquery.table.TableListItem, \
798 str, \
799 ]):
800 A :class:`~google.cloud.bigquery.table.Table` to create.
801 If ``table`` is a reference, an empty table is created
802 with the specified ID. The dataset that the table belongs to
803 must already exist.
804 exists_ok (Optional[bool]):
805 Defaults to ``False``. If ``True``, ignore "already exists"
806 errors when creating the table.
807 retry (Optional[google.api_core.retry.Retry]):
808 How to retry the RPC.
809 timeout (Optional[float]):
810 The number of seconds to wait for the underlying HTTP transport
811 before using ``retry``.
812
813 Returns:
814 google.cloud.bigquery.table.Table:
815 A new ``Table`` returned from the service.
816
817 Raises:
818 google.cloud.exceptions.Conflict:
819 If the table already exists.
820 """
821 table = _table_arg_to_table(table, default_project=self.project)
822 dataset_id = table.dataset_id
823 path = "/projects/%s/datasets/%s/tables" % (table.project, dataset_id)
824 data = table.to_api_repr()
825 try:
826 span_attributes = {"path": path, "dataset_id": dataset_id}
827 api_response = self._call_api(
828 retry,
829 span_name="BigQuery.createTable",
830 span_attributes=span_attributes,
831 method="POST",
832 path=path,
833 data=data,
834 timeout=timeout,
835 )
836 return Table.from_api_repr(api_response)
837 except core_exceptions.Conflict:
838 if not exists_ok:
839 raise
840 return self.get_table(table.reference, retry=retry)
841
842 def _call_api(
843 self,
844 retry,
845 span_name=None,
846 span_attributes=None,
847 job_ref=None,
848 headers: Optional[Dict[str, str]] = None,
849 **kwargs,
850 ):
851 kwargs = _add_server_timeout_header(headers, kwargs)
852 call = functools.partial(self._connection.api_request, **kwargs)
853
854 if retry:
855 call = retry(call)
856
857 if span_name is not None:
858 with create_span(
859 name=span_name, attributes=span_attributes, client=self, job_ref=job_ref
860 ):
861 return call()
862
863 return call()
864
865 def get_dataset(
866 self,
867 dataset_ref: Union[DatasetReference, str],
868 retry: retries.Retry = DEFAULT_RETRY,
869 timeout: TimeoutType = DEFAULT_TIMEOUT,
870 dataset_view: Optional[DatasetView] = None,
871 ) -> Dataset:
872 """Fetch the dataset referenced by ``dataset_ref``
873
874 Args:
875 dataset_ref (Union[ \
876 google.cloud.bigquery.dataset.DatasetReference, \
877 str, \
878 ]):
879 A reference to the dataset to fetch from the BigQuery API.
880 If a string is passed in, this method attempts to create a
881 dataset reference from a string using
882 :func:`~google.cloud.bigquery.dataset.DatasetReference.from_string`.
883 retry (Optional[google.api_core.retry.Retry]):
884 How to retry the RPC.
885 timeout (Optional[float]):
886 The number of seconds to wait for the underlying HTTP transport
887 before using ``retry``.
888 dataset_view (Optional[google.cloud.bigquery.enums.DatasetView]):
889 Specifies the view that determines which dataset information is
890 returned. By default, dataset metadata (e.g. friendlyName, description,
891 labels, etc) and ACL information are returned. This argument can
892 take on the following possible enum values.
893
894 * :attr:`~google.cloud.bigquery.enums.DatasetView.ACL`:
895 Includes dataset metadata and the ACL.
896 * :attr:`~google.cloud.bigquery.enums.DatasetView.FULL`:
897 Includes all dataset metadata, including the ACL and table metadata.
898 This view is not supported by the `datasets.list` API method.
899 * :attr:`~google.cloud.bigquery.enums.DatasetView.METADATA`:
900 Includes basic dataset metadata, but not the ACL.
901 * :attr:`~google.cloud.bigquery.enums.DatasetView.DATASET_VIEW_UNSPECIFIED`:
902 The server will decide which view to use. Currently defaults to FULL.
903 Returns:
904 google.cloud.bigquery.dataset.Dataset:
905 A ``Dataset`` instance.
906 """
907 if isinstance(dataset_ref, str):
908 dataset_ref = DatasetReference.from_string(
909 dataset_ref, default_project=self.project
910 )
911 path = dataset_ref.path
912
913 if dataset_view:
914 query_params = {"datasetView": dataset_view.value}
915 else:
916 query_params = {}
917
918 span_attributes = {"path": path}
919 api_response = self._call_api(
920 retry,
921 span_name="BigQuery.getDataset",
922 span_attributes=span_attributes,
923 method="GET",
924 path=path,
925 timeout=timeout,
926 query_params=query_params,
927 )
928 return Dataset.from_api_repr(api_response)
929
930 def get_iam_policy(
931 self,
932 table: Union[Table, TableReference, TableListItem, str],
933 requested_policy_version: int = 1,
934 retry: retries.Retry = DEFAULT_RETRY,
935 timeout: TimeoutType = DEFAULT_TIMEOUT,
936 ) -> Policy:
937 """Return the access control policy for a table resource.
938
939 Args:
940 table (Union[ \
941 google.cloud.bigquery.table.Table, \
942 google.cloud.bigquery.table.TableReference, \
943 google.cloud.bigquery.table.TableListItem, \
944 str, \
945 ]):
946 The table to get the access control policy for.
947 If a string is passed in, this method attempts to create a
948 table reference from a string using
949 :func:`~google.cloud.bigquery.table.TableReference.from_string`.
950 requested_policy_version (int):
951 Optional. The maximum policy version that will be used to format the policy.
952
953 Only version ``1`` is currently supported.
954
955 See: https://cloud.google.com/bigquery/docs/reference/rest/v2/GetPolicyOptions
956 retry (Optional[google.api_core.retry.Retry]):
957 How to retry the RPC.
958 timeout (Optional[float]):
959 The number of seconds to wait for the underlying HTTP transport
960 before using ``retry``.
961
962 Returns:
963 google.api_core.iam.Policy:
964 The access control policy.
965 """
966 table = _table_arg_to_table_ref(table, default_project=self.project)
967
968 if requested_policy_version != 1:
969 raise ValueError("only IAM policy version 1 is supported")
970
971 body = {"options": {"requestedPolicyVersion": 1}}
972
973 path = "{}:getIamPolicy".format(table.path)
974 span_attributes = {"path": path}
975 response = self._call_api(
976 retry,
977 span_name="BigQuery.getIamPolicy",
978 span_attributes=span_attributes,
979 method="POST",
980 path=path,
981 data=body,
982 timeout=timeout,
983 )
984
985 return Policy.from_api_repr(response)
986
987 def set_iam_policy(
988 self,
989 table: Union[Table, TableReference, TableListItem, str],
990 policy: Policy,
991 updateMask: Optional[str] = None,
992 retry: retries.Retry = DEFAULT_RETRY,
993 timeout: TimeoutType = DEFAULT_TIMEOUT,
994 *,
995 fields: Sequence[str] = (),
996 ) -> Policy:
997 """Return the access control policy for a table resource.
998
999 Args:
1000 table (Union[ \
1001 google.cloud.bigquery.table.Table, \
1002 google.cloud.bigquery.table.TableReference, \
1003 google.cloud.bigquery.table.TableListItem, \
1004 str, \
1005 ]):
1006 The table to get the access control policy for.
1007 If a string is passed in, this method attempts to create a
1008 table reference from a string using
1009 :func:`~google.cloud.bigquery.table.TableReference.from_string`.
1010 policy (google.api_core.iam.Policy):
1011 The access control policy to set.
1012 updateMask (Optional[str]):
1013 Mask as defined by
1014 https://cloud.google.com/bigquery/docs/reference/rest/v2/tables/setIamPolicy#body.request_body.FIELDS.update_mask
1015
1016 Incompatible with ``fields``.
1017 retry (Optional[google.api_core.retry.Retry]):
1018 How to retry the RPC.
1019 timeout (Optional[float]):
1020 The number of seconds to wait for the underlying HTTP transport
1021 before using ``retry``.
1022 fields (Sequence[str]):
1023 Which properties to set on the policy. See:
1024 https://cloud.google.com/bigquery/docs/reference/rest/v2/tables/setIamPolicy#body.request_body.FIELDS.update_mask
1025
1026 Incompatible with ``updateMask``.
1027
1028 Returns:
1029 google.api_core.iam.Policy:
1030 The updated access control policy.
1031 """
1032 if updateMask is not None and not fields:
1033 update_mask = updateMask
1034 elif updateMask is not None and fields:
1035 raise ValueError("Cannot set both fields and updateMask")
1036 elif fields:
1037 update_mask = ",".join(fields)
1038 else:
1039 update_mask = None
1040
1041 table = _table_arg_to_table_ref(table, default_project=self.project)
1042
1043 if not isinstance(policy, (Policy)):
1044 raise TypeError("policy must be a Policy")
1045
1046 body = {"policy": policy.to_api_repr()}
1047
1048 if update_mask is not None:
1049 body["updateMask"] = update_mask
1050
1051 path = "{}:setIamPolicy".format(table.path)
1052 span_attributes = {"path": path}
1053
1054 response = self._call_api(
1055 retry,
1056 span_name="BigQuery.setIamPolicy",
1057 span_attributes=span_attributes,
1058 method="POST",
1059 path=path,
1060 data=body,
1061 timeout=timeout,
1062 )
1063
1064 return Policy.from_api_repr(response)
1065
1066 def test_iam_permissions(
1067 self,
1068 table: Union[Table, TableReference, TableListItem, str],
1069 permissions: Sequence[str],
1070 retry: retries.Retry = DEFAULT_RETRY,
1071 timeout: TimeoutType = DEFAULT_TIMEOUT,
1072 ) -> Dict[str, Any]:
1073 table = _table_arg_to_table_ref(table, default_project=self.project)
1074
1075 body = {"permissions": permissions}
1076
1077 path = "{}:testIamPermissions".format(table.path)
1078 span_attributes = {"path": path}
1079 response = self._call_api(
1080 retry,
1081 span_name="BigQuery.testIamPermissions",
1082 span_attributes=span_attributes,
1083 method="POST",
1084 path=path,
1085 data=body,
1086 timeout=timeout,
1087 )
1088
1089 return response
1090
1091 def get_model(
1092 self,
1093 model_ref: Union[ModelReference, str],
1094 retry: retries.Retry = DEFAULT_RETRY,
1095 timeout: TimeoutType = DEFAULT_TIMEOUT,
1096 ) -> Model:
1097 """[Beta] Fetch the model referenced by ``model_ref``.
1098
1099 Args:
1100 model_ref (Union[ \
1101 google.cloud.bigquery.model.ModelReference, \
1102 str, \
1103 ]):
1104 A reference to the model to fetch from the BigQuery API.
1105 If a string is passed in, this method attempts to create a
1106 model reference from a string using
1107 :func:`google.cloud.bigquery.model.ModelReference.from_string`.
1108 retry (Optional[google.api_core.retry.Retry]):
1109 How to retry the RPC.
1110 timeout (Optional[float]):
1111 The number of seconds to wait for the underlying HTTP transport
1112 before using ``retry``.
1113
1114 Returns:
1115 google.cloud.bigquery.model.Model: A ``Model`` instance.
1116 """
1117 if isinstance(model_ref, str):
1118 model_ref = ModelReference.from_string(
1119 model_ref, default_project=self.project
1120 )
1121 path = model_ref.path
1122 span_attributes = {"path": path}
1123
1124 api_response = self._call_api(
1125 retry,
1126 span_name="BigQuery.getModel",
1127 span_attributes=span_attributes,
1128 method="GET",
1129 path=path,
1130 timeout=timeout,
1131 )
1132 return Model.from_api_repr(api_response)
1133
1134 def get_routine(
1135 self,
1136 routine_ref: Union[Routine, RoutineReference, str],
1137 retry: retries.Retry = DEFAULT_RETRY,
1138 timeout: TimeoutType = DEFAULT_TIMEOUT,
1139 ) -> Routine:
1140 """[Beta] Get the routine referenced by ``routine_ref``.
1141
1142 Args:
1143 routine_ref (Union[ \
1144 google.cloud.bigquery.routine.Routine, \
1145 google.cloud.bigquery.routine.RoutineReference, \
1146 str, \
1147 ]):
1148 A reference to the routine to fetch from the BigQuery API. If
1149 a string is passed in, this method attempts to create a
1150 reference from a string using
1151 :func:`google.cloud.bigquery.routine.RoutineReference.from_string`.
1152 retry (Optional[google.api_core.retry.Retry]):
1153 How to retry the API call.
1154 timeout (Optional[float]):
1155 The number of seconds to wait for the underlying HTTP transport
1156 before using ``retry``.
1157
1158 Returns:
1159 google.cloud.bigquery.routine.Routine:
1160 A ``Routine`` instance.
1161 """
1162 if isinstance(routine_ref, str):
1163 routine_ref = RoutineReference.from_string(
1164 routine_ref, default_project=self.project
1165 )
1166 path = routine_ref.path
1167 span_attributes = {"path": path}
1168 api_response = self._call_api(
1169 retry,
1170 span_name="BigQuery.getRoutine",
1171 span_attributes=span_attributes,
1172 method="GET",
1173 path=path,
1174 timeout=timeout,
1175 )
1176 return Routine.from_api_repr(api_response)
1177
1178 def get_table(
1179 self,
1180 table: Union[Table, TableReference, TableListItem, str],
1181 retry: retries.Retry = DEFAULT_RETRY,
1182 timeout: TimeoutType = DEFAULT_TIMEOUT,
1183 ) -> Table:
1184 """Fetch the table referenced by ``table``.
1185
1186 Args:
1187 table (Union[ \
1188 google.cloud.bigquery.table.Table, \
1189 google.cloud.bigquery.table.TableReference, \
1190 google.cloud.bigquery.table.TableListItem, \
1191 str, \
1192 ]):
1193 A reference to the table to fetch from the BigQuery API.
1194 If a string is passed in, this method attempts to create a
1195 table reference from a string using
1196 :func:`google.cloud.bigquery.table.TableReference.from_string`.
1197 retry (Optional[google.api_core.retry.Retry]):
1198 How to retry the RPC.
1199 timeout (Optional[float]):
1200 The number of seconds to wait for the underlying HTTP transport
1201 before using ``retry``.
1202
1203 Returns:
1204 google.cloud.bigquery.table.Table:
1205 A ``Table`` instance.
1206 """
1207 table_ref = _table_arg_to_table_ref(table, default_project=self.project)
1208 path = table_ref.path
1209 span_attributes = {"path": path}
1210 api_response = self._call_api(
1211 retry,
1212 span_name="BigQuery.getTable",
1213 span_attributes=span_attributes,
1214 method="GET",
1215 path=path,
1216 timeout=timeout,
1217 )
1218 return Table.from_api_repr(api_response)
1219
1220 def update_dataset(
1221 self,
1222 dataset: Dataset,
1223 fields: Sequence[str],
1224 retry: retries.Retry = DEFAULT_RETRY,
1225 timeout: TimeoutType = DEFAULT_TIMEOUT,
1226 update_mode: Optional[UpdateMode] = None,
1227 ) -> Dataset:
1228 """Change some fields of a dataset.
1229
1230 Use ``fields`` to specify which fields to update. At least one field
1231 must be provided. If a field is listed in ``fields`` and is ``None`` in
1232 ``dataset``, it will be deleted.
1233
1234 For example, to update the default expiration times, specify
1235 both properties in the ``fields`` argument:
1236
1237 .. code-block:: python
1238
1239 bigquery_client.update_dataset(
1240 dataset,
1241 [
1242 "default_partition_expiration_ms",
1243 "default_table_expiration_ms",
1244 ]
1245 )
1246
1247 If ``dataset.etag`` is not ``None``, the update will only
1248 succeed if the dataset on the server has the same ETag. Thus
1249 reading a dataset with ``get_dataset``, changing its fields,
1250 and then passing it to ``update_dataset`` will ensure that the changes
1251 will only be saved if no modifications to the dataset occurred
1252 since the read.
1253
1254 Args:
1255 dataset (google.cloud.bigquery.dataset.Dataset):
1256 The dataset to update.
1257 fields (Sequence[str]):
1258 The properties of ``dataset`` to change. These are strings
1259 corresponding to the properties of
1260 :class:`~google.cloud.bigquery.dataset.Dataset`.
1261 retry (Optional[google.api_core.retry.Retry]):
1262 How to retry the RPC.
1263 timeout (Optional[float]):
1264 The number of seconds to wait for the underlying HTTP transport
1265 before using ``retry``.
1266 update_mode (Optional[google.cloud.bigquery.enums.UpdateMode]):
1267 Specifies the kind of information to update in a dataset.
1268 By default, dataset metadata (e.g. friendlyName, description,
1269 labels, etc) and ACL information are updated. This argument can
1270 take on the following possible enum values.
1271
1272 * :attr:`~google.cloud.bigquery.enums.UPDATE_MODE_UNSPECIFIED`:
1273 The default value. Behavior defaults to UPDATE_FULL.
1274 * :attr:`~google.cloud.bigquery.enums.UpdateMode.UPDATE_METADATA`:
1275 Includes metadata information for the dataset, such as friendlyName, description, labels, etc.
1276 * :attr:`~google.cloud.bigquery.enums.UpdateMode.UPDATE_ACL`:
1277 Includes ACL information for the dataset, which defines dataset access for one or more entities.
1278 * :attr:`~google.cloud.bigquery.enums.UpdateMode.UPDATE_FULL`:
1279 Includes both dataset metadata and ACL information.
1280
1281 Returns:
1282 google.cloud.bigquery.dataset.Dataset:
1283 The modified ``Dataset`` instance.
1284 """
1285 partial = dataset._build_resource(fields)
1286 if dataset.etag is not None:
1287 headers: Optional[Dict[str, str]] = {"If-Match": dataset.etag}
1288 else:
1289 headers = None
1290 path = dataset.path
1291 span_attributes = {"path": path, "fields": fields}
1292
1293 if update_mode:
1294 query_params = {"updateMode": update_mode.value}
1295 else:
1296 query_params = {}
1297
1298 api_response = self._call_api(
1299 retry,
1300 span_name="BigQuery.updateDataset",
1301 span_attributes=span_attributes,
1302 method="PATCH",
1303 path=path,
1304 data=partial,
1305 headers=headers,
1306 timeout=timeout,
1307 query_params=query_params,
1308 )
1309 return Dataset.from_api_repr(api_response)
1310
1311 def update_model(
1312 self,
1313 model: Model,
1314 fields: Sequence[str],
1315 retry: retries.Retry = DEFAULT_RETRY,
1316 timeout: TimeoutType = DEFAULT_TIMEOUT,
1317 ) -> Model:
1318 """[Beta] Change some fields of a model.
1319
1320 Use ``fields`` to specify which fields to update. At least one field
1321 must be provided. If a field is listed in ``fields`` and is ``None``
1322 in ``model``, the field value will be deleted.
1323
1324 For example, to update the descriptive properties of the model,
1325 specify them in the ``fields`` argument:
1326
1327 .. code-block:: python
1328
1329 bigquery_client.update_model(
1330 model, ["description", "friendly_name"]
1331 )
1332
1333 If ``model.etag`` is not ``None``, the update will only succeed if
1334 the model on the server has the same ETag. Thus reading a model with
1335 ``get_model``, changing its fields, and then passing it to
1336 ``update_model`` will ensure that the changes will only be saved if
1337 no modifications to the model occurred since the read.
1338
1339 Args:
1340 model (google.cloud.bigquery.model.Model): The model to update.
1341 fields (Sequence[str]):
1342 The properties of ``model`` to change. These are strings
1343 corresponding to the properties of
1344 :class:`~google.cloud.bigquery.model.Model`.
1345 retry (Optional[google.api_core.retry.Retry]):
1346 A description of how to retry the API call.
1347 timeout (Optional[float]):
1348 The number of seconds to wait for the underlying HTTP transport
1349 before using ``retry``.
1350
1351 Returns:
1352 google.cloud.bigquery.model.Model:
1353 The model resource returned from the API call.
1354 """
1355 partial = model._build_resource(fields)
1356 if model.etag:
1357 headers: Optional[Dict[str, str]] = {"If-Match": model.etag}
1358 else:
1359 headers = None
1360 path = model.path
1361 span_attributes = {"path": path, "fields": fields}
1362
1363 api_response = self._call_api(
1364 retry,
1365 span_name="BigQuery.updateModel",
1366 span_attributes=span_attributes,
1367 method="PATCH",
1368 path=path,
1369 data=partial,
1370 headers=headers,
1371 timeout=timeout,
1372 )
1373 return Model.from_api_repr(api_response)
1374
1375 def update_routine(
1376 self,
1377 routine: Routine,
1378 fields: Sequence[str],
1379 retry: retries.Retry = DEFAULT_RETRY,
1380 timeout: TimeoutType = DEFAULT_TIMEOUT,
1381 ) -> Routine:
1382 """[Beta] Change some fields of a routine.
1383
1384 Use ``fields`` to specify which fields to update. At least one field
1385 must be provided. If a field is listed in ``fields`` and is ``None``
1386 in ``routine``, the field value will be deleted.
1387
1388 For example, to update the description property of the routine,
1389 specify it in the ``fields`` argument:
1390
1391 .. code-block:: python
1392
1393 bigquery_client.update_routine(
1394 routine, ["description"]
1395 )
1396
1397 .. warning::
1398 During beta, partial updates are not supported. You must provide
1399 all fields in the resource.
1400
1401 If :attr:`~google.cloud.bigquery.routine.Routine.etag` is not
1402 ``None``, the update will only succeed if the resource on the server
1403 has the same ETag. Thus reading a routine with
1404 :func:`~google.cloud.bigquery.client.Client.get_routine`, changing
1405 its fields, and then passing it to this method will ensure that the
1406 changes will only be saved if no modifications to the resource
1407 occurred since the read.
1408
1409 Args:
1410 routine (google.cloud.bigquery.routine.Routine):
1411 The routine to update.
1412 fields (Sequence[str]):
1413 The fields of ``routine`` to change, spelled as the
1414 :class:`~google.cloud.bigquery.routine.Routine` properties.
1415 retry (Optional[google.api_core.retry.Retry]):
1416 A description of how to retry the API call.
1417 timeout (Optional[float]):
1418 The number of seconds to wait for the underlying HTTP transport
1419 before using ``retry``.
1420
1421 Returns:
1422 google.cloud.bigquery.routine.Routine:
1423 The routine resource returned from the API call.
1424 """
1425 partial = routine._build_resource(fields)
1426 if routine.etag:
1427 headers: Optional[Dict[str, str]] = {"If-Match": routine.etag}
1428 else:
1429 headers = None
1430
1431 # TODO: remove when routines update supports partial requests.
1432 partial["routineReference"] = routine.reference.to_api_repr()
1433
1434 path = routine.path
1435 span_attributes = {"path": path, "fields": fields}
1436
1437 api_response = self._call_api(
1438 retry,
1439 span_name="BigQuery.updateRoutine",
1440 span_attributes=span_attributes,
1441 method="PUT",
1442 path=path,
1443 data=partial,
1444 headers=headers,
1445 timeout=timeout,
1446 )
1447 return Routine.from_api_repr(api_response)
1448
1449 def update_table(
1450 self,
1451 table: Table,
1452 fields: Sequence[str],
1453 autodetect_schema: bool = False,
1454 retry: retries.Retry = DEFAULT_RETRY,
1455 timeout: TimeoutType = DEFAULT_TIMEOUT,
1456 ) -> Table:
1457 """Change some fields of a table.
1458
1459 Use ``fields`` to specify which fields to update. At least one field
1460 must be provided. If a field is listed in ``fields`` and is ``None``
1461 in ``table``, the field value will be deleted.
1462
1463 For example, to update the descriptive properties of the table,
1464 specify them in the ``fields`` argument:
1465
1466 .. code-block:: python
1467
1468 bigquery_client.update_table(
1469 table,
1470 ["description", "friendly_name"]
1471 )
1472
1473 If ``table.etag`` is not ``None``, the update will only succeed if
1474 the table on the server has the same ETag. Thus reading a table with
1475 ``get_table``, changing its fields, and then passing it to
1476 ``update_table`` will ensure that the changes will only be saved if
1477 no modifications to the table occurred since the read.
1478
1479 Args:
1480 table (google.cloud.bigquery.table.Table): The table to update.
1481 fields (Sequence[str]):
1482 The fields of ``table`` to change, spelled as the
1483 :class:`~google.cloud.bigquery.table.Table` properties.
1484 autodetect_schema (bool):
1485 Specifies if the schema of the table should be autodetected when
1486 updating the table from the underlying source. Only applicable
1487 for external tables.
1488 retry (Optional[google.api_core.retry.Retry]):
1489 A description of how to retry the API call.
1490 timeout (Optional[float]):
1491 The number of seconds to wait for the underlying HTTP transport
1492 before using ``retry``.
1493
1494 Returns:
1495 google.cloud.bigquery.table.Table:
1496 The table resource returned from the API call.
1497 """
1498 partial = table._build_resource(fields)
1499 if table.etag is not None:
1500 headers: Optional[Dict[str, str]] = {"If-Match": table.etag}
1501 else:
1502 headers = None
1503
1504 path = table.path
1505 span_attributes = {"path": path, "fields": fields}
1506
1507 if autodetect_schema:
1508 query_params = {"autodetect_schema": True}
1509 else:
1510 query_params = {}
1511
1512 api_response = self._call_api(
1513 retry,
1514 span_name="BigQuery.updateTable",
1515 span_attributes=span_attributes,
1516 method="PATCH",
1517 path=path,
1518 query_params=query_params,
1519 data=partial,
1520 headers=headers,
1521 timeout=timeout,
1522 )
1523 return Table.from_api_repr(api_response)
1524
1525 def list_models(
1526 self,
1527 dataset: Union[Dataset, DatasetReference, DatasetListItem, str],
1528 max_results: Optional[int] = None,
1529 page_token: Optional[str] = None,
1530 retry: retries.Retry = DEFAULT_RETRY,
1531 timeout: TimeoutType = DEFAULT_TIMEOUT,
1532 page_size: Optional[int] = None,
1533 ) -> page_iterator.Iterator:
1534 """[Beta] List models in the dataset.
1535
1536 See
1537 https://cloud.google.com/bigquery/docs/reference/rest/v2/models/list
1538
1539 Args:
1540 dataset (Union[ \
1541 google.cloud.bigquery.dataset.Dataset, \
1542 google.cloud.bigquery.dataset.DatasetReference, \
1543 google.cloud.bigquery.dataset.DatasetListItem, \
1544 str, \
1545 ]):
1546 A reference to the dataset whose models to list from the
1547 BigQuery API. If a string is passed in, this method attempts
1548 to create a dataset reference from a string using
1549 :func:`google.cloud.bigquery.dataset.DatasetReference.from_string`.
1550 max_results (Optional[int]):
1551 Maximum number of models to return. Defaults to a
1552 value set by the API.
1553 page_token (Optional[str]):
1554 Token representing a cursor into the models. If not passed,
1555 the API will return the first page of models. The token marks
1556 the beginning of the iterator to be returned and the value of
1557 the ``page_token`` can be accessed at ``next_page_token`` of the
1558 :class:`~google.api_core.page_iterator.HTTPIterator`.
1559 retry (Optional[google.api_core.retry.Retry]):
1560 How to retry the RPC.
1561 timeout (Optional[float]):
1562 The number of seconds to wait for the underlying HTTP transport
1563 before using ``retry``.
1564 page_size (Optional[int]):
1565 Maximum number of models to return per page.
1566 Defaults to a value set by the API.
1567
1568 Returns:
1569 google.api_core.page_iterator.Iterator:
1570 Iterator of
1571 :class:`~google.cloud.bigquery.model.Model` contained
1572 within the requested dataset.
1573 """
1574 dataset = self._dataset_from_arg(dataset)
1575
1576 path = "%s/models" % dataset.path
1577 span_attributes = {"path": path}
1578
1579 def api_request(*args, **kwargs):
1580 return self._call_api(
1581 retry,
1582 span_name="BigQuery.listModels",
1583 span_attributes=span_attributes,
1584 *args,
1585 timeout=timeout,
1586 **kwargs,
1587 )
1588
1589 result = page_iterator.HTTPIterator(
1590 client=self,
1591 api_request=api_request,
1592 path=path,
1593 item_to_value=_item_to_model,
1594 items_key="models",
1595 page_token=page_token,
1596 max_results=max_results,
1597 page_size=page_size,
1598 )
1599 result.dataset = dataset # type: ignore
1600 return result
1601
1602 def list_routines(
1603 self,
1604 dataset: Union[Dataset, DatasetReference, DatasetListItem, str],
1605 max_results: Optional[int] = None,
1606 page_token: Optional[str] = None,
1607 retry: retries.Retry = DEFAULT_RETRY,
1608 timeout: TimeoutType = DEFAULT_TIMEOUT,
1609 page_size: Optional[int] = None,
1610 ) -> page_iterator.Iterator:
1611 """[Beta] List routines in the dataset.
1612
1613 See
1614 https://cloud.google.com/bigquery/docs/reference/rest/v2/routines/list
1615
1616 Args:
1617 dataset (Union[ \
1618 google.cloud.bigquery.dataset.Dataset, \
1619 google.cloud.bigquery.dataset.DatasetReference, \
1620 google.cloud.bigquery.dataset.DatasetListItem, \
1621 str, \
1622 ]):
1623 A reference to the dataset whose routines to list from the
1624 BigQuery API. If a string is passed in, this method attempts
1625 to create a dataset reference from a string using
1626 :func:`google.cloud.bigquery.dataset.DatasetReference.from_string`.
1627 max_results (Optional[int]):
1628 Maximum number of routines to return. Defaults
1629 to a value set by the API.
1630 page_token (Optional[str]):
1631 Token representing a cursor into the routines. If not passed,
1632 the API will return the first page of routines. The token marks
1633 the beginning of the iterator to be returned and the value of the
1634 ``page_token`` can be accessed at ``next_page_token`` of the
1635 :class:`~google.api_core.page_iterator.HTTPIterator`.
1636 retry (Optional[google.api_core.retry.Retry]):
1637 How to retry the RPC.
1638 timeout (Optional[float]):
1639 The number of seconds to wait for the underlying HTTP transport
1640 before using ``retry``.
1641 page_size (Optional[int]):
1642 Maximum number of routines to return per page.
1643 Defaults to a value set by the API.
1644
1645 Returns:
1646 google.api_core.page_iterator.Iterator:
1647 Iterator of all
1648 :class:`~google.cloud.bigquery.routine.Routine`s contained
1649 within the requested dataset, limited by ``max_results``.
1650 """
1651 dataset = self._dataset_from_arg(dataset)
1652 path = "{}/routines".format(dataset.path)
1653
1654 span_attributes = {"path": path}
1655
1656 def api_request(*args, **kwargs):
1657 return self._call_api(
1658 retry,
1659 span_name="BigQuery.listRoutines",
1660 span_attributes=span_attributes,
1661 *args,
1662 timeout=timeout,
1663 **kwargs,
1664 )
1665
1666 result = page_iterator.HTTPIterator(
1667 client=self,
1668 api_request=api_request,
1669 path=path,
1670 item_to_value=_item_to_routine,
1671 items_key="routines",
1672 page_token=page_token,
1673 max_results=max_results,
1674 page_size=page_size,
1675 )
1676 result.dataset = dataset # type: ignore
1677 return result
1678
1679 def list_tables(
1680 self,
1681 dataset: Union[Dataset, DatasetReference, DatasetListItem, str],
1682 max_results: Optional[int] = None,
1683 page_token: Optional[str] = None,
1684 retry: retries.Retry = DEFAULT_RETRY,
1685 timeout: TimeoutType = DEFAULT_TIMEOUT,
1686 page_size: Optional[int] = None,
1687 ) -> page_iterator.Iterator:
1688 """List tables in the dataset.
1689
1690 See
1691 https://cloud.google.com/bigquery/docs/reference/rest/v2/tables/list
1692
1693 Args:
1694 dataset (Union[ \
1695 google.cloud.bigquery.dataset.Dataset, \
1696 google.cloud.bigquery.dataset.DatasetReference, \
1697 google.cloud.bigquery.dataset.DatasetListItem, \
1698 str, \
1699 ]):
1700 A reference to the dataset whose tables to list from the
1701 BigQuery API. If a string is passed in, this method attempts
1702 to create a dataset reference from a string using
1703 :func:`google.cloud.bigquery.dataset.DatasetReference.from_string`.
1704 max_results (Optional[int]):
1705 Maximum number of tables to return. Defaults
1706 to a value set by the API.
1707 page_token (Optional[str]):
1708 Token representing a cursor into the tables. If not passed,
1709 the API will return the first page of tables. The token marks
1710 the beginning of the iterator to be returned and the value of
1711 the ``page_token`` can be accessed at ``next_page_token`` of the
1712 :class:`~google.api_core.page_iterator.HTTPIterator`.
1713 retry (Optional[google.api_core.retry.Retry]):
1714 How to retry the RPC.
1715 timeout (Optional[float]):
1716 The number of seconds to wait for the underlying HTTP transport
1717 before using ``retry``.
1718 page_size (Optional[int]):
1719 Maximum number of tables to return per page.
1720 Defaults to a value set by the API.
1721
1722 Returns:
1723 google.api_core.page_iterator.Iterator:
1724 Iterator of
1725 :class:`~google.cloud.bigquery.table.TableListItem` contained
1726 within the requested dataset.
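
Example (a minimal usage sketch; ``my_dataset`` is a placeholder dataset
ID and an authenticated client is assumed)::

    from google.cloud import bigquery

    client = bigquery.Client()
    # "my_dataset" is a placeholder for a dataset in the client's project.
    for table in client.list_tables("my_dataset"):
        print(table.table_id)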
1727 """
1728 dataset = self._dataset_from_arg(dataset)
1729 path = "%s/tables" % dataset.path
1730 span_attributes = {"path": path}
1731
1732 def api_request(*args, **kwargs):
1733 return self._call_api(
1734 retry,
1735 span_name="BigQuery.listTables",
1736 span_attributes=span_attributes,
1737 *args,
1738 timeout=timeout,
1739 **kwargs,
1740 )
1741
1742 result = page_iterator.HTTPIterator(
1743 client=self,
1744 api_request=api_request,
1745 path=path,
1746 item_to_value=_item_to_table,
1747 items_key="tables",
1748 page_token=page_token,
1749 max_results=max_results,
1750 page_size=page_size,
1751 )
1752 result.dataset = dataset # type: ignore
1753 return result
1754
1755 def delete_dataset(
1756 self,
1757 dataset: Union[Dataset, DatasetReference, DatasetListItem, str],
1758 delete_contents: bool = False,
1759 retry: retries.Retry = DEFAULT_RETRY,
1760 timeout: TimeoutType = DEFAULT_TIMEOUT,
1761 not_found_ok: bool = False,
1762 ) -> None:
1763 """Delete a dataset.
1764
1765 See
1766 https://cloud.google.com/bigquery/docs/reference/rest/v2/datasets/delete
1767
1768 Args:
1769 dataset (Union[ \
1770 google.cloud.bigquery.dataset.Dataset, \
1771 google.cloud.bigquery.dataset.DatasetReference, \
1772 google.cloud.bigquery.dataset.DatasetListItem, \
1773 str, \
1774 ]):
1775 A reference to the dataset to delete. If a string is passed
1776 in, this method attempts to create a dataset reference from a
1777 string using
1778 :func:`google.cloud.bigquery.dataset.DatasetReference.from_string`.
1779 delete_contents (Optional[bool]):
1780 If True, delete all the tables in the dataset. If False and
1781 the dataset contains tables, the request will fail.
1782 Default is False.
1783 retry (Optional[google.api_core.retry.Retry]):
1784 How to retry the RPC.
1785 timeout (Optional[float]):
1786 The number of seconds to wait for the underlying HTTP transport
1787 before using ``retry``.
1788 not_found_ok (Optional[bool]):
1789 Defaults to ``False``. If ``True``, ignore "not found" errors
1790 when deleting the dataset.
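
Example (a minimal usage sketch; ``my_dataset`` is a placeholder dataset
ID and an authenticated client is assumed)::

    from google.cloud import bigquery

    client = bigquery.Client()
    # "my_dataset" is a placeholder for a dataset in the client's project.
    client.delete_dataset(
        "my_dataset", delete_contents=True, not_found_ok=True
    )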
1791 """
1792 dataset = self._dataset_from_arg(dataset)
1793 params = {}
1794 path = dataset.path
1795 if delete_contents:
1796 params["deleteContents"] = "true"
1797 span_attributes = {"path": path, "deleteContents": delete_contents}
1798 else:
1799 span_attributes = {"path": path}
1800
1801 try:
1802 self._call_api(
1803 retry,
1804 span_name="BigQuery.deleteDataset",
1805 span_attributes=span_attributes,
1806 method="DELETE",
1807 path=path,
1808 query_params=params,
1809 timeout=timeout,
1810 )
1811 except core_exceptions.NotFound:
1812 if not not_found_ok:
1813 raise
1814
1815 def delete_model(
1816 self,
1817 model: Union[Model, ModelReference, str],
1818 retry: retries.Retry = DEFAULT_RETRY,
1819 timeout: TimeoutType = DEFAULT_TIMEOUT,
1820 not_found_ok: bool = False,
1821 ) -> None:
1822 """[Beta] Delete a model
1823
1824 See
1825 https://cloud.google.com/bigquery/docs/reference/rest/v2/models/delete
1826
1827 Args:
1828 model (Union[ \
1829 google.cloud.bigquery.model.Model, \
1830 google.cloud.bigquery.model.ModelReference, \
1831 str, \
1832 ]):
1833 A reference to the model to delete. If a string is passed in,
1834 this method attempts to create a model reference from a
1835 string using
1836 :func:`google.cloud.bigquery.model.ModelReference.from_string`.
1837 retry (Optional[google.api_core.retry.Retry]):
1838 How to retry the RPC.
1839 timeout (Optional[float]):
1840 The number of seconds to wait for the underlying HTTP transport
1841 before using ``retry``.
1842 not_found_ok (Optional[bool]):
1843 Defaults to ``False``. If ``True``, ignore "not found" errors
1844 when deleting the model.
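
Example (a minimal usage sketch; the model ID is a placeholder and an
authenticated client is assumed)::

    from google.cloud import bigquery

    client = bigquery.Client()
    # "my_dataset.my_model" is a placeholder model ID.
    client.delete_model("my_dataset.my_model", not_found_ok=True)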
1845 """
1846 if isinstance(model, str):
1847 model = ModelReference.from_string(model, default_project=self.project)
1848
1849 if not isinstance(model, (Model, ModelReference)):
1850 raise TypeError("model must be a Model or a ModelReference")
1851
1852 path = model.path
1853 try:
1854 span_attributes = {"path": path}
1855 self._call_api(
1856 retry,
1857 span_name="BigQuery.deleteModel",
1858 span_attributes=span_attributes,
1859 method="DELETE",
1860 path=path,
1861 timeout=timeout,
1862 )
1863 except core_exceptions.NotFound:
1864 if not not_found_ok:
1865 raise
1866
1867 def delete_job_metadata(
1868 self,
1869 job_id: Union[str, LoadJob, CopyJob, ExtractJob, QueryJob],
1870 project: Optional[str] = None,
1871 location: Optional[str] = None,
1872 retry: retries.Retry = DEFAULT_RETRY,
1873 timeout: TimeoutType = DEFAULT_TIMEOUT,
1874 not_found_ok: bool = False,
1875 ):
1876 """[Beta] Delete job metadata from job history.
1877
1878 Note: This does not stop a running job. Use
1879 :func:`~google.cloud.bigquery.client.Client.cancel_job` instead.
1880
1881 Args:
1882 job_id (Union[ \
1883 str, \
1884 LoadJob, \
1885 CopyJob, \
1886 ExtractJob, \
1887 QueryJob \
1888 ]): Job or job identifier.
1889 project (Optional[str]):
1890 ID of the project which owns the job (defaults to the client's project).
1891 location (Optional[str]):
1892 Location where the job was run. Ignored if ``job_id`` is a job
1893 object.
1894 retry (Optional[google.api_core.retry.Retry]):
1895 How to retry the RPC.
1896 timeout (Optional[float]):
1897 The number of seconds to wait for the underlying HTTP transport
1898 before using ``retry``.
1899 not_found_ok (Optional[bool]):
1900 Defaults to ``False``. If ``True``, ignore "not found" errors
1901 when deleting the job.
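
Example (a minimal usage sketch; the job ID and location are placeholders
and an authenticated client is assumed)::

    from google.cloud import bigquery

    client = bigquery.Client()
    # "my-job-id" and "US" are placeholders for an existing job.
    client.delete_job_metadata("my-job-id", location="US", not_found_ok=True)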
1902 """
1903 extra_params = {}
1904
1905 project, location, job_id = _extract_job_reference(
1906 job_id, project=project, location=location
1907 )
1908
1909 if project is None:
1910 project = self.project
1911
1912 if location is None:
1913 location = self.location
1914
1915 # Location is always required for jobs.delete()
1916 extra_params["location"] = location
1917
1918 path = f"/projects/{project}/jobs/{job_id}/delete"
1919
1920 span_attributes = {"path": path, "job_id": job_id, "location": location}
1921
1922 try:
1923 self._call_api(
1924 retry,
1925 span_name="BigQuery.deleteJob",
1926 span_attributes=span_attributes,
1927 method="DELETE",
1928 path=path,
1929 query_params=extra_params,
1930 timeout=timeout,
1931 )
1932 except google.api_core.exceptions.NotFound:
1933 if not not_found_ok:
1934 raise
1935
1936 def delete_routine(
1937 self,
1938 routine: Union[Routine, RoutineReference, str],
1939 retry: retries.Retry = DEFAULT_RETRY,
1940 timeout: TimeoutType = DEFAULT_TIMEOUT,
1941 not_found_ok: bool = False,
1942 ) -> None:
1943 """[Beta] Delete a routine.
1944
1945 See
1946 https://cloud.google.com/bigquery/docs/reference/rest/v2/routines/delete
1947
1948 Args:
1949 routine (Union[ \
1950 google.cloud.bigquery.routine.Routine, \
1951 google.cloud.bigquery.routine.RoutineReference, \
1952 str, \
1953 ]):
1954 A reference to the routine to delete. If a string is passed
1955 in, this method attempts to create a routine reference from a
1956 string using
1957 :func:`google.cloud.bigquery.routine.RoutineReference.from_string`.
1958 retry (Optional[google.api_core.retry.Retry]):
1959 How to retry the RPC.
1960 timeout (Optional[float]):
1961 The number of seconds to wait for the underlying HTTP transport
1962 before using ``retry``.
1963 not_found_ok (Optional[bool]):
1964 Defaults to ``False``. If ``True``, ignore "not found" errors
1965 when deleting the routine.
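
Example (a minimal usage sketch; the routine ID is a placeholder and an
authenticated client is assumed)::

    from google.cloud import bigquery

    client = bigquery.Client()
    # "my_dataset.my_routine" is a placeholder routine ID.
    client.delete_routine("my_dataset.my_routine", not_found_ok=True)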
1966 """
1967 if isinstance(routine, str):
1968 routine = RoutineReference.from_string(
1969 routine, default_project=self.project
1970 )
1971 path = routine.path
1972
1973 if not isinstance(routine, (Routine, RoutineReference)):
1974 raise TypeError("routine must be a Routine or a RoutineReference")
1975
1976 try:
1977 span_attributes = {"path": path}
1978 self._call_api(
1979 retry,
1980 span_name="BigQuery.deleteRoutine",
1981 span_attributes=span_attributes,
1982 method="DELETE",
1983 path=path,
1984 timeout=timeout,
1985 )
1986 except core_exceptions.NotFound:
1987 if not not_found_ok:
1988 raise
1989
1990 def delete_table(
1991 self,
1992 table: Union[Table, TableReference, TableListItem, str],
1993 retry: retries.Retry = DEFAULT_RETRY,
1994 timeout: TimeoutType = DEFAULT_TIMEOUT,
1995 not_found_ok: bool = False,
1996 ) -> None:
1997 """Delete a table
1998
1999 See
2000 https://cloud.google.com/bigquery/docs/reference/rest/v2/tables/delete
2001
2002 Args:
2003 table (Union[ \
2004 google.cloud.bigquery.table.Table, \
2005 google.cloud.bigquery.table.TableReference, \
2006 google.cloud.bigquery.table.TableListItem, \
2007 str, \
2008 ]):
2009 A reference to the table to delete. If a string is passed in,
2010 this method attempts to create a table reference from a
2011 string using
2012 :func:`google.cloud.bigquery.table.TableReference.from_string`.
2013 retry (Optional[google.api_core.retry.Retry]):
2014 How to retry the RPC.
2015 timeout (Optional[float]):
2016 The number of seconds to wait for the underlying HTTP transport
2017 before using ``retry``.
2018 not_found_ok (Optional[bool]):
2019 Defaults to ``False``. If ``True``, ignore "not found" errors
2020 when deleting the table.
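
Example (a minimal usage sketch; the table ID is a placeholder and an
authenticated client is assumed)::

    from google.cloud import bigquery

    client = bigquery.Client()
    # "my_dataset.my_table" is a placeholder table ID.
    client.delete_table("my_dataset.my_table", not_found_ok=True)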
2021 """
2022 table = _table_arg_to_table_ref(table, default_project=self.project)
2023 if not isinstance(table, TableReference):
2024 raise TypeError("Unable to get TableReference for table '{}'".format(table))
2025
2026 try:
2027 path = table.path
2028 span_attributes = {"path": path}
2029 self._call_api(
2030 retry,
2031 span_name="BigQuery.deleteTable",
2032 span_attributes=span_attributes,
2033 method="DELETE",
2034 path=path,
2035 timeout=timeout,
2036 )
2037 except core_exceptions.NotFound:
2038 if not not_found_ok:
2039 raise
2040
2041 def _get_query_results(
2042 self,
2043 job_id: str,
2044 retry: retries.Retry,
2045 project: Optional[str] = None,
2046 timeout_ms: Optional[int] = None,
2047 location: Optional[str] = None,
2048 timeout: TimeoutType = DEFAULT_TIMEOUT,
2049 page_size: int = 0,
2050 start_index: Optional[int] = None,
2051 ) -> _QueryResults:
2052 """Get the query results object for a query job.
2053
2054 Args:
2055 job_id (str): Name of the query job.
2056 retry (google.api_core.retry.Retry):
2057 How to retry the RPC.
2058 project (Optional[str]):
2059 Project ID for the query job (defaults to the project of the client).
2060 timeout_ms (Optional[int]):
2061 Number of milliseconds the API call should wait for the query
2062 to complete before the request times out.
2063 location (Optional[str]): Location of the query job.
2064 timeout (Optional[float]):
2065 The number of seconds to wait for the underlying HTTP transport
2066 before using ``retry``. If set, this connection timeout may be
2067 increased to a minimum value. This prevents retries on what
2068 would otherwise be a successful response.
2069 page_size (Optional[int]):
2070 Maximum number of rows in a single response. See maxResults in
2071 the jobs.getQueryResults REST API.
2072 start_index (Optional[int]):
2073 Zero-based index of the starting row. See startIndex in the
2074 jobs.getQueryResults REST API.
2075
2076 Returns:
2077 google.cloud.bigquery.query._QueryResults:
2078 A new ``_QueryResults`` instance.
2079 """
2080
2081 extra_params: Dict[str, Any] = {"maxResults": page_size}
2082
2083 if timeout is not None:
2084 if not isinstance(timeout, (int, float)):
2085 timeout = _MIN_GET_QUERY_RESULTS_TIMEOUT
2086 else:
2087 timeout = max(timeout, _MIN_GET_QUERY_RESULTS_TIMEOUT)
2088
2089 if page_size > 0:
2090 extra_params["formatOptions.useInt64Timestamp"] = True
2091
2092 if project is None:
2093 project = self.project
2094
2095 if timeout_ms is not None:
2096 extra_params["timeoutMs"] = timeout_ms
2097
2098 if location is None:
2099 location = self.location
2100
2101 if location is not None:
2102 extra_params["location"] = location
2103
2104 if start_index is not None:
2105 extra_params["startIndex"] = start_index
2106
2107 path = "/projects/{}/queries/{}".format(project, job_id)
2108
2109 # This call is typically made in a polling loop that checks whether the
2110 # job is complete (from QueryJob.done(), called ultimately from
2111 # QueryJob.result()). So we don't need to poll here.
2112 span_attributes = {"path": path}
2113 resource = self._call_api(
2114 retry,
2115 span_name="BigQuery.getQueryResults",
2116 span_attributes=span_attributes,
2117 method="GET",
2118 path=path,
2119 query_params=extra_params,
2120 timeout=timeout,
2121 )
2122 return _QueryResults.from_api_repr(resource)
2123
2124 def job_from_resource(
2125 self, resource: dict
2126 ) -> Union[job.CopyJob, job.ExtractJob, job.LoadJob, job.QueryJob, job.UnknownJob]:
2127 """Detect correct job type from resource and instantiate.
2128
2129 Args:
2130 resource (Dict): one job resource from API response
2131
2132 Returns:
2133 Union[job.CopyJob, job.ExtractJob, job.LoadJob, job.QueryJob, job.UnknownJob]:
2134 The job instance, constructed via the resource.
2135 """
2136 config = resource.get("configuration", {})
2137 if "load" in config:
2138 return job.LoadJob.from_api_repr(resource, self)
2139 elif "copy" in config:
2140 return job.CopyJob.from_api_repr(resource, self)
2141 elif "extract" in config:
2142 return job.ExtractJob.from_api_repr(resource, self)
2143 elif "query" in config:
2144 return job.QueryJob.from_api_repr(resource, self)
2145 return job.UnknownJob.from_api_repr(resource, self)
2146
2147 def create_job(
2148 self,
2149 job_config: dict,
2150 retry: retries.Retry = DEFAULT_RETRY,
2151 timeout: TimeoutType = DEFAULT_TIMEOUT,
2152 ) -> Union[job.LoadJob, job.CopyJob, job.ExtractJob, job.QueryJob]:
2153 """Create a new job.
2154
2155 Args:
2156 job_config (dict): Job configuration resource, in the dict representation returned by the API.
2157 retry (Optional[google.api_core.retry.Retry]): How to retry the RPC.
2158 timeout (Optional[float]):
2159 The number of seconds to wait for the underlying HTTP transport
2160 before using ``retry``.
2161
2162 Returns:
2163 Union[ \
2164 google.cloud.bigquery.job.LoadJob, \
2165 google.cloud.bigquery.job.CopyJob, \
2166 google.cloud.bigquery.job.ExtractJob, \
2167 google.cloud.bigquery.job.QueryJob \
2168 ]:
2169 A new job instance.
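
Example (a minimal usage sketch of the dict-based configuration; the query
text is arbitrary and an authenticated client is assumed)::

    from google.cloud import bigquery

    client = bigquery.Client()
    # The dict mirrors the "configuration" portion of a job resource.
    query_job = client.create_job(
        {"query": {"query": "SELECT 1", "useLegacySql": False}}
    )
    query_job.result()  # Wait for the job to finish.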
2170 """
2171
2172 if "load" in job_config:
2173 load_job_config = google.cloud.bigquery.job.LoadJobConfig.from_api_repr(
2174 job_config
2175 )
2176 destination = _get_sub_prop(job_config, ["load", "destinationTable"])
2177 source_uris = _get_sub_prop(job_config, ["load", "sourceUris"])
2178 destination = TableReference.from_api_repr(destination)
2179 return self.load_table_from_uri(
2180 source_uris,
2181 destination,
2182 job_config=typing.cast(LoadJobConfig, load_job_config),
2183 retry=retry,
2184 timeout=timeout,
2185 )
2186 elif "copy" in job_config:
2187 copy_job_config = google.cloud.bigquery.job.CopyJobConfig.from_api_repr(
2188 job_config
2189 )
2190 destination = _get_sub_prop(job_config, ["copy", "destinationTable"])
2191 destination = TableReference.from_api_repr(destination)
2192 return self.copy_table(
2193 [], # Source table(s) already in job_config resource.
2194 destination,
2195 job_config=typing.cast(CopyJobConfig, copy_job_config),
2196 retry=retry,
2197 timeout=timeout,
2198 )
2199 elif "extract" in job_config:
2200 extract_job_config = (
2201 google.cloud.bigquery.job.ExtractJobConfig.from_api_repr(job_config)
2202 )
2203 source = _get_sub_prop(job_config, ["extract", "sourceTable"])
2204 if source:
2205 source_type = "Table"
2206 source = TableReference.from_api_repr(source)
2207 else:
2208 source = _get_sub_prop(job_config, ["extract", "sourceModel"])
2209 source_type = "Model"
2210 source = ModelReference.from_api_repr(source)
2211 destination_uris = _get_sub_prop(job_config, ["extract", "destinationUris"])
2212 return self.extract_table(
2213 source,
2214 destination_uris,
2215 job_config=typing.cast(ExtractJobConfig, extract_job_config),
2216 retry=retry,
2217 timeout=timeout,
2218 source_type=source_type,
2219 )
2220 elif "query" in job_config:
2221 query_job_config = google.cloud.bigquery.job.QueryJobConfig.from_api_repr(
2222 job_config
2223 )
2224 query = _get_sub_prop(job_config, ["query", "query"])
2225 return self.query(
2226 query,
2227 job_config=typing.cast(QueryJobConfig, query_job_config),
2228 retry=retry,
2229 timeout=timeout,
2230 )
2231 else:
2232 raise TypeError("Invalid job configuration received.")
2233
2234 def get_job(
2235 self,
2236 job_id: Union[str, job.LoadJob, job.CopyJob, job.ExtractJob, job.QueryJob],
2237 project: Optional[str] = None,
2238 location: Optional[str] = None,
2239 retry: retries.Retry = DEFAULT_RETRY,
2240 timeout: TimeoutType = DEFAULT_GET_JOB_TIMEOUT,
2241 ) -> Union[job.LoadJob, job.CopyJob, job.ExtractJob, job.QueryJob, job.UnknownJob]:
2242 """Fetch a job for the project associated with this client.
2243
2244 See
2245 https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/get
2246
2247 Args:
2248 job_id (Union[ \
2249 str, \
2250 job.LoadJob, \
2251 job.CopyJob, \
2252 job.ExtractJob, \
2253 job.QueryJob \
2254 ]):
2255 Job identifier.
2256 project (Optional[str]):
2257 ID of the project which owns the job (defaults to the client's project).
2258 location (Optional[str]):
2259 Location where the job was run. Ignored if ``job_id`` is a job
2260 object.
2261 retry (Optional[google.api_core.retry.Retry]):
2262 How to retry the RPC.
2263 timeout (Optional[float]):
2264 The number of seconds to wait for the underlying HTTP transport
2265 before using ``retry``.
2266
2267 Returns:
2268 Union[job.LoadJob, job.CopyJob, job.ExtractJob, job.QueryJob, job.UnknownJob]:
2269 Job instance, based on the resource returned by the API.
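
Example (a minimal usage sketch; the job ID and location are placeholders
and an authenticated client is assumed)::

    from google.cloud import bigquery

    client = bigquery.Client()
    # "my-job-id" and "US" are placeholders for an existing job.
    job = client.get_job("my-job-id", location="US")
    print(job.job_type, job.state)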
2270 """
2271 extra_params = {"projection": "full"}
2272
2273 project, location, job_id = _extract_job_reference(
2274 job_id, project=project, location=location
2275 )
2276
2277 if project is None:
2278 project = self.project
2279
2280 if location is None:
2281 location = self.location
2282
2283 if location is not None:
2284 extra_params["location"] = location
2285
2286 path = "/projects/{}/jobs/{}".format(project, job_id)
2287
2288 span_attributes = {"path": path, "job_id": job_id, "location": location}
2289
2290 resource = self._call_api(
2291 retry,
2292 span_name="BigQuery.getJob",
2293 span_attributes=span_attributes,
2294 method="GET",
2295 path=path,
2296 query_params=extra_params,
2297 timeout=timeout,
2298 )
2299
2300 return self.job_from_resource(resource)
2301
2302 def cancel_job(
2303 self,
2304 job_id: str,
2305 project: Optional[str] = None,
2306 location: Optional[str] = None,
2307 retry: retries.Retry = DEFAULT_RETRY,
2308 timeout: TimeoutType = DEFAULT_TIMEOUT,
2309 ) -> Union[job.LoadJob, job.CopyJob, job.ExtractJob, job.QueryJob]:
2310 """Attempt to cancel a job from a job ID.
2311
2312 See
2313 https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/cancel
2314
2315 Args:
2316 job_id (Union[ \
2317 str, \
2318 google.cloud.bigquery.job.LoadJob, \
2319 google.cloud.bigquery.job.CopyJob, \
2320 google.cloud.bigquery.job.ExtractJob, \
2321 google.cloud.bigquery.job.QueryJob \
2322 ]): Job identifier.
2323 project (Optional[str]):
2324 ID of the project which owns the job (defaults to the client's project).
2325 location (Optional[str]):
2326 Location where the job was run. Ignored if ``job_id`` is a job
2327 object.
2328 retry (Optional[google.api_core.retry.Retry]):
2329 How to retry the RPC.
2330 timeout (Optional[float]):
2331 The number of seconds to wait for the underlying HTTP transport
2332 before using ``retry``.
2333
2334 Returns:
2335 Union[ \
2336 google.cloud.bigquery.job.LoadJob, \
2337 google.cloud.bigquery.job.CopyJob, \
2338 google.cloud.bigquery.job.ExtractJob, \
2339 google.cloud.bigquery.job.QueryJob, \
2340 ]:
2341 Job instance, based on the resource returned by the API.
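
Example (a minimal usage sketch; the job ID and location are placeholders
and an authenticated client is assumed)::

    from google.cloud import bigquery

    client = bigquery.Client()
    # "my-job-id" and "US" are placeholders for a running job.
    job = client.cancel_job("my-job-id", location="US")
    print(job.state)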
2342 """
2343 extra_params = {"projection": "full"}
2344
2345 project, location, job_id = _extract_job_reference(
2346 job_id, project=project, location=location
2347 )
2348
2349 if project is None:
2350 project = self.project
2351
2352 if location is None:
2353 location = self.location
2354
2355 if location is not None:
2356 extra_params["location"] = location
2357
2358 path = "/projects/{}/jobs/{}/cancel".format(project, job_id)
2359
2360 span_attributes = {"path": path, "job_id": job_id, "location": location}
2361
2362 resource = self._call_api(
2363 retry,
2364 span_name="BigQuery.cancelJob",
2365 span_attributes=span_attributes,
2366 method="POST",
2367 path=path,
2368 query_params=extra_params,
2369 timeout=timeout,
2370 )
2371
2372 job_instance = self.job_from_resource(resource["job"]) # never an UnknownJob
2373
2374 return typing.cast(
2375 Union[job.LoadJob, job.CopyJob, job.ExtractJob, job.QueryJob],
2376 job_instance,
2377 )
2378
2379 def list_jobs(
2380 self,
2381 project: Optional[str] = None,
2382 parent_job: Optional[Union[QueryJob, str]] = None,
2383 max_results: Optional[int] = None,
2384 page_token: Optional[str] = None,
2385 all_users: Optional[bool] = None,
2386 state_filter: Optional[str] = None,
2387 retry: retries.Retry = DEFAULT_RETRY,
2388 timeout: TimeoutType = DEFAULT_TIMEOUT,
2389 min_creation_time: Optional[datetime.datetime] = None,
2390 max_creation_time: Optional[datetime.datetime] = None,
2391 page_size: Optional[int] = None,
2392 ) -> page_iterator.Iterator:
2393 """List jobs for the project associated with this client.
2394
2395 See
2396 https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/list
2397
2398 Args:
2399 project (Optional[str]):
2400 Project ID to use for retrieving jobs. Defaults
2401 to the client's project.
2402 parent_job (Optional[Union[ \
2403 google.cloud.bigquery.job._AsyncJob, \
2404 str, \
2405 ]]):
2406 If set, retrieve only child jobs of the specified parent.
2407 max_results (Optional[int]):
2408 Maximum number of jobs to return.
2409 page_token (Optional[str]):
2410 Opaque marker for the next "page" of jobs. If not
2411 passed, the API will return the first page of jobs. The token
2412 marks the beginning of the iterator to be returned and the
2413 value of the ``page_token`` can be accessed at
2414 ``next_page_token`` of
2415 :class:`~google.api_core.page_iterator.HTTPIterator`.
2416 all_users (Optional[bool]):
2417 If true, include jobs owned by all users in the project.
2418 Defaults to :data:`False`.
2419 state_filter (Optional[str]):
2420 If set, include only jobs matching the given state. One of:
2421 * ``"done"``
2422 * ``"pending"``
2423 * ``"running"``
2424 retry (Optional[google.api_core.retry.Retry]):
2425 How to retry the RPC.
2426 timeout (Optional[float]):
2427 The number of seconds to wait for the underlying HTTP transport
2428 before using ``retry``.
2429 min_creation_time (Optional[datetime.datetime]):
2430 Min value for job creation time. If set, only jobs created
2431 after or at this timestamp are returned. If the datetime has
2432 no time zone, UTC is assumed.
2433 max_creation_time (Optional[datetime.datetime]):
2434 Max value for job creation time. If set, only jobs created
2435 before or at this timestamp are returned. If the datetime has
2436 no time zone, UTC is assumed.
2437 page_size (Optional[int]):
2438 Maximum number of jobs to return per page.
2439
2440 Returns:
2441 google.api_core.page_iterator.Iterator:
2442 Iterable of job instances.
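
Example (a minimal usage sketch; assumes an authenticated client)::

    from google.cloud import bigquery

    client = bigquery.Client()
    # List the ten most recently created jobs that have finished.
    for job in client.list_jobs(max_results=10, state_filter="done"):
        print(job.job_id, job.created)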
2443 """
2444 if isinstance(parent_job, job._AsyncJob):
2445 parent_job = parent_job.job_id # pytype: disable=attribute-error
2446
2447 extra_params = {
2448 "allUsers": all_users,
2449 "stateFilter": state_filter,
2450 "minCreationTime": _str_or_none(
2451 google.cloud._helpers._millis_from_datetime(min_creation_time)
2452 ),
2453 "maxCreationTime": _str_or_none(
2454 google.cloud._helpers._millis_from_datetime(max_creation_time)
2455 ),
2456 "projection": "full",
2457 "parentJobId": parent_job,
2458 }
2459
2460 extra_params = {
2461 param: value for param, value in extra_params.items() if value is not None
2462 }
2463
2464 if project is None:
2465 project = self.project
2466
2467 path = "/projects/%s/jobs" % (project,)
2468
2469 span_attributes = {"path": path}
2470
2471 def api_request(*args, **kwargs):
2472 return self._call_api(
2473 retry,
2474 span_name="BigQuery.listJobs",
2475 span_attributes=span_attributes,
2476 *args,
2477 timeout=timeout,
2478 **kwargs,
2479 )
2480
2481 return page_iterator.HTTPIterator(
2482 client=self,
2483 api_request=api_request,
2484 path=path,
2485 item_to_value=_item_to_job,
2486 items_key="jobs",
2487 page_token=page_token,
2488 max_results=max_results,
2489 extra_params=extra_params,
2490 page_size=page_size,
2491 )
2492
2493 def load_table_from_uri(
2494 self,
2495 source_uris: Union[str, Sequence[str]],
2496 destination: Union[Table, TableReference, TableListItem, str],
2497 job_id: Optional[str] = None,
2498 job_id_prefix: Optional[str] = None,
2499 location: Optional[str] = None,
2500 project: Optional[str] = None,
2501 job_config: Optional[LoadJobConfig] = None,
2502 retry: retries.Retry = DEFAULT_RETRY,
2503 timeout: TimeoutType = DEFAULT_TIMEOUT,
2504 ) -> job.LoadJob:
2505 """Starts a job for loading data into a table from Cloud Storage.
2506
2507 See
2508 https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#jobconfigurationload
2509
2510 Args:
2511 source_uris (Union[str, Sequence[str]]):
2512 URIs of data files to be loaded; in format
2513 ``gs://<bucket_name>/<object_name_or_glob>``.
2514 destination (Union[ \
2515 google.cloud.bigquery.table.Table, \
2516 google.cloud.bigquery.table.TableReference, \
2517 google.cloud.bigquery.table.TableListItem, \
2518 str, \
2519 ]):
2520 Table into which data is to be loaded. If a string is passed
2521 in, this method attempts to create a table reference from a
2522 string using
2523 :func:`google.cloud.bigquery.table.TableReference.from_string`.
2524 job_id (Optional[str]): Name of the job.
2525 job_id_prefix (Optional[str]):
2526 The user-provided prefix for a randomly generated job ID.
2527 This parameter will be ignored if a ``job_id`` is also given.
2528 location (Optional[str]):
2529 Location where to run the job. Must match the location of the
2530 destination table.
2531 project (Optional[str]):
2532 ID of the project where the job runs. Defaults
2533 to the client's project.
2534 job_config (Optional[google.cloud.bigquery.job.LoadJobConfig]):
2535 Extra configuration options for the job.
2536 retry (Optional[google.api_core.retry.Retry]):
2537 How to retry the RPC.
2538 timeout (Optional[float]):
2539 The number of seconds to wait for the underlying HTTP transport
2540 before using ``retry``.
2541
2542 Returns:
2543 google.cloud.bigquery.job.LoadJob: A new load job.
2544
2545 Raises:
2546 TypeError:
2547 If ``job_config`` is not an instance of
2548 :class:`~google.cloud.bigquery.job.LoadJobConfig` class.
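
Example (a minimal usage sketch; the bucket, object, and table IDs are
placeholders and an authenticated client is assumed)::

    from google.cloud import bigquery

    client = bigquery.Client()
    job_config = bigquery.LoadJobConfig(
        source_format=bigquery.SourceFormat.CSV, autodetect=True
    )
    # "gs://my-bucket/data.csv" and "my_dataset.my_table" are placeholders.
    load_job = client.load_table_from_uri(
        "gs://my-bucket/data.csv",
        "my_dataset.my_table",
        job_config=job_config,
    )
    load_job.result()  # Wait for the load to complete.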
2549 """
2550 job_id = _make_job_id(job_id, job_id_prefix)
2551
2552 if project is None:
2553 project = self.project
2554
2555 if location is None:
2556 location = self.location
2557
2558 job_ref = job._JobReference(job_id, project=project, location=location)
2559
2560 if isinstance(source_uris, str):
2561 source_uris = [source_uris]
2562
2563 destination = _table_arg_to_table_ref(destination, default_project=self.project)
2564
2565 if job_config is not None:
2566 _verify_job_config_type(job_config, LoadJobConfig)
2567 else:
2568 job_config = job.LoadJobConfig()
2569
2570 new_job_config = job_config._fill_from_default(self._default_load_job_config)
2571
2572 load_job = job.LoadJob(job_ref, source_uris, destination, self, new_job_config)
2573 load_job._begin(retry=retry, timeout=timeout)
2574
2575 return load_job
2576
2577 def load_table_from_file(
2578 self,
2579 file_obj: IO[bytes],
2580 destination: Union[Table, TableReference, TableListItem, str],
2581 rewind: bool = False,
2582 size: Optional[int] = None,
2583 num_retries: int = _DEFAULT_NUM_RETRIES,
2584 job_id: Optional[str] = None,
2585 job_id_prefix: Optional[str] = None,
2586 location: Optional[str] = None,
2587 project: Optional[str] = None,
2588 job_config: Optional[LoadJobConfig] = None,
2589 timeout: ResumableTimeoutType = DEFAULT_TIMEOUT,
2590 ) -> job.LoadJob:
2591 """Upload the contents of this table from a file-like object.
2592
2593 Similar to :meth:`load_table_from_uri`, this method creates, starts and
2594 returns a :class:`~google.cloud.bigquery.job.LoadJob`.
2595
2596 Args:
2597 file_obj (IO[bytes]):
2598 A file handle opened in binary mode for reading.
2599 destination (Union[Table, \
2600 TableReference, \
2601 TableListItem, \
2602 str \
2603 ]):
2604 Table into which data is to be loaded. If a string is passed
2605 in, this method attempts to create a table reference from a
2606 string using
2607 :func:`google.cloud.bigquery.table.TableReference.from_string`.
2608 rewind (Optional[bool]):
2609 If True, seek to the beginning of the file handle before
2610 reading the file. Defaults to False.
2611 size (Optional[int]):
2612 The number of bytes to read from the file handle. If size is
2613 ``None`` or large, resumable upload will be used. Otherwise,
2614 multipart upload will be used.
2615 num_retries (Optional[int]): Number of upload retries. Defaults to 6.
2616 job_id (Optional[str]): Name of the job.
2617 job_id_prefix (Optional[str]):
2618 The user-provided prefix for a randomly generated job ID.
2619 This parameter will be ignored if a ``job_id`` is also given.
2620 location (Optional[str]):
2621 Location where to run the job. Must match the location of the
2622 destination table.
2623 project (Optional[str]):
2624 ID of the project where the job runs. Defaults
2625 to the client's project.
2626 job_config (Optional[LoadJobConfig]):
2627 Extra configuration options for the job.
2628 timeout (Optional[float]):
2629 The number of seconds to wait for the underlying HTTP transport
2630 before using ``retry``. Depending on the retry strategy, a request
2631 may be repeated several times using the same timeout each time.
2632 Defaults to None.
2633
2634 Can also be passed as a tuple (connect_timeout, read_timeout).
2635 See :meth:`requests.Session.request` documentation for details.
2636
2637 Returns:
2638 google.cloud.bigquery.job.LoadJob: A new load job.
2639
2640 Raises:
2641 ValueError:
2642 If ``size`` is not passed in and can not be determined, or if
2643 the ``file_obj`` can be detected to be a file opened in text
2644 mode.
2645
2646 TypeError:
2647 If ``job_config`` is not an instance of
2648 :class:`~google.cloud.bigquery.job.LoadJobConfig` class.
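
Example (a minimal usage sketch; the file path and table ID are
placeholders and an authenticated client is assumed)::

    from google.cloud import bigquery

    client = bigquery.Client()
    job_config = bigquery.LoadJobConfig(
        source_format=bigquery.SourceFormat.CSV, autodetect=True
    )
    # "data.csv" and "my_dataset.my_table" are placeholders.
    with open("data.csv", "rb") as source_file:
        load_job = client.load_table_from_file(
            source_file, "my_dataset.my_table", job_config=job_config
        )
    load_job.result()  # Wait for the load to complete.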
2649 """
2650 job_id = _make_job_id(job_id, job_id_prefix)
2651
2652 if project is None:
2653 project = self.project
2654
2655 if location is None:
2656 location = self.location
2657
2658 destination = _table_arg_to_table_ref(destination, default_project=self.project)
2659 job_ref = job._JobReference(job_id, project=project, location=location)
2660
2661 if job_config is not None:
2662 _verify_job_config_type(job_config, LoadJobConfig)
2663 else:
2664 job_config = job.LoadJobConfig()
2665
2666 new_job_config = job_config._fill_from_default(self._default_load_job_config)
2667
2668 load_job = job.LoadJob(job_ref, None, destination, self, new_job_config)
2669 job_resource = load_job.to_api_repr()
2670
2671 if rewind:
2672 file_obj.seek(0, os.SEEK_SET)
2673
2674 _check_mode(file_obj)
2675
2676 try:
2677 if size is None or size >= _MAX_MULTIPART_SIZE:
2678 response = self._do_resumable_upload(
2679 file_obj, job_resource, num_retries, timeout, project=project
2680 )
2681 else:
2682 response = self._do_multipart_upload(
2683 file_obj, job_resource, size, num_retries, timeout, project=project
2684 )
2685 except resumable_media.InvalidResponse as exc:
2686 raise exceptions.from_http_response(exc.response)
2687
2688 return typing.cast(LoadJob, self.job_from_resource(response.json()))
2689
2690 def load_table_from_dataframe(
2691 self,
2692 dataframe: "pandas.DataFrame", # type: ignore
2693 destination: Union[Table, TableReference, str],
2694 num_retries: int = _DEFAULT_NUM_RETRIES,
2695 job_id: Optional[str] = None,
2696 job_id_prefix: Optional[str] = None,
2697 location: Optional[str] = None,
2698 project: Optional[str] = None,
2699 job_config: Optional[LoadJobConfig] = None,
2700 parquet_compression: str = "snappy",
2701 timeout: ResumableTimeoutType = DEFAULT_TIMEOUT,
2702 ) -> job.LoadJob:
2703 """Upload the contents of a table from a pandas DataFrame.
2704
2705 Similar to :meth:`load_table_from_uri`, this method creates, starts and
2706 returns a :class:`~google.cloud.bigquery.job.LoadJob`.
2707
2708 .. note::
2709
2710 REPEATED fields are NOT supported when using the CSV source format.
2711 They are supported when using the PARQUET source format, but
2712 due to the way they are encoded in the ``parquet`` file,
2713 a mismatch with the existing table schema can occur, so
2714 REPEATED fields are not properly supported when using ``pyarrow<4.0.0``
2715 with the parquet format.
2716
2717 https://github.com/googleapis/python-bigquery/issues/19
2718
2719 Args:
2720 dataframe (pandas.DataFrame):
2721 A :class:`~pandas.DataFrame` containing the data to load.
2722 destination (Union[ \
2723 Table, \
2724 TableReference, \
2725 str \
2726 ]):
2727 The destination table to use for loading the data. If it is an
2728 existing table, the schema of the :class:`~pandas.DataFrame`
2729 must match the schema of the destination table. If the table
2730 does not yet exist, the schema is inferred from the
2731 :class:`~pandas.DataFrame`.
2732
2733 If a string is passed in, this method attempts to create a
2734 table reference from a string using
2735 :func:`google.cloud.bigquery.table.TableReference.from_string`.
2736 num_retries (Optional[int]): Number of upload retries. Defaults to 6.
2737 job_id (Optional[str]): Name of the job.
2738 job_id_prefix (Optional[str]):
2739 The user-provided prefix for a randomly generated
2740 job ID. This parameter will be ignored if a ``job_id`` is
2741 also given.
2742 location (Optional[str]):
2743 Location where to run the job. Must match the location of the
2744 destination table.
2745 project (Optional[str]):
2746 ID of the project where the job runs. Defaults
2747 to the client's project.
2748 job_config (Optional[LoadJobConfig]):
2749 Extra configuration options for the job.
2750
2751 To override the default pandas data type conversions, supply
2752 a value for
2753 :attr:`~google.cloud.bigquery.job.LoadJobConfig.schema` with
2754 column names matching those of the dataframe. The BigQuery
2755 schema is used to determine the correct data type conversion.
2756 Indexes are not loaded.
2757
2758 By default, this method uses the parquet source format. To
2759 override this, supply a value for
2760 :attr:`~google.cloud.bigquery.job.LoadJobConfig.source_format`
2761 with the format name. Currently only
2762 :attr:`~google.cloud.bigquery.job.SourceFormat.CSV` and
2763 :attr:`~google.cloud.bigquery.job.SourceFormat.PARQUET` are
2764 supported.
2765 parquet_compression (Optional[str]):
2766 [Beta] The compression method to use when serializing
2767 ``dataframe`` to a temporary parquet file.
2768 Defaults to "snappy".
2769
2770 The argument is directly passed as the ``compression``
2771 argument to the underlying ``pyarrow.parquet.write_table()``
2772 method (the default value "snappy" gets converted to uppercase).
2773 https://arrow.apache.org/docs/python/generated/pyarrow.parquet.write_table.html#pyarrow-parquet-write-table
2774
2775 If the job config schema is missing, the argument is directly
2776 passed as the ``compression`` argument to the underlying
2777 ``DataFrame.to_parquet()`` method.
2778 https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.to_parquet.html#pandas.DataFrame.to_parquet
2779 timeout (Optional[float]):
2780 The number of seconds to wait for the underlying HTTP transport
2781 before using ``retry``. Depending on the retry strategy, a request may
2782 be repeated several times using the same timeout each time.
2783 Defaults to None.
2784
2785 Can also be passed as a tuple (connect_timeout, read_timeout).
2786 See :meth:`requests.Session.request` documentation for details.
2787
2788 Returns:
2789 google.cloud.bigquery.job.LoadJob: A new load job.
2790
2791 Raises:
2792 ValueError:
2793 If a usable parquet engine cannot be found. This method
2794 requires :mod:`pyarrow` to be installed.
2795 TypeError:
2796 If ``job_config`` is not an instance of
2797 :class:`~google.cloud.bigquery.job.LoadJobConfig` class.
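
Example (a minimal usage sketch; requires ``pandas`` and ``pyarrow`` to be
installed, and the table ID is a placeholder)::

    import pandas
    from google.cloud import bigquery

    client = bigquery.Client()
    dataframe = pandas.DataFrame({"name": ["Alice", "Bob"], "age": [30, 25]})
    # "my_dataset.my_table" is a placeholder table ID.
    load_job = client.load_table_from_dataframe(
        dataframe, "my_dataset.my_table"
    )
    load_job.result()  # Wait for the load to complete.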
2798 """
2799 job_id = _make_job_id(job_id, job_id_prefix)
2800
2801 if job_config is not None:
2802 _verify_job_config_type(job_config, LoadJobConfig)
2803 else:
2804 job_config = job.LoadJobConfig()
2805
2806 new_job_config = job_config._fill_from_default(self._default_load_job_config)
2807
2808 supported_formats = {job.SourceFormat.CSV, job.SourceFormat.PARQUET}
2809 if new_job_config.source_format is None:
2810 # default value
2811 new_job_config.source_format = job.SourceFormat.PARQUET
2812
2813 if (
2814 new_job_config.source_format == job.SourceFormat.PARQUET
2815 and new_job_config.parquet_options is None
2816 ):
2817 parquet_options = ParquetOptions()
2818 # default value
2819 parquet_options.enable_list_inference = True
2820 new_job_config.parquet_options = parquet_options
2821
2822 if new_job_config.source_format not in supported_formats:
2823 raise ValueError(
2824 "Got unexpected source_format: '{}'. Currently, only PARQUET and CSV are supported".format(
2825 new_job_config.source_format
2826 )
2827 )
2828
2829 if pyarrow is None and new_job_config.source_format == job.SourceFormat.PARQUET:
2830 # pyarrow is now the only supported parquet engine.
2831 raise ValueError("This method requires pyarrow to be installed")
2832
2833 if location is None:
2834 location = self.location
2835
2836 # If table schema is not provided, we try to fetch the existing table
2837 # schema, and check if dataframe schema is compatible with it - except
2838 # for WRITE_TRUNCATE jobs, the existing schema does not matter then.
2839 if (
2840 not new_job_config.schema
2841 and new_job_config.write_disposition != job.WriteDisposition.WRITE_TRUNCATE
2842 ):
2843 try:
2844 table = self.get_table(destination)
2845 except core_exceptions.NotFound:
2846 pass
2847 else:
2848 columns_and_indexes = frozenset(
2849 name
2850 for name, _ in _pandas_helpers.list_columns_and_indexes(dataframe)
2851 )
2852 new_job_config.schema = [
2853 # Field description and policy tags are not needed to
2854 # serialize a data frame.
2855 SchemaField(
2856 field.name,
2857 field.field_type,
2858 mode=field.mode,
2859 fields=field.fields,
2860 )
2861 # schema fields not present in the dataframe are not needed
2862 for field in table.schema
2863 if field.name in columns_and_indexes
2864 ]
2865
2866 new_job_config.schema = _pandas_helpers.dataframe_to_bq_schema(
2867 dataframe, new_job_config.schema
2868 )
2869
2870 if not new_job_config.schema:
2871 # the schema could not be fully detected
2872 warnings.warn(
2873 "Schema could not be detected for all columns. Loading from a "
2874 "dataframe without a schema will be deprecated in the future, "
2875 "please provide a schema.",
2876 PendingDeprecationWarning,
2877 stacklevel=2,
2878 )
2879
2880 tmpfd, tmppath = tempfile.mkstemp(
2881 suffix="_job_{}.{}".format(job_id[:8], new_job_config.source_format.lower())
2882 )
2883 os.close(tmpfd)
2884
2885 try:
2886 if new_job_config.source_format == job.SourceFormat.PARQUET:
2887 if new_job_config.schema:
2888 if parquet_compression == "snappy": # adjust the default value
2889 parquet_compression = parquet_compression.upper()
2890
2891 _pandas_helpers.dataframe_to_parquet(
2892 dataframe,
2893 new_job_config.schema,
2894 tmppath,
2895 parquet_compression=parquet_compression,
2896 parquet_use_compliant_nested_type=True,
2897 )
2898 else:
2899 dataframe.to_parquet(
2900 tmppath,
2901 engine="pyarrow",
2902 compression=parquet_compression,
2903 **(
2904 {"use_compliant_nested_type": True}
2905 if _versions_helpers.PYARROW_VERSIONS.use_compliant_nested_type
2906 else {}
2907 ),
2908 )
2909
2910 else:
2911 dataframe.to_csv(
2912 tmppath,
2913 index=False,
2914 header=False,
2915 encoding="utf-8",
2916 float_format="%.17g",
2917 date_format="%Y-%m-%d %H:%M:%S.%f",
2918 )
2919
2920 with open(tmppath, "rb") as tmpfile:
2921 file_size = os.path.getsize(tmppath)
2922 return self.load_table_from_file(
2923 tmpfile,
2924 destination,
2925 num_retries=num_retries,
2926 rewind=True,
2927 size=file_size,
2928 job_id=job_id,
2929 job_id_prefix=job_id_prefix,
2930 location=location,
2931 project=project,
2932 job_config=new_job_config,
2933 timeout=timeout,
2934 )
2935
2936 finally:
2937 os.remove(tmppath)
2938
2939 def load_table_from_json(
2940 self,
2941 json_rows: Iterable[Dict[str, Any]],
2942 destination: Union[Table, TableReference, TableListItem, str],
2943 num_retries: int = _DEFAULT_NUM_RETRIES,
2944 job_id: Optional[str] = None,
2945 job_id_prefix: Optional[str] = None,
2946 location: Optional[str] = None,
2947 project: Optional[str] = None,
2948 job_config: Optional[LoadJobConfig] = None,
2949 timeout: ResumableTimeoutType = DEFAULT_TIMEOUT,
2950 ) -> job.LoadJob:
2951 """Upload the contents of a table from a JSON string or dict.
2952
2953 Args:
2954 json_rows (Iterable[Dict[str, Any]]):
2955 Row data to be inserted. Keys must match the table schema fields
2956 and values must be JSON-compatible representations.
2957
2958 .. note::
2959
2960 If your data is already a newline-delimited JSON string,
2961 it is best to wrap it into a file-like object and pass it
2962 to :meth:`~google.cloud.bigquery.client.Client.load_table_from_file`::
2963
2964 import io
2965 from google.cloud import bigquery
2966
2967 data = u'{"foo": "bar"}'
2968 data_as_file = io.StringIO(data)
2969
2970 client = bigquery.Client()
2971 client.load_table_from_file(data_as_file, ...)
2972
2973 destination (Union[ \
2974 Table, \
2975 TableReference, \
2976 TableListItem, \
2977 str \
2978 ]):
2979 Table into which data is to be loaded. If a string is passed
2980 in, this method attempts to create a table reference from a
2981 string using
2982 :func:`google.cloud.bigquery.table.TableReference.from_string`.
2983 num_retries (Optional[int]): Number of upload retries. Defaults to 6.
2984 job_id (Optional[str]): Name of the job.
2985 job_id_prefix (Optional[str]):
2986 The user-provided prefix for a randomly generated job ID.
2987 This parameter will be ignored if a ``job_id`` is also given.
2988 location (Optional[str]):
2989 Location where to run the job. Must match the location of the
2990 destination table.
2991 project (Optional[str]):
2992 ID of the project where the job runs. Defaults
2993 to the client's project.
2994 job_config (Optional[LoadJobConfig]):
2995 Extra configuration options for the job. The ``source_format``
2996 setting is always set to
2997 :attr:`~google.cloud.bigquery.job.SourceFormat.NEWLINE_DELIMITED_JSON`.
2998 timeout (Optional[float]):
2999 The number of seconds to wait for the underlying HTTP transport
3000 before using ``retry``. Depending on the retry strategy, a request may
3001 be repeated several times using the same timeout each time.
3002 Defaults to None.
3003
3004 Can also be passed as a tuple (connect_timeout, read_timeout).
3005 See :meth:`requests.Session.request` documentation for details.
3006
3007 Returns:
3008 google.cloud.bigquery.job.LoadJob: A new load job.
3009
3010 Raises:
3011 TypeError:
3012 If ``job_config`` is not an instance of
3013 :class:`~google.cloud.bigquery.job.LoadJobConfig` class.
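
Example (a minimal usage sketch; the table ID is a placeholder and an
authenticated client is assumed)::

    from google.cloud import bigquery

    client = bigquery.Client()
    rows = [{"name": "Alice", "age": 30}, {"name": "Bob", "age": 25}]
    # "my_dataset.my_table" is a placeholder table ID.
    load_job = client.load_table_from_json(rows, "my_dataset.my_table")
    load_job.result()  # Wait for the load to complete.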
3014 """
3015 job_id = _make_job_id(job_id, job_id_prefix)
3016
3017 if job_config is not None:
3018 _verify_job_config_type(job_config, LoadJobConfig)
3019 else:
3020 job_config = job.LoadJobConfig()
3021
3022 new_job_config = job_config._fill_from_default(self._default_load_job_config)
3023
3024 new_job_config.source_format = job.SourceFormat.NEWLINE_DELIMITED_JSON
3025
3026 # In specific conditions, we check if the table already exists, and/or
3027 # set the autodetect value for the user. For the exact conditions, see the table at
3028 # https://github.com/googleapis/python-bigquery/issues/1228#issuecomment-1910946297
3029 if new_job_config.schema is None and new_job_config.autodetect is None:
3030 if new_job_config.write_disposition in (
3031 job.WriteDisposition.WRITE_TRUNCATE,
3032 job.WriteDisposition.WRITE_EMPTY,
3033 ):
3034 new_job_config.autodetect = True
3035 else:
3036 try:
3037 self.get_table(destination)
3038 except core_exceptions.NotFound:
3039 new_job_config.autodetect = True
3040 else:
3041 new_job_config.autodetect = False
3042
3043 if project is None:
3044 project = self.project
3045
3046 if location is None:
3047 location = self.location
3048
3049 destination = _table_arg_to_table_ref(destination, default_project=self.project)
3050
3051 data_str = "\n".join(json.dumps(item, ensure_ascii=False) for item in json_rows)
3052 encoded_str = data_str.encode()
3053 data_file = io.BytesIO(encoded_str)
3054 return self.load_table_from_file(
3055 data_file,
3056 destination,
3057 size=len(encoded_str),
3058 num_retries=num_retries,
3059 job_id=job_id,
3060 job_id_prefix=job_id_prefix,
3061 location=location,
3062 project=project,
3063 job_config=new_job_config,
3064 timeout=timeout,
3065 )
3066
3067 def _do_resumable_upload(
3068 self,
3069 stream: IO[bytes],
3070 metadata: Mapping[str, str],
3071 num_retries: int,
3072 timeout: Optional[ResumableTimeoutType],
3073 project: Optional[str] = None,
3074 ) -> "requests.Response":
3075 """Perform a resumable upload.
3076
3077 Args:
3078 stream (IO[bytes]): A bytes IO object open for reading.
3079 metadata (Mapping[str, str]): The metadata associated with the upload.
3080 num_retries (int):
3081 Number of upload retries. (Deprecated: This
3082 argument will be removed in a future release.)
3083 timeout (Optional[float]):
3084 The number of seconds to wait for the underlying HTTP transport
3085 before using ``retry``. Depending on the retry strategy, a request may
3086 be repeated several times using the same timeout each time.
3087
3088 Can also be passed as a tuple (connect_timeout, read_timeout).
3089 See :meth:`requests.Session.request` documentation for details.
3090 project (Optional[str]):
3091 ID of the project where the upload runs. Defaults
3092 to the client's project.
3093
3094 Returns:
3095 The "200 OK" response object returned after the final chunk
3096 is uploaded.
3097 """
3098 upload, transport = self._initiate_resumable_upload(
3099 stream, metadata, num_retries, timeout, project=project
3100 )
3101
3102 while not upload.finished:
3103 response = upload.transmit_next_chunk(transport, timeout=timeout)
3104
3105 return response
3106
3107 def _initiate_resumable_upload(
3108 self,
3109 stream: IO[bytes],
3110 metadata: Mapping[str, str],
3111 num_retries: int,
3112 timeout: Optional[ResumableTimeoutType],
3113 project: Optional[str] = None,
3114 ):
3115 """Initiate a resumable upload.
3116
3117 Args:
3118 stream (IO[bytes]): A bytes IO object open for reading.
3119 metadata (Mapping[str, str]): The metadata associated with the upload.
3120 num_retries (int):
3121 Number of upload retries. (Deprecated: This
3122 argument will be removed in a future release.)
3123 timeout (Optional[float]):
3124 The number of seconds to wait for the underlying HTTP transport
3125 before using ``retry``. Depending on the retry strategy, a request may
3126 be repeated several times using the same timeout each time.
3127
3128 Can also be passed as a tuple (connect_timeout, read_timeout).
3129 See :meth:`requests.Session.request` documentation for details.
3130 project (Optional[str]):
3131 ID of the project where the upload runs. Defaults
3132 to the client's project.
3133
3134 Returns:
3135 Tuple:
3136 Pair of
3137
3138 * The :class:`~google.resumable_media.requests.ResumableUpload`
3139 that was created
3140 * The ``transport`` used to initiate the upload.
3141 """
3142 chunk_size = _DEFAULT_CHUNKSIZE
3143 transport = self._http
3144 headers = _get_upload_headers(self._connection.user_agent)
3145
3146 if project is None:
3147 project = self.project
3148 # TODO: Increase the minimum version of google-cloud-core to 1.6.0
3149 # and remove this logic. See:
3150 # https://github.com/googleapis/python-bigquery/issues/509
3151 hostname = (
3152 self._connection.API_BASE_URL
3153 if not hasattr(self._connection, "get_api_base_url_for_mtls")
3154 else self._connection.get_api_base_url_for_mtls()
3155 )
3156 upload_url = _RESUMABLE_URL_TEMPLATE.format(host=hostname, project=project)
3157
3158 # TODO: modify ResumableUpload to take a retry.Retry object
3159 # that it can use for the initial RPC.
3160 upload = ResumableUpload(upload_url, chunk_size, headers=headers)
3161
3162 if num_retries is not None:
3163 upload._retry_strategy = resumable_media.RetryStrategy(
3164 max_retries=num_retries
3165 )
3166
3167 upload.initiate(
3168 transport,
3169 stream,
3170 metadata,
3171 _GENERIC_CONTENT_TYPE,
3172 stream_final=False,
3173 timeout=timeout,
3174 )
3175
3176 return upload, transport
3177
3178 def _do_multipart_upload(
3179 self,
3180 stream: IO[bytes],
3181 metadata: Mapping[str, str],
3182 size: int,
3183 num_retries: int,
3184 timeout: Optional[ResumableTimeoutType],
3185 project: Optional[str] = None,
3186 ):
3187 """Perform a multipart upload.
3188
3189 Args:
3190 stream (IO[bytes]): A bytes IO object open for reading.
3191 metadata (Mapping[str, str]): The metadata associated with the upload.
3192 size (int):
3193 The number of bytes to be uploaded (which will be read
3194 from ``stream``). If not provided, the upload will be
3195 concluded once ``stream`` is exhausted (or :data:`None`).
3196 num_retries (int):
3197 Number of upload retries. (Deprecated: This
3198 argument will be removed in a future release.)
3199 timeout (Optional[float]):
3200 The number of seconds to wait for the underlying HTTP transport
3201 before using ``retry``. Depending on the retry strategy, a request may
3202 be repeated several times using the same timeout each time.
3203
3204 Can also be passed as a tuple (connect_timeout, read_timeout).
3205 See :meth:`requests.Session.request` documentation for details.
3206 project (Optional[str]):
3207 ID of the project where the upload runs. Defaults
3208 to the client's project.
3209
3210 Returns:
3211 requests.Response:
3212 The "200 OK" response object returned after the multipart
3213 upload request.
3214
3215 Raises:
3216 ValueError:
3217 if the ``stream`` has fewer than ``size``
3218 bytes remaining.
3219 """
3220 data = stream.read(size)
3221 if len(data) < size:
3222 msg = _READ_LESS_THAN_SIZE.format(size, len(data))
3223 raise ValueError(msg)
3224
3225 headers = _get_upload_headers(self._connection.user_agent)
3226
3227 if project is None:
3228 project = self.project
3229
3230 # TODO: Increase the minimum version of google-cloud-core to 1.6.0
3231 # and remove this logic. See:
3232 # https://github.com/googleapis/python-bigquery/issues/509
3233 hostname = (
3234 self._connection.API_BASE_URL
3235 if not hasattr(self._connection, "get_api_base_url_for_mtls")
3236 else self._connection.get_api_base_url_for_mtls()
3237 )
3238 upload_url = _MULTIPART_URL_TEMPLATE.format(host=hostname, project=project)
3239 upload = MultipartUpload(upload_url, headers=headers)
3240
3241 if num_retries is not None:
3242 upload._retry_strategy = resumable_media.RetryStrategy(
3243 max_retries=num_retries
3244 )
3245
3246 response = upload.transmit(
3247 self._http, data, metadata, _GENERIC_CONTENT_TYPE, timeout=timeout
3248 )
3249
3250 return response
3251
3252 def copy_table(
3253 self,
3254 sources: Union[
3255 Table,
3256 TableReference,
3257 TableListItem,
3258 str,
3259 Sequence[Union[Table, TableReference, TableListItem, str]],
3260 ],
3261 destination: Union[Table, TableReference, TableListItem, str],
3262 job_id: Optional[str] = None,
3263 job_id_prefix: Optional[str] = None,
3264 location: Optional[str] = None,
3265 project: Optional[str] = None,
3266 job_config: Optional[CopyJobConfig] = None,
3267 retry: retries.Retry = DEFAULT_RETRY,
3268 timeout: TimeoutType = DEFAULT_TIMEOUT,
3269 ) -> job.CopyJob:
3270 """Copy one or more tables to another table.
3271
3272 See
3273 https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#jobconfigurationtablecopy
3274
3275 Args:
3276 sources (Union[ \
3277 google.cloud.bigquery.table.Table, \
3278 google.cloud.bigquery.table.TableReference, \
3279 google.cloud.bigquery.table.TableListItem, \
3280 str, \
3281 Sequence[ \
3282 Union[ \
3283 google.cloud.bigquery.table.Table, \
3284 google.cloud.bigquery.table.TableReference, \
3285 google.cloud.bigquery.table.TableListItem, \
3286 str, \
3287 ] \
3288 ], \
3289 ]):
3290 Table or tables to be copied.
3291 destination (Union[ \
3292 google.cloud.bigquery.table.Table, \
3293 google.cloud.bigquery.table.TableReference, \
3294 google.cloud.bigquery.table.TableListItem, \
3295 str, \
3296 ]):
3297 Table into which data is to be copied.
3298 job_id (Optional[str]): The ID of the job.
3299 job_id_prefix (Optional[str]):
3300 The user-provided prefix for a randomly generated job ID.
3301 This parameter will be ignored if a ``job_id`` is also given.
3302 location (Optional[str]):
3303 Location where to run the job. Must match the location of any
3304 source table as well as the destination table.
3305 project (Optional[str]):
3306 ID of the project where the job runs. Defaults
3307 to the client's project.
3308 job_config (Optional[google.cloud.bigquery.job.CopyJobConfig]):
3309 Extra configuration options for the job.
3310 retry (Optional[google.api_core.retry.Retry]):
3311 How to retry the RPC.
3312 timeout (Optional[float]):
3313 The number of seconds to wait for the underlying HTTP transport
3314 before using ``retry``.
3315
3316 Returns:
3317 google.cloud.bigquery.job.CopyJob: A new copy job instance.
3318
3319 Raises:
3320 TypeError:
3321 If ``job_config`` is not an instance of :class:`~google.cloud.bigquery.job.CopyJobConfig`
3322 class.
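
Example (a minimal usage sketch; the table IDs are placeholders and an
authenticated client is assumed)::

    from google.cloud import bigquery

    client = bigquery.Client()
    # Both table IDs are placeholders in the client's project.
    copy_job = client.copy_table(
        "my_dataset.source_table", "my_dataset.destination_table"
    )
    copy_job.result()  # Wait for the copy to complete.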
3323 """
3324 job_id = _make_job_id(job_id, job_id_prefix)
3325
3326 if project is None:
3327 project = self.project
3328
3329 if location is None:
3330 location = self.location
3331
3332 job_ref = job._JobReference(job_id, project=project, location=location)
3333
3334 # sources can be one of many different input types. (string, Table,
3335 # TableReference, or a sequence of any of those.) Convert them all to a
3336 # list of TableReferences.
3337 #
3338 # _table_arg_to_table_ref leaves lists unmodified.
3339 sources = _table_arg_to_table_ref(sources, default_project=self.project)
3340
3341 if not isinstance(sources, collections_abc.Sequence):
3342 sources = [sources]
3343
3344 sources = [
3345 _table_arg_to_table_ref(source, default_project=self.project)
3346 for source in sources
3347 ]
3348
3349 destination = _table_arg_to_table_ref(destination, default_project=self.project)
3350
3351 if job_config:
3352 _verify_job_config_type(job_config, google.cloud.bigquery.job.CopyJobConfig)
3353 job_config = copy.deepcopy(job_config)
3354
3355 copy_job = job.CopyJob(
3356 job_ref, sources, destination, client=self, job_config=job_config
3357 )
3358 copy_job._begin(retry=retry, timeout=timeout)
3359
3360 return copy_job
3361
3362 def extract_table(
3363 self,
3364 source: Union[Table, TableReference, TableListItem, Model, ModelReference, str],
3365 destination_uris: Union[str, Sequence[str]],
3366 job_id: Optional[str] = None,
3367 job_id_prefix: Optional[str] = None,
3368 location: Optional[str] = None,
3369 project: Optional[str] = None,
3370 job_config: Optional[ExtractJobConfig] = None,
3371 retry: retries.Retry = DEFAULT_RETRY,
3372 timeout: TimeoutType = DEFAULT_TIMEOUT,
3373 source_type: str = "Table",
3374 ) -> job.ExtractJob:
3375 """Start a job to extract a table into Cloud Storage files.
3376
3377 See
3378 https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#jobconfigurationextract
3379
3380 Args:
3381 source (Union[ \
3382 google.cloud.bigquery.table.Table, \
3383 google.cloud.bigquery.table.TableReference, \
3384 google.cloud.bigquery.table.TableListItem, \
3385 google.cloud.bigquery.model.Model, \
3386 google.cloud.bigquery.model.ModelReference, \
3387                 str, \
3388 ]):
3389 Table or Model to be extracted.
3390 destination_uris (Union[str, Sequence[str]]):
3391 URIs of Cloud Storage file(s) into which table data is to be
3392 extracted; in format
3393 ``gs://<bucket_name>/<object_name_or_glob>``.
3394 job_id (Optional[str]): The ID of the job.
3395 job_id_prefix (Optional[str]):
3396 The user-provided prefix for a randomly generated job ID.
3397 This parameter will be ignored if a ``job_id`` is also given.
3398 location (Optional[str]):
3399 Location where to run the job. Must match the location of the
3400 source table.
3401 project (Optional[str]):
3402                 Project ID of the project where to run the job. Defaults
3403 to the client's project.
3404 job_config (Optional[google.cloud.bigquery.job.ExtractJobConfig]):
3405 Extra configuration options for the job.
3406 retry (Optional[google.api_core.retry.Retry]):
3407 How to retry the RPC.
3408 timeout (Optional[float]):
3409 The number of seconds to wait for the underlying HTTP transport
3410 before using ``retry``.
3411 source_type (Optional[str]):
3412                 Type of source to be extracted: ``Table`` or ``Model``. Defaults to ``Table``.
3413 Returns:
3414 google.cloud.bigquery.job.ExtractJob: A new extract job instance.
3415
3416 Raises:
3417 TypeError:
3418                 If ``job_config`` is not an instance of
3419                 :class:`~google.cloud.bigquery.job.ExtractJobConfig`.
3420 ValueError:
3421                 If ``source_type`` is not one of ``Table`` or ``Model``.
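
        Examples:
            A minimal usage sketch; the table ID and bucket name are
            hypothetical placeholders. With no ``job_config``, the export
            format defaults to CSV::

                from google.cloud import bigquery

                client = bigquery.Client()
                extract_job = client.extract_table(
                    "my-project.my_dataset.my_table",
                    "gs://my-bucket/exports/my_table-*.csv",
                )
                extract_job.result()  # Wait for the export to complete.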
3422 """
3423 job_id = _make_job_id(job_id, job_id_prefix)
3424
3425 if project is None:
3426 project = self.project
3427
3428 if location is None:
3429 location = self.location
3430
3431 job_ref = job._JobReference(job_id, project=project, location=location)
3432 src = source_type.lower()
3433 if src == "table":
3434 source = _table_arg_to_table_ref(source, default_project=self.project)
3435 elif src == "model":
3436 source = _model_arg_to_model_ref(source, default_project=self.project)
3437 else:
3438 raise ValueError(
3439 "Cannot pass `{}` as a ``source_type``, pass Table or Model".format(
3440 source_type
3441 )
3442 )
3443
3444 if isinstance(destination_uris, str):
3445 destination_uris = [destination_uris]
3446
3447 if job_config:
3448 _verify_job_config_type(
3449 job_config, google.cloud.bigquery.job.ExtractJobConfig
3450 )
3451 job_config = copy.deepcopy(job_config)
3452
3453 extract_job = job.ExtractJob(
3454 job_ref, source, destination_uris, client=self, job_config=job_config
3455 )
3456 extract_job._begin(retry=retry, timeout=timeout)
3457
3458 return extract_job
3459
3460 def query(
3461 self,
3462 query: str,
3463 job_config: Optional[QueryJobConfig] = None,
3464 job_id: Optional[str] = None,
3465 job_id_prefix: Optional[str] = None,
3466 location: Optional[str] = None,
3467 project: Optional[str] = None,
3468 retry: retries.Retry = DEFAULT_RETRY,
3469 timeout: TimeoutType = DEFAULT_TIMEOUT,
3470 job_retry: Optional[retries.Retry] = DEFAULT_JOB_RETRY,
3471 api_method: Union[str, enums.QueryApiMethod] = enums.QueryApiMethod.INSERT,
3472 ) -> job.QueryJob:
3473 """Run a SQL query.
3474
3475 See
3476 https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#jobconfigurationquery
3477
3478 Args:
3479 query (str):
3480 SQL query to be executed. Defaults to the standard SQL
3481 dialect. Use the ``job_config`` parameter to change dialects.
3482 job_config (Optional[google.cloud.bigquery.job.QueryJobConfig]):
3483 Extra configuration options for the job.
3484 To override any options that were previously set in
3485 the ``default_query_job_config`` given to the
3486 ``Client`` constructor, manually set those options to ``None``,
3487 or whatever value is preferred.
3488 job_id (Optional[str]): ID to use for the query job.
3489 job_id_prefix (Optional[str]):
3490 The prefix to use for a randomly generated job ID. This parameter
3491 will be ignored if a ``job_id`` is also given.
3492 location (Optional[str]):
3493 Location where to run the job. Must match the location of the
3494 table used in the query as well as the destination table.
3495 project (Optional[str]):
3496                 Project ID of the project where to run the job. Defaults
3497 to the client's project.
3498 retry (Optional[google.api_core.retry.Retry]):
3499 How to retry the RPC. This only applies to making RPC
3500 calls. It isn't used to retry failed jobs. This has
3501 a reasonable default that should only be overridden
3502 with care.
3503 timeout (Optional[float]):
3504 The number of seconds to wait for the underlying HTTP transport
3505 before using ``retry``.
3506 job_retry (Optional[google.api_core.retry.Retry]):
3507 How to retry failed jobs. The default retries
3508 rate-limit-exceeded errors. Passing ``None`` disables
3509 job retry.
3510
3511 Not all jobs can be retried. If ``job_id`` is
3512 provided, then the job returned by the query will not
3513 be retryable, and an exception will be raised if a
3514 non-``None`` (and non-default) value for ``job_retry``
3515 is also provided.
3516
3517 Note that errors aren't detected until ``result()`` is
3518 called on the job returned. The ``job_retry``
3519 specified here becomes the default ``job_retry`` for
3520 ``result()``, where it can also be specified.
3521 api_method (Union[str, enums.QueryApiMethod]):
3522 Method with which to start the query job.
3523
3524 See :class:`google.cloud.bigquery.enums.QueryApiMethod` for
3525 details on the difference between the query start methods.
3526
3527 Returns:
3528 google.cloud.bigquery.job.QueryJob: A new query job instance.
3529
3530 Raises:
3531 TypeError:
3532                 If ``job_config`` is not an instance of
3533                 :class:`~google.cloud.bigquery.job.QueryJobConfig`,
3534                 or if both ``job_id`` and a non-``None``, non-default
3535                 ``job_retry`` are provided.
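
        Examples:
            A minimal usage sketch against a public dataset; ``result()``
            blocks until the query finishes::

                from google.cloud import bigquery

                client = bigquery.Client()
                query_job = client.query(
                    "SELECT name, SUM(number) AS total"
                    " FROM `bigquery-public-data.usa_names.usa_1910_2013`"
                    " GROUP BY name ORDER BY total DESC LIMIT 10"
                )
                for row in query_job.result():
                    print(row["name"], row["total"])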
3536 """
3537 _job_helpers.validate_job_retry(job_id, job_retry)
3538
3539 job_id_given = job_id is not None
3540 if job_id_given and api_method == enums.QueryApiMethod.QUERY:
3541 raise TypeError(
3542 "`job_id` was provided, but the 'QUERY' `api_method` was requested."
3543 )
3544
3545 if project is None:
3546 project = self.project
3547
3548 if location is None:
3549 location = self.location
3550
3551 if job_config is not None:
3552 _verify_job_config_type(job_config, QueryJobConfig)
3553
3554 job_config = _job_helpers.job_config_with_defaults(
3555 job_config, self._default_query_job_config
3556 )
3557
3558 # Note that we haven't modified the original job_config (or
3559 # _default_query_job_config) up to this point.
3560 if api_method == enums.QueryApiMethod.QUERY:
3561 return _job_helpers.query_jobs_query(
3562 self,
3563 query,
3564 job_config,
3565 location,
3566 project,
3567 retry,
3568 timeout,
3569 job_retry,
3570 )
3571 elif api_method == enums.QueryApiMethod.INSERT:
3572 return _job_helpers.query_jobs_insert(
3573 self,
3574 query,
3575 job_config,
3576 job_id,
3577 job_id_prefix,
3578 location,
3579 project,
3580 retry,
3581 timeout,
3582 job_retry,
3583 )
3584 else:
3585 raise ValueError(f"Got unexpected value for api_method: {repr(api_method)}")
3586
3587 def query_and_wait(
3588 self,
3589 query,
3590 *,
3591 job_config: Optional[QueryJobConfig] = None,
3592 location: Optional[str] = None,
3593 project: Optional[str] = None,
3594 api_timeout: TimeoutType = DEFAULT_TIMEOUT,
3595 wait_timeout: Union[Optional[float], object] = POLLING_DEFAULT_VALUE,
3596 retry: retries.Retry = DEFAULT_RETRY,
3597 job_retry: retries.Retry = DEFAULT_JOB_RETRY,
3598 page_size: Optional[int] = None,
3599 max_results: Optional[int] = None,
3600 ) -> RowIterator:
3601 """Run the query, wait for it to finish, and return the results.
3602
3603 Args:
3604 query (str):
3605 SQL query to be executed. Defaults to the standard SQL
3606 dialect. Use the ``job_config`` parameter to change dialects.
3607 job_config (Optional[google.cloud.bigquery.job.QueryJobConfig]):
3608 Extra configuration options for the job.
3609 To override any options that were previously set in
3610 the ``default_query_job_config`` given to the
3611 ``Client`` constructor, manually set those options to ``None``,
3612 or whatever value is preferred.
3613 location (Optional[str]):
3614 Location where to run the job. Must match the location of the
3615 table used in the query as well as the destination table.
3616 project (Optional[str]):
3617                 Project ID of the project where to run the job. Defaults
3618 to the client's project.
3619 api_timeout (Optional[float]):
3620 The number of seconds to wait for the underlying HTTP transport
3621 before using ``retry``.
3622 wait_timeout (Optional[Union[float, object]]):
3623 The number of seconds to wait for the query to finish. If the
3624 query doesn't finish before this timeout, the client attempts
3625 to cancel the query. If unset, the underlying REST API calls
3626 have timeouts, but we still wait indefinitely for the job to
3627 finish.
3628 retry (Optional[google.api_core.retry.Retry]):
3629 How to retry the RPC. This only applies to making RPC
3630 calls. It isn't used to retry failed jobs. This has
3631 a reasonable default that should only be overridden
3632 with care.
3633 job_retry (Optional[google.api_core.retry.Retry]):
3634 How to retry failed jobs. The default retries
3635 rate-limit-exceeded errors. Passing ``None`` disables
3636 job retry. Not all jobs can be retried.
3637 page_size (Optional[int]):
3638 The maximum number of rows in each page of results from the
3639 initial jobs.query request. Non-positive values are ignored.
3640 max_results (Optional[int]):
3641 The maximum total number of rows from this request.
3642
3643 Returns:
3644 google.cloud.bigquery.table.RowIterator:
3645 Iterator of row data
3646 :class:`~google.cloud.bigquery.table.Row`-s. During each
3647 page, the iterator will have the ``total_rows`` attribute
3648 set, which counts the total number of rows **in the result
3649 set** (this is distinct from the total number of rows in the
3650 current page: ``iterator.page.num_items``).
3651
3652 If the query is a special query that produces no results, e.g.
3653 a DDL query, an ``_EmptyRowIterator`` instance is returned.
3654
3655 Raises:
3656 TypeError:
3657                 If ``job_config`` is not an instance of
3658                 :class:`~google.cloud.bigquery.job.QueryJobConfig`.
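
        Examples:
            A minimal usage sketch against a public dataset; the returned
            iterator already contains the finished query's rows::

                from google.cloud import bigquery

                client = bigquery.Client()
                rows = client.query_and_wait(
                    "SELECT name FROM `bigquery-public-data.usa_names.usa_1910_2013`"
                    " LIMIT 5"
                )
                for row in rows:
                    print(row["name"])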
3660 """
3661 return self._query_and_wait_bigframes(
3662 query,
3663 job_config=job_config,
3664 location=location,
3665 project=project,
3666 api_timeout=api_timeout,
3667 wait_timeout=wait_timeout,
3668 retry=retry,
3669 job_retry=job_retry,
3670 page_size=page_size,
3671 max_results=max_results,
3672 )
3673
3674 def _query_and_wait_bigframes(
3675 self,
3676 query,
3677 *,
3678 job_config: Optional[QueryJobConfig] = None,
3679 location: Optional[str] = None,
3680 project: Optional[str] = None,
3681 api_timeout: TimeoutType = DEFAULT_TIMEOUT,
3682 wait_timeout: Union[Optional[float], object] = POLLING_DEFAULT_VALUE,
3683 retry: retries.Retry = DEFAULT_RETRY,
3684 job_retry: retries.Retry = DEFAULT_JOB_RETRY,
3685 page_size: Optional[int] = None,
3686 max_results: Optional[int] = None,
3687 callback: Callable = lambda _: None,
3688 ) -> RowIterator:
3689 """See query_and_wait.
3690
3691 This method has an extra callback parameter, which is used by bigframes
3692 to create better progress bars.
3693 """
3694 if project is None:
3695 project = self.project
3696
3697 if location is None:
3698 location = self.location
3699
3700 if job_config is not None:
3701 _verify_job_config_type(job_config, QueryJobConfig)
3702
3703 job_config = _job_helpers.job_config_with_defaults(
3704 job_config, self._default_query_job_config
3705 )
3706
3707 return _job_helpers.query_and_wait(
3708 self,
3709 query,
3710 job_config=job_config,
3711 location=location,
3712 project=project,
3713 api_timeout=api_timeout,
3714 wait_timeout=wait_timeout,
3715 retry=retry,
3716 job_retry=job_retry,
3717 page_size=page_size,
3718 max_results=max_results,
3719 callback=callback,
3720 )
3721
3722 def insert_rows(
3723 self,
3724 table: Union[Table, TableReference, str],
3725 rows: Union[Iterable[Tuple], Iterable[Mapping[str, Any]]],
3726 selected_fields: Optional[Sequence[SchemaField]] = None,
3727 **kwargs,
3728 ) -> Sequence[Dict[str, Any]]:
3729 """Insert rows into a table via the streaming API.
3730
3731 See
3732 https://cloud.google.com/bigquery/docs/reference/rest/v2/tabledata/insertAll
3733
3734 BigQuery will reject insertAll payloads that exceed a defined limit (10MB).
3735 Additionally, if a payload vastly exceeds this limit, the request is rejected
3736 by the intermediate architecture, which returns a 413 (Payload Too Large) status code.
3737
3738
3739 See
3740 https://cloud.google.com/bigquery/quotas#streaming_inserts
3741
3742 Args:
3743 table (Union[ \
3744 google.cloud.bigquery.table.Table, \
3745 google.cloud.bigquery.table.TableReference, \
3746 str, \
3747 ]):
3748 The destination table for the row data, or a reference to it.
3749 rows (Union[Sequence[Tuple], Sequence[Dict]]):
3750 Row data to be inserted. If a list of tuples is given, each
3751 tuple should contain data for each schema field on the
3752 current table and in the same order as the schema fields. If
3753 a list of dictionaries is given, the keys must include all
3754 required fields in the schema. Keys which do not correspond
3755 to a field in the schema are ignored.
3756 selected_fields (Sequence[google.cloud.bigquery.schema.SchemaField]):
3757 The fields to return. Required if ``table`` is a
3758 :class:`~google.cloud.bigquery.table.TableReference`.
3759 kwargs (dict):
3760 Keyword arguments to
3761 :meth:`~google.cloud.bigquery.client.Client.insert_rows_json`.
3762
3763 Returns:
3764 Sequence[Mappings]:
3765 One mapping per row with insert errors: the "index" key
3766 identifies the row, and the "errors" key contains a list of
3767 the mappings describing one or more problems with the row.
3768
3769 Raises:
3770             ValueError: if the table's schema is not set.
                 TypeError: if ``rows`` is not a sequence of dicts or tuples.
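
        Examples:
            A minimal usage sketch; the table ID and column names are
            hypothetical placeholders. The table is fetched first so that
            its schema is available for the row-to-JSON conversion::

                from google.cloud import bigquery

                client = bigquery.Client()
                table = client.get_table("my-project.my_dataset.people")
                errors = client.insert_rows(
                    table, [{"full_name": "Ada Lovelace", "age": 36}]
                )
                if errors:
                    print("Rows with insert errors:", errors)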
3771 """
3772 if not isinstance(rows, (collections_abc.Sequence, collections_abc.Iterator)):
3773 raise TypeError("rows argument should be a sequence of dicts or tuples")
3774
3775 table = _table_arg_to_table(table, default_project=self.project)
3776
3777 if not isinstance(table, Table):
3778 raise TypeError(_NEED_TABLE_ARGUMENT)
3779
3780 schema = table.schema
3781
3782 # selected_fields can override the table schema.
3783 if selected_fields is not None:
3784 schema = selected_fields
3785
3786 if len(schema) == 0:
3787 raise ValueError(
3788 (
3789 "Could not determine schema for table '{}'. Call client.get_table() "
3790 "or pass in a list of schema fields to the selected_fields argument."
3791 ).format(table)
3792 )
3793
3794 json_rows = [_record_field_to_json(schema, row) for row in rows]
3795
3796 return self.insert_rows_json(table, json_rows, **kwargs)
3797
3798 def insert_rows_from_dataframe(
3799 self,
3800 table: Union[Table, TableReference, str],
3801 dataframe,
3802 selected_fields: Optional[Sequence[SchemaField]] = None,
3803 chunk_size: int = 500,
3804 **kwargs: Dict,
3805 ) -> Sequence[Sequence[dict]]:
3806 """Insert rows into a table from a dataframe via the streaming API.
3807
3808 BigQuery will reject insertAll payloads that exceed a defined limit (10MB).
3809 Additionally, if a payload vastly exceeds this limit, the request is rejected
3810 by the intermediate architecture, which returns a 413 (Payload Too Large) status code.
3811
3812 See
3813 https://cloud.google.com/bigquery/quotas#streaming_inserts
3814
3815 Args:
3816 table (Union[ \
3817 google.cloud.bigquery.table.Table, \
3818 google.cloud.bigquery.table.TableReference, \
3819 str, \
3820 ]):
3821 The destination table for the row data, or a reference to it.
3822 dataframe (pandas.DataFrame):
3823 A :class:`~pandas.DataFrame` containing the data to load. Any
3824 ``NaN`` values present in the dataframe are omitted from the
3825 streaming API request(s).
3826 selected_fields (Sequence[google.cloud.bigquery.schema.SchemaField]):
3827 The fields to return. Required if ``table`` is a
3828 :class:`~google.cloud.bigquery.table.TableReference`.
3829 chunk_size (int):
3830 The number of rows to stream in a single chunk. Must be positive.
3831 kwargs (Dict):
3832 Keyword arguments to
3833 :meth:`~google.cloud.bigquery.client.Client.insert_rows_json`.
3834
3835 Returns:
3836 Sequence[Sequence[Mappings]]:
3837 A list with insert errors for each insert chunk. Each element
3838 is a list containing one mapping per row with insert errors:
3839 the "index" key identifies the row, and the "errors" key
3840 contains a list of the mappings describing one or more problems
3841 with the row.
3842
3843 Raises:
3844 ValueError: if table's schema is not set
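
        Examples:
            A minimal usage sketch; the table ID and column names are
            hypothetical placeholders, and :mod:`pandas` must be installed::

                import pandas
                from google.cloud import bigquery

                client = bigquery.Client()
                table = client.get_table("my-project.my_dataset.people")
                dataframe = pandas.DataFrame(
                    [{"full_name": "Ada Lovelace", "age": 36}]
                )
                chunk_errors = client.insert_rows_from_dataframe(table, dataframe)
                if any(chunk_errors):
                    print("Rows with insert errors:", chunk_errors)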
3845 """
3846 insert_results = []
3847
3848 chunk_count = int(math.ceil(len(dataframe) / chunk_size))
3849 rows_iter = _pandas_helpers.dataframe_to_json_generator(dataframe)
3850
3851 for _ in range(chunk_count):
3852 rows_chunk = itertools.islice(rows_iter, chunk_size)
3853 result = self.insert_rows(table, rows_chunk, selected_fields, **kwargs)
3854 insert_results.append(result)
3855
3856 return insert_results
3857
3858 def insert_rows_json(
3859 self,
3860 table: Union[Table, TableReference, TableListItem, str],
3861 json_rows: Sequence[Mapping[str, Any]],
3862 row_ids: Union[
3863 Iterable[Optional[str]], AutoRowIDs, None
3864 ] = AutoRowIDs.GENERATE_UUID,
3865 skip_invalid_rows: Optional[bool] = None,
3866 ignore_unknown_values: Optional[bool] = None,
3867 template_suffix: Optional[str] = None,
3868 retry: retries.Retry = DEFAULT_RETRY,
3869 timeout: TimeoutType = DEFAULT_TIMEOUT,
3870 ) -> Sequence[dict]:
3871 """Insert rows into a table without applying local type conversions.
3872
3873 See
3874 https://cloud.google.com/bigquery/docs/reference/rest/v2/tabledata/insertAll
3875
3876 BigQuery will reject insertAll payloads that exceed a defined limit (10MB).
3877 Additionally, if a payload vastly exceeds this limit, the request is rejected
3878 by the intermediate architecture, which returns a 413 (Payload Too Large) status code.
3879
3880 See
3881 https://cloud.google.com/bigquery/quotas#streaming_inserts
3882
3883 Args:
3884 table (Union[ \
3885                 google.cloud.bigquery.table.Table, \
3886                 google.cloud.bigquery.table.TableReference, \
3887                 google.cloud.bigquery.table.TableListItem, \
3888                 str, \
3889 ]):
3890 The destination table for the row data, or a reference to it.
3891 json_rows (Sequence[Dict]):
3892 Row data to be inserted. Keys must match the table schema fields
3893 and values must be JSON-compatible representations.
3894 row_ids (Union[Iterable[str], AutoRowIDs, None]):
3895 Unique IDs, one per row being inserted. An ID can also be
3896 ``None``, indicating that an explicit insert ID should **not**
3897 be used for that row. If the argument is omitted altogether,
3898 unique IDs are created automatically.
3899
3900 .. versionchanged:: 2.21.0
3901 Can also be an iterable, not just a sequence, or an
3902 :class:`AutoRowIDs` enum member.
3903
3904 .. deprecated:: 2.21.0
3905 Passing ``None`` to explicitly request autogenerating insert IDs is
3906 deprecated, use :attr:`AutoRowIDs.GENERATE_UUID` instead.
3907
3908 skip_invalid_rows (Optional[bool]):
3909 Insert all valid rows of a request, even if invalid rows exist.
3910 The default value is ``False``, which causes the entire request
3911 to fail if any invalid rows exist.
3912 ignore_unknown_values (Optional[bool]):
3913 Accept rows that contain values that do not match the schema.
3914 The unknown values are ignored. Default is ``False``, which
3915 treats unknown values as errors.
3916 template_suffix (Optional[str]):
3917 Treat ``name`` as a template table and provide a suffix.
3918 BigQuery will create the table ``<name> + <template_suffix>``
3919 based on the schema of the template table. See
3920 https://cloud.google.com/bigquery/streaming-data-into-bigquery#template-tables
3921 retry (Optional[google.api_core.retry.Retry]):
3922 How to retry the RPC.
3923 timeout (Optional[float]):
3924 The number of seconds to wait for the underlying HTTP transport
3925 before using ``retry``.
3926
3927 Returns:
3928 Sequence[Mappings]:
3929 One mapping per row with insert errors: the "index" key
3930 identifies the row, and the "errors" key contains a list of
3931 the mappings describing one or more problems with the row.
3932
3933 Raises:
3934 TypeError: if `json_rows` is not a `Sequence`.
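
        Examples:
            A minimal usage sketch; the table ID and field names are
            hypothetical placeholders. Values must already be
            JSON-compatible::

                from google.cloud import bigquery

                client = bigquery.Client()
                errors = client.insert_rows_json(
                    "my-project.my_dataset.people",
                    [{"full_name": "Ada Lovelace", "age": 36}],
                )
                if errors:
                    print("Rows with insert errors:", errors)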
3935 """
3936 if not isinstance(
3937 json_rows, (collections_abc.Sequence, collections_abc.Iterator)
3938 ):
3939 raise TypeError("json_rows argument should be a sequence of dicts")
3940 # Convert table to just a reference because unlike insert_rows,
3941 # insert_rows_json doesn't need the table schema. It's not doing any
3942 # type conversions.
3943 table = _table_arg_to_table_ref(table, default_project=self.project)
3944 rows_info: List[Any] = []
3945 data: Dict[str, Any] = {"rows": rows_info}
3946
3947 if row_ids is None:
3948 warnings.warn(
3949 "Passing None for row_ids is deprecated. To explicitly request "
3950 "autogenerated insert IDs, use AutoRowIDs.GENERATE_UUID instead",
3951 category=DeprecationWarning,
3952 )
3953 row_ids = AutoRowIDs.GENERATE_UUID
3954
3955 if not isinstance(row_ids, AutoRowIDs):
3956 try:
3957 row_ids_iter = iter(row_ids)
3958 except TypeError:
3959 msg = "row_ids is neither an iterable nor an AutoRowIDs enum member"
3960 raise TypeError(msg)
3961
3962 for i, row in enumerate(json_rows):
3963 info: Dict[str, Any] = {"json": row}
3964
3965 if row_ids is AutoRowIDs.GENERATE_UUID:
3966 info["insertId"] = str(uuid.uuid4())
3967 elif row_ids is AutoRowIDs.DISABLED:
3968 info["insertId"] = None
3969 else:
3970 try:
3971 insert_id = next(row_ids_iter)
3972 except StopIteration:
3973 msg = f"row_ids did not generate enough IDs, error at index {i}"
3974 raise ValueError(msg)
3975 else:
3976 info["insertId"] = insert_id
3977
3978 rows_info.append(info)
3979
3980 if skip_invalid_rows is not None:
3981 data["skipInvalidRows"] = skip_invalid_rows
3982
3983 if ignore_unknown_values is not None:
3984 data["ignoreUnknownValues"] = ignore_unknown_values
3985
3986 if template_suffix is not None:
3987 data["templateSuffix"] = template_suffix
3988
3989 path = "%s/insertAll" % table.path
3990 # We can always retry, because every row has an insert ID.
3991 span_attributes = {"path": path}
3992 response = self._call_api(
3993 retry,
3994 span_name="BigQuery.insertRowsJson",
3995 span_attributes=span_attributes,
3996 method="POST",
3997 path=path,
3998 data=data,
3999 timeout=timeout,
4000 )
4001 errors = []
4002
4003 for error in response.get("insertErrors", ()):
4004 errors.append({"index": int(error["index"]), "errors": error["errors"]})
4005
4006 return errors
4007
4008 def list_partitions(
4009 self,
4010 table: Union[Table, TableReference, TableListItem, str],
4011 retry: retries.Retry = DEFAULT_RETRY,
4012 timeout: TimeoutType = DEFAULT_TIMEOUT,
4013 ) -> Sequence[str]:
4014 """List the partitions in a table.
4015
4016 Args:
4017 table (Union[ \
4018 google.cloud.bigquery.table.Table, \
4019 google.cloud.bigquery.table.TableReference, \
4020 google.cloud.bigquery.table.TableListItem, \
4021 str, \
4022 ]):
4023 The table or reference from which to get partition info
4024 retry (Optional[google.api_core.retry.Retry]):
4025 How to retry the RPC.
4026 timeout (Optional[float]):
4027 The number of seconds to wait for the underlying HTTP transport
4028 before using ``retry``.
4029 If multiple requests are made under the hood, ``timeout``
4030 applies to each individual request.
4031
4032 Returns:
4033 List[str]:
4034                 A list of the partition ids present in the partitioned table.
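
        Examples:
            A minimal usage sketch; the table ID is a hypothetical
            placeholder for a partitioned table::

                from google.cloud import bigquery

                client = bigquery.Client()
                partition_ids = client.list_partitions(
                    "my-project.my_dataset.partitioned_table"
                )
                print(partition_ids)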
4035 """
4036 table = _table_arg_to_table_ref(table, default_project=self.project)
4037 meta_table = self.get_table(
4038 TableReference(
4039 DatasetReference(table.project, table.dataset_id),
4040 "%s$__PARTITIONS_SUMMARY__" % table.table_id,
4041 ),
4042 retry=retry,
4043 timeout=timeout,
4044 )
4045
4046 subset = [col for col in meta_table.schema if col.name == "partition_id"]
4047 return [
4048 row[0]
4049 for row in self.list_rows(
4050 meta_table, selected_fields=subset, retry=retry, timeout=timeout
4051 )
4052 ]
4053
4054 def list_rows(
4055 self,
4056 table: Union[Table, TableListItem, TableReference, str],
4057 selected_fields: Optional[Sequence[SchemaField]] = None,
4058 max_results: Optional[int] = None,
4059 page_token: Optional[str] = None,
4060 start_index: Optional[int] = None,
4061 page_size: Optional[int] = None,
4062 retry: retries.Retry = DEFAULT_RETRY,
4063 timeout: TimeoutType = DEFAULT_TIMEOUT,
4064 ) -> RowIterator:
4065 """List the rows of the table.
4066
4067 See
4068 https://cloud.google.com/bigquery/docs/reference/rest/v2/tabledata/list
4069
4070 .. note::
4071
4072 This method assumes that the provided schema is up-to-date with the
4073 schema as defined on the back-end: if the two schemas are not
4074 identical, the values returned may be incomplete. To ensure that the
4075 local copy of the schema is up-to-date, call ``client.get_table``.
4076
4077 Args:
4078 table (Union[ \
4079 google.cloud.bigquery.table.Table, \
4080 google.cloud.bigquery.table.TableListItem, \
4081 google.cloud.bigquery.table.TableReference, \
4082 str, \
4083 ]):
4084 The table to list, or a reference to it. When the table
4085 object does not contain a schema and ``selected_fields`` is
4086 not supplied, this method calls ``get_table`` to fetch the
4087 table schema.
4088 selected_fields (Sequence[google.cloud.bigquery.schema.SchemaField]):
4089 The fields to return. If not supplied, data for all columns
4090 are downloaded.
4091 max_results (Optional[int]):
4092 Maximum number of rows to return.
4093 page_token (Optional[str]):
4094 Token representing a cursor into the table's rows.
4095 If not passed, the API will return the first page of the
4096 rows. The token marks the beginning of the iterator to be
4097 returned and the value of the ``page_token`` can be accessed
4098 at ``next_page_token`` of the
4099 :class:`~google.cloud.bigquery.table.RowIterator`.
4100 start_index (Optional[int]):
4101 The zero-based index of the starting row to read.
4102 page_size (Optional[int]):
4103 The maximum number of rows in each page of results from this request.
4104 Non-positive values are ignored. Defaults to a sensible value set by the API.
4105 retry (Optional[google.api_core.retry.Retry]):
4106 How to retry the RPC.
4107 timeout (Optional[float]):
4108 The number of seconds to wait for the underlying HTTP transport
4109 before using ``retry``.
4110 If multiple requests are made under the hood, ``timeout``
4111 applies to each individual request.
4112
4113 Returns:
4114 google.cloud.bigquery.table.RowIterator:
4115 Iterator of row data
4116 :class:`~google.cloud.bigquery.table.Row`-s. During each
4117 page, the iterator will have the ``total_rows`` attribute
4118 set, which counts the total number of rows **in the table**
4119 (this is distinct from the total number of rows in the
4120 current page: ``iterator.page.num_items``).
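
        Examples:
            A minimal usage sketch against a public dataset; only the first
            ten rows are downloaded::

                from google.cloud import bigquery

                client = bigquery.Client()
                rows = client.list_rows(
                    "bigquery-public-data.usa_names.usa_1910_2013",
                    max_results=10,
                )
                for row in rows:
                    print(row["name"], row["number"])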
4121 """
4122 table = _table_arg_to_table(table, default_project=self.project)
4123
4124 if not isinstance(table, Table):
4125 raise TypeError(_NEED_TABLE_ARGUMENT)
4126
4127 schema = table.schema
4128
4129 # selected_fields can override the table schema.
4130 if selected_fields is not None:
4131 schema = selected_fields
4132
4133 # No schema, but no selected_fields. Assume the developer wants all
4134 # columns, so get the table resource for them rather than failing.
4135 elif len(schema) == 0:
4136 table = self.get_table(table.reference, retry=retry, timeout=timeout)
4137 schema = table.schema
4138
4139 params: Dict[str, Any] = {}
4140 if selected_fields is not None:
4141 params["selectedFields"] = ",".join(field.name for field in selected_fields)
4142 if start_index is not None:
4143 params["startIndex"] = start_index
4144
4145 params["formatOptions.useInt64Timestamp"] = True
4146 row_iterator = RowIterator(
4147 client=self,
4148 api_request=functools.partial(self._call_api, retry, timeout=timeout),
4149 path="%s/data" % (table.path,),
4150 schema=schema,
4151 page_token=page_token,
4152 max_results=max_results,
4153 page_size=page_size,
4154 extra_params=params,
4155 table=table,
4156 # Pass in selected_fields separately from schema so that full
4157 # tables can be fetched without a column filter.
4158 selected_fields=selected_fields,
4159 total_rows=getattr(table, "num_rows", None),
4160 project=table.project,
4161 location=table.location,
4162 )
4163 return row_iterator
4164
4165 def _list_rows_from_query_results(
4166 self,
4167 job_id: str,
4168 location: str,
4169 project: str,
4170 schema: Sequence[SchemaField],
4171 total_rows: Optional[int] = None,
4172 destination: Optional[Union[Table, TableReference, TableListItem, str]] = None,
4173 max_results: Optional[int] = None,
4174 start_index: Optional[int] = None,
4175 page_size: Optional[int] = None,
4176 retry: retries.Retry = DEFAULT_RETRY,
4177 timeout: TimeoutType = DEFAULT_TIMEOUT,
4178 query_id: Optional[str] = None,
4179 first_page_response: Optional[Dict[str, Any]] = None,
4180 num_dml_affected_rows: Optional[int] = None,
4181 query: Optional[str] = None,
4182 total_bytes_processed: Optional[int] = None,
4183 slot_millis: Optional[int] = None,
4184 created: Optional[datetime.datetime] = None,
4185 started: Optional[datetime.datetime] = None,
4186 ended: Optional[datetime.datetime] = None,
4187 ) -> RowIterator:
4188 """List the rows of a completed query.
4189 See
4190 https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/getQueryResults
4191 Args:
4192 job_id (str):
4193 ID of a query job.
4194 location (str): Location of the query job.
4195 project (str):
4196 ID of the project where the query job was run.
4197 schema (Sequence[google.cloud.bigquery.schema.SchemaField]):
4198 The fields expected in these query results. Used to convert
4199 from JSON to expected Python types.
4200 total_rows (Optional[int]):
4201 Total number of rows in the query results.
4202 destination (Optional[Union[ \
4203 google.cloud.bigquery.table.Table, \
4204 google.cloud.bigquery.table.TableListItem, \
4205 google.cloud.bigquery.table.TableReference, \
4206 str, \
4207 ]]):
4208 Destination table reference. Used to fetch the query results
4209 with the BigQuery Storage API.
4210 max_results (Optional[int]):
4211 Maximum number of rows to return across the whole iterator.
4212 start_index (Optional[int]):
4213 The zero-based index of the starting row to read.
4214 page_size (Optional[int]):
4215 The maximum number of rows in each page of results from this request.
4216 Non-positive values are ignored. Defaults to a sensible value set by the API.
4217 retry (Optional[google.api_core.retry.Retry]):
4218 How to retry the RPC.
4219 timeout (Optional[float]):
4220 The number of seconds to wait for the underlying HTTP transport
4221                 before using ``retry``. If set, this connection timeout is
4222                 raised to at least a library-defined minimum value, which
4223                 prevents retrying what would otherwise be a successful response.
4224 If multiple requests are made under the hood, ``timeout``
4225 applies to each individual request.
4226 query_id (Optional[str]):
4227 [Preview] ID of a completed query. This ID is auto-generated
4228 and not guaranteed to be populated.
4229 first_page_response (Optional[dict]):
4230 API response for the first page of results (if available).
4231 num_dml_affected_rows (Optional[int]):
4232 If this RowIterator is the result of a DML query, the number of
4233 rows that were affected.
4234 query (Optional[str]):
4235 The query text used.
4236 total_bytes_processed (Optional[int]):
4237                 Total bytes processed from job statistics, if present.
4238 slot_millis (Optional[int]):
4239                 Number of slot milliseconds the user is actually billed for.
4240 created (Optional[datetime.datetime]):
4241 Datetime at which the job was created.
4242 started (Optional[datetime.datetime]):
4243 Datetime at which the job was started.
4244 ended (Optional[datetime.datetime]):
4245 Datetime at which the job finished.
4246
4247 Returns:
4248 google.cloud.bigquery.table.RowIterator:
4249 Iterator of row data
4250 :class:`~google.cloud.bigquery.table.Row`-s.
4251 """
4252 params: Dict[str, Any] = {
4253 "fields": _LIST_ROWS_FROM_QUERY_RESULTS_FIELDS,
4254 "location": location,
4255 }
4256
4257 if timeout is not None:
4258 if not isinstance(timeout, (int, float)):
4259 timeout = _MIN_GET_QUERY_RESULTS_TIMEOUT
4260 else:
4261 timeout = max(timeout, _MIN_GET_QUERY_RESULTS_TIMEOUT)
4262
4263 if start_index is not None:
4264 params["startIndex"] = start_index
4265
4266 params["formatOptions.useInt64Timestamp"] = True
4267 row_iterator = RowIterator(
4268 client=self,
4269 api_request=functools.partial(self._call_api, retry, timeout=timeout),
4270 path=f"/projects/{project}/queries/{job_id}",
4271 schema=schema,
4272 max_results=max_results,
4273 page_size=page_size,
4274 table=destination,
4275 extra_params=params,
4276 total_rows=total_rows,
4277 project=project,
4278 location=location,
4279 job_id=job_id,
4280 query_id=query_id,
4281 first_page_response=first_page_response,
4282 num_dml_affected_rows=num_dml_affected_rows,
4283 query=query,
4284 total_bytes_processed=total_bytes_processed,
4285 slot_millis=slot_millis,
4286 created=created,
4287 started=started,
4288 ended=ended,
4289 )
4290 return row_iterator
4291
4292 def _schema_from_json_file_object(self, file_obj):
4293         """Helper for ``schema_from_json`` that reads a table schema from
4294         a JSON file object.
4295
4296 Returns:
4297 List of schema field objects.
4298 """
4299 json_data = json.load(file_obj)
4300 return [SchemaField.from_api_repr(field) for field in json_data]
4301
4302 def _schema_to_json_file_object(self, schema_list, file_obj):
4303         """Helper for ``schema_to_json`` that writes a schema list to a file
4304         object as JSON, using ``json.dump``.
4305 """
4306 json.dump(schema_list, file_obj, indent=2, sort_keys=True)
4307
4308 def schema_from_json(self, file_or_path: "PathType") -> List[SchemaField]:
4309         """Read a table schema from a file object or file path containing a
4310         JSON schema description.
4311
4312 Returns:
4313 List[SchemaField]:
4314 List of :class:`~google.cloud.bigquery.schema.SchemaField` objects.
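
        Examples:
            A minimal usage sketch; the path is a hypothetical placeholder
            for a file written by :meth:`schema_to_json`::

                from google.cloud import bigquery

                client = bigquery.Client()
                schema = client.schema_from_json("/tmp/schema.json")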
4315 """
4316 if isinstance(file_or_path, io.IOBase):
4317 return self._schema_from_json_file_object(file_or_path)
4318
4319 with open(file_or_path) as file_obj:
4320 return self._schema_from_json_file_object(file_obj)
4321
4322 def schema_to_json(
4323 self, schema_list: Sequence[SchemaField], destination: "PathType"
4324 ):
4325         """Serialize a list of schema field objects to a file as JSON.
4326 
4327         ``destination`` may be a file path or a file object.
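
        Examples:
            A minimal usage sketch; the destination path is a hypothetical
            placeholder::

                from google.cloud import bigquery

                client = bigquery.Client()
                schema = [
                    bigquery.SchemaField("full_name", "STRING", mode="REQUIRED"),
                    bigquery.SchemaField("age", "INTEGER", mode="REQUIRED"),
                ]
                client.schema_to_json(schema, "/tmp/schema.json")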
4330 """
4331 json_schema_list = [f.to_api_repr() for f in schema_list]
4332
4333 if isinstance(destination, io.IOBase):
4334 return self._schema_to_json_file_object(json_schema_list, destination)
4335
4336 with open(destination, mode="w") as file_obj:
4337 return self._schema_to_json_file_object(json_schema_list, file_obj)
4338
4339 def __enter__(self):
4340 return self
4341
4342 def __exit__(self, exc_type, exc_value, traceback):
4343 self.close()
4344
4345
4346# pylint: disable=unused-argument
4347def _item_to_project(iterator, resource):
4348 """Convert a JSON project to the native object.
4349
4350 Args:
4351 iterator (google.api_core.page_iterator.Iterator): The iterator that is currently in use.
4352
4353 resource (Dict): An item to be converted to a project.
4354
4355 Returns:
4356 google.cloud.bigquery.client.Project: The next project in the page.
4357 """
4358 return Project.from_api_repr(resource)
4359
4360
4361# pylint: enable=unused-argument
4362
4363
4364def _item_to_dataset(iterator, resource):
4365 """Convert a JSON dataset to the native object.
4366
4367 Args:
4368 iterator (google.api_core.page_iterator.Iterator): The iterator that is currently in use.
4369
4370 resource (Dict): An item to be converted to a dataset.
4371
4372 Returns:
4373 google.cloud.bigquery.dataset.DatasetListItem: The next dataset in the page.
4374 """
4375 return DatasetListItem(resource)
4376
4377
4378def _item_to_job(iterator, resource):
4379 """Convert a JSON job to the native object.
4380
4381 Args:
4382 iterator (google.api_core.page_iterator.Iterator): The iterator that is currently in use.
4383
4384 resource (Dict): An item to be converted to a job.
4385
4386 Returns:
4387 job instance: The next job in the page.
4388 """
4389 return iterator.client.job_from_resource(resource)
4390
4391
4392def _item_to_model(iterator, resource):
4393 """Convert a JSON model to the native object.
4394
4395 Args:
4396 iterator (google.api_core.page_iterator.Iterator):
4397 The iterator that is currently in use.
4398 resource (Dict): An item to be converted to a model.
4399
4400 Returns:
4401 google.cloud.bigquery.model.Model: The next model in the page.
4402 """
4403 return Model.from_api_repr(resource)
4404
4405
4406def _item_to_routine(iterator, resource):
4407     """Convert a JSON routine to the native object.
4408
4409 Args:
4410 iterator (google.api_core.page_iterator.Iterator):
4411 The iterator that is currently in use.
4412 resource (Dict): An item to be converted to a routine.
4413
4414 Returns:
4415 google.cloud.bigquery.routine.Routine: The next routine in the page.
4416 """
4417 return Routine.from_api_repr(resource)
4418
4419
4420def _item_to_table(iterator, resource):
4421 """Convert a JSON table to the native object.
4422
4423 Args:
4424 iterator (google.api_core.page_iterator.Iterator): The iterator that is currently in use.
4425
4426 resource (Dict): An item to be converted to a table.
4427
4428 Returns:
4429 google.cloud.bigquery.table.Table: The next table in the page.
4430 """
4431 return TableListItem(resource)
4432
4433
4434def _extract_job_reference(job, project=None, location=None):
4435 """Extract fully-qualified job reference from a job-like object.
4436
4437 Args:
4438         job (Union[ \
4439 str, \
4440 google.cloud.bigquery.job.LoadJob, \
4441 google.cloud.bigquery.job.CopyJob, \
4442 google.cloud.bigquery.job.ExtractJob, \
4443 google.cloud.bigquery.job.QueryJob \
4444 ]): Job identifier.
4445 project (Optional[str]):
4446             Project where the job was run. Ignored if ``job`` is a job
4447             object.
4448 location (Optional[str]):
4449             Location where the job was run. Ignored if ``job`` is a job
4450             object.
4451
4452 Returns:
4453 Tuple[str, str, str]: ``(project, location, job_id)``
4454 """
4455 if hasattr(job, "job_id"):
4456 project = job.project
4457 job_id = job.job_id
4458 location = job.location
4459 else:
4460 job_id = job
4461
4462 return (project, location, job_id)
4463
4464
4465def _check_mode(stream):
4466 """Check that a stream was opened in read-binary mode.
4467
4468 Args:
4469 stream (IO[bytes]): A bytes IO object open for reading.
4470
4471 Raises:
4472 ValueError:
4473             if the stream has a ``mode`` attribute
4474             and it is not one of ``rb``, ``r+b``, or ``rb+``.
4475 """
4476 mode = getattr(stream, "mode", None)
4477
4478 if isinstance(stream, gzip.GzipFile):
4479 if mode != gzip.READ: # pytype: disable=module-attr
4480 raise ValueError(
4481 "Cannot upload gzip files opened in write mode: use "
4482 "gzip.GzipFile(filename, mode='rb')"
4483 )
4484 else:
4485 if mode is not None and mode not in ("rb", "r+b", "rb+"):
4486 raise ValueError(
4487 "Cannot upload files opened in text mode: use "
4488 "open(filename, mode='rb') or open(filename, mode='r+b')"
4489 )
4490
4491
4492def _get_upload_headers(user_agent):
4493 """Get the headers for an upload request.
4494
4495 Args:
4496 user_agent (str): The user-agent for requests.
4497
4498 Returns:
4499 Dict: The headers to be used for the request.
4500 """
4501 return {
4502 "Accept": "application/json",
4503 "Accept-Encoding": "gzip, deflate",
4504 "User-Agent": user_agent,
4505 "content-type": "application/json; charset=UTF-8",
4506 }
4507
4508
4509def _add_server_timeout_header(headers: Optional[Dict[str, str]], kwargs):
4510 timeout = kwargs.get("timeout")
4511 if timeout is not None:
4512 if headers is None:
4513 headers = {}
4514 headers[TIMEOUT_HEADER] = str(timeout)
4515
4516 if headers:
4517 kwargs["headers"] = headers
4518
4519 return kwargs