Basic obstruction-free single producer, multiple consumer ringbuffer.
author Andres Freund <andres@anarazel.de>
Fri, 19 Feb 2016 20:07:51 +0000 (12:07 -0800)
committer Andres Freund <andres@anarazel.de>
Mon, 10 Jun 2019 23:27:01 +0000 (16:27 -0700)
This is pretty darn limited, supporting only small queues - but could
easily be improved.
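
As a usage sketch (hypothetical caller: the queue size, element values and
the ShmemAlloc-based allocation are made up for illustration), the intended
pattern looks roughly like this:

    #include "postgres.h"
    #include "lib/ringbuf.h"
    #include "storage/shmem.h"

    #define QUEUE_ELEMS 128     /* illustrative; holds QUEUE_ELEMS - 1 entries */

    static ringbuf *queue;

    /* once at startup, backed by shared memory */
    void
    queue_init(void)
    {
        queue = ringbuf_create(ShmemAlloc(ringbuf_size(QUEUE_ELEMS)),
                               QUEUE_ELEMS);
    }

    /* producer side: must only ever be called by one backend */
    bool
    queue_put(void *item)
    {
        return ringbuf_push(queue, item);   /* false if currently full */
    }

    /* consumer side: safe from any number of backends */
    void *
    queue_get(void)
    {
        void *item;

        return ringbuf_pop(queue, &item) ? item : NULL;
    }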

src/backend/lib/Makefile
src/backend/lib/ringbuf.c [new file with mode: 0644]
src/include/lib/ringbuf.h [new file with mode: 0644]

diff --git a/src/backend/lib/Makefile b/src/backend/lib/Makefile
index 3c1ee1df83abccc3619f69d0023655a3f336f1fb..b0a63fba3091ff6d089f13b5054466a5d453f722 100644
--- a/src/backend/lib/Makefile
+++ b/src/backend/lib/Makefile
@@ -13,6 +13,7 @@ top_builddir = ../../..
 include $(top_builddir)/src/Makefile.global
 
 OBJS = binaryheap.o bipartite_match.o bloomfilter.o dshash.o hyperloglog.o \
-       ilist.o integerset.o knapsack.o pairingheap.o rbtree.o stringinfo.o
+       ilist.o integerset.o knapsack.o pairingheap.o rbtree.o ringbuf.o \
+       stringinfo.o
 
 include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/lib/ringbuf.c b/src/backend/lib/ringbuf.c
new file mode 100644
index 0000000..3de2a49
--- /dev/null
+++ b/src/backend/lib/ringbuf.c
@@ -0,0 +1,161 @@
+/*-------------------------------------------------------------------------
+ *
+ * ringbuf.c
+ *
+ *   Single producer, multiple consumer ringbuffer where consumption is
+ *   obstruction-free (i.e. no progress guarantee, but a consumer that is
+ *   stopped will not block progress).
+ *
+ * Implemented by essentially using an optimistic lock on the read side.
+ *
+ * XXX: It'd be nice if we could modify this so there's variants for push/pop
+ * that work for different concurrency scenarios. E.g. having spsc_push(),
+ * spmc_push(), ... - that'd avoid having to use different interfaces for
+ * different needs.
+ *
+ * Copyright (c) 2015, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *   src/backend/lib/ringbuf.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "lib/ringbuf.h"
+#include "storage/proc.h"
+
+/* extract the reader's backend id from the high 16 bits of a position */
+static inline uint32
+ringbuf_backendid(ringbuf *rb, uint32 pos)
+{
+   return (pos & 0xffff0000) >> 16;
+}
+
+/*
+ * Number of elements currently in the buffer. The result is only a
+ * snapshot and may be outdated as soon as it is computed.
+ */
+uint32
+ringbuf_elements(ringbuf *rb)
+{
+   uint32 read_off = ringbuf_pos(rb, pg_atomic_read_u32(&rb->read_state));
+   uint32 write_off = ringbuf_pos(rb, rb->write_off);
+
+   /* not wrapped around */
+   if (read_off <= write_off)
+   {
+       return write_off - read_off;
+   }
+
+   /* wrapped around */
+   return (rb->size - read_off) + write_off;
+}
+
+size_t
+ringbuf_size(size_t nelems)
+{
+   Assert(nelems <= 0x0000FFFF);
+   return sizeof(ringbuf) + sizeof(void *) * nelems;
+}
+
+/*
+ * Memory needs to be externally allocated and be at least
+ * ringbuf_size(nelems) large. Note that one slot is always kept free to
+ * distinguish a full buffer from an empty one, so at most nelems - 1
+ * elements can be stored at a time.
+ */
+ringbuf *
+ringbuf_create(void *target, size_t nelems)
+{
+   ringbuf *rb = (ringbuf *) target;
+
+   Assert(nelems <= 0x0000FFFF);
+
+   memset(target, 0, ringbuf_size(nelems));
+
+   rb->size = nelems;
+   pg_atomic_init_u32(&rb->read_state, 0);
+   rb->write_off = 0;
+
+   return rb;
+}
+
+bool
+ringbuf_push(ringbuf *rb, void *data)
+{
+   uint32 read_off = pg_atomic_read_u32(&rb->read_state);
+
+   /*
+    * Check if full - the read can be outdated, but that's ok: concurrent
+    * readers only ever consume elements, so a stale read_state can at
+    * worst make us wrongly report the buffer as full, never wrongly
+    * report free space.
+    */
+   if (ringbuf_pos(rb, read_off)
+       == ringbuf_pos(rb, ringbuf_advance_pos(rb, rb->write_off)))
+   {
+       return false;
+   }
+
+   rb->elements[ringbuf_pos(rb, rb->write_off)] = data;
+
+   /*
+    * The write adding the data needs to be visible before the corresponding
+    * increase of write_off is visible.
+    */
+   pg_write_barrier();
+
+   rb->write_off = ringbuf_advance_pos(rb, rb->write_off);
+
+   return true;
+}
+
+bool
+ringbuf_pop(ringbuf *rb, void **data)
+{
+   void *ret;
+   uint32 mybackend = MyProc->backendId;
+
+   Assert((mybackend & 0x0000ffff) == mybackend);
+
+   while (true)
+   {
+       uint32 read_state = pg_atomic_read_u32(&rb->read_state);
+       uint32 read_off = ringbuf_pos(rb, read_state);
+       uint32 old_read_state = read_state;
+
+       /* check if empty - can be outdated, but that's ok */
+       if (read_off == ringbuf_pos(rb, rb->write_off))
+           return false;
+
+       /*
+        * Mix our backend id into the read position, to detect wrap
+        * around (the ABA problem): without it the offset could cycle
+        * back to the same value between our two compare-exchanges,
+        * making a stale position appear current.
+        *
+        * XXX: Skip this if the id already is ours. That's probably
+        * likely enough to warrant the additional branch.
+        */
+       read_state = (read_state & 0x0000ffff) | (mybackend << 16);
+
+       /*
+        * Try to install our backend id in read_state, leaving the offset
+        * unchanged. If read_state changed concurrently, retry from the
+        * start.
+        *
+        * NB: This also serves as the read barrier pairing with the write
+        * barrier in ringbuf_push().
+        */
+       if (!pg_atomic_compare_exchange_u32(&rb->read_state, &old_read_state,
+                                           read_state))
+           continue;
+       old_read_state = read_state; /* with backend id mixed in */
+
+       /* finally read the data */
+       ret = rb->elements[read_off];
+
+       /* compute next offset */
+       read_state = ringbuf_advance_pos(rb, read_state);
+
+       if (pg_atomic_compare_exchange_u32(&rb->read_state, &old_read_state,
+                                          read_state))
+           break;
+   }
+
+   *data = ret;
+
+   return true;
+}
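
To make the read_state encoding concrete, a minimal sketch (illustrative
values; a size-8 buffer backed by palloc'd memory):

    ringbuf *rb = ringbuf_create(palloc(ringbuf_size(8)), 8);
    uint32 state = (5 << 16) | 3;   /* backend id 5, read offset 3 */

    Assert(ringbuf_pos(rb, state) == 3);    /* id masked off, offset % size */
    Assert(ringbuf_advance_pos(rb, state) == ((5 << 16) | 4)); /* id kept */

Both compare-exchanges in ringbuf_pop() compare the full 32 bits, so a
concurrent pop that changes either half - the offset or the stamped backend
id - forces a retry.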
diff --git a/src/include/lib/ringbuf.h b/src/include/lib/ringbuf.h
new file mode 100644
index 0000000..3be450b
--- /dev/null
+++ b/src/include/lib/ringbuf.h
@@ -0,0 +1,72 @@
+/*
+ * ringbuf.h
+ *
+ * Single writer, multiple reader, lockless & obstruction-free ringbuffer.
+ *
+ * Copyright (c) 2015, PostgreSQL Global Development Group
+ *
+ * src/include/lib/ringbuf.h
+ */
+#ifndef RINGBUF_H
+#define RINGBUF_H
+
+#include "port/atomics.h"
+
+typedef struct ringbuf
+{
+   uint32 size;
+
+   /* high 16 bits: reader backend id; low 16 bits: read offset */
+   /* XXX: probably should be on separate cachelines */
+   pg_atomic_uint32 read_state;
+   uint32 write_off;
+
+   void *elements[FLEXIBLE_ARRAY_MEMBER];
+} ringbuf;
+
+size_t ringbuf_size(size_t nelems);
+
+ringbuf *ringbuf_create(void *target, size_t nelems);
+
+static inline uint32
+ringbuf_pos(ringbuf *rb, uint32 pos)
+{
+   /*
+    * XXX: replacing the modulo by rb->size with a bitmask would avoid
+    * expensive divisions. Requiring a power-of-two size seems ok.
+    */
+   return (pos & 0x0000ffff) % rb->size;
+}
+
+/*
+ * Compute the new offset, slightly complicated by the fact that we only want
+ * to modify the lower 16 bits.
+ */
+static inline uint32
+ringbuf_advance_pos(ringbuf *rb, uint32 pos)
+{
+   return ((ringbuf_pos(rb, pos) + 1) & 0x0000FFFF) | (pos & 0xFFFF0000);
+}
+
+static inline bool
+ringbuf_empty(ringbuf *rb)
+{
+   uint32 read_state = pg_atomic_read_u32(&rb->read_state);
+
+   return ringbuf_pos(rb, read_state) == ringbuf_pos(rb, rb->write_off);
+}
+
+static inline bool
+ringbuf_full(ringbuf *rb)
+{
+   uint32 read_state = pg_atomic_read_u32(&rb->read_state);
+
+   return ringbuf_pos(rb, read_state) ==
+       ringbuf_pos(rb, ringbuf_advance_pos(rb, rb->write_off));
+}
+
+uint32 ringbuf_elements(ringbuf *rb);
+bool ringbuf_push(ringbuf *rb, void *data);
+bool ringbuf_pop(ringbuf *rb, void **data);
+
+#endif   /* RINGBUF_H */
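
As a sanity check of the interface, a minimal single-backend smoke test
(hypothetical: it must run inside a backend, since ringbuf_pop() reads
MyProc->backendId, and it uses palloc for the backing store):

    static void
    ringbuf_smoke_test(void)
    {
        ringbuf *rb = ringbuf_create(palloc(ringbuf_size(4)), 4);
        void    *item;
        int      i;

        Assert(ringbuf_empty(rb));

        /* a size-4 ringbuffer holds 3 elements: one slot stays free */
        for (i = 0; i < 3; i++)
            Assert(ringbuf_push(rb, (void *) (Size) (i + 1)));
        Assert(ringbuf_full(rb));
        Assert(!ringbuf_push(rb, (void *) (Size) 99));

        /* elements come back out in FIFO order */
        for (i = 0; i < 3; i++)
        {
            Assert(ringbuf_pop(rb, &item));
            Assert(item == (void *) (Size) (i + 1));
        }
        Assert(!ringbuf_pop(rb, &item));
        Assert(ringbuf_empty(rb));
    }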