Files
udmin/backend/src/flow/engine.rs
ayou 30716686ed feat(ws): 新增WebSocket实时通信支持与SSE独立服务
重构中间件结构,新增ws模块实现WebSocket流程执行实时推送
将SSE服务拆分为独立端口监听,默认8866
优化前端流式模式切换,支持WS/SSE协议选择
统一流式事件处理逻辑,完善错误处理与取消机制
更新Cargo.toml依赖,添加WebSocket相关库
调整代码组织结构,规范导入分组与注释
2025-09-21 22:15:33 +08:00

519 lines
25 KiB
Rust
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

// std
use std::cell::RefCell;
use std::collections::HashMap;
use std::time::Instant;
// third-party
use futures::future::join_all;
use regex::Regex;
use rhai::{AST, Engine};
use tokio::sync::{Mutex, RwLock};
use tracing::info;
// crate
use crate::flow::executors::condition::eval_condition_json;
// super
use super::{
context::{DriveOptions, ExecutionMode},
domain::{ChainDef, NodeKind},
task::TaskRegistry,
};
/// Drives a [`ChainDef`] flow graph, executing each node's task through `tasks`.
///
/// Construct it with [`FlowEngine::new`], [`FlowEngine::builder`], or `Default`.
pub struct FlowEngine {
    /// Registry in which the executor named by a node's `task` field is looked up.
    pub tasks: TaskRegistry,
}
/// Error produced while driving a flow when a synchronously executed task fails.
///
/// Besides the message, it carries snapshots of the shared context and of the aggregated
/// logs taken at the moment of failure, so callers can report partial progress.
#[derive(Debug, Clone)]
pub struct DriveError {
    /// Id of the node whose task failed.
    pub node_id: String,
    /// Snapshot of the shared context when the error was raised.
    pub ctx: serde_json::Value,
    /// Snapshot of every log line aggregated up to (and including) the failure.
    pub logs: Vec<String>,
    /// Human-readable description; this is also what `Display` prints.
    pub message: String,
}
// === 表达式评估支持 (thread_local 引擎与 AST 缓存, 避免全局 Sync/Send 限制) ===
// 模块: 流程执行引擎 (engine.rs)
// 作用: 驱动 ChainDef 流程图, 支持:
// - 同步/异步 (Fire-and-Forget) 任务执行
// - 条件路由 (Rhai 表达式与 JSON 条件) 与无条件回退
// - 并发分支 fan-out 与 join_all 等待
// - SSE 实时事件推送 (逐行增量 + 节点级切片)
// 设计要点:
// - 表达式执行使用 thread_local 的 Rhai Engine 与 AST 缓存, 避免全局 Send/Sync 限制
// - 共享上下文使用 RwLock 包裹 serde_json::Value; 日志聚合使用 Mutex<Vec<String>>
// - 不做冲突校验: 允许并发修改; 最后写回/写入按代码路径覆盖
//
fn regex_match(s: &str, pat: &str) -> bool {
Regex::new(pat).map(|re| re.is_match(s)).unwrap_or(false)
}
// String predicates registered with the Rhai engine so expressions can call them
// function-style.

/// Returns `true` when `sub` occurs anywhere in `s`.
fn contains(s: &str, sub: &str) -> bool {
    s.contains(sub)
}

/// Returns `true` when `s` begins with `prefix`.
fn starts_with(s: &str, prefix: &str) -> bool {
    s.starts_with(prefix)
}

/// Returns `true` when `s` finishes with `suffix`.
fn ends_with(s: &str, suffix: &str) -> bool {
    s.ends_with(suffix)
}
/// Rhai helper: emptiness test that accepts any `Dynamic` value.
///
/// `()` counts as empty. Strings, arrays and maps are empty when they have no
/// characters/elements/entries. Every other type is considered non-empty.
fn is_empty(v: rhai::Dynamic) -> bool {
    if v.is_unit() {
        true
    } else if let Some(text) = v.clone().try_cast::<rhai::ImmutableString>() {
        text.is_empty()
    } else if let Some(items) = v.clone().try_cast::<rhai::Array>() {
        items.is_empty()
    } else if let Some(entries) = v.try_cast::<rhai::Map>() {
        entries.is_empty()
    } else {
        false
    }
}

/// Rhai helper: logical negation of [`is_empty`].
fn not_empty(v: rhai::Dynamic) -> bool {
    !is_empty(v)
}
thread_local! {
    /// Per-thread Rhai engine used for every flow expression. Keeping it thread-local avoids
    /// needing the engine to be shared across threads (no global `Send`/`Sync` requirement).
    static RHIA_ENGINE: RefCell<Engine> = RefCell::new({
        let mut engine = Engine::new();
        // Cap the operation count so a complex expression cannot consume excessive CPU.
        engine.set_max_operations(100_000);
        // Strict variables: a misspelled variable is an error rather than a silent null.
        engine.set_strict_variables(true);
        // Helper functions available to expressions; `is_empty` / `not_empty` may be called
        // either function-style or method-style.
        engine
            .register_fn("regex_match", regex_match)
            .register_fn("contains", contains)
            .register_fn("starts_with", starts_with)
            .register_fn("ends_with", ends_with)
            .register_fn("is_empty", is_empty)
            .register_fn("not_empty", not_empty);
        engine
    });
    /// Per-thread cache of compiled expressions, keyed by the exact expression text.
    static AST_CACHE: RefCell<HashMap<String, AST>> = RefCell::new(HashMap::new());
}
/// Evaluates the Rhai expression `expr` to a `bool`, exposing `ctx` as the variable `ctx`.
///
/// Returns `false` in the following cases:
/// - the expression fails to compile;
/// - evaluation fails (including exceeding the engine's operation limit);
/// - the result is not a `bool`.
///
/// If `ctx` cannot be converted to a Rhai value, `ctx` is bound to `()`.
fn eval_rhai_expr_bool(expr: &str, ctx: &serde_json::Value) -> bool {
    // `to_dynamic` accepts any `Serialize`, including `&Value`, so no deep copy is needed.
    let mut scope = rhai::Scope::new();
    let dyn_ctx = rhai::serde::to_dynamic(ctx).unwrap_or(rhai::Dynamic::UNIT);
    scope.push("ctx", dyn_ctx);

    // Reuse a cached AST when available; otherwise compile and cache it.
    let cached = AST_CACHE.with(|c| c.borrow().get(expr).cloned());
    let ast = match cached {
        Some(ast) => ast,
        None => {
            // Compile *with the scope*. The engine runs in strict-variables mode, where plain
            // `compile` (empty scope) rejects every reference to `ctx` as undefined.
            // `ctx` is pushed as a variable (not a constant), so no value is baked into the
            // AST and the cached AST stays valid for other contexts.
            let compiled = RHIA_ENGINE.with(|eng| eng.borrow().compile_with_scope(&scope, expr));
            let Ok(ast) = compiled else {
                return false;
            };
            AST_CACHE.with(|c| {
                let mut cache = c.borrow_mut();
                // Simple capacity control: clear the cache once it grows past 1024 entries.
                if cache.len() > 1024 {
                    cache.clear();
                }
                cache.insert(expr.to_string(), ast.clone());
            });
            ast
        }
    };
    RHIA_ENGINE.with(|eng| {
        eng.borrow()
            .eval_ast_with_scope::<bool>(&mut scope, &ast)
            .unwrap_or(false)
    })
}
// 通用:评估 Rhai 表达式并转换为 serde_json::Value失败返回 None
pub(crate) fn eval_rhai_expr_json(expr: &str, ctx: &serde_json::Value) -> Option<serde_json::Value> {
// 构造作用域并注入 ctx
let mut scope = rhai::Scope::new();
let dyn_ctx = match rhai::serde::to_dynamic(ctx.clone()) { Ok(d) => d, Err(_) => rhai::Dynamic::UNIT };
scope.push("ctx", dyn_ctx);
// 先从缓存读取 AST未命中则编译并写入缓存然后执行
let cached = AST_CACHE.with(|c| c.borrow().get(expr).cloned());
let eval = |ast: &AST, scope: &mut rhai::Scope| -> Option<serde_json::Value> {
RHIA_ENGINE.with(|eng| {
eng.borrow()
.eval_ast_with_scope::<rhai::Dynamic>(scope, ast)
.ok()
.and_then(|d| rhai::serde::from_dynamic(&d).ok())
})
};
if let Some(ast) = cached {
return eval(&ast, &mut scope);
}
let compiled = RHIA_ENGINE.with(|eng| eng.borrow().compile(expr));
match compiled {
Ok(ast) => {
AST_CACHE.with(|c| {
let mut cache = c.borrow_mut();
if cache.len() > 1024 { cache.clear(); }
cache.insert(expr.to_string(), ast.clone());
});
eval(&ast, &mut scope)
}
Err(_) => None,
}
}
impl FlowEngine {
    /// Creates an engine that executes node tasks from `tasks`.
    pub fn new(tasks: TaskRegistry) -> Self {
        Self { tasks }
    }

    /// Returns a [`FlowEngineBuilder`] with every option unset.
    pub fn builder() -> FlowEngineBuilder {
        FlowEngineBuilder::default()
    }

    /// Runs `chain` using `ctx` as the initial shared context.
    ///
    /// The entry node is chosen in this order:
    /// 1. the first `Start` node;
    /// 2. otherwise, the first node without incoming links;
    /// 3. otherwise, the first node.
    ///
    /// Returns the final shared context together with all aggregated log lines.
    ///
    /// # Errors
    /// Fails with `"empty chain"` when `chain` has no nodes. Also propagates any error raised
    /// while driving the graph (e.g. a [`DriveError`] from a failing task).
    pub async fn drive(&self, chain: &ChainDef, ctx: serde_json::Value, opts: DriveOptions) -> anyhow::Result<(serde_json::Value, Vec<String>)> {
        // 1) Pick the entry node: Start node, else in-degree 0, else the first node.
        let start_node = chain
            .nodes
            .iter()
            .find(|n| matches!(n.kind, NodeKind::Start))
            .or_else(|| {
                let mut indeg: HashMap<&str, usize> =
                    chain.nodes.iter().map(|n| (n.id.0.as_str(), 0)).collect();
                for link in &chain.links {
                    *indeg.entry(link.to.0.as_str()).or_insert(0) += 1;
                }
                chain
                    .nodes
                    .iter()
                    .find(|n| indeg.get(n.id.0.as_str()).copied().unwrap_or(0) == 0)
            })
            .or_else(|| chain.nodes.first())
            .ok_or_else(|| anyhow::anyhow!("empty chain"))?;
        let start = start_node.id.0.clone();

        // 2) Owned copies of nodes and links (original order kept) so concurrently driven
        //    branches can share them through `Arc`.
        let node_map = std::sync::Arc::new(
            chain
                .nodes
                .iter()
                .map(|n| (n.id.0.clone(), n.clone()))
                .collect::<HashMap<String, super::domain::NodeDef>>(),
        );
        let mut adjacency: HashMap<String, Vec<super::domain::LinkDef>> = HashMap::new();
        for link in &chain.links {
            adjacency.entry(link.from.0.clone()).or_default().push(link.clone());
        }
        let adj = std::sync::Arc::new(adjacency);

        // Shared mutable state: the context (no write-conflict detection) and the log sink.
        let shared_ctx = std::sync::Arc::new(RwLock::new(ctx));
        let logs_shared = std::sync::Arc::new(Mutex::new(Vec::<String>::new()));

        // 3) Drive the graph from the entry node.
        drive_from(
            self.tasks.clone(),
            node_map,
            adj,
            start,
            shared_ctx.clone(),
            opts.clone(),
            logs_shared.clone(),
        )
        .await?;

        // 4) Collect the results.
        let final_ctx = { shared_ctx.read().await.clone() };
        let logs = { logs_shared.lock().await.clone() };
        Ok((final_ctx, logs))
    }
}
// 从指定节点开始驱动,遇到多条满足条件的边时:
// - 第一条在当前任务内继续
// - 其余分支并行 spawn等待全部分支执行完毕后返回
async fn drive_from(
tasks: TaskRegistry,
node_map: std::sync::Arc<HashMap<String, super::domain::NodeDef>>,
adj: std::sync::Arc<HashMap<String, Vec<super::domain::LinkDef>>>,
start: String,
ctx: std::sync::Arc<RwLock<serde_json::Value>>, // 共享上下文(并发写入通过写锁串行化,不做冲突校验)
opts: DriveOptions,
logs: std::sync::Arc<Mutex<Vec<String>>>,
) -> anyhow::Result<()> {
let mut current = start;
let mut steps = 0usize;
loop {
if steps >= opts.max_steps { break; }
steps += 1;
// 读取节点
let node = match node_map.get(&current) { Some(n) => n, None => break };
// 进入节点:打点
let node_id_str = node.id.0.clone();
let node_start = Instant::now();
// 进入节点前记录当前日志长度,便于节点结束时做切片
let pre_len = { logs.lock().await.len() };
// 在每次追加日志时同步发送一条增量 SSE 事件(仅 1 行日志),以提升实时性
// push_and_emit
// - 先将单行日志 push 到共享日志
// - 若存在 SSE 通道,截取上下文快照并发送单行增量事件
async fn push_and_emit(
logs: &std::sync::Arc<tokio::sync::Mutex<Vec<String>>>,
opts: &super::context::DriveOptions,
node_id: &str,
ctx: &std::sync::Arc<tokio::sync::RwLock<serde_json::Value>>,
msg: String,
) {
{
let mut lg = logs.lock().await;
lg.push(msg.clone());
}
if let Some(tx) = opts.event_tx.as_ref() {
let ctx_snapshot = { ctx.read().await.clone() };
crate::middlewares::sse::emit_node(&tx, node_id.to_string(), vec![msg], ctx_snapshot).await;
}
}
// enter 节点也实时推送
push_and_emit(&logs, &opts, &node_id_str, &ctx, format!("enter node: {}", node.id.0)).await;
info!(target: "udmin.flow", "enter node: {}", node.id.0);
// 执行任务
if let Some(task_name) = &node.task {
if let Some(task) = tasks.get(task_name) {
match opts.execution_mode {
ExecutionMode::Sync => {
// 使用快照执行,结束后整体写回(允许最后写入覆盖并发修改;程序端不做冲突校验)
let mut local_ctx = { ctx.read().await.clone() };
match task.execute(&node.id, node, &mut local_ctx).await {
Ok(_) => {
{ let mut w = ctx.write().await; *w = local_ctx; }
push_and_emit(&logs, &opts, &node_id_str, &ctx, format!("exec task: {} (sync)", task_name)).await;
info!(target: "udmin.flow", "exec task: {} (sync)", task_name);
}
Err(e) => {
let err_msg = format!("task error: {}: {}", task_name, e);
push_and_emit(&logs, &opts, &node_id_str, &ctx, err_msg.clone()).await;
// 捕获快照并返回 DriveError
let ctx_snapshot = { ctx.read().await.clone() };
let logs_snapshot = { logs.lock().await.clone() };
return Err(anyhow::Error::new(DriveError { node_id: node_id_str.clone(), ctx: ctx_snapshot, logs: logs_snapshot, message: err_msg }));
}
}
}
ExecutionMode::AsyncFireAndForget => {
// fire-and-forget基于快照执行不写回共享 ctx变量任务除外做有界差异写回
let task_ctx = { ctx.read().await.clone() };
let task_arc = task.clone();
let name_for_log = task_name.clone();
let node_id = node.id.clone();
let node_def = node.clone();
let logs_clone = logs.clone();
let ctx_clone = ctx.clone();
let event_tx_opt = opts.event_tx.clone();
tokio::spawn(async move {
let mut c = task_ctx.clone();
let _ = task_arc.execute(&node_id, &node_def, &mut c).await;
// 对 variable 任务执行写回:将顶层新增/修改的键写回共享 ctx并移除对应 variable 节点
if node_def.task.as_deref() == Some("variable") {
// 计算顶层差异(排除 nodes仅在不同或新增时写回
let mut changed: Vec<(String, serde_json::Value)> = Vec::new();
if let (serde_json::Value::Object(before_map), serde_json::Value::Object(after_map)) = (&task_ctx, &c) {
for (k, v_after) in after_map.iter() {
if k == "nodes" { continue; }
match before_map.get(k) {
Some(v_before) if v_before == v_after => {}
_ => changed.push((k.clone(), v_after.clone())),
}
}
}
{
let mut w = ctx_clone.write().await;
if let serde_json::Value::Object(map) = &mut *w {
for (k, v) in changed.into_iter() { map.insert(k, v); }
if let Some(serde_json::Value::Object(nodes)) = map.get_mut("nodes") {
nodes.remove(node_id.0.as_str());
}
}
}
{
let mut lg = logs_clone.lock().await;
lg.push(format!("exec task done (async): {} (writeback variable)", name_for_log));
}
// 实时推送异步完成日志
if let Some(tx) = event_tx_opt.as_ref() {
let ctx_snapshot = { ctx_clone.read().await.clone() };
crate::middlewares::sse::emit_node(&tx, node_id.0.clone(), vec![format!("exec task done (async): {} (writeback variable)", name_for_log)], ctx_snapshot).await;
}
info!(target: "udmin.flow", "exec task done (async): {} (writeback variable)", name_for_log);
} else {
{
let mut lg = logs_clone.lock().await;
lg.push(format!("exec task done (async): {}", name_for_log));
}
// 实时推送异步完成日志
if let Some(tx) = event_tx_opt.as_ref() {
let ctx_snapshot = { ctx_clone.read().await.clone() };
crate::middlewares::sse::emit_node(&tx, node_id.0.clone(), vec![format!("exec task done (async): {}", name_for_log)], ctx_snapshot).await;
}
info!(target: "udmin.flow", "exec task done (async): {}", name_for_log);
}
});
push_and_emit(&logs, &opts, &node_id_str, &ctx, format!("spawn task: {} (async)", task_name)).await;
info!(target: "udmin.flow", "spawn task: {} (async)", task_name);
}
}
} else {
push_and_emit(&logs, &opts, &node_id_str, &ctx, format!("task not found: {} (skip)", task_name)).await;
info!(target: "udmin.flow", "task not found: {} (skip)", task_name);
}
}
// End 节点:记录耗时后结束
if matches!(node.kind, NodeKind::End) {
let duration = node_start.elapsed().as_millis();
push_and_emit(&logs, &opts, &node_id_str, &ctx, format!("leave node: {} {} ms", node_id_str, duration)).await;
info!(target: "udmin.flow", "leave node: {} {} ms", node_id_str, duration);
if let Some(tx) = opts.event_tx.as_ref() {
let node_logs = { let lg = logs.lock().await; lg[pre_len..].to_vec() };
let ctx_snapshot = { ctx.read().await.clone() };
crate::middlewares::sse::emit_node(&tx, node_id_str.clone(), node_logs, ctx_snapshot).await;
}
break;
}
// 选择下一批 link仅在 Condition 节点上评估条件;其他节点忽略条件,直接沿第一条边前进
let mut nexts: Vec<String> = Vec::new();
if let Some(links) = adj.get(node.id.0.as_str()) {
if matches!(node.kind, NodeKind::Condition) {
// 条件边:全部评估为真者加入 nexts空字符串条件视为无条件不在此处评估
for link in links.iter() {
if let Some(cond_str) = &link.condition {
if cond_str.trim().is_empty() {
// 空条件:视为无条件边,留待后续回退逻辑处理
info!(target: "udmin.flow", from=%node.id.0, to=%link.to.0, "condition link: empty (unconditional candidate)");
continue;
}
let trimmed = cond_str.trim_start();
let (kind, ok) = if trimmed.starts_with('{') || trimmed.starts_with('[') {
match serde_json::from_str::<serde_json::Value>(cond_str) {
Ok(v) => {
let snapshot = { ctx.read().await.clone() };
("json", eval_condition_json(&snapshot, &v).unwrap_or(false))
}
Err(_) => ("json_parse_error", false),
}
} else {
let snapshot = { ctx.read().await.clone() };
("rhai", eval_rhai_expr_bool(cond_str, &snapshot))
};
info!(target: "udmin.flow", from=%node.id.0, to=%link.to.0, cond_kind=%kind, cond_len=%cond_str.len(), result=%ok, "condition link evaluated");
if ok { nexts.push(link.to.0.clone()); }
} else {
// 无 condition 字段:视为无条件边
info!(target: "udmin.flow", from=%node.id.0, to=%link.to.0, "condition link: none (unconditional candidate)");
}
}
// 若没有命中条件边,则取第一条无条件边(无条件 = 无 condition 或 空字符串)
if nexts.is_empty() {
let mut picked = None;
for link in links.iter() {
match &link.condition {
None => { picked = Some(link.to.0.clone()); break; }
Some(s) if s.trim().is_empty() => { picked = Some(link.to.0.clone()); break; }
_ => {}
}
}
if let Some(to) = picked {
info!(target: "udmin.flow", from=%node.id.0, to=%to, "condition fallback: pick unconditional");
nexts.push(to);
} else {
info!(target: "udmin.flow", node=%node.id.0, "condition: no matched and no unconditional, stop");
}
}
} else {
// 非条件节点忽略条件fan-out 所有出边(全部并行执行)
for link in links.iter() {
nexts.push(link.to.0.clone());
info!(target: "udmin.flow", from=%node.id.0, to=%link.to.0, "fan-out from non-condition node");
}
}
}
// 无后继:记录耗时后结束
if nexts.is_empty() {
let duration = node_start.elapsed().as_millis();
{
let mut lg = logs.lock().await;
lg.push(format!("leave node: {} {} ms", node_id_str, duration));
}
push_and_emit(&logs, &opts, &node_id_str, &ctx, format!("leave node: {} {} ms", node_id_str, duration)).await;
info!(target: "udmin.flow", "leave node: {} {} ms", node_id_str, duration);
if let Some(tx) = opts.event_tx.as_ref() {
let node_logs = { let lg = logs.lock().await; lg[pre_len..].to_vec() };
let ctx_snapshot = { ctx.read().await.clone() };
crate::middlewares::sse::emit_node(&tx, node_id_str.clone(), node_logs, ctx_snapshot).await;
}
break;
}
// 单分支:记录耗时后前进
if nexts.len() == 1 {
let duration = node_start.elapsed().as_millis();
{
let mut lg = logs.lock().await;
lg.push(format!("leave node: {} {} ms", node_id_str, duration));
}
info!(target: "udmin.flow", "leave node: {} {} ms", node_id_str, duration);
if let Some(tx) = opts.event_tx.as_ref() {
let node_logs = { let lg = logs.lock().await; lg[pre_len..].to_vec() };
let ctx_snapshot = { ctx.read().await.clone() };
crate::middlewares::sse::emit_node(&tx, node_id_str.clone(), node_logs, ctx_snapshot).await;
}
current = nexts.remove(0);
continue;
}
// 多分支:主分支沿第一条继续,其余分支并行执行并等待完成
let mut futs = Vec::new();
for to_id in nexts.iter().skip(1).cloned() {
let tasks_c = tasks.clone();
let node_map_c = node_map.clone();
let adj_c = adj.clone();
let ctx_c = ctx.clone();
let opts_c = opts.clone();
let logs_c = logs.clone();
futs.push(drive_from(tasks_c, node_map_c, adj_c, to_id, ctx_c, opts_c, logs_c));
}
// 当前分支继续第一条
current = nexts.into_iter().next().unwrap();
// 在一个安全点等待已分支的完成(这里选择在下一轮进入前等待)
let _ = join_all(futs).await;
// 多分支:记录当前节点耗时(包含等待其他分支完成的时间)
let duration = node_start.elapsed().as_millis();
{
let mut lg = logs.lock().await;
lg.push(format!("leave node: {} {} ms", node_id_str, duration));
}
info!(target: "udmin.flow", "leave node: {} {} ms", node_id_str, duration);
if let Some(tx) = opts.event_tx.as_ref() {
let node_logs = { let lg = logs.lock().await; lg[pre_len..].to_vec() };
let ctx_snapshot = { ctx.read().await.clone() };
crate::middlewares::sse::emit_node(&tx, node_id_str.clone(), node_logs, ctx_snapshot).await;
}
}
Ok(())
}
/// Builder for [`FlowEngine`]; options that are never set fall back to their defaults.
#[derive(Default)]
pub struct FlowEngineBuilder {
    // `None` until `tasks` is called; `build` then uses `default_registry()`.
    tasks: Option<TaskRegistry>,
}

impl FlowEngineBuilder {
    /// Uses `reg` as the task registry of the engine being built.
    pub fn tasks(self, reg: TaskRegistry) -> Self {
        Self { tasks: Some(reg) }
    }

    /// Builds the engine, defaulting to `crate::flow::task::default_registry()` when no
    /// registry was supplied.
    pub fn build(self) -> FlowEngine {
        FlowEngine {
            tasks: self.tasks.unwrap_or_else(crate::flow::task::default_registry),
        }
    }
}
impl Default for FlowEngine {
fn default() -> Self { Self { tasks: crate::flow::task::default_registry() } }
}
impl std::fmt::Display for DriveError {
    /// Prints the bare `message`; context and logs are available through the fields.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(&self.message)
    }
}

// `DriveError` wraps no underlying cause, so the default (`None`) `source` is kept.
impl std::error::Error for DriveError {}