Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/google/cloud/bigquery_storage_v1/services/big_query_read/transports/grpc.py: 41%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

63 statements  

1# -*- coding: utf-8 -*- 

2# Copyright 2024 Google LLC 

3# 

4# Licensed under the Apache License, Version 2.0 (the "License"); 

5# you may not use this file except in compliance with the License. 

6# You may obtain a copy of the License at 

7# 

8# http://www.apache.org/licenses/LICENSE-2.0 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, 

12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

13# See the License for the specific language governing permissions and 

14# limitations under the License. 

15# 

16from typing import Callable, Dict, Optional, Sequence, Tuple, Union 

17import warnings 

18 

19from google.api_core import gapic_v1, grpc_helpers 

20import google.auth # type: ignore 

21from google.auth import credentials as ga_credentials # type: ignore 

22from google.auth.transport.grpc import SslCredentials # type: ignore 

23import grpc # type: ignore 

24 

25from google.cloud.bigquery_storage_v1.types import storage, stream 

26 

27from .base import DEFAULT_CLIENT_INFO, BigQueryReadTransport 

28 

29 

30class BigQueryReadGrpcTransport(BigQueryReadTransport): 

31 """gRPC backend transport for BigQueryRead. 

32 

33 BigQuery Read API. 

34 

35 The Read API can be used to read data from BigQuery. 

36 

37 This class defines the same methods as the primary client, so the 

38 primary client can load the underlying transport implementation 

39 and call it. 

40 

41 It sends protocol buffers over the wire using gRPC (which is built on 

42 top of HTTP/2); the ``grpcio`` package must be installed. 

43 """ 

44 

45 _stubs: Dict[str, Callable] 

46 

47 def __init__( 

48 self, 

49 *, 

50 host: str = "bigquerystorage.googleapis.com", 

51 credentials: Optional[ga_credentials.Credentials] = None, 

52 credentials_file: Optional[str] = None, 

53 scopes: Optional[Sequence[str]] = None, 

54 channel: Optional[grpc.Channel] = None, 

55 api_mtls_endpoint: Optional[str] = None, 

56 client_cert_source: Optional[Callable[[], Tuple[bytes, bytes]]] = None, 

57 ssl_channel_credentials: Optional[grpc.ChannelCredentials] = None, 

58 client_cert_source_for_mtls: Optional[Callable[[], Tuple[bytes, bytes]]] = None, 

59 quota_project_id: Optional[str] = None, 

60 client_info: gapic_v1.client_info.ClientInfo = DEFAULT_CLIENT_INFO, 

61 always_use_jwt_access: Optional[bool] = False, 

62 api_audience: Optional[str] = None, 

63 ) -> None: 

64 """Instantiate the transport. 

65 

66 Args: 

67 host (Optional[str]): 

68 The hostname to connect to (default: 'bigquerystorage.googleapis.com'). 

69 credentials (Optional[google.auth.credentials.Credentials]): The 

70 authorization credentials to attach to requests. These 

71 credentials identify the application to the service; if none 

72 are specified, the client will attempt to ascertain the 

73 credentials from the environment. 

74 This argument is ignored if ``channel`` is provided. 

75 credentials_file (Optional[str]): A file with credentials that can 

76 be loaded with :func:`google.auth.load_credentials_from_file`. 

77 This argument is ignored if ``channel`` is provided. 

78 scopes (Optional(Sequence[str])): A list of scopes. This argument is 

79 ignored if ``channel`` is provided. 

80 channel (Optional[grpc.Channel]): A ``Channel`` instance through 

81 which to make calls. 

82 api_mtls_endpoint (Optional[str]): Deprecated. The mutual TLS endpoint. 

83 If provided, it overrides the ``host`` argument and tries to create 

84 a mutual TLS channel with client SSL credentials from 

85 ``client_cert_source`` or application default SSL credentials. 

86 client_cert_source (Optional[Callable[[], Tuple[bytes, bytes]]]): 

87 Deprecated. A callback to provide client SSL certificate bytes and 

88 private key bytes, both in PEM format. It is ignored if 

89 ``api_mtls_endpoint`` is None. 

90 ssl_channel_credentials (grpc.ChannelCredentials): SSL credentials 

91 for the grpc channel. It is ignored if ``channel`` is provided. 

92 client_cert_source_for_mtls (Optional[Callable[[], Tuple[bytes, bytes]]]): 

93 A callback to provide client certificate bytes and private key bytes, 

94 both in PEM format. It is used to configure a mutual TLS channel. It is 

95 ignored if ``channel`` or ``ssl_channel_credentials`` is provided. 

96 quota_project_id (Optional[str]): An optional project to use for billing 

97 and quota. 

98 client_info (google.api_core.gapic_v1.client_info.ClientInfo): 

99 The client info used to send a user-agent string along with 

100 API requests. If ``None``, then default info will be used. 

101 Generally, you only need to set this if you're developing 

102 your own client library. 

103 always_use_jwt_access (Optional[bool]): Whether self signed JWT should 

104 be used for service account credentials. 

105 

106 Raises: 

107 google.auth.exceptions.MutualTLSChannelError: If mutual TLS transport 

108 creation failed for any reason. 

109 google.api_core.exceptions.DuplicateCredentialArgs: If both ``credentials`` 

110 and ``credentials_file`` are passed. 

111 """ 

112 self._grpc_channel = None 

113 self._ssl_channel_credentials = ssl_channel_credentials 

114 self._stubs: Dict[str, Callable] = {} 

115 

116 if api_mtls_endpoint: 

117 warnings.warn("api_mtls_endpoint is deprecated", DeprecationWarning) 

118 if client_cert_source: 

119 warnings.warn("client_cert_source is deprecated", DeprecationWarning) 

120 

121 if channel: 

122 # Ignore credentials if a channel was passed. 

123 credentials = False 

124 # If a channel was explicitly provided, set it. 

125 self._grpc_channel = channel 

126 self._ssl_channel_credentials = None 

127 

128 else: 

129 if api_mtls_endpoint: 

130 host = api_mtls_endpoint 

131 

132 # Create SSL credentials with client_cert_source or application 

133 # default SSL credentials. 

134 if client_cert_source: 

135 cert, key = client_cert_source() 

136 self._ssl_channel_credentials = grpc.ssl_channel_credentials( 

137 certificate_chain=cert, private_key=key 

138 ) 

139 else: 

140 self._ssl_channel_credentials = SslCredentials().ssl_credentials 

141 

142 else: 

143 if client_cert_source_for_mtls and not ssl_channel_credentials: 

144 cert, key = client_cert_source_for_mtls() 

145 self._ssl_channel_credentials = grpc.ssl_channel_credentials( 

146 certificate_chain=cert, private_key=key 

147 ) 

148 

149 # The base transport sets the host, credentials and scopes 

150 super().__init__( 

151 host=host, 

152 credentials=credentials, 

153 credentials_file=credentials_file, 

154 scopes=scopes, 

155 quota_project_id=quota_project_id, 

156 client_info=client_info, 

157 always_use_jwt_access=always_use_jwt_access, 

158 api_audience=api_audience, 

159 ) 

160 

161 if not self._grpc_channel: 

162 self._grpc_channel = type(self).create_channel( 

163 self._host, 

164 # use the credentials which are saved 

165 credentials=self._credentials, 

166 # Set ``credentials_file`` to ``None`` here as 

167 # the credentials that we saved earlier should be used. 

168 credentials_file=None, 

169 scopes=self._scopes, 

170 ssl_credentials=self._ssl_channel_credentials, 

171 quota_project_id=quota_project_id, 

172 options=[ 

173 ("grpc.max_send_message_length", -1), 

174 ("grpc.max_receive_message_length", -1), 

175 ], 

176 ) 

177 

178 # Wrap messages. This must be done after self._grpc_channel exists 

179 self._prep_wrapped_messages(client_info) 

180 

181 @classmethod 

182 def create_channel( 

183 cls, 

184 host: str = "bigquerystorage.googleapis.com", 

185 credentials: Optional[ga_credentials.Credentials] = None, 

186 credentials_file: Optional[str] = None, 

187 scopes: Optional[Sequence[str]] = None, 

188 quota_project_id: Optional[str] = None, 

189 **kwargs, 

190 ) -> grpc.Channel: 

191 """Create and return a gRPC channel object. 

192 Args: 

193 host (Optional[str]): The host for the channel to use. 

194 credentials (Optional[~.Credentials]): The 

195 authorization credentials to attach to requests. These 

196 credentials identify this application to the service. If 

197 none are specified, the client will attempt to ascertain 

198 the credentials from the environment. 

199 credentials_file (Optional[str]): A file with credentials that can 

200 be loaded with :func:`google.auth.load_credentials_from_file`. 

201 This argument is mutually exclusive with credentials. 

202 scopes (Optional[Sequence[str]]): A optional list of scopes needed for this 

203 service. These are only used when credentials are not specified and 

204 are passed to :func:`google.auth.default`. 

205 quota_project_id (Optional[str]): An optional project to use for billing 

206 and quota. 

207 kwargs (Optional[dict]): Keyword arguments, which are passed to the 

208 channel creation. 

209 Returns: 

210 grpc.Channel: A gRPC channel object. 

211 

212 Raises: 

213 google.api_core.exceptions.DuplicateCredentialArgs: If both ``credentials`` 

214 and ``credentials_file`` are passed. 

215 """ 

216 

217 return grpc_helpers.create_channel( 

218 host, 

219 credentials=credentials, 

220 credentials_file=credentials_file, 

221 quota_project_id=quota_project_id, 

222 default_scopes=cls.AUTH_SCOPES, 

223 scopes=scopes, 

224 default_host=cls.DEFAULT_HOST, 

225 **kwargs, 

226 ) 

227 

228 @property 

229 def grpc_channel(self) -> grpc.Channel: 

230 """Return the channel designed to connect to this service.""" 

231 return self._grpc_channel 

232 

233 @property 

234 def create_read_session( 

235 self, 

236 ) -> Callable[[storage.CreateReadSessionRequest], stream.ReadSession]: 

237 r"""Return a callable for the create read session method over gRPC. 

238 

239 Creates a new read session. A read session divides 

240 the contents of a BigQuery table into one or more 

241 streams, which can then be used to read data from the 

242 table. The read session also specifies properties of the 

243 data to be read, such as a list of columns or a 

244 push-down filter describing the rows to be returned. 

245 

246 A particular row can be read by at most one stream. When 

247 the caller has reached the end of each stream in the 

248 session, then all the data in the table has been read. 

249 

250 Data is assigned to each stream such that roughly the 

251 same number of rows can be read from each stream. 

252 Because the server-side unit for assigning data is 

253 collections of rows, the API does not guarantee that 

254 each stream will return the same number or rows. 

255 Additionally, the limits are enforced based on the 

256 number of pre-filtered rows, so some filters can lead to 

257 lopsided assignments. 

258 

259 Read sessions automatically expire 6 hours after they 

260 are created and do not require manual clean-up by the 

261 caller. 

262 

263 Returns: 

264 Callable[[~.CreateReadSessionRequest], 

265 ~.ReadSession]: 

266 A function that, when called, will call the underlying RPC 

267 on the server. 

268 """ 

269 # Generate a "stub function" on-the-fly which will actually make 

270 # the request. 

271 # gRPC handles serialization and deserialization, so we just need 

272 # to pass in the functions for each. 

273 if "create_read_session" not in self._stubs: 

274 self._stubs["create_read_session"] = self.grpc_channel.unary_unary( 

275 "/google.cloud.bigquery.storage.v1.BigQueryRead/CreateReadSession", 

276 request_serializer=storage.CreateReadSessionRequest.serialize, 

277 response_deserializer=stream.ReadSession.deserialize, 

278 ) 

279 return self._stubs["create_read_session"] 

280 

281 @property 

282 def read_rows( 

283 self, 

284 ) -> Callable[[storage.ReadRowsRequest], storage.ReadRowsResponse]: 

285 r"""Return a callable for the read rows method over gRPC. 

286 

287 Reads rows from the stream in the format prescribed 

288 by the ReadSession. Each response contains one or more 

289 table rows, up to a maximum of 100 MiB per response; 

290 read requests which attempt to read individual rows 

291 larger than 100 MiB will fail. 

292 

293 Each request also returns a set of stream statistics 

294 reflecting the current state of the stream. 

295 

296 Returns: 

297 Callable[[~.ReadRowsRequest], 

298 ~.ReadRowsResponse]: 

299 A function that, when called, will call the underlying RPC 

300 on the server. 

301 """ 

302 # Generate a "stub function" on-the-fly which will actually make 

303 # the request. 

304 # gRPC handles serialization and deserialization, so we just need 

305 # to pass in the functions for each. 

306 if "read_rows" not in self._stubs: 

307 self._stubs["read_rows"] = self.grpc_channel.unary_stream( 

308 "/google.cloud.bigquery.storage.v1.BigQueryRead/ReadRows", 

309 request_serializer=storage.ReadRowsRequest.serialize, 

310 response_deserializer=storage.ReadRowsResponse.deserialize, 

311 ) 

312 return self._stubs["read_rows"] 

313 

314 @property 

315 def split_read_stream( 

316 self, 

317 ) -> Callable[[storage.SplitReadStreamRequest], storage.SplitReadStreamResponse]: 

318 r"""Return a callable for the split read stream method over gRPC. 

319 

320 Splits a given ``ReadStream`` into two ``ReadStream`` objects. 

321 These ``ReadStream`` objects are referred to as the primary and 

322 the residual streams of the split. The original ``ReadStream`` 

323 can still be read from in the same manner as before. Both of the 

324 returned ``ReadStream`` objects can also be read from, and the 

325 rows returned by both child streams will be the same as the rows 

326 read from the original stream. 

327 

328 Moreover, the two child streams will be allocated back-to-back 

329 in the original ``ReadStream``. Concretely, it is guaranteed 

330 that for streams original, primary, and residual, that 

331 original[0-j] = primary[0-j] and original[j-n] = residual[0-m] 

332 once the streams have been read to completion. 

333 

334 Returns: 

335 Callable[[~.SplitReadStreamRequest], 

336 ~.SplitReadStreamResponse]: 

337 A function that, when called, will call the underlying RPC 

338 on the server. 

339 """ 

340 # Generate a "stub function" on-the-fly which will actually make 

341 # the request. 

342 # gRPC handles serialization and deserialization, so we just need 

343 # to pass in the functions for each. 

344 if "split_read_stream" not in self._stubs: 

345 self._stubs["split_read_stream"] = self.grpc_channel.unary_unary( 

346 "/google.cloud.bigquery.storage.v1.BigQueryRead/SplitReadStream", 

347 request_serializer=storage.SplitReadStreamRequest.serialize, 

348 response_deserializer=storage.SplitReadStreamResponse.deserialize, 

349 ) 

350 return self._stubs["split_read_stream"] 

351 

352 def close(self): 

353 self.grpc_channel.close() 

354 

355 @property 

356 def kind(self) -> str: 

357 return "grpc" 

358 

359 

360__all__ = ("BigQueryReadGrpcTransport",)