Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/pty.py: 16%

112 statements  

« prev     ^ index     » next       coverage.py v7.0.1, created at 2022-12-25 06:11 +0000

1"""Pseudo terminal utilities.""" 

2 

3# Bugs: No signal handling. Doesn't set slave termios and window size. 

4# Only tested on Linux. 

5# See: W. Richard Stevens. 1992. Advanced Programming in the 

6# UNIX Environment. Chapter 19. 

7# Author: Steen Lumholt -- with additions by Guido. 

8 

9from select import select 

10import os 

11import sys 

12import tty 

13 

14__all__ = ["openpty","fork","spawn"] 

15 

16STDIN_FILENO = 0 

17STDOUT_FILENO = 1 

18STDERR_FILENO = 2 

19 

20CHILD = 0 

21 

22def openpty(): 

23 """openpty() -> (master_fd, slave_fd) 

24 Open a pty master/slave pair, using os.openpty() if possible.""" 

25 

26 try: 

27 return os.openpty() 

28 except (AttributeError, OSError): 

29 pass 

30 master_fd, slave_name = _open_terminal() 

31 slave_fd = slave_open(slave_name) 

32 return master_fd, slave_fd 

33 

34def master_open(): 

35 """master_open() -> (master_fd, slave_name) 

36 Open a pty master and return the fd, and the filename of the slave end. 

37 Deprecated, use openpty() instead.""" 

38 

39 try: 

40 master_fd, slave_fd = os.openpty() 

41 except (AttributeError, OSError): 

42 pass 

43 else: 

44 slave_name = os.ttyname(slave_fd) 

45 os.close(slave_fd) 

46 return master_fd, slave_name 

47 

48 return _open_terminal() 

49 

50def _open_terminal(): 

51 """Open pty master and return (master_fd, tty_name).""" 

52 for x in 'pqrstuvwxyzPQRST': 

53 for y in '0123456789abcdef': 

54 pty_name = '/dev/pty' + x + y 

55 try: 

56 fd = os.open(pty_name, os.O_RDWR) 

57 except OSError: 

58 continue 

59 return (fd, '/dev/tty' + x + y) 

60 raise OSError('out of pty devices') 

61 

62def slave_open(tty_name): 

63 """slave_open(tty_name) -> slave_fd 

64 Open the pty slave and acquire the controlling terminal, returning 

65 opened filedescriptor. 

66 Deprecated, use openpty() instead.""" 

67 

68 result = os.open(tty_name, os.O_RDWR) 

69 try: 

70 from fcntl import ioctl, I_PUSH 

71 except ImportError: 

72 return result 

73 try: 

74 ioctl(result, I_PUSH, "ptem") 

75 ioctl(result, I_PUSH, "ldterm") 

76 except OSError: 

77 pass 

78 return result 

79 

80def fork(): 

81 """fork() -> (pid, master_fd) 

82 Fork and make the child a session leader with a controlling terminal.""" 

83 

84 try: 

85 pid, fd = os.forkpty() 

86 except (AttributeError, OSError): 

87 pass 

88 else: 

89 if pid == CHILD: 

90 try: 

91 os.setsid() 

92 except OSError: 

93 # os.forkpty() already set us session leader 

94 pass 

95 return pid, fd 

96 

97 master_fd, slave_fd = openpty() 

98 pid = os.fork() 

99 if pid == CHILD: 

100 # Establish a new session. 

101 os.setsid() 

102 os.close(master_fd) 

103 

104 # Slave becomes stdin/stdout/stderr of child. 

105 os.dup2(slave_fd, STDIN_FILENO) 

106 os.dup2(slave_fd, STDOUT_FILENO) 

107 os.dup2(slave_fd, STDERR_FILENO) 

108 if (slave_fd > STDERR_FILENO): 

109 os.close (slave_fd) 

110 

111 # Explicitly open the tty to make it become a controlling tty. 

112 tmp_fd = os.open(os.ttyname(STDOUT_FILENO), os.O_RDWR) 

113 os.close(tmp_fd) 

114 else: 

115 os.close(slave_fd) 

116 

117 # Parent and child process. 

118 return pid, master_fd 

119 

120def _writen(fd, data): 

121 """Write all the data to a descriptor.""" 

122 while data: 

123 n = os.write(fd, data) 

124 data = data[n:] 

125 

126def _read(fd): 

127 """Default read function.""" 

128 return os.read(fd, 1024) 

129 

130def _copy(master_fd, master_read=_read, stdin_read=_read): 

131 """Parent copy loop. 

132 Copies 

133 pty master -> standard output (master_read) 

134 standard input -> pty master (stdin_read)""" 

135 fds = [master_fd, STDIN_FILENO] 

136 while True: 

137 rfds, wfds, xfds = select(fds, [], []) 

138 if master_fd in rfds: 

139 data = master_read(master_fd) 

140 if not data: # Reached EOF. 

141 fds.remove(master_fd) 

142 else: 

143 os.write(STDOUT_FILENO, data) 

144 if STDIN_FILENO in rfds: 

145 data = stdin_read(STDIN_FILENO) 

146 if not data: 

147 fds.remove(STDIN_FILENO) 

148 else: 

149 _writen(master_fd, data) 

150 

151def spawn(argv, master_read=_read, stdin_read=_read): 

152 """Create a spawned process.""" 

153 if type(argv) == type(''): 

154 argv = (argv,) 

155 sys.audit('pty.spawn', argv) 

156 pid, master_fd = fork() 

157 if pid == CHILD: 

158 os.execlp(argv[0], *argv) 

159 try: 

160 mode = tty.tcgetattr(STDIN_FILENO) 

161 tty.setraw(STDIN_FILENO) 

162 restore = 1 

163 except tty.error: # This is the same as termios.error 

164 restore = 0 

165 try: 

166 _copy(master_fd, master_read, stdin_read) 

167 except OSError: 

168 if restore: 

169 tty.tcsetattr(STDIN_FILENO, tty.TCSAFLUSH, mode) 

170 

171 os.close(master_fd) 

172 return os.waitpid(pid, 0)[1]