Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/opencensus/common/utils/__init__.py: 56%

52 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2023-12-06 06:04 +0000

1# Copyright 2018, OpenCensus Authors 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15try: 

16 from weakref import WeakMethod 

17except ImportError: 

18 from opencensus.common.backports import WeakMethod 

19 

20import calendar 

21import datetime 

22import weakref 

23 

24UTF8 = 'utf-8' 

25 

26# Max length is 128 bytes for a truncatable string. 

27MAX_LENGTH = 128 

28 

29ISO_DATETIME_REGEX = '%Y-%m-%dT%H:%M:%S.%fZ' 

30 

31 

32def get_truncatable_str(str_to_convert): 

33 """Truncate a string if exceed limit and record the truncated bytes 

34 count. 

35 """ 

36 truncated, truncated_byte_count = check_str_length( 

37 str_to_convert, MAX_LENGTH) 

38 

39 result = { 

40 'value': truncated, 

41 'truncated_byte_count': truncated_byte_count, 

42 } 

43 return result 

44 

45 

46def check_str_length(str_to_check, limit=MAX_LENGTH): 

47 """Check the length of a string. If exceeds limit, then truncate it. 

48 

49 :type str_to_check: str 

50 :param str_to_check: String to check. 

51 

52 :type limit: int 

53 :param limit: The upper limit of the length. 

54 

55 :rtype: tuple 

56 :returns: The string it self if not exceeded length, or truncated string 

57 if exceeded and the truncated byte count. 

58 """ 

59 str_bytes = str_to_check.encode(UTF8) 

60 str_len = len(str_bytes) 

61 truncated_byte_count = 0 

62 

63 if str_len > limit: 

64 truncated_byte_count = str_len - limit 

65 str_bytes = str_bytes[:limit] 

66 

67 result = str(str_bytes.decode(UTF8, errors='ignore')) 

68 

69 return (result, truncated_byte_count) 

70 

71 

72def to_iso_str(ts=None): 

73 """Get an ISO 8601 string for a UTC datetime.""" 

74 if ts is None: 

75 ts = datetime.datetime.utcnow() 

76 return ts.strftime("%Y-%m-%dT%H:%M:%S.%fZ") 

77 

78 

79def timestamp_to_microseconds(timestamp): 

80 """Convert a timestamp string into a microseconds value 

81 :param timestamp 

82 :return time in microseconds 

83 """ 

84 timestamp_str = datetime.datetime.strptime(timestamp, ISO_DATETIME_REGEX) 

85 epoch_time_secs = calendar.timegm(timestamp_str.timetuple()) 

86 epoch_time_mus = epoch_time_secs * 1e6 + timestamp_str.microsecond 

87 return epoch_time_mus 

88 

89 

90def iuniq(ible): 

91 """Get an iterator over unique items of `ible`.""" 

92 items = set() 

93 for item in ible: 

94 if item not in items: 

95 items.add(item) 

96 yield item 

97 

98 

99def uniq(ible): 

100 """Get a list of unique items of `ible`.""" 

101 return list(iuniq(ible)) 

102 

103 

104def window(ible, length): 

105 """Split `ible` into multiple lists of length `length`. 

106 

107 >>> list(window(range(5), 2)) 

108 [[0, 1], [2, 3], [4]] 

109 """ 

110 if length <= 0: # pragma: NO COVER 

111 raise ValueError 

112 ible = iter(ible) 

113 while True: 

114 elts = [xx for ii, xx in zip(range(length), ible)] 

115 if elts: 

116 yield elts 

117 else: 

118 break 

119 

120 

121def get_weakref(func): 

122 """Get a weak reference to bound or unbound `func`. 

123 

124 If `func` is unbound (i.e. has no __self__ attr) get a weakref.ref, 

125 otherwise get a wrapper that simulates weakref.ref. 

126 """ 

127 if func is None: 

128 raise ValueError 

129 if not hasattr(func, '__self__'): 

130 return weakref.ref(func) 

131 return WeakMethod(func)