minibroker

A small, learning-oriented message broker in Go, inspired by NATS, Kafka, Redis Streams, and RabbitMQ, built to understand how such systems work under the hood. Not production-ready; it's a teaching toy.

Architecture

Single-node broker

flowchart TB
    subgraph Clients["Clients"]
        producer["Producer"]
        worker1["Group member A"]
        worker2["Group member B"]
    end

    subgraph Broker["minibroker"]
        direction TB
        server["server.go<br/>(TCP :4222)"]
        metrics["metrics.go<br/>/metrics :9100"]

        subgraph Topic["Topic: events"]
            direction TB
            subgraph Parts["Partitions"]
                p0["partition 0"]
                p1["partition 1"]
                p2["partition 2"]
                p3["partition 3"]
            end
            subgraph Groups["Consumer groups"]
                g1["group: workers<br/>A=[0,2]  B=[1,3]"]
            end
        end

        subgraph Disk["data/events/"]
            direction LR
            seg0["part-0/<br/>0000...0000.log<br/>0000...1000.log"]
            seg1["part-1/..."]
            seg2["part-2/..."]
            seg3["part-3/..."]
            off["offsets/workers/<br/>0.off  1.off  2.off  3.off"]
        end
    end

    producer -- "PUB" --> server
    worker1 -- "SUB group=workers" --> server
    worker2 -- "SUB group=workers" --> server
    server --> Topic
    Parts --> Disk
    Groups --> off
    Broker -.-> metrics

3-node Raft cluster

flowchart LR
    client["Client<br/>Dial(n1, n2, n3)"]

    subgraph Cluster["Raft cluster"]
        direction LR
        n1[":4221<br/>n1 LEADER"]
        n2[":4222<br/>n2 follower"]
        n3[":4223<br/>n3 follower"]
        n1 <-. "raft :8001" .-> n2
        n1 <-. "raft :8001" .-> n3
        n2 <-. "raft :8001" .-> n3
    end

    client -- "PUB (writes)" --> n1
    client -- "SUB (reads)" --> n2
    client -.->|"auto-redirect<br/>on not-leader"| n1
    n1 -. "replicates via Raft" .-> n2
    n1 -. "replicates via Raft" .-> n3

Writes go through the leader's Raft log; followers apply the committed entries to their own broker state (segment files, commit offsets). Reads are served from local state on whichever node the client connects to, so subscribers on a follower are eventually consistent.

What it does

  • Persistent, append-only log per partition on disk, with CRC32 on every record
  • Segmentation: segment files roll after N records
  • Retention: segment-count based (-retain) and time-based (-retain-for); a background sweeper trims old sealed segments
  • Partitions per topic with keyed or round-robin publish routing
  • Consumer groups with server-tracked, per-partition committed offsets
  • Automatic rebalance: when group members join or leave, partitions are redistributed and affected clients receive REBALANCE notifications
  • Heartbeats + session timeouts: stuck group members get evicted after -heartbeat-timeout
  • Log compaction by key: -compact-every rewrites sealed segments, keeping only the latest record per key while preserving the offset space (see the sketch after this list)
  • Raft replication across N nodes (via hashicorp/raft): writes go through a leader; followers serve reads from the replicated local state
  • Binary wire protocol: payloads can contain arbitrary bytes
  • Go client library with auto-reconnect, heartbeats, and per-subscription delivery goroutines
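
To make the compaction rule concrete, here is a minimal sketch of compacting one sealed segment by key. The record type and function name are illustrative, not minibroker's actual internals; the point is that only each key's newest record survives, and it keeps its original offset, so offsets stay monotonic (with gaps) after a rewrite.

// compactByKey keeps only the newest record per key. Survivors keep
// their original offsets, so the offset space stays monotonic with gaps.
// (Illustrative types; a real implementation would likely also skip
// records without a key.)
type record struct {
	offset  uint64
	key     []byte
	payload []byte
}

func compactByKey(segment []record) []record {
	newest := make(map[string]int, len(segment)) // key -> index of last write
	for i, r := range segment {
		newest[string(r.key)] = i // later records shadow earlier ones
	}
	out := make([]record, 0, len(newest))
	for i, r := range segment {
		if newest[string(r.key)] == i {
			out = append(out, r) // survivors stay in offset order
		}
	}
	return out
}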

Wire protocol

Every frame:

[u32 total-len][u8 op][op-specific body]

Strings are encoded as [u16 len][bytes], byte slices as [u32 len][bytes], offsets are u64, partition ids are u32. All integers big-endian.
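
As a sketch, writing one frame and one length-prefixed string might look like the following; whether total-len counts the op byte is an assumption here, and proto/ is the authoritative definition:

import (
	"encoding/binary"
	"io"
)

// writeFrame emits [u32 total-len][u8 op][body], big-endian. total-len
// is assumed to cover the op byte plus the body; check proto/ for the
// real length convention.
func writeFrame(w io.Writer, op byte, body []byte) error {
	hdr := binary.BigEndian.AppendUint32(nil, uint32(1+len(body)))
	hdr = append(hdr, op)
	if _, err := w.Write(hdr); err != nil {
		return err
	}
	_, err := w.Write(body)
	return err
}

// appendString appends a [u16 len][bytes] string to buf, big-endian.
func appendString(buf []byte, s string) []byte {
	buf = binary.BigEndian.AppendUint16(buf, uint16(len(s)))
	return append(buf, s...)
}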

op         direction  purpose
PUB        c→s        publish to a topic (routed by key or round-robin)
SUB        c→s        subscribe to a partition (head/offset) or join a group
UNSUB      c→s        leave a subscription
COMMIT     c→s        commit an offset for (topic, group, partition)
QUIT       c→s        end session
HEARTBEAT  c→s        group-member liveness ping
OK         s→c        ack for a request
ERR        s→c        error reply
MSG        s→c        server push: a record for a subscribed topic/partition
REBALANCE  s→c        group's partition assignment changed

Record format on disk:

[u32 body_len]
[u64 offset]                   # stored so compaction can preserve offsets
[u16 key_len][key bytes]
[u32 payload_len][payload bytes]
[u32 crc32]                    # IEEE over offset || key_len || key || payload_len || payload
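
Encoding one record per that layout is mechanical. The sketch below assumes body_len covers everything after the length field, including the trailing CRC; the exact coverage of body_len is an assumption:

import (
	"encoding/binary"
	"hash/crc32"
)

// encodeRecord builds one on-disk record. The CRC is IEEE CRC32 over
// offset || key_len || key || payload_len || payload, as documented above.
func encodeRecord(offset uint64, key, payload []byte) []byte {
	body := binary.BigEndian.AppendUint64(nil, offset)
	body = binary.BigEndian.AppendUint16(body, uint16(len(key)))
	body = append(body, key...)
	body = binary.BigEndian.AppendUint32(body, uint32(len(payload)))
	body = append(body, payload...)
	body = binary.BigEndian.AppendUint32(body, crc32.ChecksumIEEE(body))
	// body_len is assumed to include the trailing CRC.
	return append(binary.BigEndian.AppendUint32(nil, uint32(len(body))), body...)
}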

Disk layout

data/
  <topic>/
    part-0/
      00000000000000000000.log     # records in the on-disk format above
      00000000000000001000.log
    part-1/
      ...
    offsets/
      <group>/
        0.off                      # committed offset for (group, partition 0)
        1.off
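
Segment file names are the segment's base offset, zero-padded to 20 digits, so a path can be derived directly. A sketch (function name is illustrative):

import (
	"fmt"
	"path/filepath"
)

// segmentPath derives a segment's path from the layout above:
// <dir>/<topic>/part-<pid>/<base offset, zero-padded to 20 digits>.log
func segmentPath(dir, topic string, pid uint32, baseOffset uint64) string {
	return filepath.Join(dir, topic,
		fmt.Sprintf("part-%d", pid),
		fmt.Sprintf("%020d.log", baseOffset))
}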

Running

make build
./minibroker -partitions 4

Flags:

flag                default  meaning
-addr               :4222    TCP listen address
-dir                ./data   data directory
-partitions         4        partitions per topic (global for this broker)
-segment-size       1000     records per segment before rolling
-retain             100      total segments kept per partition (count-based retention)
-retain-for         0        time-based retention: drop sealed segments older than this (0 = off)
-sweep-every        30s      how often the time-retention sweep runs
-heartbeat-timeout  15s      evict a group member that hasn't sent a heartbeat in this window
-compact-every      0        run log compaction at this interval (0 disables)
-cluster-id         ""       this node's id (enables Raft replication when set)
-cluster-addr       ""       this node's Raft RPC address, e.g. :8001
-cluster-peers      ""       comma-separated peer list, id@addr
-cluster-bootstrap  false    true on exactly one node the first time the cluster is created
-metrics-addr       ""       serve Prometheus /metrics here (e.g. :9100); empty disables
-healthcheck        false    internal: used by the Docker HEALTHCHECK to probe the broker port
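
As an illustration, the first node of a three-node cluster might be started like this; whether -cluster-peers should include the local node is an assumption, and examples/cluster/run-cluster.sh below is the authoritative reference:

./minibroker -addr :4221 -dir ./data-n1 \
  -cluster-id n1 -cluster-addr :8001 \
  -cluster-peers "n2@:8002,n3@:8003" \
  -cluster-bootstrap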

Running a 3-node cluster

./examples/cluster/run-cluster.sh

This starts three minibroker processes on ports 4221/4222/4223 (broker) and 8001/8002/8003 (Raft), waits for an election, publishes through the leader, and confirms a follower received the replicated messages.

Docker

Multi-stage Dockerfile (distroless runtime, ~12 MB final image) and two compose files: one for single-node, one for the 3-node cluster.

target                    what it does
make docker-build         build the local minibroker:latest image
make docker-up            single broker on localhost:4222, named volume broker-data
make docker-down          stop the single broker (keeps the data volume)
make docker-cluster-up    3-node Raft cluster, brokers on :4221 / :4222 / :4223
make docker-cluster-down  stop the cluster AND wipe volumes (fresh bootstrap next time)
make docker-clean         remove both compose stacks + the image

Once the cluster is up, go run ./examples/cluster probes the three ports, finds the leader, publishes through it, and reads the replicated records back from a follower.

Using the Go client

See CLIENT.md for the full API reference. Quick example:

package main

import (
	"fmt"
	"log"

	"minibroker/client"
)

func main() {
	c, err := client.Dial("localhost:4222")
	if err != nil {
		log.Fatal(err)
	}
	defer c.Close()

	// Publish: nil key means round-robin across partitions.
	partition, offset, err := c.Publish("events", nil, []byte("hello"))
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("wrote partition=%d offset=%d\n", partition, offset)

	// Join a consumer group: the broker picks our partitions and sends
	// us a REBALANCE whenever the assignment changes.
	err = c.SubscribeGroup("events", "workers",
		func(topic, group string, assignment []int32) {
			fmt.Println("my partitions:", assignment)
		},
		func(topic string, pid int32, offset int64, payload []byte) {
			fmt.Printf("[%s/%d@%d] %s\n", topic, pid, offset, payload)
			// Commit next-offset-to-read (Kafka convention).
			_ = c.Commit(topic, "workers", pid, offset+1)
		})
	if err != nil {
		log.Fatal(err)
	}

	select {} // run until killed
}

For a single-partition, ephemeral subscription (no group), use c.Subscribe(topic, partition, fromOffset, handler): pass fromOffset=-1 to start at the partition's current head, or 0 to replay from the beginning.
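
For example, continuing with the client c from above, replaying partition 0 of events from the start (assuming the handler has the same shape as the group message handler):

// Replay partition 0 from offset 0; the handler shape is assumed to
// match the group message handler above.
err := c.Subscribe("events", 0, 0,
	func(topic string, pid int32, offset int64, payload []byte) {
		fmt.Printf("replay [%s/%d@%d] %s\n", topic, pid, offset, payload)
	})
if err != nil {
	log.Fatal(err)
}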

Demos

Each demo expects a broker listening on localhost:4222. Start one in a second terminal with the hint next to each target.

target                what it shows                                                        broker hint
make demo             basic pub/sub, replay from offset, unsubscribe                       make run
make demo-reconnect   client survives a broker restart                                     make run (then kill + restart)
make demo-segments    segmentation and retention                                           make run-segments
make demo-groups      consumer group resumes from a committed offset across sessions      make run
make demo-partitions  two workers in a group split four partitions; binary-safe payloads  make run-partitions
make demo-heartbeat   well-behaved client persists; stuck raw socket gets evicted          make run-heartbeat
make demo-cluster     spins up 3 nodes, publishes via leader, reads from follower          (script starts the cluster itself)

Project layout

minibroker/
├── proto/            binary framing + op codes
├── partition.go      single-partition segmented log
├── topic.go          N partitions + consumer-group state + rebalance
├── broker.go         topic registry + publish routing
├── server.go         TCP server; handles all ops; manages group iterators
├── main.go           flags and entrypoint
├── client/           Go client library (Dial, Publish, Subscribe, ...)
└── examples/         runnable demos (see Demos above)

Benchmarks

Head-to-head publish throughput against three well-known brokers. Every target runs the same loop: publish one message, wait for the broker to acknowledge it durably, publish the next. No pipelining, no batching; just a fair synchronous-per-message comparison.
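
For minibroker, the measured loop is effectively the sketch below: Publish blocks until the broker's OK, which follows the fsync, so each iteration pays one round trip plus one disk sync. (Harness details simplified; bench/ is authoritative, and the topic name here is made up.)

package main

import (
	"fmt"
	"log"
	"time"

	"minibroker/client"
)

func main() {
	c, err := client.Dial("localhost:4222")
	if err != nil {
		log.Fatal(err)
	}
	defer c.Close()

	payload := make([]byte, 100) // 100-byte payloads, as in the first table
	start := time.Now()
	for i := 0; i < 20000; i++ {
		// One synchronous, durably-acknowledged publish per iteration.
		if _, _, err := c.Publish("bench", nil, payload); err != nil {
			log.Fatal(err)
		}
	}
	secs := time.Since(start).Seconds()
	fmt.Printf("%.0f msg/s\n", 20000/secs)
}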

All brokers run in Docker on the same host (compose file: docker-compose.bench.yml) and the harness lives in bench/. Reproduce with:

docker compose -f docker-compose.bench.yml up -d
go run ./bench -target minibroker    -n 20000 -size 100
go run ./bench -target nats-js       -n 20000 -size 100
go run ./bench -target rabbit        -n 20000 -size 100
go run ./bench -target redis -addr localhost:6380 -n 20000 -size 100
docker compose -f docker-compose.bench.yml down -v

Results (single-producer, host=Linux, measured on the same run; your mileage will vary):

100-byte payloads, n=20,000:

broker         elapsed  msg/s   MiB/s  notes
nats-js        1.52s    13,138  1.25   JetStream file storage; batches fsyncs
minibroker     28.36s   705     0.07   fsync on every append
redis-streams  29.21s   685     0.07   AOF appendfsync always
rabbitmq       32.42s   617     0.06   durable queue + persistent + confirms

1 KiB payloads, n=10,000:

broker         elapsed  msg/s   MiB/s
nats-js        0.91s    11,012  10.75
minibroker     14.56s   687     0.67
redis-streams  14.92s   670     0.65
rabbitmq       16.39s   610     0.60

How to read this

  • The three brokers that fsync on every message (minibroker, redis with appendfsync=always, rabbitmq with durable + persistent + confirms) all land in the same band, about 600–700 msg/s. Throughput is dominated by disk-sync latency on this host, not any broker's code path.
  • NATS JetStream is ~18× faster because it does not fsync every append: writes hit the page cache and the OS flushes on its own schedule (or at NATS's periodic checkpoint). This is a different durability class: "survives a process crash" vs "survives a power cut."
  • Payload size barely matters for the fsync-bound brokers: at 100 B and 1 KiB, throughput is within 5% of each other. For NATS it is also flat, because the bottleneck isn't disk throughput but the round trips.

Where minibroker could close the gap

  • Add an async/batched publish path. Today every Publish waits for the OK reply, so the N round trips dominate. PublishBatch(records) with one fsync at the end would trivially improve throughput 10–20×.
  • Group fsyncs across concurrent publishers (a.k.a. group commit; see the sketch after this list).
  • Use O_DIRECT or large sequential writes to let the OS coalesce disk I/O.
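
Group commit is the easiest of these to sketch. The idea: concurrent appenders write to the file independently, and a single goroutine coalesces their fsync requests, so one disk sync acknowledges many publishes. Everything below (names, channel protocol) is illustrative, not minibroker code:

import "os"

// syncer coalesces fsync requests: every caller queued while one
// Sync is in flight is acknowledged by the next single Sync.
type syncer struct {
	f   *os.File
	req chan chan error
}

func newSyncer(f *os.File) *syncer {
	s := &syncer{f: f, req: make(chan chan error, 1024)}
	go s.loop()
	return s
}

func (s *syncer) loop() {
	for first := range s.req {
		waiters := []chan error{first}
		// Drain everyone who queued up meanwhile: they share one fsync.
	drain:
		for {
			select {
			case w := <-s.req:
				waiters = append(waiters, w)
			default:
				break drain
			}
		}
		err := s.f.Sync() // one disk sync covers the whole batch
		for _, w := range waiters {
			w <- err
		}
	}
}

// WaitSync blocks until a sync covering the caller's prior writes completes.
func (s *syncer) WaitSync() error {
	done := make(chan error, 1)
	s.req <- done
	return <-done
}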

Things this doesn't do (on purpose, for now)

  • No TLS, no auth, no ACLs.
  • No per-topic configuration (partition count is a global broker flag).
  • No size-based retention (byte budget); only count- and time-based.
  • Publishers do not batch; each PUB is one fsync (and one Raft round-trip when clustered).
  • Raft log / stable store lives in memory; only snapshots hit disk. A node restart currently replays from snapshot + in-memory log and expects peers to fill gaps. Swap in raftboltdb for durable Raft state.
  • No consumer-group session IDs beyond the in-server memberID; if a client reconnects quickly enough, its previous slot may still linger until the heartbeat timeout fires.
