Go Real-Time System Design in Practice: 5 Production Patterns from WebSocket to Event Streams


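When 100,000 WebSocket Connections Drop at Once: A Real-Time Production Nightmare

Friday, 8 p.m.: a live-streaming platform has 500,000 users online. A configuration change triggers a rolling deployment, every WebSocket connection drops at the same moment, and 500,000 clients try to reconnect simultaneously. The resulting avalanche takes the gateway down. Worse, the reconnect storm backs up the message queue, consumers cannot keep up, latency jumps from 50 ms to 30 seconds, and the user experience collapses.

This is not an isolated case. Building real-time systems in Go takes far more than "upgrading to WebSocket": you have to manage connection lifecycles, route messages, cope with backpressure and reconnect storms, and deliver events in order. This article walks through 5 production-grade real-time patterns to help you build robust real-time systems in Go.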


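Core Concepts at a Glance

Technology | Purpose | Key characteristics | Typical use cases
WebSocket | Full-duplex real-time communication | Persistent connection, low latency, push in both directions | Chat, collaborative editing, real-time games
SSE | One-way server push | Plain HTTP, automatic browser reconnect, UTF-8 text events | Notifications, stock tickers, log streaming
NATS | High-performance messaging system | Publish/subscribe, JetStream persistence, clustering | Microservice event bus, IoT data distribution
Go channel | In-process event passing | Type-safe, select multiplexing, closable | Event-driven architecture, goroutine-to-goroutine communication
gorilla/websocket | Go WebSocket library | ping/pong control frames, per-message compression; one concurrent reader and one concurrent writer per connection | Production WebSocket services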

The 5 Big Challenges of Real-Time System Design

Challenge 1: Connection lifecycle management

+--------+     +--------+     +--------+
| Client |---->| Server |<----| Client |
+--------+     +--------+     +--------+
     |              |              |
     |   connect    |   connect    |
     |------------->|<-------------|
     |              |              |
     |  heartbeat   |  heartbeat   |
     |<--ping/pong->|<--ping/pong->|
     |              |              |
     |  disconnect  |  disconnect  |
     |<------------X|X------------>|
     |              |              |
     |  reconnect   |  reconnect   |
     |    storm     |    storm     |
     |=============>|<=============|

If 100,000 connections drop and reconnect at the same moment with no backoff strategy, you are effectively DDoSing yourself.

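Challenge 2: Message routing and fan-out

One message has to reach 10,000 subscribers. How do you distribute it efficiently: one goroutine per subscriber, or a shared writer goroutine?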

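Challenge 3: Backpressure and flow control

When producers are much faster than consumers and a channel fills up, what should happen? Drop messages? Block? Degrade?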

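Challenge 4: Event ordering

Events for the same user must be delivered in order, but how do you guarantee that when several instances run in a distributed deployment?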

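Challenge 5: Graceful degradation

Fall back to SSE when WebSocket is unavailable, and to polling when SSE is unavailable. How should the fallback strategy be designed?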


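5 Production-Grade Real-Time Patterns

Pattern 1: WebSocket Hub for a Chat System

The WebSocket Hub is the classic real-time pattern: a central Hub manages every connection, and messages are broadcast to all subscribers through the Hub.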

                    +-------+
                    |  Hub  |
                    +-------+
                   /   |   |   \
                  v    v   v    v
              +----+ +----+ +----+ +----+
              | C1 | | C2 | | C3 | | C4 |
              +----+ +----+ +----+ +----+
              
  Client sends message --> Hub broadcasts --> every Client receives
package wschat

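import (
    "log"
    "net/http"
    "sync"
    "time"

    "github.com/gorilla/websocket"
)

var upgrader = websocket.Upgrader{
    ReadBufferSize:  1024,
    WriteBufferSize: 1024,
    // Accepting every Origin is for demos only; in production compare
    // r.Header.Get("Origin") against an allow-list to prevent
    // cross-site WebSocket hijacking.
    CheckOrigin: func(r *http.Request) bool {
        return true
    },
}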

type Message struct {
    Username string `json:"username"`
    Content  string `json:"content"`
    Room     string `json:"room"`
}

type Client struct {
    hub  *Hub
    conn *websocket.Conn
    send chan []byte
    room string
}

type Hub struct {
    clients    map[*Client]bool
    broadcast  chan []byte
    register   chan *Client
    unregister chan *Client
    mu         sync.RWMutex
    rooms      map[string]map[*Client]bool
}

func NewHub() *Hub {
    return &Hub{
        broadcast:  make(chan []byte, 256),
        register:   make(chan *Client),
        unregister: make(chan *Client),
        clients:    make(map[*Client]bool),
        rooms:      make(map[string]map[*Client]bool),
    }
}

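func (h *Hub) Run() {
    for {
        select {
        case client := <-h.register:
            h.mu.Lock()
            h.clients[client] = true
            if h.rooms[client.room] == nil {
                h.rooms[client.room] = make(map[*Client]bool)
            }
            h.rooms[client.room][client] = true
            h.mu.Unlock()

        case client := <-h.unregister:
            h.mu.Lock()
            h.removeLocked(client)
            h.mu.Unlock()

        case message := <-h.broadcast:
            // Hold the write lock for the whole loop. Deleting map entries
            // while ranging over the map is allowed in Go, and there is no
            // RUnlock/Lock gap in which another goroutine could change it.
            h.mu.Lock()
            for client := range h.clients {
                select {
                case client.send <- message:
                default:
                    // send buffer full: drop this slow consumer
                    h.removeLocked(client)
                }
            }
            h.mu.Unlock()
        }
    }
}

// removeLocked removes a client from clients and from its room, and closes
// its send channel exactly once. The caller must hold h.mu.
func (h *Hub) removeLocked(client *Client) {
    if _, ok := h.clients[client]; !ok {
        return
    }
    delete(h.clients, client)
    if room, ok := h.rooms[client.room]; ok {
        delete(room, client)
        if len(room) == 0 {
            delete(h.rooms, client.room)
        }
    }
    close(client.send)
}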

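func (c *Client) readPump() {
    defer func() {
        c.hub.unregister <- c
        c.conn.Close()
    }()

    c.conn.SetReadLimit(512)
    // The read deadline turns the heartbeat into dead-peer detection:
    // without it ReadMessage blocks forever on a silent connection. Each pong
    // pushes the deadline 60s ahead, a margin over writePump's 54s pings.
    c.conn.SetReadDeadline(time.Now().Add(60 * time.Second))
    c.conn.SetPongHandler(func(string) error {
        return c.conn.SetReadDeadline(time.Now().Add(60 * time.Second))
    })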

    for {
        _, message, err := c.conn.ReadMessage()
        if err != nil {
            if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
                log.Printf("read error: %v", err)
            }
            break
        }
        c.hub.broadcast <- message
    }
}

func (c *Client) writePump() {
    ticker := time.NewTicker(54 * time.Second)
    defer func() {
        ticker.Stop()
        c.conn.Close()
    }()

    for {
        select {
        case message, ok := <-c.send:
            c.conn.SetWriteDeadline(time.Now().Add(10 * time.Second))
            if !ok {
                c.conn.WriteMessage(websocket.CloseMessage, []byte{})
                return
            }

            w, err := c.conn.NextWriter(websocket.TextMessage)
            if err != nil {
                return
            }
            w.Write(message)

            n := len(c.send)
            for i := 0; i < n; i++ {
                w.Write([]byte{'\n'})
                w.Write(<-c.send)
            }

            if err := w.Close(); err != nil {
                return
            }

        case <-ticker.C:
            c.conn.SetWriteDeadline(time.Now().Add(10 * time.Second))
            if err := c.conn.WriteMessage(websocket.PingMessage, nil); err != nil {
                return
            }
        }
    }
}

func ServeWs(hub *Hub, w http.ResponseWriter, r *http.Request) {
    conn, err := upgrader.Upgrade(w, r, nil)
    if err != nil {
        log.Println(err)
        return
    }

    room := r.URL.Query().Get("room")
    if room == "" {
        room = "default"
    }

    client := &Client{
        hub:  hub,
        conn: conn,
        send: make(chan []byte, 256),
        room: room,
    }
    hub.register <- client

    go client.writePump()
    go client.readPump()
}

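Key Design Points

  • A single Run goroutine owns the client and room maps and receives register/unregister/broadcast requests over channels, so per-connection goroutines never compete for a lock
  • Each Client has its own readPump and writePump goroutine
  • writePump batches writes: the n messages already queued in send are written into one WebSocket frame, separated by newlines
  • ping/pong heartbeat at a 54-second interval
  • When a client's send channel is full, the Hub disconnects that slow consumer

Pattern 2: Server-Sent Events (SSE) for Real-Time Update Push

SSE is lighter than WebSocket: it runs over plain HTTP and natively supports automatic reconnection and resumption by event ID.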
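On the wire, an SSE event is a few `field: value` lines terminated by a blank line. One detail a single `data: %s` line does not handle is a payload containing newlines: each line needs its own `data:` field, and the browser joins them back with `\n`. `formatSSE` is a hypothetical helper illustrating that; it assumes `id` and `event` are single-line.

```go
package main

import (
	"strings"
)

// formatSSE renders one event in the text/event-stream format. Multi-line
// payloads are split into several "data:" lines, and a blank line ends the
// event. CRLF is normalized to LF before splitting.
func formatSSE(id, event, data string) string {
	var b strings.Builder
	if id != "" {
		b.WriteString("id: " + id + "\n")
	}
	if event != "" {
		b.WriteString("event: " + event + "\n")
	}
	data = strings.ReplaceAll(data, "\r\n", "\n")
	for _, line := range strings.Split(data, "\n") {
		b.WriteString("data: " + line + "\n")
	}
	b.WriteString("\n")
	return b.String()
}
```

In a handler this would be written with `fmt.Fprint(w, formatSSE(ev.ID, ev.Event, ev.Data))` followed by a flush.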

+--------+          +--------+
| Client |          | Server |
+--------+          +--------+
     |                   |
     |  GET /events     |
     |------------------>|
     |                   |
     |  Content-Type:   |
     |  text/event-stream|
     |<------------------|
     |                   |
     |  data: {"price":  |
     |   42000}          |
     |<------------------|
     |                   |
     |  id: 123          |
     |  data: {"price":  |
     |   42050}          |
     |<------------------|
package sse

import (
    "fmt"
    "log"
    "net/http"
    "sync"
)

type Event struct {
    ID    string
    Event string
    Data  string
}

type Broker struct {
    clients     map[chan Event]bool
    newClients  chan chan Event
    deadClients chan chan Event
    mu          sync.RWMutex
}

func NewBroker() *Broker {
    return &Broker{
        clients:     make(map[chan Event]bool),
        newClients:  make(chan chan Event),
        deadClients: make(chan chan Event),
    }
}

func (b *Broker) Start() {
    for {
        select {
        case c := <-b.newClients:
            b.mu.Lock()
            b.clients[c] = true
            b.mu.Unlock()
            log.Printf("client connected, total: %d", len(b.clients))

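        case c := <-b.deadClients:
            // A client can be reported dead twice (its request context ended
            // and Notify found its buffer full), so close it only once.
            b.mu.Lock()
            _, ok := b.clients[c]
            delete(b.clients, c)
            b.mu.Unlock()
            if ok {
                close(c)
            }
            log.Printf("client disconnected, total: %d", len(b.clients))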
        }
    }
}

func (b *Broker) ServeHTTP(w http.ResponseWriter, r *http.Request) {
    flusher, ok := w.(http.Flusher)
    if !ok {
        http.Error(w, "streaming not supported", http.StatusInternalServerError)
        return
    }

    w.Header().Set("Content-Type", "text/event-stream")
    w.Header().Set("Cache-Control", "no-cache")
    w.Header().Set("Connection", "keep-alive")
    w.Header().Set("Access-Control-Allow-Origin", "*")

    clientChan := make(chan Event, 64)
    b.newClients <- clientChan

    notify := r.Context().Done()
    go func() {
        <-notify
        b.deadClients <- clientChan
    }()

    for {
        select {
        case event, ok := <-clientChan:
            if !ok {
                return
            }
            if event.ID != "" {
                fmt.Fprintf(w, "id: %s\n", event.ID)
            }
            if event.Event != "" {
                fmt.Fprintf(w, "event: %s\n", event.Event)
            }
            fmt.Fprintf(w, "data: %s\n\n", event.Data)
            flusher.Flush()
        case <-r.Context().Done():
            return
        }
    }
}

func (b *Broker) Notify(event Event) {
    b.mu.RLock()
    defer b.mu.RUnlock()
    for client := range b.clients {
        select {
        case client <- event:
        default:
            go func(c chan Event) {
                b.deadClients <- c
            }(client)
        }
    }
}

Usage example: a market price feed:

// Assumes package main alongside the Broker above, importing fmt, log,
// net/http and time. fetchStockPrice is a placeholder for a real price source.
func main() {
    broker := NewBroker()
    go broker.Start()

    go func() {
        eventID := 0
        for {
            time.Sleep(1 * time.Second)
            eventID++
            price := fetchStockPrice("BTC")
            broker.Notify(Event{
                ID:    fmt.Sprintf("%d", eventID),
                Event: "price-update",
                Data:  fmt.Sprintf(`{"symbol":"BTC","price":%.2f}`, price),
            })
        }
    }()

    http.Handle("/events", broker)
    log.Fatal(http.ListenAndServe(":8080", nil))
}

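Key Design Points

  • Runs over plain HTTP with no protocol upgrade, so it generally passes through proxies and CDNs, provided they do not buffer the response
  • On reconnect the browser sends a Last-Event-ID header; resuming from it requires the server to keep recent events and replay them, which the Broker above does not do yet
  • http.Flusher pushes each event to the client immediately instead of leaving it in the response buffer
  • Request-context cancellation cleans up disconnected clients automatically
  • Slow consumers are disconnected automatically (removed when their channel is full)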

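Pattern 3: NATS Message Queue Integration for a Microservice Event Bus

NATS is one of the most popular messaging systems in the Go ecosystem and a natural fit for real-time systems. Core NATS can move millions of small messages per second on a single node; the often-quoted figure of around 15 million msg/s depends heavily on message size and hardware, and JetStream, which persists every message, achieves lower throughput.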

+----------+   publish   +-------------+   deliver   +----------+
| ServiceA |------------>|    NATS     |------------>| ServiceB |
+----------+             |   Server    |             +----------+
                         |             |
+----------+   deliver   | (JetStream  |   publish   +----------+
| ServiceC |<------------|  streams)   |<------------| ServiceD |
+----------+             +-------------+             +----------+
package natsbus

import (
    "context"
    "encoding/json"
    "fmt"
    "log"
    "sync"
    "time"

    "github.com/nats-io/nats.go"
)

type Event struct {
    Type    string          `json:"type"`
    Payload json.RawMessage `json:"payload"`
    Source  string          `json:"source"`
}

type EventBus struct {
    nc       *nats.Conn
    js       nats.JetStreamContext
    handlers map[string][]func(Event)
    mu       sync.RWMutex
}

func NewEventBus(url string) (*EventBus, error) {
    nc, err := nats.Connect(url,
        nats.ReconnectWait(2*time.Second),
        nats.MaxReconnects(60),
        nats.DisconnectErrHandler(func(nc *nats.Conn, err error) {
            log.Printf("NATS disconnected: %v", err)
        }),
        nats.ReconnectHandler(func(nc *nats.Conn) {
            log.Printf("NATS reconnected to %s", nc.ConnectedUrl())
        }),
    )
    if err != nil {
        return nil, fmt.Errorf("connect to NATS: %w", err)
    }

    js, err := nc.JetStream()
    if err != nil {
        return nil, fmt.Errorf("get JetStream context: %w", err)
    }

    return &EventBus{
        nc:       nc,
        js:       js,
        handlers: make(map[string][]func(Event)),
    }, nil
}

func (b *EventBus) Publish(ctx context.Context, subject string, event Event) error {
    data, err := json.Marshal(event)
    if err != nil {
        return fmt.Errorf("marshal event: %w", err)
    }

    select {
    case <-ctx.Done():
        return ctx.Err()
    default:
    }

    if _, err := b.js.Publish(subject, data); err != nil {
        return fmt.Errorf("publish to %s: %w", subject, err)
    }
    return nil
}

func (b *EventBus) Subscribe(ctx context.Context, subject, durable string, handler func(Event)) error {
    streamName := fmt.Sprintf("STREAM_%s", durable)

    if _, err := b.js.StreamInfo(streamName); err != nil {
        if _, err := b.js.AddStream(&nats.StreamConfig{
            Name:     streamName,
            Subjects: []string{subject},
            Retention: nats.LimitsPolicy,
            MaxMsgs:  1000000,
            MaxAge:   7 * 24 * time.Hour,
        }); err != nil {
            return fmt.Errorf("add stream %s: %w", streamName, err)
        }
    }

    sub, err := b.js.Subscribe(subject, func(msg *nats.Msg) {
        var event Event
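        if err := json.Unmarshal(msg.Data, &event); err != nil {
            log.Printf("unmarshal event: %v", err)
            // A malformed payload will never parse, so Nak would only cause
            // endless redelivery; Term tells JetStream to stop redelivering it.
            msg.Term()
            return
        }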
        handler(event)
        msg.Ack()
    }, nats.Durable(durable), nats.ManualAck(), nats.DeliverAll())
    if err != nil {
        return fmt.Errorf("subscribe to %s: %w", subject, err)
    }

    go func() {
        <-ctx.Done()
        sub.Unsubscribe()
    }()

    return nil
}

func (b *EventBus) Close() {
    b.nc.Close()
}

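Usage example: an order event stream:

func main() {
    bus, err := natsbus.NewEventBus("nats://localhost:4222")
    if err != nil {
        log.Fatal(err)
    }
    defer bus.Close()

    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    // Subscribe first: it creates the stream covering "orders.*", and
    // JetStream rejects publishes to subjects that no stream captures.
    // processOrder, shipOrder and refundOrder are placeholders.
    err = bus.Subscribe(ctx, "orders.*", "order-processor", func(event natsbus.Event) {
        switch event.Type {
        case "order.created":
            processOrder(event.Payload)
        case "order.paid":
            shipOrder(event.Payload)
        case "order.cancelled":
            refundOrder(event.Payload)
        }
    })
    if err != nil {
        log.Fatal(err)
    }

    if err := bus.Publish(ctx, "orders.us", natsbus.Event{
        Type:    "order.created",
        Source:  "checkout-service",
        Payload: json.RawMessage(`{"id":"ORD-001","amount":99.99}`),
    }); err != nil {
        log.Fatal(err)
    }

    // A real service would block here (for example until SIGTERM);
    // returning from main cancels the subscription immediately.
}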

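Key Design Points

  • JetStream persistence: messages survive consumer restarts and crashes, within the stream's retention limits
  • Durable subscribers resume from their last acknowledged position after reconnecting
  • ManualAck means a message is acknowledged only after the handler returns; a crash mid-handler leads to redelivery
  • Automatic reconnection is configured (ReconnectWait, MaxReconnects)
  • The stream configuration sets the retention policy (7 days, 1 million messages)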
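The `orders.*` subscription relies on NATS subject wildcards: tokens are separated by `.`, `*` matches exactly one token, and `>` (only as the last token) matches one or more remaining tokens. The function below is an illustrative re-implementation of those rules for reasoning about stream subjects; it is not part of nats.go.

```go
package main

import "strings"

// subjectMatches reports whether a concrete subject matches a NATS-style
// pattern using the "*" (one token) and ">" (one or more trailing tokens)
// wildcards.
func subjectMatches(pattern, subject string) bool {
	p := strings.Split(pattern, ".")
	s := strings.Split(subject, ".")
	for i, tok := range p {
		if tok == ">" {
			// ">" must be the last pattern token and needs at least one token.
			return i == len(p)-1 && len(s) > i
		}
		if i >= len(s) {
			return false
		}
		if tok != "*" && tok != s[i] {
			return false
		}
	}
	return len(p) == len(s)
}
```

So `orders.*` captures `orders.us` but not `orders.us.east`; a stream meant to hold every order event regardless of depth would use `orders.>`.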

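Pattern 4: Event-Driven Architecture with Go Channels

Go channels are a natural fit for in-process event-driven architecture. Combined with generics, they make it possible to build a type-safe event bus.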

+----------+     +----------+     +----------+
| OrderSvc |     |EventBus  |     | NotifySvc|
+----------+     |          |     +----------+
     |           |  orders  |          |
     |---------->|  ----->  |--------->|
     |           |          |     +----------+
     |           |  payments|     | AccSvc   |
     |           |  ----->  |     +----------+
     |           |          |--------->|
     |           +----------+
package eventbus

import (
    "context"
    "fmt"
    "sync"
)

type Event[T any] struct {
    Topic   string
    Payload T
    Err     error
}

type Subscription[T any] struct {
    ch     chan Event[T]
    cancel context.CancelFunc
}

type Bus[T any] struct {
    subscribers map[string][]chan Event[T]
    mu          sync.RWMutex
    bufferSize  int
}

func NewBus[T any](bufferSize int) *Bus[T] {
    return &Bus[T]{
        subscribers: make(map[string][]chan Event[T]),
        bufferSize:  bufferSize,
    }
}

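// Publish delivers payload to every subscriber of topic without blocking.
// Sends happen while holding the read lock, and Subscribe's cleanup removes a
// channel under the write lock before closing it, so a send can never hit a
// closed channel. A subscriber whose buffer is full misses this event; unlike
// spawning a goroutine per send, this never reorders events.
func (b *Bus[T]) Publish(ctx context.Context, topic string, payload T) error {
    if err := ctx.Err(); err != nil {
        return err
    }

    event := Event[T]{
        Topic:   topic,
        Payload: payload,
    }

    b.mu.RLock()
    defer b.mu.RUnlock()

    dropped := 0
    for _, ch := range b.subscribers[topic] {
        select {
        case ch <- event:
        default:
            dropped++ // slow subscriber: drop rather than stall the publisher
        }
    }
    if dropped > 0 {
        return fmt.Errorf("topic %s: dropped event for %d slow subscriber(s)", topic, dropped)
    }
    return nil
}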

func (b *Bus[T]) Subscribe(ctx context.Context, topic string) *Subscription[T] {
    ch := make(chan Event[T], b.bufferSize)

    b.mu.Lock()
    b.subscribers[topic] = append(b.subscribers[topic], ch)
    b.mu.Unlock()

    ctx, cancel := context.WithCancel(ctx)

    go func() {
        <-ctx.Done()
        b.mu.Lock()
        subs := b.subscribers[topic]
        for i, sub := range subs {
            if sub == ch {
                b.subscribers[topic] = append(subs[:i], subs[i+1:]...)
                break
            }
        }
        b.mu.Unlock()
        close(ch)
    }()

    return &Subscription[T]{ch: ch, cancel: cancel}
}

func (s *Subscription[T]) Events() <-chan Event[T] {
    return s.ch
}

func (s *Subscription[T]) Unsubscribe() {
    s.cancel()
}

Usage example: an order event stream:

type OrderEvent struct {
    OrderID string
    Status  string
    Amount  float64
}

func main() {
    bus := eventbus.NewBus[OrderEvent](256)
    ctx := context.Background()

    sub := bus.Subscribe(ctx, "orders")
    done := make(chan struct{})
    go func() {
        defer close(done)
        for event := range sub.Events() {
            fmt.Printf("order %s: %s ($%.2f)\n",
                event.Payload.OrderID,
                event.Payload.Status,
                event.Payload.Amount,
            )
        }
    }()

    bus.Publish(ctx, "orders", OrderEvent{
        OrderID: "ORD-001",
        Status:  "created",
        Amount:  99.99,
    })

    bus.Publish(ctx, "orders", OrderEvent{
        OrderID: "ORD-001",
        Status:  "paid",
        Amount:  99.99,
    })

    // Unsubscribe closes the channel once the subscription is removed; the
    // range loop drains the buffered events first, then exits.
    sub.Unsubscribe()
    <-done
}

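Key Design Points

  • Generic event bus: type-safe end to end
  • Non-blocking publish: a full subscriber channel never stalls the publisher
  • Cancelling the context cleans up the subscription automatically
  • Each subscriber has its own channel, so subscribers do not interfere with one another
  • Zero external dependencies: pure Go standard library

Pattern 5: Production Deployment (Connection Management and Reconnection Strategy)

In production you also have to handle operational concerns: connection management, reconnect backoff, connection limiting, and health checks.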

+--------+     +----------+     +--------+
| Client |---->| Gateway  |---->| Service|
+--------+     +----------+     +--------+
     |              |                |
     | 1.connect    |                |
     |------------->|                |
     |              | 2.rate-limit   |
     |              |---  check      |
     |              |  |             |
     |              |<--             |
     | 3.upgrade WS |                |
     |<------------>|                |
     |              |                |
     |              | 4.register     |
     |              |--------------->|
     |              |                |
     | 5.heartbeat  |                |
     |<--ping/pong->|                |
     |              |                |
     | 6.disconnect |                |
     |<------------X|                |
     |              | 7.unregister   |
     |              |--------------->|
     |              |                |
     | 8.backoff +  |                |
     |   reconnect  |                |
     |---->---->--->|                |
package connmanager

import (
    "context"
    "fmt"
    "log"
    "math/rand"
    "net/http"
    "sync"
    "sync/atomic"
    "time"

    "github.com/gorilla/websocket"
)

type ConnectionManager struct {
    maxConnections    int64
    currentConns      atomic.Int64
    connections       map[string]*ManagedConn
    mu                sync.RWMutex
    heartbeatInterval time.Duration
    writeTimeout      time.Duration
    maxMessageSize    int64
}

type ManagedConn struct {
    ID        string
    Conn      *websocket.Conn
    Connected time.Time
    LastPing  time.Time // time of the last pong; only readPump touches it
    ctx       context.Context
    cancel    context.CancelFunc
    writeMu   sync.Mutex // gorilla/websocket allows only one concurrent writer
}

// write serializes all writes (heartbeat pings and broadcasts) on one connection.
func (mc *ManagedConn) write(msgType int, data []byte, timeout time.Duration) error {
    mc.writeMu.Lock()
    defer mc.writeMu.Unlock()
    mc.Conn.SetWriteDeadline(time.Now().Add(timeout))
    return mc.Conn.WriteMessage(msgType, data)
}

type ConnConfig struct {
    MaxConnections    int64
    HeartbeatInterval time.Duration
    WriteTimeout      time.Duration
    MaxMessageSize    int64
}

func NewConnectionManager(cfg ConnConfig) *ConnectionManager {
    return &ConnectionManager{
        maxConnections:    cfg.MaxConnections,
        connections:       make(map[string]*ManagedConn),
        heartbeatInterval: cfg.HeartbeatInterval,
        writeTimeout:      cfg.WriteTimeout,
        maxMessageSize:    cfg.MaxMessageSize,
    }
}

func (m *ConnectionManager) HandleConnection(w http.ResponseWriter, r *http.Request, handler func([]byte)) {
    // Reserve a slot atomically; a separate Load-then-Add check lets
    // concurrent requests overshoot the limit.
    if m.currentConns.Add(1) > m.maxConnections {
        m.currentConns.Add(-1)
        http.Error(w, "too many connections", http.StatusServiceUnavailable)
        return
    }

    upgrader := websocket.Upgrader{
        ReadBufferSize:  1024,
        WriteBufferSize: 1024,
        CheckOrigin:     func(r *http.Request) bool { return true }, // demo only
    }

    conn, err := upgrader.Upgrade(w, r, nil)
    if err != nil {
        m.currentConns.Add(-1) // Upgrade has already replied with an HTTP error
        return
    }

    // Not r.Context(): net/http cancels it as soon as this handler returns,
    // which would tear the new connection down immediately.
    ctx, cancel := context.WithCancel(context.Background())

    mc := &ManagedConn{
        ID:        generateConnID(),
        Conn:      conn,
        Connected: time.Now(),
        LastPing:  time.Now(),
        ctx:       ctx,
        cancel:    cancel,
    }

    m.mu.Lock()
    m.connections[mc.ID] = mc
    m.mu.Unlock()

    go m.readPump(mc, handler)
    go m.writePump(mc)
}

func (m *ConnectionManager) readPump(mc *ManagedConn, handler func([]byte)) {
    defer m.removeConnection(mc)

    // A peer that stops answering pings is detected when the read deadline
    // expires; two heartbeat intervals is a chosen, conservative margin.
    pongWait := 2 * m.heartbeatInterval
    mc.Conn.SetReadLimit(m.maxMessageSize)
    mc.Conn.SetReadDeadline(time.Now().Add(pongWait))
    mc.Conn.SetPongHandler(func(string) error {
        mc.LastPing = time.Now()
        return mc.Conn.SetReadDeadline(time.Now().Add(pongWait))
    })

    for {
        _, message, err := mc.Conn.ReadMessage()
        if err != nil {
            if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseNormalClosure) {
                log.Printf("conn %s unexpected close: %v", mc.ID, err)
            }
            return
        }
        handler(message)
    }
}

func (m *ConnectionManager) writePump(mc *ManagedConn) {
    ticker := time.NewTicker(m.heartbeatInterval)
    defer func() {
        ticker.Stop()
        mc.Conn.Close()
    }()

    for {
        select {
        case <-ticker.C:
            if err := mc.write(websocket.PingMessage, nil, m.writeTimeout); err != nil {
                return
            }
        case <-mc.ctx.Done(): // websocket.Conn has no Context method; use our own
            return
        }
    }
}

func (m *ConnectionManager) removeConnection(mc *ManagedConn) {
    mc.cancel()
    mc.Conn.Close()

    m.mu.Lock()
    delete(m.connections, mc.ID)
    m.mu.Unlock()
    m.currentConns.Add(-1)
}

// Broadcast writes serially under the read lock. Each write is bounded by
// writeTimeout, so a stalled peer delays the rest by at most that long;
// large fan-outs should queue per connection instead.
func (m *ConnectionManager) Broadcast(message []byte) error {
    m.mu.RLock()
    defer m.mu.RUnlock()

    for _, mc := range m.connections {
        if err := mc.write(websocket.TextMessage, message, m.writeTimeout); err != nil {
            log.Printf("broadcast to %s failed: %v", mc.ID, err)
        }
    }
    return nil
}

func (m *ConnectionManager) Stats() (total int64, active int) {
    m.mu.RLock()
    defer m.mu.RUnlock()
    return m.currentConns.Load(), len(m.connections)
}

func generateConnID() string {
    return fmt.Sprintf("%d-%06d", time.Now().UnixNano(), rand.Intn(1000000))
}

Client-side reconnection strategy:

package reconnect

import (
    "context"
    "fmt"
    "log"
    "math"
    "math/rand"
    "sync"
    "time"

    "github.com/gorilla/websocket"
)

type ReconnectConfig struct {
    MaxRetries int
    BaseDelay  time.Duration
    MaxDelay   time.Duration
    Jitter     float64
}

func DefaultReconnectConfig() ReconnectConfig {
    return ReconnectConfig{
        MaxRetries: 100,
        BaseDelay:  1 * time.Second,
        MaxDelay:   30 * time.Second,
        Jitter:     0.25,
    }
}

type ReconnectingClient struct {
    url    string
    config ReconnectConfig
    conn   *websocket.Conn
    mu     sync.Mutex
}

func NewReconnectingClient(url string, config ReconnectConfig) *ReconnectingClient {
    return &ReconnectingClient{
        url:    url,
        config: config,
    }
}

func (c *ReconnectingClient) Connect(ctx context.Context) (*websocket.Conn, error) {
    var lastErr error

    for attempt := 0; attempt < c.config.MaxRetries; attempt++ {
        select {
        case <-ctx.Done():
            return nil, ctx.Err()
        default:
        }

        conn, _, err := websocket.DefaultDialer.DialContext(ctx, c.url, nil)
        if err == nil {
            c.mu.Lock()
            c.conn = conn
            c.mu.Unlock()
            return conn, nil
        }
        lastErr = err

        delay := c.backoff(attempt)
        log.Printf("connect attempt %d failed: %v, retrying in %v", attempt+1, err, delay)

        select {
        case <-ctx.Done():
            return nil, ctx.Err()
        case <-time.After(delay):
        }
    }

    return nil, fmt.Errorf("max retries (%d) exceeded: %w", c.config.MaxRetries, lastErr)
}

func (c *ReconnectingClient) backoff(attempt int) time.Duration {
    delay := float64(c.config.BaseDelay) * math.Pow(2, float64(attempt))
    if delay > float64(c.config.MaxDelay) {
        delay = float64(c.config.MaxDelay)
    }

    jitter := delay * c.config.Jitter
    delay = delay - jitter + rand.Float64()*jitter*2

    return time.Duration(delay)
}

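func (c *ReconnectingClient) Listen(ctx context.Context, handler func([]byte)) {
    for {
        conn, err := c.Connect(ctx)
        if err != nil {
            if ctx.Err() != nil {
                return
            }
            continue // Connect already backed off between its own attempts
        }

        // ReadMessage does not watch ctx; closing the conn unblocks it (Go 1.21+).
        stop := context.AfterFunc(ctx, func() { conn.Close() })
        for {
            _, message, err := conn.ReadMessage()
            if err != nil {
                log.Printf("read error: %v, reconnecting...", err)
                conn.Close()
                break
            }
            handler(message)
        }
        stop()

        // Wait a jittered delay before the first redial; otherwise every client
        // dropped by the same server restart redials at the same instant.
        select {
        case <-ctx.Done():
            return
        case <-time.After(c.backoff(0)):
        }
    }
}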

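Key Design Points

  • A connection cap prevents resource exhaustion
  • Exponential backoff with jitter on reconnect avoids reconnect storms
  • Heartbeats detect dead connections so they are cleaned up automatically
  • Connection statistics feed operational monitoring
  • Graceful disconnect: resources are released when the context is cancelled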

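5 Common Pitfalls and How to Fix Them

Pitfall 1: Concurrent WebSocket writes

❌ Wrong:

go func() {
    conn.WriteMessage(websocket.TextMessage, msg1)
}()
go func() {
    conn.WriteMessage(websocket.TextMessage, msg2)
}()

A gorilla/websocket connection supports at most one concurrent reader and one concurrent writer. Writing from several goroutines at once corrupts the frame stream, and the library may detect it and panic with "concurrent write to websocket connection".

✅ Correct:

type SafeConn struct {
    conn *websocket.Conn
    mu   sync.Mutex
}

func (s *SafeConn) WriteMessage(msgType int, data []byte) error {
    s.mu.Lock()
    defer s.mu.Unlock()
    return s.conn.WriteMessage(msgType, data)
}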

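Pitfall 2: Forgetting to Flush in SSE

❌ Wrong:

fmt.Fprintf(w, "data: %s\n\n", message)

The data sits in the HTTP response writer's buffer until the buffer fills up or the handler returns, so the client sees nothing in real time.

✅ Correct:

fmt.Fprintf(w, "data: %s\n\n", message)
if flusher, ok := w.(http.Flusher); ok {
    flusher.Flush() // push the buffered event to the client now
}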

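Pitfall 3: NATS subscribers that do not ack explicitly

❌ Wrong:

sub, _ := js.Subscribe("orders", func(msg *nats.Msg) {
    processOrder(msg.Data)
})

Without nats.ManualAck(), nats.go acknowledges a message automatically as soon as the callback returns, so an order whose processing failed is acknowledged anyway and never retried. The opposite mistake is enabling ManualAck but never calling Ack: JetStream then redelivers the message after every AckWait until MaxDeliver is reached (unlimited by default), after which it stops and emits a max-deliveries advisory that can be used to build a dead-letter flow.

✅ Correct:

sub, _ := js.Subscribe("orders", func(msg *nats.Msg) {
    if err := processOrder(msg.Data); err != nil {
        msg.Nak() // ask JetStream to redeliver
        return
    }
    msg.Ack()
}, nats.ManualAck())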

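Pitfall 4: Reconnecting without backoff

❌ Wrong:

for {
    conn, err := dial(url)
    if err == nil {
        return conn, nil
    }
    // no delay: every failed client dials again immediately
}

100,000 clients reconnecting at once can take the server down instantly.

✅ Correct:

// dial and backoff are placeholders; backoff is exponential with jitter,
// like ReconnectingClient.backoff above.
for attempt := 0; ; attempt++ {
    conn, err := dial(url)
    if err == nil {
        return conn, nil
    }
    time.Sleep(backoff(attempt))
}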

Pitfall 5: One slow consumer blocks a channel broadcast

❌ Wrong:

for _, ch := range subscribers {
    ch <- message
}

A single slow consumer blocks delivery to every subscriber after it.

✅ Correct:

for _, ch := range subscribers {
    select {
    case ch <- message:
    default:
        log.Printf("slow consumer, dropping message")
    }
}

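Troubleshooting Cheat Sheet

Symptom | Likely cause | How to investigate | Fix
WebSocket connections drop frequently | Heartbeat timeouts, network jitter | Check the ping interval and read deadline; measure network latency | Keep the ping interval well below the pong timeout and any proxy idle timeout; add reconnect logic
"concurrent write to websocket connection" panic | Several goroutines writing to one WebSocket | Search for WriteMessage calls made from multiple goroutines | Guard writes with a mutex or funnel them through a single writer goroutine
SSE client receives nothing | Missing Flush | Check that the response writer is flushed | Call flusher.Flush() after every write
NATS messages consumed repeatedly | No Ack, or Ack arriving after AckWait | Check the consumer's AckWait and MaxDeliver settings | Use ManualAck and Ack once processing succeeds
Reconnect storm takes the service down | No backoff strategy | Monitor the rate of new connections | Exponential backoff with jitter
Goroutine count keeps growing | Connections not cleaned up, channels never closed | pprof goroutine profile | Clean up goroutines and channels whenever a connection closes
Memory keeps growing | Message backlog, connection leaks | pprof heap profile | Bound channel buffers and add backpressure
Events arrive out of order | Several instances processing the same key in parallel | Check whether work is partitioned by key | Consistent hashing or sticky sessions
High broadcast latency | Serial sends, slow consumers | Monitor per-connection send latency | Send in parallel and skip slow consumers after a timeout
Graceful shutdown fails | Connections not closed, messages lost | Review the shutdown sequence | Drain mode: stop accepting new connections, then wait for existing ones to finish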

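Advanced Optimization Techniques

Optimization 1: Sharding WebSocket connections

Spread connections across several Hubs, each with its own Run goroutine, so that no single Hub loop or lock becomes the bottleneck: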

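package shardhub

import "hash/fnv"

// Assumes the Hub, Client and NewHub definitions from Pattern 1 are in this package.
type ShardedHub struct {
    shards []*Hub
    count  int
}

func NewShardedHub(shardCount int) *ShardedHub {
    shards := make([]*Hub, shardCount)
    for i := range shards {
        shards[i] = NewHub()
        go shards[i].Run()
    }
    return &ShardedHub{shards: shards, count: shardCount}
}

func (s *ShardedHub) getShard(key string) *Hub {
    h := fnv.New32a()
    h.Write([]byte(key))
    return s.shards[h.Sum32()%uint32(s.count)]
}

// Register pins the client to its room's shard. readPump unregisters through
// client.hub, so that field must point at the same shard.
func (s *ShardedHub) Register(client *Client) {
    shard := s.getShard(client.room)
    client.hub = shard
    shard.register <- client
}

// Broadcast reaches every client on the room's shard. Hub.Run does not filter
// by room, so clients of other rooms hashed to the same shard receive it too;
// room-only delivery would require carrying the room with the message.
func (s *ShardedHub) Broadcast(room string, message []byte) {
    shard := s.getShard(room)
    shard.broadcast <- message
}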

Optimization 2: SSE multiplexing

Push several event types over a single SSE connection to cut the number of connections:
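The multiplexed `ServeHTTP` relies on per-topic registration helpers (`b.subscribe`) that are not shown. One way to implement such a registry, assuming one buffered channel per SSE connection that can be registered under several topics; `TopicRegistry` and its methods are hypothetical.

```go
package main

import "sync"

// Event mirrors the SSE broker's event type.
type Event struct {
	ID, Event, Data string
}

// TopicRegistry maps topic -> set of client channels. One channel may be
// registered under several topics, so one SSE connection carries all of them.
type TopicRegistry struct {
	mu     sync.RWMutex
	topics map[string]map[chan Event]struct{}
}

func NewTopicRegistry() *TopicRegistry {
	return &TopicRegistry{topics: make(map[string]map[chan Event]struct{})}
}

func (r *TopicRegistry) Subscribe(topic string, ch chan Event) {
	r.mu.Lock()
	defer r.mu.Unlock()
	if r.topics[topic] == nil {
		r.topics[topic] = make(map[chan Event]struct{})
	}
	r.topics[topic][ch] = struct{}{}
}

func (r *TopicRegistry) Unsubscribe(topic string, ch chan Event) {
	r.mu.Lock()
	defer r.mu.Unlock()
	delete(r.topics[topic], ch) // no-op if the topic is unknown
	if len(r.topics[topic]) == 0 {
		delete(r.topics, topic)
	}
}

// Publish tags the event with its topic name and does a non-blocking send
// to every subscriber; it returns how many deliveries were dropped.
func (r *TopicRegistry) Publish(topic string, e Event) (dropped int) {
	e.Event = topic
	r.mu.RLock()
	defer r.mu.RUnlock()
	for ch := range r.topics[topic] {
		select {
		case ch <- e:
		default:
			dropped++
		}
	}
	return dropped
}
```

Because `Publish` sets the event name to the topic, the handler's `event:` line lets the browser route messages with `EventSource.addEventListener(topic, handler)`.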

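// An alternative ServeHTTP for a topic-aware Broker. b.subscribe and
// b.unsubscribe are assumed helpers (not shown) that maintain a
// topic -> client-channel registry.
func (b *Broker) ServeHTTP(w http.ResponseWriter, r *http.Request) {
    flusher, ok := w.(http.Flusher)
    if !ok {
        http.Error(w, "streaming not supported", http.StatusInternalServerError)
        return
    }
    w.Header().Set("Content-Type", "text/event-stream")
    w.Header().Set("Cache-Control", "no-cache")

    clientChan := make(chan Event, 64)
    topics := r.URL.Query()["topic"] // e.g. /events?topic=price&topic=news

    for _, topic := range topics {
        b.subscribe(topic, clientChan)
    }
    // Unregister on exit; otherwise every closed connection leaks its channel.
    defer func() {
        for _, topic := range topics {
            b.unsubscribe(topic, clientChan)
        }
    }()

    for {
        select {
        case event := <-clientChan:
            fmt.Fprintf(w, "event: %s\n", event.Event)
            fmt.Fprintf(w, "data: %s\n\n", event.Data)
            flusher.Flush()
        case <-r.Context().Done():
            return
        }
    }
}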

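Optimization 3: NATS consumer groups

Several consumers join one queue group backed by a shared durable consumer, so each message is handled by only one member and the load is balanced across them: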

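// SubscribeGroup joins a JetStream queue group: each message goes to one
// member of the group. It assumes a stream covering subject already exists
// (for example, created by Subscribe).
func (b *EventBus) SubscribeGroup(ctx context.Context, subject, group, durable string, handler func(Event)) error {
    sub, err := b.js.QueueSubscribe(subject, group, func(msg *nats.Msg) {
        var event Event
        if err := json.Unmarshal(msg.Data, &event); err != nil {
            msg.Term() // unparseable: do not redeliver
            return
        }
        handler(event)
        msg.Ack()
    },
        nats.Durable(durable),
        nats.ManualAck(),
        nats.DeliverAll(),
    )
    if err != nil {
        return fmt.Errorf("queue subscribe to %s: %w", subject, err)
    }

    go func() {
        <-ctx.Done()
        sub.Unsubscribe()
    }()

    return nil
}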

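Real-Time Technology Comparison

Feature | WebSocket | SSE | NATS | Go channel
Direction | Bidirectional | One-way (server → client) | Bidirectional | Bidirectional
Protocol | WS | HTTP | TCP | In-process
Reconnection | Must be implemented manually | Built in (Last-Event-ID) | Built in | N/A
Message persistence | No | No | JetStream | No
Browser support | All | All | N/A | N/A
Proxy/CDN friendly | Partial (proxy must allow Upgrade) | Yes | N/A | N/A
Throughput | | | Very high | Very high
Latency (rough order of magnitude) | ~1ms | ~10ms | ~0.5ms | ~0.01ms
Best for | Chat, collaborative editing | Notifications, market data | Microservice event bus | In-process event-driven designs
Production recommendation | ⭐⭐⭐⭐ | ⭐⭐⭐⭐ | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐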

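Summary

The heart of a Go real-time system is not which protocol you pick but how you answer five questions: How are connections managed? How are messages routed? How is backpressure handled? How do reconnects back off? How are events persisted? The WebSocket Hub answers "connection management", SSE answers "simple push", NATS answers "message persistence and routing", Go channels answer "in-process event streams", and the ConnectionManager answers "reconnection and connection limiting". Master these 5 patterns and you have the core methodology for designing production-grade real-time systems in Go.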



#GoRealTimeSystems #WebSocket #SSE #MessageQueue #EventDriven #2026 #BackendDevelopment