1import json
2import mmap
3import os
4import struct
5from typing import List
6
# Size (in bytes) a brand-new backing file is preallocated to: 64 KiB.
_INITIAL_MMAP_SIZE = 64 * 1024

# Pre-compiled native-layout codecs: one 4-byte int, and a pair of doubles.
_INT_CODEC = struct.Struct(b'i')
_DOUBLE_PAIR_CODEC = struct.Struct(b'dd')

# Bound methods hoisted to module level so hot paths avoid attribute lookups.
_pack_integer_func = _INT_CODEC.pack
_pack_two_doubles_func = _DOUBLE_PAIR_CODEC.pack
_unpack_integer = _INT_CODEC.unpack_from
_unpack_two_doubles = _DOUBLE_PAIR_CODEC.unpack_from
12
13
# struct.pack_into has atomicity issues: it temporarily writes 0 into the
# mmap, so a reader racing with frequent writes can observe a spurious 0.
# Assigning the fully pre-packed bytes to a slice directly avoids this.
17
18
def _pack_two_doubles(data, pos, value, timestamp):
    """Store *value* and *timestamp* as two doubles at byte offset *pos* of *data*.

    The pair is packed into a bytes object first and then copied in with a
    single slice assignment, rather than using struct.pack_into, which can
    transiently expose zeroed bytes to readers.
    """
    packed = _pack_two_doubles_func(value, timestamp)
    data[pos:pos + len(packed)] = packed
21
22
def _pack_integer(data, pos, value):
    """Store *value* as a 4-byte int at byte offset *pos* of *data*.

    Uses slice assignment of pre-packed bytes instead of struct.pack_into so
    readers never see a transiently zeroed field.
    """
    encoded = _pack_integer_func(value)
    data[pos:pos + len(encoded)] = encoded
25
26
def _read_all_values(data, used=0):
    """Yield (key, value, timestamp, pos) for every entry in *data*.

    *data* must support slicing and ``struct.unpack_from``. It starts with
    an 8-byte header (a 4-byte "used" counter plus padding). After the header
    come entries, each made of:

    * a 4-byte key length,
    * the UTF-8 key, padded so that length + key end on an 8-byte boundary,
    * two doubles (value, timestamp).

    ``pos`` is the byte offset of the value/timestamp pair. No locking is
    performed.

    :param data: the buffer to scan (e.g. an mmap or bytes read from file).
    :param used: number of bytes of *data* in use; when <= 0 it is read from
        the first 4 bytes of *data*.
    :raises RuntimeError: if an entry has a negative length or would extend
        past *used* (the file is corrupted).
    """

    if used <= 0:
        # If not valid `used` value is passed in, read it from the file.
        used = _unpack_integer(data, 0)[0]

    pos = 8

    while pos < used:
        encoded_len = _unpack_integer(data, pos)[0]
        # Key padding: brings (4-byte length + key) up to an 8-byte boundary.
        padded_len = encoded_len + (8 - (encoded_len + 4) % 8)
        # The whole entry -- length prefix, padded key and the 16-byte
        # value/timestamp pair -- must lie within `used`; checking only the
        # key would let a truncated entry read garbage (or fail in struct).
        entry_end = pos + 4 + padded_len + 16
        if encoded_len < 0 or entry_end > used:
            raise RuntimeError('Read beyond file size detected, file is corrupted.')
        pos += 4
        encoded_key = data[pos:pos + encoded_len]
        pos += padded_len
        value, timestamp = _unpack_two_doubles(data, pos)
        yield encoded_key.decode('utf-8'), value, timestamp, pos
        pos += 16
48
49
class MmapedDict:
    """A dict of doubles, backed by an mmapped file.

    The file starts with a 4 byte int, indicating how much of it is used.
    Then 4 bytes of padding.
    There's then a number of entries, consisting of a 4 byte int which is the
    size of the next field, a utf-8 encoded string key, padding to a 8 byte
    alignment, and then a 8 byte float which is the value and a 8 byte float
    which is a UNIX timestamp in seconds.

    Not thread safe.
    """

    def __init__(self, filename, read_mode=False):
        """Open and map *filename*.

        :param filename: path of the backing file. In write mode it is opened
            with ``'a+b'`` (created if missing); an empty file is preallocated
            to ``_INITIAL_MMAP_SIZE`` bytes.
        :param read_mode: if true, open the file ``'rb'`` and map it with
            ``ACCESS_READ``; existing entry positions are not indexed.

        If setup fails part-way, the file handle (and mapping, if created) is
        closed before the exception propagates.
        """
        self._f = open(filename, 'rb' if read_mode else 'a+b')
        self._fname = filename
        try:
            capacity = os.fstat(self._f.fileno()).st_size
            if capacity == 0:
                self._f.truncate(_INITIAL_MMAP_SIZE)
                capacity = _INITIAL_MMAP_SIZE
            self._capacity = capacity
            self._m = mmap.mmap(self._f.fileno(), self._capacity,
                                access=mmap.ACCESS_READ if read_mode else mmap.ACCESS_WRITE)

            self._positions = {}
            self._used = _unpack_integer(self._m, 0)[0]
            if self._used == 0:
                # Fresh file: reserve the 8-byte header (used counter + padding).
                self._used = 8
                _pack_integer(self._m, 0, self._used)
            elif not read_mode:
                # Index existing entries so later writes update them in place.
                for key, _, _, pos in self._read_all_values():
                    self._positions[key] = pos
        except BaseException:
            # Without this, a failure here (e.g. truncating an empty file that
            # was opened read-only, or a corrupted file) leaks the handle.
            mapping = getattr(self, '_m', None)
            if mapping is not None:
                mapping.close()
            self._m = None
            self._f.close()
            self._f = None
            raise

    @staticmethod
    def read_all_values_from_file(filename):
        """Return a generator of (key, value, timestamp, pos) for *filename*.

        Uses ordinary file reads instead of mmap. The whole used region is
        loaded into memory before the file is closed, so the returned
        generator does not touch the file.
        """
        with open(filename, 'rb') as infp:
            # Read the first block of data, including the first 4 bytes which tell us
            # how much of the file (which is preallocated to _INITIAL_MMAP_SIZE bytes) is occupied.
            data = infp.read(mmap.PAGESIZE)
            used = _unpack_integer(data, 0)[0]
            if used > len(data):  # Then read in the rest, if needed.
                data += infp.read(used - len(data))
        return _read_all_values(data, used)

    def _init_value(self, key):
        """Initialize a value. Lock must be held by caller.

        Appends a zeroed (0.0, 0.0) entry for *key*, growing the file (by
        doubling) and re-mapping it if the entry does not fit.
        """
        encoded = key.encode('utf-8')
        # Pad to be 8-byte aligned.
        padded = encoded + (b' ' * (8 - (len(encoded) + 4) % 8))
        value = struct.pack(f'i{len(padded)}sdd'.encode(), len(encoded), padded, 0.0, 0.0)
        new_used = self._used + len(value)
        if new_used > self._capacity:
            # Work out the final size first, then resize and re-map once;
            # only commit the new capacity once the resize has succeeded.
            new_capacity = self._capacity
            while new_used > new_capacity:
                new_capacity *= 2
            self._f.truncate(new_capacity)
            self._m = mmap.mmap(self._f.fileno(), new_capacity, access=mmap.ACCESS_WRITE)
            self._capacity = new_capacity
        self._m[self._used:new_used] = value

        # Update how much space we've used.
        self._used = new_used
        _pack_integer(self._m, 0, self._used)
        # The value/timestamp pair is the last 16 bytes of the new entry.
        self._positions[key] = self._used - 16

    def _read_all_values(self):
        """Yield (key, value, timestamp, pos). No locking is performed."""
        return _read_all_values(data=self._m, used=self._used)

    def read_all_values(self):
        """Yield (key, value, timestamp). No locking is performed."""
        for k, v, ts, _ in self._read_all_values():
            yield k, v, ts

    def read_value(self, key):
        """Return (value, timestamp) for *key*, creating a (0.0, 0.0) entry if absent."""
        if key not in self._positions:
            self._init_value(key)
        pos = self._positions[key]
        return _unpack_two_doubles(self._m, pos)

    def write_value(self, key, value, timestamp):
        """Store *value* and *timestamp* for *key*, creating the entry if absent."""
        if key not in self._positions:
            self._init_value(key)
        pos = self._positions[key]
        _pack_two_doubles(self._m, pos, value, timestamp)

    def close(self):
        """Unmap and close the backing file; calling it again is a no-op."""
        if self._f:
            self._m.close()
            self._m = None
            self._f.close()
            self._f = None
139
140
def mmap_key(metric_name: str, name: str, labelnames: List[str], labelvalues: List[str], help_text: str) -> str:
    """Build the JSON string that identifies a sample in the mmap file.

    Label names are paired with label values (extra items on either side are
    dropped, and a repeated name keeps its last value). The result is
    serialized with sorted keys, so the same label set always yields the same
    key regardless of the order the labels were given in.
    """
    label_map = {label: label_value for label, label_value in zip(labelnames, labelvalues)}
    payload = [metric_name, name, label_map, help_text]
    return json.dumps(payload, sort_keys=True)