Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/google/cloud/bigquery_storage_v1/services/big_query_write/client.py: 39%

226 statements  

coverage.py v7.2.2, created at 2023-03-26 06:10 +0000

1# -*- coding: utf-8 -*- 

2# Copyright 2022 Google LLC 

3# 

4# Licensed under the Apache License, Version 2.0 (the "License"); 

5# you may not use this file except in compliance with the License. 

6# You may obtain a copy of the License at 

7# 

8# http://www.apache.org/licenses/LICENSE-2.0 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, 

12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

13# See the License for the specific language governing permissions and 

14# limitations under the License. 

15# 

16from collections import OrderedDict 

17import os 

18import re 

19from typing import ( 

20 Dict, 

21 Mapping, 

22 MutableMapping, 

23 MutableSequence, 

24 Optional, 

25 Iterable, 

26 Iterator, 

27 Sequence, 

28 Tuple, 

29 Type, 

30 Union, 

31 cast, 

32) 

33 

34from google.cloud.bigquery_storage_v1 import gapic_version as package_version 

35 

36from google.api_core import client_options as client_options_lib 

37from google.api_core import exceptions as core_exceptions 

38from google.api_core import gapic_v1 

39from google.api_core import retry as retries 

40from google.auth import credentials as ga_credentials # type: ignore 

41from google.auth.transport import mtls # type: ignore 

42from google.auth.transport.grpc import SslCredentials # type: ignore 

43from google.auth.exceptions import MutualTLSChannelError # type: ignore 

44from google.oauth2 import service_account # type: ignore 

45 

46try: 

47 OptionalRetry = Union[retries.Retry, gapic_v1.method._MethodDefault] 

48except AttributeError: # pragma: NO COVER 

49 OptionalRetry = Union[retries.Retry, object] # type: ignore 

50 

51from google.cloud.bigquery_storage_v1.types import storage 

52from google.cloud.bigquery_storage_v1.types import stream 

53from google.cloud.bigquery_storage_v1.types import table 

54from google.protobuf import timestamp_pb2 # type: ignore 

55from google.rpc import status_pb2 # type: ignore 

56from .transports.base import BigQueryWriteTransport, DEFAULT_CLIENT_INFO 

57from .transports.grpc import BigQueryWriteGrpcTransport 

58from .transports.grpc_asyncio import BigQueryWriteGrpcAsyncIOTransport 

59 

60 

61class BigQueryWriteClientMeta(type): 

62 """Metaclass for the BigQueryWrite client. 

63 

64 This provides class-level methods for building and retrieving 

65 support objects (e.g. transport) without polluting the client instance 

66 objects. 

67 """ 

68 

69 _transport_registry = OrderedDict() # type: Dict[str, Type[BigQueryWriteTransport]] 

70 _transport_registry["grpc"] = BigQueryWriteGrpcTransport 

71 _transport_registry["grpc_asyncio"] = BigQueryWriteGrpcAsyncIOTransport 

72 

73 def get_transport_class( 

74 cls, 

75 label: Optional[str] = None, 

76 ) -> Type[BigQueryWriteTransport]: 

77 """Returns an appropriate transport class. 

78 

79 Args: 

80 label: The name of the desired transport. If none is 

81 provided, then the first transport in the registry is used. 

82 

83 Returns: 

84 The transport class to use. 

85 """ 

86 # If a specific transport is requested, return that one. 

87 if label: 

88 return cls._transport_registry[label] 

89 

90 # No transport is requested; return the default (that is, the first one 

91 # in the dictionary). 

92 return next(iter(cls._transport_registry.values())) 

93 
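# A hedged illustration of the transport registry above (not part of the
# generated client); ``get_transport_class`` is defined on the metaclass, so
# it is callable directly on ``BigQueryWriteClient``:
#
#   BigQueryWriteClient.get_transport_class("grpc")          # -> BigQueryWriteGrpcTransport
#   BigQueryWriteClient.get_transport_class("grpc_asyncio")  # -> BigQueryWriteGrpcAsyncIOTransport
#   BigQueryWriteClient.get_transport_class()                # -> BigQueryWriteGrpcTransport (first registered)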

94 

95class BigQueryWriteClient(metaclass=BigQueryWriteClientMeta): 

96 """BigQuery Write API. 

97 The Write API can be used to write data to BigQuery. 

98 For supplementary information about the Write API, see: 

99 https://cloud.google.com/bigquery/docs/write-api 

100 """ 

101 

102 @staticmethod 

103 def _get_default_mtls_endpoint(api_endpoint): 

104 """Converts api endpoint to mTLS endpoint. 

105 

106 Convert "*.sandbox.googleapis.com" and "*.googleapis.com" to 

107 "*.mtls.sandbox.googleapis.com" and "*.mtls.googleapis.com" respectively. 

108 Args: 

109 api_endpoint (Optional[str]): the api endpoint to convert. 

110 Returns: 

111 str: converted mTLS api endpoint. 

112 """ 

113 if not api_endpoint: 

114 return api_endpoint 

115 

116 mtls_endpoint_re = re.compile( 

117 r"(?P<name>[^.]+)(?P<mtls>\.mtls)?(?P<sandbox>\.sandbox)?(?P<googledomain>\.googleapis\.com)?" 

118 ) 

119 

120 m = mtls_endpoint_re.match(api_endpoint) 

121 name, mtls, sandbox, googledomain = m.groups() 

122 if mtls or not googledomain: 

123 return api_endpoint 

124 

125 if sandbox: 

126 return api_endpoint.replace( 

127 "sandbox.googleapis.com", "mtls.sandbox.googleapis.com" 

128 ) 

129 

130 return api_endpoint.replace(".googleapis.com", ".mtls.googleapis.com") 

131 
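# A hedged sketch of the conversion above (hostnames are illustrative):
#
#   _get_default_mtls_endpoint("bigquerystorage.googleapis.com")
#       # -> "bigquerystorage.mtls.googleapis.com"
#   _get_default_mtls_endpoint("storage.sandbox.googleapis.com")
#       # -> "storage.mtls.sandbox.googleapis.com"
#   _get_default_mtls_endpoint("bigquerystorage.mtls.googleapis.com")
#       # -> unchanged (already an mTLS endpoint)
#   _get_default_mtls_endpoint("example.com")
#       # -> unchanged (not a *.googleapis.com endpoint)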

132 DEFAULT_ENDPOINT = "bigquerystorage.googleapis.com" 

133 DEFAULT_MTLS_ENDPOINT = _get_default_mtls_endpoint.__func__( # type: ignore 

134 DEFAULT_ENDPOINT 

135 ) 

136 

137 @classmethod 

138 def from_service_account_info(cls, info: dict, *args, **kwargs): 

139 """Creates an instance of this client using the provided credentials 

140 info. 

141 

142 Args: 

143 info (dict): The service account private key info. 

144 args: Additional arguments to pass to the constructor. 

145 kwargs: Additional arguments to pass to the constructor. 

146 

147 Returns: 

148 BigQueryWriteClient: The constructed client. 

149 """ 

150 credentials = service_account.Credentials.from_service_account_info(info) 

151 kwargs["credentials"] = credentials 

152 return cls(*args, **kwargs) 

153 

154 @classmethod 

155 def from_service_account_file(cls, filename: str, *args, **kwargs): 

156 """Creates an instance of this client using the provided credentials 

157 file. 

158 

159 Args: 

160 filename (str): The path to the service account private key json 

161 file. 

162 args: Additional arguments to pass to the constructor. 

163 kwargs: Additional arguments to pass to the constructor. 

164 

165 Returns: 

166 BigQueryWriteClient: The constructed client. 

167 """ 

168 credentials = service_account.Credentials.from_service_account_file(filename) 

169 kwargs["credentials"] = credentials 

170 return cls(*args, **kwargs) 

171 

172 from_service_account_json = from_service_account_file 

173 
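# A hedged construction sketch for the factory methods above (the key file
# path is a placeholder):
#
#   client = BigQueryWriteClient.from_service_account_file(
#       "/path/to/service-account.json"
#   )
#   # from_service_account_json is an alias of from_service_account_file;
#   # from_service_account_info accepts the already-parsed key dict instead.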

174 @property 

175 def transport(self) -> BigQueryWriteTransport: 

176 """Returns the transport used by the client instance. 

177 

178 Returns: 

179 BigQueryWriteTransport: The transport used by the client 

180 instance. 

181 """ 

182 return self._transport 

183 

184 @staticmethod 

185 def table_path( 

186 project: str, 

187 dataset: str, 

188 table: str, 

189 ) -> str: 

190 """Returns a fully-qualified table string.""" 

191 return "projects/{project}/datasets/{dataset}/tables/{table}".format( 

192 project=project, 

193 dataset=dataset, 

194 table=table, 

195 ) 

196 

197 @staticmethod 

198 def parse_table_path(path: str) -> Dict[str, str]: 

199 """Parses a table path into its component segments.""" 

200 m = re.match( 

201 r"^projects/(?P<project>.+?)/datasets/(?P<dataset>.+?)/tables/(?P<table>.+?)$", 

202 path, 

203 ) 

204 return m.groupdict() if m else {} 

205 
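# A hedged round-trip sketch for the table path helpers above (IDs are
# placeholders):
#
#   path = BigQueryWriteClient.table_path("my-project", "my_dataset", "my_table")
#   # -> "projects/my-project/datasets/my_dataset/tables/my_table"
#   BigQueryWriteClient.parse_table_path(path)
#   # -> {"project": "my-project", "dataset": "my_dataset", "table": "my_table"}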

206 @staticmethod 

207 def write_stream_path( 

208 project: str, 

209 dataset: str, 

210 table: str, 

211 stream: str, 

212 ) -> str: 

213 """Returns a fully-qualified write_stream string.""" 

214 return "projects/{project}/datasets/{dataset}/tables/{table}/streams/{stream}".format( 

215 project=project, 

216 dataset=dataset, 

217 table=table, 

218 stream=stream, 

219 ) 

220 

221 @staticmethod 

222 def parse_write_stream_path(path: str) -> Dict[str, str]: 

223 """Parses a write_stream path into its component segments.""" 

224 m = re.match( 

225 r"^projects/(?P<project>.+?)/datasets/(?P<dataset>.+?)/tables/(?P<table>.+?)/streams/(?P<stream>.+?)$", 

226 path, 

227 ) 

228 return m.groupdict() if m else {} 

229 
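# The write_stream helpers behave the same way, adding the stream segment
# (a hedged sketch; IDs are placeholders):
#
#   BigQueryWriteClient.write_stream_path("my-project", "my_dataset", "my_table", "_default")
#   # -> "projects/my-project/datasets/my_dataset/tables/my_table/streams/_default"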

230 @staticmethod 

231 def common_billing_account_path( 

232 billing_account: str, 

233 ) -> str: 

234 """Returns a fully-qualified billing_account string.""" 

235 return "billingAccounts/{billing_account}".format( 

236 billing_account=billing_account, 

237 ) 

238 

239 @staticmethod 

240 def parse_common_billing_account_path(path: str) -> Dict[str, str]: 

241 """Parse a billing_account path into its component segments.""" 

242 m = re.match(r"^billingAccounts/(?P<billing_account>.+?)$", path) 

243 return m.groupdict() if m else {} 

244 

245 @staticmethod 

246 def common_folder_path( 

247 folder: str, 

248 ) -> str: 

249 """Returns a fully-qualified folder string.""" 

250 return "folders/{folder}".format( 

251 folder=folder, 

252 ) 

253 

254 @staticmethod 

255 def parse_common_folder_path(path: str) -> Dict[str, str]: 

256 """Parse a folder path into its component segments.""" 

257 m = re.match(r"^folders/(?P<folder>.+?)$", path) 

258 return m.groupdict() if m else {} 

259 

260 @staticmethod 

261 def common_organization_path( 

262 organization: str, 

263 ) -> str: 

264 """Returns a fully-qualified organization string.""" 

265 return "organizations/{organization}".format( 

266 organization=organization, 

267 ) 

268 

269 @staticmethod 

270 def parse_common_organization_path(path: str) -> Dict[str, str]: 

271 """Parse a organization path into its component segments.""" 

272 m = re.match(r"^organizations/(?P<organization>.+?)$", path) 

273 return m.groupdict() if m else {} 

274 

275 @staticmethod 

276 def common_project_path( 

277 project: str, 

278 ) -> str: 

279 """Returns a fully-qualified project string.""" 

280 return "projects/{project}".format( 

281 project=project, 

282 ) 

283 

284 @staticmethod 

285 def parse_common_project_path(path: str) -> Dict[str, str]: 

286 """Parse a project path into its component segments.""" 

287 m = re.match(r"^projects/(?P<project>.+?)$", path) 

288 return m.groupdict() if m else {} 

289 

290 @staticmethod 

291 def common_location_path( 

292 project: str, 

293 location: str, 

294 ) -> str: 

295 """Returns a fully-qualified location string.""" 

296 return "projects/{project}/locations/{location}".format( 

297 project=project, 

298 location=location, 

299 ) 

300 

301 @staticmethod 

302 def parse_common_location_path(path: str) -> Dict[str, str]: 

303 """Parse a location path into its component segments.""" 

304 m = re.match(r"^projects/(?P<project>.+?)/locations/(?P<location>.+?)$", path) 

305 return m.groupdict() if m else {} 

306 

307 @classmethod 

308 def get_mtls_endpoint_and_cert_source( 

309 cls, client_options: Optional[client_options_lib.ClientOptions] = None 

310 ): 

311 """Return the API endpoint and client cert source for mutual TLS. 

312 

313 The client cert source is determined in the following order: 

314 (1) if `GOOGLE_API_USE_CLIENT_CERTIFICATE` environment variable is not "true", the 

315 client cert source is None. 

316 (2) if `client_options.client_cert_source` is provided, use the provided one; if the 

317 default client cert source exists, use the default one; otherwise the client cert 

318 source is None. 

319 

320 The API endpoint is determined in the following order: 

321 (1) if `client_options.api_endpoint` is provided, use the provided one. 

322 (2) if `GOOGLE_API_USE_MTLS_ENDPOINT` environment variable is "always", use the 

323 default mTLS endpoint; if the environment variable is "never", use the default API 

324 endpoint; otherwise if client cert source exists, use the default mTLS endpoint, otherwise 

325 use the default API endpoint. 

326 

327 More details can be found at https://google.aip.dev/auth/4114. 

328 

329 Args: 

330 client_options (google.api_core.client_options.ClientOptions): Custom options for the 

331 client. Only the `api_endpoint` and `client_cert_source` properties may be used 

332 in this method. 

333 

334 Returns: 

335 Tuple[str, Callable[[], Tuple[bytes, bytes]]]: returns the API endpoint and the 

336 client cert source to use. 

337 

338 Raises: 

339 google.auth.exceptions.MutualTLSChannelError: If any errors happen. 

340 """ 

341 if client_options is None: 

342 client_options = client_options_lib.ClientOptions() 

343 use_client_cert = os.getenv("GOOGLE_API_USE_CLIENT_CERTIFICATE", "false") 

344 use_mtls_endpoint = os.getenv("GOOGLE_API_USE_MTLS_ENDPOINT", "auto") 

345 if use_client_cert not in ("true", "false"): 

346 raise ValueError( 

347 "Environment variable `GOOGLE_API_USE_CLIENT_CERTIFICATE` must be either `true` or `false`" 

348 ) 

349 if use_mtls_endpoint not in ("auto", "never", "always"): 

350 raise MutualTLSChannelError( 

351 "Environment variable `GOOGLE_API_USE_MTLS_ENDPOINT` must be `never`, `auto` or `always`" 

352 ) 

353 

354 # Figure out the client cert source to use. 

355 client_cert_source = None 

356 if use_client_cert == "true": 

357 if client_options.client_cert_source: 

358 client_cert_source = client_options.client_cert_source 

359 elif mtls.has_default_client_cert_source(): 

360 client_cert_source = mtls.default_client_cert_source() 

361 

362 # Figure out which api endpoint to use. 

363 if client_options.api_endpoint is not None: 

364 api_endpoint = client_options.api_endpoint 

365 elif use_mtls_endpoint == "always" or ( 

366 use_mtls_endpoint == "auto" and client_cert_source 

367 ): 

368 api_endpoint = cls.DEFAULT_MTLS_ENDPOINT 

369 else: 

370 api_endpoint = cls.DEFAULT_ENDPOINT 

371 

372 return api_endpoint, client_cert_source 

373 
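# A hedged sketch of the resolution above (environment values are
# illustrative, not read from this module):
#
#   # GOOGLE_API_USE_CLIENT_CERTIFICATE unset or "false",
#   # GOOGLE_API_USE_MTLS_ENDPOINT unset or "auto":
#   BigQueryWriteClient.get_mtls_endpoint_and_cert_source()
#   # -> ("bigquerystorage.googleapis.com", None)
#
#   # GOOGLE_API_USE_MTLS_ENDPOINT="always":
#   # -> ("bigquerystorage.mtls.googleapis.com", <cert source or None>)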

374 def __init__( 

375 self, 

376 *, 

377 credentials: Optional[ga_credentials.Credentials] = None, 

378 transport: Optional[Union[str, BigQueryWriteTransport]] = None, 

379 client_options: Optional[Union[client_options_lib.ClientOptions, dict]] = None, 

380 client_info: gapic_v1.client_info.ClientInfo = DEFAULT_CLIENT_INFO, 

381 ) -> None: 

382 """Instantiates the big query write client. 

383 

384 Args: 

385 credentials (Optional[google.auth.credentials.Credentials]): The 

386 authorization credentials to attach to requests. These 

387 credentials identify the application to the service; if none 

388 are specified, the client will attempt to ascertain the 

389 credentials from the environment. 

390 transport (Union[str, BigQueryWriteTransport]): The 

391 transport to use. If set to None, a transport is chosen 

392 automatically. 

393 client_options (Optional[Union[google.api_core.client_options.ClientOptions, dict]]): Custom options for the 

394 client. It won't take effect if a ``transport`` instance is provided. 

395 (1) The ``api_endpoint`` property can be used to override the 

396 default endpoint provided by the client. GOOGLE_API_USE_MTLS_ENDPOINT 

397 environment variable can also be used to override the endpoint: 

398 "always" (always use the default mTLS endpoint), "never" (always 

399 use the default regular endpoint) and "auto" (auto switch to the 

400 default mTLS endpoint if client certificate is present, this is 

401 the default value). However, the ``api_endpoint`` property takes 

402 precedence if provided. 

403 (2) If GOOGLE_API_USE_CLIENT_CERTIFICATE environment variable 

404 is "true", then the ``client_cert_source`` property can be used 

405 to provide client certificate for mutual TLS transport. If 

406 not provided, the default SSL client certificate will be used if 

407 present. If GOOGLE_API_USE_CLIENT_CERTIFICATE is "false" or not 

408 set, no client certificate will be used. 

409 client_info (google.api_core.gapic_v1.client_info.ClientInfo): 

410 The client info used to send a user-agent string along with 

411 API requests. If ``None``, then default info will be used. 

412 Generally, you only need to set this if you're developing 

413 your own client library. 

414 

415 Raises: 

416 google.auth.exceptions.MutualTLSChannelError: If mutual TLS transport 

417 creation failed for any reason. 

418 """ 

419 if isinstance(client_options, dict): 

420 client_options = client_options_lib.from_dict(client_options) 

421 if client_options is None: 

422 client_options = client_options_lib.ClientOptions() 

423 client_options = cast(client_options_lib.ClientOptions, client_options) 

424 

425 api_endpoint, client_cert_source_func = self.get_mtls_endpoint_and_cert_source( 

426 client_options 

427 ) 

428 

429 api_key_value = getattr(client_options, "api_key", None) 

430 if api_key_value and credentials: 

431 raise ValueError( 

432 "client_options.api_key and credentials are mutually exclusive" 

433 ) 

434 

435 # Save or instantiate the transport. 

436 # Ordinarily, we provide the transport, but allowing a custom transport 

437 # instance provides an extensibility point for unusual situations. 

438 if isinstance(transport, BigQueryWriteTransport): 

439 # transport is a BigQueryWriteTransport instance. 

440 if credentials or client_options.credentials_file or api_key_value: 

441 raise ValueError( 

442 "When providing a transport instance, " 

443 "provide its credentials directly." 

444 ) 

445 if client_options.scopes: 

446 raise ValueError( 

447 "When providing a transport instance, provide its scopes " 

448 "directly." 

449 ) 

450 self._transport = transport 

451 else: 

452 import google.auth._default # type: ignore 

453 

454 if api_key_value and hasattr( 

455 google.auth._default, "get_api_key_credentials" 

456 ): 

457 credentials = google.auth._default.get_api_key_credentials( 

458 api_key_value 

459 ) 

460 

461 Transport = type(self).get_transport_class(transport) 

462 self._transport = Transport( 

463 credentials=credentials, 

464 credentials_file=client_options.credentials_file, 

465 host=api_endpoint, 

466 scopes=client_options.scopes, 

467 client_cert_source_for_mtls=client_cert_source_func, 

468 quota_project_id=client_options.quota_project_id, 

469 client_info=client_info, 

470 always_use_jwt_access=True, 

471 api_audience=client_options.api_audience, 

472 ) 

473 
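# A hedged construction sketch for the constructor above (the endpoint is a
# placeholder and credentials are resolved from the environment):
#
#   options = client_options_lib.ClientOptions(
#       api_endpoint="bigquerystorage.googleapis.com"
#   )
#   client = BigQueryWriteClient(client_options=options)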

474 def create_write_stream( 

475 self, 

476 request: Optional[Union[storage.CreateWriteStreamRequest, dict]] = None, 

477 *, 

478 parent: Optional[str] = None, 

479 write_stream: Optional[stream.WriteStream] = None, 

480 retry: OptionalRetry = gapic_v1.method.DEFAULT, 

481 timeout: Union[float, object] = gapic_v1.method.DEFAULT, 

482 metadata: Sequence[Tuple[str, str]] = (), 

483 ) -> stream.WriteStream: 

484 r"""Creates a write stream to the given table. Additionally, every 

485 table has a special stream named '_default' to which data can be 

486 written. This stream doesn't need to be created using 

487 CreateWriteStream. It is a stream that can be used 

488 simultaneously by any number of clients. Data written to this 

489 stream is considered committed as soon as an acknowledgement is 

490 received. 

491 

492 .. code-block:: python 

493 

494 # This snippet has been automatically generated and should be regarded as a 

495 # code template only. 

496 # It will require modifications to work: 

497 # - It may require correct/in-range values for request initialization. 

498 # - It may require specifying regional endpoints when creating the service 

499 # client as shown in: 

500 # https://googleapis.dev/python/google-api-core/latest/client_options.html 

501 from google.cloud import bigquery_storage_v1 

502 

503 def sample_create_write_stream(): 

504 # Create a client 

505 client = bigquery_storage_v1.BigQueryWriteClient() 

506 

507 # Initialize request argument(s) 

508 request = bigquery_storage_v1.CreateWriteStreamRequest( 

509 parent="parent_value", 

510 ) 

511 

512 # Make the request 

513 response = client.create_write_stream(request=request) 

514 

515 # Handle the response 

516 print(response) 

517 

518 Args: 

519 request (Union[google.cloud.bigquery_storage_v1.types.CreateWriteStreamRequest, dict]): 

520 The request object. Request message for ``CreateWriteStream``. 

521 parent (str): 

522 Required. Reference to the table to which the stream 

523 belongs, in the format of 

524 ``projects/{project}/datasets/{dataset}/tables/{table}``. 

525 

526 This corresponds to the ``parent`` field 

527 on the ``request`` instance; if ``request`` is provided, this 

528 should not be set. 

529 write_stream (google.cloud.bigquery_storage_v1.types.WriteStream): 

530 Required. Stream to be created. 

531 This corresponds to the ``write_stream`` field 

532 on the ``request`` instance; if ``request`` is provided, this 

533 should not be set. 

534 retry (google.api_core.retry.Retry): Designation of what errors, if any, 

535 should be retried. 

536 timeout (float): The timeout for this request. 

537 metadata (Sequence[Tuple[str, str]]): Strings which should be 

538 sent along with the request as metadata. 

539 

540 Returns: 

541 google.cloud.bigquery_storage_v1.types.WriteStream: 

542 Information about a single stream 

543 that gets data inside the storage 

544 system. 

545 

546 """ 

547 # Create or coerce a protobuf request object. 

548 # Quick check: If we got a request object, we should *not* have 

549 # gotten any keyword arguments that map to the request. 

550 has_flattened_params = any([parent, write_stream]) 

551 if request is not None and has_flattened_params: 

552 raise ValueError( 

553 "If the `request` argument is set, then none of " 

554 "the individual field arguments should be set." 

555 ) 

556 

557 # Minor optimization to avoid making a copy if the user passes 

558 # in a storage.CreateWriteStreamRequest. 

559 # There's no risk of modifying the input as we've already verified 

560 # there are no flattened fields. 

561 if not isinstance(request, storage.CreateWriteStreamRequest): 

562 request = storage.CreateWriteStreamRequest(request) 

563 # If we have keyword arguments corresponding to fields on the 

564 # request, apply these. 

565 if parent is not None: 

566 request.parent = parent 

567 if write_stream is not None: 

568 request.write_stream = write_stream 

569 

570 # Wrap the RPC method; this adds retry and timeout information, 

571 # and friendly error handling. 

572 rpc = self._transport._wrapped_methods[self._transport.create_write_stream] 

573 

574 # Certain fields should be provided within the metadata header; 

575 # add these here. 

576 metadata = tuple(metadata) + ( 

577 gapic_v1.routing_header.to_grpc_metadata((("parent", request.parent),)), 

578 ) 

579 

580 # Send the request. 

581 response = rpc( 

582 request, 

583 retry=retry, 

584 timeout=timeout, 

585 metadata=metadata, 

586 ) 

587 

588 # Done; return the response. 

589 return response 

590 
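# A hedged sketch of the flattened-argument form of create_write_stream
# (IDs are placeholders; stream.WriteStream is the type imported above):
#
#   ws = client.create_write_stream(
#       parent="projects/my-project/datasets/my_dataset/tables/my_table",
#       write_stream=stream.WriteStream(type_=stream.WriteStream.Type.PENDING),
#   )
#   # ws.name is the fully-qualified stream resource name.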

591 def append_rows( 

592 self, 

593 requests: Optional[Iterator[storage.AppendRowsRequest]] = None, 

594 *, 

595 retry: OptionalRetry = gapic_v1.method.DEFAULT, 

596 timeout: Union[float, object] = gapic_v1.method.DEFAULT, 

597 metadata: Sequence[Tuple[str, str]] = (), 

598 ) -> Iterable[storage.AppendRowsResponse]: 

599 r"""Appends data to the given stream. 

600 

601 If ``offset`` is specified, the ``offset`` is checked against 

602 the end of stream. The server returns ``OUT_OF_RANGE`` in 

603 ``AppendRowsResponse`` if an attempt is made to append to an 

604 offset beyond the current end of the stream or 

605 ``ALREADY_EXISTS`` if the user provides an ``offset`` that has 

606 already been written to. The user can retry with an adjusted 

607 offset within the same RPC connection. If ``offset`` is not specified, 

608 append happens at the end of the stream. 

609 

610 The response contains an optional offset at which the append 

611 happened. No offset information will be returned for appends to 

612 a default stream. 

613 

614 Responses are received in the same order in which requests are 

615 sent. There will be one response for each successfully inserted 

616 request. Responses may optionally embed error information if the 

617 originating AppendRequest was not successfully processed. 

618 

619 The specifics of when successfully appended data is made visible 

620 to the table are governed by the type of stream: 

621 

622 - For COMMITTED streams (which includes the default stream), 

623 data is visible immediately upon successful append. 

624 

625 - For BUFFERED streams, data is made visible via a subsequent 

626 ``FlushRows`` rpc which advances a cursor to a newer offset 

627 in the stream. 

628 

629 - For PENDING streams, data is not made visible until the 

630 stream itself is finalized (via the ``FinalizeWriteStream`` 

631 rpc), and the stream is explicitly committed via the 

632 ``BatchCommitWriteStreams`` rpc. 

633 

634 .. code-block:: python 

635 

636 # This snippet has been automatically generated and should be regarded as a 

637 # code template only. 

638 # It will require modifications to work: 

639 # - It may require correct/in-range values for request initialization. 

640 # - It may require specifying regional endpoints when creating the service 

641 # client as shown in: 

642 # https://googleapis.dev/python/google-api-core/latest/client_options.html 

643 from google.cloud import bigquery_storage_v1 

644 

645 def sample_append_rows(): 

646 # Create a client 

647 client = bigquery_storage_v1.BigQueryWriteClient() 

648 

649 # Initialize request argument(s) 

650 request = bigquery_storage_v1.AppendRowsRequest( 

651 write_stream="write_stream_value", 

652 ) 

653 

654 # This method expects an iterator which contains 

655 # 'bigquery_storage_v1.AppendRowsRequest' objects 

656 # Here we create a generator that yields a single `request` for 

657 # demonstrative purposes. 

658 requests = [request] 

659 

660 def request_generator(): 

661 for request in requests: 

662 yield request 

663 

664 # Make the request 

665 stream = client.append_rows(requests=request_generator()) 

666 

667 # Handle the response 

668 for response in stream: 

669 print(response) 

670 

671 Args: 

672 requests (Iterator[google.cloud.bigquery_storage_v1.types.AppendRowsRequest]): 

673 The request object iterator. Request message for ``AppendRows``. 

674 

675 Due to the nature of AppendRows being a bidirectional 

676 streaming RPC, certain parts of the AppendRowsRequest 

677 need only be specified for the first request sent each 

678 time the gRPC network connection is opened/reopened. 

679 

680 A single AppendRowsRequest must be less than 10 MB in 

681 size. Requests larger than this return an error, typically 

682 ``INVALID_ARGUMENT``. 

683 retry (google.api_core.retry.Retry): Designation of what errors, if any, 

684 should be retried. 

685 timeout (float): The timeout for this request. 

686 metadata (Sequence[Tuple[str, str]]): Strings which should be 

687 sent along with the request as metadata. 

688 

689 Returns: 

690 Iterable[google.cloud.bigquery_storage_v1.types.AppendRowsResponse]: 

691 Response message for AppendRows. 

692 """ 

693 

694 # Wrap the RPC method; this adds retry and timeout information, 

695 # and friendly error handling. 

696 rpc = self._transport._wrapped_methods[self._transport.append_rows] 

697 

698 # Certain fields should be provided within the metadata header; 

699 # add these here. 

700 metadata = tuple(metadata) + (gapic_v1.routing_header.to_grpc_metadata(()),) 

701 

702 # Send the request. 

703 response = rpc( 

704 requests, 

705 retry=retry, 

706 timeout=timeout, 

707 metadata=metadata, 

708 ) 

709 

710 # Done; return the response. 

711 return response 

712 
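# A hedged sketch of a first request on an AppendRows connection (the stream
# name is a placeholder). Per the docstring above, connection-level fields
# such as ``write_stream`` only need to be set on the first request sent on
# a connection:
#
#   first = storage.AppendRowsRequest(
#       write_stream="projects/p/datasets/d/tables/t/streams/_default",
#       # the serialized rows and their writer schema must also be
#       # populated before sending.
#   )
#   responses = client.append_rows(iter([first]))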

713 def get_write_stream( 

714 self, 

715 request: Optional[Union[storage.GetWriteStreamRequest, dict]] = None, 

716 *, 

717 name: Optional[str] = None, 

718 retry: OptionalRetry = gapic_v1.method.DEFAULT, 

719 timeout: Union[float, object] = gapic_v1.method.DEFAULT, 

720 metadata: Sequence[Tuple[str, str]] = (), 

721 ) -> stream.WriteStream: 

722 r"""Gets information about a write stream. 

723 

724 .. code-block:: python 

725 

726 # This snippet has been automatically generated and should be regarded as a 

727 # code template only. 

728 # It will require modifications to work: 

729 # - It may require correct/in-range values for request initialization. 

730 # - It may require specifying regional endpoints when creating the service 

731 # client as shown in: 

732 # https://googleapis.dev/python/google-api-core/latest/client_options.html 

733 from google.cloud import bigquery_storage_v1 

734 

735 def sample_get_write_stream(): 

736 # Create a client 

737 client = bigquery_storage_v1.BigQueryWriteClient() 

738 

739 # Initialize request argument(s) 

740 request = bigquery_storage_v1.GetWriteStreamRequest( 

741 name="name_value", 

742 ) 

743 

744 # Make the request 

745 response = client.get_write_stream(request=request) 

746 

747 # Handle the response 

748 print(response) 

749 

750 Args: 

751 request (Union[google.cloud.bigquery_storage_v1.types.GetWriteStreamRequest, dict]): 

752 The request object. Request message for ``GetWriteStream``. 

753 name (str): 

754 Required. Name of the stream to get, in the form of 

755 ``projects/{project}/datasets/{dataset}/tables/{table}/streams/{stream}``. 

756 

757 This corresponds to the ``name`` field 

758 on the ``request`` instance; if ``request`` is provided, this 

759 should not be set. 

760 retry (google.api_core.retry.Retry): Designation of what errors, if any, 

761 should be retried. 

762 timeout (float): The timeout for this request. 

763 metadata (Sequence[Tuple[str, str]]): Strings which should be 

764 sent along with the request as metadata. 

765 

766 Returns: 

767 google.cloud.bigquery_storage_v1.types.WriteStream: 

768 Information about a single stream 

769 that gets data inside the storage 

770 system. 

771 

772 """ 

773 # Create or coerce a protobuf request object. 

774 # Quick check: If we got a request object, we should *not* have 

775 # gotten any keyword arguments that map to the request. 

776 has_flattened_params = any([name]) 

777 if request is not None and has_flattened_params: 

778 raise ValueError( 

779 "If the `request` argument is set, then none of " 

780 "the individual field arguments should be set." 

781 ) 

782 

783 # Minor optimization to avoid making a copy if the user passes 

784 # in a storage.GetWriteStreamRequest. 

785 # There's no risk of modifying the input as we've already verified 

786 # there are no flattened fields. 

787 if not isinstance(request, storage.GetWriteStreamRequest): 

788 request = storage.GetWriteStreamRequest(request) 

789 # If we have keyword arguments corresponding to fields on the 

790 # request, apply these. 

791 if name is not None: 

792 request.name = name 

793 

794 # Wrap the RPC method; this adds retry and timeout information, 

795 # and friendly error handling. 

796 rpc = self._transport._wrapped_methods[self._transport.get_write_stream] 

797 

798 # Certain fields should be provided within the metadata header; 

799 # add these here. 

800 metadata = tuple(metadata) + ( 

801 gapic_v1.routing_header.to_grpc_metadata((("name", request.name),)), 

802 ) 

803 

804 # Send the request. 

805 response = rpc( 

806 request, 

807 retry=retry, 

808 timeout=timeout, 

809 metadata=metadata, 

810 ) 

811 

812 # Done; return the response. 

813 return response 

814 

815 def finalize_write_stream( 

816 self, 

817 request: Optional[Union[storage.FinalizeWriteStreamRequest, dict]] = None, 

818 *, 

819 name: Optional[str] = None, 

820 retry: OptionalRetry = gapic_v1.method.DEFAULT, 

821 timeout: Union[float, object] = gapic_v1.method.DEFAULT, 

822 metadata: Sequence[Tuple[str, str]] = (), 

823 ) -> storage.FinalizeWriteStreamResponse: 

824 r"""Finalize a write stream so that no new data can be appended to 

825 the stream. Finalize is not supported on the '_default' stream. 

826 

827 .. code-block:: python 

828 

829 # This snippet has been automatically generated and should be regarded as a 

830 # code template only. 

831 # It will require modifications to work: 

832 # - It may require correct/in-range values for request initialization. 

833 # - It may require specifying regional endpoints when creating the service 

834 # client as shown in: 

835 # https://googleapis.dev/python/google-api-core/latest/client_options.html 

836 from google.cloud import bigquery_storage_v1 

837 

838 def sample_finalize_write_stream(): 

839 # Create a client 

840 client = bigquery_storage_v1.BigQueryWriteClient() 

841 

842 # Initialize request argument(s) 

843 request = bigquery_storage_v1.FinalizeWriteStreamRequest( 

844 name="name_value", 

845 ) 

846 

847 # Make the request 

848 response = client.finalize_write_stream(request=request) 

849 

850 # Handle the response 

851 print(response) 

852 

853 Args: 

854 request (Union[google.cloud.bigquery_storage_v1.types.FinalizeWriteStreamRequest, dict]): 

855 The request object. Request message for invoking ``FinalizeWriteStream``. 

856 name (str): 

857 Required. Name of the stream to finalize, in the form of 

858 ``projects/{project}/datasets/{dataset}/tables/{table}/streams/{stream}``. 

859 

860 This corresponds to the ``name`` field 

861 on the ``request`` instance; if ``request`` is provided, this 

862 should not be set. 

863 retry (google.api_core.retry.Retry): Designation of what errors, if any, 

864 should be retried. 

865 timeout (float): The timeout for this request. 

866 metadata (Sequence[Tuple[str, str]]): Strings which should be 

867 sent along with the request as metadata. 

868 

869 Returns: 

870 google.cloud.bigquery_storage_v1.types.FinalizeWriteStreamResponse: 

871 Response message for FinalizeWriteStream. 

872 """ 

873 # Create or coerce a protobuf request object. 

874 # Quick check: If we got a request object, we should *not* have 

875 # gotten any keyword arguments that map to the request. 

876 has_flattened_params = any([name]) 

877 if request is not None and has_flattened_params: 

878 raise ValueError( 

879 "If the `request` argument is set, then none of " 

880 "the individual field arguments should be set." 

881 ) 

882 

883 # Minor optimization to avoid making a copy if the user passes 

884 # in a storage.FinalizeWriteStreamRequest. 

885 # There's no risk of modifying the input as we've already verified 

886 # there are no flattened fields. 

887 if not isinstance(request, storage.FinalizeWriteStreamRequest): 

888 request = storage.FinalizeWriteStreamRequest(request) 

889 # If we have keyword arguments corresponding to fields on the 

890 # request, apply these. 

891 if name is not None: 

892 request.name = name 

893 

894 # Wrap the RPC method; this adds retry and timeout information, 

895 # and friendly error handling. 

896 rpc = self._transport._wrapped_methods[self._transport.finalize_write_stream] 

897 

898 # Certain fields should be provided within the metadata header; 

899 # add these here. 

900 metadata = tuple(metadata) + ( 

901 gapic_v1.routing_header.to_grpc_metadata((("name", request.name),)), 

902 ) 

903 

904 # Send the request. 

905 response = rpc( 

906 request, 

907 retry=retry, 

908 timeout=timeout, 

909 metadata=metadata, 

910 ) 

911 

912 # Done; return the response. 

913 return response 

914 

915 def batch_commit_write_streams( 

916 self, 

917 request: Optional[Union[storage.BatchCommitWriteStreamsRequest, dict]] = None, 

918 *, 

919 parent: Optional[str] = None, 

920 retry: OptionalRetry = gapic_v1.method.DEFAULT, 

921 timeout: Union[float, object] = gapic_v1.method.DEFAULT, 

922 metadata: Sequence[Tuple[str, str]] = (), 

923 ) -> storage.BatchCommitWriteStreamsResponse: 

924 r"""Atomically commits a group of ``PENDING`` streams that belong to 

925 the same ``parent`` table. 

926 

927 Streams must be finalized before commit and cannot be committed 

928 multiple times. Once a stream is committed, data in the stream 

929 becomes available for read operations. 

930 

931 .. code-block:: python 

932 

933 # This snippet has been automatically generated and should be regarded as a 

934 # code template only. 

935 # It will require modifications to work: 

936 # - It may require correct/in-range values for request initialization. 

937 # - It may require specifying regional endpoints when creating the service 

938 # client as shown in: 

939 # https://googleapis.dev/python/google-api-core/latest/client_options.html 

940 from google.cloud import bigquery_storage_v1 

941 

942 def sample_batch_commit_write_streams(): 

943 # Create a client 

944 client = bigquery_storage_v1.BigQueryWriteClient() 

945 

946 # Initialize request argument(s) 

947 request = bigquery_storage_v1.BatchCommitWriteStreamsRequest( 

948 parent="parent_value", 

949 write_streams=['write_streams_value1', 'write_streams_value2'], 

950 ) 

951 

952 # Make the request 

953 response = client.batch_commit_write_streams(request=request) 

954 

955 # Handle the response 

956 print(response) 

957 

958 Args: 

959 request (Union[google.cloud.bigquery_storage_v1.types.BatchCommitWriteStreamsRequest, dict]): 

960 The request object. Request message for ``BatchCommitWriteStreams``. 

961 parent (str): 

962 Required. Parent table that all the streams should 

963 belong to, in the form of 

964 ``projects/{project}/datasets/{dataset}/tables/{table}``. 

965 

966 This corresponds to the ``parent`` field 

967 on the ``request`` instance; if ``request`` is provided, this 

968 should not be set. 

969 retry (google.api_core.retry.Retry): Designation of what errors, if any, 

970 should be retried. 

971 timeout (float): The timeout for this request. 

972 metadata (Sequence[Tuple[str, str]]): Strings which should be 

973 sent along with the request as metadata. 

974 

975 Returns: 

976 google.cloud.bigquery_storage_v1.types.BatchCommitWriteStreamsResponse: 

977 Response message for BatchCommitWriteStreams. 

978 """ 

979 # Create or coerce a protobuf request object. 

980 # Quick check: If we got a request object, we should *not* have 

981 # gotten any keyword arguments that map to the request. 

982 has_flattened_params = any([parent]) 

983 if request is not None and has_flattened_params: 

984 raise ValueError( 

985 "If the `request` argument is set, then none of " 

986 "the individual field arguments should be set." 

987 ) 

988 

989 # Minor optimization to avoid making a copy if the user passes 

990 # in a storage.BatchCommitWriteStreamsRequest. 

991 # There's no risk of modifying the input as we've already verified 

992 # there are no flattened fields. 

993 if not isinstance(request, storage.BatchCommitWriteStreamsRequest): 

994 request = storage.BatchCommitWriteStreamsRequest(request) 

995 # If we have keyword arguments corresponding to fields on the 

996 # request, apply these. 

997 if parent is not None: 

998 request.parent = parent 

999 

1000 # Wrap the RPC method; this adds retry and timeout information, 

1001 # and friendly error handling. 

1002 rpc = self._transport._wrapped_methods[ 

1003 self._transport.batch_commit_write_streams 

1004 ] 

1005 

1006 # Certain fields should be provided within the metadata header; 

1007 # add these here. 

1008 metadata = tuple(metadata) + ( 

1009 gapic_v1.routing_header.to_grpc_metadata((("parent", request.parent),)), 

1010 ) 

1011 

1012 # Send the request. 

1013 response = rpc( 

1014 request, 

1015 retry=retry, 

1016 timeout=timeout, 

1017 metadata=metadata, 

1018 ) 

1019 

1020 # Done; return the response. 

1021 return response 

1022 
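# A hedged end-to-end sketch for PENDING streams, tying together the methods
# above (IDs are placeholders; the append step is elided because it needs a
# populated writer schema and serialized rows):
#
#   parent = "projects/my-project/datasets/my_dataset/tables/my_table"
#   ws = client.create_write_stream(
#       parent=parent,
#       write_stream=stream.WriteStream(type_=stream.WriteStream.Type.PENDING),
#   )
#   # ... append_rows(...) against ws.name ...
#   client.finalize_write_stream(name=ws.name)
#   client.batch_commit_write_streams(
#       storage.BatchCommitWriteStreamsRequest(parent=parent, write_streams=[ws.name])
#   )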

1023 def flush_rows( 

1024 self, 

1025 request: Optional[Union[storage.FlushRowsRequest, dict]] = None, 

1026 *, 

1027 write_stream: Optional[str] = None, 

1028 retry: OptionalRetry = gapic_v1.method.DEFAULT, 

1029 timeout: Union[float, object] = gapic_v1.method.DEFAULT, 

1030 metadata: Sequence[Tuple[str, str]] = (), 

1031 ) -> storage.FlushRowsResponse: 

1032 r"""Flushes rows to a BUFFERED stream. 

1033 

1034 If users are appending rows to a BUFFERED stream, a flush 

1035 operation is required in order for the rows to become available 

1036 for reading. A flush operation advances the stream's flushed 

1037 offset from any previously flushed offset up to the offset 

1038 specified in the request. 

1039 

1040 Flush is not supported on the \_default stream, since it is not 

1041 BUFFERED. 

1042 

1043 .. code-block:: python 

1044 

1045 # This snippet has been automatically generated and should be regarded as a 

1046 # code template only. 

1047 # It will require modifications to work: 

1048 # - It may require correct/in-range values for request initialization. 

1049 # - It may require specifying regional endpoints when creating the service 

1050 # client as shown in: 

1051 # https://googleapis.dev/python/google-api-core/latest/client_options.html 

1052 from google.cloud import bigquery_storage_v1 

1053 

1054 def sample_flush_rows(): 

1055 # Create a client 

1056 client = bigquery_storage_v1.BigQueryWriteClient() 

1057 

1058 # Initialize request argument(s) 

1059 request = bigquery_storage_v1.FlushRowsRequest( 

1060 write_stream="write_stream_value", 

1061 ) 

1062 

1063 # Make the request 

1064 response = client.flush_rows(request=request) 

1065 

1066 # Handle the response 

1067 print(response) 

1068 

1069 Args: 

1070 request (Union[google.cloud.bigquery_storage_v1.types.FlushRowsRequest, dict]): 

1071 The request object. Request message for ``FlushRows``. 

1072 write_stream (str): 

1073 Required. The stream that is the 

1074 target of the flush operation. 

1075 

1076 This corresponds to the ``write_stream`` field 

1077 on the ``request`` instance; if ``request`` is provided, this 

1078 should not be set. 

1079 retry (google.api_core.retry.Retry): Designation of what errors, if any, 

1080 should be retried. 

1081 timeout (float): The timeout for this request. 

1082 metadata (Sequence[Tuple[str, str]]): Strings which should be 

1083 sent along with the request as metadata. 

1084 

1085 Returns: 

1086 google.cloud.bigquery_storage_v1.types.FlushRowsResponse: 

1087 Response message for FlushRows. 

1088 """ 

1089 # Create or coerce a protobuf request object. 

1090 # Quick check: If we got a request object, we should *not* have 

1091 # gotten any keyword arguments that map to the request. 

1092 has_flattened_params = any([write_stream]) 

1093 if request is not None and has_flattened_params: 

1094 raise ValueError( 

1095 "If the `request` argument is set, then none of " 

1096 "the individual field arguments should be set." 

1097 ) 

1098 

1099 # Minor optimization to avoid making a copy if the user passes 

1100 # in a storage.FlushRowsRequest. 

1101 # There's no risk of modifying the input as we've already verified 

1102 # there are no flattened fields. 

1103 if not isinstance(request, storage.FlushRowsRequest): 

1104 request = storage.FlushRowsRequest(request) 

1105 # If we have keyword arguments corresponding to fields on the 

1106 # request, apply these. 

1107 if write_stream is not None: 

1108 request.write_stream = write_stream 

1109 

1110 # Wrap the RPC method; this adds retry and timeout information, 

1111 # and friendly error handling. 

1112 rpc = self._transport._wrapped_methods[self._transport.flush_rows] 

1113 

1114 # Certain fields should be provided within the metadata header; 

1115 # add these here. 

1116 metadata = tuple(metadata) + ( 

1117 gapic_v1.routing_header.to_grpc_metadata( 

1118 (("write_stream", request.write_stream),) 

1119 ), 

1120 ) 

1121 

1122 # Send the request. 

1123 response = rpc( 

1124 request, 

1125 retry=retry, 

1126 timeout=timeout, 

1127 metadata=metadata, 

1128 ) 

1129 

1130 # Done; return the response. 

1131 return response 

1132 
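# A hedged sketch of flushing a BUFFERED stream (the stream name is a
# placeholder); the offset to flush up to is carried on the request's
# ``offset`` field:
#
#   client.flush_rows(
#       write_stream="projects/my-project/datasets/my_dataset/tables/my_table/streams/my_stream"
#   )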

1133 def __enter__(self) -> "BigQueryWriteClient": 

1134 return self 

1135 

1136 def __exit__(self, type, value, traceback): 

1137 """Releases underlying transport's resources. 

1138 

1139 .. warning:: 

1140 ONLY use as a context manager if the transport is NOT shared 

1141 with other clients! Exiting the with block will CLOSE the transport 

1142 and may cause errors in other clients! 

1143 """ 

1144 self.transport.close() 

1145 
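# A hedged usage sketch of the context-manager support above; note the
# warning in __exit__ about sharing the transport with other clients:
#
#   with BigQueryWriteClient() as client:
#       ws = client.get_write_stream(
#           name="projects/p/datasets/d/tables/t/streams/_default"
#       )
#   # the underlying transport is closed when the block exits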

1146 

1147DEFAULT_CLIENT_INFO = gapic_v1.client_info.ClientInfo( 

1148 gapic_version=package_version.__version__ 

1149) 

1150 

1151 

1152__all__ = ("BigQueryWriteClient",)