Coverage for /pythoncovmergedfiles/medio/medio/src/dateparser/fuzzing/fuzz_helpers.py: 60%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

47 statements  

1import contextlib 

2import datetime 

3import io 

4import tempfile 

5from typing import List, TypeVar 

6 

7import atheris 

8 

9T = TypeVar("T") 

10 

11 

12class EnhancedFuzzedDataProvider(atheris.FuzzedDataProvider): 

13 def ConsumeRandomBytes(self) -> bytes: 

14 return self.ConsumeBytes(self.ConsumeIntInRange(0, self.remaining_bytes())) 

15 

16 def ConsumeRandomString(self) -> str: 

17 return self.ConsumeUnicodeNoSurrogates( 

18 self.ConsumeIntInRange(0, self.remaining_bytes()) 

19 ) 

20 

21 def ConsumeRemainingString(self) -> str: 

22 return self.ConsumeUnicodeNoSurrogates(self.remaining_bytes()) 

23 

24 def ConsumeRemainingBytes(self) -> bytes: 

25 return self.ConsumeBytes(self.remaining_bytes()) 

26 

27 def ConsumeSublist(self, source: List[T]) -> List[T]: 

28 """ 

29 Returns a shuffled sub-list of the given list of len [1, len(source)] 

30 """ 

31 chosen = [elem for elem in source if self.ConsumeBool()] 

32 

33 # Shuffle 

34 for i in range(len(chosen) - 1, 1, -1): 

35 j = self.ConsumeIntInRange(0, i) 

36 chosen[i], chosen[j] = chosen[j], chosen[i] 

37 

38 return chosen or [self.PickValueInList(source)] 

39 

40 def ConsumeDate(self) -> datetime.datetime: 

41 try: 

42 return datetime.datetime.fromtimestamp(self.ConsumeFloat()) 

43 except (OverflowError, OSError, ValueError): 

44 return datetime.datetime(year=1970, month=1, day=1) 

45 

46 @contextlib.contextmanager 

47 def ConsumeMemoryFile( 

48 self, all_data: bool = False, as_bytes: bool = True 

49 ) -> io.BytesIO: 

50 if all_data: 

51 file_data = ( 

52 self.ConsumeRemainingBytes() 

53 if as_bytes 

54 else self.ConsumeRemainingString() 

55 ) 

56 else: 

57 file_data = ( 

58 self.ConsumeRandomBytes() if as_bytes else self.ConsumeRandomString() 

59 ) 

60 

61 file = io.BytesIO(file_data) if as_bytes else io.StringIO(file_data) 

62 yield file 

63 file.close() 

64 

65 @contextlib.contextmanager 

66 def ConsumeTemporaryFile( 

67 self, suffix: str, all_data: bool = False, as_bytes: bool = True 

68 ) -> str: 

69 if all_data: 

70 file_data = ( 

71 self.ConsumeRemainingBytes() 

72 if as_bytes 

73 else self.ConsumeRemainingString() 

74 ) 

75 else: 

76 file_data = ( 

77 self.ConsumeRandomBytes() if as_bytes else self.ConsumeRandomString() 

78 ) 

79 

80 mode = "w+b" if as_bytes else "w+" 

81 tfile = tempfile.NamedTemporaryFile(mode=mode, suffix=suffix) 

82 tfile.write(file_data) 

83 tfile.seek(0) 

84 tfile.flush() 

85 yield tfile.name 

86 tfile.close()