Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/autoflake.py: 49%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1#!/usr/bin/env python
2# Copyright (C) Steven Myint
3#
4# Permission is hereby granted, free of charge, to any person obtaining
5# a copy of this software and associated documentation files (the
6# "Software"), to deal in the Software without restriction, including
7# without limitation the rights to use, copy, modify, merge, publish,
8# distribute, sublicense, and/or sell copies of the Software, and to
9# permit persons to whom the Software is furnished to do so, subject to
10# the following conditions:
11#
12# The above copyright notice and this permission notice shall be included
13# in all copies or substantial portions of the Software.
14#
15# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
16# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
17# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
18# IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
19# CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
20# TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
21# SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
22"""Removes unused imports and unused variables as reported by pyflakes."""
24from __future__ import annotations
26import ast
27import collections
28import difflib
29import fnmatch
30import io
31import logging
32import os
33import pathlib
34import re
35import signal
36import string
37import sys
38import sysconfig
39import tokenize
40from collections.abc import Callable
41from collections.abc import Iterable
42from collections.abc import Mapping
43from collections.abc import MutableMapping
44from collections.abc import Sequence
45from typing import Any
46from typing import cast
47from typing import IO
49import pyflakes.api
50import pyflakes.messages
51import pyflakes.reporter
# Package version; exposed for the command-line "--version" output.
__version__ = "2.3.3"

# Module-level logger.  Propagation is disabled so records only reach
# handlers that are explicitly attached to this logger.
_LOGGER = logging.getLogger("autoflake")
_LOGGER.propagate = False

# Token types that can constitute a statement on their own.
ATOMS = frozenset([tokenize.NAME, tokenize.NUMBER, tokenize.STRING])

# Matches "except <exceptions> as <name>:" handler lines.
EXCEPT_REGEX = re.compile(r"^\s*except [\s,()\w]+ as \w+:$")
# Matches a shebang line invoking python or python3.
PYTHON_SHEBANG_REGEX = re.compile(r"^#!.*\bpython[3]?\b\s*$")

# Number of bytes to read when sniffing a file for a Python shebang.
MAX_PYTHON_FILE_DETECTION_BYTES = 1024

# A "# autoflake: skip_file" comment anywhere in a file disables processing.
IGNORE_COMMENT_REGEX = re.compile(
    r"\s*#\s{1,}autoflake:\s{1,}\bskip_file\b",
    re.MULTILINE,
)
def standard_paths() -> Iterable[str]:
    """Yield paths to standard modules."""
    lookup = sysconfig.get_paths()
    for key in ("stdlib", "platstdlib"):
        if key not in lookup:
            continue

        # Entries directly under the lib directory.
        lib_dir = lookup[key]
        if os.path.isdir(lib_dir):
            yield from os.listdir(lib_dir)

        # Entries under the compiled-extension ("lib-dynload") directory.
        dynload_dir = os.path.join(lib_dir, "lib-dynload")
        if os.path.isdir(dynload_dir):
            yield from os.listdir(dynload_dir)
def standard_package_names() -> Iterable[str]:
    """Yield standard module names."""
    module_suffixes = ("so", "py", "pyc")
    for entry in standard_paths():
        # Skip private names and anything containing a dash.
        hidden = entry.startswith("_") or "-" in entry
        # Skip dotted names unless they look like module files.
        odd_extension = "." in entry and not entry.endswith(module_suffixes)
        if not hidden and not odd_extension:
            yield entry.split(".")[0]
# Importing these modules triggers side effects, so autoflake never treats
# them as safe to remove even when the import appears unused.
IMPORTS_WITH_SIDE_EFFECTS = {"antigravity", "rlcompleter", "this"}

# In case they are built into CPython.
BINARY_IMPORTS = {
    "datetime",
    "grp",
    "io",
    "json",
    "math",
    "multiprocessing",
    "parser",
    "pwd",
    "string",
    "operator",
    "os",
    "sys",
    "time",
}

# Modules considered safe to remove when unused: the standard library
# (minus the side-effect imports above) plus the possibly-builtin set.
SAFE_IMPORTS = (
    frozenset(standard_package_names()) - IMPORTS_WITH_SIDE_EFFECTS | BINARY_IMPORTS
)
def unused_import_line_numbers(
    messages: Iterable[pyflakes.messages.Message],
) -> Iterable[int]:
    """Yield line numbers of unused imports."""
    yield from (
        message.lineno
        for message in messages
        if isinstance(message, pyflakes.messages.UnusedImport)
    )
def unused_import_module_name(
    messages: Iterable[pyflakes.messages.Message],
) -> Iterable[tuple[int, str]]:
    """Yield line number and module name of unused imports."""
    quoted_name = re.compile(r"\'(.+?)\'")
    for message in messages:
        if not isinstance(message, pyflakes.messages.UnusedImport):
            continue
        found = quoted_name.search(str(message))
        if found:
            # Drop the surrounding quotes from the matched text.
            yield (message.lineno, found.group()[1:-1])
def star_import_used_line_numbers(
    messages: Iterable[pyflakes.messages.Message],
) -> Iterable[int]:
    """Yield line number of star import usage."""
    yield from (
        message.lineno
        for message in messages
        if isinstance(message, pyflakes.messages.ImportStarUsed)
    )
def star_import_usage_undefined_name(
    messages: Iterable[pyflakes.messages.Message],
) -> Iterable[tuple[int, str, str]]:
    """Yield line number, undefined name, and its possible origin module."""
    for message in messages:
        if not isinstance(message, pyflakes.messages.ImportStarUsage):
            continue
        name = message.message_args[0]
        origin = message.message_args[1]
        yield (message.lineno, name, origin)
def unused_variable_line_numbers(
    messages: Iterable[pyflakes.messages.Message],
) -> Iterable[int]:
    """Yield line numbers of unused variables."""
    for message in messages:
        if not isinstance(message, pyflakes.messages.UnusedVariable):
            continue
        yield message.lineno
def duplicate_key_line_numbers(
    messages: Iterable[pyflakes.messages.Message],
    source: str,
) -> Iterable[int]:
    """Yield line numbers of duplicate keys.

    Only keys whose every occurrence sits on a simple single-line dict
    entry are reported; anything harder to rewrite safely is skipped.
    """
    messages = [
        message
        for message in messages
        if isinstance(message, pyflakes.messages.MultiValueRepeatedKeyLiteral)
    ]

    if messages:
        # Filter out complex cases. We don't want to bother trying to parse
        # this stuff and get it right. We can do it on a key-by-key basis.

        key_to_messages = create_key_to_messages_dict(messages)

        lines = source.split("\n")

        for key, messages in key_to_messages.items():
            good = True
            for message in messages:
                line = lines[message.lineno - 1]
                # NOTE(review): ``key`` is rebound here from the message's
                # own arguments, shadowing the loop variable above.
                key = message.message_args[0]

                # Every occurrence must be a removable single-line entry,
                # otherwise the whole key group is skipped.
                if not dict_entry_has_key(line, key):
                    good = False

            if good:
                for message in messages:
                    yield message.lineno
def create_key_to_messages_dict(
    messages: Iterable[pyflakes.messages.MultiValueRepeatedKeyLiteral],
) -> Mapping[Any, Iterable[pyflakes.messages.MultiValueRepeatedKeyLiteral]]:
    """Return dict mapping the key to list of messages."""
    grouped: dict[
        Any,
        list[pyflakes.messages.MultiValueRepeatedKeyLiteral],
    ] = collections.defaultdict(list)
    for msg in messages:
        # The duplicated dict key is the first message argument.
        grouped[msg.message_args[0]].append(msg)
    return grouped
def check(source: str) -> Iterable[pyflakes.messages.Message]:
    """Return messages from pyflakes."""
    collector = ListReporter()
    try:
        pyflakes.api.check(source, filename="<string>", reporter=collector)
    except (AttributeError, RecursionError, UnicodeDecodeError):
        # pyflakes can choke on pathological input; treat such source as
        # producing no messages.
        pass
    return collector.messages
class StubFile:
    """File-like object that silently discards all writes (for pyflakes)."""

    def write(self, *_: Any) -> None:
        """Ignore everything written."""
class ListReporter(pyflakes.reporter.Reporter):
    """Accumulate messages in messages list."""

    def __init__(self) -> None:
        """Initialize with stub streams so Reporter errors are discarded."""
        devnull = StubFile()
        pyflakes.reporter.Reporter.__init__(self, devnull, devnull)
        self.messages: list[pyflakes.messages.Message] = []

    def flake(self, message: pyflakes.messages.Message) -> None:
        """Record the message instead of printing it."""
        self.messages.append(message)
def extract_package_name(line: str) -> str | None:
    """Return package name in import statement."""
    # Multiline/compound imports must be handled before calling this.
    assert "\\" not in line
    assert "(" not in line
    assert ")" not in line
    assert ";" not in line

    if not line.lstrip().startswith(("import", "from")):
        # Ignore doctests.
        return None

    tokens = line.split()
    if len(tokens) < 2:
        return None

    # The top-level package is everything before the first dot.
    package = tokens[1].split(".")[0]
    assert " " not in package
    return package
def multiline_import(line: str, previous_line: str = "") -> bool:
    """Return True if the import spans multiple lines."""
    # Parenthesized imports always continue onto other lines.
    if "(" in line or ")" in line:
        return True

    return multiline_statement(line, previous_line)
def multiline_statement(line: str, previous_line: str = "") -> bool:
    """Return True if this is part of a multiline statement."""
    if any(symbol in line for symbol in "\\:;"):
        return True

    reader = io.StringIO(line)
    try:
        list(tokenize.generate_tokens(reader.readline))
    except (SyntaxError, tokenize.TokenError):
        # The line does not tokenize on its own, so it must belong to a
        # larger statement.
        return True
    # Also part of a multiline statement when the previous line continues.
    return previous_line.rstrip().endswith("\\")
class PendingFix:
    """Base class for rewrites that need to consume several lines.

    Whenever a helper returns a ``PendingFix`` instead of a string, the
    main rewrite loop feeds each following line to this object via
    ``__call__`` until a final string result is produced.
    """

    def __init__(self, line: str) -> None:
        """Store the first line for later processing."""
        self.accumulator = collections.deque([line])

    def __call__(self, line: str) -> PendingFix | str:
        """Consume the next line; subclasses must implement this.

        Implementations return ``self`` to keep consuming lines, or a
        string holding the final result of all accumulated lines.
        """
        raise NotImplementedError("Abstract method needs to be overwritten")
322def _valid_char_in_line(char: str, line: str) -> bool:
323 """Return True if a char appears in the line and is not commented."""
324 comment_index = line.find("#")
325 char_index = line.find(char)
326 valid_char_in_line = char_index >= 0 and (
327 comment_index > char_index or comment_index < 0
328 )
329 return valid_char_in_line
332def _top_module(module_name: str) -> str:
333 """Return the name of the top level module in the hierarchy."""
334 if module_name[0] == ".":
335 return "%LOCAL_MODULE%"
336 return module_name.split(".")[0]
def _modules_to_remove(
    unused_modules: Iterable[str],
    safe_to_remove: Iterable[str] = SAFE_IMPORTS,
) -> Iterable[str]:
    """Discard unused modules that are not safe to remove from the list."""
    removable = [
        module for module in unused_modules if _top_module(module) in safe_to_remove
    ]
    return removable
347def _segment_module(segment: str) -> str:
348 """Extract the module identifier inside the segment.
350 It might be the case the segment does not have a module (e.g. is composed
351 just by a parenthesis or line continuation and whitespace). In this
352 scenario we just keep the segment... These characters are not valid in
353 identifiers, so they will never be contained in the list of unused modules
354 anyway.
355 """
356 return segment.strip(string.whitespace + ",\\()") or segment
class FilterMultilineImport(PendingFix):
    """Remove unused imports from multiline import statements.

    This class handles both the cases: "from imports" and "direct imports".

    Some limitations exist (e.g. imports with comments, lines joined by ``;``,
    etc). In these cases, the statement is left unchanged to avoid problems.
    """

    IMPORT_RE = re.compile(r"\bimport\b\s*")
    INDENTATION_RE = re.compile(r"^\s*")
    BASE_RE = re.compile(r"\bfrom\s+([^ ]+)")
    SEGMENT_RE = re.compile(
        r"([^,\s]+(?:[\s\\]+as[\s\\]+[^,\s]+)?[,\s\\)]*)",
        re.M,
    )
    # ^ module + comma + following space (including new line and continuation)
    IDENTIFIER_RE = re.compile(r"[^,\s]+")

    def __init__(
        self,
        line: str,
        unused_module: Iterable[str] = (),
        remove_all_unused_imports: bool = False,
        safe_to_remove: Iterable[str] = SAFE_IMPORTS,
        previous_line: str = "",
    ):
        """Receive the same parameters as ``filter_unused_import``."""
        self.remove: Iterable[str] = unused_module
        self.parenthesized: bool = "(" in line
        # Split "from x import a, b" into the "from x " head and the imports.
        self.from_, imports = self.IMPORT_RE.split(line, maxsplit=1)
        match = self.BASE_RE.search(self.from_)
        self.base = match.group(1) if match else None
        self.give_up: bool = False

        if not remove_all_unused_imports:
            if self.base and _top_module(self.base) not in safe_to_remove:
                self.give_up = True
            else:
                self.remove = _modules_to_remove(self.remove, safe_to_remove)

        if "\\" in previous_line:
            # Ignore tricky things like "try: \<new line> import" ...
            self.give_up = True

        self.analyze(line)

        PendingFix.__init__(self, imports)

    def is_over(self, line: str | None = None) -> bool:
        """Return True if the multiline import statement is over."""
        line = line or self.accumulator[-1]

        # Parenthesized imports end at ")"; backslash-continued imports end
        # on the first line without a trailing continuation.
        if self.parenthesized:
            return _valid_char_in_line(")", line)

        return not _valid_char_in_line("\\", line)

    def analyze(self, line: str) -> None:
        """Decide if the statement will be fixed or left unchanged."""
        # Comments, compound statements and slicing-like constructs are too
        # risky to rewrite automatically.
        if any(ch in line for ch in ";:#"):
            self.give_up = True

    def fix(self, accumulated: Iterable[str]) -> str:
        """Given a collection of accumulated lines, fix the entire import."""
        old_imports = "".join(accumulated)
        ending = get_line_ending(old_imports)
        # Split imports into segments that contain the module name +
        # comma + whitespace and eventual <newline> \ ( ) chars
        segments = [x for x in self.SEGMENT_RE.findall(old_imports) if x]
        modules = [_segment_module(x) for x in segments]
        keep = _filter_imports(modules, self.base, self.remove)

        # Short-circuit if no import was discarded
        if len(keep) == len(segments):
            return self.from_ + "import " + "".join(accumulated)

        fixed = ""
        if keep:
            # Since it is very difficult to deal with all the line breaks and
            # continuations, let's use the code layout that already exists and
            # just replace the module identifiers inside the first N-1 segments
            # + the last segment
            templates = list(zip(modules, segments))
            templates = templates[: len(keep) - 1] + templates[-1:]
            # It is important to keep the last segment, since it might contain
            # important chars like `)`
            fixed = "".join(
                template.replace(module, keep[i])
                for i, (module, template) in enumerate(templates)
            )

            # Fix the edge case: inline parenthesis + just one surviving import
            if self.parenthesized and any(ch not in fixed for ch in "()"):
                fixed = fixed.strip(string.whitespace + "()") + ending

        # Replace empty imports with a "pass" statement
        empty = len(fixed.strip(string.whitespace + "\\(),")) < 1
        if empty:
            match = self.INDENTATION_RE.search(self.from_)
            assert match is not None
            indentation = match.group(0)
            return indentation + "pass" + ending

        return self.from_ + "import " + fixed

    def __call__(self, line: str | None = None) -> PendingFix | str:
        """Accumulate all the lines in the import and then trigger the fix."""
        if line:
            self.accumulator.append(line)
            self.analyze(line)
        if not self.is_over(line):
            return self
        if self.give_up:
            # Reconstruct the statement unchanged.
            return self.from_ + "import " + "".join(self.accumulator)

        return self.fix(self.accumulator)
478def _filter_imports(
479 imports: Iterable[str],
480 parent: str | None = None,
481 unused_module: Iterable[str] = (),
482) -> Sequence[str]:
483 # We compare full module name (``a.module`` not `module`) to
484 # guarantee the exact same module as detected from pyflakes.
485 sep = "" if parent and parent[-1] == "." else "."
487 def full_name(name: str) -> str:
488 return name if parent is None else parent + sep + name
490 return [x for x in imports if full_name(x) not in unused_module]
def filter_from_import(line: str, unused_module: Iterable[str]) -> str:
    """Parse and filter ``from something import a, b, c``.

    Return line without unused import modules, or `pass` if all of the
    module in import is unused.
    """
    head, tail = re.split(
        pattern=r"\bimport\b",
        string=line,
        maxsplit=1,
    )
    base_match = re.search(
        pattern=r"\bfrom\s+([^ ]+)",
        string=head,
    )
    assert base_match is not None
    base_module = base_match.group(1)

    candidates = re.split(pattern=r"\s*,\s*", string=tail.strip())
    surviving = _filter_imports(candidates, base_module, unused_module)

    if not surviving:
        # Every imported name is unused: keep the enclosing block non-empty.
        return get_indentation(line) + "pass" + get_line_ending(line)

    return head + "import " + ", ".join(surviving) + get_line_ending(line)
def break_up_import(line: str) -> str:
    """Return line with imports on separate lines."""
    # Only simple single-line "import a, b" statements are supported.
    assert "\\" not in line
    assert "(" not in line
    assert ")" not in line
    assert ";" not in line
    assert "#" not in line
    assert not line.lstrip().startswith("from")

    newline = get_line_ending(line)
    if not newline:
        return line

    indentation, imports = re.split(
        pattern=r"\bimport\b",
        string=line,
        maxsplit=1,
    )

    prefix = indentation + "import "
    assert newline

    return "".join(
        prefix + module.strip() + newline for module in imports.split(",")
    )
def filter_code(
    source: str,
    additional_imports: Iterable[str] | None = None,
    expand_star_imports: bool = False,
    remove_all_unused_imports: bool = False,
    remove_duplicate_keys: bool = False,
    remove_unused_variables: bool = False,
    remove_rhs_for_unused_variables: bool = False,
    ignore_init_module_imports: bool = False,
) -> Iterable[str]:
    """Yield code with unused imports removed.

    Runs pyflakes once over ``source``, marks the offending line numbers,
    then rewrites the source line by line.
    """
    imports = SAFE_IMPORTS
    if additional_imports:
        imports |= frozenset(additional_imports)
    del additional_imports

    messages = check(source)

    if ignore_init_module_imports:
        marked_import_line_numbers: frozenset[int] = frozenset()
    else:
        marked_import_line_numbers = frozenset(
            unused_import_line_numbers(messages),
        )
    marked_unused_module: dict[int, list[str]] = collections.defaultdict(list)
    for line_number, module_name in unused_import_module_name(messages):
        marked_unused_module[line_number].append(module_name)

    undefined_names: list[str] = []
    if expand_star_imports and not (
        # See explanations in #18.
        re.search(r"\b__all__\b", source) or re.search(r"\bdel\b", source)
    ):
        marked_star_import_line_numbers = frozenset(
            star_import_used_line_numbers(messages),
        )
        if len(marked_star_import_line_numbers) > 1:
            # Auto expanding only possible for single star import
            marked_star_import_line_numbers = frozenset()
        else:
            for line_number, undefined_name, _ in star_import_usage_undefined_name(
                messages,
            ):
                undefined_names.append(undefined_name)
            if not undefined_names:
                marked_star_import_line_numbers = frozenset()
    else:
        marked_star_import_line_numbers = frozenset()

    if remove_unused_variables:
        marked_variable_line_numbers = frozenset(
            unused_variable_line_numbers(messages),
        )
    else:
        marked_variable_line_numbers = frozenset()

    if remove_duplicate_keys:
        marked_key_line_numbers: frozenset[int] = frozenset(
            duplicate_key_line_numbers(messages, source),
        )
    else:
        marked_key_line_numbers = frozenset()

    line_messages = get_messages_by_line(messages)

    sio = io.StringIO(source)
    previous_line = ""
    result: str | PendingFix = ""
    for line_number, line in enumerate(sio.readlines(), start=1):
        if isinstance(result, PendingFix):
            # A multiline rewrite is in progress; feed it the next line.
            result = result(line)
        elif "#" in line:
            # Lines containing comments are never rewritten.
            result = line
        elif line_number in marked_import_line_numbers:
            result = filter_unused_import(
                line,
                unused_module=marked_unused_module[line_number],
                remove_all_unused_imports=remove_all_unused_imports,
                imports=imports,
                previous_line=previous_line,
            )
        elif line_number in marked_variable_line_numbers:
            result = filter_unused_variable(
                line,
                drop_rhs=remove_rhs_for_unused_variables,
            )
        elif line_number in marked_key_line_numbers:
            result = filter_duplicate_key(
                line,
                line_messages[line_number],
                line_number,
                marked_key_line_numbers,
                source,
            )
        elif line_number in marked_star_import_line_numbers:
            result = filter_star_import(line, undefined_names)
        else:
            result = line

        if not isinstance(result, PendingFix):
            yield result

        previous_line = line
def get_messages_by_line(
    messages: Iterable[pyflakes.messages.Message],
) -> Mapping[int, pyflakes.messages.Message]:
    """Return dictionary that maps line number to message."""
    # When several messages share a line, the last one wins.
    return {message.lineno: message for message in messages}
def filter_star_import(
    line: str,
    marked_star_import_undefined_name: Iterable[str],
) -> str:
    """Return line with the star import expanded."""
    unique_names = sorted(set(marked_star_import_undefined_name))
    return re.sub(r"\*", ", ".join(unique_names), line)
def filter_unused_import(
    line: str,
    unused_module: Iterable[str],
    remove_all_unused_imports: bool,
    imports: Iterable[str],
    previous_line: str = "",
) -> PendingFix | str:
    """Return line if used, otherwise return None."""
    # Ignore doctests.
    if line.lstrip().startswith(">"):
        return line

    if multiline_import(line, previous_line):
        pending = FilterMultilineImport(
            line,
            unused_module,
            remove_all_unused_imports,
            imports,
            previous_line,
        )
        return pending()

    is_from_import = line.lstrip().startswith("from")

    if "," in line and not is_from_import:
        # "import a, b" -> one import per line before filtering.
        return break_up_import(line)

    package = extract_package_name(line)
    if not remove_all_unused_imports and package is not None and package not in imports:
        return line

    if "," in line:
        assert is_from_import
        return filter_from_import(line, unused_module)

    # We need to replace import with "pass" in case the import is the
    # only line inside a block. For example,
    # "if True:\n    import os". In such cases, if the import is
    # removed, the block will be left hanging with no body.
    return get_indentation(line) + "pass" + get_line_ending(line)
def filter_unused_variable(
    line: str,
    previous_line: str = "",
    drop_rhs: bool = False,
) -> str:
    """Return line if used, otherwise return None."""
    if re.match(EXCEPT_REGEX, line):
        # "except Error as name:" with an unused name: drop the binding.
        return re.sub(r" as \w+:$", ":", line, count=1)
    elif multiline_statement(line, previous_line):
        # Too risky to touch part of a multiline statement.
        return line
    elif line.count("=") == 1:
        split_line = line.split("=")
        assert len(split_line) == 2
        value = split_line[1].lstrip()
        if "," in split_line[0]:
            # Tuple unpacking on the left-hand side; leave it alone.
            return line

        if is_literal_or_name(value):
            # Rather than removing the line, replace with it "pass" to avoid
            # a possible hanging block with no body.
            value = "pass" + get_line_ending(line)
            if drop_rhs:
                return get_indentation(line) + value

        if drop_rhs:
            # Remove the whole assignment, right-hand side included.
            return ""
        # Keep the right-hand side (it may have side effects).
        return get_indentation(line) + value
    else:
        return line
def filter_duplicate_key(
    line: str,
    message: pyflakes.messages.Message,
    line_number: int,
    marked_line_numbers: Iterable[int],
    source: str,
    previous_line: str = "",
) -> str:
    """Return '' if first occurrence of the key otherwise return `line`."""
    if not marked_line_numbers:
        return line
    # Only the earliest marked occurrence is dropped per pass.
    first_occurrence = min(marked_line_numbers)
    return "" if line_number == first_occurrence else line
def dict_entry_has_key(line: str, key: Any) -> bool:
    """Return True if `line` is a dict entry that uses `key`.

    Return False for multiline cases where the line should not be removed by
    itself.
    """
    if "#" in line:
        # A comment on the line makes removal unsafe.
        return False

    match = re.match(r"\s*(.*)\s*:\s*(.*),\s*$", line)
    if not match:
        return False

    try:
        candidate_key = ast.literal_eval(match.group(1))
    except (SyntaxError, ValueError):
        # The key is not a simple literal.
        return False

    if multiline_statement(match.group(2)):
        # The value continues onto another line.
        return False

    return cast(bool, candidate_key == key)
def is_literal_or_name(value: str) -> bool:
    """Return True if value is a literal or a name."""
    try:
        ast.literal_eval(value)
    except (SyntaxError, TypeError, ValueError):
        pass
    else:
        return True

    if value.strip() in ("dict()", "list()", "set()"):
        return True

    # Support removal of variables on the right side. But make sure
    # there are no dots, which could mean an access of a property.
    return re.match(r"^\w+\s*$", value) is not None
def useless_pass_line_numbers(
    source: str,
    ignore_pass_after_docstring: bool = False,
) -> Iterable[int]:
    """Yield line numbers of unneeded "pass" statements.

    Walks the token stream, remembering the last "pass" seen so that a
    "pass" immediately followed by another statement at the same
    indentation can be reported.
    """
    sio = io.StringIO(source)
    previous_token_type = None
    last_pass_row = None
    last_pass_indentation = None
    previous_line = ""
    previous_non_empty_line = ""
    for token in tokenize.generate_tokens(sio.readline):
        token_type = token[0]
        start_row = token[2][0]
        line = token[4]

        is_pass = token_type == tokenize.NAME and line.strip() == "pass"

        # Leading "pass".
        if (
            start_row - 1 == last_pass_row
            and get_indentation(line) == last_pass_indentation
            and token_type in ATOMS
            and not is_pass
        ):
            # The previous line's "pass" is followed by real code at the
            # same indentation, so that "pass" is redundant.
            yield start_row - 1

        if is_pass:
            last_pass_row = start_row
            last_pass_indentation = get_indentation(line)

            is_trailing_pass = (
                previous_token_type != tokenize.INDENT
                and not previous_line.rstrip().endswith("\\")
            )

            is_pass_after_docstring = previous_non_empty_line.rstrip().endswith(
                ("'''", '"""'),
            )

            # Trailing "pass".
            if is_trailing_pass:
                if is_pass_after_docstring and ignore_pass_after_docstring:
                    continue
                else:
                    yield start_row

        previous_token_type = token_type
        previous_line = line
        if line.strip():
            previous_non_empty_line = line
def filter_useless_pass(
    source: str,
    ignore_pass_statements: bool = False,
    ignore_pass_after_docstring: bool = False,
) -> Iterable[str]:
    """Yield code with useless "pass" lines removed."""
    marked_lines: frozenset[int] = frozenset()
    if not ignore_pass_statements:
        try:
            marked_lines = frozenset(
                useless_pass_line_numbers(
                    source,
                    ignore_pass_after_docstring,
                ),
            )
        except (SyntaxError, tokenize.TokenError):
            # Untokenizable source: do not remove anything.
            marked_lines = frozenset()

    for line_number, line in enumerate(io.StringIO(source).readlines(), start=1):
        if line_number not in marked_lines:
            yield line
def get_indentation(line: str) -> str:
    """Return leading whitespace."""
    if not line.strip():
        # Whitespace-only lines carry no meaningful indentation.
        return ""
    return line[: len(line) - len(line.lstrip())]
def get_line_ending(line: str) -> str:
    """Return line ending."""
    trailing = len(line) - len(line.rstrip())
    if trailing == 0:
        return ""
    return line[-trailing:]
def fix_code(
    source: str,
    additional_imports: Iterable[str] | None = None,
    expand_star_imports: bool = False,
    remove_all_unused_imports: bool = False,
    remove_duplicate_keys: bool = False,
    remove_unused_variables: bool = False,
    remove_rhs_for_unused_variables: bool = False,
    ignore_init_module_imports: bool = False,
    ignore_pass_statements: bool = False,
    ignore_pass_after_docstring: bool = False,
) -> str:
    """Return code with all filtering run on it."""
    if not source:
        return source

    # Respect an explicit "# autoflake: skip_file" marker.
    if IGNORE_COMMENT_REGEX.search(source):
        return source

    # pyflakes does not handle "nonlocal" correctly.
    if "nonlocal" in source:
        remove_unused_variables = False

    filtered_source = None
    # Re-run the filters until a fixed point: removing one unused import
    # can make another import or "pass" statement become removable.
    while True:
        filtered_source = "".join(
            filter_useless_pass(
                "".join(
                    filter_code(
                        source,
                        additional_imports=additional_imports,
                        expand_star_imports=expand_star_imports,
                        remove_all_unused_imports=remove_all_unused_imports,
                        remove_duplicate_keys=remove_duplicate_keys,
                        remove_unused_variables=remove_unused_variables,
                        remove_rhs_for_unused_variables=(
                            remove_rhs_for_unused_variables
                        ),
                        ignore_init_module_imports=ignore_init_module_imports,
                    ),
                ),
                ignore_pass_statements=ignore_pass_statements,
                ignore_pass_after_docstring=ignore_pass_after_docstring,
            ),
        )

        if filtered_source == source:
            break
        source = filtered_source

    return filtered_source
def fix_file(
    filename: str,
    args: Mapping[str, Any],
    standard_out: IO[str] | None = None,
) -> int:
    """Run fix_code() on a file."""
    out_stream = standard_out if standard_out is not None else sys.stdout
    encoding = detect_encoding(filename)
    with open_with_encoding(filename, encoding=encoding) as input_file:
        return _fix_file(
            input_file,
            filename,
            args,
            args["write_to_stdout"],
            cast(IO[str], out_stream),
            encoding=encoding,
        )
def _fix_file(
    input_file: IO[str],
    filename: str,
    args: Mapping[str, Any],
    write_to_stdout: bool,
    standard_out: IO[str],
    encoding: str | None = None,
) -> int:
    """Run fix_code() on an already-opened file and emit the result.

    Returns 1 when running in check/check-diff mode and changes are
    needed, otherwise 0.
    """
    source = input_file.read()
    original_source = source

    # PEP 8 naming: was "isInitFile".
    is_init_file = os.path.basename(filename) == "__init__.py"

    if args["ignore_init_module_imports"] and is_init_file:
        ignore_init_module_imports = True
    else:
        ignore_init_module_imports = False

    filtered_source = fix_code(
        source,
        additional_imports=(args["imports"].split(",") if "imports" in args else None),
        expand_star_imports=args["expand_star_imports"],
        remove_all_unused_imports=args["remove_all_unused_imports"],
        remove_duplicate_keys=args["remove_duplicate_keys"],
        remove_unused_variables=args["remove_unused_variables"],
        remove_rhs_for_unused_variables=(args["remove_rhs_for_unused_variables"]),
        ignore_init_module_imports=ignore_init_module_imports,
        ignore_pass_statements=args["ignore_pass_statements"],
        ignore_pass_after_docstring=args["ignore_pass_after_docstring"],
    )

    if original_source != filtered_source:
        if args["check"]:
            # Report the actual file instead of a hard-coded placeholder so
            # the user can tell which file needs fixing.
            standard_out.write(
                f"{filename}: Unused imports/variables detected{os.linesep}",
            )
            return 1
        if args["check_diff"]:
            diff = get_diff_text(
                io.StringIO(original_source).readlines(),
                io.StringIO(filtered_source).readlines(),
                filename,
            )
            standard_out.write("".join(diff))
            return 1
        if write_to_stdout:
            standard_out.write(filtered_source)
        elif args["in_place"]:
            with open_with_encoding(
                filename,
                mode="w",
                encoding=encoding,
            ) as output_file:
                output_file.write(filtered_source)
            _LOGGER.info("Fixed %s", filename)
        else:
            # Default: show what would change as a unified diff.
            diff = get_diff_text(
                io.StringIO(original_source).readlines(),
                io.StringIO(filtered_source).readlines(),
                filename,
            )
            standard_out.write("".join(diff))
    elif write_to_stdout:
        # Nothing changed, but stdout mode still echoes the source.
        standard_out.write(filtered_source)
    else:
        if (args["check"] or args["check_diff"]) and not args["quiet"]:
            standard_out.write(f"{filename}: No issues detected!{os.linesep}")
        else:
            _LOGGER.debug("Clean %s: nothing to fix", filename)

    return 0
def open_with_encoding(
    filename: str,
    encoding: str | None,
    mode: str = "r",
    limit_byte_check: int = -1,
) -> IO[str]:
    """Return opened file with a specific encoding."""
    effective_encoding = encoding or detect_encoding(
        filename,
        limit_byte_check=limit_byte_check,
    )
    return open(
        filename,
        mode=mode,
        encoding=effective_encoding,
        newline="",  # Preserve line endings
    )
def detect_encoding(filename: str, limit_byte_check: int = -1) -> str:
    """Return file encoding."""
    try:
        with open(filename, "rb") as byte_stream:
            guessed = _detect_encoding(byte_stream.readline)

        # Verify the guess by actually decoding (a prefix of) the file.
        with open_with_encoding(filename, guessed) as text_stream:
            text_stream.read(limit_byte_check)
        return guessed
    except (LookupError, SyntaxError, UnicodeDecodeError):
        # Fall back to an encoding that accepts any byte sequence.
        return "latin-1"
1078def _detect_encoding(readline: Callable[[], bytes]) -> str:
1079 """Return file encoding."""
1080 try:
1081 encoding = tokenize.detect_encoding(readline)[0]
1082 return encoding
1083 except (LookupError, SyntaxError, UnicodeDecodeError):
1084 return "latin-1"
def get_diff_text(old: Sequence[str], new: Sequence[str], filename: str) -> str:
    """Return the unified diff between *old* and *new* line sequences.

    The two sides are labelled ``original/<filename>`` and
    ``fixed/<filename>``. When a diff line has no trailing newline a
    GNU-diff-style marker line is appended (works around
    http://bugs.python.org/issue2142).
    """
    newline = "\n"
    chunks: list[str] = []
    for diff_line in difflib.unified_diff(
        old,
        new,
        "original/" + filename,
        "fixed/" + filename,
        lineterm=newline,
    ):
        chunks.append(diff_line)
        if not diff_line.endswith(newline):
            # difflib omits the newline here; mimic GNU diff's marker.
            chunks.append(newline + r"\ No newline at end of file" + newline)
    return "".join(chunks)
1109def _split_comma_separated(string: str) -> set[str]:
1110 """Return a set of strings."""
1111 return {text.strip() for text in string.split(",") if text.strip()}
def is_python_file(filename: str) -> bool:
    """Return True if *filename* looks like Python source.

    Either the name ends in ``.py``, or the file's first line matches the
    Python shebang pattern.
    """
    if filename.endswith(".py"):
        return True

    try:
        with open_with_encoding(
            filename,
            None,
            limit_byte_check=MAX_PYTHON_FILE_DETECTION_BYTES,
        ) as source:
            head = source.read(MAX_PYTHON_FILE_DETECTION_BYTES)
            if not head:
                return False
            first_line = head.splitlines()[0]
    except (OSError, IndexError):
        # Unreadable file, or no lines at all: not Python.
        return False

    return bool(PYTHON_SHEBANG_REGEX.match(first_line))
def is_exclude_file(filename: str, exclude: Iterable[str]) -> bool:
    """Return True if *filename* matches one of the *exclude* globs.

    Hidden files (basename starting with a dot) are always excluded.
    Every pattern is tried against both the basename and the full path.
    """
    base = os.path.basename(filename)
    if base.startswith("."):
        return True

    return any(
        fnmatch.fnmatch(base, pattern) or fnmatch.fnmatch(filename, pattern)
        for pattern in exclude
    )
def match_file(filename: str, exclude: Iterable[str]) -> bool:
    """Return True if *filename* may be modified or recursed into."""
    if is_exclude_file(filename, exclude):
        _LOGGER.debug("Skipped %s: matched to exclude pattern", filename)
        return False

    # Directories are always eligible for recursion; plain files must
    # look like Python source.
    return os.path.isdir(filename) or is_python_file(filename)
def find_files(
    filenames: list[str],
    recursive: bool,
    exclude: Iterable[str],
) -> Iterable[str]:
    """Yield candidate filenames, expanding directories when *recursive*.

    ``filenames`` is consumed as a work queue (the caller's list is
    mutated): files discovered under a directory are appended to it,
    while excluded names are skipped with a debug log. Excluded
    directories are pruned in place so ``os.walk`` never descends into
    them.
    """
    while filenames:
        name = filenames.pop(0)
        if recursive and os.path.isdir(name):
            for root, directories, children in os.walk(name):
                filenames.extend(
                    candidate
                    for candidate in (
                        os.path.join(root, child) for child in children
                    )
                    if match_file(candidate, exclude)
                )
                # In-place slice assignment prunes what os.walk visits.
                directories[:] = [
                    directory
                    for directory in directories
                    if match_file(os.path.join(root, directory), exclude)
                ]
        elif is_exclude_file(name, exclude):
            _LOGGER.debug("Skipped %s: matched to exclude pattern", name)
        else:
            yield name
1198def process_pyproject_toml(toml_file_path: str) -> MutableMapping[str, Any] | None:
1199 """Extract config mapping from pyproject.toml file."""
1200 try:
1201 import tomllib
1202 except ModuleNotFoundError:
1203 import tomli as tomllib
1205 with open(toml_file_path, "rb") as f:
1206 return tomllib.load(f).get("tool", {}).get("autoflake", None)
1209def process_config_file(config_file_path: str) -> MutableMapping[str, Any] | None:
1210 """Extract config mapping from config file."""
1211 import configparser
1213 reader = configparser.ConfigParser()
1214 reader.read(config_file_path, encoding="utf-8")
1215 if not reader.has_section("autoflake"):
1216 return None
1218 return reader["autoflake"]
def find_and_process_config(args: Mapping[str, Any]) -> MutableMapping[str, Any] | None:
    """Search upward from the files' common path for an autoflake config.

    Starting at the common ancestor directory of ``args["files"]``, walk
    toward the filesystem root and return the first configuration mapping
    found in a ``pyproject.toml`` or ``setup.cfg``; return ``None`` when
    no directory on the way up contains one.
    """
    # Configuration file parsers {filename: parser function}.
    CONFIG_FILES: Mapping[str, Callable[[str], MutableMapping[str, Any] | None]] = {
        "pyproject.toml": process_pyproject_toml,
        "setup.cfg": process_config_file,
    }
    # Traverse the file tree common to all files given as argument looking for
    # a configuration file
    config_path = os.path.commonpath([os.path.abspath(file) for file in args["files"]])
    config: Mapping[str, Any] | None = None
    while True:
        for config_file, processor in CONFIG_FILES.items():
            # BUG FIX: this was os.path.join(os.path.join(config_path,
            # config_file)) -- a redundant nested single-argument join.
            config_file_path = os.path.join(config_path, config_file)
            if os.path.isfile(config_file_path):
                config = processor(config_file_path)
                if config is not None:
                    break
        if config is not None:
            break
        config_path, tail = os.path.split(config_path)
        if not tail:
            # Reached the filesystem root without finding a config file.
            break
    return config
def merge_configuration_file(
    flag_args: MutableMapping[str, Any],
) -> tuple[MutableMapping[str, Any], bool]:
    """Merge configuration from a file into args.

    Returns ``(merged_args, success)``. On any config error the original
    ``flag_args`` is returned unchanged with ``success=False``. Precedence
    for the merged result (later wins): boolean defaults (False) <
    config-file values < command-line flags; except for the "imports" and
    "exclude" keys, whose config and flag values are concatenated with a
    comma instead of one overriding the other.
    """
    # Accepted INI-style spellings for booleans (config files store strings).
    BOOL_TYPES = {
        "1": True,
        "yes": True,
        "true": True,
        "on": True,
        "0": False,
        "no": False,
        "false": False,
        "off": False,
    }

    # Locate the configuration: either the explicitly given --config file,
    # or the nearest pyproject.toml/setup.cfg found above the target files.
    if "config_file" in flag_args:
        config_file = pathlib.Path(flag_args["config_file"]).resolve()
        process_method = process_config_file
        if config_file.suffix == ".toml":
            process_method = process_pyproject_toml

        config = process_method(str(config_file))

        if not config:
            _LOGGER.error(
                "can't parse config file '%s'",
                config_file,
            )
            return flag_args, False
    else:
        config = find_and_process_config(flag_args)

    # Every flag that must end up as a bool in the merged result.
    BOOL_FLAGS = {
        "check",
        "check_diff",
        "expand_star_imports",
        "ignore_init_module_imports",
        "ignore_pass_after_docstring",
        "ignore_pass_statements",
        "in_place",
        "quiet",
        "recursive",
        "remove_all_unused_imports",
        "remove_duplicate_keys",
        "remove_rhs_for_unused_variables",
        "remove_unused_variables",
        "write_to_stdout",
    }

    # Validate and normalize config-file entries into argparse-style keys.
    config_args: dict[str, Any] = {}
    if config is not None:
        for name, value in config.items():
            # Config files use dashed option names; args use underscores.
            arg = name.replace("-", "_")
            if arg in BOOL_FLAGS:
                # boolean properties: coerce INI string spellings to bool.
                if isinstance(value, str):
                    value = BOOL_TYPES.get(value.lower(), value)
                if not isinstance(value, bool):
                    _LOGGER.error(
                        "'%s' in the config file should be a boolean",
                        name,
                    )
                    return flag_args, False
                config_args[arg] = value
            else:
                # Non-boolean options: accept a list of strings (TOML) and
                # flatten it to the comma-separated form flags use.
                if isinstance(value, list) and all(
                    isinstance(val, str) for val in value
                ):
                    value = ",".join(str(val) for val in value)
                if not isinstance(value, str):
                    _LOGGER.error(
                        "'%s' in the config file should be a comma separated"
                        " string or list of strings",
                        name,
                    )
                    return flag_args, False

                config_args[arg] = value

    # merge args that can be merged: config and flag values for these keys
    # are concatenated rather than one overriding the other.
    merged_args = {}
    mergeable_keys = {"imports", "exclude"}
    for key in mergeable_keys:
        values = (
            v for v in (config_args.get(key), flag_args.get(key)) if v is not None
        )
        value = ",".join(values)
        if value != "":
            merged_args[key] = value

    # Later dicts win on key collisions, giving the precedence described
    # in the docstring.
    default_args = {arg: False for arg in BOOL_FLAGS}
    return {
        **default_args,
        **config_args,
        **flag_args,
        **merged_args,
    }, True
def _main(
    argv: Sequence[str],
    standard_out: IO[str] | None,
    standard_error: IO[str] | None,
    standard_input: IO[str] | None = None,
) -> int:
    """Parse *argv*, run autoflake over the requested files.

    Return exit status: 0 means no error (and, under --check or
    --check-diff, that no changes were needed).
    """
    import argparse

    parser = argparse.ArgumentParser(
        description=__doc__,
        prog="autoflake",
        # SUPPRESS lets merge_configuration_file distinguish "flag not
        # given" from "flag given with its default value".
        argument_default=argparse.SUPPRESS,
    )
    check_group = parser.add_mutually_exclusive_group()
    check_group.add_argument(
        "-c",
        "--check",
        action="store_true",
        help="return error code if changes are needed",
    )
    check_group.add_argument(
        "-cd",
        "--check-diff",
        action="store_true",
        help="return error code if changes are needed, also display file diffs",
    )

    imports_group = parser.add_mutually_exclusive_group()
    imports_group.add_argument(
        "--imports",
        help="by default, only unused standard library "
        "imports are removed; specify a comma-separated "
        "list of additional modules/packages",
    )
    imports_group.add_argument(
        "--remove-all-unused-imports",
        action="store_true",
        help="remove all unused imports (not just those from the standard library)",
    )

    parser.add_argument(
        "-r",
        "--recursive",
        action="store_true",
        help="drill down directories recursively",
    )
    parser.add_argument(
        "-j",
        "--jobs",
        type=int,
        metavar="n",
        default=0,
        help="number of parallel jobs; match CPU count if value is 0 (default: 0)",
    )
    parser.add_argument(
        "--exclude",
        metavar="globs",
        help="exclude file/directory names that match these comma-separated globs",
    )
    parser.add_argument(
        "--expand-star-imports",
        action="store_true",
        help="expand wildcard star imports with undefined "
        "names; this only triggers if there is only "
        "one star import in the file; this is skipped if "
        "there are any uses of `__all__` or `del` in the "
        "file",
    )
    parser.add_argument(
        "--ignore-init-module-imports",
        action="store_true",
        help="exclude __init__.py when removing unused imports",
    )
    parser.add_argument(
        "--remove-duplicate-keys",
        action="store_true",
        help="remove all duplicate keys in objects",
    )
    parser.add_argument(
        "--remove-unused-variables",
        action="store_true",
        help="remove unused variables",
    )
    parser.add_argument(
        "--remove-rhs-for-unused-variables",
        action="store_true",
        help="remove RHS of statements when removing unused variables (unsafe)",
    )
    parser.add_argument(
        "--ignore-pass-statements",
        action="store_true",
        help="ignore all pass statements",
    )
    parser.add_argument(
        "--ignore-pass-after-docstring",
        action="store_true",
        help='ignore pass statements after a newline ending on \'"""\'',
    )
    parser.add_argument(
        "--version",
        action="version",
        version="%(prog)s " + __version__,
    )
    parser.add_argument(
        "--quiet",
        action="store_true",
        help="Suppress output if there are no issues",
    )
    parser.add_argument(
        "-v",
        "--verbose",
        action="count",
        dest="verbosity",
        default=0,
        help="print more verbose logs (you can repeat `-v` to make it more verbose)",
    )
    parser.add_argument(
        "--stdin-display-name",
        dest="stdin_display_name",
        default="stdin",
        help="the name used when processing input from stdin",
    )

    parser.add_argument(
        "--config",
        dest="config_file",
        help=(
            "Explicitly set the config file "
            "instead of auto determining based on file location"
        ),
    )

    parser.add_argument("files", nargs="+", help="files to format")

    output_group = parser.add_mutually_exclusive_group()
    output_group.add_argument(
        "-i",
        "--in-place",
        action="store_true",
        help="make changes to files instead of printing diffs",
    )
    output_group.add_argument(
        "-s",
        "--stdout",
        action="store_true",
        dest="write_to_stdout",
        help=(
            "print changed text to stdout. defaults to true "
            "when formatting stdin, or to false otherwise"
        ),
    )

    # Convert the argparse namespace to a dict so that it can be
    # serialized by multiprocessing.
    args: MutableMapping[str, Any] = vars(parser.parse_args(argv[1:]))

    if standard_error is None:
        _LOGGER.addHandler(logging.NullHandler())
    else:
        _LOGGER.addHandler(logging.StreamHandler(standard_error))
        loglevels = [logging.WARNING, logging.INFO, logging.DEBUG]
        try:
            loglevel = loglevels[args["verbosity"]]
        except IndexError:  # Too much -v
            loglevel = loglevels[-1]
        _LOGGER.setLevel(loglevel)

    args, success = merge_configuration_file(args)
    if not success:
        return 1

    if args["remove_rhs_for_unused_variables"] and not (
        args["remove_unused_variables"]
    ):
        _LOGGER.error(
            "Using --remove-rhs-for-unused-variables only makes sense when "
            "used with --remove-unused-variables",
        )
        return 1

    if "exclude" in args:
        args["exclude"] = _split_comma_separated(args["exclude"])
    else:
        args["exclude"] = set()

    if args["jobs"] < 1:
        # BUG FIX: os.cpu_count() may return None; default to 1 before
        # clamping so the win32 min() below cannot raise TypeError.
        worker_count = os.cpu_count() or 1
        if sys.platform == "win32":
            # Work around https://bugs.python.org/issue26903
            worker_count = min(worker_count, 60)
        args["jobs"] = worker_count

    # Deduplicate explicitly-listed files before expansion.
    filenames = list(set(args["files"]))

    exit_status = 0
    files = list(find_files(filenames, args["recursive"], args["exclude"]))
    if (
        # BUG FIX: this condition previously tested args["jobs"] == 1 twice.
        args["jobs"] == 1
        or len(files) == 1
        or "-" in files
        or standard_out is not None
    ):
        # Serial path: required for stdin ("-") and captured output.
        for name in files:
            if name == "-" and standard_input is not None:
                exit_status |= _fix_file(
                    standard_input,
                    args["stdin_display_name"],
                    args=args,
                    write_to_stdout=True,
                    standard_out=standard_out or sys.stdout,
                )
            else:
                try:
                    exit_status |= fix_file(
                        name,
                        args=args,
                        standard_out=standard_out,
                    )
                except OSError as exception:
                    _LOGGER.error(str(exception))
                    exit_status |= 1
    else:
        import multiprocessing

        # Parallel path: fan each file out to a worker process and OR the
        # statuses together.
        with multiprocessing.Pool(args["jobs"]) as pool:
            futs = []
            for name in files:
                fut = pool.apply_async(fix_file, args=(name, args))
                futs.append(fut)
            for fut in futs:
                try:
                    exit_status |= fut.get()
                except OSError as exception:
                    _LOGGER.error(str(exception))
                    exit_status |= 1

    return exit_status
def main() -> int:
    """Command-line entry point."""
    # Exit silently on a broken pipe; SIGPIPE is not available on Windows,
    # so look it up defensively instead of catching AttributeError.
    sigpipe = getattr(signal, "SIGPIPE", None)
    if sigpipe is not None:
        signal.signal(sigpipe, signal.SIG_DFL)

    try:
        status = _main(
            sys.argv,
            standard_out=None,
            standard_error=sys.stderr,
            standard_input=sys.stdin,
        )
    except KeyboardInterrupt:  # pragma: no cover
        status = 2  # pragma: no cover
    return status
# Script entry point: propagate autoflake's exit status to the shell when
# the module is executed directly.
if __name__ == "__main__":
    sys.exit(main())