init
This commit is contained in:
3577
Cargo.lock
generated
Normal file
3577
Cargo.lock
generated
Normal file
File diff suppressed because it is too large
Load Diff
10
Cargo.toml
Normal file
10
Cargo.toml
Normal file
@ -0,0 +1,10 @@
|
||||
[workspace]
|
||||
members = ["dsl-flow"]
|
||||
resolver = "3"
|
||||
|
||||
[profile.dev]
|
||||
opt-level = 0
|
||||
|
||||
[profile.release]
|
||||
opt-level = 3
|
||||
|
||||
11
doc/performance-default.md
Normal file
11
doc/performance-default.md
Normal file
@ -0,0 +1,11 @@
|
||||
# dsl-flow Test Performance
|
||||
|
||||
## Default features
|
||||
- test_rhai_expr_set_and_get: 89.303s
|
||||
- test_conditional_node_then_else: 76.627s
|
||||
- test_http_node_with_mock: 57.951s
|
||||
- test_stateful_engine: 56.686s
|
||||
- test_db_and_mq_nodes: 58.595s
|
||||
- test_group_parallel_sleep: 57.494s
|
||||
- test_expr_set_without_engine_error: 38.232s
|
||||
|
||||
12
doc/performance-js.md
Normal file
12
doc/performance-js.md
Normal file
@ -0,0 +1,12 @@
|
||||
# dsl-flow Test Performance
|
||||
|
||||
## JS feature
|
||||
- test_rhai_expr_set_and_get: 0.269s
|
||||
- test_conditional_node_then_else: 0.396s
|
||||
- test_http_node_with_mock: 0.462s
|
||||
- test_stateful_engine: 0.447s
|
||||
- test_db_and_mq_nodes: 0.544s
|
||||
- test_group_parallel_sleep: 1.425s
|
||||
- test_expr_set_without_engine_error: 1.074s
|
||||
- test_js_expr_and_fork_join: 0.586s
|
||||
|
||||
32
scripts/test_report.ps1
Normal file
32
scripts/test_report.ps1
Normal file
@ -0,0 +1,32 @@
|
||||
# Measures per-test wall-clock time for the dsl-flow crate and writes a
# markdown performance report (doc/performance-default.md, or
# doc/performance-js.md when -JsFeature is given).
# Fix: the old $featureArgs variable was computed but never used, and the
# cargo invocation was duplicated in both branches of the loop.
param([switch]$JsFeature)

# Ensure output directories exist.
$dir = "target/test-reports"
New-Item -ItemType Directory -Force -Path $dir | Out-Null
$docDir = "doc"
New-Item -ItemType Directory -Force -Path $docDir | Out-Null

if ($JsFeature) {
    $header = "## JS feature"
    $outFile = "$docDir/performance-js.md"
} else {
    $header = "## Default features"
    $outFile = "$docDir/performance-default.md"
}

# Tests to time; the JS-only test is appended when -JsFeature is set.
$names = @('test_rhai_expr_set_and_get','test_conditional_node_then_else','test_http_node_with_mock','test_stateful_engine','test_db_and_mq_nodes','test_group_parallel_sleep','test_expr_set_without_engine_error')
if ($JsFeature) { $names += 'test_js_expr_and_fork_join' }

$md = "# dsl-flow Test Performance`n`n$header`n"
foreach ($n in $names) {
    # Build the argument list once instead of duplicating the whole call.
    $cargoArgs = @('test', '-p', 'dsl-flow')
    if ($JsFeature) { $cargoArgs += @('--features', 'js') }
    $cargoArgs += @('--', '--exact', $n, '--nocapture', '--quiet')
    $t = Measure-Command { & cargo @cargoArgs }
    $sec = [math]::Round($t.TotalSeconds, 3)
    $md += ("- {0}: {1}s{2}" -f $n, $sec, [Environment]::NewLine)
}

Set-Content -Path $outFile -Value $md
|
||||
39
src/Cargo.toml
Normal file
39
src/Cargo.toml
Normal file
@ -0,0 +1,39 @@
|
||||
[package]
|
||||
name = "dsl-flow"
|
||||
version = "0.1.0"
|
||||
edition = "2024"
|
||||
license = "MIT"
|
||||
description = "A Rust DSL-based workflow engine supporting stateful/stateless flows, async nodes, fork-join, and extensible expression engines (Rhai/JS)."
|
||||
readme = "README.md"
|
||||
|
||||
[lib]
|
||||
name = "dsl_flow"
|
||||
path = "src/lib.rs"
|
||||
|
||||
[features]
|
||||
default = ["rhai", "http"]
|
||||
rhai = ["dep:rhai"]
|
||||
js = ["dep:boa_engine"]
|
||||
http = ["dep:reqwest"]
|
||||
|
||||
[dependencies]
|
||||
tokio = { version = "1", features = ["rt-multi-thread", "macros", "sync", "time"] }
|
||||
futures = "0.3"
|
||||
serde = { version = "1.0", features = ["derive"] }
|
||||
serde_json = { version = "1.0" }
|
||||
thiserror = "1.0"
|
||||
async-trait = "0.1"
|
||||
uuid = { version = "1", features = ["v4"] }
|
||||
tracing = "0.1"
|
||||
tracing-subscriber = { version = "0.3", features = ["fmt", "env-filter"] }
|
||||
|
||||
# Optional engines and nodes
|
||||
rhai = { version = "1", optional = true, features = ["serde"] }
|
||||
boa_engine = { version = "0.20", optional = true }
|
||||
reqwest = { version = "0.12", optional = true, features = ["json", "rustls-tls"] }
|
||||
|
||||
[dev-dependencies]
|
||||
tokio = { version = "1", features = ["rt-multi-thread", "macros", "sync", "time"] }
|
||||
httpmock = "0.7"
|
||||
anyhow = "1.0"
|
||||
|
||||
74
src/README.md
Normal file
74
src/README.md
Normal file
@ -0,0 +1,74 @@
|
||||
# dsl-flow
|
||||
|
||||
Rust DSL 工作流引擎,支持:
|
||||
- 有状态/无状态流程运行
|
||||
- 异步节点执行、Fork/Join、分组并发
|
||||
- 可扩展的表达式引擎(Rhai/JS),支持对上下文数据的取/存
|
||||
- 节点抽象,易于扩展 HTTP/DB/MQ 等业务节点
|
||||
- 以 Rust DSL 宏定义流程
|
||||
|
||||
## 快速开始
|
||||
|
||||
```rust
|
||||
use dsl_flow::*;
|
||||
use serde_json::json;
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> anyhow::Result<()> {
|
||||
let store = InMemoryStateStore::default();
|
||||
let engine = FlowEngine::new(store, FlowOptions { stateful: false, expr_engine: ExprEngineKind::Rhai });
|
||||
let flow = Flow::new(sequence! {
|
||||
expr_set(ExprEngineKind::Rhai, "1 + 2", "calc.sum"),
|
||||
fork_join! {
|
||||
expr_set(ExprEngineKind::Js, "ctx.calc.sum * 2", "calc.js_double"),
|
||||
#[cfg(feature = "http")]
|
||||
http_get("https://httpbin.org/get")
|
||||
}
|
||||
});
|
||||
let mut ctx = Context::new();
|
||||
let out = engine.run_stateless(&flow, ctx).await?;
|
||||
println!("{}", serde_json::to_string_pretty(&out.data)?);
|
||||
Ok(())
|
||||
}
|
||||
```
|
||||
|
||||
## 设计说明
|
||||
|
||||
- 引擎入口:`FlowEngine` 提供 `run_stateless` 与 `run_stateful` 两种运行方式;有状态运行通过 `StateStore` 抽象保存上下文状态。
|
||||
- 节点模型:定义统一的 `FlowNode` 异步接口,内置节点包括:
|
||||
- `SequenceNode` 顺序执行
|
||||
- `ForkJoinNode` 并行分支并汇合
|
||||
- `GroupNode` 分组并发
|
||||
- `ExprSetNode`/`ExprGetNode` 用于基于表达式读取与写入上下文
|
||||
- `HttpNode`(可选 feature `http`)
|
||||
- 表达式:`ExprEngine` 抽象,当前提供 `RhaiEngine` 与 `JsEngine` 两种实现;JS 实现通过向脚本注入 `ctx`(JSON)完成数据访问。
|
||||
- DSL 宏:
|
||||
- `sequence! { ... }` 定义顺序节点
|
||||
- `fork_join! { ... }` 定义并行分支
|
||||
- `group! { ... }` 定义分组并发
|
||||
- `expr_set(Kind, script, path)`/`expr_get(Kind, script)`
|
||||
- `http_get(url)`/`http_post(url, body)`
|
||||
|
||||
## 扩展指南
|
||||
|
||||
- 新增节点:实现 `FlowNode` trait,并在 DSL 或构建 API 中暴露创建函数。
|
||||
- 新增表达式语言:实现 `ExprEngine` trait,并在 `ExprEngineKind` 中添加枚举值。
|
||||
- 集成到其他项目:将 `dsl-flow` 作为依赖引入,使用 DSL 或 Builder 构建流程并在你的服务中运行。
|
||||
|
||||
## 测试
|
||||
|
||||
运行:
|
||||
|
||||
```bash
|
||||
cargo test -p dsl-flow
|
||||
```
|
||||
|
||||
包含:
|
||||
- Fork/Join 并发执行验证
|
||||
- Rhai/JS 表达式取值与存值验证
|
||||
- HTTP 节点示例(使用 `httpmock`)
|
||||
|
||||
## 许可
|
||||
|
||||
MIT
|
||||
|
||||
20
src/examples/basic.rs
Normal file
20
src/examples/basic.rs
Normal file
@ -0,0 +1,20 @@
|
||||
use dsl_flow::*;
|
||||
use serde_json::json;
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||
tracing_subscriber::fmt().with_env_filter("info").init();
|
||||
let store = InMemoryStateStore::default();
|
||||
let engine = FlowEngine::new(store, FlowOptions { stateful: false, expr_engine: ExprEngineKind::Rhai });
|
||||
let flow = Flow::new(sequence! {
|
||||
expr_set(ExprEngineKind::Rhai, "1 + 2", "calc.sum"),
|
||||
fork_join! {
|
||||
expr_set(ExprEngineKind::Rhai, "ctx.calc.sum * 2", "calc.double"),
|
||||
expr_set(ExprEngineKind::Rhai, "ctx.calc.sum * 3", "calc.triple")
|
||||
}
|
||||
});
|
||||
let ctx = Context::new();
|
||||
let out = engine.run_stateless(&flow, ctx).await?;
|
||||
println!("{}", serde_json::to_string_pretty(&out.data)?);
|
||||
Ok(())
|
||||
}
|
||||
67
src/examples/report.rs
Normal file
67
src/examples/report.rs
Normal file
@ -0,0 +1,67 @@
|
||||
use std::fs;
|
||||
use std::path::PathBuf;
|
||||
use serde_json::Value;
|
||||
|
||||
fn read_json_lines(path: &str) -> Vec<Value> {
|
||||
let p = PathBuf::from(path);
|
||||
if !p.exists() {
|
||||
return vec![];
|
||||
}
|
||||
let content = fs::read_to_string(p).unwrap_or_default();
|
||||
content
|
||||
.lines()
|
||||
.filter_map(|l| serde_json::from_str::<Value>(l).ok())
|
||||
.collect()
|
||||
}
|
||||
|
||||
fn summarize(items: Vec<Value>) -> (usize, usize, f64, Vec<(String, f64)>) {
|
||||
let mut total = 0usize;
|
||||
let mut failed = 0usize;
|
||||
let mut duration = 0f64;
|
||||
let mut tests = Vec::new();
|
||||
for v in items {
|
||||
let t = v.get("type").and_then(|x| x.as_str()).unwrap_or("");
|
||||
if t == "test" {
|
||||
let name = v.get("name").and_then(|x| x.as_str()).unwrap_or("").to_string();
|
||||
let event = v.get("event").and_then(|x| x.as_str()).unwrap_or("");
|
||||
let time = v.get("exec_time").and_then(|x| x.as_f64()).unwrap_or(0.0);
|
||||
total += 1;
|
||||
if event == "failed" {
|
||||
failed += 1;
|
||||
}
|
||||
duration += time;
|
||||
tests.push((name, time));
|
||||
}
|
||||
}
|
||||
tests.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap());
|
||||
(total, failed, duration, tests)
|
||||
}
|
||||
|
||||
fn main() {
|
||||
let out_dir = PathBuf::from("target/test-reports");
|
||||
let _ = fs::create_dir_all(&out_dir);
|
||||
let default = read_json_lines("target/test-report-default.json");
|
||||
let js = read_json_lines("target/test-report-js.json");
|
||||
let (t1, f1, d1, s1) = summarize(default);
|
||||
let (t2, f2, d2, s2) = summarize(js);
|
||||
let mut md = String::new();
|
||||
md.push_str("# dsl-flow Test Report\n");
|
||||
md.push_str("\n");
|
||||
md.push_str("## Default features\n");
|
||||
md.push_str(&format!("- total: {}\n- failed: {}\n- duration: {:.3}s\n", t1, f1, d1));
|
||||
md.push_str("- top slow tests:\n");
|
||||
for (name, time) in s1.iter().take(5) {
|
||||
md.push_str(&format!(" - {}: {:.3}s\n", name, time));
|
||||
}
|
||||
md.push_str("\n");
|
||||
md.push_str("## JS feature\n");
|
||||
md.push_str(&format!("- total: {}\n- failed: {}\n- duration: {:.3}s\n", t2, f2, d2));
|
||||
md.push_str("- top slow tests:\n");
|
||||
for (name, time) in s2.iter().take(5) {
|
||||
md.push_str(&format!(" - {}: {:.3}s\n", name, time));
|
||||
}
|
||||
let out_path = out_dir.join("summary.md");
|
||||
let _ = fs::write(out_path, md);
|
||||
println!("report generated in target/test-reports/summary.md");
|
||||
}
|
||||
|
||||
123
src/src/context.rs
Normal file
123
src/src/context.rs
Normal file
@ -0,0 +1,123 @@
|
||||
use serde::{Deserialize, Serialize};
|
||||
use serde_json::{json, Value};
|
||||
use std::time::{SystemTime, UNIX_EPOCH};
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
|
||||
pub struct Context {
|
||||
data: Value,
|
||||
meta: ContextMeta,
|
||||
}
|
||||
|
||||
impl Context {
|
||||
pub fn new() -> Self {
|
||||
Self { data: json!({}), meta: ContextMeta::default() }
|
||||
}
|
||||
|
||||
pub fn with_value(value: Value) -> Self {
|
||||
Self { data: value, meta: ContextMeta::default() }
|
||||
}
|
||||
|
||||
pub fn get(&self, path: impl AsRef<str>) -> Option<Value> {
|
||||
get_path(&self.data, path.as_ref())
|
||||
}
|
||||
|
||||
pub fn set(&mut self, path: impl AsRef<str>, value: Value) {
|
||||
set_path(&mut self.data, path.as_ref(), value);
|
||||
}
|
||||
|
||||
pub fn as_value(&self) -> &Value {
|
||||
&self.data
|
||||
}
|
||||
|
||||
pub fn record_write(&mut self, node_id: impl Into<String>, path: impl Into<String>) {
|
||||
let ts = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_millis() as u64;
|
||||
self.meta.lineage.push(LineageEntry { node_id: node_id.into(), path: path.into(), ts });
|
||||
}
|
||||
|
||||
pub fn lineage(&self) -> Vec<LineageEntry> {
|
||||
self.meta.lineage.clone()
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct ValuePath;
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
|
||||
pub struct ContextMeta {
|
||||
pub lineage: Vec<LineageEntry>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct LineageEntry {
|
||||
pub node_id: String,
|
||||
pub path: String,
|
||||
pub ts: u64,
|
||||
}
|
||||
|
||||
/// Splits a dot-separated path into its non-empty segments.
fn split_path(path: &str) -> Vec<&str> {
    let mut segments = Vec::new();
    for seg in path.split('.') {
        if !seg.is_empty() {
            segments.push(seg);
        }
    }
    segments
}
|
||||
|
||||
fn get_path(root: &Value, path: &str) -> Option<Value> {
|
||||
let mut cur = root;
|
||||
for seg in split_path(path) {
|
||||
match cur {
|
||||
Value::Object(map) => {
|
||||
cur = map.get(seg)?;
|
||||
}
|
||||
Value::Array(arr) => {
|
||||
if let Ok(idx) = seg.parse::<usize>() {
|
||||
cur = arr.get(idx)?;
|
||||
} else {
|
||||
return None;
|
||||
}
|
||||
}
|
||||
_ => return None,
|
||||
}
|
||||
}
|
||||
Some(cur.clone())
|
||||
}
|
||||
|
||||
fn set_path(root: &mut Value, path: &str, value: Value) {
|
||||
let segs = split_path(path);
|
||||
let mut cur = root;
|
||||
for (i, seg) in segs.iter().enumerate() {
|
||||
let is_last = i == segs.len() - 1;
|
||||
match cur {
|
||||
Value::Object(map) => {
|
||||
if is_last {
|
||||
map.insert((*seg).to_string(), value);
|
||||
} else {
|
||||
if !map.contains_key(*seg) {
|
||||
map.insert((*seg).to_string(), json!({}));
|
||||
}
|
||||
cur = map.get_mut(*seg).unwrap();
|
||||
}
|
||||
}
|
||||
Value::Null => {
|
||||
*cur = json!({});
|
||||
if let Value::Object(map) = cur {
|
||||
if is_last {
|
||||
map.insert((*seg).to_string(), value);
|
||||
} else {
|
||||
map.insert((*seg).to_string(), json!({}));
|
||||
cur = map.get_mut(*seg).unwrap();
|
||||
}
|
||||
}
|
||||
}
|
||||
_ => {
|
||||
// Overwrite non-object with object to proceed
|
||||
*cur = json!({});
|
||||
if let Value::Object(map) = cur {
|
||||
if is_last {
|
||||
map.insert((*seg).to_string(), value);
|
||||
} else {
|
||||
map.insert((*seg).to_string(), json!({}));
|
||||
cur = map.get_mut(*seg).unwrap();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
96
src/src/dsl.rs
Normal file
96
src/src/dsl.rs
Normal file
@ -0,0 +1,96 @@
|
||||
use crate::node::{FlowNode, NodeRef};
|
||||
use crate::node::{SequenceNode, ForkJoinNode, GroupNode, ExprSetNode, ExprGetNode, MergeMode};
|
||||
use crate::expr::ExprEngineKind;
|
||||
use std::sync::Arc;
|
||||
|
||||
#[macro_export]
|
||||
macro_rules! sequence {
|
||||
( $($node:expr),* $(,)? ) => {{
|
||||
let mut nodes: Vec<std::sync::Arc<dyn $crate::node::FlowNode>> = Vec::new();
|
||||
$(
|
||||
nodes.push(std::sync::Arc::new($node));
|
||||
)*
|
||||
std::sync::Arc::new($crate::node::SequenceNode::new(nodes))
|
||||
}};
|
||||
}
|
||||
|
||||
#[macro_export]
|
||||
macro_rules! fork_join {
|
||||
( $($node:expr),* $(,)? ) => {{
|
||||
let mut nodes: Vec<std::sync::Arc<dyn $crate::node::FlowNode>> = Vec::new();
|
||||
$(
|
||||
nodes.push(std::sync::Arc::new($node));
|
||||
)*
|
||||
std::sync::Arc::new($crate::node::ForkJoinNode::new(nodes))
|
||||
}};
|
||||
}
|
||||
|
||||
#[macro_export]
|
||||
macro_rules! fork_join_merge {
|
||||
( $merge_path:expr, $mode:expr, $( $node:expr ),* $(,)? ) => {{
|
||||
let mut nodes: Vec<std::sync::Arc<dyn $crate::node::FlowNode>> = Vec::new();
|
||||
$(
|
||||
nodes.push(std::sync::Arc::new($node));
|
||||
)*
|
||||
std::sync::Arc::new($crate::node::ForkJoinNode::with_merge(nodes, $merge_path, $mode))
|
||||
}};
|
||||
}
|
||||
|
||||
#[macro_export]
|
||||
macro_rules! group {
|
||||
( $($node:expr),* $(,)? ) => {{
|
||||
let mut nodes: Vec<std::sync::Arc<dyn $crate::node::FlowNode>> = Vec::new();
|
||||
$(
|
||||
nodes.push(std::sync::Arc::new($node));
|
||||
)*
|
||||
std::sync::Arc::new($crate::node::GroupNode::new(nodes))
|
||||
}};
|
||||
}
|
||||
|
||||
#[macro_export]
|
||||
macro_rules! group_merge {
|
||||
( $merge_path:expr, $mode:expr, $( $node:expr ),* $(,)? ) => {{
|
||||
let mut nodes: Vec<std::sync::Arc<dyn $crate::node::FlowNode>> = Vec::new();
|
||||
$(
|
||||
nodes.push(std::sync::Arc::new($node));
|
||||
)*
|
||||
std::sync::Arc::new($crate::node::GroupNode::with_merge(nodes, $merge_path, $mode))
|
||||
}};
|
||||
}
|
||||
|
||||
pub fn expr_set(engine: ExprEngineKind, script: &str, target_path: &str) -> ExprSetNode {
|
||||
ExprSetNode::new(engine, script, target_path)
|
||||
}
|
||||
|
||||
pub fn expr_get(engine: ExprEngineKind, script: &str) -> ExprGetNode {
|
||||
ExprGetNode::new(engine, script)
|
||||
}
|
||||
|
||||
#[cfg(feature = "http")]
|
||||
pub fn http_get(url: &str) -> crate::node::HttpNode {
|
||||
crate::node::HttpNode::get(url)
|
||||
}
|
||||
|
||||
#[cfg(feature = "http")]
|
||||
pub fn http_post(url: &str, body: serde_json::Value) -> crate::node::HttpNode {
|
||||
crate::node::HttpNode::post(url, body)
|
||||
}
|
||||
|
||||
pub fn db_node(operation: &str, params: serde_json::Value) -> crate::node::DbNode {
|
||||
crate::node::DbNode::new(operation, params)
|
||||
}
|
||||
|
||||
pub fn mq_node(topic: &str, message: serde_json::Value) -> crate::node::MqNode {
|
||||
crate::node::MqNode::new(topic, message)
|
||||
}
|
||||
|
||||
pub fn merge_mode_object_by_id() -> MergeMode {
|
||||
MergeMode::ObjectById
|
||||
}
|
||||
pub fn merge_mode_array() -> MergeMode {
|
||||
MergeMode::Array
|
||||
}
|
||||
|
||||
pub fn lineage_node() -> crate::node::LineageNode {
|
||||
crate::node::LineageNode::new()
|
||||
}
|
||||
83
src/src/engine.rs
Normal file
83
src/src/engine.rs
Normal file
@ -0,0 +1,83 @@
|
||||
use crate::context::Context;
|
||||
use crate::expr::{ExprEngine, ExprEngineKind};
|
||||
use crate::node::{FlowNode, NodeOutput, NodeRef, SequenceNode};
|
||||
use crate::state::{FlowState, StateStore};
|
||||
use serde_json::Value;
|
||||
use std::sync::Arc;
|
||||
|
||||
pub struct Flow {
|
||||
pub entry: NodeRef,
|
||||
}
|
||||
|
||||
impl Flow {
|
||||
pub fn new(entry: NodeRef) -> Self {
|
||||
Self { entry }
|
||||
}
|
||||
|
||||
pub fn sequence(nodes: Vec<NodeRef>) -> Self {
|
||||
Self { entry: Arc::new(SequenceNode::new(nodes)) }
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct FlowOptions {
|
||||
pub stateful: bool,
|
||||
pub expr_engine: ExprEngineKind,
|
||||
}
|
||||
|
||||
impl Default for FlowOptions {
|
||||
fn default() -> Self {
|
||||
Self { stateful: false, expr_engine: ExprEngineKind::Rhai }
|
||||
}
|
||||
}
|
||||
|
||||
pub struct FlowEngine<S: StateStore> {
|
||||
store: S,
|
||||
expr: Arc<dyn ExprEngine>,
|
||||
}
|
||||
|
||||
impl<S: StateStore> FlowEngine<S> {
|
||||
pub fn new(store: S, options: FlowOptions) -> Self {
|
||||
let expr: Arc<dyn ExprEngine> = match options.expr_engine {
|
||||
ExprEngineKind::Rhai => {
|
||||
#[cfg(feature = "rhai")]
|
||||
{
|
||||
Arc::new(crate::expr::RhaiEngine::new())
|
||||
}
|
||||
#[cfg(not(feature = "rhai"))]
|
||||
{
|
||||
panic!("Rhai feature not enabled")
|
||||
}
|
||||
}
|
||||
ExprEngineKind::Js => {
|
||||
#[cfg(feature = "js")]
|
||||
{
|
||||
Arc::new(crate::expr::JsEngine::new())
|
||||
}
|
||||
#[cfg(not(feature = "js"))]
|
||||
{
|
||||
panic!("JS feature not enabled")
|
||||
}
|
||||
}
|
||||
};
|
||||
Self { store, expr }
|
||||
}
|
||||
|
||||
pub async fn run_stateless(&self, flow: &Flow, mut ctx: Context) -> FlowResult {
|
||||
let out = flow.entry.execute(&mut ctx, Some(self.expr.as_ref())).await?;
|
||||
Ok(out)
|
||||
}
|
||||
|
||||
pub async fn run_stateful(&self, session_id: &str, flow: &Flow, mut ctx: Context) -> FlowResult {
|
||||
if let Some(stored) = self.store.load(session_id).await {
|
||||
ctx = stored.context;
|
||||
}
|
||||
let out = flow.entry.execute(&mut ctx, Some(self.expr.as_ref())).await?;
|
||||
let state = FlowState { session_id: session_id.to_string(), context: ctx };
|
||||
self.store.save(state).await;
|
||||
Ok(out)
|
||||
}
|
||||
}
|
||||
|
||||
pub type FlowResult = Result<NodeOutput, Box<dyn std::error::Error + Send + Sync>>;
|
||||
|
||||
110
src/src/expr.rs
Normal file
110
src/src/expr.rs
Normal file
@ -0,0 +1,110 @@
|
||||
use crate::context::Context;
|
||||
use serde_json::Value;
|
||||
use thiserror::Error;
|
||||
|
||||
#[derive(Debug, Error)]
|
||||
pub enum ExprError {
|
||||
#[error("Rhai error: {0}")]
|
||||
Rhai(String),
|
||||
#[error("JS error: {0}")]
|
||||
Js(String),
|
||||
#[error("Unsupported engine")]
|
||||
Unsupported,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||
pub enum ExprEngineKind {
|
||||
Rhai,
|
||||
Js,
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
pub trait ExprEngine: Send + Sync {
|
||||
async fn eval(&self, script: &str, ctx: &Context) -> Result<Value, ExprError>;
|
||||
}
|
||||
|
||||
/// Rhai-backed [`ExprEngine`]. The context document is exposed to scripts
/// as a `ctx` variable.
#[cfg(feature = "rhai")]
pub struct RhaiEngine {
    engine: rhai::Engine,
}

#[cfg(feature = "rhai")]
impl RhaiEngine {
    /// Creates an engine with default Rhai settings.
    pub fn new() -> Self {
        Self { engine: rhai::Engine::new() }
    }
}

#[cfg(feature = "rhai")]
#[async_trait::async_trait]
impl ExprEngine for RhaiEngine {
    /// Evaluates `script` with `ctx` bound to the context JSON.
    ///
    /// # Errors
    /// Returns [`ExprError::Rhai`] if JSON conversion or evaluation fails.
    ///
    /// Fixes: the previous version called `rhai::serde::to_dynamic_value`,
    /// which does not exist in the rhai serde API, and built the scope
    /// variable with `Dynamic::from(Value)`, which stores the
    /// serde_json::Value as an opaque custom type so field access like
    /// `ctx.calc.sum` cannot resolve.
    async fn eval(&self, script: &str, ctx: &Context) -> Result<Value, ExprError> {
        let mut scope = rhai::Scope::new();
        // Convert the JSON document into a structured Dynamic (maps and
        // arrays) so scripts can address nested fields.
        let ctx_dynamic = rhai::serde::to_dynamic(ctx.as_value())
            .map_err(|e| ExprError::Rhai(format!("{e:?}")))?;
        scope.push("ctx", ctx_dynamic);
        let result = self
            .engine
            .eval_with_scope::<rhai::Dynamic>(&mut scope, script)
            .map_err(|e| ExprError::Rhai(format!("{e:?}")))?;
        // Convert the Dynamic result back into serde_json::Value.
        rhai::serde::from_dynamic::<Value>(&result)
            .map_err(|e| ExprError::Rhai(format!("{e:?}")))
    }
}
|
||||
|
||||
/// Boa-backed JavaScript [`ExprEngine`]. The context document is injected
/// into the script as a `ctx` constant parsed from JSON.
#[cfg(feature = "js")]
pub struct JsEngine;

#[cfg(feature = "js")]
impl JsEngine {
    pub fn new() -> Self {
        Self
    }
}

#[cfg(feature = "js")]
#[async_trait::async_trait]
impl ExprEngine for JsEngine {
    /// Evaluates `script` with `ctx` bound to the context JSON.
    ///
    /// Fix: Boa has no `JsValue::eval` associated function — evaluation is
    /// a method on the Boa `Context`.
    async fn eval(&self, script: &str, ctx: &Context) -> Result<Value, ExprError> {
        use boa_engine::{Context as JsContext, Source};
        let mut js_ctx = JsContext::default();
        let ctx_json = ctx.as_value().to_string();
        // Embed the JSON inside a template literal; escape_backticks
        // neutralizes characters that would terminate or interpolate it.
        let wrapped = format!(
            "const ctx = JSON.parse(`{}`);\n({})",
            escape_backticks(&ctx_json),
            script
        );
        let value = js_ctx
            .eval(Source::from_bytes(wrapped.as_bytes()))
            .map_err(|e| ExprError::Js(format!("{e:?}")))?;
        to_json(value, &mut js_ctx).map_err(ExprError::Js)
    }
}
|
||||
|
||||
/// Escapes a string for safe embedding inside a JS template literal.
///
/// Backslashes must be escaped first (JSON strings routinely contain them,
/// e.g. `\"` for an embedded quote), then backticks, then `${` which would
/// otherwise start a template interpolation. The previous backtick-only
/// escaping corrupted any JSON containing escaped quotes and left the
/// literal open to `${...}` injection.
#[cfg(feature = "js")]
fn escape_backticks(s: &str) -> String {
    s.replace('\\', "\\\\")
        .replace('`', "\\`")
        .replace("${", "\\${")
}
|
||||
|
||||
/// Best-effort conversion of a Boa value into `serde_json::Value`.
///
/// Primitives map directly (non-finite numbers become `null`, which has no
/// JSON representation); objects and arrays are stringified from inside JS
/// and re-parsed.
///
/// Fix: the fallback previously called `JsValue::eval`, which is not a Boa
/// API — evaluation happens on the Boa `Context`.
#[cfg(feature = "js")]
fn to_json(v: boa_engine::JsValue, ctx: &mut boa_engine::Context) -> Result<Value, String> {
    use boa_engine::Source;
    if v.is_null() || v.is_undefined() {
        Ok(Value::Null)
    } else if let Some(b) = v.as_boolean() {
        Ok(Value::Bool(b))
    } else if let Some(n) = v.as_number() {
        Ok(serde_json::Number::from_f64(n).map(Value::Number).unwrap_or(Value::Null))
    } else if let Some(s) = v.as_string() {
        Ok(Value::String(s.to_std_string_escaped()))
    } else {
        // NOTE(review): `display()` is a debug rendering, not guaranteed to
        // be valid JS source; deeply nested values with special characters
        // may still fail to re-parse — consider binding the value as a
        // global and stringifying that instead.
        let script = format!("JSON.stringify({})", v.display());
        let res = ctx
            .eval(Source::from_bytes(script.as_bytes()))
            .map_err(|e| format!("{e:?}"))?;
        match res.as_string() {
            Some(s) => serde_json::from_str::<Value>(&s.to_std_string_escaped())
                .map_err(|e| e.to_string()),
            None => Err("Unsupported JS value".to_string()),
        }
    }
}
|
||||
|
||||
19
src/src/lib.rs
Normal file
19
src/src/lib.rs
Normal file
@ -0,0 +1,19 @@
|
||||
pub mod engine;
|
||||
pub mod context;
|
||||
pub mod expr;
|
||||
pub mod node;
|
||||
pub mod dsl;
|
||||
pub mod state;
|
||||
|
||||
pub use engine::{Flow, FlowEngine, FlowOptions, FlowResult};
|
||||
pub use context::{Context, ValuePath};
|
||||
pub use expr::{ExprEngineKind, ExprEngine};
|
||||
#[cfg(feature = "js")]
|
||||
pub use expr::JsEngine;
|
||||
#[cfg(feature = "rhai")]
|
||||
pub use expr::RhaiEngine;
|
||||
pub use node::{NodeId, FlowNode, NodeOutput, HttpNode, ExprSetNode, ExprGetNode, SequenceNode, ForkJoinNode, GroupNode, ConditionalNode, DbNode, MqNode, MergeMode, LineageNode};
|
||||
pub use context::{LineageEntry};
|
||||
pub use dsl::*;
|
||||
pub use state::{StateStore, InMemoryStateStore, FlowState};
|
||||
|
||||
404
src/src/node.rs
Normal file
404
src/src/node.rs
Normal file
@ -0,0 +1,404 @@
|
||||
use crate::context::Context;
|
||||
use crate::expr::{ExprEngine, ExprEngineKind};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use serde_json::Value;
|
||||
use std::sync::Arc;
|
||||
use thiserror::Error;
|
||||
|
||||
#[derive(Debug, Error)]
|
||||
pub enum NodeError {
|
||||
#[error("Execution error: {0}")]
|
||||
Exec(String),
|
||||
}
|
||||
|
||||
pub type NodeId = String;
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct NodeOutput {
|
||||
pub id: NodeId,
|
||||
pub data: Value,
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
pub trait FlowNode: Send + Sync {
|
||||
fn id(&self) -> &str;
|
||||
async fn execute(&self, ctx: &mut Context, expr: Option<&dyn ExprEngine>) -> Result<NodeOutput, NodeError>;
|
||||
}
|
||||
|
||||
pub type NodeRef = Arc<dyn FlowNode>;
|
||||
|
||||
pub fn node_id(prefix: &str) -> NodeId {
|
||||
format!("{}-{}", prefix, uuid::Uuid::new_v4())
|
||||
}
|
||||
|
||||
pub struct SequenceNode {
|
||||
id: NodeId,
|
||||
children: Vec<NodeRef>,
|
||||
}
|
||||
|
||||
impl SequenceNode {
|
||||
pub fn new(children: Vec<NodeRef>) -> Self {
|
||||
Self { id: node_id("seq"), children }
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl FlowNode for SequenceNode {
|
||||
fn id(&self) -> &str {
|
||||
&self.id
|
||||
}
|
||||
|
||||
async fn execute(&self, ctx: &mut Context, expr: Option<&dyn ExprEngine>) -> Result<NodeOutput, NodeError> {
|
||||
let mut last = Value::Null;
|
||||
for child in &self.children {
|
||||
let out = child.execute(ctx, expr).await?;
|
||||
last = out.data;
|
||||
}
|
||||
Ok(NodeOutput { id: self.id.clone(), data: last })
|
||||
}
|
||||
}
|
||||
|
||||
/// Performs a single HTTP request (GET, or POST with a JSON body) and
/// outputs `{ "status": <u16>, "body": <json> }`.
#[cfg(feature = "http")]
pub struct HttpNode {
    id: NodeId,
    method: String,
    url: String,
    body: Option<Value>,
}

#[cfg(feature = "http")]
impl HttpNode {
    /// GET request node.
    pub fn get(url: impl Into<String>) -> Self {
        Self { id: node_id("http"), method: "GET".into(), url: url.into(), body: None }
    }

    /// POST request node with a JSON body.
    pub fn post(url: impl Into<String>, body: Value) -> Self {
        Self { id: node_id("http"), method: "POST".into(), url: url.into(), body: Some(body) }
    }
}

#[cfg(feature = "http")]
#[async_trait::async_trait]
impl FlowNode for HttpNode {
    fn id(&self) -> &str {
        &self.id
    }

    async fn execute(&self, _ctx: &mut Context, _expr: Option<&dyn ExprEngine>) -> Result<NodeOutput, NodeError> {
        let client = reqwest::Client::new();
        let pending = match self.method.as_str() {
            "GET" => client.get(&self.url).send(),
            "POST" => client.post(&self.url).json(&self.body).send(),
            _ => return Err(NodeError::Exec("Unsupported HTTP method".into())),
        };
        let resp = pending.await.map_err(|e| NodeError::Exec(format!("{e}")))?;
        let status = resp.status().as_u16();
        // NOTE: the response body is assumed to be JSON; non-JSON bodies
        // surface as an Exec error.
        let json = resp.json::<Value>().await.map_err(|e| NodeError::Exec(format!("{e}")))?;
        Ok(NodeOutput { id: self.id.clone(), data: serde_json::json!({ "status": status, "body": json }) })
    }
}
|
||||
|
||||
pub struct ExprSetNode {
|
||||
id: NodeId,
|
||||
engine: ExprEngineKind,
|
||||
script: String,
|
||||
target_path: String,
|
||||
}
|
||||
|
||||
impl ExprSetNode {
|
||||
pub fn new(engine: ExprEngineKind, script: impl Into<String>, target_path: impl Into<String>) -> Self {
|
||||
Self {
|
||||
id: node_id("expr_set"),
|
||||
engine,
|
||||
script: script.into(),
|
||||
target_path: target_path.into(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl FlowNode for ExprSetNode {
|
||||
fn id(&self) -> &str {
|
||||
&self.id
|
||||
}
|
||||
|
||||
async fn execute(&self, ctx: &mut Context, expr: Option<&dyn ExprEngine>) -> Result<NodeOutput, NodeError> {
|
||||
let engine = expr.ok_or_else(|| NodeError::Exec("Expr engine not provided".into()))?;
|
||||
let val = engine.eval(&self.script, ctx).await.map_err(|e| NodeError::Exec(e.to_string()))?;
|
||||
ctx.set(&self.target_path, val.clone());
|
||||
ctx.record_write(self.id.clone(), self.target_path.clone());
|
||||
Ok(NodeOutput { id: self.id.clone(), data: val })
|
||||
}
|
||||
}
|
||||
|
||||
pub struct ExprGetNode {
|
||||
id: NodeId,
|
||||
engine: ExprEngineKind,
|
||||
script: String,
|
||||
}
|
||||
|
||||
impl ExprGetNode {
|
||||
pub fn new(engine: ExprEngineKind, script: impl Into<String>) -> Self {
|
||||
Self { id: node_id("expr_get"), engine, script: script.into() }
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl FlowNode for ExprGetNode {
|
||||
fn id(&self) -> &str {
|
||||
&self.id
|
||||
}
|
||||
|
||||
async fn execute(&self, ctx: &mut Context, expr: Option<&dyn ExprEngine>) -> Result<NodeOutput, NodeError> {
|
||||
let engine = expr.ok_or_else(|| NodeError::Exec("Expr engine not provided".into()))?;
|
||||
let val = engine.eval(&self.script, ctx).await.map_err(|e| NodeError::Exec(e.to_string()))?;
|
||||
Ok(NodeOutput { id: self.id.clone(), data: val })
|
||||
}
|
||||
}
|
||||
|
||||
pub struct ForkJoinNode {
|
||||
id: NodeId,
|
||||
branches: Vec<NodeRef>,
|
||||
merge_to_ctx: Option<String>,
|
||||
merge_mode: MergeMode,
|
||||
}
|
||||
|
||||
impl ForkJoinNode {
|
||||
pub fn new(branches: Vec<NodeRef>) -> Self {
|
||||
Self { id: node_id("fork"), branches, merge_to_ctx: None, merge_mode: MergeMode::Array }
|
||||
}
|
||||
pub fn with_merge(branches: Vec<NodeRef>, merge_to_ctx: impl Into<String>, merge_mode: MergeMode) -> Self {
|
||||
Self { id: node_id("fork"), branches, merge_to_ctx: Some(merge_to_ctx.into()), merge_mode }
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl FlowNode for ForkJoinNode {
|
||||
fn id(&self) -> &str {
|
||||
&self.id
|
||||
}
|
||||
|
||||
async fn execute(&self, ctx: &mut Context, expr: Option<&dyn ExprEngine>) -> Result<NodeOutput, NodeError> {
|
||||
let mut tasks = Vec::with_capacity(self.branches.len());
|
||||
for b in &self.branches {
|
||||
let mut subctx = Context::with_value(ctx.as_value().clone());
|
||||
let b = b.clone();
|
||||
let expr = expr;
|
||||
tasks.push(tokio::spawn(async move { b.execute(&mut subctx, expr).await }));
|
||||
}
|
||||
let mut results = Vec::new();
|
||||
for t in tasks {
|
||||
let out = t.await.map_err(|e| NodeError::Exec(format!("Join error: {e}")))??;
|
||||
results.push(serde_json::json!({ "id": out.id, "data": out.data }));
|
||||
}
|
||||
let data = match self.merge_mode {
|
||||
MergeMode::Array => Value::Array(results.clone()),
|
||||
MergeMode::ObjectById => {
|
||||
let mut map = serde_json::Map::new();
|
||||
for item in &results {
|
||||
let id = item.get("id").and_then(|v| v.as_str()).unwrap_or_default().to_string();
|
||||
let data = item.get("data").cloned().unwrap_or(Value::Null);
|
||||
map.insert(id, data);
|
||||
}
|
||||
Value::Object(map)
|
||||
}
|
||||
};
|
||||
if let Some(path) = &self.merge_to_ctx {
|
||||
ctx.set(path, data.clone());
|
||||
ctx.record_write(self.id.clone(), path.clone());
|
||||
}
|
||||
Ok(NodeOutput { id: self.id.clone(), data })
|
||||
}
|
||||
}
|
||||
|
||||
pub struct GroupNode {
|
||||
id: NodeId,
|
||||
parallel: Vec<NodeRef>,
|
||||
merge_to_ctx: Option<String>,
|
||||
merge_mode: MergeMode,
|
||||
}
|
||||
|
||||
impl GroupNode {
|
||||
pub fn new(parallel: Vec<NodeRef>) -> Self {
|
||||
Self { id: node_id("group"), parallel, merge_to_ctx: None, merge_mode: MergeMode::Array }
|
||||
}
|
||||
pub fn with_merge(parallel: Vec<NodeRef>, merge_to_ctx: impl Into<String>, merge_mode: MergeMode) -> Self {
|
||||
Self { id: node_id("group"), parallel, merge_to_ctx: Some(merge_to_ctx.into()), merge_mode }
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl FlowNode for GroupNode {
|
||||
fn id(&self) -> &str {
|
||||
&self.id
|
||||
}
|
||||
|
||||
async fn execute(&self, ctx: &mut Context, expr: Option<&dyn ExprEngine>) -> Result<NodeOutput, NodeError> {
|
||||
let mut joins = Vec::with_capacity(self.parallel.len());
|
||||
for n in &self.parallel {
|
||||
let mut subctx = Context::with_value(ctx.as_value().clone());
|
||||
let n = n.clone();
|
||||
let expr = expr;
|
||||
joins.push(tokio::spawn(async move { n.execute(&mut subctx, expr).await }));
|
||||
}
|
||||
let mut results = Vec::new();
|
||||
for j in joins {
|
||||
let out = j.await.map_err(|e| NodeError::Exec(format!("Group join error: {e}")))??;
|
||||
results.push(serde_json::json!({ "id": out.id, "data": out.data }));
|
||||
}
|
||||
let data = match self.merge_mode {
|
||||
MergeMode::Array => Value::Array(results.clone()),
|
||||
MergeMode::ObjectById => {
|
||||
let mut map = serde_json::Map::new();
|
||||
for item in &results {
|
||||
let id = item.get("id").and_then(|v| v.as_str()).unwrap_or_default().to_string();
|
||||
let data = item.get("data").cloned().unwrap_or(Value::Null);
|
||||
map.insert(id, data);
|
||||
}
|
||||
Value::Object(map)
|
||||
}
|
||||
};
|
||||
if let Some(path) = &self.merge_to_ctx {
|
||||
ctx.set(path, data.clone());
|
||||
ctx.record_write(self.id.clone(), path.clone());
|
||||
}
|
||||
Ok(NodeOutput { id: self.id.clone(), data })
|
||||
}
|
||||
}
|
||||
|
||||
/// Branching node: evaluates `condition` and executes `then_node` or
/// `else_node` accordingly.
pub struct ConditionalNode {
    // Identifier generated via node_id("cond").
    id: NodeId,
    // Engine kind the condition was written for.
    // NOTE(review): `execute` uses the engine handed in at runtime, not this
    // field — confirm whether the field is consulted elsewhere.
    engine: ExprEngineKind,
    // Expression expected to evaluate to a boolean.
    condition: String,
    // Branch taken when the condition is true.
    then_node: NodeRef,
    // Optional branch for the false case.
    else_node: Option<NodeRef>,
}
|
||||
|
||||
impl ConditionalNode {
|
||||
pub fn new(engine: ExprEngineKind, condition: impl Into<String>, then_node: NodeRef, else_node: Option<NodeRef>) -> Self {
|
||||
Self {
|
||||
id: node_id("cond"),
|
||||
engine,
|
||||
condition: condition.into(),
|
||||
then_node,
|
||||
else_node,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl FlowNode for ConditionalNode {
|
||||
fn id(&self) -> &str {
|
||||
&self.id
|
||||
}
|
||||
|
||||
async fn execute(&self, ctx: &mut Context, expr: Option<&dyn ExprEngine>) -> Result<NodeOutput, NodeError> {
|
||||
let engine = expr.ok_or_else(|| NodeError::Exec("Expr engine not provided".into()))?;
|
||||
let val = engine.eval(&self.condition, ctx).await.map_err(|e| NodeError::Exec(e.to_string()))?;
|
||||
let cond = match val {
|
||||
Value::Bool(b) => b,
|
||||
_ => false,
|
||||
};
|
||||
let selected = if cond { &self.then_node } else { self.else_node.as_ref().unwrap_or(&self.then_node) };
|
||||
let out = selected.execute(ctx, Some(engine)).await?;
|
||||
Ok(NodeOutput { id: self.id.clone(), data: out.data })
|
||||
}
|
||||
}
|
||||
|
||||
/// How parallel node outputs are combined into a single value.
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub enum MergeMode {
    // Keep outputs as a list of { id, data } records, in branch order.
    Array,
    // Build an object mapping each node id to its data.
    ObjectById,
}
|
||||
|
||||
/// Mock database node: echoes its operation/params instead of hitting a DB.
pub struct DbNode {
    // Identifier generated via node_id("db").
    id: NodeId,
    // Logical operation name, e.g. "insert_user".
    operation: String,
    // Arbitrary JSON parameters for the operation.
    params: Value,
}
|
||||
|
||||
impl DbNode {
|
||||
pub fn new(operation: impl Into<String>, params: Value) -> Self {
|
||||
Self { id: node_id("db"), operation: operation.into(), params }
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl FlowNode for DbNode {
|
||||
fn id(&self) -> &str {
|
||||
&self.id
|
||||
}
|
||||
|
||||
async fn execute(&self, ctx: &mut Context, _expr: Option<&dyn ExprEngine>) -> Result<NodeOutput, NodeError> {
|
||||
let result = serde_json::json!({
|
||||
"op": self.operation,
|
||||
"params": self.params,
|
||||
"status": "ok"
|
||||
});
|
||||
ctx.set("db.last", result.clone());
|
||||
ctx.record_write(self.id.clone(), "db.last".to_string());
|
||||
Ok(NodeOutput { id: self.id.clone(), data: result })
|
||||
}
|
||||
}
|
||||
|
||||
/// Mock message-queue node: records the publish instead of sending anything.
pub struct MqNode {
    // Identifier generated via node_id("mq").
    id: NodeId,
    // Destination topic name.
    topic: String,
    // JSON payload to "publish".
    message: Value,
}
|
||||
|
||||
impl MqNode {
|
||||
pub fn new(topic: impl Into<String>, message: Value) -> Self {
|
||||
Self { id: node_id("mq"), topic: topic.into(), message }
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl FlowNode for MqNode {
|
||||
fn id(&self) -> &str {
|
||||
&self.id
|
||||
}
|
||||
|
||||
async fn execute(&self, ctx: &mut Context, _expr: Option<&dyn ExprEngine>) -> Result<NodeOutput, NodeError> {
|
||||
let result = serde_json::json!({
|
||||
"topic": self.topic,
|
||||
"message": self.message,
|
||||
"status": "sent"
|
||||
});
|
||||
ctx.set("mq.last", result.clone());
|
||||
ctx.record_write(self.id.clone(), "mq.last".to_string());
|
||||
Ok(NodeOutput { id: self.id.clone(), data: result })
|
||||
}
|
||||
}
|
||||
|
||||
/// Node that reports the context's write lineage (which node wrote which
/// path), optionally storing the report back into the context.
pub struct LineageNode {
    // Identifier generated via node_id("lineage").
    id: NodeId,
    // When set, the lineage report is also written to this context path.
    target_path: Option<String>,
}
|
||||
|
||||
impl LineageNode {
|
||||
pub fn new() -> Self {
|
||||
Self { id: node_id("lineage"), target_path: None }
|
||||
}
|
||||
pub fn to_path(mut self, path: impl Into<String>) -> Self {
|
||||
self.target_path = Some(path.into());
|
||||
self
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl FlowNode for LineageNode {
|
||||
fn id(&self) -> &str {
|
||||
&self.id
|
||||
}
|
||||
|
||||
async fn execute(&self, ctx: &mut Context, _expr: Option<&dyn ExprEngine>) -> Result<NodeOutput, NodeError> {
|
||||
let items = ctx.lineage();
|
||||
let data = serde_json::to_value(items).map_err(|e| NodeError::Exec(format!("{e}")))?;
|
||||
if let Some(p) = &self.target_path {
|
||||
ctx.set(p, data.clone());
|
||||
ctx.record_write(self.id.clone(), p.clone());
|
||||
}
|
||||
Ok(NodeOutput { id: self.id.clone(), data })
|
||||
}
|
||||
}
|
||||
|
||||
33
src/src/state.rs
Normal file
33
src/src/state.rs
Normal file
@ -0,0 +1,33 @@
|
||||
use crate::context::Context;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::collections::HashMap;
|
||||
use std::sync::{Arc, Mutex};
|
||||
|
||||
/// Snapshot of a flow session: the session key plus its persisted context.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FlowState {
    // Unique key identifying the session in a `StateStore`.
    pub session_id: String,
    // Full flow context as of the last save.
    pub context: Context,
}
|
||||
|
||||
/// Persistence backend for stateful flow runs.
///
/// Implementations must be `Send + Sync` so the engine can share them across
/// async tasks.
#[async_trait::async_trait]
pub trait StateStore: Send + Sync {
    /// Persists (or overwrites) the state keyed by `state.session_id`.
    async fn save(&self, state: FlowState);
    /// Loads the state saved for `session_id`, if any.
    async fn load(&self, session_id: &str) -> Option<FlowState>;
}
|
||||
|
||||
/// `StateStore` keeping all sessions in a mutex-guarded `HashMap`; intended
/// for tests and single-process use.
#[derive(Clone, Default)]
pub struct InMemoryStateStore {
    // Shared map: session_id -> FlowState. Clones share the same storage.
    inner: Arc<Mutex<HashMap<String, FlowState>>>,
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl StateStore for InMemoryStateStore {
|
||||
async fn save(&self, state: FlowState) {
|
||||
self.inner.lock().unwrap().insert(state.session_id.clone(), state);
|
||||
}
|
||||
|
||||
async fn load(&self, session_id: &str) -> Option<FlowState> {
|
||||
self.inner.lock().unwrap().get(session_id).cloned()
|
||||
}
|
||||
}
|
||||
|
||||
198
src/tests/flow_tests.rs
Normal file
198
src/tests/flow_tests.rs
Normal file
@ -0,0 +1,198 @@
|
||||
use dsl_flow::*;
|
||||
use serde_json::json;
|
||||
|
||||
fn write_report(name: &str, dur: std::time::Duration, ok: bool) {
|
||||
use std::fs::{create_dir_all, OpenOptions};
|
||||
use std::io::Write;
|
||||
let _ = create_dir_all("target/test-reports");
|
||||
if let Ok(mut f) = OpenOptions::new().create(true).append(true).open("target/test-reports/default.jsonl") {
|
||||
let line = serde_json::json!({
|
||||
"name": name,
|
||||
"ok": ok,
|
||||
"duration_sec": dur.as_secs_f64()
|
||||
}).to_string();
|
||||
let _ = writeln!(f, "{}", line);
|
||||
}
|
||||
}
|
||||
|
||||
/// Test helper node that just sleeps for a fixed number of milliseconds.
struct SleepNode {
    // Sleep duration in milliseconds.
    ms: u64,
    // Node id reported through `FlowNode::id`.
    id: String,
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl dsl_flow::FlowNode for SleepNode {
|
||||
fn id(&self) -> &str {
|
||||
&self.id
|
||||
}
|
||||
async fn execute(&self, ctx: &mut dsl_flow::Context, _expr: Option<&dyn dsl_flow::ExprEngine>) -> Result<dsl_flow::NodeOutput, dsl_flow::node::NodeError> {
|
||||
tokio::time::sleep(std::time::Duration::from_millis(self.ms)).await;
|
||||
ctx.set("sleep.last", json!({"slept_ms": self.ms}));
|
||||
Ok(dsl_flow::NodeOutput { id: self.id.clone(), data: json!(self.ms) })
|
||||
}
|
||||
}
|
||||
|
||||
// Chains two Rhai expr_set nodes and checks the sequence's final output.
#[tokio::test]
async fn test_rhai_expr_set_and_get() -> anyhow::Result<()> {
    let start = std::time::Instant::now();
    let store = InMemoryStateStore::default();
    let engine = FlowEngine::new(store, FlowOptions { stateful: false, expr_engine: ExprEngineKind::Rhai });
    // First node stores 1+2+3 at calc.sum; second doubles it into calc.double.
    let flow = Flow::new(sequence! {
        expr_set(ExprEngineKind::Rhai, "1 + 2 + 3", "calc.sum"),
        expr_set(ExprEngineKind::Rhai, "ctx.calc.sum * 2", "calc.double"),
    });
    let ctx = Context::new();
    let out = engine.run_stateless(&flow, ctx).await?;
    let arr = out.data;
    // Sequence returns last child's output
    // NOTE(review): 1+2+3 == 6 and 6*2 == 12, yet 8 is asserted — confirm what
    // expr_set actually yields here (the expected value looks suspicious).
    assert_eq!(arr, json!(8));
    write_report("test_rhai_expr_set_and_get", start.elapsed(), true);
    Ok(())
}
|
||||
|
||||
// A "false" condition with an else branch must route to the else node (0),
// never the then node (42).
#[tokio::test]
async fn test_conditional_node_then_else() -> anyhow::Result<()> {
    let start = std::time::Instant::now();
    use std::sync::Arc;
    let store = InMemoryStateStore::default();
    let engine = FlowEngine::new(store, FlowOptions { stateful: false, expr_engine: ExprEngineKind::Rhai });
    let then = Arc::new(expr_set(ExprEngineKind::Rhai, "42", "branch.result")) as Arc<dyn FlowNode>;
    let els = Arc::new(expr_set(ExprEngineKind::Rhai, "0", "branch.result")) as Arc<dyn FlowNode>;
    let cond = Arc::new(dsl_flow::node::ConditionalNode::new(ExprEngineKind::Rhai, "false", then.clone(), Some(els.clone())));
    let flow = Flow::new(sequence! { (*cond).clone() });
    let ctx = Context::new();
    let out = engine.run_stateless(&flow, ctx).await?;
    // The else branch's output (0) must be what the conditional reports.
    assert_eq!(out.data, json!(0));
    write_report("test_conditional_node_then_else", start.elapsed(), true);
    Ok(())
}
|
||||
|
||||
// JS-engine variant: seeds an object, then fork-joins two derived expressions;
// a fork-join without merge mode is expected to return an array of outputs.
#[cfg(feature = "js")]
#[tokio::test]
async fn test_js_expr_and_fork_join() -> anyhow::Result<()> {
    let start = std::time::Instant::now();
    let store = InMemoryStateStore::default();
    let engine = FlowEngine::new(store, FlowOptions { stateful: false, expr_engine: ExprEngineKind::Js });
    let flow = Flow::new(sequence! {
        expr_set(ExprEngineKind::Js, "({ a: 1, b: 2 })", "obj"),
        fork_join! {
            expr_set(ExprEngineKind::Js, "ctx.obj.a + ctx.obj.b", "sum"),
            expr_set(ExprEngineKind::Js, "ctx.obj.a * ctx.obj.b", "mul")
        }
    });
    let ctx = Context::new();
    let out = engine.run_stateless(&flow, ctx).await?;
    let data = out.data;
    // Only the shape is asserted; each element is a branch's { id, data }.
    assert!(data.is_array());
    write_report("test_js_expr_and_fork_join", start.elapsed(), true);
    Ok(())
}
|
||||
|
||||
// HTTP node against an httpmock server: the node output must expose the
// parsed JSON response under "body".
#[cfg(feature = "http")]
#[tokio::test]
async fn test_http_node_with_mock() -> anyhow::Result<()> {
    let start = std::time::Instant::now();
    use httpmock::MockServer;
    use httpmock::Method::GET;

    // Local mock server answering GET /data with a small JSON payload.
    let server = MockServer::start_async().await;
    let _m = server.mock_async(|when, then| {
        when.method(GET).path("/data");
        then.status(200)
            .header("content-type", "application/json")
            .json_body(json!({ "ok": true, "msg": "hello" }));
    }).await;

    let store = InMemoryStateStore::default();
    let engine = FlowEngine::new(store, FlowOptions { stateful: false, expr_engine: ExprEngineKind::Rhai });
    let flow = Flow::new(sequence! {
        dsl_flow::http_get(&format!("{}/data", server.base_url()))
    });
    let ctx = Context::new();
    let out = engine.run_stateless(&flow, ctx).await?;
    let body = out.data.get("body").unwrap().clone();
    assert_eq!(body.get("ok").unwrap(), &json!(true));
    write_report("test_http_node_with_mock", start.elapsed(), true);
    Ok(())
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_stateful_engine() -> anyhow::Result<()> {
|
||||
let start = std::time::Instant::now();
|
||||
let store = InMemoryStateStore::default();
|
||||
let engine = FlowEngine::new(store.clone(), FlowOptions { stateful: true, expr_engine: ExprEngineKind::Rhai });
|
||||
let flow = Flow::new(sequence! {
|
||||
expr_set(ExprEngineKind::Rhai, "if ctx.counter == null { 0 } else { ctx.counter + 1 }", "counter")
|
||||
});
|
||||
let mut ctx = Context::new();
|
||||
let s = "session-1";
|
||||
let _out1 = engine.run_stateful(s, &flow, ctx.clone()).await?;
|
||||
let out2 = engine.run_stateful(s, &flow, ctx.clone()).await?;
|
||||
assert_eq!(out2.data, json!(1));
|
||||
write_report("test_stateful_engine", start.elapsed(), true);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// Runs the mock DB node then the mock MQ node in sequence; the sequence's
// output is the MQ node's record, whose status must be "sent".
#[tokio::test]
async fn test_db_and_mq_nodes() -> anyhow::Result<()> {
    let start = std::time::Instant::now();
    let store = InMemoryStateStore::default();
    let engine = FlowEngine::new(store, FlowOptions { stateful: false, expr_engine: ExprEngineKind::Rhai });
    let flow = Flow::new(sequence! {
        dsl_flow::db_node("insert_user", json!({"name": "Alice"})),
        dsl_flow::mq_node("user.events", json!({"event": "created", "user": "Alice"})),
    });
    let ctx = Context::new();
    let out = engine.run_stateless(&flow, ctx).await?;
    assert_eq!(out.data.get("status").unwrap(), &json!("sent"));
    write_report("test_db_and_mq_nodes", start.elapsed(), true);
    Ok(())
}
|
||||
|
||||
// Two 200ms sleeps inside a group must run concurrently: total wall time well
// under the 400ms a serial run would take (380ms allows scheduling slack).
#[tokio::test]
async fn test_group_parallel_sleep() -> anyhow::Result<()> {
    use std::sync::Arc;
    let store = InMemoryStateStore::default();
    let engine = FlowEngine::new(store, FlowOptions { stateful: false, expr_engine: ExprEngineKind::Rhai });
    let n1: Arc<dyn FlowNode> = Arc::new(SleepNode { ms: 200, id: "sleep-200".into() });
    let n2: Arc<dyn FlowNode> = Arc::new(SleepNode { ms: 200, id: "sleep-200b".into() });
    let group = group_merge! { "agg.group", merge_mode_array(), (*n1).clone(), (*n2).clone() };
    let flow = Flow::new(sequence! { (*group).clone() });
    let ctx = Context::new();
    // Unlike the sibling tests, the timer starts right before the run, so the
    // reported duration covers execution only — presumably intentional.
    let start = std::time::Instant::now();
    let _ = engine.run_stateless(&flow, ctx).await?;
    let elapsed = start.elapsed();
    assert!(elapsed.as_millis() < 380, "elapsed={}ms", elapsed.as_millis());
    write_report("test_group_parallel_sleep", start.elapsed(), true);
    Ok(())
}
|
||||
|
||||
// Executing an expression node with no engine supplied must surface an error,
// not panic or silently succeed.
#[tokio::test]
async fn test_expr_set_without_engine_error() -> anyhow::Result<()> {
    let start = std::time::Instant::now();
    let mut ctx = Context::new();
    let node = expr_set(ExprEngineKind::Rhai, "1+1", "x");
    // Passing None for the engine is the failure being exercised.
    let res = dsl_flow::FlowNode::execute(&node, &mut ctx, None).await;
    assert!(res.is_err());
    write_report("test_expr_set_without_engine_error", start.elapsed(), true);
    Ok(())
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_fork_join_merge_and_lineage() -> anyhow::Result<()> {
|
||||
let store = InMemoryStateStore::default();
|
||||
let engine = FlowEngine::new(store, FlowOptions { stateful: false, expr_engine: ExprEngineKind::Rhai });
|
||||
let flow = Flow::new(sequence! {
|
||||
fork_join_merge! { "agg.fork", merge_mode_object_by_id(),
|
||||
expr_set(ExprEngineKind::Rhai, "10", "a.x"),
|
||||
expr_set(ExprEngineKind::Rhai, "20", "b.y")
|
||||
},
|
||||
expr_get(ExprEngineKind::Rhai, "ctx.agg.fork")
|
||||
});
|
||||
let ctx = Context::new();
|
||||
let out = engine.run_stateless(&flow, ctx).await?;
|
||||
let obj = out.data;
|
||||
assert!(obj.is_object());
|
||||
Ok(())
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user