Coverage for /pythoncovmergedfiles/medio/medio/src/fuzz_h11.py: 62%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

58 statements  

###### Coverage stub
import atexit
import coverage

# Start collecting coverage as soon as this module is loaded; the data is
# written to '.coverage' by the exit hook below.
cov = coverage.coverage(data_file='.coverage', cover_pylib=True)
cov.start()


@atexit.register
def exit_handler():
    """Stop coverage collection and save the data when the interpreter exits."""
    cov.stop()
    cov.save()
####### End of coverage stub

12#!/usr/bin/python3 

13# Copyright 2023 Google LLC 

14# 

15# Licensed under the Apache License, Version 2.0 (the "License"); 

16# you may not use this file except in compliance with the License. 

17# You may obtain a copy of the License at 

18# 

19# http://www.apache.org/licenses/LICENSE-2.0 

20# 

21# Unless required by applicable law or agreed to in writing, software 

22# distributed under the License is distributed on an "AS IS" BASIS, 

23# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

24# See the License for the specific language governing permissions and 

25# limitations under the License. 

26import sys 

27import atheris 

28 

29import h11 

30 

31 

def fuzz_headers(data):
    """Fuzz h11's header normalization and comma-header helpers.

    Two (name, value) header pairs are carved out of ``data``. They are first
    passed through ``normalize_and_validate`` and the comma-header helpers,
    and then used to build a ``Request`` event that is handed to
    ``has_expect_100_continue``. ``h11._util.ProtocolError`` is ignored in
    both stages; any other exception propagates to the fuzzing engine.

    Args:
        data: Raw bytes supplied by the fuzzing engine.
    """
    fdp = atheris.FuzzedDataProvider(data)
    # Named raw_headers so the local no longer shadows this function's name.
    raw_headers = [
        (fdp.ConsumeBytes(32), fdp.ConsumeBytes(1024)),
        (fdp.ConsumeBytes(32), fdp.ConsumeBytes(1024)),
    ]

    try:
        normalized_headers = h11._headers.normalize_and_validate(raw_headers)
        # The helpers must be reached through h11._headers: no bare
        # get_comma_header / set_comma_header name is defined in this file,
        # so the unqualified calls raised NameError once validation passed.
        h11._headers.get_comma_header(normalized_headers, b'connection')
        # set_comma_header takes (headers, name, new_values); the fuzzed
        # bytes are supplied as the single new value for the same header.
        # NOTE(review): header name chosen to mirror the lookup above —
        # confirm this matches the intended target.
        h11._headers.set_comma_header(
            normalized_headers, b'connection', [fdp.ConsumeBytes(64)]
        )
    except h11._util.ProtocolError:
        pass

    try:
        request = h11._events.Request(
            method='GET',
            target='/',
            headers=raw_headers,
            http_version='1.0',
        )
        h11._headers.has_expect_100_continue(request)
    except h11._util.ProtocolError:
        pass

51 

52 

def fuzz_receivebuffer(data):
    """Fill an h11 ReceiveBuffer with fuzzed bytes and run its extractors.

    Args:
        data: Raw bytes supplied by the fuzzing engine.
    """
    provider = atheris.FuzzedDataProvider(data)
    buffer = h11._receivebuffer.ReceiveBuffer()

    # NOTE(review): the original indentation was lost in this dump; this
    # assumes only the append belongs to the loop body — confirm.
    for _ in range(5):
        buffer += provider.ConsumeBytes(124)

    max_bytes = provider.ConsumeIntInRange(1, 100)
    buffer.maybe_extract_at_most(max_bytes)
    buffer.maybe_extract_next_line()
    buffer.maybe_extract_lines()

61 

62 

def fuzz_connection(data):
    """Drive a client-role h11 connection with fuzzer-derived input.

    A fixed GET request is sent first and fuzzed bytes are fed to the
    connection. A second request assembled from fuzzed strings is then
    attempted; ``h11._util.ProtocolError`` raised during that second stage
    is ignored.

    Args:
        data: Raw bytes supplied by the fuzzing engine.
    """
    provider = atheris.FuzzedDataProvider(data)
    client = h11.Connection(our_role=h11.CLIENT)

    first_request = h11.Request(
        method="GET",
        target="/get",
        headers=[("Host", "127.0.0.1"), ("Connection", "close")],
    )
    client.send(first_request)
    client.send(h11.EndOfMessage())
    client.next_event()
    # NOTE(review): next_event() runs before receive_data(), so these bytes
    # are presumably buffered but never parsed — confirm whether a
    # next_event() call afterwards was intended.
    client.receive_data(provider.ConsumeBytes(1024))

    try:
        # Fields are consumed in the same order as the original call
        # evaluated them: method, target, then the two header pairs.
        method = provider.ConsumeUnicodeNoSurrogates(5)
        target = provider.ConsumeUnicodeNoSurrogates(124)
        first_header = (
            provider.ConsumeUnicodeNoSurrogates(5),
            provider.ConsumeUnicodeNoSurrogates(124),
        )
        second_header = (
            provider.ConsumeUnicodeNoSurrogates(124),
            provider.ConsumeUnicodeNoSurrogates(124),
        )
        fuzzed_request = h11.Request(
            method=method,
            target=target,
            headers=[first_header, second_header],
        )

        client.send(fuzzed_request)
        client.send(h11.EndOfMessage())
        client.next_event()
        client.receive_data(provider.ConsumeBytes(1024))
    except h11._util.ProtocolError:
        pass

93 

94 

def TestOneInput(data):
    """Run every fuzz target in this file against the same input bytes.

    Args:
        data: Raw bytes supplied by the fuzzing engine.
    """
    # Order matters only in that it matches the original call sequence.
    for target in (fuzz_headers, fuzz_receivebuffer, fuzz_connection):
        target(data)

99 

100 

def main():
    """Instrument code via Atheris, register TestOneInput, and start fuzzing."""
    atheris.instrument_all()
    argv = sys.argv
    atheris.Setup(argv, TestOneInput)
    atheris.Fuzz()


if __name__ == "__main__":
    main()