From 75c6974a35aa761676522229ea3a4a7e6f9fc131 Mon Sep 17 00:00:00 2001 From: ayou <550244300@qq.com> Date: Wed, 3 Dec 2025 20:51:22 +0800 Subject: [PATCH] =?UTF-8?q?feat(flow):=20=E6=96=B0=E5=A2=9E=E5=88=86?= =?UTF-8?q?=E7=BB=84=E6=89=A7=E8=A1=8C=E4=B8=8E=E5=BC=82=E6=AD=A5=E6=A8=A1?= =?UTF-8?q?=E5=BC=8F=E6=94=AF=E6=8C=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit refactor(executors): 将 Rhai 引擎评估逻辑迁移至 script_rhai 模块 docs: 添加 Flow 架构文档与示例 JSON feat(i18n): 新增前端多语言支持 perf(axios): 优化 token 刷新与 401 处理逻辑 style: 统一代码格式化与简化条件判断 --- backend/src/flow/context.rs | 11 +- backend/src/flow/domain.rs | 24 + backend/src/flow/dsl.rs | 33 +- backend/src/flow/engine.rs | 621 +++++++++++------- backend/src/flow/executors/condition.rs | 2 +- backend/src/flow/executors/script_rhai.rs | 152 ++++- backend/src/flow/executors/variable.rs | 2 +- backend/src/services/flow_service.rs | 19 +- docs/flow/code_js1.json | 336 ++++++++++ docs/flow/code_js1_group.json | 355 ++++++++++ docs/flow_architecture.md | 227 +++++++ .../testrun/testrun-panel/index.tsx | 88 ++- frontend/src/flows/components/tools/index.tsx | 23 +- frontend/src/flows/services/custom-service.ts | 26 +- frontend/src/utils/axios.ts | 77 ++- frontend/src/utils/i18n.ts | 44 ++ frontend/tsconfig.tsbuildinfo | 2 +- .../code_js1_batch_20251010_235010.json | 29 + .../code_js1_batch_20251011_000537.json | 29 + .../code_js1_batch_20251011_001656.json | 29 + 20 files changed, 1830 insertions(+), 299 deletions(-) create mode 100644 docs/flow/code_js1.json create mode 100644 docs/flow/code_js1_group.json create mode 100644 docs/flow_architecture.md create mode 100644 frontend/src/utils/i18n.ts create mode 100644 scripts/results/code_js1_batch_20251010_235010.json create mode 100644 scripts/results/code_js1_batch_20251011_000537.json create mode 100644 scripts/results/code_js1_batch_20251011_001656.json diff --git a/backend/src/flow/context.rs b/backend/src/flow/context.rs index 1c0dffc..e25384b 100644 --- a/backend/src/flow/context.rs +++ b/backend/src/flow/context.rs @@ -10,6 +10,8 @@ pub struct FlowContext { pub enum ExecutionMode { #[serde(rename = "sync")] Sync, #[serde(rename = "async")] AsyncFireAndForget, + #[serde(rename = "queued")] AsyncQueued, + #[serde(rename = "bounded")] AsyncBounded, } impl Default for ExecutionMode { fn default() -> Self { ExecutionMode::Sync } } @@ -35,8 +37,15 @@ pub struct DriveOptions { // 新增:事件通道(仅运行时使用,不做序列化/反序列化) #[serde(default, skip_serializing, skip_deserializing)] pub event_tx: Option>, + // 新增:异步分组追踪器(仅运行时使用,不做序列化/反序列化) + #[serde(default, skip_serializing, skip_deserializing)] + pub async_groups: Option>>>>>, + #[serde(default, skip_serializing, skip_deserializing)] + pub group_semaphores: Option>>>>, + #[serde(default)] + pub bounded_limit: Option, } impl Default for DriveOptions { - fn default() -> Self { Self { max_steps: 10_000, execution_mode: ExecutionMode::Sync, event_tx: None } } + fn default() -> Self { Self { max_steps: 10_000, execution_mode: ExecutionMode::Sync, event_tx: None, async_groups: None, group_semaphores: None, bounded_limit: None } } } \ No newline at end of file diff --git a/backend/src/flow/domain.rs b/backend/src/flow/domain.rs index cf12746..675e015 100644 --- a/backend/src/flow/domain.rs +++ b/backend/src/flow/domain.rs @@ -1,4 +1,5 @@ use serde::{Deserialize, Serialize}; +use std::collections::HashMap; #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash, Default)] pub struct NodeId(pub String); @@ -41,4 +42,27 @@ pub struct ChainDef { pub nodes: Vec, #[serde(default)] pub links: Vec, + #[serde(default)] + pub groups: HashMap, +} + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +pub enum GroupAwaitPolicy { + #[serde(rename = "none")] None, + #[serde(rename = "node_leave")] NodeLeave, + #[serde(rename = "branch_exit")] BranchExit, + #[serde(rename = "flow_end")] FlowEnd, +} + +impl Default for GroupAwaitPolicy { fn default() -> Self { GroupAwaitPolicy::BranchExit } } + +#[derive(Debug, Clone, Serialize, Deserialize, Default)] +pub struct GroupDef { + pub id: String, + #[serde(default)] + pub parent_id: Option, + #[serde(default)] + pub members: Vec, + #[serde(default)] + pub await_policy: GroupAwaitPolicy, } \ No newline at end of file diff --git a/backend/src/flow/dsl.rs b/backend/src/flow/dsl.rs index eb58b21..57e2780 100644 --- a/backend/src/flow/dsl.rs +++ b/backend/src/flow/dsl.rs @@ -88,6 +88,7 @@ impl From for super::domain::ChainDef { condition: e.condition, }) .collect(), + groups: std::collections::HashMap::default(), } } } @@ -179,13 +180,43 @@ fn build_chain_from_design(design: &DesignSyntax) -> anyhow::Result = Vec::new(); + let mut groups: std::collections::HashMap = std::collections::HashMap::new(); for n in &design.nodes { let kind = match n.kind.as_str() { "start" => NodeKind::Start, "end" => NodeKind::End, "condition" => NodeKind::Condition, + "group" => NodeKind::Task, // group 本身不作为可执行节点,稍后跳过入 nodes _ => NodeKind::Task, }; + if n.kind.as_str() == "group" { + // 解析分组:data.blockIDs(成员节点)、data.parentID(上层分组)、data.awaitPolicy(等待策略) + let members: Vec = n + .data + .get("blockIDs") + .and_then(|v| v.as_array()) + .map(|arr| arr.iter().filter_map(|x| x.as_str().map(|s| s.to_string())).collect()) + .unwrap_or_default(); + let parent_id: Option = n + .data + .get("parentID") + .and_then(|v| v.as_str()) + .map(|s| s.to_string()); + let await_policy = match n + .data + .get("awaitPolicy") + .and_then(|v| v.as_str()) + .unwrap_or("branch_exit") + { + "none" => super::domain::GroupAwaitPolicy::None, + "node_leave" => super::domain::GroupAwaitPolicy::NodeLeave, + "flow_end" => super::domain::GroupAwaitPolicy::FlowEnd, + _ => super::domain::GroupAwaitPolicy::BranchExit, + }; + groups.insert(n.id.clone(), super::domain::GroupDef { id: n.id.clone(), parent_id, members, await_policy }); + // group 节点不加入执行节点集合 + continue; + } // 从节点 data.title 读取名称,若不存在则为空字符串 let name = n.data.get("title").and_then(|v| v.as_str()).unwrap_or("").to_string(); // 将可执行类型映射到任务标识(用于绑定任务实现) @@ -268,7 +299,7 @@ fn build_chain_from_design(design: &DesignSyntax) -> anyhow::Result validate -> build diff --git a/backend/src/flow/engine.rs b/backend/src/flow/engine.rs index c681ee2..6ab6f0d 100644 --- a/backend/src/flow/engine.rs +++ b/backend/src/flow/engine.rs @@ -1,28 +1,27 @@ //! 流程执行引擎(engine.rs):驱动 ChainDef 流程图,支持同步/异步任务、条件路由、并发分支与 SSE 推送。 -use std::cell::RefCell; -use std::collections::HashMap; + +use std::collections::{HashMap, HashSet}; use std::time::Instant; use crate::flow::executors::condition::eval_condition_json; use super::{ context::{DriveOptions, ExecutionMode}, - domain::{ChainDef, NodeKind}, + domain::{ChainDef, NodeKind, GroupAwaitPolicy, GroupDef}, task::TaskRegistry, }; use futures::future::join_all; -use regex::Regex; -use rhai::{AST, Engine}; -use tokio::sync::{Mutex, RwLock}; +use tokio::sync::{Mutex, RwLock, Semaphore}; use tracing::info; +use crate::flow::executors::script_rhai::eval_rhai_expr_bool; -// 结构体:紧随 use +/// FlowEngine 管理 TaskRegistry 并驱动流程执行 pub struct FlowEngine { pub tasks: TaskRegistry, } - +/// 驱动过程中的错误快照,包含触发节点、上下文与过程日志 #[derive(Debug, Clone)] pub struct DriveError { pub node_id: String, @@ -31,214 +30,207 @@ pub struct DriveError { pub message: String, } -// === 表达式评估支持:thread_local 引擎与 AST 缓存,避免全局 Sync/Send 限制 === +// Rhai helpers moved to executors/script_rhai.rs -// 模块:流程执行引擎(engine.rs) -// 作用:驱动 ChainDef 流程图,支持: -// - 同步/异步(Fire-and-Forget)任务执行 -// - 条件路由(Rhai 表达式与 JSON 条件)与无条件回退 -// - 并发分支 fan-out 与 join_all 等待 -// - SSE 实时事件推送(逐行增量 + 节点级切片) -// 设计要点: -// - 表达式执行使用 thread_local 的 Rhai Engine 与 AST 缓存,避免全局 Send/Sync 限制 -// - 共享上下文使用 RwLock 包裹 serde_json::Value;日志聚合使用 Mutex> -// - 不做冲突校验:允许并发修改;最后写回/写入按代码路径覆盖 -// -fn regex_match(s: &str, pat: &str) -> bool { - Regex::new(pat).map(|re| re.is_match(s)).unwrap_or(false) +/// 构建器:允许注入任务注册表,生成 FlowEngine +#[derive(Default)] +pub struct FlowEngineBuilder { + tasks: Option, } -// 常用字符串函数,便于在表达式中直接调用(函数式写法) -fn contains(s: &str, sub: &str) -> bool { s.contains(sub) } -fn starts_with(s: &str, prefix: &str) -> bool { s.starts_with(prefix) } -fn ends_with(s: &str, suffix: &str) -> bool { s.ends_with(suffix) } - -// 新增:判空/判不空(支持任意 Dynamic 类型) -fn is_empty(v: rhai::Dynamic) -> bool { - if v.is_unit() { return true; } - if let Some(s) = v.clone().try_cast::() { - return s.is_empty(); +// ---------- helper async functions: push+emit, group join ---------- +/// 写入节点日志并(若配置)通过 SSE 推送到前端 +/// - 日志:将消息追加至 `logs` +/// - 推送:若存在 `event_tx`,向前端推送单条事件 +async fn push_and_emit( + logs: &std::sync::Arc>>, + opts: &DriveOptions, + node_id: &str, + ctx: &std::sync::Arc>, + msg: String, +) { + { + let mut lg = logs.lock().await; + lg.push(msg.clone()); } - if let Some(a) = v.clone().try_cast::() { - return a.is_empty(); - } - if let Some(m) = v.clone().try_cast::() { - return m.is_empty(); - } - false -} - -fn not_empty(v: rhai::Dynamic) -> bool { !is_empty(v) } -thread_local! { - static RHIA_ENGINE: RefCell = RefCell::new({ - let mut eng = Engine::new(); - // 限制执行步数,防止复杂表达式消耗过多计算资源 - eng.set_max_operations(10_000_000); - // 严格变量模式,避免拼写错误导致静默为 null - eng.set_strict_variables(true); - // 注册常用工具函数 - eng.register_fn("regex_match", regex_match); - eng.register_fn("contains", contains); - eng.register_fn("starts_with", starts_with); - eng.register_fn("ends_with", ends_with); - // 新增:注册判空/判不空函数(既可函数式调用,也可方法式调用) - eng.register_fn("is_empty", is_empty); - eng.register_fn("not_empty", not_empty); - eng - }); - - // 简单的 AST 缓存:以表达式字符串为键存储编译结果(线程本地) - static AST_CACHE: RefCell> = RefCell::new(HashMap::new()); -} - -// 评估 Rhai 表达式为 bool,提供 ctx 变量(serde_json::Value) -fn eval_rhai_expr_bool(expr: &str, ctx: &serde_json::Value) -> bool { - // 构造作用域并注入 ctx - let mut scope = rhai::Scope::new(); - let dyn_ctx = match rhai::serde::to_dynamic(ctx.clone()) { Ok(d) => d, Err(_) => rhai::Dynamic::UNIT }; - scope.push_dynamic("ctx", dyn_ctx); - - // 先从缓存读取 AST;未命中则编译并写入缓存,然后执行 - let cached = AST_CACHE.with(|c| c.borrow().get(expr).cloned()); - if let Some(ast) = cached { - return RHIA_ENGINE.with(|eng| eng.borrow().eval_ast_with_scope::(&mut scope, &ast).unwrap_or(false)); - } - - let compiled = RHIA_ENGINE.with(|eng| eng.borrow().compile_with_scope(&mut scope, expr)); - match compiled { - Ok(ast) => { - // 简单容量控制:超过 1024 条时清空,避免无限增长 - AST_CACHE.with(|c| { - let mut cache = c.borrow_mut(); - if cache.len() > 1024 { cache.clear(); } - cache.insert(expr.to_string(), ast.clone()); - }); - RHIA_ENGINE.with(|eng| eng.borrow().eval_ast_with_scope::(&mut scope, &ast).unwrap_or(false)) - } - Err(_) => false, + if let Some(tx) = opts.event_tx.as_ref() { + let ctx_snapshot = { ctx.read().await.clone() }; + crate::middlewares::sse::emit_node(&tx, node_id.to_string(), vec![msg], ctx_snapshot).await; } } -// 通用:评估 Rhai 表达式并转换为 serde_json::Value,失败返回错误 -#[derive(Debug, Clone)] -pub enum RhaiExecError { - Compile { message: String }, - Runtime { message: String }, - Serde { message: String }, -} - -impl std::fmt::Display for RhaiExecError { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - RhaiExecError::Compile { message } => write!(f, "compile error: {}", message), - RhaiExecError::Runtime { message } => write!(f, "runtime error: {}", message), - RhaiExecError::Serde { message } => write!(f, "serde error: {}", message), +/// 按组策略等待异步任务完成 +/// - FlowEnd:流程结束前统一等待组内所有任务 +/// - BranchExit:分支退出时等待当前组任务 +async fn join_groups_by_policy( + groups: &std::sync::Arc>, + tracker_opt: &Option>>>>>, + policy: GroupAwaitPolicy, + logs: &std::sync::Arc>>, + opts: &DriveOptions, + node_id: &str, + ctx: &std::sync::Arc>, +) { + if let Some(tracker_arc) = tracker_opt.as_ref() { + let mut tracker = tracker_arc.lock().await; + let group_ids: Vec = groups + .iter() + .filter(|(_, g)| g.await_policy == policy) + .map(|(k, _)| k.clone()) + .collect(); + for gid in group_ids { + if let Some(handles) = tracker.get_mut(&gid) { + let count = handles.len(); + for h in handles.drain(..) { let _ = h.await; } + push_and_emit(logs, opts, node_id, ctx, format!("join group: {} done ({} tasks)", gid, count)).await; + info!(target = "udmin.flow", "join group: {} done ({} tasks)", gid, count); + } } } } -impl std::error::Error for RhaiExecError {} - -pub(crate) fn eval_rhai_expr_json(expr: &str, ctx: &serde_json::Value) -> Result { - // 构造作用域并注入 ctx - let mut scope = rhai::Scope::new(); - let dyn_ctx = match rhai::serde::to_dynamic(ctx.clone()) { Ok(d) => d, Err(_) => rhai::Dynamic::UNIT }; - scope.push_dynamic("ctx", dyn_ctx); - - // 先从缓存读取 AST;未命中则编译并写入缓存,然后执行 - let cached = AST_CACHE.with(|c| c.borrow().get(expr).cloned()); - let eval = |ast: &AST, scope: &mut rhai::Scope| -> Result { - RHIA_ENGINE.with(|eng| { - eng.borrow() - .eval_ast_with_scope::(scope, ast) - .map_err(|e| RhaiExecError::Runtime { message: e.to_string() }) - .and_then(|d| rhai::serde::from_dynamic(&d).map_err(|e| RhaiExecError::Serde { message: e.to_string() })) - }) - }; - - if let Some(ast) = cached { - return eval(&ast, &mut scope); - } - - let compiled = RHIA_ENGINE.with(|eng| eng.borrow().compile_with_scope(&mut scope, expr)); - match compiled { - Ok(ast) => { - AST_CACHE.with(|c| { - let mut cache = c.borrow_mut(); - if cache.len() > 1024 { cache.clear(); } - cache.insert(expr.to_string(), ast.clone()); - }); - eval(&ast, &mut scope) +/// 在离开组时按组 ID 进行等待,用于分支退出屏障 +async fn join_groups_by_ids( + groups: &std::sync::Arc>, + tracker_opt: &Option>>>>>, + group_ids: &[String], + logs: &std::sync::Arc>>, + opts: &DriveOptions, + node_id: &str, + ctx: &std::sync::Arc>, +) { + if let Some(tracker_arc) = tracker_opt.as_ref() { + let mut tracker = tracker_arc.lock().await; + for gid in group_ids { + if matches!(groups.get(gid).map(|g| g.await_policy.clone()), Some(GroupAwaitPolicy::BranchExit)) { + if let Some(handles) = tracker.get_mut(gid) { + let count = handles.len(); + for h in handles.drain(..) { let _ = h.await; } + push_and_emit(logs, opts, node_id, ctx, format!("join group: {} done ({} tasks)", gid, count)).await; + info!(target = "udmin.flow", "join group: {} done ({} tasks)", gid, count); + } + } } - Err(e) => Err(RhaiExecError::Compile { message: e.to_string() }), } } +// ---------- FlowEngine impl ---------- impl FlowEngine { pub fn new(tasks: TaskRegistry) -> Self { Self { tasks } } pub fn builder() -> FlowEngineBuilder { FlowEngineBuilder::default() } + /// 驱动流程:构建图与共享上下文,递归执行节点,支持并发与合流 pub async fn drive(&self, chain: &ChainDef, ctx: serde_json::Value, opts: DriveOptions) -> anyhow::Result<(serde_json::Value, Vec)> { - // 1) 选取起点:优先 Start;否则入度为 0;再否则第一个节点 - // 查找 start:优先 Start 节点;否则选择入度为 0 的第一个节点;再否则回退第一个节点 - let start = if let Some(n) = chain - .nodes - .iter() - .find(|n| matches!(n.kind, NodeKind::Start)) - { + // 总体思路:选起点 → 构建邻接与组信息 → 初始化共享状态 → 调用递归驱动 + // 1) 选起点(Start 优先) + let start = if let Some(n) = chain.nodes.iter().find(|n| matches!(n.kind, NodeKind::Start)) { n.id.0.clone() } else { - // 计算入度 let mut indeg: HashMap<&str, usize> = HashMap::new(); for n in &chain.nodes { indeg.entry(n.id.0.as_str()).or_insert(0); } for l in &chain.links { *indeg.entry(l.to.0.as_str()).or_insert(0) += 1; } if let Some(n) = chain.nodes.iter().find(|n| indeg.get(n.id.0.as_str()).copied().unwrap_or(0) == 0) { n.id.0.clone() } else { - chain - .nodes - .first() - .ok_or_else(|| anyhow::anyhow!("empty chain"))? - .id - .0 - .clone() + chain.nodes.first().ok_or_else(|| anyhow::anyhow!("empty chain"))?.id.0.clone() } }; - // 2) 构建可并发共享的数据结构 - // 拷贝节点与边(保持原有顺序)到拥有所有权的 HashMap,供并发分支安全使用 + // 2) 构建数据结构(Arc 包裹,便于并发使用) let node_map_owned: HashMap = chain.nodes.iter().map(|n| (n.id.0.clone(), n.clone())).collect(); let mut adj_owned: HashMap> = HashMap::new(); for l in &chain.links { adj_owned.entry(l.from.0.clone()).or_default().push(l.clone()); } let node_map = std::sync::Arc::new(node_map_owned); let adj = std::sync::Arc::new(adj_owned); - // 共享上下文(允许并发修改,程序端不做冲突校验) let shared_ctx = std::sync::Arc::new(RwLock::new(ctx)); - // 共享日志聚合 let logs_shared = std::sync::Arc::new(Mutex::new(Vec::::new())); - // 3) 并发驱动从起点开始 - let tasks = self.tasks.clone(); - drive_from(tasks, node_map.clone(), adj.clone(), start, shared_ctx.clone(), opts.clone(), logs_shared.clone()).await?; + // groups & node->groups + let groups_owned: std::collections::HashMap = chain.groups.clone(); + let groups = std::sync::Arc::new(groups_owned); + + let mut node_groups_map: std::collections::HashMap> = std::collections::HashMap::new(); + for (gid, g) in groups.as_ref().iter() { + for mid in &g.members { node_groups_map.entry(mid.clone()).or_default().push(gid.clone()); } + } + let node_groups = std::sync::Arc::new(node_groups_map); + + // group depths + fn group_depth<'a>(gid: &str, groups: &'a std::collections::HashMap) -> usize { + let mut depth = 0usize; + let mut cur = gid; + while let Some(g) = groups.get(cur) { + if let Some(pid) = g.parent_id.as_deref() { depth += 1; cur = pid; } else { break; } + } + depth + } + let mut depths: std::collections::HashMap = std::collections::HashMap::new(); + for gid in groups.as_ref().keys() { depths.insert(gid.clone(), group_depth(gid, groups.as_ref())); } + let group_depths = std::sync::Arc::new(depths); + + // in-degrees + let mut indeg_owned: HashMap = HashMap::new(); + for n in &chain.nodes { indeg_owned.insert(n.id.0.clone(), 0); } + for l in &chain.links { *indeg_owned.entry(l.to.0.clone()).or_insert(0) += 1; } + let in_degrees = std::sync::Arc::new(indeg_owned); + + // executed merge set & arrivals map + let executed_merge = std::sync::Arc::new(tokio::sync::Mutex::new(HashSet::::new())); + let arrivals = std::sync::Arc::new(tokio::sync::Mutex::new(HashMap::::new())); + + // 4) 发起驱动 + let tasks = self.tasks.clone(); + let mut opts2 = opts.clone(); + if opts2.async_groups.is_none() { + opts2.async_groups = Some(std::sync::Arc::new(tokio::sync::Mutex::new(std::collections::HashMap::new()))); + } + if opts2.group_semaphores.is_none() { + match opts2.execution_mode { + ExecutionMode::AsyncQueued | ExecutionMode::AsyncBounded => { + opts2.group_semaphores = Some(std::sync::Arc::new(tokio::sync::Mutex::new(std::collections::HashMap::new()))); + } + _ => {} + } + } + + drive_from( + tasks, + node_map.clone(), + adj.clone(), + groups.clone(), + node_groups.clone(), + group_depths.clone(), + in_degrees.clone(), + executed_merge.clone(), + arrivals.clone(), + start, + shared_ctx.clone(), + opts2.clone(), + logs_shared.clone(), + ).await?; - // 4) 汇总返回 let final_ctx = { shared_ctx.read().await.clone() }; let logs = { logs_shared.lock().await.clone() }; Ok((final_ctx, logs)) } } -// 从指定节点开始驱动,遇到多条满足条件的边时: -// - 第一条在当前任务内继续 -// - 其余分支并行 spawn,等待全部分支执行完毕后返回 +// ---------- 递归驱动函数 ---------- +/// 递归驱动子流程:屏障检查、执行模式选择、分支并行与组等待 async fn drive_from( tasks: TaskRegistry, node_map: std::sync::Arc>, adj: std::sync::Arc>>, + groups: std::sync::Arc>, + node_groups: std::sync::Arc>>, + group_depths: std::sync::Arc>, + in_degrees: std::sync::Arc>, + executed_merge: std::sync::Arc>>, + arrivals: std::sync::Arc>>, start: String, - ctx: std::sync::Arc>, // 共享上下文(并发写入通过写锁串行化,不做冲突校验) + ctx: std::sync::Arc>, opts: DriveOptions, logs: std::sync::Arc>>, ) -> anyhow::Result<()> { @@ -248,43 +240,53 @@ async fn drive_from( loop { if steps >= opts.max_steps { break; } steps += 1; - // 读取节点 + let node = match node_map.get(¤t) { Some(n) => n, None => break }; - // 进入节点:打点 let node_id_str = node.id.0.clone(); let node_start = Instant::now(); - // 进入节点前记录当前日志长度,便于节点结束时做切片 let pre_len = { logs.lock().await.len() }; - // 在每次追加日志时同步发送一条增量 SSE 事件(仅 1 行日志),以提升实时性 - // push_and_emit: - // - 先将单行日志 push 到共享日志 - // - 若存在 SSE 通道,截取上下文快照并发送单行增量事件 - async fn push_and_emit( - logs: &std::sync::Arc>>, - opts: &super::context::DriveOptions, - node_id: &str, - ctx: &std::sync::Arc>, - msg: String, - ) { - { - let mut lg = logs.lock().await; - lg.push(msg.clone()); - } - if let Some(tx) = opts.event_tx.as_ref() { - let ctx_snapshot = { ctx.read().await.clone() }; - crate::middlewares::sse::emit_node(&tx, node_id.to_string(), vec![msg], ctx_snapshot).await; - } - } - // enter 节点也实时推送 + + // 进入节点 push_and_emit(&logs, &opts, &node_id_str, &ctx, format!("enter node: {}", node.id.0)).await; info!(target: "udmin.flow", "enter node: {}", node.id.0); - // 执行任务 + // 合流屏障处理(多前驱且不在组内) + // 设计意图:在未全部到达前阻止抢跑,保证后继节点的输入完整 + let indeg = in_degrees.get(&node_id_str).copied().unwrap_or(0); + let in_any_group = node_groups.get(&node_id_str).map(|v| !v.is_empty()).unwrap_or(false); + if indeg > 1 && !in_any_group { + let arrived_cnt = { + let arr = arrivals.lock().await; + arr.get(&node_id_str).copied().unwrap_or(0) + }; + if arrived_cnt < indeg { + push_and_emit(&logs, &opts, &node_id_str, &ctx, format!("skip node (merge barrier not ready): {} {}/{}", node_id_str, arrived_cnt, indeg)).await; + info!(target: "udmin.flow", "skip node (merge barrier not ready): {} {}/{}", node_id_str, arrived_cnt, indeg); + break; + } else { + let mut mark = executed_merge.lock().await; + if mark.contains(&node_id_str) { + push_and_emit(&logs, &opts, &node_id_str, &ctx, format!("skip node (merge dedup): {}", node_id_str)).await; + info!(target: "udmin.flow", "skip node (merge dedup): {}", node_id_str); + break; + } else { + mark.insert(node_id_str.clone()); + } + } + } + + // 执行任务(若有) if let Some(task_name) = &node.task { if let Some(task) = tasks.get(task_name) { - match opts.execution_mode { + let in_groups = node_groups.get(&node_id_str).map(|v| !v.is_empty()).unwrap_or(false); + let node_exec_mode = match opts.execution_mode { + ExecutionMode::Sync => ExecutionMode::Sync, + ExecutionMode::AsyncFireAndForget => if in_groups { ExecutionMode::AsyncFireAndForget } else { ExecutionMode::Sync }, + ExecutionMode::AsyncQueued | ExecutionMode::AsyncBounded => if in_groups { ExecutionMode::AsyncFireAndForget } else { ExecutionMode::Sync }, + }; + + match node_exec_mode { ExecutionMode::Sync => { - // 使用快照执行,结束后整体写回(允许最后写入覆盖并发修改;程序端不做冲突校验) let mut local_ctx = { ctx.read().await.clone() }; match task.execute(&node.id, node, &mut local_ctx).await { Ok(_) => { @@ -295,15 +297,13 @@ async fn drive_from( Err(e) => { let err_msg = format!("task error: {}: {}", task_name, e); push_and_emit(&logs, &opts, &node_id_str, &ctx, err_msg.clone()).await; - // 捕获快照并返回 DriveError let ctx_snapshot = { ctx.read().await.clone() }; let logs_snapshot = { logs.lock().await.clone() }; return Err(anyhow::Error::new(DriveError { node_id: node_id_str.clone(), ctx: ctx_snapshot, logs: logs_snapshot, message: err_msg })); } } } - ExecutionMode::AsyncFireAndForget => { - // fire-and-forget:基于快照执行,不写回共享 ctx(变量任务除外:做有界差异写回) + ExecutionMode::AsyncFireAndForget | ExecutionMode::AsyncQueued | ExecutionMode::AsyncBounded => { let task_ctx = { ctx.read().await.clone() }; let task_arc = task.clone(); let name_for_log = task_name.clone(); @@ -312,13 +312,114 @@ async fn drive_from( let logs_clone = logs.clone(); let ctx_clone = ctx.clone(); let event_tx_opt = opts.event_tx.clone(); - tokio::spawn(async move { + + let sems_opt = opts.group_semaphores.clone(); + let exec_mode_copy = opts.execution_mode.clone(); + let bounded_limit = opts.bounded_limit; + let deepest_gid_for_async = node_groups.get(&node_id_str).and_then(|gids| { + if gids.is_empty() { None } else { gids.iter().max_by_key(|gid| group_depths.get(*gid).copied().unwrap_or(0)).cloned() } + }); + + let handle: tokio::task::JoinHandle<()> = tokio::spawn(async move { + if let Some(gid) = deepest_gid_for_async.clone() { + if matches!(exec_mode_copy, ExecutionMode::AsyncQueued | ExecutionMode::AsyncBounded) { + if let Some(sems_arc) = sems_opt.as_ref() { + let sem = { + let mut map = sems_arc.lock().await; + use std::collections::hash_map::Entry; + match map.entry(gid.clone()) { + Entry::Occupied(e) => e.get().clone(), + Entry::Vacant(v) => { + let limit = match exec_mode_copy { ExecutionMode::AsyncQueued => 1, ExecutionMode::AsyncBounded => bounded_limit.unwrap_or(2), _ => 1 }; + let s = std::sync::Arc::new(Semaphore::new(limit)); + v.insert(s.clone()); + s + } + } + }; + let _permit = sem.acquire_owned().await.unwrap(); + let mut c = task_ctx.clone(); + let _ = task_arc.execute(&node_id, &node_def, &mut c).await; + if node_def.task.as_deref() == Some("variable") { + let mut changed: Vec<(String, serde_json::Value)> = Vec::new(); + if let (serde_json::Value::Object(before_map), serde_json::Value::Object(after_map)) = (&task_ctx, &c) { + for (k, v_after) in after_map.iter() { + if k == "nodes" { continue; } + match before_map.get(k) { + Some(v_before) if v_before == v_after => {} + _ => changed.push((k.clone(), v_after.clone())), + } + } + } + { + let mut w = ctx_clone.write().await; + if let serde_json::Value::Object(map) = &mut *w { + for (k, v) in changed.into_iter() { map.insert(k, v); } + if let Some(serde_json::Value::Object(nodes)) = map.get_mut("nodes") { + nodes.remove(node_id.0.as_str()); + } + } + } + { + let mut lg = logs_clone.lock().await; + lg.push(format!("exec task done (async): {} (writeback variable)", name_for_log)); + } + if let Some(tx) = event_tx_opt.as_ref() { + let ctx_snapshot = { ctx_clone.read().await.clone() }; + crate::middlewares::sse::emit_node(&tx, node_id.0.clone(), vec![format!("exec task done (async): {} (writeback variable)", name_for_log)], ctx_snapshot).await; + } + } else if matches!(node_def.task.as_deref(), Some("http") | Some("db")) { + { + let mut w = ctx_clone.write().await; + if let serde_json::Value::Object(map) = &mut *w { + let after_node = c.get("nodes").and_then(|v| v.get(node_id.0.as_str())).cloned(); + if let Some(serde_json::Value::Object(new_node_obj)) = after_node { + if let Some(serde_json::Value::Object(nodes)) = map.get_mut("nodes") { + nodes.insert(node_id.0.clone(), serde_json::Value::Object(new_node_obj)); + } else { + let mut nodes_obj = serde_json::Map::new(); + nodes_obj.insert(node_id.0.clone(), serde_json::Value::Object(new_node_obj)); + map.insert("nodes".to_string(), serde_json::Value::Object(nodes_obj)); + } + } + for key in ["http_response", "db_response"] { + let before = task_ctx.get(key); + let after = c.get(key); + if let Some(v_after) = after { + if before != after { + map.insert(key.to_string(), v_after.clone()); + } + } + } + } + } + { + let mut lg = logs_clone.lock().await; + lg.push(format!("exec task done (async): {} (writeback outputs)", name_for_log)); + } + if let Some(tx) = event_tx_opt.as_ref() { + let ctx_snapshot = { ctx_clone.read().await.clone() }; + crate::middlewares::sse::emit_node(&tx, node_id.0.clone(), vec![format!("exec task done (async): {} (writeback outputs)", name_for_log)], ctx_snapshot).await; + } + } else { + { + let mut lg = logs_clone.lock().await; + lg.push(format!("exec task done (async): {}", name_for_log)); + } + if let Some(tx) = event_tx_opt.as_ref() { + let ctx_snapshot = { ctx_clone.read().await.clone() }; + crate::middlewares::sse::emit_node(&tx, node_id.0.clone(), vec![format!("exec task done (async): {}", name_for_log)], ctx_snapshot).await; + } + } + return; + } + } + } let mut c = task_ctx.clone(); let _ = task_arc.execute(&node_id, &node_def, &mut c).await; - // 对 variable 任务执行写回:将顶层新增/修改的键写回共享 ctx,并移除对应 variable 节点 + // variable 任务差异写回 if node_def.task.as_deref() == Some("variable") { - // 计算顶层差异(排除 nodes),仅在不同或新增时写回 let mut changed: Vec<(String, serde_json::Value)> = Vec::new(); if let (serde_json::Value::Object(before_map), serde_json::Value::Object(after_map)) = (&task_ctx, &c) { for (k, v_after) in after_map.iter() { @@ -342,18 +443,53 @@ async fn drive_from( let mut lg = logs_clone.lock().await; lg.push(format!("exec task done (async): {} (writeback variable)", name_for_log)); } - // 实时推送异步完成日志 if let Some(tx) = event_tx_opt.as_ref() { let ctx_snapshot = { ctx_clone.read().await.clone() }; crate::middlewares::sse::emit_node(&tx, node_id.0.clone(), vec![format!("exec task done (async): {} (writeback variable)", name_for_log)], ctx_snapshot).await; } info!(target: "udmin.flow", "exec task done (async): {} (writeback variable)", name_for_log); + } else if matches!(node_def.task.as_deref(), Some("http") | Some("db")) { + // http/db 写回策略 + { + let mut w = ctx_clone.write().await; + if let serde_json::Value::Object(map) = &mut *w { + let after_node = c.get("nodes").and_then(|v| v.get(node_id.0.as_str())).cloned(); + if let Some(serde_json::Value::Object(new_node_obj)) = after_node { + if let Some(serde_json::Value::Object(nodes)) = map.get_mut("nodes") { + nodes.insert(node_id.0.clone(), serde_json::Value::Object(new_node_obj)); + } else { + // 写回到 ctx.nodes.:记录任务类型与响应体 + let mut nodes_obj = serde_json::Map::new(); + nodes_obj.insert(node_id.0.clone(), serde_json::Value::Object(new_node_obj)); + map.insert("nodes".to_string(), serde_json::Value::Object(nodes_obj)); + } + } + // 若顶层 http_response/db_response 发生变化,选择性写回共享 ctx + for key in ["http_response", "db_response"] { + let before = task_ctx.get(key); + let after = c.get(key); + if let Some(v_after) = after { + if before != after { + map.insert(key.to_string(), v_after.clone()); + } + } + } + } + } + { + let mut lg = logs_clone.lock().await; + lg.push(format!("exec task done (async): {} (writeback outputs)", name_for_log)); + } + if let Some(tx) = event_tx_opt.as_ref() { + let ctx_snapshot = { ctx_clone.read().await.clone() }; + crate::middlewares::sse::emit_node(&tx, node_id.0.clone(), vec![format!("exec task done (async): {} (writeback outputs)", name_for_log)], ctx_snapshot).await; + } + info!(target: "udmin.flow", "exec task done (async): {} (writeback outputs)", name_for_log); } else { { let mut lg = logs_clone.lock().await; lg.push(format!("exec task done (async): {}", name_for_log)); } - // 实时推送异步完成日志 if let Some(tx) = event_tx_opt.as_ref() { let ctx_snapshot = { ctx_clone.read().await.clone() }; crate::middlewares::sse::emit_node(&tx, node_id.0.clone(), vec![format!("exec task done (async): {}", name_for_log)], ctx_snapshot).await; @@ -361,8 +497,28 @@ async fn drive_from( info!(target: "udmin.flow", "exec task done (async): {}", name_for_log); } }); - push_and_emit(&logs, &opts, &node_id_str, &ctx, format!("spawn task: {} (async)", task_name)).await; - info!(target: "udmin.flow", "spawn task: {} (async)", task_name); + + push_and_emit(&logs, &opts, &node_id_str, &ctx, format!("spawn task: {} (async, by group)", task_name)).await; + info!(target: "udmin.flow", "spawn task: {} (async, by group)", task_name); + + // track join handle into group tracker(最内层组) + if let Some(gids) = node_groups.get(&node_id_str) { + if !gids.is_empty() { + let deepest = gids.iter().max_by_key(|gid| group_depths.get(*gid).copied().unwrap_or(0)).cloned(); + if let Some(gid) = deepest { + if let Some(tracker_arc) = opts.async_groups.as_ref() { + let mut tracker = tracker_arc.lock().await; + use std::collections::hash_map::Entry; + match tracker.entry(gid.clone()) { + Entry::Occupied(mut e) => { e.get_mut().push(handle); } + Entry::Vacant(e) => { e.insert(vec![handle]); } + } + push_and_emit(&logs, &opts, &node_id_str, &ctx, format!("track async task in group: {}", gid)).await; + info!(target: "udmin.flow", "track async task in group: {}", gid); + } + } + } + } } } } else { @@ -371,11 +527,12 @@ async fn drive_from( } } - // End 节点:记录耗时后结束 + // End 节点 if matches!(node.kind, NodeKind::End) { let duration = node_start.elapsed().as_millis(); push_and_emit(&logs, &opts, &node_id_str, &ctx, format!("leave node: {} {} ms", node_id_str, duration)).await; info!(target: "udmin.flow", "leave node: {} {} ms", node_id_str, duration); + join_groups_by_policy(&groups, &opts.async_groups, GroupAwaitPolicy::FlowEnd, &logs, &opts, &node_id_str, &ctx).await; if let Some(tx) = opts.event_tx.as_ref() { let node_logs = { let lg = logs.lock().await; lg[pre_len..].to_vec() }; let ctx_snapshot = { ctx.read().await.clone() }; @@ -384,18 +541,17 @@ async fn drive_from( break; } - // 选择下一批 link:仅在 Condition 节点上评估条件;其他节点忽略条件,直接沿第一条边前进 + // 选择后继 let mut nexts: Vec = Vec::new(); if let Some(links) = adj.get(node.id.0.as_str()) { if matches!(node.kind, NodeKind::Condition) { - // 条件边:全部评估为真者加入 nexts(空字符串条件视为无条件,不在此处评估) for link in links.iter() { if let Some(cond_str) = &link.condition { if cond_str.trim().is_empty() { - // 空条件:视为无条件边,留待后续回退逻辑处理 info!(target: "udmin.flow", from=%node.id.0, to=%link.to.0, "condition link: empty (unconditional candidate)"); continue; } + // 条件:以 '{' 或 '[' 开头按 JSON 规则;否则按 Rhai 表达式 let trimmed = cond_str.trim_start(); let (kind, ok) = if trimmed.starts_with('{') || trimmed.starts_with('[') { match serde_json::from_str::(cond_str) { @@ -412,11 +568,9 @@ async fn drive_from( info!(target: "udmin.flow", from=%node.id.0, to=%link.to.0, cond_kind=%kind, cond_len=%cond_str.len(), result=%ok, "condition link evaluated"); if ok { nexts.push(link.to.0.clone()); } } else { - // 无 condition 字段:视为无条件边 info!(target: "udmin.flow", from=%node.id.0, to=%link.to.0, "condition link: none (unconditional candidate)"); } } - // 若没有命中条件边,则取第一条无条件边(无条件 = 无 condition 或 空字符串) if nexts.is_empty() { let mut picked = None; for link in links.iter() { @@ -434,7 +588,6 @@ async fn drive_from( } } } else { - // 非条件节点:忽略条件,fan-out 所有出边(全部并行执行) for link in links.iter() { nexts.push(link.to.0.clone()); info!(target: "udmin.flow", from=%node.id.0, to=%link.to.0, "fan-out from non-condition node"); @@ -442,7 +595,7 @@ async fn drive_from( } } - // 无后继:记录耗时后结束 + // 无后继 -> 结束 if nexts.is_empty() { let duration = node_start.elapsed().as_millis(); { @@ -459,8 +612,18 @@ async fn drive_from( break; } - // 单分支:记录耗时后前进 + // 单分支 if nexts.len() == 1 { + let next_id = nexts[0].clone(); + let cur_groups = node_groups.get(&node_id_str).cloned().unwrap_or_default(); + let next_groups = node_groups.get(&next_id).cloned().unwrap_or_default(); + let mut exit_gids: Vec = Vec::new(); + if !cur_groups.is_empty() { + let next_set: HashSet = next_groups.into_iter().collect(); + for gid in cur_groups.into_iter() { if !next_set.contains(&gid) { exit_gids.push(gid); } } + } + if !exit_gids.is_empty() { join_groups_by_ids(&groups, &opts.async_groups, &exit_gids, &logs, &opts, &node_id_str, &ctx).await; } + let duration = node_start.elapsed().as_millis(); { let mut lg = logs.lock().await; @@ -472,26 +635,44 @@ async fn drive_from( let ctx_snapshot = { ctx.read().await.clone() }; crate::middlewares::sse::emit_node(&tx, node_id_str.clone(), node_logs, ctx_snapshot).await; } - current = nexts.remove(0); + + // 标记到达(合流计数) + { + let mut arr = arrivals.lock().await; + let ent = arr.entry(next_id.clone()).or_insert(0); + *ent += 1; + } + current = next_id; continue; } - // 多分支:主分支沿第一条继续,其余分支并行执行并等待完成 + // 多分支:主分支继续,其他并行 let mut futs = Vec::new(); for to_id in nexts.iter().skip(1).cloned() { let tasks_c = tasks.clone(); let node_map_c = node_map.clone(); let adj_c = adj.clone(); + let groups_c = groups.clone(); + let node_groups_c = node_groups.clone(); + let group_depths_c = group_depths.clone(); + let in_degrees_c = in_degrees.clone(); + let executed_merge_c = executed_merge.clone(); + let arrivals_c = arrivals.clone(); let ctx_c = ctx.clone(); let opts_c = opts.clone(); let logs_c = logs.clone(); - futs.push(drive_from(tasks_c, node_map_c, adj_c, to_id, ctx_c, opts_c, logs_c)); + futs.push(drive_from(tasks_c, node_map_c, adj_c, groups_c, node_groups_c, group_depths_c, in_degrees_c, executed_merge_c, arrivals_c, to_id, ctx_c, opts_c, logs_c)); } - // 当前分支继续第一条 + + // 当前分支继续第一个 current = nexts.into_iter().next().unwrap(); - // 在一个安全点等待已分支的完成(这里选择在下一轮进入前等待) + // 等待并分支完成 let _ = join_all(futs).await; - // 多分支:记录当前节点耗时(包含等待其他分支完成的时间) + + // branch_exit group join + join_groups_by_policy(&groups, &opts.async_groups, GroupAwaitPolicy::BranchExit, &logs, &opts, &node_id_str, &ctx).await; + + // 记录离开(包含等待时间) let duration = node_start.elapsed().as_millis(); { let mut lg = logs.lock().await; @@ -508,28 +689,26 @@ async fn drive_from( Ok(()) } -#[derive(Default)] -pub struct FlowEngineBuilder { - tasks: Option, -} impl FlowEngineBuilder { + /// 指定任务注册表 pub fn tasks(mut self, reg: TaskRegistry) -> Self { self.tasks = Some(reg); self } + /// 构建引擎,若未指定则使用默认注册表 pub fn build(self) -> FlowEngine { let tasks = self.tasks.unwrap_or_else(|| crate::flow::task::default_registry()); FlowEngine { tasks } } } +/// 使用默认任务注册表构建引擎 impl Default for FlowEngine { fn default() -> Self { Self { tasks: crate::flow::task::default_registry() } } } - +/// 友好展示:以 message 字段为主进行字符串输出 impl std::fmt::Display for DriveError { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!(f, "{}", self.message) } } - impl std::error::Error for DriveError {} \ No newline at end of file diff --git a/backend/src/flow/executors/condition.rs b/backend/src/flow/executors/condition.rs index 26b3848..f2bd3d4 100644 --- a/backend/src/flow/executors/condition.rs +++ b/backend/src/flow/executors/condition.rs @@ -166,7 +166,7 @@ pub(crate) fn resolve_value(ctx: &serde_json::Value, v: &serde_json::Value) -> R "expression" => { let expr = v.get("content").and_then(|x| x.as_str()).unwrap_or(""); if expr.trim().is_empty() { return Ok(V::Null); } - Ok(crate::flow::engine::eval_rhai_expr_json(expr, ctx).unwrap_or_else(|_| V::Null)) + Ok(crate::flow::executors::script_rhai::eval_rhai_expr_json(expr, ctx).unwrap_or_else(|_| V::Null)) } _ => Ok(V::Null), } diff --git a/backend/src/flow/executors/script_rhai.rs b/backend/src/flow/executors/script_rhai.rs index 5843130..b7c6d90 100644 --- a/backend/src/flow/executors/script_rhai.rs +++ b/backend/src/flow/executors/script_rhai.rs @@ -6,20 +6,142 @@ use tracing::{debug, info}; use anyhow::anyhow; use crate::flow::domain::{NodeDef, NodeId}; -use crate::flow::engine::eval_rhai_expr_json; use crate::flow::task::Executor; +// ---- Rhai 引擎与表达式评估(迁移自 engine.rs) ---- +use std::cell::RefCell; +use std::collections::HashMap; +use regex::Regex; +use rhai::{AST, Engine}; + +const AST_CACHE_LIMIT: usize = 1024; + +fn contains(s: &str, sub: &str) -> bool { s.contains(sub) } +fn starts_with(s: &str, prefix: &str) -> bool { s.starts_with(prefix) } +fn ends_with(s: &str, suffix: &str) -> bool { s.ends_with(suffix) } + +fn is_empty(v: rhai::Dynamic) -> bool { + if v.is_unit() { return true; } + if let Some(s) = v.clone().try_cast::() { return s.is_empty(); } + if let Some(a) = v.clone().try_cast::() { return a.is_empty(); } + if let Some(m) = v.clone().try_cast::() { return m.is_empty(); } + false +} +fn not_empty(v: rhai::Dynamic) -> bool { !is_empty(v) } + +thread_local! { + static RHAI_ENGINE: RefCell = RefCell::new({ + let mut eng = Engine::new(); + eng.set_max_operations(10_000_000); + eng.set_strict_variables(true); + eng.register_fn("regex_match", regex_match); + eng.register_fn("contains", contains); + eng.register_fn("starts_with", starts_with); + eng.register_fn("ends_with", ends_with); + eng.register_fn("is_empty", is_empty); + eng.register_fn("not_empty", not_empty); + eng + }); + + static AST_CACHE: RefCell> = RefCell::new(HashMap::new()); + static REGEX_CACHE: RefCell> = RefCell::new(HashMap::new()); +} + +/// 正则匹配:带线程本地缓存 +fn regex_match(s: &str, pat: &str) -> bool { + let compiled = REGEX_CACHE.with(|c| { + let mut m = c.borrow_mut(); + if let Some(re) = m.get(pat) { return Some(re.clone()); } + match Regex::new(pat) { + Ok(re) => { m.insert(pat.to_string(), re.clone()); Some(re) } + Err(_) => None, + } + }); + compiled.map(|re| re.is_match(s)).unwrap_or(false) +} + +#[derive(Debug, Clone)] +pub enum RhaiExecError { + Compile { message: String }, + Runtime { message: String }, + Serde { message: String }, +} + +impl std::fmt::Display for RhaiExecError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + RhaiExecError::Compile { message } => write!(f, "compile error: {}", message), + RhaiExecError::Runtime { message } => write!(f, "runtime error: {}", message), + RhaiExecError::Serde { message } => write!(f, "serde error: {}", message), + } + } +} +impl std::error::Error for RhaiExecError {} + +/// 评估 Rhai 表达式(bool 版本),出错时返回 false +pub(crate) fn eval_rhai_expr_bool(expr: &str, ctx: &serde_json::Value) -> bool { + let mut scope = rhai::Scope::new(); + let dyn_ctx = match rhai::serde::to_dynamic(ctx.clone()) { Ok(d) => d, Err(_) => rhai::Dynamic::UNIT }; + scope.push_dynamic("ctx", dyn_ctx); + + if let Some(ast) = AST_CACHE.with(|c| c.borrow().get(expr).cloned()) { + return RHAI_ENGINE.with(|eng| eng.borrow().eval_ast_with_scope::(&mut scope, &ast).unwrap_or(false)); + } + + match RHAI_ENGINE.with(|eng| eng.borrow().compile_with_scope(&mut scope, expr)) { + Ok(ast) => { + AST_CACHE.with(|c| { + let mut cache = c.borrow_mut(); + if cache.len() > AST_CACHE_LIMIT { cache.clear(); } + cache.insert(expr.to_string(), ast.clone()); + }); + RHAI_ENGINE.with(|eng| eng.borrow().eval_ast_with_scope::(&mut scope, &ast).unwrap_or(false)) + } + Err(_) => false, + } +} + +/// 评估 Rhai 表达式并返回 serde_json::Value +pub(crate) fn eval_rhai_expr_json(expr: &str, ctx: &serde_json::Value) -> Result { + let mut scope = rhai::Scope::new(); + let dyn_ctx = match rhai::serde::to_dynamic(ctx.clone()) { Ok(d) => d, Err(e) => { + return Err(RhaiExecError::Serde { message: e.to_string() }); + } }; + scope.push_dynamic("ctx", dyn_ctx); + + let eval = |ast: &AST, scope: &mut rhai::Scope| -> Result { + RHAI_ENGINE.with(|eng| { + eng.borrow() + .eval_ast_with_scope::(scope, ast) + .map_err(|e| RhaiExecError::Runtime { message: e.to_string() }) + .and_then(|d| rhai::serde::from_dynamic(&d).map_err(|e| RhaiExecError::Serde { message: e.to_string() })) + }) + }; + + if let Some(ast) = AST_CACHE.with(|c| c.borrow().get(expr).cloned()) { + return eval(&ast, &mut scope); + } + + match RHAI_ENGINE.with(|eng| eng.borrow().compile_with_scope(&mut scope, expr)) { + Ok(ast) => { + AST_CACHE.with(|c| { + let mut cache = c.borrow_mut(); + if cache.len() > AST_CACHE_LIMIT { cache.clear(); } + cache.insert(expr.to_string(), ast.clone()); + }); + eval(&ast, &mut scope) + } + Err(e) => Err(RhaiExecError::Compile { message: e.to_string() }), + } +} + #[derive(Default)] pub struct ScriptRhaiTask; /// 截断长字符串(去掉换行),用于日志预览 fn truncate_str(s: &str, max: usize) -> String { let s = s.replace(['\n', '\r'], " "); - if s.len() <= max { - s - } else { - format!("{}…", &s[..max]) - } + if s.len() <= max { s } else { format!("{}…", &s[..max]) } } /// 对比两个 JSON(仅浅层),返回 (新增字段, 删除字段, 修改字段) @@ -30,26 +152,16 @@ fn shallow_diff(before: &Value, after: &Value) -> (Vec, Vec, Vec let mut modified = Vec::new(); let (Some(bm), Some(am)) = (before.as_object(), after.as_object()) else { - if before != after { - modified.push("".to_string()); - } + if before != after { modified.push("".to_string()); } return (added, removed, modified); }; let bkeys: BTreeSet<_> = bm.keys().cloned().collect(); let akeys: BTreeSet<_> = am.keys().cloned().collect(); - for k in akeys.difference(&bkeys) { - added.push(k.to_string()); - } - for k in bkeys.difference(&akeys) { - removed.push(k.to_string()); - } - for k in akeys.intersection(&bkeys) { - if bm.get(k) != am.get(k) { - modified.push(k.to_string()); - } - } + for k in akeys.difference(&bkeys) { added.push(k.to_string()); } + for k in bkeys.difference(&akeys) { removed.push(k.to_string()); } + for k in akeys.intersection(&bkeys) { if bm.get(k) != am.get(k) { modified.push(k.to_string()); } } (added, removed, modified) } diff --git a/backend/src/flow/executors/variable.rs b/backend/src/flow/executors/variable.rs index 42c4b82..e381c61 100644 --- a/backend/src/flow/executors/variable.rs +++ b/backend/src/flow/executors/variable.rs @@ -5,7 +5,7 @@ use tracing::info; // crate use crate::flow::domain::{NodeDef, NodeId}; -use crate::flow::engine::eval_rhai_expr_json; +use crate::flow::executors::script_rhai::eval_rhai_expr_json; use crate::flow::task::Executor; #[derive(Default)] diff --git a/backend/src/services/flow_service.rs b/backend/src/services/flow_service.rs index cb81405..36f5847 100644 --- a/backend/src/services/flow_service.rs +++ b/backend/src/services/flow_service.rs @@ -301,6 +301,8 @@ async fn run_internal( .or_else(|| design.get("execution_mode").and_then(|v| v.as_str())) .unwrap_or("sync"); exec_mode = parse_execution_mode(mode_str); + let bounded_limit = design.get("concurrencyLimit").and_then(|v| v.as_u64()).map(|x| x as usize); + let _ = bounded_limit; (chain_from_json, ctx) } else { let dsl = match serde_yaml::from_str::(&doc.yaml) { @@ -344,7 +346,12 @@ async fn run_internal( // 执行 let drive_res = engine - .drive(&chain, ctx, DriveOptions { execution_mode: exec_mode.clone(), event_tx, ..Default::default() }) + .drive(&chain, ctx, DriveOptions { + execution_mode: exec_mode.clone(), + event_tx, + bounded_limit: if matches!(exec_mode, ExecutionMode::AsyncBounded) { design_concurrency_limit(&doc.design_json) } else { None }, + ..Default::default() + }) .await; match drive_res { @@ -446,6 +453,16 @@ fn merge_json(a: &mut serde_json::Value, b: &serde_json::Value) { fn parse_execution_mode(s: &str) -> ExecutionMode { match s.to_ascii_lowercase().as_str() { "async" | "async_fire_and_forget" | "fire_and_forget" => ExecutionMode::AsyncFireAndForget, + "queued" | "queue" => ExecutionMode::AsyncQueued, + "bounded" | "parallel_bounded" | "bounded_parallel" => ExecutionMode::AsyncBounded, _ => ExecutionMode::Sync, } +} + +fn design_concurrency_limit(design_json: &Option) -> Option { + design_json + .as_ref() + .and_then(|d| d.get("concurrencyLimit")) + .and_then(|v| v.as_u64()) + .map(|x| x as usize) } \ No newline at end of file diff --git a/docs/flow/code_js1.json b/docs/flow/code_js1.json new file mode 100644 index 0000000..dc06403 --- /dev/null +++ b/docs/flow/code_js1.json @@ -0,0 +1,336 @@ +{ + "nodes": [ + { + "id": "start_0", + "type": "start", + "meta": { + "position": { + "x": 180, + "y": 189.8 + } + }, + "data": { + "title": "Start", + "outputs": { + "type": "object", + "properties": {}, + "required": [] + } + } + }, + { + "id": "end_0", + "type": "end", + "meta": { + "position": { + "x": 2940, + "y": 189.8 + } + }, + "data": { + "title": "End" + } + }, + { + "id": "variable_zOb3P", + "type": "variable", + "meta": { + "position": { + "x": 2020, + "y": 179.8 + } + }, + "data": { + "title": "Variable_1", + "assign": [ + { + "operator": "declare", + "left": "jss", + "right": { + "type": "constant", + "content": "${\"user_n\"}", + "schema": { + "type": "string" + } + } + } + ], + "outputs": { + "type": "object", + "required": [], + "properties": { + "jss": { + "type": "string" + } + } + } + } + }, + { + "id": "code_y71Sd", + "type": "javascript", + "meta": { + "position": { + "x": 640, + "y": 162.3 + } + }, + "data": { + "title": "JS", + "inputsValues": { + "input": { + "type": "constant", + "content": "", + "schema": { + "type": "string" + }, + "extra": { + "index": 0 + } + } + }, + "script": { + "language": "javascript", + "content": "ctx.vri=\"usertest\"; ctx[\"user_n\"]=\"user_nuser_n\" for (let i = 0; i < 5; i++) { //console.log(`第 ${i} 次循环`); ctx[i]=\"v\"+i; }" + }, + "outputs": { + "type": "object", + "properties": { + "key0": { + "type": "string" + }, + "key1": { + "type": "array", + "items": { + "type": "string" + } + }, + "key2": { + "type": "object", + "properties": { + "key21": { + "type": "string" + } + } + } + } + }, + "inputs": { + "type": "object", + "properties": { + "input": { + "type": "string" + } + } + } + } + }, + { + "id": "http_0IIt-", + "type": "http", + "meta": { + "position": { + "x": 1598.880308880309, + "y": -317.18146718146716 + } + }, + "data": { + "title": "HTTP_1", + "api": { + "method": "GET", + "url": { + "type": "template", + "content": "https://account.aliyun.com" + } + }, + "body": { + "bodyType": "JSON" + }, + "outputs": { + "type": "object", + "properties": { + "body": { + "type": "string" + }, + "headers": { + "type": "object" + }, + "statusCode": { + "type": "integer" + } + } + }, + "timeout": { + "timeout": 10000, + "retryTimes": 1 + } + } + }, + { + "id": "variable_zy_Ae", + "type": "variable", + "meta": { + "position": { + "x": 2480, + "y": 179.8 + } + }, + "data": { + "title": "http响应", + "assign": [ + { + "operator": "declare", + "left": "http_resp", + "right": { + "type": "ref", + "content": [ + "http_0IIt-", + "headers" + ] + } + } + ], + "outputs": { + "type": "object", + "required": [], + "properties": { + "http_resp": { + "type": "object", + "required": [], + "properties": {} + } + } + } + } + }, + { + "id": "code_iyMNK", + "type": "script", + "meta": { + "position": { + "x": 1100, + "y": 162.3 + } + }, + "data": { + "title": "rhai", + "inputsValues": { + "input": { + "type": "constant", + "content": "" + } + }, + "script": { + "language": "rhai", + "content": "// 修改字段 ctx.addr_name = \"Alice\"; ctx.count = 10; // 添加新字段 ctx.city = \"Beijing\"; // 还可以用下标方式 ctx[\"extra\"] = 123;" + }, + "outputs": { + "type": "object", + "properties": { + "key0": { + "type": "string" + }, + "key1": { + "type": "array", + "items": { + "type": "string" + } + }, + "key2": { + "type": "object", + "properties": { + "key21": { + "type": "string" + } + } + } + }, + "required": [] + }, + "inputs": { + "type": "object", + "properties": { + "input": { + "type": "string" + } + } + } + } + }, + { + "id": "db_LMgXg", + "type": "db", + "meta": { + "position": { + "x": 1606.042471042471, + "y": 253.8 + } + }, + "data": { + "title": "DB_1", + "db": { + "sql": { + "type": "template", + "content": "SELECT t.* FROM udmin.departments t" + }, + "params": [], + "outputKey": "db_response", + "connection": { + "driver": "mysql", + "mode": "fields", + "database": "udmin", + "host": "127.0.0.1", + "port": 3306, + "username": "root", + "password": "123456" + }, + "output": { + "mode": "rows" + } + }, + "outputs": { + "type": "object", + "properties": { + "db_response": { + "type": "object" + } + } + } + } + } + ], + "edges": [ + { + "sourceNodeID": "start_0", + "targetNodeID": "code_y71Sd" + }, + { + "sourceNodeID": "variable_zy_Ae", + "targetNodeID": "end_0" + }, + { + "sourceNodeID": "http_0IIt-", + "targetNodeID": "variable_zOb3P" + }, + { + "sourceNodeID": "db_LMgXg", + "targetNodeID": "variable_zOb3P" + }, + { + "sourceNodeID": "variable_zOb3P", + "targetNodeID": "variable_zy_Ae" + }, + { + "sourceNodeID": "code_y71Sd", + "targetNodeID": "code_iyMNK" + }, + { + "sourceNodeID": "code_iyMNK", + "targetNodeID": "http_0IIt-" + }, + { + "sourceNodeID": "code_iyMNK", + "targetNodeID": "db_LMgXg" + } + ] +} \ No newline at end of file diff --git a/docs/flow/code_js1_group.json b/docs/flow/code_js1_group.json new file mode 100644 index 0000000..880779f --- /dev/null +++ b/docs/flow/code_js1_group.json @@ -0,0 +1,355 @@ +{ + "nodes": [ + { + "id": "start_0", + "type": "start", + "meta": { + "position": { + "x": 180, + "y": 189.8 + } + }, + "data": { + "title": "Start", + "outputs": { + "type": "object", + "properties": {}, + "required": [] + } + } + }, + { + "id": "end_0", + "type": "end", + "meta": { + "position": { + "x": 2940, + "y": 189.8 + } + }, + "data": { + "title": "End" + } + }, + { + "id": "variable_zOb3P", + "type": "variable", + "meta": { + "position": { + "x": 2020, + "y": 179.8 + } + }, + "data": { + "title": "Variable_1", + "assign": [ + { + "operator": "declare", + "left": "jss", + "right": { + "type": "constant", + "content": "${\"user_n\"}", + "schema": { + "type": "string" + } + } + } + ], + "outputs": { + "type": "object", + "required": [], + "properties": { + "jss": { + "type": "string" + } + } + } + } + }, + { + "id": "code_y71Sd", + "type": "javascript", + "meta": { + "position": { + "x": 640, + "y": 162.3 + } + }, + "data": { + "title": "JS", + "inputsValues": { + "input": { + "type": "constant", + "content": "", + "schema": { + "type": "string" + }, + "extra": { + "index": 0 + } + } + }, + "script": { + "language": "javascript", + "content": "ctx.vri=\"usertest\"; ctx[\"user_n\"]=\"user_nuser_n\" for (let i = 0; i < 5; i++) { //console.log(`第 ${i} 次循环`); ctx[i]=\"v\"+i; }" + }, + "outputs": { + "type": "object", + "properties": { + "key0": { + "type": "string" + }, + "key1": { + "type": "array", + "items": { + "type": "string" + } + }, + "key2": { + "type": "object", + "properties": { + "key21": { + "type": "string" + } + } + } + } + }, + "inputs": { + "type": "object", + "properties": { + "input": { + "type": "string" + } + } + } + } + }, + { + "id": "variable_zy_Ae", + "type": "variable", + "meta": { + "position": { + "x": 2480, + "y": 179.8 + } + }, + "data": { + "title": "http响应", + "assign": [ + { + "operator": "declare", + "left": "http_resp", + "right": { + "type": "ref", + "content": [ + "http_0IIt-", + "headers" + ] + } + } + ], + "outputs": { + "type": "object", + "required": [], + "properties": { + "http_resp": { + "type": "object", + "required": [], + "properties": {} + } + } + } + } + }, + { + "id": "code_iyMNK", + "type": "script", + "meta": { + "position": { + "x": 1100, + "y": 162.3 + } + }, + "data": { + "title": "rhai", + "inputsValues": { + "input": { + "type": "constant", + "content": "" + } + }, + "script": { + "language": "rhai", + "content": "// 修改字段 ctx.addr_name = \"Alice\"; ctx.count = 10; // 添加新字段 ctx.city = \"Beijing\"; // 还可以用下标方式 ctx[\"extra\"] = 123;" + }, + "outputs": { + "type": "object", + "properties": { + "key0": { + "type": "string" + }, + "key1": { + "type": "array", + "items": { + "type": "string" + } + }, + "key2": { + "type": "object", + "properties": { + "key21": { + "type": "string" + } + } + } + }, + "required": [] + }, + "inputs": { + "type": "object", + "properties": { + "input": { + "type": "string" + } + } + } + } + }, + { + "id": "group_o4dbz", + "type": "group", + "meta": { + "position": { + "x": 0, + "y": 0 + } + }, + "data": { + "color": "Green", + "title": "Group_2", + "parentID": "root", + "blockIDs": [ + "http_0IIt-", + "db_LMgXg" + ] + } + }, + { + "id": "http_0IIt-", + "type": "http", + "meta": { + "position": { + "x": 1598.880308880309, + "y": -317.18146718146716 + } + }, + "data": { + "title": "HTTP_1", + "api": { + "method": "GET", + "url": { + "type": "template", + "content": "https://account.aliyun.com" + } + }, + "body": { + "bodyType": "JSON" + }, + "outputs": { + "type": "object", + "properties": { + "body": { + "type": "string" + }, + "headers": { + "type": "object" + }, + "statusCode": { + "type": "integer" + } + } + }, + "timeout": { + "timeout": 10000, + "retryTimes": 1 + } + } + }, + { + "id": "db_LMgXg", + "type": "db", + "meta": { + "position": { + "x": 1598.880308880309, + "y": 179.8 + } + }, + "data": { + "title": "DB_1", + "db": { + "sql": { + "type": "template", + "content": "SELECT t.* FROM udmin.departments t" + }, + "params": [], + "outputKey": "db_response", + "connection": { + "driver": "mysql", + "mode": "fields", + "database": "udmin", + "host": "127.0.0.1", + "port": 3306, + "username": "root", + "password": "123456" + }, + "output": { + "mode": "rows" + } + }, + "outputs": { + "type": "object", + "properties": { + "db_response": { + "type": "object" + } + } + } + } + } + ], + "edges": [ + { + "sourceNodeID": "start_0", + "targetNodeID": "code_y71Sd" + }, + { + "sourceNodeID": "variable_zy_Ae", + "targetNodeID": "end_0" + }, + { + "sourceNodeID": "http_0IIt-", + "targetNodeID": "variable_zOb3P" + }, + { + "sourceNodeID": "db_LMgXg", + "targetNodeID": "variable_zOb3P" + }, + { + "sourceNodeID": "variable_zOb3P", + "targetNodeID": "variable_zy_Ae" + }, + { + "sourceNodeID": "code_y71Sd", + "targetNodeID": "code_iyMNK" + }, + { + "sourceNodeID": "code_iyMNK", + "targetNodeID": "http_0IIt-" + }, + { + "sourceNodeID": "code_iyMNK", + "targetNodeID": "db_LMgXg" + } + ] +} \ No newline at end of file diff --git a/docs/flow_architecture.md b/docs/flow_architecture.md new file mode 100644 index 0000000..4575936 --- /dev/null +++ b/docs/flow_architecture.md @@ -0,0 +1,227 @@ +# Flow 架构与执行图(udmin) + +## 模块架构 +```mermaid +graph LR + subgraph flow + dsl[flow/dsl] + domain[flow/domain] + context[flow/context] + engine[flow/engine] + task[flow/task] + executors[flow/executors/*] + mappers[flow/mappers] + log_handler[flow/log_handler] + end + + subgraph services + svc_flow[services/flow_service] + svc_logs[services/flow_run_log_service] + end + + subgraph routes + r_flows[routes/flows] + r_run_logs[routes/flow_run_logs] + end + + subgraph middlewares + mw_sse[middlewares/sse] + mw_ws[middlewares/ws] + mw_jwt[middlewares/jwt] + mw_http[middlewares/http_client] + end + + subgraph infra + db[db] + redis[redis] + models_flow[models/flow] + models_run[models/flow_run_log] + end + + r_flows --> svc_flow + r_run_logs --> svc_logs + svc_flow --> dsl + svc_flow --> mappers + dsl --> domain + mappers --> context + svc_flow --> engine + engine --> task + task --> executors + executors --> mw_http + engine --> log_handler + log_handler --> svc_logs + mw_sse -.events.-> r_flows + mw_ws -.events.-> r_flows + svc_flow --> models_flow + svc_logs --> models_run + models_flow --> db + models_run --> db + mw_jwt --> r_flows + redis -.token check.-> mw_jwt +``` + +引用: +- DSL/Design 构建 `backend/src/flow/dsl.rs:60-93,138-170,172-203,246-303` +- 领域模型 `backend/src/flow/domain.rs:20-28,31-36,39-47,60-68` +- 上下文与事件 `backend/src/flow/context.rs:29-45` +- 引擎驱动 `backend/src/flow/engine.rs:117-209,213-577` +- 编排服务 `backend/src/services/flow_service.rs:285-305,342-349,351-365,366-399` +- 路由入口 `backend/src/routes/flows.rs:26-35,101-133` +- 运行日志服务 `backend/src/services/flow_run_log_service.rs:46-63,74-131` + +## 请求/运行编排时序 +```mermaid +sequenceDiagram + participant C as Client + participant R as routes.flows + participant S as services.flow_service + participant D as flow.dsl & mappers + participant E as FlowEngine + participant L as flow.log_handler + participant M as models(flow/flow_run_log) + participant SSE as middlewares.sse/ws + + C->>R: POST /flows/{id}/run + R->>S: run(id, input) + S->>M: get flow doc + alt design_json + S->>D: chain_from_design_json(design) + D-->>S: ChainDef + S->>D: ctx_from_design_json(design) + D-->>S: ctx supplement + else YAML + S->>D: parse FlowDSL (YAML) + D-->>S: ChainDef + end + S->>E: drive(chain, ctx, opts) + E-->>S: (ctx, logs) + S->>L: log_success/log_error + L->>M: insert flow_run_log + E-->>SSE: emit node/done/error (stream) + S-->>R: RunResult(ok, ctx, logs) + R-->>C: ApiResponse +``` + +关键实现: +- 解析与补充 `backend/src/services/flow_service.rs:285-305` +- 引擎驱动 `backend/src/services/flow_service.rs:342-349` +- 日志入库与事件推送 `backend/src/flow/log_handler.rs` + +## 引擎执行图(含执行模式) +```mermaid +flowchart TD + A[选择起点: Start/入度=0/首节点] --> B{合流屏障} + B -- 未达成/去重 --> Z[跳过当前节点] + B -- 达成 --> C{任务?} + C -- 否 --> D[选择后继] + C -- 是 --> E{执行模式} + E -- Sync --> ES[同步执行任务; 写回 ctx] + E -- Async(Fire&Forget) --> EA[异步执行; 追踪句柄] + E -- Queued(组队列) --> EQ[组信号量=1; 获取后执行] + E -- Bounded(限并发) --> EB[组信号量=n; 获取后执行] + ES --> D + EA --> D + EQ --> D + EB --> D + D -->|条件节点| F{条件评估 JSON/Rhai} + F -- 通过 --> G[进入后继] + F -- 不通过/无匹配 --> H[挑选无条件后继或停止] + D -->|非条件| G + G --> I{是否 End} + I -- 是 --> J[FlowEnd 等待组策略; 推送事件; 结束] + I -- 否 --> K{多分支?} + K -- 单分支 --> L[计算组离开; BranchExit 等待] + K -- 多分支 --> M[并行驱动其他分支] + L --> A + M --> L --> A +``` + +参考: +- 模式枚举与选项 `backend/src/flow/context.rs:9-15,29-45` +- 合流与分支 `backend/src/flow/engine.rs:245-268,502-574` +- 组等待策略 `backend/src/flow/engine.rs:62-115,559-573` + +## DSL/Design 转换 +```mermaid +flowchart LR + X0[Design JSON 输入] --> X1[校验: 唯一ID/Start&End/合法边] + X1 --> X2[兼容 sourcePortID → source_port_id] + X2 --> X3[推断节点 kind/name/task] + X3 --> X4[解析组: members/parentID/awaitPolicy] + X4 --> X5[组装条件: AND组/端口匹配/启发式] + X5 --> X6["生成 ChainDef: nodes links groups"] + X0 --> Y1[mappers: 节点配置提取] + Y1 --> Y2[ctx.nodes..] +``` + +实现位置:`backend/src/flow/dsl.rs:138-170,172-203,246-303` 与 `backend/src/flow/mappers.rs:27-95` + +## 数据模型(ER) +```mermaid +erDiagram + FLOWS { + bigint id PK + varchar name + text yaml + text design_json + varchar code + varchar remark + timestamp created_at + timestamp updated_at + } + FLOW_RUN_LOGS { + bigint id PK + bigint flow_id FK + varchar flow_code + text input + text output + bool ok + text logs + bigint user_id + varchar username + timestamp started_at + bigint duration_ms + timestamp created_at + } + FLOWS ||--o{ FLOW_RUN_LOGS : has +``` + +## 事件与日志通道 +```mermaid +graph LR + engine_push[engine.push_and_emit] --> sse_emit[middlewares.sse.emit_*] + engine_push --> ws_emit[middlewares.ws.forward] + sse_emit --> client[前端] + ws_emit --> client + engine_push --> log_handler_db[DatabaseLogHandler] + engine_push --> log_handler_sse[SseLogHandler] + log_handler_db --> run_log_service + log_handler_sse --> run_log_service + run_log_service --> db +``` + +## 执行器生态 +```mermaid +graph LR + subgraph executors + http + db + variable + script_rhai + script_js + script_python + condition + end + ctx_nodes[ctx.nodes..*] --> executors + ctx_global[ctx.* 顶层] --> executors + http --> http_out[(http_response)] + db --> db_out[(db_response)] + variable --> var_out[(ctx键值写回)] + condition --> route[分支选择] +``` + +注意点:写回策略与幂等,参考 `backend/src/flow/engine.rs:294-375` + +--- + +以上图示与说明用于快速理解 Flow 的完整链路:从 DSL/Design 解析到引擎驱动与事件/日志通道,再到数据持久化与执行器生态。结合上文的代码引用,可在 IDE 中跳转到具体实现进行深度阅读。 \ No newline at end of file diff --git a/frontend/src/flows/components/testrun/testrun-panel/index.tsx b/frontend/src/flows/components/testrun/testrun-panel/index.tsx index 6b90065..b1c5b01 100644 --- a/frontend/src/flows/components/testrun/testrun-panel/index.tsx +++ b/frontend/src/flows/components/testrun/testrun-panel/index.tsx @@ -7,7 +7,7 @@ import { FC, useContext, useEffect, useRef, useState } from 'react'; import classnames from 'classnames'; import { useService, I18n } from '@flowgram.ai/free-layout-editor'; -import { Button, SideSheet, Switch, Tag } from '@douyinfe/semi-ui'; +import { Button, SideSheet, Switch, Tag, Select, InputNumber } from '@douyinfe/semi-ui'; import { IconClose, IconPlay, IconSpin } from '@douyinfe/semi-icons'; import { TestRunJsonInput } from '../testrun-json-input'; @@ -15,6 +15,7 @@ import { TestRunForm } from '../testrun-form'; import { NodeStatusGroup } from '../node-status-bar/group'; // 改为使用后端运行服务 import { CustomService } from '../../../services'; +import { tr } from '../../../../utils/i18n'; import { SidebarContext } from '../../../context'; import { IconCancel } from '../../../assets/icon-cancel'; @@ -76,6 +77,17 @@ export const TestRunSidePanel: FC = ({ visible, onCancel localStorage.setItem('testrun-input-json-mode', JSON.stringify(checked)); }; + // 执行模式与并发上限(仅保存到设计 JSON,不直接随运行请求发送) + const [executionMode, setExecutionMode] = useState<'sync'|'async'|'queued'|'bounded'>(() => { + const saved = localStorage.getItem('testrun-execution-mode'); + return (saved === 'async' || saved === 'queued' || saved === 'bounded') ? (saved as any) : 'sync'; + }); + const [concurrencyLimit, setConcurrencyLimit] = useState(() => { + const saved = localStorage.getItem('testrun-concurrency-limit'); + const num = saved ? Number(saved) : NaN; + return Number.isFinite(num) && num > 0 ? num : undefined; + }); + const extractErrorMsg = (logs: string[] | undefined): string | undefined => { if (!logs || logs.length === 0) return undefined; const patterns = [/failed/i, /error/i, /panic/i]; @@ -97,9 +109,9 @@ export const TestRunSidePanel: FC = ({ visible, onCancel setRunning(true); try { // 运行前保存(静默),确保后端 YAML 与编辑器一致;若保存失败则不继续运行 - const saved = await customService.save({ silent: true }); + const saved = await customService.save({ silent: true, executionMode, concurrencyLimit }); if (!saved) { - setErrors([I18n.t('Save failed, cannot run')]); + setErrors([tr('Save failed, cannot run')]); return; } @@ -127,7 +139,7 @@ export const TestRunSidePanel: FC = ({ visible, onCancel } }, onError: (evt) => { - const msg = evt.message || I18n.t('Run failed'); + const msg = evt.message || tr('Run failed'); setErrors((prev) => [...(prev || []), msg]); }, onDone: (evt) => { @@ -144,7 +156,7 @@ export const TestRunSidePanel: FC = ({ visible, onCancel if (evt.logs && evt.logs.length) setStreamLogs((prev: string[]) => [...prev, ...evt.logs!]); }, onError: (evt) => { - const msg = evt.message || I18n.t('Run failed'); + const msg = evt.message || tr('Run failed'); setErrors((prev) => [...(prev || []), msg]); }, onDone: (evt) => { @@ -165,7 +177,7 @@ export const TestRunSidePanel: FC = ({ visible, onCancel setResult(finished as any); } else { // 流结束但未收到 done 事件,给出提示 - setErrors((prev) => [...(prev || []), I18n.t('Stream terminated without completion')]); + setErrors((prev) => [...(prev || []), tr('Stream terminated without completion')]); } } else { // 普通 HTTP 一次性运行 @@ -174,20 +186,20 @@ export const TestRunSidePanel: FC = ({ visible, onCancel if (runRes) { if ((runRes as any).ok === false) { setResult(runRes as any); - const err = extractErrorMsg((runRes as any).logs) || I18n.t('Run failed'); + const err = extractErrorMsg((runRes as any).logs) || tr('Run failed'); setErrors([err]); } else { setResult(runRes as any); } } else { - setErrors([I18n.t('Run failed')]); + setErrors([tr('Run failed')]); } } catch (e: any) { - setErrors([e?.message || I18n.t('Run failed')]); + setErrors([e?.message || tr('Run failed')]); } } } catch (e: any) { - setErrors([e?.message || I18n.t('Run failed')]); + setErrors([e?.message || tr('Run failed')]); } finally { setRunning(false); cancelRef.current = null; @@ -226,7 +238,7 @@ export const TestRunSidePanel: FC = ({ visible, onCancel const renderRunning = (
-
{I18n.t('Running...')}
+
{tr('Running...')}
{/* 实时输出(仅流式模式显示) */} {streamMode && ( <> @@ -246,9 +258,9 @@ export const TestRunSidePanel: FC = ({ visible, onCancel const renderStatus = (
- {result?.ok === true && {I18n.t('Success')}} + {result?.ok === true && {tr('Success')}} {(errors?.length || result?.ok === false) && ( - {I18n.t('Failed')} + {tr('Failed')} )}
); @@ -256,9 +268,9 @@ export const TestRunSidePanel: FC = ({ visible, onCancel const renderForm = (
-
{I18n.t('Input Form')}
+
{tr('Input Form')}
-
{I18n.t('JSON Mode')}
+
{tr('JSON Mode')}
setInputJSONMode(checked)} @@ -266,16 +278,44 @@ export const TestRunSidePanel: FC = ({ visible, onCancel />
-
{I18n.t('Streaming Mode')}
+
{tr('Streaming Mode')}
setStreamMode(checked)} size="small" />
+
+
{tr('Execution Mode')}
+ +
+ {executionMode === 'bounded' && ( +
+
{tr('Concurrency Limit')}
+ { const num = Number(v); const next = Number.isFinite(num) && num > 0 ? num : undefined; setConcurrencyLimit(next); localStorage.setItem('testrun-concurrency-limit', String(next ?? '')); }} + size="small" + min={1} + max={128} + style={{ width: 120 }} + /> +
+ )} {streamMode && (
-
WS
+
{tr('WS')}
setUseWS(checked)} @@ -298,16 +338,16 @@ export const TestRunSidePanel: FC = ({ visible, onCancel {/* 运行中(流式)时,直接在表单区域下方展示实时输出,而不是覆盖整块内容 */} {streamMode && isRunning && ( <> - - + + )} {/* 展示后端返回的执行信息:仅在非流式或流式已结束时显示,避免与实时输出重复 */} {(!streamMode || !isRunning) && ( <> - - + + )}
@@ -326,13 +366,13 @@ export const TestRunSidePanel: FC = ({ visible, onCancel [styles.default]: !isRunning, })} > - {isRunning ? I18n.t('Running...') : I18n.t('Test Run')} + {isRunning ? tr('Running...') : tr('Test Run')} ); return ( = ({ visible, onCancel >
-
{I18n.t('Test Run')}
+
{tr('Test Run')}