Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/google/api_core/bidi.py: 21%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

260 statements  

1# Copyright 2017, Google LLC 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# https://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15"""Helpers for synchronous bidirectional streaming RPCs.""" 

16 

17import collections 

18import datetime 

19import logging 

20import queue as queue_module 

21import threading 

22import time 

23 

24from google.api_core import exceptions 

25from google.api_core.bidi_base import BidiRpcBase 

26 

27_LOGGER = logging.getLogger(__name__) 

28_BIDIRECTIONAL_CONSUMER_NAME = "Thread-ConsumeBidirectionalStream" 

29 

30 

31class _RequestQueueGenerator(object): 

32 """A helper for sending requests to a gRPC stream from a Queue. 

33 

34 This generator takes requests off a given queue and yields them to gRPC. 

35 

36 This helper is useful when you have an indeterminate, indefinite, or 

37 otherwise open-ended set of requests to send through a request-streaming 

38 (or bidirectional) RPC. 

39 

40 

41 Example:: 

42 

43 requests = request_queue_generator(q) 

44 call = stub.StreamingRequest(iter(requests)) 

45 requests.call = call 

46 

47 for response in call: 

48 print(response) 

49 q.put(...) 

50 

51 

52 Args: 

53 queue (queue_module.Queue): The request queue. 

54 period (float): The number of seconds to wait for items from the queue 

55 before checking if the RPC is cancelled. In practice, this 

56 determines the maximum amount of time the request consumption 

57 thread will live after the RPC is cancelled. 

58 initial_request (Union[protobuf.Message, 

59 Callable[None, protobuf.Message]]): The initial request to 

60 yield. This is done independently of the request queue to allow fo 

61 easily restarting streams that require some initial configuration 

62 request. 

63 """ 

64 

65 def __init__(self, queue, period=1, initial_request=None): 

66 self._queue = queue 

67 self._period = period 

68 self._initial_request = initial_request 

69 self.call = None 

70 

71 def _is_active(self): 

72 # Note: there is a possibility that this starts *before* the call 

73 # property is set. So we have to check if self.call is set before 

74 # seeing if it's active. We need to return True if self.call is None. 

75 # See https://github.com/googleapis/python-api-core/issues/560. 

76 return self.call is None or self.call.is_active() 

77 

78 def __iter__(self): 

79 # The reason this is necessary is because gRPC takes an iterator as the 

80 # request for request-streaming RPCs. gRPC consumes this iterator in 

81 # another thread to allow it to block while generating requests for 

82 # the stream. However, if the generator blocks indefinitely gRPC will 

83 # not be able to clean up the thread as it'll be blocked on 

84 # `next(iterator)` and not be able to check the channel status to stop 

85 # iterating. This helper mitigates that by waiting on the queue with 

86 # a timeout and checking the RPC state before yielding. 

87 # 

88 # Finally, it allows for retrying without swapping queues because if 

89 # it does pull an item off the queue when the RPC is inactive, it'll 

90 # immediately put it back and then exit. This is necessary because 

91 # yielding the item in this case will cause gRPC to discard it. In 

92 # practice, this means that the order of messages is not guaranteed. 

93 # If such a thing is necessary it would be easy to use a priority 

94 # queue. 

95 # 

96 # Note that it is possible to accomplish this behavior without 

97 # "spinning" (using a queue timeout). One possible way would be to use 

98 # more threads to multiplex the grpc end event with the queue, another 

99 # possible way is to use selectors and a custom event/queue object. 

100 # Both of these approaches are significant from an engineering 

101 # perspective for small benefit - the CPU consumed by spinning is 

102 # pretty minuscule. 

103 

104 if self._initial_request is not None: 

105 if callable(self._initial_request): 

106 yield self._initial_request() 

107 else: 

108 yield self._initial_request 

109 

110 while True: 

111 try: 

112 item = self._queue.get(timeout=self._period) 

113 except queue_module.Empty: 

114 if not self._is_active(): 

115 _LOGGER.debug( 

116 "Empty queue and inactive call, exiting request " "generator." 

117 ) 

118 return 

119 else: 

120 # call is still active, keep waiting for queue items. 

121 continue 

122 

123 # The consumer explicitly sent "None", indicating that the request 

124 # should end. 

125 if item is None: 

126 _LOGGER.debug("Cleanly exiting request generator.") 

127 return 

128 

129 if not self._is_active(): 

130 # We have an item, but the call is closed. We should put the 

131 # item back on the queue so that the next call can consume it. 

132 self._queue.put(item) 

133 _LOGGER.debug( 

134 "Inactive call, replacing item on queue and exiting " 

135 "request generator." 

136 ) 

137 return 

138 

139 yield item 

140 

141 

142class _Throttle(object): 

143 """A context manager limiting the total entries in a sliding time window. 

144 

145 If more than ``access_limit`` attempts are made to enter the context manager 

146 instance in the last ``time window`` interval, the exceeding requests block 

147 until enough time elapses. 

148 

149 The context manager instances are thread-safe and can be shared between 

150 multiple threads. If multiple requests are blocked and waiting to enter, 

151 the exact order in which they are allowed to proceed is not determined. 

152 

153 Example:: 

154 

155 max_three_per_second = _Throttle( 

156 access_limit=3, time_window=datetime.timedelta(seconds=1) 

157 ) 

158 

159 for i in range(5): 

160 with max_three_per_second as time_waited: 

161 print("{}: Waited {} seconds to enter".format(i, time_waited)) 

162 

163 Args: 

164 access_limit (int): the maximum number of entries allowed in the time window 

165 time_window (datetime.timedelta): the width of the sliding time window 

166 """ 

167 

168 def __init__(self, access_limit, time_window): 

169 if access_limit < 1: 

170 raise ValueError("access_limit argument must be positive") 

171 

172 if time_window <= datetime.timedelta(0): 

173 raise ValueError("time_window argument must be a positive timedelta") 

174 

175 self._time_window = time_window 

176 self._access_limit = access_limit 

177 self._past_entries = collections.deque( 

178 maxlen=access_limit 

179 ) # least recent first 

180 self._entry_lock = threading.Lock() 

181 

182 def __enter__(self): 

183 with self._entry_lock: 

184 cutoff_time = datetime.datetime.now() - self._time_window 

185 

186 # drop the entries that are too old, as they are no longer relevant 

187 while self._past_entries and self._past_entries[0] < cutoff_time: 

188 self._past_entries.popleft() 

189 

190 if len(self._past_entries) < self._access_limit: 

191 self._past_entries.append(datetime.datetime.now()) 

192 return 0.0 # no waiting was needed 

193 

194 to_wait = (self._past_entries[0] - cutoff_time).total_seconds() 

195 time.sleep(to_wait) 

196 

197 self._past_entries.append(datetime.datetime.now()) 

198 return to_wait 

199 

200 def __exit__(self, *_): 

201 pass 

202 

203 def __repr__(self): 

204 return "{}(access_limit={}, time_window={})".format( 

205 self.__class__.__name__, self._access_limit, repr(self._time_window) 

206 ) 

207 

208 

209class BidiRpc(BidiRpcBase): 

210 """A helper for consuming a bi-directional streaming RPC. 

211 

212 This maps gRPC's built-in interface which uses a request iterator and a 

213 response iterator into a socket-like :func:`send` and :func:`recv`. This 

214 is a more useful pattern for long-running or asymmetric streams (streams 

215 where there is not a direct correlation between the requests and 

216 responses). 

217 

218 Example:: 

219 

220 initial_request = example_pb2.StreamingRpcRequest( 

221 setting='example') 

222 rpc = BidiRpc( 

223 stub.StreamingRpc, 

224 initial_request=initial_request, 

225 metadata=[('name', 'value')] 

226 ) 

227 

228 rpc.open() 

229 

230 while rpc.is_active(): 

231 print(rpc.recv()) 

232 rpc.send(example_pb2.StreamingRpcRequest( 

233 data='example')) 

234 

235 rpc.close() 

236 

237 This does *not* retry the stream on errors. See :class:`ResumableBidiRpc`. 

238 

239 Args: 

240 start_rpc (grpc.StreamStreamMultiCallable): The gRPC method used to 

241 start the RPC. 

242 initial_request (Union[protobuf.Message, 

243 Callable[None, protobuf.Message]]): The initial request to 

244 yield. This is useful if an initial request is needed to start the 

245 stream. 

246 metadata (Sequence[Tuple(str, str)]): RPC metadata to include in 

247 the request. 

248 """ 

249 

250 def _create_queue(self): 

251 """Create a queue for requests.""" 

252 return queue_module.Queue() 

253 

254 def open(self): 

255 """Opens the stream.""" 

256 if self.is_active: 

257 raise ValueError("Cannot open an already open stream.") 

258 

259 request_generator = _RequestQueueGenerator( 

260 self._request_queue, initial_request=self._initial_request 

261 ) 

262 try: 

263 call = self._start_rpc(iter(request_generator), metadata=self._rpc_metadata) 

264 except exceptions.GoogleAPICallError as exc: 

265 # The original `grpc.RpcError` (which is usually also a `grpc.Call`) is 

266 # available from the ``response`` property on the mapped exception. 

267 self._on_call_done(exc.response) 

268 raise 

269 

270 request_generator.call = call 

271 

272 # TODO: api_core should expose the future interface for wrapped 

273 # callables as well. 

274 if hasattr(call, "_wrapped"): # pragma: NO COVER 

275 call._wrapped.add_done_callback(self._on_call_done) 

276 else: 

277 call.add_done_callback(self._on_call_done) 

278 

279 self._request_generator = request_generator 

280 self.call = call 

281 

282 def close(self): 

283 """Closes the stream.""" 

284 if self.call is None: 

285 return 

286 

287 self._request_queue.put(None) 

288 self.call.cancel() 

289 self._request_generator = None 

290 self._initial_request = None 

291 self._callbacks = [] 

292 # Don't set self.call to None. Keep it around so that send/recv can 

293 # raise the error. 

294 

295 def send(self, request): 

296 """Queue a message to be sent on the stream. 

297 

298 Send is non-blocking. 

299 

300 If the underlying RPC has been closed, this will raise. 

301 

302 Args: 

303 request (protobuf.Message): The request to send. 

304 """ 

305 if self.call is None: 

306 raise ValueError("Cannot send on an RPC stream that has never been opened.") 

307 

308 # Don't use self.is_active(), as ResumableBidiRpc will overload it 

309 # to mean something semantically different. 

310 if self.call.is_active(): 

311 self._request_queue.put(request) 

312 else: 

313 # calling next should cause the call to raise. 

314 next(self.call) 

315 

316 def recv(self): 

317 """Wait for a message to be returned from the stream. 

318 

319 Recv is blocking. 

320 

321 If the underlying RPC has been closed, this will raise. 

322 

323 Returns: 

324 protobuf.Message: The received message. 

325 """ 

326 if self.call is None: 

327 raise ValueError("Cannot recv on an RPC stream that has never been opened.") 

328 

329 return next(self.call) 

330 

331 @property 

332 def is_active(self): 

333 """True if this stream is currently open and active.""" 

334 return self.call is not None and self.call.is_active() 

335 

336 

337def _never_terminate(future_or_error): 

338 """By default, no errors cause BiDi termination.""" 

339 return False 

340 

341 

342class ResumableBidiRpc(BidiRpc): 

343 """A :class:`BidiRpc` that can automatically resume the stream on errors. 

344 

345 It uses the ``should_recover`` arg to determine if it should re-establish 

346 the stream on error. 

347 

348 Example:: 

349 

350 def should_recover(exc): 

351 return ( 

352 isinstance(exc, grpc.RpcError) and 

353 exc.code() == grpc.StatusCode.UNAVAILABLE) 

354 

355 initial_request = example_pb2.StreamingRpcRequest( 

356 setting='example') 

357 

358 metadata = [('header_name', 'value')] 

359 

360 rpc = ResumableBidiRpc( 

361 stub.StreamingRpc, 

362 should_recover=should_recover, 

363 initial_request=initial_request, 

364 metadata=metadata 

365 ) 

366 

367 rpc.open() 

368 

369 while rpc.is_active(): 

370 print(rpc.recv()) 

371 rpc.send(example_pb2.StreamingRpcRequest( 

372 data='example')) 

373 

374 Args: 

375 start_rpc (grpc.StreamStreamMultiCallable): The gRPC method used to 

376 start the RPC. 

377 initial_request (Union[protobuf.Message, 

378 Callable[None, protobuf.Message]]): The initial request to 

379 yield. This is useful if an initial request is needed to start the 

380 stream. 

381 should_recover (Callable[[Exception], bool]): A function that returns 

382 True if the stream should be recovered. This will be called 

383 whenever an error is encountered on the stream. 

384 should_terminate (Callable[[Exception], bool]): A function that returns 

385 True if the stream should be terminated. This will be called 

386 whenever an error is encountered on the stream. 

387 metadata Sequence[Tuple(str, str)]: RPC metadata to include in 

388 the request. 

389 throttle_reopen (bool): If ``True``, throttling will be applied to 

390 stream reopen calls. Defaults to ``False``. 

391 """ 

392 

393 def __init__( 

394 self, 

395 start_rpc, 

396 should_recover, 

397 should_terminate=_never_terminate, 

398 initial_request=None, 

399 metadata=None, 

400 throttle_reopen=False, 

401 ): 

402 super(ResumableBidiRpc, self).__init__(start_rpc, initial_request, metadata) 

403 self._should_recover = should_recover 

404 self._should_terminate = should_terminate 

405 self._operational_lock = threading.RLock() 

406 self._finalized = False 

407 self._finalize_lock = threading.Lock() 

408 

409 if throttle_reopen: 

410 self._reopen_throttle = _Throttle( 

411 access_limit=5, time_window=datetime.timedelta(seconds=10) 

412 ) 

413 else: 

414 self._reopen_throttle = None 

415 

416 def _finalize(self, result): 

417 with self._finalize_lock: 

418 if self._finalized: 

419 return 

420 

421 for callback in self._callbacks: 

422 callback(result) 

423 

424 self._finalized = True 

425 

426 def _on_call_done(self, future): 

427 # Unlike the base class, we only execute the callbacks on a terminal 

428 # error, not for errors that we can recover from. Note that grpc's 

429 # "future" here is also a grpc.RpcError. 

430 with self._operational_lock: 

431 if self._should_terminate(future): 

432 self._finalize(future) 

433 elif not self._should_recover(future): 

434 self._finalize(future) 

435 else: 

436 _LOGGER.debug("Re-opening stream from gRPC callback.") 

437 self._reopen() 

438 

439 def _reopen(self): 

440 with self._operational_lock: 

441 # Another thread already managed to re-open this stream. 

442 if self.call is not None and self.call.is_active(): 

443 _LOGGER.debug("Stream was already re-established.") 

444 return 

445 

446 self.call = None 

447 # Request generator should exit cleanly since the RPC its bound to 

448 # has exited. 

449 self._request_generator = None 

450 

451 # Note: we do not currently do any sort of backoff here. The 

452 # assumption is that re-establishing the stream under normal 

453 # circumstances will happen in intervals greater than 60s. 

454 # However, it is possible in a degenerative case that the server 

455 # closes the stream rapidly which would lead to thrashing here, 

456 # but hopefully in those cases the server would return a non- 

457 # retryable error. 

458 

459 try: 

460 if self._reopen_throttle: 

461 with self._reopen_throttle: 

462 self.open() 

463 else: 

464 self.open() 

465 # If re-opening or re-calling the method fails for any reason, 

466 # consider it a terminal error and finalize the stream. 

467 except Exception as exc: 

468 _LOGGER.debug("Failed to re-open stream due to %s", exc) 

469 self._finalize(exc) 

470 raise 

471 

472 _LOGGER.info("Re-established stream") 

473 

474 def _recoverable(self, method, *args, **kwargs): 

475 """Wraps a method to recover the stream and retry on error. 

476 

477 If a retryable error occurs while making the call, then the stream will 

478 be re-opened and the method will be retried. This happens indefinitely 

479 so long as the error is a retryable one. If an error occurs while 

480 re-opening the stream, then this method will raise immediately and 

481 trigger finalization of this object. 

482 

483 Args: 

484 method (Callable[..., Any]): The method to call. 

485 args: The args to pass to the method. 

486 kwargs: The kwargs to pass to the method. 

487 """ 

488 while True: 

489 try: 

490 return method(*args, **kwargs) 

491 

492 except Exception as exc: 

493 with self._operational_lock: 

494 _LOGGER.debug("Call to retryable %r caused %s.", method, exc) 

495 

496 if self._should_terminate(exc): 

497 self.close() 

498 _LOGGER.debug("Terminating %r due to %s.", method, exc) 

499 self._finalize(exc) 

500 break 

501 

502 if not self._should_recover(exc): 

503 self.close() 

504 _LOGGER.debug("Not retrying %r due to %s.", method, exc) 

505 self._finalize(exc) 

506 raise exc 

507 

508 _LOGGER.debug("Re-opening stream from retryable %r.", method) 

509 self._reopen() 

510 

511 def _send(self, request): 

512 # Grab a reference to the RPC call. Because another thread (notably 

513 # the gRPC error thread) can modify self.call (by invoking reopen), 

514 # we should ensure our reference can not change underneath us. 

515 # If self.call is modified (such as replaced with a new RPC call) then 

516 # this will use the "old" RPC, which should result in the same 

517 # exception passed into gRPC's error handler being raised here, which 

518 # will be handled by the usual error handling in retryable. 

519 with self._operational_lock: 

520 call = self.call 

521 

522 if call is None: 

523 raise ValueError("Cannot send on an RPC that has never been opened.") 

524 

525 # Don't use self.is_active(), as ResumableBidiRpc will overload it 

526 # to mean something semantically different. 

527 if call.is_active(): 

528 self._request_queue.put(request) 

529 pass 

530 else: 

531 # calling next should cause the call to raise. 

532 next(call) 

533 

534 def send(self, request): 

535 return self._recoverable(self._send, request) 

536 

537 def _recv(self): 

538 with self._operational_lock: 

539 call = self.call 

540 

541 if call is None: 

542 raise ValueError("Cannot recv on an RPC that has never been opened.") 

543 

544 return next(call) 

545 

546 def recv(self): 

547 return self._recoverable(self._recv) 

548 

549 def close(self): 

550 self._finalize(None) 

551 super(ResumableBidiRpc, self).close() 

552 

553 @property 

554 def is_active(self): 

555 """bool: True if this stream is currently open and active.""" 

556 # Use the operational lock. It's entirely possible for something 

557 # to check the active state *while* the RPC is being retried. 

558 # Also, use finalized to track the actual terminal state here. 

559 # This is because if the stream is re-established by the gRPC thread 

560 # it's technically possible to check this between when gRPC marks the 

561 # RPC as inactive and when gRPC executes our callback that re-opens 

562 # the stream. 

563 with self._operational_lock: 

564 return self.call is not None and not self._finalized 

565 

566 

567class BackgroundConsumer(object): 

568 """A bi-directional stream consumer that runs in a separate thread. 

569 

570 This maps the consumption of a stream into a callback-based model. It also 

571 provides :func:`pause` and :func:`resume` to allow for flow-control. 

572 

573 Example:: 

574 

575 def should_recover(exc): 

576 return ( 

577 isinstance(exc, grpc.RpcError) and 

578 exc.code() == grpc.StatusCode.UNAVAILABLE) 

579 

580 initial_request = example_pb2.StreamingRpcRequest( 

581 setting='example') 

582 

583 rpc = ResumeableBidiRpc( 

584 stub.StreamingRpc, 

585 initial_request=initial_request, 

586 should_recover=should_recover) 

587 

588 def on_response(response): 

589 print(response) 

590 

591 consumer = BackgroundConsumer(rpc, on_response) 

592 consumer.start() 

593 

594 Note that error handling *must* be done by using the provided 

595 ``bidi_rpc``'s ``add_done_callback``. This helper will automatically exit 

596 whenever the RPC itself exits and will not provide any error details. 

597 

598 Args: 

599 bidi_rpc (BidiRpc): The RPC to consume. Should not have been 

600 ``open()``ed yet. 

601 on_response (Callable[[protobuf.Message], None]): The callback to 

602 be called for every response on the stream. 

603 on_fatal_exception (Callable[[Exception], None]): The callback to 

604 be called on fatal errors during consumption. Default None. 

605 """ 

606 

607 def __init__(self, bidi_rpc, on_response, on_fatal_exception=None): 

608 self._bidi_rpc = bidi_rpc 

609 self._on_response = on_response 

610 self._paused = False 

611 self._on_fatal_exception = on_fatal_exception 

612 self._wake = threading.Condition() 

613 self._thread = None 

614 self._operational_lock = threading.Lock() 

615 

616 def _on_call_done(self, future): 

617 # Resume the thread if it's paused, this prevents blocking forever 

618 # when the RPC has terminated. 

619 self.resume() 

620 

621 def _thread_main(self, ready): 

622 try: 

623 ready.set() 

624 self._bidi_rpc.add_done_callback(self._on_call_done) 

625 self._bidi_rpc.open() 

626 

627 while self._bidi_rpc.is_active: 

628 # Do not allow the paused status to change at all during this 

629 # section. There is a condition where we could be resumed 

630 # between checking if we are paused and calling wake.wait(), 

631 # which means that we will miss the notification to wake up 

632 # (oops!) and wait for a notification that will never come. 

633 # Keeping the lock throughout avoids that. 

634 # In the future, we could use `Condition.wait_for` if we drop 

635 # Python 2.7. 

636 # See: https://github.com/googleapis/python-api-core/issues/211 

637 with self._wake: 

638 while self._paused: 

639 _LOGGER.debug("paused, waiting for waking.") 

640 self._wake.wait() 

641 _LOGGER.debug("woken.") 

642 

643 _LOGGER.debug("waiting for recv.") 

644 response = self._bidi_rpc.recv() 

645 _LOGGER.debug("recved response.") 

646 if self._on_response is not None: 

647 self._on_response(response) 

648 

649 except exceptions.GoogleAPICallError as exc: 

650 _LOGGER.debug( 

651 "%s caught error %s and will exit. Generally this is due to " 

652 "the RPC itself being cancelled and the error will be " 

653 "surfaced to the calling code.", 

654 _BIDIRECTIONAL_CONSUMER_NAME, 

655 exc, 

656 exc_info=True, 

657 ) 

658 if self._on_fatal_exception is not None: 

659 self._on_fatal_exception(exc) 

660 

661 except Exception as exc: 

662 _LOGGER.exception( 

663 "%s caught unexpected exception %s and will exit.", 

664 _BIDIRECTIONAL_CONSUMER_NAME, 

665 exc, 

666 ) 

667 if self._on_fatal_exception is not None: 

668 self._on_fatal_exception(exc) 

669 

670 _LOGGER.info("%s exiting", _BIDIRECTIONAL_CONSUMER_NAME) 

671 

672 def start(self): 

673 """Start the background thread and begin consuming the thread.""" 

674 with self._operational_lock: 

675 ready = threading.Event() 

676 thread = threading.Thread( 

677 name=_BIDIRECTIONAL_CONSUMER_NAME, 

678 target=self._thread_main, 

679 args=(ready,), 

680 daemon=True, 

681 ) 

682 thread.start() 

683 # Other parts of the code rely on `thread.is_alive` which 

684 # isn't sufficient to know if a thread is active, just that it may 

685 # soon be active. This can cause races. Further protect 

686 # against races by using a ready event and wait on it to be set. 

687 ready.wait() 

688 self._thread = thread 

689 _LOGGER.debug("Started helper thread %s", thread.name) 

690 

691 def stop(self): 

692 """Stop consuming the stream and shutdown the background thread. 

693 

694 NOTE: Cannot be called within `_thread_main`, since it is not 

695 possible to join a thread to itself. 

696 """ 

697 with self._operational_lock: 

698 self._bidi_rpc.close() 

699 

700 if self._thread is not None: 

701 # Resume the thread to wake it up in case it is sleeping. 

702 self.resume() 

703 # The daemonized thread may itself block, so don't wait 

704 # for it longer than a second. 

705 self._thread.join(1.0) 

706 if self._thread.is_alive(): # pragma: NO COVER 

707 _LOGGER.warning("Background thread did not exit.") 

708 

709 self._thread = None 

710 self._on_response = None 

711 self._on_fatal_exception = None 

712 

713 @property 

714 def is_active(self): 

715 """bool: True if the background thread is active.""" 

716 return self._thread is not None and self._thread.is_alive() 

717 

718 def pause(self): 

719 """Pauses the response stream. 

720 

721 This does *not* pause the request stream. 

722 """ 

723 with self._wake: 

724 self._paused = True 

725 

726 def resume(self): 

727 """Resumes the response stream.""" 

728 with self._wake: 

729 self._paused = False 

730 self._wake.notify_all() 

731 

732 @property 

733 def is_paused(self): 

734 """bool: True if the response stream is paused.""" 

735 return self._paused