Coverage Report

Created: 2025-06-15 06:31

/src/postgres/src/backend/access/brin/brin_validate.c
Line
Count
Source (jump to first uncovered line)
1
/*-------------------------------------------------------------------------
2
 *
3
 * brin_validate.c
4
 *    Opclass validator for BRIN.
5
 *
6
 * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
7
 * Portions Copyright (c) 1994, Regents of the University of California
8
 *
9
 * IDENTIFICATION
10
 *    src/backend/access/brin/brin_validate.c
11
 *
12
 *-------------------------------------------------------------------------
13
 */
14
#include "postgres.h"
15
16
#include "access/amvalidate.h"
17
#include "access/brin_internal.h"
18
#include "access/htup_details.h"
19
#include "catalog/pg_amop.h"
20
#include "catalog/pg_amproc.h"
21
#include "catalog/pg_opclass.h"
22
#include "catalog/pg_type.h"
23
#include "utils/builtins.h"
24
#include "utils/lsyscache.h"
25
#include "utils/regproc.h"
26
#include "utils/syscache.h"
27
28
/*
29
 * Validator for a BRIN opclass.
30
 *
31
 * Some of the checks done here cover the whole opfamily, and therefore are
32
 * redundant when checking each opclass in a family.  But they don't run long
33
 * enough to be much of a problem, so we accept the duplication rather than
34
 * complicate the amvalidate API.
35
 */
36
bool
37
brinvalidate(Oid opclassoid)
38
0
{
39
0
  bool    result = true;
40
0
  HeapTuple classtup;
41
0
  Form_pg_opclass classform;
42
0
  Oid     opfamilyoid;
43
0
  Oid     opcintype;
44
0
  char     *opclassname;
45
0
  char     *opfamilyname;
46
0
  CatCList   *proclist,
47
0
         *oprlist;
48
0
  uint64    allfuncs = 0;
49
0
  uint64    allops = 0;
50
0
  List     *grouplist;
51
0
  OpFamilyOpFuncGroup *opclassgroup;
52
0
  int     i;
53
0
  ListCell   *lc;
54
55
  /* Fetch opclass information */
56
0
  classtup = SearchSysCache1(CLAOID, ObjectIdGetDatum(opclassoid));
57
0
  if (!HeapTupleIsValid(classtup))
58
0
    elog(ERROR, "cache lookup failed for operator class %u", opclassoid);
59
0
  classform = (Form_pg_opclass) GETSTRUCT(classtup);
60
61
0
  opfamilyoid = classform->opcfamily;
62
0
  opcintype = classform->opcintype;
63
0
  opclassname = NameStr(classform->opcname);
64
65
  /* Fetch opfamily information */
66
0
  opfamilyname = get_opfamily_name(opfamilyoid, false);
67
68
  /* Fetch all operators and support functions of the opfamily */
69
0
  oprlist = SearchSysCacheList1(AMOPSTRATEGY, ObjectIdGetDatum(opfamilyoid));
70
0
  proclist = SearchSysCacheList1(AMPROCNUM, ObjectIdGetDatum(opfamilyoid));
71
72
  /* Check individual support functions */
73
0
  for (i = 0; i < proclist->n_members; i++)
74
0
  {
75
0
    HeapTuple proctup = &proclist->members[i]->tuple;
76
0
    Form_pg_amproc procform = (Form_pg_amproc) GETSTRUCT(proctup);
77
0
    bool    ok;
78
79
    /* Check procedure numbers and function signatures */
80
0
    switch (procform->amprocnum)
81
0
    {
82
0
      case BRIN_PROCNUM_OPCINFO:
83
0
        ok = check_amproc_signature(procform->amproc, INTERNALOID, true,
84
0
                      1, 1, INTERNALOID);
85
0
        break;
86
0
      case BRIN_PROCNUM_ADDVALUE:
87
0
        ok = check_amproc_signature(procform->amproc, BOOLOID, true,
88
0
                      4, 4, INTERNALOID, INTERNALOID,
89
0
                      INTERNALOID, INTERNALOID);
90
0
        break;
91
0
      case BRIN_PROCNUM_CONSISTENT:
92
0
        ok = check_amproc_signature(procform->amproc, BOOLOID, true,
93
0
                      3, 4, INTERNALOID, INTERNALOID,
94
0
                      INTERNALOID, INT4OID);
95
0
        break;
96
0
      case BRIN_PROCNUM_UNION:
97
0
        ok = check_amproc_signature(procform->amproc, BOOLOID, true,
98
0
                      3, 3, INTERNALOID, INTERNALOID,
99
0
                      INTERNALOID);
100
0
        break;
101
0
      case BRIN_PROCNUM_OPTIONS:
102
0
        ok = check_amoptsproc_signature(procform->amproc);
103
0
        break;
104
0
      default:
105
        /* Complain if it's not a valid optional proc number */
106
0
        if (procform->amprocnum < BRIN_FIRST_OPTIONAL_PROCNUM ||
107
0
          procform->amprocnum > BRIN_LAST_OPTIONAL_PROCNUM)
108
0
        {
109
0
          ereport(INFO,
110
0
              (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
111
0
               errmsg("operator family \"%s\" of access method %s contains function %s with invalid support number %d",
112
0
                  opfamilyname, "brin",
113
0
                  format_procedure(procform->amproc),
114
0
                  procform->amprocnum)));
115
0
          result = false;
116
0
          continue; /* omit bad proc numbers from allfuncs */
117
0
        }
118
        /* Can't check signatures of optional procs, so assume OK */
119
0
        ok = true;
120
0
        break;
121
0
    }
122
123
0
    if (!ok)
124
0
    {
125
0
      ereport(INFO,
126
0
          (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
127
0
           errmsg("operator family \"%s\" of access method %s contains function %s with wrong signature for support number %d",
128
0
              opfamilyname, "brin",
129
0
              format_procedure(procform->amproc),
130
0
              procform->amprocnum)));
131
0
      result = false;
132
0
    }
133
134
    /* Track all valid procedure numbers seen in opfamily */
135
0
    allfuncs |= ((uint64) 1) << procform->amprocnum;
136
0
  }
137
138
  /* Check individual operators */
139
0
  for (i = 0; i < oprlist->n_members; i++)
140
0
  {
141
0
    HeapTuple oprtup = &oprlist->members[i]->tuple;
142
0
    Form_pg_amop oprform = (Form_pg_amop) GETSTRUCT(oprtup);
143
144
    /* Check that only allowed strategy numbers exist */
145
0
    if (oprform->amopstrategy < 1 || oprform->amopstrategy > 63)
146
0
    {
147
0
      ereport(INFO,
148
0
          (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
149
0
           errmsg("operator family \"%s\" of access method %s contains operator %s with invalid strategy number %d",
150
0
              opfamilyname, "brin",
151
0
              format_operator(oprform->amopopr),
152
0
              oprform->amopstrategy)));
153
0
      result = false;
154
0
    }
155
0
    else
156
0
    {
157
      /*
158
       * The set of operators supplied varies across BRIN opfamilies.
159
       * Our plan is to identify all operator strategy numbers used in
160
       * the opfamily and then complain about datatype combinations that
161
       * are missing any operator(s).  However, consider only numbers
162
       * that appear in some non-cross-type case, since cross-type
163
       * operators may have unique strategies.  (This is not a great
164
       * heuristic, in particular an erroneous number used in a
165
       * cross-type operator will not get noticed; but the core BRIN
166
       * opfamilies are messy enough to make it necessary.)
167
       */
168
0
      if (oprform->amoplefttype == oprform->amoprighttype)
169
0
        allops |= ((uint64) 1) << oprform->amopstrategy;
170
0
    }
171
172
    /* brin doesn't support ORDER BY operators */
173
0
    if (oprform->amoppurpose != AMOP_SEARCH ||
174
0
      OidIsValid(oprform->amopsortfamily))
175
0
    {
176
0
      ereport(INFO,
177
0
          (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
178
0
           errmsg("operator family \"%s\" of access method %s contains invalid ORDER BY specification for operator %s",
179
0
              opfamilyname, "brin",
180
0
              format_operator(oprform->amopopr))));
181
0
      result = false;
182
0
    }
183
184
    /* Check operator signature --- same for all brin strategies */
185
0
    if (!check_amop_signature(oprform->amopopr, BOOLOID,
186
0
                  oprform->amoplefttype,
187
0
                  oprform->amoprighttype))
188
0
    {
189
0
      ereport(INFO,
190
0
          (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
191
0
           errmsg("operator family \"%s\" of access method %s contains operator %s with wrong signature",
192
0
              opfamilyname, "brin",
193
0
              format_operator(oprform->amopopr))));
194
0
      result = false;
195
0
    }
196
0
  }
197
198
  /* Now check for inconsistent groups of operators/functions */
199
0
  grouplist = identify_opfamily_groups(oprlist, proclist);
200
0
  opclassgroup = NULL;
201
0
  foreach(lc, grouplist)
202
0
  {
203
0
    OpFamilyOpFuncGroup *thisgroup = (OpFamilyOpFuncGroup *) lfirst(lc);
204
205
    /* Remember the group exactly matching the test opclass */
206
0
    if (thisgroup->lefttype == opcintype &&
207
0
      thisgroup->righttype == opcintype)
208
0
      opclassgroup = thisgroup;
209
210
    /*
211
     * Some BRIN opfamilies expect cross-type support functions to exist,
212
     * and some don't.  We don't know exactly which are which, so if we
213
     * find a cross-type operator for which there are no support functions
214
     * at all, let it pass.  (Don't expect that all operators exist for
215
     * such cross-type cases, either.)
216
     */
217
0
    if (thisgroup->functionset == 0 &&
218
0
      thisgroup->lefttype != thisgroup->righttype)
219
0
      continue;
220
221
    /*
222
     * Else complain if there seems to be an incomplete set of either
223
     * operators or support functions for this datatype pair.
224
     */
225
0
    if (thisgroup->operatorset != allops)
226
0
    {
227
0
      ereport(INFO,
228
0
          (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
229
0
           errmsg("operator family \"%s\" of access method %s is missing operator(s) for types %s and %s",
230
0
              opfamilyname, "brin",
231
0
              format_type_be(thisgroup->lefttype),
232
0
              format_type_be(thisgroup->righttype))));
233
0
      result = false;
234
0
    }
235
0
    if (thisgroup->functionset != allfuncs)
236
0
    {
237
0
      ereport(INFO,
238
0
          (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
239
0
           errmsg("operator family \"%s\" of access method %s is missing support function(s) for types %s and %s",
240
0
              opfamilyname, "brin",
241
0
              format_type_be(thisgroup->lefttype),
242
0
              format_type_be(thisgroup->righttype))));
243
0
      result = false;
244
0
    }
245
0
  }
246
247
  /* Check that the originally-named opclass is complete */
248
0
  if (!opclassgroup || opclassgroup->operatorset != allops)
249
0
  {
250
0
    ereport(INFO,
251
0
        (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
252
0
         errmsg("operator class \"%s\" of access method %s is missing operator(s)",
253
0
            opclassname, "brin")));
254
0
    result = false;
255
0
  }
256
0
  for (i = 1; i <= BRIN_MANDATORY_NPROCS; i++)
257
0
  {
258
0
    if (opclassgroup &&
259
0
      (opclassgroup->functionset & (((int64) 1) << i)) != 0)
260
0
      continue;     /* got it */
261
0
    ereport(INFO,
262
0
        (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
263
0
         errmsg("operator class \"%s\" of access method %s is missing support function %d",
264
0
            opclassname, "brin", i)));
265
0
    result = false;
266
0
  }
267
268
0
  ReleaseCatCacheList(proclist);
269
0
  ReleaseCatCacheList(oprlist);
270
0
  ReleaseSysCache(classtup);
271
272
0
  return result;
273
0
}