/src/postgres/src/backend/executor/nodeMemoize.c
Line | Count | Source |
1 | | /*------------------------------------------------------------------------- |
2 | | * |
3 | | * nodeMemoize.c |
4 | | * Routines to handle caching of results from parameterized nodes |
5 | | * |
6 | | * Portions Copyright (c) 2021-2025, PostgreSQL Global Development Group |
7 | | * Portions Copyright (c) 1994, Regents of the University of California |
8 | | * |
9 | | * |
10 | | * IDENTIFICATION |
11 | | * src/backend/executor/nodeMemoize.c |
12 | | * |
13 | | * Memoize nodes sit above parameterized nodes in the plan tree in order to |
14 | | * cache their results. The idea is that a repeat scan with a parameter |
15 | | * value that has already been seen by the node |
16 | | * can fetch tuples from the cache rather than having to re-scan the inner |
17 | | * node all over again. The query planner may choose to make use of one of |
18 | | * these when it thinks rescans for previously seen values are likely enough |
19 | | * to warrant adding the additional node. |
20 | | * |
21 | | * The cache takes the form of a hash table. When the cache fills, we never |
22 | | * spill tuples to disk; instead, we evict the least recently used cache |
23 | | * entry. We track recency by pushing new entries, and entries we look up, |
24 | | * onto the tail of a doubly linked list. This means that the least |
25 | | * recently used items gradually migrate towards the head of this LRU |
26 | | * list. |
27 | | * |
28 | | * Sometimes our callers won't run their scans to completion. For example, a |
29 | | * semi-join only needs to run until it finds a matching tuple, and once it |
30 | | * does, the join operator skips to the next outer tuple and does not execute |
31 | | * the inner side again on that scan. Because of this, we must keep track of |
32 | | * when a cache entry is complete, and by default, we know it is when we run |
33 | | * out of tuples to read during the scan. However, there are cases where we |
34 | | * can mark the cache entry as complete without exhausting the scan of all |
35 | | * tuples. One case is unique joins, where the join operator knows that there |
36 | | * will only be at most one match for any given outer tuple. In order to |
37 | | * support such cases we allow the "singlerow" option to be set for the cache. |
38 | | * This option marks the cache entry as complete after we read the first tuple |
39 | | * from the subnode. |
40 | | * |
41 | | * It's possible when we're filling the cache for a given set of parameters |
42 | | * that we're unable to free enough memory to store any more tuples. If this |
43 | | * happens then we'll have already evicted all other cache entries. When |
44 | | * caching another tuple would cause us to exceed our memory budget, we must |
45 | | * free the entry that we're currently populating and move the state machine |
46 | | * into MEMO_CACHE_BYPASS_MODE. This means that we'll not attempt to cache |
47 | | * any further tuples for this particular scan. We don't have the memory for |
48 | | * it. The state machine will be reset again on the next rescan. If the |
49 | | * memory requirements to cache the next parameter's tuples are less |
50 | | * demanding, then that may allow us to start putting useful entries back into |
51 | | * the cache again. |
52 | | * |
53 | | * |
54 | | * INTERFACE ROUTINES |
55 | | * ExecMemoize - lookup cache, exec subplan when not found |
56 | | * ExecInitMemoize - initialize node and subnodes |
57 | | * ExecEndMemoize - shutdown node and subnodes |
58 | | * ExecReScanMemoize - rescan the memoize node |
59 | | * |
60 | | * ExecMemoizeEstimate estimates DSM space needed for parallel plan |
61 | | * ExecMemoizeInitializeDSM initialize DSM for parallel plan |
62 | | * ExecMemoizeInitializeWorker attach to DSM info in parallel worker |
63 | | * ExecMemoizeRetrieveInstrumentation get instrumentation from worker |
64 | | *------------------------------------------------------------------------- |
65 | | */ |
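The LRU discipline described in the header is worth seeing in isolation. Below is a minimal standalone C sketch of the same idea: entries are pushed to the tail of a doubly linked list whenever they are created or looked up, and eviction walks from the head, so the least recently used entries go first. It is illustration only: the list, the helper names (lru_move_to_tail, lru_evict_down_to) and the integer keys are invented here, while the real Memoize code uses the dlist API from lib/ilist.h together with the hash table defined further down.

#include <stdio.h>
#include <stdlib.h>

typedef struct LruEntry
{
    int         key;
    struct LruEntry *prev;
    struct LruEntry *next;
} LruEntry;

typedef struct LruList
{
    LruEntry   *head;           /* least recently used end */
    LruEntry   *tail;           /* most recently used end */
} LruList;

/* Detach 'e' from the list if it is currently linked. */
static void
lru_unlink(LruList *list, LruEntry *e)
{
    if (e->prev)
        e->prev->next = e->next;
    else if (list->head == e)
        list->head = e->next;

    if (e->next)
        e->next->prev = e->prev;
    else if (list->tail == e)
        list->tail = e->prev;

    e->prev = e->next = NULL;
}

/* Push 'e' onto the tail, marking it as most recently used. */
static void
lru_move_to_tail(LruList *list, LruEntry *e)
{
    lru_unlink(list, e);
    e->prev = list->tail;
    if (list->tail)
        list->tail->next = e;
    list->tail = e;
    if (list->head == NULL)
        list->head = e;
}

/* Evict from the head (oldest first) until only 'target' entries remain. */
static void
lru_evict_down_to(LruList *list, int *nentries, int target)
{
    while (*nentries > target && list->head != NULL)
    {
        LruEntry   *victim = list->head;

        lru_unlink(list, victim);
        printf("evicting key %d\n", victim->key);
        free(victim);
        (*nentries)--;
    }
}

int
main(void)
{
    LruList     list = {NULL, NULL};
    int         nentries = 0;

    for (int k = 1; k <= 4; k++)
    {
        LruEntry   *e = calloc(1, sizeof(LruEntry));

        e->key = k;
        lru_move_to_tail(&list, e);
        nentries++;
    }

    /* a cache hit on key 1 makes it the most recently used entry */
    lru_move_to_tail(&list, list.head);

    /* over budget: keys 2 and 3 are evicted first, key 1 survives */
    lru_evict_down_to(&list, &nentries, 2);
    return 0;
}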
66 | | |
67 | | #include "postgres.h" |
68 | | |
69 | | #include "common/hashfn.h" |
70 | | #include "executor/executor.h" |
71 | | #include "executor/nodeMemoize.h" |
72 | | #include "lib/ilist.h" |
73 | | #include "miscadmin.h" |
74 | | #include "utils/datum.h" |
75 | | #include "utils/lsyscache.h" |
76 | | |
77 | | /* States of the ExecMemoize state machine */ |
78 | 0 | #define MEMO_CACHE_LOOKUP 1 /* Attempt to perform a cache lookup */ |
79 | 0 | #define MEMO_CACHE_FETCH_NEXT_TUPLE 2 /* Get another tuple from the cache */ |
80 | 0 | #define MEMO_FILLING_CACHE 3 /* Read outer node to fill cache */ |
81 | 0 | #define MEMO_CACHE_BYPASS_MODE 4 /* Bypass mode. Just read from our |
82 | | * subplan without caching anything */ |
83 | 0 | #define MEMO_END_OF_SCAN 5 /* Ready for rescan */ |
84 | | |
85 | | |
86 | | /* Helper macros for memory accounting */ |
87 | 0 | #define EMPTY_ENTRY_MEMORY_BYTES(e) (sizeof(MemoizeEntry) + \ |
88 | 0 | sizeof(MemoizeKey) + \ |
89 | 0 | (e)->key->params->t_len) |
90 | 0 | #define CACHE_TUPLE_BYTES(t) (sizeof(MemoizeTuple) + \ |
91 | 0 | (t)->mintuple->t_len) |
92 | | |
93 | | /* MemoizeTuple: stores an individually cached tuple */ |
94 | | typedef struct MemoizeTuple |
95 | | { |
96 | | MinimalTuple mintuple; /* Cached tuple */ |
97 | | struct MemoizeTuple *next; /* The next tuple with the same parameter |
98 | | * values or NULL if it's the last one */ |
99 | | } MemoizeTuple; |
100 | | |
101 | | /* |
102 | | * MemoizeKey |
103 | | * The hash table key for cached entries plus the LRU list link |
104 | | */ |
105 | | typedef struct MemoizeKey |
106 | | { |
107 | | MinimalTuple params; |
108 | | dlist_node lru_node; /* Pointer to next/prev key in LRU list */ |
109 | | } MemoizeKey; |
110 | | |
111 | | /* |
112 | | * MemoizeEntry |
113 | | * The data struct that the cache hash table stores |
114 | | */ |
115 | | typedef struct MemoizeEntry |
116 | | { |
117 | | MemoizeKey *key; /* Hash key for hash table lookups */ |
118 | | MemoizeTuple *tuplehead; /* Pointer to the first tuple or NULL if no |
119 | | * tuples are cached for this entry */ |
120 | | uint32 hash; /* Hash value (cached) */ |
121 | | char status; /* Hash status */ |
122 | | bool complete; /* Did we read the outer plan to completion? */ |
123 | | } MemoizeEntry; |
124 | | |
125 | | |
126 | | #define SH_PREFIX memoize |
127 | | #define SH_ELEMENT_TYPE MemoizeEntry |
128 | | #define SH_KEY_TYPE MemoizeKey * |
129 | | #define SH_SCOPE static inline |
130 | | #define SH_DECLARE |
131 | | #include "lib/simplehash.h" |
132 | | |
133 | | static uint32 MemoizeHash_hash(struct memoize_hash *tb, |
134 | | const MemoizeKey *key); |
135 | | static bool MemoizeHash_equal(struct memoize_hash *tb, |
136 | | const MemoizeKey *key1, |
137 | | const MemoizeKey *key2); |
138 | | |
139 | | #define SH_PREFIX memoize |
140 | 0 | #define SH_ELEMENT_TYPE MemoizeEntry |
141 | | #define SH_KEY_TYPE MemoizeKey * |
142 | 0 | #define SH_KEY key |
143 | 0 | #define SH_HASH_KEY(tb, key) MemoizeHash_hash(tb, key) |
144 | 0 | #define SH_EQUAL(tb, a, b) MemoizeHash_equal(tb, a, b) |
145 | | #define SH_SCOPE static inline |
146 | | #define SH_STORE_HASH |
147 | 0 | #define SH_GET_HASH(tb, a) a->hash |
148 | | #define SH_DEFINE |
149 | | #include "lib/simplehash.h" |
150 | | |
151 | | /* |
152 | | * MemoizeHash_hash |
153 | | * Hash function for simplehash hashtable. 'key' is unused here as we |
154 | | * require that all table lookups first populate the MemoizeState's |
155 | | * probeslot with the key values to be looked up. |
156 | | */ |
157 | | static uint32 |
158 | | MemoizeHash_hash(struct memoize_hash *tb, const MemoizeKey *key) |
159 | 0 | { |
160 | 0 | MemoizeState *mstate = (MemoizeState *) tb->private_data; |
161 | 0 | ExprContext *econtext = mstate->ss.ps.ps_ExprContext; |
162 | 0 | MemoryContext oldcontext; |
163 | 0 | TupleTableSlot *pslot = mstate->probeslot; |
164 | 0 | uint32 hashkey = 0; |
165 | 0 | int numkeys = mstate->nkeys; |
166 | |
167 | 0 | oldcontext = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory); |
168 | |
169 | 0 | if (mstate->binary_mode) |
170 | 0 | { |
171 | 0 | for (int i = 0; i < numkeys; i++) |
172 | 0 | { |
173 | | /* combine successive hashkeys by rotating */ |
174 | 0 | hashkey = pg_rotate_left32(hashkey, 1); |
175 | |
176 | 0 | if (!pslot->tts_isnull[i]) /* treat nulls as having hash key 0 */ |
177 | 0 | { |
178 | 0 | CompactAttribute *attr; |
179 | 0 | uint32 hkey; |
180 | |
181 | 0 | attr = TupleDescCompactAttr(pslot->tts_tupleDescriptor, i); |
182 | |
183 | 0 | hkey = datum_image_hash(pslot->tts_values[i], attr->attbyval, attr->attlen); |
184 | |
185 | 0 | hashkey ^= hkey; |
186 | 0 | } |
187 | 0 | } |
188 | 0 | } |
189 | 0 | else |
190 | 0 | { |
191 | 0 | FmgrInfo *hashfunctions = mstate->hashfunctions; |
192 | 0 | Oid *collations = mstate->collations; |
193 | |
194 | 0 | for (int i = 0; i < numkeys; i++) |
195 | 0 | { |
196 | | /* combine successive hashkeys by rotating */ |
197 | 0 | hashkey = pg_rotate_left32(hashkey, 1); |
198 | |
199 | 0 | if (!pslot->tts_isnull[i]) /* treat nulls as having hash key 0 */ |
200 | 0 | { |
201 | 0 | uint32 hkey; |
202 | |
203 | 0 | hkey = DatumGetUInt32(FunctionCall1Coll(&hashfunctions[i], |
204 | 0 | collations[i], pslot->tts_values[i])); |
205 | 0 | hashkey ^= hkey; |
206 | 0 | } |
207 | 0 | } |
208 | 0 | } |
209 | |
210 | 0 | MemoryContextSwitchTo(oldcontext); |
211 | 0 | return murmurhash32(hashkey); |
212 | 0 | } |
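As a reference for the combining scheme above, here is a standalone C sketch of the same per-key step: rotate the running hash left by one bit for every key, XOR in the key's hash unless the key is NULL, then finalize once at the end. The names local_rotl32, local_fmix32 and combine_key_hashes are invented for this sketch; local_rotl32 stands in for pg_rotate_left32(), the per-key hash values stand in for the datatype hash functions, and local_fmix32 applies the standard 32-bit Murmur3 finalizer steps in place of murmurhash32().

#include <stdint.h>
#include <stdio.h>

/* stand-in for pg_rotate_left32() */
static inline uint32_t
local_rotl32(uint32_t x, int n)
{
    return (x << n) | (x >> (32 - n));
}

/* standard 32-bit Murmur3 finalizer, used here in place of murmurhash32() */
static inline uint32_t
local_fmix32(uint32_t h)
{
    h ^= h >> 16;
    h *= 0x85ebca6b;
    h ^= h >> 13;
    h *= 0xc2b2ae35;
    h ^= h >> 16;
    return h;
}

/*
 * Combine per-key hashes the way MemoizeHash_hash() does: rotate the running
 * value left one bit for each key, XOR in the key's hash when it is not NULL
 * (NULLs contribute nothing beyond the rotation), and finalize at the end.
 */
static uint32_t
combine_key_hashes(const uint32_t *keyhashes, const int *isnull, int nkeys)
{
    uint32_t    hashkey = 0;

    for (int i = 0; i < nkeys; i++)
    {
        hashkey = local_rotl32(hashkey, 1);
        if (!isnull[i])
            hashkey ^= keyhashes[i];
    }
    return local_fmix32(hashkey);
}

int
main(void)
{
    /* hypothetical per-key hash values; the second key is NULL */
    uint32_t    keyhashes[] = {0xdeadbeefU, 0, 0x12345678U};
    int         isnull[] = {0, 1, 0};

    printf("combined hash: %08x\n",
           (unsigned) combine_key_hashes(keyhashes, isnull, 3));
    return 0;
}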
213 | | |
214 | | /* |
215 | | * MemoizeHash_equal |
216 | | * Equality function for confirming hash value matches during a hash |
217 | | * table lookup. 'key2' is never used. Instead the MemoizeState's |
218 | | * probeslot is always populated with details of what's being looked up. |
219 | | */ |
220 | | static bool |
221 | | MemoizeHash_equal(struct memoize_hash *tb, const MemoizeKey *key1, |
222 | | const MemoizeKey *key2) |
223 | 0 | { |
224 | 0 | MemoizeState *mstate = (MemoizeState *) tb->private_data; |
225 | 0 | ExprContext *econtext = mstate->ss.ps.ps_ExprContext; |
226 | 0 | TupleTableSlot *tslot = mstate->tableslot; |
227 | 0 | TupleTableSlot *pslot = mstate->probeslot; |
228 | | |
229 | | /* probeslot should have already been prepared by prepare_probe_slot() */ |
230 | 0 | ExecStoreMinimalTuple(key1->params, tslot, false); |
231 | |
232 | 0 | if (mstate->binary_mode) |
233 | 0 | { |
234 | 0 | MemoryContext oldcontext; |
235 | 0 | int numkeys = mstate->nkeys; |
236 | 0 | bool match = true; |
237 | |
238 | 0 | oldcontext = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory); |
239 | |
240 | 0 | slot_getallattrs(tslot); |
241 | 0 | slot_getallattrs(pslot); |
242 | |
243 | 0 | for (int i = 0; i < numkeys; i++) |
244 | 0 | { |
245 | 0 | CompactAttribute *attr; |
246 | |
247 | 0 | if (tslot->tts_isnull[i] != pslot->tts_isnull[i]) |
248 | 0 | { |
249 | 0 | match = false; |
250 | 0 | break; |
251 | 0 | } |
252 | | |
253 | | /* both NULL? they're equal */ |
254 | 0 | if (tslot->tts_isnull[i]) |
255 | 0 | continue; |
256 | | |
257 | | /* perform binary comparison on the two datums */ |
258 | 0 | attr = TupleDescCompactAttr(tslot->tts_tupleDescriptor, i); |
259 | 0 | if (!datum_image_eq(tslot->tts_values[i], pslot->tts_values[i], |
260 | 0 | attr->attbyval, attr->attlen)) |
261 | 0 | { |
262 | 0 | match = false; |
263 | 0 | break; |
264 | 0 | } |
265 | 0 | } |
266 | |
267 | 0 | MemoryContextSwitchTo(oldcontext); |
268 | 0 | return match; |
269 | 0 | } |
270 | 0 | else |
271 | 0 | { |
272 | 0 | econtext->ecxt_innertuple = tslot; |
273 | 0 | econtext->ecxt_outertuple = pslot; |
274 | 0 | return ExecQual(mstate->cache_eq_expr, econtext); |
275 | 0 | } |
276 | 0 | } |
277 | | |
278 | | /* |
279 | | * Initialize the hash table to empty. The MemoizeState's hashtable field |
280 | | * must point to NULL. |
281 | | */ |
282 | | static void |
283 | | build_hash_table(MemoizeState *mstate, uint32 size) |
284 | 0 | { |
285 | 0 | Assert(mstate->hashtable == NULL); |
286 | | |
287 | | /* Make a guess at a good size when we're not given a valid size. */ |
288 | 0 | if (size == 0) |
289 | 0 | size = 1024; |
290 | | |
291 | | /* memoize_create will convert the size to a power of 2 */ |
292 | 0 | mstate->hashtable = memoize_create(mstate->tableContext, size, mstate); |
293 | 0 | } |
294 | | |
295 | | /* |
296 | | * prepare_probe_slot |
297 | | * Populate mstate's probeslot with the values from the tuple stored |
298 | | * in 'key'. If 'key' is NULL, then perform the population by evaluating |
299 | | * mstate's param_exprs. |
300 | | */ |
301 | | static inline void |
302 | | prepare_probe_slot(MemoizeState *mstate, MemoizeKey *key) |
303 | 0 | { |
304 | 0 | TupleTableSlot *pslot = mstate->probeslot; |
305 | 0 | TupleTableSlot *tslot = mstate->tableslot; |
306 | 0 | int numKeys = mstate->nkeys; |
307 | |
308 | 0 | ExecClearTuple(pslot); |
309 | |
310 | 0 | if (key == NULL) |
311 | 0 | { |
312 | 0 | ExprContext *econtext = mstate->ss.ps.ps_ExprContext; |
313 | 0 | MemoryContext oldcontext; |
314 | |
315 | 0 | oldcontext = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory); |
316 | | |
317 | | /* Set the probeslot's values based on the current parameter values */ |
318 | 0 | for (int i = 0; i < numKeys; i++) |
319 | 0 | pslot->tts_values[i] = ExecEvalExpr(mstate->param_exprs[i], |
320 | 0 | econtext, |
321 | 0 | &pslot->tts_isnull[i]); |
322 | |
323 | 0 | MemoryContextSwitchTo(oldcontext); |
324 | 0 | } |
325 | 0 | else |
326 | 0 | { |
327 | | /* Process the key's MinimalTuple and store the values in probeslot */ |
328 | 0 | ExecStoreMinimalTuple(key->params, tslot, false); |
329 | 0 | slot_getallattrs(tslot); |
330 | 0 | memcpy(pslot->tts_values, tslot->tts_values, sizeof(Datum) * numKeys); |
331 | 0 | memcpy(pslot->tts_isnull, tslot->tts_isnull, sizeof(bool) * numKeys); |
332 | 0 | } |
333 | |
334 | 0 | ExecStoreVirtualTuple(pslot); |
335 | 0 | } |
336 | | |
337 | | /* |
338 | | * entry_purge_tuples |
339 | | * Remove all tuples from the cache entry pointed to by 'entry'. This |
340 | | * leaves an empty cache entry. Also, update the memory accounting to |
341 | | * reflect the removal of the tuples. |
342 | | */ |
343 | | static inline void |
344 | | entry_purge_tuples(MemoizeState *mstate, MemoizeEntry *entry) |
345 | 0 | { |
346 | 0 | MemoizeTuple *tuple = entry->tuplehead; |
347 | 0 | uint64 freed_mem = 0; |
348 | |
349 | 0 | while (tuple != NULL) |
350 | 0 | { |
351 | 0 | MemoizeTuple *next = tuple->next; |
352 | |
353 | 0 | freed_mem += CACHE_TUPLE_BYTES(tuple); |
354 | | |
355 | | /* Free memory used for this tuple */ |
356 | 0 | pfree(tuple->mintuple); |
357 | 0 | pfree(tuple); |
358 | |
359 | 0 | tuple = next; |
360 | 0 | } |
361 | |
362 | 0 | entry->complete = false; |
363 | 0 | entry->tuplehead = NULL; |
364 | | |
365 | | /* Update the memory accounting */ |
366 | 0 | mstate->mem_used -= freed_mem; |
367 | 0 | } |
368 | | |
369 | | /* |
370 | | * remove_cache_entry |
371 | | * Remove 'entry' from the cache and free memory used by it. |
372 | | */ |
373 | | static void |
374 | | remove_cache_entry(MemoizeState *mstate, MemoizeEntry *entry) |
375 | 0 | { |
376 | 0 | MemoizeKey *key = entry->key; |
377 | |
378 | 0 | dlist_delete(&entry->key->lru_node); |
379 | | |
380 | | /* Remove all of the tuples from this entry */ |
381 | 0 | entry_purge_tuples(mstate, entry); |
382 | | |
383 | | /* |
384 | | * Update memory accounting. entry_purge_tuples should have already |
385 | | * subtracted the memory used for each cached tuple. Here we just update |
386 | | * the amount used by the entry itself. |
387 | | */ |
388 | 0 | mstate->mem_used -= EMPTY_ENTRY_MEMORY_BYTES(entry); |
389 | | |
390 | | /* Remove the entry from the cache */ |
391 | 0 | memoize_delete_item(mstate->hashtable, entry); |
392 | |
393 | 0 | pfree(key->params); |
394 | 0 | pfree(key); |
395 | 0 | } |
396 | | |
397 | | /* |
398 | | * cache_purge_all |
399 | | * Remove all items from the cache |
400 | | */ |
401 | | static void |
402 | | cache_purge_all(MemoizeState *mstate) |
403 | 0 | { |
404 | 0 | uint64 evictions = 0; |
405 | |
406 | 0 | if (mstate->hashtable != NULL) |
407 | 0 | evictions = mstate->hashtable->members; |
408 | | |
409 | | /* |
410 | | * Likely the most efficient way to remove all items is to just reset the |
411 | | * memory context for the cache and then rebuild a fresh hash table. This |
412 | | * saves having to remove each item one by one and pfree each cached tuple |
413 | | */ |
414 | 0 | MemoryContextReset(mstate->tableContext); |
415 | | |
416 | | /* NULLify so we recreate the table on the next call */ |
417 | 0 | mstate->hashtable = NULL; |
418 | | |
419 | | /* reset the LRU list */ |
420 | 0 | dlist_init(&mstate->lru_list); |
421 | 0 | mstate->last_tuple = NULL; |
422 | 0 | mstate->entry = NULL; |
423 | |
424 | 0 | mstate->mem_used = 0; |
425 | | |
426 | | /* XXX should we add something new to track these purges? */ |
427 | 0 | mstate->stats.cache_evictions += evictions; /* Update Stats */ |
428 | 0 | } |
429 | | |
430 | | /* |
431 | | * cache_reduce_memory |
432 | | * Evict older and less recently used items from the cache in order to |
433 | | * reduce the memory consumption back to something below the |
434 | | * MemoizeState's mem_limit. |
435 | | * |
436 | | * 'specialkey', if not NULL, causes the function to return false if the entry |
437 | | * which the key belongs to is removed from the cache. |
438 | | */ |
439 | | static bool |
440 | | cache_reduce_memory(MemoizeState *mstate, MemoizeKey *specialkey) |
441 | 0 | { |
442 | 0 | bool specialkey_intact = true; /* for now */ |
443 | 0 | dlist_mutable_iter iter; |
444 | 0 | uint64 evictions = 0; |
445 | | |
446 | | /* Update peak memory usage */ |
447 | 0 | if (mstate->mem_used > mstate->stats.mem_peak) |
448 | 0 | mstate->stats.mem_peak = mstate->mem_used; |
449 | | |
450 | | /* We expect only to be called when we've gone over budget on memory */ |
451 | 0 | Assert(mstate->mem_used > mstate->mem_limit); |
452 | | |
453 | | /* Start the eviction process starting at the head of the LRU list. */ |
454 | 0 | dlist_foreach_modify(iter, &mstate->lru_list) |
455 | 0 | { |
456 | 0 | MemoizeKey *key = dlist_container(MemoizeKey, lru_node, iter.cur); |
457 | 0 | MemoizeEntry *entry; |
458 | | |
459 | | /* |
460 | | * Populate the hash probe slot in preparation for looking up this LRU |
461 | | * entry. |
462 | | */ |
463 | 0 | prepare_probe_slot(mstate, key); |
464 | | |
465 | | /* |
466 | | * Ideally the LRU list pointers would be stored in the entry itself |
467 | | * rather than in the key. Unfortunately, we can't do that as the |
468 | | * simplehash.h code may resize the table and allocate new memory for |
469 | | * entries which would result in those pointers pointing to the old |
470 | | * buckets. However, it's fine to use the key to store this as that's |
471 | | * only referenced by a pointer in the entry, which of course follows |
472 | | * the entry whenever the hash table is resized. Since we only have a |
473 | | * pointer to the key here, we must perform a hash table lookup to |
474 | | * find the entry that the key belongs to. |
475 | | */ |
476 | 0 | entry = memoize_lookup(mstate->hashtable, NULL); |
477 | | |
478 | | /* |
479 | | * Sanity check that we found the entry belonging to the LRU list |
480 | | * item. A misbehaving hash or equality function could cause the |
481 | | * entry not to be found or the wrong entry to be found. |
482 | | */ |
483 | 0 | if (unlikely(entry == NULL || entry->key != key)) |
484 | 0 | elog(ERROR, "could not find memoization table entry"); |
485 | | |
486 | | /* |
487 | | * If we're being called to free memory while the cache is being |
488 | | * populated with new tuples, then we'd better take some care as we |
489 | | * could end up freeing the entry which 'specialkey' belongs to. |
490 | | * Generally callers will pass 'specialkey' as the key for the cache |
491 | | * entry which is currently being populated, so we must set |
492 | | * 'specialkey_intact' to false to inform the caller the specialkey |
493 | | * entry has been removed. |
494 | | */ |
495 | 0 | if (key == specialkey) |
496 | 0 | specialkey_intact = false; |
497 | | |
498 | | /* |
499 | | * Finally remove the entry. This will remove from the LRU list too. |
500 | | */ |
501 | 0 | remove_cache_entry(mstate, entry); |
502 | |
503 | 0 | evictions++; |
504 | | |
505 | | /* Exit if we've freed enough memory */ |
506 | 0 | if (mstate->mem_used <= mstate->mem_limit) |
507 | 0 | break; |
508 | 0 | } |
509 | | |
510 | 0 | mstate->stats.cache_evictions += evictions; /* Update Stats */ |
511 | |
512 | 0 | return specialkey_intact; |
513 | 0 | } |
514 | | |
515 | | /* |
516 | | * cache_lookup |
517 | | * Perform a lookup to see if we've already cached tuples based on the |
518 | | * scan's current parameters. If we find an existing entry we move it to |
519 | | * the end of the LRU list, set *found to true then return it. If we |
520 | | * don't find an entry then we create a new one and add it to the end of |
521 | | * the LRU list. We also update cache memory accounting and remove older |
522 | | * entries if we go over the memory budget. If we managed to free enough |
523 | | * memory we return the new entry, else we return NULL. |
524 | | * |
525 | | * Callers can assume we'll never return NULL when *found is true. |
526 | | */ |
527 | | static MemoizeEntry * |
528 | | cache_lookup(MemoizeState *mstate, bool *found) |
529 | 0 | { |
530 | 0 | MemoizeKey *key; |
531 | 0 | MemoizeEntry *entry; |
532 | 0 | MemoryContext oldcontext; |
533 | | |
534 | | /* prepare the probe slot with the current scan parameters */ |
535 | 0 | prepare_probe_slot(mstate, NULL); |
536 | | |
537 | | /* |
538 | | * Add the new entry to the cache. No need to pass a valid key since the |
539 | | * hash function uses mstate's probeslot, which we populated above. |
540 | | */ |
541 | 0 | entry = memoize_insert(mstate->hashtable, NULL, found); |
542 | |
543 | 0 | if (*found) |
544 | 0 | { |
545 | | /* |
546 | | * Move existing entry to the tail of the LRU list to mark it as the |
547 | | * most recently used item. |
548 | | */ |
549 | 0 | dlist_move_tail(&mstate->lru_list, &entry->key->lru_node); |
550 | |
551 | 0 | return entry; |
552 | 0 | } |
553 | | |
554 | 0 | oldcontext = MemoryContextSwitchTo(mstate->tableContext); |
555 | | |
556 | | /* Allocate a new key */ |
557 | 0 | entry->key = key = (MemoizeKey *) palloc(sizeof(MemoizeKey)); |
558 | 0 | key->params = ExecCopySlotMinimalTuple(mstate->probeslot); |
559 | | |
560 | | /* Update the total cache memory utilization */ |
561 | 0 | mstate->mem_used += EMPTY_ENTRY_MEMORY_BYTES(entry); |
562 | | |
563 | | /* Initialize this entry */ |
564 | 0 | entry->complete = false; |
565 | 0 | entry->tuplehead = NULL; |
566 | | |
567 | | /* |
568 | | * Since this is the most recently used entry, push this entry onto the |
569 | | * end of the LRU list. |
570 | | */ |
571 | 0 | dlist_push_tail(&mstate->lru_list, &entry->key->lru_node); |
572 | |
573 | 0 | mstate->last_tuple = NULL; |
574 | |
575 | 0 | MemoryContextSwitchTo(oldcontext); |
576 | | |
577 | | /* |
578 | | * If we've gone over our memory budget, then we'll free up some space in |
579 | | * the cache. |
580 | | */ |
581 | 0 | if (mstate->mem_used > mstate->mem_limit) |
582 | 0 | { |
583 | | /* |
584 | | * Try to free up some memory. It's highly unlikely that we'll fail |
585 | | * to do so here since the entry we've just added is yet to contain |
586 | | * any tuples and we're able to remove any other entry to reduce the |
587 | | * memory consumption. |
588 | | */ |
589 | 0 | if (unlikely(!cache_reduce_memory(mstate, key))) |
590 | 0 | return NULL; |
591 | | |
592 | | /* |
593 | | * The process of removing entries from the cache may have caused the |
594 | | * code in simplehash.h to shuffle elements to earlier buckets in the |
595 | | * hash table. If it has, we'll need to find the entry again by |
596 | | * performing a lookup. Fortunately, we can detect if this has |
597 | | * happened by seeing if the entry is still in use and that the key |
598 | | * pointer matches our expected key. |
599 | | */ |
600 | 0 | if (entry->status != memoize_SH_IN_USE || entry->key != key) |
601 | 0 | { |
602 | | /* |
603 | | * We need to repopulate the probeslot as lookups performed during |
604 | | * the cache evictions above will have stored some other key. |
605 | | */ |
606 | 0 | prepare_probe_slot(mstate, key); |
607 | | |
608 | | /* Re-find the newly added entry */ |
609 | 0 | entry = memoize_lookup(mstate->hashtable, NULL); |
610 | 0 | Assert(entry != NULL); |
611 | 0 | } |
612 | 0 | } |
613 | | |
614 | 0 | return entry; |
615 | 0 | } |
616 | | |
617 | | /* |
618 | | * cache_store_tuple |
619 | | * Add the tuple stored in 'slot' to the mstate's current cache entry. |
620 | | * The cache entry must have already been made with cache_lookup(). |
621 | | * mstate's last_tuple field must point to the tail of mstate->entry's |
622 | | * list of tuples. |
623 | | */ |
624 | | static bool |
625 | | cache_store_tuple(MemoizeState *mstate, TupleTableSlot *slot) |
626 | 0 | { |
627 | 0 | MemoizeTuple *tuple; |
628 | 0 | MemoizeEntry *entry = mstate->entry; |
629 | 0 | MemoryContext oldcontext; |
630 | |
631 | 0 | Assert(slot != NULL); |
632 | 0 | Assert(entry != NULL); |
633 | |
|
634 | 0 | oldcontext = MemoryContextSwitchTo(mstate->tableContext); |
635 | |
|
636 | 0 | tuple = (MemoizeTuple *) palloc(sizeof(MemoizeTuple)); |
637 | 0 | tuple->mintuple = ExecCopySlotMinimalTuple(slot); |
638 | 0 | tuple->next = NULL; |
639 | | |
640 | | /* Account for the memory we just consumed */ |
641 | 0 | mstate->mem_used += CACHE_TUPLE_BYTES(tuple); |
642 | |
643 | 0 | if (entry->tuplehead == NULL) |
644 | 0 | { |
645 | | /* |
646 | | * This is the first tuple for this entry, so just point the list head |
647 | | * to it. |
648 | | */ |
649 | 0 | entry->tuplehead = tuple; |
650 | 0 | } |
651 | 0 | else |
652 | 0 | { |
653 | | /* push this tuple onto the tail of the list */ |
654 | 0 | mstate->last_tuple->next = tuple; |
655 | 0 | } |
656 | |
657 | 0 | mstate->last_tuple = tuple; |
658 | 0 | MemoryContextSwitchTo(oldcontext); |
659 | | |
660 | | /* |
661 | | * If we've gone over our memory budget then free up some space in the |
662 | | * cache. |
663 | | */ |
664 | 0 | if (mstate->mem_used > mstate->mem_limit) |
665 | 0 | { |
666 | 0 | MemoizeKey *key = entry->key; |
667 | |
668 | 0 | if (!cache_reduce_memory(mstate, key)) |
669 | 0 | return false; |
670 | | |
671 | | /* |
672 | | * The process of removing entries from the cache may have caused the |
673 | | * code in simplehash.h to shuffle elements to earlier buckets in the |
674 | | * hash table. If it has, we'll need to find the entry again by |
675 | | * performing a lookup. Fortunately, we can detect if this has |
676 | | * happened by seeing if the entry is still in use and that the key |
677 | | * pointer matches our expected key. |
678 | | */ |
679 | 0 | if (entry->status != memoize_SH_IN_USE || entry->key != key) |
680 | 0 | { |
681 | | /* |
682 | | * We need to repopulate the probeslot as lookups performed during |
683 | | * the cache evictions above will have stored some other key. |
684 | | */ |
685 | 0 | prepare_probe_slot(mstate, key); |
686 | | |
687 | | /* Re-find the entry */ |
688 | 0 | mstate->entry = entry = memoize_lookup(mstate->hashtable, NULL); |
689 | 0 | Assert(entry != NULL); |
690 | 0 | } |
691 | 0 | } |
692 | | |
693 | 0 | return true; |
694 | 0 | } |
695 | | |
696 | | static TupleTableSlot * |
697 | | ExecMemoize(PlanState *pstate) |
698 | 0 | { |
699 | 0 | MemoizeState *node = castNode(MemoizeState, pstate); |
700 | 0 | ExprContext *econtext = node->ss.ps.ps_ExprContext; |
701 | 0 | PlanState *outerNode; |
702 | 0 | TupleTableSlot *slot; |
703 | |
704 | 0 | CHECK_FOR_INTERRUPTS(); |
705 | | |
706 | | /* |
707 | | * Reset per-tuple memory context to free any expression evaluation |
708 | | * storage allocated in the previous tuple cycle. |
709 | | */ |
710 | 0 | ResetExprContext(econtext); |
711 | |
712 | 0 | switch (node->mstatus) |
713 | 0 | { |
714 | 0 | case MEMO_CACHE_LOOKUP: |
715 | 0 | { |
716 | 0 | MemoizeEntry *entry; |
717 | 0 | TupleTableSlot *outerslot; |
718 | 0 | bool found; |
719 | |
720 | 0 | Assert(node->entry == NULL); |
721 | | |
722 | | /* first call? we'll need a hash table. */ |
723 | 0 | if (unlikely(node->hashtable == NULL)) |
724 | 0 | build_hash_table(node, ((Memoize *) pstate->plan)->est_entries); |
725 | | |
726 | | /* |
727 | | * We're only ever in this state for the first call of the |
728 | | * scan. Here we have a look to see if we've already seen the |
729 | | * current parameters before and if we have already cached a |
730 | | * complete set of records that the outer plan will return for |
731 | | * these parameters. |
732 | | * |
733 | | * When we find a valid cache entry, we'll return the first |
734 | | * tuple from it. If not found, we'll create a cache entry and |
735 | | * then try to fetch a tuple from the outer scan. If we find |
736 | | * one there, we'll try to cache it. |
737 | | */ |
738 | | |
739 | | /* see if we've got anything cached for the current parameters */ |
740 | 0 | entry = cache_lookup(node, &found); |
741 | |
742 | 0 | if (found && entry->complete) |
743 | 0 | { |
744 | 0 | node->stats.cache_hits += 1; /* stats update */ |
745 | | |
746 | | /* |
747 | | * Set last_tuple and entry so that the state |
748 | | * MEMO_CACHE_FETCH_NEXT_TUPLE can easily find the next |
749 | | * tuple for these parameters. |
750 | | */ |
751 | 0 | node->last_tuple = entry->tuplehead; |
752 | 0 | node->entry = entry; |
753 | | |
754 | | /* Fetch the first cached tuple, if there is one */ |
755 | 0 | if (entry->tuplehead) |
756 | 0 | { |
757 | 0 | node->mstatus = MEMO_CACHE_FETCH_NEXT_TUPLE; |
758 | |
759 | 0 | slot = node->ss.ps.ps_ResultTupleSlot; |
760 | 0 | ExecStoreMinimalTuple(entry->tuplehead->mintuple, |
761 | 0 | slot, false); |
762 | |
763 | 0 | return slot; |
764 | 0 | } |
765 | | |
766 | | /* The cache entry is void of any tuples. */ |
767 | 0 | node->mstatus = MEMO_END_OF_SCAN; |
768 | 0 | return NULL; |
769 | 0 | } |
770 | | |
771 | | /* Handle cache miss */ |
772 | 0 | node->stats.cache_misses += 1; /* stats update */ |
773 | |
774 | 0 | if (found) |
775 | 0 | { |
776 | | /* |
777 | | * A cache entry was found, but the scan for that entry |
778 | | * did not run to completion. We'll just remove all |
779 | | * tuples and start again. It might be tempting to |
780 | | * continue where we left off, but there's no guarantee |
781 | | * the outer node will produce the tuples in the same |
782 | | * order as it did last time. |
783 | | */ |
784 | 0 | entry_purge_tuples(node, entry); |
785 | 0 | } |
786 | | |
787 | | /* Scan the outer node for a tuple to cache */ |
788 | 0 | outerNode = outerPlanState(node); |
789 | 0 | outerslot = ExecProcNode(outerNode); |
790 | 0 | if (TupIsNull(outerslot)) |
791 | 0 | { |
792 | | /* |
793 | | * cache_lookup may have returned NULL due to failure to |
794 | | * free enough cache space, so ensure we don't do anything |
795 | | * here that assumes it worked. There's no need to go into |
796 | | * bypass mode here as we're setting mstatus to end of |
797 | | * scan. |
798 | | */ |
799 | 0 | if (likely(entry)) |
800 | 0 | entry->complete = true; |
801 | |
802 | 0 | node->mstatus = MEMO_END_OF_SCAN; |
803 | 0 | return NULL; |
804 | 0 | } |
805 | | |
806 | 0 | node->entry = entry; |
807 | | |
808 | | /* |
809 | | * If we failed to create the entry or failed to store the |
810 | | * tuple in the entry, then go into bypass mode. |
811 | | */ |
812 | 0 | if (unlikely(entry == NULL || |
813 | 0 | !cache_store_tuple(node, outerslot))) |
814 | 0 | { |
815 | 0 | node->stats.cache_overflows += 1; /* stats update */ |
816 | |
817 | 0 | node->mstatus = MEMO_CACHE_BYPASS_MODE; |
818 | | |
819 | | /* |
820 | | * No need to clear out last_tuple as we'll stay in bypass |
821 | | * mode until the end of the scan. |
822 | | */ |
823 | 0 | } |
824 | 0 | else |
825 | 0 | { |
826 | | /* |
827 | | * If we only expect a single row from this scan then we |
828 | | * can mark that we're not expecting more. This allows |
829 | | * cache lookups to work even when the scan has not been |
830 | | * executed to completion. |
831 | | */ |
832 | 0 | entry->complete = node->singlerow; |
833 | 0 | node->mstatus = MEMO_FILLING_CACHE; |
834 | 0 | } |
835 | |
836 | 0 | slot = node->ss.ps.ps_ResultTupleSlot; |
837 | 0 | ExecCopySlot(slot, outerslot); |
838 | 0 | return slot; |
839 | 0 | } |
840 | | |
841 | 0 | case MEMO_CACHE_FETCH_NEXT_TUPLE: |
842 | 0 | { |
843 | | /* We shouldn't be in this state if these are not set */ |
844 | 0 | Assert(node->entry != NULL); |
845 | 0 | Assert(node->last_tuple != NULL); |
846 | | |
847 | | /* Skip to the next tuple to output */ |
848 | 0 | node->last_tuple = node->last_tuple->next; |
849 | | |
850 | | /* No more tuples in the cache */ |
851 | 0 | if (node->last_tuple == NULL) |
852 | 0 | { |
853 | 0 | node->mstatus = MEMO_END_OF_SCAN; |
854 | 0 | return NULL; |
855 | 0 | } |
856 | | |
857 | 0 | slot = node->ss.ps.ps_ResultTupleSlot; |
858 | 0 | ExecStoreMinimalTuple(node->last_tuple->mintuple, slot, |
859 | 0 | false); |
860 | |
861 | 0 | return slot; |
862 | 0 | } |
863 | | |
864 | 0 | case MEMO_FILLING_CACHE: |
865 | 0 | { |
866 | 0 | TupleTableSlot *outerslot; |
867 | 0 | MemoizeEntry *entry = node->entry; |
868 | | |
869 | | /* entry should already have been set by MEMO_CACHE_LOOKUP */ |
870 | 0 | Assert(entry != NULL); |
871 | | |
872 | | /* |
873 | | * When in the MEMO_FILLING_CACHE state, we've just had a |
874 | | * cache miss and are populating the cache with the current |
875 | | * scan tuples. |
876 | | */ |
877 | 0 | outerNode = outerPlanState(node); |
878 | 0 | outerslot = ExecProcNode(outerNode); |
879 | 0 | if (TupIsNull(outerslot)) |
880 | 0 | { |
881 | | /* No more tuples. Mark it as complete */ |
882 | 0 | entry->complete = true; |
883 | 0 | node->mstatus = MEMO_END_OF_SCAN; |
884 | 0 | return NULL; |
885 | 0 | } |
886 | | |
887 | | /* |
888 | | * Validate if the planner properly set the singlerow flag. It |
889 | | * should only set that if each cache entry can, at most, |
890 | | * return 1 row. |
891 | | */ |
892 | 0 | if (unlikely(entry->complete)) |
893 | 0 | elog(ERROR, "cache entry already complete"); |
894 | | |
895 | | /* Record the tuple in the current cache entry */ |
896 | 0 | if (unlikely(!cache_store_tuple(node, outerslot))) |
897 | 0 | { |
898 | | /* Couldn't store it? Handle overflow */ |
899 | 0 | node->stats.cache_overflows += 1; /* stats update */ |
900 | |
|
901 | 0 | node->mstatus = MEMO_CACHE_BYPASS_MODE; |
902 | | |
903 | | /* |
904 | | * No need to clear out entry or last_tuple as we'll stay |
905 | | * in bypass mode until the end of the scan. |
906 | | */ |
907 | 0 | } |
908 | |
909 | 0 | slot = node->ss.ps.ps_ResultTupleSlot; |
910 | 0 | ExecCopySlot(slot, outerslot); |
911 | 0 | return slot; |
912 | 0 | } |
913 | | |
914 | 0 | case MEMO_CACHE_BYPASS_MODE: |
915 | 0 | { |
916 | 0 | TupleTableSlot *outerslot; |
917 | | |
918 | | /* |
919 | | * When in bypass mode we just continue to read tuples without |
920 | | * caching. We need to wait until the next rescan before we |
921 | | * can come out of this mode. |
922 | | */ |
923 | 0 | outerNode = outerPlanState(node); |
924 | 0 | outerslot = ExecProcNode(outerNode); |
925 | 0 | if (TupIsNull(outerslot)) |
926 | 0 | { |
927 | 0 | node->mstatus = MEMO_END_OF_SCAN; |
928 | 0 | return NULL; |
929 | 0 | } |
930 | | |
931 | 0 | slot = node->ss.ps.ps_ResultTupleSlot; |
932 | 0 | ExecCopySlot(slot, outerslot); |
933 | 0 | return slot; |
934 | 0 | } |
935 | | |
936 | 0 | case MEMO_END_OF_SCAN: |
937 | | |
938 | | /* |
939 | | * We've already returned NULL for this scan, but just in case |
940 | | * something calls us again by mistake. |
941 | | */ |
942 | 0 | return NULL; |
943 | | |
944 | 0 | default: |
945 | 0 | elog(ERROR, "unrecognized memoize state: %d", |
946 | 0 | (int) node->mstatus); |
947 | 0 | return NULL; |
948 | 0 | } /* switch */ |
949 | 0 | } |
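Condensing the switch above, the state machine's transitions reduce to the following summary (the event wording is mine; the states and transitions come straight from the code):

MEMO_CACHE_LOOKUP
    complete cache hit, entry has tuples   -> MEMO_CACHE_FETCH_NEXT_TUPLE
    complete cache hit, entry is empty     -> MEMO_END_OF_SCAN
    cache miss, outer plan exhausted       -> MEMO_END_OF_SCAN
    cache miss, first tuple cached         -> MEMO_FILLING_CACHE
    cache miss, entry or tuple won't fit   -> MEMO_CACHE_BYPASS_MODE

MEMO_CACHE_FETCH_NEXT_TUPLE
    cached tuple list exhausted            -> MEMO_END_OF_SCAN

MEMO_FILLING_CACHE
    outer plan exhausted                   -> MEMO_END_OF_SCAN
    next tuple won't fit in the cache      -> MEMO_CACHE_BYPASS_MODE

MEMO_CACHE_BYPASS_MODE
    outer plan exhausted                   -> MEMO_END_OF_SCAN

ExecReScanMemoize() returns any state to MEMO_CACHE_LOOKUP.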
950 | | |
951 | | MemoizeState * |
952 | | ExecInitMemoize(Memoize *node, EState *estate, int eflags) |
953 | 0 | { |
954 | 0 | MemoizeState *mstate = makeNode(MemoizeState); |
955 | 0 | Plan *outerNode; |
956 | 0 | int i; |
957 | 0 | int nkeys; |
958 | 0 | Oid *eqfuncoids; |
959 | | |
960 | | /* check for unsupported flags */ |
961 | 0 | Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK))); |
962 | |
963 | 0 | mstate->ss.ps.plan = (Plan *) node; |
964 | 0 | mstate->ss.ps.state = estate; |
965 | 0 | mstate->ss.ps.ExecProcNode = ExecMemoize; |
966 | | |
967 | | /* |
968 | | * Miscellaneous initialization |
969 | | * |
970 | | * create expression context for node |
971 | | */ |
972 | 0 | ExecAssignExprContext(estate, &mstate->ss.ps); |
973 | |
974 | 0 | outerNode = outerPlan(node); |
975 | 0 | outerPlanState(mstate) = ExecInitNode(outerNode, estate, eflags); |
976 | | |
977 | | /* |
978 | | * Initialize return slot and type. No need to initialize projection info |
979 | | * because this node doesn't do projections. |
980 | | */ |
981 | 0 | ExecInitResultTupleSlotTL(&mstate->ss.ps, &TTSOpsMinimalTuple); |
982 | 0 | mstate->ss.ps.ps_ProjInfo = NULL; |
983 | | |
984 | | /* |
985 | | * Initialize scan slot and type. |
986 | | */ |
987 | 0 | ExecCreateScanSlotFromOuterPlan(estate, &mstate->ss, &TTSOpsMinimalTuple); |
988 | | |
989 | | /* |
990 | | * Set the state machine to lookup the cache. We won't find anything |
991 | | * until we cache something, but this saves a special case to create the |
992 | | * first entry. |
993 | | */ |
994 | 0 | mstate->mstatus = MEMO_CACHE_LOOKUP; |
995 | |
996 | 0 | mstate->nkeys = nkeys = node->numKeys; |
997 | 0 | mstate->hashkeydesc = ExecTypeFromExprList(node->param_exprs); |
998 | 0 | mstate->tableslot = MakeSingleTupleTableSlot(mstate->hashkeydesc, |
999 | 0 | &TTSOpsMinimalTuple); |
1000 | 0 | mstate->probeslot = MakeSingleTupleTableSlot(mstate->hashkeydesc, |
1001 | 0 | &TTSOpsVirtual); |
1002 | |
1003 | 0 | mstate->param_exprs = (ExprState **) palloc(nkeys * sizeof(ExprState *)); |
1004 | 0 | mstate->collations = node->collations; /* Just point directly to the plan |
1005 | | * data */ |
1006 | 0 | mstate->hashfunctions = (FmgrInfo *) palloc(nkeys * sizeof(FmgrInfo)); |
1007 | |
1008 | 0 | eqfuncoids = palloc(nkeys * sizeof(Oid)); |
1009 | |
1010 | 0 | for (i = 0; i < nkeys; i++) |
1011 | 0 | { |
1012 | 0 | Oid hashop = node->hashOperators[i]; |
1013 | 0 | Oid left_hashfn; |
1014 | 0 | Oid right_hashfn; |
1015 | 0 | Expr *param_expr = (Expr *) list_nth(node->param_exprs, i); |
1016 | |
1017 | 0 | if (!get_op_hash_functions(hashop, &left_hashfn, &right_hashfn)) |
1018 | 0 | elog(ERROR, "could not find hash function for hash operator %u", |
1019 | 0 | hashop); |
1020 | | |
1021 | 0 | fmgr_info(left_hashfn, &mstate->hashfunctions[i]); |
1022 | |
1023 | 0 | mstate->param_exprs[i] = ExecInitExpr(param_expr, (PlanState *) mstate); |
1024 | 0 | eqfuncoids[i] = get_opcode(hashop); |
1025 | 0 | } |
1026 | | |
1027 | 0 | mstate->cache_eq_expr = ExecBuildParamSetEqual(mstate->hashkeydesc, |
1028 | 0 | &TTSOpsMinimalTuple, |
1029 | 0 | &TTSOpsVirtual, |
1030 | 0 | eqfuncoids, |
1031 | 0 | node->collations, |
1032 | 0 | node->param_exprs, |
1033 | 0 | (PlanState *) mstate); |
1034 | |
1035 | 0 | pfree(eqfuncoids); |
1036 | 0 | mstate->mem_used = 0; |
1037 | | |
1038 | | /* Limit the total memory consumed by the cache to this */ |
1039 | 0 | mstate->mem_limit = get_hash_memory_limit(); |
1040 | | |
1041 | | /* A memory context dedicated for the cache */ |
1042 | 0 | mstate->tableContext = AllocSetContextCreate(CurrentMemoryContext, |
1043 | 0 | "MemoizeHashTable", |
1044 | 0 | ALLOCSET_DEFAULT_SIZES); |
1045 | |
|
1046 | 0 | dlist_init(&mstate->lru_list); |
1047 | 0 | mstate->last_tuple = NULL; |
1048 | 0 | mstate->entry = NULL; |
1049 | | |
1050 | | /* |
1051 | | * Mark if we can assume the cache entry is completed after we get the |
1052 | | * first record for it. Some callers might not call us again after |
1053 | | * getting the first match; e.g., a join operator performing a unique join |
1054 | | * is able to skip to the next outer tuple after getting the first |
1055 | | * matching inner tuple. In this case, the cache entry is complete after |
1056 | | * getting the first tuple. This allows us to mark it as such. |
1057 | | */ |
1058 | 0 | mstate->singlerow = node->singlerow; |
1059 | 0 | mstate->keyparamids = node->keyparamids; |
1060 | | |
1061 | | /* |
1062 | | * Record if the cache keys should be compared bit by bit, or logically |
1063 | | * using the type's hash equality operator |
1064 | | */ |
1065 | 0 | mstate->binary_mode = node->binary_mode; |
1066 | | |
1067 | | /* Zero the statistics counters */ |
1068 | 0 | memset(&mstate->stats, 0, sizeof(MemoizeInstrumentation)); |
1069 | | |
1070 | | /* |
1071 | | * Because it may require a large allocation, we delay building of the |
1072 | | * hash table until executor run. |
1073 | | */ |
1074 | 0 | mstate->hashtable = NULL; |
1075 | |
|
1076 | 0 | return mstate; |
1077 | 0 | } |
1078 | | |
1079 | | void |
1080 | | ExecEndMemoize(MemoizeState *node) |
1081 | 0 | { |
1082 | | #ifdef USE_ASSERT_CHECKING |
1083 | | /* Validate the memory accounting code is correct in assert builds. */ |
1084 | | if (node->hashtable != NULL) |
1085 | | { |
1086 | | int count; |
1087 | | uint64 mem = 0; |
1088 | | memoize_iterator i; |
1089 | | MemoizeEntry *entry; |
1090 | | |
1091 | | memoize_start_iterate(node->hashtable, &i); |
1092 | | |
1093 | | count = 0; |
1094 | | while ((entry = memoize_iterate(node->hashtable, &i)) != NULL) |
1095 | | { |
1096 | | MemoizeTuple *tuple = entry->tuplehead; |
1097 | | |
1098 | | mem += EMPTY_ENTRY_MEMORY_BYTES(entry); |
1099 | | while (tuple != NULL) |
1100 | | { |
1101 | | mem += CACHE_TUPLE_BYTES(tuple); |
1102 | | tuple = tuple->next; |
1103 | | } |
1104 | | count++; |
1105 | | } |
1106 | | |
1107 | | Assert(count == node->hashtable->members); |
1108 | | Assert(mem == node->mem_used); |
1109 | | } |
1110 | | #endif |
1111 | | |
1112 | | /* |
1113 | | * When ending a parallel worker, copy the statistics gathered by the |
1114 | | * worker back into shared memory so that it can be picked up by the main |
1115 | | * process to report in EXPLAIN ANALYZE. |
1116 | | */ |
1117 | 0 | if (node->shared_info != NULL && IsParallelWorker()) |
1118 | 0 | { |
1119 | 0 | MemoizeInstrumentation *si; |
1120 | | |
1121 | | /* Make mem_peak available for EXPLAIN */ |
1122 | 0 | if (node->stats.mem_peak == 0) |
1123 | 0 | node->stats.mem_peak = node->mem_used; |
1124 | |
1125 | 0 | Assert(ParallelWorkerNumber <= node->shared_info->num_workers); |
1126 | 0 | si = &node->shared_info->sinstrument[ParallelWorkerNumber]; |
1127 | 0 | memcpy(si, &node->stats, sizeof(MemoizeInstrumentation)); |
1128 | 0 | } |
1129 | | |
1130 | | /* Remove the cache context */ |
1131 | 0 | MemoryContextDelete(node->tableContext); |
1132 | | |
1133 | | /* |
1134 | | * shut down the subplan |
1135 | | */ |
1136 | 0 | ExecEndNode(outerPlanState(node)); |
1137 | 0 | } |
1138 | | |
1139 | | void |
1140 | | ExecReScanMemoize(MemoizeState *node) |
1141 | 0 | { |
1142 | 0 | PlanState *outerPlan = outerPlanState(node); |
1143 | | |
1144 | | /* Mark that we must lookup the cache for a new set of parameters */ |
1145 | 0 | node->mstatus = MEMO_CACHE_LOOKUP; |
1146 | | |
1147 | | /* nullify pointers used for the last scan */ |
1148 | 0 | node->entry = NULL; |
1149 | 0 | node->last_tuple = NULL; |
1150 | | |
1151 | | /* |
1152 | | * if chgParam of subnode is not null then plan will be re-scanned by |
1153 | | * first ExecProcNode. |
1154 | | */ |
1155 | 0 | if (outerPlan->chgParam == NULL) |
1156 | 0 | ExecReScan(outerPlan); |
1157 | | |
1158 | | /* |
1159 | | * Purge the entire cache if a parameter changed that is not part of the |
1160 | | * cache key. |
1161 | | */ |
1162 | 0 | if (bms_nonempty_difference(outerPlan->chgParam, node->keyparamids)) |
1163 | 0 | cache_purge_all(node); |
1164 | 0 | } |
1165 | | |
1166 | | /* |
1167 | | * ExecEstimateCacheEntryOverheadBytes |
1168 | | * For use in the query planner to help it estimate the amount of memory |
1169 | | * required to store a single entry in the cache. |
1170 | | */ |
1171 | | double |
1172 | | ExecEstimateCacheEntryOverheadBytes(double ntuples) |
1173 | 0 | { |
1174 | 0 | return sizeof(MemoizeEntry) + sizeof(MemoizeKey) + sizeof(MemoizeTuple) * |
1175 | 0 | ntuples; |
1176 | 0 | } |
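For illustration only, with hypothetical sizes of 40 bytes for MemoizeEntry, 24 bytes for MemoizeKey and 16 bytes for MemoizeTuple, an entry expected to hold 10 tuples would be costed at 40 + 24 + 16 * 10 = 224 bytes of per-entry overhead; the bytes of the cached MinimalTuples themselves are not included in this figure and are estimated separately by the planner.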
1177 | | |
1178 | | /* ---------------------------------------------------------------- |
1179 | | * Parallel Query Support |
1180 | | * ---------------------------------------------------------------- |
1181 | | */ |
1182 | | |
1183 | | /* ---------------------------------------------------------------- |
1184 | | * ExecMemoizeEstimate |
1185 | | * |
1186 | | * Estimate space required to propagate memoize statistics. |
1187 | | * ---------------------------------------------------------------- |
1188 | | */ |
1189 | | void |
1190 | | ExecMemoizeEstimate(MemoizeState *node, ParallelContext *pcxt) |
1191 | 0 | { |
1192 | 0 | Size size; |
1193 | | |
1194 | | /* don't need this if not instrumenting or no workers */ |
1195 | 0 | if (!node->ss.ps.instrument || pcxt->nworkers == 0) |
1196 | 0 | return; |
1197 | | |
1198 | 0 | size = mul_size(pcxt->nworkers, sizeof(MemoizeInstrumentation)); |
1199 | 0 | size = add_size(size, offsetof(SharedMemoizeInfo, sinstrument)); |
1200 | 0 | shm_toc_estimate_chunk(&pcxt->estimator, size); |
1201 | 0 | shm_toc_estimate_keys(&pcxt->estimator, 1); |
1202 | 0 | } |
1203 | | |
1204 | | /* ---------------------------------------------------------------- |
1205 | | * ExecMemoizeInitializeDSM |
1206 | | * |
1207 | | * Initialize DSM space for memoize statistics. |
1208 | | * ---------------------------------------------------------------- |
1209 | | */ |
1210 | | void |
1211 | | ExecMemoizeInitializeDSM(MemoizeState *node, ParallelContext *pcxt) |
1212 | 0 | { |
1213 | 0 | Size size; |
1214 | | |
1215 | | /* don't need this if not instrumenting or no workers */ |
1216 | 0 | if (!node->ss.ps.instrument || pcxt->nworkers == 0) |
1217 | 0 | return; |
1218 | | |
1219 | 0 | size = offsetof(SharedMemoizeInfo, sinstrument) |
1220 | 0 | + pcxt->nworkers * sizeof(MemoizeInstrumentation); |
1221 | 0 | node->shared_info = shm_toc_allocate(pcxt->toc, size); |
1222 | | /* ensure any unfilled slots will contain zeroes */ |
1223 | 0 | memset(node->shared_info, 0, size); |
1224 | 0 | node->shared_info->num_workers = pcxt->nworkers; |
1225 | 0 | shm_toc_insert(pcxt->toc, node->ss.ps.plan->plan_node_id, |
1226 | 0 | node->shared_info); |
1227 | 0 | } |
1228 | | |
1229 | | /* ---------------------------------------------------------------- |
1230 | | * ExecMemoizeInitializeWorker |
1231 | | * |
1232 | | * Attach worker to DSM space for memoize statistics. |
1233 | | * ---------------------------------------------------------------- |
1234 | | */ |
1235 | | void |
1236 | | ExecMemoizeInitializeWorker(MemoizeState *node, ParallelWorkerContext *pwcxt) |
1237 | 0 | { |
1238 | 0 | node->shared_info = |
1239 | 0 | shm_toc_lookup(pwcxt->toc, node->ss.ps.plan->plan_node_id, true); |
1240 | 0 | } |
1241 | | |
1242 | | /* ---------------------------------------------------------------- |
1243 | | * ExecMemoizeRetrieveInstrumentation |
1244 | | * |
1245 | | * Transfer memoize statistics from DSM to private memory. |
1246 | | * ---------------------------------------------------------------- |
1247 | | */ |
1248 | | void |
1249 | | ExecMemoizeRetrieveInstrumentation(MemoizeState *node) |
1250 | 0 | { |
1251 | 0 | Size size; |
1252 | 0 | SharedMemoizeInfo *si; |
1253 | |
1254 | 0 | if (node->shared_info == NULL) |
1255 | 0 | return; |
1256 | | |
1257 | 0 | size = offsetof(SharedMemoizeInfo, sinstrument) |
1258 | 0 | + node->shared_info->num_workers * sizeof(MemoizeInstrumentation); |
1259 | 0 | si = palloc(size); |
1260 | 0 | memcpy(si, node->shared_info, size); |
1261 | 0 | node->shared_info = si; |
1262 | 0 | } |