Files
dsl_flow/dsl-flow/tests/flow_tests.rs
ayou 81da0fe61f feat: 重构项目结构并添加核心功能
refactor: 将代码按功能模块重新组织到 core/runtime/control 等目录
feat(core): 添加 Context、FlowNode 等核心 trait 和类型
feat(runtime): 实现 FlowEngine 和状态管理
feat(control): 添加顺序/并行/条件控制流节点
feat(nodes): 实现 HTTP/DB/MQ 等业务节点
docs: 更新 README 添加架构说明和快速开始示例
test: 添加性能测试脚本和示例代码
2025-12-14 23:50:40 +08:00

244 lines
9.4 KiB
Rust

use dsl_flow::*;
use serde_json::json;
fn write_report(name: &str, dur: std::time::Duration, ok: bool) {
use std::fs::{create_dir_all, OpenOptions};
use std::io::Write;
let _ = create_dir_all("target/test-reports");
if let Ok(mut f) = OpenOptions::new().create(true).append(true).open("target/test-reports/default.jsonl") {
let line = serde_json::json!({
"name": name,
"ok": ok,
"duration_sec": dur.as_secs_f64()
}).to_string();
let _ = writeln!(f, "{}", line);
}
}
#[derive(Clone)]
struct SleepNode {
ms: u64,
id: String,
}
#[async_trait::async_trait]
impl dsl_flow::FlowNode for SleepNode {
fn id(&self) -> &str {
&self.id
}
async fn execute(&self, ctx: &mut dsl_flow::Context, _expr: Option<&dyn dsl_flow::ExprEngine>) -> Result<dsl_flow::NodeOutput, dsl_flow::node::NodeError> {
tokio::time::sleep(std::time::Duration::from_millis(self.ms)).await;
ctx.set("sleep.last", json!({"slept_ms": self.ms}));
Ok(dsl_flow::NodeOutput { id: self.id.clone(), data: json!(self.ms) })
}
}
#[tokio::test]
async fn test_rhai_expr_set_and_get() -> anyhow::Result<()> {
let start = std::time::Instant::now();
let store = InMemoryStateStore::default();
let engine = FlowEngine::new(store, FlowOptions { stateful: false, expr_engine: ExprEngineKind::Rhai });
let flow = Flow::new(sequence! {
expr_set(ExprEngineKind::Rhai, "1 + 2 + 3", "calc.sum"),
expr_set(ExprEngineKind::Rhai, "ctx.calc.sum * 2", "calc.double"),
});
let ctx = Context::new();
let out = engine.run_stateless(&flow, ctx).await.map_err(|e| anyhow::anyhow!(e))?;
let arr = out.data;
// Sequence returns last child's output
assert_eq!(arr, json!(12));
write_report("test_rhai_expr_set_and_get", start.elapsed(), true);
Ok(())
}
#[tokio::test]
async fn test_conditional_node_then_else() -> anyhow::Result<()> {
let start = std::time::Instant::now();
use std::sync::Arc;
let store = InMemoryStateStore::default();
let engine = FlowEngine::new(store, FlowOptions { stateful: false, expr_engine: ExprEngineKind::Rhai });
use dsl_flow::core::types::node_id;
let then = Arc::new(expr_set(ExprEngineKind::Rhai, "42", "branch.result")) as Arc<dyn FlowNode>;
let els = Arc::new(expr_set(ExprEngineKind::Rhai, "0", "branch.result")) as Arc<dyn FlowNode>;
let cond = dsl_flow::Node::new(
node_id("cond"),
Box::new(dsl_flow::ConditionalExecutor {
engine: ExprEngineKind::Rhai,
condition: "false".into(),
then_node: then.clone(),
else_node: Some(els.clone()),
})
);
let flow = Flow::new(sequence! { cond });
let ctx = Context::new();
let out = engine.run_stateless(&flow, ctx).await.map_err(|e| anyhow::anyhow!(e))?;
assert_eq!(out.data, json!(0));
write_report("test_conditional_node_then_else", start.elapsed(), true);
Ok(())
}
#[cfg(feature = "js")]
#[tokio::test]
async fn test_js_run_code() -> anyhow::Result<()> {
let start = std::time::Instant::now();
let store = InMemoryStateStore::default();
let engine = FlowEngine::new(store, FlowOptions { stateful: false, expr_engine: ExprEngineKind::Js });
let flow = Flow::new(sequence! {
dsl_flow::js("const a = 1; const b = 2; a + b"),
});
let ctx = Context::new();
let out = engine.run_stateless(&flow, ctx).await?;
let data = out.data;
assert_eq!(data, json!(3));
write_report("test_js_run_code", start.elapsed(), true);
Ok(())
}
#[cfg(feature = "rhai")]
#[tokio::test]
async fn test_rhai_run_code() -> anyhow::Result<()> {
let start = std::time::Instant::now();
let store = InMemoryStateStore::default();
let engine = FlowEngine::new(store, FlowOptions { stateful: false, expr_engine: ExprEngineKind::Rhai });
let flow = Flow::new(sequence! {
dsl_flow::rhai("let a = 1; let b = 2; a + b"),
});
let ctx = Context::new();
let out = engine.run_stateless(&flow, ctx).await.map_err(|e| anyhow::anyhow!(e))?;
let data = out.data;
assert_eq!(data, json!(3));
write_report("test_rhai_run_code", start.elapsed(), true);
Ok(())
}
#[cfg(feature = "js")]
#[tokio::test]
async fn test_js_expr_and_fork_join() -> anyhow::Result<()> {
let start = std::time::Instant::now();
let store = InMemoryStateStore::default();
let engine = FlowEngine::new(store, FlowOptions { stateful: false, expr_engine: ExprEngineKind::Js });
let flow = Flow::new(sequence! {
expr_set(ExprEngineKind::Js, "({ a: 1, b: 2 })", "obj"),
fork_join! {
expr_set(ExprEngineKind::Js, "ctx.obj.a + ctx.obj.b", "sum"),
expr_set(ExprEngineKind::Js, "ctx.obj.a * ctx.obj.b", "mul")
}
});
let ctx = Context::new();
let out = engine.run_stateless(&flow, ctx).await?;
let data = out.data;
assert!(data.is_array());
write_report("test_js_expr_and_fork_join", start.elapsed(), true);
Ok(())
}
#[cfg(feature = "http")]
#[tokio::test]
async fn test_http_node_with_mock() -> anyhow::Result<()> {
let start = std::time::Instant::now();
use httpmock::MockServer;
use httpmock::Method::GET;
let server = MockServer::start_async().await;
let _m = server.mock_async(|when, then| {
when.method(GET).path("/data");
then.status(200)
.header("content-type", "application/json")
.json_body(json!({ "ok": true, "msg": "hello" }));
}).await;
let store = InMemoryStateStore::default();
let engine = FlowEngine::new(store, FlowOptions { stateful: false, expr_engine: ExprEngineKind::Rhai });
let flow = Flow::new(sequence! {
dsl_flow::http_get(&format!("{}/data", server.base_url()))
});
let ctx = Context::new();
let out = engine.run_stateless(&flow, ctx).await.map_err(|e| anyhow::anyhow!(e))?;
let body = out.data.get("body").unwrap().clone();
assert_eq!(body.get("ok").unwrap(), &json!(true));
write_report("test_http_node_with_mock", start.elapsed(), true);
Ok(())
}
#[tokio::test]
async fn test_stateful_engine() -> anyhow::Result<()> {
let start = std::time::Instant::now();
let store = InMemoryStateStore::default();
let engine = FlowEngine::new(store.clone(), FlowOptions { stateful: true, expr_engine: ExprEngineKind::Rhai });
let flow = Flow::new(sequence! {
expr_set(ExprEngineKind::Rhai, "if ctx.counter == () { 0 } else { ctx.counter + 1 }", "counter")
});
let ctx = Context::new();
let s = "session-1";
let _out1 = engine.run_stateful(s, &flow, ctx.clone()).await.map_err(|e| anyhow::anyhow!(e))?;
let out2 = engine.run_stateful(s, &flow, ctx.clone()).await.map_err(|e| anyhow::anyhow!(e))?;
assert_eq!(out2.data, json!(1));
write_report("test_stateful_engine", start.elapsed(), true);
Ok(())
}
#[tokio::test]
async fn test_db_and_mq_nodes() -> anyhow::Result<()> {
let start = std::time::Instant::now();
let store = InMemoryStateStore::default();
let engine = FlowEngine::new(store, FlowOptions { stateful: false, expr_engine: ExprEngineKind::Rhai });
let flow = Flow::new(sequence! {
dsl_flow::db_node("insert_user", json!({"name": "Alice"})),
dsl_flow::mq_node("user.events", json!({"event": "created", "user": "Alice"})),
});
let ctx = Context::new();
let out = engine.run_stateless(&flow, ctx).await.map_err(|e| anyhow::anyhow!(e))?;
assert_eq!(out.data.get("status").unwrap(), &json!("sent"));
write_report("test_db_and_mq_nodes", start.elapsed(), true);
Ok(())
}
#[tokio::test]
async fn test_group_parallel_sleep() -> anyhow::Result<()> {
let store = InMemoryStateStore::default();
let engine = FlowEngine::new(store, FlowOptions { stateful: false, expr_engine: ExprEngineKind::Rhai });
let n1 = SleepNode { ms: 200, id: "sleep-200".into() };
let n2 = SleepNode { ms: 200, id: "sleep-200b".into() };
let group = group_merge! { "agg.group", merge_mode_array(), n1, n2 };
let flow = Flow::new(sequence! { group });
let ctx = Context::new();
let start = std::time::Instant::now();
let _ = engine.run_stateless(&flow, ctx).await.map_err(|e| anyhow::anyhow!(e))?;
let elapsed = start.elapsed();
assert!(elapsed.as_millis() < 380, "elapsed={}ms", elapsed.as_millis());
write_report("test_group_parallel_sleep", start.elapsed(), true);
Ok(())
}
#[tokio::test]
async fn test_expr_set_without_engine_error() -> anyhow::Result<()> {
let start = std::time::Instant::now();
let mut ctx = Context::new();
let node = expr_set(ExprEngineKind::Rhai, "1+1", "x");
let res = dsl_flow::FlowNode::execute(&node, &mut ctx, None).await;
assert!(res.is_err());
write_report("test_expr_set_without_engine_error", start.elapsed(), true);
Ok(())
}
#[tokio::test]
async fn test_fork_join_merge_and_lineage() -> anyhow::Result<()> {
let store = InMemoryStateStore::default();
let engine = FlowEngine::new(store, FlowOptions { stateful: false, expr_engine: ExprEngineKind::Rhai });
let flow = Flow::new(sequence! {
fork_join_merge! { "agg.fork", merge_mode_object_by_id(),
expr_set(ExprEngineKind::Rhai, "10", "a.x"),
expr_set(ExprEngineKind::Rhai, "20", "b.y")
},
expr_get(ExprEngineKind::Rhai, "ctx.agg.fork")
});
let ctx = Context::new();
let out = engine.run_stateless(&flow, ctx).await.map_err(|e| anyhow::anyhow!(e))?;
let obj = out.data;
assert!(obj.is_object());
Ok(())
}