A small, learning-oriented message broker in Go, inspired by NATS, Kafka, Redis Streams, and RabbitMQ, built to understand how such systems work under the hood. Not production-ready; it's a teaching toy.
```mermaid
flowchart TB
  subgraph Clients["Clients"]
    producer["Producer"]
    worker1["Group member A"]
    worker2["Group member B"]
  end
  subgraph Broker["minibroker"]
    direction TB
    server["server.go<br/>(TCP :4222)"]
    metrics["metrics.go<br/>/metrics :9100"]
    subgraph Topic["Topic: events"]
      direction TB
      subgraph Parts["Partitions"]
        p0["partition 0"]
        p1["partition 1"]
        p2["partition 2"]
        p3["partition 3"]
      end
      subgraph Groups["Consumer groups"]
        g1["group: workers<br/>A=[0,2] B=[1,3]"]
      end
    end
    subgraph Disk["data/events/"]
      direction LR
      seg0["part-0/<br/>0000...0000.log<br/>0000...1000.log"]
      seg1["part-1/..."]
      seg2["part-2/..."]
      seg3["part-3/..."]
      off["offsets/workers/<br/>0.off 1.off 2.off 3.off"]
    end
  end
  producer -- "PUB" --> server
  worker1 -- "SUB group=workers" --> server
  worker2 -- "SUB group=workers" --> server
  server --> Topic
  Parts --> Disk
  Groups --> off
  Broker -.-> metrics
```
```mermaid
flowchart LR
  client["Client<br/>Dial(n1, n2, n3)"]
  subgraph Cluster["Raft cluster"]
    direction LR
    n1[":4221<br/>n1 LEADER"]
    n2[":4222<br/>n2 follower"]
    n3[":4223<br/>n3 follower"]
    n1 <-. "raft :8001" .-> n2
    n1 <-. "raft :8001" .-> n3
    n2 <-. "raft :8001" .-> n3
  end
  client -- "PUB (writes)" --> n1
  client -- "SUB (reads)" --> n2
  client -.->|"auto-redirect<br/>on not-leader"| n1
  n1 -. "replicates via Raft" .-> n2
  n1 -. "replicates via Raft" .-> n3
```
Writes go through the leader's Raft log; followers apply the committed entries to their own broker state (segment files, commit offsets). Reads are served from local state on whichever node the client connects to, so subscribers on a follower are eventually consistent.
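For example, a client can dial any node and still write durably; a minimal sketch, assuming the client library's automatic not-leader redirect shown in the diagram (the `Publish` signature matches the quick-start example later in this README):

```go
package main

import (
	"log"

	"minibroker/client"
)

func main() {
	// Dial any node; if it is a follower, writes are redirected to the
	// leader automatically (per the diagram above).
	c, err := client.Dial("localhost:4222")
	if err != nil {
		log.Fatal(err)
	}
	defer c.Close()

	// Acknowledged only after the leader's Raft log commits the entry.
	if _, _, err := c.Publish("events", nil, []byte("replicated hello")); err != nil {
		log.Fatal(err)
	}
}
```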
- Persistent, append-only log per partition on disk, with a CRC32 on every record
- Segmentation: segment files roll after N records
- Retention: segment-count based (`-retain`) and time based (`-retain-for`); a background sweeper trims old sealed segments
- Partitions per topic, with keyed or round-robin publish routing
- Consumer groups with server-tracked, per-partition committed offsets
- Automatic rebalance: when group members join or leave, partitions are redistributed and affected clients receive `REBALANCE` notifications
- Heartbeats + session timeouts: stuck group members are evicted after `-heartbeat-timeout`
- Log compaction by key: `-compact-every` rewrites sealed segments, keeping only the latest record per key while preserving the offset space (see the sketch after this list)
- Raft replication across N nodes (via `hashicorp/raft`): writes go through a leader; followers serve reads from the replicated local state
- Binary wire protocol: payloads can contain arbitrary bytes
- Go client library with auto-reconnect, heartbeats, and per-subscription delivery goroutines
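To make the compaction rule concrete, here is a minimal sketch of "latest record per key, offsets preserved" over an in-memory slice. Illustrative only; the broker operates on sealed segment files, and how it treats keyless records may differ:

```go
// record holds the fields compaction cares about.
type record struct {
	offset  uint64
	key     string // keyless records share the empty key in this sketch
	payload []byte
}

// compact keeps only the newest record for each key. Surviving records
// keep their original offsets, so committed consumer positions stay valid.
func compact(records []record) []record {
	newest := make(map[string]uint64, len(records))
	for _, r := range records {
		if r.offset >= newest[r.key] {
			newest[r.key] = r.offset
		}
	}
	out := make([]record, 0, len(newest))
	for _, r := range records {
		if newest[r.key] == r.offset {
			out = append(out, r)
		}
	}
	return out
}
```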
Every frame:

```
[u32 total-len][u8 op][op-specific body]
```

Strings are encoded as `[u16 len][bytes]` and byte slices as `[u32 len][bytes]`; offsets are u64, partition ids are u32. All integers are big-endian.
| op | direction | purpose |
|---|---|---|
| PUB | c→s | publish to a topic (routed by key or round-robin) |
| SUB | c→s | subscribe to a partition (head/offset) or join a group |
| UNSUB | c→s | leave a subscription |
| COMMIT | c→s | commit an offset for (topic, group, partition) |
| QUIT | c→s | end session |
| HEARTBEAT | c→s | group-member liveness ping |
| OK | s→c | ack for a request |
| ERR | s→c | error reply |
| MSG | s→c | server push: a record for a subscribed topic/partition |
| REBALANCE | s→c | group's partition assignment changed |
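As an illustration, the encoding primitives above compose like this in Go. A sketch, assuming `total-len` counts the op byte plus the body but not itself; `proto/` has the authoritative framing:

```go
package wire // illustrative; see proto/ for the real code

import "encoding/binary"

// frame wraps an op byte and body per the layout above.
func frame(op byte, body []byte) []byte {
	buf := binary.BigEndian.AppendUint32(nil, uint32(1+len(body)))
	buf = append(buf, op)
	return append(buf, body...)
}

// appendString encodes a string as [u16 len][bytes].
func appendString(b []byte, s string) []byte {
	b = binary.BigEndian.AppendUint16(b, uint16(len(s)))
	return append(b, s...)
}

// appendBytes encodes a byte slice as [u32 len][bytes].
func appendBytes(b, p []byte) []byte {
	b = binary.BigEndian.AppendUint32(b, uint32(len(p)))
	return append(b, p...)
}
```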
Record format on disk:

```
[u32 body_len]
[u64 offset]                      # stored so compaction can preserve offsets
[u16 key_len][key bytes]
[u32 payload_len][payload bytes]
[u32 crc32]                       # IEEE over offset || key_len || key || payload_len || payload
```
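A sketch of building that record in Go, assuming `body_len` covers the fields between it and the trailing CRC (not the broker's actual encoder):

```go
package wire // illustrative; not the broker's actual encoder

import (
	"encoding/binary"
	"hash/crc32"
)

// encodeRecord lays out one record per the format above.
func encodeRecord(offset uint64, key, payload []byte) []byte {
	body := binary.BigEndian.AppendUint64(nil, offset)
	body = binary.BigEndian.AppendUint16(body, uint16(len(key)))
	body = append(body, key...)
	body = binary.BigEndian.AppendUint32(body, uint32(len(payload)))
	body = append(body, payload...)

	// CRC32 (IEEE) over offset || key_len || key || payload_len || payload.
	crc := crc32.ChecksumIEEE(body)

	out := binary.BigEndian.AppendUint32(nil, uint32(len(body)))
	out = append(out, body...)
	return binary.BigEndian.AppendUint32(out, crc)
}
```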
```
data/
  <topic>/
    part-0/
      00000000000000000000.log    # records: [u32 len][payload]
      00000000000000001000.log
    part-1/
      ...
  offsets/
    <group>/
      0.off                       # committed offset for (group, partition 0)
      1.off
```
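A segment file's name is its base offset, zero-padded to 20 digits, so locating the segment that contains a given offset is a parse plus a binary search. A sketch of the idea (illustrative; not the broker's actual lookup code):

```go
package main

import (
	"fmt"
	"sort"
	"strconv"
	"strings"
)

// baseOffset recovers a segment's starting offset from its filename,
// e.g. "00000000000000001000.log" -> 1000.
func baseOffset(name string) (uint64, error) {
	return strconv.ParseUint(strings.TrimSuffix(name, ".log"), 10, 64)
}

// segmentFor returns the base offset of the segment holding off, i.e. the
// largest base <= off. bases must be sorted ascending; a partition's first
// segment starts at 0, so off always has a containing segment.
func segmentFor(bases []uint64, off uint64) uint64 {
	i := sort.Search(len(bases), func(i int) bool { return bases[i] > off })
	return bases[i-1]
}

func main() {
	b, _ := baseOffset("00000000000000001000.log")
	fmt.Println(b)                                         // 1000
	fmt.Println(segmentFor([]uint64{0, 1000, 2000}, 1500)) // 1000
}
```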
```sh
make build
./minibroker -partitions 4
```

Flags:
| flag | default | meaning |
|---|---|---|
| `-addr` | `:4222` | TCP listen address |
| `-dir` | `./data` | data directory |
| `-partitions` | `4` | partitions per topic (global for this broker) |
| `-segment-size` | `1000` | records per segment before rolling |
| `-retain` | `100` | total segments kept per partition (count-based retention) |
| `-retain-for` | `0` | time-based retention: drop sealed segments older than this (0 = off) |
| `-sweep-every` | `30s` | how often the time-retention sweep runs |
| `-heartbeat-timeout` | `15s` | evict a group member that hasn't sent a heartbeat in this window |
| `-compact-every` | `0` | run log compaction at this interval (0 disables) |
| `-cluster-id` | | this node's id (enables Raft replication when set) |
| `-cluster-addr` | | this node's Raft RPC address, e.g. `:8001` |
| `-cluster-peers` | | comma-separated peer list `id@addr` |
| `-cluster-bootstrap` | `false` | `true` on exactly one node the first time the cluster is created |
| `-metrics-addr` | | serve Prometheus `/metrics` here (e.g. `:9100`); empty disables |
| `-healthcheck` | `false` | internal: used by the Docker HEALTHCHECK to probe the broker port |
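Putting the cluster flags together, the first node of a three-node cluster might be launched like this. A hypothetical invocation: the port choices mirror the cluster script below, the per-node `-dir` is an assumption, and it assumes `-cluster-peers` lists the *other* nodes:

```sh
./minibroker -addr :4221 -dir ./data-n1 \
  -cluster-id n1 -cluster-addr :8001 \
  -cluster-peers "n2@:8002,n3@:8003" \
  -cluster-bootstrap   # only on the very first start of this one node
```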
```sh
./examples/cluster/run-cluster.sh
```

This starts three minibroker processes on ports 4221/4222/4223 (broker) and 8001/8002/8003 (Raft), waits for an election, publishes through the leader, and confirms a follower received the replicated messages.
Multi-stage Dockerfile (distroless runtime, ~12 MB final image) and two compose files: one for single-node, one for the 3-node cluster.
| target | what it does |
|---|---|
| `make docker-build` | build the local `minibroker:latest` image |
| `make docker-up` | single broker on `localhost:4222`, named volume `broker-data` |
| `make docker-down` | stop the single broker (keeps the data volume) |
| `make docker-cluster-up` | 3-node Raft cluster, brokers on `:4221` / `:4222` / `:4223` |
| `make docker-cluster-down` | stop the cluster AND wipe volumes (fresh bootstrap next time) |
| `make docker-clean` | remove both compose stacks + the image |
Once the cluster is up, `go run ./examples/cluster` probes the three ports, finds the leader, publishes through it, and reads the replicated records back from a follower.
See CLIENT.md for the full API reference. Quick example:
```go
package main

import (
	"fmt"
	"log"

	"minibroker/client"
)

func main() {
	c, err := client.Dial("localhost:4222")
	if err != nil {
		log.Fatal(err)
	}
	defer c.Close()

	// Publish: a nil key means round-robin across partitions.
	partition, offset, err := c.Publish("events", nil, []byte("hello"))
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("wrote partition=%d offset=%d\n", partition, offset)

	// Join a consumer group: the broker picks our partitions and sends
	// us a REBALANCE whenever the assignment changes.
	err = c.SubscribeGroup("events", "workers",
		func(topic, group string, assignment []int32) {
			fmt.Println("my partitions:", assignment)
		},
		func(topic string, pid int32, offset int64, payload []byte) {
			fmt.Printf("[%s/%d@%d] %s\n", topic, pid, offset, payload)
			// Commit next-offset-to-read (Kafka convention).
			_ = c.Commit(topic, "workers", pid, offset+1)
		})
	if err != nil {
		log.Fatal(err)
	}

	select {} // run until killed
}
```

For a single-partition, ephemeral subscription (no group), use `c.Subscribe(topic, partition, fromOffset, handler)`: pass `fromOffset=-1` to start at the partition's current head, or `0` to replay from the beginning.
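Continuing the example above, a minimal ephemeral subscription; this sketch assumes `Subscribe` takes the same record-handler signature as `SubscribeGroup` (see CLIENT.md for the authoritative API):

```go
// Tail partition 0 from its current head; no group, no commits.
err = c.Subscribe("events", 0, -1,
	func(topic string, pid int32, offset int64, payload []byte) {
		fmt.Printf("[%s/%d@%d] %s\n", topic, pid, offset, payload)
	})
if err != nil {
	log.Fatal(err)
}
```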
Each demo expects a broker listening on localhost:4222. Start one in a
second terminal with the hint next to each target.
| target | what it shows | broker hint |
|---|---|---|
| `make demo` | basic pub/sub, replay from offset, unsubscribe | `make run` |
| `make demo-reconnect` | client survives a broker restart | `make run` (then kill + restart) |
| `make demo-segments` | segmentation and retention | `make run-segments` |
| `make demo-groups` | consumer group resumes from a committed offset across sessions | `make run` |
| `make demo-partitions` | two workers in a group split four partitions; binary-safe payloads | `make run-partitions` |
| `make demo-heartbeat` | well-behaved client persists; stuck raw socket gets evicted | `make run-heartbeat` |
| `make demo-cluster` | spins up 3 nodes, publishes via leader, reads from follower | (script starts the cluster itself) |
```
minibroker/
├── proto/          binary framing + op codes
├── partition.go    single-partition segmented log
├── topic.go        N partitions + consumer-group state + rebalance
├── broker.go       topic registry + publish routing
├── server.go       TCP server; handles all ops; manages group iterators
├── main.go         flags and entrypoint
├── client/         Go client library (Dial, Publish, Subscribe, ...)
└── examples/       five runnable demos
```
Head-to-head publish throughput against three well-known brokers. Every target runs the same loop: publish one message, wait for the broker to acknowledge it durably, publish the next. No pipelining, no batching; just a fair synchronous-per-message comparison.
All brokers run in Docker on the same host (compose file: `docker-compose.bench.yml`) and the harness lives in `bench/`. Reproduce with:

```sh
docker compose -f docker-compose.bench.yml up -d
go run ./bench -target minibroker -n 20000 -size 100
go run ./bench -target nats-js -n 20000 -size 100
go run ./bench -target rabbit -n 20000 -size 100
go run ./bench -target redis -addr localhost:6380 -n 20000 -size 100
docker compose -f docker-compose.bench.yml down -v
```

Results (single-producer, host=Linux, measured on the same run; your mileage will vary):
100-byte payloads, n=20,000:
| broker | elapsed | msg/s | MiB/s | notes |
|---|---|---|---|---|
| nats-js | 1.52s | 13,138 | 1.25 | JetStream file storage; batches fsyncs |
| minibroker | 28.36s | 705 | 0.07 | fsync on every append |
| redis-streams | 29.21s | 685 | 0.07 | AOF appendfsync always |
| rabbitmq | 32.42s | 617 | 0.06 | durable queue + persistent + confirms |
1 KiB payloads, n=10,000:
| broker | elapsed | msg/s | MiB/s |
|---|---|---|---|
| nats-js | 0.91s | 11,012 | 10.75 |
| minibroker | 14.56s | 687 | 0.67 |
| redis-streams | 14.92s | 670 | 0.65 |
| rabbitmq | 16.39s | 610 | 0.60 |
- The three brokers that fsync on every message (minibroker, Redis with `appendfsync always`, RabbitMQ with durable + persistent + confirms) all land in the same band, roughly 600–700 msg/s. Throughput is dominated by disk-sync latency on this host, not by any broker's code path.
- NATS JetStream is ~18× faster because it does not fsync every append: writes hit the page cache and the OS flushes on its own schedule (or at NATS's periodic checkpoint). This is a different durability class, "survives a process crash" vs "survives a power cut"; the sketch after this list shows both.
- Payload size barely matters for the fsync-bound brokers: throughput at 100 B and 1 KiB is within 5%. For NATS it's also flat, because the bottleneck isn't disk throughput; it's the round trips.
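The two durability classes boil down to whether each write waits for `fsync`. A self-contained sketch; timing each loop on your own disk reproduces the gap the tables show:

```go
package main

import (
	"log"
	"os"
)

func main() {
	f, err := os.OpenFile("bench.log", os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0o644)
	if err != nil {
		log.Fatal(err)
	}
	defer f.Close()

	msg := make([]byte, 100)

	// Class 1, "survives a power cut": every append waits for the disk,
	// like minibroker, Redis appendfsync=always, RabbitMQ with confirms.
	for i := 0; i < 1000; i++ {
		if _, err := f.Write(msg); err != nil {
			log.Fatal(err)
		}
		if err := f.Sync(); err != nil {
			log.Fatal(err)
		}
	}

	// Class 2, "survives a process crash": writes land in the page cache
	// and the OS flushes later, roughly JetStream's file-storage behavior.
	for i := 0; i < 1000; i++ {
		if _, err := f.Write(msg); err != nil {
			log.Fatal(err)
		}
	}
}
```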
- Add an async/batched publish path. Today every `Publish` waits for the OK reply, so the N round trips dominate. A `PublishBatch(records)` with one fsync at the end would trivially buy a 10–20× throughput improvement.
- Group fsyncs across concurrent publishers, a.k.a. group commit (sketched after this list).
- Use `O_DIRECT` or large sequential writes to let the OS coalesce disk I/O.
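A sketch of the group-commit idea, hypothetical and not in the codebase: writers queue their bytes with a reply channel, and a single loop drains whatever is waiting, writes it all, and issues one fsync for the whole batch.

```go
package wal // sketch only; not minibroker code

import "os"

// appendReq carries one record plus a channel for its durability ack.
type appendReq struct {
	data []byte
	done chan error
}

// groupCommit batches concurrent appends so they share fsyncs: drain
// everything already queued, write it, sync once, then ack every writer.
func groupCommit(f *os.File, reqs <-chan appendReq) {
	for r := range reqs {
		batch := []appendReq{r}
	drain:
		for {
			select {
			case r2, ok := <-reqs:
				if !ok {
					break drain
				}
				batch = append(batch, r2)
			default:
				break drain
			}
		}
		var err error
		for _, b := range batch {
			if _, werr := f.Write(b.data); werr != nil && err == nil {
				err = werr
			}
		}
		if serr := f.Sync(); serr != nil && err == nil {
			err = serr // one fsync amortized over the whole batch
		}
		for _, b := range batch {
			b.done <- err
		}
	}
}
```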
- No TLS, no auth, no ACLs.
- No per-topic configuration (partition count is a global broker flag).
- No size-based retention (byte budget); only count- and time-based.
- Publishers do not batch; each PUB is one fsync (and one Raft round trip when clustered).
- Raft log / stable store lives in memory; only snapshots hit disk. A node restart currently replays from snapshot + in-memory log and expects peers to fill gaps. Swap in `raftboltdb` for durable Raft state (sketched below).
- No consumer-group session IDs beyond the in-server `memberID`; if a client reconnects quickly enough, its previous slot may still linger until the heartbeat timeout fires.
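The `raftboltdb` swap is mostly wiring. A sketch assuming a standard `hashicorp/raft` setup (minibroker's actual Raft initialization may differ): a single BoltDB store replaces the in-memory log and stable stores.

```go
package cluster // illustrative wiring; not minibroker's actual setup

import (
	"path/filepath"

	"github.com/hashicorp/raft"
	raftboltdb "github.com/hashicorp/raft-boltdb"
)

// durableStores returns BoltDB-backed Raft log and stable stores, the
// drop-in replacement for the in-memory stores noted above.
func durableStores(dir string) (raft.LogStore, raft.StableStore, error) {
	store, err := raftboltdb.NewBoltStore(filepath.Join(dir, "raft.db"))
	if err != nil {
		return nil, nil, err
	}
	// One *BoltStore implements both raft.LogStore and raft.StableStore.
	return store, store, nil
}
```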