Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/google/api_core/bidi.py: 22%

260 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-06-07 06:03 +0000

1# Copyright 2017, Google LLC 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# https://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15"""Bi-directional streaming RPC helpers.""" 

16 

17import collections 

18import datetime 

19import logging 

20import queue as queue_module 

21import threading 

22import time 

23 

24from google.api_core import exceptions 

25 

26_LOGGER = logging.getLogger(__name__) 

27_BIDIRECTIONAL_CONSUMER_NAME = "Thread-ConsumeBidirectionalStream" 

28 

29 

30class _RequestQueueGenerator(object): 

31 """A helper for sending requests to a gRPC stream from a Queue. 

32 

33 This generator takes requests off a given queue and yields them to gRPC. 

34 

35 This helper is useful when you have an indeterminate, indefinite, or 

36 otherwise open-ended set of requests to send through a request-streaming 

37 (or bidirectional) RPC. 

38 

39 The reason this is necessary is because gRPC takes an iterator as the 

40 request for request-streaming RPCs. gRPC consumes this iterator in another 

41 thread to allow it to block while generating requests for the stream. 

42 However, if the generator blocks indefinitely gRPC will not be able to 

43 clean up the thread as it'll be blocked on `next(iterator)` and not be able 

44 to check the channel status to stop iterating. This helper mitigates that 

45 by waiting on the queue with a timeout and checking the RPC state before 

46 yielding. 

47 

48 Finally, it allows for retrying without swapping queues because if it does 

49 pull an item off the queue when the RPC is inactive, it'll immediately put 

50 it back and then exit. This is necessary because yielding the item in this 

51 case will cause gRPC to discard it. In practice, this means that the order 

52 of messages is not guaranteed. If such a thing is necessary it would be 

53 easy to use a priority queue. 

54 

55 Example:: 

56 

57 requests = request_queue_generator(q) 

58 call = stub.StreamingRequest(iter(requests)) 

59 requests.call = call 

60 

61 for response in call: 

62 print(response) 

63 q.put(...) 

64 

65 Note that it is possible to accomplish this behavior without "spinning" 

66 (using a queue timeout). One possible way would be to use more threads to 

67 multiplex the grpc end event with the queue, another possible way is to 

68 use selectors and a custom event/queue object. Both of these approaches 

69 are significant from an engineering perspective for small benefit - the 

70 CPU consumed by spinning is pretty minuscule. 

71 

72 Args: 

73 queue (queue_module.Queue): The request queue. 

74 period (float): The number of seconds to wait for items from the queue 

75 before checking if the RPC is cancelled. In practice, this 

76 determines the maximum amount of time the request consumption 

77 thread will live after the RPC is cancelled. 

78 initial_request (Union[protobuf.Message, 

79 Callable[None, protobuf.Message]]): The initial request to 

80 yield. This is done independently of the request queue to allow fo 

81 easily restarting streams that require some initial configuration 

82 request. 

83 """ 

84 

85 def __init__(self, queue, period=1, initial_request=None): 

86 self._queue = queue 

87 self._period = period 

88 self._initial_request = initial_request 

89 self.call = None 

90 

91 def _is_active(self): 

92 # Note: there is a possibility that this starts *before* the call 

93 # property is set. So we have to check if self.call is set before 

94 # seeing if it's active. 

95 if self.call is not None and not self.call.is_active(): 

96 return False 

97 else: 

98 return True 

99 

100 def __iter__(self): 

101 if self._initial_request is not None: 

102 if callable(self._initial_request): 

103 yield self._initial_request() 

104 else: 

105 yield self._initial_request 

106 

107 while True: 

108 try: 

109 item = self._queue.get(timeout=self._period) 

110 except queue_module.Empty: 

111 if not self._is_active(): 

112 _LOGGER.debug( 

113 "Empty queue and inactive call, exiting request " "generator." 

114 ) 

115 return 

116 else: 

117 # call is still active, keep waiting for queue items. 

118 continue 

119 

120 # The consumer explicitly sent "None", indicating that the request 

121 # should end. 

122 if item is None: 

123 _LOGGER.debug("Cleanly exiting request generator.") 

124 return 

125 

126 if not self._is_active(): 

127 # We have an item, but the call is closed. We should put the 

128 # item back on the queue so that the next call can consume it. 

129 self._queue.put(item) 

130 _LOGGER.debug( 

131 "Inactive call, replacing item on queue and exiting " 

132 "request generator." 

133 ) 

134 return 

135 

136 yield item 

137 

138 

139class _Throttle(object): 

140 """A context manager limiting the total entries in a sliding time window. 

141 

142 If more than ``access_limit`` attempts are made to enter the context manager 

143 instance in the last ``time window`` interval, the exceeding requests block 

144 until enough time elapses. 

145 

146 The context manager instances are thread-safe and can be shared between 

147 multiple threads. If multiple requests are blocked and waiting to enter, 

148 the exact order in which they are allowed to proceed is not determined. 

149 

150 Example:: 

151 

152 max_three_per_second = _Throttle( 

153 access_limit=3, time_window=datetime.timedelta(seconds=1) 

154 ) 

155 

156 for i in range(5): 

157 with max_three_per_second as time_waited: 

158 print("{}: Waited {} seconds to enter".format(i, time_waited)) 

159 

160 Args: 

161 access_limit (int): the maximum number of entries allowed in the time window 

162 time_window (datetime.timedelta): the width of the sliding time window 

163 """ 

164 

165 def __init__(self, access_limit, time_window): 

166 if access_limit < 1: 

167 raise ValueError("access_limit argument must be positive") 

168 

169 if time_window <= datetime.timedelta(0): 

170 raise ValueError("time_window argument must be a positive timedelta") 

171 

172 self._time_window = time_window 

173 self._access_limit = access_limit 

174 self._past_entries = collections.deque( 

175 maxlen=access_limit 

176 ) # least recent first 

177 self._entry_lock = threading.Lock() 

178 

179 def __enter__(self): 

180 with self._entry_lock: 

181 cutoff_time = datetime.datetime.now() - self._time_window 

182 

183 # drop the entries that are too old, as they are no longer relevant 

184 while self._past_entries and self._past_entries[0] < cutoff_time: 

185 self._past_entries.popleft() 

186 

187 if len(self._past_entries) < self._access_limit: 

188 self._past_entries.append(datetime.datetime.now()) 

189 return 0.0 # no waiting was needed 

190 

191 to_wait = (self._past_entries[0] - cutoff_time).total_seconds() 

192 time.sleep(to_wait) 

193 

194 self._past_entries.append(datetime.datetime.now()) 

195 return to_wait 

196 

197 def __exit__(self, *_): 

198 pass 

199 

200 def __repr__(self): 

201 return "{}(access_limit={}, time_window={})".format( 

202 self.__class__.__name__, self._access_limit, repr(self._time_window) 

203 ) 

204 

205 

206class BidiRpc(object): 

207 """A helper for consuming a bi-directional streaming RPC. 

208 

209 This maps gRPC's built-in interface which uses a request iterator and a 

210 response iterator into a socket-like :func:`send` and :func:`recv`. This 

211 is a more useful pattern for long-running or asymmetric streams (streams 

212 where there is not a direct correlation between the requests and 

213 responses). 

214 

215 Example:: 

216 

217 initial_request = example_pb2.StreamingRpcRequest( 

218 setting='example') 

219 rpc = BidiRpc( 

220 stub.StreamingRpc, 

221 initial_request=initial_request, 

222 metadata=[('name', 'value')] 

223 ) 

224 

225 rpc.open() 

226 

227 while rpc.is_active(): 

228 print(rpc.recv()) 

229 rpc.send(example_pb2.StreamingRpcRequest( 

230 data='example')) 

231 

232 This does *not* retry the stream on errors. See :class:`ResumableBidiRpc`. 

233 

234 Args: 

235 start_rpc (grpc.StreamStreamMultiCallable): The gRPC method used to 

236 start the RPC. 

237 initial_request (Union[protobuf.Message, 

238 Callable[None, protobuf.Message]]): The initial request to 

239 yield. This is useful if an initial request is needed to start the 

240 stream. 

241 metadata (Sequence[Tuple(str, str)]): RPC metadata to include in 

242 the request. 

243 """ 

244 

245 def __init__(self, start_rpc, initial_request=None, metadata=None): 

246 self._start_rpc = start_rpc 

247 self._initial_request = initial_request 

248 self._rpc_metadata = metadata 

249 self._request_queue = queue_module.Queue() 

250 self._request_generator = None 

251 self._is_active = False 

252 self._callbacks = [] 

253 self.call = None 

254 

255 def add_done_callback(self, callback): 

256 """Adds a callback that will be called when the RPC terminates. 

257 

258 This occurs when the RPC errors or is successfully terminated. 

259 

260 Args: 

261 callback (Callable[[grpc.Future], None]): The callback to execute. 

262 It will be provided with the same gRPC future as the underlying 

263 stream which will also be a :class:`grpc.Call`. 

264 """ 

265 self._callbacks.append(callback) 

266 

267 def _on_call_done(self, future): 

268 for callback in self._callbacks: 

269 callback(future) 

270 

271 def open(self): 

272 """Opens the stream.""" 

273 if self.is_active: 

274 raise ValueError("Can not open an already open stream.") 

275 

276 request_generator = _RequestQueueGenerator( 

277 self._request_queue, initial_request=self._initial_request 

278 ) 

279 call = self._start_rpc(iter(request_generator), metadata=self._rpc_metadata) 

280 

281 request_generator.call = call 

282 

283 # TODO: api_core should expose the future interface for wrapped 

284 # callables as well. 

285 if hasattr(call, "_wrapped"): # pragma: NO COVER 

286 call._wrapped.add_done_callback(self._on_call_done) 

287 else: 

288 call.add_done_callback(self._on_call_done) 

289 

290 self._request_generator = request_generator 

291 self.call = call 

292 

293 def close(self): 

294 """Closes the stream.""" 

295 if self.call is None: 

296 return 

297 

298 self._request_queue.put(None) 

299 self.call.cancel() 

300 self._request_generator = None 

301 # Don't set self.call to None. Keep it around so that send/recv can 

302 # raise the error. 

303 

304 def send(self, request): 

305 """Queue a message to be sent on the stream. 

306 

307 Send is non-blocking. 

308 

309 If the underlying RPC has been closed, this will raise. 

310 

311 Args: 

312 request (protobuf.Message): The request to send. 

313 """ 

314 if self.call is None: 

315 raise ValueError("Can not send() on an RPC that has never been open()ed.") 

316 

317 # Don't use self.is_active(), as ResumableBidiRpc will overload it 

318 # to mean something semantically different. 

319 if self.call.is_active(): 

320 self._request_queue.put(request) 

321 else: 

322 # calling next should cause the call to raise. 

323 next(self.call) 

324 

325 def recv(self): 

326 """Wait for a message to be returned from the stream. 

327 

328 Recv is blocking. 

329 

330 If the underlying RPC has been closed, this will raise. 

331 

332 Returns: 

333 protobuf.Message: The received message. 

334 """ 

335 if self.call is None: 

336 raise ValueError("Can not recv() on an RPC that has never been open()ed.") 

337 

338 return next(self.call) 

339 

340 @property 

341 def is_active(self): 

342 """bool: True if this stream is currently open and active.""" 

343 return self.call is not None and self.call.is_active() 

344 

345 @property 

346 def pending_requests(self): 

347 """int: Returns an estimate of the number of queued requests.""" 

348 return self._request_queue.qsize() 

349 

350 

351def _never_terminate(future_or_error): 

352 """By default, no errors cause BiDi termination.""" 

353 return False 

354 

355 

356class ResumableBidiRpc(BidiRpc): 

357 """A :class:`BidiRpc` that can automatically resume the stream on errors. 

358 

359 It uses the ``should_recover`` arg to determine if it should re-establish 

360 the stream on error. 

361 

362 Example:: 

363 

364 def should_recover(exc): 

365 return ( 

366 isinstance(exc, grpc.RpcError) and 

367 exc.code() == grpc.StatusCode.UNAVAILABLE) 

368 

369 initial_request = example_pb2.StreamingRpcRequest( 

370 setting='example') 

371 

372 metadata = [('header_name', 'value')] 

373 

374 rpc = ResumableBidiRpc( 

375 stub.StreamingRpc, 

376 should_recover=should_recover, 

377 initial_request=initial_request, 

378 metadata=metadata 

379 ) 

380 

381 rpc.open() 

382 

383 while rpc.is_active(): 

384 print(rpc.recv()) 

385 rpc.send(example_pb2.StreamingRpcRequest( 

386 data='example')) 

387 

388 Args: 

389 start_rpc (grpc.StreamStreamMultiCallable): The gRPC method used to 

390 start the RPC. 

391 initial_request (Union[protobuf.Message, 

392 Callable[None, protobuf.Message]]): The initial request to 

393 yield. This is useful if an initial request is needed to start the 

394 stream. 

395 should_recover (Callable[[Exception], bool]): A function that returns 

396 True if the stream should be recovered. This will be called 

397 whenever an error is encountered on the stream. 

398 should_terminate (Callable[[Exception], bool]): A function that returns 

399 True if the stream should be terminated. This will be called 

400 whenever an error is encountered on the stream. 

401 metadata Sequence[Tuple(str, str)]: RPC metadata to include in 

402 the request. 

403 throttle_reopen (bool): If ``True``, throttling will be applied to 

404 stream reopen calls. Defaults to ``False``. 

405 """ 

406 

407 def __init__( 

408 self, 

409 start_rpc, 

410 should_recover, 

411 should_terminate=_never_terminate, 

412 initial_request=None, 

413 metadata=None, 

414 throttle_reopen=False, 

415 ): 

416 super(ResumableBidiRpc, self).__init__(start_rpc, initial_request, metadata) 

417 self._should_recover = should_recover 

418 self._should_terminate = should_terminate 

419 self._operational_lock = threading.RLock() 

420 self._finalized = False 

421 self._finalize_lock = threading.Lock() 

422 

423 if throttle_reopen: 

424 self._reopen_throttle = _Throttle( 

425 access_limit=5, time_window=datetime.timedelta(seconds=10) 

426 ) 

427 else: 

428 self._reopen_throttle = None 

429 

430 def _finalize(self, result): 

431 with self._finalize_lock: 

432 if self._finalized: 

433 return 

434 

435 for callback in self._callbacks: 

436 callback(result) 

437 

438 self._finalized = True 

439 

440 def _on_call_done(self, future): 

441 # Unlike the base class, we only execute the callbacks on a terminal 

442 # error, not for errors that we can recover from. Note that grpc's 

443 # "future" here is also a grpc.RpcError. 

444 with self._operational_lock: 

445 if self._should_terminate(future): 

446 self._finalize(future) 

447 elif not self._should_recover(future): 

448 self._finalize(future) 

449 else: 

450 _LOGGER.debug("Re-opening stream from gRPC callback.") 

451 self._reopen() 

452 

453 def _reopen(self): 

454 with self._operational_lock: 

455 # Another thread already managed to re-open this stream. 

456 if self.call is not None and self.call.is_active(): 

457 _LOGGER.debug("Stream was already re-established.") 

458 return 

459 

460 self.call = None 

461 # Request generator should exit cleanly since the RPC its bound to 

462 # has exited. 

463 self._request_generator = None 

464 

465 # Note: we do not currently do any sort of backoff here. The 

466 # assumption is that re-establishing the stream under normal 

467 # circumstances will happen in intervals greater than 60s. 

468 # However, it is possible in a degenerative case that the server 

469 # closes the stream rapidly which would lead to thrashing here, 

470 # but hopefully in those cases the server would return a non- 

471 # retryable error. 

472 

473 try: 

474 if self._reopen_throttle: 

475 with self._reopen_throttle: 

476 self.open() 

477 else: 

478 self.open() 

479 # If re-opening or re-calling the method fails for any reason, 

480 # consider it a terminal error and finalize the stream. 

481 except Exception as exc: 

482 _LOGGER.debug("Failed to re-open stream due to %s", exc) 

483 self._finalize(exc) 

484 raise 

485 

486 _LOGGER.info("Re-established stream") 

487 

488 def _recoverable(self, method, *args, **kwargs): 

489 """Wraps a method to recover the stream and retry on error. 

490 

491 If a retryable error occurs while making the call, then the stream will 

492 be re-opened and the method will be retried. This happens indefinitely 

493 so long as the error is a retryable one. If an error occurs while 

494 re-opening the stream, then this method will raise immediately and 

495 trigger finalization of this object. 

496 

497 Args: 

498 method (Callable[..., Any]): The method to call. 

499 args: The args to pass to the method. 

500 kwargs: The kwargs to pass to the method. 

501 """ 

502 while True: 

503 try: 

504 return method(*args, **kwargs) 

505 

506 except Exception as exc: 

507 with self._operational_lock: 

508 _LOGGER.debug("Call to retryable %r caused %s.", method, exc) 

509 

510 if self._should_terminate(exc): 

511 self.close() 

512 _LOGGER.debug("Terminating %r due to %s.", method, exc) 

513 self._finalize(exc) 

514 break 

515 

516 if not self._should_recover(exc): 

517 self.close() 

518 _LOGGER.debug("Not retrying %r due to %s.", method, exc) 

519 self._finalize(exc) 

520 raise exc 

521 

522 _LOGGER.debug("Re-opening stream from retryable %r.", method) 

523 self._reopen() 

524 

525 def _send(self, request): 

526 # Grab a reference to the RPC call. Because another thread (notably 

527 # the gRPC error thread) can modify self.call (by invoking reopen), 

528 # we should ensure our reference can not change underneath us. 

529 # If self.call is modified (such as replaced with a new RPC call) then 

530 # this will use the "old" RPC, which should result in the same 

531 # exception passed into gRPC's error handler being raised here, which 

532 # will be handled by the usual error handling in retryable. 

533 with self._operational_lock: 

534 call = self.call 

535 

536 if call is None: 

537 raise ValueError("Can not send() on an RPC that has never been open()ed.") 

538 

539 # Don't use self.is_active(), as ResumableBidiRpc will overload it 

540 # to mean something semantically different. 

541 if call.is_active(): 

542 self._request_queue.put(request) 

543 pass 

544 else: 

545 # calling next should cause the call to raise. 

546 next(call) 

547 

548 def send(self, request): 

549 return self._recoverable(self._send, request) 

550 

551 def _recv(self): 

552 with self._operational_lock: 

553 call = self.call 

554 

555 if call is None: 

556 raise ValueError("Can not recv() on an RPC that has never been open()ed.") 

557 

558 return next(call) 

559 

560 def recv(self): 

561 return self._recoverable(self._recv) 

562 

563 def close(self): 

564 self._finalize(None) 

565 super(ResumableBidiRpc, self).close() 

566 

567 @property 

568 def is_active(self): 

569 """bool: True if this stream is currently open and active.""" 

570 # Use the operational lock. It's entirely possible for something 

571 # to check the active state *while* the RPC is being retried. 

572 # Also, use finalized to track the actual terminal state here. 

573 # This is because if the stream is re-established by the gRPC thread 

574 # it's technically possible to check this between when gRPC marks the 

575 # RPC as inactive and when gRPC executes our callback that re-opens 

576 # the stream. 

577 with self._operational_lock: 

578 return self.call is not None and not self._finalized 

579 

580 

581class BackgroundConsumer(object): 

582 """A bi-directional stream consumer that runs in a separate thread. 

583 

584 This maps the consumption of a stream into a callback-based model. It also 

585 provides :func:`pause` and :func:`resume` to allow for flow-control. 

586 

587 Example:: 

588 

589 def should_recover(exc): 

590 return ( 

591 isinstance(exc, grpc.RpcError) and 

592 exc.code() == grpc.StatusCode.UNAVAILABLE) 

593 

594 initial_request = example_pb2.StreamingRpcRequest( 

595 setting='example') 

596 

597 rpc = ResumeableBidiRpc( 

598 stub.StreamingRpc, 

599 initial_request=initial_request, 

600 should_recover=should_recover) 

601 

602 def on_response(response): 

603 print(response) 

604 

605 consumer = BackgroundConsumer(rpc, on_response) 

606 consumer.start() 

607 

608 Note that error handling *must* be done by using the provided 

609 ``bidi_rpc``'s ``add_done_callback``. This helper will automatically exit 

610 whenever the RPC itself exits and will not provide any error details. 

611 

612 Args: 

613 bidi_rpc (BidiRpc): The RPC to consume. Should not have been 

614 ``open()``ed yet. 

615 on_response (Callable[[protobuf.Message], None]): The callback to 

616 be called for every response on the stream. 

617 """ 

618 

619 def __init__(self, bidi_rpc, on_response): 

620 self._bidi_rpc = bidi_rpc 

621 self._on_response = on_response 

622 self._paused = False 

623 self._wake = threading.Condition() 

624 self._thread = None 

625 self._operational_lock = threading.Lock() 

626 

627 def _on_call_done(self, future): 

628 # Resume the thread if it's paused, this prevents blocking forever 

629 # when the RPC has terminated. 

630 self.resume() 

631 

632 def _thread_main(self, ready): 

633 try: 

634 ready.set() 

635 self._bidi_rpc.add_done_callback(self._on_call_done) 

636 self._bidi_rpc.open() 

637 

638 while self._bidi_rpc.is_active: 

639 # Do not allow the paused status to change at all during this 

640 # section. There is a condition where we could be resumed 

641 # between checking if we are paused and calling wake.wait(), 

642 # which means that we will miss the notification to wake up 

643 # (oops!) and wait for a notification that will never come. 

644 # Keeping the lock throughout avoids that. 

645 # In the future, we could use `Condition.wait_for` if we drop 

646 # Python 2.7. 

647 # See: https://github.com/googleapis/python-api-core/issues/211 

648 with self._wake: 

649 while self._paused: 

650 _LOGGER.debug("paused, waiting for waking.") 

651 self._wake.wait() 

652 _LOGGER.debug("woken.") 

653 

654 _LOGGER.debug("waiting for recv.") 

655 response = self._bidi_rpc.recv() 

656 _LOGGER.debug("recved response.") 

657 self._on_response(response) 

658 

659 except exceptions.GoogleAPICallError as exc: 

660 _LOGGER.debug( 

661 "%s caught error %s and will exit. Generally this is due to " 

662 "the RPC itself being cancelled and the error will be " 

663 "surfaced to the calling code.", 

664 _BIDIRECTIONAL_CONSUMER_NAME, 

665 exc, 

666 exc_info=True, 

667 ) 

668 

669 except Exception as exc: 

670 _LOGGER.exception( 

671 "%s caught unexpected exception %s and will exit.", 

672 _BIDIRECTIONAL_CONSUMER_NAME, 

673 exc, 

674 ) 

675 

676 _LOGGER.info("%s exiting", _BIDIRECTIONAL_CONSUMER_NAME) 

677 

678 def start(self): 

679 """Start the background thread and begin consuming the thread.""" 

680 with self._operational_lock: 

681 ready = threading.Event() 

682 thread = threading.Thread( 

683 name=_BIDIRECTIONAL_CONSUMER_NAME, 

684 target=self._thread_main, 

685 args=(ready,), 

686 ) 

687 thread.daemon = True 

688 thread.start() 

689 # Other parts of the code rely on `thread.is_alive` which 

690 # isn't sufficient to know if a thread is active, just that it may 

691 # soon be active. This can cause races. Further protect 

692 # against races by using a ready event and wait on it to be set. 

693 ready.wait() 

694 self._thread = thread 

695 _LOGGER.debug("Started helper thread %s", thread.name) 

696 

697 def stop(self): 

698 """Stop consuming the stream and shutdown the background thread.""" 

699 with self._operational_lock: 

700 self._bidi_rpc.close() 

701 

702 if self._thread is not None: 

703 # Resume the thread to wake it up in case it is sleeping. 

704 self.resume() 

705 # The daemonized thread may itself block, so don't wait 

706 # for it longer than a second. 

707 self._thread.join(1.0) 

708 if self._thread.is_alive(): # pragma: NO COVER 

709 _LOGGER.warning("Background thread did not exit.") 

710 

711 self._thread = None 

712 

713 @property 

714 def is_active(self): 

715 """bool: True if the background thread is active.""" 

716 return self._thread is not None and self._thread.is_alive() 

717 

718 def pause(self): 

719 """Pauses the response stream. 

720 

721 This does *not* pause the request stream. 

722 """ 

723 with self._wake: 

724 self._paused = True 

725 

726 def resume(self): 

727 """Resumes the response stream.""" 

728 with self._wake: 

729 self._paused = False 

730 self._wake.notify_all() 

731 

732 @property 

733 def is_paused(self): 

734 """bool: True if the response stream is paused.""" 

735 return self._paused