Coverage Report

Created: 2025-07-18 06:32

/src/opensips/db/db_insertq.c
Line
Count
Source (jump to first uncovered line)
1
/*
2
 * Copyright (C) 2011 OpenSIPS Solutions
3
 *
4
 * This file is part of opensips, a free SIP server.
5
 *
6
 * opensips is free software; you can redistribute it and/or modify
7
 * it under the terms of the GNU General Public License as published by
8
 * the Free Software Foundation; either version 2 of the License, or
9
 * (at your option) any later version.
10
 *
11
 * opensips is distributed in the hope that it will be useful,
12
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
 * GNU General Public License for more details.
15
 *
16
 * You should have received a copy of the GNU General Public License
17
 * along with this program; if not, write to the Free Software
18
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
19
 *
20
 *
21
 * history:
22
 * ---------
23
 *  2011-06-07  created (vlad)
24
 */
25
26
#include "../timer.h"
27
#include "../pt.h"
28
29
#include "db_insertq.h"
30
#include "db_cap.h"
31
32
int query_buffer_size = 0;
33
int query_flush_time = 0;
34
query_list_t **query_list = NULL;
35
query_list_t **last_query = NULL;
36
gen_lock_t *ql_lock;
37
38
/* inits all the global variables needed for the insert query lists */
39
int init_query_list(void)
40
0
{
41
0
  query_list = shm_malloc(sizeof(query_list_t *));
42
0
  if (!query_list)
43
0
  {
44
0
    LM_ERR("no more shm\n");
45
0
    return -1;
46
0
  }
47
48
0
  last_query = shm_malloc(sizeof(query_list_t *));
49
0
  if (!last_query)
50
0
  {
51
0
    LM_ERR("no more shm\n");
52
0
    shm_free(query_list);
53
0
    return -1;
54
0
  }
55
56
0
  *query_list = NULL;
57
0
  *last_query = NULL;
58
59
0
  ql_lock = lock_alloc();
60
0
  if (ql_lock == 0) {
61
0
    LM_ERR("failed to alloc lock\n");
62
0
    goto error0;
63
0
  }
64
65
0
  if (lock_init(ql_lock) == 0) {
66
0
    LM_ERR("failed to init lock\n");
67
0
    goto error1;
68
0
  }
69
70
0
  LM_DBG("Initialised query list. Insert queue size = %d\n",query_buffer_size);
71
0
  return 0;
72
73
0
error1:
74
0
  lock_dealloc(ql_lock);
75
0
error0:
76
0
  shm_free(query_list);
77
0
  return -1;
78
0
}
79
80
/* Initializes needed structures and registeres timer
81
 *
82
 * Important : To be called before forking so all processes
83
 * inherit same queue */
84
int init_ql_support(void)
85
0
{
86
0
  if (query_buffer_size > 1)
87
0
  {
88
0
    if  (init_query_list() != 0 ||
89
0
      register_timer("querydb-flush", ql_timer_routine,NULL,
90
0
        query_flush_time>0?query_flush_time:DEF_FLUSH_TIME,
91
0
        TIMER_FLAG_DELAY_ON_DELAY) < 0 )
92
0
    {
93
0
      LM_ERR("failed initializing ins list support\n");
94
0
      return -1;
95
0
    }
96
0
  }
97
98
0
  return 0;
99
0
}
100
101
102
void flush_query_list(void)
103
0
{
104
0
  query_list_t *it;
105
0
  static db_ps_t my_ps = NULL;
106
0
  int i;
107
108
  /* no locks, only attendent is left at this point */
109
0
  for (it=*query_list;it;it=it->next)
110
0
  {
111
0
    if (it->no_rows > 0)
112
0
    {
113
0
      memset(&it->dbf,0,sizeof(db_func_t));
114
0
      if (db_bind_mod(&it->url,&it->dbf) < 0)
115
0
      {
116
0
        LM_ERR("failed to bind to db at shutdown\n");
117
0
        lock_release(it->lock);
118
0
        continue;
119
0
      }
120
121
0
      it->conn[process_no] = it->dbf.init(&it->url);
122
0
      if (it->conn[process_no] == 0)
123
0
      {
124
0
        LM_ERR("unable to connect to DB at shutdown\n");
125
0
        lock_release(it->lock);
126
0
        continue;
127
0
      }
128
129
0
      it->dbf.use_table(it->conn[process_no],&it->table);
130
131
      //Reset prepared statement between query lists/connections
132
0
      my_ps = NULL;
133
134
      /* and let's insert the rows */
135
0
      for (i=0;i<it->no_rows;i++)
136
0
      {
137
0
        CON_SET_CURR_PS(it->conn[process_no], &my_ps);
138
0
        if (it->dbf.insert(it->conn[process_no],it->cols,it->rows[i],
139
0
              it->col_no) < 0)
140
0
          LM_ERR("failed to insert into DB\n");
141
142
0
        shm_free(it->rows[i]);
143
0
      }
144
145
      /* no longer need this connection */
146
0
      if (it->conn[process_no] && it->dbf.close)
147
0
        it->dbf.close(it->conn[process_no]);
148
0
    }
149
0
  }
150
0
}
151
152
/* free all resources used by insert buffering */
153
void destroy_query_list(void)
154
0
{
155
0
  query_list_t *it;
156
157
0
  for (it=*query_list;it;it=it->next)
158
0
  {
159
0
    lock_destroy(it->lock);
160
0
    lock_dealloc(it->lock);
161
0
    shm_free(it);
162
0
  }
163
164
0
  lock_destroy(ql_lock);
165
0
  lock_dealloc(ql_lock);
166
0
}
167
168
/* to be called only at shutdown *
169
 * flushes all remaining rows to DB
170
 * and frees memory */
171
void handle_ql_shutdown(void)
172
0
{
173
0
  if (query_buffer_size > 1 && query_list && *query_list)
174
0
  {
175
0
    flush_query_list();
176
0
    destroy_query_list();
177
0
  }
178
0
}
179
180
/* adds a new type of query to the list
181
 * assumes ql_lock is acquired*/
182
void ql_add_unsafe(query_list_t *entry)
183
0
{
184
0
  if (*query_list == NULL)
185
0
  {
186
0
    *query_list = entry;
187
0
    *last_query = entry;
188
0
  }
189
0
  else
190
0
  {
191
0
    (*last_query)->next=entry;
192
0
    entry->prev = *last_query;
193
0
    *last_query = entry;
194
0
  }
195
0
}
196
197
int ql_detach_rows_unsafe(query_list_t *entry,db_val_t ***ins_rows)
198
0
{
199
0
  static db_val_t **detached_rows = NULL;
200
0
  int no_rows;
201
202
0
  if (detached_rows == NULL)
203
0
  {
204
    /* one time allocate buffer to pkg */
205
0
    detached_rows = pkg_malloc(query_buffer_size*sizeof(db_val_t *));
206
0
    if (detached_rows == NULL)
207
0
    {
208
0
      LM_ERR("no more pkg mem\n");
209
0
      lock_release(entry->lock);
210
0
      return -1;
211
0
    }
212
0
  }
213
214
0
  if (entry->no_rows == 0)
215
0
    return 0;
216
217
0
  memcpy(detached_rows,entry->rows,query_buffer_size * sizeof(db_val_t *));
218
0
  memset(entry->rows,0,query_buffer_size * sizeof(db_val_t *));
219
220
0
  no_rows = entry->no_rows;
221
0
  LM_DBG("detached %d rows\n",no_rows);
222
223
0
  entry->no_rows = 0;
224
0
  entry->oldest_query = 0;
225
0
  *ins_rows = detached_rows;
226
227
0
  return no_rows;
228
0
}
229
230
/* safely adds a new row to the insert list
231
 * also checks if the queue is full and returns all the rows that need to
232
 * be flushed to DB to the caller
233
 *
234
 * returns the number of rows detached
235
 *
236
 * Important : it is the caller's job to shm_free the rows
237
 * after flushing to DB
238
 * */
239
int ql_row_add(query_list_t *entry,const db_val_t *row,db_val_t ***ins_rows)
240
0
{
241
0
  int val_size,i,len,no_rows = 0;
242
0
  char *pos;
243
0
  db_val_t *shm_row;
244
245
0
  val_size = entry->col_no * sizeof(db_val_t);
246
0
  for (i=0;i<entry->col_no;i++)
247
0
  {
248
0
    if (VAL_TYPE(row+i) == DB_STR && VAL_NULL(row+i) == 0)
249
0
    {
250
0
      val_size += VAL_STR(row+i).len;
251
0
      continue;
252
0
    }
253
0
    if (VAL_TYPE(row+i) == DB_STRING && VAL_NULL(row+i) == 0)
254
0
    {
255
0
      val_size += strlen(VAL_STRING(row+i))+1;
256
0
      continue;
257
0
    }
258
0
    if (VAL_TYPE(row+i) == DB_BLOB && VAL_NULL(row+i) == 0)
259
0
      val_size += VAL_BLOB(row+i).len;
260
0
  }
261
262
0
  shm_row = shm_malloc(val_size);
263
0
  if (shm_row == NULL)
264
0
  {
265
0
    LM_ERR("no more shm\n");
266
0
    return -1;
267
0
  }
268
269
0
  LM_DBG("adding row to table [%.*s] &  entry %p\n",entry->table.len,entry->table.s,entry);
270
271
  /* save row info to shm */
272
0
  pos = (char *)(shm_row + entry->col_no);
273
0
  memcpy(shm_row,row,entry->col_no * sizeof(db_val_t));
274
0
  for (i=0;i<entry->col_no;i++)
275
0
  {
276
0
    if (VAL_TYPE(row+i) == DB_STR && VAL_NULL(row+i) == 0)
277
0
    {
278
0
      len = VAL_STR(row+i).len;
279
0
      VAL_STR(shm_row+i).len = len;
280
0
      VAL_STR(shm_row+i).s = pos;
281
0
      memcpy(VAL_STR(shm_row+i).s,VAL_STR(row+i).s,len);
282
0
      pos += len;
283
0
      continue;
284
0
    }
285
0
    if (VAL_TYPE(row+i) == DB_STRING && VAL_NULL(row+i) == 0)
286
0
    {
287
0
      len = strlen(VAL_STRING(row+i)) + 1;
288
0
      VAL_STRING(shm_row+i) = pos;
289
0
      memcpy((void *)VAL_STRING(shm_row+i),VAL_STRING(row+i),len);
290
0
      pos += len;
291
0
      continue;
292
0
    }
293
0
    if (VAL_TYPE(row+i) == DB_BLOB && VAL_NULL(row+i) == 0)
294
0
    {
295
0
      len = VAL_BLOB(row+i).len;
296
0
      VAL_BLOB(shm_row+i).len = len;
297
0
      VAL_BLOB(shm_row+i).s = pos;
298
0
      memcpy(VAL_BLOB(shm_row+i).s,VAL_BLOB(row+i).s,len);
299
0
      pos += len;
300
0
    }
301
0
  }
302
303
0
  LM_DBG("before locking query entry\n");
304
0
  lock_get(entry->lock);
305
306
  /* store oldest query for timer to know */
307
0
  if (entry->no_rows == 0)
308
0
    entry->oldest_query = time(0);
309
310
0
  entry->rows[entry->no_rows++] = shm_row;
311
0
  LM_DBG("query for table [%.*s] has %d rows\n",entry->table.len,entry->table.s,entry->no_rows);
312
313
  /* is it time to flush to DB ? */
314
0
  if (entry->no_rows == query_buffer_size)
315
0
  {
316
0
    if ((no_rows = ql_detach_rows_unsafe(entry,ins_rows)) < 0)
317
0
    {
318
0
      LM_ERR("failed to detach rows for insertion\n");
319
0
      lock_release(entry->lock);
320
0
      return -1;
321
0
    }
322
0
  }
323
324
0
  lock_release(entry->lock);
325
0
  return no_rows;
326
0
}
327
328
/* initializez a new query entry */
329
query_list_t *ql_init(db_con_t *con,db_key_t *cols,int col_no)
330
0
{
331
0
  int key_size,row_q_size,size,i;
332
0
  char *pos;
333
0
  query_list_t *entry;
334
335
0
  key_size = col_no * sizeof(db_key_t) + col_no * sizeof(str);
336
0
  for (i=0;i<col_no;i++)
337
0
    key_size += cols[i]->len;
338
339
0
  row_q_size = sizeof(db_val_t *) * query_buffer_size;
340
0
  size = sizeof(query_list_t) +
341
0
    counted_max_processes * sizeof(db_con_t *) +
342
0
    con->table->len + key_size + row_q_size + con->url.len;
343
344
0
  entry = shm_malloc(size);
345
0
  if (entry == NULL)
346
0
  {
347
0
    LM_ERR("no more shm\n");
348
0
    return NULL;
349
0
  }
350
351
0
  memset(entry,0,size);
352
0
  LM_DBG("alloced %p for %d bytes\n",entry,size);
353
354
0
  entry->lock = lock_alloc();
355
0
  if (entry->lock == 0)
356
0
  {
357
0
    LM_ERR("failed to alloc lock\n");
358
0
    shm_free(entry);
359
0
    return NULL;
360
0
  }
361
362
0
  if (lock_init(entry->lock) == 0)
363
0
  {
364
0
    LM_ERR("failed to init lock\n");
365
0
    lock_dealloc(entry->lock);
366
0
    shm_free(entry);
367
0
    return NULL;
368
0
  }
369
370
  /* deal with the table name */
371
0
  entry->table.s = (char *)entry+sizeof(query_list_t);
372
0
  entry->table.len = con->table->len;
373
0
  memcpy(entry->table.s,con->table->s,con->table->len);
374
375
  /* deal with the columns */
376
0
  entry->cols = (db_key_t *)(void *)((char *)entry+sizeof(query_list_t)+
377
0
                con->table->len);
378
0
  entry->col_no = col_no;
379
380
0
  pos = (char *)(entry->cols + col_no) + col_no * sizeof(str);
381
0
  for (i=0;i<col_no;i++)
382
0
  {
383
0
    entry->cols[i] = (str *)(entry->cols + col_no) + i;
384
0
    entry->cols[i]->len = cols[i]->len;
385
0
    entry->cols[i]->s = pos;
386
0
    memcpy(pos,cols[i]->s,cols[i]->len);
387
0
    pos += cols[i]->len;
388
0
  }
389
390
  /* deal with the rows */
391
0
  entry->rows = (db_val_t **)(void *)((char *)(entry + 1) +
392
0
          con->table->len + key_size);
393
394
  /* save url for later use by timer */
395
0
  entry->url.s = (char *)entry + sizeof(query_list_t) +
396
0
          con->table->len + key_size + row_q_size;
397
0
  entry->url.len = con->url.len;
398
0
  memcpy(entry->url.s,con->url.s,con->url.len);
399
400
  /* build array of connections per process */
401
0
  entry->conn = (db_con_t**)(void *)((char *)(entry + 1) +
402
0
          con->table->len + key_size + row_q_size + con->url.len);
403
404
0
  LM_DBG("initialized query list for table [%.*s]\n",entry->table.len,entry->table.s);
405
0
  return entry;
406
0
}
407
408
/* attempts to find a query list described by the given parameters
409
 * if found, returns the entry
410
 * else, return NULL
411
 * assumes ql_lock is acquired
412
 */
413
query_list_t *find_query_list_unsafe(const str *table,db_key_t *cols,int col_no)
414
0
{
415
0
  query_list_t *it,*entry=NULL;
416
0
  int i;
417
418
0
  LM_DBG("attempting to find q\n");
419
420
0
  for (it=*query_list;it;it=it->next)
421
0
  {
422
0
    LM_DBG("iterating through %p\n",it);
423
424
    /* match number of columns */
425
0
    if (it->col_no != col_no)
426
0
    {
427
0
      LM_DBG("different col no it = %d , %d\n",it->col_no,col_no);
428
0
      continue;
429
0
    }
430
431
    /* match table name */
432
0
    if (it->table.len != table->len ||
433
0
        memcmp(it->table.s,table->s,table->len) != 0)
434
0
    {
435
0
      LM_DBG("different tables - [%.*s] - [%.*s] \n",it->table.len,it->table.s,
436
0
            table->len,table->s);
437
0
      continue;
438
0
    }
439
440
    /* match columns */
441
0
    for (i=0;i<col_no;i++)
442
0
    {
443
0
      if (it->cols[i]->len != cols[i]->len ||
444
0
          memcmp(it->cols[i]->s,cols[i]->s,cols[i]->len) != 0)
445
0
      {
446
0
        LM_DBG("failed matching column %d - [%.*s] - [%.*s]\n",i,it->cols[i]->len,
447
0
          it->cols[i]->s,cols[i]->len,cols[i]->s);
448
0
        goto next_query;
449
0
      }
450
0
    }
451
452
    /* got here, we have found our match */
453
0
    entry = it;
454
0
    LM_DBG("successful match on %p\n",entry);
455
0
    break;
456
457
0
next_query:
458
0
    ;
459
0
  }
460
461
0
  LM_DBG("returning %p\n",entry);
462
0
  return entry;
463
0
}
464
465
/* set's the query_list that will be used for inserts
466
 * on the provided db connection
467
 *
468
 * also takes care of initialisation of this is the first process
469
 * attempting to execute this type of query */
470
int con_set_inslist(db_func_t *dbf,db_con_t *con,query_list_t **list,
471
              db_key_t *cols,int col_no)
472
0
{
473
0
  query_list_t *entry;
474
475
  /* if buffering not enabled, ignore */
476
0
  if (query_buffer_size <= 1)
477
0
    return 0;
478
479
  /* if buffering is enabled, but user is using a module
480
   * that does not support multiple inserts,
481
   * also ignore */
482
0
  if (!DB_CAPABILITY(*dbf,DB_CAP_MULTIPLE_INSERT))
483
0
    return 0;
484
485
0
  if (list == NULL)
486
0
    return 0;
487
488
  /* first time we are being called from this process */
489
0
  if (*list == NULL)
490
0
  {
491
0
    LM_DBG("first inslist call. searching for query list \n");
492
0
    lock_get(ql_lock);
493
0
    entry = find_query_list_unsafe(con->table,cols,col_no);
494
0
    if (entry == NULL)
495
0
    {
496
0
      LM_DBG("couldn't find entry for this query\n");
497
      /* first query of this type is done from this process,
498
       * it's my job to initialize the query list
499
       * and save for later use */
500
0
      entry = ql_init(con,cols,col_no);
501
0
      if (entry == NULL)
502
0
      {
503
0
        LM_ERR("failed to initialize ins queue\n");
504
0
        lock_release(ql_lock);
505
0
        return -1;
506
0
      }
507
508
0
      ql_add_unsafe(entry);
509
0
      con->ins_list = entry;
510
0
      *list = entry;
511
0
    }
512
0
    else
513
0
    {
514
0
      LM_DBG("query list already exists - attaching\n");
515
      /* another process has done a query of this type,
516
       * just attach to the con and save for later use */
517
0
      con->ins_list = entry;
518
0
      *list = entry;
519
0
    }
520
521
0
    lock_release(ql_lock);
522
0
    return 0;
523
0
  }
524
0
  else
525
0
  {
526
    /* we've previously found our query list */
527
0
    LM_DBG("process already found it's query list\n");
528
0
    con->ins_list = *list;
529
0
  }
530
531
0
  LM_DBG("successfully returned from con_set_inslist\n");
532
0
  return 0;
533
0
}
534
535
/* clean shm memory used by the rows */
536
void cleanup_rows(db_val_t **rows)
537
0
{
538
0
  int i;
539
540
0
  if (rows != NULL)
541
0
    for (i=0;i<query_buffer_size;i++)
542
0
      if (rows[i] != NULL)
543
0
      {
544
0
        shm_free(rows[i]);
545
0
        rows[i] = NULL;
546
0
      }
547
0
}
548
549
/* handler for timer
550
 * that flushes old rows to DB */
551
void ql_timer_routine(unsigned int ticks,void *param)
552
0
{
553
0
  query_list_t *it;
554
0
  time_t now;
555
556
0
  now = time(0);
557
558
0
  for (it=*query_list;it;it=it->next)
559
0
  {
560
0
    lock_get(it->lock);
561
562
    /* are there any old queries in queue ? */
563
0
    if (it->oldest_query && (now - it->oldest_query > query_flush_time))
564
0
    {
565
0
      LM_DBG("insert timer kicking in for query %p [%d]\n",it, it->no_rows);
566
567
0
      if (it->dbf.init == NULL)
568
0
      {
569
        /* first time timer kicked in for this query */
570
0
        if (db_bind_mod(&it->url,&it->dbf) < 0)
571
0
        {
572
0
          LM_ERR("timer failed to bind to db\n");
573
0
          lock_release(it->lock);
574
0
          continue;
575
0
        }
576
0
      }
577
578
0
      if (it->conn[process_no] == NULL)
579
0
      {
580
0
        if (!it->dbf.init) {
581
0
          LM_ERR("DB engine does not have init function\n");
582
0
          lock_release(it->lock);
583
0
          continue;
584
0
        }
585
0
        it->conn[process_no] = it->dbf.init(&it->url);
586
0
        if (it->conn[process_no] == 0)
587
0
        {
588
0
          LM_ERR("unable to connect to DB\n");
589
0
          lock_release(it->lock);
590
0
          continue;
591
0
        }
592
593
0
        LM_DBG("timer has init conn for query %p\n",it);
594
0
      }
595
596
0
      it->dbf.use_table(it->conn[process_no],&it->table);
597
598
      /* simulate the finding of the right query list */
599
0
      it->conn[process_no]->ins_list = it;
600
      /* tell the core that this is the insert timer handler */
601
0
      CON_FLUSH_UNSAFE(it->conn[process_no]);
602
603
      /* no actual new row to provide, flush existing ones */
604
0
      if (it->dbf.insert(it->conn[process_no],it->cols,(db_val_t *)-1,
605
0
            it->col_no) < 0)
606
0
        LM_ERR("failed to insert rows to DB\n");
607
0
    }
608
0
    else
609
0
      lock_release(it->lock);
610
0
  }
611
0
}
612
613
int ql_flush_rows(db_func_t *dbf,db_con_t *conn,query_list_t *entry)
614
0
{
615
0
  if (query_buffer_size <= 1 || !entry)
616
0
    return 0;
617
618
  /* simulate the finding of the right query list */
619
0
  conn->ins_list = entry;
620
  /* tell the core that we need to flush right away */
621
0
  CON_FLUSH_SAFE(conn);
622
623
  /* no actual new row to provide, flush existing ones */
624
0
  if (dbf->insert(conn,entry->cols,(db_val_t *)-1,entry->col_no) < 0)
625
0
  {
626
0
    LM_ERR("failed to flush rows to DB\n");
627
0
    return -1;
628
0
  }
629
630
0
  return 0;
631
0
}
632
633
void ql_force_process_disconnect(int p_id)
634
0
{
635
0
  query_list_t *it;
636
637
0
  if (query_list) {
638
0
    for (it=*query_list;it;it=it->next) {
639
0
      lock_get(it->lock);
640
641
0
      if (it->conn[p_id]) {
642
0
        it->dbf.close(it->conn[p_id]);
643
0
        it->conn[p_id]=NULL;
644
0
      }
645
646
0
      lock_release(it->lock);
647
0
    }
648
0
  }
649
0
}
650