Go併發模式實戰:從Worker Pool到Pipeline的7種生產模式
當goroutine泄漏遇上無界併發:生產環境的噩夢
凌晨3點,線上服務OOM告警。排查發現:一個HTTP handler裡無限制地啟動goroutine,每個請求3個goroutine,QPS 1000就是3000個,10分鐘就泄漏了18萬個goroutine,記憶體直接打滿。更可怕的是,這些goroutine持有的channel引用阻止了GC回收,最終整個節點崩潰。
這不是個例。Go的併發原語雖然簡單——go func()一行程式碼就能啟動併發,但生產環境的併發程式設計遠不止啟動goroutine那麼簡單。你需要控制併發度、處理錯誤傳播、實現優雅退出、避免資源泄漏。本文將從7種生產級併發模式出發,幫你構建健壯的Go併發服務。
核心概念速查
| 原語 | 用途 | 關鍵特性 | 典型場景 |
|---|---|---|---|
goroutine |
輕量級併發執行單元 | 棧記憶體可伸縮(2KB起),排程由Go runtime管理 | 任何需要併發執行的任務 |
channel |
goroutine間通訊 | 型別安全,支援緩衝/無緩衝,可關閉 | 資料傳遞、訊號通知、結果收集 |
sync.WaitGroup |
等待一組goroutine完成 | Add/Done/Wait三件套 |
批量任務等待、併發扇出 |
sync.Mutex |
互斥鎖保護共享狀態 | 零值可用,支援TryLock(Go 1.18+) |
計數器、快取更新、配置熱更新 |
context.Context |
傳播取消訊號和超時 | 不可變,只能派生子context | 請求超時、優雅退出、鏈路追蹤 |
errgroup.Group |
併發執行+錯誤收集 | 首個錯誤取消所有goroutine | 批量API呼叫、平行資料獲取 |
semaphore.Weighted |
加權訊號量限流 | 支援權重,可超時獲取 | API限流、資源配額控制 |
生產環境併發程式設計的5大挑戰
挑戰1:無界併發導致資源耗盡
func handleRequests(urls []string) {
for _, url := range urls {
go fetch(url)
}
}
每個URL啟動一個goroutine,1萬個URL就是1萬個併發連線。資料庫連線池被打滿,下游服務被壓垮,記憶體暴漲。
挑戰2:goroutine泄漏
func process(ch <-chan int) {
for {
val := <-ch
fmt.Println(val)
}
}
如果channel永遠不會關閉,這個goroutine永遠不會退出。在長期執行的服務中,泄漏的goroutine會不斷累積。
挑戰3:錯誤被靜默吞掉
func fetchAll(urls []string) {
var wg sync.WaitGroup
for _, url := range urls {
wg.Add(1)
go func(u string) {
defer wg.Done()
resp, err := http.Get(u)
if err != nil {
log.Printf("fetch %s failed: %v", u, err)
return
}
process(resp)
}(url)
}
wg.Wait()
}
錯誤只是log了,呼叫方完全不知道有失敗。如果3個URL中有2個失敗,呼叫方以為全部成功。
挑戰4:無法優雅退出
服務收到SIGTERM訊號時,正在執行的goroutine被強制中斷,正在寫入的資料可能損壞,正在處理的交易可能半完成。
挑戰5:併發模式組合困難
Worker Pool需要限流,Pipeline需要錯誤傳播,Fan-out需要結果聚合。單獨實現每個模式不難,但在一個服務中組合使用時,模式間的互動容易產生死結。
7種生產級併發模式
模式1:Worker Pool——有界goroutine池
Worker Pool是最基礎也最重要的併發模式。核心思想:固定數量的worker從任務佇列取任務執行,避免無界併發。
package workerpool
import (
"context"
"sync"
)
type Task func(ctx context.Context) error
type Pool struct {
workers int
tasks chan Task
wg sync.WaitGroup
}
func NewPool(workers int, bufferSize int) *Pool {
return &Pool{
workers: workers,
tasks: make(chan Task, bufferSize),
}
}
func (p *Pool) Start(ctx context.Context) {
for i := 0; i < p.workers; i++ {
p.wg.Add(1)
go func(workerID int) {
defer p.wg.Done()
for {
select {
case <-ctx.Done():
return
case task, ok := <-p.tasks:
if !ok {
return
}
_ = task(ctx)
}
}
}(i)
}
}
func (p *Pool) Submit(task Task) bool {
select {
case p.tasks <- task:
return true
default:
return false
}
}
func (p *Pool) Stop() {
close(p.tasks)
p.wg.Wait()
}
使用範例:
func main() {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
pool := NewPool(10, 100)
pool.Start(ctx)
urls := []string{
"https://api.example.com/users",
"https://api.example.com/orders",
"https://api.example.com/products",
}
for _, url := range urls {
u := url
pool.Submit(func(ctx context.Context) error {
return fetchURL(ctx, u)
})
}
pool.Stop()
}
關鍵設計要點:
- worker數量固定,避免goroutine爆炸
- 帶緩衝的task channel作為任務佇列
- context支援取消,worker可優雅退出
Submit非阻塞,任務佇列滿時回傳false
模式2:Fan-out/Fan-in——平行扇出扇入
Fan-out將一個資料來源分發到多個goroutine平行處理,Fan-in將多個goroutine的結果匯聚到一個channel。
package fan
import (
"context"
"sync"
)
func FanOut[T any](ctx context.Context, source <-chan T, workers int) []<-chan T {
channels := make([]<-chan T, workers)
for i := 0; i < workers; i++ {
ch := make(chan T)
channels[i] = ch
go func() {
defer close(ch)
for {
select {
case <-ctx.Done():
return
case val, ok := <-source:
if !ok {
return
}
select {
case ch <- val:
case <-ctx.Done():
return
}
}
}
}()
}
return channels
}
func FanIn[T any](ctx context.Context, channels ...<-chan T) <-chan T {
out := make(chan T)
var wg sync.WaitGroup
wg.Add(len(channels))
for _, ch := range channels {
go func(c <-chan T) {
defer wg.Done()
for {
select {
case <-ctx.Done():
return
case val, ok := <-c:
if !ok {
return
}
select {
case out <- val:
case <-ctx.Done():
return
}
}
}
}(ch)
}
go func() {
wg.Wait()
close(out)
}()
return out
}
使用範例——平行處理訂單:
func processOrders(ctx context.Context, orders <-chan Order) <-chan Result {
workers := FanOut(ctx, orders, 5)
return FanIn(ctx, workers...)
}
關鍵設計要點:
- 泛型支援(Go 1.18+),適用於任意型別
- 每個fan-out goroutine獨立消費source channel
- fan-in使用WaitGroup等待所有輸入channel關閉
- context取消時所有goroutine都能退出
模式3:Pipeline——階段式處理流水線
Pipeline將複雜處理拆分為多個階段(stage),每個階段是一個goroutine,透過channel連線。
package pipeline
import (
"context"
)
type Stage[In any, Out any] func(ctx context.Context, in <-chan In) <-chan Out
func NewPipeline[In any, Out any](
ctx context.Context,
source <-chan In,
stages ...Stage[In, In],
) <-chan Out {
current := source
for _, stage := range stages {
current = stage(ctx, current)
}
return any(current).(<-chan Out)
}
func NewStage[In any, Out any](
process func(ctx context.Context, in In) (Out, error),
bufferSize int,
) Stage[In, Out] {
return func(ctx context.Context, in <-chan In) <-chan Out {
out := make(chan Out, bufferSize)
go func() {
defer close(out)
for {
select {
case <-ctx.Done():
return
case val, ok := <-in:
if !ok {
return
}
result, err := process(ctx, val)
if err != nil {
continue
}
select {
case out <- result:
case <-ctx.Done():
return
}
}
}
}()
return out
}
}
使用範例——資料處理流水線:
func main() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
raw := make(chan RawData, 100)
validate := NewStage[RawData, ValidData](func(ctx context.Context, in RawData) (ValidData, error) {
if err := in.Validate(); err != nil {
return ValidData{}, err
}
return in.ToValid(), nil
}, 50)
enrich := NewStage[ValidData, EnrichedData](func(ctx context.Context, in ValidData) (EnrichedData, error) {
return fetchExtraInfo(ctx, in)
}, 50)
transform := NewStage[EnrichedData, FinalData](func(ctx context.Context, in EnrichedData) (FinalData, error) {
return in.Transform()
}, 50)
result := NewPipeline(ctx, raw, validate, enrich, transform)
go func() {
for r := range result {
saveToDB(r)
}
}()
}
關鍵設計要點:
- 每個stage是獨立的goroutine,可獨立伸縮
- channel作為stage間的背壓機制
- 錯誤在stage內部處理,不中斷整個pipeline
- context取消時所有stage優雅退出
模式4:errgroup——併發錯誤處理
errgroup是sync.WaitGroup的錯誤感知版本:首個錯誤會取消所有goroutine。
package main
import (
"context"
"fmt"
"net/http"
"time"
"golang.org/x/sync/errgroup"
)
type FetchResult struct {
URL string
Body string
Size int
}
func fetchMultiple(ctx context.Context, urls []string) ([]FetchResult, error) {
g, ctx := errgroup.WithContext(ctx)
results := make([]FetchResult, len(urls))
for i, url := range urls {
i, url := i, url
g.Go(func() error {
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
if err != nil {
return fmt.Errorf("create request %s: %w", url, err)
}
resp, err := http.DefaultClient.Do(req)
if err != nil {
return fmt.Errorf("fetch %s: %w", url, err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("fetch %s: status %d", url, resp.StatusCode)
}
body, err := io.ReadAll(resp.Body)
if err != nil {
return fmt.Errorf("read %s: %w", url, err)
}
results[i] = FetchResult{
URL: url,
Body: string(body),
Size: len(body),
}
return nil
})
}
if err := g.Wait(); err != nil {
return nil, err
}
return results, nil
}
帶併發度限制的errgroup:
func fetchWithLimit(ctx context.Context, urls []string, maxConcurrent int) ([]FetchResult, error) {
g, ctx := errgroup.WithContext(ctx)
g.SetLimit(maxConcurrent)
results := make([]FetchResult, len(urls))
for i, url := range urls {
i, url := i, url
g.Go(func() error {
results[i], _ = fetchOne(ctx, url)
return nil
})
}
if err := g.Wait(); err != nil {
return nil, err
}
return results, nil
}
關鍵設計要點:
errgroup.WithContext自動傳播取消訊號g.SetLimit(n)控制最大併發數(Go 1.20+)- 首個錯誤取消所有進行中的goroutine
- 閉包變數捕獲需要
i, url := i, url
模式5:Semaphore——訊號量限流
semaphore.Weighted提供加權訊號量,適合不同權重的資源分配場景。
package ratelimit
import (
"context"
"fmt"
"golang.org/x/sync/semaphore"
)
type RateLimiter struct {
sem *semaphore.Weighted
}
func NewRateLimiter(maxWeight int64) *RateLimiter {
return &RateLimiter{
sem: semaphore.NewWeighted(maxWeight),
}
}
func (r *RateLimiter) Do(ctx context.Context, weight int64, fn func() error) error {
if err := r.sem.Acquire(ctx, weight); err != nil {
return fmt.Errorf("acquire semaphore: %w", err)
}
defer r.sem.Release(weight)
return fn()
}
使用範例——API分級限流:
func main() {
limiter := NewRateLimiter(100)
err := limiter.Do(context.Background(), 10, func() error {
return callLightAPI()
})
err = limiter.Do(context.Background(), 50, func() error {
return callHeavyAPI()
})
}
帶超時的訊號量獲取:
func (r *RateLimiter) DoWithTimeout(ctx context.Context, weight int64, timeout time.Duration, fn func() error) error {
acquireCtx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
if err := r.sem.Acquire(acquireCtx, weight); err != nil {
return fmt.Errorf("semaphore acquire timeout: %w", err)
}
defer r.sem.Release(weight)
return fn()
}
關鍵設計要點:
- 加權訊號量,不同操作消耗不同配額
Acquire支援context取消和超時Release必須與Acquire配對呼叫- 適合API分級限流、資源配額控制
模式6:Context取消與超時
Context是Go併發程式設計的「生命線」,用於傳播取消訊號、超時和截止時間。
package ctxutil
import (
"context"
"fmt"
"time"
)
type Result struct {
Data string
Error error
}
func FetchWithTimeout(ctx context.Context, url string, timeout time.Duration) (string, error) {
ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
if err != nil {
return "", fmt.Errorf("create request: %w", err)
}
resp, err := http.DefaultClient.Do(req)
if err != nil {
if ctx.Err() == context.DeadlineExceeded {
return "", fmt.Errorf("fetch %s timed out after %v: %w", url, timeout, ctx.Err())
}
return "", fmt.Errorf("fetch %s: %w", url, err)
}
defer resp.Body.Close()
body, err := io.ReadAll(resp.Body)
if err != nil {
return "", fmt.Errorf("read response: %w", err)
}
return string(body), nil
}
func BatchFetch(ctx context.Context, urls []string, timeout time.Duration) []Result {
results := make([]Result, len(urls))
var wg sync.WaitGroup
for i, url := range urls {
wg.Add(1)
go func(idx int, u string) {
defer wg.Done()
data, err := FetchWithTimeout(ctx, u, timeout)
results[idx] = Result{Data: data, Error: err}
}(i, url)
}
wg.Wait()
return results
}
優雅退出模式:
func main() {
ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
defer stop()
server := &http.Server{Addr: ":8080"}
go func() {
<-ctx.Done()
shutdownCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
_ = server.Shutdown(shutdownCtx)
}()
_ = server.ListenAndServe()
}
關鍵設計要點:
WithTimeout/WithDeadline設定超時WithCancel手動控制取消signal.NotifyContext監聽系統訊號- context取消會傳播到所有子goroutine
defer cancel()防止context泄漏
模式7:生產級併發服務——模式組合
將上述模式組合,構建一個生產級併發服務:Worker Pool + Pipeline + errgroup + Context。
package concurrencyservice
import (
"context"
"fmt"
"sync"
"time"
"golang.org/x/sync/errgroup"
)
type Service struct {
workers int
bufferSize int
timeout time.Duration
}
func NewService(workers, bufferSize int, timeout time.Duration) *Service {
return &Service{
workers: workers,
bufferSize: bufferSize,
timeout: timeout,
}
}
type Job struct {
ID string
Input any
}
type Output struct {
Job Job
Result any
Err error
}
func (s *Service) Process(ctx context.Context, jobs []Job) ([]Output, error) {
ctx, cancel := context.WithTimeout(ctx, s.timeout)
defer cancel()
g, ctx := errgroup.WithContext(ctx)
g.SetLimit(s.workers)
jobCh := make(chan Job, s.bufferSize)
resultCh := make(chan Output, s.bufferSize)
g.Go(func() error {
defer close(jobCh)
for _, job := range jobs {
select {
case <-ctx.Done():
return ctx.Err()
case jobCh <- job:
}
}
return nil
})
var processWg sync.WaitGroup
for i := 0; i < s.workers; i++ {
processWg.Add(1)
go func() {
defer processWg.Done()
for job := range jobCh {
output := Output{Job: job}
output.Result, output.Err = s.processOne(ctx, job)
select {
case resultCh <- output:
case <-ctx.Done():
return
}
}
}()
}
go func() {
processWg.Wait()
close(resultCh)
}()
var outputs []Output
for result := range resultCh {
outputs = append(outputs, result)
}
if err := g.Wait(); err != nil {
return outputs, fmt.Errorf("service process: %w", err)
}
return outputs, nil
}
func (s *Service) processOne(ctx context.Context, job Job) (any, error) {
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
}
return fmt.Sprintf("processed-%s", job.ID), nil
}
關鍵設計要點:
- errgroup控制併發度+錯誤傳播
- channel作為任務分發和結果收集
- context統一超時和取消
- WaitGroup確保所有worker完成後才關閉result channel
- 分層設計:排程層(errgroup)+ 執行層(worker goroutine)+ 收集層(range resultCh)
5大常見陷阱及修復
陷阱1:閉包變數捕獲
❌ 錯誤寫法:
for _, url := range urls {
go func() {
fetch(url)
}()
}
所有goroutine共享同一個url變數,最終都fetch最後一個URL。
✅ 正確寫法:
for _, url := range urls {
url := url
go func() {
fetch(url)
}()
}
陷阱2:channel未關閉導致goroutine泄漏
❌ 錯誤寫法:
func producer() <-chan int {
ch := make(chan int)
go func() {
for i := 0; i < 100; i++ {
ch <- i
}
}()
return ch
}
消費者range channel永遠不會結束,goroutine泄漏。
✅ 正確寫法:
func producer() <-chan int {
ch := make(chan int)
go func() {
defer close(ch)
for i := 0; i < 100; i++ {
ch <- i
}
}()
return ch
}
陷阱3:WaitGroup的Add位置錯誤
❌ 錯誤寫法:
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
go func() {
wg.Add(1)
defer wg.Done()
doWork()
}()
}
wg.Wait()
goroutine可能還沒執行到wg.Add(1),主goroutine就呼叫了wg.Wait(),直接通過。
✅ 正確寫法:
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
defer wg.Done()
doWork()
}()
}
wg.Wait()
陷阱4:select的time.After泄漏
❌ 錯誤寫法:
select {
case result := <-ch:
process(result)
case <-time.After(5 * time.Second):
return errors.New("timeout")
}
每次select都建立新的time.After channel,如果頻繁呼叫會泄漏timer。
✅ 正確寫法:
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
select {
case result := <-ch:
process(result)
case <-ctx.Done():
return ctx.Err()
}
陷阱5:Mutex複製
❌ 錯誤寫法:
type SafeMap struct {
mu sync.Mutex
data map[string]string
}
func copyMap(m SafeMap) SafeMap {
return m
}
Mutex零值是「未鎖定」,複製一個已鎖定的Mutex會導致死結或資料競爭。
✅ 正確寫法:
type SafeMap struct {
mu *sync.Mutex
data map[string]string
}
func NewSafeMap() *SafeMap {
return &SafeMap{
mu: &sync.Mutex{},
data: make(map[string]string),
}
}
錯誤排查速查表
| 錯誤現象 | 可能原因 | 排查方法 | 解決方案 |
|---|---|---|---|
fatal error: all goroutines are asleep - deadlock! |
所有goroutine阻塞,無活躍goroutine | 檢查channel讀寫是否配對,select是否有default | 確保channel有生產者和消費者,新增context取消 |
| goroutine數量持續增長 | goroutine泄漏,channel未關閉 | runtime.NumGoroutine()監控,pprof goroutine profile |
確保所有goroutine有退出路徑,defer close(ch) |
| 記憶體持續增長 | goroutine持有大物件引用,channel堆積 | pprof heap profile,檢查channel buffer大小 | 限制channel buffer,及時釋放引用 |
context canceled 錯誤頻繁 |
上游context被取消,超時設定過短 | 檢查context鏈路,確認超時時間是否合理 | 調整超時時間,區分業務錯誤和超時錯誤 |
| 併發結果丟失 | goroutine在寫入channel前退出 | 檢查goroutine退出邏輯,確認channel是否已關閉 | 使用WaitGroup確保所有goroutine完成後再關閉channel |
race condition 報錯 |
多個goroutine同時讀寫共享變數 | go test -race,檢查全域變數和閉包捕獲 |
使用Mutex/RWMutex保護,或改用channel通訊 |
| channel阻塞無回應 | 生產者/消費者速度不匹配 | 檢查channel buffer大小,監控生產和消費速率 | 增加buffer,使用select+default非阻塞傳送 |
| 服務優雅退出失敗 | goroutine未回應context取消 | 檢查goroutine是否select了ctx.Done() | 確保所有長時間執行的goroutine檢查ctx.Done() |
| errgroup只回傳一個錯誤 | errgroup設計為首錯取消 | 檢查是否需要收集所有錯誤 | 使用自定義錯誤收集器或多個errgroup |
| 訊號量獲取超時 | 併發度限制過嚴,請求排隊過長 | 監控semaphore等待時間,調整權重配額 | 增加訊號量容量,最佳化權重分配 |
進階最佳化技巧
最佳化1:動態Worker Pool
根據系統負載動態調整worker數量:
package dynamicpool
import (
"context"
"runtime"
"sync"
"sync/atomic"
"time"
)
type DynamicPool struct {
minWorkers int64
maxWorkers int64
active atomic.Int64
tasks chan Task
wg sync.WaitGroup
adjustTick *time.Ticker
}
func NewDynamicPool(minW, maxW int) *DynamicPool {
return &DynamicPool{
minWorkers: int64(minW),
maxWorkers: int64(maxW),
tasks: make(chan Task, maxW*2),
adjustTick: time.NewTicker(5 * time.Second),
}
}
func (p *DynamicPool) Start(ctx context.Context) {
for i := 0; i < int(p.minWorkers); i++ {
p.addWorker(ctx)
}
go func() {
for {
select {
case <-ctx.Done():
p.adjustTick.Stop()
return
case <-p.adjustTick.C:
p.adjustWorkers(ctx)
}
}
}()
}
func (p *DynamicPool) addWorker(ctx context.Context) {
p.active.Add(1)
p.wg.Add(1)
go func() {
defer func() {
p.active.Add(-1)
p.wg.Done()
}()
for task := range p.tasks {
_ = task(ctx)
}
}()
}
func (p *DynamicPool) adjustWorkers(ctx context.Context) {
current := p.active.Load()
queueLen := len(p.tasks)
if queueLen > int(current) && current < p.maxWorkers {
p.addWorker(ctx)
} else if queueLen == 0 && current > p.minWorkers {
for i := 0; i < 5 && int(p.active.Load()) > int(p.minWorkers); i++ {
p.tasks <- nil
}
}
}
func (p *DynamicPool) Stop() {
close(p.tasks)
p.wg.Wait()
}
最佳化2:批次處理(Batching)
將多個小任務合併為一個批次操作,減少系統呼叫和IO開銷:
package batcher
import (
"context"
"sync"
"time"
)
type Batcher[T any, R any] struct {
batchSize int
flushInterval time.Duration
handler func(ctx context.Context, batch []T) ([]R, error)
mu sync.Mutex
buffer []T
results []R
}
func NewBatcher[T any, R any](
batchSize int,
flushInterval time.Duration,
handler func(ctx context.Context, batch []T) ([]R, error),
) *Batcher[T, R] {
return &Batcher[T, R]{
batchSize: batchSize,
flushInterval: flushInterval,
handler: handler,
buffer: make([]T, 0, batchSize),
}
}
func (b *Batcher[T, R]) Add(ctx context.Context, item T) (R, error) {
b.mu.Lock()
b.buffer = append(b.buffer, item)
if len(b.buffer) >= b.batchSize {
batch := b.buffer
b.buffer = make([]T, 0, b.batchSize)
b.mu.Unlock()
results, err := b.handler(ctx, batch)
if err != nil {
var zero R
return zero, err
}
b.results = append(b.results, results...)
var zero R
if len(results) > 0 {
return results[0], nil
}
return zero, nil
}
b.mu.Unlock()
var zero R
return zero, nil
}
func (b *Batcher[T, R]) Flush(ctx context.Context) ([]R, error) {
b.mu.Lock()
batch := b.buffer
b.buffer = make([]T, 0, b.batchSize)
b.mu.Unlock()
if len(batch) == 0 {
return b.results, nil
}
results, err := b.handler(ctx, batch)
if err != nil {
return nil, err
}
b.results = append(b.results, results...)
return b.results, nil
}
最佳化3:零拷貝Channel傳遞
使用指標和sync.Pool減少channel傳遞時的記憶體分配:
package zerocopy
import (
"sync"
)
type Buffer struct {
Data []byte
}
var bufferPool = sync.Pool{
New: func() any {
return &Buffer{Data: make([]byte, 0, 4096)}
},
}
func GetBuffer() *Buffer {
return bufferPool.Get().(*Buffer)
}
func PutBuffer(buf *Buffer) {
buf.Data = buf.Data[:0]
bufferPool.Put(buf)
}
func ProcessPipeline(ctx context.Context, input <-chan []byte) <-chan *Buffer {
out := make(chan *Buffer, 64)
go func() {
defer close(out)
for {
select {
case <-ctx.Done():
return
case data, ok := <-input:
if !ok {
return
}
buf := GetBuffer()
buf.Data = append(buf.Data[:0], data...)
select {
case out <- buf:
case <-ctx.Done():
PutBuffer(buf)
return
}
}
}
}()
return out
}
併發模式對比
| 特性 | goroutine+channel | sync.WaitGroup | errgroup | Worker Pool |
|---|---|---|---|---|
| 併發控制 | 無內建控制 | 無內建控制 | SetLimit |
固定worker數 |
| 錯誤處理 | 需手動實現 | 需手動實現 | 自動首錯取消 | 需手動實現 |
| 背壓機制 | channel buffer | 無 | 無 | task channel |
| 取消傳播 | 需手動context | 需手動context | 自動context | 需手動context |
| 結果收集 | channel接收 | 需共享變數 | 回傳slice | channel接收 |
| 適用場景 | 串流資料處理 | 批量任務等待 | 併發API呼叫 | 限流任務處理 |
| 複雜度 | 中 | 低 | 低 | 中 |
| goroutine數量 | 不固定 | 不固定 | 有限制 | 固定 |
| 生產推薦度 | ⭐⭐⭐ | ⭐⭐ | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐⭐ |
總結
Go的併發程式設計不是「能跑就行」,而是要回答四個問題:多少個goroutine在跑?它們什麼時候退出?出錯怎麼辦?資源怎麼回收? Worker Pool回答了「多少個」,Context回答了「什麼時候退出」,errgroup回答了「出錯怎麼辦」,defer close()回答了「資源怎麼回收」。掌握這7種模式,你就掌握了生產級Go併發程式設計的核心方法論。
推薦工具
本站提供瀏覽器本地工具,免註冊即可試用 →