1import shlex
2import subprocess
3from pathlib import Path
4from typing import TYPE_CHECKING, Optional, Union
5
6from structlog import get_logger
7
8from unblob.models import DirectoryExtractor, ExtractError, Extractor
9from unblob.report import (
10 ExtractCommandFailedReport,
11 ExtractorDependencyNotFoundReport,
12 ExtractorTimedOut,
13)
14
15if TYPE_CHECKING:
16 import io
17
# Timeout (in seconds) that is high enough not to block long-running executions,
# such as the extraction of large disk images, but small enough to make sure
# unblob finishes its execution at some point.
# 12 hours expressed in seconds; passed as `timeout` to subprocess.run below.
COMMAND_TIMEOUT = 12 * 60 * 60

# Module-level logger obtained from structlog's get_logger().
logger = get_logger()
23
24
class Command(Extractor):
    """Extractor that runs an external command built from a template.

    Every element of the command template (the executable followed by its
    arguments) is expanded with ``str.format`` using the ``inpath`` and
    ``outdir`` placeholders right before the command is executed.
    """

    def __init__(self, executable, *args, stdout: Optional[str] = None):
        """Extract using external extractor and template parameters.

        Has extra support for extractors (notably 7z), which can not be directed to output to a file, but can extract to stdout:
        When the parameter `stdout` is set, the command's stdout will be redirected to `outdir / stdout`.
        """
        self._executable = executable
        self._command_template = [executable, *args]
        self._stdout = stdout

    def extract(self, inpath: Path, outdir: Path):
        """Run the templated command for `inpath`, extracting into `outdir`.

        Raises:
            ExtractError: when the command exits with a non-zero status, the
                executable can not be found, or the run exceeds
                ``COMMAND_TIMEOUT`` seconds.
            InvalidCommandTemplate: when a template element can not be
                expanded with the ``inpath``/``outdir`` placeholders.
        """
        cmd = self._make_extract_command(inpath, outdir)
        command = shlex.join(cmd)
        logger.debug("Running extract command", command=command)
        # Captured through a pipe by default; replaced by a real file below
        # when the extractor can only write its result to stdout.
        stdout_file: Union[int, io.FileIO] = subprocess.PIPE

        def no_op():
            pass

        # Becomes `stdout_file.close` once a file has actually been opened, so
        # the `finally` clause never tries to close the PIPE constant.
        cleanup = no_op

        try:
            if self._stdout:
                # NOTE(review): a FileNotFoundError raised here (e.g. `outdir`
                # does not exist) is handled by the same `except` clause as a
                # missing executable and is reported as a missing dependency.
                stdout_file = (outdir / self._stdout).open(mode="wb", buffering=0)
                cleanup = stdout_file.close

            res = subprocess.run(
                cmd,
                stdout=stdout_file,
                stderr=subprocess.PIPE,
                timeout=COMMAND_TIMEOUT,
                check=False,
            )
            if res.returncode != 0:
                error_report = ExtractCommandFailedReport(
                    command=command,
                    stdout=res.stdout,
                    stderr=res.stderr,
                    exit_code=res.returncode,
                )

                logger.error("Extract command failed", **error_report.asdict())
                raise ExtractError(error_report)
        except FileNotFoundError:
            error_report = ExtractorDependencyNotFoundReport(
                dependencies=self.get_dependencies()
            )
            logger.error(
                "Can't run extract command. Is the extractor installed?",
                **error_report.asdict(),
            )
            raise ExtractError(error_report) from None
        except subprocess.TimeoutExpired as e:
            error_report = ExtractorTimedOut(cmd=e.cmd, timeout=e.timeout)
            logger.error(
                "Extract command timed out.",
                **error_report.asdict(),
            )
            raise ExtractError(error_report) from None
        finally:
            cleanup()

    def _make_extract_command(self, inpath: Path, outdir: Path):
        """Expand the command template into the argument list to execute.

        Raises:
            InvalidCommandTemplate: when an element references a placeholder
                other than ``inpath``/``outdir`` (including positional ones
                such as ``{}`` or ``{0}``) or is not a well-formed format
                string.
        """
        replacements = {"inpath": inpath, "outdir": outdir}

        args = []
        for t in self._command_template:
            try:
                args.append(t.format(**replacements))
            except (KeyError, IndexError) as k:
                # KeyError: unknown named placeholder.
                # IndexError: positional placeholder ("{}" / "{0}"); only
                # keyword replacements are supplied, so none can be resolved.
                raise InvalidCommandTemplate("Invalid template placeholder", t) from k
            except ValueError as v:
                raise InvalidCommandTemplate("The template is malformed", t) from v
        return args

    def get_dependencies(self) -> list[str]:
        """Return the external executable this extractor needs."""
        return [self._executable]
103
104
class MultiFileCommand(DirectoryExtractor):
    """Directory extractor that delegates all work to a wrapped `Command`."""

    def __init__(self, *args, **kwargs):
        """Build the wrapped `Command` from the given arguments, unchanged."""
        wrapped = Command(*args, **kwargs)
        self._command = wrapped

    def extract(self, paths: list[Path], outdir: Path):
        """Run the wrapped command on the first entry of `paths` only."""
        first_path = paths[0]
        return self._command.extract(first_path, outdir)

    def get_dependencies(self) -> list[str]:
        """Return the dependencies reported by the wrapped command."""
        wrapped = self._command
        return wrapped.get_dependencies()
114
115
class InvalidCommandTemplate(ValueError):
    """Raised when a command template element can not be expanded."""