Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/google/cloud/bigquery_storage_v1/services/big_query_write/transports/grpc.py: 42%

80 statements  

coverage.py v7.2.2, created at 2023-03-26 06:10 +0000

# -*- coding: utf-8 -*-
# Copyright 2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import warnings
from typing import Callable, Dict, Optional, Sequence, Tuple, Union

from google.api_core import grpc_helpers
from google.api_core import gapic_v1
import google.auth  # type: ignore
from google.auth import credentials as ga_credentials  # type: ignore
from google.auth.transport.grpc import SslCredentials  # type: ignore

import grpc  # type: ignore

from google.cloud.bigquery_storage_v1.types import storage
from google.cloud.bigquery_storage_v1.types import stream
from .base import BigQueryWriteTransport, DEFAULT_CLIENT_INFO



class BigQueryWriteGrpcTransport(BigQueryWriteTransport):
    """gRPC backend transport for BigQueryWrite.

    BigQuery Write API.
    The Write API can be used to write data to BigQuery.
    For supplementary information about the Write API, see:
    https://cloud.google.com/bigquery/docs/write-api

    This class defines the same methods as the primary client, so the
    primary client can load the underlying transport implementation
    and call it.

    It sends protocol buffers over the wire using gRPC (which is built on
    top of HTTP/2); the ``grpcio`` package must be installed.
    """

    _stubs: Dict[str, Callable]

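    # Illustrative sketch (not part of the generated source): the transport is
    # usually selected through the client rather than instantiated directly;
    # ``BigQueryWriteClient(transport="grpc")`` resolves to this class, but an
    # explicit instance can also be passed in:
    #
    #   from google.cloud.bigquery_storage_v1.services.big_query_write import (
    #       BigQueryWriteClient,
    #   )
    #
    #   transport = BigQueryWriteGrpcTransport(host="bigquerystorage.googleapis.com")
    #   client = BigQueryWriteClient(transport=transport)
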

    def __init__(
        self,
        *,
        host: str = "bigquerystorage.googleapis.com",
        credentials: Optional[ga_credentials.Credentials] = None,
        credentials_file: Optional[str] = None,
        scopes: Optional[Sequence[str]] = None,
        channel: Optional[grpc.Channel] = None,
        api_mtls_endpoint: Optional[str] = None,
        client_cert_source: Optional[Callable[[], Tuple[bytes, bytes]]] = None,
        ssl_channel_credentials: Optional[grpc.ChannelCredentials] = None,
        client_cert_source_for_mtls: Optional[Callable[[], Tuple[bytes, bytes]]] = None,
        quota_project_id: Optional[str] = None,
        client_info: gapic_v1.client_info.ClientInfo = DEFAULT_CLIENT_INFO,
        always_use_jwt_access: Optional[bool] = False,
        api_audience: Optional[str] = None,
    ) -> None:
        """Instantiate the transport.

        Args:
            host (Optional[str]):
                The hostname to connect to.
            credentials (Optional[google.auth.credentials.Credentials]): The
                authorization credentials to attach to requests. These
                credentials identify the application to the service; if none
                are specified, the client will attempt to ascertain the
                credentials from the environment.
                This argument is ignored if ``channel`` is provided.
            credentials_file (Optional[str]): A file with credentials that can
                be loaded with :func:`google.auth.load_credentials_from_file`.
                This argument is ignored if ``channel`` is provided.
            scopes (Optional[Sequence[str]]): A list of scopes. This argument is
                ignored if ``channel`` is provided.
            channel (Optional[grpc.Channel]): A ``Channel`` instance through
                which to make calls.
            api_mtls_endpoint (Optional[str]): Deprecated. The mutual TLS endpoint.
                If provided, it overrides the ``host`` argument and tries to create
                a mutual TLS channel with client SSL credentials from
                ``client_cert_source`` or application default SSL credentials.
            client_cert_source (Optional[Callable[[], Tuple[bytes, bytes]]]):
                Deprecated. A callback to provide client SSL certificate bytes and
                private key bytes, both in PEM format. It is ignored if
                ``api_mtls_endpoint`` is None.
            ssl_channel_credentials (grpc.ChannelCredentials): SSL credentials
                for the grpc channel. It is ignored if ``channel`` is provided.
            client_cert_source_for_mtls (Optional[Callable[[], Tuple[bytes, bytes]]]):
                A callback to provide client certificate bytes and private key bytes,
                both in PEM format. It is used to configure a mutual TLS channel. It is
                ignored if ``channel`` or ``ssl_channel_credentials`` is provided.
            quota_project_id (Optional[str]): An optional project to use for billing
                and quota.
            client_info (google.api_core.gapic_v1.client_info.ClientInfo):
                The client info used to send a user-agent string along with
                API requests. If ``None``, then default info will be used.
                Generally, you only need to set this if you're developing
                your own client library.
            always_use_jwt_access (Optional[bool]): Whether a self-signed JWT should
                be used for service account credentials.

        Raises:
            google.auth.exceptions.MutualTLSChannelError: If mutual TLS transport
                creation failed for any reason.
            google.api_core.exceptions.DuplicateCredentialArgs: If both ``credentials``
                and ``credentials_file`` are passed.
        """

        self._grpc_channel = None
        self._ssl_channel_credentials = ssl_channel_credentials
        self._stubs: Dict[str, Callable] = {}

        if api_mtls_endpoint:
            warnings.warn("api_mtls_endpoint is deprecated", DeprecationWarning)
        if client_cert_source:
            warnings.warn("client_cert_source is deprecated", DeprecationWarning)

        if channel:
            # Ignore credentials if a channel was passed.
            credentials = False
            # If a channel was explicitly provided, set it.
            self._grpc_channel = channel
            self._ssl_channel_credentials = None

        else:
            if api_mtls_endpoint:
                host = api_mtls_endpoint

                # Create SSL credentials with client_cert_source or application
                # default SSL credentials.
                if client_cert_source:
                    cert, key = client_cert_source()
                    self._ssl_channel_credentials = grpc.ssl_channel_credentials(
                        certificate_chain=cert, private_key=key
                    )
                else:
                    self._ssl_channel_credentials = SslCredentials().ssl_credentials

            else:
                if client_cert_source_for_mtls and not ssl_channel_credentials:
                    cert, key = client_cert_source_for_mtls()
                    self._ssl_channel_credentials = grpc.ssl_channel_credentials(
                        certificate_chain=cert, private_key=key
                    )

        # The base transport sets the host, credentials and scopes
        super().__init__(
            host=host,
            credentials=credentials,
            credentials_file=credentials_file,
            scopes=scopes,
            quota_project_id=quota_project_id,
            client_info=client_info,
            always_use_jwt_access=always_use_jwt_access,
            api_audience=api_audience,
        )

        if not self._grpc_channel:
            self._grpc_channel = type(self).create_channel(
                self._host,
                # use the credentials which are saved
                credentials=self._credentials,
                # Set ``credentials_file`` to ``None`` here as
                # the credentials that we saved earlier should be used.
                credentials_file=None,
                scopes=self._scopes,
                ssl_credentials=self._ssl_channel_credentials,
                quota_project_id=quota_project_id,
                options=[
                    ("grpc.max_send_message_length", -1),
                    ("grpc.max_receive_message_length", -1),
                ],
            )

        # Wrap messages. This must be done after self._grpc_channel exists
        self._prep_wrapped_messages(client_info)

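    # Illustrative sketch (not part of the generated source): a mutual-TLS
    # channel can be requested by supplying a certificate callback; the
    # ``read_cert_pair`` helper below is a hypothetical stand-in for whatever
    # produces the PEM-encoded certificate/key bytes.
    #
    #   def read_cert_pair() -> Tuple[bytes, bytes]:
    #       with open("client.pem", "rb") as c, open("client.key", "rb") as k:
    #           return c.read(), k.read()
    #
    #   transport = BigQueryWriteGrpcTransport(
    #       client_cert_source_for_mtls=read_cert_pair,
    #   )
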

    @classmethod
    def create_channel(
        cls,
        host: str = "bigquerystorage.googleapis.com",
        credentials: Optional[ga_credentials.Credentials] = None,
        credentials_file: Optional[str] = None,
        scopes: Optional[Sequence[str]] = None,
        quota_project_id: Optional[str] = None,
        **kwargs,
    ) -> grpc.Channel:
        """Create and return a gRPC channel object.
        Args:
            host (Optional[str]): The host for the channel to use.
            credentials (Optional[~.Credentials]): The
                authorization credentials to attach to requests. These
                credentials identify this application to the service. If
                none are specified, the client will attempt to ascertain
                the credentials from the environment.
            credentials_file (Optional[str]): A file with credentials that can
                be loaded with :func:`google.auth.load_credentials_from_file`.
                This argument is mutually exclusive with credentials.
            scopes (Optional[Sequence[str]]): An optional list of scopes needed for this
                service. These are only used when credentials are not specified and
                are passed to :func:`google.auth.default`.
            quota_project_id (Optional[str]): An optional project to use for billing
                and quota.
            kwargs (Optional[dict]): Keyword arguments, which are passed to the
                channel creation.
        Returns:
            grpc.Channel: A gRPC channel object.

        Raises:
            google.api_core.exceptions.DuplicateCredentialArgs: If both ``credentials``
                and ``credentials_file`` are passed.
        """


        return grpc_helpers.create_channel(
            host,
            credentials=credentials,
            credentials_file=credentials_file,
            quota_project_id=quota_project_id,
            default_scopes=cls.AUTH_SCOPES,
            scopes=scopes,
            default_host=cls.DEFAULT_HOST,
            **kwargs,
        )

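    # Illustrative sketch (not part of the generated source): ``create_channel``
    # can also be called directly when a caller wants to own the channel, for
    # example to tune gRPC channel options before handing it to the transport:
    #
    #   channel = BigQueryWriteGrpcTransport.create_channel(
    #       options=[("grpc.keepalive_time_ms", 30000)],
    #   )
    #   transport = BigQueryWriteGrpcTransport(channel=channel)
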

    @property
    def grpc_channel(self) -> grpc.Channel:
        """Return the channel designed to connect to this service."""
        return self._grpc_channel


    @property
    def create_write_stream(
        self,
    ) -> Callable[[storage.CreateWriteStreamRequest], stream.WriteStream]:
        r"""Return a callable for the create write stream method over gRPC.

        Creates a write stream to the given table. Additionally, every
        table has a special stream named '_default' to which data can be
        written. This stream doesn't need to be created using
        CreateWriteStream. It is a stream that can be used
        simultaneously by any number of clients. Data written to this
        stream is considered committed as soon as an acknowledgement is
        received.

        Returns:
            Callable[[~.CreateWriteStreamRequest],
                    ~.WriteStream]:
                A function that, when called, will call the underlying RPC
                on the server.
        """
        # Generate a "stub function" on-the-fly which will actually make
        # the request.
        # gRPC handles serialization and deserialization, so we just need
        # to pass in the functions for each.
        if "create_write_stream" not in self._stubs:
            self._stubs["create_write_stream"] = self.grpc_channel.unary_unary(
                "/google.cloud.bigquery.storage.v1.BigQueryWrite/CreateWriteStream",
                request_serializer=storage.CreateWriteStreamRequest.serialize,
                response_deserializer=stream.WriteStream.deserialize,
            )
        return self._stubs["create_write_stream"]


    @property
    def append_rows(
        self,
    ) -> Callable[[storage.AppendRowsRequest], storage.AppendRowsResponse]:
        r"""Return a callable for the append rows method over gRPC.

        Appends data to the given stream.

        If ``offset`` is specified, the ``offset`` is checked against
        the end of stream. The server returns ``OUT_OF_RANGE`` in
        ``AppendRowsResponse`` if an attempt is made to append to an
        offset beyond the current end of the stream, or
        ``ALREADY_EXISTS`` if the user provides an ``offset`` that has
        already been written to. The user can retry with an adjusted
        offset within the same RPC connection. If ``offset`` is not
        specified, the append happens at the end of the stream.

        The response contains an optional offset at which the append
        happened. No offset information will be returned for appends to
        a default stream.

        Responses are received in the same order in which requests are
        sent. There will be one response for each successfully inserted
        request. Responses may optionally embed error information if the
        originating ``AppendRowsRequest`` was not successfully processed.

        The specifics of when successfully appended data is made visible
        to the table are governed by the type of stream:

        - For COMMITTED streams (which includes the default stream),
          data is visible immediately upon successful append.

        - For BUFFERED streams, data is made visible via a subsequent
          ``FlushRows`` rpc which advances a cursor to a newer offset
          in the stream.

        - For PENDING streams, data is not made visible until the
          stream itself is finalized (via the ``FinalizeWriteStream``
          rpc), and the stream is explicitly committed via the
          ``BatchCommitWriteStreams`` rpc.

        Returns:
            Callable[[~.AppendRowsRequest],
                    ~.AppendRowsResponse]:
                A function that, when called, will call the underlying RPC
                on the server.
        """
        # Generate a "stub function" on-the-fly which will actually make
        # the request.
        # gRPC handles serialization and deserialization, so we just need
        # to pass in the functions for each.
        if "append_rows" not in self._stubs:
            self._stubs["append_rows"] = self.grpc_channel.stream_stream(
                "/google.cloud.bigquery.storage.v1.BigQueryWrite/AppendRows",
                request_serializer=storage.AppendRowsRequest.serialize,
                response_deserializer=storage.AppendRowsResponse.deserialize,
            )
        return self._stubs["append_rows"]

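    # Illustrative sketch (not part of the generated source): ``append_rows``
    # is a bidirectional streaming stub, so it is called with an iterator of
    # ``AppendRowsRequest`` messages and yields ``AppendRowsResponse`` messages
    # in the same order. ``build_requests()`` is a hypothetical generator that
    # produces already-populated requests.
    #
    #   transport = BigQueryWriteGrpcTransport()
    #   for response in transport.append_rows(build_requests()):
    #       print(response.append_result.offset)
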

    @property
    def get_write_stream(
        self,
    ) -> Callable[[storage.GetWriteStreamRequest], stream.WriteStream]:
        r"""Return a callable for the get write stream method over gRPC.

        Gets information about a write stream.

        Returns:
            Callable[[~.GetWriteStreamRequest],
                    ~.WriteStream]:
                A function that, when called, will call the underlying RPC
                on the server.
        """
        # Generate a "stub function" on-the-fly which will actually make
        # the request.
        # gRPC handles serialization and deserialization, so we just need
        # to pass in the functions for each.
        if "get_write_stream" not in self._stubs:
            self._stubs["get_write_stream"] = self.grpc_channel.unary_unary(
                "/google.cloud.bigquery.storage.v1.BigQueryWrite/GetWriteStream",
                request_serializer=storage.GetWriteStreamRequest.serialize,
                response_deserializer=stream.WriteStream.deserialize,
            )
        return self._stubs["get_write_stream"]


    @property
    def finalize_write_stream(
        self,
    ) -> Callable[
        [storage.FinalizeWriteStreamRequest], storage.FinalizeWriteStreamResponse
    ]:
        r"""Return a callable for the finalize write stream method over gRPC.

        Finalize a write stream so that no new data can be appended to
        the stream. Finalize is not supported on the '_default' stream.

        Returns:
            Callable[[~.FinalizeWriteStreamRequest],
                    ~.FinalizeWriteStreamResponse]:
                A function that, when called, will call the underlying RPC
                on the server.
        """
        # Generate a "stub function" on-the-fly which will actually make
        # the request.
        # gRPC handles serialization and deserialization, so we just need
        # to pass in the functions for each.
        if "finalize_write_stream" not in self._stubs:
            self._stubs["finalize_write_stream"] = self.grpc_channel.unary_unary(
                "/google.cloud.bigquery.storage.v1.BigQueryWrite/FinalizeWriteStream",
                request_serializer=storage.FinalizeWriteStreamRequest.serialize,
                response_deserializer=storage.FinalizeWriteStreamResponse.deserialize,
            )
        return self._stubs["finalize_write_stream"]


    @property
    def batch_commit_write_streams(
        self,
    ) -> Callable[
        [storage.BatchCommitWriteStreamsRequest],
        storage.BatchCommitWriteStreamsResponse,
    ]:
        r"""Return a callable for the batch commit write streams method over gRPC.

        Atomically commits a group of ``PENDING`` streams that belong to
        the same ``parent`` table.

        Streams must be finalized before commit and cannot be committed
        multiple times. Once a stream is committed, data in the stream
        becomes available for read operations.

        Returns:
            Callable[[~.BatchCommitWriteStreamsRequest],
                    ~.BatchCommitWriteStreamsResponse]:
                A function that, when called, will call the underlying RPC
                on the server.
        """
        # Generate a "stub function" on-the-fly which will actually make
        # the request.
        # gRPC handles serialization and deserialization, so we just need
        # to pass in the functions for each.
        if "batch_commit_write_streams" not in self._stubs:
            self._stubs["batch_commit_write_streams"] = self.grpc_channel.unary_unary(
                "/google.cloud.bigquery.storage.v1.BigQueryWrite/BatchCommitWriteStreams",
                request_serializer=storage.BatchCommitWriteStreamsRequest.serialize,
                response_deserializer=storage.BatchCommitWriteStreamsResponse.deserialize,
            )
        return self._stubs["batch_commit_write_streams"]

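    # Illustrative sketch (not part of the generated source): the PENDING-stream
    # workflow drives several of the stubs above — the stream is created,
    # appended to, finalized, and only becomes visible after the batch commit.
    # The ``parent`` value below is a placeholder table path.
    #
    #   parent = "projects/my-project/datasets/my_dataset/tables/my_table"
    #   ws = transport.create_write_stream(
    #       storage.CreateWriteStreamRequest(
    #           parent=parent,
    #           write_stream=stream.WriteStream(type_=stream.WriteStream.Type.PENDING),
    #       )
    #   )
    #   # ... append rows to ws.name via transport.append_rows(...) ...
    #   transport.finalize_write_stream(storage.FinalizeWriteStreamRequest(name=ws.name))
    #   transport.batch_commit_write_streams(
    #       storage.BatchCommitWriteStreamsRequest(parent=parent, write_streams=[ws.name])
    #   )
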

    @property
    def flush_rows(
        self,
    ) -> Callable[[storage.FlushRowsRequest], storage.FlushRowsResponse]:
        r"""Return a callable for the flush rows method over gRPC.

        Flushes rows to a BUFFERED stream.

        If users are appending rows to a BUFFERED stream, a flush
        operation is required in order for the rows to become available
        for reading. A Flush operation flushes up to any previously
        flushed offset in a BUFFERED stream, to the offset specified in
        the request.

        Flush is not supported on the \_default stream, since it is not
        BUFFERED.

        Returns:
            Callable[[~.FlushRowsRequest],
                    ~.FlushRowsResponse]:
                A function that, when called, will call the underlying RPC
                on the server.
        """
        # Generate a "stub function" on-the-fly which will actually make
        # the request.
        # gRPC handles serialization and deserialization, so we just need
        # to pass in the functions for each.
        if "flush_rows" not in self._stubs:
            self._stubs["flush_rows"] = self.grpc_channel.unary_unary(
                "/google.cloud.bigquery.storage.v1.BigQueryWrite/FlushRows",
                request_serializer=storage.FlushRowsRequest.serialize,
                response_deserializer=storage.FlushRowsResponse.deserialize,
            )
        return self._stubs["flush_rows"]

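    # Illustrative sketch (not part of the generated source): for a BUFFERED
    # stream, rows become readable only up to the flushed offset. The stream
    # name and offset below are placeholders.
    #
    #   transport.flush_rows(
    #       storage.FlushRowsRequest(
    #           write_stream="projects/p/datasets/d/tables/t/streams/s",
    #           offset=42,
    #       )
    #   )
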

    def close(self):
        self.grpc_channel.close()

    @property
    def kind(self) -> str:
        return "grpc"


__all__ = ("BigQueryWriteGrpcTransport",)