Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/google/cloud/bigquery_storage_v1/services/big_query_write/transports/grpc.py: 41%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

78 statements  

1# -*- coding: utf-8 -*- 

2# Copyright 2024 Google LLC 

3# 

4# Licensed under the Apache License, Version 2.0 (the "License"); 

5# you may not use this file except in compliance with the License. 

6# You may obtain a copy of the License at 

7# 

8# http://www.apache.org/licenses/LICENSE-2.0 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, 

12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

13# See the License for the specific language governing permissions and 

14# limitations under the License. 

15# 

16from typing import Callable, Dict, Optional, Sequence, Tuple, Union 

17import warnings 

18 

19from google.api_core import gapic_v1, grpc_helpers 

20import google.auth # type: ignore 

21from google.auth import credentials as ga_credentials # type: ignore 

22from google.auth.transport.grpc import SslCredentials # type: ignore 

23import grpc # type: ignore 

24 

25from google.cloud.bigquery_storage_v1.types import storage, stream 

26 

27from .base import DEFAULT_CLIENT_INFO, BigQueryWriteTransport 

28 

29 

class BigQueryWriteGrpcTransport(BigQueryWriteTransport):
    """gRPC backend transport for BigQueryWrite.

    BigQuery Write API.

    The Write API can be used to write data to BigQuery.

    For supplementary information about the Write API, see:

    https://cloud.google.com/bigquery/docs/write-api

    This class exposes the same methods as the primary client, which lets
    the primary client load this transport implementation and delegate to
    it.

    Protocol buffers are sent over the wire using gRPC (which is built on
    top of HTTP/2); the ``grpcio`` package must be installed.
    """

    # Lazily created gRPC stub callables, keyed by RPC method name.
    _stubs: Dict[str, Callable]

    def __init__(
        self,
        *,
        host: str = "bigquerystorage.googleapis.com",
        credentials: Optional[ga_credentials.Credentials] = None,
        credentials_file: Optional[str] = None,
        scopes: Optional[Sequence[str]] = None,
        channel: Optional[grpc.Channel] = None,
        api_mtls_endpoint: Optional[str] = None,
        client_cert_source: Optional[Callable[[], Tuple[bytes, bytes]]] = None,
        ssl_channel_credentials: Optional[grpc.ChannelCredentials] = None,
        client_cert_source_for_mtls: Optional[Callable[[], Tuple[bytes, bytes]]] = None,
        quota_project_id: Optional[str] = None,
        client_info: gapic_v1.client_info.ClientInfo = DEFAULT_CLIENT_INFO,
        always_use_jwt_access: Optional[bool] = False,
        api_audience: Optional[str] = None,
    ) -> None:
        """Instantiate the transport.

        Args:
            host (Optional[str]):
                The hostname to connect to
                (default: 'bigquerystorage.googleapis.com').
            credentials (Optional[google.auth.credentials.Credentials]): The
                authorization credentials to attach to requests. These
                credentials identify the application to the service; if none
                are specified, the client will attempt to ascertain the
                credentials from the environment.
                This argument is ignored if ``channel`` is provided.
            credentials_file (Optional[str]): A file with credentials that can
                be loaded with :func:`google.auth.load_credentials_from_file`.
                This argument is ignored if ``channel`` is provided.
            scopes (Optional[Sequence[str]]): A list of scopes. This argument
                is ignored if ``channel`` is provided.
            channel (Optional[grpc.Channel]): A ``Channel`` instance through
                which to make calls.
            api_mtls_endpoint (Optional[str]): Deprecated. The mutual TLS
                endpoint. If provided, it overrides the ``host`` argument and
                tries to create a mutual TLS channel with client SSL
                credentials from ``client_cert_source`` or application default
                SSL credentials.
            client_cert_source (Optional[Callable[[], Tuple[bytes, bytes]]]):
                Deprecated. A callback to provide client SSL certificate bytes
                and private key bytes, both in PEM format. It is ignored if
                ``api_mtls_endpoint`` is None.
            ssl_channel_credentials (grpc.ChannelCredentials): SSL credentials
                for the grpc channel. It is ignored if ``channel`` is provided.
            client_cert_source_for_mtls (Optional[Callable[[], Tuple[bytes, bytes]]]):
                A callback to provide client certificate bytes and private key
                bytes, both in PEM format. It is used to configure a mutual
                TLS channel. It is ignored if ``channel`` or
                ``ssl_channel_credentials`` is provided.
            quota_project_id (Optional[str]): An optional project to use for
                billing and quota.
            client_info (google.api_core.gapic_v1.client_info.ClientInfo):
                The client info used to send a user-agent string along with
                API requests. If ``None``, then default info will be used.
                Generally, you only need to set this if you're developing
                your own client library.
            always_use_jwt_access (Optional[bool]): Whether self signed JWT
                should be used for service account credentials.
            api_audience (Optional[str]): Forwarded unchanged to the base
                transport's constructor.

        Raises:
            google.auth.exceptions.MutualTLSChannelError: If mutual TLS
                transport creation failed for any reason.
            google.api_core.exceptions.DuplicateCredentialArgs: If both
                ``credentials`` and ``credentials_file`` are passed.
        """
        self._grpc_channel = None
        self._ssl_channel_credentials = ssl_channel_credentials
        self._stubs: Dict[str, Callable] = {}

        # The legacy mTLS arguments are still honoured, but callers are
        # warned that they are deprecated.
        if api_mtls_endpoint:
            warnings.warn("api_mtls_endpoint is deprecated", DeprecationWarning)
        if client_cert_source:
            warnings.warn("client_cert_source is deprecated", DeprecationWarning)

        if channel:
            # A caller-supplied channel wins: use it as-is and drop any SSL
            # credentials. ``credentials`` is replaced by the falsy sentinel
            # ``False`` (not ``None``) before reaching the base transport.
            # NOTE(review): presumably this keeps the base class from
            # resolving default credentials - confirm against base.py.
            credentials = False
            self._grpc_channel = channel
            self._ssl_channel_credentials = None
        elif api_mtls_endpoint:
            # Deprecated path: the mTLS endpoint replaces ``host``.
            host = api_mtls_endpoint

            # Take SSL credentials from the deprecated callback when given,
            # otherwise fall back to application default SSL credentials.
            if client_cert_source:
                cert_bytes, key_bytes = client_cert_source()
                self._ssl_channel_credentials = grpc.ssl_channel_credentials(
                    certificate_chain=cert_bytes, private_key=key_bytes
                )
            else:
                self._ssl_channel_credentials = SslCredentials().ssl_credentials
        elif client_cert_source_for_mtls and not ssl_channel_credentials:
            # Explicit ``ssl_channel_credentials`` take precedence over the
            # certificate callback, so the callback is used only without them.
            cert_bytes, key_bytes = client_cert_source_for_mtls()
            self._ssl_channel_credentials = grpc.ssl_channel_credentials(
                certificate_chain=cert_bytes, private_key=key_bytes
            )

        # The base transport sets the host, credentials and scopes.
        super().__init__(
            host=host,
            credentials=credentials,
            credentials_file=credentials_file,
            scopes=scopes,
            quota_project_id=quota_project_id,
            client_info=client_info,
            always_use_jwt_access=always_use_jwt_access,
            api_audience=api_audience,
        )

        if not self._grpc_channel:
            # -1 is gRPC's "no limit" value for these channel arguments.
            message_size_options = [
                ("grpc.max_send_message_length", -1),
                ("grpc.max_receive_message_length", -1),
            ]
            self._grpc_channel = type(self).create_channel(
                self._host,
                # Reuse the credentials saved by the base transport;
                # ``credentials_file`` is therefore deliberately ``None``.
                credentials=self._credentials,
                credentials_file=None,
                scopes=self._scopes,
                ssl_credentials=self._ssl_channel_credentials,
                quota_project_id=quota_project_id,
                options=message_size_options,
            )

        # Wrap messages. This must be done after self._grpc_channel exists.
        self._prep_wrapped_messages(client_info)

    @classmethod
    def create_channel(
        cls,
        host: str = "bigquerystorage.googleapis.com",
        credentials: Optional[ga_credentials.Credentials] = None,
        credentials_file: Optional[str] = None,
        scopes: Optional[Sequence[str]] = None,
        quota_project_id: Optional[str] = None,
        **kwargs,
    ) -> grpc.Channel:
        """Create and return a gRPC channel object.

        Args:
            host (Optional[str]): The host for the channel to use.
            credentials (Optional[~.Credentials]): The
                authorization credentials to attach to requests. These
                credentials identify this application to the service. If
                none are specified, the client will attempt to ascertain
                the credentials from the environment.
            credentials_file (Optional[str]): A file with credentials that can
                be loaded with :func:`google.auth.load_credentials_from_file`.
                This argument is mutually exclusive with credentials.
            scopes (Optional[Sequence[str]]): An optional list of scopes
                needed for this service. These are only used when credentials
                are not specified and are passed to
                :func:`google.auth.default`.
            quota_project_id (Optional[str]): An optional project to use for
                billing and quota.
            kwargs (Optional[dict]): Keyword arguments, which are passed to
                the channel creation.

        Returns:
            grpc.Channel: A gRPC channel object.

        Raises:
            google.api_core.exceptions.DuplicateCredentialArgs: If both
                ``credentials`` and ``credentials_file`` are passed.
        """
        # Delegate to api_core, adding this transport's class-level default
        # scopes and default host to the caller's arguments.
        return grpc_helpers.create_channel(
            host,
            credentials=credentials,
            credentials_file=credentials_file,
            scopes=scopes,
            default_scopes=cls.AUTH_SCOPES,
            default_host=cls.DEFAULT_HOST,
            quota_project_id=quota_project_id,
            **kwargs,
        )

    @property
    def grpc_channel(self) -> grpc.Channel:
        """Return the channel designed to connect to this service."""
        return self._grpc_channel

    @property
    def create_write_stream(
        self,
    ) -> Callable[[storage.CreateWriteStreamRequest], stream.WriteStream]:
        r"""Return a callable for the create write stream method over gRPC.

        Creates a write stream to the given table. Additionally, every
        table has a special stream named '_default' to which data can be
        written. This stream doesn't need to be created using
        CreateWriteStream. It is a stream that can be used
        simultaneously by any number of clients. Data written to this
        stream is considered committed as soon as an acknowledgement is
        received.

        Returns:
            Callable[[~.CreateWriteStreamRequest],
                    ~.WriteStream]:
                A function that, when called, will call the underlying RPC
                on the server.
        """
        # The stub is built on first access and cached afterwards. gRPC
        # handles serialization and deserialization, so only the functions
        # for each need to be passed in.
        stub_cache = self._stubs
        if "create_write_stream" in stub_cache:
            return stub_cache["create_write_stream"]

        stub_cache["create_write_stream"] = self.grpc_channel.unary_unary(
            "/google.cloud.bigquery.storage.v1.BigQueryWrite/CreateWriteStream",
            request_serializer=storage.CreateWriteStreamRequest.serialize,
            response_deserializer=stream.WriteStream.deserialize,
        )
        return stub_cache["create_write_stream"]

    @property
    def append_rows(
        self,
    ) -> Callable[[storage.AppendRowsRequest], storage.AppendRowsResponse]:
        r"""Return a callable for the append rows method over gRPC.

        Appends data to the given stream.

        If ``offset`` is specified, the ``offset`` is checked against
        the end of stream. The server returns ``OUT_OF_RANGE`` in
        ``AppendRowsResponse`` if an attempt is made to append to an
        offset beyond the current end of the stream or
        ``ALREADY_EXISTS`` if user provides an ``offset`` that has
        already been written to. User can retry with adjusted offset
        within the same RPC connection. If ``offset`` is not specified,
        append happens at the end of the stream.

        The response contains an optional offset at which the append
        happened. No offset information will be returned for appends to
        a default stream.

        Responses are received in the same order in which requests are
        sent. There will be one response for each successful inserted
        request. Responses may optionally embed error information if the
        originating AppendRequest was not successfully processed.

        The specifics of when successfully appended data is made visible
        to the table are governed by the type of stream:

        -  For COMMITTED streams (which includes the default stream),
           data is visible immediately upon successful append.

        -  For BUFFERED streams, data is made visible via a subsequent
           ``FlushRows`` rpc which advances a cursor to a newer offset
           in the stream.

        -  For PENDING streams, data is not made visible until the
           stream itself is finalized (via the ``FinalizeWriteStream``
           rpc), and the stream is explicitly committed via the
           ``BatchCommitWriteStreams`` rpc.

        Returns:
            Callable[[~.AppendRowsRequest],
                    ~.AppendRowsResponse]:
                A function that, when called, will call the underlying RPC
                on the server.
        """
        # The stub is built on first access and cached afterwards. This is
        # the only RPC here registered as stream-stream rather than
        # unary-unary.
        stub_cache = self._stubs
        if "append_rows" in stub_cache:
            return stub_cache["append_rows"]

        stub_cache["append_rows"] = self.grpc_channel.stream_stream(
            "/google.cloud.bigquery.storage.v1.BigQueryWrite/AppendRows",
            request_serializer=storage.AppendRowsRequest.serialize,
            response_deserializer=storage.AppendRowsResponse.deserialize,
        )
        return stub_cache["append_rows"]

    @property
    def get_write_stream(
        self,
    ) -> Callable[[storage.GetWriteStreamRequest], stream.WriteStream]:
        r"""Return a callable for the get write stream method over gRPC.

        Gets information about a write stream.

        Returns:
            Callable[[~.GetWriteStreamRequest],
                    ~.WriteStream]:
                A function that, when called, will call the underlying RPC
                on the server.
        """
        # The stub is built on first access and cached afterwards. gRPC
        # handles serialization and deserialization, so only the functions
        # for each need to be passed in.
        stub_cache = self._stubs
        if "get_write_stream" in stub_cache:
            return stub_cache["get_write_stream"]

        stub_cache["get_write_stream"] = self.grpc_channel.unary_unary(
            "/google.cloud.bigquery.storage.v1.BigQueryWrite/GetWriteStream",
            request_serializer=storage.GetWriteStreamRequest.serialize,
            response_deserializer=stream.WriteStream.deserialize,
        )
        return stub_cache["get_write_stream"]

    @property
    def finalize_write_stream(
        self,
    ) -> Callable[
        [storage.FinalizeWriteStreamRequest], storage.FinalizeWriteStreamResponse
    ]:
        r"""Return a callable for the finalize write stream method over gRPC.

        Finalize a write stream so that no new data can be appended to
        the stream. Finalize is not supported on the '_default' stream.

        Returns:
            Callable[[~.FinalizeWriteStreamRequest],
                    ~.FinalizeWriteStreamResponse]:
                A function that, when called, will call the underlying RPC
                on the server.
        """
        # The stub is built on first access and cached afterwards. gRPC
        # handles serialization and deserialization, so only the functions
        # for each need to be passed in.
        stub_cache = self._stubs
        if "finalize_write_stream" in stub_cache:
            return stub_cache["finalize_write_stream"]

        stub_cache["finalize_write_stream"] = self.grpc_channel.unary_unary(
            "/google.cloud.bigquery.storage.v1.BigQueryWrite/FinalizeWriteStream",
            request_serializer=storage.FinalizeWriteStreamRequest.serialize,
            response_deserializer=storage.FinalizeWriteStreamResponse.deserialize,
        )
        return stub_cache["finalize_write_stream"]

    @property
    def batch_commit_write_streams(
        self,
    ) -> Callable[
        [storage.BatchCommitWriteStreamsRequest],
        storage.BatchCommitWriteStreamsResponse,
    ]:
        r"""Return a callable for the batch commit write streams method over gRPC.

        Atomically commits a group of ``PENDING`` streams that belong to
        the same ``parent`` table.

        Streams must be finalized before commit and cannot be committed
        multiple times. Once a stream is committed, data in the stream
        becomes available for read operations.

        Returns:
            Callable[[~.BatchCommitWriteStreamsRequest],
                    ~.BatchCommitWriteStreamsResponse]:
                A function that, when called, will call the underlying RPC
                on the server.
        """
        # The stub is built on first access and cached afterwards. gRPC
        # handles serialization and deserialization, so only the functions
        # for each need to be passed in.
        stub_cache = self._stubs
        if "batch_commit_write_streams" in stub_cache:
            return stub_cache["batch_commit_write_streams"]

        stub_cache["batch_commit_write_streams"] = self.grpc_channel.unary_unary(
            "/google.cloud.bigquery.storage.v1.BigQueryWrite/BatchCommitWriteStreams",
            request_serializer=storage.BatchCommitWriteStreamsRequest.serialize,
            response_deserializer=storage.BatchCommitWriteStreamsResponse.deserialize,
        )
        return stub_cache["batch_commit_write_streams"]

    @property
    def flush_rows(
        self,
    ) -> Callable[[storage.FlushRowsRequest], storage.FlushRowsResponse]:
        r"""Return a callable for the flush rows method over gRPC.

        Flushes rows to a BUFFERED stream.

        If users are appending rows to BUFFERED stream, flush operation
        is required in order for the rows to become available for
        reading. A Flush operation flushes up to any previously flushed
        offset in a BUFFERED stream, to the offset specified in the
        request.

        Flush is not supported on the \_default stream, since it is not
        BUFFERED.

        Returns:
            Callable[[~.FlushRowsRequest],
                    ~.FlushRowsResponse]:
                A function that, when called, will call the underlying RPC
                on the server.
        """
        # The stub is built on first access and cached afterwards. gRPC
        # handles serialization and deserialization, so only the functions
        # for each need to be passed in.
        stub_cache = self._stubs
        if "flush_rows" in stub_cache:
            return stub_cache["flush_rows"]

        stub_cache["flush_rows"] = self.grpc_channel.unary_unary(
            "/google.cloud.bigquery.storage.v1.BigQueryWrite/FlushRows",
            request_serializer=storage.FlushRowsRequest.serialize,
            response_deserializer=storage.FlushRowsResponse.deserialize,
        )
        return stub_cache["flush_rows"]

    def close(self):
        """Close the gRPC channel used by this transport."""
        active_channel = self.grpc_channel
        active_channel.close()

    @property
    def kind(self) -> str:
        """Return the identifier of this transport flavour: ``"grpc"``."""
        return "grpc"

458 

459 

# Public API of this module: only the gRPC transport class is exported.
__all__ = ("BigQueryWriteGrpcTransport",)