1"""
2Makes it possible to do the compiled analysis in a subprocess. This has two
3goals:
4
51. Making it safer - Segfaults and RuntimeErrors as well as stdout/stderr can
6 be ignored and dealt with.
72. Make it possible to handle different Python versions as well as virtualenvs.
8
9The architecture here is briefly:
10 - For each Jedi `Environment` there is a corresponding subprocess which
11 operates within the target environment. If the subprocess dies it is replaced
12 at this level.
13 - `CompiledSubprocess` manages exactly one subprocess and handles communication
14 from the parent side.
15 - `Listener` runs within the subprocess, processing each request and yielding
16 results.
17 - `InterpreterEnvironment` provides an API which matches that of `Environment`,
18 but runs functionality inline rather than within a subprocess. It is thus
19 used both directly in places where a subprocess is unnecessary and/or
20 undesirable and also within subprocesses themselves.
21 - `InferenceStateSubprocess` (or `InferenceStateSameProcess`) provide high
22 level access to functionality within the subprocess from within the parent.
23 Each `InterpreterState` has an instance of one of these, provided by its
24 environment.
25"""
26
27import collections
28import os
29import sys
30import queue
31import subprocess
32import traceback
33import weakref
34from functools import partial
35from threading import Thread
36from typing import Dict, TYPE_CHECKING, Any
37
38from jedi._compatibility import pickle_dump, pickle_load
39from jedi import debug
40from jedi.cache import memoize_method
41from jedi.inference.compiled.subprocess import functions
42from jedi.inference.compiled.access import DirectObjectAccess, AccessPath, \
43 SignatureParam
44from jedi.api.exceptions import InternalError
45
46if TYPE_CHECKING:
47 from jedi.inference import InferenceState
48
49
50_MAIN_PATH = os.path.join(os.path.dirname(__file__), '__main__.py')
51PICKLE_PROTOCOL = 4
52
53
54def _GeneralizedPopen(*args, **kwargs):
55 if sys.platform == "win32":
56 try:
57 # Was introduced in Python 3.7.
58 CREATE_NO_WINDOW = subprocess.CREATE_NO_WINDOW
59 except AttributeError:
60 CREATE_NO_WINDOW = 0x08000000
61 kwargs['creationflags'] = CREATE_NO_WINDOW
62 # The child process doesn't need file descriptors except 0, 1, 2.
63 # This is unix only.
64 kwargs['close_fds'] = 'posix' in sys.builtin_module_names
65
66 return subprocess.Popen(*args, **kwargs)
67
68
69def _enqueue_output(out, queue_):
70 for line in iter(out.readline, b''):
71 queue_.put(line)
72
73
74def _add_stderr_to_debug(stderr_queue):
75 while True:
76 # Try to do some error reporting from the subprocess and print its
77 # stderr contents.
78 try:
79 line = stderr_queue.get_nowait()
80 line = line.decode('utf-8', 'replace')
81 debug.warning('stderr output: %s' % line.rstrip('\n'))
82 except queue.Empty:
83 break
84
85
86def _get_function(name):
87 return getattr(functions, name)
88
89
90def _cleanup_process(process, thread):
91 try:
92 process.kill()
93 process.wait()
94 except OSError:
95 # Raised if the process is already killed.
96 pass
97 thread.join()
98 for stream in [process.stdin, process.stdout, process.stderr]:
99 try:
100 stream.close()
101 except OSError:
102 # Raised if the stream is broken.
103 pass
104
105
106class _InferenceStateProcess:
107 get_compiled_method_return: Any
108
109 def __init__(self, inference_state: 'InferenceState') -> None:
110 self._inference_state_weakref = weakref.ref(inference_state)
111 self._handles: Dict[int, AccessHandle] = {}
112
113 def get_or_create_access_handle(self, obj):
114 id_ = id(obj)
115 try:
116 return self.get_access_handle(id_)
117 except KeyError:
118 access = DirectObjectAccess(self._inference_state_weakref(), obj)
119 handle = AccessHandle(self, access, id_)
120 self.set_access_handle(handle)
121 return handle
122
123 def get_access_handle(self, id_):
124 return self._handles[id_]
125
126 def set_access_handle(self, handle):
127 self._handles[handle.id] = handle
128
129
130class InferenceStateSameProcess(_InferenceStateProcess):
131 """
132 Basically just an easy access to functions.py. It has the same API
133 as InferenceStateSubprocess and does the same thing without using a subprocess.
134 This is necessary for the Interpreter process.
135 """
136 def __getattr__(self, name):
137 return partial(_get_function(name), self._inference_state_weakref())
138
139
140class InferenceStateSubprocess(_InferenceStateProcess):
141 """
142 API to functionality which will run in a subprocess.
143
144 This mediates the interaction between an `InferenceState` and the actual
145 execution of functionality running within a `CompiledSubprocess`. Available
146 functions are defined in `.functions`, though should be accessed via
147 attributes on this class of the same name.
148
149 This class is responsible for indicating that the `InferenceState` within
150 the subprocess can be removed once the corresponding instance in the parent
151 goes away.
152 """
153
154 def __init__(
155 self,
156 inference_state: 'InferenceState',
157 compiled_subprocess: 'CompiledSubprocess',
158 ) -> None:
159 super().__init__(inference_state)
160 self._used = False
161 self._compiled_subprocess = compiled_subprocess
162
163 # Opaque id we'll pass to the subprocess to identify the context (an
164 # `InferenceState`) which should be used for the request. This allows us
165 # to make subsequent requests which operate on results from previous
166 # ones, while keeping a single subprocess which can work with several
167 # contexts in the parent process. Once it is no longer needed(i.e: when
168 # this class goes away), we also use this id to indicate that the
169 # subprocess can discard the context.
170 #
171 # Note: this id is deliberately coupled to this class (and not to
172 # `InferenceState`) as this class manages access handle mappings which
173 # must correspond to those in the subprocess. This approach also avoids
174 # race conditions from successive `InferenceState`s with the same object
175 # id (as observed while adding support for Python 3.13).
176 #
177 # This value does not need to be the `id()` of this instance, we merely
178 # need to ensure that it enables the (visible) lifetime of the context
179 # within the subprocess to match that of this class. We therefore also
180 # depend on the semantics of `CompiledSubprocess.delete_inference_state`
181 # for correctness.
182 self._inference_state_id = id(self)
183
184 def __getattr__(self, name):
185 func = _get_function(name)
186
187 def wrapper(*args, **kwargs):
188 self._used = True
189
190 result = self._compiled_subprocess.run(
191 self._inference_state_id,
192 func,
193 args=args,
194 kwargs=kwargs,
195 )
196 # IMO it should be possible to create a hook in pickle.load to
197 # mess with the loaded objects. However it's extremely complicated
198 # to work around this so just do it with this call. ~ dave
199 return self._convert_access_handles(result)
200
201 return wrapper
202
203 def _convert_access_handles(self, obj):
204 if isinstance(obj, SignatureParam):
205 return SignatureParam(*self._convert_access_handles(tuple(obj)))
206 elif isinstance(obj, tuple):
207 return tuple(self._convert_access_handles(o) for o in obj)
208 elif isinstance(obj, list):
209 return [self._convert_access_handles(o) for o in obj]
210 elif isinstance(obj, AccessHandle):
211 try:
212 # Rewrite the access handle to one we're already having.
213 obj = self.get_access_handle(obj.id)
214 except KeyError:
215 obj.add_subprocess(self)
216 self.set_access_handle(obj)
217 elif isinstance(obj, AccessPath):
218 return AccessPath(self._convert_access_handles(obj.accesses))
219 return obj
220
221 def __del__(self):
222 if self._used and not self._compiled_subprocess.is_crashed:
223 self._compiled_subprocess.delete_inference_state(self._inference_state_id)
224
225
226class CompiledSubprocess:
227 """
228 A subprocess which runs inference within a target environment.
229
230 This class manages the interface to a single instance of such a process as
231 well as the lifecycle of the process itself. See `.__main__` and `Listener`
232 for the implementation of the subprocess and details of the protocol.
233
234 A single live instance of this is maintained by `jedi.api.environment.Environment`,
235 so that typically a single subprocess is used at a time.
236 """
237
238 is_crashed = False
239
240 def __init__(self, executable, env_vars=None):
241 self._executable = executable
242 self._env_vars = env_vars
243 self._inference_state_deletion_queue = collections.deque()
244 self._cleanup_callable = lambda: None
245
246 def __repr__(self):
247 pid = os.getpid()
248 return '<%s _executable=%r, is_crashed=%r, pid=%r>' % (
249 self.__class__.__name__,
250 self._executable,
251 self.is_crashed,
252 pid,
253 )
254
255 @memoize_method
256 def _get_process(self):
257 debug.dbg('Start environment subprocess %s', self._executable)
258 parso_path = sys.modules['parso'].__file__
259 args = (
260 self._executable,
261 _MAIN_PATH,
262 os.path.dirname(os.path.dirname(parso_path)),
263 '.'.join(str(x) for x in sys.version_info[:3]),
264 )
265 process = _GeneralizedPopen(
266 args,
267 stdin=subprocess.PIPE,
268 stdout=subprocess.PIPE,
269 stderr=subprocess.PIPE,
270 env=self._env_vars
271 )
272 self._stderr_queue = queue.Queue()
273 self._stderr_thread = t = Thread(
274 target=_enqueue_output,
275 args=(process.stderr, self._stderr_queue)
276 )
277 t.daemon = True
278 t.start()
279 # Ensure the subprocess is properly cleaned up when the object
280 # is garbage collected.
281 self._cleanup_callable = weakref.finalize(self,
282 _cleanup_process,
283 process,
284 t)
285 return process
286
287 def run(self, inference_state_id, function, args=(), kwargs={}):
288 # Delete old inference_states.
289 while True:
290 try:
291 delete_id = self._inference_state_deletion_queue.pop()
292 except IndexError:
293 break
294 else:
295 self._send(delete_id, None)
296
297 assert callable(function)
298 return self._send(inference_state_id, function, args, kwargs)
299
300 def get_sys_path(self):
301 return self._send(None, functions.get_sys_path, (), {})
302
303 def _kill(self):
304 self.is_crashed = True
305 self._cleanup_callable()
306
307 def _send(self, inference_state_id, function, args=(), kwargs={}):
308 if self.is_crashed:
309 raise InternalError("The subprocess %s has crashed." % self._executable)
310
311 data = inference_state_id, function, args, kwargs
312 try:
313 pickle_dump(data, self._get_process().stdin, PICKLE_PROTOCOL)
314 except BrokenPipeError:
315 self._kill()
316 raise InternalError("The subprocess %s was killed. Maybe out of memory?"
317 % self._executable)
318
319 try:
320 is_exception, traceback, result = pickle_load(self._get_process().stdout)
321 except EOFError as eof_error:
322 try:
323 stderr = self._get_process().stderr.read().decode('utf-8', 'replace')
324 except Exception as exc:
325 stderr = '<empty/not available (%r)>' % exc
326 self._kill()
327 _add_stderr_to_debug(self._stderr_queue)
328 raise InternalError(
329 "The subprocess %s has crashed (%r, stderr=%s)." % (
330 self._executable,
331 eof_error,
332 stderr,
333 ))
334
335 _add_stderr_to_debug(self._stderr_queue)
336
337 if is_exception:
338 # Replace the attribute error message with a the traceback. It's
339 # way more informative.
340 result.args = (traceback,)
341 raise result
342 return result
343
344 def delete_inference_state(self, inference_state_id):
345 """
346 Indicate that an inference state (in the subprocess) is no longer
347 needed.
348
349 The state corresponding to the given id will become inaccessible and the
350 id may safely be re-used to refer to a different context.
351
352 Note: it is not guaranteed that the corresponding state will actually be
353 deleted immediately.
354 """
355 # Warning: if changing the semantics of context deletion see the comment
356 # in `InferenceStateSubprocess.__init__` regarding potential race
357 # conditions.
358
359 # Currently we are not deleting the related state instantly. They only
360 # get deleted once the subprocess is used again. It would probably a
361 # better solution to move all of this into a thread. However, the memory
362 # usage of a single inference_state shouldn't be that high.
363 self._inference_state_deletion_queue.append(inference_state_id)
364
365
366class Listener:
367 """
368 Main loop for the subprocess which actually does the inference.
369
370 This class runs within the target environment. It listens to instructions
371 from the parent process, runs inference and returns the results.
372
373 The subprocess has a long lifetime and is expected to process several
374 requests, including for different `InferenceState` instances in the parent.
375 See `CompiledSubprocess` for the parent half of the system.
376
377 Communication is via pickled data sent serially over stdin and stdout.
378 Stderr is read only if the child process crashes.
379
380 The request protocol is a 4-tuple of:
381 * inference_state_id | None: an opaque identifier of the parent's
382 `InferenceState`. An `InferenceState` operating over an
383 `InterpreterEnvironment` is created within this process for each of
384 these, ensuring that each parent context has a corresponding context
385 here. This allows context to be persisted between requests. Unless
386 `None`, the local `InferenceState` will be passed to the given function
387 as the first positional argument.
388 * function | None: the function to run. This is expected to be a member of
389 `.functions`. `None` indicates that the corresponding inference state is
390 no longer needed and should be dropped.
391 * args: positional arguments to the `function`. If any of these are
392 `AccessHandle` instances they will be adapted to the local
393 `InferenceState` before being passed.
394 * kwargs: keyword arguments to the `function`. If any of these are
395 `AccessHandle` instances they will be adapted to the local
396 `InferenceState` before being passed.
397
398 The result protocol is a 3-tuple of either:
399 * (False, None, function result): if the function returns without error, or
400 * (True, traceback, exception): if the function raises an exception
401 """
402
403 def __init__(self):
404 self._inference_states = {}
405
406 def _get_inference_state(self, function, inference_state_id):
407 from jedi.inference import InferenceState
408
409 try:
410 inference_state = self._inference_states[inference_state_id]
411 except KeyError:
412 from jedi import InterpreterEnvironment
413 inference_state = InferenceState(
414 # The project is not actually needed. Nothing should need to
415 # access it.
416 project=None,
417 environment=InterpreterEnvironment()
418 )
419 self._inference_states[inference_state_id] = inference_state
420 return inference_state
421
422 def _run(self, inference_state_id, function, args, kwargs):
423 if inference_state_id is None:
424 return function(*args, **kwargs)
425 elif function is None:
426 # Warning: if changing the semantics of context deletion see the comment
427 # in `InferenceStateSubprocess.__init__` regarding potential race
428 # conditions.
429 del self._inference_states[inference_state_id]
430 else:
431 inference_state = self._get_inference_state(function, inference_state_id)
432
433 # Exchange all handles
434 args = list(args)
435 for i, arg in enumerate(args):
436 if isinstance(arg, AccessHandle):
437 args[i] = inference_state.compiled_subprocess.get_access_handle(arg.id)
438 for key, value in kwargs.items():
439 if isinstance(value, AccessHandle):
440 kwargs[key] = inference_state.compiled_subprocess.get_access_handle(value.id)
441
442 return function(inference_state, *args, **kwargs)
443
444 def listen(self):
445 stdout = sys.stdout
446 # Mute stdout. Nobody should actually be able to write to it,
447 # because stdout is used for IPC.
448 sys.stdout = open(os.devnull, 'w')
449 stdin = sys.stdin
450 stdout = stdout.buffer
451 stdin = stdin.buffer
452
453 while True:
454 try:
455 payload = pickle_load(stdin)
456 except EOFError:
457 # It looks like the parent process closed.
458 # Don't make a big fuss here and just exit.
459 exit(0)
460 try:
461 result = False, None, self._run(*payload)
462 except Exception as e:
463 result = True, traceback.format_exc(), e
464
465 pickle_dump(result, stdout, PICKLE_PROTOCOL)
466
467
468class AccessHandle:
469 def __init__(
470 self,
471 subprocess: _InferenceStateProcess,
472 access: DirectObjectAccess,
473 id_: int,
474 ) -> None:
475 self.access = access
476 self._subprocess = subprocess
477 self.id = id_
478
479 def add_subprocess(self, subprocess):
480 self._subprocess = subprocess
481
482 def __repr__(self):
483 try:
484 detail = self.access
485 except AttributeError:
486 detail = '#' + str(self.id)
487 return '<%s of %s>' % (self.__class__.__name__, detail)
488
489 def __getstate__(self):
490 return self.id
491
492 def __setstate__(self, state):
493 self.id = state
494
495 def __getattr__(self, name):
496 if name in ('id', 'access') or name.startswith('_'):
497 raise AttributeError("Something went wrong with unpickling")
498
499 # print('getattr', name, file=sys.stderr)
500 return partial(self._workaround, name)
501
502 def _workaround(self, name, *args, **kwargs):
503 """
504 TODO Currently we're passing slice objects around. This should not
505 happen. They are also the only unhashable objects that we're passing
506 around.
507 """
508 if args and isinstance(args[0], slice):
509 return self._subprocess.get_compiled_method_return(self.id, name, *args, **kwargs)
510 return self._cached_results(name, *args, **kwargs)
511
512 @memoize_method
513 def _cached_results(self, name, *args, **kwargs):
514 return self._subprocess.get_compiled_method_return(self.id, name, *args, **kwargs)