1# Copyright 2024 Google LLC 
    2# 
    3# Licensed under the Apache License, Version 2.0 (the "License"); 
    4# you may not use this file except in compliance with the License. 
    5# You may obtain a copy of the License at 
    6# 
    7#     http://www.apache.org/licenses/LICENSE-2.0 
    8# 
    9# Unless required by applicable law or agreed to in writing, software 
    10# distributed under the License is distributed on an "AS IS" BASIS, 
    11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
    12# See the License for the specific language governing permissions and 
    13# limitations under the License. 
    14 
    15"""Helpers for server-side streaming in REST.""" 
    16 
    17from collections import deque 
    18import string 
    19from typing import Deque, Union 
    20import types 
    21 
    22import proto 
    23import google.protobuf.message 
    24from google.protobuf.json_format import Parse 
    25 
    26 
    27class BaseResponseIterator: 
    28    """Base Iterator over REST API responses. This class should not be used directly. 
    29 
    30    Args: 
    31        response_message_cls (Union[proto.Message, google.protobuf.message.Message]): A response 
    32        class expected to be returned from an API. 
    33 
    34    Raises: 
    35        ValueError: If `response_message_cls` is not a subclass of `proto.Message` or `google.protobuf.message.Message`. 
    36    """ 
    37 
    38    def __init__( 
    39        self, 
    40        response_message_cls: Union[proto.Message, google.protobuf.message.Message], 
    41    ): 
    42        self._response_message_cls = response_message_cls 
    43        # Contains a list of JSON responses ready to be sent to user. 
    44        self._ready_objs: Deque[str] = deque() 
    45        # Current JSON response being built. 
    46        self._obj = "" 
    47        # Keeps track of the nesting level within a JSON object. 
    48        self._level = 0 
    49        # Keeps track whether HTTP response is currently sending values 
    50        # inside of a string value. 
    51        self._in_string = False 
    52        # Whether an escape symbol "\" was encountered. 
    53        self._escape_next = False 
    54 
    55        self._grab = types.MethodType(self._create_grab(), self) 
    56 
    57    def _process_chunk(self, chunk: str): 
    58        if self._level == 0: 
    59            if chunk[0] != "[": 
    60                raise ValueError( 
    61                    "Can only parse array of JSON objects, instead got %s" % chunk 
    62                ) 
    63        for char in chunk: 
    64            if char == "{": 
    65                if self._level == 1: 
    66                    # Level 1 corresponds to the outermost JSON object 
    67                    # (i.e. the one we care about). 
    68                    self._obj = "" 
    69                if not self._in_string: 
    70                    self._level += 1 
    71                self._obj += char 
    72            elif char == "}": 
    73                self._obj += char 
    74                if not self._in_string: 
    75                    self._level -= 1 
    76                if not self._in_string and self._level == 1: 
    77                    self._ready_objs.append(self._obj) 
    78            elif char == '"': 
    79                # Helps to deal with an escaped quotes inside of a string. 
    80                if not self._escape_next: 
    81                    self._in_string = not self._in_string 
    82                self._obj += char 
    83            elif char in string.whitespace: 
    84                if self._in_string: 
    85                    self._obj += char 
    86            elif char == "[": 
    87                if self._level == 0: 
    88                    self._level += 1 
    89                else: 
    90                    self._obj += char 
    91            elif char == "]": 
    92                if self._level == 1: 
    93                    self._level -= 1 
    94                else: 
    95                    self._obj += char 
    96            else: 
    97                self._obj += char 
    98            self._escape_next = not self._escape_next if char == "\\" else False 
    99 
    100    def _create_grab(self): 
    101        if issubclass(self._response_message_cls, proto.Message): 
    102 
    103            def grab(this): 
    104                return this._response_message_cls.from_json( 
    105                    this._ready_objs.popleft(), ignore_unknown_fields=True 
    106                ) 
    107 
    108            return grab 
    109        elif issubclass(self._response_message_cls, google.protobuf.message.Message): 
    110 
    111            def grab(this): 
    112                return Parse(this._ready_objs.popleft(), this._response_message_cls()) 
    113 
    114            return grab 
    115        else: 
    116            raise ValueError( 
    117                "Response message class must be a subclass of proto.Message or google.protobuf.message.Message." 
    118            )