1# -*- coding: utf-8 -*-
2# Copyright 2022 Google LLC
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16from collections import OrderedDict
17import os
18import re
19from typing import (
20 Dict,
21 Mapping,
22 MutableMapping,
23 MutableSequence,
24 Optional,
25 Iterable,
26 Iterator,
27 Sequence,
28 Tuple,
29 Type,
30 Union,
31 cast,
32)
33
34from google.cloud.bigquery_storage_v1 import gapic_version as package_version
35
36from google.api_core import client_options as client_options_lib
37from google.api_core import exceptions as core_exceptions
38from google.api_core import gapic_v1
39from google.api_core import retry as retries
40from google.auth import credentials as ga_credentials # type: ignore
41from google.auth.transport import mtls # type: ignore
42from google.auth.transport.grpc import SslCredentials # type: ignore
43from google.auth.exceptions import MutualTLSChannelError # type: ignore
44from google.oauth2 import service_account # type: ignore
45
46try:
47 OptionalRetry = Union[retries.Retry, gapic_v1.method._MethodDefault]
48except AttributeError: # pragma: NO COVER
49 OptionalRetry = Union[retries.Retry, object] # type: ignore
50
51from google.cloud.bigquery_storage_v1.types import storage
52from google.cloud.bigquery_storage_v1.types import stream
53from google.cloud.bigquery_storage_v1.types import table
54from google.protobuf import timestamp_pb2 # type: ignore
55from google.rpc import status_pb2 # type: ignore
56from .transports.base import BigQueryWriteTransport, DEFAULT_CLIENT_INFO
57from .transports.grpc import BigQueryWriteGrpcTransport
58from .transports.grpc_asyncio import BigQueryWriteGrpcAsyncIOTransport
59
60
61class BigQueryWriteClientMeta(type):
62 """Metaclass for the BigQueryWrite client.
63
64 This provides class-level methods for building and retrieving
65 support objects (e.g. transport) without polluting the client instance
66 objects.
67 """
68
69 _transport_registry = OrderedDict() # type: Dict[str, Type[BigQueryWriteTransport]]
70 _transport_registry["grpc"] = BigQueryWriteGrpcTransport
71 _transport_registry["grpc_asyncio"] = BigQueryWriteGrpcAsyncIOTransport
72
73 def get_transport_class(
74 cls,
75 label: Optional[str] = None,
76 ) -> Type[BigQueryWriteTransport]:
77 """Returns an appropriate transport class.
78
79 Args:
80 label: The name of the desired transport. If none is
81 provided, then the first transport in the registry is used.
82
83 Returns:
84 The transport class to use.
85 """
86 # If a specific transport is requested, return that one.
87 if label:
88 return cls._transport_registry[label]
89
90 # No transport is requested; return the default (that is, the first one
91 # in the dictionary).
92 return next(iter(cls._transport_registry.values()))
93
94
95class BigQueryWriteClient(metaclass=BigQueryWriteClientMeta):
96 """BigQuery Write API.
97 The Write API can be used to write data to BigQuery.
98 For supplementary information about the Write API, see:
99 https://cloud.google.com/bigquery/docs/write-api
100 """
101
102 @staticmethod
103 def _get_default_mtls_endpoint(api_endpoint):
104 """Converts api endpoint to mTLS endpoint.
105
106 Convert "*.sandbox.googleapis.com" and "*.googleapis.com" to
107 "*.mtls.sandbox.googleapis.com" and "*.mtls.googleapis.com" respectively.
108 Args:
109 api_endpoint (Optional[str]): the api endpoint to convert.
110 Returns:
111 str: converted mTLS api endpoint.
112 """
113 if not api_endpoint:
114 return api_endpoint
115
116 mtls_endpoint_re = re.compile(
117 r"(?P<name>[^.]+)(?P<mtls>\.mtls)?(?P<sandbox>\.sandbox)?(?P<googledomain>\.googleapis\.com)?"
118 )
119
120 m = mtls_endpoint_re.match(api_endpoint)
121 name, mtls, sandbox, googledomain = m.groups()
122 if mtls or not googledomain:
123 return api_endpoint
124
125 if sandbox:
126 return api_endpoint.replace(
127 "sandbox.googleapis.com", "mtls.sandbox.googleapis.com"
128 )
129
130 return api_endpoint.replace(".googleapis.com", ".mtls.googleapis.com")
131
132 DEFAULT_ENDPOINT = "bigquerystorage.googleapis.com"
133 DEFAULT_MTLS_ENDPOINT = _get_default_mtls_endpoint.__func__( # type: ignore
134 DEFAULT_ENDPOINT
135 )
136
137 @classmethod
138 def from_service_account_info(cls, info: dict, *args, **kwargs):
139 """Creates an instance of this client using the provided credentials
140 info.
141
142 Args:
143 info (dict): The service account private key info.
144 args: Additional arguments to pass to the constructor.
145 kwargs: Additional arguments to pass to the constructor.
146
147 Returns:
148 BigQueryWriteClient: The constructed client.
149 """
150 credentials = service_account.Credentials.from_service_account_info(info)
151 kwargs["credentials"] = credentials
152 return cls(*args, **kwargs)
153
154 @classmethod
155 def from_service_account_file(cls, filename: str, *args, **kwargs):
156 """Creates an instance of this client using the provided credentials
157 file.
158
159 Args:
160 filename (str): The path to the service account private key json
161 file.
162 args: Additional arguments to pass to the constructor.
163 kwargs: Additional arguments to pass to the constructor.
164
165 Returns:
166 BigQueryWriteClient: The constructed client.
167 """
168 credentials = service_account.Credentials.from_service_account_file(filename)
169 kwargs["credentials"] = credentials
170 return cls(*args, **kwargs)
171
172 from_service_account_json = from_service_account_file
173
174 @property
175 def transport(self) -> BigQueryWriteTransport:
176 """Returns the transport used by the client instance.
177
178 Returns:
179 BigQueryWriteTransport: The transport used by the client
180 instance.
181 """
182 return self._transport
183
184 @staticmethod
185 def table_path(
186 project: str,
187 dataset: str,
188 table: str,
189 ) -> str:
190 """Returns a fully-qualified table string."""
191 return "projects/{project}/datasets/{dataset}/tables/{table}".format(
192 project=project,
193 dataset=dataset,
194 table=table,
195 )
196
197 @staticmethod
198 def parse_table_path(path: str) -> Dict[str, str]:
199 """Parses a table path into its component segments."""
200 m = re.match(
201 r"^projects/(?P<project>.+?)/datasets/(?P<dataset>.+?)/tables/(?P<table>.+?)$",
202 path,
203 )
204 return m.groupdict() if m else {}
205
206 @staticmethod
207 def write_stream_path(
208 project: str,
209 dataset: str,
210 table: str,
211 stream: str,
212 ) -> str:
213 """Returns a fully-qualified write_stream string."""
214 return "projects/{project}/datasets/{dataset}/tables/{table}/streams/{stream}".format(
215 project=project,
216 dataset=dataset,
217 table=table,
218 stream=stream,
219 )
220
221 @staticmethod
222 def parse_write_stream_path(path: str) -> Dict[str, str]:
223 """Parses a write_stream path into its component segments."""
224 m = re.match(
225 r"^projects/(?P<project>.+?)/datasets/(?P<dataset>.+?)/tables/(?P<table>.+?)/streams/(?P<stream>.+?)$",
226 path,
227 )
228 return m.groupdict() if m else {}
229
230 @staticmethod
231 def common_billing_account_path(
232 billing_account: str,
233 ) -> str:
234 """Returns a fully-qualified billing_account string."""
235 return "billingAccounts/{billing_account}".format(
236 billing_account=billing_account,
237 )
238
239 @staticmethod
240 def parse_common_billing_account_path(path: str) -> Dict[str, str]:
241 """Parse a billing_account path into its component segments."""
242 m = re.match(r"^billingAccounts/(?P<billing_account>.+?)$", path)
243 return m.groupdict() if m else {}
244
245 @staticmethod
246 def common_folder_path(
247 folder: str,
248 ) -> str:
249 """Returns a fully-qualified folder string."""
250 return "folders/{folder}".format(
251 folder=folder,
252 )
253
254 @staticmethod
255 def parse_common_folder_path(path: str) -> Dict[str, str]:
256 """Parse a folder path into its component segments."""
257 m = re.match(r"^folders/(?P<folder>.+?)$", path)
258 return m.groupdict() if m else {}
259
260 @staticmethod
261 def common_organization_path(
262 organization: str,
263 ) -> str:
264 """Returns a fully-qualified organization string."""
265 return "organizations/{organization}".format(
266 organization=organization,
267 )
268
269 @staticmethod
270 def parse_common_organization_path(path: str) -> Dict[str, str]:
271 """Parse a organization path into its component segments."""
272 m = re.match(r"^organizations/(?P<organization>.+?)$", path)
273 return m.groupdict() if m else {}
274
275 @staticmethod
276 def common_project_path(
277 project: str,
278 ) -> str:
279 """Returns a fully-qualified project string."""
280 return "projects/{project}".format(
281 project=project,
282 )
283
284 @staticmethod
285 def parse_common_project_path(path: str) -> Dict[str, str]:
286 """Parse a project path into its component segments."""
287 m = re.match(r"^projects/(?P<project>.+?)$", path)
288 return m.groupdict() if m else {}
289
290 @staticmethod
291 def common_location_path(
292 project: str,
293 location: str,
294 ) -> str:
295 """Returns a fully-qualified location string."""
296 return "projects/{project}/locations/{location}".format(
297 project=project,
298 location=location,
299 )
300
301 @staticmethod
302 def parse_common_location_path(path: str) -> Dict[str, str]:
303 """Parse a location path into its component segments."""
304 m = re.match(r"^projects/(?P<project>.+?)/locations/(?P<location>.+?)$", path)
305 return m.groupdict() if m else {}
306
307 @classmethod
308 def get_mtls_endpoint_and_cert_source(
309 cls, client_options: Optional[client_options_lib.ClientOptions] = None
310 ):
311 """Return the API endpoint and client cert source for mutual TLS.
312
313 The client cert source is determined in the following order:
314 (1) if `GOOGLE_API_USE_CLIENT_CERTIFICATE` environment variable is not "true", the
315 client cert source is None.
316 (2) if `client_options.client_cert_source` is provided, use the provided one; if the
317 default client cert source exists, use the default one; otherwise the client cert
318 source is None.
319
320 The API endpoint is determined in the following order:
321 (1) if `client_options.api_endpoint` if provided, use the provided one.
322 (2) if `GOOGLE_API_USE_CLIENT_CERTIFICATE` environment variable is "always", use the
323 default mTLS endpoint; if the environment variable is "never", use the default API
324 endpoint; otherwise if client cert source exists, use the default mTLS endpoint, otherwise
325 use the default API endpoint.
326
327 More details can be found at https://google.aip.dev/auth/4114.
328
329 Args:
330 client_options (google.api_core.client_options.ClientOptions): Custom options for the
331 client. Only the `api_endpoint` and `client_cert_source` properties may be used
332 in this method.
333
334 Returns:
335 Tuple[str, Callable[[], Tuple[bytes, bytes]]]: returns the API endpoint and the
336 client cert source to use.
337
338 Raises:
339 google.auth.exceptions.MutualTLSChannelError: If any errors happen.
340 """
341 if client_options is None:
342 client_options = client_options_lib.ClientOptions()
343 use_client_cert = os.getenv("GOOGLE_API_USE_CLIENT_CERTIFICATE", "false")
344 use_mtls_endpoint = os.getenv("GOOGLE_API_USE_MTLS_ENDPOINT", "auto")
345 if use_client_cert not in ("true", "false"):
346 raise ValueError(
347 "Environment variable `GOOGLE_API_USE_CLIENT_CERTIFICATE` must be either `true` or `false`"
348 )
349 if use_mtls_endpoint not in ("auto", "never", "always"):
350 raise MutualTLSChannelError(
351 "Environment variable `GOOGLE_API_USE_MTLS_ENDPOINT` must be `never`, `auto` or `always`"
352 )
353
354 # Figure out the client cert source to use.
355 client_cert_source = None
356 if use_client_cert == "true":
357 if client_options.client_cert_source:
358 client_cert_source = client_options.client_cert_source
359 elif mtls.has_default_client_cert_source():
360 client_cert_source = mtls.default_client_cert_source()
361
362 # Figure out which api endpoint to use.
363 if client_options.api_endpoint is not None:
364 api_endpoint = client_options.api_endpoint
365 elif use_mtls_endpoint == "always" or (
366 use_mtls_endpoint == "auto" and client_cert_source
367 ):
368 api_endpoint = cls.DEFAULT_MTLS_ENDPOINT
369 else:
370 api_endpoint = cls.DEFAULT_ENDPOINT
371
372 return api_endpoint, client_cert_source
373
374 def __init__(
375 self,
376 *,
377 credentials: Optional[ga_credentials.Credentials] = None,
378 transport: Optional[Union[str, BigQueryWriteTransport]] = None,
379 client_options: Optional[Union[client_options_lib.ClientOptions, dict]] = None,
380 client_info: gapic_v1.client_info.ClientInfo = DEFAULT_CLIENT_INFO,
381 ) -> None:
382 """Instantiates the big query write client.
383
384 Args:
385 credentials (Optional[google.auth.credentials.Credentials]): The
386 authorization credentials to attach to requests. These
387 credentials identify the application to the service; if none
388 are specified, the client will attempt to ascertain the
389 credentials from the environment.
390 transport (Union[str, BigQueryWriteTransport]): The
391 transport to use. If set to None, a transport is chosen
392 automatically.
393 client_options (Optional[Union[google.api_core.client_options.ClientOptions, dict]]): Custom options for the
394 client. It won't take effect if a ``transport`` instance is provided.
395 (1) The ``api_endpoint`` property can be used to override the
396 default endpoint provided by the client. GOOGLE_API_USE_MTLS_ENDPOINT
397 environment variable can also be used to override the endpoint:
398 "always" (always use the default mTLS endpoint), "never" (always
399 use the default regular endpoint) and "auto" (auto switch to the
400 default mTLS endpoint if client certificate is present, this is
401 the default value). However, the ``api_endpoint`` property takes
402 precedence if provided.
403 (2) If GOOGLE_API_USE_CLIENT_CERTIFICATE environment variable
404 is "true", then the ``client_cert_source`` property can be used
405 to provide client certificate for mutual TLS transport. If
406 not provided, the default SSL client certificate will be used if
407 present. If GOOGLE_API_USE_CLIENT_CERTIFICATE is "false" or not
408 set, no client certificate will be used.
409 client_info (google.api_core.gapic_v1.client_info.ClientInfo):
410 The client info used to send a user-agent string along with
411 API requests. If ``None``, then default info will be used.
412 Generally, you only need to set this if you're developing
413 your own client library.
414
415 Raises:
416 google.auth.exceptions.MutualTLSChannelError: If mutual TLS transport
417 creation failed for any reason.
418 """
419 if isinstance(client_options, dict):
420 client_options = client_options_lib.from_dict(client_options)
421 if client_options is None:
422 client_options = client_options_lib.ClientOptions()
423 client_options = cast(client_options_lib.ClientOptions, client_options)
424
425 api_endpoint, client_cert_source_func = self.get_mtls_endpoint_and_cert_source(
426 client_options
427 )
428
429 api_key_value = getattr(client_options, "api_key", None)
430 if api_key_value and credentials:
431 raise ValueError(
432 "client_options.api_key and credentials are mutually exclusive"
433 )
434
435 # Save or instantiate the transport.
436 # Ordinarily, we provide the transport, but allowing a custom transport
437 # instance provides an extensibility point for unusual situations.
438 if isinstance(transport, BigQueryWriteTransport):
439 # transport is a BigQueryWriteTransport instance.
440 if credentials or client_options.credentials_file or api_key_value:
441 raise ValueError(
442 "When providing a transport instance, "
443 "provide its credentials directly."
444 )
445 if client_options.scopes:
446 raise ValueError(
447 "When providing a transport instance, provide its scopes "
448 "directly."
449 )
450 self._transport = transport
451 else:
452 import google.auth._default # type: ignore
453
454 if api_key_value and hasattr(
455 google.auth._default, "get_api_key_credentials"
456 ):
457 credentials = google.auth._default.get_api_key_credentials(
458 api_key_value
459 )
460
461 Transport = type(self).get_transport_class(transport)
462 self._transport = Transport(
463 credentials=credentials,
464 credentials_file=client_options.credentials_file,
465 host=api_endpoint,
466 scopes=client_options.scopes,
467 client_cert_source_for_mtls=client_cert_source_func,
468 quota_project_id=client_options.quota_project_id,
469 client_info=client_info,
470 always_use_jwt_access=True,
471 api_audience=client_options.api_audience,
472 )
473
474 def create_write_stream(
475 self,
476 request: Optional[Union[storage.CreateWriteStreamRequest, dict]] = None,
477 *,
478 parent: Optional[str] = None,
479 write_stream: Optional[stream.WriteStream] = None,
480 retry: OptionalRetry = gapic_v1.method.DEFAULT,
481 timeout: Union[float, object] = gapic_v1.method.DEFAULT,
482 metadata: Sequence[Tuple[str, str]] = (),
483 ) -> stream.WriteStream:
484 r"""Creates a write stream to the given table. Additionally, every
485 table has a special stream named '_default' to which data can be
486 written. This stream doesn't need to be created using
487 CreateWriteStream. It is a stream that can be used
488 simultaneously by any number of clients. Data written to this
489 stream is considered committed as soon as an acknowledgement is
490 received.
491
492 .. code-block:: python
493
494 # This snippet has been automatically generated and should be regarded as a
495 # code template only.
496 # It will require modifications to work:
497 # - It may require correct/in-range values for request initialization.
498 # - It may require specifying regional endpoints when creating the service
499 # client as shown in:
500 # https://googleapis.dev/python/google-api-core/latest/client_options.html
501 from google.cloud import bigquery_storage_v1
502
503 def sample_create_write_stream():
504 # Create a client
505 client = bigquery_storage_v1.BigQueryWriteClient()
506
507 # Initialize request argument(s)
508 request = bigquery_storage_v1.CreateWriteStreamRequest(
509 parent="parent_value",
510 )
511
512 # Make the request
513 response = client.create_write_stream(request=request)
514
515 # Handle the response
516 print(response)
517
518 Args:
519 request (Union[google.cloud.bigquery_storage_v1.types.CreateWriteStreamRequest, dict]):
520 The request object. Request message for ``CreateWriteStream``.
521 parent (str):
522 Required. Reference to the table to which the stream
523 belongs, in the format of
524 ``projects/{project}/datasets/{dataset}/tables/{table}``.
525
526 This corresponds to the ``parent`` field
527 on the ``request`` instance; if ``request`` is provided, this
528 should not be set.
529 write_stream (google.cloud.bigquery_storage_v1.types.WriteStream):
530 Required. Stream to be created.
531 This corresponds to the ``write_stream`` field
532 on the ``request`` instance; if ``request`` is provided, this
533 should not be set.
534 retry (google.api_core.retry.Retry): Designation of what errors, if any,
535 should be retried.
536 timeout (float): The timeout for this request.
537 metadata (Sequence[Tuple[str, str]]): Strings which should be
538 sent along with the request as metadata.
539
540 Returns:
541 google.cloud.bigquery_storage_v1.types.WriteStream:
542 Information about a single stream
543 that gets data inside the storage
544 system.
545
546 """
547 # Create or coerce a protobuf request object.
548 # Quick check: If we got a request object, we should *not* have
549 # gotten any keyword arguments that map to the request.
550 has_flattened_params = any([parent, write_stream])
551 if request is not None and has_flattened_params:
552 raise ValueError(
553 "If the `request` argument is set, then none of "
554 "the individual field arguments should be set."
555 )
556
557 # Minor optimization to avoid making a copy if the user passes
558 # in a storage.CreateWriteStreamRequest.
559 # There's no risk of modifying the input as we've already verified
560 # there are no flattened fields.
561 if not isinstance(request, storage.CreateWriteStreamRequest):
562 request = storage.CreateWriteStreamRequest(request)
563 # If we have keyword arguments corresponding to fields on the
564 # request, apply these.
565 if parent is not None:
566 request.parent = parent
567 if write_stream is not None:
568 request.write_stream = write_stream
569
570 # Wrap the RPC method; this adds retry and timeout information,
571 # and friendly error handling.
572 rpc = self._transport._wrapped_methods[self._transport.create_write_stream]
573
574 # Certain fields should be provided within the metadata header;
575 # add these here.
576 metadata = tuple(metadata) + (
577 gapic_v1.routing_header.to_grpc_metadata((("parent", request.parent),)),
578 )
579
580 # Send the request.
581 response = rpc(
582 request,
583 retry=retry,
584 timeout=timeout,
585 metadata=metadata,
586 )
587
588 # Done; return the response.
589 return response
590
591 def append_rows(
592 self,
593 requests: Optional[Iterator[storage.AppendRowsRequest]] = None,
594 *,
595 retry: OptionalRetry = gapic_v1.method.DEFAULT,
596 timeout: Union[float, object] = gapic_v1.method.DEFAULT,
597 metadata: Sequence[Tuple[str, str]] = (),
598 ) -> Iterable[storage.AppendRowsResponse]:
599 r"""Appends data to the given stream.
600
601 If ``offset`` is specified, the ``offset`` is checked against
602 the end of stream. The server returns ``OUT_OF_RANGE`` in
603 ``AppendRowsResponse`` if an attempt is made to append to an
604 offset beyond the current end of the stream or
605 ``ALREADY_EXISTS`` if user provides an ``offset`` that has
606 already been written to. User can retry with adjusted offset
607 within the same RPC connection. If ``offset`` is not specified,
608 append happens at the end of the stream.
609
610 The response contains an optional offset at which the append
611 happened. No offset information will be returned for appends to
612 a default stream.
613
614 Responses are received in the same order in which requests are
615 sent. There will be one response for each successful inserted
616 request. Responses may optionally embed error information if the
617 originating AppendRequest was not successfully processed.
618
619 The specifics of when successfully appended data is made visible
620 to the table are governed by the type of stream:
621
622 - For COMMITTED streams (which includes the default stream),
623 data is visible immediately upon successful append.
624
625 - For BUFFERED streams, data is made visible via a subsequent
626 ``FlushRows`` rpc which advances a cursor to a newer offset
627 in the stream.
628
629 - For PENDING streams, data is not made visible until the
630 stream itself is finalized (via the ``FinalizeWriteStream``
631 rpc), and the stream is explicitly committed via the
632 ``BatchCommitWriteStreams`` rpc.
633
634 .. code-block:: python
635
636 # This snippet has been automatically generated and should be regarded as a
637 # code template only.
638 # It will require modifications to work:
639 # - It may require correct/in-range values for request initialization.
640 # - It may require specifying regional endpoints when creating the service
641 # client as shown in:
642 # https://googleapis.dev/python/google-api-core/latest/client_options.html
643 from google.cloud import bigquery_storage_v1
644
645 def sample_append_rows():
646 # Create a client
647 client = bigquery_storage_v1.BigQueryWriteClient()
648
649 # Initialize request argument(s)
650 request = bigquery_storage_v1.AppendRowsRequest(
651 write_stream="write_stream_value",
652 )
653
654 # This method expects an iterator which contains
655 # 'bigquery_storage_v1.AppendRowsRequest' objects
656 # Here we create a generator that yields a single `request` for
657 # demonstrative purposes.
658 requests = [request]
659
660 def request_generator():
661 for request in requests:
662 yield request
663
664 # Make the request
665 stream = client.append_rows(requests=request_generator())
666
667 # Handle the response
668 for response in stream:
669 print(response)
670
671 Args:
672 requests (Iterator[google.cloud.bigquery_storage_v1.types.AppendRowsRequest]):
673 The request object iterator. Request message for ``AppendRows``.
674
675 Due to the nature of AppendRows being a bidirectional
676 streaming RPC, certain parts of the AppendRowsRequest
677 need only be specified for the first request sent each
678 time the gRPC network connection is opened/reopened.
679
680 The size of a single AppendRowsRequest must be less than
681 10 MB in size. Requests larger than this return an
682 error, typically ``INVALID_ARGUMENT``.
683 retry (google.api_core.retry.Retry): Designation of what errors, if any,
684 should be retried.
685 timeout (float): The timeout for this request.
686 metadata (Sequence[Tuple[str, str]]): Strings which should be
687 sent along with the request as metadata.
688
689 Returns:
690 Iterable[google.cloud.bigquery_storage_v1.types.AppendRowsResponse]:
691 Response message for AppendRows.
692 """
693
694 # Wrap the RPC method; this adds retry and timeout information,
695 # and friendly error handling.
696 rpc = self._transport._wrapped_methods[self._transport.append_rows]
697
698 # Certain fields should be provided within the metadata header;
699 # add these here.
700 metadata = tuple(metadata) + (gapic_v1.routing_header.to_grpc_metadata(()),)
701
702 # Send the request.
703 response = rpc(
704 requests,
705 retry=retry,
706 timeout=timeout,
707 metadata=metadata,
708 )
709
710 # Done; return the response.
711 return response
712
713 def get_write_stream(
714 self,
715 request: Optional[Union[storage.GetWriteStreamRequest, dict]] = None,
716 *,
717 name: Optional[str] = None,
718 retry: OptionalRetry = gapic_v1.method.DEFAULT,
719 timeout: Union[float, object] = gapic_v1.method.DEFAULT,
720 metadata: Sequence[Tuple[str, str]] = (),
721 ) -> stream.WriteStream:
722 r"""Gets information about a write stream.
723
724 .. code-block:: python
725
726 # This snippet has been automatically generated and should be regarded as a
727 # code template only.
728 # It will require modifications to work:
729 # - It may require correct/in-range values for request initialization.
730 # - It may require specifying regional endpoints when creating the service
731 # client as shown in:
732 # https://googleapis.dev/python/google-api-core/latest/client_options.html
733 from google.cloud import bigquery_storage_v1
734
735 def sample_get_write_stream():
736 # Create a client
737 client = bigquery_storage_v1.BigQueryWriteClient()
738
739 # Initialize request argument(s)
740 request = bigquery_storage_v1.GetWriteStreamRequest(
741 name="name_value",
742 )
743
744 # Make the request
745 response = client.get_write_stream(request=request)
746
747 # Handle the response
748 print(response)
749
750 Args:
751 request (Union[google.cloud.bigquery_storage_v1.types.GetWriteStreamRequest, dict]):
752 The request object. Request message for ``GetWriteStreamRequest``.
753 name (str):
754 Required. Name of the stream to get, in the form of
755 ``projects/{project}/datasets/{dataset}/tables/{table}/streams/{stream}``.
756
757 This corresponds to the ``name`` field
758 on the ``request`` instance; if ``request`` is provided, this
759 should not be set.
760 retry (google.api_core.retry.Retry): Designation of what errors, if any,
761 should be retried.
762 timeout (float): The timeout for this request.
763 metadata (Sequence[Tuple[str, str]]): Strings which should be
764 sent along with the request as metadata.
765
766 Returns:
767 google.cloud.bigquery_storage_v1.types.WriteStream:
768 Information about a single stream
769 that gets data inside the storage
770 system.
771
772 """
773 # Create or coerce a protobuf request object.
774 # Quick check: If we got a request object, we should *not* have
775 # gotten any keyword arguments that map to the request.
776 has_flattened_params = any([name])
777 if request is not None and has_flattened_params:
778 raise ValueError(
779 "If the `request` argument is set, then none of "
780 "the individual field arguments should be set."
781 )
782
783 # Minor optimization to avoid making a copy if the user passes
784 # in a storage.GetWriteStreamRequest.
785 # There's no risk of modifying the input as we've already verified
786 # there are no flattened fields.
787 if not isinstance(request, storage.GetWriteStreamRequest):
788 request = storage.GetWriteStreamRequest(request)
789 # If we have keyword arguments corresponding to fields on the
790 # request, apply these.
791 if name is not None:
792 request.name = name
793
794 # Wrap the RPC method; this adds retry and timeout information,
795 # and friendly error handling.
796 rpc = self._transport._wrapped_methods[self._transport.get_write_stream]
797
798 # Certain fields should be provided within the metadata header;
799 # add these here.
800 metadata = tuple(metadata) + (
801 gapic_v1.routing_header.to_grpc_metadata((("name", request.name),)),
802 )
803
804 # Send the request.
805 response = rpc(
806 request,
807 retry=retry,
808 timeout=timeout,
809 metadata=metadata,
810 )
811
812 # Done; return the response.
813 return response
814
815 def finalize_write_stream(
816 self,
817 request: Optional[Union[storage.FinalizeWriteStreamRequest, dict]] = None,
818 *,
819 name: Optional[str] = None,
820 retry: OptionalRetry = gapic_v1.method.DEFAULT,
821 timeout: Union[float, object] = gapic_v1.method.DEFAULT,
822 metadata: Sequence[Tuple[str, str]] = (),
823 ) -> storage.FinalizeWriteStreamResponse:
824 r"""Finalize a write stream so that no new data can be appended to
825 the stream. Finalize is not supported on the '_default' stream.
826
827 .. code-block:: python
828
829 # This snippet has been automatically generated and should be regarded as a
830 # code template only.
831 # It will require modifications to work:
832 # - It may require correct/in-range values for request initialization.
833 # - It may require specifying regional endpoints when creating the service
834 # client as shown in:
835 # https://googleapis.dev/python/google-api-core/latest/client_options.html
836 from google.cloud import bigquery_storage_v1
837
838 def sample_finalize_write_stream():
839 # Create a client
840 client = bigquery_storage_v1.BigQueryWriteClient()
841
842 # Initialize request argument(s)
843 request = bigquery_storage_v1.FinalizeWriteStreamRequest(
844 name="name_value",
845 )
846
847 # Make the request
848 response = client.finalize_write_stream(request=request)
849
850 # Handle the response
851 print(response)
852
853 Args:
854 request (Union[google.cloud.bigquery_storage_v1.types.FinalizeWriteStreamRequest, dict]):
855 The request object. Request message for invoking ``FinalizeWriteStream``.
856 name (str):
857 Required. Name of the stream to finalize, in the form of
858 ``projects/{project}/datasets/{dataset}/tables/{table}/streams/{stream}``.
859
860 This corresponds to the ``name`` field
861 on the ``request`` instance; if ``request`` is provided, this
862 should not be set.
863 retry (google.api_core.retry.Retry): Designation of what errors, if any,
864 should be retried.
865 timeout (float): The timeout for this request.
866 metadata (Sequence[Tuple[str, str]]): Strings which should be
867 sent along with the request as metadata.
868
869 Returns:
870 google.cloud.bigquery_storage_v1.types.FinalizeWriteStreamResponse:
871 Response message for FinalizeWriteStream.
872 """
873 # Create or coerce a protobuf request object.
874 # Quick check: If we got a request object, we should *not* have
875 # gotten any keyword arguments that map to the request.
876 has_flattened_params = any([name])
877 if request is not None and has_flattened_params:
878 raise ValueError(
879 "If the `request` argument is set, then none of "
880 "the individual field arguments should be set."
881 )
882
883 # Minor optimization to avoid making a copy if the user passes
884 # in a storage.FinalizeWriteStreamRequest.
885 # There's no risk of modifying the input as we've already verified
886 # there are no flattened fields.
887 if not isinstance(request, storage.FinalizeWriteStreamRequest):
888 request = storage.FinalizeWriteStreamRequest(request)
889 # If we have keyword arguments corresponding to fields on the
890 # request, apply these.
891 if name is not None:
892 request.name = name
893
894 # Wrap the RPC method; this adds retry and timeout information,
895 # and friendly error handling.
896 rpc = self._transport._wrapped_methods[self._transport.finalize_write_stream]
897
898 # Certain fields should be provided within the metadata header;
899 # add these here.
900 metadata = tuple(metadata) + (
901 gapic_v1.routing_header.to_grpc_metadata((("name", request.name),)),
902 )
903
904 # Send the request.
905 response = rpc(
906 request,
907 retry=retry,
908 timeout=timeout,
909 metadata=metadata,
910 )
911
912 # Done; return the response.
913 return response
914
915 def batch_commit_write_streams(
916 self,
917 request: Optional[Union[storage.BatchCommitWriteStreamsRequest, dict]] = None,
918 *,
919 parent: Optional[str] = None,
920 retry: OptionalRetry = gapic_v1.method.DEFAULT,
921 timeout: Union[float, object] = gapic_v1.method.DEFAULT,
922 metadata: Sequence[Tuple[str, str]] = (),
923 ) -> storage.BatchCommitWriteStreamsResponse:
924 r"""Atomically commits a group of ``PENDING`` streams that belong to
925 the same ``parent`` table.
926
927 Streams must be finalized before commit and cannot be committed
928 multiple times. Once a stream is committed, data in the stream
929 becomes available for read operations.
930
931 .. code-block:: python
932
933 # This snippet has been automatically generated and should be regarded as a
934 # code template only.
935 # It will require modifications to work:
936 # - It may require correct/in-range values for request initialization.
937 # - It may require specifying regional endpoints when creating the service
938 # client as shown in:
939 # https://googleapis.dev/python/google-api-core/latest/client_options.html
940 from google.cloud import bigquery_storage_v1
941
942 def sample_batch_commit_write_streams():
943 # Create a client
944 client = bigquery_storage_v1.BigQueryWriteClient()
945
946 # Initialize request argument(s)
947 request = bigquery_storage_v1.BatchCommitWriteStreamsRequest(
948 parent="parent_value",
949 write_streams=['write_streams_value1', 'write_streams_value2'],
950 )
951
952 # Make the request
953 response = client.batch_commit_write_streams(request=request)
954
955 # Handle the response
956 print(response)
957
958 Args:
959 request (Union[google.cloud.bigquery_storage_v1.types.BatchCommitWriteStreamsRequest, dict]):
960 The request object. Request message for ``BatchCommitWriteStreams``.
961 parent (str):
962 Required. Parent table that all the streams should
963 belong to, in the form of
964 ``projects/{project}/datasets/{dataset}/tables/{table}``.
965
966 This corresponds to the ``parent`` field
967 on the ``request`` instance; if ``request`` is provided, this
968 should not be set.
969 retry (google.api_core.retry.Retry): Designation of what errors, if any,
970 should be retried.
971 timeout (float): The timeout for this request.
972 metadata (Sequence[Tuple[str, str]]): Strings which should be
973 sent along with the request as metadata.
974
975 Returns:
976 google.cloud.bigquery_storage_v1.types.BatchCommitWriteStreamsResponse:
977 Response message for BatchCommitWriteStreams.
978 """
979 # Create or coerce a protobuf request object.
980 # Quick check: If we got a request object, we should *not* have
981 # gotten any keyword arguments that map to the request.
982 has_flattened_params = any([parent])
983 if request is not None and has_flattened_params:
984 raise ValueError(
985 "If the `request` argument is set, then none of "
986 "the individual field arguments should be set."
987 )
988
989 # Minor optimization to avoid making a copy if the user passes
990 # in a storage.BatchCommitWriteStreamsRequest.
991 # There's no risk of modifying the input as we've already verified
992 # there are no flattened fields.
993 if not isinstance(request, storage.BatchCommitWriteStreamsRequest):
994 request = storage.BatchCommitWriteStreamsRequest(request)
995 # If we have keyword arguments corresponding to fields on the
996 # request, apply these.
997 if parent is not None:
998 request.parent = parent
999
1000 # Wrap the RPC method; this adds retry and timeout information,
1001 # and friendly error handling.
1002 rpc = self._transport._wrapped_methods[
1003 self._transport.batch_commit_write_streams
1004 ]
1005
1006 # Certain fields should be provided within the metadata header;
1007 # add these here.
1008 metadata = tuple(metadata) + (
1009 gapic_v1.routing_header.to_grpc_metadata((("parent", request.parent),)),
1010 )
1011
1012 # Send the request.
1013 response = rpc(
1014 request,
1015 retry=retry,
1016 timeout=timeout,
1017 metadata=metadata,
1018 )
1019
1020 # Done; return the response.
1021 return response
1022
1023 def flush_rows(
1024 self,
1025 request: Optional[Union[storage.FlushRowsRequest, dict]] = None,
1026 *,
1027 write_stream: Optional[str] = None,
1028 retry: OptionalRetry = gapic_v1.method.DEFAULT,
1029 timeout: Union[float, object] = gapic_v1.method.DEFAULT,
1030 metadata: Sequence[Tuple[str, str]] = (),
1031 ) -> storage.FlushRowsResponse:
1032 r"""Flushes rows to a BUFFERED stream.
1033
1034 If users are appending rows to BUFFERED stream, flush operation
1035 is required in order for the rows to become available for
1036 reading. A Flush operation flushes up to any previously flushed
1037 offset in a BUFFERED stream, to the offset specified in the
1038 request.
1039
1040 Flush is not supported on the \_default stream, since it is not
1041 BUFFERED.
1042
1043 .. code-block:: python
1044
1045 # This snippet has been automatically generated and should be regarded as a
1046 # code template only.
1047 # It will require modifications to work:
1048 # - It may require correct/in-range values for request initialization.
1049 # - It may require specifying regional endpoints when creating the service
1050 # client as shown in:
1051 # https://googleapis.dev/python/google-api-core/latest/client_options.html
1052 from google.cloud import bigquery_storage_v1
1053
1054 def sample_flush_rows():
1055 # Create a client
1056 client = bigquery_storage_v1.BigQueryWriteClient()
1057
1058 # Initialize request argument(s)
1059 request = bigquery_storage_v1.FlushRowsRequest(
1060 write_stream="write_stream_value",
1061 )
1062
1063 # Make the request
1064 response = client.flush_rows(request=request)
1065
1066 # Handle the response
1067 print(response)
1068
1069 Args:
1070 request (Union[google.cloud.bigquery_storage_v1.types.FlushRowsRequest, dict]):
1071 The request object. Request message for ``FlushRows``.
1072 write_stream (str):
1073 Required. The stream that is the
1074 target of the flush operation.
1075
1076 This corresponds to the ``write_stream`` field
1077 on the ``request`` instance; if ``request`` is provided, this
1078 should not be set.
1079 retry (google.api_core.retry.Retry): Designation of what errors, if any,
1080 should be retried.
1081 timeout (float): The timeout for this request.
1082 metadata (Sequence[Tuple[str, str]]): Strings which should be
1083 sent along with the request as metadata.
1084
1085 Returns:
1086 google.cloud.bigquery_storage_v1.types.FlushRowsResponse:
1087 Respond message for FlushRows.
1088 """
1089 # Create or coerce a protobuf request object.
1090 # Quick check: If we got a request object, we should *not* have
1091 # gotten any keyword arguments that map to the request.
1092 has_flattened_params = any([write_stream])
1093 if request is not None and has_flattened_params:
1094 raise ValueError(
1095 "If the `request` argument is set, then none of "
1096 "the individual field arguments should be set."
1097 )
1098
1099 # Minor optimization to avoid making a copy if the user passes
1100 # in a storage.FlushRowsRequest.
1101 # There's no risk of modifying the input as we've already verified
1102 # there are no flattened fields.
1103 if not isinstance(request, storage.FlushRowsRequest):
1104 request = storage.FlushRowsRequest(request)
1105 # If we have keyword arguments corresponding to fields on the
1106 # request, apply these.
1107 if write_stream is not None:
1108 request.write_stream = write_stream
1109
1110 # Wrap the RPC method; this adds retry and timeout information,
1111 # and friendly error handling.
1112 rpc = self._transport._wrapped_methods[self._transport.flush_rows]
1113
1114 # Certain fields should be provided within the metadata header;
1115 # add these here.
1116 metadata = tuple(metadata) + (
1117 gapic_v1.routing_header.to_grpc_metadata(
1118 (("write_stream", request.write_stream),)
1119 ),
1120 )
1121
1122 # Send the request.
1123 response = rpc(
1124 request,
1125 retry=retry,
1126 timeout=timeout,
1127 metadata=metadata,
1128 )
1129
1130 # Done; return the response.
1131 return response
1132
1133 def __enter__(self) -> "BigQueryWriteClient":
1134 return self
1135
1136 def __exit__(self, type, value, traceback):
1137 """Releases underlying transport's resources.
1138
1139 .. warning::
1140 ONLY use as a context manager if the transport is NOT shared
1141 with other clients! Exiting the with block will CLOSE the transport
1142 and may cause errors in other clients!
1143 """
1144 self.transport.close()
1145
1146
1147DEFAULT_CLIENT_INFO = gapic_v1.client_info.ClientInfo(
1148 gapic_version=package_version.__version__
1149)
1150
1151
1152__all__ = ("BigQueryWriteClient",)