1# Copyright The OpenTelemetry Authors
2# SPDX-License-Identifier: Apache-2.0
3
4import copy
5import logging
6import threading
7from collections import OrderedDict
8from collections.abc import Mapping, MutableMapping, Sequence
9
10from opentelemetry.util import types
11
12# bytes are accepted as a user supplied value for attributes but
13# decoded to strings internally.
14_VALID_ATTR_VALUE_TYPES = (bool, str, bytes, int, float)
15# AnyValue possible values
16_VALID_ANY_VALUE_TYPES = (
17 type(None),
18 bool,
19 bytes,
20 int,
21 float,
22 str,
23 Sequence,
24 Mapping,
25)
26
27
28# TODO: Remove this workaround and revert to the simpler implementation
29# once Python 3.9 support is dropped (planned around May 2026).
30# This exists only to avoid issues caused by deprecated behavior in 3.9.
31def _type_name(t):
32 return getattr(t, "__name__", getattr(t, "_name", repr(t)))
33
34
35_logger = logging.getLogger(__name__)
36
37
def _clean_attribute(
    key: str, value: types.AttributeValue, max_len: int | None
) -> types.AttributeValue | tuple[str | int | float, ...] | None:
    """Validate an attribute and return its cleaned value.

    Returns the cleaned value, or None when the key or the value is not
    valid.

    A value is valid when it is either:
        - A primitive: string, boolean, double precision floating point
          (IEEE 754-1985) or integer.
        - An array of primitives. The array MUST be homogeneous, i.e. it
          MUST NOT mix values of different types.

    Cleaning means:
        - Truncating strings longer than the maximum allowed length.
        - Encoding/decoding where needed, e.g. bytes to strings.
    """

    if not key or not isinstance(key, str):
        _logger.warning("invalid key `%s`. must be non-empty string.", key)
        return None

    # Primitive values are handled by the scalar cleaner directly.
    if isinstance(value, _VALID_ATTR_VALUE_TYPES):
        return _clean_attribute_value(value, max_len)

    if not isinstance(value, Sequence):
        _logger.warning(
            "Invalid type %s for attribute '%s' value. Expected one of %s or a "
            "sequence of those types",
            type(value).__name__,
            key,
            [allowed.__name__ for allowed in _VALID_ATTR_VALUE_TYPES],
        )
        return None

    expected_type = None
    items = []

    for raw_item in value:
        item = _clean_attribute_value(raw_item, max_len)  # type: ignore
        if item is not None:
            item_type = type(item)
            # A single element of an unsupported type rejects the whole value.
            if item_type not in _VALID_ATTR_VALUE_TYPES:
                _logger.warning(
                    "Invalid type %s in attribute '%s' value sequence. Expected one of "
                    "%s or None",
                    item_type.__name__,
                    key,
                    [allowed.__name__ for allowed in _VALID_ATTR_VALUE_TYPES],
                )
                return None

            # The first non-None element fixes the type of the sequence.
            if expected_type is None:
                expected_type = item_type
            # Compare exact types: isinstance(True, int) is True, so
            # isinstance would let bools and ints mix.
            elif item_type != expected_type:
                _logger.warning(
                    "Attribute %r mixes types %s and %s in attribute value sequence",
                    key,
                    expected_type.__name__,
                    item_type.__name__,
                )
                return None

        # None elements are kept in place.
        items.append(item)

    # Return an immutable copy so later changes to the input do not leak in.
    return tuple(items)
115
116
def _clean_extended_attribute_value(  # pylint: disable=too-many-branches
    value: types.AnyValue, max_len: int | None
) -> types.AnyValue:
    """Clean ``value`` so it can be stored as an extended (AnyValue) attribute.

    Handling depends on the kind of value:

    - ``None`` and primitives (bool, str, bytes, int, float) are returned
      unchanged, except that strings are truncated to ``max_len``.
    - Mappings are rebuilt as a plain ``dict``. Entries whose key is not a
      non-empty string are skipped with a warning; the remaining values are
      cleaned through ``_clean_extended_attribute``.
    - Other sequences are returned as a tuple. ``None`` elements are kept,
      strings are truncated and non-primitive elements are cleaned
      recursively. All non-``None`` elements must share one type, otherwise
      a warning is logged and ``None`` is returned.
    - Any other object is converted with ``str()`` and truncated like a
      regular string.

    Args:
        value: The value to clean.
        max_len: Maximum length of string values, ``None`` for no limit.

    Returns:
        The cleaned value, or ``None`` for a sequence of mixed types.

    Raises:
        TypeError: If ``value`` has an unsupported type and converting it to
            a string fails. The conversion error is attached as the cause.
    """
    # for primitive types just return the value and eventually shorten the string length
    if value is None or isinstance(value, _VALID_ATTR_VALUE_TYPES):
        if max_len is not None and isinstance(value, str):
            value = value[:max_len]
        return value

    if isinstance(value, Mapping):
        cleaned_dict: dict[str, types.AnyValue] = {}
        for key, element in value.items():
            # skip invalid keys
            if not (key and isinstance(key, str)):
                _logger.warning(
                    "invalid key `%s`. must be non-empty string.", key
                )
                continue

            cleaned_dict[key] = _clean_extended_attribute(
                key=key, value=element, max_len=max_len
            )

        return cleaned_dict

    if isinstance(value, Sequence):
        sequence_first_valid_type = None
        cleaned_seq: list[types.AnyValue] = []

        for element in value:
            if element is None:
                cleaned_seq.append(element)
                continue

            if max_len is not None and isinstance(element, str):
                element = element[:max_len]

            element_type = type(element)
            if element_type not in _VALID_ATTR_VALUE_TYPES:
                # Nested mappings, sequences and arbitrary objects are cleaned
                # recursively; the homogeneity check uses the cleaned type.
                element = _clean_extended_attribute_value(
                    element, max_len=max_len
                )
                element_type = type(element)  # type: ignore

            # The type of the sequence must be homogeneous. The first non-None
            # element determines the type of the sequence
            if sequence_first_valid_type is None:
                sequence_first_valid_type = element_type
            # use equality instead of isinstance as isinstance(True, int) evaluates to True
            elif element_type != sequence_first_valid_type:
                _logger.warning(
                    "Mixed types %s and %s in attribute value sequence",
                    sequence_first_valid_type.__name__,
                    type(element).__name__,
                )
                return None

            cleaned_seq.append(element)

        # Freeze mutable sequences defensively
        return tuple(cleaned_seq)

    # Some applications such as Django add values to log records whose types fall outside the
    # primitive types and `_VALID_ANY_VALUE_TYPES`, i.e., they are not of type `AnyValue`.
    # Rather than attempt to whitelist every possible instrumentation, we stringify those values here
    # so they can still be represented as attributes, falling back to the original TypeError only if
    # converting to string raises.
    try:
        text = str(value)
    except Exception as exc:
        # Chain explicitly so the real conversion failure stays visible.
        raise TypeError(
            f"Invalid type {type(value).__name__} for attribute value. "
            f"Expected one of {[_type_name(valid_type) for valid_type in _VALID_ANY_VALUE_TYPES]} or a "
            "sequence of those types",
        ) from exc

    # The stringified form is stored as a string value, so it must respect
    # the same length limit as any other string.
    if max_len is not None:
        text = text[:max_len]
    return text
192
193
def _clean_extended_attribute(
    key: str, value: types.AnyValue, max_len: int | None
) -> types.AnyValue:
    """Validate an extended attribute and return its cleaned value.

    Returns the cleaned value, or None when the key is not a non-empty
    string or the value cannot be cleaned.

    Any AnyValue is a valid attribute value. Cleaning shortens values whose
    length exceeds the maximum allowed length.
    """

    if not key or not isinstance(key, str):
        _logger.warning("invalid key `%s`. must be non-empty string.", key)
        return None

    try:
        cleaned = _clean_extended_attribute_value(value, max_len=max_len)
    except TypeError as exception:
        # An unsupported value is reported and dropped instead of raising.
        _logger.warning("Attribute %s: %s", key, exception)
        cleaned = None
    return cleaned
215
216
217def _clean_attribute_value(
218 value: types.AttributeValue, limit: int | None
219) -> types.AttributeValue | None:
220 if value is None:
221 return None
222
223 if isinstance(value, bytes):
224 try:
225 value = value.decode()
226 except UnicodeDecodeError:
227 _logger.warning("Byte attribute could not be decoded.")
228 return None
229
230 if limit is not None and isinstance(value, str):
231 value = value[:limit]
232 return value
233
234
class BoundedAttributes(MutableMapping):  # type: ignore
    """A mapping of attributes that holds at most ``maxlen`` entries.

    When the mapping is full and a new key is added, the oldest entry is
    evicted and ``dropped`` is incremented. Values are cleaned on insertion;
    values that clean to None are not stored.
    """

    def __init__(
        self,
        maxlen: int | None = None,
        attributes: types._ExtendedAttributes | None = None,
        immutable: bool = True,
        max_value_len: int | None = None,
        extended_attributes: bool = False,
    ):
        if maxlen is not None and (not isinstance(maxlen, int) or maxlen < 0):
            raise ValueError("maxlen must be valid int greater or equal to 0")
        self.maxlen = maxlen
        self.dropped = 0
        self.max_value_len = max_value_len
        self._extended_attributes = extended_attributes

        # Start with a plain dict for efficiency; it is only swapped for an
        # OrderedDict once maxlen is reached.
        self._dict: (
            MutableMapping[str, types.AnyValue]
            | OrderedDict[str, types.AnyValue]
        ) = {}
        self._lock = threading.RLock()
        if attributes:
            for name, initial in attributes.items():
                self[name] = initial
        # Set last so the initial attributes above can still be inserted.
        self._immutable = immutable

    def __repr__(self) -> str:
        return str(dict(self._dict))

    def __getitem__(self, key: str) -> types.AnyValue:
        return self._dict[key]

    def __setitem__(self, key: str, value: types.AnyValue) -> None:
        if getattr(self, "_immutable", False):  # type: ignore
            raise TypeError
        with self._lock:
            # With zero capacity every insertion counts as dropped.
            if self.maxlen == 0:
                self.dropped += 1
                return

            cleaner = (
                _clean_extended_attribute
                if self._extended_attributes
                else _clean_attribute
            )
            cleaned = cleaner(key, value, self.max_value_len)  # type: ignore
            if cleaned is None:
                return

            if key in self._dict:
                # Remove first so the re-inserted key becomes the newest.
                del self._dict[key]
            elif self.maxlen is not None and len(self._dict) == self.maxlen:
                if not isinstance(self._dict, OrderedDict):
                    self._dict = OrderedDict(self._dict)
                # Evict the oldest entry to make room.
                self._dict.popitem(last=False)  # type: ignore
                self.dropped += 1

            self._dict[key] = cleaned  # type: ignore

    def __delitem__(self, key: str) -> None:
        if getattr(self, "_immutable", False):  # type: ignore
            raise TypeError
        with self._lock:
            del self._dict[key]

    def __iter__(self):  # type: ignore
        with self._lock:
            # Iterate over a snapshot taken while holding the lock.
            snapshot = self._dict.copy()  # type: ignore
            return iter(snapshot)

    def __len__(self) -> int:
        return len(self._dict)

    def __deepcopy__(self, memo: dict) -> "BoundedAttributes":
        clone = BoundedAttributes(
            maxlen=self.maxlen,
            immutable=self._immutable,
            max_value_len=self.max_value_len,
            extended_attributes=self._extended_attributes,
        )
        memo[id(self)] = clone
        with self._lock:
            # Assigning _dict directly skips re-cleaning values that are
            # already clean and bypasses the immutability guard in
            # __setitem__.
            clone._dict = copy.deepcopy(self._dict, memo)
            clone.dropped = self.dropped
        return clone

    def copy(self):  # type: ignore
        return self._dict.copy()  # type: ignore