1# -*- coding: utf-8 -*-
2# Copyright 2022 Google LLC
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16from collections import OrderedDict
17import os
18import re
19from typing import (
20 Dict,
21 Mapping,
22 MutableMapping,
23 MutableSequence,
24 Optional,
25 Iterable,
26 Sequence,
27 Tuple,
28 Type,
29 Union,
30 cast,
31)
32
33from google.cloud.bigquery_storage_v1 import gapic_version as package_version
34
35from google.api_core import client_options as client_options_lib
36from google.api_core import exceptions as core_exceptions
37from google.api_core import gapic_v1
38from google.api_core import retry as retries
39from google.auth import credentials as ga_credentials # type: ignore
40from google.auth.transport import mtls # type: ignore
41from google.auth.transport.grpc import SslCredentials # type: ignore
42from google.auth.exceptions import MutualTLSChannelError # type: ignore
43from google.oauth2 import service_account # type: ignore
44
45try:
46 OptionalRetry = Union[retries.Retry, gapic_v1.method._MethodDefault]
47except AttributeError: # pragma: NO COVER
48 OptionalRetry = Union[retries.Retry, object] # type: ignore
49
50from google.cloud.bigquery_storage_v1.types import arrow
51from google.cloud.bigquery_storage_v1.types import avro
52from google.cloud.bigquery_storage_v1.types import storage
53from google.cloud.bigquery_storage_v1.types import stream
54from google.protobuf import timestamp_pb2 # type: ignore
55from .transports.base import BigQueryReadTransport, DEFAULT_CLIENT_INFO
56from .transports.grpc import BigQueryReadGrpcTransport
57from .transports.grpc_asyncio import BigQueryReadGrpcAsyncIOTransport
58
59
60class BigQueryReadClientMeta(type):
61 """Metaclass for the BigQueryRead client.
62
63 This provides class-level methods for building and retrieving
64 support objects (e.g. transport) without polluting the client instance
65 objects.
66 """
67
68 _transport_registry = OrderedDict() # type: Dict[str, Type[BigQueryReadTransport]]
69 _transport_registry["grpc"] = BigQueryReadGrpcTransport
70 _transport_registry["grpc_asyncio"] = BigQueryReadGrpcAsyncIOTransport
71
72 def get_transport_class(
73 cls,
74 label: Optional[str] = None,
75 ) -> Type[BigQueryReadTransport]:
76 """Returns an appropriate transport class.
77
78 Args:
79 label: The name of the desired transport. If none is
80 provided, then the first transport in the registry is used.
81
82 Returns:
83 The transport class to use.
84 """
85 # If a specific transport is requested, return that one.
86 if label:
87 return cls._transport_registry[label]
88
89 # No transport is requested; return the default (that is, the first one
90 # in the dictionary).
91 return next(iter(cls._transport_registry.values()))
92
93
94class BigQueryReadClient(metaclass=BigQueryReadClientMeta):
95 """BigQuery Read API.
96 The Read API can be used to read data from BigQuery.
97 """
98
99 @staticmethod
100 def _get_default_mtls_endpoint(api_endpoint):
101 """Converts api endpoint to mTLS endpoint.
102
103 Convert "*.sandbox.googleapis.com" and "*.googleapis.com" to
104 "*.mtls.sandbox.googleapis.com" and "*.mtls.googleapis.com" respectively.
105 Args:
106 api_endpoint (Optional[str]): the api endpoint to convert.
107 Returns:
108 str: converted mTLS api endpoint.
109 """
110 if not api_endpoint:
111 return api_endpoint
112
113 mtls_endpoint_re = re.compile(
114 r"(?P<name>[^.]+)(?P<mtls>\.mtls)?(?P<sandbox>\.sandbox)?(?P<googledomain>\.googleapis\.com)?"
115 )
116
117 m = mtls_endpoint_re.match(api_endpoint)
118 name, mtls, sandbox, googledomain = m.groups()
119 if mtls or not googledomain:
120 return api_endpoint
121
122 if sandbox:
123 return api_endpoint.replace(
124 "sandbox.googleapis.com", "mtls.sandbox.googleapis.com"
125 )
126
127 return api_endpoint.replace(".googleapis.com", ".mtls.googleapis.com")
128
129 DEFAULT_ENDPOINT = "bigquerystorage.googleapis.com"
130 DEFAULT_MTLS_ENDPOINT = _get_default_mtls_endpoint.__func__( # type: ignore
131 DEFAULT_ENDPOINT
132 )
133
134 @classmethod
135 def from_service_account_info(cls, info: dict, *args, **kwargs):
136 """Creates an instance of this client using the provided credentials
137 info.
138
139 Args:
140 info (dict): The service account private key info.
141 args: Additional arguments to pass to the constructor.
142 kwargs: Additional arguments to pass to the constructor.
143
144 Returns:
145 BigQueryReadClient: The constructed client.
146 """
147 credentials = service_account.Credentials.from_service_account_info(info)
148 kwargs["credentials"] = credentials
149 return cls(*args, **kwargs)
150
151 @classmethod
152 def from_service_account_file(cls, filename: str, *args, **kwargs):
153 """Creates an instance of this client using the provided credentials
154 file.
155
156 Args:
157 filename (str): The path to the service account private key json
158 file.
159 args: Additional arguments to pass to the constructor.
160 kwargs: Additional arguments to pass to the constructor.
161
162 Returns:
163 BigQueryReadClient: The constructed client.
164 """
165 credentials = service_account.Credentials.from_service_account_file(filename)
166 kwargs["credentials"] = credentials
167 return cls(*args, **kwargs)
168
169 from_service_account_json = from_service_account_file
170
171 @property
172 def transport(self) -> BigQueryReadTransport:
173 """Returns the transport used by the client instance.
174
175 Returns:
176 BigQueryReadTransport: The transport used by the client
177 instance.
178 """
179 return self._transport
180
181 @staticmethod
182 def read_session_path(
183 project: str,
184 location: str,
185 session: str,
186 ) -> str:
187 """Returns a fully-qualified read_session string."""
188 return "projects/{project}/locations/{location}/sessions/{session}".format(
189 project=project,
190 location=location,
191 session=session,
192 )
193
194 @staticmethod
195 def parse_read_session_path(path: str) -> Dict[str, str]:
196 """Parses a read_session path into its component segments."""
197 m = re.match(
198 r"^projects/(?P<project>.+?)/locations/(?P<location>.+?)/sessions/(?P<session>.+?)$",
199 path,
200 )
201 return m.groupdict() if m else {}
202
203 @staticmethod
204 def read_stream_path(
205 project: str,
206 location: str,
207 session: str,
208 stream: str,
209 ) -> str:
210 """Returns a fully-qualified read_stream string."""
211 return "projects/{project}/locations/{location}/sessions/{session}/streams/{stream}".format(
212 project=project,
213 location=location,
214 session=session,
215 stream=stream,
216 )
217
218 @staticmethod
219 def parse_read_stream_path(path: str) -> Dict[str, str]:
220 """Parses a read_stream path into its component segments."""
221 m = re.match(
222 r"^projects/(?P<project>.+?)/locations/(?P<location>.+?)/sessions/(?P<session>.+?)/streams/(?P<stream>.+?)$",
223 path,
224 )
225 return m.groupdict() if m else {}
226
227 @staticmethod
228 def table_path(
229 project: str,
230 dataset: str,
231 table: str,
232 ) -> str:
233 """Returns a fully-qualified table string."""
234 return "projects/{project}/datasets/{dataset}/tables/{table}".format(
235 project=project,
236 dataset=dataset,
237 table=table,
238 )
239
240 @staticmethod
241 def parse_table_path(path: str) -> Dict[str, str]:
242 """Parses a table path into its component segments."""
243 m = re.match(
244 r"^projects/(?P<project>.+?)/datasets/(?P<dataset>.+?)/tables/(?P<table>.+?)$",
245 path,
246 )
247 return m.groupdict() if m else {}
248
249 @staticmethod
250 def common_billing_account_path(
251 billing_account: str,
252 ) -> str:
253 """Returns a fully-qualified billing_account string."""
254 return "billingAccounts/{billing_account}".format(
255 billing_account=billing_account,
256 )
257
258 @staticmethod
259 def parse_common_billing_account_path(path: str) -> Dict[str, str]:
260 """Parse a billing_account path into its component segments."""
261 m = re.match(r"^billingAccounts/(?P<billing_account>.+?)$", path)
262 return m.groupdict() if m else {}
263
264 @staticmethod
265 def common_folder_path(
266 folder: str,
267 ) -> str:
268 """Returns a fully-qualified folder string."""
269 return "folders/{folder}".format(
270 folder=folder,
271 )
272
273 @staticmethod
274 def parse_common_folder_path(path: str) -> Dict[str, str]:
275 """Parse a folder path into its component segments."""
276 m = re.match(r"^folders/(?P<folder>.+?)$", path)
277 return m.groupdict() if m else {}
278
279 @staticmethod
280 def common_organization_path(
281 organization: str,
282 ) -> str:
283 """Returns a fully-qualified organization string."""
284 return "organizations/{organization}".format(
285 organization=organization,
286 )
287
288 @staticmethod
289 def parse_common_organization_path(path: str) -> Dict[str, str]:
290 """Parse a organization path into its component segments."""
291 m = re.match(r"^organizations/(?P<organization>.+?)$", path)
292 return m.groupdict() if m else {}
293
294 @staticmethod
295 def common_project_path(
296 project: str,
297 ) -> str:
298 """Returns a fully-qualified project string."""
299 return "projects/{project}".format(
300 project=project,
301 )
302
303 @staticmethod
304 def parse_common_project_path(path: str) -> Dict[str, str]:
305 """Parse a project path into its component segments."""
306 m = re.match(r"^projects/(?P<project>.+?)$", path)
307 return m.groupdict() if m else {}
308
309 @staticmethod
310 def common_location_path(
311 project: str,
312 location: str,
313 ) -> str:
314 """Returns a fully-qualified location string."""
315 return "projects/{project}/locations/{location}".format(
316 project=project,
317 location=location,
318 )
319
320 @staticmethod
321 def parse_common_location_path(path: str) -> Dict[str, str]:
322 """Parse a location path into its component segments."""
323 m = re.match(r"^projects/(?P<project>.+?)/locations/(?P<location>.+?)$", path)
324 return m.groupdict() if m else {}
325
326 @classmethod
327 def get_mtls_endpoint_and_cert_source(
328 cls, client_options: Optional[client_options_lib.ClientOptions] = None
329 ):
330 """Return the API endpoint and client cert source for mutual TLS.
331
332 The client cert source is determined in the following order:
333 (1) if `GOOGLE_API_USE_CLIENT_CERTIFICATE` environment variable is not "true", the
334 client cert source is None.
335 (2) if `client_options.client_cert_source` is provided, use the provided one; if the
336 default client cert source exists, use the default one; otherwise the client cert
337 source is None.
338
339 The API endpoint is determined in the following order:
340 (1) if `client_options.api_endpoint` if provided, use the provided one.
341 (2) if `GOOGLE_API_USE_CLIENT_CERTIFICATE` environment variable is "always", use the
342 default mTLS endpoint; if the environment variable is "never", use the default API
343 endpoint; otherwise if client cert source exists, use the default mTLS endpoint, otherwise
344 use the default API endpoint.
345
346 More details can be found at https://google.aip.dev/auth/4114.
347
348 Args:
349 client_options (google.api_core.client_options.ClientOptions): Custom options for the
350 client. Only the `api_endpoint` and `client_cert_source` properties may be used
351 in this method.
352
353 Returns:
354 Tuple[str, Callable[[], Tuple[bytes, bytes]]]: returns the API endpoint and the
355 client cert source to use.
356
357 Raises:
358 google.auth.exceptions.MutualTLSChannelError: If any errors happen.
359 """
360 if client_options is None:
361 client_options = client_options_lib.ClientOptions()
362 use_client_cert = os.getenv("GOOGLE_API_USE_CLIENT_CERTIFICATE", "false")
363 use_mtls_endpoint = os.getenv("GOOGLE_API_USE_MTLS_ENDPOINT", "auto")
364 if use_client_cert not in ("true", "false"):
365 raise ValueError(
366 "Environment variable `GOOGLE_API_USE_CLIENT_CERTIFICATE` must be either `true` or `false`"
367 )
368 if use_mtls_endpoint not in ("auto", "never", "always"):
369 raise MutualTLSChannelError(
370 "Environment variable `GOOGLE_API_USE_MTLS_ENDPOINT` must be `never`, `auto` or `always`"
371 )
372
373 # Figure out the client cert source to use.
374 client_cert_source = None
375 if use_client_cert == "true":
376 if client_options.client_cert_source:
377 client_cert_source = client_options.client_cert_source
378 elif mtls.has_default_client_cert_source():
379 client_cert_source = mtls.default_client_cert_source()
380
381 # Figure out which api endpoint to use.
382 if client_options.api_endpoint is not None:
383 api_endpoint = client_options.api_endpoint
384 elif use_mtls_endpoint == "always" or (
385 use_mtls_endpoint == "auto" and client_cert_source
386 ):
387 api_endpoint = cls.DEFAULT_MTLS_ENDPOINT
388 else:
389 api_endpoint = cls.DEFAULT_ENDPOINT
390
391 return api_endpoint, client_cert_source
392
393 def __init__(
394 self,
395 *,
396 credentials: Optional[ga_credentials.Credentials] = None,
397 transport: Optional[Union[str, BigQueryReadTransport]] = None,
398 client_options: Optional[Union[client_options_lib.ClientOptions, dict]] = None,
399 client_info: gapic_v1.client_info.ClientInfo = DEFAULT_CLIENT_INFO,
400 ) -> None:
401 """Instantiates the big query read client.
402
403 Args:
404 credentials (Optional[google.auth.credentials.Credentials]): The
405 authorization credentials to attach to requests. These
406 credentials identify the application to the service; if none
407 are specified, the client will attempt to ascertain the
408 credentials from the environment.
409 transport (Union[str, BigQueryReadTransport]): The
410 transport to use. If set to None, a transport is chosen
411 automatically.
412 client_options (Optional[Union[google.api_core.client_options.ClientOptions, dict]]): Custom options for the
413 client. It won't take effect if a ``transport`` instance is provided.
414 (1) The ``api_endpoint`` property can be used to override the
415 default endpoint provided by the client. GOOGLE_API_USE_MTLS_ENDPOINT
416 environment variable can also be used to override the endpoint:
417 "always" (always use the default mTLS endpoint), "never" (always
418 use the default regular endpoint) and "auto" (auto switch to the
419 default mTLS endpoint if client certificate is present, this is
420 the default value). However, the ``api_endpoint`` property takes
421 precedence if provided.
422 (2) If GOOGLE_API_USE_CLIENT_CERTIFICATE environment variable
423 is "true", then the ``client_cert_source`` property can be used
424 to provide client certificate for mutual TLS transport. If
425 not provided, the default SSL client certificate will be used if
426 present. If GOOGLE_API_USE_CLIENT_CERTIFICATE is "false" or not
427 set, no client certificate will be used.
428 client_info (google.api_core.gapic_v1.client_info.ClientInfo):
429 The client info used to send a user-agent string along with
430 API requests. If ``None``, then default info will be used.
431 Generally, you only need to set this if you're developing
432 your own client library.
433
434 Raises:
435 google.auth.exceptions.MutualTLSChannelError: If mutual TLS transport
436 creation failed for any reason.
437 """
438 if isinstance(client_options, dict):
439 client_options = client_options_lib.from_dict(client_options)
440 if client_options is None:
441 client_options = client_options_lib.ClientOptions()
442 client_options = cast(client_options_lib.ClientOptions, client_options)
443
444 api_endpoint, client_cert_source_func = self.get_mtls_endpoint_and_cert_source(
445 client_options
446 )
447
448 api_key_value = getattr(client_options, "api_key", None)
449 if api_key_value and credentials:
450 raise ValueError(
451 "client_options.api_key and credentials are mutually exclusive"
452 )
453
454 # Save or instantiate the transport.
455 # Ordinarily, we provide the transport, but allowing a custom transport
456 # instance provides an extensibility point for unusual situations.
457 if isinstance(transport, BigQueryReadTransport):
458 # transport is a BigQueryReadTransport instance.
459 if credentials or client_options.credentials_file or api_key_value:
460 raise ValueError(
461 "When providing a transport instance, "
462 "provide its credentials directly."
463 )
464 if client_options.scopes:
465 raise ValueError(
466 "When providing a transport instance, provide its scopes "
467 "directly."
468 )
469 self._transport = transport
470 else:
471 import google.auth._default # type: ignore
472
473 if api_key_value and hasattr(
474 google.auth._default, "get_api_key_credentials"
475 ):
476 credentials = google.auth._default.get_api_key_credentials(
477 api_key_value
478 )
479
480 Transport = type(self).get_transport_class(transport)
481 self._transport = Transport(
482 credentials=credentials,
483 credentials_file=client_options.credentials_file,
484 host=api_endpoint,
485 scopes=client_options.scopes,
486 client_cert_source_for_mtls=client_cert_source_func,
487 quota_project_id=client_options.quota_project_id,
488 client_info=client_info,
489 always_use_jwt_access=True,
490 api_audience=client_options.api_audience,
491 )
492
493 def create_read_session(
494 self,
495 request: Optional[Union[storage.CreateReadSessionRequest, dict]] = None,
496 *,
497 parent: Optional[str] = None,
498 read_session: Optional[stream.ReadSession] = None,
499 max_stream_count: Optional[int] = None,
500 retry: OptionalRetry = gapic_v1.method.DEFAULT,
501 timeout: Union[float, object] = gapic_v1.method.DEFAULT,
502 metadata: Sequence[Tuple[str, str]] = (),
503 ) -> stream.ReadSession:
504 r"""Creates a new read session. A read session divides
505 the contents of a BigQuery table into one or more
506 streams, which can then be used to read data from the
507 table. The read session also specifies properties of the
508 data to be read, such as a list of columns or a
509 push-down filter describing the rows to be returned.
510
511 A particular row can be read by at most one stream. When
512 the caller has reached the end of each stream in the
513 session, then all the data in the table has been read.
514
515 Data is assigned to each stream such that roughly the
516 same number of rows can be read from each stream.
517 Because the server-side unit for assigning data is
518 collections of rows, the API does not guarantee that
519 each stream will return the same number or rows.
520 Additionally, the limits are enforced based on the
521 number of pre-filtered rows, so some filters can lead to
522 lopsided assignments.
523
524 Read sessions automatically expire 6 hours after they
525 are created and do not require manual clean-up by the
526 caller.
527
528 .. code-block:: python
529
530 # This snippet has been automatically generated and should be regarded as a
531 # code template only.
532 # It will require modifications to work:
533 # - It may require correct/in-range values for request initialization.
534 # - It may require specifying regional endpoints when creating the service
535 # client as shown in:
536 # https://googleapis.dev/python/google-api-core/latest/client_options.html
537 from google.cloud import bigquery_storage_v1
538
539 def sample_create_read_session():
540 # Create a client
541 client = bigquery_storage_v1.BigQueryReadClient()
542
543 # Initialize request argument(s)
544 request = bigquery_storage_v1.CreateReadSessionRequest(
545 parent="parent_value",
546 )
547
548 # Make the request
549 response = client.create_read_session(request=request)
550
551 # Handle the response
552 print(response)
553
554 Args:
555 request (Union[google.cloud.bigquery_storage_v1.types.CreateReadSessionRequest, dict]):
556 The request object. Request message for ``CreateReadSession``.
557 parent (str):
558 Required. The request project that owns the session, in
559 the form of ``projects/{project_id}``.
560
561 This corresponds to the ``parent`` field
562 on the ``request`` instance; if ``request`` is provided, this
563 should not be set.
564 read_session (google.cloud.bigquery_storage_v1.types.ReadSession):
565 Required. Session to be created.
566 This corresponds to the ``read_session`` field
567 on the ``request`` instance; if ``request`` is provided, this
568 should not be set.
569 max_stream_count (int):
570 Max initial number of streams. If unset or zero, the
571 server will provide a value of streams so as to produce
572 reasonable throughput. Must be non-negative. The number
573 of streams may be lower than the requested number,
574 depending on the amount parallelism that is reasonable
575 for the table. There is a default system max limit of
576 1,000.
577
578 This must be greater than or equal to
579 preferred_min_stream_count. Typically, clients should
580 either leave this unset to let the system to determine
581 an upper bound OR set this a size for the maximum "units
582 of work" it can gracefully handle.
583
584 This corresponds to the ``max_stream_count`` field
585 on the ``request`` instance; if ``request`` is provided, this
586 should not be set.
587 retry (google.api_core.retry.Retry): Designation of what errors, if any,
588 should be retried.
589 timeout (float): The timeout for this request.
590 metadata (Sequence[Tuple[str, str]]): Strings which should be
591 sent along with the request as metadata.
592
593 Returns:
594 google.cloud.bigquery_storage_v1.types.ReadSession:
595 Information about the ReadSession.
596 """
597 # Create or coerce a protobuf request object.
598 # Quick check: If we got a request object, we should *not* have
599 # gotten any keyword arguments that map to the request.
600 has_flattened_params = any([parent, read_session, max_stream_count])
601 if request is not None and has_flattened_params:
602 raise ValueError(
603 "If the `request` argument is set, then none of "
604 "the individual field arguments should be set."
605 )
606
607 # Minor optimization to avoid making a copy if the user passes
608 # in a storage.CreateReadSessionRequest.
609 # There's no risk of modifying the input as we've already verified
610 # there are no flattened fields.
611 if not isinstance(request, storage.CreateReadSessionRequest):
612 request = storage.CreateReadSessionRequest(request)
613 # If we have keyword arguments corresponding to fields on the
614 # request, apply these.
615 if parent is not None:
616 request.parent = parent
617 if read_session is not None:
618 request.read_session = read_session
619 if max_stream_count is not None:
620 request.max_stream_count = max_stream_count
621
622 # Wrap the RPC method; this adds retry and timeout information,
623 # and friendly error handling.
624 rpc = self._transport._wrapped_methods[self._transport.create_read_session]
625
626 # Certain fields should be provided within the metadata header;
627 # add these here.
628 metadata = tuple(metadata) + (
629 gapic_v1.routing_header.to_grpc_metadata(
630 (("read_session.table", request.read_session.table),)
631 ),
632 )
633
634 # Send the request.
635 response = rpc(
636 request,
637 retry=retry,
638 timeout=timeout,
639 metadata=metadata,
640 )
641
642 # Done; return the response.
643 return response
644
645 def read_rows(
646 self,
647 request: Optional[Union[storage.ReadRowsRequest, dict]] = None,
648 *,
649 read_stream: Optional[str] = None,
650 offset: Optional[int] = None,
651 retry: OptionalRetry = gapic_v1.method.DEFAULT,
652 timeout: Union[float, object] = gapic_v1.method.DEFAULT,
653 metadata: Sequence[Tuple[str, str]] = (),
654 ) -> Iterable[storage.ReadRowsResponse]:
655 r"""Reads rows from the stream in the format prescribed
656 by the ReadSession. Each response contains one or more
657 table rows, up to a maximum of 100 MiB per response;
658 read requests which attempt to read individual rows
659 larger than 100 MiB will fail.
660
661 Each request also returns a set of stream statistics
662 reflecting the current state of the stream.
663
664 .. code-block:: python
665
666 # This snippet has been automatically generated and should be regarded as a
667 # code template only.
668 # It will require modifications to work:
669 # - It may require correct/in-range values for request initialization.
670 # - It may require specifying regional endpoints when creating the service
671 # client as shown in:
672 # https://googleapis.dev/python/google-api-core/latest/client_options.html
673 from google.cloud import bigquery_storage_v1
674
675 def sample_read_rows():
676 # Create a client
677 client = bigquery_storage_v1.BigQueryReadClient()
678
679 # Initialize request argument(s)
680 request = bigquery_storage_v1.ReadRowsRequest(
681 read_stream="read_stream_value",
682 )
683
684 # Make the request
685 stream = client.read_rows(request=request)
686
687 # Handle the response
688 for response in stream:
689 print(response)
690
691 Args:
692 request (Union[google.cloud.bigquery_storage_v1.types.ReadRowsRequest, dict]):
693 The request object. Request message for ``ReadRows``.
694 read_stream (str):
695 Required. Stream to read rows from.
696 This corresponds to the ``read_stream`` field
697 on the ``request`` instance; if ``request`` is provided, this
698 should not be set.
699 offset (int):
700 The offset requested must be less
701 than the last row read from Read.
702 Requesting a larger offset is undefined.
703 If not specified, start reading from
704 offset zero.
705
706 This corresponds to the ``offset`` field
707 on the ``request`` instance; if ``request`` is provided, this
708 should not be set.
709 retry (google.api_core.retry.Retry): Designation of what errors, if any,
710 should be retried.
711 timeout (float): The timeout for this request.
712 metadata (Sequence[Tuple[str, str]]): Strings which should be
713 sent along with the request as metadata.
714
715 Returns:
716 Iterable[google.cloud.bigquery_storage_v1.types.ReadRowsResponse]:
717 Response from calling ReadRows may include row data, progress and
718 throttling information.
719
720 """
721 # Create or coerce a protobuf request object.
722 # Quick check: If we got a request object, we should *not* have
723 # gotten any keyword arguments that map to the request.
724 has_flattened_params = any([read_stream, offset])
725 if request is not None and has_flattened_params:
726 raise ValueError(
727 "If the `request` argument is set, then none of "
728 "the individual field arguments should be set."
729 )
730
731 # Minor optimization to avoid making a copy if the user passes
732 # in a storage.ReadRowsRequest.
733 # There's no risk of modifying the input as we've already verified
734 # there are no flattened fields.
735 if not isinstance(request, storage.ReadRowsRequest):
736 request = storage.ReadRowsRequest(request)
737 # If we have keyword arguments corresponding to fields on the
738 # request, apply these.
739 if read_stream is not None:
740 request.read_stream = read_stream
741 if offset is not None:
742 request.offset = offset
743
744 # Wrap the RPC method; this adds retry and timeout information,
745 # and friendly error handling.
746 rpc = self._transport._wrapped_methods[self._transport.read_rows]
747
748 # Certain fields should be provided within the metadata header;
749 # add these here.
750 metadata = tuple(metadata) + (
751 gapic_v1.routing_header.to_grpc_metadata(
752 (("read_stream", request.read_stream),)
753 ),
754 )
755
756 # Send the request.
757 response = rpc(
758 request,
759 retry=retry,
760 timeout=timeout,
761 metadata=metadata,
762 )
763
764 # Done; return the response.
765 return response
766
767 def split_read_stream(
768 self,
769 request: Optional[Union[storage.SplitReadStreamRequest, dict]] = None,
770 *,
771 retry: OptionalRetry = gapic_v1.method.DEFAULT,
772 timeout: Union[float, object] = gapic_v1.method.DEFAULT,
773 metadata: Sequence[Tuple[str, str]] = (),
774 ) -> storage.SplitReadStreamResponse:
775 r"""Splits a given ``ReadStream`` into two ``ReadStream`` objects.
776 These ``ReadStream`` objects are referred to as the primary and
777 the residual streams of the split. The original ``ReadStream``
778 can still be read from in the same manner as before. Both of the
779 returned ``ReadStream`` objects can also be read from, and the
780 rows returned by both child streams will be the same as the rows
781 read from the original stream.
782
783 Moreover, the two child streams will be allocated back-to-back
784 in the original ``ReadStream``. Concretely, it is guaranteed
785 that for streams original, primary, and residual, that
786 original[0-j] = primary[0-j] and original[j-n] = residual[0-m]
787 once the streams have been read to completion.
788
789 .. code-block:: python
790
791 # This snippet has been automatically generated and should be regarded as a
792 # code template only.
793 # It will require modifications to work:
794 # - It may require correct/in-range values for request initialization.
795 # - It may require specifying regional endpoints when creating the service
796 # client as shown in:
797 # https://googleapis.dev/python/google-api-core/latest/client_options.html
798 from google.cloud import bigquery_storage_v1
799
800 def sample_split_read_stream():
801 # Create a client
802 client = bigquery_storage_v1.BigQueryReadClient()
803
804 # Initialize request argument(s)
805 request = bigquery_storage_v1.SplitReadStreamRequest(
806 name="name_value",
807 )
808
809 # Make the request
810 response = client.split_read_stream(request=request)
811
812 # Handle the response
813 print(response)
814
815 Args:
816 request (Union[google.cloud.bigquery_storage_v1.types.SplitReadStreamRequest, dict]):
817 The request object. Request message for ``SplitReadStream``.
818 retry (google.api_core.retry.Retry): Designation of what errors, if any,
819 should be retried.
820 timeout (float): The timeout for this request.
821 metadata (Sequence[Tuple[str, str]]): Strings which should be
822 sent along with the request as metadata.
823
824 Returns:
825 google.cloud.bigquery_storage_v1.types.SplitReadStreamResponse:
826 Response message for SplitReadStream.
827 """
828 # Create or coerce a protobuf request object.
829 # Minor optimization to avoid making a copy if the user passes
830 # in a storage.SplitReadStreamRequest.
831 # There's no risk of modifying the input as we've already verified
832 # there are no flattened fields.
833 if not isinstance(request, storage.SplitReadStreamRequest):
834 request = storage.SplitReadStreamRequest(request)
835
836 # Wrap the RPC method; this adds retry and timeout information,
837 # and friendly error handling.
838 rpc = self._transport._wrapped_methods[self._transport.split_read_stream]
839
840 # Certain fields should be provided within the metadata header;
841 # add these here.
842 metadata = tuple(metadata) + (
843 gapic_v1.routing_header.to_grpc_metadata((("name", request.name),)),
844 )
845
846 # Send the request.
847 response = rpc(
848 request,
849 retry=retry,
850 timeout=timeout,
851 metadata=metadata,
852 )
853
854 # Done; return the response.
855 return response
856
857 def __enter__(self) -> "BigQueryReadClient":
858 return self
859
860 def __exit__(self, type, value, traceback):
861 """Releases underlying transport's resources.
862
863 .. warning::
864 ONLY use as a context manager if the transport is NOT shared
865 with other clients! Exiting the with block will CLOSE the transport
866 and may cause errors in other clients!
867 """
868 self.transport.close()
869
870
871DEFAULT_CLIENT_INFO = gapic_v1.client_info.ClientInfo(
872 gapic_version=package_version.__version__
873)
874
875
876__all__ = ("BigQueryReadClient",)