refactor: 将代码按功能模块重新组织到 core/runtime/control 等目录 feat(core): 添加 Context、FlowNode 等核心 trait 和类型 feat(runtime): 实现 FlowEngine 和状态管理 feat(control): 添加顺序/并行/条件控制流节点 feat(nodes): 实现 HTTP/DB/MQ 等业务节点 docs: 更新 README 添加架构说明和快速开始示例 test: 添加性能测试脚本和示例代码
6.0 KiB
6.0 KiB
DSL Flow
DSL Flow 是一个基于 Rust 的高性能、可扩展的工作流引擎。它旨在通过声明式的 DSL(领域特定语言)来编排复杂的异步任务,支持状态管理、脚本引擎集成(Rhai/JavaScript)以及灵活的控制流。
该项目专为构建中间件、API 编排层和数据处理管道而设计,采用清晰的分层架构,易于扩展和维护。
✨ 核心特性
- 声明式 DSL: 使用 Rust 宏 (
sequence!,fork_join!,group!) 轻松定义复杂的流程结构。 - 多脚本引擎: 开箱支持 Rhai 和 JavaScript (Boa) 脚本,用于动态逻辑和数据转换。
- 强大的控制流:
- Sequence: 顺序执行任务。
- Fork-Join: 并行执行多个分支,支持灵活的结果合并策略(Array 或 Object)。
- Conditional: 基于表达式结果的条件分支 (
if-else)。
- 状态管理: 支持无状态(Stateless)和有状态(Stateful)执行模式。内置内存状态存储,可扩展至 Redis/Database。
- 内置节点: 提供 HTTP 请求、数据库模拟、消息队列模拟、血缘追踪等常用节点。
- 异步高性能: 基于
Tokio和Async Trait,全链路异步非阻塞。 - 模块化架构: 内核(Core)、运行时(Runtime)与节点实现(Nodes)分离,方便二次开发。
📦 安装
在你的 Cargo.toml 中添加依赖:
[dependencies]
dsl-flow = "0.1"
启用特定特性(可选):
[dependencies]
dsl-flow = { version = "0.1", features = ["js", "http"] }
rhai: 启用 Rhai 脚本引擎(默认开启)。js: 启用 JavaScript (Boa) 脚本引擎。http: 启用 HTTP 请求节点支持。
🚀 快速开始
以下示例展示了如何定义一个简单的流程:先计算 1+2,然后并行计算其 2 倍和 3 倍。
use dsl_flow::*;
use dsl_flow::dsl::*; // 导入 DSL 宏和辅助函数
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
// 1. 初始化引擎
let store = InMemoryStateStore::default();
let engine = FlowEngine::new(store, FlowOptions {
stateful: false,
expr_engine: ExprEngineKind::Rhai
});
// 2. 定义流程
let flow = Flow::new(sequence! {
// 步骤 1: 设置初始值
expr_set(ExprEngineKind::Rhai, "1 + 2", "calc.sum"),
// 步骤 2: 并行执行
fork_join! {
expr_set(ExprEngineKind::Rhai, "ctx.calc.sum * 2", "calc.double"),
expr_set(ExprEngineKind::Rhai, "ctx.calc.sum * 3", "calc.triple")
}
});
// 3. 运行流程
let ctx = Context::new();
let output = engine.run_stateless(&flow, ctx).await?;
println!("Result: {}", output.data);
Ok(())
}
🏗️ 架构概览
DSL Flow 采用清晰的分层架构设计,各层职责分明:
dsl-flow/src/
├── core/ <-- [内核层] 纯粹的抽象和基础类型
│ ├── mod.rs
│ ├── traits.rs (FlowNode, ExprEngine 接口定义)
│ ├── context.rs (Context 上下文管理)
│ ├── error.rs (统一的错误处理)
│ └── types.rs (NodeId, NodeOutput 等通用类型)
│
├── runtime/ <-- [运行时层] 负责跑起来
│ ├── mod.rs
│ ├── engine.rs (FlowEngine 调度器)
│ ├── state.rs (FlowState 状态存储)
│ └── expr.rs (Rhai/JS 脚本引擎的具体实现)
│
├── nodes/ <-- [组件层] 具体的节点实现,按功能归类
│ ├── mod.rs
│ ├── control.rs (流程控制: Sequence, ForkJoin, Group, Conditional)
│ ├── io.rs (IO 操作: Http, Db, Mq)
│ ├── script.rs (脚本节点: ExprSet, ExprGet)
│ └── lineage.rs (血缘追踪)
│
├── dsl.rs <-- [接口层] 对外暴露的 DSL 构建宏和辅助函数
└── lib.rs <-- [入口] 统一导出公共 API
分层说明
- Core (
src/core): 定义了FlowNode接口、Context上下文和基础类型。它是整个框架的契约层,不依赖具体的业务实现。 - Runtime (
src/runtime): 包含FlowEngine调度器、状态存储接口 (StateStore) 和脚本引擎适配器 (ExprEngine)。它负责将流程定义跑起来。 - Nodes (
src/nodes): 具体的业务节点实现,包括控制流节点(Sequence/ForkJoin)、IO 节点(Http/Db)等。
🔧 节点类型
控制流 (Control Flow)
- Sequence:
sequence! { node1, node2 }- 按顺序执行。 - ForkJoin:
fork_join! { node1, node2 }- 并行执行,结果收集为数组。 - ForkJoin (Merge):
fork_join_merge! { "path", merge_mode, ... }- 并行执行并合并结果到 Context。 - Conditional:
ConditionalNode- 基于脚本条件的if-else逻辑。
脚本与数据 (Script & Data)
- ExprSet: 执行脚本并将结果写入 Context 指定路径。
- ExprGet: 仅执行脚本并返回结果。
- Lineage: 追踪数据读写血缘。
IO 与副作用 (Side Effects)
- HttpNode: 发送 GET/POST 请求。
- DbNode: 模拟数据库操作。
- MqNode: 模拟消息队列发送。
🤝 贡献
欢迎提交 Issue 和 Pull Request!
- Fork 本仓库。
- 创建你的特性分支 (
git checkout -b feature/AmazingFeature)。 - 提交你的更改 (
git commit -m 'Add some AmazingFeature')。 - 推送到分支 (
git push origin feature/AmazingFeature)。 - 打开一个 Pull Request。
📄 许可证
本项目采用 MIT 许可证 - 详情请见 LICENSE 文件。