Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/jedi/inference/compiled/subprocess/__init__.py: 26%
233 statements
coverage.py v7.4.4, created at 2024-04-20 06:09 +0000

1"""
2Makes it possible to do the compiled analysis in a subprocess. This has two
3goals:
51. Making it safer - Segfaults and RuntimeErrors as well as stdout/stderr can
6 be ignored and dealt with.
72. Make it possible to handle different Python versions as well as virtualenvs.
8"""
import collections
import os
import sys
import queue
import subprocess
import traceback
import weakref
from functools import partial
from threading import Thread

from jedi._compatibility import pickle_dump, pickle_load
from jedi import debug
from jedi.cache import memoize_method
from jedi.inference.compiled.subprocess import functions
from jedi.inference.compiled.access import DirectObjectAccess, AccessPath, \
    SignatureParam
from jedi.api.exceptions import InternalError

_MAIN_PATH = os.path.join(os.path.dirname(__file__), '__main__.py')
PICKLE_PROTOCOL = 4

def _GeneralizedPopen(*args, **kwargs):
    if os.name == 'nt':
        try:
            # Was introduced in Python 3.7.
            CREATE_NO_WINDOW = subprocess.CREATE_NO_WINDOW
        except AttributeError:
            CREATE_NO_WINDOW = 0x08000000
        # Avoid flashing up a console window for the child process.
        kwargs['creationflags'] = CREATE_NO_WINDOW
    # The child process doesn't need file descriptors except 0, 1, 2.
    # This is unix only.
    kwargs['close_fds'] = 'posix' in sys.builtin_module_names

    return subprocess.Popen(*args, **kwargs)

def _enqueue_output(out, queue_):
    # Forward every line of the given stream into a queue, so the stream can be
    # consumed from another thread without blocking.
    for line in iter(out.readline, b''):
        queue_.put(line)

def _add_stderr_to_debug(stderr_queue):
    while True:
        # Try to do some error reporting from the subprocess and print its
        # stderr contents.
        try:
            line = stderr_queue.get_nowait()
            line = line.decode('utf-8', 'replace')
            debug.warning('stderr output: %s' % line.rstrip('\n'))
        except queue.Empty:
            break

def _get_function(name):
    return getattr(functions, name)

def _cleanup_process(process, thread):
    try:
        process.kill()
        process.wait()
    except OSError:
        # Raised if the process is already killed.
        pass
    thread.join()
    for stream in [process.stdin, process.stdout, process.stderr]:
        try:
            stream.close()
        except OSError:
            # Raised if the stream is broken.
            pass

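# Shared base class for the two "process" flavours below. It keeps a weak
# reference to its InferenceState plus a cache of AccessHandle objects keyed by
# the id() of the wrapped compiled object, so repeated lookups reuse one handle.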
class _InferenceStateProcess:
    def __init__(self, inference_state):
        self._inference_state_weakref = weakref.ref(inference_state)
        self._inference_state_id = id(inference_state)
        self._handles = {}

    def get_or_create_access_handle(self, obj):
        id_ = id(obj)
        try:
            return self.get_access_handle(id_)
        except KeyError:
            access = DirectObjectAccess(self._inference_state_weakref(), obj)
            handle = AccessHandle(self, access, id_)
            self.set_access_handle(handle)
            return handle

    def get_access_handle(self, id_):
        return self._handles[id_]

    def set_access_handle(self, handle):
        self._handles[handle.id] = handle

class InferenceStateSameProcess(_InferenceStateProcess):
    """
    Basically just easy access to functions.py. It has the same API as
    InferenceStateSubprocess and does the same thing without using a
    subprocess. This is necessary for the Interpreter process.
    """
    def __getattr__(self, name):
        return partial(_get_function(name), self._inference_state_weakref())

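# The class below is the subprocess counterpart of InferenceStateSameProcess:
# unknown attribute names are still resolved to the helpers in functions.py,
# but each call is pickled over to the external interpreter, executed there,
# and its result converted back through _convert_access_handles().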
class InferenceStateSubprocess(_InferenceStateProcess):
    def __init__(self, inference_state, compiled_subprocess):
        super().__init__(inference_state)
        self._used = False
        self._compiled_subprocess = compiled_subprocess

    def __getattr__(self, name):
        func = _get_function(name)

        def wrapper(*args, **kwargs):
            self._used = True

            result = self._compiled_subprocess.run(
                self._inference_state_weakref(),
                func,
                args=args,
                kwargs=kwargs,
            )
            # IMO it should be possible to create a hook in pickle.load to
            # mess with the loaded objects. However, it's extremely complicated
            # to work around this, so just do it with this call. ~ dave
            return self._convert_access_handles(result)

        return wrapper

    def _convert_access_handles(self, obj):
        if isinstance(obj, SignatureParam):
            return SignatureParam(*self._convert_access_handles(tuple(obj)))
        elif isinstance(obj, tuple):
            return tuple(self._convert_access_handles(o) for o in obj)
        elif isinstance(obj, list):
            return [self._convert_access_handles(o) for o in obj]
        elif isinstance(obj, AccessHandle):
            try:
                # Rewrite the access handle to one we already have.
                obj = self.get_access_handle(obj.id)
            except KeyError:
                obj.add_subprocess(self)
                self.set_access_handle(obj)
        elif isinstance(obj, AccessPath):
            return AccessPath(self._convert_access_handles(obj.accesses))
        return obj

    def __del__(self):
        if self._used and not self._compiled_subprocess.is_crashed:
            self._compiled_subprocess.delete_inference_state(self._inference_state_id)

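# CompiledSubprocess owns the external interpreter: _get_process() lazily
# spawns `<executable> __main__.py <parso dir> <python version>` with pipes for
# stdin/stdout/stderr, while run()/_send() push pickled requests down stdin and
# read pickled replies from stdout. stderr is drained by a daemon thread and
# forwarded to jedi's debug output.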
class CompiledSubprocess:
    is_crashed = False

    def __init__(self, executable, env_vars=None):
        self._executable = executable
        self._env_vars = env_vars
        self._inference_state_deletion_queue = collections.deque()
        self._cleanup_callable = lambda: None

    def __repr__(self):
        pid = os.getpid()
        return '<%s _executable=%r, is_crashed=%r, pid=%r>' % (
            self.__class__.__name__,
            self._executable,
            self.is_crashed,
            pid,
        )

    @memoize_method
    def _get_process(self):
        debug.dbg('Start environment subprocess %s', self._executable)
        parso_path = sys.modules['parso'].__file__
        args = (
            self._executable,
            _MAIN_PATH,
            os.path.dirname(os.path.dirname(parso_path)),
            '.'.join(str(x) for x in sys.version_info[:3]),
        )
        process = _GeneralizedPopen(
            args,
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            env=self._env_vars
        )
        self._stderr_queue = queue.Queue()
        self._stderr_thread = t = Thread(
            target=_enqueue_output,
            args=(process.stderr, self._stderr_queue)
        )
        t.daemon = True
        t.start()
        # Ensure the subprocess is properly cleaned up when the object
        # is garbage collected.
        self._cleanup_callable = weakref.finalize(self,
                                                  _cleanup_process,
                                                  process,
                                                  t)
        return process

    def run(self, inference_state, function, args=(), kwargs={}):
        # Delete old inference_states.
        while True:
            try:
                inference_state_id = self._inference_state_deletion_queue.pop()
            except IndexError:
                break
            else:
                self._send(inference_state_id, None)

        assert callable(function)
        return self._send(id(inference_state), function, args, kwargs)

    def get_sys_path(self):
        return self._send(None, functions.get_sys_path, (), {})

    def _kill(self):
        self.is_crashed = True
        self._cleanup_callable()

    def _send(self, inference_state_id, function, args=(), kwargs={}):
        if self.is_crashed:
            raise InternalError("The subprocess %s has crashed." % self._executable)

        data = inference_state_id, function, args, kwargs
        try:
            pickle_dump(data, self._get_process().stdin, PICKLE_PROTOCOL)
        except BrokenPipeError:
            self._kill()
            raise InternalError("The subprocess %s was killed. Maybe out of memory?"
                                % self._executable)

        try:
            is_exception, traceback, result = pickle_load(self._get_process().stdout)
        except EOFError as eof_error:
            try:
                stderr = self._get_process().stderr.read().decode('utf-8', 'replace')
            except Exception as exc:
                stderr = '<empty/not available (%r)>' % exc
            self._kill()
            _add_stderr_to_debug(self._stderr_queue)
            raise InternalError(
                "The subprocess %s has crashed (%r, stderr=%s)." % (
                    self._executable,
                    eof_error,
                    stderr,
                ))

        _add_stderr_to_debug(self._stderr_queue)

        if is_exception:
            # Replace the exception's message with the traceback. It's way
            # more informative.
            result.args = (traceback,)
            raise result
        return result

    def delete_inference_state(self, inference_state_id):
        """
        Currently we are not deleting inference_states instantly. They only get
        deleted once the subprocess is used again. It would probably be a
        better solution to move all of this into a thread. However, the memory
        usage of a single inference_state shouldn't be that high.
        """
        # With an argument - the inference_state gets deleted.
        self._inference_state_deletion_queue.append(inference_state_id)

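# Rough sketch of the pipe protocol (derived from _send() above and
# Listener.listen() below):
#
#   parent -> child (stdin):   pickle of (inference_state_id, function, args, kwargs)
#       inference_state_id is None -> run `function` without an InferenceState
#       function is None           -> drop the InferenceState with that id
#   child -> parent (stdout):  pickle of (is_exception, traceback, result)
#       is_exception is True       -> `result` is re-raised in the parent with
#                                     the formatted traceback as its message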
class Listener:
    def __init__(self):
        self._inference_states = {}
        # TODO refactor so we don't need to process anymore just handle
        # controlling.
        self._process = _InferenceStateProcess(Listener)

    def _get_inference_state(self, function, inference_state_id):
        from jedi.inference import InferenceState

        try:
            inference_state = self._inference_states[inference_state_id]
        except KeyError:
            from jedi import InterpreterEnvironment
            inference_state = InferenceState(
                # The project is not actually needed. Nothing should need to
                # access it.
                project=None,
                environment=InterpreterEnvironment()
            )
            self._inference_states[inference_state_id] = inference_state
        return inference_state

    def _run(self, inference_state_id, function, args, kwargs):
        if inference_state_id is None:
            return function(*args, **kwargs)
        elif function is None:
            del self._inference_states[inference_state_id]
        else:
            inference_state = self._get_inference_state(function, inference_state_id)

            # Exchange all handles
            args = list(args)
            for i, arg in enumerate(args):
                if isinstance(arg, AccessHandle):
                    args[i] = inference_state.compiled_subprocess.get_access_handle(arg.id)
            for key, value in kwargs.items():
                if isinstance(value, AccessHandle):
                    kwargs[key] = inference_state.compiled_subprocess.get_access_handle(value.id)

            return function(inference_state, *args, **kwargs)

    def listen(self):
        stdout = sys.stdout
        # Mute stdout. Nobody should actually be able to write to it,
        # because stdout is used for IPC.
        sys.stdout = open(os.devnull, 'w')
        stdin = sys.stdin
        stdout = stdout.buffer
        stdin = stdin.buffer

        while True:
            try:
                payload = pickle_load(stdin)
            except EOFError:
                # It looks like the parent process closed.
                # Don't make a big fuss here and just exit.
                exit(0)
            try:
                result = False, None, self._run(*payload)
            except Exception as e:
                result = True, traceback.format_exc(), e

            pickle_dump(result, stdout, PICKLE_PROTOCOL)

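# An AccessHandle is a lightweight proxy for a DirectObjectAccess living in the
# other process. Only its id survives pickling (__getstate__/__setstate__), so
# the receiving side re-associates it with its own cached handle; attribute
# access turns into get_compiled_method_return() calls on the owning
# subprocess, memoized per handle except for unhashable slice arguments.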
class AccessHandle:
    def __init__(self, subprocess, access, id_):
        self.access = access
        self._subprocess = subprocess
        self.id = id_

    def add_subprocess(self, subprocess):
        self._subprocess = subprocess

    def __repr__(self):
        try:
            detail = self.access
        except AttributeError:
            detail = '#' + str(self.id)
        return '<%s of %s>' % (self.__class__.__name__, detail)

    def __getstate__(self):
        return self.id

    def __setstate__(self, state):
        self.id = state

    def __getattr__(self, name):
        if name in ('id', 'access') or name.startswith('_'):
            raise AttributeError("Something went wrong with unpickling")

        # print('getattr', name, file=sys.stderr)
        return partial(self._workaround, name)

    def _workaround(self, name, *args, **kwargs):
        """
        TODO Currently we're passing slice objects around. This should not
        happen. They are also the only unhashable objects that we're passing
        around.
        """
        if args and isinstance(args[0], slice):
            return self._subprocess.get_compiled_method_return(self.id, name, *args, **kwargs)
        return self._cached_results(name, *args, **kwargs)

    @memoize_method
    def _cached_results(self, name, *args, **kwargs):
        return self._subprocess.get_compiled_method_return(self.id, name, *args, **kwargs)