/src/postgres/src/backend/executor/nodeHashjoin.c
Line | Count | Source |
1 | | /*------------------------------------------------------------------------- |
2 | | * |
3 | | * nodeHashjoin.c |
4 | | * Routines to handle hash join nodes |
5 | | * |
6 | | * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group |
7 | | * Portions Copyright (c) 1994, Regents of the University of California |
8 | | * |
9 | | * |
10 | | * IDENTIFICATION |
11 | | * src/backend/executor/nodeHashjoin.c |
12 | | * |
13 | | * HASH JOIN |
14 | | * |
15 | | * This is based on the "hybrid hash join" algorithm described briefly on |
16 | | * the following page: |
17 | | * |
18 | | * https://en.wikipedia.org/wiki/Hash_join#Hybrid_hash_join |
19 | | * |
20 | | * and in detail in the referenced paper: |
21 | | * |
22 | | * "An Adaptive Hash Join Algorithm for Multiuser Environments" |
23 | | * Hansjörg Zeller; Jim Gray (1990). Proceedings of the 16th VLDB conference. |
24 | | * Brisbane: 186–197. |
25 | | * |
26 | | * If the inner side tuples of a hash join do not fit in memory, the hash join |
27 | | * can be executed in multiple batches. |
28 | | * |
29 | | * If the statistics on the inner side relation are accurate, the planner |
30 | | * chooses a multi-batch strategy and estimates the number of batches. |
31 | | * |
32 | | * The query executor measures the real size of the hashtable and increases the |
33 | | * number of batches if the hashtable grows too large. |
34 | | * |
35 | | * The number of batches is always a power of two, so an increase in the number |
36 | | * of batches doubles it (see the illustrative sketch below). |
37 | | * |
38 | | * Serial hash join measures batch size lazily -- waiting until it is loading |
39 | | * a batch to determine whether it will fit in memory. While inserting tuples, |
40 | | * serial hash join will, if adding a tuple would push the hashtable over |
41 | | * work_mem, dump tuples out of the hashtable and reassign them either to |
42 | | * other batch files or to the current batch resident in the hashtable. |
43 | | * |
44 | | * Parallel hash join, on the other hand, completes all changes to the number |
45 | | * of batches during the build phase. If it increases the number of batches, it |
46 | | * dumps out all the tuples from all batches and reassigns them to entirely new |
47 | | * batch files. Then it checks every batch to ensure it will fit in the space |
48 | | * budget for the query. |
49 | | * |
50 | | * In both parallel and serial hash join, the executor currently makes a best |
51 | | * effort. If a particular batch will not fit in memory, it tries doubling the |
52 | | * number of batches. If, after a batch increase, there is a batch that |
53 | | * retained all or none of its tuples, the executor disables growth in the |
54 | | * number of batches globally. After growth is disabled, all batches that would |
55 | | * have previously triggered an increase in the number of batches instead |
56 | | * exceed the space allowed. |
57 | | * |
58 | | * PARALLELISM |
59 | | * |
60 | | * Hash joins can participate in parallel query execution in several ways. A |
61 | | * parallel-oblivious hash join is one where the node is unaware that it is |
62 | | * part of a parallel plan. In this case, a copy of the inner plan is used to |
63 | | * build a copy of the hash table in every backend, and the outer plan could |
64 | | * either be built from a partial or complete path, so that the results of the |
65 | | * hash join are correspondingly either partial or complete. A parallel-aware |
66 | | * hash join is one that behaves differently, coordinating work between |
67 | | * backends, and appears as Parallel Hash Join in EXPLAIN output. A Parallel |
68 | | * Hash Join always appears with a Parallel Hash node. |
69 | | * |
70 | | * Parallel-aware hash joins use the same per-backend state machine to track |
71 | | * progress through the hash join algorithm as parallel-oblivious hash joins. |
72 | | * In a parallel-aware hash join, there is also a shared state machine that |
73 | | * co-operating backends use to synchronize their local state machines and |
74 | | * program counters. The shared state machine is managed with a Barrier IPC |
75 | | * primitive. When all attached participants arrive at a barrier, the phase |
76 | | * advances and all waiting participants are released. |
77 | | * |
78 | | * When a participant begins working on a parallel hash join, it must first |
79 | | * figure out how much progress has already been made, because participants |
80 | | * don't wait for each other to begin. For this reason there are switch |
81 | | * statements at key points in the code where we have to synchronize our local |
82 | | * state machine with the phase, and then jump to the correct part of the |
83 | | * algorithm so that we can get started. |
84 | | * |
85 | | * One barrier called build_barrier is used to coordinate the hashing phases. |
86 | | * The phase is represented by an integer which begins at zero and increments |
87 | | * one by one, but in the code it is referred to by symbolic names as follows. |
88 | | * An asterisk indicates a phase that is performed by a single arbitrarily |
89 | | * chosen process. |
90 | | * |
91 | | * PHJ_BUILD_ELECT -- initial state |
92 | | * PHJ_BUILD_ALLOCATE* -- one sets up the batches and table 0 |
93 | | * PHJ_BUILD_HASH_INNER -- all hash the inner rel |
94 | | * PHJ_BUILD_HASH_OUTER -- (multi-batch only) all hash the outer |
95 | | * PHJ_BUILD_RUN -- building done, probing can begin |
96 | | * PHJ_BUILD_FREE* -- all work complete, one frees batches |
97 | | * |
98 | | * While in the phase PHJ_BUILD_HASH_INNER a separate pair of barriers may |
99 | | * be used repeatedly as required to coordinate expansions in the number of |
100 | | * batches or buckets. Their phases are as follows: |
101 | | * |
102 | | * PHJ_GROW_BATCHES_ELECT -- initial state |
103 | | * PHJ_GROW_BATCHES_REALLOCATE* -- one allocates new batches |
104 | | * PHJ_GROW_BATCHES_REPARTITION -- all repartition |
105 | | * PHJ_GROW_BATCHES_DECIDE* -- one detects skew and cleans up |
106 | | * PHJ_GROW_BATCHES_FINISH -- finished one growth cycle |
107 | | * |
108 | | * PHJ_GROW_BUCKETS_ELECT -- initial state |
109 | | * PHJ_GROW_BUCKETS_REALLOCATE* -- one allocates new buckets |
110 | | * PHJ_GROW_BUCKETS_REINSERT -- all insert tuples |
111 | | * |
112 | | * If the planner got the number of batches and buckets right, those won't be |
113 | | * necessary, but on the other hand we might finish up needing to expand the |
114 | | * buckets or batches multiple times while hashing the inner relation to stay |
115 | | * within our memory budget and load factor target. For that reason it's a |
116 | | * separate pair of barriers using circular phases. |
117 | | * |
118 | | * The PHJ_BUILD_HASH_OUTER phase is required only for multi-batch joins, |
119 | | * because we need to divide the outer relation into batches up front in order |
120 | | * to be able to process batches entirely independently. In contrast, the |
121 | | * parallel-oblivious algorithm simply throws tuples 'forward' to 'later' |
122 | | * batches whenever it encounters them while scanning and probing, which it |
123 | | * can do because it processes batches in serial order. |
124 | | * |
125 | | * Once PHJ_BUILD_RUN is reached, backends then split up and process |
126 | | * different batches, or gang up and work together on probing batches if there |
127 | | * aren't enough to go around. For each batch there is a separate barrier |
128 | | * with the following phases: |
129 | | * |
130 | | * PHJ_BATCH_ELECT -- initial state |
131 | | * PHJ_BATCH_ALLOCATE* -- one allocates buckets |
132 | | * PHJ_BATCH_LOAD -- all load the hash table from disk |
133 | | * PHJ_BATCH_PROBE -- all probe |
134 | | * PHJ_BATCH_SCAN* -- one does right/right-anti/full unmatched scan |
135 | | * PHJ_BATCH_FREE* -- one frees memory |
136 | | * |
137 | | * Batch 0 is a special case, because it starts out in phase |
138 | | * PHJ_BATCH_PROBE; populating batch 0's hash table is done during |
139 | | * PHJ_BUILD_HASH_INNER so we can skip loading. |
140 | | * |
141 | | * Initially we try to plan for a single-batch hash join using the combined |
142 | | * hash_mem of all participants to create a large shared hash table. If that |
143 | | * turns out either at planning or execution time to be impossible then we |
144 | | * fall back to regular hash_mem sized hash tables. |
145 | | * |
146 | | * To avoid deadlocks, we never wait for any barrier unless it is known that |
147 | | * all other backends attached to it are actively executing the node or have |
148 | | * finished. Practically, that means that we never emit a tuple while attached |
149 | | * to a barrier, unless the barrier has reached a phase that means that no |
150 | | * process will wait on it again. We emit tuples while attached to the build |
151 | | * barrier in phase PHJ_BUILD_RUN, and to a per-batch barrier in phase |
152 | | * PHJ_BATCH_PROBE. These are advanced to PHJ_BUILD_FREE and PHJ_BATCH_SCAN |
153 | | * respectively without waiting, using BarrierArriveAndDetach() and |
154 | | * BarrierArriveAndDetachExceptLast() respectively. The last to detach |
155 | | * receives a different return value so that it knows that it's safe to |
156 | | * clean up. Any straggler process that attaches after that phase is reached |
157 | | * will see that it's too late to participate or access the relevant shared |
158 | | * memory objects. |
159 | | * |
160 | | *------------------------------------------------------------------------- |
161 | | */ |
162 | | |
163 | | #include "postgres.h" |
164 | | |
165 | | #include "access/htup_details.h" |
166 | | #include "access/parallel.h" |
167 | | #include "executor/executor.h" |
168 | | #include "executor/hashjoin.h" |
169 | | #include "executor/nodeHash.h" |
170 | | #include "executor/nodeHashjoin.h" |
171 | | #include "miscadmin.h" |
172 | | #include "utils/lsyscache.h" |
173 | | #include "utils/sharedtuplestore.h" |
174 | | #include "utils/wait_event.h" |
175 | | |
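 | | /* |
 | |  * For orientation only: a minimal sketch, assuming nbuckets and nbatch are |
 | |  * both powers of two, of how a 32-bit hash value can be split into bucket |
 | |  * and batch numbers.  The real logic lives in ExecHashGetBucketAndBatch() |
 | |  * in nodeHash.c and may differ in detail; the function name here is only |
 | |  * illustrative.  Because the batch number comes from the bits just above |
 | |  * the bucket bits, doubling nbatch adds one more bit to batchno, so a |
 | |  * tuple either keeps its batch number or moves to batchno + old_nbatch, |
 | |  * i.e. only "forward" to a later batch. |
 | |  */ |
 | | static inline void |
 | | sketch_get_bucket_and_batch(uint32 hashvalue, |
 | | uint32 nbuckets, int log2_nbuckets, uint32 nbatch, |
 | | int *bucketno, int *batchno) |
 | | { |
 | | /* low-order bits choose the bucket */ |
 | | *bucketno = (int) (hashvalue & (nbuckets - 1)); |
 | | /* the next bits up choose the batch (everything is batch 0 if nbatch == 1) */ |
 | | if (nbatch > 1) |
 | | *batchno = (int) ((hashvalue >> log2_nbuckets) & (nbatch - 1)); |
 | | else |
 | | *batchno = 0; |
 | | } |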
176 | | |
177 | | /* |
178 | | * States of the ExecHashJoin state machine |
179 | | */ |
180 | 0 | #define HJ_BUILD_HASHTABLE 1 |
181 | 0 | #define HJ_NEED_NEW_OUTER 2 |
182 | 0 | #define HJ_SCAN_BUCKET 3 |
183 | 0 | #define HJ_FILL_OUTER_TUPLE 4 |
184 | 0 | #define HJ_FILL_INNER_TUPLES 5 |
185 | 0 | #define HJ_NEED_NEW_BATCH 6 |
186 | | |
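 | | /* |
 | |  * Typical transitions, for orientation (derived from the state machine in |
 | |  * ExecHashJoinImpl() below): |
 | |  * |
 | |  *   HJ_BUILD_HASHTABLE   -> HJ_NEED_NEW_OUTER    (serial) |
 | |  *                        -> HJ_NEED_NEW_BATCH    (parallel-aware) |
 | |  *   HJ_NEED_NEW_OUTER    -> HJ_SCAN_BUCKET       got an outer tuple |
 | |  *                        -> HJ_FILL_INNER_TUPLES outer exhausted, fill inner |
 | |  *                        -> HJ_NEED_NEW_BATCH    outer exhausted |
 | |  *   HJ_SCAN_BUCKET       -> HJ_FILL_OUTER_TUPLE  no (more) matches |
 | |  *                        -> HJ_NEED_NEW_OUTER    antijoin / single-match join |
 | |  *   HJ_FILL_OUTER_TUPLE  -> HJ_NEED_NEW_OUTER |
 | |  *   HJ_FILL_INNER_TUPLES -> HJ_NEED_NEW_BATCH    no more unmatched inner tuples |
 | |  *   HJ_NEED_NEW_BATCH    -> HJ_NEED_NEW_OUTER    (or the join ends) |
 | |  */ |
 | |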
187 | | /* Returns true if doing null-fill on outer relation */ |
188 | 0 | #define HJ_FILL_OUTER(hjstate) ((hjstate)->hj_NullInnerTupleSlot != NULL) |
189 | | /* Returns true if doing null-fill on inner relation */ |
190 | 0 | #define HJ_FILL_INNER(hjstate) ((hjstate)->hj_NullOuterTupleSlot != NULL) |
191 | | |
192 | | static TupleTableSlot *ExecHashJoinOuterGetTuple(PlanState *outerNode, |
193 | | HashJoinState *hjstate, |
194 | | uint32 *hashvalue); |
195 | | static TupleTableSlot *ExecParallelHashJoinOuterGetTuple(PlanState *outerNode, |
196 | | HashJoinState *hjstate, |
197 | | uint32 *hashvalue); |
198 | | static TupleTableSlot *ExecHashJoinGetSavedTuple(HashJoinState *hjstate, |
199 | | BufFile *file, |
200 | | uint32 *hashvalue, |
201 | | TupleTableSlot *tupleSlot); |
202 | | static bool ExecHashJoinNewBatch(HashJoinState *hjstate); |
203 | | static bool ExecParallelHashJoinNewBatch(HashJoinState *hjstate); |
204 | | static void ExecParallelHashJoinPartitionOuter(HashJoinState *hjstate); |
205 | | |
206 | | |
207 | | /* ---------------------------------------------------------------- |
208 | | * ExecHashJoinImpl |
209 | | * |
210 | | * This function implements the Hybrid Hashjoin algorithm. It is marked |
211 | | * with an always-inline attribute so that ExecHashJoin() and |
212 | | * ExecParallelHashJoin() can inline it. Compilers that respect the |
213 | | * attribute should create versions specialized for parallel == true and |
214 | | * parallel == false with unnecessary branches removed. |
215 | | * |
216 | | * Note: the relation we build the hash table on is the "inner" one; |
217 | | * the other one is the "outer". |
218 | | * ---------------------------------------------------------------- |
219 | | */ |
220 | | static pg_attribute_always_inline TupleTableSlot * |
221 | | ExecHashJoinImpl(PlanState *pstate, bool parallel) |
222 | 0 | { |
223 | 0 | HashJoinState *node = castNode(HashJoinState, pstate); |
224 | 0 | PlanState *outerNode; |
225 | 0 | HashState *hashNode; |
226 | 0 | ExprState *joinqual; |
227 | 0 | ExprState *otherqual; |
228 | 0 | ExprContext *econtext; |
229 | 0 | HashJoinTable hashtable; |
230 | 0 | TupleTableSlot *outerTupleSlot; |
231 | 0 | uint32 hashvalue; |
232 | 0 | int batchno; |
233 | 0 | ParallelHashJoinState *parallel_state; |
234 | | |
235 | | /* |
236 | | * get information from HashJoin node |
237 | | */ |
238 | 0 | joinqual = node->js.joinqual; |
239 | 0 | otherqual = node->js.ps.qual; |
240 | 0 | hashNode = (HashState *) innerPlanState(node); |
241 | 0 | outerNode = outerPlanState(node); |
242 | 0 | hashtable = node->hj_HashTable; |
243 | 0 | econtext = node->js.ps.ps_ExprContext; |
244 | 0 | parallel_state = hashNode->parallel_state; |
245 | | |
246 | | /* |
247 | | * Reset per-tuple memory context to free any expression evaluation |
248 | | * storage allocated in the previous tuple cycle. |
249 | | */ |
250 | 0 | ResetExprContext(econtext); |
251 | | |
252 | | /* |
253 | | * run the hash join state machine |
254 | | */ |
255 | 0 | for (;;) |
256 | 0 | { |
257 | | /* |
258 | | * It's possible to iterate this loop many times before returning a |
259 | | * tuple, in some pathological cases such as needing to move much of |
260 | | * the current batch to a later batch. So let's check for interrupts |
261 | | * each time through. |
262 | | */ |
263 | 0 | CHECK_FOR_INTERRUPTS(); |
264 | |
|
265 | 0 | switch (node->hj_JoinState) |
266 | 0 | { |
267 | 0 | case HJ_BUILD_HASHTABLE: |
268 | | |
269 | | /* |
270 | | * First time through: build hash table for inner relation. |
271 | | */ |
272 | 0 | Assert(hashtable == NULL); |
273 | | |
274 | | /* |
275 | | * If the outer relation is completely empty, and it's not a |
276 | | * right/right-anti/full join, we can quit without building |
277 | | * the hash table. However, for an inner join it is only a |
278 | | * win to check this when the outer relation's startup cost is |
279 | | * less than the projected cost of building the hash table. |
280 | | * Otherwise it's best to build the hash table first and see |
281 | | * if the inner relation is empty. (When it's a left join, we |
282 | | * should always make this check, since we aren't going to be |
283 | | * able to skip the join on the strength of an empty inner |
284 | | * relation anyway.) |
285 | | * |
286 | | * If we are rescanning the join, we make use of information |
287 | | * gained on the previous scan: don't bother to try the |
288 | | * prefetch if the previous scan found the outer relation |
289 | | * nonempty. This is not 100% reliable since with new |
290 | | * parameters the outer relation might yield different |
291 | | * results, but it's a good heuristic. |
292 | | * |
293 | | * The only way to make the check is to try to fetch a tuple |
294 | | * from the outer plan node. If we succeed, we have to stash |
295 | | * it away for later consumption by ExecHashJoinOuterGetTuple. |
296 | | */ |
297 | 0 | if (HJ_FILL_INNER(node)) |
298 | 0 | { |
299 | | /* no chance to not build the hash table */ |
300 | 0 | node->hj_FirstOuterTupleSlot = NULL; |
301 | 0 | } |
302 | 0 | else if (parallel) |
303 | 0 | { |
304 | | /* |
305 | | * The empty-outer optimization is not implemented for |
306 | | * shared hash tables, because no one participant can |
307 | | * determine that there are no outer tuples, and it's not |
308 | | * yet clear that it's worth the synchronization overhead |
309 | | * of reaching consensus to figure that out. So we have |
310 | | * to build the hash table. |
311 | | */ |
312 | 0 | node->hj_FirstOuterTupleSlot = NULL; |
313 | 0 | } |
314 | 0 | else if (HJ_FILL_OUTER(node) || |
315 | 0 | (outerNode->plan->startup_cost < hashNode->ps.plan->total_cost && |
316 | 0 | !node->hj_OuterNotEmpty)) |
317 | 0 | { |
318 | 0 | node->hj_FirstOuterTupleSlot = ExecProcNode(outerNode); |
319 | 0 | if (TupIsNull(node->hj_FirstOuterTupleSlot)) |
320 | 0 | { |
321 | 0 | node->hj_OuterNotEmpty = false; |
322 | 0 | return NULL; |
323 | 0 | } |
324 | 0 | else |
325 | 0 | node->hj_OuterNotEmpty = true; |
326 | 0 | } |
327 | 0 | else |
328 | 0 | node->hj_FirstOuterTupleSlot = NULL; |
329 | | |
330 | | /* |
331 | | * Create the hash table. If using Parallel Hash, then |
332 | | * whoever gets here first will create the hash table and any |
333 | | * later arrivals will merely attach to it. |
334 | | */ |
335 | 0 | hashtable = ExecHashTableCreate(hashNode); |
336 | 0 | node->hj_HashTable = hashtable; |
337 | | |
338 | | /* |
339 | | * Execute the Hash node, to build the hash table. If using |
340 | | * Parallel Hash, then we'll try to help hashing unless we |
341 | | * arrived too late. |
342 | | */ |
343 | 0 | hashNode->hashtable = hashtable; |
344 | 0 | (void) MultiExecProcNode((PlanState *) hashNode); |
345 | | |
346 | | /* |
347 | | * If the inner relation is completely empty, and we're not |
348 | | * doing a left outer join, we can quit without scanning the |
349 | | * outer relation. |
350 | | */ |
351 | 0 | if (hashtable->totalTuples == 0 && !HJ_FILL_OUTER(node)) |
352 | 0 | { |
353 | 0 | if (parallel) |
354 | 0 | { |
355 | | /* |
356 | | * Advance the build barrier to PHJ_BUILD_RUN before |
357 | | * proceeding so we can negotiate resource cleanup. |
358 | | */ |
359 | 0 | Barrier *build_barrier = ¶llel_state->build_barrier; |
360 | |
|
361 | 0 | while (BarrierPhase(build_barrier) < PHJ_BUILD_RUN) |
362 | 0 | BarrierArriveAndWait(build_barrier, 0); |
363 | 0 | } |
364 | 0 | return NULL; |
365 | 0 | } |
366 | | |
367 | | /* |
368 | | * need to remember whether nbatch has increased since we |
369 | | * began scanning the outer relation |
370 | | */ |
371 | 0 | hashtable->nbatch_outstart = hashtable->nbatch; |
372 | | |
373 | | /* |
374 | | * Reset OuterNotEmpty for scan. (It's OK if we fetched a |
375 | | * tuple above, because ExecHashJoinOuterGetTuple will |
376 | | * immediately set it again.) |
377 | | */ |
378 | 0 | node->hj_OuterNotEmpty = false; |
379 | |
|
380 | 0 | if (parallel) |
381 | 0 | { |
382 | 0 | Barrier *build_barrier; |
383 | |
|
384 | 0 | build_barrier = ¶llel_state->build_barrier; |
385 | 0 | Assert(BarrierPhase(build_barrier) == PHJ_BUILD_HASH_OUTER || |
386 | 0 | BarrierPhase(build_barrier) == PHJ_BUILD_RUN || |
387 | 0 | BarrierPhase(build_barrier) == PHJ_BUILD_FREE); |
388 | 0 | if (BarrierPhase(build_barrier) == PHJ_BUILD_HASH_OUTER) |
389 | 0 | { |
390 | | /* |
391 | | * If multi-batch, we need to hash the outer relation |
392 | | * up front. |
393 | | */ |
394 | 0 | if (hashtable->nbatch > 1) |
395 | 0 | ExecParallelHashJoinPartitionOuter(node); |
396 | 0 | BarrierArriveAndWait(build_barrier, |
397 | 0 | WAIT_EVENT_HASH_BUILD_HASH_OUTER); |
398 | 0 | } |
399 | 0 | else if (BarrierPhase(build_barrier) == PHJ_BUILD_FREE) |
400 | 0 | { |
401 | | /* |
402 | | * If we attached so late that the job is finished and |
403 | | * the batch state has been freed, we can return |
404 | | * immediately. |
405 | | */ |
406 | 0 | return NULL; |
407 | 0 | } |
408 | | |
409 | | /* Each backend should now select a batch to work on. */ |
410 | 0 | Assert(BarrierPhase(build_barrier) == PHJ_BUILD_RUN); |
411 | 0 | hashtable->curbatch = -1; |
412 | 0 | node->hj_JoinState = HJ_NEED_NEW_BATCH; |
413 | |
|
414 | 0 | continue; |
415 | 0 | } |
416 | 0 | else |
417 | 0 | node->hj_JoinState = HJ_NEED_NEW_OUTER; |
418 | | |
419 | | /* FALL THRU */ |
420 | | |
421 | 0 | case HJ_NEED_NEW_OUTER: |
422 | | |
423 | | /* |
424 | | * We don't have an outer tuple, try to get the next one |
425 | | */ |
426 | 0 | if (parallel) |
427 | 0 | outerTupleSlot = |
428 | 0 | ExecParallelHashJoinOuterGetTuple(outerNode, node, |
429 | 0 | &hashvalue); |
430 | 0 | else |
431 | 0 | outerTupleSlot = |
432 | 0 | ExecHashJoinOuterGetTuple(outerNode, node, &hashvalue); |
433 | |
|
434 | 0 | if (TupIsNull(outerTupleSlot)) |
435 | 0 | { |
436 | | /* end of batch, or maybe whole join */ |
437 | 0 | if (HJ_FILL_INNER(node)) |
438 | 0 | { |
439 | | /* set up to scan for unmatched inner tuples */ |
440 | 0 | if (parallel) |
441 | 0 | { |
442 | | /* |
443 | | * In a parallel join, only one process is |
444 | | * currently allowed to handle each batch's |
445 | | * unmatched tuples. |
446 | | */ |
447 | 0 | if (ExecParallelPrepHashTableForUnmatched(node)) |
448 | 0 | node->hj_JoinState = HJ_FILL_INNER_TUPLES; |
449 | 0 | else |
450 | 0 | node->hj_JoinState = HJ_NEED_NEW_BATCH; |
451 | 0 | } |
452 | 0 | else |
453 | 0 | { |
454 | 0 | ExecPrepHashTableForUnmatched(node); |
455 | 0 | node->hj_JoinState = HJ_FILL_INNER_TUPLES; |
456 | 0 | } |
457 | 0 | } |
458 | 0 | else |
459 | 0 | node->hj_JoinState = HJ_NEED_NEW_BATCH; |
460 | 0 | continue; |
461 | 0 | } |
462 | | |
463 | 0 | econtext->ecxt_outertuple = outerTupleSlot; |
464 | 0 | node->hj_MatchedOuter = false; |
465 | | |
466 | | /* |
467 | | * Find the corresponding bucket for this tuple in the main |
468 | | * hash table or skew hash table. |
469 | | */ |
470 | 0 | node->hj_CurHashValue = hashvalue; |
471 | 0 | ExecHashGetBucketAndBatch(hashtable, hashvalue, |
472 | 0 | &node->hj_CurBucketNo, &batchno); |
473 | 0 | node->hj_CurSkewBucketNo = ExecHashGetSkewBucket(hashtable, |
474 | 0 | hashvalue); |
475 | 0 | node->hj_CurTuple = NULL; |
476 | | |
477 | | /* |
478 | | * The tuple might not belong to the current batch (where |
479 | | * "current batch" includes the skew buckets if any). |
480 | | */ |
481 | 0 | if (batchno != hashtable->curbatch && |
482 | 0 | node->hj_CurSkewBucketNo == INVALID_SKEW_BUCKET_NO) |
483 | 0 | { |
484 | 0 | bool shouldFree; |
485 | 0 | MinimalTuple mintuple = ExecFetchSlotMinimalTuple(outerTupleSlot, |
486 | 0 | &shouldFree); |
487 | | |
488 | | /* |
489 | | * Need to postpone this outer tuple to a later batch. |
490 | | * Save it in the corresponding outer-batch file. |
491 | | */ |
492 | 0 | Assert(parallel_state == NULL); |
493 | 0 | Assert(batchno > hashtable->curbatch); |
494 | 0 | ExecHashJoinSaveTuple(mintuple, hashvalue, |
495 | 0 | &hashtable->outerBatchFile[batchno], |
496 | 0 | hashtable); |
497 | |
|
498 | 0 | if (shouldFree) |
499 | 0 | heap_free_minimal_tuple(mintuple); |
500 | | |
501 | | /* Loop around, staying in HJ_NEED_NEW_OUTER state */ |
502 | 0 | continue; |
503 | 0 | } |
504 | | |
505 | | /* OK, let's scan the bucket for matches */ |
506 | 0 | node->hj_JoinState = HJ_SCAN_BUCKET; |
507 | | |
508 | | /* FALL THRU */ |
509 | |
|
510 | 0 | case HJ_SCAN_BUCKET: |
511 | | |
512 | | /* |
513 | | * Scan the selected hash bucket for matches to current outer |
514 | | */ |
515 | 0 | if (parallel) |
516 | 0 | { |
517 | 0 | if (!ExecParallelScanHashBucket(node, econtext)) |
518 | 0 | { |
519 | | /* out of matches; check for possible outer-join fill */ |
520 | 0 | node->hj_JoinState = HJ_FILL_OUTER_TUPLE; |
521 | 0 | continue; |
522 | 0 | } |
523 | 0 | } |
524 | 0 | else |
525 | 0 | { |
526 | 0 | if (!ExecScanHashBucket(node, econtext)) |
527 | 0 | { |
528 | | /* out of matches; check for possible outer-join fill */ |
529 | 0 | node->hj_JoinState = HJ_FILL_OUTER_TUPLE; |
530 | 0 | continue; |
531 | 0 | } |
532 | 0 | } |
533 | | |
534 | | /* |
535 | | * In a right-semijoin, we only need the first match for each |
536 | | * inner tuple. |
537 | | */ |
538 | 0 | if (node->js.jointype == JOIN_RIGHT_SEMI && |
539 | 0 | HeapTupleHeaderHasMatch(HJTUPLE_MINTUPLE(node->hj_CurTuple))) |
540 | 0 | continue; |
541 | | |
542 | | /* |
543 | | * We've got a match, but still need to test non-hashed quals. |
544 | | * ExecScanHashBucket already set up all the state needed to |
545 | | * call ExecQual. |
546 | | * |
547 | | * If we pass the qual, then save state for next call and have |
548 | | * ExecProject form the projection, store it in the tuple |
549 | | * table, and return the slot. |
550 | | * |
551 | | * Only the joinquals determine tuple match status, but all |
552 | | * quals must pass to actually return the tuple. |
553 | | */ |
554 | 0 | if (joinqual == NULL || ExecQual(joinqual, econtext)) |
555 | 0 | { |
556 | 0 | node->hj_MatchedOuter = true; |
557 | | |
558 | | /* |
559 | | * This is really only needed if HJ_FILL_INNER(node) or if |
560 | | * we are in a right-semijoin, but we'll avoid the branch |
561 | | * and just set it always. |
562 | | */ |
563 | 0 | if (!HeapTupleHeaderHasMatch(HJTUPLE_MINTUPLE(node->hj_CurTuple))) |
564 | 0 | HeapTupleHeaderSetMatch(HJTUPLE_MINTUPLE(node->hj_CurTuple)); |
565 | | |
566 | | /* In an antijoin, we never return a matched tuple */ |
567 | 0 | if (node->js.jointype == JOIN_ANTI) |
568 | 0 | { |
569 | 0 | node->hj_JoinState = HJ_NEED_NEW_OUTER; |
570 | 0 | continue; |
571 | 0 | } |
572 | | |
573 | | /* |
574 | | * If we only need to consider the first matching inner |
575 | | * tuple, then advance to next outer tuple after we've |
576 | | * processed this one. |
577 | | */ |
578 | 0 | if (node->js.single_match) |
579 | 0 | node->hj_JoinState = HJ_NEED_NEW_OUTER; |
580 | | |
581 | | /* |
582 | | * In a right-antijoin, we never return a matched tuple. |
583 | | * If it's not an inner_unique join, we need to stay on |
584 | | * the current outer tuple to continue scanning the inner |
585 | | * side for matches. |
586 | | */ |
587 | 0 | if (node->js.jointype == JOIN_RIGHT_ANTI) |
588 | 0 | continue; |
589 | | |
590 | 0 | if (otherqual == NULL || ExecQual(otherqual, econtext)) |
591 | 0 | return ExecProject(node->js.ps.ps_ProjInfo); |
592 | 0 | else |
593 | 0 | InstrCountFiltered2(node, 1); |
594 | 0 | } |
595 | 0 | else |
596 | 0 | InstrCountFiltered1(node, 1); |
597 | 0 | break; |
598 | | |
599 | 0 | case HJ_FILL_OUTER_TUPLE: |
600 | | |
601 | | /* |
602 | | * The current outer tuple has run out of matches, so check |
603 | | * whether to emit a dummy outer-join tuple. Whether we emit |
604 | | * one or not, the next state is NEED_NEW_OUTER. |
605 | | */ |
606 | 0 | node->hj_JoinState = HJ_NEED_NEW_OUTER; |
607 | |
|
608 | 0 | if (!node->hj_MatchedOuter && |
609 | 0 | HJ_FILL_OUTER(node)) |
610 | 0 | { |
611 | | /* |
612 | | * Generate a fake join tuple with nulls for the inner |
613 | | * tuple, and return it if it passes the non-join quals. |
614 | | */ |
615 | 0 | econtext->ecxt_innertuple = node->hj_NullInnerTupleSlot; |
616 | |
|
617 | 0 | if (otherqual == NULL || ExecQual(otherqual, econtext)) |
618 | 0 | return ExecProject(node->js.ps.ps_ProjInfo); |
619 | 0 | else |
620 | 0 | InstrCountFiltered2(node, 1); |
621 | 0 | } |
622 | 0 | break; |
623 | | |
624 | 0 | case HJ_FILL_INNER_TUPLES: |
625 | | |
626 | | /* |
627 | | * We have finished a batch, but we are doing |
628 | | * right/right-anti/full join, so any unmatched inner tuples |
629 | | * in the hashtable have to be emitted before we continue to |
630 | | * the next batch. |
631 | | */ |
632 | 0 | if (!(parallel ? ExecParallelScanHashTableForUnmatched(node, econtext) |
633 | 0 | : ExecScanHashTableForUnmatched(node, econtext))) |
634 | 0 | { |
635 | | /* no more unmatched tuples */ |
636 | 0 | node->hj_JoinState = HJ_NEED_NEW_BATCH; |
637 | 0 | continue; |
638 | 0 | } |
639 | | |
640 | | /* |
641 | | * Generate a fake join tuple with nulls for the outer tuple, |
642 | | * and return it if it passes the non-join quals. |
643 | | */ |
644 | 0 | econtext->ecxt_outertuple = node->hj_NullOuterTupleSlot; |
645 | |
|
646 | 0 | if (otherqual == NULL || ExecQual(otherqual, econtext)) |
647 | 0 | return ExecProject(node->js.ps.ps_ProjInfo); |
648 | 0 | else |
649 | 0 | InstrCountFiltered2(node, 1); |
650 | 0 | break; |
651 | | |
652 | 0 | case HJ_NEED_NEW_BATCH: |
653 | | |
654 | | /* |
655 | | * Try to advance to next batch. Done if there are no more. |
656 | | */ |
657 | 0 | if (parallel) |
658 | 0 | { |
659 | 0 | if (!ExecParallelHashJoinNewBatch(node)) |
660 | 0 | return NULL; /* end of parallel-aware join */ |
661 | 0 | } |
662 | 0 | else |
663 | 0 | { |
664 | 0 | if (!ExecHashJoinNewBatch(node)) |
665 | 0 | return NULL; /* end of parallel-oblivious join */ |
666 | 0 | } |
667 | 0 | node->hj_JoinState = HJ_NEED_NEW_OUTER; |
668 | 0 | break; |
669 | | |
670 | 0 | default: |
671 | 0 | elog(ERROR, "unrecognized hashjoin state: %d", |
672 | 0 | (int) node->hj_JoinState); |
673 | 0 | } |
674 | 0 | } |
675 | 0 | } |
676 | | |
677 | | /* ---------------------------------------------------------------- |
678 | | * ExecHashJoin |
679 | | * |
680 | | * Parallel-oblivious version. |
681 | | * ---------------------------------------------------------------- |
682 | | */ |
683 | | static TupleTableSlot * /* return: a tuple or NULL */ |
684 | | ExecHashJoin(PlanState *pstate) |
685 | 0 | { |
686 | | /* |
687 | | * On sufficiently smart compilers this should be inlined with the |
688 | | * parallel-aware branches removed. |
689 | | */ |
690 | 0 | return ExecHashJoinImpl(pstate, false); |
691 | 0 | } |
692 | | |
693 | | /* ---------------------------------------------------------------- |
694 | | * ExecParallelHashJoin |
695 | | * |
696 | | * Parallel-aware version. |
697 | | * ---------------------------------------------------------------- |
698 | | */ |
699 | | static TupleTableSlot * /* return: a tuple or NULL */ |
700 | | ExecParallelHashJoin(PlanState *pstate) |
701 | 0 | { |
702 | | /* |
703 | | * On sufficiently smart compilers this should be inlined with the |
704 | | * parallel-oblivious branches removed. |
705 | | */ |
706 | 0 | return ExecHashJoinImpl(pstate, true); |
707 | 0 | } |
708 | | |
709 | | /* ---------------------------------------------------------------- |
710 | | * ExecInitHashJoin |
711 | | * |
712 | | * Init routine for HashJoin node. |
713 | | * ---------------------------------------------------------------- |
714 | | */ |
715 | | HashJoinState * |
716 | | ExecInitHashJoin(HashJoin *node, EState *estate, int eflags) |
717 | 0 | { |
718 | 0 | HashJoinState *hjstate; |
719 | 0 | Plan *outerNode; |
720 | 0 | Hash *hashNode; |
721 | 0 | TupleDesc outerDesc, |
722 | 0 | innerDesc; |
723 | 0 | const TupleTableSlotOps *ops; |
724 | | |
725 | | /* check for unsupported flags */ |
726 | 0 | Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK))); |
727 | | |
728 | | /* |
729 | | * create state structure |
730 | | */ |
731 | 0 | hjstate = makeNode(HashJoinState); |
732 | 0 | hjstate->js.ps.plan = (Plan *) node; |
733 | 0 | hjstate->js.ps.state = estate; |
734 | | |
735 | | /* |
736 | | * See ExecHashJoinInitializeDSM() and ExecHashJoinInitializeWorker() |
737 | | * where this function may be replaced with a parallel version, if we |
738 | | * managed to launch a parallel query. |
739 | | */ |
740 | 0 | hjstate->js.ps.ExecProcNode = ExecHashJoin; |
741 | 0 | hjstate->js.jointype = node->join.jointype; |
742 | | |
743 | | /* |
744 | | * Miscellaneous initialization |
745 | | * |
746 | | * create expression context for node |
747 | | */ |
748 | 0 | ExecAssignExprContext(estate, &hjstate->js.ps); |
749 | | |
750 | | /* |
751 | | * initialize child nodes |
752 | | * |
753 | | * Note: we could suppress the REWIND flag for the inner input, which |
754 | | * would amount to betting that the hash will be a single batch. Not |
755 | | * clear if this would be a win or not. |
756 | | */ |
757 | 0 | outerNode = outerPlan(node); |
758 | 0 | hashNode = (Hash *) innerPlan(node); |
759 | |
|
760 | 0 | outerPlanState(hjstate) = ExecInitNode(outerNode, estate, eflags); |
761 | 0 | outerDesc = ExecGetResultType(outerPlanState(hjstate)); |
762 | 0 | innerPlanState(hjstate) = ExecInitNode((Plan *) hashNode, estate, eflags); |
763 | 0 | innerDesc = ExecGetResultType(innerPlanState(hjstate)); |
764 | | |
765 | | /* |
766 | | * Initialize result slot, type and projection. |
767 | | */ |
768 | 0 | ExecInitResultTupleSlotTL(&hjstate->js.ps, &TTSOpsVirtual); |
769 | 0 | ExecAssignProjectionInfo(&hjstate->js.ps, NULL); |
770 | | |
771 | | /* |
772 | | * tuple table initialization |
773 | | */ |
774 | 0 | ops = ExecGetResultSlotOps(outerPlanState(hjstate), NULL); |
775 | 0 | hjstate->hj_OuterTupleSlot = ExecInitExtraTupleSlot(estate, outerDesc, |
776 | 0 | ops); |
777 | | |
778 | | /* |
779 | | * detect whether we need only consider the first matching inner tuple |
780 | | */ |
781 | 0 | hjstate->js.single_match = (node->join.inner_unique || |
782 | 0 | node->join.jointype == JOIN_SEMI); |
783 | | |
784 | | /* set up null tuples for outer joins, if needed */ |
785 | 0 | switch (node->join.jointype) |
786 | 0 | { |
787 | 0 | case JOIN_INNER: |
788 | 0 | case JOIN_SEMI: |
789 | 0 | case JOIN_RIGHT_SEMI: |
790 | 0 | break; |
791 | 0 | case JOIN_LEFT: |
792 | 0 | case JOIN_ANTI: |
793 | 0 | hjstate->hj_NullInnerTupleSlot = |
794 | 0 | ExecInitNullTupleSlot(estate, innerDesc, &TTSOpsVirtual); |
795 | 0 | break; |
796 | 0 | case JOIN_RIGHT: |
797 | 0 | case JOIN_RIGHT_ANTI: |
798 | 0 | hjstate->hj_NullOuterTupleSlot = |
799 | 0 | ExecInitNullTupleSlot(estate, outerDesc, &TTSOpsVirtual); |
800 | 0 | break; |
801 | 0 | case JOIN_FULL: |
802 | 0 | hjstate->hj_NullOuterTupleSlot = |
803 | 0 | ExecInitNullTupleSlot(estate, outerDesc, &TTSOpsVirtual); |
804 | 0 | hjstate->hj_NullInnerTupleSlot = |
805 | 0 | ExecInitNullTupleSlot(estate, innerDesc, &TTSOpsVirtual); |
806 | 0 | break; |
807 | 0 | default: |
808 | 0 | elog(ERROR, "unrecognized join type: %d", |
809 | 0 | (int) node->join.jointype); |
810 | 0 | } |
811 | | |
812 | | /* |
813 | | * now for some voodoo. our temporary tuple slot is actually the result |
814 | | * tuple slot of the Hash node (which is our inner plan). we can do this |
815 | | * because Hash nodes don't return tuples via ExecProcNode() -- instead |
816 | | * the hash join node uses ExecScanHashBucket() to get at the contents of |
817 | | * the hash table. -cim 6/9/91 |
818 | | */ |
819 | 0 | { |
820 | 0 | HashState *hashstate = (HashState *) innerPlanState(hjstate); |
821 | 0 | Hash *hash = (Hash *) hashstate->ps.plan; |
822 | 0 | TupleTableSlot *slot = hashstate->ps.ps_ResultTupleSlot; |
823 | 0 | Oid *outer_hashfuncid; |
824 | 0 | Oid *inner_hashfuncid; |
825 | 0 | bool *hash_strict; |
826 | 0 | ListCell *lc; |
827 | 0 | int nkeys; |
828 | | |
829 | |
|
830 | 0 | hjstate->hj_HashTupleSlot = slot; |
831 | | |
832 | | /* |
833 | | * Build ExprStates to obtain hash values for either side of the join. |
834 | | * This must be done here as ExecBuildHash32Expr needs to know how to |
835 | | * handle NULL inputs and the required handling of that depends on the |
836 | | * jointype. We don't know the join type in ExecInitHash() and we |
837 | | * must build the ExprStates before ExecHashTableCreate() so we |
838 | | * properly attribute any SubPlans that exist in the hash expressions |
839 | | * to the correct PlanState. |
840 | | */ |
841 | 0 | nkeys = list_length(node->hashoperators); |
842 | |
|
843 | 0 | outer_hashfuncid = palloc_array(Oid, nkeys); |
844 | 0 | inner_hashfuncid = palloc_array(Oid, nkeys); |
845 | 0 | hash_strict = palloc_array(bool, nkeys); |
846 | | |
847 | | /* |
848 | | * Determine the hash function for each side of the join for the given |
849 | | * hash operator. |
850 | | */ |
851 | 0 | foreach(lc, node->hashoperators) |
852 | 0 | { |
853 | 0 | Oid hashop = lfirst_oid(lc); |
854 | 0 | int i = foreach_current_index(lc); |
855 | |
|
856 | 0 | if (!get_op_hash_functions(hashop, |
857 | 0 | &outer_hashfuncid[i], |
858 | 0 | &inner_hashfuncid[i])) |
859 | 0 | elog(ERROR, |
860 | 0 | "could not find hash function for hash operator %u", |
861 | 0 | hashop); |
862 | 0 | hash_strict[i] = op_strict(hashop); |
863 | 0 | } |
864 | | |
865 | | /* |
866 | | * Build an ExprState to generate the hash value for the expressions |
867 | | * on the outer side of the join. This ExprState must finish generating |
868 | | * the hash value when HJ_FILL_OUTER() is true. Otherwise, |
869 | | * ExecBuildHash32Expr will set up the ExprState to abort early if it |
870 | | * finds a NULL. In these cases, we don't need to store these tuples |
871 | | * in the hash table as the jointype does not require it. |
872 | | */ |
873 | 0 | hjstate->hj_OuterHash = |
874 | 0 | ExecBuildHash32Expr(hjstate->js.ps.ps_ResultTupleDesc, |
875 | 0 | hjstate->js.ps.resultops, |
876 | 0 | outer_hashfuncid, |
877 | 0 | node->hashcollations, |
878 | 0 | node->hashkeys, |
879 | 0 | hash_strict, |
880 | 0 | &hjstate->js.ps, |
881 | 0 | 0, |
882 | 0 | HJ_FILL_OUTER(hjstate)); |
883 | | |
884 | | /* As above, but for the inner side of the join */ |
885 | 0 | hashstate->hash_expr = |
886 | 0 | ExecBuildHash32Expr(hashstate->ps.ps_ResultTupleDesc, |
887 | 0 | hashstate->ps.resultops, |
888 | 0 | inner_hashfuncid, |
889 | 0 | node->hashcollations, |
890 | 0 | hash->hashkeys, |
891 | 0 | hash_strict, |
892 | 0 | &hashstate->ps, |
893 | 0 | 0, |
894 | 0 | HJ_FILL_INNER(hjstate)); |
895 | | |
896 | | /* |
897 | | * Set up the skew table hash function while we have a record of the |
898 | | * first key's hash function Oid. |
899 | | */ |
900 | 0 | if (OidIsValid(hash->skewTable)) |
901 | 0 | { |
902 | 0 | hashstate->skew_hashfunction = palloc0(sizeof(FmgrInfo)); |
903 | 0 | hashstate->skew_collation = linitial_oid(node->hashcollations); |
904 | 0 | fmgr_info(outer_hashfuncid[0], hashstate->skew_hashfunction); |
905 | 0 | } |
906 | | |
907 | | /* no need to keep these */ |
908 | 0 | pfree(outer_hashfuncid); |
909 | 0 | pfree(inner_hashfuncid); |
910 | 0 | pfree(hash_strict); |
911 | 0 | } |
912 | | |
913 | | /* |
914 | | * initialize child expressions |
915 | | */ |
916 | 0 | hjstate->js.ps.qual = |
917 | 0 | ExecInitQual(node->join.plan.qual, (PlanState *) hjstate); |
918 | 0 | hjstate->js.joinqual = |
919 | 0 | ExecInitQual(node->join.joinqual, (PlanState *) hjstate); |
920 | 0 | hjstate->hashclauses = |
921 | 0 | ExecInitQual(node->hashclauses, (PlanState *) hjstate); |
922 | | |
923 | | /* |
924 | | * initialize hash-specific info |
925 | | */ |
926 | 0 | hjstate->hj_HashTable = NULL; |
927 | 0 | hjstate->hj_FirstOuterTupleSlot = NULL; |
928 | |
|
929 | 0 | hjstate->hj_CurHashValue = 0; |
930 | 0 | hjstate->hj_CurBucketNo = 0; |
931 | 0 | hjstate->hj_CurSkewBucketNo = INVALID_SKEW_BUCKET_NO; |
932 | 0 | hjstate->hj_CurTuple = NULL; |
933 | |
|
934 | 0 | hjstate->hj_JoinState = HJ_BUILD_HASHTABLE; |
935 | 0 | hjstate->hj_MatchedOuter = false; |
936 | 0 | hjstate->hj_OuterNotEmpty = false; |
937 | |
|
938 | 0 | return hjstate; |
939 | 0 | } |
940 | | |
941 | | /* ---------------------------------------------------------------- |
942 | | * ExecEndHashJoin |
943 | | * |
944 | | * clean up routine for HashJoin node |
945 | | * ---------------------------------------------------------------- |
946 | | */ |
947 | | void |
948 | | ExecEndHashJoin(HashJoinState *node) |
949 | 0 | { |
950 | | /* |
951 | | * Free hash table |
952 | | */ |
953 | 0 | if (node->hj_HashTable) |
954 | 0 | { |
955 | 0 | ExecHashTableDestroy(node->hj_HashTable); |
956 | 0 | node->hj_HashTable = NULL; |
957 | 0 | } |
958 | | |
959 | | /* |
960 | | * clean up subtrees |
961 | | */ |
962 | 0 | ExecEndNode(outerPlanState(node)); |
963 | 0 | ExecEndNode(innerPlanState(node)); |
964 | 0 | } |
965 | | |
966 | | /* |
967 | | * ExecHashJoinOuterGetTuple |
968 | | * |
969 | | * get the next outer tuple for a parallel-oblivious hashjoin: either by |
970 | | * executing the outer plan node in the first pass, or from the temp |
971 | | * files for the hashjoin batches. |
972 | | * |
973 | | * Returns a null slot if no more outer tuples (within the current batch). |
974 | | * |
975 | | * On success, the tuple's hash value is stored at *hashvalue --- this is |
976 | | * either originally computed, or re-read from the temp file. |
977 | | */ |
978 | | static TupleTableSlot * |
979 | | ExecHashJoinOuterGetTuple(PlanState *outerNode, |
980 | | HashJoinState *hjstate, |
981 | | uint32 *hashvalue) |
982 | 0 | { |
983 | 0 | HashJoinTable hashtable = hjstate->hj_HashTable; |
984 | 0 | int curbatch = hashtable->curbatch; |
985 | 0 | TupleTableSlot *slot; |
986 | |
|
987 | 0 | if (curbatch == 0) /* if it is the first pass */ |
988 | 0 | { |
989 | | /* |
990 | | * Check to see if first outer tuple was already fetched by |
991 | | * ExecHashJoin() and not used yet. |
992 | | */ |
993 | 0 | slot = hjstate->hj_FirstOuterTupleSlot; |
994 | 0 | if (!TupIsNull(slot)) |
995 | 0 | hjstate->hj_FirstOuterTupleSlot = NULL; |
996 | 0 | else |
997 | 0 | slot = ExecProcNode(outerNode); |
998 | |
|
999 | 0 | while (!TupIsNull(slot)) |
1000 | 0 | { |
1001 | 0 | bool isnull; |
1002 | | |
1003 | | /* |
1004 | | * We have to compute the tuple's hash value. |
1005 | | */ |
1006 | 0 | ExprContext *econtext = hjstate->js.ps.ps_ExprContext; |
1007 | |
|
1008 | 0 | econtext->ecxt_outertuple = slot; |
1009 | |
|
1010 | 0 | ResetExprContext(econtext); |
1011 | |
|
1012 | 0 | *hashvalue = DatumGetUInt32(ExecEvalExprSwitchContext(hjstate->hj_OuterHash, |
1013 | 0 | econtext, |
1014 | 0 | &isnull)); |
1015 | |
|
1016 | 0 | if (!isnull) |
1017 | 0 | { |
1018 | | /* remember outer relation is not empty for possible rescan */ |
1019 | 0 | hjstate->hj_OuterNotEmpty = true; |
1020 | |
|
1021 | 0 | return slot; |
1022 | 0 | } |
1023 | | |
1024 | | /* |
1025 | | * That tuple couldn't match because of a NULL, so discard it and |
1026 | | * continue with the next one. |
1027 | | */ |
1028 | 0 | slot = ExecProcNode(outerNode); |
1029 | 0 | } |
1030 | 0 | } |
1031 | 0 | else if (curbatch < hashtable->nbatch) |
1032 | 0 | { |
1033 | 0 | BufFile *file = hashtable->outerBatchFile[curbatch]; |
1034 | | |
1035 | | /* |
1036 | | * In outer-join cases, we could get here even though the batch file |
1037 | | * is empty. |
1038 | | */ |
1039 | 0 | if (file == NULL) |
1040 | 0 | return NULL; |
1041 | | |
1042 | 0 | slot = ExecHashJoinGetSavedTuple(hjstate, |
1043 | 0 | file, |
1044 | 0 | hashvalue, |
1045 | 0 | hjstate->hj_OuterTupleSlot); |
1046 | 0 | if (!TupIsNull(slot)) |
1047 | 0 | return slot; |
1048 | 0 | } |
1049 | | |
1050 | | /* End of this batch */ |
1051 | 0 | return NULL; |
1052 | 0 | } |
1053 | | |
1054 | | /* |
1055 | | * ExecHashJoinOuterGetTuple variant for the parallel case. |
1056 | | */ |
1057 | | static TupleTableSlot * |
1058 | | ExecParallelHashJoinOuterGetTuple(PlanState *outerNode, |
1059 | | HashJoinState *hjstate, |
1060 | | uint32 *hashvalue) |
1061 | 0 | { |
1062 | 0 | HashJoinTable hashtable = hjstate->hj_HashTable; |
1063 | 0 | int curbatch = hashtable->curbatch; |
1064 | 0 | TupleTableSlot *slot; |
1065 | | |
1066 | | /* |
1067 | | * In the Parallel Hash case we only run the outer plan directly for |
1068 | | * single-batch hash joins. Otherwise we have to go to batch files, even |
1069 | | * for batch 0. |
1070 | | */ |
1071 | 0 | if (curbatch == 0 && hashtable->nbatch == 1) |
1072 | 0 | { |
1073 | 0 | slot = ExecProcNode(outerNode); |
1074 | |
|
1075 | 0 | while (!TupIsNull(slot)) |
1076 | 0 | { |
1077 | 0 | bool isnull; |
1078 | |
|
1079 | 0 | ExprContext *econtext = hjstate->js.ps.ps_ExprContext; |
1080 | |
|
1081 | 0 | econtext->ecxt_outertuple = slot; |
1082 | |
|
1083 | 0 | ResetExprContext(econtext); |
1084 | |
|
1085 | 0 | *hashvalue = DatumGetUInt32(ExecEvalExprSwitchContext(hjstate->hj_OuterHash, |
1086 | 0 | econtext, |
1087 | 0 | &isnull)); |
1088 | |
|
1089 | 0 | if (!isnull) |
1090 | 0 | return slot; |
1091 | | |
1092 | | /* |
1093 | | * That tuple couldn't match because of a NULL, so discard it and |
1094 | | * continue with the next one. |
1095 | | */ |
1096 | 0 | slot = ExecProcNode(outerNode); |
1097 | 0 | } |
1098 | 0 | } |
1099 | 0 | else if (curbatch < hashtable->nbatch) |
1100 | 0 | { |
1101 | 0 | MinimalTuple tuple; |
1102 | |
|
1103 | 0 | tuple = sts_parallel_scan_next(hashtable->batches[curbatch].outer_tuples, |
1104 | 0 | hashvalue); |
1105 | 0 | if (tuple != NULL) |
1106 | 0 | { |
1107 | 0 | ExecForceStoreMinimalTuple(tuple, |
1108 | 0 | hjstate->hj_OuterTupleSlot, |
1109 | 0 | false); |
1110 | 0 | slot = hjstate->hj_OuterTupleSlot; |
1111 | 0 | return slot; |
1112 | 0 | } |
1113 | 0 | else |
1114 | 0 | ExecClearTuple(hjstate->hj_OuterTupleSlot); |
1115 | 0 | } |
1116 | | |
1117 | | /* End of this batch */ |
1118 | 0 | hashtable->batches[curbatch].outer_eof = true; |
1119 | |
|
1120 | 0 | return NULL; |
1121 | 0 | } |
1122 | | |
1123 | | /* |
1124 | | * ExecHashJoinNewBatch |
1125 | | * switch to a new hashjoin batch |
1126 | | * |
1127 | | * Returns true if successful, false if there are no more batches. |
1128 | | */ |
1129 | | static bool |
1130 | | ExecHashJoinNewBatch(HashJoinState *hjstate) |
1131 | 0 | { |
1132 | 0 | HashJoinTable hashtable = hjstate->hj_HashTable; |
1133 | 0 | int nbatch; |
1134 | 0 | int curbatch; |
1135 | 0 | BufFile *innerFile; |
1136 | 0 | TupleTableSlot *slot; |
1137 | 0 | uint32 hashvalue; |
1138 | |
|
1139 | 0 | nbatch = hashtable->nbatch; |
1140 | 0 | curbatch = hashtable->curbatch; |
1141 | |
|
1142 | 0 | if (curbatch > 0) |
1143 | 0 | { |
1144 | | /* |
1145 | | * We no longer need the previous outer batch file; close it right |
1146 | | * away to free disk space. |
1147 | | */ |
1148 | 0 | if (hashtable->outerBatchFile[curbatch]) |
1149 | 0 | BufFileClose(hashtable->outerBatchFile[curbatch]); |
1150 | 0 | hashtable->outerBatchFile[curbatch] = NULL; |
1151 | 0 | } |
1152 | 0 | else /* we just finished the first batch */ |
1153 | 0 | { |
1154 | | /* |
1155 | | * Reset some of the skew optimization state variables, since we no |
1156 | | * longer need to consider skew tuples after the first batch. The |
1157 | | * memory context reset we are about to do will release the skew |
1158 | | * hashtable itself. |
1159 | | */ |
1160 | 0 | hashtable->skewEnabled = false; |
1161 | 0 | hashtable->skewBucket = NULL; |
1162 | 0 | hashtable->skewBucketNums = NULL; |
1163 | 0 | hashtable->nSkewBuckets = 0; |
1164 | 0 | hashtable->spaceUsedSkew = 0; |
1165 | 0 | } |
1166 | | |
1167 | | /* |
1168 | | * We can always skip over any batches that are completely empty on both |
1169 | | * sides. We can sometimes skip over batches that are empty on only one |
1170 | | * side, but there are exceptions: |
1171 | | * |
1172 | | * 1. In a left/full outer join, we have to process outer batches even if |
1173 | | * the inner batch is empty. Similarly, in a right/right-anti/full outer |
1174 | | * join, we have to process inner batches even if the outer batch is |
1175 | | * empty. |
1176 | | * |
1177 | | * 2. If we have increased nbatch since the initial estimate, we have to |
1178 | | * scan inner batches since they might contain tuples that need to be |
1179 | | * reassigned to later inner batches. |
1180 | | * |
1181 | | * 3. Similarly, if we have increased nbatch since starting the outer |
1182 | | * scan, we have to rescan outer batches in case they contain tuples that |
1183 | | * need to be reassigned. |
1184 | | */ |
1185 | 0 | curbatch++; |
1186 | 0 | while (curbatch < nbatch && |
1187 | 0 | (hashtable->outerBatchFile[curbatch] == NULL || |
1188 | 0 | hashtable->innerBatchFile[curbatch] == NULL)) |
1189 | 0 | { |
1190 | 0 | if (hashtable->outerBatchFile[curbatch] && |
1191 | 0 | HJ_FILL_OUTER(hjstate)) |
1192 | 0 | break; /* must process due to rule 1 */ |
1193 | 0 | if (hashtable->innerBatchFile[curbatch] && |
1194 | 0 | HJ_FILL_INNER(hjstate)) |
1195 | 0 | break; /* must process due to rule 1 */ |
1196 | 0 | if (hashtable->innerBatchFile[curbatch] && |
1197 | 0 | nbatch != hashtable->nbatch_original) |
1198 | 0 | break; /* must process due to rule 2 */ |
1199 | 0 | if (hashtable->outerBatchFile[curbatch] && |
1200 | 0 | nbatch != hashtable->nbatch_outstart) |
1201 | 0 | break; /* must process due to rule 3 */ |
1202 | | /* We can ignore this batch. */ |
1203 | | /* Release associated temp files right away. */ |
1204 | 0 | if (hashtable->innerBatchFile[curbatch]) |
1205 | 0 | BufFileClose(hashtable->innerBatchFile[curbatch]); |
1206 | 0 | hashtable->innerBatchFile[curbatch] = NULL; |
1207 | 0 | if (hashtable->outerBatchFile[curbatch]) |
1208 | 0 | BufFileClose(hashtable->outerBatchFile[curbatch]); |
1209 | 0 | hashtable->outerBatchFile[curbatch] = NULL; |
1210 | 0 | curbatch++; |
1211 | 0 | } |
1212 | |
|
1213 | 0 | if (curbatch >= nbatch) |
1214 | 0 | return false; /* no more batches */ |
1215 | | |
1216 | 0 | hashtable->curbatch = curbatch; |
1217 | | |
1218 | | /* |
1219 | | * Reload the hash table with the new inner batch (which could be empty) |
1220 | | */ |
1221 | 0 | ExecHashTableReset(hashtable); |
1222 | |
|
1223 | 0 | innerFile = hashtable->innerBatchFile[curbatch]; |
1224 | |
|
1225 | 0 | if (innerFile != NULL) |
1226 | 0 | { |
1227 | 0 | if (BufFileSeek(innerFile, 0, 0, SEEK_SET)) |
1228 | 0 | ereport(ERROR, |
1229 | 0 | (errcode_for_file_access(), |
1230 | 0 | errmsg("could not rewind hash-join temporary file"))); |
1231 | | |
1232 | 0 | while ((slot = ExecHashJoinGetSavedTuple(hjstate, |
1233 | 0 | innerFile, |
1234 | 0 | &hashvalue, |
1235 | 0 | hjstate->hj_HashTupleSlot))) |
1236 | 0 | { |
1237 | | /* |
1238 | | * NOTE: some tuples may be sent to future batches. Also, it is |
1239 | | * possible for hashtable->nbatch to be increased here! |
1240 | | */ |
1241 | 0 | ExecHashTableInsert(hashtable, slot, hashvalue); |
1242 | 0 | } |
1243 | | |
1244 | | /* |
1245 | | * after we build the hash table, the inner batch file is no longer |
1246 | | * needed |
1247 | | */ |
1248 | 0 | BufFileClose(innerFile); |
1249 | 0 | hashtable->innerBatchFile[curbatch] = NULL; |
1250 | 0 | } |
1251 | | |
1252 | | /* |
1253 | | * Rewind outer batch file (if present), so that we can start reading it. |
1254 | | */ |
1255 | 0 | if (hashtable->outerBatchFile[curbatch] != NULL) |
1256 | 0 | { |
1257 | 0 | if (BufFileSeek(hashtable->outerBatchFile[curbatch], 0, 0, SEEK_SET)) |
1258 | 0 | ereport(ERROR, |
1259 | 0 | (errcode_for_file_access(), |
1260 | 0 | errmsg("could not rewind hash-join temporary file"))); |
1261 | 0 | } |
1262 | | |
1263 | 0 | return true; |
1264 | 0 | } |
1265 | | |
1266 | | /* |
1267 | | * Choose a batch to work on, and attach to it. Returns true if successful, |
1268 | | * false if there are no more batches. |
1269 | | */ |
1270 | | static bool |
1271 | | ExecParallelHashJoinNewBatch(HashJoinState *hjstate) |
1272 | 0 | { |
1273 | 0 | HashJoinTable hashtable = hjstate->hj_HashTable; |
1274 | 0 | int start_batchno; |
1275 | 0 | int batchno; |
1276 | | |
1277 | | /* |
1278 | | * If we were already attached to a batch, remember not to bother checking |
1279 | | * it again, and detach from it (possibly freeing the hash table if we are |
1280 | | * last to detach). |
1281 | | */ |
1282 | 0 | if (hashtable->curbatch >= 0) |
1283 | 0 | { |
1284 | 0 | hashtable->batches[hashtable->curbatch].done = true; |
1285 | 0 | ExecHashTableDetachBatch(hashtable); |
1286 | 0 | } |
1287 | | |
1288 | | /* |
1289 | | * Search for a batch that isn't done. We use an atomic counter to start |
1290 | | * our search at a different batch in every participant when there are |
1291 | | * more batches than participants. |
1292 | | */ |
1293 | 0 | batchno = start_batchno = |
1294 | 0 | pg_atomic_fetch_add_u32(&hashtable->parallel_state->distributor, 1) % |
1295 | 0 | hashtable->nbatch; |
1296 | 0 | do |
1297 | 0 | { |
1298 | 0 | uint32 hashvalue; |
1299 | 0 | MinimalTuple tuple; |
1300 | 0 | TupleTableSlot *slot; |
1301 | |
|
1302 | 0 | if (!hashtable->batches[batchno].done) |
1303 | 0 | { |
1304 | 0 | SharedTuplestoreAccessor *inner_tuples; |
1305 | 0 | Barrier *batch_barrier = |
1306 | 0 | &hashtable->batches[batchno].shared->batch_barrier; |
1307 | |
|
1308 | 0 | switch (BarrierAttach(batch_barrier)) |
1309 | 0 | { |
1310 | 0 | case PHJ_BATCH_ELECT: |
1311 | | |
1312 | | /* One backend allocates the hash table. */ |
1313 | 0 | if (BarrierArriveAndWait(batch_barrier, |
1314 | 0 | WAIT_EVENT_HASH_BATCH_ELECT)) |
1315 | 0 | ExecParallelHashTableAlloc(hashtable, batchno); |
1316 | | /* Fall through. */ |
1317 | |
|
1318 | 0 | case PHJ_BATCH_ALLOCATE: |
1319 | | /* Wait for allocation to complete. */ |
1320 | 0 | BarrierArriveAndWait(batch_barrier, |
1321 | 0 | WAIT_EVENT_HASH_BATCH_ALLOCATE); |
1322 | | /* Fall through. */ |
1323 | |
|
1324 | 0 | case PHJ_BATCH_LOAD: |
1325 | | /* Start (or join in) loading tuples. */ |
1326 | 0 | ExecParallelHashTableSetCurrentBatch(hashtable, batchno); |
1327 | 0 | inner_tuples = hashtable->batches[batchno].inner_tuples; |
1328 | 0 | sts_begin_parallel_scan(inner_tuples); |
1329 | 0 | while ((tuple = sts_parallel_scan_next(inner_tuples, |
1330 | 0 | &hashvalue))) |
1331 | 0 | { |
1332 | 0 | ExecForceStoreMinimalTuple(tuple, |
1333 | 0 | hjstate->hj_HashTupleSlot, |
1334 | 0 | false); |
1335 | 0 | slot = hjstate->hj_HashTupleSlot; |
1336 | 0 | ExecParallelHashTableInsertCurrentBatch(hashtable, slot, |
1337 | 0 | hashvalue); |
1338 | 0 | } |
1339 | 0 | sts_end_parallel_scan(inner_tuples); |
1340 | 0 | BarrierArriveAndWait(batch_barrier, |
1341 | 0 | WAIT_EVENT_HASH_BATCH_LOAD); |
1342 | | /* Fall through. */ |
1343 | |
|
1344 | 0 | case PHJ_BATCH_PROBE: |
1345 | | |
1346 | | /* |
1347 | | * This batch is ready to probe. Return control to |
1348 | | * caller. We stay attached to batch_barrier so that the |
1349 | | * hash table stays alive until everyone's finished |
1350 | | * probing it, but no participant is allowed to wait at |
1351 | | * this barrier again (or else a deadlock could occur). |
1352 | | * All attached participants must eventually detach from |
1353 | | * the barrier and one worker must advance the phase so |
1354 | | * that the final phase is reached. |
1355 | | */ |
1356 | 0 | ExecParallelHashTableSetCurrentBatch(hashtable, batchno); |
1357 | 0 | sts_begin_parallel_scan(hashtable->batches[batchno].outer_tuples); |
1358 | |
|
1359 | 0 | return true; |
1360 | 0 | case PHJ_BATCH_SCAN: |
1361 | | |
1362 | | /* |
1363 | | * In principle, we could help scan for unmatched tuples, |
1364 | | * since that phase is already underway (the thing we |
1365 | | * can't do under current deadlock-avoidance rules is wait |
1366 | | * for others to arrive at PHJ_BATCH_SCAN, because |
1367 | | * PHJ_BATCH_PROBE emits tuples, but in this case we just |
1368 | | * got here without waiting). That is not yet done. For |
1369 | | * now, we just detach and go around again. We have to |
1370 | | * use ExecHashTableDetachBatch() because there's a small |
1371 | | * chance we'll be the last to detach, and then we're |
1372 | | * responsible for freeing memory. |
1373 | | */ |
1374 | 0 | ExecParallelHashTableSetCurrentBatch(hashtable, batchno); |
1375 | 0 | hashtable->batches[batchno].done = true; |
1376 | 0 | ExecHashTableDetachBatch(hashtable); |
1377 | 0 | break; |
1378 | | |
1379 | 0 | case PHJ_BATCH_FREE: |
1380 | | |
1381 | | /* |
1382 | | * Already done. Detach and go around again (if any |
1383 | | * remain). |
1384 | | */ |
1385 | 0 | BarrierDetach(batch_barrier); |
1386 | 0 | hashtable->batches[batchno].done = true; |
1387 | 0 | hashtable->curbatch = -1; |
1388 | 0 | break; |
1389 | | |
1390 | 0 | default: |
1391 | 0 | elog(ERROR, "unexpected batch phase %d", |
1392 | 0 | BarrierPhase(batch_barrier)); |
1393 | 0 | } |
1394 | 0 | } |
1395 | 0 | batchno = (batchno + 1) % hashtable->nbatch; |
1396 | 0 | } while (batchno != start_batchno); |
1397 | | |
1398 | 0 | return false; |
1399 | 0 | } |
1400 | | |
1401 | | /* |
1402 | | * ExecHashJoinSaveTuple |
1403 | | * save a tuple to a batch file. |
1404 | | * |
1405 | | * The data recorded in the file for each tuple is its hash value, |
1406 | | * then the tuple in MinimalTuple format. |
1407 | | * |
1408 | | * fileptr points to a batch file in one of the hashtable arrays. |
1409 | | * |
1410 | | * The batch files (and their buffers) are allocated in the spill context |
1411 | | * created for the hashtable. |
1412 | | */ |
1413 | | void |
1414 | | ExecHashJoinSaveTuple(MinimalTuple tuple, uint32 hashvalue, |
1415 | | BufFile **fileptr, HashJoinTable hashtable) |
1416 | 0 | { |
1417 | 0 | BufFile *file = *fileptr; |
1418 | | |
1419 | | /* |
1420 | | * The batch file is lazily created. If this is the first tuple written to |
1421 | | * this batch, the batch file is created and its buffer is allocated in |
1422 | | * the spillCxt context, NOT in the batchCxt. |
1423 | | * |
1424 | | * During the build phase, buffered files are created for inner batches. |
1425 | | * Each batch's buffered file is closed (and its buffer freed) after the |
1426 | | * batch is loaded into memory during the outer side scan. Therefore, it |
1427 | | * is necessary to allocate the batch file buffer in a memory context |
1428 | | * which outlives the batch itself. |
1429 | | * |
1430 | | * Also, we use spillCxt instead of hashCxt so that the memory consumed by |
1431 | | * spill-file buffers is accounted for more accurately. |
1432 | | */ |
1433 | 0 | if (file == NULL) |
1434 | 0 | { |
1435 | 0 | MemoryContext oldctx = MemoryContextSwitchTo(hashtable->spillCxt); |
1436 | |
1437 | 0 | file = BufFileCreateTemp(false); |
1438 | 0 | *fileptr = file; |
1439 | |
1440 | 0 | MemoryContextSwitchTo(oldctx); |
1441 | 0 | } |
1442 | |
1443 | 0 | BufFileWrite(file, &hashvalue, sizeof(uint32)); |
1444 | 0 | BufFileWrite(file, tuple, tuple->t_len); |
1445 | 0 | } |
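/*
 * Editor's note (not part of the source): each record that
 * ExecHashJoinSaveTuple() appends to a batch file therefore has the layout
 * below.  Because a MinimalTuple begins with its own uint32 t_len word, the
 * hash value and the tuple length form two adjacent uint32s at the start of
 * the record, which is what lets the reader below fetch both with a single
 * BufFileReadMaybeEOF() call.
 *
 *   +--------------------+------------------------------------------+
 *   | uint32 hash value  | MinimalTuple, t_len bytes in total,      |
 *   |                    | starting with its uint32 t_len word      |
 *   +--------------------+------------------------------------------+
 */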
1446 | | |
1447 | | /* |
1448 | | * ExecHashJoinGetSavedTuple |
1449 | | * read the next tuple from a batch file. Return NULL if no more. |
1450 | | * |
1451 | | * On success, *hashvalue is set to the tuple's hash value, and the tuple |
1452 | | * itself is stored in the given slot. |
1453 | | */ |
1454 | | static TupleTableSlot * |
1455 | | ExecHashJoinGetSavedTuple(HashJoinState *hjstate, |
1456 | | BufFile *file, |
1457 | | uint32 *hashvalue, |
1458 | | TupleTableSlot *tupleSlot) |
1459 | 0 | { |
1460 | 0 | uint32 header[2]; |
1461 | 0 | size_t nread; |
1462 | 0 | MinimalTuple tuple; |
1463 | | |
1464 | | /* |
1465 | | * We check for interrupts here because this is typically taken as an |
1466 | | * alternative code path to an ExecProcNode() call, which would include |
1467 | | * such a check. |
1468 | | */ |
1469 | 0 | CHECK_FOR_INTERRUPTS(); |
1470 | | |
1471 | | /* |
1472 | | * Since both the hash value and the MinimalTuple length word are uint32, |
1473 | | * we can read them both in one BufFileRead() call without any type |
1474 | | * cheating. |
1475 | | */ |
1476 | 0 | nread = BufFileReadMaybeEOF(file, header, sizeof(header), true); |
1477 | 0 | if (nread == 0) /* end of file */ |
1478 | 0 | { |
1479 | 0 | ExecClearTuple(tupleSlot); |
1480 | 0 | return NULL; |
1481 | 0 | } |
1482 | 0 | *hashvalue = header[0]; |
1483 | 0 | tuple = (MinimalTuple) palloc(header[1]); |
1484 | 0 | tuple->t_len = header[1]; |
1485 | 0 | BufFileReadExact(file, |
1486 | 0 | (char *) tuple + sizeof(uint32), |
1487 | 0 | header[1] - sizeof(uint32)); |
1488 | 0 | ExecForceStoreMinimalTuple(tuple, tupleSlot, true); |
1489 | 0 | return tupleSlot; |
1490 | 0 | } |
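/*
 * Editor's sketch (not part of the source): a caller typically drains a
 * batch file by calling ExecHashJoinGetSavedTuple() until it returns NULL,
 * along these lines (the per-tuple processing is elided):
 */
#ifdef HASHJOIN_EDITOR_SKETCH
static void
drain_saved_tuples(HashJoinState *hjstate, BufFile *file, TupleTableSlot *slot)
{
	uint32		hashvalue;

	while (ExecHashJoinGetSavedTuple(hjstate, file, &hashvalue, slot) != NULL)
	{
		/* ... use "slot" and "hashvalue" here ... */
	}
}
#endif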
1491 | | |
1492 | | |
1493 | | void |
1494 | | ExecReScanHashJoin(HashJoinState *node) |
1495 | 0 | { |
1496 | 0 | PlanState *outerPlan = outerPlanState(node); |
1497 | 0 | PlanState *innerPlan = innerPlanState(node); |
1498 | | |
1499 | | /* |
1500 | | * In a multi-batch join, we currently have to do rescans the hard way, |
1501 | | * primarily because batch temp files may have already been released. But |
1502 | | * if it's a single-batch join, and there is no parameter change for the |
1503 | | * inner subnode, then we can just re-use the existing hash table without |
1504 | | * rebuilding it. |
1505 | | */ |
1506 | 0 | if (node->hj_HashTable != NULL) |
1507 | 0 | { |
1508 | 0 | if (node->hj_HashTable->nbatch == 1 && |
1509 | 0 | innerPlan->chgParam == NULL) |
1510 | 0 | { |
1511 | | /* |
1512 | | * Okay to reuse the hash table; needn't rescan inner, either. |
1513 | | * |
1514 | | * However, if it's a right/right-anti/right-semi/full join, we'd |
1515 | | * better reset the inner-tuple match flags contained in the |
1516 | | * table. |
1517 | | */ |
1518 | 0 | if (HJ_FILL_INNER(node) || node->js.jointype == JOIN_RIGHT_SEMI) |
1519 | 0 | ExecHashTableResetMatchFlags(node->hj_HashTable); |
1520 | | |
1521 | | /* |
1522 | | * Also, we need to reset our state about the emptiness of the |
1523 | | * outer relation, so that the new scan of the outer will update |
1524 | | * it correctly if it turns out to be empty this time. (There's no |
1525 | | * harm in clearing it now because ExecHashJoin won't need the |
1526 | | * info. In the other cases, where the hash table doesn't exist |
1527 | | * or we are destroying it, we leave this state alone because |
1528 | | * ExecHashJoin will need it the first time through.) |
1529 | | */ |
1530 | 0 | node->hj_OuterNotEmpty = false; |
1531 | | |
1532 | | /* ExecHashJoin can skip the BUILD_HASHTABLE step */ |
1533 | 0 | node->hj_JoinState = HJ_NEED_NEW_OUTER; |
1534 | 0 | } |
1535 | 0 | else |
1536 | 0 | { |
1537 | | /* must destroy and rebuild hash table */ |
1538 | 0 | HashState *hashNode = castNode(HashState, innerPlan); |
1539 | |
1540 | 0 | Assert(hashNode->hashtable == node->hj_HashTable); |
1541 | | /* accumulate stats from old hash table, if wanted */ |
1542 | | /* (this should match ExecShutdownHash) */ |
1543 | 0 | if (hashNode->ps.instrument && !hashNode->hinstrument) |
1544 | 0 | hashNode->hinstrument = (HashInstrumentation *) |
1545 | 0 | palloc0(sizeof(HashInstrumentation)); |
1546 | 0 | if (hashNode->hinstrument) |
1547 | 0 | ExecHashAccumInstrumentation(hashNode->hinstrument, |
1548 | 0 | hashNode->hashtable); |
1549 | | /* for safety, be sure to clear child plan node's pointer too */ |
1550 | 0 | hashNode->hashtable = NULL; |
1551 | |
1552 | 0 | ExecHashTableDestroy(node->hj_HashTable); |
1553 | 0 | node->hj_HashTable = NULL; |
1554 | 0 | node->hj_JoinState = HJ_BUILD_HASHTABLE; |
1555 | | |
1556 | | /* |
1557 | | * If chgParam of the subnode is non-NULL, the plan will be re-scanned |
1558 | | * by the first ExecProcNode call, so there is no need to do it here. |
1559 | | */ |
1560 | 0 | if (innerPlan->chgParam == NULL) |
1561 | 0 | ExecReScan(innerPlan); |
1562 | 0 | } |
1563 | 0 | } |
1564 | | |
1565 | | /* Always reset intra-tuple state */ |
1566 | 0 | node->hj_CurHashValue = 0; |
1567 | 0 | node->hj_CurBucketNo = 0; |
1568 | 0 | node->hj_CurSkewBucketNo = INVALID_SKEW_BUCKET_NO; |
1569 | 0 | node->hj_CurTuple = NULL; |
1570 | |
1571 | 0 | node->hj_MatchedOuter = false; |
1572 | 0 | node->hj_FirstOuterTupleSlot = NULL; |
1573 | | |
1574 | | /* |
1575 | | * If chgParam of the subnode is non-NULL, the plan will be re-scanned |
1576 | | * by the first ExecProcNode call, so there is no need to do it here. |
1577 | | */ |
1578 | 0 | if (outerPlan->chgParam == NULL) |
1579 | 0 | ExecReScan(outerPlan); |
1580 | 0 | } |
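/*
 * Editor's sketch (not part of the source): the reuse test applied above can
 * be summarized as a single predicate; the hash table survives a rescan only
 * when both conditions hold.
 */
#ifdef HASHJOIN_EDITOR_SKETCH
static inline bool
hashtable_reusable_on_rescan(HashJoinState *node)
{
	return node->hj_HashTable != NULL &&
		node->hj_HashTable->nbatch == 1 &&
		innerPlanState(node)->chgParam == NULL;
}
#endif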
1581 | | |
1582 | | void |
1583 | | ExecShutdownHashJoin(HashJoinState *node) |
1584 | 0 | { |
1585 | 0 | if (node->hj_HashTable) |
1586 | 0 | { |
1587 | | /* |
1588 | | * Detach from shared state before DSM memory goes away. This makes |
1589 | | * sure that we don't have any pointers into DSM memory by the time |
1590 | | * ExecEndHashJoin runs. |
1591 | | */ |
1592 | 0 | ExecHashTableDetachBatch(node->hj_HashTable); |
1593 | 0 | ExecHashTableDetach(node->hj_HashTable); |
1594 | 0 | } |
1595 | 0 | } |
1596 | | |
1597 | | static void |
1598 | | ExecParallelHashJoinPartitionOuter(HashJoinState *hjstate) |
1599 | 0 | { |
1600 | 0 | PlanState *outerState = outerPlanState(hjstate); |
1601 | 0 | ExprContext *econtext = hjstate->js.ps.ps_ExprContext; |
1602 | 0 | HashJoinTable hashtable = hjstate->hj_HashTable; |
1603 | 0 | TupleTableSlot *slot; |
1604 | 0 | uint32 hashvalue; |
1605 | 0 | int i; |
1606 | |
1607 | 0 | Assert(hjstate->hj_FirstOuterTupleSlot == NULL); |
1608 | | |
1609 | | /* Execute outer plan, writing all tuples to shared tuplestores. */ |
1610 | 0 | for (;;) |
1611 | 0 | { |
1612 | 0 | bool isnull; |
1613 | |
1614 | 0 | slot = ExecProcNode(outerState); |
1615 | 0 | if (TupIsNull(slot)) |
1616 | 0 | break; |
1617 | 0 | econtext->ecxt_outertuple = slot; |
1618 | |
1619 | 0 | ResetExprContext(econtext); |
1620 | |
1621 | 0 | hashvalue = DatumGetUInt32(ExecEvalExprSwitchContext(hjstate->hj_OuterHash, |
1622 | 0 | econtext, |
1623 | 0 | &isnull)); |
1624 | |
1625 | 0 | if (!isnull) |
1626 | 0 | { |
1627 | 0 | int batchno; |
1628 | 0 | int bucketno; |
1629 | 0 | bool shouldFree; |
1630 | 0 | MinimalTuple mintup = ExecFetchSlotMinimalTuple(slot, &shouldFree); |
1631 | |
1632 | 0 | ExecHashGetBucketAndBatch(hashtable, hashvalue, &bucketno, |
1633 | 0 | &batchno); |
1634 | 0 | sts_puttuple(hashtable->batches[batchno].outer_tuples, |
1635 | 0 | &hashvalue, mintup); |
1636 | |
1637 | 0 | if (shouldFree) |
1638 | 0 | heap_free_minimal_tuple(mintup); |
1639 | 0 | } |
1640 | 0 | CHECK_FOR_INTERRUPTS(); |
1641 | 0 | } |
1642 | | |
1643 | | /* Make sure all outer partitions are readable by any backend. */ |
1644 | 0 | for (i = 0; i < hashtable->nbatch; ++i) |
1645 | 0 | sts_end_write(hashtable->batches[i].outer_tuples); |
1646 | 0 | } |
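/*
 * Editor's sketch (not the actual implementation): ExecHashGetBucketAndBatch()
 * lives in nodeHash.c and derives both numbers from the single 32-bit hash
 * value.  Since nbuckets and nbatch are kept at powers of two, a simplified
 * version could take the bucket from the low-order bits and the batch from
 * higher-order bits, for example:
 */
#ifdef HASHJOIN_EDITOR_SKETCH
static void
sketch_bucket_and_batch(uint32 hashvalue,
						uint32 nbuckets, int log2_nbuckets, uint32 nbatch,
						int *bucketno, int *batchno)
{
	*bucketno = (int) (hashvalue & (nbuckets - 1));
	*batchno = (nbatch > 1) ?
		(int) ((hashvalue >> log2_nbuckets) & (nbatch - 1)) : 0;
}
#endif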
1647 | | |
1648 | | void |
1649 | | ExecHashJoinEstimate(HashJoinState *state, ParallelContext *pcxt) |
1650 | 0 | { |
1651 | 0 | shm_toc_estimate_chunk(&pcxt->estimator, sizeof(ParallelHashJoinState)); |
1652 | 0 | shm_toc_estimate_keys(&pcxt->estimator, 1); |
1653 | 0 | } |
1654 | | |
1655 | | void |
1656 | | ExecHashJoinInitializeDSM(HashJoinState *state, ParallelContext *pcxt) |
1657 | 0 | { |
1658 | 0 | int plan_node_id = state->js.ps.plan->plan_node_id; |
1659 | 0 | HashState *hashNode; |
1660 | 0 | ParallelHashJoinState *pstate; |
1661 | | |
1662 | | /* |
1663 | | * Disable shared hash table mode if we failed to create a real DSM |
1664 | | * segment, because that means that we don't have a DSA area to work with. |
1665 | | */ |
1666 | 0 | if (pcxt->seg == NULL) |
1667 | 0 | return; |
1668 | | |
1669 | 0 | ExecSetExecProcNode(&state->js.ps, ExecParallelHashJoin); |
1670 | | |
1671 | | /* |
1672 | | * Set up the state needed to coordinate access to the shared hash |
1673 | | * table(s), using the plan node ID as the toc key. |
1674 | | */ |
1675 | 0 | pstate = shm_toc_allocate(pcxt->toc, sizeof(ParallelHashJoinState)); |
1676 | 0 | shm_toc_insert(pcxt->toc, plan_node_id, pstate); |
1677 | | |
1678 | | /* |
1679 | | * Set up the shared hash join state with no batches initially. |
1680 | | * ExecHashTableCreate() will prepare at least one later and set nbatch |
1681 | | * and space_allowed. |
1682 | | */ |
1683 | 0 | pstate->nbatch = 0; |
1684 | 0 | pstate->space_allowed = 0; |
1685 | 0 | pstate->batches = InvalidDsaPointer; |
1686 | 0 | pstate->old_batches = InvalidDsaPointer; |
1687 | 0 | pstate->nbuckets = 0; |
1688 | 0 | pstate->growth = PHJ_GROWTH_OK; |
1689 | 0 | pstate->chunk_work_queue = InvalidDsaPointer; |
1690 | 0 | pg_atomic_init_u32(&pstate->distributor, 0); |
1691 | 0 | pstate->nparticipants = pcxt->nworkers + 1; |
1692 | 0 | pstate->total_tuples = 0; |
1693 | 0 | LWLockInitialize(&pstate->lock, |
1694 | 0 | LWTRANCHE_PARALLEL_HASH_JOIN); |
1695 | 0 | BarrierInit(&pstate->build_barrier, 0); |
1696 | 0 | BarrierInit(&pstate->grow_batches_barrier, 0); |
1697 | 0 | BarrierInit(&pstate->grow_buckets_barrier, 0); |
1698 | | |
1699 | | /* Set up the space we'll use for shared temporary files. */ |
1700 | 0 | SharedFileSetInit(&pstate->fileset, pcxt->seg); |
1701 | | |
1702 | | /* Initialize the shared state in the hash node. */ |
1703 | 0 | hashNode = (HashState *) innerPlanState(state); |
1704 | 0 | hashNode->parallel_state = pstate; |
1705 | 0 | } |
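/*
 * Editor's note (not part of the source): ExecHashJoinEstimate(),
 * ExecHashJoinInitializeDSM() and ExecHashJoinInitializeWorker() follow the
 * usual shm_toc hand-off pattern, keyed by the plan node ID:
 *
 *   leader, sizing shmem:   shm_toc_estimate_chunk() + shm_toc_estimate_keys()
 *   leader, initializing:   pstate = shm_toc_allocate(toc, sizeof(...));
 *                           shm_toc_insert(toc, plan_node_id, pstate);
 *   worker, starting up:    pstate = shm_toc_lookup(toc, plan_node_id, false);
 *
 * The estimate has to reserve at least as much space as is later allocated,
 * otherwise shm_toc_allocate() would run out of space at executor startup.
 */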
1706 | | |
1707 | | /* ---------------------------------------------------------------- |
1708 | | * ExecHashJoinReInitializeDSM |
1709 | | * |
1710 | | * Reset shared state before beginning a fresh scan. |
1711 | | * ---------------------------------------------------------------- |
1712 | | */ |
1713 | | void |
1714 | | ExecHashJoinReInitializeDSM(HashJoinState *state, ParallelContext *pcxt) |
1715 | 0 | { |
1716 | 0 | int plan_node_id = state->js.ps.plan->plan_node_id; |
1717 | 0 | ParallelHashJoinState *pstate; |
1718 | | |
1719 | | /* Nothing to do if we failed to create a DSM segment. */ |
1720 | 0 | if (pcxt->seg == NULL) |
1721 | 0 | return; |
1722 | | |
1723 | 0 | pstate = shm_toc_lookup(pcxt->toc, plan_node_id, false); |
1724 | | |
1725 | | /* |
1726 | | * It would be possible to reuse the shared hash table in single-batch |
1727 | | * cases by resetting and then fast-forwarding build_barrier to |
1728 | | * PHJ_BUILD_FREE and batch 0's batch_barrier to PHJ_BATCH_PROBE, but |
1729 | | * currently shared hash tables are already freed by now (by the last |
1730 | | * participant to detach from the batch). We could consider keeping it |
1731 | | * around for single-batch joins. We'd also need to adjust |
1732 | | * finalize_plan() so that it doesn't record a dummy dependency for |
1733 | | * Parallel Hash nodes, since that dependency currently prevents the |
1734 | | * rescan optimization. For now we don't try. |
1735 | | */ |
1736 | | |
1737 | | /* Detach, freeing any remaining shared memory. */ |
1738 | 0 | if (state->hj_HashTable != NULL) |
1739 | 0 | { |
1740 | 0 | ExecHashTableDetachBatch(state->hj_HashTable); |
1741 | 0 | ExecHashTableDetach(state->hj_HashTable); |
1742 | 0 | } |
1743 | | |
1744 | | /* Clear any shared batch files. */ |
1745 | 0 | SharedFileSetDeleteAll(&pstate->fileset); |
1746 | | |
1747 | | /* Reset build_barrier to PHJ_BUILD_ELECT so we can go around again. */ |
1748 | 0 | BarrierInit(&pstate->build_barrier, 0); |
1749 | 0 | } |
1750 | | |
1751 | | void |
1752 | | ExecHashJoinInitializeWorker(HashJoinState *state, |
1753 | | ParallelWorkerContext *pwcxt) |
1754 | 0 | { |
1755 | 0 | HashState *hashNode; |
1756 | 0 | int plan_node_id = state->js.ps.plan->plan_node_id; |
1757 | 0 | ParallelHashJoinState *pstate = |
1758 | 0 | shm_toc_lookup(pwcxt->toc, plan_node_id, false); |
1759 | | |
1760 | | /* Attach to the space for shared temporary files. */ |
1761 | 0 | SharedFileSetAttach(&pstate->fileset, pwcxt->seg); |
1762 | | |
1763 | | /* Attach to the shared state in the hash node. */ |
1764 | 0 | hashNode = (HashState *) innerPlanState(state); |
1765 | 0 | hashNode->parallel_state = pstate; |
1766 | |
1767 | 0 | ExecSetExecProcNode(&state->js.ps, ExecParallelHashJoin); |
1768 | 0 | } |