Goリアルタイムシステム設計実践:WebSocketからイベントストリーミングまでの5つのプロダクションパターン

后端开发

10万のWebSocket接続が同時に切断される時:リアルタイムシステムのプロダクションの悪夢

金曜日の午後8時、ライブ配信プラットフォームに50万人の同時接続ユーザー。設定変更がローリングデプロイをトリガーし、全WebSocket接続が同時に切断——50万クライアントが一斉に再接続。雪崩効果でゲートウェイが即座にダウン。さらに悪いことに、再接続ストームがメッセージキューのバックログを引き起こし、コンシューマーが追いつかず、レイテンシが50msから30秒に急増、ユーザー体験が完全に崩壊。

これは決して稀なケースではない。Goのリアルタイムシステム開発は「WebSocketにアップグレードする」だけでは終わらない——接続ライフサイクルの管理、メッセージルーティングの処理、バックプレッシャーと再接続ストームへの対処、イベントの順序付き配信の確保が必要だ。本記事では5つのプロダクション級リアルタイムパターンを解説し、堅牢なGoリアルタイムシステムの構築を支援する。


コア概念リファレンス

技術 用途 主な特徴 典型的なユースケース
WebSocket 全二重リアルタイム通信 永続接続、低レイテンシ、双方向プッシュ チャットシステム、協調編集、リアルタイムゲーム
SSE サーバーからクライアントへの一方向プッシュ HTTPプロトコル、自動再接続、テキストフレーム 通知プッシュ、株価行情、ログストリーム
NATS 高性能メッセージキュー パブサブ、JetStream永続化、クラスタリング マイクロサービスイベントバス、IoTデータ配信
Go channel プロセス内イベント伝達 型安全、select多重化、クローズ可能 イベント駆動アーキテクチャ、ゴルーチン間通信
gorilla/websocket Go WebSocketライブラリ 接続プール、ping/pong、並行安全 プロダクション級WebSocketサービス

リアルタイムシステム設計の5つの課題

課題1:接続ライフサイクル管理

+--------+     +--------+     +--------+
| Client |---->| Server |----| Client |
+--------+     +--------+     +--------+
     |              |              |
     |  接続確立     |   接続確立    |
     |------------->|<-------------|
     |              |              |
     |  ハートビート  |   ハートビート |
     |<--ping/pong->|<--ping/pong->|
     |              |              |
     |  接続切断     |   接続切断    |
     |<------------X|X------------>|
     |              |              |
     |  再接続ストーム |   再接続ストーム|
     |=============>|<=============|

10万接続が同時に切断・再接続する時、バックオフ戦略なしでは自分自身にDDoS攻撃を仕掛けるようなものだ。

課題2:メッセージルーティングとファンアウト

1つのメッセージを1万人のサブスクライバーにプッシュする必要がある。サブスクライバーごとに1つのゴルーチン?それとも共有ライターゴルーチン?

課題3:バックプレッシャーとフロー制御

プロデューサーの速度がコンシューマーを大幅に上回る。チャネルが満杯の時どうする?破棄?ブロック?グレード?

課題4:イベントの順序性

同じユーザーのイベントは順序通りに配信されなければならないが、分散環境で複数インスタンスがこれをどう保証するか?

課題5:グレースフルデグラデーション

WebSocketが利用不可ならSSEにフォールバック、SSEが不可ならポーリングにフォールバック。デグラデーション戦略をどう設計するか?


5つのプロダクション級リアルタイムパターン

パターン1:WebSocket Hub——チャットシステム

WebSocket Hubはリアルタイムシステムで最も古典的なパターン:中央Hubが全接続を管理し、メッセージはHubを通じて全サブスクライバーにブロードキャストされる。

                    +-------+
                    |  Hub  |
                    +-------+
                   /   |   |   \
                  v    v   v    v
              +----+ +----+ +----+ +----+
              | C1 | | C2 | | C3 | | C4 |
              +----+ +----+ +----+ +----+
              
  クライアントがメッセージ送信 --> Hubがブロードキャスト --> 全クライアントが受信
package wschat

import (
    "log"
    "net/http"
    "sync"

    "github.com/gorilla/websocket"
)

var upgrader = websocket.Upgrader{
    ReadBufferSize:  1024,
    WriteBufferSize: 1024,
    CheckOrigin: func(r *http.Request) bool {
        return true
    },
}

type Message struct {
    Username string `json:"username"`
    Content  string `json:"content"`
    Room     string `json:"room"`
}

type Client struct {
    hub  *Hub
    conn *websocket.Conn
    send chan []byte
    room string
}

type Hub struct {
    clients    map[*Client]bool
    broadcast  chan []byte
    register   chan *Client
    unregister chan *Client
    mu         sync.RWMutex
    rooms      map[string]map[*Client]bool
}

func NewHub() *Hub {
    return &Hub{
        broadcast:  make(chan []byte, 256),
        register:   make(chan *Client),
        unregister: make(chan *Client),
        clients:    make(map[*Client]bool),
        rooms:      make(map[string]map[*Client]bool),
    }
}

func (h *Hub) Run() {
    for {
        select {
        case client := <-h.register:
            h.mu.Lock()
            h.clients[client] = true
            if h.rooms[client.room] == nil {
                h.rooms[client.room] = make(map[*Client]bool)
            }
            h.rooms[client.room][client] = true
            h.mu.Unlock()

        case client := <-h.unregister:
            h.mu.Lock()
            if _, ok := h.clients[client]; ok {
                delete(h.clients, client)
                if room, ok := h.rooms[client.room]; ok {
                    delete(room, client)
                    if len(room) == 0 {
                        delete(h.rooms, client.room)
                    }
                }
                close(client.send)
            }
            h.mu.Unlock()

        case message := <-h.broadcast:
            h.mu.RLock()
            for client := range h.clients {
                select {
                case client.send <- message:
                default:
                    h.mu.RUnlock()
                    h.mu.Lock()
                    close(client.send)
                    delete(h.clients, client)
                    h.mu.Unlock()
                    h.mu.RLock()
                }
            }
            h.mu.RUnlock()
        }
    }
}

func (c *Client) readPump() {
    defer func() {
        c.hub.unregister <- c
        c.conn.Close()
    }()

    c.conn.SetReadLimit(512)
    c.conn.SetPongHandler(func(string) error {
        return nil
    })

    for {
        _, message, err := c.conn.ReadMessage()
        if err != nil {
            if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
                log.Printf("read error: %v", err)
            }
            break
        }
        c.hub.broadcast <- message
    }
}

func (c *Client) writePump() {
    ticker := time.NewTicker(54 * time.Second)
    defer func() {
        ticker.Stop()
        c.conn.Close()
    }()

    for {
        select {
        case message, ok := <-c.send:
            c.conn.SetWriteDeadline(time.Now().Add(10 * time.Second))
            if !ok {
                c.conn.WriteMessage(websocket.CloseMessage, []byte{})
                return
            }

            w, err := c.conn.NextWriter(websocket.TextMessage)
            if err != nil {
                return
            }
            w.Write(message)

            n := len(c.send)
            for i := 0; i < n; i++ {
                w.Write([]byte{'\n'})
                w.Write(<-c.send)
            }

            if err := w.Close(); err != nil {
                return
            }

        case <-ticker.C:
            c.conn.SetWriteDeadline(time.Now().Add(10 * time.Second))
            if err := c.conn.WriteMessage(websocket.PingMessage, nil); err != nil {
                return
            }
        }
    }
}

func ServeWs(hub *Hub, w http.ResponseWriter, r *http.Request) {
    conn, err := upgrader.Upgrade(w, r, nil)
    if err != nil {
        log.Println(err)
        return
    }

    room := r.URL.Query().Get("room")
    if room == "" {
        room = "default"
    }

    client := &Client{
        hub:  hub,
        conn: conn,
        send: make(chan []byte, 256),
        room: room,
    }
    hub.register <- client

    go client.writePump()
    go client.readPump()
}

主要な設計ポイント

  • Hubはチャネルで登録/登録解除/ブロードキャストを管理、ロック競合を回避
  • 各Clientに独立したreadPump/writePumpゴルーチン
  • writePumpはバッチ送信(チャネルからn件のメッセージをまとめて書き込み)
  • ping/pongハートビート検出、54秒間隔
  • 送信チャネル満杯時、スローコンシューマーを自動切断

パターン2:Server-Sent Events(SSE)——リアルタイム更新プッシュ

SSEはWebSocketより軽量で、HTTPプロトコルベース、自動再接続とイベントID継続をネイティブサポート。

+--------+          +--------+
| Client |          | Server |
+--------+          +--------+
     |                   |
     |  GET /events     |
     |------------------>|
     |                   |
     |  Content-Type:   |
     |  text/event-stream|
     |<------------------|
     |                   |
     |  data: {"price":  |
     |   42000}          |
     |<------------------|
     |                   |
     |  id: 123          |
     |  data: {"price":  |
     |   42050}          |
     |<------------------|
package sse

import (
    "fmt"
    "net/http"
    "sync"
)

type Event struct {
    ID    string
    Event string
    Data  string
}

type Broker struct {
    clients     map[chan Event]bool
    newClients  chan chan Event
    deadClients chan chan Event
    mu          sync.RWMutex
}

func NewBroker() *Broker {
    return &Broker{
        clients:     make(map[chan Event]bool),
        newClients:  make(chan chan Event),
        deadClients: make(chan chan Event),
    }
}

func (b *Broker) Start() {
    for {
        select {
        case c := <-b.newClients:
            b.mu.Lock()
            b.clients[c] = true
            b.mu.Unlock()
            log.Printf("client connected, total: %d", len(b.clients))

        case c := <-b.deadClients:
            b.mu.Lock()
            delete(b.clients, c)
            b.mu.Unlock()
            close(c)
            log.Printf("client disconnected, total: %d", len(b.clients))
        }
    }
}

func (b *Broker) ServeHTTP(w http.ResponseWriter, r *http.Request) {
    flusher, ok := w.(http.Flusher)
    if !ok {
        http.Error(w, "streaming not supported", http.StatusInternalServerError)
        return
    }

    w.Header().Set("Content-Type", "text/event-stream")
    w.Header().Set("Cache-Control", "no-cache")
    w.Header().Set("Connection", "keep-alive")
    w.Header().Set("Access-Control-Allow-Origin", "*")

    clientChan := make(chan Event, 64)
    b.newClients <- clientChan

    notify := r.Context().Done()
    go func() {
        <-notify
        b.deadClients <- clientChan
    }()

    for {
        select {
        case event, ok := <-clientChan:
            if !ok {
                return
            }
            if event.ID != "" {
                fmt.Fprintf(w, "id: %s\n", event.ID)
            }
            if event.Event != "" {
                fmt.Fprintf(w, "event: %s\n", event.Event)
            }
            fmt.Fprintf(w, "data: %s\n\n", event.Data)
            flusher.Flush()
        case <-r.Context().Done():
            return
        }
    }
}

func (b *Broker) Notify(event Event) {
    b.mu.RLock()
    defer b.mu.RUnlock()
    for client := range b.clients {
        select {
        case client <- event:
        default:
            go func(c chan Event) {
                b.deadClients <- c
            }(client)
        }
    }
}

使用例——株価行情プッシュ:

func main() {
    broker := NewBroker()
    go broker.Start()

    go func() {
        eventID := 0
        for {
            time.Sleep(1 * time.Second)
            eventID++
            price := fetchStockPrice("BTC")
            broker.Notify(Event{
                ID:    fmt.Sprintf("%d", eventID),
                Event: "price-update",
                Data:  fmt.Sprintf(`{"symbol":"BTC","price":%.2f}`, price),
            })
        }
    }()

    http.Handle("/events", broker)
    http.ListenAndServe(":8080", nil)
}

主要な設計ポイント

  • HTTPベース、プロトコルアップグレード不要、プロキシ/CDNフレンドリー
  • Last-Event-IDで再接続時のイベント継続をサポート
  • http.Flusherでデータの即時プッシュを保証
  • コンテキストキャンセルでクライアントを自動クリーンアップ
  • チャネル満杯時、スローコンシューマーを自動切断

パターン3:NATSメッセージキュー統合——マイクロサービスイベントバス

NATSはGoエコシステムで最も人気のあるメッセージキューの一つで、単一ノードのスループットは1500万msg/秒に達し、リアルタイムシステムに天然適している。

+--------+    +--------+    +--------+
|ServiceA|    | NATS   |    |ServiceB|
+--------+    | Server |    +--------+
     |        +--------+         |
     | publish |          | subscribe
     |-------->|          |-------->|
     |         |          |         |
     |         |  publish |         |
     |         |<---------|         |
     | subscribe          |         |
     |<--------|          |         |
+--------+    +--------+    +--------+
|ServiceC|    | NATS   |    |ServiceD|
+--------+    | JetStream   +--------+
              +--------+
package natsbus

import (
    "context"
    "encoding/json"
    "fmt"
    "log"
    "sync"

    "github.com/nats-io/nats.go"
)

type Event struct {
    Type    string          `json:"type"`
    Payload json.RawMessage `json:"payload"`
    Source  string          `json:"source"`
}

type EventBus struct {
    nc       *nats.Conn
    js       nats.JetStreamContext
    handlers map[string][]func(Event)
    mu       sync.RWMutex
}

func NewEventBus(url string) (*EventBus, error) {
    nc, err := nats.Connect(url,
        nats.ReconnectWait(2*time.Second),
        nats.MaxReconnects(60),
        nats.DisconnectErrHandler(func(nc *nats.Conn, err error) {
            log.Printf("NATS disconnected: %v", err)
        }),
        nats.ReconnectHandler(func(nc *nats.Conn) {
            log.Printf("NATS reconnected to %s", nc.ConnectedUrl())
        }),
    )
    if err != nil {
        return nil, fmt.Errorf("connect to NATS: %w", err)
    }

    js, err := nc.JetStream()
    if err != nil {
        return nil, fmt.Errorf("get JetStream context: %w", err)
    }

    return &EventBus{
        nc:       nc,
        js:       js,
        handlers: make(map[string][]func(Event)),
    }, nil
}

func (b *EventBus) Publish(ctx context.Context, subject string, event Event) error {
    data, err := json.Marshal(event)
    if err != nil {
        return fmt.Errorf("marshal event: %w", err)
    }

    select {
    case <-ctx.Done():
        return ctx.Err()
    default:
    }

    if _, err := b.js.Publish(subject, data); err != nil {
        return fmt.Errorf("publish to %s: %w", subject, err)
    }
    return nil
}

func (b *EventBus) Subscribe(ctx context.Context, subject, durable string, handler func(Event)) error {
    streamName := fmt.Sprintf("STREAM_%s", durable)

    if _, err := b.js.StreamInfo(streamName); err != nil {
        if _, err := b.js.AddStream(&nats.StreamConfig{
            Name:     streamName,
            Subjects: []string{subject},
            Retention: nats.LimitsPolicy,
            MaxMsgs:  1000000,
            MaxAge:   7 * 24 * time.Hour,
        }); err != nil {
            return fmt.Errorf("add stream %s: %w", streamName, err)
        }
    }

    sub, err := b.js.Subscribe(subject, func(msg *nats.Msg) {
        var event Event
        if err := json.Unmarshal(msg.Data, &event); err != nil {
            log.Printf("unmarshal event: %v", err)
            msg.Nak()
            return
        }
        handler(event)
        msg.Ack()
    }, nats.Durable(durable), nats.ManualAck(), nats.DeliverAll())
    if err != nil {
        return fmt.Errorf("subscribe to %s: %w", subject, err)
    }

    go func() {
        <-ctx.Done()
        sub.Unsubscribe()
    }()

    return nil
}

func (b *EventBus) Close() {
    b.nc.Close()
}

使用例——オーダーイベントストリーム:

func main() {
    bus, err := NewEventBus("nats://localhost:4222")
    if err != nil {
        log.Fatal(err)
    }
    defer bus.Close()

    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    err = bus.Subscribe(ctx, "orders.*", "order-processor", func(event natsbus.Event) {
        switch event.Type {
        case "order.created":
            processOrder(event.Payload)
        case "order.paid":
            shipOrder(event.Payload)
        case "order.cancelled":
            refundOrder(event.Payload)
        }
    })

    bus.Publish(ctx, "orders.us", natsbus.Event{
        Type:   "order.created",
        Source: "checkout-service",
        Payload: json.RawMessage(`{"id":"ORD-001","amount":99.99}`),
    })
}

主要な設計ポイント

  • JetStream永続化、メッセージ損失なし
  • Durableサブスクライバー、再接続後に前回位置から再開
  • ManualAckでメッセージ処理成功後にのみ確認
  • 自動再接続設定(ReconnectWaitMaxReconnects
  • Stream設定でメッセージ保持ポリシー(7日間、100万件)

パターン4:イベント駆動アーキテクチャ——Goチャネル

Goチャネルはプロセス内イベント駆動アーキテクチャに天然適している。ジェネリクスと組み合わせて型安全なイベントバスを構築できる。

+----------+     +----------+     +----------+
| OrderSvc |     |EventBus  |     | NotifySvc|
+----------+     |          |     +----------+
     |           |  orders  |          |
     |---------->|  ----->  |--------->|
     |           |          |     +----------+
     |           |  payments|     | AccSvc   |
     |           |  ----->  |     +----------+
     |           |          |--------->|
     |           +----------+
package eventbus

import (
    "context"
    "fmt"
    "sync"
)

type Event[T any] struct {
    Topic   string
    Payload T
    Err     error
}

type Subscription[T any] struct {
    ch     chan Event[T]
    cancel context.CancelFunc
}

type Bus[T any] struct {
    subscribers map[string][]chan Event[T]
    mu          sync.RWMutex
    bufferSize  int
}

func NewBus[T any](bufferSize int) *Bus[T] {
    return &Bus[T]{
        subscribers: make(map[string][]chan Event[T]),
        bufferSize:  bufferSize,
    }
}

func (b *Bus[T]) Publish(ctx context.Context, topic string, payload T) error {
    b.mu.RLock()
    subs := b.subscribers[topic]
    b.mu.RUnlock()

    event := Event[T]{
        Topic:   topic,
        Payload: payload,
    }

    for _, ch := range subs {
        select {
        case ch <- event:
        case <-ctx.Done():
            return ctx.Err()
        default:
            go func(c chan Event[T]) {
                select {
                case c <- event:
                case <-ctx.Done():
                }
            }(ch)
        }
    }
    return nil
}

func (b *Bus[T]) Subscribe(ctx context.Context, topic string) *Subscription[T] {
    ch := make(chan Event[T], b.bufferSize)

    b.mu.Lock()
    b.subscribers[topic] = append(b.subscribers[topic], ch)
    b.mu.Unlock()

    ctx, cancel := context.WithCancel(ctx)

    go func() {
        <-ctx.Done()
        b.mu.Lock()
        subs := b.subscribers[topic]
        for i, sub := range subs {
            if sub == ch {
                b.subscribers[topic] = append(subs[:i], subs[i+1:]...)
                break
            }
        }
        b.mu.Unlock()
        close(ch)
    }()

    return &Subscription[T]{ch: ch, cancel: cancel}
}

func (s *Subscription[T]) Events() <-chan Event[T] {
    return s.ch
}

func (s *Subscription[T]) Unsubscribe() {
    s.cancel()
}

使用例——オーダーイベントストリーム:

type OrderEvent struct {
    OrderID string
    Status  string
    Amount  float64
}

func main() {
    bus := eventbus.NewBus[OrderEvent](256)
    ctx := context.Background()

    sub := bus.Subscribe(ctx, "orders")
    go func() {
        for event := range sub.Events() {
            fmt.Printf("order %s: %s ($%.2f)\n",
                event.Payload.OrderID,
                event.Payload.Status,
                event.Payload.Amount,
            )
        }
    }()

    bus.Publish(ctx, "orders", OrderEvent{
        OrderID: "ORD-001",
        Status:  "created",
        Amount:  99.99,
    })

    bus.Publish(ctx, "orders", OrderEvent{
        OrderID: "ORD-001",
        Status:  "paid",
        Amount:  99.99,
    })
}

主要な設計ポイント

  • ジェネリックイベントバス、型安全
  • ノンブロッキングパブリッシュ、チャネル満杯時に非同期送信
  • コンテキストキャンセルでサブスクリプションを自動クリーンアップ
  • サブスクライバーごとに独立したチャネル、相互影響なし
  • 外部依存ゼロ、純粋なGo標準ライブラリ実装

パターン5:プロダクションデプロイ——接続管理と再接続戦略

プロダクション環境では接続管理、再接続バックオフ、接続レート制限、ヘルスチェックなどの運用問題を処理する必要がある。

+--------+     +----------+     +--------+
| Client |---->| Gateway  |---->| Service|
+--------+     +----------+     +--------+
     |              |                |
     |  1.接続要求   |                |
     |------------->|                |
     |              |  2.レート制限   |
     |              |---             |
     |              |  |             |
     |              |<--             |
     |  3.WSアップグレード|            |
     |<------------>|                |
     |              |                |
     |  4.接続登録   |                |
     |              |--------------->|
     |              |                |
     |  5.ハートビート |               |
     |<--ping/pong->|                |
     |              |                |
     |  6.切断       |                |
     |<------------X|                |
     |              |  7.登録解除     |
     |              |--------------->|
     |              |                |
     |  8.指数バックオフ|              |
     |  再接続       |                |
     |---->---->---->|                |
package connmanager

import (
    "context"
    "fmt"
    "math/rand"
    "net/http"
    "sync"
    "sync/atomic"
    "time"

    "github.com/gorilla/websocket"
)

type ConnectionManager struct {
    maxConnections    int64
    currentConns      atomic.Int64
    connections       map[string]*ManagedConn
    mu                sync.RWMutex
    heartbeatInterval time.Duration
    writeTimeout      time.Duration
    maxMessageSize    int64
}

type ManagedConn struct {
    ID        string
    Conn      *websocket.Conn
    Connected time.Time
    LastPing  time.Time
    cancel    context.CancelFunc
}

type ConnConfig struct {
    MaxConnections    int64
    HeartbeatInterval time.Duration
    WriteTimeout      time.Duration
    MaxMessageSize    int64
}

func NewConnectionManager(cfg ConnConfig) *ConnectionManager {
    return &ConnectionManager{
        maxConnections:    cfg.MaxConnections,
        connections:       make(map[string]*ManagedConn),
        heartbeatInterval: cfg.HeartbeatInterval,
        writeTimeout:      cfg.WriteTimeout,
        maxMessageSize:    cfg.MaxMessageSize,
    }
}

func (m *ConnectionManager) HandleConnection(w http.ResponseWriter, r *http.Request, handler func([]byte)) {
    if m.currentConns.Load() >= m.maxConnections {
        http.Error(w, "too many connections", http.StatusServiceUnavailable)
        return
    }

    upgrader := websocket.Upgrader{
        ReadBufferSize:  1024,
        WriteBufferSize: 1024,
        CheckOrigin:     func(r *http.Request) bool { return true },
    }

    conn, err := upgrader.Upgrade(w, r, nil)
    if err != nil {
        return
    }

    connID := generateConnID()
    ctx, cancel := context.WithCancel(r.Context())

    mc := &ManagedConn{
        ID:        connID,
        Conn:      conn,
        Connected: time.Now(),
        LastPing:  time.Now(),
        cancel:    cancel,
    }

    m.mu.Lock()
    m.connections[connID] = mc
    m.mu.Unlock()
    m.currentConns.Add(1)

    go m.readPump(mc, handler)
    go m.writePump(mc)
}

func (m *ConnectionManager) readPump(mc *ManagedConn, handler func([]byte)) {
    defer m.removeConnection(mc)

    mc.Conn.SetReadLimit(m.maxMessageSize)
    mc.Conn.SetPongHandler(func(string) error {
        mc.LastPing = time.Now()
        return nil
    })

    for {
        _, message, err := mc.Conn.ReadMessage()
        if err != nil {
            if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseNormalClosure) {
                log.Printf("conn %s unexpected close: %v", mc.ID, err)
            }
            return
        }
        handler(message)
    }
}

func (m *ConnectionManager) writePump(mc *ManagedConn) {
    ticker := time.NewTicker(m.heartbeatInterval)
    defer func() {
        ticker.Stop()
        mc.Conn.Close()
    }()

    for {
        select {
        case <-ticker.C:
            mc.Conn.SetWriteDeadline(time.Now().Add(m.writeTimeout))
            if err := mc.Conn.WriteMessage(websocket.PingMessage, nil); err != nil {
                return
            }
        case <-mc.Conn.Context().Done():
            return
        }
    }
}

func (m *ConnectionManager) removeConnection(mc *ManagedConn) {
    mc.cancel()
    mc.Conn.Close()

    m.mu.Lock()
    delete(m.connections, mc.ID)
    m.mu.Unlock()
    m.currentConns.Add(-1)
}

func (m *ConnectionManager) Broadcast(message []byte) error {
    m.mu.RLock()
    defer m.mu.RUnlock()

    for _, mc := range m.connections {
        mc.Conn.SetWriteDeadline(time.Now().Add(m.writeTimeout))
        if err := mc.Conn.WriteMessage(websocket.TextMessage, message); err != nil {
            log.Printf("broadcast to %s failed: %v", mc.ID, err)
        }
    }
    return nil
}

func (m *ConnectionManager) Stats() (total int64, active int) {
    return m.currentConns.Load(), len(m.connections)
}

func generateConnID() string {
    return fmt.Sprintf("%d-%06d", time.Now().UnixNano(), rand.Intn(1000000))
}

クライアント再接続戦略:

package reconnect

import (
    "context"
    "math"
    "math/rand"
    "time"

    "github.com/gorilla/websocket"
)

type ReconnectConfig struct {
    MaxRetries int
    BaseDelay  time.Duration
    MaxDelay   time.Duration
    Jitter     float64
}

func DefaultReconnectConfig() ReconnectConfig {
    return ReconnectConfig{
        MaxRetries: 100,
        BaseDelay:  1 * time.Second,
        MaxDelay:   30 * time.Second,
        Jitter:     0.25,
    }
}

type ReconnectingClient struct {
    url    string
    config ReconnectConfig
    conn   *websocket.Conn
    mu     sync.Mutex
}

func NewReconnectingClient(url string, config ReconnectConfig) *ReconnectingClient {
    return &ReconnectingClient{
        url:    url,
        config: config,
    }
}

func (c *ReconnectingClient) Connect(ctx context.Context) (*websocket.Conn, error) {
    var lastErr error

    for attempt := 0; attempt < c.config.MaxRetries; attempt++ {
        select {
        case <-ctx.Done():
            return nil, ctx.Err()
        default:
        }

        conn, _, err := websocket.DefaultDialer.DialContext(ctx, c.url, nil)
        if err == nil {
            c.mu.Lock()
            c.conn = conn
            c.mu.Unlock()
            return conn, nil
        }
        lastErr = err

        delay := c.backoff(attempt)
        log.Printf("connect attempt %d failed: %v, retrying in %v", attempt+1, err, delay)

        select {
        case <-ctx.Done():
            return nil, ctx.Err()
        case <-time.After(delay):
        }
    }

    return nil, fmt.Errorf("max retries (%d) exceeded: %w", c.config.MaxRetries, lastErr)
}

func (c *ReconnectingClient) backoff(attempt int) time.Duration {
    delay := float64(c.config.BaseDelay) * math.Pow(2, float64(attempt))
    if delay > float64(c.config.MaxDelay) {
        delay = float64(c.config.MaxDelay)
    }

    jitter := delay * c.config.Jitter
    delay = delay - jitter + rand.Float64()*jitter*2

    return time.Duration(delay)
}

func (c *ReconnectingClient) Listen(ctx context.Context, handler func([]byte)) {
    for {
        conn, err := c.Connect(ctx)
        if err != nil {
            if ctx.Err() != nil {
                return
            }
            continue
        }

        for {
            _, message, err := conn.ReadMessage()
            if err != nil {
                log.Printf("read error: %v, reconnecting...", err)
                conn.Close()
                break
            }
            handler(message)
        }
    }
}

主要な設計ポイント

  • 接続数制限、リソース枯渇を防止
  • 指数バックオフ+ジッター再接続、再接続ストームを回避
  • ハートビート検出、デッド接続を自動クリーンアップ
  • 接続統計、運用監視をサポート
  • グレースフル切断、コンテキストキャンセル時にリソースをクリーンアップ

5つのよくある落とし穴と修正

落とし穴1:WebSocketの並行書き込み

❌ 誤った書き方:

go func() {
    conn.WriteMessage(websocket.TextMessage, msg1)
}()
go func() {
    conn.WriteMessage(websocket.TextMessage, msg2)
}()

gorilla/websocketの接続は並行安全ではない。複数ゴルーチンが同時に書き込むとpanicが発生する。

✅ 正しい書き方:

type SafeConn struct {
    conn *websocket.Conn
    mu   sync.Mutex
}

func (s *SafeConn) WriteMessage(msgType int, data []byte) error {
    s.mu.Lock()
    defer s.mu.Unlock()
    return s.conn.WriteMessage(msgType, data)
}

落とし穴2:SSEのFlush忘れ

❌ 誤った書き方:

fmt.Fprintf(w, "data: %s\n\n", message)

データはHTTPレスポンスライターにバッファリングされ、クライアントは受信できない。

✅ 正しい書き方:

fmt.Fprintf(w, "data: %s\n\n", message)
flusher, _ := w.(http.Flusher)
flusher.Flush()

落とし穴3:NATSサブスクライバーのAck忘れ

❌ 誤った書き方:

sub, _ := js.Subscribe("orders", func(msg *nats.Msg) {
    processOrder(msg.Data)
})

JetStreamメッセージが確認されず、再配信が繰り返され、最終的にMaxDeliverに達してデッドレターに入る。

✅ 正しい書き方:

sub, _ := js.Subscribe("orders", func(msg *nats.Msg) {
    if err := processOrder(msg.Data); err != nil {
        msg.Nak()
        return
    }
    msg.Ack()
}, nats.ManualAck())

落とし穴4:バックオフなしの再接続

❌ 誤った書き方:

for {
    conn, err := dial(url)
    if err != nil {
        continue
    }
}

10万クライアントが同時に再接続し、サーバーを瞬時に圧倒する。

✅ 正しい書き方:

for attempt := 0; ; attempt++ {
    conn, err := dial(url)
    if err != nil {
        delay := backoff(attempt)
        time.Sleep(delay)
        continue
    }
}

落とし穴5:チャネルブロードキャストでスローコンシューマーがブロック

❌ 誤った書き方:

for _, ch := range subscribers {
    ch <- message
}

1人のスローコンシューマーが全サブスクライバーをブロックする。

✅ 正しい書き方:

for _, ch := range subscribers {
    select {
    case ch <- message:
    default:
        log.Printf("slow consumer, dropping message")
    }
}

エラートラブルシューティングリファレンス

エラー現象 考えられる原因 調査方法 解決策
WebSocket接続が頻繁に切断 ハートビートタイムアウト、ネットワークジッター ping/pong間隔の確認、ネットワークレイテンシの測定 ハートビート間隔の増加、再接続ロジックの追加
concurrent write to websocket connection panic 複数ゴルーチンが同時にWebSocketに書き込み 並行WriteMessage呼び出しを検索 ミューテックスまたは単一ゴルーチンで書き込み
SSEクライアントがデータを受信できない Flush忘れ レスポンスライターがFlushを呼び出しているか確認 各書き込み後にflusher.Flush()を呼び出し
NATSメッセージが繰り返し消費される AckなしまたはAckタイムアウト JetStream MaxDeliver設定の確認 ManualAckを使用、処理成功後にAck
再接続ストームでサーバーがダウン バックオフ戦略なし 接続確立レートの監視 指数バックオフ+ジッター再接続
ゴルーチン数が増加し続ける 接続のクリーンアップ不足、チャネル未クローズ pprofゴルーチンプロファイル 切断時にゴルーチンとチャネルを確実にクリーンアップ
メモリが増加し続ける メッセージバックログ、接続リーク pprofヒーププロファイル チャネルバッファの制限、バックプレッシャー機構の追加
イベントの順序が乱れる 複数インスタンスの並行処理 キーによるパーティショニングの確認 一貫性ハッシュまたはスティッキーセッションの使用
ブロードキャストレイテンシが高い シリアル送信、スローコンシューマー 接続ごとの送信レイテンシの監視 並列送信+タイムアウトでスローコンシューマーをスキップ
グレースフルシャットダウン失敗 接続未クローズ、メッセージ損失 シャットダウンフローの確認 ドレインモード:新規接続の受け付け停止、既存接続の完了を待機

高度な最適化テクニック

最適化1:WebSocket接続シャーディング

大量の接続を複数のHubに分割し、ロック競合を削減:

package shardhub

import (
    "hash/fnv"
    "sync"
)

type ShardedHub struct {
    shards []*Hub
    count  int
}

func NewShardedHub(shardCount int) *ShardedHub {
    shards := make([]*Hub, shardCount)
    for i := range shards {
        shards[i] = NewHub()
        go shards[i].Run()
    }
    return &ShardedHub{shards: shards, count: shardCount}
}

func (s *ShardedHub) getShard(key string) *Hub {
    h := fnv.New32a()
    h.Write([]byte(key))
    return s.shards[h.Sum32()%uint32(s.count)]
}

func (s *ShardedHub) Register(client *Client) {
    shard := s.getShard(client.room)
    shard.register <- client
}

func (s *ShardedHub) Broadcast(room string, message []byte) {
    shard := s.getShard(room)
    shard.broadcast <- message
}

最適化2:SSEマルチプレキシング

1つのSSE接続で複数のイベントタイプをプッシュし、接続数を削減:

func (b *Broker) ServeHTTP(w http.ResponseWriter, r *http.Request) {
    flusher, _ := w.(http.Flusher)
    w.Header().Set("Content-Type", "text/event-stream")

    clientChan := make(chan Event, 64)
    topics := r.URL.Query()["topic"]

    for _, topic := range topics {
        b.subscribe(topic, clientChan)
    }

    for {
        select {
        case event := <-clientChan:
            fmt.Fprintf(w, "event: %s\n", event.Event)
            fmt.Fprintf(w, "data: %s\n\n", event.Data)
            flusher.Flush()
        case <-r.Context().Done():
            return
        }
    }
}

最適化3:NATSコンシューマーグループ

複数のコンシューマーがdurable nameを共有し、負荷分散を実現:

func (b *EventBus) SubscribeGroup(ctx context.Context, subject, group, durable string, handler func(Event)) error {
    sub, err := b.js.Subscribe(subject, func(msg *nats.Msg) {
        var event Event
        json.Unmarshal(msg.Data, &event)
        handler(event)
        msg.Ack()
    },
        nats.Durable(durable),
        nats.Queue(group),
        nats.ManualAck(),
        nats.DeliverAll(),
    )
    if err != nil {
        return err
    }

    go func() {
        <-ctx.Done()
        sub.Unsubscribe()
    }()

    return nil
}

リアルタイム技術比較

特徴 WebSocket SSE NATS Goチャネル
通信方向 双方向 一方向(サーバー→クライアント) 双方向 双方向
プロトコル WS HTTP TCP プロセス内
再接続サポート 手動実装 内蔵(Last-Event-ID) 内蔵 N/A
メッセージ永続化 なし なし JetStream なし
ブラウザサポート 全て 全て N/A N/A
プロキシ/CDNフレンドリー 低い 高い N/A N/A
スループット 高い 非常に高い 非常に高い
レイテンシ ~1ms ~10ms ~0.5ms ~0.01ms
ユースケース チャット、協調編集 通知、行情 マイクロサービスイベントバス プロセス内イベント駆動
プロダクション推奨度 ⭐⭐⭐⭐ ⭐⭐⭐⭐ ⭐⭐⭐⭐⭐ ⭐⭐⭐⭐

まとめ

Goリアルタイムシステムの核心はプロトコルの選択ではなく、5つの問いに答えることだ:接続はどう管理するか?メッセージはどうルーティングするか?バックプレッシャーはどう処理するか?再接続はどうバックオフするか?イベントはどう永続化するか? WebSocket Hubは「接続管理」を、SSEは「シンプルプッシュ」を、NATSは「メッセージ永続化とルーティング」を、Goチャネルは「プロセス内イベントストリーミング」を、ConnectionManagerは「再接続とレート制限」を答える。この5つのパターンをマスターすれば、プロダクション級Goリアルタイムシステム設計の核心的な方法論を身につけたことになる。


推奨ツール


関連記事

ブラウザローカルツールを無料で試す →

#Go实时系统#WebSocket#SSE#消息队列#事件驱动#2026#后端开发