Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.10/site-packages/google/pubsub_v1/services/publisher/transports/base.py: 60%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

84 statements  

1# -*- coding: utf-8 -*- 

2# Copyright 2024 Google LLC 

3# 

4# Licensed under the Apache License, Version 2.0 (the "License"); 

5# you may not use this file except in compliance with the License. 

6# You may obtain a copy of the License at 

7# 

8# http://www.apache.org/licenses/LICENSE-2.0 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, 

12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

13# See the License for the specific language governing permissions and 

14# limitations under the License. 

15# 

16import abc 

17from typing import Awaitable, Callable, Dict, Optional, Sequence, Union 

18 

19from google.pubsub_v1 import gapic_version as package_version 

20 

21import google.auth # type: ignore 

22import google.api_core 

23from google.api_core import exceptions as core_exceptions 

24from google.api_core import gapic_v1 

25from google.api_core import retry as retries 

26from google.auth import credentials as ga_credentials # type: ignore 

27from google.oauth2 import service_account # type: ignore 

28 

29from google.iam.v1 import iam_policy_pb2 # type: ignore 

30from google.iam.v1 import policy_pb2 # type: ignore 

31from google.protobuf import empty_pb2 # type: ignore 

32from google.pubsub_v1.types import pubsub 

33 

34DEFAULT_CLIENT_INFO = gapic_v1.client_info.ClientInfo( 

35 client_library_version=package_version.__version__ 

36) 

37 

38 

39class PublisherTransport(abc.ABC): 

40 """Abstract transport class for Publisher.""" 

41 

42 AUTH_SCOPES = ( 

43 "https://www.googleapis.com/auth/cloud-platform", 

44 "https://www.googleapis.com/auth/pubsub", 

45 ) 

46 

47 DEFAULT_HOST: str = "pubsub.googleapis.com" 

48 

49 def __init__( 

50 self, 

51 *, 

52 host: str = DEFAULT_HOST, 

53 credentials: Optional[ga_credentials.Credentials] = None, 

54 credentials_file: Optional[str] = None, 

55 scopes: Optional[Sequence[str]] = None, 

56 quota_project_id: Optional[str] = None, 

57 client_info: gapic_v1.client_info.ClientInfo = DEFAULT_CLIENT_INFO, 

58 always_use_jwt_access: Optional[bool] = False, 

59 api_audience: Optional[str] = None, 

60 **kwargs, 

61 ) -> None: 

62 """Instantiate the transport. 

63 

64 Args: 

65 host (Optional[str]): 

66 The hostname to connect to (default: 'pubsub.googleapis.com'). 

67 credentials (Optional[google.auth.credentials.Credentials]): The 

68 authorization credentials to attach to requests. These 

69 credentials identify the application to the service; if none 

70 are specified, the client will attempt to ascertain the 

71 credentials from the environment. 

72 credentials_file (Optional[str]): A file with credentials that can 

73 be loaded with :func:`google.auth.load_credentials_from_file`. 

74 This argument is mutually exclusive with credentials. 

75 scopes (Optional[Sequence[str]]): A list of scopes. 

76 quota_project_id (Optional[str]): An optional project to use for billing 

77 and quota. 

78 client_info (google.api_core.gapic_v1.client_info.ClientInfo): 

79 The client info used to send a user-agent string along with 

80 API requests. If ``None``, then default info will be used. 

81 Generally, you only need to set this if you're developing 

82 your own client library. 

83 always_use_jwt_access (Optional[bool]): Whether self signed JWT should 

84 be used for service account credentials. 

85 """ 

86 

87 scopes_kwargs = {"scopes": scopes, "default_scopes": self.AUTH_SCOPES} 

88 

89 # Save the scopes. 

90 self._scopes = scopes 

91 if not hasattr(self, "_ignore_credentials"): 

92 self._ignore_credentials: bool = False 

93 

94 # If no credentials are provided, then determine the appropriate 

95 # defaults. 

96 if credentials and credentials_file: 

97 raise core_exceptions.DuplicateCredentialArgs( 

98 "'credentials_file' and 'credentials' are mutually exclusive" 

99 ) 

100 

101 if credentials_file is not None: 

102 credentials, _ = google.auth.load_credentials_from_file( 

103 credentials_file, **scopes_kwargs, quota_project_id=quota_project_id 

104 ) 

105 elif credentials is None and not self._ignore_credentials: 

106 credentials, _ = google.auth.default( 

107 **scopes_kwargs, quota_project_id=quota_project_id 

108 ) 

109 # Don't apply audience if the credentials file passed from user. 

110 if hasattr(credentials, "with_gdch_audience"): 

111 credentials = credentials.with_gdch_audience( 

112 api_audience if api_audience else host 

113 ) 

114 

115 # If the credentials are service account credentials, then always try to use self signed JWT. 

116 if ( 

117 always_use_jwt_access 

118 and isinstance(credentials, service_account.Credentials) 

119 and hasattr(service_account.Credentials, "with_always_use_jwt_access") 

120 ): 

121 credentials = credentials.with_always_use_jwt_access(True) 

122 

123 # Save the credentials. 

124 self._credentials = credentials 

125 

126 # Save the hostname. Default to port 443 (HTTPS) if none is specified. 

127 if ":" not in host: 

128 host += ":443" 

129 self._host = host 

130 

131 @property 

132 def host(self): 

133 return self._host 

134 

135 def _prep_wrapped_messages(self, client_info): 

136 # Precompute the wrapped methods. 

137 self._wrapped_methods = { 

138 self.create_topic: gapic_v1.method.wrap_method( 

139 self.create_topic, 

140 default_retry=retries.Retry( 

141 initial=0.1, 

142 maximum=60.0, 

143 multiplier=1.3, 

144 predicate=retries.if_exception_type( 

145 core_exceptions.ServiceUnavailable, 

146 ), 

147 deadline=600.0, 

148 ), 

149 default_timeout=60.0, 

150 client_info=client_info, 

151 ), 

152 self.update_topic: gapic_v1.method.wrap_method( 

153 self.update_topic, 

154 default_retry=retries.Retry( 

155 initial=0.1, 

156 maximum=60.0, 

157 multiplier=1.3, 

158 predicate=retries.if_exception_type( 

159 core_exceptions.ServiceUnavailable, 

160 ), 

161 deadline=600.0, 

162 ), 

163 default_timeout=60.0, 

164 client_info=client_info, 

165 ), 

166 self.publish: gapic_v1.method.wrap_method( 

167 self.publish, 

168 default_retry=retries.Retry( 

169 initial=0.1, 

170 maximum=60.0, 

171 multiplier=4, 

172 predicate=retries.if_exception_type( 

173 core_exceptions.Aborted, 

174 core_exceptions.Cancelled, 

175 core_exceptions.DeadlineExceeded, 

176 core_exceptions.InternalServerError, 

177 core_exceptions.ResourceExhausted, 

178 core_exceptions.ServiceUnavailable, 

179 core_exceptions.Unknown, 

180 ), 

181 deadline=600.0, 

182 ), 

183 default_timeout=60.0, 

184 client_info=client_info, 

185 ), 

186 self.get_topic: gapic_v1.method.wrap_method( 

187 self.get_topic, 

188 default_retry=retries.Retry( 

189 initial=0.1, 

190 maximum=60.0, 

191 multiplier=1.3, 

192 predicate=retries.if_exception_type( 

193 core_exceptions.Aborted, 

194 core_exceptions.ServiceUnavailable, 

195 core_exceptions.Unknown, 

196 ), 

197 deadline=600.0, 

198 ), 

199 default_timeout=60.0, 

200 client_info=client_info, 

201 ), 

202 self.list_topics: gapic_v1.method.wrap_method( 

203 self.list_topics, 

204 default_retry=retries.Retry( 

205 initial=0.1, 

206 maximum=60.0, 

207 multiplier=1.3, 

208 predicate=retries.if_exception_type( 

209 core_exceptions.Aborted, 

210 core_exceptions.ServiceUnavailable, 

211 core_exceptions.Unknown, 

212 ), 

213 deadline=600.0, 

214 ), 

215 default_timeout=60.0, 

216 client_info=client_info, 

217 ), 

218 self.list_topic_subscriptions: gapic_v1.method.wrap_method( 

219 self.list_topic_subscriptions, 

220 default_retry=retries.Retry( 

221 initial=0.1, 

222 maximum=60.0, 

223 multiplier=1.3, 

224 predicate=retries.if_exception_type( 

225 core_exceptions.Aborted, 

226 core_exceptions.ServiceUnavailable, 

227 core_exceptions.Unknown, 

228 ), 

229 deadline=600.0, 

230 ), 

231 default_timeout=60.0, 

232 client_info=client_info, 

233 ), 

234 self.list_topic_snapshots: gapic_v1.method.wrap_method( 

235 self.list_topic_snapshots, 

236 default_retry=retries.Retry( 

237 initial=0.1, 

238 maximum=60.0, 

239 multiplier=1.3, 

240 predicate=retries.if_exception_type( 

241 core_exceptions.Aborted, 

242 core_exceptions.ServiceUnavailable, 

243 core_exceptions.Unknown, 

244 ), 

245 deadline=600.0, 

246 ), 

247 default_timeout=60.0, 

248 client_info=client_info, 

249 ), 

250 self.delete_topic: gapic_v1.method.wrap_method( 

251 self.delete_topic, 

252 default_retry=retries.Retry( 

253 initial=0.1, 

254 maximum=60.0, 

255 multiplier=1.3, 

256 predicate=retries.if_exception_type( 

257 core_exceptions.ServiceUnavailable, 

258 ), 

259 deadline=600.0, 

260 ), 

261 default_timeout=60.0, 

262 client_info=client_info, 

263 ), 

264 self.detach_subscription: gapic_v1.method.wrap_method( 

265 self.detach_subscription, 

266 default_retry=retries.Retry( 

267 initial=0.1, 

268 maximum=60.0, 

269 multiplier=1.3, 

270 predicate=retries.if_exception_type( 

271 core_exceptions.ServiceUnavailable, 

272 ), 

273 deadline=600.0, 

274 ), 

275 default_timeout=60.0, 

276 client_info=client_info, 

277 ), 

278 self.get_iam_policy: gapic_v1.method.wrap_method( 

279 self.get_iam_policy, 

280 default_timeout=None, 

281 client_info=client_info, 

282 ), 

283 self.set_iam_policy: gapic_v1.method.wrap_method( 

284 self.set_iam_policy, 

285 default_timeout=None, 

286 client_info=client_info, 

287 ), 

288 self.test_iam_permissions: gapic_v1.method.wrap_method( 

289 self.test_iam_permissions, 

290 default_timeout=None, 

291 client_info=client_info, 

292 ), 

293 } 

294 

295 def close(self): 

296 """Closes resources associated with the transport. 

297 

298 .. warning:: 

299 Only call this method if the transport is NOT shared 

300 with other clients - this may cause errors in other clients! 

301 """ 

302 raise NotImplementedError() 

303 

304 @property 

305 def create_topic( 

306 self, 

307 ) -> Callable[[pubsub.Topic], Union[pubsub.Topic, Awaitable[pubsub.Topic]]]: 

308 raise NotImplementedError() 

309 

310 @property 

311 def update_topic( 

312 self, 

313 ) -> Callable[ 

314 [pubsub.UpdateTopicRequest], Union[pubsub.Topic, Awaitable[pubsub.Topic]] 

315 ]: 

316 raise NotImplementedError() 

317 

318 @property 

319 def publish( 

320 self, 

321 ) -> Callable[ 

322 [pubsub.PublishRequest], 

323 Union[pubsub.PublishResponse, Awaitable[pubsub.PublishResponse]], 

324 ]: 

325 raise NotImplementedError() 

326 

327 @property 

328 def get_topic( 

329 self, 

330 ) -> Callable[ 

331 [pubsub.GetTopicRequest], Union[pubsub.Topic, Awaitable[pubsub.Topic]] 

332 ]: 

333 raise NotImplementedError() 

334 

335 @property 

336 def list_topics( 

337 self, 

338 ) -> Callable[ 

339 [pubsub.ListTopicsRequest], 

340 Union[pubsub.ListTopicsResponse, Awaitable[pubsub.ListTopicsResponse]], 

341 ]: 

342 raise NotImplementedError() 

343 

344 @property 

345 def list_topic_subscriptions( 

346 self, 

347 ) -> Callable[ 

348 [pubsub.ListTopicSubscriptionsRequest], 

349 Union[ 

350 pubsub.ListTopicSubscriptionsResponse, 

351 Awaitable[pubsub.ListTopicSubscriptionsResponse], 

352 ], 

353 ]: 

354 raise NotImplementedError() 

355 

356 @property 

357 def list_topic_snapshots( 

358 self, 

359 ) -> Callable[ 

360 [pubsub.ListTopicSnapshotsRequest], 

361 Union[ 

362 pubsub.ListTopicSnapshotsResponse, 

363 Awaitable[pubsub.ListTopicSnapshotsResponse], 

364 ], 

365 ]: 

366 raise NotImplementedError() 

367 

368 @property 

369 def delete_topic( 

370 self, 

371 ) -> Callable[ 

372 [pubsub.DeleteTopicRequest], Union[empty_pb2.Empty, Awaitable[empty_pb2.Empty]] 

373 ]: 

374 raise NotImplementedError() 

375 

376 @property 

377 def detach_subscription( 

378 self, 

379 ) -> Callable[ 

380 [pubsub.DetachSubscriptionRequest], 

381 Union[ 

382 pubsub.DetachSubscriptionResponse, 

383 Awaitable[pubsub.DetachSubscriptionResponse], 

384 ], 

385 ]: 

386 raise NotImplementedError() 

387 

388 @property 

389 def set_iam_policy( 

390 self, 

391 ) -> Callable[ 

392 [iam_policy_pb2.SetIamPolicyRequest], 

393 Union[policy_pb2.Policy, Awaitable[policy_pb2.Policy]], 

394 ]: 

395 raise NotImplementedError() 

396 

397 @property 

398 def get_iam_policy( 

399 self, 

400 ) -> Callable[ 

401 [iam_policy_pb2.GetIamPolicyRequest], 

402 Union[policy_pb2.Policy, Awaitable[policy_pb2.Policy]], 

403 ]: 

404 raise NotImplementedError() 

405 

406 @property 

407 def test_iam_permissions( 

408 self, 

409 ) -> Callable[ 

410 [iam_policy_pb2.TestIamPermissionsRequest], 

411 Union[ 

412 iam_policy_pb2.TestIamPermissionsResponse, 

413 Awaitable[iam_policy_pb2.TestIamPermissionsResponse], 

414 ], 

415 ]: 

416 raise NotImplementedError() 

417 

418 @property 

419 def kind(self) -> str: 

420 raise NotImplementedError() 

421 

422 

423__all__ = ("PublisherTransport",)