feat(flow): 添加动态API路由支持通过流程code执行

refactor(engine): 优化节点执行耗时记录
fix(db): 修正结果模式获取逻辑忽略connection.mode
style(i18n): 统一节点描述和输出模式选项的国际化
test(flow): 新增测试流程定义文件
refactor(react): 简化开发环境日志降噪处理
This commit is contained in:
2025-09-20 00:12:40 +08:00
parent 62789fce42
commit d8116ff8dc
13 changed files with 162 additions and 63 deletions

View File

@ -4,6 +4,7 @@ use futures::future::join_all;
use rhai::Engine;
use tracing::info;
use std::time::Instant;
// === 表达式评估支持thread_local 引擎与 AST 缓存,避免全局 Sync/Send 限制 ===
use std::cell::RefCell;
@ -204,6 +205,9 @@ async fn drive_from(
steps += 1;
// 读取节点
let node = match node_map.get(&current) { Some(n) => n, None => break };
// 进入节点:打点
let node_id_str = node.id.0.clone();
let node_start = Instant::now();
{
let mut lg = logs.lock().await;
lg.push(format!("enter node: {}", node.id.0));
@ -287,7 +291,16 @@ async fn drive_from(
}
}
if matches!(node.kind, NodeKind::End) { break; }
// End 节点:记录耗时后结束
if matches!(node.kind, NodeKind::End) {
let duration = node_start.elapsed().as_millis();
{
let mut lg = logs.lock().await;
lg.push(format!("leave node: {} {} ms", node_id_str, duration));
}
info!(target: "udmin.flow", "leave node: {} {} ms", node_id_str, duration);
break;
}
// 选择下一批 link仅在 Condition 节点上评估条件;其他节点忽略条件,直接沿第一条边前进
let mut nexts: Vec<String> = Vec::new();
@ -347,9 +360,25 @@ async fn drive_from(
}
}
if nexts.is_empty() { break; }
// 无后继:记录耗时后结束
if nexts.is_empty() {
let duration = node_start.elapsed().as_millis();
{
let mut lg = logs.lock().await;
lg.push(format!("leave node: {} {} ms", node_id_str, duration));
}
info!(target: "udmin.flow", "leave node: {} {} ms", node_id_str, duration);
break;
}
// 单分支:记录耗时后前进
if nexts.len() == 1 {
let duration = node_start.elapsed().as_millis();
{
let mut lg = logs.lock().await;
lg.push(format!("leave node: {} {} ms", node_id_str, duration));
}
info!(target: "udmin.flow", "leave node: {} {} ms", node_id_str, duration);
current = nexts.remove(0);
continue;
}
@ -369,6 +398,13 @@ async fn drive_from(
current = nexts.into_iter().next().unwrap();
// 在一个安全点等待已分支的完成(这里选择在下一轮进入前等待)
let _ = join_all(futs).await;
// 多分支:记录当前节点耗时(包含等待其他分支完成的时间)
let duration = node_start.elapsed().as_millis();
{
let mut lg = logs.lock().await;
lg.push(format!("leave node: {} {} ms", node_id_str, duration));
}
info!(target: "udmin.flow", "leave node: {} {} ms", node_id_str, duration);
}
Ok(())

View File

@ -25,8 +25,8 @@ impl Executor for DbTask {
// 3) 解析配置(包含可选连接信息)
let (sql, params, output_key, conn, mode_from_db) = parse_db_config(cfg)?;
// 提前读取结果模式,优先 connection.mode其次 db.output.mode/db.outputMode/db.mode
let result_mode = get_result_mode_from_conn(&conn).or(mode_from_db);
// 提前读取结果模式:仅使用 db.output.mode/db.outputMode/db.mode,忽略 connection.mode
let result_mode = mode_from_db;
info!(target = "udmin.flow", "db task: exec sql: {}", sql);
// 4) 获取连接:必须显式声明 db.connection禁止回退到项目全局数据库避免安全风险

View File

@ -0,0 +1,60 @@
use axum::{Router, routing::post, extract::{State, Path}, Json};
use crate::{db::Db, response::ApiResponse, services::flow_service, error::AppError};
use serde_json::Value;
use tracing::{info, error};
pub fn router() -> Router<Db> {
Router::new()
.route("/dynamic/{flow_code}", post(execute_flow))
}
async fn execute_flow(
State(db): State<Db>,
Path(flow_code): Path<String>,
Json(payload): Json<Value>
) -> Result<Json<ApiResponse<Value>>, AppError> {
info!(target = "udmin", "dynamic_api.execute_flow: start flow_code={}", flow_code);
// 1. 通过code查询流程
let flow_doc = match flow_service::get_by_code(&db, &flow_code).await {
Ok(doc) => doc,
Err(e) => {
error!(target = "udmin", error = ?e, "dynamic_api.execute_flow: flow not found flow_code={}", flow_code);
return Err(flow_service::ae(e));
}
};
// 2. 执行流程
let flow_id = flow_doc.id.clone();
info!(target = "udmin", "dynamic_api.execute_flow: found flow id={} for code={}", flow_id, flow_code);
match flow_service::run(&db, &flow_id, flow_service::RunReq { input: payload }, Some((0, "接口".to_string()))).await {
Ok(result) => {
info!(target = "udmin", "dynamic_api.execute_flow: execution successful flow_code={}", flow_code);
// 仅返回上下文中的 http_resp / http_response如果不存在则返回空对象 {}
let ctx = result.ctx;
let data = match ctx {
Value::Object(mut map) => {
if let Some(v) = map.remove("http_resp") {
v
} else if let Some(v) = map.remove("http_response") {
v
} else {
Value::Object(serde_json::Map::new())
}
}
_ => Value::Object(serde_json::Map::new()),
};
Ok(Json(ApiResponse::ok(data)))
},
Err(e) => {
error!(target = "udmin", error = ?e, "dynamic_api.execute_flow: execution failed flow_code={}", flow_code);
let mut full = e.to_string();
for cause in e.chain().skip(1) {
full.push_str(" | ");
full.push_str(&cause.to_string());
}
Err(AppError::InternalMsg(full))
}
}
}

View File

@ -1,13 +1,13 @@
pub mod auth;
pub mod users;
pub mod roles;
pub mod menus;
pub mod departments;
pub mod logs;
// 新增岗位
pub mod positions;
pub mod menus;
pub mod logs;
pub mod flows;
pub mod flow_run_logs;
pub mod dynamic_api;
use axum::Router;
use crate::db::Db;
@ -23,4 +23,5 @@ pub fn api_router() -> Router<Db> {
.merge(flows::router())
.merge(positions::router())
.merge(flow_run_logs::router())
.merge(dynamic_api::router())
}

View File

@ -144,6 +144,17 @@ pub async fn get(db: &Db, id: &str) -> anyhow::Result<FlowDoc> {
Ok(FlowDoc { id: row.id, yaml, design_json })
}
pub async fn get_by_code(db: &Db, code: &str) -> anyhow::Result<FlowDoc> {
let row = db_flow::Entity::find()
.filter(db_flow::Column::Code.eq(code))
.one(db)
.await?;
let row = row.ok_or_else(|| anyhow::anyhow!("flow not found with code: {}", code))?;
let yaml = row.yaml.unwrap_or_default();
let design_json = row.design_json.and_then(|s| serde_json::from_str::<serde_json::Value>(&s).ok());
Ok(FlowDoc { id: row.id, yaml, design_json })
}
pub async fn update(db: &Db, id: &str, req: FlowUpdateReq) -> anyhow::Result<FlowDoc> {
if let Some(yaml) = &req.yaml {
let _parsed: FlowDSL = serde_yaml::from_str(yaml).context("invalid flow yaml")?;
@ -193,7 +204,7 @@ pub async fn run(db: &Db, id: &str, req: RunReq, operator: Option<(i64, String)>
Ok(Some(row)) => row.code,
_ => None,
};
// 获取流程文档并记录失败原因
// 获取流程文档并记录失败原因
let doc = match get(db, id).await {
Ok(d) => d,
Err(e) => {