Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.10/site-packages/autoflake.py: 39%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

693 statements  

1#!/usr/bin/env python 

2# Copyright (C) Steven Myint 

3# 

4# Permission is hereby granted, free of charge, to any person obtaining 

5# a copy of this software and associated documentation files (the 

6# "Software"), to deal in the Software without restriction, including 

7# without limitation the rights to use, copy, modify, merge, publish, 

8# distribute, sublicense, and/or sell copies of the Software, and to 

9# permit persons to whom the Software is furnished to do so, subject to 

10# the following conditions: 

11# 

12# The above copyright notice and this permission notice shall be included 

13# in all copies or substantial portions of the Software. 

14# 

15# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, 

16# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF 

17# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 

18# IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY 

19# CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, 

20# TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE 

21# SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. 

22"""Removes unused imports and unused variables as reported by pyflakes.""" 

23from __future__ import annotations 

24 

25import ast 

26import collections 

27import difflib 

28import fnmatch 

29import io 

30import logging 

31import os 

32import pathlib 

33import re 

34import signal 

35import string 

36import sys 

37import sysconfig 

38import tokenize 

39from collections.abc import Iterable 

40from collections.abc import Mapping 

41from collections.abc import MutableMapping 

42from collections.abc import Sequence 

43from typing import Any 

44from typing import Callable 

45from typing import cast 

46from typing import IO 

47 

48import pyflakes.api 

49import pyflakes.messages 

50import pyflakes.reporter 

51 

52 

53__version__ = "2.3.1" 

54 

55 

# Module-level logger; propagation is disabled so autoflake's output does
# not leak into the root logger of applications that embed it.
_LOGGER = logging.getLogger("autoflake")
_LOGGER.propagate = False

# Atomic token types (identifiers, numbers, strings); used when scanning
# for code that follows a "pass" placeholder.
ATOMS = frozenset([tokenize.NAME, tokenize.NUMBER, tokenize.STRING])

# Matches "except <exceptions> as <name>:" so the unused binding can be
# stripped from the except clause.
EXCEPT_REGEX = re.compile(r"^\s*except [\s,()\w]+ as \w+:$")
# Matches a shebang line that invokes python or python3.
PYTHON_SHEBANG_REGEX = re.compile(r"^#!.*\bpython[3]?\b\s*$")

# Number of leading bytes inspected when sniffing whether a file is
# Python source — presumably used by a detection helper outside this
# chunk; TODO confirm.
MAX_PYTHON_FILE_DETECTION_BYTES = 1024

# A "#  autoflake:  skip_file" comment anywhere in the source opts the
# entire file out of fixing (see fix_code).
IGNORE_COMMENT_REGEX = re.compile(
    r"\s*#\s{1,}autoflake:\s{1,}\bskip_file\b",
    re.MULTILINE,
)

70 

71 

def standard_paths() -> Iterable[str]:
    """Yield the file names found inside the standard-library directories."""
    install_paths = sysconfig.get_paths()
    for key in ("stdlib", "platstdlib"):
        if key not in install_paths:
            continue
        lib_dir = install_paths[key]
        if not os.path.isdir(lib_dir):
            continue

        # Entries directly inside the lib directory.
        yield from os.listdir(lib_dir)

        # Compiled-extension modules live in a "lib-dynload" subdirectory.
        dynload_dir = os.path.join(lib_dir, "lib-dynload")
        if os.path.isdir(dynload_dir):
            yield from os.listdir(dynload_dir)

87 

88 

def standard_package_names() -> Iterable[str]:
    """Yield the top-level names of standard-library modules."""
    for entry in standard_paths():
        # Skip private modules and names that can never be imported.
        is_private = entry.startswith("_") or "-" in entry
        # A dotted name must be a module file, not e.g. a data file.
        is_data_file = "." in entry and not entry.endswith(("so", "py", "pyc"))
        if not is_private and not is_data_file:
            yield entry.split(".")[0]

99 

100 

# Modules whose mere import has visible side effects; removing these is
# never safe even when the name is unused.
IMPORTS_WITH_SIDE_EFFECTS = {"antigravity", "rlcompleter", "this"}

# In case they are built into CPython.
# (Builtin/binary modules may not appear as files in the stdlib
# directories scanned by standard_package_names, so they are added
# explicitly.)
BINARY_IMPORTS = {
    "datetime",
    "grp",
    "io",
    "json",
    "math",
    "multiprocessing",
    "parser",
    "pwd",
    "string",
    "operator",
    "os",
    "sys",
    "time",
}

# Modules considered safe to remove when unused: the detected standard
# library, minus side-effect imports, plus the builtin modules above.
SAFE_IMPORTS = (
    frozenset(standard_package_names()) - IMPORTS_WITH_SIDE_EFFECTS | BINARY_IMPORTS
)

123 

124 

def unused_import_line_numbers(
    messages: Iterable[pyflakes.messages.Message],
) -> Iterable[int]:
    """Yield the line number of every unused-import message."""
    yield from (
        msg.lineno
        for msg in messages
        if isinstance(msg, pyflakes.messages.UnusedImport)
    )

132 

133 

def unused_import_module_name(
    messages: Iterable[pyflakes.messages.Message],
) -> Iterable[tuple[int, str]]:
    """Yield line number and module name of unused imports."""
    # pyflakes quotes the module in the message text, e.g. "'os' imported
    # but unused"; pull the name out from between the quotes.
    quoted = re.compile(r"\'(.+?)\'")
    for msg in messages:
        if not isinstance(msg, pyflakes.messages.UnusedImport):
            continue
        found = quoted.search(str(msg))
        if found:
            yield (msg.lineno, found.group(1))

145 

146 

def star_import_used_line_numbers(
    messages: Iterable[pyflakes.messages.Message],
) -> Iterable[int]:
    """Yield line number of star import usage."""
    yield from (
        msg.lineno
        for msg in messages
        if isinstance(msg, pyflakes.messages.ImportStarUsed)
    )

154 

155 

def star_import_usage_undefined_name(
    messages: Iterable[pyflakes.messages.Message],
) -> Iterable[tuple[int, str, str]]:
    """Yield line number, undefined name, and its possible origin module."""
    for msg in messages:
        if isinstance(msg, pyflakes.messages.ImportStarUsage):
            # Message arguments are (undefined name, origin module).
            name, origin = msg.message_args[0], msg.message_args[1]
            yield (msg.lineno, name, origin)

165 

166 

def unused_variable_line_numbers(
    messages: Iterable[pyflakes.messages.Message],
) -> Iterable[int]:
    """Yield line numbers of unused variables."""
    yield from (
        msg.lineno
        for msg in messages
        if isinstance(msg, pyflakes.messages.UnusedVariable)
    )

174 

175 

def duplicate_key_line_numbers(
    messages: Iterable[pyflakes.messages.Message],
    source: str,
) -> Iterable[int]:
    """Yield line numbers of duplicate keys.

    Only lines that can be removed safely are yielded: every occurrence of
    a repeated key must be a simple single-line dict entry, otherwise all
    occurrences of that key are skipped.
    """
    # Keep only the duplicate-key diagnostics.
    messages = [
        message
        for message in messages
        if isinstance(message, pyflakes.messages.MultiValueRepeatedKeyLiteral)
    ]

    if messages:
        # Filter out complex cases. We don't want to bother trying to parse
        # this stuff and get it right. We can do it on a key-by-key basis.

        key_to_messages = create_key_to_messages_dict(messages)

        lines = source.split("\n")

        for key, messages in key_to_messages.items():
            # A key group is "good" only if every one of its occurrences
            # looks like a removable single-line entry.
            good = True
            for message in messages:
                line = lines[message.lineno - 1]
                # NOTE(review): this rebinds the outer loop variable `key`
                # to the message's own key argument before the check —
                # appears intentional (same value), but verify.
                key = message.message_args[0]

                if not dict_entry_has_key(line, key):
                    good = False

            if good:
                for message in messages:
                    yield message.lineno

207 

208 

def create_key_to_messages_dict(
    messages: Iterable[pyflakes.messages.MultiValueRepeatedKeyLiteral],
) -> Mapping[Any, Iterable[pyflakes.messages.MultiValueRepeatedKeyLiteral]]:
    """Return a mapping from duplicated key to its list of messages."""
    grouped: MutableMapping[
        Any,
        list[pyflakes.messages.MultiValueRepeatedKeyLiteral],
    ] = collections.defaultdict(list)
    for msg in messages:
        # The repeated dictionary key is the first message argument.
        grouped[msg.message_args[0]].append(msg)
    return grouped

220 

221 

def check(source: str) -> Iterable[pyflakes.messages.Message]:
    """Run pyflakes over *source* and return the collected messages."""
    collector = ListReporter()
    try:
        pyflakes.api.check(source, filename="<string>", reporter=collector)
    except (AttributeError, RecursionError, UnicodeDecodeError):
        # pyflakes can blow up on pathological input; treat that the same
        # as "no diagnostics" rather than crashing the caller.
        pass
    return collector.messages

230 

231 

class StubFile:
    """File-like placeholder that silently discards writes (for pyflakes)."""

    def write(self, *_: Any) -> None:
        """Discard everything written."""

238 

class ListReporter(pyflakes.reporter.Reporter):
    """Reporter that collects every message into ``self.messages``."""

    def __init__(self) -> None:
        """Initialize with stub streams so nothing is ever printed."""
        devnull = StubFile()
        pyflakes.reporter.Reporter.__init__(self, devnull, devnull)
        self.messages: list[pyflakes.messages.Message] = []

    def flake(self, message: pyflakes.messages.Message) -> None:
        """Record *message* instead of writing it to a stream."""
        self.messages.append(message)

254 

255 

def extract_package_name(line: str) -> str | None:
    """Return the top-level package name in an import statement.

    Return ``None`` when *line* is not an import statement (e.g. a doctest
    prompt or ordinary code).
    """
    # Caller contract: single-line, single-statement imports only.
    assert "\\" not in line
    assert "(" not in line
    assert ")" not in line
    assert ";" not in line

    tokens = line.split()
    # Require the first word to be exactly "import"/"from". The previous
    # prefix test (startswith) also matched identifiers such as
    # "importantly", and a bare "import"/"from" line raised IndexError.
    if len(tokens) < 2 or tokens[0] not in ("import", "from"):
        # Ignore doctests and non-import lines.
        return None

    package = tokens[1].split(".")[0]
    assert " " not in package
    return package

273 

274 

def multiline_import(line: str, previous_line: str = "") -> bool:
    """Return True if the import statement spans multiple lines."""
    # Any parenthesis means a parenthesized (and thus multiline) import.
    if "(" in line or ")" in line:
        return True

    return multiline_statement(line, previous_line)

282 

283 

def multiline_statement(line: str, previous_line: str = "") -> bool:
    """Return True if this is part of a multiline statement."""
    # Continuations, compound statements, and statement separators all
    # disqualify the line from standalone treatment.
    if any(symbol in line for symbol in "\\:;"):
        return True

    reader = io.StringIO(line)
    try:
        # A line that does not tokenize on its own must be a fragment.
        list(tokenize.generate_tokens(reader.readline))
    except (SyntaxError, tokenize.TokenError):
        return True

    # A trailing backslash on the previous line continues onto this one.
    return previous_line.rstrip().endswith("\\")

296 

297 

class PendingFix:
    """Allows a rewrite operation to span multiple lines.

    In the main rewrite loop, every time a helper function returns a
    ``PendingFix`` object instead of a string, this object will be called
    with the following line.
    """

    def __init__(self, line: str) -> None:
        """Record the first line for later processing."""
        self.accumulator = collections.deque([line])

    def __call__(self, line: str) -> PendingFix | str:
        """Consume one more line of the statement.

        Subclasses return ``self`` to keep accumulating, or a string with
        the final rewritten text once the statement is complete.
        """
        raise NotImplementedError("Abstract method needs to be overwritten")

317 

318 

319def _valid_char_in_line(char: str, line: str) -> bool: 

320 """Return True if a char appears in the line and is not commented.""" 

321 comment_index = line.find("#") 

322 char_index = line.find(char) 

323 valid_char_in_line = char_index >= 0 and ( 

324 comment_index > char_index or comment_index < 0 

325 ) 

326 return valid_char_in_line 

327 

328 

329def _top_module(module_name: str) -> str: 

330 """Return the name of the top level module in the hierarchy.""" 

331 if module_name[0] == ".": 

332 return "%LOCAL_MODULE%" 

333 return module_name.split(".")[0] 

334 

335 

def _modules_to_remove(
    unused_modules: Iterable[str],
    safe_to_remove: Iterable[str] = SAFE_IMPORTS,
) -> Iterable[str]:
    """Discard unused modules that are not safe to remove from the list."""
    return [
        module
        for module in unused_modules
        if _top_module(module) in safe_to_remove
    ]

342 

343 

344def _segment_module(segment: str) -> str: 

345 """Extract the module identifier inside the segment. 

346 

347 It might be the case the segment does not have a module (e.g. is composed 

348 just by a parenthesis or line continuation and whitespace). In this 

349 scenario we just keep the segment... These characters are not valid in 

350 identifiers, so they will never be contained in the list of unused modules 

351 anyway. 

352 """ 

353 return segment.strip(string.whitespace + ",\\()") or segment 

354 

355 

class FilterMultilineImport(PendingFix):
    """Remove unused imports from multiline import statements.

    This class handles both the cases: "from imports" and "direct imports".

    Some limitations exist (e.g. imports with comments, lines joined by ``;``,
    etc). In these cases, the statement is left unchanged to avoid problems.
    """

    # Splits the statement at the "import" keyword.
    IMPORT_RE = re.compile(r"\bimport\b\s*")
    # Leading whitespace of the statement (for the "pass" replacement).
    INDENTATION_RE = re.compile(r"^\s*")
    # Base module of a "from X import ..." statement.
    BASE_RE = re.compile(r"\bfrom\s+([^ ]+)")
    SEGMENT_RE = re.compile(
        r"([^,\s]+(?:[\s\\]+as[\s\\]+[^,\s]+)?[,\s\\)]*)",
        re.M,
    )
    # ^ module + comma + following space (including new line and continuation)
    IDENTIFIER_RE = re.compile(r"[^,\s]+")

    def __init__(
        self,
        line: str,
        unused_module: Iterable[str] = (),
        remove_all_unused_imports: bool = False,
        safe_to_remove: Iterable[str] = SAFE_IMPORTS,
        previous_line: str = "",
    ):
        """Receive the same parameters as ``filter_unused_import``."""
        self.remove: Iterable[str] = unused_module
        self.parenthesized: bool = "(" in line
        # Everything before "import" (e.g. "from x ") and after it.
        self.from_, imports = self.IMPORT_RE.split(line, maxsplit=1)
        match = self.BASE_RE.search(self.from_)
        # base is None for "import a, b"-style (non-from) statements.
        self.base = match.group(1) if match else None
        self.give_up: bool = False

        if not remove_all_unused_imports:
            if self.base and _top_module(self.base) not in safe_to_remove:
                # The whole "from" base is unsafe: leave statement alone.
                self.give_up = True
            else:
                self.remove = _modules_to_remove(self.remove, safe_to_remove)

        if "\\" in previous_line:
            # Ignore tricky things like "try: \<new line> import" ...
            self.give_up = True

        self.analyze(line)

        # Start accumulating with the text after the "import" keyword.
        PendingFix.__init__(self, imports)

    def is_over(self, line: str | None = None) -> bool:
        """Return True if the multiline import statement is over."""
        line = line or self.accumulator[-1]

        # Parenthesized imports end at ")"; backslash-continued ones end
        # on the first line without a trailing continuation.
        if self.parenthesized:
            return _valid_char_in_line(")", line)

        return not _valid_char_in_line("\\", line)

    def analyze(self, line: str) -> None:
        """Decide if the statement will be fixed or left unchanged."""
        # Comments, compound statements, and ";"-joined statements are
        # too risky to rewrite.
        if any(ch in line for ch in ";:#"):
            self.give_up = True

    def fix(self, accumulated: Iterable[str]) -> str:
        """Given a collection of accumulated lines, fix the entire import."""
        old_imports = "".join(accumulated)
        ending = get_line_ending(old_imports)
        # Split imports into segments that contain the module name +
        # comma + whitespace and eventual <newline> \ ( ) chars
        segments = [x for x in self.SEGMENT_RE.findall(old_imports) if x]
        modules = [_segment_module(x) for x in segments]
        keep = _filter_imports(modules, self.base, self.remove)

        # Short-circuit if no import was discarded
        if len(keep) == len(segments):
            return self.from_ + "import " + "".join(accumulated)

        fixed = ""
        if keep:
            # Since it is very difficult to deal with all the line breaks and
            # continuations, let's use the code layout that already exists and
            # just replace the module identifiers inside the first N-1 segments
            # + the last segment
            templates = list(zip(modules, segments))
            templates = templates[: len(keep) - 1] + templates[-1:]
            # It is important to keep the last segment, since it might contain
            # important chars like `)`
            fixed = "".join(
                template.replace(module, keep[i])
                for i, (module, template) in enumerate(templates)
            )

            # Fix the edge case: inline parenthesis + just one surviving import
            if self.parenthesized and any(ch not in fixed for ch in "()"):
                fixed = fixed.strip(string.whitespace + "()") + ending

        # Replace empty imports with a "pass" statement
        empty = len(fixed.strip(string.whitespace + "\\(),")) < 1
        if empty:
            match = self.INDENTATION_RE.search(self.from_)
            assert match is not None
            indentation = match.group(0)
            return indentation + "pass" + ending

        return self.from_ + "import " + fixed

    def __call__(self, line: str | None = None) -> PendingFix | str:
        """Accumulate all the lines in the import and then trigger the fix."""
        if line:
            self.accumulator.append(line)
            self.analyze(line)
            if not self.is_over(line):
                # Statement not finished: keep receiving lines.
                return self
        if self.give_up:
            # Reconstruct the original statement untouched.
            return self.from_ + "import " + "".join(self.accumulator)

        return self.fix(self.accumulator)

473 

474 

475def _filter_imports( 

476 imports: Iterable[str], 

477 parent: str | None = None, 

478 unused_module: Iterable[str] = (), 

479) -> Sequence[str]: 

480 # We compare full module name (``a.module`` not `module`) to 

481 # guarantee the exact same module as detected from pyflakes. 

482 sep = "" if parent and parent[-1] == "." else "." 

483 

484 def full_name(name: str) -> str: 

485 return name if parent is None else parent + sep + name 

486 

487 return [x for x in imports if full_name(x) not in unused_module] 

488 

489 

def filter_from_import(line: str, unused_module: Iterable[str]) -> str:
    """Parse and filter ``from something import a, b, c``.

    Return the line without the unused names, or ``pass`` when every
    imported name is unused.
    """
    prefix, imported = re.split(
        pattern=r"\bimport\b",
        string=line,
        maxsplit=1,
    )
    base_match = re.search(
        pattern=r"\bfrom\s+([^ ]+)",
        string=prefix,
    )
    assert base_match is not None
    base_module = base_match.group(1)

    names = re.split(pattern=r"\s*,\s*", string=imported.strip())
    kept = _filter_imports(names, base_module, unused_module)

    if not kept:
        # Every name was unused: keep the enclosing block valid by
        # substituting a "pass" statement.
        return get_indentation(line) + "pass" + get_line_ending(line)

    return prefix + "import " + ", ".join(kept) + get_line_ending(line)

518 

519 

def break_up_import(line: str) -> str:
    """Return line with imports on separate lines."""
    # Caller contract: a simple single-line "import a, b" statement.
    assert "\\" not in line
    assert "(" not in line
    assert ")" not in line
    assert ";" not in line
    assert "#" not in line
    assert not line.lstrip().startswith("from")

    newline = get_line_ending(line)
    if not newline:
        # Without a line ending we cannot split safely; leave untouched.
        return line

    prefix, imported = re.split(
        pattern=r"\bimport\b",
        string=line,
        maxsplit=1,
    )

    return "".join(
        prefix + "import " + module.strip() + newline
        for module in imported.split(",")
    )

545 

546 

def filter_code(
    source: str,
    additional_imports: Iterable[str] | None = None,
    expand_star_imports: bool = False,
    remove_all_unused_imports: bool = False,
    remove_duplicate_keys: bool = False,
    remove_unused_variables: bool = False,
    remove_rhs_for_unused_variables: bool = False,
    ignore_init_module_imports: bool = False,
) -> Iterable[str]:
    """Yield code with unused imports removed.

    Runs pyflakes once over *source*, marks the offending line numbers for
    each enabled category, then rewrites line by line.  Helpers may return
    a ``PendingFix`` that consumes subsequent lines (multiline imports).
    """
    # Extend the safe-to-remove set with user-supplied module names.
    imports = SAFE_IMPORTS
    if additional_imports:
        imports |= frozenset(additional_imports)
    del additional_imports

    messages = check(source)

    if ignore_init_module_imports:
        # __init__.py re-exports: treat no import as unused.
        marked_import_line_numbers: frozenset[int] = frozenset()
    else:
        marked_import_line_numbers = frozenset(
            unused_import_line_numbers(messages),
        )
    # Map each line to the unused module names reported on it.
    marked_unused_module: dict[int, list[str]] = collections.defaultdict(list)
    for line_number, module_name in unused_import_module_name(messages):
        marked_unused_module[line_number].append(module_name)

    undefined_names: list[str] = []
    if expand_star_imports and not (
        # See explanations in #18.
        re.search(r"\b__all__\b", source)
        or re.search(r"\bdel\b", source)
    ):
        marked_star_import_line_numbers = frozenset(
            star_import_used_line_numbers(messages),
        )
        if len(marked_star_import_line_numbers) > 1:
            # Auto expanding only possible for single star import
            marked_star_import_line_numbers = frozenset()
        else:
            for line_number, undefined_name, _ in star_import_usage_undefined_name(
                messages,
            ):
                undefined_names.append(undefined_name)
            if not undefined_names:
                # Nothing was actually used from the star import.
                marked_star_import_line_numbers = frozenset()
    else:
        marked_star_import_line_numbers = frozenset()

    if remove_unused_variables:
        marked_variable_line_numbers = frozenset(
            unused_variable_line_numbers(messages),
        )
    else:
        marked_variable_line_numbers = frozenset()

    if remove_duplicate_keys:
        marked_key_line_numbers: frozenset[int] = frozenset(
            duplicate_key_line_numbers(messages, source),
        )
    else:
        marked_key_line_numbers = frozenset()

    line_messages = get_messages_by_line(messages)

    sio = io.StringIO(source)
    previous_line = ""
    # `result` carries either the rewritten line or a PendingFix that is
    # still consuming lines of a multiline statement.
    result: str | PendingFix = ""
    for line_number, line in enumerate(sio.readlines(), start=1):
        if isinstance(result, PendingFix):
            # An in-progress multiline fix gets the next line.
            result = result(line)
        elif "#" in line:
            # Commented lines are never rewritten.
            result = line
        elif line_number in marked_import_line_numbers:
            result = filter_unused_import(
                line,
                unused_module=marked_unused_module[line_number],
                remove_all_unused_imports=remove_all_unused_imports,
                imports=imports,
                previous_line=previous_line,
            )
        elif line_number in marked_variable_line_numbers:
            result = filter_unused_variable(
                line,
                drop_rhs=remove_rhs_for_unused_variables,
            )
        elif line_number in marked_key_line_numbers:
            result = filter_duplicate_key(
                line,
                line_messages[line_number],
                line_number,
                marked_key_line_numbers,
                source,
            )
        elif line_number in marked_star_import_line_numbers:
            result = filter_star_import(line, undefined_names)
        else:
            result = line

        # PendingFix objects emit nothing until their statement completes.
        if not isinstance(result, PendingFix):
            yield result

        previous_line = line

651 

652 

def get_messages_by_line(
    messages: Iterable[pyflakes.messages.Message],
) -> Mapping[int, pyflakes.messages.Message]:
    """Return dictionary that maps line number to message.

    When several messages share a line number, the last one wins.
    """
    return {message.lineno: message for message in messages}

661 

662 

def filter_star_import(
    line: str,
    marked_star_import_undefined_name: Iterable[str],
) -> str:
    """Return line with the star import expanded.

    The ``*`` is replaced with the sorted, de-duplicated names that were
    actually used from the star import.
    """
    names = sorted(set(marked_star_import_undefined_name))
    return re.sub(r"\*", ", ".join(names), line)

670 

671 

def filter_unused_import(
    line: str,
    unused_module: Iterable[str],
    remove_all_unused_imports: bool,
    imports: Iterable[str],
    previous_line: str = "",
) -> PendingFix | str:
    """Filter one unused-import line.

    Return the rewritten line, the original line when it must be kept, or
    a ``PendingFix`` when the statement continues on following lines.
    """
    # Ignore doctests.
    if line.lstrip().startswith(">"):
        return line

    if multiline_import(line, previous_line):
        # Hand the statement to the multiline machinery; it will consume
        # the following lines until the statement is complete.
        multiline_fix = FilterMultilineImport(
            line,
            unused_module,
            remove_all_unused_imports,
            imports,
            previous_line,
        )
        return multiline_fix()

    is_from_import = line.lstrip().startswith("from")

    if "," in line and not is_from_import:
        # "import a, b" -> one import per line, filtered on a later pass.
        return break_up_import(line)

    package = extract_package_name(line)
    if not remove_all_unused_imports and package is not None and package not in imports:
        # Not in the safe-to-remove set: keep it.
        return line

    if "," in line:
        assert is_from_import
        return filter_from_import(line, unused_module)

    # Replace the lone import with "pass" so that a block such as
    # "if True:\n    import os" keeps a body after removal.
    return get_indentation(line) + "pass" + get_line_ending(line)

712 

713 

def filter_unused_variable(
    line: str,
    previous_line: str = "",
    drop_rhs: bool = False,
) -> str:
    """Return the line rewritten without the unused binding.

    Lines that cannot be rewritten safely are returned unchanged.
    """
    if re.match(EXCEPT_REGEX, line):
        # "except E as name:" -> "except E:".
        return re.sub(r" as \w+:$", ":", line, count=1)

    if multiline_statement(line, previous_line):
        return line

    if line.count("=") != 1:
        return line

    lhs, rhs = line.split("=")
    if "," in lhs:
        # Tuple unpacking: other targets might still be used.
        return line

    value = rhs.lstrip()
    if is_literal_or_name(value):
        # Replace with "pass" rather than deleting outright to avoid
        # leaving an enclosing block without a body.
        value = "pass" + get_line_ending(line)
        if drop_rhs:
            return get_indentation(line) + value

    if drop_rhs:
        return ""
    return get_indentation(line) + value

743 

744 

def filter_duplicate_key(
    line: str,
    message: pyflakes.messages.Message,
    line_number: int,
    marked_line_numbers: Iterable[int],
    source: str,
    previous_line: str = "",
) -> str:
    """Return '' if first occurrence of the key otherwise return `line`.

    Dropping every occurrence except the last preserves the dict's final
    value for the key.
    """
    if marked_line_numbers and line_number == min(marked_line_numbers):
        return ""
    return line

758 

759 

def dict_entry_has_key(line: str, key: Any) -> bool:
    """Return True if `line` is a dict entry that uses `key`.

    Return False for multiline cases where the line should not be removed
    by itself.
    """
    if "#" in line:
        # Comments make removal too risky.
        return False

    entry = re.match(r"\s*(.*)\s*:\s*(.*),\s*$", line)
    if entry is None:
        return False

    literal_key = entry.group(1)
    value = entry.group(2)

    try:
        parsed_key = ast.literal_eval(literal_key)
    except (SyntaxError, ValueError):
        # The key is an expression, not a literal; leave the line alone.
        return False

    if multiline_statement(value):
        return False

    return cast(bool, parsed_key == key)

783 

784 

def is_literal_or_name(value: str) -> bool:
    """Return True if value is a literal or a name."""
    try:
        ast.literal_eval(value)
    except (SyntaxError, ValueError):
        pass
    else:
        return True

    # Empty builtin-container constructors are treated like literals.
    if value.strip() in ("dict()", "list()", "set()"):
        return True

    # Support removal of variables on the right side. But make sure
    # there are no dots, which could mean an access of a property.
    return re.match(r"^\w+\s*$", value) is not None

799 

800 

def useless_pass_line_numbers(
    source: str,
    ignore_pass_after_docstring: bool = False,
) -> Iterable[int]:
    """Yield line numbers of unneeded "pass" statements.

    A "pass" is unneeded when its block has other statements: either code
    follows it at the same indentation (leading pass), or it does not
    directly open a block (trailing pass).
    """
    sio = io.StringIO(source)
    previous_token_type = None
    last_pass_row = None
    last_pass_indentation = None
    previous_line = ""
    previous_non_empty_line = ""
    for token in tokenize.generate_tokens(sio.readline):
        token_type = token[0]
        start_row = token[2][0]
        line = token[4]

        # A "pass" that is the whole (stripped) physical line.
        is_pass = token_type == tokenize.NAME and line.strip() == "pass"

        # Leading "pass".
        # If the previous line was a lone "pass" and this line starts real
        # code at the same indentation, that "pass" was redundant.
        if (
            start_row - 1 == last_pass_row
            and get_indentation(line) == last_pass_indentation
            and token_type in ATOMS
            and not is_pass
        ):
            yield start_row - 1

        if is_pass:
            last_pass_row = start_row
            last_pass_indentation = get_indentation(line)

            # A "pass" that does not immediately follow an INDENT (i.e. it
            # is not the first statement of its block) is redundant.
            is_trailing_pass = (
                previous_token_type != tokenize.INDENT
                and not previous_line.rstrip().endswith("\\")
            )

            is_pass_after_docstring = previous_non_empty_line.rstrip().endswith(
                ("'''", '"""'),
            )

            # Trailing "pass".
            if is_trailing_pass:
                if is_pass_after_docstring and ignore_pass_after_docstring:
                    continue
                else:
                    yield start_row

        previous_token_type = token_type
        previous_line = line
        if line.strip():
            previous_non_empty_line = line

852 

853 

def filter_useless_pass(
    source: str,
    ignore_pass_statements: bool = False,
    ignore_pass_after_docstring: bool = False,
) -> Iterable[str]:
    """Yield code with useless "pass" lines removed."""
    marked_lines: frozenset[int] = frozenset()
    if not ignore_pass_statements:
        try:
            marked_lines = frozenset(
                useless_pass_line_numbers(
                    source,
                    ignore_pass_after_docstring,
                ),
            )
        except (SyntaxError, tokenize.TokenError):
            # Unparsable source: keep every line untouched.
            marked_lines = frozenset()

    for line_number, line in enumerate(io.StringIO(source).readlines(), start=1):
        if line_number not in marked_lines:
            yield line

877 

878 

def get_indentation(line: str) -> str:
    """Return leading whitespace."""
    if not line.strip():
        # Blank/whitespace-only lines carry no meaningful indentation.
        return ""
    return line[: len(line) - len(line.lstrip())]

886 

887 

def get_line_ending(line: str) -> str:
    """Return the trailing whitespace (including the newline) of *line*."""
    trailing_length = len(line) - len(line.rstrip())
    if trailing_length == 0:
        return ""
    return line[-trailing_length:]

895 

896 

def fix_code(
    source: str,
    additional_imports: Iterable[str] | None = None,
    expand_star_imports: bool = False,
    remove_all_unused_imports: bool = False,
    remove_duplicate_keys: bool = False,
    remove_unused_variables: bool = False,
    remove_rhs_for_unused_variables: bool = False,
    ignore_init_module_imports: bool = False,
    ignore_pass_statements: bool = False,
    ignore_pass_after_docstring: bool = False,
) -> str:
    """Return code with all filtering run on it."""
    if not source:
        return source

    # A "# autoflake: skip_file" comment opts the whole file out.
    if IGNORE_COMMENT_REGEX.search(source):
        return source

    # pyflakes does not handle "nonlocal" correctly.
    if "nonlocal" in source:
        remove_unused_variables = False

    # Iterate until the output stabilizes: removing one construct (an
    # import, say) can expose another removable one (a useless "pass").
    while True:
        filtered_source = "".join(
            filter_useless_pass(
                "".join(
                    filter_code(
                        source,
                        additional_imports=additional_imports,
                        expand_star_imports=expand_star_imports,
                        remove_all_unused_imports=remove_all_unused_imports,
                        remove_duplicate_keys=remove_duplicate_keys,
                        remove_unused_variables=remove_unused_variables,
                        remove_rhs_for_unused_variables=(
                            remove_rhs_for_unused_variables
                        ),
                        ignore_init_module_imports=ignore_init_module_imports,
                    ),
                ),
                ignore_pass_statements=ignore_pass_statements,
                ignore_pass_after_docstring=ignore_pass_after_docstring,
            ),
        )

        if filtered_source == source:
            return filtered_source
        source = filtered_source

948 

949 

def fix_file(
    filename: str,
    args: Mapping[str, Any],
    standard_out: IO[str] | None = None,
) -> int:
    """Run fix_code() on a file."""
    out_stream = sys.stdout if standard_out is None else standard_out
    encoding = detect_encoding(filename)
    with open_with_encoding(filename, encoding=encoding) as input_file:
        return _fix_file(
            input_file,
            filename,
            args,
            args["write_to_stdout"],
            cast(IO[str], out_stream),
            encoding=encoding,
        )

968 

969 

def _fix_file(
    input_file: IO[str],
    filename: str,
    args: Mapping[str, Any],
    write_to_stdout: bool,
    standard_out: IO[str],
    encoding: str | None = None,
) -> int:
    """Fix one already-opened file.

    Returns 1 when changes were detected under --check/--check-diff,
    otherwise 0.
    """
    source = input_file.read()
    original_source = source

    isInitFile = os.path.basename(filename) == "__init__.py"

    # Package __init__ files often import purely to re-export.
    if args["ignore_init_module_imports"] and isInitFile:
        ignore_init_module_imports = True
    else:
        ignore_init_module_imports = False

    filtered_source = fix_code(
        source,
        additional_imports=(args["imports"].split(",") if "imports" in args else None),
        expand_star_imports=args["expand_star_imports"],
        remove_all_unused_imports=args["remove_all_unused_imports"],
        remove_duplicate_keys=args["remove_duplicate_keys"],
        remove_unused_variables=args["remove_unused_variables"],
        remove_rhs_for_unused_variables=(args["remove_rhs_for_unused_variables"]),
        ignore_init_module_imports=ignore_init_module_imports,
        ignore_pass_statements=args["ignore_pass_statements"],
        ignore_pass_after_docstring=args["ignore_pass_after_docstring"],
    )

    if original_source != filtered_source:
        if args["check"]:
            # NOTE(review): this literal prints "(unknown)" rather than the
            # file name; upstream autoflake interpolates `filename` here —
            # confirm whether this string was mangled in this copy.
            standard_out.write(
                f"(unknown): Unused imports/variables detected{os.linesep}",
            )
            return 1
        if args["check_diff"]:
            diff = get_diff_text(
                io.StringIO(original_source).readlines(),
                io.StringIO(filtered_source).readlines(),
                filename,
            )
            standard_out.write("".join(diff))
            return 1
        if write_to_stdout:
            standard_out.write(filtered_source)
        elif args["in_place"]:
            # Rewrite the file in place with the original encoding.
            with open_with_encoding(
                filename,
                mode="w",
                encoding=encoding,
            ) as output_file:
                output_file.write(filtered_source)
            _LOGGER.info("Fixed %s", filename)
        else:
            # Default mode: show a unified diff instead of writing.
            diff = get_diff_text(
                io.StringIO(original_source).readlines(),
                io.StringIO(filtered_source).readlines(),
                filename,
            )
            standard_out.write("".join(diff))
    elif write_to_stdout:
        # Unchanged source still gets echoed in stdout mode.
        standard_out.write(filtered_source)
    else:
        if (args["check"] or args["check_diff"]) and not args["quiet"]:
            # NOTE(review): same "(unknown)" placeholder as above.
            standard_out.write(f"(unknown): No issues detected!{os.linesep}")
        else:
            _LOGGER.debug("Clean %s: nothing to fix", filename)

    return 0

1041 

1042 

def open_with_encoding(
    filename: str,
    encoding: str | None,
    mode: str = "r",
    limit_byte_check: int = -1,
) -> IO[str]:
    """Return opened file with a specific encoding.

    When *encoding* is falsy, the encoding is detected from the file
    itself (reading at most *limit_byte_check* bytes to verify it).
    """
    chosen = encoding or detect_encoding(filename, limit_byte_check=limit_byte_check)
    # newline="" keeps the original line endings intact on read and write.
    return open(filename, mode=mode, encoding=chosen, newline="")

1059 

1060 

def detect_encoding(filename: str, limit_byte_check: int = -1) -> str:
    """Return file encoding, falling back to latin-1 on failure."""
    try:
        with open(filename, "rb") as raw_file:
            guessed = _detect_encoding(raw_file.readline)

        # Verify the guess by actually decoding (part of) the file.
        with open_with_encoding(filename, guessed) as checked_file:
            checked_file.read(limit_byte_check)

        return guessed
    except (LookupError, SyntaxError, UnicodeDecodeError):
        # latin-1 can decode any byte sequence, so it is a safe fallback.
        return "latin-1"

1074 

1075 

1076def _detect_encoding(readline: Callable[[], bytes]) -> str: 

1077 """Return file encoding.""" 

1078 try: 

1079 encoding = tokenize.detect_encoding(readline)[0] 

1080 return encoding 

1081 except (LookupError, SyntaxError, UnicodeDecodeError): 

1082 return "latin-1" 

1083 

1084 

def get_diff_text(old: Sequence[str], new: Sequence[str], filename: str) -> str:
    """Return text of unified diff between old and new."""
    newline = "\n"
    diff_lines = difflib.unified_diff(
        old,
        new,
        "original/" + filename,
        "fixed/" + filename,
        lineterm=newline,
    )

    chunks: list[str] = []
    for line in diff_lines:
        chunks.append(line)
        # Work around missing newline (http://bugs.python.org/issue2142).
        if not line.endswith(newline):
            chunks.append(newline + r"\ No newline at end of file" + newline)

    return "".join(chunks)

1105 

1106 

1107def _split_comma_separated(string: str) -> set[str]: 

1108 """Return a set of strings.""" 

1109 return {text.strip() for text in string.split(",") if text.strip()} 

1110 

1111 

def is_python_file(filename: str) -> bool:
    """Return True if filename is Python file."""
    if filename.endswith(".py"):
        return True

    # No .py suffix: sniff the first line for a Python shebang.
    try:
        with open_with_encoding(
            filename,
            None,
            limit_byte_check=MAX_PYTHON_FILE_DETECTION_BYTES,
        ) as f:
            text = f.read(MAX_PYTHON_FILE_DETECTION_BYTES)
            if not text:
                return False
            first_line = text.splitlines()[0]
    except (OSError, IndexError):
        return False

    return bool(PYTHON_SHEBANG_REGEX.match(first_line))

1134 

1135 

def is_exclude_file(filename: str, exclude: Iterable[str]) -> bool:
    """Return True if file matches exclude pattern."""
    base_name = os.path.basename(filename)

    # Hidden (dot-prefixed) files are always excluded.
    if base_name.startswith("."):
        return True

    # A pattern may match either the bare name or the full path.
    return any(
        fnmatch.fnmatch(base_name, pattern) or fnmatch.fnmatch(filename, pattern)
        for pattern in exclude
    )

1149 

1150 

def match_file(filename: str, exclude: Iterable[str]) -> bool:
    """Return True if file is okay for modifying/recursing."""
    if is_exclude_file(filename, exclude):
        _LOGGER.debug("Skipped %s: matched to exclude pattern", filename)
        return False

    # Directories always pass so recursion can continue into them;
    # plain files must look like Python source.
    return os.path.isdir(filename) or is_python_file(filename)

1161 

1162 

def find_files(
    filenames: list[str],
    recursive: bool,
    exclude: Iterable[str],
) -> Iterable[str]:
    """Yield filenames."""
    while filenames:
        name = filenames.pop(0)
        if recursive and os.path.isdir(name):
            # Walk the tree: queue matching files for later processing and
            # prune directories that hit an exclude pattern in place.
            for root, directories, children in os.walk(name):
                filenames.extend(
                    os.path.join(root, child)
                    for child in children
                    if match_file(os.path.join(root, child), exclude)
                )
                directories[:] = [
                    directory
                    for directory in directories
                    if match_file(os.path.join(root, directory), exclude)
                ]
        elif not is_exclude_file(name, exclude):
            yield name
        else:
            _LOGGER.debug("Skipped %s: matched to exclude pattern", name)

1194 

1195 

def process_pyproject_toml(toml_file_path: str) -> MutableMapping[str, Any] | None:
    """Extract config mapping from pyproject.toml file."""
    try:
        import tomllib
    except ModuleNotFoundError:
        # Python < 3.11: fall back to the third-party backport.
        import tomli as tomllib

    with open(toml_file_path, "rb") as toml_file:
        return tomllib.load(toml_file).get("tool", {}).get("autoflake", None)

1205 

1206 

def process_config_file(config_file_path: str) -> MutableMapping[str, Any] | None:
    """Extract config mapping from config file."""
    import configparser

    parser = configparser.ConfigParser()
    parser.read(config_file_path, encoding="utf-8")
    if parser.has_section("autoflake"):
        return parser["autoflake"]
    # No [autoflake] section means no configuration for us.
    return None

1217 

1218 

def find_and_process_config(args: Mapping[str, Any]) -> MutableMapping[str, Any] | None:
    """Walk up from the given files looking for a supported config file."""
    # Configuration file parsers {filename: parser function}.
    CONFIG_FILES: Mapping[str, Callable[[str], MutableMapping[str, Any] | None]] = {
        "pyproject.toml": process_pyproject_toml,
        "setup.cfg": process_config_file,
    }
    # Traverse the file tree common to all files given as argument
    # looking for a configuration file.
    config_path = os.path.commonpath([os.path.abspath(file) for file in args["files"]])
    config: Mapping[str, Any] | None = None
    while config is None:
        for config_file, processor in CONFIG_FILES.items():
            candidate = os.path.join(config_path, config_file)
            if os.path.isfile(candidate):
                config = processor(candidate)
                if config is not None:
                    break
        config_path, tail = os.path.split(config_path)
        if not tail:
            # Reached the filesystem root without finding a config.
            break
    return config

1244 

1245 

def merge_configuration_file(
    flag_args: MutableMapping[str, Any],
) -> tuple[MutableMapping[str, Any], bool]:
    """Merge configuration from a file into args."""
    # Accepted spellings of boolean values in INI-style config files.
    BOOL_TYPES = {
        "1": True,
        "yes": True,
        "true": True,
        "on": True,
        "0": False,
        "no": False,
        "false": False,
        "off": False,
    }

    if "config_file" in flag_args:
        config_file = pathlib.Path(flag_args["config_file"]).resolve()
        # .toml files get the TOML parser; everything else is INI-style.
        if config_file.suffix == ".toml":
            config = process_pyproject_toml(str(config_file))
        else:
            config = process_config_file(str(config_file))

        if not config:
            _LOGGER.error(
                "can't parse config file '%s'",
                config_file,
            )
            return flag_args, False
    else:
        # No explicit --config: search upward from the input files.
        config = find_and_process_config(flag_args)

    BOOL_FLAGS = {
        "check",
        "check_diff",
        "expand_star_imports",
        "ignore_init_module_imports",
        "ignore_pass_after_docstring",
        "ignore_pass_statements",
        "in_place",
        "quiet",
        "recursive",
        "remove_all_unused_imports",
        "remove_duplicate_keys",
        "remove_rhs_for_unused_variables",
        "remove_unused_variables",
        "write_to_stdout",
    }

    config_args: dict[str, Any] = {}
    for name, value in (config or {}).items():
        arg = name.replace("-", "_")
        if arg in BOOL_FLAGS:
            # Boolean properties: normalize common truthy/falsy spellings.
            if isinstance(value, str):
                value = BOOL_TYPES.get(value.lower(), value)
            if not isinstance(value, bool):
                _LOGGER.error(
                    "'%s' in the config file should be a boolean",
                    name,
                )
                return flag_args, False
        else:
            # Non-boolean options must end up as comma-separated strings.
            if isinstance(value, list) and all(
                isinstance(val, str) for val in value
            ):
                value = ",".join(str(val) for val in value)
            if not isinstance(value, str):
                _LOGGER.error(
                    "'%s' in the config file should be a comma separated"
                    " string or list of strings",
                    name,
                )
                return flag_args, False
        config_args[arg] = value

    # Merge args that can be merged (config value + CLI value, comma-joined).
    merged_args = {}
    for key in ("imports", "exclude"):
        joined = ",".join(
            v for v in (config_args.get(key), flag_args.get(key)) if v is not None
        )
        if joined:
            merged_args[key] = joined

    # CLI flags win over config values; merged keys win over both.
    default_args = {flag: False for flag in BOOL_FLAGS}
    return {
        **default_args,
        **config_args,
        **flag_args,
        **merged_args,
    }, True

1343 

1344 

def _main(
    argv: Sequence[str],
    standard_out: IO[str] | None,
    standard_error: IO[str] | None,
    standard_input: IO[str] | None = None,
) -> int:
    """Return exit status.

    0 means no error.

    Parses *argv*, merges file-based configuration, then processes the
    selected files either sequentially or via a multiprocessing pool.
    """
    import argparse

    # argument_default=SUPPRESS keeps unset options out of vars(), which
    # lets merge_configuration_file() distinguish "unset" from "False".
    parser = argparse.ArgumentParser(
        description=__doc__,
        prog="autoflake",
        argument_default=argparse.SUPPRESS,
    )
    check_group = parser.add_mutually_exclusive_group()
    check_group.add_argument(
        "-c",
        "--check",
        action="store_true",
        help="return error code if changes are needed",
    )
    check_group.add_argument(
        "-cd",
        "--check-diff",
        action="store_true",
        help="return error code if changes are needed, also display file diffs",
    )

    imports_group = parser.add_mutually_exclusive_group()
    imports_group.add_argument(
        "--imports",
        help="by default, only unused standard library "
        "imports are removed; specify a comma-separated "
        "list of additional modules/packages",
    )
    imports_group.add_argument(
        "--remove-all-unused-imports",
        action="store_true",
        help="remove all unused imports (not just those from "
        "the standard library)",
    )

    parser.add_argument(
        "-r",
        "--recursive",
        action="store_true",
        help="drill down directories recursively",
    )
    parser.add_argument(
        "-j",
        "--jobs",
        type=int,
        metavar="n",
        default=0,
        help="number of parallel jobs; "
        "match CPU count if value is 0 (default: 0)",
    )
    parser.add_argument(
        "--exclude",
        metavar="globs",
        help="exclude file/directory names that match these "
        "comma-separated globs",
    )
    parser.add_argument(
        "--expand-star-imports",
        action="store_true",
        help="expand wildcard star imports with undefined "
        "names; this only triggers if there is only "
        "one star import in the file; this is skipped if "
        "there are any uses of `__all__` or `del` in the "
        "file",
    )
    parser.add_argument(
        "--ignore-init-module-imports",
        action="store_true",
        help="exclude __init__.py when removing unused imports",
    )
    parser.add_argument(
        "--remove-duplicate-keys",
        action="store_true",
        help="remove all duplicate keys in objects",
    )
    parser.add_argument(
        "--remove-unused-variables",
        action="store_true",
        help="remove unused variables",
    )
    parser.add_argument(
        "--remove-rhs-for-unused-variables",
        action="store_true",
        help="remove RHS of statements when removing unused variables (unsafe)",
    )
    parser.add_argument(
        "--ignore-pass-statements",
        action="store_true",
        help="ignore all pass statements",
    )
    parser.add_argument(
        "--ignore-pass-after-docstring",
        action="store_true",
        help='ignore pass statements after a newline ending on \'"""\'',
    )
    parser.add_argument(
        "--version",
        action="version",
        version="%(prog)s " + __version__,
    )
    parser.add_argument(
        "--quiet",
        action="store_true",
        help="Suppress output if there are no issues",
    )
    parser.add_argument(
        "-v",
        "--verbose",
        action="count",
        dest="verbosity",
        default=0,
        help="print more verbose logs (you can "
        "repeat `-v` to make it more verbose)",
    )
    parser.add_argument(
        "--stdin-display-name",
        dest="stdin_display_name",
        default="stdin",
        help="the name used when processing input from stdin",
    )

    parser.add_argument(
        "--config",
        dest="config_file",
        help=(
            "Explicitly set the config file "
            "instead of auto determining based on file location"
        ),
    )

    parser.add_argument("files", nargs="+", help="files to format")

    output_group = parser.add_mutually_exclusive_group()
    output_group.add_argument(
        "-i",
        "--in-place",
        action="store_true",
        help="make changes to files instead of printing diffs",
    )
    output_group.add_argument(
        "-s",
        "--stdout",
        action="store_true",
        dest="write_to_stdout",
        help=(
            "print changed text to stdout. defaults to true "
            "when formatting stdin, or to false otherwise"
        ),
    )

    args: MutableMapping[str, Any] = vars(parser.parse_args(argv[1:]))

    if standard_error is None:
        _LOGGER.addHandler(logging.NullHandler())
    else:
        _LOGGER.addHandler(logging.StreamHandler(standard_error))
        loglevels = [logging.WARNING, logging.INFO, logging.DEBUG]
        try:
            loglevel = loglevels[args["verbosity"]]
        except IndexError:  # Too much -v
            loglevel = loglevels[-1]
        _LOGGER.setLevel(loglevel)

    args, success = merge_configuration_file(args)
    if not success:
        return 1

    if args["remove_rhs_for_unused_variables"] and not (
        args["remove_unused_variables"]
    ):
        _LOGGER.error(
            "Using --remove-rhs-for-unused-variables only makes sense when "
            "used with --remove-unused-variables",
        )
        return 1

    if "exclude" in args:
        args["exclude"] = _split_comma_separated(args["exclude"])
    else:
        args["exclude"] = set()

    if args["jobs"] < 1:
        worker_count = os.cpu_count()
        if sys.platform == "win32":
            # Work around https://bugs.python.org/issue26903
            # os.cpu_count() may return None; guard it before min().
            worker_count = min(worker_count or 1, 60)
        args["jobs"] = worker_count or 1

    filenames = list(set(args["files"]))

    # convert argparse namespace to a dict so that it can be serialized
    # by multiprocessing
    exit_status = 0
    files = list(find_files(filenames, args["recursive"], args["exclude"]))
    # Run sequentially when parallelism would not help (single job, single
    # file, stdin in the mix) or when a capture stream was injected
    # (removed a duplicated `args["jobs"] == 1` clause here).
    if (
        args["jobs"] == 1
        or len(files) == 1
        or "-" in files
        or standard_out is not None
    ):
        for name in files:
            if name == "-" and standard_input is not None:
                exit_status |= _fix_file(
                    standard_input,
                    args["stdin_display_name"],
                    args=args,
                    write_to_stdout=True,
                    standard_out=standard_out or sys.stdout,
                )
            else:
                try:
                    exit_status |= fix_file(
                        name,
                        args=args,
                        standard_out=standard_out,
                    )
                except OSError as exception:
                    _LOGGER.error(str(exception))
                    exit_status |= 1
    else:
        import multiprocessing

        with multiprocessing.Pool(args["jobs"]) as pool:
            futs = []
            for name in files:
                fut = pool.apply_async(fix_file, args=(name, args))
                futs.append(fut)
            for fut in futs:
                try:
                    exit_status |= fut.get()
                except OSError as exception:
                    _LOGGER.error(str(exception))
                    exit_status |= 1

    return exit_status

1587 

1588 

def main() -> int:
    """Command-line entry point."""
    # Exit on broken pipe.  SIGPIPE is not available on Windows, hence
    # the attribute check instead of catching AttributeError.
    if hasattr(signal, "SIGPIPE"):
        signal.signal(signal.SIGPIPE, signal.SIG_DFL)

    try:
        return _main(
            sys.argv,
            standard_out=None,
            standard_error=sys.stderr,
            standard_input=sys.stdin,
        )
    except KeyboardInterrupt:  # pragma: no cover
        return 2  # pragma: no cover

1607 

1608 

if __name__ == "__main__":
    # Propagate main()'s return code as the process exit status.
    raise SystemExit(main())