Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/google/cloud/bigquery_storage_v1/services/big_query_write/async_client.py: 47%

135 statements  

# -*- coding: utf-8 -*-
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from collections import OrderedDict
import functools
import re
from typing import (
    AsyncIterable,
    AsyncIterator,
    Awaitable,
    Dict,
    Mapping,
    MutableMapping,
    MutableSequence,
    Optional,
    Sequence,
    Tuple,
    Type,
    Union,
)

from google.api_core import exceptions as core_exceptions
from google.api_core import gapic_v1
from google.api_core import retry_async as retries
from google.api_core.client_options import ClientOptions
from google.auth import credentials as ga_credentials  # type: ignore
from google.oauth2 import service_account  # type: ignore

from google.cloud.bigquery_storage_v1 import gapic_version as package_version

try:
    OptionalRetry = Union[retries.AsyncRetry, gapic_v1.method._MethodDefault, None]
except AttributeError:  # pragma: NO COVER
    OptionalRetry = Union[retries.AsyncRetry, object, None]  # type: ignore

from google.protobuf import timestamp_pb2  # type: ignore
from google.rpc import status_pb2  # type: ignore

from google.cloud.bigquery_storage_v1.types import storage, stream, table

from .client import BigQueryWriteClient
from .transports.base import DEFAULT_CLIENT_INFO, BigQueryWriteTransport
from .transports.grpc_asyncio import BigQueryWriteGrpcAsyncIOTransport


class BigQueryWriteAsyncClient:
    """BigQuery Write API.

    The Write API can be used to write data to BigQuery.

    For supplementary information about the Write API, see:

    https://cloud.google.com/bigquery/docs/write-api
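
    Example: a minimal sketch of the ``PENDING``-stream workflow implemented
    by the methods below (create a stream, append rows, finalize it, then
    commit). The project, dataset, and table names are placeholders, and the
    append step is elided because the row payload must match the table's
    schema:

    .. code-block:: python

        from google.cloud import bigquery_storage_v1

        async def pending_stream_workflow():
            client = bigquery_storage_v1.BigQueryWriteAsyncClient()
            parent = client.table_path("my-project", "my_dataset", "my_table")

            # Create a PENDING stream; rows stay invisible until committed.
            write_stream = await client.create_write_stream(
                parent=parent,
                write_stream=bigquery_storage_v1.types.WriteStream(
                    type_=bigquery_storage_v1.types.WriteStream.Type.PENDING
                ),
            )

            # ... append rows with append_rows(), then:
            await client.finalize_write_stream(name=write_stream.name)
            await client.batch_commit_write_streams(
                request=bigquery_storage_v1.types.BatchCommitWriteStreamsRequest(
                    parent=parent,
                    write_streams=[write_stream.name],
                )
            )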

66 """ 

67 

68 _client: BigQueryWriteClient 

69 

70 # Copy defaults from the synchronous client for use here. 

71 # Note: DEFAULT_ENDPOINT is deprecated. Use _DEFAULT_ENDPOINT_TEMPLATE instead. 

72 DEFAULT_ENDPOINT = BigQueryWriteClient.DEFAULT_ENDPOINT 

73 DEFAULT_MTLS_ENDPOINT = BigQueryWriteClient.DEFAULT_MTLS_ENDPOINT 

74 _DEFAULT_ENDPOINT_TEMPLATE = BigQueryWriteClient._DEFAULT_ENDPOINT_TEMPLATE 

75 _DEFAULT_UNIVERSE = BigQueryWriteClient._DEFAULT_UNIVERSE 

76 

77 table_path = staticmethod(BigQueryWriteClient.table_path) 

78 parse_table_path = staticmethod(BigQueryWriteClient.parse_table_path) 

79 write_stream_path = staticmethod(BigQueryWriteClient.write_stream_path) 

80 parse_write_stream_path = staticmethod(BigQueryWriteClient.parse_write_stream_path) 

81 common_billing_account_path = staticmethod( 

82 BigQueryWriteClient.common_billing_account_path 

83 ) 

84 parse_common_billing_account_path = staticmethod( 

85 BigQueryWriteClient.parse_common_billing_account_path 

86 ) 

87 common_folder_path = staticmethod(BigQueryWriteClient.common_folder_path) 

88 parse_common_folder_path = staticmethod( 

89 BigQueryWriteClient.parse_common_folder_path 

90 ) 

91 common_organization_path = staticmethod( 

92 BigQueryWriteClient.common_organization_path 

93 ) 

94 parse_common_organization_path = staticmethod( 

95 BigQueryWriteClient.parse_common_organization_path 

96 ) 

97 common_project_path = staticmethod(BigQueryWriteClient.common_project_path) 

98 parse_common_project_path = staticmethod( 

99 BigQueryWriteClient.parse_common_project_path 

100 ) 

101 common_location_path = staticmethod(BigQueryWriteClient.common_location_path) 

102 parse_common_location_path = staticmethod( 

103 BigQueryWriteClient.parse_common_location_path 

104 ) 

105 

    @classmethod
    def from_service_account_info(cls, info: dict, *args, **kwargs):
        """Creates an instance of this client using the provided credentials
        info.

        Args:
            info (dict): The service account private key info.
            args: Additional arguments to pass to the constructor.
            kwargs: Additional arguments to pass to the constructor.

        Returns:
            BigQueryWriteAsyncClient: The constructed client.
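
        Example (a minimal sketch; ``service_account.json`` is a placeholder
        path to a key file you have downloaded, not a real credential):

        .. code-block:: python

            import json

            # Parse a previously downloaded service account key file.
            with open("service_account.json") as f:
                info = json.load(f)
            client = BigQueryWriteAsyncClient.from_service_account_info(info)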

118 """ 

119 return BigQueryWriteClient.from_service_account_info.__func__(BigQueryWriteAsyncClient, info, *args, **kwargs) # type: ignore 

120 

    @classmethod
    def from_service_account_file(cls, filename: str, *args, **kwargs):
        """Creates an instance of this client using the provided credentials
        file.

        Args:
            filename (str): The path to the service account private key json
                file.
            args: Additional arguments to pass to the constructor.
            kwargs: Additional arguments to pass to the constructor.

        Returns:
            BigQueryWriteAsyncClient: The constructed client.
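
        Example (a minimal sketch; the path is a placeholder for a key file
        you have downloaded):

        .. code-block:: python

            client = BigQueryWriteAsyncClient.from_service_account_file(
                "service_account.json"
            )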

134 """ 

135 return BigQueryWriteClient.from_service_account_file.__func__(BigQueryWriteAsyncClient, filename, *args, **kwargs) # type: ignore 

136 

137 from_service_account_json = from_service_account_file 

138 

    @classmethod
    def get_mtls_endpoint_and_cert_source(
        cls, client_options: Optional[ClientOptions] = None
    ):
        """Return the API endpoint and client cert source for mutual TLS.

        The client cert source is determined in the following order:
        (1) if the `GOOGLE_API_USE_CLIENT_CERTIFICATE` environment variable is not "true", the
        client cert source is None.
        (2) if `client_options.client_cert_source` is provided, use the provided one; if the
        default client cert source exists, use the default one; otherwise the client cert
        source is None.

        The API endpoint is determined in the following order:
        (1) if `client_options.api_endpoint` is provided, use the provided one.
        (2) if the `GOOGLE_API_USE_MTLS_ENDPOINT` environment variable is "always", use the
        default mTLS endpoint; if the environment variable is "never", use the default API
        endpoint; otherwise, if a client cert source exists, use the default mTLS endpoint,
        else use the default API endpoint.

        More details can be found at https://google.aip.dev/auth/4114.

        Args:
            client_options (google.api_core.client_options.ClientOptions): Custom options for the
                client. Only the `api_endpoint` and `client_cert_source` properties may be used
                in this method.

        Returns:
            Tuple[str, Callable[[], Tuple[bytes, bytes]]]: returns the API endpoint and the
                client cert source to use.

        Raises:
            google.auth.exceptions.MutualTLSChannelError: If any errors happen.
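
        Example (a minimal sketch; the certificate and key paths are
        placeholders, and `GOOGLE_API_USE_CLIENT_CERTIFICATE` must be "true"
        for the cert source to be considered):

        .. code-block:: python

            from google.api_core.client_options import ClientOptions

            def client_cert_source():
                # Hypothetical loader returning (cert_bytes, key_bytes).
                with open("client.crt", "rb") as c, open("client.key", "rb") as k:
                    return c.read(), k.read()

            options = ClientOptions(client_cert_source=client_cert_source)
            endpoint, cert_source = (
                BigQueryWriteAsyncClient.get_mtls_endpoint_and_cert_source(options)
            )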

172 """ 

173 return BigQueryWriteClient.get_mtls_endpoint_and_cert_source(client_options) # type: ignore 

174 

    @property
    def transport(self) -> BigQueryWriteTransport:
        """Returns the transport used by the client instance.

        Returns:
            BigQueryWriteTransport: The transport used by the client instance.
        """
        return self._client.transport

    @property
    def api_endpoint(self):
        """Return the API endpoint used by the client instance.

        Returns:
            str: The API endpoint used by the client instance.
        """
        return self._client._api_endpoint

    @property
    def universe_domain(self) -> str:
        """Return the universe domain used by the client instance.

        Returns:
            str: The universe domain used by the client instance.
        """
        return self._client._universe_domain

    get_transport_class = functools.partial(
        type(BigQueryWriteClient).get_transport_class, type(BigQueryWriteClient)
    )

    def __init__(
        self,
        *,
        credentials: Optional[ga_credentials.Credentials] = None,
        transport: Union[str, BigQueryWriteTransport] = "grpc_asyncio",
        client_options: Optional[ClientOptions] = None,
        client_info: gapic_v1.client_info.ClientInfo = DEFAULT_CLIENT_INFO,
    ) -> None:
        """Instantiates the BigQuery Write async client.

        Args:
            credentials (Optional[google.auth.credentials.Credentials]): The
                authorization credentials to attach to requests. These
                credentials identify the application to the service; if none
                are specified, the client will attempt to ascertain the
                credentials from the environment.
            transport (Union[str, ~.BigQueryWriteTransport]): The
                transport to use. If set to None, a transport is chosen
                automatically.
            client_options (Optional[Union[google.api_core.client_options.ClientOptions, dict]]):
                Custom options for the client.

                1. The ``api_endpoint`` property can be used to override the
                default endpoint provided by the client when ``transport`` is
                not explicitly provided. Only if this property is not set and
                ``transport`` was not explicitly provided, the endpoint is
                determined by the GOOGLE_API_USE_MTLS_ENDPOINT environment
                variable, which can have one of the following values:
                "always" (always use the default mTLS endpoint), "never" (always
                use the default regular endpoint) and "auto" (auto-switch to the
                default mTLS endpoint if a client certificate is present; this is
                the default value).

                2. If the GOOGLE_API_USE_CLIENT_CERTIFICATE environment variable
                is "true", then the ``client_cert_source`` property can be used
                to provide a client certificate for mTLS transport. If
                not provided, the default SSL client certificate will be used if
                present. If GOOGLE_API_USE_CLIENT_CERTIFICATE is "false" or not
                set, no client certificate will be used.

                3. The ``universe_domain`` property can be used to override the
                default "googleapis.com" universe. Note that the ``api_endpoint``
                property still takes precedence, and ``universe_domain`` is
                currently not supported for mTLS.

            client_info (google.api_core.gapic_v1.client_info.ClientInfo):
                The client info used to send a user-agent string along with
                API requests. If ``None``, then default info will be used.
                Generally, you only need to set this if you're developing
                your own client library.

        Raises:
            google.auth.exceptions.MutualTLSChannelError: If mutual TLS transport
                creation failed for any reason.
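
        Example (a minimal sketch; credentials are assumed to be available
        from the environment, and the endpoint shown is the default public
        endpoint, given here only to illustrate ``client_options``):

        .. code-block:: python

            from google.api_core.client_options import ClientOptions

            # Default construction picks up ambient credentials.
            client = BigQueryWriteAsyncClient()

            # Or override the endpoint explicitly.
            client = BigQueryWriteAsyncClient(
                client_options=ClientOptions(
                    api_endpoint="bigquerystorage.googleapis.com"
                )
            )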

261 """ 

262 self._client = BigQueryWriteClient( 

263 credentials=credentials, 

264 transport=transport, 

265 client_options=client_options, 

266 client_info=client_info, 

267 ) 

268 

    async def create_write_stream(
        self,
        request: Optional[Union[storage.CreateWriteStreamRequest, dict]] = None,
        *,
        parent: Optional[str] = None,
        write_stream: Optional[stream.WriteStream] = None,
        retry: OptionalRetry = gapic_v1.method.DEFAULT,
        timeout: Union[float, object] = gapic_v1.method.DEFAULT,
        metadata: Sequence[Tuple[str, str]] = (),
    ) -> stream.WriteStream:
        r"""Creates a write stream to the given table. Additionally, every
        table has a special stream named '_default' to which data can be
        written. This stream doesn't need to be created using
        CreateWriteStream. It is a stream that can be used
        simultaneously by any number of clients. Data written to this
        stream is considered committed as soon as an acknowledgement is
        received.

        .. code-block:: python

            # This snippet has been automatically generated and should be regarded as a
            # code template only.
            # It will require modifications to work:
            # - It may require correct/in-range values for request initialization.
            # - It may require specifying regional endpoints when creating the service
            #   client as shown in:
            #   https://googleapis.dev/python/google-api-core/latest/client_options.html
            from google.cloud import bigquery_storage_v1

            async def sample_create_write_stream():
                # Create a client
                client = bigquery_storage_v1.BigQueryWriteAsyncClient()

                # Initialize request argument(s)
                request = bigquery_storage_v1.CreateWriteStreamRequest(
                    parent="parent_value",
                )

                # Make the request
                response = await client.create_write_stream(request=request)

                # Handle the response
                print(response)

        Args:
            request (Optional[Union[google.cloud.bigquery_storage_v1.types.CreateWriteStreamRequest, dict]]):
                The request object. Request message for ``CreateWriteStream``.
            parent (:class:`str`):
                Required. Reference to the table to which the stream
                belongs, in the format of
                ``projects/{project}/datasets/{dataset}/tables/{table}``.

                This corresponds to the ``parent`` field
                on the ``request`` instance; if ``request`` is provided, this
                should not be set.
            write_stream (:class:`google.cloud.bigquery_storage_v1.types.WriteStream`):
                Required. Stream to be created.
                This corresponds to the ``write_stream`` field
                on the ``request`` instance; if ``request`` is provided, this
                should not be set.
            retry (google.api_core.retry_async.AsyncRetry): Designation of what errors, if any,
                should be retried.
            timeout (float): The timeout for this request.
            metadata (Sequence[Tuple[str, str]]): Strings which should be
                sent along with the request as metadata.

        Returns:
            google.cloud.bigquery_storage_v1.types.WriteStream:
                Information about a single stream
                that gets data inside the storage
                system.

        """
        # Create or coerce a protobuf request object.
        # Quick check: If we got a request object, we should *not* have
        # gotten any keyword arguments that map to the request.
        has_flattened_params = any([parent, write_stream])
        if request is not None and has_flattened_params:
            raise ValueError(
                "If the `request` argument is set, then none of "
                "the individual field arguments should be set."
            )

        request = storage.CreateWriteStreamRequest(request)

        # If we have keyword arguments corresponding to fields on the
        # request, apply these.
        if parent is not None:
            request.parent = parent
        if write_stream is not None:
            request.write_stream = write_stream

        # Wrap the RPC method; this adds retry and timeout information,
        # and friendly error handling.
        rpc = gapic_v1.method_async.wrap_method(
            self._client._transport.create_write_stream,
            default_retry=retries.AsyncRetry(
                initial=10.0,
                maximum=120.0,
                multiplier=1.3,
                predicate=retries.if_exception_type(
                    core_exceptions.DeadlineExceeded,
                    core_exceptions.ResourceExhausted,
                    core_exceptions.ServiceUnavailable,
                ),
                deadline=1200.0,
            ),
            default_timeout=1200.0,
            client_info=DEFAULT_CLIENT_INFO,
        )

        # Certain fields should be provided within the metadata header;
        # add these here.
        metadata = tuple(metadata) + (
            gapic_v1.routing_header.to_grpc_metadata((("parent", request.parent),)),
        )

        # Validate the universe domain.
        self._client._validate_universe_domain()

        # Send the request.
        response = await rpc(
            request,
            retry=retry,
            timeout=timeout,
            metadata=metadata,
        )

        # Done; return the response.
        return response

    def append_rows(
        self,
        requests: Optional[AsyncIterator[storage.AppendRowsRequest]] = None,
        *,
        retry: OptionalRetry = gapic_v1.method.DEFAULT,
        timeout: Union[float, object] = gapic_v1.method.DEFAULT,
        metadata: Sequence[Tuple[str, str]] = (),
    ) -> Awaitable[AsyncIterable[storage.AppendRowsResponse]]:
        r"""Appends data to the given stream.

        If ``offset`` is specified, the ``offset`` is checked against
        the end of stream. The server returns ``OUT_OF_RANGE`` in
        ``AppendRowsResponse`` if an attempt is made to append to an
        offset beyond the current end of the stream, or
        ``ALREADY_EXISTS`` if the user provides an ``offset`` that has
        already been written to. The user can retry with an adjusted
        offset within the same RPC connection. If ``offset`` is not
        specified, the append happens at the end of the stream.

        The response contains an optional offset at which the append
        happened. No offset information will be returned for appends to
        a default stream.

        Responses are received in the same order in which requests are
        sent. There will be one response for each successfully inserted
        request. Responses may optionally embed error information if the
        originating AppendRequest was not successfully processed.

        The specifics of when successfully appended data is made visible
        to the table are governed by the type of stream:

        - For COMMITTED streams (which includes the default stream),
          data is visible immediately upon successful append.

        - For BUFFERED streams, data is made visible via a subsequent
          ``FlushRows`` rpc which advances a cursor to a newer offset
          in the stream.

        - For PENDING streams, data is not made visible until the
          stream itself is finalized (via the ``FinalizeWriteStream``
          rpc), and the stream is explicitly committed via the
          ``BatchCommitWriteStreams`` rpc.

        .. code-block:: python

            # This snippet has been automatically generated and should be regarded as a
            # code template only.
            # It will require modifications to work:
            # - It may require correct/in-range values for request initialization.
            # - It may require specifying regional endpoints when creating the service
            #   client as shown in:
            #   https://googleapis.dev/python/google-api-core/latest/client_options.html
            from google.cloud import bigquery_storage_v1

            async def sample_append_rows():
                # Create a client
                client = bigquery_storage_v1.BigQueryWriteAsyncClient()

                # Initialize request argument(s)
                request = bigquery_storage_v1.AppendRowsRequest(
                    write_stream="write_stream_value",
                )

                # This method expects an iterator which contains
                # 'bigquery_storage_v1.AppendRowsRequest' objects.
                # Here we create a generator that yields a single `request` for
                # demonstrative purposes.
                requests = [request]

                def request_generator():
                    for request in requests:
                        yield request

                # Make the request
                stream = await client.append_rows(requests=request_generator())

                # Handle the response
                async for response in stream:
                    print(response)

        Args:
            requests (AsyncIterator[`google.cloud.bigquery_storage_v1.types.AppendRowsRequest`]):
                The request object AsyncIterator. Request message for ``AppendRows``.

                Because AppendRows is a bidirectional streaming RPC,
                certain parts of the AppendRowsRequest need only be
                specified for the first request before switching table
                destinations. You can also switch table destinations
                within the same connection for the default stream.

                A single AppendRowsRequest must be less than 10 MB in
                size. Requests larger than this return an error,
                typically ``INVALID_ARGUMENT``.
            retry (google.api_core.retry_async.AsyncRetry): Designation of what errors, if any,
                should be retried.
            timeout (float): The timeout for this request.
            metadata (Sequence[Tuple[str, str]]): Strings which should be
                sent along with the request as metadata.

        Returns:
            AsyncIterable[google.cloud.bigquery_storage_v1.types.AppendRowsResponse]:
                Response message for AppendRows.
        """

        # Wrap the RPC method; this adds retry and timeout information,
        # and friendly error handling.
        rpc = gapic_v1.method_async.wrap_method(
            self._client._transport.append_rows,
            default_retry=retries.AsyncRetry(
                initial=0.1,
                maximum=60.0,
                multiplier=1.3,
                predicate=retries.if_exception_type(
                    core_exceptions.ServiceUnavailable,
                ),
                deadline=86400.0,
            ),
            default_timeout=86400.0,
            client_info=DEFAULT_CLIENT_INFO,
        )

        # Certain fields should be provided within the metadata header;
        # add these here.
        metadata = tuple(metadata) + (gapic_v1.routing_header.to_grpc_metadata(()),)

        # Validate the universe domain.
        self._client._validate_universe_domain()

        # Send the request.
        response = rpc(
            requests,
            retry=retry,
            timeout=timeout,
            metadata=metadata,
        )

        # Done; return the response.
        return response

    async def get_write_stream(
        self,
        request: Optional[Union[storage.GetWriteStreamRequest, dict]] = None,
        *,
        name: Optional[str] = None,
        retry: OptionalRetry = gapic_v1.method.DEFAULT,
        timeout: Union[float, object] = gapic_v1.method.DEFAULT,
        metadata: Sequence[Tuple[str, str]] = (),
    ) -> stream.WriteStream:
        r"""Gets information about a write stream.

        .. code-block:: python

            # This snippet has been automatically generated and should be regarded as a
            # code template only.
            # It will require modifications to work:
            # - It may require correct/in-range values for request initialization.
            # - It may require specifying regional endpoints when creating the service
            #   client as shown in:
            #   https://googleapis.dev/python/google-api-core/latest/client_options.html
            from google.cloud import bigquery_storage_v1

            async def sample_get_write_stream():
                # Create a client
                client = bigquery_storage_v1.BigQueryWriteAsyncClient()

                # Initialize request argument(s)
                request = bigquery_storage_v1.GetWriteStreamRequest(
                    name="name_value",
                )

                # Make the request
                response = await client.get_write_stream(request=request)

                # Handle the response
                print(response)

        Args:
            request (Optional[Union[google.cloud.bigquery_storage_v1.types.GetWriteStreamRequest, dict]]):
                The request object. Request message for ``GetWriteStream``.
            name (:class:`str`):
                Required. Name of the stream to get, in the form of
                ``projects/{project}/datasets/{dataset}/tables/{table}/streams/{stream}``.

                This corresponds to the ``name`` field
                on the ``request`` instance; if ``request`` is provided, this
                should not be set.
            retry (google.api_core.retry_async.AsyncRetry): Designation of what errors, if any,
                should be retried.
            timeout (float): The timeout for this request.
            metadata (Sequence[Tuple[str, str]]): Strings which should be
                sent along with the request as metadata.

        Returns:
            google.cloud.bigquery_storage_v1.types.WriteStream:
                Information about a single stream
                that gets data inside the storage
                system.

        """
        # Create or coerce a protobuf request object.
        # Quick check: If we got a request object, we should *not* have
        # gotten any keyword arguments that map to the request.
        has_flattened_params = any([name])
        if request is not None and has_flattened_params:
            raise ValueError(
                "If the `request` argument is set, then none of "
                "the individual field arguments should be set."
            )

        request = storage.GetWriteStreamRequest(request)

        # If we have keyword arguments corresponding to fields on the
        # request, apply these.
        if name is not None:
            request.name = name

        # Wrap the RPC method; this adds retry and timeout information,
        # and friendly error handling.
        rpc = gapic_v1.method_async.wrap_method(
            self._client._transport.get_write_stream,
            default_retry=retries.AsyncRetry(
                initial=0.1,
                maximum=60.0,
                multiplier=1.3,
                predicate=retries.if_exception_type(
                    core_exceptions.DeadlineExceeded,
                    core_exceptions.ResourceExhausted,
                    core_exceptions.ServiceUnavailable,
                ),
                deadline=600.0,
            ),
            default_timeout=600.0,
            client_info=DEFAULT_CLIENT_INFO,
        )

        # Certain fields should be provided within the metadata header;
        # add these here.
        metadata = tuple(metadata) + (
            gapic_v1.routing_header.to_grpc_metadata((("name", request.name),)),
        )

        # Validate the universe domain.
        self._client._validate_universe_domain()

        # Send the request.
        response = await rpc(
            request,
            retry=retry,
            timeout=timeout,
            metadata=metadata,
        )

        # Done; return the response.
        return response

    async def finalize_write_stream(
        self,
        request: Optional[Union[storage.FinalizeWriteStreamRequest, dict]] = None,
        *,
        name: Optional[str] = None,
        retry: OptionalRetry = gapic_v1.method.DEFAULT,
        timeout: Union[float, object] = gapic_v1.method.DEFAULT,
        metadata: Sequence[Tuple[str, str]] = (),
    ) -> storage.FinalizeWriteStreamResponse:
        r"""Finalize a write stream so that no new data can be appended to
        the stream. Finalize is not supported on the '_default' stream.

        .. code-block:: python

            # This snippet has been automatically generated and should be regarded as a
            # code template only.
            # It will require modifications to work:
            # - It may require correct/in-range values for request initialization.
            # - It may require specifying regional endpoints when creating the service
            #   client as shown in:
            #   https://googleapis.dev/python/google-api-core/latest/client_options.html
            from google.cloud import bigquery_storage_v1

            async def sample_finalize_write_stream():
                # Create a client
                client = bigquery_storage_v1.BigQueryWriteAsyncClient()

                # Initialize request argument(s)
                request = bigquery_storage_v1.FinalizeWriteStreamRequest(
                    name="name_value",
                )

                # Make the request
                response = await client.finalize_write_stream(request=request)

                # Handle the response
                print(response)

        Args:
            request (Optional[Union[google.cloud.bigquery_storage_v1.types.FinalizeWriteStreamRequest, dict]]):
                The request object. Request message for invoking ``FinalizeWriteStream``.
            name (:class:`str`):
                Required. Name of the stream to finalize, in the form of
                ``projects/{project}/datasets/{dataset}/tables/{table}/streams/{stream}``.

                This corresponds to the ``name`` field
                on the ``request`` instance; if ``request`` is provided, this
                should not be set.
            retry (google.api_core.retry_async.AsyncRetry): Designation of what errors, if any,
                should be retried.
            timeout (float): The timeout for this request.
            metadata (Sequence[Tuple[str, str]]): Strings which should be
                sent along with the request as metadata.

        Returns:
            google.cloud.bigquery_storage_v1.types.FinalizeWriteStreamResponse:
                Response message for FinalizeWriteStream.
        """
        # Create or coerce a protobuf request object.
        # Quick check: If we got a request object, we should *not* have
        # gotten any keyword arguments that map to the request.
        has_flattened_params = any([name])
        if request is not None and has_flattened_params:
            raise ValueError(
                "If the `request` argument is set, then none of "
                "the individual field arguments should be set."
            )

        request = storage.FinalizeWriteStreamRequest(request)

        # If we have keyword arguments corresponding to fields on the
        # request, apply these.
        if name is not None:
            request.name = name

        # Wrap the RPC method; this adds retry and timeout information,
        # and friendly error handling.
        rpc = gapic_v1.method_async.wrap_method(
            self._client._transport.finalize_write_stream,
            default_retry=retries.AsyncRetry(
                initial=0.1,
                maximum=60.0,
                multiplier=1.3,
                predicate=retries.if_exception_type(
                    core_exceptions.DeadlineExceeded,
                    core_exceptions.ResourceExhausted,
                    core_exceptions.ServiceUnavailable,
                ),
                deadline=600.0,
            ),
            default_timeout=600.0,
            client_info=DEFAULT_CLIENT_INFO,
        )

        # Certain fields should be provided within the metadata header;
        # add these here.
        metadata = tuple(metadata) + (
            gapic_v1.routing_header.to_grpc_metadata((("name", request.name),)),
        )

        # Validate the universe domain.
        self._client._validate_universe_domain()

        # Send the request.
        response = await rpc(
            request,
            retry=retry,
            timeout=timeout,
            metadata=metadata,
        )

        # Done; return the response.
        return response

    async def batch_commit_write_streams(
        self,
        request: Optional[Union[storage.BatchCommitWriteStreamsRequest, dict]] = None,
        *,
        parent: Optional[str] = None,
        retry: OptionalRetry = gapic_v1.method.DEFAULT,
        timeout: Union[float, object] = gapic_v1.method.DEFAULT,
        metadata: Sequence[Tuple[str, str]] = (),
    ) -> storage.BatchCommitWriteStreamsResponse:
        r"""Atomically commits a group of ``PENDING`` streams that belong to
        the same ``parent`` table.

        Streams must be finalized before commit and cannot be committed
        multiple times. Once a stream is committed, data in the stream
        becomes available for read operations.

        .. code-block:: python

            # This snippet has been automatically generated and should be regarded as a
            # code template only.
            # It will require modifications to work:
            # - It may require correct/in-range values for request initialization.
            # - It may require specifying regional endpoints when creating the service
            #   client as shown in:
            #   https://googleapis.dev/python/google-api-core/latest/client_options.html
            from google.cloud import bigquery_storage_v1

            async def sample_batch_commit_write_streams():
                # Create a client
                client = bigquery_storage_v1.BigQueryWriteAsyncClient()

                # Initialize request argument(s)
                request = bigquery_storage_v1.BatchCommitWriteStreamsRequest(
                    parent="parent_value",
                    write_streams=['write_streams_value1', 'write_streams_value2'],
                )

                # Make the request
                response = await client.batch_commit_write_streams(request=request)

                # Handle the response
                print(response)

        Args:
            request (Optional[Union[google.cloud.bigquery_storage_v1.types.BatchCommitWriteStreamsRequest, dict]]):
                The request object. Request message for ``BatchCommitWriteStreams``.
            parent (:class:`str`):
                Required. Parent table that all the streams should
                belong to, in the form of
                ``projects/{project}/datasets/{dataset}/tables/{table}``.

                This corresponds to the ``parent`` field
                on the ``request`` instance; if ``request`` is provided, this
                should not be set.
            retry (google.api_core.retry_async.AsyncRetry): Designation of what errors, if any,
                should be retried.
            timeout (float): The timeout for this request.
            metadata (Sequence[Tuple[str, str]]): Strings which should be
                sent along with the request as metadata.

        Returns:
            google.cloud.bigquery_storage_v1.types.BatchCommitWriteStreamsResponse:
                Response message for BatchCommitWriteStreams.
        """
        # Create or coerce a protobuf request object.
        # Quick check: If we got a request object, we should *not* have
        # gotten any keyword arguments that map to the request.
        has_flattened_params = any([parent])
        if request is not None and has_flattened_params:
            raise ValueError(
                "If the `request` argument is set, then none of "
                "the individual field arguments should be set."
            )

        request = storage.BatchCommitWriteStreamsRequest(request)

        # If we have keyword arguments corresponding to fields on the
        # request, apply these.
        if parent is not None:
            request.parent = parent

        # Wrap the RPC method; this adds retry and timeout information,
        # and friendly error handling.
        rpc = gapic_v1.method_async.wrap_method(
            self._client._transport.batch_commit_write_streams,
            default_retry=retries.AsyncRetry(
                initial=0.1,
                maximum=60.0,
                multiplier=1.3,
                predicate=retries.if_exception_type(
                    core_exceptions.DeadlineExceeded,
                    core_exceptions.ResourceExhausted,
                    core_exceptions.ServiceUnavailable,
                ),
                deadline=600.0,
            ),
            default_timeout=600.0,
            client_info=DEFAULT_CLIENT_INFO,
        )

        # Certain fields should be provided within the metadata header;
        # add these here.
        metadata = tuple(metadata) + (
            gapic_v1.routing_header.to_grpc_metadata((("parent", request.parent),)),
        )

        # Validate the universe domain.
        self._client._validate_universe_domain()

        # Send the request.
        response = await rpc(
            request,
            retry=retry,
            timeout=timeout,
            metadata=metadata,
        )

        # Done; return the response.
        return response

    async def flush_rows(
        self,
        request: Optional[Union[storage.FlushRowsRequest, dict]] = None,
        *,
        write_stream: Optional[str] = None,
        retry: OptionalRetry = gapic_v1.method.DEFAULT,
        timeout: Union[float, object] = gapic_v1.method.DEFAULT,
        metadata: Sequence[Tuple[str, str]] = (),
    ) -> storage.FlushRowsResponse:
        r"""Flushes rows to a BUFFERED stream.

        If users are appending rows to a BUFFERED stream, a flush
        operation is required in order for the rows to become available
        for reading. A flush operation advances the stream from any
        previously flushed offset up to the offset specified in the
        request.

        Flush is not supported on the \_default stream, since it is not
        BUFFERED.

        .. code-block:: python

            # This snippet has been automatically generated and should be regarded as a
            # code template only.
            # It will require modifications to work:
            # - It may require correct/in-range values for request initialization.
            # - It may require specifying regional endpoints when creating the service
            #   client as shown in:
            #   https://googleapis.dev/python/google-api-core/latest/client_options.html
            from google.cloud import bigquery_storage_v1

            async def sample_flush_rows():
                # Create a client
                client = bigquery_storage_v1.BigQueryWriteAsyncClient()

                # Initialize request argument(s)
                request = bigquery_storage_v1.FlushRowsRequest(
                    write_stream="write_stream_value",
                )

                # Make the request
                response = await client.flush_rows(request=request)

                # Handle the response
                print(response)

        Args:
            request (Optional[Union[google.cloud.bigquery_storage_v1.types.FlushRowsRequest, dict]]):
                The request object. Request message for ``FlushRows``.
            write_stream (:class:`str`):
                Required. The stream that is the
                target of the flush operation.

                This corresponds to the ``write_stream`` field
                on the ``request`` instance; if ``request`` is provided, this
                should not be set.
            retry (google.api_core.retry_async.AsyncRetry): Designation of what errors, if any,
                should be retried.
            timeout (float): The timeout for this request.
            metadata (Sequence[Tuple[str, str]]): Strings which should be
                sent along with the request as metadata.

        Returns:
            google.cloud.bigquery_storage_v1.types.FlushRowsResponse:
                Response message for FlushRows.
        """
        # Create or coerce a protobuf request object.
        # Quick check: If we got a request object, we should *not* have
        # gotten any keyword arguments that map to the request.
        has_flattened_params = any([write_stream])
        if request is not None and has_flattened_params:
            raise ValueError(
                "If the `request` argument is set, then none of "
                "the individual field arguments should be set."
            )

        request = storage.FlushRowsRequest(request)

        # If we have keyword arguments corresponding to fields on the
        # request, apply these.
        if write_stream is not None:
            request.write_stream = write_stream

        # Wrap the RPC method; this adds retry and timeout information,
        # and friendly error handling.
        rpc = gapic_v1.method_async.wrap_method(
            self._client._transport.flush_rows,
            default_retry=retries.AsyncRetry(
                initial=0.1,
                maximum=60.0,
                multiplier=1.3,
                predicate=retries.if_exception_type(
                    core_exceptions.DeadlineExceeded,
                    core_exceptions.ResourceExhausted,
                    core_exceptions.ServiceUnavailable,
                ),
                deadline=600.0,
            ),
            default_timeout=600.0,
            client_info=DEFAULT_CLIENT_INFO,
        )

        # Certain fields should be provided within the metadata header;
        # add these here.
        metadata = tuple(metadata) + (
            gapic_v1.routing_header.to_grpc_metadata(
                (("write_stream", request.write_stream),)
            ),
        )

        # Validate the universe domain.
        self._client._validate_universe_domain()

        # Send the request.
        response = await rpc(
            request,
            retry=retry,
            timeout=timeout,
            metadata=metadata,
        )

        # Done; return the response.
        return response

    async def __aenter__(self) -> "BigQueryWriteAsyncClient":
        return self

    async def __aexit__(self, exc_type, exc, tb):
        await self.transport.close()


DEFAULT_CLIENT_INFO = gapic_v1.client_info.ClientInfo(
    gapic_version=package_version.__version__
)


__all__ = ("BigQueryWriteAsyncClient",)