Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/google/cloud/bigquery_storage_v1/services/big_query_write/transports/grpc.py: 45% (92 statements)


# -*- coding: utf-8 -*-
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import json
import logging as std_logging
import pickle
from typing import Callable, Dict, Optional, Sequence, Tuple, Union
import warnings

from google.api_core import gapic_v1, grpc_helpers
import google.auth  # type: ignore
from google.auth import credentials as ga_credentials  # type: ignore
from google.auth.transport.grpc import SslCredentials  # type: ignore
from google.protobuf.json_format import MessageToJson
import google.protobuf.message
import grpc  # type: ignore
import proto  # type: ignore

from google.cloud.bigquery_storage_v1.types import storage, stream

from .base import DEFAULT_CLIENT_INFO, BigQueryWriteTransport

try:
    from google.api_core import client_logging  # type: ignore

    CLIENT_LOGGING_SUPPORTED = True  # pragma: NO COVER
except ImportError:  # pragma: NO COVER
    CLIENT_LOGGING_SUPPORTED = False

_LOGGER = std_logging.getLogger(__name__)


class _LoggingClientInterceptor(grpc.UnaryUnaryClientInterceptor):  # pragma: NO COVER
    def intercept_unary_unary(self, continuation, client_call_details, request):
        logging_enabled = CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(
            std_logging.DEBUG
        )
        if logging_enabled:  # pragma: NO COVER
            request_metadata = client_call_details.metadata
            if isinstance(request, proto.Message):
                request_payload = type(request).to_json(request)
            elif isinstance(request, google.protobuf.message.Message):
                request_payload = MessageToJson(request)
            else:
                request_payload = f"{type(request).__name__}: {pickle.dumps(request)}"

            request_metadata = {
                key: value.decode("utf-8") if isinstance(value, bytes) else value
                for key, value in request_metadata
            }
            grpc_request = {
                "payload": request_payload,
                "requestMethod": "grpc",
                "metadata": dict(request_metadata),
            }
            _LOGGER.debug(
                f"Sending request for {client_call_details.method}",
                extra={
                    "serviceName": "google.cloud.bigquery.storage.v1.BigQueryWrite",
                    "rpcName": str(client_call_details.method),
                    "request": grpc_request,
                    "metadata": grpc_request["metadata"],
                },
            )
        response = continuation(client_call_details, request)
        if logging_enabled:  # pragma: NO COVER
            response_metadata = response.trailing_metadata()
            # Convert gRPC metadata `<class 'grpc.aio._metadata.Metadata'>` to list of tuples
            metadata = (
                dict([(k, str(v)) for k, v in response_metadata])
                if response_metadata
                else None
            )
            result = response.result()
            if isinstance(result, proto.Message):
                response_payload = type(result).to_json(result)
            elif isinstance(result, google.protobuf.message.Message):
                response_payload = MessageToJson(result)
            else:
                response_payload = f"{type(result).__name__}: {pickle.dumps(result)}"
            grpc_response = {
                "payload": response_payload,
                "metadata": metadata,
                "status": "OK",
            }
            _LOGGER.debug(
                f"Received response for {client_call_details.method}.",
                extra={
                    "serviceName": "google.cloud.bigquery.storage.v1.BigQueryWrite",
                    "rpcName": client_call_details.method,
                    "response": grpc_response,
                    "metadata": grpc_response["metadata"],
                },
            )
        return response
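
# NOTE (editor): a minimal, hypothetical sketch of turning on the DEBUG-level
# request/response logging that the interceptor above emits. It assumes only the
# standard-library ``logging`` module and the usual logger-name hierarchy; the
# handler choice is illustrative, not part of this module's API. Logging is also
# gated on CLIENT_LOGGING_SUPPORTED above.
#
#     import logging
#
#     logger = logging.getLogger("google.cloud.bigquery_storage_v1")
#     logger.addHandler(logging.StreamHandler())
#     logger.setLevel(logging.DEBUG)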


class BigQueryWriteGrpcTransport(BigQueryWriteTransport):
    """gRPC backend transport for BigQueryWrite.

    BigQuery Write API.

    The Write API can be used to write data to BigQuery.

    For supplementary information about the Write API, see:

    https://cloud.google.com/bigquery/docs/write-api

    This class defines the same methods as the primary client, so the
    primary client can load the underlying transport implementation
    and call it.

    It sends protocol buffers over the wire using gRPC (which is built on
    top of HTTP/2); the ``grpcio`` package must be installed.
    """

    _stubs: Dict[str, Callable]

    def __init__(
        self,
        *,
        host: str = "bigquerystorage.googleapis.com",
        credentials: Optional[ga_credentials.Credentials] = None,
        credentials_file: Optional[str] = None,
        scopes: Optional[Sequence[str]] = None,
        channel: Optional[Union[grpc.Channel, Callable[..., grpc.Channel]]] = None,
        api_mtls_endpoint: Optional[str] = None,
        client_cert_source: Optional[Callable[[], Tuple[bytes, bytes]]] = None,
        ssl_channel_credentials: Optional[grpc.ChannelCredentials] = None,
        client_cert_source_for_mtls: Optional[Callable[[], Tuple[bytes, bytes]]] = None,
        quota_project_id: Optional[str] = None,
        client_info: gapic_v1.client_info.ClientInfo = DEFAULT_CLIENT_INFO,
        always_use_jwt_access: Optional[bool] = False,
        api_audience: Optional[str] = None,
    ) -> None:
        """Instantiate the transport.

        Args:
            host (Optional[str]):
                The hostname to connect to (default: 'bigquerystorage.googleapis.com').
            credentials (Optional[google.auth.credentials.Credentials]): The
                authorization credentials to attach to requests. These
                credentials identify the application to the service; if none
                are specified, the client will attempt to ascertain the
                credentials from the environment.
                This argument is ignored if a ``channel`` instance is provided.
            credentials_file (Optional[str]): A file with credentials that can
                be loaded with :func:`google.auth.load_credentials_from_file`.
                This argument is ignored if a ``channel`` instance is provided.
            scopes (Optional[Sequence[str]]): A list of scopes. This argument is
                ignored if a ``channel`` instance is provided.
            channel (Optional[Union[grpc.Channel, Callable[..., grpc.Channel]]]):
                A ``Channel`` instance through which to make calls, or a Callable
                that constructs and returns one. If set to None, ``self.create_channel``
                is used to create the channel. If a Callable is given, it will be called
                with the same arguments as used in ``self.create_channel``.
            api_mtls_endpoint (Optional[str]): Deprecated. The mutual TLS endpoint.
                If provided, it overrides the ``host`` argument and tries to create
                a mutual TLS channel with client SSL credentials from
                ``client_cert_source`` or application default SSL credentials.
            client_cert_source (Optional[Callable[[], Tuple[bytes, bytes]]]):
                Deprecated. A callback to provide client SSL certificate bytes and
                private key bytes, both in PEM format. It is ignored if
                ``api_mtls_endpoint`` is None.
            ssl_channel_credentials (grpc.ChannelCredentials): SSL credentials
                for the grpc channel. It is ignored if a ``channel`` instance is provided.
            client_cert_source_for_mtls (Optional[Callable[[], Tuple[bytes, bytes]]]):
                A callback to provide client certificate bytes and private key bytes,
                both in PEM format. It is used to configure a mutual TLS channel. It is
                ignored if a ``channel`` instance or ``ssl_channel_credentials`` is provided.
            quota_project_id (Optional[str]): An optional project to use for billing
                and quota.
            client_info (google.api_core.gapic_v1.client_info.ClientInfo):
                The client info used to send a user-agent string along with
                API requests. If ``None``, then default info will be used.
                Generally, you only need to set this if you're developing
                your own client library.
            always_use_jwt_access (Optional[bool]): Whether a self-signed JWT should
                be used for service account credentials.

        Raises:
            google.auth.exceptions.MutualTLSChannelError: If mutual TLS transport
                creation failed for any reason.
            google.api_core.exceptions.DuplicateCredentialArgs: If both ``credentials``
                and ``credentials_file`` are passed.
        """
        self._grpc_channel = None
        self._ssl_channel_credentials = ssl_channel_credentials
        self._stubs: Dict[str, Callable] = {}

        if api_mtls_endpoint:
            warnings.warn("api_mtls_endpoint is deprecated", DeprecationWarning)
        if client_cert_source:
            warnings.warn("client_cert_source is deprecated", DeprecationWarning)

        if isinstance(channel, grpc.Channel):
            # Ignore credentials if a channel was passed.
            credentials = None
            self._ignore_credentials = True
            # If a channel was explicitly provided, set it.
            self._grpc_channel = channel
            self._ssl_channel_credentials = None

        else:
            if api_mtls_endpoint:
                host = api_mtls_endpoint

                # Create SSL credentials with client_cert_source or application
                # default SSL credentials.
                if client_cert_source:
                    cert, key = client_cert_source()
                    self._ssl_channel_credentials = grpc.ssl_channel_credentials(
                        certificate_chain=cert, private_key=key
                    )
                else:
                    self._ssl_channel_credentials = SslCredentials().ssl_credentials

            else:
                if client_cert_source_for_mtls and not ssl_channel_credentials:
                    cert, key = client_cert_source_for_mtls()
                    self._ssl_channel_credentials = grpc.ssl_channel_credentials(
                        certificate_chain=cert, private_key=key
                    )

        # The base transport sets the host, credentials and scopes.
        super().__init__(
            host=host,
            credentials=credentials,
            credentials_file=credentials_file,
            scopes=scopes,
            quota_project_id=quota_project_id,
            client_info=client_info,
            always_use_jwt_access=always_use_jwt_access,
            api_audience=api_audience,
        )

        if not self._grpc_channel:
            # Initialize with the provided callable or the default channel.
            channel_init = channel or type(self).create_channel
            self._grpc_channel = channel_init(
                self._host,
                # Use the credentials which are saved.
                credentials=self._credentials,
                # Set ``credentials_file`` to ``None`` here as
                # the credentials that we saved earlier should be used.
                credentials_file=None,
                scopes=self._scopes,
                ssl_credentials=self._ssl_channel_credentials,
                quota_project_id=quota_project_id,
                options=[
                    ("grpc.max_send_message_length", -1),
                    ("grpc.max_receive_message_length", -1),
                ],
            )

        self._interceptor = _LoggingClientInterceptor()
        self._logged_channel = grpc.intercept_channel(
            self._grpc_channel, self._interceptor
        )

        # Wrap messages. This must be done after self._logged_channel exists.
        self._prep_wrapped_messages(client_info)
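
    # NOTE (editor): a minimal, hypothetical construction sketch. Applications
    # normally obtain this transport through ``BigQueryWriteClient`` rather than
    # instantiating it directly; the argument values are illustrative.
    #
    #     transport = BigQueryWriteGrpcTransport(
    #         host="bigquerystorage.googleapis.com",
    #         credentials=None,  # resolved from the environment via google.auth
    #     )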

    @classmethod
    def create_channel(
        cls,
        host: str = "bigquerystorage.googleapis.com",
        credentials: Optional[ga_credentials.Credentials] = None,
        credentials_file: Optional[str] = None,
        scopes: Optional[Sequence[str]] = None,
        quota_project_id: Optional[str] = None,
        **kwargs,
    ) -> grpc.Channel:
        """Create and return a gRPC channel object.

        Args:
            host (Optional[str]): The host for the channel to use.
            credentials (Optional[~.Credentials]): The
                authorization credentials to attach to requests. These
                credentials identify this application to the service. If
                none are specified, the client will attempt to ascertain
                the credentials from the environment.
            credentials_file (Optional[str]): A file with credentials that can
                be loaded with :func:`google.auth.load_credentials_from_file`.
                This argument is mutually exclusive with ``credentials``.
            scopes (Optional[Sequence[str]]): An optional list of scopes needed for this
                service. These are only used when credentials are not specified and
                are passed to :func:`google.auth.default`.
            quota_project_id (Optional[str]): An optional project to use for billing
                and quota.
            kwargs (Optional[dict]): Keyword arguments, which are passed to the
                channel creation.

        Returns:
            grpc.Channel: A gRPC channel object.

        Raises:
            google.api_core.exceptions.DuplicateCredentialArgs: If both ``credentials``
                and ``credentials_file`` are passed.
        """

        return grpc_helpers.create_channel(
            host,
            credentials=credentials,
            credentials_file=credentials_file,
            quota_project_id=quota_project_id,
            default_scopes=cls.AUTH_SCOPES,
            scopes=scopes,
            default_host=cls.DEFAULT_HOST,
            **kwargs,
        )
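
    # NOTE (editor): a minimal, hypothetical sketch of building a channel up front
    # and handing it to the constructor; the quota project value is illustrative.
    # Per ``__init__`` above, credentials are ignored when a ready-made ``channel``
    # instance is passed.
    #
    #     channel = BigQueryWriteGrpcTransport.create_channel(
    #         "bigquerystorage.googleapis.com",
    #         quota_project_id="my-billing-project",
    #     )
    #     transport = BigQueryWriteGrpcTransport(channel=channel)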

    @property
    def grpc_channel(self) -> grpc.Channel:
        """Return the channel designed to connect to this service."""
        return self._grpc_channel

    @property
    def create_write_stream(
        self,
    ) -> Callable[[storage.CreateWriteStreamRequest], stream.WriteStream]:
        r"""Return a callable for the create write stream method over gRPC.

        Creates a write stream to the given table. Additionally, every
        table has a special stream named '_default' to which data can be
        written. This stream doesn't need to be created using
        CreateWriteStream. It is a stream that can be used
        simultaneously by any number of clients. Data written to this
        stream is considered committed as soon as an acknowledgement is
        received.

        Returns:
            Callable[[~.CreateWriteStreamRequest],
                    ~.WriteStream]:
                A function that, when called, will call the underlying RPC
                on the server.
        """
        # Generate a "stub function" on-the-fly which will actually make
        # the request.
        # gRPC handles serialization and deserialization, so we just need
        # to pass in the functions for each.
        if "create_write_stream" not in self._stubs:
            self._stubs["create_write_stream"] = self._logged_channel.unary_unary(
                "/google.cloud.bigquery.storage.v1.BigQueryWrite/CreateWriteStream",
                request_serializer=storage.CreateWriteStreamRequest.serialize,
                response_deserializer=stream.WriteStream.deserialize,
            )
        return self._stubs["create_write_stream"]
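
    # NOTE (editor): a minimal, hypothetical sketch of invoking this stub directly;
    # the project/dataset/table path is illustrative.
    #
    #     request = storage.CreateWriteStreamRequest(
    #         parent="projects/my-project/datasets/my_dataset/tables/my_table",
    #         write_stream=stream.WriteStream(type_=stream.WriteStream.Type.PENDING),
    #     )
    #     write_stream = transport.create_write_stream(request)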

    @property
    def append_rows(
        self,
    ) -> Callable[[storage.AppendRowsRequest], storage.AppendRowsResponse]:
        r"""Return a callable for the append rows method over gRPC.

        Appends data to the given stream.

        If ``offset`` is specified, the ``offset`` is checked against
        the end of stream. The server returns ``OUT_OF_RANGE`` in
        ``AppendRowsResponse`` if an attempt is made to append to an
        offset beyond the current end of the stream, or
        ``ALREADY_EXISTS`` if the user provides an ``offset`` that has
        already been written to. The user can retry with an adjusted
        offset within the same RPC connection. If ``offset`` is not
        specified, the append happens at the end of the stream.

        The response contains an optional offset at which the append
        happened. No offset information will be returned for appends to
        a default stream.

        Responses are received in the same order in which requests are
        sent. There will be one response for each successfully inserted
        request. Responses may optionally embed error information if the
        originating AppendRequest was not successfully processed.

        The specifics of when successfully appended data is made visible
        to the table are governed by the type of stream:

        - For COMMITTED streams (which includes the default stream),
          data is visible immediately upon successful append.

        - For BUFFERED streams, data is made visible via a subsequent
          ``FlushRows`` rpc which advances a cursor to a newer offset
          in the stream.

        - For PENDING streams, data is not made visible until the
          stream itself is finalized (via the ``FinalizeWriteStream``
          rpc), and the stream is explicitly committed via the
          ``BatchCommitWriteStreams`` rpc.

        Returns:
            Callable[[~.AppendRowsRequest],
                    ~.AppendRowsResponse]:
                A function that, when called, will call the underlying RPC
                on the server.
        """
        # Generate a "stub function" on-the-fly which will actually make
        # the request.
        # gRPC handles serialization and deserialization, so we just need
        # to pass in the functions for each.
        if "append_rows" not in self._stubs:
            self._stubs["append_rows"] = self._logged_channel.stream_stream(
                "/google.cloud.bigquery.storage.v1.BigQueryWrite/AppendRows",
                request_serializer=storage.AppendRowsRequest.serialize,
                response_deserializer=storage.AppendRowsResponse.deserialize,
            )
        return self._stubs["append_rows"]
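
    # NOTE (editor): a minimal, hypothetical sketch of the bidirectional-streaming
    # shape of this stub: it takes an iterator of requests and yields responses.
    # It assumes a ``write_stream`` obtained as in the CreateWriteStream sketch
    # above; building a valid ``proto_rows`` payload (writer schema plus
    # serialized rows) is elided.
    #
    #     def requests():
    #         yield storage.AppendRowsRequest(
    #             write_stream=write_stream.name,
    #             # proto_rows=...  (serialized rows and writer schema)
    #         )
    #
    #     for response in transport.append_rows(requests()):
    #         print(response.append_result.offset)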

    @property
    def get_write_stream(
        self,
    ) -> Callable[[storage.GetWriteStreamRequest], stream.WriteStream]:
        r"""Return a callable for the get write stream method over gRPC.

        Gets information about a write stream.

        Returns:
            Callable[[~.GetWriteStreamRequest],
                    ~.WriteStream]:
                A function that, when called, will call the underlying RPC
                on the server.
        """
        # Generate a "stub function" on-the-fly which will actually make
        # the request.
        # gRPC handles serialization and deserialization, so we just need
        # to pass in the functions for each.
        if "get_write_stream" not in self._stubs:
            self._stubs["get_write_stream"] = self._logged_channel.unary_unary(
                "/google.cloud.bigquery.storage.v1.BigQueryWrite/GetWriteStream",
                request_serializer=storage.GetWriteStreamRequest.serialize,
                response_deserializer=stream.WriteStream.deserialize,
            )
        return self._stubs["get_write_stream"]

    @property
    def finalize_write_stream(
        self,
    ) -> Callable[
        [storage.FinalizeWriteStreamRequest], storage.FinalizeWriteStreamResponse
    ]:
        r"""Return a callable for the finalize write stream method over gRPC.

        Finalize a write stream so that no new data can be appended to
        the stream. Finalize is not supported on the '_default' stream.

        Returns:
            Callable[[~.FinalizeWriteStreamRequest],
                    ~.FinalizeWriteStreamResponse]:
                A function that, when called, will call the underlying RPC
                on the server.
        """
        # Generate a "stub function" on-the-fly which will actually make
        # the request.
        # gRPC handles serialization and deserialization, so we just need
        # to pass in the functions for each.
        if "finalize_write_stream" not in self._stubs:
            self._stubs["finalize_write_stream"] = self._logged_channel.unary_unary(
                "/google.cloud.bigquery.storage.v1.BigQueryWrite/FinalizeWriteStream",
                request_serializer=storage.FinalizeWriteStreamRequest.serialize,
                response_deserializer=storage.FinalizeWriteStreamResponse.deserialize,
            )
        return self._stubs["finalize_write_stream"]

    @property
    def batch_commit_write_streams(
        self,
    ) -> Callable[
        [storage.BatchCommitWriteStreamsRequest],
        storage.BatchCommitWriteStreamsResponse,
    ]:
        r"""Return a callable for the batch commit write streams method over gRPC.

        Atomically commits a group of ``PENDING`` streams that belong to
        the same ``parent`` table.

        Streams must be finalized before commit and cannot be committed
        multiple times. Once a stream is committed, data in the stream
        becomes available for read operations.

        Returns:
            Callable[[~.BatchCommitWriteStreamsRequest],
                    ~.BatchCommitWriteStreamsResponse]:
                A function that, when called, will call the underlying RPC
                on the server.
        """
        # Generate a "stub function" on-the-fly which will actually make
        # the request.
        # gRPC handles serialization and deserialization, so we just need
        # to pass in the functions for each.
        if "batch_commit_write_streams" not in self._stubs:
            self._stubs[
                "batch_commit_write_streams"
            ] = self._logged_channel.unary_unary(
                "/google.cloud.bigquery.storage.v1.BigQueryWrite/BatchCommitWriteStreams",
                request_serializer=storage.BatchCommitWriteStreamsRequest.serialize,
                response_deserializer=storage.BatchCommitWriteStreamsResponse.deserialize,
            )
        return self._stubs["batch_commit_write_streams"]
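
    # NOTE (editor): a minimal, hypothetical sketch of the PENDING-stream workflow
    # these two stubs complete: finalize each stream, then commit the group
    # atomically. ``write_stream.name`` and the parent table path are illustrative.
    #
    #     transport.finalize_write_stream(
    #         storage.FinalizeWriteStreamRequest(name=write_stream.name)
    #     )
    #     commit_response = transport.batch_commit_write_streams(
    #         storage.BatchCommitWriteStreamsRequest(
    #             parent="projects/my-project/datasets/my_dataset/tables/my_table",
    #             write_streams=[write_stream.name],
    #         )
    #     )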

    @property
    def flush_rows(
        self,
    ) -> Callable[[storage.FlushRowsRequest], storage.FlushRowsResponse]:
        r"""Return a callable for the flush rows method over gRPC.

        Flushes rows to a BUFFERED stream.

        If users are appending rows to a BUFFERED stream, a flush
        operation is required in order for the rows to become available
        for reading. A flush operation flushes up to any previously
        flushed offset in a BUFFERED stream, to the offset specified in
        the request.

        Flush is not supported on the \_default stream, since it is not
        BUFFERED.

        Returns:
            Callable[[~.FlushRowsRequest],
                    ~.FlushRowsResponse]:
                A function that, when called, will call the underlying RPC
                on the server.
        """
        # Generate a "stub function" on-the-fly which will actually make
        # the request.
        # gRPC handles serialization and deserialization, so we just need
        # to pass in the functions for each.
        if "flush_rows" not in self._stubs:
            self._stubs["flush_rows"] = self._logged_channel.unary_unary(
                "/google.cloud.bigquery.storage.v1.BigQueryWrite/FlushRows",
                request_serializer=storage.FlushRowsRequest.serialize,
                response_deserializer=storage.FlushRowsResponse.deserialize,
            )
        return self._stubs["flush_rows"]
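
    # NOTE (editor): a minimal, hypothetical sketch of flushing a BUFFERED stream
    # up to a known offset. The offset value is illustrative, and the plain-int
    # assignment assumes proto-plus's coercion of the wrapper-typed ``offset``
    # field to a Python int.
    #
    #     flush_response = transport.flush_rows(
    #         storage.FlushRowsRequest(
    #             write_stream=write_stream.name,
    #             offset=100,
    #         )
    #     )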

    def close(self):
        self._logged_channel.close()

    @property
    def kind(self) -> str:
        return "grpc"


__all__ = ("BigQueryWriteGrpcTransport",)