Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/google/cloud/tasks_v2/services/cloud_tasks/transports/rest_base.py: 51%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

412 statements  

1# -*- coding: utf-8 -*- 

2# Copyright 2025 Google LLC 

3# 

4# Licensed under the Apache License, Version 2.0 (the "License"); 

5# you may not use this file except in compliance with the License. 

6# You may obtain a copy of the License at 

7# 

8# http://www.apache.org/licenses/LICENSE-2.0 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, 

12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

13# See the License for the specific language governing permissions and 

14# limitations under the License. 

15# 

16import json # type: ignore 

17import re 

18from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, Union 

19 

20from google.api_core import gapic_v1, path_template 

21from google.cloud.location import locations_pb2 # type: ignore 

22from google.iam.v1 import iam_policy_pb2 # type: ignore 

23from google.iam.v1 import policy_pb2 # type: ignore 

24from google.protobuf import empty_pb2 # type: ignore 

25from google.protobuf import json_format 

26 

27from google.cloud.tasks_v2.types import cloudtasks 

28from google.cloud.tasks_v2.types import queue 

29from google.cloud.tasks_v2.types import queue as gct_queue 

30from google.cloud.tasks_v2.types import task 

31from google.cloud.tasks_v2.types import task as gct_task 

32 

33from .base import DEFAULT_CLIENT_INFO, CloudTasksTransport 

34 

35 

36class _BaseCloudTasksRestTransport(CloudTasksTransport): 

37 """Base REST backend transport for CloudTasks. 

38 

39 Note: This class is not meant to be used directly. Use its sync and 

40 async sub-classes instead. 

41 

42 This class defines the same methods as the primary client, so the 

43 primary client can load the underlying transport implementation 

44 and call it. 

45 

46 It sends JSON representations of protocol buffers over HTTP/1.1 

47 """ 

48 

def __init__(
    self,
    *,
    host: str = "cloudtasks.googleapis.com",
    credentials: Optional[Any] = None,
    client_info: gapic_v1.client_info.ClientInfo = DEFAULT_CLIENT_INFO,
    always_use_jwt_access: Optional[bool] = False,
    url_scheme: str = "https",
    api_audience: Optional[str] = None,
) -> None:
    """Normalize the host into a URL and initialize the base transport.

    Args:
        host (Optional[str]):
            The hostname to connect to (default:
            'cloudtasks.googleapis.com'). It may already start with
            ``http://`` or ``https://``; otherwise ``url_scheme`` is
            prepended.
        credentials (Optional[Any]): The authorization credentials,
            forwarded unchanged to the base transport constructor.
        client_info (google.api_core.gapic_v1.client_info.ClientInfo):
            The client info, forwarded unchanged to the base transport
            constructor.
        always_use_jwt_access (Optional[bool]): Forwarded unchanged to
            the base transport constructor.
        url_scheme: The scheme placed in front of ``host`` when ``host``
            carries none of its own. Normally "https"; "http" can be
            given for testing or local servers.
        api_audience (Optional[str]): Forwarded unchanged to the base
            transport constructor.

    Raises:
        ValueError: If ``host`` does not match the expected
            ``[scheme://]host`` pattern.
    """
    scheme_and_host = re.match("^(?P<scheme>http(?:s)?://)?(?P<host>.*)$", host)
    if scheme_and_host is None:
        raise ValueError(
            f"Unexpected hostname structure: {host}"
        )  # pragma: NO COVER

    # Only add url_scheme when the caller's host has no http(s):// prefix.
    if not scheme_and_host.groupdict()["scheme"]:
        host = f"{url_scheme}://{host}"

    # Run the base constructor with the normalized host.
    super().__init__(
        host=host,
        credentials=credentials,
        client_info=client_info,
        always_use_jwt_access=always_use_jwt_access,
        api_audience=api_audience,
    )

97 

98 class _BaseCreateQueue: 

99 def __hash__(self): # pragma: NO COVER 

100 return NotImplementedError("__hash__ must be implemented.") 

101 

102 __REQUIRED_FIELDS_DEFAULT_VALUES: Dict[str, Any] = {} 

103 

104 @classmethod 

105 def _get_unset_required_fields(cls, message_dict): 

106 return { 

107 k: v 

108 for k, v in cls.__REQUIRED_FIELDS_DEFAULT_VALUES.items() 

109 if k not in message_dict 

110 } 

111 

112 @staticmethod 

113 def _get_http_options(): 

114 http_options: List[Dict[str, str]] = [ 

115 { 

116 "method": "post", 

117 "uri": "/v2/{parent=projects/*/locations/*}/queues", 

118 "body": "queue", 

119 }, 

120 ] 

121 return http_options 

122 

123 @staticmethod 

124 def _get_transcoded_request(http_options, request): 

125 pb_request = cloudtasks.CreateQueueRequest.pb(request) 

126 transcoded_request = path_template.transcode(http_options, pb_request) 

127 return transcoded_request 

128 

129 @staticmethod 

130 def _get_request_body_json(transcoded_request): 

131 # Jsonify the request body 

132 

133 body = json_format.MessageToJson( 

134 transcoded_request["body"], use_integers_for_enums=True 

135 ) 

136 return body 

137 

138 @staticmethod 

139 def _get_query_params_json(transcoded_request): 

140 query_params = json.loads( 

141 json_format.MessageToJson( 

142 transcoded_request["query_params"], 

143 use_integers_for_enums=True, 

144 ) 

145 ) 

146 query_params.update( 

147 _BaseCloudTasksRestTransport._BaseCreateQueue._get_unset_required_fields( 

148 query_params 

149 ) 

150 ) 

151 

152 query_params["$alt"] = "json;enum-encoding=int" 

153 return query_params 

154 

155 class _BaseCreateTask: 

156 def __hash__(self): # pragma: NO COVER 

157 return NotImplementedError("__hash__ must be implemented.") 

158 

159 __REQUIRED_FIELDS_DEFAULT_VALUES: Dict[str, Any] = {} 

160 

161 @classmethod 

162 def _get_unset_required_fields(cls, message_dict): 

163 return { 

164 k: v 

165 for k, v in cls.__REQUIRED_FIELDS_DEFAULT_VALUES.items() 

166 if k not in message_dict 

167 } 

168 

169 @staticmethod 

170 def _get_http_options(): 

171 http_options: List[Dict[str, str]] = [ 

172 { 

173 "method": "post", 

174 "uri": "/v2/{parent=projects/*/locations/*/queues/*}/tasks", 

175 "body": "*", 

176 }, 

177 ] 

178 return http_options 

179 

180 @staticmethod 

181 def _get_transcoded_request(http_options, request): 

182 pb_request = cloudtasks.CreateTaskRequest.pb(request) 

183 transcoded_request = path_template.transcode(http_options, pb_request) 

184 return transcoded_request 

185 

186 @staticmethod 

187 def _get_request_body_json(transcoded_request): 

188 # Jsonify the request body 

189 

190 body = json_format.MessageToJson( 

191 transcoded_request["body"], use_integers_for_enums=True 

192 ) 

193 return body 

194 

195 @staticmethod 

196 def _get_query_params_json(transcoded_request): 

197 query_params = json.loads( 

198 json_format.MessageToJson( 

199 transcoded_request["query_params"], 

200 use_integers_for_enums=True, 

201 ) 

202 ) 

203 query_params.update( 

204 _BaseCloudTasksRestTransport._BaseCreateTask._get_unset_required_fields( 

205 query_params 

206 ) 

207 ) 

208 

209 query_params["$alt"] = "json;enum-encoding=int" 

210 return query_params 

211 

212 class _BaseDeleteQueue: 

213 def __hash__(self): # pragma: NO COVER 

214 return NotImplementedError("__hash__ must be implemented.") 

215 

216 __REQUIRED_FIELDS_DEFAULT_VALUES: Dict[str, Any] = {} 

217 

218 @classmethod 

219 def _get_unset_required_fields(cls, message_dict): 

220 return { 

221 k: v 

222 for k, v in cls.__REQUIRED_FIELDS_DEFAULT_VALUES.items() 

223 if k not in message_dict 

224 } 

225 

226 @staticmethod 

227 def _get_http_options(): 

228 http_options: List[Dict[str, str]] = [ 

229 { 

230 "method": "delete", 

231 "uri": "/v2/{name=projects/*/locations/*/queues/*}", 

232 }, 

233 ] 

234 return http_options 

235 

236 @staticmethod 

237 def _get_transcoded_request(http_options, request): 

238 pb_request = cloudtasks.DeleteQueueRequest.pb(request) 

239 transcoded_request = path_template.transcode(http_options, pb_request) 

240 return transcoded_request 

241 

242 @staticmethod 

243 def _get_query_params_json(transcoded_request): 

244 query_params = json.loads( 

245 json_format.MessageToJson( 

246 transcoded_request["query_params"], 

247 use_integers_for_enums=True, 

248 ) 

249 ) 

250 query_params.update( 

251 _BaseCloudTasksRestTransport._BaseDeleteQueue._get_unset_required_fields( 

252 query_params 

253 ) 

254 ) 

255 

256 query_params["$alt"] = "json;enum-encoding=int" 

257 return query_params 

258 

259 class _BaseDeleteTask: 

260 def __hash__(self): # pragma: NO COVER 

261 return NotImplementedError("__hash__ must be implemented.") 

262 

263 __REQUIRED_FIELDS_DEFAULT_VALUES: Dict[str, Any] = {} 

264 

265 @classmethod 

266 def _get_unset_required_fields(cls, message_dict): 

267 return { 

268 k: v 

269 for k, v in cls.__REQUIRED_FIELDS_DEFAULT_VALUES.items() 

270 if k not in message_dict 

271 } 

272 

273 @staticmethod 

274 def _get_http_options(): 

275 http_options: List[Dict[str, str]] = [ 

276 { 

277 "method": "delete", 

278 "uri": "/v2/{name=projects/*/locations/*/queues/*/tasks/*}", 

279 }, 

280 ] 

281 return http_options 

282 

283 @staticmethod 

284 def _get_transcoded_request(http_options, request): 

285 pb_request = cloudtasks.DeleteTaskRequest.pb(request) 

286 transcoded_request = path_template.transcode(http_options, pb_request) 

287 return transcoded_request 

288 

289 @staticmethod 

290 def _get_query_params_json(transcoded_request): 

291 query_params = json.loads( 

292 json_format.MessageToJson( 

293 transcoded_request["query_params"], 

294 use_integers_for_enums=True, 

295 ) 

296 ) 

297 query_params.update( 

298 _BaseCloudTasksRestTransport._BaseDeleteTask._get_unset_required_fields( 

299 query_params 

300 ) 

301 ) 

302 

303 query_params["$alt"] = "json;enum-encoding=int" 

304 return query_params 

305 

306 class _BaseGetIamPolicy: 

307 def __hash__(self): # pragma: NO COVER 

308 return NotImplementedError("__hash__ must be implemented.") 

309 

310 __REQUIRED_FIELDS_DEFAULT_VALUES: Dict[str, Any] = {} 

311 

312 @classmethod 

313 def _get_unset_required_fields(cls, message_dict): 

314 return { 

315 k: v 

316 for k, v in cls.__REQUIRED_FIELDS_DEFAULT_VALUES.items() 

317 if k not in message_dict 

318 } 

319 

320 @staticmethod 

321 def _get_http_options(): 

322 http_options: List[Dict[str, str]] = [ 

323 { 

324 "method": "post", 

325 "uri": "/v2/{resource=projects/*/locations/*/queues/*}:getIamPolicy", 

326 "body": "*", 

327 }, 

328 ] 

329 return http_options 

330 

331 @staticmethod 

332 def _get_transcoded_request(http_options, request): 

333 pb_request = request 

334 transcoded_request = path_template.transcode(http_options, pb_request) 

335 return transcoded_request 

336 

337 @staticmethod 

338 def _get_request_body_json(transcoded_request): 

339 # Jsonify the request body 

340 

341 body = json_format.MessageToJson( 

342 transcoded_request["body"], use_integers_for_enums=True 

343 ) 

344 return body 

345 

346 @staticmethod 

347 def _get_query_params_json(transcoded_request): 

348 query_params = json.loads( 

349 json_format.MessageToJson( 

350 transcoded_request["query_params"], 

351 use_integers_for_enums=True, 

352 ) 

353 ) 

354 query_params.update( 

355 _BaseCloudTasksRestTransport._BaseGetIamPolicy._get_unset_required_fields( 

356 query_params 

357 ) 

358 ) 

359 

360 query_params["$alt"] = "json;enum-encoding=int" 

361 return query_params 

362 

363 class _BaseGetQueue: 

364 def __hash__(self): # pragma: NO COVER 

365 return NotImplementedError("__hash__ must be implemented.") 

366 

367 __REQUIRED_FIELDS_DEFAULT_VALUES: Dict[str, Any] = {} 

368 

369 @classmethod 

370 def _get_unset_required_fields(cls, message_dict): 

371 return { 

372 k: v 

373 for k, v in cls.__REQUIRED_FIELDS_DEFAULT_VALUES.items() 

374 if k not in message_dict 

375 } 

376 

377 @staticmethod 

378 def _get_http_options(): 

379 http_options: List[Dict[str, str]] = [ 

380 { 

381 "method": "get", 

382 "uri": "/v2/{name=projects/*/locations/*/queues/*}", 

383 }, 

384 ] 

385 return http_options 

386 

387 @staticmethod 

388 def _get_transcoded_request(http_options, request): 

389 pb_request = cloudtasks.GetQueueRequest.pb(request) 

390 transcoded_request = path_template.transcode(http_options, pb_request) 

391 return transcoded_request 

392 

393 @staticmethod 

394 def _get_query_params_json(transcoded_request): 

395 query_params = json.loads( 

396 json_format.MessageToJson( 

397 transcoded_request["query_params"], 

398 use_integers_for_enums=True, 

399 ) 

400 ) 

401 query_params.update( 

402 _BaseCloudTasksRestTransport._BaseGetQueue._get_unset_required_fields( 

403 query_params 

404 ) 

405 ) 

406 

407 query_params["$alt"] = "json;enum-encoding=int" 

408 return query_params 

409 

410 class _BaseGetTask: 

411 def __hash__(self): # pragma: NO COVER 

412 return NotImplementedError("__hash__ must be implemented.") 

413 

414 __REQUIRED_FIELDS_DEFAULT_VALUES: Dict[str, Any] = {} 

415 

416 @classmethod 

417 def _get_unset_required_fields(cls, message_dict): 

418 return { 

419 k: v 

420 for k, v in cls.__REQUIRED_FIELDS_DEFAULT_VALUES.items() 

421 if k not in message_dict 

422 } 

423 

424 @staticmethod 

425 def _get_http_options(): 

426 http_options: List[Dict[str, str]] = [ 

427 { 

428 "method": "get", 

429 "uri": "/v2/{name=projects/*/locations/*/queues/*/tasks/*}", 

430 }, 

431 ] 

432 return http_options 

433 

434 @staticmethod 

435 def _get_transcoded_request(http_options, request): 

436 pb_request = cloudtasks.GetTaskRequest.pb(request) 

437 transcoded_request = path_template.transcode(http_options, pb_request) 

438 return transcoded_request 

439 

440 @staticmethod 

441 def _get_query_params_json(transcoded_request): 

442 query_params = json.loads( 

443 json_format.MessageToJson( 

444 transcoded_request["query_params"], 

445 use_integers_for_enums=True, 

446 ) 

447 ) 

448 query_params.update( 

449 _BaseCloudTasksRestTransport._BaseGetTask._get_unset_required_fields( 

450 query_params 

451 ) 

452 ) 

453 

454 query_params["$alt"] = "json;enum-encoding=int" 

455 return query_params 

456 

457 class _BaseListQueues: 

458 def __hash__(self): # pragma: NO COVER 

459 return NotImplementedError("__hash__ must be implemented.") 

460 

461 __REQUIRED_FIELDS_DEFAULT_VALUES: Dict[str, Any] = {} 

462 

463 @classmethod 

464 def _get_unset_required_fields(cls, message_dict): 

465 return { 

466 k: v 

467 for k, v in cls.__REQUIRED_FIELDS_DEFAULT_VALUES.items() 

468 if k not in message_dict 

469 } 

470 

471 @staticmethod 

472 def _get_http_options(): 

473 http_options: List[Dict[str, str]] = [ 

474 { 

475 "method": "get", 

476 "uri": "/v2/{parent=projects/*/locations/*}/queues", 

477 }, 

478 ] 

479 return http_options 

480 

481 @staticmethod 

482 def _get_transcoded_request(http_options, request): 

483 pb_request = cloudtasks.ListQueuesRequest.pb(request) 

484 transcoded_request = path_template.transcode(http_options, pb_request) 

485 return transcoded_request 

486 

487 @staticmethod 

488 def _get_query_params_json(transcoded_request): 

489 query_params = json.loads( 

490 json_format.MessageToJson( 

491 transcoded_request["query_params"], 

492 use_integers_for_enums=True, 

493 ) 

494 ) 

495 query_params.update( 

496 _BaseCloudTasksRestTransport._BaseListQueues._get_unset_required_fields( 

497 query_params 

498 ) 

499 ) 

500 

501 query_params["$alt"] = "json;enum-encoding=int" 

502 return query_params 

503 

504 class _BaseListTasks: 

505 def __hash__(self): # pragma: NO COVER 

506 return NotImplementedError("__hash__ must be implemented.") 

507 

508 __REQUIRED_FIELDS_DEFAULT_VALUES: Dict[str, Any] = {} 

509 

510 @classmethod 

511 def _get_unset_required_fields(cls, message_dict): 

512 return { 

513 k: v 

514 for k, v in cls.__REQUIRED_FIELDS_DEFAULT_VALUES.items() 

515 if k not in message_dict 

516 } 

517 

518 @staticmethod 

519 def _get_http_options(): 

520 http_options: List[Dict[str, str]] = [ 

521 { 

522 "method": "get", 

523 "uri": "/v2/{parent=projects/*/locations/*/queues/*}/tasks", 

524 }, 

525 ] 

526 return http_options 

527 

528 @staticmethod 

529 def _get_transcoded_request(http_options, request): 

530 pb_request = cloudtasks.ListTasksRequest.pb(request) 

531 transcoded_request = path_template.transcode(http_options, pb_request) 

532 return transcoded_request 

533 

534 @staticmethod 

535 def _get_query_params_json(transcoded_request): 

536 query_params = json.loads( 

537 json_format.MessageToJson( 

538 transcoded_request["query_params"], 

539 use_integers_for_enums=True, 

540 ) 

541 ) 

542 query_params.update( 

543 _BaseCloudTasksRestTransport._BaseListTasks._get_unset_required_fields( 

544 query_params 

545 ) 

546 ) 

547 

548 query_params["$alt"] = "json;enum-encoding=int" 

549 return query_params 

550 

551 class _BasePauseQueue: 

552 def __hash__(self): # pragma: NO COVER 

553 return NotImplementedError("__hash__ must be implemented.") 

554 

555 __REQUIRED_FIELDS_DEFAULT_VALUES: Dict[str, Any] = {} 

556 

557 @classmethod 

558 def _get_unset_required_fields(cls, message_dict): 

559 return { 

560 k: v 

561 for k, v in cls.__REQUIRED_FIELDS_DEFAULT_VALUES.items() 

562 if k not in message_dict 

563 } 

564 

565 @staticmethod 

566 def _get_http_options(): 

567 http_options: List[Dict[str, str]] = [ 

568 { 

569 "method": "post", 

570 "uri": "/v2/{name=projects/*/locations/*/queues/*}:pause", 

571 "body": "*", 

572 }, 

573 ] 

574 return http_options 

575 

576 @staticmethod 

577 def _get_transcoded_request(http_options, request): 

578 pb_request = cloudtasks.PauseQueueRequest.pb(request) 

579 transcoded_request = path_template.transcode(http_options, pb_request) 

580 return transcoded_request 

581 

582 @staticmethod 

583 def _get_request_body_json(transcoded_request): 

584 # Jsonify the request body 

585 

586 body = json_format.MessageToJson( 

587 transcoded_request["body"], use_integers_for_enums=True 

588 ) 

589 return body 

590 

591 @staticmethod 

592 def _get_query_params_json(transcoded_request): 

593 query_params = json.loads( 

594 json_format.MessageToJson( 

595 transcoded_request["query_params"], 

596 use_integers_for_enums=True, 

597 ) 

598 ) 

599 query_params.update( 

600 _BaseCloudTasksRestTransport._BasePauseQueue._get_unset_required_fields( 

601 query_params 

602 ) 

603 ) 

604 

605 query_params["$alt"] = "json;enum-encoding=int" 

606 return query_params 

607 

608 class _BasePurgeQueue: 

609 def __hash__(self): # pragma: NO COVER 

610 return NotImplementedError("__hash__ must be implemented.") 

611 

612 __REQUIRED_FIELDS_DEFAULT_VALUES: Dict[str, Any] = {} 

613 

614 @classmethod 

615 def _get_unset_required_fields(cls, message_dict): 

616 return { 

617 k: v 

618 for k, v in cls.__REQUIRED_FIELDS_DEFAULT_VALUES.items() 

619 if k not in message_dict 

620 } 

621 

622 @staticmethod 

623 def _get_http_options(): 

624 http_options: List[Dict[str, str]] = [ 

625 { 

626 "method": "post", 

627 "uri": "/v2/{name=projects/*/locations/*/queues/*}:purge", 

628 "body": "*", 

629 }, 

630 ] 

631 return http_options 

632 

633 @staticmethod 

634 def _get_transcoded_request(http_options, request): 

635 pb_request = cloudtasks.PurgeQueueRequest.pb(request) 

636 transcoded_request = path_template.transcode(http_options, pb_request) 

637 return transcoded_request 

638 

639 @staticmethod 

640 def _get_request_body_json(transcoded_request): 

641 # Jsonify the request body 

642 

643 body = json_format.MessageToJson( 

644 transcoded_request["body"], use_integers_for_enums=True 

645 ) 

646 return body 

647 

648 @staticmethod 

649 def _get_query_params_json(transcoded_request): 

650 query_params = json.loads( 

651 json_format.MessageToJson( 

652 transcoded_request["query_params"], 

653 use_integers_for_enums=True, 

654 ) 

655 ) 

656 query_params.update( 

657 _BaseCloudTasksRestTransport._BasePurgeQueue._get_unset_required_fields( 

658 query_params 

659 ) 

660 ) 

661 

662 query_params["$alt"] = "json;enum-encoding=int" 

663 return query_params 

664 

665 class _BaseResumeQueue: 

666 def __hash__(self): # pragma: NO COVER 

667 return NotImplementedError("__hash__ must be implemented.") 

668 

669 __REQUIRED_FIELDS_DEFAULT_VALUES: Dict[str, Any] = {} 

670 

671 @classmethod 

672 def _get_unset_required_fields(cls, message_dict): 

673 return { 

674 k: v 

675 for k, v in cls.__REQUIRED_FIELDS_DEFAULT_VALUES.items() 

676 if k not in message_dict 

677 } 

678 

679 @staticmethod 

680 def _get_http_options(): 

681 http_options: List[Dict[str, str]] = [ 

682 { 

683 "method": "post", 

684 "uri": "/v2/{name=projects/*/locations/*/queues/*}:resume", 

685 "body": "*", 

686 }, 

687 ] 

688 return http_options 

689 

690 @staticmethod 

691 def _get_transcoded_request(http_options, request): 

692 pb_request = cloudtasks.ResumeQueueRequest.pb(request) 

693 transcoded_request = path_template.transcode(http_options, pb_request) 

694 return transcoded_request 

695 

696 @staticmethod 

697 def _get_request_body_json(transcoded_request): 

698 # Jsonify the request body 

699 

700 body = json_format.MessageToJson( 

701 transcoded_request["body"], use_integers_for_enums=True 

702 ) 

703 return body 

704 

705 @staticmethod 

706 def _get_query_params_json(transcoded_request): 

707 query_params = json.loads( 

708 json_format.MessageToJson( 

709 transcoded_request["query_params"], 

710 use_integers_for_enums=True, 

711 ) 

712 ) 

713 query_params.update( 

714 _BaseCloudTasksRestTransport._BaseResumeQueue._get_unset_required_fields( 

715 query_params 

716 ) 

717 ) 

718 

719 query_params["$alt"] = "json;enum-encoding=int" 

720 return query_params 

721 

722 class _BaseRunTask: 

723 def __hash__(self): # pragma: NO COVER 

724 return NotImplementedError("__hash__ must be implemented.") 

725 

726 __REQUIRED_FIELDS_DEFAULT_VALUES: Dict[str, Any] = {} 

727 

728 @classmethod 

729 def _get_unset_required_fields(cls, message_dict): 

730 return { 

731 k: v 

732 for k, v in cls.__REQUIRED_FIELDS_DEFAULT_VALUES.items() 

733 if k not in message_dict 

734 } 

735 

736 @staticmethod 

737 def _get_http_options(): 

738 http_options: List[Dict[str, str]] = [ 

739 { 

740 "method": "post", 

741 "uri": "/v2/{name=projects/*/locations/*/queues/*/tasks/*}:run", 

742 "body": "*", 

743 }, 

744 ] 

745 return http_options 

746 

747 @staticmethod 

748 def _get_transcoded_request(http_options, request): 

749 pb_request = cloudtasks.RunTaskRequest.pb(request) 

750 transcoded_request = path_template.transcode(http_options, pb_request) 

751 return transcoded_request 

752 

753 @staticmethod 

754 def _get_request_body_json(transcoded_request): 

755 # Jsonify the request body 

756 

757 body = json_format.MessageToJson( 

758 transcoded_request["body"], use_integers_for_enums=True 

759 ) 

760 return body 

761 

762 @staticmethod 

763 def _get_query_params_json(transcoded_request): 

764 query_params = json.loads( 

765 json_format.MessageToJson( 

766 transcoded_request["query_params"], 

767 use_integers_for_enums=True, 

768 ) 

769 ) 

770 query_params.update( 

771 _BaseCloudTasksRestTransport._BaseRunTask._get_unset_required_fields( 

772 query_params 

773 ) 

774 ) 

775 

776 query_params["$alt"] = "json;enum-encoding=int" 

777 return query_params 

778 

779 class _BaseSetIamPolicy: 

780 def __hash__(self): # pragma: NO COVER 

781 return NotImplementedError("__hash__ must be implemented.") 

782 

783 __REQUIRED_FIELDS_DEFAULT_VALUES: Dict[str, Any] = {} 

784 

785 @classmethod 

786 def _get_unset_required_fields(cls, message_dict): 

787 return { 

788 k: v 

789 for k, v in cls.__REQUIRED_FIELDS_DEFAULT_VALUES.items() 

790 if k not in message_dict 

791 } 

792 

793 @staticmethod 

794 def _get_http_options(): 

795 http_options: List[Dict[str, str]] = [ 

796 { 

797 "method": "post", 

798 "uri": "/v2/{resource=projects/*/locations/*/queues/*}:setIamPolicy", 

799 "body": "*", 

800 }, 

801 ] 

802 return http_options 

803 

804 @staticmethod 

805 def _get_transcoded_request(http_options, request): 

806 pb_request = request 

807 transcoded_request = path_template.transcode(http_options, pb_request) 

808 return transcoded_request 

809 

810 @staticmethod 

811 def _get_request_body_json(transcoded_request): 

812 # Jsonify the request body 

813 

814 body = json_format.MessageToJson( 

815 transcoded_request["body"], use_integers_for_enums=True 

816 ) 

817 return body 

818 

819 @staticmethod 

820 def _get_query_params_json(transcoded_request): 

821 query_params = json.loads( 

822 json_format.MessageToJson( 

823 transcoded_request["query_params"], 

824 use_integers_for_enums=True, 

825 ) 

826 ) 

827 query_params.update( 

828 _BaseCloudTasksRestTransport._BaseSetIamPolicy._get_unset_required_fields( 

829 query_params 

830 ) 

831 ) 

832 

833 query_params["$alt"] = "json;enum-encoding=int" 

834 return query_params 

835 

836 class _BaseTestIamPermissions: 

837 def __hash__(self): # pragma: NO COVER 

838 return NotImplementedError("__hash__ must be implemented.") 

839 

840 __REQUIRED_FIELDS_DEFAULT_VALUES: Dict[str, Any] = {} 

841 

842 @classmethod 

843 def _get_unset_required_fields(cls, message_dict): 

844 return { 

845 k: v 

846 for k, v in cls.__REQUIRED_FIELDS_DEFAULT_VALUES.items() 

847 if k not in message_dict 

848 } 

849 

850 @staticmethod 

851 def _get_http_options(): 

852 http_options: List[Dict[str, str]] = [ 

853 { 

854 "method": "post", 

855 "uri": "/v2/{resource=projects/*/locations/*/queues/*}:testIamPermissions", 

856 "body": "*", 

857 }, 

858 ] 

859 return http_options 

860 

861 @staticmethod 

862 def _get_transcoded_request(http_options, request): 

863 pb_request = request 

864 transcoded_request = path_template.transcode(http_options, pb_request) 

865 return transcoded_request 

866 

867 @staticmethod 

868 def _get_request_body_json(transcoded_request): 

869 # Jsonify the request body 

870 

871 body = json_format.MessageToJson( 

872 transcoded_request["body"], use_integers_for_enums=True 

873 ) 

874 return body 

875 

876 @staticmethod 

877 def _get_query_params_json(transcoded_request): 

878 query_params = json.loads( 

879 json_format.MessageToJson( 

880 transcoded_request["query_params"], 

881 use_integers_for_enums=True, 

882 ) 

883 ) 

884 query_params.update( 

885 _BaseCloudTasksRestTransport._BaseTestIamPermissions._get_unset_required_fields( 

886 query_params 

887 ) 

888 ) 

889 

890 query_params["$alt"] = "json;enum-encoding=int" 

891 return query_params 

892 

893 class _BaseUpdateQueue: 

894 def __hash__(self): # pragma: NO COVER 

895 return NotImplementedError("__hash__ must be implemented.") 

896 

897 __REQUIRED_FIELDS_DEFAULT_VALUES: Dict[str, Any] = {} 

898 

899 @classmethod 

900 def _get_unset_required_fields(cls, message_dict): 

901 return { 

902 k: v 

903 for k, v in cls.__REQUIRED_FIELDS_DEFAULT_VALUES.items() 

904 if k not in message_dict 

905 } 

906 

907 @staticmethod 

908 def _get_http_options(): 

909 http_options: List[Dict[str, str]] = [ 

910 { 

911 "method": "patch", 

912 "uri": "/v2/{queue.name=projects/*/locations/*/queues/*}", 

913 "body": "queue", 

914 }, 

915 ] 

916 return http_options 

917 

918 @staticmethod 

919 def _get_transcoded_request(http_options, request): 

920 pb_request = cloudtasks.UpdateQueueRequest.pb(request) 

921 transcoded_request = path_template.transcode(http_options, pb_request) 

922 return transcoded_request 

923 

924 @staticmethod 

925 def _get_request_body_json(transcoded_request): 

926 # Jsonify the request body 

927 

928 body = json_format.MessageToJson( 

929 transcoded_request["body"], use_integers_for_enums=True 

930 ) 

931 return body 

932 

933 @staticmethod 

934 def _get_query_params_json(transcoded_request): 

935 query_params = json.loads( 

936 json_format.MessageToJson( 

937 transcoded_request["query_params"], 

938 use_integers_for_enums=True, 

939 ) 

940 ) 

941 query_params.update( 

942 _BaseCloudTasksRestTransport._BaseUpdateQueue._get_unset_required_fields( 

943 query_params 

944 ) 

945 ) 

946 

947 query_params["$alt"] = "json;enum-encoding=int" 

948 return query_params 

949 

950 class _BaseGetLocation: 

951 def __hash__(self): # pragma: NO COVER 

952 return NotImplementedError("__hash__ must be implemented.") 

953 

954 @staticmethod 

955 def _get_http_options(): 

956 http_options: List[Dict[str, str]] = [ 

957 { 

958 "method": "get", 

959 "uri": "/v2/{name=projects/*/locations/*}", 

960 }, 

961 ] 

962 return http_options 

963 

964 @staticmethod 

965 def _get_transcoded_request(http_options, request): 

966 request_kwargs = json_format.MessageToDict(request) 

967 transcoded_request = path_template.transcode(http_options, **request_kwargs) 

968 return transcoded_request 

969 

970 @staticmethod 

971 def _get_query_params_json(transcoded_request): 

972 query_params = json.loads(json.dumps(transcoded_request["query_params"])) 

973 return query_params 

974 

975 class _BaseListLocations: 

976 def __hash__(self): # pragma: NO COVER 

977 return NotImplementedError("__hash__ must be implemented.") 

978 

979 @staticmethod 

980 def _get_http_options(): 

981 http_options: List[Dict[str, str]] = [ 

982 { 

983 "method": "get", 

984 "uri": "/v2/{name=projects/*}/locations", 

985 }, 

986 ] 

987 return http_options 

988 

989 @staticmethod 

990 def _get_transcoded_request(http_options, request): 

991 request_kwargs = json_format.MessageToDict(request) 

992 transcoded_request = path_template.transcode(http_options, **request_kwargs) 

993 return transcoded_request 

994 

995 @staticmethod 

996 def _get_query_params_json(transcoded_request): 

997 query_params = json.loads(json.dumps(transcoded_request["query_params"])) 

998 return query_params 

999 

1000 

# Names exported by a star-import of this module: only the base REST
# transport class.
__all__ = ("_BaseCloudTasksRestTransport",)