Decode wal (with wal_level=logical) into changes in an ApplyCache instance
authorAndres Freund <andres@anarazel.de>
Thu, 7 Jun 2012 10:15:29 +0000 (12:15 +0200)
committerAndres Freund <andres@anarazel.de>
Mon, 18 Jun 2012 11:11:40 +0000 (13:11 +0200)
This requires an up2date catalog and can thus only be run on a replica.

Missing:
- HEAP_NEWPAGE support
- HEAP2_MULTI_INSERT support
- DDL integration. *No* ddl, including TRUNCATE is possible atm

src/backend/replication/logical/Makefile
src/backend/replication/logical/decode.c [new file with mode: 0644]
src/include/replication/decode.h [new file with mode: 0644]

index 2eadab8c3c7b14ebc7fb4d2a711d27dc56b3764b..7dd966325ed23ec318c488228983b795ea5741b2 100644 (file)
@@ -14,6 +14,6 @@ include $(top_builddir)/src/Makefile.global
 
 override CPPFLAGS := -I$(srcdir) $(CPPFLAGS)
 
-OBJS = applycache.o
+OBJS = applycache.o decode.o
 
 include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
new file mode 100644 (file)
index 0000000..7e07d50
--- /dev/null
@@ -0,0 +1,439 @@
+/*-------------------------------------------------------------------------
+ *
+ * decode.c
+ *
+ * Decodes wal records from an xlogreader.h callback into an applycache
+ *
+ *
+ * Portions Copyright (c) 2010-2012, PostgreSQL Global Development Group
+ *
+ *
+ * IDENTIFICATION
+ *   src/backend/replication/logical/decode.c
+ *
+ */
+#include "postgres.h"
+
+#include "access/heapam.h"
+#include "access/transam.h"
+#include "access/xlog_internal.h"
+#include "access/xact.h"
+
+#include "replication/applycache.h"
+#include "replication/decode.h"
+
+#include "utils/memutils.h"
+#include "utils/syscache.h"
+#include "utils/lsyscache.h"
+
+static void DecodeXLogTuple(char* data, Size len,
+                            HeapTuple table, ApplyCacheTupleBuf* tuple);
+
+static void DecodeInsert(ApplyCache *cache, XLogRecordBuffer* buf);
+
+static void DecodeUpdate(ApplyCache *cache, XLogRecordBuffer* buf);
+
+static void DecodeDelete(ApplyCache *cache, XLogRecordBuffer* buf);
+
+static void DecodeNewpage(ApplyCache *cache, XLogRecordBuffer* buf);
+static void DecodeMultiInsert(ApplyCache *cache, XLogRecordBuffer* buf);
+
+static void DecodeCommit(ApplyCache* cache, XLogRecordBuffer* buf, TransactionId xid,
+                        TransactionId *sub_xids, int nsubxacts);
+
+
+void DecodeRecordIntoApplyCache(ApplyCache *cache, XLogRecordBuffer* buf)
+{
+   XLogRecord* r = &buf->record;
+   uint8 info = r->xl_info & ~XLR_INFO_MASK;
+
+   switch (r->xl_rmid)
+   {
+       case RM_HEAP_ID:
+       {
+           info &= XLOG_HEAP_OPMASK;
+           switch (info)
+           {
+               case XLOG_HEAP_INSERT:
+                   DecodeInsert(cache, buf);
+                   break;
+
+               /* no guarantee that we get an HOT update again, so handle it as a normal update*/
+               case XLOG_HEAP_HOT_UPDATE:
+               case XLOG_HEAP_UPDATE:
+                   DecodeUpdate(cache, buf);
+                   break;
+
+               case XLOG_HEAP_NEWPAGE:
+                   DecodeNewpage(cache, buf);
+                   break;
+
+               case XLOG_HEAP_DELETE:
+                   DecodeDelete(cache, buf);
+                   break;
+               default:
+                   break;
+           }
+           break;
+       }
+       case RM_HEAP2_ID:
+       {
+           info &= XLOG_HEAP_OPMASK;
+           switch (info)
+           {
+               case XLOG_HEAP2_MULTI_INSERT:
+                   /* this also handles the XLOG_HEAP_INIT_PAGE case */
+                   DecodeMultiInsert(cache, buf);
+                   break;
+               default:
+                   /* everything else here is just physical stuff were not interested in */
+                   break;
+           }
+           break;
+       }
+
+       case RM_XACT_ID:
+       {
+           switch (info)
+           {
+               case XLOG_XACT_COMMIT:
+               {
+                   TransactionId *sub_xids;
+                   xl_xact_commit *xlrec = (xl_xact_commit*)buf->record_data;
+
+                   /* FIXME: this is not really allowed if there is no subtransactions */
+                   sub_xids = (TransactionId *) &(xlrec->xnodes[xlrec->nrels]);
+                   DecodeCommit(cache, buf, r->xl_xid, sub_xids, xlrec->nsubxacts);
+
+                   break;
+               }
+               case XLOG_XACT_COMMIT_PREPARED:
+               {
+                   TransactionId *sub_xids;
+                   xl_xact_commit_prepared *xlrec = (xl_xact_commit_prepared*)buf->record_data;
+
+                   sub_xids = (TransactionId *) &(xlrec->crec.xnodes[xlrec->crec.nrels]);
+
+                   DecodeCommit(cache, buf, r->xl_xid, sub_xids,
+                                xlrec->crec.nsubxacts);
+
+                   break;
+               }
+               case XLOG_XACT_COMMIT_COMPACT:
+               {
+                   xl_xact_commit_compact *xlrec = (xl_xact_commit_compact*)buf->record_data;
+                   DecodeCommit(cache, buf, r->xl_xid, xlrec->subxacts,
+                                xlrec->nsubxacts);
+                   break;
+               }
+               case XLOG_XACT_ABORT:
+               case XLOG_XACT_ABORT_PREPARED:
+               {
+                   TransactionId *sub_xids;
+                   xl_xact_abort *xlrec = (xl_xact_abort*)buf->record_data;
+                   int i;
+
+                   /* FIXME: this is not really allowed if there is no subtransactions */
+                   sub_xids = (TransactionId *) &(xlrec->xnodes[xlrec->nrels]);
+
+                   for(i = 0; i < xlrec->nsubxacts; i++)
+                   {
+                       ApplyCacheAbort(cache, *sub_xids, buf->origptr);
+                       sub_xids += 1;
+                   }
+
+                   /* TODO: check that this also contains not-yet-aborted subtxns */
+                   ApplyCacheAbort(cache, r->xl_xid, buf->origptr);
+
+                   elog(WARNING, "ABORT %u", r->xl_xid);
+                   break;
+               }
+               case XLOG_XACT_ASSIGNMENT:
+                   /*
+                    * XXX: We could reassign transactions to the parent here
+                    * to save space and effort when merging transactions at
+                    * commit.
+                    */
+                   break;
+               case XLOG_XACT_PREPARE:
+                   /*
+                    * FXIME: we should replay the transaction and prepare it
+                    * as well.
+                    */
+                   break;
+               default:
+                   break;
+                   ;
+           }
+           break;
+       }
+       default:
+           break;
+   }
+}
+
+static void
+DecodeCommit(ApplyCache* cache, XLogRecordBuffer* buf, TransactionId xid,
+             TransactionId *sub_xids, int nsubxacts)
+{
+   int i;
+
+   for (i = 0; i < nsubxacts; i++)
+   {
+       ApplyCacheCommitChild(cache, xid, *sub_xids, buf->origptr);
+       sub_xids++;
+   }
+
+   /* replay actions of all transaction + subtransactions in order */
+   ApplyCacheCommit(cache, xid, buf->origptr);
+}
+
+static void DecodeInsert(ApplyCache *cache, XLogRecordBuffer* buf)
+{
+   XLogRecord* r = &buf->record;
+   xl_heap_insert *xlrec = (xl_heap_insert *) buf->record_data;
+
+   Oid relfilenode = xlrec->target.node.relNode;
+
+   ApplyCacheChange* change;
+
+   if (r->xl_info & XLR_BKP_BLOCK_1
+       && r->xl_len < (SizeOfHeapUpdate + SizeOfHeapHeader))
+   {
+       elog(FATAL, "huh, no tuple data on wal_level = logical?");
+   }
+
+   if(relfilenode == 0)
+   {
+       elog(ERROR, "nailed catalog changed");
+   }
+
+   change = ApplyCacheGetChange(cache);
+   change->action = APPLY_CACHE_CHANGE_INSERT;
+
+   /*
+    * Lookup the pg_class entry for the relfilenode to get the real oid
+    */
+   {
+       MemoryContext curctx = MemoryContextSwitchTo(TopMemoryContext);
+       change->table = SearchSysCacheCopy1(RELFILENODE,
+                                           relfilenode);
+       MemoryContextSwitchTo(curctx);
+   }
+
+   if (!HeapTupleIsValid(change->table))
+   {
+#ifdef SHOULD_BE_HANDLED_BETTER
+       elog(WARNING, "cache lookup failed for relfilenode %u, systable?",
+            relfilenode);
+#endif
+       ApplyCacheReturnChange(cache, change);
+       return;
+   }
+
+   if (HeapTupleGetOid(change->table) < FirstNormalObjectId)
+   {
+#ifdef VERBOSE_DEBUG
+       elog(LOG, "skipping change to systable");
+#endif
+       ApplyCacheReturnChange(cache, change);
+       return;
+   }
+
+#ifdef VERBOSE_DEBUG
+   {
+       /*for accessing the cache */
+       Form_pg_class class_form;
+       class_form = (Form_pg_class) GETSTRUCT(change->table);
+       elog(WARNING, "INSERT INTO \"%s\"", NameStr(class_form->relname));
+   }
+#endif
+
+   change->newtuple = ApplyCacheGetTupleBuf(cache);
+
+   DecodeXLogTuple((char*)xlrec + SizeOfHeapInsert,
+                   r->xl_len - SizeOfHeapInsert,
+                   change->table, change->newtuple);
+
+   ApplyCacheAddChange(cache, r->xl_xid, buf->origptr, change);
+}
+
+static void
+DecodeUpdate(ApplyCache *cache, XLogRecordBuffer* buf)
+{
+   XLogRecord* r = &buf->record;
+   xl_heap_update *xlrec = (xl_heap_update *) buf->record_data;
+
+   Oid relfilenode = xlrec->target.node.relNode;
+
+   ApplyCacheChange* change;
+
+   if ((r->xl_info & XLR_BKP_BLOCK_1 || r->xl_info & XLR_BKP_BLOCK_2) &&
+       (r->xl_len < (SizeOfHeapUpdate + SizeOfHeapHeader)))
+   {
+       elog(FATAL, "huh, no tuple data on wal_level = logical?");
+   }
+
+   change = ApplyCacheGetChange(cache);
+   change->action = APPLY_CACHE_CHANGE_UPDATE;
+
+   /*
+    * Lookup the pg_class entry for the relfilenode to get the real oid
+    */
+   {
+       MemoryContext curctx = MemoryContextSwitchTo(TopMemoryContext);
+       change->table = SearchSysCacheCopy1(RELFILENODE,
+                                           relfilenode);
+       MemoryContextSwitchTo(curctx);
+   }
+
+   if (!HeapTupleIsValid(change->table))
+   {
+#ifdef SHOULD_BE_HANDLED_BETTER
+       elog(WARNING, "cache lookup failed for relfilenode %u, systable?",
+            relfilenode);
+#endif
+       ApplyCacheReturnChange(cache, change);
+       return;
+   }
+
+   if (HeapTupleGetOid(change->table) < FirstNormalObjectId)
+   {
+#ifdef VERBOSE_DEBUG
+       elog(LOG, "skipping change to systable");
+#endif
+       ApplyCacheReturnChange(cache, change);
+       return;
+   }
+
+#ifdef VERBOSE_DEBUG
+   {
+       /*for accessing the cache */
+       Form_pg_class class_form;
+       class_form = (Form_pg_class) GETSTRUCT(change->table);
+       elog(WARNING, "UPDATE \"%s\"", NameStr(class_form->relname));
+   }
+#endif
+
+   /* FIXME: need to save the old tuple as well if we want primary key updates to work. */
+   change->newtuple = ApplyCacheGetTupleBuf(cache);
+
+   DecodeXLogTuple((char*)xlrec + SizeOfHeapUpdate,
+                   r->xl_len - SizeOfHeapUpdate,
+                   change->table, change->newtuple);
+
+   ApplyCacheAddChange(cache, r->xl_xid, buf->origptr, change);
+}
+
+static void DecodeDelete(ApplyCache *cache, XLogRecordBuffer* buf)
+{
+   XLogRecord* r = &buf->record;
+
+   xl_heap_delete *xlrec = (xl_heap_delete *) buf->record_data;
+
+   Oid relfilenode = xlrec->target.node.relNode;
+
+   ApplyCacheChange* change;
+
+   change = ApplyCacheGetChange(cache);
+   change->action = APPLY_CACHE_CHANGE_DELETE;
+
+   if (r->xl_len <= (SizeOfHeapDelete + SizeOfHeapHeader))
+   {
+       elog(FATAL, "huh, no primary key for a delete on wal_level = logical?");
+   }
+
+   /*
+    * Lookup the pg_class entry for the relfilenode to get the real oid
+    */
+   {
+       MemoryContext curctx = MemoryContextSwitchTo(TopMemoryContext);
+       change->table = SearchSysCacheCopy1(RELFILENODE,
+                                           relfilenode);
+       MemoryContextSwitchTo(curctx);
+   }
+
+   if (!HeapTupleIsValid(change->table))
+   {
+#ifdef SHOULD_BE_HANDLED_BETTER
+       elog(WARNING, "cache lookup failed for relfilenode %u, systable?",
+            relfilenode);
+#endif
+       ApplyCacheReturnChange(cache, change);
+       return;
+   }
+
+   if (HeapTupleGetOid(change->table) < FirstNormalObjectId)
+   {
+#ifdef VERBOSE_DEBUG
+       elog(LOG, "skipping change to systable");
+#endif
+       ApplyCacheReturnChange(cache, change);
+       return;
+   }
+
+#ifdef VERBOSE_DEBUG
+   {
+       /*for accessing the cache */
+       Form_pg_class class_form;
+       class_form = (Form_pg_class) GETSTRUCT(change->table);
+       elog(WARNING, "DELETE FROM \"%s\"", NameStr(class_form->relname));
+   }
+#endif
+
+   change->oldtuple = ApplyCacheGetTupleBuf(cache);
+
+   DecodeXLogTuple((char*)xlrec + SizeOfHeapDelete,
+                   r->xl_len - SizeOfHeapDelete,
+                   change->table, change->oldtuple);
+
+   ApplyCacheAddChange(cache, r->xl_xid, buf->origptr, change);
+}
+
+
+static void
+DecodeNewpage(ApplyCache *cache, XLogRecordBuffer* buf)
+{
+   elog(WARNING, "skipping XLOG_HEAP_NEWPAGE record because we are too dumb");
+}
+
+static void
+DecodeMultiInsert(ApplyCache *cache, XLogRecordBuffer* buf)
+{
+   elog(WARNING, "skipping XLOG_HEAP2_MULTI_INSERT record because we are too dumb");
+}
+
+
+static void DecodeXLogTuple(char* data, Size len,
+                            HeapTuple table, ApplyCacheTupleBuf* tuple)
+{
+   xl_heap_header xlhdr;
+   int datalen = len - SizeOfHeapHeader;
+
+   Assert(datalen >= 0);
+   Assert(datalen <= MaxHeapTupleSize);
+
+   tuple->tuple.t_len = datalen + offsetof(HeapTupleHeaderData, t_bits);
+
+   /* not a disk based tuple */
+   ItemPointerSetInvalid(&tuple->tuple.t_self);
+
+   /* probably not needed, but ... (is it actually valid to set it?) */
+   tuple->tuple.t_tableOid = HeapTupleGetOid(table);
+   tuple->tuple.t_data = &tuple->header;
+
+   /* data is not stored aligned */
+   memcpy((char *) &xlhdr,
+          data,
+          SizeOfHeapHeader);
+
+   memset(&tuple->header, 0, sizeof(HeapTupleHeaderData));
+
+   memcpy((char *) &tuple->header + offsetof(HeapTupleHeaderData, t_bits),
+          data + SizeOfHeapHeader,
+          datalen);
+
+   tuple->header.t_infomask = xlhdr.t_infomask;
+   tuple->header.t_infomask2 = xlhdr.t_infomask2;
+   tuple->header.t_hoff = xlhdr.t_hoff;
+}
diff --git a/src/include/replication/decode.h b/src/include/replication/decode.h
new file mode 100644 (file)
index 0000000..53088e2
--- /dev/null
@@ -0,0 +1,23 @@
+/*-------------------------------------------------------------------------
+ * decode.h
+ *     PostgreSQL WAL to logical transformation
+ *
+ * Portions Copyright (c) 1996-2012, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef DECODE_H
+#define DECODE_H
+
+#include "access/xlogreader.h"
+#include "replication/applycache.h"
+
+void DecodeRecordIntoApplyCache(ApplyCache *cache, XLogRecordBuffer* buf);
+
+typedef struct ReaderApplyState
+{
+   ApplyCache *apply_cache;
+} ReaderApplyState;
+
+#endif