Coverage for /pythoncovmergedfiles/medio/medio/src/enhanced_fdp.py: 54%

41 statements  

« prev     ^ index     » next       coverage.py v7.3.1, created at 2023-09-25 06:15 +0000

1# Copyright 2021 Google LLC 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14# 

15################################################################################ 

16""" 

17Defines the EnhancedFuzzedDataProvider 

18""" 

19from contextlib import contextmanager 

20from enum import Enum 

21from io import BytesIO, StringIO 

22from tempfile import NamedTemporaryFile 

23from typing import Optional, Union 

24 

25from atheris import FuzzedDataProvider 

26 

27 

28class EnhancedFuzzedDataProvider(FuzzedDataProvider): 

29 """ 

30 Extends the functionality of FuzzedDataProvider 

31 """ 

32 

33 def _consume_random_count(self) -> int: 

34 """ 

35 :return: A count of bytes that is strictly in range 0<=n<=remaining_bytes 

36 """ 

37 return self.ConsumeIntInRange(0, self.remaining_bytes()) 

38 

39 def _consume_file_data(self, all_data: bool, as_bytes: bool) -> Union[bytes, str]: 

40 """ 

41 Consumes data for a file 

42 :param all_data: Whether to consume all remaining bytes from the buffer 

43 :param as_bytes: Consumed output is bytes if true, otherwise a string 

44 :return: The consumed output 

45 """ 

46 if all_data: 

47 file_data = self.ConsumeRemainingBytes() if as_bytes else self.ConsumeRemainingString() 

48 else: 

49 file_data = self.ConsumeRandomBytes() if as_bytes else self.ConsumeRandomString() 

50 

51 return file_data 

52 

53 def ConsumeRandomBytes(self) -> bytes: 

54 """ 

55 Consume a 'random' count of the remaining bytes 

56 :return: 0<=n<=remaining_bytes bytes 

57 """ 

58 return self.ConsumeBytes(self._consume_random_count()) 

59 

60 def ConsumeRemainingBytes(self) -> bytes: 

61 """ 

62 :return: The remaining buffer 

63 """ 

64 return self.ConsumeBytes(self.remaining_bytes()) 

65 

66 def ConsumeRandomString(self) -> str: 

67 """ 

68 Consume a 'random' length string, excluding surrogates 

69 :return: The string 

70 """ 

71 return self.ConsumeUnicodeNoSurrogates(self._consume_random_count()) 

72 

73 def ConsumeRemainingString(self) -> str: 

74 """ 

75 :return: The remaining buffer, as a string without surrogates 

76 """ 

77 return self.ConsumeUnicodeNoSurrogates(self.remaining_bytes()) 

78 

79 def PickValueInEnum(self, enum): 

80 return self.PickValueInList([e.value for e in enum]) 

81 

82 @contextmanager 

83 def ConsumeMemoryFile(self, all_data: bool, as_bytes: bool) -> Union[BytesIO, StringIO]: 

84 """ 

85 Consumes a file-like object, that resides entirely in memory 

86 :param all_data: Whether to populate the file with all remaining data or not 

87 :param as_bytes: Whether the file should hold bytes or strings 

88 :return: The in-memory file 

89 """ 

90 file_data = self._consume_file_data(all_data, as_bytes) 

91 file = BytesIO(file_data) if as_bytes else StringIO(file_data) 

92 yield file 

93 file.close() 

94 

95 @contextmanager 

96 def ConsumeTemporaryFile(self, all_data: bool, as_bytes: bool, suffix: Optional[str] = None) -> str: 

97 """ 

98 Consumes a temporary file, handling its deletion 

99 :param all_data: Whether to populate the file with all remaining data or not 

100 :param as_bytes: Whether the file should hold bytes or strings 

101 :param suffix: A suffix to use for the generated file, e.g. 'txt' 

102 :return: The path to the temporary file 

103 """ 

104 file_data = self._consume_file_data(all_data, as_bytes) 

105 mode = 'w+b' if as_bytes else 'w+' 

106 tfile = NamedTemporaryFile(mode=mode, suffix=suffix) 

107 tfile.write(file_data) 

108 tfile.seek(0) 

109 tfile.flush() 

110 yield tfile.name 

111 tfile.close()