Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/opencensus/common/utils/__init__.py: 56%
52 statements
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-06 06:04 +0000
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-06 06:04 +0000
1# Copyright 2018, OpenCensus Authors
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
15try:
16 from weakref import WeakMethod
17except ImportError:
18 from opencensus.common.backports import WeakMethod
20import calendar
21import datetime
22import weakref
# Encoding used when measuring and truncating strings by byte length.
UTF8 = 'utf-8'

# Max length is 128 bytes for a truncatable string.
MAX_LENGTH = 128

# NOTE: despite the name, this is a strptime/strftime format string, not a
# regular expression.
ISO_DATETIME_REGEX = '%Y-%m-%dT%H:%M:%S.%fZ'
def get_truncatable_str(str_to_convert):
    """Build the truncatable-string mapping for ``str_to_convert``.

    The string is limited to ``MAX_LENGTH`` UTF-8 bytes via
    ``check_str_length``.

    :type str_to_convert: str
    :param str_to_convert: String to convert.

    :rtype: dict
    :returns: A dict with the (possibly shortened) text under ``'value'``
              and the reported number of truncated bytes under
              ``'truncated_byte_count'``.
    """
    value, dropped_bytes = check_str_length(str_to_convert, MAX_LENGTH)
    return {'value': value, 'truncated_byte_count': dropped_bytes}
def check_str_length(str_to_check, limit=MAX_LENGTH):
    """Limit a string to ``limit`` UTF-8 encoded bytes.

    :type str_to_check: str
    :param str_to_check: String to check.

    :type limit: int
    :param limit: The upper limit of the encoded length, in bytes.

    :rtype: tuple
    :returns: ``(text, truncated_byte_count)``. ``text`` is the original
              string when its encoded length is within ``limit``, otherwise
              the truncated string. ``truncated_byte_count`` is the encoded
              length minus ``limit`` (0 when nothing was cut).
    """
    encoded = str_to_check.encode(UTF8)
    overflow = len(encoded) - limit

    if overflow > 0:
        encoded = encoded[:limit]
    else:
        overflow = 0

    # errors='ignore' silently drops a multi-byte character that the byte
    # slice above may have split in half; those bytes are not added to the
    # reported count.
    text = str(encoded.decode(UTF8, errors='ignore'))
    return (text, overflow)
def to_iso_str(ts=None):
    """Get an ISO 8601 string for a UTC datetime.

    :type ts: :class:`datetime.datetime`
    :param ts: The time to format. Defaults to the current UTC time. A
               naive value is formatted as-is (it is taken to already be
               UTC); a timezone-aware value is converted to UTC first.

    :rtype: str
    :returns: The time formatted as ``YYYY-MM-DDTHH:MM:SS.ffffffZ``.
    """
    if ts is None:
        ts = datetime.datetime.utcnow()
    elif isinstance(ts, datetime.datetime):
        # The trailing 'Z' claims UTC, so an aware datetime with a non-zero
        # offset must be shifted to UTC or the string names a wrong instant.
        offset = ts.utcoffset()
        if offset:
            ts = (ts - offset).replace(tzinfo=None)
    return ts.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
def timestamp_to_microseconds(timestamp):
    """Convert a timestamp string into microseconds since the epoch.

    :type timestamp: str
    :param timestamp: Timestamp in the ``ISO_DATETIME_REGEX`` format.

    :rtype: float
    :returns: Time in microseconds (a float, because the whole seconds are
              scaled by ``1e6``).
    """
    parsed = datetime.datetime.strptime(timestamp, ISO_DATETIME_REGEX)
    # timegm() interprets the time tuple as UTC and yields whole seconds;
    # the sub-second part is added back from the parsed value.
    whole_seconds = calendar.timegm(parsed.timetuple())
    return whole_seconds * 1e6 + parsed.microsecond
def iuniq(ible):
    """Lazily yield each distinct item of `ible`, in first-seen order.

    Items are tracked in a set, so they must be hashable.
    """
    seen = set()
    for candidate in ible:
        if candidate in seen:
            continue
        seen.add(candidate)
        yield candidate
def uniq(ible):
    """Return the distinct items of `ible` as a list.

    This is the eager counterpart of `iuniq`.
    """
    distinct_items = iuniq(ible)
    return list(distinct_items)
def window(ible, length):
    """Split `ible` into multiple lists of length `length`.

    The final list may be shorter when the items do not divide evenly.
    This is a generator, so the argument check runs on first iteration.

    >>> list(window(range(5), 2))
    [[0, 1], [2, 3], [4]]

    :param ible: Any iterable.
    :param length: Maximum size of each yielded list; must be positive.
    :raises ValueError: If `length` is not positive.
    """
    import itertools

    if length <= 0:  # pragma: NO COVER
        raise ValueError(
            'length must be a positive integer, got {!r}'.format(length))
    iterator = iter(ible)
    while True:
        # islice takes at most `length` items and never reads past them, so
        # nothing is lost between consecutive chunks.
        chunk = list(itertools.islice(iterator, length))
        if not chunk:
            return
        yield chunk
def get_weakref(func):
    """Get a weak reference to bound or unbound `func`.

    A callable without a ``__self__`` attribute is treated as unbound and
    wrapped in a plain ``weakref.ref``; anything else is wrapped in
    ``WeakMethod``, which simulates ``weakref.ref`` for bound methods.

    :raises ValueError: If `func` is None.
    """
    if func is None:
        raise ValueError
    is_bound = hasattr(func, '__self__')
    return WeakMethod(func) if is_bound else weakref.ref(func)