feat(flow): 新增流式执行模式与SSE支持

新增流式执行模式,通过SSE实时推送节点执行事件与日志
重构HTTP执行器与中间件,提取通用HTTP客户端组件
优化前端测试面板,支持流式模式切换与实时日志展示
更新依赖版本并修复密码哈希的随机数生成器问题
修复前端节点类型映射问题,确保Code节点表单可用
This commit is contained in:
2025-09-21 01:48:24 +08:00
parent 296f0ae9f6
commit dd7857940f
24 changed files with 1695 additions and 885 deletions

View File

@ -4,7 +4,7 @@ use anyhow::Context as _;
use serde::{Deserialize, Serialize};
use crate::error::AppError;
use crate::flow::{self, dsl::FlowDSL, engine::FlowEngine, context::{DriveOptions, ExecutionMode}};
use crate::flow::{self, dsl::FlowDSL, engine::FlowEngine, context::{DriveOptions, ExecutionMode, StreamEvent}, log_handler::{FlowLogHandler, DatabaseLogHandler, SseLogHandler}};
use crate::db::Db;
use crate::models::flow as db_flow;
use crate::models::request_log; // 新增:查询最近修改人
@ -14,6 +14,10 @@ use sea_orm::{EntityTrait, ActiveModelTrait, Set, DbErr, ColumnTrait, QueryFilte
use sea_orm::entity::prelude::DateTimeWithTimeZone; // 新增:时间类型
use chrono::{Utc, FixedOffset};
use tracing::{info, error};
// 新增:用于流式事件通道
use tokio::sync::mpsc::Sender;
// 新增:用于错误下传递局部上下文与日志
use crate::flow::engine::DriveError;
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FlowSummary {
@ -197,216 +201,200 @@ pub async fn delete(db: &Db, id: &str) -> anyhow::Result<()> {
}
pub async fn run(db: &Db, id: &str, req: RunReq, operator: Option<(i64, String)>) -> anyhow::Result<RunResult> {
info!(target = "udmin", "flow.run: start id={}", id);
let log_handler = DatabaseLogHandler::new(db.clone());
match run_internal(db, id, req, operator, &log_handler, None).await {
Ok((ctx, logs)) => Ok(RunResult { ok: true, ctx, logs }),
Err(e) => {
// 将运行期错误转换为 ok=false并尽量带上部分 ctx/logs
if let Some(de) = e.downcast_ref::<DriveError>().cloned() {
Ok(RunResult { ok: false, ctx: de.ctx, logs: de.logs })
} else {
let mut full = e.to_string();
for cause in e.chain().skip(1) {
full.push_str(" | ");
full.push_str(&cause.to_string());
}
Ok(RunResult { ok: false, ctx: serde_json::json!({}), logs: vec![full] })
}
}
}
}
// 新增:流式运行,向外发送节点事件与最终完成事件
pub async fn run_with_stream(
db: Db,
id: &str,
req: RunReq,
operator: Option<(i64, String)>,
event_tx: Sender<StreamEvent>,
) -> anyhow::Result<()> {
// clone 一份用于错误时补发 done
let tx_done = event_tx.clone();
let log_handler = SseLogHandler::new(db.clone(), event_tx.clone());
match run_internal(&db, id, req, operator, &log_handler, Some(event_tx)).await {
Ok((_ctx, _logs)) => Ok(()), // 正常路径log_success 内已发送 done(true,...)
Err(e) => {
// 错误路径:先在 log_error 中已发送 error 事件;此处补发 done(false,...)
if let Some(de) = e.downcast_ref::<DriveError>().cloned() {
crate::middlewares::sse::emit_done(&tx_done, false, de.ctx, de.logs).await;
} else {
let mut full = e.to_string();
for cause in e.chain().skip(1) { full.push_str(" | "); full.push_str(&cause.to_string()); }
crate::middlewares::sse::emit_done(&tx_done, false, serde_json::json!({}), vec![full]).await;
}
Ok(())
}
}
}
// 内部统一的运行方法
async fn run_internal(
db: &Db,
id: &str,
req: RunReq,
operator: Option<(i64, String)>,
log_handler: &dyn FlowLogHandler,
event_tx: Option<Sender<StreamEvent>>,
) -> anyhow::Result<(serde_json::Value, Vec<String>)> {
// 使用传入的 event_tx当启用 SSE 时由路由层提供)
info!(target = "udmin", "flow.run_internal: start id={}", id);
let start = Utc::now().with_timezone(&FixedOffset::east_opt(0).unwrap());
// 获取流程编码,便于写入运行日志
// 获取流程编码
let flow_code: Option<String> = match db_flow::Entity::find_by_id(id.to_string()).one(db).await {
Ok(Some(row)) => row.code,
_ => None,
};
// 获取流程文档并记录失败原因
// 获取流程文档
let doc = match get(db, id).await {
Ok(d) => d,
Err(e) => {
error!(target = "udmin", error = ?e, "flow.run: get doc failed id={}", id);
// 记录失败日志
let (user_id, username) = operator.map(|(u, n)| (Some(u), Some(n))).unwrap_or((None, None));
let _ = flow_run_log_service::create(db, CreateRunLogInput {
flow_id: id.to_string(),
flow_code: flow_code.clone(),
input: Some(serde_json::to_string(&req.input).unwrap_or_default()),
output: None,
ok: false,
logs: Some(format!("get doc failed: {}", e)),
user_id,
username,
started_at: start,
duration_ms: 0,
}).await;
error!(target = "udmin", error = ?e, "flow.run_internal: get doc failed id={}", id);
let error_msg = format!("get doc failed: {}", e);
log_handler.log_error(id, flow_code.as_deref(), &req.input, &error_msg, operator, start, 0).await?;
return Err(e);
}
};
// 记录文档基本信息,便于判断走 JSON 还是 YAML
info!(target = "udmin", "flow.run: doc loaded id={} has_design_json={} yaml_len={}", id, doc.design_json.is_some(), doc.yaml.len());
info!(target = "udmin", "flow.run_internal: doc loaded id={} has_design_json={} yaml_len={}", id, doc.design_json.is_some(), doc.yaml.len());
// Prefer design_json if present; otherwise fall back to YAML
// 构建 chain 与 ctx
let mut exec_mode: ExecutionMode = ExecutionMode::Sync;
let (mut chain, mut ctx) = if let Some(design) = &doc.design_json {
info!(target = "udmin", "flow.run: building chain from design_json id={}", id);
let chain_from_json = match flow::dsl::chain_from_design_json(design) {
Ok(c) => c,
Err(e) => {
error!(target = "udmin", error = ?e, "flow.run: build chain from design_json failed id={}", id);
// 记录失败日志
let (user_id, username) = operator.as_ref().map(|(u, n)| (Some(*u), Some(n.clone()))).unwrap_or((None, None));
let _ = flow_run_log_service::create(db, CreateRunLogInput {
flow_id: id.to_string(),
flow_code: flow_code.clone(),
input: Some(serde_json::to_string(&req.input).unwrap_or_default()),
output: None,
ok: false,
logs: Some(format!("build chain from design_json failed: {}", e)),
user_id,
username,
started_at: start,
duration_ms: 0,
}).await;
return Err(e);
}
};
let mut ctx = req.input.clone();
// Merge node-scoped configs into ctx under ctx.nodes
let supplement = flow::mappers::ctx_from_design_json(design);
merge_json(&mut ctx, &supplement);
// 解析 executionMode / execution_mode
let mode_str = design.get("executionMode").and_then(|v| v.as_str())
.or_else(|| design.get("execution_mode").and_then(|v| v.as_str()))
.unwrap_or("sync");
exec_mode = parse_execution_mode(mode_str);
info!(target = "udmin", "flow.run: ctx prepared from design_json id={} execution_mode={:?}", id, exec_mode);
(chain_from_json, ctx)
} else {
info!(target = "udmin", "flow.run: parsing YAML id={}", id);
let dsl = match serde_yaml::from_str::<FlowDSL>(&doc.yaml) {
Ok(d) => d,
Err(e) => {
error!(target = "udmin", error = ?e, "flow.run: parse YAML failed id={}", id);
// 记录失败日志
let (user_id, username) = operator.as_ref().map(|(u, n)| (Some(*u), Some(n.clone()))).unwrap_or((None, None));
let _ = flow_run_log_service::create(db, CreateRunLogInput {
flow_id: id.to_string(),
flow_code: flow_code.clone(),
input: Some(serde_json::to_string(&req.input).unwrap_or_default()),
output: None,
ok: false,
logs: Some(format!("parse YAML failed: {}", e)),
user_id,
username,
started_at: start,
duration_ms: 0,
}).await;
return Err(anyhow::Error::new(e).context("invalid flow yaml"));
}
};
// 从 YAML 读取执行模式
if let Some(m) = dsl.execution_mode.as_deref() { exec_mode = parse_execution_mode(m); }
(dsl.into(), req.input.clone())
};
let chain_from_json = match flow::dsl::chain_from_design_json(design) {
Ok(c) => c,
Err(e) => {
error!(target = "udmin", error = ?e, "flow.run_internal: build chain from design_json failed id={}", id);
let error_msg = format!("build chain from design_json failed: {}", e);
log_handler.log_error(id, flow_code.as_deref(), &req.input, &error_msg, operator, start, 0).await?;
return Err(e);
}
};
let mut ctx = req.input.clone();
let supplement = flow::mappers::ctx_from_design_json(design);
merge_json(&mut ctx, &supplement);
let mode_str = design.get("executionMode").and_then(|v| v.as_str())
.or_else(|| design.get("execution_mode").and_then(|v| v.as_str()))
.unwrap_or("sync");
exec_mode = parse_execution_mode(mode_str);
(chain_from_json, ctx)
} else {
let dsl = match serde_yaml::from_str::<FlowDSL>(&doc.yaml) {
Ok(d) => d,
Err(e) => {
error!(target = "udmin", error = ?e, "flow.run_internal: parse YAML failed id={}", id);
let error_msg = format!("parse YAML failed: {}", e);
log_handler.log_error(id, flow_code.as_deref(), &req.input, &error_msg, operator, start, 0).await?;
return Err(anyhow::Error::new(e).context("invalid flow yaml"));
}
};
if let Some(m) = dsl.execution_mode.as_deref() { exec_mode = parse_execution_mode(m); }
(dsl.into(), req.input.clone())
};
// 若 design_json 解析出的 chain 为空,兜底回退到 YAML
// 兜底回退
if chain.nodes.is_empty() {
info!(target = "udmin", "flow.run: empty chain from design_json, fallback to YAML id={}", id);
if !doc.yaml.trim().is_empty() {
match serde_yaml::from_str::<FlowDSL>(&doc.yaml) {
Ok(dsl) => {
chain = dsl.clone().into();
// YAML 分支下 ctx = req.input不再追加 design_json 的补充
ctx = req.input.clone();
if let Some(m) = dsl.execution_mode.as_deref() { exec_mode = parse_execution_mode(m); }
info!(target = "udmin", "flow.run: fallback YAML parsed id={} execution_mode={:?}", id, exec_mode);
}
Err(e) => {
error!(target = "udmin", error = ?e, "flow.run: fallback parse YAML failed id={}", id);
// 保留原空 chain稍后 drive 会再次报错,但这里先返回更明确的错误
let (user_id, username) = operator.as_ref().map(|(u, n)| (Some(*u), Some(n.clone()))).unwrap_or((None, None));
let _ = flow_run_log_service::create(db, CreateRunLogInput {
flow_id: id.to_string(),
flow_code: flow_code.clone(),
input: Some(serde_json::to_string(&req.input).unwrap_or_default()),
output: None,
ok: false,
logs: Some(format!("fallback parse YAML failed: {}", e)),
user_id,
username,
started_at: start,
duration_ms: 0,
}).await;
let error_msg = format!("fallback parse YAML failed: {}", e);
log_handler.log_error(id, flow_code.as_deref(), &req.input, &error_msg, operator, start, 0).await?;
return Err(anyhow::anyhow!("empty chain: design_json produced no nodes and YAML parse failed"));
}
}
} else {
// YAML 也为空
let (user_id, username) = operator.as_ref().map(|(u, n)| (Some(*u), Some(n.clone()))).unwrap_or((None, None));
let _ = flow_run_log_service::create(db, CreateRunLogInput {
flow_id: id.to_string(),
flow_code: flow_code.clone(),
input: Some(serde_json::to_string(&req.input).unwrap_or_default()),
output: None,
ok: false,
logs: Some("empty chain: both design_json and yaml are empty".to_string()),
user_id,
username,
started_at: start,
duration_ms: 0,
}).await;
return Err(anyhow::anyhow!("empty chain: both design_json and yaml are empty"));
let error_msg = "empty chain: both design_json and yaml are empty";
log_handler.log_error(id, flow_code.as_deref(), &req.input, error_msg, operator, start, 0).await?;
return Err(anyhow::anyhow!(error_msg));
}
}
// 从全局注册中心获取任务(若未初始化则返回默认注册表)
// 任务与引擎
let tasks: flow::task::TaskRegistry = flow::task::get_registry();
let engine = FlowEngine::builder().tasks(tasks).build();
info!(target = "udmin", "flow.run: driving engine id={} nodes={} links={} execution_mode={:?}", id, chain.nodes.len(), chain.links.len(), exec_mode);
// 执行
let drive_res = engine
.drive(&chain, ctx, DriveOptions { execution_mode: exec_mode.clone(), ..Default::default() })
.drive(&chain, ctx, DriveOptions { execution_mode: exec_mode.clone(), event_tx, ..Default::default() })
.await;
let (ctx, logs) = match drive_res {
Ok(r) => r,
Err(e) => {
error!(target = "udmin", error = ?e, "flow.run: engine drive failed id={}", id);
let dur = (Utc::now().with_timezone(&FixedOffset::east_opt(0).unwrap()) - start).num_milliseconds() as i64;
let (user_id, username) = operator.as_ref().map(|(u, n)| (Some(*u), Some(n.clone()))).unwrap_or((None, None));
let _ = flow_run_log_service::create(db, CreateRunLogInput {
flow_id: id.to_string(),
flow_code: flow_code.clone(),
input: Some(serde_json::to_string(&req.input).unwrap_or_default()),
output: None,
ok: false,
logs: Some(format!("engine drive failed: {}", e)),
user_id,
username,
started_at: start,
duration_ms: dur,
}).await;
return Err(e);
}
};
// 兜底移除 variable 节点:不在最终上下文暴露 variable_* 的配置
let mut ctx = ctx;
if let serde_json::Value::Object(map) = &mut ctx {
if let Some(serde_json::Value::Object(nodes)) = map.get_mut("nodes") {
let keys: Vec<String> = nodes
.iter()
.filter_map(|(k, v)| if v.get("variable").is_some() { Some(k.clone()) } else { None })
.collect();
for k in keys { nodes.remove(&k); }
match drive_res {
Ok((mut ctx, logs)) => {
// 移除 variable 节点
if let serde_json::Value::Object(map) = &mut ctx {
if let Some(serde_json::Value::Object(nodes)) = map.get_mut("nodes") {
let keys: Vec<String> = nodes
.iter()
.filter_map(|(k, v)| if v.get("variable").is_some() { Some(k.clone()) } else { None })
.collect();
for k in keys { nodes.remove(&k); }
}
}
let dur = (Utc::now().with_timezone(&FixedOffset::east_opt(0).unwrap()) - start).num_milliseconds() as i64;
log_handler.log_success(id, flow_code.as_deref(), &req.input, &ctx, &logs, operator, start, dur).await?;
Ok((ctx, logs))
}
Err(e) => {
error!(target = "udmin", error = ?e, "flow.run_internal: engine drive failed id={}", id);
let dur = (Utc::now().with_timezone(&FixedOffset::east_opt(0).unwrap()) - start).num_milliseconds() as i64;
// 优先记录详细错误(包含部分 ctx 与累计 logs
if let Some(de) = e.downcast_ref::<DriveError>().cloned() {
log_handler
.log_error_detail(
id,
flow_code.as_deref(),
&req.input,
&de.ctx,
&de.logs,
&de.message,
operator,
start,
dur,
)
.await?;
} else {
let error_msg = format!("engine drive failed: {}", e);
log_handler
.log_error(
id,
flow_code.as_deref(),
&req.input,
&error_msg,
operator,
start,
dur,
)
.await?;
}
Err(e)
}
}
// 调试:打印处理后的 ctx
//info!(target = "udmin", "flow.run: result ctx={}", serde_json::to_string(&ctx).unwrap_or_else(|_| "<serialize ctx failed>".to_string()));
info!(target = "udmin", "flow.run: done id={}", id);
// 写入成功日志
let dur = (Utc::now().with_timezone(&FixedOffset::east_opt(0).unwrap()) - start).num_milliseconds() as i64;
let (user_id, username) = operator.map(|(u, n)| (Some(u), Some(n))).unwrap_or((None, None));
let _ = flow_run_log_service::create(db, CreateRunLogInput {
flow_id: id.to_string(),
flow_code: flow_code.clone(),
input: Some(serde_json::to_string(&req.input).unwrap_or_default()),
output: Some(serde_json::to_string(&ctx).unwrap_or_default()),
ok: true,
logs: Some(serde_json::to_string(&logs).unwrap_or_default()),
user_id,
username,
started_at: start,
duration_ms: dur,
}).await;
Ok(RunResult { ok: true, ctx, logs })
}
fn extract_name(yaml: &str) -> Option<String> {