Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/opentelemetry/sdk/resources/__init__.py: 76%
164 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:35 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:35 +0000
1# Copyright The OpenTelemetry Authors
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
15"""
16This package implements `OpenTelemetry Resources
17<https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/resource/sdk.md#resource-sdk>`_:
19 *A Resource is an immutable representation of the entity producing
20 telemetry. For example, a process producing telemetry that is running in
21 a container on Kubernetes has a Pod name, it is in a namespace and
22 possibly is part of a Deployment which also has a name. All three of
23 these attributes can be included in the Resource.*
25Resource objects are created with `Resource.create`, which accepts attributes
26(key-values). Resources should NOT be created via constructor, and working with
27`Resource` objects should only be done via the Resource API methods. Resource
28attributes can also be passed at process invocation in the
29:envvar:`OTEL_RESOURCE_ATTRIBUTES` environment variable. You should register
30your resource with the `opentelemetry.sdk.trace.TracerProvider` by passing
31them into their constructors. The `Resource` passed to a provider is available
32to the exporter, which can send on this information as it sees fit.
34.. code-block:: python
36 trace.set_tracer_provider(
37 TracerProvider(
38 resource=Resource.create({
39 "service.name": "shoppingcart",
40 "service.instance.id": "instance-12",
41 }),
42 ),
43 )
44 print(trace.get_tracer_provider().resource.attributes)
46 {'telemetry.sdk.language': 'python',
47 'telemetry.sdk.name': 'opentelemetry',
48 'telemetry.sdk.version': '0.13.dev0',
49 'service.name': 'shoppingcart',
50 'service.instance.id': 'instance-12'}
52Note that the OpenTelemetry project documents certain `"standard attributes"
53<https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/resource/semantic_conventions/README.md>`_
54that have prescribed semantic meanings, for example ``service.name`` in the
55above example.
56 """
58import abc
59import concurrent.futures
60import logging
61import os
62import sys
63import typing
64from json import dumps
65from urllib import parse
67import pkg_resources
69from opentelemetry.attributes import BoundedAttributes
70from opentelemetry.sdk.environment_variables import (
71 OTEL_RESOURCE_ATTRIBUTES,
72 OTEL_SERVICE_NAME,
73)
74from opentelemetry.semconv.resource import ResourceAttributes
75from opentelemetry.util.types import AttributeValue
# Type of a single resource attribute value (alias of AttributeValue).
77LabelValue = AttributeValue
# Mapping of attribute key to attribute value, as accepted by `Resource`.
78Attributes = typing.Dict[str, LabelValue]
# Module-level logger, named after this module.
79logger = logging.getLogger(__name__)
# Module-level aliases for the attribute keys defined on ResourceAttributes,
# so callers can refer to them directly from this package.

# Cloud
82CLOUD_PROVIDER = ResourceAttributes.CLOUD_PROVIDER
83CLOUD_ACCOUNT_ID = ResourceAttributes.CLOUD_ACCOUNT_ID
84CLOUD_REGION = ResourceAttributes.CLOUD_REGION
85CLOUD_AVAILABILITY_ZONE = ResourceAttributes.CLOUD_AVAILABILITY_ZONE
# Container
86CONTAINER_NAME = ResourceAttributes.CONTAINER_NAME
87CONTAINER_ID = ResourceAttributes.CONTAINER_ID
88CONTAINER_IMAGE_NAME = ResourceAttributes.CONTAINER_IMAGE_NAME
89CONTAINER_IMAGE_TAG = ResourceAttributes.CONTAINER_IMAGE_TAG
# Deployment
90DEPLOYMENT_ENVIRONMENT = ResourceAttributes.DEPLOYMENT_ENVIRONMENT
# Function as a service
91FAAS_NAME = ResourceAttributes.FAAS_NAME
92FAAS_ID = ResourceAttributes.FAAS_ID
93FAAS_VERSION = ResourceAttributes.FAAS_VERSION
94FAAS_INSTANCE = ResourceAttributes.FAAS_INSTANCE
# Host
95HOST_NAME = ResourceAttributes.HOST_NAME
96HOST_TYPE = ResourceAttributes.HOST_TYPE
97HOST_IMAGE_NAME = ResourceAttributes.HOST_IMAGE_NAME
98HOST_IMAGE_ID = ResourceAttributes.HOST_IMAGE_ID
99HOST_IMAGE_VERSION = ResourceAttributes.HOST_IMAGE_VERSION
# Kubernetes. Note the naming difference: the KUBERNETES_* names here map to
# the K8S_* members of ResourceAttributes (and *_SET_* maps to *SET_*).
100KUBERNETES_CLUSTER_NAME = ResourceAttributes.K8S_CLUSTER_NAME
101KUBERNETES_NAMESPACE_NAME = ResourceAttributes.K8S_NAMESPACE_NAME
102KUBERNETES_POD_UID = ResourceAttributes.K8S_POD_UID
103KUBERNETES_POD_NAME = ResourceAttributes.K8S_POD_NAME
104KUBERNETES_CONTAINER_NAME = ResourceAttributes.K8S_CONTAINER_NAME
105KUBERNETES_REPLICA_SET_UID = ResourceAttributes.K8S_REPLICASET_UID
106KUBERNETES_REPLICA_SET_NAME = ResourceAttributes.K8S_REPLICASET_NAME
107KUBERNETES_DEPLOYMENT_UID = ResourceAttributes.K8S_DEPLOYMENT_UID
108KUBERNETES_DEPLOYMENT_NAME = ResourceAttributes.K8S_DEPLOYMENT_NAME
109KUBERNETES_STATEFUL_SET_UID = ResourceAttributes.K8S_STATEFULSET_UID
110KUBERNETES_STATEFUL_SET_NAME = ResourceAttributes.K8S_STATEFULSET_NAME
111KUBERNETES_DAEMON_SET_UID = ResourceAttributes.K8S_DAEMONSET_UID
112KUBERNETES_DAEMON_SET_NAME = ResourceAttributes.K8S_DAEMONSET_NAME
113KUBERNETES_JOB_UID = ResourceAttributes.K8S_JOB_UID
114KUBERNETES_JOB_NAME = ResourceAttributes.K8S_JOB_NAME
115KUBERNETES_CRON_JOB_UID = ResourceAttributes.K8S_CRONJOB_UID
116KUBERNETES_CRON_JOB_NAME = ResourceAttributes.K8S_CRONJOB_NAME
# Operating system
117OS_TYPE = ResourceAttributes.OS_TYPE
118OS_DESCRIPTION = ResourceAttributes.OS_DESCRIPTION
# Process and language runtime
119PROCESS_PID = ResourceAttributes.PROCESS_PID
120PROCESS_EXECUTABLE_NAME = ResourceAttributes.PROCESS_EXECUTABLE_NAME
121PROCESS_EXECUTABLE_PATH = ResourceAttributes.PROCESS_EXECUTABLE_PATH
122PROCESS_COMMAND = ResourceAttributes.PROCESS_COMMAND
123PROCESS_COMMAND_LINE = ResourceAttributes.PROCESS_COMMAND_LINE
124PROCESS_COMMAND_ARGS = ResourceAttributes.PROCESS_COMMAND_ARGS
125PROCESS_OWNER = ResourceAttributes.PROCESS_OWNER
126PROCESS_RUNTIME_NAME = ResourceAttributes.PROCESS_RUNTIME_NAME
127PROCESS_RUNTIME_VERSION = ResourceAttributes.PROCESS_RUNTIME_VERSION
128PROCESS_RUNTIME_DESCRIPTION = ResourceAttributes.PROCESS_RUNTIME_DESCRIPTION
# Service
129SERVICE_NAME = ResourceAttributes.SERVICE_NAME
130SERVICE_NAMESPACE = ResourceAttributes.SERVICE_NAMESPACE
131SERVICE_INSTANCE_ID = ResourceAttributes.SERVICE_INSTANCE_ID
132SERVICE_VERSION = ResourceAttributes.SERVICE_VERSION
# Telemetry SDK
133TELEMETRY_SDK_NAME = ResourceAttributes.TELEMETRY_SDK_NAME
134TELEMETRY_SDK_VERSION = ResourceAttributes.TELEMETRY_SDK_VERSION
135TELEMETRY_AUTO_VERSION = ResourceAttributes.TELEMETRY_AUTO_VERSION
136TELEMETRY_SDK_LANGUAGE = ResourceAttributes.TELEMETRY_SDK_LANGUAGE
# Version string of the installed "opentelemetry-sdk" distribution, looked up
# once when this module is imported.
139_OPENTELEMETRY_SDK_VERSION = pkg_resources.get_distribution(
140 "opentelemetry-sdk"
141).version
class Resource:
    """Immutable description, as attributes, of the entity producing telemetry."""

    def __init__(
        self, attributes: Attributes, schema_url: typing.Optional[str] = None
    ):
        """Wrap ``attributes`` and remember ``schema_url``.

        Args:
            attributes: Key-value pairs; stored wrapped in `BoundedAttributes`.
            schema_url: Optional schema URL. ``None`` is stored as ``""``.
        """
        self._attributes = BoundedAttributes(attributes=attributes)
        self._schema_url = "" if schema_url is None else schema_url

    @staticmethod
    def create(
        attributes: typing.Optional[Attributes] = None,
        schema_url: typing.Optional[str] = None,
    ) -> "Resource":
        """Build a `Resource` layered on the SDK and environment defaults.

        Three resources are merged in order, later ones overriding earlier
        ones: the SDK default resource, whatever `OTELResourceDetector`
        detects, and a resource made from the arguments given here. If the
        result still has no (truthy) service name, a fallback name is merged
        in on top.

        Args:
            attributes: Optional zero or more key-value pairs.
            schema_url: Optional URL pointing to the schema.

        Returns:
            The newly-created Resource.
        """
        with_environment = _DEFAULT_RESOURCE.merge(
            OTELResourceDetector().detect()
        )
        resource = with_environment.merge(
            Resource(attributes or {}, schema_url)
        )
        if resource.attributes.get(SERVICE_NAME, None):
            return resource

        # Nothing supplied a service name: fall back to "unknown_service",
        # suffixed with the executable name when that attribute is present.
        fallback_name = "unknown_service"
        executable_name = resource.attributes.get(
            PROCESS_EXECUTABLE_NAME, None
        )
        if executable_name:
            fallback_name += ":" + executable_name
        return resource.merge(
            Resource({SERVICE_NAME: fallback_name}, schema_url)
        )

    @staticmethod
    def get_empty() -> "Resource":
        """Return the shared module-level empty resource."""
        return _EMPTY_RESOURCE

    @property
    def attributes(self) -> Attributes:
        """The attributes held by this resource."""
        return self._attributes

    @property
    def schema_url(self) -> str:
        """The schema URL of this resource (``""`` when none was given)."""
        return self._schema_url

    def merge(self, other: "Resource") -> "Resource":
        """Combine this resource with an updating resource into a new one.

        Attribute values from ``other`` win whenever a key is present on both
        sides.

        For the schema URL: an empty URL on either side yields to the other
        side's URL, and equal URLs are kept. Two different non-empty URLs
        cannot be merged; in that case an error is logged and this resource
        is returned unchanged.

        Args:
            other: The other resource to be merged.

        Returns:
            The newly-created Resource, or ``self`` on a schema URL conflict.
        """
        combined_attributes = self.attributes.copy()
        combined_attributes.update(other.attributes)

        own_url = self.schema_url
        incoming_url = other.schema_url
        if "" not in (own_url, incoming_url) and own_url != incoming_url:
            logger.error(
                "Failed to merge resources: The two schemas %s and %s are incompatible",
                own_url,
                incoming_url,
            )
            return self

        # Here either one side is empty or both sides are equal.
        resolved_url = incoming_url if own_url == "" else own_url
        return Resource(combined_attributes, resolved_url)

    def __eq__(self, other: object) -> bool:
        """Resources are equal when attributes and schema URL both match."""
        return isinstance(other, Resource) and (
            self._attributes == other._attributes
            and self._schema_url == other._schema_url
        )

    def __hash__(self):
        """Hash the key-sorted JSON of the attributes plus the schema URL."""
        attributes_json = dumps(self._attributes.copy(), sort_keys=True)
        return hash(f"{attributes_json}|{self._schema_url}")

    def to_json(self, indent=4) -> str:
        """Serialize the attributes and schema URL to a JSON string.

        Args:
            indent: Indentation passed through to ``json.dumps``.
        """
        payload = {
            "attributes": dict(self._attributes),
            "schema_url": self._schema_url,
        }
        return dumps(payload, indent=indent)
# Shared resource with no attributes; returned by Resource.get_empty() and
# used as the stand-in result for a detector that failed.
257_EMPTY_RESOURCE = Resource({})
# Base resource that Resource.create() starts merging from: it identifies the
# SDK itself (language, name and installed version).
258_DEFAULT_RESOURCE = Resource(
259 {
260 TELEMETRY_SDK_LANGUAGE: "python",
261 TELEMETRY_SDK_NAME: "opentelemetry",
262 TELEMETRY_SDK_VERSION: _OPENTELEMETRY_SDK_VERSION,
263 }
264)
class ResourceDetector(abc.ABC):
    """Abstract base class for objects that detect a `Resource`.

    Subclasses must implement `detect`.
    """

    def __init__(self, raise_on_error=False):
        """Store the error-handling flag.

        Args:
            raise_on_error: Kept on the instance as ``raise_on_error``.
                `get_aggregated_resources` re-raises this detector's
                exception when the flag is truthy and otherwise logs and
                ignores it.
        """
        self.raise_on_error = raise_on_error

    @abc.abstractmethod
    def detect(self) -> "Resource":
        """Return the detected `Resource`; must be overridden."""
        raise NotImplementedError()
class OTELResourceDetector(ResourceDetector):
    """Detector that builds a `Resource` from environment variables."""

    # pylint: disable=no-self-use
    def detect(self) -> "Resource":
        """Read resource attributes from the process environment.

        The ``OTEL_RESOURCE_ATTRIBUTES`` variable is parsed as a
        comma-separated list of ``key=value`` items. Keys and values are
        stripped of surrounding whitespace and values are URL-decoded. An
        item without ``=`` is logged as a warning and skipped.

        A non-empty ``OTEL_SERVICE_NAME`` variable then sets the service
        name, replacing any value parsed from the list.

        Returns:
            A `Resource` holding the parsed attributes.
        """
        attrs = {}

        raw_items = os.environ.get(OTEL_RESOURCE_ATTRIBUTES)
        for raw_item in raw_items.split(",") if raw_items else ():
            try:
                # Split on the first "=" only, so values may contain "=".
                name, raw_value = raw_item.split("=", maxsplit=1)
            except ValueError as exc:
                logger.warning(
                    "Invalid key value resource attribute pair %s: %s",
                    raw_item,
                    exc,
                )
            else:
                attrs[name.strip()] = parse.unquote(raw_value.strip())

        explicit_service_name = os.environ.get(OTEL_SERVICE_NAME)
        if explicit_service_name:
            attrs[SERVICE_NAME] = explicit_service_name
        return Resource(attrs)
class ProcessResourceDetector(ResourceDetector):
    """Detector that reports the running Python interpreter."""

    # pylint: disable=no-self-use
    def detect(self) -> "Resource":
        """Describe the interpreter's runtime name, version and description.

        The version is the dotted ``major.minor.micro`` for a final release
        with serial 0; otherwise all five ``sys.version_info`` fields are
        joined with dots.

        Returns:
            A `Resource` with the three process-runtime attributes.
        """
        info = sys.version_info
        is_plain_final = info.releaselevel == "final" and not info.serial
        version_fields = info[:3] if is_plain_final else info
        runtime_version = ".".join(str(field) for field in version_fields)

        return Resource(
            {
                PROCESS_RUNTIME_DESCRIPTION: sys.version,
                PROCESS_RUNTIME_NAME: sys.implementation.name,
                PROCESS_RUNTIME_VERSION: runtime_version,
            }
        )
def get_aggregated_resources(
    detectors: typing.List["ResourceDetector"],
    initial_resource: typing.Optional[Resource] = None,
    timeout=5,
) -> "Resource":
    """Run every detector and merge the results into a single `Resource`.

    All detectors are submitted to a thread pool up front. Their results are
    then collected and merged one by one, in the order the detectors were
    passed, on top of the starting resource. Merging is done with
    `Resource.merge`, so on a key collision a later detector overrides an
    earlier one, and any detector overrides ``initial_resource``.

    :param detectors: Detectors to run, in merge order
    :param initial_resource: Resource to start merging from. When ``None``,
        ``Resource.create()`` is used instead
    :param timeout: Number of seconds to wait for each detector to return
    :return: The merged resource
    :raises Exception: the error raised while waiting for a detector
        (including a timeout) when that detector has ``raise_on_error`` set;
        otherwise the error is logged and the detector contributes nothing
    """
    detectors_merged_resource = initial_resource or Resource.create()

    # NOTE: leaving the ``with`` block shuts the executor down and waits for
    # detectors that are still running, so a detector that exceeded
    # ``timeout`` can still delay the return of this function.
    with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
        futures = [executor.submit(detector.detect) for detector in detectors]
        for detector, future in zip(detectors, futures):
            try:
                detected_resource = future.result(timeout=timeout)
            # pylint: disable=broad-except
            except Exception as ex:
                if detector.raise_on_error:
                    raise
                logger.warning(
                    "Exception %s in detector %s, ignoring", ex, detector
                )
                detected_resource = _EMPTY_RESOURCE
            # Merge outside of a ``finally`` clause on purpose: a ``finally``
            # would also run for errors not handled above (for example
            # KeyboardInterrupt), where ``detected_resource`` is either
            # unbound or left over from the previous detector.
            detectors_merged_resource = detectors_merged_resource.merge(
                detected_resource
            )

    return detectors_merged_resource