1# -*- coding: utf-8 -*-
2# Copyright 2024 Google LLC
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16from collections import OrderedDict
17import functools
18import re
19from typing import (
20 AsyncIterable,
21 AsyncIterator,
22 Awaitable,
23 Dict,
24 Mapping,
25 MutableMapping,
26 MutableSequence,
27 Optional,
28 Sequence,
29 Tuple,
30 Type,
31 Union,
32)
33
34from google.api_core import exceptions as core_exceptions
35from google.api_core import gapic_v1
36from google.api_core import retry_async as retries
37from google.api_core.client_options import ClientOptions
38from google.auth import credentials as ga_credentials # type: ignore
39from google.oauth2 import service_account # type: ignore
40
41from google.cloud.bigquery_storage_v1 import gapic_version as package_version
42
43try:
44 OptionalRetry = Union[retries.AsyncRetry, gapic_v1.method._MethodDefault, None]
45except AttributeError: # pragma: NO COVER
46 OptionalRetry = Union[retries.AsyncRetry, object, None] # type: ignore
47
48from google.protobuf import timestamp_pb2 # type: ignore
49from google.rpc import status_pb2 # type: ignore
50
51from google.cloud.bigquery_storage_v1.types import storage, stream, table
52
53from .client import BigQueryWriteClient
54from .transports.base import DEFAULT_CLIENT_INFO, BigQueryWriteTransport
55from .transports.grpc_asyncio import BigQueryWriteGrpcAsyncIOTransport
56
57
58class BigQueryWriteAsyncClient:
59 """BigQuery Write API.
60
61 The Write API can be used to write data to BigQuery.
62
63 For supplementary information about the Write API, see:
64
65 https://cloud.google.com/bigquery/docs/write-api
66 """
67
68 _client: BigQueryWriteClient
69
70 # Copy defaults from the synchronous client for use here.
71 # Note: DEFAULT_ENDPOINT is deprecated. Use _DEFAULT_ENDPOINT_TEMPLATE instead.
72 DEFAULT_ENDPOINT = BigQueryWriteClient.DEFAULT_ENDPOINT
73 DEFAULT_MTLS_ENDPOINT = BigQueryWriteClient.DEFAULT_MTLS_ENDPOINT
74 _DEFAULT_ENDPOINT_TEMPLATE = BigQueryWriteClient._DEFAULT_ENDPOINT_TEMPLATE
75 _DEFAULT_UNIVERSE = BigQueryWriteClient._DEFAULT_UNIVERSE
76
77 table_path = staticmethod(BigQueryWriteClient.table_path)
78 parse_table_path = staticmethod(BigQueryWriteClient.parse_table_path)
79 write_stream_path = staticmethod(BigQueryWriteClient.write_stream_path)
80 parse_write_stream_path = staticmethod(BigQueryWriteClient.parse_write_stream_path)
81 common_billing_account_path = staticmethod(
82 BigQueryWriteClient.common_billing_account_path
83 )
84 parse_common_billing_account_path = staticmethod(
85 BigQueryWriteClient.parse_common_billing_account_path
86 )
87 common_folder_path = staticmethod(BigQueryWriteClient.common_folder_path)
88 parse_common_folder_path = staticmethod(
89 BigQueryWriteClient.parse_common_folder_path
90 )
91 common_organization_path = staticmethod(
92 BigQueryWriteClient.common_organization_path
93 )
94 parse_common_organization_path = staticmethod(
95 BigQueryWriteClient.parse_common_organization_path
96 )
97 common_project_path = staticmethod(BigQueryWriteClient.common_project_path)
98 parse_common_project_path = staticmethod(
99 BigQueryWriteClient.parse_common_project_path
100 )
101 common_location_path = staticmethod(BigQueryWriteClient.common_location_path)
102 parse_common_location_path = staticmethod(
103 BigQueryWriteClient.parse_common_location_path
104 )
105
106 @classmethod
107 def from_service_account_info(cls, info: dict, *args, **kwargs):
108 """Creates an instance of this client using the provided credentials
109 info.
110
111 Args:
112 info (dict): The service account private key info.
113 args: Additional arguments to pass to the constructor.
114 kwargs: Additional arguments to pass to the constructor.
115
116 Returns:
117 BigQueryWriteAsyncClient: The constructed client.
118 """
119 return BigQueryWriteClient.from_service_account_info.__func__(BigQueryWriteAsyncClient, info, *args, **kwargs) # type: ignore
120
121 @classmethod
122 def from_service_account_file(cls, filename: str, *args, **kwargs):
123 """Creates an instance of this client using the provided credentials
124 file.
125
126 Args:
127 filename (str): The path to the service account private key json
128 file.
129 args: Additional arguments to pass to the constructor.
130 kwargs: Additional arguments to pass to the constructor.
131
132 Returns:
133 BigQueryWriteAsyncClient: The constructed client.
134 """
135 return BigQueryWriteClient.from_service_account_file.__func__(BigQueryWriteAsyncClient, filename, *args, **kwargs) # type: ignore
136
137 from_service_account_json = from_service_account_file
138
139 @classmethod
140 def get_mtls_endpoint_and_cert_source(
141 cls, client_options: Optional[ClientOptions] = None
142 ):
143 """Return the API endpoint and client cert source for mutual TLS.
144
145 The client cert source is determined in the following order:
146 (1) if `GOOGLE_API_USE_CLIENT_CERTIFICATE` environment variable is not "true", the
147 client cert source is None.
148 (2) if `client_options.client_cert_source` is provided, use the provided one; if the
149 default client cert source exists, use the default one; otherwise the client cert
150 source is None.
151
152 The API endpoint is determined in the following order:
153 (1) if `client_options.api_endpoint` if provided, use the provided one.
154 (2) if `GOOGLE_API_USE_CLIENT_CERTIFICATE` environment variable is "always", use the
155 default mTLS endpoint; if the environment variable is "never", use the default API
156 endpoint; otherwise if client cert source exists, use the default mTLS endpoint, otherwise
157 use the default API endpoint.
158
159 More details can be found at https://google.aip.dev/auth/4114.
160
161 Args:
162 client_options (google.api_core.client_options.ClientOptions): Custom options for the
163 client. Only the `api_endpoint` and `client_cert_source` properties may be used
164 in this method.
165
166 Returns:
167 Tuple[str, Callable[[], Tuple[bytes, bytes]]]: returns the API endpoint and the
168 client cert source to use.
169
170 Raises:
171 google.auth.exceptions.MutualTLSChannelError: If any errors happen.
172 """
173 return BigQueryWriteClient.get_mtls_endpoint_and_cert_source(client_options) # type: ignore
174
175 @property
176 def transport(self) -> BigQueryWriteTransport:
177 """Returns the transport used by the client instance.
178
179 Returns:
180 BigQueryWriteTransport: The transport used by the client instance.
181 """
182 return self._client.transport
183
184 @property
185 def api_endpoint(self):
186 """Return the API endpoint used by the client instance.
187
188 Returns:
189 str: The API endpoint used by the client instance.
190 """
191 return self._client._api_endpoint
192
193 @property
194 def universe_domain(self) -> str:
195 """Return the universe domain used by the client instance.
196
197 Returns:
198 str: The universe domain used
199 by the client instance.
200 """
201 return self._client._universe_domain
202
203 get_transport_class = functools.partial(
204 type(BigQueryWriteClient).get_transport_class, type(BigQueryWriteClient)
205 )
206
207 def __init__(
208 self,
209 *,
210 credentials: Optional[ga_credentials.Credentials] = None,
211 transport: Union[str, BigQueryWriteTransport] = "grpc_asyncio",
212 client_options: Optional[ClientOptions] = None,
213 client_info: gapic_v1.client_info.ClientInfo = DEFAULT_CLIENT_INFO,
214 ) -> None:
215 """Instantiates the big query write async client.
216
217 Args:
218 credentials (Optional[google.auth.credentials.Credentials]): The
219 authorization credentials to attach to requests. These
220 credentials identify the application to the service; if none
221 are specified, the client will attempt to ascertain the
222 credentials from the environment.
223 transport (Union[str, ~.BigQueryWriteTransport]): The
224 transport to use. If set to None, a transport is chosen
225 automatically.
226 client_options (Optional[Union[google.api_core.client_options.ClientOptions, dict]]):
227 Custom options for the client.
228
229 1. The ``api_endpoint`` property can be used to override the
230 default endpoint provided by the client when ``transport`` is
231 not explicitly provided. Only if this property is not set and
232 ``transport`` was not explicitly provided, the endpoint is
233 determined by the GOOGLE_API_USE_MTLS_ENDPOINT environment
234 variable, which have one of the following values:
235 "always" (always use the default mTLS endpoint), "never" (always
236 use the default regular endpoint) and "auto" (auto-switch to the
237 default mTLS endpoint if client certificate is present; this is
238 the default value).
239
240 2. If the GOOGLE_API_USE_CLIENT_CERTIFICATE environment variable
241 is "true", then the ``client_cert_source`` property can be used
242 to provide a client certificate for mTLS transport. If
243 not provided, the default SSL client certificate will be used if
244 present. If GOOGLE_API_USE_CLIENT_CERTIFICATE is "false" or not
245 set, no client certificate will be used.
246
247 3. The ``universe_domain`` property can be used to override the
248 default "googleapis.com" universe. Note that ``api_endpoint``
249 property still takes precedence; and ``universe_domain`` is
250 currently not supported for mTLS.
251
252 client_info (google.api_core.gapic_v1.client_info.ClientInfo):
253 The client info used to send a user-agent string along with
254 API requests. If ``None``, then default info will be used.
255 Generally, you only need to set this if you're developing
256 your own client library.
257
258 Raises:
259 google.auth.exceptions.MutualTlsChannelError: If mutual TLS transport
260 creation failed for any reason.
261 """
262 self._client = BigQueryWriteClient(
263 credentials=credentials,
264 transport=transport,
265 client_options=client_options,
266 client_info=client_info,
267 )
268
269 async def create_write_stream(
270 self,
271 request: Optional[Union[storage.CreateWriteStreamRequest, dict]] = None,
272 *,
273 parent: Optional[str] = None,
274 write_stream: Optional[stream.WriteStream] = None,
275 retry: OptionalRetry = gapic_v1.method.DEFAULT,
276 timeout: Union[float, object] = gapic_v1.method.DEFAULT,
277 metadata: Sequence[Tuple[str, str]] = (),
278 ) -> stream.WriteStream:
279 r"""Creates a write stream to the given table. Additionally, every
280 table has a special stream named '_default' to which data can be
281 written. This stream doesn't need to be created using
282 CreateWriteStream. It is a stream that can be used
283 simultaneously by any number of clients. Data written to this
284 stream is considered committed as soon as an acknowledgement is
285 received.
286
287 .. code-block:: python
288
289 # This snippet has been automatically generated and should be regarded as a
290 # code template only.
291 # It will require modifications to work:
292 # - It may require correct/in-range values for request initialization.
293 # - It may require specifying regional endpoints when creating the service
294 # client as shown in:
295 # https://googleapis.dev/python/google-api-core/latest/client_options.html
296 from google.cloud import bigquery_storage_v1
297
298 async def sample_create_write_stream():
299 # Create a client
300 client = bigquery_storage_v1.BigQueryWriteAsyncClient()
301
302 # Initialize request argument(s)
303 request = bigquery_storage_v1.CreateWriteStreamRequest(
304 parent="parent_value",
305 )
306
307 # Make the request
308 response = await client.create_write_stream(request=request)
309
310 # Handle the response
311 print(response)
312
313 Args:
314 request (Optional[Union[google.cloud.bigquery_storage_v1.types.CreateWriteStreamRequest, dict]]):
315 The request object. Request message for ``CreateWriteStream``.
316 parent (:class:`str`):
317 Required. Reference to the table to which the stream
318 belongs, in the format of
319 ``projects/{project}/datasets/{dataset}/tables/{table}``.
320
321 This corresponds to the ``parent`` field
322 on the ``request`` instance; if ``request`` is provided, this
323 should not be set.
324 write_stream (:class:`google.cloud.bigquery_storage_v1.types.WriteStream`):
325 Required. Stream to be created.
326 This corresponds to the ``write_stream`` field
327 on the ``request`` instance; if ``request`` is provided, this
328 should not be set.
329 retry (google.api_core.retry_async.AsyncRetry): Designation of what errors, if any,
330 should be retried.
331 timeout (float): The timeout for this request.
332 metadata (Sequence[Tuple[str, str]]): Strings which should be
333 sent along with the request as metadata.
334
335 Returns:
336 google.cloud.bigquery_storage_v1.types.WriteStream:
337 Information about a single stream
338 that gets data inside the storage
339 system.
340
341 """
342 # Create or coerce a protobuf request object.
343 # Quick check: If we got a request object, we should *not* have
344 # gotten any keyword arguments that map to the request.
345 has_flattened_params = any([parent, write_stream])
346 if request is not None and has_flattened_params:
347 raise ValueError(
348 "If the `request` argument is set, then none of "
349 "the individual field arguments should be set."
350 )
351
352 request = storage.CreateWriteStreamRequest(request)
353
354 # If we have keyword arguments corresponding to fields on the
355 # request, apply these.
356 if parent is not None:
357 request.parent = parent
358 if write_stream is not None:
359 request.write_stream = write_stream
360
361 # Wrap the RPC method; this adds retry and timeout information,
362 # and friendly error handling.
363 rpc = gapic_v1.method_async.wrap_method(
364 self._client._transport.create_write_stream,
365 default_retry=retries.AsyncRetry(
366 initial=10.0,
367 maximum=120.0,
368 multiplier=1.3,
369 predicate=retries.if_exception_type(
370 core_exceptions.DeadlineExceeded,
371 core_exceptions.ResourceExhausted,
372 core_exceptions.ServiceUnavailable,
373 ),
374 deadline=1200.0,
375 ),
376 default_timeout=1200.0,
377 client_info=DEFAULT_CLIENT_INFO,
378 )
379
380 # Certain fields should be provided within the metadata header;
381 # add these here.
382 metadata = tuple(metadata) + (
383 gapic_v1.routing_header.to_grpc_metadata((("parent", request.parent),)),
384 )
385
386 # Validate the universe domain.
387 self._client._validate_universe_domain()
388
389 # Send the request.
390 response = await rpc(
391 request,
392 retry=retry,
393 timeout=timeout,
394 metadata=metadata,
395 )
396
397 # Done; return the response.
398 return response
399
400 def append_rows(
401 self,
402 requests: Optional[AsyncIterator[storage.AppendRowsRequest]] = None,
403 *,
404 retry: OptionalRetry = gapic_v1.method.DEFAULT,
405 timeout: Union[float, object] = gapic_v1.method.DEFAULT,
406 metadata: Sequence[Tuple[str, str]] = (),
407 ) -> Awaitable[AsyncIterable[storage.AppendRowsResponse]]:
408 r"""Appends data to the given stream.
409
410 If ``offset`` is specified, the ``offset`` is checked against
411 the end of stream. The server returns ``OUT_OF_RANGE`` in
412 ``AppendRowsResponse`` if an attempt is made to append to an
413 offset beyond the current end of the stream or
414 ``ALREADY_EXISTS`` if user provides an ``offset`` that has
415 already been written to. User can retry with adjusted offset
416 within the same RPC connection. If ``offset`` is not specified,
417 append happens at the end of the stream.
418
419 The response contains an optional offset at which the append
420 happened. No offset information will be returned for appends to
421 a default stream.
422
423 Responses are received in the same order in which requests are
424 sent. There will be one response for each successful inserted
425 request. Responses may optionally embed error information if the
426 originating AppendRequest was not successfully processed.
427
428 The specifics of when successfully appended data is made visible
429 to the table are governed by the type of stream:
430
431 - For COMMITTED streams (which includes the default stream),
432 data is visible immediately upon successful append.
433
434 - For BUFFERED streams, data is made visible via a subsequent
435 ``FlushRows`` rpc which advances a cursor to a newer offset
436 in the stream.
437
438 - For PENDING streams, data is not made visible until the
439 stream itself is finalized (via the ``FinalizeWriteStream``
440 rpc), and the stream is explicitly committed via the
441 ``BatchCommitWriteStreams`` rpc.
442
443 .. code-block:: python
444
445 # This snippet has been automatically generated and should be regarded as a
446 # code template only.
447 # It will require modifications to work:
448 # - It may require correct/in-range values for request initialization.
449 # - It may require specifying regional endpoints when creating the service
450 # client as shown in:
451 # https://googleapis.dev/python/google-api-core/latest/client_options.html
452 from google.cloud import bigquery_storage_v1
453
454 async def sample_append_rows():
455 # Create a client
456 client = bigquery_storage_v1.BigQueryWriteAsyncClient()
457
458 # Initialize request argument(s)
459 request = bigquery_storage_v1.AppendRowsRequest(
460 write_stream="write_stream_value",
461 )
462
463 # This method expects an iterator which contains
464 # 'bigquery_storage_v1.AppendRowsRequest' objects
465 # Here we create a generator that yields a single `request` for
466 # demonstrative purposes.
467 requests = [request]
468
469 def request_generator():
470 for request in requests:
471 yield request
472
473 # Make the request
474 stream = await client.append_rows(requests=request_generator())
475
476 # Handle the response
477 async for response in stream:
478 print(response)
479
480 Args:
481 requests (AsyncIterator[`google.cloud.bigquery_storage_v1.types.AppendRowsRequest`]):
482 The request object AsyncIterator. Request message for ``AppendRows``.
483
484 Because AppendRows is a bidirectional streaming RPC,
485 certain parts of the AppendRowsRequest need only be
486 specified for the first request before switching table
487 destinations. You can also switch table destinations
488 within the same connection for the default stream.
489
490 The size of a single AppendRowsRequest must be less than
491 10 MB in size. Requests larger than this return an
492 error, typically ``INVALID_ARGUMENT``.
493 retry (google.api_core.retry_async.AsyncRetry): Designation of what errors, if any,
494 should be retried.
495 timeout (float): The timeout for this request.
496 metadata (Sequence[Tuple[str, str]]): Strings which should be
497 sent along with the request as metadata.
498
499 Returns:
500 AsyncIterable[google.cloud.bigquery_storage_v1.types.AppendRowsResponse]:
501 Response message for AppendRows.
502 """
503
504 # Wrap the RPC method; this adds retry and timeout information,
505 # and friendly error handling.
506 rpc = gapic_v1.method_async.wrap_method(
507 self._client._transport.append_rows,
508 default_retry=retries.AsyncRetry(
509 initial=0.1,
510 maximum=60.0,
511 multiplier=1.3,
512 predicate=retries.if_exception_type(
513 core_exceptions.ServiceUnavailable,
514 ),
515 deadline=86400.0,
516 ),
517 default_timeout=86400.0,
518 client_info=DEFAULT_CLIENT_INFO,
519 )
520
521 # Certain fields should be provided within the metadata header;
522 # add these here.
523 metadata = tuple(metadata) + (gapic_v1.routing_header.to_grpc_metadata(()),)
524
525 # Validate the universe domain.
526 self._client._validate_universe_domain()
527
528 # Send the request.
529 response = rpc(
530 requests,
531 retry=retry,
532 timeout=timeout,
533 metadata=metadata,
534 )
535
536 # Done; return the response.
537 return response
538
539 async def get_write_stream(
540 self,
541 request: Optional[Union[storage.GetWriteStreamRequest, dict]] = None,
542 *,
543 name: Optional[str] = None,
544 retry: OptionalRetry = gapic_v1.method.DEFAULT,
545 timeout: Union[float, object] = gapic_v1.method.DEFAULT,
546 metadata: Sequence[Tuple[str, str]] = (),
547 ) -> stream.WriteStream:
548 r"""Gets information about a write stream.
549
550 .. code-block:: python
551
552 # This snippet has been automatically generated and should be regarded as a
553 # code template only.
554 # It will require modifications to work:
555 # - It may require correct/in-range values for request initialization.
556 # - It may require specifying regional endpoints when creating the service
557 # client as shown in:
558 # https://googleapis.dev/python/google-api-core/latest/client_options.html
559 from google.cloud import bigquery_storage_v1
560
561 async def sample_get_write_stream():
562 # Create a client
563 client = bigquery_storage_v1.BigQueryWriteAsyncClient()
564
565 # Initialize request argument(s)
566 request = bigquery_storage_v1.GetWriteStreamRequest(
567 name="name_value",
568 )
569
570 # Make the request
571 response = await client.get_write_stream(request=request)
572
573 # Handle the response
574 print(response)
575
576 Args:
577 request (Optional[Union[google.cloud.bigquery_storage_v1.types.GetWriteStreamRequest, dict]]):
578 The request object. Request message for ``GetWriteStreamRequest``.
579 name (:class:`str`):
580 Required. Name of the stream to get, in the form of
581 ``projects/{project}/datasets/{dataset}/tables/{table}/streams/{stream}``.
582
583 This corresponds to the ``name`` field
584 on the ``request`` instance; if ``request`` is provided, this
585 should not be set.
586 retry (google.api_core.retry_async.AsyncRetry): Designation of what errors, if any,
587 should be retried.
588 timeout (float): The timeout for this request.
589 metadata (Sequence[Tuple[str, str]]): Strings which should be
590 sent along with the request as metadata.
591
592 Returns:
593 google.cloud.bigquery_storage_v1.types.WriteStream:
594 Information about a single stream
595 that gets data inside the storage
596 system.
597
598 """
599 # Create or coerce a protobuf request object.
600 # Quick check: If we got a request object, we should *not* have
601 # gotten any keyword arguments that map to the request.
602 has_flattened_params = any([name])
603 if request is not None and has_flattened_params:
604 raise ValueError(
605 "If the `request` argument is set, then none of "
606 "the individual field arguments should be set."
607 )
608
609 request = storage.GetWriteStreamRequest(request)
610
611 # If we have keyword arguments corresponding to fields on the
612 # request, apply these.
613 if name is not None:
614 request.name = name
615
616 # Wrap the RPC method; this adds retry and timeout information,
617 # and friendly error handling.
618 rpc = gapic_v1.method_async.wrap_method(
619 self._client._transport.get_write_stream,
620 default_retry=retries.AsyncRetry(
621 initial=0.1,
622 maximum=60.0,
623 multiplier=1.3,
624 predicate=retries.if_exception_type(
625 core_exceptions.DeadlineExceeded,
626 core_exceptions.ResourceExhausted,
627 core_exceptions.ServiceUnavailable,
628 ),
629 deadline=600.0,
630 ),
631 default_timeout=600.0,
632 client_info=DEFAULT_CLIENT_INFO,
633 )
634
635 # Certain fields should be provided within the metadata header;
636 # add these here.
637 metadata = tuple(metadata) + (
638 gapic_v1.routing_header.to_grpc_metadata((("name", request.name),)),
639 )
640
641 # Validate the universe domain.
642 self._client._validate_universe_domain()
643
644 # Send the request.
645 response = await rpc(
646 request,
647 retry=retry,
648 timeout=timeout,
649 metadata=metadata,
650 )
651
652 # Done; return the response.
653 return response
654
655 async def finalize_write_stream(
656 self,
657 request: Optional[Union[storage.FinalizeWriteStreamRequest, dict]] = None,
658 *,
659 name: Optional[str] = None,
660 retry: OptionalRetry = gapic_v1.method.DEFAULT,
661 timeout: Union[float, object] = gapic_v1.method.DEFAULT,
662 metadata: Sequence[Tuple[str, str]] = (),
663 ) -> storage.FinalizeWriteStreamResponse:
664 r"""Finalize a write stream so that no new data can be appended to
665 the stream. Finalize is not supported on the '_default' stream.
666
667 .. code-block:: python
668
669 # This snippet has been automatically generated and should be regarded as a
670 # code template only.
671 # It will require modifications to work:
672 # - It may require correct/in-range values for request initialization.
673 # - It may require specifying regional endpoints when creating the service
674 # client as shown in:
675 # https://googleapis.dev/python/google-api-core/latest/client_options.html
676 from google.cloud import bigquery_storage_v1
677
678 async def sample_finalize_write_stream():
679 # Create a client
680 client = bigquery_storage_v1.BigQueryWriteAsyncClient()
681
682 # Initialize request argument(s)
683 request = bigquery_storage_v1.FinalizeWriteStreamRequest(
684 name="name_value",
685 )
686
687 # Make the request
688 response = await client.finalize_write_stream(request=request)
689
690 # Handle the response
691 print(response)
692
693 Args:
694 request (Optional[Union[google.cloud.bigquery_storage_v1.types.FinalizeWriteStreamRequest, dict]]):
695 The request object. Request message for invoking ``FinalizeWriteStream``.
696 name (:class:`str`):
697 Required. Name of the stream to finalize, in the form of
698 ``projects/{project}/datasets/{dataset}/tables/{table}/streams/{stream}``.
699
700 This corresponds to the ``name`` field
701 on the ``request`` instance; if ``request`` is provided, this
702 should not be set.
703 retry (google.api_core.retry_async.AsyncRetry): Designation of what errors, if any,
704 should be retried.
705 timeout (float): The timeout for this request.
706 metadata (Sequence[Tuple[str, str]]): Strings which should be
707 sent along with the request as metadata.
708
709 Returns:
710 google.cloud.bigquery_storage_v1.types.FinalizeWriteStreamResponse:
711 Response message for FinalizeWriteStream.
712 """
713 # Create or coerce a protobuf request object.
714 # Quick check: If we got a request object, we should *not* have
715 # gotten any keyword arguments that map to the request.
716 has_flattened_params = any([name])
717 if request is not None and has_flattened_params:
718 raise ValueError(
719 "If the `request` argument is set, then none of "
720 "the individual field arguments should be set."
721 )
722
723 request = storage.FinalizeWriteStreamRequest(request)
724
725 # If we have keyword arguments corresponding to fields on the
726 # request, apply these.
727 if name is not None:
728 request.name = name
729
730 # Wrap the RPC method; this adds retry and timeout information,
731 # and friendly error handling.
732 rpc = gapic_v1.method_async.wrap_method(
733 self._client._transport.finalize_write_stream,
734 default_retry=retries.AsyncRetry(
735 initial=0.1,
736 maximum=60.0,
737 multiplier=1.3,
738 predicate=retries.if_exception_type(
739 core_exceptions.DeadlineExceeded,
740 core_exceptions.ResourceExhausted,
741 core_exceptions.ServiceUnavailable,
742 ),
743 deadline=600.0,
744 ),
745 default_timeout=600.0,
746 client_info=DEFAULT_CLIENT_INFO,
747 )
748
749 # Certain fields should be provided within the metadata header;
750 # add these here.
751 metadata = tuple(metadata) + (
752 gapic_v1.routing_header.to_grpc_metadata((("name", request.name),)),
753 )
754
755 # Validate the universe domain.
756 self._client._validate_universe_domain()
757
758 # Send the request.
759 response = await rpc(
760 request,
761 retry=retry,
762 timeout=timeout,
763 metadata=metadata,
764 )
765
766 # Done; return the response.
767 return response
768
769 async def batch_commit_write_streams(
770 self,
771 request: Optional[Union[storage.BatchCommitWriteStreamsRequest, dict]] = None,
772 *,
773 parent: Optional[str] = None,
774 retry: OptionalRetry = gapic_v1.method.DEFAULT,
775 timeout: Union[float, object] = gapic_v1.method.DEFAULT,
776 metadata: Sequence[Tuple[str, str]] = (),
777 ) -> storage.BatchCommitWriteStreamsResponse:
778 r"""Atomically commits a group of ``PENDING`` streams that belong to
779 the same ``parent`` table.
780
781 Streams must be finalized before commit and cannot be committed
782 multiple times. Once a stream is committed, data in the stream
783 becomes available for read operations.
784
785 .. code-block:: python
786
787 # This snippet has been automatically generated and should be regarded as a
788 # code template only.
789 # It will require modifications to work:
790 # - It may require correct/in-range values for request initialization.
791 # - It may require specifying regional endpoints when creating the service
792 # client as shown in:
793 # https://googleapis.dev/python/google-api-core/latest/client_options.html
794 from google.cloud import bigquery_storage_v1
795
796 async def sample_batch_commit_write_streams():
797 # Create a client
798 client = bigquery_storage_v1.BigQueryWriteAsyncClient()
799
800 # Initialize request argument(s)
801 request = bigquery_storage_v1.BatchCommitWriteStreamsRequest(
802 parent="parent_value",
803 write_streams=['write_streams_value1', 'write_streams_value2'],
804 )
805
806 # Make the request
807 response = await client.batch_commit_write_streams(request=request)
808
809 # Handle the response
810 print(response)
811
812 Args:
813 request (Optional[Union[google.cloud.bigquery_storage_v1.types.BatchCommitWriteStreamsRequest, dict]]):
814 The request object. Request message for ``BatchCommitWriteStreams``.
815 parent (:class:`str`):
816 Required. Parent table that all the streams should
817 belong to, in the form of
818 ``projects/{project}/datasets/{dataset}/tables/{table}``.
819
820 This corresponds to the ``parent`` field
821 on the ``request`` instance; if ``request`` is provided, this
822 should not be set.
823 retry (google.api_core.retry_async.AsyncRetry): Designation of what errors, if any,
824 should be retried.
825 timeout (float): The timeout for this request.
826 metadata (Sequence[Tuple[str, str]]): Strings which should be
827 sent along with the request as metadata.
828
829 Returns:
830 google.cloud.bigquery_storage_v1.types.BatchCommitWriteStreamsResponse:
831 Response message for BatchCommitWriteStreams.
832 """
833 # Create or coerce a protobuf request object.
834 # Quick check: If we got a request object, we should *not* have
835 # gotten any keyword arguments that map to the request.
836 has_flattened_params = any([parent])
837 if request is not None and has_flattened_params:
838 raise ValueError(
839 "If the `request` argument is set, then none of "
840 "the individual field arguments should be set."
841 )
842
843 request = storage.BatchCommitWriteStreamsRequest(request)
844
845 # If we have keyword arguments corresponding to fields on the
846 # request, apply these.
847 if parent is not None:
848 request.parent = parent
849
850 # Wrap the RPC method; this adds retry and timeout information,
851 # and friendly error handling.
852 rpc = gapic_v1.method_async.wrap_method(
853 self._client._transport.batch_commit_write_streams,
854 default_retry=retries.AsyncRetry(
855 initial=0.1,
856 maximum=60.0,
857 multiplier=1.3,
858 predicate=retries.if_exception_type(
859 core_exceptions.DeadlineExceeded,
860 core_exceptions.ResourceExhausted,
861 core_exceptions.ServiceUnavailable,
862 ),
863 deadline=600.0,
864 ),
865 default_timeout=600.0,
866 client_info=DEFAULT_CLIENT_INFO,
867 )
868
869 # Certain fields should be provided within the metadata header;
870 # add these here.
871 metadata = tuple(metadata) + (
872 gapic_v1.routing_header.to_grpc_metadata((("parent", request.parent),)),
873 )
874
875 # Validate the universe domain.
876 self._client._validate_universe_domain()
877
878 # Send the request.
879 response = await rpc(
880 request,
881 retry=retry,
882 timeout=timeout,
883 metadata=metadata,
884 )
885
886 # Done; return the response.
887 return response
888
889 async def flush_rows(
890 self,
891 request: Optional[Union[storage.FlushRowsRequest, dict]] = None,
892 *,
893 write_stream: Optional[str] = None,
894 retry: OptionalRetry = gapic_v1.method.DEFAULT,
895 timeout: Union[float, object] = gapic_v1.method.DEFAULT,
896 metadata: Sequence[Tuple[str, str]] = (),
897 ) -> storage.FlushRowsResponse:
898 r"""Flushes rows to a BUFFERED stream.
899
900 If users are appending rows to BUFFERED stream, flush operation
901 is required in order for the rows to become available for
902 reading. A Flush operation flushes up to any previously flushed
903 offset in a BUFFERED stream, to the offset specified in the
904 request.
905
906 Flush is not supported on the \_default stream, since it is not
907 BUFFERED.
908
909 .. code-block:: python
910
911 # This snippet has been automatically generated and should be regarded as a
912 # code template only.
913 # It will require modifications to work:
914 # - It may require correct/in-range values for request initialization.
915 # - It may require specifying regional endpoints when creating the service
916 # client as shown in:
917 # https://googleapis.dev/python/google-api-core/latest/client_options.html
918 from google.cloud import bigquery_storage_v1
919
920 async def sample_flush_rows():
921 # Create a client
922 client = bigquery_storage_v1.BigQueryWriteAsyncClient()
923
924 # Initialize request argument(s)
925 request = bigquery_storage_v1.FlushRowsRequest(
926 write_stream="write_stream_value",
927 )
928
929 # Make the request
930 response = await client.flush_rows(request=request)
931
932 # Handle the response
933 print(response)
934
935 Args:
936 request (Optional[Union[google.cloud.bigquery_storage_v1.types.FlushRowsRequest, dict]]):
937 The request object. Request message for ``FlushRows``.
938 write_stream (:class:`str`):
939 Required. The stream that is the
940 target of the flush operation.
941
942 This corresponds to the ``write_stream`` field
943 on the ``request`` instance; if ``request`` is provided, this
944 should not be set.
945 retry (google.api_core.retry_async.AsyncRetry): Designation of what errors, if any,
946 should be retried.
947 timeout (float): The timeout for this request.
948 metadata (Sequence[Tuple[str, str]]): Strings which should be
949 sent along with the request as metadata.
950
951 Returns:
952 google.cloud.bigquery_storage_v1.types.FlushRowsResponse:
953 Respond message for FlushRows.
954 """
955 # Create or coerce a protobuf request object.
956 # Quick check: If we got a request object, we should *not* have
957 # gotten any keyword arguments that map to the request.
958 has_flattened_params = any([write_stream])
959 if request is not None and has_flattened_params:
960 raise ValueError(
961 "If the `request` argument is set, then none of "
962 "the individual field arguments should be set."
963 )
964
965 request = storage.FlushRowsRequest(request)
966
967 # If we have keyword arguments corresponding to fields on the
968 # request, apply these.
969 if write_stream is not None:
970 request.write_stream = write_stream
971
972 # Wrap the RPC method; this adds retry and timeout information,
973 # and friendly error handling.
974 rpc = gapic_v1.method_async.wrap_method(
975 self._client._transport.flush_rows,
976 default_retry=retries.AsyncRetry(
977 initial=0.1,
978 maximum=60.0,
979 multiplier=1.3,
980 predicate=retries.if_exception_type(
981 core_exceptions.DeadlineExceeded,
982 core_exceptions.ResourceExhausted,
983 core_exceptions.ServiceUnavailable,
984 ),
985 deadline=600.0,
986 ),
987 default_timeout=600.0,
988 client_info=DEFAULT_CLIENT_INFO,
989 )
990
991 # Certain fields should be provided within the metadata header;
992 # add these here.
993 metadata = tuple(metadata) + (
994 gapic_v1.routing_header.to_grpc_metadata(
995 (("write_stream", request.write_stream),)
996 ),
997 )
998
999 # Validate the universe domain.
1000 self._client._validate_universe_domain()
1001
1002 # Send the request.
1003 response = await rpc(
1004 request,
1005 retry=retry,
1006 timeout=timeout,
1007 metadata=metadata,
1008 )
1009
1010 # Done; return the response.
1011 return response
1012
1013 async def __aenter__(self) -> "BigQueryWriteAsyncClient":
1014 return self
1015
1016 async def __aexit__(self, exc_type, exc, tb):
1017 await self.transport.close()
1018
1019
1020DEFAULT_CLIENT_INFO = gapic_v1.client_info.ClientInfo(
1021 gapic_version=package_version.__version__
1022)
1023
1024
1025__all__ = ("BigQueryWriteAsyncClient",)