1# Copyright 2021 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
"""Helpers for server-side streaming in REST."""
16
17from collections import deque
18import string
19from typing import Deque, Union
20
21import proto
22import requests
23import google.protobuf.message
24from google.protobuf.json_format import Parse
25
26
class ResponseIterator:
    """Iterator over REST API responses.

    The HTTP body is expected to be a JSON array of objects. The body is
    consumed chunk by chunk, and every completed top-level object of the
    array is converted into an instance of ``response_message_cls``.

    Args:
        response (requests.Response): An API response object.
        response_message_cls (Union[proto.Message, google.protobuf.message.Message]): A response
        class expected to be returned from an API. The class itself is
        expected (it is checked with ``issubclass``), not an instance.

    Raises:
        ValueError: Raised while iterating (not at construction time) if
            `response_message_cls` is not a subclass of `proto.Message` or
            `google.protobuf.message.Message`, if the body does not start
            with a JSON array, or if the stream ends in the middle of the
            array.
    """

    def __init__(
        self,
        response: requests.Response,
        response_message_cls: Union[proto.Message, google.protobuf.message.Message],
    ):
        self._response = response
        self._response_message_cls = response_message_cls
        # Inner iterator over HTTP response's content.
        self._response_itr = self._response.iter_content(decode_unicode=True)
        # Contains a list of JSON responses ready to be sent to user.
        self._ready_objs: Deque[str] = deque()
        # Current JSON response being built.
        self._obj = ""
        # Keeps track of the nesting level within a JSON object.
        # 0: outside the outer array, 1: inside the array but outside any
        # object, >= 2: inside an object of the array.
        self._level = 0
        # Keeps track whether HTTP response is currently sending values
        # inside of a string value.
        self._in_string = False
        # Whether an escape symbol "\" was encountered.
        self._escape_next = False

    def cancel(self) -> None:
        """Cancel existing streaming operation."""
        self._response.close()

    def _process_chunk(self, chunk: str) -> None:
        """Feed one chunk of the HTTP body into the incremental JSON scanner.

        Every object completed by this chunk is appended, as JSON text with
        the whitespace outside of strings removed, to ``self._ready_objs``.

        Args:
            chunk (str): The next piece of the response body. It may be as
                short as a single character and may split an object at any
                position.

        Raises:
            ValueError: If, outside of the outer array, the first
                non-whitespace character of the chunk is not ``[``.
        """
        if self._level == 0:
            # Whitespace around the outer array is insignificant in JSON and
            # may arrive as a chunk of its own (for example a trailing
            # newline), so only the first non-whitespace character is
            # checked. An empty or whitespace-only chunk is accepted.
            # Non-str chunks are not stripped and keep failing the check
            # below with ValueError.
            head = chunk.lstrip(string.whitespace) if isinstance(chunk, str) else chunk
            if head and head[0] != "[":
                raise ValueError(
                    "Can only parse array of JSON objects, instead got %s" % chunk
                )
        for char in chunk:
            if char == "{":
                if self._level == 1:
                    # Level 1 corresponds to the outermost JSON object
                    # (i.e. the one we care about).
                    self._obj = ""
                if not self._in_string:
                    self._level += 1
                self._obj += char
            elif char == "}":
                self._obj += char
                if not self._in_string:
                    self._level -= 1
                if not self._in_string and self._level == 1:
                    # Back at array level: the current object is complete.
                    self._ready_objs.append(self._obj)
            elif char == '"':
                # Helps to deal with an escaped quotes inside of a string.
                if not self._escape_next:
                    self._in_string = not self._in_string
                self._obj += char
            elif char in string.whitespace:
                # Whitespace is only meaningful inside of a string value.
                if self._in_string:
                    self._obj += char
            elif char == "[":
                if self._level == 0:
                    # Opening bracket of the outer array.
                    self._level += 1
                else:
                    self._obj += char
            elif char == "]":
                if self._level == 1:
                    # Closing bracket of the outer array.
                    self._level -= 1
                else:
                    self._obj += char
            else:
                self._obj += char
            # A backslash escapes the next character unless it is itself
            # escaped; any other character clears the flag.
            self._escape_next = not self._escape_next if char == "\\" else False

    def __next__(self):
        """Return the next parsed response message.

        Raises:
            StopIteration: When the stream ended cleanly and no parsed
                object is left.
            ValueError: When the stream ended while the JSON array was
                still open.
        """
        while not self._ready_objs:
            try:
                chunk = next(self._response_itr)
            except StopIteration:
                if self._level > 0:
                    raise ValueError("Unfinished stream: %s" % self._obj)
                raise
            self._process_chunk(chunk)
        return self._grab()

    def _grab(self):
        """Convert the oldest buffered JSON object into a response message.

        Raises:
            ValueError: If the response message class is neither a
                `proto.Message` nor a `google.protobuf.message.Message`
                subclass. The buffered object is left in place in that case.
        """
        # The message class decides which JSON parser is used.
        if issubclass(self._response_message_cls, proto.Message):
            return self._response_message_cls.from_json(self._ready_objs.popleft())
        elif issubclass(self._response_message_cls, google.protobuf.message.Message):
            return Parse(self._ready_objs.popleft(), self._response_message_cls())
        else:
            raise ValueError(
                "Response message class must be a subclass of proto.Message or google.protobuf.message.Message."
            )

    def __iter__(self):
        """Return the iterator itself."""
        return self