Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/google/pubsub_v1/services/publisher/transports/rest_base.py: 52%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

262 statements  

1# -*- coding: utf-8 -*- 

2# Copyright 2025 Google LLC 

3# 

4# Licensed under the Apache License, Version 2.0 (the "License"); 

5# you may not use this file except in compliance with the License. 

6# You may obtain a copy of the License at 

7# 

8# http://www.apache.org/licenses/LICENSE-2.0 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, 

12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

13# See the License for the specific language governing permissions and 

14# limitations under the License. 

15# 

16import json # type: ignore 

17from google.api_core import path_template 

18from google.api_core import gapic_v1 

19 

20from google.protobuf import json_format 

21from google.iam.v1 import iam_policy_pb2 # type: ignore 

22from google.iam.v1 import policy_pb2 # type: ignore 

23from .base import PublisherTransport, DEFAULT_CLIENT_INFO 

24 

25import re 

26from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, Union 

27 

28 

29from google.protobuf import empty_pb2 # type: ignore 

30from google.pubsub_v1.types import pubsub 

31 

32 

class _BasePublisherRestTransport(PublisherTransport):
    """Shared REST transport base for the Publisher service.

    Not intended for direct use: the synchronous and asynchronous REST
    transports derive from it.

    It exposes the same methods as the primary client so that the client
    can load a concrete transport implementation and call into it.

    Requests are sent as JSON representations of protocol buffers over
    HTTP/1.1.
    """

    def __init__(
        self,
        *,
        host: str = "pubsub.googleapis.com",
        credentials: Optional[Any] = None,
        client_info: gapic_v1.client_info.ClientInfo = DEFAULT_CLIENT_INFO,
        always_use_jwt_access: Optional[bool] = False,
        url_scheme: str = "https",
        api_audience: Optional[str] = None,
    ) -> None:
        """Normalise ``host`` and initialise the base transport.

        Args:
            host (Optional[str]):
                The hostname to connect to (default: 'pubsub.googleapis.com').
                If it does not already start with ``http://`` or
                ``https://``, ``url_scheme`` is prepended.
            credentials (Optional[Any]):
                Authorization credentials to attach to requests. Passed
                unchanged to the base transport constructor.
            client_info (google.api_core.gapic_v1.client_info.ClientInfo):
                The client info used to send a user-agent string along
                with API requests. Passed unchanged to the base transport
                constructor.
            always_use_jwt_access (Optional[bool]):
                Whether self signed JWT should be used for service
                account credentials.
            url_scheme (str):
                The protocol scheme for the API endpoint. Normally
                "https", but "http" can be specified for testing or local
                servers. Ignored when ``host`` already carries a scheme.
            api_audience (Optional[str]):
                Passed unchanged to the base transport constructor.

        Raises:
            ValueError: If ``host`` does not match the expected
                ``[scheme://]host`` structure.
        """
        # Split an optional leading http:// or https:// from the rest.
        scheme_and_host = re.match("^(?P<scheme>http(?:s)?://)?(?P<host>.*)$", host)
        if scheme_and_host is None:
            raise ValueError(
                f"Unexpected hostname structure: {host}"
            )  # pragma: NO COVER

        # Only add a scheme when the caller did not supply one.
        if not scheme_and_host.group("scheme"):
            host = f"{url_scheme}://{host}"

        # Run the base constructor with the normalised host.
        super().__init__(
            host=host,
            credentials=credentials,
            client_info=client_info,
            always_use_jwt_access=always_use_jwt_access,
            api_audience=api_audience,
        )

94 

95 class _BaseCreateTopic: 

96 def __hash__(self): # pragma: NO COVER 

97 return NotImplementedError("__hash__ must be implemented.") 

98 

99 __REQUIRED_FIELDS_DEFAULT_VALUES: Dict[str, Any] = {} 

100 

101 @classmethod 

102 def _get_unset_required_fields(cls, message_dict): 

103 return { 

104 k: v 

105 for k, v in cls.__REQUIRED_FIELDS_DEFAULT_VALUES.items() 

106 if k not in message_dict 

107 } 

108 

109 @staticmethod 

110 def _get_http_options(): 

111 http_options: List[Dict[str, str]] = [ 

112 { 

113 "method": "put", 

114 "uri": "/v1/{name=projects/*/topics/*}", 

115 "body": "*", 

116 }, 

117 ] 

118 return http_options 

119 

120 @staticmethod 

121 def _get_transcoded_request(http_options, request): 

122 pb_request = pubsub.Topic.pb(request) 

123 transcoded_request = path_template.transcode(http_options, pb_request) 

124 return transcoded_request 

125 

126 @staticmethod 

127 def _get_request_body_json(transcoded_request): 

128 # Jsonify the request body 

129 

130 body = json_format.MessageToJson( 

131 transcoded_request["body"], use_integers_for_enums=True 

132 ) 

133 return body 

134 

135 @staticmethod 

136 def _get_query_params_json(transcoded_request): 

137 query_params = json.loads( 

138 json_format.MessageToJson( 

139 transcoded_request["query_params"], 

140 use_integers_for_enums=True, 

141 ) 

142 ) 

143 query_params.update( 

144 _BasePublisherRestTransport._BaseCreateTopic._get_unset_required_fields( 

145 query_params 

146 ) 

147 ) 

148 

149 query_params["$alt"] = "json;enum-encoding=int" 

150 return query_params 

151 

152 class _BaseDeleteTopic: 

153 def __hash__(self): # pragma: NO COVER 

154 return NotImplementedError("__hash__ must be implemented.") 

155 

156 __REQUIRED_FIELDS_DEFAULT_VALUES: Dict[str, Any] = {} 

157 

158 @classmethod 

159 def _get_unset_required_fields(cls, message_dict): 

160 return { 

161 k: v 

162 for k, v in cls.__REQUIRED_FIELDS_DEFAULT_VALUES.items() 

163 if k not in message_dict 

164 } 

165 

166 @staticmethod 

167 def _get_http_options(): 

168 http_options: List[Dict[str, str]] = [ 

169 { 

170 "method": "delete", 

171 "uri": "/v1/{topic=projects/*/topics/*}", 

172 }, 

173 ] 

174 return http_options 

175 

176 @staticmethod 

177 def _get_transcoded_request(http_options, request): 

178 pb_request = pubsub.DeleteTopicRequest.pb(request) 

179 transcoded_request = path_template.transcode(http_options, pb_request) 

180 return transcoded_request 

181 

182 @staticmethod 

183 def _get_query_params_json(transcoded_request): 

184 query_params = json.loads( 

185 json_format.MessageToJson( 

186 transcoded_request["query_params"], 

187 use_integers_for_enums=True, 

188 ) 

189 ) 

190 query_params.update( 

191 _BasePublisherRestTransport._BaseDeleteTopic._get_unset_required_fields( 

192 query_params 

193 ) 

194 ) 

195 

196 query_params["$alt"] = "json;enum-encoding=int" 

197 return query_params 

198 

199 class _BaseDetachSubscription: 

200 def __hash__(self): # pragma: NO COVER 

201 return NotImplementedError("__hash__ must be implemented.") 

202 

203 __REQUIRED_FIELDS_DEFAULT_VALUES: Dict[str, Any] = {} 

204 

205 @classmethod 

206 def _get_unset_required_fields(cls, message_dict): 

207 return { 

208 k: v 

209 for k, v in cls.__REQUIRED_FIELDS_DEFAULT_VALUES.items() 

210 if k not in message_dict 

211 } 

212 

213 @staticmethod 

214 def _get_http_options(): 

215 http_options: List[Dict[str, str]] = [ 

216 { 

217 "method": "post", 

218 "uri": "/v1/{subscription=projects/*/subscriptions/*}:detach", 

219 }, 

220 ] 

221 return http_options 

222 

223 @staticmethod 

224 def _get_transcoded_request(http_options, request): 

225 pb_request = pubsub.DetachSubscriptionRequest.pb(request) 

226 transcoded_request = path_template.transcode(http_options, pb_request) 

227 return transcoded_request 

228 

229 @staticmethod 

230 def _get_query_params_json(transcoded_request): 

231 query_params = json.loads( 

232 json_format.MessageToJson( 

233 transcoded_request["query_params"], 

234 use_integers_for_enums=True, 

235 ) 

236 ) 

237 query_params.update( 

238 _BasePublisherRestTransport._BaseDetachSubscription._get_unset_required_fields( 

239 query_params 

240 ) 

241 ) 

242 

243 query_params["$alt"] = "json;enum-encoding=int" 

244 return query_params 

245 

246 class _BaseGetTopic: 

247 def __hash__(self): # pragma: NO COVER 

248 return NotImplementedError("__hash__ must be implemented.") 

249 

250 __REQUIRED_FIELDS_DEFAULT_VALUES: Dict[str, Any] = {} 

251 

252 @classmethod 

253 def _get_unset_required_fields(cls, message_dict): 

254 return { 

255 k: v 

256 for k, v in cls.__REQUIRED_FIELDS_DEFAULT_VALUES.items() 

257 if k not in message_dict 

258 } 

259 

260 @staticmethod 

261 def _get_http_options(): 

262 http_options: List[Dict[str, str]] = [ 

263 { 

264 "method": "get", 

265 "uri": "/v1/{topic=projects/*/topics/*}", 

266 }, 

267 ] 

268 return http_options 

269 

270 @staticmethod 

271 def _get_transcoded_request(http_options, request): 

272 pb_request = pubsub.GetTopicRequest.pb(request) 

273 transcoded_request = path_template.transcode(http_options, pb_request) 

274 return transcoded_request 

275 

276 @staticmethod 

277 def _get_query_params_json(transcoded_request): 

278 query_params = json.loads( 

279 json_format.MessageToJson( 

280 transcoded_request["query_params"], 

281 use_integers_for_enums=True, 

282 ) 

283 ) 

284 query_params.update( 

285 _BasePublisherRestTransport._BaseGetTopic._get_unset_required_fields( 

286 query_params 

287 ) 

288 ) 

289 

290 query_params["$alt"] = "json;enum-encoding=int" 

291 return query_params 

292 

293 class _BaseListTopics: 

294 def __hash__(self): # pragma: NO COVER 

295 return NotImplementedError("__hash__ must be implemented.") 

296 

297 __REQUIRED_FIELDS_DEFAULT_VALUES: Dict[str, Any] = {} 

298 

299 @classmethod 

300 def _get_unset_required_fields(cls, message_dict): 

301 return { 

302 k: v 

303 for k, v in cls.__REQUIRED_FIELDS_DEFAULT_VALUES.items() 

304 if k not in message_dict 

305 } 

306 

307 @staticmethod 

308 def _get_http_options(): 

309 http_options: List[Dict[str, str]] = [ 

310 { 

311 "method": "get", 

312 "uri": "/v1/{project=projects/*}/topics", 

313 }, 

314 ] 

315 return http_options 

316 

317 @staticmethod 

318 def _get_transcoded_request(http_options, request): 

319 pb_request = pubsub.ListTopicsRequest.pb(request) 

320 transcoded_request = path_template.transcode(http_options, pb_request) 

321 return transcoded_request 

322 

323 @staticmethod 

324 def _get_query_params_json(transcoded_request): 

325 query_params = json.loads( 

326 json_format.MessageToJson( 

327 transcoded_request["query_params"], 

328 use_integers_for_enums=True, 

329 ) 

330 ) 

331 query_params.update( 

332 _BasePublisherRestTransport._BaseListTopics._get_unset_required_fields( 

333 query_params 

334 ) 

335 ) 

336 

337 query_params["$alt"] = "json;enum-encoding=int" 

338 return query_params 

339 

340 class _BaseListTopicSnapshots: 

341 def __hash__(self): # pragma: NO COVER 

342 return NotImplementedError("__hash__ must be implemented.") 

343 

344 __REQUIRED_FIELDS_DEFAULT_VALUES: Dict[str, Any] = {} 

345 

346 @classmethod 

347 def _get_unset_required_fields(cls, message_dict): 

348 return { 

349 k: v 

350 for k, v in cls.__REQUIRED_FIELDS_DEFAULT_VALUES.items() 

351 if k not in message_dict 

352 } 

353 

354 @staticmethod 

355 def _get_http_options(): 

356 http_options: List[Dict[str, str]] = [ 

357 { 

358 "method": "get", 

359 "uri": "/v1/{topic=projects/*/topics/*}/snapshots", 

360 }, 

361 ] 

362 return http_options 

363 

364 @staticmethod 

365 def _get_transcoded_request(http_options, request): 

366 pb_request = pubsub.ListTopicSnapshotsRequest.pb(request) 

367 transcoded_request = path_template.transcode(http_options, pb_request) 

368 return transcoded_request 

369 

370 @staticmethod 

371 def _get_query_params_json(transcoded_request): 

372 query_params = json.loads( 

373 json_format.MessageToJson( 

374 transcoded_request["query_params"], 

375 use_integers_for_enums=True, 

376 ) 

377 ) 

378 query_params.update( 

379 _BasePublisherRestTransport._BaseListTopicSnapshots._get_unset_required_fields( 

380 query_params 

381 ) 

382 ) 

383 

384 query_params["$alt"] = "json;enum-encoding=int" 

385 return query_params 

386 

387 class _BaseListTopicSubscriptions: 

388 def __hash__(self): # pragma: NO COVER 

389 return NotImplementedError("__hash__ must be implemented.") 

390 

391 __REQUIRED_FIELDS_DEFAULT_VALUES: Dict[str, Any] = {} 

392 

393 @classmethod 

394 def _get_unset_required_fields(cls, message_dict): 

395 return { 

396 k: v 

397 for k, v in cls.__REQUIRED_FIELDS_DEFAULT_VALUES.items() 

398 if k not in message_dict 

399 } 

400 

401 @staticmethod 

402 def _get_http_options(): 

403 http_options: List[Dict[str, str]] = [ 

404 { 

405 "method": "get", 

406 "uri": "/v1/{topic=projects/*/topics/*}/subscriptions", 

407 }, 

408 ] 

409 return http_options 

410 

411 @staticmethod 

412 def _get_transcoded_request(http_options, request): 

413 pb_request = pubsub.ListTopicSubscriptionsRequest.pb(request) 

414 transcoded_request = path_template.transcode(http_options, pb_request) 

415 return transcoded_request 

416 

417 @staticmethod 

418 def _get_query_params_json(transcoded_request): 

419 query_params = json.loads( 

420 json_format.MessageToJson( 

421 transcoded_request["query_params"], 

422 use_integers_for_enums=True, 

423 ) 

424 ) 

425 query_params.update( 

426 _BasePublisherRestTransport._BaseListTopicSubscriptions._get_unset_required_fields( 

427 query_params 

428 ) 

429 ) 

430 

431 query_params["$alt"] = "json;enum-encoding=int" 

432 return query_params 

433 

434 class _BasePublish: 

435 def __hash__(self): # pragma: NO COVER 

436 return NotImplementedError("__hash__ must be implemented.") 

437 

438 __REQUIRED_FIELDS_DEFAULT_VALUES: Dict[str, Any] = {} 

439 

440 @classmethod 

441 def _get_unset_required_fields(cls, message_dict): 

442 return { 

443 k: v 

444 for k, v in cls.__REQUIRED_FIELDS_DEFAULT_VALUES.items() 

445 if k not in message_dict 

446 } 

447 

448 @staticmethod 

449 def _get_http_options(): 

450 http_options: List[Dict[str, str]] = [ 

451 { 

452 "method": "post", 

453 "uri": "/v1/{topic=projects/*/topics/*}:publish", 

454 "body": "*", 

455 }, 

456 ] 

457 return http_options 

458 

459 @staticmethod 

460 def _get_transcoded_request(http_options, request): 

461 pb_request = pubsub.PublishRequest.pb(request) 

462 transcoded_request = path_template.transcode(http_options, pb_request) 

463 return transcoded_request 

464 

465 @staticmethod 

466 def _get_request_body_json(transcoded_request): 

467 # Jsonify the request body 

468 

469 body = json_format.MessageToJson( 

470 transcoded_request["body"], use_integers_for_enums=True 

471 ) 

472 return body 

473 

474 @staticmethod 

475 def _get_query_params_json(transcoded_request): 

476 query_params = json.loads( 

477 json_format.MessageToJson( 

478 transcoded_request["query_params"], 

479 use_integers_for_enums=True, 

480 ) 

481 ) 

482 query_params.update( 

483 _BasePublisherRestTransport._BasePublish._get_unset_required_fields( 

484 query_params 

485 ) 

486 ) 

487 

488 query_params["$alt"] = "json;enum-encoding=int" 

489 return query_params 

490 

491 class _BaseUpdateTopic: 

492 def __hash__(self): # pragma: NO COVER 

493 return NotImplementedError("__hash__ must be implemented.") 

494 

495 __REQUIRED_FIELDS_DEFAULT_VALUES: Dict[str, Any] = {} 

496 

497 @classmethod 

498 def _get_unset_required_fields(cls, message_dict): 

499 return { 

500 k: v 

501 for k, v in cls.__REQUIRED_FIELDS_DEFAULT_VALUES.items() 

502 if k not in message_dict 

503 } 

504 

505 @staticmethod 

506 def _get_http_options(): 

507 http_options: List[Dict[str, str]] = [ 

508 { 

509 "method": "patch", 

510 "uri": "/v1/{topic.name=projects/*/topics/*}", 

511 "body": "*", 

512 }, 

513 ] 

514 return http_options 

515 

516 @staticmethod 

517 def _get_transcoded_request(http_options, request): 

518 pb_request = pubsub.UpdateTopicRequest.pb(request) 

519 transcoded_request = path_template.transcode(http_options, pb_request) 

520 return transcoded_request 

521 

522 @staticmethod 

523 def _get_request_body_json(transcoded_request): 

524 # Jsonify the request body 

525 

526 body = json_format.MessageToJson( 

527 transcoded_request["body"], use_integers_for_enums=True 

528 ) 

529 return body 

530 

531 @staticmethod 

532 def _get_query_params_json(transcoded_request): 

533 query_params = json.loads( 

534 json_format.MessageToJson( 

535 transcoded_request["query_params"], 

536 use_integers_for_enums=True, 

537 ) 

538 ) 

539 query_params.update( 

540 _BasePublisherRestTransport._BaseUpdateTopic._get_unset_required_fields( 

541 query_params 

542 ) 

543 ) 

544 

545 query_params["$alt"] = "json;enum-encoding=int" 

546 return query_params 

547 

548 class _BaseGetIamPolicy: 

549 def __hash__(self): # pragma: NO COVER 

550 return NotImplementedError("__hash__ must be implemented.") 

551 

552 @staticmethod 

553 def _get_http_options(): 

554 http_options: List[Dict[str, str]] = [ 

555 { 

556 "method": "get", 

557 "uri": "/v1/{resource=projects/*/topics/*}:getIamPolicy", 

558 }, 

559 { 

560 "method": "get", 

561 "uri": "/v1/{resource=projects/*/subscriptions/*}:getIamPolicy", 

562 }, 

563 { 

564 "method": "get", 

565 "uri": "/v1/{resource=projects/*/snapshots/*}:getIamPolicy", 

566 }, 

567 { 

568 "method": "get", 

569 "uri": "/v1/{resource=projects/*/schemas/*}:getIamPolicy", 

570 }, 

571 ] 

572 return http_options 

573 

574 @staticmethod 

575 def _get_transcoded_request(http_options, request): 

576 request_kwargs = json_format.MessageToDict(request) 

577 transcoded_request = path_template.transcode(http_options, **request_kwargs) 

578 return transcoded_request 

579 

580 @staticmethod 

581 def _get_query_params_json(transcoded_request): 

582 query_params = json.loads(json.dumps(transcoded_request["query_params"])) 

583 return query_params 

584 

585 class _BaseSetIamPolicy: 

586 def __hash__(self): # pragma: NO COVER 

587 return NotImplementedError("__hash__ must be implemented.") 

588 

589 @staticmethod 

590 def _get_http_options(): 

591 http_options: List[Dict[str, str]] = [ 

592 { 

593 "method": "post", 

594 "uri": "/v1/{resource=projects/*/topics/*}:setIamPolicy", 

595 "body": "*", 

596 }, 

597 { 

598 "method": "post", 

599 "uri": "/v1/{resource=projects/*/subscriptions/*}:setIamPolicy", 

600 "body": "*", 

601 }, 

602 { 

603 "method": "post", 

604 "uri": "/v1/{resource=projects/*/snapshots/*}:setIamPolicy", 

605 "body": "*", 

606 }, 

607 { 

608 "method": "post", 

609 "uri": "/v1/{resource=projects/*/schemas/*}:setIamPolicy", 

610 "body": "*", 

611 }, 

612 ] 

613 return http_options 

614 

615 @staticmethod 

616 def _get_transcoded_request(http_options, request): 

617 request_kwargs = json_format.MessageToDict(request) 

618 transcoded_request = path_template.transcode(http_options, **request_kwargs) 

619 return transcoded_request 

620 

621 @staticmethod 

622 def _get_request_body_json(transcoded_request): 

623 body = json.dumps(transcoded_request["body"]) 

624 return body 

625 

626 @staticmethod 

627 def _get_query_params_json(transcoded_request): 

628 query_params = json.loads(json.dumps(transcoded_request["query_params"])) 

629 return query_params 

630 

631 class _BaseTestIamPermissions: 

632 def __hash__(self): # pragma: NO COVER 

633 return NotImplementedError("__hash__ must be implemented.") 

634 

635 @staticmethod 

636 def _get_http_options(): 

637 http_options: List[Dict[str, str]] = [ 

638 { 

639 "method": "post", 

640 "uri": "/v1/{resource=projects/*/subscriptions/*}:testIamPermissions", 

641 "body": "*", 

642 }, 

643 { 

644 "method": "post", 

645 "uri": "/v1/{resource=projects/*/topics/*}:testIamPermissions", 

646 "body": "*", 

647 }, 

648 { 

649 "method": "post", 

650 "uri": "/v1/{resource=projects/*/snapshots/*}:testIamPermissions", 

651 "body": "*", 

652 }, 

653 { 

654 "method": "post", 

655 "uri": "/v1/{resource=projects/*/schemas/*}:testIamPermissions", 

656 "body": "*", 

657 }, 

658 ] 

659 return http_options 

660 

661 @staticmethod 

662 def _get_transcoded_request(http_options, request): 

663 request_kwargs = json_format.MessageToDict(request) 

664 transcoded_request = path_template.transcode(http_options, **request_kwargs) 

665 return transcoded_request 

666 

667 @staticmethod 

668 def _get_request_body_json(transcoded_request): 

669 body = json.dumps(transcoded_request["body"]) 

670 return body 

671 

672 @staticmethod 

673 def _get_query_params_json(transcoded_request): 

674 query_params = json.loads(json.dumps(transcoded_request["query_params"])) 

675 return query_params 

676 

677 

678__all__ = ("_BasePublisherRestTransport",)