Go 1.24迭代器模式:从range over func到数据管道的7种生产模式

编程语言

当for循环遇上函数:Go迭代器的范式转移

上周重构数据处理服务,一个3层嵌套的for循环处理10万条记录,内存飙到2GB——因为每层都要把中间结果存成切片再传给下一层。改成迭代器管道后,内存降到15MB,处理速度反而快了30%。关键转变:不再"先收集再处理",而是"边遍历边处理"

Go 1.24正式稳定了range over func语法和iter包,让Go拥有了原生的、零分配的、可组合的迭代器。这不是语法糖,而是数据处理范式的根本转变。本文将从7种生产级Go迭代器模式出发,帮你构建高效、优雅、可组合的数据管道。


核心要点

  • range over func是迭代器的核心语法:Go 1.24让函数直接成为可range的对象
  • Push/Pull迭代器转换:理解迭代器的两种方向,掌握iter.Pull转换
  • 数据管道组合:Map/Filter/Reduce链式组合,零中间分配
  • 惰性求值与无限序列:按需计算,处理无限数据流
  • 并发迭代器与Fan-out:多goroutine并行消费迭代器
  • 迭代器错误处理:优雅处理迭代过程中的错误
  • 生产级迭代器库设计:构建可复用的迭代器工具库

目录

  1. Go迭代器核心概念速查表
  2. Pattern 1:range over func基础迭代器
  3. Pattern 2:Push/Pull迭代器转换
  4. Pattern 3:数据管道组合(Map/Filter/Reduce)
  5. Pattern 4:惰性求值与无限序列
  6. Pattern 5:并发迭代器与Fan-out
  7. Pattern 6:迭代器错误处理
  8. Pattern 7:生产级迭代器库设计
  9. 5个常见坑及解决方案
  10. 10个常见报错排查
  11. 进阶优化技巧
  12. 对比分析:迭代器vs通道vs切片
  13. 在线工具推荐
  14. 总结

Go迭代器核心概念速查表

概念 签名 用途 示例
迭代器函数 func(yield func(V) bool) 单值迭代器 func(yield func(int) bool)
键值迭代器 func(yield func(K, V) bool) 键值对迭代 func(yield func(int, string) bool)
Pull迭代器 func() (V, bool) 按需拉取 next, stop := iter.Pull(seq)
iter.Pull func(Seq[V]) (func() (V, bool), func()) Push转Pull 消费者驱动遍历
iter.Stop 内置stop函数 提前终止迭代 stop() 释放资源
yield返回值 bool 控制迭代继续/停止 yield(v) 返回false则停止
惰性求值 延迟计算 按需生成值 无限序列、文件行
管道组合 函数链式调用 零中间分配 Filter(Map(Seq, fn), pred)

Pattern 1:range over func基础迭代器

问题:传统遍历的内存陷阱

func GetAllUsers(db *sql.DB) ([]User, error) {
    rows, err := db.Query("SELECT id, name, email FROM users")
    if err != nil {
        return nil, err
    }
    defer rows.Close()

    var users []User
    for rows.Next() {
        var u User
        if err := rows.Scan(&u.ID, &u.Name, &u.Email); err != nil {
            return nil, err
        }
        users = append(users, u)
    }
    return users, rows.Err()
}

100万用户?100万条User结构体全部加载到内存。你只需要前10条?不好意思,先全部加载。

解决方案:range over func迭代器

package iterator

import (
    "database/sql"
    "iter"
)

type User struct {
    ID    int
    Name  string
    Email string
}

func AllUsers(db *sql.DB) iter.Seq2[int, User] {
    return func(yield func(int, User) bool) {
        rows, err := db.Query("SELECT id, name, email FROM users")
        if err != nil {
            return
        }
        defer rows.Close()

        i := 0
        for rows.Next() {
            var u User
            if err := rows.Scan(&u.ID, &u.Name, &u.Email); err != nil {
                return
            }
            if !yield(i, u) {
                return
            }
            i++
        }
    }
}

使用方式:

for i, user := range AllUsers(db) {
    fmt.Printf("%d: %s\n", i, user.Name)
    if i >= 9 {
        break
    }
}

break时,yield返回false,迭代器函数直接return——只查询了10条,数据库连接正常关闭

迭代器执行流程

┌─────────────┐     yield(v)      ┌──────────────┐
│  迭代器函数   │ ──────────────→  │   range循环   │
│  (producer)  │                   │  (consumer)  │
│              │  ←──────────────  │              │
│              │   yield返回bool   │              │
└─────────────┘                   └──────────────┘
      │                                  │
      │  yield返回false → return         │
      │  (提前终止)                       │
      └──────────────────────────────────┘

单值迭代器 vs 键值迭代器

type IntSlice []int

func (s IntSlice) Values() iter.Seq[int] {
    return func(yield func(int) bool) {
        for _, v := range s {
            if !yield(v) {
                return
            }
        }
    }
}

func (s IntSlice) All() iter.Seq2[int, int] {
    return func(yield func(int, int) bool) {
        for i, v := range s {
            if !yield(i, v) {
                return
            }
        }
    }
}
nums := IntSlice{10, 20, 30}

for v := range nums.Values() {
    fmt.Println(v)
}

for i, v := range nums.All() {
    fmt.Printf("index=%d value=%d\n", i, v)
}

Pattern 2:Push/Pull迭代器转换

Push迭代器与Pull迭代器的区别

Push迭代器 (iter.Seq)              Pull迭代器 (func() (V, bool))
┌──────────────────┐               ┌──────────────────┐
│  生产者主动推送    │               │  消费者主动拉取    │
│  yield(v) → 消费者 │               │  next() → 生产者   │
│                    │               │                    │
│  适合:range遍历   │               │  适合:手动控制     │
│  适合:管道组合    │               │  适合:提前查看     │
│  适合:惰性求值    │               │  适合:互操作      │
└──────────────────┘               └──────────────────┘
         │                                  │
         │      iter.Pull() 转换            │
         └──────────────────────────────────┘

使用iter.Pull转换

package main

import (
    "fmt"
    "iter"
)

func Countdown(n int) iter.Seq[int] {
    return func(yield func(int) bool) {
        for i := n; i > 0; i-- {
            if !yield(i) {
                return
            }
        }
    }
}

func main() {
    next, stop := iter.Pull(Countdown(5))
    defer stop()

    for {
        v, ok := next()
        if !ok {
            break
        }
        fmt.Println(v)
        if v == 3 {
            fmt.Println("提前终止")
            break
        }
    }
}

Pull迭代器的实际应用:Peek和Take

package iterutil

import "iter"

func Take[V any](seq iter.Seq[V], n int) iter.Seq[V] {
    return func(yield func(V) bool) {
        count := 0
        for v := range seq {
            if count >= n {
                return
            }
            if !yield(v) {
                return
            }
            count++
        }
    }
}

func First[V any](seq iter.Seq[V]) (V, bool) {
    next, stop := iter.Pull(seq)
    defer stop()
    return next()
}

func PeekN[V any](seq iter.Seq[V], n int) []V {
    result := make([]V, 0, n)
    next, stop := iter.Pull(seq)
    defer stop()

    for i := 0; i < n; i++ {
        v, ok := next()
        if !ok {
            break
        }
        result = append(result, v)
    }
    return result
}
nums := Countdown(100)
fmt.Println(First(nums))
fmt.Println(PeekN(nums, 5))

重要:iter.Pull的资源清理

func ProcessLines(filename string) iter.Seq[string] {
    return func(yield func(string) bool) {
        file, err := os.Open(filename)
        if err != nil {
            return
        }
        defer file.Close()

        scanner := bufio.NewScanner(file)
        for scanner.Scan() {
            if !yield(scanner.Text()) {
                return
            }
        }
    }
}

func main() {
    next, stop := iter.Pull(ProcessLines("huge.log"))
    defer stop()

    v, ok := next()
    if ok {
        fmt.Println("第一行:", v)
    }
}

defer stop()确保即使只消费一个值,文件也会被正确关闭。


Pattern 3:数据管道组合(Map/Filter/Reduce)

问题:嵌套循环与中间切片

func ProcessOrders(orders []Order) float64 {
    var active []Order
    for _, o := range orders {
        if o.Status == "active" {
            active = append(active, o)
        }
    }

    var amounts []float64
    for _, o := range active {
        amounts = append(amounts, o.Amount*1.1)
    }

    var total float64
    for _, a := range amounts {
        total += a
    }
    return total
}

3次遍历,2个中间切片。数据量大时,内存和GC压力巨大。

解决方案:迭代器管道

package pipeline

import "iter"

func Map[V any, U any](seq iter.Seq[V], fn func(V) U) iter.Seq[U] {
    return func(yield func(U) bool) {
        for v := range seq {
            if !yield(fn(v)) {
                return
            }
        }
    }
}

func Map2[K any, V any, U any](seq iter.Seq2[K, V], fn func(K, V) U) iter.Seq[U] {
    return func(yield func(U) bool) {
        for k, v := range seq {
            if !yield(fn(k, v)) {
                return
            }
        }
    }
}

func Filter[V any](seq iter.Seq[V], pred func(V) bool) iter.Seq[V] {
    return func(yield func(V) bool) {
        for v := range seq {
            if pred(v) {
                if !yield(v) {
                    return
                }
            }
        }
    }
}

func Filter2[K any, V any](seq iter.Seq2[K, V], pred func(K, V) bool) iter.Seq2[K, V] {
    return func(yield func(K, V) bool) {
        for k, v := range seq {
            if pred(k, v) {
                if !yield(k, v) {
                    return
                }
            }
        }
    }
}

func Reduce[V any, U any](seq iter.Seq[V], init U, fn func(U, V) U) U {
    acc := init
    for v := range seq {
        acc = fn(acc, v)
    }
    return acc
}

使用管道组合

type Order struct {
    ID     int
    Status string
    Amount float64
}

func OrdersFromDB(db *sql.DB) iter.Seq[Order] {
    return func(yield func(Order) bool) {
        rows, _ := db.Query("SELECT id, status, amount FROM orders")
        if rows != nil {
            defer rows.Close()
            for rows.Next() {
                var o Order
                rows.Scan(&o.ID, &o.Status, &o.Amount)
                if !yield(o) {
                    return
                }
            }
        }
    }
}

func main() {
    db, _ := sql.Open("postgres", "dsn")

    total := Reduce(
        Map(
            Filter(
                OrdersFromDB(db),
                func(o Order) bool { return o.Status == "active" },
            ),
            func(o Order) float64 { return o.Amount * 1.1 },
        ),
        0.0,
        func(acc float64, v float64) float64 { return acc + v },
    )

    fmt.Printf("总金额: %.2f\n", total)
}

零中间分配:Filter、Map、Reduce串联,每个元素只经过一次处理。

管道执行流程

OrdersFromDB → Filter(active) → Map(×1.1) → Reduce(+) → total
     │              │                │            │
     │  Order{1,    │  Status==      │  Amount*1.1 │  acc+v
     │  "active",   │  "active"?     │             │
     │  100.0}      │                │             │
     │      ──────→ │  ✓ 通过        │             │
     │              │      ──────→   │  110.0      │
     │              │                │   ──────→   │  110.0
     │
     │  Order{2,    │  Status!=      │             │
     │  "closed",   │  "active"      │             │
     │  200.0}      │  ✗ 过滤掉      │             │
     │      ──────→ │  跳过          │             │

更多管道操作符

func FlatMap[V any, U any](seq iter.Seq[V], fn func(V) iter.Seq[U]) iter.Seq[U] {
    return func(yield func(U) bool) {
        for v := range seq {
            for u := range fn(v) {
                if !yield(u) {
                    return
                }
            }
        }
    }
}

func Zip[V any, U any](seq1 iter.Seq[V], seq2 iter.Seq[U]) iter.Seq2[V, U] {
    return func(yield func(V, U) bool) {
        next1, stop1 := iter.Pull(seq1)
        defer stop1()
        next2, stop2 := iter.Pull(seq2)
        defer stop2()

        for {
            v1, ok1 := next1()
            v2, ok2 := next2()
            if !ok1 || !ok2 {
                return
            }
            if !yield(v1, v2) {
                return
            }
        }
    }
}

func Enumerate[V any](seq iter.Seq[V]) iter.Seq2[int, V] {
    return func(yield func(int, V) bool) {
        i := 0
        for v := range seq {
            if !yield(i, v) {
                return
            }
            i++
        }
    }
}

func Chunk[V any](seq iter.Seq[V], size int) iter.Seq[[]V] {
    return func(yield func([]V) bool) {
        chunk := make([]V, 0, size)
        for v := range seq {
            chunk = append(chunk, v)
            if len(chunk) == size {
                if !yield(chunk) {
                    return
                }
                chunk = make([]V, 0, size)
            }
        }
        if len(chunk) > 0 {
            yield(chunk)
        }
    }
}

Pattern 4:惰性求值与无限序列

问题:预计算全部结果的浪费

func Fibonacci(n int) []int {
    result := make([]int, n)
    if n > 0 {
        result[0] = 0
    }
    if n > 1 {
        result[1] = 1
    }
    for i := 2; i < n; i++ {
        result[i] = result[i-1] + result[i-2]
    }
    return result
}

需要前10个斐波那契数?必须指定n。不知道要多少?只能先算一个"足够大"的值。

解决方案:无限迭代器 + 惰性求值

package lazy

import "iter"

func Fibonacci() iter.Seq[int] {
    return func(yield func(int) bool) {
        a, b := 0, 1
        for {
            if !yield(a) {
                return
            }
            a, b = b, a+b
        }
    }
}

func NaturalNumbers() iter.Seq[int] {
    return func(yield func(int) bool) {
        for i := 0; ; i++ {
            if !yield(i) {
                return
            }
        }
    }
}

func Repeat[V any](v V) iter.Seq[V] {
    return func(yield func(V) bool) {
        for {
            if !yield(v) {
                return
            }
        }
    }
}

func Iterate[V any](init V, fn func(V) V) iter.Seq[V] {
    return func(yield func(V) bool) {
        v := init
        for {
            if !yield(v) {
                return
            }
            v = fn(v)
        }
    }
}

func Cycle[V any](seq iter.Seq[V]) iter.Seq[V] {
    return func(yield func(V) bool) {
        for {
            for v := range seq {
                if !yield(v) {
                    return
                }
            }
        }
    }
}

使用惰性序列

func main() {
    for v := range Take(Fibonacci(), 10) {
        fmt.Print(v, " ")
    }
    fmt.Println()

    squares := Map(
        Take(NaturalNumbers(), 5),
        func(n int) int { return n * n },
    )
    for v := range squares {
        fmt.Print(v, " ")
    }
    fmt.Println()

    powersOf2 := Iterate(1, func(v int) int { return v * 2 })
    for v := range Take(powersOf2, 8) {
        fmt.Print(v, " ")
    }
    fmt.Println()
}

惰性文件处理

func FileLines(path string) iter.Seq[string] {
    return func(yield func(string) bool) {
        f, err := os.Open(path)
        if err != nil {
            return
        }
        defer f.Close()

        scanner := bufio.NewScanner(f)
        for scanner.Scan() {
            if !yield(scanner.Text()) {
                return
            }
        }
    }
}

func Grep(pattern string, lines iter.Seq[string]) iter.Seq[string] {
    re := regexp.MustCompile(pattern)
    return Filter(lines, func(line string) bool {
        return re.MatchString(line)
    })
}

func main() {
    errors := Grep("ERROR", FileLines("/var/log/app.log"))
    for line := range Take(errors, 100) {
        fmt.Println(line)
    }
}

10GB日志文件?只读前100条ERROR行,内存占用几乎为零。


Pattern 5:并发迭代器与Fan-out

问题:单线程迭代器的性能瓶颈

func ProcessImages(images iter.Seq[Image]) []Result {
    var results []Result
    for img := range images {
        r := expensiveTransform(img)
        results = append(results, r)
    }
    return results
}

1000张图片,每张处理100ms,总计100秒。CPU利用率只有12.5%(8核只用1核)。

解决方案:Fan-out并发迭代器

package concurrent

import (
    "iter"
    "sync"
)

func FanOut[V any, U any](seq iter.Seq[V], workers int, fn func(V) U) iter.Seq[U] {
    return func(yield func(U) bool) {
        inputCh := make(chan V)
        outputCh := make(chan U)

        var wg sync.WaitGroup
        for i := 0; i < workers; i++ {
            wg.Add(1)
            go func() {
                defer wg.Done()
                for v := range inputCh {
                    outputCh <- fn(v)
                }
            }()
        }

        go func() {
            for v := range seq {
                inputCh <- v
            }
            close(inputCh)
            wg.Wait()
            close(outputCh)
        }()

        for u := range outputCh {
            if !yield(u) {
                return
            }
        }
    }
}

func FanOutOrdered[V any, U any](seq iter.Seq[V], workers int, fn func(V) U) iter.Seq[U] {
    return func(yield func(U) bool) {
        type indexedResult struct {
            index int
            value U
        }

        next, stop := iter.Pull(seq)
        defer stop()

        inputCh := make(chan indexedInput[V], workers)
        outputCh := make(chan indexedResult, workers)

        type indexedInput[V any] struct {
            index int
            value V
        }

        var wg sync.WaitGroup
        for i := 0; i < workers; i++ {
            wg.Add(1)
            go func() {
                defer wg.Done()
                for inp := range inputCh {
                    outputCh <- indexedResult{
                        index: inp.index,
                        value: fn(inp.value),
                    }
                }
            }()
        }

        go func() {
            idx := 0
            for {
                v, ok := next()
                if !ok {
                    break
                }
                inputCh <- indexedInput[V]{index: idx, value: v}
                idx++
            }
            close(inputCh)
            wg.Wait()
            close(outputCh)
        }()

        results := make(map[int]U)
        nextIdx := 0
        for res := range outputCh {
            results[res.index] = res.value
            for {
                r, ok := results[nextIdx]
                if !ok {
                    break
                }
                delete(results, nextIdx)
                if !yield(r) {
                    return
                }
                nextIdx++
            }
        }
    }
}

使用并发迭代器

type Image struct {
    Path string
    Data []byte
}

type Result struct {
    Path      string
    Thumbnail []byte
}

func LoadImages(paths iter.Seq[string]) iter.Seq[Image] {
    return Map(paths, func(p string) Image {
        data, _ := os.ReadFile(p)
        return Image{Path: p, Data: data}
    })
}

func expensiveTransform(img Image) Result {
    thumbnail := resizeImage(img.Data, 100, 100)
    return Result{Path: img.Path, Thumbnail: thumbnail}
}

func main() {
    paths := SliceIterator([]string{"a.jpg", "b.jpg", "c.jpg"})

    results := FanOut(
        LoadImages(paths),
        runtime.NumCPU(),
        expensiveTransform,
    )

    for r := range results {
        fmt.Printf("处理完成: %s\n", r.Path)
    }
}

并发迭代器架构

               ┌──────────┐
               │ Seq[V]   │
               │ (输入源)  │
               └────┬─────┘
                    │
              ┌─────▼─────┐
              │ inputCh   │
              └─────┬─────┘
                    │
        ┌───────────┼───────────┐
        │           │           │
   ┌────▼───┐  ┌───▼────┐  ┌──▼─────┐
   │Worker 1│  │Worker 2│  │Worker N│
   │ fn(v)  │  │ fn(v)  │  │ fn(v)  │
   └────┬───┘  └───┬────┘  └──┬─────┘
        │          │          │
        └───────────┼──────────┘
                    │
              ┌─────▼─────┐
              │ outputCh  │
              └─────┬─────┘
                    │
              ┌─────▼─────┐
              │ yield(U)  │
              │ (消费者)   │
              └───────────┘

Pattern 6:迭代器错误处理

问题:迭代器中错误被吞掉

func ReadRecords(path string) iter.Seq[Record] {
    return func(yield func(Record) bool) {
        file, _ := os.Open(path)
        defer file.Close()

        decoder := json.NewDecoder(file)
        for decoder.More() {
            var r Record
            if err := decoder.Decode(&r); err != nil {
                return
            }
            if !yield(r) {
                return
            }
        }
    }
}

decoder.Decode出错时,错误信息完全丢失。调用者不知道是正常结束还是出错了。

解决方案:带错误的迭代器

package itererr

import "iter"

type Result[V any] struct {
    Value V
    Err   error
}

func SeqWithError[V any](seq iter.Seq[Result[V]]) (iter.Seq[V], *error) {
    var firstErr error
    values := func(yield func(V) bool) {
        for r := range seq {
            if r.Err != nil {
                if firstErr == nil {
                    firstErr = r.Err
                }
                return
            }
            if !yield(r.Value) {
                return
            }
        }
    }
    return values, &firstErr
}

func Wrap[V any](seq iter.Seq[V], errPtr *error) iter.Seq[Result[V]] {
    return func(yield func(Result[V]) bool) {
        for v := range seq {
            if *errPtr != nil {
                return
            }
            if !yield(Result[V]{Value: v}) {
                return
            }
        }
    }
}

实际应用:数据库行迭代器

package dbiter

import (
    "database/sql"
    "iter"
)

type RowResult[T any] struct {
    Value T
    Err   error
}

func QueryRows[T any](db *sql.DB, query string, scan func(*sql.Rows) (T, error)) iter.Seq[RowResult[T]] {
    return func(yield func(RowResult[T]) bool) {
        rows, err := db.Query(query)
        if err != nil {
            yield(RowResult[T]{Err: err})
            return
        }
        defer rows.Close()

        for rows.Next() {
            v, err := scan(rows)
            if err != nil {
                yield(RowResult[T]{Err: err})
                return
            }
            if !yield(RowResult[T]{Value: v}) {
                return
            }
        }
        if err := rows.Err(); err != nil {
            yield(RowResult[T]{Err: err})
        }
    }
}
type Product struct {
    ID    int
    Name  string
    Price float64
}

func main() {
    db, _ := sql.Open("postgres", "dsn")

    products := QueryRows(db,
        "SELECT id, name, price FROM products",
        func(rows *sql.Rows) (Product, error) {
            var p Product
            err := rows.Scan(&p.ID, &p.Name, &p.Price)
            return p, err
        },
    )

    for r := range products {
        if r.Err != nil {
            log.Printf("迭代出错: %v", r.Err)
            break
        }
        fmt.Printf("%s: $%.2f\n", r.Value.Name, r.Value.Price)
    }
}

错误传播管道

func SafeMap[V any, U any](seq iter.Seq[RowResult[V]], fn func(V) (U, error)) iter.Seq[RowResult[U]] {
    return func(yield func(RowResult[U]) bool) {
        for r := range seq {
            if r.Err != nil {
                if !yield(RowResult[U]{Err: r.Err}) {
                    return
                }
                return
            }
            u, err := fn(r.Value)
            if err != nil {
                if !yield(RowResult[U]{Err: err}) {
                    return
                }
                return
            }
            if !yield(RowResult[U]{Value: u}) {
                return
            }
        }
    }
}

func SafeFilter[V any](seq iter.Seq[RowResult[V]], pred func(V) (bool, error)) iter.Seq[RowResult[V]] {
    return func(yield func(RowResult[V]) bool) {
        for r := range seq {
            if r.Err != nil {
                if !yield(r) {
                    return
                }
                return
            }
            ok, err := pred(r.Value)
            if err != nil {
                if !yield(RowResult[V]{Err: err}) {
                    return
                }
                return
            }
            if ok {
                if !yield(r) {
                    return
                }
            }
        }
    }
}

Pattern 7:生产级迭代器库设计

设计原则

┌─────────────────────────────────────────────┐
│           生产级迭代器库设计原则               │
├─────────────────────────────────────────────┤
│  1. 零分配:管道组合不产生中间切片             │
│  2. 可组合:所有操作返回iter.Seq              │
│  3. 可终止:yield返回false时立即释放资源       │
│  4. 可观测:支持错误传播和指标收集             │
│  5. 可测试:纯函数,无副作用                  │
└─────────────────────────────────────────────┘

完整迭代器工具库

package itool

import "iter"

type Seq[V any] = iter.Seq[V]

type Seq2[K any, V any] = iter.Seq2[K, V]

func FromSlice[V any](s []V) Seq[V] {
    return func(yield func(V) bool) {
        for _, v := range s {
            if !yield(v) {
                return
            }
        }
    }
}

func FromMap[K comparable, V any](m map[K]V) Seq2[K, V] {
    return func(yield func(K, V) bool) {
        for k, v := range m {
            if !yield(k, v) {
                return
            }
        }
    }
}

func FromChannel[V any](ch <-chan V) Seq[V] {
    return func(yield func(V) bool) {
        for v := range ch {
            if !yield(v) {
                return
            }
        }
    }
}

func Generate[V any](fn func() (V, bool)) Seq[V] {
    return func(yield func(V) bool) {
        for {
            v, ok := fn()
            if !ok || !yield(v) {
                return
            }
        }
    }
}

func Concat[V any](seqs ...Seq[V]) Seq[V] {
    return func(yield func(V) bool) {
        for _, seq := range seqs {
            for v := range seq {
                if !yield(v) {
                    return
                }
            }
        }
    }
}

func Distinct[V comparable](seq Seq[V]) Seq[V] {
    return func(yield func(V) bool) {
        seen := make(map[V]bool)
        for v := range seq {
            if !seen[v] {
                seen[v] = true
                if !yield(v) {
                    return
                }
            }
        }
    }
}

func Reverse[V any](seq Seq[V]) Seq[V] {
    return func(yield func(V) bool) {
        var items []V
        for v := range seq {
            items = append(items, v)
        }
        for i := len(items) - 1; i >= 0; i-- {
            if !yield(items[i]) {
                return
            }
        }
    }
}

func Skip[V any](seq Seq[V], n int) Seq[V] {
    return func(yield func(V) bool) {
        i := 0
        for v := range seq {
            if i >= n {
                if !yield(v) {
                    return
                }
            }
            i++
        }
    }
}

func TakeWhile[V any](seq Seq[V], pred func(V) bool) Seq[V] {
    return func(yield func(V) bool) {
        for v := range seq {
            if !pred(v) {
                return
            }
            if !yield(v) {
                return
            }
        }
    }
}

func SkipWhile[V any](seq Seq[V], pred func(V) bool) Seq[V] {
    return func(yield func(V) bool) {
        skipping := true
        for v := range seq {
            if skipping {
                if pred(v) {
                    continue
                }
                skipping = false
            }
            if !yield(v) {
                return
            }
        }
    }
}

func Count[V any](seq Seq[V]) int {
    n := 0
    for range seq {
        n++
    }
    return n
}

func Any[V any](seq Seq[V], pred func(V) bool) bool {
    for v := range seq {
        if pred(v) {
            return true
        }
    }
    return false
}

func All[V any](seq Seq[V], pred func(V) bool) bool {
    for v := range seq {
        if !pred(v) {
            return false
        }
    }
    return true
}

func ForEach[V any](seq Seq[V], fn func(V)) {
    for v := range seq {
        fn(v)
    }
}

func ToSlice[V any](seq Seq[V]) []V {
    var result []V
    for v := range seq {
        result = append(result, v)
    }
    return result
}

func ToMap[K comparable, V any](seq Seq2[K, V]) map[K]V {
    result := make(map[K]V)
    for k, v := range seq {
        result[k] = v
    }
    return result
}

func GroupBy[K comparable, V any](seq Seq2[K, V]) map[K][]V {
    result := make(map[K][]V)
    for k, v := range seq {
        result[k] = append(result[k], v)
    }
    return result
}

使用示例

func main() {
    nums := FromSlice([]int{1, 2, 3, 4, 5, 4, 3, 2, 1})

    result := ToSlice(
        Distinct(
            Filter(
                Map(nums, func(n int) int { return n * 2 }),
                func(n int) bool { return n > 4 },
            ),
        ),
    )

    fmt.Println(result)

    evenCount := Count(Filter(FromSlice([]int{1, 2, 3, 4, 5, 6}), func(n int) bool {
        return n%2 == 0
    }))
    fmt.Println("偶数个数:", evenCount)

    hasNegative := Any(FromSlice([]int{1, 2, 3}), func(n int) bool {
        return n < 0
    })
    fmt.Println("有负数:", hasNegative)
}

5个常见坑及解决方案

坑1:迭代器中捕获循环变量

func BuggyFactory() []iter.Seq[int] {
    var seqs []iter.Seq[int]
    for i := 0; i < 3; i++ {
        seqs = append(seqs, func(yield func(int) bool) {
            yield(i)
        })
    }
    return seqs
}

所有迭代器都返回3。i被闭包捕获,循环结束时值为3。

修复

func FixedFactory() []iter.Seq[int] {
    var seqs []iter.Seq[int]
    for i := 0; i < 3; i++ {
        i := i
        seqs = append(seqs, func(yield func(int) bool) {
            yield(i)
        })
    }
    return seqs
}

坑2:忘记调用stop导致资源泄漏

next, stop := iter.Pull(FileLines("big.log"))
v, ok := next()
fmt.Println(v)

文件永远不会关闭。

修复

next, stop := iter.Pull(FileLines("big.log"))
defer stop()
v, ok := next()
fmt.Println(v)

坑3:迭代器不是可重入的

seq := Fibonacci()
for v := range Take(seq, 5) {
    fmt.Println(v)
}
for v := range Take(seq, 5) {
    fmt.Println(v)
}

第二次range不会输出任何值。迭代器是一次性的。

修复

fibFactory := func() iter.Seq[int] { return Fibonacci() }

for v := range Take(fibFactory(), 5) {
    fmt.Println(v)
}
for v := range Take(fibFactory(), 5) {
    fmt.Println(v)
}

坑4:在迭代器中panic无法被外部recover

func RiskySeq() iter.Seq[int] {
    return func(yield func(int) bool) {
        panic("oops")
    }
}

func main() {
    defer func() {
        if r := recover(); r != nil {
            fmt.Println("recovered:", r)
        }
    }()
    for v := range RiskySeq() {
        fmt.Println(v)
    }
}

Go 1.24中,range over func的panic可以被外部recover。但不要依赖这个行为,迭代器应该自己处理错误。

坑5:并发range同一个迭代器

seq := FromSlice([]int{1, 2, 3, 4, 5})

var wg sync.WaitGroup
for i := 0; i < 3; i++ {
    wg.Add(1)
    go func() {
        defer wg.Done()
        for v := range seq {
            fmt.Println(v)
        }
    }()
}
wg.Wait()

iter.Seq不是并发安全的。多个goroutine同时range会导致数据竞争。

修复:使用Fan-out模式,或为每个goroutine创建独立迭代器。


10个常见报错排查

报错 原因 解决方案
cannot range over seq (variable of type func(yield func(int) bool)) 函数签名不匹配iter.Seq 确保签名是func(yield func(V) bool)
cannot use function as type iter.Seq[int] yield函数参数类型不匹配 检查yield参数类型与Seq的类型参数一致
iter.Pull: iterator did not call stop Pull迭代器未调用stop 始终defer stop()
panic: range over func: yield called after return yield在迭代器return后被调用 检查goroutine中是否延迟调用了yield
deadlock Fan-out中inputCh/outputCh未关闭 确保所有goroutine退出后close channel
data race 多goroutine同时range同一Seq 每个goroutine使用独立迭代器
out of memory 无限迭代器未配合Take使用 始终对无限序列使用Take/Skip限制
goroutine leak 迭代器中启动的goroutine未退出 使用context或done channel控制退出
unexpected EOF during iteration 文件迭代器中文件被外部修改 加文件锁或使用快照
yield returns false but iteration continues 未检查yield返回值 每次yield后检查返回值,false则return

进阶优化技巧

技巧1:预分配减少GC压力

func ToSlicePrealloc[V any](seq Seq[V], hint int) []V {
    result := make([]V, 0, hint)
    for v := range seq {
        result = append(result, v)
    }
    return result[:len(result)]
}

知道大致数量时,预分配避免多次扩容。

技巧2:批处理迭代器减少系统调用

func Batched[V any](seq Seq[V], batchSize int) Seq[[]V] {
    return func(yield func([]V) bool) {
        batch := make([]V, 0, batchSize)
        for v := range seq {
            batch = append(batch, v)
            if len(batch) == batchSize {
                if !yield(batch) {
                    return
                }
                batch = make([]V, 0, batchSize)
            }
        }
        if len(batch) > 0 {
            yield(batch)
        }
    }
}

数据库批量插入时,每100条提交一次,减少网络往返。

技巧3:迭代器与context结合实现超时控制

func WithContext[V any](ctx context.Context, seq Seq[V]) Seq[V] {
    return func(yield func(V) bool) {
        for v := range seq {
            select {
            case <-ctx.Done():
                return
            default:
                if !yield(v) {
                    return
                }
            }
        }
    }
}
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

for v := range WithContext(ctx, SlowIterator()) {
    fmt.Println(v)
}

对比分析:迭代器 vs 通道 vs 切片

维度 迭代器 (iter.Seq) 通道 (chan) 切片 ([]T)
内存占用 O(1) O(n) buffer O(n)
惰性求值 支持 不支持 不支持
无限序列 支持 不支持 不支持
并发安全
组合性 极好(函数链) 中等(需goroutine) 差(中间切片)
错误处理 需要封装 天然支持 直接返回error
可重入
性能 零分配 有锁开销 拷贝开销
适用场景 数据管道、惰性计算 并发通信 小数据集、随机访问
Go版本要求 1.24+ 1.0+ 1.0+
调试难度 中等 较高

选择决策树

需要惰性求值或无限序列?
├── 是 → 迭代器
└── 否
    ├── 需要并发通信?
    │   └── 是 → 通道
    └── 否
        ├── 数据量小且需要随机访问?
        │   └── 是 → 切片
        └── 否 → 迭代器

在线工具推荐

外部参考


总结

Go 1.24迭代器模式让Go拥有了原生的、零分配的、可组合的数据管道能力。7种核心模式覆盖了从基础遍历到生产级库设计的完整链路:

  1. range over func基础迭代器 — 一切迭代的起点,yield控制流
  2. Push/Pull迭代器转换 — iter.Pull让迭代器可手动控制
  3. 数据管道组合 — Map/Filter/Reduce链式组合,零中间分配
  4. 惰性求值与无限序列 — 按需计算,处理无限数据流
  5. 并发迭代器与Fan-out — 多goroutine并行消费
  6. 迭代器错误处理 — RowResult模式优雅传播错误
  7. 生产级迭代器库设计 — 零分配、可组合、可终止

迭代器不是通道的替代品,也不是切片的替代品。它们是Go数据处理工具箱中的新成员——当你需要惰性求值、零分配管道组合时,迭代器是最佳选择。

相关阅读

本站提供浏览器本地工具,免注册即可试用 →

#Go#迭代器#Iterator#range over func#数据管道#Go 1.24#2026#编程语言