Files
dsl_flow/dsl-flow
ayou 81da0fe61f feat: 重构项目结构并添加核心功能
refactor: 将代码按功能模块重新组织到 core/runtime/control 等目录
feat(core): 添加 Context、FlowNode 等核心 trait 和类型
feat(runtime): 实现 FlowEngine 和状态管理
feat(control): 添加顺序/并行/条件控制流节点
feat(nodes): 实现 HTTP/DB/MQ 等业务节点
docs: 更新 README 添加架构说明和快速开始示例
test: 添加性能测试脚本和示例代码
2025-12-14 23:50:40 +08:00
..

DSL Flow

Crates.io Docs.rs License

DSL Flow 是一个基于 Rust 的高性能、可扩展的工作流引擎。它旨在通过声明式的 DSL领域特定语言来编排复杂的异步任务支持状态管理、多语言脚本执行Rhai/JavaScript以及灵活的控制流。

该项目专为构建中间件、API 编排层和数据处理管道而设计,采用清晰的分层架构,易于扩展和维护。

核心特性

  • 声明式 DSL: 使用 Rust 宏 (sequence!, fork_join!, group!) 轻松定义复杂的流程结构。
  • 多语言支持:
    • 表达式求值: 支持 Rhai 和 JS 表达式,用于条件判断和简单数据处理。
    • 代码执行: 支持运行完整的 Rhai 或 JS 代码块,处理复杂业务逻辑。
  • 强大的控制流:
    • Sequence: 顺序执行任务。
    • Fork-Join: 并行执行多个分支支持灵活的结果合并策略Array 或 Object
    • Conditional: 基于表达式结果的条件分支 (if-else)。
  • 状态管理: 支持无状态Stateless和有状态Stateful执行模式。内置内存状态存储可扩展至 Redis/Database。
  • 内置节点: 提供 HTTP 请求、数据库模拟、消息队列模拟、血缘追踪等常用节点。
  • 异步高性能: 基于 TokioAsync Trait,全链路异步非阻塞。
  • 模块化架构: 内核Core、运行时Runtime、控制流Control与业务节点Nodes分离职责单一。

📦 安装

在你的 Cargo.toml 中添加依赖:

[dependencies]
dsl-flow = "0.1"

启用特定特性(可选):

[dependencies]
dsl-flow = { version = "0.1", features = ["js", "http"] }
  • rhai: 启用 Rhai 脚本引擎(默认开启)。
  • js: 启用 JavaScript (Boa) 脚本引擎。
  • http: 启用 HTTP 请求节点支持。

🚀 快速开始

以下示例展示了如何定义一个简单的流程:先计算 1+2,然后并行计算其 2 倍和 3 倍。

use dsl_flow::*;
use dsl_flow::dsl::*; // 导入 DSL 宏和辅助函数

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    // 1. 初始化引擎
    let store = InMemoryStateStore::default();
    let engine = FlowEngine::new(store, FlowOptions {
        stateful: false,
        expr_engine: ExprEngineKind::Rhai
    });

    // 2. 定义流程
    let flow = Flow::new(sequence! {
        // 步骤 1: 设置初始值
        expr_set(ExprEngineKind::Rhai, "1 + 2", "calc.sum"),
        
        // 步骤 2: 并行执行
        fork_join! {
            // 使用 Rhai 代码块
            rhai("let sum = ctx.calc.sum; sum * 2"),
            // 使用 JS 代码块
            js("const sum = ctx.calc.sum; sum * 3")
        }
    });

    // 3. 运行流程
    let ctx = Context::new();
    let output = engine.run_stateless(&flow, ctx).await?;

    println!("Result: {}", output.data);
    Ok(())
}

🏗️ 架构概览

DSL Flow 采用清晰的分层架构设计,各层职责分明:

dsl-flow/src/ 
 ├── core/                <-- [内核层] 纯粹的抽象和基础类型 
 │   ├── mod.rs 
 │   ├── traits.rs        (FlowNode, ExprEngine 接口定义) 
 │   ├── context.rs       (Context 上下文管理) 
 │   ├── error.rs         (统一的错误处理) 
 │   ├── executor.rs      (TaskExecutor 执行器特征)
 │   └── types.rs         (NodeId, NodeOutput 等通用类型) 
 │ 
 ├── runtime/             <-- [运行时层] 负责跑起来 
 │   ├── mod.rs 
 │   ├── engine.rs        (FlowEngine 调度器) 
 │   ├── state.rs         (FlowState 状态存储) 
 │   └── expr.rs          (Rhai/JS 脚本引擎的具体实现) 
 │
 ├── control/             <-- [控制层] 流程控制逻辑
 │   ├── mod.rs
 │   ├── sequence.rs      (顺序执行)
 │   ├── fork_join.rs     (并行分支)
 │   └── conditional.rs   (条件判断)
 │
 ├── expression/          <-- [表达式层] 表达式处理
 │   ├── mod.rs
 │   ├── expr_set.rs      (设置变量)
 │   └── expr_get.rs      (获取变量)
 │ 
 ├── nodes/               <-- [业务层] 具体的业务节点实现
 │   ├── mod.rs 
 │   ├── code.rs          (通用代码执行: Rhai/JS)
 │   ├── io.rs            (IO 操作: Http, Db, Mq) 
 │   └── lineage.rs       (血缘追踪) 
 │ 
 ├── dsl.rs               <-- [接口层] 对外暴露的 DSL 构建宏和辅助函数 
 └── lib.rs               <-- [入口] 统一导出公共 API 

分层说明

  • Core: 定义了 FlowNodeTaskExecutor 接口,是整个框架的基石。
  • Runtime: 负责调度流程执行、管理状态和实例化脚本引擎。
  • Control: 实现了流程控制逻辑,如串行、并行、分支等。
  • Expression: 专注于简单的表达式求值和上下文变量操作。
  • Nodes: 包含所有具体的业务逻辑节点,如代码执行、网络请求、数据库操作等。

🔧 节点类型

控制流 (Control Flow)

  • Sequence: sequence! { node1, node2 } - 按顺序执行。
  • ForkJoin: fork_join! { node1, node2 } - 并行执行,结果收集为数组。
  • Conditional: ConditionalExecutor - 基于表达式结果的条件分支 (if-else)。

代码与表达式 (Code & Expression)

  • ExprSet: expr_set(...) - 执行表达式并将结果写入 Context。
  • Rhai Code: rhai("code") - 执行一段 Rhai 代码。
  • JS Code: js("code") - 执行一段 JavaScript 代码。

IO 与副作用 (Side Effects)

  • HttpNode: http_get/post(...) - 发送 HTTP 请求。
  • DbNode: db_node(...) - 模拟数据库操作。
  • MqNode: mq_node(...) - 模拟消息队列发送。

🤝 贡献

欢迎提交 Issue 和 Pull Request

  1. Fork 本仓库。
  2. 创建你的特性分支 (git checkout -b feature/AmazingFeature)。
  3. 提交你的更改 (git commit -m 'Add some AmazingFeature')。
  4. 推送到分支 (git push origin feature/AmazingFeature)。
  5. 打开一个 Pull Request。

📄 许可证

本项目采用 MIT 许可证 - 详情请见 LICENSE 文件。