Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/pexpect/utils.py: 14%

105 statements  

« prev     ^ index     » next       coverage.py v7.4.4, created at 2024-04-20 06:09 +0000

1import os 

2import sys 

3import stat 

4import select 

5import time 

6import errno 

7 

# Python 2 has no built-in InterruptedError; fall back to select.error there so
# the ``except InterruptedError`` clauses in this module work on both versions.
try:
    InterruptedError
except NameError:
    InterruptedError = select.error

# Tuple of the text types accepted as "a string" on the running interpreter.
# ``unicode`` is only evaluated on Python 2, where the name exists.
string_types = (str,) if sys.version_info[0] >= 3 else (unicode, str)

18 

19 

def is_executable_file(path):
    """Return True if path is an executable regular file or a symlink to one.

    This is roughly ``os.path.isfile(path) and os.access(path, os.X_OK)``,
    evaluated on the symlink-resolved path.
    """
    # Resolve symlinks first so the checks below apply to the real target.
    resolved = os.path.realpath(path)

    # Directories, fifos, missing paths, etc. are never executable files.
    if not os.path.isfile(resolved):
        return False

    mode = os.stat(resolved).st_mode

    # os.getuid() is only consulted on Solaris thanks to short-circuiting.
    solaris_root = sys.platform.startswith('sunos') and os.getuid() == 0
    if not solaris_root:
        return os.access(resolved, os.X_OK)

    # When root on Solaris, os.X_OK is True for *all* files regardless of
    # their executability, so look at the permission bits instead: an execute
    # bit for user, group, or other is good enough.
    #
    # (This may be true for other "Unix98" OS's such as HP-UX and AIX)
    any_exec_bit = stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH
    return bool(mode & any_exec_bit)

46 

47 

def which(filename, env=None):
    '''Locate an executable called filename.

    If filename contains a directory component and is itself an executable
    file, it is returned unchanged. Otherwise every directory listed in the
    ``PATH`` entry of env (``os.environ`` when env is None; ``os.defpath``
    when ``PATH`` is missing or empty) is tried in order, and the first
    joined path that is an executable file is returned. Returns None when
    nothing matches.'''

    # Special case where filename contains an explicit path.
    if os.path.dirname(filename) != '' and is_executable_file(filename):
        return filename

    environ = os.environ if env is None else env
    search_path = environ.get('PATH') or os.defpath

    for directory in search_path.split(os.pathsep):
        candidate = os.path.join(directory, filename)
        if is_executable_file(candidate):
            return candidate
    return None

67 

68 

def split_command_line(command_line):

    '''This splits a command line into a list of arguments. It splits arguments
    on whitespace, but handles embedded quotes, doublequotes, and escaped
    characters. A backslash outside of quotes makes the next character
    literal; inside single or double quotes every character up to the
    matching closing quote is taken literally. Leading whitespace is skipped
    and does not produce an empty first argument.

    Returns the list of argument strings (empty for an empty or
    all-whitespace command line). '''

    arg_list = []
    arg = ''

    # Constants to name the states we can be in.
    state_basic = 0
    state_esc = 1
    state_singlequote = 2
    state_doublequote = 3
    # The state when consuming whitespace between commands.
    state_whitespace = 4
    # Start out "between arguments": beginning in state_basic would make
    # leading whitespace append a spurious empty argument.
    state = state_whitespace

    for c in command_line:
        if state == state_basic or state == state_whitespace:
            if c == '\\':
                # Escape the next character
                state = state_esc
            elif c == "'":
                # Handle single quote
                state = state_singlequote
            elif c == '"':
                # Handle double quote
                state = state_doublequote
            elif c.isspace():
                # Only the first whitespace character after an argument
                # terminates it; further whitespace is just skipped.
                if state == state_basic:
                    arg_list.append(arg)
                    arg = ''
                state = state_whitespace
            else:
                arg = arg + c
                state = state_basic
        elif state == state_esc:
            # The escaped character is taken literally.
            arg = arg + c
            state = state_basic
        elif state == state_singlequote:
            if c == "'":
                state = state_basic
            else:
                arg = arg + c
        elif state == state_doublequote:
            if c == '"':
                state = state_basic
            else:
                arg = arg + c

    # Flush the final argument (no trailing whitespace terminated it).
    if arg != '':
        arg_list.append(arg)
    return arg_list

128 

129 

def select_ignore_interrupts(iwtd, owtd, ewtd, timeout=None):

    '''Wrapper around select.select() that retries when interrupted by a
    signal. If select.select() raises InterruptedError whose first argument
    is errno.EINTR, the call is simply re-entered. Mainly this is used to
    ignore sigwinch (terminal resize).

    Returns whatever select.select() returns, or ``([], [], [])`` if the
    timeout has run out by the time an interruption is handled. Any other
    InterruptedError is re-raised. '''

    # Remember when the caller's timeout expires so that a retry only waits
    # for whatever time is still left.
    deadline = None if timeout is None else time.time() + timeout

    while True:
        try:
            return select.select(iwtd, owtd, ewtd, timeout)
        except InterruptedError as exc:
            if exc.args[0] != errno.EINTR:
                # something else caused the select.error, so
                # this actually is an exception.
                raise
            if deadline is not None:
                # Subtract the amount of time we already waited.
                timeout = deadline - time.time()
                if timeout < 0:
                    return ([], [], [])

157 

158 

def poll_ignore_interrupts(fds, timeout=None):
    '''Poll the given file descriptors, retrying when interrupted by a signal.

    Every descriptor in fds is registered for POLLIN, POLLPRI, POLLHUP and
    POLLERR. timeout is given in seconds (None blocks indefinitely) and is
    converted to milliseconds for poll().

    Returns the list of descriptors that reported an event, or ``[]`` if the
    timeout has run out by the time an interruption is handled. An
    InterruptedError whose first argument is not errno.EINTR is re-raised.'''

    # Remember when the caller's timeout expires so that a retry only waits
    # for whatever time is still left.
    deadline = None if timeout is None else time.time() + timeout

    poller = select.poll()
    event_mask = select.POLLIN | select.POLLPRI | select.POLLHUP | select.POLLERR
    for fd in fds:
        poller.register(fd, event_mask)

    while True:
        try:
            if timeout is None:
                wait_ms = None
            else:
                wait_ms = timeout * 1000
            ready = poller.poll(wait_ms)
            return [ready_fd for ready_fd, _event in ready]
        except InterruptedError as exc:
            if exc.args[0] != errno.EINTR:
                # something else caused the select.error, so
                # this actually is an exception.
                raise
            if deadline is not None:
                # Subtract the amount of time we already waited.
                timeout = deadline - time.time()
                if timeout < 0:
                    return []