Go並行処理パターン実践:Worker PoolからPipelineまで7つのプロダクションパターン

编程语言

ゴルーチンリークが無制限並行処理と出会う時:プロダクションの悪夢

午前3時、本番環境のOOMアラート。調査の結果:HTTPハンドラがゴルーチンを無制限に起動——リクエストごとに3つのゴルーチン、QPS 1000なら3000個、10分で18万個のゴルーチンがリークし、メモリが満杯に。さらに恐ろしいのは、これらのゴルーチンが保持するチャネル参照がGC回収を妨げ、最終的にノード全体がクラッシュすることだ。

これは決して稀なケースではない。Goの並行処理プリミティブはシンプルだ——go func()1行で並行処理を起動できるが、プロダクションの並行プログラミングはゴルーチンを起動するだけでは終わらない。並行度の制御、エラー伝播の処理、グレースフルシャットダウンの実装、リソースリークの防止が必要だ。本記事では7つのプロダクション級並行パターンを解説し、堅牢なGo並行サービスの構築を支援する。


コア概念リファレンス

プリミティブ 用途 主な特徴 典型的なユースケース
goroutine 軽量並行実行ユニット スタックメモリ拡張可能(2KB開始)、Goランタイムスケジューリング 並行実行が必要なあらゆるタスク
channel ゴルーチン間通信 型安全、バッファ付き/なし、クローズ可能 データ渡し、シグナル通知、結果収集
sync.WaitGroup ゴルーチングループの待機 Add/Done/Waitの3点セット バッチタスク待機、並行ファンアウト
sync.Mutex 共有状態の相互排他ロック ゼロ値使用可能、TryLock対応(Go 1.18+) カウンター、キャッシュ更新、設定ホットリロード
context.Context キャンセルシグナルとタイムアウトの伝播 不変、子コンテキストの派生のみ可能 リクエストタイムアウト、グレースフルシャットダウン、トレーシング
errgroup.Group 並行実行+エラー収集 最初のエラーで全ゴルーチンをキャンセル バッチAPI呼び出し、並行データフェッチ
semaphore.Weighted 重み付きセマフォによるレート制限 重み対応、タイムアウト付き取得 APIレート制限、リソースクォータ制御

プロダクション並行プログラミングの5つの課題

課題1:無制限並行処理によるリソース枯渇

func handleRequests(urls []string) {
    for _, url := range urls {
        go fetch(url)
    }
}

各URLがゴルーチンを起動——1万URLなら1万の同時接続。DB接続プールが枯渇し、下流サービスが圧迫され、メモリが急増する。

課題2:ゴルーチンリーク

func process(ch <-chan int) {
    for {
        val := <-ch
        fmt.Println(val)
    }
}

チャネルが閉じられない限り、このゴルーチンは永遠に終了しない。長時間稼働するサービスでは、リークしたゴルーチンが蓄積し続ける。

課題3:エラーが黙って飲み込まれる

func fetchAll(urls []string) {
    var wg sync.WaitGroup
    for _, url := range urls {
        wg.Add(1)
        go func(u string) {
            defer wg.Done()
            resp, err := http.Get(u)
            if err != nil {
                log.Printf("fetch %s failed: %v", u, err)
                return
            }
            process(resp)
        }(url)
    }
    wg.Wait()
}

エラーはログに出力されるだけで、呼び出し元は失敗に気づかない。3つのURLのうち2つが失敗しても、呼び出し元は全て成功したと思い込む。

課題4:グレースフルシャットダウン不可

サービスがSIGTERMを受信した時、実行中のゴルーチンが強制中断される。書き込み中のデータが破損し、処理中のトランザクションが中途半端になる可能性がある。

課題5:並行パターンの組み合わせが困難

Worker Poolにはレート制限、Pipelineにはエラー伝播、Fan-outには結果集約が必要。各パターンを単独で実装するのは難しくないが、1つのサービスで組み合わせると、パターン間の相互作用でデッドロックが発生しやすい。


7つのプロダクション級並行パターン

パターン1:Worker Pool——制約付きゴルーチンプール

Worker Poolは最も基本的で重要な並行パターンだ。核心的な考え方:固定数のワーカーがタスクキューからタスクを取得して実行し、無制限並行を回避する。

package workerpool

import (
    "context"
    "sync"
)

type Task func(ctx context.Context) error

type Pool struct {
    workers int
    tasks   chan Task
    wg      sync.WaitGroup
}

func NewPool(workers int, bufferSize int) *Pool {
    return &Pool{
        workers: workers,
        tasks:   make(chan Task, bufferSize),
    }
}

func (p *Pool) Start(ctx context.Context) {
    for i := 0; i < p.workers; i++ {
        p.wg.Add(1)
        go func(workerID int) {
            defer p.wg.Done()
            for {
                select {
                case <-ctx.Done():
                    return
                case task, ok := <-p.tasks:
                    if !ok {
                        return
                    }
                    _ = task(ctx)
                }
            }
        }(i)
    }
}

func (p *Pool) Submit(task Task) bool {
    select {
    case p.tasks <- task:
        return true
    default:
        return false
    }
}

func (p *Pool) Stop() {
    close(p.tasks)
    p.wg.Wait()
}

使用例:

func main() {
    ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
    defer cancel()

    pool := NewPool(10, 100)
    pool.Start(ctx)

    urls := []string{
        "https://api.example.com/users",
        "https://api.example.com/orders",
        "https://api.example.com/products",
    }

    for _, url := range urls {
        u := url
        pool.Submit(func(ctx context.Context) error {
            return fetchURL(ctx, u)
        })
    }

    pool.Stop()
}

主要な設計ポイント

  • ワーカー数固定でゴルーチン爆発を防止
  • バッファ付きタスクチャネルがタスクキューとして機能
  • コンテキストによるキャンセル対応、ワーカーのグレースフル終了
  • Submitはノンブロッキング、キュー満杯時にfalseを返す

パターン2:Fan-out/Fan-in——並行スキャッタ・ギャザー

Fan-outはデータソースを複数のゴルーチンに分配して並行処理し、Fan-inは複数のゴルーチンの結果を1つのチャネルに集約する。

package fan

import (
    "context"
    "sync"
)

func FanOut[T any](ctx context.Context, source <-chan T, workers int) []<-chan T {
    channels := make([]<-chan T, workers)
    for i := 0; i < workers; i++ {
        ch := make(chan T)
        channels[i] = ch
        go func() {
            defer close(ch)
            for {
                select {
                case <-ctx.Done():
                    return
                case val, ok := <-source:
                    if !ok {
                        return
                    }
                    select {
                    case ch <- val:
                    case <-ctx.Done():
                        return
                    }
                }
            }
        }()
    }
    return channels
}

func FanIn[T any](ctx context.Context, channels ...<-chan T) <-chan T {
    out := make(chan T)
    var wg sync.WaitGroup
    wg.Add(len(channels))

    for _, ch := range channels {
        go func(c <-chan T) {
            defer wg.Done()
            for {
                select {
                case <-ctx.Done():
                    return
                case val, ok := <-c:
                    if !ok {
                        return
                    }
                    select {
                    case out <- val:
                    case <-ctx.Done():
                        return
                    }
                }
            }
        }(ch)
    }

    go func() {
        wg.Wait()
        close(out)
    }()

    return out
}

使用例——並行オーダー処理:

func processOrders(ctx context.Context, orders <-chan Order) <-chan Result {
    workers := FanOut(ctx, orders, 5)
    return FanIn(ctx, workers...)
}

主要な設計ポイント

  • ジェネリクス対応(Go 1.18+)、任意の型に適用可能
  • 各fan-outゴルーチンがソースチャネルから独立して消費
  • fan-inはWaitGroupで全入力チャネルのクローズを待機
  • コンテキストキャンセル時に全ゴルーチンが終了可能

パターン3:Pipeline——ステージベース処理

Pipelineは複雑な処理をステージに分解し、各ステージをゴルーチンとしてチャネルで接続する。

package pipeline

import (
    "context"
)

type Stage[In any, Out any] func(ctx context.Context, in <-chan In) <-chan Out

func NewPipeline[In any, Out any](
    ctx context.Context,
    source <-chan In,
    stages ...Stage[In, In],
) <-chan Out {
    current := source
    for _, stage := range stages {
        current = stage(ctx, current)
    }
    return any(current).(<-chan Out)
}

func NewStage[In any, Out any](
    process func(ctx context.Context, in In) (Out, error),
    bufferSize int,
) Stage[In, Out] {
    return func(ctx context.Context, in <-chan In) <-chan Out {
        out := make(chan Out, bufferSize)
        go func() {
            defer close(out)
            for {
                select {
                case <-ctx.Done():
                    return
                case val, ok := <-in:
                    if !ok {
                        return
                    }
                    result, err := process(ctx, val)
                    if err != nil {
                        continue
                    }
                    select {
                    case out <- result:
                    case <-ctx.Done():
                        return
                    }
                }
            }
        }()
        return out
    }
}

使用例——データ処理パイプライン:

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    raw := make(chan RawData, 100)

    validate := NewStage[RawData, ValidData](func(ctx context.Context, in RawData) (ValidData, error) {
        if err := in.Validate(); err != nil {
            return ValidData{}, err
        }
        return in.ToValid(), nil
    }, 50)

    enrich := NewStage[ValidData, EnrichedData](func(ctx context.Context, in ValidData) (EnrichedData, error) {
        return fetchExtraInfo(ctx, in)
    }, 50)

    transform := NewStage[EnrichedData, FinalData](func(ctx context.Context, in EnrichedData) (FinalData, error) {
        return in.Transform()
    }, 50)

    result := NewPipeline(ctx, raw, validate, enrich, transform)

    go func() {
        for r := range result {
            saveToDB(r)
        }
    }()
}

主要な設計ポイント

  • 各ステージは独立したゴルーチン、独立してスケール可能
  • チャネルがステージ間のバックプレッシャーとして機能
  • エラーはステージ内で処理、パイプライン全体を中断しない
  • コンテキストキャンセル時に全ステージがグレースフル終了

パターン4:errgroup——並行エラー処理

errgroupsync.WaitGroupのエラー対応版:最初のエラーが全ゴルーチンをキャンセルする。

package main

import (
    "context"
    "fmt"
    "net/http"
    "time"

    "golang.org/x/sync/errgroup"
)

type FetchResult struct {
    URL  string
    Body string
    Size int
}

func fetchMultiple(ctx context.Context, urls []string) ([]FetchResult, error) {
    g, ctx := errgroup.WithContext(ctx)
    results := make([]FetchResult, len(urls))

    for i, url := range urls {
        i, url := i, url
        g.Go(func() error {
            req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
            if err != nil {
                return fmt.Errorf("create request %s: %w", url, err)
            }

            resp, err := http.DefaultClient.Do(req)
            if err != nil {
                return fmt.Errorf("fetch %s: %w", url, err)
            }
            defer resp.Body.Close()

            if resp.StatusCode != http.StatusOK {
                return fmt.Errorf("fetch %s: status %d", url, resp.StatusCode)
            }

            body, err := io.ReadAll(resp.Body)
            if err != nil {
                return fmt.Errorf("read %s: %w", url, err)
            }

            results[i] = FetchResult{
                URL:  url,
                Body: string(body),
                Size: len(body),
            }
            return nil
        })
    }

    if err := g.Wait(); err != nil {
        return nil, err
    }
    return results, nil
}

並行度制限付きerrgroup:

func fetchWithLimit(ctx context.Context, urls []string, maxConcurrent int) ([]FetchResult, error) {
    g, ctx := errgroup.WithContext(ctx)
    g.SetLimit(maxConcurrent)

    results := make([]FetchResult, len(urls))
    for i, url := range urls {
        i, url := i, url
        g.Go(func() error {
            results[i], _ = fetchOne(ctx, url)
            return nil
        })
    }

    if err := g.Wait(); err != nil {
        return nil, err
    }
    return results, nil
}

主要な設計ポイント

  • errgroup.WithContextがキャンセルシグナルを自動伝播
  • g.SetLimit(n)で最大並行数を制御(Go 1.20+)
  • 最初のエラーが実行中の全ゴルーチンをキャンセル
  • クロージャの変数キャプチャにはi, url := i, urlが必要

パターン5:Semaphore——セマフォによるレート制限

semaphore.Weightedは重み付きセマフォを提供し、異なる重みのリソース割り当てに適している。

package ratelimit

import (
    "context"
    "fmt"

    "golang.org/x/sync/semaphore"
)

type RateLimiter struct {
    sem *semaphore.Weighted
}

func NewRateLimiter(maxWeight int64) *RateLimiter {
    return &RateLimiter{
        sem: semaphore.NewWeighted(maxWeight),
    }
}

func (r *RateLimiter) Do(ctx context.Context, weight int64, fn func() error) error {
    if err := r.sem.Acquire(ctx, weight); err != nil {
        return fmt.Errorf("acquire semaphore: %w", err)
    }
    defer r.sem.Release(weight)
    return fn()
}

使用例——APIティアードレート制限:

func main() {
    limiter := NewRateLimiter(100)

    err := limiter.Do(context.Background(), 10, func() error {
        return callLightAPI()
    })

    err = limiter.Do(context.Background(), 50, func() error {
        return callHeavyAPI()
    })
}

タイムアウト付きセマフォ取得:

func (r *RateLimiter) DoWithTimeout(ctx context.Context, weight int64, timeout time.Duration, fn func() error) error {
    acquireCtx, cancel := context.WithTimeout(ctx, timeout)
    defer cancel()

    if err := r.sem.Acquire(acquireCtx, weight); err != nil {
        return fmt.Errorf("semaphore acquire timeout: %w", err)
    }
    defer r.sem.Release(weight)
    return fn()
}

主要な設計ポイント

  • 重み付きセマフォ、異なる操作が異なるクォータを消費
  • Acquireはコンテキストキャンセルとタイムアウトに対応
  • ReleaseAcquireとペアで呼び出す必要がある
  • APIティアードレート制限、リソースクォータ制御に適している

パターン6:Contextキャンセルとタイムアウト

ContextはGo並行プログラミングの「命綱」であり、キャンセルシグナル、タイムアウト、デッドラインの伝播に使用する。

package ctxutil

import (
    "context"
    "fmt"
    "time"
)

type Result struct {
    Data  string
    Error error
}

func FetchWithTimeout(ctx context.Context, url string, timeout time.Duration) (string, error) {
    ctx, cancel := context.WithTimeout(ctx, timeout)
    defer cancel()

    req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
    if err != nil {
        return "", fmt.Errorf("create request: %w", err)
    }

    resp, err := http.DefaultClient.Do(req)
    if err != nil {
        if ctx.Err() == context.DeadlineExceeded {
            return "", fmt.Errorf("fetch %s timed out after %v: %w", url, timeout, ctx.Err())
        }
        return "", fmt.Errorf("fetch %s: %w", url, err)
    }
    defer resp.Body.Close()

    body, err := io.ReadAll(resp.Body)
    if err != nil {
        return "", fmt.Errorf("read response: %w", err)
    }
    return string(body), nil
}

func BatchFetch(ctx context.Context, urls []string, timeout time.Duration) []Result {
    results := make([]Result, len(urls))
    var wg sync.WaitGroup

    for i, url := range urls {
        wg.Add(1)
        go func(idx int, u string) {
            defer wg.Done()
            data, err := FetchWithTimeout(ctx, u, timeout)
            results[idx] = Result{Data: data, Error: err}
        }(i, url)
    }

    wg.Wait()
    return results
}

グレースフルシャットダウンパターン:

func main() {
    ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
    defer stop()

    server := &http.Server{Addr: ":8080"}

    go func() {
        <-ctx.Done()
        shutdownCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
        defer cancel()
        _ = server.Shutdown(shutdownCtx)
    }()

    _ = server.ListenAndServe()
}

主要な設計ポイント

  • WithTimeout/WithDeadlineでタイムアウトを設定
  • WithCancelで手動キャンセル制御
  • signal.NotifyContextでシステムシグナルをリッスン
  • コンテキストキャンセルは全子ゴルーチンに伝播
  • defer cancel()でコンテキストリークを防止

パターン7:プロダクション並行サービス——パターン組み合わせ

上記パターンを組み合わせて、プロダクション級並行サービスを構築する:Worker Pool + Pipeline + errgroup + Context。

package concurrencyservice

import (
    "context"
    "fmt"
    "sync"
    "time"

    "golang.org/x/sync/errgroup"
)

type Service struct {
    workers    int
    bufferSize int
    timeout    time.Duration
}

func NewService(workers, bufferSize int, timeout time.Duration) *Service {
    return &Service{
        workers:    workers,
        bufferSize: bufferSize,
        timeout:    timeout,
    }
}

type Job struct {
    ID    string
    Input any
}

type Output struct {
    Job    Job
    Result any
    Err    error
}

func (s *Service) Process(ctx context.Context, jobs []Job) ([]Output, error) {
    ctx, cancel := context.WithTimeout(ctx, s.timeout)
    defer cancel()

    g, ctx := errgroup.WithContext(ctx)
    g.SetLimit(s.workers)

    jobCh := make(chan Job, s.bufferSize)
    resultCh := make(chan Output, s.bufferSize)

    g.Go(func() error {
        defer close(jobCh)
        for _, job := range jobs {
            select {
            case <-ctx.Done():
                return ctx.Err()
            case jobCh <- job:
            }
        }
        return nil
    })

    var processWg sync.WaitGroup
    for i := 0; i < s.workers; i++ {
        processWg.Add(1)
        go func() {
            defer processWg.Done()
            for job := range jobCh {
                output := Output{Job: job}
                output.Result, output.Err = s.processOne(ctx, job)
                select {
                case resultCh <- output:
                case <-ctx.Done():
                    return
                }
            }
        }()
    }

    go func() {
        processWg.Wait()
        close(resultCh)
    }()

    var outputs []Output
    for result := range resultCh {
        outputs = append(outputs, result)
    }

    if err := g.Wait(); err != nil {
        return outputs, fmt.Errorf("service process: %w", err)
    }
    return outputs, nil
}

func (s *Service) processOne(ctx context.Context, job Job) (any, error) {
    select {
    case <-ctx.Done():
        return nil, ctx.Err()
    default:
    }
    return fmt.Sprintf("processed-%s", job.ID), nil
}

主要な設計ポイント

  • errgroupが並行度制御+エラー伝播を担当
  • チャネルがタスク配信と結果収集に使用
  • コンテキストが統一タイムアウトとキャンセルを管理
  • WaitGroupが全ワーカー完了後に結果チャネルをクローズ
  • 階層設計:スケジューリング層(errgroup)+実行層(ワーカーゴルーチン)+収集層(range resultCh)

5つのよくある落とし穴と修正

落とし穴1:クロージャの変数キャプチャ

❌ 誤った書き方:

for _, url := range urls {
    go func() {
        fetch(url)
    }()
}

全ゴルーチンが同じurl変数を共有し、最終的に最後のURLをフェッチする。

✅ 正しい書き方:

for _, url := range urls {
    url := url
    go func() {
        fetch(url)
    }()
}

落とし穴2:チャネル未クローズによるゴルーチンリーク

❌ 誤った書き方:

func producer() <-chan int {
    ch := make(chan int)
    go func() {
        for i := 0; i < 100; i++ {
            ch <- i
        }
    }()
    return ch
}

コンシューマのrangeが永遠に終わらず、ゴルーチンがリークする。

✅ 正しい書き方:

func producer() <-chan int {
    ch := make(chan int)
    go func() {
        defer close(ch)
        for i := 0; i < 100; i++ {
            ch <- i
        }
    }()
    return ch
}

落とし穴3:WaitGroupのAdd位置の誤り

❌ 誤った書き方:

var wg sync.WaitGroup
for i := 0; i < 10; i++ {
    go func() {
        wg.Add(1)
        defer wg.Done()
        doWork()
    }()
}
wg.Wait()

ゴルーチンがwg.Add(1)に到達する前に、メインゴルーチンのwg.Wait()が通過してしまう。

✅ 正しい書き方:

var wg sync.WaitGroup
for i := 0; i < 10; i++ {
    wg.Add(1)
    go func() {
        defer wg.Done()
        doWork()
    }()
}
wg.Wait()

落とし穴4:select内のtime.Afterリーク

❌ 誤った書き方:

select {
case result := <-ch:
    process(result)
case <-time.After(5 * time.Second):
    return errors.New("timeout")
}

selectのたびに新しいtime.Afterチャネルが作成され、頻繁な呼び出しでタイマーがリークする。

✅ 正しい書き方:

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

select {
case result := <-ch:
    process(result)
case <-ctx.Done():
    return ctx.Err()
}

落とし穴5:Mutexのコピー

❌ 誤った書き方:

type SafeMap struct {
    mu   sync.Mutex
    data map[string]string
}

func copyMap(m SafeMap) SafeMap {
    return m
}

Mutexのゼロ値は「アンロック」状態。ロック済みMutexをコピーするとデッドロックやデータ競合が発生する。

✅ 正しい書き方:

type SafeMap struct {
    mu   *sync.Mutex
    data map[string]string
}

func NewSafeMap() *SafeMap {
    return &SafeMap{
        mu:   &sync.Mutex{},
        data: make(map[string]string),
    }
}

エラートラブルシューティングリファレンス

エラー現象 考えられる原因 調査方法 解決策
fatal error: all goroutines are asleep - deadlock! 全ゴルーチンがブロック、アクティブなゴルーチンなし チャネルの読み書きのペア確認、selectのdefault確認 チャネルにプロデューサーとコンシューマーを確保、コンテキストキャンセルを追加
ゴルーチン数が増加し続ける ゴルーチンリーク、チャネル未クローズ runtime.NumGoroutine()監視、pprofゴルーチンプロファイル 全ゴルーチンに終了パスを確保、defer close(ch)
メモリが増加し続ける ゴルーチンが大きなオブジェクト参照を保持、チャネル蓄積 pprofヒーププロファイル、チャネルバッファサイズ確認 チャネルバッファを制限、参照を速やかに解放
context canceledエラーが頻発 上流コンテキストがキャンセル、タイムアウト設定が短すぎる コンテキストチェーンを確認、タイムアウト時間が妥当か検証 タイムアウト時間を調整、ビジネスエラーとタイムアウトエラーを区別
並行処理結果の欠落 ゴルーチンがチャネル書き込み前に終了 ゴルーチン終了ロジックを確認、チャネルクローズ確認 WaitGroupで全ゴルーチン完了後にチャネルをクローズ
race conditionの検出 複数ゴルーチンが共有変数を同時読み書き go test -race、グローバル変数とクロージャキャプチャを確認 Mutex/RWMutexで保護、またはチャネル通信に切り替え
チャネルブロックで応答なし プロデューサー/コンシューマーの速度不整合 チャネルバッファサイズ確認、生産/消費レート監視 バッファを増加、select+defaultでノンブロッキング送信
グレースフルシャットダウン失敗 ゴルーチンがコンテキストキャンセルに応答しない ゴルーチンがctx.Done()をselectしているか確認 長時間実行される全ゴルーチンがctx.Done()をチェックするよう確認
errgroupが1つのエラーしか返さない errgroupは最初のエラーでキャンセルする設計 全エラーの収集が必要か確認 カスタムエラーコレクターまたは複数errgroupを使用
セマフォ取得タイムアウト 並行度制限が厳しすぎる、リクエストのキューイングが長すぎる セマフォ待機時間を監視、重みクォータを調整 セマフォ容量を増加、重み配分を最適化

高度な最適化テクニック

最適化1:動的Worker Pool

システム負荷に基づいてワーカー数を動的に調整する:

package dynamicpool

import (
    "context"
    "runtime"
    "sync"
    "sync/atomic"
    "time"
)

type DynamicPool struct {
    minWorkers int64
    maxWorkers int64
    active     atomic.Int64
    tasks      chan Task
    wg         sync.WaitGroup
    adjustTick *time.Ticker
}

func NewDynamicPool(minW, maxW int) *DynamicPool {
    return &DynamicPool{
        minWorkers: int64(minW),
        maxWorkers: int64(maxW),
        tasks:      make(chan Task, maxW*2),
        adjustTick: time.NewTicker(5 * time.Second),
    }
}

func (p *DynamicPool) Start(ctx context.Context) {
    for i := 0; i < int(p.minWorkers); i++ {
        p.addWorker(ctx)
    }

    go func() {
        for {
            select {
            case <-ctx.Done():
                p.adjustTick.Stop()
                return
            case <-p.adjustTick.C:
                p.adjustWorkers(ctx)
            }
        }
    }()
}

func (p *DynamicPool) addWorker(ctx context.Context) {
    p.active.Add(1)
    p.wg.Add(1)
    go func() {
        defer func() {
            p.active.Add(-1)
            p.wg.Done()
        }()
        for task := range p.tasks {
            _ = task(ctx)
        }
    }()
}

func (p *DynamicPool) adjustWorkers(ctx context.Context) {
    current := p.active.Load()
    queueLen := len(p.tasks)

    if queueLen > int(current) && current < p.maxWorkers {
        p.addWorker(ctx)
    } else if queueLen == 0 && current > p.minWorkers {
        for i := 0; i < 5 && int(p.active.Load()) > int(p.minWorkers); i++ {
            p.tasks <- nil
        }
    }
}

func (p *DynamicPool) Stop() {
    close(p.tasks)
    p.wg.Wait()
}

最適化2:バッチ処理(Batching)

複数の小さなタスクを1つのバッチ操作にまとめ、システムコールとIOオーバーヘッドを削減する:

package batcher

import (
    "context"
    "sync"
    "time"
)

type Batcher[T any, R any] struct {
    batchSize     int
    flushInterval time.Duration
    handler       func(ctx context.Context, batch []T) ([]R, error)
    mu            sync.Mutex
    buffer        []T
    results       []R
}

func NewBatcher[T any, R any](
    batchSize int,
    flushInterval time.Duration,
    handler func(ctx context.Context, batch []T) ([]R, error),
) *Batcher[T, R] {
    return &Batcher[T, R]{
        batchSize:     batchSize,
        flushInterval: flushInterval,
        handler:       handler,
        buffer:        make([]T, 0, batchSize),
    }
}

func (b *Batcher[T, R]) Add(ctx context.Context, item T) (R, error) {
    b.mu.Lock()
    b.buffer = append(b.buffer, item)

    if len(b.buffer) >= b.batchSize {
        batch := b.buffer
        b.buffer = make([]T, 0, b.batchSize)
        b.mu.Unlock()

        results, err := b.handler(ctx, batch)
        if err != nil {
            var zero R
            return zero, err
        }
        b.results = append(b.results, results...)
        var zero R
        if len(results) > 0 {
            return results[0], nil
        }
        return zero, nil
    }
    b.mu.Unlock()

    var zero R
    return zero, nil
}

func (b *Batcher[T, R]) Flush(ctx context.Context) ([]R, error) {
    b.mu.Lock()
    batch := b.buffer
    b.buffer = make([]T, 0, b.batchSize)
    b.mu.Unlock()

    if len(batch) == 0 {
        return b.results, nil
    }

    results, err := b.handler(ctx, batch)
    if err != nil {
        return nil, err
    }
    b.results = append(b.results, results...)
    return b.results, nil
}

最適化3:ゼロコピーチャネル渡し

ポインタとsync.Poolを使用してチャネル渡し時のメモリ割り当てを削減する:

package zerocopy

import (
    "sync"
)

type Buffer struct {
    Data []byte
}

var bufferPool = sync.Pool{
    New: func() any {
        return &Buffer{Data: make([]byte, 0, 4096)}
    },
}

func GetBuffer() *Buffer {
    return bufferPool.Get().(*Buffer)
}

func PutBuffer(buf *Buffer) {
    buf.Data = buf.Data[:0]
    bufferPool.Put(buf)
}

func ProcessPipeline(ctx context.Context, input <-chan []byte) <-chan *Buffer {
    out := make(chan *Buffer, 64)
    go func() {
        defer close(out)
        for {
            select {
            case <-ctx.Done():
                return
            case data, ok := <-input:
                if !ok {
                    return
                }
                buf := GetBuffer()
                buf.Data = append(buf.Data[:0], data...)

                select {
                case out <- buf:
                case <-ctx.Done():
                    PutBuffer(buf)
                    return
                }
            }
        }
    }()
    return out
}

並行パターン比較

特徴 goroutine+channel sync.WaitGroup errgroup Worker Pool
並行制御 組み込み制御なし 組み込み制御なし SetLimit 固定ワーカー数
エラー処理 手動実装が必要 手動実装が必要 自動初回エラーキャンセル 手動実装が必要
バックプレッシャー チャネルバッファ なし なし タスクチャネル
キャンセル伝播 手動コンテキスト 手動コンテキスト 自動コンテキスト 手動コンテキスト
結果収集 チャネル受信 共有変数が必要 スライス返却 チャネル受信
ユースケース ストリームデータ処理 バッチタスク待機 並行API呼び出し レート制限タスク処理
複雑さ
ゴルーチン数 不固定 不固定 制限あり 固定
プロダクション推奨度 ⭐⭐⭐ ⭐⭐ ⭐⭐⭐⭐⭐ ⭐⭐⭐⭐⭐

まとめ

Goの並行プログラミングは「動けばいい」ではなく、4つの問いに答えることだ:何個のゴルーチンが動いているか?いつ終了するか?エラー時はどうするか?リソースをどう回収するか? Worker Poolは「何個」を、Contextは「いつ終了」を、errgroupは「エラー時」を、defer close()は「リソース回収」を答える。この7つのパターンをマスターすれば、プロダクション級Go並行プログラミングの核心的な方法論を手に入れることができる。


おすすめツール

ブラウザローカルツールを無料で試す →

#Go#并发#goroutine#channel#sync#2026#并发模式