TypeScript Effect 系统:用 Effect-TS 优雅处理副作用
前端工程
为什么 Effect-TS 正在革新 TypeScript 错误处理
在 TypeScript 项目中,异步错误处理一直是个痛点。try/catch 无法捕获异步错误,Promise 的 .catch() 容易遗漏,async/await 让错误处理变得隐式。Effect-TS 通过将副作用建模为值,彻底改变了这一局面。
| 特性 | Effect-TS | Promise/async-await | fp-ts | neverthrow |
|---|---|---|---|---|
| 类型安全错误 | ✅ 完全类型化 | ❌ unknown | ✅ 完全类型化 | ✅ 完全类型化 |
| 依赖注入 | ✅ 内置 Layer | ❌ 无 | ❌ 需手动 | ❌ 无 |
| 并发原语 | ✅ Fiber | ❌ 无原生支持 | ❌ 无 | ❌ 无 |
| 重试/超时 | ✅ 内置 | ❌ 需手动 | ❌ 需手动 | ❌ 需手动 |
| 可组合性 | ✅ 高 | ⚠️ 中 | ✅ 高 | ⚠️ 中 |
| 学习曲线 | ⚠️ 陡峭 | ✅ 低 | ❌ 陡峭 | ✅ 低 |
| 生态成熟度 | ✅ 活跃 | ✅ 原生 | ⚠️ 维护模式 | ⚠️ 小众 |
| 资源释放 | ✅ Scope | ❌ finally | ❌ 无 | ❌ 无 |
Effect-TS 在 2026 年已经成为 TypeScript 函数式编程的事实标准,其核心思想是:将副作用描述为数据,而非立即执行。这让程序变得可测试、可组合、可推理。
Effect:核心概念与基本操作
Effect 是 Effect-TS 的核心抽象,它描述了一个可能失败的计算。你可以把它理解为"增强版的 Promise"——但它是惰性的、类型安全的、可组合的。
创建 Effect
import { Effect } from "effect";
// 从同步值创建
const syncEffect = Effect.succeed(42);
// 从可能抛出异常的同步代码创建
const riskySync = Effect.sync(() => {
const data = JSON.parse('{"name":"effect"}');
return data;
});
// 从异步操作创建
const asyncEffect = Effect.tryPromise({
try: () => fetch("https://api.example.com/users"),
catch: (error) => new FetchError(error),
});
// 定义自定义错误类型
class FetchError {
readonly _tag = "FetchError";
constructor(readonly cause: unknown) {}
}
class ParseError {
readonly _tag = "ParseError";
constructor(readonly cause: unknown) {}
}
链式操作与映射
import { Effect } from "effect";
const program = Effect.gen(function* (_) {
const response = yield* _(
Effect.tryPromise({
try: () => fetch("https://api.example.com/users"),
catch: (error) => new FetchError(error),
})
);
const data = yield* _(
Effect.try({
try: () => response.json(),
catch: (error) => new ParseError(error),
})
);
return data;
});
// 映射和链式操作
const transformed = pipe(
Effect.succeed(10),
Effect.map((n) => n * 2),
Effect.flatMap((n) => Effect.succeed(n + 5))
);
运行 Effect
import { Effect } from "effect";
const program = Effect.succeed("Hello, Effect-TS!");
// 在 Node.js 中运行
Effect.runPromise(program).then(console.log);
// 输出: Hello, Effect-TS!
// 同步运行(仅限同步 Effect)
Effect.runSync(Effect.succeed(42));
// 输出: 42
// 带 callback 的运行
Effect.runCallback(program, {
onExit: (exit) => {
if (exit._tag === "Success") {
console.log(exit.value);
} else {
console.error(exit.cause);
}
},
});
Layer:依赖注入系统
Layer 是 Effect-TS 的依赖注入机制,它让你可以将服务的创建与使用完全分离。这在测试和模块化中极为强大。
import { Effect, Layer, Context } from "effect";
// 定义服务接口
class Database extends Context.Tag("Database")<
Database,
{
readonly query: (sql: string) => Effect.Effect<unknown[], DbError>;
readonly connect: () => Effect.Effect<void, DbError>;
}
>() {}
class DbError {
readonly _tag = "DbError";
constructor(readonly message: string) {}
}
// 实现 PostgreSQL Layer
const PostgresLive = Layer.succeed(Database, {
query: (sql: string) =>
Effect.tryPromise({
try: () => pgPool.query(sql),
catch: (error) => new DbError(String(error)),
}),
connect: () =>
Effect.tryPromise({
try: () => pgPool.connect(),
catch: (error) => new DbError(String(error)),
}),
});
// 实现 Mock Layer(用于测试)
const DatabaseMock = Layer.succeed(Database, {
query: (sql: string) => Effect.succeed([{ id: 1, name: "test" }]),
connect: () => Effect.succeed(void 0),
});
// 使用服务
const getUserById = (id: number) =>
Effect.gen(function* (_) {
const db = yield* _(Database);
const rows = yield* _(db.query(`SELECT * FROM users WHERE id = ${id}`));
return rows[0];
});
// 生产环境使用真实 Layer
const program = getUserById(1).pipe(
Effect.provide(PostgresLive)
);
// 测试环境使用 Mock Layer
const testProgram = getUserById(1).pipe(
Effect.provide(DatabaseMock)
);
Service:服务定义与组合
Service 是 Effect-TS 中定义可注入服务的标准模式,通过 Context.Tag 实现。
import { Effect, Context, Layer } from "effect";
// 定义 Logger 服务
class Logger extends Context.Tag("Logger")<
Logger,
{
readonly info: (msg: string) => Effect.Effect<void>;
readonly error: (msg: string) => Effect.Effect<void>;
}
>() {}
// 定义 Config 服务
class AppConfig extends Context.Tag("AppConfig")<
AppConfig,
{
readonly apiUrl: string;
readonly timeout: number;
}
>() {}
// Logger 实现
const LoggerLive = Layer.succeed(Logger, {
info: (msg) => Effect.sync(() => console.log(`[INFO] ${msg}`)),
error: (msg) => Effect.sync(() => console.error(`[ERROR] ${msg}`)),
});
// Config 实现
const ConfigLive = Layer.succeed(AppConfig, {
apiUrl: "https://api.example.com",
timeout: 5000,
});
// 组合多个服务
const AppLive = Layer.merge(LoggerLive, ConfigLive);
// 业务逻辑使用多个服务
const fetchWithLogging = Effect.gen(function* (_) {
const logger = yield* _(Logger);
const config = yield* _(AppConfig);
yield* _(logger.info(`Fetching from ${config.apiUrl}`));
const result = yield* _(
Effect.tryPromise({
try: () => fetch(config.apiUrl),
catch: (error) => new FetchError(error),
})
);
yield* _(logger.info("Fetch completed"));
return result;
}).pipe(Effect.provide(AppLive));
Fiber:并发编程
Fiber 是 Effect-TS 的轻量级并发单元,类似协程但更强大,支持中断、监督和结构化并发。
import { Effect, Fiber } from "effect";
// 并行执行多个 Effect
const parallelExample = Effect.gen(function* (_) {
const [users, posts, comments] = yield* _(
Effect.all(
[
Effect.tryPromise({
try: () => fetch("/api/users"),
catch: (e) => new FetchError(e),
}),
Effect.tryPromise({
try: () => fetch("/api/posts"),
catch: (e) => new FetchError(e),
}),
Effect.tryPromise({
try: () => fetch("/api/comments"),
catch: (e) => new FetchError(e),
}),
],
{ concurrency: "unbounded" }
)
);
return { users, posts, comments };
});
// 竞速:取最快返回的结果
const raceExample = Effect.gen(function* (_) {
const result = yield* _(
Effect.race(
Effect.tryPromise({
try: () => fetch("/api/cdn1/data"),
catch: (e) => new FetchError(e),
}),
Effect.tryPromise({
try: () => fetch("/api/cdn2/data"),
catch: (e) => new FetchError(e),
})
)
);
return result;
});
// 手动 Fiber 控制
const fiberExample = Effect.gen(function* (_) {
const fiber = yield* _(
Effect.fork(
Effect.tryPromise({
try: () => longRunningTask(),
catch: (e) => new TaskError(e),
})
)
);
// 做其他事情...
yield* _(Effect.sleep("2 seconds"));
// 等待 Fiber 完成
const result = yield* _(Fiber.join(fiber));
return result;
});
// 结构化并发:父 Effect 中断时子 Fiber 自动中断
const structuredConcurrency = Effect.gen(function* (_) {
yield* _(
Effect.fork(
Effect.gen(function* (_) {
yield* _(Effect.sleep("10 seconds"));
console.log("这不会执行,因为父 Effect 会先结束");
})
)
);
yield* _(Effect.sleep("1 second"));
// 父 Effect 结束,子 Fiber 自动中断
});
从回调到 Promise 到 Effect:完整迁移示例
回调地狱时代
function getUserCallback(id: number, callback: (err: Error | null, user?: User) => void) {
db.query("SELECT * FROM users WHERE id = ?", [id], (err, rows) => {
if (err) return callback(err);
if (rows.length === 0) return callback(new Error("User not found"));
callback(null, rows[0]);
});
}
function getOrdersCallback(userId: number, callback: (err: Error | null, orders?: Order[]) => void) {
db.query("SELECT * FROM orders WHERE userId = ?", [userId], (err, rows) => {
if (err) return callback(err);
callback(null, rows);
});
}
// 使用:回调地狱
getUserCallback(1, (err, user) => {
if (err) { console.error(err); return; }
getOrdersCallback(user!.id, (err, orders) => {
if (err) { console.error(err); return; }
console.log({ user, orders });
});
});
Promise/async-await 时代
async function getUser(id: number): Promise<User> {
const rows = await db.query("SELECT * FROM users WHERE id = ?", [id]);
if (rows.length === 0) throw new Error("User not found");
return rows[0];
}
async function getOrders(userId: number): Promise<Order[]> {
return db.query("SELECT * FROM orders WHERE userId = ?", [userId]);
}
// 使用:错误类型丢失
async function main() {
try {
const user = await getUser(1);
const orders = await getOrders(user.id);
console.log({ user, orders });
} catch (error) {
// error 是 unknown,无法区分是用户不存在还是数据库错误
console.error(error);
}
}
Effect-TS 时代
class UserNotFoundError {
readonly _tag = "UserNotFoundError";
constructor(readonly userId: number) {}
}
class DbQueryError {
readonly _tag = "DbQueryError";
constructor(readonly message: string, readonly cause: unknown) {}
}
const getUser = (id: number) =>
Effect.gen(function* (_) {
const db = yield* _(Database);
const rows = yield* _(
db.query("SELECT * FROM users WHERE id = ?", [id]).pipe(
Effect.mapError((e) => new DbQueryError("Query failed", e))
)
);
if (rows.length === 0) {
return yield* _(Effect.fail(new UserNotFoundError(id)));
}
return rows[0] as User;
});
const getOrders = (userId: number) =>
Effect.gen(function* (_) {
const db = yield* _(Database);
return yield* _(db.query("SELECT * FROM orders WHERE userId = ?", [userId]));
});
// 使用:完全类型安全的错误处理
const program = Effect.gen(function* (_) {
const user = yield* _(getUser(1));
const orders = yield* _(getOrders(user.id));
return { user, orders };
}).pipe(
Effect.catchTag("UserNotFoundError", (e) =>
Effect.succeed({ error: `用户 ${e.userId} 不存在` })
),
Effect.catchTag("DbQueryError", (e) =>
Effect.succeed({ error: `数据库错误: ${e.message}` })
)
);
错误处理模式
catchAll:捕获所有错误
import { Effect } from "effect";
const resilient = pipe(
riskyOperation(),
Effect.catchAll((error) =>
Effect.succeed(`恢复自错误: ${String(error)}`)
)
);
catchTag:按类型捕获错误
import { Effect } from "effect";
const taggedHandler = pipe(
program,
Effect.catchTag("FetchError", (e) =>
Effect.succeed({ fallback: true, reason: e.cause })
),
Effect.catchTag("ParseError", (e) =>
Effect.succeed({ fallback: true, reason: e.cause })
)
);
retry:指数退避重试
import { Effect, Schedule } from "effect";
const retryWithBackoff = pipe(
fetchFromApi(),
Effect.retry(
Schedule.exponential("1 second").pipe(
Schedule.compose(Schedule.recurs(5)),
Schedule.tap((attempt) =>
Effect.sync(() => console.log(`第 ${attempt} 次重试...`))
)
)
)
);
// 带条件的重试
const conditionalRetry = pipe(
fetchFromApi(),
Effect.retry(
Schedule.exponential("500 millis").pipe(
Schedule.whileOutput((delay) => delay < 30_000),
Schedule.whileInput((error) => error._tag === "NetworkError")
)
)
);
timeout:超时控制
import { Effect } from "effect";
class TimeoutError {
readonly _tag = "TimeoutError";
}
const withTimeout = pipe(
longRunningOperation(),
Effect.timeout("5 seconds"),
Effect.mapError(() => new TimeoutError())
);
// 带超时和回退
const withTimeoutFallback = pipe(
longRunningOperation(),
Effect.timeoutFail({
duration: "3 seconds",
onTimeout: () => new TimeoutError(),
}),
Effect.catchTag("TimeoutError", () =>
Effect.succeed(getCachedResult())
)
);
依赖注入实战:完整示例
import { Effect, Context, Layer } from "effect";
// 定义错误类型
class HttpError {
readonly _tag = "HttpError";
constructor(readonly status: number, readonly message: string) {}
}
// 定义 HttpClient 服务
class HttpClient extends Context.Tag("HttpClient")<
HttpClient,
{
readonly get: (url: string) => Effect.Effect<Response, HttpError>;
readonly post: (url: string, body: unknown) => Effect.Effect<Response, HttpError>;
}
>() {}
// 定义 Cache 服务
class Cache extends Context.Tag("Cache")<
Cache,
{
readonly get: (key: string) => Effect.Effect<unknown | null>;
readonly set: (key: string, value: unknown, ttl: number) => Effect.Effect<void>;
}
>() {}
// 真实 HttpClient 实现
const HttpClientLive = Layer.succeed(HttpClient, {
get: (url) =>
Effect.tryPromise({
try: () => fetch(url),
catch: (error) => new HttpError(0, String(error)),
}).pipe(
Effect.flatMap((res) =>
res.ok
? Effect.succeed(res)
: Effect.fail(new HttpError(res.status, res.statusText))
)
),
post: (url, body) =>
Effect.tryPromise({
try: () =>
fetch(url, {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify(body),
}),
catch: (error) => new HttpError(0, String(error)),
}).pipe(
Effect.flatMap((res) =>
res.ok
? Effect.succeed(res)
: Effect.fail(new HttpError(res.status, res.statusText))
)
),
});
// 内存 Cache 实现
const InMemoryCacheLive = Layer.effect(
Cache,
Effect.sync(() => {
const store = new Map<string, { value: unknown; expires: number }>();
return {
get: (key) =>
Effect.sync(() => {
const entry = store.get(key);
if (!entry || entry.expires < Date.now()) return null;
return entry.value;
}),
set: (key, value, ttl) =>
Effect.sync(() => {
store.set(key, { value, expires: Date.now() + ttl });
}),
};
})
);
// 业务逻辑:带缓存的 API 调用
const fetchWithCache = (url: string) =>
Effect.gen(function* (_) {
const cache = yield* _(Cache);
const http = yield* _(HttpClient);
const cached = yield* _(cache.get(url));
if (cached) return cached;
const response = yield* _(http.get(url));
const data = yield* _(
Effect.try({
try: () => response.json(),
catch: (e) => new HttpError(0, `Parse error: ${String(e)}`),
})
);
yield* _(cache.set(url, data, 60_000));
return data;
});
// 组装应用
const AppLayer = Layer.merge(HttpClientLive, InMemoryCacheLive);
const main = fetchWithCache("https://api.example.com/data").pipe(
Effect.provide(AppLayer),
Effect.catchTag("HttpError", (e) =>
Effect.succeed({ error: `HTTP ${e.status}: ${e.message}` })
)
);
并发实战:Fiber 完整示例
import { Effect, Fiber, Ref, Queue } from "effect";
// 生产者-消费者模式
const producerConsumer = Effect.gen(function* (_) {
const queue = yield* _(Queue.bounded<string>(10));
const producer = Effect.gen(function* (_) {
for (let i = 0; i < 5; i++) {
yield* _(Queue.offer(queue, `消息-${i}`));
yield* _(Effect.sleep("100 millis"));
}
yield* _(Queue.offer(queue, "DONE"));
});
const consumer = Effect.gen(function* (_) {
while (true) {
const msg = yield* _(Queue.take(queue));
if (msg === "DONE") break;
console.log(`消费: ${msg}`);
}
});
yield* _(
Effect.all([producer, consumer], { concurrency: 2 })
);
});
// 并行请求带限流
const fetchWithConcurrencyLimit = (urls: string[]) =>
Effect.gen(function* (_) {
const semaphore = yield* _(Effect.makeSemaphore(3));
const results = yield* _(
Effect.all(
urls.map((url) =>
semaphore.withPermits(1)(
Effect.tryPromise({
try: () => fetch(url),
catch: (e) => new FetchError(e),
})
)
),
{ concurrency: "unbounded" }
)
);
return results;
});
// Fiber 监督
const supervisedExample = Effect.gen(function* (_) {
const fiber = yield* _(
Effect.fork(
Effect.gen(function* (_) {
yield* _(Effect.sleep("5 seconds"));
return "完成";
})
)
);
// 设置监督策略
yield* _(
Fiber.interrupt(fiber).pipe(
Effect.race(Fiber.join(fiber))
)
);
});
5 个常见陷阱与解决方案
1. 忘记运行 Effect
// ❌ 错误:Effect 是惰性的,不运行就不会执行
const program = Effect.succeed(42).pipe(Effect.map(console.log));
// ✅ 正确:必须显式运行
Effect.runPromise(program);
2. 在 Effect 中混用 Promise
// ❌ 错误:直接使用 async/await 破坏了 Effect 的错误追踪
const bad = Effect.async(() => {
const data = await fetch("/api"); // 语法错误,不能在非 async 函数中使用 await
});
// ✅ 正确:使用 Effect.tryPromise 包装
const good = Effect.tryPromise({
try: () => fetch("/api"),
catch: (e) => new FetchError(e),
});
3. 错误类型未标记
// ❌ 错误:使用原始 Error 无法使用 catchTag
const bad = Effect.fail(new Error("something went wrong"));
// ✅ 正确:使用带 _tag 的自定义错误
class AppError {
readonly _tag = "AppError";
constructor(readonly message: string) {}
}
const good = Effect.fail(new AppError("something went wrong"));
4. Layer 未提供
// ❌ 错误:使用了 Database 但没有提供 Layer
const program = Effect.gen(function* (_) {
const db = yield* _(Database);
return yield* _(db.query("SELECT 1"));
});
Effect.runPromise(program); // 运行时报错:缺少 Database 的 Layer
// ✅ 正确:提供 Layer
Effect.runPromise(program.pipe(Effect.provide(PostgresLive)));
5. 忽略 Fiber 中断
// ❌ 错误:不可中断的 Effect 会导致 Fiber 无法正常关闭
const bad = Effect.sync(() => {
while (true) {} // 死循环,无法中断
});
// ✅ 正确:使用 Effect.yieldNow 让出控制权
const good = Effect.gen(function* (_) {
while (true) {
yield* _(Effect.yieldNow());
}
});
10 个错误排查项
| # | 错误现象 | 原因 | 解决方案 |
|---|---|---|---|
| 1 | Error: Service not found: Database |
未提供对应 Layer | 使用 Effect.provide() 注入 Layer |
| 2 | Uncaught TypeError: Effect is not a function |
导入路径错误 | 检查 import { Effect } from "effect" |
| 3 | Effect 不执行 | 忘记调用 Effect.runPromise 等 |
Effect 是惰性的,必须显式运行 |
| 4 | catchTag 无法匹配错误 |
错误类缺少 _tag 属性 |
确保自定义错误有 readonly _tag |
| 5 | 类型推断失败,出现 never |
错误类型与 catch 不匹配 | 检查 Effect 的错误类型参数 |
| 6 | Fiber interrupted 意外中断 |
父 Fiber 结束导致子 Fiber 中断 | 使用 Effect.forkDaemon 创建守护 Fiber |
| 7 | 内存泄漏 | Fiber 未正确清理 | 使用结构化并发,避免 forkDaemon 滥用 |
| 8 | Schedule 重试不生效 |
重试条件不满足 | 检查 whileInput / whileOutput 条件 |
| 9 | Layer 循环依赖 | 两个 Layer 相互依赖 | 重构依赖关系,使用 Layer.provide 拆分 |
| 10 | timeout 不生效 |
超时时间单位错误 | 使用字符串格式 "5 seconds" 而非数字 |
Effect-TS vs Promise/async-await 对比
| 维度 | Effect-TS | Promise/async-await |
|---|---|---|
| 错误类型 | 编译期完全类型化 | unknown,需手动断言 |
| 惰性求值 | ✅ 延迟执行 | ❌ 立即执行 |
| 可组合性 | ✅ pipe + gen | ⚠️ 需手动包装 |
| 依赖注入 | ✅ Layer 系统 | ❌ 无内置方案 |
| 并发控制 | ✅ Fiber + Semaphore | ❌ 无原生支持 |
| 重试策略 | ✅ Schedule | ❌ 需手动实现 |
| 超时控制 | ✅ 内置 | ❌ 需 Promise.race hack |
| 资源安全 | ✅ Scope + acquireRelease | ⚠️ finally 不保证 |
| 中断支持 | ✅ Fiber.interrupt | ❌ AbortController |
| 可测试性 | ✅ 替换 Layer 即可 | ⚠️ 需要 mock 模块 |
| 调试体验 | ⚠️ 堆栈较深 | ✅ 原生堆栈清晰 |
| 包体积 | ⚠️ ~80KB min | ✅ 0KB(原生) |
推荐工具
在 Effect-TS 开发中,以下在线工具可以帮助你提升效率:
- JSON 格式化工具 — 调试 API 响应数据时,快速格式化和验证 JSON
- Base64 编解码 — 处理 HTTP 请求头和 Token 编码
- 哈希计算工具 — 生成数据签名和缓存键的哈希值
总结:Effect-TS 通过将副作用建模为值,为 TypeScript 带来了前所未有的类型安全和可组合性。掌握 Effect、Layer、Service、Fiber 四大核心概念,配合 catchTag、retry、timeout 等错误处理模式,你就能在 2026 年写出真正健壮的函数式 TypeScript 代码。虽然学习曲线陡峭,但一旦掌握,你将再也无法回到裸 Promise 的时代。
本站提供浏览器本地工具,免注册即可试用 →
#TypeScript Effect#Effect-TS#副作用处理#函数式编程#2026