From: Yoshiyuki Asaba Date: Tue, 6 Nov 2007 09:08:53 +0000 (+0000) Subject: Fix insert_lock with extended query protocol. X-Git-Url: http://git.postgresql.org/gitweb/?a=commitdiff_plain;h=d18a3b5d32255775818e755185423d8908768514;p=pgpool1.git Fix insert_lock with extended query protocol. --- diff --git a/pool_process_query.c b/pool_process_query.c index f645507..2735494 100644 --- a/pool_process_query.c +++ b/pool_process_query.c @@ -168,6 +168,7 @@ static PreparedStatement *unnamed_statement = NULL; static PreparedStatement *unnamed_portal = NULL; static int force_replication = 0; /* non 0 if force to replicate query */ static int prepare_in_session = 0; +static int receive_sync = 0; static int is_drop_database(char *query); /* returns non 0 if this is a DROP DATABASE command */ static void query_ps_status(char *query, POOL_CONNECTION_POOL *backend); /* show ps status */ @@ -1008,6 +1009,7 @@ static POOL_STATUS Parse(POOL_CONNECTION *frontend, char *name, *stmt; int deadlock_detected = 0; int checked = 0; + POOL_STATUS status; /* read Parse packet */ if (pool_read(frontend, &len, sizeof(len)) < 0) @@ -1055,6 +1057,35 @@ static POOL_STATUS Parse(POOL_CONNECTION *frontend, return POOL_END; } + if (REPLICATION && need_insert_lock(backend, stmt)) + { + char kind; + + if (TSTATE(backend) != 'T') + { + /* synchronize transaction state */ + for (i = 0; i < backend->num; i++) + { + POOL_CONNECTION *cp = backend->slots[i]->con; + + send_extended_protocol_message(cp, "S", 0, ""); + } + + kind = pool_read_kind(backend); + if (kind != 'Z') + return POOL_END; + if (ReadyForQuery(frontend, backend, 0) != POOL_CONTINUE) + return POOL_END; + } + + /* start a transaction if needed and lock the table */ + status = insert_lock(backend, stmt); + if (status != POOL_CONTINUE) + { + return status; + } + } + /* forward Parse message to backends */ for (i = 0; i < backend->num; i++) { @@ -1182,9 +1213,11 @@ static POOL_STATUS ReadyForQuery(POOL_CONNECTION *frontend, { StartupPacket *sp; char psbuf[1024]; + int len; + signed char state; /* if a transaction is started for insert lock, we need to close it. */ - if (internal_transaction_started) + if (internal_transaction_started && receive_sync == 0) { int i; int len; @@ -1215,36 +1248,37 @@ static POOL_STATUS ReadyForQuery(POOL_CONNECTION *frontend, internal_transaction_started = 0; } - if (pool_flush(frontend)) - return POOL_END; + receive_sync = 0; + + if (MAJOR(backend) == PROTO_MAJOR_V3) + { + if ((len = pool_read_message_length(backend)) < 0) + return POOL_END; + + pool_debug("ReadyForQuery: message length: %d", len); + + state = pool_read_kind(backend); + if (state < 0) + return POOL_END; + + /* set transaction state */ + pool_debug("ReadyForQuery: transaction state: %c", state); + MASTER(backend)->tstate = state; + if (REPLICATION) + SECONDARY(backend)->tstate = state; + } if (send_ready) { + if (pool_flush(frontend)) + return POOL_END; + pool_write(frontend, "Z", 1); if (MAJOR(backend) == PROTO_MAJOR_V3) { - int len; - signed char state; - - if ((len = pool_read_message_length(backend)) < 0) - return POOL_END; - - pool_debug("ReadyForQuery: message length: %d", len); - len = htonl(len); pool_write(frontend, &len, sizeof(len)); - - state = pool_read_kind(backend); - if (state < 0) - return POOL_END; - - /* set transaction state */ - pool_debug("ReadyForQuery: transaction state: %c", state); - MASTER(backend)->tstate = state; - if (REPLICATION) - SECONDARY(backend)->tstate = state; - pool_write(frontend, &state, 1); } @@ -2935,6 +2969,9 @@ POOL_STATUS SimpleForwardToBackend(char kind, POOL_CONNECTION *frontend, POOL_CO char *p; char *name = NULL; + if (kind == 'S') /* Sync message */ + receive_sync = 1; + if (pool_write(MASTER(backend), &kind, 1)) return POOL_END; if (REPLICATION) @@ -3594,7 +3631,7 @@ static int need_insert_lock(POOL_CONNECTION_POOL *backend, char *query) while (*query && isspace(*query)) query++; } - + /* * either insert_lock directive specified and without "NO INSERT LOCK" comment * or "INSERT LOCK" comment exists?