Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/jedi/inference/compiled/subprocess/__init__.py: 26%


239 statements  

1""" 

2Makes it possible to do the compiled analysis in a subprocess. This has two 

3goals: 

4 

51. Making it safer - Segfaults and RuntimeErrors as well as stdout/stderr can 

6 be ignored and dealt with. 

72. Make it possible to handle different Python versions as well as virtualenvs. 

8 

9The architecture here is briefly: 

10 - For each Jedi `Environment` there is a corresponding subprocess which 

11 operates within the target environment. If the subprocess dies it is replaced 

12 at this level. 

13 - `CompiledSubprocess` manages exactly one subprocess and handles communication 

14 from the parent side. 

15 - `Listener` runs within the subprocess, processing each request and yielding 

16 results. 

17 - `InterpreterEnvironment` provides an API which matches that of `Environment`, 

18 but runs functionality inline rather than within a subprocess. It is thus 

19 used both directly in places where a subprocess is unnecessary and/or 

20 undesirable and also within subprocesses themselves. 

21 - `InferenceStateSubprocess` (or `InferenceStateSameProcess`) provide high 

22 level access to functionality within the subprocess from within the parent. 

23 Each `InterpreterState` has an instance of one of these, provided by its 

24 environment. 

25""" 

26 

27import collections 

28import os 

29import sys 

30import queue 

31import subprocess 

32import traceback 

33import weakref 

34from functools import partial 

35from threading import Thread 

36from typing import Dict, TYPE_CHECKING 

37 

38from jedi._compatibility import pickle_dump, pickle_load 

39from jedi import debug 

40from jedi.cache import memoize_method 

41from jedi.inference.compiled.subprocess import functions 

42from jedi.inference.compiled.access import DirectObjectAccess, AccessPath, \ 

43 SignatureParam 

44from jedi.api.exceptions import InternalError 

45 

46if TYPE_CHECKING: 

47 from jedi.inference import InferenceState 

48 

49 

50_MAIN_PATH = os.path.join(os.path.dirname(__file__), '__main__.py') 

51PICKLE_PROTOCOL = 4 
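
# --- Illustration (not part of jedi) ----------------------------------------
# A minimal sketch of the framing used on the parent/child pipe: each request
# is one pickled 4-tuple and each reply one pickled 3-tuple, written
# back-to-back on the stream. This assumes `pickle_dump`/`pickle_load` from
# `jedi._compatibility` behave roughly like `pickle.dump` plus a flush and
# `pickle.load`. In the real protocol the second element is the function
# object itself (a module-level member of `.functions`); a string stands in
# for it here. See `Listener` below for the authoritative description.
def _sketch_wire_framing():
    import io
    import pickle

    request = (1234, 'function_name_stand_in', ('positional',), {'key': 'value'})
    reply = (False, None, 'the result')

    wire = io.BytesIO()
    pickle.dump(request, wire, protocol=PICKLE_PROTOCOL)
    pickle.dump(reply, wire, protocol=PICKLE_PROTOCOL)

    wire.seek(0)
    assert pickle.load(wire) == request
    assert pickle.load(wire) == reply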


def _GeneralizedPopen(*args, **kwargs):
    if os.name == 'nt':
        try:
            # Was introduced in Python 3.7.
            CREATE_NO_WINDOW = subprocess.CREATE_NO_WINDOW
        except AttributeError:
            CREATE_NO_WINDOW = 0x08000000
        kwargs['creationflags'] = CREATE_NO_WINDOW
    # The child process doesn't need file descriptors except 0, 1, 2.
    # This is unix only.
    kwargs['close_fds'] = 'posix' in sys.builtin_module_names

    return subprocess.Popen(*args, **kwargs)


def _enqueue_output(out, queue_):
    for line in iter(out.readline, b''):
        queue_.put(line)


def _add_stderr_to_debug(stderr_queue):
    while True:
        # Try to do some error reporting from the subprocess and print its
        # stderr contents.
        try:
            line = stderr_queue.get_nowait()
            line = line.decode('utf-8', 'replace')
            debug.warning('stderr output: %s' % line.rstrip('\n'))
        except queue.Empty:
            break
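
# --- Illustration (not part of jedi) ----------------------------------------
# How the two helpers above are combined (as `CompiledSubprocess._get_process`
# does further down): a daemon thread pumps the child's stderr into a queue so
# the parent can later drain whatever has arrived without ever blocking on a
# read. The `process` argument is assumed to be any `subprocess.Popen` created
# with `stderr=subprocess.PIPE`.
def _sketch_nonblocking_stderr(process):
    stderr_queue = queue.Queue()
    thread = Thread(target=_enqueue_output, args=(process.stderr, stderr_queue))
    thread.daemon = True
    thread.start()

    # Later, at any convenient point, forward what has arrived so far to the
    # debug log without blocking:
    _add_stderr_to_debug(stderr_queue)
    return stderr_queue, thread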


def _get_function(name):
    return getattr(functions, name)


def _cleanup_process(process, thread):
    try:
        process.kill()
        process.wait()
    except OSError:
        # Raised if the process is already killed.
        pass
    thread.join()
    for stream in [process.stdin, process.stdout, process.stderr]:
        try:
            stream.close()
        except OSError:
            # Raised if the stream is broken.
            pass


class _InferenceStateProcess:
    def __init__(self, inference_state: 'InferenceState') -> None:
        self._inference_state_weakref = weakref.ref(inference_state)
        self._handles: Dict[int, AccessHandle] = {}

    def get_or_create_access_handle(self, obj):
        id_ = id(obj)
        try:
            return self.get_access_handle(id_)
        except KeyError:
            access = DirectObjectAccess(self._inference_state_weakref(), obj)
            handle = AccessHandle(self, access, id_)
            self.set_access_handle(handle)
            return handle

    def get_access_handle(self, id_):
        return self._handles[id_]

    def set_access_handle(self, handle):
        self._handles[handle.id] = handle


class InferenceStateSameProcess(_InferenceStateProcess):
    """
    Provides easy access to functions.py. It has the same API as
    InferenceStateSubprocess but runs everything in the current process rather
    than in a subprocess. This is necessary for the Interpreter process.
    """
    def __getattr__(self, name):
        return partial(_get_function(name), self._inference_state_weakref())
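
# --- Illustration (not part of jedi) ----------------------------------------
# A minimal sketch of the attribute dispatch `InferenceStateSameProcess` uses
# above: looking up a name returns the same-named callable from a functions
# namespace, with the state already bound as the first argument. The namespace
# and `describe` function below are hypothetical stand-ins for `.functions`
# members.
def _sketch_same_process_dispatch():
    from types import SimpleNamespace

    fake_functions = SimpleNamespace(
        describe=lambda state, x: (state, x),
    )

    class SameProcessSketch:
        def __init__(self, state):
            self._state = state

        def __getattr__(self, name):
            # Only called for missing attributes, exactly like the real class.
            return partial(getattr(fake_functions, name), self._state)

    proxy = SameProcessSketch('fake-inference-state')
    assert proxy.describe(42) == ('fake-inference-state', 42)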


class InferenceStateSubprocess(_InferenceStateProcess):
    """
    API to functionality which will run in a subprocess.

    This mediates the interaction between an `InferenceState` and the actual
    execution of functionality running within a `CompiledSubprocess`. Available
    functions are defined in `.functions`, though they should be accessed via
    same-named attributes on this class.

    This class is responsible for indicating that the `InferenceState` within
    the subprocess can be removed once the corresponding instance in the parent
    goes away.
    """

    def __init__(
        self,
        inference_state: 'InferenceState',
        compiled_subprocess: 'CompiledSubprocess',
    ) -> None:
        super().__init__(inference_state)
        self._used = False
        self._compiled_subprocess = compiled_subprocess

        # Opaque id we'll pass to the subprocess to identify the context (an
        # `InferenceState`) which should be used for the request. This allows
        # us to make subsequent requests which operate on results from previous
        # ones, while keeping a single subprocess which can work with several
        # contexts in the parent process. Once it is no longer needed (i.e.
        # when this class goes away), we also use this id to indicate that the
        # subprocess can discard the context.
        #
        # Note: this id is deliberately coupled to this class (and not to
        # `InferenceState`) as this class manages access handle mappings which
        # must correspond to those in the subprocess. This approach also avoids
        # race conditions from successive `InferenceState`s with the same
        # object id (as observed while adding support for Python 3.13).
        #
        # This value does not need to be the `id()` of this instance; we merely
        # need to ensure that the (visible) lifetime of the context within the
        # subprocess matches that of this class. We therefore also depend on
        # the semantics of `CompiledSubprocess.delete_inference_state` for
        # correctness.
        self._inference_state_id = id(self)

    def __getattr__(self, name):
        func = _get_function(name)

        def wrapper(*args, **kwargs):
            self._used = True

            result = self._compiled_subprocess.run(
                self._inference_state_id,
                func,
                args=args,
                kwargs=kwargs,
            )
            # It should arguably be possible to hook into pickle.load to adapt
            # the loaded objects directly, but that is complicated enough that
            # we simply convert them with this call instead. ~ dave
            return self._convert_access_handles(result)

        return wrapper

    def _convert_access_handles(self, obj):
        if isinstance(obj, SignatureParam):
            return SignatureParam(*self._convert_access_handles(tuple(obj)))
        elif isinstance(obj, tuple):
            return tuple(self._convert_access_handles(o) for o in obj)
        elif isinstance(obj, list):
            return [self._convert_access_handles(o) for o in obj]
        elif isinstance(obj, AccessHandle):
            try:
                # Rewrite the access handle to one we already have.
                obj = self.get_access_handle(obj.id)
            except KeyError:
                obj.add_subprocess(self)
                self.set_access_handle(obj)
        elif isinstance(obj, AccessPath):
            return AccessPath(self._convert_access_handles(obj.accesses))
        return obj

    def __del__(self):
        if self._used and not self._compiled_subprocess.is_crashed:
            self._compiled_subprocess.delete_inference_state(self._inference_state_id)
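
# --- Illustration (not part of jedi) ----------------------------------------
# A minimal sketch of the lifecycle arrangement described in
# `InferenceStateSubprocess.__init__` above: the proxy owns an opaque context
# id (its own `id()`), tells the backend to drop that context only if it was
# ever used, and never talks to a crashed backend. All names below are
# hypothetical stand-ins; the deletion timing relies on CPython's reference
# counting.
def _sketch_context_lifecycle():
    class FakeBackend:
        is_crashed = False

        def __init__(self):
            self.deleted = []

        def delete_context(self, context_id):
            self.deleted.append(context_id)

    class ContextProxy:
        def __init__(self, backend):
            self._backend = backend
            self._context_id = id(self)
            self._used = False

        def use(self):
            self._used = True

        def __del__(self):
            if self._used and not self._backend.is_crashed:
                self._backend.delete_context(self._context_id)

    backend = FakeBackend()
    proxy = ContextProxy(backend)
    proxy.use()
    context_id = proxy._context_id
    del proxy  # In CPython this drops the last reference and runs __del__.
    assert backend.deleted == [context_id]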


class CompiledSubprocess:
    """
    A subprocess which runs inference within a target environment.

    This class manages the interface to a single instance of such a process as
    well as the lifecycle of the process itself. See `.__main__` and `Listener`
    for the implementation of the subprocess and details of the protocol.

    A single live instance of this is maintained by
    `jedi.api.environment.Environment`, so that typically only one subprocess
    is in use at a time.
    """

    is_crashed = False

    def __init__(self, executable, env_vars=None):
        self._executable = executable
        self._env_vars = env_vars
        self._inference_state_deletion_queue = collections.deque()
        self._cleanup_callable = lambda: None

    def __repr__(self):
        pid = os.getpid()
        return '<%s _executable=%r, is_crashed=%r, pid=%r>' % (
            self.__class__.__name__,
            self._executable,
            self.is_crashed,
            pid,
        )

    @memoize_method
    def _get_process(self):
        debug.dbg('Start environment subprocess %s', self._executable)
        parso_path = sys.modules['parso'].__file__
        args = (
            self._executable,
            _MAIN_PATH,
            os.path.dirname(os.path.dirname(parso_path)),
            '.'.join(str(x) for x in sys.version_info[:3]),
        )
        process = _GeneralizedPopen(
            args,
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            env=self._env_vars
        )
        self._stderr_queue = queue.Queue()
        self._stderr_thread = t = Thread(
            target=_enqueue_output,
            args=(process.stderr, self._stderr_queue)
        )
        t.daemon = True
        t.start()
        # Ensure the subprocess is properly cleaned up when the object
        # is garbage collected.
        self._cleanup_callable = weakref.finalize(
            self,
            _cleanup_process,
            process,
            t,
        )
        return process

    def run(self, inference_state_id, function, args=(), kwargs={}):
        # Delete old inference_states.
        while True:
            try:
                delete_id = self._inference_state_deletion_queue.pop()
            except IndexError:
                break
            else:
                self._send(delete_id, None)

        assert callable(function)
        return self._send(inference_state_id, function, args, kwargs)

    def get_sys_path(self):
        return self._send(None, functions.get_sys_path, (), {})

    def _kill(self):
        self.is_crashed = True
        self._cleanup_callable()

    def _send(self, inference_state_id, function, args=(), kwargs={}):
        if self.is_crashed:
            raise InternalError("The subprocess %s has crashed." % self._executable)

        data = inference_state_id, function, args, kwargs
        try:
            pickle_dump(data, self._get_process().stdin, PICKLE_PROTOCOL)
        except BrokenPipeError:
            self._kill()
            raise InternalError("The subprocess %s was killed. Maybe out of memory?"
                                % self._executable)

        try:
            is_exception, traceback, result = pickle_load(self._get_process().stdout)
        except EOFError as eof_error:
            try:
                stderr = self._get_process().stderr.read().decode('utf-8', 'replace')
            except Exception as exc:
                stderr = '<empty/not available (%r)>' % exc
            self._kill()
            _add_stderr_to_debug(self._stderr_queue)
            raise InternalError(
                "The subprocess %s has crashed (%r, stderr=%s)." % (
                    self._executable,
                    eof_error,
                    stderr,
                ))

        _add_stderr_to_debug(self._stderr_queue)

        if is_exception:
            # Replace the exception's message with the traceback; it is far
            # more informative.
            result.args = (traceback,)
            raise result
        return result

    def delete_inference_state(self, inference_state_id):
        """
        Indicate that an inference state (in the subprocess) is no longer
        needed.

        The state corresponding to the given id will become inaccessible and
        the id may safely be re-used to refer to a different context.

        Note: it is not guaranteed that the corresponding state will actually
        be deleted immediately.
        """
        # Warning: if changing the semantics of context deletion, see the
        # comment in `InferenceStateSubprocess.__init__` regarding potential
        # race conditions.

        # Related states are not deleted instantly; they only get deleted once
        # the subprocess is used again. It would probably be a better solution
        # to move all of this into a thread. However, the memory usage of a
        # single inference_state shouldn't be that high.
        self._inference_state_deletion_queue.append(inference_state_id)
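
# --- Illustration (not part of jedi) ----------------------------------------
# A minimal sketch of the deferred-deletion behaviour of `CompiledSubprocess`
# above: `delete_inference_state` only queues an id, and the queue is drained
# as `(id, None)` messages the next time `run` talks to the subprocess. The
# recording `send` below is a hypothetical stand-in for `_send`.
def _sketch_deferred_deletion():
    sent = []
    deletion_queue = collections.deque()

    def send(inference_state_id, function):
        sent.append((inference_state_id, function))

    def delete_inference_state(inference_state_id):
        deletion_queue.append(inference_state_id)

    def run(inference_state_id, function):
        while True:
            try:
                delete_id = deletion_queue.pop()
            except IndexError:
                break
            else:
                send(delete_id, None)
        send(inference_state_id, function)

    delete_inference_state(1111)    # nothing is sent yet
    run(2222, 'function_stand_in')  # the queued deletion goes out first
    assert sent == [(1111, None), (2222, 'function_stand_in')]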


class Listener:
    """
    Main loop for the subprocess which actually does the inference.

    This class runs within the target environment. It listens for instructions
    from the parent process, runs inference and returns the results.

    The subprocess has a long lifetime and is expected to process several
    requests, including for different `InferenceState` instances in the parent.
    See `CompiledSubprocess` for the parent half of the system.

    Communication is via pickled data sent serially over stdin and stdout.
    Stderr is read only if the child process crashes.

    The request protocol is a 4-tuple of:
    * inference_state_id | None: an opaque identifier of the parent's
      `InferenceState`. An `InferenceState` operating over an
      `InterpreterEnvironment` is created within this process for each of
      these, ensuring that each parent context has a corresponding context
      here. This allows context to be persisted between requests. Unless
      `None`, the local `InferenceState` will be passed to the given function
      as the first positional argument.
    * function | None: the function to run. This is expected to be a member of
      `.functions`. `None` indicates that the corresponding inference state is
      no longer needed and should be dropped.
    * args: positional arguments to the `function`. If any of these are
      `AccessHandle` instances they will be adapted to the local
      `InferenceState` before being passed.
    * kwargs: keyword arguments to the `function`. If any of these are
      `AccessHandle` instances they will be adapted to the local
      `InferenceState` before being passed.

    The result protocol is a 3-tuple of either:
    * (False, None, function result): if the function returns without error, or
    * (True, traceback, exception): if the function raises an exception.

    A simplified sketch of these dispatch rules follows this class definition.
    """

    def __init__(self):
        self._inference_states = {}

    def _get_inference_state(self, function, inference_state_id):
        from jedi.inference import InferenceState

        try:
            inference_state = self._inference_states[inference_state_id]
        except KeyError:
            from jedi import InterpreterEnvironment
            inference_state = InferenceState(
                # The project is not actually needed. Nothing should need to
                # access it.
                project=None,
                environment=InterpreterEnvironment()
            )
            self._inference_states[inference_state_id] = inference_state
        return inference_state

    def _run(self, inference_state_id, function, args, kwargs):
        if inference_state_id is None:
            return function(*args, **kwargs)
        elif function is None:
            # Warning: if changing the semantics of context deletion, see the
            # comment in `InferenceStateSubprocess.__init__` regarding
            # potential race conditions.
            del self._inference_states[inference_state_id]
        else:
            inference_state = self._get_inference_state(function, inference_state_id)

            # Exchange all handles.
            args = list(args)
            for i, arg in enumerate(args):
                if isinstance(arg, AccessHandle):
                    args[i] = inference_state.compiled_subprocess.get_access_handle(arg.id)
            for key, value in kwargs.items():
                if isinstance(value, AccessHandle):
                    kwargs[key] = inference_state.compiled_subprocess.get_access_handle(value.id)

            return function(inference_state, *args, **kwargs)

    def listen(self):
        stdout = sys.stdout
        # Mute stdout. Nobody should actually be able to write to it,
        # because it is used for IPC.
        sys.stdout = open(os.devnull, 'w')
        stdin = sys.stdin
        stdout = stdout.buffer
        stdin = stdin.buffer

        while True:
            try:
                payload = pickle_load(stdin)
            except EOFError:
                # It looks like the parent process closed;
                # don't make a big fuss here and just exit.
                exit(0)
            try:
                result = False, None, self._run(*payload)
            except Exception as e:
                result = True, traceback.format_exc(), e

            pickle_dump(result, stdout, PICKLE_PROTOCOL)
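
# --- Illustration (not part of jedi) ----------------------------------------
# A simplified sketch of the dispatch rules documented on `Listener` above (and
# implemented by `Listener._run`): a request without a state id calls the
# function directly, a `None` function drops the state for that id, and
# anything else gets the per-id state as the first positional argument. The
# dispatcher and state values below are hypothetical stand-ins.
def _sketch_listener_dispatch():
    states = {}

    def dispatch(inference_state_id, function, args, kwargs):
        if inference_state_id is None:
            return function(*args, **kwargs)
        elif function is None:
            del states[inference_state_id]
        else:
            state = states.setdefault(inference_state_id,
                                      'state-%d' % inference_state_id)
            return function(state, *args, **kwargs)

    # Stateless request (this is how `CompiledSubprocess.get_sys_path` works).
    assert dispatch(None, lambda: 'paths', (), {}) == 'paths'
    # Stateful request: the per-id state is prepended to the arguments.
    assert dispatch(7, lambda state, x: (state, x), (1,), {}) == ('state-7', 1)
    # Deletion request: a `None` function drops the state for that id.
    dispatch(7, None, (), {})
    assert 7 not in states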


class AccessHandle:
    def __init__(
        self,
        subprocess: _InferenceStateProcess,
        access: DirectObjectAccess,
        id_: int,
    ) -> None:
        self.access = access
        self._subprocess = subprocess
        self.id = id_

    def add_subprocess(self, subprocess):
        self._subprocess = subprocess

    def __repr__(self):
        try:
            detail = self.access
        except AttributeError:
            detail = '#' + str(self.id)
        return '<%s of %s>' % (self.__class__.__name__, detail)

    def __getstate__(self):
        return self.id

    def __setstate__(self, state):
        self.id = state

    def __getattr__(self, name):
        if name in ('id', 'access') or name.startswith('_'):
            raise AttributeError("Something went wrong with unpickling")

        # print('getattr', name, file=sys.stderr)
        return partial(self._workaround, name)

    def _workaround(self, name, *args, **kwargs):
        """
        TODO Currently we're passing slice objects around. This should not
        happen. They are also the only unhashable objects that we're passing
        around.
        """
        if args and isinstance(args[0], slice):
            return self._subprocess.get_compiled_method_return(self.id, name, *args, **kwargs)
        return self._cached_results(name, *args, **kwargs)

    @memoize_method
    def _cached_results(self, name, *args, **kwargs):
        return self._subprocess.get_compiled_method_return(self.id, name, *args, **kwargs)
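
# --- Illustration (not part of jedi) ----------------------------------------
# A minimal sketch of the pickling trick used by `AccessHandle` above: only the
# integer id crosses the process boundary (via `__getstate__`/`__setstate__`),
# so the receiving side gets a handle without an `access` attribute and has to
# resolve the id against its own handle table. `_HandleSketch` and the function
# below are hypothetical stand-ins, defined at module level so that pickle can
# locate the class.
class _HandleSketch:
    def __init__(self, payload, id_):
        self.payload = payload  # Stands in for `access`; it is never pickled.
        self.id = id_

    def __getstate__(self):
        return self.id

    def __setstate__(self, state):
        self.id = state


def _sketch_handle_pickling():
    import pickle

    original = _HandleSketch(payload=object(), id_=42)
    restored = pickle.loads(pickle.dumps(original, protocol=PICKLE_PROTOCOL))

    assert restored.id == 42
    assert not hasattr(restored, 'payload')  # Only the id survived the trip.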