Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/grpc/_compression.py: 17%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

23 statements  

1# Copyright 2019 The gRPC authors. 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15from __future__ import annotations 

16 

17from typing import Optional, Union 

18 

19import grpc 

20from grpc._cython import cygrpc 

21from grpc._typing import MetadataType 

22 

# Module-level aliases for the members of cygrpc.CompressionAlgorithm.
NoCompression = cygrpc.CompressionAlgorithm.none
Deflate = cygrpc.CompressionAlgorithm.deflate
Gzip = cygrpc.CompressionAlgorithm.gzip

26 

# Lookup table from each compression algorithm alias to the string that
# is sent as its metadata value.
_METADATA_STRING_MAPPING = {
    NoCompression: "identity",
    Deflate: "deflate",
    Gzip: "gzip",
}

32 

33 

def _compression_algorithm_to_metadata_value(
    compression: grpc.Compression,
) -> str:
    """Return the metadata string registered for ``compression``.

    Args:
        compression: Key to look up in ``_METADATA_STRING_MAPPING``.

    Returns:
        The mapped string ("identity", "deflate" or "gzip").

    Raises:
        KeyError: If ``compression`` is not a key of the mapping.
    """
    return _METADATA_STRING_MAPPING[compression]

38 

39 

def compression_algorithm_to_metadata(compression: grpc.Compression):
    """Build the (key, value) metadata pair for ``compression``.

    Args:
        compression: Algorithm to translate; it must be a key of
            ``_METADATA_STRING_MAPPING``.

    Returns:
        A 2-tuple of ``cygrpc.GRPC_COMPRESSION_REQUEST_ALGORITHM_MD_KEY``
        and the metadata string mapped to ``compression``.

    Raises:
        KeyError: If ``compression`` has no entry in the mapping.
    """
    md_key = cygrpc.GRPC_COMPRESSION_REQUEST_ALGORITHM_MD_KEY
    md_value = _compression_algorithm_to_metadata_value(compression)
    return (md_key, md_value)

45 

46 

def create_channel_option(compression: Optional[grpc.Compression]):
    """Build the channel options that select a default compression algorithm.

    Args:
        compression: The algorithm to use as the channel default, or None.

    Returns:
        An empty tuple when ``compression`` is falsy; otherwise a one-element
        tuple holding the pair
        ``(cygrpc.GRPC_COMPRESSION_CHANNEL_DEFAULT_ALGORITHM, int(compression))``.
    """
    # NOTE(review): this is a truthiness test, not an ``is None`` test, so a
    # member whose integer value is 0 would be handled like None here.
    # Confirm that is intended.
    if not compression:
        return ()
    default_algorithm_option = (
        cygrpc.GRPC_COMPRESSION_CHANNEL_DEFAULT_ALGORITHM,
        int(compression),
    )
    return (default_algorithm_option,)

53 

54 

def augment_metadata(
    metadata: Optional[Union[MetadataType, grpc.aio.Metadata]],
    compression: Optional[grpc.Compression],
):
    """Append the compression metadata entry to ``metadata``.

    Args:
        metadata: Existing metadata entries, or None.
        compression: Algorithm whose metadata pair should be appended,
            or None.

    Returns:
        None when both arguments are falsy. Otherwise a tuple made of the
        entries of ``metadata`` (if any) followed by the pair produced by
        ``compression_algorithm_to_metadata`` (if ``compression`` is truthy).
    """
    if not metadata and not compression:
        return None
    # Normalise to a tuple so the result is always the same concrete type.
    combined = tuple(metadata) if metadata else ()
    # NOTE(review): truthiness test -- a member whose integer value is 0
    # would contribute no entry, exactly like None. Confirm that is intended.
    if compression:
        combined = combined + (compression_algorithm_to_metadata(compression),)
    return combined

66 

67 

# Names exported by ``from <module> import *``.
__all__ = (
    "Deflate",
    "Gzip",
    "NoCompression",
)