feat: restructure the project and add core functionality

refactor: reorganize code by feature into core/runtime/control directories
feat(core): add core traits and types such as Context and FlowNode
feat(runtime): implement FlowEngine and state management
feat(control): add sequence/parallel/conditional control-flow nodes
feat(nodes): implement HTTP/DB/MQ business nodes
docs: update README with architecture notes and a quick-start example
test: add a performance test script and example code
commit 81da0fe61f (parent c24348c5c4)
2025-12-14 23:50:40 +08:00
43 changed files with 3421 additions and 698 deletions

.cargo/config.toml (new file)
@@ -0,0 +1,6 @@
[http]
proxy = "http://127.0.0.1:7890"
[net]
git-fetch-with-cli = true

Cargo.lock (generated)
@@ -1635,6 +1635,15 @@ version = "1.0.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "650eef8c711430f1a879fdd01d4745a7deea475becfb90269c06775983bbf086"
[[package]]
name = "no-std-compat"
version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b93853da6d84c2e3c7d730d6473e8817692dd89be387eb01b94d7f108ecb5b8c"
dependencies = [
"spin",
]
[[package]]
name = "nu-ansi-term"
version = "0.50.3"
@@ -2206,6 +2215,7 @@ dependencies = [
"ahash",
"bitflags",
"instant",
"no-std-compat",
"num-traits",
"once_cell",
"rhai_codegen",
@@ -2507,6 +2517,12 @@ dependencies = [
"windows-sys 0.60.2",
]
[[package]]
name = "spin"
version = "0.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d"
[[package]]
name = "sptr"
version = "0.3.2"

README.md
@@ -1,2 +1,148 @@
# dsl_flow
# DSL Flow
[![Crates.io](https://img.shields.io/crates/v/dsl-flow.svg)](https://crates.io/crates/dsl-flow)
[![Docs.rs](https://docs.rs/dsl-flow/badge.svg)](https://docs.rs/dsl-flow)
[![License](https://img.shields.io/badge/license-MIT-blue.svg)](LICENSE)
**DSL Flow** is a high-performance, extensible workflow engine built in Rust. It orchestrates complex asynchronous tasks through a declarative DSL (domain-specific language), with support for state management, script-engine integration (Rhai/JavaScript), and flexible control flow.
The project is designed for building middleware, API orchestration layers, and data-processing pipelines; its clean, layered architecture makes it easy to extend and maintain.
## ✨ Core Features
* **Declarative DSL**: define complex flow structures with Rust macros (`sequence!`, `fork_join!`, `group!`).
* **Multiple script engines**: [Rhai](https://rhai.rs/) and JavaScript ([Boa](https://boajs.dev/)) are supported out of the box, for dynamic logic and data transformation.
* **Rich control flow**:
* **Sequence**: run tasks in order.
* **Fork-Join**: run multiple branches in parallel, with flexible result-merge strategies (Array or Object).
* **Conditional**: `if-else` branching driven by expression results.
* **State management**: supports both stateless and stateful execution modes. A built-in in-memory state store can be extended to Redis or a database.
* **Built-in nodes**: common nodes for HTTP requests, database mocks, message-queue mocks, lineage tracking, and more.
* **Async and high-performance**: built on `Tokio` and `async-trait`; the whole pipeline is asynchronous and non-blocking.
* **Modular architecture**: the kernel (Core), runtime (Runtime), and node implementations (Nodes) are separated, making extension straightforward.
## 📦 Installation
Add the dependency to your `Cargo.toml`:
```toml
[dependencies]
dsl-flow = "0.1"
```
Enable specific features (optional):
```toml
[dependencies]
dsl-flow = { version = "0.1", features = ["js", "http"] }
```
* `rhai`: enables the Rhai script engine (on by default).
* `js`: enables the JavaScript (Boa) script engine.
* `http`: enables the HTTP request node.
## 🚀 Quick Start
The following example shows how to define a simple flow: first compute `1+2`, then compute 2x and 3x of the result in parallel.
```rust
use dsl_flow::*;
use dsl_flow::dsl::*; // import the DSL macros and helper functions
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
// 1. Initialize the engine
let store = InMemoryStateStore::default();
let engine = FlowEngine::new(store, FlowOptions {
stateful: false,
expr_engine: ExprEngineKind::Rhai
});
// 2. Define the flow
let flow = Flow::new(sequence! {
// Step 1: set the initial value
expr_set(ExprEngineKind::Rhai, "1 + 2", "calc.sum"),
// Step 2: run in parallel
fork_join! {
expr_set(ExprEngineKind::Rhai, "ctx.calc.sum * 2", "calc.double"),
expr_set(ExprEngineKind::Rhai, "ctx.calc.sum * 3", "calc.triple")
}
});
// 3. Run the flow
let ctx = Context::new();
let output = engine.run_stateless(&flow, ctx).await?;
println!("Result: {}", output.data);
Ok(())
}
```
## 🏗️ Architecture Overview
DSL Flow uses a clearly layered architecture with well-separated responsibilities:
```text
dsl-flow/src/
├── core/ <-- [kernel] pure abstractions and base types
│ ├── mod.rs
│ ├── traits.rs (FlowNode and ExprEngine interface definitions)
│ ├── context.rs (Context management)
│ ├── error.rs (unified error handling)
│ └── types.rs (common types such as NodeId and NodeOutput)
├── runtime/ <-- [runtime] drives execution
│ ├── mod.rs
│ ├── engine.rs (the FlowEngine scheduler)
│ ├── state.rs (FlowState storage)
│ └── expr.rs (concrete Rhai/JS script engines)
├── nodes/ <-- [components] concrete node implementations, grouped by purpose
│ ├── mod.rs
│ ├── control.rs (flow control: Sequence, ForkJoin, Group, Conditional)
│ ├── io.rs (IO: Http, Db, Mq)
│ ├── script.rs (script nodes: ExprSet, ExprGet)
│ └── lineage.rs (lineage tracking)
├── dsl.rs <-- [interface] the public DSL construction macros and helpers
└── lib.rs <-- [entry] re-exports the public API
```
### Layer Notes
* **Core (`src/core`)**: defines the `FlowNode` interface, the `Context`, and base types. It is the framework's contract layer and does not depend on any concrete business logic.
* **Runtime (`src/runtime`)**: contains the `FlowEngine` scheduler, the state-store interface (`StateStore`), and the script-engine adapters (`ExprEngine`). It is responsible for actually executing flow definitions.
* **Nodes (`src/nodes`)**: concrete node implementations, including control-flow nodes (Sequence/ForkJoin) and IO nodes (Http/Db, etc.).
## 🔧 Node Types
### Control Flow
* **Sequence**: `sequence! { node1, node2 }` - run in order.
* **ForkJoin**: `fork_join! { node1, node2 }` - run in parallel; results are collected into an array.
* **ForkJoin (Merge)**: `fork_join_merge! { "path", merge_mode, ... }` - run in parallel and merge the results into the Context.
* **Conditional**: `ConditionalNode` - `if-else` logic driven by a script condition.
### Script & Data
* **ExprSet**: evaluate a script and write the result to a Context path.
* **ExprGet**: evaluate a script and return only the result.
* **Lineage**: track data read/write lineage.
### IO & Side Effects
* **HttpNode**: send GET/POST requests.
* **DbNode**: mock database operations.
* **MqNode**: mock message-queue publishing.
## 🤝 Contributing
Issues and pull requests are welcome!
1. Fork the repository.
2. Create a feature branch (`git checkout -b feature/AmazingFeature`).
3. Commit your changes (`git commit -m 'Add some AmazingFeature'`).
4. Push the branch (`git push origin feature/AmazingFeature`).
5. Open a pull request.
## 📄 License
This project is licensed under the MIT License - see the [LICENSE](LICENSE) file for details.

File diff suppressed because it is too large.

@@ -1,12 +1,19 @@
# dsl-flow Test Performance
## JS feature
- test_rhai_expr_set_and_get: 0.269s
- test_conditional_node_then_else: 0.396s
- test_http_node_with_mock: 0.462s
- test_stateful_engine: 0.447s
- test_db_and_mq_nodes: 0.544s
- test_group_parallel_sleep: 1.425s
- test_expr_set_without_engine_error: 1.074s
- test_js_expr_and_fork_join: 0.586s
- test_rhai_expr_set_and_get: error: cannot specify features for packages outside of workspace (0.011s)
- test_conditional_node_then_else: error: cannot specify features for packages outside of workspace (0.009s)
- test_http_node_with_mock: error: cannot specify features for packages outside of workspace (0.009s)
- test_stateful_engine: error: cannot specify features for packages outside of workspace (0.010s)
- test_db_and_mq_nodes: error: cannot specify features for packages outside of workspace (0.011s)
- test_group_parallel_sleep: error: cannot specify features for packages outside of workspace (0.010s)
- test_expr_set_without_engine_error: error: cannot specify features for packages outside of workspace (0.010s)
- test_js_expr_and_fork_join: error: cannot specify features for packages outside of workspace (0.012s)

@@ -1,7 +1,7 @@
[package]
name = "dsl-flow"
version = "0.1.0"
edition = "2024"
edition = "2021"
license = "MIT"
description = "A Rust DSL-based workflow engine supporting stateful/stateless flows, async nodes, fork-join, and extensible expression engines (Rhai/JS)."
readme = "README.md"
@@ -28,7 +28,7 @@ tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["fmt", "env-filter"] }
# Optional engines and nodes
rhai = { version = "1", optional = true, features = ["serde"] }
rhai = { version = "1", optional = true, features = ["serde", "sync"] }
boa_engine = { version = "0.20", optional = true }
reqwest = { version = "0.12", optional = true, features = ["json", "rustls-tls"] }

dsl-flow/README.md (new file)
@@ -0,0 +1,164 @@
# DSL Flow
[![Crates.io](https://img.shields.io/crates/v/dsl-flow.svg)](https://crates.io/crates/dsl-flow)
[![Docs.rs](https://docs.rs/dsl-flow/badge.svg)](https://docs.rs/dsl-flow)
[![License](https://img.shields.io/badge/license-MIT-blue.svg)](LICENSE)
**DSL Flow** is a high-performance, extensible workflow engine built in Rust. It orchestrates complex asynchronous tasks through a declarative DSL (domain-specific language), with support for state management, multi-language script execution (Rhai/JavaScript), and flexible control flow.
The project is designed for building middleware, API orchestration layers, and data-processing pipelines; its clean, layered architecture makes it easy to extend and maintain.
## ✨ Core Features
* **Declarative DSL**: define complex flow structures with Rust macros (`sequence!`, `fork_join!`, `group!`).
* **Multi-language support**:
* **Expression evaluation**: Rhai and JS expressions for conditions and simple data transforms.
* **Code execution**: full Rhai or JS code blocks for complex business logic.
* **Rich control flow**:
* **Sequence**: run tasks in order.
* **Fork-Join**: run multiple branches in parallel, with flexible result-merge strategies (Array or Object).
* **Conditional**: `if-else` branching driven by expression results.
* **State management**: supports both stateless and stateful execution modes. A built-in in-memory state store can be extended to Redis or a database.
* **Built-in nodes**: common nodes for HTTP requests, database mocks, message-queue mocks, lineage tracking, and more.
* **Async and high-performance**: built on `Tokio` and `async-trait`; the whole pipeline is asynchronous and non-blocking.
* **Modular architecture**: the kernel (Core), runtime (Runtime), control flow (Control), and business nodes (Nodes) are separated, each with a single responsibility.
## 📦 Installation
Add the dependency to your `Cargo.toml`:
```toml
[dependencies]
dsl-flow = "0.1"
```
Enable specific features (optional):
```toml
[dependencies]
dsl-flow = { version = "0.1", features = ["js", "http"] }
```
* `rhai`: enables the Rhai script engine (on by default).
* `js`: enables the JavaScript (Boa) script engine.
* `http`: enables the HTTP request node.
## 🚀 Quick Start
The following example shows how to define a simple flow: first compute `1+2`, then compute 2x and 3x of the result in parallel.
```rust
use dsl_flow::*;
use dsl_flow::dsl::*; // import the DSL macros and helper functions
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
// 1. Initialize the engine
let store = InMemoryStateStore::default();
let engine = FlowEngine::new(store, FlowOptions {
stateful: false,
expr_engine: ExprEngineKind::Rhai
});
// 2. Define the flow
let flow = Flow::new(sequence! {
// Step 1: set the initial value
expr_set(ExprEngineKind::Rhai, "1 + 2", "calc.sum"),
// Step 2: run in parallel
fork_join! {
// run a Rhai code block
rhai("let sum = ctx.calc.sum; sum * 2"),
// run a JS code block
js("const sum = ctx.calc.sum; sum * 3")
}
});
// 3. Run the flow
let ctx = Context::new();
let output = engine.run_stateless(&flow, ctx).await?;
println!("Result: {}", output.data);
Ok(())
}
```
## 🏗️ Architecture Overview
DSL Flow uses a clearly layered architecture with well-separated responsibilities:
```text
dsl-flow/src/
├── core/ <-- [kernel] pure abstractions and base types
│ ├── mod.rs
│ ├── traits.rs (FlowNode and ExprEngine interface definitions)
│ ├── context.rs (Context management)
│ ├── error.rs (unified error handling)
│ ├── executor.rs (the TaskExecutor trait)
│ └── types.rs (common types such as NodeId and NodeOutput)
├── runtime/ <-- [runtime] drives execution
│ ├── mod.rs
│ ├── engine.rs (the FlowEngine scheduler)
│ ├── state.rs (FlowState storage)
│ └── expr.rs (concrete Rhai/JS script engines)
├── control/ <-- [control] flow-control logic
│ ├── mod.rs
│ ├── sequence.rs (sequential execution)
│ ├── fork_join.rs (parallel branches)
│ └── conditional.rs (conditional branching)
├── expression/ <-- [expression] expression handling
│ ├── mod.rs
│ ├── expr_set.rs (set a context variable)
│ └── expr_get.rs (read a value)
├── nodes/ <-- [business] concrete business nodes
│ ├── mod.rs
│ ├── code.rs (generic code execution: Rhai/JS)
│ ├── io.rs (IO: Http, Db, Mq)
│ └── lineage.rs (lineage tracking)
├── dsl.rs <-- [interface] the public DSL construction macros and helpers
└── lib.rs <-- [entry] re-exports the public API
```
### Layer Notes
* **Core**: defines the `FlowNode` and `TaskExecutor` interfaces; the foundation of the framework.
* **Runtime**: schedules flow execution, manages state, and instantiates script engines.
* **Control**: implements flow-control logic: sequential, parallel, conditional, and so on.
* **Expression**: focuses on simple expression evaluation and context-variable operations.
* **Nodes**: all concrete business-logic nodes: code execution, network requests, database operations, and so on.
## 🔧 Node Types
### Control Flow
* **Sequence**: `sequence! { node1, node2 }` - run in order.
* **ForkJoin**: `fork_join! { node1, node2 }` - run in parallel; results are collected into an array.
* **Conditional**: `ConditionalExecutor` - `if-else` branching driven by an expression result.
### Code & Expression
* **ExprSet**: `expr_set(...)` - evaluate an expression and write the result to the Context.
* **Rhai Code**: `rhai("code")` - run a Rhai code block.
* **JS Code**: `js("code")` - run a JavaScript code block.
### IO & Side Effects
* **HttpNode**: `http_get/post(...)` - send HTTP requests.
* **DbNode**: `db_node(...)` - mock a database operation.
* **MqNode**: `mq_node(...)` - mock message-queue publishing.
## 🤝 Contributing
Issues and pull requests are welcome!
1. Fork the repository.
2. Create a feature branch (`git checkout -b feature/AmazingFeature`).
3. Commit your changes (`git commit -m 'Add some AmazingFeature'`).
4. Push the branch (`git push origin feature/AmazingFeature`).
5. Open a pull request.
## 📄 License
This project is licensed under the MIT License - see the [LICENSE](LICENSE) file for details.

@@ -1,8 +1,7 @@
use dsl_flow::*;
use serde_json::json;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
tracing_subscriber::fmt().with_env_filter("info").init();
let store = InMemoryStateStore::default();
let engine = FlowEngine::new(store, FlowOptions { stateful: false, expr_engine: ExprEngineKind::Rhai });

@@ -0,0 +1,31 @@
use crate::core::context::Context;
use crate::core::error::NodeError;
use crate::core::executor::TaskExecutor;
use crate::core::traits::ExprEngine;
use crate::core::types::NodeRef;
use serde_json::Value;
/// Conditional branch executor
#[derive(Clone)]
pub struct ConditionalExecutor {
#[allow(dead_code)]
pub engine: crate::runtime::ExprEngineKind,
pub condition: String,
pub then_node: NodeRef,
pub else_node: Option<NodeRef>,
}
#[async_trait::async_trait]
impl TaskExecutor for ConditionalExecutor {
async fn execute(&self, ctx: &mut Context, expr: Option<&dyn ExprEngine>) -> Result<Value, NodeError> {
let engine = expr.ok_or_else(|| NodeError::Exec("Expr engine not provided".into()))?;
let val = engine.eval(&self.condition, ctx).await.map_err(|e| NodeError::Exec(e.to_string()))?;
let cond = match val {
Value::Bool(b) => b,
_ => false,
};
let selected = if cond { Some(&self.then_node) } else { self.else_node.as_ref() };
// If the condition is false and no else branch exists, do nothing.
let Some(node) = selected else { return Ok(Value::Null) };
let out = node.execute(ctx, Some(engine)).await?;
Ok(out.data)
}
}
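The truthiness rule here is strict: only a boolean `true` from the evaluator selects the then-branch; any non-boolean result counts as false. A std-only sketch of that rule (the `Val` enum is a stand-in for `serde_json::Value`):

```rust
// Stand-in for serde_json::Value; only the variants needed here.
#[derive(Debug)]
enum Val {
    Bool(bool),
    Num(f64),
    Null,
}

// Mirrors ConditionalExecutor's match: Bool(b) => b, anything else => false.
fn as_condition(v: &Val) -> bool {
    match v {
        Val::Bool(b) => *b,
        _ => false,
    }
}
```

So a condition script that returns `1` or a non-empty string does not take the then-branch, which is worth keeping in mind when writing conditions.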

@@ -0,0 +1,49 @@
use crate::core::context::Context;
use crate::core::error::NodeError;
use crate::core::executor::TaskExecutor;
use crate::core::traits::ExprEngine;
use crate::core::types::NodeRef;
use crate::control::merge_mode::MergeMode;
use serde_json::Value;
/// Parallel branch executor (fork-join)
#[derive(Clone)]
pub struct ForkJoinExecutor {
pub branches: Vec<NodeRef>,
pub merge_to_ctx: Option<String>,
pub merge_mode: MergeMode,
}
#[async_trait::async_trait]
impl TaskExecutor for ForkJoinExecutor {
async fn execute(&self, ctx: &mut Context, expr: Option<&dyn ExprEngine>) -> Result<Value, NodeError> {
let mut tasks = Vec::with_capacity(self.branches.len());
for b in &self.branches {
let mut subctx = Context::with_value(ctx.as_value().clone());
let b = b.clone();
tasks.push(Box::pin(async move { b.execute(&mut subctx, expr).await }));
}
let joined = futures::future::join_all(tasks).await;
let mut results = Vec::new();
for res in joined {
let out = res?;
results.push(serde_json::json!({ "id": out.id, "data": out.data }));
}
let data = match self.merge_mode {
MergeMode::Array => Value::Array(results.clone()),
MergeMode::ObjectById => {
let mut map = serde_json::Map::new();
for item in &results {
let id = item.get("id").and_then(|v| v.as_str()).unwrap_or_default().to_string();
let data = item.get("data").cloned().unwrap_or(Value::Null);
map.insert(id, data);
}
Value::Object(map)
}
};
if let Some(path) = &self.merge_to_ctx {
ctx.set(path, data.clone());
}
Ok(data)
}
}
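Note that each branch runs on a clone of the parent context (`Context::with_value(ctx.as_value().clone())`), so branch-local writes are discarded; only the joined results (optionally written back via `merge_to_ctx`) reach the parent. A std-only sketch of that snapshot semantics, using threads and a `BTreeMap` in place of Tokio futures and `Context`:

```rust
use std::collections::BTreeMap;
use std::thread;

// Run two branches, each on its own snapshot of the parent "context".
// Branch writes stay in the snapshot; only returned results are joined.
fn fork_join_snapshot(parent: &BTreeMap<String, i64>) -> Vec<i64> {
    let handles: Vec<_> = (0..2)
        .map(|i| {
            let mut snapshot = parent.clone(); // like Context::with_value(...)
            thread::spawn(move || {
                snapshot.insert(format!("branch{i}"), i); // local-only write
                snapshot.len() as i64
            })
        })
        .collect();
    handles.into_iter().map(|h| h.join().unwrap()).collect()
}
```

The design trade-off is isolation over sharing: branches cannot race on the context, but anything a branch computes must travel through its return value.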

@@ -0,0 +1,50 @@
use crate::core::context::Context;
use crate::core::error::NodeError;
use crate::core::executor::TaskExecutor;
use crate::core::traits::ExprEngine;
use crate::core::types::NodeRef;
use crate::control::merge_mode::MergeMode;
use serde_json::Value;
/// Parallel group executor
#[derive(Clone)]
pub struct GroupExecutor {
pub parallel: Vec<NodeRef>,
pub merge_to_ctx: Option<String>,
pub merge_mode: MergeMode,
}
#[async_trait::async_trait]
impl TaskExecutor for GroupExecutor {
async fn execute(&self, ctx: &mut Context, expr: Option<&dyn ExprEngine>) -> Result<Value, NodeError> {
// Currently identical to ForkJoinExecutor's logic
let mut joins = Vec::with_capacity(self.parallel.len());
for n in &self.parallel {
let mut subctx = Context::with_value(ctx.as_value().clone());
let n = n.clone();
joins.push(Box::pin(async move { n.execute(&mut subctx, expr).await }));
}
let joined = futures::future::join_all(joins).await;
let mut results = Vec::new();
for res in joined {
let out = res?;
results.push(serde_json::json!({ "id": out.id, "data": out.data }));
}
let data = match self.merge_mode {
MergeMode::Array => Value::Array(results.clone()),
MergeMode::ObjectById => {
let mut map = serde_json::Map::new();
for item in &results {
let id = item.get("id").and_then(|v| v.as_str()).unwrap_or_default().to_string();
let data = item.get("data").cloned().unwrap_or(Value::Null);
map.insert(id, data);
}
Value::Object(map)
}
};
if let Some(path) = &self.merge_to_ctx {
ctx.set(path, data.clone());
}
Ok(data)
}
}

@@ -0,0 +1,8 @@
use serde::{Deserialize, Serialize};
/// Result merge mode
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub enum MergeMode {
Array,
ObjectById,
}
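The two modes shape the joined output differently: `Array` keeps one entry per branch in branch order, while `ObjectById` keys each branch's data by its node id, so a duplicate id overwrites the earlier entry. A std-only sketch of the two shapes, with `(id, data)` pairs standing in for the real `{id, data}` JSON objects:

```rust
use std::collections::BTreeMap;

#[derive(Clone, Copy)]
enum Mode {
    Array,
    ObjectById,
}

// Render joined branch results under each merge mode.
fn merge(results: &[(&str, i64)], mode: Mode) -> String {
    match mode {
        // One entry per branch, in branch order.
        Mode::Array => format!("{results:?}"),
        // Keyed by id; a duplicate id overwrites the earlier entry.
        Mode::ObjectById => {
            let map: BTreeMap<&str, i64> = results.iter().copied().collect();
            format!("{map:?}")
        }
    }
}
```

Because node ids are UUID-suffixed, collisions should be rare in practice, but `ObjectById` silently drops all but the last result for a repeated id.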

@@ -0,0 +1,11 @@
pub mod sequence;
pub mod fork_join;
pub mod group;
pub mod conditional;
pub mod merge_mode;
pub use sequence::SequenceExecutor;
pub use fork_join::ForkJoinExecutor;
pub use group::GroupExecutor;
pub use conditional::ConditionalExecutor;
pub use merge_mode::MergeMode;

@@ -0,0 +1,24 @@
use crate::core::context::Context;
use crate::core::error::NodeError;
use crate::core::executor::TaskExecutor;
use crate::core::traits::ExprEngine;
use crate::core::types::NodeRef;
use serde_json::Value;
/// Sequential executor
#[derive(Clone)]
pub struct SequenceExecutor {
pub children: Vec<NodeRef>,
}
#[async_trait::async_trait]
impl TaskExecutor for SequenceExecutor {
async fn execute(&self, ctx: &mut Context, expr: Option<&dyn ExprEngine>) -> Result<Value, NodeError> {
let mut last = Value::Null;
for child in &self.children {
let out = child.execute(ctx, expr).await?;
last = out.data;
}
Ok(last)
}
}
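Two properties of the sequence are worth calling out: every child mutates the same context, and only the last child's output is returned, so earlier outputs survive only if a child wrote them into the context. A std-only sketch with a `BTreeMap` standing in for `Context`:

```rust
use std::collections::BTreeMap;

type Ctx = BTreeMap<String, i64>;

// Mirrors SequenceExecutor: run children in order against one shared
// context; return only the last child's output.
fn run_sequence(children: &[fn(&mut Ctx) -> i64], ctx: &mut Ctx) -> i64 {
    let mut last = 0; // stand-in for Value::Null
    for child in children {
        last = child(ctx);
    }
    last
}
```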

@@ -2,55 +2,79 @@ use serde::{Deserialize, Serialize};
use serde_json::{json, Value};
use std::time::{SystemTime, UNIX_EPOCH};
/// Execution context holding flow data and metadata
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct Context {
/// The actual data (a JSON object)
data: Value,
/// Metadata (e.g. lineage information)
meta: ContextMeta,
}
impl Context {
/// Creates a new, empty context
pub fn new() -> Self {
Self { data: json!({}), meta: ContextMeta::default() }
}
/// Creates a context from the given value
pub fn with_value(value: Value) -> Self {
Self { data: value, meta: ContextMeta::default() }
}
/// Returns the value at the given path
///
/// # Arguments
/// * `path` - dotted path (e.g. "foo.bar")
pub fn get(&self, path: impl AsRef<str>) -> Option<Value> {
get_path(&self.data, path.as_ref())
}
/// Sets the value at the given path
///
/// # Arguments
/// * `path` - dotted path (e.g. "foo.bar")
/// * `value` - the value to store
pub fn set(&mut self, path: impl AsRef<str>, value: Value) {
set_path(&mut self.data, path.as_ref(), value);
}
/// Returns a reference to the underlying JSON value
pub fn as_value(&self) -> &Value {
&self.data
}
/// Records a write (for lineage tracking)
pub fn record_write(&mut self, node_id: impl Into<String>, path: impl Into<String>) {
let ts = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_millis() as u64;
self.meta.lineage.push(LineageEntry { node_id: node_id.into(), path: path.into(), ts });
}
/// Returns the recorded lineage entries
pub fn lineage(&self) -> Vec<LineageEntry> {
self.meta.lineage.clone()
}
}
/// Path helper struct
#[derive(Debug, Clone)]
pub struct ValuePath;
/// Context metadata
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct ContextMeta {
/// Recorded lineage entries
pub lineage: Vec<LineageEntry>,
}
/// A single lineage entry
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LineageEntry {
/// ID of the node that performed the write
pub node_id: String,
/// The path that was written
pub path: String,
/// Timestamp (milliseconds since the Unix epoch)
pub ts: u64,
}
@@ -80,44 +104,32 @@ fn get_path(root: &Value, path: &str) -> Option<Value> {
fn set_path(root: &mut Value, path: &str, value: Value) {
let segs = split_path(path);
if segs.is_empty() { return; }
let mut cur = root;
for (i, seg) in segs.iter().enumerate() {
let is_last = i == segs.len() - 1;
-match cur {
-Value::Object(map) => {
-if is_last {
+if is_last {
+match cur {
+Value::Object(map) => {
map.insert((*seg).to_string(), value);
-} else {
-if !map.contains_key(*seg) {
-map.insert((*seg).to_string(), json!({}));
-}
-cur = map.get_mut(*seg).unwrap();
-}
+}
+_ => {
+*cur = json!({ (*seg): value });
+}
+}
-Value::Null => {
-*cur = json!({});
-if let Value::Object(map) = cur {
-if is_last {
-map.insert((*seg).to_string(), value);
-} else {
-map.insert((*seg).to_string(), json!({}));
-cur = map.get_mut(*seg).unwrap();
-}
-}
-}
-_ => {
-// Overwrite non-object with object to proceed
-*cur = json!({});
-if let Value::Object(map) = cur {
-if is_last {
-map.insert((*seg).to_string(), value);
-} else {
-map.insert((*seg).to_string(), json!({}));
-cur = map.get_mut(*seg).unwrap();
-}
-}
-}
-}
+return;
+}
+if !cur.is_object() {
+*cur = json!({});
+}
+if let Value::Object(map) = cur {
+if !map.contains_key(*seg) {
+map.insert((*seg).to_string(), json!({}));
+}
+cur = map.get_mut(*seg).unwrap();
+}
}
}
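The net effect of the rewritten `set_path` is: walk the dotted path, create missing intermediate objects, overwrite any non-object encountered mid-path, and write the leaf. A std-only sketch of the same traversal, with a tiny `Val` enum replacing `serde_json::Value`:

```rust
use std::collections::BTreeMap;

#[derive(Debug, Clone, PartialEq)]
enum Val {
    Null,
    Str(String),
    Obj(BTreeMap<String, Val>),
}

// Walk "a.b.c", creating/overwriting intermediate objects, then set the leaf.
fn set_path(root: &mut Val, path: &str, value: Val) {
    let segs: Vec<&str> = path.split('.').filter(|s| !s.is_empty()).collect();
    let mut cur = root;
    for (i, seg) in segs.iter().enumerate() {
        // Anything that is not an object mid-path is replaced by an empty object.
        if !matches!(cur, Val::Obj(_)) {
            *cur = Val::Obj(BTreeMap::new());
        }
        let Val::Obj(map) = cur else { unreachable!() };
        if i == segs.len() - 1 {
            map.insert((*seg).to_string(), value);
            return;
        }
        cur = map.entry((*seg).to_string()).or_insert(Val::Null);
    }
}
```

The overwrite-on-conflict choice means `set("a", 1)` followed by `set("a.b", 2)` silently discards the `1`; a stricter design could return an error instead.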

@@ -0,0 +1,19 @@
use thiserror::Error;
/// Node execution error
#[derive(Debug, Error)]
pub enum NodeError {
#[error("Execution error: {0}")]
Exec(String),
}
/// Expression evaluation error
#[derive(Debug, Error)]
pub enum ExprError {
#[error("Rhai error: {0}")]
Rhai(String),
#[error("JS error: {0}")]
Js(String),
#[error("Unsupported engine")]
Unsupported,
}

@@ -0,0 +1,11 @@
use crate::core::context::Context;
use crate::core::error::NodeError;
use crate::core::traits::ExprEngine;
use serde_json::Value;
/// Task executor trait
#[async_trait::async_trait]
pub trait TaskExecutor: Send + Sync {
/// Executes the task's logic
async fn execute(&self, ctx: &mut Context, expr: Option<&dyn ExprEngine>) -> Result<Value, NodeError>;
}

dsl-flow/src/core/mod.rs (new file)
@@ -0,0 +1,14 @@
pub mod context;
pub mod types;
pub mod error;
pub mod traits;
pub mod executor;
pub mod node;
pub use context::{Context, LineageEntry};
pub use types::{NodeId, NodeOutput};
pub use error::{NodeError, ExprError};
pub use traits::{FlowNode, ExprEngine};
pub use executor::TaskExecutor;
pub use node::Node;

dsl-flow/src/core/node.rs (new file)
@@ -0,0 +1,35 @@
use crate::core::context::Context;
use crate::core::error::NodeError;
use crate::core::executor::TaskExecutor;
use crate::core::traits::{ExprEngine, FlowNode};
use crate::core::types::{NodeId, NodeOutput};
/// A generic node: an ID plus a boxed executor
pub struct Node {
pub id: NodeId,
pub name: Option<String>,
pub executor: Box<dyn TaskExecutor>,
}
impl Node {
pub fn new(id: NodeId, executor: Box<dyn TaskExecutor>) -> Self {
Self { id, name: None, executor }
}
pub fn with_name(mut self, name: impl Into<String>) -> Self {
self.name = Some(name.into());
self
}
}
#[async_trait::async_trait]
impl FlowNode for Node {
fn id(&self) -> &str {
&self.id
}
async fn execute(&self, ctx: &mut Context, expr: Option<&dyn ExprEngine>) -> Result<NodeOutput, NodeError> {
let data = self.executor.execute(ctx, expr).await?;
Ok(NodeOutput { id: self.id.clone(), data })
}
}

@@ -0,0 +1,28 @@
use crate::core::context::Context;
use crate::core::error::{ExprError, NodeError};
use crate::core::types::NodeOutput;
use serde_json::Value;
/// Flow node trait
#[async_trait::async_trait]
pub trait FlowNode: Send + Sync {
/// Returns the node ID
fn id(&self) -> &str;
/// Executes the node's logic
///
/// # Arguments
/// * `ctx` - the execution context
/// * `expr` - an optional expression engine
async fn execute(&self, ctx: &mut Context, expr: Option<&dyn ExprEngine>) -> Result<NodeOutput, NodeError>;
}
/// Expression engine trait
#[async_trait::async_trait]
pub trait ExprEngine: Send + Sync {
/// Evaluates a script
///
/// # Arguments
/// * `script` - the script source
/// * `ctx` - the execution context
async fn eval(&self, script: &str, ctx: &Context) -> Result<Value, ExprError>;
}

@@ -0,0 +1,28 @@
use serde::{Deserialize, Serialize};
use serde_json::Value;
use crate::core::traits::FlowNode;
use std::sync::Arc;
/// Node reference type (`Arc<dyn FlowNode>`)
pub type NodeRef = Arc<dyn FlowNode>;
/// Node ID type
pub type NodeId = String;
/// Node output
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NodeOutput {
/// The producing node's ID
pub id: NodeId,
/// The node's output data
pub data: Value,
}
/// Generates a unique node ID
///
/// # Arguments
/// * `prefix` - the ID prefix
pub fn node_id(prefix: &str) -> NodeId {
format!("{}-{}", prefix, uuid::Uuid::new_v4())
}
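`node_id` derives uniqueness from a fresh UUID v4 on every call. If you wanted the same `prefix-suffix` shape without the `uuid` dependency, a process-wide atomic counter gives unique, ordered ids (a hypothetical stand-in, not the crate's API):

```rust
use std::sync::atomic::{AtomicU64, Ordering};

static NEXT_ID: AtomicU64 = AtomicU64::new(0);

// Same "prefix-suffix" shape as node_id(), but with a monotonic counter
// instead of a UUID. Relaxed ordering suffices for a pure counter.
fn counter_node_id(prefix: &str) -> String {
    format!("{}-{}", prefix, NEXT_ID.fetch_add(1, Ordering::Relaxed))
}
```

Counter ids are shorter and sortable by creation order, but unlike UUIDs they are not unique across processes or restarts.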

dsl-flow/src/dsl.rs (new file)
@@ -0,0 +1,237 @@
use crate::core::node::Node;
use crate::core::types::node_id;
use crate::control::MergeMode;
use crate::expression::{
ExprSetExecutor, ExprGetExecutor
};
use crate::nodes::{
DbExecutor, MqExecutor, LineageExecutor
};
#[cfg(feature = "http")]
use crate::nodes::HttpExecutor;
use crate::runtime::ExprEngineKind;
/// Creates a sequential execution node
#[macro_export]
macro_rules! sequence {
( $($node:expr),* $(,)? ) => {{
let mut nodes: Vec<std::sync::Arc<dyn $crate::core::traits::FlowNode>> = Vec::new();
$(
nodes.push(std::sync::Arc::new($node));
)*
$crate::core::node::Node::new(
$crate::core::types::node_id("seq"),
Box::new($crate::control::SequenceExecutor { children: nodes })
)
}};
}
/// Creates a parallel fork-join node
#[macro_export]
macro_rules! fork_join {
( $($node:expr),* $(,)? ) => {{
let mut nodes: Vec<std::sync::Arc<dyn $crate::core::traits::FlowNode>> = Vec::new();
$(
nodes.push(std::sync::Arc::new($node));
)*
$crate::core::node::Node::new(
$crate::core::types::node_id("fork"),
Box::new($crate::control::ForkJoinExecutor {
branches: nodes,
merge_to_ctx: None,
merge_mode: $crate::control::MergeMode::Array
})
)
}};
}
/// Creates a parallel fork-join node that merges its results
#[macro_export]
macro_rules! fork_join_merge {
( $merge_path:expr, $mode:expr, $( $node:expr ),* $(,)? ) => {{
let mut nodes: Vec<std::sync::Arc<dyn $crate::core::traits::FlowNode>> = Vec::new();
$(
nodes.push(std::sync::Arc::new($node));
)*
$crate::core::node::Node::new(
$crate::core::types::node_id("fork"),
Box::new($crate::control::ForkJoinExecutor {
branches: nodes,
merge_to_ctx: Some($merge_path.into()),
merge_mode: $mode
})
)
}};
}
/// Creates a parallel group node
#[macro_export]
macro_rules! group {
( $($node:expr),* $(,)? ) => {{
let mut nodes: Vec<std::sync::Arc<dyn $crate::core::traits::FlowNode>> = Vec::new();
$(
nodes.push(std::sync::Arc::new($node));
)*
$crate::core::node::Node::new(
$crate::core::types::node_id("group"),
Box::new($crate::control::GroupExecutor {
parallel: nodes,
merge_to_ctx: None,
merge_mode: $crate::control::MergeMode::Array
})
)
}};
}
/// Creates a parallel group node that merges its results
#[macro_export]
macro_rules! group_merge {
( $merge_path:expr, $mode:expr, $( $node:expr ),* $(,)? ) => {{
let mut nodes: Vec<std::sync::Arc<dyn $crate::core::traits::FlowNode>> = Vec::new();
$(
nodes.push(std::sync::Arc::new($node));
)*
$crate::core::node::Node::new(
$crate::core::types::node_id("group"),
Box::new($crate::control::GroupExecutor {
parallel: nodes,
merge_to_ctx: Some($merge_path.into()),
merge_mode: $mode
})
)
}};
}
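All of these macros share one pattern: collect heterogeneous node expressions into a `Vec<Arc<dyn FlowNode>>`, then hand the vector to the appropriate executor. That collection step in isolation, reduced to a std-only toy trait:

```rust
use std::sync::Arc;

trait Step {
    fn run(&self) -> i64;
}

struct Const(i64);
impl Step for Const {
    fn run(&self) -> i64 {
        self.0
    }
}

// Same shape as sequence!/fork_join!: each expression is wrapped in an
// Arc<dyn Trait> so differently-typed nodes can share one Vec.
macro_rules! collect_steps {
    ( $($node:expr),* $(,)? ) => {{
        let mut v: Vec<Arc<dyn Step>> = Vec::new();
        $( v.push(Arc::new($node)); )*
        v
    }};
}
```

The trailing `$(,)?` is what lets callers leave a trailing comma after the last node, as in the README examples.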
/// Creates an expression-set node
pub fn expr_set(engine: ExprEngineKind, script: &str, target_path: &str) -> Node {
Node::new(
node_id("expr_set"),
Box::new(ExprSetExecutor {
engine,
script: script.into(),
target_path: target_path.into(),
}),
)
}
/// Creates an expression-get node
pub fn expr_get(engine: ExprEngineKind, script: &str) -> Node {
Node::new(
node_id("expr_get"),
Box::new(ExprGetExecutor {
engine,
script: script.into(),
}),
)
}
/// Creates a Rhai code-execution node
pub fn rhai(script: &str) -> Node {
Node::new(
node_id("rhai"),
Box::new(crate::nodes::CodeExecutor {
engine: ExprEngineKind::Rhai,
script: script.into(),
}),
)
}
/// Creates a JavaScript code-execution node
pub fn js(script: &str) -> Node {
Node::new(
node_id("js"),
Box::new(crate::nodes::CodeExecutor {
engine: ExprEngineKind::Js,
script: script.into(),
}),
)
}
/// Creates a generic code-execution node
pub fn run_code(engine: ExprEngineKind, script: &str) -> Node {
Node::new(
node_id("code"),
Box::new(crate::nodes::CodeExecutor {
engine,
script: script.into(),
}),
)
}
/// Creates an HTTP GET request node
#[cfg(feature = "http")]
pub fn http_get(url: &str) -> Node {
Node::new(
node_id("http"),
Box::new(HttpExecutor {
method: "GET".into(),
url: url.into(),
body: None,
}),
)
}
/// Creates an HTTP POST request node
#[cfg(feature = "http")]
pub fn http_post(url: &str, body: serde_json::Value) -> Node {
Node::new(
node_id("http"),
Box::new(HttpExecutor {
method: "POST".into(),
url: url.into(),
body: Some(body),
}),
)
}
/// Creates a database operation node
pub fn db_node(operation: &str, params: serde_json::Value) -> Node {
Node::new(
node_id("db"),
Box::new(DbExecutor {
operation: operation.into(),
params,
}),
)
}
/// Creates a message-queue node
pub fn mq_node(topic: &str, message: serde_json::Value) -> Node {
Node::new(
node_id("mq"),
Box::new(MqExecutor {
topic: topic.into(),
message,
}),
)
}
/// Creates a lineage-tracking node
pub fn lineage_node() -> Node {
Node::new(
node_id("lineage"),
Box::new(LineageExecutor {
target_path: None,
}),
)
}
/// Creates a lineage-tracking node that writes to a path
pub fn lineage_node_to_path(path: &str) -> Node {
Node::new(
node_id("lineage"),
Box::new(LineageExecutor {
target_path: Some(path.into()),
}),
)
}
/// Merge mode: aggregate into an object keyed by node ID
pub fn merge_mode_object_by_id() -> MergeMode {
MergeMode::ObjectById
}
/// Merge mode: aggregate into an array
pub fn merge_mode_array() -> MergeMode {
MergeMode::Array
}

@@ -0,0 +1,22 @@
use crate::core::context::Context;
use crate::core::error::NodeError;
use crate::core::executor::TaskExecutor;
use crate::core::traits::ExprEngine;
use serde_json::Value;
/// Expression-get executor
#[derive(Clone)]
pub struct ExprGetExecutor {
#[allow(dead_code)]
pub engine: crate::runtime::ExprEngineKind,
pub script: String,
}
#[async_trait::async_trait]
impl TaskExecutor for ExprGetExecutor {
async fn execute(&self, ctx: &mut Context, expr: Option<&dyn ExprEngine>) -> Result<Value, NodeError> {
let engine = expr.ok_or_else(|| NodeError::Exec("Expr engine not provided".into()))?;
let val = engine.eval(&self.script, ctx).await.map_err(|e| NodeError::Exec(e.to_string()))?;
Ok(val)
}
}

@@ -0,0 +1,24 @@
use crate::core::context::Context;
use crate::core::error::NodeError;
use crate::core::executor::TaskExecutor;
use crate::core::traits::ExprEngine;
use serde_json::Value;
/// Expression-set executor
#[derive(Clone)]
pub struct ExprSetExecutor {
#[allow(dead_code)]
pub engine: crate::runtime::ExprEngineKind,
pub script: String,
pub target_path: String,
}
#[async_trait::async_trait]
impl TaskExecutor for ExprSetExecutor {
async fn execute(&self, ctx: &mut Context, expr: Option<&dyn ExprEngine>) -> Result<Value, NodeError> {
let engine = expr.ok_or_else(|| NodeError::Exec("Expr engine not provided".into()))?;
let val = engine.eval(&self.script, ctx).await.map_err(|e| NodeError::Exec(e.to_string()))?;
ctx.set(&self.target_path, val.clone());
Ok(val)
}
}

@@ -0,0 +1,5 @@
pub mod expr_set;
pub mod expr_get;
pub use expr_set::ExprSetExecutor;
pub use expr_get::ExprGetExecutor;

dsl-flow/src/lib.rs (new file)
@@ -0,0 +1,58 @@
//! dsl-flow crate entry point
//!
//! Exports the core modules and the most commonly used structs and traits.
pub mod core;
pub mod runtime;
pub mod nodes;
pub mod control;
pub mod expression;
pub mod dsl;
pub use dsl::*;
// Re-export core types
pub use core::{Context, FlowNode, NodeId, NodeOutput, NodeError, ExprError, ExprEngine, LineageEntry};
// Re-export runtime types
pub use runtime::{Flow, FlowEngine, FlowOptions, FlowResult, ExprEngineKind, RhaiEngine, StateStore, InMemoryStateStore, FlowState};
#[cfg(feature = "js")]
pub use runtime::JsEngine;
// Re-export control types
pub use control::{SequenceExecutor, ForkJoinExecutor, GroupExecutor, ConditionalExecutor, MergeMode};
// Re-export expression types
pub use expression::{ExprSetExecutor, ExprGetExecutor};
// Re-export node types
pub use nodes::{
DbExecutor, MqExecutor, LineageExecutor, CodeExecutor
};
#[cfg(feature = "http")]
pub use nodes::HttpExecutor;
pub use core::node::Node;
pub use core::executor::TaskExecutor;
// Backward compatibility (deprecated, will be removed in future versions)
pub mod engine {
pub use crate::runtime::engine::*;
}
pub mod context {
pub use crate::core::context::*;
}
pub mod expr {
pub use crate::runtime::expr::*;
pub use crate::core::error::ExprError;
pub use crate::core::traits::ExprEngine;
}
pub mod node {
pub use crate::nodes::*;
pub use crate::core::types::{NodeId, NodeOutput, node_id};
pub use crate::core::error::NodeError;
pub use crate::core::traits::FlowNode;
}
pub mod state {
pub use crate::runtime::state::*;
}

@@ -0,0 +1,47 @@
use crate::core::context::Context;
use crate::core::error::NodeError;
use crate::core::executor::TaskExecutor;
use crate::core::traits::ExprEngine;
use crate::runtime::ExprEngineKind;
use serde_json::Value;
/// Generic code executor
///
/// Runs scripts in multiple languages (Rhai, JS, ...).
/// Unlike expression evaluation, a code node typically runs a complete piece of
/// business logic, possibly spanning multiple statements, with control flow and side effects.
#[derive(Clone)]
pub struct CodeExecutor {
pub engine: ExprEngineKind,
pub script: String,
}
#[async_trait::async_trait]
impl TaskExecutor for CodeExecutor {
async fn execute(&self, ctx: &mut Context, _expr: Option<&dyn ExprEngine>) -> Result<Value, NodeError> {
match self.engine {
ExprEngineKind::Rhai => {
#[cfg(feature = "rhai")]
{
let engine = crate::runtime::RhaiEngine::new();
engine.eval(&self.script, ctx).await.map_err(|e| NodeError::Exec(e.to_string()))
}
#[cfg(not(feature = "rhai"))]
{
Err(NodeError::Exec("Rhai feature is not enabled".to_string()))
}
}
ExprEngineKind::Js => {
#[cfg(feature = "js")]
{
let engine = crate::runtime::JsEngine::new();
engine.eval(&self.script, ctx).await.map_err(|e| NodeError::Exec(e.to_string()))
}
#[cfg(not(feature = "js"))]
{
Err(NodeError::Exec("JS feature is not enabled".to_string()))
}
}
}
}
}

dsl-flow/src/nodes/db.rs (new file)
@@ -0,0 +1,25 @@
use crate::core::context::Context;
use crate::core::error::NodeError;
use crate::core::executor::TaskExecutor;
use crate::core::traits::ExprEngine;
use serde_json::Value;
/// Database operation executor.
#[derive(Clone)]
pub struct DbExecutor {
pub operation: String,
pub params: Value,
}
#[async_trait::async_trait]
impl TaskExecutor for DbExecutor {
async fn execute(&self, ctx: &mut Context, _expr: Option<&dyn ExprEngine>) -> Result<Value, NodeError> {
let result = serde_json::json!({
"op": self.operation,
"params": self.params,
"status": "ok"
});
ctx.set("db.last", result.clone());
Ok(result)
}
}
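`ctx.set("db.last", …)` above stores the result under a dotted path. A std-only sketch of that kind of nested write, assuming `.`-separated segments create intermediate objects; `Val` and `set_path` are hypothetical stand-ins, and the real `Context` type may behave differently:

```rust
use std::collections::BTreeMap;

// A value is either a leaf string or a nested object.
#[derive(Debug, PartialEq)]
enum Val {
    Str(String),
    Map(BTreeMap<String, Val>),
}

// Hypothetical stand-in for Context::set with a dotted path like "db.last":
// intermediate segments become nested objects, the final segment is the leaf.
fn set_path(root: &mut BTreeMap<String, Val>, path: &str, value: String) {
    let keys: Vec<&str> = path.split('.').collect();
    let (last, parents) = keys.split_last().expect("path must be non-empty");
    let mut cur = root;
    for key in parents {
        cur = match cur
            .entry((*key).to_string())
            .or_insert_with(|| Val::Map(BTreeMap::new()))
        {
            Val::Map(m) => m,
            _ => panic!("segment {key:?} is not an object"),
        };
    }
    cur.insert((*last).to_string(), Val::Str(value));
}

fn main() {
    let mut ctx = BTreeMap::new();
    set_path(&mut ctx, "db.last", "ok".to_string());
    match ctx.get("db") {
        Some(Val::Map(db)) => assert_eq!(db.get("last"), Some(&Val::Str("ok".to_string()))),
        other => panic!("expected nested object, got {other:?}"),
    }
    println!("{ctx:?}");
}
```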


@ -0,0 +1,31 @@
use crate::core::context::Context;
use crate::core::error::NodeError;
use crate::core::executor::TaskExecutor;
use crate::core::traits::ExprEngine;
use serde_json::Value;
/// HTTP request executor.
#[cfg(feature = "http")]
#[derive(Clone)]
pub struct HttpExecutor {
pub method: String,
pub url: String,
pub body: Option<Value>,
}
#[cfg(feature = "http")]
#[async_trait::async_trait]
impl TaskExecutor for HttpExecutor {
async fn execute(&self, _ctx: &mut Context, _expr: Option<&dyn ExprEngine>) -> Result<Value, NodeError> {
let client = reqwest::Client::new();
let resp = match self.method.as_str() {
"GET" => client.get(&self.url).send().await,
"POST" => client.post(&self.url).json(&self.body).send().await,
_ => return Err(NodeError::Exec("Unsupported HTTP method".into())),
}
.map_err(|e| NodeError::Exec(format!("{e}")))?;
let status = resp.status().as_u16();
let json = resp.json::<Value>().await.map_err(|e| NodeError::Exec(format!("{e}")))?;
Ok(serde_json::json!({ "status": status, "body": json }))
}
}


@ -0,0 +1,33 @@
use crate::core::context::Context;
use crate::core::error::NodeError;
use crate::core::executor::TaskExecutor;
use crate::core::traits::ExprEngine;
use serde_json::Value;
/// Lineage tracking executor.
#[derive(Clone)]
pub struct LineageExecutor {
pub target_path: Option<String>,
}
#[async_trait::async_trait]
impl TaskExecutor for LineageExecutor {
async fn execute(&self, ctx: &mut Context, _expr: Option<&dyn ExprEngine>) -> Result<Value, NodeError> {
let items = ctx.lineage();
let data = serde_json::to_value(items).map_err(|e| NodeError::Exec(format!("{e}")))?;
if let Some(p) = &self.target_path {
ctx.set(p, data.clone());
// Limitation: Context::record_write needs a NodeId, but TaskExecutor::execute
// does not receive one. Writing `target_path` is a context write and should be
// recorded, but threading the id through would change the TaskExecutor
// signature and every implementation. Until that refactor, the write below is
// intentionally left unrecorded:
// ctx.record_write("lineage-node".into(), p.clone());
}
Ok(data)
}
}

dsl-flow/src/nodes/mod.rs (new file, 11 lines)

@ -0,0 +1,11 @@
pub mod http;
pub mod db;
pub mod mq;
pub mod lineage;
pub mod code;
#[cfg(feature = "http")]
pub use http::HttpExecutor;
pub use db::DbExecutor;
pub use mq::MqExecutor;
pub use lineage::LineageExecutor;
pub use code::CodeExecutor;

dsl-flow/src/nodes/mq.rs (new file, 25 lines)

@ -0,0 +1,25 @@
use crate::core::context::Context;
use crate::core::error::NodeError;
use crate::core::executor::TaskExecutor;
use crate::core::traits::ExprEngine;
use serde_json::Value;
/// Message queue executor.
#[derive(Clone)]
pub struct MqExecutor {
pub topic: String,
pub message: Value,
}
#[async_trait::async_trait]
impl TaskExecutor for MqExecutor {
async fn execute(&self, ctx: &mut Context, _expr: Option<&dyn ExprEngine>) -> Result<Value, NodeError> {
let result = serde_json::json!({
"topic": self.topic,
"message": self.message,
"status": "sent"
});
ctx.set("mq.last", result.clone());
Ok(result)
}
}


@ -1,27 +1,44 @@
use crate::context::Context;
use crate::expr::{ExprEngine, ExprEngineKind};
use crate::node::{FlowNode, NodeOutput, NodeRef, SequenceNode};
use crate::state::{FlowState, StateStore};
use serde_json::Value;
use crate::core::context::Context;
use crate::core::traits::{ExprEngine, FlowNode};
use crate::core::types::{NodeOutput, NodeRef, node_id};
use crate::control::SequenceExecutor;
use crate::core::node::Node;
use crate::runtime::expr::ExprEngineKind;
use crate::runtime::state::{FlowState, StateStore};
use std::sync::Arc;
/// Flow definition.
pub struct Flow {
/// Entry node.
pub entry: NodeRef,
}
impl Flow {
pub fn new(entry: NodeRef) -> Self {
Self { entry }
/// Creates a new flow.
///
/// # Arguments
/// * `entry` - the entry node
pub fn new<N: FlowNode + 'static>(entry: N) -> Self {
Self { entry: Arc::new(entry) }
}
/// Creates a flow that executes its nodes sequentially.
pub fn sequence(nodes: Vec<NodeRef>) -> Self {
Self { entry: Arc::new(SequenceNode::new(nodes)) }
Self {
entry: Arc::new(Node::new(
node_id("seq"),
Box::new(SequenceExecutor { children: nodes })
))
}
}
}
/// Flow engine configuration.
#[derive(Clone)]
pub struct FlowOptions {
/// Whether the flow is stateful.
pub stateful: bool,
/// Expression engine kind.
pub expr_engine: ExprEngineKind,
}
@ -31,18 +48,24 @@ impl Default for FlowOptions {
}
}
/// Flow execution engine.
pub struct FlowEngine<S: StateStore> {
store: S,
expr: Arc<dyn ExprEngine>,
}
impl<S: StateStore> FlowEngine<S> {
/// Creates a new flow engine.
///
/// # Arguments
/// * `store` - state store
/// * `options` - configuration options
pub fn new(store: S, options: FlowOptions) -> Self {
let expr: Arc<dyn ExprEngine> = match options.expr_engine {
ExprEngineKind::Rhai => {
#[cfg(feature = "rhai")]
{
Arc::new(crate::expr::RhaiEngine::new())
Arc::new(crate::runtime::expr::RhaiEngine::new())
}
#[cfg(not(feature = "rhai"))]
{
@ -52,7 +75,7 @@ impl<S: StateStore> FlowEngine<S> {
ExprEngineKind::Js => {
#[cfg(feature = "js")]
{
Arc::new(crate::expr::JsEngine::new())
Arc::new(crate::runtime::expr::JsEngine::new())
}
#[cfg(not(feature = "js"))]
{
@ -63,11 +86,22 @@ impl<S: StateStore> FlowEngine<S> {
Self { store, expr }
}
/// Runs a stateless flow.
///
/// # Arguments
/// * `flow` - flow definition
/// * `ctx` - initial context
pub async fn run_stateless(&self, flow: &Flow, mut ctx: Context) -> FlowResult {
let out = flow.entry.execute(&mut ctx, Some(self.expr.as_ref())).await?;
Ok(out)
}
/// Runs a stateful flow.
///
/// # Arguments
/// * `session_id` - session ID
/// * `flow` - flow definition
/// * `ctx` - initial context (overwritten if the session already exists)
pub async fn run_stateful(&self, session_id: &str, flow: &Flow, mut ctx: Context) -> FlowResult {
if let Some(stored) = self.store.load(session_id).await {
ctx = stored.context;
@ -79,5 +113,5 @@ impl<S: StateStore> FlowEngine<S> {
}
}
/// Flow execution result.
pub type FlowResult = Result<NodeOutput, Box<dyn std::error::Error + Send + Sync>>;
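`FlowResult` boxes its error as `Box<dyn Error + Send + Sync>`, so any concrete error type converts through `?`; note this boxed type does not convert into `anyhow::Error` automatically, which is why callers may need an explicit `map_err`. A small runnable sketch of the alias shape (`SketchResult` and `parse_port` are illustrative names):

```rust
use std::error::Error;

// Same shape as FlowResult: a boxed, thread-safe error trait object.
type SketchResult<T> = Result<T, Box<dyn Error + Send + Sync>>;

fn parse_port(s: &str) -> SketchResult<u16> {
    // ParseIntError converts into the boxed trait object via `?`.
    Ok(s.trim().parse::<u16>()?)
}

fn main() {
    assert_eq!(parse_port(" 8080 ").unwrap(), 8080);
    assert!(parse_port("not-a-port").is_err());
    println!("{}", parse_port(" 8080 ").unwrap());
}
```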


@ -1,28 +1,18 @@
use crate::context::Context;
use crate::core::context::Context;
use crate::core::error::ExprError;
use crate::core::traits::ExprEngine;
use serde_json::Value;
use thiserror::Error;
#[derive(Debug, Error)]
pub enum ExprError {
#[error("Rhai error: {0}")]
Rhai(String),
#[error("JS error: {0}")]
Js(String),
#[error("Unsupported engine")]
Unsupported,
}
/// Expression engine kind.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ExprEngineKind {
/// Rhai scripting engine.
Rhai,
/// JavaScript engine (backed by Boa).
Js,
}
#[async_trait::async_trait]
pub trait ExprEngine: Send + Sync {
async fn eval(&self, script: &str, ctx: &Context) -> Result<Value, ExprError>;
}
/// Rhai engine implementation.
#[cfg(feature = "rhai")]
pub struct RhaiEngine {
engine: rhai::Engine,
@ -30,8 +20,9 @@ pub struct RhaiEngine {
#[cfg(feature = "rhai")]
impl RhaiEngine {
/// Creates a new Rhai engine instance.
pub fn new() -> Self {
let mut engine = rhai::Engine::new();
let engine = rhai::Engine::new();
Self { engine }
}
}
@ -40,25 +31,26 @@ impl RhaiEngine {
#[async_trait::async_trait]
impl ExprEngine for RhaiEngine {
async fn eval(&self, script: &str, ctx: &Context) -> Result<Value, ExprError> {
use rhai::EvalAltResult;
let mut scope = rhai::Scope::new();
let ctx_dynamic = rhai::Dynamic::from(ctx.as_value().clone());
let ctx_dynamic = rhai::serde::to_dynamic(ctx.as_value()).map_err(|e| ExprError::Rhai(e.to_string()))?;
scope.push("ctx", ctx_dynamic);
self.engine
.eval_with_scope::<rhai::Dynamic>(&mut scope, script)
.map_err(|e| ExprError::Rhai(format!("{e:?}")))
.and_then(|dynv| {
let v: Result<serde_json::Value, _> = rhai::serde::to_dynamic_value(&dynv);
let v: Result<serde_json::Value, _> = rhai::serde::from_dynamic(&dynv);
v.map_err(|e| ExprError::Rhai(format!("{e:?}")))
})
}
}
/// JS engine implementation.
#[cfg(feature = "js")]
pub struct JsEngine;
#[cfg(feature = "js")]
impl JsEngine {
/// Creates a new JS engine instance.
pub fn new() -> Self {
Self
}
@ -107,4 +99,3 @@ fn to_json(v: boa_engine::JsValue, ctx: &mut boa_engine::Context) -> Result<Valu
}
}
}


@ -0,0 +1,9 @@
pub mod engine;
pub mod state;
pub mod expr;
pub use engine::{Flow, FlowEngine, FlowOptions, FlowResult};
pub use state::{StateStore, InMemoryStateStore, FlowState};
pub use expr::{ExprEngineKind, RhaiEngine};
#[cfg(feature = "js")]
pub use expr::JsEngine;


@ -1,20 +1,34 @@
use crate::context::Context;
use crate::core::context::Context;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
/// Flow state.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FlowState {
/// Session ID.
pub session_id: String,
/// Execution context.
pub context: Context,
}
/// State store trait.
#[async_trait::async_trait]
pub trait StateStore: Send + Sync {
/// Saves state.
///
/// # Arguments
/// * `state` - the flow state
async fn save(&self, state: FlowState);
/// Loads state.
///
/// # Arguments
/// * `session_id` - session ID
async fn load(&self, session_id: &str) -> Option<FlowState>;
}
/// In-memory state store implementation.
#[derive(Clone, Default)]
pub struct InMemoryStateStore {
inner: Arc<Mutex<HashMap<String, FlowState>>>,
@ -30,4 +44,3 @@ impl StateStore for InMemoryStateStore {
self.inner.lock().unwrap().get(session_id).cloned()
}
}
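`InMemoryStateStore` wraps its map in `Arc<Mutex<…>>` so cloned stores share one state table. A std-only sketch of that shape, with the async trait methods reduced to plain functions and `MemStore` as an illustrative name:

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

// Cloning the store clones the Arc handle, so all clones see one map.
#[derive(Clone, Default)]
struct MemStore {
    inner: Arc<Mutex<HashMap<String, String>>>,
}

impl MemStore {
    fn save(&self, session_id: &str, state: &str) {
        self.inner
            .lock()
            .unwrap()
            .insert(session_id.to_string(), state.to_string());
    }

    fn load(&self, session_id: &str) -> Option<String> {
        self.inner.lock().unwrap().get(session_id).cloned()
    }
}

fn main() {
    let a = MemStore::default();
    let b = a.clone(); // shared handle, not a copy of the data
    a.save("session-1", "counter=0");
    assert_eq!(b.load("session-1").as_deref(), Some("counter=0"));
    println!("{:?}", b.load("session-1"));
}
```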


@ -15,6 +15,7 @@ fn write_report(name: &str, dur: std::time::Duration, ok: bool) {
}
}
#[derive(Clone)]
struct SleepNode {
ms: u64,
id: String,
@ -42,10 +43,10 @@ async fn test_rhai_expr_set_and_get() -> anyhow::Result<()> {
expr_set(ExprEngineKind::Rhai, "ctx.calc.sum * 2", "calc.double"),
});
let ctx = Context::new();
let out = engine.run_stateless(&flow, ctx).await?;
let out = engine.run_stateless(&flow, ctx).await.map_err(|e| anyhow::anyhow!(e))?;
let arr = out.data;
// Sequence returns last child's output
assert_eq!(arr, json!(8));
assert_eq!(arr, json!(12));
write_report("test_rhai_expr_set_and_get", start.elapsed(), true);
Ok(())
}
@ -56,17 +57,61 @@ async fn test_conditional_node_then_else() -> anyhow::Result<()> {
use std::sync::Arc;
let store = InMemoryStateStore::default();
let engine = FlowEngine::new(store, FlowOptions { stateful: false, expr_engine: ExprEngineKind::Rhai });
use dsl_flow::core::types::node_id;
let then = Arc::new(expr_set(ExprEngineKind::Rhai, "42", "branch.result")) as Arc<dyn FlowNode>;
let els = Arc::new(expr_set(ExprEngineKind::Rhai, "0", "branch.result")) as Arc<dyn FlowNode>;
let cond = Arc::new(dsl_flow::node::ConditionalNode::new(ExprEngineKind::Rhai, "false", then.clone(), Some(els.clone())));
let flow = Flow::new(sequence! { (*cond).clone() });
let cond = dsl_flow::Node::new(
node_id("cond"),
Box::new(dsl_flow::ConditionalExecutor {
engine: ExprEngineKind::Rhai,
condition: "false".into(),
then_node: then.clone(),
else_node: Some(els.clone()),
})
);
let flow = Flow::new(sequence! { cond });
let ctx = Context::new();
let out = engine.run_stateless(&flow, ctx).await?;
let out = engine.run_stateless(&flow, ctx).await.map_err(|e| anyhow::anyhow!(e))?;
assert_eq!(out.data, json!(0));
write_report("test_conditional_node_then_else", start.elapsed(), true);
Ok(())
}
#[cfg(feature = "js")]
#[tokio::test]
async fn test_js_run_code() -> anyhow::Result<()> {
let start = std::time::Instant::now();
let store = InMemoryStateStore::default();
let engine = FlowEngine::new(store, FlowOptions { stateful: false, expr_engine: ExprEngineKind::Js });
let flow = Flow::new(sequence! {
dsl_flow::js("const a = 1; const b = 2; a + b"),
});
let ctx = Context::new();
let out = engine.run_stateless(&flow, ctx).await?;
let data = out.data;
assert_eq!(data, json!(3));
write_report("test_js_run_code", start.elapsed(), true);
Ok(())
}
#[cfg(feature = "rhai")]
#[tokio::test]
async fn test_rhai_run_code() -> anyhow::Result<()> {
let start = std::time::Instant::now();
let store = InMemoryStateStore::default();
let engine = FlowEngine::new(store, FlowOptions { stateful: false, expr_engine: ExprEngineKind::Rhai });
let flow = Flow::new(sequence! {
dsl_flow::rhai("let a = 1; let b = 2; a + b"),
});
let ctx = Context::new();
let out = engine.run_stateless(&flow, ctx).await.map_err(|e| anyhow::anyhow!(e))?;
let data = out.data;
assert_eq!(data, json!(3));
write_report("test_rhai_run_code", start.elapsed(), true);
Ok(())
}
#[cfg(feature = "js")]
#[tokio::test]
async fn test_js_expr_and_fork_join() -> anyhow::Result<()> {
@ -109,7 +154,7 @@ async fn test_http_node_with_mock() -> anyhow::Result<()> {
dsl_flow::http_get(&format!("{}/data", server.base_url()))
});
let ctx = Context::new();
let out = engine.run_stateless(&flow, ctx).await?;
let out = engine.run_stateless(&flow, ctx).await.map_err(|e| anyhow::anyhow!(e))?;
let body = out.data.get("body").unwrap().clone();
assert_eq!(body.get("ok").unwrap(), &json!(true));
write_report("test_http_node_with_mock", start.elapsed(), true);
@ -122,12 +167,12 @@ async fn test_stateful_engine() -> anyhow::Result<()> {
let store = InMemoryStateStore::default();
let engine = FlowEngine::new(store.clone(), FlowOptions { stateful: true, expr_engine: ExprEngineKind::Rhai });
let flow = Flow::new(sequence! {
expr_set(ExprEngineKind::Rhai, "if ctx.counter == null { 0 } else { ctx.counter + 1 }", "counter")
expr_set(ExprEngineKind::Rhai, "if ctx.counter == () { 0 } else { ctx.counter + 1 }", "counter")
});
let mut ctx = Context::new();
let ctx = Context::new();
let s = "session-1";
let _out1 = engine.run_stateful(s, &flow, ctx.clone()).await?;
let out2 = engine.run_stateful(s, &flow, ctx.clone()).await?;
let _out1 = engine.run_stateful(s, &flow, ctx.clone()).await.map_err(|e| anyhow::anyhow!(e))?;
let out2 = engine.run_stateful(s, &flow, ctx.clone()).await.map_err(|e| anyhow::anyhow!(e))?;
assert_eq!(out2.data, json!(1));
write_report("test_stateful_engine", start.elapsed(), true);
Ok(())
@ -143,7 +188,7 @@ async fn test_db_and_mq_nodes() -> anyhow::Result<()> {
dsl_flow::mq_node("user.events", json!({"event": "created", "user": "Alice"})),
});
let ctx = Context::new();
let out = engine.run_stateless(&flow, ctx).await?;
let out = engine.run_stateless(&flow, ctx).await.map_err(|e| anyhow::anyhow!(e))?;
assert_eq!(out.data.get("status").unwrap(), &json!("sent"));
write_report("test_db_and_mq_nodes", start.elapsed(), true);
Ok(())
@ -151,16 +196,16 @@ async fn test_db_and_mq_nodes() -> anyhow::Result<()> {
#[tokio::test]
async fn test_group_parallel_sleep() -> anyhow::Result<()> {
use std::sync::Arc;
let store = InMemoryStateStore::default();
let engine = FlowEngine::new(store, FlowOptions { stateful: false, expr_engine: ExprEngineKind::Rhai });
let n1: Arc<dyn FlowNode> = Arc::new(SleepNode { ms: 200, id: "sleep-200".into() });
let n2: Arc<dyn FlowNode> = Arc::new(SleepNode { ms: 200, id: "sleep-200b".into() });
let group = group_merge! { "agg.group", merge_mode_array(), (*n1).clone(), (*n2).clone() };
let flow = Flow::new(sequence! { (*group).clone() });
let n1 = SleepNode { ms: 200, id: "sleep-200".into() };
let n2 = SleepNode { ms: 200, id: "sleep-200b".into() };
let group = group_merge! { "agg.group", merge_mode_array(), n1, n2 };
let flow = Flow::new(sequence! { group });
let ctx = Context::new();
let start = std::time::Instant::now();
let _ = engine.run_stateless(&flow, ctx).await?;
let _ = engine.run_stateless(&flow, ctx).await.map_err(|e| anyhow::anyhow!(e))?;
let elapsed = start.elapsed();
assert!(elapsed.as_millis() < 380, "elapsed={}ms", elapsed.as_millis());
write_report("test_group_parallel_sleep", start.elapsed(), true);
@ -190,7 +235,7 @@ async fn test_fork_join_merge_and_lineage() -> anyhow::Result<()> {
expr_get(ExprEngineKind::Rhai, "ctx.agg.fork")
});
let ctx = Context::new();
let out = engine.run_stateless(&flow, ctx).await?;
let out = engine.run_stateless(&flow, ctx).await.map_err(|e| anyhow::anyhow!(e))?;
let obj = out.data;
assert!(obj.is_object());
Ok(())


@ -1,74 +0,0 @@
# dsl-flow
A Rust DSL workflow engine supporting:
- Stateful/stateless flow execution
- Async node execution, fork/join, and grouped concurrency
- Pluggable expression engines (Rhai/JS) with read/write access to context data
- A node abstraction that makes HTTP/DB/MQ and other business nodes easy to add
- Flow definition via Rust DSL macros
## Quick Start
```rust
use dsl_flow::*;
use serde_json::json;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let store = InMemoryStateStore::default();
let engine = FlowEngine::new(store, FlowOptions { stateful: false, expr_engine: ExprEngineKind::Rhai });
let flow = Flow::new(sequence! {
expr_set(ExprEngineKind::Rhai, "1 + 2", "calc.sum"),
fork_join! {
expr_set(ExprEngineKind::Js, "ctx.calc.sum * 2", "calc.js_double"),
#[cfg(feature = "http")]
http_get("https://httpbin.org/get")
}
});
let ctx = Context::new();
let out = engine.run_stateless(&flow, ctx).await?;
println!("{}", serde_json::to_string_pretty(&out.data)?);
Ok(())
}
```
## Design Notes
- Engine entry point: `FlowEngine` offers `run_stateless` and `run_stateful`; stateful runs persist the context through the `StateStore` abstraction.
- Node model: a unified async `FlowNode` interface; built-in nodes include:
  - `SequenceNode` for sequential execution
  - `ForkJoinNode` for parallel branches joined at the end
  - `GroupNode` for grouped concurrency
  - `ExprSetNode`/`ExprGetNode` for expression-based context reads and writes
  - `HttpNode` (optional `http` feature)
- Expressions: the `ExprEngine` abstraction currently ships `RhaiEngine` and `JsEngine`; the JS implementation injects `ctx` (JSON) into the script for data access.
- DSL macros:
  - `sequence! { ... }` defines a sequential node
  - `fork_join! { ... }` defines parallel branches
  - `group! { ... }` defines grouped concurrency
  - `expr_set(Kind, script, path)`/`expr_get(Kind, script)`
  - `http_get(url)`/`http_post(url, body)`
## Extension Guide
- New nodes: implement the `FlowNode` trait and expose a constructor in the DSL or builder API.
- New expression languages: implement the `ExprEngine` trait and add a variant to `ExprEngineKind`.
- Embedding: add `dsl-flow` as a dependency, build flows with the DSL or builder API, and run them inside your service.
## Testing
Run:
```bash
cargo test -p dsl-flow
```
Covers:
- Fork/join concurrent execution
- Rhai/JS expression reads and writes
- An HTTP node example (using `httpmock`)
## License
MIT


@ -1,96 +0,0 @@
use crate::node::{FlowNode, NodeRef};
use crate::node::{SequenceNode, ForkJoinNode, GroupNode, ExprSetNode, ExprGetNode, MergeMode};
use crate::expr::ExprEngineKind;
use std::sync::Arc;
#[macro_export]
macro_rules! sequence {
( $($node:expr),* $(,)? ) => {{
let mut nodes: Vec<std::sync::Arc<dyn $crate::node::FlowNode>> = Vec::new();
$(
nodes.push(std::sync::Arc::new($node));
)*
std::sync::Arc::new($crate::node::SequenceNode::new(nodes))
}};
}
#[macro_export]
macro_rules! fork_join {
( $($node:expr),* $(,)? ) => {{
let mut nodes: Vec<std::sync::Arc<dyn $crate::node::FlowNode>> = Vec::new();
$(
nodes.push(std::sync::Arc::new($node));
)*
std::sync::Arc::new($crate::node::ForkJoinNode::new(nodes))
}};
}
#[macro_export]
macro_rules! fork_join_merge {
( $merge_path:expr, $mode:expr, $( $node:expr ),* $(,)? ) => {{
let mut nodes: Vec<std::sync::Arc<dyn $crate::node::FlowNode>> = Vec::new();
$(
nodes.push(std::sync::Arc::new($node));
)*
std::sync::Arc::new($crate::node::ForkJoinNode::with_merge(nodes, $merge_path, $mode))
}};
}
#[macro_export]
macro_rules! group {
( $($node:expr),* $(,)? ) => {{
let mut nodes: Vec<std::sync::Arc<dyn $crate::node::FlowNode>> = Vec::new();
$(
nodes.push(std::sync::Arc::new($node));
)*
std::sync::Arc::new($crate::node::GroupNode::new(nodes))
}};
}
#[macro_export]
macro_rules! group_merge {
( $merge_path:expr, $mode:expr, $( $node:expr ),* $(,)? ) => {{
let mut nodes: Vec<std::sync::Arc<dyn $crate::node::FlowNode>> = Vec::new();
$(
nodes.push(std::sync::Arc::new($node));
)*
std::sync::Arc::new($crate::node::GroupNode::with_merge(nodes, $merge_path, $mode))
}};
}
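All of the removed macros share one shape: collect each expression into a `Vec`, allowing a trailing comma via `$(,)?`. A std-only sketch of that pattern with strings standing in for node references (`collect_nodes!` is an illustrative name):

```rust
// Each expression is pushed in order; the optional trailing comma is
// absorbed by the `$(,)?` fragment, exactly as in sequence!/fork_join!/group!.
macro_rules! collect_nodes {
    ( $($node:expr),* $(,)? ) => {{
        let mut nodes: Vec<String> = Vec::new();
        $( nodes.push($node.to_string()); )*
        nodes
    }};
}

fn main() {
    let nodes = collect_nodes!["a", "b", "c",]; // trailing comma accepted
    assert_eq!(nodes, vec!["a", "b", "c"]);
    println!("{:?}", nodes);
}
```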
pub fn expr_set(engine: ExprEngineKind, script: &str, target_path: &str) -> ExprSetNode {
ExprSetNode::new(engine, script, target_path)
}
pub fn expr_get(engine: ExprEngineKind, script: &str) -> ExprGetNode {
ExprGetNode::new(engine, script)
}
#[cfg(feature = "http")]
pub fn http_get(url: &str) -> crate::node::HttpNode {
crate::node::HttpNode::get(url)
}
#[cfg(feature = "http")]
pub fn http_post(url: &str, body: serde_json::Value) -> crate::node::HttpNode {
crate::node::HttpNode::post(url, body)
}
pub fn db_node(operation: &str, params: serde_json::Value) -> crate::node::DbNode {
crate::node::DbNode::new(operation, params)
}
pub fn mq_node(topic: &str, message: serde_json::Value) -> crate::node::MqNode {
crate::node::MqNode::new(topic, message)
}
pub fn merge_mode_object_by_id() -> MergeMode {
MergeMode::ObjectById
}
pub fn merge_mode_array() -> MergeMode {
MergeMode::Array
}
pub fn lineage_node() -> crate::node::LineageNode {
crate::node::LineageNode::new()
}


@ -1,19 +0,0 @@
pub mod engine;
pub mod context;
pub mod expr;
pub mod node;
pub mod dsl;
pub mod state;
pub use engine::{Flow, FlowEngine, FlowOptions, FlowResult};
pub use context::{Context, ValuePath};
pub use expr::{ExprEngineKind, ExprEngine};
#[cfg(feature = "js")]
pub use expr::JsEngine;
#[cfg(feature = "rhai")]
pub use expr::RhaiEngine;
pub use node::{NodeId, FlowNode, NodeOutput, HttpNode, ExprSetNode, ExprGetNode, SequenceNode, ForkJoinNode, GroupNode, ConditionalNode, DbNode, MqNode, MergeMode, LineageNode};
pub use context::{LineageEntry};
pub use dsl::*;
pub use state::{StateStore, InMemoryStateStore, FlowState};


@ -1,404 +0,0 @@
use crate::context::Context;
use crate::expr::{ExprEngine, ExprEngineKind};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::sync::Arc;
use thiserror::Error;
#[derive(Debug, Error)]
pub enum NodeError {
#[error("Execution error: {0}")]
Exec(String),
}
pub type NodeId = String;
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NodeOutput {
pub id: NodeId,
pub data: Value,
}
#[async_trait::async_trait]
pub trait FlowNode: Send + Sync {
fn id(&self) -> &str;
async fn execute(&self, ctx: &mut Context, expr: Option<&dyn ExprEngine>) -> Result<NodeOutput, NodeError>;
}
pub type NodeRef = Arc<dyn FlowNode>;
pub fn node_id(prefix: &str) -> NodeId {
format!("{}-{}", prefix, uuid::Uuid::new_v4())
}
pub struct SequenceNode {
id: NodeId,
children: Vec<NodeRef>,
}
impl SequenceNode {
pub fn new(children: Vec<NodeRef>) -> Self {
Self { id: node_id("seq"), children }
}
}
#[async_trait::async_trait]
impl FlowNode for SequenceNode {
fn id(&self) -> &str {
&self.id
}
async fn execute(&self, ctx: &mut Context, expr: Option<&dyn ExprEngine>) -> Result<NodeOutput, NodeError> {
let mut last = Value::Null;
for child in &self.children {
let out = child.execute(ctx, expr).await?;
last = out.data;
}
Ok(NodeOutput { id: self.id.clone(), data: last })
}
}
#[cfg(feature = "http")]
pub struct HttpNode {
id: NodeId,
method: String,
url: String,
body: Option<Value>,
}
#[cfg(feature = "http")]
impl HttpNode {
pub fn get(url: impl Into<String>) -> Self {
Self { id: node_id("http"), method: "GET".into(), url: url.into(), body: None }
}
pub fn post(url: impl Into<String>, body: Value) -> Self {
Self { id: node_id("http"), method: "POST".into(), url: url.into(), body: Some(body) }
}
}
#[cfg(feature = "http")]
#[async_trait::async_trait]
impl FlowNode for HttpNode {
fn id(&self) -> &str {
&self.id
}
async fn execute(&self, _ctx: &mut Context, _expr: Option<&dyn ExprEngine>) -> Result<NodeOutput, NodeError> {
let client = reqwest::Client::new();
let resp = match self.method.as_str() {
"GET" => client.get(&self.url).send().await,
"POST" => client.post(&self.url).json(&self.body).send().await,
_ => return Err(NodeError::Exec("Unsupported HTTP method".into())),
}
.map_err(|e| NodeError::Exec(format!("{e}")))?;
let status = resp.status().as_u16();
let json = resp.json::<Value>().await.map_err(|e| NodeError::Exec(format!("{e}")))?;
Ok(NodeOutput { id: self.id.clone(), data: serde_json::json!({ "status": status, "body": json }) })
}
}
pub struct ExprSetNode {
id: NodeId,
engine: ExprEngineKind,
script: String,
target_path: String,
}
impl ExprSetNode {
pub fn new(engine: ExprEngineKind, script: impl Into<String>, target_path: impl Into<String>) -> Self {
Self {
id: node_id("expr_set"),
engine,
script: script.into(),
target_path: target_path.into(),
}
}
}
#[async_trait::async_trait]
impl FlowNode for ExprSetNode {
fn id(&self) -> &str {
&self.id
}
async fn execute(&self, ctx: &mut Context, expr: Option<&dyn ExprEngine>) -> Result<NodeOutput, NodeError> {
let engine = expr.ok_or_else(|| NodeError::Exec("Expr engine not provided".into()))?;
let val = engine.eval(&self.script, ctx).await.map_err(|e| NodeError::Exec(e.to_string()))?;
ctx.set(&self.target_path, val.clone());
ctx.record_write(self.id.clone(), self.target_path.clone());
Ok(NodeOutput { id: self.id.clone(), data: val })
}
}
pub struct ExprGetNode {
id: NodeId,
engine: ExprEngineKind,
script: String,
}
impl ExprGetNode {
pub fn new(engine: ExprEngineKind, script: impl Into<String>) -> Self {
Self { id: node_id("expr_get"), engine, script: script.into() }
}
}
#[async_trait::async_trait]
impl FlowNode for ExprGetNode {
fn id(&self) -> &str {
&self.id
}
async fn execute(&self, ctx: &mut Context, expr: Option<&dyn ExprEngine>) -> Result<NodeOutput, NodeError> {
let engine = expr.ok_or_else(|| NodeError::Exec("Expr engine not provided".into()))?;
let val = engine.eval(&self.script, ctx).await.map_err(|e| NodeError::Exec(e.to_string()))?;
Ok(NodeOutput { id: self.id.clone(), data: val })
}
}
pub struct ForkJoinNode {
id: NodeId,
branches: Vec<NodeRef>,
merge_to_ctx: Option<String>,
merge_mode: MergeMode,
}
impl ForkJoinNode {
pub fn new(branches: Vec<NodeRef>) -> Self {
Self { id: node_id("fork"), branches, merge_to_ctx: None, merge_mode: MergeMode::Array }
}
pub fn with_merge(branches: Vec<NodeRef>, merge_to_ctx: impl Into<String>, merge_mode: MergeMode) -> Self {
Self { id: node_id("fork"), branches, merge_to_ctx: Some(merge_to_ctx.into()), merge_mode }
}
}
#[async_trait::async_trait]
impl FlowNode for ForkJoinNode {
fn id(&self) -> &str {
&self.id
}
async fn execute(&self, ctx: &mut Context, expr: Option<&dyn ExprEngine>) -> Result<NodeOutput, NodeError> {
let mut tasks = Vec::with_capacity(self.branches.len());
for b in &self.branches {
let mut subctx = Context::with_value(ctx.as_value().clone());
let b = b.clone();
let expr = expr;
tasks.push(tokio::spawn(async move { b.execute(&mut subctx, expr).await }));
}
let mut results = Vec::new();
for t in tasks {
let out = t.await.map_err(|e| NodeError::Exec(format!("Join error: {e}")))??;
results.push(serde_json::json!({ "id": out.id, "data": out.data }));
}
let data = match self.merge_mode {
MergeMode::Array => Value::Array(results.clone()),
MergeMode::ObjectById => {
let mut map = serde_json::Map::new();
for item in &results {
let id = item.get("id").and_then(|v| v.as_str()).unwrap_or_default().to_string();
let data = item.get("data").cloned().unwrap_or(Value::Null);
map.insert(id, data);
}
Value::Object(map)
}
};
if let Some(path) = &self.merge_to_ctx {
ctx.set(path, data.clone());
ctx.record_write(self.id.clone(), path.clone());
}
Ok(NodeOutput { id: self.id.clone(), data })
}
}
pub struct GroupNode {
id: NodeId,
parallel: Vec<NodeRef>,
merge_to_ctx: Option<String>,
merge_mode: MergeMode,
}
impl GroupNode {
pub fn new(parallel: Vec<NodeRef>) -> Self {
Self { id: node_id("group"), parallel, merge_to_ctx: None, merge_mode: MergeMode::Array }
}
pub fn with_merge(parallel: Vec<NodeRef>, merge_to_ctx: impl Into<String>, merge_mode: MergeMode) -> Self {
Self { id: node_id("group"), parallel, merge_to_ctx: Some(merge_to_ctx.into()), merge_mode }
}
}
#[async_trait::async_trait]
impl FlowNode for GroupNode {
fn id(&self) -> &str {
&self.id
}
async fn execute(&self, ctx: &mut Context, expr: Option<&dyn ExprEngine>) -> Result<NodeOutput, NodeError> {
let mut joins = Vec::with_capacity(self.parallel.len());
for n in &self.parallel {
let mut subctx = Context::with_value(ctx.as_value().clone());
let n = n.clone();
let expr = expr;
joins.push(tokio::spawn(async move { n.execute(&mut subctx, expr).await }));
}
let mut results = Vec::new();
for j in joins {
let out = j.await.map_err(|e| NodeError::Exec(format!("Group join error: {e}")))??;
results.push(serde_json::json!({ "id": out.id, "data": out.data }));
}
let data = match self.merge_mode {
MergeMode::Array => Value::Array(results.clone()),
MergeMode::ObjectById => {
let mut map = serde_json::Map::new();
for item in &results {
let id = item.get("id").and_then(|v| v.as_str()).unwrap_or_default().to_string();
let data = item.get("data").cloned().unwrap_or(Value::Null);
map.insert(id, data);
}
Value::Object(map)
}
};
if let Some(path) = &self.merge_to_ctx {
ctx.set(path, data.clone());
ctx.record_write(self.id.clone(), path.clone());
}
Ok(NodeOutput { id: self.id.clone(), data })
}
}
pub struct ConditionalNode {
id: NodeId,
engine: ExprEngineKind,
condition: String,
then_node: NodeRef,
else_node: Option<NodeRef>,
}
impl ConditionalNode {
pub fn new(engine: ExprEngineKind, condition: impl Into<String>, then_node: NodeRef, else_node: Option<NodeRef>) -> Self {
Self {
id: node_id("cond"),
engine,
condition: condition.into(),
then_node,
else_node,
}
}
}
#[async_trait::async_trait]
impl FlowNode for ConditionalNode {
fn id(&self) -> &str {
&self.id
}
async fn execute(&self, ctx: &mut Context, expr: Option<&dyn ExprEngine>) -> Result<NodeOutput, NodeError> {
let engine = expr.ok_or_else(|| NodeError::Exec("Expr engine not provided".into()))?;
let val = engine.eval(&self.condition, ctx).await.map_err(|e| NodeError::Exec(e.to_string()))?;
let cond = match val {
Value::Bool(b) => b,
_ => false,
};
let selected = if cond { &self.then_node } else { self.else_node.as_ref().unwrap_or(&self.then_node) };
let out = selected.execute(ctx, Some(engine)).await?;
Ok(NodeOutput { id: self.id.clone(), data: out.data })
}
}
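The selection logic in the removed `ConditionalNode` has two quiet defaults worth noting: a non-boolean condition result counts as `false`, and when no `else` branch exists a false condition still falls back to `then`. A std-only sketch of that behavior (`pick` is an illustrative stand-in):

```rust
// Mirrors ConditionalNode's branch selection with plain values.
fn pick<'a>(cond_result: Option<bool>, then_b: &'a str, else_b: Option<&'a str>) -> &'a str {
    // Non-Bool evaluation results are treated as false.
    let cond = cond_result.unwrap_or(false);
    if cond {
        then_b
    } else {
        // Missing else falls back to then, as in the original code.
        else_b.unwrap_or(then_b)
    }
}

fn main() {
    assert_eq!(pick(Some(true), "then", Some("else")), "then");
    assert_eq!(pick(Some(false), "then", Some("else")), "else");
    assert_eq!(pick(None, "then", Some("else")), "else");
    assert_eq!(pick(Some(false), "then", None), "then");
    println!("{}", pick(Some(false), "then", None));
}
```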
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub enum MergeMode {
Array,
ObjectById,
}
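Both fork/join and group collect branch outputs and then apply one of these merge modes: `Array` keeps results in branch order (the crate keeps full `{id, data}` objects; simplified to the data here), while `ObjectById` keys them by node id, with later entries overwriting duplicate ids. A std-only sketch:

```rust
use std::collections::BTreeMap;

// Branch outputs arrive as (node id, data) pairs.
fn merge_array(results: &[(String, i32)]) -> Vec<i32> {
    results.iter().map(|(_, d)| *d).collect()
}

fn merge_object_by_id(results: &[(String, i32)]) -> BTreeMap<String, i32> {
    // Collecting into a map: a later duplicate id overwrites the earlier one.
    results.iter().cloned().collect()
}

fn main() {
    let results = vec![("n1".to_string(), 10), ("n2".to_string(), 20)];
    assert_eq!(merge_array(&results), vec![10, 20]);
    let obj = merge_object_by_id(&results);
    assert_eq!(obj.get("n1"), Some(&10));
    println!("{:?}", obj);
}
```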
pub struct DbNode {
id: NodeId,
operation: String,
params: Value,
}
impl DbNode {
pub fn new(operation: impl Into<String>, params: Value) -> Self {
Self { id: node_id("db"), operation: operation.into(), params }
}
}
#[async_trait::async_trait]
impl FlowNode for DbNode {
fn id(&self) -> &str {
&self.id
}
async fn execute(&self, ctx: &mut Context, _expr: Option<&dyn ExprEngine>) -> Result<NodeOutput, NodeError> {
let result = serde_json::json!({
"op": self.operation,
"params": self.params,
"status": "ok"
});
ctx.set("db.last", result.clone());
ctx.record_write(self.id.clone(), "db.last".to_string());
Ok(NodeOutput { id: self.id.clone(), data: result })
}
}
pub struct MqNode {
id: NodeId,
topic: String,
message: Value,
}
impl MqNode {
pub fn new(topic: impl Into<String>, message: Value) -> Self {
Self { id: node_id("mq"), topic: topic.into(), message }
}
}
#[async_trait::async_trait]
impl FlowNode for MqNode {
fn id(&self) -> &str {
&self.id
}
async fn execute(&self, ctx: &mut Context, _expr: Option<&dyn ExprEngine>) -> Result<NodeOutput, NodeError> {
let result = serde_json::json!({
"topic": self.topic,
"message": self.message,
"status": "sent"
});
ctx.set("mq.last", result.clone());
ctx.record_write(self.id.clone(), "mq.last".to_string());
Ok(NodeOutput { id: self.id.clone(), data: result })
}
}
pub struct LineageNode {
id: NodeId,
target_path: Option<String>,
}
impl LineageNode {
pub fn new() -> Self {
Self { id: node_id("lineage"), target_path: None }
}
pub fn to_path(mut self, path: impl Into<String>) -> Self {
self.target_path = Some(path.into());
self
}
}
#[async_trait::async_trait]
impl FlowNode for LineageNode {
fn id(&self) -> &str {
&self.id
}
async fn execute(&self, ctx: &mut Context, _expr: Option<&dyn ExprEngine>) -> Result<NodeOutput, NodeError> {
let items = ctx.lineage();
let data = serde_json::to_value(items).map_err(|e| NodeError::Exec(format!("{e}")))?;
if let Some(p) = &self.target_path {
ctx.set(p, data.clone());
ctx.record_write(self.id.clone(), p.clone());
}
Ok(NodeOutput { id: self.id.clone(), data })
}
}

scripts/test_report.sh (new file, 40 lines)

@ -0,0 +1,40 @@
#!/usr/bin/env bash
set +e
JS_FEATURE=0
if [[ "$1" == "--js" ]]; then
JS_FEATURE=1
fi
dir="target/test-reports"
docDir="doc"
mkdir -p "$dir"
mkdir -p "$docDir"
if [[ "$JS_FEATURE" == "1" ]]; then
header="## JS feature"
outFile="$docDir/performance-js.md"
featureArgs="--features js"
else
header="## Default features"
outFile="$docDir/performance-default.md"
featureArgs=""
fi
names=(test_rhai_expr_set_and_get test_conditional_node_then_else test_http_node_with_mock test_stateful_engine test_db_and_mq_nodes test_group_parallel_sleep test_expr_set_without_engine_error)
if [[ "$JS_FEATURE" == "1" ]]; then
names+=(test_js_expr_and_fork_join)
fi
md="# dsl-flow Test Performance\n\n${header}\n"
for n in "${names[@]}"; do
# Measure elapsed seconds with bash time builtin
TIMEFORMAT='%3R'
dur=$( { time cargo test -p dsl-flow ${featureArgs} -- --exact "${n}" --nocapture --quiet >/dev/null 2>&1; } 2>&1 )
md+="- ${n}: ${dur}s\n"
done
printf "%b" "$md" > "$outFile"
echo "Report written to $outFile"