#include "access/timeline.h"
#include "access/transam.h"
-#include "access/xlog_internal.h"
#include "access/xact.h"
+#include "access/xlog_internal.h"
#include "catalog/pg_type.h"
#include "commands/dbcommands.h"
#include "miscadmin.h"
#include "nodes/replnodes.h"
#include "replication/basebackup.h"
+#include "replication/decode.h"
+#include "replication/logical.h"
+#include "replication/logicalfuncs.h"
+#include "replication/snapbuild.h"
#include "replication/syncrep.h"
#include "replication/walreceiver.h"
#include "replication/walsender.h"
static bool streamingDoneSending;
static bool streamingDoneReceiving;
+/* Are we there yet? */
+static bool WalSndCaughtUp = false;
+
/* Flags set by signal handlers for later service in main loop */
static volatile sig_atomic_t got_SIGHUP = false;
static volatile sig_atomic_t walsender_ready_to_stop = false;
*/
static volatile sig_atomic_t replication_active = false;
+/* XXX reader */
+static MemoryContext decoding_ctx = NULL;
+static MemoryContext old_decoding_ctx = NULL;
+
+static LogicalDecodingContext *logical_decoding_ctx = NULL;
+static XLogRecPtr logical_startptr = InvalidXLogRecPtr;
+
/* Signal handlers */
static void WalSndSigHupHandler(SIGNAL_ARGS);
static void WalSndXLogSendHandler(SIGNAL_ARGS);
static void WalSndLastCycleHandler(SIGNAL_ARGS);
/* Prototypes for private functions */
-static void WalSndLoop(void);
+typedef void (*WalSndSendData)(void);
+static void WalSndLoop(WalSndSendData send_data);
static void InitWalSenderSlot(void);
static void WalSndKill(int code, Datum arg);
-static void XLogSend(bool *caughtup);
+static void XLogSendPhysical(void);
+static void XLogSendLogical(void);
+static void WalSndDone(WalSndSendData send_data);
static XLogRecPtr GetStandbyFlushRecPtr(void);
static void IdentifySystem(void);
static void StartReplication(StartReplicationCmd *cmd);
+static void InitLogicalReplication(InitLogicalReplicationCmd *cmd);
+static void StartLogicalReplication(StartLogicalReplicationCmd *cmd);
+static void FreeLogicalReplication(FreeLogicalReplicationCmd *cmd);
static void ProcessStandbyMessage(void);
static void ProcessStandbyReplyMessage(void);
static void ProcessStandbyHSFeedbackMessage(void);
static void ProcessRepliesIfAny(void);
static void WalSndKeepalive(bool requestReply);
+static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid);
+static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid);
+static void XLogRead(char *buf, XLogRecPtr startptr, Size count);
+
+
/* Initialize walsender process before entering the main command loop */
/* Main loop of walsender */
replication_active = true;
- WalSndLoop();
+ WalSndLoop(XLogSendPhysical);
replication_active = false;
if (walsender_ready_to_stop)
pq_puttextmessage('C', "START_STREAMING");
}
+static int
+replay_read_page(XLogReaderState* state, XLogRecPtr targetPagePtr, int reqLen,
+ XLogRecPtr targetRecPtr, char* cur_page, TimeLineID *pageTLI)
+{
+ XLogRecPtr flushptr;
+ int count;
+
+ flushptr = WalSndWaitForWal(targetPagePtr + reqLen);
+
+ /* more than one block available */
+ if (targetPagePtr + XLOG_BLCKSZ <= flushptr)
+ count = XLOG_BLCKSZ;
+ /* not enough data there */
+ else if (targetPagePtr + reqLen > flushptr)
+ return -1;
+ /* part of the page available */
+ else
+ count = flushptr - targetPagePtr;
+
+ /* FIXME: more sensible/efficient implementation */
+ XLogRead(cur_page, targetPagePtr, XLOG_BLCKSZ);
+
+ return count;
+}
+
+/*
+ * Initialize logical replication and wait for an initial consistent point to
+ * start sending changes from.
+ */
+static void
+InitLogicalReplication(InitLogicalReplicationCmd *cmd)
+{
+ const char *slot_name;
+ StringInfoData buf;
+ char xpos[MAXFNAMELEN];
+ const char *snapshot_name = NULL;
+ LogicalDecodingContext *ctx;
+ XLogRecPtr startptr;
+
+ CheckLogicalDecodingRequirements();
+
+ Assert(!MyLogicalDecodingSlot);
+
+ /* XXX apply sanity checking to slot name? */
+ LogicalDecodingAcquireFreeSlot(cmd->name, cmd->plugin);
+
+ Assert(MyLogicalDecodingSlot);
+
+ decoding_ctx = AllocSetContextCreate(TopMemoryContext,
+ "decoding context",
+ ALLOCSET_DEFAULT_MINSIZE,
+ ALLOCSET_DEFAULT_INITSIZE,
+ ALLOCSET_DEFAULT_MAXSIZE);
+ old_decoding_ctx = MemoryContextSwitchTo(decoding_ctx);
+
+ /* setup state for XLogReadPage */
+ sendTimeLineIsHistoric = false;
+ sendTimeLine = ThisTimeLineID;
+
+ initStringInfo(&output_message);
+ ctx = CreateLogicalDecodingContext(MyLogicalDecodingSlot, true, InvalidXLogRecPtr,
+ NIL, replay_read_page,
+ WalSndPrepareWrite, WalSndWriteData);
+
+ MemoryContextSwitchTo(old_decoding_ctx);
+
+ startptr = MyLogicalDecodingSlot->restart_decoding;
+
+ elog(WARNING, "Initiating logical rep from %X/%X",
+ (uint32)(startptr >> 32), (uint32)startptr);
+
+ for (;;)
+ {
+ XLogRecord *record;
+ XLogRecordBuffer buf;
+ char *err = NULL;
+
+ /* the read_page callback waits for new WAL */
+ record = XLogReadRecord(ctx->reader, startptr, &err);
+ /* xlog record was invalid */
+ if (err)
+ elog(ERROR, "%s", err);
+
+ /* read up from last position next time round */
+ startptr = InvalidXLogRecPtr;
+
+ Assert(record);
+
+ buf.origptr = ctx->reader->ReadRecPtr;
+ buf.endptr = ctx->reader->EndRecPtr;
+ buf.record = *record;
+ buf.record_data = XLogRecGetData(record);
+ DecodeRecordIntoReorderBuffer(ctx, &buf);
+
+ /* only continue till we found a consistent spot */
+ if (LogicalDecodingContextReady(ctx))
+ {
+ /* export plain, importable, snapshot to the user */
+ snapshot_name = SnapBuildExportSnapshot(ctx->snapshot_builder);
+ break;
+ }
+ }
+
+ MyLogicalDecodingSlot->confirmed_flush = ctx->reader->EndRecPtr;
+ slot_name = NameStr(MyLogicalDecodingSlot->name);
+ snprintf(xpos, sizeof(xpos), "%X/%X",
+ (uint32) (MyLogicalDecodingSlot->confirmed_flush >> 32),
+ (uint32) MyLogicalDecodingSlot->confirmed_flush);
+
+ pq_beginmessage(&buf, 'T');
+ pq_sendint(&buf, 4, 2); /* 4 fields */
+
+ /* first field: slot name */
+ pq_sendstring(&buf, "replication_id"); /* col name */
+ pq_sendint(&buf, 0, 4); /* table oid */
+ pq_sendint(&buf, 0, 2); /* attnum */
+ pq_sendint(&buf, TEXTOID, 4); /* type oid */
+ pq_sendint(&buf, -1, 2); /* typlen */
+ pq_sendint(&buf, 0, 4); /* typmod */
+ pq_sendint(&buf, 0, 2); /* format code */
+
+ /* second field: LSN at which we became consistent */
+ pq_sendstring(&buf, "consistent_point"); /* col name */
+ pq_sendint(&buf, 0, 4); /* table oid */
+ pq_sendint(&buf, 0, 2); /* attnum */
+ pq_sendint(&buf, TEXTOID, 4); /* type oid */
+ pq_sendint(&buf, -1, 2); /* typlen */
+ pq_sendint(&buf, 0, 4); /* typmod */
+ pq_sendint(&buf, 0, 2); /* format code */
+
+ /* third field: exported snapshot's name */
+ pq_sendstring(&buf, "snapshot_name"); /* col name */
+ pq_sendint(&buf, 0, 4); /* table oid */
+ pq_sendint(&buf, 0, 2); /* attnum */
+ pq_sendint(&buf, TEXTOID, 4); /* type oid */
+ pq_sendint(&buf, -1, 2); /* typlen */
+ pq_sendint(&buf, 0, 4); /* typmod */
+ pq_sendint(&buf, 0, 2); /* format code */
+
+ /* fourth field: output plugin */
+ pq_sendstring(&buf, "plugin"); /* col name */
+ pq_sendint(&buf, 0, 4); /* table oid */
+ pq_sendint(&buf, 0, 2); /* attnum */
+ pq_sendint(&buf, TEXTOID, 4); /* type oid */
+ pq_sendint(&buf, -1, 2); /* typlen */
+ pq_sendint(&buf, 0, 4); /* typmod */
+ pq_sendint(&buf, 0, 2); /* format code */
+
+ pq_endmessage(&buf);
+
+ /* Send a DataRow message */
+ pq_beginmessage(&buf, 'D');
+ pq_sendint(&buf, 4, 2); /* # of columns */
+
+ /* replication_id */
+ pq_sendint(&buf, strlen(slot_name), 4); /* col1 len */
+ pq_sendbytes(&buf, slot_name, strlen(slot_name));
+
+ /* consistent wal location */
+ pq_sendint(&buf, strlen(xpos), 4); /* col2 len */
+ pq_sendbytes(&buf, xpos, strlen(xpos));
+
+ /* snapshot name */
+ pq_sendint(&buf, strlen(snapshot_name), 4); /* col3 len */
+ pq_sendbytes(&buf, snapshot_name, strlen(snapshot_name));
+
+ /* plugin */
+ pq_sendint(&buf, strlen(cmd->plugin), 4); /* col4 len */
+ pq_sendbytes(&buf, cmd->plugin, strlen(cmd->plugin));
+
+ pq_endmessage(&buf);
+
+ FreeLogicalDecodingContext(ctx);
+
+ /*
+ * release active status again, START_LOGICAL_REPLICATION will reacquire it
+ */
+ LogicalDecodingReleaseSlot();
+}
+
+/*
+ * Load previously initiated logical slot and prepare for sending data (via
+ * WalSndLoop).
+ */
+static void
+StartLogicalReplication(StartLogicalReplicationCmd *cmd)
+{
+ StringInfoData buf;
+ XLogRecPtr confirmed_flush;
+
+ elog(WARNING, "Starting logical replication from %x/%x",
+ (uint32)(cmd->startpoint >> 32), (uint32)cmd->startpoint);
+
+ /* make sure that our requirements are still fulfilled */
+ CheckLogicalDecodingRequirements();
+
+ Assert(!MyLogicalDecodingSlot);
+
+ LogicalDecodingReAcquireSlot(cmd->name);
+
+ if (am_cascading_walsender && !RecoveryInProgress())
+ {
+ ereport(LOG,
+ (errmsg("terminating walsender process to force cascaded standby to update timeline and reconnect")));
+ walsender_ready_to_stop = true;
+ }
+
+ WalSndSetState(WALSNDSTATE_CATCHUP);
+
+ /* Send a CopyBothResponse message, and start streaming */
+ pq_beginmessage(&buf, 'W');
+ pq_sendbyte(&buf, 0);
+ pq_sendint(&buf, 0, 2);
+ pq_endmessage(&buf);
+ pq_flush();
+
+ /* setup state for XLogReadPage */
+ sendTimeLineIsHistoric = false;
+ sendTimeLine = ThisTimeLineID;
+
+ confirmed_flush = MyLogicalDecodingSlot->confirmed_flush;
+
+ Assert(confirmed_flush != InvalidXLogRecPtr);
+
+ /* continue from last position */
+ if (cmd->startpoint == InvalidXLogRecPtr)
+ cmd->startpoint = MyLogicalDecodingSlot->confirmed_flush;
+ else if (cmd->startpoint < MyLogicalDecodingSlot->confirmed_flush)
+ elog(ERROR, "cannot stream from %X/%X, minimum is %X/%X",
+ (uint32)(cmd->startpoint >> 32), (uint32)cmd->startpoint,
+ (uint32)(confirmed_flush >> 32), (uint32)confirmed_flush);
+
+ /*
+ * Initialize position to the last ack'ed one, then the xlog records begin
+ * to be shipped from that position.
+ */
+ logical_decoding_ctx = CreateLogicalDecodingContext(
+ MyLogicalDecodingSlot, false, cmd->startpoint, cmd->options,
+ replay_read_page, WalSndPrepareWrite, WalSndWriteData);
+
+ /*
+ * XXX: For feedback purposes it would be nicer to set sentPtr to
+ * cmd->startpoint, but we use it to know where to read xlog in the main
+ * loop...
+ */
+ sentPtr = MyLogicalDecodingSlot->restart_decoding;
+ logical_startptr = sentPtr;
+
+ /* Also update the start position status in shared memory */
+ {
+ /* use volatile pointer to prevent code rearrangement */
+ volatile WalSnd *walsnd = MyWalSnd;
+
+ SpinLockAcquire(&walsnd->mutex);
+ walsnd->sentPtr = MyLogicalDecodingSlot->restart_decoding;
+ SpinLockRelease(&walsnd->mutex);
+ }
+
+ elog(LOG, "starting to decode from %X/%X, replay %X/%X",
+ (uint32)(MyWalSnd->sentPtr >> 32), (uint32)MyWalSnd->sentPtr,
+ (uint32)(cmd->startpoint >> 32), (uint32)cmd->startpoint);
+
+ replication_active = true;
+
+ SyncRepInitConfig();
+
+ /* Main loop of walsender */
+ WalSndLoop(XLogSendLogical);
+
+ FreeLogicalDecodingContext(logical_decoding_ctx);
+ LogicalDecodingReleaseSlot();
+
+ replication_active = false;
+ if (walsender_ready_to_stop)
+ proc_exit(0);
+ WalSndSetState(WALSNDSTATE_STARTUP);
+
+ /* Get out of COPY mode (CommandComplete). */
+ EndCommand("COPY 0", DestRemote);
+}
+
+/*
+ * Free permanent state by a now inactive but defined logical slot.
+ */
+static void
+FreeLogicalReplication(FreeLogicalReplicationCmd *cmd)
+{
+ CheckLogicalDecodingRequirements();
+ LogicalDecodingFreeSlot(cmd->name);
+ EndCommand("FREE_LOGICAL_REPLICATION", DestRemote);
+}
+
+/*
+ * LogicalDecodingContext 'prepare_write' callback.
+ *
+ * Prepare a write into a StringInfo.
+ *
+ * Don't do anything lasting in here, it's quite possible that nothing will done
+ * with the data.
+ */
+static void
+WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid)
+{
+ AssertVariableIsOfType(&WalSndPrepareWrite, LogicalOutputPluginWriterPrepareWrite);
+
+ resetStringInfo(ctx->out);
+
+ pq_sendbyte(ctx->out, 'w');
+ pq_sendint64(ctx->out, lsn); /* dataStart */
+ /* XXX: overwrite when data is assembled */
+ pq_sendint64(ctx->out, lsn); /* walEnd */
+ /* XXX: gather that value later just as it's done in XLogSendPhysical */
+ pq_sendint64(ctx->out, 0 /*GetCurrentIntegerTimestamp() */);/* sendtime */
+}
+
+/*
+ * LogicalDecodingContext 'write' callback.
+ *
+ * Actually write out data previously prepared by WalSndPrepareWrite out to the
+ * network, take as long as needed but process replies from the other side
+ * during that.
+ */
+static void
+WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid)
+{
+ AssertVariableIsOfType(&WalSndWriteData, LogicalOutputPluginWriterWrite);
+
+ /* output previously gathered data in a CopyData packet */
+ pq_putmessage_noblock('d', ctx->out->data, ctx->out->len);
+
+ /* fast path */
+ /* Try to flush pending output to the client */
+ if (pq_flush_if_writable() != 0)
+ return;
+
+ if (!pq_is_send_pending())
+ return;
+
+ for (;;)
+ {
+ int wakeEvents;
+ long sleeptime = 10000; /* 10s */
+
+ /*
+ * Emergency bailout if postmaster has died. This is to avoid the
+ * necessity for manual cleanup of all postmaster children.
+ */
+ if (!PostmasterIsAlive())
+ exit(1);
+
+ /* Process any requests or signals received recently */
+ if (got_SIGHUP)
+ {
+ got_SIGHUP = false;
+ ProcessConfigFile(PGC_SIGHUP);
+ SyncRepInitConfig();
+ }
+
+ CHECK_FOR_INTERRUPTS();
+
+ /* Check for input from the client */
+ ProcessRepliesIfAny();
+
+ /* Clear any already-pending wakeups */
+ ResetLatch(&MyWalSnd->latch);
+
+ /* Try to flush pending output to the client */
+ if (pq_flush_if_writable() != 0)
+ break;
+
+ /* If we finished clearing the buffered data, we're done here. */
+ if (!pq_is_send_pending())
+ break;
+
+ /*
+ * Note we don't set a timeout here. It would be pointless, because
+ * if the socket is not writable there's not much we can do elsewhere
+ * anyway.
+ */
+ wakeEvents = WL_LATCH_SET | WL_POSTMASTER_DEATH |
+ WL_SOCKET_WRITEABLE | WL_SOCKET_READABLE | WL_TIMEOUT;
+
+ ImmediateInterruptOK = true;
+ CHECK_FOR_INTERRUPTS();
+ WaitLatchOrSocket(&MyWalSnd->latch, wakeEvents,
+ MyProcPort->sock, sleeptime);
+ ImmediateInterruptOK = false;
+ }
+
+ /* reactivate latch so WalSndLoop knows to continue */
+ SetLatch(&MyWalSnd->latch);
+}
+
+/*
+ * Wait till WAL < loc is flushed to disk so it can be safely read.
+ */
+XLogRecPtr
+WalSndWaitForWal(XLogRecPtr loc)
+{
+ int wakeEvents;
+ XLogRecPtr flushptr;
+
+ /* fast path if everything is there already */
+ /*
+ * XXX: introduce RecentFlushPtr to avoid acquiring the spinlock in the
+ * fast path case where we already know we have enough WAL available.
+ */
+ if (!RecoveryInProgress())
+ flushptr = GetFlushRecPtr();
+ else
+ flushptr = GetXLogReplayRecPtr(NULL);
+
+ if (loc <= flushptr)
+ return flushptr;
+
+ /*
+ * Waiting for new WAL, we've caught up.
+ */
+ WalSndCaughtUp = true;
+
+ for (;;)
+ {
+ long sleeptime = 10000; /* 10 s */
+
+ wakeEvents = WL_LATCH_SET | WL_POSTMASTER_DEATH |
+ WL_SOCKET_READABLE | WL_TIMEOUT;
+
+ /*
+ * Emergency bailout if postmaster has died. This is to avoid the
+ * necessity for manual cleanup of all postmaster children.
+ */
+ if (!PostmasterIsAlive())
+ exit(1);
+
+ /* Process any requests or signals received recently */
+ if (got_SIGHUP)
+ {
+ got_SIGHUP = false;
+ ProcessConfigFile(PGC_SIGHUP);
+ SyncRepInitConfig();
+ }
+
+ CHECK_FOR_INTERRUPTS();
+
+ /* Check for input from the client */
+ ProcessRepliesIfAny();
+
+ /* Clear any already-pending wakeups */
+ ResetLatch(&MyWalSnd->latch);
+
+ /* Update our idea of flushed position. */
+ if (!RecoveryInProgress())
+ flushptr = GetFlushRecPtr();
+ else
+ flushptr = GetXLogReplayRecPtr(NULL);
+
+ /* If postmaster asked us to stop, don't wait here anymore */
+ if (walsender_ready_to_stop)
+ break;
+
+ /* check whether we're done */
+ if (loc <= flushptr)
+ break;
+
+ /*
+ * We only send regular messages to the client for full
+ * decoded transactions. If the flush position differs
+ */
+ if (MyWalSnd->flush < sentPtr && !ping_sent)
+ WalSndKeepalive(true);
+
+ /* Try to flush pending output to the client */
+ if (pq_is_send_pending())
+ {
+ pq_flush_if_writable();
+ if (pq_is_send_pending())
+ wakeEvents |= WL_SOCKET_WRITEABLE;
+ }
+
+ /* Determine time until replication timeout */
+ if (wal_sender_timeout > 0)
+ {
+ if (!ping_sent)
+ {
+ TimestampTz timeout;
+
+ /*
+ * If half of wal_sender_timeout has lapsed without receiving
+ * any reply from standby, send a keep-alive message to standby
+ * requesting an immediate reply.
+ */
+ timeout = TimestampTzPlusMilliseconds(last_reply_timestamp,
+ wal_sender_timeout / 2);
+ if (GetCurrentTimestamp() >= timeout)
+ {
+ WalSndKeepalive(true);
+ ping_sent = true;
+ wakeEvents |= WL_SOCKET_WRITEABLE;
+ /* Try to flush pending output to the client */
+ if (pq_flush_if_writable() != 0)
+ break;
+ }
+ }
+
+ sleeptime = 1 + (wal_sender_timeout / 10);
+ }
+
+ ImmediateInterruptOK = true;
+ CHECK_FOR_INTERRUPTS();
+ WaitLatchOrSocket(&MyWalSnd->latch, wakeEvents,
+ MyProcPort->sock, sleeptime);
+ ImmediateInterruptOK = false;
+
+ /*
+ * The equivalent code in WalSndLoop checks here that replication
+ * timeout hasn't been exceeded. We don't do that here. XXX explain
+ * why.
+ */
+ }
+
+ /* reactivate latch so WalSndLoop knows to continue */
+ SetLatch(&MyWalSnd->latch);
+ return flushptr;
+}
+
/*
* Execute an incoming replication command.
*/
MemoryContext cmd_context;
MemoryContext old_context;
+ /*
+ * INIT_LOGICAL_REPLICATION exports a snapshot until the next command
+ * arrives. Clean up the old stuff if there's anything.
+ */
+ SnapBuildClearExportedSnapshot();
+
elog(DEBUG1, "received replication command: %s", cmd_string);
CHECK_FOR_INTERRUPTS();
StartReplication((StartReplicationCmd *) cmd_node);
break;
+ case T_InitLogicalReplicationCmd:
+ InitLogicalReplication((InitLogicalReplicationCmd *) cmd_node);
+ break;
+
+ case T_StartLogicalReplicationCmd:
+ StartLogicalReplication((StartLogicalReplicationCmd *) cmd_node);
+ break;
+
+ case T_FreeLogicalReplicationCmd:
+ FreeLogicalReplication((FreeLogicalReplicationCmd *) cmd_node);
+ break;
+
case T_BaseBackupCmd:
SendBaseBackup((BaseBackupCmd *) cmd_node);
break;
SpinLockRelease(&walsnd->mutex);
}
+ /*
+ * Advance our local xmin horizon when the client confirmed a flush.
+ */
+ if (MyLogicalDecodingSlot && flushPtr != InvalidXLogRecPtr)
+ LogicalConfirmReceivedLocation(flushPtr);
+
if (!am_cascading_walsender)
SyncRepReleaseWaiters();
}
/* Main loop of walsender process that streams the WAL over Copy messages. */
static void
-WalSndLoop(void)
+WalSndLoop(WalSndSendData send_data)
{
- bool caughtup = false;
-
/*
* Allocate buffers that will be used for each outgoing and incoming
* message. We do this just once to reduce palloc overhead.
/*
* If we don't have any pending data in the output buffer, try to send
- * some more. If there is some, we don't bother to call XLogSend
+ * some more. If there is some, we don't bother to call send_data
* again until we've flushed it ... but we'd better assume we are not
* caught up.
*/
if (!pq_is_send_pending())
- XLogSend(&caughtup);
+ send_data();
else
- caughtup = false;
+ WalSndCaughtUp = false;
/* Try to flush pending output to the client */
if (pq_flush_if_writable() != 0)
goto send_failure;
/* If nothing remains to be sent right now ... */
- if (caughtup && !pq_is_send_pending())
+ if (WalSndCaughtUp && !pq_is_send_pending())
{
/*
* If we're in catchup state, move to streaming. This is an
* the walsender is not sure which.
*/
if (walsender_ready_to_stop)
- {
- /* ... let's just be real sure we're caught up ... */
- XLogSend(&caughtup);
- if (caughtup && sentPtr == MyWalSnd->flush &&
- !pq_is_send_pending())
- {
- /* Inform the standby that XLOG streaming is done */
- EndCommand("COPY 0", DestRemote);
- pq_flush();
-
- proc_exit(0);
- }
- }
+ WalSndDone(send_data);
}
/*
* We don't block if not caught up, unless there is unsent data
* pending in which case we'd better block until the socket is
- * write-ready. This test is only needed for the case where XLogSend
+ * write-ready. This test is only needed for the case where send_data
* loaded a subset of the available data but then pq_flush_if_writable
* flushed it all --- we should immediately try to send more.
*/
- if ((caughtup && !streamingDoneSending) || pq_is_send_pending())
+ if ((WalSndCaughtUp && !streamingDoneSending) || pq_is_send_pending())
{
TimestampTz timeout = 0;
long sleeptime = 10000; /* 10 s */
}
/*
+ * Send out the WAL in its normal physical/stored form.
+ *
* Read up to MAX_SEND_SIZE bytes of WAL that's been flushed to disk,
* but not yet sent to the client, and buffer it in the libpq output
* buffer.
*
- * If there is no unsent WAL remaining, *caughtup is set to true, otherwise
- * *caughtup is set to false.
+ * If there is no unsent WAL remaining, WalSndCaughtUp is set to true,
+ * otherwise WalSndCaughtUp is set to false.
*/
static void
-XLogSend(bool *caughtup)
+XLogSendPhysical(void)
{
XLogRecPtr SendRqstPtr;
XLogRecPtr startptr;
if (streamingDoneSending)
{
- *caughtup = true;
+ WalSndCaughtUp = true;
return;
}
pq_putmessage_noblock('c', NULL, 0);
streamingDoneSending = true;
- *caughtup = true;
+ WalSndCaughtUp = true;
elog(DEBUG1, "walsender reached end of timeline at %X/%X (sent up to %X/%X)",
(uint32) (sendTimeLineValidUpto >> 32), (uint32) sendTimeLineValidUpto,
Assert(sentPtr <= SendRqstPtr);
if (SendRqstPtr <= sentPtr)
{
- *caughtup = true;
+ WalSndCaughtUp = true;
return;
}
{
endptr = SendRqstPtr;
if (sendTimeLineIsHistoric)
- *caughtup = false;
+ WalSndCaughtUp = false;
else
- *caughtup = true;
+ WalSndCaughtUp = true;
}
else
{
/* round down to page boundary. */
endptr -= (endptr % XLOG_BLCKSZ);
- *caughtup = false;
+ WalSndCaughtUp = false;
}
nbytes = endptr - startptr;
return;
}
+/*
+ * Send out the WAL after it being decoded into a logical format by the output
+ * plugin specified in INIT_LOGICAL_DECODING
+ */
+static void
+XLogSendLogical(void)
+{
+ XLogRecord *record;
+ char *errm;
+
+ if (decoding_ctx == NULL)
+ {
+ decoding_ctx = AllocSetContextCreate(TopMemoryContext,
+ "decoding context",
+ ALLOCSET_DEFAULT_MINSIZE,
+ ALLOCSET_DEFAULT_INITSIZE,
+ ALLOCSET_DEFAULT_MAXSIZE);
+ }
+
+ /*
+ * Don't know whether we've caught up yet. We'll set it to true in
+ * WalSndWaitForWal, if we're actually waiting. We also set to true if
+ * XLogReadRecord() had to stop reading but WalSndWaitForWal didn't wait -
+ * i.e. when we're shutting down.
+ */
+ WalSndCaughtUp = false;
+
+ record = XLogReadRecord(logical_decoding_ctx->reader, logical_startptr, &errm);
+ logical_startptr = InvalidXLogRecPtr;
+
+ /* xlog record was invalid */
+ if (errm != NULL)
+ elog(ERROR, "%s", errm);
+
+ if (record != NULL)
+ {
+ XLogRecordBuffer buf;
+
+ buf.origptr = logical_decoding_ctx->reader->ReadRecPtr;
+ buf.endptr = logical_decoding_ctx->reader->EndRecPtr;
+ buf.record = *record;
+ buf.record_data = XLogRecGetData(record);
+
+ old_decoding_ctx = MemoryContextSwitchTo(decoding_ctx);
+
+ DecodeRecordIntoReorderBuffer(logical_decoding_ctx, &buf);
+
+ MemoryContextSwitchTo(old_decoding_ctx);
+
+ sentPtr = logical_decoding_ctx->reader->EndRecPtr;
+ }
+ else
+ {
+ /*
+ * If the record we just wanted read is at or beyond the flushed point,
+ * then we're caught up.
+ */
+ if (logical_decoding_ctx->reader->EndRecPtr >= GetFlushRecPtr())
+ WalSndCaughtUp = true;
+ }
+
+ /* Update shared memory status */
+ {
+ /* use volatile pointer to prevent code rearrangement */
+ volatile WalSnd *walsnd = MyWalSnd;
+
+ SpinLockAcquire(&walsnd->mutex);
+ walsnd->sentPtr = sentPtr;
+ SpinLockRelease(&walsnd->mutex);
+ }
+}
+
+/*
+ * The sender is caught up, so we can go away for shutdown processing
+ * to finish normally. (This should only be called when the shutdown
+ * signal has been received from postmaster.)
+ *
+ * Note that if while doing this we determine that there's still more
+ * data to send, this function will return control to the caller.
+ */
+static void
+WalSndDone(WalSndSendData send_data)
+{
+ /* ... let's just be real sure we're caught up ... */
+ send_data();
+
+ if (WalSndCaughtUp && sentPtr == MyWalSnd->flush &&
+ !pq_is_send_pending())
+ {
+ /* Inform the standby that XLOG streaming is done */
+ EndCommand("COPY 0", DestRemote);
+ pq_flush();
+
+ proc_exit(0);
+ }
+ if (!ping_sent)
+ WalSndKeepalive(true);
+}
+
/*
* Returns the latest point in WAL that has been safely flushed to disk, and
* can be sent to the standby. This should only be called when in recovery,