Go Realtime System Design: 5 Production Patterns from WebSocket to Event Streaming

Backend Development

When 100K WebSocket Connections Disconnect Simultaneously: A Realtime Production Nightmare

Friday, 8 PM, on a livestream platform with 500K concurrent users. A config change triggers a rolling deployment and every WebSocket connection drops at once, so 500K clients try to reconnect at the same moment. The avalanche takes the gateway down instantly. Worse, the reconnection storm backs up the message queue, consumers can't keep up, latency spikes from 50ms to 30 seconds, and the user experience collapses entirely.

This isn't an isolated case. Building realtime systems in Go goes far beyond "upgrade to WebSocket". You need to manage connection lifecycles, route messages, handle backpressure and reconnection storms, and guarantee ordered event delivery. This article covers 5 production-grade realtime patterns for building robust Go realtime systems.


Core Concepts Reference

Technology | Purpose | Key Features | Typical Use Case
WebSocket | Full-duplex realtime communication | Persistent connection, low latency, bidirectional push | Chat systems, collaborative editing, realtime gaming
SSE | Server-to-client unidirectional push | Plain HTTP, automatic reconnect, UTF-8 text events | Notification push, stock quotes, log streams
NATS | High-performance messaging system | Pub/sub, JetStream persistence, clustering | Microservice event bus, IoT data distribution
Go channel | In-process event passing | Type-safe, select multiplexing, closeable | Event-driven architecture, inter-goroutine communication
gorilla/websocket | Go WebSocket library | Upgrader, ping/pong control frames, one concurrent reader and one concurrent writer per connection | Production-grade WebSocket services

5 Challenges of Realtime System Design

Challenge 1: Connection Lifecycle Management

+--------+     +--------+     +--------+
| Client |---->| Server |----| Client |
+--------+     +--------+     +--------+
     |              |              |
     |  Connect     |   Connect    |
     |------------->|<-------------|
     |              |              |
     |  Heartbeat   |   Heartbeat  |
     |<--ping/pong->|<--ping/pong->|
     |              |              |
     |  Disconnect  |   Disconnect |
     |<------------X|X------------>|
     |              |              |
     |  Reconnect   |   Reconnect  |
     |  Storm       |   Storm      |
     |=============>|<=============|

100K connections disconnecting and reconnecting simultaneously without backoff is self-DDoS.

Challenge 2: Message Routing and Fan-out

One message needs to be pushed to 10K subscribers. One goroutine per subscriber? Or a shared writer goroutine?
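One common answer is to give each subscriber a bounded send queue drained by its own writer goroutine, which is the role writePump plays in Pattern 1. The publisher then only performs cheap non-blocking enqueues, and a slow network write delays just that one subscriber. The sketch below makes this concrete. The names are illustrative, and the counter stands in for a real socket write.

```go
package main

import (
	"fmt"
	"sync"
)

// subscriber models one connection: a bounded send queue that a dedicated
// writer goroutine drains (the role writePump plays in the Hub).
type subscriber struct {
	queue chan string
}

// fanOut enqueues msg for every subscriber without blocking and returns the
// number of subscribers whose queue was full (slow consumers).
func fanOut(subs []*subscriber, msg string) (dropped int) {
	for _, s := range subs {
		select {
		case s.queue <- msg:
		default:
			dropped++
		}
	}
	return dropped
}

func main() {
	const n = 10_000
	var (
		wg        sync.WaitGroup
		mu        sync.Mutex
		delivered int
	)

	subs := make([]*subscriber, n)
	for i := range subs {
		subs[i] = &subscriber{queue: make(chan string, 16)}
		wg.Add(1)
		go func(s *subscriber) { // per-subscriber writer
			defer wg.Done()
			for range s.queue {
				mu.Lock()
				delivered++ // a real writer would write to the socket here
				mu.Unlock()
			}
		}(subs[i])
	}

	dropped := fanOut(subs, "hello")
	for _, s := range subs {
		close(s.queue) // let the writers exit
	}
	wg.Wait()
	fmt.Println("delivered:", delivered, "dropped:", dropped)
}
```

Goroutines start with small stacks, so 10K writer goroutines is normally affordable. The cost that matters is the total buffered memory across all queues.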

Challenge 3: Backpressure and Flow Control

Producer speed far exceeds consumer speed. What happens when the channel is full? Drop? Block? Degrade?

Challenge 4: Event Ordering

Events for the same user must be delivered in order, but how do multiple instances guarantee this in a distributed environment?

Challenge 5: Graceful Degradation

Fall back to SSE when WebSocket is unavailable, fall back to polling when SSE is unavailable. How to design the degradation strategy?


5 Production-Grade Realtime Patterns

Pattern 1: WebSocket Hub — Chat System

The WebSocket Hub is the classic realtime pattern: a central Hub manages all connections, and messages are broadcast through the Hub to every subscriber.

                    +-------+
                    |  Hub  |
                    +-------+
                   /   |   |   \
                  v    v   v    v
              +----+ +----+ +----+ +----+
              | C1 | | C2 | | C3 | | C4 |
              +----+ +----+ +----+ +----+
              
  Client sends message --> Hub broadcasts --> All clients receive
package wschat

import (
    "log"
    "net/http"
    "sync"
    "time"

    "github.com/gorilla/websocket"
)

var upgrader = websocket.Upgrader{
    ReadBufferSize:  1024,
    WriteBufferSize: 1024,
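    // Accepting every origin is fine for a demo; in production compare
    // r.Header.Get("Origin") against an allowlist to block cross-site
    // WebSocket hijacking.
    CheckOrigin: func(r *http.Request) bool {
        return true
    },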
}

type Message struct {
    Username string `json:"username"`
    Content  string `json:"content"`
    Room     string `json:"room"`
}

type Client struct {
    hub  *Hub
    conn *websocket.Conn
    send chan []byte
    room string
}

type Hub struct {
    clients    map[*Client]bool
    broadcast  chan []byte
    register   chan *Client
    unregister chan *Client
    mu         sync.RWMutex
    rooms      map[string]map[*Client]bool
}

func NewHub() *Hub {
    return &Hub{
        broadcast:  make(chan []byte, 256),
        register:   make(chan *Client),
        unregister: make(chan *Client),
        clients:    make(map[*Client]bool),
        rooms:      make(map[string]map[*Client]bool),
    }
}

func (h *Hub) Run() {
    for {
        select {
        case client := <-h.register:
            h.mu.Lock()
            h.clients[client] = true
            if h.rooms[client.room] == nil {
                h.rooms[client.room] = make(map[*Client]bool)
            }
            h.rooms[client.room][client] = true
            h.mu.Unlock()

        case client := <-h.unregister:
            h.mu.Lock()
            if _, ok := h.clients[client]; ok {
                delete(h.clients, client)
                if room, ok := h.rooms[client.room]; ok {
                    delete(room, client)
                    if len(room) == 0 {
                        delete(h.rooms, client.room)
                    }
                }
                close(client.send)
            }
            h.mu.Unlock()

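        case message := <-h.broadcast:
            // Collect slow clients first, then remove them under the write
            // lock, instead of swapping locks in the middle of the range loop.
            var slow []*Client
            h.mu.RLock()
            for client := range h.clients {
                select {
                case client.send <- message:
                default:
                    slow = append(slow, client) // send buffer full
                }
            }
            h.mu.RUnlock()

            if len(slow) > 0 {
                h.mu.Lock()
                for _, client := range slow {
                    delete(h.clients, client)
                    // Also drop the client from its room so h.rooms does not leak.
                    if room, ok := h.rooms[client.room]; ok {
                        delete(room, client)
                        if len(room) == 0 {
                            delete(h.rooms, client.room)
                        }
                    }
                    close(client.send) // writePump sends a close frame and exits
                }
                h.mu.Unlock()
            }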
        }
    }
}

func (c *Client) readPump() {
    defer func() {
        c.hub.unregister <- c
        c.conn.Close()
    }()

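    c.conn.SetReadLimit(512)
    // Without a read deadline a silently dead peer is never detected. Give the
    // peer 60s to answer; writePump pings every 54s (90% of this window), and
    // each pong pushes the deadline out again.
    c.conn.SetReadDeadline(time.Now().Add(60 * time.Second))
    c.conn.SetPongHandler(func(string) error {
        return c.conn.SetReadDeadline(time.Now().Add(60 * time.Second))
    })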

    for {
        _, message, err := c.conn.ReadMessage()
        if err != nil {
            if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
                log.Printf("read error: %v", err)
            }
            break
        }
        c.hub.broadcast <- message
    }
}

func (c *Client) writePump() {
    ticker := time.NewTicker(54 * time.Second)
    defer func() {
        ticker.Stop()
        c.conn.Close()
    }()

    for {
        select {
        case message, ok := <-c.send:
            c.conn.SetWriteDeadline(time.Now().Add(10 * time.Second))
            if !ok {
                c.conn.WriteMessage(websocket.CloseMessage, []byte{})
                return
            }

            w, err := c.conn.NextWriter(websocket.TextMessage)
            if err != nil {
                return
            }
            w.Write(message)

            n := len(c.send)
            for i := 0; i < n; i++ {
                w.Write([]byte{'\n'})
                w.Write(<-c.send)
            }

            if err := w.Close(); err != nil {
                return
            }

        case <-ticker.C:
            c.conn.SetWriteDeadline(time.Now().Add(10 * time.Second))
            if err := c.conn.WriteMessage(websocket.PingMessage, nil); err != nil {
                return
            }
        }
    }
}

func ServeWs(hub *Hub, w http.ResponseWriter, r *http.Request) {
    conn, err := upgrader.Upgrade(w, r, nil)
    if err != nil {
        log.Println(err)
        return
    }

    room := r.URL.Query().Get("room")
    if room == "" {
        room = "default"
    }

    client := &Client{
        hub:  hub,
        conn: conn,
        send: make(chan []byte, 256),
        room: room,
    }
    hub.register <- client

    go client.writePump()
    go client.readPump()
}

Key design points:

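  • All map mutations happen in the single Run goroutine, driven by the register/unregister/broadcast channels; the RWMutex only matters if other goroutines (for example a stats endpoint) read the maps
  • Each Client has its own readPump and writePump goroutine, so every connection has exactly one reader and one writer
  • writePump batches sends: it drains the n messages already queued into the same frame, separated by newlines, so the client must split on '\n'
  • ping/pong heartbeat detection, 54-second interval
  • Slow consumers are disconnected when their send channel is full
  • Rooms are recorded in h.rooms, but broadcast still delivers to every connected client; Message.Room is never consulted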
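The Hub above tracks rooms but broadcasts to every client. The sketch below shows one way to add room-scoped routing. It assumes the router is owned by a single goroutine such as Hub.Run, so it takes no locks. roomRouter and its methods are illustrative names, not part of the code above.

```go
package main

import "fmt"

// roomRouter indexes send queues by room. It takes no locks: it is meant to
// be owned by a single goroutine such as Hub.Run.
type roomRouter struct {
	rooms map[string]map[chan []byte]struct{}
}

func newRoomRouter() *roomRouter {
	return &roomRouter{rooms: make(map[string]map[chan []byte]struct{})}
}

func (r *roomRouter) Join(room string, send chan []byte) {
	if r.rooms[room] == nil {
		r.rooms[room] = make(map[chan []byte]struct{})
	}
	r.rooms[room][send] = struct{}{}
}

func (r *roomRouter) Leave(room string, send chan []byte) {
	delete(r.rooms[room], send) // no-op if the room does not exist
	if len(r.rooms[room]) == 0 {
		delete(r.rooms, room)
	}
}

// Broadcast delivers msg only to members of room and returns the members
// whose queue was full, so the caller can evict them as slow consumers.
func (r *roomRouter) Broadcast(room string, msg []byte) (slow []chan []byte) {
	for send := range r.rooms[room] {
		select {
		case send <- msg:
		default:
			slow = append(slow, send)
		}
	}
	return slow
}

func main() {
	r := newRoomRouter()
	alice, bob := make(chan []byte, 8), make(chan []byte, 8)
	r.Join("golang", alice)
	r.Join("rust", bob)

	r.Broadcast("golang", []byte("hello gophers"))
	fmt.Println(len(alice), len(bob)) // 1 0: bob is in another room
}
```

Inside the Hub, the broadcast channel would then carry a (room, payload) pair, for example a decoded Message, instead of raw bytes.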

Pattern 2: Server-Sent Events (SSE) — Realtime Update Push

SSE is lighter than WebSocket. It runs over plain HTTP, and the browser's EventSource reconnects automatically and can resume from the last event ID it received.

+--------+          +--------+
| Client |          | Server |
+--------+          +--------+
     |                   |
     |  GET /events     |
     |------------------>|
     |                   |
     |  Content-Type:   |
     |  text/event-stream|
     |<------------------|
     |                   |
     |  data: {"price":  |
     |   42000}          |
     |<------------------|
     |                   |
     |  id: 123          |
     |  data: {"price":  |
     |   42050}          |
     |<------------------|
package sse

import (
    "fmt"
    "log"
    "net/http"
    "sync"
)

type Event struct {
    ID    string
    Event string
    Data  string
}

type Broker struct {
    clients     map[chan Event]bool
    newClients  chan chan Event
    deadClients chan chan Event
    mu          sync.RWMutex
}

func NewBroker() *Broker {
    return &Broker{
        clients:     make(map[chan Event]bool),
        newClients:  make(chan chan Event),
        deadClients: make(chan chan Event),
    }
}

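func (b *Broker) Start() {
    for {
        select {
        case c := <-b.newClients:
            b.mu.Lock()
            b.clients[c] = true
            n := len(b.clients)
            b.mu.Unlock()
            log.Printf("client connected, total: %d", n)

        case c := <-b.deadClients:
            b.mu.Lock()
            // Notify (slow client) and ServeHTTP (disconnect) can both report
            // the same channel; closing it twice would panic.
            _, ok := b.clients[c]
            if ok {
                delete(b.clients, c)
                close(c) // under the lock, so Notify cannot be sending on it
            }
            n := len(b.clients)
            b.mu.Unlock()
            if ok {
                log.Printf("client disconnected, total: %d", n)
            }
        }
    }
}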

func (b *Broker) ServeHTTP(w http.ResponseWriter, r *http.Request) {
    flusher, ok := w.(http.Flusher)
    if !ok {
        http.Error(w, "streaming not supported", http.StatusInternalServerError)
        return
    }

    w.Header().Set("Content-Type", "text/event-stream")
    w.Header().Set("Cache-Control", "no-cache")
    w.Header().Set("Connection", "keep-alive")
    w.Header().Set("Access-Control-Allow-Origin", "*")

    clientChan := make(chan Event, 64)
    b.newClients <- clientChan

    notify := r.Context().Done()
    go func() {
        <-notify
        b.deadClients <- clientChan
    }()

    for {
        select {
        case event, ok := <-clientChan:
            if !ok {
                return
            }
            if event.ID != "" {
                fmt.Fprintf(w, "id: %s\n", event.ID)
            }
            if event.Event != "" {
                fmt.Fprintf(w, "event: %s\n", event.Event)
            }
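            // An SSE data line ends at '\n': a payload that contains newlines
            // must be split into one "data:" line per line.
            fmt.Fprintf(w, "data: %s\n\n", event.Data)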
            flusher.Flush()
        case <-r.Context().Done():
            return
        }
    }
}

func (b *Broker) Notify(event Event) {
    b.mu.RLock()
    defer b.mu.RUnlock()
    for client := range b.clients {
        select {
        case client <- event:
        default:
            go func(c chan Event) {
                b.deadClients <- c
            }(client)
        }
    }
}

Usage example — stock quote push:

func main() {
    broker := NewBroker()
    go broker.Start()

    go func() {
        eventID := 0
        for {
            time.Sleep(1 * time.Second)
            eventID++
            price := fetchStockPrice("BTC") // placeholder for a real quote source
            broker.Notify(Event{
                ID:    fmt.Sprintf("%d", eventID),
                Event: "price-update",
                Data:  fmt.Sprintf(`{"symbol":"BTC","price":%.2f}`, price),
            })
        }
    }()

    http.Handle("/events", broker)
    log.Fatal(http.ListenAndServe(":8080", nil))
}

Key design points:

  • HTTP-based, no protocol upgrade needed, proxy/CDN friendly
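  • The browser's EventSource sends a Last-Event-ID header when it reconnects; the Broker above writes id: fields but does not replay missed events, which requires a server-side buffer of recent events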
  • http.Flusher ensures data is pushed immediately
  • Context cancellation auto-cleans up clients
  • Slow consumers auto-disconnected when channel is full

Pattern 3: NATS Message Queue Integration — Microservice Event Bus

NATS is one of the most popular messaging systems in the Go ecosystem (the server itself is written in Go). Core NATS can move millions of messages per second on a single server, which makes it a natural fit for realtime systems.

+--------+    +--------+    +--------+
|ServiceA|    | NATS   |    |ServiceB|
+--------+    | Server |    +--------+
     |        +--------+         |
     | publish |          | subscribe
     |-------->|          |-------->|
     |         |          |         |
     |         |  publish |         |
     |         |<---------|         |
     | subscribe          |         |
     |<--------|          |         |
+--------+    +--------+    +--------+
|ServiceC|    | NATS   |    |ServiceD|
+--------+    | JetStream   +--------+
              +--------+
package natsbus

import (
    "context"
    "encoding/json"
    "fmt"
    "log"
    "sync"
    "time"

    "github.com/nats-io/nats.go"
)

type Event struct {
    Type    string          `json:"type"`
    Payload json.RawMessage `json:"payload"`
    Source  string          `json:"source"`
}

type EventBus struct {
    nc       *nats.Conn
    js       nats.JetStreamContext
    handlers map[string][]func(Event)
    mu       sync.RWMutex
}

func NewEventBus(url string) (*EventBus, error) {
    nc, err := nats.Connect(url,
        nats.ReconnectWait(2*time.Second),
        nats.MaxReconnects(60),
        nats.DisconnectErrHandler(func(nc *nats.Conn, err error) {
            log.Printf("NATS disconnected: %v", err)
        }),
        nats.ReconnectHandler(func(nc *nats.Conn) {
            log.Printf("NATS reconnected to %s", nc.ConnectedUrl())
        }),
    )
    if err != nil {
        return nil, fmt.Errorf("connect to NATS: %w", err)
    }

    js, err := nc.JetStream()
    if err != nil {
        return nil, fmt.Errorf("get JetStream context: %w", err)
    }

    return &EventBus{
        nc:       nc,
        js:       js,
        handlers: make(map[string][]func(Event)),
    }, nil
}

func (b *EventBus) Publish(ctx context.Context, subject string, event Event) error {
    data, err := json.Marshal(event)
    if err != nil {
        return fmt.Errorf("marshal event: %w", err)
    }

    select {
    case <-ctx.Done():
        return ctx.Err()
    default:
    }

    if _, err := b.js.Publish(subject, data); err != nil {
        return fmt.Errorf("publish to %s: %w", subject, err)
    }
    return nil
}

func (b *EventBus) Subscribe(ctx context.Context, subject, durable string, handler func(Event)) error {
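    // One stream per durable keeps the example short, but JetStream rejects a
    // stream whose subjects overlap an existing stream. In production, create
    // one stream per subject space up front and attach several consumers to it.
    streamName := fmt.Sprintf("STREAM_%s", durable)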

    if _, err := b.js.StreamInfo(streamName); err != nil {
        if _, err := b.js.AddStream(&nats.StreamConfig{
            Name:     streamName,
            Subjects: []string{subject},
            Retention: nats.LimitsPolicy,
            MaxMsgs:  1000000,
            MaxAge:   7 * 24 * time.Hour,
        }); err != nil {
            return fmt.Errorf("add stream %s: %w", streamName, err)
        }
    }

    sub, err := b.js.Subscribe(subject, func(msg *nats.Msg) {
        var event Event
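        if err := json.Unmarshal(msg.Data, &event); err != nil {
            log.Printf("unmarshal event: %v", err)
            // A malformed payload will never parse. Term stops redelivery;
            // Nak would redeliver it until MaxDeliver is reached.
            msg.Term()
            return
        }
        handler(event)
        msg.Ack() // reached only after the handler returns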
    }, nats.Durable(durable), nats.ManualAck(), nats.DeliverAll())
    if err != nil {
        return fmt.Errorf("subscribe to %s: %w", subject, err)
    }

    go func() {
        <-ctx.Done()
        sub.Unsubscribe()
    }()

    return nil
}

func (b *EventBus) Close() {
    b.nc.Close()
}

Usage example — order event stream:

func main() {
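    bus, err := natsbus.NewEventBus("nats://localhost:4222")
    if err != nil {
        log.Fatal(err)
    }
    defer bus.Close()

    // Cancel on Ctrl-C so the subscription is torn down cleanly.
    ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt)
    defer stop()

    // processOrder, shipOrder and refundOrder stand in for business logic.
    err = bus.Subscribe(ctx, "orders.*", "order-processor", func(event natsbus.Event) {
        switch event.Type {
        case "order.created":
            processOrder(event.Payload)
        case "order.paid":
            shipOrder(event.Payload)
        case "order.cancelled":
            refundOrder(event.Payload)
        }
    })
    if err != nil {
        log.Fatal(err)
    }

    // Subscribe created the stream, so this publish has a stream to land in.
    if err := bus.Publish(ctx, "orders.us", natsbus.Event{
        Type:    "order.created",
        Source:  "checkout-service",
        Payload: json.RawMessage(`{"id":"ORD-001","amount":99.99}`),
    }); err != nil {
        log.Printf("publish: %v", err)
    }

    <-ctx.Done() // keep consuming until interrupted
}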

Key design points:

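  • JetStream persists messages in the stream within its retention limits; delivery is at-least-once, so handlers should be idempotent
  • Durable subscribers resume from the last acknowledged message after reconnection
  • ManualAck: a message is acked only after the handler returns, so if the process dies mid-handler JetStream redelivers it after AckWait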
  • Auto-reconnect configuration (ReconnectWait, MaxReconnects)
  • Stream configuration for message retention policy (7 days, 1M messages)

Pattern 4: Event-Driven Architecture — Go Channels

Go channels are naturally suited for in-process event-driven architecture. Combined with generics, you can build type-safe event buses.

+----------+     +----------+     +----------+
| OrderSvc |     |EventBus  |     | NotifySvc|
+----------+     |          |     +----------+
     |           |  orders  |          |
     |---------->|  ----->  |--------->|
     |           |          |     +----------+
     |           |  payments|     | AccSvc   |
     |           |  ----->  |     +----------+
     |           |          |--------->|
     |           +----------+
package eventbus

import (
    "context"
    "fmt"
    "sync"
)

type Event[T any] struct {
    Topic   string
    Payload T
    Err     error
}

type Subscription[T any] struct {
    ch     chan Event[T]
    cancel context.CancelFunc
}

type Bus[T any] struct {
    subscribers map[string][]chan Event[T]
    mu          sync.RWMutex
    bufferSize  int
}

func NewBus[T any](bufferSize int) *Bus[T] {
    return &Bus[T]{
        subscribers: make(map[string][]chan Event[T]),
        bufferSize:  bufferSize,
    }
}

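// Publish delivers payload to every subscriber of topic without blocking.
// The read lock is held for the whole loop, so Unsubscribe (which closes
// channels under the write lock) can never close a channel mid-send.
// Events for a full subscriber are dropped and reported, rather than sent
// from a background goroutine, which would reorder events and could race
// with close.
func (b *Bus[T]) Publish(ctx context.Context, topic string, payload T) error {
    if err := ctx.Err(); err != nil {
        return err
    }
    event := Event[T]{
        Topic:   topic,
        Payload: payload,
    }

    b.mu.RLock()
    defer b.mu.RUnlock()

    dropped := 0
    subs := b.subscribers[topic]
    for _, ch := range subs {
        select {
        case ch <- event:
        default:
            dropped++ // subscriber buffer full: slow consumer
        }
    }
    if dropped > 0 {
        return fmt.Errorf("topic %q: dropped event for %d of %d subscribers", topic, dropped, len(subs))
    }
    return nil
}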

func (b *Bus[T]) Subscribe(ctx context.Context, topic string) *Subscription[T] {
    ch := make(chan Event[T], b.bufferSize)

    b.mu.Lock()
    b.subscribers[topic] = append(b.subscribers[topic], ch)
    b.mu.Unlock()

    ctx, cancel := context.WithCancel(ctx)

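    go func() {
        <-ctx.Done()
        b.mu.Lock()
        defer b.mu.Unlock()
        // Build a new slice instead of append(subs[:i], subs[i+1:]...), which
        // would mutate a backing array that Publish may still be reading.
        subs := b.subscribers[topic]
        kept := make([]chan Event[T], 0, len(subs))
        for _, sub := range subs {
            if sub != ch {
                kept = append(kept, sub)
            }
        }
        b.subscribers[topic] = kept
        close(ch) // under the write lock: no Publish holding the read lock is sending
    }()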

    return &Subscription[T]{ch: ch, cancel: cancel}
}

func (s *Subscription[T]) Events() <-chan Event[T] {
    return s.ch
}

func (s *Subscription[T]) Unsubscribe() {
    s.cancel()
}

Usage example — order event stream:

type OrderEvent struct {
    OrderID string
    Status  string
    Amount  float64
}

func main() {
    bus := eventbus.NewBus[OrderEvent](256)
    ctx := context.Background()

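    sub := bus.Subscribe(ctx, "orders")
    done := make(chan struct{})
    go func() {
        defer close(done)
        for event := range sub.Events() {
            fmt.Printf("order %s: %s ($%.2f)\n",
                event.Payload.OrderID,
                event.Payload.Status,
                event.Payload.Amount,
            )
        }
    }()

    bus.Publish(ctx, "orders", OrderEvent{
        OrderID: "ORD-001",
        Status:  "created",
        Amount:  99.99,
    })

    bus.Publish(ctx, "orders", OrderEvent{
        OrderID: "ORD-001",
        Status:  "paid",
        Amount:  99.99,
    })

    // Without this, main would return before the subscriber prints anything.
    // Unsubscribe closes the channel; range still drains buffered events first.
    sub.Unsubscribe()
    <-done
}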

Key design points:

  • Generic event bus, type-safe
  • Non-blocking publish: a full subscriber channel never stalls the publisher
  • Context cancellation auto-cleans up subscriptions
  • Independent channels per subscriber, no cross-interference
  • Zero external dependencies, pure Go standard library

Pattern 5: Production Deployment — Connection Management & Reconnection Strategy

Production environments require handling connection management, reconnection backoff, connection rate limiting, health checks, and other operational concerns.

+--------+     +----------+     +--------+
| Client |---->| Gateway  |---->| Service|
+--------+     +----------+     +--------+
     |              |                |
     |  1.Connect   |                |
     |------------->|                |
     |              |  2.Rate limit  |
     |              |---             |
     |              |  |             |
     |              |<--             |
     |  3.Upgrade WS|                |
     |<------------>|                |
     |              |                |
     |  4.Register  |                |
     |              |--------------->|
     |              |                |
     |  5.Heartbeat |                |
     |<--ping/pong->|                |
     |              |                |
     |  6.Disconnect|                |
     |<------------X|                |
     |              |  7.Unregister  |
     |              |--------------->|
     |              |                |
     |  8.Exponential|               |
     |  backoff     |                |
     |---->---->---->|                |
package connmanager

import (
    "context"
    "fmt"
    "log"
    "math/rand"
    "net/http"
    "sync"
    "sync/atomic"
    "time"

    "github.com/gorilla/websocket"
)

type ConnectionManager struct {
    maxConnections    int64
    currentConns      atomic.Int64
    connections       map[string]*ManagedConn
    mu                sync.RWMutex
    heartbeatInterval time.Duration
    writeTimeout      time.Duration
    maxMessageSize    int64
}

type ManagedConn struct {
    ID        string
    Conn      *websocket.Conn
    Connected time.Time
    LastPing  time.Time   // time of the last pong; written only by readPump
    send      chan []byte // outbound queue; writePump is the only writer to Conn
    ctx       context.Context
    cancel    context.CancelFunc
}

type ConnConfig struct {
    MaxConnections    int64
    HeartbeatInterval time.Duration
    WriteTimeout      time.Duration
    MaxMessageSize    int64
}

func NewConnectionManager(cfg ConnConfig) *ConnectionManager {
    return &ConnectionManager{
        maxConnections:    cfg.MaxConnections,
        connections:       make(map[string]*ManagedConn),
        heartbeatInterval: cfg.HeartbeatInterval,
        writeTimeout:      cfg.WriteTimeout,
        maxMessageSize:    cfg.MaxMessageSize,
    }
}

func (m *ConnectionManager) HandleConnection(w http.ResponseWriter, r *http.Request, handler func([]byte)) {
    // Reserve a slot atomically; a separate Load-then-Add check would let
    // concurrent upgrades overshoot the limit.
    if m.currentConns.Add(1) > m.maxConnections {
        m.currentConns.Add(-1)
        http.Error(w, "too many connections", http.StatusServiceUnavailable)
        return
    }

    upgrader := websocket.Upgrader{
        ReadBufferSize:  1024,
        WriteBufferSize: 1024,
        // Accepts any origin; validate Origin against an allowlist in production.
        CheckOrigin: func(r *http.Request) bool { return true },
    }

    conn, err := upgrader.Upgrade(w, r, nil)
    if err != nil {
        m.currentConns.Add(-1) // Upgrade has already replied with an HTTP error
        return
    }

    // Not r.Context(): net/http cancels it as soon as this handler returns,
    // which happens right after the pumps below are started.
    ctx, cancel := context.WithCancel(context.Background())
    mc := &ManagedConn{
        ID:        generateConnID(),
        Conn:      conn,
        Connected: time.Now(),
        LastPing:  time.Now(),
        send:      make(chan []byte, 64),
        ctx:       ctx,
        cancel:    cancel,
    }

    m.mu.Lock()
    m.connections[mc.ID] = mc
    m.mu.Unlock()

    go m.readPump(mc, handler)
    go m.writePump(mc)
}

func (m *ConnectionManager) readPump(mc *ManagedConn, handler func([]byte)) {
    defer m.removeConnection(mc)

    // A peer gets two heartbeat intervals to answer a ping before
    // ReadMessage fails with a timeout and the connection is removed.
    pongWait := 2 * m.heartbeatInterval
    mc.Conn.SetReadLimit(m.maxMessageSize)
    mc.Conn.SetReadDeadline(time.Now().Add(pongWait))
    mc.Conn.SetPongHandler(func(string) error {
        mc.LastPing = time.Now()
        return mc.Conn.SetReadDeadline(time.Now().Add(pongWait))
    })

    for {
        _, message, err := mc.Conn.ReadMessage()
        if err != nil {
            if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseNormalClosure) {
                log.Printf("conn %s unexpected close: %v", mc.ID, err)
            }
            return
        }
        handler(message)
    }
}

// writePump is the only goroutine that writes to mc.Conn, because
// gorilla/websocket allows at most one concurrent writer per connection.
func (m *ConnectionManager) writePump(mc *ManagedConn) {
    ticker := time.NewTicker(m.heartbeatInterval)
    defer func() {
        ticker.Stop()
        mc.Conn.Close()
    }()

    for {
        select {
        case msg := <-mc.send:
            mc.Conn.SetWriteDeadline(time.Now().Add(m.writeTimeout))
            if err := mc.Conn.WriteMessage(websocket.TextMessage, msg); err != nil {
                return
            }
        case <-ticker.C:
            mc.Conn.SetWriteDeadline(time.Now().Add(m.writeTimeout))
            if err := mc.Conn.WriteMessage(websocket.PingMessage, nil); err != nil {
                return
            }
        case <-mc.ctx.Done():
            return
        }
    }
}

func (m *ConnectionManager) removeConnection(mc *ManagedConn) {
    mc.cancel() // stops writePump
    mc.Conn.Close()

    m.mu.Lock()
    delete(m.connections, mc.ID)
    m.mu.Unlock()
    m.currentConns.Add(-1)
}

// Broadcast queues message for every connection without blocking. Writing
// directly to each Conn here would race with writePump's pings.
func (m *ConnectionManager) Broadcast(message []byte) error {
    m.mu.RLock()
    defer m.mu.RUnlock()

    skipped := 0
    for _, mc := range m.connections {
        select {
        case mc.send <- message:
        default:
            skipped++ // send buffer full: slow consumer
        }
    }
    if skipped > 0 {
        return fmt.Errorf("broadcast skipped %d slow connections", skipped)
    }
    return nil
}

func (m *ConnectionManager) Stats() (total int64, active int) {
    m.mu.RLock()
    defer m.mu.RUnlock()
    return m.currentConns.Load(), len(m.connections)
}

func generateConnID() string {
    return fmt.Sprintf("%d-%06d", time.Now().UnixNano(), rand.Intn(1000000))
}

Client reconnection strategy:

package reconnect

import (
    "context"
    "fmt"
    "log"
    "math"
    "math/rand"
    "sync"
    "time"

    "github.com/gorilla/websocket"
)

type ReconnectConfig struct {
    MaxRetries int
    BaseDelay  time.Duration
    MaxDelay   time.Duration
    Jitter     float64
}

func DefaultReconnectConfig() ReconnectConfig {
    return ReconnectConfig{
        MaxRetries: 100,
        BaseDelay:  1 * time.Second,
        MaxDelay:   30 * time.Second,
        Jitter:     0.25,
    }
}

type ReconnectingClient struct {
    url    string
    config ReconnectConfig
    conn   *websocket.Conn
    mu     sync.Mutex
}

func NewReconnectingClient(url string, config ReconnectConfig) *ReconnectingClient {
    return &ReconnectingClient{
        url:    url,
        config: config,
    }
}

func (c *ReconnectingClient) Connect(ctx context.Context) (*websocket.Conn, error) {
    var lastErr error

    for attempt := 0; attempt < c.config.MaxRetries; attempt++ {
        select {
        case <-ctx.Done():
            return nil, ctx.Err()
        default:
        }

        conn, _, err := websocket.DefaultDialer.DialContext(ctx, c.url, nil)
        if err == nil {
            c.mu.Lock()
            c.conn = conn
            c.mu.Unlock()
            return conn, nil
        }
        lastErr = err

        delay := c.backoff(attempt)
        log.Printf("connect attempt %d failed: %v, retrying in %v", attempt+1, err, delay)

        select {
        case <-ctx.Done():
            return nil, ctx.Err()
        case <-time.After(delay):
        }
    }

    return nil, fmt.Errorf("max retries (%d) exceeded: %w", c.config.MaxRetries, lastErr)
}

func (c *ReconnectingClient) backoff(attempt int) time.Duration {
    delay := float64(c.config.BaseDelay) * math.Pow(2, float64(attempt))
    if delay > float64(c.config.MaxDelay) {
        delay = float64(c.config.MaxDelay)
    }

    jitter := delay * c.config.Jitter
    delay = delay - jitter + rand.Float64()*jitter*2

    return time.Duration(delay)
}

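func (c *ReconnectingClient) Listen(ctx context.Context, handler func([]byte)) {
    for {
        conn, err := c.Connect(ctx)
        if err != nil {
            if ctx.Err() != nil {
                return
            }
            // Retries exhausted: pause for MaxDelay before starting a new
            // round instead of redialing immediately.
            select {
            case <-ctx.Done():
                return
            case <-time.After(c.config.MaxDelay):
            }
            continue
        }

        // Unblock ReadMessage when ctx is canceled (context.AfterFunc, Go 1.21+).
        stop := context.AfterFunc(ctx, func() { conn.Close() })
        for {
            _, message, err := conn.ReadMessage()
            if err != nil {
                log.Printf("read error: %v, reconnecting...", err)
                break
            }
            handler(message)
        }
        stop()
        conn.Close()

        // All clients see the drop at the same moment (e.g. a deployment), and
        // Connect's first dial has no delay, so wait a random slice of MaxDelay
        // (assumed > 0) first to spread the reconnecting herd.
        select {
        case <-ctx.Done():
            return
        case <-time.After(time.Duration(rand.Int63n(int64(c.config.MaxDelay)))):
        }
    }
}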

Key design points:

  • Connection count limit, preventing resource exhaustion
  • Exponential backoff + jitter reconnection, avoiding reconnection storms
  • Heartbeat detection, auto-cleanup of dead connections
  • Connection statistics, supporting operational monitoring
  • Graceful disconnect, resource cleanup on context cancellation

5 Common Pitfalls and Fixes

Pitfall 1: Concurrent WebSocket Writes

❌ Wrong:

go func() {
    conn.WriteMessage(websocket.TextMessage, msg1)
}()
go func() {
    conn.WriteMessage(websocket.TextMessage, msg2)
}()

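gorilla/websocket supports at most one concurrent reader and one concurrent writer per connection. Concurrent writes are a data race; the library detects many of them and panics with "concurrent write to websocket connection".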

✅ Correct:

type SafeConn struct {
    conn *websocket.Conn
    mu   sync.Mutex
}

func (s *SafeConn) WriteMessage(msgType int, data []byte) error {
    s.mu.Lock()
    defer s.mu.Unlock()
    return s.conn.WriteMessage(msgType, data)
}

Pitfall 2: Forgetting to Flush SSE

❌ Wrong:

fmt.Fprintf(w, "data: %s\n\n", message)

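Go's http.ResponseWriter buffers output, so the event stays in the buffer until the buffer fills or the handler returns. An SSE handler never returns, so the client may receive nothing.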

✅ Correct:

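flusher, ok := w.(http.Flusher) // check once, before streaming
if !ok {
    http.Error(w, "streaming not supported", http.StatusInternalServerError)
    return
}
fmt.Fprintf(w, "data: %s\n\n", message)
flusher.Flush() // push the event to the client now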

Pitfall 3: NATS Subscriber Not Acking

❌ Wrong:

sub, _ := js.Subscribe("orders", func(msg *nats.Msg) {
    processOrder(msg.Data)
})

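Without nats.ManualAck(), nats.go acknowledges each message automatically once the callback returns, even if processOrder failed, so a failed order is silently lost. The opposite mistake, ManualAck without ever calling Ack, makes JetStream redeliver the message after every AckWait until MaxDeliver is reached. JetStream has no built-in dead-letter queue; at that point it only publishes a max-deliveries advisory.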

✅ Correct:

sub, _ := js.Subscribe("orders", func(msg *nats.Msg) {
    if err := processOrder(msg.Data); err != nil {
        msg.Nak()
        return
    }
    msg.Ack()
}, nats.ManualAck())

Pitfall 4: Reconnection Without Backoff

❌ Wrong:

for {
    conn, err := dial(url)
    if err != nil {
        continue
    }
}

A tight retry loop spins the CPU on every client, and 100K clients doing it at once flood the server with connection attempts.

✅ Correct:

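for attempt := 0; ; attempt++ {
    conn, err := dial(url) // dial, backoff and handle are placeholders
    if err == nil {
        handle(conn) // hypothetical: use the connection
        break
    }
    time.Sleep(backoff(attempt)) // exponential backoff + jitter, as in ReconnectingClient.backoff
}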

Pitfall 5: Channel Broadcast Blocking on Slow Consumers

❌ Wrong:

for _, ch := range subscribers {
    ch <- message
}

One slow consumer blocks the publisher, and with it every other subscriber.

✅ Correct:

for _, ch := range subscribers {
    select {
    case ch <- message:
    default:
        log.Printf("slow consumer, dropping message")
    }
}

Error Troubleshooting Reference

Symptom | Possible Cause | Investigation | Solution
WebSocket connections frequently disconnect | Heartbeat timeout, network jitter | Compare ping interval with pong timeout, measure network latency | Keep the pong timeout longer than the ping interval; add client reconnection with backoff
"concurrent write to websocket connection" panic | Multiple goroutines writing to one WebSocket | Search for WriteMessage calls outside the connection's writer goroutine | Use a single writer goroutine per connection, or a mutex
SSE client not receiving data | Response not flushed | Check that the handler calls Flush after each event | Call flusher.Flush() after each write
NATS messages consumed repeatedly | No Ack, or processing slower than AckWait | Check the consumer's AckWait and MaxDeliver settings | Use ManualAck, Ack after successful processing, make handlers idempotent
Reconnection storm overwhelms server | No backoff strategy | Monitor connection establishment rate | Exponential backoff + jitter on clients; admission rate limiting on the server
Goroutine count keeps growing | Connections not cleaned up, channels not closed | pprof goroutine profile | Ensure goroutines and channels are cleaned up on disconnect
Memory keeps growing | Message backlog, connection leaks | pprof heap profile | Bound channel buffers, add a backpressure mechanism
Events out of order | Multiple instances or workers processing the same key concurrently | Check whether work is partitioned by key | Route each key to one worker or instance (consistent hashing, sticky sessions)
High broadcast latency | Serial sending, slow consumers | Monitor per-connection send latency | Per-connection writer goroutines; skip slow consumers on timeout
Graceful shutdown failure | Connections not closed, messages lost | Check the shutdown flow | Drain mode: stop accepting new connections, wait for existing ones to finish

Advanced Optimization Techniques

Optimization 1: WebSocket Connection Sharding

Split large numbers of connections across multiple Hubs so that no single Run goroutine (and its mutex) becomes the bottleneck:

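// ShardedHub must live in the same package as Hub and Client (wschat),
// because it uses their unexported fields.
package wschat

import "hash/fnv"

type ShardedHub struct {
    shards []*Hub
    count  int
}

func NewShardedHub(shardCount int) *ShardedHub {
    shards := make([]*Hub, shardCount)
    for i := range shards {
        shards[i] = NewHub()
        go shards[i].Run()
    }
    return &ShardedHub{shards: shards, count: shardCount}
}

func (s *ShardedHub) getShard(key string) *Hub {
    h := fnv.New32a()
    h.Write([]byte(key))
    return s.shards[h.Sum32()%uint32(s.count)]
}

func (s *ShardedHub) Register(client *Client) {
    shard := s.getShard(client.room)
    client.hub = shard // readPump must unregister from (and broadcast to) this shard
    shard.register <- client
}

// Broadcast routes by room to the owning shard. Hub.Run then delivers to
// every client in that shard, including other rooms hashed to it; strict
// room isolation needs room-scoped broadcasting inside Hub.
func (s *ShardedHub) Broadcast(room string, message []byte) {
    shard := s.getShard(room)
    shard.broadcast <- message
}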

Optimization 2: SSE Multiplexing

Push multiple event types over a single SSE connection, reducing connection count:

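// A variant of Broker.ServeHTTP that subscribes one connection to several
// topics, e.g. GET /events?topic=prices&topic=news. b.subscribe and
// b.unsubscribe are assumed helpers (not defined on the Broker above) that
// add and remove clientChan in a per-topic subscriber map.
func (b *Broker) ServeHTTP(w http.ResponseWriter, r *http.Request) {
    flusher, ok := w.(http.Flusher)
    if !ok {
        http.Error(w, "streaming not supported", http.StatusInternalServerError)
        return
    }
    w.Header().Set("Content-Type", "text/event-stream")
    w.Header().Set("Cache-Control", "no-cache")

    clientChan := make(chan Event, 64)
    topics := r.URL.Query()["topic"]

    for _, topic := range topics {
        b.subscribe(topic, clientChan)
    }
    defer func() {
        for _, topic := range topics {
            b.unsubscribe(topic, clientChan)
        }
    }()

    for {
        select {
        case event := <-clientChan:
            // The event name lets the browser dispatch with addEventListener(name, ...).
            fmt.Fprintf(w, "event: %s\n", event.Event)
            fmt.Fprintf(w, "data: %s\n\n", event.Data)
            flusher.Flush()
        case <-r.Context().Done():
            return
        }
    }
}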
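The multiplexing handler calls b.subscribe, which the Broker from Pattern 2 does not define. The sketch below shows one possible shape for such helpers: a per-topic set of client channels guarded by an RWMutex. The type name topicBroker is an assumption, and the Event field doubles as the topic name here.

```go
package main

import (
	"fmt"
	"sync"
)

// Event mirrors the SSE broker's event; Event doubles as the topic name.
type Event struct {
	Event string
	Data  string
}

// topicBroker keeps a per-topic set of client channels.
type topicBroker struct {
	mu     sync.RWMutex
	topics map[string]map[chan Event]struct{}
}

func newTopicBroker() *topicBroker {
	return &topicBroker{topics: make(map[string]map[chan Event]struct{})}
}

func (b *topicBroker) subscribe(topic string, ch chan Event) {
	b.mu.Lock()
	defer b.mu.Unlock()
	if b.topics[topic] == nil {
		b.topics[topic] = make(map[chan Event]struct{})
	}
	b.topics[topic][ch] = struct{}{}
}

func (b *topicBroker) unsubscribe(topic string, ch chan Event) {
	b.mu.Lock()
	defer b.mu.Unlock()
	delete(b.topics[topic], ch)
	if len(b.topics[topic]) == 0 {
		delete(b.topics, topic)
	}
}

// Publish delivers e to subscribers of e.Event without blocking and returns
// how many full client buffers dropped it. Channels are never closed here,
// so the handler's loop ends only through its request context.
func (b *topicBroker) Publish(e Event) (dropped int) {
	b.mu.RLock()
	defer b.mu.RUnlock()
	for ch := range b.topics[e.Event] {
		select {
		case ch <- e:
		default:
			dropped++
		}
	}
	return dropped
}

func main() {
	b := newTopicBroker()
	client := make(chan Event, 8)
	for _, t := range []string{"prices", "news"} { // ?topic=prices&topic=news
		b.subscribe(t, client)
	}
	b.Publish(Event{Event: "prices", Data: `{"BTC":42000}`})
	b.Publish(Event{Event: "sports", Data: "not subscribed"})
	b.Publish(Event{Event: "news", Data: "market open"})
	fmt.Println(len(client)) // 2: sports was filtered out
}
```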

Optimization 3: NATS Consumer Groups

Multiple instances join the same queue group, so each message is delivered to only one of them:

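func (b *EventBus) SubscribeGroup(ctx context.Context, subject, group, durable string, handler func(Event)) error {
    // QueueSubscribe load-balances messages across every subscriber in the
    // same queue group; each message goes to only one of them.
    sub, err := b.js.QueueSubscribe(subject, group, func(msg *nats.Msg) {
        var event Event
        if err := json.Unmarshal(msg.Data, &event); err != nil {
            log.Printf("unmarshal event: %v", err)
            msg.Term() // malformed payload: do not redeliver
            return
        }
        handler(event)
        msg.Ack()
    },
        nats.Durable(durable),
        nats.ManualAck(),
        nats.DeliverAll(),
    )
    if err != nil {
        return fmt.Errorf("queue subscribe to %s: %w", subject, err)
    }

    go func() {
        <-ctx.Done()
        sub.Unsubscribe()
    }()

    return nil
}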

Realtime Technology Comparison

Feature | WebSocket | SSE | NATS | Go channel
Communication | Bidirectional | Unidirectional (server→client) | Bidirectional (pub/sub, request/reply) | In-process send/receive
Protocol | WS (HTTP upgrade) | HTTP | TCP | In-process
Reconnect support | Manual implementation | Built-in (Last-Event-ID) | Built-in (client library) | N/A
Message persistence | None | None | JetStream | None
Browser support | All modern browsers | All modern browsers | N/A | N/A
Proxy/CDN friendly | Poor (proxies must pass the Upgrade handshake) | Good | N/A | N/A
Throughput | High | Medium | Very high | Very high
Typical latency (rough, environment-dependent) | ~1ms | ~10ms | ~0.5ms | ~0.01ms
Use case | Chat, collaborative editing | Notifications, quotes | Microservice event bus | In-process event-driven
Production recommendation | ⭐⭐⭐⭐ | ⭐⭐⭐⭐ | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐

Summary

The core of Go realtime systems isn't choosing which protocol, but answering five questions: How are connections managed? How are messages routed? How is backpressure handled? How is reconnection backed off? How are events persisted? WebSocket Hub answers "connection management," SSE answers "simple push," NATS answers "message persistence and routing," Go channels answer "in-process event streaming," and ConnectionManager answers "reconnection and rate limiting." Master these 5 patterns, and you'll have the core methodology for production-grade Go realtime system design.




#GoRealtimeSystems #WebSocket #SSE #MessageQueue #EventDriven #2026 #BackendDevelopment