Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/google/cloud/bigquery_storage_v1/services/big_query_write/transports/base.py: 57%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

61 statements  

1# -*- coding: utf-8 -*- 

2# Copyright 2024 Google LLC 

3# 

4# Licensed under the Apache License, Version 2.0 (the "License"); 

5# you may not use this file except in compliance with the License. 

6# You may obtain a copy of the License at 

7# 

8# http://www.apache.org/licenses/LICENSE-2.0 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, 

12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

13# See the License for the specific language governing permissions and 

14# limitations under the License. 

15# 

16import abc 

17from typing import Awaitable, Callable, Dict, Optional, Sequence, Union 

18 

19import google.api_core 

20from google.api_core import exceptions as core_exceptions 

21from google.api_core import gapic_v1 

22from google.api_core import retry as retries 

23import google.auth # type: ignore 

24from google.auth import credentials as ga_credentials # type: ignore 

25from google.oauth2 import service_account # type: ignore 

26 

27from google.cloud.bigquery_storage_v1 import gapic_version as package_version 

28from google.cloud.bigquery_storage_v1.types import storage, stream 

29 

30DEFAULT_CLIENT_INFO = gapic_v1.client_info.ClientInfo( 

31 gapic_version=package_version.__version__ 

32) 

33 

34 

35class BigQueryWriteTransport(abc.ABC): 

36 """Abstract transport class for BigQueryWrite.""" 

37 

38 AUTH_SCOPES = ( 

39 "https://www.googleapis.com/auth/bigquery", 

40 "https://www.googleapis.com/auth/bigquery.insertdata", 

41 "https://www.googleapis.com/auth/cloud-platform", 

42 ) 

43 

44 DEFAULT_HOST: str = "bigquerystorage.googleapis.com" 

45 

46 def __init__( 

47 self, 

48 *, 

49 host: str = DEFAULT_HOST, 

50 credentials: Optional[ga_credentials.Credentials] = None, 

51 credentials_file: Optional[str] = None, 

52 scopes: Optional[Sequence[str]] = None, 

53 quota_project_id: Optional[str] = None, 

54 client_info: gapic_v1.client_info.ClientInfo = DEFAULT_CLIENT_INFO, 

55 always_use_jwt_access: Optional[bool] = False, 

56 api_audience: Optional[str] = None, 

57 **kwargs, 

58 ) -> None: 

59 """Instantiate the transport. 

60 

61 Args: 

62 host (Optional[str]): 

63 The hostname to connect to (default: 'bigquerystorage.googleapis.com'). 

64 credentials (Optional[google.auth.credentials.Credentials]): The 

65 authorization credentials to attach to requests. These 

66 credentials identify the application to the service; if none 

67 are specified, the client will attempt to ascertain the 

68 credentials from the environment. 

69 credentials_file (Optional[str]): A file with credentials that can 

70 be loaded with :func:`google.auth.load_credentials_from_file`. 

71 This argument is mutually exclusive with credentials. 

72 scopes (Optional[Sequence[str]]): A list of scopes. 

73 quota_project_id (Optional[str]): An optional project to use for billing 

74 and quota. 

75 client_info (google.api_core.gapic_v1.client_info.ClientInfo): 

76 The client info used to send a user-agent string along with 

77 API requests. If ``None``, then default info will be used. 

78 Generally, you only need to set this if you're developing 

79 your own client library. 

80 always_use_jwt_access (Optional[bool]): Whether self signed JWT should 

81 be used for service account credentials. 

82 """ 

83 

84 scopes_kwargs = {"scopes": scopes, "default_scopes": self.AUTH_SCOPES} 

85 

86 # Save the scopes. 

87 self._scopes = scopes 

88 

89 # If no credentials are provided, then determine the appropriate 

90 # defaults. 

91 if credentials and credentials_file: 

92 raise core_exceptions.DuplicateCredentialArgs( 

93 "'credentials_file' and 'credentials' are mutually exclusive" 

94 ) 

95 

96 if credentials_file is not None: 

97 credentials, _ = google.auth.load_credentials_from_file( 

98 credentials_file, **scopes_kwargs, quota_project_id=quota_project_id 

99 ) 

100 elif credentials is None: 

101 credentials, _ = google.auth.default( 

102 **scopes_kwargs, quota_project_id=quota_project_id 

103 ) 

104 # Don't apply audience if the credentials file passed from user. 

105 if hasattr(credentials, "with_gdch_audience"): 

106 credentials = credentials.with_gdch_audience( 

107 api_audience if api_audience else host 

108 ) 

109 

110 # If the credentials are service account credentials, then always try to use self signed JWT. 

111 if ( 

112 always_use_jwt_access 

113 and isinstance(credentials, service_account.Credentials) 

114 and hasattr(service_account.Credentials, "with_always_use_jwt_access") 

115 ): 

116 credentials = credentials.with_always_use_jwt_access(True) 

117 

118 # Save the credentials. 

119 self._credentials = credentials 

120 

121 # Save the hostname. Default to port 443 (HTTPS) if none is specified. 

122 if ":" not in host: 

123 host += ":443" 

124 self._host = host 

125 

126 @property 

127 def host(self): 

128 return self._host 

129 

130 def _prep_wrapped_messages(self, client_info): 

131 # Precompute the wrapped methods. 

132 self._wrapped_methods = { 

133 self.create_write_stream: gapic_v1.method.wrap_method( 

134 self.create_write_stream, 

135 default_retry=retries.Retry( 

136 initial=10.0, 

137 maximum=120.0, 

138 multiplier=1.3, 

139 predicate=retries.if_exception_type( 

140 core_exceptions.DeadlineExceeded, 

141 core_exceptions.ResourceExhausted, 

142 core_exceptions.ServiceUnavailable, 

143 ), 

144 deadline=1200.0, 

145 ), 

146 default_timeout=1200.0, 

147 client_info=client_info, 

148 ), 

149 self.append_rows: gapic_v1.method.wrap_method( 

150 self.append_rows, 

151 default_retry=retries.Retry( 

152 initial=0.1, 

153 maximum=60.0, 

154 multiplier=1.3, 

155 predicate=retries.if_exception_type( 

156 core_exceptions.ServiceUnavailable, 

157 ), 

158 deadline=86400.0, 

159 ), 

160 default_timeout=86400.0, 

161 client_info=client_info, 

162 ), 

163 self.get_write_stream: gapic_v1.method.wrap_method( 

164 self.get_write_stream, 

165 default_retry=retries.Retry( 

166 initial=0.1, 

167 maximum=60.0, 

168 multiplier=1.3, 

169 predicate=retries.if_exception_type( 

170 core_exceptions.DeadlineExceeded, 

171 core_exceptions.ResourceExhausted, 

172 core_exceptions.ServiceUnavailable, 

173 ), 

174 deadline=600.0, 

175 ), 

176 default_timeout=600.0, 

177 client_info=client_info, 

178 ), 

179 self.finalize_write_stream: gapic_v1.method.wrap_method( 

180 self.finalize_write_stream, 

181 default_retry=retries.Retry( 

182 initial=0.1, 

183 maximum=60.0, 

184 multiplier=1.3, 

185 predicate=retries.if_exception_type( 

186 core_exceptions.DeadlineExceeded, 

187 core_exceptions.ResourceExhausted, 

188 core_exceptions.ServiceUnavailable, 

189 ), 

190 deadline=600.0, 

191 ), 

192 default_timeout=600.0, 

193 client_info=client_info, 

194 ), 

195 self.batch_commit_write_streams: gapic_v1.method.wrap_method( 

196 self.batch_commit_write_streams, 

197 default_retry=retries.Retry( 

198 initial=0.1, 

199 maximum=60.0, 

200 multiplier=1.3, 

201 predicate=retries.if_exception_type( 

202 core_exceptions.DeadlineExceeded, 

203 core_exceptions.ResourceExhausted, 

204 core_exceptions.ServiceUnavailable, 

205 ), 

206 deadline=600.0, 

207 ), 

208 default_timeout=600.0, 

209 client_info=client_info, 

210 ), 

211 self.flush_rows: gapic_v1.method.wrap_method( 

212 self.flush_rows, 

213 default_retry=retries.Retry( 

214 initial=0.1, 

215 maximum=60.0, 

216 multiplier=1.3, 

217 predicate=retries.if_exception_type( 

218 core_exceptions.DeadlineExceeded, 

219 core_exceptions.ResourceExhausted, 

220 core_exceptions.ServiceUnavailable, 

221 ), 

222 deadline=600.0, 

223 ), 

224 default_timeout=600.0, 

225 client_info=client_info, 

226 ), 

227 } 

228 

229 def close(self): 

230 """Closes resources associated with the transport. 

231 

232 .. warning:: 

233 Only call this method if the transport is NOT shared 

234 with other clients - this may cause errors in other clients! 

235 """ 

236 raise NotImplementedError() 

237 

238 @property 

239 def create_write_stream( 

240 self, 

241 ) -> Callable[ 

242 [storage.CreateWriteStreamRequest], 

243 Union[stream.WriteStream, Awaitable[stream.WriteStream]], 

244 ]: 

245 raise NotImplementedError() 

246 

247 @property 

248 def append_rows( 

249 self, 

250 ) -> Callable[ 

251 [storage.AppendRowsRequest], 

252 Union[storage.AppendRowsResponse, Awaitable[storage.AppendRowsResponse]], 

253 ]: 

254 raise NotImplementedError() 

255 

256 @property 

257 def get_write_stream( 

258 self, 

259 ) -> Callable[ 

260 [storage.GetWriteStreamRequest], 

261 Union[stream.WriteStream, Awaitable[stream.WriteStream]], 

262 ]: 

263 raise NotImplementedError() 

264 

265 @property 

266 def finalize_write_stream( 

267 self, 

268 ) -> Callable[ 

269 [storage.FinalizeWriteStreamRequest], 

270 Union[ 

271 storage.FinalizeWriteStreamResponse, 

272 Awaitable[storage.FinalizeWriteStreamResponse], 

273 ], 

274 ]: 

275 raise NotImplementedError() 

276 

277 @property 

278 def batch_commit_write_streams( 

279 self, 

280 ) -> Callable[ 

281 [storage.BatchCommitWriteStreamsRequest], 

282 Union[ 

283 storage.BatchCommitWriteStreamsResponse, 

284 Awaitable[storage.BatchCommitWriteStreamsResponse], 

285 ], 

286 ]: 

287 raise NotImplementedError() 

288 

289 @property 

290 def flush_rows( 

291 self, 

292 ) -> Callable[ 

293 [storage.FlushRowsRequest], 

294 Union[storage.FlushRowsResponse, Awaitable[storage.FlushRowsResponse]], 

295 ]: 

296 raise NotImplementedError() 

297 

298 @property 

299 def kind(self) -> str: 

300 raise NotImplementedError() 

301 

302 

303__all__ = ("BigQueryWriteTransport",)