# -*- coding: utf-8 -*-
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from collections import OrderedDict
import os
import re
from typing import (
    Dict,
    Iterable,
    Mapping,
    MutableMapping,
    MutableSequence,
    Optional,
    Sequence,
    Tuple,
    Type,
    Union,
    cast,
)
import warnings

from google.api_core import client_options as client_options_lib
from google.api_core import exceptions as core_exceptions
from google.api_core import gapic_v1
from google.api_core import retry as retries
from google.auth import credentials as ga_credentials  # type: ignore
from google.auth.exceptions import MutualTLSChannelError  # type: ignore
from google.auth.transport import mtls  # type: ignore
from google.auth.transport.grpc import SslCredentials  # type: ignore
from google.oauth2 import service_account  # type: ignore

from google.cloud.bigquery_storage_v1 import gapic_version as package_version

try:
    OptionalRetry = Union[retries.Retry, gapic_v1.method._MethodDefault, None]
except AttributeError:  # pragma: NO COVER
    OptionalRetry = Union[retries.Retry, object, None]  # type: ignore

from google.protobuf import timestamp_pb2  # type: ignore

from google.cloud.bigquery_storage_v1.types import arrow, avro, storage, stream

from .transports.base import DEFAULT_CLIENT_INFO, BigQueryReadTransport
from .transports.grpc import BigQueryReadGrpcTransport
from .transports.grpc_asyncio import BigQueryReadGrpcAsyncIOTransport


class BigQueryReadClientMeta(type):
    """Metaclass for the BigQueryRead client.

    This provides class-level methods for building and retrieving
    support objects (e.g. transport) without polluting the client instance
    objects.
    """

    _transport_registry = OrderedDict()  # type: Dict[str, Type[BigQueryReadTransport]]
    _transport_registry["grpc"] = BigQueryReadGrpcTransport
    _transport_registry["grpc_asyncio"] = BigQueryReadGrpcAsyncIOTransport

    def get_transport_class(
        cls,
        label: Optional[str] = None,
    ) -> Type[BigQueryReadTransport]:
        """Returns an appropriate transport class.

        Args:
            label: The name of the desired transport. If none is
                provided, then the first transport in the registry is used.

        Returns:
            The transport class to use.
        """
        # If a specific transport is requested, return that one.
        if label:
            return cls._transport_registry[label]

        # No transport is requested; return the default (that is, the first one
        # in the dictionary).
        return next(iter(cls._transport_registry.values()))

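# A minimal usage sketch for the metaclass hook above: callers can resolve a
# transport class by label before constructing a client. Illustrative only;
# most users never need to touch the transport directly.
#
#     transport_cls = BigQueryReadClient.get_transport_class("grpc")
#     assert transport_cls is BigQueryReadGrpcTransport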

class BigQueryReadClient(metaclass=BigQueryReadClientMeta):
    """BigQuery Read API.

    The Read API can be used to read data from BigQuery.
    """

    @staticmethod
    def _get_default_mtls_endpoint(api_endpoint):
        """Converts api endpoint to mTLS endpoint.

        Convert "*.sandbox.googleapis.com" and "*.googleapis.com" to
        "*.mtls.sandbox.googleapis.com" and "*.mtls.googleapis.com" respectively.

        Args:
            api_endpoint (Optional[str]): the api endpoint to convert.
        Returns:
            str: converted mTLS api endpoint.
        """
        if not api_endpoint:
            return api_endpoint

        mtls_endpoint_re = re.compile(
            r"(?P<name>[^.]+)(?P<mtls>\.mtls)?(?P<sandbox>\.sandbox)?(?P<googledomain>\.googleapis\.com)?"
        )

        m = mtls_endpoint_re.match(api_endpoint)
        name, mtls, sandbox, googledomain = m.groups()
        if mtls or not googledomain:
            return api_endpoint

        if sandbox:
            return api_endpoint.replace(
                "sandbox.googleapis.com", "mtls.sandbox.googleapis.com"
            )

        return api_endpoint.replace(".googleapis.com", ".mtls.googleapis.com")
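
    # Illustrative behavior of the helper above (values follow directly from
    # the regex and the string replacements; not an exhaustive specification):
    #
    #     _get_default_mtls_endpoint("bigquerystorage.googleapis.com")
    #         -> "bigquerystorage.mtls.googleapis.com"
    #     _get_default_mtls_endpoint("bigquerystorage.sandbox.googleapis.com")
    #         -> "bigquerystorage.mtls.sandbox.googleapis.com"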

    # Note: DEFAULT_ENDPOINT is deprecated. Use _DEFAULT_ENDPOINT_TEMPLATE instead.
    DEFAULT_ENDPOINT = "bigquerystorage.googleapis.com"
    DEFAULT_MTLS_ENDPOINT = _get_default_mtls_endpoint.__func__(  # type: ignore
        DEFAULT_ENDPOINT
    )

    _DEFAULT_ENDPOINT_TEMPLATE = "bigquerystorage.{UNIVERSE_DOMAIN}"
    _DEFAULT_UNIVERSE = "googleapis.com"

    @classmethod
    def from_service_account_info(cls, info: dict, *args, **kwargs):
        """Creates an instance of this client using the provided credentials
        info.

        Args:
            info (dict): The service account private key info.
            args: Additional arguments to pass to the constructor.
            kwargs: Additional arguments to pass to the constructor.

        Returns:
            BigQueryReadClient: The constructed client.
        """
        credentials = service_account.Credentials.from_service_account_info(info)
        kwargs["credentials"] = credentials
        return cls(*args, **kwargs)

    @classmethod
    def from_service_account_file(cls, filename: str, *args, **kwargs):
        """Creates an instance of this client using the provided credentials
        file.

        Args:
            filename (str): The path to the service account private key json
                file.
            args: Additional arguments to pass to the constructor.
            kwargs: Additional arguments to pass to the constructor.

        Returns:
            BigQueryReadClient: The constructed client.
        """
        credentials = service_account.Credentials.from_service_account_file(filename)
        kwargs["credentials"] = credentials
        return cls(*args, **kwargs)

    from_service_account_json = from_service_account_file
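
    # A minimal sketch (the key-file path is a hypothetical placeholder):
    #
    #     client = BigQueryReadClient.from_service_account_file(
    #         "/path/to/service-account.json"
    #     )
    #
    # from_service_account_json is an alias kept for backwards compatibility.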

    @property
    def transport(self) -> BigQueryReadTransport:
        """Returns the transport used by the client instance.

        Returns:
            BigQueryReadTransport: The transport used by the client
                instance.
        """
        return self._transport

    @staticmethod
    def read_session_path(
        project: str,
        location: str,
        session: str,
    ) -> str:
        """Returns a fully-qualified read_session string."""
        return "projects/{project}/locations/{location}/sessions/{session}".format(
            project=project,
            location=location,
            session=session,
        )

    @staticmethod
    def parse_read_session_path(path: str) -> Dict[str, str]:
        """Parses a read_session path into its component segments."""
        m = re.match(
            r"^projects/(?P<project>.+?)/locations/(?P<location>.+?)/sessions/(?P<session>.+?)$",
            path,
        )
        return m.groupdict() if m else {}

    @staticmethod
    def read_stream_path(
        project: str,
        location: str,
        session: str,
        stream: str,
    ) -> str:
        """Returns a fully-qualified read_stream string."""
        return "projects/{project}/locations/{location}/sessions/{session}/streams/{stream}".format(
            project=project,
            location=location,
            session=session,
            stream=stream,
        )

    @staticmethod
    def parse_read_stream_path(path: str) -> Dict[str, str]:
        """Parses a read_stream path into its component segments."""
        m = re.match(
            r"^projects/(?P<project>.+?)/locations/(?P<location>.+?)/sessions/(?P<session>.+?)/streams/(?P<stream>.+?)$",
            path,
        )
        return m.groupdict() if m else {}

    @staticmethod
    def table_path(
        project: str,
        dataset: str,
        table: str,
    ) -> str:
        """Returns a fully-qualified table string."""
        return "projects/{project}/datasets/{dataset}/tables/{table}".format(
            project=project,
            dataset=dataset,
            table=table,
        )

    @staticmethod
    def parse_table_path(path: str) -> Dict[str, str]:
        """Parses a table path into its component segments."""
        m = re.match(
            r"^projects/(?P<project>.+?)/datasets/(?P<dataset>.+?)/tables/(?P<table>.+?)$",
            path,
        )
        return m.groupdict() if m else {}
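
    # Round-trip sketch for the path helpers above (IDs are illustrative):
    #
    #     path = BigQueryReadClient.table_path("my-project", "my_dataset", "t")
    #     # -> "projects/my-project/datasets/my_dataset/tables/t"
    #     BigQueryReadClient.parse_table_path(path)
    #     # -> {"project": "my-project", "dataset": "my_dataset", "table": "t"}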

    @staticmethod
    def common_billing_account_path(
        billing_account: str,
    ) -> str:
        """Returns a fully-qualified billing_account string."""
        return "billingAccounts/{billing_account}".format(
            billing_account=billing_account,
        )

    @staticmethod
    def parse_common_billing_account_path(path: str) -> Dict[str, str]:
        """Parse a billing_account path into its component segments."""
        m = re.match(r"^billingAccounts/(?P<billing_account>.+?)$", path)
        return m.groupdict() if m else {}

    @staticmethod
    def common_folder_path(
        folder: str,
    ) -> str:
        """Returns a fully-qualified folder string."""
        return "folders/{folder}".format(
            folder=folder,
        )

    @staticmethod
    def parse_common_folder_path(path: str) -> Dict[str, str]:
        """Parse a folder path into its component segments."""
        m = re.match(r"^folders/(?P<folder>.+?)$", path)
        return m.groupdict() if m else {}

    @staticmethod
    def common_organization_path(
        organization: str,
    ) -> str:
        """Returns a fully-qualified organization string."""
        return "organizations/{organization}".format(
            organization=organization,
        )

    @staticmethod
    def parse_common_organization_path(path: str) -> Dict[str, str]:
        """Parse an organization path into its component segments."""
        m = re.match(r"^organizations/(?P<organization>.+?)$", path)
        return m.groupdict() if m else {}

    @staticmethod
    def common_project_path(
        project: str,
    ) -> str:
        """Returns a fully-qualified project string."""
        return "projects/{project}".format(
            project=project,
        )

    @staticmethod
    def parse_common_project_path(path: str) -> Dict[str, str]:
        """Parse a project path into its component segments."""
        m = re.match(r"^projects/(?P<project>.+?)$", path)
        return m.groupdict() if m else {}

    @staticmethod
    def common_location_path(
        project: str,
        location: str,
    ) -> str:
        """Returns a fully-qualified location string."""
        return "projects/{project}/locations/{location}".format(
            project=project,
            location=location,
        )

    @staticmethod
    def parse_common_location_path(path: str) -> Dict[str, str]:
        """Parse a location path into its component segments."""
        m = re.match(r"^projects/(?P<project>.+?)/locations/(?P<location>.+?)$", path)
        return m.groupdict() if m else {}
    @classmethod
    def get_mtls_endpoint_and_cert_source(
        cls, client_options: Optional[client_options_lib.ClientOptions] = None
    ):
        """Deprecated. Return the API endpoint and client cert source for mutual TLS.

        The client cert source is determined in the following order:
        (1) if `GOOGLE_API_USE_CLIENT_CERTIFICATE` environment variable is not "true", the
        client cert source is None.
        (2) if `client_options.client_cert_source` is provided, use the provided one; if the
        default client cert source exists, use the default one; otherwise the client cert
        source is None.

        The API endpoint is determined in the following order:
        (1) if `client_options.api_endpoint` is provided, use the provided one.
        (2) if `GOOGLE_API_USE_MTLS_ENDPOINT` environment variable is "always", use the
        default mTLS endpoint; if the environment variable is "never", use the default API
        endpoint; otherwise, if a client cert source exists, use the default mTLS endpoint,
        otherwise use the default API endpoint.

        More details can be found at https://google.aip.dev/auth/4114.

        Args:
            client_options (google.api_core.client_options.ClientOptions): Custom options for the
                client. Only the `api_endpoint` and `client_cert_source` properties may be used
                in this method.

        Returns:
            Tuple[str, Callable[[], Tuple[bytes, bytes]]]: returns the API endpoint and the
                client cert source to use.

        Raises:
            google.auth.exceptions.MutualTLSChannelError: If any errors happen.
        """

        warnings.warn(
            "get_mtls_endpoint_and_cert_source is deprecated. Use the api_endpoint property instead.",
            DeprecationWarning,
        )
        if client_options is None:
            client_options = client_options_lib.ClientOptions()
        use_client_cert = os.getenv("GOOGLE_API_USE_CLIENT_CERTIFICATE", "false")
        use_mtls_endpoint = os.getenv("GOOGLE_API_USE_MTLS_ENDPOINT", "auto")
        if use_client_cert not in ("true", "false"):
            raise ValueError(
                "Environment variable `GOOGLE_API_USE_CLIENT_CERTIFICATE` must be either `true` or `false`"
            )
        if use_mtls_endpoint not in ("auto", "never", "always"):
            raise MutualTLSChannelError(
                "Environment variable `GOOGLE_API_USE_MTLS_ENDPOINT` must be `never`, `auto` or `always`"
            )

        # Figure out the client cert source to use.
        client_cert_source = None
        if use_client_cert == "true":
            if client_options.client_cert_source:
                client_cert_source = client_options.client_cert_source
            elif mtls.has_default_client_cert_source():
                client_cert_source = mtls.default_client_cert_source()

        # Figure out which api endpoint to use.
        if client_options.api_endpoint is not None:
            api_endpoint = client_options.api_endpoint
        elif use_mtls_endpoint == "always" or (
            use_mtls_endpoint == "auto" and client_cert_source
        ):
            api_endpoint = cls.DEFAULT_MTLS_ENDPOINT
        else:
            api_endpoint = cls.DEFAULT_ENDPOINT

        return api_endpoint, client_cert_source

    @staticmethod
    def _read_environment_variables():
        """Returns the environment variables used by the client.

        Returns:
            Tuple[bool, str, str]: returns the GOOGLE_API_USE_CLIENT_CERTIFICATE,
                GOOGLE_API_USE_MTLS_ENDPOINT, and GOOGLE_CLOUD_UNIVERSE_DOMAIN environment variables.

        Raises:
            ValueError: If GOOGLE_API_USE_CLIENT_CERTIFICATE is not
                any of ["true", "false"].
            google.auth.exceptions.MutualTLSChannelError: If GOOGLE_API_USE_MTLS_ENDPOINT
                is not any of ["auto", "never", "always"].
        """
        use_client_cert = os.getenv(
            "GOOGLE_API_USE_CLIENT_CERTIFICATE", "false"
        ).lower()
        use_mtls_endpoint = os.getenv("GOOGLE_API_USE_MTLS_ENDPOINT", "auto").lower()
        universe_domain_env = os.getenv("GOOGLE_CLOUD_UNIVERSE_DOMAIN")
        if use_client_cert not in ("true", "false"):
            raise ValueError(
                "Environment variable `GOOGLE_API_USE_CLIENT_CERTIFICATE` must be either `true` or `false`"
            )
        if use_mtls_endpoint not in ("auto", "never", "always"):
            raise MutualTLSChannelError(
                "Environment variable `GOOGLE_API_USE_MTLS_ENDPOINT` must be `never`, `auto` or `always`"
            )
        return use_client_cert == "true", use_mtls_endpoint, universe_domain_env
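
    # Illustrative result when none of the variables are set in the
    # environment (values follow from the os.getenv fallbacks above):
    #
    #     _read_environment_variables() -> (False, "auto", None)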

    @staticmethod
    def _get_client_cert_source(provided_cert_source, use_cert_flag):
        """Return the client cert source to be used by the client.

        Args:
            provided_cert_source (bytes): The client certificate source provided.
            use_cert_flag (bool): A flag indicating whether to use the client certificate.

        Returns:
            bytes or None: The client cert source to be used by the client.
        """
        client_cert_source = None
        if use_cert_flag:
            if provided_cert_source:
                client_cert_source = provided_cert_source
            elif mtls.has_default_client_cert_source():
                client_cert_source = mtls.default_client_cert_source()
        return client_cert_source

    @staticmethod
    def _get_api_endpoint(
        api_override, client_cert_source, universe_domain, use_mtls_endpoint
    ):
        """Return the API endpoint used by the client.

        Args:
            api_override (str): The API endpoint override. If specified, this is always
                the return value of this function and the other arguments are not used.
            client_cert_source (bytes): The client certificate source used by the client.
            universe_domain (str): The universe domain used by the client.
            use_mtls_endpoint (str): How to use the mTLS endpoint, which depends also on the other parameters.
                Possible values are "always", "auto", or "never".

        Returns:
            str: The API endpoint to be used by the client.
        """
        if api_override is not None:
            api_endpoint = api_override
        elif use_mtls_endpoint == "always" or (
            use_mtls_endpoint == "auto" and client_cert_source
        ):
            _default_universe = BigQueryReadClient._DEFAULT_UNIVERSE
            if universe_domain != _default_universe:
                raise MutualTLSChannelError(
                    f"mTLS is not supported in any universe other than {_default_universe}."
                )
            api_endpoint = BigQueryReadClient.DEFAULT_MTLS_ENDPOINT
        else:
            api_endpoint = BigQueryReadClient._DEFAULT_ENDPOINT_TEMPLATE.format(
                UNIVERSE_DOMAIN=universe_domain
            )
        return api_endpoint
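
    # Illustrative resolution (values follow directly from the branches above;
    # `cert_source` stands in for any truthy client cert source):
    #
    #     _get_api_endpoint(None, None, "googleapis.com", "auto")
    #         -> "bigquerystorage.googleapis.com"
    #     _get_api_endpoint(None, cert_source, "googleapis.com", "auto")
    #         -> "bigquerystorage.mtls.googleapis.com"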

    @staticmethod
    def _get_universe_domain(
        client_universe_domain: Optional[str], universe_domain_env: Optional[str]
    ) -> str:
        """Return the universe domain used by the client.

        Args:
            client_universe_domain (Optional[str]): The universe domain configured via the client options.
            universe_domain_env (Optional[str]): The universe domain configured via the "GOOGLE_CLOUD_UNIVERSE_DOMAIN" environment variable.

        Returns:
            str: The universe domain to be used by the client.

        Raises:
            ValueError: If the universe domain is an empty string.
        """
        universe_domain = BigQueryReadClient._DEFAULT_UNIVERSE
        if client_universe_domain is not None:
            universe_domain = client_universe_domain
        elif universe_domain_env is not None:
            universe_domain = universe_domain_env
        if len(universe_domain.strip()) == 0:
            raise ValueError("Universe Domain cannot be an empty string.")
        return universe_domain
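
    # Precedence sketch (the client option beats the env var, which beats the
    # default; domains below are illustrative):
    #
    #     _get_universe_domain("example.org", "other.example") -> "example.org"
    #     _get_universe_domain(None, "other.example")          -> "other.example"
    #     _get_universe_domain(None, None)                     -> "googleapis.com"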

    @staticmethod
    def _compare_universes(
        client_universe: str, credentials: ga_credentials.Credentials
    ) -> bool:
        """Returns True iff the universe domains used by the client and credentials match.

        Args:
            client_universe (str): The universe domain configured via the client options.
            credentials (ga_credentials.Credentials): The credentials being used in the client.

        Returns:
            bool: True iff client_universe matches the universe in credentials.

        Raises:
            ValueError: when client_universe does not match the universe in credentials.
        """

        default_universe = BigQueryReadClient._DEFAULT_UNIVERSE
        credentials_universe = getattr(credentials, "universe_domain", default_universe)

        if client_universe != credentials_universe:
            raise ValueError(
                "The configured universe domain "
                f"({client_universe}) does not match the universe domain "
                f"found in the credentials ({credentials_universe}). "
                "If you haven't configured the universe domain explicitly, "
                f"`{default_universe}` is the default."
            )
        return True

    def _validate_universe_domain(self):
        """Validates client's and credentials' universe domains are consistent.

        Returns:
            bool: True iff the configured universe domain is valid.

        Raises:
            ValueError: If the configured universe domain is not valid.
        """
        self._is_universe_domain_valid = (
            self._is_universe_domain_valid
            or BigQueryReadClient._compare_universes(
                self.universe_domain, self.transport._credentials
            )
        )
        return self._is_universe_domain_valid

    @property
    def api_endpoint(self):
        """Return the API endpoint used by the client instance.

        Returns:
            str: The API endpoint used by the client instance.
        """
        return self._api_endpoint

    @property
    def universe_domain(self) -> str:
        """Return the universe domain used by the client instance.

        Returns:
            str: The universe domain used by the client instance.
        """
        return self._universe_domain

    def __init__(
        self,
        *,
        credentials: Optional[ga_credentials.Credentials] = None,
        transport: Optional[Union[str, BigQueryReadTransport]] = None,
        client_options: Optional[Union[client_options_lib.ClientOptions, dict]] = None,
        client_info: gapic_v1.client_info.ClientInfo = DEFAULT_CLIENT_INFO,
    ) -> None:
        """Instantiates the BigQuery Read client.

        Args:
            credentials (Optional[google.auth.credentials.Credentials]): The
                authorization credentials to attach to requests. These
                credentials identify the application to the service; if none
                are specified, the client will attempt to ascertain the
                credentials from the environment.
            transport (Union[str, BigQueryReadTransport]): The
                transport to use. If set to None, a transport is chosen
                automatically.
            client_options (Optional[Union[google.api_core.client_options.ClientOptions, dict]]):
                Custom options for the client.

                1. The ``api_endpoint`` property can be used to override the
                default endpoint provided by the client when ``transport`` is
                not explicitly provided. Only if this property is not set and
                ``transport`` was not explicitly provided, the endpoint is
                determined by the GOOGLE_API_USE_MTLS_ENDPOINT environment
                variable, which has one of the following values:
                "always" (always use the default mTLS endpoint), "never" (always
                use the default regular endpoint) and "auto" (auto-switch to the
                default mTLS endpoint if a client certificate is present; this is
                the default value).

                2. If the GOOGLE_API_USE_CLIENT_CERTIFICATE environment variable
                is "true", then the ``client_cert_source`` property can be used
                to provide a client certificate for mTLS transport. If
                not provided, the default SSL client certificate will be used if
                present. If GOOGLE_API_USE_CLIENT_CERTIFICATE is "false" or not
                set, no client certificate will be used.

                3. The ``universe_domain`` property can be used to override the
                default "googleapis.com" universe. Note that the ``api_endpoint``
                property still takes precedence, and ``universe_domain`` is
                currently not supported for mTLS.

            client_info (google.api_core.gapic_v1.client_info.ClientInfo):
                The client info used to send a user-agent string along with
                API requests. If ``None``, then default info will be used.
                Generally, you only need to set this if you're developing
                your own client library.

        Raises:
            google.auth.exceptions.MutualTLSChannelError: If mutual TLS transport
                creation failed for any reason.
        """
        self._client_options = client_options
        if isinstance(self._client_options, dict):
            self._client_options = client_options_lib.from_dict(self._client_options)
        if self._client_options is None:
            self._client_options = client_options_lib.ClientOptions()
        self._client_options = cast(
            client_options_lib.ClientOptions, self._client_options
        )

        universe_domain_opt = getattr(self._client_options, "universe_domain", None)

        (
            self._use_client_cert,
            self._use_mtls_endpoint,
            self._universe_domain_env,
        ) = BigQueryReadClient._read_environment_variables()
        self._client_cert_source = BigQueryReadClient._get_client_cert_source(
            self._client_options.client_cert_source, self._use_client_cert
        )
        self._universe_domain = BigQueryReadClient._get_universe_domain(
            universe_domain_opt, self._universe_domain_env
        )
        self._api_endpoint = None  # updated below, depending on `transport`

        # Initialize the universe domain validation.
        self._is_universe_domain_valid = False

        api_key_value = getattr(self._client_options, "api_key", None)
        if api_key_value and credentials:
            raise ValueError(
                "client_options.api_key and credentials are mutually exclusive"
            )

        # Save or instantiate the transport.
        # Ordinarily, we provide the transport, but allowing a custom transport
        # instance provides an extensibility point for unusual situations.
        transport_provided = isinstance(transport, BigQueryReadTransport)
        if transport_provided:
            # transport is a BigQueryReadTransport instance.
            if credentials or self._client_options.credentials_file or api_key_value:
                raise ValueError(
                    "When providing a transport instance, "
                    "provide its credentials directly."
                )
            if self._client_options.scopes:
                raise ValueError(
                    "When providing a transport instance, provide its scopes "
                    "directly."
                )
            self._transport = cast(BigQueryReadTransport, transport)
            self._api_endpoint = self._transport.host

        self._api_endpoint = self._api_endpoint or BigQueryReadClient._get_api_endpoint(
            self._client_options.api_endpoint,
            self._client_cert_source,
            self._universe_domain,
            self._use_mtls_endpoint,
        )

        if not transport_provided:
            import google.auth._default  # type: ignore

            if api_key_value and hasattr(
                google.auth._default, "get_api_key_credentials"
            ):
                credentials = google.auth._default.get_api_key_credentials(
                    api_key_value
                )

            Transport = type(self).get_transport_class(cast(str, transport))
            self._transport = Transport(
                credentials=credentials,
                credentials_file=self._client_options.credentials_file,
                host=self._api_endpoint,
                scopes=self._client_options.scopes,
                client_cert_source_for_mtls=self._client_cert_source,
                quota_project_id=self._client_options.quota_project_id,
                client_info=client_info,
                always_use_jwt_access=True,
                api_audience=self._client_options.api_audience,
            )
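
    # A minimal construction sketch. Both calls below rely on Application
    # Default Credentials; the endpoint override is illustrative:
    #
    #     client = BigQueryReadClient()
    #
    #     from google.api_core.client_options import ClientOptions
    #     client = BigQueryReadClient(
    #         client_options=ClientOptions(
    #             api_endpoint="bigquerystorage.googleapis.com"
    #         )
    #     )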

    def create_read_session(
        self,
        request: Optional[Union[storage.CreateReadSessionRequest, dict]] = None,
        *,
        parent: Optional[str] = None,
        read_session: Optional[stream.ReadSession] = None,
        max_stream_count: Optional[int] = None,
        retry: OptionalRetry = gapic_v1.method.DEFAULT,
        timeout: Union[float, object] = gapic_v1.method.DEFAULT,
        metadata: Sequence[Tuple[str, str]] = (),
    ) -> stream.ReadSession:
        r"""Creates a new read session. A read session divides
        the contents of a BigQuery table into one or more
        streams, which can then be used to read data from the
        table. The read session also specifies properties of the
        data to be read, such as a list of columns or a
        push-down filter describing the rows to be returned.

        A particular row can be read by at most one stream. When
        the caller has reached the end of each stream in the
        session, then all the data in the table has been read.

        Data is assigned to each stream such that roughly the
        same number of rows can be read from each stream.
        Because the server-side unit for assigning data is
        collections of rows, the API does not guarantee that
        each stream will return the same number of rows.
        Additionally, the limits are enforced based on the
        number of pre-filtered rows, so some filters can lead to
        lopsided assignments.

        Read sessions automatically expire 6 hours after they
        are created and do not require manual clean-up by the
        caller.

        .. code-block:: python

            # This snippet has been automatically generated and should be regarded as a
            # code template only.
            # It will require modifications to work:
            # - It may require correct/in-range values for request initialization.
            # - It may require specifying regional endpoints when creating the service
            #   client as shown in:
            #   https://googleapis.dev/python/google-api-core/latest/client_options.html
            from google.cloud import bigquery_storage_v1

            def sample_create_read_session():
                # Create a client
                client = bigquery_storage_v1.BigQueryReadClient()

                # Initialize request argument(s)
                request = bigquery_storage_v1.CreateReadSessionRequest(
                    parent="parent_value",
                )

                # Make the request
                response = client.create_read_session(request=request)

                # Handle the response
                print(response)

        Args:
            request (Union[google.cloud.bigquery_storage_v1.types.CreateReadSessionRequest, dict]):
                The request object. Request message for ``CreateReadSession``.
            parent (str):
                Required. The request project that owns the session, in
                the form of ``projects/{project_id}``.

                This corresponds to the ``parent`` field
                on the ``request`` instance; if ``request`` is provided, this
                should not be set.
            read_session (google.cloud.bigquery_storage_v1.types.ReadSession):
                Required. Session to be created.
                This corresponds to the ``read_session`` field
                on the ``request`` instance; if ``request`` is provided, this
                should not be set.
            max_stream_count (int):
                Max initial number of streams. If unset or zero, the
                server will provide a value of streams so as to produce
                reasonable throughput. Must be non-negative. The number
                of streams may be lower than the requested number,
                depending on the amount of parallelism that is
                reasonable for the table. There is a default system max
                limit of 1,000.

                This must be greater than or equal to
                preferred_min_stream_count. Typically, clients should
                either leave this unset to let the system determine an
                upper bound OR set this to a size for the maximum "units
                of work" it can gracefully handle.

                This corresponds to the ``max_stream_count`` field
                on the ``request`` instance; if ``request`` is provided, this
                should not be set.
            retry (google.api_core.retry.Retry): Designation of what errors, if any,
                should be retried.
            timeout (float): The timeout for this request.
            metadata (Sequence[Tuple[str, str]]): Strings which should be
                sent along with the request as metadata.

        Returns:
            google.cloud.bigquery_storage_v1.types.ReadSession:
                Information about the ReadSession.
        """
        # Create or coerce a protobuf request object.
        # Quick check: If we got a request object, we should *not* have
        # gotten any keyword arguments that map to the request.
        has_flattened_params = any([parent, read_session, max_stream_count])
        if request is not None and has_flattened_params:
            raise ValueError(
                "If the `request` argument is set, then none of "
                "the individual field arguments should be set."
            )

        # Minor optimization to avoid making a copy if the user passes
        # in a storage.CreateReadSessionRequest.
        # There's no risk of modifying the input as we've already verified
        # there are no flattened fields.
        if not isinstance(request, storage.CreateReadSessionRequest):
            request = storage.CreateReadSessionRequest(request)
            # If we have keyword arguments corresponding to fields on the
            # request, apply these.
            if parent is not None:
                request.parent = parent
            if read_session is not None:
                request.read_session = read_session
            if max_stream_count is not None:
                request.max_stream_count = max_stream_count

        # Wrap the RPC method; this adds retry and timeout information,
        # and friendly error handling.
        rpc = self._transport._wrapped_methods[self._transport.create_read_session]

        # Certain fields should be provided within the metadata header;
        # add these here.
        metadata = tuple(metadata) + (
            gapic_v1.routing_header.to_grpc_metadata(
                (("read_session.table", request.read_session.table),)
            ),
        )

        # Validate the universe domain.
        self._validate_universe_domain()

        # Send the request.
        response = rpc(
            request,
            retry=retry,
            timeout=timeout,
            metadata=metadata,
        )

        # Done; return the response.
        return response
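
    # Flattened-argument sketch (project, dataset, and table IDs are
    # placeholders; DataFormat.ARROW is one of the supported formats):
    #
    #     from google.cloud.bigquery_storage_v1 import types
    #
    #     session = client.create_read_session(
    #         parent="projects/my-project",
    #         read_session=types.ReadSession(
    #             table="projects/my-project/datasets/my_dataset/tables/t",
    #             data_format=types.DataFormat.ARROW,
    #         ),
    #         max_stream_count=1,
    #     )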

    def read_rows(
        self,
        request: Optional[Union[storage.ReadRowsRequest, dict]] = None,
        *,
        read_stream: Optional[str] = None,
        offset: Optional[int] = None,
        retry: OptionalRetry = gapic_v1.method.DEFAULT,
        timeout: Union[float, object] = gapic_v1.method.DEFAULT,
        metadata: Sequence[Tuple[str, str]] = (),
    ) -> Iterable[storage.ReadRowsResponse]:
        r"""Reads rows from the stream in the format prescribed
        by the ReadSession. Each response contains one or more
        table rows, up to a maximum of 100 MiB per response;
        read requests which attempt to read individual rows
        larger than 100 MiB will fail.

        Each request also returns a set of stream statistics
        reflecting the current state of the stream.

        .. code-block:: python

            # This snippet has been automatically generated and should be regarded as a
            # code template only.
            # It will require modifications to work:
            # - It may require correct/in-range values for request initialization.
            # - It may require specifying regional endpoints when creating the service
            #   client as shown in:
            #   https://googleapis.dev/python/google-api-core/latest/client_options.html
            from google.cloud import bigquery_storage_v1

            def sample_read_rows():
                # Create a client
                client = bigquery_storage_v1.BigQueryReadClient()

                # Initialize request argument(s)
                request = bigquery_storage_v1.ReadRowsRequest(
                    read_stream="read_stream_value",
                )

                # Make the request
                stream = client.read_rows(request=request)

                # Handle the response
                for response in stream:
                    print(response)

        Args:
            request (Union[google.cloud.bigquery_storage_v1.types.ReadRowsRequest, dict]):
                The request object. Request message for ``ReadRows``.
            read_stream (str):
                Required. Stream to read rows from.
                This corresponds to the ``read_stream`` field
                on the ``request`` instance; if ``request`` is provided, this
                should not be set.
            offset (int):
                The offset requested must be less than the last row
                already read from the stream (via ReadRows). Requesting
                a larger offset is undefined. If not specified, start
                reading from offset zero.

                This corresponds to the ``offset`` field
                on the ``request`` instance; if ``request`` is provided, this
                should not be set.
            retry (google.api_core.retry.Retry): Designation of what errors, if any,
                should be retried.
            timeout (float): The timeout for this request.
            metadata (Sequence[Tuple[str, str]]): Strings which should be
                sent along with the request as metadata.

        Returns:
            Iterable[google.cloud.bigquery_storage_v1.types.ReadRowsResponse]:
                Response from calling ReadRows may include row data, progress and
                throttling information.

        """
        # Create or coerce a protobuf request object.
        # Quick check: If we got a request object, we should *not* have
        # gotten any keyword arguments that map to the request.
        has_flattened_params = any([read_stream, offset])
        if request is not None and has_flattened_params:
            raise ValueError(
                "If the `request` argument is set, then none of "
                "the individual field arguments should be set."
            )

        # Minor optimization to avoid making a copy if the user passes
        # in a storage.ReadRowsRequest.
        # There's no risk of modifying the input as we've already verified
        # there are no flattened fields.
        if not isinstance(request, storage.ReadRowsRequest):
            request = storage.ReadRowsRequest(request)
            # If we have keyword arguments corresponding to fields on the
            # request, apply these.
            if read_stream is not None:
                request.read_stream = read_stream
            if offset is not None:
                request.offset = offset

        # Wrap the RPC method; this adds retry and timeout information,
        # and friendly error handling.
        rpc = self._transport._wrapped_methods[self._transport.read_rows]

        # Certain fields should be provided within the metadata header;
        # add these here.
        metadata = tuple(metadata) + (
            gapic_v1.routing_header.to_grpc_metadata(
                (("read_stream", request.read_stream),)
            ),
        )

        # Validate the universe domain.
        self._validate_universe_domain()

        # Send the request.
        response = rpc(
            request,
            retry=retry,
            timeout=timeout,
            metadata=metadata,
        )

        # Done; return the response.
        return response
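
    # Streaming sketch, continuing from the create_read_session sketch above
    # (assumes the session was created with at least one stream):
    #
    #     reader = client.read_rows(read_stream=session.streams[0].name)
    #     for response in reader:
    #         # Each response carries a serialized block of rows plus
    #         # progress and throttling statistics.
    #         print(response.row_count)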

    def split_read_stream(
        self,
        request: Optional[Union[storage.SplitReadStreamRequest, dict]] = None,
        *,
        retry: OptionalRetry = gapic_v1.method.DEFAULT,
        timeout: Union[float, object] = gapic_v1.method.DEFAULT,
        metadata: Sequence[Tuple[str, str]] = (),
    ) -> storage.SplitReadStreamResponse:
        r"""Splits a given ``ReadStream`` into two ``ReadStream`` objects.
        These ``ReadStream`` objects are referred to as the primary and
        the residual streams of the split. The original ``ReadStream``
        can still be read from in the same manner as before. Both of the
        returned ``ReadStream`` objects can also be read from, and the
        rows returned by both child streams will be the same as the rows
        read from the original stream.

        Moreover, the two child streams will be allocated back-to-back
        in the original ``ReadStream``. Concretely, it is guaranteed
        that for streams original, primary, and residual, that
        original[0-j] = primary[0-j] and original[j-n] = residual[0-m]
        once the streams have been read to completion.

        .. code-block:: python

            # This snippet has been automatically generated and should be regarded as a
            # code template only.
            # It will require modifications to work:
            # - It may require correct/in-range values for request initialization.
            # - It may require specifying regional endpoints when creating the service
            #   client as shown in:
            #   https://googleapis.dev/python/google-api-core/latest/client_options.html
            from google.cloud import bigquery_storage_v1

            def sample_split_read_stream():
                # Create a client
                client = bigquery_storage_v1.BigQueryReadClient()

                # Initialize request argument(s)
                request = bigquery_storage_v1.SplitReadStreamRequest(
                    name="name_value",
                )

                # Make the request
                response = client.split_read_stream(request=request)

                # Handle the response
                print(response)

        Args:
            request (Union[google.cloud.bigquery_storage_v1.types.SplitReadStreamRequest, dict]):
                The request object. Request message for ``SplitReadStream``.
            retry (google.api_core.retry.Retry): Designation of what errors, if any,
                should be retried.
            timeout (float): The timeout for this request.
            metadata (Sequence[Tuple[str, str]]): Strings which should be
                sent along with the request as metadata.

        Returns:
            google.cloud.bigquery_storage_v1.types.SplitReadStreamResponse:
                Response message for SplitReadStream.
        """
        # Create or coerce a protobuf request object.
        # Minor optimization to avoid making a copy if the user passes
        # in a storage.SplitReadStreamRequest.
        # There's no risk of modifying the input, as this method has no
        # flattened fields to apply.
        if not isinstance(request, storage.SplitReadStreamRequest):
            request = storage.SplitReadStreamRequest(request)

        # Wrap the RPC method; this adds retry and timeout information,
        # and friendly error handling.
        rpc = self._transport._wrapped_methods[self._transport.split_read_stream]

        # Certain fields should be provided within the metadata header;
        # add these here.
        metadata = tuple(metadata) + (
            gapic_v1.routing_header.to_grpc_metadata((("name", request.name),)),
        )

        # Validate the universe domain.
        self._validate_universe_domain()

        # Send the request.
        response = rpc(
            request,
            retry=retry,
            timeout=timeout,
            metadata=metadata,
        )

        # Done; return the response.
        return response
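
    # Split sketch (the stream name continues the earlier sketches; fraction
    # picks the approximate split point within the stream):
    #
    #     result = client.split_read_stream(
    #         request={"name": session.streams[0].name, "fraction": 0.5}
    #     )
    #     primary = result.primary_stream
    #     remainder = result.remainder_stream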

    def __enter__(self) -> "BigQueryReadClient":
        return self

    def __exit__(self, type, value, traceback):
        """Releases underlying transport's resources.

        .. warning::
            ONLY use as a context manager if the transport is NOT shared
            with other clients! Exiting the with block will CLOSE the transport
            and may cause errors in other clients!
        """
        self.transport.close()
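
    # Context-manager sketch (per the warning above, only when the transport
    # is not shared with other clients):
    #
    #     with BigQueryReadClient() as client:
    #         ...  # the transport is closed automatically on exit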


DEFAULT_CLIENT_INFO = gapic_v1.client_info.ClientInfo(
    gapic_version=package_version.__version__
)


__all__ = ("BigQueryReadClient",)