Effect-TS¶
为什么要学¶
Effect 是 TypeScript 的函数式效果系统,解决了 TS 在生产应用中的核心痛点:
- 类型安全的错误处理:错误成为类型签名的一部分,不再遗漏
- 可组合的并发:结构化并发,无回调地狱
- 依赖注入:编译时保证依赖完整性
- 资源管理:自动清理,不会泄漏
- 可观测性:内置日志、追踪、指标
- 重试与中断:优雅的故障恢复机制
如果你觉得 TypeScript 在处理错误、并发、资源管理时太粗糙,Effect 提供了工程级解决方案。
核心概念¶
白话解释¶
Effect 的核心类型 Effect<Success, Error, Requirements> 描述了一个计算: - Success:成功时返回什么 - Error:可能出什么错 - Requirements:需要什么依赖
就像一个"带说明书的蓝图"——描述了做什么、可能出什么问题、需要什么材料。
核心概念对照表¶
安装配置¶
安装¶
npm install effect
# 或
pnpm add effect
bun add effect
# 可选包
npm install @effect/schema # 运行时验证
npm install @effect/platform # 平台服务(HTTP/FS)
TypeScript 配置¶
// tsconfig.json
{
"compilerOptions": {
"strict": true,
"target": "ES2021",
"module": "ESNext",
"moduleResolution": "bundler",
"exactOptionalPropertyTypes": true
}
}
快速上手¶
第一个 Effect¶
import { Effect, Console } from "effect";
// 创建Effect(描述计算,不执行)
const program = Effect.gen(function* () {
yield* Console.log("Hello Effect!");
const result = yield* Effect.succeed(42);
yield* Console.log(`Result: ${result}`);
return result;
});
// 执行Effect
Effect.runPromise(program).then(console.log);
// 输出: Hello Effect! → Result: 42 → 42
类型安全的错误处理¶
import { Effect, Data } from "effect";
// 定义错误类型
class HttpError extends Data.TaggedError("HttpError")<{
status: number;
message: string;
}> {}
class ParseError extends Data.TaggedError("ParseError")<{
input: string;
}> {}
// 函数签名清楚表达可能的错误
const fetchUser = (id: string): Effect.Effect<User, HttpError> =>
Effect.gen(function* () {
const response = yield* Effect.tryPromise({
try: () => fetch(`/api/users/${id}`),
catch: () => new HttpError({ status: 500, message: "Network error" }),
});
if (!response.ok) {
return yield* Effect.fail(
new HttpError({ status: response.status, message: "Not found" })
);
}
return yield* Effect.tryPromise({
try: () => response.json() as Promise<User>,
catch: () => new HttpError({ status: 500, message: "Parse failed" }),
});
});
// 调用者在类型层面知道可能的错误
const program = Effect.gen(function* () {
const user = yield* fetchUser("123");
return user;
});
// program的类型: Effect<User, HttpError, never>
错误恢复¶
const resilient = fetchUser("123").pipe(
// 捕获特定错误
Effect.catchTag("HttpError", (error) => {
if (error.status === 404) {
return Effect.succeed(defaultUser);
}
return Effect.fail(error);
}),
// 重试策略
Effect.retry({ times: 3 }),
);
进阶用法¶
1. 依赖注入(Service + Layer)¶
import { Effect, Context, Layer } from "effect";
// 定义服务接口
class Database extends Context.Tag("Database")<
Database,
{
query: (sql: string) => Effect.Effect<any[]>;
insert: (table: string, data: any) => Effect.Effect<void>;
}
>() {}
class Logger extends Context.Tag("Logger")<
Logger,
{
info: (msg: string) => Effect.Effect<void>;
error: (msg: string) => Effect.Effect<void>;
}
>() {}
// 使用服务(不关心实现)
const getUsers = Effect.gen(function* () {
const db = yield* Database;
const logger = yield* Logger;
yield* logger.info("Fetching users...");
const users = yield* db.query("SELECT * FROM users");
yield* logger.info(`Found ${users.length} users`);
return users;
});
// 类型: Effect<any[], never, Database | Logger>
// 创建实现层
const DatabaseLive = Layer.succeed(Database, {
query: (sql) => Effect.succeed([{ id: 1, name: "张三" }]),
insert: (table, data) => Effect.succeed(undefined),
});
const LoggerLive = Layer.succeed(Logger, {
info: (msg) => Console.log(`[INFO] ${msg}`),
error: (msg) => Console.error(`[ERROR] ${msg}`),
});
// 组合并运行
const AppLayer = Layer.merge(DatabaseLive, LoggerLive);
const runnable = Effect.provide(getUsers, AppLayer);
Effect.runPromise(runnable);
2. 结构化并发¶
import { Effect, Fiber } from "effect";
// 并行执行
const parallel = Effect.all([
fetchUser("1"),
fetchUser("2"),
fetchUser("3"),
], { concurrency: "unbounded" });
// 带限制的并发
const limited = Effect.all(
userIds.map(fetchUser),
{ concurrency: 5 } // 最多5个并行
);
// Race(第一个完成的获胜)
const fastest = Effect.race(fetchFromCDN1, fetchFromCDN2);
// Fork(后台执行)
const background = Effect.gen(function* () {
const fiber = yield* Effect.fork(longRunningTask);
// 继续其他工作...
const result = yield* Fiber.join(fiber);
return result;
});
// 超时
const withTimeout = myEffect.pipe(
Effect.timeout("5 seconds")
);
3. 资源管理(Scope)¶
import { Effect, Scope } from "effect";
// 定义可清理的资源
const acquireDbConnection = Effect.acquireRelease(
// 获取
Effect.sync(() => {
console.log("Opening DB connection");
return { query: (sql: string) => [/* results */] };
}),
// 释放(保证执行)
(conn) => Effect.sync(() => {
console.log("Closing DB connection");
})
);
// 使用资源(自动管理生命周期)
const program = Effect.scoped(
Effect.gen(function* () {
const conn = yield* acquireDbConnection;
return conn.query("SELECT 1");
})
);
// 无论成功失败,连接都会被关闭
4. Stream(异步数据流)¶
import { Stream, Effect, Chunk } from "effect";
// 创建流
const numbers = Stream.range(1, 100);
// 流处理
const processed = numbers.pipe(
Stream.filter((n) => n % 2 === 0),
Stream.map((n) => n * 2),
Stream.take(10),
Stream.runCollect,
);
// 从异步源创建流
const eventStream = Stream.async<string>((emit) => {
const ws = new WebSocket("ws://...");
ws.onmessage = (e) => emit(Effect.succeed(Chunk.of(e.data)));
ws.onerror = () => emit(Effect.fail(new Error("WS error")));
});
5. Schema(运行时验证)¶
import { Schema } from "@effect/schema";
// 定义Schema
const User = Schema.Struct({
id: Schema.Number,
name: Schema.String.pipe(Schema.minLength(1)),
email: Schema.String.pipe(Schema.pattern(/^.+@.+\..+$/)),
age: Schema.Number.pipe(Schema.between(0, 150)),
});
type User = Schema.Schema.Type<typeof User>;
// 解析(带错误处理)
const parseUser = Schema.decodeUnknown(User);
const result = Effect.runSync(
parseUser({ id: 1, name: "张三", email: "z@t.com", age: 25 })
);
// 编码
const encodeUser = Schema.encodeUnknown(User);
6. 重试与调度¶
import { Effect, Schedule } from "effect";
// 重试策略
const retryPolicy = Schedule.exponential("1 second").pipe(
Schedule.compose(Schedule.recurs(5)),
Schedule.jittered,
);
const resilientFetch = fetchData.pipe(
Effect.retry(retryPolicy),
);
// 重复执行
const polling = checkStatus.pipe(
Effect.repeat(Schedule.fixed("10 seconds")),
);
常见问题¶
Q1: 学习曲线太陡?¶
从小处开始: 1. 先用 Effect.gen 替代 try/catch 2. 再加入 Service/Layer 做依赖注入 3. 最后探索并发和 Stream
Q2: 和 fp-ts 的关系?¶
Effect 是 fp-ts 的"下一代"(同一作者团队)。Effect 更实用、更完整、有更好的 DX。新项目直接用 Effect。
Q3: 性能开销?¶
Effect 的运行时开销很小(微秒级)。对于 I/O 密集型应用(大多数 Web 服务)完全可以忽略。
Q4: 团队能接受吗?¶
- Effect 可以渐进式采用
- 在关键模块先用,非关键部分保持现状
- 类型安全的错误处理通常是最容易被接受的入口
参考资源¶
- Effect 官网 - 官方网站
- Effect 文档 - 完整文档
- Effect GitHub - 源代码
- Effect Discord - 社区
- Effect YouTube - 视频教程