feat: add condition nodes and multi-language script support

refactor(flow): rename the Decision node to Condition
feat(flow): add multi-language script executors (Rhai/JS/Python)
feat(flow): implement variable mapping and execution
feat(flow): add condition-node execution logic
feat(frontend): add multi-language descriptions for start/end nodes
test: add YAML condition conversion tests
chore: remove the deprecated storage module
2025-09-19 13:41:52 +08:00
parent 81757eecf5
commit 62789fce42
25 changed files with 1651 additions and 313 deletions

View File

@ -8,7 +8,7 @@ pub enum NodeKind {
Start,
End,
Task,
Decision,
Condition,
}
impl Default for NodeKind {

View File

@ -17,7 +17,7 @@ pub struct FlowDSL {
pub struct NodeDSL {
pub id: String,
#[serde(default)]
pub kind: String, // start / end / task / decision
pub kind: String, // node type: start / end / task / condition
#[serde(default)]
pub name: String,
#[serde(default)]
@ -46,7 +46,7 @@ impl From<FlowDSL> for super::domain::ChainDef {
kind: match n.kind.to_lowercase().as_str() {
"start" => super::domain::NodeKind::Start,
"end" => super::domain::NodeKind::End,
"decision" => super::domain::NodeKind::Decision,
"decision" | "condition" => super::domain::NodeKind::Condition,
_ => super::domain::NodeKind::Task,
},
name: n.name,
@ -66,131 +66,188 @@ impl From<FlowDSL> for super::domain::ChainDef {
}
}
// ===== New: Parse design_json (free layout JSON) to ChainDef =====
// ===== Parse design_json (front-end free-layout JSON) into ChainDef =====
/// Build ChainDef from design_json (front-end flow JSON)
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DesignSyntax {
#[serde(default)]
pub name: String,
#[serde(default)]
pub nodes: Vec<NodeSyntax>,
#[serde(default)]
pub edges: Vec<EdgeSyntax>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NodeSyntax {
pub id: String,
#[serde(rename = "type", default)]
pub kind: String, // one of: start | end | condition | http | db | task
#[serde(default)]
pub data: serde_json::Value,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EdgeSyntax {
#[serde(alias = "sourceNodeID", alias = "source", alias = "from")]
pub from: String,
#[serde(alias = "targetNodeID", alias = "target", alias = "to")]
pub to: String,
#[serde(default)]
pub source_port_id: Option<String>,
}
/// Build a ChainDef from design_json (front-end flow JSON)
fn validate_design(design: &DesignSyntax) -> anyhow::Result<()> {
use std::collections::HashSet;
let mut ids = HashSet::new();
for n in &design.nodes {
// Node IDs must be non-empty and unique within a flow
if n.id.trim().is_empty() { bail!("node id is required"); }
if !ids.insert(n.id.clone()) { bail!("duplicate node id: {}", n.id); }
}
// Require at least one start node and one end node
let mut start = 0usize;
let mut end = 0usize;
for n in &design.nodes {
match n.kind.as_str() {
"start" => start += 1,
"end" => end += 1,
_ => {}
}
}
anyhow::ensure!(start >= 1, "flow must have at least one start node");
anyhow::ensure!(end >= 1, "flow must have at least one end node");
// Validate edge references (from/to must be present and point to known nodes)
for e in &design.edges {
if e.from.is_empty() || e.to.is_empty() { bail!("edge must have both from and to"); }
if !ids.contains(&e.from) { bail!("edge from references unknown node: {}", e.from); }
if !ids.contains(&e.to) { bail!("edge to references unknown node: {}", e.to); }
}
Ok(())
}
fn build_chain_from_design(design: &DesignSyntax) -> anyhow::Result<super::domain::ChainDef> {
use super::domain::{ChainDef, NodeDef, NodeId, NodeKind, LinkDef};
let mut nodes: Vec<NodeDef> = Vec::new();
for n in &design.nodes {
let kind = match n.kind.as_str() {
"start" => NodeKind::Start,
"end" => NodeKind::End,
"condition" => NodeKind::Condition,
_ => NodeKind::Task,
};
// Read the name from the node's data.title; default to an empty string
let name = n.data.get("title").and_then(|v| v.as_str()).unwrap_or("").to_string();
// Map executable node types to task identifiers (used to bind task implementations)
let mut task = match n.kind.as_str() {
"http" => Some("http".to_string()),
"db" => Some("db".to_string()),
"variable" => Some("variable".to_string()),
// Script nodes: split by language
"script" | "expr" | "script_rhai" => Some("script_rhai".to_string()),
"script_js" | "javascript" | "js" => Some("script_js".to_string()),
"script_python" | "python" | "py" => Some("script_python".to_string()),
_ => None,
};
// Compatibility/inference: infer the script type from data.scripts.* or an inline script/expr
if task.is_none() {
if let Some(obj) = n.data.get("scripts").and_then(|v| v.as_object()) {
if obj.get("js").is_some() { task = Some("script_js".to_string()); }
else if obj.get("python").is_some() { task = Some("script_python".to_string()); }
else if obj.get("rhai").is_some() { task = Some("script_rhai".to_string()); }
}
}
if task.is_none() {
if n.data.get("script").is_some() || n.data.get("expr").is_some() {
task = Some("script_rhai".to_string());
}
}
nodes.push(NodeDef { id: NodeId(n.id.clone()), kind, name, task });
}
// Pre-count outgoing edges per source node for the heuristic: a condition node with a single outgoing edge and multiple conditions is assembled into an AND group by default
use std::collections::HashMap;
let mut out_deg: HashMap<&str, usize> = HashMap::new();
for e in &design.edges { *out_deg.entry(e.from.as_str()).or_insert(0) += 1; }
// Backward compatibility with the legacy sourcePortID-based condition encoding:
// if an edge carries a source_port_id, look up the condition value with the same key in the source (condition) node's data.conditions and use it as the edge's condition.
// New: when source_port_id is absent, or equals and/all/group/true, collect the values of all conditions on that node into an array and attach it to the edge with AND semantics.
// Additional heuristic: when the source is a condition node with exactly one outgoing edge and multiple conditions, assemble an AND group even if source_port_id points at a specific key.
let mut links: Vec<LinkDef> = Vec::new();
for e in &design.edges {
let mut cond: Option<String> = None;
if let Some(src) = design.nodes.iter().find(|x| x.id == e.from) {
if src.kind.as_str() == "condition" {
let conds = src.data.get("conditions").and_then(|v| v.as_array());
let conds_len = conds.map(|a| a.len()).unwrap_or(0);
let only_one_out = out_deg.get(src.id.as_str()).copied().unwrap_or(0) == 1;
match &e.source_port_id {
Some(spid) => {
let spid_l = spid.to_lowercase();
let mut want_group = spid_l == "and" || spid_l == "all" || spid_l == "group" || spid_l == "true";
if !want_group && only_one_out && conds_len > 1 {
// Heuristic fallback: single outgoing edge + multiple conditions => assemble an AND group
want_group = true;
}
if want_group {
if let Some(arr) = conds {
let mut values: Vec<Value> = Vec::new();
for item in arr { if let Some(v) = item.get("value").cloned() { values.push(v); } }
if !values.is_empty() { if let Ok(s) = serde_json::to_string(&Value::Array(values)) { cond = Some(s); } }
}
} else {
if let Some(arr) = conds {
if let Some(item) = arr.iter().find(|c| c.get("key").and_then(|v| v.as_str()) == Some(spid.as_str())) {
if let Some(val) = item.get("value") { if let Ok(s) = serde_json::to_string(val) { cond = Some(s); } }
}
}
}
}
None => {
// No specific port given: combine all of the node's conditions into an AND group
if let Some(arr) = conds {
let mut values: Vec<Value> = Vec::new();
for item in arr { if let Some(v) = item.get("value").cloned() { values.push(v); } }
if !values.is_empty() { if let Ok(s) = serde_json::to_string(&Value::Array(values)) { cond = Some(s); } }
}
}
}
}
}
links.push(LinkDef { from: NodeId(e.from.clone()), to: NodeId(e.to.clone()), condition: cond });
}
Ok(ChainDef { name: design.name.clone(), nodes, links })
}
// Rewire external API to typed syntax -> validate -> build
pub fn chain_from_design_json(design: &Value) -> anyhow::Result<super::domain::ChainDef> {
use super::domain::{ChainDef, NodeKind};
// Accept both JSON object and stringified JSON
let parsed: Option<Value> = match design {
Value::String(s) => serde_json::from_str::<Value>(s).ok(),
_ => None,
};
let design = parsed.as_ref().unwrap_or(design);
let mut syntax: DesignSyntax = serde_json::from_value(design.clone())?;
let name = design
.get("name")
.and_then(|v| v.as_str())
.unwrap_or("")
.to_string();
let (nodes, valid_ids, nodes_arr) = parse_nodes(design)?;
// Basic structure validations
let start_cnt = nodes.iter().filter(|n| matches!(n.kind, NodeKind::Start)).count();
anyhow::ensure!(start_cnt >= 1, "flow must have at least one start node");
let end_cnt = nodes.iter().filter(|n| matches!(n.kind, NodeKind::End)).count();
anyhow::ensure!(end_cnt >= 1, "flow must have at least one end node");
let links = parse_edges(design, &nodes_arr, &valid_ids)?;
Ok(ChainDef { name, nodes, links })
}
fn parse_nodes(design: &Value) -> anyhow::Result<(
Vec<super::domain::NodeDef>,
std::collections::HashSet<String>,
Vec<Value>,
)> {
use super::domain::{NodeDef, NodeId, NodeKind};
use anyhow::bail;
let nodes_arr = design.get("nodes").and_then(|v| v.as_array()).cloned().unwrap_or_default();
let mut nodes: Vec<NodeDef> = Vec::new();
let mut id_set: std::collections::HashSet<String> = std::collections::HashSet::new();
for n in &nodes_arr {
let id = n.get("id").and_then(|v| v.as_str()).unwrap_or_default();
if id.is_empty() { bail!("node id is required"); }
if !id_set.insert(id.to_string()) { bail!("duplicate node id: {id}"); }
let t = n.get("type").and_then(|v| v.as_str()).unwrap_or("task");
let name_field = n
.get("data")
.and_then(|d| d.get("title"))
.and_then(|v| v.as_str())
.unwrap_or("")
.to_string();
let kind = match t {
"start" => NodeKind::Start,
"end" => NodeKind::End,
"condition" => NodeKind::Decision,
_ => NodeKind::Task,
};
// Map type to task executor id (only for executable nodes). Others will be None.
let task = match t {
"http" => Some("http".to_string()),
"db" => Some("db".to_string()),
_ => None,
};
nodes.push(NodeDef { id: NodeId(id.to_string()), kind, name: name_field, task });
}
Ok((nodes, id_set, nodes_arr))
}
fn parse_edges(
design: &Value,
nodes_arr: &Vec<Value>,
valid_ids: &std::collections::HashSet<String>,
) -> anyhow::Result<Vec<super::domain::LinkDef>> {
use super::domain::{LinkDef, NodeId};
use anyhow::bail;
let mut links: Vec<LinkDef> = Vec::new();
// fill source_port_id for backward compat if edges carry sourcePortID
if let Some(arr) = design.get("edges").and_then(|v| v.as_array()) {
for e in arr {
let from = e
.get("sourceNodeID")
.or_else(|| e.get("source"))
.or_else(|| e.get("from"))
.and_then(|v| v.as_str())
.unwrap_or_default()
.to_string();
let to = e
.get("targetNodeID")
.or_else(|| e.get("target"))
.or_else(|| e.get("to"))
.and_then(|v| v.as_str())
.unwrap_or_default()
.to_string();
if from.is_empty() || to.is_empty() { bail!("edge must have both from and to"); }
if !valid_ids.contains(&from) { bail!("edge from references unknown node: {from}"); }
if !valid_ids.contains(&to) { bail!("edge to references unknown node: {to}"); }
// Try build structured condition for edges from a condition node via sourcePortID mapping
let mut cond: Option<String> = None;
for (i, e) in arr.iter().enumerate() {
if let Some(spid) = e.get("sourcePortID").and_then(|v| v.as_str()) {
// find source node
if let Some(src_node) = nodes_arr.iter().find(|n| n.get("id").and_then(|v| v.as_str()) == Some(from.as_str())) {
if src_node.get("type").and_then(|v| v.as_str()) == Some("condition") {
if let Some(conds) = src_node.get("data").and_then(|d| d.get("conditions")).and_then(|v| v.as_array()) {
if let Some(item) = conds.iter().find(|c| c.get("key").and_then(|v| v.as_str()) == Some(spid)) {
if let Some(val) = item.get("value") {
// store JSON string for engine to interpret at runtime
if let Ok(s) = serde_json::to_string(val) { cond = Some(s); }
}
}
}
}
if i < syntax.edges.len() {
syntax.edges[i].source_port_id = Some(spid.to_string());
}
}
links.push(LinkDef { from: NodeId(from), to: NodeId(to), condition: cond });
}
}
Ok(links)
validate_design(&syntax)?;
build_chain_from_design(&syntax)
}
#[cfg(test)]
@ -203,26 +260,24 @@ mod tests {
let design = json!({
"name": "demo",
"nodes": [
{"id": "s1", "type": "start", "data": {"title": "start"}},
{"id": "t1", "type": "http", "data": {"title": "call"}},
{"id": "e1", "type": "end", "data": {"title": "end"}}
{"id": "n1", "type": "start", "data": {"title": "Start"}},
{"id": "n2", "type": "http", "data": {"title": "HTTP"}},
{"id": "n3", "type": "end", "data": {"title": "End"}}
],
"edges": [
{"sourceNodeID": "s1", "targetNodeID": "t1"},
{"sourceNodeID": "t1", "targetNodeID": "e1"}
{"from": "n1", "to": "n2"},
{"from": "n2", "to": "n3"}
]
});
let chain = chain_from_design_json(&design).expect("ok");
assert_eq!(chain.name, "demo");
let chain = chain_from_design_json(&design).unwrap();
assert_eq!(chain.nodes.len(), 3);
assert_eq!(chain.links.len(), 2);
assert_eq!(chain.nodes.iter().find(|n| matches!(n.kind, super::super::domain::NodeKind::Start)).is_some(), true);
assert_eq!(chain.nodes.iter().find(|n| matches!(n.kind, super::super::domain::NodeKind::End)).is_some(), true);
}
#[test]
fn duplicate_node_id_should_error() {
let design = json!({
"name": "demo",
"nodes": [
{"id": "n1", "type": "start"},
{"id": "n1", "type": "end"}
@ -230,44 +285,35 @@ mod tests {
"edges": []
});
let err = chain_from_design_json(&design).unwrap_err();
let msg = format!("{err}");
assert!(msg.contains("duplicate node id"));
assert!(err.to_string().contains("duplicate node id"));
}
#[test]
fn missing_start_or_end_should_error() {
let only_start = json!({
let design = json!({
"name": "demo",
"nodes": [
{"id": "s1", "type": "start"},
{"id": "t1", "type": "task"}
{"id": "n1", "type": "start"}
],
"edges": []
});
let err = chain_from_design_json(&only_start).unwrap_err();
assert!(format!("{err}").contains("end"));
let only_end = json!({
"nodes": [
{"id": "e1", "type": "end"}
],
"edges": []
});
let err = chain_from_design_json(&only_end).unwrap_err();
assert!(format!("{err}").contains("start"));
let err = chain_from_design_json(&design).unwrap_err();
assert!(err.to_string().contains("at least one end"));
}
#[test]
fn edge_ref_unknown_node_should_error() {
let design = json!({
"name": "demo",
"nodes": [
{"id": "s1", "type": "start"},
{"id": "e1", "type": "end"}
{"id": "n1", "type": "start"},
{"id": "n2", "type": "end"}
],
"edges": [
{"sourceNodeID": "s1", "targetNodeID": "x"}
{"from": "n1", "to": "n3"}
]
});
let err = chain_from_design_json(&design).unwrap_err();
assert!(format!("{err}").contains("unknown node"));
assert!(err.to_string().contains("edge to references unknown node"));
}
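    // Illustrative sketch (a hypothetical test added for clarity; field names follow the
    // structures above): the AND-group heuristic in build_chain_from_design serializes all
    // condition values onto a condition node's single outgoing edge as a JSON array.
    #[test]
    fn single_out_edge_condition_node_becomes_and_group() {
        let design = json!({
            "name": "and-group",
            "nodes": [
                {"id": "n1", "type": "start", "data": {"title": "Start"}},
                {"id": "c1", "type": "condition", "data": {"title": "Check", "conditions": [
                    {"key": "if_0", "value": {
                        "left": {"type": "constant", "content": 100},
                        "operator": ">",
                        "right": {"type": "constant", "content": 10}
                    }},
                    {"key": "if_1", "value": {
                        "left": {"type": "constant", "content": "ok"},
                        "operator": "eq",
                        "right": {"type": "constant", "content": "ok"}
                    }}
                ]}},
                {"id": "n2", "type": "end", "data": {"title": "End"}}
            ],
            "edges": [
                {"from": "n1", "to": "c1"},
                {"from": "c1", "to": "n2"}
            ]
        });
        let chain = chain_from_design_json(&design).unwrap();
        let link = chain.links.iter().find(|l| l.from.0 == "c1" && l.to.0 == "n2").unwrap();
        let cond = link.condition.as_ref().expect("AND group serialized onto the edge");
        let parsed: serde_json::Value = serde_json::from_str(cond).unwrap();
        assert_eq!(parsed.as_array().map(|a| a.len()), Some(2));
    }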
}

View File

@ -1,9 +1,128 @@
use std::collections::HashMap;
use tokio::sync::{RwLock, Mutex};
use futures::future::join_all;
use rhai::Engine;
use tracing::info;
// === Expression evaluation support: thread-local engine and AST cache, avoiding global Sync/Send constraints ===
use std::cell::RefCell;
use rhai::AST;
use regex::Regex;
// Expose a regex-match helper to expressions
fn regex_match(s: &str, pat: &str) -> bool {
Regex::new(pat).map(|re| re.is_match(s)).unwrap_or(false)
}
// Common string helpers callable directly from expressions (function-call style)
fn contains(s: &str, sub: &str) -> bool { s.contains(sub) }
fn starts_with(s: &str, prefix: &str) -> bool { s.starts_with(prefix) }
fn ends_with(s: &str, suffix: &str) -> bool { s.ends_with(suffix) }
// New: empty / not-empty checks supporting any Dynamic type
fn is_empty(v: rhai::Dynamic) -> bool {
if v.is_unit() { return true; }
if let Some(s) = v.clone().try_cast::<rhai::ImmutableString>() {
return s.is_empty();
}
if let Some(a) = v.clone().try_cast::<rhai::Array>() {
return a.is_empty();
}
if let Some(m) = v.clone().try_cast::<rhai::Map>() {
return m.is_empty();
}
false
}
fn not_empty(v: rhai::Dynamic) -> bool { !is_empty(v) }
thread_local! {
static RHIA_ENGINE: RefCell<Engine> = RefCell::new({
let mut eng = Engine::new();
// Cap the number of operations so complex expressions cannot consume excessive CPU
eng.set_max_operations(100_000);
// Strict variables mode, so typos fail loudly instead of silently evaluating to null
eng.set_strict_variables(true);
// Register common helper functions
eng.register_fn("regex_match", regex_match);
eng.register_fn("contains", contains);
eng.register_fn("starts_with", starts_with);
eng.register_fn("ends_with", ends_with);
// New: register the emptiness helpers (usable in both function and method call style)
eng.register_fn("is_empty", is_empty);
eng.register_fn("not_empty", not_empty);
eng
});
// Simple AST cache: compiled results keyed by expression string (thread-local)
static AST_CACHE: RefCell<HashMap<String, AST>> = RefCell::new(HashMap::new());
}
// Evaluate a Rhai expression to bool, exposing the ctx variable (serde_json::Value)
fn eval_rhai_expr_bool(expr: &str, ctx: &serde_json::Value) -> bool {
// Build the scope and inject ctx
let mut scope = rhai::Scope::new();
let dyn_ctx = match rhai::serde::to_dynamic(ctx.clone()) { Ok(d) => d, Err(_) => rhai::Dynamic::UNIT };
scope.push("ctx", dyn_ctx);
// Look up the AST in the cache first; on a miss, compile, cache, then evaluate
let cached = AST_CACHE.with(|c| c.borrow().get(expr).cloned());
if let Some(ast) = cached {
return RHIA_ENGINE.with(|eng| eng.borrow().eval_ast_with_scope::<bool>(&mut scope, &ast).unwrap_or(false));
}
let compiled = RHIA_ENGINE.with(|eng| eng.borrow().compile(expr));
match compiled {
Ok(ast) => {
// Simple capacity control: clear the cache once it exceeds 1024 entries to avoid unbounded growth
AST_CACHE.with(|c| {
let mut cache = c.borrow_mut();
if cache.len() > 1024 { cache.clear(); }
cache.insert(expr.to_string(), ast.clone());
});
RHIA_ENGINE.with(|eng| eng.borrow().eval_ast_with_scope::<bool>(&mut scope, &ast).unwrap_or(false))
}
Err(_) => false,
}
}
// General form: evaluate a Rhai expression and convert the result to serde_json::Value; return None on failure
pub(crate) fn eval_rhai_expr_json(expr: &str, ctx: &serde_json::Value) -> Option<serde_json::Value> {
// Build the scope and inject ctx
let mut scope = rhai::Scope::new();
let dyn_ctx = match rhai::serde::to_dynamic(ctx.clone()) { Ok(d) => d, Err(_) => rhai::Dynamic::UNIT };
scope.push("ctx", dyn_ctx);
// Look up the AST in the cache first; on a miss, compile, cache, then evaluate
let cached = AST_CACHE.with(|c| c.borrow().get(expr).cloned());
let eval = |ast: &AST, scope: &mut rhai::Scope| -> Option<serde_json::Value> {
RHIA_ENGINE.with(|eng| {
eng.borrow()
.eval_ast_with_scope::<rhai::Dynamic>(scope, ast)
.ok()
.and_then(|d| rhai::serde::from_dynamic(&d).ok())
})
};
if let Some(ast) = cached {
return eval(&ast, &mut scope);
}
let compiled = RHIA_ENGINE.with(|eng| eng.borrow().compile(expr));
match compiled {
Ok(ast) => {
AST_CACHE.with(|c| {
let mut cache = c.borrow_mut();
if cache.len() > 1024 { cache.clear(); }
cache.insert(expr.to_string(), ast.clone());
});
eval(&ast, &mut scope)
}
Err(_) => None,
}
}
use super::{context::{DriveOptions, ExecutionMode}, domain::{ChainDef, NodeKind}, task::TaskRegistry};
use crate::flow::executors::condition::eval_condition_json;
pub struct FlowEngine {
pub tasks: TaskRegistry,
@ -14,9 +133,8 @@ impl FlowEngine {
pub fn builder() -> FlowEngineBuilder { FlowEngineBuilder::default() }
pub async fn drive(&self, chain: &ChainDef, mut ctx: serde_json::Value, opts: DriveOptions) -> anyhow::Result<(serde_json::Value, Vec<String>)> {
let mut logs = Vec::new();
pub async fn drive(&self, chain: &ChainDef, ctx: serde_json::Value, opts: DriveOptions) -> anyhow::Result<(serde_json::Value, Vec<String>)> {
// 1) Pick the start node
// Prefer a Start node; otherwise take the first node with in-degree 0; otherwise fall back to the first node
let start = if let Some(n) = chain
.nodes
@ -42,87 +160,218 @@ impl FlowEngine {
}
};
// Adjacency list (preserving the original order of links)
let mut adj: HashMap<&str, Vec<&super::domain::LinkDef>> = HashMap::new();
for l in &chain.links { adj.entry(&l.from.0).or_default().push(l); }
let node_map: HashMap<&str, &super::domain::NodeDef> = chain.nodes.iter().map(|n| (n.id.0.as_str(), n)).collect();
// 2) Build data structures that can be shared across concurrent branches
// Copy nodes and edges (preserving their original order) into owned HashMaps so concurrent branches can use them safely
let node_map_owned: HashMap<String, super::domain::NodeDef> = chain.nodes.iter().map(|n| (n.id.0.clone(), n.clone())).collect();
let mut adj_owned: HashMap<String, Vec<super::domain::LinkDef>> = HashMap::new();
for l in &chain.links { adj_owned.entry(l.from.0.clone()).or_default().push(l.clone()); }
let node_map = std::sync::Arc::new(node_map_owned);
let adj = std::sync::Arc::new(adj_owned);
let mut current = start;
let mut steps = 0usize;
while steps < opts.max_steps {
steps += 1;
let node = node_map.get(current.as_str()).ok_or_else(|| anyhow::anyhow!("node not found"))?;
logs.push(format!("enter node: {}", node.id.0));
info!(target: "udmin.flow", "enter node: {}", node.id.0);
// Shared context (concurrent modification allowed; the engine does no conflict checking)
let shared_ctx = std::sync::Arc::new(RwLock::new(ctx));
// Shared log aggregation
let logs_shared = std::sync::Arc::new(Mutex::new(Vec::<String>::new()));
// Task execution
if let Some(task_name) = &node.task {
if let Some(task) = self.tasks.get(task_name) {
match opts.execution_mode {
ExecutionMode::Sync => {
// Pass node_id and node directly to avoid relying on magic fields in ctx
task.execute(&node.id, node, &mut ctx).await?;
logs.push(format!("exec task: {} (sync)", task_name));
info!(target: "udmin.flow", "exec task: {} (sync)", task_name);
// 3) Drive concurrently, starting from the start node
let tasks = self.tasks.clone();
drive_from(tasks, node_map.clone(), adj.clone(), start, shared_ctx.clone(), opts.clone(), logs_shared.clone()).await?;
// 4) Aggregate and return
let final_ctx = { shared_ctx.read().await.clone() };
let logs = { logs_shared.lock().await.clone() };
Ok((final_ctx, logs))
}
}
// Drive the flow starting from the given node. When several outgoing edges match:
// - the first one continues within the current task
// - the remaining branches are spawned in parallel, and we wait for all of them to finish before returning
async fn drive_from(
tasks: TaskRegistry,
node_map: std::sync::Arc<HashMap<String, super::domain::NodeDef>>,
adj: std::sync::Arc<HashMap<String, Vec<super::domain::LinkDef>>>,
start: String,
ctx: std::sync::Arc<RwLock<serde_json::Value>>, // shared context (concurrent writes serialized through the write lock; no conflict checking)
opts: DriveOptions,
logs: std::sync::Arc<Mutex<Vec<String>>>,
) -> anyhow::Result<()> {
let mut current = start;
let mut steps = 0usize;
loop {
if steps >= opts.max_steps { break; }
steps += 1;
// Look up the node
let node = match node_map.get(&current) { Some(n) => n, None => break };
{
let mut lg = logs.lock().await;
lg.push(format!("enter node: {}", node.id.0));
}
info!(target: "udmin.flow", "enter node: {}", node.id.0);
// Execute the task
if let Some(task_name) = &node.task {
if let Some(task) = tasks.get(task_name) {
match opts.execution_mode {
ExecutionMode::Sync => {
// Execute against a snapshot, then write the whole context back (last write wins over concurrent modifications; no conflict checking)
let mut local_ctx = { ctx.read().await.clone() };
task.execute(&node.id, node, &mut local_ctx).await?;
{ let mut w = ctx.write().await; *w = local_ctx; }
{
let mut lg = logs.lock().await;
lg.push(format!("exec task: {} (sync)", task_name));
}
ExecutionMode::AsyncFireAndForget => {
// fire-and-forget: copy the context for this task; the main flow does not wait
let mut task_ctx = ctx.clone();
let task_arc = task.clone();
let name_for_log = task_name.clone();
let node_id = node.id.clone();
let node_def = (*node).clone();
tokio::spawn(async move {
let _ = task_arc.execute(&node_id, &node_def, &mut task_ctx).await;
info!(target: "udmin.flow", "exec task: {} (sync)", task_name);
}
ExecutionMode::AsyncFireAndForget => {
// fire-and-forget基于快照执行不写回共享 ctx变量任务除外做有界差异写回
let task_ctx = { ctx.read().await.clone() };
let task_arc = task.clone();
let name_for_log = task_name.clone();
let node_id = node.id.clone();
let node_def = node.clone();
let logs_clone = logs.clone();
let ctx_clone = ctx.clone();
tokio::spawn(async move {
let mut c = task_ctx.clone();
let _ = task_arc.execute(&node_id, &node_def, &mut c).await;
// Write-back for variable tasks: merge top-level added/changed keys into the shared ctx and remove the corresponding variable node entry
if node_def.task.as_deref() == Some("variable") {
// Compute the top-level diff (excluding nodes); only write back keys that were added or changed
let mut changed: Vec<(String, serde_json::Value)> = Vec::new();
if let (serde_json::Value::Object(before_map), serde_json::Value::Object(after_map)) = (&task_ctx, &c) {
for (k, v_after) in after_map.iter() {
if k == "nodes" { continue; }
match before_map.get(k) {
Some(v_before) if v_before == v_after => {}
_ => changed.push((k.clone(), v_after.clone())),
}
}
}
{
let mut w = ctx_clone.write().await;
if let serde_json::Value::Object(map) = &mut *w {
for (k, v) in changed.into_iter() { map.insert(k, v); }
if let Some(serde_json::Value::Object(nodes)) = map.get_mut("nodes") {
nodes.remove(node_id.0.as_str());
}
}
}
{
let mut lg = logs_clone.lock().await;
lg.push(format!("exec task done (async): {} (writeback variable)", name_for_log));
}
info!(target: "udmin.flow", "exec task done (async): {} (writeback variable)", name_for_log);
} else {
{
let mut lg = logs_clone.lock().await;
lg.push(format!("exec task done (async): {}", name_for_log));
}
info!(target: "udmin.flow", "exec task done (async): {}", name_for_log);
});
logs.push(format!("spawn task: {} (async)", task_name));
info!(target: "udmin.flow", "spawn task: {} (async)", task_name);
}
}
} else {
logs.push(format!("task not found: {} (skip)", task_name));
info!(target: "udmin.flow", "task not found: {} (skip)", task_name);
}
}
if matches!(node.kind, NodeKind::End) { break; }
// 选择下一条 link优先有条件的且为真否则保序选择第一条无条件边
let mut next: Option<String> = None;
if let Some(links) = adj.get(node.id.0.as_str()) {
// Check conditional edges first
for link in links.iter() {
if let Some(cond_str) = &link.condition {
// Two cases:
// 1) JSON serialized by the front end, shaped like { left: {type, content}, operator, right? }
// 2) a raw Rhai expression string
let ok = if cond_str.trim_start().starts_with('{') {
match serde_json::from_str::<serde_json::Value>(cond_str) {
Ok(v) => eval_condition_json(&ctx, &v).unwrap_or(false),
Err(_) => false,
}
} else {
let mut scope = rhai::Scope::new();
scope.push("ctx", rhai::serde::to_dynamic(ctx.clone()).map_err(|e| anyhow::anyhow!(e.to_string()))?);
let engine = Engine::new();
engine.eval_with_scope::<bool>(&mut scope, cond_str).unwrap_or(false)
};
if ok { next = Some(link.to.0.clone()); break; }
}
}
// If no conditional edge matched, take the first unconditional edge
if next.is_none() {
for link in links.iter() {
if link.condition.is_none() { next = Some(link.to.0.clone()); break; }
});
{
let mut lg = logs.lock().await;
lg.push(format!("spawn task: {} (async)", task_name));
}
info!(target: "udmin.flow", "spawn task: {} (async)", task_name);
}
}
} else {
let mut lg = logs.lock().await;
lg.push(format!("task not found: {} (skip)", task_name));
info!(target: "udmin.flow", "task not found: {} (skip)", task_name);
}
match next { Some(n) => current = n, None => break }
}
Ok((ctx, logs))
if matches!(node.kind, NodeKind::End) { break; }
// Pick the next batch of links: conditions are evaluated only on Condition nodes; other node kinds ignore conditions and follow their outgoing edges directly
let mut nexts: Vec<String> = Vec::new();
if let Some(links) = adj.get(node.id.0.as_str()) {
if matches!(node.kind, NodeKind::Condition) {
// Conditional edges: every edge whose condition evaluates to true is added to nexts; an empty-string condition counts as unconditional (not evaluated here)
for link in links.iter() {
if let Some(cond_str) = &link.condition {
if cond_str.trim().is_empty() {
// Empty condition: treat as an unconditional edge and leave it to the fallback logic below
info!(target: "udmin.flow", from=%node.id.0, to=%link.to.0, "condition link: empty (unconditional candidate)");
continue;
}
let trimmed = cond_str.trim_start();
let (kind, ok) = if trimmed.starts_with('{') || trimmed.starts_with('[') {
match serde_json::from_str::<serde_json::Value>(cond_str) {
Ok(v) => {
let snapshot = { ctx.read().await.clone() };
("json", eval_condition_json(&snapshot, &v).unwrap_or(false))
}
Err(_) => ("json_parse_error", false),
}
} else {
let snapshot = { ctx.read().await.clone() };
("rhai", eval_rhai_expr_bool(cond_str, &snapshot))
};
info!(target: "udmin.flow", from=%node.id.0, to=%link.to.0, cond_kind=%kind, cond_len=%cond_str.len(), result=%ok, "condition link evaluated");
if ok { nexts.push(link.to.0.clone()); }
} else {
// No condition field: treat as an unconditional edge
info!(target: "udmin.flow", from=%node.id.0, to=%link.to.0, "condition link: none (unconditional candidate)");
}
}
// If no conditional edge matched, take the first unconditional edge (unconditional = no condition field or an empty string)
if nexts.is_empty() {
let mut picked = None;
for link in links.iter() {
match &link.condition {
None => { picked = Some(link.to.0.clone()); break; }
Some(s) if s.trim().is_empty() => { picked = Some(link.to.0.clone()); break; }
_ => {}
}
}
if let Some(to) = picked {
info!(target: "udmin.flow", from=%node.id.0, to=%to, "condition fallback: pick unconditional");
nexts.push(to);
} else {
info!(target: "udmin.flow", node=%node.id.0, "condition: no matched and no unconditional, stop");
}
}
} else {
// Non-condition nodes ignore conditions and fan out along all outgoing edges (all executed in parallel)
for link in links.iter() {
nexts.push(link.to.0.clone());
info!(target: "udmin.flow", from=%node.id.0, to=%link.to.0, "fan-out from non-condition node");
}
}
}
if nexts.is_empty() { break; }
if nexts.len() == 1 {
current = nexts.remove(0);
continue;
}
// Multiple branches: the main branch continues along the first edge; the others run in parallel and we wait for them to finish
let mut futs = Vec::new();
for to_id in nexts.iter().skip(1).cloned() {
let tasks_c = tasks.clone();
let node_map_c = node_map.clone();
let adj_c = adj.clone();
let ctx_c = ctx.clone();
let opts_c = opts.clone();
let logs_c = logs.clone();
futs.push(drive_from(tasks_c, node_map_c, adj_c, to_id, ctx_c, opts_c, logs_c));
}
// The current branch continues along the first edge
current = nexts.into_iter().next().unwrap();
// Wait for the spawned branches at a safe point (here: before entering the next iteration)
let _ = join_all(futs).await;
}
Ok(())
}
#[derive(Default)]
@ -142,52 +391,138 @@ impl Default for FlowEngine {
fn default() -> Self { Self { tasks: crate::flow::task::default_registry() } }
}
/* moved to executors::condition
fn eval_condition_json(ctx: &serde_json::Value, cond: &serde_json::Value) -> anyhow::Result<bool> {
// Currently supports the shape exported by the front-end Condition component: { left:{type, content}, operator, right? }
let left = cond.get("left").ok_or_else(|| anyhow::anyhow!("missing left"))?;
let op = cond.get("operator").and_then(|v| v.as_str()).unwrap_or("");
let right = cond.get("right");
let lval = resolve_value(ctx, left)?;
let rval = match right { Some(v) => Some(resolve_value(ctx, v)?), None => None };
use serde_json::Value as V;
let res = match (op, &lval, &rval) {
("contains", V::String(s), Some(V::String(t))) => s.contains(t),
("equals", V::String(s), Some(V::String(t))) => s == t,
("equals", V::Number(a), Some(V::Number(b))) => a == b,
("is_true", V::Bool(b), _) => *b,
("is_false", V::Bool(b), _) => !*b,
("gt", V::Number(a), Some(V::Number(b))) => a.as_f64().unwrap_or(0.0) > b.as_f64().unwrap_or(0.0),
("lt", V::Number(a), Some(V::Number(b))) => a.as_f64().unwrap_or(0.0) < b.as_f64().unwrap_or(0.0),
_ => false,
};
Ok(res)
}
fn resolve_value(ctx: &serde_json::Value, v: &serde_json::Value) -> anyhow::Result<serde_json::Value> {
use serde_json::Value as V;
let t = v.get("type").and_then(|v| v.as_str()).unwrap_or("");
match t {
"constant" => Ok(v.get("content").cloned().unwrap_or(V::Null)),
"ref" => {
// content: [nodeId, field]
if let Some(arr) = v.get("content").and_then(|v| v.as_array()) {
if arr.len() >= 2 {
if let (Some(node), Some(field)) = (arr[0].as_str(), arr[1].as_str()) {
let val = ctx
.get("nodes")
.and_then(|n| n.get(node))
.and_then(|m| m.get(field))
.cloned()
.or_else(|| ctx.get(field).cloned())
.unwrap_or(V::Null);
return Ok(val);
}
}
}
Ok(V::Null)
}
_ => Ok(V::Null),
-fn eval_condition_json(ctx: &serde_json::Value, cond: &serde_json::Value) -> anyhow::Result<bool> {
- // Supports the shape exported by the front-end Condition component: { left:{type, content}, operator, right? }
- use serde_json::Value as V;
-
- let left = cond.get("left").ok_or_else(|| anyhow::anyhow!("missing left"))?;
- let op_raw = cond.get("operator").and_then(|v| v.as_str()).unwrap_or("");
- let right = cond.get("right");
-
- let lval = resolve_value(ctx, left)?;
- let rval = match right { Some(v) => Some(resolve_value(ctx, v)?), None => None };
-
- // Normalize the operator: lowercase and replace underscores with spaces
- let op = op_raw.trim().to_lowercase().replace('_', " ");
-
- // Helper functions
- fn to_f64(v: &V) -> Option<f64> {
- match v {
- V::Number(n) => n.as_f64(),
- V::String(s) => s.parse::<f64>().ok(),
- _ => None,
- }
- }
- fn is_empty_val(v: &V) -> bool {
- match v {
- V::Null => true,
- V::String(s) => s.trim().is_empty(),
- V::Array(a) => a.is_empty(),
- V::Object(m) => m.is_empty(),
- _ => false,
- }
- }
- fn json_equal(a: &V, b: &V) -> bool {
- match (a, b) {
- (V::Number(_), V::Number(_)) | (V::Number(_), V::String(_)) | (V::String(_), V::Number(_)) => {
- match (to_f64(a), to_f64(b)) { (Some(x), Some(y)) => x == y, _ => a == b }
- }
- _ => a == b,
- }
- }
- fn contains(left: &V, right: &V) -> bool {
- match (left, right) {
- (V::String(s), V::String(t)) => s.contains(t),
- (V::Array(arr), r) => arr.iter().any(|x| json_equal(x, r)),
- (V::Object(map), V::String(key)) => map.contains_key(key),
- _ => false,
- }
- }
- fn in_op(left: &V, right: &V) -> bool {
- match right {
- V::Array(arr) => arr.iter().any(|x| json_equal(left, x)),
- V::Object(map) => match left { V::String(k) => map.contains_key(k), _ => false },
- V::String(hay) => match left { V::String(needle) => hay.contains(needle), _ => false },
- _ => false,
- }
- }
- fn bool_like(v: &V) -> bool {
- match v {
- V::Bool(b) => *b,
- V::Null => false,
- V::Number(n) => n.as_f64().map(|x| x != 0.0).unwrap_or(false),
- V::String(s) => {
- let s_l = s.trim().to_lowercase();
- if s_l == "true" { true } else if s_l == "false" { false } else { !s_l.is_empty() }
- }
- V::Array(a) => !a.is_empty(),
- V::Object(m) => !m.is_empty(),
- }
- }
-
- let res = match (op.as_str(), &lval, &rval) {
- // Equal / not equal (all JSON types; numbers compared as f64, everything else by deep equality)
- ("equal" | "equals" | "==" | "eq", l, Some(r)) => json_equal(l, r),
- ("not equal" | "!=" | "not equals" | "neq", l, Some(r)) => !json_equal(l, r),
-
- // Numeric comparisons
- ("greater than" | ">" | "gt", l, Some(r)) => match (to_f64(l), to_f64(r)) { (Some(a), Some(b)) => a > b, _ => false },
- ("greater than or equal" | ">=" | "gte" | "ge", l, Some(r)) => match (to_f64(l), to_f64(r)) { (Some(a), Some(b)) => a >= b, _ => false },
- ("less than" | "<" | "lt", l, Some(r)) => match (to_f64(l), to_f64(r)) { (Some(a), Some(b)) => a < b, _ => false },
- ("less than or equal" | "<=" | "lte" | "le", l, Some(r)) => match (to_f64(l), to_f64(r)) { (Some(a), Some(b)) => a <= b, _ => false },
-
- // Contains / not contains (strings, arrays, object keys)
- ("contains", l, Some(r)) => contains(l, r),
- ("not contains", l, Some(r)) => !contains(l, r),
-
- // Membership: left in right / not in
- ("in", l, Some(r)) => in_op(l, r),
- ("not in" | "nin", l, Some(r)) => !in_op(l, r),
-
- // Empty / not empty (strings, arrays, objects, null)
- ("is empty" | "empty" | "isempty", l, _) => is_empty_val(l),
- ("is not empty" | "not empty" | "notempty", l, _) => !is_empty_val(l),
-
- // Boolean checks (coerce each type to a boolean)
- ("is true" | "is true?" | "istrue", l, _) => bool_like(l),
- ("is false" | "isfalse", l, _) => !bool_like(l),
-
- _ => false,
- };
- Ok(res)
-}
-
-fn resolve_value(ctx: &serde_json::Value, v: &serde_json::Value) -> anyhow::Result<serde_json::Value> {
- use serde_json::Value as V;
- let t = v.get("type").and_then(|v| v.as_str()).unwrap_or("");
- match t {
- "constant" => Ok(v.get("content").cloned().unwrap_or(V::Null)),
- "ref" => {
- // content: [nodeId, field]
- if let Some(arr) = v.get("content").and_then(|v| v.as_array()) {
- if arr.len() >= 2 {
- if let (Some(node), Some(field)) = (arr[0].as_str(), arr[1].as_str()) {
- let val = ctx
- .get("nodes")
- .and_then(|n| n.get(node))
- .and_then(|m| m.get(field))
- .cloned()
- .or_else(|| ctx.get(field).cloned())
- .unwrap_or(V::Null);
- return Ok(val);
- }
- }
- }
- Ok(V::Null)
- }
- "expression" => {
- let expr = v.get("content").and_then(|x| x.as_str()).unwrap_or("");
- if expr.trim().is_empty() { return Ok(V::Null); }
- Ok(super::engine::eval_rhai_expr_json(expr, ctx).unwrap_or(V::Null))
- }
- _ => Ok(V::Null),
}
}
}
*/
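// Illustrative sketch (a hypothetical test added for clarity): eval_rhai_expr_json goes
// through the thread-local engine and AST cache above; a malformed expression yields
// None rather than an error.
#[cfg(test)]
mod rhai_expr_eval_tests {
    use serde_json::json;

    #[test]
    fn literal_expression_round_trips_to_json() {
        let ctx = json!({});
        assert_eq!(super::eval_rhai_expr_json("1 + 2", &ctx), Some(json!(3)));
        assert_eq!(super::eval_rhai_expr_json("not valid rhai )", &ctx), None);
    }
}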

View File

@ -0,0 +1,215 @@
use anyhow::Result;
use serde_json::Value as V;
use tracing::info;
pub(crate) fn eval_condition_json(ctx: &serde_json::Value, cond: &serde_json::Value) -> Result<bool> {
// New: if cond is an array, evaluate it with AND semantics (true only if every item is true)
if let Some(arr) = cond.as_array() {
let mut all_true = true;
for (idx, item) in arr.iter().enumerate() {
let ok = eval_condition_json(ctx, item)?;
info!(target = "udmin.flow", index = idx, result = %ok, "condition group item (AND)");
if !ok { all_true = false; }
}
info!(target = "udmin.flow", count = arr.len(), result = %all_true, "condition group evaluated (AND)");
return Ok(all_true);
}
// Supports the shape exported by the front-end Condition component: { left:{type, content}, operator, right? }
let left = cond.get("left").ok_or_else(|| anyhow::anyhow!("missing left"))?;
let op_raw = cond.get("operator").and_then(|v| v.as_str()).unwrap_or("");
let right_raw = cond.get("right");
// Parse the weak-equality flag: when the right value's schema.extra.weak is true, string comparisons ignore case and surrounding whitespace
let weak_eq = right_raw
.and_then(|r| r.get("schema"))
.and_then(|s| s.get("extra"))
.and_then(|e| e.get("weak"))
.and_then(|b| b.as_bool())
.unwrap_or(false);
let lval = resolve_value(ctx, left)?;
let rval = match right_raw { Some(v) => Some(resolve_value(ctx, v)?), None => None };
// Normalize the operator: lowercase and replace underscores with spaces
let op = op_raw.trim().to_lowercase().replace('_', " ");
// Helper functions
fn to_f64(v: &V) -> Option<f64> {
match v {
V::Number(n) => n.as_f64(),
V::String(s) => s.parse::<f64>().ok(),
_ => None,
}
}
fn is_empty_val(v: &V) -> bool {
match v {
V::Null => true,
V::String(s) => s.trim().is_empty(),
V::Array(a) => a.is_empty(),
V::Object(m) => m.is_empty(),
_ => false,
}
}
fn norm_str(s: &str) -> String { s.trim().to_lowercase() }
fn json_equal(a: &V, b: &V, weak: bool) -> bool {
match (a, b) {
// Numbers: lenient comparison (strings coerced to numbers)
(V::Number(_), V::Number(_)) | (V::Number(_), V::String(_)) | (V::String(_), V::Number(_)) => {
match (to_f64(a), to_f64(b)) { (Some(x), Some(y)) => x == y, _ => a == b }
}
// Strings: with weak equality, ignore case and surrounding whitespace
(V::String(sa), V::String(sb)) if weak => norm_str(sa) == norm_str(sb),
_ => a == b,
}
}
fn contains(left: &V, right: &V, weak: bool) -> bool {
match (left, right) {
(V::String(s), V::String(t)) => {
if weak { norm_str(s).contains(&norm_str(t)) } else { s.contains(t) }
}
(V::Array(arr), r) => arr.iter().any(|x| json_equal(x, r, weak)),
(V::Object(map), V::String(key)) => {
if weak { map.keys().any(|k| norm_str(k) == norm_str(key)) } else { map.contains_key(key) }
}
_ => false,
}
}
fn in_op(left: &V, right: &V, weak: bool) -> bool {
match right {
V::Array(arr) => arr.iter().any(|x| json_equal(left, x, weak)),
V::Object(map) => match left { V::String(k) => {
if weak { map.keys().any(|kk| norm_str(kk) == norm_str(k)) } else { map.contains_key(k) }
}, _ => false },
V::String(hay) => match left { V::String(needle) => {
if weak { norm_str(hay).contains(&norm_str(needle)) } else { hay.contains(needle) }
}, _ => false },
_ => false,
}
}
fn bool_like(v: &V) -> bool {
match v {
V::Bool(b) => *b,
V::Null => false,
V::Number(n) => n.as_f64().map(|x| x != 0.0).unwrap_or(false),
V::String(s) => {
let s_l = s.trim().to_lowercase();
if s_l == "true" { true } else if s_l == "false" { false } else { !s_l.is_empty() }
}
V::Array(a) => !a.is_empty(),
V::Object(m) => !m.is_empty(),
}
}
let res = match (op.as_str(), &lval, &rval) {
// Equal / not equal (all JSON types; numbers compared as f64, everything else by deep equality)
("equal" | "equals" | "==" | "eq", l, Some(r)) => json_equal(l, r, weak_eq),
("not equal" | "!=" | "not equals" | "neq", l, Some(r)) => !json_equal(l, r, weak_eq),
// Numeric comparisons
("greater than" | ">" | "gt", l, Some(r)) => match (to_f64(l), to_f64(r)) { (Some(a), Some(b)) => a > b, _ => false },
("greater than or equal" | ">=" | "gte" | "ge", l, Some(r)) => match (to_f64(l), to_f64(r)) { (Some(a), Some(b)) => a >= b, _ => false },
("less than" | "<" | "lt", l, Some(r)) => match (to_f64(l), to_f64(r)) { (Some(a), Some(b)) => a < b, _ => false },
("less than or equal" | "<=" | "lte" | "le", l, Some(r)) => match (to_f64(l), to_f64(r)) { (Some(a), Some(b)) => a <= b, _ => false },
// Contains / not contains (strings, arrays, object keys)
("contains", l, Some(r)) => contains(l, r, weak_eq),
("not contains", l, Some(r)) => !contains(l, r, weak_eq),
// Membership: left in right / not in
("in", l, Some(r)) => in_op(l, r, weak_eq),
("not in" | "nin", l, Some(r)) => !in_op(l, r, weak_eq),
// Empty / not empty (strings, arrays, objects, null)
("is empty" | "empty" | "isempty", l, _) => is_empty_val(l),
("is not empty" | "not empty" | "notempty", l, _) => !is_empty_val(l),
// Boolean checks (coerce each type to a boolean)
("is true" | "is true?" | "istrue", l, _) => bool_like(l),
("is false" | "isfalse", l, _) => !bool_like(l),
_ => false,
};
// Emit a debug log to help diagnose why a condition did not match
let l_dbg = match &lval { V::String(s) => format!("\"{}\"", s), _ => format!("{}", lval) };
let r_dbg = match &rval { Some(V::String(s)) => format!("\"{}\"", s), Some(v) => format!("{}", v), None => "<none>".to_string() };
info!(target = "udmin.flow", op=%op, weak=%weak_eq, left=%l_dbg, right=%r_dbg, result=%res, "condition eval");
Ok(res)
}
pub(crate) fn resolve_value(ctx: &serde_json::Value, v: &serde_json::Value) -> Result<serde_json::Value> {
let t = v.get("type").and_then(|v| v.as_str()).unwrap_or("");
match t {
"constant" => Ok(v.get("content").cloned().unwrap_or(V::Null)),
"ref" => {
// content: [nodeId, field]
if let Some(arr) = v.get("content").and_then(|v| v.as_array()) {
if arr.len() >= 2 {
if let (Some(node), Some(field)) = (arr[0].as_str(), arr[1].as_str()) {
let val = ctx
.get("nodes")
.and_then(|n| n.get(node))
.and_then(|m| m.get(field))
.cloned()
.or_else(|| ctx.get(field).cloned())
.unwrap_or(V::Null);
return Ok(val);
}
}
}
Ok(V::Null)
}
"expression" => {
let expr = v.get("content").and_then(|x| x.as_str()).unwrap_or("");
if expr.trim().is_empty() { return Ok(V::Null); }
Ok(crate::flow::engine::eval_rhai_expr_json(expr, ctx).unwrap_or(V::Null))
}
_ => Ok(V::Null),
}
}
#[cfg(test)]
mod tests {
use super::*;
use serde_json::json;
fn cond_eq_const(left: serde_json::Value, right: serde_json::Value) -> serde_json::Value {
json!({
"left": {"type": "constant", "content": left},
"operator": "eq",
"right": {"type": "constant", "content": right}
})
}
#[test]
fn and_group_all_true() {
let ctx = json!({});
let group = json!([
cond_eq_const(json!(100), json!(100)),
json!({
"left": {"type": "constant", "content": 100},
"operator": ">",
"right": {"type": "constant", "content": 10}
})
]);
let ok = eval_condition_json(&ctx, &group).unwrap();
assert!(ok);
}
#[test]
fn and_group_has_false() {
let ctx = json!({});
let group = json!([
cond_eq_const(json!(100), json!(10)), // false
json!({
"left": {"type": "constant", "content": 100},
"operator": ">",
"right": {"type": "constant", "content": 10}
})
]);
let ok = eval_condition_json(&ctx, &group).unwrap();
assert!(!ok);
}
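    // Illustrative sketch (a hypothetical test added for clarity): the weak-equality flag
    // is read from the right-hand value's schema.extra.weak and makes string comparison
    // ignore case and surrounding whitespace.
    #[test]
    fn weak_equal_ignores_case_and_whitespace() {
        let ctx = json!({});
        let cond = json!({
            "left": {"type": "constant", "content": "  Alice "},
            "operator": "eq",
            "right": {
                "type": "constant",
                "content": "alice",
                "schema": {"extra": {"weak": true}}
            }
        });
        assert!(eval_condition_json(&ctx, &cond).unwrap());
    }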
}

View File

@ -1,2 +1,8 @@
pub mod http;
pub mod db;
pub mod db;
// removed: pub mod expr;
pub mod variable;
pub mod script_rhai;
pub mod script_js;
pub mod script_python;
pub mod condition;

View File

@ -0,0 +1,54 @@
use async_trait::async_trait;
use serde_json::Value;
use tracing::{debug, info};
use std::time::Instant;
use crate::flow::task::Executor;
use crate::flow::domain::{NodeDef, NodeId};
fn read_node_script_file(ctx: &Value, node_id: &str, lang_key: &str) -> Option<String> {
if let Some(nodes) = ctx.get("nodes").and_then(|v| v.as_object()) {
if let Some(m) = nodes.get(node_id).and_then(|v| v.get("scripts")).and_then(|v| v.as_object()) {
return m.get(lang_key).and_then(|v| v.as_str()).map(|s| s.to_string());
}
}
None
}
fn truncate_str(s: &str, max: usize) -> String {
let s = s.replace('\n', " ").replace('\r', " ");
if s.len() <= max { s } else { format!("{}", &s[..max]) }
}
#[derive(Default)]
pub struct ScriptJsTask;
#[async_trait]
impl Executor for ScriptJsTask {
async fn execute(&self, node_id: &NodeId, _node: &NodeDef, ctx: &mut Value) -> anyhow::Result<()> {
let start = Instant::now();
// Prefer the script file path specified in nodes.<id>.scripts.js
if let Some(path) = read_node_script_file(ctx, &node_id.0, "js") {
let preview = truncate_str(&path, 120);
info!(target = "udmin.flow", node=%node_id.0, file=%preview, "script_js task: JavaScript file execution not implemented yet (skipped)");
return Ok(());
}
// Compatibility with inline config (not executed yet; only logged)
let inline = ctx.get("script")
.or_else(|| ctx.get("expr"))
.and_then(|v| v.as_str())
.map(|s| s.to_string());
if let Some(code) = inline {
let preview = truncate_str(&code, 200);
debug!(target = "udmin.flow", node=%node_id.0, preview=%preview, "script_js task: inline script provided, but execution not implemented");
let _elapsed = start.elapsed().as_millis();
info!(target = "udmin.flow", node=%node_id.0, "script_js task: JavaScript execution not implemented yet (skipped)");
return Ok(());
}
info!(target = "udmin.flow", node=%node_id.0, "script_js task: no script found, skip");
Ok(())
}
}

View File

@ -0,0 +1,54 @@
use async_trait::async_trait;
use serde_json::Value;
use tracing::{debug, info};
use std::time::Instant;
use crate::flow::task::Executor;
use crate::flow::domain::{NodeDef, NodeId};
fn read_node_script_file(ctx: &Value, node_id: &str, lang_key: &str) -> Option<String> {
if let Some(nodes) = ctx.get("nodes").and_then(|v| v.as_object()) {
if let Some(m) = nodes.get(node_id).and_then(|v| v.get("scripts")).and_then(|v| v.as_object()) {
return m.get(lang_key).and_then(|v| v.as_str()).map(|s| s.to_string());
}
}
None
}
fn truncate_str(s: &str, max: usize) -> String {
let s = s.replace('\n', " ").replace('\r', " ");
if s.len() <= max { s } else { format!("{}", &s[..max]) }
}
#[derive(Default)]
pub struct ScriptPythonTask;
#[async_trait]
impl Executor for ScriptPythonTask {
async fn execute(&self, node_id: &NodeId, _node: &NodeDef, ctx: &mut Value) -> anyhow::Result<()> {
let start = Instant::now();
// Prefer the script file path specified in nodes.<id>.scripts.python
if let Some(path) = read_node_script_file(ctx, &node_id.0, "python") {
let preview = truncate_str(&path, 120);
info!(target = "udmin.flow", node=%node_id.0, file=%preview, "script_python task: Python file execution not implemented yet (skipped)");
return Ok(());
}
// Compatibility with inline config (not executed yet; only logged)
let inline = ctx.get("script")
.or_else(|| ctx.get("expr"))
.and_then(|v| v.as_str())
.map(|s| s.to_string());
if let Some(code) = inline {
let preview = truncate_str(&code, 200);
debug!(target = "udmin.flow", node=%node_id.0, preview=%preview, "script_python task: inline script provided, but execution not implemented");
let _elapsed = start.elapsed().as_millis();
info!(target = "udmin.flow", node=%node_id.0, "script_python task: Python execution not implemented yet (skipped)");
return Ok(());
}
info!(target = "udmin.flow", node=%node_id.0, "script_python task: no script found, skip");
Ok(())
}
}

View File

@ -0,0 +1,135 @@
use serde_json::Value;
use tracing::{debug, info};
use std::fs;
use std::time::Instant;
use crate::flow::domain::NodeId;
use crate::flow::engine::eval_rhai_expr_json;
use crate::flow::task::Executor;
use crate::flow::domain::NodeDef;
use async_trait::async_trait;
fn truncate_str(s: &str, max: usize) -> String {
let s = s.replace('\n', " ").replace('\r', " ");
if s.len() <= max { s } else { format!("{}", &s[..max]) }
}
fn shallow_diff(before: &Value, after: &Value) -> (Vec<String>, Vec<String>, Vec<String>) {
use std::collections::BTreeSet;
let mut added = Vec::new();
let mut removed = Vec::new();
let mut modified = Vec::new();
let (Some(bm), Some(am)) = (before.as_object(), after.as_object()) else {
if before != after { modified.push("<root>".to_string()); }
return (added, removed, modified);
};
let bkeys: BTreeSet<_> = bm.keys().cloned().collect();
let akeys: BTreeSet<_> = am.keys().cloned().collect();
for k in akeys.difference(&bkeys) { added.push((*k).to_string()); }
for k in bkeys.difference(&akeys) { removed.push((*k).to_string()); }
for k in akeys.intersection(&bkeys) {
let key = (*k).to_string();
if bm.get(&key) != am.get(&key) { modified.push(key); }
}
(added, removed, modified)
}
pub fn exec_rhai_file(node_id: &NodeId, path: &str, ctx: &mut Value) -> anyhow::Result<()> {
let start = Instant::now();
let code = match fs::read_to_string(path) {
Ok(s) => s,
Err(e) => {
info!(target = "udmin.flow", node=%node_id.0, err=%e.to_string(), "script task: failed to read Rhai file");
return Ok(());
}
};
let script = code;
if script.trim().is_empty() {
info!(target = "udmin.flow", node=%node_id.0, "script task: empty Rhai file, skip");
return Ok(());
}
let preview = truncate_str(&script, 200);
debug!(target = "udmin.flow", node=%node_id.0, preview=%preview, "script task: will execute Rhai file");
let before_ctx = ctx.clone();
let wrapped = format!("{{ {} ; ctx }}", script);
let res = eval_rhai_expr_json(&wrapped, ctx);
let dur_ms = start.elapsed().as_millis();
match res {
Some(new_ctx) => {
let (added, removed, modified) = shallow_diff(&before_ctx, &new_ctx);
*ctx = new_ctx;
info!(target = "udmin.flow", node=%node_id.0, ms=%dur_ms, added=%added.len(), removed=%removed.len(), modified=%modified.len(), "script task: Rhai file executed and ctx updated");
if !(added.is_empty() && removed.is_empty() && modified.is_empty()) {
debug!(target = "udmin.flow", node=%node_id.0, ?added, ?removed, ?modified, "script task: ctx shallow diff");
}
}
None => {
info!(target = "udmin.flow", node=%node_id.0, ms=%dur_ms, preview=%preview, "script task: Rhai file execution failed, ctx unchanged");
}
}
Ok(())
}
fn read_node_script_file(ctx: &Value, node_id: &str) -> Option<String> {
if let Some(nodes) = ctx.get("nodes").and_then(|v| v.as_object()) {
if let Some(m) = nodes.get(node_id).and_then(|v| v.get("scripts")).and_then(|v| v.as_object()) {
return m.get("rhai").and_then(|v| v.as_str()).map(|s| s.to_string());
}
}
None
}
#[derive(Default)]
pub struct ScriptRhaiTask;
#[async_trait]
impl Executor for ScriptRhaiTask {
async fn execute(&self, node_id: &NodeId, _node: &NodeDef, ctx: &mut Value) -> anyhow::Result<()> {
let start = Instant::now();
// 1) File scripts take precedence: nodes.<id>.scripts.rhai -> execute the file directly
if let Some(path) = read_node_script_file(ctx, &node_id.0) {
return exec_rhai_file(node_id, &path, ctx);
}
// 2) Inline scripts (either a String or an object with { script | expr })
let cfg: Option<String> = ctx.get("nodes")
.and_then(|nodes| nodes.get(&node_id.0))
.and_then(|n| n.get("script").or_else(|| n.get("expr")))
.and_then(|v| match v { Value::String(s) => Some(s.clone()), Value::Object(m) => m.get("script").or_else(|| m.get("expr")).and_then(|x| x.as_str()).map(|s| s.to_string()), _ => None })
.or_else(|| ctx.get("script").and_then(|v| v.as_str()).map(|s| s.to_string()))
.or_else(|| ctx.get("expr").and_then(|v| v.as_str()).map(|s| s.to_string()));
if let Some(script) = cfg {
if script.trim().is_empty() {
info!(target = "udmin.flow", node=%node_id.0, "script_rhai task: empty inline script, skip");
return Ok(());
}
let script_preview = truncate_str(&script, 200);
debug!(target = "udmin.flow", node=%node_id.0, preview=%script_preview, "script_rhai task: will execute Rhai inline script");
let before_ctx = ctx.clone();
let wrapped = format!("{{ {} ; ctx }}", script);
let res = super::super::engine::eval_rhai_expr_json(&wrapped, ctx);
let dur_ms = start.elapsed().as_millis();
match res {
Some(new_ctx) => {
let (added, removed, modified) = shallow_diff(&before_ctx, &new_ctx);
*ctx = new_ctx;
info!(target = "udmin.flow", node=%node_id.0, ms=%dur_ms, added=%added.len(), removed=%removed.len(), modified=%modified.len(), "script_rhai task: inline executed and ctx updated");
if !(added.is_empty() && removed.is_empty() && modified.is_empty()) {
debug!(target = "udmin.flow", node=%node_id.0, ?added, ?removed, ?modified, "script_rhai task: ctx shallow diff");
}
}
None => {
info!(target = "udmin.flow", node=%node_id.0, ms=%dur_ms, preview=%script_preview, "script_rhai task: inline execution failed, ctx unchanged");
}
}
return Ok(());
}
info!(target = "udmin.flow", node=%node_id.0, "script_rhai task: no script found, skip");
Ok(())
}
}
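// Illustrative sketch (a hypothetical test; the file path below is made up): the executor
// resolves a Rhai script file from ctx.nodes.<node_id>.scripts.rhai before falling back to
// the inline `script` / `expr` fields, mirroring the shape the mappers emit.
#[cfg(test)]
mod tests {
    use serde_json::json;

    #[test]
    fn resolves_rhai_file_path_from_ctx() {
        let ctx = json!({"nodes": {"s1": {"scripts": {"rhai": "scripts/demo.rhai"}}}});
        assert_eq!(
            super::read_node_script_file(&ctx, "s1").as_deref(),
            Some("scripts/demo.rhai")
        );
        assert!(super::read_node_script_file(&ctx, "missing").is_none());
    }
}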

View File

@ -0,0 +1,133 @@
use async_trait::async_trait;
use serde_json::Value;
use tracing::info;
use crate::flow::task::Executor;
use crate::flow::domain::{NodeDef, NodeId};
use crate::flow::engine::eval_rhai_expr_json;
#[derive(Default)]
pub struct VariableTask;
fn resolve_assign_value(ctx: &Value, v: &Value) -> Value {
use serde_json::Value as V;
// helper: get by object path
fn get_by_path<'a>(mut cur: &'a V, path: &[&str]) -> Option<&'a V> {
for seg in path {
match cur {
V::Object(map) => {
if let Some(next) = map.get(*seg) { cur = next; } else { return None; }
}
_ => return None,
}
}
Some(cur)
}
let t = v.get("type").and_then(|v| v.as_str()).unwrap_or("");
match t {
"constant" => v.get("content").cloned().unwrap_or(V::Null),
"ref" => {
// frontend IFlowValue ref: content is [nodeId, key1, key2, ...] or [topKey, ...]
let parts: Vec<String> = v
.get("content")
.and_then(|v| v.as_array())
.map(|arr| arr.iter().filter_map(|x| x.as_str().map(|s| s.to_string())).collect())
.unwrap_or_default();
if parts.is_empty() { return V::Null; }
// Prefer nodes.<nodeId>.* if node id is provided
if parts.len() >= 1 {
let node_id = &parts[0];
let rest: Vec<&str> = parts.iter().skip(1).map(|s| s.as_str()).collect();
// 1) direct: nodes.<nodeId>.<rest...>
let mut path_nodes: Vec<&str> = vec!["nodes", node_id.as_str()];
path_nodes.extend(rest.iter().copied());
if let Some(val) = get_by_path(ctx, &path_nodes) { return val.clone(); }
// 2) HTTP shortcut: nodes.<nodeId>.http_response.<rest...> (e.g., [node, "body"])
let mut path_http: Vec<&str> = vec!["nodes", node_id.as_str(), "http_response"];
path_http.extend(rest.iter().copied());
if let Some(val) = get_by_path(ctx, &path_http) { return val.clone(); }
}
// Fallback: interpret as top-level path: ctx[parts[0]][parts[1]]...
let path_top: Vec<&str> = parts.iter().map(|s| s.as_str()).collect();
if let Some(val) = get_by_path(ctx, &path_top) { return val.clone(); }
// Additional fallback: if looks like [nodeId, ...rest] but nodes.* missing, try top-level with rest only
if parts.len() >= 2 {
let rest_only: Vec<&str> = parts.iter().skip(1).map(|s| s.as_str()).collect();
if let Some(val) = get_by_path(ctx, &rest_only) { return val.clone(); }
}
V::Null
}
"expression" => {
let expr = v.get("content").and_then(|x| x.as_str()).unwrap_or("");
if expr.trim().is_empty() { return V::Null; }
eval_rhai_expr_json(expr, ctx).unwrap_or(V::Null)
}
_ => {
// fallback: if content exists, treat as constant
v.get("content").cloned().unwrap_or(V::Null)
}
}
}
#[async_trait]
impl Executor for VariableTask {
async fn execute(&self, node_id: &NodeId, _node: &NodeDef, ctx: &mut Value) -> anyhow::Result<()> {
// Read the variable config (node-level only)
let node_id_str = &node_id.0;
let cfg = match ctx.get("nodes") {
Some(nodes) => nodes.get(node_id_str).and_then(|n| n.get("variable")).cloned(),
_ => None,
};
let Some(cfg) = cfg else {
info!(target = "udmin.flow", node=%node_id.0, "variable task: no config found, skip");
return Ok(());
};
// Supports { assign: [...] } or a bare array
let assigns: Vec<Value> = match &cfg {
Value::Array(arr) => arr.clone(),
Value::Object(m) => m.get("assign").and_then(|v| v.as_array()).cloned().unwrap_or_default(),
_ => vec![],
};
if assigns.is_empty() {
info!(target = "udmin.flow", node=%node_id.0, "variable task: empty assign list, skip");
// Remove the variable node's config so it does not appear in the final ctx
if let Value::Object(map) = ctx { if let Some(Value::Object(nodes)) = map.get_mut("nodes") { nodes.remove(node_id_str); } }
return Ok(());
}
let mut applied = 0usize;
for item in assigns {
let op = item.get("operator").and_then(|v| v.as_str()).unwrap_or("assign");
let left = item.get("left").and_then(|v| v.as_str()).unwrap_or("");
let right = item.get("right").unwrap_or(&Value::Null);
if left.is_empty() { continue; }
let val = resolve_assign_value(ctx, right);
if let Value::Object(map) = ctx {
let exists = map.contains_key(left);
let do_set = match op {
"declare" => !exists,
_ => true,
};
if do_set {
map.insert(left.to_string(), val);
applied += 1;
}
}
}
// After execution, remove the variable node so it does not appear in the final ctx
if let Value::Object(map) = ctx {
if let Some(Value::Object(nodes)) = map.get_mut("nodes") {
nodes.remove(node_id_str);
}
}
info!(target = "udmin.flow", node=%node_id.0, count=%applied, "variable task: assigned variables");
Ok(())
}
}
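// Illustrative sketch (a hypothetical test; it drives the future with
// futures::executor::block_on on the assumption that the futures crate's default executor
// feature is available): a variable node's config lives under ctx.nodes.<id>.variable,
// each assign writes a top-level ctx key, and the node's own entry is removed afterwards.
#[cfg(test)]
mod tests {
    use super::VariableTask;
    use crate::flow::domain::{NodeDef, NodeId, NodeKind};
    use crate::flow::task::Executor;
    use serde_json::json;

    #[test]
    fn assigns_constant_and_removes_node_entry() {
        let node_id = NodeId("v1".to_string());
        let node = NodeDef {
            id: node_id.clone(),
            kind: NodeKind::Task,
            name: "vars".to_string(),
            task: Some("variable".to_string()),
        };
        let mut ctx = json!({
            "nodes": {"v1": {"variable": {"assign": [
                {"operator": "assign", "left": "greeting",
                 "right": {"type": "constant", "content": "hello"}}
            ]}}}
        });
        futures::executor::block_on(VariableTask::default().execute(&node_id, &node, &mut ctx))
            .expect("variable task");
        assert_eq!(ctx["greeting"], json!("hello"));
        assert!(ctx["nodes"].get("v1").is_none());
    }
}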

View File

@ -2,6 +2,8 @@ use serde_json::{json, Value};
mod http;
mod db;
mod variable;
mod script;
/// Trim whitespace and strip wrapping quotes/backticks if present
pub fn sanitize_wrapped(s: &str) -> String {
@ -54,9 +56,37 @@ pub fn ctx_from_design_json(design: &Value) -> Value {
node_cfg.insert("db".into(), v);
}
}
"variable" => {
if let Some(v) = variable::extract_variable_cfg(n) {
node_cfg.insert("variable".into(), v);
}
}
"expr" | "script" | "script_rhai" | "script_js" | "script_python" => {
if let Some(v) = script::extract_script_cfg(n) {
// Stored directly under the script field; the executor accepts both script and expr
node_cfg.insert("script".into(), v);
}
// Multi-language script files: data.scripts.{rhai,js,python}
if let Some(v) = script::extract_scripts_cfg(n) {
node_cfg.insert("scripts".into(), v);
}
}
_ => {}
}
// Compatibility: also extract when a non expr/script node carries a script config in its data
if !node_cfg.contains_key("script") {
if let Some(v) = script::extract_script_cfg(n) {
node_cfg.insert("script".into(), v);
}
}
// Compatibility: likewise extract data.scripts.* from non expr/script nodes
if !node_cfg.contains_key("scripts") {
if let Some(v) = script::extract_scripts_cfg(n) {
node_cfg.insert("scripts".into(), v);
}
}
if !node_cfg.is_empty() { nodes_map.insert(id.to_string(), Value::Object(node_cfg)); }
}
}

View File

@ -0,0 +1,63 @@
use serde_json::Value;
// Extract single inline script config from a node's design_json: prefer data.script or data.expr
pub fn extract_script_cfg(n: &Value) -> Option<Value> {
let data = n.get("data");
// script may be a plain string, or object with { content: string }
if let Some(Value::String(s)) = data.and_then(|d| d.get("script")) {
let s = super::sanitize_wrapped(s);
if !s.is_empty() { return Some(Value::String(s)); }
}
if let Some(Value::Object(obj)) = data.and_then(|d| d.get("script")) {
if let Some(Value::String(s)) = obj.get("content") {
let s = super::sanitize_wrapped(s);
if !s.is_empty() { return Some(Value::String(s)); }
}
}
// fallback to expr
if let Some(Value::String(s)) = data.and_then(|d| d.get("expr")) {
let s = super::sanitize_wrapped(s);
if !s.is_empty() { return Some(Value::String(s)); }
}
if let Some(Value::Object(obj)) = data.and_then(|d| d.get("expr")) {
if let Some(Value::String(s)) = obj.get("content") {
let s = super::sanitize_wrapped(s);
if !s.is_empty() { return Some(Value::String(s)); }
}
}
None
}
// Extract multi-language scripts config: data.scripts.{rhai,js,python} each can be string (file path) or object { file | path }
pub fn extract_scripts_cfg(n: &Value) -> Option<Value> {
let data = n.get("data");
let scripts_obj = match data.and_then(|d| d.get("scripts")).and_then(|v| v.as_object()) {
Some(m) => m,
None => return None,
};
let mut out = serde_json::Map::new();
for (k, v) in scripts_obj {
let key = k.to_lowercase();
let lang = match key.as_str() {
"rhai" => Some("rhai"),
"js" | "javascript" => Some("js"),
"py" | "python" => Some("python"),
_ => None,
};
if let Some(lang_key) = lang {
let path_opt = match v {
Value::String(s) => Some(super::sanitize_wrapped(s)),
Value::Object(obj) => {
if let Some(Value::String(p)) = obj.get("file").or_else(|| obj.get("path")) {
Some(super::sanitize_wrapped(p))
} else { None }
}
_ => None,
};
if let Some(p) = path_opt { if !p.is_empty() { out.insert(lang_key.to_string(), Value::String(p)); } }
}
}
if out.is_empty() { None } else { Some(Value::Object(out)) }
}
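
A hedged unit-test sketch for the two extractors above (the new module ships without tests; the file names and expression below are illustrative only):

#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    #[test]
    fn extracts_inline_script_and_per_language_files() {
        let node = json!({
            "id": "code_1",
            "type": "code",
            "data": {
                "script": { "content": "ctx.x + 1" },
                "scripts": {
                    "JS": "scripts/demo.js",                 // key is lower-cased to "js"
                    "python": { "file": "scripts/demo.py" }, // object form with file/path
                    "lua": "ignored.lua"                     // unknown language, skipped
                }
            }
        });
        assert_eq!(extract_script_cfg(&node), Some(json!("ctx.x + 1")));
        assert_eq!(
            extract_scripts_cfg(&node),
            Some(json!({ "js": "scripts/demo.js", "python": "scripts/demo.py" }))
        );
    }
}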

View File

@ -0,0 +1,19 @@
use serde_json::Value;
// Extract the variable config (the assign list) from a node
pub fn extract_variable_cfg(n: &Value) -> Option<Value> {
let data = n.get("data").and_then(|d| d.as_object());
let assigns = data.and_then(|d| d.get("assign")).cloned();
match assigns {
Some(Value::Array(arr)) if !arr.is_empty() => Some(Value::Object(serde_json::Map::from_iter([
("assign".into(), Value::Array(arr))
]))),
Some(v @ Value::Array(_)) => Some(Value::Object(serde_json::Map::from_iter([
("assign".into(), v)
]))),
Some(v @ Value::Object(_)) => Some(Value::Object(serde_json::Map::from_iter([
("assign".into(), Value::Array(vec![v]))
]))),
_ => None,
}
}
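
A hedged test sketch for the normalization above; the {left, right} item shape is illustrative only, since the extractor passes assign entries through untouched:

#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    #[test]
    fn normalizes_assign_into_a_list() {
        // A list of assignments is passed through as-is.
        let list = json!({ "data": { "assign": [ { "left": "user_name", "right": "alice" } ] } });
        assert_eq!(
            extract_variable_cfg(&list),
            Some(json!({ "assign": [ { "left": "user_name", "right": "alice" } ] }))
        );
        // A single assignment object is wrapped into a one-element array.
        let single = json!({ "data": { "assign": { "left": "age", "right": 18 } } });
        assert_eq!(
            extract_variable_cfg(&single),
            Some(json!({ "assign": [ { "left": "age", "right": 18 } ] }))
        );
        // No data.assign, no variable config.
        assert_eq!(extract_variable_cfg(&json!({ "data": {} })), None);
    }
}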

View File

@ -3,6 +3,6 @@ pub mod context;
pub mod task;
pub mod engine;
pub mod dsl;
pub mod storage;
// removed: pub mod storage;
pub mod executors;
pub mod mappers;

View File

@ -1,36 +0,0 @@
use once_cell::sync::Lazy;
use std::collections::HashMap;
use std::sync::Mutex;
static STORE: Lazy<Mutex<HashMap<String, String>>> = Lazy::new(|| Mutex::new(HashMap::new()));
pub fn list() -> Vec<(String, String)> {
STORE.lock().unwrap().iter().map(|(k, v)| (k.clone(), v.clone())).collect()
}
pub fn get(id: &str) -> Option<String> { STORE.lock().unwrap().get(id).cloned() }
pub fn put(id: String, yaml: String) { STORE.lock().unwrap().insert(id, yaml); }
pub fn del(id: &str) -> Option<String> { STORE.lock().unwrap().remove(id) }
// ---- New: trait abstraction for storage ----
pub trait Storage: Send + Sync {
fn list(&self) -> Vec<(String, String)>;
fn get(&self, id: &str) -> Option<String>;
fn put(&self, id: &str, yaml: String);
fn del(&self, id: &str) -> Option<String>;
}
/// A trivial Storage implementation that delegates to the module-level global in-memory store.
pub struct GlobalMemoryStorage;
impl Storage for GlobalMemoryStorage {
fn list(&self) -> Vec<(String, String)> { crate::flow::storage::list() }
fn get(&self, id: &str) -> Option<String> { crate::flow::storage::get(id) }
fn put(&self, id: &str, yaml: String) { crate::flow::storage::put(id.to_string(), yaml) }
fn del(&self, id: &str) -> Option<String> { crate::flow::storage::del(id) }
}
pub fn default_storage() -> std::sync::Arc<dyn Storage> { std::sync::Arc::new(GlobalMemoryStorage) }

View File

@ -15,6 +15,18 @@ pub fn default_registry() -> TaskRegistry {
let mut reg: TaskRegistry = TaskRegistry::new();
reg.insert("http".into(), Arc::new(crate::flow::executors::http::HttpTask::default()));
reg.insert("db".into(), Arc::new(crate::flow::executors::db::DbTask::default()));
// Script executors by language
reg.insert("script_rhai".into(), Arc::new(crate::flow::executors::script_rhai::ScriptRhaiTask::default()));
reg.insert("script".into(), Arc::new(crate::flow::executors::script_rhai::ScriptRhaiTask::default())); // default alias -> Rhai
reg.insert("script_js".into(), Arc::new(crate::flow::executors::script_js::ScriptJsTask::default()));
reg.insert("script_python".into(), Arc::new(crate::flow::executors::script_python::ScriptPythonTask::default()));
// Backward-compatible alias: expr -> script_rhai
reg.insert("expr".into(), Arc::new(crate::flow::executors::script_rhai::ScriptRhaiTask::default()));
// register variable executor
reg.insert("variable".into(), Arc::new(crate::flow::executors::variable::VariableTask::default()));
reg
}
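
A self-contained sketch of the alias scheme used above, with hypothetical Task/registry types standing in for the crate's own, to show how several node kinds resolve to one executor:

use std::collections::HashMap;
use std::sync::Arc;

// Hypothetical stand-ins for the crate's executor trait and registry type.
trait Task { fn lang(&self) -> &'static str; }
struct Rhai;
struct Js;
impl Task for Rhai { fn lang(&self) -> &'static str { "rhai" } }
impl Task for Js { fn lang(&self) -> &'static str { "js" } }

fn main() {
    let mut reg: HashMap<String, Arc<dyn Task>> = HashMap::new();
    let rhai: Arc<dyn Task> = Arc::new(Rhai);
    // "script_rhai", "script" and "expr" all point at the same Rhai executor,
    // so older flows that still say kind: expr keep working.
    for kind in ["script_rhai", "script", "expr"] {
        reg.insert(kind.into(), Arc::clone(&rhai));
    }
    reg.insert("script_js".into(), Arc::new(Js));
    assert_eq!(reg["expr"].lang(), "rhai");
    assert_eq!(reg["script_js"].lang(), "js");
}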

View File

@ -364,6 +364,18 @@ pub async fn run(db: &Db, id: &str, req: RunReq, operator: Option<(i64, String)>
}
};
// Safety net: remove variable nodes so variable_* configs are not exposed in the final context
let mut ctx = ctx;
if let serde_json::Value::Object(map) = &mut ctx {
if let Some(serde_json::Value::Object(nodes)) = map.get_mut("nodes") {
let keys: Vec<String> = nodes
.iter()
.filter_map(|(k, v)| if v.get("variable").is_some() { Some(k.clone()) } else { None })
.collect();
for k in keys { nodes.remove(&k); }
}
}
// Debug: print the processed ctx
//info!(target = "udmin", "flow.run: result ctx={}", serde_json::to_string(&ctx).unwrap_or_else(|_| "<serialize ctx failed>".to_string()));

View File

@ -20,6 +20,7 @@
"@flowgram.ai/free-node-panel-plugin": "^0.4.7",
"@flowgram.ai/free-snap-plugin": "^0.4.7",
"@flowgram.ai/minimap-plugin": "^0.4.7",
"@flowgram.ai/panel-manager-plugin": "^0.4.17",
"@flowgram.ai/runtime-interface": "^0.4.7",
"@flowgram.ai/runtime-js": "^0.4.7",
"antd": "^5.17.0",
@ -42,13 +43,20 @@
"@babel/plugin-proposal-private-property-in-object": "^7.21.11",
"@types/js-yaml": "^4.0.9",
"@types/lodash-es": "^4.17.12",
"@types/node": "^24.5.1",
"@types/react": "^18.3.24",
"@types/react-dom": "^18.3.7",
"@types/styled-components": "^5.1.34",
"@vitejs/plugin-react": "^4.2.0",
"@vitest/coverage-v8": "^3.2.4",
"babel-plugin-transform-typescript-metadata": "^0.3.2",
"jsdom": "^27.0.0",
"less": "^4.2.0",
"typescript": "^5.4.0",
"vite": "^5.2.0"
"vite": "^5.2.0",
"vitest": "^3.2.4"
},
"vitest": {
"environment": "node"
}
}

View File

@ -5,7 +5,7 @@
import { useRef, useEffect } from 'react';
import { Field, FieldRenderProps } from '@flowgram.ai/free-layout-editor';
import { Field, FieldRenderProps, I18n } from '@flowgram.ai/free-layout-editor';
import { Typography, Input } from '@douyinfe/semi-ui';
import { Title } from './styles';
@ -39,7 +39,10 @@ export function TitleInput(props: {
onBlur={() => updateTitleEdit(false)}
/>
) : (
<Text ellipsis={{ showTooltip: true }}>{value}</Text>
// Localize the default Start/End titles for display on demand
<Text ellipsis={{ showTooltip: true }}>{
value === 'Start' || value === 'End' ? I18n.t(value as any) : (value as any)
}</Text>
)}
<Feedback errors={fieldState?.errors} />
</div>

View File

@ -355,6 +355,9 @@ export function useEditorProps(
'Cannot paste nodes to invalid container': '无法粘贴到无效容器',
'Start': '开始',
'End': '结束',
// ==== Start/End node descriptions ====
'The starting node of the workflow, used to set the information needed to initiate the workflow.': '流程开始节点,用于设置启动流程所需的信息。',
'The final node of the workflow, used to return the result information after the workflow is run.': '流程结束节点,用于返回流程运行后的结果信息。',
'Variable List': '变量列表',
'Global Editor': '全局变量编辑',
'Global': '全局',

View File

@ -7,6 +7,7 @@ import { FlowNodeRegistry } from '../../typings';
import iconEnd from '../../assets/icon-end.jpg';
import { formMeta } from './form-meta';
import { WorkflowNodeType } from '../constants';
import { I18n } from '@flowgram.ai/free-layout-editor';
export const EndNodeRegistry: FlowNodeRegistry = {
type: WorkflowNodeType.End,
@ -23,7 +24,7 @@ export const EndNodeRegistry: FlowNodeRegistry = {
info: {
icon: iconEnd,
description:
'The final node of the workflow, used to return the result information after the workflow is run.',
I18n.t('The final node of the workflow, used to return the result information after the workflow is run.'),
},
/**
* Render node via formMeta

View File

@ -7,6 +7,7 @@ import { FlowNodeRegistry } from '../../typings';
import iconStart from '../../assets/icon-start.jpg';
import { formMeta } from './form-meta';
import { WorkflowNodeType } from '../constants';
import { I18n } from '@flowgram.ai/free-layout-editor';
export const StartNodeRegistry: FlowNodeRegistry = {
type: WorkflowNodeType.Start,
@ -24,7 +25,7 @@ export const StartNodeRegistry: FlowNodeRegistry = {
info: {
icon: iconStart,
description:
'The starting node of the workflow, used to set the information needed to initiate the workflow.',
I18n.t('The starting node of the workflow, used to set the information needed to initiate the workflow.'),
},
/**
* Render node via formMeta

View File

@ -0,0 +1,145 @@
import { describe, it, expect } from 'vitest'
import { stringifyFlowDoc } from './yaml'
import type { FlowDocumentJSON } from '../typings'
function baseNodes() {
return [
{ id: 'start_1', type: 'start', data: { title: 'Start' }, meta: {} } as any,
{ id: 'cond_1', type: 'condition', data: { title: 'Cond', conditions: [] }, meta: {} } as any,
{ id: 'code_1', type: 'code', data: { title: 'Code' }, meta: {} } as any,
]
}
describe('stringifyFlowDoc condition -> edges.condition', () => {
it('expression mode: raw expr filled to condition', () => {
const nodes = baseNodes()
;(nodes[1] as any).data.conditions = [
{ key: 'if_a', value: { type: 'expression', content: '1 + 1 == 2' } },
]
const doc: FlowDocumentJSON = {
nodes: nodes as any,
edges: [
{ sourceNodeID: 'start_1', targetNodeID: 'cond_1' } as any,
{ sourceNodeID: 'cond_1', sourcePortID: 'if_a', targetNodeID: 'code_1' } as any,
],
}
const y = stringifyFlowDoc(doc)
expect(y).toContain('condition: 1 + 1 == 2')
})
it('structured: is_empty and not_empty (unary ops)', () => {
const nodes = baseNodes()
;(nodes[1] as any).data.conditions = [
{ key: 'if_empty', value: { left: { type: 'ref', content: ['input', 'name'] }, operator: 'is_empty' } },
{ key: 'if_not_empty', value: { left: { type: 'ref', content: ['input', 'age'] }, operator: 'not_empty' } },
]
const doc: FlowDocumentJSON = {
nodes: nodes as any,
edges: [
{ sourceNodeID: 'cond_1', sourcePortID: 'if_empty', targetNodeID: 'code_1' } as any,
{ sourceNodeID: 'cond_1', sourcePortID: 'if_not_empty', targetNodeID: 'code_1' } as any,
],
}
const y = stringifyFlowDoc(doc)
expect(y).toContain('condition: is_empty((ctx["input"]["name"]))')
expect(y).toContain('condition: not_empty((ctx["input"]["age"]))')
})
it('structured: contains builds method call', () => {
const nodes = baseNodes()
;(nodes[1] as any).data.conditions = [
{ key: 'if_contains', value: { left: { type: 'ref', content: ['input', 'q'] }, operator: 'contains', right: { type: 'constant', content: 'hi' } } },
]
const doc: FlowDocumentJSON = {
nodes: nodes as any,
edges: [
{ sourceNodeID: 'cond_1', sourcePortID: 'if_contains', targetNodeID: 'code_1' } as any,
],
}
const y = stringifyFlowDoc(doc)
expect(y).toContain('condition: (ctx["input"]["q"]).contains("hi")')
})
it('structured: is_true / is_false, starts_with / ends_with', () => {
const nodes = baseNodes()
;(nodes[1] as any).data.conditions = [
{ key: 'if_true', value: { left: { type: 'ref', content: ['flags', 'ok'] }, operator: 'is_true' } },
{ key: 'if_false', value: { left: { type: 'ref', content: ['flags', 'ok'] }, operator: 'is_false' } },
{ key: 'if_sw', value: { left: { type: 'ref', content: ['input', 'name'] }, operator: 'starts_with', right: { type: 'constant', content: 'Mr' } } },
{ key: 'if_ew', value: { left: { type: 'ref', content: ['input', 'name'] }, operator: 'ends_with', right: { type: 'constant', content: 'Jr' } } },
]
const doc: FlowDocumentJSON = {
nodes: nodes as any,
edges: [
{ sourceNodeID: 'cond_1', sourcePortID: 'if_true', targetNodeID: 'code_1' } as any,
{ sourceNodeID: 'cond_1', sourcePortID: 'if_false', targetNodeID: 'code_1' } as any,
{ sourceNodeID: 'cond_1', sourcePortID: 'if_sw', targetNodeID: 'code_1' } as any,
{ sourceNodeID: 'cond_1', sourcePortID: 'if_ew', targetNodeID: 'code_1' } as any,
],
}
const y = stringifyFlowDoc(doc)
expect(y).toContain('condition: ((ctx["flags"]["ok"])) == true')
expect(y).toContain('condition: ((ctx["flags"]["ok"])) == false')
expect(y).toContain('condition: (ctx["input"]["name"]).starts_with("Mr")')
expect(y).toContain('condition: (ctx["input"]["name"]).ends_with("Jr")')
})
it('structured: regex uses regex_match helper', () => {
const nodes = baseNodes()
;(nodes[1] as any).data.conditions = [
{ key: 'if_regex', value: { left: { type: 'ref', content: ['input', 'email'] }, operator: 'regex', right: { type: 'constant', content: '^.+@example\\.com$' } } },
]
const doc: FlowDocumentJSON = {
nodes: nodes as any,
edges: [
{ sourceNodeID: 'cond_1', sourcePortID: 'if_regex', targetNodeID: 'code_1' } as any,
],
}
const y = stringifyFlowDoc(doc)
// allow one or two backslashes before the dot depending on string escaping
expect(y).toMatch(/condition: regex_match\(\(ctx\["input"\]\["email"\]\), "\^\.\+@example\\{1,2}\.com\$"\)/)
})
it('edges & node mapping: only conditional edges have condition, empty expr omitted; node kind mapped to condition', () => {
const nodes = baseNodes()
;(nodes[1] as any).data.conditions = [
{ key: 'if_empty_expr', value: { type: 'expression', content: ' ' } },
{ key: 'if_true', value: { left: { type: 'ref', content: ['flags', 'ok'] }, operator: 'is_true' } },
]
const doc: FlowDocumentJSON = {
nodes: nodes as any,
edges: [
{ sourceNodeID: 'start_1', targetNodeID: 'cond_1' } as any,
{ sourceNodeID: 'cond_1', sourcePortID: 'if_empty_expr', targetNodeID: 'code_1' } as any,
{ sourceNodeID: 'cond_1', sourcePortID: 'if_true', targetNodeID: 'code_1' } as any,
],
}
const y = stringifyFlowDoc(doc)
// node kind mapping
expect(y).toContain('\n - id: cond_1\n kind: condition')
// only one condition written (the is_true one)
const count = (y.match(/\bcondition:/g) || []).length
expect(count).toBe(1)
})
it('value building: constant number/boolean/string escaping and ref path', () => {
const nodes = baseNodes()
;(nodes[1] as any).data.conditions = [
{ key: 'num', value: { left: { type: 'ref', content: ['a', 'b'] }, operator: 'eq', right: { type: 'constant', content: 123 } } },
{ key: 'bool', value: { left: { type: 'ref', content: ['a', 'ok'] }, operator: 'eq', right: { type: 'constant', content: true } } },
{ key: 'str', value: { left: { type: 'ref', content: ['a', 's'] }, operator: 'eq', right: { type: 'constant', content: 'a"b\\c\n' } } },
]
const doc: FlowDocumentJSON = {
nodes: nodes as any,
edges: [
{ sourceNodeID: 'cond_1', sourcePortID: 'num', targetNodeID: 'code_1' } as any,
{ sourceNodeID: 'cond_1', sourcePortID: 'bool', targetNodeID: 'code_1' } as any,
{ sourceNodeID: 'cond_1', sourcePortID: 'str', targetNodeID: 'code_1' } as any,
],
}
const y = stringifyFlowDoc(doc)
expect(y).toContain('condition: (ctx["a"]["b"]) == (123)')
expect(y).toContain('condition: (ctx["a"]["ok"]) == (true)')
expect(y).toContain('condition: (ctx["a"]["s"]) == ("a\\"b\\\\c\\n")')
})
})

View File

@ -21,6 +21,9 @@ function mapKindToType(kind: string | undefined): string {
return 'llm'
case 'code':
return 'code'
case 'condition':
case 'decision':
return 'condition'
default:
return 'code'
}
@ -41,6 +44,8 @@ function mapTypeToKind(type: string | undefined): string {
return 'llm'
case 'code':
return 'code'
case 'condition':
return 'condition'
default:
return 'code'
}
@ -102,18 +107,111 @@ export function parseFlowYaml(yamlStr: string): { name?: string; doc: FlowDocume
return { name, doc }
}
// -------- Condition -> Rhai expression helpers --------
function escapeStringLiteral(s: string): string {
return '"' + String(s).replace(/\\/g, '\\\\').replace(/\"/g, '\\"').replace(/\n/g, '\\n').replace(/\r/g, '\\r') + '"'
}
function buildRefPath(ref: any): string {
// ref.content is like [nodeId, key1, key2, ...]
const parts = Array.isArray(ref?.content) ? ref.content : []
if (!parts.length) return 'null'
// Always use bracket-notation: ctx["node"]["key"]...
const segs = parts.map((p: any) => `[${escapeStringLiteral(String(p))}]`).join('')
return `ctx${segs}`
}
function buildValueExpr(v: any): string {
if (!v) return 'null'
const t = v.type || v.kind || ''
switch (t) {
case 'ref':
return buildRefPath(v)
case 'constant': {
const c = v.content
if (typeof c === 'string') return escapeStringLiteral(c)
if (typeof c === 'number') return String(c)
if (typeof c === 'boolean') return String(c)
// fallback stringify
try { return escapeStringLiteral(JSON.stringify(c)) } catch { return 'null' }
}
case 'expression': {
const s = (v.content ?? '').trim()
return s || 'false'
}
case 'template': {
// For now, treat template as a raw string constant
return escapeStringLiteral(String(v.content ?? ''))
}
default: {
// Maybe raw value { content: any }
if (v && 'content' in v) {
return buildValueExpr({ type: 'constant', content: (v as any).content })
}
return 'null'
}
}
}
function buildBinaryOpExpr(op: string, left: string, right?: string): string {
switch (op) {
case 'eq': return `(${left}) == (${right})`
case 'ne': return `(${left}) != (${right})`
case 'gt': return `(${left}) > (${right})`
case 'lt': return `(${left}) < (${right})`
case 'ge': return `(${left}) >= (${right})`
case 'le': return `(${left}) <= (${right})`
case 'contains': return `(${left}).contains(${right})`
case 'starts_with': return `(${left}).starts_with(${right})`
case 'ends_with': return `(${left}).ends_with(${right})`
case 'regex': return `regex_match((${left}), ${right})`
case 'is_true': return `((${left})) == true`
case 'is_false': return `((${left})) == false`
case 'is_empty': return `is_empty((${left}))`
case 'not_empty': return `not_empty((${left}))`
default: return 'false'
}
}
function conditionToRhaiExpr(value: any): string | undefined {
if (!value) return undefined
// expression mode
if (value.type === 'expression') {
const expr = String(value.content ?? '').trim()
return expr || undefined
}
// structured mode: { left, operator, right? }
const left = buildValueExpr(value.left)
const op = String(value.operator || '').toLowerCase()
const right = value.right != null ? buildValueExpr(value.right) : undefined
return buildBinaryOpExpr(op, left, right)
}
export function stringifyFlowDoc(doc: FlowDocumentJSON, name?: string): string {
const nodeMap = new Map<string, any>((doc.nodes || []).map((n: any) => [n.id, n]))
const data: any = {
...(name ? { name } : {}),
nodes: (doc.nodes || []).map((n) => ({
nodes: (doc.nodes || []).map((n: any) => ({
id: n.id,
kind: mapTypeToKind((n as any).type),
name: (n as any).data?.title || (n as any).type || 'node',
})),
edges: (doc.edges || []).map((e) => ({
from: (e as any).sourceNodeID,
to: (e as any).targetNodeID,
kind: mapTypeToKind(n.type),
name: n?.data?.title || n?.type || 'node',
})),
edges: (doc.edges || []).map((e: any) => {
const out: any = {
from: e.sourceNodeID,
to: e.targetNodeID,
}
const src = nodeMap.get(e.sourceNodeID)
if (src?.type === 'condition') {
const key = e.sourcePortID
const conds: any[] = Array.isArray(src?.data?.conditions) ? src.data.conditions : []
const condItem = conds.find((c: any) => c?.key === key)
const expr = conditionToRhaiExpr(condItem?.value)
if (expr) out.condition = expr
}
return out
}),
}
return yaml.dump(data, { lineWidth: 120 })
}
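
The strings emitted into edges[].condition are meant to be evaluated by the backend's Rhai engine against the flow ctx. A minimal sketch of that contract, assuming the rhai crate with its serde feature and a scope variable named ctx; regex_match, is_empty and not_empty would be helpers the backend registers on the engine and are not shown here:

use rhai::{Engine, Scope};
use rhai::serde::to_dynamic;
use serde_json::json;

fn main() -> Result<(), Box<rhai::EvalAltResult>> {
    let engine = Engine::new();
    let mut scope = Scope::new();
    // The flow context the condition is evaluated against.
    scope.push_dynamic("ctx", to_dynamic(json!({ "a": { "b": 123, "ok": true } }))?);

    // Expressions exactly as stringifyFlowDoc writes them into edges[].condition.
    let hit: bool = engine.eval_with_scope(&mut scope, r#"(ctx["a"]["b"]) == (123)"#)?;
    let flag: bool = engine.eval_with_scope(&mut scope, r#"((ctx["a"]["ok"])) == true"#)?;
    assert!(hit && flag);
    Ok(())
}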

View File

@ -111,7 +111,7 @@ export function setupDevConsoleSuppression() {
origWarn(...args)
}
anyConsole.__patched_console__ = true
;(anyConsole as any).__patched_console__ = true
if (typeof console !== 'undefined' && console.debug) {
console.debug('[DEV] console.* patched to suppress known harmless warnings')
}

File diff suppressed because one or more lines are too long