Coverage for /pythoncovmergedfiles/medio/medio/src/pikepdf/fuzzing/fuzz_helpers.py: 51%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

55 statements  

1# SPDX-FileCopyrightText: 2024 ennamarie19 

2# SPDX-License-Identifier: MIT 

3from __future__ import annotations 

4 

5import contextlib 

6import datetime 

7import io 

8import tempfile 

9from collections.abc import Generator 

10from typing import TypeVar 

11 

12import atheris 

13 

14T = TypeVar("T") 

15 

16 

class EnhancedFuzzedDataProvider(atheris.FuzzedDataProvider):
    """A ``FuzzedDataProvider`` with extra convenience consumers.

    Adds helpers that draw variable-length byte/text payloads, sub-lists,
    dates, and file-like inputs (in memory or on disk) from the fuzz input.
    """

    def ConsumeRandomBytes(self) -> bytes:
        """Consume a length in ``[0, remaining_bytes()]``, then that many bytes."""
        return self.ConsumeBytes(self.ConsumeIntInRange(0, self.remaining_bytes()))

    def ConsumeRandomString(self) -> str:
        """Consume a length in ``[0, remaining_bytes()]``, then a string of it.

        The string is produced by ``ConsumeUnicodeNoSurrogates``.
        """
        return self.ConsumeUnicodeNoSurrogates(
            self.ConsumeIntInRange(0, self.remaining_bytes())
        )

    def ConsumeRemainingString(self) -> str:
        """Consume all remaining input via ``ConsumeUnicodeNoSurrogates``."""
        return self.ConsumeUnicodeNoSurrogates(self.remaining_bytes())

    def ConsumeRemainingBytes(self) -> bytes:
        """Consume all remaining input as bytes."""
        return self.ConsumeBytes(self.remaining_bytes())

    def ConsumeSublist(self, source: list[T]) -> list[T]:
        """Return a shuffled sub-list of ``source`` with at least one element.

        One ``ConsumeBool()`` is drawn per element of ``source`` to decide
        whether it is kept. The kept elements are then shuffled using
        fuzzer-chosen indices. If no element was kept, a single value picked
        with ``PickValueInList(source)`` is returned instead.
        """
        chosen = [elem for elem in source if self.ConsumeBool()]

        # Fisher-Yates shuffle. ``i`` must run down to 1 inclusive so that
        # positions 0 and 1 can also be exchanged; stopping earlier would
        # leave a two-element selection permanently in source order.
        for i in range(len(chosen) - 1, 0, -1):
            j = self.ConsumeIntInRange(0, i)
            chosen[i], chosen[j] = chosen[j], chosen[i]

        return chosen or [self.PickValueInList(source)]

    def ConsumeDate(self) -> datetime.datetime:
        """Build a naive ``datetime`` from a consumed float timestamp.

        Falls back to ``datetime(1970, 1, 1)`` when ``fromtimestamp`` rejects
        the value with ``OverflowError``, ``OSError`` or ``ValueError``.
        """
        try:
            return datetime.datetime.fromtimestamp(self.ConsumeFloat())
        except (OverflowError, OSError, ValueError):
            return datetime.datetime(year=1970, month=1, day=1)

    def _consume_file_data(self, all_data: bool, as_bytes: bool) -> bytes | str:
        """Draw the payload shared by the file-producing helpers.

        ``all_data`` selects the ``ConsumeRemaining*`` consumers over the
        ``ConsumeRandom*`` ones; ``as_bytes`` selects bytes over text.
        """
        if all_data:
            if as_bytes:
                return self.ConsumeRemainingBytes()
            return self.ConsumeRemainingString()
        if as_bytes:
            return self.ConsumeRandomBytes()
        return self.ConsumeRandomString()

    @contextlib.contextmanager
    def ConsumeMemoryFile(
        self, all_data: bool = False, as_bytes: bool = True
    ) -> Generator[io.IOBase, None, None]:
        """Yield an in-memory file filled with fuzz data.

        Args:
            all_data: Use all remaining input instead of a random-length slice.
            as_bytes: Produce an ``io.BytesIO``; otherwise an ``io.StringIO``.

        The file is closed when the ``with`` block exits, including when the
        block raises.
        """
        file_data = self._consume_file_data(all_data, as_bytes)

        file: io.StringIO | io.BytesIO
        if isinstance(file_data, bytes):
            file = io.BytesIO(file_data)
        else:
            # str() is a no-op for text and a defensive coercion otherwise.
            file = io.StringIO(str(file_data))

        try:
            yield file
        finally:
            # Runs even if the caller's block raised, which is the common
            # case in a fuzz harness.
            file.close()

    @contextlib.contextmanager
    def ConsumeTemporaryFile(
        self, suffix: str, all_data: bool = False, as_bytes: bool = True
    ) -> Generator[str, None, None]:
        """Yield the path of a named temporary file filled with fuzz data.

        Args:
            suffix: File name suffix passed to ``NamedTemporaryFile``.
            all_data: Use all remaining input instead of a random-length slice.
            as_bytes: Write bytes (mode ``"w+b"``); otherwise text (``"w+"``).

        The temporary file is closed when the ``with`` block exits, including
        when the block raises.
        """
        file_data = self._consume_file_data(all_data, as_bytes)

        mode = "w+b" if as_bytes else "w+"
        # The ``with`` statement guarantees the file is closed even when the
        # caller's block raises.
        with tempfile.NamedTemporaryFile(mode=mode, suffix=suffix) as tfile:
            tfile.write(file_data)
            # Flush so the data is on disk before the caller opens the path.
            tfile.flush()
            tfile.seek(0)

            yield tfile.name

98 tfile.close()