Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/google/pubsub_v1/services/publisher/transports/base.py: 61%

79 statements  

« prev     ^ index     » next       coverage.py v7.2.2, created at 2023-03-26 06:25 +0000

1# -*- coding: utf-8 -*- 

2# Copyright 2022 Google LLC 

3# 

4# Licensed under the Apache License, Version 2.0 (the "License"); 

5# you may not use this file except in compliance with the License. 

6# You may obtain a copy of the License at 

7# 

8# http://www.apache.org/licenses/LICENSE-2.0 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, 

12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

13# See the License for the specific language governing permissions and 

14# limitations under the License. 

15# 

16import abc 

17from typing import Awaitable, Callable, Dict, Optional, Sequence, Union 

18 

19from google.pubsub_v1 import gapic_version as package_version 

20 

21import google.auth # type: ignore 

22import google.api_core 

23from google.api_core import exceptions as core_exceptions 

24from google.api_core import gapic_v1 

25from google.api_core import retry as retries 

26from google.auth import credentials as ga_credentials # type: ignore 

27from google.oauth2 import service_account # type: ignore 

28 

29from google.iam.v1 import iam_policy_pb2 # type: ignore 

30from google.iam.v1 import policy_pb2 # type: ignore 

31from google.protobuf import empty_pb2 # type: ignore 

32from google.pubsub_v1.types import pubsub 

33 

# Module-wide default client metadata. It is built once at import time from
# this package's own version string and used as the default value of the
# ``client_info`` parameter of ``PublisherTransport.__init__``.
DEFAULT_CLIENT_INFO = gapic_v1.client_info.ClientInfo(
    client_library_version=package_version.__version__,
)

37 

38 

class PublisherTransport(abc.ABC):
    """Abstract transport class for Publisher.

    The constructor resolves credentials and normalizes the host name.
    Every RPC accessor defined here is a property whose base implementation
    raises ``NotImplementedError``.
    """

    AUTH_SCOPES = (
        "https://www.googleapis.com/auth/cloud-platform",
        "https://www.googleapis.com/auth/pubsub",
    )

    DEFAULT_HOST: str = "pubsub.googleapis.com"

    def __init__(
        self,
        *,
        host: str = DEFAULT_HOST,
        credentials: Optional[ga_credentials.Credentials] = None,
        credentials_file: Optional[str] = None,
        scopes: Optional[Sequence[str]] = None,
        quota_project_id: Optional[str] = None,
        client_info: gapic_v1.client_info.ClientInfo = DEFAULT_CLIENT_INFO,
        always_use_jwt_access: Optional[bool] = False,
        api_audience: Optional[str] = None,
        **kwargs,
    ) -> None:
        """Instantiate the transport.

        Args:
            host (Optional[str]):
                The hostname to connect to. If it contains no ``":"``,
                ``":443"`` is appended before it is stored.
            credentials (Optional[google.auth.credentials.Credentials]): The
                authorization credentials to attach to requests. These
                credentials identify the application to the service; if none
                are specified, the client will attempt to ascertain the
                credentials from the environment.
            credentials_file (Optional[str]): A file with credentials that can
                be loaded with :func:`google.auth.load_credentials_from_file`.
                This argument is mutually exclusive with credentials.
            scopes (Optional[Sequence[str]]): A list of scopes.
                ``AUTH_SCOPES`` is passed alongside as the default scopes
                whenever credentials are loaded here.
            quota_project_id (Optional[str]): An optional project to use for
                billing and quota.
            client_info (google.api_core.gapic_v1.client_info.ClientInfo):
                The client info used to send a user-agent string along with
                API requests. Generally, you only need to set this if you're
                developing your own client library. This constructor does
                not read it; ``_prep_wrapped_messages`` takes it as an
                argument.
            always_use_jwt_access (Optional[bool]): Whether self signed JWT
                should be used for service account credentials.
            api_audience (Optional[str]): Audience handed to
                ``with_gdch_audience`` when credentials are discovered from
                the environment and expose that method. When falsy, ``host``
                (as passed in, before any port is appended) is used instead.
            **kwargs: Accepted but not used by this constructor.

        Raises:
            google.api_core.exceptions.DuplicateCredentialArgs: If both
                ``credentials`` and ``credentials_file`` are supplied.
        """

        scopes_kwargs = {"scopes": scopes, "default_scopes": self.AUTH_SCOPES}

        # Save the scopes.
        self._scopes = scopes

        # Explicit credentials and a credentials file cannot be combined.
        if credentials and credentials_file:
            raise core_exceptions.DuplicateCredentialArgs(
                "'credentials_file' and 'credentials' are mutually exclusive"
            )

        # If no credentials are provided, then determine the appropriate
        # defaults.
        if credentials_file is not None:
            credentials, _ = google.auth.load_credentials_from_file(
                credentials_file, **scopes_kwargs, quota_project_id=quota_project_id
            )
        elif credentials is None:
            credentials, _ = google.auth.default(
                **scopes_kwargs, quota_project_id=quota_project_id
            )
            # The audience is applied only on this branch, i.e. to
            # credentials discovered from the environment; credentials or a
            # credentials file passed by the user are left untouched.
            if hasattr(credentials, "with_gdch_audience"):
                credentials = credentials.with_gdch_audience(
                    api_audience if api_audience else host
                )

        # If the credentials are service account credentials, then always try
        # to use self signed JWT.
        if (
            always_use_jwt_access
            and isinstance(credentials, service_account.Credentials)
            and hasattr(service_account.Credentials, "with_always_use_jwt_access")
        ):
            credentials = credentials.with_always_use_jwt_access(True)

        # Save the credentials.
        self._credentials = credentials

        # Save the hostname. Default to port 443 (HTTPS) if none is specified.
        if ":" not in host:
            host += ":443"
        self._host = host

    def _prep_wrapped_messages(self, client_info):
        """Precompute the wrapped methods and store them on the instance.

        Populates ``self._wrapped_methods`` with one entry per Publisher RPC
        listed below, keyed by the transport callable and mapping to the
        result of ``gapic_v1.method.wrap_method`` for that callable.

        Args:
            client_info: Forwarded unchanged as ``client_info`` to every
                ``wrap_method`` call.
        """

        def _wrap(rpc, *retryable):
            # All RPCs share the same backoff and timeout settings; only the
            # set of retryable exception types differs. A fresh Retry object
            # is built on each call so no two methods share one.
            return gapic_v1.method.wrap_method(
                rpc,
                default_retry=retries.Retry(
                    initial=0.1,
                    maximum=60.0,
                    multiplier=1.3,
                    predicate=retries.if_exception_type(*retryable),
                    deadline=600.0,
                ),
                default_timeout=60.0,
                client_info=client_info,
            )

        # Retryable exception groups used by the RPCs below.
        unavailable_only = (core_exceptions.ServiceUnavailable,)
        read_errors = (
            core_exceptions.Aborted,
            core_exceptions.ServiceUnavailable,
            core_exceptions.Unknown,
        )
        publish_errors = (
            core_exceptions.Aborted,
            core_exceptions.Cancelled,
            core_exceptions.DeadlineExceeded,
            core_exceptions.InternalServerError,
            core_exceptions.ResourceExhausted,
            core_exceptions.ServiceUnavailable,
            core_exceptions.Unknown,
        )

        self._wrapped_methods = {
            self.create_topic: _wrap(self.create_topic, *unavailable_only),
            self.update_topic: _wrap(self.update_topic, *unavailable_only),
            self.publish: _wrap(self.publish, *publish_errors),
            self.get_topic: _wrap(self.get_topic, *read_errors),
            self.list_topics: _wrap(self.list_topics, *read_errors),
            self.list_topic_subscriptions: _wrap(
                self.list_topic_subscriptions, *read_errors
            ),
            self.list_topic_snapshots: _wrap(
                self.list_topic_snapshots, *read_errors
            ),
            self.delete_topic: _wrap(self.delete_topic, *unavailable_only),
            self.detach_subscription: _wrap(
                self.detach_subscription, *unavailable_only
            ),
        }

    def close(self):
        """Closes resources associated with the transport.

        .. warning::
             Only call this method if the transport is NOT shared
             with other clients - this may cause errors in other clients!
        """
        raise NotImplementedError()

    @property
    def create_topic(
        self,
    ) -> Callable[[pubsub.Topic], Union[pubsub.Topic, Awaitable[pubsub.Topic]]]:
        """Callable for create_topic; the base class raises NotImplementedError."""
        raise NotImplementedError()

    @property
    def update_topic(
        self,
    ) -> Callable[
        [pubsub.UpdateTopicRequest], Union[pubsub.Topic, Awaitable[pubsub.Topic]]
    ]:
        """Callable for update_topic; the base class raises NotImplementedError."""
        raise NotImplementedError()

    @property
    def publish(
        self,
    ) -> Callable[
        [pubsub.PublishRequest],
        Union[pubsub.PublishResponse, Awaitable[pubsub.PublishResponse]],
    ]:
        """Callable for publish; the base class raises NotImplementedError."""
        raise NotImplementedError()

    @property
    def get_topic(
        self,
    ) -> Callable[
        [pubsub.GetTopicRequest], Union[pubsub.Topic, Awaitable[pubsub.Topic]]
    ]:
        """Callable for get_topic; the base class raises NotImplementedError."""
        raise NotImplementedError()

    @property
    def list_topics(
        self,
    ) -> Callable[
        [pubsub.ListTopicsRequest],
        Union[pubsub.ListTopicsResponse, Awaitable[pubsub.ListTopicsResponse]],
    ]:
        """Callable for list_topics; the base class raises NotImplementedError."""
        raise NotImplementedError()

    @property
    def list_topic_subscriptions(
        self,
    ) -> Callable[
        [pubsub.ListTopicSubscriptionsRequest],
        Union[
            pubsub.ListTopicSubscriptionsResponse,
            Awaitable[pubsub.ListTopicSubscriptionsResponse],
        ],
    ]:
        """Callable for list_topic_subscriptions; the base class raises NotImplementedError."""
        raise NotImplementedError()

    @property
    def list_topic_snapshots(
        self,
    ) -> Callable[
        [pubsub.ListTopicSnapshotsRequest],
        Union[
            pubsub.ListTopicSnapshotsResponse,
            Awaitable[pubsub.ListTopicSnapshotsResponse],
        ],
    ]:
        """Callable for list_topic_snapshots; the base class raises NotImplementedError."""
        raise NotImplementedError()

    @property
    def delete_topic(
        self,
    ) -> Callable[
        [pubsub.DeleteTopicRequest], Union[empty_pb2.Empty, Awaitable[empty_pb2.Empty]]
    ]:
        """Callable for delete_topic; the base class raises NotImplementedError."""
        raise NotImplementedError()

    @property
    def detach_subscription(
        self,
    ) -> Callable[
        [pubsub.DetachSubscriptionRequest],
        Union[
            pubsub.DetachSubscriptionResponse,
            Awaitable[pubsub.DetachSubscriptionResponse],
        ],
    ]:
        """Callable for detach_subscription; the base class raises NotImplementedError."""
        raise NotImplementedError()

    @property
    def set_iam_policy(
        self,
    ) -> Callable[
        [iam_policy_pb2.SetIamPolicyRequest],
        Union[policy_pb2.Policy, Awaitable[policy_pb2.Policy]],
    ]:
        """Callable for set_iam_policy; the base class raises NotImplementedError."""
        raise NotImplementedError()

    @property
    def get_iam_policy(
        self,
    ) -> Callable[
        [iam_policy_pb2.GetIamPolicyRequest],
        Union[policy_pb2.Policy, Awaitable[policy_pb2.Policy]],
    ]:
        """Callable for get_iam_policy; the base class raises NotImplementedError."""
        raise NotImplementedError()

    @property
    def test_iam_permissions(
        self,
    ) -> Callable[
        [iam_policy_pb2.TestIamPermissionsRequest],
        Union[
            iam_policy_pb2.TestIamPermissionsResponse,
            Awaitable[iam_policy_pb2.TestIamPermissionsResponse],
        ],
    ]:
        """Callable for test_iam_permissions; the base class raises NotImplementedError."""
        raise NotImplementedError()

    @property
    def kind(self) -> str:
        """String-typed ``kind`` of the transport; the base class raises NotImplementedError."""
        raise NotImplementedError()

400 

401 

# Public API of this module: only the abstract transport class is exported.
__all__ = ("PublisherTransport",)