Files
dsl_flow/README.md
ayou 81da0fe61f feat: 重构项目结构并添加核心功能
refactor: 将代码按功能模块重新组织到 core/runtime/control 等目录
feat(core): 添加 Context、FlowNode 等核心 trait 和类型
feat(runtime): 实现 FlowEngine 和状态管理
feat(control): 添加顺序/并行/条件控制流节点
feat(nodes): 实现 HTTP/DB/MQ 等业务节点
docs: 更新 README 添加架构说明和快速开始示例
test: 添加性能测试脚本和示例代码
2025-12-14 23:50:40 +08:00

149 lines
6.0 KiB
Markdown
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# DSL Flow
[![Crates.io](https://img.shields.io/crates/v/dsl-flow.svg)](https://crates.io/crates/dsl-flow)
[![Docs.rs](https://docs.rs/dsl-flow/badge.svg)](https://docs.rs/dsl-flow)
[![License](https://img.shields.io/badge/license-MIT-blue.svg)](LICENSE)
**DSL Flow** 是一个基于 Rust 的高性能、可扩展的工作流引擎。它旨在通过声明式的 DSL领域特定语言来编排复杂的异步任务支持状态管理、脚本引擎集成Rhai/JavaScript以及灵活的控制流。
该项目专为构建中间件、API 编排层和数据处理管道而设计,采用清晰的分层架构,易于扩展和维护。
## ✨ 核心特性
* **声明式 DSL**: 使用 Rust 宏 (`sequence!`, `fork_join!`, `group!`) 轻松定义复杂的流程结构。
* **多脚本引擎**: 开箱支持 [Rhai](https://rhai.rs/) 和 JavaScript ([Boa](https://boajs.dev/)) 脚本,用于动态逻辑和数据转换。
* **强大的控制流**:
* **Sequence**: 顺序执行任务。
* **Fork-Join**: 并行执行多个分支支持灵活的结果合并策略Array 或 Object
* **Conditional**: 基于表达式结果的条件分支 (`if-else`)。
* **状态管理**: 支持无状态Stateless和有状态Stateful执行模式。内置内存状态存储可扩展至 Redis/Database。
* **内置节点**: 提供 HTTP 请求、数据库模拟、消息队列模拟、血缘追踪等常用节点。
* **异步高性能**: 基于 `Tokio``Async Trait`,全链路异步非阻塞。
* **模块化架构**: 内核Core、运行时Runtime与节点实现Nodes分离方便二次开发。
## 📦 安装
在你的 `Cargo.toml` 中添加依赖:
```toml
[dependencies]
dsl-flow = "0.1"
```
启用特定特性(可选):
```toml
[dependencies]
dsl-flow = { version = "0.1", features = ["js", "http"] }
```
* `rhai`: 启用 Rhai 脚本引擎(默认开启)。
* `js`: 启用 JavaScript (Boa) 脚本引擎。
* `http`: 启用 HTTP 请求节点支持。
## 🚀 快速开始
以下示例展示了如何定义一个简单的流程:先计算 `1+2`,然后并行计算其 2 倍和 3 倍。
```rust
use dsl_flow::*;
use dsl_flow::dsl::*; // 导入 DSL 宏和辅助函数
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
// 1. 初始化引擎
let store = InMemoryStateStore::default();
let engine = FlowEngine::new(store, FlowOptions {
stateful: false,
expr_engine: ExprEngineKind::Rhai
});
// 2. 定义流程
let flow = Flow::new(sequence! {
// 步骤 1: 设置初始值
expr_set(ExprEngineKind::Rhai, "1 + 2", "calc.sum"),
// 步骤 2: 并行执行
fork_join! {
expr_set(ExprEngineKind::Rhai, "ctx.calc.sum * 2", "calc.double"),
expr_set(ExprEngineKind::Rhai, "ctx.calc.sum * 3", "calc.triple")
}
});
// 3. 运行流程
let ctx = Context::new();
let output = engine.run_stateless(&flow, ctx).await?;
println!("Result: {}", output.data);
Ok(())
}
```
## 🏗️ 架构概览
DSL Flow 采用清晰的分层架构设计,各层职责分明:
```text
dsl-flow/src/
├── core/ <-- [内核层] 纯粹的抽象和基础类型
│ ├── mod.rs
│ ├── traits.rs (FlowNode, ExprEngine 接口定义)
│ ├── context.rs (Context 上下文管理)
│ ├── error.rs (统一的错误处理)
│ └── types.rs (NodeId, NodeOutput 等通用类型)
├── runtime/ <-- [运行时层] 负责跑起来
│ ├── mod.rs
│ ├── engine.rs (FlowEngine 调度器)
│ ├── state.rs (FlowState 状态存储)
│ └── expr.rs (Rhai/JS 脚本引擎的具体实现)
├── nodes/ <-- [组件层] 具体的节点实现,按功能归类
│ ├── mod.rs
│ ├── control.rs (流程控制: Sequence, ForkJoin, Group, Conditional)
│ ├── io.rs (IO 操作: Http, Db, Mq)
│ ├── script.rs (脚本节点: ExprSet, ExprGet)
│ └── lineage.rs (血缘追踪)
├── dsl.rs <-- [接口层] 对外暴露的 DSL 构建宏和辅助函数
└── lib.rs <-- [入口] 统一导出公共 API
```
### 分层说明
* **Core (`src/core`)**: 定义了 `FlowNode` 接口、`Context` 上下文和基础类型。它是整个框架的契约层,不依赖具体的业务实现。
* **Runtime (`src/runtime`)**: 包含 `FlowEngine` 调度器、状态存储接口 (`StateStore`) 和脚本引擎适配器 (`ExprEngine`)。它负责将流程定义跑起来。
* **Nodes (`src/nodes`)**: 具体的业务节点实现包括控制流节点Sequence/ForkJoin、IO 节点Http/Db等。
## 🔧 节点类型
### 控制流 (Control Flow)
* **Sequence**: `sequence! { node1, node2 }` - 按顺序执行。
* **ForkJoin**: `fork_join! { node1, node2 }` - 并行执行,结果收集为数组。
* **ForkJoin (Merge)**: `fork_join_merge! { "path", merge_mode, ... }` - 并行执行并合并结果到 Context。
* **Conditional**: `ConditionalNode` - 基于脚本条件的 `if-else` 逻辑。
### 脚本与数据 (Script & Data)
* **ExprSet**: 执行脚本并将结果写入 Context 指定路径。
* **ExprGet**: 仅执行脚本并返回结果。
* **Lineage**: 追踪数据读写血缘。
### IO 与副作用 (Side Effects)
* **HttpNode**: 发送 GET/POST 请求。
* **DbNode**: 模拟数据库操作。
* **MqNode**: 模拟消息队列发送。
## 🤝 贡献
欢迎提交 Issue 和 Pull Request
1. Fork 本仓库。
2. 创建你的特性分支 (`git checkout -b feature/AmazingFeature`)。
3. 提交你的更改 (`git commit -m 'Add some AmazingFeature'`)。
4. 推送到分支 (`git push origin feature/AmazingFeature`)。
5. 打开一个 Pull Request。
## 📄 许可证
本项目采用 MIT 许可证 - 详情请见 [LICENSE](LICENSE) 文件。