wal_decoding: logical changeset extraction walsender interface
authorAndres Freund <andres@anarazel.de>
Wed, 4 Dec 2013 15:37:38 +0000 (16:37 +0100)
committerAndres Freund <andres@anarazel.de>
Sun, 8 Dec 2013 18:20:37 +0000 (19:20 +0100)
src/backend/replication/repl_gram.y
src/backend/replication/repl_scanner.l
src/backend/replication/walsender.c
src/include/nodes/nodes.h
src/include/nodes/replnodes.h
src/include/replication/walsender_private.h
src/tools/pgindent/typedefs.list

index 8c8378045e6ff2eb23af3e3f770186c184270a07..7e055bb0585460a9700882f1f7a989ed1b6da6b2 100644 (file)
@@ -65,7 +65,7 @@ Node *replication_parse_result;
 }
 
 /* Non-keyword tokens */
-%token <str> SCONST
+%token <str> SCONST IDENT
 %token <uintval> UCONST
 %token <recptr> RECPTR
 
@@ -73,6 +73,9 @@ Node *replication_parse_result;
 %token K_BASE_BACKUP
 %token K_IDENTIFY_SYSTEM
 %token K_START_REPLICATION
+%token K_INIT_LOGICAL_REPLICATION
+%token K_START_LOGICAL_REPLICATION
+%token K_FREE_LOGICAL_REPLICATION
 %token K_TIMELINE_HISTORY
 %token K_LABEL
 %token K_PROGRESS
@@ -82,10 +85,13 @@ Node *replication_parse_result;
 %token K_TIMELINE
 
 %type <node>   command
-%type <node>   base_backup start_replication identify_system timeline_history
+%type <node>   base_backup start_replication start_logical_replication init_logical_replication free_logical_replication identify_system timeline_history
 %type <list>   base_backup_opt_list
 %type <defelt> base_backup_opt
 %type <uintval>    opt_timeline
+%type <list>   plugin_options plugin_opt_list
+%type <defelt> plugin_opt_elem
+%type <node>   plugin_opt_arg
 %%
 
 firstcmd: command opt_semicolon
@@ -102,6 +108,9 @@ command:
            identify_system
            | base_backup
            | start_replication
+           | init_logical_replication
+           | start_logical_replication
+           | free_logical_replication
            | timeline_history
            ;
 
@@ -186,6 +195,67 @@ opt_timeline:
                | /* nothing */         { $$ = 0; }
            ;
 
+init_logical_replication:
+           K_INIT_LOGICAL_REPLICATION IDENT IDENT
+               {
+                   InitLogicalReplicationCmd *cmd;
+                   cmd = makeNode(InitLogicalReplicationCmd);
+                   cmd->name = $2;
+                   cmd->plugin = $3;
+                   $$ = (Node *) cmd;
+               }
+           ;
+
+start_logical_replication:
+           K_START_LOGICAL_REPLICATION IDENT RECPTR plugin_options
+               {
+                   StartLogicalReplicationCmd *cmd;
+                   cmd = makeNode(StartLogicalReplicationCmd);
+                   cmd->name = $2;
+                   cmd->startpoint = $3;
+                   cmd->options = $4;
+                   $$ = (Node *) cmd;
+               }
+           ;
+
+plugin_options:
+           '(' plugin_opt_list ')'         { $$ = $2; }
+           | /* EMPTY */                   { $$ = NIL; }
+       ;
+
+plugin_opt_list:
+           plugin_opt_elem
+               {
+                   $$ = list_make1($1);
+               }
+           | plugin_opt_list ',' plugin_opt_elem
+               {
+                   $$ = lappend($1, $3);
+               }
+       ;
+
+plugin_opt_elem:
+           IDENT plugin_opt_arg
+               {
+                   $$ = makeDefElem($1, $2);
+               }
+       ;
+
+plugin_opt_arg:
+           SCONST                          { $$ = (Node *) makeString($1); }
+           | /* EMPTY */                   { $$ = NULL; }
+       ;
+
+free_logical_replication:
+           K_FREE_LOGICAL_REPLICATION IDENT
+               {
+                   FreeLogicalReplicationCmd *cmd;
+                   cmd = makeNode(FreeLogicalReplicationCmd);
+                   cmd->name = $2;
+                   $$ = (Node *) cmd;
+               }
+           ;
+
 /*
  * TIMELINE_HISTORY %d
  */
index 3d930f1301216d4150da89eddb01eb55781e1799..2b0f2ff73b09565e9a1ce6b25abd05e3570ecbd0 100644 (file)
@@ -16,6 +16,7 @@
 #include "postgres.h"
 
 #include "utils/builtins.h"
+#include "parser/scansup.h"
 
 /* Avoid exit() on fatal scanner errors (a bit ugly -- see yy_fatal_error) */
 #undef fprintf
@@ -48,7 +49,7 @@ static void addlitchar(unsigned char ychar);
 %option warn
 %option prefix="replication_yy"
 
-%x xq
+%x xq xd
 
 /* Extended quote
  * xqdouble implements embedded quote, ''''
@@ -57,12 +58,26 @@ xqstart         {quote}
 xqdouble       {quote}{quote}
 xqinside       [^']+
 
+/* Double quote
+ * Allows embedded spaces and other special characters into identifiers.
+ */
+dquote         \"
+xdstart            {dquote}
+xdstop         {dquote}
+xddouble       {dquote}{dquote}
+xdinside       [^"]+
+
 digit          [0-9]+
 hexdigit       [0-9A-Za-z]+
 
 quote          '
 quotestop      {quote}
 
+ident_start        [A-Za-z\200-\377_]
+ident_cont     [A-Za-z\200-\377_0-9\$]
+
+identifier     {ident_start}{ident_cont}*
+
 %%
 
 BASE_BACKUP            { return K_BASE_BACKUP; }
@@ -74,9 +89,14 @@ PROGRESS         { return K_PROGRESS; }
 WAL            { return K_WAL; }
 TIMELINE           { return K_TIMELINE; }
 START_REPLICATION  { return K_START_REPLICATION; }
+INIT_LOGICAL_REPLICATION   { return K_INIT_LOGICAL_REPLICATION; }
+START_LOGICAL_REPLICATION  { return K_START_LOGICAL_REPLICATION; }
+FREE_LOGICAL_REPLICATION   { return K_FREE_LOGICAL_REPLICATION; }
 TIMELINE_HISTORY   { return K_TIMELINE_HISTORY; }
 ","                { return ','; }
 ";"                { return ';'; }
+"("                { return '('; }
+")"                { return ')'; }
 
 [\n]           ;
 [\t]           ;
@@ -100,20 +120,49 @@ TIMELINE_HISTORY  { return K_TIMELINE_HISTORY; }
                    BEGIN(xq);
                    startlit();
                }
+
 <xq>{quotestop}    {
                    yyless(1);
                    BEGIN(INITIAL);
                    yylval.str = litbufdup();
                    return SCONST;
                }
-<xq>{xqdouble} {
+
+<xq>{xqdouble} {
                    addlitchar('\'');
                }
+
 <xq>{xqinside}  {
                    addlit(yytext, yyleng);
                }
 
-<xq><<EOF>>        { yyerror("unterminated quoted string"); }
+{xdstart}      {
+                   BEGIN(xd);
+                   startlit();
+               }
+
+<xd>{xdstop}   {
+                   int len;
+                   yyless(1);
+                   BEGIN(INITIAL);
+                   yylval.str = litbufdup();
+                   len = strlen(yylval.str);
+                   truncate_identifier(yylval.str, len, true);
+                   return IDENT;
+               }
+
+<xd>{xdinside}  {
+                   addlit(yytext, yyleng);
+               }
+
+{identifier}   {
+                   int len = strlen(yytext);
+
+                   yylval.str = downcase_truncate_identifier(yytext, len, true);
+                   return IDENT;
+               }
+
+<xq,xd><<EOF>> { yyerror("unterminated quoted string"); }
 
 
 <<EOF>>            {
index e2a42c6e08a0fef1b539819dd865123e3c08c3d7..7055adb6eb0c62c73db606095919ee6e83fc19ca 100644 (file)
@@ -45,8 +45,8 @@
 
 #include "access/timeline.h"
 #include "access/transam.h"
-#include "access/xlog_internal.h"
 #include "access/xact.h"
+#include "access/xlog_internal.h"
 
 #include "catalog/pg_type.h"
 #include "commands/dbcommands.h"
 #include "miscadmin.h"
 #include "nodes/replnodes.h"
 #include "replication/basebackup.h"
+#include "replication/decode.h"
+#include "replication/logical.h"
+#include "replication/logicalfuncs.h"
+#include "replication/snapbuild.h"
 #include "replication/syncrep.h"
 #include "replication/walreceiver.h"
 #include "replication/walsender.h"
@@ -157,6 +161,9 @@ static bool ping_sent = false;
 static bool streamingDoneSending;
 static bool streamingDoneReceiving;
 
+/* Are we there yet? */
+static bool        WalSndCaughtUp = false;
+
 /* Flags set by signal handlers for later service in main loop */
 static volatile sig_atomic_t got_SIGHUP = false;
 static volatile sig_atomic_t walsender_ready_to_stop = false;
@@ -169,24 +176,42 @@ static volatile sig_atomic_t walsender_ready_to_stop = false;
  */
 static volatile sig_atomic_t replication_active = false;
 
+/* XXX reader */
+static MemoryContext decoding_ctx = NULL;
+static MemoryContext old_decoding_ctx = NULL;
+
+static LogicalDecodingContext *logical_decoding_ctx = NULL;
+static XLogRecPtr  logical_startptr = InvalidXLogRecPtr;
+
 /* Signal handlers */
 static void WalSndSigHupHandler(SIGNAL_ARGS);
 static void WalSndXLogSendHandler(SIGNAL_ARGS);
 static void WalSndLastCycleHandler(SIGNAL_ARGS);
 
 /* Prototypes for private functions */
-static void WalSndLoop(void);
+typedef void (*WalSndSendData)(void);
+static void WalSndLoop(WalSndSendData send_data);
 static void InitWalSenderSlot(void);
 static void WalSndKill(int code, Datum arg);
-static void XLogSend(bool *caughtup);
+static void XLogSendPhysical(void);
+static void XLogSendLogical(void);
+static void WalSndDone(WalSndSendData send_data);
 static XLogRecPtr GetStandbyFlushRecPtr(void);
 static void IdentifySystem(void);
 static void StartReplication(StartReplicationCmd *cmd);
+static void InitLogicalReplication(InitLogicalReplicationCmd *cmd);
+static void StartLogicalReplication(StartLogicalReplicationCmd *cmd);
+static void FreeLogicalReplication(FreeLogicalReplicationCmd *cmd);
 static void ProcessStandbyMessage(void);
 static void ProcessStandbyReplyMessage(void);
 static void ProcessStandbyHSFeedbackMessage(void);
 static void ProcessRepliesIfAny(void);
 static void WalSndKeepalive(bool requestReply);
+static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid);
+static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid);
+static void XLogRead(char *buf, XLogRecPtr startptr, Size count);
+
+
 
 
 /* Initialize walsender process before entering the main command loop */
@@ -594,7 +619,7 @@ StartReplication(StartReplicationCmd *cmd)
        /* Main loop of walsender */
        replication_active = true;
 
-       WalSndLoop();
+       WalSndLoop(XLogSendPhysical);
 
        replication_active = false;
        if (walsender_ready_to_stop)
@@ -661,6 +686,531 @@ StartReplication(StartReplicationCmd *cmd)
    pq_puttextmessage('C', "START_STREAMING");
 }
 
+static int
+replay_read_page(XLogReaderState* state, XLogRecPtr targetPagePtr, int reqLen,
+                XLogRecPtr targetRecPtr, char* cur_page, TimeLineID *pageTLI)
+{
+   XLogRecPtr flushptr;
+   int     count;
+
+   flushptr = WalSndWaitForWal(targetPagePtr + reqLen);
+
+   /* more than one block available */
+   if (targetPagePtr + XLOG_BLCKSZ <= flushptr)
+       count = XLOG_BLCKSZ;
+   /* not enough data there */
+   else if (targetPagePtr + reqLen > flushptr)
+       return -1;
+   /* part of the page available */
+   else
+       count = flushptr - targetPagePtr;
+
+   /* FIXME: more sensible/efficient implementation */
+   XLogRead(cur_page, targetPagePtr, XLOG_BLCKSZ);
+
+   return count;
+}
+
+/*
+ * Initialize logical replication and wait for an initial consistent point to
+ * start sending changes from.
+ */
+static void
+InitLogicalReplication(InitLogicalReplicationCmd *cmd)
+{
+   const char *slot_name;
+   StringInfoData buf;
+   char        xpos[MAXFNAMELEN];
+   const char *snapshot_name = NULL;
+   LogicalDecodingContext *ctx;
+   XLogRecPtr startptr;
+
+   CheckLogicalDecodingRequirements();
+
+   Assert(!MyLogicalDecodingSlot);
+
+   /* XXX apply sanity checking to slot name? */
+   LogicalDecodingAcquireFreeSlot(cmd->name, cmd->plugin);
+
+   Assert(MyLogicalDecodingSlot);
+
+   decoding_ctx = AllocSetContextCreate(TopMemoryContext,
+                                        "decoding context",
+                                        ALLOCSET_DEFAULT_MINSIZE,
+                                        ALLOCSET_DEFAULT_INITSIZE,
+                                        ALLOCSET_DEFAULT_MAXSIZE);
+   old_decoding_ctx = MemoryContextSwitchTo(decoding_ctx);
+
+   /* setup state for XLogReadPage */
+   sendTimeLineIsHistoric = false;
+   sendTimeLine = ThisTimeLineID;
+
+   initStringInfo(&output_message);
+   ctx = CreateLogicalDecodingContext(MyLogicalDecodingSlot, true, InvalidXLogRecPtr,
+                                      NIL, replay_read_page,
+                                      WalSndPrepareWrite, WalSndWriteData);
+
+   MemoryContextSwitchTo(old_decoding_ctx);
+
+   startptr = MyLogicalDecodingSlot->restart_decoding;
+
+   elog(WARNING, "Initiating logical rep from %X/%X",
+        (uint32)(startptr >> 32), (uint32)startptr);
+
+   for (;;)
+   {
+       XLogRecord *record;
+       XLogRecordBuffer buf;
+       char *err = NULL;
+
+       /* the read_page callback waits for new WAL */
+       record = XLogReadRecord(ctx->reader, startptr, &err);
+       /* xlog record was invalid */
+       if (err)
+           elog(ERROR, "%s", err);
+
+       /* read up from last position next time round */
+       startptr = InvalidXLogRecPtr;
+
+       Assert(record);
+
+       buf.origptr = ctx->reader->ReadRecPtr;
+       buf.endptr = ctx->reader->EndRecPtr;
+       buf.record = *record;
+       buf.record_data = XLogRecGetData(record);
+       DecodeRecordIntoReorderBuffer(ctx, &buf);
+
+       /* only continue till we found a consistent spot */
+       if (LogicalDecodingContextReady(ctx))
+       {
+           /* export plain, importable, snapshot to the user */
+           snapshot_name = SnapBuildExportSnapshot(ctx->snapshot_builder);
+           break;
+       }
+   }
+
+   MyLogicalDecodingSlot->confirmed_flush = ctx->reader->EndRecPtr;
+   slot_name = NameStr(MyLogicalDecodingSlot->name);
+   snprintf(xpos, sizeof(xpos), "%X/%X",
+            (uint32) (MyLogicalDecodingSlot->confirmed_flush >> 32),
+            (uint32) MyLogicalDecodingSlot->confirmed_flush);
+
+   pq_beginmessage(&buf, 'T');
+   pq_sendint(&buf, 4, 2);     /* 4 fields */
+
+   /* first field: slot name */
+   pq_sendstring(&buf, "replication_id");  /* col name */
+   pq_sendint(&buf, 0, 4);     /* table oid */
+   pq_sendint(&buf, 0, 2);     /* attnum */
+   pq_sendint(&buf, TEXTOID, 4);       /* type oid */
+   pq_sendint(&buf, -1, 2);    /* typlen */
+   pq_sendint(&buf, 0, 4);     /* typmod */
+   pq_sendint(&buf, 0, 2);     /* format code */
+
+   /* second field: LSN at which we became consistent  */
+   pq_sendstring(&buf, "consistent_point");    /* col name */
+   pq_sendint(&buf, 0, 4);     /* table oid */
+   pq_sendint(&buf, 0, 2);     /* attnum */
+   pq_sendint(&buf, TEXTOID, 4);       /* type oid */
+   pq_sendint(&buf, -1, 2);    /* typlen */
+   pq_sendint(&buf, 0, 4);     /* typmod */
+   pq_sendint(&buf, 0, 2);     /* format code */
+
+   /* third field: exported snapshot's name */
+   pq_sendstring(&buf, "snapshot_name");   /* col name */
+   pq_sendint(&buf, 0, 4);     /* table oid */
+   pq_sendint(&buf, 0, 2);     /* attnum */
+   pq_sendint(&buf, TEXTOID, 4);       /* type oid */
+   pq_sendint(&buf, -1, 2);    /* typlen */
+   pq_sendint(&buf, 0, 4);     /* typmod */
+   pq_sendint(&buf, 0, 2);     /* format code */
+
+   /* fourth field: output plugin */
+   pq_sendstring(&buf, "plugin");  /* col name */
+   pq_sendint(&buf, 0, 4);     /* table oid */
+   pq_sendint(&buf, 0, 2);     /* attnum */
+   pq_sendint(&buf, TEXTOID, 4);       /* type oid */
+   pq_sendint(&buf, -1, 2);    /* typlen */
+   pq_sendint(&buf, 0, 4);     /* typmod */
+   pq_sendint(&buf, 0, 2);     /* format code */
+
+   pq_endmessage(&buf);
+
+   /* Send a DataRow message */
+   pq_beginmessage(&buf, 'D');
+   pq_sendint(&buf, 4, 2);     /* # of columns */
+
+   /* replication_id */
+   pq_sendint(&buf, strlen(slot_name), 4); /* col1 len */
+   pq_sendbytes(&buf, slot_name, strlen(slot_name));
+
+   /* consistent wal location */
+   pq_sendint(&buf, strlen(xpos), 4); /* col2 len */
+   pq_sendbytes(&buf, xpos, strlen(xpos));
+
+   /* snapshot name */
+   pq_sendint(&buf, strlen(snapshot_name), 4); /* col3 len */
+   pq_sendbytes(&buf, snapshot_name, strlen(snapshot_name));
+
+   /* plugin */
+   pq_sendint(&buf, strlen(cmd->plugin), 4); /* col4 len */
+   pq_sendbytes(&buf, cmd->plugin, strlen(cmd->plugin));
+
+   pq_endmessage(&buf);
+
+   FreeLogicalDecodingContext(ctx);
+
+   /*
+    * release active status again, START_LOGICAL_REPLICATION will reacquire it
+    */
+   LogicalDecodingReleaseSlot();
+}
+
+/*
+ * Load previously initiated logical slot and prepare for sending data (via
+ * WalSndLoop).
+ */
+static void
+StartLogicalReplication(StartLogicalReplicationCmd *cmd)
+{
+   StringInfoData buf;
+   XLogRecPtr confirmed_flush;
+
+   elog(WARNING, "Starting logical replication from %x/%x",
+        (uint32)(cmd->startpoint >> 32), (uint32)cmd->startpoint);
+
+   /* make sure that our requirements are still fulfilled */
+   CheckLogicalDecodingRequirements();
+
+   Assert(!MyLogicalDecodingSlot);
+
+   LogicalDecodingReAcquireSlot(cmd->name);
+
+   if (am_cascading_walsender && !RecoveryInProgress())
+   {
+       ereport(LOG,
+               (errmsg("terminating walsender process to force cascaded standby to update timeline and reconnect")));
+       walsender_ready_to_stop = true;
+   }
+
+   WalSndSetState(WALSNDSTATE_CATCHUP);
+
+   /* Send a CopyBothResponse message, and start streaming */
+   pq_beginmessage(&buf, 'W');
+   pq_sendbyte(&buf, 0);
+   pq_sendint(&buf, 0, 2);
+   pq_endmessage(&buf);
+   pq_flush();
+
+   /* setup state for XLogReadPage */
+   sendTimeLineIsHistoric = false;
+   sendTimeLine = ThisTimeLineID;
+
+   confirmed_flush = MyLogicalDecodingSlot->confirmed_flush;
+
+   Assert(confirmed_flush != InvalidXLogRecPtr);
+
+   /* continue from last position */
+   if (cmd->startpoint == InvalidXLogRecPtr)
+       cmd->startpoint = MyLogicalDecodingSlot->confirmed_flush;
+   else if (cmd->startpoint < MyLogicalDecodingSlot->confirmed_flush)
+       elog(ERROR, "cannot stream from %X/%X, minimum is %X/%X",
+            (uint32)(cmd->startpoint >> 32), (uint32)cmd->startpoint,
+            (uint32)(confirmed_flush >> 32), (uint32)confirmed_flush);
+
+   /*
+    * Initialize position to the last ack'ed one, then the xlog records begin
+    * to be shipped from that position.
+    */
+   logical_decoding_ctx = CreateLogicalDecodingContext(
+       MyLogicalDecodingSlot, false, cmd->startpoint, cmd->options,
+       replay_read_page, WalSndPrepareWrite, WalSndWriteData);
+
+   /*
+    * XXX: For feedback purposes it would be nicer to set sentPtr to
+    * cmd->startpoint, but we use it to know where to read xlog in the main
+    * loop...
+    */
+   sentPtr = MyLogicalDecodingSlot->restart_decoding;
+   logical_startptr = sentPtr;
+
+   /* Also update the start position status in shared memory */
+   {
+       /* use volatile pointer to prevent code rearrangement */
+       volatile WalSnd *walsnd = MyWalSnd;
+
+       SpinLockAcquire(&walsnd->mutex);
+       walsnd->sentPtr = MyLogicalDecodingSlot->restart_decoding;
+       SpinLockRelease(&walsnd->mutex);
+   }
+
+   elog(LOG, "starting to decode from %X/%X, replay %X/%X",
+        (uint32)(MyWalSnd->sentPtr >> 32), (uint32)MyWalSnd->sentPtr,
+        (uint32)(cmd->startpoint >> 32), (uint32)cmd->startpoint);
+
+   replication_active = true;
+
+   SyncRepInitConfig();
+
+   /* Main loop of walsender */
+   WalSndLoop(XLogSendLogical);
+
+   FreeLogicalDecodingContext(logical_decoding_ctx);
+   LogicalDecodingReleaseSlot();
+
+   replication_active = false;
+   if (walsender_ready_to_stop)
+       proc_exit(0);
+   WalSndSetState(WALSNDSTATE_STARTUP);
+
+   /* Get out of COPY mode (CommandComplete). */
+   EndCommand("COPY 0", DestRemote);
+}
+
+/*
+ * Free permanent state by a now inactive but defined logical slot.
+ */
+static void
+FreeLogicalReplication(FreeLogicalReplicationCmd *cmd)
+{
+   CheckLogicalDecodingRequirements();
+   LogicalDecodingFreeSlot(cmd->name);
+   EndCommand("FREE_LOGICAL_REPLICATION", DestRemote);
+}
+
+/*
+ * LogicalDecodingContext 'prepare_write' callback.
+ *
+ * Prepare a write into a StringInfo.
+ *
+ * Don't do anything lasting in here, it's quite possible that nothing will done
+ * with the data.
+ */
+static void
+WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid)
+{
+   AssertVariableIsOfType(&WalSndPrepareWrite, LogicalOutputPluginWriterPrepareWrite);
+
+   resetStringInfo(ctx->out);
+
+   pq_sendbyte(ctx->out, 'w');
+   pq_sendint64(ctx->out, lsn);    /* dataStart */
+   /* XXX: overwrite when data is assembled */
+   pq_sendint64(ctx->out, lsn);    /* walEnd */
+   /* XXX: gather that value later just as it's done in XLogSendPhysical */
+   pq_sendint64(ctx->out, 0 /*GetCurrentIntegerTimestamp() */);/* sendtime */
+}
+
+/*
+ * LogicalDecodingContext 'write' callback.
+ *
+ * Actually write out data previously prepared by WalSndPrepareWrite out to the
+ * network, take as long as needed but process replies from the other side
+ * during that.
+ */
+static void
+WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid)
+{
+   AssertVariableIsOfType(&WalSndWriteData, LogicalOutputPluginWriterWrite);
+
+   /* output previously gathered data in a CopyData packet */
+   pq_putmessage_noblock('d', ctx->out->data, ctx->out->len);
+
+   /* fast path */
+   /* Try to flush pending output to the client */
+   if (pq_flush_if_writable() != 0)
+       return;
+
+   if (!pq_is_send_pending())
+       return;
+
+   for (;;)
+   {
+       int         wakeEvents;
+       long        sleeptime = 10000;      /* 10s */
+
+       /*
+        * Emergency bailout if postmaster has died.  This is to avoid the
+        * necessity for manual cleanup of all postmaster children.
+        */
+       if (!PostmasterIsAlive())
+           exit(1);
+
+       /* Process any requests or signals received recently */
+       if (got_SIGHUP)
+       {
+           got_SIGHUP = false;
+           ProcessConfigFile(PGC_SIGHUP);
+           SyncRepInitConfig();
+       }
+
+       CHECK_FOR_INTERRUPTS();
+
+       /* Check for input from the client */
+       ProcessRepliesIfAny();
+
+       /* Clear any already-pending wakeups */
+       ResetLatch(&MyWalSnd->latch);
+
+       /* Try to flush pending output to the client */
+       if (pq_flush_if_writable() != 0)
+           break;
+
+       /* If we finished clearing the buffered data, we're done here. */
+       if (!pq_is_send_pending())
+           break;
+
+       /*
+        * Note we don't set a timeout here.  It would be pointless, because
+        * if the socket is not writable there's not much we can do elsewhere
+        * anyway.
+        */
+       wakeEvents = WL_LATCH_SET | WL_POSTMASTER_DEATH |
+           WL_SOCKET_WRITEABLE | WL_SOCKET_READABLE | WL_TIMEOUT;
+
+       ImmediateInterruptOK = true;
+       CHECK_FOR_INTERRUPTS();
+       WaitLatchOrSocket(&MyWalSnd->latch, wakeEvents,
+                         MyProcPort->sock, sleeptime);
+       ImmediateInterruptOK = false;
+   }
+
+   /* reactivate latch so WalSndLoop knows to continue */
+   SetLatch(&MyWalSnd->latch);
+}
+
+/*
+ * Wait till WAL < loc is flushed to disk so it can be safely read.
+ */
+XLogRecPtr
+WalSndWaitForWal(XLogRecPtr loc)
+{
+   int         wakeEvents;
+   XLogRecPtr  flushptr;
+
+   /* fast path if everything is there already */
+   /*
+    * XXX: introduce RecentFlushPtr to avoid acquiring the spinlock in the
+    * fast path case where we already know we have enough WAL available.
+    */
+   if (!RecoveryInProgress())
+       flushptr = GetFlushRecPtr();
+   else
+       flushptr = GetXLogReplayRecPtr(NULL);
+
+   if (loc <= flushptr)
+       return flushptr;
+
+   /*
+    * Waiting for new WAL, we've caught up.
+    */
+   WalSndCaughtUp = true;
+
+   for (;;)
+   {
+       long        sleeptime = 10000;      /* 10 s */
+
+       wakeEvents = WL_LATCH_SET | WL_POSTMASTER_DEATH |
+           WL_SOCKET_READABLE | WL_TIMEOUT;
+
+       /*
+        * Emergency bailout if postmaster has died.  This is to avoid the
+        * necessity for manual cleanup of all postmaster children.
+        */
+       if (!PostmasterIsAlive())
+           exit(1);
+
+       /* Process any requests or signals received recently */
+       if (got_SIGHUP)
+       {
+           got_SIGHUP = false;
+           ProcessConfigFile(PGC_SIGHUP);
+           SyncRepInitConfig();
+       }
+
+       CHECK_FOR_INTERRUPTS();
+
+       /* Check for input from the client */
+       ProcessRepliesIfAny();
+
+       /* Clear any already-pending wakeups */
+       ResetLatch(&MyWalSnd->latch);
+
+       /* Update our idea of flushed position. */
+       if (!RecoveryInProgress())
+           flushptr = GetFlushRecPtr();
+       else
+           flushptr = GetXLogReplayRecPtr(NULL);
+
+       /* If postmaster asked us to stop, don't wait here anymore */
+       if (walsender_ready_to_stop)
+           break;
+
+       /* check whether we're done */
+       if (loc <= flushptr)
+           break;
+
+       /*
+        * We only send regular messages to the client for full
+        * decoded transactions. If the flush position differs
+        */
+       if (MyWalSnd->flush < sentPtr && !ping_sent)
+           WalSndKeepalive(true);
+
+       /* Try to flush pending output to the client */
+       if (pq_is_send_pending())
+       {
+           pq_flush_if_writable();
+           if (pq_is_send_pending())
+               wakeEvents |= WL_SOCKET_WRITEABLE;
+       }
+
+       /* Determine time until replication timeout */
+       if (wal_sender_timeout > 0)
+       {
+           if (!ping_sent)
+           {
+               TimestampTz timeout;
+
+               /*
+                * If half of wal_sender_timeout has lapsed without receiving
+                * any reply from standby, send a keep-alive message to standby
+                * requesting an immediate reply.
+                */
+               timeout = TimestampTzPlusMilliseconds(last_reply_timestamp,
+                                                     wal_sender_timeout / 2);
+               if (GetCurrentTimestamp() >= timeout)
+               {
+                   WalSndKeepalive(true);
+                   ping_sent = true;
+                   wakeEvents |= WL_SOCKET_WRITEABLE;
+                   /* Try to flush pending output to the client */
+                   if (pq_flush_if_writable() != 0)
+                       break;
+               }
+           }
+
+           sleeptime = 1 + (wal_sender_timeout / 10);
+       }
+
+       ImmediateInterruptOK = true;
+       CHECK_FOR_INTERRUPTS();
+       WaitLatchOrSocket(&MyWalSnd->latch, wakeEvents,
+                         MyProcPort->sock, sleeptime);
+       ImmediateInterruptOK = false;
+
+       /*
+        * The equivalent code in WalSndLoop checks here that replication
+        * timeout hasn't been exceeded.  We don't do that here.   XXX explain
+        * why.
+        */
+   }
+
+   /* reactivate latch so WalSndLoop knows to continue */
+   SetLatch(&MyWalSnd->latch);
+   return flushptr;
+}
+
 /*
  * Execute an incoming replication command.
  */
@@ -672,6 +1222,12 @@ exec_replication_command(const char *cmd_string)
    MemoryContext cmd_context;
    MemoryContext old_context;
 
+   /*
+    * INIT_LOGICAL_REPLICATION exports a snapshot until the next command
+    * arrives. Clean up the old stuff if there's anything.
+    */
+   SnapBuildClearExportedSnapshot();
+
    elog(DEBUG1, "received replication command: %s", cmd_string);
 
    CHECK_FOR_INTERRUPTS();
@@ -703,6 +1259,18 @@ exec_replication_command(const char *cmd_string)
            StartReplication((StartReplicationCmd *) cmd_node);
            break;
 
+       case T_InitLogicalReplicationCmd:
+           InitLogicalReplication((InitLogicalReplicationCmd *) cmd_node);
+           break;
+
+       case T_StartLogicalReplicationCmd:
+           StartLogicalReplication((StartLogicalReplicationCmd *) cmd_node);
+           break;
+
+       case T_FreeLogicalReplicationCmd:
+           FreeLogicalReplication((FreeLogicalReplicationCmd *) cmd_node);
+           break;
+
        case T_BaseBackupCmd:
            SendBaseBackup((BaseBackupCmd *) cmd_node);
            break;
@@ -912,6 +1480,12 @@ ProcessStandbyReplyMessage(void)
        SpinLockRelease(&walsnd->mutex);
    }
 
+   /*
+    * Advance our local xmin horizon when the client confirmed a flush.
+    */
+   if (MyLogicalDecodingSlot && flushPtr != InvalidXLogRecPtr)
+       LogicalConfirmReceivedLocation(flushPtr);
+
    if (!am_cascading_walsender)
        SyncRepReleaseWaiters();
 }
@@ -996,10 +1570,8 @@ ProcessStandbyHSFeedbackMessage(void)
 
 /* Main loop of walsender process that streams the WAL over Copy messages. */
 static void
-WalSndLoop(void)
+WalSndLoop(WalSndSendData send_data)
 {
-   bool        caughtup = false;
-
    /*
     * Allocate buffers that will be used for each outgoing and incoming
     * message.  We do this just once to reduce palloc overhead.
@@ -1051,21 +1623,21 @@ WalSndLoop(void)
 
        /*
         * If we don't have any pending data in the output buffer, try to send
-        * some more.  If there is some, we don't bother to call XLogSend
+        * some more.  If there is some, we don't bother to call send_data
         * again until we've flushed it ... but we'd better assume we are not
         * caught up.
         */
        if (!pq_is_send_pending())
-           XLogSend(&caughtup);
+           send_data();
        else
-           caughtup = false;
+           WalSndCaughtUp = false;
 
        /* Try to flush pending output to the client */
        if (pq_flush_if_writable() != 0)
            goto send_failure;
 
        /* If nothing remains to be sent right now ... */
-       if (caughtup && !pq_is_send_pending())
+       if (WalSndCaughtUp && !pq_is_send_pending())
        {
            /*
             * If we're in catchup state, move to streaming.  This is an
@@ -1091,29 +1663,17 @@ WalSndLoop(void)
             * the walsender is not sure which.
             */
            if (walsender_ready_to_stop)
-           {
-               /* ... let's just be real sure we're caught up ... */
-               XLogSend(&caughtup);
-               if (caughtup && sentPtr == MyWalSnd->flush &&
-                   !pq_is_send_pending())
-               {
-                   /* Inform the standby that XLOG streaming is done */
-                   EndCommand("COPY 0", DestRemote);
-                   pq_flush();
-
-                   proc_exit(0);
-               }
-           }
+               WalSndDone(send_data);
        }
 
        /*
         * We don't block if not caught up, unless there is unsent data
         * pending in which case we'd better block until the socket is
-        * write-ready.  This test is only needed for the case where XLogSend
+        * write-ready.  This test is only needed for the case where send_data
         * loaded a subset of the available data but then pq_flush_if_writable
         * flushed it all --- we should immediately try to send more.
         */
-       if ((caughtup && !streamingDoneSending) || pq_is_send_pending())
+       if ((WalSndCaughtUp && !streamingDoneSending) || pq_is_send_pending())
        {
            TimestampTz timeout = 0;
            long        sleeptime = 10000;      /* 10 s */
@@ -1442,15 +2002,17 @@ retry:
 }
 
 /*
+ * Send out the WAL in its normal physical/stored form.
+ *
  * Read up to MAX_SEND_SIZE bytes of WAL that's been flushed to disk,
  * but not yet sent to the client, and buffer it in the libpq output
  * buffer.
  *
- * If there is no unsent WAL remaining, *caughtup is set to true, otherwise
- * *caughtup is set to false.
+ * If there is no unsent WAL remaining, WalSndCaughtUp is set to true,
+ * otherwise WalSndCaughtUp is set to false.
  */
 static void
-XLogSend(bool *caughtup)
+XLogSendPhysical(void)
 {
    XLogRecPtr  SendRqstPtr;
    XLogRecPtr  startptr;
@@ -1459,7 +2021,7 @@ XLogSend(bool *caughtup)
 
    if (streamingDoneSending)
    {
-       *caughtup = true;
+       WalSndCaughtUp = true;
        return;
    }
 
@@ -1576,7 +2138,7 @@ XLogSend(bool *caughtup)
        pq_putmessage_noblock('c', NULL, 0);
        streamingDoneSending = true;
 
-       *caughtup = true;
+       WalSndCaughtUp = true;
 
        elog(DEBUG1, "walsender reached end of timeline at %X/%X (sent up to %X/%X)",
             (uint32) (sendTimeLineValidUpto >> 32), (uint32) sendTimeLineValidUpto,
@@ -1588,7 +2150,7 @@ XLogSend(bool *caughtup)
    Assert(sentPtr <= SendRqstPtr);
    if (SendRqstPtr <= sentPtr)
    {
-       *caughtup = true;
+       WalSndCaughtUp = true;
        return;
    }
 
@@ -1612,15 +2174,15 @@ XLogSend(bool *caughtup)
    {
        endptr = SendRqstPtr;
        if (sendTimeLineIsHistoric)
-           *caughtup = false;
+           WalSndCaughtUp = false;
        else
-           *caughtup = true;
+           WalSndCaughtUp = true;
    }
    else
    {
        /* round down to page boundary. */
        endptr -= (endptr % XLOG_BLCKSZ);
-       *caughtup = false;
+       WalSndCaughtUp = false;
    }
 
    nbytes = endptr - startptr;
@@ -1680,6 +2242,105 @@ XLogSend(bool *caughtup)
    return;
 }
 
+/*
+ * Send out the WAL after it being decoded into a logical format by the output
+ * plugin specified in INIT_LOGICAL_DECODING
+ */
+static void
+XLogSendLogical(void)
+{
+   XLogRecord *record;
+   char       *errm;
+
+   if (decoding_ctx == NULL)
+   {
+       decoding_ctx = AllocSetContextCreate(TopMemoryContext,
+                                            "decoding context",
+                                            ALLOCSET_DEFAULT_MINSIZE,
+                                            ALLOCSET_DEFAULT_INITSIZE,
+                                            ALLOCSET_DEFAULT_MAXSIZE);
+   }
+
+   /*
+    * Don't know whether we've caught up yet. We'll set it to true in
+    * WalSndWaitForWal, if we're actually waiting. We also set to true if
+    * XLogReadRecord() had to stop reading but WalSndWaitForWal didn't wait -
+    * i.e. when we're shutting down.
+    */
+   WalSndCaughtUp = false;
+
+   record = XLogReadRecord(logical_decoding_ctx->reader, logical_startptr, &errm);
+   logical_startptr = InvalidXLogRecPtr;
+
+   /* xlog record was invalid */
+   if (errm != NULL)
+       elog(ERROR, "%s", errm);
+
+   if (record != NULL)
+   {
+       XLogRecordBuffer buf;
+
+       buf.origptr = logical_decoding_ctx->reader->ReadRecPtr;
+       buf.endptr = logical_decoding_ctx->reader->EndRecPtr;
+       buf.record = *record;
+       buf.record_data = XLogRecGetData(record);
+
+       old_decoding_ctx = MemoryContextSwitchTo(decoding_ctx);
+
+       DecodeRecordIntoReorderBuffer(logical_decoding_ctx, &buf);
+
+       MemoryContextSwitchTo(old_decoding_ctx);
+
+       sentPtr = logical_decoding_ctx->reader->EndRecPtr;
+   }
+   else
+   {
+       /*
+        * If the record we just wanted read is at or beyond the flushed point,
+        * then we're caught up.
+        */
+       if (logical_decoding_ctx->reader->EndRecPtr >= GetFlushRecPtr())
+           WalSndCaughtUp = true;
+   }
+
+   /* Update shared memory status */
+   {
+       /* use volatile pointer to prevent code rearrangement */
+       volatile WalSnd *walsnd = MyWalSnd;
+
+       SpinLockAcquire(&walsnd->mutex);
+       walsnd->sentPtr = sentPtr;
+       SpinLockRelease(&walsnd->mutex);
+   }
+}
+
+/*
+ * The sender is caught up, so we can go away for shutdown processing
+ * to finish normally.  (This should only be called when the shutdown
+ * signal has been received from postmaster.)
+ *
+ * Note that if while doing this we determine that there's still more
+ * data to send, this function will return control to the caller.
+ */
+static void
+WalSndDone(WalSndSendData send_data)
+{
+   /* ... let's just be real sure we're caught up ... */
+   send_data();
+
+   if (WalSndCaughtUp && sentPtr == MyWalSnd->flush &&
+       !pq_is_send_pending())
+   {
+       /* Inform the standby that XLOG streaming is done */
+       EndCommand("COPY 0", DestRemote);
+       pq_flush();
+
+       proc_exit(0);
+   }
+   if (!ping_sent)
+       WalSndKeepalive(true);
+}
+
 /*
  * Returns the latest point in WAL that has been safely flushed to disk, and
  * can be sent to the standby. This should only be called when in recovery,
index ff9af7691c91af86275a7a651a4201c5fed377ec..4238797d2182ac0d96de2cceef4cbba0ec825794 100644 (file)
@@ -411,6 +411,9 @@ typedef enum NodeTag
    T_IdentifySystemCmd,
    T_BaseBackupCmd,
    T_StartReplicationCmd,
+   T_InitLogicalReplicationCmd,
+   T_StartLogicalReplicationCmd,
+   T_FreeLogicalReplicationCmd,
    T_TimeLineHistoryCmd,
 
    /*
index 85b45448d396bd6bfc1fd9bf7064972c1f5fca8e..3da8d401df4d0a216b0779152bec41f64a2ccffc 100644 (file)
@@ -51,6 +51,41 @@ typedef struct StartReplicationCmd
 } StartReplicationCmd;
 
 
+/* ----------------------
+ *     INIT_LOGICAL_REPLICATION command
+ * ----------------------
+ */
+typedef struct InitLogicalReplicationCmd
+{
+   NodeTag     type;
+   char       *name;
+   char       *plugin;
+} InitLogicalReplicationCmd;
+
+
+/* ----------------------
+ *     START_LOGICAL_REPLICATION command
+ * ----------------------
+ */
+typedef struct StartLogicalReplicationCmd
+{
+   NodeTag     type;
+   char       *name;
+   XLogRecPtr  startpoint;
+   List       *options;
+} StartLogicalReplicationCmd;
+
+/* ----------------------
+ *     FREE_LOGICAL_REPLICATION command
+ * ----------------------
+ */
+typedef struct FreeLogicalReplicationCmd
+{
+   NodeTag     type;
+   char       *name;
+} FreeLogicalReplicationCmd;
+
+
 /* ----------------------
  *     TIMELINE_HISTORY command
  * ----------------------
index 7eaa21b9f7e6eb8ce02a96d1d35b0e916356dc9f..28d98d500742da07fe1435c0cd14d35d99283098 100644 (file)
@@ -108,4 +108,7 @@ extern void replication_scanner_finish(void);
 
 extern Node *replication_parse_result;
 
+/* logical wal sender data gathering functions */
+extern XLogRecPtr WalSndWaitForWal(XLogRecPtr loc);
+
 #endif   /* _WALSENDER_PRIVATE_H */
index 65edbc2f587e9eb88fb906cc027c548bbeda732e..de8bca96c14f61162f1731056ddb16a898b5f387 100644 (file)
@@ -621,6 +621,7 @@ Form_pg_ts_template
 Form_pg_type
 Form_pg_user_mapping
 FormatNode
+FreeLogicalReplicationCmd
 FromCharDateMode
 FromExpr
 FuncCall
@@ -819,6 +820,7 @@ IndxInfo
 InfoItem
 InhInfo
 InhOption
+InitLogicalReplicationCmd
 InheritableSocket
 InlineCodeBlock
 InsertStmt
@@ -1683,6 +1685,7 @@ StandardChunkHeader
 StartBlobPtr
 StartBlobsPtr
 StartDataPtr
+StartLogicalReplicationCmd
 StartReplicationCmd
 StartupPacket
 StatEntry
@@ -1906,6 +1909,7 @@ WalRcvData
 WalRcvState
 WalSnd
 WalSndCtlData
+WalSndSendData
 WalSndState
 WholeRowVarExprState
 WindowAgg