Coverage for /pythoncovmergedfiles/medio/medio/src/fuzzing/projects/pycups/fuzzer/fuzz_ipp_io.py: 75%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

69 statements  

1###### Coverage stub 

2import atexit 

3import coverage 

cov = coverage.coverage(data_file='.coverage', cover_pylib=True)
cov.start()


# Register an exit handler that stops collection and saves the coverage data.
@atexit.register
def exit_handler():
    """Stop the module-level coverage collector and save what it recorded."""
    cov.stop()
    cov.save()

11####### End of coverage stub 

12#!/usr/bin/python3 

13import sys 

14import atheris 

15import cups 

16 

17# callback to provide fuzzed data for readIO 

class ReadCallback:
    """Callable that hands out a fixed byte buffer in sequential chunks.

    Every call returns the next ``size`` bytes of ``data`` (fewer when the
    buffer is nearly used up) and advances an internal cursor.  Once the
    cursor has reached the end of the buffer, or when a non-positive size
    is requested, ``b""`` is returned and the cursor is left untouched.
    """

    def __init__(self, data: bytes):
        self.data = data
        # Read cursor; it advances by the *requested* size, so it can end up
        # beyond len(data) after the final short chunk.
        self.offset = 0

    def __call__(self, size: int):
        start = self.offset
        exhausted = start >= len(self.data)
        if exhausted or size <= 0:
            return b""
        self.offset = start + size
        return self.data[start:self.offset]

29 

30# callback to receive data from writeIO 

class WriteCallback:
    """Callable sink that accumulates every buffer it is handed.

    Each call bumps ``call_count``.  A non-empty buffer is appended to
    ``collected`` and its length is returned; an empty (falsy) buffer is
    ignored and ``0`` is returned.
    """

    def __init__(self):
        self.collected = b""   # concatenation of all non-empty buffers seen
        self.call_count = 0    # number of invocations, including empty ones

    def __call__(self, buf: bytes):
        self.call_count += 1
        if buf:
            self.collected += buf
            return len(buf)
        return 0

42 

43# callback that raises errors 

class ErrorCallback:
    """Callable that raises ``RuntimeError`` on one chosen invocation.

    The call whose 1-based index equals ``error_on_call`` raises; every
    other call returns a placeholder result chosen from the arguments:

    * exactly one ``int`` argument  -> ``b""``
    * any other non-empty arguments -> ``len(args[0])``
    * no arguments                  -> ``0``
    """

    def __init__(self, error_on_call: int = 1):
        self.call_count = 0
        self.error_on_call = error_on_call

    def __call__(self, *args):
        self.call_count += 1
        if self.call_count == self.error_on_call:
            raise RuntimeError("Callback error")

        # A lone integer argument gets an empty bytes result.
        if len(args) == 1 and isinstance(args[0], int):
            return b""
        # Otherwise report the length of the first argument, if there is one.
        if args:
            return len(args[0])
        return 0

54 

def TestOneInput(data: bytes):
    """Run one fuzz iteration against ``cups.IPPRequest`` I/O callbacks.

    Inputs shorter than four bytes are ignored.  Otherwise the first
    consumed value selects one of four scenarios:

    * 0 - ``readIO`` fed by a ``ReadCallback`` over up to 1024 fuzzed bytes
    * 1 - ``writeIO`` draining into a ``WriteCallback``
    * 2 - ``readIO`` with an ``ErrorCallback`` failing on call 1..5
    * 3 - ``writeIO`` with an ``ErrorCallback`` failing on call 1..5

    The ``blocking`` flag of the I/O call is fuzzed as well.  Any
    ``Exception`` raised along the way is deliberately swallowed.
    """
    if len(data) < 4:
        return

    fdp = atheris.FuzzedDataProvider(data)

    try:
        req = cups.IPPRequest()  # create the IPP request object
        operation = fdp.ConsumeIntInRange(0, 3)

        # Decode the scenario: even values read, odd values write;
        # values 2 and 3 use the raising callback.
        reading = operation in (0, 2)
        faulty = operation >= 2

        if faulty:
            cb = ErrorCallback(fdp.ConsumeIntInRange(1, 5))
        elif reading:
            cb = ReadCallback(fdp.ConsumeBytes(1024))
        else:
            cb = WriteCallback()

        if reading:
            req.readIO(cb, blocking=fdp.ConsumeBool())
        else:
            req.writeIO(cb, blocking=fdp.ConsumeBool())

        # Only exercised when input is left over and the request object
        # actually exposes a setChunkSize attribute.
        if fdp.remaining_bytes() > 0 and hasattr(req, 'setChunkSize'):
            req.setChunkSize(fdp.ConsumeIntInRange(1, 8192))

    except Exception:
        # NOTE(review): presumably Python-level errors are uninteresting to
        # this harness and only crashes/hangs matter - confirm.
        pass

84 

def main():
    """Instrument the loaded code, register ``TestOneInput`` and start fuzzing.

    The process command line (``sys.argv``) is handed to ``atheris.Setup``
    together with ``enable_python_coverage=True``.
    """
    atheris.instrument_all()
    atheris.Setup(sys.argv, TestOneInput, enable_python_coverage=True)
    atheris.Fuzz()


# Only start the fuzzer when executed as a script, not on import.
if __name__ == "__main__":
    main()