Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/autoflake.py: 49%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

697 statements  

1#!/usr/bin/env python 

2# Copyright (C) Steven Myint 

3# 

4# Permission is hereby granted, free of charge, to any person obtaining 

5# a copy of this software and associated documentation files (the 

6# "Software"), to deal in the Software without restriction, including 

7# without limitation the rights to use, copy, modify, merge, publish, 

8# distribute, sublicense, and/or sell copies of the Software, and to 

9# permit persons to whom the Software is furnished to do so, subject to 

10# the following conditions: 

11# 

12# The above copyright notice and this permission notice shall be included 

13# in all copies or substantial portions of the Software. 

14# 

15# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, 

16# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF 

17# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 

18# IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY 

19# CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, 

20# TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE 

21# SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. 

22"""Removes unused imports and unused variables as reported by pyflakes.""" 

23 

24from __future__ import annotations 

25 

26import ast 

27import collections 

28import difflib 

29import fnmatch 

30import io 

31import logging 

32import os 

33import pathlib 

34import re 

35import signal 

36import string 

37import sys 

38import sysconfig 

39import tokenize 

40from collections.abc import Callable 

41from collections.abc import Iterable 

42from collections.abc import Mapping 

43from collections.abc import MutableMapping 

44from collections.abc import Sequence 

45from typing import Any 

46from typing import cast 

47from typing import IO 

48 

49import pyflakes.api 

50import pyflakes.messages 

51import pyflakes.reporter 

52 

# Version string of this module.
__version__ = "2.3.3"


# Module logger. Propagation is switched off, so records logged here are not
# passed on to the handlers of ancestor loggers.
_LOGGER = logging.getLogger("autoflake")
_LOGGER.propagate = False

# Token types counted as "atoms": names, numbers and strings.
ATOMS = frozenset([tokenize.NAME, tokenize.NUMBER, tokenize.STRING])

# Matches a complete ``except <types> as <name>:`` line.
EXCEPT_REGEX = re.compile(r"^\s*except [\s,()\w]+ as \w+:$")
# Matches a ``#!`` line that ends with ``python`` or ``python3``.
PYTHON_SHEBANG_REGEX = re.compile(r"^#!.*\bpython[3]?\b\s*$")

# NOTE(review): presumably the number of leading bytes inspected when deciding
# whether a file is Python; the use site is outside this part of the file.
MAX_PYTHON_FILE_DETECTION_BYTES = 1024

# Matches a ``# autoflake: skip_file`` comment anywhere in a source text.
IGNORE_COMMENT_REGEX = re.compile(
    r"\s*#\s{1,}autoflake:\s{1,}\bskip_file\b",
    re.MULTILINE,
)

70 

71 

def standard_paths() -> Iterable[str]:
    """Yield the directory entries of the standard library locations.

    The ``stdlib`` and ``platstdlib`` directories reported by ``sysconfig``
    are listed, each followed by the entries of its ``lib-dynload``
    subdirectory when that subdirectory exists.
    """
    known_paths = sysconfig.get_paths()
    for key in ("stdlib", "platstdlib"):
        if key not in known_paths:
            continue

        lib_dir = known_paths[key]
        if not os.path.isdir(lib_dir):
            continue

        # Entries of the library directory itself.
        yield from os.listdir(lib_dir)

        # Entries of the lib-dynload directory, if there is one.
        dynload_dir = os.path.join(lib_dir, "lib-dynload")
        if os.path.isdir(dynload_dir):
            yield from os.listdir(dynload_dir)

87 

88 

def standard_package_names() -> Iterable[str]:
    """Yield module names derived from the entries of ``standard_paths()``."""
    for entry in standard_paths():
        # Skip underscore-prefixed entries and entries containing a dash.
        if entry.startswith("_") or "-" in entry:
            continue

        stem, dot, _ = entry.partition(".")
        # A dotted entry is only accepted with one of these endings.
        if dot and not entry.endswith(("so", "py", "pyc")):
            continue

        yield stem

99 

100 

# Standard-library modules that are excluded from SAFE_IMPORTS below.
IMPORTS_WITH_SIDE_EFFECTS = {"antigravity", "rlcompleter", "this"}

# In case they are built into CPython.
BINARY_IMPORTS = {
    "datetime",
    "grp",
    "io",
    "json",
    "math",
    "multiprocessing",
    "parser",
    "pwd",
    "string",
    "operator",
    "os",
    "sys",
    "time",
}

# ``-`` binds tighter than ``|``: the side-effect modules are removed from the
# discovered names first, and BINARY_IMPORTS is added afterwards.
SAFE_IMPORTS = (
    frozenset(standard_package_names()) - IMPORTS_WITH_SIDE_EFFECTS | BINARY_IMPORTS
)

123 

124 

def unused_import_line_numbers(
    messages: Iterable[pyflakes.messages.Message],
) -> Iterable[int]:
    """Yield the line number of every ``UnusedImport`` message."""
    yield from (
        entry.lineno
        for entry in messages
        if isinstance(entry, pyflakes.messages.UnusedImport)
    )

132 

133 

def unused_import_module_name(
    messages: Iterable[pyflakes.messages.Message],
) -> Iterable[tuple[int, str]]:
    """Yield ``(line number, module name)`` for every ``UnusedImport`` message.

    The module name is the first single-quoted substring of the message text;
    messages without one are skipped.
    """
    quoted = re.compile(r"\'(.+?)\'")
    for entry in messages:
        if not isinstance(entry, pyflakes.messages.UnusedImport):
            continue

        found = quoted.search(str(entry))
        if found:
            # The capture group is the text between the two quotes.
            yield (entry.lineno, found.group(1))

145 

146 

def star_import_used_line_numbers(
    messages: Iterable[pyflakes.messages.Message],
) -> Iterable[int]:
    """Yield the line number of every ``ImportStarUsed`` message."""
    yield from (
        entry.lineno
        for entry in messages
        if isinstance(entry, pyflakes.messages.ImportStarUsed)
    )

154 

155 

def star_import_usage_undefined_name(
    messages: Iterable[pyflakes.messages.Message],
) -> Iterable[tuple[int, str, str]]:
    """Yield ``(line number, undefined name, module name)`` triples.

    Only ``ImportStarUsage`` messages are considered; the name and the module
    are the first two message arguments.
    """
    for entry in messages:
        if not isinstance(entry, pyflakes.messages.ImportStarUsage):
            continue

        arguments = entry.message_args
        yield (entry.lineno, arguments[0], arguments[1])

165 

166 

def unused_variable_line_numbers(
    messages: Iterable[pyflakes.messages.Message],
) -> Iterable[int]:
    """Yield the line number of every ``UnusedVariable`` message."""
    yield from (
        entry.lineno
        for entry in messages
        if isinstance(entry, pyflakes.messages.UnusedVariable)
    )

174 

175 

def duplicate_key_line_numbers(
    messages: Iterable[pyflakes.messages.Message],
    source: str,
) -> Iterable[int]:
    """Yield line numbers of repeated dict keys that are simple to handle.

    The ``MultiValueRepeatedKeyLiteral`` messages are grouped by key. A group
    contributes its line numbers only when every reported line passes
    ``dict_entry_has_key`` for that key; otherwise the whole group is ignored.
    """
    repeated = [
        entry
        for entry in messages
        if isinstance(entry, pyflakes.messages.MultiValueRepeatedKeyLiteral)
    ]
    if not repeated:
        return

    # Complex cases are filtered out key by key instead of being parsed.
    grouped = create_key_to_messages_dict(repeated)
    source_lines = source.split("\n")

    for group in grouped.values():
        # Every line is checked (no short-circuit) before deciding.
        verdicts = [
            dict_entry_has_key(
                source_lines[entry.lineno - 1],
                entry.message_args[0],
            )
            for entry in group
        ]
        if all(verdicts):
            for entry in group:
                yield entry.lineno

207 

208 

def create_key_to_messages_dict(
    messages: Iterable[pyflakes.messages.MultiValueRepeatedKeyLiteral],
) -> Mapping[Any, Iterable[pyflakes.messages.MultiValueRepeatedKeyLiteral]]:
    """Group messages by their first message argument (the repeated key).

    The result is a ``defaultdict`` of lists; messages keep their input order
    within each list.
    """
    grouped: dict[
        Any,
        list[pyflakes.messages.MultiValueRepeatedKeyLiteral],
    ] = collections.defaultdict(list)
    for entry in messages:
        repeated_key = entry.message_args[0]
        grouped[repeated_key].append(entry)
    return grouped

220 

221 

def check(source: str) -> Iterable[pyflakes.messages.Message]:
    """Run pyflakes on ``source`` and return the messages it reported.

    ``AttributeError``, ``RecursionError`` and ``UnicodeDecodeError`` raised
    during the check are swallowed; whatever was collected so far is returned.
    """
    collector = ListReporter()
    try:
        pyflakes.api.check(source, filename="<string>", reporter=collector)
    except (AttributeError, RecursionError, UnicodeDecodeError):
        # Best effort: fall through with the messages gathered so far.
        pass
    return collector.messages

230 

231 

class StubFile:
    """Write-only sink handed to pyflakes; everything written is discarded."""

    def write(self, *_ignored: Any) -> None:
        """Accept any arguments and do nothing."""
        return None

237 

238 

class ListReporter(pyflakes.reporter.Reporter):
    """Reporter that collects flake messages in ``self.messages``."""

    def __init__(self) -> None:
        """Set up the reporter with both output streams stubbed out.

        The base reporter is given a ``StubFile`` for each of its two
        streams, so anything it writes itself is ignored.
        """
        sink = StubFile()
        super().__init__(sink, sink)
        self.messages: list[pyflakes.messages.Message] = []

    def flake(self, message: pyflakes.messages.Message) -> None:
        """Append ``message`` to the collected messages."""
        self.messages.append(message)

254 

255 

def extract_package_name(line: str) -> str | None:
    """Return the top-level package named in a single-line import.

    ``None`` is returned when the line does not start with ``import`` or
    ``from`` (e.g. doctest lines), or when nothing follows the keyword.
    The line must not contain ``\\``, ``(``, ``)`` or ``;``.
    """
    for forbidden in ("\\", "(", ")", ";"):
        assert forbidden not in line

    if not line.lstrip().startswith(("import", "from")):
        # Ignore doctests.
        return None

    words = line.split()
    if len(words) < 2:
        return None

    # Only the part before the first dot of the module path is of interest.
    top_level = words[1].split(".")[0]
    assert " " not in top_level

    return top_level

276 

277 

def multiline_import(line: str, previous_line: str = "") -> bool:
    """Return True if the import spans multiple lines.

    Any parenthesis in ``line`` counts as multiline; otherwise the decision
    is delegated to ``multiline_statement``.
    """
    if any(symbol in line for symbol in "()"):
        return True

    return multiline_statement(line, previous_line)

285 

286 

def multiline_statement(line: str, previous_line: str = "") -> bool:
    """Return True if ``line`` is part of a multiline statement.

    That is the case when the line contains a backslash, colon or semicolon,
    when it cannot be tokenized on its own, or when ``previous_line`` ends
    with a backslash.
    """
    if any(symbol in line for symbol in "\\:;"):
        return True

    try:
        # Exhaust the tokenizer; an incomplete line raises here.
        list(tokenize.generate_tokens(io.StringIO(line).readline))
    except (SyntaxError, tokenize.TokenError):
        return True

    return previous_line.rstrip().endswith("\\")

299 

300 

class PendingFix:
    """Base class for rewrite operations that span several lines.

    When a helper of the main rewrite loop returns a ``PendingFix`` instead
    of a string, the loop calls that object with each following line until
    it returns a string.
    """

    def __init__(self, line: str) -> None:
        """Start the accumulator with the first line."""
        self.accumulator = collections.deque()
        self.accumulator.append(line)

    def __call__(self, line: str) -> PendingFix | str:
        """Process ``line`` in the context of the accumulated lines.

        Subclasses return ``self`` to keep receiving lines, or a string with
        the final result for all the lines processed. The base implementation
        always raises ``NotImplementedError``.
        """
        raise NotImplementedError("Abstract method needs to be overwritten")

320 

321 

322def _valid_char_in_line(char: str, line: str) -> bool: 

323 """Return True if a char appears in the line and is not commented.""" 

324 comment_index = line.find("#") 

325 char_index = line.find(char) 

326 valid_char_in_line = char_index >= 0 and ( 

327 comment_index > char_index or comment_index < 0 

328 ) 

329 return valid_char_in_line 

330 

331 

332def _top_module(module_name: str) -> str: 

333 """Return the name of the top level module in the hierarchy.""" 

334 if module_name[0] == ".": 

335 return "%LOCAL_MODULE%" 

336 return module_name.split(".")[0] 

337 

338 

def _modules_to_remove(
    unused_modules: Iterable[str],
    safe_to_remove: Iterable[str] = SAFE_IMPORTS,
) -> Iterable[str]:
    """Return the unused modules whose top-level module is safe to remove."""
    removable = []
    for candidate in unused_modules:
        if _top_module(candidate) in safe_to_remove:
            removable.append(candidate)
    return removable

345 

346 

347def _segment_module(segment: str) -> str: 

348 """Extract the module identifier inside the segment. 

349 

350 It might be the case the segment does not have a module (e.g. is composed 

351 just by a parenthesis or line continuation and whitespace). In this 

352 scenario we just keep the segment... These characters are not valid in 

353 identifiers, so they will never be contained in the list of unused modules 

354 anyway. 

355 """ 

356 return segment.strip(string.whitespace + ",\\()") or segment 

357 

358 

class FilterMultilineImport(PendingFix):
    """Remove unused imports from multiline import statements.

    This class handles both the cases: "from imports" and "direct imports".

    Some limitations exist (e.g. imports with comments, lines joined by ``;``,
    etc). In these cases, the statement is left unchanged to avoid problems.
    """

    # Splits a line at the ``import`` keyword, consuming the following spaces.
    IMPORT_RE = re.compile(r"\bimport\b\s*")
    # Leading whitespace of a line.
    INDENTATION_RE = re.compile(r"^\s*")
    # Captures the module that follows ``from``.
    BASE_RE = re.compile(r"\bfrom\s+([^ ]+)")
    SEGMENT_RE = re.compile(
        r"([^,\s]+(?:[\s\\]+as[\s\\]+[^,\s]+)?[,\s\\)]*)",
        re.M,
    )
    # ^ module + comma + following space (including new line and continuation)
    IDENTIFIER_RE = re.compile(r"[^,\s]+")

    def __init__(
        self,
        line: str,
        unused_module: Iterable[str] = (),
        remove_all_unused_imports: bool = False,
        safe_to_remove: Iterable[str] = SAFE_IMPORTS,
        previous_line: str = "",
    ) -> None:
        """Receive the same parameters as ``filter_unused_import``.

        The line is split at ``import``: the part before it is kept in
        ``self.from_`` and the part after it becomes the first accumulated
        line.
        """
        self.remove: Iterable[str] = unused_module
        self.parenthesized: bool = "(" in line
        self.from_, imports = self.IMPORT_RE.split(line, maxsplit=1)
        match = self.BASE_RE.search(self.from_)
        # ``None`` for a direct ``import a, b`` statement.
        self.base = match.group(1) if match else None
        self.give_up: bool = False

        if not remove_all_unused_imports:
            if self.base and _top_module(self.base) not in safe_to_remove:
                # The "from" module is not known to be safe: leave as is.
                self.give_up = True
            else:
                self.remove = _modules_to_remove(self.remove, safe_to_remove)

        if "\\" in previous_line:
            # Ignore tricky things like "try: \<new line> import" ...
            self.give_up = True

        self.analyze(line)

        PendingFix.__init__(self, imports)

    def is_over(self, line: str | None = None) -> bool:
        """Return True if the multiline import statement is over.

        When ``line`` is empty or ``None``, the last accumulated line is
        inspected instead.
        """
        line = line or self.accumulator[-1]

        if self.parenthesized:
            # Over once an uncommented ")" shows up.
            return _valid_char_in_line(")", line)

        # Otherwise over on the first line without an uncommented backslash.
        return not _valid_char_in_line("\\", line)

    def analyze(self, line: str) -> None:
        """Decide if the statement will be fixed or left unchanged.

        Any ``;``, ``:`` or ``#`` in the line sets ``give_up``.
        """
        if any(ch in line for ch in ";:#"):
            self.give_up = True

    def fix(self, accumulated: Iterable[str]) -> str:
        """Given a collection of accumulated lines, fix the entire import.

        Returns the rewritten statement, the original statement when nothing
        was discarded, or an indented ``pass`` when no import survives.
        """
        old_imports = "".join(accumulated)
        ending = get_line_ending(old_imports)
        # Split imports into segments that contain the module name +
        # comma + whitespace and eventual <newline> \ ( ) chars
        segments = [x for x in self.SEGMENT_RE.findall(old_imports) if x]
        modules = [_segment_module(x) for x in segments]
        keep = _filter_imports(modules, self.base, self.remove)

        # Short-circuit if no import was discarded
        if len(keep) == len(segments):
            return self.from_ + "import " + "".join(accumulated)

        fixed = ""
        if keep:
            # Since it is very difficult to deal with all the line breaks and
            # continuations, let's use the code layout that already exists and
            # just replace the module identifiers inside the first N-1 segments
            # + the last segment
            templates = list(zip(modules, segments))
            templates = templates[: len(keep) - 1] + templates[-1:]
            # It is important to keep the last segment, since it might contain
            # important chars like `)`
            fixed = "".join(
                template.replace(module, keep[i])
                for i, (module, template) in enumerate(templates)
            )

            # Fix the edge case: inline parenthesis + just one surviving import
            if self.parenthesized and any(ch not in fixed for ch in "()"):
                fixed = fixed.strip(string.whitespace + "()") + ending

        # Replace empty imports with a "pass" statement
        empty = len(fixed.strip(string.whitespace + "\\(),")) < 1
        if empty:
            match = self.INDENTATION_RE.search(self.from_)
            assert match is not None
            indentation = match.group(0)
            return indentation + "pass" + ending

        return self.from_ + "import " + fixed

    def __call__(self, line: str | None = None) -> PendingFix | str:
        """Accumulate all the lines in the import and then trigger the fix.

        Returns ``self`` while the statement is not over. Once it is over,
        returns the unchanged statement if ``give_up`` is set, otherwise the
        result of ``fix``.
        """
        if line:
            self.accumulator.append(line)
            self.analyze(line)
        if not self.is_over(line):
            return self
        if self.give_up:
            return self.from_ + "import " + "".join(self.accumulator)

        return self.fix(self.accumulator)

476 

477 

478def _filter_imports( 

479 imports: Iterable[str], 

480 parent: str | None = None, 

481 unused_module: Iterable[str] = (), 

482) -> Sequence[str]: 

483 # We compare full module name (``a.module`` not `module`) to 

484 # guarantee the exact same module as detected from pyflakes. 

485 sep = "" if parent and parent[-1] == "." else "." 

486 

487 def full_name(name: str) -> str: 

488 return name if parent is None else parent + sep + name 

489 

490 return [x for x in imports if full_name(x) not in unused_module] 

491 

492 

def filter_from_import(line: str, unused_module: Iterable[str]) -> str:
    """Filter a single-line ``from something import a, b, c`` statement.

    Return the line without the unused names, or an indented ``pass`` when
    every imported name is unused. The original line ending is preserved.
    """
    head, tail = re.split(r"\bimport\b", line, maxsplit=1)

    base_match = re.search(r"\bfrom\s+([^ ]+)", head)
    assert base_match is not None

    names = re.split(r"\s*,\s*", tail.strip())
    surviving = _filter_imports(names, base_match.group(1), unused_module)

    # Nothing left to import from this statement.
    if not surviving:
        return get_indentation(line) + "pass" + get_line_ending(line)

    return head + "import " + ", ".join(surviving) + get_line_ending(line)

521 

522 

def break_up_import(line: str) -> str:
    """Return ``line`` with each imported module on its own ``import`` line.

    The line must be a plain ``import a, b`` statement: no backslash,
    parenthesis, semicolon, comment or ``from``. A line without a line ending
    is returned unchanged.
    """
    for forbidden in ("\\", "(", ")", ";", "#"):
        assert forbidden not in line
    assert not line.lstrip().startswith("from")

    newline = get_line_ending(line)
    if not newline:
        return line

    prefix, names = re.split(r"\bimport\b", line, maxsplit=1)
    prefix += "import "

    return "".join(prefix + name.strip() + newline for name in names.split(","))

548 

549 

def filter_code(
    source: str,
    additional_imports: Iterable[str] | None = None,
    expand_star_imports: bool = False,
    remove_all_unused_imports: bool = False,
    remove_duplicate_keys: bool = False,
    remove_unused_variables: bool = False,
    remove_rhs_for_unused_variables: bool = False,
    ignore_init_module_imports: bool = False,
) -> Iterable[str]:
    """Yield the lines of ``source`` with the requested fixes applied.

    pyflakes messages for ``source`` decide which lines are handed to the
    unused-import, unused-variable, duplicate-key and star-import filters.
    Lines containing ``#`` are always passed through untouched. A filter may
    return a ``PendingFix``; it is then fed the following lines until it
    produces a string.

    If the input ends while a ``PendingFix`` is still unresolved, the lines it
    consumed are yielded unchanged instead of being dropped.
    """
    imports = SAFE_IMPORTS
    if additional_imports:
        # ``|=`` on a frozenset rebinds the local name only.
        imports |= frozenset(additional_imports)
    del additional_imports

    messages = check(source)

    if ignore_init_module_imports:
        marked_import_line_numbers: frozenset[int] = frozenset()
    else:
        marked_import_line_numbers = frozenset(
            unused_import_line_numbers(messages),
        )
    marked_unused_module: dict[int, list[str]] = collections.defaultdict(list)
    for line_number, module_name in unused_import_module_name(messages):
        marked_unused_module[line_number].append(module_name)

    undefined_names: list[str] = []
    if expand_star_imports and not (
        # See explanations in #18.
        re.search(r"\b__all__\b", source) or re.search(r"\bdel\b", source)
    ):
        marked_star_import_line_numbers = frozenset(
            star_import_used_line_numbers(messages),
        )
        if len(marked_star_import_line_numbers) > 1:
            # Auto expanding only possible for single star import
            marked_star_import_line_numbers = frozenset()
        else:
            for _, undefined_name, _ in star_import_usage_undefined_name(
                messages,
            ):
                undefined_names.append(undefined_name)
            if not undefined_names:
                marked_star_import_line_numbers = frozenset()
    else:
        marked_star_import_line_numbers = frozenset()

    if remove_unused_variables:
        marked_variable_line_numbers = frozenset(
            unused_variable_line_numbers(messages),
        )
    else:
        marked_variable_line_numbers = frozenset()

    if remove_duplicate_keys:
        marked_key_line_numbers: frozenset[int] = frozenset(
            duplicate_key_line_numbers(messages, source),
        )
    else:
        marked_key_line_numbers = frozenset()

    line_messages = get_messages_by_line(messages)

    sio = io.StringIO(source)
    previous_line = ""
    result: str | PendingFix = ""
    # Raw lines consumed by a PendingFix that has not produced a result yet.
    pending_lines: list[str] = []
    for line_number, line in enumerate(sio.readlines(), start=1):
        if isinstance(result, PendingFix):
            result = result(line)
        elif "#" in line:
            result = line
        elif line_number in marked_import_line_numbers:
            result = filter_unused_import(
                line,
                unused_module=marked_unused_module[line_number],
                remove_all_unused_imports=remove_all_unused_imports,
                imports=imports,
                previous_line=previous_line,
            )
        elif line_number in marked_variable_line_numbers:
            result = filter_unused_variable(
                line,
                drop_rhs=remove_rhs_for_unused_variables,
            )
        elif line_number in marked_key_line_numbers:
            result = filter_duplicate_key(
                line,
                line_messages[line_number],
                line_number,
                marked_key_line_numbers,
                source,
            )
        elif line_number in marked_star_import_line_numbers:
            result = filter_star_import(line, undefined_names)
        else:
            result = line

        if isinstance(result, PendingFix):
            pending_lines.append(line)
        else:
            pending_lines.clear()
            yield result

        previous_line = line

    # The statement never terminated from the fixer's point of view (e.g. a
    # "(" inside a string literal on the import line). Emit what it swallowed
    # verbatim so no source text is lost.
    if isinstance(result, PendingFix):
        yield from pending_lines

653 

654 

def get_messages_by_line(
    messages: Iterable[pyflakes.messages.Message],
) -> Mapping[int, pyflakes.messages.Message]:
    """Return a dict mapping each line number to a message on that line.

    When several messages share a line number, the last one wins.
    """
    return {entry.lineno: entry for entry in messages}

663 

664 

def filter_star_import(
    line: str,
    marked_star_import_undefined_name: Iterable[str],
) -> str:
    """Return ``line`` with every ``*`` replaced by the undefined names.

    The names are de-duplicated, sorted and joined with ``", "``.
    """
    unique_names = sorted(set(marked_star_import_undefined_name))
    expansion = ", ".join(unique_names)
    return re.sub(r"\*", expansion, line)

672 

673 

def filter_unused_import(
    line: str,
    unused_module: Iterable[str],
    remove_all_unused_imports: bool,
    imports: Iterable[str],
    previous_line: str = "",
) -> PendingFix | str:
    """Return the replacement for an import line reported as unused.

    Doctest lines are returned unchanged. Multiline imports are handed to
    ``FilterMultilineImport``, which may return a pending fix. A direct
    import with commas is broken up into one import per line. Otherwise the
    line is kept when its package is not in ``imports`` (unless
    ``remove_all_unused_imports`` is set), filtered name by name for a
    ``from`` import with commas, or replaced by ``pass``.
    """
    stripped = line.lstrip()

    # Ignore doctests.
    if stripped.startswith(">"):
        return line

    if multiline_import(line, previous_line):
        multiline_fix = FilterMultilineImport(
            line,
            unused_module,
            remove_all_unused_imports,
            imports,
            previous_line,
        )
        return multiline_fix()

    is_from_import = stripped.startswith("from")
    has_comma = "," in line

    if has_comma and not is_from_import:
        return break_up_import(line)

    package = extract_package_name(line)
    if not remove_all_unused_imports and package is not None and package not in imports:
        return line

    if has_comma:
        assert is_from_import
        return filter_from_import(line, unused_module)

    # We need to replace import with "pass" in case the import is the
    # only line inside a block. For example,
    # "if True:\n    import os". In such cases, if the import is
    # removed, the block will be left hanging with no body.
    return get_indentation(line) + "pass" + get_line_ending(line)

714 

715 

def filter_unused_variable(
    line: str,
    previous_line: str = "",
    drop_rhs: bool = False,
) -> str:
    """Return the replacement for a line that assigns an unused variable.

    * ``except ... as name:`` lines lose the ``as name`` part.
    * Multiline statements, lines without exactly one ``=``, and assignments
      with a comma on the left-hand side are returned unchanged.
    * An assignment of a literal or plain name becomes ``pass``.
    * Any other assignment is reduced to its right-hand side, or to an empty
      string when ``drop_rhs`` is set.
    """
    if re.match(EXCEPT_REGEX, line):
        return re.sub(r" as \w+:$", ":", line, count=1)

    if multiline_statement(line, previous_line):
        return line

    if line.count("=") != 1:
        return line

    target, rhs = line.split("=")
    if "," in target:
        return line

    value = rhs.lstrip()
    indentation = get_indentation(line)

    if is_literal_or_name(value):
        # Rather than removing the line, replace with it "pass" to avoid
        # a possible hanging block with no body.
        return indentation + "pass" + get_line_ending(line)

    if drop_rhs:
        return ""
    return indentation + value

745 

746 

def filter_duplicate_key(
    line: str,
    message: pyflakes.messages.Message,
    line_number: int,
    marked_line_numbers: Iterable[int],
    source: str,
    previous_line: str = "",
) -> str:
    """Return ``""`` for the lowest marked line number, else return ``line``.

    Only ``line``, ``line_number`` and ``marked_line_numbers`` are used; the
    remaining parameters are accepted but ignored.
    """
    if marked_line_numbers and line_number == min(marked_line_numbers):
        return ""

    return line

760 

761 

def dict_entry_has_key(line: str, key: Any) -> bool:
    """Return True if `line` is a dict entry that uses `key`.

    Return False for multiline cases where the line should not be removed by
    itself. That includes lines with a comment, lines that do not look like
    ``<key>: <value>,``, keys that cannot be evaluated as a literal, and
    values that are part of a multiline statement.

    """
    if "#" in line:
        return False

    result = re.match(r"\s*(.*)\s*:\s*(.*),\s*$", line)
    if not result:
        return False

    try:
        candidate_key = ast.literal_eval(result.group(1))
    except (SyntaxError, TypeError, ValueError):
        # TypeError is raised for literals with an unhashable member such as
        # ``{[]}``; treat it like any other key that cannot be evaluated.
        return False

    if multiline_statement(result.group(2)):
        return False

    return cast(bool, candidate_key == key)

785 

786 

def is_literal_or_name(value: str) -> bool:
    """Return True if ``value`` is a literal or a plain name.

    Besides anything ``ast.literal_eval`` accepts, the calls ``dict()``,
    ``list()`` and ``set()`` count as literals.
    """
    try:
        ast.literal_eval(value)
    except (SyntaxError, TypeError, ValueError):
        pass
    else:
        return True

    if value.strip() in ("dict()", "list()", "set()"):
        return True

    # Support removal of variables on the right side. But make sure
    # there are no dots, which could mean an access of a property.
    return bool(re.match(r"^\w+\s*$", value))

801 

802 

def useless_pass_line_numbers(
    source: str,
    ignore_pass_after_docstring: bool = False,
) -> Iterable[int]:
    """Yield line numbers of unneeded "pass" statements.

    The source is tokenized and two situations are reported:

    * a "leading" pass: a ``pass`` line directly followed by a line with the
      same indentation whose token is a name, number or string and which is
      not itself a ``pass`` line;
    * a "trailing" pass: a ``pass`` line whose preceding token is not an
      INDENT and whose preceding line does not end with a backslash.

    With ``ignore_pass_after_docstring`` set, a trailing ``pass`` that follows
    a line ending in triple quotes is not reported.

    Tokenizer errors are not caught here.
    """
    sio = io.StringIO(source)
    previous_token_type = None
    last_pass_row = None
    last_pass_indentation = None
    previous_line = ""
    previous_non_empty_line = ""
    for token in tokenize.generate_tokens(sio.readline):
        # Token tuple layout: (type, string, start, end, line).
        token_type = token[0]
        start_row = token[2][0]
        line = token[4]

        # A NAME token on a physical line that consists only of "pass".
        is_pass = token_type == tokenize.NAME and line.strip() == "pass"

        # Leading "pass".
        if (
            start_row - 1 == last_pass_row
            and get_indentation(line) == last_pass_indentation
            and token_type in ATOMS
            and not is_pass
        ):
            yield start_row - 1

        if is_pass:
            last_pass_row = start_row
            last_pass_indentation = get_indentation(line)

            is_trailing_pass = (
                previous_token_type != tokenize.INDENT
                and not previous_line.rstrip().endswith("\\")
            )

            is_pass_after_docstring = previous_non_empty_line.rstrip().endswith(
                ("'''", '"""'),
            )

            # Trailing "pass".
            if is_trailing_pass:
                if is_pass_after_docstring and ignore_pass_after_docstring:
                    # Note: this skips the bookkeeping updates below for the
                    # current token.
                    continue
                else:
                    yield start_row

        previous_token_type = token_type
        previous_line = line
        if line.strip():
            previous_non_empty_line = line

854 

855 

def filter_useless_pass(
    source: str,
    ignore_pass_statements: bool = False,
    ignore_pass_after_docstring: bool = False,
) -> Iterable[str]:
    """Yield the lines of ``source`` without the useless "pass" lines.

    With ``ignore_pass_statements`` set, or when the source cannot be
    tokenized, every line is yielded unchanged.
    """
    skipped: frozenset[int] = frozenset()
    if not ignore_pass_statements:
        try:
            skipped = frozenset(
                useless_pass_line_numbers(source, ignore_pass_after_docstring),
            )
        except (SyntaxError, tokenize.TokenError):
            # Leave the source alone when it cannot be tokenized.
            skipped = frozenset()

    numbered = enumerate(io.StringIO(source).readlines(), start=1)
    yield from (text for number, text in numbered if number not in skipped)

879 

880 

def get_indentation(line: str) -> str:
    """Return the leading whitespace of ``line``.

    A line that contains only whitespace has no indentation (``""``).
    """
    content = line.lstrip()
    if not content:
        return ""
    return line[: len(line) - len(content)]

888 

889 

def get_line_ending(line: str) -> str:
    """Return the trailing whitespace of ``line``, line ending included."""
    content_length = len(line.rstrip())
    return line[content_length:]

897 

898 

def fix_code(
    source: str,
    additional_imports: Iterable[str] | None = None,
    expand_star_imports: bool = False,
    remove_all_unused_imports: bool = False,
    remove_duplicate_keys: bool = False,
    remove_unused_variables: bool = False,
    remove_rhs_for_unused_variables: bool = False,
    ignore_init_module_imports: bool = False,
    ignore_pass_statements: bool = False,
    ignore_pass_after_docstring: bool = False,
) -> str:
    """Return ``source`` with all filtering run on it.

    Empty sources and sources carrying a ``# autoflake: skip_file`` comment
    are returned unchanged. Otherwise ``filter_code`` followed by
    ``filter_useless_pass`` is applied repeatedly until a pass leaves the
    text unchanged.
    """
    if not source:
        return source

    if IGNORE_COMMENT_REGEX.search(source):
        return source

    # pyflakes does not handle "nonlocal" correctly.
    if "nonlocal" in source:
        remove_unused_variables = False

    current = source
    while True:
        without_unused = "".join(
            filter_code(
                current,
                additional_imports=additional_imports,
                expand_star_imports=expand_star_imports,
                remove_all_unused_imports=remove_all_unused_imports,
                remove_duplicate_keys=remove_duplicate_keys,
                remove_unused_variables=remove_unused_variables,
                remove_rhs_for_unused_variables=remove_rhs_for_unused_variables,
                ignore_init_module_imports=ignore_init_module_imports,
            ),
        )
        cleaned = "".join(
            filter_useless_pass(
                without_unused,
                ignore_pass_statements=ignore_pass_statements,
                ignore_pass_after_docstring=ignore_pass_after_docstring,
            ),
        )

        # Fixed point reached: another round would change nothing.
        if cleaned == current:
            return cleaned
        current = cleaned

950 

951 

def fix_file(
    filename: str,
    args: Mapping[str, Any],
    standard_out: IO[str] | None = None,
) -> int:
    """Run ``fix_code()`` on the file ``filename``.

    The file is opened with its detected encoding and processed by
    ``_fix_file``, whose return value is passed through. Output goes to
    ``standard_out``, or to ``sys.stdout`` when none is given.
    """
    output = sys.stdout if standard_out is None else standard_out
    encoding = detect_encoding(filename)
    with open_with_encoding(filename, encoding=encoding) as input_file:
        return _fix_file(
            input_file,
            filename,
            args,
            args["write_to_stdout"],
            cast(IO[str], output),
            encoding=encoding,
        )

970 

971 

def _fix_file(
    input_file: IO[str],
    filename: str,
    args: Mapping[str, Any],
    write_to_stdout: bool,
    standard_out: IO[str],
    encoding: str | None = None,
) -> int:
    """Fix the code read from ``input_file`` and report or store the result.

    Returns 1 when the source changed and ``args["check"]`` or
    ``args["check_diff"]`` is set, otherwise 0. A changed source is written
    to ``standard_out``, written back to ``filename`` (``args["in_place"]``),
    or shown as a diff, in that order of preference.
    """
    original_source = input_file.read()

    is_init_file = os.path.basename(filename) == "__init__.py"
    ignore_init_module_imports = bool(
        args["ignore_init_module_imports"] and is_init_file,
    )

    filtered_source = fix_code(
        original_source,
        additional_imports=(args["imports"].split(",") if "imports" in args else None),
        expand_star_imports=args["expand_star_imports"],
        remove_all_unused_imports=args["remove_all_unused_imports"],
        remove_duplicate_keys=args["remove_duplicate_keys"],
        remove_unused_variables=args["remove_unused_variables"],
        remove_rhs_for_unused_variables=(args["remove_rhs_for_unused_variables"]),
        ignore_init_module_imports=ignore_init_module_imports,
        ignore_pass_statements=args["ignore_pass_statements"],
        ignore_pass_after_docstring=args["ignore_pass_after_docstring"],
    )

    def render_diff() -> str:
        # Diff between the original and the filtered source, as one string.
        return "".join(
            get_diff_text(
                io.StringIO(original_source).readlines(),
                io.StringIO(filtered_source).readlines(),
                filename,
            ),
        )

    # Nothing changed.
    if original_source == filtered_source:
        if write_to_stdout:
            standard_out.write(filtered_source)
        elif (args["check"] or args["check_diff"]) and not args["quiet"]:
            standard_out.write(f"{filename}: No issues detected!{os.linesep}")
        else:
            _LOGGER.debug("Clean %s: nothing to fix", filename)
        return 0

    # Check modes only report and signal the change through the return code.
    if args["check"]:
        standard_out.write(
            f"{filename}: Unused imports/variables detected{os.linesep}",
        )
        return 1
    if args["check_diff"]:
        standard_out.write(render_diff())
        return 1

    if write_to_stdout:
        standard_out.write(filtered_source)
    elif args["in_place"]:
        with open_with_encoding(
            filename,
            mode="w",
            encoding=encoding,
        ) as output_file:
            output_file.write(filtered_source)
        _LOGGER.info("Fixed %s", filename)
    else:
        standard_out.write(render_diff())

    return 0

1043 

1044 

def open_with_encoding(
    filename: str,
    encoding: str | None,
    mode: str = "r",
    limit_byte_check: int = -1,
) -> IO[str]:
    """Open *filename* in text mode with the given encoding.

    Args:
        filename: Path of the file to open.
        encoding: Encoding to use; when falsy it is obtained from
            detect_encoding().
        mode: Mode string passed to open().
        limit_byte_check: Forwarded to detect_encoding() when the encoding
            has to be detected.

    Returns:
        The opened file object.
    """
    resolved_encoding = encoding or detect_encoding(
        filename,
        limit_byte_check=limit_byte_check,
    )
    # newline="" disables newline translation, so line endings are preserved.
    return open(filename, mode=mode, encoding=resolved_encoding, newline="")

1061 

1062 

def detect_encoding(filename: str, limit_byte_check: int = -1) -> str:
    """Return the encoding of *filename*, or "latin-1" when detection fails.

    The encoding reported by _detect_encoding() is validated by reading up
    to *limit_byte_check* characters of the file with it. A LookupError,
    SyntaxError or UnicodeDecodeError raised along the way results in the
    "latin-1" fallback.
    """
    try:
        with open(filename, "rb") as raw_file:
            guessed = _detect_encoding(raw_file.readline)

        # Check for correctness of encoding.
        with open_with_encoding(filename, guessed) as text_file:
            text_file.read(limit_byte_check)
    except (LookupError, SyntaxError, UnicodeDecodeError):
        return "latin-1"

    return guessed

1076 

1077 

def _detect_encoding(readline: Callable[[], bytes]) -> str:
    """Return the encoding tokenize detects from *readline*.

    Falls back to "latin-1" when tokenize.detect_encoding() raises
    LookupError, SyntaxError or UnicodeDecodeError.
    """
    try:
        detected, _lines_read = tokenize.detect_encoding(readline)
    except (LookupError, SyntaxError, UnicodeDecodeError):
        return "latin-1"
    return detected

1085 

1086 

def get_diff_text(old: Sequence[str], new: Sequence[str], filename: str) -> str:
    """Return text of unified diff between old and new.

    Args:
        old: Lines of the original text.
        new: Lines of the modified text.
        filename: Name shown in the diff header, prefixed with
            ``original/`` and ``fixed/`` respectively.

    Returns:
        The whole diff as one string; empty when *old* and *new* are equal.
    """
    newline = "\n"
    # Work around missing newline (http://bugs.python.org/issue2142).
    no_newline_marker = newline + r"\ No newline at end of file" + newline

    diff = difflib.unified_diff(
        old,
        new,
        "original/" + filename,
        "fixed/" + filename,
        lineterm=newline,
    )

    # Collect the pieces and join once instead of growing a string in a loop.
    pieces: list[str] = []
    for line in diff:
        pieces.append(line)
        if not line.endswith(newline):
            pieces.append(no_newline_marker)

    return "".join(pieces)

1107 

1108 

def _split_comma_separated(string: str) -> set[str]:
    """Return the set of non-empty, whitespace-stripped comma-separated items."""
    stripped_items = (piece.strip() for piece in string.split(","))
    # filter(None, ...) drops the items that are empty after stripping.
    return set(filter(None, stripped_items))

1112 

1113 

def is_python_file(filename: str) -> bool:
    """Return True if *filename* is a Python file.

    A name ending in ".py" is accepted without reading the file. Otherwise
    up to MAX_PYTHON_FILE_DETECTION_BYTES are read and the first line must
    match PYTHON_SHEBANG_REGEX. Files that are empty or cannot be read
    (OSError) are reported as not Python.
    """
    if filename.endswith(".py"):
        return True

    try:
        with open_with_encoding(
            filename,
            None,
            limit_byte_check=MAX_PYTHON_FILE_DETECTION_BYTES,
        ) as candidate:
            head = candidate.read(MAX_PYTHON_FILE_DETECTION_BYTES)
        if not head:
            return False
        shebang_line = head.splitlines()[0]
    except (OSError, IndexError):
        return False

    return bool(PYTHON_SHEBANG_REGEX.match(shebang_line))

1136 

1137 

def is_exclude_file(filename: str, exclude: Iterable[str]) -> bool:
    """Return True if *filename* should be excluded.

    A file is excluded when its base name starts with "." or when either
    its base name or the full *filename* matches one of the *exclude*
    glob patterns.
    """
    base_name = os.path.basename(filename)

    if base_name.startswith("."):
        return True

    return any(
        fnmatch.fnmatch(base_name, pattern) or fnmatch.fnmatch(filename, pattern)
        for pattern in exclude
    )

1151 

1152 

def match_file(filename: str, exclude: Iterable[str]) -> bool:
    """Return True if *filename* is okay for modifying/recursing.

    Excluded names are rejected (and logged at debug level); anything else
    is accepted when it is a directory or a Python file.
    """
    if is_exclude_file(filename, exclude):
        _LOGGER.debug("Skipped %s: matched to exclude pattern", filename)
        return False

    return os.path.isdir(filename) or is_python_file(filename)

1163 

1164 

def find_files(
    filenames: list[str],
    recursive: bool,
    exclude: Iterable[str],
) -> Iterable[str]:
    """Yield the filenames to process.

    *filenames* is used as a work queue and is consumed in place. With
    *recursive* set, directories are walked and their matching files are
    appended to the queue; every other entry is yielded unless it matches
    an *exclude* pattern.
    """
    while filenames:
        current = filenames.pop(0)

        if not (recursive and os.path.isdir(current)):
            if is_exclude_file(current, exclude):
                _LOGGER.debug("Skipped %s: matched to exclude pattern", current)
            else:
                yield current
            continue

        for root, directories, children in os.walk(current):
            for child in children:
                child_path = os.path.join(root, child)
                if match_file(child_path, exclude):
                    filenames.append(child_path)
            # Slice-assign so the list os.walk handed out is pruned in place.
            directories[:] = [
                directory
                for directory in directories
                if match_file(os.path.join(root, directory), exclude)
            ]

1196 

1197 

def process_pyproject_toml(toml_file_path: str) -> MutableMapping[str, Any] | None:
    """Return the ``[tool.autoflake]`` table of a TOML file, or None if absent."""
    # Fall back to the tomli package when the tomllib module is unavailable.
    try:
        import tomllib
    except ModuleNotFoundError:
        import tomli as tomllib

    with open(toml_file_path, "rb") as toml_file:
        document = tomllib.load(toml_file)

    tool_table = document.get("tool", {})
    return tool_table.get("autoflake")

1207 

1208 

def process_config_file(config_file_path: str) -> MutableMapping[str, Any] | None:
    """Return the ``[autoflake]`` section of an INI-style file, or None if absent."""
    import configparser

    parser = configparser.ConfigParser()
    parser.read(config_file_path, encoding="utf-8")
    return parser["autoflake"] if parser.has_section("autoflake") else None

1219 

1220 

def find_and_process_config(args: Mapping[str, Any]) -> MutableMapping[str, Any] | None:
    """Locate and parse the nearest configuration file.

    The search starts at the path common to all of ``args["files"]`` and
    moves up one directory at a time. In each directory "pyproject.toml"
    is tried before "setup.cfg". The first parser result that is not None
    is returned; None is returned when the top of the tree is reached
    without finding one.
    """
    # Configuration file parsers as (filename, parser function) pairs.
    config_parsers: tuple[
        tuple[str, Callable[[str], MutableMapping[str, Any] | None]],
        ...,
    ] = (
        ("pyproject.toml", process_pyproject_toml),
        ("setup.cfg", process_config_file),
    )

    search_dir = os.path.commonpath(
        [os.path.abspath(path) for path in args["files"]],
    )
    while True:
        for config_name, parse in config_parsers:
            candidate = os.path.join(search_dir, config_name)
            if not os.path.isfile(candidate):
                continue
            found = parse(candidate)
            if found is not None:
                return found

        search_dir, tail = os.path.split(search_dir)
        # An empty tail means there is no parent left to climb to.
        if not tail:
            return None

1246 

1247 

def merge_configuration_file(
    flag_args: MutableMapping[str, Any],
) -> tuple[MutableMapping[str, Any], bool]:
    """Merge configuration from a file into args.

    The configuration comes from ``flag_args["config_file"]`` when that key
    is present (parsed as TOML for a ".toml" suffix, as an INI-style file
    otherwise), or from find_and_process_config() when it is not.

    Returns:
        ``(merged, True)`` on success, where precedence from lowest to
        highest is: False defaults for every boolean flag, config file
        values, command-line values, and finally the comma-joined
        "imports"/"exclude" values from both sources. On an unusable config
        file or a wrongly typed value, ``(flag_args, False)`` is returned
        after logging an error.
    """
    bool_words = {
        "1": True,
        "yes": True,
        "true": True,
        "on": True,
        "0": False,
        "no": False,
        "false": False,
        "off": False,
    }
    bool_flags = {
        "check",
        "check_diff",
        "expand_star_imports",
        "ignore_init_module_imports",
        "ignore_pass_after_docstring",
        "ignore_pass_statements",
        "in_place",
        "quiet",
        "recursive",
        "remove_all_unused_imports",
        "remove_duplicate_keys",
        "remove_rhs_for_unused_variables",
        "remove_unused_variables",
        "write_to_stdout",
    }

    if "config_file" in flag_args:
        config_file = pathlib.Path(flag_args["config_file"]).resolve()
        process_method = (
            process_pyproject_toml
            if config_file.suffix == ".toml"
            else process_config_file
        )
        config = process_method(str(config_file))

        # An explicitly requested file must yield a non-empty configuration.
        if not config:
            _LOGGER.error(
                "can't parse config file '%s'",
                config_file,
            )
            return flag_args, False
    else:
        config = find_and_process_config(flag_args)

    config_args: dict[str, Any] = {}
    for name, value in ({} if config is None else config).items():
        arg = name.replace("-", "_")

        if arg in bool_flags:
            # Boolean properties: accept real booleans or the usual words.
            if isinstance(value, str):
                value = bool_words.get(value.lower(), value)
            if not isinstance(value, bool):
                _LOGGER.error(
                    "'%s' in the config file should be a boolean",
                    name,
                )
                return flag_args, False
            config_args[arg] = value
            continue

        # Everything else must be a string or a list of strings.
        if isinstance(value, list) and all(isinstance(val, str) for val in value):
            value = ",".join(map(str, value))
        if not isinstance(value, str):
            _LOGGER.error(
                "'%s' in the config file should be a comma separated"
                " string or list of strings",
                name,
            )
            return flag_args, False
        config_args[arg] = value

    # Merge args that can be merged: config values first, then flag values.
    merged_args = {}
    for key in ("imports", "exclude"):
        candidates = (config_args.get(key), flag_args.get(key))
        joined = ",".join(item for item in candidates if item is not None)
        if joined != "":
            merged_args[key] = joined

    result: dict[str, Any] = dict.fromkeys(bool_flags, False)
    for layer in (config_args, flag_args, merged_args):
        result.update(layer)
    return result, True

1345 

1346 

def _main(
    argv: Sequence[str],
    standard_out: IO[str] | None,
    standard_error: IO[str] | None,
    standard_input: IO[str] | None = None,
) -> int:
    """Return exit status.

    0 means no error.

    Args:
        argv: Full argument vector; ``argv[1:]`` is parsed.
        standard_out: Stream passed on for textual output. When it is not
            None the files are processed serially in this process.
        standard_error: Stream that receives log output; when None a
            NullHandler is attached to the logger instead.
        standard_input: Stream read when the file name "-" is given.

    Returns:
        The statuses of all processed files OR-ed together, or 1 when the
        configuration is invalid or a file raises OSError.
    """
    import argparse

    parser = argparse.ArgumentParser(
        description=__doc__,
        prog="autoflake",
        argument_default=argparse.SUPPRESS,
    )
    check_group = parser.add_mutually_exclusive_group()
    check_group.add_argument(
        "-c",
        "--check",
        action="store_true",
        help="return error code if changes are needed",
    )
    check_group.add_argument(
        "-cd",
        "--check-diff",
        action="store_true",
        help="return error code if changes are needed, also display file diffs",
    )

    imports_group = parser.add_mutually_exclusive_group()
    imports_group.add_argument(
        "--imports",
        help="by default, only unused standard library "
        "imports are removed; specify a comma-separated "
        "list of additional modules/packages",
    )
    imports_group.add_argument(
        "--remove-all-unused-imports",
        action="store_true",
        help="remove all unused imports (not just those from the standard library)",
    )

    parser.add_argument(
        "-r",
        "--recursive",
        action="store_true",
        help="drill down directories recursively",
    )
    parser.add_argument(
        "-j",
        "--jobs",
        type=int,
        metavar="n",
        default=0,
        help="number of parallel jobs; match CPU count if value is 0 (default: 0)",
    )
    parser.add_argument(
        "--exclude",
        metavar="globs",
        help="exclude file/directory names that match these comma-separated globs",
    )
    parser.add_argument(
        "--expand-star-imports",
        action="store_true",
        help="expand wildcard star imports with undefined "
        "names; this only triggers if there is only "
        "one star import in the file; this is skipped if "
        "there are any uses of `__all__` or `del` in the "
        "file",
    )
    parser.add_argument(
        "--ignore-init-module-imports",
        action="store_true",
        help="exclude __init__.py when removing unused imports",
    )
    parser.add_argument(
        "--remove-duplicate-keys",
        action="store_true",
        help="remove all duplicate keys in objects",
    )
    parser.add_argument(
        "--remove-unused-variables",
        action="store_true",
        help="remove unused variables",
    )
    parser.add_argument(
        "--remove-rhs-for-unused-variables",
        action="store_true",
        help="remove RHS of statements when removing unused variables (unsafe)",
    )
    parser.add_argument(
        "--ignore-pass-statements",
        action="store_true",
        help="ignore all pass statements",
    )
    parser.add_argument(
        "--ignore-pass-after-docstring",
        action="store_true",
        help='ignore pass statements after a newline ending on \'"""\'',
    )
    parser.add_argument(
        "--version",
        action="version",
        version="%(prog)s " + __version__,
    )
    parser.add_argument(
        "--quiet",
        action="store_true",
        help="Suppress output if there are no issues",
    )
    parser.add_argument(
        "-v",
        "--verbose",
        action="count",
        dest="verbosity",
        default=0,
        help="print more verbose logs (you can repeat `-v` to make it more verbose)",
    )
    parser.add_argument(
        "--stdin-display-name",
        dest="stdin_display_name",
        default="stdin",
        help="the name used when processing input from stdin",
    )

    parser.add_argument(
        "--config",
        dest="config_file",
        help=(
            "Explicitly set the config file "
            "instead of auto determining based on file location"
        ),
    )

    parser.add_argument("files", nargs="+", help="files to format")

    output_group = parser.add_mutually_exclusive_group()
    output_group.add_argument(
        "-i",
        "--in-place",
        action="store_true",
        help="make changes to files instead of printing diffs",
    )
    output_group.add_argument(
        "-s",
        "--stdout",
        action="store_true",
        dest="write_to_stdout",
        help=(
            "print changed text to stdout. defaults to true "
            "when formatting stdin, or to false otherwise"
        ),
    )

    # Convert the argparse namespace to a dict so that it can be serialized
    # by multiprocessing.
    args: MutableMapping[str, Any] = vars(parser.parse_args(argv[1:]))

    if standard_error is None:
        _LOGGER.addHandler(logging.NullHandler())
    else:
        _LOGGER.addHandler(logging.StreamHandler(standard_error))
        loglevels = [logging.WARNING, logging.INFO, logging.DEBUG]
        try:
            loglevel = loglevels[args["verbosity"]]
        except IndexError:  # Too much -v
            loglevel = loglevels[-1]
        _LOGGER.setLevel(loglevel)

    args, success = merge_configuration_file(args)
    if not success:
        return 1

    if (
        args["remove_rhs_for_unused_variables"]
        and not (args["remove_unused_variables"])
    ):
        _LOGGER.error(
            "Using --remove-rhs-for-unused-variables only makes sense when "
            "used with --remove-unused-variables",
        )
        return 1

    if "exclude" in args:
        args["exclude"] = _split_comma_separated(args["exclude"])
    else:
        args["exclude"] = set()

    if args["jobs"] < 1:
        # os.cpu_count() may return None; settle on a number before any
        # arithmetic so that min() below never receives None.
        worker_count = os.cpu_count() or 1
        if sys.platform == "win32":
            # Work around https://bugs.python.org/issue26903
            worker_count = min(worker_count, 60)
        args["jobs"] = worker_count

    filenames = list(set(args["files"]))

    exit_status = 0
    files = list(find_files(filenames, args["recursive"], args["exclude"]))

    # Stay in this process when parallelism cannot help (one job, one file)
    # or cannot be used (stdin input, caller-supplied output stream).
    run_serially = (
        args["jobs"] == 1
        or len(files) == 1
        or "-" in files
        or standard_out is not None
    )

    if run_serially:
        for name in files:
            if name == "-" and standard_input is not None:
                exit_status |= _fix_file(
                    standard_input,
                    args["stdin_display_name"],
                    args=args,
                    write_to_stdout=True,
                    standard_out=standard_out or sys.stdout,
                )
            else:
                try:
                    exit_status |= fix_file(
                        name,
                        args=args,
                        standard_out=standard_out,
                    )
                except OSError as exception:
                    _LOGGER.error(str(exception))
                    exit_status |= 1
    else:
        import multiprocessing

        with multiprocessing.Pool(args["jobs"]) as pool:
            futs = [pool.apply_async(fix_file, args=(name, args)) for name in files]
            for fut in futs:
                try:
                    exit_status |= fut.get()
                except OSError as exception:
                    _LOGGER.error(str(exception))
                    exit_status |= 1

    return exit_status

1590 

1591 

def main() -> int:
    """Command-line entry point.

    Returns the status from _main(), or 2 when interrupted from the keyboard.
    """
    # Exit on broken pipe. SIGPIPE is not available on Windows.
    if hasattr(signal, "SIGPIPE"):
        signal.signal(signal.SIGPIPE, signal.SIG_DFL)

    try:
        exit_status = _main(
            sys.argv,
            standard_out=None,
            standard_error=sys.stderr,
            standard_input=sys.stdin,
        )
    except KeyboardInterrupt:  # pragma: no cover
        exit_status = 2  # pragma: no cover
    return exit_status

1610 

1611 

# Run the command-line entry point when executed as a script and use its
# return value as the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())