/src/postgres/src/backend/access/gin/gininsert.c
Line | Count | Source |
1 | | /*------------------------------------------------------------------------- |
2 | | * |
3 | | * gininsert.c |
4 | | * insert routines for the postgres inverted index access method. |
5 | | * |
6 | | * |
7 | | * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group |
8 | | * Portions Copyright (c) 1994, Regents of the University of California |
9 | | * |
10 | | * IDENTIFICATION |
11 | | * src/backend/access/gin/gininsert.c |
12 | | *------------------------------------------------------------------------- |
13 | | */ |
14 | | |
15 | | #include "postgres.h" |
16 | | |
17 | | #include "access/gin_private.h" |
18 | | #include "access/gin_tuple.h" |
19 | | #include "access/parallel.h" |
20 | | #include "access/table.h" |
21 | | #include "access/tableam.h" |
22 | | #include "access/xloginsert.h" |
23 | | #include "catalog/index.h" |
24 | | #include "catalog/pg_collation.h" |
25 | | #include "commands/progress.h" |
26 | | #include "miscadmin.h" |
27 | | #include "nodes/execnodes.h" |
28 | | #include "pgstat.h" |
29 | | #include "storage/bufmgr.h" |
30 | | #include "storage/predicate.h" |
31 | | #include "tcop/tcopprot.h" |
32 | | #include "utils/builtins.h" |
33 | | #include "utils/datum.h" |
34 | | #include "utils/memutils.h" |
35 | | #include "utils/rel.h" |
36 | | #include "utils/typcache.h" |
37 | | |
38 | | /* Magic numbers for parallel state sharing */ |
39 | 0 | #define PARALLEL_KEY_GIN_SHARED UINT64CONST(0xB000000000000001) |
40 | 0 | #define PARALLEL_KEY_TUPLESORT UINT64CONST(0xB000000000000002) |
41 | 0 | #define PARALLEL_KEY_QUERY_TEXT UINT64CONST(0xB000000000000003) |
42 | 0 | #define PARALLEL_KEY_WAL_USAGE UINT64CONST(0xB000000000000004) |
43 | 0 | #define PARALLEL_KEY_BUFFER_USAGE UINT64CONST(0xB000000000000005) |
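
These five keys tag the chunks this build stores in the DSM table of contents. A worker attaching to the build has to look the same keys up again; below is a minimal sketch of the worker-side lookups (the matching _gin_parallel_build_main is outside this excerpt, so the surrounding variable names are assumptions):

    /* Sketch: worker side; toc is the worker's shm_toc pointer. */
    GinBuildShared *ginshared = (GinBuildShared *)
        shm_toc_lookup(toc, PARALLEL_KEY_GIN_SHARED, false);
    Sharedsort *sharedsort = (Sharedsort *)
        shm_toc_lookup(toc, PARALLEL_KEY_TUPLESORT, false);
    char       *sharedquery = (char *)
        shm_toc_lookup(toc, PARALLEL_KEY_QUERY_TEXT, true);  /* optional chunk */
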
44 | | |
45 | | /* |
46 | | * Status for index builds performed in parallel. This is allocated in a |
47 | | * dynamic shared memory segment. |
48 | | */ |
49 | | typedef struct GinBuildShared |
50 | | { |
51 | | /* |
52 | | * These fields are not modified during the build. They primarily exist |
53 | | * for the benefit of worker processes that need to create state |
54 | | * corresponding to that used by the leader. |
55 | | */ |
56 | | Oid heaprelid; |
57 | | Oid indexrelid; |
58 | | bool isconcurrent; |
59 | | int scantuplesortstates; |
60 | | |
61 | | /* |
62 | | * workersdonecv is used to monitor the progress of workers. All parallel |
63 | | * participants must indicate that they are done before leader can use |
64 | | * results built by the workers (and before leader can write the data into |
65 | | * the index). |
66 | | */ |
67 | | ConditionVariable workersdonecv; |
68 | | |
69 | | /* |
70 | | * mutex protects all following fields |
71 | | * |
72 | | * These fields contain status information of interest to GIN index builds |
73 | | * that must work just the same when an index is built in parallel. |
74 | | */ |
75 | | slock_t mutex; |
76 | | |
77 | | /* |
78 | | * Mutable state that is maintained by workers, and reported back to |
79 | | * leader at end of the scans. |
80 | | * |
81 | | * nparticipantsdone is number of worker processes finished. |
82 | | * |
83 | | * reltuples is the total number of input heap tuples. |
84 | | * |
85 | | * indtuples is the total number of tuples that made it into the index. |
86 | | */ |
87 | | int nparticipantsdone; |
88 | | double reltuples; |
89 | | double indtuples; |
90 | | |
91 | | /* |
92 | | * ParallelTableScanDescData data follows. Can't directly embed here, as |
93 | | * implementations of the parallel table scan desc interface might need |
94 | | * stronger alignment. |
95 | | */ |
96 | | } GinBuildShared; |
97 | | |
98 | | /* |
99 | | * Return pointer to a GinBuildShared's parallel table scan. |
100 | | * |
101 | | * c.f. shm_toc_allocate as to why BUFFERALIGN is used, rather than just |
102 | | * MAXALIGN. |
103 | | */ |
104 | | #define ParallelTableScanFromGinBuildShared(shared) \ |
105 | 0 | (ParallelTableScanDesc) ((char *) (shared) + BUFFERALIGN(sizeof(GinBuildShared))) |
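
Each participant turns the shared struct into a scan descriptor with this macro. A hedged sketch of how a participant would join the scan (this code lives in the participant path, which is not part of this excerpt; heap, index, indexInfo, state, progress and ginshared are assumed to be in scope):

    /* Sketch: join the shared scan and feed the parallel build callback. */
    TableScanDesc scan = table_beginscan_parallel(heap,
                             ParallelTableScanFromGinBuildShared(ginshared));
    double reltuples = table_index_build_scan(heap, index, indexInfo,
                                              true,     /* allow_sync */
                                              progress,
                                              ginBuildCallbackParallel,
                                              state, scan);
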
106 | | |
107 | | /* |
108 | | * Status for leader in parallel index build. |
109 | | */ |
110 | | typedef struct GinLeader |
111 | | { |
112 | | /* parallel context itself */ |
113 | | ParallelContext *pcxt; |
114 | | |
115 | | /* |
116 | | * nparticipanttuplesorts is the exact number of worker processes |
117 | | * successfully launched, plus one leader process if it participates as a |
118 | | * worker (only DISABLE_LEADER_PARTICIPATION builds avoid leader |
119 | | * participating as a worker). |
120 | | */ |
121 | | int nparticipanttuplesorts; |
122 | | |
123 | | /* |
124 | | * Leader process convenience pointers to shared state (leader avoids TOC |
125 | | * lookups). |
126 | | * |
127 | | * GinBuildShared is the shared state for entire build. sharedsort is the |
128 | | * shared, tuplesort-managed state passed to each process tuplesort. |
129 | | * snapshot is the snapshot used by the scan iff an MVCC snapshot is |
130 | | * required. |
131 | | */ |
132 | | GinBuildShared *ginshared; |
133 | | Sharedsort *sharedsort; |
134 | | Snapshot snapshot; |
135 | | WalUsage *walusage; |
136 | | BufferUsage *bufferusage; |
137 | | } GinLeader; |
138 | | |
139 | | typedef struct |
140 | | { |
141 | | GinState ginstate; |
142 | | double indtuples; |
143 | | GinStatsData buildStats; |
144 | | MemoryContext tmpCtx; |
145 | | MemoryContext funcCtx; |
146 | | BuildAccumulator accum; |
147 | | ItemPointerData tid; |
148 | | int work_mem; |
149 | | |
150 | | /* |
151 | | * bs_leader is only present when a parallel index build is performed, and |
152 | | * only in the leader process. |
153 | | */ |
154 | | GinLeader *bs_leader; |
155 | | int bs_worker_id; |
156 | | |
157 | | /* used to pass information from workers to leader */ |
158 | | double bs_numtuples; |
159 | | double bs_reltuples; |
160 | | |
161 | | /* |
162 | | * The sortstate is used by workers (including the leader). It has to be |
163 | | * part of the build state, because that's the only thing passed to the |
164 | | * build callback etc. |
165 | | */ |
166 | | Tuplesortstate *bs_sortstate; |
167 | | |
168 | | /* |
169 | | * The sortstate used only within a single worker for the first merge pass |
170 | | * happening there. In principle it doesn't need to be part of the build |
171 | | * state and we could pass it around directly, but it's more convenient |
172 | | * this way. And it's part of the build state, after all. |
173 | | */ |
174 | | Tuplesortstate *bs_worker_sort; |
175 | | } GinBuildState; |
176 | | |
177 | | |
178 | | /* parallel index builds */ |
179 | | static void _gin_begin_parallel(GinBuildState *buildstate, Relation heap, Relation index, |
180 | | bool isconcurrent, int request); |
181 | | static void _gin_end_parallel(GinLeader *ginleader, GinBuildState *state); |
182 | | static Size _gin_parallel_estimate_shared(Relation heap, Snapshot snapshot); |
183 | | static double _gin_parallel_heapscan(GinBuildState *state); |
184 | | static double _gin_parallel_merge(GinBuildState *state); |
185 | | static void _gin_leader_participate_as_worker(GinBuildState *buildstate, |
186 | | Relation heap, Relation index); |
187 | | static void _gin_parallel_scan_and_build(GinBuildState *state, |
188 | | GinBuildShared *ginshared, |
189 | | Sharedsort *sharedsort, |
190 | | Relation heap, Relation index, |
191 | | int sortmem, bool progress); |
192 | | |
193 | | static ItemPointer _gin_parse_tuple_items(GinTuple *a); |
194 | | static Datum _gin_parse_tuple_key(GinTuple *a); |
195 | | |
196 | | static GinTuple *_gin_build_tuple(OffsetNumber attrnum, unsigned char category, |
197 | | Datum key, int16 typlen, bool typbyval, |
198 | | ItemPointerData *items, uint32 nitems, |
199 | | Size *len); |
200 | | |
201 | | /* |
202 | | * Adds an array of item pointers to the tuple's posting list, or |
203 | | * creates a posting tree (and a tuple pointing to it) when there is |
204 | | * not enough space. The max size of a tuple is defined in |
205 | | * GinFormTuple(). Returns a new, modified index tuple. |
206 | | * items[] must be in sorted order with no duplicates. |
207 | | */ |
208 | | static IndexTuple |
209 | | addItemPointersToLeafTuple(GinState *ginstate, |
210 | | IndexTuple old, |
211 | | ItemPointerData *items, uint32 nitem, |
212 | | GinStatsData *buildStats, Buffer buffer) |
213 | 0 | { |
214 | 0 | OffsetNumber attnum; |
215 | 0 | Datum key; |
216 | 0 | GinNullCategory category; |
217 | 0 | IndexTuple res; |
218 | 0 | ItemPointerData *newItems, |
219 | 0 | *oldItems; |
220 | 0 | int oldNPosting, |
221 | 0 | newNPosting; |
222 | 0 | GinPostingList *compressedList; |
223 | |
224 | 0 | Assert(!GinIsPostingTree(old)); |
225 | |
226 | 0 | attnum = gintuple_get_attrnum(ginstate, old); |
227 | 0 | key = gintuple_get_key(ginstate, old, &category); |
228 | | |
229 | | /* merge the old and new posting lists */ |
230 | 0 | oldItems = ginReadTuple(ginstate, attnum, old, &oldNPosting); |
231 | |
232 | 0 | newItems = ginMergeItemPointers(items, nitem, |
233 | 0 | oldItems, oldNPosting, |
234 | 0 | &newNPosting); |
235 | | |
236 | | /* Compress the posting list, and try to build a tuple with room for it */ |
237 | 0 | res = NULL; |
238 | 0 | compressedList = ginCompressPostingList(newItems, newNPosting, GinMaxItemSize, |
239 | 0 | NULL); |
240 | 0 | pfree(newItems); |
241 | 0 | if (compressedList) |
242 | 0 | { |
243 | 0 | res = GinFormTuple(ginstate, attnum, key, category, |
244 | 0 | (char *) compressedList, |
245 | 0 | SizeOfGinPostingList(compressedList), |
246 | 0 | newNPosting, |
247 | 0 | false); |
248 | 0 | pfree(compressedList); |
249 | 0 | } |
250 | 0 | if (!res) |
251 | 0 | { |
252 | | /* posting list would be too big, convert to posting tree */ |
253 | 0 | BlockNumber postingRoot; |
254 | | |
255 | | /* |
256 | | * Initialize posting tree with the old tuple's posting list. It's |
257 | | * surely small enough to fit on one posting-tree page, and should |
258 | | * already be in order with no duplicates. |
259 | | */ |
260 | 0 | postingRoot = createPostingTree(ginstate->index, |
261 | 0 | oldItems, |
262 | 0 | oldNPosting, |
263 | 0 | buildStats, |
264 | 0 | buffer); |
265 | | |
266 | | /* Now insert the TIDs-to-be-added into the posting tree */ |
267 | 0 | ginInsertItemPointers(ginstate->index, postingRoot, |
268 | 0 | items, nitem, |
269 | 0 | buildStats); |
270 | | |
271 | | /* And build a new posting-tree-only result tuple */ |
272 | 0 | res = GinFormTuple(ginstate, attnum, key, category, NULL, 0, 0, true); |
273 | 0 | GinSetPostingTree(res, postingRoot); |
274 | 0 | } |
275 | 0 | pfree(oldItems); |
276 | |
277 | 0 | return res; |
278 | 0 | } |
279 | | |
280 | | /* |
281 | | * Build a fresh leaf tuple, either posting-list or posting-tree format |
282 | | * depending on whether the given items list will fit. |
283 | | * items[] must be in sorted order with no duplicates. |
284 | | * |
285 | | * This is basically the same logic as in addItemPointersToLeafTuple, |
286 | | * but working from slightly different input. |
287 | | */ |
288 | | static IndexTuple |
289 | | buildFreshLeafTuple(GinState *ginstate, |
290 | | OffsetNumber attnum, Datum key, GinNullCategory category, |
291 | | ItemPointerData *items, uint32 nitem, |
292 | | GinStatsData *buildStats, Buffer buffer) |
293 | 0 | { |
294 | 0 | IndexTuple res = NULL; |
295 | 0 | GinPostingList *compressedList; |
296 | | |
297 | | /* try to build a posting list tuple with all the items */ |
298 | 0 | compressedList = ginCompressPostingList(items, nitem, GinMaxItemSize, NULL); |
299 | 0 | if (compressedList) |
300 | 0 | { |
301 | 0 | res = GinFormTuple(ginstate, attnum, key, category, |
302 | 0 | (char *) compressedList, |
303 | 0 | SizeOfGinPostingList(compressedList), |
304 | 0 | nitem, false); |
305 | 0 | pfree(compressedList); |
306 | 0 | } |
307 | 0 | if (!res) |
308 | 0 | { |
309 | | /* posting list would be too big, build posting tree */ |
310 | 0 | BlockNumber postingRoot; |
311 | | |
312 | | /* |
313 | | * Build posting-tree-only result tuple. We do this first so as to |
314 | | * fail quickly if the key is too big. |
315 | | */ |
316 | 0 | res = GinFormTuple(ginstate, attnum, key, category, NULL, 0, 0, true); |
317 | | |
318 | | /* |
319 | | * Initialize a new posting tree with the TIDs. |
320 | | */ |
321 | 0 | postingRoot = createPostingTree(ginstate->index, items, nitem, |
322 | 0 | buildStats, buffer); |
323 | | |
324 | | /* And save the root link in the result tuple */ |
325 | 0 | GinSetPostingTree(res, postingRoot); |
326 | 0 | } |
327 | |
328 | 0 | return res; |
329 | 0 | } |
330 | | |
331 | | /* |
332 | | * Insert one or more heap TIDs associated with the given key value. |
333 | | * This will either add a single key entry, or enlarge a pre-existing entry. |
334 | | * |
335 | | * During an index build, buildStats is non-null and the counters |
336 | | * it contains should be incremented as needed. |
337 | | */ |
338 | | void |
339 | | ginEntryInsert(GinState *ginstate, |
340 | | OffsetNumber attnum, Datum key, GinNullCategory category, |
341 | | ItemPointerData *items, uint32 nitem, |
342 | | GinStatsData *buildStats) |
343 | 0 | { |
344 | 0 | GinBtreeData btree; |
345 | 0 | GinBtreeEntryInsertData insertdata; |
346 | 0 | GinBtreeStack *stack; |
347 | 0 | IndexTuple itup; |
348 | 0 | Page page; |
349 | |
350 | 0 | insertdata.isDelete = false; |
351 | |
352 | 0 | ginPrepareEntryScan(&btree, attnum, key, category, ginstate); |
353 | 0 | btree.isBuild = (buildStats != NULL); |
354 | |
355 | 0 | stack = ginFindLeafPage(&btree, false, false); |
356 | 0 | page = BufferGetPage(stack->buffer); |
357 | |
358 | 0 | if (btree.findItem(&btree, stack)) |
359 | 0 | { |
360 | | /* found pre-existing entry */ |
361 | 0 | itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, stack->off)); |
362 | |
363 | 0 | if (GinIsPostingTree(itup)) |
364 | 0 | { |
365 | | /* add entries to existing posting tree */ |
366 | 0 | BlockNumber rootPostingTree = GinGetPostingTree(itup); |
367 | | |
368 | | /* release all stack */ |
369 | 0 | LockBuffer(stack->buffer, GIN_UNLOCK); |
370 | 0 | freeGinBtreeStack(stack); |
371 | | |
372 | | /* insert into posting tree */ |
373 | 0 | ginInsertItemPointers(ginstate->index, rootPostingTree, |
374 | 0 | items, nitem, |
375 | 0 | buildStats); |
376 | 0 | return; |
377 | 0 | } |
378 | | |
379 | 0 | CheckForSerializableConflictIn(ginstate->index, NULL, |
380 | 0 | BufferGetBlockNumber(stack->buffer)); |
381 | | /* modify an existing leaf entry */ |
382 | 0 | itup = addItemPointersToLeafTuple(ginstate, itup, |
383 | 0 | items, nitem, buildStats, stack->buffer); |
384 | |
|
385 | 0 | insertdata.isDelete = true; |
386 | 0 | } |
387 | 0 | else |
388 | 0 | { |
389 | 0 | CheckForSerializableConflictIn(ginstate->index, NULL, |
390 | 0 | BufferGetBlockNumber(stack->buffer)); |
391 | | /* no match, so construct a new leaf entry */ |
392 | 0 | itup = buildFreshLeafTuple(ginstate, attnum, key, category, |
393 | 0 | items, nitem, buildStats, stack->buffer); |
394 | | |
395 | | /* |
396 | | * nEntries counts leaf tuples, so increment it only when we make a |
397 | | * new one. |
398 | | */ |
399 | 0 | if (buildStats) |
400 | 0 | buildStats->nEntries++; |
401 | 0 | } |
402 | | |
403 | | /* Insert the new or modified leaf tuple */ |
404 | 0 | insertdata.entry = itup; |
405 | 0 | ginInsertValue(&btree, stack, &insertdata, buildStats); |
406 | 0 | pfree(itup); |
407 | 0 | } |
408 | | |
409 | | /* |
410 | | * Extract index entries for a single indexable item, and add them to the |
411 | | * BuildAccumulator's state. |
412 | | * |
413 | | * This function is used only during initial index creation. |
414 | | */ |
415 | | static void |
416 | | ginHeapTupleBulkInsert(GinBuildState *buildstate, OffsetNumber attnum, |
417 | | Datum value, bool isNull, |
418 | | ItemPointer heapptr) |
419 | 0 | { |
420 | 0 | Datum *entries; |
421 | 0 | GinNullCategory *categories; |
422 | 0 | int32 nentries; |
423 | 0 | MemoryContext oldCtx; |
424 | |
425 | 0 | oldCtx = MemoryContextSwitchTo(buildstate->funcCtx); |
426 | 0 | entries = ginExtractEntries(buildstate->accum.ginstate, attnum, |
427 | 0 | value, isNull, |
428 | 0 | &nentries, &categories); |
429 | 0 | MemoryContextSwitchTo(oldCtx); |
430 | |
431 | 0 | ginInsertBAEntries(&buildstate->accum, heapptr, attnum, |
432 | 0 | entries, categories, nentries); |
433 | |
434 | 0 | buildstate->indtuples += nentries; |
435 | |
436 | 0 | MemoryContextReset(buildstate->funcCtx); |
437 | 0 | } |
438 | | |
439 | | static void |
440 | | ginBuildCallback(Relation index, ItemPointer tid, Datum *values, |
441 | | bool *isnull, bool tupleIsAlive, void *state) |
442 | 0 | { |
443 | 0 | GinBuildState *buildstate = (GinBuildState *) state; |
444 | 0 | MemoryContext oldCtx; |
445 | 0 | int i; |
446 | |
447 | 0 | oldCtx = MemoryContextSwitchTo(buildstate->tmpCtx); |
448 | |
449 | 0 | for (i = 0; i < buildstate->ginstate.origTupdesc->natts; i++) |
450 | 0 | ginHeapTupleBulkInsert(buildstate, (OffsetNumber) (i + 1), |
451 | 0 | values[i], isnull[i], tid); |
452 | | |
453 | | /* If we've maxed out our available memory, dump everything to the index */ |
454 | 0 | if (buildstate->accum.allocatedMemory >= maintenance_work_mem * (Size) 1024) |
455 | 0 | { |
456 | 0 | ItemPointerData *list; |
457 | 0 | Datum key; |
458 | 0 | GinNullCategory category; |
459 | 0 | uint32 nlist; |
460 | 0 | OffsetNumber attnum; |
461 | |
462 | 0 | ginBeginBAScan(&buildstate->accum); |
463 | 0 | while ((list = ginGetBAEntry(&buildstate->accum, |
464 | 0 | &attnum, &key, &category, &nlist)) != NULL) |
465 | 0 | { |
466 | | /* there could be many entries, so be willing to abort here */ |
467 | 0 | CHECK_FOR_INTERRUPTS(); |
468 | 0 | ginEntryInsert(&buildstate->ginstate, attnum, key, category, |
469 | 0 | list, nlist, &buildstate->buildStats); |
470 | 0 | } |
471 | |
472 | 0 | MemoryContextReset(buildstate->tmpCtx); |
473 | 0 | ginInitBA(&buildstate->accum); |
474 | 0 | } |
475 | |
476 | 0 | MemoryContextSwitchTo(oldCtx); |
477 | 0 | } |
478 | | |
479 | | /* |
480 | | * ginFlushBuildState |
481 | | * Write all data from BuildAccumulator into the tuplesort. |
482 | | */ |
483 | | static void |
484 | | ginFlushBuildState(GinBuildState *buildstate, Relation index) |
485 | 0 | { |
486 | 0 | ItemPointerData *list; |
487 | 0 | Datum key; |
488 | 0 | GinNullCategory category; |
489 | 0 | uint32 nlist; |
490 | 0 | OffsetNumber attnum; |
491 | 0 | TupleDesc tdesc = RelationGetDescr(index); |
492 | |
493 | 0 | ginBeginBAScan(&buildstate->accum); |
494 | 0 | while ((list = ginGetBAEntry(&buildstate->accum, |
495 | 0 | &attnum, &key, &category, &nlist)) != NULL) |
496 | 0 | { |
497 | | /* information about the key */ |
498 | 0 | Form_pg_attribute attr = TupleDescAttr(tdesc, (attnum - 1)); |
499 | | |
500 | | /* GIN tuple and tuple length */ |
501 | 0 | GinTuple *tup; |
502 | 0 | Size tuplen; |
503 | | |
504 | | /* there could be many entries, so be willing to abort here */ |
505 | 0 | CHECK_FOR_INTERRUPTS(); |
506 | |
507 | 0 | tup = _gin_build_tuple(attnum, category, |
508 | 0 | key, attr->attlen, attr->attbyval, |
509 | 0 | list, nlist, &tuplen); |
510 | |
511 | 0 | tuplesort_putgintuple(buildstate->bs_worker_sort, tup, tuplen); |
512 | |
513 | 0 | pfree(tup); |
514 | 0 | } |
515 | |
516 | 0 | MemoryContextReset(buildstate->tmpCtx); |
517 | 0 | ginInitBA(&buildstate->accum); |
518 | 0 | } |
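
ginFlushBuildState only feeds the worker-local sortstate (bs_worker_sort). Assuming the worker's first merge pass mirrors the leader's merge loop, the rest of the worker's job looks roughly like this (tuplesort_getgintuple is assumed here as the read-side counterpart of tuplesort_putgintuple; combined/combinedlen are placeholders):

    /* Sketch: worker-local merge pass, feeding the leader's shared sort. */
    tuplesort_performsort(buildstate->bs_worker_sort);
    while ((tup = tuplesort_getgintuple(buildstate->bs_worker_sort,
                                        &tuplen, true)) != NULL)
    {
        /* accumulate TIDs of equal keys in a GinBuffer; when the key
         * changes, emit one combined GinTuple for the previous key */
        tuplesort_putgintuple(buildstate->bs_sortstate, combined, combinedlen);
    }
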
519 | | |
520 | | /* |
521 | | * ginBuildCallbackParallel |
522 | | * Callback for the parallel index build. |
523 | | * |
524 | | * This is similar to the serial build callback ginBuildCallback, but |
525 | | * instead of writing the accumulated entries into the index, each worker |
526 | | * writes them into a (local) tuplesort. |
527 | | * |
528 | | * The worker then sorts and combines these entries, before writing them |
529 | | * into a shared tuplesort for the leader (see _gin_parallel_scan_and_build |
530 | | * for the whole process). |
531 | | */ |
532 | | static void |
533 | | ginBuildCallbackParallel(Relation index, ItemPointer tid, Datum *values, |
534 | | bool *isnull, bool tupleIsAlive, void *state) |
535 | 0 | { |
536 | 0 | GinBuildState *buildstate = (GinBuildState *) state; |
537 | 0 | MemoryContext oldCtx; |
538 | 0 | int i; |
539 | |
540 | 0 | oldCtx = MemoryContextSwitchTo(buildstate->tmpCtx); |
541 | | |
542 | | /* |
543 | | * if scan wrapped around - flush accumulated entries and start anew |
544 | | * |
545 | | * With parallel scans, we don't have a guarantee the scan does not start |
546 | | * half-way through the relation (serial builds disable sync scans and |
547 | | * always start from block 0, parallel scans require allow_sync=true). |
548 | | * |
549 | | * Building the posting lists assumes the TIDs are monotonic and never go |
550 | | * back, and the wrap around would break that. We handle that by detecting |
551 | | * the wraparound, and flushing all entries. This means we'll later see |
552 | | * two separate entries with non-overlapping TID lists (which can be |
553 | | * combined by merge sort). |
554 | | * |
555 | | * To detect a wraparound, we remember the last TID seen by each worker |
556 | | * (for any key). If the next TID seen by the worker is lower, the scan |
557 | | * must have wrapped around. |
558 | | */ |
559 | 0 | if (ItemPointerCompare(tid, &buildstate->tid) < 0) |
560 | 0 | ginFlushBuildState(buildstate, index); |
561 | | |
562 | | /* remember the TID we're about to process */ |
563 | 0 | buildstate->tid = *tid; |
564 | |
565 | 0 | for (i = 0; i < buildstate->ginstate.origTupdesc->natts; i++) |
566 | 0 | ginHeapTupleBulkInsert(buildstate, (OffsetNumber) (i + 1), |
567 | 0 | values[i], isnull[i], tid); |
568 | | |
569 | | /* |
570 | | * If we've maxed out our available memory, dump everything to the |
571 | | * tuplesort. We use half the per-worker fraction of maintenance_work_mem, |
572 | | * the other half is used for the tuplesort. |
573 | | */ |
574 | 0 | if (buildstate->accum.allocatedMemory >= buildstate->work_mem * (Size) 1024) |
575 | 0 | ginFlushBuildState(buildstate, index); |
576 | |
577 | 0 | MemoryContextSwitchTo(oldCtx); |
578 | 0 | } |
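
The work_mem field tested above is set when the participant starts its scan-and-build; given the sortmem parameter of _gin_parallel_scan_and_build (declared earlier), the split presumably looks like this sketch:

    /* Sketch: half of the per-participant budget goes to the accumulator,
     * the other half to the worker-local tuplesort. */
    state->work_mem = (sortmem / 2);
    state->bs_worker_sort = tuplesort_begin_index_gin(heap, index,
                                                      state->work_mem,
                                                      NULL, TUPLESORT_NONE);
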
579 | | |
580 | | IndexBuildResult * |
581 | | ginbuild(Relation heap, Relation index, IndexInfo *indexInfo) |
582 | 0 | { |
583 | 0 | IndexBuildResult *result; |
584 | 0 | double reltuples; |
585 | 0 | GinBuildState buildstate; |
586 | 0 | GinBuildState *state = &buildstate; |
587 | 0 | Buffer RootBuffer, |
588 | 0 | MetaBuffer; |
589 | 0 | ItemPointerData *list; |
590 | 0 | Datum key; |
591 | 0 | GinNullCategory category; |
592 | 0 | uint32 nlist; |
593 | 0 | MemoryContext oldCtx; |
594 | 0 | OffsetNumber attnum; |
595 | |
596 | 0 | if (RelationGetNumberOfBlocks(index) != 0) |
597 | 0 | elog(ERROR, "index \"%s\" already contains data", |
598 | 0 | RelationGetRelationName(index)); |
599 | | |
600 | 0 | initGinState(&buildstate.ginstate, index); |
601 | 0 | buildstate.indtuples = 0; |
602 | 0 | memset(&buildstate.buildStats, 0, sizeof(GinStatsData)); |
603 | | |
604 | | /* Initialize fields for parallel build too. */ |
605 | 0 | buildstate.bs_numtuples = 0; |
606 | 0 | buildstate.bs_reltuples = 0; |
607 | 0 | buildstate.bs_leader = NULL; |
608 | 0 | memset(&buildstate.tid, 0, sizeof(ItemPointerData)); |
609 | | |
610 | | /* initialize the meta page */ |
611 | 0 | MetaBuffer = GinNewBuffer(index); |
612 | | |
613 | | /* initialize the root page */ |
614 | 0 | RootBuffer = GinNewBuffer(index); |
615 | |
616 | 0 | START_CRIT_SECTION(); |
617 | 0 | GinInitMetabuffer(MetaBuffer); |
618 | 0 | MarkBufferDirty(MetaBuffer); |
619 | 0 | GinInitBuffer(RootBuffer, GIN_LEAF); |
620 | 0 | MarkBufferDirty(RootBuffer); |
621 | | |
622 | |
623 | 0 | UnlockReleaseBuffer(MetaBuffer); |
624 | 0 | UnlockReleaseBuffer(RootBuffer); |
625 | 0 | END_CRIT_SECTION(); |
626 | | |
627 | | /* count the root as first entry page */ |
628 | 0 | buildstate.buildStats.nEntryPages++; |
629 | | |
630 | | /* |
631 | | * create a temporary memory context that is used to hold data not yet |
632 | | * dumped out to the index |
633 | | */ |
634 | 0 | buildstate.tmpCtx = AllocSetContextCreate(CurrentMemoryContext, |
635 | 0 | "Gin build temporary context", |
636 | 0 | ALLOCSET_DEFAULT_SIZES); |
637 | | |
638 | | /* |
639 | | * create a temporary memory context that is used for calling |
640 | | * ginExtractEntries(), and can be reset after each tuple |
641 | | */ |
642 | 0 | buildstate.funcCtx = AllocSetContextCreate(CurrentMemoryContext, |
643 | 0 | "Gin build temporary context for user-defined function", |
644 | 0 | ALLOCSET_DEFAULT_SIZES); |
645 | |
646 | 0 | buildstate.accum.ginstate = &buildstate.ginstate; |
647 | 0 | ginInitBA(&buildstate.accum); |
648 | | |
649 | | /* Report table scan phase started */ |
650 | 0 | pgstat_progress_update_param(PROGRESS_CREATEIDX_SUBPHASE, |
651 | 0 | PROGRESS_GIN_PHASE_INDEXBUILD_TABLESCAN); |
652 | | |
653 | | /* |
654 | | * Attempt to launch parallel worker scan when required |
655 | | * |
656 | | * XXX plan_create_index_workers makes the number of workers dependent on |
657 | | * maintenance_work_mem, requiring 32MB for each worker. For GIN that's |
658 | | * reasonable too, because we sort the data just like btree. It does |
659 | | * ignore the memory used to accumulate data in memory (set by work_mem), |
660 | | * but there is no way to communicate that to plan_create_index_workers. |
661 | | */ |
662 | 0 | if (indexInfo->ii_ParallelWorkers > 0) |
663 | 0 | _gin_begin_parallel(state, heap, index, indexInfo->ii_Concurrent, |
664 | 0 | indexInfo->ii_ParallelWorkers); |
665 | | |
666 | | /* |
667 | | * If parallel build requested and at least one worker process was |
668 | | * successfully launched, set up coordination state, wait for workers to |
669 | | * complete. Then read all tuples from the shared tuplesort and insert |
670 | | * them into the index. |
671 | | * |
672 | | * In serial mode, simply scan the table and build the index one index |
673 | | * tuple at a time. |
674 | | */ |
675 | 0 | if (state->bs_leader) |
676 | 0 | { |
677 | 0 | SortCoordinate coordinate; |
678 | |
679 | 0 | coordinate = (SortCoordinate) palloc0(sizeof(SortCoordinateData)); |
680 | 0 | coordinate->isWorker = false; |
681 | 0 | coordinate->nParticipants = |
682 | 0 | state->bs_leader->nparticipanttuplesorts; |
683 | 0 | coordinate->sharedsort = state->bs_leader->sharedsort; |
684 | | |
685 | | /* |
686 | | * Begin leader tuplesort. |
687 | | * |
688 | | * In cases where parallelism is involved, the leader receives the |
689 | | * same share of maintenance_work_mem as a serial sort (it is |
690 | | * generally treated in the same way as a serial sort once we return). |
691 | | * Parallel worker Tuplesortstates will have received only a fraction |
692 | | * of maintenance_work_mem, though. |
693 | | * |
694 | | * We rely on the lifetime of the Leader Tuplesortstate almost not |
695 | | * overlapping with any worker Tuplesortstate's lifetime. There may |
696 | | * be some small overlap, but that's okay because we rely on leader |
697 | | * Tuplesortstate only allocating a small, fixed amount of memory |
698 | | * here. When its tuplesort_performsort() is called (by our caller), |
699 | | * and significant amounts of memory are likely to be used, all |
700 | | * workers must have already freed almost all memory held by their |
701 | | * Tuplesortstates (they are about to go away completely, too). The |
702 | | * overall effect is that maintenance_work_mem always represents an |
703 | | * absolute high watermark on the amount of memory used by a CREATE |
704 | | * INDEX operation, regardless of the use of parallelism or any other |
705 | | * factor. |
706 | | */ |
707 | 0 | state->bs_sortstate = |
708 | 0 | tuplesort_begin_index_gin(heap, index, |
709 | 0 | maintenance_work_mem, coordinate, |
710 | 0 | TUPLESORT_NONE); |
711 | | |
712 | | /* scan the relation in parallel and merge per-worker results */ |
713 | 0 | reltuples = _gin_parallel_merge(state); |
714 | |
715 | 0 | _gin_end_parallel(state->bs_leader, state); |
716 | 0 | } |
717 | 0 | else /* no parallel index build */ |
718 | 0 | { |
719 | | /* |
720 | | * Do the heap scan. We disallow sync scan here because |
721 | | * dataPlaceToPage prefers to receive tuples in TID order. |
722 | | */ |
723 | 0 | reltuples = table_index_build_scan(heap, index, indexInfo, false, true, |
724 | 0 | ginBuildCallback, &buildstate, NULL); |
725 | | |
726 | | /* dump remaining entries to the index */ |
727 | 0 | oldCtx = MemoryContextSwitchTo(buildstate.tmpCtx); |
728 | 0 | ginBeginBAScan(&buildstate.accum); |
729 | 0 | while ((list = ginGetBAEntry(&buildstate.accum, |
730 | 0 | &attnum, &key, &category, &nlist)) != NULL) |
731 | 0 | { |
732 | | /* there could be many entries, so be willing to abort here */ |
733 | 0 | CHECK_FOR_INTERRUPTS(); |
734 | 0 | ginEntryInsert(&buildstate.ginstate, attnum, key, category, |
735 | 0 | list, nlist, &buildstate.buildStats); |
736 | 0 | } |
737 | 0 | MemoryContextSwitchTo(oldCtx); |
738 | 0 | } |
739 | |
740 | 0 | MemoryContextDelete(buildstate.funcCtx); |
741 | 0 | MemoryContextDelete(buildstate.tmpCtx); |
742 | | |
743 | | /* |
744 | | * Update metapage stats |
745 | | */ |
746 | 0 | buildstate.buildStats.nTotalPages = RelationGetNumberOfBlocks(index); |
747 | 0 | ginUpdateStats(index, &buildstate.buildStats, true); |
748 | | |
749 | | /* |
750 | | * We didn't write WAL records as we built the index, so if WAL-logging is |
751 | | * required, write all pages to the WAL now. |
752 | | */ |
753 | 0 | if (RelationNeedsWAL(index)) |
754 | 0 | { |
755 | 0 | log_newpage_range(index, MAIN_FORKNUM, |
756 | 0 | 0, RelationGetNumberOfBlocks(index), |
757 | 0 | true); |
758 | 0 | } |
759 | | |
760 | | /* |
761 | | * Return statistics |
762 | | */ |
763 | 0 | result = (IndexBuildResult *) palloc(sizeof(IndexBuildResult)); |
764 | |
765 | 0 | result->heap_tuples = reltuples; |
766 | 0 | result->index_tuples = buildstate.indtuples; |
767 | |
768 | 0 | return result; |
769 | 0 | } |
770 | | |
771 | | /* |
772 | | * ginbuildempty() -- build an empty gin index in the initialization fork |
773 | | */ |
774 | | void |
775 | | ginbuildempty(Relation index) |
776 | 0 | { |
777 | 0 | Buffer RootBuffer, |
778 | 0 | MetaBuffer; |
779 | | |
780 | | /* An empty GIN index has two pages. */ |
781 | 0 | MetaBuffer = ExtendBufferedRel(BMR_REL(index), INIT_FORKNUM, NULL, |
782 | 0 | EB_LOCK_FIRST | EB_SKIP_EXTENSION_LOCK); |
783 | 0 | RootBuffer = ExtendBufferedRel(BMR_REL(index), INIT_FORKNUM, NULL, |
784 | 0 | EB_LOCK_FIRST | EB_SKIP_EXTENSION_LOCK); |
785 | | |
786 | | /* Initialize and xlog metabuffer and root buffer. */ |
787 | 0 | START_CRIT_SECTION(); |
788 | 0 | GinInitMetabuffer(MetaBuffer); |
789 | 0 | MarkBufferDirty(MetaBuffer); |
790 | 0 | log_newpage_buffer(MetaBuffer, true); |
791 | 0 | GinInitBuffer(RootBuffer, GIN_LEAF); |
792 | 0 | MarkBufferDirty(RootBuffer); |
793 | 0 | log_newpage_buffer(RootBuffer, false); |
794 | 0 | END_CRIT_SECTION(); |
795 | | |
796 | | /* Unlock and release the buffers. */ |
797 | 0 | UnlockReleaseBuffer(MetaBuffer); |
798 | 0 | UnlockReleaseBuffer(RootBuffer); |
799 | 0 | } |
800 | | |
801 | | /* |
802 | | * Insert index entries for a single indexable item during "normal" |
803 | | * (non-fast-update) insertion |
804 | | */ |
805 | | static void |
806 | | ginHeapTupleInsert(GinState *ginstate, OffsetNumber attnum, |
807 | | Datum value, bool isNull, |
808 | | ItemPointer item) |
809 | 0 | { |
810 | 0 | Datum *entries; |
811 | 0 | GinNullCategory *categories; |
812 | 0 | int32 i, |
813 | 0 | nentries; |
814 | |
815 | 0 | entries = ginExtractEntries(ginstate, attnum, value, isNull, |
816 | 0 | &nentries, &categories); |
817 | |
818 | 0 | for (i = 0; i < nentries; i++) |
819 | 0 | ginEntryInsert(ginstate, attnum, entries[i], categories[i], |
820 | 0 | item, 1, NULL); |
821 | 0 | } |
822 | | |
823 | | bool |
824 | | gininsert(Relation index, Datum *values, bool *isnull, |
825 | | ItemPointer ht_ctid, Relation heapRel, |
826 | | IndexUniqueCheck checkUnique, |
827 | | bool indexUnchanged, |
828 | | IndexInfo *indexInfo) |
829 | 0 | { |
830 | 0 | GinState *ginstate = (GinState *) indexInfo->ii_AmCache; |
831 | 0 | MemoryContext oldCtx; |
832 | 0 | MemoryContext insertCtx; |
833 | 0 | int i; |
834 | | |
835 | | /* Initialize GinState cache if first call in this statement */ |
836 | 0 | if (ginstate == NULL) |
837 | 0 | { |
838 | 0 | oldCtx = MemoryContextSwitchTo(indexInfo->ii_Context); |
839 | 0 | ginstate = (GinState *) palloc(sizeof(GinState)); |
840 | 0 | initGinState(ginstate, index); |
841 | 0 | indexInfo->ii_AmCache = ginstate; |
842 | 0 | MemoryContextSwitchTo(oldCtx); |
843 | 0 | } |
844 | |
845 | 0 | insertCtx = AllocSetContextCreate(CurrentMemoryContext, |
846 | 0 | "Gin insert temporary context", |
847 | 0 | ALLOCSET_DEFAULT_SIZES); |
848 | |
849 | 0 | oldCtx = MemoryContextSwitchTo(insertCtx); |
850 | |
851 | 0 | if (GinGetUseFastUpdate(index)) |
852 | 0 | { |
853 | 0 | GinTupleCollector collector; |
854 | |
855 | 0 | memset(&collector, 0, sizeof(GinTupleCollector)); |
856 | |
857 | 0 | for (i = 0; i < ginstate->origTupdesc->natts; i++) |
858 | 0 | ginHeapTupleFastCollect(ginstate, &collector, |
859 | 0 | (OffsetNumber) (i + 1), |
860 | 0 | values[i], isnull[i], |
861 | 0 | ht_ctid); |
862 | |
863 | 0 | ginHeapTupleFastInsert(ginstate, &collector); |
864 | 0 | } |
865 | 0 | else |
866 | 0 | { |
867 | 0 | for (i = 0; i < ginstate->origTupdesc->natts; i++) |
868 | 0 | ginHeapTupleInsert(ginstate, (OffsetNumber) (i + 1), |
869 | 0 | values[i], isnull[i], |
870 | 0 | ht_ctid); |
871 | 0 | } |
872 | |
873 | 0 | MemoryContextSwitchTo(oldCtx); |
874 | 0 | MemoryContextDelete(insertCtx); |
875 | |
876 | 0 | return false; |
877 | 0 | } |
878 | | |
879 | | /* |
880 | | * Create parallel context, and launch workers for leader. |
881 | | * |
882 | | * buildstate argument should be initialized (with the exception of the |
883 | | * tuplesort states, which may later be created based on shared |
884 | | * state initially set up here). |
885 | | * |
886 | | * isconcurrent indicates if operation is CREATE INDEX CONCURRENTLY. |
887 | | * |
888 | | * request is the target number of parallel worker processes to launch. |
889 | | * |
890 | | * Sets buildstate's GinLeader, which caller must use to shut down parallel |
891 | | * mode by passing it to _gin_end_parallel() at the very end of its index |
892 | | * build. If not even a single worker process can be launched, this is |
893 | | * never set, and caller should proceed with a serial index build. |
894 | | */ |
895 | | static void |
896 | | _gin_begin_parallel(GinBuildState *buildstate, Relation heap, Relation index, |
897 | | bool isconcurrent, int request) |
898 | 0 | { |
899 | 0 | ParallelContext *pcxt; |
900 | 0 | int scantuplesortstates; |
901 | 0 | Snapshot snapshot; |
902 | 0 | Size estginshared; |
903 | 0 | Size estsort; |
904 | 0 | GinBuildShared *ginshared; |
905 | 0 | Sharedsort *sharedsort; |
906 | 0 | GinLeader *ginleader = (GinLeader *) palloc0(sizeof(GinLeader)); |
907 | 0 | WalUsage *walusage; |
908 | 0 | BufferUsage *bufferusage; |
909 | 0 | bool leaderparticipates = true; |
910 | 0 | int querylen; |
911 | |
912 | | #ifdef DISABLE_LEADER_PARTICIPATION |
913 | | leaderparticipates = false; |
914 | | #endif |
915 | | |
916 | | /* |
917 | | * Enter parallel mode, and create context for parallel build of gin index |
918 | | */ |
919 | 0 | EnterParallelMode(); |
920 | 0 | Assert(request > 0); |
921 | 0 | pcxt = CreateParallelContext("postgres", "_gin_parallel_build_main", |
922 | 0 | request); |
923 | |
924 | 0 | scantuplesortstates = leaderparticipates ? request + 1 : request; |
925 | | |
926 | | /* |
927 | | * Prepare for scan of the base relation. In a normal index build, we use |
928 | | * SnapshotAny because we must retrieve all tuples and do our own time |
929 | | * qual checks (because we have to index RECENTLY_DEAD tuples). In a |
930 | | * concurrent build, we take a regular MVCC snapshot and index whatever's |
931 | | * live according to that. |
932 | | */ |
933 | 0 | if (!isconcurrent) |
934 | 0 | snapshot = SnapshotAny; |
935 | 0 | else |
936 | 0 | snapshot = RegisterSnapshot(GetTransactionSnapshot()); |
937 | | |
938 | | /* |
939 | | * Estimate size for our own PARALLEL_KEY_GIN_SHARED workspace. |
940 | | */ |
941 | 0 | estginshared = _gin_parallel_estimate_shared(heap, snapshot); |
942 | 0 | shm_toc_estimate_chunk(&pcxt->estimator, estginshared); |
943 | 0 | estsort = tuplesort_estimate_shared(scantuplesortstates); |
944 | 0 | shm_toc_estimate_chunk(&pcxt->estimator, estsort); |
945 | |
946 | 0 | shm_toc_estimate_keys(&pcxt->estimator, 2); |
947 | | |
948 | | /* |
949 | | * Estimate space for WalUsage and BufferUsage -- PARALLEL_KEY_WAL_USAGE |
950 | | * and PARALLEL_KEY_BUFFER_USAGE. |
951 | | * |
952 | | * If there are no extensions loaded that care, we could skip this. We |
953 | | * have no way of knowing whether anyone's looking at pgWalUsage or |
954 | | * pgBufferUsage, so do it unconditionally. |
955 | | */ |
956 | 0 | shm_toc_estimate_chunk(&pcxt->estimator, |
957 | 0 | mul_size(sizeof(WalUsage), pcxt->nworkers)); |
958 | 0 | shm_toc_estimate_keys(&pcxt->estimator, 1); |
959 | 0 | shm_toc_estimate_chunk(&pcxt->estimator, |
960 | 0 | mul_size(sizeof(BufferUsage), pcxt->nworkers)); |
961 | 0 | shm_toc_estimate_keys(&pcxt->estimator, 1); |
962 | | |
963 | | /* Finally, estimate PARALLEL_KEY_QUERY_TEXT space */ |
964 | 0 | if (debug_query_string) |
965 | 0 | { |
966 | 0 | querylen = strlen(debug_query_string); |
967 | 0 | shm_toc_estimate_chunk(&pcxt->estimator, querylen + 1); |
968 | 0 | shm_toc_estimate_keys(&pcxt->estimator, 1); |
969 | 0 | } |
970 | 0 | else |
971 | 0 | querylen = 0; /* keep compiler quiet */ |
972 | | |
973 | | /* Everyone's had a chance to ask for space, so now create the DSM */ |
974 | 0 | InitializeParallelDSM(pcxt); |
975 | | |
976 | | /* If no DSM segment was available, back out (do serial build) */ |
977 | 0 | if (pcxt->seg == NULL) |
978 | 0 | { |
979 | 0 | if (IsMVCCSnapshot(snapshot)) |
980 | 0 | UnregisterSnapshot(snapshot); |
981 | 0 | DestroyParallelContext(pcxt); |
982 | 0 | ExitParallelMode(); |
983 | 0 | return; |
984 | 0 | } |
985 | | |
986 | | /* Store shared build state, for which we reserved space */ |
987 | 0 | ginshared = (GinBuildShared *) shm_toc_allocate(pcxt->toc, estginshared); |
988 | | /* Initialize immutable state */ |
989 | 0 | ginshared->heaprelid = RelationGetRelid(heap); |
990 | 0 | ginshared->indexrelid = RelationGetRelid(index); |
991 | 0 | ginshared->isconcurrent = isconcurrent; |
992 | 0 | ginshared->scantuplesortstates = scantuplesortstates; |
993 | |
994 | 0 | ConditionVariableInit(&ginshared->workersdonecv); |
995 | 0 | SpinLockInit(&ginshared->mutex); |
996 | | |
997 | | /* Initialize mutable state */ |
998 | 0 | ginshared->nparticipantsdone = 0; |
999 | 0 | ginshared->reltuples = 0.0; |
1000 | 0 | ginshared->indtuples = 0.0; |
1001 | |
1002 | 0 | table_parallelscan_initialize(heap, |
1003 | 0 | ParallelTableScanFromGinBuildShared(ginshared), |
1004 | 0 | snapshot); |
1005 | | |
1006 | | /* |
1007 | | * Store shared tuplesort-private state, for which we reserved space. |
1008 | | * Then, initialize opaque state using tuplesort routine. |
1009 | | */ |
1010 | 0 | sharedsort = (Sharedsort *) shm_toc_allocate(pcxt->toc, estsort); |
1011 | 0 | tuplesort_initialize_shared(sharedsort, scantuplesortstates, |
1012 | 0 | pcxt->seg); |
1013 | |
1014 | 0 | shm_toc_insert(pcxt->toc, PARALLEL_KEY_GIN_SHARED, ginshared); |
1015 | 0 | shm_toc_insert(pcxt->toc, PARALLEL_KEY_TUPLESORT, sharedsort); |
1016 | | |
1017 | | /* Store query string for workers */ |
1018 | 0 | if (debug_query_string) |
1019 | 0 | { |
1020 | 0 | char *sharedquery; |
1021 | |
1022 | 0 | sharedquery = (char *) shm_toc_allocate(pcxt->toc, querylen + 1); |
1023 | 0 | memcpy(sharedquery, debug_query_string, querylen + 1); |
1024 | 0 | shm_toc_insert(pcxt->toc, PARALLEL_KEY_QUERY_TEXT, sharedquery); |
1025 | 0 | } |
1026 | | |
1027 | | /* |
1028 | | * Allocate space for each worker's WalUsage and BufferUsage; no need to |
1029 | | * initialize. |
1030 | | */ |
1031 | 0 | walusage = shm_toc_allocate(pcxt->toc, |
1032 | 0 | mul_size(sizeof(WalUsage), pcxt->nworkers)); |
1033 | 0 | shm_toc_insert(pcxt->toc, PARALLEL_KEY_WAL_USAGE, walusage); |
1034 | 0 | bufferusage = shm_toc_allocate(pcxt->toc, |
1035 | 0 | mul_size(sizeof(BufferUsage), pcxt->nworkers)); |
1036 | 0 | shm_toc_insert(pcxt->toc, PARALLEL_KEY_BUFFER_USAGE, bufferusage); |
1037 | | |
1038 | | /* Launch workers, saving status for leader/caller */ |
1039 | 0 | LaunchParallelWorkers(pcxt); |
1040 | 0 | ginleader->pcxt = pcxt; |
1041 | 0 | ginleader->nparticipanttuplesorts = pcxt->nworkers_launched; |
1042 | 0 | if (leaderparticipates) |
1043 | 0 | ginleader->nparticipanttuplesorts++; |
1044 | 0 | ginleader->ginshared = ginshared; |
1045 | 0 | ginleader->sharedsort = sharedsort; |
1046 | 0 | ginleader->snapshot = snapshot; |
1047 | 0 | ginleader->walusage = walusage; |
1048 | 0 | ginleader->bufferusage = bufferusage; |
1049 | | |
1050 | | /* If no workers were successfully launched, back out (do serial build) */ |
1051 | 0 | if (pcxt->nworkers_launched == 0) |
1052 | 0 | { |
1053 | 0 | _gin_end_parallel(ginleader, NULL); |
1054 | 0 | return; |
1055 | 0 | } |
1056 | | |
1057 | | /* Save leader state now that it's clear build will be parallel */ |
1058 | 0 | buildstate->bs_leader = ginleader; |
1059 | | |
1060 | | /* Join heap scan ourselves */ |
1061 | 0 | if (leaderparticipates) |
1062 | 0 | _gin_leader_participate_as_worker(buildstate, heap, index); |
1063 | | |
1064 | | /* |
1065 | | * Caller needs to wait for all launched workers when we return. Make |
1066 | | * sure that the failure-to-start case will not hang forever. |
1067 | | */ |
1068 | 0 | WaitForParallelWorkersToAttach(pcxt); |
1069 | 0 | } |
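
_gin_parallel_estimate_shared (declared near the top, body outside this excerpt) must reserve room for both the struct and the parallel scan descriptor, padded the way ParallelTableScanFromGinBuildShared expects. A sketch modeled on the btree equivalent (_bt_parallel_estimate_shared), i.e. an assumption rather than the verbatim body:

    static Size
    _gin_parallel_estimate_shared(Relation heap, Snapshot snapshot)
    {
        /* BUFFERALIGN matches the offset used by the access macro above */
        return add_size(BUFFERALIGN(sizeof(GinBuildShared)),
                        table_parallelscan_estimate(heap, snapshot));
    }
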
1070 | | |
1071 | | /* |
1072 | | * Shut down workers, destroy parallel context, and end parallel mode. |
1073 | | */ |
1074 | | static void |
1075 | | _gin_end_parallel(GinLeader *ginleader, GinBuildState *state) |
1076 | 0 | { |
1077 | 0 | int i; |
1078 | | |
1079 | | /* Shutdown worker processes */ |
1080 | 0 | WaitForParallelWorkersToFinish(ginleader->pcxt); |
1081 | | |
1082 | | /* |
1083 | | * Next, accumulate WAL usage. (This must wait for the workers to finish, |
1084 | | * or we might get incomplete data.) |
1085 | | */ |
1086 | 0 | for (i = 0; i < ginleader->pcxt->nworkers_launched; i++) |
1087 | 0 | InstrAccumParallelQuery(&ginleader->bufferusage[i], &ginleader->walusage[i]); |
1088 | | |
1089 | | /* Free last reference to MVCC snapshot, if one was used */ |
1090 | 0 | if (IsMVCCSnapshot(ginleader->snapshot)) |
1091 | 0 | UnregisterSnapshot(ginleader->snapshot); |
1092 | 0 | DestroyParallelContext(ginleader->pcxt); |
1093 | 0 | ExitParallelMode(); |
1094 | 0 | } |
1095 | | |
1096 | | /* |
1097 | | * Within leader, wait for end of heap scan. |
1098 | | * |
1099 | | * When called, parallel heap scan started by _gin_begin_parallel() will |
1100 | | * already be underway within worker processes (when leader participates |
1101 | | * as a worker, we should end up here just as workers are finishing). |
1102 | | * |
1103 | | * Returns the total number of heap tuples scanned. |
1104 | | */ |
1105 | | static double |
1106 | | _gin_parallel_heapscan(GinBuildState *state) |
1107 | 0 | { |
1108 | 0 | GinBuildShared *ginshared = state->bs_leader->ginshared; |
1109 | 0 | int nparticipanttuplesorts; |
1110 | |
1111 | 0 | nparticipanttuplesorts = state->bs_leader->nparticipanttuplesorts; |
1112 | 0 | for (;;) |
1113 | 0 | { |
1114 | 0 | SpinLockAcquire(&ginshared->mutex); |
1115 | 0 | if (ginshared->nparticipantsdone == nparticipanttuplesorts) |
1116 | 0 | { |
1117 | | /* copy the data into leader state */ |
1118 | 0 | state->bs_reltuples = ginshared->reltuples; |
1119 | 0 | state->bs_numtuples = ginshared->indtuples; |
1120 | |
1121 | 0 | SpinLockRelease(&ginshared->mutex); |
1122 | 0 | break; |
1123 | 0 | } |
1124 | 0 | SpinLockRelease(&ginshared->mutex); |
1125 | |
1126 | 0 | ConditionVariableSleep(&ginshared->workersdonecv, |
1127 | 0 | WAIT_EVENT_PARALLEL_CREATE_INDEX_SCAN); |
1128 | 0 | } |
1129 | |
1130 | 0 | ConditionVariableCancelSleep(); |
1131 | |
1132 | 0 | return state->bs_reltuples; |
1133 | 0 | } |
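
This loop is the leader's half of the workersdonecv protocol described in GinBuildShared. The worker's half (in the scan-and-build path, outside this excerpt) would, following the stated locking rules, look roughly like this; reltuples and buildstate->indtuples stand for the worker's own counters:

    /* Sketch: worker publishes its results and wakes the waiting leader. */
    SpinLockAcquire(&ginshared->mutex);
    ginshared->nparticipantsdone++;
    ginshared->reltuples += reltuples;
    ginshared->indtuples += buildstate->indtuples;
    SpinLockRelease(&ginshared->mutex);

    ConditionVariableSignal(&ginshared->workersdonecv);
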
1134 | | |
1135 | | /* |
1136 | | * Buffer used to accumulate TIDs from multiple GinTuples for the same key |
1137 | | * (we read these from the tuplesort, sorted by the key). |
1138 | | * |
1139 | | * This is similar to BuildAccumulator in that it's used to collect TIDs |
1140 | | * in memory before inserting them into the index, but it's much simpler |
1141 | | * as it only deals with a single index key at a time. |
1142 | | * |
1143 | | * When adding TIDs to the buffer, we make sure to keep them sorted, both |
1144 | | * during the initial table scan (and detecting when the scan wraps around), |
1145 | | * and during merging (where we do mergesort). |
1146 | | */ |
1147 | | typedef struct GinBuffer |
1148 | | { |
1149 | | OffsetNumber attnum; |
1150 | | GinNullCategory category; |
1151 | | Datum key; /* 0 if no key (and keylen == 0) */ |
1152 | | Size keylen; /* number of bytes (not typlen) */ |
1153 | | |
1154 | | /* type info */ |
1155 | | int16 typlen; |
1156 | | bool typbyval; |
1157 | | |
1158 | | /* Number of TIDs to collect before attempting to write some out. */ |
1159 | | int maxitems; |
1160 | | |
1161 | | /* array of TID values */ |
1162 | | int nitems; |
1163 | | int nfrozen; |
1164 | | SortSupport ssup; /* for sorting/comparing keys */ |
1165 | | ItemPointerData *items; |
1166 | | } GinBuffer; |
1167 | | |
1168 | | /* |
1169 | | * Check that TID array contains valid values, and that it's sorted (if we |
1170 | | * expect it to be). |
1171 | | */ |
1172 | | static void |
1173 | | AssertCheckItemPointers(GinBuffer *buffer) |
1174 | 0 | { |
1175 | | #ifdef USE_ASSERT_CHECKING |
1176 | | /* we should not have a buffer with no TIDs to sort */ |
1177 | | Assert(buffer->items != NULL); |
1178 | | Assert(buffer->nitems > 0); |
1179 | | |
1180 | | for (int i = 0; i < buffer->nitems; i++) |
1181 | | { |
1182 | | Assert(ItemPointerIsValid(&buffer->items[i])); |
1183 | | |
1184 | | /* don't check ordering for the first TID item */ |
1185 | | if (i == 0) |
1186 | | continue; |
1187 | | |
1188 | | Assert(ItemPointerCompare(&buffer->items[i - 1], &buffer->items[i]) < 0); |
1189 | | } |
1190 | | #endif |
1191 | 0 | } |
1192 | | |
1193 | | /* |
1194 | | * GinBuffer checks |
1195 | | * |
1196 | | * Make sure the nitems/items fields are consistent (either the array is empty |
1197 | | * or not empty, the fields need to agree). If there are items, check ordering. |
1198 | | */ |
1199 | | static void |
1200 | | AssertCheckGinBuffer(GinBuffer *buffer) |
1201 | 0 | { |
1202 | | #ifdef USE_ASSERT_CHECKING |
1203 | | /* if we have any items, the array must exist */ |
1204 | | Assert(!((buffer->nitems > 0) && (buffer->items == NULL))); |
1205 | | |
1206 | | /* |
1207 | | * The buffer may be empty, in which case we must not call the check of |
1208 | | * item pointers, because that assumes non-emptiness. |
1209 | | */ |
1210 | | if (buffer->nitems == 0) |
1211 | | return; |
1212 | | |
1213 | | /* Make sure the item pointers are valid and sorted. */ |
1214 | | AssertCheckItemPointers(buffer); |
1215 | | #endif |
1216 | 0 | } |
1217 | | |
1218 | | /* |
1219 | | * GinBufferInit |
1220 | | * Initialize buffer to store tuples for a GIN index. |
1221 | | * |
1222 | | * Initialize the buffer used to accumulate TIDs for a single key at a time |
1223 | | * (we process the data sorted), so we know when we have received all data for |
1224 | | * a given key. |
1225 | | * |
1226 | | * Initializes sort support procedures for all index attributes. |
1227 | | */ |
1228 | | static GinBuffer * |
1229 | | GinBufferInit(Relation index) |
1230 | 0 | { |
1231 | 0 | GinBuffer *buffer = palloc0(sizeof(GinBuffer)); |
1232 | 0 | int i, |
1233 | 0 | nKeys; |
1234 | 0 | TupleDesc desc = RelationGetDescr(index); |
1235 | | |
1236 | | /* |
1237 | | * How many items can we fit into the memory limit? We don't want to end |
1238 | | * with too many TIDs, and 64kB seems more than enough. But maybe this |
1239 | | * should be tied to maintenance_work_mem or something like that? |
1240 | | */ |
1241 | 0 | buffer->maxitems = (64 * 1024L) / sizeof(ItemPointerData); |
1242 | |
1243 | 0 | nKeys = IndexRelationGetNumberOfKeyAttributes(index); |
1244 | |
1245 | 0 | buffer->ssup = palloc0(sizeof(SortSupportData) * nKeys); |
1246 | | |
1247 | | /* |
1248 | | * Lookup ordering operator for the index key data type, and initialize |
1249 | | * the sort support function. |
1250 | | */ |
1251 | 0 | for (i = 0; i < nKeys; i++) |
1252 | 0 | { |
1253 | 0 | Oid cmpFunc; |
1254 | 0 | SortSupport sortKey = &buffer->ssup[i]; |
1255 | 0 | Form_pg_attribute att = TupleDescAttr(desc, i); |
1256 | |
1257 | 0 | sortKey->ssup_cxt = CurrentMemoryContext; |
1258 | 0 | sortKey->ssup_collation = index->rd_indcollation[i]; |
1259 | |
1260 | 0 | if (!OidIsValid(sortKey->ssup_collation)) |
1261 | 0 | sortKey->ssup_collation = DEFAULT_COLLATION_OID; |
1262 | |
1263 | 0 | sortKey->ssup_nulls_first = false; |
1264 | 0 | sortKey->ssup_attno = i + 1; |
1265 | 0 | sortKey->abbreviate = false; |
1266 | |
1267 | 0 | Assert(sortKey->ssup_attno != 0); |
1268 | | |
1269 | | /* |
1270 | | * If the compare proc isn't specified in the opclass definition, look |
1271 | | * up the index key type's default btree comparator. |
1272 | | */ |
1273 | 0 | cmpFunc = index_getprocid(index, i + 1, GIN_COMPARE_PROC); |
1274 | 0 | if (cmpFunc == InvalidOid) |
1275 | 0 | { |
1276 | 0 | TypeCacheEntry *typentry; |
1277 | |
1278 | 0 | typentry = lookup_type_cache(att->atttypid, |
1279 | 0 | TYPECACHE_CMP_PROC_FINFO); |
1280 | 0 | if (!OidIsValid(typentry->cmp_proc_finfo.fn_oid)) |
1281 | 0 | ereport(ERROR, |
1282 | 0 | (errcode(ERRCODE_UNDEFINED_FUNCTION), |
1283 | 0 | errmsg("could not identify a comparison function for type %s", |
1284 | 0 | format_type_be(att->atttypid)))); |
1285 | | |
1286 | 0 | cmpFunc = typentry->cmp_proc_finfo.fn_oid; |
1287 | 0 | } |
1288 | | |
1289 | 0 | PrepareSortSupportComparisonShim(cmpFunc, sortKey); |
1290 | 0 | } |
1291 | | |
1292 | 0 | return buffer; |
1293 | 0 | } |
1294 | | |
1295 | | /* Is the buffer empty, i.e. has no TID values in the array? */ |
1296 | | static bool |
1297 | | GinBufferIsEmpty(GinBuffer *buffer) |
1298 | 0 | { |
1299 | 0 | return (buffer->nitems == 0); |
1300 | 0 | } |
1301 | | |
1302 | | /* |
1303 | | * GinBufferKeyEquals |
1304 | | * Can the buffer store TIDs for the provided GIN tuple (same key)? |
1305 | | * |
1306 | | * Compare if the tuple matches the already accumulated data in the GIN |
1307 | | * buffer. Compare scalar fields first, before the actual key. |
1308 | | * |
1309 | | * Returns true if the key matches, and the TID belongs to the buffer, or |
1310 | | * false if the key does not match. |
1311 | | */ |
1312 | | static bool |
1313 | | GinBufferKeyEquals(GinBuffer *buffer, GinTuple *tup) |
1314 | 0 | { |
1315 | 0 | int r; |
1316 | 0 | Datum tupkey; |
1317 | |
1318 | 0 | AssertCheckGinBuffer(buffer); |
1319 | |
1320 | 0 | if (tup->attrnum != buffer->attnum) |
1321 | 0 | return false; |
1322 | | |
1323 | | /* same attribute should have the same type info */ |
1324 | 0 | Assert(tup->typbyval == buffer->typbyval); |
1325 | 0 | Assert(tup->typlen == buffer->typlen); |
1326 | |
1327 | 0 | if (tup->category != buffer->category) |
1328 | 0 | return false; |
1329 | | |
1330 | | /* |
1331 | | * For NULL/empty keys, this means equality, for normal keys we need to |
1332 | | * compare the actual key value. |
1333 | | */ |
1334 | 0 | if (buffer->category != GIN_CAT_NORM_KEY) |
1335 | 0 | return true; |
1336 | | |
1337 | | /* |
1338 | | * For the tuple, get either the first sizeof(Datum) bytes for byval |
1339 | | * types, or a pointer to the beginning of the data array. |
1340 | | */ |
1341 | 0 | tupkey = (buffer->typbyval) ? *(Datum *) tup->data : PointerGetDatum(tup->data); |
1342 | |
1343 | 0 | r = ApplySortComparator(buffer->key, false, |
1344 | 0 | tupkey, false, |
1345 | 0 | &buffer->ssup[buffer->attnum - 1]); |
1346 | |
1347 | 0 | return (r == 0); |
1348 | 0 | } |
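
GinBufferKeyEquals is the pivot of the merge: as long as incoming (key-sorted) GinTuples match the buffered key, their TIDs keep accumulating; on a key change the buffer is written out and restarted. A compressed sketch of that consumer loop (the actual _gin_parallel_merge is not shown in this excerpt, and tuplesort_getgintuple is assumed as the read-side counterpart of tuplesort_putgintuple):

    /* Sketch: merge sorted GinTuples, one key at a time. */
    while ((tup = tuplesort_getgintuple(state->bs_sortstate,
                                        &tuplen, true)) != NULL)
    {
        if (!GinBufferIsEmpty(buffer) && !GinBufferKeyEquals(buffer, tup))
        {
            /* key changed: flush the accumulated TIDs into the index */
            ginEntryInsert(&state->ginstate, buffer->attnum, buffer->key,
                           buffer->category, buffer->items, buffer->nitems,
                           &state->buildStats);
            /* ... then reset the buffer (GinBufferReset, below) ... */
        }
        GinBufferStoreTuple(buffer, tup);
    }
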
1349 | | |
1350 | | /* |
1351 | | * GinBufferShouldTrim |
1352 | | * Should we trim the list of item pointers? |
1353 | | * |
1354 | | * By trimming we understand writing out and removing the tuple IDs that |
1355 | | * we know can't change by future merges. We can deduce the TID up to which |
1356 | | * this is guaranteed from the "first" TID in each GIN tuple, which provides |
1357 | | * a "horizon" (for a given key) thanks to the sort. |
1358 | | * |
1359 | | * We don't want to do this too often - compressing longer TID lists is more |
1360 | | * efficient. But we also don't want to accumulate too many TIDs, for two |
1361 | | * reasons. First, it consumes memory and we might exceed maintenance_work_mem |
1362 | | * (or whatever limit applies), even if that's unlikely because TIDs are very |
1363 | | * small so we can fit a lot of them. Second, and more importantly, long TID |
1364 | | * lists are an issue if the scan wraps around, because a key may get a very |
1365 | | * wide list (with min/max TID for that key), forcing "full" mergesorts for |
1366 | | * every list merged into it (instead of the efficient append). |
1367 | | * |
1368 | | * So we look at two things when deciding whether to trim - if the resulting |
1369 | | * list (after adding TIDs from the new tuple) would be too long, and if there |
1370 | | * are enough TIDs to trim (with values less than the "first" TID from the new |
1371 | | * tuple), we do the trim. By enough we mean at least 1024 TIDs (mostly an arbitrary |
1372 | | * number). |
1373 | | */ |
1374 | | static bool |
1375 | | GinBufferShouldTrim(GinBuffer *buffer, GinTuple *tup) |
1376 | 0 | { |
1377 | | /* not enough TIDs to trim (1024 is somewhat arbitrary number) */ |
1378 | 0 | if (buffer->nfrozen < 1024) |
1379 | 0 | return false; |
1380 | | |
1381 | | /* no need to trim if we have not hit the memory limit yet */ |
1382 | 0 | if ((buffer->nitems + tup->nitems) < buffer->maxitems) |
1383 | 0 | return false; |
1384 | | |
1385 | | /* |
1386 | | * OK, we have enough frozen TIDs to flush, and we have hit the memory |
1387 | | * limit, so it's time to write it out. |
1388 | | */ |
1389 | 0 | return true; |
1390 | 0 | } |
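
When GinBufferShouldTrim says yes, the caller can write out just the frozen prefix (those TIDs can no longer be affected by later merges) and keep the unfrozen tail buffered. A sketch only; the in-place trim below is a hypothetical stand-in for whatever helper the real build uses:

    /* Sketch: flush the frozen prefix, keep the tail in the buffer. */
    if (GinBufferShouldTrim(buffer, tup))
    {
        ginEntryInsert(&state->ginstate, buffer->attnum, buffer->key,
                       buffer->category, buffer->items, buffer->nfrozen,
                       &state->buildStats);

        /* hypothetical in-place trim: drop the first nfrozen items */
        memmove(&buffer->items[0], &buffer->items[buffer->nfrozen],
                sizeof(ItemPointerData) * (buffer->nitems - buffer->nfrozen));
        buffer->nitems -= buffer->nfrozen;
        buffer->nfrozen = 0;
    }
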
1391 | | |
1392 | | /* |
1393 | | * GinBufferStoreTuple |
1394 | | * Add data (especially TID list) from a GIN tuple to the buffer. |
1395 | | * |
1396 | | * The buffer is expected to be empty (in which case it's initialized), or |
1397 | | * to hold the same key. The TID values from the tuple are combined with the |
1398 | | * stored values using a merge sort. |
1399 | | * |
1400 | | * The tuples (for the same key) are expected to be sorted by first TID. But |
1401 | | * this does not guarantee the lists do not overlap, especially in the leader, |
1402 | | * because the workers process interleaving data. There should be no overlaps |
1403 | | * in a single worker - it could happen when the parallel scan wraps around, |
1404 | | * but we detect that and flush the data (see ginBuildCallbackParallel). |
1405 | | * |
1406 | | * By sorting the GinTuple not only by key, but also by the first TID, we make |
1407 | | * it less likely the lists will overlap during merge. Overlapping lists have to |
1408 | | * be combined by a mergesort, while non-overlapping lists can simply be appended, which is cheaper. |
1409 | | * |
1410 | | * How often can the lists overlap? There should be no overlaps in workers, |
1411 | | * and in the leader we can see overlaps between lists built by different |
1412 | | * workers. But the workers merge the items as much as possible, so there |
1413 | | * should not be too many. |
1414 | | */ |
1415 | | static void |
1416 | | GinBufferStoreTuple(GinBuffer *buffer, GinTuple *tup) |
1417 | 0 | { |
1418 | 0 | ItemPointerData *items; |
1419 | 0 | Datum key; |
1420 | |
1421 | 0 | AssertCheckGinBuffer(buffer); |
1422 | |
1423 | 0 | key = _gin_parse_tuple_key(tup); |
1424 | 0 | items = _gin_parse_tuple_items(tup); |
1425 | | |
1426 | | /* if the buffer is empty, set the fields (and copy the key) */ |
1427 | 0 | if (GinBufferIsEmpty(buffer)) |
1428 | 0 | { |
1429 | 0 | buffer->category = tup->category; |
1430 | 0 | buffer->keylen = tup->keylen; |
1431 | 0 | buffer->attnum = tup->attrnum; |
1432 | |
1433 | 0 | buffer->typlen = tup->typlen; |
1434 | 0 | buffer->typbyval = tup->typbyval; |
1435 | |
1436 | 0 | if (tup->category == GIN_CAT_NORM_KEY) |
1437 | 0 | buffer->key = datumCopy(key, buffer->typbyval, buffer->typlen); |
1438 | 0 | else |
1439 | 0 | buffer->key = (Datum) 0; |
1440 | 0 | } |
1441 | | |
1442 | | /* |
1443 | | * Try to freeze TIDs at the beginning of the list, i.e. exclude them from
1444 | | * the mergesort. We can do that with TIDs before the first TID in the new |
1445 | | * tuple we're about to add into the buffer. |
1446 | | * |
1447 | | * We do this incrementally when adding data into the in-memory buffer, |
1448 | | * and not later (e.g. when hitting a memory limit), because it allows us |
1449 | | * to skip the frozen data during the mergesort, making it cheaper. |
1450 | | */ |
1451 | | |
1452 | | /* |
1453 | | * Check if the last TID in the current list is frozen. This is the case |
1454 | | * when merging non-overlapping lists, e.g. in each parallel worker. |
1455 | | */ |
1456 | 0 | if ((buffer->nitems > 0) && |
1457 | 0 | (ItemPointerCompare(&buffer->items[buffer->nitems - 1], |
1458 | 0 | GinTupleGetFirst(tup)) == 0)) |
1459 | 0 | buffer->nfrozen = buffer->nitems; |
1460 | | |
1461 | | /* |
1462 | | * Now find the last TID we know to be frozen, i.e. the last TID right |
1463 | | * before the new GIN tuple. |
1464 | | * |
1465 | | * Start with the first not-yet-frozen tuple, and walk until we find the |
1466 | | * first TID that's higher. If we already know the whole list is frozen |
1467 | | * (i.e. nfrozen == nitems), this does nothing. |
1468 | | * |
1469 | | * XXX This might do a binary search for sufficiently long lists, but it |
1470 | | * does not seem worth the complexity. Overlapping lists should be rare,
1471 | | * TID comparisons are cheap, and we should quickly freeze most of
1472 | | * the list. |
1473 | | */ |
1474 | 0 | for (int i = buffer->nfrozen; i < buffer->nitems; i++) |
1475 | 0 | { |
1476 | | /* Is the TID after the first TID of the new tuple? Can't freeze. */ |
1477 | 0 | if (ItemPointerCompare(&buffer->items[i], |
1478 | 0 | GinTupleGetFirst(tup)) > 0) |
1479 | 0 | break; |
1480 | | |
1481 | 0 | buffer->nfrozen++; |
1482 | 0 | } |
1483 | | |
1484 | | /* add the new TIDs into the buffer, combine using merge-sort */ |
1485 | 0 | { |
1486 | 0 | int nnew; |
1487 | 0 | ItemPointer new; |
1488 | | |
1489 | | /* |
1490 | | * Resize the array - we do this first, because we'll dereference the |
1491 | | * first unfrozen TID, which would fail if the array is NULL. We'll |
1492 | | * still pass 0 as number of elements in that array though. |
1493 | | */ |
1494 | 0 | if (buffer->items == NULL) |
1495 | 0 | buffer->items = palloc((buffer->nitems + tup->nitems) * sizeof(ItemPointerData)); |
1496 | 0 | else |
1497 | 0 | buffer->items = repalloc(buffer->items, |
1498 | 0 | (buffer->nitems + tup->nitems) * sizeof(ItemPointerData)); |
1499 | |
1500 | 0 | new = ginMergeItemPointers(&buffer->items[buffer->nfrozen], /* first unfrozen */ |
1501 | 0 | (buffer->nitems - buffer->nfrozen), /* num of unfrozen */ |
1502 | 0 | items, tup->nitems, &nnew); |
1503 | |
1504 | 0 | Assert(nnew == (tup->nitems + (buffer->nitems - buffer->nfrozen))); |
1505 | |
1506 | 0 | memcpy(&buffer->items[buffer->nfrozen], new, |
1507 | 0 | nnew * sizeof(ItemPointerData)); |
1508 | |
1509 | 0 | pfree(new); |
1510 | |
1511 | 0 | buffer->nitems += tup->nitems; |
1512 | |
1513 | 0 | AssertCheckItemPointers(buffer); |
1514 | 0 | } |
1515 | | |
1516 | | /* free the decompressed TID list */ |
1517 | 0 | pfree(items); |
1518 | 0 | } |
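/*
 * Illustrative sketch, not part of gininsert.c: the XXX above notes the
 * linear freeze walk could be a binary search for long lists. A minimal
 * standalone version of that idea, with TIDs reduced to sorted ints; the
 * function name and parameters are hypothetical. It returns the same value
 * of nfrozen the linear walk would produce: the index of the first element
 * greater than the new tuple's first TID.
 */
static int
freeze_upper_bound_sketch(const int *items, int nfrozen, int nitems,
						  int first_new)
{
	int		lo = nfrozen;	/* first not-yet-frozen element */
	int		hi = nitems;

	while (lo < hi)
	{
		int		mid = lo + (hi - lo) / 2;

		if (items[mid] > first_new)
			hi = mid;		/* items[mid] can't be frozen */
		else
			lo = mid + 1;	/* items[mid] and everything before it freezes */
	}

	return lo;				/* new value of buffer->nfrozen */
}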
1519 | | |
1520 | | /* |
1521 | | * GinBufferReset |
1522 | | * Reset the buffer into a state as if it contains no data. |
1523 | | */ |
1524 | | static void |
1525 | | GinBufferReset(GinBuffer *buffer) |
1526 | 0 | { |
1527 | 0 | Assert(!GinBufferIsEmpty(buffer)); |
1528 | | |
1529 | | /* release byref values, do nothing for by-val ones */ |
1530 | 0 | if ((buffer->category == GIN_CAT_NORM_KEY) && !buffer->typbyval) |
1531 | 0 | pfree(DatumGetPointer(buffer->key)); |
1532 | | |
1533 | | /* |
1534 | | * Not strictly required, but makes it more likely to trigger a NULL
1535 | | * dereference if the value is used incorrectly, etc.
1536 | | */ |
1537 | 0 | buffer->key = (Datum) 0; |
1538 | |
1539 | 0 | buffer->attnum = 0; |
1540 | 0 | buffer->category = 0; |
1541 | 0 | buffer->keylen = 0; |
1542 | 0 | buffer->nitems = 0; |
1543 | 0 | buffer->nfrozen = 0; |
1544 | |
1545 | 0 | buffer->typlen = 0; |
1546 | 0 | buffer->typbyval = 0; |
1547 | 0 | } |
1548 | | |
1549 | | /* |
1550 | | * GinBufferTrim |
1551 | | * Discard the "frozen" part of the TID list (which should have been |
1552 | | * written to disk/index before this call). |
1553 | | */ |
1554 | | static void |
1555 | | GinBufferTrim(GinBuffer *buffer) |
1556 | 0 | { |
1557 | 0 | Assert((buffer->nfrozen > 0) && (buffer->nfrozen <= buffer->nitems)); |
1558 | |
1559 | 0 | memmove(&buffer->items[0], &buffer->items[buffer->nfrozen], |
1560 | 0 | sizeof(ItemPointerData) * (buffer->nitems - buffer->nfrozen)); |
1561 | |
1562 | 0 | buffer->nitems -= buffer->nfrozen; |
1563 | 0 | buffer->nfrozen = 0; |
1564 | 0 | } |
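/*
 * Illustrative sketch, not part of gininsert.c: the freeze/flush/trim cycle
 * on a plain int array. The frozen prefix is written out (here just printed,
 * where the real code inserts it into the index or a tuplesort) and then
 * discarded with memmove, exactly like GinBufferTrim above. All names are
 * hypothetical.
 */
#include <stdio.h>
#include <string.h>

int
main(void)
{
	int		items[] = {1, 2, 3, 5, 8, 13, 21};
	int		nitems = 7;
	int		nfrozen = 4;	/* items[0..3] precede the next tuple's first TID */

	/* "flush" the frozen prefix */
	for (int i = 0; i < nfrozen; i++)
		printf("%d ", items[i]);
	printf("<- flushed\n");

	/* trim: keep only the unfrozen tail, as in GinBufferTrim */
	memmove(&items[0], &items[nfrozen], sizeof(int) * (nitems - nfrozen));
	nitems -= nfrozen;
	nfrozen = 0;

	for (int i = 0; i < nitems; i++)
		printf("%d ", items[i]);	/* prints: 8 13 21 */
	printf("(nfrozen=%d)\n", nfrozen);
	return 0;
}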
1565 | | |
1566 | | /* |
1567 | | * GinBufferFree |
1568 | | * Release memory associated with the GinBuffer (including TID array). |
1569 | | */ |
1570 | | static void |
1571 | | GinBufferFree(GinBuffer *buffer) |
1572 | 0 | { |
1573 | 0 | if (buffer->items) |
1574 | 0 | pfree(buffer->items); |
1575 | | |
1576 | | /* release byref values, do nothing for by-val ones */ |
1577 | 0 | if (!GinBufferIsEmpty(buffer) && |
1578 | 0 | (buffer->category == GIN_CAT_NORM_KEY) && !buffer->typbyval) |
1579 | 0 | pfree(DatumGetPointer(buffer->key)); |
1580 | |
1581 | 0 | pfree(buffer); |
1582 | 0 | } |
1583 | | |
1584 | | /* |
1585 | | * GinBufferCanAddKey |
1586 | | * Check if a given GIN tuple can be added to the current buffer. |
1587 | | * |
1588 | | * Returns true if the buffer is either empty or for the same index key. |
1589 | | */ |
1590 | | static bool |
1591 | | GinBufferCanAddKey(GinBuffer *buffer, GinTuple *tup) |
1592 | 0 | { |
1593 | | /* empty buffer can accept data for any key */ |
1594 | 0 | if (GinBufferIsEmpty(buffer)) |
1595 | 0 | return true; |
1596 | | |
1597 | | /* otherwise just data for the same key */ |
1598 | 0 | return GinBufferKeyEquals(buffer, tup); |
1599 | 0 | } |
1600 | | |
1601 | | /* |
1602 | | * Within leader, wait for end of heap scan and merge per-worker results. |
1603 | | * |
1604 | | * After waiting for all workers to finish, merge the per-worker results into
1605 | | * the complete index. The results from each worker are sorted by the index
1606 | | * key (and the first TID). While combining the per-worker results we merge
1607 | | * TID lists for the same key, and insert the accumulated entries into the
1608 | | * index.
1609 | | * |
1610 | | * Returns the total number of heap tuples scanned. |
1611 | | */ |
1612 | | static double |
1613 | | _gin_parallel_merge(GinBuildState *state) |
1614 | 0 | { |
1615 | 0 | GinTuple *tup; |
1616 | 0 | Size tuplen; |
1617 | 0 | double reltuples = 0; |
1618 | 0 | GinBuffer *buffer; |
1619 | | |
1620 | | /* GIN tuples from workers, merged by leader */ |
1621 | 0 | double numtuples = 0; |
1622 | | |
1623 | | /* wait for workers to scan table and produce partial results */ |
1624 | 0 | reltuples = _gin_parallel_heapscan(state); |
1625 | | |
1626 | | /* Execute the sort */ |
1627 | 0 | pgstat_progress_update_param(PROGRESS_CREATEIDX_SUBPHASE, |
1628 | 0 | PROGRESS_GIN_PHASE_PERFORMSORT_2); |
1629 | | |
1630 | | /* do the actual sort in the leader */ |
1631 | 0 | tuplesort_performsort(state->bs_sortstate); |
1632 | | |
1633 | | /* |
1634 | | * Initialize buffer to combine entries for the same key. |
1635 | | * |
1636 | | * The leader is allowed to use the whole maintenance_work_mem buffer to |
1637 | | * combine data, since the parallel workers have already completed.
1638 | | */ |
1639 | 0 | buffer = GinBufferInit(state->ginstate.index); |
1640 | | |
1641 | | /* |
1642 | | * Set the progress target for the next phase. Reset the block number |
1643 | | * values set by table_index_build_scan.
1644 | | */ |
1645 | 0 | { |
1646 | 0 | const int progress_index[] = { |
1647 | 0 | PROGRESS_CREATEIDX_SUBPHASE, |
1648 | 0 | PROGRESS_CREATEIDX_TUPLES_TOTAL, |
1649 | 0 | PROGRESS_SCAN_BLOCKS_TOTAL, |
1650 | 0 | PROGRESS_SCAN_BLOCKS_DONE |
1651 | 0 | }; |
1652 | 0 | const int64 progress_vals[] = { |
1653 | 0 | PROGRESS_GIN_PHASE_MERGE_2, |
1654 | 0 | state->bs_numtuples, |
1655 | 0 | 0, 0 |
1656 | 0 | }; |
1657 | |
1658 | 0 | pgstat_progress_update_multi_param(4, progress_index, progress_vals); |
1659 | 0 | } |
1660 | | |
1661 | | /* |
1662 | | * Read the GIN tuples from the shared tuplesort, sorted by category and |
1663 | | * key. That probably gives us an order matching how data is organized in the
1664 | | * index. |
1665 | | * |
1666 | | * We don't insert the GIN tuples right away, but instead accumulate as |
1667 | | * many TIDs for the same key as possible, and then insert that at once. |
1668 | | * This way we don't need to decompress/recompress the posting lists, etc. |
1669 | | */ |
1670 | 0 | while ((tup = tuplesort_getgintuple(state->bs_sortstate, &tuplen, true)) != NULL) |
1671 | 0 | { |
1672 | 0 | MemoryContext oldCtx; |
1673 | |
1674 | 0 | CHECK_FOR_INTERRUPTS(); |
1675 | | |
1676 | | /* |
1677 | | * If the buffer can accept the new GIN tuple, just store it there and |
1678 | | * we're done. If it's a different key (or maybe too much data) flush |
1679 | | * the current contents into the index first. |
1680 | | */ |
1681 | 0 | if (!GinBufferCanAddKey(buffer, tup)) |
1682 | 0 | { |
1683 | | /* |
1684 | | * Buffer is not empty and it's storing a different key - flush
1685 | | * the data into the index, and start a new entry for the current
1686 | | * GinTuple.
1687 | | */ |
1688 | 0 | AssertCheckItemPointers(buffer); |
1689 | |
1690 | 0 | oldCtx = MemoryContextSwitchTo(state->tmpCtx); |
1691 | |
1692 | 0 | ginEntryInsert(&state->ginstate, |
1693 | 0 | buffer->attnum, buffer->key, buffer->category, |
1694 | 0 | buffer->items, buffer->nitems, &state->buildStats); |
1695 | |
1696 | 0 | MemoryContextSwitchTo(oldCtx); |
1697 | 0 | MemoryContextReset(state->tmpCtx); |
1698 | | |
1699 | | /* discard the existing data */ |
1700 | 0 | GinBufferReset(buffer); |
1701 | 0 | } |
1702 | | |
1703 | | /* |
1704 | | * We're about to add a GIN tuple to the buffer - check the memory |
1705 | | * limit first, and maybe write out some of the data into the index |
1706 | | * first, if needed (and possible). We only flush the part of the TID |
1707 | | * list that we know won't change, and only if there's enough data for |
1708 | | * compression to work well. |
1709 | | */ |
1710 | 0 | if (GinBufferShouldTrim(buffer, tup)) |
1711 | 0 | { |
1712 | 0 | Assert(buffer->nfrozen > 0); |
1713 | | |
1714 | | /* |
1715 | | * The buffer is storing the same key, but we've hit the memory
1716 | | * limit - flush the frozen part of the TID list into the index,
1717 | | * and keep the rest for the mergesort with the new tuple.
1718 | | */ |
1719 | 0 | AssertCheckItemPointers(buffer); |
1720 | |
1721 | 0 | oldCtx = MemoryContextSwitchTo(state->tmpCtx); |
1722 | |
1723 | 0 | ginEntryInsert(&state->ginstate, |
1724 | 0 | buffer->attnum, buffer->key, buffer->category, |
1725 | 0 | buffer->items, buffer->nfrozen, &state->buildStats); |
1726 | |
1727 | 0 | MemoryContextSwitchTo(oldCtx); |
1728 | 0 | MemoryContextReset(state->tmpCtx); |
1729 | | |
1730 | | /* discard the frozen part we've just written out */
1731 | 0 | GinBufferTrim(buffer); |
1732 | 0 | } |
1733 | | |
1734 | | /* |
1735 | | * Remember data for the current tuple (either remember the new key, |
1736 | | * or append it to the existing data).
1737 | | */ |
1738 | 0 | GinBufferStoreTuple(buffer, tup); |
1739 | | |
1740 | | /* Report progress */ |
1741 | 0 | pgstat_progress_update_param(PROGRESS_CREATEIDX_TUPLES_DONE, |
1742 | 0 | ++numtuples); |
1743 | 0 | } |
1744 | | |
1745 | | /* flush data remaining in the buffer (for the last key) */ |
1746 | 0 | if (!GinBufferIsEmpty(buffer)) |
1747 | 0 | { |
1748 | 0 | AssertCheckItemPointers(buffer); |
1749 | |
1750 | 0 | ginEntryInsert(&state->ginstate, |
1751 | 0 | buffer->attnum, buffer->key, buffer->category, |
1752 | 0 | buffer->items, buffer->nitems, &state->buildStats); |
1753 | | |
1754 | | /* discard the existing data */ |
1755 | 0 | GinBufferReset(buffer); |
1756 | | |
1757 | | /* Report progress */ |
1758 | 0 | pgstat_progress_update_param(PROGRESS_CREATEIDX_TUPLES_DONE, |
1759 | 0 | ++numtuples); |
1760 | 0 | } |
1761 | | |
1762 | | /* release all the memory */
1763 | 0 | GinBufferFree(buffer); |
1764 | |
1765 | 0 | tuplesort_end(state->bs_sortstate); |
1766 | |
1767 | 0 | return reltuples; |
1768 | 0 | } |
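/*
 * Illustrative sketch, not part of gininsert.c: the accumulate-and-flush
 * pattern of the merge loop above, reduced to a stream of (key, value)
 * pairs already sorted by key. Values for one key are buffered until the
 * key changes and are then emitted at once - mirroring how TID lists are
 * combined per key before a single ginEntryInsert call. All names here are
 * hypothetical.
 */
#include <stdio.h>

typedef struct
{
	int		key;
	int		value;
} pair_sketch;

static void
flush_sketch(int key, const int *values, int nvalues)
{
	printf("key=%d:", key);
	for (int i = 0; i < nvalues; i++)
		printf(" %d", values[i]);
	printf("\n");
}

int
main(void)
{
	/* sorted stream, as tuplesort_getgintuple would return it */
	pair_sketch	stream[] = {{1, 10}, {1, 11}, {2, 20}, {3, 30}, {3, 31}};
	int			buf[8];
	int			nbuf = 0;
	int			curkey = stream[0].key;

	for (int i = 0; i < 5; i++)
	{
		if (stream[i].key != curkey)
		{
			/* key change: flush the accumulated values and reset */
			flush_sketch(curkey, buf, nbuf);
			nbuf = 0;
			curkey = stream[i].key;
		}
		buf[nbuf++] = stream[i].value;
	}
	flush_sketch(curkey, buf, nbuf);	/* flush the last key */
	return 0;
}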
1769 | | |
1770 | | /* |
1771 | | * Returns size of shared memory required to store state for a parallel |
1772 | | * gin index build based on the snapshot its parallel scan will use. |
1773 | | */ |
1774 | | static Size |
1775 | | _gin_parallel_estimate_shared(Relation heap, Snapshot snapshot) |
1776 | 0 | { |
1777 | | /* c.f. shm_toc_allocate as to why BUFFERALIGN is used */ |
1778 | 0 | return add_size(BUFFERALIGN(sizeof(GinBuildShared)), |
1779 | 0 | table_parallelscan_estimate(heap, snapshot)); |
1780 | 0 | } |
1781 | | |
1782 | | /* |
1783 | | * Within leader, participate as a parallel worker. |
1784 | | */ |
1785 | | static void |
1786 | | _gin_leader_participate_as_worker(GinBuildState *buildstate, Relation heap, Relation index) |
1787 | 0 | { |
1788 | 0 | GinLeader *ginleader = buildstate->bs_leader; |
1789 | 0 | int sortmem; |
1790 | | |
1791 | | /* |
1792 | | * Might as well use a reliable figure when doling out maintenance_work_mem
1793 | | * (when the requested number of workers was not launched, this will be
1794 | | * somewhat higher than it is for other workers).
1795 | | */ |
1796 | 0 | sortmem = maintenance_work_mem / ginleader->nparticipanttuplesorts; |
1797 | | |
1798 | | /* Perform work common to all participants */ |
1799 | 0 | _gin_parallel_scan_and_build(buildstate, ginleader->ginshared, |
1800 | 0 | ginleader->sharedsort, heap, index, |
1801 | 0 | sortmem, true); |
1802 | 0 | } |
1803 | | |
1804 | | /* |
1805 | | * _gin_process_worker_data |
1806 | | * First phase of the key merging, happening in the worker. |
1807 | | * |
1808 | | * Depending on the number of distinct keys, the TID lists produced by the |
1809 | | * callback may be very short (due to frequent evictions in the callback). |
1810 | | * But combining many tiny lists is expensive, so we try to do as much as |
1811 | | * possible in the workers and only then pass the results to the leader. |
1812 | | * |
1813 | | * We read the tuples sorted by the key, and merge them into larger lists. |
1814 | | * At the moment there's no memory limit, so this will just produce one
1815 | | * huge (sorted) list per key in each worker, which means the leader only
1816 | | * has to do a very limited number of mergesorts - which is good.
1817 | | */ |
1818 | | static void |
1819 | | _gin_process_worker_data(GinBuildState *state, Tuplesortstate *worker_sort, |
1820 | | bool progress) |
1821 | 0 | { |
1822 | 0 | GinTuple *tup; |
1823 | 0 | Size tuplen; |
1824 | |
1825 | 0 | GinBuffer *buffer; |
1826 | | |
1827 | | /* |
1828 | | * Initialize buffer to combine entries for the same key. |
1829 | | * |
1830 | | * The workers are limited to the same amount of memory as during the sort |
1831 | | * in ginBuildCallbackParallel. But this probably should be the 32MB used |
1832 | | * during planning, just like there. |
1833 | | */ |
1834 | 0 | buffer = GinBufferInit(state->ginstate.index); |
1835 | | |
1836 | | /* sort the raw per-worker data */ |
1837 | 0 | if (progress) |
1838 | 0 | pgstat_progress_update_param(PROGRESS_CREATEIDX_SUBPHASE, |
1839 | 0 | PROGRESS_GIN_PHASE_PERFORMSORT_1); |
1840 | |
1841 | 0 | tuplesort_performsort(state->bs_worker_sort); |
1842 | | |
1843 | | /* reset the number of GIN tuples produced by this worker */ |
1844 | 0 | state->bs_numtuples = 0; |
1845 | |
1846 | 0 | if (progress) |
1847 | 0 | pgstat_progress_update_param(PROGRESS_CREATEIDX_SUBPHASE, |
1848 | 0 | PROGRESS_GIN_PHASE_MERGE_1); |
1849 | | |
1850 | | /* |
1851 | | * Read the GIN tuples from the shared tuplesort, sorted by the key, and |
1852 | | * merge them into larger chunks for the leader to combine. |
1853 | | */ |
1854 | 0 | while ((tup = tuplesort_getgintuple(worker_sort, &tuplen, true)) != NULL) |
1855 | 0 | { |
1856 | |
1857 | 0 | CHECK_FOR_INTERRUPTS(); |
1858 | | |
1859 | | /* |
1860 | | * If the buffer can accept the new GIN tuple, just store it there and |
1861 | | * we're done. If it's a different key (or maybe too much data) flush |
1862 | | * the current contents into the shared tuplesort first.
1863 | | */ |
1864 | 0 | if (!GinBufferCanAddKey(buffer, tup)) |
1865 | 0 | { |
1866 | 0 | GinTuple *ntup; |
1867 | 0 | Size ntuplen; |
1868 | | |
1869 | | /* |
1870 | | * Buffer is not empty and it's storing a different key - write
1871 | | * the data into the shared tuplesort, and start a new entry for
1872 | | * the current GinTuple.
1873 | | */ |
1874 | 0 | AssertCheckItemPointers(buffer); |
1875 | |
1876 | 0 | ntup = _gin_build_tuple(buffer->attnum, buffer->category, |
1877 | 0 | buffer->key, buffer->typlen, buffer->typbyval, |
1878 | 0 | buffer->items, buffer->nitems, &ntuplen); |
1879 | |
1880 | 0 | tuplesort_putgintuple(state->bs_sortstate, ntup, ntuplen); |
1881 | 0 | state->bs_numtuples++; |
1882 | |
1883 | 0 | pfree(ntup); |
1884 | | |
1885 | | /* discard the existing data */ |
1886 | 0 | GinBufferReset(buffer); |
1887 | 0 | } |
1888 | | |
1889 | | /* |
1890 | | * We're about to add a GIN tuple to the buffer - check the memory |
1891 | | * limit first, and maybe write out some of the data into the tuplesort
1892 | | * first, if needed (and possible). We only flush the part of the TID |
1893 | | * list that we know won't change, and only if there's enough data for |
1894 | | * compression to work well. |
1895 | | */ |
1896 | 0 | if (GinBufferShouldTrim(buffer, tup)) |
1897 | 0 | { |
1898 | 0 | GinTuple *ntup; |
1899 | 0 | Size ntuplen; |
1900 | |
1901 | 0 | Assert(buffer->nfrozen > 0); |
1902 | | |
1903 | | /* |
1904 | | * The buffer is storing the same key, but we've hit the memory
1905 | | * limit - write the frozen part of the TID list into the shared
1906 | | * tuplesort, and keep the rest for the mergesort with the new tuple.
1907 | | */ |
1908 | 0 | AssertCheckItemPointers(buffer); |
1909 | |
1910 | 0 | ntup = _gin_build_tuple(buffer->attnum, buffer->category, |
1911 | 0 | buffer->key, buffer->typlen, buffer->typbyval, |
1912 | 0 | buffer->items, buffer->nfrozen, &ntuplen); |
1913 | |
1914 | 0 | tuplesort_putgintuple(state->bs_sortstate, ntup, ntuplen); |
1915 | |
1916 | 0 | pfree(ntup); |
1917 | | |
1918 | | /* discard the frozen part we've just written out */
1919 | 0 | GinBufferTrim(buffer); |
1920 | 0 | } |
1921 | | |
1922 | | /* |
1923 | | * Remember data for the current tuple (either remember the new key, |
1924 | | * or append it to the existing data).
1925 | | */ |
1926 | 0 | GinBufferStoreTuple(buffer, tup); |
1927 | 0 | } |
1928 | | |
1929 | | /* flush data remaining in the buffer (for the last key) */ |
1930 | 0 | if (!GinBufferIsEmpty(buffer)) |
1931 | 0 | { |
1932 | 0 | GinTuple *ntup; |
1933 | 0 | Size ntuplen; |
1934 | |
1935 | 0 | AssertCheckItemPointers(buffer); |
1936 | |
1937 | 0 | ntup = _gin_build_tuple(buffer->attnum, buffer->category, |
1938 | 0 | buffer->key, buffer->typlen, buffer->typbyval, |
1939 | 0 | buffer->items, buffer->nitems, &ntuplen); |
1940 | |
1941 | 0 | tuplesort_putgintuple(state->bs_sortstate, ntup, ntuplen); |
1942 | 0 | state->bs_numtuples++; |
1943 | |
1944 | 0 | pfree(ntup); |
1945 | | |
1946 | | /* discard the existing data */ |
1947 | 0 | GinBufferReset(buffer); |
1948 | 0 | } |
1949 | | |
1950 | | /* release all the memory */
1951 | 0 | GinBufferFree(buffer); |
1952 | |
1953 | 0 | tuplesort_end(worker_sort); |
1954 | 0 | } |
1955 | | |
1956 | | /* |
1957 | | * Perform a worker's portion of a parallel GIN index build sort. |
1958 | | * |
1959 | | * This generates a tuplesort for the worker portion of the table. |
1960 | | * |
1961 | | * sortmem is the amount of working memory to use within each worker, |
1962 | | * expressed in KBs. |
1963 | | * |
1964 | | * When this returns, workers are done, and need only release resources. |
1965 | | * |
1966 | | * Before feeding data into a shared tuplesort (for the leader process), |
1967 | | * the workers process data in two phases. |
1968 | | * |
1969 | | * 1) A worker reads a portion of rows from the table, accumulates entries |
1970 | | * in memory, and flushes them into a private tuplesort (e.g. because of |
1971 | | * using too much memory). |
1972 | | * |
1973 | | * 2) The private tuplesort gets sorted (by key and TID), the worker reads |
1974 | | * the data again, and combines the entries as much as possible. This has |
1975 | | * to happen eventually, and this way it's done in workers in parallel. |
1976 | | * |
1977 | | * Finally, the combined entries are written into the shared tuplesort, so |
1978 | | * that the leader can process them. |
1979 | | * |
1980 | | * How well this works (compared to just writing entries into the shared |
1981 | | * tuplesort) depends on the data set. For large tables with many distinct |
1982 | | * keys this helps a lot. With many distinct keys it's likely the buffer has
1983 | | * to be flushed often, generating many entries with the same key and short
1984 | | * TID lists. These entries need to be sorted and merged at some point,
1985 | | * before writing them to the index. The merging is quite expensive - it can
1986 | | * easily be ~50% of a serial build - and doing as much of it in the workers
1987 | | * means it's parallelized. The leader still has to merge results from the
1988 | | * workers, but it's much more efficient to merge a few large entries than
1989 | | * many tiny ones.
1990 | | * |
1991 | | * This also reduces the amount of data the workers pass to the leader through |
1992 | | * the shared tuplesort. OTOH the workers need more space for the private sort, |
1993 | | * possibly up to 2x of the data, if no entries can be merged in a worker. But this
1994 | | * is very unlikely, and the only consequence is inefficiency, so we ignore it. |
1995 | | */ |
1996 | | static void |
1997 | | _gin_parallel_scan_and_build(GinBuildState *state, |
1998 | | GinBuildShared *ginshared, Sharedsort *sharedsort, |
1999 | | Relation heap, Relation index, |
2000 | | int sortmem, bool progress) |
2001 | 0 | { |
2002 | 0 | SortCoordinate coordinate; |
2003 | 0 | TableScanDesc scan; |
2004 | 0 | double reltuples; |
2005 | 0 | IndexInfo *indexInfo; |
2006 | | |
2007 | | /* Initialize local tuplesort coordination state */ |
2008 | 0 | coordinate = palloc0(sizeof(SortCoordinateData)); |
2009 | 0 | coordinate->isWorker = true; |
2010 | 0 | coordinate->nParticipants = -1; |
2011 | 0 | coordinate->sharedsort = sharedsort; |
2012 | | |
2013 | | /* remember how much space is allowed for the accumulated entries */ |
2014 | 0 | state->work_mem = (sortmem / 2); |
2015 | | |
2016 | | /* Begin "partial" tuplesort */ |
2017 | 0 | state->bs_sortstate = tuplesort_begin_index_gin(heap, index, |
2018 | 0 | state->work_mem, |
2019 | 0 | coordinate, |
2020 | 0 | TUPLESORT_NONE); |
2021 | | |
2022 | | /* Local per-worker sort of raw data */
2023 | 0 | state->bs_worker_sort = tuplesort_begin_index_gin(heap, index, |
2024 | 0 | state->work_mem, |
2025 | 0 | NULL, |
2026 | 0 | TUPLESORT_NONE); |
2027 | | |
2028 | | /* Join parallel scan */ |
2029 | 0 | indexInfo = BuildIndexInfo(index); |
2030 | 0 | indexInfo->ii_Concurrent = ginshared->isconcurrent; |
2031 | |
2032 | 0 | scan = table_beginscan_parallel(heap, |
2033 | 0 | ParallelTableScanFromGinBuildShared(ginshared)); |
2034 | |
2035 | 0 | reltuples = table_index_build_scan(heap, index, indexInfo, true, progress, |
2036 | 0 | ginBuildCallbackParallel, state, scan); |
2037 | | |
2038 | | /* write remaining accumulated entries */ |
2039 | 0 | ginFlushBuildState(state, index); |
2040 | | |
2041 | | /* |
2042 | | * Do the first phase of in-worker processing - sort the data produced by |
2043 | | * the callback, combine it into much larger chunks, and place those
2044 | | * into the shared tuplesort for the leader to process.
2045 | | */ |
2046 | 0 | _gin_process_worker_data(state, state->bs_worker_sort, progress); |
2047 | | |
2048 | | /* sort the GIN tuples built by this worker */ |
2049 | 0 | tuplesort_performsort(state->bs_sortstate); |
2050 | |
2051 | 0 | state->bs_reltuples += reltuples; |
2052 | | |
2053 | | /* |
2054 | | * Done. Record ambuild statistics. |
2055 | | */ |
2056 | 0 | SpinLockAcquire(&ginshared->mutex); |
2057 | 0 | ginshared->nparticipantsdone++; |
2058 | 0 | ginshared->reltuples += state->bs_reltuples; |
2059 | 0 | ginshared->indtuples += state->bs_numtuples; |
2060 | 0 | SpinLockRelease(&ginshared->mutex); |
2061 | | |
2062 | | /* Notify leader */ |
2063 | 0 | ConditionVariableSignal(&ginshared->workersdonecv); |
2064 | |
2065 | 0 | tuplesort_end(state->bs_sortstate); |
2066 | 0 | } |
2067 | | |
2068 | | /* |
2069 | | * Perform work within a launched parallel process. |
2070 | | */ |
2071 | | void |
2072 | | _gin_parallel_build_main(dsm_segment *seg, shm_toc *toc) |
2073 | 0 | { |
2074 | 0 | char *sharedquery; |
2075 | 0 | GinBuildShared *ginshared; |
2076 | 0 | Sharedsort *sharedsort; |
2077 | 0 | GinBuildState buildstate; |
2078 | 0 | Relation heapRel; |
2079 | 0 | Relation indexRel; |
2080 | 0 | LOCKMODE heapLockmode; |
2081 | 0 | LOCKMODE indexLockmode; |
2082 | 0 | WalUsage *walusage; |
2083 | 0 | BufferUsage *bufferusage; |
2084 | 0 | int sortmem; |
2085 | | |
2086 | | /* |
2087 | | * The only possible status flag that can be set for the parallel worker is
2088 | | * PROC_IN_SAFE_IC. |
2089 | | */ |
2090 | 0 | Assert((MyProc->statusFlags == 0) || |
2091 | 0 | (MyProc->statusFlags == PROC_IN_SAFE_IC)); |
2092 | | |
2093 | | /* Set debug_query_string for individual workers first */ |
2094 | 0 | sharedquery = shm_toc_lookup(toc, PARALLEL_KEY_QUERY_TEXT, true); |
2095 | 0 | debug_query_string = sharedquery; |
2096 | | |
2097 | | /* Report the query string from leader */ |
2098 | 0 | pgstat_report_activity(STATE_RUNNING, debug_query_string); |
2099 | | |
2100 | | /* Look up gin shared state */ |
2101 | 0 | ginshared = shm_toc_lookup(toc, PARALLEL_KEY_GIN_SHARED, false); |
2102 | | |
2103 | | /* Open relations using lock modes known to be obtained by index.c */ |
2104 | 0 | if (!ginshared->isconcurrent) |
2105 | 0 | { |
2106 | 0 | heapLockmode = ShareLock; |
2107 | 0 | indexLockmode = AccessExclusiveLock; |
2108 | 0 | } |
2109 | 0 | else |
2110 | 0 | { |
2111 | 0 | heapLockmode = ShareUpdateExclusiveLock; |
2112 | 0 | indexLockmode = RowExclusiveLock; |
2113 | 0 | } |
2114 | | |
2115 | | /* Open relations within worker */ |
2116 | 0 | heapRel = table_open(ginshared->heaprelid, heapLockmode); |
2117 | 0 | indexRel = index_open(ginshared->indexrelid, indexLockmode); |
2118 | | |
2119 | | /* initialize the GIN build state */ |
2120 | 0 | initGinState(&buildstate.ginstate, indexRel); |
2121 | 0 | buildstate.indtuples = 0; |
2122 | 0 | memset(&buildstate.buildStats, 0, sizeof(GinStatsData)); |
2123 | 0 | memset(&buildstate.tid, 0, sizeof(ItemPointerData)); |
2124 | | |
2125 | | /* |
2126 | | * create a temporary memory context that is used to hold data not yet |
2127 | | * dumped out to the index |
2128 | | */ |
2129 | 0 | buildstate.tmpCtx = AllocSetContextCreate(CurrentMemoryContext, |
2130 | 0 | "Gin build temporary context", |
2131 | 0 | ALLOCSET_DEFAULT_SIZES); |
2132 | | |
2133 | | /* |
2134 | | * create a temporary memory context that is used for calling |
2135 | | * ginExtractEntries(), and can be reset after each tuple |
2136 | | */ |
2137 | 0 | buildstate.funcCtx = AllocSetContextCreate(CurrentMemoryContext, |
2138 | 0 | "Gin build temporary context for user-defined function", |
2139 | 0 | ALLOCSET_DEFAULT_SIZES); |
2140 | |
2141 | 0 | buildstate.accum.ginstate = &buildstate.ginstate; |
2142 | 0 | ginInitBA(&buildstate.accum); |
2143 | | |
2144 | | |
2145 | | /* Look up shared state private to tuplesort.c */ |
2146 | 0 | sharedsort = shm_toc_lookup(toc, PARALLEL_KEY_TUPLESORT, false); |
2147 | 0 | tuplesort_attach_shared(sharedsort, seg); |
2148 | | |
2149 | | /* Prepare to track buffer usage during parallel execution */ |
2150 | 0 | InstrStartParallelQuery(); |
2151 | | |
2152 | | /* |
2153 | | * Might as well use a reliable figure when doling out maintenance_work_mem
2154 | | * (when the requested number of workers was not launched, this will be
2155 | | * somewhat higher than it is for other workers).
2156 | | */ |
2157 | 0 | sortmem = maintenance_work_mem / ginshared->scantuplesortstates; |
2158 | |
2159 | 0 | _gin_parallel_scan_and_build(&buildstate, ginshared, sharedsort, |
2160 | 0 | heapRel, indexRel, sortmem, false); |
2161 | | |
2162 | | /* Report WAL/buffer usage during parallel execution */ |
2163 | 0 | bufferusage = shm_toc_lookup(toc, PARALLEL_KEY_BUFFER_USAGE, false); |
2164 | 0 | walusage = shm_toc_lookup(toc, PARALLEL_KEY_WAL_USAGE, false); |
2165 | 0 | InstrEndParallelQuery(&bufferusage[ParallelWorkerNumber], |
2166 | 0 | &walusage[ParallelWorkerNumber]); |
2167 | |
2168 | 0 | index_close(indexRel, indexLockmode); |
2169 | 0 | table_close(heapRel, heapLockmode); |
2170 | 0 | } |
2171 | | |
2172 | | /* |
2173 | | * Used to keep track of compressed TID lists when building a GIN tuple. |
2174 | | */ |
2175 | | typedef struct |
2176 | | { |
2177 | | dlist_node node; /* linked list pointers */ |
2178 | | GinPostingList *seg; |
2179 | | } GinSegmentInfo; |
2180 | | |
2181 | | /* |
2182 | | * _gin_build_tuple |
2183 | | * Serialize the state for an index key into a tuple for tuplesort. |
2184 | | * |
2185 | | * The tuple has a number of scalar fields (mostly matching the build state), |
2186 | | * and then a data array that stores the key first, and then the TID list. |
2187 | | * |
2188 | | * For by-reference data types, we store the actual data. For by-val types |
2189 | | * we simply copy the whole Datum, so that we don't have to care about stuff |
2190 | | * like endianness etc. We could make it a little bit smaller, but it's not
2191 | | * worth it - it's a tiny fraction of the data, and we need to SHORTALIGN the
2192 | | * start of the TID list anyway. So we wouldn't save anything.
2193 | | * |
2194 | | * The TID list is serialized as compressed - it's highly compressible, and |
2195 | | * we already have ginCompressPostingList for this purpose. The list may be |
2196 | | * pretty long, so we compress it into multiple segments and then copy all |
2197 | | * of that into the GIN tuple. |
2198 | | */ |
2199 | | static GinTuple * |
2200 | | _gin_build_tuple(OffsetNumber attrnum, unsigned char category, |
2201 | | Datum key, int16 typlen, bool typbyval, |
2202 | | ItemPointerData *items, uint32 nitems, |
2203 | | Size *len) |
2204 | 0 | { |
2205 | 0 | GinTuple *tuple; |
2206 | 0 | char *ptr; |
2207 | |
2208 | 0 | Size tuplen; |
2209 | 0 | int keylen; |
2210 | |
2211 | 0 | dlist_mutable_iter iter; |
2212 | 0 | dlist_head segments; |
2213 | 0 | int ncompressed; |
2214 | 0 | Size compresslen; |
2215 | | |
2216 | | /* |
2217 | | * Calculate how long the key value is. Only keys with GIN_CAT_NORM_KEY
2218 | | * have an actual non-empty key. We include varlena headers and \0 bytes for
2219 | | * strings, to make it easier to access the data in-line. |
2220 | | * |
2221 | | * For byval types we simply copy the whole Datum. We could store just the |
2222 | | * necessary bytes, but this is simpler to work with and not worth the |
2223 | | * extra complexity. Moreover we still need to do the SHORTALIGN to allow
2224 | | * direct access to the item pointers.
2225 | | * |
2226 | | * XXX Note that for byval types we store the whole datum, no matter what |
2227 | | * the typlen value is. |
2228 | | */ |
2229 | 0 | if (category != GIN_CAT_NORM_KEY) |
2230 | 0 | keylen = 0; |
2231 | 0 | else if (typbyval) |
2232 | 0 | keylen = sizeof(Datum); |
2233 | 0 | else if (typlen > 0) |
2234 | 0 | keylen = typlen; |
2235 | 0 | else if (typlen == -1) |
2236 | 0 | keylen = VARSIZE_ANY(key); |
2237 | 0 | else if (typlen == -2) |
2238 | 0 | keylen = strlen(DatumGetPointer(key)) + 1; |
2239 | 0 | else |
2240 | 0 | elog(ERROR, "unexpected typlen value (%d)", typlen); |
2241 | | |
2242 | | /* compress the item pointers */ |
2243 | 0 | ncompressed = 0; |
2244 | 0 | compresslen = 0; |
2245 | 0 | dlist_init(&segments); |
2246 | | |
2247 | | /* generate compressed segments of TID list chunks */ |
2248 | 0 | while (ncompressed < nitems) |
2249 | 0 | { |
2250 | 0 | int cnt; |
2251 | 0 | GinSegmentInfo *seginfo = palloc(sizeof(GinSegmentInfo)); |
2252 | |
2253 | 0 | seginfo->seg = ginCompressPostingList(&items[ncompressed], |
2254 | 0 | (nitems - ncompressed), |
2255 | 0 | UINT16_MAX, |
2256 | 0 | &cnt); |
2257 | |
2258 | 0 | ncompressed += cnt; |
2259 | 0 | compresslen += SizeOfGinPostingList(seginfo->seg); |
2260 | |
2261 | 0 | dlist_push_tail(&segments, &seginfo->node); |
2262 | 0 | } |
2263 | | |
2264 | | /* |
2265 | | * Determine GIN tuple length with all the data included. Be careful about |
2266 | | * alignment, to allow direct access to compressed segments (those require |
2267 | | * only SHORTALIGN). |
2268 | | */ |
2269 | 0 | tuplen = SHORTALIGN(offsetof(GinTuple, data) + keylen) + compresslen; |
2270 | |
2271 | 0 | *len = tuplen; |
2272 | | |
2273 | | /* |
2274 | | * Allocate space for the whole GIN tuple. |
2275 | | * |
2276 | | * The palloc0 is needed - writetup_index_gin will write the whole tuple |
2277 | | * to disk, so we need to make sure the padding bytes are defined |
2278 | | * (otherwise valgrind would report this). |
2279 | | */ |
2280 | 0 | tuple = palloc0(tuplen); |
2281 | |
2282 | 0 | tuple->tuplen = tuplen; |
2283 | 0 | tuple->attrnum = attrnum; |
2284 | 0 | tuple->category = category; |
2285 | 0 | tuple->keylen = keylen; |
2286 | 0 | tuple->nitems = nitems; |
2287 | | |
2288 | | /* key type info */ |
2289 | 0 | tuple->typlen = typlen; |
2290 | 0 | tuple->typbyval = typbyval; |
2291 | | |
2292 | | /* |
2293 | | * Copy the key and items into the tuple. First the key value, which we |
2294 | | * can simply copy right at the beginning of the data array. |
2295 | | */ |
2296 | 0 | if (category == GIN_CAT_NORM_KEY) |
2297 | 0 | { |
2298 | 0 | if (typbyval) |
2299 | 0 | { |
2300 | 0 | memcpy(tuple->data, &key, sizeof(Datum)); |
2301 | 0 | } |
2302 | 0 | else if (typlen > 0) /* byref, fixed length */ |
2303 | 0 | { |
2304 | 0 | memcpy(tuple->data, DatumGetPointer(key), typlen); |
2305 | 0 | } |
2306 | 0 | else if (typlen == -1) |
2307 | 0 | { |
2308 | 0 | memcpy(tuple->data, DatumGetPointer(key), keylen); |
2309 | 0 | } |
2310 | 0 | else if (typlen == -2) |
2311 | 0 | { |
2312 | 0 | memcpy(tuple->data, DatumGetPointer(key), keylen); |
2313 | 0 | } |
2314 | 0 | } |
2315 | | |
2316 | | /* finally, copy the TIDs into the array */ |
2317 | 0 | ptr = (char *) tuple + SHORTALIGN(offsetof(GinTuple, data) + keylen); |
2318 | | |
2319 | | /* copy in the compressed data, and free the segments */ |
2320 | 0 | dlist_foreach_modify(iter, &segments) |
2321 | 0 | { |
2322 | 0 | GinSegmentInfo *seginfo = dlist_container(GinSegmentInfo, node, iter.cur); |
2323 | |
2324 | 0 | memcpy(ptr, seginfo->seg, SizeOfGinPostingList(seginfo->seg)); |
2325 | |
2326 | 0 | ptr += SizeOfGinPostingList(seginfo->seg); |
2327 | |
2328 | 0 | dlist_delete(&seginfo->node); |
2329 | |
2330 | 0 | pfree(seginfo->seg); |
2331 | 0 | pfree(seginfo); |
2332 | 0 | } |
2333 | |
2334 | 0 | return tuple; |
2335 | 0 | } |
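/*
 * Illustrative sketch, not part of gininsert.c: how the serialized tuple
 * length above is computed. SHORTALIGN is reimplemented locally (in
 * PostgreSQL it comes from c.h and rounds up to 2-byte alignment); the
 * struct is a simplified stand-in for GinTuple and the sizes are
 * hypothetical.
 */
#include <stddef.h>
#include <stdio.h>

#define SHORTALIGN_SKETCH(LEN)	(((size_t) (LEN) + 1) & ~((size_t) 1))

typedef struct
{
	size_t		tuplen;
	int			keylen;
	char		data[];		/* key bytes, then SHORTALIGNed posting lists */
} gin_tuple_layout_sketch;

int
main(void)
{
	int			keylen = 7;			/* e.g. a cstring key: 6 chars + '\0' */
	size_t		compresslen = 24;	/* total size of compressed TID segments */
	size_t		tuplen;

	/*
	 * The key is stored right at data[]; the posting lists start at the
	 * next SHORTALIGN boundary after it, as in _gin_build_tuple.
	 */
	tuplen = SHORTALIGN_SKETCH(offsetof(gin_tuple_layout_sketch, data) + keylen)
		+ compresslen;

	printf("tuplen = %zu\n", tuplen);
	return 0;
}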
2336 | | |
2337 | | /* |
2338 | | * _gin_parse_tuple_key |
2339 | | * Return a Datum representing the key stored in the tuple. |
2340 | | * |
2341 | | * Most of the tuple fields are directly accessible; the only things that
2342 | | * need more care are the key and the TID list.
2343 | | * |
2344 | | * For the key, this returns a regular Datum representing it. It's either the |
2345 | | * actual key value, or a pointer to the beginning of the data array (which is |
2346 | | * where the data was copied by _gin_build_tuple). |
2347 | | */ |
2348 | | static Datum |
2349 | | _gin_parse_tuple_key(GinTuple *a) |
2350 | 0 | { |
2351 | 0 | Datum key; |
2352 | |
2353 | 0 | if (a->category != GIN_CAT_NORM_KEY) |
2354 | 0 | return (Datum) 0; |
2355 | | |
2356 | 0 | if (a->typbyval) |
2357 | 0 | { |
2358 | 0 | memcpy(&key, a->data, a->keylen); |
2359 | 0 | return key; |
2360 | 0 | } |
2361 | | |
2362 | 0 | return PointerGetDatum(a->data); |
2363 | 0 | } |
2364 | | |
2365 | | /* |
2366 | | * _gin_parse_tuple_items |
2367 | | * Return a pointer to a palloc'd array of decompressed TIDs.
2368 | | */ |
2369 | | static ItemPointer |
2370 | | _gin_parse_tuple_items(GinTuple *a) |
2371 | 0 | { |
2372 | 0 | int len; |
2373 | 0 | char *ptr; |
2374 | 0 | int ndecoded; |
2375 | 0 | ItemPointer items; |
2376 | |
|
2377 | 0 | len = a->tuplen - SHORTALIGN(offsetof(GinTuple, data) + a->keylen); |
2378 | 0 | ptr = (char *) a + SHORTALIGN(offsetof(GinTuple, data) + a->keylen); |
2379 | |
2380 | 0 | items = ginPostingListDecodeAllSegments((GinPostingList *) ptr, len, &ndecoded); |
2381 | |
2382 | 0 | Assert(ndecoded == a->nitems); |
2383 | |
2384 | 0 | return (ItemPointer) items; |
2385 | 0 | } |
2386 | | |
2387 | | /* |
2388 | | * _gin_compare_tuples |
2389 | | * Compare GIN tuples, used by tuplesort during parallel index build. |
2390 | | * |
2391 | | * The scalar fields (attrnum, category) are compared first, the key value is |
2392 | | * compared last. The comparisons are done using type-specific sort support |
2393 | | * functions. |
2394 | | * |
2395 | | * If the key value matches, we compare the first TID value in the TID list, |
2396 | | * which means the tuples are merged in an order in which they are most |
2397 | | * likely to be simply concatenated. (This "first" TID will also allow us |
2398 | | * to determine a point up to which the list is fully determined and can be |
2399 | | * written into the index to enforce a memory limit etc.) |
2400 | | */ |
2401 | | int |
2402 | | _gin_compare_tuples(GinTuple *a, GinTuple *b, SortSupport ssup) |
2403 | 0 | { |
2404 | 0 | int r; |
2405 | 0 | Datum keya, |
2406 | 0 | keyb; |
2407 | |
2408 | 0 | if (a->attrnum < b->attrnum) |
2409 | 0 | return -1; |
2410 | | |
2411 | 0 | if (a->attrnum > b->attrnum) |
2412 | 0 | return 1; |
2413 | | |
2414 | 0 | if (a->category < b->category) |
2415 | 0 | return -1; |
2416 | | |
2417 | 0 | if (a->category > b->category) |
2418 | 0 | return 1; |
2419 | | |
2420 | 0 | if (a->category == GIN_CAT_NORM_KEY) |
2421 | 0 | { |
2422 | 0 | keya = _gin_parse_tuple_key(a); |
2423 | 0 | keyb = _gin_parse_tuple_key(b); |
2424 | |
2425 | 0 | r = ApplySortComparator(keya, false, |
2426 | 0 | keyb, false, |
2427 | 0 | &ssup[a->attrnum - 1]); |
2428 | | |
2429 | | /* if the key is the same, consider the first TID in the array */ |
2430 | 0 | return (r != 0) ? r : ItemPointerCompare(GinTupleGetFirst(a), |
2431 | 0 | GinTupleGetFirst(b)); |
2432 | 0 | } |
2433 | | |
2434 | 0 | return ItemPointerCompare(GinTupleGetFirst(a), |
2435 | 0 | GinTupleGetFirst(b)); |
2436 | 0 | } |
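/*
 * Illustrative sketch, not part of gininsert.c: the comparison order used
 * above (attribute, category, key, then first TID as the tie-break), reduced
 * to plain integers and wired into qsort(). The tie-break on the first TID
 * is what makes equal-key tuples arrive in append-friendly order. All names
 * here are hypothetical.
 */
#include <stdio.h>
#include <stdlib.h>

typedef struct
{
	int		attrnum;
	int		category;
	int		key;
	int		first_tid;
} gin_tuple_sketch;

static int
compare_sketch(const void *pa, const void *pb)
{
	const gin_tuple_sketch *a = pa;
	const gin_tuple_sketch *b = pb;

	if (a->attrnum != b->attrnum)
		return (a->attrnum < b->attrnum) ? -1 : 1;
	if (a->category != b->category)
		return (a->category < b->category) ? -1 : 1;
	if (a->key != b->key)
		return (a->key < b->key) ? -1 : 1;

	/* same key: order by first TID, so lists can often just be appended */
	if (a->first_tid != b->first_tid)
		return (a->first_tid < b->first_tid) ? -1 : 1;
	return 0;
}

int
main(void)
{
	gin_tuple_sketch tuples[] = {
		{1, 0, 42, 900}, {1, 0, 42, 100}, {1, 0, 7, 500},
	};

	qsort(tuples, 3, sizeof(gin_tuple_sketch), compare_sketch);

	/* prints key=7 first, then key=42 ordered by first_tid (100, 900) */
	for (int i = 0; i < 3; i++)
		printf("key=%d first_tid=%d\n", tuples[i].key, tuples[i].first_tid);
	return 0;
}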