Goゴルーチンリーク調査:2026年構造化並行処理5つのパターンでゴルーチンリークに完全別れを告げる
このような奇妙な場面に遭遇したことはありませんか?
本番サービスが数時間実行後、メモリ使用量が上昇し続け、CPUが90%に達し、pprofで確認すると——数千のゴルーチンがchannelの受信でスタックし、永遠にデータを待っています。再起動後は正常に戻りますが、数時間後に再発します。これが典型的なゴルーチンリークであり、Go並行プログラミングで最も隠蔽的で致命的なバグです。
さらに恐ろしいのは、ゴルーチンリークは直接エラーを出さず、慢性毒のようにシステムリソースをゆっくり消費し、OOM Killerが介入するまで続きます。2026年になりました。「裸のゴルーチン起動」で並行コードを書くべきではありません——構造化並行処理(Structured Concurrency) こそが正解です。
構造化並行処理とは?
構造化並行処理は2018年のMartin Oderskyの提案に由来し、核心思想は:並行サブタスクのライフサイクルは親タスクによって厳格に管理されなければならない。構造化プログラミングがgotoの混乱を排除したように、構造化並行処理はfire-and-forgetによるゴルーチンリークを排除します。
| 特性 | 非構造化並行処理 | 構造化並行処理 |
|---|---|---|
| ライフサイクル | 制御不能、リークの可能性 | 親終了時に子も終了 |
| エラー伝播 | 手動処理必要 | 自動的に上方向に伝播 |
| リソース回収 | 保証なし | 保証あり |
| デバッグ難度 | 非常に高い | 追跡可能 |
| 代表パターン | go func(){} |
errgroup、Contextキャンセル |
Go言語には言語レベルの構造化並行処理サポートはありませんが、context、errgroup、syncなどの標準ライブラリを使用して完全な構造化並行処理体系を構築できます。
ゴルーチンリークの5つの根本原因
1. 未バッファchannelへのデータ送信
func leak1() {
ch := make(chan int)
go func() {
ch <- 42 // 永遠にブロック、受信者なし
}()
// 関数が返り、ゴルーチンリーク
}
2. 未閉鎖channelからの受信
func leak2() {
ch := make(chan int, 10)
go func() {
for v := range ch { // chが閉じられず、永遠にブロック
fmt.Println(v)
}
}()
}
3. Contextがキャンセルされない
func leak3(ctx context.Context) {
ctx2 := context.Background() // 元のctxのキャンセル信号を継承しない
go func() {
select {
case <-ctx2.Done(): // 永遠にトリガーされない
case <-time.After(10 * time.Hour):
}
}()
}
4. WaitGroupカウントの不一致
func leak4() {
var wg sync.WaitGroup
wg.Add(5)
for i := 0; i < 3; i++ { // 3つだけ起動したが、Addは5
go func() { wg.Done() }()
}
wg.Wait() // 永遠にブロック
}
5. HTTPレスポンスボディの未閉鎖
func leak5() {
resp, _ := http.Get("https://example.com")
// defer resp.Body.Close() // 閉じるのを忘れた!
// 基盤transportのゴルーチンリークを引き起こす
}
パターン1:errgroup——エラー伝播付き並行処理
errgroupは構造化並行処理の最も基礎的な構成要素であり、すべてのサブゴルーチンが完了する(またはいずれかがエラーの場合、残りをキャンセルする)まで親タスクが続行しないことを保証します。
ステップバイステップ
Step 1: errgroupのインストール
go get golang.org/x/sync/errgroup
Step 2: 基本的な使用法——並行APIリクエスト
package main
import (
"context"
"fmt"
"net/http"
"time"
"golang.org/x/sync/errgroup"
)
func fetchAPI(ctx context.Context, url string) ([]byte, error) {
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
if err != nil {
return nil, err
}
resp, err := http.DefaultClient.Do(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()
return nil, nil
}
func fetchAll(ctx context.Context, urls []string) error {
g, ctx := errgroup.WithContext(ctx)
results := make([][]byte, len(urls))
for i, url := range urls {
i, url := i, url
g.Go(func() error {
data, err := fetchAPI(ctx, url)
if err != nil {
return err
}
results[i] = data
return nil
})
}
if err := g.Wait(); err != nil {
return fmt.Errorf("fetch failed: %w", err)
}
return nil
}
func main() {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
urls := []string{
"https://api.example.com/users",
"https://api.example.com/orders",
"https://api.example.com/products",
}
if err := fetchAll(ctx, urls); err != nil {
fmt.Println("Error:", err)
}
}
パターン2:セマフォ——並行度の制限
並行度を制御する必要がある場合(DB接続数の制限など)、semaphoreパッケージを使用します。
完全コード
package main
import (
"context"
"fmt"
"sync/atomic"
"time"
"golang.org/x/sync/errgroup"
"golang.org/x/sync/semaphore"
)
func processWithLimit(ctx context.Context, tasks []string, maxConcurrency int64) error {
sem := semaphore.NewWeighted(maxConcurrency)
g, ctx := errgroup.WithContext(ctx)
var activeCount int64
for _, task := range tasks {
task := task
if err := sem.Acquire(ctx, 1); err != nil {
return fmt.Errorf("acquire semaphore: %w", err)
}
g.Go(func() error {
defer sem.Release(1)
current := atomic.AddInt64(&activeCount, 1)
defer atomic.AddInt64(&activeCount, -1)
fmt.Printf("Processing %s (active: %d/%d)\n", task, current, maxConcurrency)
select {
case <-time.After(500 * time.Millisecond):
return nil
case <-ctx.Done():
return ctx.Err()
}
})
}
return g.Wait()
}
func main() {
ctx := context.Background()
tasks := make([]string, 20)
for i := range tasks {
tasks[i] = fmt.Sprintf("task-%d", i)
}
if err := processWithLimit(ctx, tasks, 5); err != nil {
fmt.Println("Error:", err)
}
}
パターン3:Pipeline——ストリーム処理
Pipelineパターンはデータを複数のステージに流し、各ステージはゴルーチンのグループで、ステージ間はchannelで接続されます。
完全コード
package main
import (
"context"
"fmt"
"sync"
"golang.org/x/sync/errgroup"
)
func generate(ctx context.Context, nums ...int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for _, n := range nums {
select {
case out <- n:
case <-ctx.Done():
return
}
}
}()
return out
}
func square(ctx context.Context, in <-chan int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for n := range in {
select {
case out <- n * n:
case <-ctx.Done():
return
}
}
}()
return out
}
func filter(ctx context.Context, in <-chan int, predicate func(int) bool) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for n := range in {
if predicate(n) {
select {
case out <- n:
case <-ctx.Done():
return
}
}
}
}()
return out
}
func runPipeline(ctx context.Context) error {
g, ctx := errgroup.WithContext(ctx)
var mu sync.Mutex
results := []int{}
ch := generate(ctx, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
ch = square(ctx, ch)
ch = filter(ctx, ch, func(n int) bool { return n > 20 })
g.Go(func() error {
for n := range ch {
mu.Lock()
results = append(results, n)
mu.Unlock()
}
return nil
})
if err := g.Wait(); err != nil {
return err
}
fmt.Println("Results:", results)
return nil
}
func main() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
runPipeline(ctx)
}
パターン4:Fan-out/Fan-in——分配と集約
タスクを複数のワーカーに分配し(fan-out)、結果を集約します(fan-in)。
完全コード
package main
import (
"context"
"fmt"
"sync"
"golang.org/x/sync/errgroup"
)
func fanOut(ctx context.Context, in <-chan int, workerCount int) []<-chan int {
channels := make([]<-chan int, workerCount)
for i := 0; i < workerCount; i++ {
channels[i] = processWorker(ctx, in, i)
}
return channels
}
func processWorker(ctx context.Context, in <-chan int, id int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for n := range in {
result := n * n
select {
case out <- result:
case <-ctx.Done():
return
}
}
}()
return out
}
func fanIn(ctx context.Context, channels ...<-chan int) <-chan int {
out := make(chan int)
var wg sync.WaitGroup
wg.Add(len(channels))
for _, ch := range channels {
ch := ch
go func() {
defer wg.Done()
for n := range ch {
select {
case out <- n:
case <-ctx.Done():
return
}
}
}()
}
go func() {
wg.Wait()
close(out)
}()
return out
}
func main() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
in := make(chan int, 10)
for i := 1; i <= 10; i++ {
in <- i
}
close(in)
workerChs := fanOut(ctx, in, 3)
merged := fanIn(ctx, workerChs...)
for result := range merged {
fmt.Println(result)
}
}
パターン5:Worker Pool——制御可能なワークプール
最も汎用的な構造化並行処理パターンで、ゴルーチン数とライフサイクルを厳格に制御します。
完全コード
package main
import (
"context"
"fmt"
"sync"
"time"
)
type Task struct {
ID int
Input string
}
type Result struct {
TaskID int
Output string
Err error
}
type WorkerPool struct {
workerCount int
tasks chan Task
results chan Result
wg sync.WaitGroup
cancel context.CancelFunc
}
func NewWorkerPool(ctx context.Context, workerCount, taskBufSize int) *WorkerPool {
ctx, cancel := context.WithCancel(ctx)
pool := &WorkerPool{
workerCount: workerCount,
tasks: make(chan Task, taskBufSize),
results: make(chan Result, taskBufSize),
cancel: cancel,
}
pool.start(ctx)
return pool
}
func (p *WorkerPool) start(ctx context.Context) {
for i := 0; i < p.workerCount; i++ {
p.wg.Add(1)
go func(workerID int) {
defer p.wg.Done()
for {
select {
case task, ok := <-p.tasks:
if !ok {
return
}
result := p.process(ctx, workerID, task)
select {
case p.results <- result:
case <-ctx.Done():
return
}
case <-ctx.Done():
return
}
}
}(i)
}
}
func (p *WorkerPool) process(ctx context.Context, workerID int, task Task) Result {
select {
case <-time.After(100 * time.Millisecond):
return Result{
TaskID: task.ID,
Output: fmt.Sprintf("worker-%d processed: %s", workerID, task.Input),
}
case <-ctx.Done():
return Result{TaskID: task.ID, Err: ctx.Err()}
}
}
func (p *WorkerPool) Submit(task Task) { p.tasks <- task }
func (p *WorkerPool) Results() <-chan Result { return p.results }
func (p *WorkerPool) Shutdown() {
close(p.tasks)
p.wg.Wait()
close(p.results)
}
func (p *WorkerPool) Cancel() { p.cancel() }
func main() {
ctx := context.Background()
pool := NewWorkerPool(ctx, 4, 100)
go func() {
for i := 0; i < 20; i++ {
pool.Submit(Task{ID: i, Input: fmt.Sprintf("data-%d", i)})
}
pool.Shutdown()
}()
for result := range pool.Results() {
fmt.Printf("Task %d: %s (err: %v)\n", result.TaskID, result.Output, result.Err)
}
}
落とし穴ガイド
落とし穴1:errgroupでエラーを飲み込む
// ❌ 誤り:g.Goでnilを返し、エラーが飲み込まれる
g.Go(func() error {
if err := doWork(); err != nil {
log.Println(err)
return nil
}
return nil
})
// ✅ 正しい:エラーを返さなければならない
g.Go(func() error {
return doWork()
})
落とし穴2:Contextの伝播がない
// ❌ 誤り:context.Background()を使用、キャンセル信号が失われる
g.Go(func() error {
return fetch(context.Background(), url)
})
// ✅ 正しい:errgroupのctxを使用
g.Go(func() error {
return fetch(ctx, url)
})
落とし穴3:ループ変数のキャプチャエラー
// ❌ 誤り:Go 1.22以前、iはループ変数の参照
for i := 0; i < 10; i++ {
g.Go(func() error {
return process(i)
})
}
// ✅ 正しい:Go 1.22+で自動修正、または手動バインド
for i := 0; i < 10; i++ {
i := i
g.Go(func() error {
return process(i)
})
}
落とし穴4:channel未閉鎖によるrangeデッドロック
// ❌ 誤り:生産者がchannelを閉じない
func produce() <-chan int {
ch := make(chan int)
go func() {
for i := 0; i < 10; i++ {
ch <- i
}
// close(ch)を忘れた
}()
return ch
}
// ✅ 正しい:defer closeが必要
func produce() <-chan int {
ch := make(chan int)
go func() {
defer close(ch)
for i := 0; i < 10; i++ {
ch <- i
}
}()
return ch
}
落とし穴5:errgroup外でのセマフォ使用によるリーク
// ❌ 誤り:g.Goの外でAcquire
sem.Acquire(ctx, 1)
g.Go(func() error {
defer sem.Release(1)
return work()
})
// ✅ 正しい:g.Go内でAcquire
g.Go(func() error {
if err := sem.Acquire(ctx, 1); err != nil {
return err
}
defer sem.Release(1)
return work()
})
エラートラブルシューティング
| # | エラーメッセージ | 原因 | 解決方法 |
|---|---|---|---|
| 1 | fatal error: all goroutines are asleep - deadlock! |
全ゴルーチンがchannel操作でブロック | channelに生産者/消費者がいるか確認、channelが閉じられることを保証 |
| 2 | context deadline exceeded |
操作タイムアウト、contextキャンセル | タイムアウトを増やすかパフォーマンスを最適化、スロークエリの確認 |
| 3 | errgroup: Go called after Wait |
Waitの後にGoを呼び出した | すべてのGo呼び出しがWaitの前に完了することを保証 |
| 4 | panic: sync: WaitGroup is reused before previous Wait has returned |
WaitGroupの誤用 | 毎回新しいWaitGroupを作成、または前のWaitの完了を保証 |
| 5 | panic: close of closed channel |
channelの二重閉鎖 | sync.Onceでcloseを保護、または1つのゴルーチンのみが閉鎖を担当 |
| 6 | panic: send on closed channel |
閉鎖後のchannelに送信 | 送信者がchannelの閉鎖タイミングを知っていることを保証、selectで検出 |
| 7 | goroutine profile: total XXX (数が増加し続ける) |
ゴルーチンリーク | pprofで調査、終了していないゴルーチンのスタックを確認 |
| 8 | runtime: out of memory |
ゴルーチン過多でメモリ枯渇 | 並行度を制限、worker poolやsemaphoreを使用 |
| 9 | signal: killed (OOM Killer) |
システムメモリ不足でプロセス強制終了 | ゴルーチン数を削減、メモリ制限を増やすか使用量を最適化 |
| 10 | import cycle not allowed |
並行コードのパッケージ循環依存 | パッケージ構造を再編成、共有型を独立パッケージに抽出 |
高度な最適化
1. runtime/pprofでゴルーチン数をリアルタイム監視
go func() {
ticker := time.NewTicker(30 * time.Second)
defer ticker.Stop()
for range ticker.C {
fmt.Printf("Goroutines: %d\n", runtime.NumGoroutine())
}
}()
2. 各ゴルーチンにタイムアウト保護を追加
func safeGoroutine(ctx context.Context, fn func() error) error {
ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()
done := make(chan error, 1)
go func() { done <- fn() }()
select {
case err := <-done:
return err
case <-ctx.Done():
return fmt.Errorf("goroutine timed out: %w", ctx.Err())
}
}
3. go-leakテストフレームワークでリークを検出
import "go.uber.org/goleak"
func TestMain(m *testing.M) {
goleak.VerifyTestMain(m)
}
4. 構造化ログでゴルーチンを関連付け
func tracedGoroutine(ctx context.Context, name string, fn func(context.Context) error) error {
logger := slog.With("goroutine", name, "traceID", ctx.Value("traceID"))
logger.Info("goroutine started")
err := fn(ctx)
logger.Info("goroutine finished", "error", err)
return err
}
比較分析
| 次元 | errgroup | Semaphore | Pipeline | Fan-out/Fan-in | Worker Pool |
|---|---|---|---|---|---|
| 並行制御 | 無制限 | 並行度制限 | ステージ直列 | 無制限 | 固定ワーカー数 |
| エラー処理 | 自動伝播 | 手動 | 手動 | 手動 | 手動 |
| ユースケース | 並行リクエスト | レート制限 | ストリーム処理 | 計算集約型 | 汎用タスク処理 |
| 実装複雑度 | 低 | 中 | 中 | 高 | 高 |
| ゴルーチン安全 | ✅ | ✅ | ⚠️注意必要 | ⚠️注意必要 | ✅ |
| バックプレッシャ | ❌ | ✅ | ✅ | ❌ | ✅ |
| キャンセル伝播 | ✅自動 | ✅自動 | ⚠️手動 | ⚠️手動 | ✅ |
まとめ:ゴルーチンリークの本質は「制御不能な並行処理」——サブタスクのライフサイクルが親タスクの制御から逃れることです。構造化並行処理は、errgroup(エラー伝播)、semaphore(並行制限)、pipeline(ストリーム処理)、fan-out/fan-in(分配と集約)、worker pool(制御可能なプール化)の5つのパターンにより、異なる次元から並行安全性を保証します。2026年、裸の
go func(){}に別れを告げ、構造化並行処理を取り入れましょう。
オンラインツール推奨
- JSONフォーマッター:/ja/json/format
- Base64エンコード/デコード:/ja/encode/base64
- cURLからコード:/ja/dev/curl-to-code
- Go Playground:/ja/dev/go-playground
ブラウザローカルツールを無料で試す →