{
PGconn *streamConn;
PGresult *res;
+ char *copybuf = NULL;
int fd;
StringInfoData query;
XLogRecPtr last_received = InvalidXLogRecPtr;
/* int ret; */
int rc;
int r;
- char *copybuf = NULL;
/*
* Background workers mustn't call usleep() or any direct equivalent:
ResetLatch(&MyProc->procLatch);
+ MemoryContextSwitchTo(MessageContext);
+
/* emergency bailout if postmaster has died */
if (rc & WL_POSTMASTER_DEATH)
proc_exit(1);
if (exit_worker)
break;
+ if (copybuf != NULL)
+ {
+ PQfreemem(copybuf);
+ copybuf = NULL;
+ }
+
r = PQgetCopyData(streamConn, ©buf, 1);
if (r == -1)
int c;
StringInfoData s;
+ MemoryContextSwitchTo(MessageContext);
+
initStringInfo(&s);
s.data = copybuf;
s.len = r;
+ s.maxlen = -1;
c = pq_getmsgbyte(&s);
/* other message types are purposefully ignored */
}
- MemoryContextResetAndDeleteChildren(MessageContext);
}
/* confirm all writes at once */
ResetLatch(&MyProc->procLatch);
rc = WaitLatch(&MyProc->procLatch, WL_TIMEOUT, 1000L);
}
+ MemoryContextResetAndDeleteChildren(MessageContext);
}
proc_exit(0);
Assert(bdr_apply_worker != NULL);
- /*
- * Read tuple into a context that's long lived enough for CONCURRENTLY
- * processing.
- */
- MemoryContextSwitchTo(MessageContext);
rel = read_rel(s, RowExclusiveLock);
action = pq_getmsgbyte(s);
bdr_performing_work(void)
{
if (started_transaction)
+ {
+ if (CurrentMemoryContext != TopTransactionContext)
+ MemoryContextSwitchTo(TopTransactionContext);
return false;
+ }
started_transaction = true;
StartTransactionCommand();