/src/resiprocate/rutil/AbstractFifo.hxx
Line | Count | Source (jump to first uncovered line) |
1 | | #ifndef RESIP_AbstractFifo_hxx |
2 | | #define RESIP_AbstractFifo_hxx |
3 | | |
4 | | #include "rutil/ResipAssert.h" |
5 | | #include <deque> |
6 | | |
7 | | #include "rutil/Mutex.hxx" |
8 | | #include "rutil/Condition.hxx" |
9 | | #include "rutil/Lock.hxx" |
10 | | #include "rutil/CongestionManager.hxx" |
11 | | |
12 | | #include "rutil/compat.hxx" |
13 | | #include "rutil/Timer.hxx" |
14 | | |
15 | | namespace resip |
16 | | { |
17 | | /** |
18 | | @brief Interface for providing metrics on FIFOs, primarily used by |
19 | | CongestionManager. |
20 | | Provides four different types of metrics: |
21 | | - size : The number of elements in the queue |
22 | | - time-depth : The age of the oldest item in the queue (ie, the front) |
23 | | - expected wait-time : A heuristic estimating the amount of time a message |
24 | | would take to be serviced if it were added to the queue. |
25 | | - average service time : The average time it takes to service a single |
26 | | element from the queue (this is helpful in congestion control, but is |
27 | | mostly intended for logging). |
28 | | */ |
29 | | class FifoStatsInterface |
30 | | { |
31 | | public: |
32 | | |
33 | | FifoStatsInterface(); |
34 | | virtual ~FifoStatsInterface(); |
35 | | |
36 | | /** |
37 | | Returns the expected time it will take to service all messages |
38 | | currently in the queue (in milli-seconds) |
39 | | */ |
40 | | virtual time_t expectedWaitTimeMilliSec() const =0; |
41 | | |
42 | | /** |
43 | | Returns the difference in time between the youngest and oldest item in |
44 | | the FIFO in seconds |
45 | | */ |
46 | | virtual time_t getTimeDepth() const = 0; |
47 | | |
48 | | /** |
49 | | Returns the number of elements in the FIFO |
50 | | */ |
51 | | virtual size_t getCountDepth() const = 0; |
52 | | |
53 | | /** |
54 | | Returns the average time it takes for individual messages to be |
55 | | serviced (in micro-seconds) |
56 | | */ |
57 | | virtual time_t averageServiceTimeMicroSec() const = 0; |
58 | | |
59 | | /** |
60 | | @internal |
61 | | Return this fifo's role-number. The meaning of the return is defined on |
62 | | a per-application basis, and will have special meaning to the |
63 | | CongestionManager implementation specific to that app. For instance, |
64 | | 1 might be understood to represent the main state machine fifo in |
65 | | resip, 2 might indicate a transport fifo (of which there may be |
66 | | several), 3 might indicate a particular TU's fifo, etc. |
67 | | These are intended for use by CongestionManager only. |
68 | | */ |
69 | 0 | inline uint8_t getRole() const {return mRole;} |
70 | | |
71 | | /** |
72 | | @internal |
73 | | Set this fifo's role-number. |
74 | | @see getRole() |
75 | | */ |
76 | 0 | inline void setRole(uint8_t role) {mRole=role;} |
77 | | |
78 | | /** |
79 | | Sets the description for this fifo. This is used in the logging for |
80 | | this fifo's statistics, and can also be used by the CongestionManager |
81 | | to assign a role-number. |
82 | | @param description The description for this fifo. |
83 | | */ |
84 | | inline void setDescription(const resip::Data& description) |
85 | 0 | { |
86 | 0 | mDescription=description; |
87 | 0 | } |
88 | | |
89 | | /** |
90 | | Gets the description for this fifo. |
91 | | @see setDescription() |
92 | | */ |
93 | 0 | virtual const resip::Data& getDescription() const {return mDescription;} |
94 | | |
95 | | protected: |
96 | | Data mDescription; |
97 | | uint8_t mRole; |
98 | | }; |
99 | | |
100 | | /** |
101 | | * The getNext() method takes an argument {ms} that normally |
102 | | * the number of milliseconds to wait. There are two special values: |
103 | | * NOWAIT |
104 | | * Don't wait/block/sleep. If no message to retrieve, return NULL. |
105 | | * FOREVER |
106 | | * Wait forever until a message is available. |
107 | | * Note that the encoding (0 vs -1) is the oppositive convention |
108 | | * of standard APIs such as epoll_wait(). This is for historical reasons. |
109 | | */ |
110 | | #define RESIP_FIFO_NOWAIT -1 |
111 | | #define RESIP_FIFO_FOREVER 0 |
112 | | |
113 | | /** |
114 | | @brief The base class from which various templated Fifo classes are derived. |
115 | | |
116 | | (aka template hoist) |
117 | | AbstractFifo's get operations are all threadsafe; AbstractFifo does not |
118 | | define any put operations (these are defined in subclasses). |
119 | | @note Users of the resip stack will not need to interact with this class |
120 | | directly in most cases. Look at Fifo and TimeLimitFifo instead. |
121 | | |
122 | | @ingroup message_passing |
123 | | */ |
124 | | template <typename T> |
125 | | class AbstractFifo : public FifoStatsInterface |
126 | | { |
127 | | public: |
128 | | /** |
129 | | * @brief Constructor |
130 | | * @param maxSize max number of messages to keep |
131 | | **/ |
132 | | AbstractFifo() |
133 | | : FifoStatsInterface(), |
134 | | mLastSampleTakenMicroSec(0), |
135 | | mCounter(0), |
136 | | mAverageServiceTimeMicroSec(0), |
137 | | mSize(0) |
138 | | {} |
139 | | |
140 | | virtual ~AbstractFifo() |
141 | | { |
142 | | } |
143 | | |
144 | | /** |
145 | | @brief is the queue empty? |
146 | | @return true if the queue is empty and false otherwise |
147 | | **/ |
148 | | bool empty() const |
149 | | { |
150 | | Lock lock(mMutex); (void)lock; |
151 | | return mFifo.empty(); |
152 | | } |
153 | | |
154 | | /** |
155 | | @brief get the current size of the fifo. |
156 | | @note Note you should not use this function to determine |
157 | | whether a call to getNext() will block or not. Use |
158 | | messageAvailable() instead. |
159 | | @return the number of messages in the queue |
160 | | */ |
161 | | virtual unsigned int size() const |
162 | | { |
163 | | Lock lock(mMutex); (void)lock; |
164 | | return (unsigned int)mFifo.size(); |
165 | | } |
166 | | |
167 | | /** |
168 | | @brief is a message available? |
169 | | @retval true if a message is available and false otherwise |
170 | | */ |
171 | | |
172 | | bool messageAvailable() const |
173 | | { |
174 | | Lock lock(mMutex); (void)lock; |
175 | | return !mFifo.empty(); |
176 | | } |
177 | | |
178 | | /** |
179 | | @brief computes the time delta between the oldest and newest queue members |
180 | | @note defaults to zero, overridden by TimeLimitFifo<T> |
181 | | @return the time delta between the oldest and newest queue members |
182 | | */ |
183 | | virtual time_t getTimeDepth() const |
184 | | { |
185 | | return 0; |
186 | | } |
187 | | |
188 | | virtual size_t getCountDepth() const |
189 | | { |
190 | | return mSize; |
191 | | } |
192 | | |
193 | | virtual time_t expectedWaitTimeMilliSec() const |
194 | 0 | { |
195 | 0 | return ((mAverageServiceTimeMicroSec*mSize)+500)/1000; |
196 | 0 | } |
197 | | |
198 | | virtual time_t averageServiceTimeMicroSec() const |
199 | | { |
200 | | return mAverageServiceTimeMicroSec; |
201 | | } |
202 | | |
203 | | /// remove all elements in the queue (or not) |
204 | | virtual void clear() {}; |
205 | | |
206 | | protected: |
207 | | /** |
208 | | @brief Returns the first message available. |
209 | | @details Returns the first message available. It will wait if no |
210 | | messages are available. If a signal interrupts the wait, |
211 | | it will retry the wait. Signals can therefore not be caught |
212 | | via getNext. If you need to detect a signal, use block |
213 | | prior to calling getNext. |
214 | | @return the first message available |
215 | | */ |
216 | | T getNext() |
217 | | { |
218 | | Lock lock(mMutex); (void)lock; |
219 | | onFifoPolled(); |
220 | | |
221 | | // Wait util there are messages available. |
222 | | while (mFifo.empty()) |
223 | | { |
224 | | mCondition.wait(lock); |
225 | | } |
226 | | |
227 | | // Return the first message on the fifo. |
228 | | // |
229 | | T firstMessage(mFifo.front()); |
230 | | mFifo.pop_front(); |
231 | | onMessagePopped(); |
232 | | return firstMessage; |
233 | | } |
234 | | |
235 | | |
236 | | /** |
237 | | @brief Returns the next message available. |
238 | | @details Returns the next message available. Will wait up to |
239 | | ms milliseconds if no information is available. If |
240 | | the specified time passes or a signal interrupts the |
241 | | wait, this method returns 0. This interface provides |
242 | | no mechanism to distinguish between timeout and |
243 | | interrupt. |
244 | | */ |
245 | | bool getNext(int ms, T& toReturn) |
246 | | { |
247 | | if(ms == 0) |
248 | | { |
249 | | toReturn = getNext(); |
250 | | return true; |
251 | | } |
252 | | |
253 | | if(ms < 0) |
254 | | { |
255 | | Lock lock(mMutex); (void)lock; |
256 | | onFifoPolled(); |
257 | | if (mFifo.empty()) // WATCHOUT: Do not test mSize instead |
258 | | return false; |
259 | | toReturn = mFifo.front(); |
260 | | mFifo.pop_front(); |
261 | | onMessagePopped(); |
262 | | return true; |
263 | | } |
264 | | |
265 | | const auto begin = std::chrono::steady_clock::now(); |
266 | | const auto end = begin + std::chrono::milliseconds(ms); |
267 | | Lock lock(mMutex); (void)lock; |
268 | | onFifoPolled(); |
269 | | |
270 | | // Wait until there are messages available |
271 | | while (mFifo.empty()) |
272 | | { |
273 | | if(ms==0) |
274 | | { |
275 | | return false; |
276 | | } |
277 | | auto now = std::chrono::steady_clock::now(); |
278 | | if(now >= end) |
279 | | { |
280 | | return false; |
281 | | } |
282 | | |
283 | | // bail if total wait time exceeds limit |
284 | | bool signaled = mCondition.wait_until(lock, end) == std::cv_status::no_timeout; |
285 | | if (!signaled) |
286 | | { |
287 | | return false; |
288 | | } |
289 | | } |
290 | | |
291 | | // Return the first message on the fifo. |
292 | | // |
293 | | toReturn=mFifo.front(); |
294 | | mFifo.pop_front(); |
295 | | onMessagePopped(); |
296 | | return true; |
297 | | } |
298 | | |
299 | | typedef std::deque<T> Messages; |
300 | | |
301 | | void getMultiple(Messages& other, unsigned int max) |
302 | | { |
303 | | Lock lock(mMutex); (void)lock; |
304 | | onFifoPolled(); |
305 | | resip_assert(other.empty()); |
306 | | while (mFifo.empty()) |
307 | | { |
308 | | mCondition.wait(lock); |
309 | | } |
310 | | |
311 | | if(mFifo.size() <= max) |
312 | | { |
313 | | std::swap(mFifo, other); |
314 | | onMessagePopped(mSize); |
315 | | } |
316 | | else |
317 | | { |
318 | | size_t num=max; |
319 | | while( 0 != max-- ) |
320 | | { |
321 | | other.push_back(mFifo.front()); |
322 | | mFifo.pop_front(); |
323 | | } |
324 | | onMessagePopped((unsigned int)num); |
325 | | } |
326 | | } |
327 | | |
328 | | bool getMultiple(int ms, Messages& other, unsigned int max) |
329 | | { |
330 | | if(ms==0) |
331 | | { |
332 | | getMultiple(other,max); |
333 | | return true; |
334 | | } |
335 | | |
336 | | resip_assert(other.empty()); |
337 | | const auto begin = std::chrono::steady_clock::now(); |
338 | | const auto end = begin + std::chrono::milliseconds(ms); // !kh! ms should've been unsigned :( |
339 | | Lock lock(mMutex); (void)lock; |
340 | | onFifoPolled(); |
341 | | |
342 | | // Wait until there are messages available |
343 | | while (mFifo.empty()) |
344 | | { |
345 | | if(ms < 0) |
346 | | { |
347 | | return false; |
348 | | } |
349 | | const auto now = std::chrono::steady_clock::now(); |
350 | | if(now >= end) |
351 | | { |
352 | | return false; |
353 | | } |
354 | | |
355 | | // bail if total wait time exceeds limit |
356 | | bool signaled = mCondition.wait_until(lock, end) == std::cv_status::no_timeout; |
357 | | if (!signaled) |
358 | | { |
359 | | return false; |
360 | | } |
361 | | } |
362 | | |
363 | | if(mFifo.size() <= max) |
364 | | { |
365 | | std::swap(mFifo, other); |
366 | | onMessagePopped(mSize); |
367 | | } |
368 | | else |
369 | | { |
370 | | size_t num=max; |
371 | | while( 0 != max-- ) |
372 | | { |
373 | | other.push_back(mFifo.front()); |
374 | | mFifo.pop_front(); |
375 | | } |
376 | | onMessagePopped((unsigned int)num); |
377 | | } |
378 | | return true; |
379 | | } |
380 | | |
381 | | size_t add(const T& item) |
382 | | { |
383 | | Lock lock(mMutex); (void)lock; |
384 | | mFifo.push_back(item); |
385 | | mCondition.notify_one(); |
386 | | onMessagePushed(1); |
387 | | return mFifo.size(); |
388 | | } |
389 | | |
390 | | size_t addMultiple(Messages& items) |
391 | 0 | { |
392 | 0 | Lock lock(mMutex); (void)lock; |
393 | 0 | size_t size=items.size(); |
394 | 0 | if(mFifo.empty()) |
395 | 0 | { |
396 | 0 | std::swap(mFifo, items); |
397 | 0 | } |
398 | 0 | else |
399 | 0 | { |
400 | | // I suppose it is possible to optimize this as a push_front() from |
401 | | // mFifo to items, and then do a swap, if items is larger. |
402 | 0 | while(!items.empty()) |
403 | 0 | { |
404 | 0 | mFifo.push_back(items.front()); |
405 | 0 | items.pop_front(); |
406 | 0 | } |
407 | 0 | } |
408 | 0 | mCondition.notify_one(); |
409 | 0 | onMessagePushed((int)size); |
410 | 0 | return mFifo.size(); |
411 | 0 | } |
412 | | |
413 | | /** @brief container for FIFO items */ |
414 | | Messages mFifo; |
415 | | /** @brief access serialization lock */ |
416 | | mutable Mutex mMutex; |
417 | | /** @brief condition for waiting on new queue items */ |
418 | | Condition mCondition; |
419 | | |
420 | | mutable uint64_t mLastSampleTakenMicroSec; |
421 | | mutable uint32_t mCounter; |
422 | | mutable uint32_t mAverageServiceTimeMicroSec; |
423 | | // std::deque has to perform some amount of traversal to calculate its |
424 | | // size; we maintain this count so that it can be queried without locking, |
425 | | // in situations where it being off by a small amount is ok. |
426 | | uint32_t mSize; |
427 | | |
428 | | virtual void onFifoPolled() |
429 | | { |
430 | | // !bwc! TODO allow this sampling frequency to be tweaked |
431 | | if(mLastSampleTakenMicroSec && |
432 | | mCounter && |
433 | | (mCounter >= 64 || mFifo.empty())) |
434 | | { |
435 | | uint64_t now(Timer::getTimeMicroSec()); |
436 | | uint64_t diff = now-mLastSampleTakenMicroSec; |
437 | | |
438 | | if(mCounter >= 4096) |
439 | | { |
440 | | mAverageServiceTimeMicroSec=(uint32_t)resipIntDiv(diff, mCounter); |
441 | | } |
442 | | else // fifo got emptied; merge into a rolling average |
443 | | { |
444 | | // .bwc. This is a moving average with period 64, round to |
445 | | // nearest int. |
446 | | mAverageServiceTimeMicroSec=(uint32_t)resipIntDiv( |
447 | | diff+((4096-mCounter)*mAverageServiceTimeMicroSec), |
448 | | 4096U); |
449 | | } |
450 | | mCounter=0; |
451 | | if(mFifo.empty()) |
452 | | { |
453 | | mLastSampleTakenMicroSec=0; |
454 | | } |
455 | | else |
456 | | { |
457 | | mLastSampleTakenMicroSec=now; |
458 | | } |
459 | | } |
460 | | } |
461 | | |
462 | | /** |
463 | | Called when a message (or messages) are removed from this fifo. Used to |
464 | | drive service time calculations. |
465 | | */ |
466 | | virtual void onMessagePopped(unsigned int num=1) |
467 | | { |
468 | | mCounter+=num; |
469 | | mSize-=num; |
470 | | } |
471 | | |
472 | | virtual void onMessagePushed(int num) |
473 | 0 | { |
474 | 0 | if(mSize==0) |
475 | 0 | { |
476 | 0 | // Fifo went from empty to non-empty. Take a timestamp, and record |
477 | 0 | // how long it takes to process some messages. |
478 | 0 | mLastSampleTakenMicroSec=Timer::getTimeMicroSec(); |
479 | 0 | } |
480 | 0 | mSize+=num; |
481 | 0 | } |
482 | | private: |
483 | | // no value semantics |
484 | | AbstractFifo(const AbstractFifo&); |
485 | | AbstractFifo& operator=(const AbstractFifo&); |
486 | | }; |
487 | | |
488 | | } // namespace resip |
489 | | |
490 | | #endif |
491 | | |
492 | | /* ==================================================================== |
493 | | * The Vovida Software License, Version 1.0 |
494 | | * |
495 | | * Redistribution and use in source and binary forms, with or without |
496 | | * modification, are permitted provided that the following conditions |
497 | | * are met: |
498 | | * |
499 | | * 1. Redistributions of source code must retain the above copyright |
500 | | * notice, this list of conditions and the following disclaimer. |
501 | | * |
502 | | * 2. Redistributions in binary form must reproduce the above copyright |
503 | | * notice, this list of conditions and the following disclaimer in |
504 | | * the documentation and/or other materials provided with the |
505 | | * distribution. |
506 | | * |
507 | | * 3. The names "VOCAL", "Vovida Open Communication Application Library", |
508 | | * and "Vovida Open Communication Application Library (VOCAL)" must |
509 | | * not be used to endorse or promote products derived from this |
510 | | * software without prior written permission. For written |
511 | | * permission, please contact vocal@vovida.org. |
512 | | * |
513 | | * 4. Products derived from this software may not be called "VOCAL", nor |
514 | | * may "VOCAL" appear in their name, without prior written |
515 | | * permission of Vovida Networks, Inc. |
516 | | * |
517 | | * THIS SOFTWARE IS PROVIDED "AS IS" AND ANY EXPRESSED OR IMPLIED |
518 | | * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES |
519 | | * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, TITLE AND |
520 | | * NON-INFRINGEMENT ARE DISCLAIMED. IN NO EVENT SHALL VOVIDA |
521 | | * NETWORKS, INC. OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT DAMAGES |
522 | | * IN EXCESS OF $1,000, NOR FOR ANY INDIRECT, INCIDENTAL, SPECIAL, |
523 | | * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, |
524 | | * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR |
525 | | * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY |
526 | | * OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT |
527 | | * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE |
528 | | * USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH |
529 | | * DAMAGE. |
530 | | * |
531 | | * ==================================================================== |
532 | | * |
533 | | * This software consists of voluntary contributions made by Vovida |
534 | | * Networks, Inc. and many individuals on behalf of Vovida Networks, |
535 | | * Inc. For more information on Vovida Networks, Inc., please see |
536 | | * <http://www.vovida.org/>. |
537 | | * |
538 | | */ |