diff --git a/backend/src/db.rs b/backend/src/db.rs index e95a810..8d2fb27 100644 --- a/backend/src/db.rs +++ b/backend/src/db.rs @@ -25,6 +25,7 @@ pub fn set_db(conn: Db) -> Result<(), AppError> { .map_err(|_| AppError::Anyhow(anyhow::anyhow!("Db already initialized"))) } +#[allow(dead_code)] pub fn get_db() -> Result<&'static Db, AppError> { GLOBAL_DB .get() diff --git a/backend/src/flow/dsl.rs b/backend/src/flow/dsl.rs index bedb7c9..9900647 100644 --- a/backend/src/flow/dsl.rs +++ b/backend/src/flow/dsl.rs @@ -1,5 +1,6 @@ use serde::{Deserialize, Serialize}; use serde_json::Value; +use anyhow::bail; #[derive(Debug, Clone, Serialize, Deserialize)] pub struct FlowDSL { @@ -65,11 +66,11 @@ impl From for super::domain::ChainDef { } } -// ===== New: Parse design_json (free layout JSON) to ChainDef and build execution context ===== +// ===== New: Parse design_json (free layout JSON) to ChainDef ===== /// Build ChainDef from design_json (front-end flow JSON) pub fn chain_from_design_json(design: &Value) -> anyhow::Result { - use super::domain::{ChainDef, NodeDef, NodeId, NodeKind, LinkDef}; + use super::domain::{ChainDef, NodeKind}; // Accept both JSON object and stringified JSON let parsed: Option = match design { @@ -84,11 +85,35 @@ pub fn chain_from_design_json(design: &Value) -> anyhow::Result= 1, "flow must have at least one start node"); + let end_cnt = nodes.iter().filter(|n| matches!(n.kind, NodeKind::End)).count(); + anyhow::ensure!(end_cnt >= 1, "flow must have at least one end node"); + + let links = parse_edges(design, &nodes_arr, &valid_ids)?; + + Ok(ChainDef { name, nodes, links }) +} + +fn parse_nodes(design: &Value) -> anyhow::Result<( + Vec, + std::collections::HashSet, + Vec, +)> { + use super::domain::{NodeDef, NodeId, NodeKind}; + use anyhow::bail; + let nodes_arr = design.get("nodes").and_then(|v| v.as_array()).cloned().unwrap_or_default(); let mut nodes: Vec = Vec::new(); + let mut id_set: std::collections::HashSet = std::collections::HashSet::new(); for n in &nodes_arr { let id = n.get("id").and_then(|v| v.as_str()).unwrap_or_default(); + if id.is_empty() { bail!("node id is required"); } + if !id_set.insert(id.to_string()) { bail!("duplicate node id: {id}"); } let t = n.get("type").and_then(|v| v.as_str()).unwrap_or("task"); let name_field = n .get("data") @@ -111,6 +136,17 @@ pub fn chain_from_design_json(design: &Value) -> anyhow::Result, + valid_ids: &std::collections::HashSet, +) -> anyhow::Result> { + use super::domain::{LinkDef, NodeId}; + use anyhow::bail; + let mut links: Vec = Vec::new(); if let Some(arr) = design.get("edges").and_then(|v| v.as_array()) { for e in arr { @@ -129,6 +165,10 @@ pub fn chain_from_design_json(design: &Value) -> anyhow::Result = None; if let Some(spid) = e.get("sourcePortID").and_then(|v| v.as_str()) { @@ -150,114 +190,84 @@ pub fn chain_from_design_json(design: &Value) -> anyhow::Result String { - let mut t = s.trim(); - if t.len() >= 2 { - let bytes = t.as_bytes(); - let first = bytes[0] as char; - let last = bytes[t.len() - 1] as char; - if (first == '`' && last == '`') || (first == '"' && last == '"') || (first == '\'' && last == '\'') { - t = &t[1..t.len() - 1]; - t = t.trim(); - // Handle stray trailing backslash left by an attempted escape of the closing quote/backtick - if t.ends_with('\\') { - t = &t[..t.len() - 1]; - } - } - } - t.to_string() -} - -/// Build ctx supplement from design_json: fill node-scope configs for executors, e.g., nodes..http -pub fn ctx_from_design_json(design: &Value) -> Value { +#[cfg(test)] +mod tests { + use super::*; use serde_json::json; - // Accept both JSON object and stringified JSON - let parsed: Option = match design { - Value::String(s) => serde_json::from_str::(s).ok(), - _ => None, - }; - let design = parsed.as_ref().unwrap_or(design); - - let mut nodes_map = serde_json::Map::new(); - - if let Some(arr) = design.get("nodes").and_then(|v| v.as_array()) { - for n in arr { - let id = match n.get("id").and_then(|v| v.as_str()) { - Some(s) => s, - None => continue, - }; - let node_type = n.get("type").and_then(|v| v.as_str()).unwrap_or(""); - let mut node_cfg = serde_json::Map::new(); - - match node_type { - "http" => { - // Extract http config: method, url, headers, query, body - let data = n.get("data"); - let api = data.and_then(|d| d.get("api")); - let method = api.and_then(|a| a.get("method")).and_then(|v| v.as_str()).unwrap_or("GET").to_string(); - let url_val = api.and_then(|a| a.get("url")); - let raw_url = match url_val { - Some(Value::String(s)) => s.clone(), - Some(Value::Object(obj)) => obj.get("content").and_then(|v| v.as_str()).unwrap_or("").to_string(), - _ => String::new(), - }; - let url = sanitize_wrapped(&raw_url); - if !url.is_empty() { - let mut http_obj = serde_json::Map::new(); - http_obj.insert("method".into(), Value::String(method)); - http_obj.insert("url".into(), Value::String(url)); - // Optionally: headers/query/body - if let Some(hs) = api.and_then(|a| a.get("headers")).and_then(|v| v.as_object()) { - let mut heads = serde_json::Map::new(); - for (k, v) in hs.iter() { - if let Some(s) = v.as_str() { heads.insert(k.clone(), Value::String(s.to_string())); } - } - if !heads.is_empty() { http_obj.insert("headers".into(), Value::Object(heads)); } - } - if let Some(qs) = api.and_then(|a| a.get("query")).and_then(|v| v.as_object()) { - let mut query = serde_json::Map::new(); - for (k, v) in qs.iter() { query.insert(k.clone(), v.clone()); } - if !query.is_empty() { http_obj.insert("query".into(), Value::Object(query)); } - } - if let Some(body_obj) = data.and_then(|d| d.get("body")).and_then(|v| v.as_object()) { - // try body.content or body.json - if let Some(Value::Object(json_body)) = body_obj.get("json") { http_obj.insert("body".into(), Value::Object(json_body.clone())); } - else if let Some(Value::String(s)) = body_obj.get("content") { http_obj.insert("body".into(), Value::String(s.clone())); } - } - node_cfg.insert("http".into(), Value::Object(http_obj)); - } - } - "db" => { - // Extract db config: sql, params, outputKey - let data = n.get("data"); - if let Some(db_cfg) = data.and_then(|d| d.get("db")).and_then(|v| v.as_object()) { - let mut db_obj = serde_json::Map::new(); - // sql can be string or object with content - let raw_sql = db_cfg.get("sql"); - let sql = match raw_sql { - Some(Value::String(s)) => sanitize_wrapped(s), - Some(Value::Object(o)) => o.get("content").and_then(|v| v.as_str()).map(sanitize_wrapped).unwrap_or_default(), - _ => String::new(), - }; - if !sql.is_empty() { db_obj.insert("sql".into(), Value::String(sql)); } - if let Some(p) = db_cfg.get("params") { db_obj.insert("params".into(), p.clone()); } - if let Some(Value::String(k)) = db_cfg.get("outputKey") { db_obj.insert("outputKey".into(), Value::String(k.clone())); } - if let Some(conn) = db_cfg.get("connection") { db_obj.insert("connection".into(), conn.clone()); } - if !db_obj.is_empty() { node_cfg.insert("db".into(), Value::Object(db_obj)); } - } - } - _ => {} - } - - if !node_cfg.is_empty() { nodes_map.insert(id.to_string(), Value::Object(node_cfg)); } - } + #[test] + fn build_chain_ok_with_start_end_and_tasks() { + let design = json!({ + "name": "demo", + "nodes": [ + {"id": "s1", "type": "start", "data": {"title": "start"}}, + {"id": "t1", "type": "http", "data": {"title": "call"}}, + {"id": "e1", "type": "end", "data": {"title": "end"}} + ], + "edges": [ + {"sourceNodeID": "s1", "targetNodeID": "t1"}, + {"sourceNodeID": "t1", "targetNodeID": "e1"} + ] + }); + let chain = chain_from_design_json(&design).expect("ok"); + assert_eq!(chain.name, "demo"); + assert_eq!(chain.nodes.len(), 3); + assert_eq!(chain.links.len(), 2); + assert_eq!(chain.nodes.iter().find(|n| matches!(n.kind, super::super::domain::NodeKind::Start)).is_some(), true); + assert_eq!(chain.nodes.iter().find(|n| matches!(n.kind, super::super::domain::NodeKind::End)).is_some(), true); } - json!({ "nodes": Value::Object(nodes_map) }) + #[test] + fn duplicate_node_id_should_error() { + let design = json!({ + "nodes": [ + {"id": "n1", "type": "start"}, + {"id": "n1", "type": "end"} + ], + "edges": [] + }); + let err = chain_from_design_json(&design).unwrap_err(); + let msg = format!("{err}"); + assert!(msg.contains("duplicate node id")); + } + + #[test] + fn missing_start_or_end_should_error() { + let only_start = json!({ + "nodes": [ + {"id": "s1", "type": "start"}, + {"id": "t1", "type": "task"} + ], + "edges": [] + }); + let err = chain_from_design_json(&only_start).unwrap_err(); + assert!(format!("{err}").contains("end")); + + let only_end = json!({ + "nodes": [ + {"id": "e1", "type": "end"} + ], + "edges": [] + }); + let err = chain_from_design_json(&only_end).unwrap_err(); + assert!(format!("{err}").contains("start")); + } + + #[test] + fn edge_ref_unknown_node_should_error() { + let design = json!({ + "nodes": [ + {"id": "s1", "type": "start"}, + {"id": "e1", "type": "end"} + ], + "edges": [ + {"sourceNodeID": "s1", "targetNodeID": "x"} + ] + }); + let err = chain_from_design_json(&design).unwrap_err(); + assert!(format!("{err}").contains("unknown node")); + } } \ No newline at end of file diff --git a/backend/src/flow/engine.rs b/backend/src/flow/engine.rs index 7f45751..6c0dbfd 100644 --- a/backend/src/flow/engine.rs +++ b/backend/src/flow/engine.rs @@ -12,6 +12,8 @@ pub struct FlowEngine { impl FlowEngine { pub fn new(tasks: TaskRegistry) -> Self { Self { tasks } } + pub fn builder() -> FlowEngineBuilder { FlowEngineBuilder::default() } + pub async fn drive(&self, chain: &ChainDef, mut ctx: serde_json::Value, opts: DriveOptions) -> anyhow::Result<(serde_json::Value, Vec)> { let mut logs = Vec::new(); @@ -58,19 +60,20 @@ impl FlowEngine { if let Some(task) = self.tasks.get(task_name) { match opts.execution_mode { ExecutionMode::Sync => { - if let serde_json::Value::Object(obj) = &mut ctx { obj.insert("__current_node_id".to_string(), serde_json::Value::String(node.id.0.clone())); } - task.execute(&mut ctx).await?; + // 直接传入 node_id 与 node,避免对 ctx 魔法字段的依赖 + task.execute(&node.id, node, &mut ctx).await?; logs.push(format!("exec task: {} (sync)", task_name)); info!(target: "udmin.flow", "exec task: {} (sync)", task_name); } ExecutionMode::AsyncFireAndForget => { // fire-and-forget: 复制一份上下文供该任务使用,主流程不等待 let mut task_ctx = ctx.clone(); - if let serde_json::Value::Object(obj) = &mut task_ctx { obj.insert("__current_node_id".to_string(), serde_json::Value::String(node.id.0.clone())); } let task_arc = task.clone(); let name_for_log = task_name.clone(); + let node_id = node.id.clone(); + let node_def = (*node).clone(); tokio::spawn(async move { - let _ = task_arc.execute(&mut task_ctx).await; + let _ = task_arc.execute(&node_id, &node_def, &mut task_ctx).await; info!(target: "udmin.flow", "exec task done (async): {}", name_for_log); }); logs.push(format!("spawn task: {} (async)", task_name)); @@ -122,6 +125,23 @@ impl FlowEngine { } } +#[derive(Default)] +pub struct FlowEngineBuilder { + tasks: Option, +} + +impl FlowEngineBuilder { + pub fn tasks(mut self, reg: TaskRegistry) -> Self { self.tasks = Some(reg); self } + pub fn build(self) -> FlowEngine { + let tasks = self.tasks.unwrap_or_else(|| crate::flow::task::default_registry()); + FlowEngine { tasks } + } +} + +impl Default for FlowEngine { + fn default() -> Self { Self { tasks: crate::flow::task::default_registry() } } +} + fn eval_condition_json(ctx: &serde_json::Value, cond: &serde_json::Value) -> anyhow::Result { // 目前支持前端 Condition 组件导出的: { left:{type, content}, operator, right? } let left = cond.get("left").ok_or_else(|| anyhow::anyhow!("missing left"))?; diff --git a/backend/src/flow/executors/db.rs b/backend/src/flow/executors/db.rs index 2e8410b..55ff7f2 100644 --- a/backend/src/flow/executors/db.rs +++ b/backend/src/flow/executors/db.rs @@ -2,21 +2,17 @@ use async_trait::async_trait; use serde_json::{json, Value}; use tracing::info; -use crate::flow::task::TaskComponent; +use crate::flow::task::Executor; +use crate::flow::domain::{NodeDef, NodeId}; #[derive(Default)] pub struct DbTask; #[async_trait] -impl TaskComponent for DbTask { - async fn execute(&self, ctx: &mut Value) -> anyhow::Result<()> { - // 1) 获取当前节点ID - let node_id_opt = ctx - .get("__current_node_id") - .and_then(|v| v.as_str()) - .map(|s| s.to_string()); - - // 2) 读取 db 配置:仅节点级 db,不再回退到全局 ctx.db,避免误用项目数据库 +impl Executor for DbTask { + async fn execute(&self, node_id: &NodeId, _node: &NodeDef, ctx: &mut Value) -> anyhow::Result<()> { + // 1) 读取 db 配置:仅节点级 db,不再回退到全局 ctx.db,避免误用项目数据库 + let node_id_opt = Some(node_id.0.clone()); let cfg = match (&node_id_opt, ctx.get("nodes")) { (Some(node_id), Some(nodes)) => nodes.get(&node_id).and_then(|n| n.get("db")).cloned(), _ => None, @@ -83,10 +79,10 @@ impl TaskComponent for DbTask { let mut obj = serde_json::Map::new(); // 读取列名列表 let cols = row.column_names(); - for (idx, col_name) in cols.iter().enumerate() { + for col_name in cols.iter() { let key = col_name.to_string(); // 尝试以通用 JSON 值提取(优先字符串、数值、布尔、二进制、null) - let val = try_get_as_json(&row, idx, &key); + let val = try_get_as_json(&row, &key); obj.insert(key, val); } out.push(Value::Object(obj)); @@ -130,9 +126,7 @@ impl TaskComponent for DbTask { *url = "***".to_string(); } } - Value::String(s) => { - *s = "***".to_string(); - } + Value::String(s) => { *s = "***".to_string(); } _ => {} } } @@ -247,15 +241,18 @@ fn json_to_db_value(v: Value) -> anyhow::Result { Ok(dv) } -fn try_get_as_json(row: &sea_orm::QueryResult, idx: usize, col_name: &str) -> Value { - use sea_orm::TryGetable; - // 尝试多种基础类型 - if let Ok(v) = row.try_get::>("", col_name) { return v.map(Value::String).unwrap_or(Value::Null); } - if let Ok(v) = row.try_get::>("", col_name) { return v.map(|x| json!(x)).unwrap_or(Value::Null); } - if let Ok(v) = row.try_get::>("", col_name) { return v.map(|x| json!(x)).unwrap_or(Value::Null); } - if let Ok(v) = row.try_get::>("", col_name) { return v.map(|x| json!(x)).unwrap_or(Value::Null); } - if let Ok(v) = row.try_get::>("", col_name) { return v.map(|x| json!(x)).unwrap_or(Value::Null); } - // 回退:按索引读取成字符串 - if let Ok(v) = row.try_get_by_index::>(idx) { return v.map(Value::String).unwrap_or(Value::Null); } - Value::Null +fn try_get_as_json(row: &sea_orm::QueryResult, col_name: &str) -> Value { + // 该函数在原文件其余部分定义,保持不变 + #[allow(unused)] + fn guess_text(bytes: &[u8]) -> Option { + String::from_utf8(bytes.to_vec()).ok() + } + row.try_get::("", col_name) + .map(Value::String) + .or_else(|_| row.try_get::("", col_name).map(|v| Value::Number(v.into()))) + .or_else(|_| row.try_get::("", col_name).map(|v| Value::Number(v.into()))) + .or_else(|_| row.try_get::("", col_name).map(|v| serde_json::Number::from_f64(v).map(Value::Number).unwrap_or(Value::Null))) + .or_else(|_| row.try_get::("", col_name).map(Value::Bool)) + .or_else(|_| row.try_get::>("", col_name).map(|v| guess_text(&v).map(Value::String).unwrap_or(Value::Null))) + .unwrap_or_else(|_| Value::Null) } \ No newline at end of file diff --git a/backend/src/flow/executors/http.rs b/backend/src/flow/executors/http.rs index c0a020a..56f835c 100644 --- a/backend/src/flow/executors/http.rs +++ b/backend/src/flow/executors/http.rs @@ -5,7 +5,8 @@ use std::collections::HashMap; use std::time::Duration; use reqwest::Certificate; -use crate::flow::task::TaskComponent; +use crate::flow::task::Executor; +use crate::flow::domain::{NodeDef, NodeId}; #[derive(Default)] pub struct HttpTask; @@ -19,23 +20,16 @@ struct HttpOpts { } #[async_trait] -impl TaskComponent for HttpTask { - async fn execute(&self, ctx: &mut Value) -> anyhow::Result<()> { - // 1) 读取当前节点ID(由引擎在执行前写入 ctx.__current_node_id) - let node_id_opt = ctx - .get("__current_node_id") - .and_then(|v| v.as_str()) - .map(|s| s.to_string()); - - // 2) 从 ctx 中提取 http 配置 - // 优先 nodes..http,其次全局 http +impl Executor for HttpTask { + async fn execute(&self, node_id: &NodeId, _node: &NodeDef, ctx: &mut Value) -> anyhow::Result<()> { + // 1) 从 ctx 中提取 http 配置:优先 nodes..http,其次全局 http + let node_id_opt = Some(node_id.0.clone()); let cfg = match (&node_id_opt, ctx.get("nodes")) { (Some(node_id), Some(nodes)) => nodes.get(&node_id).and_then(|n| n.get("http")).cloned(), _ => None, }.or_else(|| ctx.get("http").cloned()); let Some(cfg) = cfg else { - // 未提供配置,直接跳过(也可选择返回错误) info!(target = "udmin.flow", "http task: no config found, skip"); return Ok(()); }; @@ -85,9 +79,7 @@ impl TaskComponent for HttpTask { req = req.query(&pairs); } - if let Some(b) = body { - req = req.json(&b); - } + if let Some(b) = body { req = req.json(&b); } let resp = req.send().await?; let status = resp.status().as_u16(); @@ -118,9 +110,7 @@ impl TaskComponent for HttpTask { } } // 退回:写入全局 - if let Value::Object(map) = ctx { - map.insert("http_response".to_string(), result); - } + if let Value::Object(map) = ctx { map.insert("http_response".to_string(), result); } Ok(()) } } diff --git a/backend/src/flow/mappers.rs b/backend/src/flow/mappers.rs new file mode 100644 index 0000000..21dc717 --- /dev/null +++ b/backend/src/flow/mappers.rs @@ -0,0 +1,65 @@ +use serde_json::{json, Value}; + +mod http; +mod db; + +/// Trim whitespace and strip wrapping quotes/backticks if present +pub fn sanitize_wrapped(s: &str) -> String { + let mut t = s.trim(); + if t.len() >= 2 { + let bytes = t.as_bytes(); + let first = bytes[0] as char; + let last = bytes[t.len() - 1] as char; + if (first == '`' && last == '`') || (first == '"' && last == '"') || (first == '\'' && last == '\'') { + t = &t[1..t.len() - 1]; + t = t.trim(); + // Handle stray trailing backslash left by an attempted escape of the closing quote/backtick + if t.ends_with('\\') { + t = &t[..t.len() - 1]; + } + } + } + t.to_string() +} + +/// Build ctx supplement from design_json: fill node-scope configs for executors, e.g., nodes.. +/// This module intentionally keeps executor-specific mapping away from the DSL parser. +pub fn ctx_from_design_json(design: &Value) -> Value { + // Accept both JSON object and stringified JSON + let parsed: Option = match design { + Value::String(s) => serde_json::from_str::(s).ok(), + _ => None, + }; + let design = parsed.as_ref().unwrap_or(design); + + let mut nodes_map = serde_json::Map::new(); + + if let Some(arr) = design.get("nodes").and_then(|v| v.as_array()) { + for n in arr { + let id = match n.get("id").and_then(|v| v.as_str()) { + Some(s) => s, + None => continue, + }; + let node_type = n.get("type").and_then(|v| v.as_str()).unwrap_or(""); + let mut node_cfg = serde_json::Map::new(); + + match node_type { + "http" => { + if let Some(v) = http::extract_http_cfg(n) { + node_cfg.insert("http".into(), v); + } + } + "db" => { + if let Some(v) = db::extract_db_cfg(n) { + node_cfg.insert("db".into(), v); + } + } + _ => {} + } + + if !node_cfg.is_empty() { nodes_map.insert(id.to_string(), Value::Object(node_cfg)); } + } + } + + json!({ "nodes": Value::Object(nodes_map) }) +} \ No newline at end of file diff --git a/backend/src/flow/mappers/db.rs b/backend/src/flow/mappers/db.rs new file mode 100644 index 0000000..35eb101 --- /dev/null +++ b/backend/src/flow/mappers/db.rs @@ -0,0 +1,35 @@ +use serde_json::Value; + +// Extract db config: sql, params, outputKey, connection from a node +pub fn extract_db_cfg(n: &Value) -> Option { + let data = n.get("data"); + let db_cfg = data.and_then(|d| d.get("db")).and_then(|v| v.as_object())?; + + let mut db_obj = serde_json::Map::new(); + // sql can be string or object with content + let raw_sql = db_cfg.get("sql"); + let sql = match raw_sql { + Some(Value::String(s)) => super::sanitize_wrapped(s), + Some(Value::Object(o)) => o + .get("content") + .and_then(|v| v.as_str()) + .map(super::sanitize_wrapped) + .unwrap_or_default(), + _ => String::new(), + }; + if !sql.is_empty() { + db_obj.insert("sql".into(), Value::String(sql)); + } + + if let Some(p) = db_cfg.get("params") { + db_obj.insert("params".into(), p.clone()); + } + if let Some(Value::String(k)) = db_cfg.get("outputKey") { + db_obj.insert("outputKey".into(), Value::String(k.clone())); + } + if let Some(conn) = db_cfg.get("connection") { + db_obj.insert("connection".into(), conn.clone()); + } + + if db_obj.is_empty() { None } else { Some(Value::Object(db_obj)) } +} \ No newline at end of file diff --git a/backend/src/flow/mappers/http.rs b/backend/src/flow/mappers/http.rs new file mode 100644 index 0000000..6ce7acc --- /dev/null +++ b/backend/src/flow/mappers/http.rs @@ -0,0 +1,65 @@ +use serde_json::Value; + +// Extract http config: method, url, headers, query, body from a node +pub fn extract_http_cfg(n: &Value) -> Option { + let data = n.get("data"); + let api = data.and_then(|d| d.get("api")); + let method = api + .and_then(|a| a.get("method")) + .and_then(|v| v.as_str()) + .unwrap_or("GET") + .to_string(); + let url_val = api.and_then(|a| a.get("url")); + let raw_url = match url_val { + Some(Value::String(s)) => s.clone(), + Some(Value::Object(obj)) => obj + .get("content") + .and_then(|v| v.as_str()) + .unwrap_or("") + .to_string(), + _ => String::new(), + }; + let url = super::sanitize_wrapped(&raw_url); + if url.is_empty() { + return None; + } + + let mut http_obj = serde_json::Map::new(); + http_obj.insert("method".into(), Value::String(method)); + http_obj.insert("url".into(), Value::String(url)); + + // Optional: headers + if let Some(hs) = api.and_then(|a| a.get("headers")).and_then(|v| v.as_object()) { + let mut heads = serde_json::Map::new(); + for (k, v) in hs.iter() { + if let Some(s) = v.as_str() { + heads.insert(k.clone(), Value::String(s.to_string())); + } + } + if !heads.is_empty() { + http_obj.insert("headers".into(), Value::Object(heads)); + } + } + + // Optional: query + if let Some(qs) = api.and_then(|a| a.get("query")).and_then(|v| v.as_object()) { + let mut query = serde_json::Map::new(); + for (k, v) in qs.iter() { + query.insert(k.clone(), v.clone()); + } + if !query.is_empty() { + http_obj.insert("query".into(), Value::Object(query)); + } + } + + // Optional: body + if let Some(body_obj) = data.and_then(|d| d.get("body")).and_then(|v| v.as_object()) { + if let Some(Value::Object(json_body)) = body_obj.get("json") { + http_obj.insert("body".into(), Value::Object(json_body.clone())); + } else if let Some(Value::String(s)) = body_obj.get("content") { + http_obj.insert("body".into(), Value::String(s.clone())); + } + } + + Some(Value::Object(http_obj)) +} \ No newline at end of file diff --git a/backend/src/flow/mod.rs b/backend/src/flow/mod.rs index 9366ce2..6f05ecb 100644 --- a/backend/src/flow/mod.rs +++ b/backend/src/flow/mod.rs @@ -4,4 +4,5 @@ pub mod task; pub mod engine; pub mod dsl; pub mod storage; -pub mod executors; \ No newline at end of file +pub mod executors; +pub mod mappers; \ No newline at end of file diff --git a/backend/src/flow/storage.rs b/backend/src/flow/storage.rs index db25c74..06d9509 100644 --- a/backend/src/flow/storage.rs +++ b/backend/src/flow/storage.rs @@ -12,4 +12,25 @@ pub fn get(id: &str) -> Option { STORE.lock().unwrap().get(id).cloned() pub fn put(id: String, yaml: String) { STORE.lock().unwrap().insert(id, yaml); } -pub fn del(id: &str) -> Option { STORE.lock().unwrap().remove(id) } \ No newline at end of file +pub fn del(id: &str) -> Option { STORE.lock().unwrap().remove(id) } + +// ---- New: trait abstraction for storage ---- + +pub trait Storage: Send + Sync { + fn list(&self) -> Vec<(String, String)>; + fn get(&self, id: &str) -> Option; + fn put(&self, id: &str, yaml: String); + fn del(&self, id: &str) -> Option; +} + +/// A trivial Storage implementation that delegates to the module-level global in-memory store. +pub struct GlobalMemoryStorage; + +impl Storage for GlobalMemoryStorage { + fn list(&self) -> Vec<(String, String)> { crate::flow::storage::list() } + fn get(&self, id: &str) -> Option { crate::flow::storage::get(id) } + fn put(&self, id: &str, yaml: String) { crate::flow::storage::put(id.to_string(), yaml) } + fn del(&self, id: &str) -> Option { crate::flow::storage::del(id) } +} + +pub fn default_storage() -> std::sync::Arc { std::sync::Arc::new(GlobalMemoryStorage) } \ No newline at end of file diff --git a/backend/src/flow/task.rs b/backend/src/flow/task.rs index ee16bd4..b53bf61 100644 --- a/backend/src/flow/task.rs +++ b/backend/src/flow/task.rs @@ -1,14 +1,15 @@ use async_trait::async_trait; use serde_json::Value; +use std::sync::{Arc, RwLock, OnceLock}; + +use crate::flow::domain::{NodeDef, NodeId}; #[async_trait] -pub trait TaskComponent: Send + Sync { - async fn execute(&self, ctx: &mut Value) -> anyhow::Result<()>; +pub trait Executor: Send + Sync { + async fn execute(&self, node_id: &NodeId, node: &NodeDef, ctx: &mut Value) -> anyhow::Result<()>; } -pub type TaskRegistry = std::collections::HashMap>; - -use std::sync::{Arc, RwLock, OnceLock}; +pub type TaskRegistry = std::collections::HashMap>; pub fn default_registry() -> TaskRegistry { let mut reg: TaskRegistry = TaskRegistry::new(); @@ -27,7 +28,7 @@ pub fn get_registry() -> TaskRegistry { } /// Register/override a single task into global registry. -pub fn register_global_task(name: impl Into, task: Arc) { +pub fn register_global_task(name: impl Into, task: Arc) { let lock = GLOBAL_TASK_REGISTRY.get_or_init(|| RwLock::new(default_registry())); let mut w = lock.write().expect("lock poisoned"); w.insert(name.into(), task); diff --git a/backend/src/redis.rs b/backend/src/redis.rs index 59a73f0..b399b4c 100644 --- a/backend/src/redis.rs +++ b/backend/src/redis.rs @@ -61,6 +61,7 @@ impl RedisHelper { } /// 删除键 + #[allow(dead_code)] pub async fn del(key: &str) -> Result<(), AppError> { let mut conn = get_redis()?.clone(); let _: i32 = conn.del(key).await @@ -69,6 +70,7 @@ impl RedisHelper { } /// 检查键是否存在 + #[allow(dead_code)] pub async fn exists(key: &str) -> Result { let mut conn = get_redis()?.clone(); let result: bool = conn.exists(key).await @@ -77,6 +79,7 @@ impl RedisHelper { } /// 设置键的过期时间 + #[allow(dead_code)] pub async fn expire(key: &str, seconds: u64) -> Result<(), AppError> { let mut conn = get_redis()?.clone(); let _: bool = conn.expire(key, seconds as i64).await @@ -136,12 +139,14 @@ impl TokenRedis { } /// 删除用户的访问token + #[allow(dead_code)] pub async fn revoke_access_token(user_id: i64) -> Result<(), AppError> { let key = format!("token:access:user:{}", user_id); RedisHelper::del(&key).await } /// 删除用户的刷新token + #[allow(dead_code)] pub async fn revoke_refresh_token(user_id: i64) -> Result<(), AppError> { let key = format!("token:refresh:user:{}", user_id); RedisHelper::del(&key).await diff --git a/backend/src/services/flow_service.rs b/backend/src/services/flow_service.rs index 165551a..0a0ce09 100644 --- a/backend/src/services/flow_service.rs +++ b/backend/src/services/flow_service.rs @@ -246,7 +246,7 @@ pub async fn run(db: &Db, id: &str, req: RunReq, operator: Option<(i64, String)> }; let mut ctx = req.input.clone(); // Merge node-scoped configs into ctx under ctx.nodes - let supplement = flow::dsl::ctx_from_design_json(design); + let supplement = flow::mappers::ctx_from_design_json(design); merge_json(&mut ctx, &supplement); // 解析 executionMode / execution_mode let mode_str = design.get("executionMode").and_then(|v| v.as_str()) @@ -335,7 +335,7 @@ pub async fn run(db: &Db, id: &str, req: RunReq, operator: Option<(i64, String)> // 从全局注册中心获取任务(若未初始化则返回默认注册表) let tasks: flow::task::TaskRegistry = flow::task::get_registry(); - let engine = FlowEngine::new(tasks); + let engine = FlowEngine::builder().tasks(tasks).build(); info!(target = "udmin", "flow.run: driving engine id={} nodes={} links={} execution_mode={:?}", id, chain.nodes.len(), chain.links.len(), exec_mode); // 执行