Feature: implement NegotiateProtocolVersion message.
authorTatsuo Ishii <ishii@postgresql.org>
Wed, 9 Jul 2025 06:58:16 +0000 (15:58 +0900)
committerTatsuo Ishii <ishii@postgresql.org>
Wed, 9 Jul 2025 06:58:16 +0000 (15:58 +0900)
Implementing the message is necessary when frontend requests the
protocol version 3.2 (i.e. PostgreSQL 18+ or compatible clients),
while backend still only supports 3.0 (i.e. backend is PostgreSQL 17
or before).

This commit handles the message so that the message is forwarded from
backend to frontend when there's no connection cache exists.

If connection cache exists, pgpool sends the message, which has been
saved at the time when the connection cache was created, to frontend.

Note that the frontend/backend protocol 3.2 changes the BackendKeyData
message format, but it's not implemented in this commit yet. This
means that still pgpool cannot handle 3.2 protocol.

Discussion: https://www.postgresql.org/message-id/20250708.112133.1324153277751075866.ishii%40postgresql.org

src/auth/pool_auth.c
src/include/pool.h
src/protocol/child.c
src/protocol/pool_connection_pool.c

index be9f334340d3d142b24d0bbd791c15565e9ca5fa..54d646bc3405ed55512cd8d367c06b2949419e90 100644 (file)
@@ -79,6 +79,7 @@ static void authenticate_frontend_SCRAM(POOL_CONNECTION * backend, POOL_CONNECTI
 static void authenticate_frontend_clear_text(POOL_CONNECTION * frontend);
 static bool get_auth_password(POOL_CONNECTION * backend, POOL_CONNECTION * frontend, int reauth,
                                  char **password, PasswordType *passwordType);
+static void ProcessNegotiateProtocol(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * cp);
 
 /*
  * Do authentication. Assuming the only caller is
@@ -342,6 +343,7 @@ pool_do_auth(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * cp)
 
        protoMajor = MAIN_CONNECTION(cp)->sp->major;
 
+read_kind:
        kind = pool_read_kind(cp);
        if (kind < 0)
                ereport(ERROR,
@@ -365,6 +367,12 @@ pool_do_auth(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * cp)
                                 errdetail("backend response with kind \'E\' when expecting \'R\'"),
                                 errhint("This issue can be caused by version mismatch (current version %d)", protoMajor)));
        }
+       else if (kind == 'v')
+       {
+               /* NegotiateProtocolVersion received */
+               ProcessNegotiateProtocol(frontend, cp);
+               goto read_kind;
+       }
        else if (kind != 'R')
                ereport(ERROR,
                                (errmsg("backend authentication failed"),
@@ -597,8 +605,11 @@ pool_do_auth(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * cp)
 
                }
 
-               send_auth_ok(frontend, protoMajor);
-               authkind = 0;
+               if (kind == 'R')
+               {
+                       send_auth_ok(frontend, protoMajor);
+                       authkind = 0;
+               }
        }
 
        else
@@ -756,7 +767,16 @@ pool_do_auth(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * cp)
                        CONNECTION_SLOT(cp, i)->key = cp->info[i].key = key;
 
                        cp->info[i].major = sp->major;
-                       cp->info[i].minor = sp->minor;
+
+                       /*
+                        * If NegotiateProtocol message has been received, set the minor
+                        * version. Othewise use the version in the StartupMessage.
+                        */
+                       if (CONNECTION_SLOT(cp, i)->nplen > 0)
+                               cp->info[i].minor = CONNECTION_SLOT(cp, i)->negotiated_minor;
+                       else
+                               cp->info[i].minor = sp->minor;
+
                        strlcpy(cp->info[i].database, sp->database, sizeof(cp->info[i].database));
                        strlcpy(cp->info[i].user, sp->user, sizeof(cp->info[i].user));
                        cp->info[i].counter = 1;
@@ -779,16 +799,31 @@ pool_do_auth(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * cp)
 }
 
 /*
-* do re-authentication for reused connection. if success return 0 otherwise throws ereport.
+* do re-authentication for reused connection. if success return 0 otherwise
+* throws ereport.
 */
 int
 pool_do_reauth(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * cp)
 {
        int                     protoMajor;
        int                     msglen;
+       POOL_CONNECTION_POOL_SLOT       *sp;
 
        protoMajor = MAJOR(cp);
 
+       /*
+        * If NegotiateProtocolMsg has been received from backend, forward it to
+        * frontend. If the frontend dislike it, it will disconnect the
+        * connection. Otherwise it will silently continue.
+        */
+       sp = CONNECTION_SLOT(cp, MAIN_NODE_ID);
+       if (protoMajor == PROTO_MAJOR_V3 && sp->nplen > 0)
+       {
+               elog(DEBUG1, "negotiateProtocol message is forwarded to frontend at reauth");
+               pool_write_and_flush(frontend, sp->negotiateProtocolMsg,
+                                                        sp->nplen);
+       }
+
        /*
         * if hba is enabled we would already have passed authentication
         */
@@ -822,6 +857,9 @@ pool_do_reauth(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * cp)
                }
        }
 
+       /*
+        * Send auth ok
+        */
        pool_write(frontend, "R", 1);
 
        if (protoMajor == PROTO_MAJOR_V3)
@@ -832,7 +870,10 @@ pool_do_reauth(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * cp)
 
        msglen = htonl(0);
        pool_write_and_flush(frontend, &msglen, sizeof(msglen));
+
+       /* send BackendKeyData */
        pool_send_backend_key_data(frontend, MAIN_CONNECTION(cp)->pid, MAIN_CONNECTION(cp)->key, protoMajor);
+
        return 0;
 }
 
@@ -2074,3 +2115,70 @@ pg_SASL_continue(POOL_CONNECTION * backend, char *payload, int payloadlen, void
 
        return 0;
 }
+
+/*
+ * Forward NegotiateProtocol message to frontend.
+ *
+ * When this function is called, message kind has been already read.
+ */
+static void
+ProcessNegotiateProtocol(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *cp)
+{
+       int32   len;
+       int32   savelen;
+       int32   protoMajor;
+       int32   protoMinor;
+       int32   protov;
+       bool    forwardMsg = false;
+       int             i;
+
+       elog(DEBUG1, "Forwarding NegotiateProtocol message to frontend");
+       pool_write(frontend, "v", 1);   /* forward message kind */
+       savelen = len = pool_read_int(cp);              /* message length including self */
+       pool_write(frontend, &len, 4);  /* forward message length */
+       len = ntohl(len) - 4;                   /* length of rest of the message */
+       protov = pool_read_int(cp);     /* read protocol version */
+       protoMajor = PG_PROTOCOL_MAJOR(ntohl(protov));          /* protocol major version */
+       protoMinor = PG_PROTOCOL_MINOR(ntohl(protov));  /* protocol minor version */
+       pool_write(frontend, &protov, 4);       /* forward protocol version */
+       elog(DEBUG1, "protocol verion offered: major: %d minor: %d", protoMajor, protoMinor);
+       len -= 4;
+       for (i = 0; i < NUM_BACKENDS; i++)
+       {
+               if (VALID_BACKEND(i))
+               {
+                       POOL_CONNECTION_POOL_SLOT       *sp;
+                       char    *p;
+                       char    *np;
+                       Size    nplen;
+
+                       p = pool_read2(CONNECTION(cp, i), len);
+                       if (!forwardMsg)
+                       {
+                               pool_write_and_flush(frontend, p, len); /* forward rest of message */
+                               forwardMsg = true;
+                       }
+                       /* save negatiate protocol version */
+                       sp = CONNECTION_SLOT(cp, i);
+                       sp->negotiated_major = protoMajor;
+                       sp->negotiated_minor = protoMinor;
+
+                       /* save negatiate protocol message */
+                       nplen = 1 +     /* message kind */
+                               sizeof(savelen) +       /* message length */
+                               sizeof(protov) +        /* protocol version */
+                               len;                            /* rest of message */
+                       /* allocate message area */
+                       sp->negotiateProtocolMsg = MemoryContextAlloc(TopMemoryContext, nplen);
+                       np = sp->negotiateProtocolMsg;
+                       sp->nplen = nplen;      /* set message length */
+
+                       *np++ = 'v';
+                       memcpy(np, &savelen, sizeof(savelen));
+                       np += sizeof(savelen);
+                       memcpy(np, &protov, sizeof(protov));
+                       np += sizeof(protov);
+                       memcpy(np, p, len);
+               }
+       }
+}
index c9b4dc27e6e1ada9144c27afbfea78db4e58026f..28cf1757ca63c3dea278365c0ac50e8b72fc3f57 100644 (file)
@@ -4,7 +4,9 @@
  * pgpool: a language independent connection pool server for PostgreSQL
  * written by Tatsuo Ishii
  *
- * Copyright (c) 2003-2024     PgPool Global Development Group
+ * Portions Copyright (c) 2003-2025    PgPool Global Development Group
+ * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
  *
  * Permission to use, copy, modify, and distribute this software and
  * its documentation for any purpose and without fee is hereby
@@ -103,6 +105,41 @@ typedef enum
        POOL_SOCKET_EOF
 }                      POOL_SOCKET_STATE;
 
+/*
+ * Imported from src/include/libpq/pqcomm.h as of PostgreSQL 18.
+ *
+ * These manipulate the frontend/backend protocol version number.
+ *
+ * The major number should be incremented for incompatible changes.  The minor
+ * number should be incremented for compatible changes (eg. additional
+ * functionality).
+ *
+ * If a backend supports version m.n of the protocol it must actually support
+ * versions m.[0..n].  Backend support for version m-1 can be dropped after a
+ * `reasonable' length of time.
+ *
+ * A frontend isn't required to support anything other than the current
+ * version.
+ */
+
+#define PG_PROTOCOL_MAJOR(v)   ((v) >> 16)
+#define PG_PROTOCOL_MINOR(v)   ((v) & 0x0000ffff)
+#define PG_PROTOCOL_FULL(v)    (PG_PROTOCOL_MAJOR(v) * 10000 + PG_PROTOCOL_MINOR(v))
+#define PG_PROTOCOL(m,n)       (((m) << 16) | (n))
+
+/*
+ * The earliest and latest frontend/backend protocol version supported.
+ */
+
+#define PG_PROTOCOL_EARLIEST   PG_PROTOCOL(3,0)
+#define PG_PROTOCOL_LATEST             PG_PROTOCOL(3,2)
+
+typedef uint32 ProtocolVersion; /* FE/BE protocol version number */
+
+typedef ProtocolVersion MsgType;
+
+/* end of importing */
+
 /* protocol major version numbers */
 #define PROTO_MAJOR_V2 2
 #define PROTO_MAJOR_V3 3
@@ -262,6 +299,15 @@ typedef struct
        time_t          closetime;              /* absolute time in second when the connection
                                                                 * closed if 0, that means the connection is
                                                                 * under use. */
+       /*
+        * Protocol version after negotiation. If nplen == 0, no negotiation has
+        * been done.
+        */
+       int                     negotiated_major;
+       int                     negotiated_minor;
+       char            *negotiateProtocolMsg;  /* Raw NegotiateProtocol messag */
+       int32           nplen;                  /* message length of NegotiateProtocol messag */
+
 }                      POOL_CONNECTION_POOL_SLOT;
 
 typedef struct
index f4142f90dcc238994ead08e81515f28dd3e343c0..7aea33540ef75ffacdc4e07b74fa46f09db8ced4 100644 (file)
@@ -637,8 +637,8 @@ read_startup_packet(POOL_CONNECTION * cp)
 
        sp->len = len;
        memcpy(&protov, sp->startup_packet, sizeof(protov));
-       sp->major = ntohl(protov) >> 16;
-       sp->minor = ntohl(protov) & 0x0000ffff;
+       sp->major = PG_PROTOCOL_MAJOR(ntohl(protov));
+       sp->minor = PG_PROTOCOL_MINOR(ntohl(protov));
        cp->protoVersion = sp->major;
 
        switch (sp->major)
index 225294a1b11fc8bdcf1798888455ba675a5b94fa..666187216a94f98c5362175c2db8cf89aa889f5e 100644 (file)
@@ -235,6 +235,8 @@ pool_discard_cp(char *user, char *database, int protoMajor)
                }
                CONNECTION_SLOT(p, i)->sp = NULL;
                pool_close(CONNECTION(p, i));
+               if (CONNECTION_SLOT(p, i)->negotiateProtocolMsg)
+                       pfree(CONNECTION_SLOT(p, i)->negotiateProtocolMsg);
                pfree(CONNECTION_SLOT(p, i));
        }
 
@@ -945,7 +947,7 @@ static POOL_CONNECTION_POOL * new_connection(POOL_CONNECTION_POOL * p)
                        continue;
                }
 
-               s = palloc(sizeof(POOL_CONNECTION_POOL_SLOT));
+               s = palloc0(sizeof(POOL_CONNECTION_POOL_SLOT));
 
                if (create_cp(s, i) == NULL)
                {