Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.10/site-packages/autoflake.py: 39%
Shortcuts on this page:
r, m, x — toggle line displays
j, k — next / previous highlighted chunk
0 (zero) — top of page
1 (one) — first highlighted chunk
1#!/usr/bin/env python
2# Copyright (C) Steven Myint
3#
4# Permission is hereby granted, free of charge, to any person obtaining
5# a copy of this software and associated documentation files (the
6# "Software"), to deal in the Software without restriction, including
7# without limitation the rights to use, copy, modify, merge, publish,
8# distribute, sublicense, and/or sell copies of the Software, and to
9# permit persons to whom the Software is furnished to do so, subject to
10# the following conditions:
11#
12# The above copyright notice and this permission notice shall be included
13# in all copies or substantial portions of the Software.
14#
15# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
16# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
17# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
18# IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
19# CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
20# TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
21# SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
22"""Removes unused imports and unused variables as reported by pyflakes."""
23from __future__ import annotations
25import ast
26import collections
27import difflib
28import fnmatch
29import io
30import logging
31import os
32import pathlib
33import re
34import signal
35import string
36import sys
37import sysconfig
38import tokenize
39from collections.abc import Iterable
40from collections.abc import Mapping
41from collections.abc import MutableMapping
42from collections.abc import Sequence
43from typing import Any
44from typing import Callable
45from typing import cast
46from typing import IO
48import pyflakes.api
49import pyflakes.messages
50import pyflakes.reporter
# Package version reported by the command-line interface.
__version__ = "2.3.1"

# Module-level logger; propagation is disabled so messages only reach
# handlers explicitly attached to this logger.
_LOGGER = logging.getLogger("autoflake")
_LOGGER.propagate = False

# Token types that can start a standalone statement "atom".
ATOMS = frozenset([tokenize.NAME, tokenize.NUMBER, tokenize.STRING])

# Matches an "except ... as name:" handler line.
EXCEPT_REGEX = re.compile(r"^\s*except [\s,()\w]+ as \w+:$")
# Matches a python/python3 shebang line.
PYTHON_SHEBANG_REGEX = re.compile(r"^#!.*\bpython[3]?\b\s*$")

# Read at most this many bytes when sniffing whether a file is Python.
MAX_PYTHON_FILE_DETECTION_BYTES = 1024

# Files containing an "# autoflake: skip_file" comment are left untouched.
IGNORE_COMMENT_REGEX = re.compile(
    r"\s*#\s{1,}autoflake:\s{1,}\bskip_file\b",
    re.MULTILINE,
)
def standard_paths() -> Iterable[str]:
    """Yield the file names found in the standard-library directories."""
    paths = sysconfig.get_paths()
    for path_name in ("stdlib", "platstdlib"):
        path = paths.get(path_name)
        if path is None:
            continue
        # Entries directly inside the lib directory.
        if os.path.isdir(path):
            yield from os.listdir(path)
        # Entries inside the compiled-extension directory.
        dynload_path = os.path.join(path, "lib-dynload")
        if os.path.isdir(dynload_path):
            yield from os.listdir(dynload_path)
def standard_package_names() -> Iterable[str]:
    """Yield the top-level names of standard-library modules."""
    for entry in standard_paths():
        # Skip private modules and names that cannot be imported.
        if entry.startswith("_") or "-" in entry:
            continue
        # Skip dotted names whose extension is not a Python module.
        if "." in entry and not entry.endswith(("so", "py", "pyc")):
            continue
        yield entry.split(".")[0]
# Modules that do something on import, so removing them may change behavior.
IMPORTS_WITH_SIDE_EFFECTS = {"antigravity", "rlcompleter", "this"}

# In case they are built into CPython.
BINARY_IMPORTS = {
    "datetime",
    "grp",
    "io",
    "json",
    "math",
    "multiprocessing",
    "parser",
    "pwd",
    "string",
    "operator",
    "os",
    "sys",
    "time",
}

# Modules considered safe to remove when unused: the detected standard
# library, minus side-effect imports, plus possibly-built-in modules.
SAFE_IMPORTS = (
    frozenset(standard_package_names()) - IMPORTS_WITH_SIDE_EFFECTS | BINARY_IMPORTS
)
def unused_import_line_numbers(
    messages: Iterable[pyflakes.messages.Message],
) -> Iterable[int]:
    """Yield line numbers of unused imports."""
    yield from (
        message.lineno
        for message in messages
        if isinstance(message, pyflakes.messages.UnusedImport)
    )
def unused_import_module_name(
    messages: Iterable[pyflakes.messages.Message],
) -> Iterable[tuple[int, str]]:
    """Yield (line number, module name) pairs for unused imports."""
    quoted_name = re.compile(r"\'(.+?)\'")
    for message in messages:
        if not isinstance(message, pyflakes.messages.UnusedImport):
            continue
        found = quoted_name.search(str(message))
        if found:
            # Drop the surrounding quotes from the matched text.
            yield (message.lineno, found.group()[1:-1])
def star_import_used_line_numbers(
    messages: Iterable[pyflakes.messages.Message],
) -> Iterable[int]:
    """Yield line number of star import usage."""
    yield from (
        message.lineno
        for message in messages
        if isinstance(message, pyflakes.messages.ImportStarUsed)
    )
def star_import_usage_undefined_name(
    messages: Iterable[pyflakes.messages.Message],
) -> Iterable[tuple[int, str, str]]:
    """Yield (line number, undefined name, possible origin module) triples."""
    for message in messages:
        if not isinstance(message, pyflakes.messages.ImportStarUsage):
            continue
        name, module = message.message_args[0], message.message_args[1]
        yield (message.lineno, name, module)
def unused_variable_line_numbers(
    messages: Iterable[pyflakes.messages.Message],
) -> Iterable[int]:
    """Yield line numbers of unused variables."""
    yield from (
        message.lineno
        for message in messages
        if isinstance(message, pyflakes.messages.UnusedVariable)
    )
def duplicate_key_line_numbers(
    messages: Iterable[pyflakes.messages.Message],
    source: str,
) -> Iterable[int]:
    """Yield line numbers of duplicate keys that are safe to remove.

    A key's occurrences are yielded only when every one of them sits on a
    simple single-line ``key: value,`` entry; otherwise the whole key is
    skipped to avoid corrupting multiline constructs.
    """
    messages = [
        message
        for message in messages
        if isinstance(message, pyflakes.messages.MultiValueRepeatedKeyLiteral)
    ]

    if messages:
        # Filter out complex cases. We don't want to bother trying to parse
        # this stuff and get it right. We can do it on a key-by-key basis.

        key_to_messages = create_key_to_messages_dict(messages)

        lines = source.split("\n")

        for key, messages in key_to_messages.items():
            good = True
            for message in messages:
                line = lines[message.lineno - 1]
                # NOTE: "key" is rebound to this message's own key here; the
                # outer loop variable is not used after this point.
                key = message.message_args[0]

                # Reject the whole key if any occurrence is not a simple
                # one-line dict entry.
                if not dict_entry_has_key(line, key):
                    good = False

            if good:
                for message in messages:
                    yield message.lineno
def create_key_to_messages_dict(
    messages: Iterable[pyflakes.messages.MultiValueRepeatedKeyLiteral],
) -> Mapping[Any, Iterable[pyflakes.messages.MultiValueRepeatedKeyLiteral]]:
    """Group the messages by their repeated dictionary key."""
    grouped: dict[
        Any,
        list[pyflakes.messages.MultiValueRepeatedKeyLiteral],
    ] = collections.defaultdict(list)
    for msg in messages:
        grouped[msg.message_args[0]].append(msg)
    return grouped
def check(source: str) -> Iterable[pyflakes.messages.Message]:
    """Run pyflakes over ``source`` and return the collected messages."""
    collector = ListReporter()
    try:
        pyflakes.api.check(source, filename="<string>", reporter=collector)
    except (AttributeError, RecursionError, UnicodeDecodeError):
        # pyflakes can crash on pathological input; report no messages.
        pass
    return collector.messages
class StubFile:
    """File-like object that silently discards everything written to it.

    Handed to ``pyflakes.reporter.Reporter`` so pyflakes' own stream output
    is suppressed while messages are collected separately.
    """

    def write(self, *_: Any) -> None:
        """Accept and ignore any arguments; nothing is written anywhere."""
class ListReporter(pyflakes.reporter.Reporter):
    """Reporter that accumulates pyflakes messages in a list."""

    def __init__(self) -> None:
        """Initialize.

        Ignore errors from Reporter.
        """
        # Route pyflakes' warning/error streams into a do-nothing file.
        ignore = StubFile()
        pyflakes.reporter.Reporter.__init__(self, ignore, ignore)
        # Messages collected via flake(), in the order pyflakes emits them.
        self.messages: list[pyflakes.messages.Message] = []

    def flake(self, message: pyflakes.messages.Message) -> None:
        """Accumulate messages."""
        self.messages.append(message)
def extract_package_name(line: str) -> str | None:
    """Return the top-level package named by an import statement.

    Returns None for lines that are not import statements (e.g. doctests).
    """
    assert "\\" not in line
    assert "(" not in line
    assert ")" not in line
    assert ";" not in line

    if not line.lstrip().startswith(("import", "from")):
        # Ignore doctests.
        return None

    word = line.split()[1]
    package = word.split(".")[0]
    assert " " not in package
    return package
def multiline_import(line: str, previous_line: str = "") -> bool:
    """Return True if the import statement spans multiple lines."""
    # A parenthesis anywhere means the name list continues on other lines.
    if "(" in line or ")" in line:
        return True
    return multiline_statement(line, previous_line)
def multiline_statement(line: str, previous_line: str = "") -> bool:
    """Return True if this line is part of a multiline statement."""
    if any(symbol in line for symbol in "\\:;"):
        return True

    try:
        # A line that does not tokenize on its own is necessarily partial.
        list(tokenize.generate_tokens(io.StringIO(line).readline))
    except (SyntaxError, tokenize.TokenError):
        return True
    # The line is complete by itself; it is a continuation only when the
    # previous line ended with a backslash.
    return previous_line.rstrip().endswith("\\")
class PendingFix:
    """Base class for a rewrite operation that spans multiple lines.

    Whenever a helper in the main rewrite loop returns a ``PendingFix``
    instead of a string, the object is called with each following line
    until it produces the final replacement text.
    """

    def __init__(self, line: str) -> None:
        """Store the first line for later processing."""
        self.accumulator = collections.deque([line])

    def __call__(self, line: str) -> PendingFix | str:
        """Consume one more line.

        Subclasses return ``self`` while more lines are needed, or a
        string holding the final result once the statement is complete.
        """
        raise NotImplementedError("Abstract method needs to be overwritten")
319def _valid_char_in_line(char: str, line: str) -> bool:
320 """Return True if a char appears in the line and is not commented."""
321 comment_index = line.find("#")
322 char_index = line.find(char)
323 valid_char_in_line = char_index >= 0 and (
324 comment_index > char_index or comment_index < 0
325 )
326 return valid_char_in_line
329def _top_module(module_name: str) -> str:
330 """Return the name of the top level module in the hierarchy."""
331 if module_name[0] == ".":
332 return "%LOCAL_MODULE%"
333 return module_name.split(".")[0]
def _modules_to_remove(
    unused_modules: Iterable[str],
    safe_to_remove: Iterable[str] = SAFE_IMPORTS,
) -> Iterable[str]:
    """Discard unused modules that are not safe to remove from the list."""
    return [
        module
        for module in unused_modules
        if _top_module(module) in safe_to_remove
    ]
344def _segment_module(segment: str) -> str:
345 """Extract the module identifier inside the segment.
347 It might be the case the segment does not have a module (e.g. is composed
348 just by a parenthesis or line continuation and whitespace). In this
349 scenario we just keep the segment... These characters are not valid in
350 identifiers, so they will never be contained in the list of unused modules
351 anyway.
352 """
353 return segment.strip(string.whitespace + ",\\()") or segment
class FilterMultilineImport(PendingFix):
    """Remove unused imports from multiline import statements.

    This class handles both the cases: "from imports" and "direct imports".

    Some limitations exist (e.g. imports with comments, lines joined by ``;``,
    etc). In these cases, the statement is left unchanged to avoid problems.
    """

    # Splits the statement into the "from X " prefix and the name list.
    IMPORT_RE = re.compile(r"\bimport\b\s*")
    INDENTATION_RE = re.compile(r"^\s*")
    BASE_RE = re.compile(r"\bfrom\s+([^ ]+)")
    SEGMENT_RE = re.compile(
        r"([^,\s]+(?:[\s\\]+as[\s\\]+[^,\s]+)?[,\s\\)]*)",
        re.M,
    )
    # ^ module + comma + following space (including new line and continuation)
    IDENTIFIER_RE = re.compile(r"[^,\s]+")

    def __init__(
        self,
        line: str,
        unused_module: Iterable[str] = (),
        remove_all_unused_imports: bool = False,
        safe_to_remove: Iterable[str] = SAFE_IMPORTS,
        previous_line: str = "",
    ):
        """Receive the same parameters as ``filter_unused_import``."""
        self.remove: Iterable[str] = unused_module
        self.parenthesized: bool = "(" in line
        # Everything before "import" (e.g. "from pkg ") and the name list.
        self.from_, imports = self.IMPORT_RE.split(line, maxsplit=1)
        match = self.BASE_RE.search(self.from_)
        # Base package for "from" imports, None for direct imports.
        self.base = match.group(1) if match else None
        self.give_up: bool = False

        if not remove_all_unused_imports:
            if self.base and _top_module(self.base) not in safe_to_remove:
                # The whole statement targets an unsafe package: leave it.
                self.give_up = True
            else:
                self.remove = _modules_to_remove(self.remove, safe_to_remove)

        if "\\" in previous_line:
            # Ignore tricky things like "try: \<new line> import" ...
            self.give_up = True

        self.analyze(line)

        PendingFix.__init__(self, imports)

    def is_over(self, line: str | None = None) -> bool:
        """Return True if the multiline import statement is over."""
        line = line or self.accumulator[-1]

        # Parenthesized imports end at ")"; backslash-continued ones end on
        # the first line without a trailing "\".
        if self.parenthesized:
            return _valid_char_in_line(")", line)

        return not _valid_char_in_line("\\", line)

    def analyze(self, line: str) -> None:
        """Decide if the statement will be fixed or left unchanged."""
        # Comments, semicolons or colons make parsing unreliable: bail out.
        if any(ch in line for ch in ";:#"):
            self.give_up = True

    def fix(self, accumulated: Iterable[str]) -> str:
        """Given a collection of accumulated lines, fix the entire import."""
        old_imports = "".join(accumulated)
        ending = get_line_ending(old_imports)
        # Split imports into segments that contain the module name +
        # comma + whitespace and eventual <newline> \ ( ) chars
        segments = [x for x in self.SEGMENT_RE.findall(old_imports) if x]
        modules = [_segment_module(x) for x in segments]
        keep = _filter_imports(modules, self.base, self.remove)

        # Short-circuit if no import was discarded
        if len(keep) == len(segments):
            return self.from_ + "import " + "".join(accumulated)

        fixed = ""
        if keep:
            # Since it is very difficult to deal with all the line breaks and
            # continuations, let's use the code layout that already exists and
            # just replace the module identifiers inside the first N-1 segments
            # + the last segment
            templates = list(zip(modules, segments))
            templates = templates[: len(keep) - 1] + templates[-1:]
            # It is important to keep the last segment, since it might contain
            # important chars like `)`
            fixed = "".join(
                template.replace(module, keep[i])
                for i, (module, template) in enumerate(templates)
            )

            # Fix the edge case: inline parenthesis + just one surviving import
            if self.parenthesized and any(ch not in fixed for ch in "()"):
                fixed = fixed.strip(string.whitespace + "()") + ending

        # Replace empty imports with a "pass" statement
        empty = len(fixed.strip(string.whitespace + "\\(),")) < 1
        if empty:
            match = self.INDENTATION_RE.search(self.from_)
            assert match is not None
            indentation = match.group(0)
            return indentation + "pass" + ending

        return self.from_ + "import " + fixed

    def __call__(self, line: str | None = None) -> PendingFix | str:
        """Accumulate all the lines in the import and then trigger the fix."""
        if line:
            self.accumulator.append(line)
            self.analyze(line)
            if not self.is_over(line):
                # Keep consuming lines until the statement ends.
                return self
        if self.give_up:
            # Reassemble the statement unchanged.
            return self.from_ + "import " + "".join(self.accumulator)

        return self.fix(self.accumulator)
475def _filter_imports(
476 imports: Iterable[str],
477 parent: str | None = None,
478 unused_module: Iterable[str] = (),
479) -> Sequence[str]:
480 # We compare full module name (``a.module`` not `module`) to
481 # guarantee the exact same module as detected from pyflakes.
482 sep = "" if parent and parent[-1] == "." else "."
484 def full_name(name: str) -> str:
485 return name if parent is None else parent + sep + name
487 return [x for x in imports if full_name(x) not in unused_module]
def filter_from_import(line: str, unused_module: Iterable[str]) -> str:
    """Parse and filter ``from something import a, b, c``.

    Return the line without the unused names, or ``pass`` when every
    imported name in the statement is unused.
    """
    head, names_part = re.split(r"\bimport\b", line, maxsplit=1)
    base_match = re.search(r"\bfrom\s+([^ ]+)", head)
    assert base_match is not None
    base_module = base_match.group(1)

    names = re.split(r"\s*,\s*", names_part.strip())
    surviving = _filter_imports(names, base_module, unused_module)

    if not surviving:
        # Every imported name was unused: keep the block structure intact
        # by substituting a "pass" statement.
        return get_indentation(line) + "pass" + get_line_ending(line)

    return head + "import " + ", ".join(surviving) + get_line_ending(line)
def break_up_import(line: str) -> str:
    """Rewrite ``import a, b`` as one import statement per line."""
    # Only plain single-line "import a, b" statements are supported.
    for forbidden in "\\();#":
        assert forbidden not in line
    assert not line.lstrip().startswith("from")

    newline = get_line_ending(line)
    if not newline:
        # Without a line ending we cannot split into multiple lines.
        return line

    prefix, names = re.split(r"\bimport\b", line, maxsplit=1)
    prefix += "import "
    return "".join(
        prefix + name.strip() + newline for name in names.split(",")
    )
def filter_code(
    source: str,
    additional_imports: Iterable[str] | None = None,
    expand_star_imports: bool = False,
    remove_all_unused_imports: bool = False,
    remove_duplicate_keys: bool = False,
    remove_unused_variables: bool = False,
    remove_rhs_for_unused_variables: bool = False,
    ignore_init_module_imports: bool = False,
) -> Iterable[str]:
    """Yield the source's lines with the enabled rewrites applied once.

    Runs pyflakes over ``source``, marks the reported line numbers, then
    walks the source line by line and yields each line either unchanged or
    rewritten by the matching filter.
    """
    # Names considered removable; user-supplied imports extend the safe set.
    imports = SAFE_IMPORTS
    if additional_imports:
        imports |= frozenset(additional_imports)
    del additional_imports

    messages = check(source)

    if ignore_init_module_imports:
        marked_import_line_numbers: frozenset[int] = frozenset()
    else:
        marked_import_line_numbers = frozenset(
            unused_import_line_numbers(messages),
        )
    # Line number -> list of unused module names reported on that line.
    marked_unused_module: dict[int, list[str]] = collections.defaultdict(list)
    for line_number, module_name in unused_import_module_name(messages):
        marked_unused_module[line_number].append(module_name)

    undefined_names: list[str] = []
    if expand_star_imports and not (
        # See explanations in #18.
        re.search(r"\b__all__\b", source)
        or re.search(r"\bdel\b", source)
    ):
        marked_star_import_line_numbers = frozenset(
            star_import_used_line_numbers(messages),
        )
        if len(marked_star_import_line_numbers) > 1:
            # Auto expanding only possible for single star import
            marked_star_import_line_numbers = frozenset()
        else:
            for line_number, undefined_name, _ in star_import_usage_undefined_name(
                messages,
            ):
                undefined_names.append(undefined_name)
            if not undefined_names:
                marked_star_import_line_numbers = frozenset()
    else:
        marked_star_import_line_numbers = frozenset()

    if remove_unused_variables:
        marked_variable_line_numbers = frozenset(
            unused_variable_line_numbers(messages),
        )
    else:
        marked_variable_line_numbers = frozenset()

    if remove_duplicate_keys:
        marked_key_line_numbers: frozenset[int] = frozenset(
            duplicate_key_line_numbers(messages, source),
        )
    else:
        marked_key_line_numbers = frozenset()

    line_messages = get_messages_by_line(messages)

    sio = io.StringIO(source)
    previous_line = ""
    result: str | PendingFix = ""
    for line_number, line in enumerate(sio.readlines(), start=1):
        if isinstance(result, PendingFix):
            # A multiline rewrite is in progress; feed it the next line.
            result = result(line)
        elif "#" in line:
            # Lines with comments are never rewritten.
            result = line
        elif line_number in marked_import_line_numbers:
            result = filter_unused_import(
                line,
                unused_module=marked_unused_module[line_number],
                remove_all_unused_imports=remove_all_unused_imports,
                imports=imports,
                previous_line=previous_line,
            )
        elif line_number in marked_variable_line_numbers:
            result = filter_unused_variable(
                line,
                drop_rhs=remove_rhs_for_unused_variables,
            )
        elif line_number in marked_key_line_numbers:
            result = filter_duplicate_key(
                line,
                line_messages[line_number],
                line_number,
                marked_key_line_numbers,
                source,
            )
        elif line_number in marked_star_import_line_numbers:
            result = filter_star_import(line, undefined_names)
        else:
            result = line

        # PendingFix objects are withheld until they resolve to a string.
        if not isinstance(result, PendingFix):
            yield result

        previous_line = line
def get_messages_by_line(
    messages: Iterable[pyflakes.messages.Message],
) -> Mapping[int, pyflakes.messages.Message]:
    """Map each line number to its pyflakes message."""
    by_line: dict[int, pyflakes.messages.Message] = {}
    for message in messages:
        # A later message on the same line overwrites an earlier one.
        by_line[message.lineno] = message
    return by_line
def filter_star_import(
    line: str,
    marked_star_import_undefined_name: Iterable[str],
) -> str:
    """Return line with the star import expanded."""
    # Deduplicate and sort so the expansion is deterministic.
    names = sorted(set(marked_star_import_undefined_name))
    return re.sub(r"\*", ", ".join(names), line)
def filter_unused_import(
    line: str,
    unused_module: Iterable[str],
    remove_all_unused_imports: bool,
    imports: Iterable[str],
    previous_line: str = "",
) -> PendingFix | str:
    """Filter the unused names out of an import line.

    Returns the rewritten line (possibly "pass"), the original line when it
    must be kept, or a PendingFix for statements spanning multiple lines.
    """
    # Ignore doctests.
    if line.lstrip().startswith(">"):
        return line

    if multiline_import(line, previous_line):
        pending = FilterMultilineImport(
            line,
            unused_module,
            remove_all_unused_imports,
            imports,
            previous_line,
        )
        return pending()

    is_from_import = line.lstrip().startswith("from")

    if "," in line and not is_from_import:
        # Split "import a, b" into one import per line first.
        return break_up_import(line)

    package = extract_package_name(line)
    if not remove_all_unused_imports and package is not None and package not in imports:
        return line

    if "," in line:
        assert is_from_import
        return filter_from_import(line, unused_module)

    # We need to replace import with "pass" in case the import is the
    # only line inside a block. For example,
    # "if True:\n import os". In such cases, if the import is
    # removed, the block will be left hanging with no body.
    return get_indentation(line) + "pass" + get_line_ending(line)
def filter_unused_variable(
    line: str,
    previous_line: str = "",
    drop_rhs: bool = False,
) -> str:
    """Rewrite a line holding an unused variable.

    Drops the "as name" from except handlers, strips the left-hand side of
    simple single-assignment lines (or the whole line when ``drop_rhs`` is
    set and the right-hand side is removable), and otherwise returns the
    line unchanged.
    """
    if re.match(EXCEPT_REGEX, line):
        # "except E as e:" with unused "e" -> "except E:".
        return re.sub(r" as \w+:$", ":", line, count=1)
    elif multiline_statement(line, previous_line):
        # Too risky to rewrite part of a multiline statement.
        return line
    elif line.count("=") == 1:
        split_line = line.split("=")
        assert len(split_line) == 2
        value = split_line[1].lstrip()
        if "," in split_line[0]:
            # Tuple unpacking on the left: leave untouched.
            return line

        if is_literal_or_name(value):
            # Rather than removing the line, replace with it "pass" to avoid
            # a possible hanging block with no body.
            value = "pass" + get_line_ending(line)
            if drop_rhs:
                return get_indentation(line) + value

        if drop_rhs:
            # The right-hand side may have side effects, but the caller
            # asked to drop it entirely.
            return ""
        # Keep the right-hand side (it may have side effects).
        return get_indentation(line) + value
    else:
        return line
def filter_duplicate_key(
    line: str,
    message: pyflakes.messages.Message,
    line_number: int,
    marked_line_numbers: Iterable[int],
    source: str,
    previous_line: str = "",
) -> str:
    """Return '' for the first duplicate-key occurrence, else the line."""
    # Only the earliest marked occurrence is removed; later duplicates win.
    if marked_line_numbers and line_number == min(marked_line_numbers):
        return ""
    return line
def dict_entry_has_key(line: str, key: Any) -> bool:
    """Return True if ``line`` is a one-line dict entry that uses ``key``.

    Returns False for commented or multiline entries, since those lines
    cannot be removed on their own.
    """
    if "#" in line:
        return False

    entry = re.match(r"\s*(.*)\s*:\s*(.*),\s*$", line)
    if entry is None:
        return False

    try:
        candidate_key = ast.literal_eval(entry.group(1))
    except (SyntaxError, ValueError):
        # The key expression is not a plain literal.
        return False

    if multiline_statement(entry.group(2)):
        return False

    return cast(bool, candidate_key == key)
def is_literal_or_name(value: str) -> bool:
    """Return True when ``value`` is a literal, empty container, or name."""
    try:
        ast.literal_eval(value)
    except (SyntaxError, ValueError):
        pass
    else:
        return True

    if value.strip() in ["dict()", "list()", "set()"]:
        return True

    # Support removal of variables on the right side. But make sure
    # there are no dots, which could mean an access of a property.
    return re.match(r"^\w+\s*$", value) is not None
def useless_pass_line_numbers(
    source: str,
    ignore_pass_after_docstring: bool = False,
) -> Iterable[int]:
    """Yield line numbers of unneeded "pass" statements."""
    sio = io.StringIO(source)
    previous_token_type = None
    last_pass_row = None
    last_pass_indentation = None
    previous_line = ""
    previous_non_empty_line = ""
    for token in tokenize.generate_tokens(sio.readline):
        token_type = token[0]
        start_row = token[2][0]
        line = token[4]

        # A "pass" alone on its line.
        is_pass = token_type == tokenize.NAME and line.strip() == "pass"

        # Leading "pass": a pass on the previous row, at the same
        # indentation, followed by a real statement is redundant.
        if (
            start_row - 1 == last_pass_row
            and get_indentation(line) == last_pass_indentation
            and token_type in ATOMS
            and not is_pass
        ):
            yield start_row - 1

        if is_pass:
            last_pass_row = start_row
            last_pass_indentation = get_indentation(line)

            # A pass that does not open a new block (previous token is not
            # INDENT) and is not a continuation line is redundant.
            is_trailing_pass = (
                previous_token_type != tokenize.INDENT
                and not previous_line.rstrip().endswith("\\")
            )

            is_pass_after_docstring = previous_non_empty_line.rstrip().endswith(
                ("'''", '"""'),
            )

            # Trailing "pass".
            if is_trailing_pass:
                if is_pass_after_docstring and ignore_pass_after_docstring:
                    continue
                else:
                    yield start_row

        previous_token_type = token_type
        previous_line = line
        if line.strip():
            previous_non_empty_line = line
def filter_useless_pass(
    source: str,
    ignore_pass_statements: bool = False,
    ignore_pass_after_docstring: bool = False,
) -> Iterable[str]:
    """Yield code with useless "pass" lines removed."""
    marked_lines: frozenset[int] = frozenset()
    if not ignore_pass_statements:
        try:
            marked_lines = frozenset(
                useless_pass_line_numbers(
                    source,
                    ignore_pass_after_docstring,
                ),
            )
        except (SyntaxError, tokenize.TokenError):
            # Unparsable source: remove nothing.
            marked_lines = frozenset()

    lines = io.StringIO(source).readlines()
    for line_number, line in enumerate(lines, start=1):
        if line_number not in marked_lines:
            yield line
def get_indentation(line: str) -> str:
    """Return leading whitespace."""
    # Whitespace-only lines are treated as having no indentation.
    if not line.strip():
        return ""
    return line[: len(line) - len(line.lstrip())]
def get_line_ending(line: str) -> str:
    """Return the trailing whitespace (including the newline) of ``line``."""
    trailing_length = len(line) - len(line.rstrip())
    if trailing_length == 0:
        return ""
    return line[-trailing_length:]
def fix_code(
    source: str,
    additional_imports: Iterable[str] | None = None,
    expand_star_imports: bool = False,
    remove_all_unused_imports: bool = False,
    remove_duplicate_keys: bool = False,
    remove_unused_variables: bool = False,
    remove_rhs_for_unused_variables: bool = False,
    ignore_init_module_imports: bool = False,
    ignore_pass_statements: bool = False,
    ignore_pass_after_docstring: bool = False,
) -> str:
    """Return code with all filtering run on it."""
    if not source:
        return source

    # An explicit "# autoflake: skip_file" comment disables all rewriting.
    if IGNORE_COMMENT_REGEX.search(source):
        return source

    # pyflakes does not handle "nonlocal" correctly.
    if "nonlocal" in source:
        remove_unused_variables = False

    # Iterate to a fixed point, since removing one construct can expose
    # another removable one on the next pass.
    while True:
        filtered = "".join(
            filter_code(
                source,
                additional_imports=additional_imports,
                expand_star_imports=expand_star_imports,
                remove_all_unused_imports=remove_all_unused_imports,
                remove_duplicate_keys=remove_duplicate_keys,
                remove_unused_variables=remove_unused_variables,
                remove_rhs_for_unused_variables=(
                    remove_rhs_for_unused_variables
                ),
                ignore_init_module_imports=ignore_init_module_imports,
            ),
        )
        filtered = "".join(
            filter_useless_pass(
                filtered,
                ignore_pass_statements=ignore_pass_statements,
                ignore_pass_after_docstring=ignore_pass_after_docstring,
            ),
        )
        if filtered == source:
            return filtered
        source = filtered
def fix_file(
    filename: str,
    args: Mapping[str, Any],
    standard_out: IO[str] | None = None,
) -> int:
    """Run fix_code() on a file."""
    out = sys.stdout if standard_out is None else standard_out
    encoding = detect_encoding(filename)
    with open_with_encoding(filename, encoding=encoding) as input_file:
        return _fix_file(
            input_file,
            filename,
            args,
            args["write_to_stdout"],
            cast(IO[str], out),
            encoding=encoding,
        )
def _fix_file(
    input_file: IO[str],
    filename: str,
    args: Mapping[str, Any],
    write_to_stdout: bool,
    standard_out: IO[str],
    encoding: str | None = None,
) -> int:
    """Fix one already-opened file and report the outcome.

    Returns 1 when issues were found in --check/--check-diff mode, and 0
    otherwise (including when a fix was written in place or to stdout).
    """
    source = input_file.read()
    original_source = source

    # "__init__.py" imports are often deliberate re-exports; optionally
    # keep them even when they look unused.
    is_init_file = os.path.basename(filename) == "__init__.py"
    if args["ignore_init_module_imports"] and is_init_file:
        ignore_init_module_imports = True
    else:
        ignore_init_module_imports = False

    filtered_source = fix_code(
        source,
        additional_imports=(args["imports"].split(",") if "imports" in args else None),
        expand_star_imports=args["expand_star_imports"],
        remove_all_unused_imports=args["remove_all_unused_imports"],
        remove_duplicate_keys=args["remove_duplicate_keys"],
        remove_unused_variables=args["remove_unused_variables"],
        remove_rhs_for_unused_variables=(args["remove_rhs_for_unused_variables"]),
        ignore_init_module_imports=ignore_init_module_imports,
        ignore_pass_statements=args["ignore_pass_statements"],
        ignore_pass_after_docstring=args["ignore_pass_after_docstring"],
    )

    if original_source != filtered_source:
        if args["check"]:
            # Bug fix: report the actual file name instead of a hard-coded
            # "(unknown)" placeholder.
            standard_out.write(
                f"{filename}: Unused imports/variables detected{os.linesep}",
            )
            return 1
        if args["check_diff"]:
            diff = get_diff_text(
                io.StringIO(original_source).readlines(),
                io.StringIO(filtered_source).readlines(),
                filename,
            )
            standard_out.write("".join(diff))
            return 1
        if write_to_stdout:
            standard_out.write(filtered_source)
        elif args["in_place"]:
            with open_with_encoding(
                filename,
                mode="w",
                encoding=encoding,
            ) as output_file:
                output_file.write(filtered_source)
            _LOGGER.info("Fixed %s", filename)
        else:
            # Default: show what would change as a unified diff.
            diff = get_diff_text(
                io.StringIO(original_source).readlines(),
                io.StringIO(filtered_source).readlines(),
                filename,
            )
            standard_out.write("".join(diff))
    elif write_to_stdout:
        # Nothing changed, but stdout mode still echoes the source.
        standard_out.write(filtered_source)
    else:
        if (args["check"] or args["check_diff"]) and not args["quiet"]:
            # Bug fix: report the actual file name instead of "(unknown)".
            standard_out.write(f"{filename}: No issues detected!{os.linesep}")
        else:
            _LOGGER.debug("Clean %s: nothing to fix", filename)

    return 0
def open_with_encoding(
    filename: str,
    encoding: str | None,
    mode: str = "r",
    limit_byte_check: int = -1,
) -> IO[str]:
    """Return opened file with a specific encoding."""
    # Fall back to sniffing the encoding when none is supplied.
    resolved = encoding or detect_encoding(
        filename,
        limit_byte_check=limit_byte_check,
    )
    # newline="" preserves the file's original line endings on round-trips.
    return open(filename, mode=mode, encoding=resolved, newline="")
def detect_encoding(filename: str, limit_byte_check: int = -1) -> str:
    """Return the file's encoding, falling back to latin-1 on failure."""
    try:
        with open(filename, "rb") as input_file:
            candidate = _detect_encoding(input_file.readline)

        # Verify that the detected encoding actually decodes the file.
        with open_with_encoding(filename, candidate) as input_file:
            input_file.read(limit_byte_check)

        return candidate
    except (LookupError, SyntaxError, UnicodeDecodeError):
        return "latin-1"
1076def _detect_encoding(readline: Callable[[], bytes]) -> str:
1077 """Return file encoding."""
1078 try:
1079 encoding = tokenize.detect_encoding(readline)[0]
1080 return encoding
1081 except (LookupError, SyntaxError, UnicodeDecodeError):
1082 return "latin-1"
def get_diff_text(old: Sequence[str], new: Sequence[str], filename: str) -> str:
    """Return text of unified diff between old and new.

    Args:
        old: original lines (each normally ending with a newline).
        new: fixed lines.
        filename: labeled as ``original/<filename>`` and
            ``fixed/<filename>`` in the diff header.

    Returns:
        The unified diff as one string; empty when there are no changes.
    """
    newline = "\n"
    diff = difflib.unified_diff(
        old,
        new,
        "original/" + filename,
        "fixed/" + filename,
        lineterm=newline,
    )
    chunks = []
    for line in diff:
        chunks.append(line)
        # Work around missing newline (http://bugs.python.org/issue2142).
        if not line.endswith(newline):
            chunks.append(newline + r"\ No newline at end of file" + newline)
    # join() avoids the quadratic cost of repeated `text += line`.
    return "".join(chunks)
1107def _split_comma_separated(string: str) -> set[str]:
1108 """Return a set of strings."""
1109 return {text.strip() for text in string.split(",") if text.strip()}
def is_python_file(filename: str) -> bool:
    """Return True if *filename* looks like Python source.

    A ``.py`` extension qualifies immediately; otherwise the first line
    is sniffed for a Python shebang.
    """
    if filename.endswith(".py"):
        return True
    try:
        with open_with_encoding(
            filename,
            None,
            limit_byte_check=MAX_PYTHON_FILE_DETECTION_BYTES,
        ) as f:
            text = f.read(MAX_PYTHON_FILE_DETECTION_BYTES)
            if not text:
                # Empty files cannot carry a shebang.
                return False
            first_line = text.splitlines()[0]
    except (OSError, IndexError):
        return False
    return bool(PYTHON_SHEBANG_REGEX.match(first_line))
def is_exclude_file(filename: str, exclude: Iterable[str]) -> bool:
    """Return True if *filename* matches an exclude pattern.

    Hidden files (basename starting with a dot) are always excluded;
    otherwise each glob in *exclude* is tried against both the basename
    and the full path.
    """
    base_name = os.path.basename(filename)
    if base_name.startswith("."):
        return True
    return any(
        fnmatch.fnmatch(base_name, pattern) or fnmatch.fnmatch(filename, pattern)
        for pattern in exclude
    )
def match_file(filename: str, exclude: Iterable[str]) -> bool:
    """Return True if *filename* is okay to modify or recurse into."""
    if is_exclude_file(filename, exclude):
        _LOGGER.debug("Skipped %s: matched to exclude pattern", filename)
        return False
    # Directories are always eligible for recursion; plain files must
    # look like Python source.
    return os.path.isdir(filename) or is_python_file(filename)
def find_files(
    filenames: list[str],
    recursive: bool,
    exclude: Iterable[str],
) -> Iterable[str]:
    """Yield eligible filenames, expanding directories when *recursive*.

    Consumes *filenames* as a work queue: directories discovered during
    a walk are enqueued as concrete file paths, and excluded
    sub-directories are pruned in place so ``os.walk`` skips them.
    """
    while filenames:
        current = filenames.pop(0)
        if recursive and os.path.isdir(current):
            for root, directories, children in os.walk(current):
                # Enqueue matching children for later processing.
                filenames.extend(
                    os.path.join(root, child)
                    for child in children
                    if match_file(os.path.join(root, child), exclude)
                )
                # Prune excluded directories in place (os.walk contract).
                directories[:] = [
                    directory
                    for directory in directories
                    if match_file(os.path.join(root, directory), exclude)
                ]
        elif is_exclude_file(current, exclude):
            _LOGGER.debug("Skipped %s: matched to exclude pattern", current)
        else:
            # Explicitly listed files are yielded without a Python check.
            yield current
1196def process_pyproject_toml(toml_file_path: str) -> MutableMapping[str, Any] | None:
1197 """Extract config mapping from pyproject.toml file."""
1198 try:
1199 import tomllib
1200 except ModuleNotFoundError:
1201 import tomli as tomllib
1203 with open(toml_file_path, "rb") as f:
1204 return tomllib.load(f).get("tool", {}).get("autoflake", None)
1207def process_config_file(config_file_path: str) -> MutableMapping[str, Any] | None:
1208 """Extract config mapping from config file."""
1209 import configparser
1211 reader = configparser.ConfigParser()
1212 reader.read(config_file_path, encoding="utf-8")
1213 if not reader.has_section("autoflake"):
1214 return None
1216 return reader["autoflake"]
def find_and_process_config(args: Mapping[str, Any]) -> MutableMapping[str, Any] | None:
    """Search upwards for a configuration mapping.

    Starting at the common ancestor path of ``args["files"]``, each
    directory level is checked for the known configuration files; the
    first level that yields an autoflake section wins.

    Returns:
        The configuration mapping, or None if no config file with an
        autoflake section was found before reaching the filesystem root.
    """
    # Configuration file parsers {filename: parser function}.
    CONFIG_FILES: Mapping[str, Callable[[str], MutableMapping[str, Any] | None]] = {
        "pyproject.toml": process_pyproject_toml,
        "setup.cfg": process_config_file,
    }
    # Traverse the file tree common to all files given as argument
    # looking for a configuration file.
    config_path = os.path.commonpath([os.path.abspath(file) for file in args["files"]])
    config: Mapping[str, Any] | None = None
    while True:
        for config_file, processor in CONFIG_FILES.items():
            # Single join (the original redundantly nested os.path.join).
            config_file_path = os.path.join(config_path, config_file)
            if os.path.isfile(config_file_path):
                config = processor(config_file_path)
                if config is not None:
                    break
        if config is not None:
            break
        config_path, tail = os.path.split(config_path)
        if not tail:
            # Reached the filesystem root without finding a config file.
            break
    return config
def merge_configuration_file(
    flag_args: MutableMapping[str, Any],
) -> tuple[MutableMapping[str, Any], bool]:
    """Merge configuration from a file into args.

    Returns a ``(merged_args, success)`` tuple.  On any configuration
    error the original ``flag_args`` is returned together with False.
    Command-line flags take precedence over file-config values, except
    that "imports" and "exclude" are merged (comma-joined) from both
    sources when present.
    """
    # Accepted spellings for booleans in INI-style config files (TOML
    # already supplies real booleans).
    BOOL_TYPES = {
        "1": True,
        "yes": True,
        "true": True,
        "on": True,
        "0": False,
        "no": False,
        "false": False,
        "off": False,
    }
    if "config_file" in flag_args:
        # Explicit --config wins over auto-discovery; choose the parser
        # from the file extension.
        config_file = pathlib.Path(flag_args["config_file"]).resolve()
        process_method = process_config_file
        if config_file.suffix == ".toml":
            process_method = process_pyproject_toml
        config = process_method(str(config_file))
        if not config:
            _LOGGER.error(
                "can't parse config file '%s'",
                config_file,
            )
            return flag_args, False
    else:
        # No explicit config file: walk up from the target files.
        config = find_and_process_config(flag_args)
    # Options that must end up as booleans after parsing.
    BOOL_FLAGS = {
        "check",
        "check_diff",
        "expand_star_imports",
        "ignore_init_module_imports",
        "ignore_pass_after_docstring",
        "ignore_pass_statements",
        "in_place",
        "quiet",
        "recursive",
        "remove_all_unused_imports",
        "remove_duplicate_keys",
        "remove_rhs_for_unused_variables",
        "remove_unused_variables",
        "write_to_stdout",
    }
    config_args: dict[str, Any] = {}
    if config is not None:
        for name, value in config.items():
            # Config keys use dashes; argparse dest names use underscores.
            arg = name.replace("-", "_")
            if arg in BOOL_FLAGS:
                # boolean properties
                if isinstance(value, str):
                    # INI values arrive as strings; map known spellings.
                    value = BOOL_TYPES.get(value.lower(), value)
                if not isinstance(value, bool):
                    _LOGGER.error(
                        "'%s' in the config file should be a boolean",
                        name,
                    )
                    return flag_args, False
                config_args[arg] = value
            else:
                # Non-boolean options: accept a comma-separated string or
                # a list of strings (normalized to the string form).
                if isinstance(value, list) and all(
                    isinstance(val, str) for val in value
                ):
                    value = ",".join(str(val) for val in value)
                if not isinstance(value, str):
                    _LOGGER.error(
                        "'%s' in the config file should be a comma separated"
                        " string or list of strings",
                        name,
                    )
                    return flag_args, False
                config_args[arg] = value
    # merge args that can be merged
    merged_args = {}
    mergeable_keys = {"imports", "exclude"}
    for key in mergeable_keys:
        values = (
            v for v in (config_args.get(key), flag_args.get(key)) if v is not None
        )
        value = ",".join(values)
        if value != "":
            merged_args[key] = value
    # Later mappings take precedence: defaults < file config < CLI flags
    # < merged imports/exclude.
    default_args = {arg: False for arg in BOOL_FLAGS}
    return {
        **default_args,
        **config_args,
        **flag_args,
        **merged_args,
    }, True
def _main(
    argv: Sequence[str],
    standard_out: IO[str] | None,
    standard_error: IO[str] | None,
    standard_input: IO[str] | None = None,
) -> int:
    """Return exit status.

    0 means no error.

    Args:
        argv: the full argument vector; ``argv[0]`` is skipped.
        standard_out: destination stream for diffs/fixed source; when
            None, per-file defaults apply and multiprocessing may be used.
        standard_error: destination for log output; None disables logging.
        standard_input: stream used when a file is named ``-``.
    """
    import argparse

    parser = argparse.ArgumentParser(
        description=__doc__,
        prog="autoflake",
        argument_default=argparse.SUPPRESS,
    )
    check_group = parser.add_mutually_exclusive_group()
    check_group.add_argument(
        "-c",
        "--check",
        action="store_true",
        help="return error code if changes are needed",
    )
    check_group.add_argument(
        "-cd",
        "--check-diff",
        action="store_true",
        help="return error code if changes are needed, also display file diffs",
    )
    imports_group = parser.add_mutually_exclusive_group()
    imports_group.add_argument(
        "--imports",
        help="by default, only unused standard library "
        "imports are removed; specify a comma-separated "
        "list of additional modules/packages",
    )
    imports_group.add_argument(
        "--remove-all-unused-imports",
        action="store_true",
        help="remove all unused imports (not just those from the standard library)",
    )
    parser.add_argument(
        "-r",
        "--recursive",
        action="store_true",
        help="drill down directories recursively",
    )
    parser.add_argument(
        "-j",
        "--jobs",
        type=int,
        metavar="n",
        default=0,
        help="number of parallel jobs; match CPU count if value is 0 (default: 0)",
    )
    parser.add_argument(
        "--exclude",
        metavar="globs",
        help="exclude file/directory names that match these comma-separated globs",
    )
    parser.add_argument(
        "--expand-star-imports",
        action="store_true",
        help="expand wildcard star imports with undefined "
        "names; this only triggers if there is only "
        "one star import in the file; this is skipped if "
        "there are any uses of `__all__` or `del` in the "
        "file",
    )
    parser.add_argument(
        "--ignore-init-module-imports",
        action="store_true",
        help="exclude __init__.py when removing unused imports",
    )
    parser.add_argument(
        "--remove-duplicate-keys",
        action="store_true",
        help="remove all duplicate keys in objects",
    )
    parser.add_argument(
        "--remove-unused-variables",
        action="store_true",
        help="remove unused variables",
    )
    parser.add_argument(
        "--remove-rhs-for-unused-variables",
        action="store_true",
        help="remove RHS of statements when removing unused variables (unsafe)",
    )
    parser.add_argument(
        "--ignore-pass-statements",
        action="store_true",
        help="ignore all pass statements",
    )
    parser.add_argument(
        "--ignore-pass-after-docstring",
        action="store_true",
        help='ignore pass statements after a newline ending on \'"""\'',
    )
    parser.add_argument(
        "--version",
        action="version",
        version="%(prog)s " + __version__,
    )
    parser.add_argument(
        "--quiet",
        action="store_true",
        help="Suppress output if there are no issues",
    )
    parser.add_argument(
        "-v",
        "--verbose",
        action="count",
        dest="verbosity",
        default=0,
        help="print more verbose logs (you can repeat `-v` to make it more verbose)",
    )
    parser.add_argument(
        "--stdin-display-name",
        dest="stdin_display_name",
        default="stdin",
        help="the name used when processing input from stdin",
    )
    parser.add_argument(
        "--config",
        dest="config_file",
        help=(
            "Explicitly set the config file "
            "instead of auto determining based on file location"
        ),
    )
    parser.add_argument("files", nargs="+", help="files to format")
    output_group = parser.add_mutually_exclusive_group()
    output_group.add_argument(
        "-i",
        "--in-place",
        action="store_true",
        help="make changes to files instead of printing diffs",
    )
    output_group.add_argument(
        "-s",
        "--stdout",
        action="store_true",
        dest="write_to_stdout",
        help=(
            "print changed text to stdout. defaults to true "
            "when formatting stdin, or to false otherwise"
        ),
    )

    # Convert the argparse namespace to a dict so that it can be
    # serialized by multiprocessing.
    args: MutableMapping[str, Any] = vars(parser.parse_args(argv[1:]))

    if standard_error is None:
        _LOGGER.addHandler(logging.NullHandler())
    else:
        _LOGGER.addHandler(logging.StreamHandler(standard_error))
        loglevels = [logging.WARNING, logging.INFO, logging.DEBUG]
        try:
            loglevel = loglevels[args["verbosity"]]
        except IndexError:  # Too much -v
            loglevel = loglevels[-1]
        _LOGGER.setLevel(loglevel)

    args, success = merge_configuration_file(args)
    if not success:
        return 1

    if args["remove_rhs_for_unused_variables"] and not (
        args["remove_unused_variables"]
    ):
        _LOGGER.error(
            "Using --remove-rhs-for-unused-variables only makes sense when "
            "used with --remove-unused-variables",
        )
        return 1

    if "exclude" in args:
        args["exclude"] = _split_comma_separated(args["exclude"])
    else:
        args["exclude"] = set()

    if args["jobs"] < 1:
        # os.cpu_count() may return None (e.g. restricted environments);
        # fall back to a single worker in that case.
        worker_count = os.cpu_count() or 1
        if sys.platform == "win32":
            # Work around https://bugs.python.org/issue26903
            worker_count = min(worker_count, 60)
        args["jobs"] = worker_count

    filenames = list(set(args["files"]))
    exit_status = 0
    files = list(find_files(filenames, args["recursive"], args["exclude"]))
    # Process serially when parallelism cannot help (single job or single
    # file), when stdin is among the inputs, or when a caller-provided
    # output stream cannot be shared across worker processes.
    # (The original condition tested `args["jobs"] == 1` twice.)
    if (
        args["jobs"] == 1
        or len(files) == 1
        or "-" in files
        or standard_out is not None
    ):
        for name in files:
            if name == "-" and standard_input is not None:
                exit_status |= _fix_file(
                    standard_input,
                    args["stdin_display_name"],
                    args=args,
                    write_to_stdout=True,
                    standard_out=standard_out or sys.stdout,
                )
            else:
                try:
                    exit_status |= fix_file(
                        name,
                        args=args,
                        standard_out=standard_out,
                    )
                except OSError as exception:
                    _LOGGER.error(str(exception))
                    exit_status |= 1
    else:
        import multiprocessing

        with multiprocessing.Pool(args["jobs"]) as pool:
            futs = []
            for name in files:
                fut = pool.apply_async(fix_file, args=(name, args))
                futs.append(fut)
            for fut in futs:
                try:
                    exit_status |= fut.get()
                except OSError as exception:
                    _LOGGER.error(str(exception))
                    exit_status |= 1

    return exit_status
def main() -> int:
    """Command-line entry point."""
    # Exit on broken pipe; SIGPIPE is not available on Windows, so only
    # install the handler when the platform provides the signal.
    if hasattr(signal, "SIGPIPE"):
        signal.signal(signal.SIGPIPE, signal.SIG_DFL)
    try:
        return _main(
            sys.argv,
            standard_out=None,
            standard_error=sys.stderr,
            standard_input=sys.stdin,
        )
    except KeyboardInterrupt:  # pragma: no cover
        return 2  # pragma: no cover
# Script entry point: propagate autoflake's exit status to the shell.
if __name__ == "__main__":
    sys.exit(main())