From 446f63e02a05e3d9c2ea86b4b87f1491adb7b6ea Mon Sep 17 00:00:00 2001 From: ayou <550244300@qq.com> Date: Mon, 29 Sep 2025 22:35:47 +0800 Subject: [PATCH] =?UTF-8?q?refactor(script=5Frhai):=20=E9=87=8D=E6=9E=84?= =?UTF-8?q?=20Rhai=20=E8=84=9A=E6=9C=AC=E6=89=A7=E8=A1=8C=E9=80=BB?= =?UTF-8?q?=E8=BE=91=E5=B9=B6=E4=BC=98=E5=8C=96=E4=BB=A3=E7=A0=81=E7=BB=93?= =?UTF-8?q?=E6=9E=84?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 将文件脚本和 inline 脚本的执行逻辑统一到 exec_rhai_code 函数 - 优化 shallow_diff 函数的实现和可读性 - 提取 read_node_script_file 和 read_node_inline_script 辅助函数 - 清理冗余注释并重新组织导入语句 --- backend/src/flow/engine.rs | 17 +-- backend/src/flow/executors/script_python.rs | 4 - backend/src/flow/executors/script_rhai.rs | 152 ++++++++++---------- backend/src/middlewares/auth_guard.rs | 2 +- 4 files changed, 83 insertions(+), 92 deletions(-) diff --git a/backend/src/flow/engine.rs b/backend/src/flow/engine.rs index 85afe88..c681ee2 100644 --- a/backend/src/flow/engine.rs +++ b/backend/src/flow/engine.rs @@ -1,25 +1,22 @@ -// std +//! 流程执行引擎(engine.rs):驱动 ChainDef 流程图,支持同步/异步任务、条件路由、并发分支与 SSE 推送。 use std::cell::RefCell; use std::collections::HashMap; use std::time::Instant; -// third-party -use futures::future::join_all; -use regex::Regex; -use rhai::{AST, Engine}; -use tokio::sync::{Mutex, RwLock}; -use tracing::info; - -// crate use crate::flow::executors::condition::eval_condition_json; -// super use super::{ context::{DriveOptions, ExecutionMode}, domain::{ChainDef, NodeKind}, task::TaskRegistry, }; +use futures::future::join_all; +use regex::Regex; +use rhai::{AST, Engine}; +use tokio::sync::{Mutex, RwLock}; +use tracing::info; + // 结构体:紧随 use pub struct FlowEngine { pub tasks: TaskRegistry, diff --git a/backend/src/flow/executors/script_python.rs b/backend/src/flow/executors/script_python.rs index 12a5ec0..a6bf76b 100644 --- a/backend/src/flow/executors/script_python.rs +++ b/backend/src/flow/executors/script_python.rs @@ -1,12 +1,8 @@ -// std use std::time::Instant; - -// third-party use async_trait::async_trait; use serde_json::Value; use tracing::{debug, info}; -// crate use crate::flow::domain::{NodeDef, NodeId}; use crate::flow::task::Executor; diff --git a/backend/src/flow/executors/script_rhai.rs b/backend/src/flow/executors/script_rhai.rs index 6cccf53..5843130 100644 --- a/backend/src/flow/executors/script_rhai.rs +++ b/backend/src/flow/executors/script_rhai.rs @@ -1,14 +1,10 @@ -// std use std::fs; use std::time::Instant; - -// third-party use async_trait::async_trait; use serde_json::Value; use tracing::{debug, info}; use anyhow::anyhow; -// crate use crate::flow::domain::{NodeDef, NodeId}; use crate::flow::engine::eval_rhai_expr_json; use crate::flow::task::Executor; @@ -16,125 +12,127 @@ use crate::flow::task::Executor; #[derive(Default)] pub struct ScriptRhaiTask; +/// 截断长字符串(去掉换行),用于日志预览 fn truncate_str(s: &str, max: usize) -> String { - let s = s.replace('\n', " ").replace('\r', " "); - if s.len() <= max { s } else { format!("{}…", &s[..max]) } + let s = s.replace(['\n', '\r'], " "); + if s.len() <= max { + s + } else { + format!("{}…", &s[..max]) + } } +/// 对比两个 JSON(仅浅层),返回 (新增字段, 删除字段, 修改字段) fn shallow_diff(before: &Value, after: &Value) -> (Vec, Vec, Vec) { use std::collections::BTreeSet; let mut added = Vec::new(); let mut removed = Vec::new(); let mut modified = Vec::new(); + let (Some(bm), Some(am)) = (before.as_object(), after.as_object()) else { - if before != after { modified.push("".to_string()); } + if before != after { + modified.push("".to_string()); + } return (added, removed, modified); }; + let bkeys: BTreeSet<_> = bm.keys().cloned().collect(); let akeys: BTreeSet<_> = am.keys().cloned().collect(); - for k in akeys.difference(&bkeys) { added.push((*k).to_string()); } - for k in bkeys.difference(&akeys) { removed.push((*k).to_string()); } + + for k in akeys.difference(&bkeys) { + added.push(k.to_string()); + } + for k in bkeys.difference(&akeys) { + removed.push(k.to_string()); + } for k in akeys.intersection(&bkeys) { - let key = (*k).to_string(); - if bm.get(&key) != am.get(&key) { modified.push(key); } + if bm.get(k) != am.get(k) { + modified.push(k.to_string()); + } } (added, removed, modified) } -pub fn exec_rhai_file(node_id: &NodeId, path: &str, ctx: &mut Value) -> anyhow::Result<()> { - let start = Instant::now(); - let code = match fs::read_to_string(path) { - Ok(s) => s, - Err(e) => { - info!(target = "udmin.flow", node=%node_id.0, err=%e.to_string(), "script task: failed to read Rhai file"); - return Err(anyhow!("failed to read Rhai file: {}", e)); - } - }; - let script = code; +/// 核心执行逻辑:运行 Rhai 脚本,返回更新后的 ctx +fn exec_rhai_code(node_id: &NodeId, script: &str, ctx: &mut Value, source: &str) -> anyhow::Result<()> { if script.trim().is_empty() { - info!(target = "udmin.flow", node=%node_id.0, "script task: empty Rhai file, skip"); + info!(target = "udmin.flow", node=%node_id.0, source, "script_rhai task: empty script, skip"); return Ok(()); } - let preview = truncate_str(&script, 200); - debug!(target = "udmin.flow", node=%node_id.0, preview=%preview, "script task: will execute Rhai file"); + + let start = Instant::now(); + let preview = truncate_str(script, 200); + debug!(target = "udmin.flow", node=%node_id.0, source, preview=%preview, "script_rhai task: will execute script"); let before_ctx = ctx.clone(); let wrapped = format!("{{ {} ; ctx }}", script); - let res = eval_rhai_expr_json(&wrapped, ctx); - let dur_ms = start.elapsed().as_millis(); - match res { + + match eval_rhai_expr_json(&wrapped, ctx) { Ok(new_ctx) => { + let dur_ms = start.elapsed().as_millis(); let (added, removed, modified) = shallow_diff(&before_ctx, &new_ctx); *ctx = new_ctx; - info!(target = "udmin.flow", node=%node_id.0, ms=%dur_ms, added=%added.len(), removed=%removed.len(), modified=%modified.len(), "script task: Rhai file executed and ctx updated"); + + info!(target = "udmin.flow", node=%node_id.0, source, ms=%dur_ms, added=%added.len(), removed=%removed.len(), modified=%modified.len(), "script_rhai task: executed and ctx updated"); if !(added.is_empty() && removed.is_empty() && modified.is_empty()) { - debug!(target = "udmin.flow", node=%node_id.0, ?added, ?removed, ?modified, "script task: ctx shallow diff"); + debug!(target = "udmin.flow", node=%node_id.0, source, ?added, ?removed, ?modified, "script_rhai task: ctx shallow diff"); } + Ok(()) } Err(err) => { - info!(target = "udmin.flow", node=%node_id.0, ms=%dur_ms, preview=%preview, err=%err.to_string(), "script task: Rhai file execution failed, ctx unchanged"); - return Err(anyhow!("Rhai file execution failed: {}", err)); + let dur_ms = start.elapsed().as_millis(); + info!(target = "udmin.flow", node=%node_id.0, source, ms=%dur_ms, preview=%preview, err=%err.to_string(), "script_rhai task: execution failed, ctx unchanged"); + Err(anyhow!("Rhai script execution failed: {}", err)) } } - Ok(()) } +/// 读取节点配置里的脚本文件路径 fn read_node_script_file(ctx: &Value, node_id: &str) -> Option { - if let Some(nodes) = ctx.get("nodes").and_then(|v| v.as_object()) { - if let Some(m) = nodes.get(node_id).and_then(|v| v.get("scripts")).and_then(|v| v.as_object()) { - return m.get("rhai").and_then(|v| v.as_str()).map(|s| s.to_string()); - } - } - None + ctx.get("nodes") + .and_then(|v| v.get(node_id)) + .and_then(|n| n.get("scripts")) + .and_then(|v| v.get("rhai")) + .and_then(|v| v.as_str()) + .map(|s| s.to_string()) +} + +/// 读取节点配置里的 inline 脚本 +fn read_node_inline_script(ctx: &Value, node_id: &str) -> Option { + ctx.get("nodes") + .and_then(|nodes| nodes.get(node_id)) + .and_then(|n| n.get("script").or_else(|| n.get("expr"))) + .and_then(|v| match v { + Value::String(s) => Some(s.clone()), + Value::Object(m) => m + .get("script") + .or_else(|| m.get("expr")) + .and_then(|x| x.as_str()) + .map(|s| s.to_string()), + _ => None, + }) + .or_else(|| ctx.get("script").and_then(|v| v.as_str()).map(|s| s.to_string())) + .or_else(|| ctx.get("expr").and_then(|v| v.as_str()).map(|s| s.to_string())) } #[async_trait] impl Executor for ScriptRhaiTask { async fn execute(&self, node_id: &NodeId, _node: &NodeDef, ctx: &mut Value) -> anyhow::Result<()> { - let start = Instant::now(); - - // 1) 文件脚本优先:nodes..scripts.rhai -> 直接执行文件 + // 1) 优先执行文件脚本:nodes..scripts.rhai if let Some(path) = read_node_script_file(ctx, &node_id.0) { - return exec_rhai_file(node_id, &path, ctx); + let code = fs::read_to_string(&path).map_err(|e| { + info!(target = "udmin.flow", node=%node_id.0, err=%e.to_string(), path, "script_rhai task: failed to read Rhai file"); + anyhow!("failed to read Rhai file: {}", e) + })?; + return exec_rhai_code(node_id, &code, ctx, "file"); } - // 2) inline 脚本(支持 String 或 { script | expr }) - let cfg: Option = ctx.get("nodes") - .and_then(|nodes| nodes.get(&node_id.0)) - .and_then(|n| n.get("script").or_else(|| n.get("expr"))) - .and_then(|v| match v { Value::String(s) => Some(s.clone()), Value::Object(m) => m.get("script").or_else(|| m.get("expr")).and_then(|x| x.as_str()).map(|s| s.to_string()), _ => None }) - .or_else(|| ctx.get("script").and_then(|v| v.as_str()).map(|s| s.to_string())) - .or_else(|| ctx.get("expr").and_then(|v| v.as_str()).map(|s| s.to_string())); - - if let Some(script) = cfg { - if script.trim().is_empty() { - info!(target = "udmin.flow", node=%node_id.0, "script_rhai task: empty inline script, skip"); - return Ok(()); - } - let script_preview = truncate_str(&script, 200); - debug!(target = "udmin.flow", node=%node_id.0, preview=%script_preview, "script_rhai task: will execute Rhai inline script"); - - let before_ctx = ctx.clone(); - let wrapped = format!("{{ {} ; ctx }}", script); - let res = eval_rhai_expr_json(&wrapped, ctx); - let dur_ms = start.elapsed().as_millis(); - match res { - Ok(new_ctx) => { - let (added, removed, modified) = shallow_diff(&before_ctx, &new_ctx); - *ctx = new_ctx; - info!(target = "udmin.flow", node=%node_id.0, ms=%dur_ms, added=%added.len(), removed=%removed.len(), modified=%modified.len(), "script_rhai task: inline executed and ctx updated"); - if !(added.is_empty() && removed.is_empty() && modified.is_empty()) { - debug!(target = "udmin.flow", node=%node_id.0, ?added, ?removed, ?modified, "script_rhai task: ctx shallow diff"); - } - } - Err(err) => { - info!(target = "udmin.flow", node=%node_id.0, ms=%dur_ms, preview=%script_preview, err=%err.to_string(), "script_rhai task: inline execution failed, ctx unchanged"); - return Err(anyhow!("Rhai inline execution failed: {}", err)); - } - } - return Ok(()); + // 2) 其次执行 inline 脚本(支持 string 或 {script|expr}) + if let Some(script) = read_node_inline_script(ctx, &node_id.0) { + return exec_rhai_code(node_id, &script, ctx, "inline"); } + // 3) 没有脚本 → 跳过 info!(target = "udmin.flow", node=%node_id.0, "script_rhai task: no script found, skip"); Ok(()) } diff --git a/backend/src/middlewares/auth_guard.rs b/backend/src/middlewares/auth_guard.rs index f445c1a..6046345 100644 --- a/backend/src/middlewares/auth_guard.rs +++ b/backend/src/middlewares/auth_guard.rs @@ -23,7 +23,7 @@ impl Default for AuthGuardConfig { // 登录/刷新/公开动态接口等路径前缀允许匿名访问 prefix_whitelist: vec![ "/api/auth/", - "/api/dynamic_api/public/", + "/api/dynamic/", ], // 精确路径白名单:如健康检查等 exact_whitelist: vec![