Go并发模式实战:从Worker Pool到Pipeline的7种生产模式
当goroutine泄漏遇上无界并发:生产环境的噩梦
凌晨3点,线上服务OOM告警。排查发现:一个HTTP handler里无限制地启动goroutine,每个请求3个goroutine,QPS 1000就是3000个,10分钟就泄漏了18万个goroutine,内存直接打满。更可怕的是,这些goroutine持有的channel引用阻止了GC回收,最终整个节点崩溃。
这不是个例。Go的并发原语虽然简单——go func()一行代码就能启动并发,但生产环境的并发编程远不止启动goroutine那么简单。你需要控制并发度、处理错误传播、实现优雅退出、避免资源泄漏。本文将从7种生产级并发模式出发,帮你构建健壮的Go并发服务。
核心概念速查
| 原语 | 用途 | 关键特性 | 典型场景 |
|---|---|---|---|
goroutine |
轻量级并发执行单元 | 栈内存可伸缩(2KB起),调度由Go runtime管理 | 任何需要并发执行的任务 |
channel |
goroutine间通信 | 类型安全,支持缓冲/无缓冲,可关闭 | 数据传递、信号通知、结果收集 |
sync.WaitGroup |
等待一组goroutine完成 | Add/Done/Wait三件套 |
批量任务等待、并发扇出 |
sync.Mutex |
互斥锁保护共享状态 | 零值可用,支持TryLock(Go 1.18+) |
计数器、缓存更新、配置热更新 |
context.Context |
传播取消信号和超时 | 不可变,只能派生子context | 请求超时、优雅退出、链路追踪 |
errgroup.Group |
并发执行+错误收集 | 首个错误取消所有goroutine | 批量API调用、并行数据获取 |
semaphore.Weighted |
加权信号量限流 | 支持权重,可超时获取 | API限流、资源配额控制 |
生产环境并发编程的5大挑战
挑战1:无界并发导致资源耗尽
func handleRequests(urls []string) {
for _, url := range urls {
go fetch(url)
}
}
每个URL启动一个goroutine,1万个URL就是1万个并发连接。数据库连接池被打满,下游服务被压垮,内存暴涨。
挑战2:goroutine泄漏
func process(ch <-chan int) {
for {
val := <-ch
fmt.Println(val)
}
}
如果channel永远不会关闭,这个goroutine永远不会退出。在长期运行的服务中,泄漏的goroutine会不断累积。
挑战3:错误被静默吞掉
func fetchAll(urls []string) {
var wg sync.WaitGroup
for _, url := range urls {
wg.Add(1)
go func(u string) {
defer wg.Done()
resp, err := http.Get(u)
if err != nil {
log.Printf("fetch %s failed: %v", u, err)
return
}
process(resp)
}(url)
}
wg.Wait()
}
错误只是log了,调用方完全不知道有失败。如果3个URL中有2个失败,调用方以为全部成功。
挑战4:无法优雅退出
服务收到SIGTERM信号时,正在执行的goroutine被强制中断,正在写入的数据可能损坏,正在处理的事务可能半完成。
挑战5:并发模式组合困难
Worker Pool需要限流,Pipeline需要错误传播,Fan-out需要结果聚合。单独实现每个模式不难,但在一个服务中组合使用时,模式间的交互容易产生死锁。
7种生产级并发模式
模式1:Worker Pool——有界goroutine池
Worker Pool是最基础也最重要的并发模式。核心思想:固定数量的worker从任务队列取任务执行,避免无界并发。
package workerpool
import (
"context"
"sync"
)
type Task func(ctx context.Context) error
type Pool struct {
workers int
tasks chan Task
wg sync.WaitGroup
}
func NewPool(workers int, bufferSize int) *Pool {
return &Pool{
workers: workers,
tasks: make(chan Task, bufferSize),
}
}
func (p *Pool) Start(ctx context.Context) {
for i := 0; i < p.workers; i++ {
p.wg.Add(1)
go func(workerID int) {
defer p.wg.Done()
for {
select {
case <-ctx.Done():
return
case task, ok := <-p.tasks:
if !ok {
return
}
_ = task(ctx)
}
}
}(i)
}
}
func (p *Pool) Submit(task Task) bool {
select {
case p.tasks <- task:
return true
default:
return false
}
}
func (p *Pool) Stop() {
close(p.tasks)
p.wg.Wait()
}
使用示例:
func main() {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
pool := NewPool(10, 100)
pool.Start(ctx)
urls := []string{
"https://api.example.com/users",
"https://api.example.com/orders",
"https://api.example.com/products",
}
for _, url := range urls {
u := url
pool.Submit(func(ctx context.Context) error {
return fetchURL(ctx, u)
})
}
pool.Stop()
}
关键设计点:
- worker数量固定,避免goroutine爆炸
- 带缓冲的task channel作为任务队列
- context支持取消,worker可优雅退出
Submit非阻塞,任务队列满时返回false
模式2:Fan-out/Fan-in——并行扇出扇入
Fan-out将一个数据源分发到多个goroutine并行处理,Fan-in将多个goroutine的结果汇聚到一个channel。
package fan
import (
"context"
"sync"
)
func FanOut[T any](ctx context.Context, source <-chan T, workers int) []<-chan T {
channels := make([]<-chan T, workers)
for i := 0; i < workers; i++ {
ch := make(chan T)
channels[i] = ch
go func() {
defer close(ch)
for {
select {
case <-ctx.Done():
return
case val, ok := <-source:
if !ok {
return
}
select {
case ch <- val:
case <-ctx.Done():
return
}
}
}
}()
}
return channels
}
func FanIn[T any](ctx context.Context, channels ...<-chan T) <-chan T {
out := make(chan T)
var wg sync.WaitGroup
wg.Add(len(channels))
for _, ch := range channels {
go func(c <-chan T) {
defer wg.Done()
for {
select {
case <-ctx.Done():
return
case val, ok := <-c:
if !ok {
return
}
select {
case out <- val:
case <-ctx.Done():
return
}
}
}
}(ch)
}
go func() {
wg.Wait()
close(out)
}()
return out
}
使用示例——并行处理订单:
func processOrders(ctx context.Context, orders <-chan Order) <-chan Result {
workers := FanOut(ctx, orders, 5)
return FanIn(ctx, workers...)
}
关键设计点:
- 泛型支持(Go 1.18+),适用于任意类型
- 每个fan-out goroutine独立消费source channel
- fan-in使用WaitGroup等待所有输入channel关闭
- context取消时所有goroutine都能退出
模式3:Pipeline——阶段式处理流水线
Pipeline将复杂处理拆分为多个阶段(stage),每个阶段是一个goroutine,通过channel连接。
package pipeline
import (
"context"
)
type Stage[In any, Out any] func(ctx context.Context, in <-chan In) <-chan Out
func NewPipeline[In any, Out any](
ctx context.Context,
source <-chan In,
stages ...Stage[In, In],
) <-chan Out {
current := source
for _, stage := range stages {
current = stage(ctx, current)
}
return any(current).(<-chan Out)
}
func NewStage[In any, Out any](
process func(ctx context.Context, in In) (Out, error),
bufferSize int,
) Stage[In, Out] {
return func(ctx context.Context, in <-chan In) <-chan Out {
out := make(chan Out, bufferSize)
go func() {
defer close(out)
for {
select {
case <-ctx.Done():
return
case val, ok := <-in:
if !ok {
return
}
result, err := process(ctx, val)
if err != nil {
continue
}
select {
case out <- result:
case <-ctx.Done():
return
}
}
}
}()
return out
}
}
使用示例——数据处理流水线:
func main() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
raw := make(chan RawData, 100)
validate := NewStage[RawData, ValidData](func(ctx context.Context, in RawData) (ValidData, error) {
if err := in.Validate(); err != nil {
return ValidData{}, err
}
return in.ToValid(), nil
}, 50)
enrich := NewStage[ValidData, EnrichedData](func(ctx context.Context, in ValidData) (EnrichedData, error) {
return fetchExtraInfo(ctx, in)
}, 50)
transform := NewStage[EnrichedData, FinalData](func(ctx context.Context, in EnrichedData) (FinalData, error) {
return in.Transform()
}, 50)
result := NewPipeline(ctx, raw, validate, enrich, transform)
go func() {
for r := range result {
saveToDB(r)
}
}()
}
关键设计点:
- 每个stage是独立的goroutine,可独立伸缩
- channel作为stage间的背压机制
- 错误在stage内部处理,不中断整个pipeline
- context取消时所有stage优雅退出
模式4:errgroup——并发错误处理
errgroup是sync.WaitGroup的错误感知版本:首个错误会取消所有goroutine。
package main
import (
"context"
"fmt"
"net/http"
"time"
"golang.org/x/sync/errgroup"
)
type FetchResult struct {
URL string
Body string
Size int
}
func fetchMultiple(ctx context.Context, urls []string) ([]FetchResult, error) {
g, ctx := errgroup.WithContext(ctx)
results := make([]FetchResult, len(urls))
for i, url := range urls {
i, url := i, url
g.Go(func() error {
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
if err != nil {
return fmt.Errorf("create request %s: %w", url, err)
}
resp, err := http.DefaultClient.Do(req)
if err != nil {
return fmt.Errorf("fetch %s: %w", url, err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("fetch %s: status %d", url, resp.StatusCode)
}
body, err := io.ReadAll(resp.Body)
if err != nil {
return fmt.Errorf("read %s: %w", url, err)
}
results[i] = FetchResult{
URL: url,
Body: string(body),
Size: len(body),
}
return nil
})
}
if err := g.Wait(); err != nil {
return nil, err
}
return results, nil
}
带并发度限制的errgroup:
func fetchWithLimit(ctx context.Context, urls []string, maxConcurrent int) ([]FetchResult, error) {
g, ctx := errgroup.WithContext(ctx)
g.SetLimit(maxConcurrent)
results := make([]FetchResult, len(urls))
for i, url := range urls {
i, url := i, url
g.Go(func() error {
results[i], _ = fetchOne(ctx, url)
return nil
})
}
if err := g.Wait(); err != nil {
return nil, err
}
return results, nil
}
关键设计点:
errgroup.WithContext自动传播取消信号g.SetLimit(n)控制最大并发数(Go 1.20+)- 首个错误取消所有进行中的goroutine
- 闭包变量捕获需要
i, url := i, url
模式5:Semaphore——信号量限流
semaphore.Weighted提供加权信号量,适合不同权重的资源分配场景。
package ratelimit
import (
"context"
"fmt"
"golang.org/x/sync/semaphore"
)
type RateLimiter struct {
sem *semaphore.Weighted
}
func NewRateLimiter(maxWeight int64) *RateLimiter {
return &RateLimiter{
sem: semaphore.NewWeighted(maxWeight),
}
}
func (r *RateLimiter) Do(ctx context.Context, weight int64, fn func() error) error {
if err := r.sem.Acquire(ctx, weight); err != nil {
return fmt.Errorf("acquire semaphore: %w", err)
}
defer r.sem.Release(weight)
return fn()
}
使用示例——API分级限流:
func main() {
limiter := NewRateLimiter(100)
err := limiter.Do(context.Background(), 10, func() error {
return callLightAPI()
})
err = limiter.Do(context.Background(), 50, func() error {
return callHeavyAPI()
})
}
带超时的信号量获取:
func (r *RateLimiter) DoWithTimeout(ctx context.Context, weight int64, timeout time.Duration, fn func() error) error {
acquireCtx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
if err := r.sem.Acquire(acquireCtx, weight); err != nil {
return fmt.Errorf("semaphore acquire timeout: %w", err)
}
defer r.sem.Release(weight)
return fn()
}
关键设计点:
- 加权信号量,不同操作消耗不同配额
Acquire支持context取消和超时Release必须与Acquire配对调用- 适合API分级限流、资源配额控制
模式6:Context取消与超时
Context是Go并发编程的"生命线",用于传播取消信号、超时和截止时间。
package ctxutil
import (
"context"
"fmt"
"time"
)
type Result struct {
Data string
Error error
}
func FetchWithTimeout(ctx context.Context, url string, timeout time.Duration) (string, error) {
ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
if err != nil {
return "", fmt.Errorf("create request: %w", err)
}
resp, err := http.DefaultClient.Do(req)
if err != nil {
if ctx.Err() == context.DeadlineExceeded {
return "", fmt.Errorf("fetch %s timed out after %v: %w", url, timeout, ctx.Err())
}
return "", fmt.Errorf("fetch %s: %w", url, err)
}
defer resp.Body.Close()
body, err := io.ReadAll(resp.Body)
if err != nil {
return "", fmt.Errorf("read response: %w", err)
}
return string(body), nil
}
func BatchFetch(ctx context.Context, urls []string, timeout time.Duration) []Result {
results := make([]Result, len(urls))
var wg sync.WaitGroup
for i, url := range urls {
wg.Add(1)
go func(idx int, u string) {
defer wg.Done()
data, err := FetchWithTimeout(ctx, u, timeout)
results[idx] = Result{Data: data, Error: err}
}(i, url)
}
wg.Wait()
return results
}
优雅退出模式:
func main() {
ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
defer stop()
server := &http.Server{Addr: ":8080"}
go func() {
<-ctx.Done()
shutdownCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
_ = server.Shutdown(shutdownCtx)
}()
_ = server.ListenAndServe()
}
关键设计点:
WithTimeout/WithDeadline设置超时WithCancel手动控制取消signal.NotifyContext监听系统信号- context取消会传播到所有子goroutine
defer cancel()防止context泄漏
模式7:生产级并发服务——模式组合
将上述模式组合,构建一个生产级并发服务:Worker Pool + Pipeline + errgroup + Context。
package concurrencyservice
import (
"context"
"fmt"
"sync"
"time"
"golang.org/x/sync/errgroup"
)
type Service struct {
workers int
bufferSize int
timeout time.Duration
}
func NewService(workers, bufferSize int, timeout time.Duration) *Service {
return &Service{
workers: workers,
bufferSize: bufferSize,
timeout: timeout,
}
}
type Job struct {
ID string
Input any
}
type Output struct {
Job Job
Result any
Err error
}
func (s *Service) Process(ctx context.Context, jobs []Job) ([]Output, error) {
ctx, cancel := context.WithTimeout(ctx, s.timeout)
defer cancel()
g, ctx := errgroup.WithContext(ctx)
g.SetLimit(s.workers)
jobCh := make(chan Job, s.bufferSize)
resultCh := make(chan Output, s.bufferSize)
g.Go(func() error {
defer close(jobCh)
for _, job := range jobs {
select {
case <-ctx.Done():
return ctx.Err()
case jobCh <- job:
}
}
return nil
})
var processWg sync.WaitGroup
for i := 0; i < s.workers; i++ {
processWg.Add(1)
go func() {
defer processWg.Done()
for job := range jobCh {
output := Output{Job: job}
output.Result, output.Err = s.processOne(ctx, job)
select {
case resultCh <- output:
case <-ctx.Done():
return
}
}
}()
}
go func() {
processWg.Wait()
close(resultCh)
}()
var outputs []Output
for result := range resultCh {
outputs = append(outputs, result)
}
if err := g.Wait(); err != nil {
return outputs, fmt.Errorf("service process: %w", err)
}
return outputs, nil
}
func (s *Service) processOne(ctx context.Context, job Job) (any, error) {
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
}
return fmt.Sprintf("processed-%s", job.ID), nil
}
关键设计点:
- errgroup控制并发度+错误传播
- channel作为任务分发和结果收集
- context统一超时和取消
- WaitGroup确保所有worker完成后才关闭result channel
- 分层设计:调度层(errgroup)+ 执行层(worker goroutine)+ 收集层(range resultCh)
5大常见陷阱及修复
陷阱1:闭包变量捕获
❌ 错误写法:
for _, url := range urls {
go func() {
fetch(url)
}()
}
所有goroutine共享同一个url变量,最终都fetch最后一个URL。
✅ 正确写法:
for _, url := range urls {
url := url
go func() {
fetch(url)
}()
}
陷阱2:channel未关闭导致goroutine泄漏
❌ 错误写法:
func producer() <-chan int {
ch := make(chan int)
go func() {
for i := 0; i < 100; i++ {
ch <- i
}
}()
return ch
}
消费者range channel永远不会结束,goroutine泄漏。
✅ 正确写法:
func producer() <-chan int {
ch := make(chan int)
go func() {
defer close(ch)
for i := 0; i < 100; i++ {
ch <- i
}
}()
return ch
}
陷阱3:WaitGroup的Add位置错误
❌ 错误写法:
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
go func() {
wg.Add(1)
defer wg.Done()
doWork()
}()
}
wg.Wait()
goroutine可能还没执行到wg.Add(1),主goroutine就调用了wg.Wait(),直接通过。
✅ 正确写法:
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
defer wg.Done()
doWork()
}()
}
wg.Wait()
陷阱4:select的空case
❌ 错误写法:
select {
case result := <-ch:
process(result)
case <-time.After(5 * time.Second):
return errors.New("timeout")
}
每次select都创建新的time.After channel,如果频繁调用会泄漏timer。
✅ 正确写法:
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
select {
case result := <-ch:
process(result)
case <-ctx.Done():
return ctx.Err()
}
陷阱5:Mutex复制
❌ 错误写法:
type SafeMap struct {
mu sync.Mutex
data map[string]string
}
func copyMap(m SafeMap) SafeMap {
return m
}
Mutex零值是"未锁定",复制一个已锁定的Mutex会导致死锁或数据竞争。
✅ 正确写法:
type SafeMap struct {
mu *sync.Mutex
data map[string]string
}
func NewSafeMap() *SafeMap {
return &SafeMap{
mu: &sync.Mutex{},
data: make(map[string]string),
}
}
错误排查速查表
| 错误现象 | 可能原因 | 排查方法 | 解决方案 |
|---|---|---|---|
fatal error: all goroutines are asleep - deadlock! |
所有goroutine阻塞,无活跃goroutine | 检查channel读写是否配对,select是否有default | 确保channel有生产者和消费者,添加context取消 |
| goroutine数量持续增长 | goroutine泄漏,channel未关闭 | runtime.NumGoroutine()监控,pprof goroutine profile |
确保所有goroutine有退出路径,defer close(ch) |
| 内存持续增长 | goroutine持有大对象引用,channel堆积 | pprof heap profile,检查channel buffer大小 | 限制channel buffer,及时释放引用 |
context canceled 错误频繁 |
上游context被取消,超时设置过短 | 检查context链路,确认超时时间是否合理 | 调整超时时间,区分业务错误和超时错误 |
| 并发结果丢失 | goroutine在写入channel前退出 | 检查goroutine退出逻辑,确认channel是否已关闭 | 使用WaitGroup确保所有goroutine完成后再关闭channel |
race condition 报错 |
多个goroutine同时读写共享变量 | go test -race,检查全局变量和闭包捕获 |
使用Mutex/RWMutex保护,或改用channel通信 |
| channel阻塞无响应 | 生产者/消费者速度不匹配 | 检查channel buffer大小,监控生产和消费速率 | 增加buffer,使用select+default非阻塞发送 |
| 服务优雅退出失败 | goroutine未响应context取消 | 检查goroutine是否select了ctx.Done() | 确保所有长时间运行的goroutine检查ctx.Done() |
| errgroup只返回一个错误 | errgroup设计为首错取消 | 检查是否需要收集所有错误 | 使用自定义错误收集器或多个errgroup |
| 信号量获取超时 | 并发度限制过严,请求排队过长 | 监控semaphore等待时间,调整权重配额 | 增加信号量容量,优化权重分配 |
高级优化技巧
优化1:动态Worker Pool
根据系统负载动态调整worker数量:
package dynamicpool
import (
"context"
"runtime"
"sync"
"sync/atomic"
"time"
)
type DynamicPool struct {
minWorkers int64
maxWorkers int64
active atomic.Int64
tasks chan Task
wg sync.WaitGroup
adjustTick *time.Ticker
}
func NewDynamicPool(minW, maxW int) *DynamicPool {
return &DynamicPool{
minWorkers: int64(minW),
maxWorkers: int64(maxW),
tasks: make(chan Task, maxW*2),
adjustTick: time.NewTicker(5 * time.Second),
}
}
func (p *DynamicPool) Start(ctx context.Context) {
for i := 0; i < int(p.minWorkers); i++ {
p.addWorker(ctx)
}
go func() {
for {
select {
case <-ctx.Done():
p.adjustTick.Stop()
return
case <-p.adjustTick.C:
p.adjustWorkers(ctx)
}
}
}()
}
func (p *DynamicPool) addWorker(ctx context.Context) {
p.active.Add(1)
p.wg.Add(1)
go func() {
defer func() {
p.active.Add(-1)
p.wg.Done()
}()
for task := range p.tasks {
_ = task(ctx)
}
}()
}
func (p *DynamicPool) adjustWorkers(ctx context.Context) {
current := p.active.Load()
queueLen := len(p.tasks)
if queueLen > int(current) && current < p.maxWorkers {
p.addWorker(ctx)
} else if queueLen == 0 && current > p.minWorkers {
for i := 0; i < 5 && int(p.active.Load()) > int(p.minWorkers); i++ {
p.tasks <- nil
}
}
}
func (p *DynamicPool) Stop() {
close(p.tasks)
p.wg.Wait()
}
优化2:批量处理(Batching)
将多个小任务合并为一个批量操作,减少系统调用和IO开销:
package batcher
import (
"context"
"sync"
"time"
)
type Batcher[T any, R any] struct {
batchSize int
flushInterval time.Duration
handler func(ctx context.Context, batch []T) ([]R, error)
mu sync.Mutex
buffer []T
results []R
}
func NewBatcher[T any, R any](
batchSize int,
flushInterval time.Duration,
handler func(ctx context.Context, batch []T) ([]R, error),
) *Batcher[T, R] {
return &Batcher[T, R]{
batchSize: batchSize,
flushInterval: flushInterval,
handler: handler,
buffer: make([]T, 0, batchSize),
}
}
func (b *Batcher[T, R]) Add(ctx context.Context, item T) (R, error) {
b.mu.Lock()
b.buffer = append(b.buffer, item)
if len(b.buffer) >= b.batchSize {
batch := b.buffer
b.buffer = make([]T, 0, b.batchSize)
b.mu.Unlock()
results, err := b.handler(ctx, batch)
if err != nil {
var zero R
return zero, err
}
b.results = append(b.results, results...)
var zero R
if len(results) > 0 {
return results[0], nil
}
return zero, nil
}
b.mu.Unlock()
var zero R
return zero, nil
}
func (b *Batcher[T, R]) Flush(ctx context.Context) ([]R, error) {
b.mu.Lock()
batch := b.buffer
b.buffer = make([]T, 0, b.batchSize)
b.mu.Unlock()
if len(batch) == 0 {
return b.results, nil
}
results, err := b.handler(ctx, batch)
if err != nil {
return nil, err
}
b.results = append(b.results, results...)
return b.results, nil
}
优化3:零拷贝Channel传递
使用指针和sync.Pool减少channel传递时的内存分配:
package zerocopy
import (
"sync"
)
type Buffer struct {
Data []byte
}
var bufferPool = sync.Pool{
New: func() any {
return &Buffer{Data: make([]byte, 0, 4096)}
},
}
func GetBuffer() *Buffer {
return bufferPool.Get().(*Buffer)
}
func PutBuffer(buf *Buffer) {
buf.Data = buf.Data[:0]
bufferPool.Put(buf)
}
func ProcessPipeline(ctx context.Context, input <-chan []byte) <-chan *Buffer {
out := make(chan *Buffer, 64)
go func() {
defer close(out)
for {
select {
case <-ctx.Done():
return
case data, ok := <-input:
if !ok {
return
}
buf := GetBuffer()
buf.Data = append(buf.Data[:0], data...)
select {
case out <- buf:
case <-ctx.Done():
PutBuffer(buf)
return
}
}
}
}()
return out
}
并发模式对比
| 特性 | goroutine+channel | sync.WaitGroup | errgroup | Worker Pool |
|---|---|---|---|---|
| 并发控制 | 无内置控制 | 无内置控制 | SetLimit |
固定worker数 |
| 错误处理 | 需手动实现 | 需手动实现 | 自动首错取消 | 需手动实现 |
| 背压机制 | channel buffer | 无 | 无 | task channel |
| 取消传播 | 需手动context | 需手动context | 自动context | 需手动context |
| 结果收集 | channel接收 | 需共享变量 | 返回slice | channel接收 |
| 适用场景 | 流式数据处理 | 批量任务等待 | 并发API调用 | 限流任务处理 |
| 复杂度 | 中 | 低 | 低 | 中 |
| goroutine数量 | 不固定 | 不固定 | 有限制 | 固定 |
| 生产推荐度 | ⭐⭐⭐ | ⭐⭐ | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐⭐ |
总结
Go的并发编程不是"能跑就行",而是要回答四个问题:多少个goroutine在跑?它们什么时候退出?出错怎么办?资源怎么回收? Worker Pool回答了"多少个",Context回答了"什么时候退出",errgroup回答了"出错怎么办",defer close()回答了"资源怎么回收"。掌握这7种模式,你就掌握了生产级Go并发编程的核心方法论。
推荐工具
本站提供浏览器本地工具,免注册即可试用 →