# -*- coding: utf-8 -*-
# Copyright 2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from collections import OrderedDict
import functools
import re
from typing import (
    Dict,
    Mapping,
    MutableMapping,
    MutableSequence,
    Optional,
    AsyncIterable,
    Awaitable,
    AsyncIterator,
    Sequence,
    Tuple,
    Type,
    Union,
)

from google.cloud.bigquery_storage_v1 import gapic_version as package_version

from google.api_core.client_options import ClientOptions
from google.api_core import exceptions as core_exceptions
from google.api_core import gapic_v1
from google.api_core import retry as retries
from google.auth import credentials as ga_credentials  # type: ignore
from google.oauth2 import service_account  # type: ignore

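# Compatibility shim: older releases of google-api-core may not expose
# `gapic_v1.method._MethodDefault`; in that case the attribute lookup below
# raises AttributeError and a plain ``object`` sentinel is used instead.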
try:
    OptionalRetry = Union[retries.Retry, gapic_v1.method._MethodDefault]
except AttributeError:  # pragma: NO COVER
    OptionalRetry = Union[retries.Retry, object]  # type: ignore

from google.cloud.bigquery_storage_v1.types import storage
from google.cloud.bigquery_storage_v1.types import stream
from google.cloud.bigquery_storage_v1.types import table
from google.protobuf import timestamp_pb2  # type: ignore
from google.rpc import status_pb2  # type: ignore
from .transports.base import BigQueryWriteTransport, DEFAULT_CLIENT_INFO
from .transports.grpc_asyncio import BigQueryWriteGrpcAsyncIOTransport
from .client import BigQueryWriteClient


class BigQueryWriteAsyncClient:
    """BigQuery Write API.
    The Write API can be used to write data to BigQuery.
    For supplementary information about the Write API, see:
    https://cloud.google.com/bigquery/docs/write-api
    """

    _client: BigQueryWriteClient

    DEFAULT_ENDPOINT = BigQueryWriteClient.DEFAULT_ENDPOINT
    DEFAULT_MTLS_ENDPOINT = BigQueryWriteClient.DEFAULT_MTLS_ENDPOINT

    table_path = staticmethod(BigQueryWriteClient.table_path)
    parse_table_path = staticmethod(BigQueryWriteClient.parse_table_path)
    write_stream_path = staticmethod(BigQueryWriteClient.write_stream_path)
    parse_write_stream_path = staticmethod(BigQueryWriteClient.parse_write_stream_path)
    common_billing_account_path = staticmethod(
        BigQueryWriteClient.common_billing_account_path
    )
    parse_common_billing_account_path = staticmethod(
        BigQueryWriteClient.parse_common_billing_account_path
    )
    common_folder_path = staticmethod(BigQueryWriteClient.common_folder_path)
    parse_common_folder_path = staticmethod(
        BigQueryWriteClient.parse_common_folder_path
    )
    common_organization_path = staticmethod(
        BigQueryWriteClient.common_organization_path
    )
    parse_common_organization_path = staticmethod(
        BigQueryWriteClient.parse_common_organization_path
    )
    common_project_path = staticmethod(BigQueryWriteClient.common_project_path)
    parse_common_project_path = staticmethod(
        BigQueryWriteClient.parse_common_project_path
    )
    common_location_path = staticmethod(BigQueryWriteClient.common_location_path)
    parse_common_location_path = staticmethod(
        BigQueryWriteClient.parse_common_location_path
    )

    @classmethod
    def from_service_account_info(cls, info: dict, *args, **kwargs):
        """Creates an instance of this client using the provided credentials
            info.

        Args:
            info (dict): The service account private key info.
            args: Additional arguments to pass to the constructor.
            kwargs: Additional arguments to pass to the constructor.

        Returns:
            BigQueryWriteAsyncClient: The constructed client.
        """
        return BigQueryWriteClient.from_service_account_info.__func__(BigQueryWriteAsyncClient, info, *args, **kwargs)  # type: ignore

    @classmethod
    def from_service_account_file(cls, filename: str, *args, **kwargs):
        """Creates an instance of this client using the provided credentials
            file.

        Args:
            filename (str): The path to the service account private key json
                file.
            args: Additional arguments to pass to the constructor.
            kwargs: Additional arguments to pass to the constructor.

        Returns:
            BigQueryWriteAsyncClient: The constructed client.
        """
        return BigQueryWriteClient.from_service_account_file.__func__(BigQueryWriteAsyncClient, filename, *args, **kwargs)  # type: ignore

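    # `from_service_account_json` is an alias of `from_service_account_file`;
    # the file it expects is the JSON private key for a service account.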
    from_service_account_json = from_service_account_file

    @classmethod
    def get_mtls_endpoint_and_cert_source(
        cls, client_options: Optional[ClientOptions] = None
    ):
        """Return the API endpoint and client cert source for mutual TLS.

        The client cert source is determined in the following order:
        (1) if the `GOOGLE_API_USE_CLIENT_CERTIFICATE` environment variable is not "true", the
        client cert source is None.
        (2) if `client_options.client_cert_source` is provided, use the provided one; if the
        default client cert source exists, use the default one; otherwise the client cert
        source is None.

        The API endpoint is determined in the following order:
        (1) if `client_options.api_endpoint` is provided, use the provided one.
        (2) if the `GOOGLE_API_USE_MTLS_ENDPOINT` environment variable is "always", use the
        default mTLS endpoint; if the environment variable is "never", use the default API
        endpoint; otherwise, if a client cert source exists, use the default mTLS endpoint,
        otherwise use the default API endpoint.

        More details can be found at https://google.aip.dev/auth/4114.
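
        For illustration only, the endpoint and cert source can be resolved
        before constructing a client (a sketch; the actual result depends on
        the environment variables described above):

        .. code-block:: python

            from google.api_core.client_options import ClientOptions

            options = ClientOptions(api_endpoint="bigquerystorage.googleapis.com")
            api_endpoint, cert_source = (
                BigQueryWriteAsyncClient.get_mtls_endpoint_and_cert_source(options)
            )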

        Args:
            client_options (google.api_core.client_options.ClientOptions): Custom options for the
                client. Only the `api_endpoint` and `client_cert_source` properties may be used
                in this method.

        Returns:
            Tuple[str, Callable[[], Tuple[bytes, bytes]]]: returns the API endpoint and the
                client cert source to use.

        Raises:
            google.auth.exceptions.MutualTLSChannelError: If any errors happen.
        """
        return BigQueryWriteClient.get_mtls_endpoint_and_cert_source(client_options)  # type: ignore

    @property
    def transport(self) -> BigQueryWriteTransport:
        """Returns the transport used by the client instance.

        Returns:
            BigQueryWriteTransport: The transport used by the client instance.
        """
        return self._client.transport

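    # Resolve a transport class from its registered name (for this async
    # client the default is "grpc_asyncio") using the synchronous client's
    # transport registry.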
    get_transport_class = functools.partial(
        type(BigQueryWriteClient).get_transport_class, type(BigQueryWriteClient)
    )

    def __init__(
        self,
        *,
        credentials: Optional[ga_credentials.Credentials] = None,
        transport: Union[str, BigQueryWriteTransport] = "grpc_asyncio",
        client_options: Optional[ClientOptions] = None,
        client_info: gapic_v1.client_info.ClientInfo = DEFAULT_CLIENT_INFO,
    ) -> None:
        """Instantiates the BigQuery Write async client.

        Args:
            credentials (Optional[google.auth.credentials.Credentials]): The
                authorization credentials to attach to requests. These
                credentials identify the application to the service; if none
                are specified, the client will attempt to ascertain the
                credentials from the environment.
            transport (Union[str, ~.BigQueryWriteTransport]): The
                transport to use. If set to None, a transport is chosen
                automatically.
            client_options (ClientOptions): Custom options for the client. It
                won't take effect if a ``transport`` instance is provided.
                (1) The ``api_endpoint`` property can be used to override the
                default endpoint provided by the client. The GOOGLE_API_USE_MTLS_ENDPOINT
                environment variable can also be used to override the endpoint:
                "always" (always use the default mTLS endpoint), "never" (always
                use the default regular endpoint) and "auto" (auto-switch to the
                default mTLS endpoint if a client certificate is present; this is
                the default value). However, the ``api_endpoint`` property takes
                precedence if provided.
                (2) If the GOOGLE_API_USE_CLIENT_CERTIFICATE environment variable
                is "true", then the ``client_cert_source`` property can be used
                to provide a client certificate for mutual TLS transport. If
                not provided, the default SSL client certificate will be used if
                present. If GOOGLE_API_USE_CLIENT_CERTIFICATE is "false" or not
                set, no client certificate will be used.

        Raises:
            google.auth.exceptions.MutualTLSChannelError: If mutual TLS transport
                creation failed for any reason.
        """
        self._client = BigQueryWriteClient(
            credentials=credentials,
            transport=transport,
            client_options=client_options,
            client_info=client_info,
        )

    async def create_write_stream(
        self,
        request: Optional[Union[storage.CreateWriteStreamRequest, dict]] = None,
        *,
        parent: Optional[str] = None,
        write_stream: Optional[stream.WriteStream] = None,
        retry: OptionalRetry = gapic_v1.method.DEFAULT,
        timeout: Union[float, object] = gapic_v1.method.DEFAULT,
        metadata: Sequence[Tuple[str, str]] = (),
    ) -> stream.WriteStream:
        r"""Creates a write stream to the given table. Additionally, every
        table has a special stream named '_default' to which data can be
        written. This stream doesn't need to be created using
        CreateWriteStream. It is a stream that can be used
        simultaneously by any number of clients. Data written to this
        stream is considered committed as soon as an acknowledgement is
        received.

        .. code-block:: python

            # This snippet has been automatically generated and should be regarded as a
            # code template only.
            # It will require modifications to work:
            # - It may require correct/in-range values for request initialization.
            # - It may require specifying regional endpoints when creating the service
            #   client as shown in:
            #   https://googleapis.dev/python/google-api-core/latest/client_options.html
            from google.cloud import bigquery_storage_v1

            async def sample_create_write_stream():
                # Create a client
                client = bigquery_storage_v1.BigQueryWriteAsyncClient()

                # Initialize request argument(s)
                request = bigquery_storage_v1.CreateWriteStreamRequest(
                    parent="parent_value",
                )

                # Make the request
                response = await client.create_write_stream(request=request)

                # Handle the response
                print(response)

        Args:
            request (Optional[Union[google.cloud.bigquery_storage_v1.types.CreateWriteStreamRequest, dict]]):
                The request object. Request message for ``CreateWriteStream``.
            parent (:class:`str`):
                Required. Reference to the table to which the stream
                belongs, in the format of
                ``projects/{project}/datasets/{dataset}/tables/{table}``.

                This corresponds to the ``parent`` field
                on the ``request`` instance; if ``request`` is provided, this
                should not be set.
            write_stream (:class:`google.cloud.bigquery_storage_v1.types.WriteStream`):
                Required. Stream to be created.
                This corresponds to the ``write_stream`` field
                on the ``request`` instance; if ``request`` is provided, this
                should not be set.
            retry (google.api_core.retry.Retry): Designation of what errors, if any,
                should be retried.
            timeout (float): The timeout for this request.
            metadata (Sequence[Tuple[str, str]]): Strings which should be
                sent along with the request as metadata.

        Returns:
            google.cloud.bigquery_storage_v1.types.WriteStream:
                Information about a single stream
                that gets data inside the storage
                system.

        """
        # Create or coerce a protobuf request object.
        # Quick check: If we got a request object, we should *not* have
        # gotten any keyword arguments that map to the request.
        has_flattened_params = any([parent, write_stream])
        if request is not None and has_flattened_params:
            raise ValueError(
                "If the `request` argument is set, then none of "
                "the individual field arguments should be set."
            )

        request = storage.CreateWriteStreamRequest(request)

        # If we have keyword arguments corresponding to fields on the
        # request, apply these.
        if parent is not None:
            request.parent = parent
        if write_stream is not None:
            request.write_stream = write_stream

        # Wrap the RPC method; this adds retry and timeout information,
        # and friendly error handling.
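        # Default policy for CreateWriteStream: retry DeadlineExceeded,
        # ResourceExhausted and ServiceUnavailable with exponential backoff
        # (10s initial delay, 1.3x multiplier, capped at 120s between attempts)
        # until an overall 1200s deadline; the numbers below mirror the
        # generated service configuration for this RPC.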
        rpc = gapic_v1.method_async.wrap_method(
            self._client._transport.create_write_stream,
            default_retry=retries.Retry(
                initial=10.0,
                maximum=120.0,
                multiplier=1.3,
                predicate=retries.if_exception_type(
                    core_exceptions.DeadlineExceeded,
                    core_exceptions.ResourceExhausted,
                    core_exceptions.ServiceUnavailable,
                ),
                deadline=1200.0,
            ),
            default_timeout=1200.0,
            client_info=DEFAULT_CLIENT_INFO,
        )

        # Certain fields should be provided within the metadata header;
        # add these here.
        metadata = tuple(metadata) + (
            gapic_v1.routing_header.to_grpc_metadata((("parent", request.parent),)),
        )

        # Send the request.
        response = await rpc(
            request,
            retry=retry,
            timeout=timeout,
            metadata=metadata,
        )

        # Done; return the response.
        return response

    def append_rows(
        self,
        requests: Optional[AsyncIterator[storage.AppendRowsRequest]] = None,
        *,
        retry: OptionalRetry = gapic_v1.method.DEFAULT,
        timeout: Union[float, object] = gapic_v1.method.DEFAULT,
        metadata: Sequence[Tuple[str, str]] = (),
    ) -> Awaitable[AsyncIterable[storage.AppendRowsResponse]]:
        r"""Appends data to the given stream.

        If ``offset`` is specified, the ``offset`` is checked against
        the end of the stream. The server returns ``OUT_OF_RANGE`` in
        ``AppendRowsResponse`` if an attempt is made to append to an
        offset beyond the current end of the stream, or
        ``ALREADY_EXISTS`` if the user provides an ``offset`` that has
        already been written to. The user can retry with an adjusted
        offset within the same RPC connection. If ``offset`` is not
        specified, the append happens at the end of the stream.

        The response contains an optional offset at which the append
        happened. No offset information will be returned for appends to
        a default stream.

        Responses are received in the same order in which requests are
        sent. There will be one response for each successfully inserted
        request. Responses may optionally embed error information if the
        originating AppendRowsRequest was not successfully processed.

        The specifics of when successfully appended data is made visible
        to the table are governed by the type of stream:

        - For COMMITTED streams (which includes the default stream),
          data is visible immediately upon successful append.

        - For BUFFERED streams, data is made visible via a subsequent
          ``FlushRows`` rpc which advances a cursor to a newer offset
          in the stream.

        - For PENDING streams, data is not made visible until the
          stream itself is finalized (via the ``FinalizeWriteStream``
          rpc), and the stream is explicitly committed via the
          ``BatchCommitWriteStreams`` rpc.

        .. code-block:: python

            # This snippet has been automatically generated and should be regarded as a
            # code template only.
            # It will require modifications to work:
            # - It may require correct/in-range values for request initialization.
            # - It may require specifying regional endpoints when creating the service
            #   client as shown in:
            #   https://googleapis.dev/python/google-api-core/latest/client_options.html
            from google.cloud import bigquery_storage_v1

            async def sample_append_rows():
                # Create a client
                client = bigquery_storage_v1.BigQueryWriteAsyncClient()

                # Initialize request argument(s)
                request = bigquery_storage_v1.AppendRowsRequest(
                    write_stream="write_stream_value",
                )

                # This method expects an iterator which contains
                # 'bigquery_storage_v1.AppendRowsRequest' objects
                # Here we create a generator that yields a single `request` for
                # demonstrative purposes.
                requests = [request]

                def request_generator():
                    for request in requests:
                        yield request

                # Make the request
                stream = await client.append_rows(requests=request_generator())

                # Handle the response
                async for response in stream:
                    print(response)

        Args:
            requests (AsyncIterator[`google.cloud.bigquery_storage_v1.types.AppendRowsRequest`]):
                The request object AsyncIterator. Request message for ``AppendRows``.

                Because AppendRows is a bidirectional streaming RPC,
                certain parts of the AppendRowsRequest need only be
                specified for the first request sent each time the gRPC
                network connection is opened/reopened.

                A single AppendRowsRequest must be smaller than 10 MB;
                larger requests return an error, typically
                ``INVALID_ARGUMENT``.
            retry (google.api_core.retry.Retry): Designation of what errors, if any,
                should be retried.
            timeout (float): The timeout for this request.
            metadata (Sequence[Tuple[str, str]]): Strings which should be
                sent along with the request as metadata.

        Returns:
            AsyncIterable[google.cloud.bigquery_storage_v1.types.AppendRowsResponse]:
                Response message for AppendRows.
        """

        # Wrap the RPC method; this adds retry and timeout information,
        # and friendly error handling.
        rpc = gapic_v1.method_async.wrap_method(
            self._client._transport.append_rows,
            default_retry=retries.Retry(
                initial=0.1,
                maximum=60.0,
                multiplier=1.3,
                predicate=retries.if_exception_type(
                    core_exceptions.ServiceUnavailable,
                ),
                deadline=86400.0,
            ),
            default_timeout=86400.0,
            client_info=DEFAULT_CLIENT_INFO,
        )

        # Certain fields should be provided within the metadata header;
        # add these here.
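        # AppendRows derives no routing parameters from the request messages,
        # so the routing header added here is empty.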
        metadata = tuple(metadata) + (gapic_v1.routing_header.to_grpc_metadata(()),)

        # Send the request.
        response = rpc(
            requests,
            retry=retry,
            timeout=timeout,
            metadata=metadata,
        )

        # Done; return the response.
        return response

    async def get_write_stream(
        self,
        request: Optional[Union[storage.GetWriteStreamRequest, dict]] = None,
        *,
        name: Optional[str] = None,
        retry: OptionalRetry = gapic_v1.method.DEFAULT,
        timeout: Union[float, object] = gapic_v1.method.DEFAULT,
        metadata: Sequence[Tuple[str, str]] = (),
    ) -> stream.WriteStream:
        r"""Gets information about a write stream.

        .. code-block:: python

            # This snippet has been automatically generated and should be regarded as a
            # code template only.
            # It will require modifications to work:
            # - It may require correct/in-range values for request initialization.
            # - It may require specifying regional endpoints when creating the service
            #   client as shown in:
            #   https://googleapis.dev/python/google-api-core/latest/client_options.html
            from google.cloud import bigquery_storage_v1

            async def sample_get_write_stream():
                # Create a client
                client = bigquery_storage_v1.BigQueryWriteAsyncClient()

                # Initialize request argument(s)
                request = bigquery_storage_v1.GetWriteStreamRequest(
                    name="name_value",
                )

                # Make the request
                response = await client.get_write_stream(request=request)

                # Handle the response
                print(response)

        Args:
            request (Optional[Union[google.cloud.bigquery_storage_v1.types.GetWriteStreamRequest, dict]]):
                The request object. Request message for ``GetWriteStream``.
            name (:class:`str`):
                Required. Name of the stream to get, in the form of
                ``projects/{project}/datasets/{dataset}/tables/{table}/streams/{stream}``.

                This corresponds to the ``name`` field
                on the ``request`` instance; if ``request`` is provided, this
                should not be set.
            retry (google.api_core.retry.Retry): Designation of what errors, if any,
                should be retried.
            timeout (float): The timeout for this request.
            metadata (Sequence[Tuple[str, str]]): Strings which should be
                sent along with the request as metadata.

        Returns:
            google.cloud.bigquery_storage_v1.types.WriteStream:
                Information about a single stream
                that gets data inside the storage
                system.

        """
        # Create or coerce a protobuf request object.
        # Quick check: If we got a request object, we should *not* have
        # gotten any keyword arguments that map to the request.
        has_flattened_params = any([name])
        if request is not None and has_flattened_params:
            raise ValueError(
                "If the `request` argument is set, then none of "
                "the individual field arguments should be set."
            )

        request = storage.GetWriteStreamRequest(request)

        # If we have keyword arguments corresponding to fields on the
        # request, apply these.
        if name is not None:
            request.name = name

        # Wrap the RPC method; this adds retry and timeout information,
        # and friendly error handling.
        rpc = gapic_v1.method_async.wrap_method(
            self._client._transport.get_write_stream,
            default_retry=retries.Retry(
                initial=0.1,
                maximum=60.0,
                multiplier=1.3,
                predicate=retries.if_exception_type(
                    core_exceptions.DeadlineExceeded,
                    core_exceptions.ServiceUnavailable,
                ),
                deadline=600.0,
            ),
            default_timeout=600.0,
            client_info=DEFAULT_CLIENT_INFO,
        )

        # Certain fields should be provided within the metadata header;
        # add these here.
        metadata = tuple(metadata) + (
            gapic_v1.routing_header.to_grpc_metadata((("name", request.name),)),
        )

        # Send the request.
        response = await rpc(
            request,
            retry=retry,
            timeout=timeout,
            metadata=metadata,
        )

        # Done; return the response.
        return response

    async def finalize_write_stream(
        self,
        request: Optional[Union[storage.FinalizeWriteStreamRequest, dict]] = None,
        *,
        name: Optional[str] = None,
        retry: OptionalRetry = gapic_v1.method.DEFAULT,
        timeout: Union[float, object] = gapic_v1.method.DEFAULT,
        metadata: Sequence[Tuple[str, str]] = (),
    ) -> storage.FinalizeWriteStreamResponse:
        r"""Finalize a write stream so that no new data can be appended to
        the stream. Finalize is not supported on the '_default' stream.

        .. code-block:: python

            # This snippet has been automatically generated and should be regarded as a
            # code template only.
            # It will require modifications to work:
            # - It may require correct/in-range values for request initialization.
            # - It may require specifying regional endpoints when creating the service
            #   client as shown in:
            #   https://googleapis.dev/python/google-api-core/latest/client_options.html
            from google.cloud import bigquery_storage_v1

            async def sample_finalize_write_stream():
                # Create a client
                client = bigquery_storage_v1.BigQueryWriteAsyncClient()

                # Initialize request argument(s)
                request = bigquery_storage_v1.FinalizeWriteStreamRequest(
                    name="name_value",
                )

                # Make the request
                response = await client.finalize_write_stream(request=request)

                # Handle the response
                print(response)

        Args:
            request (Optional[Union[google.cloud.bigquery_storage_v1.types.FinalizeWriteStreamRequest, dict]]):
                The request object. Request message for invoking ``FinalizeWriteStream``.
            name (:class:`str`):
                Required. Name of the stream to finalize, in the form of
                ``projects/{project}/datasets/{dataset}/tables/{table}/streams/{stream}``.

                This corresponds to the ``name`` field
                on the ``request`` instance; if ``request`` is provided, this
                should not be set.
            retry (google.api_core.retry.Retry): Designation of what errors, if any,
                should be retried.
            timeout (float): The timeout for this request.
            metadata (Sequence[Tuple[str, str]]): Strings which should be
                sent along with the request as metadata.

        Returns:
            google.cloud.bigquery_storage_v1.types.FinalizeWriteStreamResponse:
                Response message for FinalizeWriteStream.
        """
        # Create or coerce a protobuf request object.
        # Quick check: If we got a request object, we should *not* have
        # gotten any keyword arguments that map to the request.
        has_flattened_params = any([name])
        if request is not None and has_flattened_params:
            raise ValueError(
                "If the `request` argument is set, then none of "
                "the individual field arguments should be set."
            )

        request = storage.FinalizeWriteStreamRequest(request)

        # If we have keyword arguments corresponding to fields on the
        # request, apply these.
        if name is not None:
            request.name = name

        # Wrap the RPC method; this adds retry and timeout information,
        # and friendly error handling.
        rpc = gapic_v1.method_async.wrap_method(
            self._client._transport.finalize_write_stream,
            default_retry=retries.Retry(
                initial=0.1,
                maximum=60.0,
                multiplier=1.3,
                predicate=retries.if_exception_type(
                    core_exceptions.DeadlineExceeded,
                    core_exceptions.ServiceUnavailable,
                ),
                deadline=600.0,
            ),
            default_timeout=600.0,
            client_info=DEFAULT_CLIENT_INFO,
        )

        # Certain fields should be provided within the metadata header;
        # add these here.
        metadata = tuple(metadata) + (
            gapic_v1.routing_header.to_grpc_metadata((("name", request.name),)),
        )

        # Send the request.
        response = await rpc(
            request,
            retry=retry,
            timeout=timeout,
            metadata=metadata,
        )

        # Done; return the response.
        return response

    async def batch_commit_write_streams(
        self,
        request: Optional[Union[storage.BatchCommitWriteStreamsRequest, dict]] = None,
        *,
        parent: Optional[str] = None,
        retry: OptionalRetry = gapic_v1.method.DEFAULT,
        timeout: Union[float, object] = gapic_v1.method.DEFAULT,
        metadata: Sequence[Tuple[str, str]] = (),
    ) -> storage.BatchCommitWriteStreamsResponse:
        r"""Atomically commits a group of ``PENDING`` streams that belong to
        the same ``parent`` table.

        Streams must be finalized before commit and cannot be committed
        multiple times. Once a stream is committed, data in the stream
        becomes available for read operations.

        .. code-block:: python

            # This snippet has been automatically generated and should be regarded as a
            # code template only.
            # It will require modifications to work:
            # - It may require correct/in-range values for request initialization.
            # - It may require specifying regional endpoints when creating the service
            #   client as shown in:
            #   https://googleapis.dev/python/google-api-core/latest/client_options.html
            from google.cloud import bigquery_storage_v1

            async def sample_batch_commit_write_streams():
                # Create a client
                client = bigquery_storage_v1.BigQueryWriteAsyncClient()

                # Initialize request argument(s)
                request = bigquery_storage_v1.BatchCommitWriteStreamsRequest(
                    parent="parent_value",
                    write_streams=['write_streams_value1', 'write_streams_value2'],
                )

                # Make the request
                response = await client.batch_commit_write_streams(request=request)

                # Handle the response
                print(response)

        Args:
            request (Optional[Union[google.cloud.bigquery_storage_v1.types.BatchCommitWriteStreamsRequest, dict]]):
                The request object. Request message for ``BatchCommitWriteStreams``.
            parent (:class:`str`):
                Required. Parent table that all the streams should
                belong to, in the form of
                ``projects/{project}/datasets/{dataset}/tables/{table}``.

                This corresponds to the ``parent`` field
                on the ``request`` instance; if ``request`` is provided, this
                should not be set.
            retry (google.api_core.retry.Retry): Designation of what errors, if any,
                should be retried.
            timeout (float): The timeout for this request.
            metadata (Sequence[Tuple[str, str]]): Strings which should be
                sent along with the request as metadata.

        Returns:
            google.cloud.bigquery_storage_v1.types.BatchCommitWriteStreamsResponse:
                Response message for BatchCommitWriteStreams.
        """
        # Create or coerce a protobuf request object.
        # Quick check: If we got a request object, we should *not* have
        # gotten any keyword arguments that map to the request.
        has_flattened_params = any([parent])
        if request is not None and has_flattened_params:
            raise ValueError(
                "If the `request` argument is set, then none of "
                "the individual field arguments should be set."
            )

        request = storage.BatchCommitWriteStreamsRequest(request)

        # If we have keyword arguments corresponding to fields on the
        # request, apply these.
        if parent is not None:
            request.parent = parent

        # Wrap the RPC method; this adds retry and timeout information,
        # and friendly error handling.
        rpc = gapic_v1.method_async.wrap_method(
            self._client._transport.batch_commit_write_streams,
            default_retry=retries.Retry(
                initial=0.1,
                maximum=60.0,
                multiplier=1.3,
                predicate=retries.if_exception_type(
                    core_exceptions.DeadlineExceeded,
                    core_exceptions.ServiceUnavailable,
                ),
                deadline=600.0,
            ),
            default_timeout=600.0,
            client_info=DEFAULT_CLIENT_INFO,
        )

        # Certain fields should be provided within the metadata header;
        # add these here.
        metadata = tuple(metadata) + (
            gapic_v1.routing_header.to_grpc_metadata((("parent", request.parent),)),
        )

        # Send the request.
        response = await rpc(
            request,
            retry=retry,
            timeout=timeout,
            metadata=metadata,
        )

        # Done; return the response.
        return response

    async def flush_rows(
        self,
        request: Optional[Union[storage.FlushRowsRequest, dict]] = None,
        *,
        write_stream: Optional[str] = None,
        retry: OptionalRetry = gapic_v1.method.DEFAULT,
        timeout: Union[float, object] = gapic_v1.method.DEFAULT,
        metadata: Sequence[Tuple[str, str]] = (),
    ) -> storage.FlushRowsResponse:
        r"""Flushes rows to a BUFFERED stream.

        If rows are being appended to a BUFFERED stream, a flush
        operation is required in order for the rows to become available
        for reading. A flush operation advances a BUFFERED stream from
        any previously flushed offset up to the offset specified in the
        request.

        Flush is not supported on the \_default stream, since it is not
        BUFFERED.

        .. code-block:: python

            # This snippet has been automatically generated and should be regarded as a
            # code template only.
            # It will require modifications to work:
            # - It may require correct/in-range values for request initialization.
            # - It may require specifying regional endpoints when creating the service
            #   client as shown in:
            #   https://googleapis.dev/python/google-api-core/latest/client_options.html
            from google.cloud import bigquery_storage_v1

            async def sample_flush_rows():
                # Create a client
                client = bigquery_storage_v1.BigQueryWriteAsyncClient()

                # Initialize request argument(s)
                request = bigquery_storage_v1.FlushRowsRequest(
                    write_stream="write_stream_value",
                )

                # Make the request
                response = await client.flush_rows(request=request)

                # Handle the response
                print(response)

        Args:
            request (Optional[Union[google.cloud.bigquery_storage_v1.types.FlushRowsRequest, dict]]):
                The request object. Request message for ``FlushRows``.
            write_stream (:class:`str`):
                Required. The stream that is the
                target of the flush operation.

                This corresponds to the ``write_stream`` field
                on the ``request`` instance; if ``request`` is provided, this
                should not be set.
            retry (google.api_core.retry.Retry): Designation of what errors, if any,
                should be retried.
            timeout (float): The timeout for this request.
            metadata (Sequence[Tuple[str, str]]): Strings which should be
                sent along with the request as metadata.

        Returns:
            google.cloud.bigquery_storage_v1.types.FlushRowsResponse:
                Response message for FlushRows.
        """
        # Create or coerce a protobuf request object.
        # Quick check: If we got a request object, we should *not* have
        # gotten any keyword arguments that map to the request.
        has_flattened_params = any([write_stream])
        if request is not None and has_flattened_params:
            raise ValueError(
                "If the `request` argument is set, then none of "
                "the individual field arguments should be set."
            )

        request = storage.FlushRowsRequest(request)

        # If we have keyword arguments corresponding to fields on the
        # request, apply these.
        if write_stream is not None:
            request.write_stream = write_stream

        # Wrap the RPC method; this adds retry and timeout information,
        # and friendly error handling.
        rpc = gapic_v1.method_async.wrap_method(
            self._client._transport.flush_rows,
            default_retry=retries.Retry(
                initial=0.1,
                maximum=60.0,
                multiplier=1.3,
                predicate=retries.if_exception_type(
                    core_exceptions.DeadlineExceeded,
                    core_exceptions.ServiceUnavailable,
                ),
                deadline=600.0,
            ),
            default_timeout=600.0,
            client_info=DEFAULT_CLIENT_INFO,
        )

        # Certain fields should be provided within the metadata header;
        # add these here.
        metadata = tuple(metadata) + (
            gapic_v1.routing_header.to_grpc_metadata(
                (("write_stream", request.write_stream),)
            ),
        )

        # Send the request.
        response = await rpc(
            request,
            retry=retry,
            timeout=timeout,
            metadata=metadata,
        )

        # Done; return the response.
        return response

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc, tb):
        await self.transport.close()


DEFAULT_CLIENT_INFO = gapic_v1.client_info.ClientInfo(
    gapic_version=package_version.__version__
)


__all__ = ("BigQueryWriteAsyncClient",)