1# Copyright The OpenTelemetry Authors
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15import logging
16import threading
17from collections import OrderedDict
18from collections.abc import MutableMapping
19from typing import Optional, Sequence, Tuple, Union
20
21from opentelemetry.util import types
22
# Scalar types accepted for an attribute value (and for each element of a
# sequence-valued attribute). bytes are accepted as a user supplied value
# for attributes but decoded to strings internally.
_VALID_ATTR_VALUE_TYPES = (bool, str, bytes, int, float)


# Module-level logger; the cleaning helpers below use it to warn about
# rejected keys and values instead of raising.
_logger = logging.getLogger(__name__)
29
30
def _clean_attribute(
    key: str, value: types.AttributeValue, max_len: Optional[int]
) -> Optional[Union[types.AttributeValue, Tuple[Union[str, int, float], ...]]]:
    """Validate an attribute and return its cleaned value.

    Returns the cleaned value, or ``None`` (after logging a warning) when the
    key or the value is not acceptable.

    Accepted values are:
        - A single primitive: one of the types in ``_VALID_ATTR_VALUE_TYPES``.
        - A sequence of primitives that is homogeneous, i.e. every non-``None``
          element has exactly the same type as the first non-``None`` element.
          The result is returned as a tuple.

    Cleaning consists of:
        - Decoding bytes to strings.
        - Truncating strings to at most ``max_len`` characters when
          ``max_len`` is not ``None``.

    Args:
        key: The attribute name; must be a non-empty string.
        value: The user supplied attribute value.
        max_len: Maximum length for string values, or ``None`` for no limit.
    """

    if not key or not isinstance(key, str):
        _logger.warning("invalid key `%s`. must be non-empty string.", key)
        return None

    # Scalar values are handled directly by the value cleaner.
    if isinstance(value, _VALID_ATTR_VALUE_TYPES):
        return _clean_attribute_value(value, max_len)

    # Anything that is neither a valid scalar nor a sequence is rejected.
    if not isinstance(value, Sequence):
        _logger.warning(
            "Invalid type %s for attribute '%s' value. Expected one of %s or a "
            "sequence of those types",
            type(value).__name__,
            key,
            [allowed.__name__ for allowed in _VALID_ATTR_VALUE_TYPES],
        )
        return None

    expected_type = None
    cleaned_items = []

    for raw_item in value:
        item = _clean_attribute_value(raw_item, max_len)  # type: ignore

        # None elements (including bytes that could not be decoded) are kept
        # as-is and do not take part in the type checks below.
        if item is not None:
            item_type = type(item)

            # Reject the whole value if an element has an unsupported type.
            if item_type not in _VALID_ATTR_VALUE_TYPES:
                _logger.warning(
                    "Invalid type %s in attribute '%s' value sequence. Expected one of "
                    "%s or None",
                    item_type.__name__,
                    key,
                    [allowed.__name__ for allowed in _VALID_ATTR_VALUE_TYPES],
                )
                return None

            if expected_type is None:
                # The first non-None element fixes the type of the sequence.
                expected_type = item_type
            elif item_type != expected_type:
                # Exact type comparison on purpose: isinstance(True, int) is
                # True, which would let bools and ints mix.
                _logger.warning(
                    "Attribute %r mixes types %s and %s in attribute value sequence",
                    key,
                    expected_type.__name__,
                    item_type.__name__,
                )
                return None

        cleaned_items.append(item)

    # Return an immutable copy so later changes to the caller's sequence
    # cannot affect the stored value.
    return tuple(cleaned_items)
108
109
def _clean_attribute_value(
    value: types.AttributeValue, limit: Optional[int]
) -> Optional[types.AttributeValue]:
    """Normalize a single (non-sequence) attribute value.

    Bytes are decoded to a string and strings are cut to ``limit`` characters
    when ``limit`` is not ``None``. Every other value is returned unchanged.

    Returns ``None`` when ``value`` is ``None`` or when a bytes value cannot
    be decoded (a warning is logged in the latter case).
    """
    if value is None:
        return None

    cleaned = value
    if isinstance(cleaned, bytes):
        try:
            cleaned = cleaned.decode()
        except UnicodeDecodeError:
            _logger.warning("Byte attribute could not be decoded.")
            return None

    # Only strings (including freshly decoded bytes) are subject to the limit.
    if isinstance(cleaned, str) and limit is not None:
        return cleaned[:limit]
    return cleaned
126
127
class BoundedAttributes(MutableMapping):  # type: ignore
    """A mapping of attributes with an optional maximum number of entries.

    Values are passed through ``_clean_attribute`` before being stored;
    entries whose key or value is rejected are not stored. When the mapping
    already holds ``maxlen`` entries and a new key is added, the oldest entry
    is removed and ``dropped`` is incremented.
    """

    def __init__(
        self,
        maxlen: Optional[int] = None,
        attributes: types.Attributes = None,
        immutable: bool = True,
        max_value_len: Optional[int] = None,
    ):
        """Create the mapping and populate it from ``attributes``.

        Args:
            maxlen: Maximum number of entries, or ``None`` for no limit.
            attributes: Initial attributes; each one is cleaned on insertion.
            immutable: When true, item assignment and deletion raise
                ``TypeError`` once construction has finished.
            max_value_len: Maximum length applied to string values.

        Raises:
            ValueError: If ``maxlen`` is neither ``None`` nor an int >= 0.
        """
        if maxlen is not None and (not isinstance(maxlen, int) or maxlen < 0):
            raise ValueError(
                "maxlen must be valid int greater or equal to 0"
            )
        self.maxlen = maxlen
        self.dropped = 0
        self.max_value_len = max_value_len

        # Start with a plain dict; it is only converted to an OrderedDict the
        # first time an entry has to be evicted (see __setitem__).
        self._dict: Union[
            MutableMapping[str, types.AttributeValue],
            OrderedDict[str, types.AttributeValue],
        ] = {}
        self._lock = threading.RLock()

        # _immutable is assigned only after the initial attributes have been
        # inserted, so that __setitem__ accepts them.
        if attributes:
            for attr_key, attr_value in attributes.items():
                self[attr_key] = attr_value
        self._immutable = immutable

    def __repr__(self) -> str:
        return str(dict(self._dict))

    def __getitem__(self, key: str) -> types.AttributeValue:
        return self._dict[key]

    def __setitem__(self, key: str, value: types.AttributeValue) -> None:
        """Clean and store ``value`` under ``key``.

        Raises:
            TypeError: If the mapping is immutable.
        """
        if getattr(self, "_immutable", False):  # type: ignore
            raise TypeError

        with self._lock:
            # With a capacity of zero nothing is ever stored; every attempt
            # counts as a dropped attribute.
            if self.maxlen == 0:
                self.dropped += 1
                return

            cleaned = _clean_attribute(key, value, self.max_value_len)  # type: ignore
            if cleaned is None:
                # Rejected key/value: nothing is stored.
                return

            if key in self._dict:
                # Remove first so that re-inserting moves the key to the end.
                del self._dict[key]
            elif self.maxlen is not None and len(self._dict) == self.maxlen:
                # Full: evict the oldest entry to make room for the new key.
                if not isinstance(self._dict, OrderedDict):
                    self._dict = OrderedDict(self._dict)
                self._dict.popitem(last=False)  # type: ignore
                self.dropped += 1

            self._dict[key] = cleaned  # type: ignore

    def __delitem__(self, key: str) -> None:
        """Remove ``key``.

        Raises:
            TypeError: If the mapping is immutable.
            KeyError: If ``key`` is not present.
        """
        if getattr(self, "_immutable", False):  # type: ignore
            raise TypeError

        with self._lock:
            del self._dict[key]

    def __iter__(self):  # type: ignore
        # Iterate over a snapshot taken under the lock rather than over the
        # live dictionary.
        with self._lock:
            snapshot = self._dict.copy()  # type: ignore
        return iter(snapshot)

    def __len__(self) -> int:
        return len(self._dict)

    def copy(self):  # type: ignore
        """Return a shallow copy of the underlying dictionary."""
        return self._dict.copy()  # type: ignore