Go 1.24迭代器模式:從range over func到資料管道的7種生產模式
當for循環遇上函數:Go迭代器的範式轉移
上週重構資料處理服務,一個3層嵌套的for循環處理10萬條記錄,記憶體飆到2GB——因為每層都要把中間結果存成切片再傳給下一層。改成迭代器管道後,記憶體降到15MB,處理速度反而快了30%。關鍵轉變:不再「先收集再處理」,而是「邊遍歷邊處理」。
Go 1.24正式穩定了range over func語法和iter包,讓Go擁有了原生的、零分配的、可組合的迭代器。這不是語法糖,而是資料處理範式的根本轉變。本文將從7種生產級Go迭代器模式出發,幫你構建高效、優雅、可組合的資料管道。
核心要點
- range over func是迭代器的核心語法:Go 1.24讓函數直接成為可range的對象
- Push/Pull迭代器轉換:理解迭代器的兩種方向,掌握
iter.Pull轉換 - 資料管道組合:Map/Filter/Reduce鏈式組合,零中間分配
- 惰性求值與無限序列:按需計算,處理無限資料流
- 並發迭代器與Fan-out:多goroutine並行消費迭代器
- 迭代器錯誤處理:優雅處理迭代過程中的錯誤
- 生產級迭代器庫設計:構建可複用的迭代器工具庫
目錄
- Go迭代器核心概念速查表
- Pattern 1:range over func基礎迭代器
- Pattern 2:Push/Pull迭代器轉換
- Pattern 3:資料管道組合(Map/Filter/Reduce)
- Pattern 4:惰性求值與無限序列
- Pattern 5:並發迭代器與Fan-out
- Pattern 6:迭代器錯誤處理
- Pattern 7:生產級迭代器庫設計
- 5個常見坑及解決方案
- 10個常見報錯排查
- 進階優化技巧
- 對比分析:迭代器vs通道vs切片
- 在線工具推薦
- 總結
Go迭代器核心概念速查表
| 概念 | 簽名 | 用途 | 範例 |
|---|---|---|---|
| 迭代器函數 | func(yield func(V) bool) |
單值迭代器 | func(yield func(int) bool) |
| 鍵值迭代器 | func(yield func(K, V) bool) |
鍵值對迭代 | func(yield func(int, string) bool) |
| Pull迭代器 | func() (V, bool) |
按需拉取 | next, stop := iter.Pull(seq) |
| iter.Pull | func(Seq[V]) (func() (V, bool), func()) |
Push轉Pull | 消費者驅動遍歷 |
| iter.Stop | 內建stop函數 | 提前終止迭代 | stop() 釋放資源 |
| yield返回值 | bool |
控制迭代繼續/停止 | yield(v) 返回false則停止 |
| 惰性求值 | 延遲計算 | 按需生成值 | 無限序列、檔案行 |
| 管道組合 | 函數鏈式呼叫 | 零中間分配 | Filter(Map(Seq, fn), pred) |
Pattern 1:range over func基礎迭代器
問題:傳統遍歷的記憶體陷阱
func GetAllUsers(db *sql.DB) ([]User, error) {
rows, err := db.Query("SELECT id, name, email FROM users")
if err != nil {
return nil, err
}
defer rows.Close()
var users []User
for rows.Next() {
var u User
if err := rows.Scan(&u.ID, &u.Name, &u.Email); err != nil {
return nil, err
}
users = append(users, u)
}
return users, rows.Err()
}
100萬用戶?100萬條User結構體全部載入到記憶體。你只需要前10條?不好意思,先全部載入。
解決方案:range over func迭代器
package iterator
import (
"database/sql"
"iter"
)
type User struct {
ID int
Name string
Email string
}
func AllUsers(db *sql.DB) iter.Seq2[int, User] {
return func(yield func(int, User) bool) {
rows, err := db.Query("SELECT id, name, email FROM users")
if err != nil {
return
}
defer rows.Close()
i := 0
for rows.Next() {
var u User
if err := rows.Scan(&u.ID, &u.Name, &u.Email); err != nil {
return
}
if !yield(i, u) {
return
}
i++
}
}
}
使用方式:
for i, user := range AllUsers(db) {
fmt.Printf("%d: %s\n", i, user.Name)
if i >= 9 {
break
}
}
break時,yield返回false,迭代器函數直接return——只查詢了10條,資料庫連接正常關閉。
迭代器執行流程
┌─────────────┐ yield(v) ┌──────────────┐
│ 迭代器函數 │ ──────────────→ │ range循環 │
│ (producer) │ │ (consumer) │
│ │ ←────────────── │ │
│ │ yield返回bool │ │
└─────────────┘ └──────────────┘
│ │
│ yield返回false → return │
│ (提前終止) │
└──────────────────────────────────┘
單值迭代器 vs 鍵值迭代器
type IntSlice []int
func (s IntSlice) Values() iter.Seq[int] {
return func(yield func(int) bool) {
for _, v := range s {
if !yield(v) {
return
}
}
}
}
func (s IntSlice) All() iter.Seq2[int, int] {
return func(yield func(int, int) bool) {
for i, v := range s {
if !yield(i, v) {
return
}
}
}
}
nums := IntSlice{10, 20, 30}
for v := range nums.Values() {
fmt.Println(v)
}
for i, v := range nums.All() {
fmt.Printf("index=%d value=%d\n", i, v)
}
Pattern 2:Push/Pull迭代器轉換
Push迭代器與Pull迭代器的區別
Push迭代器 (iter.Seq) Pull迭代器 (func() (V, bool))
┌──────────────────┐ ┌──────────────────┐
│ 生產者主動推送 │ │ 消費者主動拉取 │
│ yield(v) → 消費者 │ │ next() → 生產者 │
│ │ │ │
│ 適合:range遍歷 │ │ 適合:手動控制 │
│ 適合:管道組合 │ │ 適合:提前查看 │
│ 適合:惰性求值 │ │ 適合:互操作 │
└──────────────────┘ └──────────────────┘
│ │
│ iter.Pull() 轉換 │
└──────────────────────────────────┘
使用iter.Pull轉換
package main
import (
"fmt"
"iter"
)
func Countdown(n int) iter.Seq[int] {
return func(yield func(int) bool) {
for i := n; i > 0; i-- {
if !yield(i) {
return
}
}
}
}
func main() {
next, stop := iter.Pull(Countdown(5))
defer stop()
for {
v, ok := next()
if !ok {
break
}
fmt.Println(v)
if v == 3 {
fmt.Println("提前終止")
break
}
}
}
Pull迭代器的實際應用:Peek和Take
package iterutil
import "iter"
func Take[V any](seq iter.Seq[V], n int) iter.Seq[V] {
return func(yield func(V) bool) {
count := 0
for v := range seq {
if count >= n {
return
}
if !yield(v) {
return
}
count++
}
}
}
func First[V any](seq iter.Seq[V]) (V, bool) {
next, stop := iter.Pull(seq)
defer stop()
return next()
}
func PeekN[V any](seq iter.Seq[V], n int) []V {
result := make([]V, 0, n)
next, stop := iter.Pull(seq)
defer stop()
for i := 0; i < n; i++ {
v, ok := next()
if !ok {
break
}
result = append(result, v)
}
return result
}
nums := Countdown(100)
fmt.Println(First(nums))
fmt.Println(PeekN(nums, 5))
重要:iter.Pull的資源清理
func ProcessLines(filename string) iter.Seq[string] {
return func(yield func(string) bool) {
file, err := os.Open(filename)
if err != nil {
return
}
defer file.Close()
scanner := bufio.NewScanner(file)
for scanner.Scan() {
if !yield(scanner.Text()) {
return
}
}
}
}
func main() {
next, stop := iter.Pull(ProcessLines("huge.log"))
defer stop()
v, ok := next()
if ok {
fmt.Println("第一行:", v)
}
}
defer stop()確保即使只消費一個值,檔案也會被正確關閉。
Pattern 3:資料管道組合(Map/Filter/Reduce)
問題:嵌套循環與中間切片
func ProcessOrders(orders []Order) float64 {
var active []Order
for _, o := range orders {
if o.Status == "active" {
active = append(active, o)
}
}
var amounts []float64
for _, o := range active {
amounts = append(amounts, o.Amount*1.1)
}
var total float64
for _, a := range amounts {
total += a
}
return total
}
3次遍歷,2個中間切片。資料量大時,記憶體和GC壓力巨大。
解決方案:迭代器管道
package pipeline
import "iter"
func Map[V any, U any](seq iter.Seq[V], fn func(V) U) iter.Seq[U] {
return func(yield func(U) bool) {
for v := range seq {
if !yield(fn(v)) {
return
}
}
}
}
func Map2[K any, V any, U any](seq iter.Seq2[K, V], fn func(K, V) U) iter.Seq[U] {
return func(yield func(U) bool) {
for k, v := range seq {
if !yield(fn(k, v)) {
return
}
}
}
}
func Filter[V any](seq iter.Seq[V], pred func(V) bool) iter.Seq[V] {
return func(yield func(V) bool) {
for v := range seq {
if pred(v) {
if !yield(v) {
return
}
}
}
}
}
func Filter2[K any, V any](seq iter.Seq2[K, V], pred func(K, V) bool) iter.Seq2[K, V] {
return func(yield func(K, V) bool) {
for k, v := range seq {
if pred(k, v) {
if !yield(k, v) {
return
}
}
}
}
}
func Reduce[V any, U any](seq iter.Seq[V], init U, fn func(U, V) U) U {
acc := init
for v := range seq {
acc = fn(acc, v)
}
return acc
}
使用管道組合
type Order struct {
ID int
Status string
Amount float64
}
func OrdersFromDB(db *sql.DB) iter.Seq[Order] {
return func(yield func(Order) bool) {
rows, _ := db.Query("SELECT id, status, amount FROM orders")
if rows != nil {
defer rows.Close()
for rows.Next() {
var o Order
rows.Scan(&o.ID, &o.Status, &o.Amount)
if !yield(o) {
return
}
}
}
}
}
func main() {
db, _ := sql.Open("postgres", "dsn")
total := Reduce(
Map(
Filter(
OrdersFromDB(db),
func(o Order) bool { return o.Status == "active" },
),
func(o Order) float64 { return o.Amount * 1.1 },
),
0.0,
func(acc float64, v float64) float64 { return acc + v },
)
fmt.Printf("總金額: %.2f\n", total)
}
零中間分配:Filter、Map、Reduce串聯,每個元素只經過一次處理。
管道執行流程
OrdersFromDB → Filter(active) → Map(×1.1) → Reduce(+) → total
│ │ │ │
│ Order{1, │ Status== │ Amount*1.1 │ acc+v
│ "active", │ "active"? │ │
│ 100.0} │ │ │
│ ──────→ │ ✓ 通過 │ │
│ │ ──────→ │ 110.0 │
│ │ │ ──────→ │ 110.0
│
│ Order{2, │ Status!= │ │
│ "closed", │ "active" │ │
│ 200.0} │ ✗ 過濾掉 │ │
│ ──────→ │ 跳過 │ │
更多管道操作符
func FlatMap[V any, U any](seq iter.Seq[V], fn func(V) iter.Seq[U]) iter.Seq[U] {
return func(yield func(U) bool) {
for v := range seq {
for u := range fn(v) {
if !yield(u) {
return
}
}
}
}
}
func Zip[V any, U any](seq1 iter.Seq[V], seq2 iter.Seq[U]) iter.Seq2[V, U] {
return func(yield func(V, U) bool) {
next1, stop1 := iter.Pull(seq1)
defer stop1()
next2, stop2 := iter.Pull(seq2)
defer stop2()
for {
v1, ok1 := next1()
v2, ok2 := next2()
if !ok1 || !ok2 {
return
}
if !yield(v1, v2) {
return
}
}
}
}
func Enumerate[V any](seq iter.Seq[V]) iter.Seq2[int, V] {
return func(yield func(int, V) bool) {
i := 0
for v := range seq {
if !yield(i, v) {
return
}
i++
}
}
}
func Chunk[V any](seq iter.Seq[V], size int) iter.Seq[[]V] {
return func(yield func([]V) bool) {
chunk := make([]V, 0, size)
for v := range seq {
chunk = append(chunk, v)
if len(chunk) == size {
if !yield(chunk) {
return
}
chunk = make([]V, 0, size)
}
}
if len(chunk) > 0 {
yield(chunk)
}
}
}
Pattern 4:惰性求值與無限序列
問題:預計算全部結果的浪費
func Fibonacci(n int) []int {
result := make([]int, n)
if n > 0 {
result[0] = 0
}
if n > 1 {
result[1] = 1
}
for i := 2; i < n; i++ {
result[i] = result[i-1] + result[i-2]
}
return result
}
需要前10個斐波那契數?必須指定n。不知道要多少?只能先算一個「足夠大」的值。
解決方案:無限迭代器 + 惰性求值
package lazy
import "iter"
func Fibonacci() iter.Seq[int] {
return func(yield func(int) bool) {
a, b := 0, 1
for {
if !yield(a) {
return
}
a, b = b, a+b
}
}
}
func NaturalNumbers() iter.Seq[int] {
return func(yield func(int) bool) {
for i := 0; ; i++ {
if !yield(i) {
return
}
}
}
}
func Repeat[V any](v V) iter.Seq[V] {
return func(yield func(V) bool) {
for {
if !yield(v) {
return
}
}
}
}
func Iterate[V any](init V, fn func(V) V) iter.Seq[V] {
return func(yield func(V) bool) {
v := init
for {
if !yield(v) {
return
}
v = fn(v)
}
}
}
func Cycle[V any](seq iter.Seq[V]) iter.Seq[V] {
return func(yield func(V) bool) {
for {
for v := range seq {
if !yield(v) {
return
}
}
}
}
}
使用惰性序列
func main() {
for v := range Take(Fibonacci(), 10) {
fmt.Print(v, " ")
}
fmt.Println()
squares := Map(
Take(NaturalNumbers(), 5),
func(n int) int { return n * n },
)
for v := range squares {
fmt.Print(v, " ")
}
fmt.Println()
powersOf2 := Iterate(1, func(v int) int { return v * 2 })
for v := range Take(powersOf2, 8) {
fmt.Print(v, " ")
}
fmt.Println()
}
惰性檔案處理
func FileLines(path string) iter.Seq[string] {
return func(yield func(string) bool) {
f, err := os.Open(path)
if err != nil {
return
}
defer f.Close()
scanner := bufio.NewScanner(f)
for scanner.Scan() {
if !yield(scanner.Text()) {
return
}
}
}
}
func Grep(pattern string, lines iter.Seq[string]) iter.Seq[string] {
re := regexp.MustCompile(pattern)
return Filter(lines, func(line string) bool {
return re.MatchString(line)
})
}
func main() {
errors := Grep("ERROR", FileLines("/var/log/app.log"))
for line := range Take(errors, 100) {
fmt.Println(line)
}
}
10GB日誌檔案?只讀前100條ERROR行,記憶體佔用幾乎為零。
Pattern 5:並發迭代器與Fan-out
問題:單線程迭代器的效能瓶頸
func ProcessImages(images iter.Seq[Image]) []Result {
var results []Result
for img := range images {
r := expensiveTransform(img)
results = append(results, r)
}
return results
}
1000張圖片,每張處理100ms,總計100秒。CPU利用率只有12.5%(8核只用1核)。
解決方案:Fan-out並發迭代器
package concurrent
import (
"iter"
"sync"
)
func FanOut[V any, U any](seq iter.Seq[V], workers int, fn func(V) U) iter.Seq[U] {
return func(yield func(U) bool) {
inputCh := make(chan V)
outputCh := make(chan U)
var wg sync.WaitGroup
for i := 0; i < workers; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for v := range inputCh {
outputCh <- fn(v)
}
}()
}
go func() {
for v := range seq {
inputCh <- v
}
close(inputCh)
wg.Wait()
close(outputCh)
}()
for u := range outputCh {
if !yield(u) {
return
}
}
}
}
func FanOutOrdered[V any, U any](seq iter.Seq[V], workers int, fn func(V) U) iter.Seq[U] {
return func(yield func(U) bool) {
type indexedResult struct {
index int
value U
}
next, stop := iter.Pull(seq)
defer stop()
type indexedInput[V any] struct {
index int
value V
}
inputCh := make(chan indexedInput[V], workers)
outputCh := make(chan indexedResult, workers)
var wg sync.WaitGroup
for i := 0; i < workers; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for inp := range inputCh {
outputCh <- indexedResult{
index: inp.index,
value: fn(inp.value),
}
}
}()
}
go func() {
idx := 0
for {
v, ok := next()
if !ok {
break
}
inputCh <- indexedInput[V]{index: idx, value: v}
idx++
}
close(inputCh)
wg.Wait()
close(outputCh)
}()
results := make(map[int]U)
nextIdx := 0
for res := range outputCh {
results[res.index] = res.value
for {
r, ok := results[nextIdx]
if !ok {
break
}
delete(results, nextIdx)
if !yield(r) {
return
}
nextIdx++
}
}
}
}
使用並發迭代器
type Image struct {
Path string
Data []byte
}
type Result struct {
Path string
Thumbnail []byte
}
func LoadImages(paths iter.Seq[string]) iter.Seq[Image] {
return Map(paths, func(p string) Image {
data, _ := os.ReadFile(p)
return Image{Path: p, Data: data}
})
}
func expensiveTransform(img Image) Result {
thumbnail := resizeImage(img.Data, 100, 100)
return Result{Path: img.Path, Thumbnail: thumbnail}
}
func main() {
paths := SliceIterator([]string{"a.jpg", "b.jpg", "c.jpg"})
results := FanOut(
LoadImages(paths),
runtime.NumCPU(),
expensiveTransform,
)
for r := range results {
fmt.Printf("處理完成: %s\n", r.Path)
}
}
並發迭代器架構
┌──────────┐
│ Seq[V] │
│ (輸入源) │
└────┬─────┘
│
┌─────▼─────┐
│ inputCh │
└─────┬─────┘
│
┌───────────┼───────────┐
│ │ │
┌────▼───┐ ┌───▼────┐ ┌──▼─────┐
│Worker 1│ │Worker 2│ │Worker N│
│ fn(v) │ │ fn(v) │ │ fn(v) │
└────┬───┘ └───┬────┘ └──┬─────┘
│ │ │
└───────────┼──────────┘
│
┌─────▼─────┐
│ outputCh │
└─────┬─────┘
│
┌─────▼─────┐
│ yield(U) │
│ (消費者) │
└───────────┘
Pattern 6:迭代器錯誤處理
問題:迭代器中錯誤被吞掉
func ReadRecords(path string) iter.Seq[Record] {
return func(yield func(Record) bool) {
file, _ := os.Open(path)
defer file.Close()
decoder := json.NewDecoder(file)
for decoder.More() {
var r Record
if err := decoder.Decode(&r); err != nil {
return
}
if !yield(r) {
return
}
}
}
}
decoder.Decode出錯時,錯誤資訊完全丟失。呼叫者不知道是正常結束還是出錯了。
解決方案:帶錯誤的迭代器
package itererr
import "iter"
type Result[V any] struct {
Value V
Err error
}
func SeqWithError[V any](seq iter.Seq[Result[V]]) (iter.Seq[V], *error) {
var firstErr error
values := func(yield func(V) bool) {
for r := range seq {
if r.Err != nil {
if firstErr == nil {
firstErr = r.Err
}
return
}
if !yield(r.Value) {
return
}
}
}
return values, &firstErr
}
func Wrap[V any](seq iter.Seq[V], errPtr *error) iter.Seq[Result[V]] {
return func(yield func(Result[V]) bool) {
for v := range seq {
if *errPtr != nil {
return
}
if !yield(Result[V]{Value: v}) {
return
}
}
}
}
實際應用:資料庫行迭代器
package dbiter
import (
"database/sql"
"iter"
)
type RowResult[T any] struct {
Value T
Err error
}
func QueryRows[T any](db *sql.DB, query string, scan func(*sql.Rows) (T, error)) iter.Seq[RowResult[T]] {
return func(yield func(RowResult[T]) bool) {
rows, err := db.Query(query)
if err != nil {
yield(RowResult[T]{Err: err})
return
}
defer rows.Close()
for rows.Next() {
v, err := scan(rows)
if err != nil {
yield(RowResult[T]{Err: err})
return
}
if !yield(RowResult[T]{Value: v}) {
return
}
}
if err := rows.Err(); err != nil {
yield(RowResult[T]{Err: err})
}
}
}
type Product struct {
ID int
Name string
Price float64
}
func main() {
db, _ := sql.Open("postgres", "dsn")
products := QueryRows(db,
"SELECT id, name, price FROM products",
func(rows *sql.Rows) (Product, error) {
var p Product
err := rows.Scan(&p.ID, &p.Name, &p.Price)
return p, err
},
)
for r := range products {
if r.Err != nil {
log.Printf("迭代出錯: %v", r.Err)
break
}
fmt.Printf("%s: $%.2f\n", r.Value.Name, r.Value.Price)
}
}
錯誤傳播管道
func SafeMap[V any, U any](seq iter.Seq[RowResult[V]], fn func(V) (U, error)) iter.Seq[RowResult[U]] {
return func(yield func(RowResult[U]) bool) {
for r := range seq {
if r.Err != nil {
if !yield(RowResult[U]{Err: r.Err}) {
return
}
return
}
u, err := fn(r.Value)
if err != nil {
if !yield(RowResult[U]{Err: err}) {
return
}
return
}
if !yield(RowResult[U]{Value: u}) {
return
}
}
}
}
func SafeFilter[V any](seq iter.Seq[RowResult[V]], pred func(V) (bool, error)) iter.Seq[RowResult[V]] {
return func(yield func(RowResult[V]) bool) {
for r := range seq {
if r.Err != nil {
if !yield(r) {
return
}
return
}
ok, err := pred(r.Value)
if err != nil {
if !yield(RowResult[V]{Err: err}) {
return
}
return
}
if ok {
if !yield(r) {
return
}
}
}
}
}
Pattern 7:生產級迭代器庫設計
設計原則
┌─────────────────────────────────────────────┐
│ 生產級迭代器庫設計原則 │
├─────────────────────────────────────────────┤
│ 1. 零分配:管道組合不產生中間切片 │
│ 2. 可組合:所有操作返回iter.Seq │
│ 3. 可終止:yield返回false時立即釋放資源 │
│ 4. 可觀測:支援錯誤傳播和指標收集 │
│ 5. 可測試:純函數,無副作用 │
└─────────────────────────────────────────────┘
完整迭代器工具庫
package itool
import "iter"
type Seq[V any] = iter.Seq[V]
type Seq2[K any, V any] = iter.Seq2[K, V]
func FromSlice[V any](s []V) Seq[V] {
return func(yield func(V) bool) {
for _, v := range s {
if !yield(v) {
return
}
}
}
}
func FromMap[K comparable, V any](m map[K]V) Seq2[K, V] {
return func(yield func(K, V) bool) {
for k, v := range m {
if !yield(k, v) {
return
}
}
}
}
func FromChannel[V any](ch <-chan V) Seq[V] {
return func(yield func(V) bool) {
for v := range ch {
if !yield(v) {
return
}
}
}
}
func Generate[V any](fn func() (V, bool)) Seq[V] {
return func(yield func(V) bool) {
for {
v, ok := fn()
if !ok || !yield(v) {
return
}
}
}
}
func Concat[V any](seqs ...Seq[V]) Seq[V] {
return func(yield func(V) bool) {
for _, seq := range seqs {
for v := range seq {
if !yield(v) {
return
}
}
}
}
}
func Distinct[V comparable](seq Seq[V]) Seq[V] {
return func(yield func(V) bool) {
seen := make(map[V]bool)
for v := range seq {
if !seen[v] {
seen[v] = true
if !yield(v) {
return
}
}
}
}
}
func Reverse[V any](seq Seq[V]) Seq[V] {
return func(yield func(V) bool) {
var items []V
for v := range seq {
items = append(items, v)
}
for i := len(items) - 1; i >= 0; i-- {
if !yield(items[i]) {
return
}
}
}
}
func Skip[V any](seq Seq[V], n int) Seq[V] {
return func(yield func(V) bool) {
i := 0
for v := range seq {
if i >= n {
if !yield(v) {
return
}
}
i++
}
}
}
func TakeWhile[V any](seq Seq[V], pred func(V) bool) Seq[V] {
return func(yield func(V) bool) {
for v := range seq {
if !pred(v) {
return
}
if !yield(v) {
return
}
}
}
}
func SkipWhile[V any](seq Seq[V], pred func(V) bool) Seq[V] {
return func(yield func(V) bool) {
skipping := true
for v := range seq {
if skipping {
if pred(v) {
continue
}
skipping = false
}
if !yield(v) {
return
}
}
}
}
func Count[V any](seq Seq[V]) int {
n := 0
for range seq {
n++
}
return n
}
func Any[V any](seq Seq[V], pred func(V) bool) bool {
for v := range seq {
if pred(v) {
return true
}
}
return false
}
func All[V any](seq Seq[V], pred func(V) bool) bool {
for v := range seq {
if !pred(v) {
return false
}
}
return true
}
func ForEach[V any](seq Seq[V], fn func(V)) {
for v := range seq {
fn(v)
}
}
func ToSlice[V any](seq Seq[V]) []V {
var result []V
for v := range seq {
result = append(result, v)
}
return result
}
func ToMap[K comparable, V any](seq Seq2[K, V]) map[K]V {
result := make(map[K]V)
for k, v := range seq {
result[k] = v
}
return result
}
func GroupBy[K comparable, V any](seq Seq2[K, V]) map[K][]V {
result := make(map[K][]V)
for k, v := range seq {
result[k] = append(result[k], v)
}
return result
}
使用範例
func main() {
nums := FromSlice([]int{1, 2, 3, 4, 5, 4, 3, 2, 1})
result := ToSlice(
Distinct(
Filter(
Map(nums, func(n int) int { return n * 2 }),
func(n int) bool { return n > 4 },
),
),
)
fmt.Println(result)
evenCount := Count(Filter(FromSlice([]int{1, 2, 3, 4, 5, 6}), func(n int) bool {
return n%2 == 0
}))
fmt.Println("偶數個數:", evenCount)
hasNegative := Any(FromSlice([]int{1, 2, 3}), func(n int) bool {
return n < 0
})
fmt.Println("有負數:", hasNegative)
}
5個常見坑及解決方案
坑1:迭代器中捕獲循環變數
func BuggyFactory() []iter.Seq[int] {
var seqs []iter.Seq[int]
for i := 0; i < 3; i++ {
seqs = append(seqs, func(yield func(int) bool) {
yield(i)
})
}
return seqs
}
所有迭代器都返回3。i被閉包捕獲,循環結束時值為3。
修復:
func FixedFactory() []iter.Seq[int] {
var seqs []iter.Seq[int]
for i := 0; i < 3; i++ {
i := i
seqs = append(seqs, func(yield func(int) bool) {
yield(i)
})
}
return seqs
}
坑2:忘記呼叫stop導致資源洩漏
next, stop := iter.Pull(FileLines("big.log"))
v, ok := next()
fmt.Println(v)
檔案永遠不會關閉。
修復:
next, stop := iter.Pull(FileLines("big.log"))
defer stop()
v, ok := next()
fmt.Println(v)
坑3:迭代器不是可重入的
seq := Fibonacci()
for v := range Take(seq, 5) {
fmt.Println(v)
}
for v := range Take(seq, 5) {
fmt.Println(v)
}
第二次range不會輸出任何值。迭代器是一次性的。
修復:
fibFactory := func() iter.Seq[int] { return Fibonacci() }
for v := range Take(fibFactory(), 5) {
fmt.Println(v)
}
for v := range Take(fibFactory(), 5) {
fmt.Println(v)
}
坑4:在迭代器中panic無法被外部recover
func RiskySeq() iter.Seq[int] {
return func(yield func(int) bool) {
panic("oops")
}
}
func main() {
defer func() {
if r := recover(); r != nil {
fmt.Println("recovered:", r)
}
}()
for v := range RiskySeq() {
fmt.Println(v)
}
}
Go 1.24中,range over func的panic可以被外部recover。但不要依賴這個行為,迭代器應該自己處理錯誤。
坑5:並發range同一個迭代器
seq := FromSlice([]int{1, 2, 3, 4, 5})
var wg sync.WaitGroup
for i := 0; i < 3; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for v := range seq {
fmt.Println(v)
}
}()
}
wg.Wait()
iter.Seq不是並發安全的。多個goroutine同時range會導致資料競爭。
修復:使用Fan-out模式,或為每個goroutine建立獨立迭代器。
10個常見報錯排查
| 報錯 | 原因 | 解決方案 |
|---|---|---|
cannot range over seq (variable of type func(yield func(int) bool)) |
函數簽名不匹配iter.Seq | 確保簽名是func(yield func(V) bool) |
cannot use function as type iter.Seq[int] |
yield函數參數類型不匹配 | 檢查yield參數類型與Seq的類型參數一致 |
iter.Pull: iterator did not call stop |
Pull迭代器未呼叫stop | 始終defer stop() |
panic: range over func: yield called after return |
yield在迭代器return後被呼叫 | 檢查goroutine中是否延遲呼叫了yield |
deadlock |
Fan-out中inputCh/outputCh未關閉 | 確保所有goroutine退出後close channel |
data race |
多goroutine同時range同一Seq | 每個goroutine使用獨立迭代器 |
out of memory |
無限迭代器未配合Take使用 | 始終對無限序列使用Take/Skip限制 |
goroutine leak |
迭代器中啟動的goroutine未退出 | 使用context或done channel控制退出 |
unexpected EOF during iteration |
檔案迭代器中檔案被外部修改 | 加檔案鎖或使用快照 |
yield returns false but iteration continues |
未檢查yield返回值 | 每次yield後檢查返回值,false則return |
進階優化技巧
技巧1:預分配減少GC壓力
func ToSlicePrealloc[V any](seq Seq[V], hint int) []V {
result := make([]V, 0, hint)
for v := range seq {
result = append(result, v)
}
return result[:len(result)]
}
知道大致數量時,預分配避免多次擴容。
技巧2:批次處理迭代器減少系統呼叫
func Batched[V any](seq Seq[V], batchSize int) Seq[[]V] {
return func(yield func([]V) bool) {
batch := make([]V, 0, batchSize)
for v := range seq {
batch = append(batch, v)
if len(batch) == batchSize {
if !yield(batch) {
return
}
batch = make([]V, 0, batchSize)
}
}
if len(batch) > 0 {
yield(batch)
}
}
}
資料庫批次插入時,每100條提交一次,減少網路往返。
技巧3:迭代器與context結合實現超時控制
func WithContext[V any](ctx context.Context, seq Seq[V]) Seq[V] {
return func(yield func(V) bool) {
for v := range seq {
select {
case <-ctx.Done():
return
default:
if !yield(v) {
return
}
}
}
}
}
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
for v := range WithContext(ctx, SlowIterator()) {
fmt.Println(v)
}
對比分析:迭代器 vs 通道 vs 切片
| 維度 | 迭代器 (iter.Seq) | 通道 (chan) | 切片 ([]T) |
|---|---|---|---|
| 記憶體佔用 | O(1) | O(n) buffer | O(n) |
| 惰性求值 | 支援 | 不支援 | 不支援 |
| 無限序列 | 支援 | 不支援 | 不支援 |
| 並發安全 | 否 | 是 | 否 |
| 組合性 | 極好(函數鏈) | 中等(需goroutine) | 差(中間切片) |
| 錯誤處理 | 需要封裝 | 天然支援 | 直接返回error |
| 可重入 | 否 | 否 | 是 |
| 效能 | 零分配 | 有鎖開銷 | 拷貝開銷 |
| 適用場景 | 資料管道、惰性計算 | 並發通信 | 小資料集、隨機存取 |
| Go版本要求 | 1.24+ | 1.0+ | 1.0+ |
| 除錯難度 | 中等 | 較高 | 低 |
選擇決策樹
需要惰性求值或無限序列?
├── 是 → 迭代器
└── 否
├── 需要並發通信?
│ └── 是 → 通道
└── 否
├── 資料量小且需要隨機存取?
│ └── 是 → 切片
└── 否 → 迭代器
在線工具推薦
外部參考
- Go iter包官方文件 — Go標準庫iter包參考
- Go Range Over Func提案 — range over func設計文件
總結
Go 1.24迭代器模式讓Go擁有了原生的、零分配的、可組合的資料管道能力。7種核心模式涵蓋了從基礎遍歷到生產級庫設計的完整鏈路:
- range over func基礎迭代器 — 一切迭代的起點,yield控制流
- Push/Pull迭代器轉換 — iter.Pull讓迭代器可手動控制
- 資料管道組合 — Map/Filter/Reduce鏈式組合,零中間分配
- 惰性求值與無限序列 — 按需計算,處理無限資料流
- 並發迭代器與Fan-out — 多goroutine並行消費
- 迭代器錯誤處理 — RowResult模式優雅傳播錯誤
- 生產級迭代器庫設計 — 零分配、可組合、可終止
迭代器不是通道的替代品,也不是切片的替代品。它們是Go資料處理工具箱中的新成員——當你需要惰性求值、零分配管道組合時,迭代器是最佳選擇。
相關閱讀:
本站提供瀏覽器本地工具,免註冊即可試用 →