Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/ipykernel/iostream.py: 21%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

392 statements  

1"""Wrappers for forwarding stdout/stderr over zmq""" 

2 

3# Copyright (c) IPython Development Team. 

4# Distributed under the terms of the Modified BSD License. 

5 

6import asyncio 

7import atexit 

8import contextvars 

9import io 

10import os 

11import sys 

12import threading 

13import traceback 

14import warnings 

15from binascii import b2a_hex 

16from collections import defaultdict, deque 

17from io import StringIO, TextIOBase 

18from threading import local 

19from typing import Any, Callable, Optional 

20 

21import zmq 

22from jupyter_client.session import extract_header 

23from tornado.ioloop import IOLoop 

24from zmq.eventloop.zmqstream import ZMQStream 

25 

26# ----------------------------------------------------------------------------- 

27# Globals 

28# ----------------------------------------------------------------------------- 

29 

# Process roles returned by IOPubThread._check_mp_mode():
# MASTER sends on the real socket, CHILD pipes its messages to the master.
MASTER, CHILD = 0, 1

# Maximum number of bytes requested per os.read() on the watched stream pipe.
PIPE_BUFFER_SIZE = 1000

34 

35# ----------------------------------------------------------------------------- 

36# IO classes 

37# ----------------------------------------------------------------------------- 

38 

39 

class IOPubThread:
    """An object for sending IOPub messages in a background thread

    Prevents a blocking main thread from delaying output from threads.

    IOPubThread(pub_socket).background_socket is a Socket-API-providing object
    whose IO is always run in a thread.
    """

    def __init__(self, socket, pipe=False):
        """Create IOPub thread

        Parameters
        ----------
        socket : zmq.PUB Socket
            the socket on which messages will be sent.
        pipe : bool
            Whether this process should listen for IOPub messages
            piped from subprocesses.
        """
        self.socket = socket
        self._stopped = False
        self.background_socket = BackgroundSocket(self)
        self._master_pid = os.getpid()
        self._pipe_flag = pipe
        self.io_loop = IOLoop(make_current=False)
        if pipe:
            self._setup_pipe_in()
        self._local = threading.local()
        # callables queued by schedule(), drained by _handle_event in the IO thread
        self._events: deque[Callable[..., Any]] = deque()
        # one PUSH socket per calling thread, tracked so they can be closed explicitly
        self._event_pipes: dict[threading.Thread, Any] = {}
        self._event_pipe_gc_lock: threading.Lock = threading.Lock()
        self._event_pipe_gc_seconds: float = 10
        self._event_pipe_gc_task: Optional[asyncio.Task[Any]] = None
        self._setup_event_pipe()
        self.thread = threading.Thread(target=self._thread_main, name="IOPub")
        self.thread.daemon = True
        self.thread.pydev_do_not_trace = True  # type:ignore[attr-defined]
        self.thread.is_pydev_daemon_thread = True  # type:ignore[attr-defined]
        self.thread.name = "IOPub"

    def _thread_main(self):
        """The inner loop that's actually run in a thread"""

        def _start_event_gc():
            self._event_pipe_gc_task = asyncio.ensure_future(self._run_event_pipe_gc())

        self.io_loop.run_sync(_start_event_gc)

        if not self._stopped:
            # avoid race if stop called before start thread gets here
            # probably only comes up in tests
            self.io_loop.start()

        if self._event_pipe_gc_task is not None:
            # cancel gc task to avoid pending task warnings
            async def _cancel():
                self._event_pipe_gc_task.cancel()  # type:ignore[union-attr]

            if not self._stopped:
                self.io_loop.run_sync(_cancel)
            else:
                self._event_pipe_gc_task.cancel()

        self.io_loop.close(all_fds=True)

    def _setup_event_pipe(self):
        """Create the PULL socket listening for events that should fire in this thread."""
        ctx = self.socket.context
        pipe_in = ctx.socket(zmq.PULL)
        pipe_in.linger = 0

        # random inproc endpoint name
        _uuid = b2a_hex(os.urandom(16)).decode("ascii")
        iface = self._event_interface = f"inproc://{_uuid}"
        pipe_in.bind(iface)
        self._event_puller = ZMQStream(pipe_in, self.io_loop)
        self._event_puller.on_recv(self._handle_event)

    async def _run_event_pipe_gc(self):
        """Task to run event pipe gc continuously"""
        while True:
            await asyncio.sleep(self._event_pipe_gc_seconds)
            try:
                await self._event_pipe_gc()
            except Exception as e:
                # report on the real stderr and keep collecting
                print(f"Exception in IOPubThread._event_pipe_gc: {e}", file=sys.__stderr__)

    async def _event_pipe_gc(self):
        """run a single garbage collection on event pipes"""
        if not self._event_pipes:
            # don't acquire the lock if there's nothing to do
            return
        with self._event_pipe_gc_lock:
            # iterate over a snapshot because entries are deleted while looping
            for thread, socket in list(self._event_pipes.items()):
                if not thread.is_alive():
                    socket.close()
                    del self._event_pipes[thread]

    @property
    def _event_pipe(self):
        """thread-local event pipe for signaling events that should be processed in the thread"""
        try:
            event_pipe = self._local.event_pipe
        except AttributeError:
            # new thread, new event pipe
            ctx = self.socket.context
            event_pipe = ctx.socket(zmq.PUSH)
            event_pipe.linger = 0
            event_pipe.connect(self._event_interface)
            self._local.event_pipe = event_pipe
            # associate event pipes to their threads
            # so they can be closed explicitly
            # implicit close on __del__ throws a ResourceWarning
            with self._event_pipe_gc_lock:
                self._event_pipes[threading.current_thread()] = event_pipe
        return event_pipe

    def _handle_event(self, msg):
        """Handle an event on the event pipe

        Content of the message is ignored.

        Whenever *an* event arrives on the event stream,
        *all* waiting events are processed in order.
        """
        # freeze event count so new writes don't extend the queue
        # while we are processing
        n_events = len(self._events)
        for _ in range(n_events):
            event_f = self._events.popleft()
            event_f()

    def _setup_pipe_in(self):
        """setup listening pipe for IOPub from forked subprocesses"""
        ctx = self.socket.context

        # use UUID to authenticate pipe messages
        self._pipe_uuid = os.urandom(16)

        pipe_in = ctx.socket(zmq.PULL)
        pipe_in.linger = 0

        try:
            self._pipe_port = pipe_in.bind_to_random_port("tcp://127.0.0.1")
        except zmq.ZMQError as e:
            warnings.warn(
                f"Couldn't bind IOPub Pipe to 127.0.0.1: {e}"
                "\nsubprocess output will be unavailable.",
                stacklevel=2,
            )
            # without a bound pipe, fall back to non-piped behavior
            self._pipe_flag = False
            pipe_in.close()
            return
        self._pipe_in = ZMQStream(pipe_in, self.io_loop)
        self._pipe_in.on_recv(self._handle_pipe_msg)

    def _handle_pipe_msg(self, msg):
        """handle a pipe message from a subprocess

        The first frame must equal the pipe UUID; the remaining frames are
        forwarded via send_multipart. Messages with any other first frame are
        reported on ``sys.__stderr__`` and dropped.
        """
        if not self._pipe_flag or not self._is_master_process():
            return
        if msg[0] != self._pipe_uuid:
            # print() does not interpolate %-style arguments, so format explicitly
            print(f"Bad pipe message: {msg}", file=sys.__stderr__)
            return
        self.send_multipart(msg[1:])

    def _setup_pipe_out(self):
        """Create a fresh context and PUSH socket connected to the master's pipe port.

        Returns
        -------
        (ctx, pipe_out) : the new zmq Context and the connected PUSH socket.
        """
        # must be new context after fork
        ctx = zmq.Context()
        pipe_out = ctx.socket(zmq.PUSH)
        pipe_out.linger = 3000  # 3s timeout for pipe_out sends before discarding the message
        pipe_out.connect("tcp://127.0.0.1:%i" % self._pipe_port)
        return ctx, pipe_out

    def _is_master_process(self):
        """Return True when running in the process that created this object."""
        return os.getpid() == self._master_pid

    def _check_mp_mode(self):
        """check for forks, and switch to zmq pipeline if necessary"""
        if not self._pipe_flag or self._is_master_process():
            return MASTER
        return CHILD

    def start(self):
        """Start the IOPub thread"""
        self.thread.name = "IOPub"
        self.thread.start()
        # make sure we don't prevent process exit
        # I'm not sure why setting daemon=True above isn't enough, but it doesn't appear to be.
        atexit.register(self.stop)

    def stop(self):
        """Stop the IOPub thread

        Raises
        ------
        TimeoutError
            if the thread is still alive 30 seconds after the stop request.
        """
        self._stopped = True
        if not self.thread.is_alive():
            return
        self.io_loop.add_callback(self.io_loop.stop)

        self.thread.join(timeout=30)
        if self.thread.is_alive():
            # avoid infinite hang if stop fails
            msg = "IOPub thread did not terminate in 30 seconds"
            raise TimeoutError(msg)
        # close *all* event pipes, created in any thread
        # event pipes can only be used from other threads while self.thread.is_alive()
        # so after thread.join, this should be safe
        # Snapshot under the lock so a thread registering a pipe at the same
        # moment cannot change the dict while it is being iterated.
        with self._event_pipe_gc_lock:
            event_pipes = list(self._event_pipes.values())
            self._event_pipes.clear()
        for event_pipe in event_pipes:
            event_pipe.close()

    def close(self):
        """Close the IOPub thread."""
        if self.closed:
            return
        self.socket.close()
        self.socket = None

    @property
    def closed(self):
        """True once close() has released the socket."""
        return self.socket is None

    def schedule(self, f):
        """Schedule a function to be called in our IO thread.

        If the thread is not running, call immediately.
        """
        if self.thread.is_alive():
            self._events.append(f)
            # wake event thread (message content is ignored)
            self._event_pipe.send(b"")
        else:
            f()

    def send_multipart(self, *args, **kwargs):
        """send_multipart schedules actual zmq send in my thread.

        If my thread isn't running (e.g. forked process), send immediately.
        """
        self.schedule(lambda: self._really_send(*args, **kwargs))

    def _really_send(self, msg, *args, **kwargs):
        """The callback that actually sends messages"""
        if self.closed:
            return

        mp_mode = self._check_mp_mode()

        if mp_mode != CHILD:
            # we are master, do a regular send
            self.socket.send_multipart(msg, *args, **kwargs)
        else:
            # we are a child, pipe to master
            # new context/socket for every pipe-out
            # since forks don't teardown politely, use ctx.term to ensure send has completed
            ctx, pipe_out = self._setup_pipe_out()
            try:
                pipe_out.send_multipart([self._pipe_uuid, *msg], *args, **kwargs)
            finally:
                # release the socket and context even if the send raises
                pipe_out.close()
                ctx.term()

296 

297 

class BackgroundSocket:
    """Wrapper around IOPub thread that provides zmq send[_multipart]"""

    # class-level default so attribute lookup never recurses into __getattr__
    io_thread = None

    def __init__(self, io_thread):
        """Initialize the socket.

        Parameters
        ----------
        io_thread : object
            the IO thread whose ``send_multipart`` and ``socket`` are wrapped.
        """
        self.io_thread = io_thread

    def __getattr__(self, attr):
        """Wrap socket attr access for backward-compatibility

        Attributes found on the underlying socket are returned with a
        DeprecationWarning.

        Raises
        ------
        AttributeError
            for magic (dunder) names, and for names the underlying socket
            does not have either.
        """
        if attr.startswith("__") and attr.endswith("__"):
            # don't wrap magic methods
            # (object has no __getattr__ to delegate to, so raise directly
            # with a message naming the attribute that was requested)
            msg = f"{type(self).__name__!r} object has no attribute {attr!r}"
            raise AttributeError(msg)
        assert self.io_thread is not None
        if hasattr(self.io_thread.socket, attr):
            warnings.warn(
                f"Accessing zmq Socket attribute {attr} on BackgroundSocket"
                f" is deprecated since ipykernel 4.3.0"
                f" use .io_thread.socket.{attr}",
                DeprecationWarning,
                stacklevel=2,
            )
            return getattr(self.io_thread.socket, attr)
        msg = f"{type(self).__name__!r} object has no attribute {attr!r}"
        raise AttributeError(msg)

    def __setattr__(self, attr, value):
        """Set an attribute on the socket.

        ``io_thread`` and magic names are stored on this wrapper; everything
        else is forwarded to the underlying socket with a DeprecationWarning.
        """
        if attr == "io_thread" or (attr.startswith("__") and attr.endswith("__")):
            super().__setattr__(attr, value)
        else:
            warnings.warn(
                f"Setting zmq Socket attribute {attr} on BackgroundSocket"
                f" is deprecated since ipykernel 4.3.0"
                f" use .io_thread.socket.{attr}",
                DeprecationWarning,
                stacklevel=2,
            )
            assert self.io_thread is not None
            setattr(self.io_thread.socket, attr, value)

    def send(self, msg, *args, **kwargs):
        """Send a message to the socket."""
        # a single frame is sent as a one-element multipart message
        return self.send_multipart([msg], *args, **kwargs)

    def send_multipart(self, *args, **kwargs):
        """Schedule send in IO thread"""
        assert self.io_thread is not None
        return self.io_thread.send_multipart(*args, **kwargs)

347 

348 

class OutStream(TextIOBase):
    """A file like object that publishes the stream to a 0MQ PUB socket.

    Output is handed off to an IO Thread
    """

    # timeout for flush to avoid infinite hang
    # in case of misbehavior
    flush_timeout = 10
    # The time interval between automatic flushes, in seconds.
    flush_interval = 0.2
    topic = None
    encoding = "UTF-8"
    # exc_info captured by the fd-watching thread, reported in close()
    _exc: Optional[Any] = None

    def fileno(self):
        """
        Things like subprocess will peek and write to the fileno() of stderr/stdout.

        Returns the duplicate of the original stream descriptor when stream
        redirection has been set up; otherwise raises io.UnsupportedOperation.
        """
        if getattr(self, "_original_stdstream_copy", None) is not None:
            return self._original_stdstream_copy
        msg = "fileno"
        raise io.UnsupportedOperation(msg)

    def _watch_pipe_fd(self):
        """
        We've redirected the standard stream's file descriptor into a pipe.

        We need to watch in a thread and redirect them to the right places.

        1) the ZMQ channels to show in notebook interfaces,
        2) the original stdout/err, to capture errors in terminals.

        We cannot schedule this on the ioloop thread, as this might be blocking.

        """
        import codecs

        try:
            # Reads are cut at PIPE_BUFFER_SIZE bytes, which can split a
            # multi-byte UTF-8 character across two reads. An incremental
            # decoder keeps the partial bytes until the rest arrives instead
            # of turning both halves into replacement characters.
            decoder = codecs.getincrementaldecoder("utf-8")(errors="replace")
            bts = os.read(self._fid, PIPE_BUFFER_SIZE)
            while bts and self._should_watch:
                text = decoder.decode(bts)
                if text:
                    self.write(text)
                # the original stream always receives the raw bytes
                os.write(self._original_stdstream_copy, bts)
                bts = os.read(self._fid, PIPE_BUFFER_SIZE)
            # emit whatever is still pending, unless we were asked to stop
            tail = decoder.decode(b"", final=True)
            if tail and self._should_watch:
                self.write(tail)
        except Exception:
            # stored so close() can print it from the closing thread
            self._exc = sys.exc_info()

    def __init__(
        self,
        session,
        pub_thread,
        name,
        pipe=None,
        echo=None,
        *,
        watchfd=True,
        isatty=False,
    ):
        """
        Parameters
        ----------
        session : object
            the session object
        pub_thread : IOPubThread
            the publication thread (a bare socket is still accepted, with a
            DeprecationWarning, and gets wrapped in a new IOPubThread)
        name : str {'stderr', 'stdout'}
            the name of the standard stream to replace
        pipe : object
            deprecated and ignored
        echo : file-like, optional
            object with ``read`` and ``write`` attributes that every write is
            echoed to
        watchfd : bool (default, True)
            Watch the file descriptor corresponding to the replaced stream.
            This is useful if you know some underlying code will write directly
            the file descriptor by its number. It will spawn a watching thread,
            that will swap the give file descriptor for a pipe, read from the
            pipe, and insert this into the current Stream.
        isatty : bool (default, False)
            Indication of whether this stream has terminal capabilities (e.g. can handle colors)

        Raises
        ------
        ValueError
            if ``echo`` is truthy but is not file-like.
        """
        if pipe is not None:
            warnings.warn(
                "pipe argument to OutStream is deprecated and ignored since ipykernel 4.2.3.",
                DeprecationWarning,
                stacklevel=2,
            )
        # This is necessary for compatibility with Python built-in streams
        self.session = session
        if not isinstance(pub_thread, IOPubThread):
            # Backward-compat: given socket, not thread. Wrap in a thread.
            warnings.warn(
                "Since IPykernel 4.3, OutStream should be created with "
                "IOPubThread, not %r" % pub_thread,
                DeprecationWarning,
                stacklevel=2,
            )
            pub_thread = IOPubThread(pub_thread)
            pub_thread.start()
        self.pub_thread = pub_thread
        self.name = name
        self.topic = b"stream." + name.encode()
        self._parent_header: contextvars.ContextVar[dict[str, Any]] = contextvars.ContextVar(
            "parent_header"
        )
        self._parent_header.set({})
        self._thread_to_parent = {}
        self._thread_to_parent_header = {}
        self._parent_header_global = {}
        self._master_pid = os.getpid()
        self._flush_pending = False
        self._subprocess_flush_pending = False
        self._io_loop = pub_thread.io_loop
        self._buffer_lock = threading.RLock()
        # one text buffer per (frozen) parent header
        self._buffers = defaultdict(StringIO)
        self.echo = None
        self._isatty = bool(isatty)
        self._should_watch = False
        self._local = local()

        if (
            watchfd
            and (
                sys.platform.startswith(("linux", "darwin"))
                # Pytest set its own capture. Don't redirect from within pytest.
                and ("PYTEST_CURRENT_TEST" not in os.environ)
            )
            # allow forcing watchfd (mainly for tests)
            or watchfd == "force"
        ):
            self._should_watch = True
            self._setup_stream_redirects(name)

        if echo:
            if hasattr(echo, "read") and hasattr(echo, "write"):
                # make sure we aren't trying to echo on the FD we're watching!
                # that would cause an infinite loop, always echoing on itself
                if self._should_watch:
                    try:
                        echo_fd = echo.fileno()
                    except Exception:
                        echo_fd = None

                    if echo_fd is not None and echo_fd == self._original_stdstream_fd:
                        # echo on the _copy_ we made during
                        # this is the actual terminal FD now
                        # closefd=False: close() owns and closes this
                        # descriptor, so the wrapper must not close it again
                        # when it is finalized.
                        echo = io.TextIOWrapper(
                            io.FileIO(
                                self._original_stdstream_copy,
                                "w",
                                closefd=False,
                            )
                        )
                self.echo = echo
            else:
                msg = "echo argument must be a file-like object"
                raise ValueError(msg)

    @property
    def parent_header(self):
        """Parent header for the current context.

        Looked up, in order, from the context variable, from the calling
        thread's oldest registered ancestor, then from the global fallback.
        """
        try:
            # asyncio-specific
            return self._parent_header.get()
        except LookupError:
            try:
                # thread-specific
                identity = threading.current_thread().ident
                # retrieve the outermost (oldest ancestor,
                # discounting the kernel thread) thread identity
                while identity in self._thread_to_parent:
                    identity = self._thread_to_parent[identity]
                # use the header of the oldest ancestor
                return self._thread_to_parent_header[identity]
            except KeyError:
                # global (fallback)
                return self._parent_header_global

    @parent_header.setter
    def parent_header(self, value):
        self._parent_header_global = value
        return self._parent_header.set(value)

    def isatty(self):
        """Return a bool indicating whether this is an 'interactive' stream.

        Returns:
            Boolean
        """
        return self._isatty

    def _setup_stream_redirects(self, name):
        """Point the named sys stream's descriptor at a pipe and start the watcher thread."""
        pr, pw = os.pipe()
        fno = self._original_stdstream_fd = getattr(sys, name).fileno()
        # keep a duplicate of the original descriptor so it can be restored
        self._original_stdstream_copy = os.dup(fno)
        os.dup2(pw, fno)
        # fno now refers to the write end, so the extra descriptor is not needed
        os.close(pw)

        self._fid = pr

        self._exc = None
        self.watch_fd_thread = threading.Thread(target=self._watch_pipe_fd)
        self.watch_fd_thread.daemon = True
        self.watch_fd_thread.start()

    def _is_master_process(self):
        """Return True when running in the process that created this stream."""
        return os.getpid() == self._master_pid

    def set_parent(self, parent):
        """Set the parent header."""
        self.parent_header = extract_header(parent)

    def close(self):
        """Close the stream.

        Stops the fd watcher (if any), restores the original descriptor,
        prints any exception the watcher recorded, and marks the stream closed.
        """
        if self._should_watch:
            self._should_watch = False
            # thread won't wake unless there's something to read
            # writing something after _should_watch will not be echoed
            os.write(self._original_stdstream_fd, b"\0")
            self.watch_fd_thread.join()
            # restore original FDs
            os.dup2(self._original_stdstream_copy, self._original_stdstream_fd)
            os.close(self._original_stdstream_copy)
            # the watcher has exited, so nothing reads from the pipe any more
            os.close(self._fid)
        if self._exc:
            etype, value, tb = self._exc
            traceback.print_exception(etype, value, tb)
        self.pub_thread = None

    @property
    def closed(self):
        """True once close() has dropped the publication thread."""
        return self.pub_thread is None

    def _schedule_flush(self):
        """schedule a flush in the IO thread

        call this on write, to indicate that flush should be called soon.
        """
        if self._flush_pending:
            return
        self._flush_pending = True

        # add_timeout has to be handed to the io thread via event pipe
        def _schedule_in_thread():
            self._io_loop.call_later(self.flush_interval, self._flush)

        self.pub_thread.schedule(_schedule_in_thread)

    def flush(self):
        """trigger actual zmq send

        send will happen in the background thread
        """
        if (
            self.pub_thread
            and self.pub_thread.thread is not None
            and self.pub_thread.thread.is_alive()
            and self.pub_thread.thread.ident != threading.current_thread().ident
        ):
            # request flush on the background thread
            self.pub_thread.schedule(self._flush)
            # wait for flush to actually get through, if we can.
            evt = threading.Event()
            self.pub_thread.schedule(evt.set)
            # and give a timeout to avoid an infinite hang
            if not evt.wait(self.flush_timeout):
                # write directly to __stderr__ instead of warning because
                # if this is happening sys.stderr may be the problem.
                print("IOStream.flush timed out", file=sys.__stderr__)
        else:
            self._flush()

    def _flush(self):
        """This is where the actual send happens.

        _flush should generally be called in the IO thread,
        unless the thread has been destroyed (e.g. forked subprocess).
        """
        self._flush_pending = False
        self._subprocess_flush_pending = False

        if self.echo is not None:
            try:
                self.echo.flush()
            except OSError as e:
                if self.echo is not sys.__stderr__:
                    print(f"Flush failed: {e}", file=sys.__stderr__)

        for parent, data in self._flush_buffers():
            if not data:
                continue
            # FIXME: this disables Session's fork-safe check,
            # since pub_thread is itself fork-safe.
            # There should be a better way to do this.
            self.session.pid = os.getpid()
            content = {"name": self.name, "text": data}
            msg = self.session.msg("stream", content, parent=parent)

            # Each transform either returns a new
            # message or None. If None is returned,
            # the message has been 'used' and is not sent.
            for hook in self._hooks:
                msg = hook(msg)
                if msg is None:
                    break
            if msg is None:
                # Only this message was consumed. The buffers were already
                # rotated out, so returning here would silently discard the
                # pending output of every remaining parent.
                continue

            self.session.send(
                self.pub_thread,
                msg,
                ident=self.topic,
            )

    def write(self, string: str) -> Optional[int]:  # type:ignore[override]
        """Write to current stream after encoding if necessary

        Returns
        -------
        len : int
            number of items from input parameter written to stream.
            ``None`` is returned in a subprocess when a flush is already pending.

        Raises
        ------
        TypeError
            if ``string`` is not a str.
        ValueError
            if the stream has been closed.
        """
        parent = self.parent_header

        if not isinstance(string, str):
            msg = f"write() argument must be str, not {type(string)}"  # type:ignore[unreachable]
            raise TypeError(msg)

        if self.echo is not None:
            try:
                self.echo.write(string)
            except OSError as e:
                if self.echo is not sys.__stderr__:
                    print(f"Write failed: {e}", file=sys.__stderr__)

        if self.pub_thread is None:
            msg = "I/O operation on closed file"
            raise ValueError(msg)

        is_child = not self._is_master_process()
        # the buffers are shared, so only touch them while holding the lock
        with self._buffer_lock:
            self._buffers[frozenset(parent.items())].write(string)
        if is_child:
            # mp.Pool cannot be trusted to flush promptly (or ever),
            # and this helps.
            if self._subprocess_flush_pending:
                return None
            self._subprocess_flush_pending = True
            # We can not rely on self._io_loop.call_later from a subprocess
            self.pub_thread.schedule(self._flush)
        else:
            self._schedule_flush()

        return len(string)

    def writelines(self, sequence):
        """Write lines to the stream."""
        if self.pub_thread is None:
            msg = "I/O operation on closed file"
            raise ValueError(msg)
        for string in sequence:
            self.write(string)

    def writable(self):
        """Test whether the stream is writable."""
        return True

    def _flush_buffers(self):
        """clear the current buffers and yield ``(parent, data)`` for each of them."""
        buffers = self._rotate_buffers()
        for frozen_parent, buffer in buffers.items():
            data = buffer.getvalue()
            buffer.close()
            # thaw the frozenset of items back into a parent header dict
            yield dict(frozen_parent), data

    def _rotate_buffers(self):
        """Returns the current buffers and replaces them with an empty mapping."""
        with self._buffer_lock:
            old_buffers = self._buffers
            self._buffers = defaultdict(StringIO)
        return old_buffers

    @property
    def _hooks(self):
        """Hook list of the calling thread, created on first access."""
        if not hasattr(self._local, "hooks"):
            # create new list for a new thread
            self._local.hooks = []
        return self._local.hooks

    def register_hook(self, hook):
        """
        Registers a hook with the thread-local storage.

        Parameters
        ----------
        hook : Any callable object

        Returns
        -------
        Either a publishable message, or `None`.
        The hook callable must return a message from
        the __call__ method if they still require the
        `session.send` method to be called after transformation.
        Returning `None` will halt that execution path, and
        session.send will not be called.
        """
        self._hooks.append(hook)

    def unregister_hook(self, hook):
        """
        Un-registers a hook with the thread-local storage.

        Parameters
        ----------
        hook : Any callable object which has previously been
            registered as a hook.

        Returns
        -------
        bool - `True` if the hook was removed, `False` if it wasn't
            found.
        """
        try:
            self._hooks.remove(hook)
            return True
        except ValueError:
            return False

769 return False