/src/postgres/src/backend/commands/copyto.c
Line | Count | Source (jump to first uncovered line) |
1 | | /*------------------------------------------------------------------------- |
2 | | * |
3 | | * copyto.c |
4 | | * COPY <table> TO file/program/client |
5 | | * |
6 | | * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group |
7 | | * Portions Copyright (c) 1994, Regents of the University of California |
8 | | * |
9 | | * |
10 | | * IDENTIFICATION |
11 | | * src/backend/commands/copyto.c |
12 | | * |
13 | | *------------------------------------------------------------------------- |
14 | | */ |
15 | | #include "postgres.h" |
16 | | |
17 | | #include <ctype.h> |
18 | | #include <unistd.h> |
19 | | #include <sys/stat.h> |
20 | | |
21 | | #include "access/tableam.h" |
22 | | #include "commands/copyapi.h" |
23 | | #include "commands/progress.h" |
24 | | #include "executor/execdesc.h" |
25 | | #include "executor/executor.h" |
26 | | #include "executor/tuptable.h" |
27 | | #include "libpq/libpq.h" |
28 | | #include "libpq/pqformat.h" |
29 | | #include "mb/pg_wchar.h" |
30 | | #include "miscadmin.h" |
31 | | #include "pgstat.h" |
32 | | #include "storage/fd.h" |
33 | | #include "tcop/tcopprot.h" |
34 | | #include "utils/lsyscache.h" |
35 | | #include "utils/memutils.h" |
36 | | #include "utils/rel.h" |
37 | | #include "utils/snapmgr.h" |
38 | | |
/*
 * The kinds of output destination the low-level send routines have to
 * distinguish between.
 */
typedef enum CopyDest
{
	/* output goes to a file, or to a program reached through a pipe */
	COPY_FILE,
	/* output goes to the frontend */
	COPY_FRONTEND,
	/* output is handed to a callback function */
	COPY_CALLBACK,
} CopyDest;
49 | | |
50 | | /* |
51 | | * This struct contains all the state variables used throughout a COPY TO |
52 | | * operation. |
53 | | * |
54 | | * Multi-byte encodings: all supported client-side encodings encode multi-byte |
55 | | * characters by having the first byte's high bit set. Subsequent bytes of the |
56 | | * character can have the high bit not set. When scanning data in such an |
57 | | * encoding to look for a match to a single-byte (ie ASCII) character, we must |
58 | | * use the full pg_encoding_mblen() machinery to skip over multibyte |
59 | | * characters, else we might find a false match to a trailing byte. In |
60 | | * supported server encodings, there is no possibility of a false match, and |
61 | | * it's faster to make useless comparisons to trailing bytes than it is to |
62 | | * invoke pg_encoding_mblen() to skip over them. encoding_embeds_ascii is true |
63 | | * when we have to do it the hard way. |
64 | | */ |
65 | | typedef struct CopyToStateData |
66 | | { |
67 | | /* format-specific routines */ |
68 | | const CopyToRoutine *routine; |
69 | | |
70 | | /* low-level state data */ |
71 | | CopyDest copy_dest; /* type of copy source/destination */ |
72 | | FILE *copy_file; /* used if copy_dest == COPY_FILE */ |
73 | | StringInfo fe_msgbuf; /* used for all dests during COPY TO */ |
74 | | |
75 | | int file_encoding; /* file or remote side's character encoding */ |
76 | | bool need_transcoding; /* file encoding diff from server? */ |
77 | | bool encoding_embeds_ascii; /* ASCII can be non-first byte? */ |
78 | | |
79 | | /* parameters from the COPY command */ |
80 | | Relation rel; /* relation to copy to */ |
81 | | QueryDesc *queryDesc; /* executable query to copy from */ |
82 | | List *attnumlist; /* integer list of attnums to copy */ |
83 | | char *filename; /* filename, or NULL for STDOUT */ |
84 | | bool is_program; /* is 'filename' a program to popen? */ |
85 | | copy_data_dest_cb data_dest_cb; /* function for writing data */ |
86 | | |
87 | | CopyFormatOptions opts; |
88 | | Node *whereClause; /* WHERE condition (or NULL) */ |
89 | | |
90 | | /* |
91 | | * Working state |
92 | | */ |
93 | | MemoryContext copycontext; /* per-copy execution context */ |
94 | | |
95 | | FmgrInfo *out_functions; /* lookup info for output functions */ |
96 | | MemoryContext rowcontext; /* per-row evaluation context */ |
97 | | uint64 bytes_processed; /* number of bytes processed so far */ |
98 | | } CopyToStateData; |
99 | | |
100 | | /* DestReceiver for COPY (query) TO */ |
101 | | typedef struct |
102 | | { |
103 | | DestReceiver pub; /* publicly-known function pointers */ |
104 | | CopyToState cstate; /* CopyToStateData for the command */ |
105 | | uint64 processed; /* # of tuples processed */ |
106 | | } DR_copy; |
107 | | |
/*
 * 11-byte signature that opens a binary-format COPY stream.
 *
 * NOTE: there's a copy of this in copyfromparse.c
 */
static const char BinarySignature[11] = {
	'P', 'G', 'C', 'O', 'P', 'Y',
	'\n', '\377', '\r', '\n', '\0'
};
110 | | |
111 | | |
112 | | /* non-export function prototypes */ |
113 | | static void EndCopy(CopyToState cstate); |
114 | | static void ClosePipeToProgram(CopyToState cstate); |
115 | | static void CopyOneRowTo(CopyToState cstate, TupleTableSlot *slot); |
116 | | static void CopyAttributeOutText(CopyToState cstate, const char *string); |
117 | | static void CopyAttributeOutCSV(CopyToState cstate, const char *string, |
118 | | bool use_quote); |
119 | | |
120 | | /* built-in format-specific routines */ |
121 | | static void CopyToTextLikeStart(CopyToState cstate, TupleDesc tupDesc); |
122 | | static void CopyToTextLikeOutFunc(CopyToState cstate, Oid atttypid, FmgrInfo *finfo); |
123 | | static void CopyToTextOneRow(CopyToState cstate, TupleTableSlot *slot); |
124 | | static void CopyToCSVOneRow(CopyToState cstate, TupleTableSlot *slot); |
125 | | static void CopyToTextLikeOneRow(CopyToState cstate, TupleTableSlot *slot, |
126 | | bool is_csv); |
127 | | static void CopyToTextLikeEnd(CopyToState cstate); |
128 | | static void CopyToBinaryStart(CopyToState cstate, TupleDesc tupDesc); |
129 | | static void CopyToBinaryOutFunc(CopyToState cstate, Oid atttypid, FmgrInfo *finfo); |
130 | | static void CopyToBinaryOneRow(CopyToState cstate, TupleTableSlot *slot); |
131 | | static void CopyToBinaryEnd(CopyToState cstate); |
132 | | |
133 | | /* Low-level communications functions */ |
134 | | static void SendCopyBegin(CopyToState cstate); |
135 | | static void SendCopyEnd(CopyToState cstate); |
136 | | static void CopySendData(CopyToState cstate, const void *databuf, int datasize); |
137 | | static void CopySendString(CopyToState cstate, const char *str); |
138 | | static void CopySendChar(CopyToState cstate, char c); |
139 | | static void CopySendEndOfRow(CopyToState cstate); |
140 | | static void CopySendTextLikeEndOfRow(CopyToState cstate); |
141 | | static void CopySendInt32(CopyToState cstate, int32 val); |
142 | | static void CopySendInt16(CopyToState cstate, int16 val); |
143 | | |
144 | | /* |
145 | | * COPY TO routines for built-in formats. |
146 | | * |
147 | | * CSV and text formats share the same TextLike routines except for the |
148 | | * one-row callback. |
149 | | */ |
150 | | |
151 | | /* text format */ |
152 | | static const CopyToRoutine CopyToRoutineText = { |
153 | | .CopyToStart = CopyToTextLikeStart, |
154 | | .CopyToOutFunc = CopyToTextLikeOutFunc, |
155 | | .CopyToOneRow = CopyToTextOneRow, |
156 | | .CopyToEnd = CopyToTextLikeEnd, |
157 | | }; |
158 | | |
159 | | /* CSV format */ |
160 | | static const CopyToRoutine CopyToRoutineCSV = { |
161 | | .CopyToStart = CopyToTextLikeStart, |
162 | | .CopyToOutFunc = CopyToTextLikeOutFunc, |
163 | | .CopyToOneRow = CopyToCSVOneRow, |
164 | | .CopyToEnd = CopyToTextLikeEnd, |
165 | | }; |
166 | | |
167 | | /* binary format */ |
168 | | static const CopyToRoutine CopyToRoutineBinary = { |
169 | | .CopyToStart = CopyToBinaryStart, |
170 | | .CopyToOutFunc = CopyToBinaryOutFunc, |
171 | | .CopyToOneRow = CopyToBinaryOneRow, |
172 | | .CopyToEnd = CopyToBinaryEnd, |
173 | | }; |
174 | | |
175 | | /* Return a COPY TO routine for the given options */ |
176 | | static const CopyToRoutine * |
177 | | CopyToGetRoutine(const CopyFormatOptions *opts) |
178 | 0 | { |
179 | 0 | if (opts->csv_mode) |
180 | 0 | return &CopyToRoutineCSV; |
181 | 0 | else if (opts->binary) |
182 | 0 | return &CopyToRoutineBinary; |
183 | | |
184 | | /* default is text */ |
185 | 0 | return &CopyToRoutineText; |
186 | 0 | } |
187 | | |
188 | | /* Implementation of the start callback for text and CSV formats */ |
189 | | static void |
190 | | CopyToTextLikeStart(CopyToState cstate, TupleDesc tupDesc) |
191 | 0 | { |
192 | | /* |
193 | | * For non-binary copy, we need to convert null_print to file encoding, |
194 | | * because it will be sent directly with CopySendString. |
195 | | */ |
196 | 0 | if (cstate->need_transcoding) |
197 | 0 | cstate->opts.null_print_client = pg_server_to_any(cstate->opts.null_print, |
198 | 0 | cstate->opts.null_print_len, |
199 | 0 | cstate->file_encoding); |
200 | | |
201 | | /* if a header has been requested send the line */ |
202 | 0 | if (cstate->opts.header_line == COPY_HEADER_TRUE) |
203 | 0 | { |
204 | 0 | ListCell *cur; |
205 | 0 | bool hdr_delim = false; |
206 | |
|
207 | 0 | foreach(cur, cstate->attnumlist) |
208 | 0 | { |
209 | 0 | int attnum = lfirst_int(cur); |
210 | 0 | char *colname; |
211 | |
|
212 | 0 | if (hdr_delim) |
213 | 0 | CopySendChar(cstate, cstate->opts.delim[0]); |
214 | 0 | hdr_delim = true; |
215 | |
|
216 | 0 | colname = NameStr(TupleDescAttr(tupDesc, attnum - 1)->attname); |
217 | |
|
218 | 0 | if (cstate->opts.csv_mode) |
219 | 0 | CopyAttributeOutCSV(cstate, colname, false); |
220 | 0 | else |
221 | 0 | CopyAttributeOutText(cstate, colname); |
222 | 0 | } |
223 | |
|
224 | 0 | CopySendTextLikeEndOfRow(cstate); |
225 | 0 | } |
226 | 0 | } |
227 | | |
228 | | /* |
229 | | * Implementation of the outfunc callback for text and CSV formats. Assign |
230 | | * the output function data to the given *finfo. |
231 | | */ |
232 | | static void |
233 | | CopyToTextLikeOutFunc(CopyToState cstate, Oid atttypid, FmgrInfo *finfo) |
234 | 0 | { |
235 | 0 | Oid func_oid; |
236 | 0 | bool is_varlena; |
237 | | |
238 | | /* Set output function for an attribute */ |
239 | 0 | getTypeOutputInfo(atttypid, &func_oid, &is_varlena); |
240 | 0 | fmgr_info(func_oid, finfo); |
241 | 0 | } |
242 | | |
243 | | /* Implementation of the per-row callback for text format */ |
244 | | static void |
245 | | CopyToTextOneRow(CopyToState cstate, TupleTableSlot *slot) |
246 | 0 | { |
247 | 0 | CopyToTextLikeOneRow(cstate, slot, false); |
248 | 0 | } |
249 | | |
250 | | /* Implementation of the per-row callback for CSV format */ |
251 | | static void |
252 | | CopyToCSVOneRow(CopyToState cstate, TupleTableSlot *slot) |
253 | 0 | { |
254 | 0 | CopyToTextLikeOneRow(cstate, slot, true); |
255 | 0 | } |
256 | | |
257 | | /* |
258 | | * Workhorse for CopyToTextOneRow() and CopyToCSVOneRow(). |
259 | | * |
260 | | * We use pg_attribute_always_inline to reduce function call overhead |
261 | | * and to help compilers to optimize away the 'is_csv' condition. |
262 | | */ |
263 | | static pg_attribute_always_inline void |
264 | | CopyToTextLikeOneRow(CopyToState cstate, |
265 | | TupleTableSlot *slot, |
266 | | bool is_csv) |
267 | 0 | { |
268 | 0 | bool need_delim = false; |
269 | 0 | FmgrInfo *out_functions = cstate->out_functions; |
270 | |
|
271 | 0 | foreach_int(attnum, cstate->attnumlist) |
272 | 0 | { |
273 | 0 | Datum value = slot->tts_values[attnum - 1]; |
274 | 0 | bool isnull = slot->tts_isnull[attnum - 1]; |
275 | |
|
276 | 0 | if (need_delim) |
277 | 0 | CopySendChar(cstate, cstate->opts.delim[0]); |
278 | 0 | need_delim = true; |
279 | |
|
280 | 0 | if (isnull) |
281 | 0 | { |
282 | 0 | CopySendString(cstate, cstate->opts.null_print_client); |
283 | 0 | } |
284 | 0 | else |
285 | 0 | { |
286 | 0 | char *string; |
287 | |
|
288 | 0 | string = OutputFunctionCall(&out_functions[attnum - 1], |
289 | 0 | value); |
290 | |
|
291 | 0 | if (is_csv) |
292 | 0 | CopyAttributeOutCSV(cstate, string, |
293 | 0 | cstate->opts.force_quote_flags[attnum - 1]); |
294 | 0 | else |
295 | 0 | CopyAttributeOutText(cstate, string); |
296 | 0 | } |
297 | 0 | } |
298 | |
|
299 | 0 | CopySendTextLikeEndOfRow(cstate); |
300 | 0 | } |
301 | | |
302 | | /* Implementation of the end callback for text and CSV formats */ |
303 | | static void |
304 | | CopyToTextLikeEnd(CopyToState cstate) |
305 | 0 | { |
306 | | /* Nothing to do here */ |
307 | 0 | } |
308 | | |
309 | | /* |
310 | | * Implementation of the start callback for binary format. Send a header |
311 | | * for a binary copy. |
312 | | */ |
313 | | static void |
314 | | CopyToBinaryStart(CopyToState cstate, TupleDesc tupDesc) |
315 | 0 | { |
316 | 0 | int32 tmp; |
317 | | |
318 | | /* Signature */ |
319 | 0 | CopySendData(cstate, BinarySignature, 11); |
320 | | /* Flags field */ |
321 | 0 | tmp = 0; |
322 | 0 | CopySendInt32(cstate, tmp); |
323 | | /* No header extension */ |
324 | 0 | tmp = 0; |
325 | 0 | CopySendInt32(cstate, tmp); |
326 | 0 | } |
327 | | |
328 | | /* |
329 | | * Implementation of the outfunc callback for binary format. Assign |
330 | | * the binary output function to the given *finfo. |
331 | | */ |
332 | | static void |
333 | | CopyToBinaryOutFunc(CopyToState cstate, Oid atttypid, FmgrInfo *finfo) |
334 | 0 | { |
335 | 0 | Oid func_oid; |
336 | 0 | bool is_varlena; |
337 | | |
338 | | /* Set output function for an attribute */ |
339 | 0 | getTypeBinaryOutputInfo(atttypid, &func_oid, &is_varlena); |
340 | 0 | fmgr_info(func_oid, finfo); |
341 | 0 | } |
342 | | |
343 | | /* Implementation of the per-row callback for binary format */ |
344 | | static void |
345 | | CopyToBinaryOneRow(CopyToState cstate, TupleTableSlot *slot) |
346 | 0 | { |
347 | 0 | FmgrInfo *out_functions = cstate->out_functions; |
348 | | |
349 | | /* Binary per-tuple header */ |
350 | 0 | CopySendInt16(cstate, list_length(cstate->attnumlist)); |
351 | |
|
352 | 0 | foreach_int(attnum, cstate->attnumlist) |
353 | 0 | { |
354 | 0 | Datum value = slot->tts_values[attnum - 1]; |
355 | 0 | bool isnull = slot->tts_isnull[attnum - 1]; |
356 | |
|
357 | 0 | if (isnull) |
358 | 0 | { |
359 | 0 | CopySendInt32(cstate, -1); |
360 | 0 | } |
361 | 0 | else |
362 | 0 | { |
363 | 0 | bytea *outputbytes; |
364 | |
|
365 | 0 | outputbytes = SendFunctionCall(&out_functions[attnum - 1], |
366 | 0 | value); |
367 | 0 | CopySendInt32(cstate, VARSIZE(outputbytes) - VARHDRSZ); |
368 | 0 | CopySendData(cstate, VARDATA(outputbytes), |
369 | 0 | VARSIZE(outputbytes) - VARHDRSZ); |
370 | 0 | } |
371 | 0 | } |
372 | |
|
373 | 0 | CopySendEndOfRow(cstate); |
374 | 0 | } |
375 | | |
376 | | /* Implementation of the end callback for binary format */ |
377 | | static void |
378 | | CopyToBinaryEnd(CopyToState cstate) |
379 | 0 | { |
380 | | /* Generate trailer for a binary copy */ |
381 | 0 | CopySendInt16(cstate, -1); |
382 | | /* Need to flush out the trailer */ |
383 | 0 | CopySendEndOfRow(cstate); |
384 | 0 | } |
385 | | |
386 | | /* |
387 | | * Send copy start/stop messages for frontend copies. These have changed |
388 | | * in past protocol redesigns. |
389 | | */ |
390 | | static void |
391 | | SendCopyBegin(CopyToState cstate) |
392 | 0 | { |
393 | 0 | StringInfoData buf; |
394 | 0 | int natts = list_length(cstate->attnumlist); |
395 | 0 | int16 format = (cstate->opts.binary ? 1 : 0); |
396 | 0 | int i; |
397 | |
|
398 | 0 | pq_beginmessage(&buf, PqMsg_CopyOutResponse); |
399 | 0 | pq_sendbyte(&buf, format); /* overall format */ |
400 | 0 | pq_sendint16(&buf, natts); |
401 | 0 | for (i = 0; i < natts; i++) |
402 | 0 | pq_sendint16(&buf, format); /* per-column formats */ |
403 | 0 | pq_endmessage(&buf); |
404 | 0 | cstate->copy_dest = COPY_FRONTEND; |
405 | 0 | } |
406 | | |
407 | | static void |
408 | | SendCopyEnd(CopyToState cstate) |
409 | 0 | { |
410 | | /* Shouldn't have any unsent data */ |
411 | 0 | Assert(cstate->fe_msgbuf->len == 0); |
412 | | /* Send Copy Done message */ |
413 | 0 | pq_putemptymessage(PqMsg_CopyDone); |
414 | 0 | } |
415 | | |
416 | | /*---------- |
417 | | * CopySendData sends output data to the destination (file or frontend) |
418 | | * CopySendString does the same for null-terminated strings |
419 | | * CopySendChar does the same for single characters |
420 | | * CopySendEndOfRow does the appropriate thing at end of each data row |
421 | | * (data is not actually flushed except by CopySendEndOfRow) |
422 | | * |
423 | | * NB: no data conversion is applied by these functions |
424 | | *---------- |
425 | | */ |
426 | | static void |
427 | | CopySendData(CopyToState cstate, const void *databuf, int datasize) |
428 | 0 | { |
429 | 0 | appendBinaryStringInfo(cstate->fe_msgbuf, databuf, datasize); |
430 | 0 | } |
431 | | |
432 | | static void |
433 | | CopySendString(CopyToState cstate, const char *str) |
434 | 0 | { |
435 | 0 | appendBinaryStringInfo(cstate->fe_msgbuf, str, strlen(str)); |
436 | 0 | } |
437 | | |
438 | | static void |
439 | | CopySendChar(CopyToState cstate, char c) |
440 | 0 | { |
441 | 0 | appendStringInfoCharMacro(cstate->fe_msgbuf, c); |
442 | 0 | } |
443 | | |
444 | | static void |
445 | | CopySendEndOfRow(CopyToState cstate) |
446 | 0 | { |
447 | 0 | StringInfo fe_msgbuf = cstate->fe_msgbuf; |
448 | |
|
449 | 0 | switch (cstate->copy_dest) |
450 | 0 | { |
451 | 0 | case COPY_FILE: |
452 | 0 | if (fwrite(fe_msgbuf->data, fe_msgbuf->len, 1, |
453 | 0 | cstate->copy_file) != 1 || |
454 | 0 | ferror(cstate->copy_file)) |
455 | 0 | { |
456 | 0 | if (cstate->is_program) |
457 | 0 | { |
458 | 0 | if (errno == EPIPE) |
459 | 0 | { |
460 | | /* |
461 | | * The pipe will be closed automatically on error at |
462 | | * the end of transaction, but we might get a better |
463 | | * error message from the subprocess' exit code than |
464 | | * just "Broken Pipe" |
465 | | */ |
466 | 0 | ClosePipeToProgram(cstate); |
467 | | |
468 | | /* |
469 | | * If ClosePipeToProgram() didn't throw an error, the |
470 | | * program terminated normally, but closed the pipe |
471 | | * first. Restore errno, and throw an error. |
472 | | */ |
473 | 0 | errno = EPIPE; |
474 | 0 | } |
475 | 0 | ereport(ERROR, |
476 | 0 | (errcode_for_file_access(), |
477 | 0 | errmsg("could not write to COPY program: %m"))); |
478 | 0 | } |
479 | 0 | else |
480 | 0 | ereport(ERROR, |
481 | 0 | (errcode_for_file_access(), |
482 | 0 | errmsg("could not write to COPY file: %m"))); |
483 | 0 | } |
484 | 0 | break; |
485 | 0 | case COPY_FRONTEND: |
486 | | /* Dump the accumulated row as one CopyData message */ |
487 | 0 | (void) pq_putmessage(PqMsg_CopyData, fe_msgbuf->data, fe_msgbuf->len); |
488 | 0 | break; |
489 | 0 | case COPY_CALLBACK: |
490 | 0 | cstate->data_dest_cb(fe_msgbuf->data, fe_msgbuf->len); |
491 | 0 | break; |
492 | 0 | } |
493 | | |
494 | | /* Update the progress */ |
495 | 0 | cstate->bytes_processed += fe_msgbuf->len; |
496 | 0 | pgstat_progress_update_param(PROGRESS_COPY_BYTES_PROCESSED, cstate->bytes_processed); |
497 | |
|
498 | 0 | resetStringInfo(fe_msgbuf); |
499 | 0 | } |
500 | | |
501 | | /* |
502 | | * Wrapper function of CopySendEndOfRow for text and CSV formats. Sends the |
503 | | * line termination and do common appropriate things for the end of row. |
504 | | */ |
505 | | static inline void |
506 | | CopySendTextLikeEndOfRow(CopyToState cstate) |
507 | 0 | { |
508 | 0 | switch (cstate->copy_dest) |
509 | 0 | { |
510 | 0 | case COPY_FILE: |
511 | | /* Default line termination depends on platform */ |
512 | 0 | #ifndef WIN32 |
513 | 0 | CopySendChar(cstate, '\n'); |
514 | | #else |
515 | | CopySendString(cstate, "\r\n"); |
516 | | #endif |
517 | 0 | break; |
518 | 0 | case COPY_FRONTEND: |
519 | | /* The FE/BE protocol uses \n as newline for all platforms */ |
520 | 0 | CopySendChar(cstate, '\n'); |
521 | 0 | break; |
522 | 0 | default: |
523 | 0 | break; |
524 | 0 | } |
525 | | |
526 | | /* Now take the actions related to the end of a row */ |
527 | 0 | CopySendEndOfRow(cstate); |
528 | 0 | } |
529 | | |
530 | | /* |
531 | | * These functions do apply some data conversion |
532 | | */ |
533 | | |
534 | | /* |
535 | | * CopySendInt32 sends an int32 in network byte order |
536 | | */ |
537 | | static inline void |
538 | | CopySendInt32(CopyToState cstate, int32 val) |
539 | 0 | { |
540 | 0 | uint32 buf; |
541 | |
|
542 | 0 | buf = pg_hton32((uint32) val); |
543 | 0 | CopySendData(cstate, &buf, sizeof(buf)); |
544 | 0 | } |
545 | | |
546 | | /* |
547 | | * CopySendInt16 sends an int16 in network byte order |
548 | | */ |
549 | | static inline void |
550 | | CopySendInt16(CopyToState cstate, int16 val) |
551 | 0 | { |
552 | 0 | uint16 buf; |
553 | |
|
554 | 0 | buf = pg_hton16((uint16) val); |
555 | 0 | CopySendData(cstate, &buf, sizeof(buf)); |
556 | 0 | } |
557 | | |
558 | | /* |
559 | | * Closes the pipe to an external program, checking the pclose() return code. |
560 | | */ |
561 | | static void |
562 | | ClosePipeToProgram(CopyToState cstate) |
563 | 0 | { |
564 | 0 | int pclose_rc; |
565 | |
|
566 | 0 | Assert(cstate->is_program); |
567 | |
|
568 | 0 | pclose_rc = ClosePipeStream(cstate->copy_file); |
569 | 0 | if (pclose_rc == -1) |
570 | 0 | ereport(ERROR, |
571 | 0 | (errcode_for_file_access(), |
572 | 0 | errmsg("could not close pipe to external command: %m"))); |
573 | 0 | else if (pclose_rc != 0) |
574 | 0 | { |
575 | 0 | ereport(ERROR, |
576 | 0 | (errcode(ERRCODE_EXTERNAL_ROUTINE_EXCEPTION), |
577 | 0 | errmsg("program \"%s\" failed", |
578 | 0 | cstate->filename), |
579 | 0 | errdetail_internal("%s", wait_result_to_str(pclose_rc)))); |
580 | 0 | } |
581 | 0 | } |
582 | | |
583 | | /* |
584 | | * Release resources allocated in a cstate for COPY TO/FROM. |
585 | | */ |
586 | | static void |
587 | | EndCopy(CopyToState cstate) |
588 | 0 | { |
589 | 0 | if (cstate->is_program) |
590 | 0 | { |
591 | 0 | ClosePipeToProgram(cstate); |
592 | 0 | } |
593 | 0 | else |
594 | 0 | { |
595 | 0 | if (cstate->filename != NULL && FreeFile(cstate->copy_file)) |
596 | 0 | ereport(ERROR, |
597 | 0 | (errcode_for_file_access(), |
598 | 0 | errmsg("could not close file \"%s\": %m", |
599 | 0 | cstate->filename))); |
600 | 0 | } |
601 | | |
602 | 0 | pgstat_progress_end_command(); |
603 | |
|
604 | 0 | MemoryContextDelete(cstate->copycontext); |
605 | 0 | pfree(cstate); |
606 | 0 | } |
607 | | |
608 | | /* |
609 | | * Setup CopyToState to read tuples from a table or a query for COPY TO. |
610 | | * |
611 | | * 'rel': Relation to be copied |
612 | | * 'raw_query': Query whose results are to be copied |
613 | | * 'queryRelId': OID of base relation to convert to a query (for RLS) |
614 | | * 'filename': Name of server-local file to write, NULL for STDOUT |
615 | | * 'is_program': true if 'filename' is program to execute |
616 | | * 'data_dest_cb': Callback that processes the output data |
617 | | * 'attnamelist': List of char *, columns to include. NIL selects all cols. |
618 | | * 'options': List of DefElem. See copy_opt_item in gram.y for selections. |
619 | | * |
620 | | * Returns a CopyToState, to be passed to DoCopyTo() and related functions. |
621 | | */ |
622 | | CopyToState |
623 | | BeginCopyTo(ParseState *pstate, |
624 | | Relation rel, |
625 | | RawStmt *raw_query, |
626 | | Oid queryRelId, |
627 | | const char *filename, |
628 | | bool is_program, |
629 | | copy_data_dest_cb data_dest_cb, |
630 | | List *attnamelist, |
631 | | List *options) |
632 | 0 | { |
633 | 0 | CopyToState cstate; |
634 | 0 | bool pipe = (filename == NULL && data_dest_cb == NULL); |
635 | 0 | TupleDesc tupDesc; |
636 | 0 | int num_phys_attrs; |
637 | 0 | MemoryContext oldcontext; |
638 | 0 | const int progress_cols[] = { |
639 | 0 | PROGRESS_COPY_COMMAND, |
640 | 0 | PROGRESS_COPY_TYPE |
641 | 0 | }; |
642 | 0 | int64 progress_vals[] = { |
643 | 0 | PROGRESS_COPY_COMMAND_TO, |
644 | 0 | 0 |
645 | 0 | }; |
646 | |
|
647 | 0 | if (rel != NULL && rel->rd_rel->relkind != RELKIND_RELATION) |
648 | 0 | { |
649 | 0 | if (rel->rd_rel->relkind == RELKIND_VIEW) |
650 | 0 | ereport(ERROR, |
651 | 0 | (errcode(ERRCODE_WRONG_OBJECT_TYPE), |
652 | 0 | errmsg("cannot copy from view \"%s\"", |
653 | 0 | RelationGetRelationName(rel)), |
654 | 0 | errhint("Try the COPY (SELECT ...) TO variant."))); |
655 | 0 | else if (rel->rd_rel->relkind == RELKIND_MATVIEW) |
656 | 0 | { |
657 | 0 | if (!RelationIsPopulated(rel)) |
658 | 0 | ereport(ERROR, |
659 | 0 | errcode(ERRCODE_FEATURE_NOT_SUPPORTED), |
660 | 0 | errmsg("cannot copy from unpopulated materialized view \"%s\"", |
661 | 0 | RelationGetRelationName(rel)), |
662 | 0 | errhint("Use the REFRESH MATERIALIZED VIEW command.")); |
663 | 0 | } |
664 | 0 | else if (rel->rd_rel->relkind == RELKIND_FOREIGN_TABLE) |
665 | 0 | ereport(ERROR, |
666 | 0 | (errcode(ERRCODE_WRONG_OBJECT_TYPE), |
667 | 0 | errmsg("cannot copy from foreign table \"%s\"", |
668 | 0 | RelationGetRelationName(rel)), |
669 | 0 | errhint("Try the COPY (SELECT ...) TO variant."))); |
670 | 0 | else if (rel->rd_rel->relkind == RELKIND_SEQUENCE) |
671 | 0 | ereport(ERROR, |
672 | 0 | (errcode(ERRCODE_WRONG_OBJECT_TYPE), |
673 | 0 | errmsg("cannot copy from sequence \"%s\"", |
674 | 0 | RelationGetRelationName(rel)))); |
675 | 0 | else if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE) |
676 | 0 | ereport(ERROR, |
677 | 0 | (errcode(ERRCODE_WRONG_OBJECT_TYPE), |
678 | 0 | errmsg("cannot copy from partitioned table \"%s\"", |
679 | 0 | RelationGetRelationName(rel)), |
680 | 0 | errhint("Try the COPY (SELECT ...) TO variant."))); |
681 | 0 | else |
682 | 0 | ereport(ERROR, |
683 | 0 | (errcode(ERRCODE_WRONG_OBJECT_TYPE), |
684 | 0 | errmsg("cannot copy from non-table relation \"%s\"", |
685 | 0 | RelationGetRelationName(rel)))); |
686 | 0 | } |
687 | | |
688 | | |
689 | | /* Allocate workspace and zero all fields */ |
690 | 0 | cstate = (CopyToStateData *) palloc0(sizeof(CopyToStateData)); |
691 | | |
692 | | /* |
693 | | * We allocate everything used by a cstate in a new memory context. This |
694 | | * avoids memory leaks during repeated use of COPY in a query. |
695 | | */ |
696 | 0 | cstate->copycontext = AllocSetContextCreate(CurrentMemoryContext, |
697 | 0 | "COPY", |
698 | 0 | ALLOCSET_DEFAULT_SIZES); |
699 | |
|
700 | 0 | oldcontext = MemoryContextSwitchTo(cstate->copycontext); |
701 | | |
702 | | /* Extract options from the statement node tree */ |
703 | 0 | ProcessCopyOptions(pstate, &cstate->opts, false /* is_from */ , options); |
704 | | |
705 | | /* Set format routine */ |
706 | 0 | cstate->routine = CopyToGetRoutine(&cstate->opts); |
707 | | |
708 | | /* Process the source/target relation or query */ |
709 | 0 | if (rel) |
710 | 0 | { |
711 | 0 | Assert(!raw_query); |
712 | |
|
713 | 0 | cstate->rel = rel; |
714 | |
|
715 | 0 | tupDesc = RelationGetDescr(cstate->rel); |
716 | 0 | } |
717 | 0 | else |
718 | 0 | { |
719 | 0 | List *rewritten; |
720 | 0 | Query *query; |
721 | 0 | PlannedStmt *plan; |
722 | 0 | DestReceiver *dest; |
723 | |
|
724 | 0 | cstate->rel = NULL; |
725 | | |
726 | | /* |
727 | | * Run parse analysis and rewrite. Note this also acquires sufficient |
728 | | * locks on the source table(s). |
729 | | */ |
730 | 0 | rewritten = pg_analyze_and_rewrite_fixedparams(raw_query, |
731 | 0 | pstate->p_sourcetext, NULL, 0, |
732 | 0 | NULL); |
733 | | |
734 | | /* check that we got back something we can work with */ |
735 | 0 | if (rewritten == NIL) |
736 | 0 | { |
737 | 0 | ereport(ERROR, |
738 | 0 | (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), |
739 | 0 | errmsg("DO INSTEAD NOTHING rules are not supported for COPY"))); |
740 | 0 | } |
741 | 0 | else if (list_length(rewritten) > 1) |
742 | 0 | { |
743 | 0 | ListCell *lc; |
744 | | |
745 | | /* examine queries to determine which error message to issue */ |
746 | 0 | foreach(lc, rewritten) |
747 | 0 | { |
748 | 0 | Query *q = lfirst_node(Query, lc); |
749 | |
|
750 | 0 | if (q->querySource == QSRC_QUAL_INSTEAD_RULE) |
751 | 0 | ereport(ERROR, |
752 | 0 | (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), |
753 | 0 | errmsg("conditional DO INSTEAD rules are not supported for COPY"))); |
754 | 0 | if (q->querySource == QSRC_NON_INSTEAD_RULE) |
755 | 0 | ereport(ERROR, |
756 | 0 | (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), |
757 | 0 | errmsg("DO ALSO rules are not supported for COPY"))); |
758 | 0 | } |
759 | | |
760 | 0 | ereport(ERROR, |
761 | 0 | (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), |
762 | 0 | errmsg("multi-statement DO INSTEAD rules are not supported for COPY"))); |
763 | 0 | } |
764 | | |
765 | 0 | query = linitial_node(Query, rewritten); |
766 | | |
767 | | /* The grammar allows SELECT INTO, but we don't support that */ |
768 | 0 | if (query->utilityStmt != NULL && |
769 | 0 | IsA(query->utilityStmt, CreateTableAsStmt)) |
770 | 0 | ereport(ERROR, |
771 | 0 | (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), |
772 | 0 | errmsg("COPY (SELECT INTO) is not supported"))); |
773 | | |
774 | | /* The only other utility command we could see is NOTIFY */ |
775 | 0 | if (query->utilityStmt != NULL) |
776 | 0 | ereport(ERROR, |
777 | 0 | (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), |
778 | 0 | errmsg("COPY query must not be a utility command"))); |
779 | | |
780 | | /* |
781 | | * Similarly the grammar doesn't enforce the presence of a RETURNING |
782 | | * clause, but this is required here. |
783 | | */ |
784 | 0 | if (query->commandType != CMD_SELECT && |
785 | 0 | query->returningList == NIL) |
786 | 0 | { |
787 | 0 | Assert(query->commandType == CMD_INSERT || |
788 | 0 | query->commandType == CMD_UPDATE || |
789 | 0 | query->commandType == CMD_DELETE || |
790 | 0 | query->commandType == CMD_MERGE); |
791 | |
|
792 | 0 | ereport(ERROR, |
793 | 0 | (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), |
794 | 0 | errmsg("COPY query must have a RETURNING clause"))); |
795 | 0 | } |
796 | | |
797 | | /* plan the query */ |
798 | 0 | plan = pg_plan_query(query, pstate->p_sourcetext, |
799 | 0 | CURSOR_OPT_PARALLEL_OK, NULL); |
800 | | |
801 | | /* |
802 | | * With row-level security and a user using "COPY relation TO", we |
803 | | * have to convert the "COPY relation TO" to a query-based COPY (eg: |
804 | | * "COPY (SELECT * FROM ONLY relation) TO"), to allow the rewriter to |
805 | | * add in any RLS clauses. |
806 | | * |
807 | | * When this happens, we are passed in the relid of the originally |
808 | | * found relation (which we have locked). As the planner will look up |
809 | | * the relation again, we double-check here to make sure it found the |
810 | | * same one that we have locked. |
811 | | */ |
812 | 0 | if (queryRelId != InvalidOid) |
813 | 0 | { |
814 | | /* |
815 | | * Note that with RLS involved there may be multiple relations, |
816 | | * and while the one we need is almost certainly first, we don't |
817 | | * make any guarantees of that in the planner, so check the whole |
818 | | * list and make sure we find the original relation. |
819 | | */ |
820 | 0 | if (!list_member_oid(plan->relationOids, queryRelId)) |
821 | 0 | ereport(ERROR, |
822 | 0 | (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), |
823 | 0 | errmsg("relation referenced by COPY statement has changed"))); |
824 | 0 | } |
825 | | |
826 | | /* |
827 | | * Use a snapshot with an updated command ID to ensure this query sees |
828 | | * results of any previously executed queries. |
829 | | */ |
830 | 0 | PushCopiedSnapshot(GetActiveSnapshot()); |
831 | 0 | UpdateActiveSnapshotCommandId(); |
832 | | |
833 | | /* Create dest receiver for COPY OUT */ |
834 | 0 | dest = CreateDestReceiver(DestCopyOut); |
835 | 0 | ((DR_copy *) dest)->cstate = cstate; |
836 | | |
837 | | /* Create a QueryDesc requesting no output */ |
838 | 0 | cstate->queryDesc = CreateQueryDesc(plan, pstate->p_sourcetext, |
839 | 0 | GetActiveSnapshot(), |
840 | 0 | InvalidSnapshot, |
841 | 0 | dest, NULL, NULL, 0); |
842 | | |
843 | | /* |
844 | | * Call ExecutorStart to prepare the plan for execution. |
845 | | * |
846 | | * ExecutorStart computes a result tupdesc for us |
847 | | */ |
848 | 0 | ExecutorStart(cstate->queryDesc, 0); |
849 | |
|
850 | 0 | tupDesc = cstate->queryDesc->tupDesc; |
851 | 0 | } |
852 | | |
853 | | /* Generate or convert list of attributes to process */ |
854 | 0 | cstate->attnumlist = CopyGetAttnums(tupDesc, cstate->rel, attnamelist); |
855 | |
|
856 | 0 | num_phys_attrs = tupDesc->natts; |
857 | | |
858 | | /* Convert FORCE_QUOTE name list to per-column flags, check validity */ |
859 | 0 | cstate->opts.force_quote_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool)); |
860 | 0 | if (cstate->opts.force_quote_all) |
861 | 0 | { |
862 | 0 | MemSet(cstate->opts.force_quote_flags, true, num_phys_attrs * sizeof(bool)); |
863 | 0 | } |
864 | 0 | else if (cstate->opts.force_quote) |
865 | 0 | { |
866 | 0 | List *attnums; |
867 | 0 | ListCell *cur; |
868 | |
|
869 | 0 | attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->opts.force_quote); |
870 | |
|
871 | 0 | foreach(cur, attnums) |
872 | 0 | { |
873 | 0 | int attnum = lfirst_int(cur); |
874 | 0 | Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1); |
875 | |
|
876 | 0 | if (!list_member_int(cstate->attnumlist, attnum)) |
877 | 0 | ereport(ERROR, |
878 | 0 | (errcode(ERRCODE_INVALID_COLUMN_REFERENCE), |
879 | | /*- translator: %s is the name of a COPY option, e.g. FORCE_NOT_NULL */ |
880 | 0 | errmsg("%s column \"%s\" not referenced by COPY", |
881 | 0 | "FORCE_QUOTE", NameStr(attr->attname)))); |
882 | 0 | cstate->opts.force_quote_flags[attnum - 1] = true; |
883 | 0 | } |
884 | 0 | } |
885 | | |
886 | | /* Use client encoding when ENCODING option is not specified. */ |
887 | 0 | if (cstate->opts.file_encoding < 0) |
888 | 0 | cstate->file_encoding = pg_get_client_encoding(); |
889 | 0 | else |
890 | 0 | cstate->file_encoding = cstate->opts.file_encoding; |
891 | | |
892 | | /* |
893 | | * Set up encoding conversion info if the file and server encodings differ |
894 | | * (see also pg_server_to_any). |
895 | | */ |
896 | 0 | if (cstate->file_encoding == GetDatabaseEncoding() || |
897 | 0 | cstate->file_encoding == PG_SQL_ASCII) |
898 | 0 | cstate->need_transcoding = false; |
899 | 0 | else |
900 | 0 | cstate->need_transcoding = true; |
901 | | |
902 | | /* See Multibyte encoding comment above */ |
903 | 0 | cstate->encoding_embeds_ascii = PG_ENCODING_IS_CLIENT_ONLY(cstate->file_encoding); |
904 | |
|
905 | 0 | cstate->copy_dest = COPY_FILE; /* default */ |
906 | |
|
907 | 0 | if (data_dest_cb) |
908 | 0 | { |
909 | 0 | progress_vals[1] = PROGRESS_COPY_TYPE_CALLBACK; |
910 | 0 | cstate->copy_dest = COPY_CALLBACK; |
911 | 0 | cstate->data_dest_cb = data_dest_cb; |
912 | 0 | } |
913 | 0 | else if (pipe) |
914 | 0 | { |
915 | 0 | progress_vals[1] = PROGRESS_COPY_TYPE_PIPE; |
916 | |
|
917 | 0 | Assert(!is_program); /* the grammar does not allow this */ |
918 | 0 | if (whereToSendOutput != DestRemote) |
919 | 0 | cstate->copy_file = stdout; |
920 | 0 | } |
921 | 0 | else |
922 | 0 | { |
923 | 0 | cstate->filename = pstrdup(filename); |
924 | 0 | cstate->is_program = is_program; |
925 | |
|
926 | 0 | if (is_program) |
927 | 0 | { |
928 | 0 | progress_vals[1] = PROGRESS_COPY_TYPE_PROGRAM; |
929 | 0 | cstate->copy_file = OpenPipeStream(cstate->filename, PG_BINARY_W); |
930 | 0 | if (cstate->copy_file == NULL) |
931 | 0 | ereport(ERROR, |
932 | 0 | (errcode_for_file_access(), |
933 | 0 | errmsg("could not execute command \"%s\": %m", |
934 | 0 | cstate->filename))); |
935 | 0 | } |
936 | 0 | else |
937 | 0 | { |
938 | 0 | mode_t oumask; /* Pre-existing umask value */ |
939 | 0 | struct stat st; |
940 | |
|
941 | 0 | progress_vals[1] = PROGRESS_COPY_TYPE_FILE; |
942 | | |
943 | | /* |
944 | | * Prevent write to relative path ... too easy to shoot oneself in |
945 | | * the foot by overwriting a database file ... |
946 | | */ |
947 | 0 | if (!is_absolute_path(filename)) |
948 | 0 | ereport(ERROR, |
949 | 0 | (errcode(ERRCODE_INVALID_NAME), |
950 | 0 | errmsg("relative path not allowed for COPY to file"))); |
951 | | |
952 | 0 | oumask = umask(S_IWGRP | S_IWOTH); |
953 | 0 | PG_TRY(); |
954 | 0 | { |
955 | 0 | cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_W); |
956 | 0 | } |
957 | 0 | PG_FINALLY(); |
958 | 0 | { |
959 | 0 | umask(oumask); |
960 | 0 | } |
961 | 0 | PG_END_TRY(); |
962 | 0 | if (cstate->copy_file == NULL) |
963 | 0 | { |
964 | | /* copy errno because ereport subfunctions might change it */ |
965 | 0 | int save_errno = errno; |
966 | |
|
967 | 0 | ereport(ERROR, |
968 | 0 | (errcode_for_file_access(), |
969 | 0 | errmsg("could not open file \"%s\" for writing: %m", |
970 | 0 | cstate->filename), |
971 | 0 | (save_errno == ENOENT || save_errno == EACCES) ? |
972 | 0 | errhint("COPY TO instructs the PostgreSQL server process to write a file. " |
973 | 0 | "You may want a client-side facility such as psql's \\copy.") : 0)); |
974 | 0 | } |
975 | | |
976 | 0 | if (fstat(fileno(cstate->copy_file), &st)) |
977 | 0 | ereport(ERROR, |
978 | 0 | (errcode_for_file_access(), |
979 | 0 | errmsg("could not stat file \"%s\": %m", |
980 | 0 | cstate->filename))); |
981 | | |
982 | 0 | if (S_ISDIR(st.st_mode)) |
983 | 0 | ereport(ERROR, |
984 | 0 | (errcode(ERRCODE_WRONG_OBJECT_TYPE), |
985 | 0 | errmsg("\"%s\" is a directory", cstate->filename))); |
986 | 0 | } |
987 | 0 | } |
988 | | |
989 | | /* initialize progress */ |
990 | 0 | pgstat_progress_start_command(PROGRESS_COMMAND_COPY, |
991 | 0 | cstate->rel ? RelationGetRelid(cstate->rel) : InvalidOid); |
992 | 0 | pgstat_progress_update_multi_param(2, progress_cols, progress_vals); |
993 | |
|
994 | 0 | cstate->bytes_processed = 0; |
995 | |
|
996 | 0 | MemoryContextSwitchTo(oldcontext); |
997 | |
|
998 | 0 | return cstate; |
999 | 0 | } |
1000 | | |
1001 | | /* |
1002 | | * Clean up storage and release resources for COPY TO. |
1003 | | */ |
1004 | | void |
1005 | | EndCopyTo(CopyToState cstate) |
1006 | 0 | { |
1007 | 0 | if (cstate->queryDesc != NULL) |
1008 | 0 | { |
1009 | | /* Close down the query and free resources. */ |
1010 | 0 | ExecutorFinish(cstate->queryDesc); |
1011 | 0 | ExecutorEnd(cstate->queryDesc); |
1012 | 0 | FreeQueryDesc(cstate->queryDesc); |
1013 | 0 | PopActiveSnapshot(); |
1014 | 0 | } |
1015 | | |
1016 | | /* Clean up storage */ |
1017 | 0 | EndCopy(cstate); |
1018 | 0 | } |
1019 | | |
1020 | | /* |
1021 | | * Copy from relation or query TO file. |
1022 | | * |
1023 | | * Returns the number of rows processed. |
1024 | | */ |
1025 | | uint64 |
1026 | | DoCopyTo(CopyToState cstate) |
1027 | 0 | { |
1028 | 0 | bool pipe = (cstate->filename == NULL && cstate->data_dest_cb == NULL); |
1029 | 0 | bool fe_copy = (pipe && whereToSendOutput == DestRemote); |
1030 | 0 | TupleDesc tupDesc; |
1031 | 0 | int num_phys_attrs; |
1032 | 0 | ListCell *cur; |
1033 | 0 | uint64 processed; |
1034 | |
|
1035 | 0 | if (fe_copy) |
1036 | 0 | SendCopyBegin(cstate); |
1037 | |
|
1038 | 0 | if (cstate->rel) |
1039 | 0 | tupDesc = RelationGetDescr(cstate->rel); |
1040 | 0 | else |
1041 | 0 | tupDesc = cstate->queryDesc->tupDesc; |
1042 | 0 | num_phys_attrs = tupDesc->natts; |
1043 | 0 | cstate->opts.null_print_client = cstate->opts.null_print; /* default */ |
1044 | | |
1045 | | /* We use fe_msgbuf as a per-row buffer regardless of copy_dest */ |
1046 | 0 | cstate->fe_msgbuf = makeStringInfo(); |
1047 | | |
1048 | | /* Get info about the columns we need to process. */ |
1049 | 0 | cstate->out_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo)); |
1050 | 0 | foreach(cur, cstate->attnumlist) |
1051 | 0 | { |
1052 | 0 | int attnum = lfirst_int(cur); |
1053 | 0 | Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1); |
1054 | |
|
1055 | 0 | cstate->routine->CopyToOutFunc(cstate, attr->atttypid, |
1056 | 0 | &cstate->out_functions[attnum - 1]); |
1057 | 0 | } |
1058 | | |
1059 | | /* |
1060 | | * Create a temporary memory context that we can reset once per row to |
1061 | | * recover palloc'd memory. This avoids any problems with leaks inside |
1062 | | * datatype output routines, and should be faster than retail pfree's |
1063 | | * anyway. (We don't need a whole econtext as CopyFrom does.) |
1064 | | */ |
1065 | 0 | cstate->rowcontext = AllocSetContextCreate(CurrentMemoryContext, |
1066 | 0 | "COPY TO", |
1067 | 0 | ALLOCSET_DEFAULT_SIZES); |
1068 | |
|
1069 | 0 | cstate->routine->CopyToStart(cstate, tupDesc); |
1070 | |
|
1071 | 0 | if (cstate->rel) |
1072 | 0 | { |
1073 | 0 | TupleTableSlot *slot; |
1074 | 0 | TableScanDesc scandesc; |
1075 | |
|
1076 | 0 | scandesc = table_beginscan(cstate->rel, GetActiveSnapshot(), 0, NULL); |
1077 | 0 | slot = table_slot_create(cstate->rel, NULL); |
1078 | |
|
1079 | 0 | processed = 0; |
1080 | 0 | while (table_scan_getnextslot(scandesc, ForwardScanDirection, slot)) |
1081 | 0 | { |
1082 | 0 | CHECK_FOR_INTERRUPTS(); |
1083 | | |
1084 | | /* Deconstruct the tuple ... */ |
1085 | 0 | slot_getallattrs(slot); |
1086 | | |
1087 | | /* Format and send the data */ |
1088 | 0 | CopyOneRowTo(cstate, slot); |
1089 | | |
1090 | | /* |
1091 | | * Increment the number of processed tuples, and report the |
1092 | | * progress. |
1093 | | */ |
1094 | 0 | pgstat_progress_update_param(PROGRESS_COPY_TUPLES_PROCESSED, |
1095 | 0 | ++processed); |
1096 | 0 | } |
1097 | |
|
1098 | 0 | ExecDropSingleTupleTableSlot(slot); |
1099 | 0 | table_endscan(scandesc); |
1100 | 0 | } |
1101 | 0 | else |
1102 | 0 | { |
1103 | | /* run the plan --- the dest receiver will send tuples */ |
1104 | 0 | ExecutorRun(cstate->queryDesc, ForwardScanDirection, 0); |
1105 | 0 | processed = ((DR_copy *) cstate->queryDesc->dest)->processed; |
1106 | 0 | } |
1107 | |
|
1108 | 0 | cstate->routine->CopyToEnd(cstate); |
1109 | |
|
1110 | 0 | MemoryContextDelete(cstate->rowcontext); |
1111 | |
|
1112 | 0 | if (fe_copy) |
1113 | 0 | SendCopyEnd(cstate); |
1114 | |
|
1115 | 0 | return processed; |
1116 | 0 | } |
1117 | | |
1118 | | /* |
1119 | | * Emit one row during DoCopyTo(). |
1120 | | */ |
1121 | | static inline void |
1122 | | CopyOneRowTo(CopyToState cstate, TupleTableSlot *slot) |
1123 | 0 | { |
1124 | 0 | MemoryContext oldcontext; |
1125 | |
|
1126 | 0 | MemoryContextReset(cstate->rowcontext); |
1127 | 0 | oldcontext = MemoryContextSwitchTo(cstate->rowcontext); |
1128 | | |
1129 | | /* Make sure the tuple is fully deconstructed */ |
1130 | 0 | slot_getallattrs(slot); |
1131 | |
|
1132 | 0 | cstate->routine->CopyToOneRow(cstate, slot); |
1133 | |
|
1134 | 0 | MemoryContextSwitchTo(oldcontext); |
1135 | 0 | } |
1136 | | |
/*
 * Send text representation of one attribute, with conversion and escaping
 *
 * DUMPSOFAR() flushes the pending literal run [start, ptr) through
 * CopySendData(), and does nothing when the run is empty.  The macro is not
 * self-contained: it expands to code that uses the locals "cstate", "start"
 * and "ptr" of whichever function invokes it.
 */
#define DUMPSOFAR() \
	do { \
		if (ptr > start) \
			CopySendData(cstate, start, ptr - start); \
	} while (0)

/*
 * CopyAttributeOutText
 *
 * cstate: COPY TO state.  This function reads opts.delim[0],
 *		need_transcoding, file_encoding and encoding_embeds_ascii from it.
 * string: NUL-terminated attribute value to emit.
 *
 * When need_transcoding is set the value is first passed through
 * pg_server_to_any() for file_encoding; all scanning below is done on the
 * result of that call.
 *
 * Escaping performed on the output:
 *	- \b \f \n \r \t \v become a backslash followed by the letter;
 *	- any other byte below 0x20 is sent literally, unless it is the
 *	  delimiter, in which case it is preceded by a backslash;
 *	- a backslash or a delimiter at or above 0x20 is preceded by a
 *	  backslash.
 */
static void
CopyAttributeOutText(CopyToState cstate, const char *string)
{
	const char *ptr;
	const char *start;
	char		c;
	char		delimc = cstate->opts.delim[0];

	if (cstate->need_transcoding)
		ptr = pg_server_to_any(string, strlen(string), cstate->file_encoding);
	else
		ptr = string;

	/*
	 * We have to grovel through the string searching for control characters
	 * and instances of the delimiter character.  In most cases, though, these
	 * are infrequent.  To avoid overhead from calling CopySendData once per
	 * character, we dump out all characters between escaped characters in a
	 * single call.  The loop invariant is that the data from "start" to "ptr"
	 * can be sent literally, but hasn't yet been.
	 *
	 * We can skip pg_encoding_mblen() overhead when encoding is safe, because
	 * in valid backend encodings, extra bytes of a multibyte character never
	 * look like ASCII.  This loop is sufficiently performance-critical that
	 * it's worth making two copies of it to get the IS_HIGHBIT_SET() test out
	 * of the normal safe-encoding path.
	 */
	if (cstate->encoding_embeds_ascii)
	{
		/* Variant that steps over multibyte characters as a unit */
		start = ptr;
		while ((c = *ptr) != '\0')
		{
			if ((unsigned char) c < (unsigned char) 0x20)
			{
				/*
				 * \r and \n must be escaped, the others are traditional. We
				 * prefer to dump these using the C-like notation, rather than
				 * a backslash and the literal character, because it makes the
				 * dump file a bit more proof against Microsoftish data
				 * mangling.
				 */
				switch (c)
				{
					case '\b':
						c = 'b';
						break;
					case '\f':
						c = 'f';
						break;
					case '\n':
						c = 'n';
						break;
					case '\r':
						c = 'r';
						break;
					case '\t':
						c = 't';
						break;
					case '\v':
						c = 'v';
						break;
					default:
						/*
						 * If it's the delimiter, must backslash it.  "c" is
						 * left unchanged on this path, so the byte written
						 * after the backslash is the delimiter itself.
						 */
						if (c == delimc)
							break;
						/* All ASCII control chars are length 1 */
						ptr++;
						continue;	/* fall to end of loop */
				}
				/* if we get here, we need to convert the control char */
				DUMPSOFAR();
				CopySendChar(cstate, '\\');
				CopySendChar(cstate, c);
				start = ++ptr;	/* do not include char in next run */
			}
			else if (c == '\\' || c == delimc)
			{
				/*
				 * Only the backslash is written here; the character itself
				 * becomes the first byte of the next literal run.
				 */
				DUMPSOFAR();
				CopySendChar(cstate, '\\');
				start = ptr++;	/* we include char in next run */
			}
			else if (IS_HIGHBIT_SET(c))
			{
				/*
				 * Advance by the value pg_encoding_mblen() reports, so the
				 * bytes skipped are never tested against the rules above.
				 */
				ptr += pg_encoding_mblen(cstate->file_encoding, ptr);
			}
			else
				ptr++;
		}
	}
	else
	{
		/* Same loop without the IS_HIGHBIT_SET() test: one byte at a time */
		start = ptr;
		while ((c = *ptr) != '\0')
		{
			if ((unsigned char) c < (unsigned char) 0x20)
			{
				/*
				 * \r and \n must be escaped, the others are traditional. We
				 * prefer to dump these using the C-like notation, rather than
				 * a backslash and the literal character, because it makes the
				 * dump file a bit more proof against Microsoftish data
				 * mangling.
				 */
				switch (c)
				{
					case '\b':
						c = 'b';
						break;
					case '\f':
						c = 'f';
						break;
					case '\n':
						c = 'n';
						break;
					case '\r':
						c = 'r';
						break;
					case '\t':
						c = 't';
						break;
					case '\v':
						c = 'v';
						break;
					default:
						/*
						 * If it's the delimiter, must backslash it.  "c" is
						 * left unchanged on this path, so the byte written
						 * after the backslash is the delimiter itself.
						 */
						if (c == delimc)
							break;
						/* All ASCII control chars are length 1 */
						ptr++;
						continue;	/* fall to end of loop */
				}
				/* if we get here, we need to convert the control char */
				DUMPSOFAR();
				CopySendChar(cstate, '\\');
				CopySendChar(cstate, c);
				start = ++ptr;	/* do not include char in next run */
			}
			else if (c == '\\' || c == delimc)
			{
				/*
				 * Only the backslash is written here; the character itself
				 * becomes the first byte of the next literal run.
				 */
				DUMPSOFAR();
				CopySendChar(cstate, '\\');
				start = ptr++;	/* we include char in next run */
			}
			else
				ptr++;
		}
	}

	/* Flush whatever literal run is still pending at end of string */
	DUMPSOFAR();
}
1294 | | |
1295 | | /* |
1296 | | * Send text representation of one attribute, with conversion and |
1297 | | * CSV-style escaping |
1298 | | */ |
1299 | | static void |
1300 | | CopyAttributeOutCSV(CopyToState cstate, const char *string, |
1301 | | bool use_quote) |
1302 | 0 | { |
1303 | 0 | const char *ptr; |
1304 | 0 | const char *start; |
1305 | 0 | char c; |
1306 | 0 | char delimc = cstate->opts.delim[0]; |
1307 | 0 | char quotec = cstate->opts.quote[0]; |
1308 | 0 | char escapec = cstate->opts.escape[0]; |
1309 | 0 | bool single_attr = (list_length(cstate->attnumlist) == 1); |
1310 | | |
1311 | | /* force quoting if it matches null_print (before conversion!) */ |
1312 | 0 | if (!use_quote && strcmp(string, cstate->opts.null_print) == 0) |
1313 | 0 | use_quote = true; |
1314 | |
|
1315 | 0 | if (cstate->need_transcoding) |
1316 | 0 | ptr = pg_server_to_any(string, strlen(string), cstate->file_encoding); |
1317 | 0 | else |
1318 | 0 | ptr = string; |
1319 | | |
1320 | | /* |
1321 | | * Make a preliminary pass to discover if it needs quoting |
1322 | | */ |
1323 | 0 | if (!use_quote) |
1324 | 0 | { |
1325 | | /* |
1326 | | * Quote '\.' if it appears alone on a line, so that it will not be |
1327 | | * interpreted as an end-of-data marker. (PG 18 and up will not |
1328 | | * interpret '\.' in CSV that way, except in embedded-in-SQL data; but |
1329 | | * we want the data to be loadable by older versions too. Also, this |
1330 | | * avoids breaking clients that are still using PQgetline().) |
1331 | | */ |
1332 | 0 | if (single_attr && strcmp(ptr, "\\.") == 0) |
1333 | 0 | use_quote = true; |
1334 | 0 | else |
1335 | 0 | { |
1336 | 0 | const char *tptr = ptr; |
1337 | |
|
1338 | 0 | while ((c = *tptr) != '\0') |
1339 | 0 | { |
1340 | 0 | if (c == delimc || c == quotec || c == '\n' || c == '\r') |
1341 | 0 | { |
1342 | 0 | use_quote = true; |
1343 | 0 | break; |
1344 | 0 | } |
1345 | 0 | if (IS_HIGHBIT_SET(c) && cstate->encoding_embeds_ascii) |
1346 | 0 | tptr += pg_encoding_mblen(cstate->file_encoding, tptr); |
1347 | 0 | else |
1348 | 0 | tptr++; |
1349 | 0 | } |
1350 | 0 | } |
1351 | 0 | } |
1352 | |
|
1353 | 0 | if (use_quote) |
1354 | 0 | { |
1355 | 0 | CopySendChar(cstate, quotec); |
1356 | | |
1357 | | /* |
1358 | | * We adopt the same optimization strategy as in CopyAttributeOutText |
1359 | | */ |
1360 | 0 | start = ptr; |
1361 | 0 | while ((c = *ptr) != '\0') |
1362 | 0 | { |
1363 | 0 | if (c == quotec || c == escapec) |
1364 | 0 | { |
1365 | 0 | DUMPSOFAR(); |
1366 | 0 | CopySendChar(cstate, escapec); |
1367 | 0 | start = ptr; /* we include char in next run */ |
1368 | 0 | } |
1369 | 0 | if (IS_HIGHBIT_SET(c) && cstate->encoding_embeds_ascii) |
1370 | 0 | ptr += pg_encoding_mblen(cstate->file_encoding, ptr); |
1371 | 0 | else |
1372 | 0 | ptr++; |
1373 | 0 | } |
1374 | 0 | DUMPSOFAR(); |
1375 | |
|
1376 | 0 | CopySendChar(cstate, quotec); |
1377 | 0 | } |
1378 | 0 | else |
1379 | 0 | { |
1380 | | /* If it doesn't need quoting, we can just dump it as-is */ |
1381 | 0 | CopySendString(cstate, ptr); |
1382 | 0 | } |
1383 | 0 | } |
1384 | | |
/*
 * copy_dest_startup --- executor startup
 *
 * Installed as the rStartup callback by CreateCopyDestReceiver().  The
 * COPY destination has nothing to prepare at this point, so none of the
 * arguments are used.
 */
static void
copy_dest_startup(DestReceiver *self, int operation, TupleDesc typeinfo)
{
	/* no-op */
}
1393 | | |
1394 | | /* |
1395 | | * copy_dest_receive --- receive one tuple |
1396 | | */ |
1397 | | static bool |
1398 | | copy_dest_receive(TupleTableSlot *slot, DestReceiver *self) |
1399 | 0 | { |
1400 | 0 | DR_copy *myState = (DR_copy *) self; |
1401 | 0 | CopyToState cstate = myState->cstate; |
1402 | | |
1403 | | /* Send the data */ |
1404 | 0 | CopyOneRowTo(cstate, slot); |
1405 | | |
1406 | | /* Increment the number of processed tuples, and report the progress */ |
1407 | 0 | pgstat_progress_update_param(PROGRESS_COPY_TUPLES_PROCESSED, |
1408 | 0 | ++myState->processed); |
1409 | |
|
1410 | 0 | return true; |
1411 | 0 | } |
1412 | | |
/*
 * copy_dest_shutdown --- executor end
 *
 * Installed as the rShutdown callback by CreateCopyDestReceiver().  There
 * is nothing to do here; the receiver itself is released separately by
 * copy_dest_destroy().
 */
static void
copy_dest_shutdown(DestReceiver *self)
{
	/* no-op */
}
1421 | | |
/*
 * copy_dest_destroy --- release DestReceiver object
 *
 * Installed as the rDestroy callback.  Frees the receiver with pfree(),
 * pairing with the palloc() in CreateCopyDestReceiver().  The CopyToState
 * the receiver points at is not touched here.
 */
static void
copy_dest_destroy(DestReceiver *self)
{
	pfree(self);
}
1430 | | |
1431 | | /* |
1432 | | * CreateCopyDestReceiver -- create a suitable DestReceiver object |
1433 | | */ |
1434 | | DestReceiver * |
1435 | | CreateCopyDestReceiver(void) |
1436 | 0 | { |
1437 | 0 | DR_copy *self = (DR_copy *) palloc(sizeof(DR_copy)); |
1438 | |
|
1439 | 0 | self->pub.receiveSlot = copy_dest_receive; |
1440 | 0 | self->pub.rStartup = copy_dest_startup; |
1441 | 0 | self->pub.rShutdown = copy_dest_shutdown; |
1442 | 0 | self->pub.rDestroy = copy_dest_destroy; |
1443 | 0 | self->pub.mydest = DestCopyOut; |
1444 | |
|
1445 | 0 | self->cstate = NULL; /* will be set later */ |
1446 | 0 | self->processed = 0; |
1447 | |
|
1448 | 0 | return (DestReceiver *) self; |
1449 | 0 | } |