Files
udmin/docs/FLOW_ENGINE.md
ayou a3f2f99a68 docs: 添加项目文档包括总览、架构、流程引擎和服务层
新增以下文档文件:
- PROJECT_OVERVIEW.md 项目总览文档
- BACKEND_ARCHITECTURE.md 后端架构文档
- FRONTEND_ARCHITECTURE.md 前端架构文档
- FLOW_ENGINE.md 流程引擎文档
- SERVICES.md 服务层文档
- ERROR_HANDLING.md 错误处理模块文档

文档内容涵盖项目整体介绍、技术架构、核心模块设计和实现细节
2025-09-24 20:21:45 +08:00

11 KiB

流程引擎文档

概述

流程引擎是 UdminAI 的核心模块,提供可视化流程设计、执行和监控功能。支持多种节点类型、条件分支、循环控制和并发执行。

架构设计

核心组件

flow/
├── domain.rs          # 领域模型定义
├── dsl.rs             # DSL 解析和构建
├── engine.rs          # 流程执行引擎
├── context.rs         # 执行上下文管理
├── task.rs            # 任务抽象接口
├── log_handler.rs     # 日志处理
├── mappers.rs         # 数据映射器
├── executors/         # 执行器实现
└── mappers/           # 具体映射器

领域模型 (domain.rs)

核心数据结构

ChainDef - 流程链定义

pub struct ChainDef {
    pub nodes: Vec<NodeDef>,    // 节点列表
    pub links: Vec<LinkDef>,    // 连接列表
}

NodeDef - 节点定义

pub struct NodeDef {
    pub id: NodeId,             // 节点唯一标识
    pub kind: NodeKind,         // 节点类型
    pub data: serde_json::Value, // 节点配置数据
}

NodeKind - 节点类型

pub enum NodeKind {
    Start,                      // 开始节点
    End,                        // 结束节点
    Condition,                  // 条件节点
    Http,                       // HTTP 请求节点
    Database,                   // 数据库操作节点
    ScriptJs,                   // JavaScript 脚本节点
    ScriptPython,               // Python 脚本节点
    ScriptRhai,                 // Rhai 脚本节点
    Variable,                   // 变量操作节点
    Task,                       // 通用任务节点
}

LinkDef - 连接定义

pub struct LinkDef {
    pub from: NodeId,           // 源节点
    pub to: NodeId,             // 目标节点
    pub condition: Option<String>, // 连接条件
}

DSL 解析 (dsl.rs)

DSL 结构

FlowDSL - 流程 DSL

pub struct FlowDSL {
    pub nodes: Vec<NodeDSL>,    // 节点列表
    pub edges: Vec<EdgeDSL>,    // 边列表
}

DesignSyntax - 设计语法

pub struct DesignSyntax {
    pub nodes: Vec<NodeSyntax>, // 节点语法
    pub edges: Vec<EdgeSyntax>, // 边语法
}

核心功能

1. 设计验证

pub fn validate_design(design: &DesignSyntax) -> anyhow::Result<()>

验证规则:

  • 节点 ID 唯一性
  • 至少包含一个 start 节点
  • 至少包含一个 end 节点
  • 边的引用合法性
  • 条件节点配置完整性

2. 链构建

pub fn build_chain_from_design(design: &DesignSyntax) -> anyhow::Result<ChainDef>

构建过程:

  1. 节点类型推断
  2. 条件节点处理
  3. 边关系建立
  4. 数据完整性检查

3. 兼容性处理

pub fn chain_from_design_json(input: &str) -> anyhow::Result<ChainDef>

兼容特性:

  • 字符串/对象输入支持
  • 字段回填
  • 版本兼容
  • 错误恢复

执行引擎 (engine.rs)

FlowEngine - 流程执行引擎

核心结构

pub struct FlowEngine {
    tasks: TaskRegistry,        // 任务注册表
}

执行选项

pub struct DriveOptions {
    pub max_steps: Option<usize>,     // 最大执行步数
    pub timeout_ms: Option<u64>,      // 超时时间
    pub parallel: bool,               // 并发执行
    pub stream_events: bool,          // 流事件支持
}

执行流程

1. 起点选择

  • 优先选择 Start 节点
  • 其次选择入度为 0 的节点
  • 最后选择第一个节点

2. 执行策略

  • 串行执行: 按依赖顺序逐个执行
  • 并发执行: 无依赖节点并行执行
  • 条件分支: 根据条件选择执行路径
  • 循环控制: 支持循环节点执行

3. 状态管理

  • 节点执行状态跟踪
  • 上下文数据传递
  • 错误状态处理
  • 执行结果收集

任务注册表 (TaskRegistry)

注册机制

pub struct TaskRegistry {
    executors: HashMap<NodeKind, Box<dyn Executor>>,
}

执行器接口

#[async_trait]
pub trait Executor: Send + Sync {
    async fn execute(
        &self,
        node_id: &NodeId,
        node: &NodeDef,
        ctx: &mut serde_json::Value,
    ) -> anyhow::Result<()>;
}

执行器实现 (executors/)

HTTP 执行器 (http.rs)

功能: 执行 HTTP 请求

配置参数:

  • method: 请求方法 (GET/POST/PUT/DELETE)
  • url: 请求 URL
  • headers: 请求头
  • query: 查询参数
  • body: 请求体
  • timeout_ms: 超时时间
  • insecure: 忽略 SSL 验证

执行流程:

  1. 解析 HTTP 配置
  2. 构建请求参数
  3. 发送 HTTP 请求
  4. 处理响应结果
  5. 更新执行上下文

数据库执行器 (db.rs)

功能: 执行数据库操作

支持操作:

  • SELECT: 查询数据
  • INSERT: 插入数据
  • UPDATE: 更新数据
  • DELETE: 删除数据
  • TRANSACTION: 事务操作

配置参数:

  • sql: SQL 语句
  • params: 参数绑定
  • connection: 连接配置
  • transaction: 事务控制

条件执行器 (condition.rs)

功能: 条件判断和分支控制

条件类型:

  • 简单比较 (==, !=, >, <, >=, <=)
  • 逻辑运算 (AND, OR, NOT)
  • 正则匹配
  • 自定义表达式

执行逻辑:

  1. 解析条件表达式
  2. 从上下文获取变量值
  3. 执行条件计算
  4. 返回布尔结果

脚本执行器

JavaScript 执行器 (script_js.rs)

功能: 执行 JavaScript 代码 引擎: V8 引擎 (通过 rusty_v8)

Python 执行器 (script_python.rs)

功能: 执行 Python 代码 引擎: Python 解释器 (通过 pyo3)

Rhai 执行器 (script_rhai.rs)

功能: 执行 Rhai 脚本 引擎: Rhai 脚本引擎

通用特性:

  • 沙箱执行环境
  • 上下文变量注入
  • 执行结果获取
  • 错误处理和日志

变量执行器 (variable.rs)

功能: 变量操作和数据转换

操作类型:

  • SET: 设置变量值
  • GET: 获取变量值
  • TRANSFORM: 数据转换
  • MERGE: 数据合并
  • EXTRACT: 数据提取

上下文管理 (context.rs)

StreamEvent - 流事件

pub enum StreamEvent {
    NodeStart { node_id: String },              // 节点开始
    NodeComplete { node_id: String },           // 节点完成
    NodeError { node_id: String, error: String }, // 节点错误
    FlowComplete,                               // 流程完成
    FlowError { error: String },                // 流程错误
}

上下文结构

pub struct ExecutionContext {
    pub variables: serde_json::Value,           // 变量存储
    pub node_results: HashMap<String, serde_json::Value>, // 节点结果
    pub execution_log: Vec<LogEntry>,           // 执行日志
    pub start_time: DateTime<Utc>,              // 开始时间
}

上下文操作

  • 变量管理: 设置、获取、更新变量
  • 结果存储: 保存节点执行结果
  • 日志记录: 记录执行过程
  • 状态跟踪: 跟踪执行状态

数据映射器 (mappers/)

HTTP 映射器 (http.rs)

功能: HTTP 请求/响应数据映射

数据库映射器 (db.rs)

功能: 数据库查询结果映射

脚本映射器 (script.rs)

功能: 脚本执行结果映射

变量映射器 (variable.rs)

功能: 变量数据类型映射

日志处理 (log_handler.rs)

日志类型

pub enum LogLevel {
    Debug,
    Info,
    Warn,
    Error,
}

pub struct LogEntry {
    pub level: LogLevel,
    pub message: String,
    pub timestamp: DateTime<Utc>,
    pub node_id: Option<String>,
    pub context: serde_json::Value,
}

日志功能

  • 执行日志: 记录节点执行过程
  • 错误日志: 记录执行错误信息
  • 性能日志: 记录执行时间和资源使用
  • 调试日志: 记录调试信息

流程执行模式

1. 同步执行

  • 阻塞式执行
  • 顺序执行节点
  • 立即返回结果
  • 适用于简单流程

2. 异步执行

  • 非阻塞执行
  • 后台执行流程
  • 通过回调获取结果
  • 适用于长时间运行的流程

3. 流式执行

  • 实时事件推送
  • 执行过程可视化
  • 支持中断和恢复
  • 适用于交互式流程

4. 批量执行

  • 批量处理多个流程
  • 资源优化
  • 并发控制
  • 适用于批处理场景

错误处理

错误类型

pub enum FlowError {
    ParseError(String),                         // 解析错误
    ValidationError(String),                    // 验证错误
    ExecutionError(String),                     // 执行错误
    TimeoutError,                               // 超时错误
    ResourceError(String),                      // 资源错误
}

错误处理策略

  • 重试机制: 自动重试失败的节点
  • 降级处理: 执行备用逻辑
  • 错误传播: 将错误传播到上层
  • 日志记录: 详细记录错误信息

性能优化

执行优化

  • 并发执行: 无依赖节点并行执行
  • 资源池: 复用执行器实例
  • 缓存机制: 缓存执行结果
  • 懒加载: 按需加载执行器

内存优化

  • 上下文清理: 及时清理不需要的数据
  • 流式处理: 大数据流式处理
  • 对象池: 复用对象实例
  • 垃圾回收: 主动触发垃圾回收

网络优化

  • 连接复用: HTTP 连接复用
  • 请求合并: 合并相似请求
  • 超时控制: 合理设置超时时间
  • 重试策略: 智能重试机制

监控和调试

执行监控

  • 执行状态: 实时监控执行状态
  • 性能指标: 收集执行性能数据
  • 资源使用: 监控内存和 CPU 使用
  • 错误统计: 统计错误发生情况

调试支持

  • 断点调试: 支持节点断点
  • 单步执行: 逐步执行节点
  • 变量查看: 查看执行上下文
  • 日志输出: 详细的执行日志

扩展机制

自定义执行器

#[derive(Default)]
pub struct CustomExecutor;

#[async_trait]
impl Executor for CustomExecutor {
    async fn execute(
        &self,
        node_id: &NodeId,
        node: &NodeDef,
        ctx: &mut serde_json::Value,
    ) -> anyhow::Result<()> {
        // 自定义执行逻辑
        Ok(())
    }
}

插件系统

  • 执行器插件: 扩展新的节点类型
  • 中间件插件: 扩展执行过程
  • 映射器插件: 扩展数据映射
  • 日志插件: 扩展日志处理

最佳实践

流程设计

  • 模块化设计: 将复杂流程拆分为子流程
  • 错误处理: 为关键节点添加错误处理
  • 性能考虑: 避免不必要的数据传递
  • 可维护性: 添加适当的注释和文档

节点配置

  • 参数验证: 验证节点配置参数
  • 默认值: 为可选参数提供默认值
  • 类型安全: 使用强类型配置
  • 版本兼容: 保持配置向后兼容

执行优化

  • 并发控制: 合理设置并发度
  • 资源限制: 设置合理的资源限制
  • 超时设置: 为长时间运行的节点设置超时
  • 监控告警: 添加关键指标监控