Coverage for /pythoncovmergedfiles/medio/medio/src/python-multipart/fuzz/fuzz_multipart_parser.py: 59%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1###### Coverage stub
2import atexit
3import coverage
4cov = coverage.coverage(data_file='.coverage', cover_pylib=True)
5cov.start()
6# Register an exist handler that will print coverage
7def exit_handler():
8 cov.stop()
9 cov.save()
10atexit.register(exit_handler)
11####### End of coverage stub
12import logging
13import sys
15logging.disable(logging.CRITICAL)
17import atheris
18from helpers import EnhancedDataProvider
20with atheris.instrument_imports():
21 from python_multipart.exceptions import MultipartParseError
22 from python_multipart.multipart import MultipartParser
25def _noop() -> None:
26 pass
29def _noop_data(data: bytes, start: int, end: int) -> None:
30 pass
33def _make_parser(boundary: bytes, max_size: float = float("inf")) -> MultipartParser:
34 return MultipartParser(
35 boundary,
36 callbacks={
37 "on_part_begin": _noop,
38 "on_part_data": _noop_data,
39 "on_part_end": _noop,
40 "on_header_begin": _noop,
41 "on_header_field": _noop_data,
42 "on_header_value": _noop_data,
43 "on_header_end": _noop,
44 "on_headers_finished": _noop,
45 "on_end": _noop,
46 },
47 max_size=max_size,
48 )
51def fuzz_single_write(fdp: EnhancedDataProvider) -> None:
52 boundary_len = fdp.ConsumeIntInRange(1, max(1, min(70, fdp.remaining_bytes() // 2)))
53 boundary = fdp.ConsumeBytes(boundary_len)
54 # Drop CR/LF to avoid ValueError from MultipartParser boundary validation.
55 boundary = boundary.replace(b"\r", b"-").replace(b"\n", b"-").rstrip(b" \t") or b"B"
57 parser = _make_parser(boundary)
58 parser.write(fdp.ConsumeRandomBytes())
59 parser.finalize()
62def fuzz_chunked_write(fdp: EnhancedDataProvider) -> None:
63 boundary_len = fdp.ConsumeIntInRange(1, max(1, min(70, fdp.remaining_bytes() // 3)))
64 boundary = fdp.ConsumeBytes(boundary_len)
65 boundary = boundary.replace(b"\r", b"-").replace(b"\n", b"-").rstrip(b" \t") or b"B"
67 num_chunks = fdp.ConsumeIntInRange(1, 16)
68 parser = _make_parser(boundary)
69 body = fdp.ConsumeRandomBytes()
70 if body:
71 chunk_size = max(1, (len(body) + num_chunks - 1) // num_chunks)
72 for i in range(0, len(body), chunk_size):
73 parser.write(body[i : i + chunk_size])
74 parser.finalize()
77def fuzz_max_size(fdp: EnhancedDataProvider) -> None:
78 boundary_len = fdp.ConsumeIntInRange(1, max(1, min(70, fdp.remaining_bytes() // 2)))
79 boundary = fdp.ConsumeBytes(boundary_len)
80 boundary = boundary.replace(b"\r", b"-").replace(b"\n", b"-").rstrip(b" \t") or b"B"
82 max_size = fdp.ConsumeIntInRange(1, 2048)
83 parser = _make_parser(boundary, max_size=max_size)
84 parser.write(fdp.ConsumeRandomBytes())
85 parser.finalize()
88def fuzz_invalid_boundary_constructor(fdp: EnhancedDataProvider) -> None:
89 boundary_len = fdp.ConsumeIntInRange(0, min(70, fdp.remaining_bytes()))
90 boundary = fdp.ConsumeBytes(boundary_len)
91 try:
92 _make_parser(boundary)
93 except ValueError:
94 return
97def TestOneInput(data: bytes) -> None:
98 fdp = EnhancedDataProvider(data)
99 targets = [fuzz_single_write, fuzz_chunked_write, fuzz_max_size, fuzz_invalid_boundary_constructor]
100 target = fdp.PickValueInList(targets)
102 try:
103 target(fdp)
104 except MultipartParseError:
105 return
108def main():
109 atheris.Setup(sys.argv, TestOneInput)
110 atheris.Fuzz()
113if __name__ == "__main__":
114 main()