Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/google/pubsub_v1/services/publisher/transports/base.py: 59%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

86 statements  

1# -*- coding: utf-8 -*- 

2# Copyright 2025 Google LLC 

3# 

4# Licensed under the Apache License, Version 2.0 (the "License"); 

5# you may not use this file except in compliance with the License. 

6# You may obtain a copy of the License at 

7# 

8# http://www.apache.org/licenses/LICENSE-2.0 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, 

12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

13# See the License for the specific language governing permissions and 

14# limitations under the License. 

15# 

16import abc 

17from typing import Awaitable, Callable, Dict, Optional, Sequence, Union 

18 

19from google.pubsub_v1 import gapic_version as package_version 

20 

21import google.auth # type: ignore 

22import google.api_core 

23from google.api_core import exceptions as core_exceptions 

24from google.api_core import gapic_v1 

25from google.api_core import retry as retries 

26from google.auth import credentials as ga_credentials # type: ignore 

27from google.oauth2 import service_account # type: ignore 

28import google.protobuf 

29 

30from google.iam.v1 import iam_policy_pb2 # type: ignore 

31from google.iam.v1 import policy_pb2 # type: ignore 

32from google.protobuf import empty_pb2 # type: ignore 

33from google.pubsub_v1.types import pubsub 

34 

35DEFAULT_CLIENT_INFO = gapic_v1.client_info.ClientInfo( 

36 client_library_version=package_version.__version__ 

37) 

38 

39if hasattr(DEFAULT_CLIENT_INFO, "protobuf_runtime_version"): # pragma: NO COVER 

40 DEFAULT_CLIENT_INFO.protobuf_runtime_version = google.protobuf.__version__ 

41 

42 

43class PublisherTransport(abc.ABC): 

44 """Abstract transport class for Publisher.""" 

45 

46 AUTH_SCOPES = ( 

47 "https://www.googleapis.com/auth/cloud-platform", 

48 "https://www.googleapis.com/auth/pubsub", 

49 ) 

50 

51 DEFAULT_HOST: str = "pubsub.googleapis.com" 

52 

53 def __init__( 

54 self, 

55 *, 

56 host: str = DEFAULT_HOST, 

57 credentials: Optional[ga_credentials.Credentials] = None, 

58 credentials_file: Optional[str] = None, 

59 scopes: Optional[Sequence[str]] = None, 

60 quota_project_id: Optional[str] = None, 

61 client_info: gapic_v1.client_info.ClientInfo = DEFAULT_CLIENT_INFO, 

62 always_use_jwt_access: Optional[bool] = False, 

63 api_audience: Optional[str] = None, 

64 **kwargs, 

65 ) -> None: 

66 """Instantiate the transport. 

67 

68 Args: 

69 host (Optional[str]): 

70 The hostname to connect to (default: 'pubsub.googleapis.com'). 

71 credentials (Optional[google.auth.credentials.Credentials]): The 

72 authorization credentials to attach to requests. These 

73 credentials identify the application to the service; if none 

74 are specified, the client will attempt to ascertain the 

75 credentials from the environment. 

76 credentials_file (Optional[str]): Deprecated. A file with credentials that can 

77 be loaded with :func:`google.auth.load_credentials_from_file`. 

78 This argument is mutually exclusive with credentials. This argument will be 

79 removed in the next major version of this library. 

80 scopes (Optional[Sequence[str]]): A list of scopes. 

81 quota_project_id (Optional[str]): An optional project to use for billing 

82 and quota. 

83 client_info (google.api_core.gapic_v1.client_info.ClientInfo): 

84 The client info used to send a user-agent string along with 

85 API requests. If ``None``, then default info will be used. 

86 Generally, you only need to set this if you're developing 

87 your own client library. 

88 always_use_jwt_access (Optional[bool]): Whether self signed JWT should 

89 be used for service account credentials. 

90 """ 

91 

92 scopes_kwargs = {"scopes": scopes, "default_scopes": self.AUTH_SCOPES} 

93 

94 # Save the scopes. 

95 self._scopes = scopes 

96 if not hasattr(self, "_ignore_credentials"): 

97 self._ignore_credentials: bool = False 

98 

99 # If no credentials are provided, then determine the appropriate 

100 # defaults. 

101 if credentials and credentials_file: 

102 raise core_exceptions.DuplicateCredentialArgs( 

103 "'credentials_file' and 'credentials' are mutually exclusive" 

104 ) 

105 

106 if credentials_file is not None: 

107 credentials, _ = google.auth.load_credentials_from_file( 

108 credentials_file, **scopes_kwargs, quota_project_id=quota_project_id 

109 ) 

110 elif credentials is None and not self._ignore_credentials: 

111 credentials, _ = google.auth.default( 

112 **scopes_kwargs, quota_project_id=quota_project_id 

113 ) 

114 # Don't apply audience if the credentials file passed from user. 

115 if hasattr(credentials, "with_gdch_audience"): 

116 credentials = credentials.with_gdch_audience( 

117 api_audience if api_audience else host 

118 ) 

119 

120 # If the credentials are service account credentials, then always try to use self signed JWT. 

121 if ( 

122 always_use_jwt_access 

123 and isinstance(credentials, service_account.Credentials) 

124 and hasattr(service_account.Credentials, "with_always_use_jwt_access") 

125 ): 

126 credentials = credentials.with_always_use_jwt_access(True) 

127 

128 # Save the credentials. 

129 self._credentials = credentials 

130 

131 # Save the hostname. Default to port 443 (HTTPS) if none is specified. 

132 if ":" not in host: 

133 host += ":443" 

134 self._host = host 

135 

136 @property 

137 def host(self): 

138 return self._host 

139 

140 def _prep_wrapped_messages(self, client_info): 

141 # Precompute the wrapped methods. 

142 self._wrapped_methods = { 

143 self.create_topic: gapic_v1.method.wrap_method( 

144 self.create_topic, 

145 default_retry=retries.Retry( 

146 initial=0.1, 

147 maximum=60.0, 

148 multiplier=1.3, 

149 predicate=retries.if_exception_type( 

150 core_exceptions.ServiceUnavailable, 

151 ), 

152 deadline=600.0, 

153 ), 

154 default_timeout=60.0, 

155 client_info=client_info, 

156 ), 

157 self.update_topic: gapic_v1.method.wrap_method( 

158 self.update_topic, 

159 default_retry=retries.Retry( 

160 initial=0.1, 

161 maximum=60.0, 

162 multiplier=1.3, 

163 predicate=retries.if_exception_type( 

164 core_exceptions.ServiceUnavailable, 

165 ), 

166 deadline=600.0, 

167 ), 

168 default_timeout=60.0, 

169 client_info=client_info, 

170 ), 

171 self.publish: gapic_v1.method.wrap_method( 

172 self.publish, 

173 default_retry=retries.Retry( 

174 initial=0.1, 

175 maximum=60.0, 

176 multiplier=4, 

177 predicate=retries.if_exception_type( 

178 core_exceptions.Aborted, 

179 core_exceptions.Cancelled, 

180 core_exceptions.DeadlineExceeded, 

181 core_exceptions.InternalServerError, 

182 core_exceptions.ResourceExhausted, 

183 core_exceptions.ServiceUnavailable, 

184 core_exceptions.Unknown, 

185 ), 

186 deadline=600.0, 

187 ), 

188 default_timeout=60.0, 

189 client_info=client_info, 

190 ), 

191 self.get_topic: gapic_v1.method.wrap_method( 

192 self.get_topic, 

193 default_retry=retries.Retry( 

194 initial=0.1, 

195 maximum=60.0, 

196 multiplier=1.3, 

197 predicate=retries.if_exception_type( 

198 core_exceptions.Aborted, 

199 core_exceptions.ServiceUnavailable, 

200 core_exceptions.Unknown, 

201 ), 

202 deadline=600.0, 

203 ), 

204 default_timeout=60.0, 

205 client_info=client_info, 

206 ), 

207 self.list_topics: gapic_v1.method.wrap_method( 

208 self.list_topics, 

209 default_retry=retries.Retry( 

210 initial=0.1, 

211 maximum=60.0, 

212 multiplier=1.3, 

213 predicate=retries.if_exception_type( 

214 core_exceptions.Aborted, 

215 core_exceptions.ServiceUnavailable, 

216 core_exceptions.Unknown, 

217 ), 

218 deadline=600.0, 

219 ), 

220 default_timeout=60.0, 

221 client_info=client_info, 

222 ), 

223 self.list_topic_subscriptions: gapic_v1.method.wrap_method( 

224 self.list_topic_subscriptions, 

225 default_retry=retries.Retry( 

226 initial=0.1, 

227 maximum=60.0, 

228 multiplier=1.3, 

229 predicate=retries.if_exception_type( 

230 core_exceptions.Aborted, 

231 core_exceptions.ServiceUnavailable, 

232 core_exceptions.Unknown, 

233 ), 

234 deadline=600.0, 

235 ), 

236 default_timeout=60.0, 

237 client_info=client_info, 

238 ), 

239 self.list_topic_snapshots: gapic_v1.method.wrap_method( 

240 self.list_topic_snapshots, 

241 default_retry=retries.Retry( 

242 initial=0.1, 

243 maximum=60.0, 

244 multiplier=1.3, 

245 predicate=retries.if_exception_type( 

246 core_exceptions.Aborted, 

247 core_exceptions.ServiceUnavailable, 

248 core_exceptions.Unknown, 

249 ), 

250 deadline=600.0, 

251 ), 

252 default_timeout=60.0, 

253 client_info=client_info, 

254 ), 

255 self.delete_topic: gapic_v1.method.wrap_method( 

256 self.delete_topic, 

257 default_retry=retries.Retry( 

258 initial=0.1, 

259 maximum=60.0, 

260 multiplier=1.3, 

261 predicate=retries.if_exception_type( 

262 core_exceptions.ServiceUnavailable, 

263 ), 

264 deadline=600.0, 

265 ), 

266 default_timeout=60.0, 

267 client_info=client_info, 

268 ), 

269 self.detach_subscription: gapic_v1.method.wrap_method( 

270 self.detach_subscription, 

271 default_retry=retries.Retry( 

272 initial=0.1, 

273 maximum=60.0, 

274 multiplier=1.3, 

275 predicate=retries.if_exception_type( 

276 core_exceptions.ServiceUnavailable, 

277 ), 

278 deadline=600.0, 

279 ), 

280 default_timeout=60.0, 

281 client_info=client_info, 

282 ), 

283 self.get_iam_policy: gapic_v1.method.wrap_method( 

284 self.get_iam_policy, 

285 default_timeout=None, 

286 client_info=client_info, 

287 ), 

288 self.set_iam_policy: gapic_v1.method.wrap_method( 

289 self.set_iam_policy, 

290 default_timeout=None, 

291 client_info=client_info, 

292 ), 

293 self.test_iam_permissions: gapic_v1.method.wrap_method( 

294 self.test_iam_permissions, 

295 default_timeout=None, 

296 client_info=client_info, 

297 ), 

298 } 

299 

300 def close(self): 

301 """Closes resources associated with the transport. 

302 

303 .. warning:: 

304 Only call this method if the transport is NOT shared 

305 with other clients - this may cause errors in other clients! 

306 """ 

307 raise NotImplementedError() 

308 

309 @property 

310 def create_topic( 

311 self, 

312 ) -> Callable[[pubsub.Topic], Union[pubsub.Topic, Awaitable[pubsub.Topic]]]: 

313 raise NotImplementedError() 

314 

315 @property 

316 def update_topic( 

317 self, 

318 ) -> Callable[ 

319 [pubsub.UpdateTopicRequest], Union[pubsub.Topic, Awaitable[pubsub.Topic]] 

320 ]: 

321 raise NotImplementedError() 

322 

323 @property 

324 def publish( 

325 self, 

326 ) -> Callable[ 

327 [pubsub.PublishRequest], 

328 Union[pubsub.PublishResponse, Awaitable[pubsub.PublishResponse]], 

329 ]: 

330 raise NotImplementedError() 

331 

332 @property 

333 def get_topic( 

334 self, 

335 ) -> Callable[ 

336 [pubsub.GetTopicRequest], Union[pubsub.Topic, Awaitable[pubsub.Topic]] 

337 ]: 

338 raise NotImplementedError() 

339 

340 @property 

341 def list_topics( 

342 self, 

343 ) -> Callable[ 

344 [pubsub.ListTopicsRequest], 

345 Union[pubsub.ListTopicsResponse, Awaitable[pubsub.ListTopicsResponse]], 

346 ]: 

347 raise NotImplementedError() 

348 

349 @property 

350 def list_topic_subscriptions( 

351 self, 

352 ) -> Callable[ 

353 [pubsub.ListTopicSubscriptionsRequest], 

354 Union[ 

355 pubsub.ListTopicSubscriptionsResponse, 

356 Awaitable[pubsub.ListTopicSubscriptionsResponse], 

357 ], 

358 ]: 

359 raise NotImplementedError() 

360 

361 @property 

362 def list_topic_snapshots( 

363 self, 

364 ) -> Callable[ 

365 [pubsub.ListTopicSnapshotsRequest], 

366 Union[ 

367 pubsub.ListTopicSnapshotsResponse, 

368 Awaitable[pubsub.ListTopicSnapshotsResponse], 

369 ], 

370 ]: 

371 raise NotImplementedError() 

372 

373 @property 

374 def delete_topic( 

375 self, 

376 ) -> Callable[ 

377 [pubsub.DeleteTopicRequest], Union[empty_pb2.Empty, Awaitable[empty_pb2.Empty]] 

378 ]: 

379 raise NotImplementedError() 

380 

381 @property 

382 def detach_subscription( 

383 self, 

384 ) -> Callable[ 

385 [pubsub.DetachSubscriptionRequest], 

386 Union[ 

387 pubsub.DetachSubscriptionResponse, 

388 Awaitable[pubsub.DetachSubscriptionResponse], 

389 ], 

390 ]: 

391 raise NotImplementedError() 

392 

393 @property 

394 def set_iam_policy( 

395 self, 

396 ) -> Callable[ 

397 [iam_policy_pb2.SetIamPolicyRequest], 

398 Union[policy_pb2.Policy, Awaitable[policy_pb2.Policy]], 

399 ]: 

400 raise NotImplementedError() 

401 

402 @property 

403 def get_iam_policy( 

404 self, 

405 ) -> Callable[ 

406 [iam_policy_pb2.GetIamPolicyRequest], 

407 Union[policy_pb2.Policy, Awaitable[policy_pb2.Policy]], 

408 ]: 

409 raise NotImplementedError() 

410 

411 @property 

412 def test_iam_permissions( 

413 self, 

414 ) -> Callable[ 

415 [iam_policy_pb2.TestIamPermissionsRequest], 

416 Union[ 

417 iam_policy_pb2.TestIamPermissionsResponse, 

418 Awaitable[iam_policy_pb2.TestIamPermissionsResponse], 

419 ], 

420 ]: 

421 raise NotImplementedError() 

422 

423 @property 

424 def kind(self) -> str: 

425 raise NotImplementedError() 

426 

427 

428__all__ = ("PublisherTransport",)