Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/unblob/extractors/command.py: 39%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

64 statements  

1import shlex 

2import subprocess 

3from pathlib import Path 

4from typing import TYPE_CHECKING, Optional, Union 

5 

6from structlog import get_logger 

7 

8from unblob.models import DirectoryExtractor, ExtractError, Extractor 

9from unblob.report import ( 

10 ExtractCommandFailedReport, 

11 ExtractorDependencyNotFoundReport, 

12 ExtractorTimedOut, 

13) 

14 

15if TYPE_CHECKING: 

16 import io 

17 

# Value that is high enough not to block long-running executions, such as the extraction of
# large disk images, but small enough to make sure unblob finishes its execution at some point.
# Passed as the `timeout` of `subprocess.run`, so the unit is seconds (12 hours).
COMMAND_TIMEOUT = 12 * 60 * 60

# Module-level structlog logger.
logger = get_logger()

23 

24 

class Command(Extractor):
    """Extractor that delegates the extraction to an external executable.

    The command line is kept as a template (the executable followed by its
    arguments).  Every element is formatted with `str.format` using the
    keyword replacements ``inpath`` and ``outdir`` on each `extract` call.
    """

    def __init__(self, executable, *args, stdout: Optional[str] = None):
        """Extract using external extractor and template parameters.

        Has extra support for extractors (notably 7z), which can not be directed to output to a file, but can extract to stdout:
        When the parameter `stdout` is set, the command's stdout will be redirected to `outdir / stdout`.

        Args:
            executable: first element of the command; also what
                `get_dependencies` reports.
            *args: remaining command template elements.
            stdout: optional file name, relative to the output directory,
                that receives the command's standard output.
        """
        self._executable = executable
        self._command_template = [executable, *args]
        self._stdout = stdout

    def extract(self, inpath: Path, outdir: Path):
        """Run the external command for *inpath*, extracting into *outdir*.

        Raises:
            ExtractError: if the command exits with a non-zero status, if a
                ``FileNotFoundError`` is raised while opening the stdout file
                or starting the command, or if the command runs longer than
                ``COMMAND_TIMEOUT``.
            InvalidCommandTemplate: if the command template cannot be
                formatted.
        """
        cmd = self._make_extract_command(inpath, outdir)
        command = shlex.join(cmd)
        logger.debug("Running extract command", command=command)
        # `io` is only imported under TYPE_CHECKING; this is fine because
        # annotations of local variables are not evaluated at runtime.
        stdout_file: Union[int, io.FileIO] = subprocess.PIPE

        def no_op():
            pass

        # Replaced by the file's `close` once the stdout file has been opened,
        # so the `finally` clause can call it unconditionally.
        cleanup = no_op

        try:
            if self._stdout:
                # Unbuffered binary file: the child process writes to it directly.
                stdout_file = (outdir / self._stdout).open(mode="wb", buffering=0)
                cleanup = stdout_file.close

            res = subprocess.run(
                cmd,
                stdout=stdout_file,
                stderr=subprocess.PIPE,
                timeout=COMMAND_TIMEOUT,
                check=False,
            )
            if res.returncode != 0:
                error_report = ExtractCommandFailedReport(
                    command=command,
                    stdout=res.stdout,
                    stderr=res.stderr,
                    exit_code=res.returncode,
                )

                logger.error("Extract command failed", **error_report.asdict())
                raise ExtractError(error_report)
        except FileNotFoundError:
            # NOTE(review): this handler also covers a FileNotFoundError raised
            # while opening the stdout file above (e.g. a missing `outdir`),
            # which is then reported as a missing dependency as well.
            error_report = ExtractorDependencyNotFoundReport(
                dependencies=self.get_dependencies()
            )
            logger.error(
                "Can't run extract command. Is the extractor installed?",
                **error_report.asdict(),
            )
            raise ExtractError(error_report) from None
        except subprocess.TimeoutExpired as e:
            error_report = ExtractorTimedOut(cmd=e.cmd, timeout=e.timeout)
            logger.error(
                "Extract command timed out.",
                **error_report.asdict(),
            )
            raise ExtractError(error_report) from None
        finally:
            cleanup()

    def _make_extract_command(self, inpath: Path, outdir: Path) -> list[str]:
        """Return the command template with ``inpath``/``outdir`` filled in.

        Raises:
            InvalidCommandTemplate: if a template element uses a placeholder
                that is not available (an unknown name, or a positional
                placeholder such as ``{}``), or if it is malformed.
        """
        replacements = dict(inpath=inpath, outdir=outdir)

        args = []
        for t in self._command_template:
            try:
                args.append(t.format(**replacements))
            except (KeyError, IndexError) as k:
                # KeyError: unknown named placeholder, e.g. "{foo}".
                # IndexError: positional placeholder, e.g. "{}" or "{0}";
                # only keyword replacements are supplied to `format`.
                raise InvalidCommandTemplate("Invalid template placeholder", t) from k
            except ValueError as v:
                raise InvalidCommandTemplate("The template is malformed", t) from v
        return args

    def get_dependencies(self) -> list[str]:
        """Return the executable this extractor needs, as a one-element list."""
        return [self._executable]

103 

104 

class MultiFileCommand(DirectoryExtractor):
    """Directory extractor that wraps a `Command` and runs it on the first path."""

    def __init__(self, *args, **kwargs):
        """Build the wrapped `Command`, forwarding every argument unchanged."""
        self._command = Command(*args, **kwargs)

    def extract(self, paths: list[Path], outdir: Path):
        """Extract into *outdir* by running the wrapped command on ``paths[0]``.

        Only the first element of *paths* is handed to the command.
        """
        run_command = self._command.extract
        return run_command(paths[0], outdir)

    def get_dependencies(self) -> list[str]:
        """Return the dependencies reported by the wrapped command."""
        wrapped = self._command
        return wrapped.get_dependencies()

114 

115 

class InvalidCommandTemplate(ValueError):
    """Raised when an element of a command template cannot be formatted."""