1# Copyright 2024, Google LLC All rights reserved.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15from typing import Optional, List
16
17from opentelemetry.propagators.textmap import Setter, Getter
18
19from google.pubsub_v1 import PubsubMessage
20
21
22class OpenTelemetryContextSetter(Setter):
23 """
24 Used by Open Telemetry for context propagation.
25 """
26
27 def set(self, carrier: PubsubMessage, key: str, value: str) -> None:
28 """
29 Injects trace context into Pub/Sub message attributes with
30 "googclient_" prefix.
31
32 Args:
33 carrier(PubsubMessage): The Pub/Sub message which is the carrier of Open Telemetry
34 data.
35 key(str): The key for which the Open Telemetry context data needs to be set.
36 value(str): The Open Telemetry context value to be set.
37
38 Returns:
39 None
40 """
41 carrier.attributes["googclient_" + key] = value
42
43
44class OpenTelemetryContextGetter(Getter):
45 """
46 Used by Open Telemetry for context propagation.
47 """
48
49 def get(self, carrier: PubsubMessage, key: str) -> Optional[List[str]]:
50 if ("googclient_" + key) not in carrier.attributes:
51 return None
52 return [carrier.attributes["googclient_" + key]]
53
54 def keys(self, carrier: PubsubMessage) -> List[str]:
55 return list(map(str, carrier.attributes.keys()))