1# Licensed under the LGPL: https://www.gnu.org/licenses/old-licenses/lgpl-2.1.en.html
2# For details: https://github.com/pylint-dev/astroid/blob/main/LICENSE
3# Copyright (c) https://github.com/pylint-dev/astroid/blob/main/CONTRIBUTORS.txt
4
5import textwrap
6
7from astroid import nodes
8from astroid.brain.helpers import register_module_extender
9from astroid.builder import parse
10from astroid.const import PY310_PLUS, PY311_PLUS
11from astroid.manager import AstroidManager
12
13
def _subprocess_transform() -> nodes.Module:
    """Build the stub module used to extend astroid's view of ``subprocess``.

    The stub is written as Python source text and handed to ``parse``; it is
    only analysed, never executed.  It defines:

    * ``check_output`` with two return statements, one ``str`` and one
      ``bytes``;
    * ``Popen`` with its process attributes, the process-control methods,
      the context-manager methods, ``__class_getitem__`` and an ``__init__``
      whose parameter list depends on the running Python version.

    Returns:
        The module node produced by parsing the stub source.
    """
    # Value returned by the stubbed ``Popen.communicate``: a pair of bytes.
    communicate = (b"string", b"string")

    # Kept on a single line on purpose: substituting a multi-line value into
    # the template below would add lines with a different indentation and
    # change what ``textwrap.dedent`` strips.
    init_params = (
        "self, args, bufsize=-1, executable=None, stdin=None, stdout=None, "
        "stderr=None, preexec_fn=None, close_fds=True, shell=False, cwd=None, "
        "env=None, universal_newlines=None, startupinfo=None, creationflags=0, "
        "restore_signals=True, start_new_session=False, pass_fds=(), *, "
        "encoding=None, errors=None, text=None, user=None, group=None, "
        "extra_groups=None, umask=-1"
    )
    # Version-gated keyword parameters of ``Popen.__init__``.
    if PY310_PLUS:
        init_params += ", pipesize=-1"
    if PY311_PLUS:
        init_params += ", process_group=None"

    # The relative indentation inside this template is significant: it is the
    # block structure of the generated module.  Every non-blank line shares
    # the same eight-space margin, which ``textwrap.dedent`` removes.
    #
    # NOTE(review): ``file`` is not defined in the stub (and is not a
    # Python 3 builtin); presumably this keeps the stream attributes from
    # being inferred as a concrete value -- confirm before changing it.
    code = textwrap.dedent(
        f"""
        def check_output(
            args, *,
            stdin=None,
            stderr=None,
            shell=False,
            cwd=None,
            encoding=None,
            errors=None,
            universal_newlines=False,
            timeout=None,
            env=None,
            text=None,
            restore_signals=True,
            preexec_fn=None,
            pass_fds=(),
            input=None,
            bufsize=0,
            executable=None,
            close_fds=False,
            startupinfo=None,
            creationflags=0,
            start_new_session=False
        ):
            if universal_newlines:
                return ""
            return b""

        class Popen(object):
            returncode = pid = 0
            stdin = stdout = stderr = file()
            args = []

            def communicate(self, input=None, timeout=None):
                return {communicate!r}
            def wait(self, timeout=None):
                return self.returncode
            def poll(self):
                return self.returncode
            def send_signal(self, signal):
                pass
            def terminate(self):
                pass
            def kill(self):
                pass
            def __enter__(self): return self
            def __exit__(self, *args): pass
            @classmethod
            def __class_getitem__(cls, item):
                pass
            def __init__({init_params}):
                pass
        """
    )
    return parse(code)
100
101
def register(manager: AstroidManager) -> None:
    """Register the ``subprocess`` stub builder as a module extender on *manager*."""
    module_name = "subprocess"
    register_module_extender(manager, module_name, _subprocess_transform)