Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/unblob/extractor.py: 20%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

90 statements  

1"""File extraction related functions.""" 

2 

3import errno 

4import os 

5from pathlib import Path 

6 

7from structlog import get_logger 

8 

9from .file_utils import carve, is_safe_path 

10from .models import Chunk, File, PaddingChunk, TaskResult, UnknownChunk, ValidChunk 

11from .report import MaliciousSymlinkRemoved 

12 

# Module-level structlog logger shared by every function in this file.
logger = get_logger()

# Permission bits that fix_permission ORs into the existing mode of an
# extracted regular file (bits are only ever added, never cleared).
FILE_PERMISSION_MASK = 0o644
# Permission bits that fix_permission ORs into the existing mode of an
# extracted directory (bits are only ever added, never cleared).
DIR_PERMISSION_MASK = 0o775

17 

18 

def carve_chunk_to_file(carve_path: Path, file: File, chunk: Chunk):
    """Carve the region of ``file`` described by ``chunk`` into ``carve_path``.

    The chunk's ``start_offset`` and ``size`` are handed to ``carve``; the
    resulting file is what later gets passed to an extraction tool.
    """
    logger.debug("Carving chunk", path=carve_path)
    offset, length = chunk.start_offset, chunk.size
    carve(carve_path, file, offset, length)

23 

24 

def fix_permission(path: Path):
    """Add the module's default permission bits to an extracted file or directory.

    Symlinks and paths that do not exist are left untouched. Regular files
    get ``FILE_PERMISSION_MASK`` OR-ed into their mode, directories get
    ``DIR_PERMISSION_MASK``; any other kind of entry is re-chmod-ed with its
    current mode unchanged.
    """
    # Never chmod through a link, and there is nothing to fix on a missing path.
    if path.is_symlink() or not path.exists():
        return

    current_mode = path.stat().st_mode

    if path.is_file():
        extra_bits = FILE_PERMISSION_MASK
    elif path.is_dir():
        extra_bits = DIR_PERMISSION_MASK
    else:
        extra_bits = 0

    # Existing bits are preserved; the mask only ever adds permissions.
    path.chmod(current_mode | extra_bits)

40 

41 

def is_recursive_link(path: Path) -> bool:
    """Return True when resolving ``path`` runs into a symlink loop.

    Dangling links, regular files and any other resolution failure are
    reported as "not recursive" (False).
    """
    try:
        # Strict resolution is required to stay correct across Python
        # versions: since 3.13 a non-strict resolve() silently returns on a
        # symlink loop instead of raising.
        path.resolve(strict=True)
    except RuntimeError:
        # Python < 3.13 signals a symlink loop with RuntimeError.
        return True
    except OSError as e:
        # Python >= 3.13 raises OSError(ELOOP) for loops in strict mode.
        # Other errors (e.g. a missing target) are not loops.
        return e.errno == errno.ELOOP
    return False

48 

49 

def fix_symlink(path: Path, outdir: Path, task_result: TaskResult) -> Path:
    """Sanitize an extracted symlink so it stays within the extraction directory.

    Symlink loops and links with an empty target are removed. An absolute
    target is re-rooted under ``outdir`` (its leading slashes are stripped);
    for a relative target the fully resolved location of the link is used
    instead. If ``is_safe_path`` rejects the resulting target, the link is
    removed because it attempts to traverse outside of ``outdir``; otherwise
    the link is re-created with a target relative to the link's own
    directory, so it is fully portable (no mention of the extraction
    directory in the link value).

    Removed loops and traversal attempts are recorded on ``task_result`` as
    ``MaliciousSymlinkRemoved`` reports.

    Returns ``path`` itself, even when the link has been removed.
    """
    if is_recursive_link(path):
        logger.error("Symlink loop identified, removing", path=path)
        # The link value must be read before the link is unlinked.
        error_report = MaliciousSymlinkRemoved(
            link=path.as_posix(), target=path.readlink().as_posix()
        )
        task_result.add_report(error_report)
        path.unlink()
        return path

    # os.readlink keeps the raw string: wrapping the value in Path would turn
    # an empty target into "." and hide it from the check below.
    raw_target = os.readlink(path)  # noqa: PTH115
    if not raw_target:
        # Include the link path so the removal can be traced, as the other
        # removal branches do.
        logger.error("Symlink with empty target, removing.", path=path)
        path.unlink()
        return path

    target = Path(raw_target)
    if target.is_absolute():
        # Re-root absolute targets so they are interpreted relative to outdir.
        target = Path(target.as_posix().lstrip("/"))
    else:
        target = path.resolve()

    safe = is_safe_path(outdir, target)

    if not safe:
        logger.error("Path traversal attempt through symlink, removing", target=target)
        error_report = MaliciousSymlinkRemoved(
            link=path.as_posix(), target=target.as_posix()
        )
        task_result.add_report(error_report)
        path.unlink()
    else:
        # Compute the new link value before removing the old link.
        relative_target = os.path.relpath(outdir.joinpath(target), start=path.parent)
        path.unlink()
        path.symlink_to(relative_target)
    return path

93 

94 

def fix_extracted_directory(outdir: Path, task_result: TaskResult):
    """Fix permissions and symlinks of everything below ``outdir``.

    ``outdir`` itself gets its permissions fixed first, then the tree is
    walked depth-first: every entry goes through ``fix_permission``, symlinks
    are handed to ``fix_symlink`` (and never descended into), and real
    directories are walked recursively. Entries failing with ENAMETOOLONG are
    skipped; any other OSError is re-raised.
    """

    def _walk(current: Path):
        if not current.exists():
            return
        for entry in current.iterdir():
            try:
                fix_permission(entry)
                if entry.is_symlink():
                    fix_symlink(entry, outdir, task_result)
                elif entry.is_dir():
                    _walk(entry)
            except OSError as e:
                # Over-long names are tolerated; everything else is fatal.
                if e.errno != errno.ENAMETOOLONG:
                    raise e from None

    fix_permission(outdir)
    _walk(outdir)

114 

115 

def carve_unknown_chunk(
    extract_dir: Path, file: File, chunk: UnknownChunk | PaddingChunk
) -> Path:
    """Carve an unknown or padding chunk into ``extract_dir`` and return its path.

    The file is named ``<start_offset>-<end_offset>.<ext>`` where the
    extension is ``padding`` for a ``PaddingChunk`` and ``unknown`` otherwise.
    """
    suffix = "padding" if isinstance(chunk, PaddingChunk) else "unknown"
    carve_path = extract_dir / f"{chunk.start_offset}-{chunk.end_offset}.{suffix}"
    logger.info("Extracting unknown chunk", path=carve_path, chunk=chunk)
    carve_chunk_to_file(carve_path, file, chunk)
    return carve_path

128 

129 

def carve_valid_chunk(extract_dir: Path, file: File, chunk: ValidChunk) -> Path:
    """Carve a valid chunk into ``extract_dir`` and return the carved file's path.

    The file is named ``<start_offset>-<end_offset>.<NAME>``, where ``NAME``
    is taken from the chunk's handler.
    """
    span = f"{chunk.start_offset}-{chunk.end_offset}"
    carve_path = extract_dir / f"{span}.{chunk.handler.NAME}"
    logger.info("Extracting valid chunk", path=carve_path, chunk=chunk)
    carve_chunk_to_file(carve_path, file, chunk)
    return carve_path