Coverage Report

Created: 2025-11-05 06:27

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/tarantool/src/box/alter.cc
Line
Count
Source
1
/*
2
 * Copyright 2010-2016, Tarantool AUTHORS, please see AUTHORS file.
3
 *
4
 * Redistribution and use in source and binary forms, with or
5
 * without modification, are permitted provided that the following
6
 * conditions are met:
7
 *
8
 * 1. Redistributions of source code must retain the above
9
 *    copyright notice, this list of conditions and the
10
 *    following disclaimer.
11
 *
12
 * 2. Redistributions in binary form must reproduce the above
13
 *    copyright notice, this list of conditions and the following
14
 *    disclaimer in the documentation and/or other materials
15
 *    provided with the distribution.
16
 *
17
 * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND
18
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
19
 * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
20
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
21
 * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
22
 * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
23
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
24
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
25
 * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
26
 * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
27
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
28
 * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
29
 * SUCH DAMAGE.
30
 */
31
#include "alter.h"
32
#include "assoc.h"
33
#include "column_mask.h"
34
#include "schema.h"
35
#include "user.h"
36
#include "space.h"
37
#include "index.h"
38
#include "func.h"
39
#include "coll_id_cache.h"
40
#include "coll_id_def.h"
41
#include "txn.h"
42
#include "txn_limbo.h"
43
#include "tuple.h"
44
#include "tuple_constraint.h"
45
#include "fiber.h" /* for gc_pool */
46
#include "scoped_guard.h"
47
#include <base64.h>
48
#include <new> /* for placement new */
49
#include <stdio.h> /* snprintf() */
50
#include <ctype.h>
51
#include "replication.h" /* for replica_set_id() */
52
#include "session.h" /* to fetch the current user. */
53
#include "xrow.h"
54
#include "iproto_constants.h"
55
#include "identifier.h"
56
#include "version.h"
57
#include "sequence.h"
58
#include "sql.h"
59
#include "space_upgrade.h"
60
#include "box.h"
61
#include "authentication.h"
62
#include "node_name.h"
63
#include "core/func_adapter.h"
64
#include "relay.h"
65
#include "gc.h"
66
#include "memtx_tx.h"
67
68
/* {{{ Auxiliary functions and methods. */
69
70
static void
71
box_schema_version_bump(void)
72
0
{
73
0
  ++schema_version;
74
0
  box_broadcast_schema();
75
0
}
76
77
/**
78
 * Checks if the current user can perform a DDL operation on an object with
79
 * the given name, owner id, and cached runtime access information.
80
 * Returns 0 on success, -1 on failure.
81
 */
82
static int
83
access_check_ddl(const char *name, uint32_t owner_uid, struct access *object,
84
     enum schema_object_type type, enum priv_type priv_type)
85
0
{
86
0
  struct credentials *cr = effective_user();
87
0
  user_access_t has_access = cr->universal_access;
88
89
0
  user_access_t access = ((PRIV_U | (user_access_t) priv_type) &
90
0
        ~has_access);
91
0
  bool is_owner = owner_uid == cr->uid || cr->uid == ADMIN;
92
0
  if (access == 0)
93
0
    return 0; /* Access granted. */
94
  /* Check for specific entity access. */
95
0
  struct access *entity = entity_access_get(type);
96
0
  if (entity != NULL)
97
0
    access &= ~entity[cr->auth_token].effective;
98
  /*
99
   * Only the owner of the object or someone who has
100
   * specific DDL privilege on the object can execute
101
   * DDL. If a user has no USAGE access and is owner,
102
   * deny access as well.
103
   * If a user wants to CREATE an object, they're of course
104
   * the owner of the object, but this should be ignored --
105
   * CREATE privilege is required.
106
   */
107
0
  if (access == 0 || (is_owner && !(access & (PRIV_U | PRIV_C))))
108
0
    return 0; /* Access granted. */
109
  /*
110
   * USAGE can be granted only globally.
111
   */
112
0
  if (!(access & PRIV_U)) {
113
    /* Check for privileges on a single object. */
114
0
    if (object != NULL)
115
0
      access &= ~object[cr->auth_token].effective;
116
0
    if (access == 0)
117
0
      return 0; /* Access granted. */
118
0
  }
119
  /* Create a meaningful error message. */
120
0
  struct user *user = user_find(cr->uid);
121
0
  if (user == NULL)
122
0
    return -1;
123
0
  const char *object_name;
124
0
  const char *pname;
125
0
  if (access & PRIV_U) {
126
0
    object_name = schema_object_name(SC_UNIVERSE);
127
0
    pname = priv_name(PRIV_U);
128
0
    name = "";
129
0
  } else {
130
0
    object_name = schema_object_name(type);
131
0
    pname = priv_name(access);
132
0
  }
133
0
  diag_set(AccessDeniedError, pname, object_name, name, user->def->name);
134
0
  return -1;
135
0
}
136
137
/**
138
 * Return an error if the given index definition
139
 * is incompatible with a sequence.
140
 */
141
static int
142
index_def_check_sequence(struct index_def *index_def, uint32_t sequence_fieldno,
143
       const char *sequence_path, uint32_t sequence_path_len,
144
       const char *space_name)
145
0
{
146
0
  struct key_def *key_def = index_def->key_def;
147
0
  struct key_part *sequence_part = NULL;
148
0
  for (uint32_t i = 0; i < key_def->part_count; ++i) {
149
0
    struct key_part *part = &key_def->parts[i];
150
0
    if (part->fieldno != sequence_fieldno)
151
0
      continue;
152
0
    if ((part->path == NULL && sequence_path == NULL) ||
153
0
        (part->path != NULL && sequence_path != NULL &&
154
0
         json_path_cmp(part->path, part->path_len,
155
0
           sequence_path, sequence_path_len,
156
0
           TUPLE_INDEX_BASE) == 0)) {
157
0
      sequence_part = part;
158
0
      break;
159
0
    }
160
0
  }
161
0
  if (sequence_part == NULL) {
162
0
    diag_set(ClientError, ER_MODIFY_INDEX, index_def->name,
163
0
       space_name, "sequence field must be a part of "
164
0
             "the index");
165
0
    return -1;
166
0
  }
167
0
  enum field_type type = sequence_part->type;
168
0
  if (type != FIELD_TYPE_UNSIGNED && type != FIELD_TYPE_INTEGER) {
169
0
    diag_set(ClientError, ER_MODIFY_INDEX, index_def->name,
170
0
       space_name, "sequence cannot be used with "
171
0
             "a non-integer key");
172
0
    return -1;
173
0
  }
174
0
  return 0;
175
0
}
176
177
/**
178
 * Support function for index_def_new_from_tuple(..)
179
 * Checks tuple (of _index space) and returns an error if it is invalid
180
 * Checks only types of fields and their count!
181
 */
182
static int
183
index_def_check_tuple(struct tuple *tuple)
184
0
{
185
0
  const mp_type common_template[] =
186
0
    {MP_UINT, MP_UINT, MP_STR, MP_STR, MP_MAP, MP_ARRAY};
187
0
  const char *data = tuple_data(tuple);
188
0
  uint32_t field_count = mp_decode_array(&data);
189
0
  const char *field_start = data;
190
0
  if (field_count != 6)
191
0
    goto err;
192
0
  for (size_t i = 0; i < lengthof(common_template); i++) {
193
0
    enum mp_type type = mp_typeof(*data);
194
0
    if (type != common_template[i])
195
0
      goto err;
196
0
    mp_next(&data);
197
0
  }
198
0
  return 0;
199
200
0
err:
201
0
  char got[DIAG_ERRMSG_MAX];
202
0
  char *p = got, *e = got + sizeof(got);
203
0
  data = field_start;
204
0
  for (uint32_t i = 0; i < field_count && p < e; i++) {
205
0
    enum mp_type type = mp_typeof(*data);
206
0
    mp_next(&data);
207
0
    p += snprintf(p, e - p, i ? ", %s" : "%s", mp_type_strs[type]);
208
0
  }
209
0
  diag_set(ClientError, ER_WRONG_INDEX_RECORD, got,
210
0
      "space id (unsigned), index id (unsigned), name (string), "\
211
0
      "type (string), options (map), parts (array)");
212
0
  return -1;
213
0
}
214
215
/**
216
 * Validates index options.
217
 */
218
static int
219
index_opts_validate(struct index_opts *opts)
220
0
{
221
0
  if (opts->distance == rtree_index_distance_type_MAX) {
222
0
    diag_set(ClientError, ER_WRONG_INDEX_OPTIONS,
223
0
       "distance must be either 'euclid' or 'manhattan'");
224
0
    return -1;
225
0
  }
226
0
  if (opts->page_size <= 0 || (opts->range_size > 0 &&
227
0
             opts->page_size > opts->range_size)) {
228
0
    diag_set(ClientError, ER_WRONG_INDEX_OPTIONS,
229
0
       "page_size must be greater than 0 and "
230
0
       "less than or equal to range_size");
231
0
    return -1;
232
0
  }
233
0
  if (opts->run_count_per_level <= 0) {
234
0
    diag_set(ClientError, ER_WRONG_INDEX_OPTIONS,
235
0
       "run_count_per_level must be greater than 0");
236
0
    return -1;
237
0
  }
238
0
  if (opts->run_size_ratio <= 1) {
239
0
    diag_set(ClientError, ER_WRONG_INDEX_OPTIONS,
240
0
       "run_size_ratio must be greater than 1");
241
0
    return -1;
242
0
  }
243
0
  if (opts->bloom_fpr <= 0 || opts->bloom_fpr > 1) {
244
0
    diag_set(ClientError, ER_WRONG_INDEX_OPTIONS,
245
0
       "bloom_fpr must be greater than 0 and "
246
0
       "less than or equal to 1");
247
0
    return -1;
248
0
  }
249
0
  int rc = -1;
250
0
  struct region *gc = &fiber()->gc;
251
0
  size_t gc_svp = region_used(gc);
252
0
  if (opts->covered_field_count != 0) {
253
0
    uint32_t *fields = xregion_alloc_array(
254
0
            gc, typeof(*fields),
255
0
            opts->covered_field_count);
256
0
    memcpy(fields, opts->covered_fields,
257
0
           opts->covered_field_count * sizeof(*fields));
258
0
    qsort(fields, opts->covered_field_count, sizeof(*fields),
259
0
          cmp_u32);
260
0
    for (uint32_t i = 0; i < opts->covered_field_count; i++) {
261
0
      if (i > 0 && fields[i] == fields[i - 1]) {
262
0
        diag_set(ClientError, ER_WRONG_INDEX_OPTIONS,
263
0
           "'covers' has duplicates");
264
0
        goto out;
265
0
      }
266
0
    }
267
0
  }
268
0
  rc = 0;
269
0
out:
270
0
  region_truncate(gc, gc_svp);
271
0
  return rc;
272
0
}
273
274
/** Decode an optional node name field from the tuple. */
275
static int
276
tuple_field_node_name(char *out, struct tuple *tuple, uint32_t fieldno,
277
          const char *field_name)
278
0
{
279
0
  const char *name, *field;
280
0
  uint32_t len;
281
0
  if (tuple == NULL)
282
0
    goto nil;
283
0
  field = tuple_field(tuple, fieldno);
284
0
  if (field == NULL || mp_typeof(*field) == MP_NIL)
285
0
    goto nil;
286
0
  if (mp_typeof(*field) != MP_STR)
287
0
    goto error;
288
0
  name = mp_decode_str(&field, &len);
289
0
  if (!node_name_is_valid_n(name, len))
290
0
    goto error;
291
0
  memcpy(out, name, len);
292
0
  out[len] = 0;
293
0
  return 0;
294
0
nil:
295
0
  *out = 0;
296
0
  return 0;
297
0
error:
298
0
  diag_set(ClientError, ER_FIELD_TYPE, field_name, "a valid name",
299
0
     "a bad name");
300
0
  return -1;
301
0
}
302
303
/**
304
 * Helper routine for functional index function verification:
305
 * only a deterministic persistent Lua function may be used in
306
 * functional index for now; memtx MVCC does not support functional
307
 * multikey indexes.
308
 */
309
static int
310
0
func_index_check_func(struct func *func) {
311
0
  assert(func != NULL);
312
313
0
  if (memtx_tx_manager_use_mvcc_engine && func->def->opts.is_multikey) {
314
0
    diag_set(ClientError, ER_UNSUPPORTED, "Memtx MVCC engine",
315
0
       "functional multikey indexes");
316
0
    return -1;
317
0
  }
318
319
0
  if (func->def->language != FUNC_LANGUAGE_LUA ||
320
0
      func->def->body == NULL || !func->def->is_deterministic) {
321
0
    const char *errmsg = tt_sprintf(
322
0
      "function '%s' doesn't meet functional index "
323
0
      "function criteria (stored, deterministic, written in Lua)",
324
0
      func->def->name);
325
0
    diag_set(ClientError, ER_WRONG_INDEX_OPTIONS, errmsg);
326
0
    return -1;
327
0
  }
328
0
  return 0;
329
0
}
330
331
/**
332
 * Create an index_def object from a record in _index
333
 * system space.
334
 *
335
 * Check that:
336
 * - index id is within range
337
 * - index type is supported
338
 * - part count > 0
339
 * - there are parts for the specified part count
340
 * - types of parts in the parts array are known to the system
341
 * - fieldno of each part in the parts array is within limits
342
 */
343
static struct index_def *
344
index_def_new_from_tuple(struct tuple *tuple, struct space *space)
345
0
{
346
0
  if (index_def_check_tuple(tuple) != 0)
347
0
    return NULL;
348
349
0
  struct index_opts opts;
350
0
  uint32_t id;
351
0
  if (tuple_field_u32(tuple, BOX_INDEX_FIELD_SPACE_ID, &id) != 0)
352
0
    return NULL;
353
0
  uint32_t index_id;
354
0
  if (tuple_field_u32(tuple, BOX_INDEX_FIELD_ID, &index_id) != 0)
355
0
    return NULL;
356
0
  const char *out = tuple_field_cstr(tuple, BOX_INDEX_FIELD_TYPE);
357
0
  if (out == NULL)
358
0
    return NULL;
359
0
  enum index_type type = STR2ENUM(index_type, out);
360
0
  uint32_t name_len;
361
0
  const char *name = tuple_field_str(tuple, BOX_INDEX_FIELD_NAME,
362
0
             &name_len);
363
0
  if (name == NULL)
364
0
    return NULL;
365
0
  const char *opts_field = tuple_field_with_type(tuple,
366
0
         BOX_INDEX_FIELD_OPTS, MP_MAP);
367
0
  if (opts_field == NULL)
368
0
    return NULL;
369
0
  index_opts_create(&opts);
370
0
  struct region *gc = &fiber()->gc;
371
0
  RegionGuard region_guard(gc);
372
0
  if (opts_decode(&opts, index_opts_reg, &opts_field, gc) != 0) {
373
0
    diag_set(ClientError, ER_WRONG_INDEX_OPTIONS,
374
0
       diag_last_error(diag_get())->errmsg);
375
0
    return NULL;
376
0
  }
377
0
  if (index_opts_validate(&opts) != 0)
378
0
    return NULL;
379
0
  if (opts.covered_field_count != 0 && index_id == 0) {
380
0
    diag_set(ClientError, ER_WRONG_INDEX_OPTIONS,
381
0
       "covers is allowed only for secondary index");
382
0
    return NULL;
383
0
  }
384
0
  if (name_len > BOX_NAME_MAX) {
385
0
    diag_set(ClientError, ER_MODIFY_INDEX,
386
0
        tt_cstr(name, BOX_INVALID_NAME_MAX),
387
0
        space_name(space), "index name is too long");
388
0
    return NULL;
389
0
  }
390
0
  if (identifier_check(name, name_len) != 0)
391
0
    return NULL;
392
393
0
  const char *parts = tuple_field(tuple, BOX_INDEX_FIELD_PARTS);
394
0
  uint32_t part_count = mp_decode_array(&parts);
395
0
  if (part_count == 0) {
396
0
    diag_set(ClientError, ER_MODIFY_INDEX, tt_cstr(name, name_len),
397
0
       space_name(space), "part count must be positive");
398
0
    return NULL;
399
0
  }
400
0
  if (part_count > BOX_INDEX_PART_MAX) {
401
0
    diag_set(ClientError, ER_MODIFY_INDEX, tt_cstr(name, name_len),
402
0
       space_name(space), "too many key parts");
403
0
    return NULL;
404
0
  }
405
0
  struct key_def *key_def = NULL;
406
0
  struct key_part_def *part_def = (struct key_part_def *)
407
0
    malloc(sizeof(*part_def) * part_count);
408
0
  if (part_def == NULL) {
409
0
    diag_set(OutOfMemory, sizeof(*part_def) * part_count,
410
0
       "malloc", "key_part_def");
411
0
    return NULL;
412
0
  }
413
0
  auto key_def_guard = make_scoped_guard([&] {
414
0
    free(part_def);
415
0
    if (key_def != NULL)
416
0
      key_def_delete(key_def);
417
0
  });
418
0
  if (key_def_decode_parts(part_def, part_count, &parts,
419
0
         space->def->fields,
420
0
         space->def->field_count, &fiber()->gc) != 0)
421
0
    return NULL;
422
0
  bool for_func_index = opts.func_id > 0;
423
0
  key_def = key_def_new(part_def, part_count,
424
0
            (type != TREE ? KEY_DEF_UNORDERED : 0) |
425
0
            (for_func_index ? KEY_DEF_FOR_FUNC_INDEX : 0));
426
0
  if (key_def == NULL)
427
0
    return NULL;
428
0
  struct index_def *index_def =
429
0
    index_def_new(id, index_id, name, name_len, space->def->name,
430
0
            space->def->engine_name, type, &opts, key_def,
431
0
            space_index_key_def(space, 0));
432
0
  auto index_def_guard = make_scoped_guard([=] { index_def_delete(index_def); });
433
0
  if (index_def_check(index_def, space_name(space)) != 0)
434
0
    return NULL;
435
0
  if (space_check_index_def(space, index_def) != 0)
436
0
    return NULL;
437
  /*
438
   * Set opts.hint to the unambiguous ON or OFF value. This
439
   * allows comparing opts.hint as in index_opts_is_equal()
440
   * or memtx_index_def_change_requires_rebuild().
441
   */
442
0
  if (index_def->opts.hint == INDEX_HINT_DEFAULT) {
443
0
    if (space_is_memtx(space) && type == TREE)
444
0
      index_def->opts.hint = INDEX_HINT_ON;
445
0
    else
446
0
      index_def->opts.hint = INDEX_HINT_OFF;
447
0
  }
448
  /*
449
   * In case of functional index definition, resolve a
450
   * function pointer to perform a complete index build
451
   * (instead of initializing it in inactive state) in
452
   * on_replace_dd_index trigger. This allows wrapping index
453
   * creation operation into transaction: only the first
454
   * operation in a transaction is allowed to yield.
455
   *
456
   * The initialisation during recovery is slightly
457
   * different, because function cache is not initialized
458
   * during _index space loading. Therefore the completion
459
   * of a functional index creation is performed in
460
   * _func_index space's trigger, via IndexRebuild
461
   * operation.
462
   */
463
0
  struct func *func = NULL;
464
0
  if (for_func_index && (func = func_by_id(opts.func_id)) != NULL) {
465
0
    if (func_access_check(func) != 0)
466
0
      return NULL;
467
0
    if (func_index_check_func(func) != 0)
468
0
      return NULL;
469
0
    index_def_set_func(index_def, func);
470
0
  }
471
0
  index_def_guard.is_active = false;
472
0
  return index_def;
473
0
}
474
475
/**
476
 * Fill space opts from the msgpack stream (MP_MAP field in the
477
 * tuple).
478
 */
479
static int
480
space_opts_decode(struct space_opts *opts, const char *map,
481
      struct region *region)
482
0
{
483
0
  space_opts_create(opts);
484
0
  opts->type = SPACE_TYPE_DEFAULT;
485
0
  if (opts_decode(opts, space_opts_reg, &map, region) != 0) {
486
0
    diag_set(ClientError, ER_WRONG_SPACE_OPTIONS,
487
0
       diag_last_error(diag_get())->errmsg);
488
0
    return -1;
489
0
  }
490
  /*
491
   * This can only be SPACE_TYPE_DEFAULT if neither 'type' nor 'temporary'
492
   * was specified, which means the space type is normal.
493
   */
494
0
  if (opts->type == SPACE_TYPE_DEFAULT)
495
0
    opts->type = SPACE_TYPE_NORMAL;
496
0
  return 0;
497
0
}
498
499
/**
500
 * Fill space_def structure from struct tuple.
501
 */
502
static struct space_def *
503
space_def_new_from_tuple(struct tuple *tuple, uint32_t errcode,
504
       struct region *region)
505
0
{
506
0
  uint32_t name_len;
507
0
  const char *name = tuple_field_str(tuple, BOX_SPACE_FIELD_NAME,
508
0
             &name_len);
509
0
  if (name == NULL)
510
0
    return NULL;
511
0
  if (name_len > BOX_NAME_MAX) {
512
0
    diag_set(ClientError, errcode,
513
0
       tt_cstr(name, BOX_INVALID_NAME_MAX),
514
0
       "space name is too long");
515
0
    return NULL;
516
0
  }
517
0
  if (identifier_check(name, name_len) != 0)
518
0
    return NULL;
519
0
  uint32_t id;
520
0
  if (tuple_field_u32(tuple, BOX_SPACE_FIELD_ID, &id) != 0)
521
0
    return NULL;
522
0
  if (id > BOX_SPACE_MAX) {
523
0
    diag_set(ClientError, errcode, tt_cstr(name, name_len),
524
0
       "space id is too big");
525
0
    return NULL;
526
0
  }
527
0
  if (id == 0) {
528
0
    diag_set(ClientError, errcode, tt_cstr(name, name_len),
529
0
       "space id 0 is reserved");
530
0
    return NULL;
531
0
  }
532
0
  uint32_t uid;
533
0
  if (tuple_field_u32(tuple, BOX_SPACE_FIELD_UID, &uid) != 0)
534
0
    return NULL;
535
0
  uint32_t exact_field_count;
536
0
  if (tuple_field_u32(tuple, BOX_SPACE_FIELD_FIELD_COUNT,
537
0
          &exact_field_count) != 0)
538
0
    return NULL;
539
0
  uint32_t engine_name_len;
540
0
  const char *engine_name = tuple_field_str(tuple,
541
0
    BOX_SPACE_FIELD_ENGINE, &engine_name_len);
542
0
  if (engine_name == NULL)
543
0
    return NULL;
544
  /*
545
   * Engines are compiled-in so their names are known in
546
   * advance to be shorter than names of other identifiers.
547
   */
548
0
  if (engine_name_len > ENGINE_NAME_MAX) {
549
0
    diag_set(ClientError, errcode, tt_cstr(name, name_len),
550
0
       "space engine name is too long");
551
0
    return NULL;
552
0
  }
553
0
  if (identifier_check(engine_name, engine_name_len) != 0)
554
0
    return NULL;
555
  /* Check space opts. */
556
0
  const char *space_opts = tuple_field_with_type(tuple,
557
0
    BOX_SPACE_FIELD_OPTS, MP_MAP);
558
0
  if (space_opts == NULL)
559
0
    return NULL;
560
  /* Check space format */
561
0
  const char *format = tuple_field_with_type(tuple,
562
0
    BOX_SPACE_FIELD_FORMAT, MP_ARRAY);
563
0
  if (format == NULL)
564
0
    return NULL;
565
0
  const char *format_ptr = format;
566
0
  struct field_def *fields = NULL;
567
0
  uint32_t field_count;
568
0
  RegionGuard region_guard(&fiber()->gc);
569
0
  if (field_def_array_decode(&format_ptr, &fields, &field_count,
570
0
           region, false) != 0)
571
0
    return NULL;
572
0
  size_t format_len = format_ptr - format;
573
0
  if (exact_field_count != 0 &&
574
0
      exact_field_count < field_count) {
575
0
    diag_set(ClientError, errcode, tt_cstr(name, name_len),
576
0
       "exact_field_count must be either 0 or >= "\
577
0
        "formatted field count");
578
0
    return NULL;
579
0
  }
580
0
  struct space_opts opts;
581
0
  if (space_opts_decode(&opts, space_opts, region) != 0)
582
0
    return NULL;
583
  /*
584
   * Currently, only predefined replication groups
585
   * are supported.
586
   */
587
0
  if (opts.group_id != GROUP_DEFAULT &&
588
0
      opts.group_id != GROUP_LOCAL) {
589
0
    diag_set(ClientError, ER_NO_SUCH_GROUP,
590
0
       int2str(opts.group_id));
591
0
    return NULL;
592
0
  }
593
0
  if (opts.is_view && opts.sql == NULL) {
594
0
    diag_set(ClientError, ER_VIEW_MISSING_SQL);
595
0
    return NULL;
596
0
  }
597
0
  if (opts.is_sync && opts.group_id == GROUP_LOCAL) {
598
0
    diag_set(ClientError, errcode, tt_cstr(name, name_len),
599
0
       "local space can't be synchronous");
600
0
    return NULL;
601
0
  }
602
0
  if (space_opts_is_temporary(&opts) && opts.constraint_count > 0) {
603
0
    diag_set(ClientError, ER_UNSUPPORTED, "temporary space",
604
0
       "constraints");
605
0
    return NULL;
606
0
  }
607
0
  struct space_def *def =
608
0
    space_def_new(id, uid, exact_field_count, name, name_len,
609
0
            engine_name, engine_name_len, &opts, fields,
610
0
            field_count, format, format_len);
611
0
  if (def == NULL)
612
0
    return NULL;
613
0
  auto def_guard = make_scoped_guard([=] { space_def_delete(def); });
614
0
  struct engine *engine = engine_find(def->engine_name);
615
0
  if (engine == NULL)
616
0
    return NULL;
617
0
  if (engine_check_space_def(engine, def) != 0)
618
0
    return NULL;
619
0
  def_guard.is_active = false;
620
0
  return def;
621
0
}
622
623
/**
624
 * Swap old and new space triggers (move the original triggers
625
 * to the new space, or vice versa, restore the original triggers
626
 * in the old space).
627
 */
628
static void
629
space_swap_triggers(struct space *new_space, struct space *old_space)
630
0
{
631
0
  rlist_swap(&new_space->before_replace, &old_space->before_replace);
632
0
  rlist_swap(&new_space->on_replace, &old_space->on_replace);
633
  /** Swap SQL Triggers pointer. */
634
0
  struct sql_trigger *new_value = new_space->sql_triggers;
635
0
  new_space->sql_triggers = old_space->sql_triggers;
636
0
  old_space->sql_triggers = new_value;
637
0
}
638
639
/**
640
 * True if the space has records identified by key 'uid'.
641
 * Uses 'iid' index.
642
 */
643
int
644
space_has_data(uint32_t id, uint32_t iid, uint32_t uid, bool *out)
645
0
{
646
0
  struct space *space = space_by_id(id);
647
0
  if (space == NULL) {
648
0
    *out = false;
649
0
    return 0;
650
0
  }
651
652
0
  if (space_index(space, iid) == NULL) {
653
0
    *out = false;
654
0
    return 0;
655
0
  }
656
657
0
  if (!space_is_memtx(space)) {
658
0
    diag_set(ClientError, ER_UNSUPPORTED,
659
0
       space->engine->name, "system data");
660
0
    return -1;
661
0
  }
662
0
  struct index *index = index_find(space, iid);
663
0
  if (index == NULL)
664
0
    return -1;
665
666
0
  char key[6];
667
0
  assert(mp_sizeof_uint(BOX_SYSTEM_ID_MIN) <= sizeof(key));
668
0
  mp_encode_uint(key, uid);
669
0
  struct iterator *it = index_create_iterator(index, ITER_EQ, key, 1);
670
0
  if (it == NULL)
671
0
    return -1;
672
0
  IteratorGuard iter_guard(it);
673
0
  struct tuple *tuple;
674
0
  if (iterator_next(it, &tuple) != 0)
675
0
    return -1;
676
0
  *out = (tuple != NULL);
677
0
  return 0;
678
0
}
679
680
/* }}} */
681
682
/* {{{ struct alter_space - the body of a full blown alter */
683
struct alter_space;
684
685
class AlterSpaceOp {
686
public:
687
  AlterSpaceOp(struct alter_space *alter);
688
689
  /** Link in alter_space::ops. */
690
  struct rlist link;
691
  /**
692
   * Called before creating the new space. Used to update
693
   * the space definition and/or key list that will be used
694
   * for creating the new space. Must not yield or fail.
695
   */
696
0
  virtual void alter_def(struct alter_space * /* alter */) {}
697
  /**
698
   * Called after creating a new space. Used for performing
699
   * long-lasting operations, such as index rebuild or format
700
   * check. May yield. May throw an exception. Must not modify
701
   * the old space.
702
   */
703
0
  virtual void prepare(struct alter_space * /* alter */) {}
704
  /**
705
   * Called after all registered operations have completed
706
   * the preparation phase. Used to propagate the old space
707
   * state to the new space (e.g. move unchanged indexes).
708
   * Must not yield or fail.
709
   */
710
0
  virtual void alter(struct alter_space * /* alter */) {}
711
  /**
712
   * Called after the change has been successfully written
713
   * to WAL. Must not fail.
714
   */
715
  virtual void commit(struct alter_space * /* alter */,
716
0
          int64_t /* signature */) {}
717
  /**
718
   * Called in case a WAL error occurred. It is supposed to undo
719
   * the effect of AlterSpaceOp::prepare and AlterSpaceOp::alter.
720
   * Must not fail.
721
   */
722
0
  virtual void rollback(struct alter_space * /* alter */) {}
723
724
0
  virtual ~AlterSpaceOp() {}
725
726
  void *operator new(size_t size)
727
0
  {
728
0
    void *ptr = xregion_aligned_alloc(&in_txn()->region, size,
729
0
              alignof(uint64_t));
730
0
    memset(ptr, 0, size);
731
0
    return ptr;
732
0
  }
733
0
  void operator delete(void * /* ptr */) {}
734
};
735
736
/**
737
 * A trigger installed on transaction commit/rollback events of
738
 * the transaction which initiated the alter.
739
 */
740
static struct trigger *
741
txn_alter_trigger_new(trigger_f run, void *data)
742
0
{
743
0
  size_t size = sizeof(struct trigger);
744
0
  struct trigger *trigger = (struct trigger *)
745
0
    region_aligned_alloc(&in_txn()->region, size,
746
0
             alignof(struct trigger));
747
0
  if (trigger == NULL) {
748
0
    diag_set(OutOfMemory, size, "region", "struct trigger");
749
0
    return NULL;
750
0
  }
751
0
  trigger_create(trigger, run, data, NULL);
752
0
  return trigger;
753
0
}
754
755
/**
756
 * List of all alive alter_space objects.
757
 */
758
static RLIST_HEAD(alter_space_list);
759
760
struct alter_space {
761
  /** Link in alter_space_list. */
762
  struct rlist in_list;
763
  /** Transaction doing this alter. */
764
  struct txn *txn;
765
  /** List of alter operations */
766
  struct rlist ops;
767
  /** Definition of the new space - space_def. */
768
  struct space_def *space_def;
769
  /** Definition of the new space - keys. */
770
  struct rlist key_list;
771
  /** Old space. */
772
  struct space *old_space;
773
  /** New space. */
774
  struct space *new_space;
775
  /**
776
   * Assigned to the new primary key definition if we're
777
   * rebuilding the primary key, i.e. changing its key parts
778
   * substantially.
779
   */
780
  struct key_def *pk_def;
781
  /**
782
   * Min field count of a new space. It is calculated before
783
   * the new space is created and used to update optionality
784
   * of key_defs and key_parts.
785
   */
786
  uint32_t new_min_field_count;
787
  /**
788
   * Number of rows in the transaction at the time when this
789
   * DDL operation was performed. It is used to compute this
790
   * operation signature on commit, which is needed to keep
791
   * xlog in sync with vylog, see alter_space_commit().
792
   */
793
  int n_rows;
794
};
795
796
/**
797
 * Fiber cond that wakes up all waiters on every deletion of alter_space.
798
 */
799
static FIBER_COND(alter_space_delete_cond);
800
801
static struct alter_space *
802
alter_space_new(struct space *old_space)
803
0
{
804
0
  struct txn *txn = in_txn();
805
0
  size_t size = sizeof(struct alter_space);
806
0
  struct alter_space *alter = (struct alter_space *)
807
0
    region_aligned_alloc(&in_txn()->region, size,
808
0
             alignof(struct alter_space));
809
0
  if (alter == NULL) {
810
0
    diag_set(OutOfMemory, size, "region", "struct alter_space");
811
0
    return NULL;
812
0
  }
813
0
  alter = (struct alter_space *)memset(alter, 0, size);
814
0
  rlist_create(&alter->ops);
815
0
  rlist_add_entry(&alter_space_list, alter, in_list);
816
0
  alter->txn = in_txn();
817
0
  alter->old_space = old_space;
818
0
  alter->space_def = space_def_dup(alter->old_space->def);
819
0
  alter->new_min_field_count = old_space->format->min_field_count;
820
0
  alter->n_rows = txn_n_rows(txn);
821
0
  return alter;
822
0
}
823
824
/** Destroy alter. */
825
static void
826
alter_space_delete(struct alter_space *alter)
827
0
{
828
0
  fiber_cond_broadcast(&alter_space_delete_cond);
829
0
  rlist_del_entry(alter, in_list);
830
  /* Destroy the ops. */
831
0
  while (! rlist_empty(&alter->ops)) {
832
0
    AlterSpaceOp *op = rlist_shift_entry(&alter->ops,
833
0
                 AlterSpaceOp, link);
834
0
    delete op;
835
0
  }
836
  /* Delete the new space, if any. */
837
0
  if (alter->new_space)
838
0
    space_delete(alter->new_space);
839
0
  space_def_delete(alter->space_def);
840
0
}
841
842
AlterSpaceOp::AlterSpaceOp(struct alter_space *alter)
843
0
{
844
  /* Add to the tail: operations must be processed in order. */
845
0
  rlist_add_tail_entry(&alter->ops, this, link);
846
0
}
847
848
/**
849
 * This is a per-space lock which protects the space from
850
 * concurrent DDL. The current algorithm template for DDL is:
851
 * 1) Capture change of a system table in an on_replace
852
 *    trigger
853
 * 2) Build a new schema object, e.g. a new struct space, and insert
854
 *    it in the cache - all the subsequent transactions will begin
855
 *    using this object
856
 * 3) Write the operation to WAL; this yields, giving a window to
857
 *    concurrent transactions to use the object, but if there is
858
 *    a rollback of WAL write, the rollback is *cascading*, so all
859
 *    subsequent transactions are rolled back first.
860
 * Step 2 doesn't yield most of the time - e.g. rename of
861
 * a column, or a compatible change of the format builds a new
862
 * space object immediately. Some long operations run in
863
 * background, after WAL write: this is drop index, and, transitively,
864
 * drop space, so these don't yield either. But a few operations
865
 * need to do a long job *before* WAL write: this is create index and
866
 * deploy of the new format, which checks each space row to
867
 * conform with index/format constraints, row by row. So this lock
868
 * is here exactly for these operations. If we allow another DDL
869
 * against the same space to get in while these operations are in
870
 * progress, it will use old space object in the space cache, and
871
 * thus overwrite this transaction's space object, or, worse yet,
872
 * will get overwritten itself when a long-running DDL completes.
873
 *
874
 * Since we consider such concurrent operations to be rare, this
875
 * lock is optimistic: if there is a lock already, we simply throw
876
 * an exception.
877
 */
878
class AlterSpaceLock {
879
  /** Set of all taken locks. */
880
  static struct mh_i32_t *registry;
881
  /** Identifier of the space this lock is for. */
882
  uint32_t space_id;
883
public:
884
  /** Take a lock for the altered space. */
885
0
  AlterSpaceLock(struct alter_space *alter) {
886
0
    if (registry == NULL) {
887
0
      registry = mh_i32_new();
888
0
    }
889
0
    space_id = alter->old_space->def->id;
890
0
    if (mh_i32_find(registry, space_id, NULL) != mh_end(registry)) {
891
0
      tnt_raise(ClientError, ER_ALTER_SPACE,
892
0
          space_name(alter->old_space),
893
0
          "the space is already being modified");
894
0
    }
895
0
    mh_i32_put(registry, &space_id, NULL, NULL);
896
0
  }
897
0
  ~AlterSpaceLock() {
898
0
    mh_int_t k = mh_i32_find(registry, space_id, NULL);
899
0
    assert(k != mh_end(registry));
900
0
    mh_i32_del(registry, k, NULL);
901
0
  }
902
};
903
904
struct mh_i32_t *AlterSpaceLock::registry;
905
906
/**
907
 * Commit the alter.
908
 *
909
 * Move all unchanged indexes from the old space to the new space.
910
 * Set the newly built indexes in the new space, or free memory
911
 * of the dropped indexes.
912
 * Replace the old space with a new one in the space cache.
913
 */
914
static int
915
alter_space_commit(struct trigger *trigger, void *event)
916
0
{
917
0
  (void)event;
918
0
  struct txn *txn = in_txn();
919
0
  struct alter_space *alter = (struct alter_space *) trigger->data;
920
  /*
921
   * The engine (vinyl) expects us to pass the signature of
922
   * the row that performed this operation, not the signature
923
   * of the transaction itself (this is needed to sync vylog
924
   * with xlog on recovery). It's trivial to get this given
925
   * the number of rows in the transaction at the time when
926
   * the operation was performed.
927
   */
928
0
  int64_t signature = txn->signature - txn_n_rows(txn) + alter->n_rows;
929
  /*
930
   * Commit alter ops, this will move the changed
931
   * indexes into their new places.
932
   */
933
0
  class AlterSpaceOp *op;
934
0
  try {
935
0
    rlist_foreach_entry(op, &alter->ops, link)
936
0
      op->commit(alter, signature);
937
0
  } catch (Exception *e) {
938
0
    return -1;
939
0
  }
940
941
0
  struct space *space = alter->new_space;
942
0
  alter->new_space = NULL; /* for alter_space_delete(). */
943
  /*
944
   * Delete the old version of the space, we are not
945
   * going to use it.
946
   */
947
0
  space_delete(alter->old_space);
948
0
  alter->old_space = NULL;
949
0
  alter_space_delete(alter);
950
951
0
  space_upgrade_run(space);
952
0
  return 0;
953
0
}
954
955
/**
956
 * Rollback all effects of space alter. This is
957
 * a transaction trigger, and it fires most likely
958
 * upon a failed write to the WAL.
959
 *
960
 * Keep in mind that we may end up here in case of
961
 * alter_space_commit() failure (unlikely)
962
 */
963
static int
964
alter_space_rollback(struct trigger *trigger, void * /* event */)
965
0
{
966
0
  struct alter_space *alter = (struct alter_space *) trigger->data;
967
  /* Rollback alter ops */
968
0
  class AlterSpaceOp *op;
969
0
  try {
970
0
    rlist_foreach_entry(op, &alter->ops, link) {
971
0
      op->rollback(alter);
972
0
    }
973
0
  } catch (Exception *e) {
974
0
    return -1;
975
0
  }
976
  /* Rebuild index maps once for all indexes. */
977
0
  space_fill_index_map(alter->old_space);
978
0
  space_fill_index_map(alter->new_space);
979
  /*
980
   * Don't forget about space triggers, foreign keys and
981
   * constraints.
982
   */
983
0
  space_swap_triggers(alter->new_space, alter->old_space);
984
0
  space_reattach_constraints(alter->old_space);
985
0
  space_pin_collations(alter->old_space);
986
0
  space_pin_defaults(alter->old_space);
987
0
  space_cache_replace(alter->new_space, alter->old_space);
988
0
  SWAP(alter->new_space->sequence_path, alter->old_space->sequence_path);
989
0
  alter_space_delete(alter);
990
0
  return 0;
991
0
}
992
993
/**
994
 * alter_space_do() - do all the work necessary to
995
 * create a new space.
996
 *
997
 * If something may fail during alter, it must be done here,
998
 * before a record is written to the Write Ahead Log.  Only
999
 * trivial and infallible actions are left to the commit phase
1000
 * of the alter.
1001
 *
1002
 * The implementation of this function follows "Template Method"
1003
 * pattern, providing a skeleton of the alter, while all the
1004
 * details are encapsulated in AlterSpaceOp methods.
1005
 *
1006
 * These are the major steps of alter defining the structure of
1007
 * the algorithm and performed regardless of what is altered:
1008
 *
1009
 * - a copy of the definition of the old space is created
1010
 * - the definition of the old space is altered, to get
1011
 *   definition of a new space
1012
 * - an instance of the new space is created, according to the new
1013
 *   definition; the space is so far empty
1014
 * - data structures of the new space are built; sometimes, it
1015
 *   doesn't need to happen, e.g. when alter only changes the name
1016
 *   of a space or an index, or other accidental property.
1017
 *   If any data structure needs to be built, e.g. a new index,
1018
 *   only this index is built, not the entire space with all its
1019
 *   indexes.
1020
 * - at commit, the new space is coalesced with the old one.
1021
 *   On rollback, the new space is deleted.
1022
 */
1023
static void
1024
alter_space_do(struct txn_stmt *stmt, struct alter_space *alter)
1025
0
{
1026
0
  struct space_alter_stmt alter_stmt;
1027
0
  alter_stmt.old_tuple = stmt->old_tuple;
1028
0
  alter_stmt.new_tuple = stmt->new_tuple;
1029
0
  rlist_add_entry(&stmt->space->alter_stmts, &alter_stmt, link);
1030
0
  auto alter_stmt_guard = make_scoped_guard([&] {
1031
0
    rlist_del_entry(&alter_stmt, link);
1032
0
  });
1033
  /**
1034
   * AlterSpaceOp::prepare() may perform a potentially long
1035
   * lasting operation that may yield, e.g. building of a new
1036
   * index. We really don't want the space to be replaced by
1037
   * another DDL operation while this one is in progress so
1038
   * we lock out all concurrent DDL for this space.
1039
   */
1040
0
  AlterSpaceLock lock(alter);
1041
  /*
1042
   * Prepare triggers while we may fail. Note, we don't have to
1043
   * free them in case of failure, because they are allocated on
1044
   * the region.
1045
   */
1046
0
  struct trigger *on_commit, *on_rollback;
1047
0
  on_commit = txn_alter_trigger_new(alter_space_commit, alter);
1048
0
  on_rollback = txn_alter_trigger_new(alter_space_rollback, alter);
1049
0
  if (on_commit == NULL || on_rollback == NULL)
1050
0
    diag_raise();
1051
1052
  /* Create a definition of the new space. */
1053
0
  space_dump_def(alter->old_space, &alter->key_list);
1054
0
  class AlterSpaceOp *op;
1055
  /*
1056
   * Alter the definition of the old space, so that
1057
   * a new space can be created with a new definition.
1058
   */
1059
0
  rlist_foreach_entry(op, &alter->ops, link)
1060
0
    op->alter_def(alter);
1061
  /*
1062
   * Create a new (empty) space for the new definition.
1063
   * Sic: the triggers are not moved over yet.
1064
   */
1065
0
  alter->new_space = space_new_xc(alter->space_def, &alter->key_list);
1066
  /*
1067
   * Copy the replace function, the new space is at the same recovery
1068
   * phase as the old one. This hack is especially necessary for
1069
   * system spaces, which may be altered in some row in the
1070
   * snapshot/xlog, but needs to continue staying "fully
1071
   * built".
1072
   */
1073
0
  space_prepare_alter_xc(alter->old_space, alter->new_space);
1074
1075
0
  alter->new_space->sequence = alter->old_space->sequence;
1076
0
  alter->new_space->sequence_fieldno = alter->old_space->sequence_fieldno;
1077
0
  SWAP(alter->new_space->sequence_path, alter->old_space->sequence_path);
1078
0
  memcpy(alter->new_space->access, alter->old_space->access,
1079
0
         sizeof(alter->old_space->access));
1080
1081
0
  space_prepare_upgrade_xc(alter->old_space, alter->new_space);
1082
1083
  /*
1084
   * Build new indexes, check if tuples conform to
1085
   * the new space format.
1086
   */
1087
0
  rlist_foreach_entry(op, &alter->ops, link)
1088
0
    op->prepare(alter);
1089
1090
  /*
1091
   * This function must not throw exceptions or yield after
1092
   * this point.
1093
   */
1094
1095
  /* Move old indexes, update space format. */
1096
0
  rlist_foreach_entry(op, &alter->ops, link)
1097
0
    op->alter(alter);
1098
1099
  /* Rebuild index maps once for all indexes. */
1100
0
  space_fill_index_map(alter->old_space);
1101
0
  space_fill_index_map(alter->new_space);
1102
1103
  /*
1104
   * Don't forget about space triggers, foreign keys and
1105
   * constraints.
1106
   */
1107
0
  space_swap_triggers(alter->new_space, alter->old_space);
1108
  /*
1109
   * The new space is ready. Time to update the space
1110
   * cache with it.
1111
   */
1112
0
  space_finish_alter(alter->old_space, alter->new_space);
1113
0
  space_cache_replace(alter->old_space, alter->new_space);
1114
0
  space_detach_constraints(alter->old_space);
1115
0
  space_unpin_collations(alter->old_space);
1116
0
  space_unpin_defaults(alter->old_space);
1117
  /*
1118
   * Install transaction commit/rollback triggers to either
1119
   * finish or rollback the DDL depending on the results of
1120
   * writing to WAL.
1121
   */
1122
0
  txn_stmt_on_commit(stmt, on_commit);
1123
0
  txn_stmt_on_rollback(stmt, on_rollback);
1124
0
}
1125
1126
/* }}}  */
1127
1128
/* {{{ AlterSpaceOp descendants - alter operations, such as Add/Drop index */
1129
1130
/**
1131
 * This operation does not modify the space, it just checks that
1132
 * tuples stored in it conform to the new format.
1133
 */
1134
class CheckSpaceFormat: public AlterSpaceOp
1135
{
1136
public:
1137
  CheckSpaceFormat(struct alter_space *alter)
1138
0
    :AlterSpaceOp(alter) {}
1139
  virtual void prepare(struct alter_space *alter);
1140
};
1141
1142
/**
1143
 * The object is used to grant ability to yield with RAII approach.
1144
 * Transaction is allowed to yield only on its first statement, so if the
1145
 * statement is not first, it simply does nothing.
1146
 * If it's the first statement, the guard blocks execution until all previous
1147
 * alters will be rolled back or committed so that the space object won't be
1148
 * deleted right from under our feet. In the case when the previous alters were
1149
 * rolled back and the space was removed from space cache, the constructor
1150
 * throws an error.
1151
 */
1152
class AlterYieldGuard
1153
{
1154
public:
1155
0
  AlterYieldGuard(struct space *old_space) {
1156
0
    if (!txn_is_first_statement(in_txn()))
1157
0
      return;
1158
0
    txn_can_yield(in_txn(), true);
1159
0
    uint32_t space_id = old_space->def->id;
1160
0
    while (true) {
1161
0
      bool space_is_being_altered = false;
1162
0
      struct alter_space *alter;
1163
0
      rlist_foreach_entry(alter, &alter_space_list, in_list) {
1164
0
        if (alter->txn != in_txn() &&
1165
0
            alter->old_space->def->id == space_id) {
1166
0
          space_is_being_altered = true;
1167
0
          break;
1168
0
        }
1169
0
      }
1170
0
      if (!space_is_being_altered)
1171
0
        break;
1172
      /*
1173
       * Wait for deletion of any alter to check if the
1174
       * space is being altered again.
1175
       */
1176
0
      fiber_cond_wait(&alter_space_delete_cond);
1177
0
    }
1178
    /* Check if the space is still alive. */
1179
0
    if (space_by_id(space_id) != old_space) {
1180
0
      txn_can_yield(in_txn(), false);
1181
      /* Cannot access the space name since it was deleted. */
1182
0
      tnt_raise(ClientError, ER_ALTER_SPACE,
1183
0
          tt_sprintf("%u", space_id),
1184
0
          "the space was concurrently modified");
1185
0
    }
1186
0
  }
1187
1188
0
  ~AlterYieldGuard() {
1189
0
    txn_can_yield(in_txn(), false);
1190
0
  }
1191
};
1192
1193
static inline void
1194
space_check_format_with_yield(struct space *space,
1195
            struct tuple_format *format)
1196
0
{
1197
0
  AlterYieldGuard guard(space);
1198
0
  space_check_format_xc(space, format);
1199
0
}
1200
1201
void
1202
CheckSpaceFormat::prepare(struct alter_space *alter)
1203
0
{
1204
0
  struct space *new_space = alter->new_space;
1205
0
  struct space *old_space = alter->old_space;
1206
0
  struct tuple_format *new_format = new_space->format;
1207
1208
0
  assert(new_format != NULL);
1209
0
  for (uint32_t i = 0; i < old_space->index_count; i++) {
1210
0
    struct key_def *key_def =
1211
0
      alter->old_space->index[i]->def->key_def;
1212
0
    if (!tuple_format_is_compatible_with_key_def(new_format,
1213
0
                   key_def))
1214
0
      diag_raise();
1215
0
  }
1216
1217
0
  if (new_space->upgrade != NULL) {
1218
    /*
1219
     * Tuples stored in the space will be checked against
1220
     * the new format during space upgrade.
1221
     */
1222
0
    return;
1223
0
  }
1224
0
  space_check_format_with_yield(old_space, new_format);
1225
0
}
1226
1227
/** Change non-essential properties of a space. */
1228
class ModifySpace: public AlterSpaceOp
1229
{
1230
  void space_swap_dictionaries(struct space_def *old_def,
1231
             struct space_def *new_def,
1232
             struct tuple_format *new_format);
1233
public:
1234
  ModifySpace(struct alter_space *alter, struct space_def *def)
1235
0
    : AlterSpaceOp(alter), new_def(def) {}
1236
  /* New space definition. */
1237
  struct space_def *new_def;
1238
  virtual void alter_def(struct alter_space *alter);
1239
  virtual void alter(struct alter_space *alter);
1240
  virtual void rollback(struct alter_space *alter);
1241
  virtual ~ModifySpace();
1242
};
1243
1244
void
1245
ModifySpace::space_swap_dictionaries(struct space_def *old_def,
1246
             struct space_def *new_def,
1247
             struct tuple_format *new_format)
1248
0
{
1249
  /*
1250
   * When new space_def is created, it allocates new dictionary.
1251
   * Move new names into an old dictionary, which already is referenced
1252
   * by existing tuple formats. New dictionary object is deleted later,
1253
   * in alter_space_delete.
1254
   */
1255
0
  tuple_dictionary_unref(new_format->dict);
1256
0
  new_format->dict = old_def->dict;
1257
0
  tuple_dictionary_ref(new_format->dict);
1258
1259
0
  SWAP(old_def->dict, new_def->dict);
1260
0
  tuple_dictionary_swap(old_def->dict, new_def->dict);
1261
0
}
1262
1263
/** Amend the definition of the new space. */
1264
void
1265
ModifySpace::alter_def(struct alter_space *alter)
1266
0
{
1267
0
  new_def->view_ref_count = alter->old_space->def->view_ref_count;
1268
0
  space_def_delete(alter->space_def);
1269
0
  alter->space_def = new_def;
1270
  /* Now alter owns the def. */
1271
0
  new_def = NULL;
1272
0
}
1273
1274
void
1275
ModifySpace::alter(struct alter_space *alter)
1276
0
{
1277
  /*
1278
   * Unless it's online space upgrade, use the old dictionary for the new
1279
   * space, because it's already referenced by existing tuple formats.
1280
   *
1281
   * For online space upgrade, all tuples fetched from the space will be
1282
   * upgraded to the new format before returning to the user so it isn't
1283
   * necessary. Moreover, the old tuples may be incompatible with the new
1284
   * format so using the new dictionary for them would be wrong and could
1285
   * result in error accessing fields by name from the space upgrade
1286
   * function.
1287
   */
1288
0
  if (alter->space_def->opts.upgrade_def == NULL) {
1289
0
    space_swap_dictionaries(alter->old_space->def,
1290
0
          alter->new_space->def,
1291
0
          alter->new_space->format);
1292
0
  }
1293
0
}
1294
1295
void
1296
ModifySpace::rollback(struct alter_space *alter)
1297
0
{
1298
0
  if (alter->space_def->opts.upgrade_def == NULL) {
1299
0
    space_swap_dictionaries(alter->new_space->def,
1300
0
          alter->old_space->def,
1301
0
          alter->new_space->format);
1302
0
  }
1303
0
}
1304
1305
/**
 * Free the new definition if it is still owned by the
 * operation, i.e. alter_def() has not taken it over.
 */
ModifySpace::~ModifySpace()
{
  if (new_def == NULL)
    return;
  space_def_delete(new_def);
}
1310
1311
/** DropIndex - remove an index from space. */
1312
1313
class DropIndex: public AlterSpaceOp
1314
{
1315
public:
1316
  DropIndex(struct alter_space *alter, struct index *index)
1317
0
    :AlterSpaceOp(alter), old_index(index) {}
1318
  struct index *old_index;
1319
  virtual void alter_def(struct alter_space *alter);
1320
  virtual void prepare(struct alter_space *alter);
1321
  virtual void commit(struct alter_space *alter, int64_t lsn);
1322
};
1323
1324
/*
1325
 * Alter the definition of the new space and remove
1326
 * the new index from it.
1327
 */
1328
void
1329
DropIndex::alter_def(struct alter_space * /* alter */)
1330
0
{
1331
0
  rlist_del_entry(old_index->def, link);
1332
0
}
1333
1334
/* Do the drop. */
1335
void
1336
DropIndex::prepare(struct alter_space *alter)
1337
0
{
1338
0
  if (old_index->def->iid == 0)
1339
0
    space_drop_primary_key(alter->new_space);
1340
0
}
1341
1342
void
1343
DropIndex::commit(struct alter_space *alter, int64_t signature)
1344
0
{
1345
0
  (void)alter;
1346
0
  index_commit_drop(old_index, signature);
1347
0
}
1348
1349
/**
1350
 * A no-op to preserve the old index data in the new space.
1351
 * Added to the alter specification when the index at hand
1352
 * is not affected by alter in any way.
1353
 */
1354
class MoveIndex: public AlterSpaceOp
1355
{
1356
public:
1357
  MoveIndex(struct alter_space *alter, uint32_t iid_arg)
1358
0
    :AlterSpaceOp(alter), iid(iid_arg) {}
1359
  /** id of the index on the move. */
1360
  uint32_t iid;
1361
  virtual void alter(struct alter_space *alter);
1362
  virtual void rollback(struct alter_space *alter);
1363
};
1364
1365
void
1366
MoveIndex::alter(struct alter_space *alter)
1367
0
{
1368
0
  space_swap_index(alter->old_space, alter->new_space, iid, iid);
1369
0
}
1370
1371
void
1372
MoveIndex::rollback(struct alter_space *alter)
1373
0
{
1374
0
  space_swap_index(alter->old_space, alter->new_space, iid, iid);
1375
0
}
1376
1377
/**
1378
 * Change non-essential properties of an index, i.e.
1379
 * properties not involving index data or layout on disk.
1380
 */
1381
class ModifyIndex: public AlterSpaceOp
1382
{
1383
public:
1384
  ModifyIndex(struct alter_space *alter,
1385
        struct index *index, struct index_def *def)
1386
0
    : AlterSpaceOp(alter), old_index(index),
1387
0
      new_index(NULL), new_index_def(def) {
1388
0
          if (new_index_def->iid == 0 &&
1389
0
              key_part_cmp(new_index_def->key_def->parts,
1390
0
                           new_index_def->key_def->part_count,
1391
0
                           old_index->def->key_def->parts,
1392
0
                           old_index->def->key_def->part_count) != 0) {
1393
                  /*
1394
                   * Primary parts have been changed -
1395
                   * update secondary indexes.
1396
                   */
1397
0
                  alter->pk_def = new_index_def->key_def;
1398
0
          }
1399
0
  }
1400
  struct index *old_index;
1401
  struct index *new_index;
1402
  struct index_def *new_index_def;
1403
  virtual void alter_def(struct alter_space *alter);
1404
  virtual void alter(struct alter_space *alter);
1405
  virtual void commit(struct alter_space *alter, int64_t lsn);
1406
  virtual void rollback(struct alter_space *alter);
1407
  virtual ~ModifyIndex();
1408
};
1409
1410
/** Update the definition of the new space */
1411
void
1412
ModifyIndex::alter_def(struct alter_space *alter)
1413
0
{
1414
0
  rlist_del_entry(old_index->def, link);
1415
0
  index_def_list_add(&alter->key_list, new_index_def);
1416
0
}
1417
1418
void
1419
ModifyIndex::alter(struct alter_space *alter)
1420
0
{
1421
0
  new_index = space_index(alter->new_space, new_index_def->iid);
1422
0
  assert(old_index->def->iid == new_index->def->iid);
1423
  /*
1424
   * Move the old index to the new space to preserve the
1425
   * original data, but use the new definition.
1426
   */
1427
0
  space_swap_index(alter->old_space, alter->new_space,
1428
0
       old_index->def->iid, new_index->def->iid);
1429
0
  SWAP(old_index, new_index);
1430
0
  SWAP(old_index->def, new_index->def);
1431
0
  index_update_def(new_index);
1432
0
}
1433
1434
void
1435
ModifyIndex::commit(struct alter_space *alter, int64_t signature)
1436
0
{
1437
0
  (void)alter;
1438
0
  index_commit_modify(new_index, signature);
1439
0
}
1440
1441
void
1442
ModifyIndex::rollback(struct alter_space *alter)
1443
0
{
1444
  /*
1445
   * Restore indexes.
1446
   */
1447
0
  space_swap_index(alter->old_space, alter->new_space,
1448
0
       old_index->def->iid, new_index->def->iid);
1449
0
  SWAP(old_index, new_index);
1450
0
  SWAP(old_index->def, new_index->def);
1451
0
  index_update_def(old_index);
1452
0
}
1453
1454
/** Delete the index definition held in new_index_def. */
ModifyIndex::~ModifyIndex()
{
  index_def_delete(new_index_def);
}
1458
1459
/** CreateIndex - add a new index to the space. */
1460
class CreateIndex: public AlterSpaceOp
1461
{
1462
  /** New index. */
1463
  struct index *new_index;
1464
  /** New index index_def. */
1465
  struct index_def *new_index_def;
1466
public:
1467
  CreateIndex(struct alter_space *alter, struct index_def *def)
1468
0
    :AlterSpaceOp(alter), new_index(NULL), new_index_def(def)
1469
0
  {}
1470
  virtual void alter_def(struct alter_space *alter);
1471
  virtual void prepare(struct alter_space *alter);
1472
  virtual void commit(struct alter_space *alter, int64_t lsn);
1473
  virtual ~CreateIndex();
1474
};
1475
1476
/** Add definition of the new key to the new space def. */
1477
void
1478
CreateIndex::alter_def(struct alter_space *alter)
1479
0
{
1480
0
  index_def_list_add(&alter->key_list, new_index_def);
1481
0
}
1482
1483
static inline void
1484
space_build_index_with_yield(struct space *old_space, struct space *new_space,
1485
           struct index *new_index)
1486
0
{
1487
0
  AlterYieldGuard guard(old_space);
1488
0
  space_build_index_xc(old_space, new_space, new_index);
1489
0
}
1490
1491
/**
1492
 * Optionally build the new index.
1493
 *
1494
 * During recovery the space is often not fully constructed yet
1495
 * anyway, so there is no need to fully populate index with data,
1496
 * it is done at the end of recovery.
1497
 *
1498
 * Note, that system spaces are exception to this, since
1499
 * they are fully enabled at all times.
1500
 */
1501
void
1502
CreateIndex::prepare(struct alter_space *alter)
1503
0
{
1504
  /* Get the new index and build it.  */
1505
0
  new_index = space_index(alter->new_space, new_index_def->iid);
1506
0
  assert(new_index != NULL);
1507
1508
0
  struct key_def *key_def = new_index->def->key_def;
1509
0
  if (!tuple_format_is_compatible_with_key_def(alter->new_space->format,
1510
0
                 key_def))
1511
0
    diag_raise();
1512
0
  if (new_index_def->iid == 0) {
1513
    /*
1514
     * Adding a primary key: bring the space
1515
     * up to speed with the current recovery
1516
     * state. During snapshot recovery it
1517
     * means preparing the primary key for
1518
     * build (beginBuild()). During xlog
1519
     * recovery, it means building the primary
1520
     * key. After recovery, it means building
1521
     * all keys.
1522
     */
1523
0
    space_add_primary_key_xc(alter->new_space);
1524
0
    return;
1525
0
  }
1526
0
  space_build_index_with_yield(alter->old_space, alter->new_space,
1527
0
             new_index);
1528
0
}
1529
1530
void
1531
CreateIndex::commit(struct alter_space *alter, int64_t signature)
1532
0
{
1533
0
  (void) alter;
1534
0
  assert(new_index != NULL);
1535
0
  index_commit_create(new_index, signature);
1536
0
  new_index = NULL;
1537
0
}
1538
1539
/**
 * Abort creation of the index unless it has been committed, and
 * delete the index definition.
 */
CreateIndex::~CreateIndex()
{
  /* new_index is still set only if commit() did not run. */
  if (new_index != NULL) {
    index_abort_create(new_index);
  }
  if (new_index_def != NULL) {
    index_def_delete(new_index_def);
  }
}
1546
1547
/**
1548
 * RebuildIndex - drop the old index data and rebuild index
1549
 * from by reading the primary key. Used when key_def of
1550
 * an index is changed.
1551
 */
1552
class RebuildIndex: public AlterSpaceOp
1553
{
1554
public:
1555
  RebuildIndex(struct alter_space *alter,
1556
         struct index_def *new_index_def_arg,
1557
         struct index_def *old_index_def_arg)
1558
0
    :AlterSpaceOp(alter), new_index(NULL),
1559
0
    new_index_def(new_index_def_arg),
1560
0
    old_index_def(old_index_def_arg)
1561
0
  {
1562
    /* We may want to rebuild secondary keys as well. */
1563
0
    if (new_index_def->iid == 0)
1564
0
      alter->pk_def = new_index_def->key_def;
1565
0
  }
1566
  /** New index. */
1567
  struct index *new_index;
1568
  /** New index index_def. */
1569
  struct index_def *new_index_def;
1570
  /** Old index index_def. */
1571
  struct index_def *old_index_def;
1572
  virtual void alter_def(struct alter_space *alter);
1573
  virtual void prepare(struct alter_space *alter);
1574
  virtual void commit(struct alter_space *alter, int64_t signature);
1575
  virtual ~RebuildIndex();
1576
};
1577
1578
/** Add definition of the new key to the new space def. */
1579
void
1580
RebuildIndex::alter_def(struct alter_space *alter)
1581
0
{
1582
0
  rlist_del_entry(old_index_def, link);
1583
0
  index_def_list_add(&alter->key_list, new_index_def);
1584
0
}
1585
1586
void
1587
RebuildIndex::prepare(struct alter_space *alter)
1588
0
{
1589
  /* Get the new index and build it.  */
1590
0
  new_index = space_index(alter->new_space, new_index_def->iid);
1591
0
  assert(new_index != NULL);
1592
0
  space_build_index_with_yield(alter->old_space, alter->new_space,
1593
0
             new_index);
1594
0
}
1595
1596
void
1597
RebuildIndex::commit(struct alter_space *alter, int64_t signature)
1598
0
{
1599
0
  struct index *old_index = space_index(alter->old_space,
1600
0
                old_index_def->iid);
1601
0
  assert(old_index != NULL);
1602
0
  index_commit_drop(old_index, signature);
1603
0
  assert(new_index != NULL);
1604
0
  index_commit_create(new_index, signature);
1605
0
  new_index = NULL;
1606
0
}
1607
1608
/**
 * Abort creation of the new index unless it has been committed,
 * and delete the definition of the new index.
 */
RebuildIndex::~RebuildIndex()
{
  /* new_index is still set only if commit() did not run. */
  if (new_index != NULL) {
    index_abort_create(new_index);
  }
  if (new_index_def != NULL) {
    index_def_delete(new_index_def);
  }
}
1615
1616
/**
1617
 * RebuildFuncIndex - prepare func index definition,
1618
 * drop the old index data and rebuild index from by reading the
1619
 * primary key.
1620
 */
1621
class RebuildFuncIndex: public RebuildIndex
1622
{
1623
  /*
1624
   * This method is used before the class is built (is used to calculate
1625
   * an argument for base class constructor), so it should be static.
1626
   */
1627
  static struct index_def *
1628
  func_index_def_new(struct index_def *index_def, struct func *func)
1629
0
  {
1630
0
    struct index_def *new_index_def = index_def_dup(index_def);
1631
0
    index_def_set_func(new_index_def, func);
1632
0
    return new_index_def;
1633
0
  }
1634
public:
1635
  RebuildFuncIndex(struct alter_space *alter,
1636
       struct index_def *old_index_def_arg, struct func *func) :
1637
0
    RebuildIndex(alter, func_index_def_new(old_index_def_arg, func),
1638
0
           old_index_def_arg) {}
1639
};
1640
1641
/**
1642
 * Drop the old index with data and create a disabled index in place of it.
1643
 */
1644
class DisableFuncIndex: public AlterSpaceOp
1645
{
1646
  struct index_def *new_index_def;
1647
  struct index_def *old_index_def;
1648
public:
1649
  DisableFuncIndex(struct alter_space *alter,
1650
       struct index_def *old_index_def_arg)
1651
0
    : AlterSpaceOp(alter), old_index_def(old_index_def_arg)
1652
0
  {
1653
    /* Functional indexes are only implemented in memtx. */
1654
0
    assert(!strcmp(alter->old_space->engine->name, "memtx"));
1655
0
    new_index_def = index_def_dup(old_index_def);
1656
0
    index_def_set_func(new_index_def, NULL);
1657
0
  }
1658
1659
  virtual void alter_def(struct alter_space *alter);
1660
  virtual ~DisableFuncIndex();
1661
};
1662
1663
void
1664
DisableFuncIndex::alter_def(struct alter_space *alter)
1665
0
{
1666
0
  rlist_del_entry(old_index_def, link);
1667
0
  index_def_list_add(&alter->key_list, new_index_def);
1668
0
}
1669
1670
/** Delete the definition of the disabled index, if any. */
DisableFuncIndex::~DisableFuncIndex()
{
  if (new_index_def == NULL)
    return;
  index_def_delete(new_index_def);
}
1675
1676
/** TruncateIndex - truncate an index. */
1677
class TruncateIndex: public AlterSpaceOp
1678
{
1679
  /** id of the index to truncate. */
1680
  uint32_t iid;
1681
  /**
1682
   * In case TRUNCATE fails, we need to clean up the new
1683
   * index data in the engine.
1684
   */
1685
  struct index *old_index;
1686
  struct index *new_index;
1687
public:
1688
  TruncateIndex(struct alter_space *alter, uint32_t iid)
1689
0
    : AlterSpaceOp(alter), iid(iid),
1690
0
      old_index(NULL), new_index(NULL) {}
1691
  virtual void prepare(struct alter_space *alter);
1692
  virtual void commit(struct alter_space *alter, int64_t signature);
1693
  virtual ~TruncateIndex();
1694
};
1695
1696
void
1697
TruncateIndex::prepare(struct alter_space *alter)
1698
0
{
1699
0
  old_index = space_index(alter->old_space, iid);
1700
0
  new_index = space_index(alter->new_space, iid);
1701
1702
0
  if (iid == 0) {
1703
    /*
1704
     * Notify the engine that the primary index
1705
     * was truncated.
1706
     */
1707
0
    space_drop_primary_key(alter->new_space);
1708
0
    space_add_primary_key_xc(alter->new_space);
1709
0
    return;
1710
0
  }
1711
1712
  /*
1713
   * Although the new index is empty, we still need to call
1714
   * space_build_index() to let the engine know that the
1715
   * index was recreated. For example, Vinyl uses this
1716
   * callback to load indexes during local recovery.
1717
   * No need to yield here since we build an empty index.
1718
   */
1719
0
  assert(new_index != NULL);
1720
0
  space_build_index_xc(alter->new_space, alter->new_space, new_index);
1721
0
}
1722
1723
void
1724
TruncateIndex::commit(struct alter_space *alter, int64_t signature)
1725
0
{
1726
0
  (void)alter;
1727
0
  index_commit_drop(old_index, signature);
1728
0
  index_commit_create(new_index, signature);
1729
0
  new_index = NULL;
1730
0
}
1731
1732
/** Abort creation of the new index unless it was committed. */
TruncateIndex::~TruncateIndex()
{
  /* new_index is reset by commit(). */
  if (new_index != NULL)
    index_abort_create(new_index);
}
1738
1739
/**
1740
 * UpdateSchemaVersion - increment schema_version. Used on
1741
 * in alter_space_do(), i.e. when creating or dropping
1742
 * an index, altering a space.
1743
 */
1744
class UpdateSchemaVersion: public AlterSpaceOp
1745
{
1746
public:
1747
  UpdateSchemaVersion(struct alter_space * alter)
1748
0
    :AlterSpaceOp(alter) {}
1749
  virtual void alter(struct alter_space *alter);
1750
};
1751
1752
void
1753
UpdateSchemaVersion::alter(struct alter_space *alter)
1754
0
{
1755
0
    (void)alter;
1756
0
    box_schema_version_bump();
1757
0
}
1758
1759
/* }}} */
1760
1761
/**
1762
 * Delete the space. It is already removed from the space cache.
1763
 */
1764
static int
1765
on_drop_space_commit(struct trigger *trigger, void *event)
1766
0
{
1767
0
  (void) event;
1768
0
  struct space *space = (struct space *)trigger->data;
1769
0
  space_remove_temporary_triggers(space);
1770
0
  space_delete(space);
1771
0
  return 0;
1772
0
}
1773
1774
/**
1775
 * Return the original space back into the cache. The effect
1776
 * of all other events happened after the space was removed were
1777
 * reverted by the cascading rollback.
1778
 */
1779
static int
1780
on_drop_space_rollback(struct trigger *trigger, void *event)
1781
0
{
1782
0
  (void) event;
1783
0
  struct space *space = (struct space *)trigger->data;
1784
0
  space_cache_replace(NULL, space);
1785
0
  space_reattach_constraints(space);
1786
0
  space_pin_collations(space);
1787
0
  space_pin_defaults(space);
1788
0
  return 0;
1789
0
}
1790
1791
/**
1792
 * A trigger invoked on commit/rollback of DROP/ADD space.
1793
 * The trigger removes the space from the space cache.
1794
 *
1795
 * By the time the space is removed, it should be empty: we
1796
 * rely on cascading rollback.
1797
 */
1798
static int
1799
on_create_space_rollback(struct trigger *trigger, void *event)
1800
0
{
1801
0
  (void) event;
1802
0
  struct space *space = (struct space *)trigger->data;
1803
0
  space_cache_replace(space, NULL);
1804
0
  space_delete(space);
1805
0
  return 0;
1806
0
}
1807
1808
/**
1809
 * Create MoveIndex operation for a range of indexes in a space
1810
 * for range [begin, end)
1811
 */
1812
int
1813
alter_space_move_indexes(struct alter_space *alter, uint32_t begin,
1814
       uint32_t end)
1815
0
{
1816
0
  struct space *old_space = alter->old_space;
1817
0
  bool is_min_field_count_changed;
1818
0
  if (old_space->def->opts.upgrade_def != NULL) {
1819
    /*
1820
     * When space upgrade completes, we should unconditionally
1821
     * update optionality because current min_field_count value
1822
     * of a format respects min_field_count of format which space
1823
     * had before upgrade. It was needed to correctly index both
1824
     * old and new tuples.
1825
     */
1826
0
    is_min_field_count_changed = true;
1827
0
  } else {
1828
0
    is_min_field_count_changed =
1829
0
      old_space->format->min_field_count !=
1830
0
      alter->new_min_field_count;
1831
0
  }
1832
0
  for (uint32_t index_id = begin; index_id < end; ++index_id) {
1833
0
    struct index *old_index = space_index(old_space, index_id);
1834
0
    if (old_index == NULL)
1835
0
      continue;
1836
0
    struct index_def *old_def = old_index->def;
1837
0
    struct index_def *new_def;
1838
0
    uint32_t min_field_count = alter->new_min_field_count;
1839
0
    if (alter->pk_def == NULL || !index_depends_on_pk(old_index)) {
1840
0
      if (is_min_field_count_changed) {
1841
0
        new_def = index_def_dup(old_def);
1842
0
        index_def_update_optionality(new_def,
1843
0
                   min_field_count);
1844
0
        try {
1845
0
          (void) new ModifyIndex(alter, old_index, new_def);
1846
0
        } catch (Exception *e) {
1847
0
          return -1;
1848
0
        }
1849
0
      } else {
1850
0
        try {
1851
0
          (void) new MoveIndex(alter, old_def->iid);
1852
0
        } catch (Exception *e) {
1853
0
          return -1;
1854
0
        }
1855
0
      }
1856
0
      continue;
1857
0
    }
1858
    /*
1859
     * Rebuild secondary indexes that depend on the
1860
     * primary key since primary key parts have changed.
1861
     */
1862
0
    new_def = index_def_new(old_def->space_id, old_def->iid,
1863
0
          old_def->name, strlen(old_def->name),
1864
0
          old_def->space_name,
1865
0
          old_def->engine_name,
1866
0
          old_def->type, &old_def->opts,
1867
0
          old_def->key_def, alter->pk_def);
1868
0
    index_def_update_optionality(new_def, min_field_count);
1869
0
    auto guard = make_scoped_guard([=] { index_def_delete(new_def); });
1870
0
    if (!index_def_change_requires_rebuild(old_index, new_def)) {
1871
0
      try {
1872
0
        (void) new ModifyIndex(alter, old_index, new_def);
1873
0
      } catch (Exception *e) {
1874
0
        return -1;
1875
0
      }
1876
0
    } else {
1877
      /*
1878
       * Rebuild of several indexes in one statement is broken
1879
       * because we need to maintain consistency of built
1880
       * indexes while building the next ones, and we don't.
1881
       * So when build of the next index yields and new tuples
1882
       * appear in the space, already built indexes are not
1883
       * populated with them. So simply forbid such alters.
1884
       */
1885
0
      diag_set(ClientError, ER_UNSUPPORTED, "Tarantool",
1886
0
         "non-trivial alter of primary index along "
1887
0
         "with rebuild of dependent secondary "
1888
0
         "indexes");
1889
0
      return -1;
1890
0
    }
1891
0
    guard.is_active = false;
1892
0
  }
1893
0
  return 0;
1894
0
}
1895
1896
/**
1897
 * Walk through all spaces from 'FROM' clause of given select,
1898
 * and update their view reference counters.
1899
 *
1900
 * @param select Tables from this select to be updated.
1901
 * @param update_value +1 on view creation, -1 on drop.
1902
 * @retval 0 on success, -1 on error (diag is set).
1903
 */
1904
static int
1905
update_view_references(struct Select *select, int update_value)
1906
0
{
1907
0
  assert(update_value == 1 || update_value == -1);
1908
0
  struct SrcList *list = sql_select_expand_from_tables(select);
1909
0
  int from_tables_count = sql_src_list_entry_count(list);
1910
  /* Firstly check that everything is correct. */
1911
0
  for (int i = 0; i < from_tables_count; ++i) {
1912
0
    const char *space_name = sql_src_list_entry_name(list, i);
1913
0
    assert(space_name != NULL);
1914
    /*
1915
     * Views are allowed to contain CTEs. CTE is a
1916
     * temporary object, created and destroyed at SQL
1917
     * runtime (it is represented by an ephemeral
1918
     * table). So, it is absent in space cache and as
1919
     * a consequence we can't increment its reference
1920
     * counter. Skip iteration.
1921
     */
1922
0
    if (sql_select_constains_cte(select, space_name))
1923
0
      continue;
1924
0
    struct space *space = space_by_name0(space_name);
1925
0
    if (space == NULL) {
1926
0
      diag_set(ClientError, ER_NO_SUCH_SPACE, space_name);
1927
0
      goto error;
1928
0
    }
1929
0
    if (space_is_temporary(space)) {
1930
0
      diag_set(ClientError, ER_UNSUPPORTED,
1931
0
         "CREATE VIEW", "temporary spaces");
1932
0
      goto error;
1933
0
    }
1934
0
  }
1935
  /* Secondly do the job. */
1936
0
  for (int i = 0; i < from_tables_count; ++i) {
1937
0
    const char *space_name = sql_src_list_entry_name(list, i);
1938
    /* See comment before sql_select_constains_cte call above. */
1939
0
    if (sql_select_constains_cte(select, space_name))
1940
0
      continue;
1941
0
    struct space *space = space_by_name0(space_name);
1942
0
    assert(space->def->view_ref_count > 0 || update_value > 0);
1943
0
    space->def->view_ref_count += update_value;
1944
0
  }
1945
0
  sqlSrcListDelete(list);
1946
0
  return 0;
1947
0
error:
1948
0
  sqlSrcListDelete(list);
1949
0
  return -1;
1950
0
}
1951
1952
/**
1953
 * Trigger which is fired to commit creation of new SQL view.
1954
 * Its purpose is to release memory of SELECT.
1955
 */
1956
static int
1957
on_create_view_commit(struct trigger *trigger, void *event)
1958
0
{
1959
0
  (void) event;
1960
0
  struct Select *select = (struct Select *)trigger->data;
1961
0
  sql_select_delete(select);
1962
0
  return 0;
1963
0
}
1964
1965
/**
1966
 * Trigger which is fired to rollback creation of new SQL view.
1967
 * Decrements view reference counters of dependent spaces and
1968
 * releases memory for SELECT.
1969
 */
1970
static int
1971
on_create_view_rollback(struct trigger *trigger, void *event)
1972
0
{
1973
0
  (void) event;
1974
0
  struct Select *select = (struct Select *)trigger->data;
1975
0
  int rc = update_view_references(select, -1);
1976
0
  assert(rc == 0); (void)rc;
1977
0
  sql_select_delete(select);
1978
0
  return 0;
1979
0
}
1980
1981
/**
1982
 * Trigger which is fired to commit drop of SQL view.
1983
 * Its purpose is to decrement view reference counters of
1984
 * dependent spaces and release memory for SELECT.
1985
 */
1986
static int
1987
on_drop_view_commit(struct trigger *trigger, void *event)
1988
0
{
1989
0
  (void) event;
1990
0
  struct Select *select = (struct Select *)trigger->data;
1991
0
  sql_select_delete(select);
1992
0
  return 0;
1993
0
}
1994
1995
/**
1996
 * This trigger is invoked to rollback drop of SQL view.
1997
 * Release memory for struct SELECT compiled in
1998
 * on_replace_dd_space trigger.
1999
 */
2000
static int
2001
on_drop_view_rollback(struct trigger *trigger, void *event)
2002
0
{
2003
0
  (void) event;
2004
0
  struct Select *select = (struct Select *)trigger->data;
2005
0
  int rc = update_view_references(select, 1);
2006
0
  assert(rc == 0); (void)rc;
2007
0
  sql_select_delete(select);
2008
0
  return 0;
2009
0
}
2010
2011
/**
2012
 * Return -1 and set diag if the space is pinned by someone.
2013
 */
2014
static int
2015
space_check_pinned(struct space *space)
2016
0
{
2017
0
  enum space_cache_holder_type pinned_type;
2018
0
  if (space_cache_is_pinned(space, &pinned_type)) {
2019
0
    const char *type_str =
2020
0
      space_cache_holder_type_strs[pinned_type];
2021
0
    diag_set(ClientError, ER_ALTER_SPACE,
2022
0
       space_name(space),
2023
0
       tt_sprintf("space is referenced by %s", type_str));
2024
0
    return -1;
2025
0
  }
2026
0
  return 0;
2027
0
}
2028
2029
/**
2030
 * Check whether @a space holders prohibit truncate of the space.
2031
 * For example truncation in not allowed if another non-empty space refers
2032
 * to this space via foreign key link.
2033
 * Return 0 if allowed, or -1 if not allowed (diag is set).
2034
 */
2035
static int
2036
space_check_truncate(struct space *space)
2037
0
{
2038
  /* Check for foreign keys that refers to this space. */
2039
0
  struct space_cache_holder *h;
2040
0
  rlist_foreach_entry(h, &space->space_cache_pin_list, link) {
2041
0
    if (h->selfpin)
2042
0
      continue;
2043
0
    if (h->type != SPACE_HOLDER_FOREIGN_KEY)
2044
0
      continue;
2045
0
    struct tuple_constraint *constr =
2046
0
      container_of(h, struct tuple_constraint,
2047
0
             space_cache_holder);
2048
0
    struct space *other_space = constr->space;
2049
    /*
2050
     * If the referring space is empty then the truncate can't
2051
     * break foreign key consistency.
2052
     */
2053
0
    if (space_bsize(other_space) == 0)
2054
0
      continue;
2055
0
    const char *type_str =
2056
0
      space_cache_holder_type_strs[h->type];
2057
0
    diag_set(ClientError, ER_ALTER_SPACE,
2058
0
       space_name(space),
2059
0
       tt_sprintf("space is referenced by %s", type_str));
2060
0
    return -1;
2061
0
  }
2062
0
  return 0;
2063
0
}
2064
2065
/**
2066
 * Check whether @a old_space holders prohibit alter to @a new_space_def.
2067
 * For example if the space becomes data-temporary, there can be foreign keys
2068
 * from non-data-temporary space, so this alter must not be allowed.
2069
 * Return 0 if allowed, or -1 if not allowed (diag is set).
2070
 */
2071
static int
2072
space_check_alter(struct space *old_space, struct space_def *new_space_def)
2073
0
{
2074
  /*
2075
   * group_id, which is currently used for defining local spaces, is
2076
   * now can't be changed; if it could, an additional check would be
2077
   * required below.
2078
   */
2079
0
  assert(old_space->def->opts.group_id == new_space_def->opts.group_id);
2080
  /* Only alter from non-data-temporary to data-temporary can cause
2081
   * problems.
2082
   */
2083
0
  if (space_is_data_temporary(old_space) ||
2084
0
      !space_opts_is_data_temporary(&new_space_def->opts))
2085
0
    return 0;
2086
  /* Check for foreign keys that refers to this space. */
2087
0
  struct space_cache_holder *h;
2088
0
  rlist_foreach_entry(h, &old_space->space_cache_pin_list, link) {
2089
0
    if (h->selfpin)
2090
0
      continue;
2091
0
    if (h->type != SPACE_HOLDER_FOREIGN_KEY)
2092
0
      continue;
2093
0
    struct tuple_constraint *constr =
2094
0
      container_of(h, struct tuple_constraint,
2095
0
             space_cache_holder);
2096
0
    struct space *other_space = constr->space;
2097
    /*
2098
     * If the referring space is data-temporary too then the alter
2099
     * can't break foreign key consistency after restart.
2100
     */
2101
0
    if (space_opts_is_data_temporary(&other_space->def->opts))
2102
0
      continue;
2103
0
    diag_set(ClientError, ER_ALTER_SPACE,
2104
0
       space_name(old_space),
2105
0
       tt_sprintf("foreign key '%s' from non-data-temporary"
2106
0
            " space '%s' can't refer to data-temporary"
2107
0
            " space",
2108
0
            constr->def.name, space_name(other_space)));
2109
0
    return -1;
2110
0
  }
2111
0
  return 0;
2112
0
}
2113
2114
/*
2115
 * box_process1() bypasses the read-only check for the _space system space
2116
 * because there it's not yet known if the related space is temporary. Perform
2117
 * the check here if the space isn't temporary and the statement was issued by
2118
 * this replica.
2119
 */
2120
static int
2121
filter_temporary_ddl_stmt(struct txn *txn, const struct space_def *def)
2122
0
{
2123
0
  if (def == NULL)
2124
0
    return 0;
2125
0
  struct txn_stmt *stmt = txn_current_stmt(txn);
2126
0
  if (space_opts_is_temporary(&def->opts)) {
2127
0
    txn_stmt_mark_as_temporary(txn, stmt);
2128
0
    return 0;
2129
0
  }
2130
0
  if (stmt->row->replica_id == 0 && recovery_state != INITIAL_RECOVERY)
2131
0
    return box_check_writable();
2132
0
  return 0;
2133
0
}
2134
2135
/**
2136
 * A trigger which is invoked on replace in a data dictionary
2137
 * space _space.
2138
 *
2139
 * Generally, whenever a data dictionary change occurs
2140
 * 2 things should be done:
2141
 *
2142
 * - space cache should be updated
2143
 *
2144
 * - the space which is changed should be rebuilt according
2145
 *   to the nature of the modification, i.e. indexes added/dropped,
2146
 *   tuple format changed, etc.
2147
 *
2148
 * When dealing with an update of _space space, we have 3 major
2149
 * cases:
2150
 *
2151
 * 1) insert a new tuple: creates a new space
2152
 *    The trigger creates a space structure, inserts it into
2153
 *    the space cache right away and registers an on rollback
2154
 *    hook. Should the statement itself fail, the transaction
2155
 *    is rolled back and the rollback hook removes the created
2156
 *    space from the cache and deletes it, avoiding a memory
2157
 *    leak. The hooks are written in a way that excludes the
2158
 *    possibility of a failure.
2159
 *
2160
 * 2) delete a tuple: drops an existing space.
2161
 *
2162
 *    A space can be dropped only if it has no indexes.
2163
 *    The only reason for this restriction is that there
2164
 *    must be no tuples in _index without a corresponding tuple
2165
 *    in _space. It's not possible to delete such tuples
2166
 *    automatically (this would require multi-statement
2167
 *    transactions), so instead the trigger verifies that the
2168
 *    records have been deleted by the user.
2169
 *
2170
 *    Then the trigger removes the space from the space cache
2171
 *    right away and registers a commit hook that deletes the
2172
 *    space object and a rollback hook that puts the space back
2173
 *    into the cache.
2174
 *
2175
 * 3) modify an existing tuple: some space
2176
 *    properties are immutable, but it's OK to change
2177
 *    space name or field count. This is done in WAL-error-
2178
 *    safe mode.
2179
 *
2180
 * A note about memcached_space: Tarantool 1.4 had a check
2181
 * which prevented re-definition of memcached_space. With
2182
 * dynamic space configuration such a check would be particularly
2183
 * clumsy, so it is simply not done.
2184
 */
2185
static int
2186
on_replace_dd_space(struct trigger * /* trigger */, void *event)
2187
0
{
  /*
   * The event is the transaction; the statement being processed
   * is its current statement. Returns 0 on success, -1 on error.
   */
2188
0
  struct txn *txn = (struct txn *) event;
2189
0
  struct txn_stmt *stmt = txn_current_stmt(txn);
2190
0
  struct tuple *old_tuple = stmt->old_tuple;
2191
0
  struct tuple *new_tuple = stmt->new_tuple;
2192
0
  struct region *region = &fiber()->gc;
2193
  /*
2194
   * Things to keep in mind:
2195
   * - old_tuple is set only in case of UPDATE.  For INSERT
2196
   *   or REPLACE it is NULL.
2197
   * - the trigger may be called inside recovery from a snapshot,
2198
   *   when index look up is not possible
2199
   * - _space, _index and other metaspaces initially don't
2200
   *   have a tuple which represents it, this tuple is only
2201
   *   created during recovery from a snapshot.
2202
   *
2203
   * Let's establish whether an old space exists. Use
2204
   * old_tuple ID field, if old_tuple is set, since UPDATE
2205
   * may have changed space id.
2206
   */
2207
0
  uint32_t old_id;
2208
0
  if (tuple_field_u32(old_tuple ? old_tuple : new_tuple,
2209
0
          BOX_SPACE_FIELD_ID, &old_id) != 0)
2210
0
    return -1;
2211
0
  struct space *old_space = space_by_id(old_id);
  /*
   * def stays NULL when there is no new tuple. A NULL result of
   * space_def_new_from_tuple() is checked later, in the branches
   * that need def.
   */
2212
0
  struct space_def *def = NULL;
2213
0
  if (new_tuple != NULL) {
2214
0
    uint32_t errcode = (old_tuple == NULL) ?
2215
0
      ER_CREATE_SPACE : ER_ALTER_SPACE;
2216
0
    def = space_def_new_from_tuple(new_tuple, errcode, region);
2217
0
  }
  /*
   * Delete def on every return path unless the guard is
   * deactivated, which is done only in the UPDATE/REPLACE branch.
   */
2218
0
  auto def_guard = make_scoped_guard([=] {
2219
0
    if (def != NULL)
2220
0
      space_def_delete(def);
2221
0
  });
  /*
   * The definition of the existing space is preferred; if both
   * are absent NULL is passed and the filter does nothing.
   */
2222
0
  if (filter_temporary_ddl_stmt(txn, old_space != NULL ?
2223
0
              old_space->def : def) != 0)
2224
0
    return -1;
2225
0
  if (new_tuple != NULL && old_space == NULL) { /* INSERT */
2226
0
    if (def == NULL)
2227
0
      return -1;
2228
0
    if (access_check_ddl(def->name, def->uid, NULL,
2229
0
             SC_SPACE, PRIV_C) != 0)
2230
0
      return -1;
    /*
     * NOTE(review): def_guard stays active in this branch, so
     * space_new() presumably makes its own copy of def - confirm.
     */
2231
0
    RLIST_HEAD(empty_list);
2232
0
    struct space *space = space_new(def, &empty_list);
2233
0
    if (space == NULL)
2234
0
      return -1;
2235
    /**
2236
     * The new space must be inserted in the space
2237
     * cache right away to achieve linearisable
2238
     * execution on a replica.
2239
     */
2240
0
    space_cache_replace(NULL, space);
2241
    /*
2242
     * Do not forget to update schema_version right after
2243
     * inserting the space to the space_cache, since no
2244
     * AlterSpaceOps are registered in case of space
2245
     * create.
2246
     */
2247
0
    box_schema_version_bump();
2248
    /*
2249
     * So may happen that until the DDL change record
2250
     * is written to the WAL, the space is used for
2251
     * insert/update/delete. All these updates are
2252
     * rolled back by the pipelined rollback mechanism,
2253
     * so it's safe to simply drop the space on
2254
     * rollback.
2255
     */
2256
0
    struct trigger *on_rollback =
2257
0
      txn_alter_trigger_new(on_create_space_rollback, space);
2258
0
    if (on_rollback == NULL)
2259
0
      return -1;
2260
0
    txn_stmt_on_rollback(stmt, on_rollback);
2261
0
    if (def->opts.is_view) {
2262
0
      struct Select *select = sql_view_compile(def->opts.sql);
2263
0
      if (select == NULL)
2264
0
        return -1;
      /*
       * Free the compiled SELECT on early return. Once both
       * view triggers are registered, they are responsible
       * for deleting it and the guard is deactivated.
       */
2265
0
      auto select_guard = make_scoped_guard([=] {
2266
0
        sql_select_delete(select);
2267
0
      });
2268
0
      if (update_view_references(select, 1) != 0)
2269
0
        return -1;
2270
0
      struct trigger *on_commit_view =
2271
0
        txn_alter_trigger_new(on_create_view_commit,
2272
0
                  select);
2273
0
      if (on_commit_view == NULL)
2274
0
        return -1;
2275
0
      txn_stmt_on_commit(stmt, on_commit_view);
2276
0
      struct trigger *on_rollback_view =
2277
0
        txn_alter_trigger_new(on_create_view_rollback,
2278
0
                  select);
2279
0
      if (on_rollback_view == NULL)
2280
0
        return -1;
2281
0
      txn_stmt_on_rollback(stmt, on_rollback_view);
2282
0
      select_guard.is_active = false;
2283
0
    }
2284
0
  } else if (new_tuple == NULL) { /* DELETE */
2285
0
    if (access_check_ddl(old_space->def->name, old_space->def->uid,
2286
0
             old_space->access, SC_SPACE, PRIV_D) != 0)
2287
0
      return -1;
2288
    /* Verify that the space is empty (has no indexes) */
2289
0
    if (old_space->index_count) {
2290
0
      diag_set(ClientError, ER_DROP_SPACE,
2291
0
          space_name(old_space),
2292
0
          "the space has indexes");
2293
0
      return -1;
2294
0
    }
    /* Output flag shared by the two lookups below. */
2295
0
    bool out;
2296
0
    if (schema_find_grants("space", old_space->def->id, &out) != 0) {
2297
0
      return -1;
2298
0
    }
2299
0
    if (out) {
2300
0
      diag_set(ClientError, ER_DROP_SPACE,
2301
0
          space_name(old_space),
2302
0
          "the space has grants");
2303
0
      return -1;
2304
0
    }
2305
0
    if (space_has_data(BOX_TRUNCATE_ID, 0, old_space->def->id, &out) != 0)
2306
0
      return -1;
2307
0
    if (out) {
2308
0
      diag_set(ClientError, ER_DROP_SPACE,
2309
0
          space_name(old_space),
2310
0
          "the space has truncate record");
2311
0
      return -1;
2312
0
    }
2313
0
    if (old_space->def->view_ref_count > 0) {
2314
0
      diag_set(ClientError, ER_DROP_SPACE,
2315
0
          space_name(old_space),
2316
0
          "other views depend on this space");
2317
0
      return -1;
2318
0
    }
2319
    /* Check whether old_space is used somewhere. */
2320
0
    if (space_check_pinned(old_space) != 0)
2321
0
      return -1;
2322
    /* One can't just remove a system space. */
2323
0
    if (!dd_check_is_disabled() &&
2324
0
        space_is_system(old_space)) {
2325
0
      diag_set(ClientError, ER_DROP_SPACE,
2326
0
         space_name(old_space),
2327
0
         "the space is a system space");
2328
0
      return -1;
2329
0
    }
2330
    /**
2331
     * We need to unpin spaces that are referenced by deleted one.
2332
     * Let's detach space constraints - they will be deleted
2333
     * on commit or reattached on rollback.
2334
     */
2335
0
    space_detach_constraints(old_space);
2336
0
    space_unpin_collations(old_space);
2337
0
    space_unpin_defaults(old_space);
2338
    /**
2339
     * The space must be deleted from the space
2340
     * cache right away to achieve linearisable
2341
     * execution on a replica.
2342
     */
2343
0
    space_cache_replace(old_space, NULL);
2344
    /*
2345
     * Do not forget to update schema_version right after
2346
     * deleting the space from the space_cache, since no
2347
     * AlterSpaceOps are registered in case of space drop.
2348
     */
2349
0
    box_schema_version_bump();
2350
0
    struct trigger *on_commit =
2351
0
      txn_alter_trigger_new(on_drop_space_commit, old_space);
2352
0
    if (on_commit == NULL)
2353
0
      return -1;
2354
0
    txn_stmt_on_commit(stmt, on_commit);
2355
0
    struct trigger *on_rollback =
2356
0
      txn_alter_trigger_new(on_drop_space_rollback, old_space);
2357
0
    if (on_rollback == NULL)
2358
0
      return -1;
2359
0
    txn_stmt_on_rollback(stmt, on_rollback);
2360
0
    if (old_space->def->opts.is_view) {
2361
0
      struct Select *select =
2362
0
        sql_view_compile(old_space->def->opts.sql);
2363
0
      if (select == NULL)
2364
0
        return -1;
2365
0
      auto select_guard = make_scoped_guard([=] {
2366
0
        sql_select_delete(select);
2367
0
      });
2368
0
      struct trigger *on_commit_view =
2369
0
        txn_alter_trigger_new(on_drop_view_commit,
2370
0
                  select);
2371
0
      if (on_commit_view == NULL)
2372
0
        return -1;
2373
0
      txn_stmt_on_commit(stmt, on_commit_view);
2374
0
      struct trigger *on_rollback_view =
2375
0
        txn_alter_trigger_new(on_drop_view_rollback,
2376
0
                  select);
2377
0
      if (on_rollback_view == NULL)
2378
0
        return -1;
2379
0
      txn_stmt_on_rollback(stmt, on_rollback_view);
      /*
       * The counters are decremented only after the rollback
       * trigger, which increments them back, is registered.
       */
2380
0
      int rc = update_view_references(select, -1);
2381
0
      assert(rc == 0); (void)rc;
2382
0
      select_guard.is_active = false;
2383
0
    }
2384
0
  } else { /* UPDATE, REPLACE */
2385
0
    assert(old_space != NULL && new_tuple != NULL);
2386
0
    if (old_space->def->opts.is_view) {
2387
0
      diag_set(ClientError, ER_ALTER_SPACE,
2388
0
         space_name(old_space),
2389
0
         "view can not be altered");
2390
0
      return -1;
2391
0
    }
2392
0
    if (def == NULL)
2393
0
      return -1;
2394
0
    if (access_check_ddl(def->name, def->uid, old_space->access,
2395
0
             SC_SPACE, PRIV_A) != 0)
2396
0
      return -1;
2397
0
    if (def->id != space_id(old_space)) {
2398
0
      diag_set(ClientError, ER_ALTER_SPACE,
2399
0
          space_name(old_space),
2400
0
          "space id is immutable");
2401
0
      return -1;
2402
0
    }
2403
0
    if (strcmp(def->engine_name, old_space->def->engine_name) != 0) {
2404
0
      diag_set(ClientError, ER_ALTER_SPACE,
2405
0
          space_name(old_space),
2406
0
          "can not change space engine");
2407
0
      return -1;
2408
0
    }
2409
0
    if (def->opts.group_id != space_group_id(old_space)) {
2410
0
      diag_set(ClientError, ER_ALTER_SPACE,
2411
0
          space_name(old_space),
2412
0
          "replication group is immutable");
2413
0
      return -1;
2414
0
    }
2415
0
    if (space_is_temporary(old_space) !=
2416
0
         space_opts_is_temporary(&def->opts)) {
2417
0
      diag_set(ClientError, ER_ALTER_SPACE,
2418
0
         old_space->def->name,
2419
0
         "temporariness cannot change");
2420
0
      return -1;
2421
0
    }
2422
0
    if (def->opts.is_view != old_space->def->opts.is_view) {
2423
0
      diag_set(ClientError, ER_ALTER_SPACE,
2424
0
          space_name(old_space),
2425
0
          "can not convert a space to "
2426
0
          "a view and vice versa");
2427
0
      return -1;
2428
0
    }
2429
0
    if (strcmp(def->name, old_space->def->name) != 0 &&
2430
0
        old_space->def->view_ref_count > 0) {
2431
0
      diag_set(ClientError, ER_ALTER_SPACE,
2432
0
          space_name(old_space),
2433
0
          "can not rename space which is referenced by "
2434
0
          "view");
2435
0
      return -1;
2436
0
    }
2437
2438
0
    if (space_check_alter(old_space, def) != 0)
2439
0
      return -1;
2440
2441
    /*
2442
     * Allow change of space properties, but do it
2443
     * in WAL-error-safe mode.
2444
     */
2445
0
    struct alter_space *alter = alter_space_new(old_space);
2446
0
    if (alter == NULL)
2447
0
      return -1;
2448
0
    auto alter_guard =
2449
0
      make_scoped_guard([=] {alter_space_delete(alter);});
2450
    /*
2451
     * Calculate a new min_field_count. It can be
2452
     * changed by resetting space:format(), if an old
2453
     * format covers some nullable indexed fields in
2454
     * the format tail. And when the format is reset,
2455
     * these fields become optional - index
2456
     * comparators must be updated.
2457
     */
    /*
     * keys stays NULL for a space without indexes; it is then
     * passed as is together with a zero count.
     */
2458
0
    struct key_def **keys = NULL;
2459
0
    RegionGuard region_guard(&fiber()->gc);
2460
0
    if (old_space->index_count > 0)
2461
0
      keys = xregion_alloc_array(&fiber()->gc,
2462
0
               typeof(keys[0]),
2463
0
               old_space->index_count);
2464
0
    for (uint32_t i = 0; i < old_space->index_count; ++i)
2465
0
      keys[i] = old_space->index[i]->def->key_def;
2466
0
    alter->new_min_field_count =
2467
0
      tuple_format_min_field_count(keys,
2468
0
                 old_space->index_count,
2469
0
                 def->fields,
2470
0
                 def->field_count);
2471
    /*
2472
     * In the case of space upgrade both old and new tuples will
2473
     * be indexed, so new_min_field_count should be the minimal
2474
     * min_field_count of old and new formats.
2475
     * Space upgrade does a replace in this space without
2476
     * upgrade_def on completion, so actual min_field_count will
2477
     * be set when space upgrade completes.
2478
     */
2479
0
    if (def->opts.upgrade_def != NULL) {
2480
0
      alter->new_min_field_count = MIN(
2481
0
        alter->new_min_field_count,
2482
0
        old_space->format->min_field_count);
2483
0
    }
2484
0
    try {
2485
0
      (void) new CheckSpaceFormat(alter);
2486
0
      (void) new ModifySpace(alter, def);
2487
0
    } catch (Exception *e) {
2488
0
      return -1;
2489
0
    }
    /*
     * NOTE(review): def was handed to the ModifySpace operation,
     * which presumably owns it from now on - confirm.
     */
2490
0
    def_guard.is_active = false;
2491
    /* Create MoveIndex ops for all space indexes. */
2492
0
    if (alter_space_move_indexes(alter, 0,
2493
0
        old_space->index_id_max + 1) != 0)
2494
0
      return -1;
2495
0
    try {
2496
      /* Remember to update schema_version. */
2497
0
      (void) new UpdateSchemaVersion(alter);
2498
0
      alter_space_do(stmt, alter);
2499
0
    } catch (Exception *e) {
2500
0
      return -1;
2501
0
    }
    /*
     * NOTE(review): after a successful alter_space_do() the alter
     * is presumably released elsewhere (commit/rollback) - confirm.
     */
2502
0
    alter_guard.is_active = false;
2503
0
  }
2504
0
  return 0;
2505
0
}
2506
2507
/**
2508
 * Just like with _space, 3 major cases:
2509
 *
2510
 * - insert a tuple = addition of a new index. The
2511
 *   space should exist.
2512
 *
2513
 * - delete a tuple - drop index.
2514
 *
2515
 * - update a tuple - change of index type or key parts.
2516
 *   Change of index type is the same as deletion of the old
2517
 *   index and addition of the new one.
2518
 *
2519
 *   A new index needs to be built before we attempt to commit
2520
 *   a record to the write ahead log, since:
2521
 *
2522
 *   1) if it fails, it's not good to end up with a corrupt index
2523
 *   which is already committed to WAL
2524
 *
2525
 *   2) Tarantool indexes also work as constraints (min number of
2526
 *   fields in the space, field uniqueness), and it's not good to
2527
 *   commit to WAL a constraint which is not enforced in the
2528
 *   current data set.
2529
 *
2530
 *   When adding a new index, ideally we'd also need to rebuild
2531
 *   all tuple formats in all tuples, since the old format may not
2532
 *   be ideal for the new index. We, however, do not do that,
2533
 *   since that would entail rebuilding all indexes at once.
2534
 *   Instead, the default tuple format of the space is changed,
2535
 *   and as tuples get updated/replaced, all tuples acquire a new
2536
 *   format.
2537
 *
2538
 *   The same is the case with dropping an index: nothing is
2539
 *   rebuilt right away, but gradually the extra space reserved
2540
 *   for offsets is relinquished to the slab allocator as tuples
2541
 *   are modified.
2542
 */
2543
static int
2544
on_replace_dd_index(struct trigger * /* trigger */, void *event)
2545
0
{
2546
0
  struct txn *txn = (struct txn *) event;
2547
0
  struct txn_stmt *stmt = txn_current_stmt(txn);
2548
0
  struct tuple *old_tuple = stmt->old_tuple;
2549
0
  struct tuple *new_tuple = stmt->new_tuple;
2550
0
  uint32_t id, iid;
2551
0
  if (tuple_field_u32(old_tuple ? old_tuple : new_tuple,
2552
0
          BOX_INDEX_FIELD_SPACE_ID, &id) != 0)
2553
0
    return -1;
2554
0
  if (tuple_field_u32(old_tuple ? old_tuple : new_tuple,
2555
0
          BOX_INDEX_FIELD_ID, &iid) != 0)
2556
0
    return -1;
2557
0
  struct space *old_space = space_cache_find(id);
2558
0
  if (old_space == NULL)
2559
0
    return -1;
2560
0
  if (filter_temporary_ddl_stmt(txn, old_space->def) != 0)
2561
0
    return -1;
2562
0
  if (old_space->def->opts.is_view) {
2563
0
    diag_set(ClientError, ER_ALTER_SPACE, space_name(old_space),
2564
0
        "can not add index on a view");
2565
0
    return -1;
2566
0
  }
2567
0
  enum priv_type priv_type = new_tuple ? PRIV_C : PRIV_D;
2568
0
  if (old_tuple && new_tuple)
2569
0
    priv_type = PRIV_A;
2570
0
  if (access_check_ddl(old_space->def->name, old_space->def->uid,
2571
0
           old_space->access, SC_SPACE, priv_type) != 0)
2572
0
    return -1;
2573
0
  struct index *old_index = space_index(old_space, iid);
2574
2575
  /*
2576
   * Deal with various cases of dropping of the primary key.
2577
   */
2578
0
  if (iid == 0 && new_tuple == NULL) {
2579
    /*
2580
     * Dropping the primary key in a system space: off limits.
2581
     */
2582
0
    if (!dd_check_is_disabled() &&
2583
0
        space_is_system(old_space)) {
2584
0
      diag_set(ClientError, ER_LAST_DROP,
2585
0
          space_name(old_space));
2586
0
      return -1;
2587
0
    }
2588
    /*
2589
     * Can't drop primary key before secondary keys.
2590
     */
2591
0
    if (old_space->index_count > 1) {
2592
0
      diag_set(ClientError, ER_DROP_PRIMARY_KEY,
2593
0
          space_name(old_space));
2594
0
      return -1;
2595
0
    }
2596
    /*
2597
     * Can't drop primary key before space sequence.
2598
     */
2599
0
    if (old_space->sequence != NULL) {
2600
0
      diag_set(ClientError, ER_ALTER_SPACE,
2601
0
          space_name(old_space),
2602
0
          "can not drop primary key while "
2603
0
          "space sequence exists");
2604
0
      return -1;
2605
0
    }
2606
    /*
2607
     * Check space's holders.
2608
     */
2609
0
    if (space_check_truncate(old_space) != 0)
2610
0
      return -1;
2611
0
  }
2612
2613
0
  if (iid != 0 && space_index(old_space, 0) == NULL) {
2614
    /*
2615
     * A secondary index can not be created without
2616
     * a primary key.
2617
     */
2618
0
    diag_set(ClientError, ER_ALTER_SPACE,
2619
0
        space_name(old_space),
2620
0
        "can not add a secondary key before primary");
2621
0
    return -1;
2622
0
  }
2623
2624
0
  struct alter_space *alter = alter_space_new(old_space);
2625
0
  if (alter == NULL)
2626
0
    return -1;
2627
0
  auto scoped_guard =
2628
0
    make_scoped_guard([=] { alter_space_delete(alter); });
2629
2630
  /*
2631
   * Handle the following 4 cases:
2632
   * 1. Simple drop of an index.
2633
   * 2. Creation of a new index: primary or secondary.
2634
   * 3. Change of an index which does not require a rebuild.
2635
   * 4. Change of an index which does require a rebuild.
2636
   */
2637
  /* Case 1: drop the index, if it is dropped. */
2638
0
  if (old_index != NULL && new_tuple == NULL) {
2639
0
    if (alter_space_move_indexes(alter, 0, iid) != 0)
2640
0
      return -1;
2641
0
    try {
2642
0
      (void) new DropIndex(alter, old_index);
2643
0
    } catch (Exception *e) {
2644
0
      return -1;
2645
0
    }
2646
0
  }
2647
  /* Case 2: create an index, if it is simply created. */
2648
0
  if (old_index == NULL && new_tuple != NULL) {
2649
0
    if (alter_space_move_indexes(alter, 0, iid))
2650
0
      return -1;
2651
0
    struct index_def *def =
2652
0
      index_def_new_from_tuple(new_tuple, old_space);
2653
0
    if (def == NULL)
2654
0
      return -1;
2655
0
    index_def_update_optionality(def, alter->new_min_field_count);
2656
0
    try {
2657
0
      (void) new CreateIndex(alter, def);
2658
0
    } catch (Exception *e) {
2659
0
      index_def_delete(def);
2660
0
      return -1;
2661
0
    }
2662
0
  }
2663
  /* Case 3 and 4: check if we need to rebuild index data. */
2664
0
  if (old_index != NULL && new_tuple != NULL) {
2665
0
    struct index_def *index_def;
2666
0
    index_def = index_def_new_from_tuple(new_tuple, old_space);
2667
0
    if (index_def == NULL)
2668
0
      return -1;
2669
0
    auto index_def_guard =
2670
0
      make_scoped_guard([=] { index_def_delete(index_def); });
2671
    /*
2672
     * To detect which key parts are optional,
2673
     * min_field_count is required. But
2674
     * min_field_count from the old space format can
2675
     * not be used. For example, consider the case,
2676
     * when a space has no format, has a primary index
2677
     * on the first field and has a single secondary
2678
     * index on a non-nullable second field. Min field
2679
     * count here is 2. Now alter the secondary index
2680
     * to make its part be nullable. In the
2681
     * 'old_space' min_field_count is still 2, but
2682
     * actually it is already 1. Actual
2683
     * min_field_count must be calculated using old
2684
     * unchanged indexes, NEW definition of an updated
2685
     * index and a space format, defined by a user.
2686
     */
2687
0
    struct key_def **keys;
2688
0
    size_t bsize;
2689
0
    RegionGuard region_guard(&fiber()->gc);
2690
0
    keys = region_alloc_array(&fiber()->gc, typeof(keys[0]),
2691
0
            old_space->index_count, &bsize);
2692
0
    if (keys == NULL) {
2693
0
      diag_set(OutOfMemory, bsize, "region_alloc_array",
2694
0
         "keys");
2695
0
      return -1;
2696
0
    }
2697
0
    for (uint32_t i = 0, j = 0; i < old_space->index_count; ++i) {
2698
0
      struct index_def *d = old_space->index[i]->def;
2699
0
      if (d->iid != index_def->iid)
2700
0
        keys[j++] = d->key_def;
2701
0
      else
2702
0
        keys[j++] = index_def->key_def;
2703
0
    }
2704
0
    struct space_def *def = old_space->def;
2705
0
    alter->new_min_field_count =
2706
0
      tuple_format_min_field_count(keys,
2707
0
                 old_space->index_count,
2708
0
                 def->fields,
2709
0
                 def->field_count);
2710
0
    index_def_update_optionality(index_def,
2711
0
               alter->new_min_field_count);
2712
0
    if (alter_space_move_indexes(alter, 0, iid))
2713
0
      return -1;
2714
0
    if (index_def_is_equal(index_def, old_index->def)) {
2715
      /* Index is not changed so just move it. */
2716
0
      try {
2717
0
        (void) new MoveIndex(alter, old_index->def->iid);
2718
0
      } catch (Exception *e) {
2719
0
        return -1;
2720
0
      }
2721
2722
0
    } else if (index_def_change_requires_rebuild(old_index,
2723
0
                   index_def)) {
2724
      /*
2725
       * Operation demands an index rebuild.
2726
       */
2727
0
      try {
2728
0
        (void) new RebuildIndex(alter, index_def,
2729
0
              old_index->def);
2730
0
      } catch (Exception *e) {
2731
0
        return -1;
2732
0
      }
2733
0
      index_def_guard.is_active = false;
2734
0
    } else {
2735
      /*
2736
       * Operation can be done without index rebuild,
2737
       * but we still need to check that tuples stored
2738
       * in the space conform to the new format.
2739
       */
2740
0
      try {
2741
0
        (void) new CheckSpaceFormat(alter);
2742
0
        (void) new ModifyIndex(alter, old_index, index_def);
2743
0
      } catch (Exception *e) {
2744
0
        return -1;
2745
0
      }
2746
0
      index_def_guard.is_active = false;
2747
0
    }
2748
0
  }
2749
  /*
2750
   * Create MoveIndex ops for the remaining indexes in the
2751
   * old space.
2752
   */
2753
0
  if (alter_space_move_indexes(alter, iid + 1, old_space->index_id_max + 1) != 0)
2754
0
    return -1;
2755
0
  try {
2756
    /* Add an op to update schema_version on commit. */
2757
0
    (void) new UpdateSchemaVersion(alter);
2758
0
    alter_space_do(stmt, alter);
2759
0
  } catch (Exception *e) {
2760
0
    return -1;
2761
0
  }
2762
0
  scoped_guard.is_active = false;
2763
0
  return 0;
2764
0
}
2765
2766
/**
2767
 * A trigger invoked on replace in space _truncate.
2768
 *
2769
 * In a nutshell, we truncate a space by replacing it with
2770
 * a new empty space with the same definition and indexes.
2771
 * Note, although we instantiate the new space before WAL
2772
 * write, we don't propagate changes to the old space in
2773
 * case a WAL write error happens and we have to rollback.
2774
 * This is OK, because a WAL write error implies cascading
2775
 * rollback of all transactions following this one.
2776
 */
2777
static int
2778
on_replace_dd_truncate(struct trigger * /* trigger */, void *event)
2779
0
{
2780
0
  struct txn *txn = (struct txn *) event;
2781
0
  struct txn_stmt *stmt = txn_current_stmt(txn);
2782
0
  struct tuple *new_tuple = stmt->new_tuple;
2783
2784
0
  if (recovery_state == INITIAL_RECOVERY) {
2785
    /* Space creation during initial recovery - nothing to do. */
2786
0
    return 0;
2787
0
  }
2788
2789
0
  struct tuple *any_tuple = new_tuple;
2790
0
  if (any_tuple == NULL)
2791
0
    any_tuple = stmt->old_tuple;
2792
0
  uint32_t space_id;
2793
0
  if (tuple_field_u32(any_tuple, BOX_TRUNCATE_FIELD_SPACE_ID,
2794
0
          &space_id) != 0)
2795
0
    return -1;
2796
0
  struct space *old_space = space_cache_find(space_id);
2797
0
  if (old_space == NULL)
2798
0
    return -1;
2799
0
  if (space_is_temporary(old_space))
2800
0
    txn_stmt_mark_as_temporary(txn, stmt);
2801
2802
0
  if (new_tuple == NULL) {
2803
    /* Space drop - nothing else to do. */
2804
0
    return 0;
2805
0
  }
2806
2807
  /*
2808
   * box_process1() bypasses the read-only check for the _truncate system
2809
   * space because there the space that is going to be truncated isn't yet
2810
   * known. Perform the check here if this statement was issued by this
2811
   * replica and the space isn't data-temporary or local.
2812
   */
2813
0
  bool is_temp = space_is_data_temporary(old_space) ||
2814
0
           space_is_local(old_space);
2815
0
  if (!is_temp && stmt->row->replica_id == 0 &&
2816
0
      box_check_writable() != 0)
2817
0
    return -1;
2818
2819
  /*
2820
   * Check if a write privilege was given, return an error if not.
2821
   * The check should precede initial recovery check to correctly
2822
   * handle direct insert into _truncate systable.
2823
   */
2824
0
  if (access_check_space(old_space, PRIV_W) != 0)
2825
0
    return -1;
2826
2827
  /*
2828
   * System spaces use triggers to keep records in sync
2829
   * with internal objects. Since space truncation doesn't
2830
   * invoke triggers, we don't permit it for system spaces.
2831
   */
2832
0
  if (space_is_system(old_space)) {
2833
0
    diag_set(ClientError, ER_TRUNCATE_SYSTEM_SPACE,
2834
0
        space_name(old_space));
2835
0
    return -1;
2836
0
  }
2837
2838
  /* Check space's holders. */
2839
0
  if (space_check_truncate(old_space) != 0)
2840
0
    return -1;
2841
2842
0
  struct alter_space *alter = alter_space_new(old_space);
2843
0
  if (alter == NULL)
2844
0
    return -1;
2845
0
  auto scoped_guard =
2846
0
    make_scoped_guard([=] { alter_space_delete(alter); });
2847
2848
  /*
2849
   * Modify the WAL header to prohibit
2850
   * replication of local & data-temporary
2851
   * spaces truncation
2852
   * unless it's a temporary space
2853
   * in which case the header doesn't exist.
2854
   */
2855
0
  if (is_temp && !space_is_temporary(old_space)) {
2856
0
    stmt->row->group_id = GROUP_LOCAL;
2857
    /*
2858
     * The trigger is invoked after txn->n_local_rows
2859
     * is counted, so don't forget to update it here.
2860
     */
2861
0
    ++txn->n_local_rows;
2862
0
  }
2863
2864
0
  try {
2865
    /*
2866
     * Recreate all indexes of the truncated space.
2867
     */
2868
0
    for (uint32_t i = 0; i < old_space->index_count; i++) {
2869
0
      struct index *old_index = old_space->index[i];
2870
0
      (void) new TruncateIndex(alter, old_index->def->iid);
2871
0
    }
2872
2873
0
    alter_space_do(stmt, alter);
2874
0
  } catch (Exception *e) {
2875
0
    return -1;
2876
0
  }
2877
0
  scoped_guard.is_active = false;
2878
0
  return 0;
2879
0
}
2880
2881
/* {{{ access control */
2882
2883
int
2884
user_has_data(struct user *user, bool *has_data)
2885
0
{
2886
0
  uint32_t uid = user->def->uid;
2887
0
  uint32_t spaces[] = { BOX_SPACE_ID, BOX_FUNC_ID, BOX_SEQUENCE_ID,
2888
0
            BOX_PRIV_ID, BOX_PRIV_ID };
2889
  /*
2890
   * owner index id #1 for _space and _func and _priv.
2891
   * For _priv also check that the user has no grants.
2892
   */
2893
0
  uint32_t indexes[] = { 1, 1, 1, 1, 0 };
2894
0
  uint32_t count = sizeof(spaces)/sizeof(*spaces);
2895
0
  bool out;
2896
0
  for (uint32_t i = 0; i < count; i++) {
2897
0
    if (space_has_data(spaces[i], indexes[i], uid, &out) != 0)
2898
0
      return -1;
2899
0
    if (out) {
2900
0
      *has_data = true;
2901
0
      return 0;
2902
0
    }
2903
0
  }
2904
0
  if (! user_map_is_empty(&user->users)) {
2905
0
    *has_data = true;
2906
0
    return 0;
2907
0
  }
2908
  /*
2909
   * If there was a role, the previous check would have
2910
   * returned true.
2911
   */
2912
0
  assert(user_map_is_empty(&user->roles));
2913
0
  *has_data = false;
2914
0
  return 0;
2915
0
}
2916
2917
/**
2918
 * Initialize the user authenticator from the _user space data.
2919
 */
2920
static int
2921
user_def_fill_auth_data(struct user_def *user, const char *auth_data)
2922
0
{
2923
0
  uint8_t type = mp_typeof(*auth_data);
2924
0
  if (type == MP_ARRAY || type == MP_NIL) {
2925
    /*
2926
     * Nothing useful.
2927
     * MP_ARRAY is a special case since Lua arrays are
2928
     * indistinguishable from tables, so an empty
2929
     * table may well be encoded as an msgpack array.
2930
     * Treat as no data.
2931
     */
2932
0
    return 0;
2933
0
  }
2934
0
  if (mp_typeof(*auth_data) != MP_MAP) {
2935
    /** Prevent users from making silly mistakes */
2936
0
    diag_set(ClientError, ER_CREATE_USER,
2937
0
        user->name, "invalid password format, "
2938
0
        "use box.schema.user.passwd() to reset password");
2939
0
    return -1;
2940
0
  }
2941
0
  uint32_t method_count = mp_decode_map(&auth_data);
2942
0
  for (uint32_t i = 0; i < method_count; i++) {
2943
0
    if (mp_typeof(*auth_data) != MP_STR) {
2944
0
      mp_next(&auth_data);
2945
0
      mp_next(&auth_data);
2946
0
      continue;
2947
0
    }
2948
0
    uint32_t method_name_len;
2949
0
    const char *method_name = mp_decode_str(&auth_data,
2950
0
              &method_name_len);
2951
0
    const char *auth_data_end = auth_data;
2952
0
    mp_next(&auth_data_end);
2953
0
    const struct auth_method *method = auth_method_by_name(
2954
0
            method_name, method_name_len);
2955
0
    if (method == NULL) {
2956
0
      auth_data = auth_data_end;
2957
0
      continue;
2958
0
    }
2959
0
    struct authenticator *auth = authenticator_new(
2960
0
        method, auth_data, auth_data_end);
2961
0
    if (auth == NULL)
2962
0
      return -1;
2963
    /* The guest user may only have an empty password. */
2964
0
    if (user->uid == GUEST &&
2965
0
        !authenticate_password(auth, "", 0)) {
2966
0
      authenticator_delete(auth);
2967
0
      diag_set(ClientError, ER_GUEST_USER_PASSWORD);
2968
0
      return -1;
2969
0
    }
2970
0
    user->auth = auth;
2971
0
    break;
2972
0
  }
2973
0
  return 0;
2974
0
}
2975
2976
static struct user_def *
2977
user_def_new_from_tuple(struct tuple *tuple)
2978
0
{
2979
0
  uint32_t name_len;
2980
0
  const char *name = tuple_field_str(tuple, BOX_USER_FIELD_NAME,
2981
0
             &name_len);
2982
0
  if (name == NULL)
2983
0
    return NULL;
2984
0
  if (name_len > BOX_NAME_MAX) {
2985
0
    diag_set(ClientError, ER_CREATE_USER,
2986
0
        tt_cstr(name, BOX_INVALID_NAME_MAX),
2987
0
        "user name is too long");
2988
0
    return NULL;
2989
0
  }
2990
0
  uint32_t uid;
2991
0
  if (tuple_field_u32(tuple, BOX_USER_FIELD_ID, &uid) != 0)
2992
0
    return NULL;
2993
0
  uint32_t owner;
2994
0
  if (tuple_field_u32(tuple, BOX_USER_FIELD_UID, &owner) != 0)
2995
0
    return NULL;
2996
0
  const char *type_str = tuple_field_cstr(tuple, BOX_USER_FIELD_TYPE);
2997
0
  if (type_str == NULL)
2998
0
    return NULL;
2999
0
  enum schema_object_type type = schema_object_type(type_str);
3000
0
  if (type != SC_ROLE && type != SC_USER) {
3001
0
    diag_set(ClientError, ER_CREATE_USER,
3002
0
       tt_cstr(name, name_len), "unknown user type");
3003
0
    return NULL;
3004
0
  }
3005
0
  if (identifier_check(name, name_len) != 0)
3006
0
    return NULL;
3007
0
  struct user_def *user = user_def_new(uid, owner, type, name, name_len);
3008
0
  auto def_guard = make_scoped_guard([=] { user_def_delete(user); });
3009
  /*
3010
   * AUTH_DATA field in _user space should contain
3011
   * chap-sha1 -> base64_encode(sha1(sha1(password), 0).
3012
   * Check for trivial errors when a plain text
3013
   * password is saved in this field instead.
3014
   */
3015
0
  if (tuple_field_count(tuple) > BOX_USER_FIELD_AUTH) {
3016
0
    const char *auth_data = tuple_field(tuple, BOX_USER_FIELD_AUTH);
3017
0
    const char *tmp = auth_data;
3018
0
    bool is_auth_empty;
3019
0
    if (mp_typeof(*auth_data) == MP_ARRAY &&
3020
0
        mp_decode_array(&tmp) == 0) {
3021
0
      is_auth_empty = true;
3022
0
    } else if (mp_typeof(*auth_data) == MP_MAP &&
3023
0
         mp_decode_map(&tmp) == 0) {
3024
0
      is_auth_empty = true;
3025
0
    } else {
3026
0
      is_auth_empty = false;
3027
0
    }
3028
0
    if (!is_auth_empty && user->type == SC_ROLE) {
3029
0
      diag_set(ClientError, ER_CREATE_ROLE, user->name,
3030
0
          "authentication data can not be set for a "\
3031
0
          "role");
3032
0
      return NULL;
3033
0
    }
3034
0
    if (user_def_fill_auth_data(user, auth_data) != 0)
3035
0
      return NULL;
3036
0
  }
3037
0
  if (tuple_field_count(tuple) > BOX_USER_FIELD_LAST_MODIFIED &&
3038
0
      tuple_field_u64(tuple, BOX_USER_FIELD_LAST_MODIFIED,
3039
0
          &user->last_modified) != 0)
3040
0
    return NULL;
3041
0
  def_guard.is_active = false;
3042
0
  return user;
3043
0
}
3044
3045
static int
3046
user_cache_remove_user(struct trigger *trigger, void * /* event */)
3047
0
{
3048
0
  struct tuple *tuple = (struct tuple *)trigger->data;
3049
0
  uint32_t uid;
3050
0
  if (tuple_field_u32(tuple, BOX_USER_FIELD_ID, &uid) != 0)
3051
0
    return -1;
3052
0
  user_cache_delete(uid);
3053
0
  return 0;
3054
0
}
3055
3056
static int
3057
user_cache_alter_user(struct trigger *trigger, void * /* event */)
3058
0
{
3059
0
  struct tuple *tuple = (struct tuple *)trigger->data;
3060
0
  struct user_def *user = user_def_new_from_tuple(tuple);
3061
0
  if (user == NULL)
3062
0
    return -1;
3063
0
  auto def_guard = make_scoped_guard([=] { user_def_delete(user); });
3064
  /* Can throw if, e.g. too many users. */
3065
0
  try {
3066
0
    user_cache_replace(user);
3067
0
  } catch (Exception *e) {
3068
0
    return -1;
3069
0
  }
3070
0
  def_guard.is_active = false;
3071
0
  return 0;
3072
0
}
3073
3074
/**
3075
 * A trigger invoked on replace in the user table.
3076
 */
3077
static int
3078
on_replace_dd_user(struct trigger * /* trigger */, void *event)
3079
0
{
3080
0
  struct txn *txn = (struct txn *) event;
3081
0
  struct txn_stmt *stmt = txn_current_stmt(txn);
3082
0
  struct tuple *old_tuple = stmt->old_tuple;
3083
0
  struct tuple *new_tuple = stmt->new_tuple;
3084
3085
0
  uint32_t uid;
3086
0
  if (tuple_field_u32(old_tuple ? old_tuple : new_tuple,
3087
0
          BOX_USER_FIELD_ID, &uid) != 0)
3088
0
    return -1;
3089
0
  struct user *old_user = user_by_id(uid);
3090
0
  if (new_tuple != NULL && old_user == NULL) { /* INSERT */
3091
0
    struct user_def *user = user_def_new_from_tuple(new_tuple);
3092
0
    if (user == NULL)
3093
0
      return -1;
3094
0
    auto def_guard = make_scoped_guard([=] {
3095
0
      user_def_delete(user);
3096
0
    });
3097
0
    if (access_check_ddl(user->name, user->owner, NULL,
3098
0
             user->type, PRIV_C) != 0)
3099
0
      return -1;
3100
0
    try {
3101
0
      (void) user_cache_replace(user);
3102
0
    } catch (Exception *e) {
3103
0
      return -1;
3104
0
    }
3105
0
    def_guard.is_active = false;
3106
0
    struct trigger *on_rollback =
3107
0
      txn_alter_trigger_new(user_cache_remove_user, new_tuple);
3108
0
    if (on_rollback == NULL)
3109
0
      return -1;
3110
0
    txn_stmt_on_rollback(stmt, on_rollback);
3111
0
  } else if (new_tuple == NULL) { /* DELETE */
3112
0
    if (access_check_ddl(old_user->def->name, old_user->def->owner,
3113
0
             old_user->access, old_user->def->type,
3114
0
             PRIV_D) != 0)
3115
0
      return -1;
3116
    /* Can't drop guest or super user */
3117
0
    if (uid <= (uint32_t) BOX_SYSTEM_USER_ID_MAX || uid == SUPER) {
3118
0
      diag_set(ClientError, ER_DROP_USER,
3119
0
          old_user->def->name,
3120
0
          "the user or the role is a system");
3121
0
      return -1;
3122
0
    }
3123
    /*
3124
     * Can only delete user if it has no spaces,
3125
     * no functions and no grants.
3126
     */
3127
0
    bool has_data;
3128
0
    if (user_has_data(old_user, &has_data) != 0) {
3129
0
      return -1;
3130
0
    }
3131
0
    if (has_data) {
3132
0
      diag_set(ClientError, ER_DROP_USER,
3133
0
          old_user->def->name, "the user has objects");
3134
0
      return -1;
3135
0
    }
3136
0
    user_cache_delete(uid);
3137
0
    struct trigger *on_rollback =
3138
0
      txn_alter_trigger_new(user_cache_alter_user, old_tuple);
3139
0
    if (on_rollback == NULL)
3140
0
      return -1;
3141
0
    txn_stmt_on_rollback(stmt, on_rollback);
3142
0
  } else { /* UPDATE, REPLACE */
3143
0
    assert(old_user != NULL && new_tuple != NULL);
3144
    /*
3145
     * Allow change of user properties (name,
3146
     * password) but first check that the change is
3147
     * correct.
3148
     */
3149
0
    struct user_def *user = user_def_new_from_tuple(new_tuple);
3150
0
    if (user == NULL)
3151
0
      return -1;
3152
0
    auto def_guard = make_scoped_guard([=] {
3153
0
      user_def_delete(user);
3154
0
    });
3155
0
    if (access_check_ddl(user->name, user->uid, old_user->access,
3156
0
             old_user->def->type, PRIV_A) != 0)
3157
0
      return -1;
3158
0
    try {
3159
0
      user_cache_replace(user);
3160
0
    } catch (Exception *e) {
3161
0
      return -1;
3162
0
    }
3163
0
    def_guard.is_active = false;
3164
0
    struct trigger *on_rollback =
3165
0
      txn_alter_trigger_new(user_cache_alter_user, old_tuple);
3166
0
    if (on_rollback == NULL)
3167
0
      return -1;
3168
0
    txn_stmt_on_rollback(stmt, on_rollback);
3169
0
  }
3170
0
  return 0;
3171
0
}
3172
3173
/**
3174
 * Get function identifiers from a tuple.
3175
 *
3176
 * @param tuple Tuple to get ids from.
3177
 * @param[out] fid Function identifier.
3178
 * @param[out] uid Owner identifier.
3179
 */
3180
static inline int
3181
func_def_get_ids_from_tuple(struct tuple *tuple, uint32_t *fid, uint32_t *uid)
3182
0
{
3183
0
  if (tuple_field_u32(tuple, BOX_FUNC_FIELD_ID, fid) != 0)
3184
0
    return -1;
3185
0
  return tuple_field_u32(tuple, BOX_FUNC_FIELD_UID, uid);
3186
0
}
3187
3188
/** Create a function definition from tuple. */
3189
static struct func_def *
3190
func_def_new_from_tuple(struct tuple *tuple)
3191
0
{
3192
0
  uint32_t field_count = tuple_field_count(tuple);
3193
0
  uint32_t name_len;
3194
0
  const char *name = tuple_field_str(tuple, BOX_FUNC_FIELD_NAME,
3195
0
             &name_len);
3196
0
  if (name == NULL)
3197
0
    return NULL;
3198
0
  if (name_len > BOX_NAME_MAX) {
3199
0
    diag_set(ClientError, ER_CREATE_FUNCTION,
3200
0
        tt_cstr(name, BOX_INVALID_NAME_MAX),
3201
0
        "function name is too long");
3202
0
    return NULL;
3203
0
  }
3204
0
  if (identifier_check(name, name_len) != 0)
3205
0
    return NULL;
3206
0
  enum func_language language = FUNC_LANGUAGE_LUA;
3207
0
  if (field_count > BOX_FUNC_FIELD_LANGUAGE) {
3208
0
    const char *language_str =
3209
0
      tuple_field_cstr(tuple, BOX_FUNC_FIELD_LANGUAGE);
3210
0
    if (language_str == NULL)
3211
0
      return NULL;
3212
0
    language = STR2ENUM(func_language, language_str);
3213
    /*
3214
     * 'SQL_BUILTIN' was dropped in 2.9, but to support upgrade
3215
     * from previous versions, we allow to create such functions.
3216
     */
3217
0
    if (language == func_language_MAX ||
3218
0
        language == FUNC_LANGUAGE_SQL ||
3219
0
        (language == FUNC_LANGUAGE_SQL_BUILTIN &&
3220
0
         !dd_check_is_disabled())) {
3221
0
      diag_set(ClientError, ER_FUNCTION_LANGUAGE,
3222
0
         language_str, tt_cstr(name, name_len));
3223
0
      return NULL;
3224
0
    }
3225
0
  }
3226
0
  uint32_t body_len = 0;
3227
0
  const char *body = NULL;
3228
0
  uint32_t comment_len = 0;
3229
0
  const char *comment = NULL;
3230
0
  if (field_count > BOX_FUNC_FIELD_BODY) {
3231
0
    body = tuple_field_str(tuple, BOX_FUNC_FIELD_BODY, &body_len);
3232
0
    if (body == NULL)
3233
0
      return NULL;
3234
0
    comment = tuple_field_str(tuple, BOX_FUNC_FIELD_COMMENT,
3235
0
            &comment_len);
3236
0
    if (comment == NULL)
3237
0
      return NULL;
3238
0
    uint32_t len;
3239
0
    const char *routine_type = tuple_field_str(tuple,
3240
0
          BOX_FUNC_FIELD_ROUTINE_TYPE, &len);
3241
0
    if (routine_type == NULL)
3242
0
      return NULL;
3243
0
    if (len != strlen("function") ||
3244
0
        strncasecmp(routine_type, "function", len) != 0) {
3245
0
      diag_set(ClientError, ER_CREATE_FUNCTION, name,
3246
0
          "unsupported routine_type value");
3247
0
      return NULL;
3248
0
    }
3249
0
    const char *sql_data_access = tuple_field_str(tuple,
3250
0
          BOX_FUNC_FIELD_SQL_DATA_ACCESS, &len);
3251
0
    if (sql_data_access == NULL)
3252
0
      return NULL;
3253
0
    if (len != strlen("none") ||
3254
0
        strncasecmp(sql_data_access, "none", len) != 0) {
3255
0
      diag_set(ClientError, ER_CREATE_FUNCTION, name,
3256
0
          "unsupported sql_data_access value");
3257
0
      return NULL;
3258
0
    }
3259
0
    bool is_null_call;
3260
0
    if (tuple_field_bool(tuple, BOX_FUNC_FIELD_IS_NULL_CALL,
3261
0
             &is_null_call) != 0)
3262
0
      return NULL;
3263
0
    if (is_null_call != true) {
3264
0
      diag_set(ClientError, ER_CREATE_FUNCTION, name,
3265
0
          "unsupported is_null_call value");
3266
0
      return NULL;
3267
0
    }
3268
0
  }
3269
0
  uint32_t fid, uid;
3270
0
  if (func_def_get_ids_from_tuple(tuple, &fid, &uid) != 0)
3271
0
    return NULL;
3272
0
  if (fid > BOX_FUNCTION_MAX) {
3273
0
    diag_set(ClientError, ER_CREATE_FUNCTION,
3274
0
        tt_cstr(name, name_len), "function id is too big");
3275
0
    return NULL;
3276
0
  }
3277
0
  const char *triggers = NULL;
3278
0
  if (field_count > BOX_FUNC_FIELD_TRIGGER) {
3279
0
    triggers = tuple_field_with_type(tuple, BOX_FUNC_FIELD_TRIGGER,
3280
0
             MP_ARRAY);
3281
0
    if (triggers == NULL)
3282
0
      return NULL;
3283
0
    const char *triggers_cursor = triggers;
3284
0
    uint32_t trigger_count = mp_decode_array(&triggers_cursor);
3285
0
    for (uint32_t i = 0; i < trigger_count; i++) {
3286
0
      enum mp_type actual_type = mp_typeof(*triggers_cursor);
3287
0
      if (actual_type != MP_STR) {
3288
0
        diag_set(ClientError, ER_CREATE_FUNCTION,
3289
0
           name, "trigger name must be a string");
3290
0
        return NULL;
3291
0
      }
3292
0
      mp_next(&triggers_cursor);
3293
0
    };
3294
    /** Do not set the field if the array is empty. */
3295
0
    if (trigger_count == 0)
3296
0
      triggers = NULL;
3297
0
  }
3298
3299
0
  if (triggers != NULL &&
3300
0
      schema_check_feature(SCHEMA_FEATURE_PERSISTENT_TRIGGERS) != 0)
3301
0
    return NULL;
3302
3303
0
  struct func_def *def = func_def_new(fid, uid, name, name_len,
3304
0
              language, body, body_len,
3305
0
              comment, comment_len, triggers);
3306
0
  auto def_guard = make_scoped_guard([=] { func_def_delete(def); });
3307
0
  if (field_count > BOX_FUNC_FIELD_SETUID) {
3308
0
    uint32_t out;
3309
0
    if (tuple_field_u32(tuple, BOX_FUNC_FIELD_SETUID, &out) != 0)
3310
0
      return NULL;
3311
0
    def->setuid = out;
3312
0
  }
3313
0
  if (field_count > BOX_FUNC_FIELD_BODY) {
3314
0
    if (tuple_field_bool(tuple, BOX_FUNC_FIELD_IS_DETERMINISTIC,
3315
0
             &(def->is_deterministic)) != 0)
3316
0
      return NULL;
3317
0
    if (tuple_field_bool(tuple, BOX_FUNC_FIELD_IS_SANDBOXED,
3318
0
             &(def->is_sandboxed)) != 0)
3319
0
      return NULL;
3320
0
    const char *returns =
3321
0
      tuple_field_cstr(tuple, BOX_FUNC_FIELD_RETURNS);
3322
0
    if (returns == NULL)
3323
0
      return NULL;
3324
0
    def->returns = STR2ENUM(field_type, returns);
3325
0
    if (def->returns == field_type_MAX) {
3326
0
      diag_set(ClientError, ER_CREATE_FUNCTION,
3327
0
          def->name, "invalid returns value");
3328
0
      return NULL;
3329
0
    }
3330
0
    def->exports.all = 0;
3331
0
    const char *exports = tuple_field_with_type(tuple,
3332
0
      BOX_FUNC_FIELD_EXPORTS, MP_ARRAY);
3333
0
    if (exports == NULL)
3334
0
      return NULL;
3335
0
    uint32_t cnt = mp_decode_array(&exports);
3336
0
    for (uint32_t i = 0; i < cnt; i++) {
3337
0
      enum mp_type actual_type = mp_typeof(*exports);
3338
0
      if (actual_type != MP_STR) {
3339
0
        diag_set(ClientError, ER_FIELD_TYPE,
3340
0
           int2str(BOX_FUNC_FIELD_EXPORTS + 1),
3341
0
           mp_type_strs[MP_STR], mp_type_strs[actual_type]);
3342
0
        return NULL;
3343
0
      }
3344
0
      uint32_t len;
3345
0
      const char *str = mp_decode_str(&exports, &len);
3346
0
      switch (STRN2ENUM(func_language, str, len)) {
3347
0
      case FUNC_LANGUAGE_LUA:
3348
0
        def->exports.lua = true;
3349
0
        break;
3350
0
      case FUNC_LANGUAGE_SQL:
3351
0
        def->exports.sql = true;
3352
0
        break;
3353
0
      default:
3354
0
        diag_set(ClientError, ER_CREATE_FUNCTION,
3355
0
            def->name, "invalid exports value");
3356
0
        return NULL;
3357
0
      }
3358
0
    }
3359
0
    const char *aggregate =
3360
0
      tuple_field_cstr(tuple, BOX_FUNC_FIELD_AGGREGATE);
3361
0
    if (aggregate == NULL)
3362
0
      return NULL;
3363
0
    def->aggregate = STR2ENUM(func_aggregate, aggregate);
3364
0
    if (def->aggregate == func_aggregate_MAX) {
3365
0
      diag_set(ClientError, ER_CREATE_FUNCTION,
3366
0
          def->name, "invalid aggregate value");
3367
0
      return NULL;
3368
0
    }
3369
0
    if (def->aggregate == FUNC_AGGREGATE_GROUP &&
3370
0
        def->exports.lua) {
3371
0
      diag_set(ClientError, ER_CREATE_FUNCTION, def->name,
3372
0
         "aggregate function can only be accessed in "
3373
0
         "SQL");
3374
0
      return NULL;
3375
0
    }
3376
0
    const char *param_list = tuple_field_with_type(tuple,
3377
0
      BOX_FUNC_FIELD_PARAM_LIST, MP_ARRAY);
3378
0
    if (param_list == NULL)
3379
0
      return NULL;
3380
0
    uint32_t argc = mp_decode_array(&param_list);
3381
0
    for (uint32_t i = 0; i < argc; i++) {
3382
0
      enum mp_type actual_type = mp_typeof(*param_list);
3383
0
      if (actual_type != MP_STR) {
3384
0
        diag_set(ClientError, ER_CREATE_FUNCTION,
3385
0
           def->name,
3386
0
           "parameter type must be a string");
3387
0
        return NULL;
3388
0
      }
3389
0
      uint32_t len;
3390
0
      const char *str = mp_decode_str(&param_list, &len);
3391
0
      if (STRN2ENUM(field_type, str, len) == field_type_MAX) {
3392
0
        diag_set(ClientError, ER_CREATE_FUNCTION,
3393
0
            def->name, "invalid argument type");
3394
0
        return NULL;
3395
0
      }
3396
0
    }
3397
0
    if (def->aggregate == FUNC_AGGREGATE_GROUP && argc == 0) {
3398
0
      diag_set(ClientError, ER_CREATE_FUNCTION, def->name,
3399
0
         "aggregate function must have at least one "
3400
0
         "argument");
3401
0
      return NULL;
3402
0
    }
3403
0
    def->param_count = argc;
3404
0
    const char *opts = tuple_field(tuple, BOX_FUNC_FIELD_OPTS);
3405
0
    if (opts_decode(&def->opts, func_opts_reg, &opts, NULL) != 0) {
3406
0
      diag_set(ClientError, ER_WRONG_FUNCTION_OPTIONS,
3407
0
         diag_last_error(diag_get())->errmsg);
3408
0
      return NULL;
3409
0
    }
3410
0
  } else {
3411
    /* By default export to Lua, but not other frontends. */
3412
0
    def->exports.lua = true;
3413
0
  }
3414
0
  if (func_def_check(def) != 0)
3415
0
    return NULL;
3416
0
  def_guard.is_active = false;
3417
0
  return def;
3418
0
}
3419
3420
/**
3421
 * Depending on @a set value, creates and sets or deletes triggers
3422
 * in the events listed by triggers option.
3423
 */
3424
static void
3425
func_alter_triggers(struct func *func, bool set)
3426
0
{
3427
0
  struct func_def *def = func->def;
3428
0
  const char *triggers = def->triggers;
3429
0
  if (triggers == NULL)
3430
0
    return;
3431
0
  const enum func_holder_type holder_type = FUNC_HOLDER_TRIGGER;
3432
0
  uint32_t trigger_count = mp_decode_array(&triggers);
3433
0
  for (uint32_t i = 0; i < trigger_count; i++) {
3434
0
    assert(mp_typeof(*triggers) == MP_STR);
3435
0
    uint32_t len;
3436
0
    const char *event_name = mp_decode_str(&triggers, &len);
3437
0
    const char *event_name_cstr = tt_cstr(event_name, len);
3438
0
    struct event *event = event_get(event_name_cstr, true);
3439
0
    const char *trg_name = tt_cstr(def->name, def->name_len);
3440
0
    struct func_adapter *trg = NULL;
3441
0
    if (set)
3442
0
      trg = func_adapter_func_create(func, holder_type);
3443
0
    event_reset_trigger(event, trg_name, trg);
3444
0
  };
3445
0
}
3446
3447
static int
3448
on_create_func_rollback(struct trigger *trigger, void * /* event */)
3449
0
{
3450
  /* Remove the new function from the cache and delete it. */
3451
0
  struct func *func = (struct func *)trigger->data;
3452
0
  func_alter_triggers(func, false);
3453
0
  func_cache_delete(func->def->fid);
3454
0
  if (trigger_run(&on_alter_func, func) != 0)
3455
0
    return -1;
3456
0
  func_delete(func);
3457
0
  return 0;
3458
0
}
3459
3460
static int
3461
on_drop_func_commit(struct trigger *trigger, void * /* event */)
3462
0
{
3463
  /* Delete the old function. */
3464
0
  struct func *func = (struct func *)trigger->data;
3465
0
  func_delete(func);
3466
0
  return 0;
3467
0
}
3468
3469
static int
3470
on_drop_func_rollback(struct trigger *trigger, void * /* event */)
3471
0
{
3472
0
  struct func *func = (struct func *)trigger->data;
3473
  /* Insert the old function back into the cache if it was removed. */
3474
0
  if (func_by_id(func->def->fid) == NULL)
3475
0
    func_cache_insert(func);
3476
0
  func_alter_triggers(func, true);
3477
0
  if (trigger_run(&on_alter_func, func) != 0)
3478
0
    return -1;
3479
0
  return 0;
3480
0
}
3481
3482
/**
3483
 * A trigger invoked on replace in a space containing
3484
 * functions on which there were defined any grants.
3485
 */
3486
static int
3487
on_replace_dd_func(struct trigger * /* trigger */, void *event)
3488
0
{
3489
0
  struct txn *txn = (struct txn *) event;
3490
0
  struct txn_stmt *stmt = txn_current_stmt(txn);
3491
0
  struct tuple *old_tuple = stmt->old_tuple;
3492
0
  struct tuple *new_tuple = stmt->new_tuple;
3493
3494
0
  uint32_t fid;
3495
0
  if (tuple_field_u32(old_tuple ? old_tuple : new_tuple,
3496
0
          BOX_FUNC_FIELD_ID, &fid) != 0)
3497
0
    return -1;
3498
0
  struct func *old_func = func_by_id(fid);
3499
0
  if (new_tuple != NULL && old_func == NULL) { /* INSERT */
3500
0
    struct func_def *def = func_def_new_from_tuple(new_tuple);
3501
0
    if (def == NULL)
3502
0
      return -1;
3503
0
    auto def_guard = make_scoped_guard([=] {
3504
0
      func_def_delete(def);
3505
0
    });
3506
0
    if (access_check_ddl(def->name, def->uid, NULL,
3507
0
             SC_FUNCTION, PRIV_C) != 0)
3508
0
      return -1;
3509
0
    struct trigger *on_rollback =
3510
0
      txn_alter_trigger_new(on_create_func_rollback, NULL);
3511
0
    if (on_rollback == NULL)
3512
0
      return -1;
3513
0
    struct func *func = func_new(def);
3514
0
    if (func == NULL)
3515
0
      return -1;
3516
0
    func_cache_insert(func);
3517
0
    func_alter_triggers(func, true);
3518
0
    on_rollback->data = func;
3519
0
    txn_stmt_on_rollback(stmt, on_rollback);
3520
0
    if (trigger_run(&on_alter_func, func) != 0)
3521
0
      return -1;
3522
0
  } else if (new_tuple == NULL) {         /* DELETE */
3523
0
    uint32_t uid;
3524
0
    if (func_def_get_ids_from_tuple(old_tuple, &fid, &uid) != 0)
3525
0
      return -1;
3526
    /*
3527
     * Can only delete func if you're the one
3528
     * who created it or a superuser.
3529
     */
3530
0
    if (access_check_ddl(old_func->def->name, uid, old_func->access,
3531
0
             SC_FUNCTION, PRIV_D) != 0)
3532
0
      return -1;
3533
    /* Can only delete func if it has no grants. */
3534
0
    bool out;
3535
0
    if (schema_find_grants("function", old_func->def->fid, &out) != 0) {
3536
0
      return -1;
3537
0
    }
3538
0
    if (out) {
3539
0
      diag_set(ClientError, ER_DROP_FUNCTION,
3540
0
         (unsigned)old_func->def->fid,
3541
0
         "function has grants");
3542
0
      return -1;
3543
0
    }
3544
0
    if (space_has_data(BOX_FUNC_INDEX_ID, 1, old_func->def->fid, &out) != 0)
3545
0
      return -1;
3546
0
    if (old_func != NULL && out) {
3547
0
      diag_set(ClientError, ER_DROP_FUNCTION,
3548
0
         (unsigned)old_func->def->fid,
3549
0
         "function has references");
3550
0
      return -1;
3551
0
    }
3552
3553
0
    struct trigger *on_commit =
3554
0
      txn_alter_trigger_new(on_drop_func_commit, old_func);
3555
0
    struct trigger *on_rollback =
3556
0
      txn_alter_trigger_new(on_drop_func_rollback, old_func);
3557
0
    if (on_commit == NULL || on_rollback == NULL)
3558
0
      return -1;
3559
0
    txn_stmt_on_commit(stmt, on_commit);
3560
0
    txn_stmt_on_rollback(stmt, on_rollback);
3561
3562
    /* Triggers pin the function - drop them before pin check. */
3563
0
    func_alter_triggers(old_func, false);
3564
3565
    /* Check whether old_func is used somewhere. */
3566
0
    enum func_holder_type pinned_type;
3567
0
    if (func_is_pinned(old_func, &pinned_type)) {
3568
0
      const char *type_str =
3569
0
        func_cache_holder_type_strs[pinned_type];
3570
0
      diag_set(ClientError, ER_DROP_FUNCTION,
3571
0
         (unsigned)old_func->def->fid,
3572
0
         tt_sprintf("function is referenced by %s",
3573
0
              type_str));
3574
0
      return -1;
3575
0
    }
3576
0
    func_cache_delete(old_func->def->fid);
3577
0
    if (trigger_run(&on_alter_func, old_func) != 0)
3578
0
      return -1;
3579
0
  } else {                                /* UPDATE, REPLACE */
3580
0
    assert(new_tuple != NULL && old_tuple != NULL);
3581
    /**
3582
     * Allow an alter that doesn't change the
3583
     * definition to support upgrade script.
3584
     */
3585
0
    struct func_def *old_def = NULL, *new_def = NULL;
3586
0
    auto guard = make_scoped_guard([&old_def, &new_def] {
3587
0
      free(old_def);
3588
0
      free(new_def);
3589
0
    });
3590
0
    old_def = func_def_new_from_tuple(old_tuple);
3591
0
    new_def = func_def_new_from_tuple(new_tuple);
3592
0
    if (old_def == NULL || new_def == NULL)
3593
0
      return -1;
3594
0
    if (func_def_cmp(new_def, old_def) != 0) {
3595
0
      diag_set(ClientError, ER_UNSUPPORTED, "function",
3596
0
          "alter");
3597
0
      return -1;
3598
0
    }
3599
0
  }
3600
0
  return 0;
3601
0
}
3602
3603
/** Create a collation identifier definition from tuple. */
3604
int
3605
coll_id_def_new_from_tuple(struct tuple *tuple, struct coll_id_def *def)
3606
0
{
3607
0
  memset(def, 0, sizeof(*def));
3608
0
  uint32_t name_len, locale_len, type_len;
3609
0
  if (tuple_field_u32(tuple, BOX_COLLATION_FIELD_ID, &(def->id)) != 0)
3610
0
    return -1;
3611
0
  def->name = tuple_field_str(tuple, BOX_COLLATION_FIELD_NAME, &name_len);
3612
0
  if (def->name == NULL)
3613
0
    return -1;
3614
0
  def->name_len = name_len;
3615
0
  if (name_len > BOX_NAME_MAX) {
3616
0
    diag_set(ClientError, ER_CANT_CREATE_COLLATION,
3617
0
        "collation name is too long");
3618
0
    return -1;
3619
0
  }
3620
0
  if (identifier_check(def->name, name_len) != 0)
3621
0
    return -1;
3622
0
  if (tuple_field_u32(tuple, BOX_COLLATION_FIELD_UID, &(def->owner_id)) != 0)
3623
0
    return -1;
3624
0
  const char *type = tuple_field_str(tuple, BOX_COLLATION_FIELD_TYPE,
3625
0
             &type_len);
3626
0
  if (type == NULL)
3627
0
    return -1;
3628
0
  struct coll_def *base = &def->base;
3629
0
  base->type = STRN2ENUM(coll_type, type, type_len);
3630
0
  if (base->type == coll_type_MAX) {
3631
0
    diag_set(ClientError, ER_CANT_CREATE_COLLATION,
3632
0
        "unknown collation type");
3633
0
    return -1;
3634
0
  }
3635
0
  const char *locale = tuple_field_str(tuple, BOX_COLLATION_FIELD_LOCALE,
3636
0
               &locale_len);
3637
0
  if (locale == NULL)
3638
0
    return -1;
3639
0
  if (locale_len > COLL_LOCALE_LEN_MAX) {
3640
0
    diag_set(ClientError, ER_CANT_CREATE_COLLATION,
3641
0
        "collation locale is too long");
3642
0
    return -1;
3643
0
  }
3644
0
  if (locale_len > 0)
3645
0
    if (identifier_check(locale, locale_len) != 0)
3646
0
      return -1;
3647
0
  snprintf(base->locale, sizeof(base->locale), "%.*s", locale_len,
3648
0
     locale);
3649
0
  const char *options = tuple_field_with_type(tuple,
3650
0
          BOX_COLLATION_FIELD_OPTIONS, MP_MAP);
3651
0
  if (options == NULL)
3652
0
    return -1;
3653
0
  if (opts_decode(&base->icu, coll_icu_opts_reg, &options, NULL) != 0) {
3654
0
    diag_set(ClientError, ER_WRONG_COLLATION_OPTIONS,
3655
0
       diag_last_error(diag_get())->errmsg);
3656
0
    return -1;
3657
0
  }
3658
3659
0
  if (base->icu.french_collation == coll_icu_on_off_MAX) {
3660
0
    diag_set(ClientError, ER_CANT_CREATE_COLLATION,
3661
0
        "ICU wrong french_collation option setting, "
3662
0
          "expected ON | OFF");
3663
0
    return -1;
3664
0
  }
3665
3666
0
  if (base->icu.alternate_handling == coll_icu_alternate_handling_MAX) {
3667
0
    diag_set(ClientError, ER_CANT_CREATE_COLLATION,
3668
0
        "ICU wrong alternate_handling option setting, "
3669
0
          "expected NON_IGNORABLE | SHIFTED");
3670
0
    return -1;
3671
0
  }
3672
3673
0
  if (base->icu.case_first == coll_icu_case_first_MAX) {
3674
0
    diag_set(ClientError, ER_CANT_CREATE_COLLATION,
3675
0
        "ICU wrong case_first option setting, "
3676
0
          "expected OFF | UPPER_FIRST | LOWER_FIRST");
3677
0
    return -1;
3678
0
  }
3679
3680
0
  if (base->icu.case_level == coll_icu_on_off_MAX) {
3681
0
    diag_set(ClientError, ER_CANT_CREATE_COLLATION,
3682
0
        "ICU wrong case_level option setting, "
3683
0
          "expected ON | OFF");
3684
0
    return -1;
3685
0
  }
3686
3687
0
  if (base->icu.normalization_mode == coll_icu_on_off_MAX) {
3688
0
    diag_set(ClientError, ER_CANT_CREATE_COLLATION,
3689
0
        "ICU wrong normalization_mode option setting, "
3690
0
          "expected ON | OFF");
3691
0
    return -1;
3692
0
  }
3693
3694
0
  if (base->icu.strength == coll_icu_strength_MAX) {
3695
0
    diag_set(ClientError, ER_CANT_CREATE_COLLATION,
3696
0
        "ICU wrong strength option setting, "
3697
0
          "expected PRIMARY | SECONDARY | "
3698
0
          "TERTIARY | QUATERNARY | IDENTICAL");
3699
0
    return -1;
3700
0
  }
3701
3702
0
  if (base->icu.numeric_collation == coll_icu_on_off_MAX) {
3703
0
    diag_set(ClientError, ER_CANT_CREATE_COLLATION,
3704
0
        "ICU wrong numeric_collation option setting, "
3705
0
          "expected ON | OFF");
3706
0
    return -1;
3707
0
  }
3708
0
  return 0;
3709
0
}
3710
3711
/** Delete the new collation identifier. */
3712
static int
3713
on_create_collation_rollback(struct trigger *trigger, void *event)
3714
0
{
3715
0
  (void) event;
3716
0
  struct coll_id *coll_id = (struct coll_id *) trigger->data;
3717
0
  coll_id_cache_delete(coll_id);
3718
0
  coll_id_delete(coll_id);
3719
0
  return 0;
3720
0
}
3721
3722
3723
/** Free a deleted collation identifier on commit. */
3724
static int
3725
on_drop_collation_commit(struct trigger *trigger, void *event)
3726
0
{
3727
0
  (void) event;
3728
0
  struct coll_id *coll_id = (struct coll_id *) trigger->data;
3729
0
  coll_id_delete(coll_id);
3730
0
  return 0;
3731
0
}
3732
3733
/** Put the collation identifier back on rollback. */
3734
static int
3735
on_drop_collation_rollback(struct trigger *trigger, void *event)
3736
0
{
3737
0
  (void) event;
3738
0
  struct coll_id *coll_id = (struct coll_id *) trigger->data;
3739
0
  struct coll_id *replaced_id;
3740
0
  if (coll_id_cache_replace(coll_id, &replaced_id) != 0)
3741
0
    panic("Out of memory on insertion into collation cache");
3742
0
  assert(replaced_id == NULL);
3743
0
  return 0;
3744
0
}
3745
3746
/**
3747
 * A trigger invoked on replace in a space containing
3748
 * collations that a user defined.
3749
 */
3750
static int
3751
on_replace_dd_collation(struct trigger * /* trigger */, void *event)
3752
0
{
3753
0
  struct txn *txn = (struct txn *) event;
3754
0
  struct txn_stmt *stmt = txn_current_stmt(txn);
3755
0
  struct tuple *old_tuple = stmt->old_tuple;
3756
0
  struct tuple *new_tuple = stmt->new_tuple;
3757
0
  if (new_tuple == NULL && old_tuple != NULL) {
3758
    /* DELETE */
3759
0
    struct trigger *on_commit =
3760
0
      txn_alter_trigger_new(on_drop_collation_commit, NULL);
3761
0
    struct trigger *on_rollback =
3762
0
      txn_alter_trigger_new(on_drop_collation_rollback, NULL);
3763
0
    if (on_commit == NULL || on_rollback == NULL)
3764
0
      return -1;
3765
0
    uint32_t out;
3766
0
    if (tuple_field_u32(old_tuple, BOX_COLLATION_FIELD_ID, &out) != 0)
3767
0
      return -1;
3768
0
    int32_t old_id = out;
3769
    /*
3770
     * Don't allow user to drop "none" collation
3771
     * since it is very special and vastly used
3772
     * under the hood. Hence, we can rely on the
3773
     * fact that "none" collation features id == 0.
3774
     */
3775
0
    if (old_id == COLL_NONE) {
3776
0
      diag_set(ClientError, ER_DROP_COLLATION, "none",
3777
0
          "system collation");
3778
0
      return -1;
3779
0
    }
3780
0
    struct coll_id *old_coll_id = coll_by_id(old_id);
3781
0
    assert(old_coll_id != NULL);
3782
0
    if (access_check_ddl(old_coll_id->name, old_coll_id->owner_id,
3783
0
             NULL, SC_COLLATION, PRIV_D) != 0)
3784
0
      return -1;
3785
    /*
3786
     * Don't allow user to drop a collation identifier that is
3787
     * currently used.
3788
     */
3789
0
    enum coll_id_holder_type pinned_type;
3790
0
    if (coll_id_is_pinned(old_coll_id, &pinned_type)) {
3791
0
      const char *type_str =
3792
0
        coll_id_holder_type_strs[pinned_type];
3793
0
      diag_set(ClientError, ER_DROP_COLLATION,
3794
0
         old_coll_id->name,
3795
0
         tt_sprintf("collation is referenced by %s",
3796
0
              type_str));
3797
0
      return -1;
3798
0
    }
3799
    /*
3800
     * Set on_commit/on_rollback triggers after
3801
     * deletion from the cache to make trigger logic
3802
     * simple.
3803
     */
3804
0
    coll_id_cache_delete(old_coll_id);
3805
0
    on_rollback->data = old_coll_id;
3806
0
    on_commit->data = old_coll_id;
3807
0
    txn_stmt_on_rollback(stmt, on_rollback);
3808
0
    txn_stmt_on_commit(stmt, on_commit);
3809
0
  } else if (new_tuple != NULL && old_tuple == NULL) {
3810
    /* INSERT */
3811
0
    struct trigger *on_rollback =
3812
0
      txn_alter_trigger_new(on_create_collation_rollback, NULL);
3813
0
    if (on_rollback == NULL)
3814
0
      return -1;
3815
0
    struct coll_id_def new_def;
3816
0
    if (coll_id_def_new_from_tuple(new_tuple, &new_def) != 0)
3817
0
      return -1;
3818
0
    if (access_check_ddl(new_def.name, new_def.owner_id,
3819
0
             NULL, SC_COLLATION, PRIV_C) != 0)
3820
0
      return -1;
3821
0
    struct coll_id *new_coll_id = coll_id_new(&new_def);
3822
0
    if (new_coll_id == NULL)
3823
0
      return -1;
3824
0
    struct coll_id *replaced_id;
3825
0
    if (coll_id_cache_replace(new_coll_id, &replaced_id) != 0) {
3826
0
      coll_id_delete(new_coll_id);
3827
0
      return -1;
3828
0
    }
3829
0
    assert(replaced_id == NULL);
3830
0
    on_rollback->data = new_coll_id;
3831
0
    txn_stmt_on_rollback(stmt, on_rollback);
3832
0
  } else {
3833
    /* UPDATE */
3834
0
    assert(new_tuple != NULL && old_tuple != NULL);
3835
0
    diag_set(ClientError, ER_UNSUPPORTED, "collation", "alter");
3836
0
    return -1;
3837
0
  }
3838
0
  return 0;
3839
0
}
3840
3841
/**
3842
 * Create a privilege definition from tuple.
3843
 */
3844
int
3845
priv_def_create_from_tuple(struct priv_def *priv, struct tuple *tuple)
3846
0
{
3847
0
  if (tuple_field_u32(tuple, BOX_PRIV_FIELD_ID, &(priv->grantor_id)) != 0 ||
3848
0
      tuple_field_u32(tuple, BOX_PRIV_FIELD_UID, &(priv->grantee_id)) != 0)
3849
0
    return -1;
3850
3851
0
  const char *object_type =
3852
0
    tuple_field_cstr(tuple, BOX_PRIV_FIELD_OBJECT_TYPE);
3853
0
  if (object_type == NULL)
3854
0
    return -1;
3855
0
  priv->object_type = schema_object_type(object_type);
3856
0
  assert(priv->object_type < schema_object_type_MAX);
3857
3858
0
  const char *data = tuple_field(tuple, BOX_PRIV_FIELD_OBJECT_ID);
3859
0
  if (data == NULL) {
3860
0
    diag_set(ClientError, ER_NO_SUCH_FIELD_NO,
3861
0
        BOX_PRIV_FIELD_OBJECT_ID + TUPLE_INDEX_BASE);
3862
0
    return -1;
3863
0
  }
3864
  /*
3865
   * When granting or revoking privileges on a whole entity
3866
   * we pass empty string ('') to object_id to indicate
3867
   * grant on every object of that entity.
3868
   * So check for that first.
3869
   */
3870
0
  switch (mp_typeof(*data)) {
3871
0
  case MP_STR:
3872
0
    priv->object_name = mp_decode_str(&data,
3873
0
              &priv->object_name_len);
3874
0
    if (priv->object_name_len == 0) {
3875
      /* Entity-wide privilege. */
3876
0
      priv->is_entity_access = true;
3877
0
      priv->object_id = 0;
3878
0
      priv->object_name = NULL;
3879
0
      break;
3880
0
    } else if (priv->object_type == SC_LUA_CALL) {
3881
      /*
3882
       * lua_call objects are global Lua functions.
3883
       * They aren't stored in the database hence
3884
       * don't have numeric ids. They are identified
3885
       * by string names.
3886
       */
3887
0
      priv->is_entity_access = false;
3888
0
      priv->object_id = 0;
3889
0
      break;
3890
0
    }
3891
0
    FALLTHROUGH;
3892
0
  default:
3893
0
    priv->is_entity_access = false;
3894
0
    if (tuple_field_u32(tuple,
3895
0
        BOX_PRIV_FIELD_OBJECT_ID, &(priv->object_id)) != 0)
3896
0
      return -1;
3897
0
    priv->object_name = NULL;
3898
0
    priv->object_name_len = 0;
3899
0
  }
3900
0
  if (priv->object_type == SC_UNKNOWN) {
3901
0
    diag_set(ClientError, ER_UNKNOWN_SCHEMA_OBJECT,
3902
0
        object_type);
3903
0
    return -1;
3904
0
  }
3905
0
  uint32_t out;
3906
0
  if (tuple_field_u32(tuple, BOX_PRIV_FIELD_ACCESS, &out) != 0)
3907
0
    return -1;
3908
0
  priv->access = out;
3909
0
  return 0;
3910
0
}
3911
3912
/*
3913
 * This function checks that:
3914
 * - a privilege is granted from an existing user to an existing
3915
 *   user on an existing object
3916
 * - the grantor has the right to grant (is the owner of the object)
3917
 *
3918
 * @XXX Potentially there is a race in case of rollback, since an
3919
 * object can be changed during WAL write.
3920
 * In the future we must protect grant/revoke with a logical lock.
3921
 */
3922
static int
3923
priv_def_check(struct priv_def *priv, enum priv_type priv_type)
3924
0
{
3925
0
  struct user *grantor = user_find(priv->grantor_id);
3926
0
  if (grantor == NULL)
3927
0
    return -1;
3928
  /* May be a role */
3929
0
  struct user *grantee = user_by_id(priv->grantee_id);
3930
0
  if (grantee == NULL) {
3931
0
    diag_set(ClientError, ER_NO_SUCH_USER,
3932
0
        int2str(priv->grantee_id));
3933
0
    return -1;
3934
0
  }
3935
0
  const char *name = "";
3936
0
  struct access *object = NULL;
3937
0
  switch (priv->object_type) {
3938
0
  case SC_SPACE:
3939
0
  {
3940
0
    if (priv->is_entity_access)
3941
0
      break;
3942
0
    struct space *space = space_cache_find(priv->object_id);
3943
0
    if (space == NULL)
3944
0
      return -1;
3945
0
    name = space_name(space);
3946
0
    object = space->access;
3947
0
    if (space->def->uid != grantor->def->uid &&
3948
0
        grantor->def->uid != ADMIN) {
3949
0
      diag_set(AccessDeniedError,
3950
0
          priv_name(priv_type),
3951
0
          schema_object_name(SC_SPACE), name,
3952
0
          grantor->def->name);
3953
0
      return -1;
3954
0
    }
3955
0
    if (space_is_temporary(space)) {
3956
0
      diag_set(ClientError, ER_UNSUPPORTED,
3957
0
         "temporary space", "privileges");
3958
0
      return -1;
3959
0
    }
3960
0
    break;
3961
0
  }
3962
0
  case SC_FUNCTION:
3963
0
  {
3964
0
    if (priv->is_entity_access)
3965
0
      break;
3966
0
    struct func *func = func_by_id(priv->object_id);
3967
0
    if (func == NULL) {
3968
0
      diag_set(ClientError, ER_NO_SUCH_FUNCTION, int2str(priv->object_id));
3969
0
      return -1;
3970
0
    }
3971
0
    name = func->def->name;
3972
0
    object = func->access;
3973
0
    if (func->def->uid != grantor->def->uid &&
3974
0
        grantor->def->uid != ADMIN) {
3975
0
      diag_set(AccessDeniedError,
3976
0
          priv_name(priv_type),
3977
0
          schema_object_name(SC_FUNCTION), name,
3978
0
          grantor->def->name);
3979
0
      return -1;
3980
0
    }
3981
0
    break;
3982
0
  }
3983
0
  case SC_SEQUENCE:
3984
0
  {
3985
0
    if (priv->is_entity_access)
3986
0
      break;
3987
0
    struct sequence *seq = sequence_by_id(priv->object_id);
3988
0
    if (seq == NULL) {
3989
0
      diag_set(ClientError, ER_NO_SUCH_SEQUENCE, int2str(priv->object_id));
3990
0
      return -1;
3991
0
    }
3992
0
    name = seq->def->name;
3993
0
    object = seq->access;
3994
0
    if (seq->def->uid != grantor->def->uid &&
3995
0
        grantor->def->uid != ADMIN) {
3996
0
      diag_set(AccessDeniedError,
3997
0
          priv_name(priv_type),
3998
0
          schema_object_name(SC_SEQUENCE), name,
3999
0
          grantor->def->name);
4000
0
      return -1;
4001
0
    }
4002
0
    break;
4003
0
  }
4004
0
  case SC_ROLE:
4005
0
  {
4006
0
    if (priv->is_entity_access)
4007
0
      break;
4008
0
    struct user *role = user_by_id(priv->object_id);
4009
0
    if (role == NULL || role->def->type != SC_ROLE) {
4010
0
      diag_set(ClientError, ER_NO_SUCH_ROLE,
4011
0
          role ? role->def->name :
4012
0
          int2str(priv->object_id));
4013
0
      return -1;
4014
0
    }
4015
0
    name = role->def->name;
4016
0
    object = role->access;
4017
    /*
4018
     * Only the creator of the role can grant or revoke it.
4019
     * Everyone can grant 'PUBLIC' role.
4020
     */
4021
0
    if (role->def->owner != grantor->def->uid &&
4022
0
        grantor->def->uid != ADMIN &&
4023
0
        (role->def->uid != PUBLIC || priv->access != PRIV_X)) {
4024
0
      diag_set(AccessDeniedError,
4025
0
          priv_name(priv_type),
4026
0
          schema_object_name(SC_ROLE), name,
4027
0
          grantor->def->name);
4028
0
      return -1;
4029
0
    }
4030
    /* Not necessary to do during revoke, but who cares. */
4031
0
    if (role_check(grantee, role) != 0)
4032
0
      return -1;
4033
0
    break;
4034
0
  }
4035
0
  case SC_USER:
4036
0
  {
4037
0
    if (priv->is_entity_access)
4038
0
      break;
4039
0
    struct user *user = user_by_id(priv->object_id);
4040
0
    if (user == NULL || user->def->type != SC_USER) {
4041
0
      diag_set(ClientError, ER_NO_SUCH_USER,
4042
0
          user ? user->def->name :
4043
0
          int2str(priv->object_id));
4044
0
      return -1;
4045
0
    }
4046
0
    name = user->def->name;
4047
0
    object = user->access;
4048
0
    if (user->def->owner != grantor->def->uid &&
4049
0
        grantor->def->uid != ADMIN) {
4050
0
      diag_set(AccessDeniedError,
4051
0
          priv_name(priv_type),
4052
0
          schema_object_name(SC_USER), name,
4053
0
          grantor->def->name);
4054
0
      return -1;
4055
0
    }
4056
0
    break;
4057
0
  }
4058
0
  default:
4059
0
    break;
4060
0
  }
4061
  /* Only admin may grant privileges on an entire entity. */
4062
0
  if (object == NULL && grantor->def->uid != ADMIN) {
4063
0
    diag_set(AccessDeniedError, priv_name(priv_type),
4064
0
       schema_object_name(priv->object_type), name,
4065
0
       grantor->def->name);
4066
0
    return -1;
4067
0
  }
4068
0
  if (access_check_ddl(name, grantor->def->uid, object,
4069
0
           priv->object_type, priv_type) != 0)
4070
0
    return -1;
4071
0
  if (priv->access == 0) {
4072
0
    diag_set(ClientError, ER_GRANT,
4073
0
        "the grant tuple has no privileges");
4074
0
    return -1;
4075
0
  }
4076
0
  return 0;
4077
0
}
4078
4079
/**
4080
 * Update a metadata cache object with the new access
4081
 * data. For the purpose of the rolled back statement, please refer to
4082
 * `user_reload_privs`.
4083
 */
4084
static int
4085
grant_or_revoke(struct priv_def *priv, struct txn_stmt *rolled_back_stmt)
4086
0
{
4087
0
  struct user *grantee = user_by_id(priv->grantee_id);
4088
0
  if (grantee == NULL)
4089
0
    return 0;
4090
  /*
4091
   * Grant a role to a user only when privilege type is 'execute'
4092
   * and the role is specified.
4093
   */
4094
0
  if (priv->object_type == SC_ROLE && !(priv->access & ~PRIV_X)) {
4095
0
    struct user *role = user_by_id(priv->object_id);
4096
0
    if (role == NULL || role->def->type != SC_ROLE)
4097
0
      return 0;
4098
0
    if (priv->access) {
4099
0
      if (role_grant(grantee, role) != 0)
4100
0
        return -1;
4101
0
    } else {
4102
0
      if (role_revoke(grantee, role) != 0)
4103
0
        return -1;
4104
0
    }
4105
0
  } else {
4106
0
    if (priv_grant(grantee, priv, rolled_back_stmt) != 0)
4107
0
      return -1;
4108
0
  }
4109
0
  return 0;
4110
0
}
4111
4112
/** A trigger called on rollback of grant. */
4113
static int
4114
revoke_priv(struct trigger *trigger, void *event)
4115
0
{
4116
0
  struct tuple *tuple = (struct tuple *)trigger->data;
4117
0
  struct priv_def priv;
4118
0
  if (priv_def_create_from_tuple(&priv, tuple) != 0)
4119
0
    return -1;
4120
0
  priv.access = 0;
4121
0
  if (grant_or_revoke(&priv, (struct txn_stmt *)event) != 0)
4122
0
    return -1;
4123
0
  return 0;
4124
0
}
4125
4126
/** A trigger called on rollback of revoke or modify. */
4127
static int
4128
modify_priv(struct trigger *trigger, void *event)
4129
0
{
4130
0
  struct tuple *tuple = (struct tuple *)trigger->data;
4131
0
  struct priv_def priv;
4132
0
  if (priv_def_create_from_tuple(&priv, tuple) != 0 ||
4133
0
      grant_or_revoke(&priv, (struct txn_stmt *)event) != 0)
4134
0
    return -1;
4135
0
  return 0;
4136
0
}
4137
4138
/**
4139
 * A trigger invoked on replace in the space containing
4140
 * all granted privileges.
4141
 */
4142
static int
4143
on_replace_dd_priv(struct trigger * /* trigger */, void *event)
4144
0
{
4145
0
  struct txn *txn = (struct txn *) event;
4146
0
  struct txn_stmt *stmt = txn_current_stmt(txn);
4147
0
  struct tuple *old_tuple = stmt->old_tuple;
4148
0
  struct tuple *new_tuple = stmt->new_tuple;
4149
0
  struct priv_def priv;
4150
4151
0
  if (new_tuple != NULL && old_tuple == NULL) { /* grant */
4152
0
    if (priv_def_create_from_tuple(&priv, new_tuple) != 0 ||
4153
0
        priv_def_check(&priv, PRIV_GRANT) != 0 ||
4154
0
        grant_or_revoke(&priv, NULL) != 0)
4155
0
      return -1;
4156
0
    struct trigger *on_rollback =
4157
0
      txn_alter_trigger_new(revoke_priv, new_tuple);
4158
0
    if (on_rollback == NULL)
4159
0
      return -1;
4160
0
    txn_stmt_on_rollback(stmt, on_rollback);
4161
0
  } else if (new_tuple == NULL) {                /* revoke */
4162
0
    assert(old_tuple);
4163
0
    if (priv_def_create_from_tuple(&priv, old_tuple) != 0 ||
4164
0
        priv_def_check(&priv, PRIV_REVOKE) != 0)
4165
0
      return -1;
4166
0
    priv.access = 0;
4167
0
    if (grant_or_revoke(&priv, NULL) != 0)
4168
0
      return -1;
4169
0
    struct trigger *on_rollback =
4170
0
      txn_alter_trigger_new(modify_priv, old_tuple);
4171
0
    if (on_rollback == NULL)
4172
0
      return -1;
4173
0
    txn_stmt_on_rollback(stmt, on_rollback);
4174
0
  } else {                                       /* modify */
4175
0
    if (priv_def_create_from_tuple(&priv, new_tuple) != 0 ||
4176
0
        priv_def_check(&priv, PRIV_GRANT) != 0 ||
4177
0
        grant_or_revoke(&priv, NULL) != 0)
4178
0
      return -1;
4179
0
    struct trigger *on_rollback =
4180
0
      txn_alter_trigger_new(modify_priv, old_tuple);
4181
0
    if (on_rollback == NULL)
4182
0
      return -1;
4183
0
    txn_stmt_on_rollback(stmt, on_rollback);
4184
0
  }
4185
0
  return 0;
4186
0
}
4187
4188
/* }}} access control */
4189
4190
/* {{{ cluster configuration */
4191
4192
/** Set replicaset UUID on _schema commit. */
4193
static int
4194
on_commit_replicaset_uuid(struct trigger *trigger, void * /* event */)
4195
0
{
4196
0
  const struct tt_uuid *uuid = (typeof(uuid))trigger->data;
4197
0
  if (tt_uuid_is_equal(&REPLICASET_UUID, uuid))
4198
0
    return 0;
4199
0
  REPLICASET_UUID = *uuid;
4200
0
  box_broadcast_id();
4201
0
  say_info("replicaset uuid %s", tt_uuid_str(uuid));
4202
0
  return 0;
4203
0
}
4204
4205
/** Set replicaset name on _schema commit. */
4206
static int
4207
on_commit_replicaset_name(struct trigger *trigger, void * /* event */)
4208
0
{
4209
0
  const char *name = (typeof(name))trigger->data;
4210
0
  if (strcmp(REPLICASET_NAME, name) == 0)
4211
0
    return 0;
4212
0
  strlcpy(REPLICASET_NAME, name, NODE_NAME_SIZE_MAX);
4213
0
  box_broadcast_id();
4214
0
  say_info("replicaset name: %s", node_name_str(name));
4215
0
  return 0;
4216
0
}
4217
4218
static int
4219
start_synchro_filtering(va_list /* ap */)
4220
0
{
4221
0
  txn_limbo_filter_enable(&txn_limbo);
4222
0
  return 0;
4223
0
}
4224
4225
static int
4226
stop_synchro_filtering(va_list /* ap */)
4227
0
{
4228
0
  txn_limbo_filter_disable(&txn_limbo);
4229
0
  return 0;
4230
0
}
4231
4232
/** Payload handed to the on_commit_dd_version trigger. */
struct on_commit_dd_version_data {
  /**
   * Fiber woken up on commit to do the asynchronous part of the
   * work; may be NULL, in which case nothing is woken up.
   */
  struct fiber *fiber;
  /** Version id to store in dd_version_id on commit. */
  uint32_t version_id;
};
4239
4240
/**
4241
 * Update the cached schema version and enable version-dependent features, like
4242
 * split-brain detection. Reenabling is done asynchronously by a separate fiber
4243
 * prepared by on_replace trigger.
4244
 */
4245
static int
4246
on_commit_dd_version(struct trigger *trigger, void * /* event */)
4247
0
{
4248
0
  struct on_commit_dd_version_data *data =
4249
0
    (struct on_commit_dd_version_data *)trigger->data;
4250
0
  dd_version_id = data->version_id;
4251
0
  struct fiber *fiber = data->fiber;
4252
0
  if (fiber != NULL)
4253
0
    fiber_wakeup(fiber);
4254
0
  box_broadcast_status();
4255
0
  return 0;
4256
0
}
4257
4258
/** Set cluster name on _schema commit. */
4259
static int
4260
on_commit_cluster_name(struct trigger *trigger, void * /* event */)
4261
0
{
4262
0
  const char *name = (typeof(name))trigger->data;
4263
0
  if (strcmp(CLUSTER_NAME, name) == 0)
4264
0
    return 0;
4265
0
  strlcpy(CLUSTER_NAME, name, NODE_NAME_SIZE_MAX);
4266
0
  box_broadcast_id();
4267
0
  say_info("cluster name: %s", node_name_str(name));
4268
0
  return 0;
4269
0
}
4270
4271
/**
4272
 * This trigger implements the "last write wins" strategy for
4273
 * "bootstrap_leader_uuid" tuple of space _schema. Comparison is performed by
4274
 * a timestamp and replica_id of the node which authored the change.
4275
 */
4276
static int
4277
before_replace_dd_schema(struct trigger * /* trigger */, void *event)
4278
0
{
4279
0
  struct txn *txn = (struct txn *)event;
4280
0
  struct txn_stmt *stmt = txn_current_stmt(txn);
4281
0
  struct tuple *old_tuple = stmt->old_tuple;
4282
0
  struct tuple *new_tuple = stmt->new_tuple;
4283
0
  const char *key = tuple_field_cstr(new_tuple != NULL ?
4284
0
             new_tuple : old_tuple,
4285
0
             BOX_SCHEMA_FIELD_KEY);
4286
0
  if (key == NULL)
4287
0
    return -1;
4288
0
  if (strcmp(key, "bootstrap_leader_uuid") == 0) {
4289
0
    uint64_t old_ts = 0;
4290
0
    uint32_t old_id = 0;
4291
0
    uint64_t new_ts = UINT64_MAX;
4292
0
    uint32_t new_id = UINT32_MAX;
4293
0
    int ts_field = BOX_SCHEMA_FIELD_VALUE + 1;
4294
0
    int id_field = BOX_SCHEMA_FIELD_VALUE + 2;
4295
    /*
4296
     * Assume anything can be stored in old_tuple, so do not require
4297
     * it to have a timestamp or replica_id. In contrary to that,
4298
     * always require new tuple to have a valid timestamp and
4299
     * replica_id.
4300
     */
4301
0
    if (old_tuple != NULL) {
4302
0
      const char *field = tuple_field(old_tuple, ts_field);
4303
0
      if (field != NULL && mp_typeof(*field) == MP_UINT)
4304
0
        old_ts = mp_decode_uint(&field);
4305
0
      field = tuple_field(old_tuple, id_field);
4306
0
      if (field != NULL && mp_typeof(*field) == MP_UINT)
4307
0
        old_id = mp_decode_uint(&field);
4308
0
    }
4309
0
    if (new_tuple != NULL &&
4310
0
        (tuple_field_u64(new_tuple, ts_field, &new_ts) != 0 ||
4311
0
         tuple_field_u32(new_tuple, id_field, &new_id) != 0)) {
4312
0
      return -1;
4313
0
    }
4314
0
    if (new_ts < old_ts || (new_ts == old_ts && new_id < old_id)) {
4315
0
      say_info("Ignore the replace of tuple %s with %s in "
4316
0
         "space _schema: the former has a newer "
4317
0
         "timestamp", tuple_str(old_tuple),
4318
0
         tuple_str(new_tuple));
4319
0
      goto return_old;
4320
0
    }
4321
0
  }
4322
0
  return 0;
4323
0
return_old:
4324
0
  if (new_tuple != NULL)
4325
0
    tuple_unref(new_tuple);
4326
0
  if (old_tuple != NULL)
4327
0
    tuple_ref(old_tuple);
4328
0
  stmt->new_tuple = old_tuple;
4329
0
  return 0;
4330
0
}
4331
4332
/** An on_commit trigger to update bootstrap leader uuid. */
4333
static int
4334
on_commit_schema_set_bootstrap_leader_uuid(struct trigger *trigger, void *event)
4335
0
{
4336
0
  (void)event;
4337
0
  struct tt_uuid *uuid = (struct tt_uuid *)trigger->data;
4338
0
  bootstrap_leader_uuid = *uuid;
4339
4340
  /*
4341
   * Sync the supervised bootstrap flags.
4342
   *
4343
   * is_supervised_bootstrap_leader has to be consistent
4344
   * with bootstrap_leader_uuid.
4345
   *
4346
   * The graceful bootstrap request is no-op, when the
4347
   * database is already bootstrapped. If we met the flag
4348
   * here, it is likely that
4349
   * box.ctl.make_bootstrap_leader({graceful = true}) is
4350
   * issued during the bootstrap or recovery process. It
4351
   * means that request is not actual anymore: we either
4352
   * already the bootstrap leader or another peer bootstraps
4353
   * us.
4354
   */
4355
0
  is_supervised_bootstrap_leader = tt_uuid_is_equal(uuid, &INSTANCE_UUID);
4356
0
  is_graceful_supervised_bootstrap_requested = false;
4357
4358
0
  say_info("instance %s is assigned as a bootstrap leader",
4359
0
     tt_uuid_str(uuid));
4360
0
  box_broadcast_ballot();
4361
0
  return 0;
4362
0
}
4363
4364
/**
4365
 * This trigger is invoked only upon initial recovery, when
4366
 * reading contents of the system spaces from the snapshot.
4367
 *
4368
 * Before a cluster is assigned a cluster id it's read only.
4369
 * Since during recovery state of the WAL doesn't
4370
 * concern us, we can safely change the cluster id in before-replace
4371
 * event, not in after-replace event.
4372
 */
4373
static int
4374
on_replace_dd_schema(struct trigger * /* trigger */, void *event)
4375
0
{
4376
0
  struct txn *txn = (struct txn *) event;
4377
0
  struct txn_stmt *stmt = txn_current_stmt(txn);
4378
0
  struct tuple *old_tuple = stmt->old_tuple;
4379
0
  struct tuple *new_tuple = stmt->new_tuple;
4380
0
  const char *key = tuple_field_cstr(new_tuple ? new_tuple : old_tuple,
4381
0
             BOX_SCHEMA_FIELD_KEY);
4382
0
  if (key == NULL)
4383
0
    return -1;
4384
0
  if (strcmp(key, "cluster") == 0 ||
4385
0
      strcmp(key, "replicaset_uuid") == 0) {
4386
0
    if (new_tuple == NULL) {
4387
      /*
4388
       * At least one of the keys has to stay. Or the
4389
       * replicaset UUID would be lost after restart.
4390
       */
4391
0
      const char *other_key = strcmp(key, "cluster") == 0 ?
4392
0
        "replicaset_uuid" : "cluster";
4393
0
      char mpkey[64];
4394
0
      char *mpkey_end = mp_encode_array(mpkey, 1);
4395
0
      mpkey_end = mp_encode_str0(mpkey_end, other_key);
4396
0
      struct tuple *other = NULL;
4397
0
      if (box_index_get(BOX_SCHEMA_ID, 0, mpkey, mpkey_end,
4398
0
            &other) != 0)
4399
0
        return -1;
4400
0
      if (other == NULL) {
4401
0
        diag_set(ClientError, ER_REPLICASET_UUID_IS_RO);
4402
0
        return -1;
4403
0
      }
4404
      /*
4405
       * Deletion of the old key is allowed for upgrade.
4406
       * Deletion of the new one is needed for downgrade.
4407
       * Can't ban either.
4408
       */
4409
0
      return 0;
4410
0
    }
4411
0
    tt_uuid uu;
4412
0
    if (tuple_field_uuid(new_tuple, BOX_SCHEMA_FIELD_VALUE, &uu) != 0)
4413
0
      return -1;
4414
0
    if (!tt_uuid_is_nil(&REPLICASET_UUID) &&
4415
0
        !tt_uuid_is_equal(&REPLICASET_UUID, &uu)) {
4416
0
      diag_set(ClientError, ER_REPLICASET_UUID_IS_RO);
4417
0
      return -1;
4418
0
    }
4419
0
    struct tt_uuid *uuid_copy = xregion_alloc_object(
4420
0
      &txn->region, typeof(*uuid_copy));
4421
0
    *uuid_copy = uu;
4422
0
    struct trigger *on_commit = txn_alter_trigger_new(
4423
0
      on_commit_replicaset_uuid, (void *)uuid_copy);
4424
0
    if (on_commit == NULL)
4425
0
      return -1;
4426
0
    txn_stmt_on_commit(stmt, on_commit);
4427
0
  } else if (strcmp(key, "version") == 0) {
4428
0
    uint32_t version = 0;
4429
0
    if (new_tuple != NULL) {
4430
0
      uint32_t major, minor, patch;
4431
0
      if (tuple_field_u32(new_tuple, 1, &major) != 0 ||
4432
0
          tuple_field_u32(new_tuple, 2, &minor) != 0) {
4433
0
        diag_set(ClientError, ER_WRONG_DD_VERSION);
4434
0
        return -1;
4435
0
      }
4436
      /* Version can be major.minor with no patch. */
4437
0
      if (tuple_field_u32(new_tuple, 3, &patch) != 0)
4438
0
        patch = 0;
4439
0
      version = version_id(major, minor, patch);
4440
0
    } else {
4441
0
      assert(old_tuple != NULL);
4442
      /*
4443
       * _schema:delete({'version'}) for
4444
       * example, for box.internal.bootstrap().
4445
       */
4446
0
      version = tarantool_version_id();
4447
0
    }
4448
0
    struct on_commit_dd_version_data *data = xregion_alloc_object(
4449
0
      &txn->region, typeof(*data));
4450
0
    data->version_id = version;
4451
0
    data->fiber = NULL;
4452
0
    struct trigger *on_commit = txn_alter_trigger_new(
4453
0
      on_commit_dd_version, data);
4454
0
    if (on_commit == NULL)
4455
0
      return -1;
4456
0
    txn_stmt_on_commit(stmt, on_commit);
4457
0
    if (recovery_state != FINISHED_RECOVERY) {
4458
0
      return 0;
4459
0
    }
4460
    /*
4461
     * Set data->fiber after on_commit is created, because we can't
4462
     * remove a not-yet-run fiber in case of on_commit creation
4463
     * failure.
4464
     */
4465
0
    struct fiber *fiber = NULL;
4466
0
    if (version > version_id(2, 10, 1) &&
4467
0
        recovery_state == FINISHED_RECOVERY) {
4468
0
      fiber = fiber_new_system("synchro_filter_enabler",
4469
0
             start_synchro_filtering);
4470
0
      if (fiber == NULL)
4471
0
        return -1;
4472
0
    } else if (version <= version_id(2, 10, 1) &&
4473
0
         recovery_state == FINISHED_RECOVERY) {
4474
0
      fiber = fiber_new_system("synchro_filter_disabler",
4475
0
             stop_synchro_filtering);
4476
0
      if (fiber == NULL)
4477
0
        return -1;
4478
0
    }
4479
0
    data->fiber = fiber;
4480
    /*
4481
     * When upgrading to 3.3.0, new local space _gc_consumers
4482
     * is created on each replica and we need to fill it with
4483
     * consumers of already connected replicas since they won't
4484
     * be inserted to the space automatically.
4485
     * We cannot do it in `box.schema.upgrade` procedure since
4486
     * it's called on master and the space is created on each
4487
     * replica.
4488
     */
4489
0
    if (version == version_id(3, 3, 0) &&
4490
0
        gc_persist_consumers() != 0)
4491
0
      return -1;
4492
0
  } else if (strcmp(key, "bootstrap_leader_uuid") == 0) {
4493
0
    struct tt_uuid *uuid = xregion_alloc_object(&txn->region,
4494
0
                  typeof(*uuid));
4495
0
    if (!new_tuple) {
4496
0
      *uuid = uuid_nil;
4497
0
    } else if (tuple_field_uuid(new_tuple, BOX_SCHEMA_FIELD_VALUE,
4498
0
              uuid) != 0) {
4499
0
      return -1;
4500
0
    }
4501
0
    struct trigger *on_commit = txn_alter_trigger_new(
4502
0
      on_commit_schema_set_bootstrap_leader_uuid, uuid);
4503
0
    if (on_commit == NULL)
4504
0
      return -1;
4505
0
    txn_on_commit(txn, on_commit);
4506
0
  } else if (strcmp(key, "cluster_name") == 0) {
4507
0
    char name[NODE_NAME_SIZE_MAX];
4508
0
    const char *field_name = "_schema['cluster_name'].value";
4509
0
    if (tuple_field_node_name(name, new_tuple,
4510
0
            BOX_SCHEMA_FIELD_VALUE,
4511
0
            field_name) != 0)
4512
0
      return -1;
4513
0
    if (box_is_configured() && *CLUSTER_NAME != 0 &&
4514
0
        strcmp(name, CLUSTER_NAME) != 0) {
4515
0
      if (!box_is_force_recovery) {
4516
0
        diag_set(ClientError, ER_UNSUPPORTED,
4517
0
           "Tarantool", "cluster name change "
4518
0
           "(without 'force_recovery')");
4519
0
        return -1;
4520
0
      }
4521
0
      say_info("cluster rename allowed by 'force_recovery'");
4522
0
    }
4523
0
    size_t size = strlen(name) + 1;
4524
0
    char *name_copy = (char *)xregion_alloc(&txn->region, size);
4525
0
    memcpy(name_copy, name, size);
4526
0
    struct trigger *on_commit = txn_alter_trigger_new(
4527
0
      on_commit_cluster_name, name_copy);
4528
0
    if (on_commit == NULL)
4529
0
      return -1;
4530
0
    txn_stmt_on_commit(stmt, on_commit);
4531
0
  } else if (strcmp(key, "replicaset_name") == 0) {
4532
0
    char name[NODE_NAME_SIZE_MAX];
4533
0
    const char *field_name = "_schema['replicaset_name'].value";
4534
0
    if (tuple_field_node_name(name, new_tuple,
4535
0
            BOX_SCHEMA_FIELD_VALUE,
4536
0
            field_name) != 0)
4537
0
      return -1;
4538
0
    if (box_is_configured() && *REPLICASET_NAME != 0 &&
4539
0
        strcmp(name, REPLICASET_NAME) != 0) {
4540
0
      if (!box_is_force_recovery) {
4541
0
        diag_set(ClientError, ER_UNSUPPORTED,
4542
0
           "Tarantool", "replicaset name change "
4543
0
           "(without 'force_recovery')");
4544
0
        return -1;
4545
0
      }
4546
0
      say_info("replicaset name mismatch, "
4547
0
         "ignore due to 'force_recovery'");
4548
0
    }
4549
0
    size_t size = strlen(name) + 1;
4550
0
    char *name_copy = (char *)xregion_alloc(&txn->region, size);
4551
0
    memcpy(name_copy, name, size);
4552
0
    struct trigger *on_commit = txn_alter_trigger_new(
4553
0
      on_commit_replicaset_name, name_copy);
4554
0
    if (on_commit == NULL)
4555
0
      return -1;
4556
0
    txn_stmt_on_commit(stmt, on_commit);
4557
0
  }
4558
0
  return 0;
4559
0
}
4560
4561
/** Unregister the replica affected by the change. */
4562
static int
4563
on_replace_cluster_clear_id(struct trigger *trigger, void *event)
4564
0
{
4565
0
  struct replica *replica = (struct replica *)trigger->data;
4566
0
  struct txn_stmt *stmt = (struct txn_stmt *)event;
4567
0
  (void)stmt;
4568
0
  if (replica->id == instance_id) {
4569
0
    assert(stmt->row->replica_id != 0 ||
4570
0
           recovery_state != FINISHED_RECOVERY);
4571
0
    if (recovery_state == FINISHED_RECOVERY) {
4572
0
      diag_set(ClientError, ER_LOCAL_INSTANCE_ID_IS_READ_ONLY,
4573
0
         (unsigned)instance_id);
4574
0
      struct diag *diag = diag_get();
4575
0
      replicaset_foreach(replica) {
4576
0
        if (replica->applier != NULL)
4577
0
          applier_kill(replica->applier,
4578
0
                 diag_last_error(diag));
4579
0
        if (replica->relay != NULL)
4580
0
          relay_cancel(replica->relay);
4581
0
      }
4582
0
      diag_clear(diag);
4583
0
    }
4584
0
  }
4585
0
  replica_clear_id(replica);
4586
0
  return 0;
4587
0
}
4588
4589
/** Update the synchronous replication quorum. */
4590
static int
4591
on_replace_cluster_update_quorum(struct trigger * /* trigger */,
4592
         void * /* event */)
4593
0
{
4594
0
  box_update_replication_synchro_quorum();
4595
0
  return 0;
4596
0
}
4597
4598
/** Replica definition. */
4599
struct replica_def {
4600
  /** Instance ID. */
4601
  uint32_t id;
4602
  /** Instance UUID. */
4603
  struct tt_uuid uuid;
4604
  /** Instance name. */
4605
  char name[NODE_NAME_SIZE_MAX];
4606
};
4607
4608
/** Build replica definition from a _cluster's tuple. */
4609
static struct replica_def *
4610
replica_def_new_from_tuple(struct tuple *tuple, struct region *region)
4611
0
{
4612
0
  struct replica_def *def = xregion_alloc_object(region, typeof(*def));
4613
0
  memset(def, 0, sizeof(*def));
4614
0
  if (tuple_field_u32(tuple, BOX_CLUSTER_FIELD_ID, &def->id) != 0)
4615
0
    return NULL;
4616
0
  if (replica_check_id(def->id) != 0)
4617
0
    return NULL;
4618
0
  if (tuple_field_uuid(tuple, BOX_CLUSTER_FIELD_UUID, &def->uuid) != 0)
4619
0
    return NULL;
4620
0
  if (tt_uuid_is_nil(&def->uuid)) {
4621
0
    diag_set(ClientError, ER_INVALID_UUID, tt_uuid_str(&def->uuid));
4622
0
    return NULL;
4623
0
  }
4624
0
  if (tuple_field_node_name(def->name, tuple, BOX_CLUSTER_FIELD_NAME,
4625
0
          "_cluster.name") != 0)
4626
0
    return NULL;
4627
0
  return def;
4628
0
}
4629
4630
/** Add an instance on commit/rollback. */
4631
static int
4632
on_replace_cluster_add_replica(struct trigger *trigger, void * /* event */)
4633
0
{
4634
0
  const struct replica_def *def = (typeof(def))trigger->data;
4635
0
  struct replica *r = replicaset_add(def->id, &def->uuid);
4636
0
  replica_set_name(r, def->name);
4637
0
  return 0;
4638
0
}
4639
4640
/** Set instance name on commit/rollback. */
4641
static int
4642
on_replace_cluster_set_name(struct trigger *trigger, void * /* event */)
4643
0
{
4644
0
  const struct replica_def *def = (typeof(def))trigger->data;
4645
0
  struct replica *replica = replica_by_id(def->id);
4646
0
  if (replica == NULL)
4647
0
    panic("Couldn't find a replica in _cluster in txn trigger");
4648
0
  const struct replica *other = replica_by_name(def->name);
4649
0
  if (replica == other)
4650
0
    return 0;
4651
  /*
4652
   * The old name shouldn't be possible to be occupied. It would mean some
4653
   * other _cluster txn took it and then yielded, allowing this trigger to
4654
   * run. But _cluster txns are rolled back on yield. So this other txn
4655
   * would be rolled back before this trigger. Hence this situation is not
4656
   * reachable.
4657
   */
4658
0
  if (other != NULL)
4659
0
    panic("Duplicate replica name managed to slip through txns");
4660
0
  replica_set_name(replica, def->name);
4661
0
  return 0;
4662
0
}
4663
4664
/** Set instance name on _cluster update. */
4665
static int
4666
on_replace_dd_cluster_set_name(struct replica *replica,
4667
             const char *new_name)
4668
0
{
4669
0
  struct txn_stmt *stmt = txn_current_stmt(in_txn());
4670
0
  assert(replica->id != REPLICA_ID_NIL);
4671
0
  if (strcmp(replica->name, new_name) == 0)
4672
0
    return 0;
4673
0
  if (tt_uuid_is_equal(&replica->uuid, &INSTANCE_UUID) &&
4674
0
      box_is_configured() && *INSTANCE_NAME != 0 &&
4675
0
      strcmp(new_name, INSTANCE_NAME) != 0) {
4676
0
    if (!box_is_force_recovery) {
4677
0
      diag_set(ClientError, ER_UNSUPPORTED, "Tarantool",
4678
0
         "replica rename or name drop");
4679
0
      return -1;
4680
0
    }
4681
0
    say_info("replica rename allowed due to 'force_recovery'");
4682
0
  }
4683
0
  const struct replica *other = replica_by_name(new_name);
4684
0
  if (other != NULL) {
4685
0
    diag_set(ClientError, ER_INSTANCE_NAME_DUPLICATE,
4686
0
       node_name_str(new_name), tt_uuid_str(&other->uuid));
4687
0
    return -1;
4688
0
  }
4689
0
  struct replica_def *def = xregion_alloc_object(
4690
0
    &in_txn()->region, typeof(*def));
4691
0
  memset(def, 0, sizeof(*def));
4692
0
  def->id = replica->id;
4693
0
  strlcpy(def->name, replica->name, NODE_NAME_SIZE_MAX);
4694
0
  struct trigger *on_rollback = txn_alter_trigger_new(
4695
0
    on_replace_cluster_set_name, (void *)def);
4696
0
  if (on_rollback == NULL)
4697
0
    return -1;
4698
0
  txn_stmt_on_rollback(stmt, on_rollback);
4699
  /*
4700
   * Set the new name now so as newer transactions couldn't take it too
4701
   * while this one is going to WAL.
4702
   */
4703
0
  replica_set_name(replica, new_name);
4704
0
  return 0;
4705
0
}
4706
4707
/** Set instance UUID on _cluster update. */
4708
static int
4709
on_replace_dd_cluster_set_uuid(struct replica *replica,
4710
             const struct replica_def *new_def)
4711
0
{
4712
0
  struct replica *old_replica = replica;
4713
0
  if (replica_has_connections(old_replica)) {
4714
0
    diag_set(ClientError, ER_UNSUPPORTED, "Replica",
4715
0
       "UUID update when the old replica is still here");
4716
0
    return -1;
4717
0
  }
4718
0
  if (*old_replica->name == 0) {
4719
0
    diag_set(ClientError, ER_UNSUPPORTED, "Replica without a name",
4720
0
       "UUID update");
4721
0
    return -1;
4722
0
  }
4723
0
  struct replica *new_replica = replica_by_uuid(&new_def->uuid);
4724
0
  if (new_replica != NULL && new_replica->id != REPLICA_ID_NIL) {
4725
0
    diag_set(ClientError, ER_UNSUPPORTED, "Replica",
4726
0
       "UUID update when the new UUID is already registered");
4727
0
    return -1;
4728
0
  }
4729
0
  if (strcmp(new_def->name, replica->name) != 0) {
4730
0
    diag_set(ClientError, ER_UNSUPPORTED, "Replica",
4731
0
       "UUID and name update together");
4732
0
    return -1;
4733
0
  }
4734
0
  struct trigger *on_rollback_drop_new = txn_alter_trigger_new(
4735
0
    on_replace_cluster_clear_id, NULL);
4736
0
  struct trigger *on_rollback_add_old = txn_alter_trigger_new(
4737
0
    on_replace_cluster_add_replica, NULL);
4738
0
  if (on_rollback_drop_new == NULL || on_rollback_add_old == NULL)
4739
0
    return -1;
4740
0
  struct replica_def *old_def = xregion_alloc_object(
4741
0
    &in_txn()->region, typeof(*old_def));
4742
0
  memset(old_def, 0, sizeof(*old_def));
4743
0
  old_def->id = old_replica->id;
4744
0
  strlcpy(old_def->name, old_replica->name, NODE_NAME_SIZE_MAX);
4745
0
  old_def->uuid = old_replica->uuid;
4746
4747
0
  replica_clear_id(old_replica);
4748
0
  if (replica_by_uuid(&old_def->uuid) != NULL)
4749
0
    panic("Replica with old UUID wasn't deleted");
4750
0
  if (new_replica == NULL)
4751
0
    new_replica = replicaset_add(new_def->id, &new_def->uuid);
4752
0
  else
4753
0
    replica_set_id(new_replica, new_def->id);
4754
0
  replica_set_name(new_replica, old_def->name);
4755
0
  on_rollback_drop_new->data = new_replica;
4756
0
  on_rollback_add_old->data = old_def;
4757
0
  struct txn_stmt *stmt = txn_current_stmt(in_txn());
4758
0
  txn_stmt_on_rollback(stmt, on_rollback_drop_new);
4759
0
  txn_stmt_on_rollback(stmt, on_rollback_add_old);
4760
0
  return 0;
4761
0
}
4762
4763
/** _cluster update - both old and new tuples are present. */
4764
static int
4765
on_replace_dd_cluster_update(const struct replica_def *old_def,
4766
           const struct replica_def *new_def)
4767
0
{
4768
0
  assert(new_def->id == old_def->id);
4769
0
  struct replica *replica = replica_by_id(new_def->id);
4770
0
  if (replica == NULL)
4771
0
    panic("Found a _cluster tuple not having a replica");
4772
0
  if (!tt_uuid_is_equal(&new_def->uuid, &old_def->uuid)) {
4773
0
    if (tt_uuid_is_equal(&old_def->uuid, &INSTANCE_UUID)) {
4774
0
      diag_set(ClientError, ER_UNSUPPORTED, "Replica",
4775
0
         "own UUID update in _cluster");
4776
0
      return -1;
4777
0
    }
4778
    /*
4779
     * Drop gc_consumer for the replaced replica.
4780
     * See `on_replace_dd_cluster_delete` for explanation why we
4781
     * don't drop consumer on recovery.
4782
     */
4783
0
    if (recovery_state == FINISHED_RECOVERY &&
4784
0
        gc_erase_consumer(&replica->uuid) != 0)
4785
0
      return -1;
4786
0
    if (on_replace_dd_cluster_set_uuid(replica, new_def) != 0)
4787
0
      return -1;
4788
    /* The replica was re-created. */
4789
0
    replica = replica_by_id(new_def->id);
4790
0
  }
4791
0
  return on_replace_dd_cluster_set_name(replica, new_def->name);
4792
0
}
4793
4794
/** _cluster insert - only a new tuple is present. */
4795
static int
4796
on_replace_dd_cluster_insert(const struct replica_def *new_def)
4797
0
{
4798
0
  struct txn_stmt *stmt = txn_current_stmt(in_txn());
4799
0
  struct replica *replica = replica_by_id(new_def->id);
4800
  /*
4801
   * With read-views enabled there might be already a replica whose
4802
   * registration is in progress in another transaction. With the same
4803
   * replica ID.
4804
   */
4805
0
  if (replica != NULL) {
4806
0
    const char *msg = tt_sprintf(
4807
0
      "more than 1 replica with the same ID %u: "
4808
0
      "new uuid - %s, old uuid - %s", new_def->id,
4809
0
      tt_uuid_str(&new_def->uuid),
4810
0
      tt_uuid_str(&replica->uuid));
4811
0
    diag_set(ClientError, ER_UNSUPPORTED, "Tarantool", msg);
4812
0
    return -1;
4813
0
  }
4814
0
  replica = replica_by_name(new_def->name);
4815
0
  if (replica != NULL) {
4816
0
    diag_set(ClientError, ER_INSTANCE_NAME_DUPLICATE,
4817
0
       node_name_str(new_def->name),
4818
0
       tt_uuid_str(&replica->uuid));
4819
0
    return -1;
4820
0
  }
4821
  /*
4822
   * Update the quorum only after commit. Otherwise the replica would have
4823
   * to ack its own insertion.
4824
   */
4825
0
  struct trigger *on_commit = txn_alter_trigger_new(
4826
0
    on_replace_cluster_update_quorum, NULL);
4827
0
  if (on_commit == NULL)
4828
0
    return -1;
4829
0
  txn_stmt_on_commit(stmt, on_commit);
4830
0
  struct trigger *on_rollback = txn_alter_trigger_new(
4831
0
    on_replace_cluster_clear_id, NULL);
4832
0
  if (on_rollback == NULL)
4833
0
    return -1;
4834
  /*
4835
   * Register the replica before commit so as to occupy the replica ID
4836
   * now. While WAL write is in progress, new replicas might come, they
4837
   * should see the ID is already in use.
4838
   */
4839
0
  replica = replica_by_uuid(&new_def->uuid);
4840
0
  if (replica != NULL)
4841
0
    replica_set_id(replica, new_def->id);
4842
0
  else
4843
0
    replica = replicaset_add(new_def->id, &new_def->uuid);
4844
0
  on_rollback->data = replica;
4845
0
  txn_stmt_on_rollback(stmt, on_rollback);
4846
0
  return on_replace_dd_cluster_set_name(replica, new_def->name);
4847
0
}
4848
4849
/** _cluster delete - only the old tuple is present. */
4850
static int
4851
on_replace_dd_cluster_delete(const struct replica_def *old_def)
4852
0
{
4853
0
  struct txn_stmt *stmt = txn_current_stmt(in_txn());
4854
  /*
4855
   * It's okay to delete the instance id when the deletion is coming from
4856
   * the master or doing recovery (i.e., after we already applied the
4857
   * deletion from the master).
4858
   */
4859
0
  if (old_def->id == instance_id && stmt->row->replica_id == 0 &&
4860
0
      recovery_state == FINISHED_RECOVERY) {
4861
0
    diag_set(ClientError, ER_LOCAL_INSTANCE_ID_IS_READ_ONLY,
4862
0
       (unsigned)old_def->id);
4863
0
    return -1;
4864
0
  }
4865
0
  struct replica *replica = replica_by_id(old_def->id);
4866
0
  if (replica == NULL) {
4867
    /*
4868
     * Impossible, but it is important not to leave undefined
4869
     * behaviour if there is a bug. Too sensitive subsystem is
4870
     * affected.
4871
     */
4872
0
    panic("Tried to unregister a replica not stored in "
4873
0
          "replica_by_id map, id is %u, uuid is %s",
4874
0
          old_def->id, tt_uuid_str(&old_def->uuid));
4875
0
  }
4876
0
  if (!tt_uuid_is_equal(&replica->uuid, &old_def->uuid)) {
4877
0
    panic("Tried to unregister a replica with id %u, but its uuid "
4878
0
          "is different from stored internally: in space - %s, "
4879
0
          "internally - %s", old_def->id,
4880
0
          tt_uuid_str(&old_def->uuid), tt_uuid_str(&replica->uuid));
4881
0
  }
4882
  /*
4883
   * Unregister gc consumer of the replica. No-op on recovery because
4884
   * it's not safe to write to space during recovery and we don't need it
4885
   * anyway: consumer either was already deleted if it's local recovery or
4886
   * wasn't created at all if it's remote recovery since persistent
4887
   * consumers are local.
4888
   */
4889
0
  if (recovery_state == FINISHED_RECOVERY &&
4890
0
      gc_erase_consumer(&replica->uuid) != 0)
4891
0
    return -1;
4892
  /*
4893
   * Unregister only after commit. Otherwise if the transaction would be
4894
   * rolled back, there might be already another replica taken the freed
4895
   * ID.
4896
   */
4897
0
  struct trigger *on_commit = txn_alter_trigger_new(
4898
0
    on_replace_cluster_clear_id, replica);
4899
0
  if (on_commit == NULL)
4900
0
    return -1;
4901
0
  txn_stmt_on_commit(stmt, on_commit);
4902
0
  on_commit = txn_alter_trigger_new(
4903
0
    on_replace_cluster_update_quorum, replica);
4904
0
  if (on_commit == NULL)
4905
0
    return -1;
4906
0
  txn_stmt_on_commit(stmt, on_commit);
4907
0
  return 0;
4908
0
}
4909
4910
/** Space _cluster on-replace trigger. */
4911
static int
4912
on_replace_dd_cluster(struct trigger *trigger, void *event)
4913
0
{
4914
0
  (void) trigger;
4915
0
  struct txn *txn = (struct txn *) event;
4916
0
  struct txn_stmt *stmt = txn_current_stmt(txn);
4917
0
  struct replica_def *new_def = NULL;
4918
0
  struct replica_def *old_def = NULL;
4919
0
  if (stmt->new_tuple != NULL) {
4920
0
    new_def = replica_def_new_from_tuple(stmt->new_tuple,
4921
0
                 &txn->region);
4922
0
    if (new_def == NULL)
4923
0
      return -1;
4924
0
  }
4925
0
  if (stmt->old_tuple != NULL) {
4926
0
    old_def = replica_def_new_from_tuple(stmt->old_tuple,
4927
0
                 &txn->region);
4928
0
    if (old_def == NULL)
4929
0
      return -1;
4930
0
  }
4931
0
  if (new_def != NULL) {
4932
0
    if (old_def != NULL)
4933
0
      return on_replace_dd_cluster_update(old_def, new_def);
4934
0
    return on_replace_dd_cluster_insert(new_def);
4935
0
  }
4936
0
  assert(old_def != NULL);
4937
0
  return on_replace_dd_cluster_delete(old_def);
4938
0
}
4939
4940
/* }}} cluster configuration */
4941
4942
/* {{{ sequence */
4943
4944
/** Create a sequence definition from a tuple. */
4945
static struct sequence_def *
4946
sequence_def_new_from_tuple(struct tuple *tuple, uint32_t errcode)
4947
0
{
4948
0
  uint32_t name_len;
4949
0
  const char *name = tuple_field_str(tuple, BOX_USER_FIELD_NAME,
4950
0
             &name_len);
4951
0
  if (name == NULL)
4952
0
    return NULL;
4953
0
  if (name_len > BOX_NAME_MAX) {
4954
0
    diag_set(ClientError, errcode,
4955
0
        tt_cstr(name, BOX_INVALID_NAME_MAX),
4956
0
        "sequence name is too long");
4957
0
    return NULL;
4958
0
  }
4959
0
  if (identifier_check(name, name_len) != 0)
4960
0
    return NULL;
4961
0
  size_t sz = sequence_def_sizeof(name_len);
4962
0
  struct sequence_def *def = (struct sequence_def *) malloc(sz);
4963
0
  if (def == NULL) {
4964
0
    diag_set(OutOfMemory, sz, "malloc", "sequence");
4965
0
    return NULL;
4966
0
  }
4967
0
  auto def_guard = make_scoped_guard([=] { free(def); });
4968
0
  memcpy(def->name, name, name_len);
4969
0
  def->name[name_len] = '\0';
4970
0
  if (tuple_field_u32(tuple, BOX_SEQUENCE_FIELD_ID, &(def->id)) != 0)
4971
0
    return NULL;
4972
0
  if (tuple_field_u32(tuple, BOX_SEQUENCE_FIELD_UID, &(def->uid)) != 0)
4973
0
    return NULL;
4974
0
  if (tuple_field_i64(tuple, BOX_SEQUENCE_FIELD_STEP, &(def->step)) != 0)
4975
0
    return NULL;
4976
0
  if (tuple_field_i64(tuple, BOX_SEQUENCE_FIELD_MIN, &(def->min)) != 0)
4977
0
    return NULL;
4978
0
  if (tuple_field_i64(tuple, BOX_SEQUENCE_FIELD_MAX, &(def->max)) != 0)
4979
0
    return NULL;
4980
0
  if (tuple_field_i64(tuple, BOX_SEQUENCE_FIELD_START, &(def->start)) != 0)
4981
0
    return NULL;
4982
0
  if (tuple_field_i64(tuple, BOX_SEQUENCE_FIELD_CACHE, &(def->cache)) != 0)
4983
0
    return NULL;
4984
0
  if (tuple_field_bool(tuple, BOX_SEQUENCE_FIELD_CYCLE, &(def->cycle)) != 0)
4985
0
    return NULL;
4986
0
  if (def->step == 0) {
4987
0
    diag_set(ClientError, errcode, def->name,
4988
0
       "step option must be non-zero");
4989
0
    return NULL;
4990
0
  }
4991
0
  if (def->min > def->max) {
4992
0
    diag_set(ClientError, errcode, def->name,
4993
0
       "max must be greater than or equal to min");
4994
0
    return NULL;
4995
0
  }
4996
0
  if (def->start < def->min || def->start > def->max) {
4997
0
    diag_set(ClientError, errcode, def->name,
4998
0
       "start must be between min and max");
4999
0
    return NULL;
5000
0
  }
5001
0
  def_guard.is_active = false;
5002
0
  return def;
5003
0
}
5004
5005
static int
5006
on_create_sequence_rollback(struct trigger *trigger, void * /* event */)
5007
0
{
5008
  /* Remove the new sequence from the cache and delete it. */
5009
0
  struct sequence *seq = (struct sequence *)trigger->data;
5010
0
  sequence_cache_delete(seq->def->id);
5011
0
  if (trigger_run(&on_alter_sequence, seq) != 0)
5012
0
    return -1;
5013
0
  sequence_delete(seq);
5014
0
  return 0;
5015
0
}
5016
5017
static int
5018
on_drop_sequence_commit(struct trigger *trigger, void * /* event */)
5019
0
{
5020
  /* Delete the old sequence. */
5021
0
  struct sequence *seq = (struct sequence *)trigger->data;
5022
0
  sequence_delete(seq);
5023
0
  return 0;
5024
0
}
5025
5026
static int
5027
on_drop_sequence_rollback(struct trigger *trigger, void * /* event */)
5028
0
{
5029
  /* Insert the old sequence back into the cache. */
5030
0
  struct sequence *seq = (struct sequence *)trigger->data;
5031
0
  sequence_cache_insert(seq);
5032
0
  if (trigger_run(&on_alter_sequence, seq) != 0)
5033
0
    return -1;
5034
0
  return 0;
5035
0
}
5036
5037
5038
static int
5039
on_alter_sequence_commit(struct trigger *trigger, void * /* event */)
5040
0
{
5041
  /* Delete the old old sequence definition. */
5042
0
  struct sequence_def *def = (struct sequence_def *)trigger->data;
5043
0
  free(def);
5044
0
  return 0;
5045
0
}
5046
5047
static int
5048
on_alter_sequence_rollback(struct trigger *trigger, void * /* event */)
5049
0
{
5050
  /* Restore the old sequence definition. */
5051
0
  struct sequence_def *def = (struct sequence_def *)trigger->data;
5052
0
  struct sequence *seq = sequence_by_id(def->id);
5053
0
  assert(seq != NULL);
5054
0
  free(seq->def);
5055
0
  seq->def = def;
5056
0
  if (trigger_run(&on_alter_sequence, seq) != 0)
5057
0
    return -1;
5058
0
  return 0;
5059
0
}
5060
5061
/**
5062
 * A trigger invoked on replace in space _sequence.
5063
 * Used to alter a sequence definition.
5064
 */
5065
static int
5066
on_replace_dd_sequence(struct trigger * /* trigger */, void *event)
5067
0
{
5068
0
  struct txn *txn = (struct txn *) event;
5069
0
  struct txn_stmt *stmt = txn_current_stmt(txn);
5070
0
  struct tuple *old_tuple = stmt->old_tuple;
5071
0
  struct tuple *new_tuple = stmt->new_tuple;
5072
5073
0
  struct sequence_def *new_def = NULL;
5074
0
  auto def_guard = make_scoped_guard([&new_def] { free(new_def); });
5075
5076
0
  struct sequence *seq;
5077
0
  if (old_tuple == NULL && new_tuple != NULL) {   /* INSERT */
5078
0
    new_def = sequence_def_new_from_tuple(new_tuple,
5079
0
                  ER_CREATE_SEQUENCE);
5080
0
    if (new_def == NULL)
5081
0
      return -1;
5082
0
    if (access_check_ddl(new_def->name, new_def->uid, NULL,
5083
0
             SC_SEQUENCE, PRIV_C) != 0)
5084
0
      return -1;
5085
0
    struct trigger *on_rollback =
5086
0
      txn_alter_trigger_new(on_create_sequence_rollback, NULL);
5087
0
    if (on_rollback == NULL)
5088
0
      return -1;
5089
0
    seq = sequence_new(new_def);
5090
0
    sequence_cache_insert(seq);
5091
0
    on_rollback->data = seq;
5092
0
    txn_stmt_on_rollback(stmt, on_rollback);
5093
0
  } else if (old_tuple != NULL && new_tuple == NULL) { /* DELETE */
5094
0
    uint32_t id;
5095
0
    if (tuple_field_u32(old_tuple, BOX_SEQUENCE_DATA_FIELD_ID, &id) != 0)
5096
0
      return -1;
5097
0
    seq = sequence_by_id(id);
5098
0
    assert(seq != NULL);
5099
0
    if (access_check_ddl(seq->def->name, seq->def->uid, seq->access,
5100
0
             SC_SEQUENCE, PRIV_D) != 0)
5101
0
      return -1;
5102
0
    bool out;
5103
0
    if (space_has_data(BOX_SEQUENCE_DATA_ID, 0, id, &out) != 0)
5104
0
      return -1;
5105
0
    if (out) {
5106
0
      diag_set(ClientError, ER_DROP_SEQUENCE,
5107
0
          seq->def->name, "the sequence has data");
5108
0
      return -1;
5109
0
    }
5110
0
    if (space_has_data(BOX_SPACE_SEQUENCE_ID, 1, id, &out) != 0)
5111
0
      return -1;
5112
0
    if (out) {
5113
0
      diag_set(ClientError, ER_DROP_SEQUENCE,
5114
0
          seq->def->name, "the sequence is in use");
5115
0
      return -1;
5116
0
    }
5117
0
    if (schema_find_grants("sequence", seq->def->id, &out) != 0) {
5118
0
      return -1;
5119
0
    }
5120
0
    if (out) {
5121
0
      diag_set(ClientError, ER_DROP_SEQUENCE,
5122
0
          seq->def->name, "the sequence has grants");
5123
0
      return -1;
5124
0
    }
5125
0
    struct trigger *on_commit =
5126
0
      txn_alter_trigger_new(on_drop_sequence_commit, seq);
5127
0
    struct trigger *on_rollback =
5128
0
      txn_alter_trigger_new(on_drop_sequence_rollback, seq);
5129
0
    if (on_commit == NULL || on_rollback == NULL)
5130
0
      return -1;
5131
0
    sequence_cache_delete(seq->def->id);
5132
0
    txn_stmt_on_commit(stmt, on_commit);
5133
0
    txn_stmt_on_rollback(stmt, on_rollback);
5134
0
  } else {           /* UPDATE */
5135
0
    new_def = sequence_def_new_from_tuple(new_tuple,
5136
0
                  ER_ALTER_SEQUENCE);
5137
0
    if (new_def == NULL)
5138
0
      return -1;
5139
0
    seq = sequence_by_id(new_def->id);
5140
0
    assert(seq != NULL);
5141
0
    if (access_check_ddl(seq->def->name, seq->def->uid, seq->access,
5142
0
             SC_SEQUENCE, PRIV_A) != 0)
5143
0
      return -1;
5144
0
    struct trigger *on_commit =
5145
0
      txn_alter_trigger_new(on_alter_sequence_commit, seq->def);
5146
0
    struct trigger *on_rollback =
5147
0
      txn_alter_trigger_new(on_alter_sequence_rollback, seq->def);
5148
0
    if (on_commit == NULL || on_rollback == NULL)
5149
0
      return -1;
5150
0
    seq->def = new_def;
5151
0
    txn_stmt_on_commit(stmt, on_commit);
5152
0
    txn_stmt_on_rollback(stmt, on_rollback);
5153
0
  }
5154
5155
0
  def_guard.is_active = false;
5156
0
  if (trigger_run(&on_alter_sequence, seq) != 0)
5157
0
    return -1;
5158
0
  return 0;
5159
0
}
5160
5161
/** Restore the old sequence value on rollback. */
5162
static int
5163
on_drop_sequence_data_rollback(struct trigger *trigger, void * /* event */)
5164
0
{
5165
0
  struct tuple *tuple = (struct tuple *)trigger->data;
5166
0
  uint32_t id;
5167
0
  if (tuple_field_u32(tuple, BOX_SEQUENCE_DATA_FIELD_ID, &id) != 0)
5168
0
    return -1;
5169
0
  int64_t val;
5170
0
  if (tuple_field_i64(tuple, BOX_SEQUENCE_DATA_FIELD_VALUE, &val) != 0)
5171
0
    return -1;
5172
0
  struct sequence *seq = sequence_by_id(id);
5173
0
  assert(seq != NULL);
5174
0
  if (sequence_set(seq, val) != 0)
5175
0
    panic("Can't restore sequence value");
5176
0
  return 0;
5177
0
}
5178
5179
/**
5180
 * A trigger invoked on replace in space _sequence_data.
5181
 * Used to update a sequence value.
5182
 */
5183
static int
5184
on_replace_dd_sequence_data(struct trigger * /* trigger */, void *event)
5185
0
{
5186
0
  struct txn *txn = (struct txn *) event;
5187
0
  struct txn_stmt *stmt = txn_current_stmt(txn);
5188
0
  struct tuple *old_tuple = stmt->old_tuple;
5189
0
  struct tuple *new_tuple = stmt->new_tuple;
5190
5191
0
  uint32_t id;
5192
0
  if (tuple_field_u32(old_tuple ?: new_tuple, BOX_SEQUENCE_DATA_FIELD_ID,
5193
0
          &id) != 0)
5194
0
    return -1;
5195
0
  struct sequence *seq = sequence_by_id(id);
5196
0
  if (seq == NULL) {
5197
0
    diag_set(ClientError, ER_NO_SUCH_SEQUENCE, int2str(id));
5198
0
    return -1;
5199
0
  }
5200
0
  if (new_tuple != NULL) {     /* INSERT, UPDATE */
5201
0
    int64_t value;
5202
0
    if (tuple_field_i64(new_tuple, BOX_SEQUENCE_DATA_FIELD_VALUE,
5203
0
            &value) != 0)
5204
0
      return -1;
5205
0
    if (sequence_set(seq, value) != 0)
5206
0
      return -1;
5207
0
  } else {         /* DELETE */
5208
    /*
5209
     * A sequence isn't supposed to roll back to the old
5210
     * value if the transaction it was used in is aborted
5211
     * for some reason. However, if a sequence is dropped,
5212
     * we do want to restore the original sequence value
5213
     * on rollback.
5214
     */
5215
0
    struct trigger *on_rollback = txn_alter_trigger_new(
5216
0
        on_drop_sequence_data_rollback, old_tuple);
5217
0
    if (on_rollback == NULL)
5218
0
      return -1;
5219
0
    txn_stmt_on_rollback(stmt, on_rollback);
5220
0
    sequence_reset(seq);
5221
0
  }
5222
0
  return 0;
5223
0
}
5224
5225
/**
5226
 * Extract field number and path from _space_sequence tuple.
5227
 * The path is allocated using malloc().
5228
 */
5229
static int
5230
sequence_field_from_tuple(struct space *space, struct tuple *tuple,
5231
        char **path_ptr, uint32_t *out)
5232
0
{
5233
0
  struct index *pk = index_find(space, 0);
5234
0
  if (pk == NULL) {
5235
0
    return -1;
5236
0
  }
5237
0
  struct key_part *part = &pk->def->key_def->parts[0];
5238
0
  uint32_t fieldno = part->fieldno;
5239
0
  const char *path_raw = part->path;
5240
0
  uint32_t path_len = part->path_len;
5241
5242
  /* Sequence field was added in 2.2.1. */
5243
0
  if (tuple_field_count(tuple) > BOX_SPACE_SEQUENCE_FIELD_FIELDNO) {
5244
0
    if (tuple_field_u32(tuple, BOX_SPACE_SEQUENCE_FIELD_FIELDNO,
5245
0
            &fieldno) != 0)
5246
0
      return -1;
5247
0
    path_raw = tuple_field_str(tuple, BOX_SPACE_SEQUENCE_FIELD_PATH,
5248
0
             &path_len);
5249
0
    if (path_raw == NULL)
5250
0
      return -1;
5251
0
    if (path_len == 0)
5252
0
      path_raw = NULL;
5253
0
  }
5254
0
  if (index_def_check_sequence(pk->def, fieldno, path_raw, path_len,
5255
0
             space_name(space)) != 0)
5256
0
    return -1;
5257
0
  char *path = NULL;
5258
0
  if (path_raw != NULL) {
5259
0
    path = (char *)malloc(path_len + 1);
5260
0
    if (path == NULL) {
5261
0
      diag_set(OutOfMemory, path_len + 1,
5262
0
          "malloc", "sequence path");
5263
0
      return -1;
5264
0
    }
5265
0
    memcpy(path, path_raw, path_len);
5266
0
    path[path_len] = 0;
5267
0
  }
5268
0
  *path_ptr = path;
5269
0
  *out = fieldno;
5270
0
  return 0;
5271
0
}
5272
5273
/** Attach a sequence to a space on rollback in _space_sequence. */
5274
static int
5275
set_space_sequence(struct trigger *trigger, void * /* event */)
5276
0
{
5277
0
  struct tuple *tuple = (struct tuple *)trigger->data;
5278
0
  uint32_t space_id;
5279
0
  if (tuple_field_u32(tuple, BOX_SPACE_SEQUENCE_FIELD_ID, &space_id) != 0)
5280
0
    return -1;
5281
0
  uint32_t sequence_id;
5282
0
  if (tuple_field_u32(tuple, BOX_SPACE_SEQUENCE_FIELD_SEQUENCE_ID,
5283
0
          &sequence_id) != 0)
5284
0
    return -1;
5285
0
  bool is_generated;
5286
0
  if (tuple_field_bool(tuple, BOX_SPACE_SEQUENCE_FIELD_IS_GENERATED,
5287
0
    &is_generated) != 0)
5288
0
    return -1;
5289
0
  struct space *space = space_by_id(space_id);
5290
0
  assert(space != NULL);
5291
0
  struct sequence *seq = sequence_by_id(sequence_id);
5292
0
  assert(seq != NULL);
5293
0
  char *path;
5294
0
  uint32_t fieldno;
5295
0
  if (sequence_field_from_tuple(space, tuple, &path, &fieldno) != 0)
5296
0
    return -1;
5297
0
  seq->is_generated = is_generated;
5298
0
  space->sequence = seq;
5299
0
  space->sequence_fieldno = fieldno;
5300
0
  free(space->sequence_path);
5301
0
  space->sequence_path = path;
5302
0
  if (trigger_run(&on_alter_space, space) != 0)
5303
0
    return -1;
5304
0
  return 0;
5305
0
}
5306
5307
/** Detach a sequence from a space on rollback in _space_sequence. */
5308
static int
5309
clear_space_sequence(struct trigger *trigger, void * /* event */)
5310
0
{
5311
0
  struct tuple *tuple = (struct tuple *)trigger->data;
5312
0
  uint32_t space_id;
5313
0
  if (tuple_field_u32(tuple, BOX_SPACE_SEQUENCE_FIELD_ID, &space_id) != 0)
5314
0
    return -1;
5315
0
  struct space *space = space_by_id(space_id);
5316
0
  assert(space != NULL);
5317
0
  assert(space->sequence != NULL);
5318
0
  space->sequence->is_generated = false;
5319
0
  space->sequence = NULL;
5320
0
  space->sequence_fieldno = 0;
5321
0
  free(space->sequence_path);
5322
0
  space->sequence_path = NULL;
5323
0
  if (trigger_run(&on_alter_space, space) != 0)
5324
0
    return -1;
5325
0
  return 0;
5326
0
}
5327
5328
/**
5329
 * A trigger invoked on replace in space _space_sequence.
5330
 * Used to update space <-> sequence mapping.
5331
 */
5332
static int
5333
on_replace_dd_space_sequence(struct trigger * /* trigger */, void *event)
5334
0
{
5335
0
  struct txn *txn = (struct txn *) event;
5336
0
  struct txn_stmt *stmt = txn_current_stmt(txn);
5337
0
  struct tuple *tuple = stmt->new_tuple ? stmt->new_tuple : stmt->old_tuple;
5338
0
  uint32_t space_id;
5339
0
  if (tuple_field_u32(tuple, BOX_SPACE_SEQUENCE_FIELD_ID, &space_id) != 0)
5340
0
    return -1;
5341
0
  uint32_t sequence_id;
5342
0
  if (tuple_field_u32(tuple, BOX_SPACE_SEQUENCE_FIELD_SEQUENCE_ID,
5343
0
          &sequence_id) != 0)
5344
0
    return -1;
5345
0
  bool is_generated;
5346
0
  if (tuple_field_bool(tuple, BOX_SPACE_SEQUENCE_FIELD_IS_GENERATED,
5347
0
           &is_generated) != 0)
5348
0
    return -1;
5349
0
  struct space *space = space_cache_find(space_id);
5350
0
  if (space == NULL)
5351
0
    return -1;
5352
0
  if (space_is_temporary(space)) {
5353
0
    diag_set(ClientError, ER_SQL_EXECUTE,
5354
0
       "sequences are not supported for temporary spaces");
5355
0
    return -1;
5356
0
  }
5357
0
  struct sequence *seq = sequence_by_id(sequence_id);
5358
0
  if (seq == NULL) {
5359
0
    diag_set(ClientError, ER_NO_SUCH_SEQUENCE, int2str(sequence_id));
5360
0
    return -1;
5361
0
  }
5362
0
  if (stmt->new_tuple != NULL && stmt->old_tuple != NULL) {
5363
    /*
5364
     * Makes no sense to support update, it would
5365
     * complicate the code, and won't simplify
5366
     * anything else.
5367
     */
5368
0
    diag_set(ClientError, ER_UNSUPPORTED,
5369
0
       "space \"_space_sequence\"", "update");
5370
0
    return -1;
5371
0
  }
5372
0
  enum priv_type priv_type = stmt->new_tuple ? PRIV_C : PRIV_D;
5373
5374
  /* Check we have the correct access type on the sequence.  * */
5375
0
  if (is_generated || !stmt->new_tuple) {
5376
0
    if (access_check_ddl(seq->def->name, seq->def->uid, seq->access,
5377
0
             SC_SEQUENCE, priv_type) != 0)
5378
0
      return -1;
5379
0
  } else {
5380
    /*
5381
     * In case user wants to attach an existing sequence,
5382
     * check that it has read and write access.
5383
     */
5384
0
    if (access_check_ddl(seq->def->name, seq->def->uid, seq->access,
5385
0
             SC_SEQUENCE, PRIV_R) != 0)
5386
0
      return -1;
5387
0
    if (access_check_ddl(seq->def->name, seq->def->uid, seq->access,
5388
0
             SC_SEQUENCE, PRIV_W) != 0)
5389
0
      return -1;
5390
0
  }
5391
  /** Check we have alter access on space. */
5392
0
  if (access_check_ddl(space->def->name, space->def->uid, space->access,
5393
0
           SC_SPACE, PRIV_A) != 0)
5394
0
    return -1;
5395
5396
0
  if (stmt->new_tuple != NULL) {     /* INSERT, UPDATE */
5397
0
    char *sequence_path;
5398
0
    uint32_t sequence_fieldno;
5399
0
    if (sequence_field_from_tuple(space, tuple, &sequence_path,
5400
0
                &sequence_fieldno) != 0)
5401
0
      return -1;
5402
0
    auto sequence_path_guard = make_scoped_guard([=] {
5403
0
      free(sequence_path);
5404
0
    });
5405
0
    if (seq->is_generated) {
5406
0
      diag_set(ClientError, ER_ALTER_SPACE,
5407
0
          space_name(space),
5408
0
          "can not attach generated sequence");
5409
0
      return -1;
5410
0
    }
5411
0
    struct trigger *on_rollback;
5412
0
    if (stmt->old_tuple != NULL)
5413
0
      on_rollback = txn_alter_trigger_new(set_space_sequence,
5414
0
                  stmt->old_tuple);
5415
0
    else
5416
0
      on_rollback = txn_alter_trigger_new(clear_space_sequence,
5417
0
                  stmt->new_tuple);
5418
0
    if (on_rollback == NULL)
5419
0
      return -1;
5420
0
    seq->is_generated = is_generated;
5421
0
    space->sequence = seq;
5422
0
    space->sequence_fieldno = sequence_fieldno;
5423
0
    free(space->sequence_path);
5424
0
    space->sequence_path = sequence_path;
5425
0
    sequence_path_guard.is_active = false;
5426
0
    txn_stmt_on_rollback(stmt, on_rollback);
5427
0
  } else {         /* DELETE */
5428
0
    struct trigger *on_rollback;
5429
0
    on_rollback = txn_alter_trigger_new(set_space_sequence,
5430
0
                stmt->old_tuple);
5431
0
    if (on_rollback == NULL)
5432
0
      return -1;
5433
0
    assert(space->sequence == seq);
5434
0
    seq->is_generated = false;
5435
0
    space->sequence = NULL;
5436
0
    space->sequence_fieldno = 0;
5437
0
    free(space->sequence_path);
5438
0
    space->sequence_path = NULL;
5439
0
    txn_stmt_on_rollback(stmt, on_rollback);
5440
0
  }
5441
0
  if (trigger_run(&on_alter_space, space) != 0)
5442
0
    return -1;
5443
0
  return 0;
5444
0
}
5445
5446
/* }}} sequence */
5447
5448
/** Delete the new trigger on rollback of an INSERT statement. */
5449
static int
5450
on_create_trigger_rollback(struct trigger *trigger, void * /* event */)
5451
0
{
5452
0
  struct sql_trigger *old_trigger = (struct sql_trigger *)trigger->data;
5453
0
  struct sql_trigger *new_trigger;
5454
0
  int rc = sql_trigger_replace(sql_trigger_name(old_trigger),
5455
0
             sql_trigger_space_id(old_trigger),
5456
0
             NULL, &new_trigger);
5457
0
  (void)rc;
5458
0
  assert(rc == 0);
5459
0
  assert(new_trigger == old_trigger);
5460
0
  sql_trigger_delete(new_trigger);
5461
0
  return 0;
5462
0
}
5463
5464
/** Restore the old trigger on rollback of a DELETE statement. */
5465
static int
5466
on_drop_trigger_rollback(struct trigger *trigger, void * /* event */)
5467
0
{
5468
0
  struct sql_trigger *old_trigger = (struct sql_trigger *)trigger->data;
5469
0
  struct sql_trigger *new_trigger;
5470
0
  if (old_trigger == NULL)
5471
0
    return 0;
5472
0
  if (sql_trigger_replace(sql_trigger_name(old_trigger),
5473
0
        sql_trigger_space_id(old_trigger),
5474
0
        old_trigger, &new_trigger) != 0)
5475
0
    panic("Out of memory on insertion into trigger hash");
5476
0
  assert(new_trigger == NULL);
5477
0
  return 0;
5478
0
}
5479
5480
/**
5481
 * Restore the old trigger and delete the new trigger on rollback
5482
 * of a REPLACE statement.
5483
 */
5484
static int
5485
on_replace_trigger_rollback(struct trigger *trigger, void * /* event */)
5486
0
{
5487
0
  struct sql_trigger *old_trigger = (struct sql_trigger *)trigger->data;
5488
0
  struct sql_trigger *new_trigger;
5489
0
  if (sql_trigger_replace(sql_trigger_name(old_trigger),
5490
0
        sql_trigger_space_id(old_trigger),
5491
0
        old_trigger, &new_trigger) != 0)
5492
0
    panic("Out of memory on insertion into trigger hash");
5493
0
  sql_trigger_delete(new_trigger);
5494
0
  return 0;
5495
0
}
5496
5497
/**
5498
 * Trigger invoked on commit in the _trigger space.
5499
 * Drop useless old sql_trigger AST object if any.
5500
 */
5501
static int
5502
on_replace_trigger_commit(struct trigger *trigger, void * /* event */)
5503
0
{
5504
0
  struct sql_trigger *old_trigger = (struct sql_trigger *)trigger->data;
5505
0
  sql_trigger_delete(old_trigger);
5506
0
  return 0;
5507
0
}
5508
5509
/**
5510
 * A trigger invoked on replace in a space containing
5511
 * SQL triggers.
5512
 */
5513
static int
5514
on_replace_dd_trigger(struct trigger * /* trigger */, void *event)
5515
0
{
5516
0
  struct txn *txn = (struct txn *) event;
5517
0
  struct txn_stmt *stmt = txn_current_stmt(txn);
5518
0
  struct tuple *old_tuple = stmt->old_tuple;
5519
0
  struct tuple *new_tuple = stmt->new_tuple;
5520
5521
0
  struct trigger *on_rollback = txn_alter_trigger_new(NULL, NULL);
5522
0
  struct trigger *on_commit =
5523
0
    txn_alter_trigger_new(on_replace_trigger_commit, NULL);
5524
0
  if (on_commit == NULL || on_rollback == NULL)
5525
0
    return -1;
5526
5527
0
  if (old_tuple != NULL && new_tuple == NULL) {
5528
    /* DROP trigger. */
5529
0
    uint32_t trigger_name_len;
5530
0
    const char *trigger_name_src = tuple_field_str(old_tuple,
5531
0
      BOX_TRIGGER_FIELD_NAME, &trigger_name_len);
5532
0
    if (trigger_name_src == NULL)
5533
0
      return -1;
5534
0
    uint32_t space_id;
5535
0
    if (tuple_field_u32(old_tuple, BOX_TRIGGER_FIELD_SPACE_ID,
5536
0
            &space_id) != 0)
5537
0
      return -1;
5538
0
    RegionGuard region_guard(&fiber()->gc);
5539
0
    char *trigger_name = (char *)region_alloc(&fiber()->gc,
5540
0
                trigger_name_len + 1);
5541
0
    if (trigger_name == NULL)
5542
0
      return -1;
5543
0
    memcpy(trigger_name, trigger_name_src, trigger_name_len);
5544
0
    trigger_name[trigger_name_len] = 0;
5545
5546
0
    struct sql_trigger *old_trigger;
5547
0
    int rc = sql_trigger_replace(trigger_name, space_id, NULL,
5548
0
               &old_trigger);
5549
0
    (void)rc;
5550
0
    assert(rc == 0);
5551
5552
0
    on_commit->data = old_trigger;
5553
0
    on_rollback->data = old_trigger;
5554
0
    on_rollback->run = on_drop_trigger_rollback;
5555
0
  } else {
5556
    /* INSERT, REPLACE trigger. */
5557
0
    uint32_t trigger_name_len;
5558
0
    const char *trigger_name_src = tuple_field_str(new_tuple,
5559
0
      BOX_TRIGGER_FIELD_NAME, &trigger_name_len);
5560
0
    if (trigger_name_src == NULL)
5561
0
      return -1;
5562
0
    const char *space_opts = tuple_field_with_type(new_tuple,
5563
0
        BOX_TRIGGER_FIELD_OPTS,MP_MAP);
5564
0
    if (space_opts == NULL)
5565
0
      return -1;
5566
0
    struct space_opts opts;
5567
0
    struct region *region = &fiber()->gc;
5568
0
    RegionGuard region_guard(region);
5569
0
    if (space_opts_decode(&opts, space_opts, region) != 0)
5570
0
      return -1;
5571
0
    struct sql_trigger *new_trigger = sql_trigger_compile(opts.sql);
5572
0
    if (new_trigger == NULL)
5573
0
      return -1;
5574
5575
0
    auto new_trigger_guard = make_scoped_guard([=] {
5576
0
      sql_trigger_delete(new_trigger);
5577
0
    });
5578
5579
0
    const char *trigger_name = sql_trigger_name(new_trigger);
5580
0
    if (strlen(trigger_name) != trigger_name_len ||
5581
0
        memcmp(trigger_name_src, trigger_name,
5582
0
         trigger_name_len) != 0) {
5583
0
      diag_set(ClientError, ER_SQL_EXECUTE,
5584
0
          "trigger name does not match extracted "
5585
0
          "from SQL");
5586
0
      return -1;
5587
0
    }
5588
0
    uint32_t space_id;
5589
0
    if (tuple_field_u32(new_tuple, BOX_TRIGGER_FIELD_SPACE_ID,
5590
0
            &space_id) != 0)
5591
0
      return -1;
5592
0
    if (space_id != sql_trigger_space_id(new_trigger)) {
5593
0
      diag_set(ClientError, ER_SQL_EXECUTE,
5594
0
          "trigger space_id does not match the value "
5595
0
          "resolved on AST building from SQL");
5596
0
      return -1;
5597
0
    }
5598
0
    struct space *space = space_cache_find(space_id);
5599
0
    if (space != NULL && space_is_temporary(space)) {
5600
0
      diag_set(ClientError, ER_SQL_EXECUTE,
5601
0
         "triggers are not supported for "
5602
0
         "temporary spaces");
5603
0
      return -1;
5604
0
    }
5605
5606
0
    struct sql_trigger *old_trigger;
5607
0
    if (sql_trigger_replace(trigger_name,
5608
0
          sql_trigger_space_id(new_trigger),
5609
0
          new_trigger, &old_trigger) != 0)
5610
0
      return -1;
5611
5612
0
    on_commit->data = old_trigger;
5613
0
    if (old_tuple != NULL) {
5614
0
      on_rollback->data = old_trigger;
5615
0
      on_rollback->run = on_replace_trigger_rollback;
5616
0
    } else {
5617
0
      on_rollback->data = new_trigger;
5618
0
      on_rollback->run = on_create_trigger_rollback;
5619
0
    }
5620
0
    new_trigger_guard.is_active = false;
5621
0
  }
5622
5623
0
  txn_stmt_on_rollback(stmt, on_rollback);
5624
0
  txn_stmt_on_commit(stmt, on_commit);
5625
0
  box_schema_version_bump();
5626
0
  return 0;
5627
0
}
5628
5629
/** A trigger invoked on replace in the _func_index space. */
5630
static int
5631
on_replace_dd_func_index(struct trigger *trigger, void *event)
5632
0
{
5633
0
  (void) trigger;
5634
0
  struct txn *txn = (struct txn *) event;
5635
0
  struct txn_stmt *stmt = txn_current_stmt(txn);
5636
0
  struct tuple *old_tuple = stmt->old_tuple;
5637
0
  struct tuple *new_tuple = stmt->new_tuple;
5638
5639
0
  struct alter_space *alter = NULL;
5640
0
  struct func *func = NULL;
5641
0
  struct index *index;
5642
0
  struct space *space;
5643
0
  if (old_tuple == NULL && new_tuple != NULL) {
5644
0
    uint32_t space_id;
5645
0
    uint32_t index_id;
5646
0
    uint32_t fid;
5647
0
    if (tuple_field_u32(new_tuple, BOX_FUNC_INDEX_FIELD_SPACE_ID,
5648
0
            &space_id) != 0)
5649
0
      return -1;
5650
0
    if (tuple_field_u32(new_tuple, BOX_FUNC_INDEX_FIELD_INDEX_ID,
5651
0
            &index_id) != 0)
5652
0
      return -1;
5653
0
    if (tuple_field_u32(new_tuple, BOX_FUNC_INDEX_FUNCTION_ID,
5654
0
                  &fid) != 0)
5655
0
      return -1;
5656
0
    space = space_cache_find(space_id);
5657
0
    if (space == NULL)
5658
0
      return -1;
5659
0
    if (space_is_temporary(space)) {
5660
0
      diag_set(ClientError, ER_UNSUPPORTED,
5661
0
         "temporary space", "functional indexes");
5662
0
      return -1;
5663
0
    }
5664
0
    index = index_find(space, index_id);
5665
0
    if (index == NULL)
5666
0
      return -1;
5667
0
    func = func_by_id(fid);
5668
0
    if (func == NULL) {
5669
0
      diag_set(ClientError, ER_NO_SUCH_FUNCTION, int2str(fid));
5670
0
      return -1;
5671
0
    }
5672
    /*
5673
     * These checks are duplicated from the _index's on_replace
5674
     * trigger in order to perform required checks during recovery.
5675
     *
5676
     * See the comment above the same checks in the
5677
     * index_def_new_from_tuple function.
5678
     */
5679
0
    if (func_access_check(func) != 0)
5680
0
      return -1;
5681
0
    if (func_index_check_func(func) != 0)
5682
0
      return -1;
5683
0
    if (index->def->opts.func_id != func->def->fid) {
5684
0
      diag_set(ClientError, ER_WRONG_INDEX_OPTIONS,
5685
0
         "Function ids defined in _index and "
5686
0
         "_func_index don't match");
5687
0
      return -1;
5688
0
    }
5689
0
  } else if (old_tuple != NULL && new_tuple == NULL) {
5690
0
    uint32_t space_id;
5691
0
    uint32_t index_id;
5692
0
    if (tuple_field_u32(old_tuple, BOX_FUNC_INDEX_FIELD_SPACE_ID,
5693
0
            &space_id) != 0)
5694
0
      return -1;
5695
0
    if (tuple_field_u32(old_tuple, BOX_FUNC_INDEX_FIELD_INDEX_ID,
5696
0
            &index_id) != 0)
5697
0
      return -1;
5698
0
    space = space_cache_find(space_id);
5699
0
    if (space == NULL)
5700
0
      return -1;
5701
0
    index = index_find(space, index_id);
5702
0
    if (index == NULL)
5703
0
      return -1;
5704
0
    func = NULL;
5705
0
  } else {
5706
0
    assert(old_tuple != NULL && new_tuple != NULL);
5707
0
    diag_set(ClientError, ER_UNSUPPORTED, "functional index", "alter");
5708
0
    return -1;
5709
0
  }
5710
5711
  /**
5712
   * Index is already initialized for corresponding
5713
   * function. Index rebuild is not required.
5714
   */
5715
0
  if (index_def_get_func(index->def) == func)
5716
0
    return 0;
5717
5718
0
  alter = alter_space_new(space);
5719
0
  if (alter == NULL)
5720
0
    return -1;
5721
0
  auto scoped_guard = make_scoped_guard([=] {alter_space_delete(alter);});
5722
0
  if (alter_space_move_indexes(alter, 0, index->def->iid) != 0)
5723
0
    return -1;
5724
0
  if (func != NULL) {
5725
    /* Set func, rebuild the functional index. */
5726
0
    try {
5727
0
      (void)new RebuildFuncIndex(alter, index->def, func);
5728
0
    } catch (Exception *e) {
5729
0
      return -1;
5730
0
    }
5731
0
  } else {
5732
    /* Reset func, disable the functional index. */
5733
0
    try {
5734
0
      (void)new DisableFuncIndex(alter, index->def);
5735
0
    } catch (Exception *e) {
5736
0
      return -1;
5737
0
    }
5738
0
  }
5739
0
  if (alter_space_move_indexes(alter, index->def->iid + 1,
5740
0
             space->index_id_max + 1) != 0)
5741
0
    return -1;
5742
0
  try {
5743
0
    (void) new UpdateSchemaVersion(alter);
5744
0
    alter_space_do(stmt, alter);
5745
0
  } catch (Exception *e) {
5746
0
    return -1;
5747
0
  }
5748
5749
0
  scoped_guard.is_active = false;
5750
0
  return 0;
5751
0
}
5752
5753
/*
 * File-scope trigger definitions, one per system-space on_replace
 * (or before_replace) handler in this file. In each invocation the
 * first argument is the name being defined and the second is the
 * handler function.
 * NOTE(review): the exact expansion (presumably a `struct trigger`
 * object initialized with the handler) comes from the TRIGGER macro,
 * which is defined outside this chunk - confirm there.
 */
TRIGGER(alter_space_on_replace_space, on_replace_dd_space);
5754
TRIGGER(alter_space_on_replace_index, on_replace_dd_index);
5755
TRIGGER(on_replace_truncate, on_replace_dd_truncate);
5756
TRIGGER(on_replace_schema, on_replace_dd_schema);
5757
TRIGGER(before_replace_schema, before_replace_dd_schema);
5758
TRIGGER(on_replace_user, on_replace_dd_user);
5759
TRIGGER(on_replace_func, on_replace_dd_func);
5760
TRIGGER(on_replace_collation, on_replace_dd_collation);
5761
TRIGGER(on_replace_priv, on_replace_dd_priv);
5762
TRIGGER(on_replace_cluster, on_replace_dd_cluster);
5763
TRIGGER(on_replace_sequence, on_replace_dd_sequence);
5764
TRIGGER(on_replace_sequence_data, on_replace_dd_sequence_data);
5765
TRIGGER(on_replace_space_sequence, on_replace_dd_space_sequence);
5766
TRIGGER(on_replace_trigger, on_replace_dd_trigger);
5767
TRIGGER(on_replace_func_index, on_replace_dd_func_index);
5768
/* vim: set foldmethod=marker */