Goリアルタイムシステム設計実践:WebSocketからイベントストリーミングまでの5つのプロダクションパターン
10万のWebSocket接続が同時に切断される時:リアルタイムシステムのプロダクションの悪夢
金曜日の午後8時、ライブ配信プラットフォームに50万人の同時接続ユーザー。設定変更がローリングデプロイをトリガーし、全WebSocket接続が同時に切断——50万クライアントが一斉に再接続。雪崩効果でゲートウェイが即座にダウン。さらに悪いことに、再接続ストームがメッセージキューのバックログを引き起こし、コンシューマーが追いつかず、レイテンシが50msから30秒に急増、ユーザー体験が完全に崩壊。
これは決して稀なケースではない。Goのリアルタイムシステム開発は「WebSocketにアップグレードする」だけでは終わらない——接続ライフサイクルの管理、メッセージルーティングの処理、バックプレッシャーと再接続ストームへの対処、イベントの順序付き配信の確保が必要だ。本記事では5つのプロダクション級リアルタイムパターンを解説し、堅牢なGoリアルタイムシステムの構築を支援する。
コア概念リファレンス
| 技術 | 用途 | 主な特徴 | 典型的なユースケース |
|---|---|---|---|
WebSocket |
全二重リアルタイム通信 | 永続接続、低レイテンシ、双方向プッシュ | チャットシステム、協調編集、リアルタイムゲーム |
SSE |
サーバーからクライアントへの一方向プッシュ | HTTPプロトコル、自動再接続、テキストフレーム | 通知プッシュ、株価行情、ログストリーム |
NATS |
高性能メッセージキュー | パブサブ、JetStream永続化、クラスタリング | マイクロサービスイベントバス、IoTデータ配信 |
Go channel |
プロセス内イベント伝達 | 型安全、select多重化、クローズ可能 | イベント駆動アーキテクチャ、ゴルーチン間通信 |
gorilla/websocket |
Go WebSocketライブラリ | 接続プール、ping/pong、並行安全 | プロダクション級WebSocketサービス |
リアルタイムシステム設計の5つの課題
課題1:接続ライフサイクル管理
+--------+ +--------+ +--------+
| Client |---->| Server |----| Client |
+--------+ +--------+ +--------+
| | |
| 接続確立 | 接続確立 |
|------------->|<-------------|
| | |
| ハートビート | ハートビート |
|<--ping/pong->|<--ping/pong->|
| | |
| 接続切断 | 接続切断 |
|<------------X|X------------>|
| | |
| 再接続ストーム | 再接続ストーム|
|=============>|<=============|
10万接続が同時に切断・再接続する時、バックオフ戦略なしでは自分自身にDDoS攻撃を仕掛けるようなものだ。
課題2:メッセージルーティングとファンアウト
1つのメッセージを1万人のサブスクライバーにプッシュする必要がある。サブスクライバーごとに1つのゴルーチン?それとも共有ライターゴルーチン?
課題3:バックプレッシャーとフロー制御
プロデューサーの速度がコンシューマーを大幅に上回る。チャネルが満杯の時どうする?破棄?ブロック?グレード?
課題4:イベントの順序性
同じユーザーのイベントは順序通りに配信されなければならないが、分散環境で複数インスタンスがこれをどう保証するか?
課題5:グレースフルデグラデーション
WebSocketが利用不可ならSSEにフォールバック、SSEが不可ならポーリングにフォールバック。デグラデーション戦略をどう設計するか?
5つのプロダクション級リアルタイムパターン
パターン1:WebSocket Hub——チャットシステム
WebSocket Hubはリアルタイムシステムで最も古典的なパターン:中央Hubが全接続を管理し、メッセージはHubを通じて全サブスクライバーにブロードキャストされる。
+-------+
| Hub |
+-------+
/ | | \
v v v v
+----+ +----+ +----+ +----+
| C1 | | C2 | | C3 | | C4 |
+----+ +----+ +----+ +----+
クライアントがメッセージ送信 --> Hubがブロードキャスト --> 全クライアントが受信
package wschat
import (
"log"
"net/http"
"sync"
"github.com/gorilla/websocket"
)
var upgrader = websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
CheckOrigin: func(r *http.Request) bool {
return true
},
}
type Message struct {
Username string `json:"username"`
Content string `json:"content"`
Room string `json:"room"`
}
type Client struct {
hub *Hub
conn *websocket.Conn
send chan []byte
room string
}
type Hub struct {
clients map[*Client]bool
broadcast chan []byte
register chan *Client
unregister chan *Client
mu sync.RWMutex
rooms map[string]map[*Client]bool
}
func NewHub() *Hub {
return &Hub{
broadcast: make(chan []byte, 256),
register: make(chan *Client),
unregister: make(chan *Client),
clients: make(map[*Client]bool),
rooms: make(map[string]map[*Client]bool),
}
}
func (h *Hub) Run() {
for {
select {
case client := <-h.register:
h.mu.Lock()
h.clients[client] = true
if h.rooms[client.room] == nil {
h.rooms[client.room] = make(map[*Client]bool)
}
h.rooms[client.room][client] = true
h.mu.Unlock()
case client := <-h.unregister:
h.mu.Lock()
if _, ok := h.clients[client]; ok {
delete(h.clients, client)
if room, ok := h.rooms[client.room]; ok {
delete(room, client)
if len(room) == 0 {
delete(h.rooms, client.room)
}
}
close(client.send)
}
h.mu.Unlock()
case message := <-h.broadcast:
h.mu.RLock()
for client := range h.clients {
select {
case client.send <- message:
default:
h.mu.RUnlock()
h.mu.Lock()
close(client.send)
delete(h.clients, client)
h.mu.Unlock()
h.mu.RLock()
}
}
h.mu.RUnlock()
}
}
}
func (c *Client) readPump() {
defer func() {
c.hub.unregister <- c
c.conn.Close()
}()
c.conn.SetReadLimit(512)
c.conn.SetPongHandler(func(string) error {
return nil
})
for {
_, message, err := c.conn.ReadMessage()
if err != nil {
if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
log.Printf("read error: %v", err)
}
break
}
c.hub.broadcast <- message
}
}
func (c *Client) writePump() {
ticker := time.NewTicker(54 * time.Second)
defer func() {
ticker.Stop()
c.conn.Close()
}()
for {
select {
case message, ok := <-c.send:
c.conn.SetWriteDeadline(time.Now().Add(10 * time.Second))
if !ok {
c.conn.WriteMessage(websocket.CloseMessage, []byte{})
return
}
w, err := c.conn.NextWriter(websocket.TextMessage)
if err != nil {
return
}
w.Write(message)
n := len(c.send)
for i := 0; i < n; i++ {
w.Write([]byte{'\n'})
w.Write(<-c.send)
}
if err := w.Close(); err != nil {
return
}
case <-ticker.C:
c.conn.SetWriteDeadline(time.Now().Add(10 * time.Second))
if err := c.conn.WriteMessage(websocket.PingMessage, nil); err != nil {
return
}
}
}
}
func ServeWs(hub *Hub, w http.ResponseWriter, r *http.Request) {
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
log.Println(err)
return
}
room := r.URL.Query().Get("room")
if room == "" {
room = "default"
}
client := &Client{
hub: hub,
conn: conn,
send: make(chan []byte, 256),
room: room,
}
hub.register <- client
go client.writePump()
go client.readPump()
}
主要な設計ポイント:
- Hubはチャネルで登録/登録解除/ブロードキャストを管理、ロック競合を回避
- 各Clientに独立したreadPump/writePumpゴルーチン
- writePumpはバッチ送信(チャネルからn件のメッセージをまとめて書き込み)
- ping/pongハートビート検出、54秒間隔
- 送信チャネル満杯時、スローコンシューマーを自動切断
パターン2:Server-Sent Events(SSE)——リアルタイム更新プッシュ
SSEはWebSocketより軽量で、HTTPプロトコルベース、自動再接続とイベントID継続をネイティブサポート。
+--------+ +--------+
| Client | | Server |
+--------+ +--------+
| |
| GET /events |
|------------------>|
| |
| Content-Type: |
| text/event-stream|
|<------------------|
| |
| data: {"price": |
| 42000} |
|<------------------|
| |
| id: 123 |
| data: {"price": |
| 42050} |
|<------------------|
package sse
import (
"fmt"
"net/http"
"sync"
)
type Event struct {
ID string
Event string
Data string
}
type Broker struct {
clients map[chan Event]bool
newClients chan chan Event
deadClients chan chan Event
mu sync.RWMutex
}
func NewBroker() *Broker {
return &Broker{
clients: make(map[chan Event]bool),
newClients: make(chan chan Event),
deadClients: make(chan chan Event),
}
}
func (b *Broker) Start() {
for {
select {
case c := <-b.newClients:
b.mu.Lock()
b.clients[c] = true
b.mu.Unlock()
log.Printf("client connected, total: %d", len(b.clients))
case c := <-b.deadClients:
b.mu.Lock()
delete(b.clients, c)
b.mu.Unlock()
close(c)
log.Printf("client disconnected, total: %d", len(b.clients))
}
}
}
func (b *Broker) ServeHTTP(w http.ResponseWriter, r *http.Request) {
flusher, ok := w.(http.Flusher)
if !ok {
http.Error(w, "streaming not supported", http.StatusInternalServerError)
return
}
w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")
w.Header().Set("Access-Control-Allow-Origin", "*")
clientChan := make(chan Event, 64)
b.newClients <- clientChan
notify := r.Context().Done()
go func() {
<-notify
b.deadClients <- clientChan
}()
for {
select {
case event, ok := <-clientChan:
if !ok {
return
}
if event.ID != "" {
fmt.Fprintf(w, "id: %s\n", event.ID)
}
if event.Event != "" {
fmt.Fprintf(w, "event: %s\n", event.Event)
}
fmt.Fprintf(w, "data: %s\n\n", event.Data)
flusher.Flush()
case <-r.Context().Done():
return
}
}
}
func (b *Broker) Notify(event Event) {
b.mu.RLock()
defer b.mu.RUnlock()
for client := range b.clients {
select {
case client <- event:
default:
go func(c chan Event) {
b.deadClients <- c
}(client)
}
}
}
使用例——株価行情プッシュ:
func main() {
broker := NewBroker()
go broker.Start()
go func() {
eventID := 0
for {
time.Sleep(1 * time.Second)
eventID++
price := fetchStockPrice("BTC")
broker.Notify(Event{
ID: fmt.Sprintf("%d", eventID),
Event: "price-update",
Data: fmt.Sprintf(`{"symbol":"BTC","price":%.2f}`, price),
})
}
}()
http.Handle("/events", broker)
http.ListenAndServe(":8080", nil)
}
主要な設計ポイント:
- HTTPベース、プロトコルアップグレード不要、プロキシ/CDNフレンドリー
Last-Event-IDで再接続時のイベント継続をサポートhttp.Flusherでデータの即時プッシュを保証- コンテキストキャンセルでクライアントを自動クリーンアップ
- チャネル満杯時、スローコンシューマーを自動切断
パターン3:NATSメッセージキュー統合——マイクロサービスイベントバス
NATSはGoエコシステムで最も人気のあるメッセージキューの一つで、単一ノードのスループットは1500万msg/秒に達し、リアルタイムシステムに天然適している。
+--------+ +--------+ +--------+
|ServiceA| | NATS | |ServiceB|
+--------+ | Server | +--------+
| +--------+ |
| publish | | subscribe
|-------->| |-------->|
| | | |
| | publish | |
| |<---------| |
| subscribe | |
|<--------| | |
+--------+ +--------+ +--------+
|ServiceC| | NATS | |ServiceD|
+--------+ | JetStream +--------+
+--------+
package natsbus
import (
"context"
"encoding/json"
"fmt"
"log"
"sync"
"github.com/nats-io/nats.go"
)
type Event struct {
Type string `json:"type"`
Payload json.RawMessage `json:"payload"`
Source string `json:"source"`
}
type EventBus struct {
nc *nats.Conn
js nats.JetStreamContext
handlers map[string][]func(Event)
mu sync.RWMutex
}
func NewEventBus(url string) (*EventBus, error) {
nc, err := nats.Connect(url,
nats.ReconnectWait(2*time.Second),
nats.MaxReconnects(60),
nats.DisconnectErrHandler(func(nc *nats.Conn, err error) {
log.Printf("NATS disconnected: %v", err)
}),
nats.ReconnectHandler(func(nc *nats.Conn) {
log.Printf("NATS reconnected to %s", nc.ConnectedUrl())
}),
)
if err != nil {
return nil, fmt.Errorf("connect to NATS: %w", err)
}
js, err := nc.JetStream()
if err != nil {
return nil, fmt.Errorf("get JetStream context: %w", err)
}
return &EventBus{
nc: nc,
js: js,
handlers: make(map[string][]func(Event)),
}, nil
}
func (b *EventBus) Publish(ctx context.Context, subject string, event Event) error {
data, err := json.Marshal(event)
if err != nil {
return fmt.Errorf("marshal event: %w", err)
}
select {
case <-ctx.Done():
return ctx.Err()
default:
}
if _, err := b.js.Publish(subject, data); err != nil {
return fmt.Errorf("publish to %s: %w", subject, err)
}
return nil
}
func (b *EventBus) Subscribe(ctx context.Context, subject, durable string, handler func(Event)) error {
streamName := fmt.Sprintf("STREAM_%s", durable)
if _, err := b.js.StreamInfo(streamName); err != nil {
if _, err := b.js.AddStream(&nats.StreamConfig{
Name: streamName,
Subjects: []string{subject},
Retention: nats.LimitsPolicy,
MaxMsgs: 1000000,
MaxAge: 7 * 24 * time.Hour,
}); err != nil {
return fmt.Errorf("add stream %s: %w", streamName, err)
}
}
sub, err := b.js.Subscribe(subject, func(msg *nats.Msg) {
var event Event
if err := json.Unmarshal(msg.Data, &event); err != nil {
log.Printf("unmarshal event: %v", err)
msg.Nak()
return
}
handler(event)
msg.Ack()
}, nats.Durable(durable), nats.ManualAck(), nats.DeliverAll())
if err != nil {
return fmt.Errorf("subscribe to %s: %w", subject, err)
}
go func() {
<-ctx.Done()
sub.Unsubscribe()
}()
return nil
}
func (b *EventBus) Close() {
b.nc.Close()
}
使用例——オーダーイベントストリーム:
func main() {
bus, err := NewEventBus("nats://localhost:4222")
if err != nil {
log.Fatal(err)
}
defer bus.Close()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
err = bus.Subscribe(ctx, "orders.*", "order-processor", func(event natsbus.Event) {
switch event.Type {
case "order.created":
processOrder(event.Payload)
case "order.paid":
shipOrder(event.Payload)
case "order.cancelled":
refundOrder(event.Payload)
}
})
bus.Publish(ctx, "orders.us", natsbus.Event{
Type: "order.created",
Source: "checkout-service",
Payload: json.RawMessage(`{"id":"ORD-001","amount":99.99}`),
})
}
主要な設計ポイント:
- JetStream永続化、メッセージ損失なし
Durableサブスクライバー、再接続後に前回位置から再開ManualAckでメッセージ処理成功後にのみ確認- 自動再接続設定(
ReconnectWait、MaxReconnects) - Stream設定でメッセージ保持ポリシー(7日間、100万件)
パターン4:イベント駆動アーキテクチャ——Goチャネル
Goチャネルはプロセス内イベント駆動アーキテクチャに天然適している。ジェネリクスと組み合わせて型安全なイベントバスを構築できる。
+----------+ +----------+ +----------+
| OrderSvc | |EventBus | | NotifySvc|
+----------+ | | +----------+
| | orders | |
|---------->| -----> |--------->|
| | | +----------+
| | payments| | AccSvc |
| | -----> | +----------+
| | |--------->|
| +----------+
package eventbus
import (
"context"
"fmt"
"sync"
)
type Event[T any] struct {
Topic string
Payload T
Err error
}
type Subscription[T any] struct {
ch chan Event[T]
cancel context.CancelFunc
}
type Bus[T any] struct {
subscribers map[string][]chan Event[T]
mu sync.RWMutex
bufferSize int
}
func NewBus[T any](bufferSize int) *Bus[T] {
return &Bus[T]{
subscribers: make(map[string][]chan Event[T]),
bufferSize: bufferSize,
}
}
func (b *Bus[T]) Publish(ctx context.Context, topic string, payload T) error {
b.mu.RLock()
subs := b.subscribers[topic]
b.mu.RUnlock()
event := Event[T]{
Topic: topic,
Payload: payload,
}
for _, ch := range subs {
select {
case ch <- event:
case <-ctx.Done():
return ctx.Err()
default:
go func(c chan Event[T]) {
select {
case c <- event:
case <-ctx.Done():
}
}(ch)
}
}
return nil
}
func (b *Bus[T]) Subscribe(ctx context.Context, topic string) *Subscription[T] {
ch := make(chan Event[T], b.bufferSize)
b.mu.Lock()
b.subscribers[topic] = append(b.subscribers[topic], ch)
b.mu.Unlock()
ctx, cancel := context.WithCancel(ctx)
go func() {
<-ctx.Done()
b.mu.Lock()
subs := b.subscribers[topic]
for i, sub := range subs {
if sub == ch {
b.subscribers[topic] = append(subs[:i], subs[i+1:]...)
break
}
}
b.mu.Unlock()
close(ch)
}()
return &Subscription[T]{ch: ch, cancel: cancel}
}
func (s *Subscription[T]) Events() <-chan Event[T] {
return s.ch
}
func (s *Subscription[T]) Unsubscribe() {
s.cancel()
}
使用例——オーダーイベントストリーム:
type OrderEvent struct {
OrderID string
Status string
Amount float64
}
func main() {
bus := eventbus.NewBus[OrderEvent](256)
ctx := context.Background()
sub := bus.Subscribe(ctx, "orders")
go func() {
for event := range sub.Events() {
fmt.Printf("order %s: %s ($%.2f)\n",
event.Payload.OrderID,
event.Payload.Status,
event.Payload.Amount,
)
}
}()
bus.Publish(ctx, "orders", OrderEvent{
OrderID: "ORD-001",
Status: "created",
Amount: 99.99,
})
bus.Publish(ctx, "orders", OrderEvent{
OrderID: "ORD-001",
Status: "paid",
Amount: 99.99,
})
}
主要な設計ポイント:
- ジェネリックイベントバス、型安全
- ノンブロッキングパブリッシュ、チャネル満杯時に非同期送信
- コンテキストキャンセルでサブスクリプションを自動クリーンアップ
- サブスクライバーごとに独立したチャネル、相互影響なし
- 外部依存ゼロ、純粋なGo標準ライブラリ実装
パターン5:プロダクションデプロイ——接続管理と再接続戦略
プロダクション環境では接続管理、再接続バックオフ、接続レート制限、ヘルスチェックなどの運用問題を処理する必要がある。
+--------+ +----------+ +--------+
| Client |---->| Gateway |---->| Service|
+--------+ +----------+ +--------+
| | |
| 1.接続要求 | |
|------------->| |
| | 2.レート制限 |
| |--- |
| | | |
| |<-- |
| 3.WSアップグレード| |
|<------------>| |
| | |
| 4.接続登録 | |
| |--------------->|
| | |
| 5.ハートビート | |
|<--ping/pong->| |
| | |
| 6.切断 | |
|<------------X| |
| | 7.登録解除 |
| |--------------->|
| | |
| 8.指数バックオフ| |
| 再接続 | |
|---->---->---->| |
package connmanager
import (
"context"
"fmt"
"math/rand"
"net/http"
"sync"
"sync/atomic"
"time"
"github.com/gorilla/websocket"
)
type ConnectionManager struct {
maxConnections int64
currentConns atomic.Int64
connections map[string]*ManagedConn
mu sync.RWMutex
heartbeatInterval time.Duration
writeTimeout time.Duration
maxMessageSize int64
}
type ManagedConn struct {
ID string
Conn *websocket.Conn
Connected time.Time
LastPing time.Time
cancel context.CancelFunc
}
type ConnConfig struct {
MaxConnections int64
HeartbeatInterval time.Duration
WriteTimeout time.Duration
MaxMessageSize int64
}
func NewConnectionManager(cfg ConnConfig) *ConnectionManager {
return &ConnectionManager{
maxConnections: cfg.MaxConnections,
connections: make(map[string]*ManagedConn),
heartbeatInterval: cfg.HeartbeatInterval,
writeTimeout: cfg.WriteTimeout,
maxMessageSize: cfg.MaxMessageSize,
}
}
func (m *ConnectionManager) HandleConnection(w http.ResponseWriter, r *http.Request, handler func([]byte)) {
if m.currentConns.Load() >= m.maxConnections {
http.Error(w, "too many connections", http.StatusServiceUnavailable)
return
}
upgrader := websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
CheckOrigin: func(r *http.Request) bool { return true },
}
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
return
}
connID := generateConnID()
ctx, cancel := context.WithCancel(r.Context())
mc := &ManagedConn{
ID: connID,
Conn: conn,
Connected: time.Now(),
LastPing: time.Now(),
cancel: cancel,
}
m.mu.Lock()
m.connections[connID] = mc
m.mu.Unlock()
m.currentConns.Add(1)
go m.readPump(mc, handler)
go m.writePump(mc)
}
func (m *ConnectionManager) readPump(mc *ManagedConn, handler func([]byte)) {
defer m.removeConnection(mc)
mc.Conn.SetReadLimit(m.maxMessageSize)
mc.Conn.SetPongHandler(func(string) error {
mc.LastPing = time.Now()
return nil
})
for {
_, message, err := mc.Conn.ReadMessage()
if err != nil {
if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseNormalClosure) {
log.Printf("conn %s unexpected close: %v", mc.ID, err)
}
return
}
handler(message)
}
}
func (m *ConnectionManager) writePump(mc *ManagedConn) {
ticker := time.NewTicker(m.heartbeatInterval)
defer func() {
ticker.Stop()
mc.Conn.Close()
}()
for {
select {
case <-ticker.C:
mc.Conn.SetWriteDeadline(time.Now().Add(m.writeTimeout))
if err := mc.Conn.WriteMessage(websocket.PingMessage, nil); err != nil {
return
}
case <-mc.Conn.Context().Done():
return
}
}
}
func (m *ConnectionManager) removeConnection(mc *ManagedConn) {
mc.cancel()
mc.Conn.Close()
m.mu.Lock()
delete(m.connections, mc.ID)
m.mu.Unlock()
m.currentConns.Add(-1)
}
func (m *ConnectionManager) Broadcast(message []byte) error {
m.mu.RLock()
defer m.mu.RUnlock()
for _, mc := range m.connections {
mc.Conn.SetWriteDeadline(time.Now().Add(m.writeTimeout))
if err := mc.Conn.WriteMessage(websocket.TextMessage, message); err != nil {
log.Printf("broadcast to %s failed: %v", mc.ID, err)
}
}
return nil
}
func (m *ConnectionManager) Stats() (total int64, active int) {
return m.currentConns.Load(), len(m.connections)
}
func generateConnID() string {
return fmt.Sprintf("%d-%06d", time.Now().UnixNano(), rand.Intn(1000000))
}
クライアント再接続戦略:
package reconnect
import (
"context"
"math"
"math/rand"
"time"
"github.com/gorilla/websocket"
)
type ReconnectConfig struct {
MaxRetries int
BaseDelay time.Duration
MaxDelay time.Duration
Jitter float64
}
func DefaultReconnectConfig() ReconnectConfig {
return ReconnectConfig{
MaxRetries: 100,
BaseDelay: 1 * time.Second,
MaxDelay: 30 * time.Second,
Jitter: 0.25,
}
}
type ReconnectingClient struct {
url string
config ReconnectConfig
conn *websocket.Conn
mu sync.Mutex
}
func NewReconnectingClient(url string, config ReconnectConfig) *ReconnectingClient {
return &ReconnectingClient{
url: url,
config: config,
}
}
func (c *ReconnectingClient) Connect(ctx context.Context) (*websocket.Conn, error) {
var lastErr error
for attempt := 0; attempt < c.config.MaxRetries; attempt++ {
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
}
conn, _, err := websocket.DefaultDialer.DialContext(ctx, c.url, nil)
if err == nil {
c.mu.Lock()
c.conn = conn
c.mu.Unlock()
return conn, nil
}
lastErr = err
delay := c.backoff(attempt)
log.Printf("connect attempt %d failed: %v, retrying in %v", attempt+1, err, delay)
select {
case <-ctx.Done():
return nil, ctx.Err()
case <-time.After(delay):
}
}
return nil, fmt.Errorf("max retries (%d) exceeded: %w", c.config.MaxRetries, lastErr)
}
func (c *ReconnectingClient) backoff(attempt int) time.Duration {
delay := float64(c.config.BaseDelay) * math.Pow(2, float64(attempt))
if delay > float64(c.config.MaxDelay) {
delay = float64(c.config.MaxDelay)
}
jitter := delay * c.config.Jitter
delay = delay - jitter + rand.Float64()*jitter*2
return time.Duration(delay)
}
func (c *ReconnectingClient) Listen(ctx context.Context, handler func([]byte)) {
for {
conn, err := c.Connect(ctx)
if err != nil {
if ctx.Err() != nil {
return
}
continue
}
for {
_, message, err := conn.ReadMessage()
if err != nil {
log.Printf("read error: %v, reconnecting...", err)
conn.Close()
break
}
handler(message)
}
}
}
主要な設計ポイント:
- 接続数制限、リソース枯渇を防止
- 指数バックオフ+ジッター再接続、再接続ストームを回避
- ハートビート検出、デッド接続を自動クリーンアップ
- 接続統計、運用監視をサポート
- グレースフル切断、コンテキストキャンセル時にリソースをクリーンアップ
5つのよくある落とし穴と修正
落とし穴1:WebSocketの並行書き込み
❌ 誤った書き方:
go func() {
conn.WriteMessage(websocket.TextMessage, msg1)
}()
go func() {
conn.WriteMessage(websocket.TextMessage, msg2)
}()
gorilla/websocketの接続は並行安全ではない。複数ゴルーチンが同時に書き込むとpanicが発生する。
✅ 正しい書き方:
type SafeConn struct {
conn *websocket.Conn
mu sync.Mutex
}
func (s *SafeConn) WriteMessage(msgType int, data []byte) error {
s.mu.Lock()
defer s.mu.Unlock()
return s.conn.WriteMessage(msgType, data)
}
落とし穴2:SSEのFlush忘れ
❌ 誤った書き方:
fmt.Fprintf(w, "data: %s\n\n", message)
データはHTTPレスポンスライターにバッファリングされ、クライアントは受信できない。
✅ 正しい書き方:
fmt.Fprintf(w, "data: %s\n\n", message)
flusher, _ := w.(http.Flusher)
flusher.Flush()
落とし穴3:NATSサブスクライバーのAck忘れ
❌ 誤った書き方:
sub, _ := js.Subscribe("orders", func(msg *nats.Msg) {
processOrder(msg.Data)
})
JetStreamメッセージが確認されず、再配信が繰り返され、最終的にMaxDeliverに達してデッドレターに入る。
✅ 正しい書き方:
sub, _ := js.Subscribe("orders", func(msg *nats.Msg) {
if err := processOrder(msg.Data); err != nil {
msg.Nak()
return
}
msg.Ack()
}, nats.ManualAck())
落とし穴4:バックオフなしの再接続
❌ 誤った書き方:
for {
conn, err := dial(url)
if err != nil {
continue
}
}
10万クライアントが同時に再接続し、サーバーを瞬時に圧倒する。
✅ 正しい書き方:
for attempt := 0; ; attempt++ {
conn, err := dial(url)
if err != nil {
delay := backoff(attempt)
time.Sleep(delay)
continue
}
}
落とし穴5:チャネルブロードキャストでスローコンシューマーがブロック
❌ 誤った書き方:
for _, ch := range subscribers {
ch <- message
}
1人のスローコンシューマーが全サブスクライバーをブロックする。
✅ 正しい書き方:
for _, ch := range subscribers {
select {
case ch <- message:
default:
log.Printf("slow consumer, dropping message")
}
}
エラートラブルシューティングリファレンス
| エラー現象 | 考えられる原因 | 調査方法 | 解決策 |
|---|---|---|---|
| WebSocket接続が頻繁に切断 | ハートビートタイムアウト、ネットワークジッター | ping/pong間隔の確認、ネットワークレイテンシの測定 | ハートビート間隔の増加、再接続ロジックの追加 |
concurrent write to websocket connection panic |
複数ゴルーチンが同時にWebSocketに書き込み | 並行WriteMessage呼び出しを検索 | ミューテックスまたは単一ゴルーチンで書き込み |
| SSEクライアントがデータを受信できない | Flush忘れ | レスポンスライターがFlushを呼び出しているか確認 | 各書き込み後にflusher.Flush()を呼び出し |
| NATSメッセージが繰り返し消費される | AckなしまたはAckタイムアウト | JetStream MaxDeliver設定の確認 | ManualAckを使用、処理成功後にAck |
| 再接続ストームでサーバーがダウン | バックオフ戦略なし | 接続確立レートの監視 | 指数バックオフ+ジッター再接続 |
| ゴルーチン数が増加し続ける | 接続のクリーンアップ不足、チャネル未クローズ | pprofゴルーチンプロファイル | 切断時にゴルーチンとチャネルを確実にクリーンアップ |
| メモリが増加し続ける | メッセージバックログ、接続リーク | pprofヒーププロファイル | チャネルバッファの制限、バックプレッシャー機構の追加 |
| イベントの順序が乱れる | 複数インスタンスの並行処理 | キーによるパーティショニングの確認 | 一貫性ハッシュまたはスティッキーセッションの使用 |
| ブロードキャストレイテンシが高い | シリアル送信、スローコンシューマー | 接続ごとの送信レイテンシの監視 | 並列送信+タイムアウトでスローコンシューマーをスキップ |
| グレースフルシャットダウン失敗 | 接続未クローズ、メッセージ損失 | シャットダウンフローの確認 | ドレインモード:新規接続の受け付け停止、既存接続の完了を待機 |
高度な最適化テクニック
最適化1:WebSocket接続シャーディング
大量の接続を複数のHubに分割し、ロック競合を削減:
package shardhub
import (
"hash/fnv"
"sync"
)
type ShardedHub struct {
shards []*Hub
count int
}
func NewShardedHub(shardCount int) *ShardedHub {
shards := make([]*Hub, shardCount)
for i := range shards {
shards[i] = NewHub()
go shards[i].Run()
}
return &ShardedHub{shards: shards, count: shardCount}
}
func (s *ShardedHub) getShard(key string) *Hub {
h := fnv.New32a()
h.Write([]byte(key))
return s.shards[h.Sum32()%uint32(s.count)]
}
func (s *ShardedHub) Register(client *Client) {
shard := s.getShard(client.room)
shard.register <- client
}
func (s *ShardedHub) Broadcast(room string, message []byte) {
shard := s.getShard(room)
shard.broadcast <- message
}
最適化2:SSEマルチプレキシング
1つのSSE接続で複数のイベントタイプをプッシュし、接続数を削減:
func (b *Broker) ServeHTTP(w http.ResponseWriter, r *http.Request) {
flusher, _ := w.(http.Flusher)
w.Header().Set("Content-Type", "text/event-stream")
clientChan := make(chan Event, 64)
topics := r.URL.Query()["topic"]
for _, topic := range topics {
b.subscribe(topic, clientChan)
}
for {
select {
case event := <-clientChan:
fmt.Fprintf(w, "event: %s\n", event.Event)
fmt.Fprintf(w, "data: %s\n\n", event.Data)
flusher.Flush()
case <-r.Context().Done():
return
}
}
}
最適化3:NATSコンシューマーグループ
複数のコンシューマーがdurable nameを共有し、負荷分散を実現:
func (b *EventBus) SubscribeGroup(ctx context.Context, subject, group, durable string, handler func(Event)) error {
sub, err := b.js.Subscribe(subject, func(msg *nats.Msg) {
var event Event
json.Unmarshal(msg.Data, &event)
handler(event)
msg.Ack()
},
nats.Durable(durable),
nats.Queue(group),
nats.ManualAck(),
nats.DeliverAll(),
)
if err != nil {
return err
}
go func() {
<-ctx.Done()
sub.Unsubscribe()
}()
return nil
}
リアルタイム技術比較
| 特徴 | WebSocket | SSE | NATS | Goチャネル |
|---|---|---|---|---|
| 通信方向 | 双方向 | 一方向(サーバー→クライアント) | 双方向 | 双方向 |
| プロトコル | WS | HTTP | TCP | プロセス内 |
| 再接続サポート | 手動実装 | 内蔵(Last-Event-ID) | 内蔵 | N/A |
| メッセージ永続化 | なし | なし | JetStream | なし |
| ブラウザサポート | 全て | 全て | N/A | N/A |
| プロキシ/CDNフレンドリー | 低い | 高い | N/A | N/A |
| スループット | 高い | 中 | 非常に高い | 非常に高い |
| レイテンシ | ~1ms | ~10ms | ~0.5ms | ~0.01ms |
| ユースケース | チャット、協調編集 | 通知、行情 | マイクロサービスイベントバス | プロセス内イベント駆動 |
| プロダクション推奨度 | ⭐⭐⭐⭐ | ⭐⭐⭐⭐ | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐ |
まとめ
Goリアルタイムシステムの核心はプロトコルの選択ではなく、5つの問いに答えることだ:接続はどう管理するか?メッセージはどうルーティングするか?バックプレッシャーはどう処理するか?再接続はどうバックオフするか?イベントはどう永続化するか? WebSocket Hubは「接続管理」を、SSEは「シンプルプッシュ」を、NATSは「メッセージ永続化とルーティング」を、Goチャネルは「プロセス内イベントストリーミング」を、ConnectionManagerは「再接続とレート制限」を答える。この5つのパターンをマスターすれば、プロダクション級Goリアルタイムシステム設計の核心的な方法論を身につけたことになる。
推奨ツール
- JSONフォーマッター — リアルタイムシステムのJSONメッセージをフォーマット、データ構造の問題を迅速にデバッグ
- Base64エンコード/デコード — WebSocketバイナリフレームのエンコード転送を処理
- HTTPステータスコード検索 — SSEとWebSocketアップグレード中のHTTPエラーをトラブルシューティング
関連記事
ブラウザローカルツールを無料で試す →