Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/google/api_core/bidi.py: 21%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

274 statements  

1# Copyright 2017, Google LLC 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# https://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15"""Bi-directional streaming RPC helpers.""" 

16 

17import collections 

18import datetime 

19import logging 

20import queue as queue_module 

21import threading 

22import time 

23 

24from google.api_core import exceptions 

25 

# Module-level logger used by the streaming helpers defined in this module.
_LOGGER = logging.getLogger(__name__)
# Name assigned to the thread started by BackgroundConsumer.start(); it is also
# interpolated into that thread's log messages.
_BIDIRECTIONAL_CONSUMER_NAME = "Thread-ConsumeBidirectionalStream"

28 

29 

class _RequestQueueGenerator(object):
    """Feed a gRPC request stream from a queue without blocking forever.

    gRPC expects an iterator of requests for request-streaming (and
    bidirectional) RPCs and drains that iterator on a separate thread. If the
    iterator blocks indefinitely inside ``next()``, gRPC can never notice that
    the channel went away and the thread is leaked. This helper avoids that by
    polling the queue with a timeout and checking whether the RPC is still
    active before it yields anything.

    It also makes retries possible without swapping queues: when an item is
    pulled while the RPC is already inactive, the item is put straight back on
    the queue and the generator exits (yielding it would make gRPC discard
    it). As a consequence message order is not guaranteed; use a priority
    queue if ordering matters.

    Example::

        requests = _RequestQueueGenerator(q)
        call = stub.StreamingRequest(iter(requests))
        requests.call = call

        for response in call:
            print(response)
            q.put(...)

    The same behaviour could be achieved without "spinning" on a queue
    timeout, for instance by multiplexing the gRPC end event with the queue on
    extra threads, or with selectors and a custom event/queue object. Both are
    considerably more engineering for little gain - the CPU spent spinning is
    minuscule.

    Args:
        queue (queue_module.Queue): The request queue.
        period (float): The number of seconds to wait for an item before
            re-checking whether the RPC was cancelled. In practice this bounds
            how long the request consumption thread outlives a cancelled RPC.
        initial_request (Union[protobuf.Message,
                Callable[None, protobuf.Message]]): The initial request to
            yield. It is yielded independently of the request queue to allow
            for easily restarting streams that need an initial configuration
            request.
    """

    def __init__(self, queue, period=1, initial_request=None):
        self._queue = queue
        self._period = period
        self._initial_request = initial_request
        # Assigned by the owner once the RPC has been started.
        self.call = None

    def _is_active(self):
        # Iteration may begin *before* the ``call`` attribute has been set,
        # so a missing call has to count as "active".
        # See https://github.com/googleapis/python-api-core/issues/560.
        call = self.call
        if call is None:
            return True
        return call.is_active()

    def __iter__(self):
        first = self._initial_request
        if first is not None:
            # The initial request may be a ready message or a factory for one.
            yield first() if callable(first) else first

        while True:
            try:
                item = self._queue.get(timeout=self._period)
            except queue_module.Empty:
                if self._is_active():
                    # Call is still alive; go back to waiting for an item.
                    continue
                _LOGGER.debug(
                    "Empty queue and inactive call, exiting request " "generator."
                )
                return

            # An explicit ``None`` from the consumer means "end the requests".
            if item is None:
                _LOGGER.debug("Cleanly exiting request generator.")
                return

            if self._is_active():
                yield item
                continue

            # We hold an item but the call is closed: return the item to the
            # queue so that the next call can pick it up, then stop.
            self._queue.put(item)
            _LOGGER.debug(
                "Inactive call, replacing item on queue and exiting "
                "request generator."
            )
            return

135 

136 

class _Throttle(object):
    """A context manager limiting the total entries in a sliding time window.

    If more than ``access_limit`` attempts are made to enter the context manager
    instance in the last ``time_window`` interval, the exceeding requests block
    until enough time elapses.

    The context manager instances are thread-safe and can be shared between
    multiple threads. If multiple requests are blocked and waiting to enter,
    the exact order in which they are allowed to proceed is not determined.

    Elapsed time is measured with ``time.monotonic()`` so that adjustments of
    the system (wall) clock cannot stretch or shrink the window.

    Example::

        max_three_per_second = _Throttle(
            access_limit=3, time_window=datetime.timedelta(seconds=1)
        )

        for i in range(5):
            with max_three_per_second as time_waited:
                print("{}: Waited {} seconds to enter".format(i, time_waited))

    Args:
        access_limit (int): the maximum number of entries allowed in the time window
        time_window (datetime.timedelta): the width of the sliding time window

    Raises:
        ValueError: If ``access_limit`` is smaller than 1 or ``time_window``
            is not a positive timedelta.
    """

    def __init__(self, access_limit, time_window):
        if access_limit < 1:
            raise ValueError("access_limit argument must be positive")

        if time_window <= datetime.timedelta(0):
            raise ValueError("time_window argument must be a positive timedelta")

        self._time_window = time_window
        self._access_limit = access_limit
        # Monotonic timestamps (in seconds) of the most recent entries, least
        # recent first. ``maxlen`` makes an append on a full deque discard the
        # oldest timestamp automatically.
        self._past_entries = collections.deque(maxlen=access_limit)
        self._entry_lock = threading.Lock()

    def __enter__(self):
        """Enter the throttled section, blocking if the limit is exhausted.

        Returns:
            float: The number of seconds spent waiting (``0.0`` if none).
        """
        window_seconds = self._time_window.total_seconds()

        with self._entry_lock:
            cutoff_time = time.monotonic() - window_seconds

            # drop the entries that are too old, as they are no longer relevant
            while self._past_entries and self._past_entries[0] < cutoff_time:
                self._past_entries.popleft()

            if len(self._past_entries) < self._access_limit:
                self._past_entries.append(time.monotonic())
                return 0.0  # no waiting was needed

            # The oldest remaining entry is not older than the cutoff (the
            # loop above removed everything that was), so this is never
            # negative. The sleep deliberately happens while holding the lock
            # so that other waiters queue up behind this one.
            to_wait = self._past_entries[0] - cutoff_time
            time.sleep(to_wait)

            self._past_entries.append(time.monotonic())
            return to_wait

    def __exit__(self, *_):
        # Nothing to release: only the moment of entry is rate limited.
        pass

    def __repr__(self):
        return "{}(access_limit={}, time_window={})".format(
            self.__class__.__name__, self._access_limit, repr(self._time_window)
        )

202 

203 

class BidiRpc(object):
    """Socket-like wrapper around a bi-directional streaming RPC.

    gRPC's native interface pairs a request iterator with a response
    iterator. This class exposes the same stream through :func:`send` and
    :func:`recv`, which is a better fit for long-running or asymmetric
    streams (streams whose requests and responses are not directly
    correlated).

    Example::

        initial_request = example_pb2.StreamingRpcRequest(
            setting='example')
        rpc = BidiRpc(
            stub.StreamingRpc,
            initial_request=initial_request,
            metadata=[('name', 'value')]
        )

        rpc.open()

        while rpc.is_active:
            print(rpc.recv())
            rpc.send(example_pb2.StreamingRpcRequest(
                data='example'))

    This does *not* retry the stream on errors. See :class:`ResumableBidiRpc`.

    Args:
        start_rpc (grpc.StreamStreamMultiCallable): The gRPC method used to
            start the RPC.
        initial_request (Union[protobuf.Message,
                Callable[None, protobuf.Message]]): The initial request to
            yield. This is useful if an initial request is needed to start the
            stream.
        metadata (Sequence[Tuple(str, str)]): RPC metadata to include in
            the request.
    """

    def __init__(self, start_rpc, initial_request=None, metadata=None):
        self._start_rpc = start_rpc
        self._initial_request = initial_request
        self._rpc_metadata = metadata
        self._request_queue = queue_module.Queue()
        self._request_generator = None
        self._is_active = False
        self._callbacks = []
        # The underlying gRPC call; stays None until open() succeeds.
        self.call = None

    def add_done_callback(self, callback):
        """Register a callback to run once the RPC terminates.

        Termination covers both an RPC error and a successful finish.

        Args:
            callback (Callable[[grpc.Future], None]): The callback to execute.
                It will be provided with the same gRPC future as the underlying
                stream which will also be a :class:`grpc.Call`.
        """
        self._callbacks.append(callback)

    def _on_call_done(self, future):
        # Invoked when the RPC errors or terminates successfully.
        # Note that grpc's "future" here can also be a grpc.RpcError.
        # See note in https://github.com/grpc/grpc/issues/10885#issuecomment-302651331
        # that `grpc.RpcError` is also `grpc.call`.
        for registered in self._callbacks:
            registered(future)

    def open(self):
        """Opens the stream.

        Raises:
            ValueError: If the stream is already open.
        """
        if self.is_active:
            raise ValueError("Can not open an already open stream.")

        requests = _RequestQueueGenerator(
            self._request_queue, initial_request=self._initial_request
        )
        try:
            call = self._start_rpc(iter(requests), metadata=self._rpc_metadata)
        except exceptions.GoogleAPICallError as exc:
            # The original `grpc.RpcError` (which is usually also a `grpc.Call`) is
            # available from the ``response`` property on the mapped exception.
            self._on_call_done(exc.response)
            raise

        requests.call = call

        # TODO: api_core should expose the future interface for wrapped
        # callables as well.
        if hasattr(call, "_wrapped"):  # pragma: NO COVER
            future = call._wrapped
        else:
            future = call
        future.add_done_callback(self._on_call_done)

        self._request_generator = requests
        self.call = call

    def close(self):
        """Closes the stream. Does nothing if it was never opened."""
        if self.call is None:
            return

        # ``None`` on the queue tells the request generator to exit.
        self._request_queue.put(None)
        self.call.cancel()
        self._request_generator = None
        self._initial_request = None
        self._callbacks = []
        # self.call is intentionally kept (not reset to None) so that
        # send/recv can still raise the underlying error.

    def send(self, request):
        """Queue a message to be sent on the stream.

        Send is non-blocking.

        If the underlying RPC has been closed, this will raise.

        Args:
            request (protobuf.Message): The request to send.

        Raises:
            ValueError: If the RPC has never been opened.
        """
        if self.call is None:
            raise ValueError("Can not send() on an RPC that has never been open()ed.")

        # Don't use self.is_active here: ResumableBidiRpc overloads it to
        # mean something semantically different.
        if not self.call.is_active():
            # calling next should cause the call to raise.
            next(self.call)
            return

        self._request_queue.put(request)

    def recv(self):
        """Wait for a message to be returned from the stream.

        Recv is blocking.

        If the underlying RPC has been closed, this will raise.

        Returns:
            protobuf.Message: The received message.

        Raises:
            ValueError: If the RPC has never been opened.
        """
        if self.call is None:
            raise ValueError("Can not recv() on an RPC that has never been open()ed.")

        return next(self.call)

    @property
    def is_active(self):
        """bool: True if this stream is currently open and active."""
        if self.call is None:
            return False
        return self.call.is_active()

    @property
    def pending_requests(self):
        """int: Returns an estimate of the number of queued requests."""
        return self._request_queue.qsize()

359 

360 

def _never_terminate(future_or_error):
    """Default ``should_terminate`` predicate: no error ends the BiDi stream.

    Args:
        future_or_error: The gRPC future or exception being inspected. It is
            ignored.

    Returns:
        bool: Always ``False``.
    """
    return False

364 

365 

class ResumableBidiRpc(BidiRpc):
    """A :class:`BidiRpc` that can automatically resume the stream on errors.

    It uses the ``should_recover`` arg to determine if it should re-establish
    the stream on error.

    Example::

        def should_recover(exc):
            return (
                isinstance(exc, grpc.RpcError) and
                exc.code() == grpc.StatusCode.UNAVAILABLE)

        initial_request = example_pb2.StreamingRpcRequest(
            setting='example')

        metadata = [('header_name', 'value')]

        rpc = ResumableBidiRpc(
            stub.StreamingRpc,
            should_recover=should_recover,
            initial_request=initial_request,
            metadata=metadata
        )

        rpc.open()

        while rpc.is_active:
            print(rpc.recv())
            rpc.send(example_pb2.StreamingRpcRequest(
                data='example'))

    Args:
        start_rpc (grpc.StreamStreamMultiCallable): The gRPC method used to
            start the RPC.
        initial_request (Union[protobuf.Message,
                Callable[None, protobuf.Message]]): The initial request to
            yield. This is useful if an initial request is needed to start the
            stream.
        should_recover (Callable[[Exception], bool]): A function that returns
            True if the stream should be recovered. This will be called
            whenever an error is encountered on the stream.
        should_terminate (Callable[[Exception], bool]): A function that returns
            True if the stream should be terminated. This will be called
            whenever an error is encountered on the stream. It is consulted
            before ``should_recover``.
        metadata (Sequence[Tuple(str, str)]): RPC metadata to include in
            the request.
        throttle_reopen (bool): If ``True``, throttling will be applied to
            stream reopen calls. Defaults to ``False``.
    """

    def __init__(
        self,
        start_rpc,
        should_recover,
        should_terminate=_never_terminate,
        initial_request=None,
        metadata=None,
        throttle_reopen=False,
    ):
        super(ResumableBidiRpc, self).__init__(start_rpc, initial_request, metadata)
        self._should_recover = should_recover
        self._should_terminate = should_terminate
        # Re-entrant because _recoverable() and _on_call_done() hold it while
        # calling _reopen(), which acquires it again.
        self._operational_lock = threading.RLock()
        # Set once the done-callbacks have been run; see _finalize().
        self._finalized = False
        self._finalize_lock = threading.Lock()

        if throttle_reopen:
            # At most 5 re-open attempts per sliding 10 second window.
            self._reopen_throttle = _Throttle(
                access_limit=5, time_window=datetime.timedelta(seconds=10)
            )
        else:
            self._reopen_throttle = None

    def _finalize(self, result):
        """Run the registered done-callbacks exactly once.

        Only the first invocation has any effect; later calls return without
        invoking anything.

        Args:
            result: The value handed to every callback (callers in this class
                pass a gRPC future, an exception, or ``None``).
        """
        with self._finalize_lock:
            if self._finalized:
                return

            for callback in self._callbacks:
                callback(result)

            self._finalized = True

    def _on_call_done(self, future):
        # Unlike the base class, we only execute the callbacks on a terminal
        # error, not for errors that we can recover from. Note that grpc's
        # "future" here is also a grpc.RpcError.
        with self._operational_lock:
            if self._should_terminate(future):
                self._finalize(future)
            elif not self._should_recover(future):
                self._finalize(future)
            else:
                _LOGGER.debug("Re-opening stream from gRPC callback.")
                self._reopen()

    def _reopen(self):
        """Re-establish the stream unless it is already active again.

        Raises:
            Exception: Whatever ``open()`` raised; the stream is finalized
                with that exception before it is re-raised.
        """
        with self._operational_lock:
            # Another thread already managed to re-open this stream.
            if self.call is not None and self.call.is_active():
                _LOGGER.debug("Stream was already re-established.")
                return

            self.call = None
            # Request generator should exit cleanly since the RPC its bound to
            # has exited.
            self._request_generator = None

            # Note: we do not currently do any sort of backoff here. The
            # assumption is that re-establishing the stream under normal
            # circumstances will happen in intervals greater than 60s.
            # However, it is possible in a degenerative case that the server
            # closes the stream rapidly which would lead to thrashing here,
            # but hopefully in those cases the server would return a non-
            # retryable error.

            try:
                if self._reopen_throttle:
                    with self._reopen_throttle:
                        self.open()
                else:
                    self.open()
            # If re-opening or re-calling the method fails for any reason,
            # consider it a terminal error and finalize the stream.
            except Exception as exc:
                _LOGGER.debug("Failed to re-open stream due to %s", exc)
                self._finalize(exc)
                raise

            _LOGGER.info("Re-established stream")

    def _recoverable(self, method, *args, **kwargs):
        """Wraps a method to recover the stream and retry on error.

        If a retryable error occurs while making the call, then the stream will
        be re-opened and the method will be retried. This happens indefinitely
        so long as the error is a retryable one. If an error occurs while
        re-opening the stream, then this method will raise immediately and
        trigger finalization of this object.

        If ``should_terminate`` accepts the error, the stream is closed and
        this method returns ``None`` instead of raising.

        Args:
            method (Callable[..., Any]): The method to call.
            args: The args to pass to the method.
            kwargs: The kwargs to pass to the method.

        Returns:
            Any: The return value of ``method``, or ``None`` when the stream
            was terminated.

        Raises:
            Exception: The error raised by ``method`` when it is neither
                terminal nor recoverable.
        """
        while True:
            try:
                return method(*args, **kwargs)

            except Exception as exc:
                with self._operational_lock:
                    _LOGGER.debug("Call to retryable %r caused %s.", method, exc)

                    # NOTE(review): close() below runs _finalize(None) first and
                    # the base close() then empties the callback list (when a
                    # call exists), so the _finalize(exc) calls that follow
                    # close() look like no-ops and callbacks appear to receive
                    # None rather than exc. Confirm whether that is intended.
                    if self._should_terminate(exc):
                        self.close()
                        _LOGGER.debug("Terminating %r due to %s.", method, exc)
                        self._finalize(exc)
                        break

                    if not self._should_recover(exc):
                        self.close()
                        _LOGGER.debug("Not retrying %r due to %s.", method, exc)
                        self._finalize(exc)
                        raise exc

                    _LOGGER.debug("Re-opening stream from retryable %r.", method)
                    self._reopen()

    def _send(self, request):
        # Grab a reference to the RPC call. Because another thread (notably
        # the gRPC error thread) can modify self.call (by invoking reopen),
        # we should ensure our reference can not change underneath us.
        # If self.call is modified (such as replaced with a new RPC call) then
        # this will use the "old" RPC, which should result in the same
        # exception passed into gRPC's error handler being raised here, which
        # will be handled by the usual error handling in retryable.
        with self._operational_lock:
            call = self.call

        if call is None:
            raise ValueError("Can not send() on an RPC that has never been open()ed.")

        # Don't use self.is_active, as ResumableBidiRpc overloads it
        # to mean something semantically different.
        if call.is_active():
            self._request_queue.put(request)
            # NOTE(review): this ``pass`` has no effect and could be removed.
            pass
        else:
            # calling next should cause the call to raise.
            next(call)

    def send(self, request):
        """Queue a request, re-opening the stream on recoverable errors.

        Args:
            request (protobuf.Message): The request to send.
        """
        return self._recoverable(self._send, request)

    def _recv(self):
        # Snapshot self.call under the lock for the same reason as in _send():
        # another thread may replace it while we are blocked in next().
        with self._operational_lock:
            call = self.call

        if call is None:
            raise ValueError("Can not recv() on an RPC that has never been open()ed.")

        return next(call)

    def recv(self):
        """Wait for a response, re-opening the stream on recoverable errors.

        Returns:
            protobuf.Message: The received message (``None`` if the stream was
            terminated via ``should_terminate``).
        """
        return self._recoverable(self._recv)

    def close(self):
        """Finalize the stream (callbacks receive ``None``) and close it."""
        self._finalize(None)
        super(ResumableBidiRpc, self).close()

    @property
    def is_active(self):
        """bool: True if this stream is currently open and active."""
        # Use the operational lock. It's entirely possible for something
        # to check the active state *while* the RPC is being retried.
        # Also, use finalized to track the actual terminal state here.
        # This is because if the stream is re-established by the gRPC thread
        # it's technically possible to check this between when gRPC marks the
        # RPC as inactive and when gRPC executes our callback that re-opens
        # the stream.
        with self._operational_lock:
            return self.call is not None and not self._finalized

589 

590 

class BackgroundConsumer(object):
    """A bi-directional stream consumer that runs in a separate thread.

    This maps the consumption of a stream into a callback-based model. It also
    provides :func:`pause` and :func:`resume` to allow for flow-control.

    Example::

        def should_recover(exc):
            return (
                isinstance(exc, grpc.RpcError) and
                exc.code() == grpc.StatusCode.UNAVAILABLE)

        initial_request = example_pb2.StreamingRpcRequest(
            setting='example')

        rpc = ResumableBidiRpc(
            stub.StreamingRpc,
            initial_request=initial_request,
            should_recover=should_recover)

        def on_response(response):
            print(response)

        consumer = BackgroundConsumer(rpc, on_response)
        consumer.start()

    Note that error handling *must* be done by using the provided
    ``bidi_rpc``'s ``add_done_callback``. This helper will automatically exit
    whenever the RPC itself exits and will not provide any error details.

    Args:
        bidi_rpc (BidiRpc): The RPC to consume. Should not have been
            ``open()``ed yet.
        on_response (Callable[[protobuf.Message], None]): The callback to
            be called for every response on the stream.
        on_fatal_exception (Callable[[Exception], None]): The callback to
            be called on fatal errors during consumption. Default None.
    """

    def __init__(self, bidi_rpc, on_response, on_fatal_exception=None):
        self._bidi_rpc = bidi_rpc
        self._on_response = on_response
        self._paused = False
        self._on_fatal_exception = on_fatal_exception
        # Guards ``_paused`` and wakes the consumer thread on resume().
        self._wake = threading.Condition()
        self._thread = None
        # Serializes start() and stop().
        self._operational_lock = threading.Lock()

    def _on_call_done(self, future):
        # Resume the thread if it's paused, this prevents blocking forever
        # when the RPC has terminated.
        self.resume()

    def _thread_main(self, ready):
        """Body of the consumer thread: open the RPC and dispatch responses.

        Args:
            ready (threading.Event): Set as soon as the thread is running so
                that start() can return.
        """
        try:
            ready.set()
            self._bidi_rpc.add_done_callback(self._on_call_done)
            self._bidi_rpc.open()

            while self._bidi_rpc.is_active:
                # Do not allow the paused status to change at all during this
                # section. There is a condition where we could be resumed
                # between checking if we are paused and calling wake.wait(),
                # which means that we will miss the notification to wake up
                # (oops!) and wait for a notification that will never come.
                # Keeping the lock throughout avoids that.
                # In the future, we could use `Condition.wait_for` if we drop
                # Python 2.7.
                # See: https://github.com/googleapis/python-api-core/issues/211
                with self._wake:
                    while self._paused:
                        _LOGGER.debug("paused, waiting for waking.")
                        self._wake.wait()
                        _LOGGER.debug("woken.")

                _LOGGER.debug("waiting for recv.")
                response = self._bidi_rpc.recv()
                _LOGGER.debug("recved response.")
                # stop() may reset the callback to None from another thread,
                # so read the attribute once and use that snapshot for both
                # the check and the call.
                on_response = self._on_response
                if on_response is not None:
                    on_response(response)

        except exceptions.GoogleAPICallError as exc:
            _LOGGER.debug(
                "%s caught error %s and will exit. Generally this is due to "
                "the RPC itself being cancelled and the error will be "
                "surfaced to the calling code.",
                _BIDIRECTIONAL_CONSUMER_NAME,
                exc,
                exc_info=True,
            )
            # Snapshot for the same reason as ``on_response`` above.
            on_fatal_exception = self._on_fatal_exception
            if on_fatal_exception is not None:
                on_fatal_exception(exc)

        except Exception as exc:
            _LOGGER.exception(
                "%s caught unexpected exception %s and will exit.",
                _BIDIRECTIONAL_CONSUMER_NAME,
                exc,
            )
            on_fatal_exception = self._on_fatal_exception
            if on_fatal_exception is not None:
                on_fatal_exception(exc)

        _LOGGER.info("%s exiting", _BIDIRECTIONAL_CONSUMER_NAME)

    def start(self):
        """Start the background thread and begin consuming the stream."""
        with self._operational_lock:
            ready = threading.Event()
            thread = threading.Thread(
                name=_BIDIRECTIONAL_CONSUMER_NAME,
                target=self._thread_main,
                args=(ready,),
                daemon=True,
            )
            thread.start()
            # Other parts of the code rely on `thread.is_alive` which
            # isn't sufficient to know if a thread is active, just that it may
            # soon be active. This can cause races. Further protect
            # against races by using a ready event and wait on it to be set.
            ready.wait()
            self._thread = thread
            _LOGGER.debug("Started helper thread %s", thread.name)

    def stop(self):
        """Stop consuming the stream and shutdown the background thread.

        NOTE: Cannot be called within `_thread_main`, since it is not
        possible to join a thread to itself.
        """
        with self._operational_lock:
            self._bidi_rpc.close()

            if self._thread is not None:
                # Resume the thread to wake it up in case it is sleeping.
                self.resume()
                # The daemonized thread may itself block, so don't wait
                # for it longer than a second.
                self._thread.join(1.0)
                if self._thread.is_alive():  # pragma: NO COVER
                    _LOGGER.warning("Background thread did not exit.")

            self._thread = None
            self._on_response = None
            self._on_fatal_exception = None

    @property
    def is_active(self):
        """bool: True if the background thread is active."""
        # Read the attribute once: stop() may set it to None concurrently.
        thread = self._thread
        return thread is not None and thread.is_alive()

    def pause(self):
        """Pauses the response stream.

        This does *not* pause the request stream.
        """
        with self._wake:
            self._paused = True

    def resume(self):
        """Resumes the response stream."""
        with self._wake:
            self._paused = False
            self._wake.notify_all()

    @property
    def is_paused(self):
        """bool: True if the response stream is paused."""
        return self._paused