From 53094143e3c1fc9a8090cce66e73e26d58c67b93 Mon Sep 17 00:00:00 2001
From: Andres Freund
Date: Fri, 19 Feb 2016 12:07:51 -0800
Subject: [PATCH] Basic obstruction-free single producer, multiple consumer
 ringbuffer.

This is pretty darn limited, supporting only small queues - but could
easily be improved.
---
 src/backend/lib/Makefile  |   3 +-
 src/backend/lib/ringbuf.c | 205 ++++++++++++++++++++++++++++++++++++++
 src/include/lib/ringbuf.h |  92 +++++++++++++++++
 3 files changed, 299 insertions(+), 1 deletion(-)
 create mode 100644 src/backend/lib/ringbuf.c
 create mode 100644 src/include/lib/ringbuf.h

diff --git a/src/backend/lib/Makefile b/src/backend/lib/Makefile
index 3c1ee1df83..b0a63fba30 100644
--- a/src/backend/lib/Makefile
+++ b/src/backend/lib/Makefile
@@ -13,6 +13,7 @@ top_builddir = ../../..
 include $(top_builddir)/src/Makefile.global
 
 OBJS = binaryheap.o bipartite_match.o bloomfilter.o dshash.o hyperloglog.o \
-	ilist.o integerset.o knapsack.o pairingheap.o rbtree.o stringinfo.o
+	ilist.o integerset.o knapsack.o pairingheap.o rbtree.o ringbuf.o \
+	stringinfo.o
 
 include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/lib/ringbuf.c b/src/backend/lib/ringbuf.c
new file mode 100644
index 0000000000..3de2a4977d
--- /dev/null
+++ b/src/backend/lib/ringbuf.c
@@ -0,0 +1,205 @@
+/*-------------------------------------------------------------------------
+ *
+ * ringbuf.c
+ *
+ * Single producer, multiple consumer ringbuffer where consumption is
+ * obstruction-free (i.e. no progress guarantee, but a consumer that is
+ * stopped will not block progress).
+ *
+ * Implemented by essentially using an optimistic lock on the read side.
+ *
+ * XXX: It'd be nice if we could modify this so there's variants for push/pop
+ * that work for different concurrency scenarios. E.g. having spsc_push(),
+ * spmc_push(), ... - that'd avoid having to use different interfaces for
+ * different needs.
+ *
+ * Copyright (c) 2015, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ * src/backend/lib/ringbuf.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "lib/ringbuf.h"
+#include "storage/proc.h"
+
+/*
+ * Extract the reader id kept in the upper 16 bits of a read_state value,
+ * i.e. undo the "<< 16" that ringbuf_pop() applies when storing the id.
+ */
+static inline uint32
+ringbuf_backendid(ringbuf *rb, uint32 pos)
+{
+	return (pos & 0xffff0000) >> 16;
+}
+
+/*
+ * Return the number of elements currently queued.
+ *
+ * The read and write offsets are sampled one after the other without any
+ * locking, so if pushes or pops happen concurrently the result is only a
+ * snapshot and may already be outdated.
+ */
+uint32
+ringbuf_elements(ringbuf *rb)
+{
+	uint32 read_off = ringbuf_pos(rb, pg_atomic_read_u32(&rb->read_state));
+	uint32 write_off = ringbuf_pos(rb, rb->write_off);
+
+	/* not wrapped around */
+	if (read_off <= write_off)
+	{
+		return write_off - read_off;
+	}
+
+	/* wrapped around */
+	return (rb->size - read_off) + write_off;
+}
+
+/*
+ * Compute the number of bytes needed for a ringbuffer with nelems slots: the
+ * fixed-size header plus one pointer per slot.
+ *
+ * Offsets are stored in 16 bits, hence the upper limit on nelems.
+ */
+size_t
+ringbuf_size(size_t nelems)
+{
+	Assert(nelems <= 0x0000FFFF);
+	return sizeof(ringbuf) + sizeof(void *) * nelems;
+}
+
+/*
+ * Initialize an empty ringbuffer with nelems slots in the memory at target,
+ * and return it.
+ *
+ * Memory needs to be externally allocated and be at least
+ * ringbuf_size(nelems) large.
+ *
+ * nelems must not be zero, since offsets are reduced modulo the size. Note
+ * that the buffer counts as full as soon as advancing the write offset would
+ * make it equal to the read offset, so at most nelems - 1 elements can be
+ * queued at any one time.
+ */
+ringbuf *
+ringbuf_create(void *target, size_t nelems)
+{
+	ringbuf *rb = (ringbuf *) target;
+
+	Assert(nelems > 0 && nelems <= 0x0000FFFF);
+
+	memset(target, 0, ringbuf_size(nelems));
+
+	rb->size = nelems;
+	pg_atomic_init_u32(&rb->read_state, 0);
+	rb->write_off = 0;
+
+	return rb;
+}
+
+/*
+ * Append data to the ringbuffer. Returns false, without modifying the
+ * buffer, if it is full, and true otherwise.
+ *
+ * write_off is read and updated here without atomic operations; in line
+ * with the single producer design described at the top of the file, there
+ * may only ever be one process pushing at a time.
+ */
+bool
+ringbuf_push(ringbuf *rb, void *data)
+{
+	uint32 read_off = pg_atomic_read_u32(&rb->read_state);
+
+	/*
+	 * Check if full - can be outdated, but that's ok. New readers are just
+	 * going to further consume elements, never cause the buffer to become
+	 * full.
+	 */
+	if (ringbuf_pos(rb, read_off)
+		== ringbuf_pos(rb, ringbuf_advance_pos(rb, rb->write_off)))
+	{
+		return false;
+	}
+
+	rb->elements[ringbuf_pos(rb, rb->write_off)] = data;
+
+	/*
+	 * The write adding the data needs to be visible before the corresponding
+	 * increase of write_off is visible.
+	 */
+	pg_write_barrier();
+
+	rb->write_off = ringbuf_advance_pos(rb, rb->write_off);
+
+	return true;
+}
+
+/*
+ * Remove the oldest element from the ringbuffer and store it in *data.
+ * Returns false, leaving *data untouched, if the buffer is empty, and true
+ * otherwise.
+ *
+ * Instead of taking a lock, a reader first tags read_state with its backend
+ * id, then reads the slot, and finally advances the offset using a
+ * compare-and-exchange that only succeeds if the tagged read_state is still
+ * in place. If either exchange fails, the whole attempt is retried.
+ */
+bool
+ringbuf_pop(ringbuf *rb, void **data)
+{
+	void *ret;
+	uint32 mybackend = MyProc->backendId;
+
+	Assert((mybackend & 0x0000ffff) == mybackend);
+
+	while (true)
+	{
+		uint32 read_state = pg_atomic_read_u32(&rb->read_state);
+		uint32 read_off = ringbuf_pos(rb, read_state);
+		uint32 old_read_state = read_state;
+
+		/* check if empty - can be outdated, but that's ok */
+		if (read_off == ringbuf_pos(rb, rb->write_off))
+			return false;
+
+		/*
+		 * Add our backend id to the position, to detect wrap around: any
+		 * other reader popping in the meantime replaces the id with its own,
+		 * which makes the final compare-exchange below fail even if the
+		 * offset has come back to the same value.
+		 *
+		 * XXX: Skip if the ID already is ours. That's probably likely enough
+		 * to warrant the additional branch.
+		 */
+		read_state = (read_state & 0x0000ffff) | (mybackend << 16);
+
+		/*
+		 * Mix our reader id into read_state, leaving the offset unchanged.
+		 * If read_state changed since we read it, retry from start.
+		 *
+		 * NB: This also serves as the read barrier pairing with the write
+		 * barrier in ringbuf_push().
+		 */
+		if (!pg_atomic_compare_exchange_u32(&rb->read_state, &old_read_state,
+											read_state))
+			continue;
+		old_read_state = read_state; /* with backend id mixed in */
+
+		/* finally read the data */
+		ret = rb->elements[read_off];
+
+		/* compute next offset */
+		read_state = ringbuf_advance_pos(rb, read_state);
+
+		if (pg_atomic_compare_exchange_u32(&rb->read_state, &old_read_state,
+										   read_state))
+			break;
+	}
+
+	*data = ret;
+
+	return true;
+}
diff --git a/src/include/lib/ringbuf.h b/src/include/lib/ringbuf.h
new file mode 100644
index 0000000000..3be450bb8f
--- /dev/null
+++ b/src/include/lib/ringbuf.h
@@ -0,0 +1,92 @@
+/*
+ * ringbuf.h
+ *
+ * Single writer, multiple reader lockless & obstruction free ringbuffer.
+ *
+ * Copyright (c) 2015, PostgreSQL Global Development Group
+ *
+ * src/include/lib/ringbuf.h
+ */
+#ifndef RINGBUF_H
+#define RINGBUF_H
+
+#include "port/atomics.h"
+
+typedef struct ringbuf
+{
+	/* number of slots in elements[] */
+	uint32 size;
+
+	/* 16 bit reader id (upper half), 16 bit offset (lower half) */
+	/* XXX: probably should be on separate cachelines */
+	pg_atomic_uint32 read_state;
+
+	/* offset of the next slot to fill, advanced only by ringbuf_push() */
+	uint32 write_off;
+
+	void *elements[FLEXIBLE_ARRAY_MEMBER];
+} ringbuf;
+
+size_t ringbuf_size(size_t nelems);
+
+ringbuf *ringbuf_create(void *target, size_t nelems);
+
+/*
+ * Map a stored offset, or a complete read_state value, to an index into
+ * elements[]: the upper 16 bits are discarded and the remainder is reduced
+ * modulo the buffer size.
+ */
+static inline uint32
+ringbuf_pos(ringbuf *rb, uint32 pos)
+{
+	/*
+	 * XXX: replacing rb->size with a bitmask op would avoid expensive
+	 * divisions. Requiring a pow2 size seems ok.
+	 */
+	return (pos & 0x0000ffff) % rb->size;
+}
+
+/*
+ * Compute the new offset, slightly complicated by the fact that we only want
+ * to modify the lower 16 bits.
+ *
+ * The result's lower half can be equal to rb->size; ringbuf_pos() maps that
+ * back to slot 0, so stored offsets always have to be read through it.
+ */
+static inline uint32
+ringbuf_advance_pos(ringbuf *rb, uint32 pos)
+{
+	return ((ringbuf_pos(rb, pos) + 1) & 0x0000FFFF) | (pos & 0xFFFF0000);
+}
+
+/*
+ * Is the ringbuffer empty? With pushes or pops running concurrently the
+ * answer may be outdated by the time the caller acts on it.
+ */
+static inline bool
+ringbuf_empty(ringbuf *rb)
+{
+	uint32 read_state = pg_atomic_read_u32(&rb->read_state);
+
+	return ringbuf_pos(rb, read_state) == ringbuf_pos(rb, rb->write_off);
+}
+
+/*
+ * Is the ringbuffer full, i.e. would advancing the write offset make it
+ * collide with the read offset? As with ringbuf_empty(), the answer may be
+ * outdated if pushes or pops are running concurrently.
+ */
+static inline bool
+ringbuf_full(ringbuf *rb)
+{
+	uint32 read_state = pg_atomic_read_u32(&rb->read_state);
+
+	return ringbuf_pos(rb, read_state) ==
+		ringbuf_pos(rb, ringbuf_advance_pos(rb, rb->write_off));
+}
+
+uint32 ringbuf_elements(ringbuf *rb);
+bool ringbuf_push(ringbuf *rb, void *data);
+bool ringbuf_pop(ringbuf *rb, void **data);
+
+#endif
-- 
2.39.5