Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/google/api_core/bidi.py: 22%

262 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2023-12-09 06:27 +0000

1# Copyright 2017, Google LLC 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# https://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15"""Bi-directional streaming RPC helpers.""" 

16 

17import collections 

18import datetime 

19import logging 

20import queue as queue_module 

21import threading 

22import time 

23 

24from google.api_core import exceptions 

25 

# Module-level logger, named after this module.
_LOGGER = logging.getLogger(__name__)
# Name given to the BackgroundConsumer helper thread; also interpolated into
# that thread's log messages.
_BIDIRECTIONAL_CONSUMER_NAME = "Thread-ConsumeBidirectionalStream"

28 

29 

30class _RequestQueueGenerator(object): 

31 """A helper for sending requests to a gRPC stream from a Queue. 

32 

33 This generator takes requests off a given queue and yields them to gRPC. 

34 

35 This helper is useful when you have an indeterminate, indefinite, or 

36 otherwise open-ended set of requests to send through a request-streaming 

37 (or bidirectional) RPC. 

38 

39 The reason this is necessary is because gRPC takes an iterator as the 

40 request for request-streaming RPCs. gRPC consumes this iterator in another 

41 thread to allow it to block while generating requests for the stream. 

42 However, if the generator blocks indefinitely gRPC will not be able to 

43 clean up the thread as it'll be blocked on `next(iterator)` and not be able 

44 to check the channel status to stop iterating. This helper mitigates that 

45 by waiting on the queue with a timeout and checking the RPC state before 

46 yielding. 

47 

48 Finally, it allows for retrying without swapping queues because if it does 

49 pull an item off the queue when the RPC is inactive, it'll immediately put 

50 it back and then exit. This is necessary because yielding the item in this 

51 case will cause gRPC to discard it. In practice, this means that the order 

52 of messages is not guaranteed. If such a thing is necessary it would be 

53 easy to use a priority queue. 

54 

55 Example:: 

56 

57 requests = request_queue_generator(q) 

58 call = stub.StreamingRequest(iter(requests)) 

59 requests.call = call 

60 

61 for response in call: 

62 print(response) 

63 q.put(...) 

64 

65 Note that it is possible to accomplish this behavior without "spinning" 

66 (using a queue timeout). One possible way would be to use more threads to 

67 multiplex the grpc end event with the queue, another possible way is to 

68 use selectors and a custom event/queue object. Both of these approaches 

69 are significant from an engineering perspective for small benefit - the 

70 CPU consumed by spinning is pretty minuscule. 

71 

72 Args: 

73 queue (queue_module.Queue): The request queue. 

74 period (float): The number of seconds to wait for items from the queue 

75 before checking if the RPC is cancelled. In practice, this 

76 determines the maximum amount of time the request consumption 

77 thread will live after the RPC is cancelled. 

78 initial_request (Union[protobuf.Message, 

79 Callable[None, protobuf.Message]]): The initial request to 

80 yield. This is done independently of the request queue to allow fo 

81 easily restarting streams that require some initial configuration 

82 request. 

83 """ 

84 

85 def __init__(self, queue, period=1, initial_request=None): 

86 self._queue = queue 

87 self._period = period 

88 self._initial_request = initial_request 

89 self.call = None 

90 

91 def _is_active(self): 

92 # Note: there is a possibility that this starts *before* the call 

93 # property is set. So we have to check if self.call is set before 

94 # seeing if it's active. We need to return True if self.call is None. 

95 # See https://github.com/googleapis/python-api-core/issues/560. 

96 return self.call is None or self.call.is_active() 

97 

98 def __iter__(self): 

99 if self._initial_request is not None: 

100 if callable(self._initial_request): 

101 yield self._initial_request() 

102 else: 

103 yield self._initial_request 

104 

105 while True: 

106 try: 

107 item = self._queue.get(timeout=self._period) 

108 except queue_module.Empty: 

109 if not self._is_active(): 

110 _LOGGER.debug( 

111 "Empty queue and inactive call, exiting request " "generator." 

112 ) 

113 return 

114 else: 

115 # call is still active, keep waiting for queue items. 

116 continue 

117 

118 # The consumer explicitly sent "None", indicating that the request 

119 # should end. 

120 if item is None: 

121 _LOGGER.debug("Cleanly exiting request generator.") 

122 return 

123 

124 if not self._is_active(): 

125 # We have an item, but the call is closed. We should put the 

126 # item back on the queue so that the next call can consume it. 

127 self._queue.put(item) 

128 _LOGGER.debug( 

129 "Inactive call, replacing item on queue and exiting " 

130 "request generator." 

131 ) 

132 return 

133 

134 yield item 

135 

136 

137class _Throttle(object): 

138 """A context manager limiting the total entries in a sliding time window. 

139 

140 If more than ``access_limit`` attempts are made to enter the context manager 

141 instance in the last ``time window`` interval, the exceeding requests block 

142 until enough time elapses. 

143 

144 The context manager instances are thread-safe and can be shared between 

145 multiple threads. If multiple requests are blocked and waiting to enter, 

146 the exact order in which they are allowed to proceed is not determined. 

147 

148 Example:: 

149 

150 max_three_per_second = _Throttle( 

151 access_limit=3, time_window=datetime.timedelta(seconds=1) 

152 ) 

153 

154 for i in range(5): 

155 with max_three_per_second as time_waited: 

156 print("{}: Waited {} seconds to enter".format(i, time_waited)) 

157 

158 Args: 

159 access_limit (int): the maximum number of entries allowed in the time window 

160 time_window (datetime.timedelta): the width of the sliding time window 

161 """ 

162 

163 def __init__(self, access_limit, time_window): 

164 if access_limit < 1: 

165 raise ValueError("access_limit argument must be positive") 

166 

167 if time_window <= datetime.timedelta(0): 

168 raise ValueError("time_window argument must be a positive timedelta") 

169 

170 self._time_window = time_window 

171 self._access_limit = access_limit 

172 self._past_entries = collections.deque( 

173 maxlen=access_limit 

174 ) # least recent first 

175 self._entry_lock = threading.Lock() 

176 

177 def __enter__(self): 

178 with self._entry_lock: 

179 cutoff_time = datetime.datetime.now() - self._time_window 

180 

181 # drop the entries that are too old, as they are no longer relevant 

182 while self._past_entries and self._past_entries[0] < cutoff_time: 

183 self._past_entries.popleft() 

184 

185 if len(self._past_entries) < self._access_limit: 

186 self._past_entries.append(datetime.datetime.now()) 

187 return 0.0 # no waiting was needed 

188 

189 to_wait = (self._past_entries[0] - cutoff_time).total_seconds() 

190 time.sleep(to_wait) 

191 

192 self._past_entries.append(datetime.datetime.now()) 

193 return to_wait 

194 

195 def __exit__(self, *_): 

196 pass 

197 

198 def __repr__(self): 

199 return "{}(access_limit={}, time_window={})".format( 

200 self.__class__.__name__, self._access_limit, repr(self._time_window) 

201 ) 

202 

203 

class BidiRpc(object):
    """Socket-like wrapper around a bi-directional streaming RPC.

    gRPC's native interface pairs a request iterator with a response
    iterator. This class maps that onto :func:`send` and :func:`recv`, which
    is a more useful pattern for long-running or asymmetric streams (streams
    where requests and responses are not directly correlated).

    Example::

        initial_request = example_pb2.StreamingRpcRequest(
            setting='example')
        rpc = BidiRpc(
            stub.StreamingRpc,
            initial_request=initial_request,
            metadata=[('name', 'value')]
        )

        rpc.open()

        while rpc.is_active:
            print(rpc.recv())
            rpc.send(example_pb2.StreamingRpcRequest(
                data='example'))

    This does *not* retry the stream on errors. See :class:`ResumableBidiRpc`.

    Args:
        start_rpc (grpc.StreamStreamMultiCallable): The gRPC method used to
            start the RPC.
        initial_request (Union[protobuf.Message,
                Callable[None, protobuf.Message]]): The initial request to
            yield. This is useful if an initial request is needed to start
            the stream.
        metadata (Sequence[Tuple(str, str)]): RPC metadata to include in
            the request.
    """

    def __init__(self, start_rpc, initial_request=None, metadata=None):
        self._start_rpc = start_rpc
        self._initial_request = initial_request
        self._rpc_metadata = metadata
        # Outgoing requests; drained by the request generator given to gRPC.
        self._request_queue = queue_module.Queue()
        self._request_generator = None
        self._is_active = False
        # Callables invoked with the gRPC future when the RPC terminates.
        self._callbacks = []
        # The underlying gRPC call; stays ``None`` until open() succeeds.
        self.call = None

    def add_done_callback(self, callback):
        """Register a callback to run when the RPC terminates.

        Termination means the RPC either errored or finished successfully.

        Args:
            callback (Callable[[grpc.Future], None]): The callback to execute.
                It will be provided with the same gRPC future as the
                underlying stream which will also be a :class:`grpc.Call`.
        """
        self._callbacks.append(callback)

    def _on_call_done(self, future):
        # Runs when the RPC errors or terminates successfully. grpc's
        # "future" here can also be a grpc.RpcError; see the note in
        # https://github.com/grpc/grpc/issues/10885#issuecomment-302651331
        # that `grpc.RpcError` is also `grpc.call`.
        for done_callback in self._callbacks:
            done_callback(future)

    def open(self):
        """Opens the stream.

        Raises:
            ValueError: If the stream is already open.
        """
        if self.is_active:
            raise ValueError("Can not open an already open stream.")

        generator = _RequestQueueGenerator(
            self._request_queue, initial_request=self._initial_request
        )
        try:
            stream = self._start_rpc(iter(generator), metadata=self._rpc_metadata)
        except exceptions.GoogleAPICallError as exc:
            # The mapped exception exposes the original `grpc.RpcError`
            # (usually also a `grpc.Call`) through its ``response`` property.
            self._on_call_done(exc.response)
            raise

        generator.call = stream

        # TODO: api_core should expose the future interface for wrapped
        # callables as well.
        wrapped = hasattr(stream, "_wrapped")  # pragma: NO COVER
        done_source = stream._wrapped if wrapped else stream
        done_source.add_done_callback(self._on_call_done)

        self._request_generator = generator
        self.call = stream

    def close(self):
        """Closes the stream."""
        if self.call is None:
            return

        # The ``None`` sentinel tells the request generator to exit.
        self._request_queue.put(None)
        self.call.cancel()
        self._request_generator = None
        # self.call is deliberately kept (not reset to None) so that
        # send/recv can still raise the error.

    def send(self, request):
        """Queue a message to be sent on the stream.

        Send is non-blocking.

        If the underlying RPC has been closed, this will raise.

        Args:
            request (protobuf.Message): The request to send.

        Raises:
            ValueError: If the RPC has never been opened.
        """
        if self.call is None:
            raise ValueError("Can not send() on an RPC that has never been open()ed.")

        # The raw call is checked here instead of the ``is_active`` property
        # because ResumableBidiRpc overloads that property to mean something
        # semantically different.
        if not self.call.is_active():
            # calling next should cause the call to raise.
            next(self.call)
            return

        self._request_queue.put(request)

    def recv(self):
        """Wait for a message to be returned from the stream.

        Recv is blocking.

        If the underlying RPC has been closed, this will raise.

        Returns:
            protobuf.Message: The received message.

        Raises:
            ValueError: If the RPC has never been opened.
        """
        stream = self.call
        if stream is None:
            raise ValueError("Can not recv() on an RPC that has never been open()ed.")

        return next(stream)

    @property
    def is_active(self):
        """bool: True if this stream is currently open and active."""
        stream = self.call
        if stream is None:
            return False
        return stream.is_active()

    @property
    def pending_requests(self):
        """int: Returns an estimate of the number of queued requests."""
        return self._request_queue.qsize()

357 

358 

359def _never_terminate(future_or_error): 

360 """By default, no errors cause BiDi termination.""" 

361 return False 

362 

363 

class ResumableBidiRpc(BidiRpc):
    """A :class:`BidiRpc` that can automatically resume the stream on errors.

    It uses the ``should_recover`` arg to determine if it should re-establish
    the stream on error.

    Example::

        def should_recover(exc):
            return (
                isinstance(exc, grpc.RpcError) and
                exc.code() == grpc.StatusCode.UNAVAILABLE)

        initial_request = example_pb2.StreamingRpcRequest(
            setting='example')

        metadata = [('header_name', 'value')]

        rpc = ResumableBidiRpc(
            stub.StreamingRpc,
            should_recover=should_recover,
            initial_request=initial_request,
            metadata=metadata
        )

        rpc.open()

        while rpc.is_active:
            print(rpc.recv())
            rpc.send(example_pb2.StreamingRpcRequest(
                data='example'))

    Args:
        start_rpc (grpc.StreamStreamMultiCallable): The gRPC method used to
            start the RPC.
        initial_request (Union[protobuf.Message,
                Callable[None, protobuf.Message]]): The initial request to
            yield. This is useful if an initial request is needed to start the
            stream.
        should_recover (Callable[[Exception], bool]): A function that returns
            True if the stream should be recovered. This will be called
            whenever an error is encountered on the stream.
        should_terminate (Callable[[Exception], bool]): A function that returns
            True if the stream should be terminated. This will be called
            whenever an error is encountered on the stream.
        metadata (Sequence[Tuple(str, str)]): RPC metadata to include in
            the request.
        throttle_reopen (bool): If ``True``, throttling will be applied to
            stream reopen calls. Defaults to ``False``.
    """

    def __init__(
        self,
        start_rpc,
        should_recover,
        should_terminate=_never_terminate,
        initial_request=None,
        metadata=None,
        throttle_reopen=False,
    ):
        super(ResumableBidiRpc, self).__init__(start_rpc, initial_request, metadata)
        self._should_recover = should_recover
        self._should_terminate = should_terminate
        # Re-entrant: _recoverable() holds it while calling close()/_reopen(),
        # and _reopen() acquires it again.
        self._operational_lock = threading.RLock()
        # True once the done-callbacks have been run.
        self._finalized = False
        self._finalize_lock = threading.Lock()

        if throttle_reopen:
            self._reopen_throttle = _Throttle(
                access_limit=5, time_window=datetime.timedelta(seconds=10)
            )
        else:
            self._reopen_throttle = None

    def _finalize(self, result):
        """Run the done-callbacks at most once and mark the stream finalized.

        Args:
            result: The value passed to every callback: the gRPC future or
                exception that ended the stream, or ``None`` when finalizing
                from :meth:`close`.
        """
        with self._finalize_lock:
            if self._finalized:
                return

            try:
                for callback in self._callbacks:
                    callback(result)
            finally:
                # Mark as finalized even if a callback raised; otherwise a
                # later call would run the callbacks a second time and
                # ``is_active`` would keep reporting a live stream.
                self._finalized = True

    def _on_call_done(self, future):
        # Unlike the base class, we only execute the callbacks on a terminal
        # error, not for errors that we can recover from. Note that grpc's
        # "future" here is also a grpc.RpcError.
        with self._operational_lock:
            if self._should_terminate(future):
                self._finalize(future)
            elif not self._should_recover(future):
                self._finalize(future)
            else:
                _LOGGER.debug("Re-opening stream from gRPC callback.")
                self._reopen()

    def _reopen(self):
        """Re-establish the stream unless another thread already has.

        Raises:
            Exception: Whatever :meth:`open` raised. The stream is finalized
                with that exception before it is re-raised.
        """
        with self._operational_lock:
            # Another thread already managed to re-open this stream.
            if self.call is not None and self.call.is_active():
                _LOGGER.debug("Stream was already re-established.")
                return

            self.call = None
            # Request generator should exit cleanly since the RPC its bound to
            # has exited.
            self._request_generator = None

            # Note: we do not currently do any sort of backoff here. The
            # assumption is that re-establishing the stream under normal
            # circumstances will happen in intervals greater than 60s.
            # However, it is possible in a degenerative case that the server
            # closes the stream rapidly which would lead to thrashing here,
            # but hopefully in those cases the server would return a non-
            # retryable error.

            try:
                if self._reopen_throttle:
                    with self._reopen_throttle:
                        self.open()
                else:
                    self.open()
            # If re-opening or re-calling the method fails for any reason,
            # consider it a terminal error and finalize the stream.
            except Exception as exc:
                _LOGGER.debug("Failed to re-open stream due to %s", exc)
                self._finalize(exc)
                raise

            _LOGGER.info("Re-established stream")

    def _recoverable(self, method, *args, **kwargs):
        """Wraps a method to recover the stream and retry on error.

        If a retryable error occurs while making the call, then the stream will
        be re-opened and the method will be retried. This happens indefinitely
        so long as the error is a retryable one. If an error occurs while
        re-opening the stream, then this method will raise immediately and
        trigger finalization of this object.

        Args:
            method (Callable[..., Any]): The method to call.
            args: The args to pass to the method.
            kwargs: The kwargs to pass to the method.

        Returns:
            Any: The value returned by ``method``, or ``None`` if the error
            satisfied ``should_terminate`` and the stream was shut down.

        Raises:
            Exception: The error raised by ``method`` when it is neither
                terminal nor recoverable, or the error raised while
                re-opening the stream.
        """
        while True:
            try:
                return method(*args, **kwargs)

            except Exception as exc:
                with self._operational_lock:
                    _LOGGER.debug("Call to retryable %r caused %s.", method, exc)

                    if self._should_terminate(exc):
                        _LOGGER.debug("Terminating %r due to %s.", method, exc)
                        # Finalize *before* closing: close() finalizes with
                        # ``None``, which would otherwise win and hide the
                        # real error from the done-callbacks.
                        self._finalize(exc)
                        self.close()
                        break

                    if not self._should_recover(exc):
                        _LOGGER.debug("Not retrying %r due to %s.", method, exc)
                        # Same ordering as above so callbacks receive ``exc``.
                        self._finalize(exc)
                        self.close()
                        raise exc

                    _LOGGER.debug("Re-opening stream from retryable %r.", method)
                    self._reopen()

    def _send(self, request):
        # Grab a reference to the RPC call. Because another thread (notably
        # the gRPC error thread) can modify self.call (by invoking reopen),
        # we should ensure our reference can not change underneath us.
        # If self.call is modified (such as replaced with a new RPC call) then
        # this will use the "old" RPC, which should result in the same
        # exception passed into gRPC's error handler being raised here, which
        # will be handled by the usual error handling in retryable.
        with self._operational_lock:
            call = self.call

        if call is None:
            raise ValueError("Can not send() on an RPC that has never been open()ed.")

        # Check the raw call rather than the ``is_active`` property, which
        # this class overloads to mean something semantically different.
        if call.is_active():
            self._request_queue.put(request)
        else:
            # calling next should cause the call to raise.
            next(call)

    def send(self, request):
        """Queue a message to be sent, recovering the stream on retryable errors.

        Args:
            request (protobuf.Message): The request to send.
        """
        return self._recoverable(self._send, request)

    def _recv(self):
        # Snapshot the call under the lock for the same reason as in _send().
        with self._operational_lock:
            call = self.call

        if call is None:
            raise ValueError("Can not recv() on an RPC that has never been open()ed.")

        return next(call)

    def recv(self):
        """Wait for a message, recovering the stream on retryable errors.

        Returns:
            protobuf.Message: The received message, or ``None`` if the stream
            was terminated because the error satisfied ``should_terminate``.
        """
        return self._recoverable(self._recv)

    def close(self):
        """Finalize the stream (running the done-callbacks) and close it."""
        self._finalize(None)
        super(ResumableBidiRpc, self).close()

    @property
    def is_active(self):
        """bool: True if this stream is currently open and active."""
        # Use the operational lock. It's entirely possible for something
        # to check the active state *while* the RPC is being retried.
        # Also, use finalized to track the actual terminal state here.
        # This is because if the stream is re-established by the gRPC thread
        # it's technically possible to check this between when gRPC marks the
        # RPC as inactive and when gRPC executes our callback that re-opens
        # the stream.
        with self._operational_lock:
            return self.call is not None and not self._finalized

587 

588 

class BackgroundConsumer(object):
    """Consumes a bi-directional stream on a separate thread.

    Stream consumption is mapped onto a callback-based model, and
    :func:`pause` / :func:`resume` are provided for flow-control.

    Example::

        def should_recover(exc):
            return (
                isinstance(exc, grpc.RpcError) and
                exc.code() == grpc.StatusCode.UNAVAILABLE)

        initial_request = example_pb2.StreamingRpcRequest(
            setting='example')

        rpc = ResumableBidiRpc(
            stub.StreamingRpc,
            initial_request=initial_request,
            should_recover=should_recover)

        def on_response(response):
            print(response)

        consumer = BackgroundConsumer(rpc, on_response)
        consumer.start()

    Note that error handling *must* be done by using the provided
    ``bidi_rpc``'s ``add_done_callback``. This helper will automatically exit
    whenever the RPC itself exits and will not provide any error details.

    Args:
        bidi_rpc (BidiRpc): The RPC to consume. Should not have been
            ``open()``ed yet.
        on_response (Callable[[protobuf.Message], None]): The callback to
            be called for every response on the stream.
    """

    def __init__(self, bidi_rpc, on_response):
        self._bidi_rpc = bidi_rpc
        self._on_response = on_response
        self._paused = False
        # Guards ``_paused`` and wakes the worker thread on resume().
        self._wake = threading.Condition()
        self._thread = None
        # Serializes start() and stop().
        self._operational_lock = threading.Lock()

    def _on_call_done(self, future):
        # The RPC has terminated: un-pause so that a paused worker thread
        # does not stay blocked forever.
        self.resume()

    def _block_while_paused(self):
        # The pause flag must not change at all during this section. If a
        # resume could slip in between checking the flag and calling
        # wake.wait(), the notification would be missed and the thread would
        # wait for a wake-up that never comes. Holding the condition's lock
        # throughout avoids that.
        # In the future, we could use `Condition.wait_for` if we drop
        # Python 2.7.
        # See: https://github.com/googleapis/python-api-core/issues/211
        with self._wake:
            while self._paused:
                _LOGGER.debug("paused, waiting for waking.")
                self._wake.wait()
                _LOGGER.debug("woken.")

    def _thread_main(self, ready):
        """Worker-thread body: open the RPC and dispatch every response.

        Args:
            ready (threading.Event): Set as soon as this thread is running so
                that :meth:`start` can return.
        """
        rpc = self._bidi_rpc
        try:
            ready.set()
            rpc.add_done_callback(self._on_call_done)
            rpc.open()

            while rpc.is_active:
                self._block_while_paused()

                _LOGGER.debug("waiting for recv.")
                message = rpc.recv()
                _LOGGER.debug("recved response.")
                self._on_response(message)

        except exceptions.GoogleAPICallError as exc:
            _LOGGER.debug(
                "%s caught error %s and will exit. Generally this is due to "
                "the RPC itself being cancelled and the error will be "
                "surfaced to the calling code.",
                _BIDIRECTIONAL_CONSUMER_NAME,
                exc,
                exc_info=True,
            )

        except Exception as exc:
            _LOGGER.exception(
                "%s caught unexpected exception %s and will exit.",
                _BIDIRECTIONAL_CONSUMER_NAME,
                exc,
            )

        _LOGGER.info("%s exiting", _BIDIRECTIONAL_CONSUMER_NAME)

    def start(self):
        """Start the background thread and begin consuming the stream."""
        with self._operational_lock:
            started = threading.Event()
            worker = threading.Thread(
                name=_BIDIRECTIONAL_CONSUMER_NAME,
                target=self._thread_main,
                args=(started,),
                daemon=True,
            )
            worker.start()
            # Other parts of the code rely on `thread.is_alive`, which only
            # says the thread may soon be active, not that it is. Waiting on
            # the event set by the thread itself further protects against
            # the resulting races.
            started.wait()
            self._thread = worker
            _LOGGER.debug("Started helper thread %s", worker.name)

    def stop(self):
        """Stop consuming the stream and shutdown the background thread."""
        with self._operational_lock:
            self._bidi_rpc.close()

            worker = self._thread
            if worker is not None:
                # Wake the thread up in case it is sleeping in a pause.
                self.resume()
                # The daemonized thread may itself block, so give it at most
                # one second to exit.
                worker.join(1.0)
                if worker.is_alive():  # pragma: NO COVER
                    _LOGGER.warning("Background thread did not exit.")

            self._thread = None

    @property
    def is_active(self):
        """bool: True if the background thread is active."""
        worker = self._thread
        if worker is None:
            return False
        return worker.is_alive()

    def pause(self):
        """Pauses the response stream.

        This does *not* pause the request stream.
        """
        with self._wake:
            self._paused = True

    def resume(self):
        """Resumes the response stream."""
        with self._wake:
            self._paused = False
            self._wake.notify_all()

    @property
    def is_paused(self):
        """bool: True if the response stream is paused."""
        return self._paused