/src/postgres/src/backend/commands/copyfrom.c
Line | Count | Source |
1 | | /*------------------------------------------------------------------------- |
2 | | * |
3 | | * copyfrom.c |
4 | | * COPY <table> FROM file/program/client |
5 | | * |
6 | | * This file contains routines needed to efficiently load tuples into a |
7 | | * table. That includes looking up the correct partition, firing triggers, |
8 | | * calling the table AM function to insert the data, and updating indexes. |
9 | | * Reading data from the input file or client and parsing it into Datums |
10 | | * is handled in copyfromparse.c. |
11 | | * |
12 | | * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group |
13 | | * Portions Copyright (c) 1994, Regents of the University of California |
14 | | * |
15 | | * |
16 | | * IDENTIFICATION |
17 | | * src/backend/commands/copyfrom.c |
18 | | * |
19 | | *------------------------------------------------------------------------- |
20 | | */ |
21 | | #include "postgres.h" |
22 | | |
23 | | #include <ctype.h> |
24 | | #include <unistd.h> |
25 | | #include <sys/stat.h> |
26 | | |
27 | | #include "access/heapam.h" |
28 | | #include "access/tableam.h" |
29 | | #include "access/xact.h" |
30 | | #include "catalog/namespace.h" |
31 | | #include "commands/copyapi.h" |
32 | | #include "commands/copyfrom_internal.h" |
33 | | #include "commands/progress.h" |
34 | | #include "commands/trigger.h" |
35 | | #include "executor/execPartition.h" |
36 | | #include "executor/executor.h" |
37 | | #include "executor/nodeModifyTable.h" |
38 | | #include "executor/tuptable.h" |
39 | | #include "foreign/fdwapi.h" |
40 | | #include "mb/pg_wchar.h" |
41 | | #include "miscadmin.h" |
42 | | #include "nodes/miscnodes.h" |
43 | | #include "optimizer/optimizer.h" |
44 | | #include "pgstat.h" |
45 | | #include "rewrite/rewriteHandler.h" |
46 | | #include "storage/fd.h" |
47 | | #include "tcop/tcopprot.h" |
48 | | #include "utils/lsyscache.h" |
49 | | #include "utils/memutils.h" |
50 | | #include "utils/portal.h" |
51 | | #include "utils/rel.h" |
52 | | #include "utils/snapmgr.h" |
53 | | |
54 | | /* |
55 | | * No more than this many tuples per CopyMultiInsertBuffer |
56 | | * |
57 | | * Caution: Don't make this too big, as we could end up with this many |
58 | | * CopyMultiInsertBuffer items stored in CopyMultiInsertInfo's |
59 | | * multiInsertBuffers list. Increasing this can cause quadratic growth in |
60 | | * memory requirements during copies into partitioned tables with a large |
61 | | * number of partitions. |
62 | | */ |
63 | 0 | #define MAX_BUFFERED_TUPLES 1000 |
64 | | |
65 | | /* |
66 | | * Flush buffers if there are >= this many bytes, as counted by the input |
67 | | * size, of tuples stored. |
68 | | */ |
69 | 0 | #define MAX_BUFFERED_BYTES 65535 |
70 | | |
71 | | /* |
72 | | * Trim the list of buffers back down to this number after flushing. This |
73 | | * must be >= 2. |
74 | | */ |
75 | 0 | #define MAX_PARTITION_BUFFERS 32 |
76 | | |
77 | | /* Stores multi-insert data related to a single relation in CopyFrom. */ |
78 | | typedef struct CopyMultiInsertBuffer |
79 | | { |
80 | | TupleTableSlot *slots[MAX_BUFFERED_TUPLES]; /* Array to store tuples */ |
81 | | ResultRelInfo *resultRelInfo; /* ResultRelInfo for 'relid' */ |
82 | | BulkInsertState bistate; /* BulkInsertState for this rel if plain |
83 | | * table; NULL if foreign table */ |
84 | | int nused; /* number of 'slots' containing tuples */ |
85 | | uint64 linenos[MAX_BUFFERED_TUPLES]; /* Line # of tuple in copy |
86 | | * stream */ |
87 | | } CopyMultiInsertBuffer; |
88 | | |
89 | | /* |
90 | | * Stores one or many CopyMultiInsertBuffers and details about the size and |
91 | | * number of tuples which are stored in them. This allows multiple buffers to |
92 | | * exist at once when COPYing into a partitioned table. |
93 | | */ |
94 | | typedef struct CopyMultiInsertInfo |
95 | | { |
96 | | List *multiInsertBuffers; /* List of tracked CopyMultiInsertBuffers */ |
97 | | int bufferedTuples; /* number of tuples buffered over all buffers */ |
98 | | int bufferedBytes; /* number of bytes from all buffered tuples */ |
99 | | CopyFromState cstate; /* Copy state for this CopyMultiInsertInfo */ |
100 | | EState *estate; /* Executor state used for COPY */ |
101 | | CommandId mycid; /* Command Id used for COPY */ |
102 | | int ti_options; /* table insert options */ |
103 | | } CopyMultiInsertInfo; |
104 | | |
105 | | |
106 | | /* non-export function prototypes */ |
107 | | static void ClosePipeFromProgram(CopyFromState cstate); |
108 | | |
109 | | /* |
110 | | * Built-in format-specific routines. One-row callbacks are defined in |
111 | | * copyfromparse.c. |
112 | | */ |
113 | | static void CopyFromTextLikeInFunc(CopyFromState cstate, Oid atttypid, FmgrInfo *finfo, |
114 | | Oid *typioparam); |
115 | | static void CopyFromTextLikeStart(CopyFromState cstate, TupleDesc tupDesc); |
116 | | static void CopyFromTextLikeEnd(CopyFromState cstate); |
117 | | static void CopyFromBinaryInFunc(CopyFromState cstate, Oid atttypid, |
118 | | FmgrInfo *finfo, Oid *typioparam); |
119 | | static void CopyFromBinaryStart(CopyFromState cstate, TupleDesc tupDesc); |
120 | | static void CopyFromBinaryEnd(CopyFromState cstate); |
121 | | |
122 | | |
123 | | /* |
124 | | * COPY FROM routines for built-in formats. |
125 | | * |
126 | | * CSV and text formats share the same TextLike routines except for the |
127 | | * one-row callback. |
128 | | */ |
129 | | |
130 | | /* text format */ |
131 | | static const CopyFromRoutine CopyFromRoutineText = { |
132 | | .CopyFromInFunc = CopyFromTextLikeInFunc, |
133 | | .CopyFromStart = CopyFromTextLikeStart, |
134 | | .CopyFromOneRow = CopyFromTextOneRow, |
135 | | .CopyFromEnd = CopyFromTextLikeEnd, |
136 | | }; |
137 | | |
138 | | /* CSV format */ |
139 | | static const CopyFromRoutine CopyFromRoutineCSV = { |
140 | | .CopyFromInFunc = CopyFromTextLikeInFunc, |
141 | | .CopyFromStart = CopyFromTextLikeStart, |
142 | | .CopyFromOneRow = CopyFromCSVOneRow, |
143 | | .CopyFromEnd = CopyFromTextLikeEnd, |
144 | | }; |
145 | | |
146 | | /* binary format */ |
147 | | static const CopyFromRoutine CopyFromRoutineBinary = { |
148 | | .CopyFromInFunc = CopyFromBinaryInFunc, |
149 | | .CopyFromStart = CopyFromBinaryStart, |
150 | | .CopyFromOneRow = CopyFromBinaryOneRow, |
151 | | .CopyFromEnd = CopyFromBinaryEnd, |
152 | | }; |
153 | | |
154 | | /* Return a COPY FROM routine for the given options */ |
155 | | static const CopyFromRoutine * |
156 | | CopyFromGetRoutine(const CopyFormatOptions *opts) |
157 | 0 | { |
158 | 0 | if (opts->csv_mode) |
159 | 0 | return &CopyFromRoutineCSV; |
160 | 0 | else if (opts->binary) |
161 | 0 | return &CopyFromRoutineBinary; |
162 | | |
163 | | /* default is text */ |
164 | 0 | return &CopyFromRoutineText; |
165 | 0 | } |
166 | | |
167 | | /* Implementation of the start callback for text and CSV formats */ |
168 | | static void |
169 | | CopyFromTextLikeStart(CopyFromState cstate, TupleDesc tupDesc) |
170 | 0 | { |
171 | 0 | AttrNumber attr_count; |
172 | | |
173 | | /* |
174 | | * If encoding conversion is needed, we need another buffer to hold the |
175 | | * converted input data. Otherwise, we can just point input_buf to the |
176 | | * same buffer as raw_buf. |
177 | | */ |
178 | 0 | if (cstate->need_transcoding) |
179 | 0 | { |
180 | 0 | cstate->input_buf = (char *) palloc(INPUT_BUF_SIZE + 1); |
181 | 0 | cstate->input_buf_index = cstate->input_buf_len = 0; |
182 | 0 | } |
183 | 0 | else |
184 | 0 | cstate->input_buf = cstate->raw_buf; |
185 | 0 | cstate->input_reached_eof = false; |
186 | |
|
187 | 0 | initStringInfo(&cstate->line_buf); |
188 | | |
189 | | /* |
190 | | * Create workspace for CopyReadAttributes results; used by CSV and text |
191 | | * format. |
192 | | */ |
193 | 0 | attr_count = list_length(cstate->attnumlist); |
194 | 0 | cstate->max_fields = attr_count; |
195 | 0 | cstate->raw_fields = (char **) palloc(attr_count * sizeof(char *)); |
196 | 0 | } |
197 | | |
198 | | /* |
199 | | * Implementation of the infunc callback for text and CSV formats. Assign |
200 | | * the input function data to the given *finfo. |
201 | | */ |
202 | | static void |
203 | | CopyFromTextLikeInFunc(CopyFromState cstate, Oid atttypid, FmgrInfo *finfo, |
204 | | Oid *typioparam) |
205 | 0 | { |
206 | 0 | Oid func_oid; |
207 | |
|
208 | 0 | getTypeInputInfo(atttypid, &func_oid, typioparam); |
209 | 0 | fmgr_info(func_oid, finfo); |
210 | 0 | } |
211 | | |
212 | | /* Implementation of the end callback for text and CSV formats */ |
213 | | static void |
214 | | CopyFromTextLikeEnd(CopyFromState cstate) |
215 | 0 | { |
216 | | /* nothing to do */ |
217 | 0 | } |
218 | | |
219 | | /* Implementation of the start callback for binary format */ |
220 | | static void |
221 | | CopyFromBinaryStart(CopyFromState cstate, TupleDesc tupDesc) |
222 | 0 | { |
223 | | /* Read and verify binary header */ |
224 | 0 | ReceiveCopyBinaryHeader(cstate); |
225 | 0 | } |
226 | | |
227 | | /* |
228 | | * Implementation of the infunc callback for binary format. Assign |
229 | | * the binary input function to the given *finfo. |
230 | | */ |
231 | | static void |
232 | | CopyFromBinaryInFunc(CopyFromState cstate, Oid atttypid, |
233 | | FmgrInfo *finfo, Oid *typioparam) |
234 | 0 | { |
235 | 0 | Oid func_oid; |
236 | |
|
237 | 0 | getTypeBinaryInputInfo(atttypid, &func_oid, typioparam); |
238 | 0 | fmgr_info(func_oid, finfo); |
239 | 0 | } |
240 | | |
241 | | /* Implementation of the end callback for binary format */ |
242 | | static void |
243 | | CopyFromBinaryEnd(CopyFromState cstate) |
244 | 0 | { |
245 | | /* nothing to do */ |
246 | 0 | } |
247 | | |
248 | | /* |
249 | | * error context callback for COPY FROM |
250 | | * |
251 | | * The argument for the error context must be CopyFromState. |
252 | | */ |
253 | | void |
254 | | CopyFromErrorCallback(void *arg) |
255 | 0 | { |
256 | 0 | CopyFromState cstate = (CopyFromState) arg; |
257 | |
|
258 | 0 | if (cstate->relname_only) |
259 | 0 | { |
260 | 0 | errcontext("COPY %s", |
261 | 0 | cstate->cur_relname); |
262 | 0 | return; |
263 | 0 | } |
264 | 0 | if (cstate->opts.binary) |
265 | 0 | { |
266 | | /* can't usefully display the data */ |
267 | 0 | if (cstate->cur_attname) |
268 | 0 | errcontext("COPY %s, line %" PRIu64 ", column %s", |
269 | 0 | cstate->cur_relname, |
270 | 0 | cstate->cur_lineno, |
271 | 0 | cstate->cur_attname); |
272 | 0 | else |
273 | 0 | errcontext("COPY %s, line %" PRIu64, |
274 | 0 | cstate->cur_relname, |
275 | 0 | cstate->cur_lineno); |
276 | 0 | } |
277 | 0 | else |
278 | 0 | { |
279 | 0 | if (cstate->cur_attname && cstate->cur_attval) |
280 | 0 | { |
281 | | /* error is relevant to a particular column */ |
282 | 0 | char *attval; |
283 | |
|
284 | 0 | attval = CopyLimitPrintoutLength(cstate->cur_attval); |
285 | 0 | errcontext("COPY %s, line %" PRIu64 ", column %s: \"%s\"", |
286 | 0 | cstate->cur_relname, |
287 | 0 | cstate->cur_lineno, |
288 | 0 | cstate->cur_attname, |
289 | 0 | attval); |
290 | 0 | pfree(attval); |
291 | 0 | } |
292 | 0 | else if (cstate->cur_attname) |
293 | 0 | { |
294 | | /* error is relevant to a particular column, value is NULL */ |
295 | 0 | errcontext("COPY %s, line %" PRIu64 ", column %s: null input", |
296 | 0 | cstate->cur_relname, |
297 | 0 | cstate->cur_lineno, |
298 | 0 | cstate->cur_attname); |
299 | 0 | } |
300 | 0 | else |
301 | 0 | { |
302 | | /* |
303 | | * Error is relevant to a particular line. |
304 | | * |
305 | | * If line_buf still contains the correct line, print it. |
306 | | */ |
307 | 0 | if (cstate->line_buf_valid) |
308 | 0 | { |
309 | 0 | char *lineval; |
310 | |
|
311 | 0 | lineval = CopyLimitPrintoutLength(cstate->line_buf.data); |
312 | 0 | errcontext("COPY %s, line %" PRIu64 ": \"%s\"", |
313 | 0 | cstate->cur_relname, |
314 | 0 | cstate->cur_lineno, lineval); |
315 | 0 | pfree(lineval); |
316 | 0 | } |
317 | 0 | else |
318 | 0 | { |
319 | 0 | errcontext("COPY %s, line %" PRIu64, |
320 | 0 | cstate->cur_relname, |
321 | 0 | cstate->cur_lineno); |
322 | 0 | } |
323 | 0 | } |
324 | 0 | } |
325 | 0 | } |
326 | | |
327 | | /* |
328 | | * Make sure we don't print an unreasonable amount of COPY data in a message. |
329 | | * |
330 | | * Returns a pstrdup'd copy of the input. |
331 | | */ |
332 | | char * |
333 | | CopyLimitPrintoutLength(const char *str) |
334 | 0 | { |
335 | 0 | #define MAX_COPY_DATA_DISPLAY 100 |
336 | |
|
337 | 0 | int slen = strlen(str); |
338 | 0 | int len; |
339 | 0 | char *res; |
340 | | |
341 | | /* Fast path if definitely okay */ |
342 | 0 | if (slen <= MAX_COPY_DATA_DISPLAY) |
343 | 0 | return pstrdup(str); |
344 | | |
345 | | /* Apply encoding-dependent truncation */ |
346 | 0 | len = pg_mbcliplen(str, slen, MAX_COPY_DATA_DISPLAY); |
347 | | |
348 | | /* |
349 | | * Truncate, and add "..." to show we truncated the input. |
350 | | */ |
351 | 0 | res = (char *) palloc(len + 4); |
352 | 0 | memcpy(res, str, len); |
353 | 0 | strcpy(res + len, "..."); |
354 | |
|
355 | 0 | return res; |
356 | 0 | } |
357 | | |
358 | | /* |
359 | | * Allocate memory and initialize a new CopyMultiInsertBuffer for this |
360 | | * ResultRelInfo. |
361 | | */ |
362 | | static CopyMultiInsertBuffer * |
363 | | CopyMultiInsertBufferInit(ResultRelInfo *rri) |
364 | 0 | { |
365 | 0 | CopyMultiInsertBuffer *buffer; |
366 | |
|
367 | 0 | buffer = (CopyMultiInsertBuffer *) palloc(sizeof(CopyMultiInsertBuffer)); |
368 | 0 | memset(buffer->slots, 0, sizeof(TupleTableSlot *) * MAX_BUFFERED_TUPLES); |
369 | 0 | buffer->resultRelInfo = rri; |
370 | 0 | buffer->bistate = (rri->ri_FdwRoutine == NULL) ? GetBulkInsertState() : NULL; |
371 | 0 | buffer->nused = 0; |
372 | |
|
373 | 0 | return buffer; |
374 | 0 | } |
375 | | |
376 | | /* |
377 | | * Make a new buffer for this ResultRelInfo. |
378 | | */ |
379 | | static inline void |
380 | | CopyMultiInsertInfoSetupBuffer(CopyMultiInsertInfo *miinfo, |
381 | | ResultRelInfo *rri) |
382 | 0 | { |
383 | 0 | CopyMultiInsertBuffer *buffer; |
384 | |
|
385 | 0 | buffer = CopyMultiInsertBufferInit(rri); |
386 | | |
387 | | /* Setup back-link so we can easily find this buffer again */ |
388 | 0 | rri->ri_CopyMultiInsertBuffer = buffer; |
389 | | /* Record that we're tracking this buffer */ |
390 | 0 | miinfo->multiInsertBuffers = lappend(miinfo->multiInsertBuffers, buffer); |
391 | 0 | } |
392 | | |
393 | | /* |
394 | | * Initialize an already allocated CopyMultiInsertInfo. |
395 | | * |
396 | | * If rri is a non-partitioned table then a CopyMultiInsertBuffer is set up |
397 | | * for that table. |
398 | | */ |
399 | | static void |
400 | | CopyMultiInsertInfoInit(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri, |
401 | | CopyFromState cstate, EState *estate, CommandId mycid, |
402 | | int ti_options) |
403 | 0 | { |
404 | 0 | miinfo->multiInsertBuffers = NIL; |
405 | 0 | miinfo->bufferedTuples = 0; |
406 | 0 | miinfo->bufferedBytes = 0; |
407 | 0 | miinfo->cstate = cstate; |
408 | 0 | miinfo->estate = estate; |
409 | 0 | miinfo->mycid = mycid; |
410 | 0 | miinfo->ti_options = ti_options; |
411 | | |
412 | | /* |
413 | | * Only setup the buffer when not dealing with a partitioned table. |
414 | | * Buffers for partitioned tables will just be setup when we need to send |
415 | | * tuples their way for the first time. |
416 | | */ |
417 | 0 | if (rri->ri_RelationDesc->rd_rel->relkind != RELKIND_PARTITIONED_TABLE) |
418 | 0 | CopyMultiInsertInfoSetupBuffer(miinfo, rri); |
419 | 0 | } |
420 | | |
421 | | /* |
422 | | * Returns true if the buffers are full |
423 | | */ |
424 | | static inline bool |
425 | | CopyMultiInsertInfoIsFull(CopyMultiInsertInfo *miinfo) |
426 | 0 | { |
427 | 0 | if (miinfo->bufferedTuples >= MAX_BUFFERED_TUPLES || |
428 | 0 | miinfo->bufferedBytes >= MAX_BUFFERED_BYTES) |
429 | 0 | return true; |
430 | 0 | return false; |
431 | 0 | } |
432 | | |
433 | | /* |
434 | | * Returns true if we have no buffered tuples |
435 | | */ |
436 | | static inline bool |
437 | | CopyMultiInsertInfoIsEmpty(CopyMultiInsertInfo *miinfo) |
438 | 0 | { |
439 | 0 | return miinfo->bufferedTuples == 0; |
440 | 0 | } |
441 | | |
442 | | /* |
443 | | * Write the tuples stored in 'buffer' out to the table. |
444 | | */ |
445 | | static inline void |
446 | | CopyMultiInsertBufferFlush(CopyMultiInsertInfo *miinfo, |
447 | | CopyMultiInsertBuffer *buffer, |
448 | | int64 *processed) |
449 | 0 | { |
450 | 0 | CopyFromState cstate = miinfo->cstate; |
451 | 0 | EState *estate = miinfo->estate; |
452 | 0 | int nused = buffer->nused; |
453 | 0 | ResultRelInfo *resultRelInfo = buffer->resultRelInfo; |
454 | 0 | TupleTableSlot **slots = buffer->slots; |
455 | 0 | int i; |
456 | |
|
457 | 0 | if (resultRelInfo->ri_FdwRoutine) |
458 | 0 | { |
459 | 0 | int batch_size = resultRelInfo->ri_BatchSize; |
460 | 0 | int sent = 0; |
461 | |
|
462 | 0 | Assert(buffer->bistate == NULL); |
463 | | |
464 | | /* Ensure that the FDW supports batching and it's enabled */ |
465 | 0 | Assert(resultRelInfo->ri_FdwRoutine->ExecForeignBatchInsert); |
466 | 0 | Assert(batch_size > 1); |
467 | | |
468 | | /* |
469 | | * We suppress error context information other than the relation name, |
470 | | * if one of the operations below fails. |
471 | | */ |
472 | 0 | Assert(!cstate->relname_only); |
473 | 0 | cstate->relname_only = true; |
474 | |
|
475 | 0 | while (sent < nused) |
476 | 0 | { |
477 | 0 | int size = (batch_size < nused - sent) ? batch_size : (nused - sent); |
478 | 0 | int inserted = size; |
479 | 0 | TupleTableSlot **rslots; |
480 | | |
481 | | /* insert into foreign table: let the FDW do it */ |
482 | 0 | rslots = |
483 | 0 | resultRelInfo->ri_FdwRoutine->ExecForeignBatchInsert(estate, |
484 | 0 | resultRelInfo, |
485 | 0 | &slots[sent], |
486 | 0 | NULL, |
487 | 0 | &inserted); |
488 | |
|
489 | 0 | sent += size; |
490 | | |
491 | | /* No need to do anything if there are no inserted rows */ |
492 | 0 | if (inserted <= 0) |
493 | 0 | continue; |
494 | | |
495 | | /* Triggers on foreign tables should not have transition tables */ |
496 | 0 | Assert(resultRelInfo->ri_TrigDesc == NULL || |
497 | 0 | resultRelInfo->ri_TrigDesc->trig_insert_new_table == false); |
498 | | |
499 | | /* Run AFTER ROW INSERT triggers */ |
500 | 0 | if (resultRelInfo->ri_TrigDesc != NULL && |
501 | 0 | resultRelInfo->ri_TrigDesc->trig_insert_after_row) |
502 | 0 | { |
503 | 0 | Oid relid = RelationGetRelid(resultRelInfo->ri_RelationDesc); |
504 | |
|
505 | 0 | for (i = 0; i < inserted; i++) |
506 | 0 | { |
507 | 0 | TupleTableSlot *slot = rslots[i]; |
508 | | |
509 | | /* |
510 | | * AFTER ROW Triggers might reference the tableoid column, |
511 | | * so (re-)initialize tts_tableOid before evaluating them. |
512 | | */ |
513 | 0 | slot->tts_tableOid = relid; |
514 | |
|
515 | 0 | ExecARInsertTriggers(estate, resultRelInfo, |
516 | 0 | slot, NIL, |
517 | 0 | cstate->transition_capture); |
518 | 0 | } |
519 | 0 | } |
520 | | |
521 | | /* Update the row counter and progress of the COPY command */ |
522 | 0 | *processed += inserted; |
523 | 0 | pgstat_progress_update_param(PROGRESS_COPY_TUPLES_PROCESSED, |
524 | 0 | *processed); |
525 | 0 | } |
526 | |
|
527 | 0 | for (i = 0; i < nused; i++) |
528 | 0 | ExecClearTuple(slots[i]); |
529 | | |
530 | | /* reset relname_only */ |
531 | 0 | cstate->relname_only = false; |
532 | 0 | } |
533 | 0 | else |
534 | 0 | { |
535 | 0 | CommandId mycid = miinfo->mycid; |
536 | 0 | int ti_options = miinfo->ti_options; |
537 | 0 | bool line_buf_valid = cstate->line_buf_valid; |
538 | 0 | uint64 save_cur_lineno = cstate->cur_lineno; |
539 | 0 | MemoryContext oldcontext; |
540 | |
|
541 | 0 | Assert(buffer->bistate != NULL); |
542 | | |
543 | | /* |
544 | | * Print error context information correctly, if one of the operations |
545 | | * below fails. |
546 | | */ |
547 | 0 | cstate->line_buf_valid = false; |
548 | | |
549 | | /* |
550 | | * table_multi_insert may leak memory, so switch to short-lived memory |
551 | | * context before calling it. |
552 | | */ |
553 | 0 | oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate)); |
554 | 0 | table_multi_insert(resultRelInfo->ri_RelationDesc, |
555 | 0 | slots, |
556 | 0 | nused, |
557 | 0 | mycid, |
558 | 0 | ti_options, |
559 | 0 | buffer->bistate); |
560 | 0 | MemoryContextSwitchTo(oldcontext); |
561 | |
|
562 | 0 | for (i = 0; i < nused; i++) |
563 | 0 | { |
564 | | /* |
565 | | * If there are any indexes, update them for all the inserted |
566 | | * tuples, and run AFTER ROW INSERT triggers. |
567 | | */ |
568 | 0 | if (resultRelInfo->ri_NumIndices > 0) |
569 | 0 | { |
570 | 0 | List *recheckIndexes; |
571 | |
|
572 | 0 | cstate->cur_lineno = buffer->linenos[i]; |
573 | 0 | recheckIndexes = |
574 | 0 | ExecInsertIndexTuples(resultRelInfo, |
575 | 0 | buffer->slots[i], estate, false, |
576 | 0 | false, NULL, NIL, false); |
577 | 0 | ExecARInsertTriggers(estate, resultRelInfo, |
578 | 0 | slots[i], recheckIndexes, |
579 | 0 | cstate->transition_capture); |
580 | 0 | list_free(recheckIndexes); |
581 | 0 | } |
582 | | |
583 | | /* |
584 | | * There's no indexes, but see if we need to run AFTER ROW INSERT |
585 | | * triggers anyway. |
586 | | */ |
587 | 0 | else if (resultRelInfo->ri_TrigDesc != NULL && |
588 | 0 | (resultRelInfo->ri_TrigDesc->trig_insert_after_row || |
589 | 0 | resultRelInfo->ri_TrigDesc->trig_insert_new_table)) |
590 | 0 | { |
591 | 0 | cstate->cur_lineno = buffer->linenos[i]; |
592 | 0 | ExecARInsertTriggers(estate, resultRelInfo, |
593 | 0 | slots[i], NIL, |
594 | 0 | cstate->transition_capture); |
595 | 0 | } |
596 | |
|
597 | 0 | ExecClearTuple(slots[i]); |
598 | 0 | } |
599 | | |
600 | | /* Update the row counter and progress of the COPY command */ |
601 | 0 | *processed += nused; |
602 | 0 | pgstat_progress_update_param(PROGRESS_COPY_TUPLES_PROCESSED, |
603 | 0 | *processed); |
604 | | |
605 | | /* reset cur_lineno and line_buf_valid to what they were */ |
606 | 0 | cstate->line_buf_valid = line_buf_valid; |
607 | 0 | cstate->cur_lineno = save_cur_lineno; |
608 | 0 | } |
609 | | |
610 | | /* Mark that all slots are free */ |
611 | 0 | buffer->nused = 0; |
612 | 0 | } |
613 | | |
614 | | /* |
615 | | * Drop used slots and free member for this buffer. |
616 | | * |
617 | | * The buffer must be flushed before cleanup. |
618 | | */ |
619 | | static inline void |
620 | | CopyMultiInsertBufferCleanup(CopyMultiInsertInfo *miinfo, |
621 | | CopyMultiInsertBuffer *buffer) |
622 | 0 | { |
623 | 0 | ResultRelInfo *resultRelInfo = buffer->resultRelInfo; |
624 | 0 | int i; |
625 | | |
626 | | /* Ensure buffer was flushed */ |
627 | 0 | Assert(buffer->nused == 0); |
628 | | |
629 | | /* Remove back-link to ourself */ |
630 | 0 | resultRelInfo->ri_CopyMultiInsertBuffer = NULL; |
631 | |
|
632 | 0 | if (resultRelInfo->ri_FdwRoutine == NULL) |
633 | 0 | { |
634 | 0 | Assert(buffer->bistate != NULL); |
635 | 0 | FreeBulkInsertState(buffer->bistate); |
636 | 0 | } |
637 | 0 | else |
638 | 0 | Assert(buffer->bistate == NULL); |
639 | | |
640 | | /* Since we only create slots on demand, just drop the non-null ones. */ |
641 | 0 | for (i = 0; i < MAX_BUFFERED_TUPLES && buffer->slots[i] != NULL; i++) |
642 | 0 | ExecDropSingleTupleTableSlot(buffer->slots[i]); |
643 | |
|
644 | 0 | if (resultRelInfo->ri_FdwRoutine == NULL) |
645 | 0 | table_finish_bulk_insert(resultRelInfo->ri_RelationDesc, |
646 | 0 | miinfo->ti_options); |
647 | |
|
648 | 0 | pfree(buffer); |
649 | 0 | } |
650 | | |
651 | | /* |
652 | | * Write out all stored tuples in all buffers out to the tables. |
653 | | * |
654 | | * Once flushed we also trim the tracked buffers list down to size by removing |
655 | | * the buffers created earliest first. |
656 | | * |
657 | | * Callers should pass 'curr_rri' as the ResultRelInfo that's currently being |
658 | | * used. When cleaning up old buffers we'll never remove the one for |
659 | | * 'curr_rri'. |
660 | | */ |
661 | | static inline void |
662 | | CopyMultiInsertInfoFlush(CopyMultiInsertInfo *miinfo, ResultRelInfo *curr_rri, |
663 | | int64 *processed) |
664 | 0 | { |
665 | 0 | ListCell *lc; |
666 | |
|
667 | 0 | foreach(lc, miinfo->multiInsertBuffers) |
668 | 0 | { |
669 | 0 | CopyMultiInsertBuffer *buffer = (CopyMultiInsertBuffer *) lfirst(lc); |
670 | |
|
671 | 0 | CopyMultiInsertBufferFlush(miinfo, buffer, processed); |
672 | 0 | } |
673 | |
|
674 | 0 | miinfo->bufferedTuples = 0; |
675 | 0 | miinfo->bufferedBytes = 0; |
676 | | |
677 | | /* |
678 | | * Trim the list of tracked buffers down if it exceeds the limit. Here we |
679 | | * remove buffers starting with the ones we created first. It seems less |
680 | | * likely that these older ones will be needed than the ones that were |
681 | | * just created. |
682 | | */ |
683 | 0 | while (list_length(miinfo->multiInsertBuffers) > MAX_PARTITION_BUFFERS) |
684 | 0 | { |
685 | 0 | CopyMultiInsertBuffer *buffer; |
686 | |
|
687 | 0 | buffer = (CopyMultiInsertBuffer *) linitial(miinfo->multiInsertBuffers); |
688 | | |
689 | | /* |
690 | | * We never want to remove the buffer that's currently being used, so |
691 | | * if we happen to find that then move it to the end of the list. |
692 | | */ |
693 | 0 | if (buffer->resultRelInfo == curr_rri) |
694 | 0 | { |
695 | | /* |
696 | | * The code below would misbehave if we were trying to reduce the |
697 | | * list to less than two items. |
698 | | */ |
699 | 0 | StaticAssertDecl(MAX_PARTITION_BUFFERS >= 2, |
700 | 0 | "MAX_PARTITION_BUFFERS must be >= 2"); |
701 | |
|
702 | 0 | miinfo->multiInsertBuffers = list_delete_first(miinfo->multiInsertBuffers); |
703 | 0 | miinfo->multiInsertBuffers = lappend(miinfo->multiInsertBuffers, buffer); |
704 | 0 | buffer = (CopyMultiInsertBuffer *) linitial(miinfo->multiInsertBuffers); |
705 | 0 | } |
706 | |
|
707 | 0 | CopyMultiInsertBufferCleanup(miinfo, buffer); |
708 | 0 | miinfo->multiInsertBuffers = list_delete_first(miinfo->multiInsertBuffers); |
709 | 0 | } |
710 | 0 | } |
711 | | |
712 | | /* |
713 | | * Cleanup allocated buffers and free memory |
714 | | */ |
715 | | static inline void |
716 | | CopyMultiInsertInfoCleanup(CopyMultiInsertInfo *miinfo) |
717 | 0 | { |
718 | 0 | ListCell *lc; |
719 | |
|
720 | 0 | foreach(lc, miinfo->multiInsertBuffers) |
721 | 0 | CopyMultiInsertBufferCleanup(miinfo, lfirst(lc)); |
722 | |
|
723 | 0 | list_free(miinfo->multiInsertBuffers); |
724 | 0 | } |
725 | | |
726 | | /* |
727 | | * Get the next TupleTableSlot that the next tuple should be stored in. |
728 | | * |
729 | | * Callers must ensure that the buffer is not full. |
730 | | * |
731 | | * Note: 'miinfo' is unused but has been included for consistency with the |
732 | | * other functions in this area. |
733 | | */ |
734 | | static inline TupleTableSlot * |
735 | | CopyMultiInsertInfoNextFreeSlot(CopyMultiInsertInfo *miinfo, |
736 | | ResultRelInfo *rri) |
737 | 0 | { |
738 | 0 | CopyMultiInsertBuffer *buffer = rri->ri_CopyMultiInsertBuffer; |
739 | 0 | int nused; |
740 | |
|
741 | 0 | Assert(buffer != NULL); |
742 | 0 | Assert(buffer->nused < MAX_BUFFERED_TUPLES); |
743 | |
|
744 | 0 | nused = buffer->nused; |
745 | |
|
746 | 0 | if (buffer->slots[nused] == NULL) |
747 | 0 | buffer->slots[nused] = table_slot_create(rri->ri_RelationDesc, NULL); |
748 | 0 | return buffer->slots[nused]; |
749 | 0 | } |
750 | | |
751 | | /* |
752 | | * Record the previously reserved TupleTableSlot that was reserved by |
753 | | * CopyMultiInsertInfoNextFreeSlot as being consumed. |
754 | | */ |
755 | | static inline void |
756 | | CopyMultiInsertInfoStore(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri, |
757 | | TupleTableSlot *slot, int tuplen, uint64 lineno) |
758 | 0 | { |
759 | 0 | CopyMultiInsertBuffer *buffer = rri->ri_CopyMultiInsertBuffer; |
760 | |
|
761 | 0 | Assert(buffer != NULL); |
762 | 0 | Assert(slot == buffer->slots[buffer->nused]); |
763 | | |
764 | | /* Store the line number so we can properly report any errors later */ |
765 | 0 | buffer->linenos[buffer->nused] = lineno; |
766 | | |
767 | | /* Record this slot as being used */ |
768 | 0 | buffer->nused++; |
769 | | |
770 | | /* Update how many tuples are stored and their size */ |
771 | 0 | miinfo->bufferedTuples++; |
772 | 0 | miinfo->bufferedBytes += tuplen; |
773 | 0 | } |
774 | | |
775 | | /* |
776 | | * Copy FROM file to relation. |
777 | | */ |
778 | | uint64 |
779 | | CopyFrom(CopyFromState cstate) |
780 | 0 | { |
781 | 0 | ResultRelInfo *resultRelInfo; |
782 | 0 | ResultRelInfo *target_resultRelInfo; |
783 | 0 | ResultRelInfo *prevResultRelInfo = NULL; |
784 | 0 | EState *estate = CreateExecutorState(); /* for ExecConstraints() */ |
785 | 0 | ModifyTableState *mtstate; |
786 | 0 | ExprContext *econtext; |
787 | 0 | TupleTableSlot *singleslot = NULL; |
788 | 0 | MemoryContext oldcontext = CurrentMemoryContext; |
789 | |
|
790 | 0 | PartitionTupleRouting *proute = NULL; |
791 | 0 | ErrorContextCallback errcallback; |
792 | 0 | CommandId mycid = GetCurrentCommandId(true); |
793 | 0 | int ti_options = 0; /* start with default options for insert */ |
794 | 0 | BulkInsertState bistate = NULL; |
795 | 0 | CopyInsertMethod insertMethod; |
796 | 0 | CopyMultiInsertInfo multiInsertInfo = {0}; /* pacify compiler */ |
797 | 0 | int64 processed = 0; |
798 | 0 | int64 excluded = 0; |
799 | 0 | bool has_before_insert_row_trig; |
800 | 0 | bool has_instead_insert_row_trig; |
801 | 0 | bool leafpart_use_multi_insert = false; |
802 | |
|
803 | 0 | Assert(cstate->rel); |
804 | 0 | Assert(list_length(cstate->range_table) == 1); |
805 | |
|
806 | 0 | if (cstate->opts.on_error != COPY_ON_ERROR_STOP) |
807 | 0 | Assert(cstate->escontext); |
808 | | |
809 | | /* |
810 | | * The target must be a plain, foreign, or partitioned relation, or have |
811 | | * an INSTEAD OF INSERT row trigger. (Currently, such triggers are only |
812 | | * allowed on views, so we only hint about them in the view case.) |
813 | | */ |
814 | 0 | if (cstate->rel->rd_rel->relkind != RELKIND_RELATION && |
815 | 0 | cstate->rel->rd_rel->relkind != RELKIND_FOREIGN_TABLE && |
816 | 0 | cstate->rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE && |
817 | 0 | !(cstate->rel->trigdesc && |
818 | 0 | cstate->rel->trigdesc->trig_insert_instead_row)) |
819 | 0 | { |
820 | 0 | if (cstate->rel->rd_rel->relkind == RELKIND_VIEW) |
821 | 0 | ereport(ERROR, |
822 | 0 | (errcode(ERRCODE_WRONG_OBJECT_TYPE), |
823 | 0 | errmsg("cannot copy to view \"%s\"", |
824 | 0 | RelationGetRelationName(cstate->rel)), |
825 | 0 | errhint("To enable copying to a view, provide an INSTEAD OF INSERT trigger."))); |
826 | 0 | else if (cstate->rel->rd_rel->relkind == RELKIND_MATVIEW) |
827 | 0 | ereport(ERROR, |
828 | 0 | (errcode(ERRCODE_WRONG_OBJECT_TYPE), |
829 | 0 | errmsg("cannot copy to materialized view \"%s\"", |
830 | 0 | RelationGetRelationName(cstate->rel)))); |
831 | 0 | else if (cstate->rel->rd_rel->relkind == RELKIND_SEQUENCE) |
832 | 0 | ereport(ERROR, |
833 | 0 | (errcode(ERRCODE_WRONG_OBJECT_TYPE), |
834 | 0 | errmsg("cannot copy to sequence \"%s\"", |
835 | 0 | RelationGetRelationName(cstate->rel)))); |
836 | 0 | else |
837 | 0 | ereport(ERROR, |
838 | 0 | (errcode(ERRCODE_WRONG_OBJECT_TYPE), |
839 | 0 | errmsg("cannot copy to non-table relation \"%s\"", |
840 | 0 | RelationGetRelationName(cstate->rel)))); |
841 | 0 | } |
842 | | |
843 | | /* |
844 | | * If the target file is new-in-transaction, we assume that checking FSM |
845 | | * for free space is a waste of time. This could possibly be wrong, but |
846 | | * it's unlikely. |
847 | | */ |
848 | 0 | if (RELKIND_HAS_STORAGE(cstate->rel->rd_rel->relkind) && |
849 | 0 | (cstate->rel->rd_createSubid != InvalidSubTransactionId || |
850 | 0 | cstate->rel->rd_firstRelfilelocatorSubid != InvalidSubTransactionId)) |
851 | 0 | ti_options |= TABLE_INSERT_SKIP_FSM; |
852 | | |
853 | | /* |
854 | | * Optimize if new relation storage was created in this subxact or one of |
855 | | * its committed children and we won't see those rows later as part of an |
856 | | * earlier scan or command. The subxact test ensures that if this subxact |
857 | | * aborts then the frozen rows won't be visible after xact cleanup. Note |
858 | | * that the stronger test of exactly which subtransaction created it is |
859 | | * crucial for correctness of this optimization. The test for an earlier |
860 | | * scan or command tolerates false negatives. FREEZE causes other sessions |
861 | | * to see rows they would not see under MVCC, and a false negative merely |
862 | | * spreads that anomaly to the current session. |
863 | | */ |
864 | 0 | if (cstate->opts.freeze) |
865 | 0 | { |
866 | | /* |
867 | | * We currently disallow COPY FREEZE on partitioned tables. The |
868 | | * reason for this is that we've simply not yet opened the partitions |
869 | | * to determine if the optimization can be applied to them. We could |
870 | | * go and open them all here, but doing so may be quite a costly |
871 | | * overhead for small copies. In any case, we may just end up routing |
872 | | * tuples to a small number of partitions. It seems better just to |
873 | | * raise an ERROR for partitioned tables. |
874 | | */ |
875 | 0 | if (cstate->rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE) |
876 | 0 | { |
877 | 0 | ereport(ERROR, |
878 | 0 | (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), |
879 | 0 | errmsg("cannot perform COPY FREEZE on a partitioned table"))); |
880 | 0 | } |
881 | | |
882 | | /* There's currently no support for COPY FREEZE on foreign tables. */ |
883 | 0 | if (cstate->rel->rd_rel->relkind == RELKIND_FOREIGN_TABLE) |
884 | 0 | ereport(ERROR, |
885 | 0 | (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), |
886 | 0 | errmsg("cannot perform COPY FREEZE on a foreign table"))); |
887 | | |
888 | | /* |
889 | | * Tolerate one registration for the benefit of FirstXactSnapshot. |
890 | | * Scan-bearing queries generally create at least two registrations, |
891 | | * though relying on that is fragile, as is ignoring ActiveSnapshot. |
892 | | * Clear CatalogSnapshot to avoid counting its registration. We'll |
893 | | * still detect ongoing catalog scans, each of which separately |
894 | | * registers the snapshot it uses. |
895 | | */ |
896 | 0 | InvalidateCatalogSnapshot(); |
897 | 0 | if (!ThereAreNoPriorRegisteredSnapshots() || !ThereAreNoReadyPortals()) |
898 | 0 | ereport(ERROR, |
899 | 0 | (errcode(ERRCODE_INVALID_TRANSACTION_STATE), |
900 | 0 | errmsg("cannot perform COPY FREEZE because of prior transaction activity"))); |
901 | | |
902 | 0 | if (cstate->rel->rd_createSubid != GetCurrentSubTransactionId() && |
903 | 0 | cstate->rel->rd_newRelfilelocatorSubid != GetCurrentSubTransactionId()) |
904 | 0 | ereport(ERROR, |
905 | 0 | (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), |
906 | 0 | errmsg("cannot perform COPY FREEZE because the table was not created or truncated in the current subtransaction"))); |
907 | | |
908 | 0 | ti_options |= TABLE_INSERT_FROZEN; |
909 | 0 | } |
910 | | |
911 | | /* |
912 | | * We need a ResultRelInfo so we can use the regular executor's |
913 | | * index-entry-making machinery. (There used to be a huge amount of code |
914 | | * here that basically duplicated execUtils.c ...) |
915 | | */ |
916 | 0 | ExecInitRangeTable(estate, cstate->range_table, cstate->rteperminfos, |
917 | 0 | bms_make_singleton(1)); |
918 | 0 | resultRelInfo = target_resultRelInfo = makeNode(ResultRelInfo); |
919 | 0 | ExecInitResultRelation(estate, resultRelInfo, 1); |
920 | | |
921 | | /* Verify the named relation is a valid target for INSERT */ |
922 | 0 | CheckValidResultRel(resultRelInfo, CMD_INSERT, ONCONFLICT_NONE, NIL); |
923 | |
|
924 | 0 | ExecOpenIndices(resultRelInfo, false); |
925 | | |
926 | | /* |
927 | | * Set up a ModifyTableState so we can let FDW(s) init themselves for |
928 | | * foreign-table result relation(s). |
929 | | */ |
930 | 0 | mtstate = makeNode(ModifyTableState); |
931 | 0 | mtstate->ps.plan = NULL; |
932 | 0 | mtstate->ps.state = estate; |
933 | 0 | mtstate->operation = CMD_INSERT; |
934 | 0 | mtstate->mt_nrels = 1; |
935 | 0 | mtstate->resultRelInfo = resultRelInfo; |
936 | 0 | mtstate->rootResultRelInfo = resultRelInfo; |
937 | |
|
938 | 0 | if (resultRelInfo->ri_FdwRoutine != NULL && |
939 | 0 | resultRelInfo->ri_FdwRoutine->BeginForeignInsert != NULL) |
940 | 0 | resultRelInfo->ri_FdwRoutine->BeginForeignInsert(mtstate, |
941 | 0 | resultRelInfo); |
942 | | |
943 | | /* |
944 | | * Also, if the named relation is a foreign table, determine if the FDW |
945 | | * supports batch insert and determine the batch size (a FDW may support |
946 | | * batching, but it may be disabled for the server/table). |
947 | | * |
948 | | * If the FDW does not support batching, we set the batch size to 1. |
949 | | */ |
950 | 0 | if (resultRelInfo->ri_FdwRoutine != NULL && |
951 | 0 | resultRelInfo->ri_FdwRoutine->GetForeignModifyBatchSize && |
952 | 0 | resultRelInfo->ri_FdwRoutine->ExecForeignBatchInsert) |
953 | 0 | resultRelInfo->ri_BatchSize = |
954 | 0 | resultRelInfo->ri_FdwRoutine->GetForeignModifyBatchSize(resultRelInfo); |
955 | 0 | else |
956 | 0 | resultRelInfo->ri_BatchSize = 1; |
957 | |
|
958 | 0 | Assert(resultRelInfo->ri_BatchSize >= 1); |
959 | | |
960 | | /* Prepare to catch AFTER triggers. */ |
961 | 0 | AfterTriggerBeginQuery(); |
962 | | |
963 | | /* |
964 | | * If there are any triggers with transition tables on the named relation, |
965 | | * we need to be prepared to capture transition tuples. |
966 | | * |
967 | | * Because partition tuple routing would like to know about whether |
968 | | * transition capture is active, we also set it in mtstate, which is |
969 | | * passed to ExecFindPartition() below. |
970 | | */ |
971 | 0 | cstate->transition_capture = mtstate->mt_transition_capture = |
972 | 0 | MakeTransitionCaptureState(cstate->rel->trigdesc, |
973 | 0 | RelationGetRelid(cstate->rel), |
974 | 0 | CMD_INSERT); |
975 | | |
976 | | /* |
977 | | * If the named relation is a partitioned table, initialize state for |
978 | | * CopyFrom tuple routing. |
979 | | */ |
980 | 0 | if (cstate->rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE) |
981 | 0 | proute = ExecSetupPartitionTupleRouting(estate, cstate->rel); |
982 | |
|
983 | 0 | if (cstate->whereClause) |
984 | 0 | cstate->qualexpr = ExecInitQual(castNode(List, cstate->whereClause), |
985 | 0 | &mtstate->ps); |
986 | | |
987 | | /* |
988 | | * It's generally more efficient to prepare a bunch of tuples for |
989 | | * insertion, and insert them in one |
990 | | * table_multi_insert()/ExecForeignBatchInsert() call, than call |
991 | | * table_tuple_insert()/ExecForeignInsert() separately for every tuple. |
992 | | * However, there are a number of reasons why we might not be able to do |
993 | | * this. These are explained below. |
994 | | */ |
995 | 0 | if (resultRelInfo->ri_TrigDesc != NULL && |
996 | 0 | (resultRelInfo->ri_TrigDesc->trig_insert_before_row || |
997 | 0 | resultRelInfo->ri_TrigDesc->trig_insert_instead_row)) |
998 | 0 | { |
999 | | /* |
1000 | | * Can't support multi-inserts when there are any BEFORE/INSTEAD OF |
1001 | | * triggers on the table. Such triggers might query the table we're |
1002 | | * inserting into and act differently if the tuples that have already |
1003 | | * been processed and prepared for insertion are not there. |
1004 | | */ |
1005 | 0 | insertMethod = CIM_SINGLE; |
1006 | 0 | } |
1007 | 0 | else if (resultRelInfo->ri_FdwRoutine != NULL && |
1008 | 0 | resultRelInfo->ri_BatchSize == 1) |
1009 | 0 | { |
1010 | | /* |
1011 | | * Can't support multi-inserts to a foreign table if the FDW does not |
1012 | | * support batching, or it's disabled for the server or foreign table. |
1013 | | */ |
1014 | 0 | insertMethod = CIM_SINGLE; |
1015 | 0 | } |
1016 | 0 | else if (proute != NULL && resultRelInfo->ri_TrigDesc != NULL && |
1017 | 0 | resultRelInfo->ri_TrigDesc->trig_insert_new_table) |
1018 | 0 | { |
1019 | | /* |
1020 | | * For partitioned tables we can't support multi-inserts when there |
1021 | | * are any statement level insert triggers. It might be possible to |
1022 | | * allow partitioned tables with such triggers in the future, but for |
1023 | | * now, CopyMultiInsertInfoFlush expects that any after row insert and |
1024 | | * statement level insert triggers are on the same relation. |
1025 | | */ |
1026 | 0 | insertMethod = CIM_SINGLE; |
1027 | 0 | } |
1028 | 0 | else if (cstate->volatile_defexprs) |
1029 | 0 | { |
1030 | | /* |
1031 | | * Can't support multi-inserts if there are any volatile default |
1032 | | * expressions in the table. Similarly to the trigger case above, |
1033 | | * such expressions may query the table we're inserting into. |
1034 | | * |
1035 | | * Note: It does not matter if any partitions have any volatile |
1036 | | * default expressions as we use the defaults from the target of the |
1037 | | * COPY command. |
1038 | | */ |
1039 | 0 | insertMethod = CIM_SINGLE; |
1040 | 0 | } |
1041 | 0 | else if (contain_volatile_functions(cstate->whereClause)) |
1042 | 0 | { |
1043 | | /* |
1044 | | * Can't support multi-inserts if there are any volatile function |
1045 | | * expressions in WHERE clause. Similarly to the trigger case above, |
1046 | | * such expressions may query the table we're inserting into. |
1047 | | * |
1048 | | * Note: the whereClause was already preprocessed in DoCopy(), so it's |
1049 | | * okay to use contain_volatile_functions() directly. |
1050 | | */ |
1051 | 0 | insertMethod = CIM_SINGLE; |
1052 | 0 | } |
1053 | 0 | else |
1054 | 0 | { |
1055 | | /* |
1056 | | * For partitioned tables, we may still be able to perform bulk |
1057 | | * inserts. However, the possibility of this depends on which types |
1058 | | * of triggers exist on the partition. We must disable bulk inserts |
1059 | | * if the partition is a foreign table that can't use batching or it |
1060 | | * has any before row insert or insert instead triggers (same as we |
1061 | | * checked above for the parent table). Since the partition's |
1062 | | * resultRelInfos are initialized only when we actually need to insert |
1063 | | * the first tuple into them, we must have the intermediate insert |
1064 | | * method of CIM_MULTI_CONDITIONAL to flag that we must later |
1065 | | * determine if we can use bulk-inserts for the partition being |
1066 | | * inserted into. |
1067 | | */ |
1068 | 0 | if (proute) |
1069 | 0 | insertMethod = CIM_MULTI_CONDITIONAL; |
1070 | 0 | else |
1071 | 0 | insertMethod = CIM_MULTI; |
1072 | |
|
1073 | 0 | CopyMultiInsertInfoInit(&multiInsertInfo, resultRelInfo, cstate, |
1074 | 0 | estate, mycid, ti_options); |
1075 | 0 | } |
1076 | | |
1077 | | /* |
1078 | | * If not using batch mode (which allocates slots as needed) set up a |
1079 | | * tuple slot too. When inserting into a partitioned table, we also need |
1080 | | * one, even if we might batch insert, to read the tuple in the root |
1081 | | * partition's form. |
1082 | | */ |
1083 | 0 | if (insertMethod == CIM_SINGLE || insertMethod == CIM_MULTI_CONDITIONAL) |
1084 | 0 | { |
1085 | 0 | singleslot = table_slot_create(resultRelInfo->ri_RelationDesc, |
1086 | 0 | &estate->es_tupleTable); |
1087 | 0 | bistate = GetBulkInsertState(); |
1088 | 0 | } |
1089 | |
|
1090 | 0 | has_before_insert_row_trig = (resultRelInfo->ri_TrigDesc && |
1091 | 0 | resultRelInfo->ri_TrigDesc->trig_insert_before_row); |
1092 | |
|
1093 | 0 | has_instead_insert_row_trig = (resultRelInfo->ri_TrigDesc && |
1094 | 0 | resultRelInfo->ri_TrigDesc->trig_insert_instead_row); |
1095 | | |
1096 | | /* |
1097 | | * Check BEFORE STATEMENT insertion triggers. It's debatable whether we |
1098 | | * should do this for COPY, since it's not really an "INSERT" statement as |
1099 | | * such. However, executing these triggers maintains consistency with the |
1100 | | * EACH ROW triggers that we already fire on COPY. |
1101 | | */ |
1102 | 0 | ExecBSInsertTriggers(estate, resultRelInfo); |
1103 | |
|
1104 | 0 | econtext = GetPerTupleExprContext(estate); |
1105 | | |
1106 | | /* Set up callback to identify error line number */ |
1107 | 0 | errcallback.callback = CopyFromErrorCallback; |
1108 | 0 | errcallback.arg = cstate; |
1109 | 0 | errcallback.previous = error_context_stack; |
1110 | 0 | error_context_stack = &errcallback; |
1111 | |
|
1112 | 0 | for (;;) |
1113 | 0 | { |
1114 | 0 | TupleTableSlot *myslot; |
1115 | 0 | bool skip_tuple; |
1116 | |
|
1117 | 0 | CHECK_FOR_INTERRUPTS(); |
1118 | | |
1119 | | /* |
1120 | | * Reset the per-tuple exprcontext. We do this after every tuple, to |
1121 | | * clean-up after expression evaluations etc. |
1122 | | */ |
1123 | 0 | ResetPerTupleExprContext(estate); |
1124 | | |
1125 | | /* select slot to (initially) load row into */ |
1126 | 0 | if (insertMethod == CIM_SINGLE || proute) |
1127 | 0 | { |
1128 | 0 | myslot = singleslot; |
1129 | 0 | Assert(myslot != NULL); |
1130 | 0 | } |
1131 | 0 | else |
1132 | 0 | { |
1133 | 0 | Assert(resultRelInfo == target_resultRelInfo); |
1134 | 0 | Assert(insertMethod == CIM_MULTI); |
1135 | |
|
1136 | 0 | myslot = CopyMultiInsertInfoNextFreeSlot(&multiInsertInfo, |
1137 | 0 | resultRelInfo); |
1138 | 0 | } |
1139 | | |
1140 | | /* |
1141 | | * Switch to per-tuple context before calling NextCopyFrom, which does |
1142 | | * evaluate default expressions etc. and requires per-tuple context. |
1143 | | */ |
1144 | 0 | MemoryContextSwitchTo(GetPerTupleMemoryContext(estate)); |
1145 | |
|
1146 | 0 | ExecClearTuple(myslot); |
1147 | | |
1148 | | /* Directly store the values/nulls array in the slot */ |
1149 | 0 | if (!NextCopyFrom(cstate, econtext, myslot->tts_values, myslot->tts_isnull)) |
1150 | 0 | break; |
1151 | | |
1152 | 0 | if (cstate->opts.on_error == COPY_ON_ERROR_IGNORE && |
1153 | 0 | cstate->escontext->error_occurred) |
1154 | 0 | { |
1155 | | /* |
1156 | | * Soft error occurred, skip this tuple and just make |
1157 | | * ErrorSaveContext ready for the next NextCopyFrom. Since we |
1158 | | * don't set details_wanted and error_data is not to be filled, |
1159 | | * just resetting error_occurred is enough. |
1160 | | */ |
1161 | 0 | cstate->escontext->error_occurred = false; |
1162 | | |
1163 | | /* Report that this tuple was skipped by the ON_ERROR clause */ |
1164 | 0 | pgstat_progress_update_param(PROGRESS_COPY_TUPLES_SKIPPED, |
1165 | 0 | cstate->num_errors); |
1166 | |
|
1167 | 0 | if (cstate->opts.reject_limit > 0 && |
1168 | 0 | cstate->num_errors > cstate->opts.reject_limit) |
1169 | 0 | ereport(ERROR, |
1170 | 0 | (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION), |
1171 | 0 | errmsg("skipped more than REJECT_LIMIT (%" PRId64 ") rows due to data type incompatibility", |
1172 | 0 | cstate->opts.reject_limit))); |
1173 | | |
1174 | | /* Repeat NextCopyFrom() until no soft error occurs */ |
1175 | 0 | continue; |
1176 | 0 | } |
1177 | | |
1178 | 0 | ExecStoreVirtualTuple(myslot); |
1179 | | |
1180 | | /* |
1181 | | * Constraints and where clause might reference the tableoid column, |
1182 | | * so (re-)initialize tts_tableOid before evaluating them. |
1183 | | */ |
1184 | 0 | myslot->tts_tableOid = RelationGetRelid(target_resultRelInfo->ri_RelationDesc); |
1185 | | |
1186 | | /* Triggers and stuff need to be invoked in query context. */ |
1187 | 0 | MemoryContextSwitchTo(oldcontext); |
1188 | |
|
1189 | 0 | if (cstate->whereClause) |
1190 | 0 | { |
1191 | 0 | econtext->ecxt_scantuple = myslot; |
1192 | | /* Skip items that don't match COPY's WHERE clause */ |
1193 | 0 | if (!ExecQual(cstate->qualexpr, econtext)) |
1194 | 0 | { |
1195 | | /* |
1196 | | * Report that this tuple was filtered out by the WHERE |
1197 | | * clause. |
1198 | | */ |
1199 | 0 | pgstat_progress_update_param(PROGRESS_COPY_TUPLES_EXCLUDED, |
1200 | 0 | ++excluded); |
1201 | 0 | continue; |
1202 | 0 | } |
1203 | 0 | } |
1204 | | |
1205 | | /* Determine the partition to insert the tuple into */ |
1206 | 0 | if (proute) |
1207 | 0 | { |
1208 | 0 | TupleConversionMap *map; |
1209 | | |
1210 | | /* |
1211 | | * Attempt to find a partition suitable for this tuple. |
1212 | | * ExecFindPartition() will raise an error if none can be found or |
1213 | | * if the found partition is not suitable for INSERTs. |
1214 | | */ |
1215 | 0 | resultRelInfo = ExecFindPartition(mtstate, target_resultRelInfo, |
1216 | 0 | proute, myslot, estate); |
1217 | |
|
1218 | 0 | if (prevResultRelInfo != resultRelInfo) |
1219 | 0 | { |
1220 | | /* Determine which triggers exist on this partition */ |
1221 | 0 | has_before_insert_row_trig = (resultRelInfo->ri_TrigDesc && |
1222 | 0 | resultRelInfo->ri_TrigDesc->trig_insert_before_row); |
1223 | |
|
1224 | 0 | has_instead_insert_row_trig = (resultRelInfo->ri_TrigDesc && |
1225 | 0 | resultRelInfo->ri_TrigDesc->trig_insert_instead_row); |
1226 | | |
1227 | | /* |
1228 | | * Disable multi-inserts when the partition has BEFORE/INSTEAD |
1229 | | * OF triggers, or if the partition is a foreign table that |
1230 | | * can't use batching. |
1231 | | */ |
1232 | 0 | leafpart_use_multi_insert = insertMethod == CIM_MULTI_CONDITIONAL && |
1233 | 0 | !has_before_insert_row_trig && |
1234 | 0 | !has_instead_insert_row_trig && |
1235 | 0 | (resultRelInfo->ri_FdwRoutine == NULL || |
1236 | 0 | resultRelInfo->ri_BatchSize > 1); |
1237 | | |
1238 | | /* Set the multi-insert buffer to use for this partition. */ |
1239 | 0 | if (leafpart_use_multi_insert) |
1240 | 0 | { |
1241 | 0 | if (resultRelInfo->ri_CopyMultiInsertBuffer == NULL) |
1242 | 0 | CopyMultiInsertInfoSetupBuffer(&multiInsertInfo, |
1243 | 0 | resultRelInfo); |
1244 | 0 | } |
1245 | 0 | else if (insertMethod == CIM_MULTI_CONDITIONAL && |
1246 | 0 | !CopyMultiInsertInfoIsEmpty(&multiInsertInfo)) |
1247 | 0 | { |
1248 | | /* |
1249 | | * Flush pending inserts if this partition can't use |
1250 | | * batching, so rows are visible to triggers etc. |
1251 | | */ |
1252 | 0 | CopyMultiInsertInfoFlush(&multiInsertInfo, |
1253 | 0 | resultRelInfo, |
1254 | 0 | &processed); |
1255 | 0 | } |
1256 | |
|
1257 | 0 | if (bistate != NULL) |
1258 | 0 | ReleaseBulkInsertStatePin(bistate); |
1259 | 0 | prevResultRelInfo = resultRelInfo; |
1260 | 0 | } |
1261 | | |
1262 | | /* |
1263 | | * If we're capturing transition tuples, we might need to convert |
1264 | | * from the partition rowtype to root rowtype. But if there are no |
1265 | | * BEFORE triggers on the partition that could change the tuple, |
1266 | | * we can just remember the original unconverted tuple to avoid a |
1267 | | * needless round trip conversion. |
1268 | | */ |
1269 | 0 | if (cstate->transition_capture != NULL) |
1270 | 0 | cstate->transition_capture->tcs_original_insert_tuple = |
1271 | 0 | !has_before_insert_row_trig ? myslot : NULL; |
1272 | | |
1273 | | /* |
1274 | | * We might need to convert from the root rowtype to the partition |
1275 | | * rowtype. |
1276 | | */ |
1277 | 0 | map = ExecGetRootToChildMap(resultRelInfo, estate); |
1278 | 0 | if (insertMethod == CIM_SINGLE || !leafpart_use_multi_insert) |
1279 | 0 | { |
1280 | | /* non batch insert */ |
1281 | 0 | if (map != NULL) |
1282 | 0 | { |
1283 | 0 | TupleTableSlot *new_slot; |
1284 | |
|
1285 | 0 | new_slot = resultRelInfo->ri_PartitionTupleSlot; |
1286 | 0 | myslot = execute_attr_map_slot(map->attrMap, myslot, new_slot); |
1287 | 0 | } |
1288 | 0 | } |
1289 | 0 | else |
1290 | 0 | { |
1291 | | /* |
1292 | | * Prepare to queue up tuple for later batch insert into |
1293 | | * current partition. |
1294 | | */ |
1295 | 0 | TupleTableSlot *batchslot; |
1296 | | |
1297 | | /* no other path available for partitioned table */ |
1298 | 0 | Assert(insertMethod == CIM_MULTI_CONDITIONAL); |
1299 | |
|
1300 | 0 | batchslot = CopyMultiInsertInfoNextFreeSlot(&multiInsertInfo, |
1301 | 0 | resultRelInfo); |
1302 | |
|
1303 | 0 | if (map != NULL) |
1304 | 0 | myslot = execute_attr_map_slot(map->attrMap, myslot, |
1305 | 0 | batchslot); |
1306 | 0 | else |
1307 | 0 | { |
1308 | | /* |
1309 | | * This looks more expensive than it is (Believe me, I |
1310 | | * optimized it away. Twice.). The input is in virtual |
1311 | | * form, and we'll materialize the slot below - for most |
1312 | | * slot types the copy performs the work materialization |
1313 | | * would later require anyway. |
1314 | | */ |
1315 | 0 | ExecCopySlot(batchslot, myslot); |
1316 | 0 | myslot = batchslot; |
1317 | 0 | } |
1318 | 0 | } |
1319 | | |
1320 | | /* ensure that triggers etc see the right relation */ |
1321 | 0 | myslot->tts_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc); |
1322 | 0 | } |
1323 | |
|
1324 | 0 | skip_tuple = false; |
1325 | | |
1326 | | /* BEFORE ROW INSERT Triggers */ |
1327 | 0 | if (has_before_insert_row_trig) |
1328 | 0 | { |
1329 | 0 | if (!ExecBRInsertTriggers(estate, resultRelInfo, myslot)) |
1330 | 0 | skip_tuple = true; /* "do nothing" */ |
1331 | 0 | } |
1332 | |
|
1333 | 0 | if (!skip_tuple) |
1334 | 0 | { |
1335 | | /* |
1336 | | * If there is an INSTEAD OF INSERT ROW trigger, let it handle the |
1337 | | * tuple. Otherwise, proceed with inserting the tuple into the |
1338 | | * table or foreign table. |
1339 | | */ |
1340 | 0 | if (has_instead_insert_row_trig) |
1341 | 0 | { |
1342 | 0 | ExecIRInsertTriggers(estate, resultRelInfo, myslot); |
1343 | 0 | } |
1344 | 0 | else |
1345 | 0 | { |
1346 | | /* Compute stored generated columns */ |
1347 | 0 | if (resultRelInfo->ri_RelationDesc->rd_att->constr && |
1348 | 0 | resultRelInfo->ri_RelationDesc->rd_att->constr->has_generated_stored) |
1349 | 0 | ExecComputeStoredGenerated(resultRelInfo, estate, myslot, |
1350 | 0 | CMD_INSERT); |
1351 | | |
1352 | | /* |
1353 | | * If the target is a plain table, check the constraints of |
1354 | | * the tuple. |
1355 | | */ |
1356 | 0 | if (resultRelInfo->ri_FdwRoutine == NULL && |
1357 | 0 | resultRelInfo->ri_RelationDesc->rd_att->constr) |
1358 | 0 | ExecConstraints(resultRelInfo, myslot, estate); |
1359 | | |
1360 | | /* |
1361 | | * Also check the tuple against the partition constraint, if |
1362 | | * there is one; except that if we got here via tuple-routing, |
1363 | | * we don't need to if there's no BR trigger defined on the |
1364 | | * partition. |
1365 | | */ |
1366 | 0 | if (resultRelInfo->ri_RelationDesc->rd_rel->relispartition && |
1367 | 0 | (proute == NULL || has_before_insert_row_trig)) |
1368 | 0 | ExecPartitionCheck(resultRelInfo, myslot, estate, true); |
1369 | | |
1370 | | /* Store the slot in the multi-insert buffer, when enabled. */ |
1371 | 0 | if (insertMethod == CIM_MULTI || leafpart_use_multi_insert) |
1372 | 0 | { |
1373 | | /* |
1374 | | * The slot previously might point into the per-tuple |
1375 | | * context. For batching it needs to be longer lived. |
1376 | | */ |
1377 | 0 | ExecMaterializeSlot(myslot); |
1378 | | |
1379 | | /* Add this tuple to the tuple buffer */ |
1380 | 0 | CopyMultiInsertInfoStore(&multiInsertInfo, |
1381 | 0 | resultRelInfo, myslot, |
1382 | 0 | cstate->line_buf.len, |
1383 | 0 | cstate->cur_lineno); |
1384 | | |
1385 | | /* |
1386 | | * If enough inserts have queued up, then flush all |
1387 | | * buffers out to their tables. |
1388 | | */ |
1389 | 0 | if (CopyMultiInsertInfoIsFull(&multiInsertInfo)) |
1390 | 0 | CopyMultiInsertInfoFlush(&multiInsertInfo, |
1391 | 0 | resultRelInfo, |
1392 | 0 | &processed); |
1393 | | |
1394 | | /* |
1395 | | * We delay updating the row counter and progress of the |
1396 | | * COPY command until after writing the tuples stored in |
1397 | | * the buffer out to the table, as in single insert mode. |
1398 | | * See CopyMultiInsertBufferFlush(). |
1399 | | */ |
1400 | 0 | continue; /* next tuple please */ |
1401 | 0 | } |
1402 | 0 | else |
1403 | 0 | { |
1404 | 0 | List *recheckIndexes = NIL; |
1405 | | |
1406 | | /* OK, store the tuple */ |
1407 | 0 | if (resultRelInfo->ri_FdwRoutine != NULL) |
1408 | 0 | { |
1409 | 0 | myslot = resultRelInfo->ri_FdwRoutine->ExecForeignInsert(estate, |
1410 | 0 | resultRelInfo, |
1411 | 0 | myslot, |
1412 | 0 | NULL); |
1413 | |
|
1414 | 0 | if (myslot == NULL) /* "do nothing" */ |
1415 | 0 | continue; /* next tuple please */ |
1416 | | |
1417 | | /* |
1418 | | * AFTER ROW Triggers might reference the tableoid |
1419 | | * column, so (re-)initialize tts_tableOid before |
1420 | | * evaluating them. |
1421 | | */ |
1422 | 0 | myslot->tts_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc); |
1423 | 0 | } |
1424 | 0 | else |
1425 | 0 | { |
1426 | | /* OK, store the tuple and create index entries for it */ |
1427 | 0 | table_tuple_insert(resultRelInfo->ri_RelationDesc, |
1428 | 0 | myslot, mycid, ti_options, bistate); |
1429 | |
|
1430 | 0 | if (resultRelInfo->ri_NumIndices > 0) |
1431 | 0 | recheckIndexes = ExecInsertIndexTuples(resultRelInfo, |
1432 | 0 | myslot, |
1433 | 0 | estate, |
1434 | 0 | false, |
1435 | 0 | false, |
1436 | 0 | NULL, |
1437 | 0 | NIL, |
1438 | 0 | false); |
1439 | 0 | } |
1440 | | |
1441 | | /* AFTER ROW INSERT Triggers */ |
1442 | 0 | ExecARInsertTriggers(estate, resultRelInfo, myslot, |
1443 | 0 | recheckIndexes, cstate->transition_capture); |
1444 | |
|
1445 | 0 | list_free(recheckIndexes); |
1446 | 0 | } |
1447 | 0 | } |
1448 | | |
1449 | | /* |
1450 | | * We count only tuples not suppressed by a BEFORE INSERT trigger |
1451 | | * or FDW; this is the same definition used by nodeModifyTable.c |
1452 | | * for counting tuples inserted by an INSERT command. Update |
1453 | | * progress of the COPY command as well. |
1454 | | */ |
1455 | 0 | pgstat_progress_update_param(PROGRESS_COPY_TUPLES_PROCESSED, |
1456 | 0 | ++processed); |
1457 | 0 | } |
1458 | 0 | } |
1459 | | |
1460 | | /* Flush any remaining buffered tuples */ |
1461 | 0 | if (insertMethod != CIM_SINGLE) |
1462 | 0 | { |
1463 | 0 | if (!CopyMultiInsertInfoIsEmpty(&multiInsertInfo)) |
1464 | 0 | CopyMultiInsertInfoFlush(&multiInsertInfo, NULL, &processed); |
1465 | 0 | } |
1466 | | |
1467 | | /* Done, clean up */ |
1468 | 0 | error_context_stack = errcallback.previous; |
1469 | |
|
1470 | 0 | if (cstate->opts.on_error != COPY_ON_ERROR_STOP && |
1471 | 0 | cstate->num_errors > 0 && |
1472 | 0 | cstate->opts.log_verbosity >= COPY_LOG_VERBOSITY_DEFAULT) |
1473 | 0 | ereport(NOTICE, |
1474 | 0 | errmsg_plural("%" PRIu64 " row was skipped due to data type incompatibility", |
1475 | 0 | "%" PRIu64 " rows were skipped due to data type incompatibility", |
1476 | 0 | cstate->num_errors, |
1477 | 0 | cstate->num_errors)); |
1478 | | |
1479 | 0 | if (bistate != NULL) |
1480 | 0 | FreeBulkInsertState(bistate); |
1481 | |
|
1482 | 0 | MemoryContextSwitchTo(oldcontext); |
1483 | | |
1484 | | /* Execute AFTER STATEMENT insertion triggers */ |
1485 | 0 | ExecASInsertTriggers(estate, target_resultRelInfo, cstate->transition_capture); |
1486 | | |
1487 | | /* Handle queued AFTER triggers */ |
1488 | 0 | AfterTriggerEndQuery(estate); |
1489 | |
|
1490 | 0 | ExecResetTupleTable(estate->es_tupleTable, false); |
1491 | | |
1492 | | /* Allow the FDW to shut down */ |
1493 | 0 | if (target_resultRelInfo->ri_FdwRoutine != NULL && |
1494 | 0 | target_resultRelInfo->ri_FdwRoutine->EndForeignInsert != NULL) |
1495 | 0 | target_resultRelInfo->ri_FdwRoutine->EndForeignInsert(estate, |
1496 | 0 | target_resultRelInfo); |
1497 | | |
1498 | | /* Tear down the multi-insert buffer data */ |
1499 | 0 | if (insertMethod != CIM_SINGLE) |
1500 | 0 | CopyMultiInsertInfoCleanup(&multiInsertInfo); |
1501 | | |
1502 | | /* Close all the partitioned tables, leaf partitions, and their indices */ |
1503 | 0 | if (proute) |
1504 | 0 | ExecCleanupTupleRouting(mtstate, proute); |
1505 | | |
1506 | | /* Close the result relations, including any trigger target relations */ |
1507 | 0 | ExecCloseResultRelations(estate); |
1508 | 0 | ExecCloseRangeTableRelations(estate); |
1509 | |
|
1510 | 0 | FreeExecutorState(estate); |
1511 | |
|
1512 | 0 | return processed; |
1513 | 0 | } |
1514 | | |
1515 | | /* |
1516 | | * Set up to read tuples from a file for COPY FROM. |
1517 | | * |
1518 | | * 'rel': Used as a template for the tuples |
1519 | | * 'whereClause': WHERE clause from the COPY FROM command |
1520 | | * 'filename': Name of server-local file to read, NULL for STDIN |
1521 | | * 'is_program': true if 'filename' is program to execute |
1522 | | * 'data_source_cb': callback that provides the input data |
1523 | | * 'attnamelist': List of char *, columns to include. NIL selects all cols. |
1524 | | * 'options': List of DefElem. See copy_opt_item in gram.y for selections. |
1525 | | * |
1526 | | * Returns a CopyFromState, to be passed to NextCopyFrom and related functions. |
1527 | | */ |
1528 | | CopyFromState |
1529 | | BeginCopyFrom(ParseState *pstate, |
1530 | | Relation rel, |
1531 | | Node *whereClause, |
1532 | | const char *filename, |
1533 | | bool is_program, |
1534 | | copy_data_source_cb data_source_cb, |
1535 | | List *attnamelist, |
1536 | | List *options) |
1537 | 0 | { |
1538 | 0 | CopyFromState cstate; |
1539 | 0 | bool pipe = (filename == NULL); |
1540 | 0 | TupleDesc tupDesc; |
1541 | 0 | AttrNumber num_phys_attrs, |
1542 | 0 | num_defaults; |
1543 | 0 | FmgrInfo *in_functions; |
1544 | 0 | Oid *typioparams; |
1545 | 0 | int *defmap; |
1546 | 0 | ExprState **defexprs; |
1547 | 0 | MemoryContext oldcontext; |
1548 | 0 | bool volatile_defexprs; |
1549 | 0 | const int progress_cols[] = { |
1550 | 0 | PROGRESS_COPY_COMMAND, |
1551 | 0 | PROGRESS_COPY_TYPE, |
1552 | 0 | PROGRESS_COPY_BYTES_TOTAL |
1553 | 0 | }; |
1554 | 0 | int64 progress_vals[] = { |
1555 | 0 | PROGRESS_COPY_COMMAND_FROM, |
1556 | 0 | 0, |
1557 | 0 | 0 |
1558 | 0 | }; |
1559 | | |
1560 | | /* Allocate workspace and zero all fields */ |
1561 | 0 | cstate = (CopyFromStateData *) palloc0(sizeof(CopyFromStateData)); |
1562 | | |
1563 | | /* |
1564 | | * We allocate everything used by a cstate in a new memory context. This |
1565 | | * avoids memory leaks during repeated use of COPY in a query. |
1566 | | */ |
1567 | 0 | cstate->copycontext = AllocSetContextCreate(CurrentMemoryContext, |
1568 | 0 | "COPY", |
1569 | 0 | ALLOCSET_DEFAULT_SIZES); |
1570 | |
|
1571 | 0 | oldcontext = MemoryContextSwitchTo(cstate->copycontext); |
1572 | | |
1573 | | /* Extract options from the statement node tree */ |
1574 | 0 | ProcessCopyOptions(pstate, &cstate->opts, true /* is_from */ , options); |
1575 | | |
1576 | | /* Set the format routine */ |
1577 | 0 | cstate->routine = CopyFromGetRoutine(&cstate->opts); |
1578 | | |
1579 | | /* Process the target relation */ |
1580 | 0 | cstate->rel = rel; |
1581 | |
|
1582 | 0 | tupDesc = RelationGetDescr(cstate->rel); |
1583 | | |
1584 | | /* process common options or initialization */ |
1585 | | |
1586 | | /* Generate or convert list of attributes to process */ |
1587 | 0 | cstate->attnumlist = CopyGetAttnums(tupDesc, cstate->rel, attnamelist); |
1588 | |
|
1589 | 0 | num_phys_attrs = tupDesc->natts; |
1590 | | |
1591 | | /* Convert FORCE_NOT_NULL name list to per-column flags, check validity */ |
1592 | 0 | cstate->opts.force_notnull_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool)); |
1593 | 0 | if (cstate->opts.force_notnull_all) |
1594 | 0 | MemSet(cstate->opts.force_notnull_flags, true, num_phys_attrs * sizeof(bool)); |
1595 | 0 | else if (cstate->opts.force_notnull) |
1596 | 0 | { |
1597 | 0 | List *attnums; |
1598 | 0 | ListCell *cur; |
1599 | |
|
1600 | 0 | attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->opts.force_notnull); |
1601 | |
|
1602 | 0 | foreach(cur, attnums) |
1603 | 0 | { |
1604 | 0 | int attnum = lfirst_int(cur); |
1605 | 0 | Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1); |
1606 | |
|
1607 | 0 | if (!list_member_int(cstate->attnumlist, attnum)) |
1608 | 0 | ereport(ERROR, |
1609 | 0 | (errcode(ERRCODE_INVALID_COLUMN_REFERENCE), |
1610 | | /*- translator: first %s is the name of a COPY option, e.g. FORCE_NOT_NULL */ |
1611 | 0 | errmsg("%s column \"%s\" not referenced by COPY", |
1612 | 0 | "FORCE_NOT_NULL", NameStr(attr->attname)))); |
1613 | 0 | cstate->opts.force_notnull_flags[attnum - 1] = true; |
1614 | 0 | } |
1615 | 0 | } |
1616 | | |
1617 | | /* Set up soft error handler for ON_ERROR */ |
1618 | 0 | if (cstate->opts.on_error != COPY_ON_ERROR_STOP) |
1619 | 0 | { |
1620 | 0 | cstate->escontext = makeNode(ErrorSaveContext); |
1621 | 0 | cstate->escontext->type = T_ErrorSaveContext; |
1622 | 0 | cstate->escontext->error_occurred = false; |
1623 | | |
1624 | | /* |
1625 | | * Currently we only support COPY_ON_ERROR_IGNORE. We'll add other |
1626 | | * options later |
1627 | | */ |
1628 | 0 | if (cstate->opts.on_error == COPY_ON_ERROR_IGNORE) |
1629 | 0 | cstate->escontext->details_wanted = false; |
1630 | 0 | } |
1631 | 0 | else |
1632 | 0 | cstate->escontext = NULL; |
1633 | | |
1634 | | /* Convert FORCE_NULL name list to per-column flags, check validity */ |
1635 | 0 | cstate->opts.force_null_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool)); |
1636 | 0 | if (cstate->opts.force_null_all) |
1637 | 0 | MemSet(cstate->opts.force_null_flags, true, num_phys_attrs * sizeof(bool)); |
1638 | 0 | else if (cstate->opts.force_null) |
1639 | 0 | { |
1640 | 0 | List *attnums; |
1641 | 0 | ListCell *cur; |
1642 | |
|
1643 | 0 | attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->opts.force_null); |
1644 | |
|
1645 | 0 | foreach(cur, attnums) |
1646 | 0 | { |
1647 | 0 | int attnum = lfirst_int(cur); |
1648 | 0 | Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1); |
1649 | |
|
1650 | 0 | if (!list_member_int(cstate->attnumlist, attnum)) |
1651 | 0 | ereport(ERROR, |
1652 | 0 | (errcode(ERRCODE_INVALID_COLUMN_REFERENCE), |
1653 | | /*- translator: first %s is the name of a COPY option, e.g. FORCE_NOT_NULL */ |
1654 | 0 | errmsg("%s column \"%s\" not referenced by COPY", |
1655 | 0 | "FORCE_NULL", NameStr(attr->attname)))); |
1656 | 0 | cstate->opts.force_null_flags[attnum - 1] = true; |
1657 | 0 | } |
1658 | 0 | } |
1659 | | |
1660 | | /* Convert convert_selectively name list to per-column flags */ |
1661 | 0 | if (cstate->opts.convert_selectively) |
1662 | 0 | { |
1663 | 0 | List *attnums; |
1664 | 0 | ListCell *cur; |
1665 | |
|
1666 | 0 | cstate->convert_select_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool)); |
1667 | |
|
1668 | 0 | attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->opts.convert_select); |
1669 | |
|
1670 | 0 | foreach(cur, attnums) |
1671 | 0 | { |
1672 | 0 | int attnum = lfirst_int(cur); |
1673 | 0 | Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1); |
1674 | |
|
1675 | 0 | if (!list_member_int(cstate->attnumlist, attnum)) |
1676 | 0 | ereport(ERROR, |
1677 | 0 | (errcode(ERRCODE_INVALID_COLUMN_REFERENCE), |
1678 | 0 | errmsg_internal("selected column \"%s\" not referenced by COPY", |
1679 | 0 | NameStr(attr->attname)))); |
1680 | 0 | cstate->convert_select_flags[attnum - 1] = true; |
1681 | 0 | } |
1682 | 0 | } |
1683 | | |
1684 | | /* Use client encoding when ENCODING option is not specified. */ |
1685 | 0 | if (cstate->opts.file_encoding < 0) |
1686 | 0 | cstate->file_encoding = pg_get_client_encoding(); |
1687 | 0 | else |
1688 | 0 | cstate->file_encoding = cstate->opts.file_encoding; |
1689 | | |
1690 | | /* |
1691 | | * Look up encoding conversion function. |
1692 | | */ |
1693 | 0 | if (cstate->file_encoding == GetDatabaseEncoding() || |
1694 | 0 | cstate->file_encoding == PG_SQL_ASCII || |
1695 | 0 | GetDatabaseEncoding() == PG_SQL_ASCII) |
1696 | 0 | { |
1697 | 0 | cstate->need_transcoding = false; |
1698 | 0 | } |
1699 | 0 | else |
1700 | 0 | { |
1701 | 0 | cstate->need_transcoding = true; |
1702 | 0 | cstate->conversion_proc = FindDefaultConversionProc(cstate->file_encoding, |
1703 | 0 | GetDatabaseEncoding()); |
1704 | 0 | if (!OidIsValid(cstate->conversion_proc)) |
1705 | 0 | ereport(ERROR, |
1706 | 0 | (errcode(ERRCODE_UNDEFINED_FUNCTION), |
1707 | 0 | errmsg("default conversion function for encoding \"%s\" to \"%s\" does not exist", |
1708 | 0 | pg_encoding_to_char(cstate->file_encoding), |
1709 | 0 | pg_encoding_to_char(GetDatabaseEncoding())))); |
1710 | 0 | } |
1711 | | |
1712 | 0 | cstate->copy_src = COPY_FILE; /* default */ |
1713 | |
|
1714 | 0 | cstate->whereClause = whereClause; |
1715 | | |
1716 | | /* Initialize state variables */ |
1717 | 0 | cstate->eol_type = EOL_UNKNOWN; |
1718 | 0 | cstate->cur_relname = RelationGetRelationName(cstate->rel); |
1719 | 0 | cstate->cur_lineno = 0; |
1720 | 0 | cstate->cur_attname = NULL; |
1721 | 0 | cstate->cur_attval = NULL; |
1722 | 0 | cstate->relname_only = false; |
1723 | | |
1724 | | /* |
1725 | | * Allocate buffers for the input pipeline. |
1726 | | * |
1727 | | * attribute_buf and raw_buf are used in both text and binary modes, but |
1728 | | * input_buf and line_buf only in text mode. |
1729 | | */ |
1730 | 0 | cstate->raw_buf = palloc(RAW_BUF_SIZE + 1); |
1731 | 0 | cstate->raw_buf_index = cstate->raw_buf_len = 0; |
1732 | 0 | cstate->raw_reached_eof = false; |
1733 | |
|
1734 | 0 | initStringInfo(&cstate->attribute_buf); |
1735 | | |
1736 | | /* Assign range table and rteperminfos, we'll need them in CopyFrom. */ |
1737 | 0 | if (pstate) |
1738 | 0 | { |
1739 | 0 | cstate->range_table = pstate->p_rtable; |
1740 | 0 | cstate->rteperminfos = pstate->p_rteperminfos; |
1741 | 0 | } |
1742 | |
|
1743 | 0 | num_defaults = 0; |
1744 | 0 | volatile_defexprs = false; |
1745 | | |
1746 | | /* |
1747 | | * Pick up the required catalog information for each attribute in the |
1748 | | * relation, including the input function, the element type (to pass to |
1749 | | * the input function), and info about defaults and constraints. (Which |
1750 | | * input function we use depends on text/binary format choice.) |
1751 | | */ |
1752 | 0 | in_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo)); |
1753 | 0 | typioparams = (Oid *) palloc(num_phys_attrs * sizeof(Oid)); |
1754 | 0 | defmap = (int *) palloc(num_phys_attrs * sizeof(int)); |
1755 | 0 | defexprs = (ExprState **) palloc(num_phys_attrs * sizeof(ExprState *)); |
1756 | |
|
1757 | 0 | for (int attnum = 1; attnum <= num_phys_attrs; attnum++) |
1758 | 0 | { |
1759 | 0 | Form_pg_attribute att = TupleDescAttr(tupDesc, attnum - 1); |
1760 | | |
1761 | | /* We don't need info for dropped attributes */ |
1762 | 0 | if (att->attisdropped) |
1763 | 0 | continue; |
1764 | | |
1765 | | /* Fetch the input function and typioparam info */ |
1766 | 0 | cstate->routine->CopyFromInFunc(cstate, att->atttypid, |
1767 | 0 | &in_functions[attnum - 1], |
1768 | 0 | &typioparams[attnum - 1]); |
1769 | | |
1770 | | /* Get default info if available */ |
1771 | 0 | defexprs[attnum - 1] = NULL; |
1772 | | |
1773 | | /* |
1774 | | * We only need the default values for columns that do not appear in |
1775 | | * the column list, unless the DEFAULT option was given. We never need |
1776 | | * default values for generated columns. |
1777 | | */ |
1778 | 0 | if ((cstate->opts.default_print != NULL || |
1779 | 0 | !list_member_int(cstate->attnumlist, attnum)) && |
1780 | 0 | !att->attgenerated) |
1781 | 0 | { |
1782 | 0 | Expr *defexpr = (Expr *) build_column_default(cstate->rel, |
1783 | 0 | attnum); |
1784 | |
|
1785 | 0 | if (defexpr != NULL) |
1786 | 0 | { |
1787 | | /* Run the expression through planner */ |
1788 | 0 | defexpr = expression_planner(defexpr); |
1789 | | |
1790 | | /* Initialize executable expression in copycontext */ |
1791 | 0 | defexprs[attnum - 1] = ExecInitExpr(defexpr, NULL); |
1792 | | |
1793 | | /* if NOT copied from input */ |
1794 | | /* use default value if one exists */ |
1795 | 0 | if (!list_member_int(cstate->attnumlist, attnum)) |
1796 | 0 | { |
1797 | 0 | defmap[num_defaults] = attnum - 1; |
1798 | 0 | num_defaults++; |
1799 | 0 | } |
1800 | | |
1801 | | /* |
1802 | | * If a default expression looks at the table being loaded, |
1803 | | * then it could give the wrong answer when using |
1804 | | * multi-insert. Since database access can be dynamic this is |
1805 | | * hard to test for exactly, so we use the much wider test of |
1806 | | * whether the default expression is volatile. We allow for |
1807 | | * the special case of when the default expression is the |
1808 | | * nextval() of a sequence which in this specific case is |
1809 | | * known to be safe for use with the multi-insert |
1810 | | * optimization. Hence we use this special case function |
1811 | | * checker rather than the standard check for |
1812 | | * contain_volatile_functions(). Note also that we already |
1813 | | * ran the expression through expression_planner(). |
1814 | | */ |
1815 | 0 | if (!volatile_defexprs) |
1816 | 0 | volatile_defexprs = contain_volatile_functions_not_nextval((Node *) defexpr); |
1817 | 0 | } |
1818 | 0 | } |
1819 | 0 | } |
1820 | |
|
1821 | 0 | cstate->defaults = (bool *) palloc0(tupDesc->natts * sizeof(bool)); |
1822 | | |
1823 | | /* initialize progress */ |
1824 | 0 | pgstat_progress_start_command(PROGRESS_COMMAND_COPY, |
1825 | 0 | cstate->rel ? RelationGetRelid(cstate->rel) : InvalidOid); |
1826 | 0 | cstate->bytes_processed = 0; |
1827 | | |
1828 | | /* We keep those variables in cstate. */ |
1829 | 0 | cstate->in_functions = in_functions; |
1830 | 0 | cstate->typioparams = typioparams; |
1831 | 0 | cstate->defmap = defmap; |
1832 | 0 | cstate->defexprs = defexprs; |
1833 | 0 | cstate->volatile_defexprs = volatile_defexprs; |
1834 | 0 | cstate->num_defaults = num_defaults; |
1835 | 0 | cstate->is_program = is_program; |
1836 | |
|
1837 | 0 | if (data_source_cb) |
1838 | 0 | { |
1839 | 0 | progress_vals[1] = PROGRESS_COPY_TYPE_CALLBACK; |
1840 | 0 | cstate->copy_src = COPY_CALLBACK; |
1841 | 0 | cstate->data_source_cb = data_source_cb; |
1842 | 0 | } |
1843 | 0 | else if (pipe) |
1844 | 0 | { |
1845 | 0 | progress_vals[1] = PROGRESS_COPY_TYPE_PIPE; |
1846 | 0 | Assert(!is_program); /* the grammar does not allow this */ |
1847 | 0 | if (whereToSendOutput == DestRemote) |
1848 | 0 | ReceiveCopyBegin(cstate); |
1849 | 0 | else |
1850 | 0 | cstate->copy_file = stdin; |
1851 | 0 | } |
1852 | 0 | else |
1853 | 0 | { |
1854 | 0 | cstate->filename = pstrdup(filename); |
1855 | |
|
1856 | 0 | if (cstate->is_program) |
1857 | 0 | { |
1858 | 0 | progress_vals[1] = PROGRESS_COPY_TYPE_PROGRAM; |
1859 | 0 | cstate->copy_file = OpenPipeStream(cstate->filename, PG_BINARY_R); |
1860 | 0 | if (cstate->copy_file == NULL) |
1861 | 0 | ereport(ERROR, |
1862 | 0 | (errcode_for_file_access(), |
1863 | 0 | errmsg("could not execute command \"%s\": %m", |
1864 | 0 | cstate->filename))); |
1865 | 0 | } |
1866 | 0 | else |
1867 | 0 | { |
1868 | 0 | struct stat st; |
1869 | |
|
1870 | 0 | progress_vals[1] = PROGRESS_COPY_TYPE_FILE; |
1871 | 0 | cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_R); |
1872 | 0 | if (cstate->copy_file == NULL) |
1873 | 0 | { |
1874 | | /* copy errno because ereport subfunctions might change it */ |
1875 | 0 | int save_errno = errno; |
1876 | |
|
1877 | 0 | ereport(ERROR, |
1878 | 0 | (errcode_for_file_access(), |
1879 | 0 | errmsg("could not open file \"%s\" for reading: %m", |
1880 | 0 | cstate->filename), |
1881 | 0 | (save_errno == ENOENT || save_errno == EACCES) ? |
1882 | 0 | errhint("COPY FROM instructs the PostgreSQL server process to read a file. " |
1883 | 0 | "You may want a client-side facility such as psql's \\copy.") : 0)); |
1884 | 0 | } |
1885 | | |
1886 | 0 | if (fstat(fileno(cstate->copy_file), &st)) |
1887 | 0 | ereport(ERROR, |
1888 | 0 | (errcode_for_file_access(), |
1889 | 0 | errmsg("could not stat file \"%s\": %m", |
1890 | 0 | cstate->filename))); |
1891 | | |
1892 | 0 | if (S_ISDIR(st.st_mode)) |
1893 | 0 | ereport(ERROR, |
1894 | 0 | (errcode(ERRCODE_WRONG_OBJECT_TYPE), |
1895 | 0 | errmsg("\"%s\" is a directory", cstate->filename))); |
1896 | | |
1897 | 0 | progress_vals[2] = st.st_size; |
1898 | 0 | } |
1899 | 0 | } |
1900 | | |
1901 | 0 | pgstat_progress_update_multi_param(3, progress_cols, progress_vals); |
1902 | |
|
1903 | 0 | cstate->routine->CopyFromStart(cstate, tupDesc); |
1904 | |
|
1905 | 0 | MemoryContextSwitchTo(oldcontext); |
1906 | |
|
1907 | 0 | return cstate; |
1908 | 0 | } |
1909 | | |
1910 | | /* |
1911 | | * Clean up storage and release resources for COPY FROM. |
1912 | | */ |
1913 | | void |
1914 | | EndCopyFrom(CopyFromState cstate) |
1915 | 0 | { |
1916 | | /* Invoke the end callback */ |
1917 | 0 | cstate->routine->CopyFromEnd(cstate); |
1918 | | |
1919 | | /* No COPY FROM related resources except memory. */ |
1920 | 0 | if (cstate->is_program) |
1921 | 0 | { |
1922 | 0 | ClosePipeFromProgram(cstate); |
1923 | 0 | } |
1924 | 0 | else |
1925 | 0 | { |
1926 | 0 | if (cstate->filename != NULL && FreeFile(cstate->copy_file)) |
1927 | 0 | ereport(ERROR, |
1928 | 0 | (errcode_for_file_access(), |
1929 | 0 | errmsg("could not close file \"%s\": %m", |
1930 | 0 | cstate->filename))); |
1931 | 0 | } |
1932 | | |
1933 | 0 | pgstat_progress_end_command(); |
1934 | |
|
1935 | 0 | MemoryContextDelete(cstate->copycontext); |
1936 | 0 | pfree(cstate); |
1937 | 0 | } |
1938 | | |
1939 | | /* |
1940 | | * Closes the pipe from an external program, checking the pclose() return code. |
1941 | | */ |
1942 | | static void |
1943 | | ClosePipeFromProgram(CopyFromState cstate) |
1944 | 0 | { |
1945 | 0 | int pclose_rc; |
1946 | |
|
1947 | 0 | Assert(cstate->is_program); |
1948 | |
|
1949 | 0 | pclose_rc = ClosePipeStream(cstate->copy_file); |
1950 | 0 | if (pclose_rc == -1) |
1951 | 0 | ereport(ERROR, |
1952 | 0 | (errcode_for_file_access(), |
1953 | 0 | errmsg("could not close pipe to external command: %m"))); |
1954 | 0 | else if (pclose_rc != 0) |
1955 | 0 | { |
1956 | | /* |
1957 | | * If we ended a COPY FROM PROGRAM before reaching EOF, then it's |
1958 | | * expectable for the called program to fail with SIGPIPE, and we |
1959 | | * should not report that as an error. Otherwise, SIGPIPE indicates a |
1960 | | * problem. |
1961 | | */ |
1962 | 0 | if (!cstate->raw_reached_eof && |
1963 | 0 | wait_result_is_signal(pclose_rc, SIGPIPE)) |
1964 | 0 | return; |
1965 | | |
1966 | 0 | ereport(ERROR, |
1967 | 0 | (errcode(ERRCODE_EXTERNAL_ROUTINE_EXCEPTION), |
1968 | 0 | errmsg("program \"%s\" failed", |
1969 | 0 | cstate->filename), |
1970 | 0 | errdetail_internal("%s", wait_result_to_str(pclose_rc)))); |
1971 | 0 | } |
1972 | 0 | } |