Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/opentelemetry/sdk/resources/__init__.py: 76%

164 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-06-07 06:35 +0000

1# Copyright The OpenTelemetry Authors 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15""" 

16This package implements `OpenTelemetry Resources 

17<https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/resource/sdk.md#resource-sdk>`_: 

18 

19 *A Resource is an immutable representation of the entity producing 

20 telemetry. For example, a process producing telemetry that is running in 

21 a container on Kubernetes has a Pod name, it is in a namespace and 

22 possibly is part of a Deployment which also has a name. All three of 

23 these attributes can be included in the Resource.* 

24 

25Resource objects are created with `Resource.create`, which accepts attributes 

26(key-values). Resources should NOT be created via constructor, and working with 

27`Resource` objects should only be done via the Resource API methods. Resource 

28attributes can also be passed at process invocation in the 

29:envvar:`OTEL_RESOURCE_ATTRIBUTES` environment variable. You should register 

30your resource with the `opentelemetry.sdk.trace.TracerProvider` by passing 

31them into their constructors. The `Resource` passed to a provider is available 

32to the exporter, which can send on this information as it sees fit. 

33 

34.. code-block:: python 

35 

36 trace.set_tracer_provider( 

37 TracerProvider( 

38 resource=Resource.create({ 

39 "service.name": "shoppingcart", 

40 "service.instance.id": "instance-12", 

41 }), 

42 ), 

43 ) 

44 print(trace.get_tracer_provider().resource.attributes) 

45 

46 {'telemetry.sdk.language': 'python', 

47 'telemetry.sdk.name': 'opentelemetry', 

48 'telemetry.sdk.version': '0.13.dev0', 

49 'service.name': 'shoppingcart', 

50 'service.instance.id': 'instance-12'} 

51 

52Note that the OpenTelemetry project documents certain `"standard attributes" 

53<https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/resource/semantic_conventions/README.md>`_ 

54that have prescribed semantic meanings, for example ``service.name`` in the 

55above example. 

56 """ 

57 

58import abc 

59import concurrent.futures 

60import logging 

61import os 

62import sys 

63import typing 

64from json import dumps 

65from urllib import parse 

66 

67import pkg_resources 

68 

69from opentelemetry.attributes import BoundedAttributes 

70from opentelemetry.sdk.environment_variables import ( 

71 OTEL_RESOURCE_ATTRIBUTES, 

72 OTEL_SERVICE_NAME, 

73) 

74from opentelemetry.semconv.resource import ResourceAttributes 

75from opentelemetry.util.types import AttributeValue 

76 

77LabelValue = AttributeValue 

78Attributes = typing.Dict[str, LabelValue] 

79logger = logging.getLogger(__name__) 

80 

81 

82CLOUD_PROVIDER = ResourceAttributes.CLOUD_PROVIDER 

83CLOUD_ACCOUNT_ID = ResourceAttributes.CLOUD_ACCOUNT_ID 

84CLOUD_REGION = ResourceAttributes.CLOUD_REGION 

85CLOUD_AVAILABILITY_ZONE = ResourceAttributes.CLOUD_AVAILABILITY_ZONE 

86CONTAINER_NAME = ResourceAttributes.CONTAINER_NAME 

87CONTAINER_ID = ResourceAttributes.CONTAINER_ID 

88CONTAINER_IMAGE_NAME = ResourceAttributes.CONTAINER_IMAGE_NAME 

89CONTAINER_IMAGE_TAG = ResourceAttributes.CONTAINER_IMAGE_TAG 

90DEPLOYMENT_ENVIRONMENT = ResourceAttributes.DEPLOYMENT_ENVIRONMENT 

91FAAS_NAME = ResourceAttributes.FAAS_NAME 

92FAAS_ID = ResourceAttributes.FAAS_ID 

93FAAS_VERSION = ResourceAttributes.FAAS_VERSION 

94FAAS_INSTANCE = ResourceAttributes.FAAS_INSTANCE 

95HOST_NAME = ResourceAttributes.HOST_NAME 

96HOST_TYPE = ResourceAttributes.HOST_TYPE 

97HOST_IMAGE_NAME = ResourceAttributes.HOST_IMAGE_NAME 

98HOST_IMAGE_ID = ResourceAttributes.HOST_IMAGE_ID 

99HOST_IMAGE_VERSION = ResourceAttributes.HOST_IMAGE_VERSION 

100KUBERNETES_CLUSTER_NAME = ResourceAttributes.K8S_CLUSTER_NAME 

101KUBERNETES_NAMESPACE_NAME = ResourceAttributes.K8S_NAMESPACE_NAME 

102KUBERNETES_POD_UID = ResourceAttributes.K8S_POD_UID 

103KUBERNETES_POD_NAME = ResourceAttributes.K8S_POD_NAME 

104KUBERNETES_CONTAINER_NAME = ResourceAttributes.K8S_CONTAINER_NAME 

105KUBERNETES_REPLICA_SET_UID = ResourceAttributes.K8S_REPLICASET_UID 

106KUBERNETES_REPLICA_SET_NAME = ResourceAttributes.K8S_REPLICASET_NAME 

107KUBERNETES_DEPLOYMENT_UID = ResourceAttributes.K8S_DEPLOYMENT_UID 

108KUBERNETES_DEPLOYMENT_NAME = ResourceAttributes.K8S_DEPLOYMENT_NAME 

109KUBERNETES_STATEFUL_SET_UID = ResourceAttributes.K8S_STATEFULSET_UID 

110KUBERNETES_STATEFUL_SET_NAME = ResourceAttributes.K8S_STATEFULSET_NAME 

111KUBERNETES_DAEMON_SET_UID = ResourceAttributes.K8S_DAEMONSET_UID 

112KUBERNETES_DAEMON_SET_NAME = ResourceAttributes.K8S_DAEMONSET_NAME 

113KUBERNETES_JOB_UID = ResourceAttributes.K8S_JOB_UID 

114KUBERNETES_JOB_NAME = ResourceAttributes.K8S_JOB_NAME 

115KUBERNETES_CRON_JOB_UID = ResourceAttributes.K8S_CRONJOB_UID 

116KUBERNETES_CRON_JOB_NAME = ResourceAttributes.K8S_CRONJOB_NAME 

117OS_TYPE = ResourceAttributes.OS_TYPE 

118OS_DESCRIPTION = ResourceAttributes.OS_DESCRIPTION 

119PROCESS_PID = ResourceAttributes.PROCESS_PID 

120PROCESS_EXECUTABLE_NAME = ResourceAttributes.PROCESS_EXECUTABLE_NAME 

121PROCESS_EXECUTABLE_PATH = ResourceAttributes.PROCESS_EXECUTABLE_PATH 

122PROCESS_COMMAND = ResourceAttributes.PROCESS_COMMAND 

123PROCESS_COMMAND_LINE = ResourceAttributes.PROCESS_COMMAND_LINE 

124PROCESS_COMMAND_ARGS = ResourceAttributes.PROCESS_COMMAND_ARGS 

125PROCESS_OWNER = ResourceAttributes.PROCESS_OWNER 

126PROCESS_RUNTIME_NAME = ResourceAttributes.PROCESS_RUNTIME_NAME 

127PROCESS_RUNTIME_VERSION = ResourceAttributes.PROCESS_RUNTIME_VERSION 

128PROCESS_RUNTIME_DESCRIPTION = ResourceAttributes.PROCESS_RUNTIME_DESCRIPTION 

129SERVICE_NAME = ResourceAttributes.SERVICE_NAME 

130SERVICE_NAMESPACE = ResourceAttributes.SERVICE_NAMESPACE 

131SERVICE_INSTANCE_ID = ResourceAttributes.SERVICE_INSTANCE_ID 

132SERVICE_VERSION = ResourceAttributes.SERVICE_VERSION 

133TELEMETRY_SDK_NAME = ResourceAttributes.TELEMETRY_SDK_NAME 

134TELEMETRY_SDK_VERSION = ResourceAttributes.TELEMETRY_SDK_VERSION 

135TELEMETRY_AUTO_VERSION = ResourceAttributes.TELEMETRY_AUTO_VERSION 

136TELEMETRY_SDK_LANGUAGE = ResourceAttributes.TELEMETRY_SDK_LANGUAGE 

137 

138 

139_OPENTELEMETRY_SDK_VERSION = pkg_resources.get_distribution( 

140 "opentelemetry-sdk" 

141).version 

142 

143 

144class Resource: 

145 """A Resource is an immutable representation of the entity producing telemetry as Attributes.""" 

146 

147 def __init__( 

148 self, attributes: Attributes, schema_url: typing.Optional[str] = None 

149 ): 

150 self._attributes = BoundedAttributes(attributes=attributes) 

151 if schema_url is None: 

152 schema_url = "" 

153 self._schema_url = schema_url 

154 

155 @staticmethod 

156 def create( 

157 attributes: typing.Optional[Attributes] = None, 

158 schema_url: typing.Optional[str] = None, 

159 ) -> "Resource": 

160 """Creates a new `Resource` from attributes. 

161 

162 Args: 

163 attributes: Optional zero or more key-value pairs. 

164 schema_url: Optional URL pointing to the schema 

165 

166 Returns: 

167 The newly-created Resource. 

168 """ 

169 if not attributes: 

170 attributes = {} 

171 resource = _DEFAULT_RESOURCE.merge( 

172 OTELResourceDetector().detect() 

173 ).merge(Resource(attributes, schema_url)) 

174 if not resource.attributes.get(SERVICE_NAME, None): 

175 default_service_name = "unknown_service" 

176 process_executable_name = resource.attributes.get( 

177 PROCESS_EXECUTABLE_NAME, None 

178 ) 

179 if process_executable_name: 

180 default_service_name += ":" + process_executable_name 

181 resource = resource.merge( 

182 Resource({SERVICE_NAME: default_service_name}, schema_url) 

183 ) 

184 return resource 

185 

186 @staticmethod 

187 def get_empty() -> "Resource": 

188 return _EMPTY_RESOURCE 

189 

190 @property 

191 def attributes(self) -> Attributes: 

192 return self._attributes 

193 

194 @property 

195 def schema_url(self) -> str: 

196 return self._schema_url 

197 

198 def merge(self, other: "Resource") -> "Resource": 

199 """Merges this resource and an updating resource into a new `Resource`. 

200 

201 If a key exists on both the old and updating resource, the value of the 

202 updating resource will override the old resource value. 

203 

204 The updating resource's `schema_url` will be used only if the old 

205 `schema_url` is empty. Attempting to merge two resources with 

206 different, non-empty values for `schema_url` will result in an error 

207 and return the old resource. 

208 

209 Args: 

210 other: The other resource to be merged. 

211 

212 Returns: 

213 The newly-created Resource. 

214 """ 

215 merged_attributes = self.attributes.copy() 

216 merged_attributes.update(other.attributes) 

217 

218 if self.schema_url == "": 

219 schema_url = other.schema_url 

220 elif other.schema_url == "": 

221 schema_url = self.schema_url 

222 elif self.schema_url == other.schema_url: 

223 schema_url = other.schema_url 

224 else: 

225 logger.error( 

226 "Failed to merge resources: The two schemas %s and %s are incompatible", 

227 self.schema_url, 

228 other.schema_url, 

229 ) 

230 return self 

231 

232 return Resource(merged_attributes, schema_url) 

233 

234 def __eq__(self, other: object) -> bool: 

235 if not isinstance(other, Resource): 

236 return False 

237 return ( 

238 self._attributes == other._attributes 

239 and self._schema_url == other._schema_url 

240 ) 

241 

242 def __hash__(self): 

243 return hash( 

244 f"{dumps(self._attributes.copy(), sort_keys=True)}|{self._schema_url}" 

245 ) 

246 

247 def to_json(self, indent=4) -> str: 

248 return dumps( 

249 { 

250 "attributes": dict(self._attributes), 

251 "schema_url": self._schema_url, 

252 }, 

253 indent=indent, 

254 ) 

255 

256 

257_EMPTY_RESOURCE = Resource({}) 

258_DEFAULT_RESOURCE = Resource( 

259 { 

260 TELEMETRY_SDK_LANGUAGE: "python", 

261 TELEMETRY_SDK_NAME: "opentelemetry", 

262 TELEMETRY_SDK_VERSION: _OPENTELEMETRY_SDK_VERSION, 

263 } 

264) 

265 

266 

267class ResourceDetector(abc.ABC): 

268 def __init__(self, raise_on_error=False): 

269 self.raise_on_error = raise_on_error 

270 

271 @abc.abstractmethod 

272 def detect(self) -> "Resource": 

273 raise NotImplementedError() 

274 

275 

276class OTELResourceDetector(ResourceDetector): 

277 # pylint: disable=no-self-use 

278 def detect(self) -> "Resource": 

279 env_resources_items = os.environ.get(OTEL_RESOURCE_ATTRIBUTES) 

280 env_resource_map = {} 

281 

282 if env_resources_items: 

283 for item in env_resources_items.split(","): 

284 try: 

285 key, value = item.split("=", maxsplit=1) 

286 except ValueError as exc: 

287 logger.warning( 

288 "Invalid key value resource attribute pair %s: %s", 

289 item, 

290 exc, 

291 ) 

292 continue 

293 value_url_decoded = parse.unquote(value.strip()) 

294 env_resource_map[key.strip()] = value_url_decoded 

295 

296 service_name = os.environ.get(OTEL_SERVICE_NAME) 

297 if service_name: 

298 env_resource_map[SERVICE_NAME] = service_name 

299 return Resource(env_resource_map) 

300 

301 

302class ProcessResourceDetector(ResourceDetector): 

303 # pylint: disable=no-self-use 

304 def detect(self) -> "Resource": 

305 _runtime_version = ".".join( 

306 map( 

307 str, 

308 sys.version_info[:3] 

309 if sys.version_info.releaselevel == "final" 

310 and not sys.version_info.serial 

311 else sys.version_info, 

312 ) 

313 ) 

314 

315 return Resource( 

316 { 

317 PROCESS_RUNTIME_DESCRIPTION: sys.version, 

318 PROCESS_RUNTIME_NAME: sys.implementation.name, 

319 PROCESS_RUNTIME_VERSION: _runtime_version, 

320 } 

321 ) 

322 

323 

324def get_aggregated_resources( 

325 detectors: typing.List["ResourceDetector"], 

326 initial_resource: typing.Optional[Resource] = None, 

327 timeout=5, 

328) -> "Resource": 

329 """Retrieves resources from detectors in the order that they were passed 

330 

331 :param detectors: List of resources in order of priority 

332 :param initial_resource: Static resource. This has highest priority 

333 :param timeout: Number of seconds to wait for each detector to return 

334 :return: 

335 """ 

336 detectors_merged_resource = initial_resource or Resource.create() 

337 

338 with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor: 

339 futures = [executor.submit(detector.detect) for detector in detectors] 

340 for detector_ind, future in enumerate(futures): 

341 detector = detectors[detector_ind] 

342 try: 

343 detected_resource = future.result(timeout=timeout) 

344 # pylint: disable=broad-except 

345 except Exception as ex: 

346 detected_resource = _EMPTY_RESOURCE 

347 if detector.raise_on_error: 

348 raise ex 

349 logger.warning( 

350 "Exception %s in detector %s, ignoring", ex, detector 

351 ) 

352 finally: 

353 detectors_merged_resource = detectors_merged_resource.merge( 

354 detected_resource 

355 ) 

356 

357 return detectors_merged_resource