TypeScript Effect 系统:用 Effect-TS 优雅处理副作用

前端工程

为什么 Effect-TS 正在革新 TypeScript 错误处理

在 TypeScript 项目中,异步错误处理一直是个痛点。try/catch 无法捕获异步错误,Promise.catch() 容易遗漏,async/await 让错误处理变得隐式。Effect-TS 通过将副作用建模为值,彻底改变了这一局面。

特性 Effect-TS Promise/async-await fp-ts neverthrow
类型安全错误 ✅ 完全类型化 ❌ unknown ✅ 完全类型化 ✅ 完全类型化
依赖注入 ✅ 内置 Layer ❌ 无 ❌ 需手动 ❌ 无
并发原语 ✅ Fiber ❌ 无原生支持 ❌ 无 ❌ 无
重试/超时 ✅ 内置 ❌ 需手动 ❌ 需手动 ❌ 需手动
可组合性 ✅ 高 ⚠️ 中 ✅ 高 ⚠️ 中
学习曲线 ⚠️ 陡峭 ✅ 低 ❌ 陡峭 ✅ 低
生态成熟度 ✅ 活跃 ✅ 原生 ⚠️ 维护模式 ⚠️ 小众
资源释放 ✅ Scope ❌ finally ❌ 无 ❌ 无

Effect-TS 在 2026 年已经成为 TypeScript 函数式编程的事实标准,其核心思想是:将副作用描述为数据,而非立即执行。这让程序变得可测试、可组合、可推理。


Effect:核心概念与基本操作

Effect 是 Effect-TS 的核心抽象,它描述了一个可能失败的计算。你可以把它理解为"增强版的 Promise"——但它是惰性的、类型安全的、可组合的。

创建 Effect

import { Effect } from "effect";

// 从同步值创建
const syncEffect = Effect.succeed(42);

// 从可能抛出异常的同步代码创建
const riskySync = Effect.sync(() => {
  const data = JSON.parse('{"name":"effect"}');
  return data;
});

// 从异步操作创建
const asyncEffect = Effect.tryPromise({
  try: () => fetch("https://api.example.com/users"),
  catch: (error) => new FetchError(error),
});

// 定义自定义错误类型
class FetchError {
  readonly _tag = "FetchError";
  constructor(readonly cause: unknown) {}
}

class ParseError {
  readonly _tag = "ParseError";
  constructor(readonly cause: unknown) {}
}

链式操作与映射

import { Effect } from "effect";

const program = Effect.gen(function* (_) {
  const response = yield* _(
    Effect.tryPromise({
      try: () => fetch("https://api.example.com/users"),
      catch: (error) => new FetchError(error),
    })
  );

  const data = yield* _(
    Effect.try({
      try: () => response.json(),
      catch: (error) => new ParseError(error),
    })
  );

  return data;
});

// 映射和链式操作
const transformed = pipe(
  Effect.succeed(10),
  Effect.map((n) => n * 2),
  Effect.flatMap((n) => Effect.succeed(n + 5))
);

运行 Effect

import { Effect } from "effect";

const program = Effect.succeed("Hello, Effect-TS!");

// 在 Node.js 中运行
Effect.runPromise(program).then(console.log);
// 输出: Hello, Effect-TS!

// 同步运行(仅限同步 Effect)
Effect.runSync(Effect.succeed(42));
// 输出: 42

// 带 callback 的运行
Effect.runCallback(program, {
  onExit: (exit) => {
    if (exit._tag === "Success") {
      console.log(exit.value);
    } else {
      console.error(exit.cause);
    }
  },
});

Layer:依赖注入系统

Layer 是 Effect-TS 的依赖注入机制,它让你可以将服务的创建与使用完全分离。这在测试和模块化中极为强大。

import { Effect, Layer, Context } from "effect";

// 定义服务接口
class Database extends Context.Tag("Database")<
  Database,
  {
    readonly query: (sql: string) => Effect.Effect<unknown[], DbError>;
    readonly connect: () => Effect.Effect<void, DbError>;
  }
>() {}

class DbError {
  readonly _tag = "DbError";
  constructor(readonly message: string) {}
}

// 实现 PostgreSQL Layer
const PostgresLive = Layer.succeed(Database, {
  query: (sql: string) =>
    Effect.tryPromise({
      try: () => pgPool.query(sql),
      catch: (error) => new DbError(String(error)),
    }),
  connect: () =>
    Effect.tryPromise({
      try: () => pgPool.connect(),
      catch: (error) => new DbError(String(error)),
    }),
});

// 实现 Mock Layer(用于测试)
const DatabaseMock = Layer.succeed(Database, {
  query: (sql: string) => Effect.succeed([{ id: 1, name: "test" }]),
  connect: () => Effect.succeed(void 0),
});

// 使用服务
const getUserById = (id: number) =>
  Effect.gen(function* (_) {
    const db = yield* _(Database);
    const rows = yield* _(db.query(`SELECT * FROM users WHERE id = ${id}`));
    return rows[0];
  });

// 生产环境使用真实 Layer
const program = getUserById(1).pipe(
  Effect.provide(PostgresLive)
);

// 测试环境使用 Mock Layer
const testProgram = getUserById(1).pipe(
  Effect.provide(DatabaseMock)
);

Service:服务定义与组合

Service 是 Effect-TS 中定义可注入服务的标准模式,通过 Context.Tag 实现。

import { Effect, Context, Layer } from "effect";

// 定义 Logger 服务
class Logger extends Context.Tag("Logger")<
  Logger,
  {
    readonly info: (msg: string) => Effect.Effect<void>;
    readonly error: (msg: string) => Effect.Effect<void>;
  }
>() {}

// 定义 Config 服务
class AppConfig extends Context.Tag("AppConfig")<
  AppConfig,
  {
    readonly apiUrl: string;
    readonly timeout: number;
  }
>() {}

// Logger 实现
const LoggerLive = Layer.succeed(Logger, {
  info: (msg) => Effect.sync(() => console.log(`[INFO] ${msg}`)),
  error: (msg) => Effect.sync(() => console.error(`[ERROR] ${msg}`)),
});

// Config 实现
const ConfigLive = Layer.succeed(AppConfig, {
  apiUrl: "https://api.example.com",
  timeout: 5000,
});

// 组合多个服务
const AppLive = Layer.merge(LoggerLive, ConfigLive);

// 业务逻辑使用多个服务
const fetchWithLogging = Effect.gen(function* (_) {
  const logger = yield* _(Logger);
  const config = yield* _(AppConfig);

  yield* _(logger.info(`Fetching from ${config.apiUrl}`));

  const result = yield* _(
    Effect.tryPromise({
      try: () => fetch(config.apiUrl),
      catch: (error) => new FetchError(error),
    })
  );

  yield* _(logger.info("Fetch completed"));
  return result;
}).pipe(Effect.provide(AppLive));

Fiber:并发编程

Fiber 是 Effect-TS 的轻量级并发单元,类似协程但更强大,支持中断、监督和结构化并发。

import { Effect, Fiber } from "effect";

// 并行执行多个 Effect
const parallelExample = Effect.gen(function* (_) {
  const [users, posts, comments] = yield* _(
    Effect.all(
      [
        Effect.tryPromise({
          try: () => fetch("/api/users"),
          catch: (e) => new FetchError(e),
        }),
        Effect.tryPromise({
          try: () => fetch("/api/posts"),
          catch: (e) => new FetchError(e),
        }),
        Effect.tryPromise({
          try: () => fetch("/api/comments"),
          catch: (e) => new FetchError(e),
        }),
      ],
      { concurrency: "unbounded" }
    )
  );

  return { users, posts, comments };
});

// 竞速:取最快返回的结果
const raceExample = Effect.gen(function* (_) {
  const result = yield* _(
    Effect.race(
      Effect.tryPromise({
        try: () => fetch("/api/cdn1/data"),
        catch: (e) => new FetchError(e),
      }),
      Effect.tryPromise({
        try: () => fetch("/api/cdn2/data"),
        catch: (e) => new FetchError(e),
      })
    )
  );
  return result;
});

// 手动 Fiber 控制
const fiberExample = Effect.gen(function* (_) {
  const fiber = yield* _(
    Effect.fork(
      Effect.tryPromise({
        try: () => longRunningTask(),
        catch: (e) => new TaskError(e),
      })
    )
  );

  // 做其他事情...
  yield* _(Effect.sleep("2 seconds"));

  // 等待 Fiber 完成
  const result = yield* _(Fiber.join(fiber));
  return result;
});

// 结构化并发:父 Effect 中断时子 Fiber 自动中断
const structuredConcurrency = Effect.gen(function* (_) {
  yield* _(
    Effect.fork(
      Effect.gen(function* (_) {
        yield* _(Effect.sleep("10 seconds"));
        console.log("这不会执行,因为父 Effect 会先结束");
      })
    )
  );

  yield* _(Effect.sleep("1 second"));
  // 父 Effect 结束,子 Fiber 自动中断
});

从回调到 Promise 到 Effect:完整迁移示例

回调地狱时代

function getUserCallback(id: number, callback: (err: Error | null, user?: User) => void) {
  db.query("SELECT * FROM users WHERE id = ?", [id], (err, rows) => {
    if (err) return callback(err);
    if (rows.length === 0) return callback(new Error("User not found"));
    callback(null, rows[0]);
  });
}

function getOrdersCallback(userId: number, callback: (err: Error | null, orders?: Order[]) => void) {
  db.query("SELECT * FROM orders WHERE userId = ?", [userId], (err, rows) => {
    if (err) return callback(err);
    callback(null, rows);
  });
}

// 使用:回调地狱
getUserCallback(1, (err, user) => {
  if (err) { console.error(err); return; }
  getOrdersCallback(user!.id, (err, orders) => {
    if (err) { console.error(err); return; }
    console.log({ user, orders });
  });
});

Promise/async-await 时代

async function getUser(id: number): Promise<User> {
  const rows = await db.query("SELECT * FROM users WHERE id = ?", [id]);
  if (rows.length === 0) throw new Error("User not found");
  return rows[0];
}

async function getOrders(userId: number): Promise<Order[]> {
  return db.query("SELECT * FROM orders WHERE userId = ?", [userId]);
}

// 使用:错误类型丢失
async function main() {
  try {
    const user = await getUser(1);
    const orders = await getOrders(user.id);
    console.log({ user, orders });
  } catch (error) {
    // error 是 unknown,无法区分是用户不存在还是数据库错误
    console.error(error);
  }
}

Effect-TS 时代

class UserNotFoundError {
  readonly _tag = "UserNotFoundError";
  constructor(readonly userId: number) {}
}

class DbQueryError {
  readonly _tag = "DbQueryError";
  constructor(readonly message: string, readonly cause: unknown) {}
}

const getUser = (id: number) =>
  Effect.gen(function* (_) {
    const db = yield* _(Database);
    const rows = yield* _(
      db.query("SELECT * FROM users WHERE id = ?", [id]).pipe(
        Effect.mapError((e) => new DbQueryError("Query failed", e))
      )
    );
    if (rows.length === 0) {
      return yield* _(Effect.fail(new UserNotFoundError(id)));
    }
    return rows[0] as User;
  });

const getOrders = (userId: number) =>
  Effect.gen(function* (_) {
    const db = yield* _(Database);
    return yield* _(db.query("SELECT * FROM orders WHERE userId = ?", [userId]));
  });

// 使用:完全类型安全的错误处理
const program = Effect.gen(function* (_) {
  const user = yield* _(getUser(1));
  const orders = yield* _(getOrders(user.id));
  return { user, orders };
}).pipe(
  Effect.catchTag("UserNotFoundError", (e) =>
    Effect.succeed({ error: `用户 ${e.userId} 不存在` })
  ),
  Effect.catchTag("DbQueryError", (e) =>
    Effect.succeed({ error: `数据库错误: ${e.message}` })
  )
);

错误处理模式

catchAll:捕获所有错误

import { Effect } from "effect";

const resilient = pipe(
  riskyOperation(),
  Effect.catchAll((error) =>
    Effect.succeed(`恢复自错误: ${String(error)}`)
  )
);

catchTag:按类型捕获错误

import { Effect } from "effect";

const taggedHandler = pipe(
  program,
  Effect.catchTag("FetchError", (e) =>
    Effect.succeed({ fallback: true, reason: e.cause })
  ),
  Effect.catchTag("ParseError", (e) =>
    Effect.succeed({ fallback: true, reason: e.cause })
  )
);

retry:指数退避重试

import { Effect, Schedule } from "effect";

const retryWithBackoff = pipe(
  fetchFromApi(),
  Effect.retry(
    Schedule.exponential("1 second").pipe(
      Schedule.compose(Schedule.recurs(5)),
      Schedule.tap((attempt) =>
        Effect.sync(() => console.log(`第 ${attempt} 次重试...`))
      )
    )
  )
);

// 带条件的重试
const conditionalRetry = pipe(
  fetchFromApi(),
  Effect.retry(
    Schedule.exponential("500 millis").pipe(
      Schedule.whileOutput((delay) => delay < 30_000),
      Schedule.whileInput((error) => error._tag === "NetworkError")
    )
  )
);

timeout:超时控制

import { Effect } from "effect";

class TimeoutError {
  readonly _tag = "TimeoutError";
}

const withTimeout = pipe(
  longRunningOperation(),
  Effect.timeout("5 seconds"),
  Effect.mapError(() => new TimeoutError())
);

// 带超时和回退
const withTimeoutFallback = pipe(
  longRunningOperation(),
  Effect.timeoutFail({
    duration: "3 seconds",
    onTimeout: () => new TimeoutError(),
  }),
  Effect.catchTag("TimeoutError", () =>
    Effect.succeed(getCachedResult())
  )
);

依赖注入实战:完整示例

import { Effect, Context, Layer } from "effect";

// 定义错误类型
class HttpError {
  readonly _tag = "HttpError";
  constructor(readonly status: number, readonly message: string) {}
}

// 定义 HttpClient 服务
class HttpClient extends Context.Tag("HttpClient")<
  HttpClient,
  {
    readonly get: (url: string) => Effect.Effect<Response, HttpError>;
    readonly post: (url: string, body: unknown) => Effect.Effect<Response, HttpError>;
  }
>() {}

// 定义 Cache 服务
class Cache extends Context.Tag("Cache")<
  Cache,
  {
    readonly get: (key: string) => Effect.Effect<unknown | null>;
    readonly set: (key: string, value: unknown, ttl: number) => Effect.Effect<void>;
  }
>() {}

// 真实 HttpClient 实现
const HttpClientLive = Layer.succeed(HttpClient, {
  get: (url) =>
    Effect.tryPromise({
      try: () => fetch(url),
      catch: (error) => new HttpError(0, String(error)),
    }).pipe(
      Effect.flatMap((res) =>
        res.ok
          ? Effect.succeed(res)
          : Effect.fail(new HttpError(res.status, res.statusText))
      )
    ),
  post: (url, body) =>
    Effect.tryPromise({
      try: () =>
        fetch(url, {
          method: "POST",
          headers: { "Content-Type": "application/json" },
          body: JSON.stringify(body),
        }),
      catch: (error) => new HttpError(0, String(error)),
    }).pipe(
      Effect.flatMap((res) =>
        res.ok
          ? Effect.succeed(res)
          : Effect.fail(new HttpError(res.status, res.statusText))
      )
    ),
});

// 内存 Cache 实现
const InMemoryCacheLive = Layer.effect(
  Cache,
  Effect.sync(() => {
    const store = new Map<string, { value: unknown; expires: number }>();
    return {
      get: (key) =>
        Effect.sync(() => {
          const entry = store.get(key);
          if (!entry || entry.expires < Date.now()) return null;
          return entry.value;
        }),
      set: (key, value, ttl) =>
        Effect.sync(() => {
          store.set(key, { value, expires: Date.now() + ttl });
        }),
    };
  })
);

// 业务逻辑:带缓存的 API 调用
const fetchWithCache = (url: string) =>
  Effect.gen(function* (_) {
    const cache = yield* _(Cache);
    const http = yield* _(HttpClient);

    const cached = yield* _(cache.get(url));
    if (cached) return cached;

    const response = yield* _(http.get(url));
    const data = yield* _(
      Effect.try({
        try: () => response.json(),
        catch: (e) => new HttpError(0, `Parse error: ${String(e)}`),
      })
    );

    yield* _(cache.set(url, data, 60_000));
    return data;
  });

// 组装应用
const AppLayer = Layer.merge(HttpClientLive, InMemoryCacheLive);

const main = fetchWithCache("https://api.example.com/data").pipe(
  Effect.provide(AppLayer),
  Effect.catchTag("HttpError", (e) =>
    Effect.succeed({ error: `HTTP ${e.status}: ${e.message}` })
  )
);

并发实战:Fiber 完整示例

import { Effect, Fiber, Ref, Queue } from "effect";

// 生产者-消费者模式
const producerConsumer = Effect.gen(function* (_) {
  const queue = yield* _(Queue.bounded<string>(10));

  const producer = Effect.gen(function* (_) {
    for (let i = 0; i < 5; i++) {
      yield* _(Queue.offer(queue, `消息-${i}`));
      yield* _(Effect.sleep("100 millis"));
    }
    yield* _(Queue.offer(queue, "DONE"));
  });

  const consumer = Effect.gen(function* (_) {
    while (true) {
      const msg = yield* _(Queue.take(queue));
      if (msg === "DONE") break;
      console.log(`消费: ${msg}`);
    }
  });

  yield* _(
    Effect.all([producer, consumer], { concurrency: 2 })
  );
});

// 并行请求带限流
const fetchWithConcurrencyLimit = (urls: string[]) =>
  Effect.gen(function* (_) {
    const semaphore = yield* _(Effect.makeSemaphore(3));

    const results = yield* _(
      Effect.all(
        urls.map((url) =>
          semaphore.withPermits(1)(
            Effect.tryPromise({
              try: () => fetch(url),
              catch: (e) => new FetchError(e),
            })
          )
        ),
        { concurrency: "unbounded" }
      )
    );

    return results;
  });

// Fiber 监督
const supervisedExample = Effect.gen(function* (_) {
  const fiber = yield* _(
    Effect.fork(
      Effect.gen(function* (_) {
        yield* _(Effect.sleep("5 seconds"));
        return "完成";
      })
    )
  );

  // 设置监督策略
  yield* _(
    Fiber.interrupt(fiber).pipe(
      Effect.race(Fiber.join(fiber))
    )
  );
});

5 个常见陷阱与解决方案

1. 忘记运行 Effect

// ❌ 错误:Effect 是惰性的,不运行就不会执行
const program = Effect.succeed(42).pipe(Effect.map(console.log));

// ✅ 正确:必须显式运行
Effect.runPromise(program);

2. 在 Effect 中混用 Promise

// ❌ 错误:直接使用 async/await 破坏了 Effect 的错误追踪
const bad = Effect.async(() => {
  const data = await fetch("/api"); // 语法错误,不能在非 async 函数中使用 await
});

// ✅ 正确:使用 Effect.tryPromise 包装
const good = Effect.tryPromise({
  try: () => fetch("/api"),
  catch: (e) => new FetchError(e),
});

3. 错误类型未标记

// ❌ 错误:使用原始 Error 无法使用 catchTag
const bad = Effect.fail(new Error("something went wrong"));

// ✅ 正确:使用带 _tag 的自定义错误
class AppError {
  readonly _tag = "AppError";
  constructor(readonly message: string) {}
}
const good = Effect.fail(new AppError("something went wrong"));

4. Layer 未提供

// ❌ 错误:使用了 Database 但没有提供 Layer
const program = Effect.gen(function* (_) {
  const db = yield* _(Database);
  return yield* _(db.query("SELECT 1"));
});
Effect.runPromise(program); // 运行时报错:缺少 Database 的 Layer

// ✅ 正确:提供 Layer
Effect.runPromise(program.pipe(Effect.provide(PostgresLive)));

5. 忽略 Fiber 中断

// ❌ 错误:不可中断的 Effect 会导致 Fiber 无法正常关闭
const bad = Effect.sync(() => {
  while (true) {} // 死循环,无法中断
});

// ✅ 正确:使用 Effect.yieldNow 让出控制权
const good = Effect.gen(function* (_) {
  while (true) {
    yield* _(Effect.yieldNow());
  }
});

10 个错误排查项

# 错误现象 原因 解决方案
1 Error: Service not found: Database 未提供对应 Layer 使用 Effect.provide() 注入 Layer
2 Uncaught TypeError: Effect is not a function 导入路径错误 检查 import { Effect } from "effect"
3 Effect 不执行 忘记调用 Effect.runPromise Effect 是惰性的,必须显式运行
4 catchTag 无法匹配错误 错误类缺少 _tag 属性 确保自定义错误有 readonly _tag
5 类型推断失败,出现 never 错误类型与 catch 不匹配 检查 Effect 的错误类型参数
6 Fiber interrupted 意外中断 父 Fiber 结束导致子 Fiber 中断 使用 Effect.forkDaemon 创建守护 Fiber
7 内存泄漏 Fiber 未正确清理 使用结构化并发,避免 forkDaemon 滥用
8 Schedule 重试不生效 重试条件不满足 检查 whileInput / whileOutput 条件
9 Layer 循环依赖 两个 Layer 相互依赖 重构依赖关系,使用 Layer.provide 拆分
10 timeout 不生效 超时时间单位错误 使用字符串格式 "5 seconds" 而非数字

Effect-TS vs Promise/async-await 对比

维度 Effect-TS Promise/async-await
错误类型 编译期完全类型化 unknown,需手动断言
惰性求值 ✅ 延迟执行 ❌ 立即执行
可组合性 ✅ pipe + gen ⚠️ 需手动包装
依赖注入 ✅ Layer 系统 ❌ 无内置方案
并发控制 ✅ Fiber + Semaphore ❌ 无原生支持
重试策略 ✅ Schedule ❌ 需手动实现
超时控制 ✅ 内置 ❌ 需 Promise.race hack
资源安全 ✅ Scope + acquireRelease ⚠️ finally 不保证
中断支持 ✅ Fiber.interrupt ❌ AbortController
可测试性 ✅ 替换 Layer 即可 ⚠️ 需要 mock 模块
调试体验 ⚠️ 堆栈较深 ✅ 原生堆栈清晰
包体积 ⚠️ ~80KB min ✅ 0KB(原生)

推荐工具

在 Effect-TS 开发中,以下在线工具可以帮助你提升效率:


总结:Effect-TS 通过将副作用建模为值,为 TypeScript 带来了前所未有的类型安全和可组合性。掌握 Effect、Layer、Service、Fiber 四大核心概念,配合 catchTag、retry、timeout 等错误处理模式,你就能在 2026 年写出真正健壮的函数式 TypeScript 代码。虽然学习曲线陡峭,但一旦掌握,你将再也无法回到裸 Promise 的时代。

本站提供浏览器本地工具,免注册即可试用 →

#TypeScript Effect#Effect-TS#副作用处理#函数式编程#2026