Go并发模式实战:从Worker Pool到Pipeline的7种生产模式

编程语言

当goroutine泄漏遇上无界并发:生产环境的噩梦

凌晨3点,线上服务OOM告警。排查发现:一个HTTP handler里无限制地启动goroutine,每个请求3个goroutine,QPS 1000就是3000个,10分钟就泄漏了18万个goroutine,内存直接打满。更可怕的是,这些goroutine持有的channel引用阻止了GC回收,最终整个节点崩溃。

这不是个例。Go的并发原语虽然简单——go func()一行代码就能启动并发,但生产环境的并发编程远不止启动goroutine那么简单。你需要控制并发度、处理错误传播、实现优雅退出、避免资源泄漏。本文将从7种生产级并发模式出发,帮你构建健壮的Go并发服务。


核心概念速查

原语 用途 关键特性 典型场景
goroutine 轻量级并发执行单元 栈内存可伸缩(2KB起),调度由Go runtime管理 任何需要并发执行的任务
channel goroutine间通信 类型安全,支持缓冲/无缓冲,可关闭 数据传递、信号通知、结果收集
sync.WaitGroup 等待一组goroutine完成 Add/Done/Wait三件套 批量任务等待、并发扇出
sync.Mutex 互斥锁保护共享状态 零值可用,支持TryLock(Go 1.18+) 计数器、缓存更新、配置热更新
context.Context 传播取消信号和超时 不可变,只能派生子context 请求超时、优雅退出、链路追踪
errgroup.Group 并发执行+错误收集 首个错误取消所有goroutine 批量API调用、并行数据获取
semaphore.Weighted 加权信号量限流 支持权重,可超时获取 API限流、资源配额控制

生产环境并发编程的5大挑战

挑战1:无界并发导致资源耗尽

func handleRequests(urls []string) {
    for _, url := range urls {
        go fetch(url)
    }
}

每个URL启动一个goroutine,1万个URL就是1万个并发连接。数据库连接池被打满,下游服务被压垮,内存暴涨。

挑战2:goroutine泄漏

func process(ch <-chan int) {
    for {
        val := <-ch
        fmt.Println(val)
    }
}

如果channel永远不会关闭,这个goroutine永远不会退出。在长期运行的服务中,泄漏的goroutine会不断累积。

挑战3:错误被静默吞掉

func fetchAll(urls []string) {
    var wg sync.WaitGroup
    for _, url := range urls {
        wg.Add(1)
        go func(u string) {
            defer wg.Done()
            resp, err := http.Get(u)
            if err != nil {
                log.Printf("fetch %s failed: %v", u, err)
                return
            }
            process(resp)
        }(url)
    }
    wg.Wait()
}

错误只是log了,调用方完全不知道有失败。如果3个URL中有2个失败,调用方以为全部成功。

挑战4:无法优雅退出

服务收到SIGTERM信号时,正在执行的goroutine被强制中断,正在写入的数据可能损坏,正在处理的事务可能半完成。

挑战5:并发模式组合困难

Worker Pool需要限流,Pipeline需要错误传播,Fan-out需要结果聚合。单独实现每个模式不难,但在一个服务中组合使用时,模式间的交互容易产生死锁。


7种生产级并发模式

模式1:Worker Pool——有界goroutine池

Worker Pool是最基础也最重要的并发模式。核心思想:固定数量的worker从任务队列取任务执行,避免无界并发。

package workerpool

import (
    "context"
    "sync"
)

type Task func(ctx context.Context) error

type Pool struct {
    workers int
    tasks   chan Task
    wg      sync.WaitGroup
}

func NewPool(workers int, bufferSize int) *Pool {
    return &Pool{
        workers: workers,
        tasks:   make(chan Task, bufferSize),
    }
}

func (p *Pool) Start(ctx context.Context) {
    for i := 0; i < p.workers; i++ {
        p.wg.Add(1)
        go func(workerID int) {
            defer p.wg.Done()
            for {
                select {
                case <-ctx.Done():
                    return
                case task, ok := <-p.tasks:
                    if !ok {
                        return
                    }
                    _ = task(ctx)
                }
            }
        }(i)
    }
}

func (p *Pool) Submit(task Task) bool {
    select {
    case p.tasks <- task:
        return true
    default:
        return false
    }
}

func (p *Pool) Stop() {
    close(p.tasks)
    p.wg.Wait()
}

使用示例:

func main() {
    ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
    defer cancel()

    pool := NewPool(10, 100)
    pool.Start(ctx)

    urls := []string{
        "https://api.example.com/users",
        "https://api.example.com/orders",
        "https://api.example.com/products",
    }

    for _, url := range urls {
        u := url
        pool.Submit(func(ctx context.Context) error {
            return fetchURL(ctx, u)
        })
    }

    pool.Stop()
}

关键设计点

  • worker数量固定,避免goroutine爆炸
  • 带缓冲的task channel作为任务队列
  • context支持取消,worker可优雅退出
  • Submit非阻塞,任务队列满时返回false

模式2:Fan-out/Fan-in——并行扇出扇入

Fan-out将一个数据源分发到多个goroutine并行处理,Fan-in将多个goroutine的结果汇聚到一个channel。

package fan

import (
    "context"
    "sync"
)

func FanOut[T any](ctx context.Context, source <-chan T, workers int) []<-chan T {
    channels := make([]<-chan T, workers)
    for i := 0; i < workers; i++ {
        ch := make(chan T)
        channels[i] = ch
        go func() {
            defer close(ch)
            for {
                select {
                case <-ctx.Done():
                    return
                case val, ok := <-source:
                    if !ok {
                        return
                    }
                    select {
                    case ch <- val:
                    case <-ctx.Done():
                        return
                    }
                }
            }
        }()
    }
    return channels
}

func FanIn[T any](ctx context.Context, channels ...<-chan T) <-chan T {
    out := make(chan T)
    var wg sync.WaitGroup
    wg.Add(len(channels))

    for _, ch := range channels {
        go func(c <-chan T) {
            defer wg.Done()
            for {
                select {
                case <-ctx.Done():
                    return
                case val, ok := <-c:
                    if !ok {
                        return
                    }
                    select {
                    case out <- val:
                    case <-ctx.Done():
                        return
                    }
                }
            }
        }(ch)
    }

    go func() {
        wg.Wait()
        close(out)
    }()

    return out
}

使用示例——并行处理订单:

func processOrders(ctx context.Context, orders <-chan Order) <-chan Result {
    workers := FanOut(ctx, orders, 5)
    return FanIn(ctx, workers...)
}

关键设计点

  • 泛型支持(Go 1.18+),适用于任意类型
  • 每个fan-out goroutine独立消费source channel
  • fan-in使用WaitGroup等待所有输入channel关闭
  • context取消时所有goroutine都能退出

模式3:Pipeline——阶段式处理流水线

Pipeline将复杂处理拆分为多个阶段(stage),每个阶段是一个goroutine,通过channel连接。

package pipeline

import (
    "context"
)

type Stage[In any, Out any] func(ctx context.Context, in <-chan In) <-chan Out

func NewPipeline[In any, Out any](
    ctx context.Context,
    source <-chan In,
    stages ...Stage[In, In],
) <-chan Out {
    current := source
    for _, stage := range stages {
        current = stage(ctx, current)
    }
    return any(current).(<-chan Out)
}

func NewStage[In any, Out any](
    process func(ctx context.Context, in In) (Out, error),
    bufferSize int,
) Stage[In, Out] {
    return func(ctx context.Context, in <-chan In) <-chan Out {
        out := make(chan Out, bufferSize)
        go func() {
            defer close(out)
            for {
                select {
                case <-ctx.Done():
                    return
                case val, ok := <-in:
                    if !ok {
                        return
                    }
                    result, err := process(ctx, val)
                    if err != nil {
                        continue
                    }
                    select {
                    case out <- result:
                    case <-ctx.Done():
                        return
                    }
                }
            }
        }()
        return out
    }
}

使用示例——数据处理流水线:

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    raw := make(chan RawData, 100)

    validate := NewStage[RawData, ValidData](func(ctx context.Context, in RawData) (ValidData, error) {
        if err := in.Validate(); err != nil {
            return ValidData{}, err
        }
        return in.ToValid(), nil
    }, 50)

    enrich := NewStage[ValidData, EnrichedData](func(ctx context.Context, in ValidData) (EnrichedData, error) {
        return fetchExtraInfo(ctx, in)
    }, 50)

    transform := NewStage[EnrichedData, FinalData](func(ctx context.Context, in EnrichedData) (FinalData, error) {
        return in.Transform()
    }, 50)

    result := NewPipeline(ctx, raw, validate, enrich, transform)

    go func() {
        for r := range result {
            saveToDB(r)
        }
    }()
}

关键设计点

  • 每个stage是独立的goroutine,可独立伸缩
  • channel作为stage间的背压机制
  • 错误在stage内部处理,不中断整个pipeline
  • context取消时所有stage优雅退出

模式4:errgroup——并发错误处理

errgroupsync.WaitGroup的错误感知版本:首个错误会取消所有goroutine。

package main

import (
    "context"
    "fmt"
    "net/http"
    "time"

    "golang.org/x/sync/errgroup"
)

type FetchResult struct {
    URL  string
    Body string
    Size int
}

func fetchMultiple(ctx context.Context, urls []string) ([]FetchResult, error) {
    g, ctx := errgroup.WithContext(ctx)
    results := make([]FetchResult, len(urls))

    for i, url := range urls {
        i, url := i, url
        g.Go(func() error {
            req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
            if err != nil {
                return fmt.Errorf("create request %s: %w", url, err)
            }

            resp, err := http.DefaultClient.Do(req)
            if err != nil {
                return fmt.Errorf("fetch %s: %w", url, err)
            }
            defer resp.Body.Close()

            if resp.StatusCode != http.StatusOK {
                return fmt.Errorf("fetch %s: status %d", url, resp.StatusCode)
            }

            body, err := io.ReadAll(resp.Body)
            if err != nil {
                return fmt.Errorf("read %s: %w", url, err)
            }

            results[i] = FetchResult{
                URL:  url,
                Body: string(body),
                Size: len(body),
            }
            return nil
        })
    }

    if err := g.Wait(); err != nil {
        return nil, err
    }
    return results, nil
}

带并发度限制的errgroup:

func fetchWithLimit(ctx context.Context, urls []string, maxConcurrent int) ([]FetchResult, error) {
    g, ctx := errgroup.WithContext(ctx)
    g.SetLimit(maxConcurrent)

    results := make([]FetchResult, len(urls))
    for i, url := range urls {
        i, url := i, url
        g.Go(func() error {
            results[i], _ = fetchOne(ctx, url)
            return nil
        })
    }

    if err := g.Wait(); err != nil {
        return nil, err
    }
    return results, nil
}

关键设计点

  • errgroup.WithContext自动传播取消信号
  • g.SetLimit(n)控制最大并发数(Go 1.20+)
  • 首个错误取消所有进行中的goroutine
  • 闭包变量捕获需要i, url := i, url

模式5:Semaphore——信号量限流

semaphore.Weighted提供加权信号量,适合不同权重的资源分配场景。

package ratelimit

import (
    "context"
    "fmt"

    "golang.org/x/sync/semaphore"
)

type RateLimiter struct {
    sem *semaphore.Weighted
}

func NewRateLimiter(maxWeight int64) *RateLimiter {
    return &RateLimiter{
        sem: semaphore.NewWeighted(maxWeight),
    }
}

func (r *RateLimiter) Do(ctx context.Context, weight int64, fn func() error) error {
    if err := r.sem.Acquire(ctx, weight); err != nil {
        return fmt.Errorf("acquire semaphore: %w", err)
    }
    defer r.sem.Release(weight)
    return fn()
}

使用示例——API分级限流:

func main() {
    limiter := NewRateLimiter(100)

    err := limiter.Do(context.Background(), 10, func() error {
        return callLightAPI()
    })

    err = limiter.Do(context.Background(), 50, func() error {
        return callHeavyAPI()
    })
}

带超时的信号量获取:

func (r *RateLimiter) DoWithTimeout(ctx context.Context, weight int64, timeout time.Duration, fn func() error) error {
    acquireCtx, cancel := context.WithTimeout(ctx, timeout)
    defer cancel()

    if err := r.sem.Acquire(acquireCtx, weight); err != nil {
        return fmt.Errorf("semaphore acquire timeout: %w", err)
    }
    defer r.sem.Release(weight)
    return fn()
}

关键设计点

  • 加权信号量,不同操作消耗不同配额
  • Acquire支持context取消和超时
  • Release必须与Acquire配对调用
  • 适合API分级限流、资源配额控制

模式6:Context取消与超时

Context是Go并发编程的"生命线",用于传播取消信号、超时和截止时间。

package ctxutil

import (
    "context"
    "fmt"
    "time"
)

type Result struct {
    Data  string
    Error error
}

func FetchWithTimeout(ctx context.Context, url string, timeout time.Duration) (string, error) {
    ctx, cancel := context.WithTimeout(ctx, timeout)
    defer cancel()

    req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
    if err != nil {
        return "", fmt.Errorf("create request: %w", err)
    }

    resp, err := http.DefaultClient.Do(req)
    if err != nil {
        if ctx.Err() == context.DeadlineExceeded {
            return "", fmt.Errorf("fetch %s timed out after %v: %w", url, timeout, ctx.Err())
        }
        return "", fmt.Errorf("fetch %s: %w", url, err)
    }
    defer resp.Body.Close()

    body, err := io.ReadAll(resp.Body)
    if err != nil {
        return "", fmt.Errorf("read response: %w", err)
    }
    return string(body), nil
}

func BatchFetch(ctx context.Context, urls []string, timeout time.Duration) []Result {
    results := make([]Result, len(urls))
    var wg sync.WaitGroup

    for i, url := range urls {
        wg.Add(1)
        go func(idx int, u string) {
            defer wg.Done()
            data, err := FetchWithTimeout(ctx, u, timeout)
            results[idx] = Result{Data: data, Error: err}
        }(i, url)
    }

    wg.Wait()
    return results
}

优雅退出模式:

func main() {
    ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
    defer stop()

    server := &http.Server{Addr: ":8080"}

    go func() {
        <-ctx.Done()
        shutdownCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
        defer cancel()
        _ = server.Shutdown(shutdownCtx)
    }()

    _ = server.ListenAndServe()
}

关键设计点

  • WithTimeout/WithDeadline设置超时
  • WithCancel手动控制取消
  • signal.NotifyContext监听系统信号
  • context取消会传播到所有子goroutine
  • defer cancel()防止context泄漏

模式7:生产级并发服务——模式组合

将上述模式组合,构建一个生产级并发服务:Worker Pool + Pipeline + errgroup + Context。

package concurrencyservice

import (
    "context"
    "fmt"
    "sync"
    "time"

    "golang.org/x/sync/errgroup"
)

type Service struct {
    workers    int
    bufferSize int
    timeout    time.Duration
}

func NewService(workers, bufferSize int, timeout time.Duration) *Service {
    return &Service{
        workers:    workers,
        bufferSize: bufferSize,
        timeout:    timeout,
    }
}

type Job struct {
    ID    string
    Input any
}

type Output struct {
    Job    Job
    Result any
    Err    error
}

func (s *Service) Process(ctx context.Context, jobs []Job) ([]Output, error) {
    ctx, cancel := context.WithTimeout(ctx, s.timeout)
    defer cancel()

    g, ctx := errgroup.WithContext(ctx)
    g.SetLimit(s.workers)

    jobCh := make(chan Job, s.bufferSize)
    resultCh := make(chan Output, s.bufferSize)

    g.Go(func() error {
        defer close(jobCh)
        for _, job := range jobs {
            select {
            case <-ctx.Done():
                return ctx.Err()
            case jobCh <- job:
            }
        }
        return nil
    })

    var processWg sync.WaitGroup
    for i := 0; i < s.workers; i++ {
        processWg.Add(1)
        go func() {
            defer processWg.Done()
            for job := range jobCh {
                output := Output{Job: job}
                output.Result, output.Err = s.processOne(ctx, job)
                select {
                case resultCh <- output:
                case <-ctx.Done():
                    return
                }
            }
        }()
    }

    go func() {
        processWg.Wait()
        close(resultCh)
    }()

    var outputs []Output
    for result := range resultCh {
        outputs = append(outputs, result)
    }

    if err := g.Wait(); err != nil {
        return outputs, fmt.Errorf("service process: %w", err)
    }
    return outputs, nil
}

func (s *Service) processOne(ctx context.Context, job Job) (any, error) {
    select {
    case <-ctx.Done():
        return nil, ctx.Err()
    default:
    }
    return fmt.Sprintf("processed-%s", job.ID), nil
}

关键设计点

  • errgroup控制并发度+错误传播
  • channel作为任务分发和结果收集
  • context统一超时和取消
  • WaitGroup确保所有worker完成后才关闭result channel
  • 分层设计:调度层(errgroup)+ 执行层(worker goroutine)+ 收集层(range resultCh)

5大常见陷阱及修复

陷阱1:闭包变量捕获

❌ 错误写法:

for _, url := range urls {
    go func() {
        fetch(url)
    }()
}

所有goroutine共享同一个url变量,最终都fetch最后一个URL。

✅ 正确写法:

for _, url := range urls {
    url := url
    go func() {
        fetch(url)
    }()
}

陷阱2:channel未关闭导致goroutine泄漏

❌ 错误写法:

func producer() <-chan int {
    ch := make(chan int)
    go func() {
        for i := 0; i < 100; i++ {
            ch <- i
        }
    }()
    return ch
}

消费者range channel永远不会结束,goroutine泄漏。

✅ 正确写法:

func producer() <-chan int {
    ch := make(chan int)
    go func() {
        defer close(ch)
        for i := 0; i < 100; i++ {
            ch <- i
        }
    }()
    return ch
}

陷阱3:WaitGroup的Add位置错误

❌ 错误写法:

var wg sync.WaitGroup
for i := 0; i < 10; i++ {
    go func() {
        wg.Add(1)
        defer wg.Done()
        doWork()
    }()
}
wg.Wait()

goroutine可能还没执行到wg.Add(1),主goroutine就调用了wg.Wait(),直接通过。

✅ 正确写法:

var wg sync.WaitGroup
for i := 0; i < 10; i++ {
    wg.Add(1)
    go func() {
        defer wg.Done()
        doWork()
    }()
}
wg.Wait()

陷阱4:select的空case

❌ 错误写法:

select {
case result := <-ch:
    process(result)
case <-time.After(5 * time.Second):
    return errors.New("timeout")
}

每次select都创建新的time.After channel,如果频繁调用会泄漏timer。

✅ 正确写法:

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

select {
case result := <-ch:
    process(result)
case <-ctx.Done():
    return ctx.Err()
}

陷阱5:Mutex复制

❌ 错误写法:

type SafeMap struct {
    mu   sync.Mutex
    data map[string]string
}

func copyMap(m SafeMap) SafeMap {
    return m
}

Mutex零值是"未锁定",复制一个已锁定的Mutex会导致死锁或数据竞争。

✅ 正确写法:

type SafeMap struct {
    mu   *sync.Mutex
    data map[string]string
}

func NewSafeMap() *SafeMap {
    return &SafeMap{
        mu:   &sync.Mutex{},
        data: make(map[string]string),
    }
}

错误排查速查表

错误现象 可能原因 排查方法 解决方案
fatal error: all goroutines are asleep - deadlock! 所有goroutine阻塞,无活跃goroutine 检查channel读写是否配对,select是否有default 确保channel有生产者和消费者,添加context取消
goroutine数量持续增长 goroutine泄漏,channel未关闭 runtime.NumGoroutine()监控,pprof goroutine profile 确保所有goroutine有退出路径,defer close(ch)
内存持续增长 goroutine持有大对象引用,channel堆积 pprof heap profile,检查channel buffer大小 限制channel buffer,及时释放引用
context canceled 错误频繁 上游context被取消,超时设置过短 检查context链路,确认超时时间是否合理 调整超时时间,区分业务错误和超时错误
并发结果丢失 goroutine在写入channel前退出 检查goroutine退出逻辑,确认channel是否已关闭 使用WaitGroup确保所有goroutine完成后再关闭channel
race condition 报错 多个goroutine同时读写共享变量 go test -race,检查全局变量和闭包捕获 使用Mutex/RWMutex保护,或改用channel通信
channel阻塞无响应 生产者/消费者速度不匹配 检查channel buffer大小,监控生产和消费速率 增加buffer,使用select+default非阻塞发送
服务优雅退出失败 goroutine未响应context取消 检查goroutine是否select了ctx.Done() 确保所有长时间运行的goroutine检查ctx.Done()
errgroup只返回一个错误 errgroup设计为首错取消 检查是否需要收集所有错误 使用自定义错误收集器或多个errgroup
信号量获取超时 并发度限制过严,请求排队过长 监控semaphore等待时间,调整权重配额 增加信号量容量,优化权重分配

高级优化技巧

优化1:动态Worker Pool

根据系统负载动态调整worker数量:

package dynamicpool

import (
    "context"
    "runtime"
    "sync"
    "sync/atomic"
    "time"
)

type DynamicPool struct {
    minWorkers int64
    maxWorkers int64
    active     atomic.Int64
    tasks      chan Task
    wg         sync.WaitGroup
    adjustTick *time.Ticker
}

func NewDynamicPool(minW, maxW int) *DynamicPool {
    return &DynamicPool{
        minWorkers: int64(minW),
        maxWorkers: int64(maxW),
        tasks:      make(chan Task, maxW*2),
        adjustTick: time.NewTicker(5 * time.Second),
    }
}

func (p *DynamicPool) Start(ctx context.Context) {
    for i := 0; i < int(p.minWorkers); i++ {
        p.addWorker(ctx)
    }

    go func() {
        for {
            select {
            case <-ctx.Done():
                p.adjustTick.Stop()
                return
            case <-p.adjustTick.C:
                p.adjustWorkers(ctx)
            }
        }
    }()
}

func (p *DynamicPool) addWorker(ctx context.Context) {
    p.active.Add(1)
    p.wg.Add(1)
    go func() {
        defer func() {
            p.active.Add(-1)
            p.wg.Done()
        }()
        for task := range p.tasks {
            _ = task(ctx)
        }
    }()
}

func (p *DynamicPool) adjustWorkers(ctx context.Context) {
    current := p.active.Load()
    queueLen := len(p.tasks)

    if queueLen > int(current) && current < p.maxWorkers {
        p.addWorker(ctx)
    } else if queueLen == 0 && current > p.minWorkers {
        for i := 0; i < 5 && int(p.active.Load()) > int(p.minWorkers); i++ {
            p.tasks <- nil
        }
    }
}

func (p *DynamicPool) Stop() {
    close(p.tasks)
    p.wg.Wait()
}

优化2:批量处理(Batching)

将多个小任务合并为一个批量操作,减少系统调用和IO开销:

package batcher

import (
    "context"
    "sync"
    "time"
)

type Batcher[T any, R any] struct {
    batchSize int
    flushInterval time.Duration
    handler func(ctx context.Context, batch []T) ([]R, error)
    mu      sync.Mutex
    buffer  []T
    results []R
}

func NewBatcher[T any, R any](
    batchSize int,
    flushInterval time.Duration,
    handler func(ctx context.Context, batch []T) ([]R, error),
) *Batcher[T, R] {
    return &Batcher[T, R]{
        batchSize:     batchSize,
        flushInterval: flushInterval,
        handler:       handler,
        buffer:        make([]T, 0, batchSize),
    }
}

func (b *Batcher[T, R]) Add(ctx context.Context, item T) (R, error) {
    b.mu.Lock()
    b.buffer = append(b.buffer, item)

    if len(b.buffer) >= b.batchSize {
        batch := b.buffer
        b.buffer = make([]T, 0, b.batchSize)
        b.mu.Unlock()

        results, err := b.handler(ctx, batch)
        if err != nil {
            var zero R
            return zero, err
        }
        b.results = append(b.results, results...)
        var zero R
        if len(results) > 0 {
            return results[0], nil
        }
        return zero, nil
    }
    b.mu.Unlock()

    var zero R
    return zero, nil
}

func (b *Batcher[T, R]) Flush(ctx context.Context) ([]R, error) {
    b.mu.Lock()
    batch := b.buffer
    b.buffer = make([]T, 0, b.batchSize)
    b.mu.Unlock()

    if len(batch) == 0 {
        return b.results, nil
    }

    results, err := b.handler(ctx, batch)
    if err != nil {
        return nil, err
    }
    b.results = append(b.results, results...)
    return b.results, nil
}

优化3:零拷贝Channel传递

使用指针和sync.Pool减少channel传递时的内存分配:

package zerocopy

import (
    "sync"
)

type Buffer struct {
    Data []byte
}

var bufferPool = sync.Pool{
    New: func() any {
        return &Buffer{Data: make([]byte, 0, 4096)}
    },
}

func GetBuffer() *Buffer {
    return bufferPool.Get().(*Buffer)
}

func PutBuffer(buf *Buffer) {
    buf.Data = buf.Data[:0]
    bufferPool.Put(buf)
}

func ProcessPipeline(ctx context.Context, input <-chan []byte) <-chan *Buffer {
    out := make(chan *Buffer, 64)
    go func() {
        defer close(out)
        for {
            select {
            case <-ctx.Done():
                return
            case data, ok := <-input:
                if !ok {
                    return
                }
                buf := GetBuffer()
                buf.Data = append(buf.Data[:0], data...)

                select {
                case out <- buf:
                case <-ctx.Done():
                    PutBuffer(buf)
                    return
                }
            }
        }
    }()
    return out
}

并发模式对比

特性 goroutine+channel sync.WaitGroup errgroup Worker Pool
并发控制 无内置控制 无内置控制 SetLimit 固定worker数
错误处理 需手动实现 需手动实现 自动首错取消 需手动实现
背压机制 channel buffer task channel
取消传播 需手动context 需手动context 自动context 需手动context
结果收集 channel接收 需共享变量 返回slice channel接收
适用场景 流式数据处理 批量任务等待 并发API调用 限流任务处理
复杂度
goroutine数量 不固定 不固定 有限制 固定
生产推荐度 ⭐⭐⭐ ⭐⭐ ⭐⭐⭐⭐⭐ ⭐⭐⭐⭐⭐

总结

Go的并发编程不是"能跑就行",而是要回答四个问题:多少个goroutine在跑?它们什么时候退出?出错怎么办?资源怎么回收? Worker Pool回答了"多少个",Context回答了"什么时候退出",errgroup回答了"出错怎么办",defer close()回答了"资源怎么回收"。掌握这7种模式,你就掌握了生产级Go并发编程的核心方法论。


推荐工具

  • JSON格式化工具 — 格式化并发服务的JSON响应,快速排查数据结构问题
  • 哈希计算工具 — 计算请求签名和数据校验,确保并发请求的数据一致性
  • Base64编解码 — 处理并发服务中的二进制数据编码传输

本站提供浏览器本地工具,免注册即可试用 →

#Go#并发#goroutine#channel#sync#2026#并发模式