跳转至

Effect-TS

为什么要学

Effect 是 TypeScript 的函数式效果系统,解决了 TS 在生产应用中的核心痛点:

  • 类型安全的错误处理:错误成为类型签名的一部分,不再遗漏
  • 可组合的并发:结构化并发,无回调地狱
  • 依赖注入:编译时保证依赖完整性
  • 资源管理:自动清理,不会泄漏
  • 可观测性:内置日志、追踪、指标
  • 重试与中断:优雅的故障恢复机制

如果你觉得 TypeScript 在处理错误、并发、资源管理时太粗糙,Effect 提供了工程级解决方案。

核心概念

白话解释

Effect 的核心类型 Effect<Success, Error, Requirements> 描述了一个计算: - Success:成功时返回什么 - Error:可能出什么错 - Requirements:需要什么依赖

就像一个"带说明书的蓝图"——描述了做什么、可能出什么问题、需要什么材料。

核心概念对照表

Effect概念传统TS对比说明
EffectPromise带错误和依赖类型的异步计算
pipe / Effect.genasync/await组合效果的方式
Layernew Service()依赖注入层
Scopetry/finally资源生命周期管理
FiberPromise轻量级并发单元
SchedulesetInterval/retry声明式重试/重复策略
Schemazod/io-ts运行时类型验证
StreamAsyncIterator异步数据流
CauseError结构化的错误信息(含堆栈)

安装配置

安装

npm install effect
# 或
pnpm add effect
bun add effect

# 可选包
npm install @effect/schema    # 运行时验证
npm install @effect/platform  # 平台服务(HTTP/FS)

TypeScript 配置

// tsconfig.json
{
  "compilerOptions": {
    "strict": true,
    "target": "ES2021",
    "module": "ESNext",
    "moduleResolution": "bundler",
    "exactOptionalPropertyTypes": true
  }
}

快速上手

第一个 Effect

import { Effect, Console } from "effect";

// 创建Effect(描述计算,不执行)
const program = Effect.gen(function* () {
  yield* Console.log("Hello Effect!");
  const result = yield* Effect.succeed(42);
  yield* Console.log(`Result: ${result}`);
  return result;
});

// 执行Effect
Effect.runPromise(program).then(console.log);
// 输出: Hello Effect! → Result: 42 → 42

类型安全的错误处理

import { Effect, Data } from "effect";

// 定义错误类型
class HttpError extends Data.TaggedError("HttpError")<{
  status: number;
  message: string;
}> {}

class ParseError extends Data.TaggedError("ParseError")<{
  input: string;
}> {}

// 函数签名清楚表达可能的错误
const fetchUser = (id: string): Effect.Effect<User, HttpError> =>
  Effect.gen(function* () {
    const response = yield* Effect.tryPromise({
      try: () => fetch(`/api/users/${id}`),
      catch: () => new HttpError({ status: 500, message: "Network error" }),
    });

    if (!response.ok) {
      return yield* Effect.fail(
        new HttpError({ status: response.status, message: "Not found" })
      );
    }

    return yield* Effect.tryPromise({
      try: () => response.json() as Promise<User>,
      catch: () => new HttpError({ status: 500, message: "Parse failed" }),
    });
  });

// 调用者在类型层面知道可能的错误
const program = Effect.gen(function* () {
  const user = yield* fetchUser("123");
  return user;
});
// program的类型: Effect<User, HttpError, never>

错误恢复

const resilient = fetchUser("123").pipe(
  // 捕获特定错误
  Effect.catchTag("HttpError", (error) => {
    if (error.status === 404) {
      return Effect.succeed(defaultUser);
    }
    return Effect.fail(error);
  }),
  // 重试策略
  Effect.retry({ times: 3 }),
);

进阶用法

1. 依赖注入(Service + Layer)

import { Effect, Context, Layer } from "effect";

// 定义服务接口
class Database extends Context.Tag("Database")<
  Database,
  {
    query: (sql: string) => Effect.Effect<any[]>;
    insert: (table: string, data: any) => Effect.Effect<void>;
  }
>() {}

class Logger extends Context.Tag("Logger")<
  Logger,
  {
    info: (msg: string) => Effect.Effect<void>;
    error: (msg: string) => Effect.Effect<void>;
  }
>() {}

// 使用服务(不关心实现)
const getUsers = Effect.gen(function* () {
  const db = yield* Database;
  const logger = yield* Logger;

  yield* logger.info("Fetching users...");
  const users = yield* db.query("SELECT * FROM users");
  yield* logger.info(`Found ${users.length} users`);
  return users;
});
// 类型: Effect<any[], never, Database | Logger>

// 创建实现层
const DatabaseLive = Layer.succeed(Database, {
  query: (sql) => Effect.succeed([{ id: 1, name: "张三" }]),
  insert: (table, data) => Effect.succeed(undefined),
});

const LoggerLive = Layer.succeed(Logger, {
  info: (msg) => Console.log(`[INFO] ${msg}`),
  error: (msg) => Console.error(`[ERROR] ${msg}`),
});

// 组合并运行
const AppLayer = Layer.merge(DatabaseLive, LoggerLive);
const runnable = Effect.provide(getUsers, AppLayer);
Effect.runPromise(runnable);

2. 结构化并发

import { Effect, Fiber } from "effect";

// 并行执行
const parallel = Effect.all([
  fetchUser("1"),
  fetchUser("2"),
  fetchUser("3"),
], { concurrency: "unbounded" });

// 带限制的并发
const limited = Effect.all(
  userIds.map(fetchUser),
  { concurrency: 5 }  // 最多5个并行
);

// Race(第一个完成的获胜)
const fastest = Effect.race(fetchFromCDN1, fetchFromCDN2);

// Fork(后台执行)
const background = Effect.gen(function* () {
  const fiber = yield* Effect.fork(longRunningTask);
  // 继续其他工作...
  const result = yield* Fiber.join(fiber);
  return result;
});

// 超时
const withTimeout = myEffect.pipe(
  Effect.timeout("5 seconds")
);

3. 资源管理(Scope)

import { Effect, Scope } from "effect";

// 定义可清理的资源
const acquireDbConnection = Effect.acquireRelease(
  // 获取
  Effect.sync(() => {
    console.log("Opening DB connection");
    return { query: (sql: string) => [/* results */] };
  }),
  // 释放(保证执行)
  (conn) => Effect.sync(() => {
    console.log("Closing DB connection");
  })
);

// 使用资源(自动管理生命周期)
const program = Effect.scoped(
  Effect.gen(function* () {
    const conn = yield* acquireDbConnection;
    return conn.query("SELECT 1");
  })
);
// 无论成功失败,连接都会被关闭

4. Stream(异步数据流)

import { Stream, Effect, Chunk } from "effect";

// 创建流
const numbers = Stream.range(1, 100);

// 流处理
const processed = numbers.pipe(
  Stream.filter((n) => n % 2 === 0),
  Stream.map((n) => n * 2),
  Stream.take(10),
  Stream.runCollect,
);

// 从异步源创建流
const eventStream = Stream.async<string>((emit) => {
  const ws = new WebSocket("ws://...");
  ws.onmessage = (e) => emit(Effect.succeed(Chunk.of(e.data)));
  ws.onerror = () => emit(Effect.fail(new Error("WS error")));
});

5. Schema(运行时验证)

import { Schema } from "@effect/schema";

// 定义Schema
const User = Schema.Struct({
  id: Schema.Number,
  name: Schema.String.pipe(Schema.minLength(1)),
  email: Schema.String.pipe(Schema.pattern(/^.+@.+\..+$/)),
  age: Schema.Number.pipe(Schema.between(0, 150)),
});

type User = Schema.Schema.Type<typeof User>;

// 解析(带错误处理)
const parseUser = Schema.decodeUnknown(User);
const result = Effect.runSync(
  parseUser({ id: 1, name: "张三", email: "z@t.com", age: 25 })
);

// 编码
const encodeUser = Schema.encodeUnknown(User);

6. 重试与调度

import { Effect, Schedule } from "effect";

// 重试策略
const retryPolicy = Schedule.exponential("1 second").pipe(
  Schedule.compose(Schedule.recurs(5)),
  Schedule.jittered,
);

const resilientFetch = fetchData.pipe(
  Effect.retry(retryPolicy),
);

// 重复执行
const polling = checkStatus.pipe(
  Effect.repeat(Schedule.fixed("10 seconds")),
);

常见问题

Q1: 学习曲线太陡?

从小处开始: 1. 先用 Effect.gen 替代 try/catch 2. 再加入 Service/Layer 做依赖注入 3. 最后探索并发和 Stream

Q2: 和 fp-ts 的关系?

Effect 是 fp-ts 的"下一代"(同一作者团队)。Effect 更实用、更完整、有更好的 DX。新项目直接用 Effect。

Q3: 性能开销?

Effect 的运行时开销很小(微秒级)。对于 I/O 密集型应用(大多数 Web 服务)完全可以忽略。

Q4: 团队能接受吗?

  • Effect 可以渐进式采用
  • 在关键模块先用,非关键部分保持现状
  • 类型安全的错误处理通常是最容易被接受的入口

参考资源