Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/google/cloud/bigquery_storage_v1/services/big_query_write/async_client.py: 45%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

150 statements  

1# -*- coding: utf-8 -*- 

2# Copyright 2025 Google LLC 

3# 

4# Licensed under the Apache License, Version 2.0 (the "License"); 

5# you may not use this file except in compliance with the License. 

6# You may obtain a copy of the License at 

7# 

8# http://www.apache.org/licenses/LICENSE-2.0 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, 

12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

13# See the License for the specific language governing permissions and 

14# limitations under the License. 

15# 

16from collections import OrderedDict 

17import logging as std_logging 

18import re 

19from typing import ( 

20 AsyncIterable, 

21 AsyncIterator, 

22 Awaitable, 

23 Callable, 

24 Dict, 

25 Mapping, 

26 MutableMapping, 

27 MutableSequence, 

28 Optional, 

29 Sequence, 

30 Tuple, 

31 Type, 

32 Union, 

33) 

34 

35from google.api_core import exceptions as core_exceptions 

36from google.api_core import gapic_v1 

37from google.api_core import retry_async as retries 

38from google.api_core.client_options import ClientOptions 

39from google.auth import credentials as ga_credentials # type: ignore 

40from google.oauth2 import service_account # type: ignore 

41import google.protobuf 

42 

43from google.cloud.bigquery_storage_v1 import gapic_version as package_version 

44 

# Type alias used for the ``retry`` parameter of every RPC method below.
try:
    OptionalRetry = Union[retries.AsyncRetry, gapic_v1.method._MethodDefault, None]
except AttributeError:  # pragma: NO COVER
    # An attribute referenced above is missing; fall back to a wider alias.
    # NOTE(review): presumably `_MethodDefault` is absent on older
    # google-api-core releases -- confirm.
    OptionalRetry = Union[retries.AsyncRetry, object, None]  # type: ignore

49 

50from google.protobuf import timestamp_pb2 # type: ignore 

51from google.rpc import status_pb2 # type: ignore 

52 

53from google.cloud.bigquery_storage_v1.types import storage, stream, table 

54 

55from .client import BigQueryWriteClient 

56from .transports.base import DEFAULT_CLIENT_INFO, BigQueryWriteTransport 

57from .transports.grpc_asyncio import BigQueryWriteGrpcAsyncIOTransport 

58 

# Probe for the optional ``client_logging`` module; the result only sets the
# flag that gates the debug log emitted by the client constructor.
try:
    from google.api_core import client_logging  # type: ignore

    CLIENT_LOGGING_SUPPORTED = True  # pragma: NO COVER
except ImportError:  # pragma: NO COVER
    # The import failed, so the debug logging path stays disabled.
    CLIENT_LOGGING_SUPPORTED = False

# Module-level logger named after this module.
_LOGGER = std_logging.getLogger(__name__)

67 

68 

69class BigQueryWriteAsyncClient: 

70 """BigQuery Write API. 

71 

72 The Write API can be used to write data to BigQuery. 

73 

74 For supplementary information about the Write API, see: 

75 

76 https://cloud.google.com/bigquery/docs/write-api 

77 """ 

78 

79 _client: BigQueryWriteClient 

80 

81 # Copy defaults from the synchronous client for use here. 

82 # Note: DEFAULT_ENDPOINT is deprecated. Use _DEFAULT_ENDPOINT_TEMPLATE instead. 

83 DEFAULT_ENDPOINT = BigQueryWriteClient.DEFAULT_ENDPOINT 

84 DEFAULT_MTLS_ENDPOINT = BigQueryWriteClient.DEFAULT_MTLS_ENDPOINT 

85 _DEFAULT_ENDPOINT_TEMPLATE = BigQueryWriteClient._DEFAULT_ENDPOINT_TEMPLATE 

86 _DEFAULT_UNIVERSE = BigQueryWriteClient._DEFAULT_UNIVERSE 

87 

88 table_path = staticmethod(BigQueryWriteClient.table_path) 

89 parse_table_path = staticmethod(BigQueryWriteClient.parse_table_path) 

90 write_stream_path = staticmethod(BigQueryWriteClient.write_stream_path) 

91 parse_write_stream_path = staticmethod(BigQueryWriteClient.parse_write_stream_path) 

92 common_billing_account_path = staticmethod( 

93 BigQueryWriteClient.common_billing_account_path 

94 ) 

95 parse_common_billing_account_path = staticmethod( 

96 BigQueryWriteClient.parse_common_billing_account_path 

97 ) 

98 common_folder_path = staticmethod(BigQueryWriteClient.common_folder_path) 

99 parse_common_folder_path = staticmethod( 

100 BigQueryWriteClient.parse_common_folder_path 

101 ) 

102 common_organization_path = staticmethod( 

103 BigQueryWriteClient.common_organization_path 

104 ) 

105 parse_common_organization_path = staticmethod( 

106 BigQueryWriteClient.parse_common_organization_path 

107 ) 

108 common_project_path = staticmethod(BigQueryWriteClient.common_project_path) 

109 parse_common_project_path = staticmethod( 

110 BigQueryWriteClient.parse_common_project_path 

111 ) 

112 common_location_path = staticmethod(BigQueryWriteClient.common_location_path) 

113 parse_common_location_path = staticmethod( 

114 BigQueryWriteClient.parse_common_location_path 

115 ) 

116 

117 @classmethod 

def from_service_account_info(cls, info: dict, *args, **kwargs):
    """Build an async client from in-memory service account key data.

    Construction is delegated to the function underlying
    ``BigQueryWriteClient.from_service_account_info``, which is called
    with ``BigQueryWriteAsyncClient`` as the class to instantiate.
    Note that ``cls`` itself is not consulted.

    Args:
        info (dict): The service account private key info.
        args: Additional arguments to pass to the constructor.
        kwargs: Additional arguments to pass to the constructor.

    Returns:
        BigQueryWriteAsyncClient: The constructed client.
    """
    # ``__func__`` unwraps the sync client's classmethod so it can be
    # invoked with the async client class in place of its own ``cls``.
    sync_factory = BigQueryWriteClient.from_service_account_info.__func__  # type: ignore
    return sync_factory(BigQueryWriteAsyncClient, info, *args, **kwargs)

131 

132 @classmethod 

def from_service_account_file(cls, filename: str, *args, **kwargs):
    """Build an async client from a service account key file.

    Construction is delegated to the function underlying
    ``BigQueryWriteClient.from_service_account_file``, which is called
    with ``BigQueryWriteAsyncClient`` as the class to instantiate.
    Note that ``cls`` itself is not consulted.

    Args:
        filename (str): The path to the service account private key json
            file.
        args: Additional arguments to pass to the constructor.
        kwargs: Additional arguments to pass to the constructor.

    Returns:
        BigQueryWriteAsyncClient: The constructed client.
    """
    # ``__func__`` unwraps the sync client's classmethod so it can be
    # invoked with the async client class in place of its own ``cls``.
    sync_factory = BigQueryWriteClient.from_service_account_file.__func__  # type: ignore
    return sync_factory(BigQueryWriteAsyncClient, filename, *args, **kwargs)

147 

148 from_service_account_json = from_service_account_file 

149 

150 @classmethod 

def get_mtls_endpoint_and_cert_source(
    cls, client_options: Optional[ClientOptions] = None
):
    """Return the API endpoint and client cert source for mutual TLS.

    This method only forwards ``client_options`` to
    ``BigQueryWriteClient.get_mtls_endpoint_and_cert_source``; the
    resolution described below is performed there.

    The client cert source is determined in the following order:
    (1) if the `GOOGLE_API_USE_CLIENT_CERTIFICATE` environment variable is
    not "true", the client cert source is None.
    (2) if `client_options.client_cert_source` is provided, use the
    provided one; if the default client cert source exists, use the
    default one; otherwise the client cert source is None.

    The API endpoint is determined in the following order:
    (1) if `client_options.api_endpoint` is provided, use the provided one.
    (2) if the `GOOGLE_API_USE_MTLS_ENDPOINT` environment variable is
    "always", use the default mTLS endpoint; if the environment variable
    is "never", use the default API endpoint; otherwise if a client cert
    source exists, use the default mTLS endpoint, otherwise use the
    default API endpoint.

    More details can be found at https://google.aip.dev/auth/4114.

    Args:
        client_options (google.api_core.client_options.ClientOptions): Custom options for the
            client. Only the `api_endpoint` and `client_cert_source` properties may be used
            in this method.

    Returns:
        Tuple[str, Callable[[], Tuple[bytes, bytes]]]: returns the API endpoint and the
            client cert source to use.

    Raises:
        google.auth.exceptions.MutualTLSChannelError: If any errors happen.
    """
    return BigQueryWriteClient.get_mtls_endpoint_and_cert_source(client_options)  # type: ignore

185 

186 @property 

def transport(self) -> BigQueryWriteTransport:
    """Expose the transport of the wrapped synchronous client.

    Returns:
        BigQueryWriteTransport: The transport used by the client instance.
    """
    wrapped_client = self._client
    return wrapped_client.transport

194 

195 @property 

def api_endpoint(self):
    """Expose the API endpoint stored on the wrapped client.

    Returns:
        str: The API endpoint used by the client instance.
    """
    wrapped_client = self._client
    return wrapped_client._api_endpoint

203 

204 @property 

def universe_domain(self) -> str:
    """Expose the universe domain stored on the wrapped client.

    Returns:
        str: The universe domain used
            by the client instance.
    """
    wrapped_client = self._client
    return wrapped_client._universe_domain

213 

214 get_transport_class = BigQueryWriteClient.get_transport_class 

215 

def __init__(
    self,
    *,
    credentials: Optional[ga_credentials.Credentials] = None,
    transport: Optional[
        Union[str, BigQueryWriteTransport, Callable[..., BigQueryWriteTransport]]
    ] = "grpc_asyncio",
    client_options: Optional[ClientOptions] = None,
    client_info: gapic_v1.client_info.ClientInfo = DEFAULT_CLIENT_INFO,
) -> None:
    """Instantiates the big query write async client.

    All arguments are forwarded unchanged to ``BigQueryWriteClient``,
    whose instance is stored on ``self._client``.

    Args:
        credentials (Optional[google.auth.credentials.Credentials]): The
            authorization credentials to attach to requests. These
            credentials identify the application to the service; if none
            are specified, the client will attempt to ascertain the
            credentials from the environment.
        transport (Optional[Union[str,BigQueryWriteTransport,Callable[..., BigQueryWriteTransport]]]):
            The transport to use, or a Callable that constructs and returns a new transport to use.
            If a Callable is given, it will be called with the same set of initialization
            arguments as used in the BigQueryWriteTransport constructor.
            If set to None, a transport is chosen automatically.
        client_options (Optional[Union[google.api_core.client_options.ClientOptions, dict]]):
            Custom options for the client.

            1. The ``api_endpoint`` property can be used to override the
            default endpoint provided by the client when ``transport`` is
            not explicitly provided. Only if this property is not set and
            ``transport`` was not explicitly provided, the endpoint is
            determined by the GOOGLE_API_USE_MTLS_ENDPOINT environment
            variable, which can have one of the following values:
            "always" (always use the default mTLS endpoint), "never" (always
            use the default regular endpoint) and "auto" (auto-switch to the
            default mTLS endpoint if client certificate is present; this is
            the default value).

            2. If the GOOGLE_API_USE_CLIENT_CERTIFICATE environment variable
            is "true", then the ``client_cert_source`` property can be used
            to provide a client certificate for mTLS transport. If
            not provided, the default SSL client certificate will be used if
            present. If GOOGLE_API_USE_CLIENT_CERTIFICATE is "false" or not
            set, no client certificate will be used.

            3. The ``universe_domain`` property can be used to override the
            default "googleapis.com" universe. Note that ``api_endpoint``
            property still takes precedence; and ``universe_domain`` is
            currently not supported for mTLS.

        client_info (google.api_core.gapic_v1.client_info.ClientInfo):
            The client info used to send a user-agent string along with
            API requests. If ``None``, then default info will be used.
            Generally, you only need to set this if you're developing
            your own client library.

    Raises:
        google.auth.exceptions.MutualTlsChannelError: If mutual TLS transport
            creation failed for any reason.
    """
    self._client = BigQueryWriteClient(
        credentials=credentials,
        transport=transport,
        client_options=client_options,
        client_info=client_info,
    )

    # Emit a structured debug record about the new client, but only when
    # client logging is available and DEBUG is enabled for this logger.
    if CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(
        std_logging.DEBUG
    ):  # pragma: NO COVER
        if hasattr(self._client._transport, "_credentials"):
            # The transport carries credentials: describe them.
            log_extra = {
                "serviceName": "google.cloud.bigquery.storage.v1.BigQueryWrite",
                "universeDomain": getattr(
                    self._client._transport._credentials, "universe_domain", ""
                ),
                "credentialsType": f"{type(self._client._transport._credentials).__module__}.{type(self._client._transport._credentials).__qualname__}",
                # Credentials without ``get_cred_info`` yield ``None`` here.
                "credentialsInfo": getattr(
                    self.transport._credentials, "get_cred_info", lambda: None
                )(),
            }
        else:
            # No credentials attribute on the transport: log the minimum.
            log_extra = {
                "serviceName": "google.cloud.bigquery.storage.v1.BigQueryWrite",
                "credentialsType": None,
            }
        _LOGGER.debug(
            "Created client `google.cloud.bigquery.storage_v1.BigQueryWriteAsyncClient`.",
            extra=log_extra,
        )

303 

async def create_write_stream(
    self,
    request: Optional[Union[storage.CreateWriteStreamRequest, dict]] = None,
    *,
    parent: Optional[str] = None,
    write_stream: Optional[stream.WriteStream] = None,
    retry: OptionalRetry = gapic_v1.method.DEFAULT,
    timeout: Union[float, object] = gapic_v1.method.DEFAULT,
    metadata: Sequence[Tuple[str, Union[str, bytes]]] = (),
) -> stream.WriteStream:
    r"""Create a write stream on the given table.

    Every table additionally has a special stream named '_default' to
    which data can be written. That stream doesn't need to be created
    using CreateWriteStream. It can be used simultaneously by any number
    of clients, and data written to it is considered committed as soon
    as an acknowledgement is received.

    .. code-block:: python

        # This snippet has been automatically generated and should be regarded as a
        # code template only.
        # It will require modifications to work:
        # - It may require correct/in-range values for request initialization.
        # - It may require specifying regional endpoints when creating the service
        #   client as shown in:
        #   https://googleapis.dev/python/google-api-core/latest/client_options.html
        from google.cloud import bigquery_storage_v1

        async def sample_create_write_stream():
            # Create a client
            client = bigquery_storage_v1.BigQueryWriteAsyncClient()

            # Initialize request argument(s)
            request = bigquery_storage_v1.CreateWriteStreamRequest(
                parent="parent_value",
            )

            # Make the request
            response = await client.create_write_stream(request=request)

            # Handle the response
            print(response)

    Args:
        request (Optional[Union[google.cloud.bigquery_storage_v1.types.CreateWriteStreamRequest, dict]]):
            The request object. Request message for ``CreateWriteStream``.
        parent (:class:`str`):
            Required. Reference to the table to which the stream
            belongs, in the format of
            ``projects/{project}/datasets/{dataset}/tables/{table}``.

            This corresponds to the ``parent`` field
            on the ``request`` instance; if ``request`` is provided, this
            should not be set.
        write_stream (:class:`google.cloud.bigquery_storage_v1.types.WriteStream`):
            Required. Stream to be created.
            This corresponds to the ``write_stream`` field
            on the ``request`` instance; if ``request`` is provided, this
            should not be set.
        retry (google.api_core.retry_async.AsyncRetry): Designation of what errors, if any,
            should be retried.
        timeout (float): The timeout for this request.
        metadata (Sequence[Tuple[str, Union[str, bytes]]]): Key/value pairs which should be
            sent along with the request as metadata. Normally, each value must be of type `str`,
            but for metadata keys ending with the suffix `-bin`, the corresponding values must
            be of type `bytes`.

    Returns:
        google.cloud.bigquery_storage_v1.types.WriteStream:
            Information about a single stream
            that gets data inside the storage
            system.

    Raises:
        ValueError: If ``request`` is given together with ``parent`` or
            ``write_stream``.
    """
    # A prebuilt request and the flattened field arguments are mutually
    # exclusive.
    if request is not None and (parent is not None or write_stream is not None):
        raise ValueError(
            "If the `request` argument is set, then none of "
            "the individual field arguments should be set."
        )

    # Anything that is not already the proto request type (``None`` or a
    # dict) is coerced into one; a ready-made request is used as-is.
    if not isinstance(request, storage.CreateWriteStreamRequest):
        request = storage.CreateWriteStreamRequest(request)

    # Flattened arguments can only be present when no request was passed
    # (see the check above), so a caller-supplied request is never mutated.
    if parent is not None:
        request.parent = parent
    if write_stream is not None:
        request.write_stream = write_stream

    # Look up the pre-wrapped RPC, which carries the retry/timeout defaults
    # and error handling.
    grpc_transport = self._client._transport
    rpc = grpc_transport._wrapped_methods[grpc_transport.create_write_stream]

    # Append the routing header derived from the request's ``parent``.
    metadata = (
        *metadata,
        gapic_v1.routing_header.to_grpc_metadata((("parent", request.parent),)),
    )

    # Validate the universe domain before sending anything.
    self._client._validate_universe_domain()

    # Send the request and hand back the response.
    return await rpc(
        request,
        retry=retry,
        timeout=timeout,
        metadata=metadata,
    )

429 

def append_rows(
    self,
    requests: Optional[AsyncIterator[storage.AppendRowsRequest]] = None,
    *,
    retry: OptionalRetry = gapic_v1.method.DEFAULT,
    timeout: Union[float, object] = gapic_v1.method.DEFAULT,
    metadata: Sequence[Tuple[str, Union[str, bytes]]] = (),
) -> Awaitable[AsyncIterable[storage.AppendRowsResponse]]:
    r"""Appends data to the given stream.

    If ``offset`` is specified, the ``offset`` is checked against
    the end of stream. The server returns ``OUT_OF_RANGE`` in
    ``AppendRowsResponse`` if an attempt is made to append to an
    offset beyond the current end of the stream or
    ``ALREADY_EXISTS`` if user provides an ``offset`` that has
    already been written to. User can retry with adjusted offset
    within the same RPC connection. If ``offset`` is not specified,
    append happens at the end of the stream.

    The response contains an optional offset at which the append
    happened. No offset information will be returned for appends to
    a default stream.

    Responses are received in the same order in which requests are
    sent. There will be one response for each successful inserted
    request. Responses may optionally embed error information if the
    originating AppendRequest was not successfully processed.

    The specifics of when successfully appended data is made visible
    to the table are governed by the type of stream:

    - For COMMITTED streams (which includes the default stream),
      data is visible immediately upon successful append.

    - For BUFFERED streams, data is made visible via a subsequent
      ``FlushRows`` rpc which advances a cursor to a newer offset
      in the stream.

    - For PENDING streams, data is not made visible until the
      stream itself is finalized (via the ``FinalizeWriteStream``
      rpc), and the stream is explicitly committed via the
      ``BatchCommitWriteStreams`` rpc.

    .. code-block:: python

        # This snippet has been automatically generated and should be regarded as a
        # code template only.
        # It will require modifications to work:
        # - It may require correct/in-range values for request initialization.
        # - It may require specifying regional endpoints when creating the service
        #   client as shown in:
        #   https://googleapis.dev/python/google-api-core/latest/client_options.html
        from google.cloud import bigquery_storage_v1

        async def sample_append_rows():
            # Create a client
            client = bigquery_storage_v1.BigQueryWriteAsyncClient()

            # Initialize request argument(s)
            request = bigquery_storage_v1.AppendRowsRequest(
                write_stream="write_stream_value",
            )

            # This method expects an iterator which contains
            # 'bigquery_storage_v1.AppendRowsRequest' objects
            # Here we create a generator that yields a single `request` for
            # demonstrative purposes.
            requests = [request]

            def request_generator():
                for request in requests:
                    yield request

            # Make the request
            stream = await client.append_rows(requests=request_generator())

            # Handle the response
            async for response in stream:
                print(response)

    Args:
        requests (AsyncIterator[`google.cloud.bigquery_storage_v1.types.AppendRowsRequest`]):
            The request object AsyncIterator. Request message for ``AppendRows``.

            Because AppendRows is a bidirectional streaming RPC,
            certain parts of the AppendRowsRequest need only be
            specified for the first request before switching table
            destinations. You can also switch table destinations
            within the same connection for the default stream.

            The size of a single AppendRowsRequest must be less than
            10 MB in size. Requests larger than this return an
            error, typically ``INVALID_ARGUMENT``.
        retry (google.api_core.retry_async.AsyncRetry): Designation of what errors, if any,
            should be retried.
        timeout (float): The timeout for this request.
        metadata (Sequence[Tuple[str, Union[str, bytes]]]): Key/value pairs which should be
            sent along with the request as metadata. Normally, each value must be of type `str`,
            but for metadata keys ending with the suffix `-bin`, the corresponding values must
            be of type `bytes`.

    Returns:
        AsyncIterable[google.cloud.bigquery_storage_v1.types.AppendRowsResponse]:
            Response message for AppendRows.
    """
    # Look up the pre-wrapped RPC, which carries the retry/timeout defaults
    # and error handling.
    grpc_transport = self._client._transport
    rpc = grpc_transport._wrapped_methods[grpc_transport.append_rows]

    # The routing header is built from an empty parameter set here; no
    # request field is read before the stream is opened.
    metadata = (*metadata, gapic_v1.routing_header.to_grpc_metadata(()))

    # Validate the universe domain before sending anything.
    self._client._validate_universe_domain()

    # This is a plain ``def``: the call result is returned without being
    # awaited, leaving that to the caller.
    return rpc(
        requests,
        retry=retry,
        timeout=timeout,
        metadata=metadata,
    )

559 

async def get_write_stream(
    self,
    request: Optional[Union[storage.GetWriteStreamRequest, dict]] = None,
    *,
    name: Optional[str] = None,
    retry: OptionalRetry = gapic_v1.method.DEFAULT,
    timeout: Union[float, object] = gapic_v1.method.DEFAULT,
    metadata: Sequence[Tuple[str, Union[str, bytes]]] = (),
) -> stream.WriteStream:
    r"""Fetch information about a write stream.

    .. code-block:: python

        # This snippet has been automatically generated and should be regarded as a
        # code template only.
        # It will require modifications to work:
        # - It may require correct/in-range values for request initialization.
        # - It may require specifying regional endpoints when creating the service
        #   client as shown in:
        #   https://googleapis.dev/python/google-api-core/latest/client_options.html
        from google.cloud import bigquery_storage_v1

        async def sample_get_write_stream():
            # Create a client
            client = bigquery_storage_v1.BigQueryWriteAsyncClient()

            # Initialize request argument(s)
            request = bigquery_storage_v1.GetWriteStreamRequest(
                name="name_value",
            )

            # Make the request
            response = await client.get_write_stream(request=request)

            # Handle the response
            print(response)

    Args:
        request (Optional[Union[google.cloud.bigquery_storage_v1.types.GetWriteStreamRequest, dict]]):
            The request object. Request message for ``GetWriteStreamRequest``.
        name (:class:`str`):
            Required. Name of the stream to get, in the form of
            ``projects/{project}/datasets/{dataset}/tables/{table}/streams/{stream}``.

            This corresponds to the ``name`` field
            on the ``request`` instance; if ``request`` is provided, this
            should not be set.
        retry (google.api_core.retry_async.AsyncRetry): Designation of what errors, if any,
            should be retried.
        timeout (float): The timeout for this request.
        metadata (Sequence[Tuple[str, Union[str, bytes]]]): Key/value pairs which should be
            sent along with the request as metadata. Normally, each value must be of type `str`,
            but for metadata keys ending with the suffix `-bin`, the corresponding values must
            be of type `bytes`.

    Returns:
        google.cloud.bigquery_storage_v1.types.WriteStream:
            Information about a single stream
            that gets data inside the storage
            system.

    Raises:
        ValueError: If ``request`` is given together with ``name``.
    """
    # A prebuilt request and the flattened ``name`` argument are mutually
    # exclusive.
    if request is not None and name is not None:
        raise ValueError(
            "If the `request` argument is set, then none of "
            "the individual field arguments should be set."
        )

    # Anything that is not already the proto request type (``None`` or a
    # dict) is coerced into one; a ready-made request is used as-is.
    if not isinstance(request, storage.GetWriteStreamRequest):
        request = storage.GetWriteStreamRequest(request)

    # ``name`` can only be present when no request was passed (see the
    # check above), so a caller-supplied request is never mutated.
    if name is not None:
        request.name = name

    # Look up the pre-wrapped RPC, which carries the retry/timeout defaults
    # and error handling.
    grpc_transport = self._client._transport
    rpc = grpc_transport._wrapped_methods[grpc_transport.get_write_stream]

    # Append the routing header derived from the request's ``name``.
    metadata = (
        *metadata,
        gapic_v1.routing_header.to_grpc_metadata((("name", request.name),)),
    )

    # Validate the universe domain before sending anything.
    self._client._validate_universe_domain()

    # Send the request and hand back the response.
    return await rpc(
        request,
        retry=retry,
        timeout=timeout,
        metadata=metadata,
    )

670 

async def finalize_write_stream(
    self,
    request: Optional[Union[storage.FinalizeWriteStreamRequest, dict]] = None,
    *,
    name: Optional[str] = None,
    retry: OptionalRetry = gapic_v1.method.DEFAULT,
    timeout: Union[float, object] = gapic_v1.method.DEFAULT,
    metadata: Sequence[Tuple[str, Union[str, bytes]]] = (),
) -> storage.FinalizeWriteStreamResponse:
    r"""Finalize a write stream so that no new data can be appended to it.

    Finalize is not supported on the '_default' stream.

    .. code-block:: python

        # This snippet has been automatically generated and should be regarded as a
        # code template only.
        # It will require modifications to work:
        # - It may require correct/in-range values for request initialization.
        # - It may require specifying regional endpoints when creating the service
        #   client as shown in:
        #   https://googleapis.dev/python/google-api-core/latest/client_options.html
        from google.cloud import bigquery_storage_v1

        async def sample_finalize_write_stream():
            # Create a client
            client = bigquery_storage_v1.BigQueryWriteAsyncClient()

            # Initialize request argument(s)
            request = bigquery_storage_v1.FinalizeWriteStreamRequest(
                name="name_value",
            )

            # Make the request
            response = await client.finalize_write_stream(request=request)

            # Handle the response
            print(response)

    Args:
        request (Optional[Union[google.cloud.bigquery_storage_v1.types.FinalizeWriteStreamRequest, dict]]):
            The request object. Request message for invoking ``FinalizeWriteStream``.
        name (:class:`str`):
            Required. Name of the stream to finalize, in the form of
            ``projects/{project}/datasets/{dataset}/tables/{table}/streams/{stream}``.

            This corresponds to the ``name`` field
            on the ``request`` instance; if ``request`` is provided, this
            should not be set.
        retry (google.api_core.retry_async.AsyncRetry): Designation of what errors, if any,
            should be retried.
        timeout (float): The timeout for this request.
        metadata (Sequence[Tuple[str, Union[str, bytes]]]): Key/value pairs which should be
            sent along with the request as metadata. Normally, each value must be of type `str`,
            but for metadata keys ending with the suffix `-bin`, the corresponding values must
            be of type `bytes`.

    Returns:
        google.cloud.bigquery_storage_v1.types.FinalizeWriteStreamResponse:
            Response message for FinalizeWriteStream.

    Raises:
        ValueError: If ``request`` is given together with ``name``.
    """
    # A prebuilt request and the flattened ``name`` argument are mutually
    # exclusive.
    if request is not None and name is not None:
        raise ValueError(
            "If the `request` argument is set, then none of "
            "the individual field arguments should be set."
        )

    # Anything that is not already the proto request type (``None`` or a
    # dict) is coerced into one; a ready-made request is used as-is.
    if not isinstance(request, storage.FinalizeWriteStreamRequest):
        request = storage.FinalizeWriteStreamRequest(request)

    # ``name`` can only be present when no request was passed (see the
    # check above), so a caller-supplied request is never mutated.
    if name is not None:
        request.name = name

    # Look up the pre-wrapped RPC, which carries the retry/timeout defaults
    # and error handling.
    grpc_transport = self._client._transport
    rpc = grpc_transport._wrapped_methods[grpc_transport.finalize_write_stream]

    # Append the routing header derived from the request's ``name``.
    metadata = (
        *metadata,
        gapic_v1.routing_header.to_grpc_metadata((("name", request.name),)),
    )

    # Validate the universe domain before sending anything.
    self._client._validate_universe_domain()

    # Send the request and hand back the response.
    return await rpc(
        request,
        retry=retry,
        timeout=timeout,
        metadata=metadata,
    )

779 

async def batch_commit_write_streams(
    self,
    request: Optional[Union[storage.BatchCommitWriteStreamsRequest, dict]] = None,
    *,
    parent: Optional[str] = None,
    retry: OptionalRetry = gapic_v1.method.DEFAULT,
    timeout: Union[float, object] = gapic_v1.method.DEFAULT,
    metadata: Sequence[Tuple[str, Union[str, bytes]]] = (),
) -> storage.BatchCommitWriteStreamsResponse:
    r"""Atomically commit a group of ``PENDING`` streams of one ``parent`` table.

    Streams must be finalized before commit and cannot be committed
    multiple times. Once a stream is committed, data in the stream
    becomes available for read operations.

    .. code-block:: python

        # This snippet has been automatically generated and should be regarded as a
        # code template only.
        # It will require modifications to work:
        # - It may require correct/in-range values for request initialization.
        # - It may require specifying regional endpoints when creating the service
        #   client as shown in:
        #   https://googleapis.dev/python/google-api-core/latest/client_options.html
        from google.cloud import bigquery_storage_v1

        async def sample_batch_commit_write_streams():
            # Create a client
            client = bigquery_storage_v1.BigQueryWriteAsyncClient()

            # Initialize request argument(s)
            request = bigquery_storage_v1.BatchCommitWriteStreamsRequest(
                parent="parent_value",
                write_streams=['write_streams_value1', 'write_streams_value2'],
            )

            # Make the request
            response = await client.batch_commit_write_streams(request=request)

            # Handle the response
            print(response)

    Args:
        request (Optional[Union[google.cloud.bigquery_storage_v1.types.BatchCommitWriteStreamsRequest, dict]]):
            The request object. Request message for ``BatchCommitWriteStreams``.
        parent (:class:`str`):
            Required. Parent table that all the streams should
            belong to, in the form of
            ``projects/{project}/datasets/{dataset}/tables/{table}``.

            This corresponds to the ``parent`` field
            on the ``request`` instance; if ``request`` is provided, this
            should not be set.
        retry (google.api_core.retry_async.AsyncRetry): Designation of what errors, if any,
            should be retried.
        timeout (float): The timeout for this request.
        metadata (Sequence[Tuple[str, Union[str, bytes]]]): Key/value pairs which should be
            sent along with the request as metadata. Normally, each value must be of type `str`,
            but for metadata keys ending with the suffix `-bin`, the corresponding values must
            be of type `bytes`.

    Returns:
        google.cloud.bigquery_storage_v1.types.BatchCommitWriteStreamsResponse:
            Response message for BatchCommitWriteStreams.

    Raises:
        ValueError: If both ``request`` and ``parent`` are supplied.
    """
    # A full request object and the flattened ``parent`` keyword are
    # mutually exclusive ways of describing the same call.
    if request is not None and parent is not None:
        raise ValueError(
            "If the `request` argument is set, then none of "
            "the individual field arguments should be set."
        )

    # Reuse a ready-made proto request; otherwise build one from the dict
    # (or from ``None``, which is passed straight to the constructor).
    request = (
        request
        if isinstance(request, storage.BatchCommitWriteStreamsRequest)
        else storage.BatchCommitWriteStreamsRequest(request)
    )

    # Copy the flattened keyword onto the request when it was given.
    if parent is not None:
        request.parent = parent

    # Look up the pre-wrapped RPC (the wrapper carries retry, timeout and
    # friendly error handling) keyed by the transport's bound method.
    transport = self._client._transport
    rpc = transport._wrapped_methods[transport.batch_commit_write_streams]

    # Append the routing header derived from ``parent`` to the caller's
    # metadata.
    metadata = (
        *metadata,
        gapic_v1.routing_header.to_grpc_metadata((("parent", request.parent),)),
    )

    # Validate the universe domain before anything goes on the wire.
    self._client._validate_universe_domain()

    # Send the request and hand the response straight back.
    return await rpc(
        request,
        retry=retry,
        timeout=timeout,
        metadata=metadata,
    )

894 

async def flush_rows(
    self,
    request: Optional[Union[storage.FlushRowsRequest, dict]] = None,
    *,
    write_stream: Optional[str] = None,
    retry: OptionalRetry = gapic_v1.method.DEFAULT,
    timeout: Union[float, object] = gapic_v1.method.DEFAULT,
    metadata: Sequence[Tuple[str, Union[str, bytes]]] = (),
) -> storage.FlushRowsResponse:
    r"""Flush rows to a BUFFERED stream.

    If users are appending rows to BUFFERED stream, flush operation
    is required in order for the rows to become available for
    reading. A Flush operation flushes up to any previously flushed
    offset in a BUFFERED stream, to the offset specified in the
    request.

    Flush is not supported on the \_default stream, since it is not
    BUFFERED.

    .. code-block:: python

        # This snippet has been automatically generated and should be regarded as a
        # code template only.
        # It will require modifications to work:
        # - It may require correct/in-range values for request initialization.
        # - It may require specifying regional endpoints when creating the service
        #   client as shown in:
        #   https://googleapis.dev/python/google-api-core/latest/client_options.html
        from google.cloud import bigquery_storage_v1

        async def sample_flush_rows():
            # Create a client
            client = bigquery_storage_v1.BigQueryWriteAsyncClient()

            # Initialize request argument(s)
            request = bigquery_storage_v1.FlushRowsRequest(
                write_stream="write_stream_value",
            )

            # Make the request
            response = await client.flush_rows(request=request)

            # Handle the response
            print(response)

    Args:
        request (Optional[Union[google.cloud.bigquery_storage_v1.types.FlushRowsRequest, dict]]):
            The request object. Request message for ``FlushRows``.
        write_stream (:class:`str`):
            Required. The stream that is the
            target of the flush operation.

            This corresponds to the ``write_stream`` field
            on the ``request`` instance; if ``request`` is provided, this
            should not be set.
        retry (google.api_core.retry_async.AsyncRetry): Designation of what errors, if any,
            should be retried.
        timeout (float): The timeout for this request.
        metadata (Sequence[Tuple[str, Union[str, bytes]]]): Key/value pairs which should be
            sent along with the request as metadata. Normally, each value must be of type `str`,
            but for metadata keys ending with the suffix `-bin`, the corresponding values must
            be of type `bytes`.

    Returns:
        google.cloud.bigquery_storage_v1.types.FlushRowsResponse:
            Response message for FlushRows.

    Raises:
        ValueError: If both ``request`` and ``write_stream`` are supplied.
    """
    # A full request object and the flattened ``write_stream`` keyword are
    # mutually exclusive ways of describing the same call.
    if request is not None and write_stream is not None:
        raise ValueError(
            "If the `request` argument is set, then none of "
            "the individual field arguments should be set."
        )

    # Reuse a ready-made proto request; otherwise build one from the dict
    # (or from ``None``, which is passed straight to the constructor).
    request = (
        request
        if isinstance(request, storage.FlushRowsRequest)
        else storage.FlushRowsRequest(request)
    )

    # Copy the flattened keyword onto the request when it was given.
    if write_stream is not None:
        request.write_stream = write_stream

    # Look up the pre-wrapped RPC (the wrapper carries retry, timeout and
    # friendly error handling) keyed by the transport's bound method.
    transport = self._client._transport
    rpc = transport._wrapped_methods[transport.flush_rows]

    # Append the routing header derived from ``write_stream`` to the
    # caller's metadata.
    metadata = (
        *metadata,
        gapic_v1.routing_header.to_grpc_metadata(
            (("write_stream", request.write_stream),)
        ),
    )

    # Validate the universe domain before anything goes on the wire.
    self._client._validate_universe_domain()

    # Send the request and hand the response straight back.
    return await rpc(
        request,
        retry=retry,
        timeout=timeout,
        metadata=metadata,
    )

1013 

async def __aenter__(self) -> "BigQueryWriteAsyncClient":
    """Enter the ``async with`` block and return this client unchanged."""
    return self

1016 

async def __aexit__(self, exc_type, exc, tb):
    """Leave the ``async with`` block by closing the client's transport.

    The exception details passed in are not inspected. Nothing is returned,
    so an exception raised inside the block keeps propagating.
    """
    await self.transport.close()

1019 

1020 

# Default client info for this module, stamped with the installed package
# version.
DEFAULT_CLIENT_INFO = gapic_v1.client_info.ClientInfo(gapic_version=package_version.__version__)

# Only record the protobuf runtime version when this ClientInfo exposes the
# attribute.
if hasattr(DEFAULT_CLIENT_INFO, "protobuf_runtime_version"):  # pragma: NO COVER
    DEFAULT_CLIENT_INFO.protobuf_runtime_version = google.protobuf.__version__


# Names exported by ``from <module> import *``.
__all__ = ("BigQueryWriteAsyncClient",)