Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/pty.py: 16%
112 statements
« prev ^ index » next coverage.py v7.0.1, created at 2022-12-25 06:11 +0000
« prev ^ index » next coverage.py v7.0.1, created at 2022-12-25 06:11 +0000
1"""Pseudo terminal utilities."""
3# Bugs: No signal handling. Doesn't set slave termios and window size.
4# Only tested on Linux.
5# See: W. Richard Stevens. 1992. Advanced Programming in the
6# UNIX Environment. Chapter 19.
7# Author: Steen Lumholt -- with additions by Guido.
9from select import select
10import os
11import sys
12import tty
# Public API of this module.
__all__ = [
    "openpty",
    "fork",
    "spawn",
]

# Descriptor numbers of the three standard streams.
STDIN_FILENO, STDOUT_FILENO, STDERR_FILENO = 0, 1, 2

# pid value that fork() reports inside the child process.
CHILD = 0
def openpty():
    """openpty() -> (master_fd, slave_fd)
    Return the descriptors of a freshly opened pty master/slave pair.

    os.openpty() is tried first; if it is unavailable (AttributeError) or
    fails (OSError), the pair is obtained through _open_terminal() and
    slave_open() instead."""
    pair = None
    try:
        pair = os.openpty()
    except (AttributeError, OSError):
        # Fall through to the manual device search below.
        pass
    if pair is not None:
        return pair

    master_fd, slave_name = _open_terminal()
    return master_fd, slave_open(slave_name)
def master_open():
    """master_open() -> (master_fd, slave_name)
    Open a pty master; return its descriptor together with the filename
    of the matching slave end.
    Deprecated, use openpty() instead."""
    pair = None
    try:
        pair = os.openpty()
    except (AttributeError, OSError):
        # os.openpty() is missing or failed: use the device search instead.
        pass
    if pair is None:
        return _open_terminal()

    master_fd, slave_fd = pair
    # Only the slave's name is handed back, so its descriptor is closed
    # again once the name has been looked up.
    slave_name = os.ttyname(slave_fd)
    os.close(slave_fd)
    return master_fd, slave_name
def _open_terminal():
    """Open pty master and return (master_fd, tty_name).

    Tries every /dev/pty<x><y> device in turn and returns the first one
    that can be opened read/write, paired with the corresponding
    /dev/tty<x><y> name.  Raises OSError if none can be opened."""
    for bank in 'pqrstuvwxyzPQRST':
        for unit in '0123456789abcdef':
            suffix = bank + unit
            try:
                master_fd = os.open('/dev/pty' + suffix, os.O_RDWR)
            except OSError:
                # This device is not usable; try the next one.
                pass
            else:
                return (master_fd, '/dev/tty' + suffix)
    raise OSError('out of pty devices')
def slave_open(tty_name):
    """slave_open(tty_name) -> slave_fd
    Open the pty slave named tty_name read/write and return the opened
    file descriptor.  Where fcntl provides I_PUSH, the "ptem" and
    "ldterm" modules are pushed onto the descriptor; failures of that
    step are ignored.
    Deprecated, use openpty() instead."""
    slave_fd = os.open(tty_name, os.O_RDWR)
    try:
        from fcntl import ioctl, I_PUSH
    except ImportError:
        # No I_PUSH available here: the plain descriptor is all we can offer.
        pass
    else:
        try:
            # Pushed in this order; the first failure ends the attempt.
            for module_name in ("ptem", "ldterm"):
                ioctl(slave_fd, I_PUSH, module_name)
        except OSError:
            pass
    return slave_fd
def fork():
    """fork() -> (pid, master_fd)
    Fork and make the child a session leader with a controlling terminal.

    In the child, pid equals CHILD.  os.forkpty() is used when it is
    available and succeeds; otherwise the same result is assembled from
    openpty() and os.fork()."""
    forked = None
    try:
        forked = os.forkpty()
    except (AttributeError, OSError):
        # Not available or failed: do it by hand below.
        pass

    if forked is not None:
        pid, fd = forked
        if pid == CHILD:
            try:
                os.setsid()
            except OSError:
                # os.forkpty() already set us session leader
                pass
        return pid, fd

    master_fd, slave_fd = openpty()
    pid = os.fork()
    if pid != CHILD:
        # Parent: only the master end is needed on this side.
        os.close(slave_fd)
        return pid, master_fd

    # Child: establish a new session and drop the master end.
    os.setsid()
    os.close(master_fd)

    # Slave becomes stdin/stdout/stderr of child.
    for std_fd in (STDIN_FILENO, STDOUT_FILENO, STDERR_FILENO):
        os.dup2(slave_fd, std_fd)
    if slave_fd > STDERR_FILENO:
        # The original descriptor is redundant once it has been duplicated.
        os.close(slave_fd)

    # Explicitly open the tty to make it become a controlling tty.
    tmp_fd = os.open(os.ttyname(STDOUT_FILENO), os.O_RDWR)
    os.close(tmp_fd)
    return pid, master_fd
def _writen(fd, data):
    """Write all the data to a descriptor.

    os.write() reports how much it accepted, so it is called repeatedly
    on the unwritten tail until nothing is left."""
    remaining = data
    while remaining:
        written = os.write(fd, remaining)
        remaining = remaining[written:]
def _read(fd):
    """Default read function.

    Performs a single os.read() of at most 1024 bytes from fd and
    returns whatever that call produced."""
    chunk = os.read(fd, 1024)
    return chunk
def _copy(master_fd, master_read=_read, stdin_read=_read):
    """Parent copy loop.
    Copies
            pty master -> standard output   (master_read)
            standard input -> pty master    (stdin_read)

    master_read and stdin_read are called with a file descriptor and
    must return the data read; an empty result means EOF.

    Returns when master_read() reports EOF.  EOF on standard input only
    stops the input side; output from the master keeps being copied.
    OSError raised by select(), the read callbacks or the writes is
    propagated to the caller."""
    fds = [master_fd, STDIN_FILENO]
    while True:
        rfds, _wfds, _xfds = select(fds, [], [])

        if master_fd in rfds:
            data = master_read(master_fd)
            if not data:
                # EOF on the master.  Returning here (instead of merely
                # dropping master_fd from the watch list) keeps the loop
                # from waiting on standard input alone, or on an empty
                # descriptor list, with nothing left to relay.
                return
            # _writen() retries short writes so no output is dropped.
            _writen(STDOUT_FILENO, data)

        if STDIN_FILENO in rfds:
            data = stdin_read(STDIN_FILENO)
            if not data:
                # EOF on standard input: stop watching it.
                fds.remove(STDIN_FILENO)
            else:
                _writen(master_fd, data)


def spawn(argv, master_read=_read, stdin_read=_read):
    """Create a spawned process.

    argv is the program and its arguments; a plain string is treated as
    a one-element argument list.  The program is executed in the child
    created by fork(), while this process relays data between the pty
    master and its own standard input/output through _copy().

    master_read and stdin_read are passed on to _copy().

    Returns the status value reported by os.waitpid() for the child."""
    if isinstance(argv, str):
        argv = (argv,)
    sys.audit('pty.spawn', argv)

    pid, master_fd = fork()
    if pid == CHILD:
        os.execlp(argv[0], *argv)

    try:
        mode = tty.tcgetattr(STDIN_FILENO)
        tty.setraw(STDIN_FILENO)
        restore = True
    except tty.error:    # This is the same as termios.error
        # Standard input has no terminal attributes to save or restore.
        restore = False

    try:
        _copy(master_fd, master_read, stdin_read)
    except OSError:
        # An OSError from the copy loop is treated as the end of the
        # session, exactly as before, rather than as a failure of spawn().
        pass
    finally:
        # Undo setraw() on every exit path, not only after an OSError,
        # and never leak the master descriptor.
        try:
            if restore:
                tty.tcsetattr(STDIN_FILENO, tty.TCSAFLUSH, mode)
        finally:
            os.close(master_fd)

    return os.waitpid(pid, 0)[1]