Coverage for /pythoncovmergedfiles/medio/medio/src/fuzz_http.py: 57%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1###### Coverage stub
2import atexit
3import coverage
4cov = coverage.coverage(data_file='.coverage', cover_pylib=True)
5cov.start()
6# Register an exist handler that will print coverage
7def exit_handler():
8 cov.stop()
9 cov.save()
10atexit.register(exit_handler)
11####### End of coverage stub
12#!/usr/bin/python3
13# Copyright 2022 Google LLC
14#
15# Licensed under the Apache License, Version 2.0 (the "License");
16# you may not use this file except in compliance with the License.
17# You may obtain a copy of the License at
18#
19# http://www.apache.org/licenses/LICENSE-2.0
20#
21# Unless required by applicable law or agreed to in writing, software
22# distributed under the License is distributed on an "AS IS" BASIS,
23# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
24# See the License for the specific language governing permissions and
25# limitations under the License.
27import os
28import sys
29import atheris
31from websocket._exceptions import WebSocketException
32from websocket._http import read_headers, _tunnel
35class FuzzMock:
36 """FuzzMock class inspired by SockMock"""
37 def __init__(self, packet):
38 self.data = [packet]
39 self.sent = []
41 def gettimeout(self):
42 return None
44 def recv(self, bufsize):
45 if self.data:
46 e = self.data.pop(0)
47 if isinstance(e, Exception):
48 raise e
49 if len(e) > bufsize:
50 self.data.insert(0, e[bufsize:])
51 return e[:bufsize]
53 def send(self, data):
54 self.sent.append(data)
55 return len(data)
58def TestOneInput(data):
59 try:
60 fdp = atheris.FuzzedDataProvider(data)
61 # Create mock socket with fuzzer-derived data.
62 mock_sock = FuzzMock(fdp.ConsumeBytes(512))
63 _tunnel(
64 mock_sock,
65 fdp.ConsumeUnicodeNoSurrogates(200),
66 fdp.ConsumeIntInRange(1, 1000),
67 (
68 fdp.ConsumeUnicodeNoSurrogates(100),
69 fdp.ConsumeUnicodeNoSurrogates(100)
70 )
71 )
72 except WebSocketException:
73 pass
74 except UnicodeDecodeError:
75 pass
78def main():
79 atheris.instrument_all()
80 atheris.Setup(sys.argv, TestOneInput, enable_python_coverage=True)
81 atheris.Fuzz()
84if __name__ == "__main__":
85 main()