Go Realtime System Design: 5 Production Patterns from WebSocket to Event Streaming
When 100K WebSocket Connections Disconnect Simultaneously: A Realtime Production Nightmare
Friday, 8 PM, on a livestream platform with 500K concurrent users. A config change triggers a rolling deployment that drops every WebSocket connection at once, and 500K clients reconnect simultaneously. The resulting avalanche takes down the gateway almost instantly. Worse, the reconnection storm backs up the message queue: consumers can't keep up, latency spikes from 50ms to 30 seconds, and the user experience collapses.
This isn't an isolated case. Go realtime system development goes far beyond "upgrade to WebSocket" — you need to manage connection lifecycles, handle message routing, deal with backpressure and reconnection storms, and ensure ordered event delivery. This article covers 5 production-grade realtime patterns to help you build robust Go realtime systems.
Core Concepts Reference
| Technology | Purpose | Key Features | Typical Use Case |
|---|---|---|---|
| WebSocket | Full-duplex realtime communication | Persistent connection, low latency, bidirectional push | Chat systems, collaborative editing, realtime gaming |
| SSE | Server-to-client unidirectional push | HTTP protocol, auto-reconnect, text frames | Notification push, stock quotes, log streams |
| NATS | High-performance messaging system | Pub/sub, JetStream persistence, clustering | Microservice event bus, IoT data distribution |
| Go channel | In-process event passing | Type-safe, select multiplexing, closeable | Event-driven architecture, inter-goroutine communication |
| gorilla/websocket | Go WebSocket library | Ping/pong control frames, read/write deadlines, one concurrent reader and one concurrent writer per connection | Production-grade WebSocket services |
5 Challenges of Realtime System Design
Challenge 1: Connection Lifecycle Management
+--------+ +--------+ +--------+
| Client |---->| Server |<----| Client |
+--------+ +--------+ +--------+
| | |
| Connect | Connect |
|------------->|<-------------|
| | |
| Heartbeat | Heartbeat |
|<--ping/pong->|<--ping/pong->|
| | |
| Disconnect | Disconnect |
|<------------X|X------------>|
| | |
| Reconnect | Reconnect |
| Storm | Storm |
|=============>|<=============|
100K connections disconnecting and reconnecting simultaneously without backoff is self-DDoS.
Challenge 2: Message Routing and Fan-out
One message needs to be pushed to 10K subscribers. One goroutine per subscriber? Or a shared writer goroutine?
Challenge 3: Backpressure and Flow Control
Producer speed far exceeds consumer speed. What happens when the channel is full? Drop? Block? Degrade?
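The three options named above (drop, block, degrade) can be sketched as small helpers around a buffered channel. This is a minimal illustration rather than a recommended library: the function names are hypothetical, and `offerDropOldest` assumes a buffered channel with a single producer.

```go
package main

import "time"

// offerDrop tries a non-blocking send and reports whether the message was
// queued. A false result means the buffer was full and the message was dropped.
func offerDrop[T any](ch chan T, v T) bool {
	select {
	case ch <- v:
		return true
	default:
		return false
	}
}

// offerDropOldest makes room for v by discarding the oldest queued message
// when the buffer is full. It assumes a buffered channel and one producer.
func offerDropOldest[T any](ch chan T, v T) {
	for {
		select {
		case ch <- v:
			return
		default:
		}
		select {
		case <-ch: // evict the oldest message
		default: // a consumer drained it first; retry the send
		}
	}
}

// offerTimeout blocks for at most d, applying backpressure to the producer
// before giving up.
func offerTimeout[T any](ch chan T, v T, d time.Duration) bool {
	timer := time.NewTimer(d)
	defer timer.Stop()
	select {
	case ch <- v:
		return true
	case <-timer.C:
		return false
	}
}
```

Dropping the newest message suits chat-style traffic where stale data still matters, dropping the oldest suits quotes or telemetry where only the latest value counts, and a bounded block is a middle ground that slows the producer without stalling it indefinitely.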
Challenge 4: Event Ordering
Events for the same user must be delivered in order, but how do multiple instances guarantee this in a distributed environment?
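One common answer is to partition by key: every event for a given user is hashed to the same worker, which processes its queue sequentially. The sketch below shows the in-process half of that idea; `keyedEvent` and `partitioner` are hypothetical names, and across multiple instances the same hash would have to be applied upstream (for example through sticky routing or per-key subjects) for the guarantee to hold.

```go
package main

import (
	"hash/fnv"
	"sync"
)

// keyedEvent is a hypothetical event type; Key would typically be a user ID.
type keyedEvent struct {
	Key     string
	Payload string
}

// partitioner routes events to a fixed set of worker goroutines so that all
// events sharing a key are handled sequentially by the same worker.
type partitioner struct {
	queues []chan keyedEvent
	wg     sync.WaitGroup
}

func newPartitioner(workers, buffer int, handle func(keyedEvent)) *partitioner {
	p := &partitioner{queues: make([]chan keyedEvent, workers)}
	for i := range p.queues {
		q := make(chan keyedEvent, buffer)
		p.queues[i] = q
		p.wg.Add(1)
		go func() {
			defer p.wg.Done()
			for ev := range q {
				handle(ev)
			}
		}()
	}
	return p
}

// indexFor hashes the key with FNV-1a and maps it onto a worker index.
func (p *partitioner) indexFor(key string) int {
	h := fnv.New32a()
	h.Write([]byte(key))
	return int(h.Sum32() % uint32(len(p.queues)))
}

// Dispatch blocks when the target worker's queue is full, which keeps the
// per-key order intact instead of dropping or reordering events.
func (p *partitioner) Dispatch(ev keyedEvent) {
	p.queues[p.indexFor(ev.Key)] <- ev
}

// Close stops accepting events and waits for the workers to drain.
func (p *partitioner) Close() {
	for _, q := range p.queues {
		close(q)
	}
	p.wg.Wait()
}
```

Order is preserved per key only; events for different keys may still interleave, which is usually acceptable.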
Challenge 5: Graceful Degradation
Fall back to SSE when WebSocket is unavailable, fall back to polling when SSE is unavailable. How to design the degradation strategy?
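A degradation strategy can be as simple as an ordered list of transports tried in turn. The sketch below assumes a Go client; the `transport` type and `connectWithFallback` function are hypothetical, and each `Connect` function stands in for a real WebSocket dial, SSE request, or polling setup.

```go
package main

import (
	"errors"
	"fmt"
)

// errUnavailable is a placeholder error for a transport that cannot connect.
var errUnavailable = errors.New("transport unavailable")

// transport is a hypothetical abstraction over one delivery mechanism.
type transport struct {
	Name    string
	Connect func() error
}

// connectWithFallback tries each transport in order of preference and returns
// the name of the first one that connects. If all fail, the joined error
// lists every attempt.
func connectWithFallback(chain []transport) (string, error) {
	var errs []error
	for _, t := range chain {
		if err := t.Connect(); err != nil {
			errs = append(errs, fmt.Errorf("%s: %w", t.Name, err))
			continue
		}
		return t.Name, nil
	}
	if len(errs) == 0 {
		return "", errors.New("no transports configured")
	}
	return "", errors.Join(errs...)
}
```

Ordering the chain as WebSocket, then SSE, then polling matches the fallback order described above. `errors.Join` requires Go 1.20 or later.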
5 Production-Grade Realtime Patterns
Pattern 1: WebSocket Hub — Chat System
The WebSocket Hub is the most classic realtime pattern: a central Hub manages all connections, messages are broadcast through the Hub to all subscribers.
+-------+
| Hub |
+-------+
/ | | \
v v v v
+----+ +----+ +----+ +----+
| C1 | | C2 | | C3 | | C4 |
+----+ +----+ +----+ +----+
Client sends message --> Hub broadcasts --> All clients receive
package wschat
import (
"log"
"net/http"
"sync"
"time"
"github.com/gorilla/websocket"
)
var upgrader = websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
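// Accepting every origin is for local testing only; validate the
// Origin header before exposing this endpoint.
CheckOrigin: func(r *http.Request) bool {
return true
},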
}
type Message struct {
Username string `json:"username"`
Content string `json:"content"`
Room string `json:"room"`
}
type Client struct {
hub *Hub
conn *websocket.Conn
send chan []byte
room string
}
type Hub struct {
clients map[*Client]bool
broadcast chan []byte
register chan *Client
unregister chan *Client
mu sync.RWMutex
rooms map[string]map[*Client]bool
}
func NewHub() *Hub {
return &Hub{
broadcast: make(chan []byte, 256),
register: make(chan *Client),
unregister: make(chan *Client),
clients: make(map[*Client]bool),
rooms: make(map[string]map[*Client]bool),
}
}
func (h *Hub) Run() {
for {
select {
case client := <-h.register:
h.mu.Lock()
h.clients[client] = true
if h.rooms[client.room] == nil {
h.rooms[client.room] = make(map[*Client]bool)
}
h.rooms[client.room][client] = true
h.mu.Unlock()
case client := <-h.unregister:
h.mu.Lock()
if _, ok := h.clients[client]; ok {
delete(h.clients, client)
if room, ok := h.rooms[client.room]; ok {
delete(room, client)
if len(room) == 0 {
delete(h.rooms, client.room)
}
}
close(client.send)
}
h.mu.Unlock()
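case message := <-h.broadcast:
// Take the write lock once for the whole fan-out instead of swapping
// RLock and Lock in the middle of the map iteration.
h.mu.Lock()
for client := range h.clients {
select {
case client.send <- message:
default:
// Slow consumer: remove it from the client set and from its room.
close(client.send)
delete(h.clients, client)
if room, ok := h.rooms[client.room]; ok {
delete(room, client)
if len(room) == 0 {
delete(h.rooms, client.room)
}
}
}
}
h.mu.Unlock()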
}
}
}
func (c *Client) readPump() {
defer func() {
c.hub.unregister <- c
c.conn.Close()
}()
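c.conn.SetReadLimit(512)
// Without a read deadline a dead peer is never detected. Expect a pong
// within 60s; the 54s ping interval in writePump leaves headroom.
c.conn.SetReadDeadline(time.Now().Add(60 * time.Second))
c.conn.SetPongHandler(func(string) error {
c.conn.SetReadDeadline(time.Now().Add(60 * time.Second))
return nil
})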
for {
_, message, err := c.conn.ReadMessage()
if err != nil {
if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
log.Printf("read error: %v", err)
}
break
}
c.hub.broadcast <- message
}
}
func (c *Client) writePump() {
ticker := time.NewTicker(54 * time.Second)
defer func() {
ticker.Stop()
c.conn.Close()
}()
for {
select {
case message, ok := <-c.send:
c.conn.SetWriteDeadline(time.Now().Add(10 * time.Second))
if !ok {
c.conn.WriteMessage(websocket.CloseMessage, []byte{})
return
}
w, err := c.conn.NextWriter(websocket.TextMessage)
if err != nil {
return
}
w.Write(message)
n := len(c.send)
for i := 0; i < n; i++ {
w.Write([]byte{'\n'})
w.Write(<-c.send)
}
if err := w.Close(); err != nil {
return
}
case <-ticker.C:
c.conn.SetWriteDeadline(time.Now().Add(10 * time.Second))
if err := c.conn.WriteMessage(websocket.PingMessage, nil); err != nil {
return
}
}
}
}
func ServeWs(hub *Hub, w http.ResponseWriter, r *http.Request) {
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
log.Println(err)
return
}
room := r.URL.Query().Get("room")
if room == "" {
room = "default"
}
client := &Client{
hub: hub,
conn: conn,
send: make(chan []byte, 256),
room: room,
}
hub.register <- client
go client.writePump()
go client.readPump()
}
Key design points:
- Hub uses channels for register/unregister/broadcast, avoiding lock contention
- Each Client has independent readPump/writePump goroutines
- writePump batches sends (drains n messages from channel together)
- ping/pong heartbeat detection, 54-second interval
- Slow consumers auto-disconnected when send channel is full
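The Hub above records room membership but its broadcast case fans out to every client. A room-scoped fan-out could look like the following sketch, which is deliberately simplified: `roomRegistry` is a hypothetical stand-in for the Hub's rooms map, and each member is represented only by its buffered send channel.

```go
package main

import "sync"

// roomRegistry is a simplified stand-in for the Hub's rooms map: each member
// is represented only by its buffered send channel.
type roomRegistry struct {
	mu    sync.RWMutex
	rooms map[string]map[chan []byte]struct{}
}

func newRoomRegistry() *roomRegistry {
	return &roomRegistry{rooms: make(map[string]map[chan []byte]struct{})}
}

// Join adds a member's send channel to a room, creating the room on demand.
func (r *roomRegistry) Join(room string, send chan []byte) {
	r.mu.Lock()
	defer r.mu.Unlock()
	if r.rooms[room] == nil {
		r.rooms[room] = make(map[chan []byte]struct{})
	}
	r.rooms[room][send] = struct{}{}
}

// BroadcastRoom delivers msg only to members of room. Members whose buffer is
// full are skipped and counted so the caller can decide whether to disconnect them.
func (r *roomRegistry) BroadcastRoom(room string, msg []byte) (delivered, skipped int) {
	r.mu.RLock()
	defer r.mu.RUnlock()
	for send := range r.rooms[room] {
		select {
		case send <- msg:
			delivered++
		default:
			skipped++
		}
	}
	return delivered, skipped
}
```

Wiring this into the Hub would mean carrying the room name alongside each broadcast message, for example by sending a small struct instead of a bare `[]byte`.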
Pattern 2: Server-Sent Events (SSE) — Realtime Update Push
SSE is lighter than WebSocket, based on HTTP protocol, with built-in auto-reconnect and event ID continuation.
+--------+ +--------+
| Client | | Server |
+--------+ +--------+
| |
| GET /events |
|------------------>|
| |
| Content-Type: |
| text/event-stream|
|<------------------|
| |
| data: {"price": |
| 42000} |
|<------------------|
| |
| id: 123 |
| data: {"price": |
| 42050} |
|<------------------|
package sse
import (
"fmt"
"log"
"net/http"
"sync"
)
type Event struct {
ID string
Event string
Data string
}
type Broker struct {
clients map[chan Event]bool
newClients chan chan Event
deadClients chan chan Event
mu sync.RWMutex
}
func NewBroker() *Broker {
return &Broker{
clients: make(map[chan Event]bool),
newClients: make(chan chan Event),
deadClients: make(chan chan Event),
}
}
func (b *Broker) Start() {
for {
select {
case c := <-b.newClients:
b.mu.Lock()
b.clients[c] = true
b.mu.Unlock()
log.Printf("client connected, total: %d", len(b.clients))
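case c := <-b.deadClients:
b.mu.Lock()
// A client can be reported dead twice (slow consumer, then context
// cancellation), so only close channels that are still registered.
if _, ok := b.clients[c]; ok {
delete(b.clients, c)
close(c)
}
b.mu.Unlock()
log.Printf("client disconnected, total: %d", len(b.clients))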
}
}
}
func (b *Broker) ServeHTTP(w http.ResponseWriter, r *http.Request) {
flusher, ok := w.(http.Flusher)
if !ok {
http.Error(w, "streaming not supported", http.StatusInternalServerError)
return
}
w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")
w.Header().Set("Access-Control-Allow-Origin", "*")
clientChan := make(chan Event, 64)
b.newClients <- clientChan
notify := r.Context().Done()
go func() {
<-notify
b.deadClients <- clientChan
}()
for {
select {
case event, ok := <-clientChan:
if !ok {
return
}
if event.ID != "" {
fmt.Fprintf(w, "id: %s\n", event.ID)
}
if event.Event != "" {
fmt.Fprintf(w, "event: %s\n", event.Event)
}
fmt.Fprintf(w, "data: %s\n\n", event.Data)
flusher.Flush()
case <-r.Context().Done():
return
}
}
}
func (b *Broker) Notify(event Event) {
b.mu.RLock()
defer b.mu.RUnlock()
for client := range b.clients {
select {
case client <- event:
default:
go func(c chan Event) {
b.deadClients <- c
}(client)
}
}
}
Usage example — stock quote push:
func main() {
broker := NewBroker()
go broker.Start()
go func() {
eventID := 0
for {
time.Sleep(1 * time.Second)
eventID++
price := fetchStockPrice("BTC")
broker.Notify(Event{
ID: fmt.Sprintf("%d", eventID),
Event: "price-update",
Data: fmt.Sprintf(`{"symbol":"BTC","price":%.2f}`, price),
})
}
}()
http.Handle("/events", broker)
http.ListenAndServe(":8080", nil)
}
Key design points:
- HTTP-based, no protocol upgrade needed, proxy/CDN friendly
- Last-Event-ID lets a reconnecting browser report the last event it received, so a server that buffers recent events can resume from that point
- http.Flusher ensures data is pushed immediately
- Context cancellation auto-cleans up clients
- Slow consumers auto-disconnected when channel is full
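Resuming from Last-Event-ID needs server-side support: something has to remember recent events and replay the ones the client missed. A minimal sketch of such a buffer follows. It assumes numeric, monotonically increasing event IDs (as in the stock quote example), and `storedEvent`, `replayBuffer`, and `parseLastEventID` are hypothetical names rather than part of the broker above.

```go
package main

import "strconv"

// storedEvent is a hypothetical event with a monotonically increasing numeric ID.
type storedEvent struct {
	ID   uint64
	Data string
}

// replayBuffer keeps the most recent events so a reconnecting client can catch up.
// It is not safe for concurrent use; guard it with the broker's mutex.
type replayBuffer struct {
	events []storedEvent
	limit  int
}

func newReplayBuffer(limit int) *replayBuffer {
	return &replayBuffer{limit: limit}
}

// Add appends an event and evicts the oldest ones once the limit is exceeded.
func (b *replayBuffer) Add(ev storedEvent) {
	b.events = append(b.events, ev)
	if len(b.events) > b.limit {
		b.events = b.events[len(b.events)-b.limit:]
	}
}

// Since returns the buffered events with an ID greater than lastID, in order.
func (b *replayBuffer) Since(lastID uint64) []storedEvent {
	var out []storedEvent
	for _, ev := range b.events {
		if ev.ID > lastID {
			out = append(out, ev)
		}
	}
	return out
}

// parseLastEventID converts the Last-Event-ID header value into a number.
// A missing or malformed header yields 0, which replays the whole buffer.
func parseLastEventID(header string) uint64 {
	id, err := strconv.ParseUint(header, 10, 64)
	if err != nil {
		return 0
	}
	return id
}
```

In a handler, the header would be read with `r.Header.Get("Last-Event-ID")` and the result of `Since` written to the response before the live stream starts. Events older than the buffer limit are lost, so the limit should cover the longest reconnect gap you expect.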
Pattern 3: NATS Message Queue Integration — Microservice Event Bus
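NATS is one of the most popular messaging systems in the Go ecosystem. Its single-node throughput for core (non-persistent) publish/subscribe is on the order of millions of messages per second, with the exact figure depending on message size, hardware, and whether JetStream persistence is enabled, which makes it a natural fit for realtime systems.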
+--------+ +--------+ +--------+
|ServiceA| | NATS | |ServiceB|
+--------+ | Server | +--------+
| +--------+ |
| publish | | subscribe
|-------->| |-------->|
| | | |
| | publish | |
| |<---------| |
| subscribe | |
|<--------| | |
+--------+ +--------+ +--------+
|ServiceC| | NATS | |ServiceD|
+--------+ | JetStream +--------+
+--------+
package natsbus
import (
"context"
"encoding/json"
"fmt"
"log"
"sync"
"time"
"github.com/nats-io/nats.go"
)
type Event struct {
Type string `json:"type"`
Payload json.RawMessage `json:"payload"`
Source string `json:"source"`
}
type EventBus struct {
nc *nats.Conn
js nats.JetStreamContext
handlers map[string][]func(Event)
mu sync.RWMutex
}
func NewEventBus(url string) (*EventBus, error) {
nc, err := nats.Connect(url,
nats.ReconnectWait(2*time.Second),
nats.MaxReconnects(60),
nats.DisconnectErrHandler(func(nc *nats.Conn, err error) {
log.Printf("NATS disconnected: %v", err)
}),
nats.ReconnectHandler(func(nc *nats.Conn) {
log.Printf("NATS reconnected to %s", nc.ConnectedUrl())
}),
)
if err != nil {
return nil, fmt.Errorf("connect to NATS: %w", err)
}
js, err := nc.JetStream()
if err != nil {
return nil, fmt.Errorf("get JetStream context: %w", err)
}
return &EventBus{
nc: nc,
js: js,
handlers: make(map[string][]func(Event)),
}, nil
}
func (b *EventBus) Publish(ctx context.Context, subject string, event Event) error {
data, err := json.Marshal(event)
if err != nil {
return fmt.Errorf("marshal event: %w", err)
}
select {
case <-ctx.Done():
return ctx.Err()
default:
}
if _, err := b.js.Publish(subject, data); err != nil {
return fmt.Errorf("publish to %s: %w", subject, err)
}
return nil
}
func (b *EventBus) Subscribe(ctx context.Context, subject, durable string, handler func(Event)) error {
streamName := fmt.Sprintf("STREAM_%s", durable)
if _, err := b.js.StreamInfo(streamName); err != nil {
if _, err := b.js.AddStream(&nats.StreamConfig{
Name: streamName,
Subjects: []string{subject},
Retention: nats.LimitsPolicy,
MaxMsgs: 1000000,
MaxAge: 7 * 24 * time.Hour,
}); err != nil {
return fmt.Errorf("add stream %s: %w", streamName, err)
}
}
sub, err := b.js.Subscribe(subject, func(msg *nats.Msg) {
var event Event
if err := json.Unmarshal(msg.Data, &event); err != nil {
log.Printf("unmarshal event: %v", err)
msg.Nak()
return
}
handler(event)
msg.Ack()
}, nats.Durable(durable), nats.ManualAck(), nats.DeliverAll())
if err != nil {
return fmt.Errorf("subscribe to %s: %w", subject, err)
}
go func() {
<-ctx.Done()
sub.Unsubscribe()
}()
return nil
}
func (b *EventBus) Close() {
b.nc.Close()
}
Usage example — order event stream:
func main() {
bus, err := NewEventBus("nats://localhost:4222")
if err != nil {
log.Fatal(err)
}
defer bus.Close()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
err = bus.Subscribe(ctx, "orders.*", "order-processor", func(event natsbus.Event) {
switch event.Type {
case "order.created":
processOrder(event.Payload)
case "order.paid":
shipOrder(event.Payload)
case "order.cancelled":
refundOrder(event.Payload)
}
})
if err != nil {
log.Fatal(err)
}
bus.Publish(ctx, "orders.us", natsbus.Event{
Type: "order.created",
Source: "checkout-service",
Payload: json.RawMessage(`{"id":"ORD-001","amount":99.99}`),
})
}
Key design points:
- JetStream persistence, so acknowledged publishes survive restarts and disconnects within the stream's retention limits
- Durable subscribers, resume from last position after reconnection
- ManualAck ensures messages are only confirmed after successful processing
- Auto-reconnect configuration (ReconnectWait, MaxReconnects)
- Stream configuration for message retention policy (7 days, 1M messages)
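Acknowledging only after successful processing gives at-least-once delivery, so a handler can see the same message twice (for example when an Ack is lost). Handlers therefore benefit from being idempotent. One possible approach, sketched below with hypothetical names, is a bounded in-memory set of recently seen message IDs. It assumes the publisher attaches a unique ID to every event and that per-process memory is acceptable; a shared store would be needed across instances.

```go
package main

import "sync"

// seenSet remembers the most recent message IDs so a redelivered message can be
// recognised and skipped. IDs are assumed to be set by the publisher.
type seenSet struct {
	mu    sync.Mutex
	ids   map[string]struct{}
	order []string // insertion order, used for FIFO eviction
	limit int
}

func newSeenSet(limit int) *seenSet {
	return &seenSet{ids: make(map[string]struct{}), limit: limit}
}

// FirstTime records id and reports true only the first time it is seen.
// Once more than limit IDs are stored, the oldest one is forgotten.
func (s *seenSet) FirstTime(id string) bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	if _, dup := s.ids[id]; dup {
		return false
	}
	s.ids[id] = struct{}{}
	s.order = append(s.order, id)
	if len(s.order) > s.limit {
		oldest := s.order[0]
		s.order = s.order[1:]
		delete(s.ids, oldest)
	}
	return true
}
```

Inside a subscription callback, a duplicate would simply be acknowledged and skipped: check `FirstTime(id)`, and only call the business handler when it returns true.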
Pattern 4: Event-Driven Architecture — Go Channels
Go channels are naturally suited for in-process event-driven architecture. Combined with generics, you can build type-safe event buses.
+----------+ +----------+ +----------+
| OrderSvc | |EventBus | | NotifySvc|
+----------+ | | +----------+
| | orders | |
|---------->| -----> |--------->|
| | | +----------+
| | payments| | AccSvc |
| | -----> | +----------+
| | |--------->|
| +----------+
package eventbus
import (
"context"
"fmt"
"sync"
)
type Event[T any] struct {
Topic string
Payload T
Err error
}
type Subscription[T any] struct {
ch chan Event[T]
cancel context.CancelFunc
}
type Bus[T any] struct {
subscribers map[string][]chan Event[T]
mu sync.RWMutex
bufferSize int
}
func NewBus[T any](bufferSize int) *Bus[T] {
return &Bus[T]{
subscribers: make(map[string][]chan Event[T]),
bufferSize: bufferSize,
}
}
func (b *Bus[T]) Publish(ctx context.Context, topic string, payload T) error {
b.mu.RLock()
subs := b.subscribers[topic]
b.mu.RUnlock()
event := Event[T]{
Topic: topic,
Payload: payload,
}
for _, ch := range subs {
select {
case ch <- event:
case <-ctx.Done():
return ctx.Err()
default:
go func(c chan Event[T]) {
select {
case c <- event:
case <-ctx.Done():
}
}(ch)
}
}
return nil
}
func (b *Bus[T]) Subscribe(ctx context.Context, topic string) *Subscription[T] {
ch := make(chan Event[T], b.bufferSize)
b.mu.Lock()
b.subscribers[topic] = append(b.subscribers[topic], ch)
b.mu.Unlock()
ctx, cancel := context.WithCancel(ctx)
go func() {
<-ctx.Done()
b.mu.Lock()
subs := b.subscribers[topic]
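for i, sub := range subs {
if sub == ch {
// Copy instead of shifting in place: Publish may still be
// ranging over the old backing array outside the lock.
next := make([]chan Event[T], 0, len(subs)-1)
next = append(next, subs[:i]...)
next = append(next, subs[i+1:]...)
b.subscribers[topic] = next
break
}
}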
b.mu.Unlock()
close(ch)
}()
return &Subscription[T]{ch: ch, cancel: cancel}
}
func (s *Subscription[T]) Events() <-chan Event[T] {
return s.ch
}
func (s *Subscription[T]) Unsubscribe() {
s.cancel()
}
Usage example — order event stream:
type OrderEvent struct {
OrderID string
Status string
Amount float64
}
func main() {
bus := eventbus.NewBus[OrderEvent](256)
ctx := context.Background()
sub := bus.Subscribe(ctx, "orders")
go func() {
for event := range sub.Events() {
fmt.Printf("order %s: %s ($%.2f)\n",
event.Payload.OrderID,
event.Payload.Status,
event.Payload.Amount,
)
}
}()
bus.Publish(ctx, "orders", OrderEvent{
OrderID: "ORD-001",
Status: "created",
Amount: 99.99,
})
bus.Publish(ctx, "orders", OrderEvent{
OrderID: "ORD-001",
Status: "paid",
Amount: 99.99,
})
}
Key design points:
- Generic event bus, type-safe
- Non-blocking publish, async send when channel is full (per-subscriber ordering is not guaranteed while overflow sends are in flight)
- Context cancellation auto-cleans up subscriptions
- Independent channels per subscriber, no cross-interference
- Zero external dependencies, pure Go standard library
Pattern 5: Production Deployment — Connection Management & Reconnection Strategy
Production environments require handling connection management, reconnection backoff, connection rate limiting, health checks, and other operational concerns.
+--------+ +----------+ +--------+
| Client |---->| Gateway |---->| Service|
+--------+ +----------+ +--------+
| | |
| 1.Connect | |
|------------->| |
| | 2.Rate limit |
| |--- |
| | | |
| |<-- |
| 3.Upgrade WS| |
|<------------>| |
| | |
| 4.Register | |
| |--------------->|
| | |
| 5.Heartbeat | |
|<--ping/pong->| |
| | |
| 6.Disconnect| |
|<------------X| |
| | 7.Unregister |
| |--------------->|
| | |
| 8.Exponential| |
| backoff | |
|---->---->---->| |
package connmanager
import (
"context"
"fmt"
"log"
"math/rand"
"net/http"
"sync"
"sync/atomic"
"time"
"github.com/gorilla/websocket"
)
type ConnectionManager struct {
maxConnections int64
currentConns atomic.Int64
connections map[string]*ManagedConn
mu sync.RWMutex
heartbeatInterval time.Duration
writeTimeout time.Duration
maxMessageSize int64
}
type ManagedConn struct {
ID string
Conn *websocket.Conn
Connected time.Time
LastPing time.Time
ctx context.Context
cancel context.CancelFunc
// gorilla/websocket allows only one concurrent writer, so pings and
// broadcasts serialize on this mutex.
writeMu sync.Mutex
}
type ConnConfig struct {
MaxConnections int64
HeartbeatInterval time.Duration
WriteTimeout time.Duration
MaxMessageSize int64
}
func NewConnectionManager(cfg ConnConfig) *ConnectionManager {
return &ConnectionManager{
maxConnections: cfg.MaxConnections,
connections: make(map[string]*ManagedConn),
heartbeatInterval: cfg.HeartbeatInterval,
writeTimeout: cfg.WriteTimeout,
maxMessageSize: cfg.MaxMessageSize,
}
}
func (m *ConnectionManager) HandleConnection(w http.ResponseWriter, r *http.Request, handler func([]byte)) {
// Reserve a slot atomically; a separate Load followed by Add lets
// concurrent upgrades overshoot the limit.
if m.currentConns.Add(1) > m.maxConnections {
m.currentConns.Add(-1)
http.Error(w, "too many connections", http.StatusServiceUnavailable)
return
}
upgrader := websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
// Accepts every origin; restrict this before exposing the endpoint.
CheckOrigin: func(r *http.Request) bool { return true },
}
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
m.currentConns.Add(-1) // release the reserved slot
return
}
connID := generateConnID()
// r.Context() is cancelled as soon as this handler returns, so the
// connection gets its own context.
ctx, cancel := context.WithCancel(context.Background())
mc := &ManagedConn{
ID: connID,
Conn: conn,
Connected: time.Now(),
LastPing: time.Now(),
ctx: ctx,
cancel: cancel,
}
m.mu.Lock()
m.connections[connID] = mc
m.mu.Unlock()
go m.readPump(mc, handler)
go m.writePump(mc)
}
func (m *ConnectionManager) readPump(mc *ManagedConn, handler func([]byte)) {
defer m.removeConnection(mc)
mc.Conn.SetReadLimit(m.maxMessageSize)
// A peer that stays silent for two heartbeat intervals is treated as dead.
mc.Conn.SetReadDeadline(time.Now().Add(2 * m.heartbeatInterval))
mc.Conn.SetPongHandler(func(string) error {
mc.LastPing = time.Now()
mc.Conn.SetReadDeadline(time.Now().Add(2 * m.heartbeatInterval))
return nil
})
for {
_, message, err := mc.Conn.ReadMessage()
if err != nil {
if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseNormalClosure) {
log.Printf("conn %s unexpected close: %v", mc.ID, err)
}
return
}
handler(message)
}
}
func (m *ConnectionManager) writePump(mc *ManagedConn) {
ticker := time.NewTicker(m.heartbeatInterval)
defer func() {
ticker.Stop()
mc.Conn.Close()
}()
for {
select {
case <-ticker.C:
mc.writeMu.Lock()
mc.Conn.SetWriteDeadline(time.Now().Add(m.writeTimeout))
err := mc.Conn.WriteMessage(websocket.PingMessage, nil)
mc.writeMu.Unlock()
if err != nil {
return
}
case <-mc.ctx.Done():
// removeConnection cancelled the context; stop pinging.
return
}
}
}
func (m *ConnectionManager) removeConnection(mc *ManagedConn) {
mc.cancel()
mc.Conn.Close()
m.mu.Lock()
delete(m.connections, mc.ID)
m.mu.Unlock()
m.currentConns.Add(-1)
}
func (m *ConnectionManager) Broadcast(message []byte) error {
m.mu.RLock()
defer m.mu.RUnlock()
for _, mc := range m.connections {
mc.writeMu.Lock()
mc.Conn.SetWriteDeadline(time.Now().Add(m.writeTimeout))
err := mc.Conn.WriteMessage(websocket.TextMessage, message)
mc.writeMu.Unlock()
if err != nil {
log.Printf("broadcast to %s failed: %v", mc.ID, err)
}
}
return nil
}
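func (m *ConnectionManager) Stats() (total int64, active int) {
// The map is mutated by connection goroutines, so read it under the lock.
m.mu.RLock()
defer m.mu.RUnlock()
return m.currentConns.Load(), len(m.connections)
}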
func generateConnID() string {
return fmt.Sprintf("%d-%06d", time.Now().UnixNano(), rand.Intn(1000000))
}
Client reconnection strategy:
package reconnect
import (
"context"
"fmt"
"log"
"math"
"math/rand"
"sync"
"time"
"github.com/gorilla/websocket"
)
type ReconnectConfig struct {
MaxRetries int
BaseDelay time.Duration
MaxDelay time.Duration
Jitter float64
}
func DefaultReconnectConfig() ReconnectConfig {
return ReconnectConfig{
MaxRetries: 100,
BaseDelay: 1 * time.Second,
MaxDelay: 30 * time.Second,
Jitter: 0.25,
}
}
type ReconnectingClient struct {
url string
config ReconnectConfig
conn *websocket.Conn
mu sync.Mutex
}
func NewReconnectingClient(url string, config ReconnectConfig) *ReconnectingClient {
return &ReconnectingClient{
url: url,
config: config,
}
}
func (c *ReconnectingClient) Connect(ctx context.Context) (*websocket.Conn, error) {
var lastErr error
for attempt := 0; attempt < c.config.MaxRetries; attempt++ {
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
}
conn, _, err := websocket.DefaultDialer.DialContext(ctx, c.url, nil)
if err == nil {
c.mu.Lock()
c.conn = conn
c.mu.Unlock()
return conn, nil
}
lastErr = err
delay := c.backoff(attempt)
log.Printf("connect attempt %d failed: %v, retrying in %v", attempt+1, err, delay)
select {
case <-ctx.Done():
return nil, ctx.Err()
case <-time.After(delay):
}
}
return nil, fmt.Errorf("max retries (%d) exceeded: %w", c.config.MaxRetries, lastErr)
}
func (c *ReconnectingClient) backoff(attempt int) time.Duration {
delay := float64(c.config.BaseDelay) * math.Pow(2, float64(attempt))
if delay > float64(c.config.MaxDelay) {
delay = float64(c.config.MaxDelay)
}
jitter := delay * c.config.Jitter
delay = delay - jitter + rand.Float64()*jitter*2
return time.Duration(delay)
}
func (c *ReconnectingClient) Listen(ctx context.Context, handler func([]byte)) {
for {
conn, err := c.Connect(ctx)
if err != nil {
if ctx.Err() != nil {
return
}
continue
}
for {
_, message, err := conn.ReadMessage()
if err != nil {
log.Printf("read error: %v, reconnecting...", err)
conn.Close()
break
}
handler(message)
}
}
}
Key design points:
- Connection count limit, preventing resource exhaustion
- Exponential backoff + jitter reconnection, avoiding reconnection storms
- Heartbeat detection, auto-cleanup of dead connections
- Connection statistics, supporting operational monitoring
- Graceful disconnect, resource cleanup on context cancellation
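The connection count limit caps how many sockets exist, but it does not limit how fast they arrive, which is what matters during a reconnection storm. One way to add connection rate limiting is a token bucket in front of the upgrade. The sketch below is a hand-rolled illustration with hypothetical names; in practice the `golang.org/x/time/rate` package offers a ready-made limiter.

```go
package main

import (
	"sync"
	"time"
)

// tokenBucket admits at most burst upgrades at once and refills at rate
// tokens per second.
type tokenBucket struct {
	mu     sync.Mutex
	rate   float64
	burst  float64
	tokens float64
	last   float64 // time of the last refill, in seconds
}

func newTokenBucket(rate float64, burst int) *tokenBucket {
	return &tokenBucket{rate: rate, burst: float64(burst), tokens: float64(burst)}
}

// allowAt takes the current time in seconds so the logic can be exercised
// without sleeping.
func (b *tokenBucket) allowAt(now float64) bool {
	b.mu.Lock()
	defer b.mu.Unlock()
	if elapsed := now - b.last; elapsed > 0 {
		b.tokens += elapsed * b.rate
		if b.tokens > b.burst {
			b.tokens = b.burst
		}
		b.last = now
	}
	if b.tokens < 1 {
		return false
	}
	b.tokens--
	return true
}

// Allow is the production entry point, driven by the wall clock.
func (b *tokenBucket) Allow() bool {
	return b.allowAt(float64(time.Now().UnixNano()) / 1e9)
}
```

A handler would call `Allow()` before upgrading and reject the request with a 429 or 503 status when it returns false, so that well-behaved clients back off instead of piling onto the gateway.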
5 Common Pitfalls and Fixes
Pitfall 1: Concurrent WebSocket Writes
❌ Wrong:
go func() {
conn.WriteMessage(websocket.TextMessage, msg1)
}()
go func() {
conn.WriteMessage(websocket.TextMessage, msg2)
}()
gorilla/websocket connections are not concurrency-safe. Multiple goroutines writing simultaneously will panic.
✅ Correct:
type SafeConn struct {
conn *websocket.Conn
mu sync.Mutex
}
func (s *SafeConn) WriteMessage(msgType int, data []byte) error {
s.mu.Lock()
defer s.mu.Unlock()
return s.conn.WriteMessage(msgType, data)
}
Pitfall 2: Forgetting to Flush SSE
❌ Wrong:
fmt.Fprintf(w, "data: %s\n\n", message)
Data is buffered in the HTTP response writer, client never receives it.
✅ Correct:
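fmt.Fprintf(w, "data: %s\n\n", message)
// Check the assertion: if the writer does not implement http.Flusher,
// an unchecked flusher is nil and calling Flush on it panics.
if flusher, ok := w.(http.Flusher); ok {
flusher.Flush()
}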
Pitfall 3: NATS Subscriber Not Acking
❌ Wrong:
sub, _ := js.Subscribe("orders", func(msg *nats.Msg) {
processOrder(msg.Data)
})
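Without nats.ManualAck(), an async JetStream subscription acknowledges each message automatically when the callback returns. A message whose processing failed is therefore confirmed anyway and is never redelivered. With ManualAck but no Ack call, the opposite happens: the message is redelivered after every AckWait until MaxDeliver is reached.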
✅ Correct:
sub, _ := js.Subscribe("orders", func(msg *nats.Msg) {
if err := processOrder(msg.Data); err != nil {
msg.Nak()
return
}
msg.Ack()
}, nats.ManualAck())
Pitfall 4: Reconnection Without Backoff
❌ Wrong:
for {
conn, err := dial(url)
if err != nil {
continue
}
}
100K clients reconnecting simultaneously instantly overwhelms the server.
✅ Correct:
for attempt := 0; ; attempt++ {
conn, err := dial(url)
if err != nil {
delay := backoff(attempt)
time.Sleep(delay)
continue
}
}
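The corrected loop above calls a `backoff` helper without defining it, and `time.Sleep` cannot be interrupted on shutdown. A minimal sketch of both pieces follows. Note that it uses the "full jitter" variant (a random delay between zero and the capped exponential value), which differs from the ±25% jitter in Pattern 5; the function names and the 1s/30s limits are assumptions for illustration.

```go
package main

import (
	"context"
	"math/rand"
	"time"
)

// backoffDelay returns base*2^attempt capped at max, then applies full jitter:
// a uniformly random delay in [0, capped]. randFloat is injected for testing.
func backoffDelay(attempt int, base, max time.Duration, randFloat func() float64) time.Duration {
	d := base
	for i := 0; i < attempt && d < max; i++ {
		d *= 2
	}
	if d > max {
		d = max
	}
	return time.Duration(randFloat() * float64(d))
}

// sleepCtx waits for d or until ctx is cancelled, whichever comes first.
func sleepCtx(ctx context.Context, d time.Duration) error {
	t := time.NewTimer(d)
	defer t.Stop()
	select {
	case <-t.C:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// retry calls dial until it succeeds or ctx is cancelled.
func retry(ctx context.Context, dial func() error) error {
	for attempt := 0; ; attempt++ {
		if err := dial(); err == nil {
			return nil
		}
		delay := backoffDelay(attempt, time.Second, 30*time.Second, rand.Float64)
		if err := sleepCtx(ctx, delay); err != nil {
			return err
		}
	}
}
```

Full jitter spreads reconnects across the whole delay window, which flattens the arrival curve more than a narrow jitter band at the cost of some clients retrying very early.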
Pitfall 5: Channel Broadcast Blocking on Slow Consumers
❌ Wrong:
for _, ch := range subscribers {
ch <- message
}
One slow consumer blocks all subscribers.
✅ Correct:
for _, ch := range subscribers {
select {
case ch <- message:
default:
log.Printf("slow consumer, dropping message")
}
}
Error Troubleshooting Reference
| Symptom | Possible Cause | Investigation | Solution |
|---|---|---|---|
| WebSocket connections frequently disconnect | Heartbeat timeout, network jitter | Check ping/pong interval, measure network latency | Increase heartbeat interval, add reconnection logic |
| concurrent write to websocket connection panic | Multiple goroutines writing WebSocket simultaneously | Search for concurrent WriteMessage calls | Use mutex or single goroutine for writes |
| SSE client not receiving data | Forgot to Flush | Check if response writer calls Flush | Call flusher.Flush() after each write |
| NATS messages consumed repeatedly | No Ack or Ack timeout | Check JetStream MaxDeliver config | Use ManualAck, Ack after successful processing |
| Reconnection storm overwhelms server | No backoff strategy | Monitor connection establishment rate | Exponential backoff + jitter reconnection |
| Goroutine count keeps growing | Connections not cleaned up, channels not closed | pprof goroutine profile | Ensure goroutines and channels are cleaned on disconnect |
| Memory keeps growing | Message backlog, connection leaks | pprof heap profile | Limit channel buffer, add backpressure mechanism |
| Events out of order | Multiple instances processing concurrently | Check if partitioned by key | Use consistent hashing or sticky sessions |
| High broadcast latency | Serial sending, slow consumers | Monitor per-connection send latency | Parallel send + timeout skip slow consumers |
| Graceful shutdown failure | Connections not closed, messages lost | Check shutdown flow | Drain mode: stop accepting new connections, wait for existing to complete |
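The drain mode in the last row can be sketched as a small gate that refuses new connections once shutdown begins and signals when the existing ones have finished. The `drainGate` type below is hypothetical and only tracks counts; closing the sockets themselves (ideally in staggered batches so clients do not reconnect all at once) is left to the caller.

```go
package main

import "sync"

// drainGate tracks in-flight connections and refuses new ones once draining starts.
type drainGate struct {
	mu       sync.Mutex
	draining bool
	active   int
	idle     chan struct{} // closed when draining and no connections remain
}

func newDrainGate() *drainGate {
	return &drainGate{idle: make(chan struct{})}
}

// Enter registers a connection; it returns false when the server is draining.
func (g *drainGate) Enter() bool {
	g.mu.Lock()
	defer g.mu.Unlock()
	if g.draining {
		return false
	}
	g.active++
	return true
}

// Leave must be called exactly once for every successful Enter.
func (g *drainGate) Leave() {
	g.mu.Lock()
	defer g.mu.Unlock()
	g.active--
	if g.draining && g.active == 0 {
		close(g.idle)
	}
}

// Drain stops admitting connections and returns a channel that is closed once
// every admitted connection has left.
func (g *drainGate) Drain() <-chan struct{} {
	g.mu.Lock()
	defer g.mu.Unlock()
	if !g.draining {
		g.draining = true
		if g.active == 0 {
			close(g.idle)
		}
	}
	return g.idle
}
```

A shutdown routine would call `Drain()`, then wait on the returned channel with a timeout before exiting, so that a few stuck connections cannot block the deployment indefinitely.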
Advanced Optimization Techniques
Optimization 1: WebSocket Connection Sharding
Split large numbers of connections across multiple Hubs to reduce lock contention:
package shardhub
import (
"hash/fnv"
)
type ShardedHub struct {
shards []*Hub
count int
}
func NewShardedHub(shardCount int) *ShardedHub {
shards := make([]*Hub, shardCount)
for i := range shards {
shards[i] = NewHub()
go shards[i].Run()
}
return &ShardedHub{shards: shards, count: shardCount}
}
func (s *ShardedHub) getShard(key string) *Hub {
h := fnv.New32a()
h.Write([]byte(key))
return s.shards[h.Sum32()%uint32(s.count)]
}
func (s *ShardedHub) Register(client *Client) {
shard := s.getShard(client.room)
shard.register <- client
}
func (s *ShardedHub) Broadcast(room string, message []byte) {
shard := s.getShard(room)
shard.broadcast <- message
}
Optimization 2: SSE Multiplexing
Push multiple event types over a single SSE connection, reducing connection count:
func (b *Broker) ServeHTTP(w http.ResponseWriter, r *http.Request) {
flusher, ok := w.(http.Flusher)
if !ok {
http.Error(w, "streaming not supported", http.StatusInternalServerError)
return
}
w.Header().Set("Content-Type", "text/event-stream")
clientChan := make(chan Event, 64)
topics := r.URL.Query()["topic"]
for _, topic := range topics {
b.subscribe(topic, clientChan)
}
for {
select {
case event := <-clientChan:
fmt.Fprintf(w, "event: %s\n", event.Event)
fmt.Fprintf(w, "data: %s\n\n", event.Data)
flusher.Flush()
case <-r.Context().Done():
return
}
}
}
Optimization 3: NATS Consumer Groups
Multiple consumers share a durable name for load balancing:
func (b *EventBus) SubscribeGroup(ctx context.Context, subject, group, durable string, handler func(Event)) error {
sub, err := b.js.Subscribe(subject, func(msg *nats.Msg) {
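var event Event
if err := json.Unmarshal(msg.Data, &event); err != nil {
// Do not hand a zero-value event to the handler.
log.Printf("unmarshal event: %v", err)
msg.Nak()
return
}
handler(event)
msg.Ack()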
},
nats.Durable(durable),
nats.Queue(group),
nats.ManualAck(),
nats.DeliverAll(),
)
if err != nil {
return err
}
go func() {
<-ctx.Done()
sub.Unsubscribe()
}()
return nil
}
Realtime Technology Comparison
| Feature | WebSocket | SSE | NATS | Go channel |
|---|---|---|---|---|
| Communication | Bidirectional | Unidirectional (server→client) | Bidirectional | Bidirectional |
| Protocol | WS | HTTP | TCP | In-process |
| Reconnect support | Manual implementation | Built-in (Last-Event-ID) | Built-in | N/A |
| Message persistence | None | None | JetStream | None |
| Browser support | All | All | N/A | N/A |
| Proxy/CDN friendly | Poor | Good | N/A | N/A |
| Throughput | High | Medium | Very high | Very high |
| Latency (rough order of magnitude, workload-dependent) | ~1ms | ~10ms | ~0.5ms | ~0.01ms |
| Use case | Chat, collaborative editing | Notifications, quotes | Microservice event bus | In-process event-driven |
| Production recommendation | ⭐⭐⭐⭐ | ⭐⭐⭐⭐ | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐ |
Summary
The core of Go realtime systems isn't choosing which protocol, but answering five questions: How are connections managed? How are messages routed? How is backpressure handled? How is reconnection backed off? How are events persisted? WebSocket Hub answers "connection management," SSE answers "simple push," NATS answers "message persistence and routing," Go channels answer "in-process event streaming," and ConnectionManager answers "reconnection and rate limiting." Master these 5 patterns, and you'll have the core methodology for production-grade Go realtime system design.