From 8dfa2d8ba0d9e8c947d9186613fd52d011428330 Mon Sep 17 00:00:00 2001 From: Andres Freund Date: Wed, 4 Dec 2013 16:37:38 +0100 Subject: [PATCH] wal_decoding: pg_recvlogical: Introduce pg_receivexlog equivalent for logical changes --- src/bin/pg_basebackup/.gitignore | 1 + src/bin/pg_basebackup/Makefile | 11 +- src/bin/pg_basebackup/pg_recvlogical.c | 872 +++++++++++++++++++++++++ src/bin/pg_basebackup/receivelog.c | 137 +--- src/bin/pg_basebackup/receivelog.h | 2 + src/bin/pg_basebackup/streamutil.c | 123 +++- src/bin/pg_basebackup/streamutil.h | 10 + 7 files changed, 1032 insertions(+), 124 deletions(-) create mode 100644 src/bin/pg_basebackup/pg_recvlogical.c diff --git a/src/bin/pg_basebackup/.gitignore b/src/bin/pg_basebackup/.gitignore index 1334a1f77b..eb2978cae7 100644 --- a/src/bin/pg_basebackup/.gitignore +++ b/src/bin/pg_basebackup/.gitignore @@ -1,2 +1,3 @@ /pg_basebackup /pg_receivexlog +/pg_receivellog diff --git a/src/bin/pg_basebackup/Makefile b/src/bin/pg_basebackup/Makefile index a707c93dc5..8a60525ccf 100644 --- a/src/bin/pg_basebackup/Makefile +++ b/src/bin/pg_basebackup/Makefile @@ -20,7 +20,7 @@ override CPPFLAGS := -I$(libpq_srcdir) $(CPPFLAGS) OBJS=receivelog.o streamutil.o $(WIN32RES) -all: pg_basebackup pg_receivexlog +all: pg_basebackup pg_receivexlog pg_recvlogical pg_basebackup: pg_basebackup.o $(OBJS) | submake-libpq submake-libpgport $(CC) $(CFLAGS) pg_basebackup.o $(OBJS) $(libpq_pgport) $(LDFLAGS) $(LDFLAGS_EX) $(LIBS) -o $@$(X) @@ -28,9 +28,13 @@ pg_basebackup: pg_basebackup.o $(OBJS) | submake-libpq submake-libpgport pg_receivexlog: pg_receivexlog.o $(OBJS) | submake-libpq submake-libpgport $(CC) $(CFLAGS) pg_receivexlog.o $(OBJS) $(libpq_pgport) $(LDFLAGS) $(LDFLAGS_EX) $(LIBS) -o $@$(X) +pg_recvlogical: pg_recvlogical.o $(OBJS) | submake-libpq submake-libpgport + $(CC) $(CFLAGS) pg_recvlogical.o $(OBJS) $(libpq_pgport) $(LDFLAGS) $(LDFLAGS_EX) $(LIBS) -o $@$(X) + install: all installdirs $(INSTALL_PROGRAM) pg_basebackup$(X) '$(DESTDIR)$(bindir)/pg_basebackup$(X)' $(INSTALL_PROGRAM) pg_receivexlog$(X) '$(DESTDIR)$(bindir)/pg_receivexlog$(X)' + $(INSTALL_PROGRAM) pg_recvlogical$(X) '$(DESTDIR)$(bindir)/pg_recvlogical$(X)' installdirs: $(MKDIR_P) '$(DESTDIR)$(bindir)' @@ -38,6 +42,9 @@ installdirs: uninstall: rm -f '$(DESTDIR)$(bindir)/pg_basebackup$(X)' rm -f '$(DESTDIR)$(bindir)/pg_receivexlog$(X)' + rm -f '$(DESTDIR)$(bindir)/pg_recvlogical$(X)' clean distclean maintainer-clean: - rm -f pg_basebackup$(X) pg_receivexlog$(X) $(OBJS) pg_basebackup.o pg_receivexlog.o + rm -f pg_basebackup$(X) pg_receivexlog$(X) pg_recvlogical$(X) \ + pg_basebackup.o pg_receivexlog.o pg_recvlogical.o \ + $(OBJS) diff --git a/src/bin/pg_basebackup/pg_recvlogical.c b/src/bin/pg_basebackup/pg_recvlogical.c new file mode 100644 index 0000000000..c695210e44 --- /dev/null +++ b/src/bin/pg_basebackup/pg_recvlogical.c @@ -0,0 +1,872 @@ +/*------------------------------------------------------------------------- + * + * pg_recvlogical.c - receive streaming logical log data and write it + * to a local file. + * + * Portions Copyright (c) 1996-2013, PostgreSQL Global Development Group + * + * IDENTIFICATION + * src/bin/pg_basebackup/pg_recvlogical.c + *------------------------------------------------------------------------- + */ + +#include "postgres_fe.h" + +#include "streamutil.h" + +#include "getopt_long.h" + +#include "libpq-fe.h" +#include "libpq/pqsignal.h" + +#include "access/xlog_internal.h" +#include "common/fe_memutils.h" + +#include +#include +#include + +/* Time to sleep between reconnection attempts */ +#define RECONNECT_SLEEP_TIME 5 + +/* Global Options */ +static char *outfile = NULL; +static int verbose = 0; +static int noloop = 0; +static int standby_message_timeout = 10 * 1000; /* 10 sec = default */ +static const char *slot = NULL; +static XLogRecPtr startpos = InvalidXLogRecPtr; +static bool do_init_slot = false; +static bool do_start_slot = false; +static bool do_stop_slot = false; + +/* filled pairwise with option, value. value may be NULL */ +static char **options; +static size_t noptions = 0; +static const char *plugin = "test_decoding"; + +/* Global State */ +static int outfd = -1; +static volatile bool time_to_abort = false; + +static void usage(void); +static void StreamLog(); + +static void +usage(void) +{ + printf(_("%s receives PostgreSQL logical change stream.\n\n"), + progname); + printf(_("Usage:\n")); + printf(_(" %s [OPTION]...\n"), progname); + printf(_("\nOptions:\n")); + printf(_(" -f, --file=FILE receive log into this file. - for stdout\n")); + printf(_(" -n, --no-loop do not loop on connection lost\n")); + printf(_(" -v, --verbose output verbose messages\n")); + printf(_(" -V, --version output version information, then exit\n")); + printf(_(" -?, --help show this help, then exit\n")); + printf(_("\nConnection options:\n")); + printf(_(" -d, --database=DBNAME database to connect to\n")); + printf(_(" -h, --host=HOSTNAME database server host or socket directory\n")); + printf(_(" -p, --port=PORT database server port number\n")); + printf(_(" -U, --username=NAME connect as specified database user\n")); + printf(_(" -w, --no-password never prompt for password\n")); + printf(_(" -W, --password force password prompt (should happen automatically)\n")); + printf(_("\nReplication options:\n")); + printf(_(" -o, --option=NAME[=VALUE]\n" + " Specify option NAME with optional value VAL, to be passed\n" + " to the output plugin\n")); + printf(_(" -P, --plugin=PLUGIN use output plugin PLUGIN (defaults to test_decoding)\n")); + printf(_(" -s, --status-interval=INTERVAL\n" + " time between status packets sent to server (in seconds)\n")); + printf(_(" -S, --slot=SLOT use existing replication slot SLOT instead of starting a new one\n")); + printf(_(" -I, --startpos=PTR Where in an existing slot should the streaming start")); + printf(_("\nAction to be performed:\n")); + printf(_(" --init initiate a new replication slot (for the slotname see --slot)\n")); + printf(_(" --start start streaming in a replication slot (for the slotname see --slot)\n")); + printf(_(" --stop stop the replication slot (for the slotname see --slot)\n")); + printf(_("\nReport bugs to .\n")); +} + +/* + * Send a Standby Status Update message to server. + */ +static bool +sendFeedback(PGconn *conn, XLogRecPtr blockpos, int64 now, bool force, bool replyRequested) +{ + char replybuf[1 + 8 + 8 + 8 + 8 + 1]; + int len = 0; + + /* + * we normally don't want to send superflous feedbacks, but if + * it's because of a timeout we need to, otherwise + * replication_timeout will kill us. + */ + if (blockpos == startpos && !force) + return true; + + if (verbose) + fprintf(stderr, + _("%s: confirming flush up to %X/%X (slot %s)\n"), + progname, (uint32) (blockpos >> 32), (uint32) blockpos, + slot); + + replybuf[len] = 'r'; + len += 1; + fe_sendint64(blockpos, &replybuf[len]); /* write */ + len += 8; + fe_sendint64(blockpos, &replybuf[len]); /* flush */ + len += 8; + fe_sendint64(InvalidXLogRecPtr, &replybuf[len]); /* apply */ + len += 8; + fe_sendint64(now, &replybuf[len]); /* sendTime */ + len += 8; + replybuf[len] = replyRequested ? 1 : 0; /* replyRequested */ + len += 1; + + startpos = blockpos; + + if (PQputCopyData(conn, replybuf, len) <= 0 || PQflush(conn)) + { + fprintf(stderr, _("%s: could not send feedback packet: %s"), + progname, PQerrorMessage(conn)); + return false; + } + + return true; +} + +/* + * Start the log streaming + */ +static void +StreamLog(void) +{ + PGresult *res; + char query[512]; + char *copybuf = NULL; + int64 last_status = -1; + XLogRecPtr logoff = InvalidXLogRecPtr; + int written; + int i; + + /* + * Connect in replication mode to the server + */ + if (!conn) + conn = GetConnection(); + if (!conn) + /* Error message already written in GetConnection() */ + return; + + /* + * Start the replication + */ + if (verbose) + fprintf(stderr, + _("%s: starting log streaming at %X/%X (slot %s)\n"), + progname, (uint32) (startpos >> 32), (uint32) startpos, + slot); + + /* Initiate the replication stream at specified location */ + written = snprintf(query, sizeof(query), "START_LOGICAL_REPLICATION \"%s\" %X/%X", + slot, (uint32) (startpos >> 32), (uint32) startpos); + + /* + * add options to string, if present + * Oh, if we just had stringinfo in src/common... + */ + if (noptions) + written += snprintf(query + written, sizeof(query) - written, " ("); + + for (i = 0; i < noptions; i++) + { + /* separator */ + if (i > 0) + written += snprintf(query + written, sizeof(query) - written, ", "); + + /* write option name */ + written += snprintf(query + written, sizeof(query) - written, "\"%s\"", + options[(i * 2)]); + + if (written >= sizeof(query) - 1) + { + fprintf(stderr, _("%s: option string too long\n"), progname); + exit(1); /* no point in retrying, fatal error */ + } + + /* write option name if specified */ + if (options[(i * 2) + 1] != NULL) + { + written += snprintf(query + written, sizeof(query) - written, " '%s'", + options[(i * 2) + 1]); + + if (written >= sizeof(query) - 1) + { + fprintf(stderr, _("%s: option string too long\n"), progname); + exit(1); /* no point in retrying, fatal error */ + } + } + } + + if (noptions) + { + written += snprintf(query + written, sizeof(query) - written, ")"); + if (written >= sizeof(query) - 1) + { + fprintf(stderr, _("%s: option string too long\n"), progname); + exit(1); /* no point in retrying, fatal error */ + } + } + + res = PQexec(conn, query); + if (PQresultStatus(res) != PGRES_COPY_BOTH) + { + fprintf(stderr, _("%s: could not send replication command \"%s\": %s\n"), + progname, query, PQresultErrorMessage(res)); + PQclear(res); + goto error; + } + PQclear(res); + + if (verbose) + fprintf(stderr, + _("%s: initiated streaming\n"), + progname); + + while (!time_to_abort) + { + int r; + int bytes_left; + int bytes_written; + int64 now; + int hdr_len; + + if (copybuf != NULL) + { + PQfreemem(copybuf); + copybuf = NULL; + } + + /* + * Potentially send a status message to the master + */ + now = feGetCurrentTimestamp(); + if (standby_message_timeout > 0 && + feTimestampDifferenceExceeds(last_status, now, + standby_message_timeout)) + { + /* Time to send feedback! */ + if (!sendFeedback(conn, logoff, now, true, false)) + goto error; + + last_status = now; + } + + r = PQgetCopyData(conn, ©buf, 1); + if (r == 0) + { + /* + * In async mode, and no data available. We block on reading but + * not more than the specified timeout, so that we can send a + * response back to the client. + */ + fd_set input_mask; + struct timeval timeout; + struct timeval *timeoutptr; + + + { + now = feGetCurrentTimestamp(); + if (!sendFeedback(conn, logoff, now, false, false)) + goto error; + } + + FD_ZERO(&input_mask); + FD_SET(PQsocket(conn), &input_mask); + if (standby_message_timeout) + { + int64 targettime; + long secs; + int usecs; + + targettime = last_status + (standby_message_timeout - 1) * + ((int64) 1000); + feTimestampDifference(now, + targettime, + &secs, + &usecs); + if (secs <= 0) + timeout.tv_sec = 1; /* Always sleep at least 1 sec */ + else + timeout.tv_sec = secs; + timeout.tv_usec = usecs; + timeoutptr = &timeout; + } + else + timeoutptr = NULL; + + r = select(PQsocket(conn) + 1, &input_mask, NULL, NULL, timeoutptr); + if (r == 0 || (r < 0 && errno == EINTR)) + { + /* + * Got a timeout or signal. Continue the loop and either + * deliver a status packet to the server or just go back into + * blocking. + */ + continue; + } + else if (r < 0) + { + fprintf(stderr, _("%s: select() failed: %s\n"), + progname, strerror(errno)); + goto error; + } + /* Else there is actually data on the socket */ + if (PQconsumeInput(conn) == 0) + { + fprintf(stderr, + _("%s: could not receive data from WAL stream: %s"), + progname, PQerrorMessage(conn)); + goto error; + } + continue; + } + if (r == -1) + /* End of copy stream */ + break; + if (r == -2) + { + fprintf(stderr, _("%s: could not read COPY data: %s"), + progname, PQerrorMessage(conn)); + goto error; + } + + /* Check the message type. */ + if (copybuf[0] == 'k') + { + int pos; + bool replyRequested; + XLogRecPtr walEnd; + + /* + * Parse the keepalive message, enclosed in the CopyData message. + * We just check if the server requested a reply, and ignore the + * rest. + */ + pos = 1; /* skip msgtype 'k' */ + walEnd = fe_recvint64(©buf[pos]); + logoff = Max(walEnd, logoff); + + pos += 8; /* read walEnd */ + + pos += 8; /* skip sendTime */ + + if (r < pos + 1) + { + fprintf(stderr, _("%s: streaming header too small: %d\n"), + progname, r); + goto error; + } + replyRequested = copybuf[pos]; + + /* If the server requested an immediate reply, send one. */ + if (replyRequested) + { + now = feGetCurrentTimestamp(); + if (!sendFeedback(conn, logoff, now, false, false)) + goto error; + last_status = now; + } + continue; + } + else if (copybuf[0] != 'w') + { + fprintf(stderr, _("%s: unrecognized streaming header: \"%c\"\n"), + progname, copybuf[0]); + goto error; + } + + + /* + * Read the header of the XLogData message, enclosed in the CopyData + * message. We only need the WAL location field (dataStart), the rest + * of the header is ignored. + */ + hdr_len = 1; /* msgtype 'w' */ + hdr_len += 8; /* dataStart */ + hdr_len += 8; /* walEnd */ + hdr_len += 8; /* sendTime */ + if (r < hdr_len + 1) + { + fprintf(stderr, _("%s: streaming header too small: %d\n"), + progname, r); + goto error; + } + + /* Extract WAL location for this block */ + { + XLogRecPtr temp = fe_recvint64(©buf[1]); + + logoff = Max(temp, logoff); + } + + if (outfd == -1 && strcmp(outfile, "-") == 0) + { + outfd = fileno(stdout); + } + else if (outfd == -1) + { + outfd = open(outfile, O_CREAT | O_APPEND | O_WRONLY | PG_BINARY, + S_IRUSR | S_IWUSR); + if (outfd == -1) + { + fprintf(stderr, + _("%s: could not open log file \"%s\": %s\n"), + progname, outfile, strerror(errno)); + goto error; + } + } + + bytes_left = r - hdr_len; + bytes_written = 0; + + + while (bytes_left) + { + int ret; + + ret = write(outfd, + copybuf + hdr_len + bytes_written, + bytes_left); + + if (ret < 0) + { + fprintf(stderr, + _("%s: could not write %u bytes to log file \"%s\": %s\n"), + progname, bytes_left, outfile, + strerror(errno)); + goto error; + } + + /* Write was successful, advance our position */ + bytes_written += ret; + bytes_left -= ret; + } + + if (write(outfd, "\n", 1) != 1) + { + fprintf(stderr, + _("%s: could not write %u bytes to log file \"%s\": %s\n"), + progname, 1, outfile, + strerror(errno)); + goto error; + } + } + + res = PQgetResult(conn); + if (PQresultStatus(res) != PGRES_COMMAND_OK) + { + fprintf(stderr, + _("%s: unexpected termination of replication stream: %s"), + progname, PQresultErrorMessage(res)); + goto error; + } + PQclear(res); + + if (copybuf != NULL) + PQfreemem(copybuf); + + if (outfd != -1 && close(outfd) != 0) + fprintf(stderr, _("%s: could not close file \"%s\": %s\n"), + progname, outfile, strerror(errno)); + outfd = -1; +error: + PQfinish(conn); + conn = NULL; +} + +/* + * When sigint is called, just tell the system to exit at the next possible + * moment. + */ +#ifndef WIN32 + +static void +sigint_handler(int signum) +{ + time_to_abort = true; +} +#endif + +int +main(int argc, char **argv) +{ + PGresult *res; + static struct option long_options[] = { +/* general options */ + {"file", required_argument, NULL, 'f'}, + {"no-loop", no_argument, NULL, 'n'}, + {"verbose", no_argument, NULL, 'v'}, + {"version", no_argument, NULL, 'V'}, + {"help", no_argument, NULL, '?'}, +/* connnection options */ + {"database", required_argument, NULL, 'd'}, + {"host", required_argument, NULL, 'h'}, + {"port", required_argument, NULL, 'p'}, + {"username", required_argument, NULL, 'U'}, + {"no-password", no_argument, NULL, 'w'}, + {"password", no_argument, NULL, 'W'}, +/* replication options */ + {"option", required_argument, NULL, 'o'}, + {"plugin", required_argument, NULL, 'P'}, + {"status-interval", required_argument, NULL, 's'}, + {"slot", required_argument, NULL, 'S'}, + {"startpos", required_argument, NULL, 'I'}, +/* action */ + {"init", no_argument, NULL, 1}, + {"start", no_argument, NULL, 2}, + {"stop", no_argument, NULL, 3}, + {NULL, 0, NULL, 0} + }; + int c; + int option_index; + uint32 hi, + lo; + + progname = get_progname(argv[0]); + set_pglocale_pgservice(argv[0], PG_TEXTDOMAIN("pg_recvlogical")); + + if (argc > 1) + { + if (strcmp(argv[1], "--help") == 0 || strcmp(argv[1], "-?") == 0) + { + usage(); + exit(0); + } + else if (strcmp(argv[1], "-V") == 0 || + strcmp(argv[1], "--version") == 0) + { + puts("pg_recvlogical (PostgreSQL) " PG_VERSION); + exit(0); + } + } + + while ((c = getopt_long(argc, argv, "f:nvd:h:o:p:U:wWP:s:S:", + long_options, &option_index)) != -1) + { + switch (c) + { +/* general options */ + case 'f': + outfile = pg_strdup(optarg); + break; + case 'n': + noloop = 1; + break; + case 'v': + verbose++; + break; +/* connnection options */ + case 'd': + dbname = pg_strdup(optarg); + break; + case 'h': + dbhost = pg_strdup(optarg); + break; + case 'p': + if (atoi(optarg) <= 0) + { + fprintf(stderr, _("%s: invalid port number \"%s\"\n"), + progname, optarg); + exit(1); + } + dbport = pg_strdup(optarg); + break; + case 'U': + dbuser = pg_strdup(optarg); + break; + case 'w': + dbgetpassword = -1; + break; + case 'W': + dbgetpassword = 1; + break; +/* replication options */ + case 'o': + { + char *data = pg_strdup(optarg); + char *val = strchr(data, '='); + + if (val != NULL) + { + /* remove =; separate data from val */ + *val = '\0'; + val++; + } + + noptions += 1; + options = pg_realloc(options, sizeof(char*) * noptions * 2); + + options[(noptions - 1) * 2] = data; + options[(noptions - 1) * 2 + 1] = val; + } + + break; + case 'P': + plugin = pg_strdup(optarg); + break; + case 's': + standby_message_timeout = atoi(optarg) * 1000; + if (standby_message_timeout < 0) + { + fprintf(stderr, _("%s: invalid status interval \"%s\"\n"), + progname, optarg); + exit(1); + } + break; + case 'S': + slot = pg_strdup(optarg); + break; + case 'I': + if (sscanf(optarg, "%X/%X", &hi, &lo) != 2) + { + fprintf(stderr, + _("%s: could not parse start position \"%s\"\n"), + progname, optarg); + exit(1); + } + startpos = ((uint64) hi) << 32 | lo; + break; +/* action */ + case 1: + do_init_slot = true; + break; + case 2: + do_start_slot = true; + break; + case 3: + do_stop_slot = true; + break; + + default: + + /* + * getopt_long already emitted a complaint + */ + fprintf(stderr, _("Try \"%s --help\" for more information.\n"), + progname); + exit(1); + } + } + + /* + * Any non-option arguments? + */ + if (optind < argc) + { + fprintf(stderr, + _("%s: too many command-line arguments (first is \"%s\")\n"), + progname, argv[optind]); + fprintf(stderr, _("Try \"%s --help\" for more information.\n"), + progname); + exit(1); + } + + /* + * Required arguments + */ + if (slot == NULL) + { + fprintf(stderr, _("%s: no slot specified\n"), progname); + fprintf(stderr, _("Try \"%s --help\" for more information.\n"), + progname); + exit(1); + } + + if (!do_stop_slot && outfile == NULL) + { + fprintf(stderr, _("%s: no target file specified\n"), progname); + fprintf(stderr, _("Try \"%s --help\" for more information.\n"), + progname); + exit(1); + } + + if (!do_stop_slot && dbname == NULL) + { + fprintf(stderr, _("%s: no database specified\n"), progname); + fprintf(stderr, _("Try \"%s --help\" for more information.\n"), + progname); + exit(1); + } + + if (!do_stop_slot && !do_init_slot && !do_start_slot) + { + fprintf(stderr, _("%s: at least one action needs to be specified\n"), progname); + fprintf(stderr, _("Try \"%s --help\" for more information.\n"), + progname); + exit(1); + } + + if (do_stop_slot && (do_init_slot || do_start_slot)) + { + fprintf(stderr, _("%s: --stop cannot be combined with --init or --start\n"), progname); + fprintf(stderr, _("Try \"%s --help\" for more information.\n"), + progname); + exit(1); + } + + if (startpos && (do_init_slot || do_stop_slot)) + { + fprintf(stderr, _("%s: --startpos cannot be combined with --init or --stop\n"), progname); + fprintf(stderr, _("Try \"%s --help\" for more information.\n"), + progname); + exit(1); + } + +#ifndef WIN32 + pqsignal(SIGINT, sigint_handler); +#endif + + /* + * don't really need this but it actually helps to get more precise error + * messages about authentication, required GUCs and such without starting + * to loop around connection attempts lateron. + */ + { + conn = GetConnection(); + if (!conn) + /* Error message already written in GetConnection() */ + exit(1); + + /* + * Run IDENTIFY_SYSTEM so we can get the timeline and current xlog + * position. + */ + res = PQexec(conn, "IDENTIFY_SYSTEM"); + if (PQresultStatus(res) != PGRES_TUPLES_OK) + { + fprintf(stderr, _("%s: could not send replication command \"%s\": %s"), + progname, "IDENTIFY_SYSTEM", PQerrorMessage(conn)); + disconnect_and_exit(1); + } + + if (PQntuples(res) != 1 || PQnfields(res) != 4) + { + fprintf(stderr, + _("%s: could not identify system: got %d rows and %d fields, expected %d rows and %d fields\n"), + progname, PQntuples(res), PQnfields(res), 1, 4); + disconnect_and_exit(1); + } + PQclear(res); + } + + + /* + * stop a replication slot + */ + if (do_stop_slot) + { + char query[256]; + + if (verbose) + fprintf(stderr, + _("%s: freeing replication slot \"%s\"\n"), + progname, slot); + + snprintf(query, sizeof(query), "FREE_LOGICAL_REPLICATION \"%s\"", + slot); + res = PQexec(conn, query); + if (PQresultStatus(res) != PGRES_COMMAND_OK) + { + fprintf(stderr, _("%s: could not send replication command \"%s\": %s"), + progname, query, PQerrorMessage(conn)); + disconnect_and_exit(1); + } + + if (PQntuples(res) != 0 || PQnfields(res) != 0) + { + fprintf(stderr, + _("%s: could not stop logical rep: got %d rows and %d fields, expected %d rows and %d fields\n"), + progname, PQntuples(res), PQnfields(res), 0, 0); + disconnect_and_exit(1); + } + + PQclear(res); + disconnect_and_exit(0); + } + + /* + * init a replication slot + */ + if (do_init_slot) + { + char query[256]; + + if (verbose) + fprintf(stderr, + _("%s: initializing replication slot \"%s\"\n"), + progname, slot); + + snprintf(query, sizeof(query), "INIT_LOGICAL_REPLICATION \"%s\" \"%s\"", + slot, plugin); + + res = PQexec(conn, query); + if (PQresultStatus(res) != PGRES_TUPLES_OK) + { + fprintf(stderr, _("%s: could not send replication command \"%s\": %s"), + progname, query, PQerrorMessage(conn)); + disconnect_and_exit(1); + } + + if (PQntuples(res) != 1 || PQnfields(res) != 4) + { + fprintf(stderr, + _("%s: could not init logical rep: got %d rows and %d fields, expected %d rows and %d fields\n"), + progname, PQntuples(res), PQnfields(res), 1, 4); + disconnect_and_exit(1); + } + + if (sscanf(PQgetvalue(res, 0, 1), "%X/%X", &hi, &lo) != 2) + { + fprintf(stderr, + _("%s: could not parse log location \"%s\"\n"), + progname, PQgetvalue(res, 0, 1)); + disconnect_and_exit(1); + } + startpos = ((uint64) hi) << 32 | lo; + + slot = strdup(PQgetvalue(res, 0, 0)); + PQclear(res); + } + + + if (!do_start_slot) + disconnect_and_exit(0); + + while (true) + { + StreamLog(); + if (time_to_abort) + { + /* + * We've been Ctrl-C'ed. That's not an error, so exit without an + * errorcode. + */ + disconnect_and_exit(0); + } + else if (noloop) + { + fprintf(stderr, _("%s: disconnected.\n"), progname); + exit(1); + } + else + { + fprintf(stderr, + /* translator: check source for value for %d */ + _("%s: disconnected. Waiting %d seconds to try again.\n"), + progname, RECONNECT_SLEEP_TIME); + pg_usleep(RECONNECT_SLEEP_TIME * 1000000); + } + } +} diff --git a/src/bin/pg_basebackup/receivelog.c b/src/bin/pg_basebackup/receivelog.c index 7c85d306b2..6e573da85d 100644 --- a/src/bin/pg_basebackup/receivelog.c +++ b/src/bin/pg_basebackup/receivelog.c @@ -11,21 +11,18 @@ * src/bin/pg_basebackup/receivelog.c *------------------------------------------------------------------------- */ + #include "postgres_fe.h" -#include -#include -#include -#include -/* for ntohl/htonl */ -#include -#include +/* local includes */ +#include "receivelog.h" +#include "streamutil.h" #include "libpq-fe.h" #include "access/xlog_internal.h" -#include "receivelog.h" -#include "streamutil.h" +#include +#include /* fd and filename for currently open WAL file */ @@ -191,63 +188,6 @@ close_walfile(char *basedir, char *partial_suffix) } -/* - * Local version of GetCurrentTimestamp(), since we are not linked with - * backend code. The protocol always uses integer timestamps, regardless of - * server setting. - */ -static int64 -localGetCurrentTimestamp(void) -{ - int64 result; - struct timeval tp; - - gettimeofday(&tp, NULL); - - result = (int64) tp.tv_sec - - ((POSTGRES_EPOCH_JDATE - UNIX_EPOCH_JDATE) * SECS_PER_DAY); - - result = (result * USECS_PER_SEC) + tp.tv_usec; - - return result; -} - -/* - * Local version of TimestampDifference(), since we are not linked with - * backend code. - */ -static void -localTimestampDifference(int64 start_time, int64 stop_time, - long *secs, int *microsecs) -{ - int64 diff = stop_time - start_time; - - if (diff <= 0) - { - *secs = 0; - *microsecs = 0; - } - else - { - *secs = (long) (diff / USECS_PER_SEC); - *microsecs = (int) (diff % USECS_PER_SEC); - } -} - -/* - * Local version of TimestampDifferenceExceeds(), since we are not - * linked with backend code. - */ -static bool -localTimestampDifferenceExceeds(int64 start_time, - int64 stop_time, - int msec) -{ - int64 diff = stop_time - start_time; - - return (diff >= msec * INT64CONST(1000)); -} - /* * Check if a timeline history file exists. */ @@ -367,47 +307,6 @@ writeTimeLineHistoryFile(char *basedir, TimeLineID tli, char *filename, char *co return true; } -/* - * Converts an int64 to network byte order. - */ -static void -sendint64(int64 i, char *buf) -{ - uint32 n32; - - /* High order half first, since we're doing MSB-first */ - n32 = (uint32) (i >> 32); - n32 = htonl(n32); - memcpy(&buf[0], &n32, 4); - - /* Now the low order half */ - n32 = (uint32) i; - n32 = htonl(n32); - memcpy(&buf[4], &n32, 4); -} - -/* - * Converts an int64 from network byte order to native format. - */ -static int64 -recvint64(char *buf) -{ - int64 result; - uint32 h32; - uint32 l32; - - memcpy(&h32, buf, 4); - memcpy(&l32, buf + 4, 4); - h32 = ntohl(h32); - l32 = ntohl(l32); - - result = h32; - result <<= 32; - result |= l32; - - return result; -} - /* * Send a Standby Status Update message to server. */ @@ -419,13 +318,13 @@ sendFeedback(PGconn *conn, XLogRecPtr blockpos, int64 now, bool replyRequested) replybuf[len] = 'r'; len += 1; - sendint64(blockpos, &replybuf[len]); /* write */ + fe_sendint64(blockpos, &replybuf[len]); /* write */ len += 8; - sendint64(InvalidXLogRecPtr, &replybuf[len]); /* flush */ + fe_sendint64(InvalidXLogRecPtr, &replybuf[len]); /* flush */ len += 8; - sendint64(InvalidXLogRecPtr, &replybuf[len]); /* apply */ + fe_sendint64(InvalidXLogRecPtr, &replybuf[len]); /* apply */ len += 8; - sendint64(now, &replybuf[len]); /* sendTime */ + fe_sendint64(now, &replybuf[len]); /* sendTime */ len += 8; replybuf[len] = replyRequested ? 1 : 0; /* replyRequested */ len += 1; @@ -827,9 +726,9 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, /* * Potentially send a status message to the master */ - now = localGetCurrentTimestamp(); + now = feGetCurrentTimestamp(); if (still_sending && standby_message_timeout > 0 && - localTimestampDifferenceExceeds(last_status, now, + feTimestampDifferenceExceeds(last_status, now, standby_message_timeout)) { /* Time to send feedback! */ @@ -858,10 +757,10 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, int usecs; targettime = last_status + (standby_message_timeout - 1) * ((int64) 1000); - localTimestampDifference(now, - targettime, - &secs, - &usecs); + feTimestampDifference(now, + targettime, + &secs, + &usecs); if (secs <= 0) timeout.tv_sec = 1; /* Always sleep at least 1 sec */ else @@ -965,7 +864,7 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, /* If the server requested an immediate reply, send one. */ if (replyRequested && still_sending) { - now = localGetCurrentTimestamp(); + now = feGetCurrentTimestamp(); if (!sendFeedback(conn, blockpos, now, false)) goto error; last_status = now; @@ -995,7 +894,7 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, progname, r); goto error; } - blockpos = recvint64(©buf[1]); + blockpos = fe_recvint64(©buf[1]); /* Extract WAL location for this block */ xlogoff = blockpos % XLOG_SEG_SIZE; diff --git a/src/bin/pg_basebackup/receivelog.h b/src/bin/pg_basebackup/receivelog.h index 7c983cd604..f4789a580a 100644 --- a/src/bin/pg_basebackup/receivelog.h +++ b/src/bin/pg_basebackup/receivelog.h @@ -1,3 +1,5 @@ +#include "libpq-fe.h" + #include "access/xlogdefs.h" /* diff --git a/src/bin/pg_basebackup/streamutil.c b/src/bin/pg_basebackup/streamutil.c index 6cc6cd200e..4ad1dfbfb3 100644 --- a/src/bin/pg_basebackup/streamutil.c +++ b/src/bin/pg_basebackup/streamutil.c @@ -11,17 +11,35 @@ *------------------------------------------------------------------------- */ -#include "postgres_fe.h" +/* + * We have to use postgres.h not postgres_fe.h here, because there's + * backend-only stuff in the datetime include files we need. But we need a + * frontend-ish environment otherwise. Hence this ugly hack. + */ +#define FRONTEND 1 +#include "postgres.h" + #include "streamutil.h" +#include "common/fe_memutils.h" +#include "utils/datetime.h" + #include #include +#include +#include +#include + +/* for ntohl/htonl */ +#include +#include const char *progname; char *connection_string = NULL; char *dbhost = NULL; char *dbuser = NULL; char *dbport = NULL; +char *dbname = NULL; int dbgetpassword = 0; /* 0=auto, -1=never, 1=always */ static char *dbpassword = NULL; PGconn *conn = NULL; @@ -86,10 +104,10 @@ GetConnection(void) } keywords[i] = "dbname"; - values[i] = "replication"; + values[i] = dbname == NULL ? "replication" : dbname; i++; keywords[i] = "replication"; - values[i] = "true"; + values[i] = dbname == NULL ? "true" : "database"; i++; keywords[i] = "fallback_application_name"; values[i] = progname; @@ -211,3 +229,102 @@ GetConnection(void) return tmpconn; } + + +/* + * Frontend version of GetCurrentTimestamp(), since we are not linked with + * backend code. The protocol always uses integer timestamps, regardless of + * server setting. + */ +int64 +feGetCurrentTimestamp(void) +{ + int64 result; + struct timeval tp; + + gettimeofday(&tp, NULL); + + result = (int64) tp.tv_sec - + ((POSTGRES_EPOCH_JDATE - UNIX_EPOCH_JDATE) * SECS_PER_DAY); + + result = (result * USECS_PER_SEC) + tp.tv_usec; + + return result; +} + +/* + * Frontend version of TimestampDifference(), since we are not linked with + * backend code. + */ +void +feTimestampDifference(int64 start_time, int64 stop_time, + long *secs, int *microsecs) +{ + int64 diff = stop_time - start_time; + + if (diff <= 0) + { + *secs = 0; + *microsecs = 0; + } + else + { + *secs = (long) (diff / USECS_PER_SEC); + *microsecs = (int) (diff % USECS_PER_SEC); + } +} + +/* + * Frontend version of TimestampDifferenceExceeds(), since we are not + * linked with backend code. + */ +bool +feTimestampDifferenceExceeds(int64 start_time, + int64 stop_time, + int msec) +{ + int64 diff = stop_time - start_time; + + return (diff >= msec * INT64CONST(1000)); +} + +/* + * Converts an int64 to network byte order. + */ +void +fe_sendint64(int64 i, char *buf) +{ + uint32 n32; + + /* High order half first, since we're doing MSB-first */ + n32 = (uint32) (i >> 32); + n32 = htonl(n32); + memcpy(&buf[0], &n32, 4); + + /* Now the low order half */ + n32 = (uint32) i; + n32 = htonl(n32); + memcpy(&buf[4], &n32, 4); +} + +/* + * Converts an int64 from network byte order to native format. + */ +int64 +fe_recvint64(char *buf) +{ + int64 result; + uint32 h32; + uint32 l32; + + memcpy(&h32, buf, 4); + memcpy(&l32, buf + 4, 4); + h32 = ntohl(h32); + l32 = ntohl(l32); + + result = h32; + result <<= 32; + result |= l32; + + return result; +} diff --git a/src/bin/pg_basebackup/streamutil.h b/src/bin/pg_basebackup/streamutil.h index 77d6b86ced..4286df86ed 100644 --- a/src/bin/pg_basebackup/streamutil.h +++ b/src/bin/pg_basebackup/streamutil.h @@ -5,6 +5,7 @@ extern char *connection_string; extern char *dbhost; extern char *dbuser; extern char *dbport; +extern char *dbname; extern int dbgetpassword; /* Connection kept global so we can disconnect easily */ @@ -17,3 +18,12 @@ extern PGconn *conn; } extern PGconn *GetConnection(void); + +extern int64 feGetCurrentTimestamp(void); +extern void feTimestampDifference(int64 start_time, int64 stop_time, + long *secs, int *microsecs); + +extern bool feTimestampDifferenceExceeds(int64 start_time, int64 stop_time, + int msec); +extern void fe_sendint64(int64 i, char *buf); +extern int64 fe_recvint64(char *buf); -- 2.39.5