/src/mozilla-central/dom/media/AsyncLogger.h
Line | Count | Source (jump to first uncovered line) |
1 | | /* This Source Code Form is subject to the terms of the Mozilla Public |
2 | | * License, v. 2.0. If a copy of the MPL was not distributed with this |
3 | | * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ |
4 | | |
5 | | /* Implementation of an asynchronous lock-free logging system. */ |
6 | | |
7 | | #ifndef mozilla_dom_AsyncLogger_h |
8 | | #define mozilla_dom_AsyncLogger_h |
9 | | |
10 | | #include <atomic> |
11 | | #include <thread> |
12 | | #include "mozilla/Logging.h" |
13 | | #include "mozilla/Attributes.h" |
14 | | #include "mozilla/MathAlgorithms.h" |
15 | | #include "mozilla/Sprintf.h" |
16 | | |
17 | | namespace mozilla { |
18 | | |
19 | | namespace detail { |
20 | | |
21 | | // This class implements a lock-free multiple producer single consumer queue of |
22 | | // fixed size log messages, with the following characteristics: |
23 | | // - Unbounded (uses an intrusive linked list)
24 | | // - Allocates on Push. Push can be called on any thread. |
25 | | // - Deallocates on Pop. Pop MUST always be called on the same thread for the |
26 | | // life-time of the queue. |
27 | | // |
28 | | // In our scenario, the producer threads are real-time, they can't block. The |
29 | | // consumer thread runs every now and then and empties the queue to a log
30 | | // file, on disk. |
31 | | // |
32 | | // Having fixed size messages and jemalloc is probably not the fastest, but |
33 | | // allows having a simpler design, we count on the fact that jemalloc will get |
34 | | // the memory from a thread-local source most of the time. |
35 | | template<size_t MESSAGE_LENGTH> |
36 | | class MPSCQueue |
37 | | { |
38 | | public: |
39 | | struct Message { |
40 | | Message() |
41 | 3 | { |
42 | 3 | mNext.store(nullptr, std::memory_order_relaxed); |
43 | 3 | } |
44 | | Message(const Message& aMessage) = delete; |
45 | | void operator=(const Message& aMessage) = delete; |
46 | | |
47 | | char data[MESSAGE_LENGTH]; |
48 | | std::atomic<Message*> mNext; |
49 | | }; |
50 | | // Creates a new MPSCQueue. Initially, the queue has a single sentinel node, |
51 | | // pointed to by both mHead and mTail. |
52 | | MPSCQueue() |
53 | | // At construction, the initial message points to nullptr (it has no |
54 | | // successor). It is a sentinel node, that does not contain meaningful |
55 | | // data. |
56 | | : mHead(new Message()) |
57 | | , mTail(mHead.load(std::memory_order_relaxed)) |
58 | 3 | { } |
59 | | |
60 | | ~MPSCQueue() |
61 | 0 | { |
62 | 0 | Message dummy; |
63 | 0 | while (this->Pop(dummy.data)) {} |
64 | 0 | Message* front = mHead.load(std::memory_order_relaxed); |
65 | 0 | delete front; |
66 | 0 | } |
67 | | |
68 | | void |
69 | | Push(MPSCQueue<MESSAGE_LENGTH>::Message* aMessage) |
70 | 0 | { |
71 | 0 | // The next two non-commented line are called A and B in this paragraph. |
72 | 0 | // Producer threads i, i-1, etc. are numbered in the order they reached |
73 | 0 | // A in time, thread i being the thread that has reached A first. |
74 | 0 | // Atomically, on line A the new `mHead` is set to be the node that was |
75 | 0 | // just allocated, with strong memory order. From now one, any thread |
76 | 0 | // that reaches A will see that the node just allocated is |
77 | 0 | // effectively the head of the list, and will make itself the new head |
78 | 0 | // of the list. |
79 | 0 | // In a bad case (when thread i executes A and then |
80 | 0 | // is not scheduled for a long time), it is possible that thread i-1 and |
81 | 0 | // subsequent threads create a seemingly disconnected set of nodes, but |
82 | 0 | // they all have the correct value for the next node to set as their |
83 | 0 | // mNext member on their respective stacks (in `prev`), and this is |
84 | 0 | // always correct. When the scheduler resumes, and line B is executed, |
85 | 0 | // the correct linkage is resumed. |
86 | 0 | // Before line B, since mNext for the node the was the last element of |
87 | 0 | // the queue still has an mNext of nullptr, Pop will not see the node |
88 | 0 | // added. |
89 | 0 | // For line A, it's critical to have strong ordering both ways (since |
90 | 0 | // it's going to possibly be read and write repeatidly by multiple |
91 | 0 | // threads) |
92 | 0 | // Line B can have weaker guarantees, it's only going to be written by a |
93 | 0 | // single thread, and we just need to ensure it's read properly by a |
94 | 0 | // single other one. |
95 | 0 | Message* prev = mHead.exchange(aMessage, std::memory_order_acq_rel); |
96 | 0 | prev->mNext.store(aMessage, std::memory_order_release); |
97 | 0 | } |
98 | | |
99 | | // Allocates a new node, copy aInput to the new memory location, and pushes |
100 | | // it to the end of the list. |
101 | | void |
102 | | Push(const char aInput[MESSAGE_LENGTH]) |
103 | | { |
104 | | // Create a new message, and copy the messages passed on argument to the |
105 | | // new memory location. We are not touching the queue right now. The |
106 | | // successor for this new node is set to be nullptr. |
107 | | Message* msg = new Message(); |
108 | | strncpy(msg->data, aInput, MESSAGE_LENGTH); |
109 | | |
110 | | Push(msg); |
111 | | } |
112 | | |
113 | | // Copy the content of the first message of the queue to aOutput, and |
114 | | // frees the message. Returns true if there was a message, in which case |
115 | | // `aOutput` contains a valid value. If the queue was empty, returns false, |
116 | | // in which case `aOutput` is left untouched. |
117 | | bool |
118 | | Pop(char aOutput[MESSAGE_LENGTH]) |
119 | 0 | { |
120 | 0 | // Similarly, in this paragraph, the two following lines are called A |
121 | 0 | // and B, and threads are called thread i, i-1, etc. in order of |
122 | 0 | // execution of line A. |
123 | 0 | // On line A, the first element of the queue is acquired. It is simply a |
124 | 0 | // sentinel node. |
125 | 0 | // On line B, we acquire the node that has the data we want. If B is |
126 | 0 | // null, then only the sentinel node was present in the queue, we can |
127 | 0 | // safely return false. |
128 | 0 | // mTail can be loaded with relaxed ordering, since it's not written nor |
129 | 0 | // read by any other thread (this queue is single consumer). |
130 | 0 | // mNext can be written to by one of the producer, so it's necessary to |
131 | 0 | // ensure those writes are seen, hence the stricter ordering. |
132 | 0 | Message* tail = mTail.load(std::memory_order_relaxed); |
133 | 0 | Message* next = tail->mNext.load(std::memory_order_acquire); |
134 | 0 |
|
135 | 0 | if (next == nullptr) { |
136 | 0 | return false; |
137 | 0 | } |
138 | 0 | |
139 | 0 | strncpy(aOutput, next->data, MESSAGE_LENGTH); |
140 | 0 |
|
141 | 0 | // Simply shift the queue one node further, so that the sentinel node is |
142 | 0 | // now pointing to the correct most ancient node. It contains stale data, |
143 | 0 | // but this data will never be read again. |
144 | 0 | // It's only necessary to ensure the previous load on this thread is not |
145 | 0 | // reordered past this line, so release ordering is sufficient here. |
146 | 0 | mTail.store(next, std::memory_order_release); |
147 | 0 |
|
148 | 0 | // This thread is now the only thing that points to `tail`, it can be |
149 | 0 | // safely deleted. |
150 | 0 | delete tail; |
151 | 0 |
|
152 | 0 | return true; |
153 | 0 | } |
154 | | |
155 | | private: |
156 | | // An atomic pointer to the most recent message in the queue. |
157 | | std::atomic<Message*> mHead; |
158 | | // An atomic pointer to a sentinel node, that points to the oldest message |
159 | | // in the queue. |
160 | | std::atomic<Message*> mTail; |
161 | | |
162 | | MPSCQueue(const MPSCQueue&) = delete; |
163 | | void operator=(const MPSCQueue&) = delete; |
164 | | public: |
165 | | // The goal here is to make it easy on the allocator. We pack a pointer in the |
166 | | // message struct, and we still want to do power of two allocations to |
167 | | // minimize allocator slop. The allocation size are going to be constant, so |
168 | | // the allocation is probably going to hit the thread local cache in jemalloc, |
169 | | // making it cheap and, more importantly, lock-free enough. |
170 | | static const size_t MESSAGE_PADDING = sizeof(Message::mNext); |
171 | | private: |
172 | | static_assert(IsPowerOfTwo(MESSAGE_LENGTH + MESSAGE_PADDING), |
173 | | "MPSCQueue internal allocations must have a size that is a" |
174 | | "power of two "); |
175 | | }; |
176 | | } // end namespace detail |
177 | | |
178 | | // This class implements a lock-free asynchronous logger, that outputs to |
179 | | // MOZ_LOG. |
180 | | // Any thread can use this logger without external synchronization and without |
181 | | // being blocked. This log is suitable for use in real-time audio threads. |
182 | | // Log formatting is best done externally, this class implements the output |
183 | | // mechanism only. |
184 | | // This class uses a thread internally, and must be started and stopped |
185 | | // manually. |
186 | | // If logging is disabled, all the calls are no-op. |
187 | | class AsyncLogger |
188 | | { |
189 | | public: |
190 | | static const uint32_t MAX_MESSAGE_LENGTH = 512 - detail::MPSCQueue<sizeof(void*)>::MESSAGE_PADDING; |
191 | | |
192 | | // aLogModuleName is the name of the MOZ_LOG module. |
193 | | explicit AsyncLogger(const char* aLogModuleName) |
194 | | : mThread(nullptr) |
195 | | , mLogModule(aLogModuleName) |
196 | | , mRunning(false) |
197 | 3 | { } |
198 | | |
199 | | ~AsyncLogger() |
200 | 0 | { |
201 | 0 | if (Enabled()) { |
202 | 0 | Stop(); |
203 | 0 | } |
204 | 0 | } |
205 | | |
206 | | void Start() |
207 | 0 | { |
208 | 0 | MOZ_ASSERT(!mRunning, "Double calls to AsyncLogger::Start"); |
209 | 0 | if (Enabled()) { |
210 | 0 | mRunning = true; |
211 | 0 | Run(); |
212 | 0 | } |
213 | 0 | } |
214 | | |
215 | | void Stop() |
216 | 0 | { |
217 | 0 | if (Enabled()) { |
218 | 0 | if (mRunning) { |
219 | 0 | mRunning = false; |
220 | 0 | mThread->join(); |
221 | 0 | } |
222 | 0 | } else { |
223 | 0 | MOZ_ASSERT(!mRunning && !mThread); |
224 | 0 | } |
225 | 0 | } |
226 | | |
227 | | void Log(const char* format, ...) MOZ_FORMAT_PRINTF(2,3) |
228 | 0 | { |
229 | 0 | if (Enabled()) { |
230 | 0 | auto* msg = new detail::MPSCQueue<MAX_MESSAGE_LENGTH>::Message(); |
231 | 0 | va_list args; |
232 | 0 | va_start(args, format); |
233 | 0 | VsprintfLiteral(msg->data, format, args); |
234 | 0 | va_end(args); |
235 | 0 | mMessageQueue.Push(msg); |
236 | 0 | } |
237 | 0 | } |
238 | | |
239 | | bool Enabled() |
240 | 0 | { |
241 | 0 | return MOZ_LOG_TEST(mLogModule, mozilla::LogLevel::Verbose); |
242 | 0 | } |
243 | | |
244 | | private: |
245 | | void Run() |
246 | 0 | { |
247 | 0 | MOZ_ASSERT(Enabled()); |
248 | 0 | mThread.reset(new std::thread([this]() { |
249 | 0 | while (mRunning) { |
250 | 0 | char message[MAX_MESSAGE_LENGTH]; |
251 | 0 | while (mMessageQueue.Pop(message) && mRunning) { |
252 | 0 | MOZ_LOG(mLogModule, mozilla::LogLevel::Verbose, ("%s", message)); |
253 | 0 | } |
254 | 0 | Sleep(); |
255 | 0 | } |
256 | 0 | })); |
257 | 0 | } |
258 | | |
259 | 0 | void Sleep() { std::this_thread::sleep_for(std::chrono::milliseconds(10)); } |
260 | | |
261 | | std::unique_ptr<std::thread> mThread; |
262 | | mozilla::LazyLogModule mLogModule; |
263 | | detail::MPSCQueue<MAX_MESSAGE_LENGTH> mMessageQueue; |
264 | | std::atomic<bool> mRunning; |
265 | | }; |
266 | | |
267 | | } // end namespace mozilla |
268 | | |
269 | | #endif // mozilla_dom_AsyncLogger_h |