# -*- coding: utf-8 -*-
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from collections import OrderedDict
import logging as std_logging
import re
from typing import (
    AsyncIterable,
    AsyncIterator,
    Awaitable,
    Callable,
    Dict,
    Mapping,
    MutableMapping,
    MutableSequence,
    Optional,
    Sequence,
    Tuple,
    Type,
    Union,
)

from google.api_core import exceptions as core_exceptions
from google.api_core import gapic_v1
from google.api_core import retry_async as retries
from google.api_core.client_options import ClientOptions
from google.auth import credentials as ga_credentials  # type: ignore
from google.oauth2 import service_account  # type: ignore
import google.protobuf

from google.cloud.bigquery_storage_v1 import gapic_version as package_version

try:
    OptionalRetry = Union[retries.AsyncRetry, gapic_v1.method._MethodDefault, None]
except AttributeError:  # pragma: NO COVER
    OptionalRetry = Union[retries.AsyncRetry, object, None]  # type: ignore

from google.protobuf import timestamp_pb2  # type: ignore
from google.rpc import status_pb2  # type: ignore

from google.cloud.bigquery_storage_v1.types import storage, stream, table

from .client import BigQueryWriteClient
from .transports.base import DEFAULT_CLIENT_INFO, BigQueryWriteTransport
from .transports.grpc_asyncio import BigQueryWriteGrpcAsyncIOTransport

try:
    from google.api_core import client_logging  # type: ignore

    CLIENT_LOGGING_SUPPORTED = True  # pragma: NO COVER
except ImportError:  # pragma: NO COVER
    CLIENT_LOGGING_SUPPORTED = False

_LOGGER = std_logging.getLogger(__name__)


class BigQueryWriteAsyncClient:
    """BigQuery Write API.

    The Write API can be used to write data to BigQuery.

    For supplementary information about the Write API, see:

    https://cloud.google.com/bigquery/docs/write-api
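
    For example, a minimal sketch of constructing the async client (this
    assumes Application Default Credentials are available in the
    environment):

    .. code-block:: python

        from google.cloud import bigquery_storage_v1

        async def make_client():
            # Credentials and the endpoint are resolved from the
            # environment unless passed explicitly to the constructor.
            return bigquery_storage_v1.BigQueryWriteAsyncClient()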
    """

    _client: BigQueryWriteClient

    # Copy defaults from the synchronous client for use here.
    # Note: DEFAULT_ENDPOINT is deprecated. Use _DEFAULT_ENDPOINT_TEMPLATE instead.
    DEFAULT_ENDPOINT = BigQueryWriteClient.DEFAULT_ENDPOINT
    DEFAULT_MTLS_ENDPOINT = BigQueryWriteClient.DEFAULT_MTLS_ENDPOINT
    _DEFAULT_ENDPOINT_TEMPLATE = BigQueryWriteClient._DEFAULT_ENDPOINT_TEMPLATE
    _DEFAULT_UNIVERSE = BigQueryWriteClient._DEFAULT_UNIVERSE

    table_path = staticmethod(BigQueryWriteClient.table_path)
    parse_table_path = staticmethod(BigQueryWriteClient.parse_table_path)
    write_stream_path = staticmethod(BigQueryWriteClient.write_stream_path)
    parse_write_stream_path = staticmethod(BigQueryWriteClient.parse_write_stream_path)
    common_billing_account_path = staticmethod(
        BigQueryWriteClient.common_billing_account_path
    )
    parse_common_billing_account_path = staticmethod(
        BigQueryWriteClient.parse_common_billing_account_path
    )
    common_folder_path = staticmethod(BigQueryWriteClient.common_folder_path)
    parse_common_folder_path = staticmethod(
        BigQueryWriteClient.parse_common_folder_path
    )
    common_organization_path = staticmethod(
        BigQueryWriteClient.common_organization_path
    )
    parse_common_organization_path = staticmethod(
        BigQueryWriteClient.parse_common_organization_path
    )
    common_project_path = staticmethod(BigQueryWriteClient.common_project_path)
    parse_common_project_path = staticmethod(
        BigQueryWriteClient.parse_common_project_path
    )
    common_location_path = staticmethod(BigQueryWriteClient.common_location_path)
    parse_common_location_path = staticmethod(
        BigQueryWriteClient.parse_common_location_path
    )

    @classmethod
    def from_service_account_info(cls, info: dict, *args, **kwargs):
        """Creates an instance of this client using the provided credentials
            info.

        Args:
            info (dict): The service account private key info.
            args: Additional arguments to pass to the constructor.
            kwargs: Additional arguments to pass to the constructor.

        Returns:
            BigQueryWriteAsyncClient: The constructed client.
        """
        return BigQueryWriteClient.from_service_account_info.__func__(BigQueryWriteAsyncClient, info, *args, **kwargs)  # type: ignore

    @classmethod
    def from_service_account_file(cls, filename: str, *args, **kwargs):
        """Creates an instance of this client using the provided credentials
            file.

        Args:
            filename (str): The path to the service account private key json
                file.
            args: Additional arguments to pass to the constructor.
            kwargs: Additional arguments to pass to the constructor.

        Returns:
            BigQueryWriteAsyncClient: The constructed client.
        """
        return BigQueryWriteClient.from_service_account_file.__func__(BigQueryWriteAsyncClient, filename, *args, **kwargs)  # type: ignore

    from_service_account_json = from_service_account_file

    @classmethod
    def get_mtls_endpoint_and_cert_source(
        cls, client_options: Optional[ClientOptions] = None
    ):
        """Return the API endpoint and client cert source for mutual TLS.

        The client cert source is determined in the following order:
        (1) if `GOOGLE_API_USE_CLIENT_CERTIFICATE` environment variable is not "true", the
        client cert source is None.
        (2) if `client_options.client_cert_source` is provided, use the provided one; if the
        default client cert source exists, use the default one; otherwise the client cert
        source is None.

        The API endpoint is determined in the following order:
        (1) if `client_options.api_endpoint` is provided, use the provided one.
        (2) if the `GOOGLE_API_USE_MTLS_ENDPOINT` environment variable is "always", use the
        default mTLS endpoint; if the environment variable is "never", use the default API
        endpoint; otherwise, if a client cert source exists, use the default mTLS endpoint,
        otherwise use the default API endpoint.

        More details can be found at https://google.aip.dev/auth/4114.
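
        For example, a minimal sketch of inspecting the resolved endpoint for
        a custom ``ClientOptions`` value (the endpoint string below is only
        illustrative):

        .. code-block:: python

            from google.api_core.client_options import ClientOptions
            from google.cloud import bigquery_storage_v1

            options = ClientOptions(api_endpoint="bigquerystorage.googleapis.com")
            endpoint, cert_source = (
                bigquery_storage_v1.BigQueryWriteAsyncClient.get_mtls_endpoint_and_cert_source(
                    options
                )
            )
            # With an explicit api_endpoint the same endpoint is returned;
            # cert_source is None unless client certificates are configured.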

        Args:
            client_options (google.api_core.client_options.ClientOptions): Custom options for the
                client. Only the `api_endpoint` and `client_cert_source` properties may be used
                in this method.

        Returns:
            Tuple[str, Callable[[], Tuple[bytes, bytes]]]: returns the API endpoint and the
                client cert source to use.

        Raises:
            google.auth.exceptions.MutualTLSChannelError: If any errors happen.
        """
        return BigQueryWriteClient.get_mtls_endpoint_and_cert_source(client_options)  # type: ignore

    @property
    def transport(self) -> BigQueryWriteTransport:
        """Returns the transport used by the client instance.

        Returns:
            BigQueryWriteTransport: The transport used by the client instance.
        """
        return self._client.transport

    @property
    def api_endpoint(self):
        """Return the API endpoint used by the client instance.

        Returns:
            str: The API endpoint used by the client instance.
        """
        return self._client._api_endpoint

    @property
    def universe_domain(self) -> str:
        """Return the universe domain used by the client instance.

        Returns:
            str: The universe domain used
                by the client instance.
        """
        return self._client._universe_domain

    get_transport_class = BigQueryWriteClient.get_transport_class

    def __init__(
        self,
        *,
        credentials: Optional[ga_credentials.Credentials] = None,
        transport: Optional[
            Union[str, BigQueryWriteTransport, Callable[..., BigQueryWriteTransport]]
        ] = "grpc_asyncio",
        client_options: Optional[ClientOptions] = None,
        client_info: gapic_v1.client_info.ClientInfo = DEFAULT_CLIENT_INFO,
    ) -> None:
        """Instantiates the BigQuery Write async client.

        Args:
            credentials (Optional[google.auth.credentials.Credentials]): The
                authorization credentials to attach to requests. These
                credentials identify the application to the service; if none
                are specified, the client will attempt to ascertain the
                credentials from the environment.
            transport (Optional[Union[str,BigQueryWriteTransport,Callable[..., BigQueryWriteTransport]]]):
                The transport to use, or a Callable that constructs and returns a new transport to use.
                If a Callable is given, it will be called with the same set of initialization
                arguments as used in the BigQueryWriteTransport constructor.
                If set to None, a transport is chosen automatically.
            client_options (Optional[Union[google.api_core.client_options.ClientOptions, dict]]):
                Custom options for the client.

                1. The ``api_endpoint`` property can be used to override the
                default endpoint provided by the client when ``transport`` is
                not explicitly provided. Only if this property is not set and
                ``transport`` was not explicitly provided, the endpoint is
                determined by the GOOGLE_API_USE_MTLS_ENDPOINT environment
                variable, which can have one of the following values:
                "always" (always use the default mTLS endpoint), "never" (always
                use the default regular endpoint) and "auto" (auto-switch to the
                default mTLS endpoint if a client certificate is present; this is
                the default value).

                2. If the GOOGLE_API_USE_CLIENT_CERTIFICATE environment variable
                is "true", then the ``client_cert_source`` property can be used
                to provide a client certificate for mTLS transport. If
                not provided, the default SSL client certificate will be used if
                present. If GOOGLE_API_USE_CLIENT_CERTIFICATE is "false" or not
                set, no client certificate will be used.

                3. The ``universe_domain`` property can be used to override the
                default "googleapis.com" universe. Note that the ``api_endpoint``
                property still takes precedence; and ``universe_domain`` is
                currently not supported for mTLS.

            client_info (google.api_core.gapic_v1.client_info.ClientInfo):
                The client info used to send a user-agent string along with
                API requests. If ``None``, then default info will be used.
                Generally, you only need to set this if you're developing
                your own client library.

        Raises:
            google.auth.exceptions.MutualTLSChannelError: If mutual TLS transport
                creation failed for any reason.
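
        For example, a minimal sketch of overriding the endpoint through
        ``client_options`` (the endpoint value below is illustrative):

        .. code-block:: python

            from google.api_core.client_options import ClientOptions
            from google.cloud import bigquery_storage_v1

            options = ClientOptions(api_endpoint="bigquerystorage.googleapis.com")
            client = bigquery_storage_v1.BigQueryWriteAsyncClient(
                client_options=options,
            )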
        """
        self._client = BigQueryWriteClient(
            credentials=credentials,
            transport=transport,
            client_options=client_options,
            client_info=client_info,
        )

        if CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(
            std_logging.DEBUG
        ):  # pragma: NO COVER
            _LOGGER.debug(
                "Created client `google.cloud.bigquery.storage_v1.BigQueryWriteAsyncClient`.",
                extra={
                    "serviceName": "google.cloud.bigquery.storage.v1.BigQueryWrite",
                    "universeDomain": getattr(
                        self._client._transport._credentials, "universe_domain", ""
                    ),
                    "credentialsType": f"{type(self._client._transport._credentials).__module__}.{type(self._client._transport._credentials).__qualname__}",
                    "credentialsInfo": getattr(
                        self.transport._credentials, "get_cred_info", lambda: None
                    )(),
                }
                if hasattr(self._client._transport, "_credentials")
                else {
                    "serviceName": "google.cloud.bigquery.storage.v1.BigQueryWrite",
                    "credentialsType": None,
                },
            )

    async def create_write_stream(
        self,
        request: Optional[Union[storage.CreateWriteStreamRequest, dict]] = None,
        *,
        parent: Optional[str] = None,
        write_stream: Optional[stream.WriteStream] = None,
        retry: OptionalRetry = gapic_v1.method.DEFAULT,
        timeout: Union[float, object] = gapic_v1.method.DEFAULT,
        metadata: Sequence[Tuple[str, Union[str, bytes]]] = (),
    ) -> stream.WriteStream:
        r"""Creates a write stream to the given table. Additionally, every
        table has a special stream named '_default' to which data can be
        written. This stream doesn't need to be created using
        CreateWriteStream. It is a stream that can be used
        simultaneously by any number of clients. Data written to this
        stream is considered committed as soon as an acknowledgement is
        received.
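
        For example, a minimal sketch of creating a ``PENDING`` stream with
        the flattened arguments (this assumes an existing ``client``
        instance; the table path below is a placeholder):

        .. code-block:: python

            from google.cloud import bigquery_storage_v1

            pending_stream = bigquery_storage_v1.WriteStream(
                type_=bigquery_storage_v1.WriteStream.Type.PENDING,
            )
            response = await client.create_write_stream(
                parent="projects/my-project/datasets/my_dataset/tables/my_table",
                write_stream=pending_stream,
            )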

        .. code-block:: python

            # This snippet has been automatically generated and should be regarded as a
            # code template only.
            # It will require modifications to work:
            # - It may require correct/in-range values for request initialization.
            # - It may require specifying regional endpoints when creating the service
            #   client as shown in:
            #   https://googleapis.dev/python/google-api-core/latest/client_options.html
            from google.cloud import bigquery_storage_v1

            async def sample_create_write_stream():
                # Create a client
                client = bigquery_storage_v1.BigQueryWriteAsyncClient()

                # Initialize request argument(s)
                request = bigquery_storage_v1.CreateWriteStreamRequest(
                    parent="parent_value",
                )

                # Make the request
                response = await client.create_write_stream(request=request)

                # Handle the response
                print(response)

        Args:
            request (Optional[Union[google.cloud.bigquery_storage_v1.types.CreateWriteStreamRequest, dict]]):
                The request object. Request message for ``CreateWriteStream``.
            parent (:class:`str`):
                Required. Reference to the table to which the stream
                belongs, in the format of
                ``projects/{project}/datasets/{dataset}/tables/{table}``.

                This corresponds to the ``parent`` field
                on the ``request`` instance; if ``request`` is provided, this
                should not be set.
            write_stream (:class:`google.cloud.bigquery_storage_v1.types.WriteStream`):
                Required. Stream to be created.
                This corresponds to the ``write_stream`` field
                on the ``request`` instance; if ``request`` is provided, this
                should not be set.
            retry (google.api_core.retry_async.AsyncRetry): Designation of what errors, if any,
                should be retried.
            timeout (float): The timeout for this request.
            metadata (Sequence[Tuple[str, Union[str, bytes]]]): Key/value pairs which should be
                sent along with the request as metadata. Normally, each value must be of type `str`,
                but for metadata keys ending with the suffix `-bin`, the corresponding values must
                be of type `bytes`.

        Returns:
            google.cloud.bigquery_storage_v1.types.WriteStream:
                Information about a single stream
                that gets data inside the storage
                system.

        """
        # Create or coerce a protobuf request object.
        # - Quick check: If we got a request object, we should *not* have
        #   gotten any keyword arguments that map to the request.
        flattened_params = [parent, write_stream]
        has_flattened_params = (
            len([param for param in flattened_params if param is not None]) > 0
        )
        if request is not None and has_flattened_params:
            raise ValueError(
                "If the `request` argument is set, then none of "
                "the individual field arguments should be set."
            )

        # - Use the request object if provided (there's no risk of modifying the input as
        #   there are no flattened fields), or create one.
        if not isinstance(request, storage.CreateWriteStreamRequest):
            request = storage.CreateWriteStreamRequest(request)

        # If we have keyword arguments corresponding to fields on the
        # request, apply these.
        if parent is not None:
            request.parent = parent
        if write_stream is not None:
            request.write_stream = write_stream

        # Wrap the RPC method; this adds retry and timeout information,
        # and friendly error handling.
        rpc = self._client._transport._wrapped_methods[
            self._client._transport.create_write_stream
        ]

        # Certain fields should be provided within the metadata header;
        # add these here.
        metadata = tuple(metadata) + (
            gapic_v1.routing_header.to_grpc_metadata((("parent", request.parent),)),
        )

        # Validate the universe domain.
        self._client._validate_universe_domain()

        # Send the request.
        response = await rpc(
            request,
            retry=retry,
            timeout=timeout,
            metadata=metadata,
        )

        # Done; return the response.
        return response

    def append_rows(
        self,
        requests: Optional[AsyncIterator[storage.AppendRowsRequest]] = None,
        *,
        retry: OptionalRetry = gapic_v1.method.DEFAULT,
        timeout: Union[float, object] = gapic_v1.method.DEFAULT,
        metadata: Sequence[Tuple[str, Union[str, bytes]]] = (),
    ) -> Awaitable[AsyncIterable[storage.AppendRowsResponse]]:
        r"""Appends data to the given stream.

        If ``offset`` is specified, the ``offset`` is checked against
        the end of stream. The server returns ``OUT_OF_RANGE`` in
        ``AppendRowsResponse`` if an attempt is made to append to an
        offset beyond the current end of the stream or
        ``ALREADY_EXISTS`` if user provides an ``offset`` that has
        already been written to. User can retry with adjusted offset
        within the same RPC connection. If ``offset`` is not specified,
        append happens at the end of the stream.

        The response contains an optional offset at which the append
        happened. No offset information will be returned for appends to
        a default stream.

        Responses are received in the same order in which requests are
        sent. There will be one response for each successfully inserted
        request. Responses may optionally embed error information if the
        originating AppendRowsRequest was not successfully processed.

        The specifics of when successfully appended data is made visible
        to the table are governed by the type of stream:

        -  For COMMITTED streams (which includes the default stream),
           data is visible immediately upon successful append.

        -  For BUFFERED streams, data is made visible via a subsequent
           ``FlushRows`` rpc which advances a cursor to a newer offset
           in the stream.

        -  For PENDING streams, data is not made visible until the
           stream itself is finalized (via the ``FinalizeWriteStream``
           rpc), and the stream is explicitly committed via the
           ``BatchCommitWriteStreams`` rpc.
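
        For example, a minimal sketch of an ``AppendRowsRequest`` aimed at the
        special '_default' stream (the project, dataset, and table names
        below are placeholders):

        .. code-block:: python

            from google.cloud import bigquery_storage_v1

            # The '_default' stream accepts appends without a prior
            # CreateWriteStream call; offsets are not used when writing to it.
            default_stream_request = bigquery_storage_v1.AppendRowsRequest(
                write_stream=(
                    "projects/my-project/datasets/my_dataset"
                    "/tables/my_table/streams/_default"
                ),
            )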

        .. code-block:: python

            # This snippet has been automatically generated and should be regarded as a
            # code template only.
            # It will require modifications to work:
            # - It may require correct/in-range values for request initialization.
            # - It may require specifying regional endpoints when creating the service
            #   client as shown in:
            #   https://googleapis.dev/python/google-api-core/latest/client_options.html
            from google.cloud import bigquery_storage_v1

            async def sample_append_rows():
                # Create a client
                client = bigquery_storage_v1.BigQueryWriteAsyncClient()

                # Initialize request argument(s)
                request = bigquery_storage_v1.AppendRowsRequest(
                    write_stream="write_stream_value",
                )

                # This method expects an iterator which contains
                # 'bigquery_storage_v1.AppendRowsRequest' objects
                # Here we create a generator that yields a single `request` for
                # demonstrative purposes.
                requests = [request]

                def request_generator():
                    for request in requests:
                        yield request

                # Make the request
                stream = await client.append_rows(requests=request_generator())

                # Handle the response
                async for response in stream:
                    print(response)

        Args:
            requests (AsyncIterator[`google.cloud.bigquery_storage_v1.types.AppendRowsRequest`]):
                The request object AsyncIterator. Request message for ``AppendRows``.

                Because AppendRows is a bidirectional streaming RPC,
                certain parts of the AppendRowsRequest need only be
                specified for the first request before switching table
                destinations. You can also switch table destinations
                within the same connection for the default stream.

                The size of a single AppendRowsRequest must be less than
                10 MB. Requests larger than this return an error,
                typically ``INVALID_ARGUMENT``.
            retry (google.api_core.retry_async.AsyncRetry): Designation of what errors, if any,
                should be retried.
            timeout (float): The timeout for this request.
            metadata (Sequence[Tuple[str, Union[str, bytes]]]): Key/value pairs which should be
                sent along with the request as metadata. Normally, each value must be of type `str`,
                but for metadata keys ending with the suffix `-bin`, the corresponding values must
                be of type `bytes`.

        Returns:
            AsyncIterable[google.cloud.bigquery_storage_v1.types.AppendRowsResponse]:
                Response message for AppendRows.
        """

        # Wrap the RPC method; this adds retry and timeout information,
        # and friendly error handling.
        rpc = self._client._transport._wrapped_methods[
            self._client._transport.append_rows
        ]

        # Certain fields should be provided within the metadata header;
        # add these here.
        metadata = tuple(metadata) + (gapic_v1.routing_header.to_grpc_metadata(()),)

        # Validate the universe domain.
        self._client._validate_universe_domain()

        # Send the request.
        response = rpc(
            requests,
            retry=retry,
            timeout=timeout,
            metadata=metadata,
        )

        # Done; return the response.
        return response

    async def get_write_stream(
        self,
        request: Optional[Union[storage.GetWriteStreamRequest, dict]] = None,
        *,
        name: Optional[str] = None,
        retry: OptionalRetry = gapic_v1.method.DEFAULT,
        timeout: Union[float, object] = gapic_v1.method.DEFAULT,
        metadata: Sequence[Tuple[str, Union[str, bytes]]] = (),
    ) -> stream.WriteStream:
        r"""Gets information about a write stream.

        .. code-block:: python

            # This snippet has been automatically generated and should be regarded as a
            # code template only.
            # It will require modifications to work:
            # - It may require correct/in-range values for request initialization.
            # - It may require specifying regional endpoints when creating the service
            #   client as shown in:
            #   https://googleapis.dev/python/google-api-core/latest/client_options.html
            from google.cloud import bigquery_storage_v1

            async def sample_get_write_stream():
                # Create a client
                client = bigquery_storage_v1.BigQueryWriteAsyncClient()

                # Initialize request argument(s)
                request = bigquery_storage_v1.GetWriteStreamRequest(
                    name="name_value",
                )

                # Make the request
                response = await client.get_write_stream(request=request)

                # Handle the response
                print(response)

        Args:
            request (Optional[Union[google.cloud.bigquery_storage_v1.types.GetWriteStreamRequest, dict]]):
                The request object. Request message for ``GetWriteStream``.
            name (:class:`str`):
                Required. Name of the stream to get, in the form of
                ``projects/{project}/datasets/{dataset}/tables/{table}/streams/{stream}``.

                This corresponds to the ``name`` field
                on the ``request`` instance; if ``request`` is provided, this
                should not be set.
            retry (google.api_core.retry_async.AsyncRetry): Designation of what errors, if any,
                should be retried.
            timeout (float): The timeout for this request.
            metadata (Sequence[Tuple[str, Union[str, bytes]]]): Key/value pairs which should be
                sent along with the request as metadata. Normally, each value must be of type `str`,
                but for metadata keys ending with the suffix `-bin`, the corresponding values must
                be of type `bytes`.

        Returns:
            google.cloud.bigquery_storage_v1.types.WriteStream:
                Information about a single stream
                that gets data inside the storage
                system.

        """
        # Create or coerce a protobuf request object.
        # - Quick check: If we got a request object, we should *not* have
        #   gotten any keyword arguments that map to the request.
        flattened_params = [name]
        has_flattened_params = (
            len([param for param in flattened_params if param is not None]) > 0
        )
        if request is not None and has_flattened_params:
            raise ValueError(
                "If the `request` argument is set, then none of "
                "the individual field arguments should be set."
            )

        # - Use the request object if provided (there's no risk of modifying the input as
        #   there are no flattened fields), or create one.
        if not isinstance(request, storage.GetWriteStreamRequest):
            request = storage.GetWriteStreamRequest(request)

        # If we have keyword arguments corresponding to fields on the
        # request, apply these.
        if name is not None:
            request.name = name

        # Wrap the RPC method; this adds retry and timeout information,
        # and friendly error handling.
        rpc = self._client._transport._wrapped_methods[
            self._client._transport.get_write_stream
        ]

        # Certain fields should be provided within the metadata header;
        # add these here.
        metadata = tuple(metadata) + (
            gapic_v1.routing_header.to_grpc_metadata((("name", request.name),)),
        )

        # Validate the universe domain.
        self._client._validate_universe_domain()

        # Send the request.
        response = await rpc(
            request,
            retry=retry,
            timeout=timeout,
            metadata=metadata,
        )

        # Done; return the response.
        return response

    async def finalize_write_stream(
        self,
        request: Optional[Union[storage.FinalizeWriteStreamRequest, dict]] = None,
        *,
        name: Optional[str] = None,
        retry: OptionalRetry = gapic_v1.method.DEFAULT,
        timeout: Union[float, object] = gapic_v1.method.DEFAULT,
        metadata: Sequence[Tuple[str, Union[str, bytes]]] = (),
    ) -> storage.FinalizeWriteStreamResponse:
        r"""Finalize a write stream so that no new data can be appended to
        the stream. Finalize is not supported on the '_default' stream.

        .. code-block:: python

            # This snippet has been automatically generated and should be regarded as a
            # code template only.
            # It will require modifications to work:
            # - It may require correct/in-range values for request initialization.
            # - It may require specifying regional endpoints when creating the service
            #   client as shown in:
            #   https://googleapis.dev/python/google-api-core/latest/client_options.html
            from google.cloud import bigquery_storage_v1

            async def sample_finalize_write_stream():
                # Create a client
                client = bigquery_storage_v1.BigQueryWriteAsyncClient()

                # Initialize request argument(s)
                request = bigquery_storage_v1.FinalizeWriteStreamRequest(
                    name="name_value",
                )

                # Make the request
                response = await client.finalize_write_stream(request=request)

                # Handle the response
                print(response)

        Args:
            request (Optional[Union[google.cloud.bigquery_storage_v1.types.FinalizeWriteStreamRequest, dict]]):
                The request object. Request message for invoking ``FinalizeWriteStream``.
            name (:class:`str`):
                Required. Name of the stream to finalize, in the form of
                ``projects/{project}/datasets/{dataset}/tables/{table}/streams/{stream}``.

                This corresponds to the ``name`` field
                on the ``request`` instance; if ``request`` is provided, this
                should not be set.
            retry (google.api_core.retry_async.AsyncRetry): Designation of what errors, if any,
                should be retried.
            timeout (float): The timeout for this request.
            metadata (Sequence[Tuple[str, Union[str, bytes]]]): Key/value pairs which should be
                sent along with the request as metadata. Normally, each value must be of type `str`,
                but for metadata keys ending with the suffix `-bin`, the corresponding values must
                be of type `bytes`.

        Returns:
            google.cloud.bigquery_storage_v1.types.FinalizeWriteStreamResponse:
                Response message for FinalizeWriteStream.
        """
        # Create or coerce a protobuf request object.
        # - Quick check: If we got a request object, we should *not* have
        #   gotten any keyword arguments that map to the request.
        flattened_params = [name]
        has_flattened_params = (
            len([param for param in flattened_params if param is not None]) > 0
        )
        if request is not None and has_flattened_params:
            raise ValueError(
                "If the `request` argument is set, then none of "
                "the individual field arguments should be set."
            )

        # - Use the request object if provided (there's no risk of modifying the input as
        #   there are no flattened fields), or create one.
        if not isinstance(request, storage.FinalizeWriteStreamRequest):
            request = storage.FinalizeWriteStreamRequest(request)

        # If we have keyword arguments corresponding to fields on the
        # request, apply these.
        if name is not None:
            request.name = name

        # Wrap the RPC method; this adds retry and timeout information,
        # and friendly error handling.
        rpc = self._client._transport._wrapped_methods[
            self._client._transport.finalize_write_stream
        ]

        # Certain fields should be provided within the metadata header;
        # add these here.
        metadata = tuple(metadata) + (
            gapic_v1.routing_header.to_grpc_metadata((("name", request.name),)),
        )

        # Validate the universe domain.
        self._client._validate_universe_domain()

        # Send the request.
        response = await rpc(
            request,
            retry=retry,
            timeout=timeout,
            metadata=metadata,
        )

        # Done; return the response.
        return response

    async def batch_commit_write_streams(
        self,
        request: Optional[Union[storage.BatchCommitWriteStreamsRequest, dict]] = None,
        *,
        parent: Optional[str] = None,
        retry: OptionalRetry = gapic_v1.method.DEFAULT,
        timeout: Union[float, object] = gapic_v1.method.DEFAULT,
        metadata: Sequence[Tuple[str, Union[str, bytes]]] = (),
    ) -> storage.BatchCommitWriteStreamsResponse:
        r"""Atomically commits a group of ``PENDING`` streams that belong to
        the same ``parent`` table.

        Streams must be finalized before commit and cannot be committed
        multiple times. Once a stream is committed, data in the stream
        becomes available for read operations.
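
        For example, a minimal sketch of the finalize-then-commit sequence
        for ``PENDING`` streams (this assumes an existing ``client``
        instance; ``table_path`` and ``stream_name`` are placeholders):

        .. code-block:: python

            # Finalize the pending stream first, then commit it atomically
            # together with any other finalized streams of the same table.
            await client.finalize_write_stream(name=stream_name)
            commit_response = await client.batch_commit_write_streams(
                request={"parent": table_path, "write_streams": [stream_name]}
            )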

        .. code-block:: python

            # This snippet has been automatically generated and should be regarded as a
            # code template only.
            # It will require modifications to work:
            # - It may require correct/in-range values for request initialization.
            # - It may require specifying regional endpoints when creating the service
            #   client as shown in:
            #   https://googleapis.dev/python/google-api-core/latest/client_options.html
            from google.cloud import bigquery_storage_v1

            async def sample_batch_commit_write_streams():
                # Create a client
                client = bigquery_storage_v1.BigQueryWriteAsyncClient()

                # Initialize request argument(s)
                request = bigquery_storage_v1.BatchCommitWriteStreamsRequest(
                    parent="parent_value",
                    write_streams=['write_streams_value1', 'write_streams_value2'],
                )

                # Make the request
                response = await client.batch_commit_write_streams(request=request)

                # Handle the response
                print(response)

        Args:
            request (Optional[Union[google.cloud.bigquery_storage_v1.types.BatchCommitWriteStreamsRequest, dict]]):
                The request object. Request message for ``BatchCommitWriteStreams``.
            parent (:class:`str`):
                Required. Parent table that all the streams should
                belong to, in the form of
                ``projects/{project}/datasets/{dataset}/tables/{table}``.

                This corresponds to the ``parent`` field
                on the ``request`` instance; if ``request`` is provided, this
                should not be set.
            retry (google.api_core.retry_async.AsyncRetry): Designation of what errors, if any,
                should be retried.
            timeout (float): The timeout for this request.
            metadata (Sequence[Tuple[str, Union[str, bytes]]]): Key/value pairs which should be
                sent along with the request as metadata. Normally, each value must be of type `str`,
                but for metadata keys ending with the suffix `-bin`, the corresponding values must
                be of type `bytes`.

        Returns:
            google.cloud.bigquery_storage_v1.types.BatchCommitWriteStreamsResponse:
                Response message for BatchCommitWriteStreams.
        """
        # Create or coerce a protobuf request object.
        # - Quick check: If we got a request object, we should *not* have
        #   gotten any keyword arguments that map to the request.
        flattened_params = [parent]
        has_flattened_params = (
            len([param for param in flattened_params if param is not None]) > 0
        )
        if request is not None and has_flattened_params:
            raise ValueError(
                "If the `request` argument is set, then none of "
                "the individual field arguments should be set."
            )

        # - Use the request object if provided (there's no risk of modifying the input as
        #   there are no flattened fields), or create one.
        if not isinstance(request, storage.BatchCommitWriteStreamsRequest):
            request = storage.BatchCommitWriteStreamsRequest(request)

        # If we have keyword arguments corresponding to fields on the
        # request, apply these.
        if parent is not None:
            request.parent = parent

        # Wrap the RPC method; this adds retry and timeout information,
        # and friendly error handling.
        rpc = self._client._transport._wrapped_methods[
            self._client._transport.batch_commit_write_streams
        ]

        # Certain fields should be provided within the metadata header;
        # add these here.
        metadata = tuple(metadata) + (
            gapic_v1.routing_header.to_grpc_metadata((("parent", request.parent),)),
        )

        # Validate the universe domain.
        self._client._validate_universe_domain()

        # Send the request.
        response = await rpc(
            request,
            retry=retry,
            timeout=timeout,
            metadata=metadata,
        )

        # Done; return the response.
        return response

    async def flush_rows(
        self,
        request: Optional[Union[storage.FlushRowsRequest, dict]] = None,
        *,
        write_stream: Optional[str] = None,
        retry: OptionalRetry = gapic_v1.method.DEFAULT,
        timeout: Union[float, object] = gapic_v1.method.DEFAULT,
        metadata: Sequence[Tuple[str, Union[str, bytes]]] = (),
    ) -> storage.FlushRowsResponse:
        r"""Flushes rows to a BUFFERED stream.

        If users are appending rows to a BUFFERED stream, the flush
        operation is required in order for the rows to become available
        for reading. A flush operation flushes rows in a BUFFERED
        stream, from any previously flushed offset up to the offset
        specified in the request.

        Flush is not supported on the \_default stream, since it is not
        BUFFERED.
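
        For example, a minimal sketch of flushing through a specific offset
        (the stream path and offset value below are placeholders):

        .. code-block:: python

            from google.cloud import bigquery_storage_v1

            flush_request = bigquery_storage_v1.FlushRowsRequest(
                write_stream=(
                    "projects/my-project/datasets/my_dataset"
                    "/tables/my_table/streams/my_stream"
                ),
                # Rows up to and including this offset become readable.
                offset=42,
            )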

        .. code-block:: python

            # This snippet has been automatically generated and should be regarded as a
            # code template only.
            # It will require modifications to work:
            # - It may require correct/in-range values for request initialization.
            # - It may require specifying regional endpoints when creating the service
            #   client as shown in:
            #   https://googleapis.dev/python/google-api-core/latest/client_options.html
            from google.cloud import bigquery_storage_v1

            async def sample_flush_rows():
                # Create a client
                client = bigquery_storage_v1.BigQueryWriteAsyncClient()

                # Initialize request argument(s)
                request = bigquery_storage_v1.FlushRowsRequest(
                    write_stream="write_stream_value",
                )

                # Make the request
                response = await client.flush_rows(request=request)

                # Handle the response
                print(response)

        Args:
            request (Optional[Union[google.cloud.bigquery_storage_v1.types.FlushRowsRequest, dict]]):
                The request object. Request message for ``FlushRows``.
            write_stream (:class:`str`):
                Required. The stream that is the
                target of the flush operation.

                This corresponds to the ``write_stream`` field
                on the ``request`` instance; if ``request`` is provided, this
                should not be set.
            retry (google.api_core.retry_async.AsyncRetry): Designation of what errors, if any,
                should be retried.
            timeout (float): The timeout for this request.
            metadata (Sequence[Tuple[str, Union[str, bytes]]]): Key/value pairs which should be
                sent along with the request as metadata. Normally, each value must be of type `str`,
                but for metadata keys ending with the suffix `-bin`, the corresponding values must
                be of type `bytes`.

        Returns:
            google.cloud.bigquery_storage_v1.types.FlushRowsResponse:
                Response message for FlushRows.
        """
        # Create or coerce a protobuf request object.
        # - Quick check: If we got a request object, we should *not* have
        #   gotten any keyword arguments that map to the request.
        flattened_params = [write_stream]
        has_flattened_params = (
            len([param for param in flattened_params if param is not None]) > 0
        )
        if request is not None and has_flattened_params:
            raise ValueError(
                "If the `request` argument is set, then none of "
                "the individual field arguments should be set."
            )

        # - Use the request object if provided (there's no risk of modifying the input as
        #   there are no flattened fields), or create one.
        if not isinstance(request, storage.FlushRowsRequest):
            request = storage.FlushRowsRequest(request)

        # If we have keyword arguments corresponding to fields on the
        # request, apply these.
        if write_stream is not None:
            request.write_stream = write_stream

        # Wrap the RPC method; this adds retry and timeout information,
        # and friendly error handling.
        rpc = self._client._transport._wrapped_methods[
            self._client._transport.flush_rows
        ]

        # Certain fields should be provided within the metadata header;
        # add these here.
        metadata = tuple(metadata) + (
            gapic_v1.routing_header.to_grpc_metadata(
                (("write_stream", request.write_stream),)
            ),
        )

        # Validate the universe domain.
        self._client._validate_universe_domain()

        # Send the request.
        response = await rpc(
            request,
            retry=retry,
            timeout=timeout,
            metadata=metadata,
        )

        # Done; return the response.
        return response

    async def __aenter__(self) -> "BigQueryWriteAsyncClient":
        return self

    async def __aexit__(self, exc_type, exc, tb):
        await self.transport.close()


DEFAULT_CLIENT_INFO = gapic_v1.client_info.ClientInfo(
    gapic_version=package_version.__version__
)

if hasattr(DEFAULT_CLIENT_INFO, "protobuf_runtime_version"):  # pragma: NO COVER
    DEFAULT_CLIENT_INFO.protobuf_runtime_version = google.protobuf.__version__


__all__ = ("BigQueryWriteAsyncClient",)