1"""Base class for threads."""
2from threading import Thread
3
4from tornado.ioloop import IOLoop
5
# Thread-name strings. Nothing in the code visible here reads them;
# presumably they name the control thread and the shell-channel thread
# built on BaseThread -- TODO confirm against the modules that import them.
CONTROL_THREAD_NAME = "Control"
SHELL_CHANNEL_THREAD_NAME = "Shell channel"
8
9
class BaseThread(Thread):
    """Thread that owns a private tornado ``IOLoop`` and runs it until stopped."""

    def __init__(self, **kwargs):
        """Create the thread together with its event loop.

        Every keyword argument is forwarded unchanged to ``Thread.__init__``.
        """
        super().__init__(**kwargs)
        # make_current=False: constructing the loop must not make it the
        # current IOLoop of whichever thread builds this object.
        self.io_loop = IOLoop(make_current=False)
        # Marker attributes; presumably inspected by the pydev debugger so it
        # leaves this thread alone -- TODO confirm against the debugger.
        self.is_pydev_daemon_thread = True
        self.pydev_do_not_trace = True

    def run(self) -> None:
        """Run the event loop in this thread until it is stopped.

        The loop is closed on the way out, including when ``start`` raises.
        """
        try:
            self.io_loop.start()
        finally:
            self.io_loop.close()

    def stop(self) -> None:
        """Ask the event loop to stop.

        Safe to call from any thread: the stop request is queued on the loop
        through ``add_callback`` rather than being invoked directly.
        """
        loop = self.io_loop
        loop.add_callback(loop.stop)