1"""Wrappers for forwarding stdout/stderr over zmq"""
2
3# Copyright (c) IPython Development Team.
4# Distributed under the terms of the Modified BSD License.
5
6import asyncio
7import atexit
8import contextvars
9import io
10import os
11import sys
12import threading
13import traceback
14import warnings
15from binascii import b2a_hex
16from collections import defaultdict, deque
17from io import StringIO, TextIOBase
18from threading import local
19from typing import Any, Callable, Optional
20
21import zmq
22from jupyter_client.session import extract_header
23from tornado.ioloop import IOLoop
24from zmq.eventloop.zmqstream import ZMQStream
25
26# -----------------------------------------------------------------------------
27# Globals
28# -----------------------------------------------------------------------------
29
# Multiprocessing modes reported by IOPubThread._check_mp_mode():
# MASTER sends directly on the IOPub socket, CHILD pipes to the master.
MASTER, CHILD = 0, 1

# Maximum number of bytes read from the redirected fd pipe per os.read() call.
PIPE_BUFFER_SIZE = 1_000
34
35# -----------------------------------------------------------------------------
36# IO classes
37# -----------------------------------------------------------------------------
38
39
class IOPubThread:
    """An object for sending IOPub messages in a background thread

    Prevents a blocking main thread from delaying output from threads.

    IOPubThread(pub_socket).background_socket is a Socket-API-providing object
    whose IO is always run in a thread.
    """

    def __init__(self, socket, pipe=False):
        """Create IOPub thread

        Parameters
        ----------
        socket : zmq.PUB Socket
            the socket on which messages will be sent.
        pipe : bool
            Whether this process should listen for IOPub messages
            piped from subprocesses.
        """
        self.socket = socket
        self._stopped = False
        self.background_socket = BackgroundSocket(self)
        # remembered so forked children can detect that they are not the master
        self._master_pid = os.getpid()
        self._pipe_flag = pipe
        self.io_loop = IOLoop(make_current=False)
        if pipe:
            self._setup_pipe_in()
        self._local = threading.local()
        # callables queued by schedule(), drained in the IO thread
        self._events: deque[Callable[..., Any]] = deque()
        # per-thread PUSH sockets, tracked so they can be closed explicitly
        self._event_pipes: dict[threading.Thread, Any] = {}
        self._event_pipe_gc_lock: threading.Lock = threading.Lock()
        self._event_pipe_gc_seconds: float = 10
        self._event_pipe_gc_task: Optional[asyncio.Task[Any]] = None
        self._setup_event_pipe()
        self.thread = threading.Thread(target=self._thread_main, name="IOPub")
        self.thread.daemon = True
        self.thread.pydev_do_not_trace = True  # type:ignore[attr-defined]
        self.thread.is_pydev_daemon_thread = True  # type:ignore[attr-defined]
        self.thread.name = "IOPub"

    def _thread_main(self):
        """The inner loop that's actually run in a thread"""

        def _start_event_gc():
            self._event_pipe_gc_task = asyncio.ensure_future(self._run_event_pipe_gc())

        self.io_loop.run_sync(_start_event_gc)

        if not self._stopped:
            # avoid race if stop called before start thread gets here
            # probably only comes up in tests
            self.io_loop.start()

        if self._event_pipe_gc_task is not None:
            # cancel gc task to avoid pending task warnings
            async def _cancel():
                self._event_pipe_gc_task.cancel()  # type:ignore[union-attr]

            if not self._stopped:
                self.io_loop.run_sync(_cancel)
            else:
                self._event_pipe_gc_task.cancel()

        self.io_loop.close(all_fds=True)

    def _setup_event_pipe(self):
        """Create the PULL socket listening for events that should fire in this thread."""
        ctx = self.socket.context
        pipe_in = ctx.socket(zmq.PULL)
        pipe_in.linger = 0

        _uuid = b2a_hex(os.urandom(16)).decode("ascii")
        iface = self._event_interface = "inproc://%s" % _uuid
        pipe_in.bind(iface)
        self._event_puller = ZMQStream(pipe_in, self.io_loop)
        self._event_puller.on_recv(self._handle_event)

    async def _run_event_pipe_gc(self):
        """Task to run event pipe gc continuously"""
        while True:
            await asyncio.sleep(self._event_pipe_gc_seconds)
            try:
                await self._event_pipe_gc()
            except Exception as e:
                print(f"Exception in IOPubThread._event_pipe_gc: {e}", file=sys.__stderr__)

    async def _event_pipe_gc(self):
        """run a single garbage collection on event pipes

        Closes and forgets the event pipes of threads that are no longer alive.
        """
        if not self._event_pipes:
            # don't acquire the lock if there's nothing to do
            return
        with self._event_pipe_gc_lock:
            for thread, socket in list(self._event_pipes.items()):
                if not thread.is_alive():
                    socket.close()
                    del self._event_pipes[thread]

    @property
    def _event_pipe(self):
        """thread-local event pipe for signaling events that should be processed in the thread"""
        try:
            event_pipe = self._local.event_pipe
        except AttributeError:
            # new thread, new event pipe
            ctx = self.socket.context
            event_pipe = ctx.socket(zmq.PUSH)
            event_pipe.linger = 0
            event_pipe.connect(self._event_interface)
            self._local.event_pipe = event_pipe
            # associate event pipes to their threads
            # so they can be closed explicitly
            # implicit close on __del__ throws a ResourceWarning
            with self._event_pipe_gc_lock:
                self._event_pipes[threading.current_thread()] = event_pipe
        return event_pipe

    def _handle_event(self, msg):
        """Handle an event on the event pipe

        Content of the message is ignored.

        Whenever *an* event arrives on the event stream,
        *all* waiting events are processed in order.
        """
        # freeze event count so new writes don't extend the queue
        # while we are processing
        n_events = len(self._events)
        for _ in range(n_events):
            event_f = self._events.popleft()
            event_f()

    def _setup_pipe_in(self):
        """setup listening pipe for IOPub from forked subprocesses"""
        ctx = self.socket.context

        # use UUID to authenticate pipe messages
        self._pipe_uuid = os.urandom(16)

        pipe_in = ctx.socket(zmq.PULL)
        pipe_in.linger = 0

        try:
            self._pipe_port = pipe_in.bind_to_random_port("tcp://127.0.0.1")
        except zmq.ZMQError as e:
            warnings.warn(
                "Couldn't bind IOPub Pipe to 127.0.0.1: %s" % e
                + "\nsubprocess output will be unavailable.",
                stacklevel=2,
            )
            self._pipe_flag = False
            pipe_in.close()
            return
        self._pipe_in = ZMQStream(pipe_in, self.io_loop)
        self._pipe_in.on_recv(self._handle_pipe_msg)

    def _handle_pipe_msg(self, msg):
        """handle a pipe message from a subprocess

        ``msg[0]`` must be this process's pipe UUID; the remaining frames are
        forwarded via send_multipart. Messages are ignored when piping is
        disabled or when called outside the master process.
        """
        if not self._pipe_flag or not self._is_master_process():
            return
        if msg[0] != self._pipe_uuid:
            # format the message into the text; passing it as a second
            # print() argument would print a literal "%s"
            print(f"Bad pipe message: {msg}", file=sys.__stderr__)
            return
        self.send_multipart(msg[1:])

    def _setup_pipe_out(self):
        """Create a fresh context and PUSH socket connected to the master's pipe."""
        # must be new context after fork
        ctx = zmq.Context()
        pipe_out = ctx.socket(zmq.PUSH)
        pipe_out.linger = 3000  # 3s timeout for pipe_out sends before discarding the message
        pipe_out.connect("tcp://127.0.0.1:%i" % self._pipe_port)
        return ctx, pipe_out

    def _is_master_process(self):
        return os.getpid() == self._master_pid

    def _check_mp_mode(self):
        """check for forks, and switch to zmq pipeline if necessary"""
        if not self._pipe_flag or self._is_master_process():
            return MASTER
        return CHILD

    def start(self):
        """Start the IOPub thread"""
        self.thread.name = "IOPub"
        self.thread.start()
        # make sure we don't prevent process exit
        # I'm not sure why setting daemon=True above isn't enough, but it doesn't appear to be.
        atexit.register(self.stop)

    def stop(self):
        """Stop the IOPub thread

        Raises
        ------
        TimeoutError
            if the thread does not terminate within 30 seconds.
        """
        self._stopped = True
        if not self.thread.is_alive():
            return
        self.io_loop.add_callback(self.io_loop.stop)

        self.thread.join(timeout=30)
        if self.thread.is_alive():
            # avoid infinite hang if stop fails
            msg = "IOPub thread did not terminate in 30 seconds"
            raise TimeoutError(msg)
        # close *all* event pipes, created in any thread.
        # Event pipes are only used from other threads while self.thread.is_alive(),
        # so after thread.join this is safe. Snapshot under the lock so a
        # late registration in _event_pipe can't mutate the dict mid-iteration.
        with self._event_pipe_gc_lock:
            event_pipes = list(self._event_pipes.values())
            self._event_pipes.clear()
        for event_pipe in event_pipes:
            event_pipe.close()

    def close(self):
        """Close the IOPub thread."""
        if self.closed:
            return
        self.socket.close()
        self.socket = None

    @property
    def closed(self):
        return self.socket is None

    def schedule(self, f):
        """Schedule a function to be called in our IO thread.

        If the thread is not running, call immediately.
        """
        if self.thread.is_alive():
            self._events.append(f)
            # wake event thread (message content is ignored)
            self._event_pipe.send(b"")
        else:
            f()

    def send_multipart(self, *args, **kwargs):
        """send_multipart schedules actual zmq send in my thread.

        If my thread isn't running (e.g. forked process), send immediately.
        """
        self.schedule(lambda: self._really_send(*args, **kwargs))

    def _really_send(self, msg, *args, **kwargs):
        """The callback that actually sends messages"""
        if self.closed:
            return

        mp_mode = self._check_mp_mode()

        if mp_mode != CHILD:
            # we are master, do a regular send
            self.socket.send_multipart(msg, *args, **kwargs)
        else:
            # we are a child, pipe to master
            # new context/socket for every pipe-out
            # since forks don't teardown politely, use ctx.term to ensure send has completed
            ctx, pipe_out = self._setup_pipe_out()
            try:
                pipe_out.send_multipart([self._pipe_uuid, *msg], *args, **kwargs)
            finally:
                # release the socket and context even if the send fails
                pipe_out.close()
                ctx.term()
296
297
class BackgroundSocket:
    """Wrapper around IOPub thread that provides zmq send[_multipart]

    Attribute reads/writes other than ``io_thread`` are forwarded (with a
    DeprecationWarning) to ``io_thread.socket`` for backward-compatibility.
    """

    io_thread = None

    def __init__(self, io_thread):
        """Initialize the socket."""
        self.io_thread = io_thread

    def _no_attribute(self, attr):
        """Build the AttributeError raised for a missing attribute."""
        return AttributeError(f"{type(self).__name__!r} object has no attribute {attr!r}")

    def __getattr__(self, attr):
        """Wrap socket attr access for backward-compatibility

        Raises
        ------
        AttributeError
            for dunder names, and for names the wrapped socket does not have.
        """
        if attr.startswith("__") and attr.endswith("__"):
            # don't wrap magic methods. object defines no __getattr__, so the
            # old super().__getattr__ call failed with a misleading message;
            # raise a proper AttributeError naming the requested attribute.
            raise self._no_attribute(attr)
        assert self.io_thread is not None
        missing = object()
        # single lookup instead of hasattr() followed by getattr()
        value = getattr(self.io_thread.socket, attr, missing)
        if value is missing:
            raise self._no_attribute(attr)
        warnings.warn(
            f"Accessing zmq Socket attribute {attr} on BackgroundSocket"
            f" is deprecated since ipykernel 4.3.0"
            f" use .io_thread.socket.{attr}",
            DeprecationWarning,
            stacklevel=2,
        )
        return value

    def __setattr__(self, attr, value):
        """Set an attribute on the socket."""
        if attr == "io_thread" or (attr.startswith("__") and attr.endswith("__")):
            super().__setattr__(attr, value)
        else:
            warnings.warn(
                f"Setting zmq Socket attribute {attr} on BackgroundSocket"
                f" is deprecated since ipykernel 4.3.0"
                f" use .io_thread.socket.{attr}",
                DeprecationWarning,
                stacklevel=2,
            )
            assert self.io_thread is not None
            setattr(self.io_thread.socket, attr, value)

    def send(self, msg, *args, **kwargs):
        """Send a message to the socket."""
        return self.send_multipart([msg], *args, **kwargs)

    def send_multipart(self, *args, **kwargs):
        """Schedule send in IO thread"""
        assert self.io_thread is not None
        return self.io_thread.send_multipart(*args, **kwargs)
347
348
class OutStream(TextIOBase):
    """A file like object that publishes the stream to a 0MQ PUB socket.

    Output is handed off to an IO Thread
    """

    # timeout (seconds) for flush to avoid infinite hang
    # in case of misbehavior
    flush_timeout = 10
    # The time interval between automatic flushes, in seconds.
    flush_interval = 0.2
    topic = None
    encoding = "UTF-8"
    _exc: Optional[Any] = None

    def fileno(self):
        """
        Things like subprocess will peek and write to the fileno() of stderr/stdout.

        Returns the duplicate of the original stream's descriptor when one was
        made by the fd watcher; otherwise raises io.UnsupportedOperation.
        """
        if getattr(self, "_original_stdstream_copy", None) is not None:
            return self._original_stdstream_copy
        msg = "fileno"
        raise io.UnsupportedOperation(msg)

    def _watch_pipe_fd(self):
        """
        We've redirected standards streams 0 and 1 into a pipe.

        We need to watch in a thread and redirect them to the right places.

        1) the ZMQ channels to show in notebook interfaces,
        2) the original stdout/err, to capture errors in terminals.

        We cannot schedule this on the ioloop thread, as this might be blocking.

        Any exception is stored in ``self._exc`` and reported by close().
        """
        import codecs

        # Decode incrementally: a multi-byte UTF-8 character can be split across
        # two os.read() chunks, and decoding each chunk on its own would turn
        # both halves into replacement characters.
        decoder = codecs.getincrementaldecoder("utf-8")(errors="replace")
        try:
            bts = os.read(self._fid, PIPE_BUFFER_SIZE)
            while bts and self._should_watch:
                text = decoder.decode(bts)
                if text:
                    self.write(text)
                os.write(self._original_stdstream_copy, bts)
                bts = os.read(self._fid, PIPE_BUFFER_SIZE)
            if not bts and self._should_watch:
                # EOF: emit any incomplete trailing byte sequence (as U+FFFD)
                tail = decoder.decode(b"", final=True)
                if tail:
                    self.write(tail)
        except Exception:
            self._exc = sys.exc_info()

    def __init__(
        self,
        session,
        pub_thread,
        name,
        pipe=None,
        echo=None,
        *,
        watchfd=True,
        isatty=False,
    ):
        """
        Parameters
        ----------
        session : object
            the session object
        pub_thread : IOPubThread
            the publication thread (a bare socket is accepted for
            backward-compatibility and wrapped in a new IOPubThread)
        name : str {'stderr', 'stdout'}
            the name of the standard stream to replace
        pipe : object
            deprecated and ignored
        echo : file-like, optional
            a stream (with ``read`` and ``write``) that output is also written to
        watchfd : bool (default, True)
            Watch the file descriptor corresponding to the replaced stream.
            This is useful if you know some underlying code will write directly
            the file descriptor by its number. It will spawn a watching thread,
            that will swap the given file descriptor for a pipe, read from the
            pipe, and insert this into the current Stream.
            Only honored on Linux/macOS outside pytest, unless "force" is passed.
        isatty : bool (default, False)
            Indication of whether this stream has terminal capabilities (e.g. can handle colors)

        Raises
        ------
        ValueError
            if ``echo`` is given but is not file-like.
        """
        if pipe is not None:
            warnings.warn(
                "pipe argument to OutStream is deprecated and ignored since ipykernel 4.2.3.",
                DeprecationWarning,
                stacklevel=2,
            )
        # This is necessary for compatibility with Python built-in streams
        self.session = session
        if not isinstance(pub_thread, IOPubThread):
            # Backward-compat: given socket, not thread. Wrap in a thread.
            warnings.warn(
                "Since IPykernel 4.3, OutStream should be created with "
                "IOPubThread, not %r" % pub_thread,
                DeprecationWarning,
                stacklevel=2,
            )
            pub_thread = IOPubThread(pub_thread)
            pub_thread.start()
        self.pub_thread = pub_thread
        self.name = name
        self.topic = b"stream." + name.encode()
        self._parent_header: contextvars.ContextVar[dict[str, Any]] = contextvars.ContextVar(
            "parent_header"
        )
        self._parent_header.set({})
        self._thread_to_parent = {}
        self._thread_to_parent_header = {}
        self._parent_header_global = {}
        self._master_pid = os.getpid()
        self._flush_pending = False
        self._subprocess_flush_pending = False
        self._io_loop = pub_thread.io_loop
        self._buffer_lock = threading.RLock()
        # one buffer per (frozen) parent header
        self._buffers = defaultdict(StringIO)
        self.echo = None
        self._isatty = bool(isatty)
        self._should_watch = False
        self._local = local()

        if (
            watchfd
            and (
                (sys.platform.startswith("linux") or sys.platform.startswith("darwin"))
                # Pytest set its own capture. Don't redirect from within pytest.
                and ("PYTEST_CURRENT_TEST" not in os.environ)
            )
            # allow forcing watchfd (mainly for tests)
            or watchfd == "force"
        ):
            self._should_watch = True
            self._setup_stream_redirects(name)

        if echo:
            if hasattr(echo, "read") and hasattr(echo, "write"):
                # make sure we aren't trying to echo on the FD we're watching!
                # that would cause an infinite loop, always echoing on itself
                if self._should_watch:
                    try:
                        echo_fd = echo.fileno()
                    except Exception:
                        echo_fd = None

                    if echo_fd is not None and echo_fd == self._original_stdstream_fd:
                        # echo on the _copy_ of the original fd we made during
                        # _setup_stream_redirects; this is the actual terminal FD now.
                        # closefd=False: close() owns and closes this descriptor,
                        # the wrapper must not close it again when collected.
                        echo = io.TextIOWrapper(
                            io.FileIO(
                                self._original_stdstream_copy,
                                "w",
                                closefd=False,
                            )
                        )
                self.echo = echo
            else:
                msg = "echo argument must be a file-like object"
                raise ValueError(msg)

    @property
    def parent_header(self):
        """The parent header for output in the current context.

        Lookup order: the context variable (asyncio), then the header of the
        oldest known ancestor of the current thread, then the global fallback.
        """
        try:
            # asyncio-specific
            return self._parent_header.get()
        except LookupError:
            try:
                # thread-specific
                identity = threading.current_thread().ident
                # retrieve the outermost (oldest ancestor,
                # discounting the kernel thread) thread identity
                while identity in self._thread_to_parent:
                    identity = self._thread_to_parent[identity]
                # use the header of the oldest ancestor
                return self._thread_to_parent_header[identity]
            except KeyError:
                # global (fallback)
                return self._parent_header_global

    @parent_header.setter
    def parent_header(self, value):
        self._parent_header_global = value
        return self._parent_header.set(value)

    def isatty(self):
        """Return a bool indicating whether this is an 'interactive' stream.

        Returns:
            Boolean
        """
        return self._isatty

    def _setup_stream_redirects(self, name):
        """Redirect the OS-level fd of ``sys.<name>`` into a pipe and start the watcher."""
        pr, pw = os.pipe()
        fno = self._original_stdstream_fd = getattr(sys, name).fileno()
        self._original_stdstream_copy = os.dup(fno)
        os.dup2(pw, fno)
        # fno now refers to the pipe's write end; the extra descriptor is
        # redundant and would otherwise leak.
        os.close(pw)

        self._fid = pr

        self._exc = None
        self.watch_fd_thread = threading.Thread(target=self._watch_pipe_fd)
        self.watch_fd_thread.daemon = True
        self.watch_fd_thread.start()

    def _is_master_process(self):
        return os.getpid() == self._master_pid

    def set_parent(self, parent):
        """Set the parent header."""
        self.parent_header = extract_header(parent)

    def close(self):
        """Close the stream.

        Stops the fd watcher (if any), restores the original fd, reports any
        exception the watcher hit, and marks the stream closed.
        """
        if self._should_watch:
            self._should_watch = False
            # thread won't wake unless there's something to read
            # writing something after _should_watch will not be echoed
            os.write(self._original_stdstream_fd, b"\0")
            self.watch_fd_thread.join()
            # restore original FDs
            os.dup2(self._original_stdstream_copy, self._original_stdstream_fd)
            os.close(self._original_stdstream_copy)
            # the watcher has exited; release the pipe's read end
            os.close(self._fid)
        if self._exc:
            etype, value, tb = self._exc
            traceback.print_exception(etype, value, tb)
        self.pub_thread = None

    @property
    def closed(self):
        return self.pub_thread is None

    def _schedule_flush(self):
        """schedule a flush in the IO thread

        call this on write, to indicate that flush should be called soon.
        """
        if self._flush_pending:
            return
        self._flush_pending = True

        # add_timeout has to be handed to the io thread via event pipe
        def _schedule_in_thread():
            self._io_loop.call_later(self.flush_interval, self._flush)

        self.pub_thread.schedule(_schedule_in_thread)

    def flush(self):
        """trigger actual zmq send

        send will happen in the background thread
        """
        if (
            self.pub_thread
            and self.pub_thread.thread is not None
            and self.pub_thread.thread.is_alive()
            and self.pub_thread.thread.ident != threading.current_thread().ident
        ):
            # request flush on the background thread
            self.pub_thread.schedule(self._flush)
            # wait for flush to actually get through, if we can.
            evt = threading.Event()
            self.pub_thread.schedule(evt.set)
            # and give a timeout to avoid an infinite hang
            if not evt.wait(self.flush_timeout):
                # write directly to __stderr__ instead of warning because
                # if this is happening sys.stderr may be the problem.
                print("IOStream.flush timed out", file=sys.__stderr__)
        else:
            self._flush()

    def _flush(self):
        """This is where the actual send happens.

        _flush should generally be called in the IO thread,
        unless the thread has been destroyed (e.g. forked subprocess).
        """
        self._flush_pending = False
        self._subprocess_flush_pending = False

        if self.echo is not None:
            try:
                self.echo.flush()
            except OSError as e:
                if self.echo is not sys.__stderr__:
                    print(f"Flush failed: {e}", file=sys.__stderr__)

        for parent, data in self._flush_buffers():
            if data:
                # FIXME: this disables Session's fork-safe check,
                # since pub_thread is itself fork-safe.
                # There should be a better way to do this.
                self.session.pid = os.getpid()
                content = {"name": self.name, "text": data}
                msg = self.session.msg("stream", content, parent=parent)

                # Each hook either returns a (possibly transformed) message
                # or None. None means the hook consumed the message: skip
                # sending it, but keep processing the other parents' buffers,
                # which have already been rotated out and would otherwise be lost.
                for hook in self._hooks:
                    msg = hook(msg)
                    if msg is None:
                        break
                else:
                    self.session.send(
                        self.pub_thread,
                        msg,
                        ident=self.topic,
                    )

    def write(self, string: str) -> Optional[int]:  # type:ignore[override]
        """Write to current stream after encoding if necessary

        Returns
        -------
        len : int
            number of items from input parameter written to stream.

        Raises
        ------
        TypeError
            if ``string`` is not a str.
        ValueError
            if the stream has been closed.
        """
        parent = self.parent_header

        if not isinstance(string, str):
            msg = f"write() argument must be str, not {type(string)}"  # type:ignore[unreachable]
            raise TypeError(msg)

        if self.echo is not None:
            try:
                self.echo.write(string)
            except OSError as e:
                if self.echo is not sys.__stderr__:
                    print(f"Write failed: {e}", file=sys.__stderr__)

        if self.pub_thread is None:
            msg = "I/O operation on closed file"
            raise ValueError(msg)

        is_child = not self._is_master_process()
        # only touch the buffer in the IO thread to avoid races
        with self._buffer_lock:
            self._buffers[frozenset(parent.items())].write(string)
        if is_child:
            # mp.Pool cannot be trusted to flush promptly (or ever),
            # and this helps.
            if not self._subprocess_flush_pending:
                self._subprocess_flush_pending = True
                # We can not rely on self._io_loop.call_later from a subprocess
                self.pub_thread.schedule(self._flush)
        else:
            self._schedule_flush()

        # the data was buffered in every branch, so always report its length
        return len(string)

    def writelines(self, sequence):
        """Write lines to the stream."""
        if self.pub_thread is None:
            msg = "I/O operation on closed file"
            raise ValueError(msg)
        for string in sequence:
            self.write(string)

    def writable(self):
        """Test whether the stream is writable."""
        return True

    def _flush_buffers(self):
        """clear the current buffers and yield ``(parent_dict, data)`` for each."""
        buffers = self._rotate_buffers()
        for frozen_parent, buffer in buffers.items():
            data = buffer.getvalue()
            buffer.close()
            yield dict(frozen_parent), data

    def _rotate_buffers(self):
        """Returns the current buffers and replaces them with empty ones."""
        with self._buffer_lock:
            old_buffers = self._buffers
            self._buffers = defaultdict(StringIO)
        return old_buffers

    @property
    def _hooks(self):
        """The calling thread's list of registered hooks."""
        if not hasattr(self._local, "hooks"):
            # create new list for a new thread
            self._local.hooks = []
        return self._local.hooks

    def register_hook(self, hook):
        """
        Registers a hook with the thread-local storage.

        Parameters
        ----------
        hook : Any callable object
            Called with each outgoing message in _flush. It must return
            a message (possibly transformed) if ``session.send`` should
            still be called, or ``None`` to consume the message, in which
            case it is not sent.

        Returns
        -------
        None
        """
        self._hooks.append(hook)

    def unregister_hook(self, hook):
        """
        Un-registers a hook with the thread-local storage.

        Parameters
        ----------
        hook : Any callable object which has previously been
               registered as a hook.

        Returns
        -------
        bool - `True` if the hook was removed, `False` if it wasn't
               found.
        """
        try:
            self._hooks.remove(hook)
            return True
        except ValueError:
            return False