TMP: work around missing snapshot registrations.
authorAndres Freund <andres@anarazel.de>
Tue, 7 Apr 2020 04:28:55 +0000 (21:28 -0700)
committerAndres Freund <andres@anarazel.de>
Tue, 7 Apr 2020 04:28:55 +0000 (21:28 -0700)
This is just what's hit by the tests. It's not an actual fix.

src/backend/catalog/namespace.c
src/backend/catalog/pg_subscription.c
src/backend/commands/indexcmds.c
src/backend/commands/tablecmds.c
src/backend/replication/logical/tablesync.c
src/backend/replication/logical/worker.c
src/backend/utils/time/snapmgr.c

index 2ec23016fe53a912bb6610037555b064719b0c85..e4696d8d417e0bac730601a8ba6771a9f745be28 100644 (file)
@@ -55,6 +55,7 @@
 #include "utils/inval.h"
 #include "utils/lsyscache.h"
 #include "utils/memutils.h"
+#include "utils/snapmgr.h"
 #include "utils/syscache.h"
 #include "utils/varlena.h"
 
@@ -4244,12 +4245,18 @@ RemoveTempRelationsCallback(int code, Datum arg)
 {
    if (OidIsValid(myTempNamespace))    /* should always be true */
    {
+       Snapshot snap;
+
        /* Need to ensure we have a usable transaction. */
        AbortOutOfAnyTransaction();
        StartTransactionCommand();
 
+       /* ensure xmin stays set */
+       snap = RegisterSnapshot(GetCatalogSnapshot(InvalidOid));
+
        RemoveTempRelations(myTempNamespace);
 
+       UnregisterSnapshot(snap);
        CommitTransactionCommand();
    }
 }
index cb1573111545d331f7a546e9f725314cd5958c2d..4a324dfb4f1abf962c7d5a40ce3b8b695900884a 100644 (file)
@@ -31,6 +31,7 @@
 #include "utils/fmgroids.h"
 #include "utils/pg_lsn.h"
 #include "utils/rel.h"
+#include "utils/snapmgr.h"
 #include "utils/syscache.h"
 
 static List *textarray_to_stringlist(ArrayType *textarray);
@@ -286,6 +287,7 @@ UpdateSubscriptionRelState(Oid subid, Oid relid, char state,
    bool        nulls[Natts_pg_subscription_rel];
    Datum       values[Natts_pg_subscription_rel];
    bool        replaces[Natts_pg_subscription_rel];
+   Snapshot snap = RegisterSnapshot(GetCatalogSnapshot(InvalidOid));
 
    LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
 
@@ -321,6 +323,8 @@ UpdateSubscriptionRelState(Oid subid, Oid relid, char state,
 
    /* Cleanup. */
    table_close(rel, NoLock);
+
+   UnregisterSnapshot(snap);
 }
 
 /*
index 2baca12c5f475aad6fbf58df29bdfa36a4f20ee0..094bf6139f0596a83e4e6dd82b36179aa5bfb417 100644 (file)
@@ -2837,6 +2837,7 @@ ReindexRelationConcurrently(Oid relationOid, int options)
    char       *relationName = NULL;
    char       *relationNamespace = NULL;
    PGRUsage    ru0;
+   Snapshot    snap;
 
    /*
     * Create a memory context that will survive forced transaction commits we
@@ -3306,6 +3307,7 @@ ReindexRelationConcurrently(Oid relationOid, int options)
     */
 
    StartTransactionCommand();
+   snap = RegisterSnapshot(GetCatalogSnapshot(InvalidOid));
 
    forboth(lc, indexIds, lc2, newIndexIds)
    {
@@ -3354,8 +3356,11 @@ ReindexRelationConcurrently(Oid relationOid, int options)
    }
 
    /* Commit this transaction and make index swaps visible */
+   UnregisterSnapshot(snap);
    CommitTransactionCommand();
+
    StartTransactionCommand();
+   snap = RegisterSnapshot(GetCatalogSnapshot(InvalidOid));
 
    /*
     * Phase 5 of REINDEX CONCURRENTLY
@@ -3386,7 +3391,9 @@ ReindexRelationConcurrently(Oid relationOid, int options)
    }
 
    /* Commit this transaction to make the updates visible. */
+   UnregisterSnapshot(snap);
    CommitTransactionCommand();
+
    StartTransactionCommand();
 
    /*
@@ -3400,6 +3407,7 @@ ReindexRelationConcurrently(Oid relationOid, int options)
    WaitForLockersMultiple(lockTags, AccessExclusiveLock, true);
 
    PushActiveSnapshot(GetTransactionSnapshot());
+   snap = RegisterSnapshot(GetCatalogSnapshot(InvalidOid));
 
    {
        ObjectAddresses *objects = new_object_addresses();
@@ -3425,6 +3433,7 @@ ReindexRelationConcurrently(Oid relationOid, int options)
    }
 
    PopActiveSnapshot();
+   UnregisterSnapshot(snap);
    CommitTransactionCommand();
 
    /*
index 6162fb018c72f212bd8495115f2e7778afffc837..e1eacc6a4a63494e4996c93e4df927190aec1e0d 100644 (file)
@@ -15200,6 +15200,7 @@ PreCommit_on_commit_actions(void)
    ListCell   *l;
    List       *oids_to_truncate = NIL;
    List       *oids_to_drop = NIL;
+   Snapshot    snap;
 
    foreach(l, on_commits)
    {
@@ -15231,6 +15232,11 @@ PreCommit_on_commit_actions(void)
        }
    }
 
+   if (oids_to_truncate == NIL && oids_to_drop == NIL)
+       return;
+
+   snap = RegisterSnapshot(GetCatalogSnapshot(InvalidOid));
+
    /*
     * Truncate relations before dropping so that all dependencies between
     * relations are removed after they are worked on.  Doing it like this
@@ -15284,6 +15290,8 @@ PreCommit_on_commit_actions(void)
        }
 #endif
    }
+
+   UnregisterSnapshot(snap);
 }
 
 /*
index c27d97058955a9c6023afb71b8017648cd740ea3..aec5a044790fdc76afd68516501da6730354a86a 100644 (file)
@@ -863,6 +863,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
            {
                Relation    rel;
                WalRcvExecResult *res;
+               Snapshot    snap;
 
                SpinLockAcquire(&MyLogicalRepWorker->relmutex);
                MyLogicalRepWorker->relstate = SUBREL_STATE_DATASYNC;
@@ -871,10 +872,14 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 
                /* Update the state and make it visible to others. */
                StartTransactionCommand();
+               snap = RegisterSnapshot(GetCatalogSnapshot(InvalidOid));
+
                UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
                                           MyLogicalRepWorker->relid,
                                           MyLogicalRepWorker->relstate,
                                           MyLogicalRepWorker->relstate_lsn);
+
+               UnregisterSnapshot(snap);
                CommitTransactionCommand();
                pgstat_report_stat(false);
 
@@ -918,6 +923,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
                                   CRS_USE_SNAPSHOT, origin_startpos);
 
                PushActiveSnapshot(GetTransactionSnapshot());
+               snap = RegisterSnapshot(GetCatalogSnapshot(InvalidOid));
                copy_table(rel);
                PopActiveSnapshot();
 
@@ -933,6 +939,8 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
                /* Make the copy visible. */
                CommandCounterIncrement();
 
+               UnregisterSnapshot(snap);
+
                /*
                 * We are done with the initial data synchronization, update
                 * the state.
@@ -957,6 +965,8 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
                 */
                if (*origin_startpos >= MyLogicalRepWorker->relstate_lsn)
                {
+                   snap = RegisterSnapshot(GetCatalogSnapshot(InvalidOid));
+
                    /*
                     * Update the new state in catalog.  No need to bother
                     * with the shmem state as we are exiting for good.
@@ -965,6 +975,8 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
                                               MyLogicalRepWorker->relid,
                                               SUBREL_STATE_SYNCDONE,
                                               *origin_startpos);
+                   UnregisterSnapshot(snap);
+
                    finish_sync_worker();
                }
                break;
index a752a1224d6fef6f168198e12436a10a9a8fa7b3..f10f3f843d1f8d32c49391a9534c842e5d405ca7 100644 (file)
@@ -1245,6 +1245,9 @@ apply_handle_truncate(StringInfo s)
 
    ensure_transaction();
 
+   /* catalog modifications need a set snapshot */
+   PushActiveSnapshot(GetTransactionSnapshot());
+
    remote_relids = logicalrep_read_truncate(s, &cascade, &restart_seqs);
 
    foreach(lc, remote_relids)
@@ -1332,6 +1335,7 @@ apply_handle_truncate(StringInfo s)
    }
 
    CommandCounterIncrement();
+   PopActiveSnapshot();
 }
 
 
index 1c063c592cebbfe05ca46e5bcc9b437d8aa81425..b5cff157bf63012cea892ce94465169bf194ef4b 100644 (file)
@@ -441,6 +441,8 @@ GetOldestSnapshot(void)
 Snapshot
 GetCatalogSnapshot(Oid relid)
 {
+   Assert(IsTransactionState());
+
    /*
     * Return historic snapshot while we're doing logical decoding, so we can
     * see the appropriate state of the catalog.
@@ -1017,6 +1019,8 @@ SnapshotResetXmin(void)
    if (pairingheap_is_empty(&RegisteredSnapshots))
    {
        MyPgXact->xmin = InvalidTransactionId;
+       TransactionXmin = InvalidTransactionId;
+       RecentXmin = InvalidTransactionId;
        return;
    }