*/
struct pg_conn;
+/* Forward declarations */
+struct TupleTableSlot; /* from executor/tuptable.h */
+struct EState; /* from nodes/execnodes.h */
+struct ScanKeyData; /* from access/skey.h for ScanKey */
+enum LockTupleMode; /* from access/heapam.h */
+
+
/*
* Flags to indicate which fields are present in a commit record sent by the
* output plugin.
BDR_OUTPUT_COMMIT_HAS_ORIGIN = 1
} BdrOutputCommitFlags;
+typedef enum BDRConflictHandlerType
+{
+ BDRUpdateUpdateConflictHandler,
+ BDRUpdateDeleteConflictHandler,
+ BDRInsertInsertConflictHandler,
+ BDRInsertUpdateConflictHandler
+} BDRConflictHandlerType;
+
/*
* BdrApplyWorker describes a BDR worker connection.
*
bool is_valid;
} BdrConnectionConfig;
+typedef struct BDRConflictHandler
+{
+ Oid handler_oid;
+ BDRConflictHandlerType handler_type;
+ uint64 timeframe;
+} BDRConflictHandler;
+
+/*
+ * This structure is for caching relation specific information, such as
+ * conflict handlers.
+ */
+typedef struct BDRRelation
+{
+ /* hash key */
+ Oid reloid;
+
+ Relation rel;
+
+ BDRConflictHandler *conflict_handlers;
+ size_t conflict_handlers_len;
+} BDRRelation;
+
+
/*
* Params for every connection in bdr.connections.
*
extern void fetch_sysid_via_node_id(RepNodeId node_id, uint64 *sysid,
TimeLineID *tli);
+/* Index maintenance, heap access, etc */
+extern struct EState * bdr_create_rel_estate(Relation rel);
+extern void UserTableUpdateIndexes(struct EState *estate,
+ struct TupleTableSlot *slot);
+extern void UserTableUpdateOpenIndexes(struct EState *estate,
+ struct TupleTableSlot *slot);
+extern void build_index_scan_keys(struct EState *estate,
+ struct ScanKeyData **scan_keys,
+ struct TupleTableSlot *slot);
+extern void build_index_scan_key(struct ScanKeyData *skey, Relation rel,
+ Relation idxrel,
+ struct TupleTableSlot *slot);
+extern bool find_pkey_tuple(struct ScanKeyData *skey, BDRRelation *rel,
+ Relation idxrel, struct TupleTableSlot *slot, bool
+ lock, enum LockTupleMode mode);
+
/* sequence support */
extern void bdr_sequencer_shmem_init(int nnodes, int sequencers);
extern void bdr_sequencer_init(int seq_slot);
uint64 *out_sysid, TimeLineID* out_timeline, RepNodeId
*out_replication_identifier, char **out_snapshot);
-typedef enum BDRConflictHandlerType
-{
- BDRUpdateUpdateConflictHandler,
- BDRUpdateDeleteConflictHandler,
- BDRInsertInsertConflictHandler,
- BDRInsertUpdateConflictHandler
-} BDRConflictHandlerType;
-
-typedef struct BDRConflictHandler
-{
- Oid handler_oid;
- BDRConflictHandlerType handler_type;
- uint64 timeframe;
-} BDRConflictHandler;
-
-/*
- * This structure is for caching relation specific information, such as
- * conflict handlers.
- */
-typedef struct BDRRelation
-{
- /* hash key */
- Oid reloid;
-
- Relation rel;
-
- BDRConflictHandler *conflict_handlers;
- size_t conflict_handlers_len;
-} BDRRelation;
-
/* use instead of heap_open()/heap_close() */
extern BDRRelation *bdr_heap_open(Oid reloid, LOCKMODE lockmode);
extern void bdr_heap_close(BDRRelation * rel, LOCKMODE lockmode);
#include "pgstat.h"
#include "access/committs.h"
-#include "access/heapam.h"
#include "access/htup_details.h"
#include "access/relscan.h"
#include "access/xact.h"
#include "catalog/namespace.h"
#include "catalog/pg_type.h"
-#include "executor/executor.h"
-
#include "libpq/pqformat.h"
-#include "parser/parse_relation.h"
#include "parser/parse_type.h"
#include "replication/logical.h"
#include "replication/replication_identifier.h"
-#include "storage/bufmgr.h"
#include "storage/lmgr.h"
#include "storage/lwlock.h"
#include "utils/memutils.h"
#include "utils/snapmgr.h"
#include "utils/syscache.h"
-#include "utils/tqual.h"
/* Useful for development:
#define VERBOSE_INSERT
*/
BdrConnectionConfig *bdr_apply_config = NULL;
-static void build_index_scan_keys(EState *estate, ScanKey *scan_keys, TupleTableSlot *slot);
-static void build_index_scan_key(ScanKey skey, Relation rel, Relation idx_rel, TupleTableSlot *slot);
-static bool find_pkey_tuple(ScanKey skey, BDRRelation *rel, Relation idx_rel, TupleTableSlot *slot,
- bool lock, LockTupleMode mode);
-
-static void UserTableUpdateIndexes(EState *estate, TupleTableSlot *slot);
-static void UserTableUpdateOpenIndexes(EState *estate, TupleTableSlot *slot);
-static EState *bdr_create_rel_estate(Relation rel);
-
static BDRRelation *read_rel(StringInfo s, LOCKMODE mode);
extern void read_tuple_parts(StringInfo s, BDRRelation *rel, BDRTupleData *tup);
static void read_tuple(StringInfo s, BDRRelation *rel, TupleTableSlot *slot);
}
}
-static EState *
-bdr_create_rel_estate(Relation rel)
-{
- EState *estate;
- ResultRelInfo *resultRelInfo;
-
- estate = CreateExecutorState();
-
- resultRelInfo = makeNode(ResultRelInfo);
- resultRelInfo->ri_RangeTableIndex = 1; /* dummy */
- resultRelInfo->ri_RelationDesc = rel;
- resultRelInfo->ri_TrigInstrument = NULL;
-
- estate->es_result_relations = resultRelInfo;
- estate->es_num_result_relations = 1;
- estate->es_result_relation_info = resultRelInfo;
-
- return estate;
-}
-
-static void
-UserTableUpdateIndexes(EState *estate, TupleTableSlot *slot)
-{
- /* HOT update does not require index inserts */
- if (HeapTupleIsHeapOnly(slot->tts_tuple))
- return;
-
- ExecOpenIndices(estate->es_result_relation_info);
- UserTableUpdateOpenIndexes(estate, slot);
- ExecCloseIndices(estate->es_result_relation_info);
-}
-
-static void
-UserTableUpdateOpenIndexes(EState *estate, TupleTableSlot *slot)
-{
- List *recheckIndexes = NIL;
-
- /* HOT update does not require index inserts */
- if (HeapTupleIsHeapOnly(slot->tts_tuple))
- return;
-
- if (estate->es_result_relation_info->ri_NumIndices > 0)
- {
- recheckIndexes = ExecInsertIndexTuples(slot,
- &slot->tts_tuple->t_self,
- estate);
-
- if (recheckIndexes != NIL)
- ereport(ERROR,
- (errmsg("bdr doesn't support index rechecks")));
- }
-
- /* FIXME: recheck the indexes */
- list_free(recheckIndexes);
-}
-
-static void
-build_index_scan_keys(EState *estate, ScanKey *scan_keys, TupleTableSlot *slot)
-{
- ResultRelInfo *relinfo;
- int i;
-
- relinfo = estate->es_result_relation_info;
-
- /* build scankeys for each index */
- for (i = 0; i < relinfo->ri_NumIndices; i++)
- {
- IndexInfo *ii = relinfo->ri_IndexRelationInfo[i];
-
- if (!ii->ii_Unique)
- continue;
-
- scan_keys[i] = palloc(ii->ii_NumIndexAttrs * sizeof(ScanKeyData));
- build_index_scan_key(scan_keys[i],
- relinfo->ri_RelationDesc,
- relinfo->ri_IndexRelationDescs[i],
- slot);
- }
-}
-
-/*
- * Setup a ScanKey for a search in the relation 'rel' for a tuple 'key' that
- * is setup to match 'rel' (*NOT* idxrel!).
- */
-static void
-build_index_scan_key(ScanKey skey, Relation rel, Relation idxrel, TupleTableSlot *slot)
-{
- int attoff;
- Datum indclassDatum;
- Datum indkeyDatum;
- bool isnull;
- oidvector *opclass;
- int2vector *indkey;
- HeapTuple key = slot->tts_tuple;
-
- indclassDatum = SysCacheGetAttr(INDEXRELID, idxrel->rd_indextuple,
- Anum_pg_index_indclass, &isnull);
- Assert(!isnull);
- opclass = (oidvector *) DatumGetPointer(indclassDatum);
-
- indkeyDatum = SysCacheGetAttr(INDEXRELID, idxrel->rd_indextuple,
- Anum_pg_index_indkey, &isnull);
- Assert(!isnull);
- indkey = (int2vector *) DatumGetPointer(indkeyDatum);
-
-
- for (attoff = 0; attoff < RelationGetNumberOfAttributes(idxrel); attoff++)
- {
- Oid operator;
- Oid opfamily;
- RegProcedure regop;
- int pkattno = attoff + 1;
- int mainattno = indkey->values[attoff];
- Oid atttype = attnumTypeId(rel, mainattno);
- Oid optype = get_opclass_input_type(opclass->values[attoff]);
-
- opfamily = get_opclass_family(opclass->values[attoff]);
-
- operator = get_opfamily_member(opfamily, optype,
- optype,
- BTEqualStrategyNumber);
-
- if (!OidIsValid(operator))
- elog(ERROR,
- "could not lookup equality operator for type %u, optype %u in opfamily %u",
- atttype, optype, opfamily);
-
- regop = get_opcode(operator);
-
- /* FIXME: convert type? */
- ScanKeyInit(&skey[attoff],
- pkattno,
- BTEqualStrategyNumber,
- regop,
- fastgetattr(key, mainattno,
- RelationGetDescr(rel), &isnull));
- if (isnull)
- elog(ERROR, "index tuple with a null column");
- }
-}
-
-/*
- * Search the index 'idxrel' for a tuple identified by 'skey' in 'rel'.
- *
- * If a matching tuple is found setup 'tid' to point to it and return true,
- * false is returned otherwise.
- */
-static bool
-find_pkey_tuple(ScanKey skey, BDRRelation *rel, Relation idxrel,
- TupleTableSlot *slot, bool lock, LockTupleMode mode)
-{
- HeapTuple scantuple;
- bool found;
- IndexScanDesc scan;
- SnapshotData snap;
- TransactionId xwait;
-
- InitDirtySnapshot(snap);
- scan = index_beginscan(rel->rel, idxrel,
- &snap,
- RelationGetNumberOfAttributes(idxrel),
- 0);
-
-retry:
- found = false;
-
- index_rescan(scan, skey, RelationGetNumberOfAttributes(idxrel), NULL, 0);
-
- if ((scantuple = index_getnext(scan, ForwardScanDirection)) != NULL)
- {
- found = true;
- /* FIXME: Improve TupleSlot to not require copying the whole tuple */
- ExecStoreTuple(scantuple, slot, InvalidBuffer, false);
- ExecMaterializeSlot(slot);
-
- xwait = TransactionIdIsValid(snap.xmin) ?
- snap.xmin : snap.xmax;
-
- if (TransactionIdIsValid(xwait))
- {
- XactLockTableWait(xwait, NULL, NULL, XLTW_None);
- goto retry;
- }
- }
-
- if (lock && found)
- {
- Buffer buf;
- HeapUpdateFailureData hufd;
- HTSU_Result res;
- HeapTupleData locktup;
-
- ItemPointerCopy(&slot->tts_tuple->t_self, &locktup.t_self);
-
- PushActiveSnapshot(GetLatestSnapshot());
-
- res = heap_lock_tuple(rel->rel, &locktup, GetCurrentCommandId(false), mode,
- false /* wait */,
- false /* don't follow updates */,
- &buf, &hufd);
- /* the tuple slot already has the buffer pinned */
- ReleaseBuffer(buf);
-
- PopActiveSnapshot();
-
- switch (res)
- {
- case HeapTupleMayBeUpdated:
- break;
- case HeapTupleUpdated:
- /* XXX: Improve handling here */
- ereport(LOG,
- (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
- errmsg("concurrent update, retrying")));
- goto retry;
- default:
- elog(ERROR, "unexpected HTSU_Result after locking: %u", res);
- break;
- }
- }
-
- index_endscan(scan);
-
- return found;
-}
-
-
/*
* Read a remote action type and process the action record.
*
--- /dev/null
+/* -------------------------------------------------------------------------
+ *
+ * bdr_executor.c
+ * Relation and index access and maintenance routines required by bdr
+ *
+ * BDR does a lot of direct access to indexes and relations, some of which
+ * isn't handled by simple calls into the backend. Most of it lives here.
+ *
+ * Copyright (C) 2012-2014, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ * contrib/bdr/bdr_executor.c
+ *
+ * -------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "bdr.h"
+
+#include "access/heapam.h"
+#include "access/skey.h"
+#include "access/xact.h"
+
+#include "executor/executor.h"
+#include "executor/tuptable.h"
+
+#include "nodes/execnodes.h"
+
+#include "parser/parse_relation.h"
+
+#include "storage/bufmgr.h"
+#include "storage/lmgr.h"
+
+#include "utils/lsyscache.h"
+#include "utils/snapmgr.h"
+#include "utils/syscache.h"
+#include "utils/tqual.h"
+
+EState *
+bdr_create_rel_estate(Relation rel)
+{
+ EState *estate;
+ ResultRelInfo *resultRelInfo;
+
+ estate = CreateExecutorState();
+
+ resultRelInfo = makeNode(ResultRelInfo);
+ resultRelInfo->ri_RangeTableIndex = 1; /* dummy */
+ resultRelInfo->ri_RelationDesc = rel;
+ resultRelInfo->ri_TrigInstrument = NULL;
+
+ estate->es_result_relations = resultRelInfo;
+ estate->es_num_result_relations = 1;
+ estate->es_result_relation_info = resultRelInfo;
+
+ return estate;
+}
+
+void
+UserTableUpdateIndexes(EState *estate, TupleTableSlot *slot)
+{
+ /* HOT update does not require index inserts */
+ if (HeapTupleIsHeapOnly(slot->tts_tuple))
+ return;
+
+ ExecOpenIndices(estate->es_result_relation_info);
+ UserTableUpdateOpenIndexes(estate, slot);
+ ExecCloseIndices(estate->es_result_relation_info);
+}
+
+void
+UserTableUpdateOpenIndexes(EState *estate, TupleTableSlot *slot)
+{
+ List *recheckIndexes = NIL;
+
+ /* HOT update does not require index inserts */
+ if (HeapTupleIsHeapOnly(slot->tts_tuple))
+ return;
+
+ if (estate->es_result_relation_info->ri_NumIndices > 0)
+ {
+ recheckIndexes = ExecInsertIndexTuples(slot,
+ &slot->tts_tuple->t_self,
+ estate);
+
+ if (recheckIndexes != NIL)
+ ereport(ERROR,
+ (errmsg("bdr doesn't support index rechecks")));
+ }
+
+ /* FIXME: recheck the indexes */
+ list_free(recheckIndexes);
+}
+
+void
+build_index_scan_keys(EState *estate, ScanKey *scan_keys, TupleTableSlot *slot)
+{
+ ResultRelInfo *relinfo;
+ int i;
+
+ relinfo = estate->es_result_relation_info;
+
+ /* build scankeys for each index */
+ for (i = 0; i < relinfo->ri_NumIndices; i++)
+ {
+ IndexInfo *ii = relinfo->ri_IndexRelationInfo[i];
+
+ if (!ii->ii_Unique)
+ continue;
+
+ scan_keys[i] = palloc(ii->ii_NumIndexAttrs * sizeof(ScanKeyData));
+ build_index_scan_key(scan_keys[i],
+ relinfo->ri_RelationDesc,
+ relinfo->ri_IndexRelationDescs[i],
+ slot);
+ }
+}
+
+/*
+ * Setup a ScanKey for a search in the relation 'rel' for a tuple 'key' that
+ * is setup to match 'rel' (*NOT* idxrel!).
+ */
+void
+build_index_scan_key(ScanKey skey, Relation rel, Relation idxrel, TupleTableSlot *slot)
+{
+ int attoff;
+ Datum indclassDatum;
+ Datum indkeyDatum;
+ bool isnull;
+ oidvector *opclass;
+ int2vector *indkey;
+ HeapTuple key = slot->tts_tuple;
+
+ indclassDatum = SysCacheGetAttr(INDEXRELID, idxrel->rd_indextuple,
+ Anum_pg_index_indclass, &isnull);
+ Assert(!isnull);
+ opclass = (oidvector *) DatumGetPointer(indclassDatum);
+
+ indkeyDatum = SysCacheGetAttr(INDEXRELID, idxrel->rd_indextuple,
+ Anum_pg_index_indkey, &isnull);
+ Assert(!isnull);
+ indkey = (int2vector *) DatumGetPointer(indkeyDatum);
+
+
+ for (attoff = 0; attoff < RelationGetNumberOfAttributes(idxrel); attoff++)
+ {
+ Oid operator;
+ Oid opfamily;
+ RegProcedure regop;
+ int pkattno = attoff + 1;
+ int mainattno = indkey->values[attoff];
+ Oid atttype = attnumTypeId(rel, mainattno);
+ Oid optype = get_opclass_input_type(opclass->values[attoff]);
+
+ opfamily = get_opclass_family(opclass->values[attoff]);
+
+ operator = get_opfamily_member(opfamily, optype,
+ optype,
+ BTEqualStrategyNumber);
+
+ if (!OidIsValid(operator))
+ elog(ERROR,
+ "could not lookup equality operator for type %u, optype %u in opfamily %u",
+ atttype, optype, opfamily);
+
+ regop = get_opcode(operator);
+
+ /* FIXME: convert type? */
+ ScanKeyInit(&skey[attoff],
+ pkattno,
+ BTEqualStrategyNumber,
+ regop,
+ fastgetattr(key, mainattno,
+ RelationGetDescr(rel), &isnull));
+ if (isnull)
+ elog(ERROR, "index tuple with a null column");
+ }
+}
+
+/*
+ * Search the index 'idxrel' for a tuple identified by 'skey' in 'rel'.
+ *
+ * If a matching tuple is found setup 'tid' to point to it and return true,
+ * false is returned otherwise.
+ */
+bool
+find_pkey_tuple(ScanKey skey, BDRRelation *rel, Relation idxrel,
+ TupleTableSlot *slot, bool lock, LockTupleMode mode)
+{
+ HeapTuple scantuple;
+ bool found;
+ IndexScanDesc scan;
+ SnapshotData snap;
+ TransactionId xwait;
+
+ InitDirtySnapshot(snap);
+ scan = index_beginscan(rel->rel, idxrel,
+ &snap,
+ RelationGetNumberOfAttributes(idxrel),
+ 0);
+
+retry:
+ found = false;
+
+ index_rescan(scan, skey, RelationGetNumberOfAttributes(idxrel), NULL, 0);
+
+ if ((scantuple = index_getnext(scan, ForwardScanDirection)) != NULL)
+ {
+ found = true;
+ /* FIXME: Improve TupleSlot to not require copying the whole tuple */
+ ExecStoreTuple(scantuple, slot, InvalidBuffer, false);
+ ExecMaterializeSlot(slot);
+
+ xwait = TransactionIdIsValid(snap.xmin) ?
+ snap.xmin : snap.xmax;
+
+ if (TransactionIdIsValid(xwait))
+ {
+ XactLockTableWait(xwait, NULL, NULL, XLTW_None);
+ goto retry;
+ }
+ }
+
+ if (lock && found)
+ {
+ Buffer buf;
+ HeapUpdateFailureData hufd;
+ HTSU_Result res;
+ HeapTupleData locktup;
+
+ ItemPointerCopy(&slot->tts_tuple->t_self, &locktup.t_self);
+
+ PushActiveSnapshot(GetLatestSnapshot());
+
+ res = heap_lock_tuple(rel->rel, &locktup, GetCurrentCommandId(false), mode,
+ false /* wait */,
+ false /* don't follow updates */,
+ &buf, &hufd);
+ /* the tuple slot already has the buffer pinned */
+ ReleaseBuffer(buf);
+
+ PopActiveSnapshot();
+
+ switch (res)
+ {
+ case HeapTupleMayBeUpdated:
+ break;
+ case HeapTupleUpdated:
+ /* XXX: Improve handling here */
+ ereport(LOG,
+ (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
+ errmsg("concurrent update, retrying")));
+ goto retry;
+ default:
+ elog(ERROR, "unexpected HTSU_Result after locking: %u", res);
+ break;
+ }
+ }
+
+ index_endscan(scan);
+
+ return found;
+}
+
MODULE_big = bdr
OBJS = bdr.o bdr_apply.o bdr_compat.o bdr_commandfilter.o bdr_count.o \
- bdr_seq.o bdr_init_replica.o bdr_relcache.o bdr_conflict_handlers.o
+ bdr_seq.o bdr_init_replica.o bdr_relcache.o bdr_conflict_handlers.o \
+ bdr_executor.o
EXTENSION = bdr
DATA = bdr--0.5.sql
$bdr_apply->AddFiles('contrib\bdr', 'bdr.c', 'bdr_apply.c',
'bdr_commandfilter.c', 'bdr_compat.c',
'bdr_count.c', 'bdr_seq.c', 'bdr_init_replica.c',
- 'bdr_relcache.c', 'bdr_conflict_handlers.c');
+ 'bdr_relcache.c', 'bdr_conflict_handlers.c',
+ 'bdr_executor.c');
$bdr_apply->AddReference($postgres);
$bdr_apply->AddLibrary('wsock32.lib');
$bdr_apply->AddIncludeDir('src\interfaces\libpq');