Coverage Report

Created: 2025-11-12 07:01

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/tarantool/src/box/xrow_update_array.c
Line
Count
Source
1
/*
2
 * Copyright 2010-2019, Tarantool AUTHORS, please see AUTHORS file.
3
 *
4
 * Redistribution and use in source and binary forms, with or
5
 * without modification, are permitted provided that the following
6
 * conditions are met:
7
 *
8
 * 1. Redistributions of source code must retain the above
9
 *    copyright notice, this list of conditions and the
10
 *    following disclaimer.
11
 *
12
 * 2. Redistributions in binary form must reproduce the above
13
 *    copyright notice, this list of conditions and the following
14
 *    disclaimer in the documentation and/or other materials
15
 *    provided with the distribution.
16
 *
17
 * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND
18
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
19
 * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
20
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
21
 * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
22
 * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
23
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
24
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
25
 * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
26
 * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
27
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
28
 * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
29
 * SUCH DAMAGE.
30
 */
31
#include "xrow_update_field.h"
32
#include "msgpuck.h"
33
#include "fiber.h"
34
#include "schema_def.h"
35
#include "tuple_format.h"
36
37
/**
38
 * Make sure @a op contains a valid field number to where the
39
 * operation should be applied next. Field number may be not
40
 * known, if the array's parent didn't propagate operation's
41
 * lexer. In fact, the parent fills fieldno only in some rare
42
 * cases like branching. Generally, an array should care about
43
 * fieldno by itself.
44
 */
45
static inline int
46
xrow_update_op_prepare_num_token(struct xrow_update_op *op)
47
0
{
48
0
  if (op->is_token_consumed && xrow_update_op_next_token(op) != 0)
49
0
    return -1;
50
0
  if (op->token_type != JSON_TOKEN_NUM) {
51
0
    return xrow_update_err(op, "can't update an array by a "\
52
0
               "non-numeric index");
53
0
  }
54
0
  return 0;
55
0
}
56
57
/**
58
 * Make field index non-negative and check for the field
59
 * existence.
60
 */
61
static inline int
62
xrow_update_op_adjust_field_no(struct xrow_update_op *op, int32_t field_count)
63
0
{
64
0
  assert(op->token_type == JSON_TOKEN_NUM && !op->is_token_consumed);
65
0
  if (op->field_no >= 0) {
66
0
    if (op->field_no < field_count)
67
0
      return 0;
68
0
  } else if (op->field_no + field_count >= 0) {
69
0
    op->field_no += field_count;
70
0
    return 0;
71
0
  }
72
0
  return xrow_update_err_no_such_field(op);
73
0
}
74
75
/**
 * Updated array is divided into array items. Each item is a range
 * of fields. Item updates first field of the range and stores
 * size of others to save them with no changes into a new tuple
 * later. It allows on update of a single field in an array store
 * at most 2 objects - item for the previous fields, and item for
 * this one + its tail. This is how rope data structure works - a
 * binary tree designed for big contiguous object updates.
 */
struct xrow_update_array_item {
	/** First field in the range, contains an update. */
	struct xrow_update_field field;
	/** Pointer to the other fields in the range, raw msgpack. */
	const char *tail_data;
	/** Size in bytes of the other fields in the range. */
	uint32_t tail_size;
};
92
93
/** Initialize an array item. */
94
static inline void
95
xrow_update_array_item_create(struct xrow_update_array_item *item,
96
            enum xrow_update_type type, const char *data,
97
            uint32_t size, uint32_t tail_size)
98
0
{
99
0
  item->field.type = type;
100
0
  item->field.data = data;
101
0
  item->field.size = size;
102
0
  item->tail_data = data + size;
103
0
  item->tail_size = tail_size;
104
0
}
105
106
/**
 * Split a range of fields in two: @a prev keeps the first
 * @a offset fields, the rest become a new item. Used as the
 * rope's split callback (ROPE_SPLIT_F).
 */
static struct xrow_update_array_item *
xrow_update_array_item_split(struct region *region,
			     struct xrow_update_array_item *prev, size_t size,
			     size_t offset)
{
	(void) size;
	struct xrow_update_array_item *next = (struct xrow_update_array_item *)
		xrow_update_alloc(region, sizeof(*next));
	assert(offset > 0 && prev->tail_size > 0);

	const char *field = prev->tail_data;
	const char *range_end = prev->tail_data + prev->tail_size;

	/*
	 * Skip offset - 1 fields of the tail: prev's own first
	 * field already counts as one of the offset kept fields.
	 */
	for (uint32_t i = 1; i < offset; ++i)
		mp_next(&field);

	/* Truncate prev's tail at the split point. */
	prev->tail_size = field - prev->tail_data;
	const char *field_end = field;
	mp_next(&field_end);
	/* The new item: first field + the remaining raw tail. */
	xrow_update_array_item_create(next, XUPDATE_NOP, field,
				      field_end - field, range_end - field_end);
	return next;
}
130
131
0
/*
 * Instantiate the generic rope template (salad/rope.h) for
 * region-allocated xrow_update_array_item leaves, producing the
 * xrow_update_rope_* functions used below.
 */
#define ROPE_SPLIT_F xrow_update_array_item_split
#define ROPE_ALLOC_F xrow_update_alloc
#define rope_data_t struct xrow_update_array_item *
#define rope_ctx_t struct region *
#define rope_name xrow_update

#include "salad/rope.h"
138
139
/**
140
 * Extract from the array an item whose range starts from the
141
 * field affected by @a op.
142
 */
143
static inline struct xrow_update_array_item *
144
xrow_update_array_extract_item(struct xrow_update_field *field,
145
             struct xrow_update_op *op)
146
0
{
147
0
  assert(field->type == XUPDATE_ARRAY);
148
0
  struct xrow_update_rope *rope = field->array.rope;
149
0
  uint32_t size = xrow_update_rope_size(rope);
150
0
  if (xrow_update_op_adjust_field_no(op, size) == 0)
151
0
    return xrow_update_rope_extract(rope, op->field_no);
152
0
  return NULL;
153
0
}
154
155
void
156
xrow_update_array_create(struct xrow_update_field *field, const char *header,
157
       const char *data, const char *data_end,
158
       uint32_t field_count)
159
0
{
160
0
  field->type = XUPDATE_ARRAY;
161
0
  field->data = header;
162
0
  field->size = data_end - header;
163
0
  struct region *region = &fiber()->gc;
164
0
  field->array.rope = xrow_update_rope_new(region);
165
0
  assert(field->array.rope != NULL);
166
0
  struct xrow_update_array_item *item = (struct xrow_update_array_item *)
167
0
    xrow_update_alloc(region, sizeof(*item));
168
0
  if (data == data_end)
169
0
    return;
170
  /*
171
   * Initial item consists of one range - the whole array.
172
   */
173
0
  const char *begin = data;
174
0
  mp_next(&data);
175
0
  xrow_update_array_item_create(item, XUPDATE_NOP, begin, data - begin,
176
0
              data_end - data);
177
0
  int rc = xrow_update_rope_append(field->array.rope, item, field_count);
178
0
  assert(rc == 0);
179
0
  (void)rc;
180
0
}
181
182
/**
 * Create an array update field from @a header whose field at
 * @a field_no is already updated and described by @a child. The
 * surrounding fields are stored as unchanged ranges.
 */
void
xrow_update_array_create_with_child(struct xrow_update_field *field,
				    const char *header,
				    const struct xrow_update_field *child,
				    int32_t field_no)
{
	const char *data = header;
	uint32_t field_count = mp_decode_array(&data);
	const char *first_field = data;
	const char *first_field_end = first_field;
	mp_next(&first_field_end);
	struct region *region = &fiber()->gc;
	struct xrow_update_rope *rope = xrow_update_rope_new(region);
	assert(rope != NULL);
	struct xrow_update_array_item *item = (struct xrow_update_array_item *)
		xrow_update_alloc(region, sizeof(*item));
	const char *end = first_field_end;
	if (field_no > 0) {
		/*
		 * Glue the fields before field_no into one NOP item:
		 * the array's first field plus a tail of the rest up
		 * to the child's field.
		 */
		for (int32_t i = 1; i < field_no; ++i)
			mp_next(&end);
		xrow_update_array_item_create(item, XUPDATE_NOP, first_field,
					      first_field_end - first_field,
					      end - first_field_end);
		int rc = xrow_update_rope_append(rope, item, field_no);
		assert(rc == 0);
		(void)rc;
		/* A second item for the child and the fields after it. */
		item = (struct xrow_update_array_item *)
			xrow_update_alloc(region, sizeof(*item));
		/* Re-point the locals at the child's field. */
		first_field = end;
		first_field_end = first_field;
		mp_next(&first_field_end);
		end = first_field_end;
	}
	/* Walk past the remaining fields to find the array's end. */
	for (uint32_t i = field_no + 1; i < field_count; ++i)
		mp_next(&end);
	item->field = *child;
	/*
	 * Note: item_create() overwrites item->field's type, data
	 * and size (just copied from *child) with child->type and
	 * the raw data range; the child's other members are kept.
	 */
	xrow_update_array_item_create(item, child->type, first_field,
				      first_field_end - first_field,
				      end - first_field_end);
	field->type = XUPDATE_ARRAY;
	field->data = header;
	field->size = end - header;
	field->array.rope = rope;
	int rc = xrow_update_rope_append(rope, item, field_count - field_no);
	assert(rc == 0);
	(void)rc;
}
229
230
uint32_t
231
xrow_update_array_sizeof(struct xrow_update_field *field)
232
0
{
233
0
  assert(field->type == XUPDATE_ARRAY);
234
0
  struct xrow_update_rope_iter it;
235
0
  xrow_update_rope_iter_create(&it, field->array.rope);
236
237
0
  uint32_t size = xrow_update_rope_size(field->array.rope);
238
0
  uint32_t res = mp_sizeof_array(size);
239
0
  const struct xrow_update_rope_node *node =
240
0
          xrow_update_rope_iter_start(&it);
241
0
  for (; node != NULL; node = xrow_update_rope_iter_next(&it)) {
242
0
    struct xrow_update_array_item *item =
243
0
      xrow_update_rope_leaf_data(node);
244
0
    res += xrow_update_field_sizeof(&item->field) + item->tail_size;
245
0
  }
246
0
  return res;
247
0
}
248
249
/**
 * Store the updated array as msgpack into [out, out_end).
 * @a this_node, when not NULL, is the array's node in
 * @a format_tree; each item's field then gets its own format node
 * looked up by index. Returns the number of bytes written.
 */
uint32_t
xrow_update_array_store(struct xrow_update_field *field,
			struct json_tree *format_tree,
			struct json_token *this_node, char *out, char *out_end)
{
	assert(field->type == XUPDATE_ARRAY);
	char *out_begin = out;
	out = mp_encode_array(out, xrow_update_rope_size(field->array.rope));
	struct xrow_update_rope_iter it;
	xrow_update_rope_iter_create(&it, field->array.rope);
	const struct xrow_update_rope_node *node =
		xrow_update_rope_iter_start(&it);
	uint32_t total_field_count = 0;
	if (this_node == NULL) {
		/* No format info - store items with NULL format nodes. */
		for (; node != NULL; node = xrow_update_rope_iter_next(&it)) {
			struct xrow_update_array_item *item =
				xrow_update_rope_leaf_data(node);
			uint32_t field_count = xrow_update_rope_leaf_size(node);
			out += xrow_update_field_store(&item->field, NULL, NULL,
						       out, out_end);
			assert(item->tail_size == 0 || field_count > 1);
			/* The tail is copied unchanged, as raw msgpack. */
			memcpy(out, item->tail_data, item->tail_size);
			out += item->tail_size;
			total_field_count += field_count;
		}
	} else {
		/*
		 * Track the index of each item's first field in
		 * token.num to look up its format node.
		 */
		struct json_token token;
		token.type = JSON_TOKEN_NUM;
		token.num = 0;
		struct json_token *next_node;
		for (; node != NULL; node = xrow_update_rope_iter_next(&it)) {
			struct xrow_update_array_item *item =
				xrow_update_rope_leaf_data(node);
			next_node = json_tree_lookup(format_tree, this_node, &token);
			uint32_t field_count = xrow_update_rope_leaf_size(node);
			out += xrow_update_field_store(&item->field, format_tree,
						       next_node, out, out_end);
			assert(item->tail_size == 0 || field_count > 1);
			memcpy(out, item->tail_data, item->tail_size);
			out += item->tail_size;
			/* Advance to the next item's first field index. */
			token.num += field_count;
			total_field_count += field_count;
		}
	}
	(void) total_field_count;
	assert(xrow_update_rope_size(field->array.rope) == total_field_count);
	assert(out <= out_end);
	return out - out_begin;
}
298
299
/**
 * Helper function that appends nils in the end so that op will insert
 * without gaps
 */
static void
xrow_update_array_append_nils(struct xrow_update_field *field,
			      struct xrow_update_op *op)
{
	struct xrow_update_rope *rope = field->array.rope;
	uint32_t size = xrow_update_rope_size(rope);
	/* No gap - the target index is inside or right after the array. */
	if (op->field_no < 0 || (uint32_t)op->field_no <= size)
		return;
	/*
	 * Do not allow autofill of nested arrays with nulls. It is not
	 * supported only because there is no an easy way how to apply that to
	 * bar updates which can also affect arrays.
	 */
	if (!op->is_for_root)
		return;
	uint32_t nil_count = op->field_no - size;
	struct xrow_update_array_item *item =
		(struct xrow_update_array_item *)
		xrow_update_alloc(rope->ctx, sizeof(*item));
	/* MP_NIL is the single byte 0xc0, so memset builds the fill. */
	assert(mp_sizeof_nil() == 1);
	char *item_data = (char *)xregion_alloc(rope->ctx, nil_count);
	memset(item_data, 0xc0, nil_count);
	/* One item: first nil + a tail of the nil_count - 1 others. */
	xrow_update_array_item_create(item, XUPDATE_NOP, item_data, 1,
				      nil_count - 1);
	int rc = xrow_update_rope_insert(rope, op->field_no, item, nil_count);
	assert(rc == 0);
	(void)rc;
}
331
332
/**
 * Apply an insert operation to the array. A non-terminal token
 * descends into the addressed field; a terminal one inserts a new
 * single-field item into the rope. Returns 0 on success, -1 on
 * error with diag set.
 */
int
xrow_update_op_do_array_insert(struct xrow_update_op *op,
			       struct xrow_update_field *field)
{
	assert(field->type == XUPDATE_ARRAY);
	struct xrow_update_array_item *item;
	if (xrow_update_op_prepare_num_token(op) != 0)
		return -1;

	/* Not a terminal token - forward the insert to the child field. */
	if (!xrow_update_op_is_term(op)) {
		item = xrow_update_array_extract_item(field, op);
		if (item == NULL)
			return -1;
		op->is_token_consumed = true;
		return xrow_update_op_do_field_insert(op, &item->field);
	}

	/*
	 * Possibly pad the array with nils first, so the size read
	 * below already includes them.
	 */
	xrow_update_array_append_nils(field, op);

	struct xrow_update_rope *rope = field->array.rope;
	uint32_t size = xrow_update_rope_size(rope);
	/* Error injection may lower the field count limit in tests. */
	int64_t tuple_field_cnt_lim = BOX_FIELD_MAX;
	struct errinj *err_inj =
		errinj(ERRINJ_TUPLE_FIELD_COUNT_LIMIT, ERRINJ_INT);
	if (err_inj != NULL && err_inj->iparam > 0) {
		tuple_field_cnt_lim = err_inj->iparam;
	}
	if (size >= tuple_field_cnt_lim) {
		diag_set(ClientError, ER_TUPLE_FIELD_COUNT_LIMIT);
		return -1;
	}
	/* size + 1: inserting right after the last field is allowed. */
	if (xrow_update_op_adjust_field_no(op, size + 1) != 0)
		return -1;

	item = (struct xrow_update_array_item *)
		xrow_update_alloc(rope->ctx, sizeof(*item));
	/* The new item has no tail - it is exactly one field. */
	xrow_update_array_item_create(item, XUPDATE_NOP, op->arg.set.value,
				      op->arg.set.length, 0);
	int rc = xrow_update_rope_insert(rope, op->field_no, item, 1);
	assert(rc == 0);
	(void)rc;
	return 0;
}
375
376
int
377
xrow_update_op_do_array_set(struct xrow_update_op *op,
378
          struct xrow_update_field *field)
379
0
{
380
0
  assert(field->type == XUPDATE_ARRAY);
381
0
  struct xrow_update_rope *rope = field->array.rope;
382
0
  if (xrow_update_op_prepare_num_token(op) != 0)
383
0
    return -1;
384
385
  /* Interpret '=' for n + 1 field as insert. */
386
0
  if (op->field_no >= (int32_t) xrow_update_rope_size(rope))
387
0
    return xrow_update_op_do_array_insert(op, field);
388
389
0
  struct xrow_update_array_item *item =
390
0
    xrow_update_array_extract_item(field, op);
391
0
  if (item == NULL)
392
0
    return -1;
393
0
  if (!xrow_update_op_is_term(op)) {
394
0
    op->is_token_consumed = true;
395
0
    return xrow_update_op_do_field_set(op, &item->field);
396
0
  }
397
0
  item->field.type = XUPDATE_NOP;
398
0
  item->field.data = op->arg.set.value;
399
0
  item->field.size = op->arg.set.length;
400
0
  return 0;
401
0
}
402
403
/**
 * Apply a delete operation to the array: remove up to
 * op->arg.del.count fields starting at the addressed index, or
 * descend into the addressed field for a non-terminal token.
 * Returns 0 on success, -1 on error.
 */
int
xrow_update_op_do_array_delete(struct xrow_update_op *op,
			       struct xrow_update_field *field)
{
	assert(field->type == XUPDATE_ARRAY);
	if (xrow_update_op_prepare_num_token(op) != 0)
		return -1;

	/* Not a terminal token - forward the delete to the child field. */
	if (!xrow_update_op_is_term(op)) {
		struct xrow_update_array_item *item =
			xrow_update_array_extract_item(field, op);
		if (item == NULL)
			return -1;
		op->is_token_consumed = true;
		return xrow_update_op_do_field_delete(op, &item->field);
	}

	struct xrow_update_rope *rope = field->array.rope;
	uint32_t size = xrow_update_rope_size(rope);
	if (xrow_update_op_adjust_field_no(op, size) != 0) {
		/*
		 * Deleting a non-negative index past the end is
		 * treated as a no-op rather than an error.
		 */
		if (op->field_no >= (int)size)
			return 0;
		return -1;
	}
	uint32_t delete_count = op->arg.del.count;
	/* Clamp the count to the fields actually present. */
	if ((uint64_t) op->field_no + delete_count > size)
		delete_count = size - op->field_no;
	assert(delete_count > 0);
	xrow_update_rope_erase(rope, op->field_no, delete_count);
	return 0;
}
434
435
/**
 * Generate the array implementation of a scalar operation
 * (arith/bit/splice): find the target item, descend for a
 * non-terminal token, otherwise decode the item's value into a
 * scalar and apply the operation to it.
 */
#define DO_SCALAR_OP_GENERIC(op_type)					\
int									\
xrow_update_op_do_array_##op_type(struct xrow_update_op *op,		\
				  struct xrow_update_field *field)	\
{									\
	if (xrow_update_op_prepare_num_token(op) != 0)			\
		return -1;						\
	struct xrow_update_array_item *item =				\
		xrow_update_array_extract_item(field, op);		\
	if (item == NULL)						\
		return -1;						\
	if (!xrow_update_op_is_term(op)) {				\
		op->is_token_consumed = true;				\
		return xrow_update_op_do_field_##op_type(op, &item->field);\
	}								\
	struct xrow_update_scalar *scalar = &item->field.scalar;	\
	/* Just stub for non scalar field. op_do_ * will fail on it. */	\
	struct xrow_update_scalar na = { .type = XUPDATE_TYPE_NONE };	\
	if (item->field.type == XUPDATE_NOP) {				\
		/* Untouched field - decode its raw data first. */	\
		const char *data = item->field.data;			\
		xrow_update_mp_read_scalar(&data, &item->field.scalar);	\
	} else if (item->field.type != XUPDATE_SCALAR) {		\
		scalar = &na;						\
	}								\
	if (xrow_update_op_do_##op_type(op, scalar) != 0)		\
		return -1;						\
	item->field.type = XUPDATE_SCALAR;				\
	return 0;							\
}

DO_SCALAR_OP_GENERIC(arith)

DO_SCALAR_OP_GENERIC(bit)

DO_SCALAR_OP_GENERIC(splice)