feat(flows): 新增流程编辑器基础功能与相关组件

feat(backend): 添加流程模型与服务支持
feat(frontend): 实现流程编辑器UI与交互
feat(assets): 添加流程节点图标资源
feat(plugins): 实现上下文菜单和运行时插件
feat(components): 新增基础节点和侧边栏组件
feat(routes): 添加流程相关路由配置
feat(models): 创建流程和运行日志数据模型
feat(services): 实现流程服务层逻辑
feat(migration): 添加流程相关数据库迁移
feat(config): 更新前端配置支持流程编辑器
feat(utils): 增强axios错误处理和工具函数
This commit is contained in:
2025-09-15 00:27:13 +08:00
parent 9da3978f91
commit b0963e5e37
291 changed files with 17947 additions and 86 deletions

View File

@ -1,5 +1,7 @@
use sea_orm::{ConnectOptions, Database, DatabaseConnection};
use std::time::Duration;
use once_cell::sync::OnceCell;
use crate::error::AppError;
pub type Db = DatabaseConnection;
@ -12,4 +14,19 @@ pub async fn init_db() -> anyhow::Result<Db> {
.sqlx_logging(false);
let conn = Database::connect(opt).await?;
Ok(conn)
}
// ===== Global DB connection (OnceCell) =====
static GLOBAL_DB: OnceCell<Db> = OnceCell::new();
pub fn set_db(conn: Db) -> Result<(), AppError> {
GLOBAL_DB
.set(conn)
.map_err(|_| AppError::Anyhow(anyhow::anyhow!("Db already initialized")))
}
pub fn get_db() -> Result<&'static Db, AppError> {
GLOBAL_DB
.get()
.ok_or_else(|| AppError::Anyhow(anyhow::anyhow!("Db not initialized")))
}

View File

@ -8,7 +8,10 @@ pub enum AppError {
#[error("forbidden")] Forbidden,
#[error("forbidden: {0}")] ForbiddenMsg(String),
#[error("bad request: {0}")] BadRequest(String),
#[error("conflict: {0}")] Conflict(String),
#[error("not found")] NotFound,
// 新增:允许在个别接口(如同步执行运行)明确返回后端的错误信息
#[error("internal error: {0}")] InternalMsg(String),
#[error(transparent)] Db(#[from] sea_orm::DbErr),
#[error(transparent)] Jwt(#[from] jsonwebtoken::errors::Error),
#[error(transparent)] Anyhow(#[from] anyhow::Error),
@ -22,7 +25,12 @@ impl IntoResponse for AppError {
AppError::Forbidden => (StatusCode::FORBIDDEN, 403, "forbidden".to_string()),
AppError::ForbiddenMsg(m) => (StatusCode::FORBIDDEN, 403, m.clone()),
AppError::BadRequest(m) => (StatusCode::BAD_REQUEST, 400, m.clone()),
AppError::Conflict(m) => (StatusCode::CONFLICT, 409, m.clone()),
AppError::NotFound => (StatusCode::NOT_FOUND, 404, "not found".into()),
// Treat JWT decode/validation errors as 401 to allow frontend to refresh tokens
AppError::Jwt(_) => (StatusCode::UNAUTHORIZED, 401, "unauthorized".to_string()),
// 新增:对 InternalMsg 直接以 500 返回详细消息
AppError::InternalMsg(m) => (StatusCode::INTERNAL_SERVER_ERROR, 500, m.clone()),
_ => (StatusCode::INTERNAL_SERVER_ERROR, 500, "internal error".into()),
};
(status, Json(ApiResponse::<serde_json::Value> { code, message: msg, data: None })).into_response()

View File

@ -0,0 +1,27 @@
use serde::{Deserialize, Serialize};
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct FlowContext {
#[serde(default)]
pub data: serde_json::Value,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum ExecutionMode {
#[serde(rename = "sync")] Sync,
#[serde(rename = "async")] AsyncFireAndForget,
}
impl Default for ExecutionMode { fn default() -> Self { ExecutionMode::Sync } }
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DriveOptions {
#[serde(default)]
pub max_steps: usize,
#[serde(default)]
pub execution_mode: ExecutionMode,
}
impl Default for DriveOptions {
fn default() -> Self { Self { max_steps: 10_000, execution_mode: ExecutionMode::Sync } }
}

View File

@ -0,0 +1,44 @@
use serde::{Deserialize, Serialize};
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash, Default)]
pub struct NodeId(pub String);
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum NodeKind {
Start,
End,
Task,
Decision,
}
impl Default for NodeKind {
fn default() -> Self { Self::Task }
}
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct NodeDef {
pub id: NodeId,
#[serde(default)]
pub kind: NodeKind,
#[serde(default)]
pub name: String,
#[serde(default)]
pub task: Option<String>, // 绑定的任务组件标识
}
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct LinkDef {
pub from: NodeId,
pub to: NodeId,
#[serde(default)]
pub condition: Option<String>, // 条件脚本,返回 bool
}
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct ChainDef {
#[serde(default)]
pub name: String,
pub nodes: Vec<NodeDef>,
#[serde(default)]
pub links: Vec<LinkDef>,
}

263
backend/src/flow/dsl.rs Normal file
View File

@ -0,0 +1,263 @@
use serde::{Deserialize, Serialize};
use serde_json::Value;
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FlowDSL {
#[serde(default)]
pub name: String,
#[serde(default, alias = "executionMode")]
pub execution_mode: Option<String>,
pub nodes: Vec<NodeDSL>,
#[serde(default)]
pub edges: Vec<EdgeDSL>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NodeDSL {
pub id: String,
#[serde(default)]
pub kind: String, // start / end / task / decision
#[serde(default)]
pub name: String,
#[serde(default)]
pub task: Option<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EdgeDSL {
#[serde(alias = "source", alias = "from", rename = "from")]
pub from: String,
#[serde(alias = "target", alias = "to", rename = "to")]
pub to: String,
#[serde(default)]
pub condition: Option<String>,
}
impl From<FlowDSL> for super::domain::ChainDef {
fn from(v: FlowDSL) -> Self {
super::domain::ChainDef {
name: v.name,
nodes: v
.nodes
.into_iter()
.map(|n| super::domain::NodeDef {
id: super::domain::NodeId(n.id),
kind: match n.kind.to_lowercase().as_str() {
"start" => super::domain::NodeKind::Start,
"end" => super::domain::NodeKind::End,
"decision" => super::domain::NodeKind::Decision,
_ => super::domain::NodeKind::Task,
},
name: n.name,
task: n.task,
})
.collect(),
links: v
.edges
.into_iter()
.map(|e| super::domain::LinkDef {
from: super::domain::NodeId(e.from),
to: super::domain::NodeId(e.to),
condition: e.condition,
})
.collect(),
}
}
}
// ===== New: Parse design_json (free layout JSON) to ChainDef and build execution context =====
/// Build ChainDef from design_json (front-end flow JSON)
pub fn chain_from_design_json(design: &Value) -> anyhow::Result<super::domain::ChainDef> {
use super::domain::{ChainDef, NodeDef, NodeId, NodeKind, LinkDef};
// Accept both JSON object and stringified JSON
let parsed: Option<Value> = match design {
Value::String(s) => serde_json::from_str::<Value>(s).ok(),
_ => None,
};
let design = parsed.as_ref().unwrap_or(design);
let name = design
.get("name")
.and_then(|v| v.as_str())
.unwrap_or("")
.to_string();
let nodes_arr = design.get("nodes").and_then(|v| v.as_array()).cloned().unwrap_or_default();
let mut nodes: Vec<NodeDef> = Vec::new();
for n in &nodes_arr {
let id = n.get("id").and_then(|v| v.as_str()).unwrap_or_default();
let t = n.get("type").and_then(|v| v.as_str()).unwrap_or("task");
let name_field = n
.get("data")
.and_then(|d| d.get("title"))
.and_then(|v| v.as_str())
.unwrap_or("")
.to_string();
let kind = match t {
"start" => NodeKind::Start,
"end" => NodeKind::End,
"condition" => NodeKind::Decision,
_ => NodeKind::Task,
};
// Map type to task executor id (only for executable nodes). Others will be None.
let task = match t {
"http" => Some("http".to_string()),
"db" => Some("db".to_string()),
_ => None,
};
nodes.push(NodeDef { id: NodeId(id.to_string()), kind, name: name_field, task });
}
let mut links: Vec<LinkDef> = Vec::new();
if let Some(arr) = design.get("edges").and_then(|v| v.as_array()) {
for e in arr {
let from = e
.get("sourceNodeID")
.or_else(|| e.get("source"))
.or_else(|| e.get("from"))
.and_then(|v| v.as_str())
.unwrap_or_default()
.to_string();
let to = e
.get("targetNodeID")
.or_else(|| e.get("target"))
.or_else(|| e.get("to"))
.and_then(|v| v.as_str())
.unwrap_or_default()
.to_string();
// Try build structured condition for edges from a condition node via sourcePortID mapping
let mut cond: Option<String> = None;
if let Some(spid) = e.get("sourcePortID").and_then(|v| v.as_str()) {
// find source node
if let Some(src_node) = nodes_arr.iter().find(|n| n.get("id").and_then(|v| v.as_str()) == Some(from.as_str())) {
if src_node.get("type").and_then(|v| v.as_str()) == Some("condition") {
if let Some(conds) = src_node.get("data").and_then(|d| d.get("conditions")).and_then(|v| v.as_array()) {
if let Some(item) = conds.iter().find(|c| c.get("key").and_then(|v| v.as_str()) == Some(spid)) {
if let Some(val) = item.get("value") {
// store JSON string for engine to interpret at runtime
if let Ok(s) = serde_json::to_string(val) { cond = Some(s); }
}
}
}
}
}
}
links.push(LinkDef { from: NodeId(from), to: NodeId(to), condition: cond });
}
}
Ok(ChainDef { name, nodes, links })
}
/// Trim whitespace and strip wrapping quotes/backticks if present
fn sanitize_wrapped(s: &str) -> String {
let mut t = s.trim();
if t.len() >= 2 {
let bytes = t.as_bytes();
let first = bytes[0] as char;
let last = bytes[t.len() - 1] as char;
if (first == '`' && last == '`') || (first == '"' && last == '"') || (first == '\'' && last == '\'') {
t = &t[1..t.len() - 1];
t = t.trim();
// Handle stray trailing backslash left by an attempted escape of the closing quote/backtick
if t.ends_with('\\') {
t = &t[..t.len() - 1];
}
}
}
t.to_string()
}
/// Build ctx supplement from design_json: fill node-scope configs for executors, e.g., nodes.<id>.http
pub fn ctx_from_design_json(design: &Value) -> Value {
use serde_json::json;
// Accept both JSON object and stringified JSON
let parsed: Option<Value> = match design {
Value::String(s) => serde_json::from_str::<Value>(s).ok(),
_ => None,
};
let design = parsed.as_ref().unwrap_or(design);
let mut nodes_map = serde_json::Map::new();
if let Some(arr) = design.get("nodes").and_then(|v| v.as_array()) {
for n in arr {
let id = match n.get("id").and_then(|v| v.as_str()) {
Some(s) => s,
None => continue,
};
let node_type = n.get("type").and_then(|v| v.as_str()).unwrap_or("");
let mut node_cfg = serde_json::Map::new();
match node_type {
"http" => {
// Extract http config: method, url, headers, query, body
let data = n.get("data");
let api = data.and_then(|d| d.get("api"));
let method = api.and_then(|a| a.get("method")).and_then(|v| v.as_str()).unwrap_or("GET").to_string();
let url_val = api.and_then(|a| a.get("url"));
let raw_url = match url_val {
Some(Value::String(s)) => s.clone(),
Some(Value::Object(obj)) => obj.get("content").and_then(|v| v.as_str()).unwrap_or("").to_string(),
_ => String::new(),
};
let url = sanitize_wrapped(&raw_url);
if !url.is_empty() {
let mut http_obj = serde_json::Map::new();
http_obj.insert("method".into(), Value::String(method));
http_obj.insert("url".into(), Value::String(url));
// Optionally: headers/query/body
if let Some(hs) = api.and_then(|a| a.get("headers")).and_then(|v| v.as_object()) {
let mut heads = serde_json::Map::new();
for (k, v) in hs.iter() {
if let Some(s) = v.as_str() { heads.insert(k.clone(), Value::String(s.to_string())); }
}
if !heads.is_empty() { http_obj.insert("headers".into(), Value::Object(heads)); }
}
if let Some(qs) = api.and_then(|a| a.get("query")).and_then(|v| v.as_object()) {
let mut query = serde_json::Map::new();
for (k, v) in qs.iter() { query.insert(k.clone(), v.clone()); }
if !query.is_empty() { http_obj.insert("query".into(), Value::Object(query)); }
}
if let Some(body_obj) = data.and_then(|d| d.get("body")).and_then(|v| v.as_object()) {
// try body.content or body.json
if let Some(Value::Object(json_body)) = body_obj.get("json") { http_obj.insert("body".into(), Value::Object(json_body.clone())); }
else if let Some(Value::String(s)) = body_obj.get("content") { http_obj.insert("body".into(), Value::String(s.clone())); }
}
node_cfg.insert("http".into(), Value::Object(http_obj));
}
}
"db" => {
// Extract db config: sql, params, outputKey
let data = n.get("data");
if let Some(db_cfg) = data.and_then(|d| d.get("db")).and_then(|v| v.as_object()) {
let mut db_obj = serde_json::Map::new();
// sql can be string or object with content
let raw_sql = db_cfg.get("sql");
let sql = match raw_sql {
Some(Value::String(s)) => sanitize_wrapped(s),
Some(Value::Object(o)) => o.get("content").and_then(|v| v.as_str()).map(sanitize_wrapped).unwrap_or_default(),
_ => String::new(),
};
if !sql.is_empty() { db_obj.insert("sql".into(), Value::String(sql)); }
if let Some(p) = db_cfg.get("params") { db_obj.insert("params".into(), p.clone()); }
if let Some(Value::String(k)) = db_cfg.get("outputKey") { db_obj.insert("outputKey".into(), Value::String(k.clone())); }
if let Some(conn) = db_cfg.get("connection") { db_obj.insert("connection".into(), conn.clone()); }
if !db_obj.is_empty() { node_cfg.insert("db".into(), Value::Object(db_obj)); }
}
}
_ => {}
}
if !node_cfg.is_empty() { nodes_map.insert(id.to_string(), Value::Object(node_cfg)); }
}
}
json!({ "nodes": Value::Object(nodes_map) })
}

173
backend/src/flow/engine.rs Normal file
View File

@ -0,0 +1,173 @@
use std::collections::HashMap;
use rhai::Engine;
use tracing::info;
use super::{context::{DriveOptions, ExecutionMode}, domain::{ChainDef, NodeKind}, task::TaskRegistry};
pub struct FlowEngine {
pub tasks: TaskRegistry,
}
impl FlowEngine {
pub fn new(tasks: TaskRegistry) -> Self { Self { tasks } }
pub async fn drive(&self, chain: &ChainDef, mut ctx: serde_json::Value, opts: DriveOptions) -> anyhow::Result<(serde_json::Value, Vec<String>)> {
let mut logs = Vec::new();
// 查找 start优先 Start 节点;否则选择入度为 0 的第一个节点;再否则回退第一个节点
let start = if let Some(n) = chain
.nodes
.iter()
.find(|n| matches!(n.kind, NodeKind::Start))
{
n.id.0.clone()
} else {
// 计算入度
let mut indeg: HashMap<&str, usize> = HashMap::new();
for n in &chain.nodes { indeg.entry(n.id.0.as_str()).or_insert(0); }
for l in &chain.links { *indeg.entry(l.to.0.as_str()).or_insert(0) += 1; }
if let Some(n) = chain.nodes.iter().find(|n| indeg.get(n.id.0.as_str()).copied().unwrap_or(0) == 0) {
n.id.0.clone()
} else {
chain
.nodes
.first()
.ok_or_else(|| anyhow::anyhow!("empty chain"))?
.id
.0
.clone()
}
};
// 邻接表(按 links 的原始顺序保序)
let mut adj: HashMap<&str, Vec<&super::domain::LinkDef>> = HashMap::new();
for l in &chain.links { adj.entry(&l.from.0).or_default().push(l); }
let node_map: HashMap<&str, &super::domain::NodeDef> = chain.nodes.iter().map(|n| (n.id.0.as_str(), n)).collect();
let mut current = start;
let mut steps = 0usize;
while steps < opts.max_steps {
steps += 1;
let node = node_map.get(current.as_str()).ok_or_else(|| anyhow::anyhow!("node not found"))?;
logs.push(format!("enter node: {}", node.id.0));
info!(target: "udmin.flow", "enter node: {}", node.id.0);
// 任务执行
if let Some(task_name) = &node.task {
if let Some(task) = self.tasks.get(task_name) {
match opts.execution_mode {
ExecutionMode::Sync => {
if let serde_json::Value::Object(obj) = &mut ctx { obj.insert("__current_node_id".to_string(), serde_json::Value::String(node.id.0.clone())); }
task.execute(&mut ctx).await?;
logs.push(format!("exec task: {} (sync)", task_name));
info!(target: "udmin.flow", "exec task: {} (sync)", task_name);
}
ExecutionMode::AsyncFireAndForget => {
// fire-and-forget: 复制一份上下文供该任务使用,主流程不等待
let mut task_ctx = ctx.clone();
if let serde_json::Value::Object(obj) = &mut task_ctx { obj.insert("__current_node_id".to_string(), serde_json::Value::String(node.id.0.clone())); }
let task_arc = task.clone();
let name_for_log = task_name.clone();
tokio::spawn(async move {
let _ = task_arc.execute(&mut task_ctx).await;
info!(target: "udmin.flow", "exec task done (async): {}", name_for_log);
});
logs.push(format!("spawn task: {} (async)", task_name));
info!(target: "udmin.flow", "spawn task: {} (async)", task_name);
}
}
} else {
logs.push(format!("task not found: {} (skip)", task_name));
info!(target: "udmin.flow", "task not found: {} (skip)", task_name);
}
}
if matches!(node.kind, NodeKind::End) { break; }
// 选择下一条 link优先有条件的且为真否则保序选择第一条无条件边
let mut next: Option<String> = None;
if let Some(links) = adj.get(node.id.0.as_str()) {
// 先检测条件边
for link in links.iter() {
if let Some(cond_str) = &link.condition {
// 两种情况:
// 1) 前端序列化的 JSON形如 { left: {type, content}, operator, right? }
// 2) 直接是 rhai 表达式字符串
let ok = if cond_str.trim_start().starts_with('{') {
match serde_json::from_str::<serde_json::Value>(cond_str) {
Ok(v) => eval_condition_json(&ctx, &v).unwrap_or(false),
Err(_) => false,
}
} else {
let mut scope = rhai::Scope::new();
scope.push("ctx", rhai::serde::to_dynamic(ctx.clone()).map_err(|e| anyhow::anyhow!(e.to_string()))?);
let engine = Engine::new();
engine.eval_with_scope::<bool>(&mut scope, cond_str).unwrap_or(false)
};
if ok { next = Some(link.to.0.clone()); break; }
}
}
// 若没有命中条件边,则取第一条无条件边
if next.is_none() {
for link in links.iter() {
if link.condition.is_none() { next = Some(link.to.0.clone()); break; }
}
}
}
match next { Some(n) => current = n, None => break }
}
Ok((ctx, logs))
}
}
fn eval_condition_json(ctx: &serde_json::Value, cond: &serde_json::Value) -> anyhow::Result<bool> {
// 目前支持前端 Condition 组件导出的: { left:{type, content}, operator, right? }
let left = cond.get("left").ok_or_else(|| anyhow::anyhow!("missing left"))?;
let op = cond.get("operator").and_then(|v| v.as_str()).unwrap_or("");
let right = cond.get("right");
let lval = resolve_value(ctx, left)?;
let rval = match right { Some(v) => Some(resolve_value(ctx, v)?), None => None };
use serde_json::Value as V;
let res = match (op, &lval, &rval) {
("contains", V::String(s), Some(V::String(t))) => s.contains(t),
("equals", V::String(s), Some(V::String(t))) => s == t,
("equals", V::Number(a), Some(V::Number(b))) => a == b,
("is_true", V::Bool(b), _) => *b,
("is_false", V::Bool(b), _) => !*b,
("gt", V::Number(a), Some(V::Number(b))) => a.as_f64().unwrap_or(0.0) > b.as_f64().unwrap_or(0.0),
("lt", V::Number(a), Some(V::Number(b))) => a.as_f64().unwrap_or(0.0) < b.as_f64().unwrap_or(0.0),
_ => false,
};
Ok(res)
}
fn resolve_value(ctx: &serde_json::Value, v: &serde_json::Value) -> anyhow::Result<serde_json::Value> {
use serde_json::Value as V;
let t = v.get("type").and_then(|v| v.as_str()).unwrap_or("");
match t {
"constant" => Ok(v.get("content").cloned().unwrap_or(V::Null)),
"ref" => {
// content: [nodeId, field]
if let Some(arr) = v.get("content").and_then(|v| v.as_array()) {
if arr.len() >= 2 {
if let (Some(node), Some(field)) = (arr[0].as_str(), arr[1].as_str()) {
let val = ctx
.get("nodes")
.and_then(|n| n.get(node))
.and_then(|m| m.get(field))
.cloned()
.or_else(|| ctx.get(field).cloned())
.unwrap_or(V::Null);
return Ok(val);
}
}
}
Ok(V::Null)
}
_ => Ok(V::Null),
}
}

View File

@ -0,0 +1,261 @@
use async_trait::async_trait;
use serde_json::{json, Value};
use tracing::info;
use crate::flow::task::TaskComponent;
#[derive(Default)]
pub struct DbTask;
#[async_trait]
impl TaskComponent for DbTask {
async fn execute(&self, ctx: &mut Value) -> anyhow::Result<()> {
// 1) 获取当前节点ID
let node_id_opt = ctx
.get("__current_node_id")
.and_then(|v| v.as_str())
.map(|s| s.to_string());
// 2) 读取 db 配置:仅节点级 db不再回退到全局 ctx.db避免误用项目数据库
let cfg = match (&node_id_opt, ctx.get("nodes")) {
(Some(node_id), Some(nodes)) => nodes.get(&node_id).and_then(|n| n.get("db")).cloned(),
_ => None,
};
let Some(cfg) = cfg else {
info!(target = "udmin.flow", "db task: no config found, skip");
return Ok(());
};
// 3) 解析配置(包含可选连接信息)
let (sql, params, output_key, conn, mode_from_db) = parse_db_config(cfg)?;
// 提前读取结果模式,优先 connection.mode其次 db.output.mode/db.outputMode/db.mode
let result_mode = get_result_mode_from_conn(&conn).or(mode_from_db);
info!(target = "udmin.flow", "db task: exec sql: {}", sql);
// 4) 获取连接:必须显式声明 db.connection禁止回退到项目全局数据库避免安全风险
let db: std::borrow::Cow<'_, crate::db::Db>;
let tmp_conn; // 用于在本作用域内持有临时连接
use sea_orm::{Statement, ConnectionTrait};
let conn_cfg = conn.ok_or_else(|| anyhow::anyhow!("db task: connection config is required (db.connection)"))?;
// 构造 URL 并建立临时连接
let url = extract_connection_url(conn_cfg)?;
use sea_orm::{ConnectOptions, Database};
use std::time::Duration;
let mut opt = ConnectOptions::new(url);
opt.max_connections(20)
.min_connections(1)
.connect_timeout(Duration::from_secs(8))
.idle_timeout(Duration::from_secs(120))
.sqlx_logging(true);
tmp_conn = Database::connect(opt).await?;
db = std::borrow::Cow::Owned(tmp_conn);
// 判定是否为 SELECT简单判断前缀允许前导空白与括号
let is_select = {
let s = sql.trim_start();
let s = s.trim_start_matches('(');
s.to_uppercase().starts_with("SELECT")
};
// 构建参数列表(支持位置和命名两种形式)
let params_vec: Vec<sea_orm::Value> = match params {
None => vec![],
Some(Value::Array(arr)) => arr.into_iter().map(json_to_db_value).collect::<anyhow::Result<_>>()?,
Some(Value::Object(obj)) => {
// 对命名参数对象,保持插入顺序不可控,这里仅将值收集为位置绑定,建议 SQL 使用 `?` 占位
obj.into_iter().map(|(_, v)| json_to_db_value(v)).collect::<anyhow::Result<_>>()?
}
Some(v) => {
// 其它类型:当作单个位置参数
vec![json_to_db_value(v)?]
}
};
let stmt = Statement::from_sql_and_values(db.get_database_backend(), &sql, params_vec);
let result = if is_select {
let rows = db.query_all(stmt).await?;
// 将 QueryResult 转换为 JSON 数组
let mut out = Vec::with_capacity(rows.len());
for row in rows {
let mut obj = serde_json::Map::new();
// 读取列名列表
let cols = row.column_names();
for (idx, col_name) in cols.iter().enumerate() {
let key = col_name.to_string();
// 尝试以通用 JSON 值提取优先字符串、数值、布尔、二进制、null
let val = try_get_as_json(&row, idx, &key);
obj.insert(key, val);
}
out.push(Value::Object(obj));
}
// 默认 rows 模式:直接返回数组
match result_mode.as_deref() {
// 返回首行字段对象(无则 Null
Some("fields") | Some("first") => {
if let Some(Value::Object(m)) = out.get(0) { Value::Object(m.clone()) } else { Value::Null }
}
// 默认与显式 rows 都返回数组
_ => Value::Array(out),
}
} else {
let exec = db.execute(stmt).await?;
// 非 SELECT 默认返回受影响行数
match result_mode.as_deref() {
// 如显式要求 rows则返回空数组
Some("rows") => json!([]),
_ => json!(exec.rows_affected()),
}
};
// 5) 写回 ctx并对敏感信息脱敏
let write_key = output_key.unwrap_or_else(|| "db_response".to_string());
if let (Some(node_id), Some(obj)) = (node_id_opt, ctx.as_object_mut()) {
if let Some(nodes) = obj.get_mut("nodes").and_then(|v| v.as_object_mut()) {
if let Some(target) = nodes.get_mut(&node_id).and_then(|v| v.as_object_mut()) {
// 写入结果
target.insert(write_key, result);
// 对密码字段脱敏(保留其它配置不变)
if let Some(dbv) = target.get_mut("db") {
if let Some(dbo) = dbv.as_object_mut() {
if let Some(connv) = dbo.get_mut("connection") {
match connv {
Value::Object(m) => {
if let Some(pw) = m.get_mut("password") {
*pw = Value::String("***".to_string());
}
if let Some(Value::String(url)) = m.get_mut("url") {
*url = "***".to_string();
}
}
Value::String(s) => {
*s = "***".to_string();
}
_ => {}
}
}
}
}
return Ok(());
}
}
}
if let Value::Object(map) = ctx { map.insert(write_key, result); }
Ok(())
}
}
fn parse_db_config(cfg: Value) -> anyhow::Result<(String, Option<Value>, Option<String>, Option<Value>, Option<String>)> {
match cfg {
Value::String(sql) => Ok((sql, None, None, None, None)),
Value::Object(mut m) => {
let sql = m
.remove("sql")
.and_then(|v| v.as_str().map(|s| s.to_string()))
.ok_or_else(|| anyhow::anyhow!("db config missing sql"))?;
let params = m.remove("params");
let output_key = m.remove("outputKey").and_then(|v| v.as_str().map(|s| s.to_string()));
// 在移除 connection 前,从 db 层读取可能的输出模式
let mode_from_db = {
// db.output.mode
let from_output = m.get("output").and_then(|v| v.as_object()).and_then(|o| o.get("mode")).and_then(|v| v.as_str()).map(|s| s.to_string());
// db.outputMode 或 db.mode
let from_flat = m.get("outputMode").and_then(|v| v.as_str()).map(|s| s.to_string())
.or_else(|| m.get("mode").and_then(|v| v.as_str()).map(|s| s.to_string()));
from_output.or(from_flat)
};
let conn = m.remove("connection");
// 安全策略:必须显式声明连接,禁止默认落到全局数据库
if conn.is_none() {
return Err(anyhow::anyhow!("db config missing connection (db.connection is required)"));
}
Ok((sql, params, output_key, conn, mode_from_db))
}
_ => Err(anyhow::anyhow!("invalid db config")),
}
}
fn extract_connection_url(cfg: Value) -> anyhow::Result<String> {
match cfg {
Value::String(url) => Ok(url),
Value::Object(mut m) => {
if let Some(url) = m.remove("url").and_then(|v| v.as_str().map(|s| s.to_string())) {
return Ok(url);
}
let driver = m
.remove("driver")
.and_then(|v| v.as_str().map(|s| s.to_string()))
.unwrap_or_else(|| "mysql".to_string());
// sqlite 特殊处理:仅需要 database文件路径或 :memory:
if driver == "sqlite" {
let database = m.remove("database").and_then(|v| v.as_str().map(|s| s.to_string())).ok_or_else(|| anyhow::anyhow!("connection.database is required for sqlite unless url provided"))?;
return Ok(format!("sqlite://{}", database));
}
let host = m.remove("host").and_then(|v| v.as_str().map(|s| s.to_string())).unwrap_or_else(|| "localhost".to_string());
let port = m.remove("port").map(|v| match v { Value::Number(n) => n.to_string(), Value::String(s) => s, _ => String::new() });
let database = m.remove("database").and_then(|v| v.as_str().map(|s| s.to_string())).ok_or_else(|| anyhow::anyhow!("connection.database is required unless url provided"))?;
let username = m.remove("username").and_then(|v| v.as_str().map(|s| s.to_string())).ok_or_else(|| anyhow::anyhow!("connection.username is required unless url provided"))?;
let password = m.remove("password").and_then(|v| v.as_str().map(|s| s.to_string())).unwrap_or_default();
let port_part = port.filter(|s| !s.is_empty()).map(|s| format!(":{}", s)).unwrap_or_default();
let url = format!(
"{}://{}:{}@{}{}{}",
driver,
percent_encoding::utf8_percent_encode(&username, percent_encoding::NON_ALPHANUMERIC),
percent_encoding::utf8_percent_encode(&password, percent_encoding::NON_ALPHANUMERIC),
host,
port_part,
format!("/{}", database)
);
Ok(url)
}
_ => Err(anyhow::anyhow!("invalid connection config")),
}
}
fn get_result_mode_from_conn(conn: &Option<Value>) -> Option<String> {
match conn {
Some(Value::Object(m)) => m.get("mode").and_then(|v| v.as_str()).map(|s| s.to_string()),
_ => None,
}
}
fn json_to_db_value(v: Value) -> anyhow::Result<sea_orm::Value> {
use sea_orm::Value as DbValue;
let dv = match v {
Value::Null => DbValue::String(None),
Value::Bool(b) => DbValue::Bool(Some(b)),
Value::Number(n) => {
if let Some(i) = n.as_i64() { DbValue::BigInt(Some(i)) }
else if let Some(u) = n.as_u64() { DbValue::BigUnsigned(Some(u)) }
else if let Some(f) = n.as_f64() { DbValue::Double(Some(f)) }
else { DbValue::String(None) }
}
Value::String(s) => DbValue::String(Some(Box::new(s))),
Value::Array(arr) => {
// 无通用跨库数组类型:存为 JSON 字符串
let s = serde_json::to_string(&Value::Array(arr))?;
DbValue::String(Some(Box::new(s)))
}
Value::Object(obj) => {
let s = serde_json::to_string(&Value::Object(obj))?;
DbValue::String(Some(Box::new(s)))
}
};
Ok(dv)
}
fn try_get_as_json(row: &sea_orm::QueryResult, idx: usize, col_name: &str) -> Value {
use sea_orm::TryGetable;
// 尝试多种基础类型
if let Ok(v) = row.try_get::<Option<String>>("", col_name) { return v.map(Value::String).unwrap_or(Value::Null); }
if let Ok(v) = row.try_get::<Option<i64>>("", col_name) { return v.map(|x| json!(x)).unwrap_or(Value::Null); }
if let Ok(v) = row.try_get::<Option<u64>>("", col_name) { return v.map(|x| json!(x)).unwrap_or(Value::Null); }
if let Ok(v) = row.try_get::<Option<f64>>("", col_name) { return v.map(|x| json!(x)).unwrap_or(Value::Null); }
if let Ok(v) = row.try_get::<Option<bool>>("", col_name) { return v.map(|x| json!(x)).unwrap_or(Value::Null); }
// 回退:按索引读取成字符串
if let Ok(v) = row.try_get_by_index::<Option<String>>(idx) { return v.map(Value::String).unwrap_or(Value::Null); }
Value::Null
}

View File

@ -0,0 +1,161 @@
use async_trait::async_trait;
use serde_json::{Value, json, Map};
use tracing::info;
use std::collections::HashMap;
use std::time::Duration;
use reqwest::Certificate;
use crate::flow::task::TaskComponent;
#[derive(Default)]
pub struct HttpTask;
#[derive(Default, Clone)]
struct HttpOpts {
timeout_ms: Option<u64>,
insecure: bool,
ca_pem: Option<String>,
http1_only: bool,
}
#[async_trait]
impl TaskComponent for HttpTask {
async fn execute(&self, ctx: &mut Value) -> anyhow::Result<()> {
// 1) 读取当前节点ID由引擎在执行前写入 ctx.__current_node_id
let node_id_opt = ctx
.get("__current_node_id")
.and_then(|v| v.as_str())
.map(|s| s.to_string());
// 2) 从 ctx 中提取 http 配置
// 优先 nodes.<node_id>.http其次全局 http
let cfg = match (&node_id_opt, ctx.get("nodes")) {
(Some(node_id), Some(nodes)) => nodes.get(&node_id).and_then(|n| n.get("http")).cloned(),
_ => None,
}.or_else(|| ctx.get("http").cloned());
let Some(cfg) = cfg else {
// 未提供配置,直接跳过(也可选择返回错误)
info!(target = "udmin.flow", "http task: no config found, skip");
return Ok(());
};
// 3) 解析配置
let (method, url, headers, query, body, opts) = parse_http_config(cfg)?;
info!(target = "udmin.flow", "http task: {} {}", method, url);
// 4) 发送请求(支持 HTTPS 相关选项)
let client = {
let mut builder = reqwest::Client::builder();
if let Some(ms) = opts.timeout_ms { builder = builder.timeout(Duration::from_millis(ms)); }
if opts.insecure { builder = builder.danger_accept_invalid_certs(true); }
if opts.http1_only { builder = builder.http1_only(); }
if let Some(pem) = opts.ca_pem {
if let Ok(cert) = Certificate::from_pem(pem.as_bytes()) {
builder = builder.add_root_certificate(cert);
}
}
builder.build()?
};
let mut req = client.request(method.parse()?, url);
if let Some(hs) = headers {
use reqwest::header::{HeaderMap, HeaderName, HeaderValue};
let mut map = HeaderMap::new();
for (k, v) in hs {
if let (Ok(name), Ok(value)) = (HeaderName::from_bytes(k.as_bytes()), HeaderValue::from_str(&v)) {
map.insert(name, value);
}
}
req = req.headers(map);
}
if let Some(qs) = query {
// 将查询参数转成 (String, String) 列表,便于 reqwest 序列化
let mut pairs: Vec<(String, String)> = Vec::new();
for (k, v) in qs {
let s = match v {
Value::String(s) => s,
Value::Number(n) => n.to_string(),
Value::Bool(b) => b.to_string(),
other => other.to_string(),
};
pairs.push((k, s));
}
req = req.query(&pairs);
}
if let Some(b) = body {
req = req.json(&b);
}
let resp = req.send().await?;
let status = resp.status().as_u16();
let headers_out: Map<String, Value> = resp
.headers()
.iter()
.map(|(k, v)| (k.to_string(), Value::String(v.to_str().unwrap_or("").to_string())))
.collect();
// 尝试以 JSON 解析,否则退回文本
let text = resp.text().await?;
let parsed_body: Value = serde_json::from_str(&text).unwrap_or_else(|_| Value::String(text));
// 5) 将结果写回 ctx
let result = json!({
"status": status,
"headers": headers_out,
"body": parsed_body,
});
// 优先写 nodes.<node_id>.http_response否则写入全局 http_response
if let (Some(node_id), Some(obj)) = (node_id_opt, ctx.as_object_mut()) {
if let Some(nodes) = obj.get_mut("nodes").and_then(|v| v.as_object_mut()) {
if let Some(target) = nodes.get_mut(&node_id).and_then(|v| v.as_object_mut()) {
target.insert("http_response".to_string(), result);
return Ok(());
}
}
}
// 退回:写入全局
if let Value::Object(map) = ctx {
map.insert("http_response".to_string(), result);
}
Ok(())
}
}
fn parse_http_config(cfg: Value) -> anyhow::Result<(
String,
String,
Option<HashMap<String, String>>,
Option<Map<String, Value>>,
Option<Value>,
HttpOpts,
)> {
// 支持两种配置:
// 1) 字符串:视为 URL方法 GET
// 2) 对象:{ method, url, headers, query, body }
match cfg {
Value::String(url) => Ok(("GET".into(), url, None, None, None, HttpOpts::default())),
Value::Object(mut m) => {
let method = m.remove("method").and_then(|v| v.as_str().map(|s| s.to_uppercase())).unwrap_or_else(|| "GET".into());
let url = m.remove("url").and_then(|v| v.as_str().map(|s| s.to_string()))
.ok_or_else(|| anyhow::anyhow!("http config missing url"))?;
let headers = m.remove("headers").and_then(|v| v.as_object().cloned()).map(|obj| {
obj.into_iter().filter_map(|(k, v)| v.as_str().map(|s| (k, s.to_string()))).collect::<HashMap<String, String>>()
});
let query = m.remove("query").and_then(|v| v.as_object().cloned());
let body = m.remove("body");
// 可选 HTTPS/超时/HTTP 版本配置
let timeout_ms = m.remove("timeout_ms").and_then(|v| v.as_u64());
let insecure = m.remove("insecure").and_then(|v| v.as_bool()).unwrap_or(false);
let http1_only = m.remove("http1_only").and_then(|v| v.as_bool()).unwrap_or(false);
let ca_pem = m.remove("ca_pem").and_then(|v| v.as_str().map(|s| s.to_string()));
let opts = HttpOpts { timeout_ms, insecure, ca_pem, http1_only };
Ok((method, url, headers, query, body, opts))
}
_ => Err(anyhow::anyhow!("invalid http config")),
}
}

View File

@ -0,0 +1,2 @@
pub mod http;
pub mod db;

7
backend/src/flow/mod.rs Normal file
View File

@ -0,0 +1,7 @@
pub mod domain;
pub mod context;
pub mod task;
pub mod engine;
pub mod dsl;
pub mod storage;
pub mod executors;

View File

@ -0,0 +1,15 @@
use once_cell::sync::Lazy;
use std::collections::HashMap;
use std::sync::Mutex;
static STORE: Lazy<Mutex<HashMap<String, String>>> = Lazy::new(|| Mutex::new(HashMap::new()));
pub fn list() -> Vec<(String, String)> {
STORE.lock().unwrap().iter().map(|(k, v)| (k.clone(), v.clone())).collect()
}
pub fn get(id: &str) -> Option<String> { STORE.lock().unwrap().get(id).cloned() }
pub fn put(id: String, yaml: String) { STORE.lock().unwrap().insert(id, yaml); }
pub fn del(id: &str) -> Option<String> { STORE.lock().unwrap().remove(id) }

41
backend/src/flow/task.rs Normal file
View File

@ -0,0 +1,41 @@
use async_trait::async_trait;
use serde_json::Value;
#[async_trait]
pub trait TaskComponent: Send + Sync {
async fn execute(&self, ctx: &mut Value) -> anyhow::Result<()>;
}
pub type TaskRegistry = std::collections::HashMap<String, std::sync::Arc<dyn TaskComponent>>;
use std::sync::{Arc, RwLock, OnceLock};
pub fn default_registry() -> TaskRegistry {
let mut reg: TaskRegistry = TaskRegistry::new();
reg.insert("http".into(), Arc::new(crate::flow::executors::http::HttpTask::default()));
reg.insert("db".into(), Arc::new(crate::flow::executors::db::DbTask::default()));
reg
}
// ===== Global registry (for DI/registry center) =====
static GLOBAL_TASK_REGISTRY: OnceLock<RwLock<TaskRegistry>> = OnceLock::new();
/// Get a snapshot of current registry (clone of HashMap). If not initialized, it will be filled with default_registry().
pub fn get_registry() -> TaskRegistry {
let lock = GLOBAL_TASK_REGISTRY.get_or_init(|| RwLock::new(default_registry()));
lock.read().expect("lock poisoned").clone()
}
/// Register/override a single task into global registry.
pub fn register_global_task(name: impl Into<String>, task: Arc<dyn TaskComponent>) {
let lock = GLOBAL_TASK_REGISTRY.get_or_init(|| RwLock::new(default_registry()));
let mut w = lock.write().expect("lock poisoned");
w.insert(name.into(), task);
}
/// Initialize or mutate the global registry with a custom initializer.
pub fn init_global_registry_with(init: impl FnOnce(&mut TaskRegistry)) {
let lock = GLOBAL_TASK_REGISTRY.get_or_init(|| RwLock::new(default_registry()));
let mut w = lock.write().expect("lock poisoned");
init(&mut w);
}

View File

@ -7,7 +7,7 @@ pub mod models;
pub mod services;
pub mod routes;
pub mod utils;
//pub mod workflow;
pub mod flow;
use axum::Router;
use axum::http::{HeaderValue, Method};
@ -15,6 +15,16 @@ use tower_http::cors::{CorsLayer, Any, AllowOrigin};
use migration::MigratorTrait;
use axum::middleware;
// 自定义日志时间格式YYYY-MM-DD HH:MM:SS.ssssss不带 T 和 Z
struct LocalTimeFmt;
impl tracing_subscriber::fmt::time::FormatTime for LocalTimeFmt {
fn format_time(&self, w: &mut tracing_subscriber::fmt::format::Writer) -> std::fmt::Result {
let now = chrono::Local::now();
w.write_str(&now.format("%Y-%m-%d %H:%M:%S%.6f").to_string())
}
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
// 增强:支持通过 ENV_FILE 指定要加载的环境文件,并记录实际加载的文件
@ -41,9 +51,14 @@ async fn main() -> anyhow::Result<()> {
}
};
tracing_subscriber::fmt().with_env_filter(tracing_subscriber::EnvFilter::from_default_env()).init();
tracing_subscriber::fmt()
.with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
.with_timer(LocalTimeFmt)
.init();
let db = db::init_db().await?;
// set global DB for tasks
db::set_db(db.clone()).expect("db set failure");
// initialize Redis connection
let redis_pool = redis::init_redis().await?;

View File

@ -20,8 +20,10 @@ pub fn encode_token(claims: &Claims, secret: &str) -> Result<String, AppError> {
pub fn decode_token(token: &str, secret: &str) -> Result<Claims, AppError> {
let key = DecodingKey::from_secret(secret.as_bytes());
let data = jsonwebtoken::decode::<Claims>(token, &key, &Validation::default())?;
Ok(data.claims)
match jsonwebtoken::decode::<Claims>(token, &key, &Validation::default()) {
Ok(data) => Ok(data.claims),
Err(_) => Err(AppError::Unauthorized),
}
}
#[derive(Clone, Debug)]

View File

@ -0,0 +1,21 @@
use sea_orm::entity::prelude::*;
#[derive(Clone, Debug, PartialEq, DeriveEntityModel)]
#[sea_orm(table_name = "flows")]
pub struct Model {
#[sea_orm(primary_key)]
pub id: String,
pub name: Option<String>,
pub yaml: Option<String>,
pub design_json: Option<String>,
// 新增:流程编号与备注
pub code: Option<String>,
pub remark: Option<String>,
pub created_at: DateTimeWithTimeZone,
pub updated_at: DateTimeWithTimeZone,
}
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
pub enum Relation {}
impl ActiveModelBehavior for ActiveModel {}

View File

@ -0,0 +1,25 @@
use sea_orm::entity::prelude::*;
#[derive(Clone, Debug, PartialEq, Eq, DeriveEntityModel, serde::Serialize, serde::Deserialize)]
#[sea_orm(table_name = "flow_run_logs")]
pub struct Model {
#[sea_orm(primary_key)]
pub id: i64,
pub flow_id: String,
// 新增:流程编码(可空)
pub flow_code: Option<String>,
pub input: Option<String>,
pub output: Option<String>,
pub ok: bool,
pub logs: Option<String>,
pub user_id: Option<i64>,
pub username: Option<String>,
pub started_at: DateTimeWithTimeZone,
pub duration_ms: i64,
pub created_at: DateTimeWithTimeZone,
}
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
pub enum Relation {}
impl ActiveModelBehavior for ActiveModel {}

View File

@ -9,4 +9,6 @@ pub mod user_department;
pub mod request_log;
// 新增岗位与用户岗位关联模型
pub mod position;
pub mod user_position;
pub mod user_position;
pub mod flow;
pub mod flow_run_log;

View File

@ -0,0 +1,13 @@
use axum::{Router, routing::get, extract::{State, Query}, Json};
use crate::{db::Db, response::ApiResponse, services::flow_run_log_service};
pub fn router() -> Router<Db> {
Router::new().route("/flow_run_logs", get(list))
}
async fn list(State(db): State<Db>, Query(p): Query<flow_run_log_service::ListParams>) -> Json<ApiResponse<flow_run_log_service::PageResp<flow_run_log_service::RunLogItem>>> {
match flow_run_log_service::list(&db, p).await {
Ok(res) => Json(ApiResponse::ok(res)),
Err(e) => Json(ApiResponse::err(500, format!("{}", e))),
}
}

View File

@ -0,0 +1,71 @@
use axum::{Router, routing::{post, get}, extract::{State, Path, Query}, Json};
use crate::{db::Db, response::ApiResponse, services::flow_service, error::AppError};
use serde::Deserialize;
use tracing::{info, error};
use crate::middlewares::jwt::AuthUser;
pub fn router() -> Router<Db> {
Router::new()
.route("/flows", post(create).get(list))
.route("/flows/{id}", get(get_one).put(update).delete(remove))
.route("/flows/{id}/run", post(run))
}
#[derive(Deserialize)]
struct PageParams { page: Option<u64>, page_size: Option<u64>, keyword: Option<String> }
async fn list(State(db): State<Db>, Query(p): Query<PageParams>) -> Result<Json<ApiResponse<flow_service::PageResp<flow_service::FlowSummary>>>, AppError> {
let page = p.page.unwrap_or(1);
let page_size = p.page_size.unwrap_or(10);
let res = flow_service::list(&db, page, page_size, p.keyword).await.map_err(flow_service::ae)?;
Ok(Json(ApiResponse::ok(res)))
}
#[derive(Deserialize)]
struct CreateReq { yaml: Option<String>, name: Option<String>, design_json: Option<serde_json::Value>, code: Option<String>, remark: Option<String> }
#[derive(Deserialize)]
struct UpdateReq { yaml: Option<String>, design_json: Option<serde_json::Value>, name: Option<String>, code: Option<String>, remark: Option<String> }
async fn create(State(db): State<Db>, Json(req): Json<CreateReq>) -> Result<Json<ApiResponse<flow_service::FlowDoc>>, AppError> {
info!(target = "udmin", "routes.flows.create: start");
let res = match flow_service::create(&db, flow_service::FlowCreateReq { yaml: req.yaml, name: req.name, design_json: req.design_json, code: req.code, remark: req.remark }).await {
Ok(r) => { info!(target = "udmin", id = %r.id, "routes.flows.create: ok"); r }
Err(e) => {
error!(target = "udmin", error = ?e, "routes.flows.create: failed");
// 将错误恢复为统一映射,避免对外暴露内部细节
return Err(flow_service::ae(e));
}
};
Ok(Json(ApiResponse::ok(res)))
}
async fn update(State(db): State<Db>, Path(id): Path<String>, Json(req): Json<UpdateReq>) -> Result<Json<ApiResponse<flow_service::FlowDoc>>, AppError> {
let res = flow_service::update(&db, &id, flow_service::FlowUpdateReq { yaml: req.yaml, design_json: req.design_json, name: req.name, code: req.code, remark: req.remark }).await.map_err(flow_service::ae)?;
Ok(Json(ApiResponse::ok(res)))
}
async fn get_one(State(db): State<Db>, Path(id): Path<String>) -> Result<Json<ApiResponse<flow_service::FlowDoc>>, AppError> {
let res = flow_service::get(&db, &id).await.map_err(flow_service::ae)?;
Ok(Json(ApiResponse::ok(res)))
}
async fn remove(State(db): State<Db>, Path(id): Path<String>) -> Result<Json<ApiResponse<serde_json::Value>>, AppError> {
flow_service::delete(&db, &id).await.map_err(flow_service::ae)?;
Ok(Json(ApiResponse::ok(serde_json::json!({"deleted": true}))))
}
#[derive(Deserialize)]
struct RunReq { #[serde(default)] input: serde_json::Value }
async fn run(State(db): State<Db>, user: AuthUser, Path(id): Path<String>, Json(req): Json<RunReq>) -> Result<Json<ApiResponse<flow_service::RunResult>>, AppError> {
match flow_service::run(&db, &id, flow_service::RunReq { input: req.input }, Some((user.uid, user.username))).await {
Ok(r) => Ok(Json(ApiResponse::ok(r))),
Err(e) => {
// 同步执行:直接把后端错误详细信息返回给前端
let mut full = e.to_string();
for cause in e.chain().skip(1) { full.push_str(" | "); full.push_str(&cause.to_string()); }
Err(AppError::InternalMsg(full))
}
}
}

View File

@ -6,6 +6,8 @@ pub mod departments;
pub mod logs;
// 新增岗位
pub mod positions;
pub mod flows;
pub mod flow_run_logs;
use axum::Router;
use crate::db::Db;
@ -18,6 +20,7 @@ pub fn api_router() -> Router<Db> {
.merge(menus::router())
.merge(departments::router())
.merge(logs::router())
// 合并岗位路由
.merge(flows::router())
.merge(positions::router())
.merge(flow_run_logs::router())
}

View File

@ -0,0 +1,78 @@
use crate::{db::Db, models::flow_run_log};
use sea_orm::{ActiveModelTrait, Set, EntityTrait, PaginatorTrait, QueryFilter, QueryOrder, ColumnTrait};
use chrono::{DateTime, FixedOffset, Utc};
#[derive(serde::Serialize)]
pub struct PageResp<T> { pub items: Vec<T>, pub total: u64, pub page: u64, pub page_size: u64 }
#[derive(serde::Deserialize)]
pub struct ListParams { pub page: Option<u64>, pub page_size: Option<u64>, pub flow_id: Option<String>, pub flow_code: Option<String>, pub user: Option<String>, pub ok: Option<bool> }
#[derive(serde::Serialize)]
pub struct RunLogItem {
pub id: i64,
pub flow_id: String,
pub flow_code: Option<String>,
pub input: Option<String>,
pub output: Option<String>,
pub ok: bool,
pub logs: Option<String>,
pub user_id: Option<i64>,
pub username: Option<String>,
pub started_at: chrono::DateTime<chrono::FixedOffset>,
pub duration_ms: i64,
}
impl From<flow_run_log::Model> for RunLogItem {
fn from(m: flow_run_log::Model) -> Self {
Self { id: m.id, flow_id: m.flow_id, flow_code: m.flow_code, input: m.input, output: m.output, ok: m.ok, logs: m.logs, user_id: m.user_id, username: m.username, started_at: m.started_at, duration_ms: m.duration_ms }
}
}
#[derive(serde::Serialize, serde::Deserialize, Clone, Debug)]
pub struct CreateRunLogInput {
pub flow_id: String,
pub flow_code: Option<String>,
pub input: Option<String>,
pub output: Option<String>,
pub ok: bool,
pub logs: Option<String>,
pub user_id: Option<i64>,
pub username: Option<String>,
pub started_at: DateTime<FixedOffset>,
pub duration_ms: i64,
}
pub async fn create(db: &Db, input: CreateRunLogInput) -> anyhow::Result<i64> {
let am = flow_run_log::ActiveModel {
id: Default::default(),
flow_id: Set(input.flow_id),
flow_code: Set(input.flow_code),
input: Set(input.input),
output: Set(input.output),
ok: Set(input.ok),
logs: Set(input.logs),
user_id: Set(input.user_id),
username: Set(input.username),
started_at: Set(input.started_at),
duration_ms: Set(input.duration_ms),
created_at: Set(Utc::now().with_timezone(&FixedOffset::east_opt(0).unwrap())),
};
let m = am.insert(db).await?;
Ok(m.id)
}
pub async fn list(db: &Db, p: ListParams) -> anyhow::Result<PageResp<RunLogItem>> {
let page = p.page.unwrap_or(1); let page_size = p.page_size.unwrap_or(10);
let mut selector = flow_run_log::Entity::find();
if let Some(fid) = p.flow_id { selector = selector.filter(flow_run_log::Column::FlowId.eq(fid)); }
if let Some(fcode) = p.flow_code { selector = selector.filter(flow_run_log::Column::FlowCode.eq(fcode)); }
if let Some(u) = p.user {
let like = format!("%{}%", u);
selector = selector.filter(flow_run_log::Column::Username.like(like));
}
if let Some(ok) = p.ok { selector = selector.filter(flow_run_log::Column::Ok.eq(ok)); }
let paginator = selector.order_by_desc(flow_run_log::Column::Id).paginate(db, page_size);
let total = paginator.num_items().await? as u64;
let models = paginator.fetch_page(if page>0 { page-1 } else { 0 }).await?;
Ok(PageResp { items: models.into_iter().map(Into::into).collect(), total, page, page_size })
}

View File

@ -0,0 +1,438 @@
// removed unused: use std::collections::HashMap;
// removed unused: use std::sync::Mutex;
use anyhow::Context as _;
use serde::{Deserialize, Serialize};
use crate::error::AppError;
use crate::flow::{self, dsl::FlowDSL, engine::FlowEngine, context::{DriveOptions, ExecutionMode}};
use crate::db::Db;
use crate::models::flow as db_flow;
use crate::models::request_log; // 新增:查询最近修改人
use crate::services::flow_run_log_service;
use crate::services::flow_run_log_service::CreateRunLogInput;
use sea_orm::{EntityTrait, ActiveModelTrait, Set, DbErr, ColumnTrait, QueryFilter, PaginatorTrait, QueryOrder};
use sea_orm::entity::prelude::DateTimeWithTimeZone; // 新增:时间类型
use chrono::{Utc, FixedOffset};
use tracing::{info, error};
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FlowSummary {
pub id: String,
pub name: String,
#[serde(skip_serializing_if = "Option::is_none")] pub code: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")] pub remark: Option<String>,
pub created_at: DateTimeWithTimeZone,
pub updated_at: DateTimeWithTimeZone,
#[serde(skip_serializing_if = "Option::is_none")] pub last_modified_by: Option<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FlowDoc { pub id: String, pub yaml: String, #[serde(skip_serializing_if = "Option::is_none")] pub design_json: Option<serde_json::Value> }
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FlowCreateReq { pub yaml: Option<String>, pub name: Option<String>, pub design_json: Option<serde_json::Value>, pub code: Option<String>, pub remark: Option<String> }
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FlowUpdateReq { pub yaml: Option<String>, pub design_json: Option<serde_json::Value>, pub name: Option<String>, pub code: Option<String>, pub remark: Option<String> }
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RunReq { #[serde(default)] pub input: serde_json::Value }
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RunResult { pub ok: bool, pub ctx: serde_json::Value, pub logs: Vec<String> }
#[derive(Clone, Debug, serde::Serialize)]
pub struct PageResp<T> { pub items: Vec<T>, pub total: u64, pub page: u64, pub page_size: u64 }
// list flows from database with pagination & keyword
pub async fn list(db: &Db, page: u64, page_size: u64, keyword: Option<String>) -> anyhow::Result<PageResp<FlowSummary>> {
let mut selector = db_flow::Entity::find();
if let Some(k) = keyword.filter(|s| !s.is_empty()) {
let like = format!("%{}%", k);
selector = selector.filter(
db_flow::Column::Name.like(like.clone())
.or(db_flow::Column::Id.like(like))
);
}
let paginator = selector.order_by_desc(db_flow::Column::CreatedAt).paginate(db, page_size);
let total = paginator.num_items().await? as u64;
let models = paginator.fetch_page(if page > 0 { page - 1 } else { 0 }).await?;
let mut items: Vec<FlowSummary> = Vec::with_capacity(models.len());
for row in models.into_iter() {
let id = row.id.clone();
let name = row
.name
.clone()
.or_else(|| row.yaml.as_deref().and_then(extract_name))
.unwrap_or_else(|| {
let prefix: String = id.chars().take(8).collect();
format!("flow_{}", prefix)
});
// 最近修改人从请求日志中查找最近一次对该flow的PUT请求
let last_modified_by = request_log::Entity::find()
.filter(request_log::Column::Path.like(format!("/api/flows/{}%", id)))
.filter(request_log::Column::Method.eq("PUT"))
.order_by_desc(request_log::Column::RequestTime)
.one(db)
.await?
.and_then(|m| m.username);
items.push(FlowSummary {
id,
name,
code: row.code.clone(),
remark: row.remark.clone(),
created_at: row.created_at,
updated_at: row.updated_at,
last_modified_by,
});
}
Ok(PageResp { items, total, page, page_size })
}
// create new flow with yaml or just name
pub async fn create(db: &Db, req: FlowCreateReq) -> anyhow::Result<FlowDoc> {
info!(target: "udmin", "flow.create: start");
if let Some(yaml) = &req.yaml {
let _parsed: FlowDSL = serde_yaml::from_str(yaml).context("invalid flow yaml")?;
info!(target: "udmin", "flow.create: yaml parsed ok");
}
let id = uuid::Uuid::new_v4().to_string();
let name = req
.name
.clone()
.or_else(|| req.yaml.as_deref().and_then(extract_name));
let now = Utc::now().with_timezone(&FixedOffset::east_opt(0).unwrap());
let design_json_str = match &req.design_json { Some(v) => serde_json::to_string(v).ok(), None => None };
let am = db_flow::ActiveModel {
id: Set(id.clone()),
name: Set(name),
yaml: Set(req.yaml.clone()),
design_json: Set(design_json_str),
// 新增: code 与 remark 入库
code: Set(req.code.clone()),
remark: Set(req.remark.clone()),
created_at: Set(now),
updated_at: Set(Utc::now().with_timezone(&FixedOffset::east_opt(0).unwrap())),
..Default::default()
};
info!(target: "udmin", "flow.create: inserting into db id={}", id);
// Use exec() instead of insert() returning Model to avoid RecordNotInserted on non-AI PK
match db_flow::Entity::insert(am).exec(db).await {
Ok(_) => {
info!(target: "udmin", "flow.create: insert ok id={}", id);
Ok(FlowDoc { id, yaml: req.yaml.unwrap_or_default(), design_json: req.design_json })
}
Err(DbErr::RecordNotInserted) => {
// Workaround for MySQL + non-auto-increment PK: verify by reading back
error!(target: "udmin", "flow.create: insert returned RecordNotInserted, verifying by select id={}", id);
match db_flow::Entity::find_by_id(id.clone()).one(db).await {
Ok(Some(_)) => {
info!(target: "udmin", "flow.create: found inserted row by id={}, treating as success", id);
Ok(FlowDoc { id, yaml: req.yaml.unwrap_or_default(), design_json: req.design_json })
}
Ok(None) => Err(anyhow::anyhow!("insert flow failed").context("verify inserted row not found")),
Err(e) => Err(anyhow::Error::new(e).context("insert flow failed")),
}
}
Err(e) => {
error!(target: "udmin", error = ?e, "flow.create: insert failed");
Err(anyhow::Error::new(e).context("insert flow failed"))
}
}
}
pub async fn get(db: &Db, id: &str) -> anyhow::Result<FlowDoc> {
let row = db_flow::Entity::find_by_id(id.to_string()).one(db).await?;
let row = row.ok_or_else(|| anyhow::anyhow!("not found"))?;
let yaml = row.yaml.unwrap_or_default();
let design_json = row.design_json.and_then(|s| serde_json::from_str::<serde_json::Value>(&s).ok());
Ok(FlowDoc { id: row.id, yaml, design_json })
}
pub async fn update(db: &Db, id: &str, req: FlowUpdateReq) -> anyhow::Result<FlowDoc> {
if let Some(yaml) = &req.yaml {
let _parsed: FlowDSL = serde_yaml::from_str(yaml).context("invalid flow yaml")?;
}
let row = db_flow::Entity::find_by_id(id.to_string()).one(db).await?;
let Some(row) = row else { return Err(anyhow::anyhow!("not found")); };
let mut am: db_flow::ActiveModel = row.into();
if let Some(yaml) = req.yaml {
let next_name = req
.name
.or_else(|| extract_name(&yaml));
if let Some(n) = next_name { am.name = Set(Some(n)); }
am.yaml = Set(Some(yaml.clone()));
} else if let Some(n) = req.name { am.name = Set(Some(n)); }
if let Some(dj) = req.design_json {
let s = serde_json::to_string(&dj)?;
am.design_json = Set(Some(s));
}
if let Some(c) = req.code { am.code = Set(Some(c)); }
if let Some(r) = req.remark { am.remark = Set(Some(r)); }
// update timestamp
am.updated_at = Set(Utc::now().with_timezone(&FixedOffset::east_opt(0).unwrap()));
am.update(db).await?;
// return latest yaml
let got = db_flow::Entity::find_by_id(id.to_string()).one(db).await?.unwrap();
let dj = got.design_json.as_deref().and_then(|s| serde_json::from_str::<serde_json::Value>(&s).ok());
Ok(FlowDoc { id: id.to_string(), yaml: got.yaml.unwrap_or_default(), design_json: dj })
}
pub async fn delete(db: &Db, id: &str) -> anyhow::Result<()> {
let row = db_flow::Entity::find_by_id(id.to_string()).one(db).await?;
let Some(row) = row else { return Err(anyhow::anyhow!("not found")); };
let am: db_flow::ActiveModel = row.into();
am.delete(db).await?;
Ok(())
}
pub async fn run(db: &Db, id: &str, req: RunReq, operator: Option<(i64, String)>) -> anyhow::Result<RunResult> {
info!(target = "udmin", "flow.run: start id={}", id);
let start = Utc::now().with_timezone(&FixedOffset::east_opt(0).unwrap());
// 获取流程编码,便于写入运行日志
let flow_code: Option<String> = match db_flow::Entity::find_by_id(id.to_string()).one(db).await {
Ok(Some(row)) => row.code,
_ => None,
};
// 获取流程文档并记录失败原因
let doc = match get(db, id).await {
Ok(d) => d,
Err(e) => {
error!(target = "udmin", error = ?e, "flow.run: get doc failed id={}", id);
// 记录失败日志
let (user_id, username) = operator.map(|(u, n)| (Some(u), Some(n))).unwrap_or((None, None));
let _ = flow_run_log_service::create(db, CreateRunLogInput {
flow_id: id.to_string(),
flow_code: flow_code.clone(),
input: Some(serde_json::to_string(&req.input).unwrap_or_default()),
output: None,
ok: false,
logs: Some(format!("get doc failed: {}", e)),
user_id,
username,
started_at: start,
duration_ms: 0,
}).await;
return Err(e);
}
};
// 记录文档基本信息,便于判断走 JSON 还是 YAML
info!(target = "udmin", "flow.run: doc loaded id={} has_design_json={} yaml_len={}", id, doc.design_json.is_some(), doc.yaml.len());
// Prefer design_json if present; otherwise fall back to YAML
let mut exec_mode: ExecutionMode = ExecutionMode::Sync;
let (mut chain, mut ctx) = if let Some(design) = &doc.design_json {
info!(target = "udmin", "flow.run: building chain from design_json id={}", id);
let chain_from_json = match flow::dsl::chain_from_design_json(design) {
Ok(c) => c,
Err(e) => {
error!(target = "udmin", error = ?e, "flow.run: build chain from design_json failed id={}", id);
// 记录失败日志
let (user_id, username) = operator.as_ref().map(|(u, n)| (Some(*u), Some(n.clone()))).unwrap_or((None, None));
let _ = flow_run_log_service::create(db, CreateRunLogInput {
flow_id: id.to_string(),
flow_code: flow_code.clone(),
input: Some(serde_json::to_string(&req.input).unwrap_or_default()),
output: None,
ok: false,
logs: Some(format!("build chain from design_json failed: {}", e)),
user_id,
username,
started_at: start,
duration_ms: 0,
}).await;
return Err(e);
}
};
let mut ctx = req.input.clone();
// Merge node-scoped configs into ctx under ctx.nodes
let supplement = flow::dsl::ctx_from_design_json(design);
merge_json(&mut ctx, &supplement);
// 解析 executionMode / execution_mode
let mode_str = design.get("executionMode").and_then(|v| v.as_str())
.or_else(|| design.get("execution_mode").and_then(|v| v.as_str()))
.unwrap_or("sync");
exec_mode = parse_execution_mode(mode_str);
info!(target = "udmin", "flow.run: ctx prepared from design_json id={} execution_mode={:?}", id, exec_mode);
(chain_from_json, ctx)
} else {
info!(target = "udmin", "flow.run: parsing YAML id={}", id);
let dsl = match serde_yaml::from_str::<FlowDSL>(&doc.yaml) {
Ok(d) => d,
Err(e) => {
error!(target = "udmin", error = ?e, "flow.run: parse YAML failed id={}", id);
// 记录失败日志
let (user_id, username) = operator.as_ref().map(|(u, n)| (Some(*u), Some(n.clone()))).unwrap_or((None, None));
let _ = flow_run_log_service::create(db, CreateRunLogInput {
flow_id: id.to_string(),
flow_code: flow_code.clone(),
input: Some(serde_json::to_string(&req.input).unwrap_or_default()),
output: None,
ok: false,
logs: Some(format!("parse YAML failed: {}", e)),
user_id,
username,
started_at: start,
duration_ms: 0,
}).await;
return Err(anyhow::Error::new(e).context("invalid flow yaml"));
}
};
// 从 YAML 读取执行模式
if let Some(m) = dsl.execution_mode.as_deref() { exec_mode = parse_execution_mode(m); }
(dsl.into(), req.input.clone())
};
// 若 design_json 解析出的 chain 为空,兜底回退到 YAML
if chain.nodes.is_empty() {
info!(target = "udmin", "flow.run: empty chain from design_json, fallback to YAML id={}", id);
if !doc.yaml.trim().is_empty() {
match serde_yaml::from_str::<FlowDSL>(&doc.yaml) {
Ok(dsl) => {
chain = dsl.clone().into();
// YAML 分支下 ctx = req.input不再追加 design_json 的补充
ctx = req.input.clone();
if let Some(m) = dsl.execution_mode.as_deref() { exec_mode = parse_execution_mode(m); }
info!(target = "udmin", "flow.run: fallback YAML parsed id={} execution_mode={:?}", id, exec_mode);
}
Err(e) => {
error!(target = "udmin", error = ?e, "flow.run: fallback parse YAML failed id={}", id);
// 保留原空 chain稍后 drive 会再次报错,但这里先返回更明确的错误
let (user_id, username) = operator.as_ref().map(|(u, n)| (Some(*u), Some(n.clone()))).unwrap_or((None, None));
let _ = flow_run_log_service::create(db, CreateRunLogInput {
flow_id: id.to_string(),
flow_code: flow_code.clone(),
input: Some(serde_json::to_string(&req.input).unwrap_or_default()),
output: None,
ok: false,
logs: Some(format!("fallback parse YAML failed: {}", e)),
user_id,
username,
started_at: start,
duration_ms: 0,
}).await;
return Err(anyhow::anyhow!("empty chain: design_json produced no nodes and YAML parse failed"));
}
}
} else {
// YAML 也为空
let (user_id, username) = operator.as_ref().map(|(u, n)| (Some(*u), Some(n.clone()))).unwrap_or((None, None));
let _ = flow_run_log_service::create(db, CreateRunLogInput {
flow_id: id.to_string(),
flow_code: flow_code.clone(),
input: Some(serde_json::to_string(&req.input).unwrap_or_default()),
output: None,
ok: false,
logs: Some("empty chain: both design_json and yaml are empty".to_string()),
user_id,
username,
started_at: start,
duration_ms: 0,
}).await;
return Err(anyhow::anyhow!("empty chain: both design_json and yaml are empty"));
}
}
// 从全局注册中心获取任务(若未初始化则返回默认注册表)
let tasks: flow::task::TaskRegistry = flow::task::get_registry();
let engine = FlowEngine::new(tasks);
info!(target = "udmin", "flow.run: driving engine id={} nodes={} links={} execution_mode={:?}", id, chain.nodes.len(), chain.links.len(), exec_mode);
// 执行
let drive_res = engine
.drive(&chain, ctx, DriveOptions { execution_mode: exec_mode.clone(), ..Default::default() })
.await;
let (ctx, logs) = match drive_res {
Ok(r) => r,
Err(e) => {
error!(target = "udmin", error = ?e, "flow.run: engine drive failed id={}", id);
let dur = (Utc::now().with_timezone(&FixedOffset::east_opt(0).unwrap()) - start).num_milliseconds() as i64;
let (user_id, username) = operator.as_ref().map(|(u, n)| (Some(*u), Some(n.clone()))).unwrap_or((None, None));
let _ = flow_run_log_service::create(db, CreateRunLogInput {
flow_id: id.to_string(),
flow_code: flow_code.clone(),
input: Some(serde_json::to_string(&req.input).unwrap_or_default()),
output: None,
ok: false,
logs: Some(format!("engine drive failed: {}", e)),
user_id,
username,
started_at: start,
duration_ms: dur,
}).await;
return Err(e);
}
};
// 调试:打印处理后的 ctx
//info!(target = "udmin", "flow.run: result ctx={}", serde_json::to_string(&ctx).unwrap_or_else(|_| "<serialize ctx failed>".to_string()));
info!(target = "udmin", "flow.run: done id={}", id);
// 写入成功日志
let dur = (Utc::now().with_timezone(&FixedOffset::east_opt(0).unwrap()) - start).num_milliseconds() as i64;
let (user_id, username) = operator.map(|(u, n)| (Some(u), Some(n))).unwrap_or((None, None));
let _ = flow_run_log_service::create(db, CreateRunLogInput {
flow_id: id.to_string(),
flow_code: flow_code.clone(),
input: Some(serde_json::to_string(&req.input).unwrap_or_default()),
output: Some(serde_json::to_string(&ctx).unwrap_or_default()),
ok: true,
logs: Some(serde_json::to_string(&logs).unwrap_or_default()),
user_id,
username,
started_at: start,
duration_ms: dur,
}).await;
Ok(RunResult { ok: true, ctx, logs })
}
fn extract_name(yaml: &str) -> Option<String> {
for line in yaml.lines() {
let lt = line.trim();
if lt.starts_with("#") && lt.len() > 1 { return Some(lt.trim_start_matches('#').trim().to_string()); }
if lt.starts_with("name:") {
let name = lt.trim_start_matches("name:").trim();
if !name.is_empty() { return Some(name.to_string()); }
}
}
None
}
pub fn ae<E: Into<anyhow::Error>>(e: E) -> AppError {
let err: anyhow::Error = e.into();
let mut full = err.to_string();
for cause in err.chain().skip(1) {
full.push_str(" | ");
full.push_str(&cause.to_string());
}
// MySQL duplicate key example: "Database error: Duplicate entry 'xxx' for key 'idx-unique-flows-code'"
// 也兼容包含唯一索引名/关键字的报错信息
if full.contains("Duplicate entry") || full.contains("idx-unique-flows-code") || (full.contains("code") && full.contains("unique")) {
return AppError::Conflict("流程编码已存在".to_string());
}
AppError::Anyhow(anyhow::anyhow!(full))
}
// shallow merge json objects: a <- b
fn merge_json(a: &mut serde_json::Value, b: &serde_json::Value) {
use serde_json::Value as V;
match (a, b) {
(V::Object(ao), V::Object(bo)) => {
for (k, v) in bo.iter() {
match ao.get_mut(k) {
Some(av) => merge_json(av, v),
None => { ao.insert(k.clone(), v.clone()); }
}
}
}
(a_slot, b_val) => { *a_slot = b_val.clone(); }
}
}
// parse execution mode string
fn parse_execution_mode(s: &str) -> ExecutionMode {
match s.to_ascii_lowercase().as_str() {
"async" | "async_fire_and_forget" | "fire_and_forget" => ExecutionMode::AsyncFireAndForget,
_ => ExecutionMode::Sync,
}
}

View File

@ -5,4 +5,6 @@ pub mod menu_service;
pub mod department_service;
pub mod log_service;
// 新增岗位服务
pub mod position_service;
pub mod position_service;
pub mod flow_service;
pub mod flow_run_log_service;