Coverage for /pythoncovmergedfiles/medio/medio/src/fuzz_h11.py: 62%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1###### Coverage stub
2import atexit
3import coverage
4cov = coverage.coverage(data_file='.coverage', cover_pylib=True)
5cov.start()
6# Register an exist handler that will print coverage
7def exit_handler():
8 cov.stop()
9 cov.save()
10atexit.register(exit_handler)
11####### End of coverage stub
12#!/usr/bin/python3
13# Copyright 2023 Google LLC
14#
15# Licensed under the Apache License, Version 2.0 (the "License");
16# you may not use this file except in compliance with the License.
17# You may obtain a copy of the License at
18#
19# http://www.apache.org/licenses/LICENSE-2.0
20#
21# Unless required by applicable law or agreed to in writing, software
22# distributed under the License is distributed on an "AS IS" BASIS,
23# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
24# See the License for the specific language governing permissions and
25# limitations under the License.
26import sys
27import atheris
29import h11
32def fuzz_headers(data):
33 fdp = atheris.FuzzedDataProvider(data)
34 fuzz_headers = [(fdp.ConsumeBytes(32), fdp.ConsumeBytes(1024)),
35 (fdp.ConsumeBytes(32), fdp.ConsumeBytes(1024))]
36 try:
37 normalized_headers = h11._headers.normalize_and_validate(fuzz_headers)
38 get_comma_header(normalized_headers, b'connection')
39 set_comma_header(normalized_headers, fdp.ConsumeBytes(64))
40 except (h11._util.ProtocolError):
41 pass
43 try:
44 h11._headers.has_expect_100_continue(
45 h11._events.Request(method='GET',
46 target='/',
47 headers=fuzz_headers,
48 http_version='1.0'))
49 except (h11._util.ProtocolError):
50 pass
53def fuzz_receivebuffer(data):
54 fdp = atheris.FuzzedDataProvider(data)
55 rec_buf = h11._receivebuffer.ReceiveBuffer()
56 for i in range(5):
57 rec_buf += fdp.ConsumeBytes(124)
58 rec_buf.maybe_extract_at_most(fdp.ConsumeIntInRange(1, 100))
59 rec_buf.maybe_extract_next_line()
60 rec_buf.maybe_extract_lines()
63def fuzz_connection(data):
64 fdp = atheris.FuzzedDataProvider(data)
65 conn = h11.Connection(our_role=h11.CLIENT)
66 event = h11.Request(
67 method="GET",
68 target="/get",
69 headers=[("Host", "127.0.0.1"), ("Connection", "close")],
70 )
72 conn.send(event)
73 conn.send(h11.EndOfMessage())
74 conn.next_event()
75 conn.receive_data(fdp.ConsumeBytes(1024))
77 try:
78 event2 = h11.Request(
79 method=fdp.ConsumeUnicodeNoSurrogates(5),
80 target=fdp.ConsumeUnicodeNoSurrogates(124),
81 headers=[(fdp.ConsumeUnicodeNoSurrogates(5),
82 fdp.ConsumeUnicodeNoSurrogates(124)),
83 (fdp.ConsumeUnicodeNoSurrogates(124),
84 fdp.ConsumeUnicodeNoSurrogates(124))],
85 )
87 conn.send(event2)
88 conn.send(h11.EndOfMessage())
89 conn.next_event()
90 conn.receive_data(fdp.ConsumeBytes(1024))
91 except (h11._util.ProtocolError):
92 pass
95def TestOneInput(data):
96 fuzz_headers(data)
97 fuzz_receivebuffer(data)
98 fuzz_connection(data)
101def main():
102 atheris.instrument_all()
103 atheris.Setup(sys.argv, TestOneInput)
104 atheris.Fuzz()
107if __name__ == "__main__":
108 main()