# DSL Flow [![Crates.io](https://img.shields.io/crates/v/dsl-flow.svg)](https://crates.io/crates/dsl-flow) [![Docs.rs](https://docs.rs/dsl-flow/badge.svg)](https://docs.rs/dsl-flow) [![License](https://img.shields.io/badge/license-MIT-blue.svg)](LICENSE) **DSL Flow** 是一个基于 Rust 的高性能、可扩展的工作流引擎。它旨在通过声明式的 DSL(领域特定语言)来编排复杂的异步任务,支持状态管理、脚本引擎集成(Rhai/JavaScript)以及灵活的控制流。 该项目专为构建中间件、API 编排层和数据处理管道而设计,采用清晰的分层架构,易于扩展和维护。 ## ✨ 核心特性 * **声明式 DSL**: 使用 Rust 宏 (`sequence!`, `fork_join!`, `group!`) 轻松定义复杂的流程结构。 * **多脚本引擎**: 开箱支持 [Rhai](https://rhai.rs/) 和 JavaScript ([Boa](https://boajs.dev/)) 脚本,用于动态逻辑和数据转换。 * **强大的控制流**: * **Sequence**: 顺序执行任务。 * **Fork-Join**: 并行执行多个分支,支持灵活的结果合并策略(Array 或 Object)。 * **Conditional**: 基于表达式结果的条件分支 (`if-else`)。 * **状态管理**: 支持无状态(Stateless)和有状态(Stateful)执行模式。内置内存状态存储,可扩展至 Redis/Database。 * **内置节点**: 提供 HTTP 请求、数据库模拟、消息队列模拟、血缘追踪等常用节点。 * **异步高性能**: 基于 `Tokio` 和 `Async Trait`,全链路异步非阻塞。 * **模块化架构**: 内核(Core)、运行时(Runtime)与节点实现(Nodes)分离,方便二次开发。 ## 📦 安装 在你的 `Cargo.toml` 中添加依赖: ```toml [dependencies] dsl-flow = "0.1" ``` 启用特定特性(可选): ```toml [dependencies] dsl-flow = { version = "0.1", features = ["js", "http"] } ``` * `rhai`: 启用 Rhai 脚本引擎(默认开启)。 * `js`: 启用 JavaScript (Boa) 脚本引擎。 * `http`: 启用 HTTP 请求节点支持。 ## 🚀 快速开始 以下示例展示了如何定义一个简单的流程:先计算 `1+2`,然后并行计算其 2 倍和 3 倍。 ```rust use dsl_flow::*; use dsl_flow::dsl::*; // 导入 DSL 宏和辅助函数 #[tokio::main] async fn main() -> Result<(), Box> { // 1. 初始化引擎 let store = InMemoryStateStore::default(); let engine = FlowEngine::new(store, FlowOptions { stateful: false, expr_engine: ExprEngineKind::Rhai }); // 2. 定义流程 let flow = Flow::new(sequence! { // 步骤 1: 设置初始值 expr_set(ExprEngineKind::Rhai, "1 + 2", "calc.sum"), // 步骤 2: 并行执行 fork_join! { expr_set(ExprEngineKind::Rhai, "ctx.calc.sum * 2", "calc.double"), expr_set(ExprEngineKind::Rhai, "ctx.calc.sum * 3", "calc.triple") } }); // 3. 运行流程 let ctx = Context::new(); let output = engine.run_stateless(&flow, ctx).await?; println!("Result: {}", output.data); Ok(()) } ``` ## 🏗️ 架构概览 DSL Flow 采用清晰的分层架构设计,各层职责分明: ```text dsl-flow/src/ ├── core/ <-- [内核层] 纯粹的抽象和基础类型 │ ├── mod.rs │ ├── traits.rs (FlowNode, ExprEngine 接口定义) │ ├── context.rs (Context 上下文管理) │ ├── error.rs (统一的错误处理) │ └── types.rs (NodeId, NodeOutput 等通用类型) │ ├── runtime/ <-- [运行时层] 负责跑起来 │ ├── mod.rs │ ├── engine.rs (FlowEngine 调度器) │ ├── state.rs (FlowState 状态存储) │ └── expr.rs (Rhai/JS 脚本引擎的具体实现) │ ├── nodes/ <-- [组件层] 具体的节点实现,按功能归类 │ ├── mod.rs │ ├── control.rs (流程控制: Sequence, ForkJoin, Group, Conditional) │ ├── io.rs (IO 操作: Http, Db, Mq) │ ├── script.rs (脚本节点: ExprSet, ExprGet) │ └── lineage.rs (血缘追踪) │ ├── dsl.rs <-- [接口层] 对外暴露的 DSL 构建宏和辅助函数 └── lib.rs <-- [入口] 统一导出公共 API ``` ### 分层说明 * **Core (`src/core`)**: 定义了 `FlowNode` 接口、`Context` 上下文和基础类型。它是整个框架的契约层,不依赖具体的业务实现。 * **Runtime (`src/runtime`)**: 包含 `FlowEngine` 调度器、状态存储接口 (`StateStore`) 和脚本引擎适配器 (`ExprEngine`)。它负责将流程定义跑起来。 * **Nodes (`src/nodes`)**: 具体的业务节点实现,包括控制流节点(Sequence/ForkJoin)、IO 节点(Http/Db)等。 ## 🔧 节点类型 ### 控制流 (Control Flow) * **Sequence**: `sequence! { node1, node2 }` - 按顺序执行。 * **ForkJoin**: `fork_join! { node1, node2 }` - 并行执行,结果收集为数组。 * **ForkJoin (Merge)**: `fork_join_merge! { "path", merge_mode, ... }` - 并行执行并合并结果到 Context。 * **Conditional**: `ConditionalNode` - 基于脚本条件的 `if-else` 逻辑。 ### 脚本与数据 (Script & Data) * **ExprSet**: 执行脚本并将结果写入 Context 指定路径。 * **ExprGet**: 仅执行脚本并返回结果。 * **Lineage**: 追踪数据读写血缘。 ### IO 与副作用 (Side Effects) * **HttpNode**: 发送 GET/POST 请求。 * **DbNode**: 模拟数据库操作。 * **MqNode**: 模拟消息队列发送。 ## 🤝 贡献 欢迎提交 Issue 和 Pull Request! 1. Fork 本仓库。 2. 创建你的特性分支 (`git checkout -b feature/AmazingFeature`)。 3. 提交你的更改 (`git commit -m 'Add some AmazingFeature'`)。 4. 推送到分支 (`git push origin feature/AmazingFeature`)。 5. 打开一个 Pull Request。 ## 📄 许可证 本项目采用 MIT 许可证 - 详情请见 [LICENSE](LICENSE) 文件。