Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/libcst/codemod/_cli.py: 26%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# Copyright (c) Meta Platforms, Inc. and affiliates.
2#
3# This source code is licensed under the MIT license found in the
4# LICENSE file in the root directory of this source tree.
5#
6"""
7Provides helpers for CLI interaction.
8"""
10import difflib
11import functools
12import os.path
13import re
14import subprocess
15import sys
16import time
17import traceback
18from concurrent.futures import as_completed, Executor
19from copy import deepcopy
20from dataclasses import dataclass
21from multiprocessing import cpu_count
22from pathlib import Path
23from typing import AnyStr, Callable, cast, Dict, List, Optional, Sequence, Type, Union
24from warnings import warn
26from libcst import parse_module, PartialParserConfig
27from libcst.codemod._codemod import Codemod
28from libcst.codemod._context import CodemodContext
29from libcst.codemod._dummy_pool import DummyExecutor
30from libcst.codemod._runner import (
31 SkipFile,
32 SkipReason,
33 transform_module,
34 TransformExit,
35 TransformFailure,
36 TransformResult,
37 TransformSkip,
38 TransformSuccess,
39)
40from libcst.helpers import calculate_module_and_package
41from libcst.metadata import FullRepoManager
# Default marker whose presence in a file flags that file as generated code.
# The value is assembled from two pieces so that the complete marker text never
# appears verbatim in this module (presumably so that tools scanning for the
# marker do not treat this file itself as generated -- confirm with upstream).
_DEFAULT_GENERATED_CODE_MARKER: str = "@gen" + "erated"
def invoke_formatter(formatter_args: Sequence[str], code: AnyStr) -> AnyStr:
    """
    Pipe ``code`` through an external formatter and return what it prints.

    ``formatter_args`` is the command line to execute. ``code`` is fed to the
    formatter on stdin and whatever the formatter writes to stdout is returned.
    ``bytes`` input is exchanged with the child process as raw bytes; any other
    input is exchanged as UTF-8 text.

    Raises ``ValueError`` if ``formatter_args`` is empty.
    """

    # Refuse to continue when no formatter command has been configured.
    if len(formatter_args) == 0:
        raise ValueError("No formatter configured but code formatting requested.")

    if isinstance(code, bytes):
        # Binary in, binary out: no text layer on the pipes.
        text_mode, stream_encoding = False, None
    else:
        text_mode, stream_encoding = True, "utf-8"

    # The formatter receives the code on stdin; its stdout is the result.
    formatted = subprocess.check_output(
        formatter_args,
        input=code,
        universal_newlines=text_mode,
        encoding=stream_encoding,
    )
    return cast(AnyStr, formatted)
def print_execution_result(result: TransformResult) -> None:
    """
    Write the diagnostics carried by ``result`` to stderr.

    Every warning message is printed with a ``WARNING:`` prefix. When
    ``result`` is a :class:`TransformFailure`, the stored traceback is printed
    as well; if the underlying error is a ``subprocess.CalledProcessError``,
    the output captured from the failed process is printed before it.
    """
    for warning in result.warning_messages:
        print(f"WARNING: {warning}", file=sys.stderr)

    if isinstance(result, TransformFailure):
        error = result.error
        if isinstance(error, subprocess.CalledProcessError):
            output = error.output
            # The captured output is ``bytes`` when the process was driven in
            # binary mode and ``str`` when it was driven in text mode, so only
            # decode when there are actually bytes to decode. Undecodable
            # bytes are replaced so that reporting a failure cannot itself
            # fail.
            if isinstance(output, bytes):
                output = output.decode("utf-8", errors="replace")
            # Nothing to show if no output was captured at all.
            if output is not None:
                print(output, file=sys.stderr)
        print(result.traceback_str, file=sys.stderr)
def gather_files(
    files_or_dirs: Sequence[str], *, include_stubs: bool = False
) -> List[str]:
    """
    Given a list of files or directories (can be intermingled), return a sorted
    list of all python files that exist at those locations.

    Paths that name an existing file are returned as given, whatever their
    extension. Directories are searched recursively for ``.py`` files and, if
    ``include_stubs`` is ``True``, for ``.pyi`` stub files as well. Paths that
    are neither an existing file nor an existing directory are ignored.
    """
    # ``str.endswith`` accepts a tuple, so a single call covers both cases.
    wanted_suffixes = (".py", ".pyi") if include_stubs else (".py",)

    ret: List[str] = []
    for fd in files_or_dirs:
        if os.path.isfile(fd):
            # Explicitly named files are taken as-is.
            ret.append(fd)
        elif os.path.isdir(fd):
            ret.extend(
                str(p)
                # The glob is deliberately loose (it also matches ``.pyc``,
                # ``.py.bak`` and so on); the suffix test below, which includes
                # the dot, is what actually selects the files.
                for p in Path(fd).rglob("*.py*")
                if p.is_file() and p.name.endswith(wanted_suffixes)
            )
    return sorted(ret)
def diff_code(
    oldcode: str, newcode: str, context: int, *, filename: Optional[str] = None
) -> str:
    """
    Produce a unified diff between a module's code before (``oldcode``) and
    after (``newcode``) a codemod, with ``context`` lines of context around
    each change.

    If ``filename`` is given it labels both sides of the diff header; otherwise
    the header labels are left blank. When the two inputs are identical an
    empty string is returned instead of an empty diff, much like revision
    control software only shows files that have changed.
    """

    # Nothing changed: report nothing at all.
    if oldcode == newcode:
        return ""

    # A missing (or empty) filename leaves both header labels blank.
    label = filename if filename else ""
    hunks = difflib.unified_diff(
        oldcode.split("\n"),
        newcode.split("\n"),
        fromfile=label,
        tofile=label,
        n=context,
        lineterm="",
    )
    return "\n".join(hunks)
def exec_transform_with_prettyprint(
    transform: Codemod,
    code: str,
    *,
    include_generated: bool = False,
    generated_code_marker: str = _DEFAULT_GENERATED_CODE_MARKER,
    format_code: bool = False,
    formatter_args: Sequence[str] = (),
    python_version: Optional[str] = None,
) -> Optional[str]:
    """
    Run an instantiated codemod over a string of module source, optionally
    format the result with an external formatter, and print any warnings or
    failure details to stderr.

    If ``code`` contains ``generated_code_marker`` and ``include_generated`` is
    not ``True``, a warning is printed and ``code`` is returned untouched. The
    formatter is only invoked when ``format_code`` is ``True`` and the
    transform produced code. If ``python_version`` is provided it is forwarded
    to the transform so the module is parsed as that version.

    Returns the resulting code, or ``None`` when the transform failed, exited
    or skipped the module, or when the formatter raised.
    """

    skip_as_generated = not include_generated and generated_code_marker in code
    if skip_as_generated:
        print(
            "WARNING: Code is generated and we are set to ignore generated code, "
            "skipping!",
            file=sys.stderr,
        )
        return code

    result = transform_module(transform, code, python_version=python_version)

    # Only a successful transform carries code to hand back.
    maybe_code: Optional[str]
    if isinstance(result, (TransformFailure, TransformExit, TransformSkip)):
        maybe_code = None
    else:
        maybe_code = result.code

    if format_code and maybe_code is not None:
        try:
            maybe_code = invoke_formatter(formatter_args, maybe_code)
        except Exception as ex:
            # A formatter problem downgrades the whole run to a failure; keep
            # the exception and traceback so they get printed below.
            maybe_code = None
            result = TransformFailure(
                error=ex,
                traceback_str=traceback.format_exc(),
                warning_messages=result.warning_messages,
            )

    # Whatever the outcome, surface warnings and errors to the user.
    print_execution_result(result)
    return maybe_code
@dataclass(frozen=True)
class ExecutionResult:
    """
    Immutable record of what happened when a codemod was run on one file.
    """

    #: Path of the file this record describes.
    filename: str
    #: ``True`` if the code for the file was actually changed.
    changed: bool
    #: The detailed outcome of the transform.
    transform_result: TransformResult
@dataclass(frozen=True)
class ExecutionConfig:
    """
    Immutable bundle of the per-run settings used when transforming a file.
    """

    #: Regex patterns; a file whose name fully matches one of them is skipped.
    blacklist_patterns: Sequence[str] = ()
    #: Whether to run the external formatter on code the codemod changed.
    format_code: bool = False
    #: Command line of the external formatter.
    formatter_args: Sequence[str] = ()
    #: Text whose presence in a file marks that file as generated.
    generated_code_marker: str = _DEFAULT_GENERATED_CODE_MARKER
    #: Whether files containing the generated-code marker are processed anyway.
    include_generated: bool = False
    #: Python version to parse files as, or ``None`` for the parser default.
    python_version: Optional[str] = None
    #: Repository root used to work out module and package names.
    repo_root: Optional[str] = None
    #: Lines of diff context when producing diffs, or ``None`` to write files.
    unified_diff: Optional[int] = None
def _prepare_context(
    repo_root: str,
    filename: str,
    scratch: Dict[str, object],
    repo_manager: Optional[FullRepoManager],
) -> CodemodContext:
    """
    Build the :class:`CodemodContext` for ``filename``.

    The module and package names are derived from ``repo_root`` and
    ``filename``. If that derivation raises ``ValueError``, a message is
    printed to stderr and both names are left as ``None``.
    """
    full_module_name = None
    full_package_name = None
    try:
        resolved = calculate_module_and_package(repo_root, filename)
    except ValueError as ex:
        print(f"Failed to determine module name for {filename}: {ex}", file=sys.stderr)
    else:
        full_module_name = resolved.name
        full_package_name = resolved.package

    return CodemodContext(
        scratch=scratch,
        filename=filename,
        full_module_name=full_module_name,
        full_package_name=full_package_name,
        metadata_manager=repo_manager,
    )
def _instantiate_transformer(
    transformer: Union[Codemod, Type[Codemod]],
    repo_root: str,
    filename: str,
    original_scratch: Dict[str, object],
    codemod_kwargs: Dict[str, object],
    repo_manager: Optional[FullRepoManager],
) -> Codemod:
    """
    Return a codemod instance whose context is set up for ``filename``.

    Given a codemod class, a new instance is constructed with a fresh context
    (empty scratch) and ``codemod_kwargs``. Given an existing instance, that
    same instance is returned after its context is replaced with one holding a
    deep copy of ``original_scratch``.
    """
    if not isinstance(transformer, type):
        # Already an instance: reuse it, but give it a context for this file.
        transformer.context = _prepare_context(
            repo_root, filename, deepcopy(original_scratch), repo_manager
        )
        return transformer

    fresh_context = _prepare_context(repo_root, filename, {}, repo_manager)
    return transformer(context=fresh_context, **codemod_kwargs)  # type: ignore
def _check_for_skip(
    filename: str, config: ExecutionConfig
) -> Union[ExecutionResult, bytes]:
    """
    Decide whether ``filename`` should be skipped before any parsing happens.

    Returns a skip :class:`ExecutionResult` when the filename fully matches one
    of ``config.blacklist_patterns``, or when the file contains the generated
    code marker and generated files are not included. Otherwise the raw bytes
    of the file are returned.
    """
    # The first blacklist pattern matching the whole filename wins.
    blacklisted_by = next(
        (
            pattern
            for pattern in config.blacklist_patterns
            if re.fullmatch(pattern, filename)
        ),
        None,
    )
    if blacklisted_by is not None:
        return ExecutionResult(
            filename=filename,
            changed=False,
            transform_result=TransformSkip(
                skip_reason=SkipReason.BLACKLISTED,
                skip_description=f"Blacklisted by pattern {blacklisted_by}.",
            ),
        )

    with open(filename, "rb") as source_file:
        raw_code = source_file.read()

    # Generated files are skipped unless explicitly included.
    if not config.include_generated:
        marker = config.generated_code_marker.encode("utf-8")
        if marker in raw_code:
            return ExecutionResult(
                filename=filename,
                changed=False,
                transform_result=TransformSkip(
                    skip_reason=SkipReason.GENERATED,
                    skip_description="Generated file.",
                ),
            )

    return raw_code
def _execute_transform(
    transformer: Union[Codemod, Type[Codemod]],
    filename: str,
    config: ExecutionConfig,
    original_scratch: Dict[str, object],
    codemod_args: Optional[Dict[str, object]],
    repo_manager: Optional[FullRepoManager],
) -> ExecutionResult:
    """
    Run the codemod over a single file and report what happened.

    The file is first checked against the skip rules; if it is not skipped it
    is parsed, transformed and, when ``config.format_code`` is set and the
    code changed, passed through the formatter. If ``config.unified_diff`` is
    not ``None`` the result carries a unified diff and the file is left
    untouched; otherwise a changed file is written back in place.

    Nothing is raised for ordinary errors: a ``SkipFile`` raised by the codemod
    becomes a skip result, ``KeyboardInterrupt`` becomes a ``TransformExit``
    result, and any other exception becomes a ``TransformFailure`` result
    carrying the exception and its traceback.
    """
    warnings: list[str] = []
    try:
        oldcode = _check_for_skip(filename, config)
        if isinstance(oldcode, ExecutionResult):
            return oldcode

        transformer_instance = _instantiate_transformer(
            transformer,
            config.repo_root or ".",
            filename,
            original_scratch,
            codemod_args or {},
            repo_manager,
        )

        # Parse and transform; the codemod may ask to skip the file.
        try:
            input_tree = parse_module(
                oldcode,
                config=(
                    PartialParserConfig(python_version=str(config.python_version))
                    if config.python_version is not None
                    else PartialParserConfig()
                ),
            )
            output_tree = transformer_instance.transform_module(input_tree)
            newcode = output_tree.bytes
            encoding = output_tree.encoding
            warnings.extend(transformer_instance.context.warnings)
        except SkipFile as ex:
            warnings.extend(transformer_instance.context.warnings)
            return ExecutionResult(
                filename=filename,
                changed=False,
                transform_result=TransformSkip(
                    skip_reason=SkipReason.OTHER,
                    skip_description=str(ex),
                    warning_messages=warnings,
                ),
            )

        # Call formatter if needed, but only if we actually changed something in this
        # file
        if config.format_code and newcode != oldcode:
            newcode = invoke_formatter(config.formatter_args, newcode)

        # Format as unified diff if needed, otherwise save it back
        changed = oldcode != newcode
        # Compare against None rather than relying on truthiness: a context of
        # 0 lines is a legitimate diff request and must never fall through to
        # the branch that overwrites the file on disk.
        if config.unified_diff is not None:
            newcode = diff_code(
                oldcode.decode(encoding),
                newcode.decode(encoding),
                config.unified_diff,
                filename=filename,
            )
        else:
            # Write back if we changed
            if changed:
                with open(filename, "wb") as fp:
                    fp.write(newcode)
            # Not strictly necessary, but saves space in pickle since we won't use it
            newcode = ""

        # Inform success
        return ExecutionResult(
            filename=filename,
            changed=changed,
            transform_result=TransformSuccess(warning_messages=warnings, code=newcode),
        )

    except KeyboardInterrupt:
        return ExecutionResult(
            filename=filename,
            changed=False,
            transform_result=TransformExit(warning_messages=warnings),
        )
    except Exception as ex:
        return ExecutionResult(
            filename=filename,
            changed=False,
            transform_result=TransformFailure(
                error=ex,
                traceback_str=traceback.format_exc(),
                warning_messages=warnings,
            ),
        )
class Progress:
    """
    Single-line progress indicator written to stderr.

    Each update erases the current terminal line and rewrites it with the
    elapsed time, the percentage of files finished and a rough estimate of the
    time remaining. When constructed with ``enabled=False``, ``print`` and
    ``clear`` do nothing.
    """

    # Carriage return followed by the ANSI "erase entire line" sequence.
    ERASE_CURRENT_LINE: str = "\r\033[2K"

    def __init__(self, *, enabled: bool, total: int) -> None:
        self.enabled = enabled
        self.total = total
        # 1/100 = 0, len("0") = 1, precision = 0, more digits for more files
        self.pretty_precision: int = len(str(self.total // 100)) - 1
        # Pretend we start processing immediately. This is not true, but it's
        # close enough to true.
        self.started_at: float = time.time()

    def print(self, finished: int) -> None:
        """
        Redraw the progress line for ``finished`` completed files.
        """
        if not self.enabled:
            return
        left = self.total - finished
        # With no files at all there is nothing outstanding; report completion
        # instead of dividing by zero.
        if self.total > 0:
            percent = 100.0 * (float(finished) / float(self.total))
        else:
            percent = 100.0
        elapsed_time = max(time.time() - self.started_at, 0)

        elapsed = self._human_seconds(elapsed_time)
        remaining = self.estimate_completion(elapsed_time, finished, left)
        print(
            f"{self.ERASE_CURRENT_LINE}{elapsed} "
            f"{percent:.{self.pretty_precision}f}% complete, "
            f"{remaining} estimated for {left} files to go...",
            end="",
            file=sys.stderr,
        )

    def _human_seconds(self, seconds: Union[int, float]) -> str:
        """
        This returns a string which is a human-ish readable elapsed time such
        as 30.42s or 10m 31s
        """

        if seconds < 60:
            return f"{seconds:02.2f}s"

        # Round to whole seconds *before* splitting into fields, so a value
        # such as 119.7 carries over into the minutes ("02m 00s") instead of
        # being rendered with a seconds field of 60.
        minutes, whole_seconds = divmod(int(round(seconds)), 60)
        hours, minutes = divmod(minutes, 60)
        if hours > 0:
            return f"{hours}h {minutes:02d}m {whole_seconds:02d}s"
        return f"{minutes:02d}m {whole_seconds:02d}s"

    def estimate_completion(
        self, elapsed_seconds: float, files_finished: int, files_left: int
    ) -> str:
        """
        Computes a really basic estimated completion given a number of
        operations still to do.
        """

        if files_finished <= 0 or elapsed_seconds == 0:
            # Technically infinite but calculating sounds better.
            return "[calculating]"

        fps = files_finished / elapsed_seconds
        estimated_seconds_left = files_left / fps
        return self._human_seconds(estimated_seconds_left)

    def clear(self) -> None:
        """
        Erase the progress line from the terminal.
        """
        if not self.enabled:
            return
        print(self.ERASE_CURRENT_LINE, end="", file=sys.stderr)
def _print_parallel_result(
    exec_result: ExecutionResult,
    progress: Progress,
    *,
    unified_diff: bool,
    show_successes: bool,
    hide_generated: bool,
    hide_blacklisted: bool,
) -> None:
    """
    Report the outcome for one file.

    Skips are reported on stderr unless hidden by ``hide_blacklisted`` or
    ``hide_generated``, failures are always reported, and successes are
    reported only when ``show_successes`` is set or warnings were produced.
    Whenever something is reported the progress line is cleared first. In
    unified diff mode, a successful result's non-empty code (the diff) is
    printed to stdout.
    """
    filename = exec_result.filename
    outcome = exec_result.transform_result

    # Closing line of the report; None means this file is not reported.
    closing_line = None
    succeeded = False

    if isinstance(outcome, TransformSkip):
        # Skipped file, nothing was written back since nothing changed.
        reason = outcome.skip_reason
        hidden = (reason is SkipReason.BLACKLISTED and hide_blacklisted) or (
            reason is SkipReason.GENERATED and hide_generated
        )
        if not hidden:
            closing_line = (
                f"Skipped codemodding {filename}: {outcome.skip_description}\n"
            )
    elif isinstance(outcome, TransformFailure):
        # Failures are always shown; the file was not written back.
        closing_line = f"Failed to codemod {filename}\n"
    elif isinstance(outcome, TransformSuccess):
        succeeded = True
        if show_successes or outcome.warning_messages:
            ending = " with warnings\n" if outcome.warning_messages else "\n"
            closing_line = f"Successfully codemodded {filename}" + ending

    if closing_line is not None:
        progress.clear()
        print(f"Codemodding {filename}", file=sys.stderr)
        print_execution_result(outcome)
        print(closing_line, file=sys.stderr)

    # In unified diff mode, the code is a diff we must print.
    if succeeded and unified_diff and outcome.code:
        print(outcome.code)
@dataclass(frozen=True)
class ParallelTransformResult:
    """
    Summary returned by
    :func:`~libcst.codemod.parallel_exec_transform_with_prettyprint` after it
    has been run against a series of files. It holds plain counts: how many
    files were codemodded successfully, how many failed, how many warnings
    were generated across all files, and how many files were skipped.
    """

    #: Count of files that were transformed successfully.
    successes: int
    #: Count of files for which the transform failed.
    failures: int
    #: Count of warnings generated while running the transform across files.
    warnings: int
    #: Count of files skipped because they were blacklisted, generated
    #: or the codemod requested to skip.
    skips: int
def parallel_exec_transform_with_prettyprint(  # noqa: C901
    transform: Union[Codemod, Type[Codemod]],
    files: Sequence[str],
    *,
    jobs: Optional[int] = None,
    unified_diff: Optional[int] = None,
    include_generated: bool = False,
    generated_code_marker: str = _DEFAULT_GENERATED_CODE_MARKER,
    format_code: bool = False,
    formatter_args: Sequence[str] = (),
    show_successes: bool = False,
    hide_generated: bool = False,
    hide_blacklisted: bool = False,
    hide_progress: bool = False,
    blacklist_patterns: Sequence[str] = (),
    python_version: Optional[str] = None,
    repo_root: Optional[str] = None,
    codemod_args: Optional[Dict[str, object]] = None,
) -> ParallelTransformResult:
    """
    Given a list of files and a codemod we should apply to them, fork and apply the
    codemod in parallel to all of the files, including any configured formatter. The
    ``jobs`` parameter controls the maximum number of in-flight transforms, and needs to
    be at least 1 (a ``ValueError`` is raised otherwise). If not included, the number of
    jobs will automatically be set to the number of CPU cores. If ``unified_diff`` is set
    to a number, changes to files will be printed to stdout with ``unified_diff`` lines
    of context. If it is set to ``None`` or left out, files themselves will be updated
    with changes and formatting. If a ``python_version`` is provided, then we will parse
    each source file using this version. Otherwise, we will use the version of the
    currently executing python binary.

    A progress indicator as well as any generated warnings will be printed to stderr. To
    suppress the interactive progress indicator, set ``hide_progress`` to ``True``. Files
    that include the generated code marker will be skipped unless the
    ``include_generated`` parameter is set to ``True``. Similarly, files that match a
    supplied blacklist of regex patterns will be skipped. Warnings for skipping both
    blacklisted and generated files will be printed to stderr along with warnings
    generated by the codemod unless ``hide_blacklisted`` and ``hide_generated`` are set
    to ``True``. Files that were successfully codemodded will not be printed to stderr
    unless ``show_successes`` is set to ``True``.

    We take a :class:`~libcst.codemod._codemod.Codemod` class, or an instantiated
    :class:`~libcst.codemod._codemod.Codemod`. In the former case, the codemod will be
    instantiated for each file, with ``codemod_args`` passed in to the constructor.
    Passing an already instantiated :class:`~libcst.codemod._codemod.Codemod` is
    deprecated, because it leads to sharing of the
    :class:`~libcst.codemod._codemod.Codemod` instance across files, which is a common
    source of hard-to-track-down bugs when the :class:`~libcst.codemod._codemod.Codemod`
    tracks its state on the instance.

    Returns a :class:`ParallelTransformResult` with the number of successes, failures,
    skips and warnings. An empty ``files`` list yields a result with all counts at zero.
    """

    if isinstance(transform, Codemod):
        warn(
            "Passing transformer instances to `parallel_exec_transform_with_prettyprint` "
            "is deprecated and will break in a future version. "
            "Please pass the transformer class instead.",
            DeprecationWarning,
            stacklevel=2,
        )

    # Ensure that we have no duplicates, otherwise we might get race conditions
    # on write.
    files = sorted({os.path.abspath(f) for f in files})
    total = len(files)
    progress = Progress(enabled=not hide_progress, total=total)

    # Grab number of cores if we need to. The requested value is validated
    # before it is capped by the amount of work, so that an empty file list is
    # not misreported as an invalid job count.
    requested_jobs = jobs if jobs is not None else cpu_count()
    if requested_jobs < 1:
        raise ValueError("Must have at least one job to process!")

    if total == 0:
        return ParallelTransformResult(successes=0, failures=0, skips=0, warnings=0)

    # Never start more workers than there are groups of ``chunksize`` files.
    chunksize = 4
    jobs = min(requested_jobs, (total + chunksize - 1) // chunksize)

    metadata_manager: Optional[FullRepoManager] = None
    if repo_root is not None:
        # Make sure if there is a root that we have the absolute path to it.
        repo_root = os.path.abspath(repo_root)
        # Spin up a full repo metadata manager so that we can provide metadata
        # like type inference to individual forked processes.
        print("Calculating full-repo metadata...", file=sys.stderr)
        metadata_manager = FullRepoManager(
            repo_root,
            files,
            transform.get_inherited_dependencies(),
        )
        metadata_manager.resolve_cache()

    print("Executing codemod...", file=sys.stderr)

    config = ExecutionConfig(
        repo_root=repo_root,
        unified_diff=unified_diff,
        include_generated=include_generated,
        generated_code_marker=generated_code_marker,
        format_code=format_code,
        formatter_args=formatter_args,
        blacklist_patterns=blacklist_patterns,
        python_version=python_version,
    )

    pool_impl: Callable[[], Executor]
    if total == 1 or jobs == 1:
        # Simple case, we should not pay for process overhead.
        # Let's just use a dummy synchronous executor.
        jobs = 1
        pool_impl = DummyExecutor
    elif getattr(sys, "_is_gil_enabled", lambda: True)():  # pyre-ignore[16]
        from concurrent.futures import ProcessPoolExecutor

        pool_impl = functools.partial(ProcessPoolExecutor, max_workers=jobs)
        # Warm the parser, pre-fork.
        parse_module(
            "",
            config=(
                PartialParserConfig(python_version=python_version)
                if python_version is not None
                else PartialParserConfig()
            ),
        )
    else:
        # The interpreter reports that the GIL is disabled, so threads are
        # used instead of processes.
        from concurrent.futures import ThreadPoolExecutor

        pool_impl = functools.partial(ThreadPoolExecutor, max_workers=jobs)

    successes: int = 0
    failures: int = 0
    warnings: int = 0
    skips: int = 0
    original_scratch = (
        deepcopy(transform.context.scratch) if isinstance(transform, Codemod) else {}
    )

    with pool_impl() as executor:  # type: ignore
        try:
            futures = [
                executor.submit(
                    _execute_transform,
                    transformer=transform,
                    filename=filename,
                    config=config,
                    original_scratch=original_scratch,
                    codemod_args=codemod_args,
                    repo_manager=metadata_manager,
                )
                for filename in files
            ]
            for future in as_completed(futures):
                result = future.result()
                # Print an execution result, keep track of failures
                _print_parallel_result(
                    result,
                    progress,
                    # A context of 0 lines is still a request for a diff, so
                    # test for None instead of truthiness.
                    unified_diff=unified_diff is not None,
                    show_successes=show_successes,
                    hide_generated=hide_generated,
                    hide_blacklisted=hide_blacklisted,
                )

                if isinstance(result.transform_result, TransformFailure):
                    failures += 1
                elif isinstance(result.transform_result, TransformSuccess):
                    successes += 1
                elif isinstance(
                    result.transform_result, (TransformExit, TransformSkip)
                ):
                    skips += 1

                warnings += len(result.transform_result.warning_messages)

                # Redraw the progress line only once this file has been
                # counted, so the display does not lag one file behind.
                progress.print(successes + failures + skips)
        finally:
            progress.clear()

    # Return a summary of how the run went.
    return ParallelTransformResult(
        successes=successes, failures=failures, skips=skips, warnings=warnings
    )