Put back some logical decoding stuff csn
authorHeikki Linnakangas <heikki.linnakangas@iki.fi>
Thu, 22 May 2014 03:49:02 +0000 (23:49 -0400)
committerHeikki Linnakangas <heikki.linnakangas@iki.fi>
Thu, 22 May 2014 11:44:11 +0000 (07:44 -0400)
src/backend/commands/vacuum.c
src/backend/storage/ipc/procarray.c
src/backend/storage/ipc/standby.c
src/include/storage/procarray.h

index 2d37b8d899de53c51e268c513e2dc2da37b4272e..bfc255016a106f1cc996317f857db83a27f08e64 100644 (file)
@@ -428,9 +428,13 @@ vacuum_set_xid_limits(Relation rel,
         */
        *oldestSnapshot = GetOldestSnapshotLSN(rel, true);
 
-       AdvanceRecentGlobalXmin();
+       /*
+        * Note that we no longer use the oldestXmin value for deciding which
+        * tuples can be removed. That's oldestSnapshot's charter now.  oldestXmin
+        * is only used to calculate the freeze limit.
+        */
+       oldestXmin = GetOldestXmin(rel, true);
 
-       oldestXmin = RecentGlobalXmin;
        Assert(TransactionIdIsNormal(oldestXmin));
 
        /*
index 22225afe1e0b29753d5608f65306cc9746bd98c1..e267a86388f4aa415b8f1827933ea65a2cda0f54 100644 (file)
@@ -342,8 +342,6 @@ ProcArrayInitRecovery(TransactionId initializedUptoXID)
        Assert(TransactionIdIsNormal(initializedUptoXID));
 
        /*
-        * FIXME: comment refers to SUBTRANS, I don't think this is needed anymore
-        *
         * we set latestObservedXid to the xid SUBTRANS has been initialized upto,
         * so we can extend it from that point onwards in
         * RecordKnownAssignedTransactionIds, and when we get consistent in
@@ -561,14 +559,16 @@ AdvanceRecentGlobalXmin(void)
 
        oldestSnapshot = GetOldestSnapshotLSN(NULL, false);
 
-       LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
+       LWLockAcquire(XidGenLock, LW_EXCLUSIVE);
 
+       /* Do step 1, if we don't have a global xmin pending already. */
        if (ShmemVariableCache->pendingGlobalXmin == InvalidTransactionId)
        {
                ShmemVariableCache->pendingGlobalXmin = xmin;
                ShmemVariableCache->pendingLSN = pendingLSN;
        }
 
+       /* Have we finished the wait in step 3? */
        if (oldestSnapshot <= ShmemVariableCache->pendingLSN)
        {
                /* install new globalxmin */
@@ -581,7 +581,7 @@ AdvanceRecentGlobalXmin(void)
 
        RecentGlobalXmin = ShmemVariableCache->globalXmin;
 
-       LWLockRelease(ProcArrayLock);
+       LWLockRelease(XidGenLock);
 }
 
 /*
@@ -632,11 +632,56 @@ AdvanceRecentGlobalXmin(void)
  * increasing that setting on the fly is another easy way to make
  * GetOldestXmin() move backwards, with no consequences for data integrity.
  */
+TransactionId
+GetOldestXmin(Relation rel, bool ignoreVacuum)
+{
+       TransactionId result;
+
+       volatile TransactionId replication_slot_xmin = InvalidTransactionId;
+       volatile TransactionId replication_slot_catalog_xmin = InvalidTransactionId;
+
+       AdvanceRecentGlobalXmin();
+
+       result = RecentGlobalXmin;
+
+       LWLockAcquire(ProcArrayLock, LW_SHARED);
+
+       /* fetch into volatile var while ProcArrayLock is held */
+       replication_slot_xmin = procArray->replication_slot_xmin;
+       replication_slot_catalog_xmin = procArray->replication_slot_catalog_xmin;
+
+       LWLockRelease(ProcArrayLock);
+
+       /*
+        * Check whether there are replication slots requiring an older xmin.
+        */
+       if (TransactionIdIsValid(replication_slot_xmin) &&
+               NormalTransactionIdPrecedes(replication_slot_xmin, result))
+               result = replication_slot_xmin;
+
+       /*
+        * After locks have been released and defer_cleanup_age has been applied,
+        * check whether we need to back up further to make logical decoding
+        * possible. We need to do so if we're computing the global limit (rel =
+        * NULL) or if the passed relation is a catalog relation of some kind.
+        */
+       if ((rel == NULL ||
+                RelationIsAccessibleInLogicalDecoding(rel)) &&
+               TransactionIdIsValid(replication_slot_catalog_xmin) &&
+               NormalTransactionIdPrecedes(replication_slot_catalog_xmin, result))
+               result = replication_slot_catalog_xmin;
+
+       return result;
+}
 
 /*
  * Get the LSN of the oldest snapshot still active.
  *
  * With LSN-based snapshots, this is more accurate than GetOldestXmin().
+ *
+ * FIXME: the replication_slot_xmin and replication_slot_catalog_xmin values
+ * don't affect this, so when this is used to decide if a dead tuple can be
+ * vacuumed, it breaks logical decoding.
  */
 XLogRecPtr
 GetOldestSnapshotLSN(Relation rel, bool ignoreVacuum)
@@ -734,7 +779,7 @@ GetSnapshotData(Snapshot snapshot)
 
        takenDuringRecovery = RecoveryInProgress();
 
-       LWLockAcquire(ProcArrayLock, LW_SHARED);
+       LWLockAcquire(XidGenLock, LW_SHARED);
 
        if (takenDuringRecovery)
        {
@@ -753,7 +798,7 @@ GetSnapshotData(Snapshot snapshot)
        /* update RecentGlobalXmin while we have a chance */
        RecentGlobalXmin = ShmemVariableCache->globalXmin;
 
-       LWLockRelease(ProcArrayLock);
+       LWLockRelease(XidGenLock);
 
        snapshot->xmin = xmin;
        snapshot->xmax = xmax;
@@ -805,7 +850,10 @@ ProcArrayInstallImportedSnapshot(XLogRecPtr snapshotlsn, TransactionId sourcexid
        if (!TransactionIdIsNormal(sourcexid))
                return false;
 
-       /* Get exclusive lock so source xact can't end while we're doing this */
+       /*
+        * Get exclusive lock so source xact can't end while we're doing this.
+        * FIXME: holding the ProcArrayLock doesn't do the trick anymore
+        */
        LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
 
        for (index = 0; index < arrayP->numProcs; index++)
@@ -860,29 +908,20 @@ ProcArrayInstallImportedSnapshot(XLogRecPtr snapshotlsn, TransactionId sourcexid
 /*
  * GetRunningTransactionData -- returns information about running transactions.
  *
- * Similar to GetSnapshotData but returns more information. We include
- * all PGXACTs with an assigned TransactionId, even VACUUM processes.
+ * We acquire XigGenlock, but the caller is responsible for releasing it.
+ * Acquiring XigGenLock ensures that no new XID can be assigned until
+ * the caller has WAL-logged this snapshot, and releases the lock.
+ * FIXME: I'm not sure if we need that interlock anymore.
  *
- * We acquire XidGenLock and ProcArrayLock, but the caller is responsible for
- * releasing them. Acquiring XidGenLock ensures that no new XIDs enter the proc
- * array until the caller has WAL-logged this snapshot, and releases the
- * lock. Acquiring ProcArrayLock ensures that no transactions commit until the
- * lock is released.
+ * Returns the current xmin and xmax, like GetSnapshotData does.
+ * FIXME: not sure we need this separately from GetSnapshotdata anymore.
  *
  * The returned data structure is statically allocated; caller should not
  * modify it, and must not assume it is valid past the next call.
  *
- * This is never executed during recovery so there is no need to look at
- * KnownAssignedXids.
- *
  * We don't worry about updating other counters, we want to keep this as
  * simple as possible and leave GetSnapshotData() as the primary code for
  * that bookkeeping.
- *
- * Note that if any transaction has overflowed its cached subtransactions
- * then there is no real need include any subtransactions. That isn't a
- * common enough case to worry about optimising the size of the WAL record,
- * and we may wish to see that data for diagnostic purposes anyway.
  */
 RunningTransactions
 GetRunningTransactionData(void)
@@ -898,7 +937,6 @@ GetRunningTransactionData(void)
         * Ensure that no xids enter or leave the procarray while we obtain
         * snapshot.
         */
-       LWLockAcquire(ProcArrayLock, LW_SHARED);
        LWLockAcquire(XidGenLock, LW_SHARED);
 
        oldestRunningXid = ShmemVariableCache->nextXid;
@@ -926,14 +964,11 @@ GetRunningTransactionData(void)
 /*
  * GetOldestActiveTransactionId()
  *
- * Similar to GetSnapshotData but returns just oldestActiveXid. We include
+ * Returns the oldest XID that's still running. We include
  * all PGXACTs with an assigned TransactionId, even VACUUM processes.
  * We look at all databases, though there is no need to include WALSender
  * since this has no effect on hot standby conflicts.
  *
- * This is never executed during recovery so there is no need to look at
- * KnownAssignedXids.
- *
  * We don't worry about updating other counters, we want to keep this as
  * simple as possible and leave GetSnapshotData() as the primary code for
  * that bookkeeping.
@@ -1046,6 +1081,8 @@ GetOldestSafeDecodingTransactionId(void)
         * above, so we'll have to wait a bit longer there. We unfortunately can
         * *not* use KnownAssignedXidsGetOldestXmin() since the KnownAssignedXids
         * machinery can miss values and return an older value than is safe.
+        * FIXME: I don't understand this stuff, but at least the comments need
+        * adjustment because KnownAssignedXids is no more.
         */
        if (!recovery_in_progress)
        {
index 0322eba79afe8334c26b68f38b8a4b38670e665a..013d69c3ed5c4f7c17a6d3a8753cda737d7d42ae 100644 (file)
@@ -870,27 +870,8 @@ LogStandbySnapshot(void)
         */
        running = GetRunningTransactionData();
 
-       /*
-        * GetRunningTransactionData() acquired ProcArrayLock, we must release it.
-        * For Hot Standby this can be done before inserting the WAL record
-        * because ProcArrayApplyRecoveryInfo() rechecks the commit status using
-        * the clog. For logical decoding, though, the lock can't be released
-        * early becuase the clog might be "in the future" from the POV of the
-        * historic snapshot. This would allow for situations where we're waiting
-        * for the end of a transaction listed in the xl_running_xacts record
-        * which, according to the WAL, have commit before the xl_running_xacts
-        * record. Fortunately this routine isn't executed frequently, and it's
-        * only a shared lock.
-        */
-       if (wal_level < WAL_LEVEL_LOGICAL)
-               LWLockRelease(ProcArrayLock);
-
        recptr = LogCurrentRunningXacts(running);
 
-       /* Release lock if we kept it longer ... */
-       if (wal_level >= WAL_LEVEL_LOGICAL)
-               LWLockRelease(ProcArrayLock);
-
        /* GetRunningTransactionData() acquired XidGenLock, we must release it */
        LWLockRelease(XidGenLock);
 
index 19a6e8b2485630ce99bb7a38cb8f7bb7b71bc441..4fd938ee5d3015a9a9964c1e1a695c3aac5983ff 100644 (file)
@@ -45,6 +45,7 @@ extern RunningTransactions GetRunningTransactionData(void);
 
 extern bool TransactionIdIsActive(TransactionId xid);
 extern XLogRecPtr GetOldestSnapshotLSN(Relation rel, bool ignoreVacuum);
+extern TransactionId GetOldestXmin(Relation rel, bool ignoreVacuum);
 extern TransactionId GetOldestActiveTransactionId(void);
 extern TransactionId GetOldestSafeDecodingTransactionId(void);