Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/unblob/extractor.py: 21%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

91 statements  

1"""File extraction related functions.""" 

2 

3import errno 

4import os 

5from pathlib import Path 

6from typing import Union 

7 

8from structlog import get_logger 

9 

10from .file_utils import carve, is_safe_path 

11from .models import Chunk, File, PaddingChunk, TaskResult, UnknownChunk, ValidChunk 

12from .report import MaliciousSymlinkRemoved 

13 

# Module-level structlog logger shared by every function in this file.
logger = get_logger()

# Permission bits that fix_permission ORs into the mode of an extracted
# regular file (0o644 == rw-r--r--). Bits already set on the file are kept;
# despite the name, nothing is masked *out*.
FILE_PERMISSION_MASK = 0o644

# Permission bits that fix_permission ORs into the mode of an extracted
# directory (0o775 == rwxrwxr-x). Bits already set are kept.
DIR_PERMISSION_MASK = 0o775

18 

19 

def carve_chunk_to_file(carve_path: Path, file: File, chunk: Chunk):
    """Carve the byte range described by ``chunk`` out of ``file`` into ``carve_path``.

    The work is delegated to ``carve``, which receives the destination path,
    the source file, the chunk's start offset and the chunk's size. A debug
    log entry naming the destination is emitted first.
    """
    logger.debug("Carving chunk", path=carve_path)
    offset, length = chunk.start_offset, chunk.size
    carve(carve_path, file, offset, length)

24 

25 

def fix_permission(path: Path):
    """Add the module's baseline permission bits to ``path``.

    Symlinks and paths that do not exist are left untouched. Regular files
    get ``FILE_PERMISSION_MASK`` OR-ed into their current mode, directories
    get ``DIR_PERMISSION_MASK``; any other kind of entry is re-chmod'ed with
    its current mode unchanged. Existing permission bits are never cleared.
    """
    if path.is_symlink() or not path.exists():
        return

    current_mode = path.stat().st_mode

    if path.is_file():
        extra_bits = FILE_PERMISSION_MASK
    elif path.is_dir():
        extra_bits = DIR_PERMISSION_MASK
    else:
        # Neither a regular file nor a directory: keep the mode as it is.
        extra_bits = 0

    path.chmod(current_mode | extra_bits)

41 

42 

def is_recursive_link(path: Path) -> bool:
    """Return True when resolving ``path`` runs into a symlink loop.

    The path is resolved in strict mode so that a loop is always reported by
    the interpreter, independent of its version:

    * Up to Python 3.12, ``Path.resolve`` signals a loop with ``RuntimeError``.
    * From Python 3.13 on, a loop is an ordinary ``OSError`` with
      ``errno.ELOOP`` in strict mode, and is *not* reported at all in
      non-strict mode, so a plain ``path.resolve()`` would never detect it.

    Any other resolution failure (dangling link, missing path, permission
    problem, over-long name, ...) is not a loop and yields False.
    """
    try:
        path.resolve(strict=True)
    except RuntimeError:
        # Python <= 3.12 way of reporting a symlink loop.
        return True
    except OSError as e:
        # Python >= 3.13 way of reporting a symlink loop; every other OSError
        # just means the path could not be resolved for an unrelated reason.
        return e.errno == errno.ELOOP
    return False

49 

50 

def fix_symlink(path: Path, outdir: Path, task_result: TaskResult) -> Path:
    """Sanitize the symlink at ``path`` so it stays inside ``outdir``.

    Handling, in order:

    * A link that forms a loop is removed and reported on ``task_result``
      as ``MaliciousSymlinkRemoved``.
    * A link whose stored target is empty is removed (logged only).
    * An absolute target has its leading slashes stripped, making it
      relative; for any other target the link itself is resolved with
      ``path.resolve()``.
    * If ``is_safe_path(outdir, target)`` rejects the result, the link is
      removed and reported as ``MaliciousSymlinkRemoved``.
    * Otherwise the link is recreated with a target expressed relative to
      the link's parent directory, so the link value does not mention the
      extraction directory.

    ``path`` is returned in every case, including when the link was removed.
    """
    if is_recursive_link(path):
        logger.error("Symlink loop identified, removing", path=path)
        loop_report = MaliciousSymlinkRemoved(
            link=path.as_posix(), target=path.readlink().as_posix()
        )
        task_result.add_report(loop_report)
        path.unlink()
        return path

    link_value = os.readlink(path)  # noqa: PTH115
    if not link_value:
        logger.error("Symlink with empty target, removing.")
        path.unlink()
        return path

    stored_target = Path(link_value)
    target = (
        Path(stored_target.as_posix().lstrip("/"))
        if stored_target.is_absolute()
        else path.resolve()
    )

    if not is_safe_path(outdir, target):
        logger.error("Path traversal attempt through symlink, removing", target=target)
        traversal_report = MaliciousSymlinkRemoved(
            link=path.as_posix(), target=target.as_posix()
        )
        task_result.add_report(traversal_report)
        path.unlink()
        return path

    # Compute the new link value before removing the old link.
    relative_target = os.path.relpath(outdir.joinpath(target), start=path.parent)
    path.unlink()
    path.symlink_to(relative_target)
    return path

94 

95 

def fix_extracted_directory(outdir: Path, task_result: TaskResult):
    """Fix permissions and symlinks for ``outdir`` and everything below it.

    ``fix_permission`` is applied to ``outdir`` itself and to every entry
    found while walking the tree. Symlinks are handed to ``fix_symlink`` and
    are never descended into; real directories are walked recursively.
    An ``OSError`` with ``errno.ENAMETOOLONG`` raised while handling an entry
    skips that entry; any other ``OSError`` is re-raised.
    """

    def _walk(current: Path):
        if not current.exists():
            return
        for entry in current.iterdir():
            try:
                fix_permission(entry)
                if entry.is_symlink():
                    fix_symlink(entry, outdir, task_result)
                elif entry.is_dir():
                    _walk(entry)
            except OSError as e:
                # Over-long names are tolerated; everything else is fatal.
                if e.errno != errno.ENAMETOOLONG:
                    raise e from None

    fix_permission(outdir)
    _walk(outdir)

115 

116 

def carve_unknown_chunk(
    extract_dir: Path, file: File, chunk: Union[UnknownChunk, PaddingChunk]
) -> Path:
    """Carve an unknown or padding chunk into ``extract_dir`` and return its path.

    The output file is named ``<start_offset>-<end_offset>.<extension>``,
    where the extension is ``padding`` for a ``PaddingChunk`` and ``unknown``
    otherwise.
    """
    extension = "padding" if isinstance(chunk, PaddingChunk) else "unknown"
    carve_path = extract_dir / f"{chunk.start_offset}-{chunk.end_offset}.{extension}"
    logger.info("Extracting unknown chunk", path=carve_path, chunk=chunk)
    carve_chunk_to_file(carve_path, file, chunk)
    return carve_path

129 

130 

def carve_valid_chunk(extract_dir: Path, file: File, chunk: ValidChunk) -> Path:
    """Carve a valid chunk into ``extract_dir`` and return the carved file's path.

    The output file is named ``<start_offset>-<end_offset>.<handler NAME>``,
    using the ``NAME`` attribute of the chunk's handler as the extension.
    """
    name_parts = (chunk.start_offset, chunk.end_offset, chunk.handler.NAME)
    carve_path = extract_dir.joinpath("{}-{}.{}".format(*name_parts))
    logger.info("Extracting valid chunk", path=carve_path, chunk=chunk)
    carve_chunk_to_file(carve_path, file, chunk)
    return carve_path