Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/google/cloud/bigquery_storage_v1/services/big_query_write/transports/grpc_asyncio.py: 42%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

77 statements  

1# -*- coding: utf-8 -*- 

2# Copyright 2024 Google LLC 

3# 

4# Licensed under the Apache License, Version 2.0 (the "License"); 

5# you may not use this file except in compliance with the License. 

6# You may obtain a copy of the License at 

7# 

8# http://www.apache.org/licenses/LICENSE-2.0 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, 

12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

13# See the License for the specific language governing permissions and 

14# limitations under the License. 

15# 

16from typing import Awaitable, Callable, Dict, Optional, Sequence, Tuple, Union 

17import warnings 

18 

19from google.api_core import gapic_v1, grpc_helpers_async 

20from google.auth import credentials as ga_credentials # type: ignore 

21from google.auth.transport.grpc import SslCredentials # type: ignore 

22import grpc # type: ignore 

23from grpc.experimental import aio # type: ignore 

24 

25from google.cloud.bigquery_storage_v1.types import storage, stream 

26 

27from .base import DEFAULT_CLIENT_INFO, BigQueryWriteTransport 

28from .grpc import BigQueryWriteGrpcTransport 

29 

30 

class BigQueryWriteGrpcAsyncIOTransport(BigQueryWriteTransport):
    """gRPC AsyncIO backend transport for BigQueryWrite.

    BigQuery Write API.

    The Write API can be used to write data to BigQuery.

    For supplementary information about the Write API, see:

    https://cloud.google.com/bigquery/docs/write-api

    This class defines the same methods as the primary client, so the
    primary client can load the underlying transport implementation
    and call it.

    It sends protocol buffers over the wire using gRPC (which is built on
    top of HTTP/2); the ``grpcio`` package must be installed.
    """

    # The asyncio channel from which all RPC stub callables are created.
    _grpc_channel: aio.Channel
    # Lazily-populated cache of RPC stub callables, keyed by method name.
    # Each ``@property`` below creates its stub once and reuses it.
    _stubs: Dict[str, Callable] = {}

    @classmethod
    def create_channel(
        cls,
        host: str = "bigquerystorage.googleapis.com",
        credentials: Optional[ga_credentials.Credentials] = None,
        credentials_file: Optional[str] = None,
        scopes: Optional[Sequence[str]] = None,
        quota_project_id: Optional[str] = None,
        **kwargs,
    ) -> aio.Channel:
        """Create and return a gRPC AsyncIO channel object.

        Args:
            host (Optional[str]): The host for the channel to use.
            credentials (Optional[~.Credentials]): The
                authorization credentials to attach to requests. These
                credentials identify this application to the service. If
                none are specified, the client will attempt to ascertain
                the credentials from the environment.
            credentials_file (Optional[str]): A file with credentials that can
                be loaded with :func:`google.auth.load_credentials_from_file`.
                This argument is ignored if ``channel`` is provided.
            scopes (Optional[Sequence[str]]): A optional list of scopes needed for this
                service. These are only used when credentials are not specified and
                are passed to :func:`google.auth.default`.
            quota_project_id (Optional[str]): An optional project to use for billing
                and quota.
            kwargs (Optional[dict]): Keyword arguments, which are passed to the
                channel creation.
        Returns:
            aio.Channel: A gRPC AsyncIO channel object.
        """

        # Delegate channel creation (including auth/scopes resolution) to
        # google-api-core's async gRPC helper.
        return grpc_helpers_async.create_channel(
            host,
            credentials=credentials,
            credentials_file=credentials_file,
            quota_project_id=quota_project_id,
            default_scopes=cls.AUTH_SCOPES,
            scopes=scopes,
            default_host=cls.DEFAULT_HOST,
            **kwargs,
        )

    def __init__(
        self,
        *,
        host: str = "bigquerystorage.googleapis.com",
        credentials: Optional[ga_credentials.Credentials] = None,
        credentials_file: Optional[str] = None,
        scopes: Optional[Sequence[str]] = None,
        channel: Optional[aio.Channel] = None,
        api_mtls_endpoint: Optional[str] = None,
        client_cert_source: Optional[Callable[[], Tuple[bytes, bytes]]] = None,
        ssl_channel_credentials: Optional[grpc.ChannelCredentials] = None,
        client_cert_source_for_mtls: Optional[Callable[[], Tuple[bytes, bytes]]] = None,
        quota_project_id: Optional[str] = None,
        client_info: gapic_v1.client_info.ClientInfo = DEFAULT_CLIENT_INFO,
        always_use_jwt_access: Optional[bool] = False,
        api_audience: Optional[str] = None,
    ) -> None:
        """Instantiate the transport.

        Args:
            host (Optional[str]):
                The hostname to connect to (default: 'bigquerystorage.googleapis.com').
            credentials (Optional[google.auth.credentials.Credentials]): The
                authorization credentials to attach to requests. These
                credentials identify the application to the service; if none
                are specified, the client will attempt to ascertain the
                credentials from the environment.
                This argument is ignored if ``channel`` is provided.
            credentials_file (Optional[str]): A file with credentials that can
                be loaded with :func:`google.auth.load_credentials_from_file`.
                This argument is ignored if ``channel`` is provided.
            scopes (Optional[Sequence[str]]): A optional list of scopes needed for this
                service. These are only used when credentials are not specified and
                are passed to :func:`google.auth.default`.
            channel (Optional[aio.Channel]): A ``Channel`` instance through
                which to make calls.
            api_mtls_endpoint (Optional[str]): Deprecated. The mutual TLS endpoint.
                If provided, it overrides the ``host`` argument and tries to create
                a mutual TLS channel with client SSL credentials from
                ``client_cert_source`` or application default SSL credentials.
            client_cert_source (Optional[Callable[[], Tuple[bytes, bytes]]]):
                Deprecated. A callback to provide client SSL certificate bytes and
                private key bytes, both in PEM format. It is ignored if
                ``api_mtls_endpoint`` is None.
            ssl_channel_credentials (grpc.ChannelCredentials): SSL credentials
                for the grpc channel. It is ignored if ``channel`` is provided.
            client_cert_source_for_mtls (Optional[Callable[[], Tuple[bytes, bytes]]]):
                A callback to provide client certificate bytes and private key bytes,
                both in PEM format. It is used to configure a mutual TLS channel. It is
                ignored if ``channel`` or ``ssl_channel_credentials`` is provided.
            quota_project_id (Optional[str]): An optional project to use for billing
                and quota.
            client_info (google.api_core.gapic_v1.client_info.ClientInfo):
                The client info used to send a user-agent string along with
                API requests. If ``None``, then default info will be used.
                Generally, you only need to set this if you're developing
                your own client library.
            always_use_jwt_access (Optional[bool]): Whether self signed JWT should
                be used for service account credentials.

        Raises:
            google.auth.exceptions.MutualTlsChannelError: If mutual TLS transport
                creation failed for any reason.
            google.api_core.exceptions.DuplicateCredentialArgs: If both ``credentials``
                and ``credentials_file`` are passed.
        """
        self._grpc_channel = None
        self._ssl_channel_credentials = ssl_channel_credentials
        # Per-instance stub cache; shadows the (shared) class attribute so
        # stubs are never shared across transport instances.
        self._stubs: Dict[str, Callable] = {}

        if api_mtls_endpoint:
            warnings.warn("api_mtls_endpoint is deprecated", DeprecationWarning)
        if client_cert_source:
            warnings.warn("client_cert_source is deprecated", DeprecationWarning)

        if channel:
            # Ignore credentials if a channel was passed.
            credentials = False
            # If a channel was explicitly provided, set it.
            self._grpc_channel = channel
            self._ssl_channel_credentials = None
        else:
            if api_mtls_endpoint:
                host = api_mtls_endpoint

                # Create SSL credentials with client_cert_source or application
                # default SSL credentials.
                if client_cert_source:
                    cert, key = client_cert_source()
                    self._ssl_channel_credentials = grpc.ssl_channel_credentials(
                        certificate_chain=cert, private_key=key
                    )
                else:
                    self._ssl_channel_credentials = SslCredentials().ssl_credentials

            else:
                # Explicit ``ssl_channel_credentials`` takes precedence over
                # the mTLS client-cert callback.
                if client_cert_source_for_mtls and not ssl_channel_credentials:
                    cert, key = client_cert_source_for_mtls()
                    self._ssl_channel_credentials = grpc.ssl_channel_credentials(
                        certificate_chain=cert, private_key=key
                    )

        # The base transport sets the host, credentials and scopes
        super().__init__(
            host=host,
            credentials=credentials,
            credentials_file=credentials_file,
            scopes=scopes,
            quota_project_id=quota_project_id,
            client_info=client_info,
            always_use_jwt_access=always_use_jwt_access,
            api_audience=api_audience,
        )

        # Only build a channel when one was not supplied by the caller.
        if not self._grpc_channel:
            self._grpc_channel = type(self).create_channel(
                self._host,
                # use the credentials which are saved
                credentials=self._credentials,
                # Set ``credentials_file`` to ``None`` here as
                # the credentials that we saved earlier should be used.
                credentials_file=None,
                scopes=self._scopes,
                ssl_credentials=self._ssl_channel_credentials,
                quota_project_id=quota_project_id,
                options=[
                    # -1 disables gRPC's default message-size caps in both
                    # directions (large row batches flow over this channel).
                    ("grpc.max_send_message_length", -1),
                    ("grpc.max_receive_message_length", -1),
                ],
            )

        # Wrap messages. This must be done after self._grpc_channel exists
        self._prep_wrapped_messages(client_info)

    @property
    def grpc_channel(self) -> aio.Channel:
        """Create the channel designed to connect to this service.

        This property caches on the instance; repeated calls return
        the same channel.
        """
        # Return the channel from cache.
        return self._grpc_channel

    @property
    def create_write_stream(
        self,
    ) -> Callable[[storage.CreateWriteStreamRequest], Awaitable[stream.WriteStream]]:
        r"""Return a callable for the create write stream method over gRPC.

        Creates a write stream to the given table. Additionally, every
        table has a special stream named '_default' to which data can be
        written. This stream doesn't need to be created using
        CreateWriteStream. It is a stream that can be used
        simultaneously by any number of clients. Data written to this
        stream is considered committed as soon as an acknowledgement is
        received.

        Returns:
            Callable[[~.CreateWriteStreamRequest],
                    Awaitable[~.WriteStream]]:
                A function that, when called, will call the underlying RPC
                on the server.
        """
        # Generate a "stub function" on-the-fly which will actually make
        # the request.
        # gRPC handles serialization and deserialization, so we just need
        # to pass in the functions for each.
        if "create_write_stream" not in self._stubs:
            self._stubs["create_write_stream"] = self.grpc_channel.unary_unary(
                "/google.cloud.bigquery.storage.v1.BigQueryWrite/CreateWriteStream",
                request_serializer=storage.CreateWriteStreamRequest.serialize,
                response_deserializer=stream.WriteStream.deserialize,
            )
        return self._stubs["create_write_stream"]

    @property
    def append_rows(
        self,
    ) -> Callable[[storage.AppendRowsRequest], Awaitable[storage.AppendRowsResponse]]:
        r"""Return a callable for the append rows method over gRPC.

        Appends data to the given stream.

        If ``offset`` is specified, the ``offset`` is checked against
        the end of stream. The server returns ``OUT_OF_RANGE`` in
        ``AppendRowsResponse`` if an attempt is made to append to an
        offset beyond the current end of the stream or
        ``ALREADY_EXISTS`` if user provides an ``offset`` that has
        already been written to. User can retry with adjusted offset
        within the same RPC connection. If ``offset`` is not specified,
        append happens at the end of the stream.

        The response contains an optional offset at which the append
        happened. No offset information will be returned for appends to
        a default stream.

        Responses are received in the same order in which requests are
        sent. There will be one response for each successful inserted
        request. Responses may optionally embed error information if the
        originating AppendRequest was not successfully processed.

        The specifics of when successfully appended data is made visible
        to the table are governed by the type of stream:

        - For COMMITTED streams (which includes the default stream),
          data is visible immediately upon successful append.

        - For BUFFERED streams, data is made visible via a subsequent
          ``FlushRows`` rpc which advances a cursor to a newer offset
          in the stream.

        - For PENDING streams, data is not made visible until the
          stream itself is finalized (via the ``FinalizeWriteStream``
          rpc), and the stream is explicitly committed via the
          ``BatchCommitWriteStreams`` rpc.

        Returns:
            Callable[[~.AppendRowsRequest],
                    Awaitable[~.AppendRowsResponse]]:
                A function that, when called, will call the underlying RPC
                on the server.
        """
        # Generate a "stub function" on-the-fly which will actually make
        # the request.
        # gRPC handles serialization and deserialization, so we just need
        # to pass in the functions for each.
        # NOTE: AppendRows is bidirectional streaming, hence stream_stream
        # rather than unary_unary.
        if "append_rows" not in self._stubs:
            self._stubs["append_rows"] = self.grpc_channel.stream_stream(
                "/google.cloud.bigquery.storage.v1.BigQueryWrite/AppendRows",
                request_serializer=storage.AppendRowsRequest.serialize,
                response_deserializer=storage.AppendRowsResponse.deserialize,
            )
        return self._stubs["append_rows"]

    @property
    def get_write_stream(
        self,
    ) -> Callable[[storage.GetWriteStreamRequest], Awaitable[stream.WriteStream]]:
        r"""Return a callable for the get write stream method over gRPC.

        Gets information about a write stream.

        Returns:
            Callable[[~.GetWriteStreamRequest],
                    Awaitable[~.WriteStream]]:
                A function that, when called, will call the underlying RPC
                on the server.
        """
        # Generate a "stub function" on-the-fly which will actually make
        # the request.
        # gRPC handles serialization and deserialization, so we just need
        # to pass in the functions for each.
        if "get_write_stream" not in self._stubs:
            self._stubs["get_write_stream"] = self.grpc_channel.unary_unary(
                "/google.cloud.bigquery.storage.v1.BigQueryWrite/GetWriteStream",
                request_serializer=storage.GetWriteStreamRequest.serialize,
                response_deserializer=stream.WriteStream.deserialize,
            )
        return self._stubs["get_write_stream"]

    @property
    def finalize_write_stream(
        self,
    ) -> Callable[
        [storage.FinalizeWriteStreamRequest],
        Awaitable[storage.FinalizeWriteStreamResponse],
    ]:
        r"""Return a callable for the finalize write stream method over gRPC.

        Finalize a write stream so that no new data can be appended to
        the stream. Finalize is not supported on the '_default' stream.

        Returns:
            Callable[[~.FinalizeWriteStreamRequest],
                    Awaitable[~.FinalizeWriteStreamResponse]]:
                A function that, when called, will call the underlying RPC
                on the server.
        """
        # Generate a "stub function" on-the-fly which will actually make
        # the request.
        # gRPC handles serialization and deserialization, so we just need
        # to pass in the functions for each.
        if "finalize_write_stream" not in self._stubs:
            self._stubs["finalize_write_stream"] = self.grpc_channel.unary_unary(
                "/google.cloud.bigquery.storage.v1.BigQueryWrite/FinalizeWriteStream",
                request_serializer=storage.FinalizeWriteStreamRequest.serialize,
                response_deserializer=storage.FinalizeWriteStreamResponse.deserialize,
            )
        return self._stubs["finalize_write_stream"]

    @property
    def batch_commit_write_streams(
        self,
    ) -> Callable[
        [storage.BatchCommitWriteStreamsRequest],
        Awaitable[storage.BatchCommitWriteStreamsResponse],
    ]:
        r"""Return a callable for the batch commit write streams method over gRPC.

        Atomically commits a group of ``PENDING`` streams that belong to
        the same ``parent`` table.

        Streams must be finalized before commit and cannot be committed
        multiple times. Once a stream is committed, data in the stream
        becomes available for read operations.

        Returns:
            Callable[[~.BatchCommitWriteStreamsRequest],
                    Awaitable[~.BatchCommitWriteStreamsResponse]]:
                A function that, when called, will call the underlying RPC
                on the server.
        """
        # Generate a "stub function" on-the-fly which will actually make
        # the request.
        # gRPC handles serialization and deserialization, so we just need
        # to pass in the functions for each.
        if "batch_commit_write_streams" not in self._stubs:
            self._stubs["batch_commit_write_streams"] = self.grpc_channel.unary_unary(
                "/google.cloud.bigquery.storage.v1.BigQueryWrite/BatchCommitWriteStreams",
                request_serializer=storage.BatchCommitWriteStreamsRequest.serialize,
                response_deserializer=storage.BatchCommitWriteStreamsResponse.deserialize,
            )
        return self._stubs["batch_commit_write_streams"]

    @property
    def flush_rows(
        self,
    ) -> Callable[[storage.FlushRowsRequest], Awaitable[storage.FlushRowsResponse]]:
        r"""Return a callable for the flush rows method over gRPC.

        Flushes rows to a BUFFERED stream.

        If users are appending rows to BUFFERED stream, flush operation
        is required in order for the rows to become available for
        reading. A Flush operation flushes up to any previously flushed
        offset in a BUFFERED stream, to the offset specified in the
        request.

        Flush is not supported on the \_default stream, since it is not
        BUFFERED.

        Returns:
            Callable[[~.FlushRowsRequest],
                    Awaitable[~.FlushRowsResponse]]:
                A function that, when called, will call the underlying RPC
                on the server.
        """
        # Generate a "stub function" on-the-fly which will actually make
        # the request.
        # gRPC handles serialization and deserialization, so we just need
        # to pass in the functions for each.
        if "flush_rows" not in self._stubs:
            self._stubs["flush_rows"] = self.grpc_channel.unary_unary(
                "/google.cloud.bigquery.storage.v1.BigQueryWrite/FlushRows",
                request_serializer=storage.FlushRowsRequest.serialize,
                response_deserializer=storage.FlushRowsResponse.deserialize,
            )
        return self._stubs["flush_rows"]

    def close(self):
        # Close the underlying aio channel; on an async channel this returns
        # a coroutine the caller is expected to await.
        return self.grpc_channel.close()

458 

459 

# Explicit public API of this module: only the AsyncIO transport class.
__all__ = ("BigQueryWriteGrpcAsyncIOTransport",)