Go並行処理パターン実践:Worker PoolからPipelineまで7つのプロダクションパターン
ゴルーチンリークが無制限並行処理と出会う時:プロダクションの悪夢
午前3時、本番環境のOOMアラート。調査の結果:HTTPハンドラがゴルーチンを無制限に起動——リクエストごとに3つのゴルーチン、QPS 1000なら3000個、10分で18万個のゴルーチンがリークし、メモリが満杯に。さらに恐ろしいのは、これらのゴルーチンが保持するチャネル参照がGC回収を妨げ、最終的にノード全体がクラッシュすることだ。
これは決して稀なケースではない。Goの並行処理プリミティブはシンプルだ——go func()1行で並行処理を起動できるが、プロダクションの並行プログラミングはゴルーチンを起動するだけでは終わらない。並行度の制御、エラー伝播の処理、グレースフルシャットダウンの実装、リソースリークの防止が必要だ。本記事では7つのプロダクション級並行パターンを解説し、堅牢なGo並行サービスの構築を支援する。
コア概念リファレンス
| プリミティブ | 用途 | 主な特徴 | 典型的なユースケース |
|---|---|---|---|
goroutine |
軽量並行実行ユニット | スタックメモリ拡張可能(2KB開始)、Goランタイムスケジューリング | 並行実行が必要なあらゆるタスク |
channel |
ゴルーチン間通信 | 型安全、バッファ付き/なし、クローズ可能 | データ渡し、シグナル通知、結果収集 |
sync.WaitGroup |
ゴルーチングループの待機 | Add/Done/Waitの3点セット |
バッチタスク待機、並行ファンアウト |
sync.Mutex |
共有状態の相互排他ロック | ゼロ値使用可能、TryLock対応(Go 1.18+) |
カウンター、キャッシュ更新、設定ホットリロード |
context.Context |
キャンセルシグナルとタイムアウトの伝播 | 不変、子コンテキストの派生のみ可能 | リクエストタイムアウト、グレースフルシャットダウン、トレーシング |
errgroup.Group |
並行実行+エラー収集 | 最初のエラーで全ゴルーチンをキャンセル | バッチAPI呼び出し、並行データフェッチ |
semaphore.Weighted |
重み付きセマフォによるレート制限 | 重み対応、タイムアウト付き取得 | APIレート制限、リソースクォータ制御 |
プロダクション並行プログラミングの5つの課題
課題1:無制限並行処理によるリソース枯渇
func handleRequests(urls []string) {
for _, url := range urls {
go fetch(url)
}
}
各URLがゴルーチンを起動——1万URLなら1万の同時接続。DB接続プールが枯渇し、下流サービスが圧迫され、メモリが急増する。
課題2:ゴルーチンリーク
func process(ch <-chan int) {
for {
val := <-ch
fmt.Println(val)
}
}
チャネルが閉じられない限り、このゴルーチンは永遠に終了しない。長時間稼働するサービスでは、リークしたゴルーチンが蓄積し続ける。
課題3:エラーが黙って飲み込まれる
func fetchAll(urls []string) {
var wg sync.WaitGroup
for _, url := range urls {
wg.Add(1)
go func(u string) {
defer wg.Done()
resp, err := http.Get(u)
if err != nil {
log.Printf("fetch %s failed: %v", u, err)
return
}
process(resp)
}(url)
}
wg.Wait()
}
エラーはログに出力されるだけで、呼び出し元は失敗に気づかない。3つのURLのうち2つが失敗しても、呼び出し元は全て成功したと思い込む。
課題4:グレースフルシャットダウン不可
サービスがSIGTERMを受信した時、実行中のゴルーチンが強制中断される。書き込み中のデータが破損し、処理中のトランザクションが中途半端になる可能性がある。
課題5:並行パターンの組み合わせが困難
Worker Poolにはレート制限、Pipelineにはエラー伝播、Fan-outには結果集約が必要。各パターンを単独で実装するのは難しくないが、1つのサービスで組み合わせると、パターン間の相互作用でデッドロックが発生しやすい。
7つのプロダクション級並行パターン
パターン1:Worker Pool——制約付きゴルーチンプール
Worker Poolは最も基本的で重要な並行パターンだ。核心的な考え方:固定数のワーカーがタスクキューからタスクを取得して実行し、無制限並行を回避する。
package workerpool
import (
"context"
"sync"
)
type Task func(ctx context.Context) error
type Pool struct {
workers int
tasks chan Task
wg sync.WaitGroup
}
func NewPool(workers int, bufferSize int) *Pool {
return &Pool{
workers: workers,
tasks: make(chan Task, bufferSize),
}
}
func (p *Pool) Start(ctx context.Context) {
for i := 0; i < p.workers; i++ {
p.wg.Add(1)
go func(workerID int) {
defer p.wg.Done()
for {
select {
case <-ctx.Done():
return
case task, ok := <-p.tasks:
if !ok {
return
}
_ = task(ctx)
}
}
}(i)
}
}
func (p *Pool) Submit(task Task) bool {
select {
case p.tasks <- task:
return true
default:
return false
}
}
func (p *Pool) Stop() {
close(p.tasks)
p.wg.Wait()
}
使用例:
func main() {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
pool := NewPool(10, 100)
pool.Start(ctx)
urls := []string{
"https://api.example.com/users",
"https://api.example.com/orders",
"https://api.example.com/products",
}
for _, url := range urls {
u := url
pool.Submit(func(ctx context.Context) error {
return fetchURL(ctx, u)
})
}
pool.Stop()
}
主要な設計ポイント:
- ワーカー数固定でゴルーチン爆発を防止
- バッファ付きタスクチャネルがタスクキューとして機能
- コンテキストによるキャンセル対応、ワーカーのグレースフル終了
Submitはノンブロッキング、キュー満杯時にfalseを返す
パターン2:Fan-out/Fan-in——並行スキャッタ・ギャザー
Fan-outはデータソースを複数のゴルーチンに分配して並行処理し、Fan-inは複数のゴルーチンの結果を1つのチャネルに集約する。
package fan
import (
"context"
"sync"
)
func FanOut[T any](ctx context.Context, source <-chan T, workers int) []<-chan T {
channels := make([]<-chan T, workers)
for i := 0; i < workers; i++ {
ch := make(chan T)
channels[i] = ch
go func() {
defer close(ch)
for {
select {
case <-ctx.Done():
return
case val, ok := <-source:
if !ok {
return
}
select {
case ch <- val:
case <-ctx.Done():
return
}
}
}
}()
}
return channels
}
func FanIn[T any](ctx context.Context, channels ...<-chan T) <-chan T {
out := make(chan T)
var wg sync.WaitGroup
wg.Add(len(channels))
for _, ch := range channels {
go func(c <-chan T) {
defer wg.Done()
for {
select {
case <-ctx.Done():
return
case val, ok := <-c:
if !ok {
return
}
select {
case out <- val:
case <-ctx.Done():
return
}
}
}
}(ch)
}
go func() {
wg.Wait()
close(out)
}()
return out
}
使用例——並行オーダー処理:
func processOrders(ctx context.Context, orders <-chan Order) <-chan Result {
workers := FanOut(ctx, orders, 5)
return FanIn(ctx, workers...)
}
主要な設計ポイント:
- ジェネリクス対応(Go 1.18+)、任意の型に適用可能
- 各fan-outゴルーチンがソースチャネルから独立して消費
- fan-inはWaitGroupで全入力チャネルのクローズを待機
- コンテキストキャンセル時に全ゴルーチンが終了可能
パターン3:Pipeline——ステージベース処理
Pipelineは複雑な処理をステージに分解し、各ステージをゴルーチンとしてチャネルで接続する。
package pipeline
import (
"context"
)
type Stage[In any, Out any] func(ctx context.Context, in <-chan In) <-chan Out
func NewPipeline[In any, Out any](
ctx context.Context,
source <-chan In,
stages ...Stage[In, In],
) <-chan Out {
current := source
for _, stage := range stages {
current = stage(ctx, current)
}
return any(current).(<-chan Out)
}
func NewStage[In any, Out any](
process func(ctx context.Context, in In) (Out, error),
bufferSize int,
) Stage[In, Out] {
return func(ctx context.Context, in <-chan In) <-chan Out {
out := make(chan Out, bufferSize)
go func() {
defer close(out)
for {
select {
case <-ctx.Done():
return
case val, ok := <-in:
if !ok {
return
}
result, err := process(ctx, val)
if err != nil {
continue
}
select {
case out <- result:
case <-ctx.Done():
return
}
}
}
}()
return out
}
}
使用例——データ処理パイプライン:
func main() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
raw := make(chan RawData, 100)
validate := NewStage[RawData, ValidData](func(ctx context.Context, in RawData) (ValidData, error) {
if err := in.Validate(); err != nil {
return ValidData{}, err
}
return in.ToValid(), nil
}, 50)
enrich := NewStage[ValidData, EnrichedData](func(ctx context.Context, in ValidData) (EnrichedData, error) {
return fetchExtraInfo(ctx, in)
}, 50)
transform := NewStage[EnrichedData, FinalData](func(ctx context.Context, in EnrichedData) (FinalData, error) {
return in.Transform()
}, 50)
result := NewPipeline(ctx, raw, validate, enrich, transform)
go func() {
for r := range result {
saveToDB(r)
}
}()
}
主要な設計ポイント:
- 各ステージは独立したゴルーチン、独立してスケール可能
- チャネルがステージ間のバックプレッシャーとして機能
- エラーはステージ内で処理、パイプライン全体を中断しない
- コンテキストキャンセル時に全ステージがグレースフル終了
パターン4:errgroup——並行エラー処理
errgroupはsync.WaitGroupのエラー対応版:最初のエラーが全ゴルーチンをキャンセルする。
package main
import (
"context"
"fmt"
"net/http"
"time"
"golang.org/x/sync/errgroup"
)
type FetchResult struct {
URL string
Body string
Size int
}
func fetchMultiple(ctx context.Context, urls []string) ([]FetchResult, error) {
g, ctx := errgroup.WithContext(ctx)
results := make([]FetchResult, len(urls))
for i, url := range urls {
i, url := i, url
g.Go(func() error {
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
if err != nil {
return fmt.Errorf("create request %s: %w", url, err)
}
resp, err := http.DefaultClient.Do(req)
if err != nil {
return fmt.Errorf("fetch %s: %w", url, err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("fetch %s: status %d", url, resp.StatusCode)
}
body, err := io.ReadAll(resp.Body)
if err != nil {
return fmt.Errorf("read %s: %w", url, err)
}
results[i] = FetchResult{
URL: url,
Body: string(body),
Size: len(body),
}
return nil
})
}
if err := g.Wait(); err != nil {
return nil, err
}
return results, nil
}
並行度制限付きerrgroup:
func fetchWithLimit(ctx context.Context, urls []string, maxConcurrent int) ([]FetchResult, error) {
g, ctx := errgroup.WithContext(ctx)
g.SetLimit(maxConcurrent)
results := make([]FetchResult, len(urls))
for i, url := range urls {
i, url := i, url
g.Go(func() error {
results[i], _ = fetchOne(ctx, url)
return nil
})
}
if err := g.Wait(); err != nil {
return nil, err
}
return results, nil
}
主要な設計ポイント:
errgroup.WithContextがキャンセルシグナルを自動伝播g.SetLimit(n)で最大並行数を制御(Go 1.20+)- 最初のエラーが実行中の全ゴルーチンをキャンセル
- クロージャの変数キャプチャには
i, url := i, urlが必要
パターン5:Semaphore——セマフォによるレート制限
semaphore.Weightedは重み付きセマフォを提供し、異なる重みのリソース割り当てに適している。
package ratelimit
import (
"context"
"fmt"
"golang.org/x/sync/semaphore"
)
type RateLimiter struct {
sem *semaphore.Weighted
}
func NewRateLimiter(maxWeight int64) *RateLimiter {
return &RateLimiter{
sem: semaphore.NewWeighted(maxWeight),
}
}
func (r *RateLimiter) Do(ctx context.Context, weight int64, fn func() error) error {
if err := r.sem.Acquire(ctx, weight); err != nil {
return fmt.Errorf("acquire semaphore: %w", err)
}
defer r.sem.Release(weight)
return fn()
}
使用例——APIティアードレート制限:
func main() {
limiter := NewRateLimiter(100)
err := limiter.Do(context.Background(), 10, func() error {
return callLightAPI()
})
err = limiter.Do(context.Background(), 50, func() error {
return callHeavyAPI()
})
}
タイムアウト付きセマフォ取得:
func (r *RateLimiter) DoWithTimeout(ctx context.Context, weight int64, timeout time.Duration, fn func() error) error {
acquireCtx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
if err := r.sem.Acquire(acquireCtx, weight); err != nil {
return fmt.Errorf("semaphore acquire timeout: %w", err)
}
defer r.sem.Release(weight)
return fn()
}
主要な設計ポイント:
- 重み付きセマフォ、異なる操作が異なるクォータを消費
Acquireはコンテキストキャンセルとタイムアウトに対応ReleaseはAcquireとペアで呼び出す必要がある- APIティアードレート制限、リソースクォータ制御に適している
パターン6:Contextキャンセルとタイムアウト
ContextはGo並行プログラミングの「命綱」であり、キャンセルシグナル、タイムアウト、デッドラインの伝播に使用する。
package ctxutil
import (
"context"
"fmt"
"time"
)
type Result struct {
Data string
Error error
}
func FetchWithTimeout(ctx context.Context, url string, timeout time.Duration) (string, error) {
ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
if err != nil {
return "", fmt.Errorf("create request: %w", err)
}
resp, err := http.DefaultClient.Do(req)
if err != nil {
if ctx.Err() == context.DeadlineExceeded {
return "", fmt.Errorf("fetch %s timed out after %v: %w", url, timeout, ctx.Err())
}
return "", fmt.Errorf("fetch %s: %w", url, err)
}
defer resp.Body.Close()
body, err := io.ReadAll(resp.Body)
if err != nil {
return "", fmt.Errorf("read response: %w", err)
}
return string(body), nil
}
func BatchFetch(ctx context.Context, urls []string, timeout time.Duration) []Result {
results := make([]Result, len(urls))
var wg sync.WaitGroup
for i, url := range urls {
wg.Add(1)
go func(idx int, u string) {
defer wg.Done()
data, err := FetchWithTimeout(ctx, u, timeout)
results[idx] = Result{Data: data, Error: err}
}(i, url)
}
wg.Wait()
return results
}
グレースフルシャットダウンパターン:
func main() {
ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
defer stop()
server := &http.Server{Addr: ":8080"}
go func() {
<-ctx.Done()
shutdownCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
_ = server.Shutdown(shutdownCtx)
}()
_ = server.ListenAndServe()
}
主要な設計ポイント:
WithTimeout/WithDeadlineでタイムアウトを設定WithCancelで手動キャンセル制御signal.NotifyContextでシステムシグナルをリッスン- コンテキストキャンセルは全子ゴルーチンに伝播
defer cancel()でコンテキストリークを防止
パターン7:プロダクション並行サービス——パターン組み合わせ
上記パターンを組み合わせて、プロダクション級並行サービスを構築する:Worker Pool + Pipeline + errgroup + Context。
package concurrencyservice
import (
"context"
"fmt"
"sync"
"time"
"golang.org/x/sync/errgroup"
)
type Service struct {
workers int
bufferSize int
timeout time.Duration
}
func NewService(workers, bufferSize int, timeout time.Duration) *Service {
return &Service{
workers: workers,
bufferSize: bufferSize,
timeout: timeout,
}
}
type Job struct {
ID string
Input any
}
type Output struct {
Job Job
Result any
Err error
}
func (s *Service) Process(ctx context.Context, jobs []Job) ([]Output, error) {
ctx, cancel := context.WithTimeout(ctx, s.timeout)
defer cancel()
g, ctx := errgroup.WithContext(ctx)
g.SetLimit(s.workers)
jobCh := make(chan Job, s.bufferSize)
resultCh := make(chan Output, s.bufferSize)
g.Go(func() error {
defer close(jobCh)
for _, job := range jobs {
select {
case <-ctx.Done():
return ctx.Err()
case jobCh <- job:
}
}
return nil
})
var processWg sync.WaitGroup
for i := 0; i < s.workers; i++ {
processWg.Add(1)
go func() {
defer processWg.Done()
for job := range jobCh {
output := Output{Job: job}
output.Result, output.Err = s.processOne(ctx, job)
select {
case resultCh <- output:
case <-ctx.Done():
return
}
}
}()
}
go func() {
processWg.Wait()
close(resultCh)
}()
var outputs []Output
for result := range resultCh {
outputs = append(outputs, result)
}
if err := g.Wait(); err != nil {
return outputs, fmt.Errorf("service process: %w", err)
}
return outputs, nil
}
func (s *Service) processOne(ctx context.Context, job Job) (any, error) {
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
}
return fmt.Sprintf("processed-%s", job.ID), nil
}
主要な設計ポイント:
- errgroupが並行度制御+エラー伝播を担当
- チャネルがタスク配信と結果収集に使用
- コンテキストが統一タイムアウトとキャンセルを管理
- WaitGroupが全ワーカー完了後に結果チャネルをクローズ
- 階層設計:スケジューリング層(errgroup)+実行層(ワーカーゴルーチン)+収集層(range resultCh)
5つのよくある落とし穴と修正
落とし穴1:クロージャの変数キャプチャ
❌ 誤った書き方:
for _, url := range urls {
go func() {
fetch(url)
}()
}
全ゴルーチンが同じurl変数を共有し、最終的に最後のURLをフェッチする。
✅ 正しい書き方:
for _, url := range urls {
url := url
go func() {
fetch(url)
}()
}
落とし穴2:チャネル未クローズによるゴルーチンリーク
❌ 誤った書き方:
func producer() <-chan int {
ch := make(chan int)
go func() {
for i := 0; i < 100; i++ {
ch <- i
}
}()
return ch
}
コンシューマのrangeが永遠に終わらず、ゴルーチンがリークする。
✅ 正しい書き方:
func producer() <-chan int {
ch := make(chan int)
go func() {
defer close(ch)
for i := 0; i < 100; i++ {
ch <- i
}
}()
return ch
}
落とし穴3:WaitGroupのAdd位置の誤り
❌ 誤った書き方:
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
go func() {
wg.Add(1)
defer wg.Done()
doWork()
}()
}
wg.Wait()
ゴルーチンがwg.Add(1)に到達する前に、メインゴルーチンのwg.Wait()が通過してしまう。
✅ 正しい書き方:
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
defer wg.Done()
doWork()
}()
}
wg.Wait()
落とし穴4:select内のtime.Afterリーク
❌ 誤った書き方:
select {
case result := <-ch:
process(result)
case <-time.After(5 * time.Second):
return errors.New("timeout")
}
selectのたびに新しいtime.Afterチャネルが作成され、頻繁な呼び出しでタイマーがリークする。
✅ 正しい書き方:
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
select {
case result := <-ch:
process(result)
case <-ctx.Done():
return ctx.Err()
}
落とし穴5:Mutexのコピー
❌ 誤った書き方:
type SafeMap struct {
mu sync.Mutex
data map[string]string
}
func copyMap(m SafeMap) SafeMap {
return m
}
Mutexのゼロ値は「アンロック」状態。ロック済みMutexをコピーするとデッドロックやデータ競合が発生する。
✅ 正しい書き方:
type SafeMap struct {
mu *sync.Mutex
data map[string]string
}
func NewSafeMap() *SafeMap {
return &SafeMap{
mu: &sync.Mutex{},
data: make(map[string]string),
}
}
エラートラブルシューティングリファレンス
| エラー現象 | 考えられる原因 | 調査方法 | 解決策 |
|---|---|---|---|
fatal error: all goroutines are asleep - deadlock! |
全ゴルーチンがブロック、アクティブなゴルーチンなし | チャネルの読み書きのペア確認、selectのdefault確認 | チャネルにプロデューサーとコンシューマーを確保、コンテキストキャンセルを追加 |
| ゴルーチン数が増加し続ける | ゴルーチンリーク、チャネル未クローズ | runtime.NumGoroutine()監視、pprofゴルーチンプロファイル |
全ゴルーチンに終了パスを確保、defer close(ch) |
| メモリが増加し続ける | ゴルーチンが大きなオブジェクト参照を保持、チャネル蓄積 | pprofヒーププロファイル、チャネルバッファサイズ確認 | チャネルバッファを制限、参照を速やかに解放 |
context canceledエラーが頻発 |
上流コンテキストがキャンセル、タイムアウト設定が短すぎる | コンテキストチェーンを確認、タイムアウト時間が妥当か検証 | タイムアウト時間を調整、ビジネスエラーとタイムアウトエラーを区別 |
| 並行処理結果の欠落 | ゴルーチンがチャネル書き込み前に終了 | ゴルーチン終了ロジックを確認、チャネルクローズ確認 | WaitGroupで全ゴルーチン完了後にチャネルをクローズ |
race conditionの検出 |
複数ゴルーチンが共有変数を同時読み書き | go test -race、グローバル変数とクロージャキャプチャを確認 |
Mutex/RWMutexで保護、またはチャネル通信に切り替え |
| チャネルブロックで応答なし | プロデューサー/コンシューマーの速度不整合 | チャネルバッファサイズ確認、生産/消費レート監視 | バッファを増加、select+defaultでノンブロッキング送信 |
| グレースフルシャットダウン失敗 | ゴルーチンがコンテキストキャンセルに応答しない | ゴルーチンがctx.Done()をselectしているか確認 | 長時間実行される全ゴルーチンがctx.Done()をチェックするよう確認 |
| errgroupが1つのエラーしか返さない | errgroupは最初のエラーでキャンセルする設計 | 全エラーの収集が必要か確認 | カスタムエラーコレクターまたは複数errgroupを使用 |
| セマフォ取得タイムアウト | 並行度制限が厳しすぎる、リクエストのキューイングが長すぎる | セマフォ待機時間を監視、重みクォータを調整 | セマフォ容量を増加、重み配分を最適化 |
高度な最適化テクニック
最適化1:動的Worker Pool
システム負荷に基づいてワーカー数を動的に調整する:
package dynamicpool
import (
"context"
"runtime"
"sync"
"sync/atomic"
"time"
)
type DynamicPool struct {
minWorkers int64
maxWorkers int64
active atomic.Int64
tasks chan Task
wg sync.WaitGroup
adjustTick *time.Ticker
}
func NewDynamicPool(minW, maxW int) *DynamicPool {
return &DynamicPool{
minWorkers: int64(minW),
maxWorkers: int64(maxW),
tasks: make(chan Task, maxW*2),
adjustTick: time.NewTicker(5 * time.Second),
}
}
func (p *DynamicPool) Start(ctx context.Context) {
for i := 0; i < int(p.minWorkers); i++ {
p.addWorker(ctx)
}
go func() {
for {
select {
case <-ctx.Done():
p.adjustTick.Stop()
return
case <-p.adjustTick.C:
p.adjustWorkers(ctx)
}
}
}()
}
func (p *DynamicPool) addWorker(ctx context.Context) {
p.active.Add(1)
p.wg.Add(1)
go func() {
defer func() {
p.active.Add(-1)
p.wg.Done()
}()
for task := range p.tasks {
_ = task(ctx)
}
}()
}
func (p *DynamicPool) adjustWorkers(ctx context.Context) {
current := p.active.Load()
queueLen := len(p.tasks)
if queueLen > int(current) && current < p.maxWorkers {
p.addWorker(ctx)
} else if queueLen == 0 && current > p.minWorkers {
for i := 0; i < 5 && int(p.active.Load()) > int(p.minWorkers); i++ {
p.tasks <- nil
}
}
}
func (p *DynamicPool) Stop() {
close(p.tasks)
p.wg.Wait()
}
最適化2:バッチ処理(Batching)
複数の小さなタスクを1つのバッチ操作にまとめ、システムコールとIOオーバーヘッドを削減する:
package batcher
import (
"context"
"sync"
"time"
)
type Batcher[T any, R any] struct {
batchSize int
flushInterval time.Duration
handler func(ctx context.Context, batch []T) ([]R, error)
mu sync.Mutex
buffer []T
results []R
}
func NewBatcher[T any, R any](
batchSize int,
flushInterval time.Duration,
handler func(ctx context.Context, batch []T) ([]R, error),
) *Batcher[T, R] {
return &Batcher[T, R]{
batchSize: batchSize,
flushInterval: flushInterval,
handler: handler,
buffer: make([]T, 0, batchSize),
}
}
func (b *Batcher[T, R]) Add(ctx context.Context, item T) (R, error) {
b.mu.Lock()
b.buffer = append(b.buffer, item)
if len(b.buffer) >= b.batchSize {
batch := b.buffer
b.buffer = make([]T, 0, b.batchSize)
b.mu.Unlock()
results, err := b.handler(ctx, batch)
if err != nil {
var zero R
return zero, err
}
b.results = append(b.results, results...)
var zero R
if len(results) > 0 {
return results[0], nil
}
return zero, nil
}
b.mu.Unlock()
var zero R
return zero, nil
}
func (b *Batcher[T, R]) Flush(ctx context.Context) ([]R, error) {
b.mu.Lock()
batch := b.buffer
b.buffer = make([]T, 0, b.batchSize)
b.mu.Unlock()
if len(batch) == 0 {
return b.results, nil
}
results, err := b.handler(ctx, batch)
if err != nil {
return nil, err
}
b.results = append(b.results, results...)
return b.results, nil
}
最適化3:ゼロコピーチャネル渡し
ポインタとsync.Poolを使用してチャネル渡し時のメモリ割り当てを削減する:
package zerocopy
import (
"sync"
)
type Buffer struct {
Data []byte
}
var bufferPool = sync.Pool{
New: func() any {
return &Buffer{Data: make([]byte, 0, 4096)}
},
}
func GetBuffer() *Buffer {
return bufferPool.Get().(*Buffer)
}
func PutBuffer(buf *Buffer) {
buf.Data = buf.Data[:0]
bufferPool.Put(buf)
}
func ProcessPipeline(ctx context.Context, input <-chan []byte) <-chan *Buffer {
out := make(chan *Buffer, 64)
go func() {
defer close(out)
for {
select {
case <-ctx.Done():
return
case data, ok := <-input:
if !ok {
return
}
buf := GetBuffer()
buf.Data = append(buf.Data[:0], data...)
select {
case out <- buf:
case <-ctx.Done():
PutBuffer(buf)
return
}
}
}
}()
return out
}
並行パターン比較
| 特徴 | goroutine+channel | sync.WaitGroup | errgroup | Worker Pool |
|---|---|---|---|---|
| 並行制御 | 組み込み制御なし | 組み込み制御なし | SetLimit |
固定ワーカー数 |
| エラー処理 | 手動実装が必要 | 手動実装が必要 | 自動初回エラーキャンセル | 手動実装が必要 |
| バックプレッシャー | チャネルバッファ | なし | なし | タスクチャネル |
| キャンセル伝播 | 手動コンテキスト | 手動コンテキスト | 自動コンテキスト | 手動コンテキスト |
| 結果収集 | チャネル受信 | 共有変数が必要 | スライス返却 | チャネル受信 |
| ユースケース | ストリームデータ処理 | バッチタスク待機 | 並行API呼び出し | レート制限タスク処理 |
| 複雑さ | 中 | 低 | 低 | 中 |
| ゴルーチン数 | 不固定 | 不固定 | 制限あり | 固定 |
| プロダクション推奨度 | ⭐⭐⭐ | ⭐⭐ | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐⭐ |
まとめ
Goの並行プログラミングは「動けばいい」ではなく、4つの問いに答えることだ:何個のゴルーチンが動いているか?いつ終了するか?エラー時はどうするか?リソースをどう回収するか? Worker Poolは「何個」を、Contextは「いつ終了」を、errgroupは「エラー時」を、defer close()は「リソース回収」を答える。この7つのパターンをマスターすれば、プロダクション級Go並行プログラミングの核心的な方法論を手に入れることができる。
おすすめツール
- JSONフォーマッター — 並行サービスのJSONレスポンスをフォーマット、データ構造の問題を迅速にデバッグ
- ハッシュ計算ツール — リクエスト署名とデータチェックサムの計算、並行リクエスト間のデータ一貫性を確保
- Base64エンコード/デコード — 並行サービスでのバイナリデータエンコード転送を処理
ブラウザローカルツールを無料で試す →