Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/google/cloud/bigquery_storage_v1/services/big_query_write/transports/grpc_asyncio.py: 47%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

103 statements  

1# -*- coding: utf-8 -*- 

2# Copyright 2025 Google LLC 

3# 

4# Licensed under the Apache License, Version 2.0 (the "License"); 

5# you may not use this file except in compliance with the License. 

6# You may obtain a copy of the License at 

7# 

8# http://www.apache.org/licenses/LICENSE-2.0 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, 

12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

13# See the License for the specific language governing permissions and 

14# limitations under the License. 

15# 

16import inspect 

17import json 

18import logging as std_logging 

19import pickle 

20from typing import Awaitable, Callable, Dict, Optional, Sequence, Tuple, Union 

21import warnings 

22 

23from google.api_core import exceptions as core_exceptions 

24from google.api_core import gapic_v1, grpc_helpers_async 

25from google.api_core import retry_async as retries 

26from google.auth import credentials as ga_credentials # type: ignore 

27from google.auth.transport.grpc import SslCredentials # type: ignore 

28from google.protobuf.json_format import MessageToJson 

29import google.protobuf.message 

30import grpc # type: ignore 

31from grpc.experimental import aio # type: ignore 

32import proto # type: ignore 

33 

34from google.cloud.bigquery_storage_v1.types import storage, stream 

35 

36from .base import DEFAULT_CLIENT_INFO, BigQueryWriteTransport 

37from .grpc import BigQueryWriteGrpcTransport 

38 

39try: 

40 from google.api_core import client_logging # type: ignore 

41 

42 CLIENT_LOGGING_SUPPORTED = True # pragma: NO COVER 

43except ImportError: # pragma: NO COVER 

44 CLIENT_LOGGING_SUPPORTED = False 

45 

46_LOGGER = std_logging.getLogger(__name__) 

47 

48 

class _LoggingClientAIOInterceptor(
    grpc.aio.UnaryUnaryClientInterceptor
):  # pragma: NO COVER
    """AsyncIO client interceptor that emits DEBUG logs for unary-unary RPCs.

    When ``google.api_core.client_logging`` is importable and the module
    logger has DEBUG enabled, each request and its response (payload,
    metadata, method name) are logged via structured ``extra`` fields.
    Otherwise the interceptor is effectively a pass-through.
    """

    async def intercept_unary_unary(self, continuation, client_call_details, request):
        # Evaluate once so the request and response sides agree even if the
        # logger's level changes mid-call.
        logging_enabled = CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(
            std_logging.DEBUG
        )
        if logging_enabled:  # pragma: NO COVER
            request_metadata = client_call_details.metadata
            # Serialize the request to JSON when possible; fall back to a
            # pickled repr for non-protobuf payloads.
            if isinstance(request, proto.Message):
                request_payload = type(request).to_json(request)
            elif isinstance(request, google.protobuf.message.Message):
                request_payload = MessageToJson(request)
            else:
                request_payload = f"{type(request).__name__}: {pickle.dumps(request)}"

            # Metadata values may be bytes; decode so the log record is
            # JSON-serializable.
            request_metadata = {
                key: value.decode("utf-8") if isinstance(value, bytes) else value
                for key, value in request_metadata
            }
            grpc_request = {
                "payload": request_payload,
                "requestMethod": "grpc",
                "metadata": dict(request_metadata),
            }
            _LOGGER.debug(
                f"Sending request for {client_call_details.method}",
                extra={
                    "serviceName": "google.cloud.bigquery.storage.v1.BigQueryWrite",
                    "rpcName": str(client_call_details.method),
                    "request": grpc_request,
                    "metadata": grpc_request["metadata"],
                },
            )
        # Invoke the next interceptor (or the actual RPC). ``response`` is a
        # grpc.aio call object; awaiting it yields the deserialized result.
        response = await continuation(client_call_details, request)
        if logging_enabled:  # pragma: NO COVER
            response_metadata = await response.trailing_metadata()
            # Convert gRPC metadata `<class 'grpc.aio._metadata.Metadata'>` to list of tuples
            metadata = (
                dict([(k, str(v)) for k, v in response_metadata])
                if response_metadata
                else None
            )
            result = await response
            if isinstance(result, proto.Message):
                response_payload = type(result).to_json(result)
            elif isinstance(result, google.protobuf.message.Message):
                response_payload = MessageToJson(result)
            else:
                response_payload = f"{type(result).__name__}: {pickle.dumps(result)}"
            grpc_response = {
                "payload": response_payload,
                "metadata": metadata,
                "status": "OK",
            }
            _LOGGER.debug(
                f"Received response to rpc {client_call_details.method}.",
                extra={
                    "serviceName": "google.cloud.bigquery.storage.v1.BigQueryWrite",
                    "rpcName": str(client_call_details.method),
                    "response": grpc_response,
                    "metadata": grpc_response["metadata"],
                },
            )
        # Return the call object (not the awaited result) so callers retain
        # access to metadata, status, etc.
        return response

114 

115 

class BigQueryWriteGrpcAsyncIOTransport(BigQueryWriteTransport):
    """gRPC AsyncIO backend transport for BigQueryWrite.

    BigQuery Write API.

    The Write API can be used to write data to BigQuery.

    For supplementary information about the Write API, see:

    https://cloud.google.com/bigquery/docs/write-api

    This class defines the same methods as the primary client, so the
    primary client can load the underlying transport implementation
    and call it.

    It sends protocol buffers over the wire using gRPC (which is built on
    top of HTTP/2); the ``grpcio`` package must be installed.
    """

    # The AsyncIO channel used for every RPC made through this transport.
    _grpc_channel: aio.Channel
    # Cache of lazily-created stub callables, keyed by RPC method name.
    # Re-assigned per instance in ``__init__``; this class-level value only
    # serves as a type declaration/default.
    _stubs: Dict[str, Callable] = {}

    @classmethod
    def create_channel(
        cls,
        host: str = "bigquerystorage.googleapis.com",
        credentials: Optional[ga_credentials.Credentials] = None,
        credentials_file: Optional[str] = None,
        scopes: Optional[Sequence[str]] = None,
        quota_project_id: Optional[str] = None,
        **kwargs,
    ) -> aio.Channel:
        """Create and return a gRPC AsyncIO channel object.
        Args:
            host (Optional[str]): The host for the channel to use.
            credentials (Optional[~.Credentials]): The
                authorization credentials to attach to requests. These
                credentials identify this application to the service. If
                none are specified, the client will attempt to ascertain
                the credentials from the environment.
            credentials_file (Optional[str]): A file with credentials that can
                be loaded with :func:`google.auth.load_credentials_from_file`.
            scopes (Optional[Sequence[str]]): An optional list of scopes needed for this
                service. These are only used when credentials are not specified and
                are passed to :func:`google.auth.default`.
            quota_project_id (Optional[str]): An optional project to use for billing
                and quota.
            kwargs (Optional[dict]): Keyword arguments, which are passed to the
                channel creation.
        Returns:
            aio.Channel: A gRPC AsyncIO channel object.
        """

        return grpc_helpers_async.create_channel(
            host,
            credentials=credentials,
            credentials_file=credentials_file,
            quota_project_id=quota_project_id,
            default_scopes=cls.AUTH_SCOPES,
            scopes=scopes,
            default_host=cls.DEFAULT_HOST,
            **kwargs,
        )

    def __init__(
        self,
        *,
        host: str = "bigquerystorage.googleapis.com",
        credentials: Optional[ga_credentials.Credentials] = None,
        credentials_file: Optional[str] = None,
        scopes: Optional[Sequence[str]] = None,
        channel: Optional[Union[aio.Channel, Callable[..., aio.Channel]]] = None,
        api_mtls_endpoint: Optional[str] = None,
        client_cert_source: Optional[Callable[[], Tuple[bytes, bytes]]] = None,
        ssl_channel_credentials: Optional[grpc.ChannelCredentials] = None,
        client_cert_source_for_mtls: Optional[Callable[[], Tuple[bytes, bytes]]] = None,
        quota_project_id: Optional[str] = None,
        client_info: gapic_v1.client_info.ClientInfo = DEFAULT_CLIENT_INFO,
        always_use_jwt_access: Optional[bool] = False,
        api_audience: Optional[str] = None,
    ) -> None:
        """Instantiate the transport.

        Args:
            host (Optional[str]):
                 The hostname to connect to (default: 'bigquerystorage.googleapis.com').
            credentials (Optional[google.auth.credentials.Credentials]): The
                authorization credentials to attach to requests. These
                credentials identify the application to the service; if none
                are specified, the client will attempt to ascertain the
                credentials from the environment.
                This argument is ignored if a ``channel`` instance is provided.
            credentials_file (Optional[str]): A file with credentials that can
                be loaded with :func:`google.auth.load_credentials_from_file`.
                This argument is ignored if a ``channel`` instance is provided.
            scopes (Optional[Sequence[str]]): An optional list of scopes needed for this
                service. These are only used when credentials are not specified and
                are passed to :func:`google.auth.default`.
            channel (Optional[Union[aio.Channel, Callable[..., aio.Channel]]]):
                A ``Channel`` instance through which to make calls, or a Callable
                that constructs and returns one. If set to None, ``self.create_channel``
                is used to create the channel. If a Callable is given, it will be called
                with the same arguments as used in ``self.create_channel``.
            api_mtls_endpoint (Optional[str]): Deprecated. The mutual TLS endpoint.
                If provided, it overrides the ``host`` argument and tries to create
                a mutual TLS channel with client SSL credentials from
                ``client_cert_source`` or application default SSL credentials.
            client_cert_source (Optional[Callable[[], Tuple[bytes, bytes]]]):
                Deprecated. A callback to provide client SSL certificate bytes and
                private key bytes, both in PEM format. It is ignored if
                ``api_mtls_endpoint`` is None.
            ssl_channel_credentials (grpc.ChannelCredentials): SSL credentials
                for the grpc channel. It is ignored if a ``channel`` instance is provided.
            client_cert_source_for_mtls (Optional[Callable[[], Tuple[bytes, bytes]]]):
                A callback to provide client certificate bytes and private key bytes,
                both in PEM format. It is used to configure a mutual TLS channel. It is
                ignored if a ``channel`` instance or ``ssl_channel_credentials`` is provided.
            quota_project_id (Optional[str]): An optional project to use for billing
                and quota.
            client_info (google.api_core.gapic_v1.client_info.ClientInfo):
                The client info used to send a user-agent string along with
                API requests. If ``None``, then default info will be used.
                Generally, you only need to set this if you're developing
                your own client library.
            always_use_jwt_access (Optional[bool]): Whether self signed JWT should
                be used for service account credentials.

        Raises:
            google.auth.exceptions.MutualTlsChannelError: If mutual TLS transport
                creation failed for any reason.
            google.api_core.exceptions.DuplicateCredentialArgs: If both ``credentials``
                and ``credentials_file`` are passed.
        """
        self._grpc_channel = None
        self._ssl_channel_credentials = ssl_channel_credentials
        # Per-instance stub cache; shadows the class-level declaration.
        self._stubs: Dict[str, Callable] = {}

        if api_mtls_endpoint:
            warnings.warn("api_mtls_endpoint is deprecated", DeprecationWarning)
        if client_cert_source:
            warnings.warn("client_cert_source is deprecated", DeprecationWarning)

        if isinstance(channel, aio.Channel):
            # Ignore credentials if a channel was passed.
            credentials = None
            self._ignore_credentials = True
            # If a channel was explicitly provided, set it.
            self._grpc_channel = channel
            self._ssl_channel_credentials = None
        else:
            if api_mtls_endpoint:
                host = api_mtls_endpoint

                # Create SSL credentials with client_cert_source or application
                # default SSL credentials.
                if client_cert_source:
                    cert, key = client_cert_source()
                    self._ssl_channel_credentials = grpc.ssl_channel_credentials(
                        certificate_chain=cert, private_key=key
                    )
                else:
                    self._ssl_channel_credentials = SslCredentials().ssl_credentials

            else:
                if client_cert_source_for_mtls and not ssl_channel_credentials:
                    cert, key = client_cert_source_for_mtls()
                    self._ssl_channel_credentials = grpc.ssl_channel_credentials(
                        certificate_chain=cert, private_key=key
                    )

        # The base transport sets the host, credentials and scopes
        super().__init__(
            host=host,
            credentials=credentials,
            credentials_file=credentials_file,
            scopes=scopes,
            quota_project_id=quota_project_id,
            client_info=client_info,
            always_use_jwt_access=always_use_jwt_access,
            api_audience=api_audience,
        )

        if not self._grpc_channel:
            # initialize with the provided callable or the default channel
            channel_init = channel or type(self).create_channel
            self._grpc_channel = channel_init(
                self._host,
                # use the credentials which are saved
                credentials=self._credentials,
                # Set ``credentials_file`` to ``None`` here as
                # the credentials that we saved earlier should be used.
                credentials_file=None,
                scopes=self._scopes,
                ssl_credentials=self._ssl_channel_credentials,
                quota_project_id=quota_project_id,
                options=[
                    # Unlimited message sizes: write payloads can be large.
                    ("grpc.max_send_message_length", -1),
                    ("grpc.max_receive_message_length", -1),
                ],
            )

        # Attach the debug-logging interceptor to the channel's unary-unary
        # interceptor chain, then alias the channel as ``_logged_channel``,
        # which all stub creation below uses.
        self._interceptor = _LoggingClientAIOInterceptor()
        self._grpc_channel._unary_unary_interceptors.append(self._interceptor)
        self._logged_channel = self._grpc_channel
        # Older versions of ``wrap_method`` do not accept a ``kind`` kwarg;
        # detect support once so ``_wrap_method`` can pass it conditionally.
        self._wrap_with_kind = (
            "kind" in inspect.signature(gapic_v1.method_async.wrap_method).parameters
        )
        # Wrap messages. This must be done after self._logged_channel exists
        self._prep_wrapped_messages(client_info)

    @property
    def grpc_channel(self) -> aio.Channel:
        """Create the channel designed to connect to this service.

        This property caches on the instance; repeated calls return
        the same channel.
        """
        # Return the channel from cache.
        return self._grpc_channel

    @property
    def create_write_stream(
        self,
    ) -> Callable[[storage.CreateWriteStreamRequest], Awaitable[stream.WriteStream]]:
        r"""Return a callable for the create write stream method over gRPC.

        Creates a write stream to the given table. Additionally, every
        table has a special stream named '_default' to which data can be
        written. This stream doesn't need to be created using
        CreateWriteStream. It is a stream that can be used
        simultaneously by any number of clients. Data written to this
        stream is considered committed as soon as an acknowledgement is
        received.

        Returns:
            Callable[[~.CreateWriteStreamRequest],
                    Awaitable[~.WriteStream]]:
                A function that, when called, will call the underlying RPC
                on the server.
        """
        # Generate a "stub function" on-the-fly which will actually make
        # the request.
        # gRPC handles serialization and deserialization, so we just need
        # to pass in the functions for each.
        if "create_write_stream" not in self._stubs:
            self._stubs["create_write_stream"] = self._logged_channel.unary_unary(
                "/google.cloud.bigquery.storage.v1.BigQueryWrite/CreateWriteStream",
                request_serializer=storage.CreateWriteStreamRequest.serialize,
                response_deserializer=stream.WriteStream.deserialize,
            )
        return self._stubs["create_write_stream"]

    @property
    def append_rows(
        self,
    ) -> Callable[[storage.AppendRowsRequest], Awaitable[storage.AppendRowsResponse]]:
        r"""Return a callable for the append rows method over gRPC.

        Appends data to the given stream.

        If ``offset`` is specified, the ``offset`` is checked against
        the end of stream. The server returns ``OUT_OF_RANGE`` in
        ``AppendRowsResponse`` if an attempt is made to append to an
        offset beyond the current end of the stream or
        ``ALREADY_EXISTS`` if user provides an ``offset`` that has
        already been written to. User can retry with adjusted offset
        within the same RPC connection. If ``offset`` is not specified,
        append happens at the end of the stream.

        The response contains an optional offset at which the append
        happened. No offset information will be returned for appends to
        a default stream.

        Responses are received in the same order in which requests are
        sent. There will be one response for each successful inserted
        request. Responses may optionally embed error information if the
        originating AppendRequest was not successfully processed.

        The specifics of when successfully appended data is made visible
        to the table are governed by the type of stream:

        - For COMMITTED streams (which includes the default stream),
          data is visible immediately upon successful append.

        - For BUFFERED streams, data is made visible via a subsequent
          ``FlushRows`` rpc which advances a cursor to a newer offset
          in the stream.

        - For PENDING streams, data is not made visible until the
          stream itself is finalized (via the ``FinalizeWriteStream``
          rpc), and the stream is explicitly committed via the
          ``BatchCommitWriteStreams`` rpc.

        Returns:
            Callable[[~.AppendRowsRequest],
                    Awaitable[~.AppendRowsResponse]]:
                A function that, when called, will call the underlying RPC
                on the server.
        """
        # Generate a "stub function" on-the-fly which will actually make
        # the request.
        # gRPC handles serialization and deserialization, so we just need
        # to pass in the functions for each.
        # NOTE: AppendRows is bidirectional streaming, hence stream_stream
        # rather than unary_unary.
        if "append_rows" not in self._stubs:
            self._stubs["append_rows"] = self._logged_channel.stream_stream(
                "/google.cloud.bigquery.storage.v1.BigQueryWrite/AppendRows",
                request_serializer=storage.AppendRowsRequest.serialize,
                response_deserializer=storage.AppendRowsResponse.deserialize,
            )
        return self._stubs["append_rows"]

    @property
    def get_write_stream(
        self,
    ) -> Callable[[storage.GetWriteStreamRequest], Awaitable[stream.WriteStream]]:
        r"""Return a callable for the get write stream method over gRPC.

        Gets information about a write stream.

        Returns:
            Callable[[~.GetWriteStreamRequest],
                    Awaitable[~.WriteStream]]:
                A function that, when called, will call the underlying RPC
                on the server.
        """
        # Generate a "stub function" on-the-fly which will actually make
        # the request.
        # gRPC handles serialization and deserialization, so we just need
        # to pass in the functions for each.
        if "get_write_stream" not in self._stubs:
            self._stubs["get_write_stream"] = self._logged_channel.unary_unary(
                "/google.cloud.bigquery.storage.v1.BigQueryWrite/GetWriteStream",
                request_serializer=storage.GetWriteStreamRequest.serialize,
                response_deserializer=stream.WriteStream.deserialize,
            )
        return self._stubs["get_write_stream"]

    @property
    def finalize_write_stream(
        self,
    ) -> Callable[
        [storage.FinalizeWriteStreamRequest],
        Awaitable[storage.FinalizeWriteStreamResponse],
    ]:
        r"""Return a callable for the finalize write stream method over gRPC.

        Finalize a write stream so that no new data can be appended to
        the stream. Finalize is not supported on the '_default' stream.

        Returns:
            Callable[[~.FinalizeWriteStreamRequest],
                    Awaitable[~.FinalizeWriteStreamResponse]]:
                A function that, when called, will call the underlying RPC
                on the server.
        """
        # Generate a "stub function" on-the-fly which will actually make
        # the request.
        # gRPC handles serialization and deserialization, so we just need
        # to pass in the functions for each.
        if "finalize_write_stream" not in self._stubs:
            self._stubs["finalize_write_stream"] = self._logged_channel.unary_unary(
                "/google.cloud.bigquery.storage.v1.BigQueryWrite/FinalizeWriteStream",
                request_serializer=storage.FinalizeWriteStreamRequest.serialize,
                response_deserializer=storage.FinalizeWriteStreamResponse.deserialize,
            )
        return self._stubs["finalize_write_stream"]

    @property
    def batch_commit_write_streams(
        self,
    ) -> Callable[
        [storage.BatchCommitWriteStreamsRequest],
        Awaitable[storage.BatchCommitWriteStreamsResponse],
    ]:
        r"""Return a callable for the batch commit write streams method over gRPC.

        Atomically commits a group of ``PENDING`` streams that belong to
        the same ``parent`` table.

        Streams must be finalized before commit and cannot be committed
        multiple times. Once a stream is committed, data in the stream
        becomes available for read operations.

        Returns:
            Callable[[~.BatchCommitWriteStreamsRequest],
                    Awaitable[~.BatchCommitWriteStreamsResponse]]:
                A function that, when called, will call the underlying RPC
                on the server.
        """
        # Generate a "stub function" on-the-fly which will actually make
        # the request.
        # gRPC handles serialization and deserialization, so we just need
        # to pass in the functions for each.
        if "batch_commit_write_streams" not in self._stubs:
            self._stubs[
                "batch_commit_write_streams"
            ] = self._logged_channel.unary_unary(
                "/google.cloud.bigquery.storage.v1.BigQueryWrite/BatchCommitWriteStreams",
                request_serializer=storage.BatchCommitWriteStreamsRequest.serialize,
                response_deserializer=storage.BatchCommitWriteStreamsResponse.deserialize,
            )
        return self._stubs["batch_commit_write_streams"]

    @property
    def flush_rows(
        self,
    ) -> Callable[[storage.FlushRowsRequest], Awaitable[storage.FlushRowsResponse]]:
        r"""Return a callable for the flush rows method over gRPC.

        Flushes rows to a BUFFERED stream.

        If users are appending rows to BUFFERED stream, flush operation
        is required in order for the rows to become available for
        reading. A Flush operation flushes up to any previously flushed
        offset in a BUFFERED stream, to the offset specified in the
        request.

        Flush is not supported on the \_default stream, since it is not
        BUFFERED.

        Returns:
            Callable[[~.FlushRowsRequest],
                    Awaitable[~.FlushRowsResponse]]:
                A function that, when called, will call the underlying RPC
                on the server.
        """
        # Generate a "stub function" on-the-fly which will actually make
        # the request.
        # gRPC handles serialization and deserialization, so we just need
        # to pass in the functions for each.
        if "flush_rows" not in self._stubs:
            self._stubs["flush_rows"] = self._logged_channel.unary_unary(
                "/google.cloud.bigquery.storage.v1.BigQueryWrite/FlushRows",
                request_serializer=storage.FlushRowsRequest.serialize,
                response_deserializer=storage.FlushRowsResponse.deserialize,
            )
        return self._stubs["flush_rows"]

    def _prep_wrapped_messages(self, client_info):
        """Precompute the wrapped methods, overriding the base class method to use async wrappers."""
        # Each entry pairs a stub property with an async wrapper carrying the
        # service-configured default retry policy and timeout for that RPC.
        self._wrapped_methods = {
            self.create_write_stream: self._wrap_method(
                self.create_write_stream,
                default_retry=retries.AsyncRetry(
                    initial=10.0,
                    maximum=120.0,
                    multiplier=1.3,
                    predicate=retries.if_exception_type(
                        core_exceptions.DeadlineExceeded,
                        core_exceptions.ResourceExhausted,
                        core_exceptions.ServiceUnavailable,
                    ),
                    deadline=1200.0,
                ),
                default_timeout=1200.0,
                client_info=client_info,
            ),
            self.append_rows: self._wrap_method(
                self.append_rows,
                default_retry=retries.AsyncRetry(
                    initial=0.1,
                    maximum=60.0,
                    multiplier=1.3,
                    predicate=retries.if_exception_type(
                        core_exceptions.ServiceUnavailable,
                    ),
                    deadline=86400.0,
                ),
                default_timeout=86400.0,
                client_info=client_info,
            ),
            self.get_write_stream: self._wrap_method(
                self.get_write_stream,
                default_retry=retries.AsyncRetry(
                    initial=0.1,
                    maximum=60.0,
                    multiplier=1.3,
                    predicate=retries.if_exception_type(
                        core_exceptions.DeadlineExceeded,
                        core_exceptions.ResourceExhausted,
                        core_exceptions.ServiceUnavailable,
                    ),
                    deadline=600.0,
                ),
                default_timeout=600.0,
                client_info=client_info,
            ),
            self.finalize_write_stream: self._wrap_method(
                self.finalize_write_stream,
                default_retry=retries.AsyncRetry(
                    initial=0.1,
                    maximum=60.0,
                    multiplier=1.3,
                    predicate=retries.if_exception_type(
                        core_exceptions.DeadlineExceeded,
                        core_exceptions.ResourceExhausted,
                        core_exceptions.ServiceUnavailable,
                    ),
                    deadline=600.0,
                ),
                default_timeout=600.0,
                client_info=client_info,
            ),
            self.batch_commit_write_streams: self._wrap_method(
                self.batch_commit_write_streams,
                default_retry=retries.AsyncRetry(
                    initial=0.1,
                    maximum=60.0,
                    multiplier=1.3,
                    predicate=retries.if_exception_type(
                        core_exceptions.DeadlineExceeded,
                        core_exceptions.ResourceExhausted,
                        core_exceptions.ServiceUnavailable,
                    ),
                    deadline=600.0,
                ),
                default_timeout=600.0,
                client_info=client_info,
            ),
            self.flush_rows: self._wrap_method(
                self.flush_rows,
                default_retry=retries.AsyncRetry(
                    initial=0.1,
                    maximum=60.0,
                    multiplier=1.3,
                    predicate=retries.if_exception_type(
                        core_exceptions.DeadlineExceeded,
                        core_exceptions.ResourceExhausted,
                        core_exceptions.ServiceUnavailable,
                    ),
                    deadline=600.0,
                ),
                default_timeout=600.0,
                client_info=client_info,
            ),
        }

    def _wrap_method(self, func, *args, **kwargs):
        """Wrap ``func`` with ``gapic_v1.method_async.wrap_method``, passing
        ``kind`` only when the installed api_core version supports it."""
        if self._wrap_with_kind:  # pragma: NO COVER
            kwargs["kind"] = self.kind
        return gapic_v1.method_async.wrap_method(func, *args, **kwargs)

    def close(self):
        """Close the underlying gRPC channel (returns an awaitable)."""
        return self._logged_channel.close()

    @property
    def kind(self) -> str:
        """Transport kind identifier used by the client factory."""
        return "grpc_asyncio"

664 

665 

666__all__ = ("BigQueryWriteGrpcAsyncIOTransport",)