# -*- coding: utf-8 -*-
# Copyright 2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from collections import OrderedDict
import functools
import re
from typing import (
    Dict,
    Mapping,
    MutableMapping,
    MutableSequence,
    Optional,
    AsyncIterable,
    Awaitable,
    Sequence,
    Tuple,
    Type,
    Union,
)

from google.cloud.bigquery_storage_v1 import gapic_version as package_version

from google.api_core.client_options import ClientOptions
from google.api_core import exceptions as core_exceptions
from google.api_core import gapic_v1
from google.api_core import retry as retries
from google.auth import credentials as ga_credentials  # type: ignore
from google.oauth2 import service_account  # type: ignore

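# Compatibility shim: newer google-api-core releases expose
# ``gapic_v1.method._MethodDefault``; on older versions the attribute lookup
# below raises AttributeError and we fall back to a plain ``object`` sentinel.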
try:
    OptionalRetry = Union[retries.Retry, gapic_v1.method._MethodDefault]
except AttributeError:  # pragma: NO COVER
    OptionalRetry = Union[retries.Retry, object]  # type: ignore

from google.cloud.bigquery_storage_v1.types import arrow
from google.cloud.bigquery_storage_v1.types import avro
from google.cloud.bigquery_storage_v1.types import storage
from google.cloud.bigquery_storage_v1.types import stream
from google.protobuf import timestamp_pb2  # type: ignore
from .transports.base import BigQueryReadTransport, DEFAULT_CLIENT_INFO
from .transports.grpc_asyncio import BigQueryReadGrpcAsyncIOTransport
from .client import BigQueryReadClient


class BigQueryReadAsyncClient:
    """BigQuery Read API.

    The Read API can be used to read data from BigQuery.
    """

    _client: BigQueryReadClient

    DEFAULT_ENDPOINT = BigQueryReadClient.DEFAULT_ENDPOINT
    DEFAULT_MTLS_ENDPOINT = BigQueryReadClient.DEFAULT_MTLS_ENDPOINT

    read_session_path = staticmethod(BigQueryReadClient.read_session_path)
    parse_read_session_path = staticmethod(BigQueryReadClient.parse_read_session_path)
    read_stream_path = staticmethod(BigQueryReadClient.read_stream_path)
    parse_read_stream_path = staticmethod(BigQueryReadClient.parse_read_stream_path)
    table_path = staticmethod(BigQueryReadClient.table_path)
    parse_table_path = staticmethod(BigQueryReadClient.parse_table_path)
    common_billing_account_path = staticmethod(
        BigQueryReadClient.common_billing_account_path
    )
    parse_common_billing_account_path = staticmethod(
        BigQueryReadClient.parse_common_billing_account_path
    )
    common_folder_path = staticmethod(BigQueryReadClient.common_folder_path)
    parse_common_folder_path = staticmethod(BigQueryReadClient.parse_common_folder_path)
    common_organization_path = staticmethod(BigQueryReadClient.common_organization_path)
    parse_common_organization_path = staticmethod(
        BigQueryReadClient.parse_common_organization_path
    )
    common_project_path = staticmethod(BigQueryReadClient.common_project_path)
    parse_common_project_path = staticmethod(
        BigQueryReadClient.parse_common_project_path
    )
    common_location_path = staticmethod(BigQueryReadClient.common_location_path)
    parse_common_location_path = staticmethod(
        BigQueryReadClient.parse_common_location_path
    )

    @classmethod
    def from_service_account_info(cls, info: dict, *args, **kwargs):
        """Creates an instance of this client using the provided credentials
        info.

        Args:
            info (dict): The service account private key info.
            args: Additional arguments to pass to the constructor.
            kwargs: Additional arguments to pass to the constructor.

        Returns:
            BigQueryReadAsyncClient: The constructed client.
        """
        return BigQueryReadClient.from_service_account_info.__func__(BigQueryReadAsyncClient, info, *args, **kwargs)  # type: ignore

    @classmethod
    def from_service_account_file(cls, filename: str, *args, **kwargs):
        """Creates an instance of this client using the provided credentials
        file.

        Args:
            filename (str): The path to the service account private key json
                file.
            args: Additional arguments to pass to the constructor.
            kwargs: Additional arguments to pass to the constructor.

        Returns:
            BigQueryReadAsyncClient: The constructed client.
        """
        return BigQueryReadClient.from_service_account_file.__func__(BigQueryReadAsyncClient, filename, *args, **kwargs)  # type: ignore

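    # Alias for ``from_service_account_file``; presumably retained so callers
    # using the JSON-named constructor keep working.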
    from_service_account_json = from_service_account_file

    @classmethod
    def get_mtls_endpoint_and_cert_source(
        cls, client_options: Optional[ClientOptions] = None
    ):
        """Return the API endpoint and client cert source for mutual TLS.

        The client cert source is determined in the following order:
        (1) if `GOOGLE_API_USE_CLIENT_CERTIFICATE` environment variable is not "true", the
        client cert source is None.
        (2) if `client_options.client_cert_source` is provided, use the provided one; if the
        default client cert source exists, use the default one; otherwise the client cert
        source is None.

        The API endpoint is determined in the following order:
        (1) if `client_options.api_endpoint` is provided, use the provided one.
        (2) if `GOOGLE_API_USE_MTLS_ENDPOINT` environment variable is "always", use the
        default mTLS endpoint; if the environment variable is "never", use the default API
        endpoint; otherwise if client cert source exists, use the default mTLS endpoint, otherwise
        use the default API endpoint.

        More details can be found at https://google.aip.dev/auth/4114.

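        A minimal usage sketch (the resolved endpoint depends on the
        environment variables described above, not on fixed values):

        .. code-block:: python

            from google.api_core.client_options import ClientOptions

            options = ClientOptions()
            endpoint, cert_source = (
                BigQueryReadAsyncClient.get_mtls_endpoint_and_cert_source(options)
            )
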
        Args:
            client_options (google.api_core.client_options.ClientOptions): Custom options for the
                client. Only the `api_endpoint` and `client_cert_source` properties may be used
                in this method.

        Returns:
            Tuple[str, Callable[[], Tuple[bytes, bytes]]]: returns the API endpoint and the
                client cert source to use.

        Raises:
            google.auth.exceptions.MutualTLSChannelError: If any errors happen.
        """
        return BigQueryReadClient.get_mtls_endpoint_and_cert_source(client_options)  # type: ignore

    @property
    def transport(self) -> BigQueryReadTransport:
        """Returns the transport used by the client instance.

        Returns:
            BigQueryReadTransport: The transport used by the client instance.
        """
        return self._client.transport

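    # Bind the metaclass's ``get_transport_class`` so the async client resolves
    # transport names (e.g. "grpc_asyncio") through the same lookup as the
    # sync client.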
    get_transport_class = functools.partial(
        type(BigQueryReadClient).get_transport_class, type(BigQueryReadClient)
    )

    def __init__(
        self,
        *,
        credentials: Optional[ga_credentials.Credentials] = None,
        transport: Union[str, BigQueryReadTransport] = "grpc_asyncio",
        client_options: Optional[ClientOptions] = None,
        client_info: gapic_v1.client_info.ClientInfo = DEFAULT_CLIENT_INFO,
    ) -> None:
        """Instantiates the BigQuery Read async client.

        Args:
            credentials (Optional[google.auth.credentials.Credentials]): The
                authorization credentials to attach to requests. These
                credentials identify the application to the service; if none
                are specified, the client will attempt to ascertain the
                credentials from the environment.
            transport (Union[str, ~.BigQueryReadTransport]): The
                transport to use. If set to None, a transport is chosen
                automatically.
            client_options (ClientOptions): Custom options for the client. It
                won't take effect if a ``transport`` instance is provided.
                (1) The ``api_endpoint`` property can be used to override the
                default endpoint provided by the client. GOOGLE_API_USE_MTLS_ENDPOINT
                environment variable can also be used to override the endpoint:
                "always" (always use the default mTLS endpoint), "never" (always
                use the default regular endpoint) and "auto" (auto-switch to the
                default mTLS endpoint if a client certificate is present; this is
                the default value). However, the ``api_endpoint`` property takes
                precedence if provided.
                (2) If GOOGLE_API_USE_CLIENT_CERTIFICATE environment variable
                is "true", then the ``client_cert_source`` property can be used
                to provide a client certificate for mutual TLS transport. If
                not provided, the default SSL client certificate will be used if
                present. If GOOGLE_API_USE_CLIENT_CERTIFICATE is "false" or not
                set, no client certificate will be used.

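        A minimal sketch of overriding the endpoint (the endpoint value shown
        is illustrative; substitute the endpoint your deployment requires):

        .. code-block:: python

            from google.api_core.client_options import ClientOptions

            options = ClientOptions(api_endpoint="bigquerystorage.googleapis.com")
            client = BigQueryReadAsyncClient(client_options=options)
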
        Raises:
            google.auth.exceptions.MutualTLSChannelError: If mutual TLS transport
                creation failed for any reason.
        """
        self._client = BigQueryReadClient(
            credentials=credentials,
            transport=transport,
            client_options=client_options,
            client_info=client_info,
        )

    async def create_read_session(
        self,
        request: Optional[Union[storage.CreateReadSessionRequest, dict]] = None,
        *,
        parent: Optional[str] = None,
        read_session: Optional[stream.ReadSession] = None,
        max_stream_count: Optional[int] = None,
        retry: OptionalRetry = gapic_v1.method.DEFAULT,
        timeout: Union[float, object] = gapic_v1.method.DEFAULT,
        metadata: Sequence[Tuple[str, str]] = (),
    ) -> stream.ReadSession:
        r"""Creates a new read session. A read session divides
        the contents of a BigQuery table into one or more
        streams, which can then be used to read data from the
        table. The read session also specifies properties of the
        data to be read, such as a list of columns or a
        push-down filter describing the rows to be returned.

        A particular row can be read by at most one stream. When
        the caller has reached the end of each stream in the
        session, then all the data in the table has been read.

        Data is assigned to each stream such that roughly the
        same number of rows can be read from each stream.
        Because the server-side unit for assigning data is
        collections of rows, the API does not guarantee that
        each stream will return the same number of rows.
        Additionally, the limits are enforced based on the
        number of pre-filtered rows, so some filters can lead to
        lopsided assignments.

        Read sessions automatically expire 6 hours after they
        are created and do not require manual clean-up by the
        caller.

        .. code-block:: python

            # This snippet has been automatically generated and should be regarded as a
            # code template only.
            # It will require modifications to work:
            # - It may require correct/in-range values for request initialization.
            # - It may require specifying regional endpoints when creating the service
            #   client as shown in:
            #   https://googleapis.dev/python/google-api-core/latest/client_options.html
            from google.cloud import bigquery_storage_v1

            async def sample_create_read_session():
                # Create a client
                client = bigquery_storage_v1.BigQueryReadAsyncClient()

                # Initialize request argument(s)
                request = bigquery_storage_v1.CreateReadSessionRequest(
                    parent="parent_value",
                )

                # Make the request
                response = await client.create_read_session(request=request)

                # Handle the response
                print(response)

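        The same call can also be made with the flattened arguments instead of
        a request object; a minimal sketch (the project, dataset, and table
        names are placeholders):

        .. code-block:: python

            session = await client.create_read_session(
                parent="projects/my-project",
                read_session=bigquery_storage_v1.types.ReadSession(
                    table="projects/my-project/datasets/my_dataset/tables/my_table",
                    data_format=bigquery_storage_v1.types.DataFormat.ARROW,
                ),
                max_stream_count=1,
            )
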
        Args:
            request (Optional[Union[google.cloud.bigquery_storage_v1.types.CreateReadSessionRequest, dict]]):
                The request object. Request message for ``CreateReadSession``.
            parent (:class:`str`):
                Required. The request project that owns the session, in
                the form of ``projects/{project_id}``.

                This corresponds to the ``parent`` field
                on the ``request`` instance; if ``request`` is provided, this
                should not be set.
            read_session (:class:`google.cloud.bigquery_storage_v1.types.ReadSession`):
                Required. Session to be created.
                This corresponds to the ``read_session`` field
                on the ``request`` instance; if ``request`` is provided, this
                should not be set.
            max_stream_count (:class:`int`):
                Max initial number of streams. If unset or zero, the
                server will provide a number of streams to produce
                reasonable throughput. Must be non-negative. The number
                of streams may be lower than the requested number,
                depending on the amount of parallelism that is
                reasonable for the table. There is a default system max
                limit of 1,000.

                This must be greater than or equal to
                preferred_min_stream_count. Typically, clients should
                either leave this unset to let the system determine an
                upper bound OR set this to a size for the maximum
                "units of work" it can gracefully handle.

                This corresponds to the ``max_stream_count`` field
                on the ``request`` instance; if ``request`` is provided, this
                should not be set.
            retry (google.api_core.retry.Retry): Designation of what errors, if any,
                should be retried.
            timeout (float): The timeout for this request.
            metadata (Sequence[Tuple[str, str]]): Strings which should be
                sent along with the request as metadata.

        Returns:
            google.cloud.bigquery_storage_v1.types.ReadSession:
                Information about the ReadSession.
        """
        # Create or coerce a protobuf request object.
        # Quick check: If we got a request object, we should *not* have
        # gotten any keyword arguments that map to the request.
        has_flattened_params = any([parent, read_session, max_stream_count])
        if request is not None and has_flattened_params:
            raise ValueError(
                "If the `request` argument is set, then none of "
                "the individual field arguments should be set."
            )

        request = storage.CreateReadSessionRequest(request)

        # If we have keyword arguments corresponding to fields on the
        # request, apply these.
        if parent is not None:
            request.parent = parent
        if read_session is not None:
            request.read_session = read_session
        if max_stream_count is not None:
            request.max_stream_count = max_stream_count

        # Wrap the RPC method; this adds retry and timeout information,
        # and friendly error handling.
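        # Defaults below: exponential backoff starting at 0.1 s, growing by a
        # factor of 1.3 up to a 60 s cap, retrying only transient errors, with
        # an overall 600 s (10 minute) deadline and timeout.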
        rpc = gapic_v1.method_async.wrap_method(
            self._client._transport.create_read_session,
            default_retry=retries.Retry(
                initial=0.1,
                maximum=60.0,
                multiplier=1.3,
                predicate=retries.if_exception_type(
                    core_exceptions.DeadlineExceeded,
                    core_exceptions.ServiceUnavailable,
                ),
                deadline=600.0,
            ),
            default_timeout=600.0,
            client_info=DEFAULT_CLIENT_INFO,
        )

        # Certain fields should be provided within the metadata header;
        # add these here.
        metadata = tuple(metadata) + (
            gapic_v1.routing_header.to_grpc_metadata(
                (("read_session.table", request.read_session.table),)
            ),
        )

        # Send the request.
        response = await rpc(
            request,
            retry=retry,
            timeout=timeout,
            metadata=metadata,
        )

        # Done; return the response.
        return response

    def read_rows(
        self,
        request: Optional[Union[storage.ReadRowsRequest, dict]] = None,
        *,
        read_stream: Optional[str] = None,
        offset: Optional[int] = None,
        retry: OptionalRetry = gapic_v1.method.DEFAULT,
        timeout: Union[float, object] = gapic_v1.method.DEFAULT,
        metadata: Sequence[Tuple[str, str]] = (),
    ) -> Awaitable[AsyncIterable[storage.ReadRowsResponse]]:
        r"""Reads rows from the stream in the format prescribed
        by the ReadSession. Each response contains one or more
        table rows, up to a maximum of 100 MiB per response;
        read requests which attempt to read individual rows
        larger than 100 MiB will fail.

        Each request also returns a set of stream statistics
        reflecting the current state of the stream.

        .. code-block:: python

            # This snippet has been automatically generated and should be regarded as a
            # code template only.
            # It will require modifications to work:
            # - It may require correct/in-range values for request initialization.
            # - It may require specifying regional endpoints when creating the service
            #   client as shown in:
            #   https://googleapis.dev/python/google-api-core/latest/client_options.html
            from google.cloud import bigquery_storage_v1

            async def sample_read_rows():
                # Create a client
                client = bigquery_storage_v1.BigQueryReadAsyncClient()

                # Initialize request argument(s)
                request = bigquery_storage_v1.ReadRowsRequest(
                    read_stream="read_stream_value",
                )

                # Make the request
                stream = await client.read_rows(request=request)

                # Handle the response
                async for response in stream:
                    print(response)

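        With the flattened arguments, a minimal sketch (assuming ``session``
        came from a prior ``create_read_session`` call, as in the sketch for
        that method above):

        .. code-block:: python

            rows = await client.read_rows(
                read_stream=session.streams[0].name,
                offset=0,
            )
            async for response in rows:
                print(response.row_count)
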
        Args:
            request (Optional[Union[google.cloud.bigquery_storage_v1.types.ReadRowsRequest, dict]]):
                The request object. Request message for ``ReadRows``.
            read_stream (:class:`str`):
                Required. Stream to read rows from.
                This corresponds to the ``read_stream`` field
                on the ``request`` instance; if ``request`` is provided, this
                should not be set.
            offset (:class:`int`):
                The offset requested must be less
                than the last row read from Read.
                Requesting a larger offset is undefined.
                If not specified, start reading from
                offset zero.

                This corresponds to the ``offset`` field
                on the ``request`` instance; if ``request`` is provided, this
                should not be set.
            retry (google.api_core.retry.Retry): Designation of what errors, if any,
                should be retried.
            timeout (float): The timeout for this request.
            metadata (Sequence[Tuple[str, str]]): Strings which should be
                sent along with the request as metadata.

        Returns:
            AsyncIterable[google.cloud.bigquery_storage_v1.types.ReadRowsResponse]:
                Response from calling ReadRows may include row data, progress and
                throttling information.

        """
        # Create or coerce a protobuf request object.
        # Quick check: If we got a request object, we should *not* have
        # gotten any keyword arguments that map to the request.
        has_flattened_params = any([read_stream, offset])
        if request is not None and has_flattened_params:
            raise ValueError(
                "If the `request` argument is set, then none of "
                "the individual field arguments should be set."
            )

        request = storage.ReadRowsRequest(request)

        # If we have keyword arguments corresponding to fields on the
        # request, apply these.
        if read_stream is not None:
            request.read_stream = read_stream
        if offset is not None:
            request.offset = offset

        # Wrap the RPC method; this adds retry and timeout information,
        # and friendly error handling.
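        # Defaults below: only ServiceUnavailable is retried, and the deadline
        # and timeout are 86400 s (24 hours) to accommodate long-lived streams.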
        rpc = gapic_v1.method_async.wrap_method(
            self._client._transport.read_rows,
            default_retry=retries.Retry(
                initial=0.1,
                maximum=60.0,
                multiplier=1.3,
                predicate=retries.if_exception_type(
                    core_exceptions.ServiceUnavailable,
                ),
                deadline=86400.0,
            ),
            default_timeout=86400.0,
            client_info=DEFAULT_CLIENT_INFO,
        )

        # Certain fields should be provided within the metadata header;
        # add these here.
        metadata = tuple(metadata) + (
            gapic_v1.routing_header.to_grpc_metadata(
                (("read_stream", request.read_stream),)
            ),
        )

        # Send the request.
        response = rpc(
            request,
            retry=retry,
            timeout=timeout,
            metadata=metadata,
        )

        # Done; return the response.
        return response

    async def split_read_stream(
        self,
        request: Optional[Union[storage.SplitReadStreamRequest, dict]] = None,
        *,
        retry: OptionalRetry = gapic_v1.method.DEFAULT,
        timeout: Union[float, object] = gapic_v1.method.DEFAULT,
        metadata: Sequence[Tuple[str, str]] = (),
    ) -> storage.SplitReadStreamResponse:
        r"""Splits a given ``ReadStream`` into two ``ReadStream`` objects.
        These ``ReadStream`` objects are referred to as the primary and
        the residual streams of the split. The original ``ReadStream``
        can still be read from in the same manner as before. Both of the
        returned ``ReadStream`` objects can also be read from, and the
        rows returned by both child streams will be the same as the rows
        read from the original stream.

        Moreover, the two child streams will be allocated back-to-back
        in the original ``ReadStream``. Concretely, it is guaranteed
        that for streams original, primary, and residual, that
        original[0-j] = primary[0-j] and original[j-n] = residual[0-m]
        once the streams have been read to completion.

        .. code-block:: python

            # This snippet has been automatically generated and should be regarded as a
            # code template only.
            # It will require modifications to work:
            # - It may require correct/in-range values for request initialization.
            # - It may require specifying regional endpoints when creating the service
            #   client as shown in:
            #   https://googleapis.dev/python/google-api-core/latest/client_options.html
            from google.cloud import bigquery_storage_v1

            async def sample_split_read_stream():
                # Create a client
                client = bigquery_storage_v1.BigQueryReadAsyncClient()

                # Initialize request argument(s)
                request = bigquery_storage_v1.SplitReadStreamRequest(
                    name="name_value",
                )

                # Make the request
                response = await client.split_read_stream(request=request)

                # Handle the response
                print(response)

        Args:
            request (Optional[Union[google.cloud.bigquery_storage_v1.types.SplitReadStreamRequest, dict]]):
                The request object. Request message for ``SplitReadStream``.
            retry (google.api_core.retry.Retry): Designation of what errors, if any,
                should be retried.
            timeout (float): The timeout for this request.
            metadata (Sequence[Tuple[str, str]]): Strings which should be
                sent along with the request as metadata.

        Returns:
            google.cloud.bigquery_storage_v1.types.SplitReadStreamResponse:
                Response message for SplitReadStream.
        """
        # Create or coerce a protobuf request object.
        request = storage.SplitReadStreamRequest(request)

        # Wrap the RPC method; this adds retry and timeout information,
        # and friendly error handling.
        rpc = gapic_v1.method_async.wrap_method(
            self._client._transport.split_read_stream,
            default_retry=retries.Retry(
                initial=0.1,
                maximum=60.0,
                multiplier=1.3,
                predicate=retries.if_exception_type(
                    core_exceptions.DeadlineExceeded,
                    core_exceptions.ServiceUnavailable,
                ),
                deadline=600.0,
            ),
            default_timeout=600.0,
            client_info=DEFAULT_CLIENT_INFO,
        )

        # Certain fields should be provided within the metadata header;
        # add these here.
        metadata = tuple(metadata) + (
            gapic_v1.routing_header.to_grpc_metadata((("name", request.name),)),
        )

        # Send the request.
        response = await rpc(
            request,
            retry=retry,
            timeout=timeout,
            metadata=metadata,
        )

        # Done; return the response.
        return response

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc, tb):
        await self.transport.close()


DEFAULT_CLIENT_INFO = gapic_v1.client_info.ClientInfo(
    gapic_version=package_version.__version__
)


__all__ = ("BigQueryReadAsyncClient",)