新增以下文档文件: - PROJECT_OVERVIEW.md 项目总览文档 - BACKEND_ARCHITECTURE.md 后端架构文档 - FRONTEND_ARCHITECTURE.md 前端架构文档 - FLOW_ENGINE.md 流程引擎文档 - SERVICES.md 服务层文档 - ERROR_HANDLING.md 错误处理模块文档 文档内容涵盖项目整体介绍、技术架构、核心模块设计和实现细节
11 KiB
11 KiB
流程引擎文档
概述
流程引擎是 UdminAI 的核心模块,提供可视化流程设计、执行和监控功能。支持多种节点类型、条件分支、循环控制和并发执行。
架构设计
核心组件
flow/
├── domain.rs # 领域模型定义
├── dsl.rs # DSL 解析和构建
├── engine.rs # 流程执行引擎
├── context.rs # 执行上下文管理
├── task.rs # 任务抽象接口
├── log_handler.rs # 日志处理
├── mappers.rs # 数据映射器
├── executors/ # 执行器实现
└── mappers/ # 具体映射器
领域模型 (domain.rs)
核心数据结构
ChainDef - 流程链定义
pub struct ChainDef {
pub nodes: Vec<NodeDef>, // 节点列表
pub links: Vec<LinkDef>, // 连接列表
}
NodeDef - 节点定义
pub struct NodeDef {
pub id: NodeId, // 节点唯一标识
pub kind: NodeKind, // 节点类型
pub data: serde_json::Value, // 节点配置数据
}
NodeKind - 节点类型
pub enum NodeKind {
Start, // 开始节点
End, // 结束节点
Condition, // 条件节点
Http, // HTTP 请求节点
Database, // 数据库操作节点
ScriptJs, // JavaScript 脚本节点
ScriptPython, // Python 脚本节点
ScriptRhai, // Rhai 脚本节点
Variable, // 变量操作节点
Task, // 通用任务节点
}
LinkDef - 连接定义
pub struct LinkDef {
pub from: NodeId, // 源节点
pub to: NodeId, // 目标节点
pub condition: Option<String>, // 连接条件
}
DSL 解析 (dsl.rs)
DSL 结构
FlowDSL - 流程 DSL
pub struct FlowDSL {
pub nodes: Vec<NodeDSL>, // 节点列表
pub edges: Vec<EdgeDSL>, // 边列表
}
DesignSyntax - 设计语法
pub struct DesignSyntax {
pub nodes: Vec<NodeSyntax>, // 节点语法
pub edges: Vec<EdgeSyntax>, // 边语法
}
核心功能
1. 设计验证
pub fn validate_design(design: &DesignSyntax) -> anyhow::Result<()>
验证规则:
- 节点 ID 唯一性
- 至少包含一个 start 节点
- 至少包含一个 end 节点
- 边的引用合法性
- 条件节点配置完整性
2. 链构建
pub fn build_chain_from_design(design: &DesignSyntax) -> anyhow::Result<ChainDef>
构建过程:
- 节点类型推断
- 条件节点处理
- 边关系建立
- 数据完整性检查
3. 兼容性处理
pub fn chain_from_design_json(input: &str) -> anyhow::Result<ChainDef>
兼容特性:
- 字符串/对象输入支持
- 字段回填
- 版本兼容
- 错误恢复
执行引擎 (engine.rs)
FlowEngine - 流程执行引擎
核心结构
pub struct FlowEngine {
tasks: TaskRegistry, // 任务注册表
}
执行选项
pub struct DriveOptions {
pub max_steps: Option<usize>, // 最大执行步数
pub timeout_ms: Option<u64>, // 超时时间
pub parallel: bool, // 并发执行
pub stream_events: bool, // 流事件支持
}
执行流程
1. 起点选择
- 优先选择 Start 节点
- 其次选择入度为 0 的节点
- 最后选择第一个节点
2. 执行策略
- 串行执行: 按依赖顺序逐个执行
- 并发执行: 无依赖节点并行执行
- 条件分支: 根据条件选择执行路径
- 循环控制: 支持循环节点执行
3. 状态管理
- 节点执行状态跟踪
- 上下文数据传递
- 错误状态处理
- 执行结果收集
任务注册表 (TaskRegistry)
注册机制
pub struct TaskRegistry {
executors: HashMap<NodeKind, Box<dyn Executor>>,
}
执行器接口
#[async_trait]
pub trait Executor: Send + Sync {
async fn execute(
&self,
node_id: &NodeId,
node: &NodeDef,
ctx: &mut serde_json::Value,
) -> anyhow::Result<()>;
}
执行器实现 (executors/)
HTTP 执行器 (http.rs)
功能: 执行 HTTP 请求
配置参数:
method: 请求方法 (GET/POST/PUT/DELETE)url: 请求 URLheaders: 请求头query: 查询参数body: 请求体timeout_ms: 超时时间insecure: 忽略 SSL 验证
执行流程:
- 解析 HTTP 配置
- 构建请求参数
- 发送 HTTP 请求
- 处理响应结果
- 更新执行上下文
数据库执行器 (db.rs)
功能: 执行数据库操作
支持操作:
SELECT: 查询数据INSERT: 插入数据UPDATE: 更新数据DELETE: 删除数据TRANSACTION: 事务操作
配置参数:
sql: SQL 语句params: 参数绑定connection: 连接配置transaction: 事务控制
条件执行器 (condition.rs)
功能: 条件判断和分支控制
条件类型:
- 简单比较 (==, !=, >, <, >=, <=)
- 逻辑运算 (AND, OR, NOT)
- 正则匹配
- 自定义表达式
执行逻辑:
- 解析条件表达式
- 从上下文获取变量值
- 执行条件计算
- 返回布尔结果
脚本执行器
JavaScript 执行器 (script_js.rs)
功能: 执行 JavaScript 代码 引擎: V8 引擎 (通过 rusty_v8)
Python 执行器 (script_python.rs)
功能: 执行 Python 代码 引擎: Python 解释器 (通过 pyo3)
Rhai 执行器 (script_rhai.rs)
功能: 执行 Rhai 脚本 引擎: Rhai 脚本引擎
通用特性:
- 沙箱执行环境
- 上下文变量注入
- 执行结果获取
- 错误处理和日志
变量执行器 (variable.rs)
功能: 变量操作和数据转换
操作类型:
SET: 设置变量值GET: 获取变量值TRANSFORM: 数据转换MERGE: 数据合并EXTRACT: 数据提取
上下文管理 (context.rs)
StreamEvent - 流事件
pub enum StreamEvent {
NodeStart { node_id: String }, // 节点开始
NodeComplete { node_id: String }, // 节点完成
NodeError { node_id: String, error: String }, // 节点错误
FlowComplete, // 流程完成
FlowError { error: String }, // 流程错误
}
上下文结构
pub struct ExecutionContext {
pub variables: serde_json::Value, // 变量存储
pub node_results: HashMap<String, serde_json::Value>, // 节点结果
pub execution_log: Vec<LogEntry>, // 执行日志
pub start_time: DateTime<Utc>, // 开始时间
}
上下文操作
- 变量管理: 设置、获取、更新变量
- 结果存储: 保存节点执行结果
- 日志记录: 记录执行过程
- 状态跟踪: 跟踪执行状态
数据映射器 (mappers/)
HTTP 映射器 (http.rs)
功能: HTTP 请求/响应数据映射
数据库映射器 (db.rs)
功能: 数据库查询结果映射
脚本映射器 (script.rs)
功能: 脚本执行结果映射
变量映射器 (variable.rs)
功能: 变量数据类型映射
日志处理 (log_handler.rs)
日志类型
pub enum LogLevel {
Debug,
Info,
Warn,
Error,
}
pub struct LogEntry {
pub level: LogLevel,
pub message: String,
pub timestamp: DateTime<Utc>,
pub node_id: Option<String>,
pub context: serde_json::Value,
}
日志功能
- 执行日志: 记录节点执行过程
- 错误日志: 记录执行错误信息
- 性能日志: 记录执行时间和资源使用
- 调试日志: 记录调试信息
流程执行模式
1. 同步执行
- 阻塞式执行
- 顺序执行节点
- 立即返回结果
- 适用于简单流程
2. 异步执行
- 非阻塞执行
- 后台执行流程
- 通过回调获取结果
- 适用于长时间运行的流程
3. 流式执行
- 实时事件推送
- 执行过程可视化
- 支持中断和恢复
- 适用于交互式流程
4. 批量执行
- 批量处理多个流程
- 资源优化
- 并发控制
- 适用于批处理场景
错误处理
错误类型
pub enum FlowError {
ParseError(String), // 解析错误
ValidationError(String), // 验证错误
ExecutionError(String), // 执行错误
TimeoutError, // 超时错误
ResourceError(String), // 资源错误
}
错误处理策略
- 重试机制: 自动重试失败的节点
- 降级处理: 执行备用逻辑
- 错误传播: 将错误传播到上层
- 日志记录: 详细记录错误信息
性能优化
执行优化
- 并发执行: 无依赖节点并行执行
- 资源池: 复用执行器实例
- 缓存机制: 缓存执行结果
- 懒加载: 按需加载执行器
内存优化
- 上下文清理: 及时清理不需要的数据
- 流式处理: 大数据流式处理
- 对象池: 复用对象实例
- 垃圾回收: 主动触发垃圾回收
网络优化
- 连接复用: HTTP 连接复用
- 请求合并: 合并相似请求
- 超时控制: 合理设置超时时间
- 重试策略: 智能重试机制
监控和调试
执行监控
- 执行状态: 实时监控执行状态
- 性能指标: 收集执行性能数据
- 资源使用: 监控内存和 CPU 使用
- 错误统计: 统计错误发生情况
调试支持
- 断点调试: 支持节点断点
- 单步执行: 逐步执行节点
- 变量查看: 查看执行上下文
- 日志输出: 详细的执行日志
扩展机制
自定义执行器
#[derive(Default)]
pub struct CustomExecutor;
#[async_trait]
impl Executor for CustomExecutor {
async fn execute(
&self,
node_id: &NodeId,
node: &NodeDef,
ctx: &mut serde_json::Value,
) -> anyhow::Result<()> {
// 自定义执行逻辑
Ok(())
}
}
插件系统
- 执行器插件: 扩展新的节点类型
- 中间件插件: 扩展执行过程
- 映射器插件: 扩展数据映射
- 日志插件: 扩展日志处理
最佳实践
流程设计
- 模块化设计: 将复杂流程拆分为子流程
- 错误处理: 为关键节点添加错误处理
- 性能考虑: 避免不必要的数据传递
- 可维护性: 添加适当的注释和文档
节点配置
- 参数验证: 验证节点配置参数
- 默认值: 为可选参数提供默认值
- 类型安全: 使用强类型配置
- 版本兼容: 保持配置向后兼容
执行优化
- 并发控制: 合理设置并发度
- 资源限制: 设置合理的资源限制
- 超时设置: 为长时间运行的节点设置超时
- 监控告警: 添加关键指标监控