Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/google/cloud/bigquery_storage_v1/services/big_query_write/transports/base.py: 55%

65 statements  

# -*- coding: utf-8 -*-
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import abc
from typing import Awaitable, Callable, Dict, Optional, Sequence, Union

import google.api_core
from google.api_core import exceptions as core_exceptions
from google.api_core import gapic_v1
from google.api_core import retry as retries
import google.auth  # type: ignore
from google.auth import credentials as ga_credentials  # type: ignore
from google.oauth2 import service_account  # type: ignore
import google.protobuf

from google.cloud.bigquery_storage_v1 import gapic_version as package_version
from google.cloud.bigquery_storage_v1.types import storage, stream

DEFAULT_CLIENT_INFO = gapic_v1.client_info.ClientInfo(
    gapic_version=package_version.__version__
)

if hasattr(DEFAULT_CLIENT_INFO, "protobuf_runtime_version"):  # pragma: NO COVER
    DEFAULT_CLIENT_INFO.protobuf_runtime_version = google.protobuf.__version__
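
# [Editorial note, illustrative only] DEFAULT_CLIENT_INFO is folded into the
# metrics/user-agent metadata sent with every request. One way to inspect the
# resulting string (``to_user_agent`` is part of the public
# google.api_core.client_info.ClientInfo API; exact fields vary by
# environment):
#
#     >>> DEFAULT_CLIENT_INFO.to_user_agent()
#     'gl-python/3.11.0 gax/2.x.y gapic/2.x.y'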

class BigQueryWriteTransport(abc.ABC):
    """Abstract transport class for BigQueryWrite."""

    AUTH_SCOPES = (
        "https://www.googleapis.com/auth/bigquery",
        "https://www.googleapis.com/auth/bigquery.insertdata",
        "https://www.googleapis.com/auth/cloud-platform",
    )

    DEFAULT_HOST: str = "bigquerystorage.googleapis.com"

    def __init__(
        self,
        *,
        host: str = DEFAULT_HOST,
        credentials: Optional[ga_credentials.Credentials] = None,
        credentials_file: Optional[str] = None,
        scopes: Optional[Sequence[str]] = None,
        quota_project_id: Optional[str] = None,
        client_info: gapic_v1.client_info.ClientInfo = DEFAULT_CLIENT_INFO,
        always_use_jwt_access: Optional[bool] = False,
        api_audience: Optional[str] = None,
        **kwargs,
    ) -> None:
        """Instantiate the transport.

        Args:
            host (Optional[str]):
                The hostname to connect to (default: 'bigquerystorage.googleapis.com').
            credentials (Optional[google.auth.credentials.Credentials]): The
                authorization credentials to attach to requests. These
                credentials identify the application to the service; if none
                are specified, the client will attempt to ascertain the
                credentials from the environment.
            credentials_file (Optional[str]): A file with credentials that can
                be loaded with :func:`google.auth.load_credentials_from_file`.
                This argument is mutually exclusive with ``credentials``.
            scopes (Optional[Sequence[str]]): A list of scopes.
            quota_project_id (Optional[str]): An optional project to use for billing
                and quota.
            client_info (google.api_core.gapic_v1.client_info.ClientInfo):
                The client info used to send a user-agent string along with
                API requests. If ``None``, then default info will be used.
                Generally, you only need to set this if you're developing
                your own client library.
            always_use_jwt_access (Optional[bool]): Whether a self-signed JWT
                should be used for service account credentials.
        """

        scopes_kwargs = {"scopes": scopes, "default_scopes": self.AUTH_SCOPES}

        # Save the scopes.
        self._scopes = scopes
        if not hasattr(self, "_ignore_credentials"):
            self._ignore_credentials: bool = False

        # If no credentials are provided, then determine the appropriate
        # defaults.
        if credentials and credentials_file:
            raise core_exceptions.DuplicateCredentialArgs(
                "'credentials_file' and 'credentials' are mutually exclusive"
            )

        if credentials_file is not None:
            credentials, _ = google.auth.load_credentials_from_file(
                credentials_file, **scopes_kwargs, quota_project_id=quota_project_id
            )
        elif credentials is None and not self._ignore_credentials:
            credentials, _ = google.auth.default(
                **scopes_kwargs, quota_project_id=quota_project_id
            )
            # Don't apply the audience if the user passed a credentials file.
            if hasattr(credentials, "with_gdch_audience"):
                credentials = credentials.with_gdch_audience(
                    api_audience if api_audience else host
                )

        # If the credentials are service account credentials, then always try
        # to use a self-signed JWT.
        if (
            always_use_jwt_access
            and isinstance(credentials, service_account.Credentials)
            and hasattr(service_account.Credentials, "with_always_use_jwt_access")
        ):
            credentials = credentials.with_always_use_jwt_access(True)

        # Save the credentials.
        self._credentials = credentials

        # Save the hostname. Default to port 443 (HTTPS) if none is specified.
        if ":" not in host:
            host += ":443"
        self._host = host
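
    # [Editorial note, illustrative only] Credential resolution above runs in
    # this order: an explicit ``credentials`` argument, else a
    # ``credentials_file`` loaded via google.auth.load_credentials_from_file(),
    # else Application Default Credentials via google.auth.default(). A minimal
    # sketch of a concrete subclass deferring to this constructor (the class
    # name is hypothetical):
    #
    #     class InProcessTransport(BigQueryWriteTransport):
    #         def __init__(self, **kwargs):
    #             # Resolves credentials, scopes, and host:port.
    #             super().__init__(**kwargs)
    #             self._prep_wrapped_messages(DEFAULT_CLIENT_INFO)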

    @property
    def host(self):
        return self._host

    def _prep_wrapped_messages(self, client_info):
        # Precompute the wrapped methods.
        self._wrapped_methods = {
            self.create_write_stream: gapic_v1.method.wrap_method(
                self.create_write_stream,
                default_retry=retries.Retry(
                    initial=10.0,
                    maximum=120.0,
                    multiplier=1.3,
                    predicate=retries.if_exception_type(
                        core_exceptions.DeadlineExceeded,
                        core_exceptions.ResourceExhausted,
                        core_exceptions.ServiceUnavailable,
                    ),
                    deadline=1200.0,
                ),
                default_timeout=1200.0,
                client_info=client_info,
            ),
            self.append_rows: gapic_v1.method.wrap_method(
                self.append_rows,
                default_retry=retries.Retry(
                    initial=0.1,
                    maximum=60.0,
                    multiplier=1.3,
                    predicate=retries.if_exception_type(
                        core_exceptions.ServiceUnavailable,
                    ),
                    deadline=86400.0,
                ),
                default_timeout=86400.0,
                client_info=client_info,
            ),
            self.get_write_stream: gapic_v1.method.wrap_method(
                self.get_write_stream,
                default_retry=retries.Retry(
                    initial=0.1,
                    maximum=60.0,
                    multiplier=1.3,
                    predicate=retries.if_exception_type(
                        core_exceptions.DeadlineExceeded,
                        core_exceptions.ResourceExhausted,
                        core_exceptions.ServiceUnavailable,
                    ),
                    deadline=600.0,
                ),
                default_timeout=600.0,
                client_info=client_info,
            ),
            self.finalize_write_stream: gapic_v1.method.wrap_method(
                self.finalize_write_stream,
                default_retry=retries.Retry(
                    initial=0.1,
                    maximum=60.0,
                    multiplier=1.3,
                    predicate=retries.if_exception_type(
                        core_exceptions.DeadlineExceeded,
                        core_exceptions.ResourceExhausted,
                        core_exceptions.ServiceUnavailable,
                    ),
                    deadline=600.0,
                ),
                default_timeout=600.0,
                client_info=client_info,
            ),
            self.batch_commit_write_streams: gapic_v1.method.wrap_method(
                self.batch_commit_write_streams,
                default_retry=retries.Retry(
                    initial=0.1,
                    maximum=60.0,
                    multiplier=1.3,
                    predicate=retries.if_exception_type(
                        core_exceptions.DeadlineExceeded,
                        core_exceptions.ResourceExhausted,
                        core_exceptions.ServiceUnavailable,
                    ),
                    deadline=600.0,
                ),
                default_timeout=600.0,
                client_info=client_info,
            ),
            self.flush_rows: gapic_v1.method.wrap_method(
                self.flush_rows,
                default_retry=retries.Retry(
                    initial=0.1,
                    maximum=60.0,
                    multiplier=1.3,
                    predicate=retries.if_exception_type(
                        core_exceptions.DeadlineExceeded,
                        core_exceptions.ResourceExhausted,
                        core_exceptions.ServiceUnavailable,
                    ),
                    deadline=600.0,
                ),
                default_timeout=600.0,
                client_info=client_info,
            ),
        }
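
    # [Editorial note, illustrative only] Each ``retries.Retry`` above encodes
    # a jittered exponential backoff: the first wait is ``initial`` seconds,
    # each subsequent nominal wait is multiplied by ``multiplier`` and capped
    # at ``maximum``, and ``deadline`` bounds the total time spent retrying.
    # Only exceptions matching the ``predicate`` are retried. For
    # ``create_write_stream`` the nominal wait sequence is therefore roughly:
    #
    #     10.0, 13.0, 16.9, 22.0, ...  (capped at 120.0 seconds)
    #
    # while ``default_timeout`` is the per-attempt RPC timeout that
    # ``wrap_method`` applies when the caller does not pass one.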

    def close(self):
        """Closes resources associated with the transport.

        .. warning::
            Only call this method if the transport is NOT shared
            with other clients - this may cause errors in other clients!
        """
        raise NotImplementedError()
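
    # [Editorial note, illustrative only] Concrete transports override
    # ``close`` to release their underlying channel or session. Callers
    # usually do not invoke it directly; the generated client is a context
    # manager that closes its transport on exit, roughly:
    #
    #     from google.cloud import bigquery_storage_v1
    #
    #     with bigquery_storage_v1.BigQueryWriteClient() as client:
    #         ...  # client.__exit__ calls transport.close()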

    @property
    def create_write_stream(
        self,
    ) -> Callable[
        [storage.CreateWriteStreamRequest],
        Union[stream.WriteStream, Awaitable[stream.WriteStream]],
    ]:
        raise NotImplementedError()

    @property
    def append_rows(
        self,
    ) -> Callable[
        [storage.AppendRowsRequest],
        Union[storage.AppendRowsResponse, Awaitable[storage.AppendRowsResponse]],
    ]:
        raise NotImplementedError()

    @property
    def get_write_stream(
        self,
    ) -> Callable[
        [storage.GetWriteStreamRequest],
        Union[stream.WriteStream, Awaitable[stream.WriteStream]],
    ]:
        raise NotImplementedError()

    @property
    def finalize_write_stream(
        self,
    ) -> Callable[
        [storage.FinalizeWriteStreamRequest],
        Union[
            storage.FinalizeWriteStreamResponse,
            Awaitable[storage.FinalizeWriteStreamResponse],
        ],
    ]:
        raise NotImplementedError()

    @property
    def batch_commit_write_streams(
        self,
    ) -> Callable[
        [storage.BatchCommitWriteStreamsRequest],
        Union[
            storage.BatchCommitWriteStreamsResponse,
            Awaitable[storage.BatchCommitWriteStreamsResponse],
        ],
    ]:
        raise NotImplementedError()

    @property
    def flush_rows(
        self,
    ) -> Callable[
        [storage.FlushRowsRequest],
        Union[storage.FlushRowsResponse, Awaitable[storage.FlushRowsResponse]],
    ]:
        raise NotImplementedError()

    @property
    def kind(self) -> str:
        raise NotImplementedError()
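
# [Editorial note, illustrative only] Each property above must return the
# callable for one RPC, either synchronous or awaitable per the Union return
# types. A minimal, hypothetical in-memory stub for tests might look like:
#
#     class StubTransport(BigQueryWriteTransport):
#         kind = "stub"  # shadows the abstract ``kind`` property
#
#         def __init__(self, **kwargs):
#             kwargs.setdefault("credentials", ga_credentials.AnonymousCredentials())
#             super().__init__(**kwargs)
#
#         def close(self):
#             pass  # nothing to release
#
#         @property
#         def create_write_stream(self):
#             return lambda request, **_: stream.WriteStream(name="stub-stream")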

__all__ = ("BigQueryWriteTransport",)