Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/prometheus_client/mmap_dict.py: 26%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

91 statements  

1import json 

2import mmap 

3import os 

4import struct 

5from typing import List 

6 

# Size in bytes (64 KiB) that a brand-new, empty backing file is grown to.
_INITIAL_MMAP_SIZE = 2 ** 16

# Compile each struct format once and reuse the bound methods below.  Neither
# format has a byte-order prefix, so the struct module's native mode is used.
_INT_STRUCT = struct.Struct(b'i')
_TWO_DOUBLES_STRUCT = struct.Struct(b'dd')

# Packers return a new bytes object; unpackers read from a buffer at an offset.
_pack_integer_func = _INT_STRUCT.pack
_pack_two_doubles_func = _TWO_DOUBLES_STRUCT.pack
_unpack_integer = _INT_STRUCT.unpack_from
_unpack_two_doubles = _TWO_DOUBLES_STRUCT.unpack_from

12 

13 

14# struct.pack_into has atomicity issues because it will temporarily write 0 into 

15# the mmap, resulting in false reads to 0 when experiencing a lot of writes. 

16# Using direct assignment solves this issue. 

17 

18 

def _pack_two_doubles(data, pos, value, timestamp):
    """Store ``value`` and ``timestamp`` as two doubles in ``data[pos:pos + 16]``.

    The bytes are built first and then written with a single slice
    assignment instead of ``struct.pack_into``.
    """
    packed = _pack_two_doubles_func(value, timestamp)
    end = pos + 16
    data[pos:end] = packed

21 

22 

def _pack_integer(data, pos, value):
    """Store ``value`` as an int in ``data[pos:pos + 4]``.

    The bytes are built first and then written with a single slice
    assignment instead of ``struct.pack_into``.
    """
    packed = _pack_integer_func(value)
    end = pos + 4
    data[pos:end] = packed

25 

26 

def _read_all_values(data, used=0):
    """Yield (key, value, timestamp, pos). No locking is performed.

    ``data`` is a buffer (an mmap or a bytes object) laid out as: a 4 byte
    int holding the number of bytes in use, 4 bytes of padding, then the
    records.  Each record is a 4 byte key length, the utf-8 key, padding,
    and two 8 byte floats (value, timestamp).

    ``used`` is the number of bytes of ``data`` that hold valid content; when
    it is not positive it is read from the first 4 bytes of ``data``.

    The yielded ``pos`` is the offset of the record's value inside ``data``.

    Raises RuntimeError if a record has a negative key length or does not fit
    entirely inside the first ``used`` bytes.
    """

    if used <= 0:
        # If not valid `used` value is passed in, read it from the file.
        used = _unpack_integer(data, 0)[0]

    pos = 8  # Skip the 4 byte header and its 4 bytes of padding.

    while pos < used:
        encoded_len = _unpack_integer(data, pos)[0]
        key_start = pos + 4
        # Same formula the writer uses: between 1 and 8 padding bytes, so that
        # the length field plus the padded key is a multiple of 8 bytes.
        padded_len = encoded_len + (8 - (encoded_len + 4) % 8)
        value_pos = key_start + padded_len
        # Check we are not reading beyond bounds.  The whole record (length
        # field, padded key and both doubles) must fit, and a negative length
        # must be rejected because it would move `pos` backwards.
        if encoded_len < 0 or value_pos + 16 > used:
            raise RuntimeError('Read beyond file size detected, file is corrupted.')
        encoded_key = data[key_start:key_start + encoded_len]
        value, timestamp = _unpack_two_doubles(data, value_pos)
        yield encoded_key.decode('utf-8'), value, timestamp, value_pos
        pos = value_pos + 16

48 

49 

class MmapedDict:
    """A dict of doubles, backed by an mmapped file.

    The file starts with a 4 byte int, indicating how much of it is used.
    Then 4 bytes of padding.
    There's then a number of entries, consisting of a 4 byte int which is the
    size of the next field, a utf-8 encoded string key, padding to a 8 byte
    alignment, and then a 8 byte float which is the value and a 8 byte float
    which is a UNIX timestamp in seconds.

    Not thread safe.
    """

    def __init__(self, filename, read_mode=False):
        """Open ``filename`` and map it into memory.

        In write mode (the default) the file is opened with ``'a+b'``, an
        empty file is grown to ``_INITIAL_MMAP_SIZE`` bytes, and the offsets
        of all existing keys are indexed.  In read mode the file is opened
        with ``'rb'``, mapped read-only, and no key index is built.

        If any step after opening the file fails, the file and the mapping
        are closed before the exception is re-raised.
        """
        self._fname = filename
        self._positions = {}
        self._m = None
        self._f = open(filename, 'rb' if read_mode else 'a+b')
        try:
            capacity = os.fstat(self._f.fileno()).st_size
            if capacity == 0:
                self._f.truncate(_INITIAL_MMAP_SIZE)
                capacity = _INITIAL_MMAP_SIZE
            self._capacity = capacity
            self._m = mmap.mmap(self._f.fileno(), self._capacity,
                                access=mmap.ACCESS_READ if read_mode else mmap.ACCESS_WRITE)

            self._used = _unpack_integer(self._m, 0)[0]
            if self._used == 0:
                # Nothing written yet: content starts after the 8 byte header.
                self._used = 8
                if not read_mode:
                    # A read-only mapping cannot be written to, so the header
                    # is only persisted when the file is opened for writing.
                    _pack_integer(self._m, 0, self._used)
            elif not read_mode:
                for key, _, _, pos in self._read_all_values():
                    self._positions[key] = pos
        except BaseException:
            # Do not leak the file descriptor or the mapping on failure.
            self.close()
            raise

    @staticmethod
    def read_all_values_from_file(filename):
        """Return an iterator of (key, value, timestamp, pos) for ``filename``.

        The used part of the file is copied into memory with plain reads (no
        mmap), so the returned generator no longer needs the file.
        """
        with open(filename, 'rb') as infp:
            # Read the first block of data, including the first 4 bytes which tell us
            # how much of the file (which is preallocated to _INITIAL_MMAP_SIZE bytes) is occupied.
            data = infp.read(mmap.PAGESIZE)
            used = _unpack_integer(data, 0)[0]
            if used > len(data):  # Then read in the rest, if needed.
                data += infp.read(used - len(data))
            return _read_all_values(data, used)

    def _init_value(self, key):
        """Initialize a value. Lock must be held by caller.

        Appends a record for ``key`` with value 0.0 and timestamp 0.0, doubling
        the file and remapping it as often as needed to make room, then
        records the offset of the new value in ``self._positions``.
        """
        encoded = key.encode('utf-8')
        # Pad to be 8-byte aligned.  This always adds between 1 and 8 spaces;
        # the reader relies on the same formula, so it must not change.
        padded = encoded + (b' ' * (8 - (len(encoded) + 4) % 8))
        value = struct.pack(f'i{len(padded)}sdd'.encode(), len(encoded), padded, 0.0, 0.0)
        while self._used + len(value) > self._capacity:
            self._capacity *= 2
            self._f.truncate(self._capacity)
            self._m = mmap.mmap(self._f.fileno(), self._capacity)
        self._m[self._used:self._used + len(value)] = value

        # Update how much space we've used.
        self._used += len(value)
        _pack_integer(self._m, 0, self._used)
        # The two doubles are the last 16 bytes of the record just written.
        self._positions[key] = self._used - 16

    def _read_all_values(self):
        """Yield (key, value, timestamp, pos). No locking is performed."""
        return _read_all_values(data=self._m, used=self._used)

    def read_all_values(self):
        """Yield (key, value, timestamp). No locking is performed."""
        for k, v, ts, _ in self._read_all_values():
            yield k, v, ts

    def read_value(self, key):
        """Return ``(value, timestamp)`` for ``key``, creating the entry if it is not indexed."""
        if key not in self._positions:
            self._init_value(key)
        pos = self._positions[key]
        return _unpack_two_doubles(self._m, pos)

    def write_value(self, key, value, timestamp):
        """Store ``value`` and ``timestamp`` for ``key``, creating the entry if it is not indexed."""
        if key not in self._positions:
            self._init_value(key)
        pos = self._positions[key]
        _pack_two_doubles(self._m, pos, value, timestamp)

    def close(self):
        """Close the mapping and the file. Calling it again is a no-op."""
        if self._m is not None:
            self._m.close()
            self._m = None
        if self._f is not None:
            self._f.close()
            self._f = None

139 

140 

def mmap_key(metric_name: str, name: str, labelnames: List[str], labelvalues: List[str], help_text: str) -> str:
    """Build the JSON string used as a key in the mmap file.

    Label names are paired positionally with label values.  The result is a
    JSON list ``[metric_name, name, labels, help_text]``.
    """
    label_map = {label: label_value for label, label_value in zip(labelnames, labelvalues)}
    payload = [metric_name, name, label_map, help_text]
    # sort_keys makes the label order irrelevant to the key's identity.
    return json.dumps(payload, sort_keys=True)