use anyhow::Context as _; use serde::{Deserialize, Serialize}; use crate::error::AppError; use crate::flow::{self, dsl::FlowDSL, engine::FlowEngine, context::{DriveOptions, ExecutionMode, StreamEvent}, log_handler::{FlowLogHandler, DatabaseLogHandler, SseLogHandler}}; use crate::db::Db; use crate::models::flow as db_flow; use crate::models::request_log; // 新增:查询最近修改人 use sea_orm::{EntityTrait, ActiveModelTrait, Set, DbErr, ColumnTrait, QueryFilter, PaginatorTrait, QueryOrder}; use sea_orm::entity::prelude::DateTimeWithTimeZone; // 新增:时间类型 use chrono::{Utc, FixedOffset}; use tracing::{info, error}; // 新增:用于流式事件通道 use tokio::sync::mpsc::Sender; // 新增:用于错误下传递局部上下文与日志 use crate::flow::engine::DriveError; #[derive(Debug, Clone, Serialize, Deserialize)] pub struct FlowSummary { pub id: i64, pub name: String, #[serde(skip_serializing_if = "Option::is_none")] pub code: Option, #[serde(skip_serializing_if = "Option::is_none")] pub remark: Option, pub created_at: DateTimeWithTimeZone, pub updated_at: DateTimeWithTimeZone, #[serde(skip_serializing_if = "Option::is_none")] pub last_modified_by: Option, } #[derive(Debug, Clone, Serialize, Deserialize)] pub struct FlowDoc { pub id: i64, pub yaml: String, #[serde(skip_serializing_if = "Option::is_none")] pub design_json: Option, #[serde(skip_serializing_if = "Option::is_none")] pub name: Option, #[serde(skip_serializing_if = "Option::is_none")] pub code: Option, #[serde(skip_serializing_if = "Option::is_none")] pub remark: Option, } #[derive(Debug, Clone, Serialize, Deserialize)] pub struct FlowCreateReq { pub yaml: Option, pub name: Option, pub design_json: Option, pub code: Option, pub remark: Option } #[derive(Debug, Clone, Serialize, Deserialize)] pub struct FlowUpdateReq { pub yaml: Option, pub design_json: Option, pub name: Option, pub code: Option, pub remark: Option } #[derive(Debug, Clone, Serialize, Deserialize)] pub struct RunReq { #[serde(default)] pub input: serde_json::Value } #[derive(Debug, Clone, Serialize, Deserialize)] pub struct RunResult { pub ok: bool, pub ctx: serde_json::Value, pub logs: Vec } #[derive(Clone, Debug, serde::Serialize)] pub struct PageResp { pub items: Vec, pub total: u64, pub page: u64, pub page_size: u64 } // list flows from database with pagination & keyword pub async fn list(db: &Db, page: u64, page_size: u64, keyword: Option) -> anyhow::Result> { let mut selector = db_flow::Entity::find(); if let Some(k) = keyword.filter(|s| !s.is_empty()) { let like = format!("%{}%", k); // 名称模糊匹配 + 若关键字可解析为数字则按ID精确匹配 selector = selector.filter( db_flow::Column::Name.like(like.clone()) .or( match k.parse::() { Ok(num) => db_flow::Column::Id.eq(num), Err(_) => db_flow::Column::Name.like(like), } ) ); } let paginator = selector.order_by_desc(db_flow::Column::CreatedAt).paginate(db, page_size); let total = paginator.num_items().await? as u64; let models = paginator.fetch_page(if page > 0 { page - 1 } else { 0 }).await?; let mut items: Vec = Vec::with_capacity(models.len()); for row in models.into_iter() { let id = row.id; let name = row .name .clone() .or_else(|| row.yaml.as_deref().and_then(extract_name)) .unwrap_or_else(|| { let prefix: String = id.to_string().chars().take(8).collect(); format!("flow_{}", prefix) }); // 最近修改人:从请求日志中查找最近一次对该flow的PUT请求 let last_modified_by = request_log::Entity::find() .filter(request_log::Column::Path.like(format!("/api/flows/{}%", id))) .filter(request_log::Column::Method.eq("PUT")) .order_by_desc(request_log::Column::RequestTime) .one(db) .await? .and_then(|m| m.username); items.push(FlowSummary { id, name, code: row.code.clone(), remark: row.remark.clone(), created_at: row.created_at, updated_at: row.updated_at, last_modified_by, }); } Ok(PageResp { items, total, page, page_size }) } // create new flow with yaml or just name pub async fn create(db: &Db, req: FlowCreateReq) -> anyhow::Result { info!(target: "udmin", "flow.create: start"); if let Some(yaml) = &req.yaml { let _parsed: FlowDSL = serde_yaml::from_str(yaml).context("invalid flow yaml")?; info!(target: "udmin", "flow.create: yaml parsed ok"); } let id: i64 = crate::utils::generate_id(); let name = req .name .clone() .or_else(|| req.yaml.as_deref().and_then(extract_name)); let now = Utc::now().with_timezone(&FixedOffset::east_opt(0).unwrap()); let design_json_str = match &req.design_json { Some(v) => serde_json::to_string(v).ok(), None => None }; // 克隆一份用于返回 let ret_name = name.clone(); let ret_code = req.code.clone(); let ret_remark = req.remark.clone(); let am = db_flow::ActiveModel { id: Set(id), name: Set(name.clone()), yaml: Set(req.yaml.clone()), design_json: Set(design_json_str), // 新增: code 与 remark 入库 code: Set(req.code.clone()), remark: Set(req.remark.clone()), created_at: Set(now), updated_at: Set(Utc::now().with_timezone(&FixedOffset::east_opt(0).unwrap())), ..Default::default() }; info!(target: "udmin", "flow.create: inserting into db id={}", id); match db_flow::Entity::insert(am).exec(db).await { Ok(_) => { info!(target: "udmin", "flow.create: insert ok id={}", id); Ok(FlowDoc { id, yaml: req.yaml.unwrap_or_default(), design_json: req.design_json, name: ret_name, code: ret_code, remark: ret_remark }) } Err(DbErr::RecordNotInserted) => { error!(target: "udmin", "flow.create: insert returned RecordNotInserted, verifying by select id={}", id); match db_flow::Entity::find_by_id(id).one(db).await { Ok(Some(_)) => { info!(target: "udmin", "flow.create: found inserted row by id={}, treating as success", id); Ok(FlowDoc { id, yaml: req.yaml.unwrap_or_default(), design_json: req.design_json, name, code: req.code, remark: req.remark }) } Ok(None) => Err(anyhow::anyhow!("insert flow failed").context("verify inserted row not found")), Err(e) => Err(anyhow::Error::new(e).context("insert flow failed")), } } Err(e) => { error!(target: "udmin", error = ?e, "flow.create: insert failed"); Err(anyhow::Error::new(e).context("insert flow failed")) } } } pub async fn get(db: &Db, id: i64) -> anyhow::Result { let row = db_flow::Entity::find_by_id(id).one(db).await?; let row = row.ok_or_else(|| anyhow::anyhow!("not found"))?; let yaml = row.yaml.unwrap_or_default(); let design_json = row.design_json.and_then(|s| serde_json::from_str::(&s).ok()); let name = row .name .clone() .or_else(|| extract_name(&yaml)); Ok(FlowDoc { id: row.id, yaml, design_json, name, code: row.code, remark: row.remark }) } pub async fn get_by_code(db: &Db, code: &str) -> anyhow::Result { let row = db_flow::Entity::find() .filter(db_flow::Column::Code.eq(code)) .one(db) .await?; let row = row.ok_or_else(|| anyhow::anyhow!("flow not found with code: {}", code))?; let yaml = row.yaml.unwrap_or_default(); let design_json = row.design_json.and_then(|s| serde_json::from_str::(&s).ok()); // 名称兜底:数据库 name 为空时,尝试从 YAML 提取 let name = row .name .clone() .or_else(|| extract_name(&yaml)); Ok(FlowDoc { id: row.id, yaml, design_json, name, code: row.code, remark: row.remark }) } pub async fn update(db: &Db, id: i64, req: FlowUpdateReq) -> anyhow::Result { if let Some(yaml) = &req.yaml { let _parsed: FlowDSL = serde_yaml::from_str(yaml).context("invalid flow yaml")?; } let row = db_flow::Entity::find_by_id(id).one(db).await?; let Some(row) = row else { return Err(anyhow::anyhow!("not found")); }; let mut am: db_flow::ActiveModel = row.into(); if let Some(yaml) = req.yaml { let next_name = req .name .or_else(|| extract_name(&yaml)); if let Some(n) = next_name { am.name = Set(Some(n)); } am.yaml = Set(Some(yaml.clone())); } else if let Some(n) = req.name { am.name = Set(Some(n)); } if let Some(dj) = req.design_json { let s = serde_json::to_string(&dj)?; am.design_json = Set(Some(s)); } if let Some(c) = req.code { am.code = Set(Some(c)); } if let Some(r) = req.remark { am.remark = Set(Some(r)); } am.updated_at = Set(Utc::now().with_timezone(&FixedOffset::east_opt(0).unwrap())); am.update(db).await?; let got = db_flow::Entity::find_by_id(id).one(db).await?.unwrap(); let dj = got.design_json.as_deref().and_then(|s| serde_json::from_str::(&s).ok()); Ok(FlowDoc { id, yaml: got.yaml.unwrap_or_default(), design_json: dj, name: got.name, code: got.code, remark: got.remark }) } pub async fn delete(db: &Db, id: i64) -> anyhow::Result<()> { let row = db_flow::Entity::find_by_id(id).one(db).await?; let Some(row) = row else { return Err(anyhow::anyhow!("not found")); }; let am: db_flow::ActiveModel = row.into(); am.delete(db).await?; Ok(()) } pub async fn run(db: &Db, id: i64, req: RunReq, operator: Option<(i64, String)>) -> anyhow::Result { let log_handler = DatabaseLogHandler::new(db.clone()); match run_internal(db, id, req, operator, &log_handler, None).await { Ok((ctx, logs)) => Ok(RunResult { ok: true, ctx, logs }), Err(e) => { if let Some(de) = e.downcast_ref::().cloned() { Ok(RunResult { ok: false, ctx: de.ctx, logs: de.logs }) } else { let mut full = e.to_string(); for cause in e.chain().skip(1) { full.push_str(" | "); full.push_str(&cause.to_string()); } Ok(RunResult { ok: false, ctx: serde_json::json!({}), logs: vec![full] }) } } } } // 新增:流式运行,向外发送节点事件与最终完成事件 pub async fn run_with_stream( db: Db, id: i64, req: RunReq, operator: Option<(i64, String)>, event_tx: Sender, ) -> anyhow::Result<()> { let tx_done = event_tx.clone(); let log_handler = SseLogHandler::new(db.clone(), event_tx.clone()); match run_internal(&db, id, req, operator, &log_handler, Some(event_tx)).await { Ok((_ctx, _logs)) => Ok(()), Err(e) => { if let Some(de) = e.downcast_ref::().cloned() { crate::middlewares::sse::emit_done(&tx_done, false, de.ctx, de.logs).await; } else { let mut full = e.to_string(); for cause in e.chain().skip(1) { full.push_str(" | "); full.push_str(&cause.to_string()); } crate::middlewares::sse::emit_done(&tx_done, false, serde_json::json!({}), vec![full]).await; } Ok(()) } } } // 内部统一的运行方法 async fn run_internal( db: &Db, id: i64, req: RunReq, operator: Option<(i64, String)>, log_handler: &dyn FlowLogHandler, event_tx: Option>, ) -> anyhow::Result<(serde_json::Value, Vec)> { info!(target = "udmin", "flow.run_internal: start id={}", id); let start = Utc::now().with_timezone(&FixedOffset::east_opt(0).unwrap()); let flow_code: Option = match db_flow::Entity::find_by_id(id).one(db).await { Ok(Some(row)) => row.code, _ => None }; let doc = match get(db, id).await { Ok(d) => d, Err(e) => { error!(target = "udmin", error = ?e, "flow.run_internal: get doc failed id={}", id); let error_msg = format!("get doc failed: {}", e); log_handler.log_error(id, flow_code.as_deref(), &req.input, &error_msg, operator, start, 0).await?; return Err(e); } }; info!(target = "udmin", "flow.run_internal: doc loaded id={} has_design_json={} yaml_len={}", id, doc.design_json.is_some(), doc.yaml.len()); // 构建 chain 与 ctx let mut exec_mode: ExecutionMode = ExecutionMode::Sync; let (mut chain, mut ctx) = if let Some(design) = &doc.design_json { let chain_from_json = match flow::dsl::chain_from_design_json(design) { Ok(c) => c, Err(e) => { error!(target = "udmin", error = ?e, "flow.run_internal: build chain from design_json failed id={}", id); let error_msg = format!("build chain from design_json failed: {}", e); log_handler.log_error(id, flow_code.as_deref(), &req.input, &error_msg, operator, start, 0).await?; return Err(e); } }; let mut ctx = req.input.clone(); let supplement = flow::mappers::ctx_from_design_json(design); merge_json(&mut ctx, &supplement); let mode_str = design.get("executionMode").and_then(|v| v.as_str()) .or_else(|| design.get("execution_mode").and_then(|v| v.as_str())) .unwrap_or("sync"); exec_mode = parse_execution_mode(mode_str); let bounded_limit = design.get("concurrencyLimit").and_then(|v| v.as_u64()).map(|x| x as usize); let _ = bounded_limit; (chain_from_json, ctx) } else { let dsl = match serde_yaml::from_str::(&doc.yaml) { Ok(d) => d, Err(e) => { error!(target = "udmin", error = ?e, "flow.run_internal: parse YAML failed id={}", id); let error_msg = format!("parse YAML failed: {}", e); log_handler.log_error(id, flow_code.as_deref(), &req.input, &error_msg, operator, start, 0).await?; return Err(anyhow::Error::new(e).context("invalid flow yaml")); } }; if let Some(m) = dsl.execution_mode.as_deref() { exec_mode = parse_execution_mode(m); } (dsl.into(), req.input.clone()) }; // 兜底回退 if chain.nodes.is_empty() { if !doc.yaml.trim().is_empty() { match serde_yaml::from_str::(&doc.yaml) { Ok(dsl) => { chain = dsl.clone().into(); ctx = req.input.clone(); if let Some(m) = dsl.execution_mode.as_deref() { exec_mode = parse_execution_mode(m); } } Err(e) => { let error_msg = format!("fallback parse YAML failed: {}", e); log_handler.log_error(id, flow_code.as_deref(), &req.input, &error_msg, operator, start, 0).await?; return Err(anyhow::anyhow!("empty chain: design_json produced no nodes and YAML parse failed")); } } } else { let error_msg = "empty chain: both design_json and yaml are empty"; log_handler.log_error(id, flow_code.as_deref(), &req.input, error_msg, operator, start, 0).await?; return Err(anyhow::anyhow!(error_msg)); } } // 任务与引擎 let tasks: flow::task::TaskRegistry = flow::task::get_registry(); let engine = FlowEngine::builder().tasks(tasks).build(); // 执行 let drive_res = engine .drive(&chain, ctx, DriveOptions { execution_mode: exec_mode.clone(), event_tx, bounded_limit: if matches!(exec_mode, ExecutionMode::AsyncBounded) { design_concurrency_limit(&doc.design_json) } else { None }, ..Default::default() }) .await; match drive_res { Ok((mut ctx, logs)) => { // 移除 variable 节点 if let serde_json::Value::Object(map) = &mut ctx { if let Some(serde_json::Value::Object(nodes)) = map.get_mut("nodes") { let keys: Vec = nodes .iter() .filter_map(|(k, v)| if v.get("variable").is_some() { Some(k.clone()) } else { None }) .collect(); for k in keys { nodes.remove(&k); } } } let dur = (Utc::now().with_timezone(&FixedOffset::east_opt(0).unwrap()) - start).num_milliseconds() as i64; log_handler.log_success(id, flow_code.as_deref(), &req.input, &ctx, &logs, operator, start, dur).await?; Ok((ctx, logs)) } Err(e) => { error!(target = "udmin", error = ?e, "flow.run_internal: engine drive failed id={}", id); let dur = (Utc::now().with_timezone(&FixedOffset::east_opt(0).unwrap()) - start).num_milliseconds() as i64; if let Some(de) = e.downcast_ref::().cloned() { log_handler .log_error_detail( id, flow_code.as_deref(), &req.input, &de.ctx, &de.logs, &de.message, operator, start, dur, ) .await?; } else { let error_msg = format!("engine drive failed: {}", e); log_handler .log_error( id, flow_code.as_deref(), &req.input, &error_msg, operator, start, dur, ) .await?; } Err(e) } } } fn extract_name(yaml: &str) -> Option { for line in yaml.lines() { let lt = line.trim(); if lt.starts_with("#") && lt.len() > 1 { return Some(lt.trim_start_matches('#').trim().to_string()); } if lt.starts_with("name:") { let name = lt.trim_start_matches("name:").trim(); if !name.is_empty() { return Some(name.to_string()); } } } None } pub fn ae>(e: E) -> AppError { let err: anyhow::Error = e.into(); let mut full = err.to_string(); for cause in err.chain().skip(1) { full.push_str(" | "); full.push_str(&cause.to_string()); } // MySQL duplicate key example: "Database error: Duplicate entry 'xxx' for key 'idx-unique-flows-code'" // 也兼容包含唯一索引名/关键字的报错信息 if full.contains("Duplicate entry") || full.contains("idx-unique-flows-code") || (full.contains("code") && full.contains("unique")) { return AppError::Conflict("流程编码已存在".to_string()); } AppError::Anyhow(anyhow::anyhow!(full)) } // shallow merge json objects: a <- b fn merge_json(a: &mut serde_json::Value, b: &serde_json::Value) { use serde_json::Value as V; match (a, b) { (V::Object(ao), V::Object(bo)) => { for (k, v) in bo.iter() { match ao.get_mut(k) { Some(av) => merge_json(av, v), None => { ao.insert(k.clone(), v.clone()); } } } } (a_slot, b_val) => { *a_slot = b_val.clone(); } } } // parse execution mode string fn parse_execution_mode(s: &str) -> ExecutionMode { match s.to_ascii_lowercase().as_str() { "async" | "async_fire_and_forget" | "fire_and_forget" => ExecutionMode::AsyncFireAndForget, "queued" | "queue" => ExecutionMode::AsyncQueued, "bounded" | "parallel_bounded" | "bounded_parallel" => ExecutionMode::AsyncBounded, _ => ExecutionMode::Sync, } } fn design_concurrency_limit(design_json: &Option) -> Option { design_json .as_ref() .and_then(|d| d.get("concurrencyLimit")) .and_then(|v| v.as_u64()) .map(|x| x as usize) }