Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/jedi/inference/compiled/subprocess/__init__.py: 26%

233 statements  

« prev     ^ index     » next       coverage.py v7.4.4, created at 2024-04-20 06:09 +0000

1""" 

2Makes it possible to do the compiled analysis in a subprocess. This has two 

3goals: 

4 

51. Making it safer - Segfaults and RuntimeErrors as well as stdout/stderr can 

6 be ignored and dealt with. 

72. Make it possible to handle different Python versions as well as virtualenvs. 

8""" 

9 

10import collections 

11import os 

12import sys 

13import queue 

14import subprocess 

15import traceback 

16import weakref 

17from functools import partial 

18from threading import Thread 

19 

20from jedi._compatibility import pickle_dump, pickle_load 

21from jedi import debug 

22from jedi.cache import memoize_method 

23from jedi.inference.compiled.subprocess import functions 

24from jedi.inference.compiled.access import DirectObjectAccess, AccessPath, \ 

25 SignatureParam 

26from jedi.api.exceptions import InternalError 

27 

28 

# Entry point executed in the child: the package's __main__.py next to this file.
_MAIN_PATH = os.path.join(os.path.dirname(__file__), '__main__.py')
# Protocol 4 is used for the stdin/stdout IPC pickles; it is available on
# every Python version this code supports.
PICKLE_PROTOCOL = 4

31 

32 

33def _GeneralizedPopen(*args, **kwargs): 

34 if os.name == 'nt': 

35 try: 

36 # Was introduced in Python 3.7. 

37 CREATE_NO_WINDOW = subprocess.CREATE_NO_WINDOW 

38 except AttributeError: 

39 CREATE_NO_WINDOW = 0x08000000 

40 kwargs['creationflags'] = CREATE_NO_WINDOW 

41 # The child process doesn't need file descriptors except 0, 1, 2. 

42 # This is unix only. 

43 kwargs['close_fds'] = 'posix' in sys.builtin_module_names 

44 

45 return subprocess.Popen(*args, **kwargs) 

46 

47 

48def _enqueue_output(out, queue_): 

49 for line in iter(out.readline, b''): 

50 queue_.put(line) 

51 

52 

def _add_stderr_to_debug(stderr_queue):
    """Drain queued stderr lines of the subprocess into debug warnings."""
    while True:
        # Report whatever stderr output the subprocess produced so far;
        # stop as soon as the queue is exhausted.
        try:
            raw = stderr_queue.get_nowait()
        except queue.Empty:
            return
        text = raw.decode('utf-8', 'replace')
        debug.warning('stderr output: %s' % text.rstrip('\n'))

63 

64 

def _get_function(name):
    """Return the function called *name* from the functions module."""
    return getattr(functions, name)

67 

68 

69def _cleanup_process(process, thread): 

70 try: 

71 process.kill() 

72 process.wait() 

73 except OSError: 

74 # Raised if the process is already killed. 

75 pass 

76 thread.join() 

77 for stream in [process.stdin, process.stdout, process.stderr]: 

78 try: 

79 stream.close() 

80 except OSError: 

81 # Raised if the stream is broken. 

82 pass 

83 

84 

85class _InferenceStateProcess: 

86 def __init__(self, inference_state): 

87 self._inference_state_weakref = weakref.ref(inference_state) 

88 self._inference_state_id = id(inference_state) 

89 self._handles = {} 

90 

91 def get_or_create_access_handle(self, obj): 

92 id_ = id(obj) 

93 try: 

94 return self.get_access_handle(id_) 

95 except KeyError: 

96 access = DirectObjectAccess(self._inference_state_weakref(), obj) 

97 handle = AccessHandle(self, access, id_) 

98 self.set_access_handle(handle) 

99 return handle 

100 

101 def get_access_handle(self, id_): 

102 return self._handles[id_] 

103 

104 def set_access_handle(self, handle): 

105 self._handles[handle.id] = handle 

106 

107 

class InferenceStateSameProcess(_InferenceStateProcess):
    """
    Convenience access to functions.py within the current process.

    Exposes the same API as InferenceStateSubprocess, but calls the
    functions directly instead of going through a subprocess. Needed for
    the Interpreter process.
    """
    def __getattr__(self, name):
        function = _get_function(name)
        return partial(function, self._inference_state_weakref())

116 

117 

class InferenceStateSubprocess(_InferenceStateProcess):
    """Proxies calls to functions.py into a compiled subprocess."""

    def __init__(self, inference_state, compiled_subprocess):
        super().__init__(inference_state)
        self._used = False
        self._compiled_subprocess = compiled_subprocess

    def __getattr__(self, name):
        func = _get_function(name)

        def wrapper(*args, **kwargs):
            self._used = True
            result = self._compiled_subprocess.run(
                self._inference_state_weakref(),
                func,
                args=args,
                kwargs=kwargs,
            )
            # IMO it should be possible to create a hook in pickle.load to
            # mess with the loaded objects. However it's extremely complicated
            # to work around this so just do it with this call. ~ dave
            return self._convert_access_handles(result)

        return wrapper

    def _convert_access_handles(self, obj):
        """Recursively swap unpickled AccessHandles for locally known ones."""
        # NOTE: SignatureParam is checked before the generic tuple case —
        # presumably it is a tuple subclass (namedtuple); confirm in access.py.
        if isinstance(obj, SignatureParam):
            return SignatureParam(*self._convert_access_handles(tuple(obj)))
        if isinstance(obj, tuple):
            return tuple(self._convert_access_handles(item) for item in obj)
        if isinstance(obj, list):
            return [self._convert_access_handles(item) for item in obj]
        if isinstance(obj, AccessHandle):
            try:
                # Prefer the handle we already registered for this id.
                obj = self.get_access_handle(obj.id)
            except KeyError:
                obj.add_subprocess(self)
                self.set_access_handle(obj)
        elif isinstance(obj, AccessPath):
            return AccessPath(self._convert_access_handles(obj.accesses))
        return obj

    def __del__(self):
        # Ask the subprocess to drop our state, unless it already crashed
        # or we never talked to it.
        if self._used and not self._compiled_subprocess.is_crashed:
            self._compiled_subprocess.delete_inference_state(self._inference_state_id)

164 

165 

class CompiledSubprocess:
    """Manages a child Python process used for compiled (runtime) analysis.

    The child is started lazily on first use and communicates via pickled
    payloads over its stdin/stdout pipes. Once it crashes it is considered
    dead for good (``is_crashed``) and every further call raises
    InternalError.
    """
    is_crashed = False

    def __init__(self, executable, env_vars=None):
        self._executable = executable
        self._env_vars = env_vars
        self._inference_state_deletion_queue = collections.deque()
        # Replaced by a weakref.finalize once the process is started.
        self._cleanup_callable = lambda: None

    def __repr__(self):
        pid = os.getpid()
        return '<%s _executable=%r, is_crashed=%r, pid=%r>' % (
            self.__class__.__name__,
            self._executable,
            self.is_crashed,
            pid,
        )

    @memoize_method
    def _get_process(self):
        """Start the subprocess (once, memoized) and return it."""
        debug.dbg('Start environment subprocess %s', self._executable)
        parso_path = sys.modules['parso'].__file__
        args = (
            self._executable,
            _MAIN_PATH,
            os.path.dirname(os.path.dirname(parso_path)),
            '.'.join(str(x) for x in sys.version_info[:3]),
        )
        process = _GeneralizedPopen(
            args,
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            env=self._env_vars
        )
        # stderr is drained on a daemon thread so the child never blocks
        # on a full stderr pipe.
        self._stderr_queue = queue.Queue()
        self._stderr_thread = t = Thread(
            target=_enqueue_output,
            args=(process.stderr, self._stderr_queue)
        )
        t.daemon = True
        t.start()
        # Ensure the subprocess is properly cleaned up when the object
        # is garbage collected.
        self._cleanup_callable = weakref.finalize(self,
                                                  _cleanup_process,
                                                  process,
                                                  t)
        return process

    def run(self, inference_state, function, args=(), kwargs=None):
        """Execute *function* for *inference_state* in the subprocess.

        ``kwargs`` defaults to ``None`` instead of a mutable ``{}`` to
        avoid the shared-mutable-default pitfall; ``None`` means "no
        keyword arguments".
        """
        if kwargs is None:
            kwargs = {}
        # Delete old inference_states.
        while True:
            try:
                inference_state_id = self._inference_state_deletion_queue.pop()
            except IndexError:
                break
            else:
                self._send(inference_state_id, None)

        assert callable(function)
        return self._send(id(inference_state), function, args, kwargs)

    def get_sys_path(self):
        return self._send(None, functions.get_sys_path, (), {})

    def _kill(self):
        """Mark the subprocess as crashed and tear it down."""
        self.is_crashed = True
        self._cleanup_callable()

    def _send(self, inference_state_id, function, args=(), kwargs=None):
        """Pickle one request to the child and unpickle its reply.

        Raises InternalError if the child has crashed or dies during the
        exchange; re-raises any exception the child reported, with its
        message replaced by the remote traceback.
        """
        if self.is_crashed:
            raise InternalError("The subprocess %s has crashed." % self._executable)
        if kwargs is None:
            kwargs = {}

        data = inference_state_id, function, args, kwargs
        try:
            pickle_dump(data, self._get_process().stdin, PICKLE_PROTOCOL)
        except BrokenPipeError:
            self._kill()
            raise InternalError("The subprocess %s was killed. Maybe out of memory?"
                                % self._executable)

        try:
            # ``tb`` is the remote traceback string; the old code named it
            # ``traceback`` and shadowed the imported module.
            is_exception, tb, result = pickle_load(self._get_process().stdout)
        except EOFError as eof_error:
            try:
                stderr = self._get_process().stderr.read().decode('utf-8', 'replace')
            except Exception as exc:
                stderr = '<empty/not available (%r)>' % exc
            self._kill()
            _add_stderr_to_debug(self._stderr_queue)
            raise InternalError(
                "The subprocess %s has crashed (%r, stderr=%s)." % (
                    self._executable,
                    eof_error,
                    stderr,
                ))

        _add_stderr_to_debug(self._stderr_queue)

        if is_exception:
            # Replace the exception's message with the remote traceback.
            # It's way more informative.
            result.args = (tb,)
            raise result
        return result

    def delete_inference_state(self, inference_state_id):
        """
        Currently we are not deleting inference_state instantly. They only get
        deleted once the subprocess is used again. It would probably a better
        solution to move all of this into a thread. However, the memory usage
        of a single inference_state shouldn't be that high.
        """
        # With an argument - the inference_state gets deleted.
        self._inference_state_deletion_queue.append(inference_state_id)

282 

283 

class Listener:
    """Runs inside the child process: serves pickled requests from stdin.

    Each request is ``(inference_state_id, function, args, kwargs)``;
    each reply is ``(is_exception, traceback_string, result)``.
    """

    def __init__(self):
        self._inference_states = {}
        # TODO refactor so we don't need to process anymore just handle
        # controlling.
        self._process = _InferenceStateProcess(Listener)

    def _get_inference_state(self, function, inference_state_id):
        # One InferenceState is created lazily per id sent by the parent.
        from jedi.inference import InferenceState

        try:
            return self._inference_states[inference_state_id]
        except KeyError:
            from jedi import InterpreterEnvironment
            inference_state = InferenceState(
                # The project is not actually needed. Nothing should need to
                # access it.
                project=None,
                environment=InterpreterEnvironment()
            )
            self._inference_states[inference_state_id] = inference_state
            return inference_state

    def _run(self, inference_state_id, function, args, kwargs):
        # No id at all -> a plain function call without an inference state.
        if inference_state_id is None:
            return function(*args, **kwargs)
        # An id without a function -> delete that inference state.
        if function is None:
            del self._inference_states[inference_state_id]
            return None

        inference_state = self._get_inference_state(function, inference_state_id)

        # Exchange all handles
        args = list(args)
        for i, arg in enumerate(args):
            if isinstance(arg, AccessHandle):
                args[i] = inference_state.compiled_subprocess.get_access_handle(arg.id)
        for key, value in kwargs.items():
            if isinstance(value, AccessHandle):
                kwargs[key] = inference_state.compiled_subprocess.get_access_handle(value.id)

        return function(inference_state, *args, **kwargs)

    def listen(self):
        # Keep a reference to the real stdout for IPC, then mute sys.stdout
        # so stray prints cannot corrupt the pickle stream.
        real_stdout = sys.stdout
        sys.stdout = open(os.devnull, 'w')
        stdout = real_stdout.buffer
        stdin = sys.stdin.buffer

        while True:
            try:
                payload = pickle_load(stdin)
            except EOFError:
                # It looks like the parent process closed.
                # Don't make a big fuss here and just exit.
                exit(0)
            try:
                result = False, None, self._run(*payload)
            except Exception as e:
                result = True, traceback.format_exc(), e

            pickle_dump(result, stdout, PICKLE_PROTOCOL)

348 

349 

class AccessHandle:
    """Pickle-able stand-in for an object access living in another process.

    Only ``id`` travels over the wire (see __getstate__/__setstate__);
    every other attribute access is forwarded to the owning subprocess.
    """

    def __init__(self, subprocess, access, id_):
        self.access = access
        self._subprocess = subprocess
        self.id = id_

    def add_subprocess(self, subprocess):
        self._subprocess = subprocess

    def __repr__(self):
        # After unpickling there is no ``access`` attribute, only the id.
        try:
            description = self.access
        except AttributeError:
            description = '#' + str(self.id)
        return '<%s of %s>' % (self.__class__.__name__, description)

    def __getstate__(self):
        # Only the id crosses the process boundary.
        return self.id

    def __setstate__(self, state):
        self.id = state

    def __getattr__(self, name):
        # Private names and the two real attributes must never be proxied;
        # reaching this for them means unpickling left us half-initialized.
        if name.startswith('_') or name in ('id', 'access'):
            raise AttributeError("Something went wrong with unpickling")

        return partial(self._workaround, name)

    def _workaround(self, name, *args, **kwargs):
        """
        TODO Currently we're passing slice objects around. This should not
        happen. They are also the only unhashable objects that we're passing
        around.
        """
        if args and isinstance(args[0], slice):
            # Slices are unhashable, so skip the memoized path.
            return self._subprocess.get_compiled_method_return(self.id, name, *args, **kwargs)
        return self._cached_results(name, *args, **kwargs)

    @memoize_method
    def _cached_results(self, name, *args, **kwargs):
        return self._subprocess.get_compiled_method_return(self.id, name, *args, **kwargs)