Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/google/pubsub_v1/services/publisher/transports/base.py: 59%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

86 statements  

1# -*- coding: utf-8 -*- 

2# Copyright 2025 Google LLC 

3# 

4# Licensed under the Apache License, Version 2.0 (the "License"); 

5# you may not use this file except in compliance with the License. 

6# You may obtain a copy of the License at 

7# 

8# http://www.apache.org/licenses/LICENSE-2.0 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, 

12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

13# See the License for the specific language governing permissions and 

14# limitations under the License. 

15# 

16import abc 

17from typing import Awaitable, Callable, Dict, Optional, Sequence, Union 

18 

19from google.pubsub_v1 import gapic_version as package_version 

20 

21import google.auth # type: ignore 

22import google.api_core 

23from google.api_core import exceptions as core_exceptions 

24from google.api_core import gapic_v1 

25from google.api_core import retry as retries 

26from google.auth import credentials as ga_credentials # type: ignore 

27from google.oauth2 import service_account # type: ignore 

28import google.protobuf 

29 

30from google.iam.v1 import iam_policy_pb2 # type: ignore 

31from google.iam.v1 import policy_pb2 # type: ignore 

32from google.protobuf import empty_pb2 # type: ignore 

33from google.pubsub_v1.types import pubsub 

34 

# Client info used as the default for transports that are not handed one
# explicitly; it carries this package's own version string.
DEFAULT_CLIENT_INFO = gapic_v1.client_info.ClientInfo(
    client_library_version=package_version.__version__,
)

# Record the protobuf runtime version only when the ClientInfo object exposes
# a ``protobuf_runtime_version`` attribute to hold it.
if hasattr(DEFAULT_CLIENT_INFO, "protobuf_runtime_version"):  # pragma: NO COVER
    DEFAULT_CLIENT_INFO.protobuf_runtime_version = google.protobuf.__version__

41 

42 

class PublisherTransport(abc.ABC):
    """Abstract base transport for the Publisher service.

    The base class resolves credentials and the target host, and knows how to
    wrap each RPC with its default retry/timeout settings. Every RPC accessor
    defined here raises ``NotImplementedError``; concrete transports are
    expected to supply the real callables.
    """

    AUTH_SCOPES = (
        "https://www.googleapis.com/auth/cloud-platform",
        "https://www.googleapis.com/auth/pubsub",
    )

    DEFAULT_HOST: str = "pubsub.googleapis.com"

    def __init__(
        self,
        *,
        host: str = DEFAULT_HOST,
        credentials: Optional[ga_credentials.Credentials] = None,
        credentials_file: Optional[str] = None,
        scopes: Optional[Sequence[str]] = None,
        quota_project_id: Optional[str] = None,
        client_info: gapic_v1.client_info.ClientInfo = DEFAULT_CLIENT_INFO,
        always_use_jwt_access: Optional[bool] = False,
        api_audience: Optional[str] = None,
        **kwargs,
    ) -> None:
        """Resolve credentials and host for the transport.

        Args:
            host (Optional[str]):
                Hostname to connect to (default: 'pubsub.googleapis.com').
                ``":443"`` is appended when the value contains no ``":"``.
            credentials (Optional[google.auth.credentials.Credentials]):
                Authorization credentials to attach to requests. When
                omitted (and no ``credentials_file`` is given), they are
                obtained through :func:`google.auth.default` unless
                ``self._ignore_credentials`` is set.
            credentials_file (Optional[str]): Path to a credentials file
                loadable with :func:`google.auth.load_credentials_from_file`.
                Mutually exclusive with ``credentials``.
            scopes (Optional[Sequence[str]]): Scopes to request; stored on
                ``self._scopes`` and forwarded to the credential loaders
                together with ``AUTH_SCOPES`` as the default scopes.
            quota_project_id (Optional[str]): Optional project to use for
                billing and quota; forwarded to the credential loaders.
            client_info (google.api_core.gapic_v1.client_info.ClientInfo):
                Accepted as part of the constructor interface; this base
                constructor does not read it.
            always_use_jwt_access (Optional[bool]): Whether self-signed JWT
                should be used for service account credentials.
            api_audience (Optional[str]): Audience handed to
                ``with_gdch_audience`` when the default credentials offer
                that method; ``host`` is used when this is empty.
            **kwargs: Accepted and ignored by this base constructor.

        Raises:
            google.api_core.exceptions.DuplicateCredentialArgs: If both
                ``credentials`` and ``credentials_file`` are supplied.
        """
        # Keyword arguments shared by both credential-loading entry points.
        auth_kwargs = {
            "scopes": scopes,
            "default_scopes": self.AUTH_SCOPES,
            "quota_project_id": quota_project_id,
        }

        self._scopes = scopes
        # Only default the flag when nothing has set it on the instance yet.
        if not hasattr(self, "_ignore_credentials"):
            self._ignore_credentials: bool = False

        if credentials and credentials_file:
            raise core_exceptions.DuplicateCredentialArgs(
                "'credentials_file' and 'credentials' are mutually exclusive"
            )

        if credentials_file is not None:
            credentials, _ = google.auth.load_credentials_from_file(
                credentials_file, **auth_kwargs
            )
        elif credentials is None and not self._ignore_credentials:
            credentials, _ = google.auth.default(**auth_kwargs)
            # The audience is applied only to environment-derived
            # credentials, never to ones loaded from a user-supplied file.
            if hasattr(credentials, "with_gdch_audience"):
                credentials = credentials.with_gdch_audience(api_audience or host)

        # Service account credentials switch to self-signed JWT when asked to
        # and when the installed auth library supports it.
        use_self_signed_jwt = (
            always_use_jwt_access
            and isinstance(credentials, service_account.Credentials)
            and hasattr(service_account.Credentials, "with_always_use_jwt_access")
        )
        if use_self_signed_jwt:
            credentials = credentials.with_always_use_jwt_access(True)

        self._credentials = credentials

        # Fall back to port 443 (HTTPS) when the host carries no port.
        self._host = host if ":" in host else host + ":443"

    @property
    def host(self):
        """Return the resolved ``host:port`` string stored by ``__init__``."""
        return self._host

    def _prep_wrapped_messages(self, client_info):
        """Precompute the wrapped form of every RPC.

        Stores on ``self._wrapped_methods`` a dict that maps each RPC
        callable exposed by this transport to the result of
        ``gapic_v1.method.wrap_method`` for it, configured with that RPC's
        default retry policy and timeout.

        Args:
            client_info: Client info forwarded to every ``wrap_method`` call.
        """

        def retry_on(*error_types, multiplier=1.3):
            # A fresh Retry per RPC: 0.1s initial delay, 60s cap, 600s deadline.
            return retries.Retry(
                initial=0.1,
                maximum=60.0,
                multiplier=multiplier,
                predicate=retries.if_exception_type(*error_types),
                deadline=600.0,
            )

        def with_retry(rpc, retry_policy):
            # Standard RPCs: default retry plus a 60 second default timeout.
            return gapic_v1.method.wrap_method(
                rpc,
                default_retry=retry_policy,
                default_timeout=60.0,
                client_info=client_info,
            )

        def without_retry(rpc):
            # IAM RPCs: no default retry and no default timeout.
            return gapic_v1.method.wrap_method(
                rpc,
                default_timeout=None,
                client_info=client_info,
            )

        unavailable_only = (core_exceptions.ServiceUnavailable,)
        read_errors = (
            core_exceptions.Aborted,
            core_exceptions.ServiceUnavailable,
            core_exceptions.Unknown,
        )
        publish_errors = (
            core_exceptions.Aborted,
            core_exceptions.Cancelled,
            core_exceptions.DeadlineExceeded,
            core_exceptions.InternalServerError,
            core_exceptions.ResourceExhausted,
            core_exceptions.ServiceUnavailable,
            core_exceptions.Unknown,
        )

        self._wrapped_methods = {
            self.create_topic: with_retry(
                self.create_topic, retry_on(*unavailable_only)
            ),
            self.update_topic: with_retry(
                self.update_topic, retry_on(*unavailable_only)
            ),
            # Publish backs off far more aggressively than the other RPCs.
            self.publish: with_retry(
                self.publish, retry_on(*publish_errors, multiplier=4)
            ),
            self.get_topic: with_retry(self.get_topic, retry_on(*read_errors)),
            self.list_topics: with_retry(self.list_topics, retry_on(*read_errors)),
            self.list_topic_subscriptions: with_retry(
                self.list_topic_subscriptions, retry_on(*read_errors)
            ),
            self.list_topic_snapshots: with_retry(
                self.list_topic_snapshots, retry_on(*read_errors)
            ),
            self.delete_topic: with_retry(
                self.delete_topic, retry_on(*unavailable_only)
            ),
            self.detach_subscription: with_retry(
                self.detach_subscription, retry_on(*unavailable_only)
            ),
            self.get_iam_policy: without_retry(self.get_iam_policy),
            self.set_iam_policy: without_retry(self.set_iam_policy),
            self.test_iam_permissions: without_retry(self.test_iam_permissions),
        }

    def close(self):
        """Closes resources associated with the transport.

        Not implemented in the base transport.

        .. warning::
             Only call this method if the transport is NOT shared
             with other clients - this may cause errors in other clients!
        """
        raise NotImplementedError()

    @property
    def create_topic(
        self,
    ) -> Callable[[pubsub.Topic], Union[pubsub.Topic, Awaitable[pubsub.Topic]]]:
        """Callable for the CreateTopic RPC; not implemented in the base transport."""
        raise NotImplementedError()

    @property
    def update_topic(
        self,
    ) -> Callable[
        [pubsub.UpdateTopicRequest], Union[pubsub.Topic, Awaitable[pubsub.Topic]]
    ]:
        """Callable for the UpdateTopic RPC; not implemented in the base transport."""
        raise NotImplementedError()

    @property
    def publish(
        self,
    ) -> Callable[
        [pubsub.PublishRequest],
        Union[pubsub.PublishResponse, Awaitable[pubsub.PublishResponse]],
    ]:
        """Callable for the Publish RPC; not implemented in the base transport."""
        raise NotImplementedError()

    @property
    def get_topic(
        self,
    ) -> Callable[
        [pubsub.GetTopicRequest], Union[pubsub.Topic, Awaitable[pubsub.Topic]]
    ]:
        """Callable for the GetTopic RPC; not implemented in the base transport."""
        raise NotImplementedError()

    @property
    def list_topics(
        self,
    ) -> Callable[
        [pubsub.ListTopicsRequest],
        Union[pubsub.ListTopicsResponse, Awaitable[pubsub.ListTopicsResponse]],
    ]:
        """Callable for the ListTopics RPC; not implemented in the base transport."""
        raise NotImplementedError()

    @property
    def list_topic_subscriptions(
        self,
    ) -> Callable[
        [pubsub.ListTopicSubscriptionsRequest],
        Union[
            pubsub.ListTopicSubscriptionsResponse,
            Awaitable[pubsub.ListTopicSubscriptionsResponse],
        ],
    ]:
        """Callable for the ListTopicSubscriptions RPC; not implemented in the base transport."""
        raise NotImplementedError()

    @property
    def list_topic_snapshots(
        self,
    ) -> Callable[
        [pubsub.ListTopicSnapshotsRequest],
        Union[
            pubsub.ListTopicSnapshotsResponse,
            Awaitable[pubsub.ListTopicSnapshotsResponse],
        ],
    ]:
        """Callable for the ListTopicSnapshots RPC; not implemented in the base transport."""
        raise NotImplementedError()

    @property
    def delete_topic(
        self,
    ) -> Callable[
        [pubsub.DeleteTopicRequest], Union[empty_pb2.Empty, Awaitable[empty_pb2.Empty]]
    ]:
        """Callable for the DeleteTopic RPC; not implemented in the base transport."""
        raise NotImplementedError()

    @property
    def detach_subscription(
        self,
    ) -> Callable[
        [pubsub.DetachSubscriptionRequest],
        Union[
            pubsub.DetachSubscriptionResponse,
            Awaitable[pubsub.DetachSubscriptionResponse],
        ],
    ]:
        """Callable for the DetachSubscription RPC; not implemented in the base transport."""
        raise NotImplementedError()

    @property
    def set_iam_policy(
        self,
    ) -> Callable[
        [iam_policy_pb2.SetIamPolicyRequest],
        Union[policy_pb2.Policy, Awaitable[policy_pb2.Policy]],
    ]:
        """Callable for the SetIamPolicy RPC; not implemented in the base transport."""
        raise NotImplementedError()

    @property
    def get_iam_policy(
        self,
    ) -> Callable[
        [iam_policy_pb2.GetIamPolicyRequest],
        Union[policy_pb2.Policy, Awaitable[policy_pb2.Policy]],
    ]:
        """Callable for the GetIamPolicy RPC; not implemented in the base transport."""
        raise NotImplementedError()

    @property
    def test_iam_permissions(
        self,
    ) -> Callable[
        [iam_policy_pb2.TestIamPermissionsRequest],
        Union[
            iam_policy_pb2.TestIamPermissionsResponse,
            Awaitable[iam_policy_pb2.TestIamPermissionsResponse],
        ],
    ]:
        """Callable for the TestIamPermissions RPC; not implemented in the base transport."""
        raise NotImplementedError()

    @property
    def kind(self) -> str:
        """String identifying the transport kind; not implemented in the base transport."""
        raise NotImplementedError()

425 

426 

# The abstract transport is the only name this module exports.
__all__ = ("PublisherTransport",)