Coverage for /pythoncovmergedfiles/medio/medio/src/python-multipart/fuzz/fuzz_multipart_parser.py: 59%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

70 statements  

1###### Coverage stub 

2import atexit 

3import coverage 

4cov = coverage.coverage(data_file='.coverage', cover_pylib=True) 

5cov.start() 

6# Register an exist handler that will print coverage 

7def exit_handler(): 

8 cov.stop() 

9 cov.save() 

10atexit.register(exit_handler) 

11####### End of coverage stub 

12import logging 

13import sys 

14 

15logging.disable(logging.CRITICAL) 

16 

17import atheris 

18from helpers import EnhancedDataProvider 

19 

20with atheris.instrument_imports(): 

21 from python_multipart.exceptions import MultipartParseError 

22 from python_multipart.multipart import MultipartParser 

23 

24 

25def _noop() -> None: 

26 pass 

27 

28 

29def _noop_data(data: bytes, start: int, end: int) -> None: 

30 pass 

31 

32 

33def _make_parser(boundary: bytes, max_size: float = float("inf")) -> MultipartParser: 

34 return MultipartParser( 

35 boundary, 

36 callbacks={ 

37 "on_part_begin": _noop, 

38 "on_part_data": _noop_data, 

39 "on_part_end": _noop, 

40 "on_header_begin": _noop, 

41 "on_header_field": _noop_data, 

42 "on_header_value": _noop_data, 

43 "on_header_end": _noop, 

44 "on_headers_finished": _noop, 

45 "on_end": _noop, 

46 }, 

47 max_size=max_size, 

48 ) 

49 

50 

51def fuzz_single_write(fdp: EnhancedDataProvider) -> None: 

52 boundary_len = fdp.ConsumeIntInRange(1, max(1, min(70, fdp.remaining_bytes() // 2))) 

53 boundary = fdp.ConsumeBytes(boundary_len) 

54 # Drop CR/LF to avoid ValueError from MultipartParser boundary validation. 

55 boundary = boundary.replace(b"\r", b"-").replace(b"\n", b"-").rstrip(b" \t") or b"B" 

56 

57 parser = _make_parser(boundary) 

58 parser.write(fdp.ConsumeRandomBytes()) 

59 parser.finalize() 

60 

61 

62def fuzz_chunked_write(fdp: EnhancedDataProvider) -> None: 

63 boundary_len = fdp.ConsumeIntInRange(1, max(1, min(70, fdp.remaining_bytes() // 3))) 

64 boundary = fdp.ConsumeBytes(boundary_len) 

65 boundary = boundary.replace(b"\r", b"-").replace(b"\n", b"-").rstrip(b" \t") or b"B" 

66 

67 num_chunks = fdp.ConsumeIntInRange(1, 16) 

68 parser = _make_parser(boundary) 

69 body = fdp.ConsumeRandomBytes() 

70 if body: 

71 chunk_size = max(1, (len(body) + num_chunks - 1) // num_chunks) 

72 for i in range(0, len(body), chunk_size): 

73 parser.write(body[i : i + chunk_size]) 

74 parser.finalize() 

75 

76 

77def fuzz_max_size(fdp: EnhancedDataProvider) -> None: 

78 boundary_len = fdp.ConsumeIntInRange(1, max(1, min(70, fdp.remaining_bytes() // 2))) 

79 boundary = fdp.ConsumeBytes(boundary_len) 

80 boundary = boundary.replace(b"\r", b"-").replace(b"\n", b"-").rstrip(b" \t") or b"B" 

81 

82 max_size = fdp.ConsumeIntInRange(1, 2048) 

83 parser = _make_parser(boundary, max_size=max_size) 

84 parser.write(fdp.ConsumeRandomBytes()) 

85 parser.finalize() 

86 

87 

88def fuzz_invalid_boundary_constructor(fdp: EnhancedDataProvider) -> None: 

89 boundary_len = fdp.ConsumeIntInRange(0, min(70, fdp.remaining_bytes())) 

90 boundary = fdp.ConsumeBytes(boundary_len) 

91 try: 

92 _make_parser(boundary) 

93 except ValueError: 

94 return 

95 

96 

97def TestOneInput(data: bytes) -> None: 

98 fdp = EnhancedDataProvider(data) 

99 targets = [fuzz_single_write, fuzz_chunked_write, fuzz_max_size, fuzz_invalid_boundary_constructor] 

100 target = fdp.PickValueInList(targets) 

101 

102 try: 

103 target(fdp) 

104 except MultipartParseError: 

105 return 

106 

107 

108def main(): 

109 atheris.Setup(sys.argv, TestOneInput) 

110 atheris.Fuzz() 

111 

112 

113if __name__ == "__main__": 

114 main()