Go Concurrency Patterns: 7 Production Patterns from Worker Pool to Pipeline


When Goroutine Leaks Meet Unbounded Concurrency: A Production Nightmare

3 AM, production OOM alert. Investigation reveals an HTTP handler launching goroutines without limits: 3 goroutines per request at 1,000 QPS means 3,000 new goroutines per second, so in 10 minutes 1.8 million leaked goroutines consume all memory. Worse, these goroutines hold channel references that keep the referenced objects from being garbage-collected, eventually crashing the entire node.

This isn't an isolated case. Go's concurrency primitives are simple — go func() launches concurrency in one line — but production concurrency programming is far more than just starting goroutines. You need to control concurrency levels, handle error propagation, implement graceful shutdown, and prevent resource leaks. This article covers 7 production-grade concurrency patterns to help you build robust Go concurrent services.


Core Concepts Reference

| Primitive | Purpose | Key Features | Typical Use Case |
| --- | --- | --- | --- |
| goroutine | Lightweight concurrent execution unit | Growable stack (2KB initial), Go runtime scheduling | Any task requiring concurrent execution |
| channel | Inter-goroutine communication | Type-safe, buffered/unbuffered, closeable | Data passing, signal notification, result collection |
| sync.WaitGroup | Wait for a group of goroutines | Add/Done/Wait trio | Batch task waiting, concurrent fan-out |
| sync.Mutex | Mutual exclusion for shared state | Zero-value usable, TryLock support (Go 1.18+) | Counters, cache updates, config hot-reload |
| context.Context | Propagate cancellation and timeout | Immutable, can only derive child contexts | Request timeout, graceful shutdown, tracing |
| errgroup.Group | Concurrent execution + error collection | First error cancels all goroutines | Batch API calls, parallel data fetching |
| semaphore.Weighted | Weighted semaphore rate limiting | Supports weights, timeout-aware acquisition | API rate limiting, resource quota control |

5 Challenges of Production Concurrency Programming

Challenge 1: Unbounded Concurrency Causes Resource Exhaustion

func handleRequests(urls []string) {
    for _, url := range urls {
        go fetch(url)
    }
}

Each URL spawns a goroutine — 10,000 URLs means 10,000 concurrent connections. Database connection pools get exhausted, downstream services get crushed, memory skyrockets.
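One common way to bound a loop like this is a buffered channel used as a counting semaphore. The sketch below is illustrative only: fetchAllBounded, its limit parameter, and the stand-in work callback are assumptions made for the example, not part of the original handler.

```go
package main

import (
    "fmt"
    "sync"
    "sync/atomic"
    "time"
)

// fetchAllBounded runs work for every URL but never lets more than limit
// calls run at once. It returns the highest concurrency it observed.
func fetchAllBounded(urls []string, limit int, work func(string)) int64 {
    sem := make(chan struct{}, limit) // one slot per allowed in-flight call
    var wg sync.WaitGroup
    var inFlight, peak atomic.Int64

    for _, u := range urls {
        sem <- struct{}{} // blocks while limit calls are already running
        wg.Add(1)
        go func(u string) {
            defer wg.Done()
            defer func() { <-sem }() // free the slot when this call ends

            n := inFlight.Add(1)
            // Record the peak with a compare-and-swap loop.
            for {
                p := peak.Load()
                if n <= p || peak.CompareAndSwap(p, n) {
                    break
                }
            }
            work(u)
            inFlight.Add(-1)
        }(u)
    }
    wg.Wait()
    return peak.Load()
}

func main() {
    urls := make([]string, 50)
    for i := range urls {
        urls[i] = fmt.Sprintf("https://api.example.com/item/%d", i)
    }
    // The sleep stands in for a real network call.
    peak := fetchAllBounded(urls, 5, func(string) { time.Sleep(time.Millisecond) })
    fmt.Println("peak concurrency:", peak)
}
```

Because the slot is acquired before the goroutine starts, the loop itself slows down when the limit is reached, which also bounds how many goroutines exist at any moment.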

Challenge 2: Goroutine Leaks

func process(ch <-chan int) {
    for {
        val := <-ch
        fmt.Println(val)
    }
}

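This goroutine never exits. If the channel is never closed, it blocks forever on the receive; if the channel is closed, the bare receive keeps returning zero values and the loop spins. In long-running services, leaked goroutines accumulate continuously.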
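A leak-free variant gives the loop two exit paths: the channel being closed and the context being cancelled. This is a minimal sketch; consume and the two demo helpers are hypothetical names introduced for illustration.

```go
package main

import (
    "context"
    "fmt"
    "time"
)

// consume reads until ch is closed or ctx is cancelled and returns how many
// values it received.
func consume(ctx context.Context, ch <-chan int) int {
    count := 0
    for {
        select {
        case <-ctx.Done():
            return count
        case _, ok := <-ch:
            if !ok {
                return count // channel closed: normal exit
            }
            count++
        }
    }
}

// countUntilClosed sends n values, closes the channel, and reports how many
// values the consumer saw before it returned.
func countUntilClosed(n int) int {
    ch := make(chan int)
    go func() {
        defer close(ch)
        for i := 0; i < n; i++ {
            ch <- i
        }
    }()
    return consume(context.Background(), ch)
}

// stopsOnCancel reports whether a consumer stuck on a silent channel exits
// once its context is cancelled.
func stopsOnCancel() bool {
    ctx, cancel := context.WithCancel(context.Background())
    done := make(chan struct{})
    go func() {
        consume(ctx, make(chan int)) // this channel never delivers or closes
        close(done)
    }()
    cancel()
    select {
    case <-done:
        return true
    case <-time.After(time.Second):
        return false
    }
}

func main() {
    fmt.Println("values before close:", countUntilClosed(5))
    fmt.Println("exited on cancel:", stopsOnCancel())
}
```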

Challenge 3: Errors Silently Swallowed

func fetchAll(urls []string) {
    var wg sync.WaitGroup
    for _, url := range urls {
        wg.Add(1)
        go func(u string) {
            defer wg.Done()
            resp, err := http.Get(u)
            if err != nil {
                log.Printf("fetch %s failed: %v", u, err)
                return
            }
            process(resp)
        }(url)
    }
    wg.Wait()
}

Errors are only logged — the caller has no idea about failures. If 2 out of 3 URLs fail, the caller assumes all succeeded.
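One way to hand failures back to the caller with only the standard library is to give each goroutine its own error slot and join the slots afterwards with errors.Join (Go 1.20+). The sketch below assumes hypothetical names (runAll, fakeFetch, errDown) and replaces the HTTP call with a stub.

```go
package main

import (
    "errors"
    "fmt"
    "sync"
)

// runAll calls fn for every input concurrently and returns all failures
// joined into one error, or nil if every call succeeded.
func runAll(inputs []string, fn func(string) error) error {
    errs := make([]error, len(inputs)) // one slot per goroutine, so no mutex is needed
    var wg sync.WaitGroup
    for i, in := range inputs {
        wg.Add(1)
        go func(i int, in string) {
            defer wg.Done()
            if err := fn(in); err != nil {
                errs[i] = fmt.Errorf("fetch %s: %w", in, err)
            }
        }(i, in)
    }
    wg.Wait()
    return errors.Join(errs...) // nil entries are dropped; all nil yields nil
}

var errDown = errors.New("service unavailable")

// fakeFetch stands in for a real request: inputs "b" and "c" fail.
func fakeFetch(u string) error {
    if u == "b" || u == "c" {
        return errDown
    }
    return nil
}

func main() {
    err := runAll([]string{"a", "b", "c"}, fakeFetch)
    fmt.Println("failed:", err != nil, "is errDown:", errors.Is(err, errDown))
}
```

The caller can still test for a specific cause with errors.Is, because the joined error wraps every individual failure.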

Challenge 4: No Graceful Shutdown

When the service receives SIGTERM, running goroutines are forcibly interrupted. In-flight writes may corrupt data, in-progress transactions may be half-completed.

Challenge 5: Difficulty Composing Concurrency Patterns

Worker Pool needs rate limiting, Pipeline needs error propagation, Fan-out needs result aggregation. Implementing each pattern alone isn't hard, but combining them in one service creates interactions that easily lead to deadlocks.


7 Production-Grade Concurrency Patterns

Pattern 1: Worker Pool — Bounded Goroutine Pool

Worker Pool is the most fundamental and important concurrency pattern. Core idea: a fixed number of workers pull tasks from a task queue, avoiding unbounded concurrency.

package workerpool

import (
    "context"
    "sync"
)

type Task func(ctx context.Context) error

type Pool struct {
    workers int
    tasks   chan Task
    wg      sync.WaitGroup
}

func NewPool(workers int, bufferSize int) *Pool {
    return &Pool{
        workers: workers,
        tasks:   make(chan Task, bufferSize),
    }
}

func (p *Pool) Start(ctx context.Context) {
    for i := 0; i < p.workers; i++ {
        p.wg.Add(1)
        go func(workerID int) {
            defer p.wg.Done()
            for {
                select {
                case <-ctx.Done():
                    return
                case task, ok := <-p.tasks:
                    if !ok {
                        return
                    }
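                    // The error is discarded here for brevity; production code
                    // should log it or send it to a results channel.
                    _ = task(ctx)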
                }
            }
        }(i)
    }
}

func (p *Pool) Submit(task Task) bool {
    select {
    case p.tasks <- task:
        return true
    default:
        return false
    }
}

func (p *Pool) Stop() {
    close(p.tasks)
    p.wg.Wait()
}

Usage example:

func main() {
    ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
    defer cancel()

    pool := NewPool(10, 100)
    pool.Start(ctx)

    urls := []string{
        "https://api.example.com/users",
        "https://api.example.com/orders",
        "https://api.example.com/products",
    }

    for _, url := range urls {
        u := url
        pool.Submit(func(ctx context.Context) error {
            return fetchURL(ctx, u)
        })
    }

    pool.Stop()
}

Key design points:

  • Fixed worker count prevents goroutine explosion
  • Buffered task channel serves as task queue
  • Context support for cancellation, workers can exit gracefully
  • Submit is non-blocking, returns false when queue is full
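When dropping work on a full queue is not acceptable, a blocking submit that still honors a deadline is a common complement to the non-blocking Submit. The following is a sketch under assumptions: submitWait and demo are hypothetical helpers, and the queue is a bare channel rather than the Pool type.

```go
package main

import (
    "context"
    "errors"
    "fmt"
    "time"
)

type Task func(ctx context.Context) error

// submitWait blocks until the queue accepts the task or ctx is done.
func submitWait(ctx context.Context, tasks chan<- Task, t Task) error {
    select {
    case tasks <- t:
        return nil
    case <-ctx.Done():
        return ctx.Err()
    }
}

// demo fills a one-slot queue that has no workers, then shows that a second
// submit gives up when its deadline passes instead of blocking forever.
func demo() (firstErr error, secondTimedOut bool) {
    tasks := make(chan Task, 1)
    noop := func(context.Context) error { return nil }

    firstErr = submitWait(context.Background(), tasks, noop)

    ctx, cancel := context.WithTimeout(context.Background(), 20*time.Millisecond)
    defer cancel()
    err := submitWait(ctx, tasks, noop)
    return firstErr, errors.Is(err, context.DeadlineExceeded)
}

func main() {
    first, timedOut := demo()
    fmt.Println("first submit error:", first)
    fmt.Println("second submit timed out:", timedOut)
}
```

The caller chooses the trade-off per call site: fail fast with the non-blocking form, or apply backpressure with the blocking form and a deadline.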

Pattern 2: Fan-out/Fan-in — Parallel Scatter-Gather

Fan-out distributes a data source to multiple goroutines for parallel processing; fan-in merges the results from those goroutines into one channel.

package fan

import (
    "context"
    "sync"
)

func FanOut[T any](ctx context.Context, source <-chan T, workers int) []<-chan T {
    channels := make([]<-chan T, workers)
    for i := 0; i < workers; i++ {
        ch := make(chan T)
        channels[i] = ch
        go func() {
            defer close(ch)
            for {
                select {
                case <-ctx.Done():
                    return
                case val, ok := <-source:
                    if !ok {
                        return
                    }
                    select {
                    case ch <- val:
                    case <-ctx.Done():
                        return
                    }
                }
            }
        }()
    }
    return channels
}

func FanIn[T any](ctx context.Context, channels ...<-chan T) <-chan T {
    out := make(chan T)
    var wg sync.WaitGroup
    wg.Add(len(channels))

    for _, ch := range channels {
        go func(c <-chan T) {
            defer wg.Done()
            for {
                select {
                case <-ctx.Done():
                    return
                case val, ok := <-c:
                    if !ok {
                        return
                    }
                    select {
                    case out <- val:
                    case <-ctx.Done():
                        return
                    }
                }
            }
        }(ch)
    }

    go func() {
        wg.Wait()
        close(out)
    }()

    return out
}

Usage example — parallel order processing:

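// FanOut and FanIn preserve the element type: they only distribute and merge.
// Turning an Order into a Result needs a processing step between the two.
func processOrders(ctx context.Context, orders <-chan Order) <-chan Order {
    shards := FanOut(ctx, orders, 5)
    return FanIn(ctx, shards...)
}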

Key design points:

  • Generics support (Go 1.18+), applicable to any type
  • Each fan-out goroutine independently consumes from source channel
  • Fan-in uses WaitGroup to wait for all input channels to close
  • All goroutines can exit on context cancellation
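Fan-out usually exists to run a per-item function in parallel. A compact sketch of that shape follows, with the processing step placed between distribution and merge. The Order and Result fields, processParallel, and sumTotals are assumptions made for the example.

```go
package main

import (
    "context"
    "fmt"
    "sync"
)

type Order struct {
    ID     int
    Amount int
}

type Result struct {
    OrderID int
    Total   int
}

// processParallel starts the given number of workers. Each worker reads from
// orders, applies fn, and writes to one shared output channel.
func processParallel(ctx context.Context, orders <-chan Order, workers int, fn func(Order) Result) <-chan Result {
    out := make(chan Result)
    var wg sync.WaitGroup
    wg.Add(workers)
    for i := 0; i < workers; i++ {
        go func() {
            defer wg.Done()
            for {
                select {
                case <-ctx.Done():
                    return
                case o, ok := <-orders:
                    if !ok {
                        return
                    }
                    select {
                    case out <- fn(o):
                    case <-ctx.Done():
                        return
                    }
                }
            }
        }()
    }
    go func() {
        wg.Wait()
        close(out) // close only after every worker has stopped sending
    }()
    return out
}

// sumTotals feeds orders 1..n through four workers and adds up the totals.
func sumTotals(n int) int {
    orders := make(chan Order)
    go func() {
        defer close(orders)
        for i := 1; i <= n; i++ {
            orders <- Order{ID: i, Amount: i}
        }
    }()
    double := func(o Order) Result { return Result{OrderID: o.ID, Total: 2 * o.Amount} }

    sum := 0
    for r := range processParallel(context.Background(), orders, 4, double) {
        sum += r.Total
    }
    return sum
}

func main() {
    fmt.Println("sum of totals:", sumTotals(10))
}
```

Results arrive in completion order, not input order, so each Result carries the OrderID it belongs to.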

Pattern 3: Pipeline — Stage-Based Processing

Pipeline decomposes complex processing into stages, each running as a goroutine connected by channels.

package pipeline

import (
    "context"
)

type Stage[In any, Out any] func(ctx context.Context, in <-chan In) <-chan Out

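// NewPipeline chains stages that share one element type. Go generics cannot
// express a variadic list of stages whose types differ, so stages that change
// the type are composed with direct calls: s3(ctx, s2(ctx, s1(ctx, source))).
func NewPipeline[T any](
    ctx context.Context,
    source <-chan T,
    stages ...Stage[T, T],
) <-chan T {
    current := source
    for _, stage := range stages {
        current = stage(ctx, current)
    }
    return current
}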

func NewStage[In any, Out any](
    process func(ctx context.Context, in In) (Out, error),
    bufferSize int,
) Stage[In, Out] {
    return func(ctx context.Context, in <-chan In) <-chan Out {
        out := make(chan Out, bufferSize)
        go func() {
            defer close(out)
            for {
                select {
                case <-ctx.Done():
                    return
                case val, ok := <-in:
                    if !ok {
                        return
                    }
                    result, err := process(ctx, val)
                    if err != nil {
                        continue
                    }
                    select {
                    case out <- result:
                    case <-ctx.Done():
                        return
                    }
                }
            }
        }()
        return out
    }
}

Usage example — data processing pipeline:

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    raw := make(chan RawData, 100)

    validate := NewStage[RawData, ValidData](func(ctx context.Context, in RawData) (ValidData, error) {
        if err := in.Validate(); err != nil {
            return ValidData{}, err
        }
        return in.ToValid(), nil
    }, 50)

    enrich := NewStage[ValidData, EnrichedData](func(ctx context.Context, in ValidData) (EnrichedData, error) {
        return fetchExtraInfo(ctx, in)
    }, 50)

    transform := NewStage[EnrichedData, FinalData](func(ctx context.Context, in EnrichedData) (FinalData, error) {
        return in.Transform()
    }, 50)

    // Each stage changes the element type, so chain the stage functions directly.
    result := transform(ctx, enrich(ctx, validate(ctx, raw)))

    // Feed the pipeline, then close raw so every stage can drain and exit.
    // loadRawData is a placeholder for the real data source.
    go func() {
        defer close(raw)
        for _, item := range loadRawData() {
            raw <- item
        }
    }()

    // Consume on the main goroutine so the program waits for the pipeline.
    for r := range result {
        saveToDB(r)
    }
}

Key design points:

  • Each stage is an independent goroutine, can scale independently
  • Channels provide backpressure between stages
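  • A failed item is dropped inside its stage (the continue branch), so one bad record does not stop the pipeline; production code should log or forward the error instead of discarding it silently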
  • All stages exit gracefully on context cancellation
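To keep per-item failures visible instead of dropping them, one option is to carry the error alongside the value through every stage. This is a sketch under assumptions: the Item wrapper, mapStage, and runPipeline are hypothetical and are not part of the Stage API.

```go
package main

import (
    "context"
    "fmt"
    "strconv"
)

// Item carries either a value or the error that stopped its processing.
type Item[T any] struct {
    Value T
    Err   error
}

// mapStage applies fn to each successful item and forwards failed items
// unchanged, so the final consumer sees every input exactly once.
func mapStage[In, Out any](ctx context.Context, in <-chan Item[In], fn func(In) (Out, error)) <-chan Item[Out] {
    out := make(chan Item[Out])
    go func() {
        defer close(out)
        for it := range in {
            var next Item[Out]
            if it.Err != nil {
                next.Err = it.Err // pass an earlier failure through untouched
            } else {
                next.Value, next.Err = fn(it.Value)
            }
            select {
            case out <- next:
            case <-ctx.Done():
                return
            }
        }
    }()
    return out
}

// runPipeline parses each input as an integer, doubles it, and counts how
// many items succeeded and how many failed.
func runPipeline(inputs []string) (ok, failed int) {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    src := make(chan Item[string], len(inputs))
    for _, s := range inputs {
        src <- Item[string]{Value: s}
    }
    close(src)

    parsed := mapStage[string, int](ctx, src, strconv.Atoi)
    doubled := mapStage(ctx, parsed, func(n int) (int, error) { return n * 2, nil })

    for it := range doubled {
        if it.Err != nil {
            failed++
        } else {
            ok++
        }
    }
    return ok, failed
}

func main() {
    ok, failed := runPipeline([]string{"1", "x", "3"})
    fmt.Println("ok:", ok, "failed:", failed)
}
```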

Pattern 4: errgroup — Concurrent Error Handling

errgroup is the error-aware version of sync.WaitGroup: Wait returns the first non-nil error, and a group created with WithContext cancels its shared context as soon as that error occurs.

package main

import (
    "context"
    "fmt"
    "io"
    "net/http"

    "golang.org/x/sync/errgroup"
)

type FetchResult struct {
    URL  string
    Body string
    Size int
}

func fetchMultiple(ctx context.Context, urls []string) ([]FetchResult, error) {
    g, ctx := errgroup.WithContext(ctx)
    results := make([]FetchResult, len(urls))

    for i, url := range urls {
        i, url := i, url
        g.Go(func() error {
            req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
            if err != nil {
                return fmt.Errorf("create request %s: %w", url, err)
            }

            resp, err := http.DefaultClient.Do(req)
            if err != nil {
                return fmt.Errorf("fetch %s: %w", url, err)
            }
            defer resp.Body.Close()

            if resp.StatusCode != http.StatusOK {
                return fmt.Errorf("fetch %s: status %d", url, resp.StatusCode)
            }

            body, err := io.ReadAll(resp.Body)
            if err != nil {
                return fmt.Errorf("read %s: %w", url, err)
            }

            results[i] = FetchResult{
                URL:  url,
                Body: string(body),
                Size: len(body),
            }
            return nil
        })
    }

    if err := g.Wait(); err != nil {
        return nil, err
    }
    return results, nil
}

errgroup with concurrency limit:

func fetchWithLimit(ctx context.Context, urls []string, maxConcurrent int) ([]FetchResult, error) {
    g, ctx := errgroup.WithContext(ctx)
    g.SetLimit(maxConcurrent)

    results := make([]FetchResult, len(urls))
    for i, url := range urls {
        i, url := i, url
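        g.Go(func() error {
            // Return the error so the group can cancel the remaining fetches.
            res, err := fetchOne(ctx, url)
            if err != nil {
                return fmt.Errorf("fetch %s: %w", url, err)
            }
            results[i] = res
            return nil
        })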
    }

    if err := g.Wait(); err != nil {
        return nil, err
    }
    return results, nil
}

Key design points:

  • errgroup.WithContext automatically propagates cancellation
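  • g.SetLimit(n) caps the number of goroutines running at once; it is a feature of the golang.org/x/sync module, not of a particular Go release
  • The first error cancels the shared context; goroutines stop early only if they observe ctx (for example via NewRequestWithContext)
  • Before Go 1.22, closures in a loop need the i, url := i, url copy; from Go 1.22 on, loop variables are per-iteration and the copy is redundant but harmless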

Pattern 5: Semaphore — Rate Limiting

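semaphore.Weighted provides a weighted semaphore: each operation acquires a weight proportional to its cost, and the total weight in flight never exceeds the configured maximum. Note that this bounds concurrent usage, not requests per second.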

package ratelimit

import (
    "context"
    "fmt"

    "golang.org/x/sync/semaphore"
)

type RateLimiter struct {
    sem *semaphore.Weighted
}

func NewRateLimiter(maxWeight int64) *RateLimiter {
    return &RateLimiter{
        sem: semaphore.NewWeighted(maxWeight),
    }
}

func (r *RateLimiter) Do(ctx context.Context, weight int64, fn func() error) error {
    if err := r.sem.Acquire(ctx, weight); err != nil {
        return fmt.Errorf("acquire semaphore: %w", err)
    }
    defer r.sem.Release(weight)
    return fn()
}

Usage example — tiered API rate limiting:

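func main() {
    limiter := NewRateLimiter(100)

    // A light call takes 10 of the 100 units.
    if err := limiter.Do(context.Background(), 10, func() error {
        return callLightAPI()
    }); err != nil {
        log.Printf("light call: %v", err)
    }

    // A heavy call takes 50, so at most two can run at once.
    if err := limiter.Do(context.Background(), 50, func() error {
        return callHeavyAPI()
    }); err != nil {
        log.Printf("heavy call: %v", err)
    }
}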

Semaphore with timeout:

func (r *RateLimiter) DoWithTimeout(ctx context.Context, weight int64, timeout time.Duration, fn func() error) error {
    acquireCtx, cancel := context.WithTimeout(ctx, timeout)
    defer cancel()

    if err := r.sem.Acquire(acquireCtx, weight); err != nil {
        return fmt.Errorf("semaphore acquire timeout: %w", err)
    }
    defer r.sem.Release(weight)
    return fn()
}

Key design points:

  • Weighted semaphore, different operations consume different quotas
  • Acquire supports context cancellation and timeout
  • Release must be paired with Acquire
  • Suitable for tiered API rate limiting, resource quota control

Pattern 6: Context Cancellation and Timeout

Context is the "lifeline" of Go concurrency programming, used to propagate cancellation signals, timeouts, and deadlines.

package ctxutil

import (
    "context"
    "fmt"
    "io"
    "net/http"
    "sync"
    "time"
)

type Result struct {
    Data  string
    Error error
}

func FetchWithTimeout(ctx context.Context, url string, timeout time.Duration) (string, error) {
    ctx, cancel := context.WithTimeout(ctx, timeout)
    defer cancel()

    req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
    if err != nil {
        return "", fmt.Errorf("create request: %w", err)
    }

    resp, err := http.DefaultClient.Do(req)
    if err != nil {
        if ctx.Err() == context.DeadlineExceeded {
            return "", fmt.Errorf("fetch %s timed out after %v: %w", url, timeout, ctx.Err())
        }
        return "", fmt.Errorf("fetch %s: %w", url, err)
    }
    defer resp.Body.Close()

    body, err := io.ReadAll(resp.Body)
    if err != nil {
        return "", fmt.Errorf("read response: %w", err)
    }
    return string(body), nil
}

func BatchFetch(ctx context.Context, urls []string, timeout time.Duration) []Result {
    results := make([]Result, len(urls))
    var wg sync.WaitGroup

    for i, url := range urls {
        wg.Add(1)
        go func(idx int, u string) {
            defer wg.Done()
            data, err := FetchWithTimeout(ctx, u, timeout)
            results[idx] = Result{Data: data, Error: err}
        }(i, url)
    }

    wg.Wait()
    return results
}

Graceful shutdown pattern:

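func main() {
    ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
    defer stop()

    server := &http.Server{Addr: ":8080"}

    // shutdownDone is closed once Shutdown has returned.
    shutdownDone := make(chan struct{})
    go func() {
        defer close(shutdownDone)
        <-ctx.Done()
        shutdownCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
        defer cancel()
        if err := server.Shutdown(shutdownCtx); err != nil {
            log.Printf("shutdown: %v", err)
        }
    }()

    // ListenAndServe returns http.ErrServerClosed as soon as Shutdown is
    // called, so wait for Shutdown to finish before main exits.
    if err := server.ListenAndServe(); !errors.Is(err, http.ErrServerClosed) {
        log.Fatalf("listen: %v", err)
    }
    <-shutdownDone
}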

Key design points:

  • WithTimeout/WithDeadline for setting timeouts
  • WithCancel for manual cancellation control
  • signal.NotifyContext for listening to system signals
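  • Cancellation propagates to every context derived from the cancelled one; a goroutine stops only if it checks ctx.Done() or passes ctx to the calls it makes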
  • defer cancel() prevents context leaks
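Callers often need to tell a timeout apart from an explicit cancellation and from an ordinary business error. A minimal sketch using errors.Is follows; classify, slowOp, and the demo helpers are hypothetical names introduced for the example.

```go
package main

import (
    "context"
    "errors"
    "fmt"
    "time"
)

// classify maps an error to a coarse category. errors.Is sees through
// wrapping added with %w.
func classify(err error) string {
    switch {
    case err == nil:
        return "ok"
    case errors.Is(err, context.DeadlineExceeded):
        return "timeout"
    case errors.Is(err, context.Canceled):
        return "canceled"
    default:
        return "business"
    }
}

// slowOp simulates work that takes d unless ctx ends first.
func slowOp(ctx context.Context, d time.Duration) error {
    timer := time.NewTimer(d)
    defer timer.Stop()
    select {
    case <-timer.C:
        return nil
    case <-ctx.Done():
        return fmt.Errorf("slowOp: %w", ctx.Err())
    }
}

func demoTimeout() string {
    ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
    defer cancel()
    return classify(slowOp(ctx, time.Second))
}

func demoCancel() string {
    ctx, cancel := context.WithCancel(context.Background())
    cancel() // cancelled before the work starts
    return classify(slowOp(ctx, time.Second))
}

func demoBusiness() string {
    return classify(errors.New("invalid order"))
}

func main() {
    fmt.Println(demoTimeout(), demoCancel(), demoBusiness())
}
```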

Pattern 7: Production Concurrent Service — Combining Patterns

Combine the above patterns to build a production-grade concurrent service: Worker Pool + Pipeline + errgroup + Context.

package concurrencyservice

import (
    "context"
    "fmt"
    "sync"
    "time"

    "golang.org/x/sync/errgroup"
)

type Service struct {
    workers    int
    bufferSize int
    timeout    time.Duration
}

func NewService(workers, bufferSize int, timeout time.Duration) *Service {
    return &Service{
        workers:    workers,
        bufferSize: bufferSize,
        timeout:    timeout,
    }
}

type Job struct {
    ID    string
    Input any
}

type Output struct {
    Job    Job
    Result any
    Err    error
}

func (s *Service) Process(ctx context.Context, jobs []Job) ([]Output, error) {
    ctx, cancel := context.WithTimeout(ctx, s.timeout)
    defer cancel()

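    // The errgroup runs only the producer below. Worker concurrency is fixed
    // by the worker loop, so SetLimit is not needed.
    g, ctx := errgroup.WithContext(ctx)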

    jobCh := make(chan Job, s.bufferSize)
    resultCh := make(chan Output, s.bufferSize)

    g.Go(func() error {
        defer close(jobCh)
        for _, job := range jobs {
            select {
            case <-ctx.Done():
                return ctx.Err()
            case jobCh <- job:
            }
        }
        return nil
    })

    var processWg sync.WaitGroup
    for i := 0; i < s.workers; i++ {
        processWg.Add(1)
        go func() {
            defer processWg.Done()
            for job := range jobCh {
                output := Output{Job: job}
                output.Result, output.Err = s.processOne(ctx, job)
                select {
                case resultCh <- output:
                case <-ctx.Done():
                    return
                }
            }
        }()
    }

    go func() {
        processWg.Wait()
        close(resultCh)
    }()

    var outputs []Output
    for result := range resultCh {
        outputs = append(outputs, result)
    }

    if err := g.Wait(); err != nil {
        return outputs, fmt.Errorf("service process: %w", err)
    }
    return outputs, nil
}

func (s *Service) processOne(ctx context.Context, job Job) (any, error) {
    select {
    case <-ctx.Done():
        return nil, ctx.Err()
    default:
    }
    return fmt.Sprintf("processed-%s", job.ID), nil
}

Key design points:

  • errgroup runs the producer and surfaces its cancellation error; the fixed worker loop bounds concurrency, and per-job errors travel in Output.Err
  • Channels for task distribution and result collection
  • Context for unified timeout and cancellation
  • WaitGroup ensures all workers complete before closing result channel
  • Layered design: scheduling (errgroup) + execution (worker goroutines) + collection (range resultCh)

5 Common Pitfalls and Fixes

Pitfall 1: Closure Variable Capture

❌ Wrong:

for _, url := range urls {
    go func() {
        fetch(url)
    }()
}

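Before Go 1.22, all goroutines share the same url variable, and most end up fetching the last URL. From Go 1.22 on, each iteration gets its own copy, so this bug only affects modules whose go.mod declares an older Go version.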

✅ Correct:

for _, url := range urls {
    url := url
    go func() {
        fetch(url)
    }()
}

Pitfall 2: Unclosed Channel Causes Goroutine Leak

❌ Wrong:

func producer() <-chan int {
    ch := make(chan int)
    go func() {
        for i := 0; i < 100; i++ {
            ch <- i
        }
    }()
    return ch
}

A consumer ranging over the channel never finishes, because range ends only when the channel is closed. The consumer goroutine leaks.

✅ Correct:

func producer() <-chan int {
    ch := make(chan int)
    go func() {
        defer close(ch)
        for i := 0; i < 100; i++ {
            ch <- i
        }
    }()
    return ch
}
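Closing the channel fixes the consumer side, but the producer itself can still leak if the consumer stops reading early, because the send then blocks forever. A sketch of a producer that also watches a context is shown below; the ctx and n parameters and the takeThenCancel helper are assumptions added for the example.

```go
package main

import (
    "context"
    "fmt"
)

// producer sends 0..n-1 and closes ch when it finishes or when ctx is
// cancelled, whichever comes first.
func producer(ctx context.Context, n int) <-chan int {
    ch := make(chan int)
    go func() {
        defer close(ch)
        for i := 0; i < n; i++ {
            select {
            case ch <- i:
            case <-ctx.Done():
                return // consumer gave up: stop instead of blocking on send
            }
        }
    }()
    return ch
}

// takeThenCancel reads the first k values (k must not exceed n), cancels,
// and then drains until the producer closes the channel. Returning at all
// shows that the producer exited.
func takeThenCancel(n, k int) int {
    ctx, cancel := context.WithCancel(context.Background())
    ch := producer(ctx, n)

    sum := 0
    for i := 0; i < k; i++ {
        sum += <-ch
    }
    cancel()
    for range ch {
        // discard values sent before the producer noticed the cancellation
    }
    return sum
}

func main() {
    fmt.Println("sum of first three values:", takeThenCancel(1000000, 3))
}
```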

Pitfall 3: WaitGroup Add Position Error

❌ Wrong:

var wg sync.WaitGroup
for i := 0; i < 10; i++ {
    go func() {
        wg.Add(1)
        defer wg.Done()
        doWork()
    }()
}
wg.Wait()

Goroutines may not have reached wg.Add(1) yet, and the main goroutine's wg.Wait() passes immediately.

✅ Correct:

var wg sync.WaitGroup
for i := 0; i < 10; i++ {
    wg.Add(1)
    go func() {
        defer wg.Done()
        doWork()
    }()
}
wg.Wait()

Pitfall 4: time.After Leak in Select

❌ Wrong:

select {
case result := <-ch:
    process(result)
case <-time.After(5 * time.Second):
    return errors.New("timeout")
}

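Each pass through the select creates a new timer. Before Go 1.23 that timer is not garbage-collected until it fires, so a hot loop accumulates timers. From Go 1.23 on, unreferenced timers are collected promptly, but a context or a reused timer still makes the timeout explicit.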

✅ Correct:

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

select {
case result := <-ch:
    process(result)
case <-ctx.Done():
    return ctx.Err()
}
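When the select sits inside a loop, another option is a single time.Timer that is reset after every received value. This is a sketch, with collect as a hypothetical helper; the stop-and-drain step before Reset is what older Go versions require and is harmless on newer ones.

```go
package main

import (
    "fmt"
    "time"
)

// collect gathers values from ch until it is closed, or until no value
// arrives for the idle duration. One timer is reused for the whole loop.
func collect(ch <-chan int, idle time.Duration) (got []int, timedOut bool) {
    timer := time.NewTimer(idle)
    defer timer.Stop()
    for {
        select {
        case v, ok := <-ch:
            if !ok {
                return got, false
            }
            got = append(got, v)
            // Stop and drain before Reset so a stale tick cannot fire later.
            if !timer.Stop() {
                select {
                case <-timer.C:
                default:
                }
            }
            timer.Reset(idle)
        case <-timer.C:
            return got, true
        }
    }
}

func main() {
    ch := make(chan int, 3)
    ch <- 1
    ch <- 2
    ch <- 3
    close(ch)
    got, timedOut := collect(ch, time.Second)
    fmt.Println("received:", len(got), "timed out:", timedOut)

    _, timedOut = collect(make(chan int), 10*time.Millisecond)
    fmt.Println("silent channel timed out:", timedOut)
}
```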

Pitfall 5: Mutex Copy

❌ Wrong:

type SafeMap struct {
    mu   sync.Mutex
    data map[string]string
}

func copyMap(m SafeMap) SafeMap {
    return m
}

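A Mutex's zero value is unlocked, but copying a Mutex copies its current state: a copy taken while the lock is held starts out locked (deadlock), and the copy and the original no longer exclude each other (data race). go vet flags this with its copylocks check.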

✅ Correct:

type SafeMap struct {
    mu   *sync.Mutex
    data map[string]string
}

func NewSafeMap() *SafeMap {
    return &SafeMap{
        mu:   &sync.Mutex{},
        data: make(map[string]string),
    }
}
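An alternative that is also widely used keeps the Mutex as a value field and avoids copies by handing out a pointer to the struct and using pointer receivers. The sketch below illustrates that approach; Counter, its methods, and hammer are hypothetical names, not part of SafeMap.

```go
package main

import (
    "fmt"
    "sync"
)

// Counter must not be copied after first use; share it through *Counter.
type Counter struct {
    mu   sync.Mutex
    hits map[string]int
}

func NewCounter() *Counter {
    return &Counter{hits: make(map[string]int)}
}

// Inc uses a pointer receiver, so every caller locks the same Mutex.
func (c *Counter) Inc(key string) {
    c.mu.Lock()
    defer c.mu.Unlock()
    c.hits[key]++
}

func (c *Counter) Get(key string) int {
    c.mu.Lock()
    defer c.mu.Unlock()
    return c.hits[key]
}

// hammer increments one key from many goroutines and returns the final count.
func hammer(goroutines, perGoroutine int) int {
    c := NewCounter()
    var wg sync.WaitGroup
    for g := 0; g < goroutines; g++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for i := 0; i < perGoroutine; i++ {
                c.Inc("requests")
            }
        }()
    }
    wg.Wait()
    return c.Get("requests")
}

func main() {
    fmt.Println("final count:", hammer(8, 1000))
}
```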

Error Troubleshooting Reference

| Symptom | Possible Cause | Investigation | Solution |
| --- | --- | --- | --- |
| fatal error: all goroutines are asleep - deadlock! | All goroutines blocked, no active goroutines | Check channel read/write pairing, select default | Ensure channels have producers and consumers, add context cancellation |
| Goroutine count keeps growing | Goroutine leak, unclosed channels | runtime.NumGoroutine() monitoring, pprof goroutine profile | Ensure all goroutines have exit paths, defer close(ch) |
| Memory keeps growing | Goroutines hold large object references, channel accumulation | pprof heap profile, check channel buffer size | Limit channel buffer, release references promptly |
| Frequent context canceled errors | Upstream context cancelled, timeout too short | Check context chain, verify timeout is reasonable | Adjust timeout, distinguish business errors from timeout errors |
| Concurrent results missing | Goroutine exits before writing to channel | Check goroutine exit logic, confirm channel closure | Use WaitGroup to ensure all goroutines complete before closing channel |
| WARNING: DATA RACE (race detector report) | Multiple goroutines read/write shared variables | go test -race, check global variables and closure captures | Use Mutex/RWMutex for protection, or switch to channel communication |
| Channel blocked, no response | Producer/consumer speed mismatch | Check channel buffer size, monitor production/consumption rates | Increase buffer, use select+default for non-blocking send |
| Graceful shutdown failure | Goroutines not responding to context cancellation | Check if goroutines select on ctx.Done() | Ensure all long-running goroutines check ctx.Done() |
| errgroup returns only one error | errgroup designed for first-error cancellation | Check if you need to collect all errors | Use custom error collector or multiple errgroups |
| Semaphore acquisition timeout | Concurrency limit too strict, requests queuing too long | Monitor semaphore wait time, adjust weight quotas | Increase semaphore capacity, optimize weight allocation |
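For the "goroutine count keeps growing" symptom, a quick check with runtime.NumGoroutine can confirm a leak before reaching for pprof. The sketch below is a rough diagnostic, not a precise tool: goroutineDelta, clean, and leaky are hypothetical helpers, and the count can be disturbed by unrelated goroutines in a real service.

```go
package main

import (
    "fmt"
    "runtime"
    "time"
)

// goroutineDelta runs fn, waits up to settle for the goroutines it started
// to exit, and returns how many more goroutines exist than before.
func goroutineDelta(fn func(), settle time.Duration) int {
    before := runtime.NumGoroutine()
    fn()
    deadline := time.Now().Add(settle)
    for runtime.NumGoroutine() > before && time.Now().Before(deadline) {
        time.Sleep(5 * time.Millisecond)
    }
    return runtime.NumGoroutine() - before
}

// clean starts a goroutine and waits for it to signal completion.
func clean() {
    done := make(chan struct{})
    go func() { close(done) }()
    <-done
}

// leaky starts a goroutine that blocks forever on a channel that nobody
// writes to or closes.
func leaky() {
    ch := make(chan int)
    go func() { <-ch }()
}

func main() {
    fmt.Println("clean delta:", goroutineDelta(clean, 200*time.Millisecond))
    fmt.Println("leaky delta:", goroutineDelta(leaky, 200*time.Millisecond))
}
```

A positive delta that persists after the settle period suggests a goroutine without an exit path; the pprof goroutine profile then shows where it is blocked.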

Advanced Optimization Techniques

Optimization 1: Dynamic Worker Pool

Dynamically adjust worker count based on system load:

package dynamicpool

import (
    "context"
    "sync"
    "sync/atomic"
    "time"
)

// Task matches the worker-pool task signature from Pattern 1.
type Task func(ctx context.Context) error

type DynamicPool struct {
    minWorkers int64
    maxWorkers int64
    active     atomic.Int64
    tasks      chan Task
    wg         sync.WaitGroup
    adjustTick *time.Ticker
}

func NewDynamicPool(minW, maxW int) *DynamicPool {
    return &DynamicPool{
        minWorkers: int64(minW),
        maxWorkers: int64(maxW),
        tasks:      make(chan Task, maxW*2),
        adjustTick: time.NewTicker(5 * time.Second),
    }
}

func (p *DynamicPool) Start(ctx context.Context) {
    for i := 0; i < int(p.minWorkers); i++ {
        p.addWorker(ctx)
    }

    go func() {
        for {
            select {
            case <-ctx.Done():
                p.adjustTick.Stop()
                return
            case <-p.adjustTick.C:
                p.adjustWorkers(ctx)
            }
        }
    }()
}

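func (p *DynamicPool) addWorker(ctx context.Context) {
    p.active.Add(1)
    p.wg.Add(1)
    go func() {
        defer func() {
            p.active.Add(-1)
            p.wg.Done()
        }()
        for task := range p.tasks {
            if task == nil {
                return // nil is the scale-down signal sent by adjustWorkers
            }
            _ = task(ctx)
        }
    }()
}

func (p *DynamicPool) adjustWorkers(ctx context.Context) {
    current := p.active.Load()
    queueLen := len(p.tasks)

    if queueLen > int(current) && current < p.maxWorkers {
        p.addWorker(ctx)
    } else if queueLen == 0 && current > p.minWorkers {
        // Retire at most 5 surplus workers per tick, never dropping below
        // minWorkers. The send is non-blocking in case the queue refilled.
        surplus := current - p.minWorkers
        for i := int64(0); i < 5 && i < surplus; i++ {
            select {
            case p.tasks <- nil:
            default:
                return
            }
        }
    }
}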

func (p *DynamicPool) Stop() {
    close(p.tasks)
    p.wg.Wait()
}

Optimization 2: Batching

Combine multiple small tasks into one batch operation to reduce syscall and IO overhead:

package batcher

import (
    "context"
    "sync"
    "time"
)

type Batcher[T any, R any] struct {
    batchSize     int
    flushInterval time.Duration
    handler       func(ctx context.Context, batch []T) ([]R, error)
    mu            sync.Mutex
    buffer        []T
    results       []R
}

func NewBatcher[T any, R any](
    batchSize int,
    flushInterval time.Duration,
    handler func(ctx context.Context, batch []T) ([]R, error),
) *Batcher[T, R] {
    return &Batcher[T, R]{
        batchSize:     batchSize,
        flushInterval: flushInterval,
        handler:       handler,
        buffer:        make([]T, 0, batchSize),
    }
}

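// Add buffers item and, once the buffer reaches batchSize, runs the handler
// on the full batch. It returns the first result of a batch it flushed, or
// the zero value if the item was only buffered.
func (b *Batcher[T, R]) Add(ctx context.Context, item T) (R, error) {
    var zero R

    b.mu.Lock()
    b.buffer = append(b.buffer, item)
    if len(b.buffer) < b.batchSize {
        b.mu.Unlock()
        return zero, nil
    }
    batch := b.buffer
    b.buffer = make([]T, 0, b.batchSize)
    b.mu.Unlock()

    // Run the handler outside the lock so other callers can keep buffering.
    results, err := b.handler(ctx, batch)
    if err != nil {
        return zero, err
    }

    // b.results is shared state, so the append must hold the lock as well.
    b.mu.Lock()
    b.results = append(b.results, results...)
    b.mu.Unlock()

    if len(results) > 0 {
        return results[0], nil
    }
    return zero, nil
}

// Flush processes whatever is buffered and returns a copy of all results
// accumulated so far.
func (b *Batcher[T, R]) Flush(ctx context.Context) ([]R, error) {
    b.mu.Lock()
    batch := b.buffer
    b.buffer = make([]T, 0, b.batchSize)
    b.mu.Unlock()

    if len(batch) > 0 {
        results, err := b.handler(ctx, batch)
        if err != nil {
            return nil, err
        }
        b.mu.Lock()
        b.results = append(b.results, results...)
        b.mu.Unlock()
    }

    b.mu.Lock()
    defer b.mu.Unlock()
    return append([]R(nil), b.results...), nil
}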

Optimization 3: Zero-Copy Channel Passing

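Pass pointers to pooled buffers through the channel so each message reuses an existing allocation from sync.Pool instead of creating a new one. The payload is still copied once into the pooled buffer, so the saving is in allocations rather than in copies: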

package zerocopy

import (
    "context"
    "sync"
)

type Buffer struct {
    Data []byte
}

var bufferPool = sync.Pool{
    New: func() any {
        return &Buffer{Data: make([]byte, 0, 4096)}
    },
}

func GetBuffer() *Buffer {
    return bufferPool.Get().(*Buffer)
}

func PutBuffer(buf *Buffer) {
    buf.Data = buf.Data[:0]
    bufferPool.Put(buf)
}

func ProcessPipeline(ctx context.Context, input <-chan []byte) <-chan *Buffer {
    out := make(chan *Buffer, 64)
    go func() {
        defer close(out)
        for {
            select {
            case <-ctx.Done():
                return
            case data, ok := <-input:
                if !ok {
                    return
                }
                buf := GetBuffer()
                buf.Data = append(buf.Data[:0], data...)

                select {
                case out <- buf:
                case <-ctx.Done():
                    PutBuffer(buf)
                    return
                }
            }
        }
    }()
    return out
}

Concurrency Pattern Comparison

| Feature | goroutine+channel | sync.WaitGroup | errgroup | Worker Pool |
| --- | --- | --- | --- | --- |
| Concurrency control | No built-in control | No built-in control | SetLimit | Fixed worker count |
| Error handling | Manual implementation | Manual implementation | Auto first-error cancel | Manual implementation |
| Backpressure | Channel buffer | None | None | Task channel |
| Cancellation propagation | Manual context | Manual context | Auto context | Manual context |
| Result collection | Channel receive | Shared variables | Return slice | Channel receive |
| Use case | Stream data processing | Batch task waiting | Concurrent API calls | Rate-limited task processing |
| Complexity | Medium | Low | Low | Medium |
| Goroutine count | Unbounded | Unbounded | Limited | Fixed |
| Production recommendation | ⭐⭐⭐ | ⭐⭐ | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐⭐ |

Summary

Go concurrency programming isn't about "just make it run" — it's about answering four questions: How many goroutines are running? When do they exit? What happens on error? How are resources reclaimed? Worker Pool answers "how many," Context answers "when to exit," errgroup answers "what on error," and defer close() answers "how to reclaim." Master these 7 patterns, and you'll have the core methodology for production-grade Go concurrency programming.

