# -*- coding: utf-8 -*-
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from collections import OrderedDict
from http import HTTPStatus
import json
import logging as std_logging
import os
import re
from typing import (
    Callable,
    Dict,
    Iterable,
    Mapping,
    MutableMapping,
    MutableSequence,
    Optional,
    Sequence,
    Tuple,
    Type,
    Union,
    cast,
)
import warnings

from google.api_core import client_options as client_options_lib
from google.api_core import exceptions as core_exceptions
from google.api_core import gapic_v1
from google.api_core import retry as retries
from google.auth import credentials as ga_credentials  # type: ignore
from google.auth.exceptions import MutualTLSChannelError  # type: ignore
from google.auth.transport import mtls  # type: ignore
from google.auth.transport.grpc import SslCredentials  # type: ignore
from google.oauth2 import service_account  # type: ignore
import google.protobuf

from google.cloud.bigquery_storage_v1 import gapic_version as package_version

try:
    OptionalRetry = Union[retries.Retry, gapic_v1.method._MethodDefault, None]
except AttributeError:  # pragma: NO COVER
    OptionalRetry = Union[retries.Retry, object, None]  # type: ignore

try:
    from google.api_core import client_logging  # type: ignore

    CLIENT_LOGGING_SUPPORTED = True  # pragma: NO COVER
except ImportError:  # pragma: NO COVER
    CLIENT_LOGGING_SUPPORTED = False

_LOGGER = std_logging.getLogger(__name__)

from google.protobuf import timestamp_pb2  # type: ignore

from google.cloud.bigquery_storage_v1.types import arrow, avro, storage, stream

from .transports.base import DEFAULT_CLIENT_INFO, BigQueryReadTransport
from .transports.grpc import BigQueryReadGrpcTransport
from .transports.grpc_asyncio import BigQueryReadGrpcAsyncIOTransport


class BigQueryReadClientMeta(type):
    """Metaclass for the BigQueryRead client.

    This provides class-level methods for building and retrieving
    support objects (e.g. transport) without polluting the client instance
    objects.
    """

    _transport_registry = OrderedDict()  # type: Dict[str, Type[BigQueryReadTransport]]
    _transport_registry["grpc"] = BigQueryReadGrpcTransport
    _transport_registry["grpc_asyncio"] = BigQueryReadGrpcAsyncIOTransport

    def get_transport_class(
        cls,
        label: Optional[str] = None,
    ) -> Type[BigQueryReadTransport]:
        """Returns an appropriate transport class.

        Args:
            label: The name of the desired transport. If none is
                provided, then the first transport in the registry is used.

        Returns:
            The transport class to use.
        """
        # If a specific transport is requested, return that one.
        if label:
            return cls._transport_registry[label]

        # No transport is requested; return the default (that is, the first one
        # in the dictionary).
        return next(iter(cls._transport_registry.values()))


class BigQueryReadClient(metaclass=BigQueryReadClientMeta):
    """BigQuery Read API.

    The Read API can be used to read data from BigQuery.
    """

    @staticmethod
    def _get_default_mtls_endpoint(api_endpoint):
        """Converts api endpoint to mTLS endpoint.

        Convert "*.sandbox.googleapis.com" and "*.googleapis.com" to
        "*.mtls.sandbox.googleapis.com" and "*.mtls.googleapis.com" respectively.
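
        For example, ``bigquerystorage.googleapis.com`` becomes
        ``bigquerystorage.mtls.googleapis.com``.
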
        Args:
            api_endpoint (Optional[str]): the api endpoint to convert.
        Returns:
            str: converted mTLS api endpoint.
        """
        if not api_endpoint:
            return api_endpoint

        mtls_endpoint_re = re.compile(
            r"(?P<name>[^.]+)(?P<mtls>\.mtls)?(?P<sandbox>\.sandbox)?(?P<googledomain>\.googleapis\.com)?"
        )

        m = mtls_endpoint_re.match(api_endpoint)
        name, mtls, sandbox, googledomain = m.groups()
        if mtls or not googledomain:
            return api_endpoint

        if sandbox:
            return api_endpoint.replace(
                "sandbox.googleapis.com", "mtls.sandbox.googleapis.com"
            )

        return api_endpoint.replace(".googleapis.com", ".mtls.googleapis.com")

    # Note: DEFAULT_ENDPOINT is deprecated. Use _DEFAULT_ENDPOINT_TEMPLATE instead.
    DEFAULT_ENDPOINT = "bigquerystorage.googleapis.com"
    DEFAULT_MTLS_ENDPOINT = _get_default_mtls_endpoint.__func__(  # type: ignore
        DEFAULT_ENDPOINT
    )

    _DEFAULT_ENDPOINT_TEMPLATE = "bigquerystorage.{UNIVERSE_DOMAIN}"
    _DEFAULT_UNIVERSE = "googleapis.com"

    @classmethod
    def from_service_account_info(cls, info: dict, *args, **kwargs):
        """Creates an instance of this client using the provided credentials
        info.

        Args:
            info (dict): The service account private key info.
            args: Additional arguments to pass to the constructor.
            kwargs: Additional arguments to pass to the constructor.

        Returns:
            BigQueryReadClient: The constructed client.
        """
        credentials = service_account.Credentials.from_service_account_info(info)
        kwargs["credentials"] = credentials
        return cls(*args, **kwargs)

    @classmethod
    def from_service_account_file(cls, filename: str, *args, **kwargs):
        """Creates an instance of this client using the provided credentials
        file.

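        Example (a minimal sketch; ``key.json`` below is a hypothetical
        path to a downloaded service account key file):

        .. code-block:: python

            from google.cloud import bigquery_storage_v1

            client = bigquery_storage_v1.BigQueryReadClient.from_service_account_file(
                "key.json"
            )
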
        Args:
            filename (str): The path to the service account private key json
                file.
            args: Additional arguments to pass to the constructor.
            kwargs: Additional arguments to pass to the constructor.

        Returns:
            BigQueryReadClient: The constructed client.
        """
        credentials = service_account.Credentials.from_service_account_file(filename)
        kwargs["credentials"] = credentials
        return cls(*args, **kwargs)

    from_service_account_json = from_service_account_file

    @property
    def transport(self) -> BigQueryReadTransport:
        """Returns the transport used by the client instance.

        Returns:
            BigQueryReadTransport: The transport used by the client
                instance.
        """
        return self._transport

    @staticmethod
    def read_session_path(
        project: str,
        location: str,
        session: str,
    ) -> str:
        """Returns a fully-qualified read_session string."""
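        # e.g. read_session_path("my-project", "us", "abc")
        #      -> "projects/my-project/locations/us/sessions/abc"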
        return "projects/{project}/locations/{location}/sessions/{session}".format(
            project=project,
            location=location,
            session=session,
        )

    @staticmethod
    def parse_read_session_path(path: str) -> Dict[str, str]:
        """Parses a read_session path into its component segments."""
        m = re.match(
            r"^projects/(?P<project>.+?)/locations/(?P<location>.+?)/sessions/(?P<session>.+?)$",
            path,
        )
        return m.groupdict() if m else {}

    @staticmethod
    def read_stream_path(
        project: str,
        location: str,
        session: str,
        stream: str,
    ) -> str:
        """Returns a fully-qualified read_stream string."""
        return "projects/{project}/locations/{location}/sessions/{session}/streams/{stream}".format(
            project=project,
            location=location,
            session=session,
            stream=stream,
        )

    @staticmethod
    def parse_read_stream_path(path: str) -> Dict[str, str]:
        """Parses a read_stream path into its component segments."""
        m = re.match(
            r"^projects/(?P<project>.+?)/locations/(?P<location>.+?)/sessions/(?P<session>.+?)/streams/(?P<stream>.+?)$",
            path,
        )
        return m.groupdict() if m else {}

    @staticmethod
    def table_path(
        project: str,
        dataset: str,
        table: str,
    ) -> str:
        """Returns a fully-qualified table string."""
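        # e.g. table_path("my-project", "my_dataset", "my_table")
        #      -> "projects/my-project/datasets/my_dataset/tables/my_table"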
        return "projects/{project}/datasets/{dataset}/tables/{table}".format(
            project=project,
            dataset=dataset,
            table=table,
        )

    @staticmethod
    def parse_table_path(path: str) -> Dict[str, str]:
        """Parses a table path into its component segments."""
        m = re.match(
            r"^projects/(?P<project>.+?)/datasets/(?P<dataset>.+?)/tables/(?P<table>.+?)$",
            path,
        )
        return m.groupdict() if m else {}

    @staticmethod
    def common_billing_account_path(
        billing_account: str,
    ) -> str:
        """Returns a fully-qualified billing_account string."""
        return "billingAccounts/{billing_account}".format(
            billing_account=billing_account,
        )

    @staticmethod
    def parse_common_billing_account_path(path: str) -> Dict[str, str]:
        """Parse a billing_account path into its component segments."""
        m = re.match(r"^billingAccounts/(?P<billing_account>.+?)$", path)
        return m.groupdict() if m else {}

    @staticmethod
    def common_folder_path(
        folder: str,
    ) -> str:
        """Returns a fully-qualified folder string."""
        return "folders/{folder}".format(
            folder=folder,
        )

    @staticmethod
    def parse_common_folder_path(path: str) -> Dict[str, str]:
        """Parse a folder path into its component segments."""
        m = re.match(r"^folders/(?P<folder>.+?)$", path)
        return m.groupdict() if m else {}

    @staticmethod
    def common_organization_path(
        organization: str,
    ) -> str:
        """Returns a fully-qualified organization string."""
        return "organizations/{organization}".format(
            organization=organization,
        )

    @staticmethod
    def parse_common_organization_path(path: str) -> Dict[str, str]:
        """Parse an organization path into its component segments."""
        m = re.match(r"^organizations/(?P<organization>.+?)$", path)
        return m.groupdict() if m else {}

    @staticmethod
    def common_project_path(
        project: str,
    ) -> str:
        """Returns a fully-qualified project string."""
        return "projects/{project}".format(
            project=project,
        )

    @staticmethod
    def parse_common_project_path(path: str) -> Dict[str, str]:
        """Parse a project path into its component segments."""
        m = re.match(r"^projects/(?P<project>.+?)$", path)
        return m.groupdict() if m else {}

    @staticmethod
    def common_location_path(
        project: str,
        location: str,
    ) -> str:
        """Returns a fully-qualified location string."""
        return "projects/{project}/locations/{location}".format(
            project=project,
            location=location,
        )

    @staticmethod
    def parse_common_location_path(path: str) -> Dict[str, str]:
        """Parse a location path into its component segments."""
        m = re.match(r"^projects/(?P<project>.+?)/locations/(?P<location>.+?)$", path)
        return m.groupdict() if m else {}

    @classmethod
    def get_mtls_endpoint_and_cert_source(
        cls, client_options: Optional[client_options_lib.ClientOptions] = None
    ):
        """Deprecated. Return the API endpoint and client cert source for mutual TLS.

        The client cert source is determined in the following order:
        (1) if `GOOGLE_API_USE_CLIENT_CERTIFICATE` environment variable is not "true", the
        client cert source is None.
        (2) if `client_options.client_cert_source` is provided, use the provided one; if the
        default client cert source exists, use the default one; otherwise the client cert
        source is None.

        The API endpoint is determined in the following order:
        (1) if `client_options.api_endpoint` is provided, use the provided one.
        (2) if the `GOOGLE_API_USE_MTLS_ENDPOINT` environment variable is "always", use the
        default mTLS endpoint; if the environment variable is "never", use the default API
        endpoint; otherwise, if a client cert source exists, use the default mTLS endpoint,
        otherwise use the default API endpoint.

        More details can be found at https://google.aip.dev/auth/4114.

        Args:
            client_options (google.api_core.client_options.ClientOptions): Custom options for the
                client. Only the `api_endpoint` and `client_cert_source` properties may be used
                in this method.

        Returns:
            Tuple[str, Callable[[], Tuple[bytes, bytes]]]: returns the API endpoint and the
                client cert source to use.

        Raises:
            google.auth.exceptions.MutualTLSChannelError: If any errors happen.
        """

        warnings.warn(
            "get_mtls_endpoint_and_cert_source is deprecated. Use the api_endpoint property instead.",
            DeprecationWarning,
        )
        if client_options is None:
            client_options = client_options_lib.ClientOptions()
        use_client_cert = os.getenv("GOOGLE_API_USE_CLIENT_CERTIFICATE", "false")
        use_mtls_endpoint = os.getenv("GOOGLE_API_USE_MTLS_ENDPOINT", "auto")
        if use_client_cert not in ("true", "false"):
            raise ValueError(
                "Environment variable `GOOGLE_API_USE_CLIENT_CERTIFICATE` must be either `true` or `false`"
            )
        if use_mtls_endpoint not in ("auto", "never", "always"):
            raise MutualTLSChannelError(
                "Environment variable `GOOGLE_API_USE_MTLS_ENDPOINT` must be `never`, `auto` or `always`"
            )

        # Figure out the client cert source to use.
        client_cert_source = None
        if use_client_cert == "true":
            if client_options.client_cert_source:
                client_cert_source = client_options.client_cert_source
            elif mtls.has_default_client_cert_source():
                client_cert_source = mtls.default_client_cert_source()

        # Figure out which api endpoint to use.
        if client_options.api_endpoint is not None:
            api_endpoint = client_options.api_endpoint
        elif use_mtls_endpoint == "always" or (
            use_mtls_endpoint == "auto" and client_cert_source
        ):
            api_endpoint = cls.DEFAULT_MTLS_ENDPOINT
        else:
            api_endpoint = cls.DEFAULT_ENDPOINT

        return api_endpoint, client_cert_source

    @staticmethod
    def _read_environment_variables():
        """Returns the environment variables used by the client.

        Returns:
            Tuple[bool, str, str]: returns the GOOGLE_API_USE_CLIENT_CERTIFICATE,
            GOOGLE_API_USE_MTLS_ENDPOINT, and GOOGLE_CLOUD_UNIVERSE_DOMAIN environment variables.

        Raises:
            ValueError: If GOOGLE_API_USE_CLIENT_CERTIFICATE is not
                any of ["true", "false"].
            google.auth.exceptions.MutualTLSChannelError: If GOOGLE_API_USE_MTLS_ENDPOINT
                is not any of ["auto", "never", "always"].
        """
        use_client_cert = os.getenv(
            "GOOGLE_API_USE_CLIENT_CERTIFICATE", "false"
        ).lower()
        use_mtls_endpoint = os.getenv("GOOGLE_API_USE_MTLS_ENDPOINT", "auto").lower()
        universe_domain_env = os.getenv("GOOGLE_CLOUD_UNIVERSE_DOMAIN")
        if use_client_cert not in ("true", "false"):
            raise ValueError(
                "Environment variable `GOOGLE_API_USE_CLIENT_CERTIFICATE` must be either `true` or `false`"
            )
        if use_mtls_endpoint not in ("auto", "never", "always"):
            raise MutualTLSChannelError(
                "Environment variable `GOOGLE_API_USE_MTLS_ENDPOINT` must be `never`, `auto` or `always`"
            )
        return use_client_cert == "true", use_mtls_endpoint, universe_domain_env

    @staticmethod
    def _get_client_cert_source(provided_cert_source, use_cert_flag):
        """Return the client cert source to be used by the client.

        Args:
            provided_cert_source (bytes): The client certificate source provided.
            use_cert_flag (bool): A flag indicating whether to use the client certificate.

        Returns:
            bytes or None: The client cert source to be used by the client.
        """
        client_cert_source = None
        if use_cert_flag:
            if provided_cert_source:
                client_cert_source = provided_cert_source
            elif mtls.has_default_client_cert_source():
                client_cert_source = mtls.default_client_cert_source()
        return client_cert_source

    @staticmethod
    def _get_api_endpoint(
        api_override, client_cert_source, universe_domain, use_mtls_endpoint
    ):
        """Return the API endpoint used by the client.

        Args:
            api_override (str): The API endpoint override. If specified, this is always
                the return value of this function and the other arguments are not used.
            client_cert_source (bytes): The client certificate source used by the client.
            universe_domain (str): The universe domain used by the client.
            use_mtls_endpoint (str): How to use the mTLS endpoint, which depends also on the other parameters.
                Possible values are "always", "auto", or "never".

        Returns:
            str: The API endpoint to be used by the client.
        """
        if api_override is not None:
            api_endpoint = api_override
        elif use_mtls_endpoint == "always" or (
            use_mtls_endpoint == "auto" and client_cert_source
        ):
            _default_universe = BigQueryReadClient._DEFAULT_UNIVERSE
            if universe_domain != _default_universe:
                raise MutualTLSChannelError(
                    f"mTLS is not supported in any universe other than {_default_universe}."
                )
            api_endpoint = BigQueryReadClient.DEFAULT_MTLS_ENDPOINT
        else:
            api_endpoint = BigQueryReadClient._DEFAULT_ENDPOINT_TEMPLATE.format(
                UNIVERSE_DOMAIN=universe_domain
            )
        return api_endpoint

    @staticmethod
    def _get_universe_domain(
        client_universe_domain: Optional[str], universe_domain_env: Optional[str]
    ) -> str:
        """Return the universe domain used by the client.

        Args:
            client_universe_domain (Optional[str]): The universe domain configured via the client options.
            universe_domain_env (Optional[str]): The universe domain configured via the "GOOGLE_CLOUD_UNIVERSE_DOMAIN" environment variable.

        Returns:
            str: The universe domain to be used by the client.

        Raises:
            ValueError: If the universe domain is an empty string.
        """
        universe_domain = BigQueryReadClient._DEFAULT_UNIVERSE
        if client_universe_domain is not None:
            universe_domain = client_universe_domain
        elif universe_domain_env is not None:
            universe_domain = universe_domain_env
        if len(universe_domain.strip()) == 0:
            raise ValueError("Universe Domain cannot be an empty string.")
        return universe_domain

    def _validate_universe_domain(self):
        """Validates client's and credentials' universe domains are consistent.

        Returns:
            bool: True iff the configured universe domain is valid.

        Raises:
            ValueError: If the configured universe domain is not valid.
        """

        # NOTE (b/349488459): universe validation is disabled until further notice.
        return True

    def _add_cred_info_for_auth_errors(
        self, error: core_exceptions.GoogleAPICallError
    ) -> None:
        """Adds credential info string to error details for 401/403/404 errors.

        Args:
            error (google.api_core.exceptions.GoogleAPICallError): The error to add the cred info.
        """
        if error.code not in [
            HTTPStatus.UNAUTHORIZED,
            HTTPStatus.FORBIDDEN,
            HTTPStatus.NOT_FOUND,
        ]:
            return

        cred = self._transport._credentials

        # get_cred_info is only available in google-auth>=2.35.0
        if not hasattr(cred, "get_cred_info"):
            return

        # ignore the type check since pypy test fails when get_cred_info
        # is not available
        cred_info = cred.get_cred_info()  # type: ignore
        if cred_info and hasattr(error._details, "append"):
            error._details.append(json.dumps(cred_info))

    @property
    def api_endpoint(self):
        """Return the API endpoint used by the client instance.

        Returns:
            str: The API endpoint used by the client instance.
        """
        return self._api_endpoint

    @property
    def universe_domain(self) -> str:
        """Return the universe domain used by the client instance.

        Returns:
            str: The universe domain used by the client instance.
        """
        return self._universe_domain

    def __init__(
        self,
        *,
        credentials: Optional[ga_credentials.Credentials] = None,
        transport: Optional[
            Union[str, BigQueryReadTransport, Callable[..., BigQueryReadTransport]]
        ] = None,
        client_options: Optional[Union[client_options_lib.ClientOptions, dict]] = None,
        client_info: gapic_v1.client_info.ClientInfo = DEFAULT_CLIENT_INFO,
    ) -> None:
        """Instantiates the big query read client.

        Args:
            credentials (Optional[google.auth.credentials.Credentials]): The
                authorization credentials to attach to requests. These
                credentials identify the application to the service; if none
                are specified, the client will attempt to ascertain the
                credentials from the environment.
            transport (Optional[Union[str,BigQueryReadTransport,Callable[..., BigQueryReadTransport]]]):
                The transport to use, or a Callable that constructs and returns a new transport.
                If a Callable is given, it will be called with the same set of initialization
                arguments as used in the BigQueryReadTransport constructor.
                If set to None, a transport is chosen automatically.
            client_options (Optional[Union[google.api_core.client_options.ClientOptions, dict]]):
                Custom options for the client.

                1. The ``api_endpoint`` property can be used to override the
                default endpoint provided by the client when ``transport`` is
                not explicitly provided. Only if this property is not set and
                ``transport`` was not explicitly provided, the endpoint is
                determined by the GOOGLE_API_USE_MTLS_ENDPOINT environment
                variable, which may have one of the following values:
                "always" (always use the default mTLS endpoint), "never" (always
                use the default regular endpoint) and "auto" (auto-switch to the
                default mTLS endpoint if a client certificate is present; this is
                the default value).

                2. If the GOOGLE_API_USE_CLIENT_CERTIFICATE environment variable
                is "true", then the ``client_cert_source`` property can be used
                to provide a client certificate for mTLS transport. If
                not provided, the default SSL client certificate will be used if
                present. If GOOGLE_API_USE_CLIENT_CERTIFICATE is "false" or not
                set, no client certificate will be used.

                3. The ``universe_domain`` property can be used to override the
                default "googleapis.com" universe. Note that the ``api_endpoint``
                property still takes precedence; and ``universe_domain`` is
                currently not supported for mTLS.

            client_info (google.api_core.gapic_v1.client_info.ClientInfo):
                The client info used to send a user-agent string along with
                API requests. If ``None``, then default info will be used.
                Generally, you only need to set this if you're developing
                your own client library.

        Raises:
            google.auth.exceptions.MutualTLSChannelError: If mutual TLS transport
                creation failed for any reason.
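
        Example (a minimal sketch; the endpoint shown is a hypothetical
        override, not a documented regional endpoint):

        .. code-block:: python

            from google.api_core.client_options import ClientOptions
            from google.cloud import bigquery_storage_v1

            options = ClientOptions(api_endpoint="us-bigquerystorage.googleapis.com")
            client = bigquery_storage_v1.BigQueryReadClient(client_options=options)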
        """
        self._client_options = client_options
        if isinstance(self._client_options, dict):
            self._client_options = client_options_lib.from_dict(self._client_options)
        if self._client_options is None:
            self._client_options = client_options_lib.ClientOptions()
        self._client_options = cast(
            client_options_lib.ClientOptions, self._client_options
        )

        universe_domain_opt = getattr(self._client_options, "universe_domain", None)

        (
            self._use_client_cert,
            self._use_mtls_endpoint,
            self._universe_domain_env,
        ) = BigQueryReadClient._read_environment_variables()
        self._client_cert_source = BigQueryReadClient._get_client_cert_source(
            self._client_options.client_cert_source, self._use_client_cert
        )
        self._universe_domain = BigQueryReadClient._get_universe_domain(
            universe_domain_opt, self._universe_domain_env
        )
        self._api_endpoint = None  # updated below, depending on `transport`

        # Initialize the universe domain validation.
        self._is_universe_domain_valid = False

        if CLIENT_LOGGING_SUPPORTED:  # pragma: NO COVER
            # Setup logging.
            client_logging.initialize_logging()

        api_key_value = getattr(self._client_options, "api_key", None)
        if api_key_value and credentials:
            raise ValueError(
                "client_options.api_key and credentials are mutually exclusive"
            )

        # Save or instantiate the transport.
        # Ordinarily, we provide the transport, but allowing a custom transport
        # instance provides an extensibility point for unusual situations.
        transport_provided = isinstance(transport, BigQueryReadTransport)
        if transport_provided:
            # transport is a BigQueryReadTransport instance.
            if credentials or self._client_options.credentials_file or api_key_value:
                raise ValueError(
                    "When providing a transport instance, "
                    "provide its credentials directly."
                )
            if self._client_options.scopes:
                raise ValueError(
                    "When providing a transport instance, provide its scopes "
                    "directly."
                )
            self._transport = cast(BigQueryReadTransport, transport)
            self._api_endpoint = self._transport.host

        self._api_endpoint = self._api_endpoint or BigQueryReadClient._get_api_endpoint(
            self._client_options.api_endpoint,
            self._client_cert_source,
            self._universe_domain,
            self._use_mtls_endpoint,
        )

        if not transport_provided:
            import google.auth._default  # type: ignore

            if api_key_value and hasattr(
                google.auth._default, "get_api_key_credentials"
            ):
                credentials = google.auth._default.get_api_key_credentials(
                    api_key_value
                )

            transport_init: Union[
                Type[BigQueryReadTransport], Callable[..., BigQueryReadTransport]
            ] = (
                BigQueryReadClient.get_transport_class(transport)
                if isinstance(transport, str) or transport is None
                else cast(Callable[..., BigQueryReadTransport], transport)
            )
            # initialize with the provided callable or the passed in class
            self._transport = transport_init(
                credentials=credentials,
                credentials_file=self._client_options.credentials_file,
                host=self._api_endpoint,
                scopes=self._client_options.scopes,
                client_cert_source_for_mtls=self._client_cert_source,
                quota_project_id=self._client_options.quota_project_id,
                client_info=client_info,
                always_use_jwt_access=True,
                api_audience=self._client_options.api_audience,
            )

        if "async" not in str(self._transport):
            if CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(
                std_logging.DEBUG
            ):  # pragma: NO COVER
                _LOGGER.debug(
                    "Created client `google.cloud.bigquery.storage_v1.BigQueryReadClient`.",
                    extra={
                        "serviceName": "google.cloud.bigquery.storage.v1.BigQueryRead",
                        "universeDomain": getattr(
                            self._transport._credentials, "universe_domain", ""
                        ),
                        "credentialsType": f"{type(self._transport._credentials).__module__}.{type(self._transport._credentials).__qualname__}",
                        "credentialsInfo": getattr(
                            self.transport._credentials, "get_cred_info", lambda: None
                        )(),
                    }
                    if hasattr(self._transport, "_credentials")
                    else {
                        "serviceName": "google.cloud.bigquery.storage.v1.BigQueryRead",
                        "credentialsType": None,
                    },
                )

    def create_read_session(
        self,
        request: Optional[Union[storage.CreateReadSessionRequest, dict]] = None,
        *,
        parent: Optional[str] = None,
        read_session: Optional[stream.ReadSession] = None,
        max_stream_count: Optional[int] = None,
        retry: OptionalRetry = gapic_v1.method.DEFAULT,
        timeout: Union[float, object] = gapic_v1.method.DEFAULT,
        metadata: Sequence[Tuple[str, Union[str, bytes]]] = (),
    ) -> stream.ReadSession:
        r"""Creates a new read session. A read session divides
        the contents of a BigQuery table into one or more
        streams, which can then be used to read data from the
        table. The read session also specifies properties of the
        data to be read, such as a list of columns or a
        push-down filter describing the rows to be returned.

        A particular row can be read by at most one stream. When
        the caller has reached the end of each stream in the
        session, then all the data in the table has been read.

        Data is assigned to each stream such that roughly the
        same number of rows can be read from each stream.
        Because the server-side unit for assigning data is
        collections of rows, the API does not guarantee that
        each stream will return the same number of rows.
        Additionally, the limits are enforced based on the
        number of pre-filtered rows, so some filters can lead to
        lopsided assignments.

        Read sessions automatically expire 6 hours after they
        are created and do not require manual clean-up by the
        caller.

        .. code-block:: python

            # This snippet has been automatically generated and should be regarded as a
            # code template only.
            # It will require modifications to work:
            # - It may require correct/in-range values for request initialization.
            # - It may require specifying regional endpoints when creating the service
            #   client as shown in:
            #   https://googleapis.dev/python/google-api-core/latest/client_options.html
            from google.cloud import bigquery_storage_v1

            def sample_create_read_session():
                # Create a client
                client = bigquery_storage_v1.BigQueryReadClient()

                # Initialize request argument(s)
                request = bigquery_storage_v1.CreateReadSessionRequest(
                    parent="parent_value",
                )

                # Make the request
                response = client.create_read_session(request=request)

                # Handle the response
                print(response)

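        The flattened arguments can be used instead of a request object
        (a minimal sketch; the project, dataset, and table names below are
        placeholders):

        .. code-block:: python

            from google.cloud import bigquery_storage_v1

            client = bigquery_storage_v1.BigQueryReadClient()
            session = client.create_read_session(
                parent="projects/my-project",
                read_session=bigquery_storage_v1.types.ReadSession(
                    table="projects/my-project/datasets/my_dataset/tables/my_table",
                    data_format=bigquery_storage_v1.types.DataFormat.ARROW,
                ),
                max_stream_count=1,
            )
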
        Args:
            request (Union[google.cloud.bigquery_storage_v1.types.CreateReadSessionRequest, dict]):
                The request object. Request message for ``CreateReadSession``.
            parent (str):
                Required. The request project that owns the session, in
                the form of ``projects/{project_id}``.

                This corresponds to the ``parent`` field
                on the ``request`` instance; if ``request`` is provided, this
                should not be set.
            read_session (google.cloud.bigquery_storage_v1.types.ReadSession):
                Required. Session to be created.
                This corresponds to the ``read_session`` field
                on the ``request`` instance; if ``request`` is provided, this
                should not be set.
            max_stream_count (int):
                Max initial number of streams. If unset or zero, the
                server will provide a value of streams so as to produce
                reasonable throughput. Must be non-negative. The number
                of streams may be lower than the requested number,
                depending on the amount of parallelism that is reasonable
                for the table. There is a default system max limit of
                1,000.

                This must be greater than or equal to
                preferred_min_stream_count. Typically, clients should
                either leave this unset to let the system determine
                an upper bound, or set this to the maximum number of
                "units of work" the client can gracefully handle.

                This corresponds to the ``max_stream_count`` field
                on the ``request`` instance; if ``request`` is provided, this
                should not be set.
            retry (google.api_core.retry.Retry): Designation of what errors, if any,
                should be retried.
            timeout (float): The timeout for this request.
            metadata (Sequence[Tuple[str, Union[str, bytes]]]): Key/value pairs which should be
                sent along with the request as metadata. Normally, each value must be of type `str`,
                but for metadata keys ending with the suffix `-bin`, the corresponding values must
                be of type `bytes`.

        Returns:
            google.cloud.bigquery_storage_v1.types.ReadSession:
                Information about the ReadSession.
        """
        # Create or coerce a protobuf request object.
        # - Quick check: If we got a request object, we should *not* have
        #   gotten any keyword arguments that map to the request.
        flattened_params = [parent, read_session, max_stream_count]
        has_flattened_params = (
            len([param for param in flattened_params if param is not None]) > 0
        )
        if request is not None and has_flattened_params:
            raise ValueError(
                "If the `request` argument is set, then none of "
                "the individual field arguments should be set."
            )

        # - Use the request object if provided (there's no risk of modifying the input as
        #   there are no flattened fields), or create one.
        if not isinstance(request, storage.CreateReadSessionRequest):
            request = storage.CreateReadSessionRequest(request)
            # If we have keyword arguments corresponding to fields on the
            # request, apply these.
            if parent is not None:
                request.parent = parent
            if read_session is not None:
                request.read_session = read_session
            if max_stream_count is not None:
                request.max_stream_count = max_stream_count

        # Wrap the RPC method; this adds retry and timeout information,
        # and friendly error handling.
        rpc = self._transport._wrapped_methods[self._transport.create_read_session]

        # Certain fields should be provided within the metadata header;
        # add these here.
        metadata = tuple(metadata) + (
            gapic_v1.routing_header.to_grpc_metadata(
                (("read_session.table", request.read_session.table),)
            ),
        )

        # Validate the universe domain.
        self._validate_universe_domain()

        # Send the request.
        response = rpc(
            request,
            retry=retry,
            timeout=timeout,
            metadata=metadata,
        )

        # Done; return the response.
        return response

    def read_rows(
        self,
        request: Optional[Union[storage.ReadRowsRequest, dict]] = None,
        *,
        read_stream: Optional[str] = None,
        offset: Optional[int] = None,
        retry: OptionalRetry = gapic_v1.method.DEFAULT,
        timeout: Union[float, object] = gapic_v1.method.DEFAULT,
        metadata: Sequence[Tuple[str, Union[str, bytes]]] = (),
    ) -> Iterable[storage.ReadRowsResponse]:
        r"""Reads rows from the stream in the format prescribed
        by the ReadSession. Each response contains one or more
        table rows, up to a maximum of 100 MiB per response;
        read requests which attempt to read individual rows
        larger than 100 MiB will fail.

        Each request also returns a set of stream statistics
        reflecting the current state of the stream.

        .. code-block:: python

            # This snippet has been automatically generated and should be regarded as a
            # code template only.
            # It will require modifications to work:
            # - It may require correct/in-range values for request initialization.
            # - It may require specifying regional endpoints when creating the service
            #   client as shown in:
            #   https://googleapis.dev/python/google-api-core/latest/client_options.html
            from google.cloud import bigquery_storage_v1

            def sample_read_rows():
                # Create a client
                client = bigquery_storage_v1.BigQueryReadClient()

                # Initialize request argument(s)
                request = bigquery_storage_v1.ReadRowsRequest(
                    read_stream="read_stream_value",
                )

                # Make the request
                stream = client.read_rows(request=request)

                # Handle the response
                for response in stream:
                    print(response)

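        If a read is interrupted, it can be resumed by requesting the stream
        again from the last offset the caller processed (a minimal sketch;
        ``last_offset`` is a hypothetical counter maintained by the caller):

        .. code-block:: python

            request = bigquery_storage_v1.ReadRowsRequest(
                read_stream="read_stream_value",
                offset=last_offset,
            )
            stream = client.read_rows(request=request)
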
        Args:
            request (Union[google.cloud.bigquery_storage_v1.types.ReadRowsRequest, dict]):
                The request object. Request message for ``ReadRows``.
            read_stream (str):
                Required. Stream to read rows from.
                This corresponds to the ``read_stream`` field
                on the ``request`` instance; if ``request`` is provided, this
                should not be set.
            offset (int):
                The offset requested must be less
                than the last row read from Read.
                Requesting a larger offset is undefined.
                If not specified, start reading from
                offset zero.

                This corresponds to the ``offset`` field
                on the ``request`` instance; if ``request`` is provided, this
                should not be set.
            retry (google.api_core.retry.Retry): Designation of what errors, if any,
                should be retried.
            timeout (float): The timeout for this request.
            metadata (Sequence[Tuple[str, Union[str, bytes]]]): Key/value pairs which should be
                sent along with the request as metadata. Normally, each value must be of type `str`,
                but for metadata keys ending with the suffix `-bin`, the corresponding values must
                be of type `bytes`.

        Returns:
            Iterable[google.cloud.bigquery_storage_v1.types.ReadRowsResponse]:
                Response from calling ReadRows may include row data, progress and
                throttling information.

        """
        # Create or coerce a protobuf request object.
        # - Quick check: If we got a request object, we should *not* have
        #   gotten any keyword arguments that map to the request.
        flattened_params = [read_stream, offset]
        has_flattened_params = (
            len([param for param in flattened_params if param is not None]) > 0
        )
        if request is not None and has_flattened_params:
            raise ValueError(
                "If the `request` argument is set, then none of "
                "the individual field arguments should be set."
            )

        # - Use the request object if provided (there's no risk of modifying the input as
        #   there are no flattened fields), or create one.
        if not isinstance(request, storage.ReadRowsRequest):
            request = storage.ReadRowsRequest(request)
            # If we have keyword arguments corresponding to fields on the
            # request, apply these.
            if read_stream is not None:
                request.read_stream = read_stream
            if offset is not None:
                request.offset = offset

        # Wrap the RPC method; this adds retry and timeout information,
        # and friendly error handling.
        rpc = self._transport._wrapped_methods[self._transport.read_rows]

        # Certain fields should be provided within the metadata header;
        # add these here.
        metadata = tuple(metadata) + (
            gapic_v1.routing_header.to_grpc_metadata(
                (("read_stream", request.read_stream),)
            ),
        )

        # Validate the universe domain.
        self._validate_universe_domain()

        # Send the request.
        response = rpc(
            request,
            retry=retry,
            timeout=timeout,
            metadata=metadata,
        )

        # Done; return the response.
        return response

    def split_read_stream(
        self,
        request: Optional[Union[storage.SplitReadStreamRequest, dict]] = None,
        *,
        retry: OptionalRetry = gapic_v1.method.DEFAULT,
        timeout: Union[float, object] = gapic_v1.method.DEFAULT,
        metadata: Sequence[Tuple[str, Union[str, bytes]]] = (),
    ) -> storage.SplitReadStreamResponse:
        r"""Splits a given ``ReadStream`` into two ``ReadStream`` objects.
        These ``ReadStream`` objects are referred to as the primary and
        the residual streams of the split. The original ``ReadStream``
        can still be read from in the same manner as before. Both of the
        returned ``ReadStream`` objects can also be read from, and the
        rows returned by both child streams will be the same as the rows
        read from the original stream.

        Moreover, the two child streams will be allocated back-to-back
        in the original ``ReadStream``. Concretely, it is guaranteed
        that for streams original, primary, and residual, that
        original[0-j] = primary[0-j] and original[j-n] = residual[0-m]
        once the streams have been read to completion.

        .. code-block:: python

            # This snippet has been automatically generated and should be regarded as a
            # code template only.
            # It will require modifications to work:
            # - It may require correct/in-range values for request initialization.
            # - It may require specifying regional endpoints when creating the service
            #   client as shown in:
            #   https://googleapis.dev/python/google-api-core/latest/client_options.html
            from google.cloud import bigquery_storage_v1

            def sample_split_read_stream():
                # Create a client
                client = bigquery_storage_v1.BigQueryReadClient()

                # Initialize request argument(s)
                request = bigquery_storage_v1.SplitReadStreamRequest(
                    name="name_value",
                )

                # Make the request
                response = client.split_read_stream(request=request)

                # Handle the response
                print(response)

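        A ``fraction`` can be supplied to control where the split occurs
        (a minimal sketch; the stream name below is a placeholder):

        .. code-block:: python

            request = bigquery_storage_v1.SplitReadStreamRequest(
                name="projects/p/locations/l/sessions/s/streams/st",
                fraction=0.5,
            )
            response = client.split_read_stream(request=request)
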
        Args:
            request (Union[google.cloud.bigquery_storage_v1.types.SplitReadStreamRequest, dict]):
                The request object. Request message for ``SplitReadStream``.
            retry (google.api_core.retry.Retry): Designation of what errors, if any,
                should be retried.
            timeout (float): The timeout for this request.
            metadata (Sequence[Tuple[str, Union[str, bytes]]]): Key/value pairs which should be
                sent along with the request as metadata. Normally, each value must be of type `str`,
                but for metadata keys ending with the suffix `-bin`, the corresponding values must
                be of type `bytes`.

        Returns:
            google.cloud.bigquery_storage_v1.types.SplitReadStreamResponse:
                Response message for SplitReadStream.
        """
        # Create or coerce a protobuf request object.
        # - Use the request object if provided (there's no risk of modifying the input as
        #   there are no flattened fields), or create one.
        if not isinstance(request, storage.SplitReadStreamRequest):
            request = storage.SplitReadStreamRequest(request)

        # Wrap the RPC method; this adds retry and timeout information,
        # and friendly error handling.
        rpc = self._transport._wrapped_methods[self._transport.split_read_stream]

        # Certain fields should be provided within the metadata header;
        # add these here.
        metadata = tuple(metadata) + (
            gapic_v1.routing_header.to_grpc_metadata((("name", request.name),)),
        )

        # Validate the universe domain.
        self._validate_universe_domain()

        # Send the request.
        response = rpc(
            request,
            retry=retry,
            timeout=timeout,
            metadata=metadata,
        )

        # Done; return the response.
        return response

    def __enter__(self) -> "BigQueryReadClient":
        return self

    def __exit__(self, type, value, traceback):
        """Releases underlying transport's resources.

        .. warning::
            ONLY use as a context manager if the transport is NOT shared
            with other clients! Exiting the with block will CLOSE the transport
            and may cause errors in other clients!
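
        Example (a minimal sketch, assuming a prepared ``request``):

        .. code-block:: python

            with bigquery_storage_v1.BigQueryReadClient() as client:
                session = client.create_read_session(request=request)
            # The transport is closed on exiting the block.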
        """
        self.transport.close()


DEFAULT_CLIENT_INFO = gapic_v1.client_info.ClientInfo(
    gapic_version=package_version.__version__
)

if hasattr(DEFAULT_CLIENT_INFO, "protobuf_runtime_version"):  # pragma: NO COVER
    DEFAULT_CLIENT_INFO.protobuf_runtime_version = google.protobuf.__version__

__all__ = ("BigQueryReadClient",)