Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/grpc_status/_async.py: 32%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

22 statements  

1# Copyright 2020 The gRPC Authors 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14"""Reference implementation for status mapping in gRPC Python.""" 

15 

16from google.rpc import status_pb2 

17from grpc.experimental import aio 

18 

19from ._common import GRPC_DETAILS_METADATA_KEY 

20from ._common import code_to_grpc_status_code 

21 

22 

async def from_call(call: aio.Call):
    """Extracts the google.rpc.status.Status message carried by a grpc.aio.Call.

    This is an EXPERIMENTAL API.

    The call's status code, details and trailing metadata are awaited, then
    the trailing metadata is scanned for the first entry whose key equals
    GRPC_DETAILS_METADATA_KEY. That entry's value is parsed as a Status proto
    and cross-checked against the code and details reported by the call.

    Args:
      call: A grpc.aio.Call instance.

    Returns:
      The parsed google.rpc.status.Status message, or None when the trailing
      metadata is None or contains no entry keyed by
      GRPC_DETAILS_METADATA_KEY.

    Raises:
      ValueError: If the code or the message stored in the Status proto does
        not match the status code or details of the call.
    """
    status_code = await call.code()
    status_details = await call.details()
    metadata = await call.trailing_metadata()

    if metadata is None:
        return None

    for entry_key, entry_value in metadata:
        if entry_key != GRPC_DETAILS_METADATA_KEY:
            continue

        # Only the first matching entry is considered: it is either returned
        # or rejected with a ValueError.
        status_proto = status_pb2.Status.FromString(entry_value)

        # The first element of the status code's value is what gets compared
        # with the integer code stored in the proto.
        if status_code.value[0] != status_proto.code:
            proto_code = code_to_grpc_status_code(status_proto.code)
            raise ValueError(
                "Code in Status proto (%s) doesn't match status code (%s)"
                % (proto_code, status_code)
            )

        if status_details != status_proto.message:
            raise ValueError(
                "Message in Status proto (%s) doesn't match status details"
                " (%s)" % (status_proto.message, status_details)
            )

        return status_proto

    return None

54 

55 

# Names exported when this module is star-imported.
__all__ = ["from_call"]