1# Copyright 2024 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15"""Helpers for server-side streaming in REST."""
16
from collections import deque
import string
import types
from typing import Deque, Type, Union

import proto
import google.protobuf.message
from google.protobuf.json_format import Parse
25
26
class BaseResponseIterator:
    """Base Iterator over REST API responses. This class should not be used directly.

    The response body is expected to be a JSON array of objects. Text chunks
    of that body are fed to ``_process_chunk``, which separates the top-level
    objects of the array and queues the JSON text of each completed object in
    ``_ready_objs``. ``_grab`` pops the oldest queued object and parses it
    into an instance of ``response_message_cls``.

    Args:
        response_message_cls (Type[Union[proto.Message, google.protobuf.message.Message]]): A response
        class expected to be returned from an API.

    Raises:
        ValueError: If `response_message_cls` is not a subclass of `proto.Message` or `google.protobuf.message.Message`.
    """

    def __init__(
        self,
        # Quoted annotation: it describes a message *class* (not an instance)
        # and is not evaluated when the method is defined.
        response_message_cls: "Type[Union[proto.Message, google.protobuf.message.Message]]",
    ):
        self._response_message_cls = response_message_cls
        # Contains a list of JSON responses ready to be sent to user.
        self._ready_objs: Deque[str] = deque()
        # Current JSON response being built.
        self._obj = ""
        # Keeps track of the nesting level within a JSON object.
        # 0 is outside of the top-level array, 1 is directly inside of it.
        self._level = 0
        # Keeps track whether HTTP response is currently sending values
        # inside of a string value.
        self._in_string = False
        # Whether an escape symbol "\" was encountered.
        self._escape_next = False

        # Bind the parser chosen for the message class as `self._grab()`.
        self._grab = types.MethodType(self._create_grab(), self)

    def _process_chunk(self, chunk: str):
        """Consume one chunk of response text and queue every completed object.

        The chunk is scanned character by character. Parser state (nesting
        level, in-string flag, pending escape, partially built object) is kept
        on the instance, so a JSON object may be split across chunks at any
        position. Whitespace outside of string values is dropped from the
        collected object text.

        Args:
            chunk (str): The next piece of the response body. May be empty.

        Raises:
            ValueError: If the parser is outside of the top-level array and the
                first non-whitespace character of `chunk` is not "[".
        """
        if self._level == 0:
            # Inspect the first significant character only. Empty and
            # whitespace-only chunks carry nothing to validate and must not
            # fail (indexing chunk[0] would raise IndexError on "").
            leading = chunk.lstrip(string.whitespace)[:1]
            if leading and leading != "[":
                raise ValueError(
                    "Can only parse array of JSON objects, instead got %s" % chunk
                )
        for char in chunk:
            if char == "{":
                if self._level == 1:
                    # Level 1 corresponds to the outermost JSON object
                    # (i.e. the one we care about).
                    self._obj = ""
                if not self._in_string:
                    self._level += 1
                self._obj += char
            elif char == "}":
                self._obj += char
                if not self._in_string:
                    self._level -= 1
                    if self._level == 1:
                        # Back directly inside the array: the object is complete.
                        self._ready_objs.append(self._obj)
            elif char == '"':
                # Helps to deal with an escaped quotes inside of a string.
                if not self._escape_next:
                    self._in_string = not self._in_string
                self._obj += char
            elif char in string.whitespace:
                # Whitespace is only significant inside of string values.
                if self._in_string:
                    self._obj += char
            elif char == "[":
                if self._level == 0:
                    # Opening bracket of the top-level array.
                    self._level += 1
                else:
                    self._obj += char
            elif char == "]":
                if self._level == 1:
                    # Closing bracket of the top-level array.
                    self._level -= 1
                else:
                    self._obj += char
            else:
                self._obj += char
            # A backslash escapes the next character unless it is itself
            # escaped; any other character clears the pending escape.
            self._escape_next = not self._escape_next if char == "\\" else False

    def _create_grab(self):
        """Build the function that turns the next queued JSON text into a message.

        Returns:
            Callable: A one-argument function (bound to the instance by
            ``__init__``) that pops the oldest entry of ``_ready_objs`` and
            parses it into ``_response_message_cls``.

        Raises:
            ValueError: If ``_response_message_cls`` is not a class, or is not
                a subclass of `proto.Message` or `google.protobuf.message.Message`.
        """
        message_cls = self._response_message_cls

        # issubclass() raises TypeError for non-class arguments; check first so
        # that every unsupported value ends in the documented ValueError.
        if isinstance(message_cls, type):
            if issubclass(message_cls, proto.Message):

                def grab(this):
                    return this._response_message_cls.from_json(
                        this._ready_objs.popleft(), ignore_unknown_fields=True
                    )

                return grab

            if issubclass(message_cls, google.protobuf.message.Message):

                # NOTE(review): unlike the proto.Message branch above, this call
                # does not pass ignore_unknown_fields — confirm that is intended.
                def grab(this):
                    return Parse(this._ready_objs.popleft(), this._response_message_cls())

                return grab

        raise ValueError(
            "Response message class must be a subclass of proto.Message or google.protobuf.message.Message."
        )