Coverage for /pythoncovmergedfiles/medio/medio/src/fuzz_client.py: 50%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

44 statements  

1###### Coverage stub 

2import atexit 

3import coverage 

4cov = coverage.coverage(data_file='.coverage', cover_pylib=True) 

5cov.start() 

6# Register an exist handler that will print coverage 

7def exit_handler(): 

8 cov.stop() 

9 cov.save() 

10atexit.register(exit_handler) 

11####### End of coverage stub 

12#!/usr/bin/python3 

13# Copyright 2022 Google LLC 

14# 

15# Licensed under the Apache License, Version 2.0 (the "License"); 

16# you may not use this file except in compliance with the License. 

17# You may obtain a copy of the License at 

18# 

19# http://www.apache.org/licenses/LICENSE-2.0 

20# 

21# Unless required by applicable law or agreed to in writing, software 

22# distributed under the License is distributed on an "AS IS" BASIS, 

23# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

24# See the License for the specific language governing permissions and 

25# limitations under the License. 

26 

27import os 

28import sys 

29import atheris 

30import mock 

31 

32from google.auth import credentials 

33from google.cloud.secretmanager_v1beta1.services.secret_manager_service import ( 

34 SecretManagerServiceAsyncClient, 

35 SecretManagerServiceClient, 

36 pagers, 

37 transports, 

38) 

39from google.cloud.secretmanager_v1beta1.types import resources, service 

40 

41def test_add_secret_version(client, fdp): 

42 """Calls add_secret_version on the client with data from the fuzzer""" 

43 

44 # Create the input. 

45 if fdp.ConsumeBool(): 

46 request = service.AddSecretVersionRequest() 

47 try: 

48 request.parent = fdp.ConsumeUnicodeNoSurrogates(20) 

49 except: 

50 request.parent = None 

51 else: 

52 request = {} 

53 

54 parent = None 

55 payload = None 

56 if fdp.ConsumeBool(): 

57 parent = fdp.ConsumeUnicodeNoSurrogates(10) 

58 request = None 

59 payload = resources.SecretPayload() 

60 payload.data = fdp.ConsumeBytes(10) 

61 

62 

63 # Mock call within the gRPC stub and fake the request. 

64 with mock.patch.object( 

65 type(client.transport.add_secret_version), "__call__" 

66 ) as call: 

67 # Create return value for the call. 

68 call.return_value = resources.SecretVersion( 

69 name="name_value", 

70 state=resources.SecretVersion.State.ENABLED, 

71 ) 

72 

73 response = client.add_secret_version( 

74 request = request, 

75 parent = parent, 

76 payload = payload 

77 ) 

78 

79 

80def TestOneInput(data): 

81 fdp = atheris.FuzzedDataProvider(data) 

82 

83 # Create a client we can call. 

84 client = SecretManagerServiceClient( 

85 credentials=credentials.AnonymousCredentials(), 

86 transport="grpc", 

87 ) 

88 

89 test_add_secret_version(client, fdp) 

90 

91 

92def main(): 

93 atheris.instrument_all() 

94 atheris.Setup(sys.argv, TestOneInput, enable_python_coverage=True) 

95 atheris.Fuzz() 

96 

97if __name__ == "__main__": 

98 main()