# -*- coding: utf-8 -*-
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from collections import OrderedDict
import functools
import re
from typing import (
    AsyncIterable,
    Awaitable,
    Dict,
    Mapping,
    MutableMapping,
    MutableSequence,
    Optional,
    Sequence,
    Tuple,
    Type,
    Union,
)

from google.api_core import exceptions as core_exceptions
from google.api_core import gapic_v1
from google.api_core import retry_async as retries
from google.api_core.client_options import ClientOptions
from google.auth import credentials as ga_credentials  # type: ignore
from google.oauth2 import service_account  # type: ignore

from google.cloud.bigquery_storage_v1 import gapic_version as package_version

# `gapic_v1.method._MethodDefault` may be missing on older versions of
# google-api-core; fall back to a plain `object` sentinel in that case.
try:
    OptionalRetry = Union[retries.AsyncRetry, gapic_v1.method._MethodDefault, None]
except AttributeError:  # pragma: NO COVER
    OptionalRetry = Union[retries.AsyncRetry, object, None]  # type: ignore

from google.protobuf import timestamp_pb2  # type: ignore

from google.cloud.bigquery_storage_v1.types import arrow, avro, storage, stream

from .client import BigQueryReadClient
from .transports.base import DEFAULT_CLIENT_INFO, BigQueryReadTransport
from .transports.grpc_asyncio import BigQueryReadGrpcAsyncIOTransport


class BigQueryReadAsyncClient:
    """BigQuery Read API.

    The Read API can be used to read data from BigQuery.
    """

    _client: BigQueryReadClient

    # Copy defaults from the synchronous client for use here.
    # Note: DEFAULT_ENDPOINT is deprecated. Use _DEFAULT_ENDPOINT_TEMPLATE instead.
    DEFAULT_ENDPOINT = BigQueryReadClient.DEFAULT_ENDPOINT
    DEFAULT_MTLS_ENDPOINT = BigQueryReadClient.DEFAULT_MTLS_ENDPOINT
    _DEFAULT_ENDPOINT_TEMPLATE = BigQueryReadClient._DEFAULT_ENDPOINT_TEMPLATE
    _DEFAULT_UNIVERSE = BigQueryReadClient._DEFAULT_UNIVERSE

    read_session_path = staticmethod(BigQueryReadClient.read_session_path)
    parse_read_session_path = staticmethod(BigQueryReadClient.parse_read_session_path)
    read_stream_path = staticmethod(BigQueryReadClient.read_stream_path)
    parse_read_stream_path = staticmethod(BigQueryReadClient.parse_read_stream_path)
    table_path = staticmethod(BigQueryReadClient.table_path)
    parse_table_path = staticmethod(BigQueryReadClient.parse_table_path)
    common_billing_account_path = staticmethod(
        BigQueryReadClient.common_billing_account_path
    )
    parse_common_billing_account_path = staticmethod(
        BigQueryReadClient.parse_common_billing_account_path
    )
    common_folder_path = staticmethod(BigQueryReadClient.common_folder_path)
    parse_common_folder_path = staticmethod(BigQueryReadClient.parse_common_folder_path)
    common_organization_path = staticmethod(BigQueryReadClient.common_organization_path)
    parse_common_organization_path = staticmethod(
        BigQueryReadClient.parse_common_organization_path
    )
    common_project_path = staticmethod(BigQueryReadClient.common_project_path)
    parse_common_project_path = staticmethod(
        BigQueryReadClient.parse_common_project_path
    )
    common_location_path = staticmethod(BigQueryReadClient.common_location_path)
    parse_common_location_path = staticmethod(
        BigQueryReadClient.parse_common_location_path
    )

    @classmethod
    def from_service_account_info(cls, info: dict, *args, **kwargs):
        """Creates an instance of this client using the provided credentials
        info.

        Args:
            info (dict): The service account private key info.
            args: Additional arguments to pass to the constructor.
            kwargs: Additional arguments to pass to the constructor.

        Returns:
            BigQueryReadAsyncClient: The constructed client.
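
        Example (a minimal sketch; ``key.json`` is a hypothetical path to a
        standard service account key file, whose parsed JSON matches the
        ``info`` dict expected here):

        .. code-block:: python

            import json

            with open("key.json") as f:
                info = json.load(f)

            client = BigQueryReadAsyncClient.from_service_account_info(info)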
        """
        return BigQueryReadClient.from_service_account_info.__func__(BigQueryReadAsyncClient, info, *args, **kwargs)  # type: ignore

    @classmethod
    def from_service_account_file(cls, filename: str, *args, **kwargs):
        """Creates an instance of this client using the provided credentials
        file.

        Args:
            filename (str): The path to the service account private key json
                file.
            args: Additional arguments to pass to the constructor.
            kwargs: Additional arguments to pass to the constructor.

        Returns:
            BigQueryReadAsyncClient: The constructed client.
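
        Example (a minimal sketch; ``key.json`` is a hypothetical key file
        path, and ``from_service_account_json`` is an alias of this method):

        .. code-block:: python

            client = BigQueryReadAsyncClient.from_service_account_file("key.json")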
        """
        return BigQueryReadClient.from_service_account_file.__func__(BigQueryReadAsyncClient, filename, *args, **kwargs)  # type: ignore

    from_service_account_json = from_service_account_file

    @classmethod
    def get_mtls_endpoint_and_cert_source(
        cls, client_options: Optional[ClientOptions] = None
    ):
        """Return the API endpoint and client cert source for mutual TLS.

        The client cert source is determined in the following order:
        (1) if the `GOOGLE_API_USE_CLIENT_CERTIFICATE` environment variable is not "true", the
        client cert source is None.
        (2) if `client_options.client_cert_source` is provided, use the provided one; if the
        default client cert source exists, use the default one; otherwise the client cert
        source is None.

        The API endpoint is determined in the following order:
        (1) if `client_options.api_endpoint` is provided, use the provided one.
        (2) if the `GOOGLE_API_USE_MTLS_ENDPOINT` environment variable is "always", use the
        default mTLS endpoint; if the environment variable is "never", use the default API
        endpoint; otherwise, if a client cert source exists, use the default mTLS endpoint,
        otherwise use the default API endpoint.

        More details can be found at https://google.aip.dev/auth/4114.

        Args:
            client_options (google.api_core.client_options.ClientOptions): Custom options for the
                client. Only the `api_endpoint` and `client_cert_source` properties may be used
                in this method.

        Returns:
            Tuple[str, Callable[[], Tuple[bytes, bytes]]]: returns the API endpoint and the
            client cert source to use.

        Raises:
            google.auth.exceptions.MutualTLSChannelError: If any errors happen.
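
        Example (a minimal sketch; without mTLS configuration the returned
        cert source is expected to be ``None``):

        .. code-block:: python

            endpoint, cert_source = (
                BigQueryReadAsyncClient.get_mtls_endpoint_and_cert_source()
            )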
        """
        return BigQueryReadClient.get_mtls_endpoint_and_cert_source(client_options)  # type: ignore

    @property
    def transport(self) -> BigQueryReadTransport:
        """Returns the transport used by the client instance.

        Returns:
            BigQueryReadTransport: The transport used by the client instance.
        """
        return self._client.transport

    @property
    def api_endpoint(self):
        """Return the API endpoint used by the client instance.

        Returns:
            str: The API endpoint used by the client instance.
        """
        return self._client._api_endpoint

    @property
    def universe_domain(self) -> str:
        """Return the universe domain used by the client instance.

        Returns:
            str: The universe domain used by the client instance.
        """
        return self._client._universe_domain

    get_transport_class = functools.partial(
        type(BigQueryReadClient).get_transport_class, type(BigQueryReadClient)
    )

    def __init__(
        self,
        *,
        credentials: Optional[ga_credentials.Credentials] = None,
        transport: Union[str, BigQueryReadTransport] = "grpc_asyncio",
        client_options: Optional[ClientOptions] = None,
        client_info: gapic_v1.client_info.ClientInfo = DEFAULT_CLIENT_INFO,
    ) -> None:
        """Instantiates the BigQuery Read async client.

        Args:
            credentials (Optional[google.auth.credentials.Credentials]): The
                authorization credentials to attach to requests. These
                credentials identify the application to the service; if none
                are specified, the client will attempt to ascertain the
                credentials from the environment.
            transport (Union[str, ~.BigQueryReadTransport]): The
                transport to use. If set to None, a transport is chosen
                automatically.
            client_options (Optional[Union[google.api_core.client_options.ClientOptions, dict]]):
                Custom options for the client.

                1. The ``api_endpoint`` property can be used to override the
                default endpoint provided by the client when ``transport`` is
                not explicitly provided. Only if this property is not set and
                ``transport`` was not explicitly provided, the endpoint is
                determined by the GOOGLE_API_USE_MTLS_ENDPOINT environment
                variable, which has one of the following values:
                "always" (always use the default mTLS endpoint), "never" (always
                use the default regular endpoint) and "auto" (auto-switch to the
                default mTLS endpoint if a client certificate is present; this is
                the default value).

                2. If the GOOGLE_API_USE_CLIENT_CERTIFICATE environment variable
                is "true", then the ``client_cert_source`` property can be used
                to provide a client certificate for mTLS transport. If
                not provided, the default SSL client certificate will be used if
                present. If GOOGLE_API_USE_CLIENT_CERTIFICATE is "false" or not
                set, no client certificate will be used.

                3. The ``universe_domain`` property can be used to override the
                default "googleapis.com" universe. Note that the ``api_endpoint``
                property still takes precedence, and ``universe_domain`` is
                currently not supported for mTLS.

            client_info (google.api_core.gapic_v1.client_info.ClientInfo):
                The client info used to send a user-agent string along with
                API requests. If ``None``, then default info will be used.
                Generally, you only need to set this if you're developing
                your own client library.

        Raises:
            google.auth.exceptions.MutualTLSChannelError: If mutual TLS transport
                creation failed for any reason.
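
        Example (a minimal sketch; credentials are resolved from the
        environment, e.g. via Application Default Credentials):

        .. code-block:: python

            async def main():
                # Using the client as an async context manager closes the
                # underlying transport on exit.
                async with BigQueryReadAsyncClient() as client:
                    ...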
        """
        self._client = BigQueryReadClient(
            credentials=credentials,
            transport=transport,
            client_options=client_options,
            client_info=client_info,
        )

    async def create_read_session(
        self,
        request: Optional[Union[storage.CreateReadSessionRequest, dict]] = None,
        *,
        parent: Optional[str] = None,
        read_session: Optional[stream.ReadSession] = None,
        max_stream_count: Optional[int] = None,
        retry: OptionalRetry = gapic_v1.method.DEFAULT,
        timeout: Union[float, object] = gapic_v1.method.DEFAULT,
        metadata: Sequence[Tuple[str, str]] = (),
    ) -> stream.ReadSession:
        r"""Creates a new read session. A read session divides
        the contents of a BigQuery table into one or more
        streams, which can then be used to read data from the
        table. The read session also specifies properties of the
        data to be read, such as a list of columns or a
        push-down filter describing the rows to be returned.

        A particular row can be read by at most one stream. When
        the caller has reached the end of each stream in the
        session, then all the data in the table has been read.

        Data is assigned to each stream such that roughly the
        same number of rows can be read from each stream.
        Because the server-side unit for assigning data is
        collections of rows, the API does not guarantee that
        each stream will return the same number of rows.
        Additionally, the limits are enforced based on the
        number of pre-filtered rows, so some filters can lead to
        lopsided assignments.

        Read sessions automatically expire 6 hours after they
        are created and do not require manual clean-up by the
        caller.

        .. code-block:: python

            # This snippet has been automatically generated and should be regarded as a
            # code template only.
            # It will require modifications to work:
            # - It may require correct/in-range values for request initialization.
            # - It may require specifying regional endpoints when creating the service
            #   client as shown in:
            #   https://googleapis.dev/python/google-api-core/latest/client_options.html
            from google.cloud import bigquery_storage_v1

            async def sample_create_read_session():
                # Create a client
                client = bigquery_storage_v1.BigQueryReadAsyncClient()

                # Initialize request argument(s)
                request = bigquery_storage_v1.CreateReadSessionRequest(
                    parent="parent_value",
                )

                # Make the request
                response = await client.create_read_session(request=request)

                # Handle the response
                print(response)

        Args:
            request (Optional[Union[google.cloud.bigquery_storage_v1.types.CreateReadSessionRequest, dict]]):
                The request object. Request message for ``CreateReadSession``.
            parent (:class:`str`):
                Required. The request project that owns the session, in
                the form of ``projects/{project_id}``.

                This corresponds to the ``parent`` field
                on the ``request`` instance; if ``request`` is provided, this
                should not be set.
            read_session (:class:`google.cloud.bigquery_storage_v1.types.ReadSession`):
                Required. Session to be created.
                This corresponds to the ``read_session`` field
                on the ``request`` instance; if ``request`` is provided, this
                should not be set.
            max_stream_count (:class:`int`):
                Max initial number of streams. If unset or zero, the
                server will choose an appropriate number of streams so
                as to produce reasonable throughput. Must be
                non-negative. The number of streams may be lower than
                the requested number, depending on the amount of
                parallelism that is reasonable for the table. There is
                a default system max limit of 1,000.

                This must be greater than or equal to
                preferred_min_stream_count. Typically, clients should
                either leave this unset to let the system determine an
                upper bound OR set this to the maximum number of "units
                of work" the client can gracefully handle.

                This corresponds to the ``max_stream_count`` field
                on the ``request`` instance; if ``request`` is provided, this
                should not be set.
            retry (google.api_core.retry_async.AsyncRetry): Designation of what errors, if any,
                should be retried.
            timeout (float): The timeout for this request.
            metadata (Sequence[Tuple[str, str]]): Strings which should be
                sent along with the request as metadata.

        Returns:
            google.cloud.bigquery_storage_v1.types.ReadSession:
                Information about the ReadSession.
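
        As an alternative to passing a ``request`` object, the flattened
        fields can be passed directly (a minimal sketch; the project,
        dataset, and table IDs are placeholders):

        .. code-block:: python

            from google.cloud.bigquery_storage_v1 import types

            session = await client.create_read_session(
                parent="projects/my-project",
                read_session=types.ReadSession(
                    table="projects/my-project/datasets/my_dataset/tables/my_table",
                    data_format=types.DataFormat.ARROW,
                ),
                max_stream_count=1,
            )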
        """
        # Create or coerce a protobuf request object.
        # Quick check: If we got a request object, we should *not* have
        # gotten any keyword arguments that map to the request.
        has_flattened_params = any([parent, read_session, max_stream_count])
        if request is not None and has_flattened_params:
            raise ValueError(
                "If the `request` argument is set, then none of "
                "the individual field arguments should be set."
            )

        request = storage.CreateReadSessionRequest(request)

        # If we have keyword arguments corresponding to fields on the
        # request, apply these.
        if parent is not None:
            request.parent = parent
        if read_session is not None:
            request.read_session = read_session
        if max_stream_count is not None:
            request.max_stream_count = max_stream_count

        # Wrap the RPC method; this adds retry and timeout information,
        # and friendly error handling.
        rpc = gapic_v1.method_async.wrap_method(
            self._client._transport.create_read_session,
            default_retry=retries.AsyncRetry(
                initial=0.1,
                maximum=60.0,
                multiplier=1.3,
                predicate=retries.if_exception_type(
                    core_exceptions.DeadlineExceeded,
                    core_exceptions.ServiceUnavailable,
                ),
                deadline=600.0,
            ),
            default_timeout=600.0,
            client_info=DEFAULT_CLIENT_INFO,
        )

        # Certain fields should be provided within the metadata header;
        # add these here.
        metadata = tuple(metadata) + (
            gapic_v1.routing_header.to_grpc_metadata(
                (("read_session.table", request.read_session.table),)
            ),
        )

        # Validate the universe domain.
        self._client._validate_universe_domain()

        # Send the request.
        response = await rpc(
            request,
            retry=retry,
            timeout=timeout,
            metadata=metadata,
        )

        # Done; return the response.
        return response

    def read_rows(
        self,
        request: Optional[Union[storage.ReadRowsRequest, dict]] = None,
        *,
        read_stream: Optional[str] = None,
        offset: Optional[int] = None,
        retry: OptionalRetry = gapic_v1.method.DEFAULT,
        timeout: Union[float, object] = gapic_v1.method.DEFAULT,
        metadata: Sequence[Tuple[str, str]] = (),
    ) -> Awaitable[AsyncIterable[storage.ReadRowsResponse]]:
        r"""Reads rows from the stream in the format prescribed
        by the ReadSession. Each response contains one or more
        table rows, up to a maximum of 100 MiB per response;
        read requests which attempt to read individual rows
        larger than 100 MiB will fail.

        Each request also returns a set of stream statistics
        reflecting the current state of the stream.

        .. code-block:: python

            # This snippet has been automatically generated and should be regarded as a
            # code template only.
            # It will require modifications to work:
            # - It may require correct/in-range values for request initialization.
            # - It may require specifying regional endpoints when creating the service
            #   client as shown in:
            #   https://googleapis.dev/python/google-api-core/latest/client_options.html
            from google.cloud import bigquery_storage_v1

            async def sample_read_rows():
                # Create a client
                client = bigquery_storage_v1.BigQueryReadAsyncClient()

                # Initialize request argument(s)
                request = bigquery_storage_v1.ReadRowsRequest(
                    read_stream="read_stream_value",
                )

                # Make the request
                stream = await client.read_rows(request=request)

                # Handle the response
                async for response in stream:
                    print(response)

        Args:
            request (Optional[Union[google.cloud.bigquery_storage_v1.types.ReadRowsRequest, dict]]):
                The request object. Request message for ``ReadRows``.
            read_stream (:class:`str`):
                Required. Stream to read rows from.
                This corresponds to the ``read_stream`` field
                on the ``request`` instance; if ``request`` is provided, this
                should not be set.
            offset (:class:`int`):
                The offset requested must be less than the
                last row read from the stream. Requesting a
                larger offset is undefined. If not specified,
                start reading from offset zero.

                This corresponds to the ``offset`` field
                on the ``request`` instance; if ``request`` is provided, this
                should not be set.
            retry (google.api_core.retry_async.AsyncRetry): Designation of what errors, if any,
                should be retried.
            timeout (float): The timeout for this request.
            metadata (Sequence[Tuple[str, str]]): Strings which should be
                sent along with the request as metadata.

        Returns:
            AsyncIterable[google.cloud.bigquery_storage_v1.types.ReadRowsResponse]:
                Response from calling ReadRows may include row data, progress,
                and throttling information.

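        Streams are typically obtained from a prior ``create_read_session``
        call (a minimal sketch; ``session`` is assumed to hold at least one
        stream):

        .. code-block:: python

            stream = await client.read_rows(
                read_stream=session.streams[0].name,
                offset=0,
            )
            async for response in stream:
                print(response.row_count)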
        """
        # Create or coerce a protobuf request object.
        # Quick check: If we got a request object, we should *not* have
        # gotten any keyword arguments that map to the request.
        has_flattened_params = any([read_stream, offset])
        if request is not None and has_flattened_params:
            raise ValueError(
                "If the `request` argument is set, then none of "
                "the individual field arguments should be set."
            )

        request = storage.ReadRowsRequest(request)

        # If we have keyword arguments corresponding to fields on the
        # request, apply these.
        if read_stream is not None:
            request.read_stream = read_stream
        if offset is not None:
            request.offset = offset

        # Wrap the RPC method; this adds retry and timeout information,
        # and friendly error handling.
        rpc = gapic_v1.method_async.wrap_method(
            self._client._transport.read_rows,
            default_retry=retries.AsyncRetry(
                initial=0.1,
                maximum=60.0,
                multiplier=1.3,
                predicate=retries.if_exception_type(
                    core_exceptions.ServiceUnavailable,
                ),
                deadline=86400.0,
            ),
            default_timeout=86400.0,
            client_info=DEFAULT_CLIENT_INFO,
        )

        # Certain fields should be provided within the metadata header;
        # add these here.
        metadata = tuple(metadata) + (
            gapic_v1.routing_header.to_grpc_metadata(
                (("read_stream", request.read_stream),)
            ),
        )

        # Validate the universe domain.
        self._client._validate_universe_domain()

        # Send the request.
        response = rpc(
            request,
            retry=retry,
            timeout=timeout,
            metadata=metadata,
        )

        # Done; return the response.
        return response

    async def split_read_stream(
        self,
        request: Optional[Union[storage.SplitReadStreamRequest, dict]] = None,
        *,
        retry: OptionalRetry = gapic_v1.method.DEFAULT,
        timeout: Union[float, object] = gapic_v1.method.DEFAULT,
        metadata: Sequence[Tuple[str, str]] = (),
    ) -> storage.SplitReadStreamResponse:
        r"""Splits a given ``ReadStream`` into two ``ReadStream`` objects.
        These ``ReadStream`` objects are referred to as the primary and
        the residual streams of the split. The original ``ReadStream``
        can still be read from in the same manner as before. Both of the
        returned ``ReadStream`` objects can also be read from, and the
        rows returned by both child streams will be the same as the rows
        read from the original stream.

        Moreover, the two child streams will be allocated back-to-back
        in the original ``ReadStream``. Concretely, it is guaranteed
        that for streams original, primary, and residual, that
        original[0-j] = primary[0-j] and original[j-n] = residual[0-m]
        once the streams have been read to completion.

        .. code-block:: python

            # This snippet has been automatically generated and should be regarded as a
            # code template only.
            # It will require modifications to work:
            # - It may require correct/in-range values for request initialization.
            # - It may require specifying regional endpoints when creating the service
            #   client as shown in:
            #   https://googleapis.dev/python/google-api-core/latest/client_options.html
            from google.cloud import bigquery_storage_v1

            async def sample_split_read_stream():
                # Create a client
                client = bigquery_storage_v1.BigQueryReadAsyncClient()

                # Initialize request argument(s)
                request = bigquery_storage_v1.SplitReadStreamRequest(
                    name="name_value",
                )

                # Make the request
                response = await client.split_read_stream(request=request)

                # Handle the response
                print(response)

        Args:
            request (Optional[Union[google.cloud.bigquery_storage_v1.types.SplitReadStreamRequest, dict]]):
                The request object. Request message for ``SplitReadStream``.
            retry (google.api_core.retry_async.AsyncRetry): Designation of what errors, if any,
                should be retried.
            timeout (float): The timeout for this request.
            metadata (Sequence[Tuple[str, str]]): Strings which should be
                sent along with the request as metadata.

        Returns:
            google.cloud.bigquery_storage_v1.types.SplitReadStreamResponse:
                Response message for ``SplitReadStream``.
        """
        # Create or coerce a protobuf request object.
        request = storage.SplitReadStreamRequest(request)

        # Wrap the RPC method; this adds retry and timeout information,
        # and friendly error handling.
        rpc = gapic_v1.method_async.wrap_method(
            self._client._transport.split_read_stream,
            default_retry=retries.AsyncRetry(
                initial=0.1,
                maximum=60.0,
                multiplier=1.3,
                predicate=retries.if_exception_type(
                    core_exceptions.DeadlineExceeded,
                    core_exceptions.ServiceUnavailable,
                ),
                deadline=600.0,
            ),
            default_timeout=600.0,
            client_info=DEFAULT_CLIENT_INFO,
        )

        # Certain fields should be provided within the metadata header;
        # add these here.
        metadata = tuple(metadata) + (
            gapic_v1.routing_header.to_grpc_metadata((("name", request.name),)),
        )

        # Validate the universe domain.
        self._client._validate_universe_domain()

        # Send the request.
        response = await rpc(
            request,
            retry=retry,
            timeout=timeout,
            metadata=metadata,
        )

        # Done; return the response.
        return response

    async def __aenter__(self) -> "BigQueryReadAsyncClient":
        return self

    async def __aexit__(self, exc_type, exc, tb):
        await self.transport.close()


DEFAULT_CLIENT_INFO = gapic_v1.client_info.ClientInfo(
    gapic_version=package_version.__version__
)


__all__ = ("BigQueryReadAsyncClient",)