# 流程引擎文档 ## 概述 流程引擎是 UdminAI 的核心模块,提供可视化流程设计、执行和监控功能。支持多种节点类型、条件分支、循环控制和并发执行。 ## 架构设计 ### 核心组件 ``` flow/ ├── domain.rs # 领域模型定义 ├── dsl.rs # DSL 解析和构建 ├── engine.rs # 流程执行引擎 ├── context.rs # 执行上下文管理 ├── task.rs # 任务抽象接口 ├── log_handler.rs # 日志处理 ├── mappers.rs # 数据映射器 ├── executors/ # 执行器实现 └── mappers/ # 具体映射器 ``` ## 领域模型 (domain.rs) ### 核心数据结构 #### ChainDef - 流程链定义 ```rust pub struct ChainDef { pub nodes: Vec, // 节点列表 pub links: Vec, // 连接列表 } ``` #### NodeDef - 节点定义 ```rust pub struct NodeDef { pub id: NodeId, // 节点唯一标识 pub kind: NodeKind, // 节点类型 pub data: serde_json::Value, // 节点配置数据 } ``` #### NodeKind - 节点类型 ```rust pub enum NodeKind { Start, // 开始节点 End, // 结束节点 Condition, // 条件节点 Http, // HTTP 请求节点 Database, // 数据库操作节点 ScriptJs, // JavaScript 脚本节点 ScriptPython, // Python 脚本节点 ScriptRhai, // Rhai 脚本节点 Variable, // 变量操作节点 Task, // 通用任务节点 } ``` #### LinkDef - 连接定义 ```rust pub struct LinkDef { pub from: NodeId, // 源节点 pub to: NodeId, // 目标节点 pub condition: Option, // 连接条件 } ``` ## DSL 解析 (dsl.rs) ### DSL 结构 #### FlowDSL - 流程 DSL ```rust pub struct FlowDSL { pub nodes: Vec, // 节点列表 pub edges: Vec, // 边列表 } ``` #### DesignSyntax - 设计语法 ```rust pub struct DesignSyntax { pub nodes: Vec, // 节点语法 pub edges: Vec, // 边语法 } ``` ### 核心功能 #### 1. 设计验证 ```rust pub fn validate_design(design: &DesignSyntax) -> anyhow::Result<()> ``` **验证规则**: - 节点 ID 唯一性 - 至少包含一个 start 节点 - 至少包含一个 end 节点 - 边的引用合法性 - 条件节点配置完整性 #### 2. 链构建 ```rust pub fn build_chain_from_design(design: &DesignSyntax) -> anyhow::Result ``` **构建过程**: 1. 节点类型推断 2. 条件节点处理 3. 边关系建立 4. 数据完整性检查 #### 3. 兼容性处理 ```rust pub fn chain_from_design_json(input: &str) -> anyhow::Result ``` **兼容特性**: - 字符串/对象输入支持 - 字段回填 - 版本兼容 - 错误恢复 ## 执行引擎 (engine.rs) ### FlowEngine - 流程执行引擎 #### 核心结构 ```rust pub struct FlowEngine { tasks: TaskRegistry, // 任务注册表 } ``` #### 执行选项 ```rust pub struct DriveOptions { pub max_steps: Option, // 最大执行步数 pub timeout_ms: Option, // 超时时间 pub parallel: bool, // 并发执行 pub stream_events: bool, // 流事件支持 } ``` ### 执行流程 #### 1. 起点选择 - 优先选择 Start 节点 - 其次选择入度为 0 的节点 - 最后选择第一个节点 #### 2. 执行策略 - **串行执行**: 按依赖顺序逐个执行 - **并发执行**: 无依赖节点并行执行 - **条件分支**: 根据条件选择执行路径 - **循环控制**: 支持循环节点执行 #### 3. 状态管理 - 节点执行状态跟踪 - 上下文数据传递 - 错误状态处理 - 执行结果收集 ### 任务注册表 (TaskRegistry) #### 注册机制 ```rust pub struct TaskRegistry { executors: HashMap>, } ``` #### 执行器接口 ```rust #[async_trait] pub trait Executor: Send + Sync { async fn execute( &self, node_id: &NodeId, node: &NodeDef, ctx: &mut serde_json::Value, ) -> anyhow::Result<()>; } ``` ## 执行器实现 (executors/) ### HTTP 执行器 (http.rs) **功能**: 执行 HTTP 请求 **配置参数**: - `method`: 请求方法 (GET/POST/PUT/DELETE) - `url`: 请求 URL - `headers`: 请求头 - `query`: 查询参数 - `body`: 请求体 - `timeout_ms`: 超时时间 - `insecure`: 忽略 SSL 验证 **执行流程**: 1. 解析 HTTP 配置 2. 构建请求参数 3. 发送 HTTP 请求 4. 处理响应结果 5. 更新执行上下文 ### 数据库执行器 (db.rs) **功能**: 执行数据库操作 **支持操作**: - `SELECT`: 查询数据 - `INSERT`: 插入数据 - `UPDATE`: 更新数据 - `DELETE`: 删除数据 - `TRANSACTION`: 事务操作 **配置参数**: - `sql`: SQL 语句 - `params`: 参数绑定 - `connection`: 连接配置 - `transaction`: 事务控制 ### 条件执行器 (condition.rs) **功能**: 条件判断和分支控制 **条件类型**: - 简单比较 (==, !=, >, <, >=, <=) - 逻辑运算 (AND, OR, NOT) - 正则匹配 - 自定义表达式 **执行逻辑**: 1. 解析条件表达式 2. 从上下文获取变量值 3. 执行条件计算 4. 返回布尔结果 ### 脚本执行器 #### JavaScript 执行器 (script_js.rs) **功能**: 执行 JavaScript 代码 **引擎**: V8 引擎 (通过 rusty_v8) #### Python 执行器 (script_python.rs) **功能**: 执行 Python 代码 **引擎**: Python 解释器 (通过 pyo3) #### Rhai 执行器 (script_rhai.rs) **功能**: 执行 Rhai 脚本 **引擎**: Rhai 脚本引擎 **通用特性**: - 沙箱执行环境 - 上下文变量注入 - 执行结果获取 - 错误处理和日志 ### 变量执行器 (variable.rs) **功能**: 变量操作和数据转换 **操作类型**: - `SET`: 设置变量值 - `GET`: 获取变量值 - `TRANSFORM`: 数据转换 - `MERGE`: 数据合并 - `EXTRACT`: 数据提取 ## 上下文管理 (context.rs) ### StreamEvent - 流事件 ```rust pub enum StreamEvent { NodeStart { node_id: String }, // 节点开始 NodeComplete { node_id: String }, // 节点完成 NodeError { node_id: String, error: String }, // 节点错误 FlowComplete, // 流程完成 FlowError { error: String }, // 流程错误 } ``` ### 上下文结构 ```rust pub struct ExecutionContext { pub variables: serde_json::Value, // 变量存储 pub node_results: HashMap, // 节点结果 pub execution_log: Vec, // 执行日志 pub start_time: DateTime, // 开始时间 } ``` ### 上下文操作 - **变量管理**: 设置、获取、更新变量 - **结果存储**: 保存节点执行结果 - **日志记录**: 记录执行过程 - **状态跟踪**: 跟踪执行状态 ## 数据映射器 (mappers/) ### HTTP 映射器 (http.rs) **功能**: HTTP 请求/响应数据映射 ### 数据库映射器 (db.rs) **功能**: 数据库查询结果映射 ### 脚本映射器 (script.rs) **功能**: 脚本执行结果映射 ### 变量映射器 (variable.rs) **功能**: 变量数据类型映射 ## 日志处理 (log_handler.rs) ### 日志类型 ```rust pub enum LogLevel { Debug, Info, Warn, Error, } pub struct LogEntry { pub level: LogLevel, pub message: String, pub timestamp: DateTime, pub node_id: Option, pub context: serde_json::Value, } ``` ### 日志功能 - **执行日志**: 记录节点执行过程 - **错误日志**: 记录执行错误信息 - **性能日志**: 记录执行时间和资源使用 - **调试日志**: 记录调试信息 ## 流程执行模式 ### 1. 同步执行 - 阻塞式执行 - 顺序执行节点 - 立即返回结果 - 适用于简单流程 ### 2. 异步执行 - 非阻塞执行 - 后台执行流程 - 通过回调获取结果 - 适用于长时间运行的流程 ### 3. 流式执行 - 实时事件推送 - 执行过程可视化 - 支持中断和恢复 - 适用于交互式流程 ### 4. 批量执行 - 批量处理多个流程 - 资源优化 - 并发控制 - 适用于批处理场景 ## 错误处理 ### 错误类型 ```rust pub enum FlowError { ParseError(String), // 解析错误 ValidationError(String), // 验证错误 ExecutionError(String), // 执行错误 TimeoutError, // 超时错误 ResourceError(String), // 资源错误 } ``` ### 错误处理策略 - **重试机制**: 自动重试失败的节点 - **降级处理**: 执行备用逻辑 - **错误传播**: 将错误传播到上层 - **日志记录**: 详细记录错误信息 ## 性能优化 ### 执行优化 - **并发执行**: 无依赖节点并行执行 - **资源池**: 复用执行器实例 - **缓存机制**: 缓存执行结果 - **懒加载**: 按需加载执行器 ### 内存优化 - **上下文清理**: 及时清理不需要的数据 - **流式处理**: 大数据流式处理 - **对象池**: 复用对象实例 - **垃圾回收**: 主动触发垃圾回收 ### 网络优化 - **连接复用**: HTTP 连接复用 - **请求合并**: 合并相似请求 - **超时控制**: 合理设置超时时间 - **重试策略**: 智能重试机制 ## 监控和调试 ### 执行监控 - **执行状态**: 实时监控执行状态 - **性能指标**: 收集执行性能数据 - **资源使用**: 监控内存和 CPU 使用 - **错误统计**: 统计错误发生情况 ### 调试支持 - **断点调试**: 支持节点断点 - **单步执行**: 逐步执行节点 - **变量查看**: 查看执行上下文 - **日志输出**: 详细的执行日志 ## 扩展机制 ### 自定义执行器 ```rust #[derive(Default)] pub struct CustomExecutor; #[async_trait] impl Executor for CustomExecutor { async fn execute( &self, node_id: &NodeId, node: &NodeDef, ctx: &mut serde_json::Value, ) -> anyhow::Result<()> { // 自定义执行逻辑 Ok(()) } } ``` ### 插件系统 - **执行器插件**: 扩展新的节点类型 - **中间件插件**: 扩展执行过程 - **映射器插件**: 扩展数据映射 - **日志插件**: 扩展日志处理 ## 最佳实践 ### 流程设计 - **模块化设计**: 将复杂流程拆分为子流程 - **错误处理**: 为关键节点添加错误处理 - **性能考虑**: 避免不必要的数据传递 - **可维护性**: 添加适当的注释和文档 ### 节点配置 - **参数验证**: 验证节点配置参数 - **默认值**: 为可选参数提供默认值 - **类型安全**: 使用强类型配置 - **版本兼容**: 保持配置向后兼容 ### 执行优化 - **并发控制**: 合理设置并发度 - **资源限制**: 设置合理的资源限制 - **超时设置**: 为长时间运行的节点设置超时 - **监控告警**: 添加关键指标监控