Coverage Report

Created: 2025-10-09 06:07

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/postgres/src/backend/commands/copyfrom.c
Line
Count
Source
1
/*-------------------------------------------------------------------------
2
 *
3
 * copyfrom.c
4
 *    COPY <table> FROM file/program/client
5
 *
6
 * This file contains routines needed to efficiently load tuples into a
7
 * table.  That includes looking up the correct partition, firing triggers,
8
 * calling the table AM function to insert the data, and updating indexes.
9
 * Reading data from the input file or client and parsing it into Datums
10
 * is handled in copyfromparse.c.
11
 *
12
 * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
13
 * Portions Copyright (c) 1994, Regents of the University of California
14
 *
15
 *
16
 * IDENTIFICATION
17
 *    src/backend/commands/copyfrom.c
18
 *
19
 *-------------------------------------------------------------------------
20
 */
21
#include "postgres.h"
22
23
#include <ctype.h>
24
#include <unistd.h>
25
#include <sys/stat.h>
26
27
#include "access/heapam.h"
28
#include "access/tableam.h"
29
#include "access/xact.h"
30
#include "catalog/namespace.h"
31
#include "commands/copyapi.h"
32
#include "commands/copyfrom_internal.h"
33
#include "commands/progress.h"
34
#include "commands/trigger.h"
35
#include "executor/execPartition.h"
36
#include "executor/executor.h"
37
#include "executor/nodeModifyTable.h"
38
#include "executor/tuptable.h"
39
#include "foreign/fdwapi.h"
40
#include "mb/pg_wchar.h"
41
#include "miscadmin.h"
42
#include "nodes/miscnodes.h"
43
#include "optimizer/optimizer.h"
44
#include "pgstat.h"
45
#include "rewrite/rewriteHandler.h"
46
#include "storage/fd.h"
47
#include "tcop/tcopprot.h"
48
#include "utils/lsyscache.h"
49
#include "utils/memutils.h"
50
#include "utils/portal.h"
51
#include "utils/rel.h"
52
#include "utils/snapmgr.h"
53
54
/*
 * Upper bound on the number of tuples held by one CopyMultiInsertBuffer.
 *
 * Caution: keep this modest.  We could end up with this many
 * CopyMultiInsertBuffer items stored in CopyMultiInsertInfo's
 * multiInsertBuffers list, so raising it can cause quadratic growth in
 * memory requirements during copies into partitioned tables with a large
 * number of partitions.
 */
#define MAX_BUFFERED_TUPLES 1000

/*
 * Buffers are flushed once the stored tuples amount to at least this many
 * bytes, measured by their input size.
 */
#define MAX_BUFFERED_BYTES 65535

/*
 * After a flush, the list of buffers is trimmed back down to this many
 * entries.  The value must be >= 2.
 */
#define MAX_PARTITION_BUFFERS 32
76
77
/* Stores multi-insert data related to a single relation in CopyFrom. */
78
typedef struct CopyMultiInsertBuffer
79
{
80
  TupleTableSlot *slots[MAX_BUFFERED_TUPLES]; /* Array to store tuples */
81
  ResultRelInfo *resultRelInfo; /* ResultRelInfo for 'relid' */
82
  BulkInsertState bistate;  /* BulkInsertState for this rel if plain
83
                 * table; NULL if foreign table */
84
  int     nused;      /* number of 'slots' containing tuples */
85
  uint64    linenos[MAX_BUFFERED_TUPLES]; /* Line # of tuple in copy
86
                         * stream */
87
} CopyMultiInsertBuffer;
88
89
/*
90
 * Stores one or many CopyMultiInsertBuffers and details about the size and
91
 * number of tuples which are stored in them.  This allows multiple buffers to
92
 * exist at once when COPYing into a partitioned table.
93
 */
94
typedef struct CopyMultiInsertInfo
95
{
96
  List     *multiInsertBuffers; /* List of tracked CopyMultiInsertBuffers */
97
  int     bufferedTuples; /* number of tuples buffered over all buffers */
98
  int     bufferedBytes;  /* number of bytes from all buffered tuples */
99
  CopyFromState cstate;   /* Copy state for this CopyMultiInsertInfo */
100
  EState     *estate;     /* Executor state used for COPY */
101
  CommandId mycid;      /* Command Id used for COPY */
102
  int     ti_options;   /* table insert options */
103
} CopyMultiInsertInfo;
104
105
106
/* non-export function prototypes */
107
static void ClosePipeFromProgram(CopyFromState cstate);
108
109
/*
110
 * Built-in format-specific routines. One-row callbacks are defined in
111
 * copyfromparse.c.
112
 */
113
static void CopyFromTextLikeInFunc(CopyFromState cstate, Oid atttypid, FmgrInfo *finfo,
114
                   Oid *typioparam);
115
static void CopyFromTextLikeStart(CopyFromState cstate, TupleDesc tupDesc);
116
static void CopyFromTextLikeEnd(CopyFromState cstate);
117
static void CopyFromBinaryInFunc(CopyFromState cstate, Oid atttypid,
118
                 FmgrInfo *finfo, Oid *typioparam);
119
static void CopyFromBinaryStart(CopyFromState cstate, TupleDesc tupDesc);
120
static void CopyFromBinaryEnd(CopyFromState cstate);
121
122
123
/*
124
 * COPY FROM routines for built-in formats.
125
 *
126
 * CSV and text formats share the same TextLike routines except for the
127
 * one-row callback.
128
 */
129
130
/* text format */
131
static const CopyFromRoutine CopyFromRoutineText = {
132
  .CopyFromInFunc = CopyFromTextLikeInFunc,
133
  .CopyFromStart = CopyFromTextLikeStart,
134
  .CopyFromOneRow = CopyFromTextOneRow,
135
  .CopyFromEnd = CopyFromTextLikeEnd,
136
};
137
138
/* CSV format */
139
static const CopyFromRoutine CopyFromRoutineCSV = {
140
  .CopyFromInFunc = CopyFromTextLikeInFunc,
141
  .CopyFromStart = CopyFromTextLikeStart,
142
  .CopyFromOneRow = CopyFromCSVOneRow,
143
  .CopyFromEnd = CopyFromTextLikeEnd,
144
};
145
146
/* binary format */
147
static const CopyFromRoutine CopyFromRoutineBinary = {
148
  .CopyFromInFunc = CopyFromBinaryInFunc,
149
  .CopyFromStart = CopyFromBinaryStart,
150
  .CopyFromOneRow = CopyFromBinaryOneRow,
151
  .CopyFromEnd = CopyFromBinaryEnd,
152
};
153
154
/* Return a COPY FROM routine for the given options */
155
static const CopyFromRoutine *
156
CopyFromGetRoutine(const CopyFormatOptions *opts)
157
0
{
158
0
  if (opts->csv_mode)
159
0
    return &CopyFromRoutineCSV;
160
0
  else if (opts->binary)
161
0
    return &CopyFromRoutineBinary;
162
163
  /* default is text */
164
0
  return &CopyFromRoutineText;
165
0
}
166
167
/* Implementation of the start callback for text and CSV formats */
168
static void
169
CopyFromTextLikeStart(CopyFromState cstate, TupleDesc tupDesc)
170
0
{
171
0
  AttrNumber  attr_count;
172
173
  /*
174
   * If encoding conversion is needed, we need another buffer to hold the
175
   * converted input data.  Otherwise, we can just point input_buf to the
176
   * same buffer as raw_buf.
177
   */
178
0
  if (cstate->need_transcoding)
179
0
  {
180
0
    cstate->input_buf = (char *) palloc(INPUT_BUF_SIZE + 1);
181
0
    cstate->input_buf_index = cstate->input_buf_len = 0;
182
0
  }
183
0
  else
184
0
    cstate->input_buf = cstate->raw_buf;
185
0
  cstate->input_reached_eof = false;
186
187
0
  initStringInfo(&cstate->line_buf);
188
189
  /*
190
   * Create workspace for CopyReadAttributes results; used by CSV and text
191
   * format.
192
   */
193
0
  attr_count = list_length(cstate->attnumlist);
194
0
  cstate->max_fields = attr_count;
195
0
  cstate->raw_fields = (char **) palloc(attr_count * sizeof(char *));
196
0
}
197
198
/*
199
 * Implementation of the infunc callback for text and CSV formats. Assign
200
 * the input function data to the given *finfo.
201
 */
202
static void
203
CopyFromTextLikeInFunc(CopyFromState cstate, Oid atttypid, FmgrInfo *finfo,
204
             Oid *typioparam)
205
0
{
206
0
  Oid     func_oid;
207
208
0
  getTypeInputInfo(atttypid, &func_oid, typioparam);
209
0
  fmgr_info(func_oid, finfo);
210
0
}
211
212
/* Implementation of the end callback for text and CSV formats */
213
static void
214
CopyFromTextLikeEnd(CopyFromState cstate)
215
0
{
216
  /* nothing to do */
217
0
}
218
219
/* Implementation of the start callback for binary format */
220
static void
221
CopyFromBinaryStart(CopyFromState cstate, TupleDesc tupDesc)
222
0
{
223
  /* Read and verify binary header */
224
0
  ReceiveCopyBinaryHeader(cstate);
225
0
}
226
227
/*
228
 * Implementation of the infunc callback for binary format. Assign
229
 * the binary input function to the given *finfo.
230
 */
231
static void
232
CopyFromBinaryInFunc(CopyFromState cstate, Oid atttypid,
233
           FmgrInfo *finfo, Oid *typioparam)
234
0
{
235
0
  Oid     func_oid;
236
237
0
  getTypeBinaryInputInfo(atttypid, &func_oid, typioparam);
238
0
  fmgr_info(func_oid, finfo);
239
0
}
240
241
/* Implementation of the end callback for binary format */
242
static void
243
CopyFromBinaryEnd(CopyFromState cstate)
244
0
{
245
  /* nothing to do */
246
0
}
247
248
/*
249
 * error context callback for COPY FROM
250
 *
251
 * The argument for the error context must be CopyFromState.
252
 */
253
void
254
CopyFromErrorCallback(void *arg)
255
0
{
256
0
  CopyFromState cstate = (CopyFromState) arg;
257
258
0
  if (cstate->relname_only)
259
0
  {
260
0
    errcontext("COPY %s",
261
0
           cstate->cur_relname);
262
0
    return;
263
0
  }
264
0
  if (cstate->opts.binary)
265
0
  {
266
    /* can't usefully display the data */
267
0
    if (cstate->cur_attname)
268
0
      errcontext("COPY %s, line %" PRIu64 ", column %s",
269
0
             cstate->cur_relname,
270
0
             cstate->cur_lineno,
271
0
             cstate->cur_attname);
272
0
    else
273
0
      errcontext("COPY %s, line %" PRIu64,
274
0
             cstate->cur_relname,
275
0
             cstate->cur_lineno);
276
0
  }
277
0
  else
278
0
  {
279
0
    if (cstate->cur_attname && cstate->cur_attval)
280
0
    {
281
      /* error is relevant to a particular column */
282
0
      char     *attval;
283
284
0
      attval = CopyLimitPrintoutLength(cstate->cur_attval);
285
0
      errcontext("COPY %s, line %" PRIu64 ", column %s: \"%s\"",
286
0
             cstate->cur_relname,
287
0
             cstate->cur_lineno,
288
0
             cstate->cur_attname,
289
0
             attval);
290
0
      pfree(attval);
291
0
    }
292
0
    else if (cstate->cur_attname)
293
0
    {
294
      /* error is relevant to a particular column, value is NULL */
295
0
      errcontext("COPY %s, line %" PRIu64 ", column %s: null input",
296
0
             cstate->cur_relname,
297
0
             cstate->cur_lineno,
298
0
             cstate->cur_attname);
299
0
    }
300
0
    else
301
0
    {
302
      /*
303
       * Error is relevant to a particular line.
304
       *
305
       * If line_buf still contains the correct line, print it.
306
       */
307
0
      if (cstate->line_buf_valid)
308
0
      {
309
0
        char     *lineval;
310
311
0
        lineval = CopyLimitPrintoutLength(cstate->line_buf.data);
312
0
        errcontext("COPY %s, line %" PRIu64 ": \"%s\"",
313
0
               cstate->cur_relname,
314
0
               cstate->cur_lineno, lineval);
315
0
        pfree(lineval);
316
0
      }
317
0
      else
318
0
      {
319
0
        errcontext("COPY %s, line %" PRIu64,
320
0
               cstate->cur_relname,
321
0
               cstate->cur_lineno);
322
0
      }
323
0
    }
324
0
  }
325
0
}
326
327
/*
 * Keep COPY data quoted in a message down to a reasonable length.
 *
 * Returns a freshly allocated string: a pstrdup'd copy of the input when it
 * is short enough, otherwise a clipped prefix followed by "...".
 */
char *
CopyLimitPrintoutLength(const char *str)
{
#define MAX_COPY_DATA_DISPLAY 100

  int input_len = strlen(str);
  int keep;
  char *clipped;

  if (input_len > MAX_COPY_DATA_DISPLAY)
  {
    /* Apply encoding-dependent truncation */
    keep = pg_mbcliplen(str, input_len, MAX_COPY_DATA_DISPLAY);

    /* Room for the kept bytes, the three dots and the terminator. */
    clipped = (char *) palloc(keep + 4);
    memcpy(clipped, str, keep);
    memcpy(clipped + keep, "...", 4);

    return clipped;
  }

  /* Short enough to show in full. */
  return pstrdup(str);
}
357
358
/*
359
 * Allocate memory and initialize a new CopyMultiInsertBuffer for this
360
 * ResultRelInfo.
361
 */
362
static CopyMultiInsertBuffer *
363
CopyMultiInsertBufferInit(ResultRelInfo *rri)
364
0
{
365
0
  CopyMultiInsertBuffer *buffer;
366
367
0
  buffer = (CopyMultiInsertBuffer *) palloc(sizeof(CopyMultiInsertBuffer));
368
0
  memset(buffer->slots, 0, sizeof(TupleTableSlot *) * MAX_BUFFERED_TUPLES);
369
0
  buffer->resultRelInfo = rri;
370
0
  buffer->bistate = (rri->ri_FdwRoutine == NULL) ? GetBulkInsertState() : NULL;
371
0
  buffer->nused = 0;
372
373
0
  return buffer;
374
0
}
375
376
/*
377
 * Make a new buffer for this ResultRelInfo.
378
 */
379
static inline void
380
CopyMultiInsertInfoSetupBuffer(CopyMultiInsertInfo *miinfo,
381
                 ResultRelInfo *rri)
382
0
{
383
0
  CopyMultiInsertBuffer *buffer;
384
385
0
  buffer = CopyMultiInsertBufferInit(rri);
386
387
  /* Setup back-link so we can easily find this buffer again */
388
0
  rri->ri_CopyMultiInsertBuffer = buffer;
389
  /* Record that we're tracking this buffer */
390
0
  miinfo->multiInsertBuffers = lappend(miinfo->multiInsertBuffers, buffer);
391
0
}
392
393
/*
394
 * Initialize an already allocated CopyMultiInsertInfo.
395
 *
396
 * If rri is a non-partitioned table then a CopyMultiInsertBuffer is set up
397
 * for that table.
398
 */
399
static void
400
CopyMultiInsertInfoInit(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri,
401
            CopyFromState cstate, EState *estate, CommandId mycid,
402
            int ti_options)
403
0
{
404
0
  miinfo->multiInsertBuffers = NIL;
405
0
  miinfo->bufferedTuples = 0;
406
0
  miinfo->bufferedBytes = 0;
407
0
  miinfo->cstate = cstate;
408
0
  miinfo->estate = estate;
409
0
  miinfo->mycid = mycid;
410
0
  miinfo->ti_options = ti_options;
411
412
  /*
413
   * Only setup the buffer when not dealing with a partitioned table.
414
   * Buffers for partitioned tables will just be setup when we need to send
415
   * tuples their way for the first time.
416
   */
417
0
  if (rri->ri_RelationDesc->rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
418
0
    CopyMultiInsertInfoSetupBuffer(miinfo, rri);
419
0
}
420
421
/*
422
 * Returns true if the buffers are full
423
 */
424
static inline bool
425
CopyMultiInsertInfoIsFull(CopyMultiInsertInfo *miinfo)
426
0
{
427
0
  if (miinfo->bufferedTuples >= MAX_BUFFERED_TUPLES ||
428
0
    miinfo->bufferedBytes >= MAX_BUFFERED_BYTES)
429
0
    return true;
430
0
  return false;
431
0
}
432
433
/*
434
 * Returns true if we have no buffered tuples
435
 */
436
static inline bool
437
CopyMultiInsertInfoIsEmpty(CopyMultiInsertInfo *miinfo)
438
0
{
439
0
  return miinfo->bufferedTuples == 0;
440
0
}
441
442
/*
443
 * Write the tuples stored in 'buffer' out to the table.
444
 */
445
static inline void
446
CopyMultiInsertBufferFlush(CopyMultiInsertInfo *miinfo,
447
               CopyMultiInsertBuffer *buffer,
448
               int64 *processed)
449
0
{
450
0
  CopyFromState cstate = miinfo->cstate;
451
0
  EState     *estate = miinfo->estate;
452
0
  int     nused = buffer->nused;
453
0
  ResultRelInfo *resultRelInfo = buffer->resultRelInfo;
454
0
  TupleTableSlot **slots = buffer->slots;
455
0
  int     i;
456
457
0
  if (resultRelInfo->ri_FdwRoutine)
458
0
  {
459
0
    int     batch_size = resultRelInfo->ri_BatchSize;
460
0
    int     sent = 0;
461
462
0
    Assert(buffer->bistate == NULL);
463
464
    /* Ensure that the FDW supports batching and it's enabled */
465
0
    Assert(resultRelInfo->ri_FdwRoutine->ExecForeignBatchInsert);
466
0
    Assert(batch_size > 1);
467
468
    /*
469
     * We suppress error context information other than the relation name,
470
     * if one of the operations below fails.
471
     */
472
0
    Assert(!cstate->relname_only);
473
0
    cstate->relname_only = true;
474
475
0
    while (sent < nused)
476
0
    {
477
0
      int     size = (batch_size < nused - sent) ? batch_size : (nused - sent);
478
0
      int     inserted = size;
479
0
      TupleTableSlot **rslots;
480
481
      /* insert into foreign table: let the FDW do it */
482
0
      rslots =
483
0
        resultRelInfo->ri_FdwRoutine->ExecForeignBatchInsert(estate,
484
0
                                   resultRelInfo,
485
0
                                   &slots[sent],
486
0
                                   NULL,
487
0
                                   &inserted);
488
489
0
      sent += size;
490
491
      /* No need to do anything if there are no inserted rows */
492
0
      if (inserted <= 0)
493
0
        continue;
494
495
      /* Triggers on foreign tables should not have transition tables */
496
0
      Assert(resultRelInfo->ri_TrigDesc == NULL ||
497
0
           resultRelInfo->ri_TrigDesc->trig_insert_new_table == false);
498
499
      /* Run AFTER ROW INSERT triggers */
500
0
      if (resultRelInfo->ri_TrigDesc != NULL &&
501
0
        resultRelInfo->ri_TrigDesc->trig_insert_after_row)
502
0
      {
503
0
        Oid     relid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
504
505
0
        for (i = 0; i < inserted; i++)
506
0
        {
507
0
          TupleTableSlot *slot = rslots[i];
508
509
          /*
510
           * AFTER ROW Triggers might reference the tableoid column,
511
           * so (re-)initialize tts_tableOid before evaluating them.
512
           */
513
0
          slot->tts_tableOid = relid;
514
515
0
          ExecARInsertTriggers(estate, resultRelInfo,
516
0
                     slot, NIL,
517
0
                     cstate->transition_capture);
518
0
        }
519
0
      }
520
521
      /* Update the row counter and progress of the COPY command */
522
0
      *processed += inserted;
523
0
      pgstat_progress_update_param(PROGRESS_COPY_TUPLES_PROCESSED,
524
0
                     *processed);
525
0
    }
526
527
0
    for (i = 0; i < nused; i++)
528
0
      ExecClearTuple(slots[i]);
529
530
    /* reset relname_only */
531
0
    cstate->relname_only = false;
532
0
  }
533
0
  else
534
0
  {
535
0
    CommandId mycid = miinfo->mycid;
536
0
    int     ti_options = miinfo->ti_options;
537
0
    bool    line_buf_valid = cstate->line_buf_valid;
538
0
    uint64    save_cur_lineno = cstate->cur_lineno;
539
0
    MemoryContext oldcontext;
540
541
0
    Assert(buffer->bistate != NULL);
542
543
    /*
544
     * Print error context information correctly, if one of the operations
545
     * below fails.
546
     */
547
0
    cstate->line_buf_valid = false;
548
549
    /*
550
     * table_multi_insert may leak memory, so switch to short-lived memory
551
     * context before calling it.
552
     */
553
0
    oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
554
0
    table_multi_insert(resultRelInfo->ri_RelationDesc,
555
0
               slots,
556
0
               nused,
557
0
               mycid,
558
0
               ti_options,
559
0
               buffer->bistate);
560
0
    MemoryContextSwitchTo(oldcontext);
561
562
0
    for (i = 0; i < nused; i++)
563
0
    {
564
      /*
565
       * If there are any indexes, update them for all the inserted
566
       * tuples, and run AFTER ROW INSERT triggers.
567
       */
568
0
      if (resultRelInfo->ri_NumIndices > 0)
569
0
      {
570
0
        List     *recheckIndexes;
571
572
0
        cstate->cur_lineno = buffer->linenos[i];
573
0
        recheckIndexes =
574
0
          ExecInsertIndexTuples(resultRelInfo,
575
0
                      buffer->slots[i], estate, false,
576
0
                      false, NULL, NIL, false);
577
0
        ExecARInsertTriggers(estate, resultRelInfo,
578
0
                   slots[i], recheckIndexes,
579
0
                   cstate->transition_capture);
580
0
        list_free(recheckIndexes);
581
0
      }
582
583
      /*
584
       * There's no indexes, but see if we need to run AFTER ROW INSERT
585
       * triggers anyway.
586
       */
587
0
      else if (resultRelInfo->ri_TrigDesc != NULL &&
588
0
           (resultRelInfo->ri_TrigDesc->trig_insert_after_row ||
589
0
            resultRelInfo->ri_TrigDesc->trig_insert_new_table))
590
0
      {
591
0
        cstate->cur_lineno = buffer->linenos[i];
592
0
        ExecARInsertTriggers(estate, resultRelInfo,
593
0
                   slots[i], NIL,
594
0
                   cstate->transition_capture);
595
0
      }
596
597
0
      ExecClearTuple(slots[i]);
598
0
    }
599
600
    /* Update the row counter and progress of the COPY command */
601
0
    *processed += nused;
602
0
    pgstat_progress_update_param(PROGRESS_COPY_TUPLES_PROCESSED,
603
0
                   *processed);
604
605
    /* reset cur_lineno and line_buf_valid to what they were */
606
0
    cstate->line_buf_valid = line_buf_valid;
607
0
    cstate->cur_lineno = save_cur_lineno;
608
0
  }
609
610
  /* Mark that all slots are free */
611
0
  buffer->nused = 0;
612
0
}
613
614
/*
615
 * Drop used slots and free member for this buffer.
616
 *
617
 * The buffer must be flushed before cleanup.
618
 */
619
static inline void
620
CopyMultiInsertBufferCleanup(CopyMultiInsertInfo *miinfo,
621
               CopyMultiInsertBuffer *buffer)
622
0
{
623
0
  ResultRelInfo *resultRelInfo = buffer->resultRelInfo;
624
0
  int     i;
625
626
  /* Ensure buffer was flushed */
627
0
  Assert(buffer->nused == 0);
628
629
  /* Remove back-link to ourself */
630
0
  resultRelInfo->ri_CopyMultiInsertBuffer = NULL;
631
632
0
  if (resultRelInfo->ri_FdwRoutine == NULL)
633
0
  {
634
0
    Assert(buffer->bistate != NULL);
635
0
    FreeBulkInsertState(buffer->bistate);
636
0
  }
637
0
  else
638
0
    Assert(buffer->bistate == NULL);
639
640
  /* Since we only create slots on demand, just drop the non-null ones. */
641
0
  for (i = 0; i < MAX_BUFFERED_TUPLES && buffer->slots[i] != NULL; i++)
642
0
    ExecDropSingleTupleTableSlot(buffer->slots[i]);
643
644
0
  if (resultRelInfo->ri_FdwRoutine == NULL)
645
0
    table_finish_bulk_insert(resultRelInfo->ri_RelationDesc,
646
0
                 miinfo->ti_options);
647
648
0
  pfree(buffer);
649
0
}
650
651
/*
652
 * Write out all stored tuples in all buffers out to the tables.
653
 *
654
 * Once flushed we also trim the tracked buffers list down to size by removing
655
 * the buffers created earliest first.
656
 *
657
 * Callers should pass 'curr_rri' as the ResultRelInfo that's currently being
658
 * used.  When cleaning up old buffers we'll never remove the one for
659
 * 'curr_rri'.
660
 */
661
static inline void
662
CopyMultiInsertInfoFlush(CopyMultiInsertInfo *miinfo, ResultRelInfo *curr_rri,
663
             int64 *processed)
664
0
{
665
0
  ListCell   *lc;
666
667
0
  foreach(lc, miinfo->multiInsertBuffers)
668
0
  {
669
0
    CopyMultiInsertBuffer *buffer = (CopyMultiInsertBuffer *) lfirst(lc);
670
671
0
    CopyMultiInsertBufferFlush(miinfo, buffer, processed);
672
0
  }
673
674
0
  miinfo->bufferedTuples = 0;
675
0
  miinfo->bufferedBytes = 0;
676
677
  /*
678
   * Trim the list of tracked buffers down if it exceeds the limit.  Here we
679
   * remove buffers starting with the ones we created first.  It seems less
680
   * likely that these older ones will be needed than the ones that were
681
   * just created.
682
   */
683
0
  while (list_length(miinfo->multiInsertBuffers) > MAX_PARTITION_BUFFERS)
684
0
  {
685
0
    CopyMultiInsertBuffer *buffer;
686
687
0
    buffer = (CopyMultiInsertBuffer *) linitial(miinfo->multiInsertBuffers);
688
689
    /*
690
     * We never want to remove the buffer that's currently being used, so
691
     * if we happen to find that then move it to the end of the list.
692
     */
693
0
    if (buffer->resultRelInfo == curr_rri)
694
0
    {
695
      /*
696
       * The code below would misbehave if we were trying to reduce the
697
       * list to less than two items.
698
       */
699
0
      StaticAssertDecl(MAX_PARTITION_BUFFERS >= 2,
700
0
               "MAX_PARTITION_BUFFERS must be >= 2");
701
702
0
      miinfo->multiInsertBuffers = list_delete_first(miinfo->multiInsertBuffers);
703
0
      miinfo->multiInsertBuffers = lappend(miinfo->multiInsertBuffers, buffer);
704
0
      buffer = (CopyMultiInsertBuffer *) linitial(miinfo->multiInsertBuffers);
705
0
    }
706
707
0
    CopyMultiInsertBufferCleanup(miinfo, buffer);
708
0
    miinfo->multiInsertBuffers = list_delete_first(miinfo->multiInsertBuffers);
709
0
  }
710
0
}
711
712
/*
713
 * Cleanup allocated buffers and free memory
714
 */
715
static inline void
716
CopyMultiInsertInfoCleanup(CopyMultiInsertInfo *miinfo)
717
0
{
718
0
  ListCell   *lc;
719
720
0
  foreach(lc, miinfo->multiInsertBuffers)
721
0
    CopyMultiInsertBufferCleanup(miinfo, lfirst(lc));
722
723
0
  list_free(miinfo->multiInsertBuffers);
724
0
}
725
726
/*
727
 * Get the next TupleTableSlot that the next tuple should be stored in.
728
 *
729
 * Callers must ensure that the buffer is not full.
730
 *
731
 * Note: 'miinfo' is unused but has been included for consistency with the
732
 * other functions in this area.
733
 */
734
static inline TupleTableSlot *
735
CopyMultiInsertInfoNextFreeSlot(CopyMultiInsertInfo *miinfo,
736
                ResultRelInfo *rri)
737
0
{
738
0
  CopyMultiInsertBuffer *buffer = rri->ri_CopyMultiInsertBuffer;
739
0
  int     nused;
740
741
0
  Assert(buffer != NULL);
742
0
  Assert(buffer->nused < MAX_BUFFERED_TUPLES);
743
744
0
  nused = buffer->nused;
745
746
0
  if (buffer->slots[nused] == NULL)
747
0
    buffer->slots[nused] = table_slot_create(rri->ri_RelationDesc, NULL);
748
0
  return buffer->slots[nused];
749
0
}
750
751
/*
752
 * Record the previously reserved TupleTableSlot that was reserved by
753
 * CopyMultiInsertInfoNextFreeSlot as being consumed.
754
 */
755
static inline void
756
CopyMultiInsertInfoStore(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri,
757
             TupleTableSlot *slot, int tuplen, uint64 lineno)
758
0
{
759
0
  CopyMultiInsertBuffer *buffer = rri->ri_CopyMultiInsertBuffer;
760
761
0
  Assert(buffer != NULL);
762
0
  Assert(slot == buffer->slots[buffer->nused]);
763
764
  /* Store the line number so we can properly report any errors later */
765
0
  buffer->linenos[buffer->nused] = lineno;
766
767
  /* Record this slot as being used */
768
0
  buffer->nused++;
769
770
  /* Update how many tuples are stored and their size */
771
0
  miinfo->bufferedTuples++;
772
0
  miinfo->bufferedBytes += tuplen;
773
0
}
774
775
/*
776
 * Copy FROM file to relation.
 *
 * Reads rows with NextCopyFrom() until it reports that there are no more,
 * and inserts each one into cstate->rel or, for a partitioned target, into
 * the partition chosen by ExecFindPartition().  Along the way the COPY
 * WHERE clause is applied and statement-level and row-level INSERT
 * triggers are fired.  Rows are either inserted one at a time (CIM_SINGLE)
 * or queued in multi-insert buffers (CIM_MULTI / CIM_MULTI_CONDITIONAL),
 * depending on the insert method chosen below.
 *
 * Returns the final value of the 'processed' counter.
777
 */
778
uint64
779
CopyFrom(CopyFromState cstate)
780
0
{
781
0
  ResultRelInfo *resultRelInfo;
782
0
  ResultRelInfo *target_resultRelInfo;
783
0
  ResultRelInfo *prevResultRelInfo = NULL;
784
0
  EState     *estate = CreateExecutorState(); /* for ExecConstraints() */
785
0
  ModifyTableState *mtstate;
786
0
  ExprContext *econtext;
787
0
  TupleTableSlot *singleslot = NULL;
788
0
  MemoryContext oldcontext = CurrentMemoryContext;
789
790
0
  PartitionTupleRouting *proute = NULL;
791
0
  ErrorContextCallback errcallback;
792
0
  CommandId mycid = GetCurrentCommandId(true);
793
0
  int     ti_options = 0; /* start with default options for insert */
794
0
  BulkInsertState bistate = NULL;
795
0
  CopyInsertMethod insertMethod;
796
0
  CopyMultiInsertInfo multiInsertInfo = {0};  /* pacify compiler */
797
0
  int64   processed = 0;
798
0
  int64   excluded = 0;
799
0
  bool    has_before_insert_row_trig;
800
0
  bool    has_instead_insert_row_trig;
801
0
  bool    leafpart_use_multi_insert = false;
802
803
0
  /* We expect a target relation and exactly one range table entry. */
  Assert(cstate->rel);
804
0
  Assert(list_length(cstate->range_table) == 1);
805
806
0
  /* Any ON_ERROR setting other than STOP requires escontext to be set. */
  if (cstate->opts.on_error != COPY_ON_ERROR_STOP)
807
0
    Assert(cstate->escontext);
808
809
  /*
810
   * The target must be a plain, foreign, or partitioned relation, or have
811
   * an INSTEAD OF INSERT row trigger.  (Currently, such triggers are only
812
   * allowed on views, so we only hint about them in the view case.)
813
   */
814
0
  if (cstate->rel->rd_rel->relkind != RELKIND_RELATION &&
815
0
    cstate->rel->rd_rel->relkind != RELKIND_FOREIGN_TABLE &&
816
0
    cstate->rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE &&
817
0
    !(cstate->rel->trigdesc &&
818
0
      cstate->rel->trigdesc->trig_insert_instead_row))
819
0
  {
820
0
    if (cstate->rel->rd_rel->relkind == RELKIND_VIEW)
821
0
      ereport(ERROR,
822
0
          (errcode(ERRCODE_WRONG_OBJECT_TYPE),
823
0
           errmsg("cannot copy to view \"%s\"",
824
0
              RelationGetRelationName(cstate->rel)),
825
0
           errhint("To enable copying to a view, provide an INSTEAD OF INSERT trigger.")));
826
0
    else if (cstate->rel->rd_rel->relkind == RELKIND_MATVIEW)
827
0
      ereport(ERROR,
828
0
          (errcode(ERRCODE_WRONG_OBJECT_TYPE),
829
0
           errmsg("cannot copy to materialized view \"%s\"",
830
0
              RelationGetRelationName(cstate->rel))));
831
0
    else if (cstate->rel->rd_rel->relkind == RELKIND_SEQUENCE)
832
0
      ereport(ERROR,
833
0
          (errcode(ERRCODE_WRONG_OBJECT_TYPE),
834
0
           errmsg("cannot copy to sequence \"%s\"",
835
0
              RelationGetRelationName(cstate->rel))));
836
0
    else
837
0
      ereport(ERROR,
838
0
          (errcode(ERRCODE_WRONG_OBJECT_TYPE),
839
0
           errmsg("cannot copy to non-table relation \"%s\"",
840
0
              RelationGetRelationName(cstate->rel))));
841
0
  }
842
843
  /*
844
   * If the target file is new-in-transaction, we assume that checking FSM
845
   * for free space is a waste of time.  This could possibly be wrong, but
846
   * it's unlikely.
847
   */
848
0
  if (RELKIND_HAS_STORAGE(cstate->rel->rd_rel->relkind) &&
849
0
    (cstate->rel->rd_createSubid != InvalidSubTransactionId ||
850
0
     cstate->rel->rd_firstRelfilelocatorSubid != InvalidSubTransactionId))
851
0
    ti_options |= TABLE_INSERT_SKIP_FSM;
852
853
  /*
854
   * Optimize if new relation storage was created in this subxact or one of
855
   * its committed children and we won't see those rows later as part of an
856
   * earlier scan or command. The subxact test ensures that if this subxact
857
   * aborts then the frozen rows won't be visible after xact cleanup.  Note
858
   * that the stronger test of exactly which subtransaction created it is
859
   * crucial for correctness of this optimization. The test for an earlier
860
   * scan or command tolerates false negatives. FREEZE causes other sessions
861
   * to see rows they would not see under MVCC, and a false negative merely
862
   * spreads that anomaly to the current session.
863
   */
864
0
  if (cstate->opts.freeze)
865
0
  {
866
    /*
867
     * We currently disallow COPY FREEZE on partitioned tables.  The
868
     * reason for this is that we've simply not yet opened the partitions
869
     * to determine if the optimization can be applied to them.  We could
870
     * go and open them all here, but doing so may be quite a costly
871
     * overhead for small copies.  In any case, we may just end up routing
872
     * tuples to a small number of partitions.  It seems better just to
873
     * raise an ERROR for partitioned tables.
874
     */
875
0
    if (cstate->rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
876
0
    {
877
0
      ereport(ERROR,
878
0
          (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
879
0
           errmsg("cannot perform COPY FREEZE on a partitioned table")));
880
0
    }
881
882
    /* There's currently no support for COPY FREEZE on foreign tables. */
883
0
    if (cstate->rel->rd_rel->relkind == RELKIND_FOREIGN_TABLE)
884
0
      ereport(ERROR,
885
0
          (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
886
0
           errmsg("cannot perform COPY FREEZE on a foreign table")));
887
888
    /*
889
     * Tolerate one registration for the benefit of FirstXactSnapshot.
890
     * Scan-bearing queries generally create at least two registrations,
891
     * though relying on that is fragile, as is ignoring ActiveSnapshot.
892
     * Clear CatalogSnapshot to avoid counting its registration.  We'll
893
     * still detect ongoing catalog scans, each of which separately
894
     * registers the snapshot it uses.
895
     */
896
0
    InvalidateCatalogSnapshot();
897
0
    if (!ThereAreNoPriorRegisteredSnapshots() || !ThereAreNoReadyPortals())
898
0
      ereport(ERROR,
899
0
          (errcode(ERRCODE_INVALID_TRANSACTION_STATE),
900
0
           errmsg("cannot perform COPY FREEZE because of prior transaction activity")));
901
902
0
    if (cstate->rel->rd_createSubid != GetCurrentSubTransactionId() &&
903
0
      cstate->rel->rd_newRelfilelocatorSubid != GetCurrentSubTransactionId())
904
0
      ereport(ERROR,
905
0
          (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
906
0
           errmsg("cannot perform COPY FREEZE because the table was not created or truncated in the current subtransaction")));
907
908
0
    ti_options |= TABLE_INSERT_FROZEN;
909
0
  }
910
911
  /*
912
   * We need a ResultRelInfo so we can use the regular executor's
913
   * index-entry-making machinery.  (There used to be a huge amount of code
914
   * here that basically duplicated execUtils.c ...)
915
   */
916
0
  ExecInitRangeTable(estate, cstate->range_table, cstate->rteperminfos,
917
0
             bms_make_singleton(1));
918
0
  resultRelInfo = target_resultRelInfo = makeNode(ResultRelInfo);
919
0
  ExecInitResultRelation(estate, resultRelInfo, 1);
920
921
  /* Verify the named relation is a valid target for INSERT */
922
0
  CheckValidResultRel(resultRelInfo, CMD_INSERT, ONCONFLICT_NONE, NIL);
923
924
0
  ExecOpenIndices(resultRelInfo, false);
925
926
  /*
927
   * Set up a ModifyTableState so we can let FDW(s) init themselves for
928
   * foreign-table result relation(s).
929
   */
930
0
  mtstate = makeNode(ModifyTableState);
931
0
  mtstate->ps.plan = NULL;
932
0
  mtstate->ps.state = estate;
933
0
  mtstate->operation = CMD_INSERT;
934
0
  mtstate->mt_nrels = 1;
935
0
  mtstate->resultRelInfo = resultRelInfo;
936
0
  mtstate->rootResultRelInfo = resultRelInfo;
937
938
0
  if (resultRelInfo->ri_FdwRoutine != NULL &&
939
0
    resultRelInfo->ri_FdwRoutine->BeginForeignInsert != NULL)
940
0
    resultRelInfo->ri_FdwRoutine->BeginForeignInsert(mtstate,
941
0
                             resultRelInfo);
942
943
  /*
944
   * Also, if the named relation is a foreign table, determine if the FDW
945
   * supports batch insert and determine the batch size (a FDW may support
946
   * batching, but it may be disabled for the server/table).
947
   *
948
   * If the FDW does not support batching, we set the batch size to 1.
949
   */
950
0
  if (resultRelInfo->ri_FdwRoutine != NULL &&
951
0
    resultRelInfo->ri_FdwRoutine->GetForeignModifyBatchSize &&
952
0
    resultRelInfo->ri_FdwRoutine->ExecForeignBatchInsert)
953
0
    resultRelInfo->ri_BatchSize =
954
0
      resultRelInfo->ri_FdwRoutine->GetForeignModifyBatchSize(resultRelInfo);
955
0
  else
956
0
    resultRelInfo->ri_BatchSize = 1;
957
958
0
  Assert(resultRelInfo->ri_BatchSize >= 1);
959
960
  /* Prepare to catch AFTER triggers. */
961
0
  AfterTriggerBeginQuery();
962
963
  /*
964
   * If there are any triggers with transition tables on the named relation,
965
   * we need to be prepared to capture transition tuples.
966
   *
967
   * Because partition tuple routing would like to know about whether
968
   * transition capture is active, we also set it in mtstate, which is
969
   * passed to ExecFindPartition() below.
970
   */
971
0
  cstate->transition_capture = mtstate->mt_transition_capture =
972
0
    MakeTransitionCaptureState(cstate->rel->trigdesc,
973
0
                   RelationGetRelid(cstate->rel),
974
0
                   CMD_INSERT);
975
976
  /*
977
   * If the named relation is a partitioned table, initialize state for
978
   * CopyFrom tuple routing.
979
   */
980
0
  if (cstate->rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
981
0
    proute = ExecSetupPartitionTupleRouting(estate, cstate->rel);
982
983
0
  /* Initialize the qual expression for the COPY WHERE clause, if any. */
  if (cstate->whereClause)
984
0
    cstate->qualexpr = ExecInitQual(castNode(List, cstate->whereClause),
985
0
                    &mtstate->ps);
986
987
  /*
988
   * It's generally more efficient to prepare a bunch of tuples for
989
   * insertion, and insert them in one
990
   * table_multi_insert()/ExecForeignBatchInsert() call, than call
991
   * table_tuple_insert()/ExecForeignInsert() separately for every tuple.
992
   * However, there are a number of reasons why we might not be able to do
993
   * this.  These are explained below.
994
   */
995
0
  if (resultRelInfo->ri_TrigDesc != NULL &&
996
0
    (resultRelInfo->ri_TrigDesc->trig_insert_before_row ||
997
0
     resultRelInfo->ri_TrigDesc->trig_insert_instead_row))
998
0
  {
999
    /*
1000
     * Can't support multi-inserts when there are any BEFORE/INSTEAD OF
1001
     * triggers on the table. Such triggers might query the table we're
1002
     * inserting into and act differently if the tuples that have already
1003
     * been processed and prepared for insertion are not there.
1004
     */
1005
0
    insertMethod = CIM_SINGLE;
1006
0
  }
1007
0
  else if (resultRelInfo->ri_FdwRoutine != NULL &&
1008
0
       resultRelInfo->ri_BatchSize == 1)
1009
0
  {
1010
    /*
1011
     * Can't support multi-inserts to a foreign table if the FDW does not
1012
     * support batching, or it's disabled for the server or foreign table.
1013
     */
1014
0
    insertMethod = CIM_SINGLE;
1015
0
  }
1016
0
  else if (proute != NULL && resultRelInfo->ri_TrigDesc != NULL &&
1017
0
       resultRelInfo->ri_TrigDesc->trig_insert_new_table)
1018
0
  {
1019
    /*
1020
     * For partitioned tables we can't support multi-inserts when there
1021
     * are any statement level insert triggers. It might be possible to
1022
     * allow partitioned tables with such triggers in the future, but for
1023
     * now, CopyMultiInsertInfoFlush expects that any after row insert and
1024
     * statement level insert triggers are on the same relation.
1025
     */
1026
0
    insertMethod = CIM_SINGLE;
1027
0
  }
1028
0
  else if (cstate->volatile_defexprs)
1029
0
  {
1030
    /*
1031
     * Can't support multi-inserts if there are any volatile default
1032
     * expressions in the table.  Similarly to the trigger case above,
1033
     * such expressions may query the table we're inserting into.
1034
     *
1035
     * Note: It does not matter if any partitions have any volatile
1036
     * default expressions as we use the defaults from the target of the
1037
     * COPY command.
1038
     */
1039
0
    insertMethod = CIM_SINGLE;
1040
0
  }
1041
0
  else if (contain_volatile_functions(cstate->whereClause))
1042
0
  {
1043
    /*
1044
     * Can't support multi-inserts if there are any volatile function
1045
     * expressions in WHERE clause.  Similarly to the trigger case above,
1046
     * such expressions may query the table we're inserting into.
1047
     *
1048
     * Note: the whereClause was already preprocessed in DoCopy(), so it's
1049
     * okay to use contain_volatile_functions() directly.
1050
     */
1051
0
    insertMethod = CIM_SINGLE;
1052
0
  }
1053
0
  else
1054
0
  {
1055
    /*
1056
     * For partitioned tables, we may still be able to perform bulk
1057
     * inserts.  However, the possibility of this depends on which types
1058
     * of triggers exist on the partition.  We must disable bulk inserts
1059
     * if the partition is a foreign table that can't use batching or it
1060
     * has any before row insert or insert instead triggers (same as we
1061
     * checked above for the parent table).  Since the partition's
1062
     * resultRelInfos are initialized only when we actually need to insert
1063
     * the first tuple into them, we must have the intermediate insert
1064
     * method of CIM_MULTI_CONDITIONAL to flag that we must later
1065
     * determine if we can use bulk-inserts for the partition being
1066
     * inserted into.
1067
     */
1068
0
    if (proute)
1069
0
      insertMethod = CIM_MULTI_CONDITIONAL;
1070
0
    else
1071
0
      insertMethod = CIM_MULTI;
1072
1073
0
    CopyMultiInsertInfoInit(&multiInsertInfo, resultRelInfo, cstate,
1074
0
                estate, mycid, ti_options);
1075
0
  }
1076
1077
  /*
1078
   * If not using batch mode (which allocates slots as needed) set up a
1079
   * tuple slot too. When inserting into a partitioned table, we also need
1080
   * one, even if we might batch insert, to read the tuple in the root
1081
   * partition's form.
1082
   */
1083
0
  if (insertMethod == CIM_SINGLE || insertMethod == CIM_MULTI_CONDITIONAL)
1084
0
  {
1085
0
    singleslot = table_slot_create(resultRelInfo->ri_RelationDesc,
1086
0
                     &estate->es_tupleTable);
1087
0
    bistate = GetBulkInsertState();
1088
0
  }
1089
1090
0
  /*
   * Note which row-level INSERT triggers exist on the target relation.
   * When routing tuples, these flags are recomputed in the loop below each
   * time the destination partition changes.
   */
  has_before_insert_row_trig = (resultRelInfo->ri_TrigDesc &&
1091
0
                  resultRelInfo->ri_TrigDesc->trig_insert_before_row);
1092
1093
0
  has_instead_insert_row_trig = (resultRelInfo->ri_TrigDesc &&
1094
0
                   resultRelInfo->ri_TrigDesc->trig_insert_instead_row);
1095
1096
  /*
1097
   * Check BEFORE STATEMENT insertion triggers. It's debatable whether we
1098
   * should do this for COPY, since it's not really an "INSERT" statement as
1099
   * such. However, executing these triggers maintains consistency with the
1100
   * EACH ROW triggers that we already fire on COPY.
1101
   */
1102
0
  ExecBSInsertTriggers(estate, resultRelInfo);
1103
1104
0
  econtext = GetPerTupleExprContext(estate);
1105
1106
  /* Set up callback to identify error line number */
1107
0
  errcallback.callback = CopyFromErrorCallback;
1108
0
  errcallback.arg = cstate;
1109
0
  errcallback.previous = error_context_stack;
1110
0
  error_context_stack = &errcallback;
1111
1112
0
  /*
   * Main loop: one iteration per input row.  The only exit is the break
   * taken when NextCopyFrom() returns false.
   */
  for (;;)
1113
0
  {
1114
0
    TupleTableSlot *myslot;
1115
0
    bool    skip_tuple;
1116
1117
0
    CHECK_FOR_INTERRUPTS();
1118
1119
    /*
1120
     * Reset the per-tuple exprcontext. We do this after every tuple, to
1121
     * clean-up after expression evaluations etc.
1122
     */
1123
0
    ResetPerTupleExprContext(estate);
1124
1125
    /* select slot to (initially) load row into */
1126
0
    if (insertMethod == CIM_SINGLE || proute)
1127
0
    {
1128
0
      myslot = singleslot;
1129
0
      Assert(myslot != NULL);
1130
0
    }
1131
0
    else
1132
0
    {
1133
0
      Assert(resultRelInfo == target_resultRelInfo);
1134
0
      Assert(insertMethod == CIM_MULTI);
1135
1136
0
      myslot = CopyMultiInsertInfoNextFreeSlot(&multiInsertInfo,
1137
0
                           resultRelInfo);
1138
0
    }
1139
1140
    /*
1141
     * Switch to per-tuple context before calling NextCopyFrom, which does
1142
     * evaluate default expressions etc. and requires per-tuple context.
1143
     */
1144
0
    MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
1145
1146
0
    ExecClearTuple(myslot);
1147
1148
    /* Directly store the values/nulls array in the slot */
1149
0
    if (!NextCopyFrom(cstate, econtext, myslot->tts_values, myslot->tts_isnull))
1150
0
      break;
1151
1152
0
    if (cstate->opts.on_error == COPY_ON_ERROR_IGNORE &&
1153
0
      cstate->escontext->error_occurred)
1154
0
    {
1155
      /*
1156
       * Soft error occurred, skip this tuple and just make
1157
       * ErrorSaveContext ready for the next NextCopyFrom. Since we
1158
       * don't set details_wanted and error_data is not to be filled,
1159
       * just resetting error_occurred is enough.
1160
       */
1161
0
      cstate->escontext->error_occurred = false;
1162
1163
      /* Report that this tuple was skipped by the ON_ERROR clause */
1164
0
      pgstat_progress_update_param(PROGRESS_COPY_TUPLES_SKIPPED,
1165
0
                     cstate->num_errors);
1166
1167
0
      if (cstate->opts.reject_limit > 0 &&
1168
0
        cstate->num_errors > cstate->opts.reject_limit)
1169
0
        ereport(ERROR,
1170
0
            (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
1171
0
             errmsg("skipped more than REJECT_LIMIT (%" PRId64 ") rows due to data type incompatibility",
1172
0
                cstate->opts.reject_limit)));
1173
1174
      /* Repeat NextCopyFrom() until no soft error occurs */
1175
0
      continue;
1176
0
    }
1177
1178
0
    ExecStoreVirtualTuple(myslot);
1179
1180
    /*
1181
     * Constraints and where clause might reference the tableoid column,
1182
     * so (re-)initialize tts_tableOid before evaluating them.
1183
     */
1184
0
    myslot->tts_tableOid = RelationGetRelid(target_resultRelInfo->ri_RelationDesc);
1185
1186
    /* Triggers and stuff need to be invoked in query context. */
1187
0
    MemoryContextSwitchTo(oldcontext);
1188
1189
0
    if (cstate->whereClause)
1190
0
    {
1191
0
      econtext->ecxt_scantuple = myslot;
1192
      /* Skip items that don't match COPY's WHERE clause */
1193
0
      if (!ExecQual(cstate->qualexpr, econtext))
1194
0
      {
1195
        /*
1196
         * Report that this tuple was filtered out by the WHERE
1197
         * clause.
1198
         */
1199
0
        pgstat_progress_update_param(PROGRESS_COPY_TUPLES_EXCLUDED,
1200
0
                       ++excluded);
1201
0
        continue;
1202
0
      }
1203
0
    }
1204
1205
    /* Determine the partition to insert the tuple into */
1206
0
    if (proute)
1207
0
    {
1208
0
      TupleConversionMap *map;
1209
1210
      /*
1211
       * Attempt to find a partition suitable for this tuple.
1212
       * ExecFindPartition() will raise an error if none can be found or
1213
       * if the found partition is not suitable for INSERTs.
1214
       */
1215
0
      resultRelInfo = ExecFindPartition(mtstate, target_resultRelInfo,
1216
0
                        proute, myslot, estate);
1217
1218
0
      if (prevResultRelInfo != resultRelInfo)
1219
0
      {
1220
        /* Determine which triggers exist on this partition */
1221
0
        has_before_insert_row_trig = (resultRelInfo->ri_TrigDesc &&
1222
0
                        resultRelInfo->ri_TrigDesc->trig_insert_before_row);
1223
1224
0
        has_instead_insert_row_trig = (resultRelInfo->ri_TrigDesc &&
1225
0
                         resultRelInfo->ri_TrigDesc->trig_insert_instead_row);
1226
1227
        /*
1228
         * Disable multi-inserts when the partition has BEFORE/INSTEAD
1229
         * OF triggers, or if the partition is a foreign table that
1230
         * can't use batching.
1231
         */
1232
0
        leafpart_use_multi_insert = insertMethod == CIM_MULTI_CONDITIONAL &&
1233
0
          !has_before_insert_row_trig &&
1234
0
          !has_instead_insert_row_trig &&
1235
0
          (resultRelInfo->ri_FdwRoutine == NULL ||
1236
0
           resultRelInfo->ri_BatchSize > 1);
1237
1238
        /* Set the multi-insert buffer to use for this partition. */
1239
0
        if (leafpart_use_multi_insert)
1240
0
        {
1241
0
          if (resultRelInfo->ri_CopyMultiInsertBuffer == NULL)
1242
0
            CopyMultiInsertInfoSetupBuffer(&multiInsertInfo,
1243
0
                             resultRelInfo);
1244
0
        }
1245
0
        else if (insertMethod == CIM_MULTI_CONDITIONAL &&
1246
0
             !CopyMultiInsertInfoIsEmpty(&multiInsertInfo))
1247
0
        {
1248
          /*
1249
           * Flush pending inserts if this partition can't use
1250
           * batching, so rows are visible to triggers etc.
1251
           */
1252
0
          CopyMultiInsertInfoFlush(&multiInsertInfo,
1253
0
                       resultRelInfo,
1254
0
                       &processed);
1255
0
        }
1256
1257
0
        if (bistate != NULL)
1258
0
          ReleaseBulkInsertStatePin(bistate);
1259
0
        prevResultRelInfo = resultRelInfo;
1260
0
      }
1261
1262
      /*
1263
       * If we're capturing transition tuples, we might need to convert
1264
       * from the partition rowtype to root rowtype. But if there are no
1265
       * BEFORE triggers on the partition that could change the tuple,
1266
       * we can just remember the original unconverted tuple to avoid a
1267
       * needless round trip conversion.
1268
       */
1269
0
      if (cstate->transition_capture != NULL)
1270
0
        cstate->transition_capture->tcs_original_insert_tuple =
1271
0
          !has_before_insert_row_trig ? myslot : NULL;
1272
1273
      /*
1274
       * We might need to convert from the root rowtype to the partition
1275
       * rowtype.
1276
       */
1277
0
      map = ExecGetRootToChildMap(resultRelInfo, estate);
1278
0
      if (insertMethod == CIM_SINGLE || !leafpart_use_multi_insert)
1279
0
      {
1280
        /* non batch insert */
1281
0
        if (map != NULL)
1282
0
        {
1283
0
          TupleTableSlot *new_slot;
1284
1285
0
          new_slot = resultRelInfo->ri_PartitionTupleSlot;
1286
0
          myslot = execute_attr_map_slot(map->attrMap, myslot, new_slot);
1287
0
        }
1288
0
      }
1289
0
      else
1290
0
      {
1291
        /*
1292
         * Prepare to queue up tuple for later batch insert into
1293
         * current partition.
1294
         */
1295
0
        TupleTableSlot *batchslot;
1296
1297
        /* no other path available for partitioned table */
1298
0
        Assert(insertMethod == CIM_MULTI_CONDITIONAL);
1299
1300
0
        batchslot = CopyMultiInsertInfoNextFreeSlot(&multiInsertInfo,
1301
0
                              resultRelInfo);
1302
1303
0
        if (map != NULL)
1304
0
          myslot = execute_attr_map_slot(map->attrMap, myslot,
1305
0
                           batchslot);
1306
0
        else
1307
0
        {
1308
          /*
1309
           * This looks more expensive than it is (Believe me, I
1310
           * optimized it away. Twice.). The input is in virtual
1311
           * form, and we'll materialize the slot below - for most
1312
           * slot types the copy performs the work materialization
1313
           * would later require anyway.
1314
           */
1315
0
          ExecCopySlot(batchslot, myslot);
1316
0
          myslot = batchslot;
1317
0
        }
1318
0
      }
1319
1320
      /* ensure that triggers etc see the right relation  */
1321
0
      myslot->tts_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
1322
0
    }
1323
1324
0
    /* Assume the row is kept unless a BEFORE ROW trigger rejects it. */
    skip_tuple = false;
1325
1326
    /* BEFORE ROW INSERT Triggers */
1327
0
    if (has_before_insert_row_trig)
1328
0
    {
1329
0
      if (!ExecBRInsertTriggers(estate, resultRelInfo, myslot))
1330
0
        skip_tuple = true; /* "do nothing" */
1331
0
    }
1332
1333
0
    if (!skip_tuple)
1334
0
    {
1335
      /*
1336
       * If there is an INSTEAD OF INSERT ROW trigger, let it handle the
1337
       * tuple.  Otherwise, proceed with inserting the tuple into the
1338
       * table or foreign table.
1339
       */
1340
0
      if (has_instead_insert_row_trig)
1341
0
      {
1342
0
        ExecIRInsertTriggers(estate, resultRelInfo, myslot);
1343
0
      }
1344
0
      else
1345
0
      {
1346
        /* Compute stored generated columns */
1347
0
        if (resultRelInfo->ri_RelationDesc->rd_att->constr &&
1348
0
          resultRelInfo->ri_RelationDesc->rd_att->constr->has_generated_stored)
1349
0
          ExecComputeStoredGenerated(resultRelInfo, estate, myslot,
1350
0
                         CMD_INSERT);
1351
1352
        /*
1353
         * If the target is a plain table, check the constraints of
1354
         * the tuple.
1355
         */
1356
0
        if (resultRelInfo->ri_FdwRoutine == NULL &&
1357
0
          resultRelInfo->ri_RelationDesc->rd_att->constr)
1358
0
          ExecConstraints(resultRelInfo, myslot, estate);
1359
1360
        /*
1361
         * Also check the tuple against the partition constraint, if
1362
         * there is one; except that if we got here via tuple-routing,
1363
         * we don't need to if there's no BR trigger defined on the
1364
         * partition.
1365
         */
1366
0
        if (resultRelInfo->ri_RelationDesc->rd_rel->relispartition &&
1367
0
          (proute == NULL || has_before_insert_row_trig))
1368
0
          ExecPartitionCheck(resultRelInfo, myslot, estate, true);
1369
1370
        /* Store the slot in the multi-insert buffer, when enabled. */
1371
0
        if (insertMethod == CIM_MULTI || leafpart_use_multi_insert)
1372
0
        {
1373
          /*
1374
           * The slot previously might point into the per-tuple
1375
           * context. For batching it needs to be longer lived.
1376
           */
1377
0
          ExecMaterializeSlot(myslot);
1378
1379
          /* Add this tuple to the tuple buffer */
1380
0
          CopyMultiInsertInfoStore(&multiInsertInfo,
1381
0
                       resultRelInfo, myslot,
1382
0
                       cstate->line_buf.len,
1383
0
                       cstate->cur_lineno);
1384
1385
          /*
1386
           * If enough inserts have queued up, then flush all
1387
           * buffers out to their tables.
1388
           */
1389
0
          if (CopyMultiInsertInfoIsFull(&multiInsertInfo))
1390
0
            CopyMultiInsertInfoFlush(&multiInsertInfo,
1391
0
                         resultRelInfo,
1392
0
                         &processed);
1393
1394
          /*
1395
           * We delay updating the row counter and progress of the
1396
           * COPY command until after writing the tuples stored in
1397
           * the buffer out to the table, as in single insert mode.
1398
           * See CopyMultiInsertBufferFlush().
1399
           */
1400
0
          continue; /* next tuple please */
1401
0
        }
1402
0
        else
1403
0
        {
1404
0
          List     *recheckIndexes = NIL;
1405
1406
          /* OK, store the tuple */
1407
0
          if (resultRelInfo->ri_FdwRoutine != NULL)
1408
0
          {
1409
0
            myslot = resultRelInfo->ri_FdwRoutine->ExecForeignInsert(estate,
1410
0
                                         resultRelInfo,
1411
0
                                         myslot,
1412
0
                                         NULL);
1413
1414
0
            if (myslot == NULL) /* "do nothing" */
1415
0
              continue; /* next tuple please */
1416
1417
            /*
1418
             * AFTER ROW Triggers might reference the tableoid
1419
             * column, so (re-)initialize tts_tableOid before
1420
             * evaluating them.
1421
             */
1422
0
            myslot->tts_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
1423
0
          }
1424
0
          else
1425
0
          {
1426
            /* OK, store the tuple and create index entries for it */
1427
0
            table_tuple_insert(resultRelInfo->ri_RelationDesc,
1428
0
                       myslot, mycid, ti_options, bistate);
1429
1430
0
            if (resultRelInfo->ri_NumIndices > 0)
1431
0
              recheckIndexes = ExecInsertIndexTuples(resultRelInfo,
1432
0
                                   myslot,
1433
0
                                   estate,
1434
0
                                   false,
1435
0
                                   false,
1436
0
                                   NULL,
1437
0
                                   NIL,
1438
0
                                   false);
1439
0
          }
1440
1441
          /* AFTER ROW INSERT Triggers */
1442
0
          ExecARInsertTriggers(estate, resultRelInfo, myslot,
1443
0
                     recheckIndexes, cstate->transition_capture);
1444
1445
0
          list_free(recheckIndexes);
1446
0
        }
1447
0
      }
1448
1449
      /*
1450
       * We count only tuples not suppressed by a BEFORE INSERT trigger
1451
       * or FDW; this is the same definition used by nodeModifyTable.c
1452
       * for counting tuples inserted by an INSERT command.  Update
1453
       * progress of the COPY command as well.
1454
       */
1455
0
      pgstat_progress_update_param(PROGRESS_COPY_TUPLES_PROCESSED,
1456
0
                     ++processed);
1457
0
    }
1458
0
  }
1459
1460
  /* Flush any remaining buffered tuples */
1461
0
  if (insertMethod != CIM_SINGLE)
1462
0
  {
1463
0
    if (!CopyMultiInsertInfoIsEmpty(&multiInsertInfo))
1464
0
      CopyMultiInsertInfoFlush(&multiInsertInfo, NULL, &processed);
1465
0
  }
1466
1467
  /* Done, clean up */
1468
0
  error_context_stack = errcallback.previous;
1469
1470
0
  /*
   * If rows were skipped under a non-STOP ON_ERROR setting, report the
   * count with a NOTICE, unless log_verbosity is below the default level.
   */
  if (cstate->opts.on_error != COPY_ON_ERROR_STOP &&
1471
0
    cstate->num_errors > 0 &&
1472
0
    cstate->opts.log_verbosity >= COPY_LOG_VERBOSITY_DEFAULT)
1473
0
    ereport(NOTICE,
1474
0
        errmsg_plural("%" PRIu64 " row was skipped due to data type incompatibility",
1475
0
                "%" PRIu64 " rows were skipped due to data type incompatibility",
1476
0
                cstate->num_errors,
1477
0
                cstate->num_errors));
1478
1479
0
  if (bistate != NULL)
1480
0
    FreeBulkInsertState(bistate);
1481
1482
0
  MemoryContextSwitchTo(oldcontext);
1483
1484
  /* Execute AFTER STATEMENT insertion triggers */
1485
0
  ExecASInsertTriggers(estate, target_resultRelInfo, cstate->transition_capture);
1486
1487
  /* Handle queued AFTER triggers */
1488
0
  AfterTriggerEndQuery(estate);
1489
1490
0
  ExecResetTupleTable(estate->es_tupleTable, false);
1491
1492
  /* Allow the FDW to shut down */
1493
0
  if (target_resultRelInfo->ri_FdwRoutine != NULL &&
1494
0
    target_resultRelInfo->ri_FdwRoutine->EndForeignInsert != NULL)
1495
0
    target_resultRelInfo->ri_FdwRoutine->EndForeignInsert(estate,
1496
0
                                target_resultRelInfo);
1497
1498
  /* Tear down the multi-insert buffer data */
1499
0
  if (insertMethod != CIM_SINGLE)
1500
0
    CopyMultiInsertInfoCleanup(&multiInsertInfo);
1501
1502
  /* Close all the partitioned tables, leaf partitions, and their indices */
1503
0
  if (proute)
1504
0
    ExecCleanupTupleRouting(mtstate, proute);
1505
1506
  /* Close the result relations, including any trigger target relations */
1507
0
  ExecCloseResultRelations(estate);
1508
0
  ExecCloseRangeTableRelations(estate);
1509
1510
0
  FreeExecutorState(estate);
1511
1512
0
  return processed;
1513
0
}
1514
1515
/*
1516
 * Setup to read tuples from a file for COPY FROM.
1517
 *
1518
 * 'rel': Used as a template for the tuples
1519
 * 'whereClause': WHERE clause from the COPY FROM command
1520
 * 'filename': Name of server-local file to read, NULL for STDIN
1521
 * 'is_program': true if 'filename' is program to execute
1522
 * 'data_source_cb': callback that provides the input data
1523
 * 'attnamelist': List of char *, columns to include. NIL selects all cols.
1524
 * 'options': List of DefElem. See copy_opt_item in gram.y for selections.
1525
 *
1526
 * Returns a CopyFromState, to be passed to NextCopyFrom and related functions.
1527
 */
1528
CopyFromState
1529
BeginCopyFrom(ParseState *pstate,
1530
        Relation rel,
1531
        Node *whereClause,
1532
        const char *filename,
1533
        bool is_program,
1534
        copy_data_source_cb data_source_cb,
1535
        List *attnamelist,
1536
        List *options)
1537
0
{
1538
0
  CopyFromState cstate;
1539
0
  bool    pipe = (filename == NULL);
1540
0
  TupleDesc tupDesc;
1541
0
  AttrNumber  num_phys_attrs,
1542
0
        num_defaults;
1543
0
  FmgrInfo   *in_functions;
1544
0
  Oid      *typioparams;
1545
0
  int      *defmap;
1546
0
  ExprState **defexprs;
1547
0
  MemoryContext oldcontext;
1548
0
  bool    volatile_defexprs;
1549
0
  const int progress_cols[] = {
1550
0
    PROGRESS_COPY_COMMAND,
1551
0
    PROGRESS_COPY_TYPE,
1552
0
    PROGRESS_COPY_BYTES_TOTAL
1553
0
  };
1554
0
  int64   progress_vals[] = {
1555
0
    PROGRESS_COPY_COMMAND_FROM,
1556
0
    0,
1557
0
    0
1558
0
  };
1559
1560
  /* Allocate workspace and zero all fields */
1561
0
  cstate = (CopyFromStateData *) palloc0(sizeof(CopyFromStateData));
1562
1563
  /*
1564
   * We allocate everything used by a cstate in a new memory context. This
1565
   * avoids memory leaks during repeated use of COPY in a query.
1566
   */
1567
0
  cstate->copycontext = AllocSetContextCreate(CurrentMemoryContext,
1568
0
                        "COPY",
1569
0
                        ALLOCSET_DEFAULT_SIZES);
1570
1571
0
  oldcontext = MemoryContextSwitchTo(cstate->copycontext);
1572
1573
  /* Extract options from the statement node tree */
1574
0
  ProcessCopyOptions(pstate, &cstate->opts, true /* is_from */ , options);
1575
1576
  /* Set the format routine */
1577
0
  cstate->routine = CopyFromGetRoutine(&cstate->opts);
1578
1579
  /* Process the target relation */
1580
0
  cstate->rel = rel;
1581
1582
0
  tupDesc = RelationGetDescr(cstate->rel);
1583
1584
  /* process common options or initialization */
1585
1586
  /* Generate or convert list of attributes to process */
1587
0
  cstate->attnumlist = CopyGetAttnums(tupDesc, cstate->rel, attnamelist);
1588
1589
0
  num_phys_attrs = tupDesc->natts;
1590
1591
  /* Convert FORCE_NOT_NULL name list to per-column flags, check validity */
1592
0
  cstate->opts.force_notnull_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
1593
0
  if (cstate->opts.force_notnull_all)
1594
0
    MemSet(cstate->opts.force_notnull_flags, true, num_phys_attrs * sizeof(bool));
1595
0
  else if (cstate->opts.force_notnull)
1596
0
  {
1597
0
    List     *attnums;
1598
0
    ListCell   *cur;
1599
1600
0
    attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->opts.force_notnull);
1601
1602
0
    foreach(cur, attnums)
1603
0
    {
1604
0
      int     attnum = lfirst_int(cur);
1605
0
      Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1);
1606
1607
0
      if (!list_member_int(cstate->attnumlist, attnum))
1608
0
        ereport(ERROR,
1609
0
            (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
1610
        /*- translator: first %s is the name of a COPY option, e.g. FORCE_NOT_NULL */
1611
0
             errmsg("%s column \"%s\" not referenced by COPY",
1612
0
                "FORCE_NOT_NULL", NameStr(attr->attname))));
1613
0
      cstate->opts.force_notnull_flags[attnum - 1] = true;
1614
0
    }
1615
0
  }
1616
1617
  /* Set up soft error handler for ON_ERROR */
1618
0
  if (cstate->opts.on_error != COPY_ON_ERROR_STOP)
1619
0
  {
1620
0
    cstate->escontext = makeNode(ErrorSaveContext);
1621
0
    cstate->escontext->type = T_ErrorSaveContext;
1622
0
    cstate->escontext->error_occurred = false;
1623
1624
    /*
1625
     * Currently we only support COPY_ON_ERROR_IGNORE. We'll add other
1626
     * options later
1627
     */
1628
0
    if (cstate->opts.on_error == COPY_ON_ERROR_IGNORE)
1629
0
      cstate->escontext->details_wanted = false;
1630
0
  }
1631
0
  else
1632
0
    cstate->escontext = NULL;
1633
1634
  /* Convert FORCE_NULL name list to per-column flags, check validity */
1635
0
  cstate->opts.force_null_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
1636
0
  if (cstate->opts.force_null_all)
1637
0
    MemSet(cstate->opts.force_null_flags, true, num_phys_attrs * sizeof(bool));
1638
0
  else if (cstate->opts.force_null)
1639
0
  {
1640
0
    List     *attnums;
1641
0
    ListCell   *cur;
1642
1643
0
    attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->opts.force_null);
1644
1645
0
    foreach(cur, attnums)
1646
0
    {
1647
0
      int     attnum = lfirst_int(cur);
1648
0
      Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1);
1649
1650
0
      if (!list_member_int(cstate->attnumlist, attnum))
1651
0
        ereport(ERROR,
1652
0
            (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
1653
        /*- translator: first %s is the name of a COPY option, e.g. FORCE_NOT_NULL */
1654
0
             errmsg("%s column \"%s\" not referenced by COPY",
1655
0
                "FORCE_NULL", NameStr(attr->attname))));
1656
0
      cstate->opts.force_null_flags[attnum - 1] = true;
1657
0
    }
1658
0
  }
1659
1660
  /* Convert convert_selectively name list to per-column flags */
1661
0
  if (cstate->opts.convert_selectively)
1662
0
  {
1663
0
    List     *attnums;
1664
0
    ListCell   *cur;
1665
1666
0
    cstate->convert_select_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
1667
1668
0
    attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->opts.convert_select);
1669
1670
0
    foreach(cur, attnums)
1671
0
    {
1672
0
      int     attnum = lfirst_int(cur);
1673
0
      Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1);
1674
1675
0
      if (!list_member_int(cstate->attnumlist, attnum))
1676
0
        ereport(ERROR,
1677
0
            (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
1678
0
             errmsg_internal("selected column \"%s\" not referenced by COPY",
1679
0
                     NameStr(attr->attname))));
1680
0
      cstate->convert_select_flags[attnum - 1] = true;
1681
0
    }
1682
0
  }
1683
1684
  /* Use client encoding when ENCODING option is not specified. */
1685
0
  if (cstate->opts.file_encoding < 0)
1686
0
    cstate->file_encoding = pg_get_client_encoding();
1687
0
  else
1688
0
    cstate->file_encoding = cstate->opts.file_encoding;
1689
1690
  /*
1691
   * Look up encoding conversion function.
1692
   */
1693
0
  if (cstate->file_encoding == GetDatabaseEncoding() ||
1694
0
    cstate->file_encoding == PG_SQL_ASCII ||
1695
0
    GetDatabaseEncoding() == PG_SQL_ASCII)
1696
0
  {
1697
0
    cstate->need_transcoding = false;
1698
0
  }
1699
0
  else
1700
0
  {
1701
0
    cstate->need_transcoding = true;
1702
0
    cstate->conversion_proc = FindDefaultConversionProc(cstate->file_encoding,
1703
0
                              GetDatabaseEncoding());
1704
0
    if (!OidIsValid(cstate->conversion_proc))
1705
0
      ereport(ERROR,
1706
0
          (errcode(ERRCODE_UNDEFINED_FUNCTION),
1707
0
           errmsg("default conversion function for encoding \"%s\" to \"%s\" does not exist",
1708
0
              pg_encoding_to_char(cstate->file_encoding),
1709
0
              pg_encoding_to_char(GetDatabaseEncoding()))));
1710
0
  }
1711
1712
0
  cstate->copy_src = COPY_FILE; /* default */
1713
1714
0
  cstate->whereClause = whereClause;
1715
1716
  /* Initialize state variables */
1717
0
  cstate->eol_type = EOL_UNKNOWN;
1718
0
  cstate->cur_relname = RelationGetRelationName(cstate->rel);
1719
0
  cstate->cur_lineno = 0;
1720
0
  cstate->cur_attname = NULL;
1721
0
  cstate->cur_attval = NULL;
1722
0
  cstate->relname_only = false;
1723
1724
  /*
1725
   * Allocate buffers for the input pipeline.
1726
   *
1727
   * attribute_buf and raw_buf are used in both text and binary modes, but
1728
   * input_buf and line_buf only in text mode.
1729
   */
1730
0
  cstate->raw_buf = palloc(RAW_BUF_SIZE + 1);
1731
0
  cstate->raw_buf_index = cstate->raw_buf_len = 0;
1732
0
  cstate->raw_reached_eof = false;
1733
1734
0
  initStringInfo(&cstate->attribute_buf);
1735
1736
  /* Assign range table and rteperminfos, we'll need them in CopyFrom. */
1737
0
  if (pstate)
1738
0
  {
1739
0
    cstate->range_table = pstate->p_rtable;
1740
0
    cstate->rteperminfos = pstate->p_rteperminfos;
1741
0
  }
1742
1743
0
  num_defaults = 0;
1744
0
  volatile_defexprs = false;
1745
1746
  /*
1747
   * Pick up the required catalog information for each attribute in the
1748
   * relation, including the input function, the element type (to pass to
1749
   * the input function), and info about defaults and constraints. (Which
1750
   * input function we use depends on text/binary format choice.)
1751
   */
1752
0
  in_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo));
1753
0
  typioparams = (Oid *) palloc(num_phys_attrs * sizeof(Oid));
1754
0
  defmap = (int *) palloc(num_phys_attrs * sizeof(int));
1755
0
  defexprs = (ExprState **) palloc(num_phys_attrs * sizeof(ExprState *));
1756
1757
0
  for (int attnum = 1; attnum <= num_phys_attrs; attnum++)
1758
0
  {
1759
0
    Form_pg_attribute att = TupleDescAttr(tupDesc, attnum - 1);
1760
1761
    /* We don't need info for dropped attributes */
1762
0
    if (att->attisdropped)
1763
0
      continue;
1764
1765
    /* Fetch the input function and typioparam info */
1766
0
    cstate->routine->CopyFromInFunc(cstate, att->atttypid,
1767
0
                    &in_functions[attnum - 1],
1768
0
                    &typioparams[attnum - 1]);
1769
1770
    /* Get default info if available */
1771
0
    defexprs[attnum - 1] = NULL;
1772
1773
    /*
1774
     * We only need the default values for columns that do not appear in
1775
     * the column list, unless the DEFAULT option was given. We never need
1776
     * default values for generated columns.
1777
     */
1778
0
    if ((cstate->opts.default_print != NULL ||
1779
0
       !list_member_int(cstate->attnumlist, attnum)) &&
1780
0
      !att->attgenerated)
1781
0
    {
1782
0
      Expr     *defexpr = (Expr *) build_column_default(cstate->rel,
1783
0
                                attnum);
1784
1785
0
      if (defexpr != NULL)
1786
0
      {
1787
        /* Run the expression through planner */
1788
0
        defexpr = expression_planner(defexpr);
1789
1790
        /* Initialize executable expression in copycontext */
1791
0
        defexprs[attnum - 1] = ExecInitExpr(defexpr, NULL);
1792
1793
        /* if NOT copied from input */
1794
        /* use default value if one exists */
1795
0
        if (!list_member_int(cstate->attnumlist, attnum))
1796
0
        {
1797
0
          defmap[num_defaults] = attnum - 1;
1798
0
          num_defaults++;
1799
0
        }
1800
1801
        /*
1802
         * If a default expression looks at the table being loaded,
1803
         * then it could give the wrong answer when using
1804
         * multi-insert. Since database access can be dynamic this is
1805
         * hard to test for exactly, so we use the much wider test of
1806
         * whether the default expression is volatile. We allow for
1807
         * the special case of when the default expression is the
1808
         * nextval() of a sequence which in this specific case is
1809
         * known to be safe for use with the multi-insert
1810
         * optimization. Hence we use this special case function
1811
         * checker rather than the standard check for
1812
         * contain_volatile_functions().  Note also that we already
1813
         * ran the expression through expression_planner().
1814
         */
1815
0
        if (!volatile_defexprs)
1816
0
          volatile_defexprs = contain_volatile_functions_not_nextval((Node *) defexpr);
1817
0
      }
1818
0
    }
1819
0
  }
1820
1821
0
  cstate->defaults = (bool *) palloc0(tupDesc->natts * sizeof(bool));
1822
1823
  /* initialize progress */
1824
0
  pgstat_progress_start_command(PROGRESS_COMMAND_COPY,
1825
0
                  cstate->rel ? RelationGetRelid(cstate->rel) : InvalidOid);
1826
0
  cstate->bytes_processed = 0;
1827
1828
  /* We keep those variables in cstate. */
1829
0
  cstate->in_functions = in_functions;
1830
0
  cstate->typioparams = typioparams;
1831
0
  cstate->defmap = defmap;
1832
0
  cstate->defexprs = defexprs;
1833
0
  cstate->volatile_defexprs = volatile_defexprs;
1834
0
  cstate->num_defaults = num_defaults;
1835
0
  cstate->is_program = is_program;
1836
1837
0
  if (data_source_cb)
1838
0
  {
1839
0
    progress_vals[1] = PROGRESS_COPY_TYPE_CALLBACK;
1840
0
    cstate->copy_src = COPY_CALLBACK;
1841
0
    cstate->data_source_cb = data_source_cb;
1842
0
  }
1843
0
  else if (pipe)
1844
0
  {
1845
0
    progress_vals[1] = PROGRESS_COPY_TYPE_PIPE;
1846
0
    Assert(!is_program);  /* the grammar does not allow this */
1847
0
    if (whereToSendOutput == DestRemote)
1848
0
      ReceiveCopyBegin(cstate);
1849
0
    else
1850
0
      cstate->copy_file = stdin;
1851
0
  }
1852
0
  else
1853
0
  {
1854
0
    cstate->filename = pstrdup(filename);
1855
1856
0
    if (cstate->is_program)
1857
0
    {
1858
0
      progress_vals[1] = PROGRESS_COPY_TYPE_PROGRAM;
1859
0
      cstate->copy_file = OpenPipeStream(cstate->filename, PG_BINARY_R);
1860
0
      if (cstate->copy_file == NULL)
1861
0
        ereport(ERROR,
1862
0
            (errcode_for_file_access(),
1863
0
             errmsg("could not execute command \"%s\": %m",
1864
0
                cstate->filename)));
1865
0
    }
1866
0
    else
1867
0
    {
1868
0
      struct stat st;
1869
1870
0
      progress_vals[1] = PROGRESS_COPY_TYPE_FILE;
1871
0
      cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_R);
1872
0
      if (cstate->copy_file == NULL)
1873
0
      {
1874
        /* copy errno because ereport subfunctions might change it */
1875
0
        int     save_errno = errno;
1876
1877
0
        ereport(ERROR,
1878
0
            (errcode_for_file_access(),
1879
0
             errmsg("could not open file \"%s\" for reading: %m",
1880
0
                cstate->filename),
1881
0
             (save_errno == ENOENT || save_errno == EACCES) ?
1882
0
             errhint("COPY FROM instructs the PostgreSQL server process to read a file. "
1883
0
                 "You may want a client-side facility such as psql's \\copy.") : 0));
1884
0
      }
1885
1886
0
      if (fstat(fileno(cstate->copy_file), &st))
1887
0
        ereport(ERROR,
1888
0
            (errcode_for_file_access(),
1889
0
             errmsg("could not stat file \"%s\": %m",
1890
0
                cstate->filename)));
1891
1892
0
      if (S_ISDIR(st.st_mode))
1893
0
        ereport(ERROR,
1894
0
            (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1895
0
             errmsg("\"%s\" is a directory", cstate->filename)));
1896
1897
0
      progress_vals[2] = st.st_size;
1898
0
    }
1899
0
  }
1900
1901
0
  pgstat_progress_update_multi_param(3, progress_cols, progress_vals);
1902
1903
0
  cstate->routine->CopyFromStart(cstate, tupDesc);
1904
1905
0
  MemoryContextSwitchTo(oldcontext);
1906
1907
0
  return cstate;
1908
0
}
1909
1910
/*
1911
 * Clean up storage and release resources for COPY FROM.
1912
 */
1913
void
1914
EndCopyFrom(CopyFromState cstate)
1915
0
{
1916
  /* Invoke the end callback */
1917
0
  cstate->routine->CopyFromEnd(cstate);
1918
1919
  /* No COPY FROM related resources except memory. */
1920
0
  if (cstate->is_program)
1921
0
  {
1922
0
    ClosePipeFromProgram(cstate);
1923
0
  }
1924
0
  else
1925
0
  {
1926
0
    if (cstate->filename != NULL && FreeFile(cstate->copy_file))
1927
0
      ereport(ERROR,
1928
0
          (errcode_for_file_access(),
1929
0
           errmsg("could not close file \"%s\": %m",
1930
0
              cstate->filename)));
1931
0
  }
1932
1933
0
  pgstat_progress_end_command();
1934
1935
0
  MemoryContextDelete(cstate->copycontext);
1936
0
  pfree(cstate);
1937
0
}
1938
1939
/*
1940
 * Closes the pipe from an external program, checking the pclose() return code.
1941
 */
1942
static void
1943
ClosePipeFromProgram(CopyFromState cstate)
1944
0
{
1945
0
  int     pclose_rc;
1946
1947
0
  Assert(cstate->is_program);
1948
1949
0
  pclose_rc = ClosePipeStream(cstate->copy_file);
1950
0
  if (pclose_rc == -1)
1951
0
    ereport(ERROR,
1952
0
        (errcode_for_file_access(),
1953
0
         errmsg("could not close pipe to external command: %m")));
1954
0
  else if (pclose_rc != 0)
1955
0
  {
1956
    /*
1957
     * If we ended a COPY FROM PROGRAM before reaching EOF, then it's
1958
     * expectable for the called program to fail with SIGPIPE, and we
1959
     * should not report that as an error.  Otherwise, SIGPIPE indicates a
1960
     * problem.
1961
     */
1962
0
    if (!cstate->raw_reached_eof &&
1963
0
      wait_result_is_signal(pclose_rc, SIGPIPE))
1964
0
      return;
1965
1966
0
    ereport(ERROR,
1967
0
        (errcode(ERRCODE_EXTERNAL_ROUTINE_EXCEPTION),
1968
0
         errmsg("program \"%s\" failed",
1969
0
            cstate->filename),
1970
0
         errdetail_internal("%s", wait_result_to_str(pclose_rc))));
1971
0
  }
1972
0
}