TypeScript Effect 系統:用 Effect-TS 優雅處理副作用

前端工程

為什麼 Effect-TS 正在革新 TypeScript 錯誤處理

在 TypeScript 專案中,非同步錯誤處理一直是個痛點。try/catch 無法捕獲非同步錯誤,Promise.catch() 容易遺漏,async/await 讓錯誤處理變得隱式。Effect-TS 透過將副作用建模為值,徹底改變了這一局面。

特性 Effect-TS Promise/async-await fp-ts neverthrow
型別安全錯誤 ✅ 完全型別化 ❌ unknown ✅ 完全型別化 ✅ 完全型別化
依賴注入 ✅ 內建 Layer ❌ 無 ❌ 需手動 ❌ 無
並行原語 ✅ Fiber ❌ 無原生支援 ❌ 無 ❌ 無
重試/逾時 ✅ 內建 ❌ 需手動 ❌ 需手動 ❌ 需手動
可組合性 ✅ 高 ⚠️ 中 ✅ 高 ⚠️ 中
學習曲線 ⚠️ 陡峭 ✅ 低 ❌ 陡峭 ✅ 低
生態成熟度 ✅ 活躍 ✅ 原生 ⚠️ 維護模式 ⚠️ 小眾
資源釋放 ✅ Scope ❌ finally ❌ 無 ❌ 無

Effect-TS 在 2026 年已經成為 TypeScript 函數式程式設計的事實標準,其核心思想是:將副作用描述為資料,而非立即執行。這讓程式變得可測試、可組合、可推理。


Effect:核心概念與基本操作

Effect 是 Effect-TS 的核心抽象,它描述了一個可能失敗的計算。你可以把它理解為「增強版的 Promise」——但它是惰性的、型別安全的、可組合的。

建立 Effect

import { Effect } from "effect";

// 從同步值建立
const syncEffect = Effect.succeed(42);

// 從可能拋出例外同步程式碼建立
const riskySync = Effect.sync(() => {
  const data = JSON.parse('{"name":"effect"}');
  return data;
});

// 從非同步操作建立
const asyncEffect = Effect.tryPromise({
  try: () => fetch("https://api.example.com/users"),
  catch: (error) => new FetchError(error),
});

// 定義自訂錯誤型別
class FetchError {
  readonly _tag = "FetchError";
  constructor(readonly cause: unknown) {}
}

class ParseError {
  readonly _tag = "ParseError";
  constructor(readonly cause: unknown) {}
}

鏈式操作與映射

import { Effect } from "effect";

const program = Effect.gen(function* (_) {
  const response = yield* _(
    Effect.tryPromise({
      try: () => fetch("https://api.example.com/users"),
      catch: (error) => new FetchError(error),
    })
  );

  const data = yield* _(
    Effect.try({
      try: () => response.json(),
      catch: (error) => new ParseError(error),
    })
  );

  return data;
});

// 映射與鏈式操作
const transformed = pipe(
  Effect.succeed(10),
  Effect.map((n) => n * 2),
  Effect.flatMap((n) => Effect.succeed(n + 5))
);

執行 Effect

import { Effect } from "effect";

const program = Effect.succeed("Hello, Effect-TS!");

// 在 Node.js 中執行
Effect.runPromise(program).then(console.log);
// 輸出: Hello, Effect-TS!

// 同步執行(僅限同步 Effect)
Effect.runSync(Effect.succeed(42));
// 輸出: 42

// 使用 callback 執行
Effect.runCallback(program, {
  onExit: (exit) => {
    if (exit._tag === "Success") {
      console.log(exit.value);
    } else {
      console.error(exit.cause);
    }
  },
});

Layer:依賴注入系統

Layer 是 Effect-TS 的依賴注入機制,它讓你可以將服務的建立與使用完全分離。這在測試和模組化中極為強大。

import { Effect, Layer, Context } from "effect";

// 定義服務介面
class Database extends Context.Tag("Database")<
  Database,
  {
    readonly query: (sql: string) => Effect.Effect<unknown[], DbError>;
    readonly connect: () => Effect.Effect<void, DbError>;
  }
>() {}

class DbError {
  readonly _tag = "DbError";
  constructor(readonly message: string) {}
}

// 實作 PostgreSQL Layer
const PostgresLive = Layer.succeed(Database, {
  query: (sql: string) =>
    Effect.tryPromise({
      try: () => pgPool.query(sql),
      catch: (error) => new DbError(String(error)),
    }),
  connect: () =>
    Effect.tryPromise({
      try: () => pgPool.connect(),
      catch: (error) => new DbError(String(error)),
    }),
});

// 實作 Mock Layer(用於測試)
const DatabaseMock = Layer.succeed(Database, {
  query: (sql: string) => Effect.succeed([{ id: 1, name: "test" }]),
  connect: () => Effect.succeed(void 0),
});

// 使用服務
const getUserById = (id: number) =>
  Effect.gen(function* (_) {
    const db = yield* _(Database);
    const rows = yield* _(db.query(`SELECT * FROM users WHERE id = ${id}`));
    return rows[0];
  });

// 生產環境使用真實 Layer
const program = getUserById(1).pipe(
  Effect.provide(PostgresLive)
);

// 測試環境使用 Mock Layer
const testProgram = getUserById(1).pipe(
  Effect.provide(DatabaseMock)
);

Service:服務定義與組合

Service 是 Effect-TS 中定義可注入服務的標準模式,透過 Context.Tag 實作。

import { Effect, Context, Layer } from "effect";

// 定義 Logger 服務
class Logger extends Context.Tag("Logger")<
  Logger,
  {
    readonly info: (msg: string) => Effect.Effect<void>;
    readonly error: (msg: string) => Effect.Effect<void>;
  }
>() {}

// 定義 Config 服務
class AppConfig extends Context.Tag("AppConfig")<
  AppConfig,
  {
    readonly apiUrl: string;
    readonly timeout: number;
  }
>() {}

// Logger 實作
const LoggerLive = Layer.succeed(Logger, {
  info: (msg) => Effect.sync(() => console.log(`[INFO] ${msg}`)),
  error: (msg) => Effect.sync(() => console.error(`[ERROR] ${msg}`)),
});

// Config 實作
const ConfigLive = Layer.succeed(AppConfig, {
  apiUrl: "https://api.example.com",
  timeout: 5000,
});

// 組合多個服務
const AppLive = Layer.merge(LoggerLive, ConfigLive);

// 業務邏輯使用多個服務
const fetchWithLogging = Effect.gen(function* (_) {
  const logger = yield* _(Logger);
  const config = yield* _(AppConfig);

  yield* _(logger.info(`Fetching from ${config.apiUrl}`));

  const result = yield* _(
    Effect.tryPromise({
      try: () => fetch(config.apiUrl),
      catch: (error) => new FetchError(error),
    })
  );

  yield* _(logger.info("Fetch completed"));
  return result;
}).pipe(Effect.provide(AppLive));

Fiber:並行設計

Fiber 是 Effect-TS 的輕量級並行單元,類似協程但更強大,支援中斷、監督和結構化並行。

import { Effect, Fiber } from "effect";

// 平行執行多個 Effect
const parallelExample = Effect.gen(function* (_) {
  const [users, posts, comments] = yield* _(
    Effect.all(
      [
        Effect.tryPromise({
          try: () => fetch("/api/users"),
          catch: (e) => new FetchError(e),
        }),
        Effect.tryPromise({
          try: () => fetch("/api/posts"),
          catch: (e) => new FetchError(e),
        }),
        Effect.tryPromise({
          try: () => fetch("/api/comments"),
          catch: (e) => new FetchError(e),
        }),
      ],
      { concurrency: "unbounded" }
    )
  );

  return { users, posts, comments };
});

// 競速:取最快回傳的結果
const raceExample = Effect.gen(function* (_) {
  const result = yield* _(
    Effect.race(
      Effect.tryPromise({
        try: () => fetch("/api/cdn1/data"),
        catch: (e) => new FetchError(e),
      }),
      Effect.tryPromise({
        try: () => fetch("/api/cdn2/data"),
        catch: (e) => new FetchError(e),
      })
    )
  );
  return result;
});

// 手動 Fiber 控制
const fiberExample = Effect.gen(function* (_) {
  const fiber = yield* _(
    Effect.fork(
      Effect.tryPromise({
        try: () => longRunningTask(),
        catch: (e) => new TaskError(e),
      })
    )
  );

  // 做其他事情...
  yield* _(Effect.sleep("2 seconds"));

  // 等待 Fiber 完成
  const result = yield* _(Fiber.join(fiber));
  return result;
});

// 結構化並行:父 Effect 中斷時子 Fiber 自動中斷
const structuredConcurrency = Effect.gen(function* (_) {
  yield* _(
    Effect.fork(
      Effect.gen(function* (_) {
        yield* _(Effect.sleep("10 seconds"));
        console.log("這不會執行,因為父 Effect 會先結束");
      })
    )
  );

  yield* _(Effect.sleep("1 second"));
  // 父 Effect 結束,子 Fiber 自動中斷
});

從回呼到 Promise 到 Effect:完整遷移範例

回呼地獄時代

function getUserCallback(id: number, callback: (err: Error | null, user?: User) => void) {
  db.query("SELECT * FROM users WHERE id = ?", [id], (err, rows) => {
    if (err) return callback(err);
    if (rows.length === 0) return callback(new Error("User not found"));
    callback(null, rows[0]);
  });
}

function getOrdersCallback(userId: number, callback: (err: Error | null, orders?: Order[]) => void) {
  db.query("SELECT * FROM orders WHERE userId = ?", [userId], (err, rows) => {
    if (err) return callback(err);
    callback(null, rows);
  });
}

// 使用:回呼地獄
getUserCallback(1, (err, user) => {
  if (err) { console.error(err); return; }
  getOrdersCallback(user!.id, (err, orders) => {
    if (err) { console.error(err); return; }
    console.log({ user, orders });
  });
});

Promise/async-await 時代

async function getUser(id: number): Promise<User> {
  const rows = await db.query("SELECT * FROM users WHERE id = ?", [id]);
  if (rows.length === 0) throw new Error("User not found");
  return rows[0];
}

async function getOrders(userId: number): Promise<Order[]> {
  return db.query("SELECT * FROM orders WHERE userId = ?", [userId]);
}

// 使用:錯誤型別遺失
async function main() {
  try {
    const user = await getUser(1);
    const orders = await getOrders(user.id);
    console.log({ user, orders });
  } catch (error) {
    // error 是 unknown,無法區分是使用者不存在還是資料庫錯誤
    console.error(error);
  }
}

Effect-TS 時代

class UserNotFoundError {
  readonly _tag = "UserNotFoundError";
  constructor(readonly userId: number) {}
}

class DbQueryError {
  readonly _tag = "DbQueryError";
  constructor(readonly message: string, readonly cause: unknown) {}
}

const getUser = (id: number) =>
  Effect.gen(function* (_) {
    const db = yield* _(Database);
    const rows = yield* _(
      db.query("SELECT * FROM users WHERE id = ?", [id]).pipe(
        Effect.mapError((e) => new DbQueryError("Query failed", e))
      )
    );
    if (rows.length === 0) {
      return yield* _(Effect.fail(new UserNotFoundError(id)));
    }
    return rows[0] as User;
  });

const getOrders = (userId: number) =>
  Effect.gen(function* (_) {
    const db = yield* _(Database);
    return yield* _(db.query("SELECT * FROM orders WHERE userId = ?", [userId]));
  });

// 使用:完全型別安全的錯誤處理
const program = Effect.gen(function* (_) {
  const user = yield* _(getUser(1));
  const orders = yield* _(getOrders(user.id));
  return { user, orders };
}).pipe(
  Effect.catchTag("UserNotFoundError", (e) =>
    Effect.succeed({ error: `使用者 ${e.userId} 不存在` })
  ),
  Effect.catchTag("DbQueryError", (e) =>
    Effect.succeed({ error: `資料庫錯誤: ${e.message}` })
  )
);

錯誤處理模式

catchAll:捕獲所有錯誤

import { Effect } from "effect";

const resilient = pipe(
  riskyOperation(),
  Effect.catchAll((error) =>
    Effect.succeed(`復原自錯誤: ${String(error)}`)
  )
);

catchTag:按型別捕獲錯誤

import { Effect } from "effect";

const taggedHandler = pipe(
  program,
  Effect.catchTag("FetchError", (e) =>
    Effect.succeed({ fallback: true, reason: e.cause })
  ),
  Effect.catchTag("ParseError", (e) =>
    Effect.succeed({ fallback: true, reason: e.cause })
  )
);

retry:指數退避重試

import { Effect, Schedule } from "effect";

const retryWithBackoff = pipe(
  fetchFromApi(),
  Effect.retry(
    Schedule.exponential("1 second").pipe(
      Schedule.compose(Schedule.recurs(5)),
      Schedule.tap((attempt) =>
        Effect.sync(() => console.log(`第 ${attempt} 次重試...`))
      )
    )
  )
);

// 帶條件的重試
const conditionalRetry = pipe(
  fetchFromApi(),
  Effect.retry(
    Schedule.exponential("500 millis").pipe(
      Schedule.whileOutput((delay) => delay < 30_000),
      Schedule.whileInput((error) => error._tag === "NetworkError")
    )
  )
);

timeout:逾時控制

import { Effect } from "effect";

class TimeoutError {
  readonly _tag = "TimeoutError";
}

const withTimeout = pipe(
  longRunningOperation(),
  Effect.timeout("5 seconds"),
  Effect.mapError(() => new TimeoutError())
);

// 帶逾時與回退
const withTimeoutFallback = pipe(
  longRunningOperation(),
  Effect.timeoutFail({
    duration: "3 seconds",
    onTimeout: () => new TimeoutError(),
  }),
  Effect.catchTag("TimeoutError", () =>
    Effect.succeed(getCachedResult())
  )
);

依賴注入實戰:完整範例

import { Effect, Context, Layer } from "effect";

// 定義錯誤型別
class HttpError {
  readonly _tag = "HttpError";
  constructor(readonly status: number, readonly message: string) {}
}

// 定義 HttpClient 服務
class HttpClient extends Context.Tag("HttpClient")<
  HttpClient,
  {
    readonly get: (url: string) => Effect.Effect<Response, HttpError>;
    readonly post: (url: string, body: unknown) => Effect.Effect<Response, HttpError>;
  }
>() {}

// 定義 Cache 服務
class Cache extends Context.Tag("Cache")<
  Cache,
  {
    readonly get: (key: string) => Effect.Effect<unknown | null>;
    readonly set: (key: string, value: unknown, ttl: number) => Effect.Effect<void>;
  }
>() {}

// 真實 HttpClient 實作
const HttpClientLive = Layer.succeed(HttpClient, {
  get: (url) =>
    Effect.tryPromise({
      try: () => fetch(url),
      catch: (error) => new HttpError(0, String(error)),
    }).pipe(
      Effect.flatMap((res) =>
        res.ok
          ? Effect.succeed(res)
          : Effect.fail(new HttpError(res.status, res.statusText))
      )
    ),
  post: (url, body) =>
    Effect.tryPromise({
      try: () =>
        fetch(url, {
          method: "POST",
          headers: { "Content-Type": "application/json" },
          body: JSON.stringify(body),
        }),
      catch: (error) => new HttpError(0, String(error)),
    }).pipe(
      Effect.flatMap((res) =>
        res.ok
          ? Effect.succeed(res)
          : Effect.fail(new HttpError(res.status, res.statusText))
      )
    ),
});

// 記憶體 Cache 實作
const InMemoryCacheLive = Layer.effect(
  Cache,
  Effect.sync(() => {
    const store = new Map<string, { value: unknown; expires: number }>();
    return {
      get: (key) =>
        Effect.sync(() => {
          const entry = store.get(key);
          if (!entry || entry.expires < Date.now()) return null;
          return entry.value;
        }),
      set: (key, value, ttl) =>
        Effect.sync(() => {
          store.set(key, { value, expires: Date.now() + ttl });
        }),
    };
  })
);

// 業務邏輯:帶快取的 API 呼叫
const fetchWithCache = (url: string) =>
  Effect.gen(function* (_) {
    const cache = yield* _(Cache);
    const http = yield* _(HttpClient);

    const cached = yield* _(cache.get(url));
    if (cached) return cached;

    const response = yield* _(http.get(url));
    const data = yield* _(
      Effect.try({
        try: () => response.json(),
        catch: (e) => new HttpError(0, `Parse error: ${String(e)}`),
      })
    );

    yield* _(cache.set(url, data, 60_000));
    return data;
  });

// 組裝應用
const AppLayer = Layer.merge(HttpClientLive, InMemoryCacheLive);

const main = fetchWithCache("https://api.example.com/data").pipe(
  Effect.provide(AppLayer),
  Effect.catchTag("HttpError", (e) =>
    Effect.succeed({ error: `HTTP ${e.status}: ${e.message}` })
  )
);

並行實戰:Fiber 完整範例

import { Effect, Fiber, Ref, Queue } from "effect";

// 生產者-消費者模式
const producerConsumer = Effect.gen(function* (_) {
  const queue = yield* _(Queue.bounded<string>(10));

  const producer = Effect.gen(function* (_) {
    for (let i = 0; i < 5; i++) {
      yield* _(Queue.offer(queue, `訊息-${i}`));
      yield* _(Effect.sleep("100 millis"));
    }
    yield* _(Queue.offer(queue, "DONE"));
  });

  const consumer = Effect.gen(function* (_) {
    while (true) {
      const msg = yield* _(Queue.take(queue));
      if (msg === "DONE") break;
      console.log(`消費: ${msg}`);
    }
  });

  yield* _(
    Effect.all([producer, consumer], { concurrency: 2 })
  );
});

// 平行請求帶限流
const fetchWithConcurrencyLimit = (urls: string[]) =>
  Effect.gen(function* (_) {
    const semaphore = yield* _(Effect.makeSemaphore(3));

    const results = yield* _(
      Effect.all(
        urls.map((url) =>
          semaphore.withPermits(1)(
            Effect.tryPromise({
              try: () => fetch(url),
              catch: (e) => new FetchError(e),
            })
          )
        ),
        { concurrency: "unbounded" }
      )
    );

    return results;
  });

// Fiber 監督
const supervisedExample = Effect.gen(function* (_) {
  const fiber = yield* _(
    Effect.fork(
      Effect.gen(function* (_) {
        yield* _(Effect.sleep("5 seconds"));
        return "完成";
      })
    )
  );

  yield* _(
    Fiber.interrupt(fiber).pipe(
      Effect.race(Fiber.join(fiber))
    )
  );
});

5 個常見陷阱與解決方案

1. 忘記執行 Effect

// ❌ 錯誤:Effect 是惰性的,不執行就不會運作
const program = Effect.succeed(42).pipe(Effect.map(console.log));

// ✅ 正確:必須明確執行
Effect.runPromise(program);

2. 在 Effect 中混用 Promise

// ❌ 錯誤:直接使用 async/await 破壞了 Effect 的錯誤追蹤
const bad = Effect.async(() => {
  const data = await fetch("/api"); // 語法錯誤
});

// ✅ 正確:使用 Effect.tryPromise 包裝
const good = Effect.tryPromise({
  try: () => fetch("/api"),
  catch: (e) => new FetchError(e),
});

3. 錯誤型別未標記

// ❌ 錯誤:使用原始 Error 無法使用 catchTag
const bad = Effect.fail(new Error("something went wrong"));

// ✅ 正確:使用帶 _tag 的自訂錯誤
class AppError {
  readonly _tag = "AppError";
  constructor(readonly message: string) {}
}
const good = Effect.fail(new AppError("something went wrong"));

4. Layer 未提供

// ❌ 錯誤:使用了 Database 但沒有提供 Layer
const program = Effect.gen(function* (_) {
  const db = yield* _(Database);
  return yield* _(db.query("SELECT 1"));
});
Effect.runPromise(program); // 執行時報錯:缺少 Database 的 Layer

// ✅ 正確:提供 Layer
Effect.runPromise(program.pipe(Effect.provide(PostgresLive)));

5. 忽略 Fiber 中斷

// ❌ 錯誤:不可中斷的 Effect 會導致 Fiber 無法正常關閉
const bad = Effect.sync(() => {
  while (true) {} // 無窮迴圈,無法中斷
});

// ✅ 正確:使用 Effect.yieldNow 讓出控制權
const good = Effect.gen(function* (_) {
  while (true) {
    yield* _(Effect.yieldNow());
  }
});

10 個錯誤排查項

# 錯誤現象 原因 解決方案
1 Error: Service not found: Database 未提供對應 Layer 使用 Effect.provide() 注入 Layer
2 Uncaught TypeError: Effect is not a function 匯入路徑錯誤 檢查 import { Effect } from "effect"
3 Effect 不執行 忘記呼叫 Effect.runPromise Effect 是惰性的,必須明確執行
4 catchTag 無法匹配錯誤 錯誤類別缺少 _tag 屬性 確保自訂錯誤有 readonly _tag
5 型別推斷失敗,出現 never 錯誤型別與 catch 不匹配 檢查 Effect 的錯誤型別參數
6 Fiber interrupted 意外中斷 父 Fiber 結束導致子 Fiber 中斷 使用 Effect.forkDaemon 建立守護 Fiber
7 記憶體洩漏 Fiber 未正確清理 使用結構化並行,避免 forkDaemon 濫用
8 Schedule 重試不生效 重試條件不滿足 檢查 whileInput / whileOutput 條件
9 Layer 循環依賴 兩個 Layer 相互依賴 重構依賴關係,使用 Layer.provide 拆分
10 timeout 不生效 逾時時間單位錯誤 使用字串格式 "5 seconds" 而非數字

Effect-TS vs Promise/async-await 對比

維度 Effect-TS Promise/async-await
錯誤型別 編譯期完全型別化 unknown,需手動斷言
惰性求值 ✅ 延遲執行 ❌ 立即執行
可組合性 ✅ pipe + gen ⚠️ 需手動包裝
依賴注入 ✅ Layer 系統 ❌ 無內建方案
並行控制 ✅ Fiber + Semaphore ❌ 無原生支援
重試策略 ✅ Schedule ❌ 需手動實作
逾時控制 ✅ 內建 ❌ 需 Promise.race hack
資源安全 ✅ Scope + acquireRelease ⚠️ finally 不保證
中斷支援 ✅ Fiber.interrupt ❌ AbortController
可測試性 ✅ 替換 Layer 即可 ⚠️ 需要 mock 模組
除錯體驗 ⚠️ 堆疊較深 ✅ 原生堆疊清晰
套件體積 ⚠️ ~80KB min ✅ 0KB(原生)

推薦工具

在 Effect-TS 開發中,以下線上工具可以幫助你提升效率:


總結:Effect-TS 透過將副作用建模為值,為 TypeScript 帶來了前所未有的型別安全和可組合性。掌握 Effect、Layer、Service、Fiber 四大核心概念,搭配 catchTag、retry、timeout 等錯誤處理模式,你就能在 2026 年寫出真正健壯的函數式 TypeScript 程式碼。雖然學習曲線陡峭,但一旦掌握,你將再也無法回到裸 Promise 的時代。

本站提供瀏覽器本地工具,免註冊即可試用 →

#TypeScript Effect#Effect-TS#副作用处理#函数式编程#2026