Coverage Report

Created: 2025-07-23 06:03

/src/resiprocate/rutil/AbstractFifo.hxx
Line
Count
Source (jump to first uncovered line)
1
#ifndef RESIP_AbstractFifo_hxx
2
#define RESIP_AbstractFifo_hxx 
3
4
#include "rutil/ResipAssert.h"
5
#include <deque>
6
7
#include "rutil/Mutex.hxx"
8
#include "rutil/Condition.hxx"
9
#include "rutil/Lock.hxx"
10
#include "rutil/CongestionManager.hxx"
11
12
#include "rutil/compat.hxx"
13
#include "rutil/Timer.hxx"
14
15
namespace resip
16
{
17
/**
18
  @brief Interface for providing metrics on FIFOs, primarily used by 
19
   CongestionManager.
20
   Provides four different types of metrics:
21
      - size : The number of elements in the queue
22
      - time-depth : The age of the oldest item in the queue (ie, the front)
23
      - expected wait-time : A heuristic estimating the amount of time a message
24
         would take to be serviced if it were added to the queue.
25
      - average service time : The average time it takes to service a single
26
         element from the queue (this is helpful in congestion control, but is
27
         mostly intended for logging).
28
*/
29
class FifoStatsInterface
30
{
31
   public:
32
         
33
     FifoStatsInterface();
34
     virtual ~FifoStatsInterface();
35
     
36
     /**
37
         Returns the expected time it will take to service all messages 
38
         currently in the queue (in milli-seconds)
39
     */
40
     virtual time_t expectedWaitTimeMilliSec() const =0;
41
42
     /**
43
         Returns the difference in time between the youngest and oldest item in 
44
         the FIFO in seconds
45
      */
46
      virtual time_t getTimeDepth() const = 0;
47
     
48
     /**
49
         Returns the number of elements in the FIFO
50
      */
51
      virtual size_t getCountDepth() const = 0;
52
53
     /**
54
         Returns the average time it takes for individual messages to be 
55
         serviced (in micro-seconds)
56
     */
57
     virtual time_t averageServiceTimeMicroSec() const = 0;
58
59
      /**
60
         @internal
61
         Return this fifo's role-number. The meaning of the return is defined on
62
         a per-application basis, and will have special meaning to the 
63
         CongestionManager implementation specific to that app. For instance, 
64
         1 might be understood to represent the main state machine fifo in 
65
         resip, 2 might indicate a transport fifo (of which there may be 
66
         several), 3 might indicate a particular TU's fifo, etc.
67
         These are intended for use by CongestionManager only.
68
      */
69
0
      inline uint8_t getRole() const {return mRole;}
70
71
      /**
72
         @internal
73
         Set this fifo's role-number.
74
         @see getRole()
75
      */
76
0
      inline void setRole(uint8_t role) {mRole=role;}
77
78
      /**
79
         Sets the description for this fifo. This is used in the logging for
80
         this fifo's statistics, and can also be used by the CongestionManager 
81
         to assign a role-number.
82
         @param description The description for this fifo.
83
      */
84
      inline void setDescription(const resip::Data& description)
85
0
      {
86
0
         mDescription=description;
87
0
      }
88
89
      /**
90
         Gets the description for this fifo.
91
         @see setDescription()
92
      */
93
0
      virtual const resip::Data& getDescription() const {return mDescription;}
94
95
   protected:
96
      Data mDescription;
97
      uint8_t mRole;
98
};
99
100
/**
101
  * The getNext() method takes an argument {ms} that normally
102
  * the number of milliseconds to wait. There are two special values:
103
  * NOWAIT
104
  *     Don't wait/block/sleep. If no message to retrieve, return NULL.
105
  * FOREVER
106
  *     Wait forever until a message is available.
107
  * Note that the encoding (0 vs -1) is the oppositive convention
108
  * of standard APIs such as epoll_wait(). This is for historical reasons.
109
  */
110
#define RESIP_FIFO_NOWAIT -1
111
#define RESIP_FIFO_FOREVER  0
112
113
/**
114
   @brief The base class from which various templated Fifo classes are derived.
115
116
   (aka template hoist) 
117
   AbstractFifo's get operations are all threadsafe; AbstractFifo does not 
118
   define any put operations (these are defined in subclasses).
119
   @note Users of the resip stack will not need to interact with this class 
120
      directly in most cases. Look at Fifo and TimeLimitFifo instead.
121
122
   @ingroup message_passing
123
 */
124
template <typename T>
125
class AbstractFifo : public FifoStatsInterface
126
{
127
   public:
128
     /** 
129
      * @brief Constructor
130
      * @param maxSize max number of messages to keep
131
      **/
132
      AbstractFifo()
133
         : FifoStatsInterface(),
134
            mLastSampleTakenMicroSec(0),
135
            mCounter(0),
136
            mAverageServiceTimeMicroSec(0),
137
            mSize(0)
138
      {}
139
140
      virtual ~AbstractFifo()
141
      {
142
      }
143
144
      /** 
145
         @brief is the queue empty?
146
         @return true if the queue is empty and false otherwise
147
       **/
148
      bool empty() const
149
      {
150
         Lock lock(mMutex); (void)lock;
151
         return mFifo.empty();
152
      }
153
154
      /**
155
        @brief get the current size of the fifo.
156
        @note Note you should not use this function to determine
157
        whether a call to getNext() will block or not. Use
158
        messageAvailable() instead.
159
        @return the number of messages in the queue
160
       */
161
      virtual unsigned int size() const
162
      {
163
         Lock lock(mMutex); (void)lock;
164
         return (unsigned int)mFifo.size();
165
      }
166
167
      /**
168
      @brief is a message available?
169
      @retval true if a message is available and false otherwise
170
       */
171
       
172
      bool messageAvailable() const
173
      {
174
         Lock lock(mMutex); (void)lock;
175
         return !mFifo.empty();
176
      }
177
178
      /**
179
      @brief computes the time delta between the oldest and newest queue members
180
      @note defaults to zero, overridden by TimeLimitFifo<T>
181
      @return the time delta between the oldest and newest queue members
182
      */
183
      virtual time_t getTimeDepth() const
184
      {
185
         return 0;
186
      }
187
188
      virtual size_t getCountDepth() const
189
      {
190
         return mSize;
191
      }
192
193
      virtual time_t expectedWaitTimeMilliSec() const
194
0
      {
195
0
         return ((mAverageServiceTimeMicroSec*mSize)+500)/1000;
196
0
      }
197
198
      virtual time_t averageServiceTimeMicroSec() const
199
      {
200
         return mAverageServiceTimeMicroSec;
201
      }
202
203
      /// remove all elements in the queue (or not)
204
      virtual void clear() {};
205
206
   protected:
207
      /** 
208
          @brief Returns the first message available.
209
          @details Returns the first message available. It will wait if no
210
          messages are available. If a signal interrupts the wait,
211
          it will retry the wait. Signals can therefore not be caught
212
          via getNext. If you need to detect a signal, use block
213
          prior to calling getNext.
214
          @return the first message available
215
       */
216
      T getNext()
217
      {
218
         Lock lock(mMutex); (void)lock;
219
         onFifoPolled();
220
221
         // Wait util there are messages available.
222
         while (mFifo.empty())
223
         {
224
            mCondition.wait(lock);
225
         }
226
227
         // Return the first message on the fifo.
228
         //
229
         T firstMessage(mFifo.front());
230
         mFifo.pop_front();
231
         onMessagePopped();
232
         return firstMessage;
233
      }
234
235
236
      /**
237
        @brief Returns the next message available.
238
        @details Returns the next message available. Will wait up to
239
        ms milliseconds if no information is available. If
240
        the specified time passes or a signal interrupts the
241
        wait, this method returns 0. This interface provides
242
        no mechanism to distinguish between timeout and
243
        interrupt.
244
       */
245
      bool getNext(int ms, T& toReturn)
246
      {
247
         if(ms == 0) 
248
         {
249
            toReturn = getNext();
250
            return true;
251
         }
252
253
         if(ms < 0)
254
         {
255
            Lock lock(mMutex); (void)lock;
256
            onFifoPolled();
257
            if (mFifo.empty())  // WATCHOUT: Do not test mSize instead
258
              return false;
259
            toReturn = mFifo.front();
260
            mFifo.pop_front();
261
            onMessagePopped();
262
            return true;
263
         }
264
265
         const auto begin = std::chrono::steady_clock::now();
266
         const auto end = begin + std::chrono::milliseconds(ms);
267
         Lock lock(mMutex); (void)lock;
268
         onFifoPolled();
269
270
         // Wait until there are messages available
271
         while (mFifo.empty())
272
         {
273
            if(ms==0)
274
            {
275
               return false;
276
            }
277
            auto now = std::chrono::steady_clock::now();
278
            if(now >= end)
279
            {
280
                return false;
281
            }
282
      
283
            // bail if total wait time exceeds limit
284
            bool signaled = mCondition.wait_until(lock, end) == std::cv_status::no_timeout;
285
            if (!signaled)
286
            {
287
               return false;
288
            }
289
         }
290
      
291
         // Return the first message on the fifo.
292
         //
293
         toReturn=mFifo.front();
294
         mFifo.pop_front();
295
         onMessagePopped();
296
         return true;
297
      }
298
299
      typedef std::deque<T> Messages;
300
301
      void getMultiple(Messages& other, unsigned int max)
302
      {
303
         Lock lock(mMutex); (void)lock;
304
         onFifoPolled();
305
         resip_assert(other.empty());
306
         while (mFifo.empty())
307
         {
308
            mCondition.wait(lock);
309
         }
310
311
         if(mFifo.size() <= max)
312
         {
313
            std::swap(mFifo, other);
314
            onMessagePopped(mSize);
315
         }
316
         else
317
         {
318
            size_t num=max;
319
            while( 0 != max-- )
320
            {
321
               other.push_back(mFifo.front());
322
               mFifo.pop_front();
323
            }
324
            onMessagePopped((unsigned int)num);
325
         }
326
      }
327
328
      bool getMultiple(int ms, Messages& other, unsigned int max)
329
      {
330
         if(ms==0)
331
         {
332
            getMultiple(other,max);
333
            return true;
334
         }
335
336
         resip_assert(other.empty());
337
         const auto begin = std::chrono::steady_clock::now();
338
         const auto end = begin + std::chrono::milliseconds(ms); // !kh! ms should've been unsigned :(
339
         Lock lock(mMutex); (void)lock;
340
         onFifoPolled();
341
342
         // Wait until there are messages available
343
         while (mFifo.empty())
344
         {
345
            if(ms < 0)
346
            {
347
               return false;
348
            }
349
            const auto now = std::chrono::steady_clock::now();
350
            if(now >= end)
351
            {
352
                return false;
353
            }
354
355
            // bail if total wait time exceeds limit
356
            bool signaled = mCondition.wait_until(lock, end) == std::cv_status::no_timeout;
357
            if (!signaled)
358
            {
359
               return false;
360
            }
361
         }
362
363
         if(mFifo.size() <= max)
364
         {
365
            std::swap(mFifo, other);
366
            onMessagePopped(mSize);
367
         }
368
         else
369
         {
370
            size_t num=max;
371
            while( 0 != max-- )
372
            {
373
               other.push_back(mFifo.front());
374
               mFifo.pop_front();
375
            }
376
            onMessagePopped((unsigned int)num);
377
         }
378
         return true;
379
      }
380
381
      size_t add(const T& item)
382
      {
383
         Lock lock(mMutex); (void)lock;
384
         mFifo.push_back(item);
385
         mCondition.notify_one();
386
         onMessagePushed(1);
387
         return mFifo.size();
388
      }
389
390
      size_t addMultiple(Messages& items)
391
0
      {
392
0
         Lock lock(mMutex); (void)lock;
393
0
         size_t size=items.size();
394
0
         if(mFifo.empty())
395
0
         {
396
0
            std::swap(mFifo, items);
397
0
         }
398
0
         else
399
0
         {
400
            // I suppose it is possible to optimize this as a push_front() from
401
            // mFifo to items, and then do a swap, if items is larger.
402
0
            while(!items.empty())
403
0
            {
404
0
               mFifo.push_back(items.front());
405
0
               items.pop_front();
406
0
            }
407
0
         }
408
0
         mCondition.notify_one();
409
0
         onMessagePushed((int)size);
410
0
         return mFifo.size();
411
0
      }
412
413
      /** @brief container for FIFO items */
414
      Messages mFifo;
415
      /** @brief access serialization lock */
416
      mutable Mutex mMutex;
417
      /** @brief condition for waiting on new queue items */
418
      Condition mCondition;
419
420
      mutable uint64_t mLastSampleTakenMicroSec;
421
      mutable uint32_t mCounter;
422
      mutable uint32_t mAverageServiceTimeMicroSec;
423
      // std::deque has to perform some amount of traversal to calculate its 
424
      // size; we maintain this count so that it can be queried without locking, 
425
      // in situations where it being off by a small amount is ok.
426
      uint32_t mSize;
427
428
      virtual void onFifoPolled()
429
      {
430
         // !bwc! TODO allow this sampling frequency to be tweaked
431
         if(mLastSampleTakenMicroSec &&
432
            mCounter &&
433
            (mCounter >= 64 || mFifo.empty()))
434
         {
435
            uint64_t now(Timer::getTimeMicroSec());
436
            uint64_t diff = now-mLastSampleTakenMicroSec;
437
438
            if(mCounter >= 4096)
439
            {
440
               mAverageServiceTimeMicroSec=(uint32_t)resipIntDiv(diff, mCounter);
441
            }
442
            else // fifo got emptied; merge into a rolling average
443
            {
444
               // .bwc. This is a moving average with period 64, round to 
445
               // nearest int.
446
               mAverageServiceTimeMicroSec=(uint32_t)resipIntDiv(
447
                     diff+((4096-mCounter)*mAverageServiceTimeMicroSec),
448
                     4096U);
449
            }
450
            mCounter=0;
451
            if(mFifo.empty())
452
            {
453
               mLastSampleTakenMicroSec=0;
454
            }
455
            else
456
            {
457
               mLastSampleTakenMicroSec=now;
458
            }
459
         }
460
      }
461
462
      /**
463
         Called when a message (or messages) are removed from this fifo. Used to
464
         drive service time calculations.
465
      */
466
      virtual void onMessagePopped(unsigned int num=1)
467
      {
468
         mCounter+=num;
469
         mSize-=num;
470
      }
471
472
      virtual void onMessagePushed(int num)
473
0
      {
474
0
         if(mSize==0)
475
0
         {
476
0
            // Fifo went from empty to non-empty. Take a timestamp, and record
477
0
            // how long it takes to process some messages.
478
0
            mLastSampleTakenMicroSec=Timer::getTimeMicroSec();
479
0
         }
480
0
         mSize+=num;
481
0
      }
482
   private:
483
      // no value semantics
484
      AbstractFifo(const AbstractFifo&);
485
      AbstractFifo& operator=(const AbstractFifo&);
486
};
487
488
} // namespace resip
489
490
#endif
491
492
/* ====================================================================
493
 * The Vovida Software License, Version 1.0 
494
 * 
495
 * Redistribution and use in source and binary forms, with or without
496
 * modification, are permitted provided that the following conditions
497
 * are met:
498
 * 
499
 * 1. Redistributions of source code must retain the above copyright
500
 *    notice, this list of conditions and the following disclaimer.
501
 * 
502
 * 2. Redistributions in binary form must reproduce the above copyright
503
 *    notice, this list of conditions and the following disclaimer in
504
 *    the documentation and/or other materials provided with the
505
 *    distribution.
506
 * 
507
 * 3. The names "VOCAL", "Vovida Open Communication Application Library",
508
 *    and "Vovida Open Communication Application Library (VOCAL)" must
509
 *    not be used to endorse or promote products derived from this
510
 *    software without prior written permission. For written
511
 *    permission, please contact vocal@vovida.org.
512
 *
513
 * 4. Products derived from this software may not be called "VOCAL", nor
514
 *    may "VOCAL" appear in their name, without prior written
515
 *    permission of Vovida Networks, Inc.
516
 * 
517
 * THIS SOFTWARE IS PROVIDED "AS IS" AND ANY EXPRESSED OR IMPLIED
518
 * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
519
 * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, TITLE AND
520
 * NON-INFRINGEMENT ARE DISCLAIMED.  IN NO EVENT SHALL VOVIDA
521
 * NETWORKS, INC. OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT DAMAGES
522
 * IN EXCESS OF $1,000, NOR FOR ANY INDIRECT, INCIDENTAL, SPECIAL,
523
 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
524
 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
525
 * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
526
 * OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
527
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
528
 * USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH
529
 * DAMAGE.
530
 * 
531
 * ====================================================================
532
 * 
533
 * This software consists of voluntary contributions made by Vovida
534
 * Networks, Inc. and many individuals on behalf of Vovida Networks,
535
 * Inc.  For more information on Vovida Networks, Inc., please see
536
 * <http://www.vovida.org/>.
537
 *
538
 */