Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/google/api_core/rest_streaming.py: 21%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

72 statements  

1# Copyright 2021 Google LLC 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15"""Helpers for server-side streaming in REST.""" 

16 

17from collections import deque 

18import string 

19from typing import Deque, Union 

20 

21import proto 

22import requests 

23import google.protobuf.message 

24from google.protobuf.json_format import Parse 

25 

26 

class ResponseIterator:
    """Iterator over REST API responses.

    The response body is scanned incrementally as one JSON array of objects.
    Every completed object of that array is converted to
    ``response_message_cls`` and returned from ``next()``.

    Args:
        response (requests.Response): An API response object.
        response_message_cls (Union[proto.Message, google.protobuf.message.Message]): A response
        class expected to be returned from an API.

    Raises:
        ValueError: If `response_message_cls` is not a subclass of `proto.Message` or `google.protobuf.message.Message`.
            This check runs when a message is produced during iteration,
            not when the iterator is constructed.
        ValueError: If the body does not start with a JSON array, or if the
            stream ends while the array is still open. Raised during iteration.
    """

    def __init__(
        self,
        response: requests.Response,
        response_message_cls: Union[proto.Message, google.protobuf.message.Message],
    ):
        self._response = response
        self._response_message_cls = response_message_cls
        # Inner iterator over HTTP response's content.
        self._response_itr = self._response.iter_content(decode_unicode=True)
        # Contains a list of JSON responses ready to be sent to user.
        self._ready_objs: Deque[str] = deque()
        # Current JSON response being built.
        self._obj = ""
        # Keeps track of the nesting level within a JSON object.
        # 0 is outside the top-level array, 1 is directly inside it,
        # 2 and above is inside a response object.
        self._level = 0
        # Keeps track whether HTTP response is currently sending values
        # inside of a string value.
        self._in_string = False
        # Whether an escape symbol "\" was encountered.
        self._escape_next = False

    def cancel(self):
        """Cancel existing streaming operation by closing the response."""
        self._response.close()

    def _process_chunk(self, chunk: str):
        """Scan one chunk of the response body.

        Updates the scanner state and appends every JSON object completed by
        this chunk to ``self._ready_objs``. Whitespace outside of string
        values is dropped from the collected object text.

        Args:
            chunk (str): The next piece of the decoded response body.

        Raises:
            ValueError: If the scanner is outside of the top-level array and
                the chunk is not text whose first non-whitespace character
                is "[".
        """
        if not chunk:
            # Nothing to scan; also avoids indexing into an empty chunk.
            return
        if self._level == 0:
            # Outside of the top-level array only insignificant whitespace
            # and the opening "[" are acceptable. Whitespace may surround the
            # array (for example a trailing newline after the closing "]"),
            # so it must not be reported as malformed input.
            # Non-str chunks keep being rejected with ValueError.
            if not isinstance(chunk, str) or chunk.lstrip(string.whitespace)[
                :1
            ] not in ("", "["):
                raise ValueError(
                    "Can only parse array of JSON objects, instead got %s" % chunk
                )
        for char in chunk:
            if char == "{":
                if self._level == 1:
                    # Level 1 corresponds to the outermost JSON object
                    # (i.e. the one we care about).
                    self._obj = ""
                if not self._in_string:
                    self._level += 1
                self._obj += char
            elif char == "}":
                self._obj += char
                if not self._in_string:
                    self._level -= 1
                if not self._in_string and self._level == 1:
                    # Back inside the top-level array: the object is complete.
                    self._ready_objs.append(self._obj)
            elif char == '"':
                # Helps to deal with an escaped quotes inside of a string.
                if not self._escape_next:
                    self._in_string = not self._in_string
                self._obj += char
            elif char in string.whitespace:
                # Whitespace is only meaningful inside of a string value.
                if self._in_string:
                    self._obj += char
            elif char == "[":
                if self._level == 0:
                    # Opening bracket of the top-level array.
                    self._level += 1
                else:
                    self._obj += char
            elif char == "]":
                if self._level == 1:
                    # Closing bracket of the top-level array.
                    self._level -= 1
                else:
                    self._obj += char
            else:
                self._obj += char
            # A backslash escapes the next character unless it is itself
            # escaped; any other character clears the pending escape.
            self._escape_next = not self._escape_next if char == "\\" else False

    def __next__(self):
        """Return the next response message.

        Reads chunks from the response until at least one complete JSON
        object is available, then converts the oldest one.

        Raises:
            StopIteration: If the response is exhausted and the array is closed.
            ValueError: If the response is exhausted while the array or an
                object inside of it is still open.
        """
        while not self._ready_objs:
            # Only next() can signal the end of the stream, so keep the try
            # body limited to it.
            try:
                chunk = next(self._response_itr)
            except StopIteration:
                if self._level > 0:
                    raise ValueError("Unfinished stream: %s" % self._obj)
                raise
            self._process_chunk(chunk)
        return self._grab()

    def _grab(self):
        """Convert the oldest ready JSON object to the response message class.

        Raises:
            ValueError: If the response message class is neither a
                `proto.Message` nor a `google.protobuf.message.Message`
                subclass. The pending JSON object is left in the queue.
        """
        message_cls = self._response_message_cls
        if issubclass(message_cls, proto.Message):
            return message_cls.from_json(self._ready_objs.popleft())
        if issubclass(message_cls, google.protobuf.message.Message):
            return Parse(self._ready_objs.popleft(), message_cls())
        raise ValueError(
            "Response message class must be a subclass of proto.Message or google.protobuf.message.Message."
        )

    def __iter__(self):
        """Return the iterator itself."""
        return self