Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/google/api_core/rest_streaming.py: 18%

65 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2023-12-08 06:45 +0000

1# Copyright 2021 Google LLC 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15"""Helpers for server-side streaming in REST.""" 

16 

17from collections import deque 

18import string 

19from typing import Deque 

20 

21import requests 

22 

23 

class ResponseIterator:
    """Iterator over REST API responses.

    The response body is consumed incrementally and scanned character by
    character. It must be a JSON array; every time an object that is a
    direct element of that array is complete, its text is queued and later
    converted with ``response_message_cls.from_json``.

    Args:
        response (requests.Response): An API response object.
        response_message_cls (Callable[proto.Message]): A proto
            class expected to be returned from an API.
    """

    def __init__(self, response: "requests.Response", response_message_cls):
        self._response = response
        self._response_message_cls = response_message_cls
        # Inner iterator over HTTP response's content.
        self._response_itr = self._response.iter_content(decode_unicode=True)
        # Contains a list of JSON responses ready to be sent to user.
        self._ready_objs: Deque[str] = deque()
        # Current JSON response being built.
        self._obj = ""
        # Keeps track of the nesting level within a JSON object:
        # 0 = outside the outer array, 1 = inside the array but between
        # objects, 2 and above = inside an object.
        self._level = 0
        # Keeps track whether HTTP response is currently sending values
        # inside of a string value.
        self._in_string = False
        # Whether an escape symbol "\" was encountered.
        self._escape_next = False

    def cancel(self):
        """Cancel existing streaming operation."""
        self._response.close()

    def _process_chunk(self, chunk: str):
        """Feed one chunk of the response body into the parser state.

        Completed array elements are appended to ``self._ready_objs``;
        a partially received object stays in ``self._obj`` until a later
        chunk finishes it.

        Args:
            chunk (str): The next piece of the response body.

        Raises:
            ValueError: If the parser is outside the outer array and the
                chunk does not start with ``[``.
        """
        if not chunk:
            # An empty chunk carries no data; skipping it also avoids an
            # IndexError from the ``chunk[0]`` check below.
            return
        if self._level == 0:
            if chunk[0] != "[":
                raise ValueError(
                    "Can only parse array of JSON objects, instead got %s" % chunk
                )
        for char in chunk:
            if char == "{":
                if self._level == 1:
                    # Level 1 corresponds to the outermost JSON object
                    # (i.e. the one we care about).
                    self._obj = ""
                if not self._in_string:
                    self._level += 1
                self._obj += char
            elif char == "}":
                self._obj += char
                if not self._in_string:
                    self._level -= 1
                    if self._level == 1:
                        # Back at array level: the element is complete.
                        self._ready_objs.append(self._obj)
            elif char == '"':
                # Helps to deal with an escaped quotes inside of a string.
                if not self._escape_next:
                    self._in_string = not self._in_string
                self._obj += char
            elif char in string.whitespace:
                # Whitespace is only kept when it is part of a string value.
                if self._in_string:
                    self._obj += char
            elif char == "[":
                if self._level == 0:
                    # Opening bracket of the outer array; not part of any
                    # element.
                    self._level += 1
                else:
                    self._obj += char
            elif char == "]":
                if self._level == 1:
                    # Closing bracket of the outer array.
                    self._level -= 1
                else:
                    self._obj += char
            else:
                self._obj += char
            # A backslash escapes the next character unless it was itself
            # escaped; any other character clears the flag.
            self._escape_next = not self._escape_next if char == "\\" else False

    def __next__(self):
        """Return the next response message parsed from the stream.

        Raises:
            ValueError: If the content iterator is exhausted while the
                parser is still inside the array or an object, or if a
                chunk is rejected by ``_process_chunk``.
            StopIteration: If the content iterator is exhausted and no
                queued objects remain.
        """
        while not self._ready_objs:
            try:
                chunk = next(self._response_itr)
            except StopIteration:
                if self._level > 0:
                    raise ValueError("Unfinished stream: %s" % self._obj)
                raise
            self._process_chunk(chunk)
        return self._grab()

    def _grab(self):
        """Pop the oldest queued JSON text and convert it with ``from_json``."""
        return self._response_message_cls.from_json(self._ready_objs.popleft())

    def __iter__(self):
        """Return the iterator itself."""
        return self