From f0fa9f80a61e9b852068879399d17713f24659b0 Mon Sep 17 00:00:00 2001 From: Marko Kreen Date: Tue, 27 Mar 2012 18:29:33 +0300 Subject: [PATCH] Put per-cluster connect strings into tree --- src/cluster.c | 177 ++++++++++++++++++++++++-------------------------- src/execute.c | 72 ++++++++++++-------- src/plproxy.h | 10 ++- src/result.c | 4 +- 4 files changed, 140 insertions(+), 123 deletions(-) diff --git a/src/cluster.c b/src/cluster.c index af7ffa5..257d855 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -96,6 +96,26 @@ static int cluster_name_cmp(uintptr_t val, struct AANode *node) return strcmp(name, cluster->name); } +static int conn_cstr_cmp(uintptr_t val, struct AANode *node) +{ + const char *name = (const char *)val; + const ProxyConnection *conn = (ProxyConnection *)node; + + return strcmp(name, conn->connstr); +} + +static void conn_free(struct AANode *node, void *arg) +{ + ProxyConnection *conn = (ProxyConnection *)node; + + if (conn->res) + PQclear(conn->res); + if (conn->cur->db) + PQfinish(conn->cur->db); + pfree(conn->cur); + pfree(conn); +} + /* * Create cache memory area and prepare plans */ @@ -155,43 +175,21 @@ plproxy_cluster_plan_init(void) init_done = 1; } -static void free_state(ProxyConnectionState *st) -{ - if (!st) - return; - if (st->db) - PQfinish(st->db); - memset(st, 0, sizeof(*st)); - pfree(st); -} - /* * Drop partition and connection data from cluster. */ static void free_connlist(ProxyCluster *cluster) { - int i; - ProxyConnection *conn; + aatree_destroy(&cluster->conn_tree); - for (i = 0; i < cluster->conn_count; i++) - { - conn = &cluster->conn_list[i]; - if (conn->res) - PQclear(conn->res); - if (conn->connstr) - pfree((void *) conn->connstr); - free_state(conn->cur); - conn->cur = NULL; - } pfree(cluster->part_map); - pfree(cluster->conn_list); + pfree(cluster->active_list); cluster->part_map = NULL; cluster->part_count = 0; cluster->part_mask = 0; - cluster->conn_list = NULL; - cluster->conn_count = 0; + cluster->active_count = 0; } /* @@ -200,7 +198,7 @@ free_connlist(ProxyCluster *cluster) static ProxyConnection * add_connection(ProxyCluster *cluster, char *connstr, int part_num) { - int i; + struct AANode *node; ProxyConnection *conn = NULL; char *username; StringInfo final; @@ -214,24 +212,22 @@ add_connection(ProxyCluster *cluster, char *connstr, int part_num) username = GetUserNameFromId(GetSessionUserId()); appendStringInfo(final, " user=%s", username); } + connstr = final->data; /* check if already have it */ - for (i = 0; i < cluster->conn_count && !conn; i++) - { - ProxyConnection *c = &cluster->conn_list[i]; - - if (strcmp(c->connstr, final->data) == 0) - conn = c; - } + node = aatree_search(&cluster->conn_tree, (uintptr_t)connstr); + if (node) + conn = (ProxyConnection *)node; /* add new connection */ if (!conn) { - conn = &cluster->conn_list[cluster->conn_count]; - conn->connstr = MemoryContextStrdup(cluster_mem, final->data); + conn = MemoryContextAllocZero(cluster_mem, sizeof(ProxyConnection)); + conn->connstr = MemoryContextStrdup(cluster_mem, connstr); conn->cluster = cluster; conn->cur = MemoryContextAllocZero(cluster_mem, sizeof(ProxyConnectionState)); - cluster->conn_count++; + + aatree_insert(&cluster->conn_tree, (uintptr_t)connstr, &conn->node); } cluster->part_map[part_num] = conn; @@ -343,7 +339,7 @@ allocate_cluster_partitions(ProxyCluster *cluster, int nparts) MemoryContext old_ctx; /* free old one */ - if (cluster->conn_list) + if (cluster->part_map) free_connlist(cluster); cluster->part_count = nparts; @@ -351,9 +347,8 @@ allocate_cluster_partitions(ProxyCluster *cluster, int nparts) /* allocate lists */ old_ctx = MemoryContextSwitchTo(cluster_mem); - cluster->part_map = palloc0(nparts * sizeof(ProxyConnection *)); - cluster->conn_list = palloc0(nparts * sizeof(ProxyConnection)); + cluster->active_list = palloc0(nparts * sizeof(ProxyConnection *)); MemoryContextSwitchTo(old_ctx); } @@ -794,6 +789,8 @@ new_cluster(const char *name) cluster->name = pstrdup(name); cluster->needs_reload = true; + aatree_init(&cluster->conn_tree, conn_cstr_cmp, conn_free); + MemoryContextSwitchTo(old_ctx); return cluster; @@ -849,26 +846,26 @@ fake_cluster(ProxyFunction *func, const char *connect_str) return (ProxyCluster *)n; /* create if not */ + cluster = new_cluster(connect_str); old_ctx = MemoryContextSwitchTo(cluster_mem); - cluster = palloc0(sizeof(*cluster)); - cluster->name = pstrdup(connect_str); + cluster->needs_reload = 0; cluster->version = 1; cluster->part_count = 1; cluster->part_mask = 0; - cluster->conn_count = 1; - cluster->part_map = palloc(sizeof(ProxyConnection *)); - cluster->conn_list = palloc0(sizeof(ProxyConnection)); - conn = &cluster->conn_list[0]; - conn->cluster = cluster; - cluster->part_map[0] = conn; + cluster->part_map = palloc(cluster->part_count * sizeof(ProxyConnection *)); + cluster->active_list = palloc(cluster->part_count * sizeof(ProxyConnection *)); + conn = palloc0(sizeof(ProxyConnection)); + conn->cluster = cluster; conn->connstr = pstrdup(cluster->name); - conn->cur = palloc0(sizeof(ProxyConnectionState)); conn->cur->state = C_NONE; + aatree_insert(&cluster->conn_tree, (uintptr_t)conn->connstr, &conn->node); + cluster->part_map[0] = conn; + MemoryContextSwitchTo(old_ctx); aatree_insert(&fake_cluster_tree, (uintptr_t)connect_str, &cluster->node); @@ -954,68 +951,64 @@ plproxy_find_cluster(ProxyFunction *func, FunctionCallInfo fcinfo) return cluster; } -static void -clean_cluster(ProxyCluster *cluster, struct timeval * now) +/* + * Clean old connections and results from all clusters. + */ + +static void clean_conn(struct AANode *node, void *arg) { - ProxyConnection *conn; + ProxyConnection *conn = (ProxyConnection *)node; ProxyConnectionState *cur; - ProxyConfig *cf = &cluster->config; + ProxyConfig *cf = &conn->cluster->config; + struct timeval *now = arg; time_t age; - int i; bool drop; - for (i = 0; i < cluster->conn_count; i++) + if (conn->res) { - conn = &cluster->conn_list[i]; - if (conn->res) - { - PQclear(conn->res); - conn->res = NULL; - } + PQclear(conn->res); + conn->res = NULL; + } - cur = conn->cur; - if (!cur->db) - continue; + cur = conn->cur; + if (!cur->db) + return; - drop = false; - if (PQstatus(cur->db) != CONNECTION_OK) - { + drop = false; + if (PQstatus(cur->db) != CONNECTION_OK) + { + drop = true; + } + else if (cf->connection_lifetime <= 0) + { + /* no aging */ + } + else + { + age = now->tv_sec - cur->connect_time; + if (age >= cf->connection_lifetime) drop = true; - } - else if (cf->connection_lifetime <= 0) - { - /* no aging */ - } - else - { - age = now->tv_sec - cur->connect_time; - if (age >= cf->connection_lifetime) - drop = true; - } + } - if (drop) - { - PQfinish(cur->db); - cur->db = NULL; - cur->state = C_NONE; - } + if (drop) + { + PQfinish(cur->db); + cur->db = NULL; + cur->state = C_NONE; } } -/* - * Clean old connections and results from all clusters. - */ - -static void w_clean_cluster(struct AANode *n, void *arg) +static void clean_cluster(struct AANode *n, void *arg) { - ProxyCluster *c = (ProxyCluster *)n; + ProxyCluster *cluster = (ProxyCluster *)n; struct timeval *now = arg; - clean_cluster(c, now); + + aatree_walk(&cluster->conn_tree, AA_WALK_IN_ORDER, clean_conn, now); } void plproxy_cluster_maint(struct timeval * now) { - aatree_walk(&cluster_tree, AA_WALK_IN_ORDER, w_clean_cluster, now); - aatree_walk(&fake_cluster_tree, AA_WALK_IN_ORDER, w_clean_cluster, now); + aatree_walk(&cluster_tree, AA_WALK_IN_ORDER, clean_cluster, now); + aatree_walk(&fake_cluster_tree, AA_WALK_IN_ORDER, clean_cluster, now); } diff --git a/src/execute.c b/src/execute.c index 5eced58..e332a09 100644 --- a/src/execute.c +++ b/src/execute.c @@ -556,10 +556,10 @@ poll_conns(ProxyFunction *func, ProxyCluster *cluster) int numfds = 0; int ev = 0; - if (pfd_allocated < cluster->conn_count) + if (pfd_allocated < cluster->active_count) { struct pollfd *tmp; - int num = cluster->conn_count; + int num = cluster->active_count; if (num < 64) num = 64; if (pfd_cache == NULL) @@ -572,9 +572,9 @@ poll_conns(ProxyFunction *func, ProxyCluster *cluster) pfd_allocated = num; } - for (i = 0; i < cluster->conn_count; i++) + for (i = 0; i < cluster->active_count; i++) { - conn = &cluster->conn_list[i]; + conn = cluster->active_list[i]; if (!conn->run_tag) continue; @@ -615,9 +615,9 @@ poll_conns(ProxyFunction *func, ProxyCluster *cluster) /* now recheck the conns */ pf = pfd_cache; - for (i = 0; i < cluster->conn_count; i++) + for (i = 0; i < cluster->active_count; i++) { - conn = &cluster->conn_list[i]; + conn = cluster->active_list[i]; if (!conn->run_tag) continue; @@ -691,9 +691,9 @@ remote_execute(ProxyFunction *func) struct timeval now; /* either launch connection or send query */ - for (i = 0; i < cluster->conn_count; i++) + for (i = 0; i < cluster->active_count; i++) { - conn = &cluster->conn_list[i]; + conn = cluster->active_list[i]; if (!conn->run_tag) continue; @@ -719,9 +719,9 @@ remote_execute(ProxyFunction *func) /* recheck */ pending = 0; gettimeofday(&now, NULL); - for (i = 0; i < cluster->conn_count; i++) + for (i = 0; i < cluster->active_count; i++) { - conn = &cluster->conn_list[i]; + conn = cluster->active_list[i]; if (!conn->run_tag) continue; @@ -737,9 +737,9 @@ remote_execute(ProxyFunction *func) } /* review results, calculate total */ - for (i = 0; i < cluster->conn_count; i++) + for (i = 0; i < cluster->active_count; i++) { - conn = &cluster->conn_list[i]; + conn = cluster->active_list[i]; if ((conn->run_tag || conn->res) && !(conn->run_tag && conn->res)) @@ -775,9 +775,9 @@ remote_cancel(ProxyFunction *func) if (cluster == NULL) return; - for (i = 0; i < cluster->conn_count; i++) + for (i = 0; i < cluster->active_count; i++) { - conn = &cluster->conn_list[i]; + conn = cluster->active_list[i]; if (conn->cur->state == C_NONE || conn->cur->state == C_READY || conn->cur->state == C_DONE) @@ -796,6 +796,22 @@ remote_cancel(ProxyFunction *func) } } +/* + * Tag & move tagged connections to active list + */ + +static void tag_part(struct ProxyCluster *cluster, int i, int tag) +{ + ProxyConnection *conn = cluster->part_map[i]; + + if (!conn->run_tag) + { + cluster->active_list[cluster->active_count] = conn; + cluster->active_count++; + } + conn->run_tag = tag; +} + /* * Run hash function and tag connections. If any of the hash function * arguments are mentioned in the split_arrays an element of the array @@ -838,7 +854,7 @@ tag_hash_partitions(ProxyFunction *func, FunctionCallInfo fcinfo, int tag, plproxy_error(func, "Hash result must be int2, int4 or int8"); hashval &= cluster->part_mask; - cluster->part_map[hashval]->run_tag = tag; + tag_part(cluster, hashval, tag); } /* sanity check */ @@ -889,17 +905,17 @@ tag_run_on_partitions(ProxyFunction *func, FunctionCallInfo fcinfo, int tag, break; case R_ALL: for (i = 0; i < cluster->part_count; i++) - cluster->part_map[i]->run_tag = tag; + tag_part(cluster, i, tag); break; case R_EXACT: i = func->exact_nr; if (i < 0 || i >= cluster->part_count) plproxy_error(func, "part number out of range"); - cluster->part_map[i]->run_tag = tag; + tag_part(cluster, i, tag); break; case R_ANY: i = random() & cluster->part_mask; - cluster->part_map[i]->run_tag = tag; + tag_part(cluster, i, tag); break; default: plproxy_error(func, "uninitialized run_type"); @@ -979,9 +995,9 @@ prepare_and_tag_partitions(ProxyFunction *func, FunctionCallInfo fcinfo) tag_run_on_partitions(func, fcinfo, my_tag, arrays_to_split, row); /* Add the array elements to the partitions tagged in previous step */ - for (part = 0; part < cluster->conn_count; part++) + for (part = 0; part < cluster->active_count; part++) { - ProxyConnection *conn = &cluster->conn_list[part]; + ProxyConnection *conn = cluster->active_list[part]; if (conn->run_tag != my_tag) continue; @@ -1008,9 +1024,9 @@ prepare_and_tag_partitions(ProxyFunction *func, FunctionCallInfo fcinfo) * Finally, copy the accumulated arrays to the actual connections * to be used as parameters. */ - for (i = 0; i < cluster->conn_count; i++) + for (i = 0; i < cluster->active_count; i++) { - ProxyConnection *conn = &cluster->conn_list[i]; + ProxyConnection *conn = cluster->active_list[i]; if (!conn->run_tag) continue; @@ -1056,9 +1072,9 @@ prepare_query_parameters(ProxyFunction *func, FunctionCallInfo fcinfo) } /* Add the parameters to partitions */ - for (part = 0; part < cluster->conn_count; part++) + for (part = 0; part < cluster->active_count; part++) { - ProxyConnection *conn = &cluster->conn_list[part]; + ProxyConnection *conn = cluster->active_list[part]; if (!conn->run_tag) continue; @@ -1103,9 +1119,9 @@ plproxy_clean_results(ProxyCluster *cluster) cluster->ret_total = 0; cluster->ret_cur_conn = 0; - for (i = 0; i < cluster->conn_count; i++) + for (i = 0; i < cluster->active_count; i++) { - conn = &cluster->conn_list[i]; + conn = cluster->active_list[i]; if (conn->res) { PQclear(conn->res); @@ -1115,6 +1131,10 @@ plproxy_clean_results(ProxyCluster *cluster) conn->run_tag = 0; conn->bstate = NULL; } + + /* reset active_list */ + cluster->active_count = 0; + /* conn state checks are done in prepare_conn */ } diff --git a/src/plproxy.h b/src/plproxy.h index 4715e15..f9e4573 100644 --- a/src/plproxy.h +++ b/src/plproxy.h @@ -148,6 +148,8 @@ typedef struct ProxyConnectionState { /* Single database connection */ typedef struct ProxyConnection { + struct AANode node; + struct ProxyCluster *cluster; const char *connstr; /* Connection string for libpq */ @@ -185,10 +187,12 @@ typedef struct ProxyCluster int part_count; /* Number of partitions - power of 2 */ int part_mask; /* Mask to use to get part number from hash */ - ProxyConnection **part_map; /* Pointers to conn_list */ + ProxyConnection **part_map; /* Pointers to ProxyConnections */ + + int active_count; /* number of active connections */ + ProxyConnection **active_list; /* active ProxyConnection in current query */ - int conn_count; /* Number of actual database connections */ - ProxyConnection *conn_list; /* List of actual database connections */ + struct AATree conn_tree; /* connstr -> ProxyConnection */ int ret_cur_conn; /* Result walking: index of current conn */ int ret_cur_pos; /* Result walking: index of current row */ diff --git a/src/result.c b/src/result.c index 8b4bf2b..1b21e8b 100644 --- a/src/result.c +++ b/src/result.c @@ -118,10 +118,10 @@ walk_results(ProxyFunction *func, ProxyCluster *cluster) { ProxyConnection *conn; - for (; cluster->ret_cur_conn < cluster->conn_count; + for (; cluster->ret_cur_conn < cluster->active_count; cluster->ret_cur_conn++) { - conn = cluster->conn_list + cluster->ret_cur_conn; + conn = cluster->active_list[cluster->ret_cur_conn]; if (conn->res == NULL) continue; if (conn->pos == PQntuples(conn->res)) -- 2.39.5