Compare commits
3 Commits
a1b21e87b3
...
flow
| Author | SHA1 | Date | |
|---|---|---|---|
| cbbcd3121f | |||
| 12830b7cf6 | |||
| 75c6974a35 |
61
.trae/documents/Flow 架构与执行图产出计划.md
Normal file
61
.trae/documents/Flow 架构与执行图产出计划.md
Normal file
@ -0,0 +1,61 @@
|
||||
## 输出物
|
||||
- 一份详细的 Flow 架构与执行流程文档,包含可视化图示与文字说明
|
||||
- 以 Mermaid 图(可直接在 IDE/Markdown 预览)呈现;同时提供 ASCII 简版便于纯文本查看
|
||||
|
||||
## 文档与图的组织
|
||||
- 文件:`docs/flow_architecture.md`
|
||||
- 结构:分章节“模块架构”“请求/运行编排”“引擎执行图”“数据模型”“事件与日志”“DSL 与 Design 转换”“执行器生态”
|
||||
- 图格式:Mermaid(flowchart、sequence、class、er);每个图下配文字说明与代码引用(file:line)
|
||||
|
||||
## 图示清单与内容要点
|
||||
1. 模块架构图(Graph LR)
|
||||
- 节点:`flow/{dsl,domain,context,engine,task,executors,mappers,log_handler}`、`services/{flow_service,flow_run_log_service}`、`routes/{flows,flow_run_logs}`、`middlewares/{sse,ws,jwt,http_client}`、`db,redis,models/{flow,flow_run_log}`
|
||||
- 边:解析→构建→驱动→事件→日志入库;路由→服务→引擎;执行器←上下文补充
|
||||
|
||||
2. 请求/运行编排时序图(Sequence)
|
||||
- 参与者:客户端、`routes.flows`、`flow_service`、`dsl/mappers`、`FlowEngine`、`log_handler`、`middlewares.sse/ws`、`models`
|
||||
- 关键步骤:加载流程→解析 YAML/design_json→构造 `ChainDef/ctx`→`drive(...)`→推送事件→写入运行日志→返回结果
|
||||
|
||||
3. 引擎执行图(Flowchart)
|
||||
- 图块:选起点→合流屏障检查→任务执行(Sync/AsyncFireAndForget/Queued/Bounded)→分支选择(条件 JSON/Rhai)→组内追踪与等待(BranchExit/FlowEnd)→结束与事件推送
|
||||
- 特别展示:`group` 信号量(队列/限并发)与写回策略(variable/http/db)
|
||||
|
||||
4. DSL/Design 转换图(Flowchart)
|
||||
- `DesignSyntax` 校验→条件端口兼容→推断 `kind/task/name`→组装 `groups/members/await_policy`→生成 `ChainDef`
|
||||
- 同时展示 `mappers.ctx_from_design_json` 将节点配置落到 `ctx.nodes.<id>.<executor>`
|
||||
|
||||
5. 数据模型图(ER/Class)
|
||||
- 表:`flows(id,name,yaml,design_json,code,remark,created_at,updated_at)`、`flow_run_logs(id,flow_id,flow_code,input,output,ok,logs,user_id,username,started_at,duration_ms,created_at)`
|
||||
- 关系:`flow_run_logs.flow_id → flows.id`
|
||||
|
||||
6. 事件与日志通道图(Graph LR)
|
||||
- SSE/WS:`engine.push_and_emit → middlewares.sse/ws → client`
|
||||
- 入库:`FlowLogHandler(Database|Sse)` → `flow_run_log_service.create`
|
||||
|
||||
7. 执行器生态图(Graph LR)
|
||||
- 节点:`http/db/variable/script_rhai/script_js/script_python/condition`
|
||||
- 输入来源:`ctx.nodes.<id>.*` 与顶层 `ctx.*`;输出写回策略与幂等注意点
|
||||
|
||||
## 信息来源与代码引用
|
||||
- DSL/Design:`backend/src/flow/dsl.rs:60-93,138-170,172-203,246-303`
|
||||
- 领域模型:`backend/src/flow/domain.rs:20-28,31-36,39-47,60-68`
|
||||
- 上下文与事件:`backend/src/flow/context.rs:29-45`
|
||||
- 引擎:`backend/src/flow/engine.rs:117-209,213-577`
|
||||
- 服务编排:`backend/src/services/flow_service.rs:285-305,342-349,351-365,366-399`
|
||||
- 路由:`backend/src/routes/flows.rs:26-35,101-133`
|
||||
- 日志服务:`backend/src/services/flow_run_log_service.rs:46-63,74-131`
|
||||
|
||||
## 实施步骤
|
||||
1. 生成 `docs/flow_architecture.md` 骨架,按章节填入概要说明
|
||||
2. 为每个章节插入 Mermaid 图代码块,并补充 2-4 条关键解释
|
||||
3. 在图下添加代码引用(`file_path:line_number`)
|
||||
4. 预览并调整布局(节点命名统一、边方向一致、避免交叉线)
|
||||
5. 输出 ASCII 简版(备选)供纯文本环境查看
|
||||
|
||||
## 验证
|
||||
- 在 IDE/Markdown 预览验证所有 Mermaid 图可渲染
|
||||
- 随机抽取 3 处代码引用跳转验证准确性
|
||||
|
||||
## 后续维护
|
||||
- 当新增执行器或模式,更新“执行器生态图”和“引擎执行图”
|
||||
- 当模型字段变更,更新 ER 图并在“请求/运行编排时序图”同步最新流程
|
||||
84
README.md
84
README.md
@ -2,6 +2,89 @@
|
||||
|
||||
欢迎来到 UdminAI 项目文档中心。本文档集合提供了项目的完整技术文档,涵盖了架构设计、模块说明、API 文档和最佳实践等内容。
|
||||
|
||||
## 🗂 项目结构
|
||||
|
||||
```
|
||||
udmin_ai/
|
||||
├─ backend/ # Rust 后端(axum + tokio + sea-orm)
|
||||
│ └─ src/
|
||||
│ ├─ flow/ # 流程执行核心
|
||||
│ │ ├─ mod.rs
|
||||
│ │ ├─ domain.rs # ChainDef/Node/Link/Group 等领域模型
|
||||
│ │ ├─ context.rs # 执行选项/流式事件
|
||||
│ │ ├─ engine.rs # FlowEngine 引擎(并发/分支/屏障/事件)
|
||||
│ │ ├─ dsl.rs # Design JSON/DSL 解析与构建
|
||||
│ │ ├─ mappers.rs # 设计映射到 ctx(nodes.<id>.<executor>)
|
||||
│ │ ├─ log_handler.rs # 运行日志与事件推送抽象
|
||||
│ │ ├─ task.rs # 任务注册表与执行器抽象
|
||||
│ │ └─ executors/ # http/db/variable/script_* / condition
|
||||
│ ├─ routes/ # API 路由层
|
||||
│ │ ├─ flows.rs # /flows CRUD 与运行(同步/SSE/WS)
|
||||
│ │ └─ flow_run_logs.rs # /flow_run_logs 查询与删除
|
||||
│ ├─ services/ # 业务编排与持久化
|
||||
│ │ ├─ flow_service.rs # 流程加载/解析/驱动与日志
|
||||
│ │ └─ flow_run_log_service.rs # 运行日志分页与批量删除
|
||||
│ ├─ middlewares/ # 中间件服务
|
||||
│ │ ├─ sse.rs # SSE 服务与事件封装
|
||||
│ │ ├─ ws.rs # WebSocket 服务与转发
|
||||
│ │ ├─ jwt.rs # 认证鉴权与用户提取
|
||||
│ │ └─ http_client.rs # HTTP 客户端封装(reqwest)
|
||||
│ ├─ models/ # SeaORM 数据模型
|
||||
│ │ ├─ flow.rs # flows 表
|
||||
│ │ └─ flow_run_log.rs # flow_run_logs 表
|
||||
│ ├─ db.rs # 数据库初始化与全局句柄
|
||||
│ └─ redis.rs # Redis 访问与令牌校验
|
||||
│
|
||||
├─ frontend/ # TypeScript 前端(React + Vite)
|
||||
│ └─ src/
|
||||
│ ├─ flows/ # 可视化流程编辑器与运行面板
|
||||
│ │ ├─ editor.tsx # 设计器入口(自由布局)
|
||||
│ │ ├─ services/custom-service.ts # 保存与运行(HTTP/SSE/WS)
|
||||
│ │ └─ components/testrun/... # 测试运行面板与实时输出
|
||||
│ ├─ pages/
|
||||
│ │ ├─ FlowList.tsx # 流程列表/新建/编辑
|
||||
│ │ └─ FlowRunLogs.tsx # 运行日志列表
|
||||
│ ├─ utils/
|
||||
│ │ ├─ axios.ts # 统一请求拦截(401 刷新/跳转登录)
|
||||
│ │ └─ token.ts # 本地令牌与用户信息管理
|
||||
│ ├─ App.tsx / main.tsx # 入口与路由
|
||||
│ └─ styles / layouts ... # 样式与布局
|
||||
│
|
||||
├─ docs/ # 文档
|
||||
│ ├─ flow_architecture.md # Flow 架构与执行图(Mermaid)
|
||||
│ └─ ... # 其他专题文档(概览/后端/前端/示例)
|
||||
│
|
||||
└─ .trae/rules/project_rules.md # 项目代码规范(Rust 文件顺序等)
|
||||
```
|
||||
|
||||
## ⚙️ 快速启动
|
||||
|
||||
- 后端启动(默认读取 `.env`,端口:HTTP 9898,SSE 8866,WS 8855)
|
||||
- `cd backend && ENV_FILE=dev cargo run`
|
||||
- 前端启动(默认开发端口 8888)
|
||||
- `cd frontend && npm install && npm run dev`
|
||||
|
||||
## 🔌 主要接口
|
||||
|
||||
- 流程管理
|
||||
- `POST /api/flows` 新建流程(支持 YAML 与 design_json)
|
||||
- `GET /api/flows` 分页查询
|
||||
- `GET/PUT/DELETE /api/flows/{id}` 详情/更新/删除
|
||||
- `POST /api/flows/{id}/run` 同步运行,返回 `{ ok, ctx, logs }`
|
||||
- `POST /api/flows/{id}/run/stream` SSE 流式运行,推送节点事件与完成事件
|
||||
- `GET /api/flows/{id}/run/ws` WebSocket 流式运行
|
||||
- 运行日志
|
||||
- `GET /api/flow_run_logs` 分页查询(支持按 flow_id/flow_code/user/ok 过滤)
|
||||
- `DELETE /api/flow_run_logs/{ids}` 批量删除
|
||||
|
||||
## 🧩 架构与执行
|
||||
|
||||
- 引擎:自研 `FlowEngine`(递归驱动图,合流屏障,分支并行,组等待)
|
||||
- 模式:`sync`(同步)/ `async`(组内异步)/ `queued`(组队列)/ `bounded`(组限并发)
|
||||
- 条件:JSON 条件集合或 `rhai` 表达式;无匹配时回退无条件边
|
||||
- 执行器:`http`、`db`、`variable`、`script_rhai/js/python`、`condition`
|
||||
- 文档详见:`docs/flow_architecture.md`
|
||||
|
||||
## 📚 文档导航
|
||||
|
||||
### 🏗️ 架构文档
|
||||
@ -17,6 +100,7 @@
|
||||
- **[路由层](docs/ROUTES.md)** - API 路由的设计规范、接口定义和中间件集成
|
||||
- **[数据模型](docs/MODELS.md)** - 数据模型的设计原则、实体定义和关系映射
|
||||
- **[中间件](docs/MIDDLEWARES.md)** - 中间件系统的设计理念、核心组件和使用方式
|
||||
- **[Flow 架构与执行图](docs/flow_architecture.md)** - Flow 端到端架构图与执行流程
|
||||
|
||||
### 🛠️ 基础设施
|
||||
|
||||
|
||||
@ -10,6 +10,8 @@ pub struct FlowContext {
|
||||
pub enum ExecutionMode {
|
||||
#[serde(rename = "sync")] Sync,
|
||||
#[serde(rename = "async")] AsyncFireAndForget,
|
||||
#[serde(rename = "queued")] AsyncQueued,
|
||||
#[serde(rename = "bounded")] AsyncBounded,
|
||||
}
|
||||
|
||||
impl Default for ExecutionMode { fn default() -> Self { ExecutionMode::Sync } }
|
||||
@ -35,8 +37,15 @@ pub struct DriveOptions {
|
||||
// 新增:事件通道(仅运行时使用,不做序列化/反序列化)
|
||||
#[serde(default, skip_serializing, skip_deserializing)]
|
||||
pub event_tx: Option<tokio::sync::mpsc::Sender<StreamEvent>>,
|
||||
// 新增:异步分组追踪器(仅运行时使用,不做序列化/反序列化)
|
||||
#[serde(default, skip_serializing, skip_deserializing)]
|
||||
pub async_groups: Option<std::sync::Arc<tokio::sync::Mutex<std::collections::HashMap<String, Vec<tokio::task::JoinHandle<()>>>>>>,
|
||||
#[serde(default, skip_serializing, skip_deserializing)]
|
||||
pub group_semaphores: Option<std::sync::Arc<tokio::sync::Mutex<std::collections::HashMap<String, std::sync::Arc<tokio::sync::Semaphore>>>>>,
|
||||
#[serde(default)]
|
||||
pub bounded_limit: Option<usize>,
|
||||
}
|
||||
|
||||
impl Default for DriveOptions {
|
||||
fn default() -> Self { Self { max_steps: 10_000, execution_mode: ExecutionMode::Sync, event_tx: None } }
|
||||
fn default() -> Self { Self { max_steps: 10_000, execution_mode: ExecutionMode::Sync, event_tx: None, async_groups: None, group_semaphores: None, bounded_limit: None } }
|
||||
}
|
||||
@ -1,4 +1,5 @@
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::collections::HashMap;
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash, Default)]
|
||||
pub struct NodeId(pub String);
|
||||
@ -41,4 +42,27 @@ pub struct ChainDef {
|
||||
pub nodes: Vec<NodeDef>,
|
||||
#[serde(default)]
|
||||
pub links: Vec<LinkDef>,
|
||||
#[serde(default)]
|
||||
pub groups: HashMap<String, GroupDef>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
|
||||
pub enum GroupAwaitPolicy {
|
||||
#[serde(rename = "none")] None,
|
||||
#[serde(rename = "node_leave")] NodeLeave,
|
||||
#[serde(rename = "branch_exit")] BranchExit,
|
||||
#[serde(rename = "flow_end")] FlowEnd,
|
||||
}
|
||||
|
||||
impl Default for GroupAwaitPolicy { fn default() -> Self { GroupAwaitPolicy::BranchExit } }
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
|
||||
pub struct GroupDef {
|
||||
pub id: String,
|
||||
#[serde(default)]
|
||||
pub parent_id: Option<String>,
|
||||
#[serde(default)]
|
||||
pub members: Vec<String>,
|
||||
#[serde(default)]
|
||||
pub await_policy: GroupAwaitPolicy,
|
||||
}
|
||||
@ -88,6 +88,7 @@ impl From<FlowDSL> for super::domain::ChainDef {
|
||||
condition: e.condition,
|
||||
})
|
||||
.collect(),
|
||||
groups: std::collections::HashMap::default(),
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -179,13 +180,43 @@ fn build_chain_from_design(design: &DesignSyntax) -> anyhow::Result<super::domai
|
||||
use super::domain::{ChainDef, NodeDef, NodeId, NodeKind, LinkDef};
|
||||
|
||||
let mut nodes: Vec<NodeDef> = Vec::new();
|
||||
let mut groups: std::collections::HashMap<String, super::domain::GroupDef> = std::collections::HashMap::new();
|
||||
for n in &design.nodes {
|
||||
let kind = match n.kind.as_str() {
|
||||
"start" => NodeKind::Start,
|
||||
"end" => NodeKind::End,
|
||||
"condition" => NodeKind::Condition,
|
||||
"group" => NodeKind::Task, // group 本身不作为可执行节点,稍后跳过入 nodes
|
||||
_ => NodeKind::Task,
|
||||
};
|
||||
if n.kind.as_str() == "group" {
|
||||
// 解析分组:data.blockIDs(成员节点)、data.parentID(上层分组)、data.awaitPolicy(等待策略)
|
||||
let members: Vec<String> = n
|
||||
.data
|
||||
.get("blockIDs")
|
||||
.and_then(|v| v.as_array())
|
||||
.map(|arr| arr.iter().filter_map(|x| x.as_str().map(|s| s.to_string())).collect())
|
||||
.unwrap_or_default();
|
||||
let parent_id: Option<String> = n
|
||||
.data
|
||||
.get("parentID")
|
||||
.and_then(|v| v.as_str())
|
||||
.map(|s| s.to_string());
|
||||
let await_policy = match n
|
||||
.data
|
||||
.get("awaitPolicy")
|
||||
.and_then(|v| v.as_str())
|
||||
.unwrap_or("branch_exit")
|
||||
{
|
||||
"none" => super::domain::GroupAwaitPolicy::None,
|
||||
"node_leave" => super::domain::GroupAwaitPolicy::NodeLeave,
|
||||
"flow_end" => super::domain::GroupAwaitPolicy::FlowEnd,
|
||||
_ => super::domain::GroupAwaitPolicy::BranchExit,
|
||||
};
|
||||
groups.insert(n.id.clone(), super::domain::GroupDef { id: n.id.clone(), parent_id, members, await_policy });
|
||||
// group 节点不加入执行节点集合
|
||||
continue;
|
||||
}
|
||||
// 从节点 data.title 读取名称,若不存在则为空字符串
|
||||
let name = n.data.get("title").and_then(|v| v.as_str()).unwrap_or("").to_string();
|
||||
// 将可执行类型映射到任务标识(用于绑定任务实现)
|
||||
@ -268,7 +299,7 @@ fn build_chain_from_design(design: &DesignSyntax) -> anyhow::Result<super::domai
|
||||
links.push(LinkDef { from: NodeId(e.from.clone()), to: NodeId(e.to.clone()), condition: cond });
|
||||
}
|
||||
|
||||
Ok(ChainDef { name: design.name.clone(), nodes, links })
|
||||
Ok(ChainDef { name: design.name.clone(), nodes, links, groups })
|
||||
}
|
||||
|
||||
// Rewire external API to typed syntax -> validate -> build
|
||||
|
||||
@ -1,28 +1,27 @@
|
||||
//! 流程执行引擎(engine.rs):驱动 ChainDef 流程图,支持同步/异步任务、条件路由、并发分支与 SSE 推送。
|
||||
use std::cell::RefCell;
|
||||
use std::collections::HashMap;
|
||||
|
||||
use std::collections::{HashMap, HashSet};
|
||||
use std::time::Instant;
|
||||
|
||||
use crate::flow::executors::condition::eval_condition_json;
|
||||
|
||||
use super::{
|
||||
context::{DriveOptions, ExecutionMode},
|
||||
domain::{ChainDef, NodeKind},
|
||||
domain::{ChainDef, NodeKind, GroupAwaitPolicy, GroupDef},
|
||||
task::TaskRegistry,
|
||||
};
|
||||
|
||||
use futures::future::join_all;
|
||||
use regex::Regex;
|
||||
use rhai::{AST, Engine};
|
||||
use tokio::sync::{Mutex, RwLock};
|
||||
use tokio::sync::{Mutex, RwLock, Semaphore};
|
||||
use tracing::info;
|
||||
use crate::flow::executors::script_rhai::eval_rhai_expr_bool;
|
||||
|
||||
// 结构体:紧随 use
|
||||
/// FlowEngine 管理 TaskRegistry 并驱动流程执行
|
||||
pub struct FlowEngine {
|
||||
pub tasks: TaskRegistry,
|
||||
}
|
||||
|
||||
|
||||
/// 驱动过程中的错误快照,包含触发节点、上下文与过程日志
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct DriveError {
|
||||
pub node_id: String,
|
||||
@ -31,214 +30,207 @@ pub struct DriveError {
|
||||
pub message: String,
|
||||
}
|
||||
|
||||
// === 表达式评估支持:thread_local 引擎与 AST 缓存,避免全局 Sync/Send 限制 ===
|
||||
// Rhai helpers moved to executors/script_rhai.rs
|
||||
|
||||
// 模块:流程执行引擎(engine.rs)
|
||||
// 作用:驱动 ChainDef 流程图,支持:
|
||||
// - 同步/异步(Fire-and-Forget)任务执行
|
||||
// - 条件路由(Rhai 表达式与 JSON 条件)与无条件回退
|
||||
// - 并发分支 fan-out 与 join_all 等待
|
||||
// - SSE 实时事件推送(逐行增量 + 节点级切片)
|
||||
// 设计要点:
|
||||
// - 表达式执行使用 thread_local 的 Rhai Engine 与 AST 缓存,避免全局 Send/Sync 限制
|
||||
// - 共享上下文使用 RwLock 包裹 serde_json::Value;日志聚合使用 Mutex<Vec<String>>
|
||||
// - 不做冲突校验:允许并发修改;最后写回/写入按代码路径覆盖
|
||||
//
|
||||
fn regex_match(s: &str, pat: &str) -> bool {
|
||||
Regex::new(pat).map(|re| re.is_match(s)).unwrap_or(false)
|
||||
/// 构建器:允许注入任务注册表,生成 FlowEngine
|
||||
#[derive(Default)]
|
||||
pub struct FlowEngineBuilder {
|
||||
tasks: Option<TaskRegistry>,
|
||||
}
|
||||
|
||||
// 常用字符串函数,便于在表达式中直接调用(函数式写法)
|
||||
fn contains(s: &str, sub: &str) -> bool { s.contains(sub) }
|
||||
fn starts_with(s: &str, prefix: &str) -> bool { s.starts_with(prefix) }
|
||||
fn ends_with(s: &str, suffix: &str) -> bool { s.ends_with(suffix) }
|
||||
|
||||
// 新增:判空/判不空(支持任意 Dynamic 类型)
|
||||
fn is_empty(v: rhai::Dynamic) -> bool {
|
||||
if v.is_unit() { return true; }
|
||||
if let Some(s) = v.clone().try_cast::<rhai::ImmutableString>() {
|
||||
return s.is_empty();
|
||||
// ---------- helper async functions: push+emit, group join ----------
|
||||
/// 写入节点日志并(若配置)通过 SSE 推送到前端
|
||||
/// - 日志:将消息追加至 `logs`
|
||||
/// - 推送:若存在 `event_tx`,向前端推送单条事件
|
||||
async fn push_and_emit(
|
||||
logs: &std::sync::Arc<tokio::sync::Mutex<Vec<String>>>,
|
||||
opts: &DriveOptions,
|
||||
node_id: &str,
|
||||
ctx: &std::sync::Arc<tokio::sync::RwLock<serde_json::Value>>,
|
||||
msg: String,
|
||||
) {
|
||||
{
|
||||
let mut lg = logs.lock().await;
|
||||
lg.push(msg.clone());
|
||||
}
|
||||
if let Some(a) = v.clone().try_cast::<rhai::Array>() {
|
||||
return a.is_empty();
|
||||
}
|
||||
if let Some(m) = v.clone().try_cast::<rhai::Map>() {
|
||||
return m.is_empty();
|
||||
}
|
||||
false
|
||||
}
|
||||
|
||||
fn not_empty(v: rhai::Dynamic) -> bool { !is_empty(v) }
|
||||
thread_local! {
|
||||
static RHIA_ENGINE: RefCell<Engine> = RefCell::new({
|
||||
let mut eng = Engine::new();
|
||||
// 限制执行步数,防止复杂表达式消耗过多计算资源
|
||||
eng.set_max_operations(10_000_000);
|
||||
// 严格变量模式,避免拼写错误导致静默为 null
|
||||
eng.set_strict_variables(true);
|
||||
// 注册常用工具函数
|
||||
eng.register_fn("regex_match", regex_match);
|
||||
eng.register_fn("contains", contains);
|
||||
eng.register_fn("starts_with", starts_with);
|
||||
eng.register_fn("ends_with", ends_with);
|
||||
// 新增:注册判空/判不空函数(既可函数式调用,也可方法式调用)
|
||||
eng.register_fn("is_empty", is_empty);
|
||||
eng.register_fn("not_empty", not_empty);
|
||||
eng
|
||||
});
|
||||
|
||||
// 简单的 AST 缓存:以表达式字符串为键存储编译结果(线程本地)
|
||||
static AST_CACHE: RefCell<HashMap<String, AST>> = RefCell::new(HashMap::new());
|
||||
}
|
||||
|
||||
// 评估 Rhai 表达式为 bool,提供 ctx 变量(serde_json::Value)
|
||||
fn eval_rhai_expr_bool(expr: &str, ctx: &serde_json::Value) -> bool {
|
||||
// 构造作用域并注入 ctx
|
||||
let mut scope = rhai::Scope::new();
|
||||
let dyn_ctx = match rhai::serde::to_dynamic(ctx.clone()) { Ok(d) => d, Err(_) => rhai::Dynamic::UNIT };
|
||||
scope.push_dynamic("ctx", dyn_ctx);
|
||||
|
||||
// 先从缓存读取 AST;未命中则编译并写入缓存,然后执行
|
||||
let cached = AST_CACHE.with(|c| c.borrow().get(expr).cloned());
|
||||
if let Some(ast) = cached {
|
||||
return RHIA_ENGINE.with(|eng| eng.borrow().eval_ast_with_scope::<bool>(&mut scope, &ast).unwrap_or(false));
|
||||
}
|
||||
|
||||
let compiled = RHIA_ENGINE.with(|eng| eng.borrow().compile_with_scope(&mut scope, expr));
|
||||
match compiled {
|
||||
Ok(ast) => {
|
||||
// 简单容量控制:超过 1024 条时清空,避免无限增长
|
||||
AST_CACHE.with(|c| {
|
||||
let mut cache = c.borrow_mut();
|
||||
if cache.len() > 1024 { cache.clear(); }
|
||||
cache.insert(expr.to_string(), ast.clone());
|
||||
});
|
||||
RHIA_ENGINE.with(|eng| eng.borrow().eval_ast_with_scope::<bool>(&mut scope, &ast).unwrap_or(false))
|
||||
}
|
||||
Err(_) => false,
|
||||
if let Some(tx) = opts.event_tx.as_ref() {
|
||||
let ctx_snapshot = { ctx.read().await.clone() };
|
||||
crate::middlewares::sse::emit_node(&tx, node_id.to_string(), vec![msg], ctx_snapshot).await;
|
||||
}
|
||||
}
|
||||
|
||||
// 通用:评估 Rhai 表达式并转换为 serde_json::Value,失败返回错误
|
||||
#[derive(Debug, Clone)]
|
||||
pub enum RhaiExecError {
|
||||
Compile { message: String },
|
||||
Runtime { message: String },
|
||||
Serde { message: String },
|
||||
}
|
||||
|
||||
impl std::fmt::Display for RhaiExecError {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
match self {
|
||||
RhaiExecError::Compile { message } => write!(f, "compile error: {}", message),
|
||||
RhaiExecError::Runtime { message } => write!(f, "runtime error: {}", message),
|
||||
RhaiExecError::Serde { message } => write!(f, "serde error: {}", message),
|
||||
/// 按组策略等待异步任务完成
|
||||
/// - FlowEnd:流程结束前统一等待组内所有任务
|
||||
/// - BranchExit:分支退出时等待当前组任务
|
||||
async fn join_groups_by_policy(
|
||||
groups: &std::sync::Arc<std::collections::HashMap<String, GroupDef>>,
|
||||
tracker_opt: &Option<std::sync::Arc<tokio::sync::Mutex<std::collections::HashMap<String, Vec<tokio::task::JoinHandle<()>>>>>>,
|
||||
policy: GroupAwaitPolicy,
|
||||
logs: &std::sync::Arc<tokio::sync::Mutex<Vec<String>>>,
|
||||
opts: &DriveOptions,
|
||||
node_id: &str,
|
||||
ctx: &std::sync::Arc<tokio::sync::RwLock<serde_json::Value>>,
|
||||
) {
|
||||
if let Some(tracker_arc) = tracker_opt.as_ref() {
|
||||
let mut tracker = tracker_arc.lock().await;
|
||||
let group_ids: Vec<String> = groups
|
||||
.iter()
|
||||
.filter(|(_, g)| g.await_policy == policy)
|
||||
.map(|(k, _)| k.clone())
|
||||
.collect();
|
||||
for gid in group_ids {
|
||||
if let Some(handles) = tracker.get_mut(&gid) {
|
||||
let count = handles.len();
|
||||
for h in handles.drain(..) { let _ = h.await; }
|
||||
push_and_emit(logs, opts, node_id, ctx, format!("join group: {} done ({} tasks)", gid, count)).await;
|
||||
info!(target = "udmin.flow", "join group: {} done ({} tasks)", gid, count);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl std::error::Error for RhaiExecError {}
|
||||
|
||||
pub(crate) fn eval_rhai_expr_json(expr: &str, ctx: &serde_json::Value) -> Result<serde_json::Value, RhaiExecError> {
|
||||
// 构造作用域并注入 ctx
|
||||
let mut scope = rhai::Scope::new();
|
||||
let dyn_ctx = match rhai::serde::to_dynamic(ctx.clone()) { Ok(d) => d, Err(_) => rhai::Dynamic::UNIT };
|
||||
scope.push_dynamic("ctx", dyn_ctx);
|
||||
|
||||
// 先从缓存读取 AST;未命中则编译并写入缓存,然后执行
|
||||
let cached = AST_CACHE.with(|c| c.borrow().get(expr).cloned());
|
||||
let eval = |ast: &AST, scope: &mut rhai::Scope| -> Result<serde_json::Value, RhaiExecError> {
|
||||
RHIA_ENGINE.with(|eng| {
|
||||
eng.borrow()
|
||||
.eval_ast_with_scope::<rhai::Dynamic>(scope, ast)
|
||||
.map_err(|e| RhaiExecError::Runtime { message: e.to_string() })
|
||||
.and_then(|d| rhai::serde::from_dynamic(&d).map_err(|e| RhaiExecError::Serde { message: e.to_string() }))
|
||||
})
|
||||
};
|
||||
|
||||
if let Some(ast) = cached {
|
||||
return eval(&ast, &mut scope);
|
||||
/// 在离开组时按组 ID 进行等待,用于分支退出屏障
|
||||
async fn join_groups_by_ids(
|
||||
groups: &std::sync::Arc<std::collections::HashMap<String, GroupDef>>,
|
||||
tracker_opt: &Option<std::sync::Arc<tokio::sync::Mutex<std::collections::HashMap<String, Vec<tokio::task::JoinHandle<()>>>>>>,
|
||||
group_ids: &[String],
|
||||
logs: &std::sync::Arc<tokio::sync::Mutex<Vec<String>>>,
|
||||
opts: &DriveOptions,
|
||||
node_id: &str,
|
||||
ctx: &std::sync::Arc<tokio::sync::RwLock<serde_json::Value>>,
|
||||
) {
|
||||
if let Some(tracker_arc) = tracker_opt.as_ref() {
|
||||
let mut tracker = tracker_arc.lock().await;
|
||||
for gid in group_ids {
|
||||
if matches!(groups.get(gid).map(|g| g.await_policy.clone()), Some(GroupAwaitPolicy::BranchExit)) {
|
||||
if let Some(handles) = tracker.get_mut(gid) {
|
||||
let count = handles.len();
|
||||
for h in handles.drain(..) { let _ = h.await; }
|
||||
push_and_emit(logs, opts, node_id, ctx, format!("join group: {} done ({} tasks)", gid, count)).await;
|
||||
info!(target = "udmin.flow", "join group: {} done ({} tasks)", gid, count);
|
||||
}
|
||||
}
|
||||
|
||||
let compiled = RHIA_ENGINE.with(|eng| eng.borrow().compile_with_scope(&mut scope, expr));
|
||||
match compiled {
|
||||
Ok(ast) => {
|
||||
AST_CACHE.with(|c| {
|
||||
let mut cache = c.borrow_mut();
|
||||
if cache.len() > 1024 { cache.clear(); }
|
||||
cache.insert(expr.to_string(), ast.clone());
|
||||
});
|
||||
eval(&ast, &mut scope)
|
||||
}
|
||||
Err(e) => Err(RhaiExecError::Compile { message: e.to_string() }),
|
||||
}
|
||||
}
|
||||
|
||||
// ---------- FlowEngine impl ----------
|
||||
impl FlowEngine {
|
||||
pub fn new(tasks: TaskRegistry) -> Self { Self { tasks } }
|
||||
|
||||
pub fn builder() -> FlowEngineBuilder { FlowEngineBuilder::default() }
|
||||
|
||||
/// 驱动流程:构建图与共享上下文,递归执行节点,支持并发与合流
|
||||
pub async fn drive(&self, chain: &ChainDef, ctx: serde_json::Value, opts: DriveOptions) -> anyhow::Result<(serde_json::Value, Vec<String>)> {
|
||||
// 1) 选取起点:优先 Start;否则入度为 0;再否则第一个节点
|
||||
// 查找 start:优先 Start 节点;否则选择入度为 0 的第一个节点;再否则回退第一个节点
|
||||
let start = if let Some(n) = chain
|
||||
.nodes
|
||||
.iter()
|
||||
.find(|n| matches!(n.kind, NodeKind::Start))
|
||||
{
|
||||
// 总体思路:选起点 → 构建邻接与组信息 → 初始化共享状态 → 调用递归驱动
|
||||
// 1) 选起点(Start 优先)
|
||||
let start = if let Some(n) = chain.nodes.iter().find(|n| matches!(n.kind, NodeKind::Start)) {
|
||||
n.id.0.clone()
|
||||
} else {
|
||||
// 计算入度
|
||||
let mut indeg: HashMap<&str, usize> = HashMap::new();
|
||||
for n in &chain.nodes { indeg.entry(n.id.0.as_str()).or_insert(0); }
|
||||
for l in &chain.links { *indeg.entry(l.to.0.as_str()).or_insert(0) += 1; }
|
||||
if let Some(n) = chain.nodes.iter().find(|n| indeg.get(n.id.0.as_str()).copied().unwrap_or(0) == 0) {
|
||||
n.id.0.clone()
|
||||
} else {
|
||||
chain
|
||||
.nodes
|
||||
.first()
|
||||
.ok_or_else(|| anyhow::anyhow!("empty chain"))?
|
||||
.id
|
||||
.0
|
||||
.clone()
|
||||
chain.nodes.first().ok_or_else(|| anyhow::anyhow!("empty chain"))?.id.0.clone()
|
||||
}
|
||||
};
|
||||
|
||||
// 2) 构建可并发共享的数据结构
|
||||
// 拷贝节点与边(保持原有顺序)到拥有所有权的 HashMap,供并发分支安全使用
|
||||
// 2) 构建数据结构(Arc 包裹,便于并发使用)
|
||||
let node_map_owned: HashMap<String, super::domain::NodeDef> = chain.nodes.iter().map(|n| (n.id.0.clone(), n.clone())).collect();
|
||||
let mut adj_owned: HashMap<String, Vec<super::domain::LinkDef>> = HashMap::new();
|
||||
for l in &chain.links { adj_owned.entry(l.from.0.clone()).or_default().push(l.clone()); }
|
||||
let node_map = std::sync::Arc::new(node_map_owned);
|
||||
let adj = std::sync::Arc::new(adj_owned);
|
||||
|
||||
// 共享上下文(允许并发修改,程序端不做冲突校验)
|
||||
let shared_ctx = std::sync::Arc::new(RwLock::new(ctx));
|
||||
// 共享日志聚合
|
||||
let logs_shared = std::sync::Arc::new(Mutex::new(Vec::<String>::new()));
|
||||
|
||||
// 3) 并发驱动从起点开始
|
||||
let tasks = self.tasks.clone();
|
||||
drive_from(tasks, node_map.clone(), adj.clone(), start, shared_ctx.clone(), opts.clone(), logs_shared.clone()).await?;
|
||||
// groups & node->groups
|
||||
let groups_owned: std::collections::HashMap<String, GroupDef> = chain.groups.clone();
|
||||
let groups = std::sync::Arc::new(groups_owned);
|
||||
|
||||
let mut node_groups_map: std::collections::HashMap<String, Vec<String>> = std::collections::HashMap::new();
|
||||
for (gid, g) in groups.as_ref().iter() {
|
||||
for mid in &g.members { node_groups_map.entry(mid.clone()).or_default().push(gid.clone()); }
|
||||
}
|
||||
let node_groups = std::sync::Arc::new(node_groups_map);
|
||||
|
||||
// group depths
|
||||
fn group_depth<'a>(gid: &str, groups: &'a std::collections::HashMap<String, GroupDef>) -> usize {
|
||||
let mut depth = 0usize;
|
||||
let mut cur = gid;
|
||||
while let Some(g) = groups.get(cur) {
|
||||
if let Some(pid) = g.parent_id.as_deref() { depth += 1; cur = pid; } else { break; }
|
||||
}
|
||||
depth
|
||||
}
|
||||
let mut depths: std::collections::HashMap<String, usize> = std::collections::HashMap::new();
|
||||
for gid in groups.as_ref().keys() { depths.insert(gid.clone(), group_depth(gid, groups.as_ref())); }
|
||||
let group_depths = std::sync::Arc::new(depths);
|
||||
|
||||
// in-degrees
|
||||
let mut indeg_owned: HashMap<String, usize> = HashMap::new();
|
||||
for n in &chain.nodes { indeg_owned.insert(n.id.0.clone(), 0); }
|
||||
for l in &chain.links { *indeg_owned.entry(l.to.0.clone()).or_insert(0) += 1; }
|
||||
let in_degrees = std::sync::Arc::new(indeg_owned);
|
||||
|
||||
// executed merge set & arrivals map
|
||||
let executed_merge = std::sync::Arc::new(tokio::sync::Mutex::new(HashSet::<String>::new()));
|
||||
let arrivals = std::sync::Arc::new(tokio::sync::Mutex::new(HashMap::<String, usize>::new()));
|
||||
|
||||
// 4) 发起驱动
|
||||
let tasks = self.tasks.clone();
|
||||
let mut opts2 = opts.clone();
|
||||
if opts2.async_groups.is_none() {
|
||||
opts2.async_groups = Some(std::sync::Arc::new(tokio::sync::Mutex::new(std::collections::HashMap::new())));
|
||||
}
|
||||
if opts2.group_semaphores.is_none() {
|
||||
match opts2.execution_mode {
|
||||
ExecutionMode::AsyncQueued | ExecutionMode::AsyncBounded => {
|
||||
opts2.group_semaphores = Some(std::sync::Arc::new(tokio::sync::Mutex::new(std::collections::HashMap::new())));
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
|
||||
drive_from(
|
||||
tasks,
|
||||
node_map.clone(),
|
||||
adj.clone(),
|
||||
groups.clone(),
|
||||
node_groups.clone(),
|
||||
group_depths.clone(),
|
||||
in_degrees.clone(),
|
||||
executed_merge.clone(),
|
||||
arrivals.clone(),
|
||||
start,
|
||||
shared_ctx.clone(),
|
||||
opts2.clone(),
|
||||
logs_shared.clone(),
|
||||
).await?;
|
||||
|
||||
// 4) 汇总返回
|
||||
let final_ctx = { shared_ctx.read().await.clone() };
|
||||
let logs = { logs_shared.lock().await.clone() };
|
||||
Ok((final_ctx, logs))
|
||||
}
|
||||
}
|
||||
|
||||
// 从指定节点开始驱动,遇到多条满足条件的边时:
|
||||
// - 第一条在当前任务内继续
|
||||
// - 其余分支并行 spawn,等待全部分支执行完毕后返回
|
||||
// ---------- 递归驱动函数 ----------
|
||||
/// 递归驱动子流程:屏障检查、执行模式选择、分支并行与组等待
|
||||
async fn drive_from(
|
||||
tasks: TaskRegistry,
|
||||
node_map: std::sync::Arc<HashMap<String, super::domain::NodeDef>>,
|
||||
adj: std::sync::Arc<HashMap<String, Vec<super::domain::LinkDef>>>,
|
||||
groups: std::sync::Arc<std::collections::HashMap<String, GroupDef>>,
|
||||
node_groups: std::sync::Arc<std::collections::HashMap<String, Vec<String>>>,
|
||||
group_depths: std::sync::Arc<std::collections::HashMap<String, usize>>,
|
||||
in_degrees: std::sync::Arc<HashMap<String, usize>>,
|
||||
executed_merge: std::sync::Arc<tokio::sync::Mutex<HashSet<String>>>,
|
||||
arrivals: std::sync::Arc<tokio::sync::Mutex<HashMap<String, usize>>>,
|
||||
start: String,
|
||||
ctx: std::sync::Arc<RwLock<serde_json::Value>>, // 共享上下文(并发写入通过写锁串行化,不做冲突校验)
|
||||
ctx: std::sync::Arc<RwLock<serde_json::Value>>,
|
||||
opts: DriveOptions,
|
||||
logs: std::sync::Arc<Mutex<Vec<String>>>,
|
||||
) -> anyhow::Result<()> {
|
||||
@ -248,43 +240,53 @@ async fn drive_from(
|
||||
loop {
|
||||
if steps >= opts.max_steps { break; }
|
||||
steps += 1;
|
||||
// 读取节点
|
||||
|
||||
let node = match node_map.get(¤t) { Some(n) => n, None => break };
|
||||
// 进入节点:打点
|
||||
let node_id_str = node.id.0.clone();
|
||||
let node_start = Instant::now();
|
||||
// 进入节点前记录当前日志长度,便于节点结束时做切片
|
||||
let pre_len = { logs.lock().await.len() };
|
||||
// 在每次追加日志时同步发送一条增量 SSE 事件(仅 1 行日志),以提升实时性
|
||||
// push_and_emit:
|
||||
// - 先将单行日志 push 到共享日志
|
||||
// - 若存在 SSE 通道,截取上下文快照并发送单行增量事件
|
||||
async fn push_and_emit(
|
||||
logs: &std::sync::Arc<tokio::sync::Mutex<Vec<String>>>,
|
||||
opts: &super::context::DriveOptions,
|
||||
node_id: &str,
|
||||
ctx: &std::sync::Arc<tokio::sync::RwLock<serde_json::Value>>,
|
||||
msg: String,
|
||||
) {
|
||||
{
|
||||
let mut lg = logs.lock().await;
|
||||
lg.push(msg.clone());
|
||||
}
|
||||
if let Some(tx) = opts.event_tx.as_ref() {
|
||||
let ctx_snapshot = { ctx.read().await.clone() };
|
||||
crate::middlewares::sse::emit_node(&tx, node_id.to_string(), vec![msg], ctx_snapshot).await;
|
||||
}
|
||||
}
|
||||
// enter 节点也实时推送
|
||||
|
||||
// 进入节点
|
||||
push_and_emit(&logs, &opts, &node_id_str, &ctx, format!("enter node: {}", node.id.0)).await;
|
||||
info!(target: "udmin.flow", "enter node: {}", node.id.0);
|
||||
|
||||
// 执行任务
|
||||
// 合流屏障处理(多前驱且不在组内)
|
||||
// 设计意图:在未全部到达前阻止抢跑,保证后继节点的输入完整
|
||||
let indeg = in_degrees.get(&node_id_str).copied().unwrap_or(0);
|
||||
let in_any_group = node_groups.get(&node_id_str).map(|v| !v.is_empty()).unwrap_or(false);
|
||||
if indeg > 1 && !in_any_group {
|
||||
let arrived_cnt = {
|
||||
let arr = arrivals.lock().await;
|
||||
arr.get(&node_id_str).copied().unwrap_or(0)
|
||||
};
|
||||
if arrived_cnt < indeg {
|
||||
push_and_emit(&logs, &opts, &node_id_str, &ctx, format!("skip node (merge barrier not ready): {} {}/{}", node_id_str, arrived_cnt, indeg)).await;
|
||||
info!(target: "udmin.flow", "skip node (merge barrier not ready): {} {}/{}", node_id_str, arrived_cnt, indeg);
|
||||
break;
|
||||
} else {
|
||||
let mut mark = executed_merge.lock().await;
|
||||
if mark.contains(&node_id_str) {
|
||||
push_and_emit(&logs, &opts, &node_id_str, &ctx, format!("skip node (merge dedup): {}", node_id_str)).await;
|
||||
info!(target: "udmin.flow", "skip node (merge dedup): {}", node_id_str);
|
||||
break;
|
||||
} else {
|
||||
mark.insert(node_id_str.clone());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// 执行任务(若有)
|
||||
if let Some(task_name) = &node.task {
|
||||
if let Some(task) = tasks.get(task_name) {
|
||||
match opts.execution_mode {
|
||||
let in_groups = node_groups.get(&node_id_str).map(|v| !v.is_empty()).unwrap_or(false);
|
||||
let node_exec_mode = match opts.execution_mode {
|
||||
ExecutionMode::Sync => ExecutionMode::Sync,
|
||||
ExecutionMode::AsyncFireAndForget => if in_groups { ExecutionMode::AsyncFireAndForget } else { ExecutionMode::Sync },
|
||||
ExecutionMode::AsyncQueued | ExecutionMode::AsyncBounded => if in_groups { ExecutionMode::AsyncFireAndForget } else { ExecutionMode::Sync },
|
||||
};
|
||||
|
||||
match node_exec_mode {
|
||||
ExecutionMode::Sync => {
|
||||
// 使用快照执行,结束后整体写回(允许最后写入覆盖并发修改;程序端不做冲突校验)
|
||||
let mut local_ctx = { ctx.read().await.clone() };
|
||||
match task.execute(&node.id, node, &mut local_ctx).await {
|
||||
Ok(_) => {
|
||||
@ -295,15 +297,13 @@ async fn drive_from(
|
||||
Err(e) => {
|
||||
let err_msg = format!("task error: {}: {}", task_name, e);
|
||||
push_and_emit(&logs, &opts, &node_id_str, &ctx, err_msg.clone()).await;
|
||||
// 捕获快照并返回 DriveError
|
||||
let ctx_snapshot = { ctx.read().await.clone() };
|
||||
let logs_snapshot = { logs.lock().await.clone() };
|
||||
return Err(anyhow::Error::new(DriveError { node_id: node_id_str.clone(), ctx: ctx_snapshot, logs: logs_snapshot, message: err_msg }));
|
||||
}
|
||||
}
|
||||
}
|
||||
ExecutionMode::AsyncFireAndForget => {
|
||||
// fire-and-forget:基于快照执行,不写回共享 ctx(变量任务除外:做有界差异写回)
|
||||
ExecutionMode::AsyncFireAndForget | ExecutionMode::AsyncQueued | ExecutionMode::AsyncBounded => {
|
||||
let task_ctx = { ctx.read().await.clone() };
|
||||
let task_arc = task.clone();
|
||||
let name_for_log = task_name.clone();
|
||||
@ -312,13 +312,35 @@ async fn drive_from(
|
||||
let logs_clone = logs.clone();
|
||||
let ctx_clone = ctx.clone();
|
||||
let event_tx_opt = opts.event_tx.clone();
|
||||
tokio::spawn(async move {
|
||||
|
||||
let sems_opt = opts.group_semaphores.clone();
|
||||
let exec_mode_copy = opts.execution_mode.clone();
|
||||
let bounded_limit = opts.bounded_limit;
|
||||
let deepest_gid_for_async = node_groups.get(&node_id_str).and_then(|gids| {
|
||||
if gids.is_empty() { None } else { gids.iter().max_by_key(|gid| group_depths.get(*gid).copied().unwrap_or(0)).cloned() }
|
||||
});
|
||||
|
||||
let handle: tokio::task::JoinHandle<()> = tokio::spawn(async move {
|
||||
if let Some(gid) = deepest_gid_for_async.clone() {
|
||||
if matches!(exec_mode_copy, ExecutionMode::AsyncQueued | ExecutionMode::AsyncBounded) {
|
||||
if let Some(sems_arc) = sems_opt.as_ref() {
|
||||
let sem = {
|
||||
let mut map = sems_arc.lock().await;
|
||||
use std::collections::hash_map::Entry;
|
||||
match map.entry(gid.clone()) {
|
||||
Entry::Occupied(e) => e.get().clone(),
|
||||
Entry::Vacant(v) => {
|
||||
let limit = match exec_mode_copy { ExecutionMode::AsyncQueued => 1, ExecutionMode::AsyncBounded => bounded_limit.unwrap_or(2), _ => 1 };
|
||||
let s = std::sync::Arc::new(Semaphore::new(limit));
|
||||
v.insert(s.clone());
|
||||
s
|
||||
}
|
||||
}
|
||||
};
|
||||
let _permit = sem.acquire_owned().await.unwrap();
|
||||
let mut c = task_ctx.clone();
|
||||
let _ = task_arc.execute(&node_id, &node_def, &mut c).await;
|
||||
|
||||
// 对 variable 任务执行写回:将顶层新增/修改的键写回共享 ctx,并移除对应 variable 节点
|
||||
if node_def.task.as_deref() == Some("variable") {
|
||||
// 计算顶层差异(排除 nodes),仅在不同或新增时写回
|
||||
let mut changed: Vec<(String, serde_json::Value)> = Vec::new();
|
||||
if let (serde_json::Value::Object(before_map), serde_json::Value::Object(after_map)) = (&task_ctx, &c) {
|
||||
for (k, v_after) in after_map.iter() {
|
||||
@ -342,18 +364,132 @@ async fn drive_from(
|
||||
let mut lg = logs_clone.lock().await;
|
||||
lg.push(format!("exec task done (async): {} (writeback variable)", name_for_log));
|
||||
}
|
||||
// 实时推送异步完成日志
|
||||
if let Some(tx) = event_tx_opt.as_ref() {
|
||||
let ctx_snapshot = { ctx_clone.read().await.clone() };
|
||||
crate::middlewares::sse::emit_node(&tx, node_id.0.clone(), vec![format!("exec task done (async): {} (writeback variable)", name_for_log)], ctx_snapshot).await;
|
||||
}
|
||||
info!(target: "udmin.flow", "exec task done (async): {} (writeback variable)", name_for_log);
|
||||
} else if matches!(node_def.task.as_deref(), Some("http") | Some("db")) {
|
||||
{
|
||||
let mut w = ctx_clone.write().await;
|
||||
if let serde_json::Value::Object(map) = &mut *w {
|
||||
let after_node = c.get("nodes").and_then(|v| v.get(node_id.0.as_str())).cloned();
|
||||
if let Some(serde_json::Value::Object(new_node_obj)) = after_node {
|
||||
if let Some(serde_json::Value::Object(nodes)) = map.get_mut("nodes") {
|
||||
nodes.insert(node_id.0.clone(), serde_json::Value::Object(new_node_obj));
|
||||
} else {
|
||||
let mut nodes_obj = serde_json::Map::new();
|
||||
nodes_obj.insert(node_id.0.clone(), serde_json::Value::Object(new_node_obj));
|
||||
map.insert("nodes".to_string(), serde_json::Value::Object(nodes_obj));
|
||||
}
|
||||
}
|
||||
for key in ["http_response", "db_response"] {
|
||||
let before = task_ctx.get(key);
|
||||
let after = c.get(key);
|
||||
if let Some(v_after) = after {
|
||||
if before != after {
|
||||
map.insert(key.to_string(), v_after.clone());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
{
|
||||
let mut lg = logs_clone.lock().await;
|
||||
lg.push(format!("exec task done (async): {} (writeback outputs)", name_for_log));
|
||||
}
|
||||
if let Some(tx) = event_tx_opt.as_ref() {
|
||||
let ctx_snapshot = { ctx_clone.read().await.clone() };
|
||||
crate::middlewares::sse::emit_node(&tx, node_id.0.clone(), vec![format!("exec task done (async): {} (writeback outputs)", name_for_log)], ctx_snapshot).await;
|
||||
}
|
||||
} else {
|
||||
{
|
||||
let mut lg = logs_clone.lock().await;
|
||||
lg.push(format!("exec task done (async): {}", name_for_log));
|
||||
}
|
||||
if let Some(tx) = event_tx_opt.as_ref() {
|
||||
let ctx_snapshot = { ctx_clone.read().await.clone() };
|
||||
crate::middlewares::sse::emit_node(&tx, node_id.0.clone(), vec![format!("exec task done (async): {}", name_for_log)], ctx_snapshot).await;
|
||||
}
|
||||
}
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
let mut c = task_ctx.clone();
|
||||
let _ = task_arc.execute(&node_id, &node_def, &mut c).await;
|
||||
|
||||
// variable 任务差异写回
|
||||
if node_def.task.as_deref() == Some("variable") {
|
||||
let mut changed: Vec<(String, serde_json::Value)> = Vec::new();
|
||||
if let (serde_json::Value::Object(before_map), serde_json::Value::Object(after_map)) = (&task_ctx, &c) {
|
||||
for (k, v_after) in after_map.iter() {
|
||||
if k == "nodes" { continue; }
|
||||
match before_map.get(k) {
|
||||
Some(v_before) if v_before == v_after => {}
|
||||
_ => changed.push((k.clone(), v_after.clone())),
|
||||
}
|
||||
}
|
||||
}
|
||||
{
|
||||
let mut w = ctx_clone.write().await;
|
||||
if let serde_json::Value::Object(map) = &mut *w {
|
||||
for (k, v) in changed.into_iter() { map.insert(k, v); }
|
||||
if let Some(serde_json::Value::Object(nodes)) = map.get_mut("nodes") {
|
||||
nodes.remove(node_id.0.as_str());
|
||||
}
|
||||
}
|
||||
}
|
||||
{
|
||||
let mut lg = logs_clone.lock().await;
|
||||
lg.push(format!("exec task done (async): {} (writeback variable)", name_for_log));
|
||||
}
|
||||
if let Some(tx) = event_tx_opt.as_ref() {
|
||||
let ctx_snapshot = { ctx_clone.read().await.clone() };
|
||||
crate::middlewares::sse::emit_node(&tx, node_id.0.clone(), vec![format!("exec task done (async): {} (writeback variable)", name_for_log)], ctx_snapshot).await;
|
||||
}
|
||||
info!(target: "udmin.flow", "exec task done (async): {} (writeback variable)", name_for_log);
|
||||
} else if matches!(node_def.task.as_deref(), Some("http") | Some("db")) {
|
||||
// http/db 写回策略
|
||||
{
|
||||
let mut w = ctx_clone.write().await;
|
||||
if let serde_json::Value::Object(map) = &mut *w {
|
||||
let after_node = c.get("nodes").and_then(|v| v.get(node_id.0.as_str())).cloned();
|
||||
if let Some(serde_json::Value::Object(new_node_obj)) = after_node {
|
||||
if let Some(serde_json::Value::Object(nodes)) = map.get_mut("nodes") {
|
||||
nodes.insert(node_id.0.clone(), serde_json::Value::Object(new_node_obj));
|
||||
} else {
|
||||
// 写回到 ctx.nodes.<node_id>:记录任务类型与响应体
|
||||
let mut nodes_obj = serde_json::Map::new();
|
||||
nodes_obj.insert(node_id.0.clone(), serde_json::Value::Object(new_node_obj));
|
||||
map.insert("nodes".to_string(), serde_json::Value::Object(nodes_obj));
|
||||
}
|
||||
}
|
||||
// 若顶层 http_response/db_response 发生变化,选择性写回共享 ctx
|
||||
for key in ["http_response", "db_response"] {
|
||||
let before = task_ctx.get(key);
|
||||
let after = c.get(key);
|
||||
if let Some(v_after) = after {
|
||||
if before != after {
|
||||
map.insert(key.to_string(), v_after.clone());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
{
|
||||
let mut lg = logs_clone.lock().await;
|
||||
lg.push(format!("exec task done (async): {} (writeback outputs)", name_for_log));
|
||||
}
|
||||
if let Some(tx) = event_tx_opt.as_ref() {
|
||||
let ctx_snapshot = { ctx_clone.read().await.clone() };
|
||||
crate::middlewares::sse::emit_node(&tx, node_id.0.clone(), vec![format!("exec task done (async): {} (writeback outputs)", name_for_log)], ctx_snapshot).await;
|
||||
}
|
||||
info!(target: "udmin.flow", "exec task done (async): {} (writeback outputs)", name_for_log);
|
||||
} else {
|
||||
{
|
||||
let mut lg = logs_clone.lock().await;
|
||||
lg.push(format!("exec task done (async): {}", name_for_log));
|
||||
}
|
||||
// 实时推送异步完成日志
|
||||
if let Some(tx) = event_tx_opt.as_ref() {
|
||||
let ctx_snapshot = { ctx_clone.read().await.clone() };
|
||||
crate::middlewares::sse::emit_node(&tx, node_id.0.clone(), vec![format!("exec task done (async): {}", name_for_log)], ctx_snapshot).await;
|
||||
@ -361,8 +497,28 @@ async fn drive_from(
|
||||
info!(target: "udmin.flow", "exec task done (async): {}", name_for_log);
|
||||
}
|
||||
});
|
||||
push_and_emit(&logs, &opts, &node_id_str, &ctx, format!("spawn task: {} (async)", task_name)).await;
|
||||
info!(target: "udmin.flow", "spawn task: {} (async)", task_name);
|
||||
|
||||
push_and_emit(&logs, &opts, &node_id_str, &ctx, format!("spawn task: {} (async, by group)", task_name)).await;
|
||||
info!(target: "udmin.flow", "spawn task: {} (async, by group)", task_name);
|
||||
|
||||
// track join handle into group tracker(最内层组)
|
||||
if let Some(gids) = node_groups.get(&node_id_str) {
|
||||
if !gids.is_empty() {
|
||||
let deepest = gids.iter().max_by_key(|gid| group_depths.get(*gid).copied().unwrap_or(0)).cloned();
|
||||
if let Some(gid) = deepest {
|
||||
if let Some(tracker_arc) = opts.async_groups.as_ref() {
|
||||
let mut tracker = tracker_arc.lock().await;
|
||||
use std::collections::hash_map::Entry;
|
||||
match tracker.entry(gid.clone()) {
|
||||
Entry::Occupied(mut e) => { e.get_mut().push(handle); }
|
||||
Entry::Vacant(e) => { e.insert(vec![handle]); }
|
||||
}
|
||||
push_and_emit(&logs, &opts, &node_id_str, &ctx, format!("track async task in group: {}", gid)).await;
|
||||
info!(target: "udmin.flow", "track async task in group: {}", gid);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
@ -371,11 +527,12 @@ async fn drive_from(
|
||||
}
|
||||
}
|
||||
|
||||
// End 节点:记录耗时后结束
|
||||
// End 节点
|
||||
if matches!(node.kind, NodeKind::End) {
|
||||
let duration = node_start.elapsed().as_millis();
|
||||
push_and_emit(&logs, &opts, &node_id_str, &ctx, format!("leave node: {} {} ms", node_id_str, duration)).await;
|
||||
info!(target: "udmin.flow", "leave node: {} {} ms", node_id_str, duration);
|
||||
join_groups_by_policy(&groups, &opts.async_groups, GroupAwaitPolicy::FlowEnd, &logs, &opts, &node_id_str, &ctx).await;
|
||||
if let Some(tx) = opts.event_tx.as_ref() {
|
||||
let node_logs = { let lg = logs.lock().await; lg[pre_len..].to_vec() };
|
||||
let ctx_snapshot = { ctx.read().await.clone() };
|
||||
@ -384,18 +541,17 @@ async fn drive_from(
|
||||
break;
|
||||
}
|
||||
|
||||
// 选择下一批 link:仅在 Condition 节点上评估条件;其他节点忽略条件,直接沿第一条边前进
|
||||
// 选择后继
|
||||
let mut nexts: Vec<String> = Vec::new();
|
||||
if let Some(links) = adj.get(node.id.0.as_str()) {
|
||||
if matches!(node.kind, NodeKind::Condition) {
|
||||
// 条件边:全部评估为真者加入 nexts(空字符串条件视为无条件,不在此处评估)
|
||||
for link in links.iter() {
|
||||
if let Some(cond_str) = &link.condition {
|
||||
if cond_str.trim().is_empty() {
|
||||
// 空条件:视为无条件边,留待后续回退逻辑处理
|
||||
info!(target: "udmin.flow", from=%node.id.0, to=%link.to.0, "condition link: empty (unconditional candidate)");
|
||||
continue;
|
||||
}
|
||||
// 条件:以 '{' 或 '[' 开头按 JSON 规则;否则按 Rhai 表达式
|
||||
let trimmed = cond_str.trim_start();
|
||||
let (kind, ok) = if trimmed.starts_with('{') || trimmed.starts_with('[') {
|
||||
match serde_json::from_str::<serde_json::Value>(cond_str) {
|
||||
@ -412,11 +568,9 @@ async fn drive_from(
|
||||
info!(target: "udmin.flow", from=%node.id.0, to=%link.to.0, cond_kind=%kind, cond_len=%cond_str.len(), result=%ok, "condition link evaluated");
|
||||
if ok { nexts.push(link.to.0.clone()); }
|
||||
} else {
|
||||
// 无 condition 字段:视为无条件边
|
||||
info!(target: "udmin.flow", from=%node.id.0, to=%link.to.0, "condition link: none (unconditional candidate)");
|
||||
}
|
||||
}
|
||||
// 若没有命中条件边,则取第一条无条件边(无条件 = 无 condition 或 空字符串)
|
||||
if nexts.is_empty() {
|
||||
let mut picked = None;
|
||||
for link in links.iter() {
|
||||
@ -434,7 +588,6 @@ async fn drive_from(
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// 非条件节点:忽略条件,fan-out 所有出边(全部并行执行)
|
||||
for link in links.iter() {
|
||||
nexts.push(link.to.0.clone());
|
||||
info!(target: "udmin.flow", from=%node.id.0, to=%link.to.0, "fan-out from non-condition node");
|
||||
@ -442,7 +595,7 @@ async fn drive_from(
|
||||
}
|
||||
}
|
||||
|
||||
// 无后继:记录耗时后结束
|
||||
// 无后继 -> 结束
|
||||
if nexts.is_empty() {
|
||||
let duration = node_start.elapsed().as_millis();
|
||||
{
|
||||
@ -459,8 +612,18 @@ async fn drive_from(
|
||||
break;
|
||||
}
|
||||
|
||||
// 单分支:记录耗时后前进
|
||||
// 单分支
|
||||
if nexts.len() == 1 {
|
||||
let next_id = nexts[0].clone();
|
||||
let cur_groups = node_groups.get(&node_id_str).cloned().unwrap_or_default();
|
||||
let next_groups = node_groups.get(&next_id).cloned().unwrap_or_default();
|
||||
let mut exit_gids: Vec<String> = Vec::new();
|
||||
if !cur_groups.is_empty() {
|
||||
let next_set: HashSet<String> = next_groups.into_iter().collect();
|
||||
for gid in cur_groups.into_iter() { if !next_set.contains(&gid) { exit_gids.push(gid); } }
|
||||
}
|
||||
if !exit_gids.is_empty() { join_groups_by_ids(&groups, &opts.async_groups, &exit_gids, &logs, &opts, &node_id_str, &ctx).await; }
|
||||
|
||||
let duration = node_start.elapsed().as_millis();
|
||||
{
|
||||
let mut lg = logs.lock().await;
|
||||
@ -472,26 +635,44 @@ async fn drive_from(
|
||||
let ctx_snapshot = { ctx.read().await.clone() };
|
||||
crate::middlewares::sse::emit_node(&tx, node_id_str.clone(), node_logs, ctx_snapshot).await;
|
||||
}
|
||||
current = nexts.remove(0);
|
||||
|
||||
// 标记到达(合流计数)
|
||||
{
|
||||
let mut arr = arrivals.lock().await;
|
||||
let ent = arr.entry(next_id.clone()).or_insert(0);
|
||||
*ent += 1;
|
||||
}
|
||||
current = next_id;
|
||||
continue;
|
||||
}
|
||||
|
||||
// 多分支:主分支沿第一条继续,其余分支并行执行并等待完成
|
||||
// 多分支:主分支继续,其他并行
|
||||
let mut futs = Vec::new();
|
||||
for to_id in nexts.iter().skip(1).cloned() {
|
||||
let tasks_c = tasks.clone();
|
||||
let node_map_c = node_map.clone();
|
||||
let adj_c = adj.clone();
|
||||
let groups_c = groups.clone();
|
||||
let node_groups_c = node_groups.clone();
|
||||
let group_depths_c = group_depths.clone();
|
||||
let in_degrees_c = in_degrees.clone();
|
||||
let executed_merge_c = executed_merge.clone();
|
||||
let arrivals_c = arrivals.clone();
|
||||
let ctx_c = ctx.clone();
|
||||
let opts_c = opts.clone();
|
||||
let logs_c = logs.clone();
|
||||
futs.push(drive_from(tasks_c, node_map_c, adj_c, to_id, ctx_c, opts_c, logs_c));
|
||||
futs.push(drive_from(tasks_c, node_map_c, adj_c, groups_c, node_groups_c, group_depths_c, in_degrees_c, executed_merge_c, arrivals_c, to_id, ctx_c, opts_c, logs_c));
|
||||
}
|
||||
// 当前分支继续第一条
|
||||
|
||||
// 当前分支继续第一个
|
||||
current = nexts.into_iter().next().unwrap();
|
||||
// 在一个安全点等待已分支的完成(这里选择在下一轮进入前等待)
|
||||
// 等待并分支完成
|
||||
let _ = join_all(futs).await;
|
||||
// 多分支:记录当前节点耗时(包含等待其他分支完成的时间)
|
||||
|
||||
// branch_exit group join
|
||||
join_groups_by_policy(&groups, &opts.async_groups, GroupAwaitPolicy::BranchExit, &logs, &opts, &node_id_str, &ctx).await;
|
||||
|
||||
// 记录离开(包含等待时间)
|
||||
let duration = node_start.elapsed().as_millis();
|
||||
{
|
||||
let mut lg = logs.lock().await;
|
||||
@ -508,28 +689,26 @@ async fn drive_from(
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
pub struct FlowEngineBuilder {
|
||||
tasks: Option<TaskRegistry>,
|
||||
}
|
||||
|
||||
impl FlowEngineBuilder {
|
||||
/// 指定任务注册表
|
||||
pub fn tasks(mut self, reg: TaskRegistry) -> Self { self.tasks = Some(reg); self }
|
||||
/// 构建引擎,若未指定则使用默认注册表
|
||||
pub fn build(self) -> FlowEngine {
|
||||
let tasks = self.tasks.unwrap_or_else(|| crate::flow::task::default_registry());
|
||||
FlowEngine { tasks }
|
||||
}
|
||||
}
|
||||
|
||||
/// 使用默认任务注册表构建引擎
|
||||
impl Default for FlowEngine {
|
||||
fn default() -> Self { Self { tasks: crate::flow::task::default_registry() } }
|
||||
}
|
||||
|
||||
|
||||
/// 友好展示:以 message 字段为主进行字符串输出
|
||||
impl std::fmt::Display for DriveError {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
write!(f, "{}", self.message)
|
||||
}
|
||||
}
|
||||
|
||||
impl std::error::Error for DriveError {}
|
||||
@ -166,7 +166,7 @@ pub(crate) fn resolve_value(ctx: &serde_json::Value, v: &serde_json::Value) -> R
|
||||
"expression" => {
|
||||
let expr = v.get("content").and_then(|x| x.as_str()).unwrap_or("");
|
||||
if expr.trim().is_empty() { return Ok(V::Null); }
|
||||
Ok(crate::flow::engine::eval_rhai_expr_json(expr, ctx).unwrap_or_else(|_| V::Null))
|
||||
Ok(crate::flow::executors::script_rhai::eval_rhai_expr_json(expr, ctx).unwrap_or_else(|_| V::Null))
|
||||
}
|
||||
_ => Ok(V::Null),
|
||||
}
|
||||
|
||||
@ -6,20 +6,142 @@ use tracing::{debug, info};
|
||||
use anyhow::anyhow;
|
||||
|
||||
use crate::flow::domain::{NodeDef, NodeId};
|
||||
use crate::flow::engine::eval_rhai_expr_json;
|
||||
use crate::flow::task::Executor;
|
||||
|
||||
// ---- Rhai 引擎与表达式评估(迁移自 engine.rs) ----
|
||||
use std::cell::RefCell;
|
||||
use std::collections::HashMap;
|
||||
use regex::Regex;
|
||||
use rhai::{AST, Engine};
|
||||
|
||||
const AST_CACHE_LIMIT: usize = 1024;
|
||||
|
||||
fn contains(s: &str, sub: &str) -> bool { s.contains(sub) }
|
||||
fn starts_with(s: &str, prefix: &str) -> bool { s.starts_with(prefix) }
|
||||
fn ends_with(s: &str, suffix: &str) -> bool { s.ends_with(suffix) }
|
||||
|
||||
fn is_empty(v: rhai::Dynamic) -> bool {
|
||||
if v.is_unit() { return true; }
|
||||
if let Some(s) = v.clone().try_cast::<rhai::ImmutableString>() { return s.is_empty(); }
|
||||
if let Some(a) = v.clone().try_cast::<rhai::Array>() { return a.is_empty(); }
|
||||
if let Some(m) = v.clone().try_cast::<rhai::Map>() { return m.is_empty(); }
|
||||
false
|
||||
}
|
||||
fn not_empty(v: rhai::Dynamic) -> bool { !is_empty(v) }
|
||||
|
||||
thread_local! {
|
||||
static RHAI_ENGINE: RefCell<Engine> = RefCell::new({
|
||||
let mut eng = Engine::new();
|
||||
eng.set_max_operations(10_000_000);
|
||||
eng.set_strict_variables(true);
|
||||
eng.register_fn("regex_match", regex_match);
|
||||
eng.register_fn("contains", contains);
|
||||
eng.register_fn("starts_with", starts_with);
|
||||
eng.register_fn("ends_with", ends_with);
|
||||
eng.register_fn("is_empty", is_empty);
|
||||
eng.register_fn("not_empty", not_empty);
|
||||
eng
|
||||
});
|
||||
|
||||
static AST_CACHE: RefCell<HashMap<String, AST>> = RefCell::new(HashMap::new());
|
||||
static REGEX_CACHE: RefCell<HashMap<String, Regex>> = RefCell::new(HashMap::new());
|
||||
}
|
||||
|
||||
/// 正则匹配:带线程本地缓存
|
||||
fn regex_match(s: &str, pat: &str) -> bool {
|
||||
let compiled = REGEX_CACHE.with(|c| {
|
||||
let mut m = c.borrow_mut();
|
||||
if let Some(re) = m.get(pat) { return Some(re.clone()); }
|
||||
match Regex::new(pat) {
|
||||
Ok(re) => { m.insert(pat.to_string(), re.clone()); Some(re) }
|
||||
Err(_) => None,
|
||||
}
|
||||
});
|
||||
compiled.map(|re| re.is_match(s)).unwrap_or(false)
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub enum RhaiExecError {
|
||||
Compile { message: String },
|
||||
Runtime { message: String },
|
||||
Serde { message: String },
|
||||
}
|
||||
|
||||
impl std::fmt::Display for RhaiExecError {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
match self {
|
||||
RhaiExecError::Compile { message } => write!(f, "compile error: {}", message),
|
||||
RhaiExecError::Runtime { message } => write!(f, "runtime error: {}", message),
|
||||
RhaiExecError::Serde { message } => write!(f, "serde error: {}", message),
|
||||
}
|
||||
}
|
||||
}
|
||||
impl std::error::Error for RhaiExecError {}
|
||||
|
||||
/// 评估 Rhai 表达式(bool 版本),出错时返回 false
|
||||
pub(crate) fn eval_rhai_expr_bool(expr: &str, ctx: &serde_json::Value) -> bool {
|
||||
let mut scope = rhai::Scope::new();
|
||||
let dyn_ctx = match rhai::serde::to_dynamic(ctx.clone()) { Ok(d) => d, Err(_) => rhai::Dynamic::UNIT };
|
||||
scope.push_dynamic("ctx", dyn_ctx);
|
||||
|
||||
if let Some(ast) = AST_CACHE.with(|c| c.borrow().get(expr).cloned()) {
|
||||
return RHAI_ENGINE.with(|eng| eng.borrow().eval_ast_with_scope::<bool>(&mut scope, &ast).unwrap_or(false));
|
||||
}
|
||||
|
||||
match RHAI_ENGINE.with(|eng| eng.borrow().compile_with_scope(&mut scope, expr)) {
|
||||
Ok(ast) => {
|
||||
AST_CACHE.with(|c| {
|
||||
let mut cache = c.borrow_mut();
|
||||
if cache.len() > AST_CACHE_LIMIT { cache.clear(); }
|
||||
cache.insert(expr.to_string(), ast.clone());
|
||||
});
|
||||
RHAI_ENGINE.with(|eng| eng.borrow().eval_ast_with_scope::<bool>(&mut scope, &ast).unwrap_or(false))
|
||||
}
|
||||
Err(_) => false,
|
||||
}
|
||||
}
|
||||
|
||||
/// 评估 Rhai 表达式并返回 serde_json::Value
|
||||
pub(crate) fn eval_rhai_expr_json(expr: &str, ctx: &serde_json::Value) -> Result<serde_json::Value, RhaiExecError> {
|
||||
let mut scope = rhai::Scope::new();
|
||||
let dyn_ctx = match rhai::serde::to_dynamic(ctx.clone()) { Ok(d) => d, Err(e) => {
|
||||
return Err(RhaiExecError::Serde { message: e.to_string() });
|
||||
} };
|
||||
scope.push_dynamic("ctx", dyn_ctx);
|
||||
|
||||
let eval = |ast: &AST, scope: &mut rhai::Scope| -> Result<serde_json::Value, RhaiExecError> {
|
||||
RHAI_ENGINE.with(|eng| {
|
||||
eng.borrow()
|
||||
.eval_ast_with_scope::<rhai::Dynamic>(scope, ast)
|
||||
.map_err(|e| RhaiExecError::Runtime { message: e.to_string() })
|
||||
.and_then(|d| rhai::serde::from_dynamic(&d).map_err(|e| RhaiExecError::Serde { message: e.to_string() }))
|
||||
})
|
||||
};
|
||||
|
||||
if let Some(ast) = AST_CACHE.with(|c| c.borrow().get(expr).cloned()) {
|
||||
return eval(&ast, &mut scope);
|
||||
}
|
||||
|
||||
match RHAI_ENGINE.with(|eng| eng.borrow().compile_with_scope(&mut scope, expr)) {
|
||||
Ok(ast) => {
|
||||
AST_CACHE.with(|c| {
|
||||
let mut cache = c.borrow_mut();
|
||||
if cache.len() > AST_CACHE_LIMIT { cache.clear(); }
|
||||
cache.insert(expr.to_string(), ast.clone());
|
||||
});
|
||||
eval(&ast, &mut scope)
|
||||
}
|
||||
Err(e) => Err(RhaiExecError::Compile { message: e.to_string() }),
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
pub struct ScriptRhaiTask;
|
||||
|
||||
/// 截断长字符串(去掉换行),用于日志预览
|
||||
fn truncate_str(s: &str, max: usize) -> String {
|
||||
let s = s.replace(['\n', '\r'], " ");
|
||||
if s.len() <= max {
|
||||
s
|
||||
} else {
|
||||
format!("{}…", &s[..max])
|
||||
}
|
||||
if s.len() <= max { s } else { format!("{}…", &s[..max]) }
|
||||
}
|
||||
|
||||
/// 对比两个 JSON(仅浅层),返回 (新增字段, 删除字段, 修改字段)
|
||||
@ -30,26 +152,16 @@ fn shallow_diff(before: &Value, after: &Value) -> (Vec<String>, Vec<String>, Vec
|
||||
let mut modified = Vec::new();
|
||||
|
||||
let (Some(bm), Some(am)) = (before.as_object(), after.as_object()) else {
|
||||
if before != after {
|
||||
modified.push("<root>".to_string());
|
||||
}
|
||||
if before != after { modified.push("<root>".to_string()); }
|
||||
return (added, removed, modified);
|
||||
};
|
||||
|
||||
let bkeys: BTreeSet<_> = bm.keys().cloned().collect();
|
||||
let akeys: BTreeSet<_> = am.keys().cloned().collect();
|
||||
|
||||
for k in akeys.difference(&bkeys) {
|
||||
added.push(k.to_string());
|
||||
}
|
||||
for k in bkeys.difference(&akeys) {
|
||||
removed.push(k.to_string());
|
||||
}
|
||||
for k in akeys.intersection(&bkeys) {
|
||||
if bm.get(k) != am.get(k) {
|
||||
modified.push(k.to_string());
|
||||
}
|
||||
}
|
||||
for k in akeys.difference(&bkeys) { added.push(k.to_string()); }
|
||||
for k in bkeys.difference(&akeys) { removed.push(k.to_string()); }
|
||||
for k in akeys.intersection(&bkeys) { if bm.get(k) != am.get(k) { modified.push(k.to_string()); } }
|
||||
(added, removed, modified)
|
||||
}
|
||||
|
||||
|
||||
@ -5,7 +5,7 @@ use tracing::info;
|
||||
|
||||
// crate
|
||||
use crate::flow::domain::{NodeDef, NodeId};
|
||||
use crate::flow::engine::eval_rhai_expr_json;
|
||||
use crate::flow::executors::script_rhai::eval_rhai_expr_json;
|
||||
use crate::flow::task::Executor;
|
||||
|
||||
#[derive(Default)]
|
||||
|
||||
@ -301,6 +301,8 @@ async fn run_internal(
|
||||
.or_else(|| design.get("execution_mode").and_then(|v| v.as_str()))
|
||||
.unwrap_or("sync");
|
||||
exec_mode = parse_execution_mode(mode_str);
|
||||
let bounded_limit = design.get("concurrencyLimit").and_then(|v| v.as_u64()).map(|x| x as usize);
|
||||
let _ = bounded_limit;
|
||||
(chain_from_json, ctx)
|
||||
} else {
|
||||
let dsl = match serde_yaml::from_str::<FlowDSL>(&doc.yaml) {
|
||||
@ -344,7 +346,12 @@ async fn run_internal(
|
||||
|
||||
// 执行
|
||||
let drive_res = engine
|
||||
.drive(&chain, ctx, DriveOptions { execution_mode: exec_mode.clone(), event_tx, ..Default::default() })
|
||||
.drive(&chain, ctx, DriveOptions {
|
||||
execution_mode: exec_mode.clone(),
|
||||
event_tx,
|
||||
bounded_limit: if matches!(exec_mode, ExecutionMode::AsyncBounded) { design_concurrency_limit(&doc.design_json) } else { None },
|
||||
..Default::default()
|
||||
})
|
||||
.await;
|
||||
|
||||
match drive_res {
|
||||
@ -446,6 +453,16 @@ fn merge_json(a: &mut serde_json::Value, b: &serde_json::Value) {
|
||||
fn parse_execution_mode(s: &str) -> ExecutionMode {
|
||||
match s.to_ascii_lowercase().as_str() {
|
||||
"async" | "async_fire_and_forget" | "fire_and_forget" => ExecutionMode::AsyncFireAndForget,
|
||||
"queued" | "queue" => ExecutionMode::AsyncQueued,
|
||||
"bounded" | "parallel_bounded" | "bounded_parallel" => ExecutionMode::AsyncBounded,
|
||||
_ => ExecutionMode::Sync,
|
||||
}
|
||||
}
|
||||
|
||||
fn design_concurrency_limit(design_json: &Option<serde_json::Value>) -> Option<usize> {
|
||||
design_json
|
||||
.as_ref()
|
||||
.and_then(|d| d.get("concurrencyLimit"))
|
||||
.and_then(|v| v.as_u64())
|
||||
.map(|x| x as usize)
|
||||
}
|
||||
336
docs/flow/code_js1.json
Normal file
336
docs/flow/code_js1.json
Normal file
@ -0,0 +1,336 @@
|
||||
{
|
||||
"nodes": [
|
||||
{
|
||||
"id": "start_0",
|
||||
"type": "start",
|
||||
"meta": {
|
||||
"position": {
|
||||
"x": 180,
|
||||
"y": 189.8
|
||||
}
|
||||
},
|
||||
"data": {
|
||||
"title": "Start",
|
||||
"outputs": {
|
||||
"type": "object",
|
||||
"properties": {},
|
||||
"required": []
|
||||
}
|
||||
}
|
||||
},
|
||||
{
|
||||
"id": "end_0",
|
||||
"type": "end",
|
||||
"meta": {
|
||||
"position": {
|
||||
"x": 2940,
|
||||
"y": 189.8
|
||||
}
|
||||
},
|
||||
"data": {
|
||||
"title": "End"
|
||||
}
|
||||
},
|
||||
{
|
||||
"id": "variable_zOb3P",
|
||||
"type": "variable",
|
||||
"meta": {
|
||||
"position": {
|
||||
"x": 2020,
|
||||
"y": 179.8
|
||||
}
|
||||
},
|
||||
"data": {
|
||||
"title": "Variable_1",
|
||||
"assign": [
|
||||
{
|
||||
"operator": "declare",
|
||||
"left": "jss",
|
||||
"right": {
|
||||
"type": "constant",
|
||||
"content": "${\"user_n\"}",
|
||||
"schema": {
|
||||
"type": "string"
|
||||
}
|
||||
}
|
||||
}
|
||||
],
|
||||
"outputs": {
|
||||
"type": "object",
|
||||
"required": [],
|
||||
"properties": {
|
||||
"jss": {
|
||||
"type": "string"
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
{
|
||||
"id": "code_y71Sd",
|
||||
"type": "javascript",
|
||||
"meta": {
|
||||
"position": {
|
||||
"x": 640,
|
||||
"y": 162.3
|
||||
}
|
||||
},
|
||||
"data": {
|
||||
"title": "JS",
|
||||
"inputsValues": {
|
||||
"input": {
|
||||
"type": "constant",
|
||||
"content": "",
|
||||
"schema": {
|
||||
"type": "string"
|
||||
},
|
||||
"extra": {
|
||||
"index": 0
|
||||
}
|
||||
}
|
||||
},
|
||||
"script": {
|
||||
"language": "javascript",
|
||||
"content": "ctx.vri=\"usertest\"; ctx[\"user_n\"]=\"user_nuser_n\" for (let i = 0; i < 5; i++) { //console.log(`第 ${i} 次循环`); ctx[i]=\"v\"+i; }"
|
||||
},
|
||||
"outputs": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"key0": {
|
||||
"type": "string"
|
||||
},
|
||||
"key1": {
|
||||
"type": "array",
|
||||
"items": {
|
||||
"type": "string"
|
||||
}
|
||||
},
|
||||
"key2": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"key21": {
|
||||
"type": "string"
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
"inputs": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"input": {
|
||||
"type": "string"
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
{
|
||||
"id": "http_0IIt-",
|
||||
"type": "http",
|
||||
"meta": {
|
||||
"position": {
|
||||
"x": 1598.880308880309,
|
||||
"y": -317.18146718146716
|
||||
}
|
||||
},
|
||||
"data": {
|
||||
"title": "HTTP_1",
|
||||
"api": {
|
||||
"method": "GET",
|
||||
"url": {
|
||||
"type": "template",
|
||||
"content": "https://account.aliyun.com"
|
||||
}
|
||||
},
|
||||
"body": {
|
||||
"bodyType": "JSON"
|
||||
},
|
||||
"outputs": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"body": {
|
||||
"type": "string"
|
||||
},
|
||||
"headers": {
|
||||
"type": "object"
|
||||
},
|
||||
"statusCode": {
|
||||
"type": "integer"
|
||||
}
|
||||
}
|
||||
},
|
||||
"timeout": {
|
||||
"timeout": 10000,
|
||||
"retryTimes": 1
|
||||
}
|
||||
}
|
||||
},
|
||||
{
|
||||
"id": "variable_zy_Ae",
|
||||
"type": "variable",
|
||||
"meta": {
|
||||
"position": {
|
||||
"x": 2480,
|
||||
"y": 179.8
|
||||
}
|
||||
},
|
||||
"data": {
|
||||
"title": "http响应",
|
||||
"assign": [
|
||||
{
|
||||
"operator": "declare",
|
||||
"left": "http_resp",
|
||||
"right": {
|
||||
"type": "ref",
|
||||
"content": [
|
||||
"http_0IIt-",
|
||||
"headers"
|
||||
]
|
||||
}
|
||||
}
|
||||
],
|
||||
"outputs": {
|
||||
"type": "object",
|
||||
"required": [],
|
||||
"properties": {
|
||||
"http_resp": {
|
||||
"type": "object",
|
||||
"required": [],
|
||||
"properties": {}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
{
|
||||
"id": "code_iyMNK",
|
||||
"type": "script",
|
||||
"meta": {
|
||||
"position": {
|
||||
"x": 1100,
|
||||
"y": 162.3
|
||||
}
|
||||
},
|
||||
"data": {
|
||||
"title": "rhai",
|
||||
"inputsValues": {
|
||||
"input": {
|
||||
"type": "constant",
|
||||
"content": ""
|
||||
}
|
||||
},
|
||||
"script": {
|
||||
"language": "rhai",
|
||||
"content": "// 修改字段 ctx.addr_name = \"Alice\"; ctx.count = 10; // 添加新字段 ctx.city = \"Beijing\"; // 还可以用下标方式 ctx[\"extra\"] = 123;"
|
||||
},
|
||||
"outputs": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"key0": {
|
||||
"type": "string"
|
||||
},
|
||||
"key1": {
|
||||
"type": "array",
|
||||
"items": {
|
||||
"type": "string"
|
||||
}
|
||||
},
|
||||
"key2": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"key21": {
|
||||
"type": "string"
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
"required": []
|
||||
},
|
||||
"inputs": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"input": {
|
||||
"type": "string"
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
{
|
||||
"id": "db_LMgXg",
|
||||
"type": "db",
|
||||
"meta": {
|
||||
"position": {
|
||||
"x": 1606.042471042471,
|
||||
"y": 253.8
|
||||
}
|
||||
},
|
||||
"data": {
|
||||
"title": "DB_1",
|
||||
"db": {
|
||||
"sql": {
|
||||
"type": "template",
|
||||
"content": "SELECT t.* FROM udmin.departments t"
|
||||
},
|
||||
"params": [],
|
||||
"outputKey": "db_response",
|
||||
"connection": {
|
||||
"driver": "mysql",
|
||||
"mode": "fields",
|
||||
"database": "udmin",
|
||||
"host": "127.0.0.1",
|
||||
"port": 3306,
|
||||
"username": "root",
|
||||
"password": "123456"
|
||||
},
|
||||
"output": {
|
||||
"mode": "rows"
|
||||
}
|
||||
},
|
||||
"outputs": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"db_response": {
|
||||
"type": "object"
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
],
|
||||
"edges": [
|
||||
{
|
||||
"sourceNodeID": "start_0",
|
||||
"targetNodeID": "code_y71Sd"
|
||||
},
|
||||
{
|
||||
"sourceNodeID": "variable_zy_Ae",
|
||||
"targetNodeID": "end_0"
|
||||
},
|
||||
{
|
||||
"sourceNodeID": "http_0IIt-",
|
||||
"targetNodeID": "variable_zOb3P"
|
||||
},
|
||||
{
|
||||
"sourceNodeID": "db_LMgXg",
|
||||
"targetNodeID": "variable_zOb3P"
|
||||
},
|
||||
{
|
||||
"sourceNodeID": "variable_zOb3P",
|
||||
"targetNodeID": "variable_zy_Ae"
|
||||
},
|
||||
{
|
||||
"sourceNodeID": "code_y71Sd",
|
||||
"targetNodeID": "code_iyMNK"
|
||||
},
|
||||
{
|
||||
"sourceNodeID": "code_iyMNK",
|
||||
"targetNodeID": "http_0IIt-"
|
||||
},
|
||||
{
|
||||
"sourceNodeID": "code_iyMNK",
|
||||
"targetNodeID": "db_LMgXg"
|
||||
}
|
||||
]
|
||||
}
|
||||
355
docs/flow/code_js1_group.json
Normal file
355
docs/flow/code_js1_group.json
Normal file
@ -0,0 +1,355 @@
|
||||
{
|
||||
"nodes": [
|
||||
{
|
||||
"id": "start_0",
|
||||
"type": "start",
|
||||
"meta": {
|
||||
"position": {
|
||||
"x": 180,
|
||||
"y": 189.8
|
||||
}
|
||||
},
|
||||
"data": {
|
||||
"title": "Start",
|
||||
"outputs": {
|
||||
"type": "object",
|
||||
"properties": {},
|
||||
"required": []
|
||||
}
|
||||
}
|
||||
},
|
||||
{
|
||||
"id": "end_0",
|
||||
"type": "end",
|
||||
"meta": {
|
||||
"position": {
|
||||
"x": 2940,
|
||||
"y": 189.8
|
||||
}
|
||||
},
|
||||
"data": {
|
||||
"title": "End"
|
||||
}
|
||||
},
|
||||
{
|
||||
"id": "variable_zOb3P",
|
||||
"type": "variable",
|
||||
"meta": {
|
||||
"position": {
|
||||
"x": 2020,
|
||||
"y": 179.8
|
||||
}
|
||||
},
|
||||
"data": {
|
||||
"title": "Variable_1",
|
||||
"assign": [
|
||||
{
|
||||
"operator": "declare",
|
||||
"left": "jss",
|
||||
"right": {
|
||||
"type": "constant",
|
||||
"content": "${\"user_n\"}",
|
||||
"schema": {
|
||||
"type": "string"
|
||||
}
|
||||
}
|
||||
}
|
||||
],
|
||||
"outputs": {
|
||||
"type": "object",
|
||||
"required": [],
|
||||
"properties": {
|
||||
"jss": {
|
||||
"type": "string"
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
{
|
||||
"id": "code_y71Sd",
|
||||
"type": "javascript",
|
||||
"meta": {
|
||||
"position": {
|
||||
"x": 640,
|
||||
"y": 162.3
|
||||
}
|
||||
},
|
||||
"data": {
|
||||
"title": "JS",
|
||||
"inputsValues": {
|
||||
"input": {
|
||||
"type": "constant",
|
||||
"content": "",
|
||||
"schema": {
|
||||
"type": "string"
|
||||
},
|
||||
"extra": {
|
||||
"index": 0
|
||||
}
|
||||
}
|
||||
},
|
||||
"script": {
|
||||
"language": "javascript",
|
||||
"content": "ctx.vri=\"usertest\"; ctx[\"user_n\"]=\"user_nuser_n\" for (let i = 0; i < 5; i++) { //console.log(`第 ${i} 次循环`); ctx[i]=\"v\"+i; }"
|
||||
},
|
||||
"outputs": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"key0": {
|
||||
"type": "string"
|
||||
},
|
||||
"key1": {
|
||||
"type": "array",
|
||||
"items": {
|
||||
"type": "string"
|
||||
}
|
||||
},
|
||||
"key2": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"key21": {
|
||||
"type": "string"
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
"inputs": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"input": {
|
||||
"type": "string"
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
{
|
||||
"id": "variable_zy_Ae",
|
||||
"type": "variable",
|
||||
"meta": {
|
||||
"position": {
|
||||
"x": 2480,
|
||||
"y": 179.8
|
||||
}
|
||||
},
|
||||
"data": {
|
||||
"title": "http响应",
|
||||
"assign": [
|
||||
{
|
||||
"operator": "declare",
|
||||
"left": "http_resp",
|
||||
"right": {
|
||||
"type": "ref",
|
||||
"content": [
|
||||
"http_0IIt-",
|
||||
"headers"
|
||||
]
|
||||
}
|
||||
}
|
||||
],
|
||||
"outputs": {
|
||||
"type": "object",
|
||||
"required": [],
|
||||
"properties": {
|
||||
"http_resp": {
|
||||
"type": "object",
|
||||
"required": [],
|
||||
"properties": {}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
{
|
||||
"id": "code_iyMNK",
|
||||
"type": "script",
|
||||
"meta": {
|
||||
"position": {
|
||||
"x": 1100,
|
||||
"y": 162.3
|
||||
}
|
||||
},
|
||||
"data": {
|
||||
"title": "rhai",
|
||||
"inputsValues": {
|
||||
"input": {
|
||||
"type": "constant",
|
||||
"content": ""
|
||||
}
|
||||
},
|
||||
"script": {
|
||||
"language": "rhai",
|
||||
"content": "// 修改字段 ctx.addr_name = \"Alice\"; ctx.count = 10; // 添加新字段 ctx.city = \"Beijing\"; // 还可以用下标方式 ctx[\"extra\"] = 123;"
|
||||
},
|
||||
"outputs": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"key0": {
|
||||
"type": "string"
|
||||
},
|
||||
"key1": {
|
||||
"type": "array",
|
||||
"items": {
|
||||
"type": "string"
|
||||
}
|
||||
},
|
||||
"key2": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"key21": {
|
||||
"type": "string"
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
"required": []
|
||||
},
|
||||
"inputs": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"input": {
|
||||
"type": "string"
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
{
|
||||
"id": "group_o4dbz",
|
||||
"type": "group",
|
||||
"meta": {
|
||||
"position": {
|
||||
"x": 0,
|
||||
"y": 0
|
||||
}
|
||||
},
|
||||
"data": {
|
||||
"color": "Green",
|
||||
"title": "Group_2",
|
||||
"parentID": "root",
|
||||
"blockIDs": [
|
||||
"http_0IIt-",
|
||||
"db_LMgXg"
|
||||
]
|
||||
}
|
||||
},
|
||||
{
|
||||
"id": "http_0IIt-",
|
||||
"type": "http",
|
||||
"meta": {
|
||||
"position": {
|
||||
"x": 1598.880308880309,
|
||||
"y": -317.18146718146716
|
||||
}
|
||||
},
|
||||
"data": {
|
||||
"title": "HTTP_1",
|
||||
"api": {
|
||||
"method": "GET",
|
||||
"url": {
|
||||
"type": "template",
|
||||
"content": "https://account.aliyun.com"
|
||||
}
|
||||
},
|
||||
"body": {
|
||||
"bodyType": "JSON"
|
||||
},
|
||||
"outputs": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"body": {
|
||||
"type": "string"
|
||||
},
|
||||
"headers": {
|
||||
"type": "object"
|
||||
},
|
||||
"statusCode": {
|
||||
"type": "integer"
|
||||
}
|
||||
}
|
||||
},
|
||||
"timeout": {
|
||||
"timeout": 10000,
|
||||
"retryTimes": 1
|
||||
}
|
||||
}
|
||||
},
|
||||
{
|
||||
"id": "db_LMgXg",
|
||||
"type": "db",
|
||||
"meta": {
|
||||
"position": {
|
||||
"x": 1598.880308880309,
|
||||
"y": 179.8
|
||||
}
|
||||
},
|
||||
"data": {
|
||||
"title": "DB_1",
|
||||
"db": {
|
||||
"sql": {
|
||||
"type": "template",
|
||||
"content": "SELECT t.* FROM udmin.departments t"
|
||||
},
|
||||
"params": [],
|
||||
"outputKey": "db_response",
|
||||
"connection": {
|
||||
"driver": "mysql",
|
||||
"mode": "fields",
|
||||
"database": "udmin",
|
||||
"host": "127.0.0.1",
|
||||
"port": 3306,
|
||||
"username": "root",
|
||||
"password": "123456"
|
||||
},
|
||||
"output": {
|
||||
"mode": "rows"
|
||||
}
|
||||
},
|
||||
"outputs": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"db_response": {
|
||||
"type": "object"
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
],
|
||||
"edges": [
|
||||
{
|
||||
"sourceNodeID": "start_0",
|
||||
"targetNodeID": "code_y71Sd"
|
||||
},
|
||||
{
|
||||
"sourceNodeID": "variable_zy_Ae",
|
||||
"targetNodeID": "end_0"
|
||||
},
|
||||
{
|
||||
"sourceNodeID": "http_0IIt-",
|
||||
"targetNodeID": "variable_zOb3P"
|
||||
},
|
||||
{
|
||||
"sourceNodeID": "db_LMgXg",
|
||||
"targetNodeID": "variable_zOb3P"
|
||||
},
|
||||
{
|
||||
"sourceNodeID": "variable_zOb3P",
|
||||
"targetNodeID": "variable_zy_Ae"
|
||||
},
|
||||
{
|
||||
"sourceNodeID": "code_y71Sd",
|
||||
"targetNodeID": "code_iyMNK"
|
||||
},
|
||||
{
|
||||
"sourceNodeID": "code_iyMNK",
|
||||
"targetNodeID": "http_0IIt-"
|
||||
},
|
||||
{
|
||||
"sourceNodeID": "code_iyMNK",
|
||||
"targetNodeID": "db_LMgXg"
|
||||
}
|
||||
]
|
||||
}
|
||||
227
docs/flow_architecture.md
Normal file
227
docs/flow_architecture.md
Normal file
@ -0,0 +1,227 @@
|
||||
# Flow 架构与执行图(udmin)
|
||||
|
||||
## 模块架构
|
||||
```mermaid
|
||||
graph LR
|
||||
subgraph flow
|
||||
dsl[flow/dsl]
|
||||
domain[flow/domain]
|
||||
context[flow/context]
|
||||
engine[flow/engine]
|
||||
task[flow/task]
|
||||
executors[flow/executors/*]
|
||||
mappers[flow/mappers]
|
||||
log_handler[flow/log_handler]
|
||||
end
|
||||
|
||||
subgraph services
|
||||
svc_flow[services/flow_service]
|
||||
svc_logs[services/flow_run_log_service]
|
||||
end
|
||||
|
||||
subgraph routes
|
||||
r_flows[routes/flows]
|
||||
r_run_logs[routes/flow_run_logs]
|
||||
end
|
||||
|
||||
subgraph middlewares
|
||||
mw_sse[middlewares/sse]
|
||||
mw_ws[middlewares/ws]
|
||||
mw_jwt[middlewares/jwt]
|
||||
mw_http[middlewares/http_client]
|
||||
end
|
||||
|
||||
subgraph infra
|
||||
db[db]
|
||||
redis[redis]
|
||||
models_flow[models/flow]
|
||||
models_run[models/flow_run_log]
|
||||
end
|
||||
|
||||
r_flows --> svc_flow
|
||||
r_run_logs --> svc_logs
|
||||
svc_flow --> dsl
|
||||
svc_flow --> mappers
|
||||
dsl --> domain
|
||||
mappers --> context
|
||||
svc_flow --> engine
|
||||
engine --> task
|
||||
task --> executors
|
||||
executors --> mw_http
|
||||
engine --> log_handler
|
||||
log_handler --> svc_logs
|
||||
mw_sse -.events.-> r_flows
|
||||
mw_ws -.events.-> r_flows
|
||||
svc_flow --> models_flow
|
||||
svc_logs --> models_run
|
||||
models_flow --> db
|
||||
models_run --> db
|
||||
mw_jwt --> r_flows
|
||||
redis -.token check.-> mw_jwt
|
||||
```
|
||||
|
||||
引用:
|
||||
- DSL/Design 构建 `backend/src/flow/dsl.rs:60-93,138-170,172-203,246-303`
|
||||
- 领域模型 `backend/src/flow/domain.rs:20-28,31-36,39-47,60-68`
|
||||
- 上下文与事件 `backend/src/flow/context.rs:29-45`
|
||||
- 引擎驱动 `backend/src/flow/engine.rs:117-209,213-577`
|
||||
- 编排服务 `backend/src/services/flow_service.rs:285-305,342-349,351-365,366-399`
|
||||
- 路由入口 `backend/src/routes/flows.rs:26-35,101-133`
|
||||
- 运行日志服务 `backend/src/services/flow_run_log_service.rs:46-63,74-131`
|
||||
|
||||
## 请求/运行编排时序
|
||||
```mermaid
|
||||
sequenceDiagram
|
||||
participant C as Client
|
||||
participant R as routes.flows
|
||||
participant S as services.flow_service
|
||||
participant D as flow.dsl & mappers
|
||||
participant E as FlowEngine
|
||||
participant L as flow.log_handler
|
||||
participant M as models(flow/flow_run_log)
|
||||
participant SSE as middlewares.sse/ws
|
||||
|
||||
C->>R: POST /flows/{id}/run
|
||||
R->>S: run(id, input)
|
||||
S->>M: get flow doc
|
||||
alt design_json
|
||||
S->>D: chain_from_design_json(design)
|
||||
D-->>S: ChainDef
|
||||
S->>D: ctx_from_design_json(design)
|
||||
D-->>S: ctx supplement
|
||||
else YAML
|
||||
S->>D: parse FlowDSL (YAML)
|
||||
D-->>S: ChainDef
|
||||
end
|
||||
S->>E: drive(chain, ctx, opts)
|
||||
E-->>S: (ctx, logs)
|
||||
S->>L: log_success/log_error
|
||||
L->>M: insert flow_run_log
|
||||
E-->>SSE: emit node/done/error (stream)
|
||||
S-->>R: RunResult(ok, ctx, logs)
|
||||
R-->>C: ApiResponse
|
||||
```
|
||||
|
||||
关键实现:
|
||||
- 解析与补充 `backend/src/services/flow_service.rs:285-305`
|
||||
- 引擎驱动 `backend/src/services/flow_service.rs:342-349`
|
||||
- 日志入库与事件推送 `backend/src/flow/log_handler.rs`
|
||||
|
||||
## 引擎执行图(含执行模式)
|
||||
```mermaid
|
||||
flowchart TD
|
||||
A[选择起点: Start/入度=0/首节点] --> B{合流屏障}
|
||||
B -- 未达成/去重 --> Z[跳过当前节点]
|
||||
B -- 达成 --> C{任务?}
|
||||
C -- 否 --> D[选择后继]
|
||||
C -- 是 --> E{执行模式}
|
||||
E -- Sync --> ES[同步执行任务; 写回 ctx]
|
||||
E -- Async(Fire&Forget) --> EA[异步执行; 追踪句柄]
|
||||
E -- Queued(组队列) --> EQ[组信号量=1; 获取后执行]
|
||||
E -- Bounded(限并发) --> EB[组信号量=n; 获取后执行]
|
||||
ES --> D
|
||||
EA --> D
|
||||
EQ --> D
|
||||
EB --> D
|
||||
D -->|条件节点| F{条件评估 JSON/Rhai}
|
||||
F -- 通过 --> G[进入后继]
|
||||
F -- 不通过/无匹配 --> H[挑选无条件后继或停止]
|
||||
D -->|非条件| G
|
||||
G --> I{是否 End}
|
||||
I -- 是 --> J[FlowEnd 等待组策略; 推送事件; 结束]
|
||||
I -- 否 --> K{多分支?}
|
||||
K -- 单分支 --> L[计算组离开; BranchExit 等待]
|
||||
K -- 多分支 --> M[并行驱动其他分支]
|
||||
L --> A
|
||||
M --> L --> A
|
||||
```
|
||||
|
||||
参考:
|
||||
- 模式枚举与选项 `backend/src/flow/context.rs:9-15,29-45`
|
||||
- 合流与分支 `backend/src/flow/engine.rs:245-268,502-574`
|
||||
- 组等待策略 `backend/src/flow/engine.rs:62-115,559-573`
|
||||
|
||||
## DSL/Design 转换
|
||||
```mermaid
|
||||
flowchart LR
|
||||
X0[Design JSON 输入] --> X1[校验: 唯一ID/Start&End/合法边]
|
||||
X1 --> X2[兼容 sourcePortID → source_port_id]
|
||||
X2 --> X3[推断节点 kind/name/task]
|
||||
X3 --> X4[解析组: members/parentID/awaitPolicy]
|
||||
X4 --> X5[组装条件: AND组/端口匹配/启发式]
|
||||
X5 --> X6["生成 ChainDef: nodes links groups"]
|
||||
X0 --> Y1[mappers: 节点配置提取]
|
||||
Y1 --> Y2[ctx.nodes.<id>.<executor>]
|
||||
```
|
||||
|
||||
实现位置:`backend/src/flow/dsl.rs:138-170,172-203,246-303` 与 `backend/src/flow/mappers.rs:27-95`
|
||||
|
||||
## 数据模型(ER)
|
||||
```mermaid
|
||||
erDiagram
|
||||
FLOWS {
|
||||
bigint id PK
|
||||
varchar name
|
||||
text yaml
|
||||
text design_json
|
||||
varchar code
|
||||
varchar remark
|
||||
timestamp created_at
|
||||
timestamp updated_at
|
||||
}
|
||||
FLOW_RUN_LOGS {
|
||||
bigint id PK
|
||||
bigint flow_id FK
|
||||
varchar flow_code
|
||||
text input
|
||||
text output
|
||||
bool ok
|
||||
text logs
|
||||
bigint user_id
|
||||
varchar username
|
||||
timestamp started_at
|
||||
bigint duration_ms
|
||||
timestamp created_at
|
||||
}
|
||||
FLOWS ||--o{ FLOW_RUN_LOGS : has
|
||||
```
|
||||
|
||||
## 事件与日志通道
|
||||
```mermaid
|
||||
graph LR
|
||||
engine_push[engine.push_and_emit] --> sse_emit[middlewares.sse.emit_*]
|
||||
engine_push --> ws_emit[middlewares.ws.forward]
|
||||
sse_emit --> client[前端]
|
||||
ws_emit --> client
|
||||
engine_push --> log_handler_db[DatabaseLogHandler]
|
||||
engine_push --> log_handler_sse[SseLogHandler]
|
||||
log_handler_db --> run_log_service
|
||||
log_handler_sse --> run_log_service
|
||||
run_log_service --> db
|
||||
```
|
||||
|
||||
## 执行器生态
|
||||
```mermaid
|
||||
graph LR
|
||||
subgraph executors
|
||||
http
|
||||
db
|
||||
variable
|
||||
script_rhai
|
||||
script_js
|
||||
script_python
|
||||
condition
|
||||
end
|
||||
ctx_nodes[ctx.nodes.<id>.*] --> executors
|
||||
ctx_global[ctx.* 顶层] --> executors
|
||||
http --> http_out[(http_response)]
|
||||
db --> db_out[(db_response)]
|
||||
variable --> var_out[(ctx键值写回)]
|
||||
condition --> route[分支选择]
|
||||
```
|
||||
|
||||
注意点:写回策略与幂等,参考 `backend/src/flow/engine.rs:294-375`
|
||||
|
||||
---
|
||||
|
||||
以上图示与说明用于快速理解 Flow 的完整链路:从 DSL/Design 解析到引擎驱动与事件/日志通道,再到数据持久化与执行器生态。结合上文的代码引用,可在 IDE 中跳转到具体实现进行深度阅读。
|
||||
@ -7,7 +7,7 @@ import { FC, useContext, useEffect, useRef, useState } from 'react';
|
||||
|
||||
import classnames from 'classnames';
|
||||
import { useService, I18n } from '@flowgram.ai/free-layout-editor';
|
||||
import { Button, SideSheet, Switch, Tag } from '@douyinfe/semi-ui';
|
||||
import { Button, SideSheet, Switch, Tag, Select, InputNumber } from '@douyinfe/semi-ui';
|
||||
import { IconClose, IconPlay, IconSpin } from '@douyinfe/semi-icons';
|
||||
|
||||
import { TestRunJsonInput } from '../testrun-json-input';
|
||||
@ -15,6 +15,7 @@ import { TestRunForm } from '../testrun-form';
|
||||
import { NodeStatusGroup } from '../node-status-bar/group';
|
||||
// 改为使用后端运行服务
|
||||
import { CustomService } from '../../../services';
|
||||
import { tr } from '../../../../utils/i18n';
|
||||
import { SidebarContext } from '../../../context';
|
||||
import { IconCancel } from '../../../assets/icon-cancel';
|
||||
|
||||
@ -76,6 +77,17 @@ export const TestRunSidePanel: FC<TestRunSidePanelProps> = ({ visible, onCancel
|
||||
localStorage.setItem('testrun-input-json-mode', JSON.stringify(checked));
|
||||
};
|
||||
|
||||
// 执行模式与并发上限(仅保存到设计 JSON,不直接随运行请求发送)
|
||||
const [executionMode, setExecutionMode] = useState<'sync'|'async'|'queued'|'bounded'>(() => {
|
||||
const saved = localStorage.getItem('testrun-execution-mode');
|
||||
return (saved === 'async' || saved === 'queued' || saved === 'bounded') ? (saved as any) : 'sync';
|
||||
});
|
||||
const [concurrencyLimit, setConcurrencyLimit] = useState<number | undefined>(() => {
|
||||
const saved = localStorage.getItem('testrun-concurrency-limit');
|
||||
const num = saved ? Number(saved) : NaN;
|
||||
return Number.isFinite(num) && num > 0 ? num : undefined;
|
||||
});
|
||||
|
||||
const extractErrorMsg = (logs: string[] | undefined): string | undefined => {
|
||||
if (!logs || logs.length === 0) return undefined;
|
||||
const patterns = [/failed/i, /error/i, /panic/i];
|
||||
@ -97,9 +109,9 @@ export const TestRunSidePanel: FC<TestRunSidePanelProps> = ({ visible, onCancel
|
||||
setRunning(true);
|
||||
try {
|
||||
// 运行前保存(静默),确保后端 YAML 与编辑器一致;若保存失败则不继续运行
|
||||
const saved = await customService.save({ silent: true });
|
||||
const saved = await customService.save({ silent: true, executionMode, concurrencyLimit });
|
||||
if (!saved) {
|
||||
setErrors([I18n.t('Save failed, cannot run')]);
|
||||
setErrors([tr('Save failed, cannot run')]);
|
||||
return;
|
||||
}
|
||||
|
||||
@ -127,7 +139,7 @@ export const TestRunSidePanel: FC<TestRunSidePanelProps> = ({ visible, onCancel
|
||||
}
|
||||
},
|
||||
onError: (evt) => {
|
||||
const msg = evt.message || I18n.t('Run failed');
|
||||
const msg = evt.message || tr('Run failed');
|
||||
setErrors((prev) => [...(prev || []), msg]);
|
||||
},
|
||||
onDone: (evt) => {
|
||||
@ -144,7 +156,7 @@ export const TestRunSidePanel: FC<TestRunSidePanelProps> = ({ visible, onCancel
|
||||
if (evt.logs && evt.logs.length) setStreamLogs((prev: string[]) => [...prev, ...evt.logs!]);
|
||||
},
|
||||
onError: (evt) => {
|
||||
const msg = evt.message || I18n.t('Run failed');
|
||||
const msg = evt.message || tr('Run failed');
|
||||
setErrors((prev) => [...(prev || []), msg]);
|
||||
},
|
||||
onDone: (evt) => {
|
||||
@ -165,7 +177,7 @@ export const TestRunSidePanel: FC<TestRunSidePanelProps> = ({ visible, onCancel
|
||||
setResult(finished as any);
|
||||
} else {
|
||||
// 流结束但未收到 done 事件,给出提示
|
||||
setErrors((prev) => [...(prev || []), I18n.t('Stream terminated without completion')]);
|
||||
setErrors((prev) => [...(prev || []), tr('Stream terminated without completion')]);
|
||||
}
|
||||
} else {
|
||||
// 普通 HTTP 一次性运行
|
||||
@ -174,20 +186,20 @@ export const TestRunSidePanel: FC<TestRunSidePanelProps> = ({ visible, onCancel
|
||||
if (runRes) {
|
||||
if ((runRes as any).ok === false) {
|
||||
setResult(runRes as any);
|
||||
const err = extractErrorMsg((runRes as any).logs) || I18n.t('Run failed');
|
||||
const err = extractErrorMsg((runRes as any).logs) || tr('Run failed');
|
||||
setErrors([err]);
|
||||
} else {
|
||||
setResult(runRes as any);
|
||||
}
|
||||
} else {
|
||||
setErrors([I18n.t('Run failed')]);
|
||||
setErrors([tr('Run failed')]);
|
||||
}
|
||||
} catch (e: any) {
|
||||
setErrors([e?.message || I18n.t('Run failed')]);
|
||||
setErrors([e?.message || tr('Run failed')]);
|
||||
}
|
||||
}
|
||||
} catch (e: any) {
|
||||
setErrors([e?.message || I18n.t('Run failed')]);
|
||||
setErrors([e?.message || tr('Run failed')]);
|
||||
} finally {
|
||||
setRunning(false);
|
||||
cancelRef.current = null;
|
||||
@ -226,7 +238,7 @@ export const TestRunSidePanel: FC<TestRunSidePanelProps> = ({ visible, onCancel
|
||||
const renderRunning = (
|
||||
<div className={styles['testrun-panel-running']}>
|
||||
<IconSpin spin size="large" />
|
||||
<div className={styles.text}>{I18n.t('Running...')}</div>
|
||||
<div className={styles.text}>{tr('Running...')}</div>
|
||||
{/* 实时输出(仅流式模式显示) */}
|
||||
{streamMode && (
|
||||
<>
|
||||
@ -246,9 +258,9 @@ export const TestRunSidePanel: FC<TestRunSidePanelProps> = ({ visible, onCancel
|
||||
|
||||
const renderStatus = (
|
||||
<div style={{ marginBottom: 8 }}>
|
||||
{result?.ok === true && <Tag color="green">{I18n.t('Success')}</Tag>}
|
||||
{result?.ok === true && <Tag color="green">{tr('Success')}</Tag>}
|
||||
{(errors?.length || result?.ok === false) && (
|
||||
<Tag color="red">{I18n.t('Failed')}</Tag>
|
||||
<Tag color="red">{tr('Failed')}</Tag>
|
||||
)}
|
||||
</div>
|
||||
);
|
||||
@ -256,9 +268,9 @@ export const TestRunSidePanel: FC<TestRunSidePanelProps> = ({ visible, onCancel
|
||||
const renderForm = (
|
||||
<div className={styles['testrun-panel-form']}>
|
||||
<div className={styles['testrun-panel-input']}>
|
||||
<div className={styles.title}>{I18n.t('Input Form')}</div>
|
||||
<div className={styles.title}>{tr('Input Form')}</div>
|
||||
<div className={styles.toggle}>
|
||||
<div>{I18n.t('JSON Mode')}</div>
|
||||
<div>{tr('JSON Mode')}</div>
|
||||
<Switch
|
||||
checked={inputJSONMode}
|
||||
onChange={(checked: boolean) => setInputJSONMode(checked)}
|
||||
@ -266,16 +278,44 @@ export const TestRunSidePanel: FC<TestRunSidePanelProps> = ({ visible, onCancel
|
||||
/>
|
||||
</div>
|
||||
<div className={styles.toggle}>
|
||||
<div>{I18n.t('Streaming Mode')}</div>
|
||||
<div>{tr('Streaming Mode')}</div>
|
||||
<Switch
|
||||
checked={streamMode}
|
||||
onChange={(checked: boolean) => setStreamMode(checked)}
|
||||
size="small"
|
||||
/>
|
||||
</div>
|
||||
<div className={styles.toggle}>
|
||||
<div>{tr('Execution Mode')}</div>
|
||||
<Select
|
||||
value={executionMode}
|
||||
onChange={(v) => { const val = String(v) as any; setExecutionMode(val); localStorage.setItem('testrun-execution-mode', val); }}
|
||||
size="small"
|
||||
style={{ width: 160 }}
|
||||
renderSelectedItem={(option: any) => option?.label ?? tr(String(option?.value || ''))}
|
||||
>
|
||||
<Select.Option value="sync">{tr('sync')}</Select.Option>
|
||||
<Select.Option value="async">{tr('async')}</Select.Option>
|
||||
<Select.Option value="queued">{tr('queued')}</Select.Option>
|
||||
<Select.Option value="bounded">{tr('bounded')}</Select.Option>
|
||||
</Select>
|
||||
</div>
|
||||
{executionMode === 'bounded' && (
|
||||
<div className={styles.toggle}>
|
||||
<div>{tr('Concurrency Limit')}</div>
|
||||
<InputNumber
|
||||
value={typeof concurrencyLimit === 'number' ? concurrencyLimit : undefined}
|
||||
onChange={(v) => { const num = Number(v); const next = Number.isFinite(num) && num > 0 ? num : undefined; setConcurrencyLimit(next); localStorage.setItem('testrun-concurrency-limit', String(next ?? '')); }}
|
||||
size="small"
|
||||
min={1}
|
||||
max={128}
|
||||
style={{ width: 120 }}
|
||||
/>
|
||||
</div>
|
||||
)}
|
||||
{streamMode && (
|
||||
<div className={styles.toggle}>
|
||||
<div>WS</div>
|
||||
<div>{tr('WS')}</div>
|
||||
<Switch
|
||||
checked={useWS}
|
||||
onChange={(checked: boolean) => setUseWS(checked)}
|
||||
@ -298,16 +338,16 @@ export const TestRunSidePanel: FC<TestRunSidePanelProps> = ({ visible, onCancel
|
||||
{/* 运行中(流式)时,直接在表单区域下方展示实时输出,而不是覆盖整块内容 */}
|
||||
{streamMode && isRunning && (
|
||||
<>
|
||||
<NodeStatusGroup title={I18n.t('Logs') + ' (Live)'} data={streamLogs} optional disableCollapse />
|
||||
<NodeStatusGroup title={I18n.t('Context') + ' (Live)'} data={streamCtx} optional disableCollapse />
|
||||
<NodeStatusGroup title={tr('Logs') + ' (Live)'} data={streamLogs} optional disableCollapse />
|
||||
<NodeStatusGroup title={tr('Context') + ' (Live)'} data={streamCtx} optional disableCollapse />
|
||||
</>
|
||||
)}
|
||||
|
||||
{/* 展示后端返回的执行信息:仅在非流式或流式已结束时显示,避免与实时输出重复 */}
|
||||
{(!streamMode || !isRunning) && (
|
||||
<>
|
||||
<NodeStatusGroup title={I18n.t('Logs')} data={result?.logs} optional disableCollapse />
|
||||
<NodeStatusGroup title={I18n.t('Context')} data={result?.ctx} optional disableCollapse />
|
||||
<NodeStatusGroup title={tr('Logs')} data={result?.logs} optional disableCollapse />
|
||||
<NodeStatusGroup title={tr('Context')} data={result?.ctx} optional disableCollapse />
|
||||
</>
|
||||
)}
|
||||
</div>
|
||||
@ -326,13 +366,13 @@ export const TestRunSidePanel: FC<TestRunSidePanelProps> = ({ visible, onCancel
|
||||
[styles.default]: !isRunning,
|
||||
})}
|
||||
>
|
||||
{isRunning ? I18n.t('Running...') : I18n.t('Test Run')}
|
||||
{isRunning ? tr('Running...') : tr('Test Run')}
|
||||
</Button>
|
||||
);
|
||||
|
||||
return (
|
||||
<SideSheet
|
||||
title={I18n.t('Test Run')}
|
||||
title={tr('Test Run')}
|
||||
visible={visible}
|
||||
mask={false}
|
||||
motion={false}
|
||||
@ -351,7 +391,7 @@ export const TestRunSidePanel: FC<TestRunSidePanelProps> = ({ visible, onCancel
|
||||
>
|
||||
<div className={styles['testrun-panel-container']}>
|
||||
<div className={styles['testrun-panel-header']}>
|
||||
<div className={styles['testrun-panel-title']}>{I18n.t('Test Run')}</div>
|
||||
<div className={styles['testrun-panel-title']}>{tr('Test Run')}</div>
|
||||
<Button
|
||||
className={styles['testrun-panel-title']}
|
||||
type="tertiary"
|
||||
|
||||
@ -8,6 +8,7 @@ import { useState, useEffect } from 'react';
|
||||
import { useRefresh } from '@flowgram.ai/free-layout-editor';
|
||||
import { useClientContext } from '@flowgram.ai/free-layout-editor';
|
||||
import { Tooltip, IconButton, Divider } from '@douyinfe/semi-ui';
|
||||
import { tr } from '../../../utils/i18n';
|
||||
import { IconUndo, IconRedo, IconChevronLeft, IconEdit } from '@douyinfe/semi-icons';
|
||||
|
||||
import { TestRunButton } from '../testrun/testrun-button';
|
||||
@ -172,7 +173,7 @@ export const FlowTools = () => {
|
||||
{flowCode ? <span className="code">{flowCode}</span> : null}
|
||||
{flowName ? <span className="name" title={flowName}>{flowName}</span> : null}
|
||||
<span className="actions">
|
||||
<Tooltip content={I18n.t('Edit Base Info')}>
|
||||
<Tooltip content={tr('Edit Base Info')}>
|
||||
<IconButton type="tertiary" theme="borderless" icon={<IconEdit />} onClick={openBaseInfo} />
|
||||
</Tooltip>
|
||||
</span>
|
||||
@ -182,7 +183,7 @@ export const FlowTools = () => {
|
||||
<ToolContainer className="flow-tools">
|
||||
<ToolSection>
|
||||
{/* 返回列表 */}
|
||||
<Tooltip content={I18n.t('Back to List')}>
|
||||
<Tooltip content={tr('Back to List')}>
|
||||
<IconButton
|
||||
type="tertiary"
|
||||
theme="borderless"
|
||||
@ -200,7 +201,7 @@ export const FlowTools = () => {
|
||||
<Minimap visible={minimapVisible} />
|
||||
<Readonly />
|
||||
<Comment />
|
||||
<Tooltip content={I18n.t('Undo')}>
|
||||
<Tooltip content={tr('Undo')}>
|
||||
<IconButton
|
||||
type="tertiary"
|
||||
theme="borderless"
|
||||
@ -209,7 +210,7 @@ export const FlowTools = () => {
|
||||
onClick={() => history.undo()}
|
||||
/>
|
||||
</Tooltip>
|
||||
<Tooltip content={I18n.t('Redo')}>
|
||||
<Tooltip content={tr('Redo')}>
|
||||
<IconButton
|
||||
type="tertiary"
|
||||
theme="borderless"
|
||||
@ -230,7 +231,7 @@ export const FlowTools = () => {
|
||||
|
||||
{/* 基础信息弹窗(对齐新建流程表单) */}
|
||||
<AModal
|
||||
title={`${I18n.t('Edit Base Info')}${baseForm.getFieldValue('name') ? ' - ' + baseForm.getFieldValue('name') : ''}`}
|
||||
title={`${tr('Edit Base Info')}${baseForm.getFieldValue('name') ? ' - ' + baseForm.getFieldValue('name') : ''}`}
|
||||
open={baseOpen}
|
||||
onOk={handleBaseOk}
|
||||
confirmLoading={baseLoading}
|
||||
@ -240,14 +241,14 @@ export const FlowTools = () => {
|
||||
destroyOnHidden
|
||||
>
|
||||
<AForm form={baseForm} layout="vertical" preserve={false}>
|
||||
<AForm.Item name="name" label={I18n.t('Flow Name')} rules={[{ required: true, message: I18n.t('Please input flow name') }, { max: 50, message: I18n.t('Max 50 characters') }]}>
|
||||
<AInput placeholder={I18n.t('Please input flow name')} allowClear />
|
||||
<AForm.Item name="name" label={tr('Flow Name')} rules={[{ required: true, message: tr('Please input flow name') }, { max: 50, message: tr('Max 50 characters') }]}>
|
||||
<AInput placeholder={tr('Please input flow name')} allowClear />
|
||||
</AForm.Item>
|
||||
<AForm.Item name="code" label={I18n.t('Flow Code')} rules={[{ required: true, message: I18n.t('Please input flow code') }, { max: 50, message: I18n.t('Max 50 characters') }]}>
|
||||
<AInput placeholder={I18n.t('Required, recommend letters/numbers/-/_')} allowClear />
|
||||
<AForm.Item name="code" label={tr('Flow Code')} rules={[{ required: true, message: tr('Please input flow code') }, { max: 50, message: tr('Max 50 characters') }]}>
|
||||
<AInput placeholder={tr('Required, recommend letters/numbers/-/_')} allowClear />
|
||||
</AForm.Item>
|
||||
<AForm.Item name="remark" label={I18n.t('Remark')} rules={[{ max: 255, message: I18n.t('Max 255 characters') }]}>
|
||||
<AInput.TextArea rows={3} placeholder={I18n.t('Optional, remark info')} allowClear />
|
||||
<AForm.Item name="remark" label={tr('Remark')} rules={[{ max: 255, message: tr('Max 255 characters') }]}>
|
||||
<AInput.TextArea rows={3} placeholder={tr('Optional, remark info')} allowClear />
|
||||
</AForm.Item>
|
||||
</AForm>
|
||||
</AModal>
|
||||
|
||||
@ -11,6 +11,7 @@ import {
|
||||
WorkflowDocument,
|
||||
} from '@flowgram.ai/free-layout-editor';
|
||||
import { Toast } from '@douyinfe/semi-ui';
|
||||
import { tr } from '../../utils/i18n';
|
||||
import { I18n } from '@flowgram.ai/free-layout-editor';
|
||||
import api, { type ApiResp } from '../../utils/axios';
|
||||
import { stringifyFlowDoc } from '../utils/yaml';
|
||||
@ -73,22 +74,29 @@ export class CustomService {
|
||||
@inject(WorkflowDocument) document!: WorkflowDocument;
|
||||
|
||||
// 新增可选参数,用于静默保存时不弹出 Toast,并返回是否保存成功
|
||||
async save(opts?: { silent?: boolean }): Promise<boolean> {
|
||||
async save(opts?: { silent?: boolean; executionMode?: 'sync'|'async'|'queued'|'bounded'; concurrencyLimit?: number }): Promise<boolean> {
|
||||
const silent = !!opts?.silent;
|
||||
try {
|
||||
const id = getFlowIdFromUrl();
|
||||
if (!id) {
|
||||
if (!silent) Toast.error(I18n.t('Flow ID is missing, cannot save'));
|
||||
if (!silent) Toast.error(tr('Flow ID is missing, cannot save'));
|
||||
return false;
|
||||
}
|
||||
const json = this.document.toJSON() as any;
|
||||
// 在根级写入执行配置(与后端契约保持一致:executionMode/concurrencyLimit)
|
||||
if (opts?.executionMode) {
|
||||
try { json.executionMode = opts.executionMode; } catch {}
|
||||
}
|
||||
if (typeof opts?.concurrencyLimit === 'number') {
|
||||
try { json.concurrencyLimit = opts.concurrencyLimit; } catch {}
|
||||
}
|
||||
const yaml = stringifyFlowDoc(json);
|
||||
// 使用转换后的 design_json,以便后端根据语言选择正确的执行器
|
||||
const designForBackend = transformDesignJsonForBackend(json);
|
||||
const design_json = JSON.stringify(designForBackend);
|
||||
const { data } = await api.put<ApiResp<{ saved: boolean }>>(`/flows/${id}`, { yaml, design_json });
|
||||
if (data?.code === 0) {
|
||||
if (!silent) Toast.success(I18n.t('Saved'));
|
||||
if (!silent) Toast.success(tr('Saved'));
|
||||
try {
|
||||
const key = (() => {
|
||||
const hash = window.location.hash || '';
|
||||
@ -101,12 +109,12 @@ export class CustomService {
|
||||
} catch {}
|
||||
return true;
|
||||
} else {
|
||||
const msg = data?.message || I18n.t('Save failed');
|
||||
const msg = data?.message || tr('Save failed');
|
||||
if (!silent) Toast.error(msg);
|
||||
return false;
|
||||
}
|
||||
} catch (e: any) {
|
||||
const msg = e?.message || I18n.t('Save failed');
|
||||
const msg = e?.message || tr('Save failed');
|
||||
if (!silent) Toast.error(msg);
|
||||
return false;
|
||||
}
|
||||
@ -116,16 +124,16 @@ export class CustomService {
|
||||
try {
|
||||
const id = getFlowIdFromUrl();
|
||||
if (!id) {
|
||||
Toast.error(I18n.t('Flow ID is missing, cannot run'));
|
||||
Toast.error(tr('Flow ID is missing, cannot run'));
|
||||
return null;
|
||||
}
|
||||
const { data } = await api.post<ApiResp<RunResult>>(`/flows/${id}/run`, { input });
|
||||
if (data?.code === 0) {
|
||||
return data.data;
|
||||
}
|
||||
throw new Error(data?.message || I18n.t('Run failed'));
|
||||
throw new Error(data?.message || tr('Run failed'));
|
||||
} catch (e: any) {
|
||||
Toast.error(e?.message || I18n.t('Run failed'));
|
||||
Toast.error(e?.message || tr('Run failed'));
|
||||
return null;
|
||||
}
|
||||
}
|
||||
@ -142,7 +150,7 @@ export class CustomService {
|
||||
) {
|
||||
const id = getFlowIdFromUrl();
|
||||
if (!id) {
|
||||
const err = new Error(I18n.t('Flow ID is missing, cannot run'));
|
||||
const err = new Error(tr('Flow ID is missing, cannot run'));
|
||||
handlers?.onFatal?.(err);
|
||||
return { cancel: () => {}, done: Promise.resolve<RunResult | null>(null) } as const;
|
||||
}
|
||||
|
||||
@ -14,6 +14,65 @@ const api: AxiosInstance = axios.create({ baseURL: configuredBase ? `${configure
|
||||
let isRefreshing = false
|
||||
let pendingQueue: { resolve: () => void; reject: (e: unknown) => void; config: RetryConfig }[] = []
|
||||
|
||||
function redirectToLogin(msg?: string): Promise<never> {
|
||||
clearToken()
|
||||
if (typeof window !== 'undefined') {
|
||||
const isHashRouter = (window.location.hash || '').startsWith('#/')
|
||||
if (isHashRouter) {
|
||||
if (window.location.hash !== '#/login') {
|
||||
window.location.hash = '#/login'
|
||||
}
|
||||
} else {
|
||||
if (window.location.pathname !== '/login') {
|
||||
// 兼容浏览器路由与生产静态部署:优先使用 hash 入口,无法识别时再回退 pathname
|
||||
try { window.location.hash = '#/login' } catch {}
|
||||
try { window.location.href = '/login' } catch {}
|
||||
}
|
||||
}
|
||||
}
|
||||
return Promise.reject(new Error(msg || '未登录或登录已过期'))
|
||||
}
|
||||
|
||||
async function tryRefreshAndRetry(original: RetryConfig, message?: string): Promise<AxiosResponse<any>> {
|
||||
// 已重试过,直接拒绝
|
||||
if (original._retry) {
|
||||
return Promise.reject(new Error(message || '未授权'))
|
||||
}
|
||||
original._retry = true
|
||||
|
||||
const hasToken = !!getToken()
|
||||
if (!hasToken) {
|
||||
return redirectToLogin(message)
|
||||
}
|
||||
|
||||
if (!isRefreshing) {
|
||||
isRefreshing = true
|
||||
try {
|
||||
const { data } = await api.get<ApiResp<{ access_token: string }>>('/auth/refresh')
|
||||
if (data?.code === 0) {
|
||||
const access = data.data?.access_token
|
||||
if (access) setToken(access)
|
||||
pendingQueue.forEach(p => p.resolve())
|
||||
pendingQueue = []
|
||||
return api(original)
|
||||
}
|
||||
// 刷新失败,走登录
|
||||
pendingQueue.forEach(p => p.reject(new Error(data?.message || 'refresh failed')))
|
||||
pendingQueue = []
|
||||
return redirectToLogin(data?.message)
|
||||
} catch (e: any) {
|
||||
pendingQueue.forEach(p => p.reject(e))
|
||||
pendingQueue = []
|
||||
return redirectToLogin((e && e.message) || message)
|
||||
} finally {
|
||||
isRefreshing = false
|
||||
}
|
||||
}
|
||||
return new Promise<void>((resolve, reject) => {
|
||||
pendingQueue.push({ resolve: () => resolve(), reject: (e: unknown) => reject(e as unknown), config: original })
|
||||
}).then(() => api(original))
|
||||
}
|
||||
|
||||
api.interceptors.request.use((config: RetryConfig) => {
|
||||
const token = getToken()
|
||||
if (token) {
|
||||
@ -30,7 +89,25 @@ api.interceptors.request.use((config: RetryConfig) => {
|
||||
})
|
||||
|
||||
api.interceptors.response.use(
|
||||
(r: AxiosResponse) => r,
|
||||
async (r: AxiosResponse<ApiResp<unknown>>) => {
|
||||
// 业务层 401(HTTP 200, body.code=401)统一处理为未授权
|
||||
const body = r?.data as ApiResp<unknown>
|
||||
if (body && typeof body.code === 'number' && body.code === 401) {
|
||||
const original = (r.config || {}) as RetryConfig
|
||||
const reqUrl = (original?.url || '').toString()
|
||||
// 登录接口返回 401:不做 refresh,不跳转
|
||||
if (reqUrl.includes('/auth/login')) {
|
||||
const msg = body?.message || '未登录或登录已过期'
|
||||
return Promise.reject(new Error(msg))
|
||||
}
|
||||
// 刷新接口返回业务 401:直接跳转登录,避免循环重试
|
||||
if (reqUrl.includes('/auth/refresh')) {
|
||||
return redirectToLogin(body?.message)
|
||||
}
|
||||
return tryRefreshAndRetry(original, body?.message)
|
||||
}
|
||||
return r
|
||||
},
|
||||
async (error: AxiosError<ApiResp<unknown>>) => {
|
||||
const original = (error.config || {}) as RetryConfig
|
||||
const status = error.response?.status
|
||||
@ -43,6 +120,10 @@ api.interceptors.response.use(
|
||||
const msg = resp?.message || '用户名或密码错误'
|
||||
return Promise.reject(new Error(msg))
|
||||
}
|
||||
// 刷新接口返回 401:直接跳转登录,避免循环重试
|
||||
if (reqUrl.includes('/auth/refresh')) {
|
||||
return redirectToLogin(resp?.message)
|
||||
}
|
||||
|
||||
// 已经重试过了,直接拒绝
|
||||
if (original._retry) {
|
||||
@ -52,12 +133,7 @@ api.interceptors.response.use(
|
||||
|
||||
const hasToken = !!getToken()
|
||||
if (!hasToken) {
|
||||
// 没有 token 的 401:如果不在登录页,则跳转到登录页;否则仅抛错以便界面提示
|
||||
if (typeof window !== 'undefined' && window.location.pathname !== '/login') {
|
||||
window.location.href = '/login'
|
||||
}
|
||||
const msg = resp?.message || '未登录或登录已过期'
|
||||
return Promise.reject(new Error(msg))
|
||||
return redirectToLogin(resp?.message)
|
||||
}
|
||||
|
||||
// 有 token 的 401:尝试刷新
|
||||
@ -72,14 +148,10 @@ api.interceptors.response.use(
|
||||
pendingQueue = []
|
||||
return api(original)
|
||||
}
|
||||
} catch (e) {
|
||||
} catch (e: any) {
|
||||
pendingQueue.forEach(p => p.reject(e))
|
||||
pendingQueue = []
|
||||
clearToken()
|
||||
if (typeof window !== 'undefined' && window.location.pathname !== '/login') {
|
||||
window.location.href = '/login'
|
||||
}
|
||||
return Promise.reject(e)
|
||||
return redirectToLogin((e && e.message) || resp?.message)
|
||||
} finally {
|
||||
isRefreshing = false
|
||||
}
|
||||
|
||||
44
frontend/src/utils/i18n.ts
Normal file
44
frontend/src/utils/i18n.ts
Normal file
@ -0,0 +1,44 @@
|
||||
export function tr(s: string): string {
|
||||
const lang = (localStorage.getItem('lang') === 'en') ? 'en' : 'zh';
|
||||
if (lang === 'en') return s;
|
||||
const map: Record<string, string> = {
|
||||
'Test Run': '测试运行',
|
||||
'Running...': '运行中...',
|
||||
'JSON Mode': 'JSON 模式',
|
||||
'Streaming Mode': '流式模式',
|
||||
'WS': 'WS',
|
||||
'Execution Mode': '执行模式',
|
||||
'Concurrency Limit': '并发上限',
|
||||
'Success': '成功',
|
||||
'Failed': '失败',
|
||||
'Context': '上下文',
|
||||
'Logs': '日志',
|
||||
'Input Form': '输入表单',
|
||||
'Back to List': '返回列表',
|
||||
'Edit Base Info': '编辑基础信息',
|
||||
'Please input flow name': '请输入流程名称',
|
||||
'Flow Name': '流程名称',
|
||||
'Flow Code': '流程编号',
|
||||
'Remark': '备注',
|
||||
'Max 50 characters': '最多50个字符',
|
||||
'Max 255 characters': '最多255个字符',
|
||||
'Required, recommend letters/numbers/-/_': '必填,建议使用字母/数字/-/_',
|
||||
'Optional, remark info': '可选,备注信息',
|
||||
'Save': '保存',
|
||||
'Undo': '撤销',
|
||||
'Redo': '重做',
|
||||
'Saved': '已保存',
|
||||
'Save failed': '保存失败',
|
||||
'Save failed, cannot run': '保存失败,无法运行',
|
||||
'Run failed': '运行失败',
|
||||
'Flow ID is missing, cannot save': '缺少流程 ID,无法保存',
|
||||
'Flow ID is missing, cannot run': '缺少流程 ID,无法运行',
|
||||
'Stream terminated without completion': '流结束但未完成',
|
||||
// Execution modes
|
||||
'sync': '同步',
|
||||
'async': '异步',
|
||||
'queued': '队列',
|
||||
'bounded': '限并发',
|
||||
};
|
||||
return map[s] ?? s;
|
||||
}
|
||||
File diff suppressed because one or more lines are too long
29
scripts/results/code_js1_batch_20251010_235010.json
Normal file
29
scripts/results/code_js1_batch_20251010_235010.json
Normal file
@ -0,0 +1,29 @@
|
||||
{
|
||||
"url": "http://127.0.0.1:9898/api/dynamic/code_js1",
|
||||
"method": "POST",
|
||||
"total": 5000,
|
||||
"concurrency": 300,
|
||||
"timeout_sec": 8.0,
|
||||
"ok": 5000,
|
||||
"errors": 0,
|
||||
"has_data": 5000,
|
||||
"duration_ms": 13893.509542103857,
|
||||
"throughput_rps": 359.88027250045445,
|
||||
"status_counts": {
|
||||
"200": 5000
|
||||
},
|
||||
"latency_ms": {
|
||||
"min": 446.18216599337757,
|
||||
"avg": 820.1658831051551,
|
||||
"max": 1031.878500012681,
|
||||
"p50": 853.5854578949511,
|
||||
"p90": 910.5232087895274,
|
||||
"p95": 950.7714579813182,
|
||||
"p99": 1012.8602078184485
|
||||
},
|
||||
"resp_size_bytes": {
|
||||
"avg": 42,
|
||||
"min": 42,
|
||||
"max": 42
|
||||
}
|
||||
}
|
||||
29
scripts/results/code_js1_batch_20251011_000537.json
Normal file
29
scripts/results/code_js1_batch_20251011_000537.json
Normal file
@ -0,0 +1,29 @@
|
||||
{
|
||||
"url": "http://127.0.0.1:9898/api/dynamic/code_js1",
|
||||
"method": "POST",
|
||||
"total": 5000,
|
||||
"concurrency": 300,
|
||||
"timeout_sec": 8.0,
|
||||
"ok": 5000,
|
||||
"errors": 0,
|
||||
"has_data": 5000,
|
||||
"duration_ms": 20240.968541940674,
|
||||
"throughput_rps": 247.02375232883037,
|
||||
"status_counts": {
|
||||
"200": 5000
|
||||
},
|
||||
"latency_ms": {
|
||||
"min": 384.1903330758214,
|
||||
"avg": 1199.4089808925055,
|
||||
"max": 1533.5173748899251,
|
||||
"p50": 1244.2287909798324,
|
||||
"p90": 1380.3387091029435,
|
||||
"p95": 1421.7030832078308,
|
||||
"p99": 1466.3815409876406
|
||||
},
|
||||
"resp_size_bytes": {
|
||||
"avg": 42,
|
||||
"min": 42,
|
||||
"max": 42
|
||||
}
|
||||
}
|
||||
29
scripts/results/code_js1_batch_20251011_001656.json
Normal file
29
scripts/results/code_js1_batch_20251011_001656.json
Normal file
@ -0,0 +1,29 @@
|
||||
{
|
||||
"url": "http://127.0.0.1:9898/api/dynamic/code_js1",
|
||||
"method": "POST",
|
||||
"total": 5000,
|
||||
"concurrency": 300,
|
||||
"timeout_sec": 8.0,
|
||||
"ok": 5000,
|
||||
"errors": 0,
|
||||
"has_data": 4257,
|
||||
"duration_ms": 7239.655584096909,
|
||||
"throughput_rps": 690.6405894478352,
|
||||
"status_counts": {
|
||||
"200": 5000
|
||||
},
|
||||
"latency_ms": {
|
||||
"min": 243.2825828436762,
|
||||
"avg": 425.6030804462731,
|
||||
"max": 587.7386662177742,
|
||||
"p50": 414.4594999961555,
|
||||
"p90": 484.4337080139667,
|
||||
"p95": 531.1474581249058,
|
||||
"p99": 569.8145001661032
|
||||
},
|
||||
"resp_size_bytes": {
|
||||
"avg": 40.9598,
|
||||
"min": 35,
|
||||
"max": 42
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user