CockroachDB分布式SQL实战:2026年从MySQL在线迁移到NewSQL的完整方案

编程语言

MySQL分库分表已经让你精疲力竭了?

分片键选错导致热点、跨分片JOIN性能灾难、分布式事务靠消息队列勉强拼凑、扩容需要停机迁移数据——当你的MySQL集群规模达到TB级,分库分表的运维成本已经远超业务价值。2026年,NewSQL数据库给出了更优雅的答案:CockroachDB,一个原生分布式、强一致性、兼容PostgreSQL协议的SQL数据库。

本文将带你完成从MySQL到CockroachDB的在线零停机迁移,从架构原理到实操步骤,从分布式事务到多区域部署,一篇文章搞定。


CockroachDB架构核心

概念 说明 与MySQL对比
Range 数据按范围分片的基本单位(默认64MB) 类似MySQL的Partition,但自动管理
Replica 每个Range默认3副本,用Raft共识 MySQL MGR的类似机制,但更成熟
Lease Holder Range的读写领导者,处理本地读 类似MySQL的主节点,但粒度更细
DistSQL 分布式SQL引擎,自动并行化查询 MySQL需要手动分片+中间件
HLC 混合逻辑时钟,提供全局时序 MySQL依赖GTID,跨分片无全局时序

分布式事务ACID保证

CockroachDB使用Parallel Commits协议实现分布式事务,无需2PC的协调者单点:

事务流程:
1. 写入意图(Write Intent)到所有涉及Range
2. 并行发送PREPARE和COMMIT到所有Range
3. 任一节点崩溃时,其他节点通过Write Intent恢复
4. 事务延迟 = 1次网络往返(而非2PC的2次)

问题分析:MySQL迁移到CockroachDB的挑战

  1. SQL兼容性:CockroachDB兼容PostgreSQL协议,MySQL语法需要改写
  2. 自增ID:MySQL的AUTO_INCREMENT在分布式下不连续,需改用UUID或SEQUENCE
  3. 外键级联:CockroachDB支持外键但不推荐级联更新/删除(性能问题)
  4. 存储引擎差异:InnoDB的聚簇索引 vs CockroachDB的LSM-tree
  5. 字符集:MySQL的utf8mb4对应CockroachDB的UTF8(默认)

分步实操:MySQL在线迁移

Step 1:安装CockroachDB

# Linux
curl -L https://binaries.cockroachdb.com/cockroach-v24.3.0.linux-amd64.tgz | tar -xz
sudo cp cockroach-v24.3.0.linux-amd64/cockroach /usr/local/bin/

# Docker
docker pull cockroachdb/cockroach:v24.3.0

# 启动单节点(开发测试)
cockroach start-single-node --insecure --listen-addr=localhost:26257 --store=path=/data/cockroach

Step 2:启动生产级集群

# 节点1
cockroach start \
  --insecure \
  --store=path=/data/cockroach1 \
  --listen-addr=192.168.1.10:26257 \
  --http-addr=192.168.1.10:8080 \
  --join=192.168.1.10:26257,192.168.1.11:26257,192.168.1.12:26257 \
  --background

# 节点2、3类似,修改IP和store路径

# 初始化集群(仅首次)
cockroach init --insecure --host=192.168.1.10:26257

Step 3:使用MOLT迁移工具

MOLT(Migrate Off Legacy Technologies)是CockroachDB官方迁移工具,支持MySQL在线迁移。

# 安装MOLT
curl -L https://binaries.cockroachdb.com/molt/v1.0.0/molt_linux_amd64.tar.gz | tar -xz
sudo cp molt /usr/local/bin/

# Step 3.1: 评估迁移可行性
molt assess \
  --source-type=mysql \
  --source="root:password@tcp(mysql-host:3306)/mydb" \
  --target="postgresql://root@cockroach-host:26257/mydb?sslmode=disable"

# Step 3.2: 导出Schema
molt dump-schema \
  --source-type=mysql \
  --source="root:password@tcp(mysql-host:3306)/mydb" \
  --output-dir=./schema

# Step 3.3: 转换Schema(MySQL→CockroachDB)
molt convert-schema \
  --input-dir=./schema \
  --output-dir=./converted-schema

# Step 3.4: 应用Schema到CockroachDB
cockroach sql --insecure --host=cockroach-host < ./converted-schema/schema.sql

# Step 3.5: 在线数据迁移(CDC方式,零停机)
molt migrate-data \
  --source-type=mysql \
  --source="root:password@tcp(mysql-host:3306)/mydb" \
  --target="postgresql://root@cockroach-host:26257/mydb?sslmode=disable" \
  --mode=cdc \
  --batch-size=10000 \
  --concurrency=8

Step 4:SQL语法改写

-- MySQL: AUTO_INCREMENT
CREATE TABLE users (
    id BIGINT AUTO_INCREMENT PRIMARY KEY,
    name VARCHAR(100)
);

-- CockroachDB: 使用SERIAL(底层是UUID)
CREATE TABLE users (
    id SERIAL PRIMARY KEY,
    name VARCHAR(100)
);

-- 或者使用SEQUENCE
CREATE SEQUENCE user_id_seq START 1 INCREMENT 1;
CREATE TABLE users (
    id BIGINT DEFAULT nextval('user_id_seq') PRIMARY KEY,
    name VARCHAR(100)
);

-- MySQL: LIMIT offset, count
SELECT * FROM orders LIMIT 10, 20;

-- CockroachDB: LIMIT count OFFSET offset
SELECT * FROM orders LIMIT 20 OFFSET 10;

-- MySQL: GROUP_CONCAT
SELECT dept, GROUP_CONCAT(name) FROM employees GROUP BY dept;

-- CockroachDB: STRING_AGG
SELECT dept, STRING_AGG(name, ',') FROM employees GROUP BY dept;

-- MySQL: INSERT ... ON DUPLICATE KEY UPDATE
INSERT INTO users (id, name) VALUES (1, 'Alice')
ON DUPLICATE KEY UPDATE name = VALUES(name);

-- CockroachDB: UPSERT
UPSERT INTO users (id, name) VALUES (1, 'Alice');

完整代码:应用层迁移示例

package main

import (
    "context"
    "database/sql"
    "fmt"
    "log"
    "time"

    _ "github.com/lib/pq"
)

type Order struct {
    ID        int
    UserID    int
    Amount    float64
    Status    string
    CreatedAt time.Time
}

type OrderRepo struct {
    db *sql.DB
}

func NewOrderRepo(connStr string) (*OrderRepo, error) {
    db, err := sql.Open("postgres", connStr)
    if err != nil {
        return nil, fmt.Errorf("open db: %w", err)
    }
    db.SetMaxOpenConns(25)
    db.SetMaxIdleConns(10)
    db.SetConnMaxLifetime(5 * time.Minute)
    return &OrderRepo{db: db}, nil
}

func (r *OrderRepo) CreateOrder(ctx context.Context, order *Order) error {
    query := `
        INSERT INTO orders (user_id, amount, status, created_at)
        VALUES ($1, $2, $3, $4)
        RETURNING id`
    return r.db.QueryRowContext(ctx, query,
        order.UserID, order.Amount, order.Status, order.CreatedAt,
    ).Scan(&order.ID)
}

func (r *OrderRepo) TransferOrder(ctx context.Context, fromUserID, toUserID int, amount float64) error {
    tx, err := r.db.BeginTx(ctx, nil)
    if err != nil {
        return fmt.Errorf("begin tx: %w", err)
    }
    defer tx.Rollback()

    deductQuery := `UPDATE accounts SET balance = balance - $1 WHERE user_id = $2 AND balance >= $1`
    result, err := tx.ExecContext(ctx, deductQuery, amount, fromUserID)
    if err != nil {
        return fmt.Errorf("deduct: %w", err)
    }
    if n, _ := result.RowsAffected(); n == 0 {
        return fmt.Errorf("insufficient balance")
    }

    creditQuery := `UPDATE accounts SET balance = balance + $1 WHERE user_id = $2`
    if _, err := tx.ExecContext(ctx, creditQuery, amount, toUserID); err != nil {
        return fmt.Errorf("credit: %w", err)
    }

    logQuery := `INSERT INTO transfer_logs (from_user, to_user, amount, created_at) VALUES ($1, $2, $3, NOW())`
    if _, err := tx.ExecContext(ctx, logQuery, fromUserID, toUserID, amount); err != nil {
        return fmt.Errorf("log: %w", err)
    }

    return tx.Commit()
}

func (r *OrderRepo) GetOrdersByUser(ctx context.Context, userID int, limit, offset int) ([]Order, error) {
    query := `SELECT id, user_id, amount, status, created_at FROM orders WHERE user_id = $1 ORDER BY created_at DESC LIMIT $2 OFFSET $3`
    rows, err := r.db.QueryContext(ctx, query, userID, limit, offset)
    if err != nil {
        return nil, err
    }
    defer rows.Close()

    var orders []Order
    for rows.Next() {
        var o Order
        if err := rows.Scan(&o.ID, &o.UserID, &o.Amount, &o.Status, &o.CreatedAt); err != nil {
            return nil, err
        }
        orders = append(orders, o)
    }
    return orders, nil
}

func main() {
    repo, err := NewOrderRepo("postgresql://root@localhost:26257/mydb?sslmode=disable")
    if err != nil {
        log.Fatal(err)
    }
    defer repo.db.Close()

    ctx := context.Background()
    err = repo.TransferOrder(ctx, 1, 2, 100.50)
    if err != nil {
        log.Fatal("Transfer failed:", err)
    }
    fmt.Println("Transfer succeeded")
}

多区域部署配置

-- 设置集群区域
ALTER RANGE default CONFIGURE ZONE USING num_replicas = 5, constraints = '{"+us-east-1":2, "+eu-west-1":2, "+ap-southeast-1":1}';

-- 数据库级别区域配置
ALTER DATABASE mydb CONFIGURE ZONE USING num_replicas = 5, constraints = '{"+us-east-1":2, "+eu-west-1":2, "+ap-southeast-1":1}';

-- 表级别:用户表就近读
ALTER TABLE users CONFIGURE ZONE USING num_replicas = 5, lease_preferences = '[[+us-east-1]]';

-- 设置分区(Geo-Partitioned Replicas)
ALTER PARTITION us_east OF TABLE orders CONFIGURE ZONE USING constraints = '{"+us-east-1":2, "+eu-west-1":1}';
ALTER PARTITION eu_west OF TABLE orders CONFIGURE ZONE USING constraints = '{"+eu-west-1":2, "+us-east-1":1}';

避坑指南

坑1:AUTO_INCREMENT改为SERIAL后ID不连续

-- ❌ 期望:1, 2, 3, 4...
-- ✅ 实际:SERIAL底层是unique_rowid(),产生大数如575946408524943361
-- 解决:如需连续ID,用SEQUENCE
CREATE SEQUENCE order_seq;
CREATE TABLE orders (id BIGINT DEFAULT nextval('order_seq') PRIMARY KEY, ...);

坑2:外键级联更新性能灾难

-- ❌ 在分布式下级联更新会触发跨Range事务
CREATE TABLE orders (
    id SERIAL PRIMARY KEY,
    user_id INT REFERENCES users(id) ON UPDATE CASCADE
);

-- ✅ 不使用级联,应用层处理
CREATE TABLE orders (
    id SERIAL PRIMARY KEY,
    user_id INT REFERENCES users(id) ON DELETE RESTRICT
);

坑3:大事务超时

-- ❌ 单事务更新10万行,超过5分钟事务超时
BEGIN;
UPDATE large_table SET status = 'processed' WHERE created_at < '2026-01-01';
COMMIT;

-- ✅ 分批更新
BEGIN;
UPDATE large_table SET status = 'processed' WHERE created_at < '2026-01-01' LIMIT 1000;
COMMIT;
-- 循环执行直到affected rows = 0

坑4:SELECT * FOR UPDATE导致Range锁争用

-- ❌ 锁住整个Range
SELECT * FROM orders WHERE user_id = 1 FOR UPDATE;

-- ✅ 使用AS OF SYSTEM TIME读取历史快照,避免锁
SELECT * FROM orders AS OF SYSTEM TIME '-5s' WHERE user_id = 1;

坑5:忽略Zone Configuration导致读延迟高

-- ❌ 默认3副本随机分布,跨区域读延迟200ms+
-- ✅ 配置lease_preferences让读请求就近
ALTER TABLE users CONFIGURE ZONE USING lease_preferences = '[[+us-east-1]]';

报错排查

序号 报错信息 原因 解决方法
1 ERROR: restart transaction: TransactionRetryWithProtoRefreshError 事务冲突,需重试 使用CRDB的事务重试包装器,或降低并发度
2 ERROR: transaction deadline exceeded 事务执行超过5分钟超时 拆分大事务为小批次,或调整transaction_timeout
3 ERROR: duplicate key value violates unique constraint SERIAL生成的ID冲突(极少发生) 使用UUID或显式SEQUENCE
4 ERROR: foreign key violation 外键约束检查失败 确保引用数据存在,检查插入顺序
5 ERROR: value type tuple doesn't match type of column MySQL的隐式类型转换在CRDB不支持 显式CAST类型转换
6 ERROR: unimplemented: this syntax is not supported MySQL特有语法不支持 查阅兼容性文档,改写为PG语法
7 ERROR: memory budget exceeded 查询中间结果超过内存限制 添加LIMIT、优化JOIN顺序、增大sql.memory.cluster_client
8 ERROR: node is draining 节点正在下线 等待节点完成drain或连接其他节点
9 ERROR: replica not available 副本数不足(节点宕机) 确保存活节点>=quorum,等待副本恢复
10 ERROR: incompatible schema change 在线Schema Change冲突 等待前一个Schema Change完成,或使用ALTER ... EXPERIMENTAL ONLINE

进阶优化

1. 读取历史数据避免锁争用

-- 读取5秒前的快照,不阻塞写入
SELECT * FROM orders AS OF SYSTEM TIME INTERVAL '-5s'
WHERE status = 'pending';

2. 批量插入优化

-- 使用IMPORT而非INSERT批量导入
IMPORT INTO orders (id, user_id, amount, status)
CSV DATA (
    'nfs://data/orders_part1.csv',
    'nfs://data/orders_part2.csv'
) WITH skip = 1;

3. 自动重试事务包装

func RetryTx(ctx context.Context, db *sql.DB, fn func(*sql.Tx) error) error {
    for i := 0; i < 5; i++ {
        tx, err := db.BeginTx(ctx, nil)
        if err != nil {
            return err
        }
        err = fn(tx)
        if err == nil {
            return tx.Commit()
        }
        tx.Rollback()
        if !isRetryableError(err) {
            return err
        }
        time.Sleep(time.Duration(100*(i+1)) * time.Millisecond)
    }
    return fmt.Errorf("max retries exceeded")
}

对比分析

维度 MySQL分库分表 CockroachDB TiDB Spanner
分布式事务 需中间件 原生ACID 原生ACID 原生ACID
SQL兼容 MySQL PostgreSQL MySQL PostgreSQL
在线扩容 需停机迁移 自动Rebalance 自动Rebalance 自动
多区域 需自建 原生支持 有限支持 原生支持
一致性 最终一致 强一致(Serializable) RC/SI 强一致
运维复杂度 低(托管)
成本
开源 ✅(BSL)

总结:从MySQL迁移到CockroachDB不是简单的"换数据库",而是从"分库分表+中间件"到"原生分布式SQL"的架构升级。关键路径:用MOLT工具在线迁移→改写不兼容SQL→配置多区域Zone→分批处理大事务→配置事务重试。CockroachDB的强一致性和自动Rebalance让运维成本降低80%,但前提是理解其分布式事务机制和SQL兼容边界。


在线工具推荐

本站提供浏览器本地工具,免注册即可试用 →

#CockroachDB#分布式SQL#NewSQL#分布式事务#多区域#在线迁移#高可用#一致性