Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/grpc/_compression.py: 32%
22 statements
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:51 +0000
1# Copyright 2019 The gRPC authors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
15from __future__ import annotations
17from typing import Optional
19import grpc
20from grpc._cython import cygrpc
21from grpc._typing import MetadataType
# Module-level aliases for the algorithm constants exposed by the Cython layer.
NoCompression = cygrpc.CompressionAlgorithm.none
Deflate = cygrpc.CompressionAlgorithm.deflate
Gzip = cygrpc.CompressionAlgorithm.gzip

# Lookup table from each algorithm constant to the string that
# _compression_algorithm_to_metadata_value() returns for it.
_METADATA_STRING_MAPPING = dict(
    (
        (NoCompression, "identity"),
        (Deflate, "deflate"),
        (Gzip, "gzip"),
    )
)
def _compression_algorithm_to_metadata_value(
    compression: grpc.Compression,
) -> str:
    """Return the metadata string registered for ``compression``.

    The value is looked up in ``_METADATA_STRING_MAPPING``.

    Raises:
        KeyError: if ``compression`` is not a key of the mapping.
    """
    metadata_value = _METADATA_STRING_MAPPING[compression]
    return metadata_value
def compression_algorithm_to_metadata(compression: grpc.Compression):
    """Build a ``(key, value)`` metadata pair for ``compression``.

    The key is ``cygrpc.GRPC_COMPRESSION_REQUEST_ALGORITHM_MD_KEY``; the value
    is the string returned by ``_compression_algorithm_to_metadata_value``.
    """
    key = cygrpc.GRPC_COMPRESSION_REQUEST_ALGORITHM_MD_KEY
    value = _compression_algorithm_to_metadata_value(compression)
    return (key, value)
def create_channel_option(compression: Optional[grpc.Compression]):
    """Return channel options selecting ``compression`` as the default.

    Returns:
        An empty tuple when ``compression`` is falsy; otherwise a one-element
        tuple holding the pair
        ``(cygrpc.GRPC_COMPRESSION_CHANNEL_DEFAULT_ALGORITHM, int(compression))``.
    """
    # NOTE(review): this is a truthiness check, not an ``is None`` check, so a
    # falsy non-None algorithm value also produces no option -- confirm that
    # this is intended before changing it.
    if not compression:
        return ()
    default_algorithm_option = (
        cygrpc.GRPC_COMPRESSION_CHANNEL_DEFAULT_ALGORITHM,
        int(compression),
    )
    return (default_algorithm_option,)
def augment_metadata(
    metadata: Optional[MetadataType], compression: Optional[grpc.Compression]
):
    """Append the compression metadata pair to ``metadata`` when requested.

    Returns:
        ``None`` when both ``metadata`` and ``compression`` are falsy.
        Otherwise a tuple containing the entries of ``metadata`` (if any),
        followed by the pair from ``compression_algorithm_to_metadata`` when
        ``compression`` is truthy.
    """
    if not (metadata or compression):
        return None

    combined = tuple(metadata) if metadata else ()
    # NOTE(review): truthiness check -- a falsy non-None algorithm value adds
    # no compression entry; confirm this is intended before changing it.
    if compression:
        combined += (compression_algorithm_to_metadata(compression),)
    return combined
# Names exported by ``from <module> import *``: only the algorithm constants.
__all__ = ("NoCompression", "Deflate", "Gzip")