Coverage for /pythoncovmergedfiles/medio/medio/src/aiohttp/fuzz_multipart.py: 54%

46 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-06-07 06:52 +0000

1###### Coverage stub 

2import atexit 

3import coverage 

# Begin coverage measurement here, ahead of the fuzz target's own imports.
cov = coverage.coverage(data_file='.coverage', cover_pylib=True)
cov.start()


# Register an exit handler that stops measurement and saves the coverage data.
@atexit.register
def exit_handler():
    """Stop coverage measurement and save the collected data at interpreter exit."""
    cov.stop()
    cov.save()

11####### End of coverage stub 

12#!/usr/bin/python3 

13# Copyright 2022 Google LLC 

14# 

15# Licensed under the Apache License, Version 2.0 (the "License"); 

16# you may not use this file except in compliance with the License. 

17# You may obtain a copy of the License at 

18# 

19# http://www.apache.org/licenses/LICENSE-2.0 

20# 

21# Unless required by applicable law or agreed to in writing, software 

22# distributed under the License is distributed on an "AS IS" BASIS, 

23# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

24# See the License for the specific language governing permissions and 

25# limitations under the License. 

26 

27import io 

28import sys 

29import atheris 

30 

31# aiohttp imports 

32import asyncio 

33with atheris.instrument_imports(): 

34 import aiohttp 

35 from aiohttp.hdrs import ( 

36 CONTENT_TYPE, 

37 ) 

38 

class FuzzStream:
    """In-memory stream over fuzzer-supplied bytes.

    Wraps the payload in an ``io.BytesIO`` and exposes ``read``, ``readline``,
    ``at_eof`` and ``unread_data`` on top of it.
    """

    def __init__(self, content):
        # Hold the payload in a buffer whose read position advances as it is consumed.
        self.content = io.BytesIO(content)

    async def read(self, size = None):
        """Return up to ``size`` bytes, or everything that is left when ``size`` is None."""
        chunk = self.content.read(size)
        return chunk

    def at_eof(self):
        """Return True when the read position is at the end of the buffered data."""
        total = self.content.getbuffer().nbytes
        position = self.content.tell()
        return position == total

    async def readline(self):
        """Return the next line from the buffer, including its trailing newline if any."""
        line = self.content.readline()
        return line

    def unread_data(self, data):
        """Put ``data`` back in front of whatever has not been read yet."""
        remainder = self.content.read()
        # Start a fresh buffer so the pushed-back bytes are the next ones returned.
        self.content = io.BytesIO(data + remainder)

54 

55 

@atheris.instrument_func
async def fuzz_bodypart_reader(data):
    """Build an ``aiohttp.BodyPartReader`` from fuzzer bytes and parse it as a form.

    The Content-Type header value comes from ``ConsumeUnicode(30)``; all
    remaining fuzzer bytes become the body stream. ``form()`` is awaited only
    when the reader does not already report end-of-file.
    """
    provider = atheris.FuzzedDataProvider(data)

    # Consumption order matters: header text first, then everything left as the body.
    headers = {CONTENT_TYPE: provider.ConsumeUnicode(30)}
    stream = FuzzStream(provider.ConsumeBytes(atheris.ALL_REMAINING))

    reader = aiohttp.BodyPartReader(b"--:", headers, stream, _newline=b'\n')
    if reader.at_eof():
        return
    await reader.form()

70 

@atheris.instrument_func
def TestOneInput(data):
    """Run one fuzzer input through ``fuzz_bodypart_reader``.

    An ``AssertionError`` raised while processing the input is swallowed;
    any other exception propagates to the caller.
    """
    try:
        asyncio.run(fuzz_bodypart_reader(data))
    except AssertionError:
        # Deliberately ignored: assertion failures are not reported for this target.
        pass

77 

def main():
    """Register ``TestOneInput`` with Atheris and start the fuzzing loop."""
    atheris.Setup(sys.argv, TestOneInput, enable_python_coverage=True)
    # No event loop is created here. TestOneInput drives every input through
    # asyncio.run(), which creates and closes its own loop, so a loop obtained
    # via asyncio.get_event_loop() would never be used or closed. Calling
    # get_event_loop() with no current loop is also deprecated and raises
    # RuntimeError on recent Python versions.
    atheris.Fuzz()


if __name__ == "__main__":
    main()