Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/jedi/inference/compiled/subprocess/__init__.py: 27%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

240 statements  

1""" 

2Makes it possible to do the compiled analysis in a subprocess. This has two 

3goals: 

4 

51. Making it safer - Segfaults and RuntimeErrors as well as stdout/stderr can 

6 be ignored and dealt with. 

72. Make it possible to handle different Python versions as well as virtualenvs. 

8 

9The architecture here is briefly: 

10 - For each Jedi `Environment` there is a corresponding subprocess which 

11 operates within the target environment. If the subprocess dies it is replaced 

12 at this level. 

13 - `CompiledSubprocess` manages exactly one subprocess and handles communication 

14 from the parent side. 

15 - `Listener` runs within the subprocess, processing each request and yielding 

16 results. 

17 - `InterpreterEnvironment` provides an API which matches that of `Environment`, 

18 but runs functionality inline rather than within a subprocess. It is thus 

19 used both directly in places where a subprocess is unnecessary and/or 

20 undesirable and also within subprocesses themselves. 

21 - `InferenceStateSubprocess` (or `InferenceStateSameProcess`) provide high 

22 level access to functionality within the subprocess from within the parent. 

23 Each `InferenceState` has an instance of one of these, provided by its 

24 environment. 

25""" 

26 

27import collections 

28import os 

29import sys 

30import queue 

31import subprocess 

32import traceback 

33import weakref 

34from functools import partial 

35from threading import Thread 

36from typing import Dict, TYPE_CHECKING, Any 

37 

38from jedi._compatibility import pickle_dump, pickle_load 

39from jedi import debug 

40from jedi.cache import memoize_method 

41from jedi.inference.compiled.subprocess import functions 

42from jedi.inference.compiled.access import DirectObjectAccess, AccessPath, \ 

43 SignatureParam 

44from jedi.api.exceptions import InternalError 

45 

46if TYPE_CHECKING: 

47 from jedi.inference import InferenceState 

48 

49 

# Path of the `__main__.py` file located next to this module. It is handed to
# the target interpreter as the script to execute when a subprocess is started.
_MAIN_PATH = os.path.join(os.path.dirname(__file__), '__main__.py')
# Pickle protocol version used for every message exchanged between the parent
# process and the subprocess.
PICKLE_PROTOCOL = 4

52 

53 

54def _GeneralizedPopen(*args, **kwargs): 

55 if sys.platform == "win32": 

56 try: 

57 # Was introduced in Python 3.7. 

58 CREATE_NO_WINDOW = subprocess.CREATE_NO_WINDOW 

59 except AttributeError: 

60 CREATE_NO_WINDOW = 0x08000000 

61 kwargs['creationflags'] = CREATE_NO_WINDOW 

62 # The child process doesn't need file descriptors except 0, 1, 2. 

63 # This is unix only. 

64 kwargs['close_fds'] = 'posix' in sys.builtin_module_names 

65 

66 return subprocess.Popen(*args, **kwargs) 

67 

68 

69def _enqueue_output(out, queue_): 

70 for line in iter(out.readline, b''): 

71 queue_.put(line) 

72 

73 

74def _add_stderr_to_debug(stderr_queue): 

75 while True: 

76 # Try to do some error reporting from the subprocess and print its 

77 # stderr contents. 

78 try: 

79 line = stderr_queue.get_nowait() 

80 line = line.decode('utf-8', 'replace') 

81 debug.warning('stderr output: %s' % line.rstrip('\n')) 

82 except queue.Empty: 

83 break 

84 

85 

def _get_function(name):
    """
    Return the attribute called ``name`` from the ``functions`` module.

    The lookup is a plain ``getattr``, so an unknown name raises
    ``AttributeError``.
    """
    target = getattr(functions, name)
    return target

88 

89 

90def _cleanup_process(process, thread): 

91 try: 

92 process.kill() 

93 process.wait() 

94 except OSError: 

95 # Raised if the process is already killed. 

96 pass 

97 thread.join() 

98 for stream in [process.stdin, process.stdout, process.stderr]: 

99 try: 

100 stream.close() 

101 except OSError: 

102 # Raised if the stream is broken. 

103 pass 

104 

105 

106class _InferenceStateProcess: 

107 get_compiled_method_return: Any 

108 

109 def __init__(self, inference_state: 'InferenceState') -> None: 

110 self._inference_state_weakref = weakref.ref(inference_state) 

111 self._handles: Dict[int, AccessHandle] = {} 

112 

113 def get_or_create_access_handle(self, obj): 

114 id_ = id(obj) 

115 try: 

116 return self.get_access_handle(id_) 

117 except KeyError: 

118 access = DirectObjectAccess(self._inference_state_weakref(), obj) 

119 handle = AccessHandle(self, access, id_) 

120 self.set_access_handle(handle) 

121 return handle 

122 

123 def get_access_handle(self, id_): 

124 return self._handles[id_] 

125 

126 def set_access_handle(self, handle): 

127 self._handles[handle.id] = handle 

128 

129 

class InferenceStateSameProcess(_InferenceStateProcess):
    """
    Thin in-process gateway to the callables defined in functions.py.

    It exposes the same API as InferenceStateSubprocess and does the same
    thing, only without a subprocess in between. This is necessary for the
    Interpreter process.
    """
    def __getattr__(self, name):
        # Resolve the function first so that unknown names fail with an
        # AttributeError before the weak reference is touched.
        func = _get_function(name)
        inference_state = self._inference_state_weakref()
        return partial(func, inference_state)

138 

139 

class InferenceStateSubprocess(_InferenceStateProcess):
    """
    API to functionality which will run in a subprocess.

    Sits between an `InferenceState` and the execution of functionality inside
    a `CompiledSubprocess`. The callable functions live in `.functions`, but
    are meant to be reached through attributes of the same name on this class.

    The class also tells the subprocess that its `InferenceState` counterpart
    may be dropped once the corresponding instance in the parent goes away.
    """

    def __init__(
        self,
        inference_state: 'InferenceState',
        compiled_subprocess: 'CompiledSubprocess',
    ) -> None:
        super().__init__(inference_state)
        self._compiled_subprocess = compiled_subprocess
        # Becomes True with the first request; nothing has to be deleted in
        # the subprocess for instances that never sent one.
        self._used = False

        # Opaque id passed to the subprocess to identify the context (an
        # `InferenceState`) a request should run against. It lets subsequent
        # requests operate on results from previous ones while a single
        # subprocess serves several contexts of the parent process. Once it is
        # no longer needed (i.e. when this object goes away) the same id is
        # used to tell the subprocess that the context can be discarded.
        #
        # Note: the id is deliberately coupled to this class (and not to
        # `InferenceState`), because this class manages access handle mappings
        # which must correspond to those in the subprocess. This also avoids
        # race conditions from successive `InferenceState`s with the same
        # object id (as observed while adding support for Python 3.13).
        #
        # The value does not have to be the `id()` of this instance. It only
        # has to make the (visible) lifetime of the context within the
        # subprocess match that of this object. Correctness therefore also
        # depends on the semantics of
        # `CompiledSubprocess.delete_inference_state`.
        self._inference_state_id = id(self)

    def __getattr__(self, name):
        remote_function = _get_function(name)

        def wrapper(*args, **kwargs):
            self._used = True

            raw_result = self._compiled_subprocess.run(
                self._inference_state_id,
                remote_function,
                args=args,
                kwargs=kwargs,
            )
            # IMO it should be possible to create a hook in pickle.load to
            # mess with the loaded objects. However it's extremely complicated
            # to work around this so just do it with this call. ~ dave
            return self._convert_access_handles(raw_result)

        return wrapper

    def _convert_access_handles(self, obj):
        """
        Recursively replace access handles in ``obj`` by the ones registered
        on this instance, registering handles that are not known yet.
        """
        convert = self._convert_access_handles

        # `SignatureParam` has to be tested before plain tuples.
        if isinstance(obj, SignatureParam):
            return SignatureParam(*convert(tuple(obj)))
        if isinstance(obj, tuple):
            return tuple(convert(item) for item in obj)
        if isinstance(obj, list):
            return [convert(item) for item in obj]
        if isinstance(obj, AccessHandle):
            try:
                # Rewrite the access handle to one we're already having.
                return self.get_access_handle(obj.id)
            except KeyError:
                obj.add_subprocess(self)
                self.set_access_handle(obj)
                return obj
        if isinstance(obj, AccessPath):
            return AccessPath(convert(obj.accesses))
        return obj

    def __del__(self):
        if not self._used:
            return
        if self._compiled_subprocess.is_crashed:
            return
        self._compiled_subprocess.delete_inference_state(self._inference_state_id)

224 

225 

class CompiledSubprocess:
    """
    A subprocess which runs inference within a target environment.

    This class manages the interface to a single instance of such a process as
    well as the lifecycle of the process itself. See `.__main__` and `Listener`
    for the implementation of the subprocess and details of the protocol.

    A single live instance of this is maintained by `jedi.api.environment.Environment`,
    so that typically a single subprocess is used at a time.
    """

    # Set to True on the instance by `_kill`; afterwards `_send` refuses to
    # talk to the process.
    is_crashed = False

    def __init__(self, executable, env_vars=None):
        """
        :param executable: First element of the command line used to start
            the subprocess.
        :param env_vars: Passed as ``env`` when the subprocess is created.
        """
        self._executable = executable
        self._env_vars = env_vars
        # Ids of inference states that should be dropped in the subprocess;
        # flushed at the beginning of the next `run` call.
        self._inference_state_deletion_queue = collections.deque()
        # No-op until `_get_process` installs the real finalizer.
        self._cleanup_callable = lambda: None

    def __repr__(self):
        # Note that this is the pid of the current process (`os.getpid()`).
        pid = os.getpid()
        return '<%s _executable=%r, is_crashed=%r, pid=%r>' % (
            self.__class__.__name__,
            self._executable,
            self.is_crashed,
            pid,
        )

    @memoize_method
    def _get_process(self):
        """
        Start the subprocess together with a daemon thread that collects its
        stderr lines into ``self._stderr_queue`` and return the process.

        NOTE(review): the ``memoize_method`` decorator presumably makes this
        start the process only once per instance - confirm in `jedi.cache`.
        """
        debug.dbg('Start environment subprocess %s', self._executable)
        parso_path = sys.modules['parso'].__file__
        args = (
            self._executable,
            _MAIN_PATH,
            # Directory that contains the `parso` package.
            os.path.dirname(os.path.dirname(parso_path)),
            # Version of the current interpreter as "major.minor.micro".
            '.'.join(str(x) for x in sys.version_info[:3]),
        )
        process = _GeneralizedPopen(
            args,
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            env=self._env_vars
        )
        self._stderr_queue = queue.Queue()
        self._stderr_thread = t = Thread(
            target=_enqueue_output,
            args=(process.stderr, self._stderr_queue)
        )
        t.daemon = True
        t.start()
        # Ensure the subprocess is properly cleaned up when the object
        # is garbage collected.
        self._cleanup_callable = weakref.finalize(self,
                                                  _cleanup_process,
                                                  process,
                                                  t)
        return process

    def run(self, inference_state_id, function, args=(), kwargs=None):
        """
        Run ``function`` in the subprocess for the given inference state.

        Pending inference state deletions are sent first.

        :param inference_state_id: Opaque id of the context to run against.
        :param function: Callable to execute in the subprocess.
        :param args: Positional arguments for ``function``.
        :param kwargs: Keyword arguments for ``function``; ``None`` means no
            keyword arguments.
        :return: The result sent back by the subprocess.
        """
        # Delete old inference_states.
        while True:
            try:
                delete_id = self._inference_state_deletion_queue.pop()
            except IndexError:
                break
            else:
                self._send(delete_id, None)

        assert callable(function)
        return self._send(inference_state_id, function, args, kwargs)

    def get_sys_path(self):
        """Return the result of `functions.get_sys_path` run in the subprocess."""
        return self._send(None, functions.get_sys_path, (), {})

    def _kill(self):
        """Mark the subprocess as crashed and run the cleanup callable."""
        self.is_crashed = True
        self._cleanup_callable()

    def _send(self, inference_state_id, function, args=(), kwargs=None):
        """
        Send a single request to the subprocess and return its answer.

        :param kwargs: Keyword arguments for ``function``; ``None`` is sent
            as an empty dict.
        :raises InternalError: If the subprocess is already marked as crashed,
            if writing the request hits a broken pipe or if reading the
            response hits the end of the stream.

        If the subprocess reports an exception, that exception is re-raised
        here with its ``args`` replaced by the traceback text it sent along.
        """
        if self.is_crashed:
            raise InternalError("The subprocess %s has crashed." % self._executable)

        # Avoid a shared mutable default; the receiving side expects a dict.
        if kwargs is None:
            kwargs = {}

        data = inference_state_id, function, args, kwargs
        try:
            pickle_dump(data, self._get_process().stdin, PICKLE_PROTOCOL)
        except BrokenPipeError:
            self._kill()
            raise InternalError("The subprocess %s was killed. Maybe out of memory?"
                                % self._executable)

        try:
            # Named `traceback_text` so the module-level `traceback` import is
            # not shadowed.
            is_exception, traceback_text, result = pickle_load(self._get_process().stdout)
        except EOFError as eof_error:
            try:
                stderr = self._get_process().stderr.read().decode('utf-8', 'replace')
            except Exception as exc:
                stderr = '<empty/not available (%r)>' % exc
            self._kill()
            _add_stderr_to_debug(self._stderr_queue)
            raise InternalError(
                "The subprocess %s has crashed (%r, stderr=%s)." % (
                    self._executable,
                    eof_error,
                    stderr,
                ))

        _add_stderr_to_debug(self._stderr_queue)

        if is_exception:
            # Replace the attribute error message with the traceback. It's
            # way more informative.
            result.args = (traceback_text,)
            raise result
        return result

    def delete_inference_state(self, inference_state_id):
        """
        Indicate that an inference state (in the subprocess) is no longer
        needed.

        The state corresponding to the given id will become inaccessible and the
        id may safely be re-used to refer to a different context.

        Note: it is not guaranteed that the corresponding state will actually be
        deleted immediately.
        """
        # Warning: if changing the semantics of context deletion see the comment
        # in `InferenceStateSubprocess.__init__` regarding potential race
        # conditions.

        # Currently we are not deleting the related state instantly. They only
        # get deleted once the subprocess is used again. It would probably be a
        # better solution to move all of this into a thread. However, the memory
        # usage of a single inference_state shouldn't be that high.
        self._inference_state_deletion_queue.append(inference_state_id)

364 

365 

class Listener:
    """
    Main loop for the subprocess which actually does the inference.

    This class runs within the target environment. It listens to instructions
    from the parent process, runs inference and returns the results.

    The subprocess has a long lifetime and is expected to process several
    requests, including for different `InferenceState` instances in the parent.
    See `CompiledSubprocess` for the parent half of the system.

    Communication is via pickled data sent serially over stdin and stdout.
    Stderr is not part of the protocol.

    The request protocol is a 4-tuple of:
    * inference_state_id | None: an opaque identifier of the parent's
      `InferenceState`. An `InferenceState` operating over an
      `InterpreterEnvironment` is created within this process for each of
      these, ensuring that each parent context has a corresponding context
      here. This allows context to be persisted between requests. Unless
      `None`, the local `InferenceState` will be passed to the given function
      as the first positional argument.
    * function | None: the function to run. This is expected to be a member of
      `.functions`. `None` indicates that the corresponding inference state is
      no longer needed and should be dropped.
    * args: positional arguments to the `function`. If any of these are
      `AccessHandle` instances they will be adapted to the local
      `InferenceState` before being passed.
    * kwargs: keyword arguments to the `function`. If any of these are
      `AccessHandle` instances they will be adapted to the local
      `InferenceState` before being passed.

    The result protocol is a 3-tuple of either:
    * (False, None, function result): if the function returns without error, or
    * (True, traceback, exception): if the function raises an exception
    """

    def __init__(self):
        # Maps the parent's inference state ids to local `InferenceState`s.
        self._inference_states = {}

    def _get_inference_state(self, function, inference_state_id):
        """
        Return the local `InferenceState` for ``inference_state_id``,
        creating and remembering one on first use.

        ``function`` is accepted but not used.
        """
        from jedi.inference import InferenceState

        try:
            inference_state = self._inference_states[inference_state_id]
        except KeyError:
            from jedi import InterpreterEnvironment
            inference_state = InferenceState(
                # The project is not actually needed. Nothing should need to
                # access it.
                project=None,
                environment=InterpreterEnvironment()
            )
            self._inference_states[inference_state_id] = inference_state
        return inference_state

    def _run(self, inference_state_id, function, args, kwargs):
        """
        Execute one request.

        * ``inference_state_id is None``: call ``function(*args, **kwargs)``.
        * ``function is None``: forget the inference state with that id and
          return ``None``.
        * otherwise: swap every top-level `AccessHandle` in ``args`` and
          ``kwargs`` for the local handle with the same id and call
          ``function(inference_state, *args, **kwargs)``.
        """
        if inference_state_id is None:
            return function(*args, **kwargs)
        elif function is None:
            # Warning: if changing the semantics of context deletion see the comment
            # in `InferenceStateSubprocess.__init__` regarding potential race
            # conditions.
            del self._inference_states[inference_state_id]
        else:
            inference_state = self._get_inference_state(function, inference_state_id)

            # Exchange all handles
            args = list(args)
            for i, arg in enumerate(args):
                if isinstance(arg, AccessHandle):
                    args[i] = inference_state.compiled_subprocess.get_access_handle(arg.id)
            # Only values of existing keys are replaced, so the dict does not
            # change size while it is iterated.
            for key, value in kwargs.items():
                if isinstance(value, AccessHandle):
                    kwargs[key] = inference_state.compiled_subprocess.get_access_handle(value.id)

            return function(inference_state, *args, **kwargs)

    def listen(self):
        """
        Serve requests read from stdin until the stream ends.

        Each request is unpickled from the binary stdin buffer, executed via
        `_run` and answered with a pickled result tuple on the original binary
        stdout buffer. When stdin reaches EOF the process exits with status 0.
        """
        stdout = sys.stdout.buffer
        # Mute stdout. Nobody should actually be able to write to it,
        # because stdout is used for IPC.
        sys.stdout = open(os.devnull, 'w')
        stdin = sys.stdin.buffer

        while True:
            try:
                payload = pickle_load(stdin)
            except EOFError:
                # It looks like the parent process closed.
                # Don't make a big fuss here and just exit.
                # `sys.exit` is used instead of the `exit` helper, which is
                # only injected by the `site` module and may be missing.
                sys.exit(0)
            try:
                result = False, None, self._run(*payload)
            except Exception as e:
                result = True, traceback.format_exc(), e

            pickle_dump(result, stdout, PICKLE_PROTOCOL)

466 

467 

class AccessHandle:
    """
    Reference to an object access, identified by ``id``.

    Only ``id`` is pickled. Unknown public attributes are turned into
    callables that forward to ``get_compiled_method_return`` of the attached
    subprocess object.
    """

    def __init__(
        self,
        subprocess: _InferenceStateProcess,
        access: DirectObjectAccess,
        id_: int,
    ) -> None:
        self.id = id_
        self._subprocess = subprocess
        self.access = access

    def add_subprocess(self, subprocess):
        """Attach the object that requests are forwarded to."""
        self._subprocess = subprocess

    def __repr__(self):
        try:
            described = self.access
        except AttributeError:
            # There is no `access` attribute after unpickling, only the id.
            described = '#' + str(self.id)
        return '<%s of %s>' % (self.__class__.__name__, described)

    def __getstate__(self):
        # The id is the only thing that is pickled.
        return self.id

    def __setstate__(self, state):
        self.id = state

    def __getattr__(self, name):
        is_internal = name.startswith('_') or name in ('id', 'access')
        if is_internal:
            raise AttributeError("Something went wrong with unpickling")

        return partial(self._workaround, name)

    def _workaround(self, name, *args, **kwargs):
        """
        TODO Currently we're passing slice objects around. This should not
        happen. They are also the only unhashable objects that we're passing
        around.
        """
        leading_slice = bool(args) and isinstance(args[0], slice)
        if not leading_slice:
            return self._cached_results(name, *args, **kwargs)
        # Calls with a leading slice bypass the memoized path.
        return self._subprocess.get_compiled_method_return(self.id, name, *args, **kwargs)

    @memoize_method
    def _cached_results(self, name, *args, **kwargs):
        remote_call = self._subprocess.get_compiled_method_return
        return remote_call(self.id, name, *args, **kwargs)