Go併發模式實戰:從Worker Pool到Pipeline的7種生產模式

编程语言

當goroutine泄漏遇上無界併發:生產環境的噩夢

凌晨3點,線上服務OOM告警。排查發現:一個HTTP handler裡無限制地啟動goroutine,每個請求3個goroutine,QPS 1000就是3000個,10分鐘就泄漏了18萬個goroutine,記憶體直接打滿。更可怕的是,這些goroutine持有的channel引用阻止了GC回收,最終整個節點崩潰。

這不是個例。Go的併發原語雖然簡單——go func()一行程式碼就能啟動併發,但生產環境的併發程式設計遠不止啟動goroutine那麼簡單。你需要控制併發度、處理錯誤傳播、實現優雅退出、避免資源泄漏。本文將從7種生產級併發模式出發,幫你構建健壯的Go併發服務。


核心概念速查

原語 用途 關鍵特性 典型場景
goroutine 輕量級併發執行單元 棧記憶體可伸縮(2KB起),排程由Go runtime管理 任何需要併發執行的任務
channel goroutine間通訊 型別安全,支援緩衝/無緩衝,可關閉 資料傳遞、訊號通知、結果收集
sync.WaitGroup 等待一組goroutine完成 Add/Done/Wait三件套 批量任務等待、併發扇出
sync.Mutex 互斥鎖保護共享狀態 零值可用,支援TryLock(Go 1.18+) 計數器、快取更新、配置熱更新
context.Context 傳播取消訊號和超時 不可變,只能派生子context 請求超時、優雅退出、鏈路追蹤
errgroup.Group 併發執行+錯誤收集 首個錯誤取消所有goroutine 批量API呼叫、平行資料獲取
semaphore.Weighted 加權訊號量限流 支援權重,可超時獲取 API限流、資源配額控制

生產環境併發程式設計的5大挑戰

挑戰1:無界併發導致資源耗盡

func handleRequests(urls []string) {
    for _, url := range urls {
        go fetch(url)
    }
}

每個URL啟動一個goroutine,1萬個URL就是1萬個併發連線。資料庫連線池被打滿,下游服務被壓垮,記憶體暴漲。

挑戰2:goroutine泄漏

func process(ch <-chan int) {
    for {
        val := <-ch
        fmt.Println(val)
    }
}

如果channel永遠不會關閉,這個goroutine永遠不會退出。在長期執行的服務中,泄漏的goroutine會不斷累積。

挑戰3:錯誤被靜默吞掉

func fetchAll(urls []string) {
    var wg sync.WaitGroup
    for _, url := range urls {
        wg.Add(1)
        go func(u string) {
            defer wg.Done()
            resp, err := http.Get(u)
            if err != nil {
                log.Printf("fetch %s failed: %v", u, err)
                return
            }
            process(resp)
        }(url)
    }
    wg.Wait()
}

錯誤只是log了,呼叫方完全不知道有失敗。如果3個URL中有2個失敗,呼叫方以為全部成功。

挑戰4:無法優雅退出

服務收到SIGTERM訊號時,正在執行的goroutine被強制中斷,正在寫入的資料可能損壞,正在處理的交易可能半完成。

挑戰5:併發模式組合困難

Worker Pool需要限流,Pipeline需要錯誤傳播,Fan-out需要結果聚合。單獨實現每個模式不難,但在一個服務中組合使用時,模式間的互動容易產生死結。


7種生產級併發模式

模式1:Worker Pool——有界goroutine池

Worker Pool是最基礎也最重要的併發模式。核心思想:固定數量的worker從任務佇列取任務執行,避免無界併發。

package workerpool

import (
    "context"
    "sync"
)

type Task func(ctx context.Context) error

type Pool struct {
    workers int
    tasks   chan Task
    wg      sync.WaitGroup
}

func NewPool(workers int, bufferSize int) *Pool {
    return &Pool{
        workers: workers,
        tasks:   make(chan Task, bufferSize),
    }
}

func (p *Pool) Start(ctx context.Context) {
    for i := 0; i < p.workers; i++ {
        p.wg.Add(1)
        go func(workerID int) {
            defer p.wg.Done()
            for {
                select {
                case <-ctx.Done():
                    return
                case task, ok := <-p.tasks:
                    if !ok {
                        return
                    }
                    _ = task(ctx)
                }
            }
        }(i)
    }
}

func (p *Pool) Submit(task Task) bool {
    select {
    case p.tasks <- task:
        return true
    default:
        return false
    }
}

func (p *Pool) Stop() {
    close(p.tasks)
    p.wg.Wait()
}

使用範例:

func main() {
    ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
    defer cancel()

    pool := NewPool(10, 100)
    pool.Start(ctx)

    urls := []string{
        "https://api.example.com/users",
        "https://api.example.com/orders",
        "https://api.example.com/products",
    }

    for _, url := range urls {
        u := url
        pool.Submit(func(ctx context.Context) error {
            return fetchURL(ctx, u)
        })
    }

    pool.Stop()
}

關鍵設計要點

  • worker數量固定,避免goroutine爆炸
  • 帶緩衝的task channel作為任務佇列
  • context支援取消,worker可優雅退出
  • Submit非阻塞,任務佇列滿時回傳false

模式2:Fan-out/Fan-in——平行扇出扇入

Fan-out將一個資料來源分發到多個goroutine平行處理,Fan-in將多個goroutine的結果匯聚到一個channel。

package fan

import (
    "context"
    "sync"
)

func FanOut[T any](ctx context.Context, source <-chan T, workers int) []<-chan T {
    channels := make([]<-chan T, workers)
    for i := 0; i < workers; i++ {
        ch := make(chan T)
        channels[i] = ch
        go func() {
            defer close(ch)
            for {
                select {
                case <-ctx.Done():
                    return
                case val, ok := <-source:
                    if !ok {
                        return
                    }
                    select {
                    case ch <- val:
                    case <-ctx.Done():
                        return
                    }
                }
            }
        }()
    }
    return channels
}

func FanIn[T any](ctx context.Context, channels ...<-chan T) <-chan T {
    out := make(chan T)
    var wg sync.WaitGroup
    wg.Add(len(channels))

    for _, ch := range channels {
        go func(c <-chan T) {
            defer wg.Done()
            for {
                select {
                case <-ctx.Done():
                    return
                case val, ok := <-c:
                    if !ok {
                        return
                    }
                    select {
                    case out <- val:
                    case <-ctx.Done():
                        return
                    }
                }
            }
        }(ch)
    }

    go func() {
        wg.Wait()
        close(out)
    }()

    return out
}

使用範例——平行處理訂單:

func processOrders(ctx context.Context, orders <-chan Order) <-chan Result {
    workers := FanOut(ctx, orders, 5)
    return FanIn(ctx, workers...)
}

關鍵設計要點

  • 泛型支援(Go 1.18+),適用於任意型別
  • 每個fan-out goroutine獨立消費source channel
  • fan-in使用WaitGroup等待所有輸入channel關閉
  • context取消時所有goroutine都能退出

模式3:Pipeline——階段式處理流水線

Pipeline將複雜處理拆分為多個階段(stage),每個階段是一個goroutine,透過channel連線。

package pipeline

import (
    "context"
)

type Stage[In any, Out any] func(ctx context.Context, in <-chan In) <-chan Out

func NewPipeline[In any, Out any](
    ctx context.Context,
    source <-chan In,
    stages ...Stage[In, In],
) <-chan Out {
    current := source
    for _, stage := range stages {
        current = stage(ctx, current)
    }
    return any(current).(<-chan Out)
}

func NewStage[In any, Out any](
    process func(ctx context.Context, in In) (Out, error),
    bufferSize int,
) Stage[In, Out] {
    return func(ctx context.Context, in <-chan In) <-chan Out {
        out := make(chan Out, bufferSize)
        go func() {
            defer close(out)
            for {
                select {
                case <-ctx.Done():
                    return
                case val, ok := <-in:
                    if !ok {
                        return
                    }
                    result, err := process(ctx, val)
                    if err != nil {
                        continue
                    }
                    select {
                    case out <- result:
                    case <-ctx.Done():
                        return
                    }
                }
            }
        }()
        return out
    }
}

使用範例——資料處理流水線:

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    raw := make(chan RawData, 100)

    validate := NewStage[RawData, ValidData](func(ctx context.Context, in RawData) (ValidData, error) {
        if err := in.Validate(); err != nil {
            return ValidData{}, err
        }
        return in.ToValid(), nil
    }, 50)

    enrich := NewStage[ValidData, EnrichedData](func(ctx context.Context, in ValidData) (EnrichedData, error) {
        return fetchExtraInfo(ctx, in)
    }, 50)

    transform := NewStage[EnrichedData, FinalData](func(ctx context.Context, in EnrichedData) (FinalData, error) {
        return in.Transform()
    }, 50)

    result := NewPipeline(ctx, raw, validate, enrich, transform)

    go func() {
        for r := range result {
            saveToDB(r)
        }
    }()
}

關鍵設計要點

  • 每個stage是獨立的goroutine,可獨立伸縮
  • channel作為stage間的背壓機制
  • 錯誤在stage內部處理,不中斷整個pipeline
  • context取消時所有stage優雅退出

模式4:errgroup——併發錯誤處理

errgroupsync.WaitGroup的錯誤感知版本:首個錯誤會取消所有goroutine。

package main

import (
    "context"
    "fmt"
    "net/http"
    "time"

    "golang.org/x/sync/errgroup"
)

type FetchResult struct {
    URL  string
    Body string
    Size int
}

func fetchMultiple(ctx context.Context, urls []string) ([]FetchResult, error) {
    g, ctx := errgroup.WithContext(ctx)
    results := make([]FetchResult, len(urls))

    for i, url := range urls {
        i, url := i, url
        g.Go(func() error {
            req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
            if err != nil {
                return fmt.Errorf("create request %s: %w", url, err)
            }

            resp, err := http.DefaultClient.Do(req)
            if err != nil {
                return fmt.Errorf("fetch %s: %w", url, err)
            }
            defer resp.Body.Close()

            if resp.StatusCode != http.StatusOK {
                return fmt.Errorf("fetch %s: status %d", url, resp.StatusCode)
            }

            body, err := io.ReadAll(resp.Body)
            if err != nil {
                return fmt.Errorf("read %s: %w", url, err)
            }

            results[i] = FetchResult{
                URL:  url,
                Body: string(body),
                Size: len(body),
            }
            return nil
        })
    }

    if err := g.Wait(); err != nil {
        return nil, err
    }
    return results, nil
}

帶併發度限制的errgroup:

func fetchWithLimit(ctx context.Context, urls []string, maxConcurrent int) ([]FetchResult, error) {
    g, ctx := errgroup.WithContext(ctx)
    g.SetLimit(maxConcurrent)

    results := make([]FetchResult, len(urls))
    for i, url := range urls {
        i, url := i, url
        g.Go(func() error {
            results[i], _ = fetchOne(ctx, url)
            return nil
        })
    }

    if err := g.Wait(); err != nil {
        return nil, err
    }
    return results, nil
}

關鍵設計要點

  • errgroup.WithContext自動傳播取消訊號
  • g.SetLimit(n)控制最大併發數(Go 1.20+)
  • 首個錯誤取消所有進行中的goroutine
  • 閉包變數捕獲需要i, url := i, url

模式5:Semaphore——訊號量限流

semaphore.Weighted提供加權訊號量,適合不同權重的資源分配場景。

package ratelimit

import (
    "context"
    "fmt"

    "golang.org/x/sync/semaphore"
)

type RateLimiter struct {
    sem *semaphore.Weighted
}

func NewRateLimiter(maxWeight int64) *RateLimiter {
    return &RateLimiter{
        sem: semaphore.NewWeighted(maxWeight),
    }
}

func (r *RateLimiter) Do(ctx context.Context, weight int64, fn func() error) error {
    if err := r.sem.Acquire(ctx, weight); err != nil {
        return fmt.Errorf("acquire semaphore: %w", err)
    }
    defer r.sem.Release(weight)
    return fn()
}

使用範例——API分級限流:

func main() {
    limiter := NewRateLimiter(100)

    err := limiter.Do(context.Background(), 10, func() error {
        return callLightAPI()
    })

    err = limiter.Do(context.Background(), 50, func() error {
        return callHeavyAPI()
    })
}

帶超時的訊號量獲取:

func (r *RateLimiter) DoWithTimeout(ctx context.Context, weight int64, timeout time.Duration, fn func() error) error {
    acquireCtx, cancel := context.WithTimeout(ctx, timeout)
    defer cancel()

    if err := r.sem.Acquire(acquireCtx, weight); err != nil {
        return fmt.Errorf("semaphore acquire timeout: %w", err)
    }
    defer r.sem.Release(weight)
    return fn()
}

關鍵設計要點

  • 加權訊號量,不同操作消耗不同配額
  • Acquire支援context取消和超時
  • Release必須與Acquire配對呼叫
  • 適合API分級限流、資源配額控制

模式6:Context取消與超時

Context是Go併發程式設計的「生命線」,用於傳播取消訊號、超時和截止時間。

package ctxutil

import (
    "context"
    "fmt"
    "time"
)

type Result struct {
    Data  string
    Error error
}

func FetchWithTimeout(ctx context.Context, url string, timeout time.Duration) (string, error) {
    ctx, cancel := context.WithTimeout(ctx, timeout)
    defer cancel()

    req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
    if err != nil {
        return "", fmt.Errorf("create request: %w", err)
    }

    resp, err := http.DefaultClient.Do(req)
    if err != nil {
        if ctx.Err() == context.DeadlineExceeded {
            return "", fmt.Errorf("fetch %s timed out after %v: %w", url, timeout, ctx.Err())
        }
        return "", fmt.Errorf("fetch %s: %w", url, err)
    }
    defer resp.Body.Close()

    body, err := io.ReadAll(resp.Body)
    if err != nil {
        return "", fmt.Errorf("read response: %w", err)
    }
    return string(body), nil
}

func BatchFetch(ctx context.Context, urls []string, timeout time.Duration) []Result {
    results := make([]Result, len(urls))
    var wg sync.WaitGroup

    for i, url := range urls {
        wg.Add(1)
        go func(idx int, u string) {
            defer wg.Done()
            data, err := FetchWithTimeout(ctx, u, timeout)
            results[idx] = Result{Data: data, Error: err}
        }(i, url)
    }

    wg.Wait()
    return results
}

優雅退出模式:

func main() {
    ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
    defer stop()

    server := &http.Server{Addr: ":8080"}

    go func() {
        <-ctx.Done()
        shutdownCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
        defer cancel()
        _ = server.Shutdown(shutdownCtx)
    }()

    _ = server.ListenAndServe()
}

關鍵設計要點

  • WithTimeout/WithDeadline設定超時
  • WithCancel手動控制取消
  • signal.NotifyContext監聽系統訊號
  • context取消會傳播到所有子goroutine
  • defer cancel()防止context泄漏

模式7:生產級併發服務——模式組合

將上述模式組合,構建一個生產級併發服務:Worker Pool + Pipeline + errgroup + Context。

package concurrencyservice

import (
    "context"
    "fmt"
    "sync"
    "time"

    "golang.org/x/sync/errgroup"
)

type Service struct {
    workers    int
    bufferSize int
    timeout    time.Duration
}

func NewService(workers, bufferSize int, timeout time.Duration) *Service {
    return &Service{
        workers:    workers,
        bufferSize: bufferSize,
        timeout:    timeout,
    }
}

type Job struct {
    ID    string
    Input any
}

type Output struct {
    Job    Job
    Result any
    Err    error
}

func (s *Service) Process(ctx context.Context, jobs []Job) ([]Output, error) {
    ctx, cancel := context.WithTimeout(ctx, s.timeout)
    defer cancel()

    g, ctx := errgroup.WithContext(ctx)
    g.SetLimit(s.workers)

    jobCh := make(chan Job, s.bufferSize)
    resultCh := make(chan Output, s.bufferSize)

    g.Go(func() error {
        defer close(jobCh)
        for _, job := range jobs {
            select {
            case <-ctx.Done():
                return ctx.Err()
            case jobCh <- job:
            }
        }
        return nil
    })

    var processWg sync.WaitGroup
    for i := 0; i < s.workers; i++ {
        processWg.Add(1)
        go func() {
            defer processWg.Done()
            for job := range jobCh {
                output := Output{Job: job}
                output.Result, output.Err = s.processOne(ctx, job)
                select {
                case resultCh <- output:
                case <-ctx.Done():
                    return
                }
            }
        }()
    }

    go func() {
        processWg.Wait()
        close(resultCh)
    }()

    var outputs []Output
    for result := range resultCh {
        outputs = append(outputs, result)
    }

    if err := g.Wait(); err != nil {
        return outputs, fmt.Errorf("service process: %w", err)
    }
    return outputs, nil
}

func (s *Service) processOne(ctx context.Context, job Job) (any, error) {
    select {
    case <-ctx.Done():
        return nil, ctx.Err()
    default:
    }
    return fmt.Sprintf("processed-%s", job.ID), nil
}

關鍵設計要點

  • errgroup控制併發度+錯誤傳播
  • channel作為任務分發和結果收集
  • context統一超時和取消
  • WaitGroup確保所有worker完成後才關閉result channel
  • 分層設計:排程層(errgroup)+ 執行層(worker goroutine)+ 收集層(range resultCh)

5大常見陷阱及修復

陷阱1:閉包變數捕獲

❌ 錯誤寫法:

for _, url := range urls {
    go func() {
        fetch(url)
    }()
}

所有goroutine共享同一個url變數,最終都fetch最後一個URL。

✅ 正確寫法:

for _, url := range urls {
    url := url
    go func() {
        fetch(url)
    }()
}

陷阱2:channel未關閉導致goroutine泄漏

❌ 錯誤寫法:

func producer() <-chan int {
    ch := make(chan int)
    go func() {
        for i := 0; i < 100; i++ {
            ch <- i
        }
    }()
    return ch
}

消費者range channel永遠不會結束,goroutine泄漏。

✅ 正確寫法:

func producer() <-chan int {
    ch := make(chan int)
    go func() {
        defer close(ch)
        for i := 0; i < 100; i++ {
            ch <- i
        }
    }()
    return ch
}

陷阱3:WaitGroup的Add位置錯誤

❌ 錯誤寫法:

var wg sync.WaitGroup
for i := 0; i < 10; i++ {
    go func() {
        wg.Add(1)
        defer wg.Done()
        doWork()
    }()
}
wg.Wait()

goroutine可能還沒執行到wg.Add(1),主goroutine就呼叫了wg.Wait(),直接通過。

✅ 正確寫法:

var wg sync.WaitGroup
for i := 0; i < 10; i++ {
    wg.Add(1)
    go func() {
        defer wg.Done()
        doWork()
    }()
}
wg.Wait()

陷阱4:select的time.After泄漏

❌ 錯誤寫法:

select {
case result := <-ch:
    process(result)
case <-time.After(5 * time.Second):
    return errors.New("timeout")
}

每次select都建立新的time.After channel,如果頻繁呼叫會泄漏timer。

✅ 正確寫法:

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

select {
case result := <-ch:
    process(result)
case <-ctx.Done():
    return ctx.Err()
}

陷阱5:Mutex複製

❌ 錯誤寫法:

type SafeMap struct {
    mu   sync.Mutex
    data map[string]string
}

func copyMap(m SafeMap) SafeMap {
    return m
}

Mutex零值是「未鎖定」,複製一個已鎖定的Mutex會導致死結或資料競爭。

✅ 正確寫法:

type SafeMap struct {
    mu   *sync.Mutex
    data map[string]string
}

func NewSafeMap() *SafeMap {
    return &SafeMap{
        mu:   &sync.Mutex{},
        data: make(map[string]string),
    }
}

錯誤排查速查表

錯誤現象 可能原因 排查方法 解決方案
fatal error: all goroutines are asleep - deadlock! 所有goroutine阻塞,無活躍goroutine 檢查channel讀寫是否配對,select是否有default 確保channel有生產者和消費者,新增context取消
goroutine數量持續增長 goroutine泄漏,channel未關閉 runtime.NumGoroutine()監控,pprof goroutine profile 確保所有goroutine有退出路徑,defer close(ch)
記憶體持續增長 goroutine持有大物件引用,channel堆積 pprof heap profile,檢查channel buffer大小 限制channel buffer,及時釋放引用
context canceled 錯誤頻繁 上游context被取消,超時設定過短 檢查context鏈路,確認超時時間是否合理 調整超時時間,區分業務錯誤和超時錯誤
併發結果丟失 goroutine在寫入channel前退出 檢查goroutine退出邏輯,確認channel是否已關閉 使用WaitGroup確保所有goroutine完成後再關閉channel
race condition 報錯 多個goroutine同時讀寫共享變數 go test -race,檢查全域變數和閉包捕獲 使用Mutex/RWMutex保護,或改用channel通訊
channel阻塞無回應 生產者/消費者速度不匹配 檢查channel buffer大小,監控生產和消費速率 增加buffer,使用select+default非阻塞傳送
服務優雅退出失敗 goroutine未回應context取消 檢查goroutine是否select了ctx.Done() 確保所有長時間執行的goroutine檢查ctx.Done()
errgroup只回傳一個錯誤 errgroup設計為首錯取消 檢查是否需要收集所有錯誤 使用自定義錯誤收集器或多個errgroup
訊號量獲取超時 併發度限制過嚴,請求排隊過長 監控semaphore等待時間,調整權重配額 增加訊號量容量,最佳化權重分配

進階最佳化技巧

最佳化1:動態Worker Pool

根據系統負載動態調整worker數量:

package dynamicpool

import (
    "context"
    "runtime"
    "sync"
    "sync/atomic"
    "time"
)

type DynamicPool struct {
    minWorkers int64
    maxWorkers int64
    active     atomic.Int64
    tasks      chan Task
    wg         sync.WaitGroup
    adjustTick *time.Ticker
}

func NewDynamicPool(minW, maxW int) *DynamicPool {
    return &DynamicPool{
        minWorkers: int64(minW),
        maxWorkers: int64(maxW),
        tasks:      make(chan Task, maxW*2),
        adjustTick: time.NewTicker(5 * time.Second),
    }
}

func (p *DynamicPool) Start(ctx context.Context) {
    for i := 0; i < int(p.minWorkers); i++ {
        p.addWorker(ctx)
    }

    go func() {
        for {
            select {
            case <-ctx.Done():
                p.adjustTick.Stop()
                return
            case <-p.adjustTick.C:
                p.adjustWorkers(ctx)
            }
        }
    }()
}

func (p *DynamicPool) addWorker(ctx context.Context) {
    p.active.Add(1)
    p.wg.Add(1)
    go func() {
        defer func() {
            p.active.Add(-1)
            p.wg.Done()
        }()
        for task := range p.tasks {
            _ = task(ctx)
        }
    }()
}

func (p *DynamicPool) adjustWorkers(ctx context.Context) {
    current := p.active.Load()
    queueLen := len(p.tasks)

    if queueLen > int(current) && current < p.maxWorkers {
        p.addWorker(ctx)
    } else if queueLen == 0 && current > p.minWorkers {
        for i := 0; i < 5 && int(p.active.Load()) > int(p.minWorkers); i++ {
            p.tasks <- nil
        }
    }
}

func (p *DynamicPool) Stop() {
    close(p.tasks)
    p.wg.Wait()
}

最佳化2:批次處理(Batching)

將多個小任務合併為一個批次操作,減少系統呼叫和IO開銷:

package batcher

import (
    "context"
    "sync"
    "time"
)

type Batcher[T any, R any] struct {
    batchSize     int
    flushInterval time.Duration
    handler       func(ctx context.Context, batch []T) ([]R, error)
    mu            sync.Mutex
    buffer        []T
    results       []R
}

func NewBatcher[T any, R any](
    batchSize int,
    flushInterval time.Duration,
    handler func(ctx context.Context, batch []T) ([]R, error),
) *Batcher[T, R] {
    return &Batcher[T, R]{
        batchSize:     batchSize,
        flushInterval: flushInterval,
        handler:       handler,
        buffer:        make([]T, 0, batchSize),
    }
}

func (b *Batcher[T, R]) Add(ctx context.Context, item T) (R, error) {
    b.mu.Lock()
    b.buffer = append(b.buffer, item)

    if len(b.buffer) >= b.batchSize {
        batch := b.buffer
        b.buffer = make([]T, 0, b.batchSize)
        b.mu.Unlock()

        results, err := b.handler(ctx, batch)
        if err != nil {
            var zero R
            return zero, err
        }
        b.results = append(b.results, results...)
        var zero R
        if len(results) > 0 {
            return results[0], nil
        }
        return zero, nil
    }
    b.mu.Unlock()

    var zero R
    return zero, nil
}

func (b *Batcher[T, R]) Flush(ctx context.Context) ([]R, error) {
    b.mu.Lock()
    batch := b.buffer
    b.buffer = make([]T, 0, b.batchSize)
    b.mu.Unlock()

    if len(batch) == 0 {
        return b.results, nil
    }

    results, err := b.handler(ctx, batch)
    if err != nil {
        return nil, err
    }
    b.results = append(b.results, results...)
    return b.results, nil
}

最佳化3:零拷貝Channel傳遞

使用指標和sync.Pool減少channel傳遞時的記憶體分配:

package zerocopy

import (
    "sync"
)

type Buffer struct {
    Data []byte
}

var bufferPool = sync.Pool{
    New: func() any {
        return &Buffer{Data: make([]byte, 0, 4096)}
    },
}

func GetBuffer() *Buffer {
    return bufferPool.Get().(*Buffer)
}

func PutBuffer(buf *Buffer) {
    buf.Data = buf.Data[:0]
    bufferPool.Put(buf)
}

func ProcessPipeline(ctx context.Context, input <-chan []byte) <-chan *Buffer {
    out := make(chan *Buffer, 64)
    go func() {
        defer close(out)
        for {
            select {
            case <-ctx.Done():
                return
            case data, ok := <-input:
                if !ok {
                    return
                }
                buf := GetBuffer()
                buf.Data = append(buf.Data[:0], data...)

                select {
                case out <- buf:
                case <-ctx.Done():
                    PutBuffer(buf)
                    return
                }
            }
        }
    }()
    return out
}

併發模式對比

特性 goroutine+channel sync.WaitGroup errgroup Worker Pool
併發控制 無內建控制 無內建控制 SetLimit 固定worker數
錯誤處理 需手動實現 需手動實現 自動首錯取消 需手動實現
背壓機制 channel buffer task channel
取消傳播 需手動context 需手動context 自動context 需手動context
結果收集 channel接收 需共享變數 回傳slice channel接收
適用場景 串流資料處理 批量任務等待 併發API呼叫 限流任務處理
複雜度
goroutine數量 不固定 不固定 有限制 固定
生產推薦度 ⭐⭐⭐ ⭐⭐ ⭐⭐⭐⭐⭐ ⭐⭐⭐⭐⭐

總結

Go的併發程式設計不是「能跑就行」,而是要回答四個問題:多少個goroutine在跑?它們什麼時候退出?出錯怎麼辦?資源怎麼回收? Worker Pool回答了「多少個」,Context回答了「什麼時候退出」,errgroup回答了「出錯怎麼辦」,defer close()回答了「資源怎麼回收」。掌握這7種模式,你就掌握了生產級Go併發程式設計的核心方法論。


推薦工具

  • JSON格式化工具 — 格式化併發服務的JSON回應,快速排查資料結構問題
  • 雜湊計算工具 — 計算請求簽名和資料校驗,確保併發請求的資料一致性
  • Base64編解碼 — 處理併發服務中的二進位資料編碼傳輸

本站提供瀏覽器本地工具,免註冊即可試用 →

#Go#并发#goroutine#channel#sync#2026#并发模式