Coverage for /pythoncovmergedfiles/medio/medio/src/fuzz_server.py: 45%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

84 statements  

1###### Coverage stub 

2import atexit 

3import coverage 

4cov = coverage.coverage(data_file='.coverage', cover_pylib=True) 

5cov.start() 

6# Register an exist handler that will print coverage 

7def exit_handler(): 

8 cov.stop() 

9 cov.save() 

10atexit.register(exit_handler) 

11####### End of coverage stub 

12#!/usr/bin/python3 

13# Copyright 2022 Google LLC 

14# 

15# Licensed under the Apache License, Version 2.0 (the "License"); 

16# you may not use this file except in compliance with the License. 

17# You may obtain a copy of the License at 

18# 

19# http://www.apache.org/licenses/LICENSE-2.0 

20# 

21# Unless required by applicable law or agreed to in writing, software 

22# distributed under the License is distributed on an "AS IS" BASIS, 

23# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

24# See the License for the specific language governing permissions and 

25# limitations under the License. 

26"""Fuzz grpc server using the Greeter example""" 

27 

28import os 

29import sys 

30import time 

31import grpc 

32from google.protobuf import any_pb2 

33from google.rpc import status_pb2 

34from grpc_status import rpc_status 

35 

36import socket 

37import atheris 

38import threading 

39import argparse 

40from concurrent.futures import ThreadPoolExecutor 

41from google.protobuf.internal import builder as _builder 

42 

43# Extract path of fuzzer so we can include protobuf modules 

44if getattr(sys, 'frozen', False): 

45 app_path = os.path.dirname(sys.executable) 

46elif __file__: 

47 app_path = os.path.dirname(__file__) 

48else: 

49 raise Exception("Could not extract path needed to import loop.py") 

50sys.path.append(app_path) 

51 

52import helloworld_pb2 

53import helloworld_pb2_grpc 

54 

55runs_left = None 

56server = None 

57 

58# Simple server 

59class FuzzGreeter(helloworld_pb2_grpc.GreeterServicer): 

60 def SayHello(self, request, context): 

61 print("In server") 

62 return helloworld_pb2.HelloReply(message='Hello from fuzz server, %s!' % request.name) 

63 

64 

65def serve() -> None: 

66 """Starts fuzz server""" 

67 global server 

68 server = grpc.server(ThreadPoolExecutor(max_workers=1)) 

69 helloworld_pb2_grpc.add_GreeterServicer_to_server(FuzzGreeter(), server) 

70 server.add_insecure_port('[::]:50051') 

71 server.start() 

72 #server.wait_for_termination() 

73 return 

74 

75@atheris.instrument_func 

76def TestInput(input_bytes): 

77 """Send fuzzing input to the server""" 

78 global runs_left 

79 global server 

80 if runs_left != None: 

81 runs_left = runs_left - 1 

82 if runs_left <= 2: 

83 server.stop() 

84 return 

85 

86 time.sleep(0.02) 

87 try: 

88 with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: 

89 s.connect(("localhost", 50051)) 

90 s.sendall(input_bytes) 

91 data = s.recv(1024) 

92 except OSError: 

93 # We don't want to report network errors 

94 return 

95 

96 # Hit the rpc_status too 

97 fdp = atheris.FuzzedDataProvider(input_bytes) 

98 try: 

99 rich_status = status_pb2.Status( 

100 code=fdp.ConsumeIntInRange(1,30000), 

101 message=fdp.ConsumeUnicodeNoSurrogates(60) 

102 ) 

103 rpc_status.to_status(rich_status) 

104 except ValueError: 

105 pass 

106 

107 return 

108 

109 

110def get_run_count_if_there(): 

111 """Ensure proper exit for coverage builds""" 

112 parser = argparse.ArgumentParser() 

113 parser.add_argument("-atheris_runs", required=False, default=None) 

114 args, _ = parser.parse_known_args() 

115 if args.atheris_runs is None: 

116 print("None args") 

117 return None 

118 print(f"Got a fixed set of runs {args.atheris_runs}") 

119 return args.atheris_runs 

120 

121 

122def main(): 

123 global runs_left 

124 max_runs = get_run_count_if_there() 

125 if max_runs is not None: 

126 runs_left = int(max_runs) 

127 

128 # Launch a grpc server 

129 serve() 

130 

131 # Start fuzzing 

132 atheris.instrument_all() 

133 atheris.Setup(sys.argv, TestInput, enable_python_coverage=True) 

134 atheris.Fuzz() 

135 

136 

137if __name__ == "__main__": 

138 main()