feat(调度任务): 实现调度任务管理功能

新增调度任务模块,支持任务的增删改查、启停及手动执行
- 后端添加 schedule_job 模型、服务、路由及调度器工具类
- 前端新增调度任务管理页面
- 修改 flow 相关接口将 id 类型从 String 改为 i64
- 添加 tokio-cron-scheduler 依赖实现定时任务调度
- 初始化时加载已启用任务并注册到调度器
This commit is contained in:
2025-09-24 00:21:30 +08:00
parent cadd336dee
commit 8c06849254
29 changed files with 1253 additions and 103 deletions

View File

@ -0,0 +1,124 @@
//! 模块定时任务服务Service Layer
//! 职责:
//! 1) 负责定时任务schedule_jobs的数据库增删改查
//! 2) 在创建/更新/删除后与调度器同步;
//! 3) 服务启动时加载已启用任务并注册。
use std::{future::Future, pin::Pin, sync::Arc};
use chrono::{DateTime, FixedOffset, Utc};
use sea_orm::{ActiveModelTrait, ColumnTrait, EntityTrait, QueryFilter, QueryOrder, Set};
use tokio_cron_scheduler::Job;
use tracing::{error, info};
use crate::{db::Db, error::AppError, models::schedule_job, utils};
/// 通用分页响应体
#[derive(serde::Serialize)]
pub struct PageResp<T> {
pub items: Vec<T>,
pub total: u64,
pub page: u64,
pub page_size: u64,
}
/// 任务文档(对外返回 DTO
#[derive(serde::Deserialize, serde::Serialize, Clone, Debug)]
pub struct ScheduleJobDoc {
pub id: i64,
pub name: String,
pub cron_expr: String,
pub enabled: bool,
pub flow_code: String,
pub created_at: DateTime<FixedOffset>,
pub updated_at: DateTime<FixedOffset>,
}
impl From<schedule_job::Model> for ScheduleJobDoc {
fn from(m: schedule_job::Model) -> Self {
Self {
id: m.id,
name: m.name,
cron_expr: m.cron_expr,
enabled: m.enabled,
flow_code: m.flow_code,
created_at: m.created_at,
updated_at: m.updated_at,
}
}
}
/// 创建任务请求体
#[derive(serde::Deserialize)]
pub struct CreateReq {
pub name: String,
pub cron_expr: String,
pub enabled: bool,
pub flow_code: String,
}
/// 获取当前 UTC 时间并转为固定偏移
fn now_fixed_offset() -> DateTime<FixedOffset> {
Utc::now().with_timezone(&FixedOffset::east_opt(0).unwrap())
}
/// 创建任务
pub async fn create(db: &Db, req: CreateReq) -> Result<ScheduleJobDoc, AppError> {
// 1) 校验 cron 表达式
Job::new_async(&req.cron_expr, |_id, _l| Box::pin(async {}))
.map_err(|e| AppError::BadRequest(format!("无效的 cron 表达式: {e}")))?;
// 2) 入库
let am = schedule_job::ActiveModel {
id: Set(crate::utils::generate_id()),
name: Set(req.name),
cron_expr: Set(req.cron_expr),
enabled: Set(req.enabled),
flow_code: Set(req.flow_code),
created_at: Set(now_fixed_offset()),
updated_at: Set(now_fixed_offset()),
};
let m = am.insert(db).await?;
// 3) 同步调度器
let executor = build_executor_for_job(db, &m);
utils::add_or_update_job_by_model(&m, executor).await.map_err(AppError::Anyhow)?;
Ok(m.into())
}
/// 构建任务执行闭包JobExecutor
fn build_executor_for_job(db: &Db, m: &schedule_job::Model) -> utils::JobExecutor {
let db = db.clone();
let job_id = m.id;
let job_name = m.name.clone();
Arc::new(move || {
let db = db.clone();
let job_id = job_id;
let job_name = job_name.clone();
Box::pin(async move {
match schedule_job::Entity::find_by_id(job_id).one(&db).await {
Ok(Some(model)) if !model.enabled => {
info!(target = "udmin", job = %job_name, id = %job_id, "scheduler.tick.skip");
return;
}
Ok(None) => {
info!(target = "udmin", job = %job_name, id = %job_id, "scheduler.tick.deleted");
if let Err(e) = utils::remove_job_by_id(&job_id).await {
error!(target = "udmin", id = %job_id, error = %e, "scheduler.self_remove.failed");
}
return;
}
Err(e) => {
error!(target = "udmin", job = %job_name, id = %job_id, error = %e, "scheduler.tick.error");
return;
}
_ => {}
}
info!(target = "udmin", job = %job_name, "scheduler.tick.start");
}) as Pin<Box<dyn Future<Output = ()> + Send>>
})
}

View File

@ -0,0 +1,152 @@
# Rust 代码风格规范(用于 AI 生成代码规则)
## 1. 基础风格
* 使用 **Rust 2021 edition**
* 缩进统一 **4 空格**,不使用 Tab。
* 每行代码长度建议不超过 **100 字符**
* **花括号风格**
```rust
fn example() {
// good
}
```
* 表达式尽量简洁,必要时换行,参数链式调用时 **缩进对齐**。
---
## 2. 模块与导入
* 模块顶部导入,按以下顺序分组,组间空一行:
1. **标准库 (`std::..`)**
2. **第三方库 (`chrono`, `sea_orm`, `tokio` 等)**
3. **本地 crate (`crate::..`)**
* 统一使用 **显式导入**,禁止 `use super::*` 或 `use crate::*`。
* 相同模块导入合并:
```rust
use sea_orm::{EntityTrait, QueryFilter, ColumnTrait};
```
---
## 3. 命名规则
* **模块 / 文件名**`snake_case`
* **函数 / 变量名**`snake_case`
* **结构体 / 枚举名**`PascalCase`
* **常量**`UPPER_CASE`
* **DTO/请求体/响应体**后缀:`Doc` / `Req` / `Resp`
示例:
```rust
pub struct ScheduleJobDoc { .. }
pub struct CreateReq { .. }
pub struct PageResp<T> { .. }
```
---
## 4. 文档与注释
* 每个 **模块** 顶部使用 `//!` 写模块职责说明。
* 每个 **公开函数** 必须有 `///` 注释,简述用途与主要逻辑。
* 内部复杂逻辑使用 `//` 单行注释解释。
* 中文注释优先,避免英文缩写晦涩难懂。
示例:
```rust
/// 创建任务:
/// - 校验 cron 表达式
/// - 校验唯一性
/// - 入库后注册调度器
pub async fn create(..) -> Result<..> { .. }
```
---
## 5. 错误处理
* 错误类型统一用 **自定义错误枚举**(如 `AppError`)。
* 不直接 `unwrap()` / `expect()`,统一返回 `Result<T, AppError>`。
* 错误信息应清晰且面向用户,内部日志保留技术细节。
---
## 6. 日志规范
* 使用 `tracing` 库,必须带 `target`。
* 统一格式:`模块.操作.状态`
* 日志字段使用 `key = %value` 或 `key = ?value`,避免拼接字符串。
示例:
```rust
info!(target = "udmin", id = %job_id, enabled = %enabled, "schedule_jobs.update.persisted");
error!(target = "udmin", id = %job_id, error = %e, "schedule_jobs.run.failed");
```
---
## 7. 异步与数据库
* 使用 `async fn`,返回 `Result<T, AppError>`。
* SeaORM 查询使用链式写法,**按字段过滤**时一行一个 filter。
* 分页/排序明确写出,不隐式。
示例:
```rust
let jobs = schedule_job::Entity::find()
.filter(schedule_job::Column::Enabled.eq(true))
.order_by_desc(schedule_job::Column::UpdatedAt)
.paginate(db, page_size);
```
---
## 8. 结构体组织
* DTO / 请求体 / 响应体 放在模块前部。
* Service 函数按生命周期顺序:`list → create → update → remove → init`。
* 工具函数(如 `build_executor_for_job`、`now_fixed_offset`)放在模块最后。
---
## 9. 闭包与异步执行器
* 使用 `Arc::new(move || { Box::pin(async move { .. }) })` 形式。
* 避免重复 clone大对象提前 clone 一次再 move 进闭包。
* 返回 `Pin<Box<dyn Future<Output = ()> + Send>>`。
---
## 10. 时间与 ID
* 时间统一用 `chrono::Utc::now().with_timezone(&FixedOffset::east_opt(0).unwrap())`,可封装成 `now_fixed_offset()`。
* ID 统一使用 `id: Set(crate::utils::generate_id())`。
---
## 11. 统一返回体
* 分页接口统一返回 `PageResp<T>`。
* 单条数据返回 DTO如 `ScheduleJobDoc`)。
* 删除接口返回 `Result<(), AppError>`。
---
## 12. 代码整洁性
* 避免嵌套过深,必要时提前 `return`。
* 冗余 clone 使用 `.clone()` 仅在必须时。
* 枚举 / match 分支完整,必要时加 `_ => {}` 显式忽略。
---
⚡ 总结:
生成的代码必须 **简洁、清晰、分组有序、日志一致、错误优雅**,看起来像经验丰富的 Rust 高手写的生产级代码。

36
backend/Cargo.lock generated
View File

@ -661,6 +661,15 @@ dependencies = [
"cfg-if", "cfg-if",
] ]
[[package]]
name = "croner"
version = "2.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c344b0690c1ad1c7176fe18eb173e0c927008fdaaa256e40dfd43ddd149c0843"
dependencies = [
"chrono",
]
[[package]] [[package]]
name = "crossbeam-queue" name = "crossbeam-queue"
version = "0.3.12" version = "0.3.12"
@ -1763,6 +1772,17 @@ version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "51d515d32fb182ee37cda2ccdcb92950d6a3c2893aa280e540671c2cd0f3b1d9" checksum = "51d515d32fb182ee37cda2ccdcb92950d6a3c2893aa280e540671c2cd0f3b1d9"
[[package]]
name = "num-derive"
version = "0.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ed3955f1a9c7c0c15e092f9c887db08b1fc683305fdf6eb6684f22555355e202"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.106",
]
[[package]] [[package]]
name = "num-integer" name = "num-integer"
version = "0.1.46" version = "0.1.46"
@ -3637,6 +3657,21 @@ dependencies = [
"windows-sys 0.59.0", "windows-sys 0.59.0",
] ]
[[package]]
name = "tokio-cron-scheduler"
version = "0.14.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5c71ce8f810abc9fabebccc30302a952f9e89c6cf246fafaf170fef164063141"
dependencies = [
"chrono",
"croner",
"num-derive",
"num-traits",
"tokio",
"tracing",
"uuid",
]
[[package]] [[package]]
name = "tokio-macros" name = "tokio-macros"
version = "2.5.0" version = "2.5.0"
@ -3926,6 +3961,7 @@ dependencies = [
"sha2", "sha2",
"thiserror", "thiserror",
"tokio", "tokio",
"tokio-cron-scheduler",
"tokio-stream", "tokio-stream",
"tower", "tower",
"tower-http", "tower-http",

View File

@ -39,6 +39,7 @@ regex = "1.11.2"
reqwest = { version = "0.12.23", features = ["json", "rustls-tls-native-roots"], default-features = false } reqwest = { version = "0.12.23", features = ["json", "rustls-tls-native-roots"], default-features = false }
futures = "0.3.31" futures = "0.3.31"
percent-encoding = "2.3" percent-encoding = "2.3"
tokio-cron-scheduler = "0.14.0"
# 新增: QuickJS 运行时用于 JS 执行器(不启用额外特性) # 新增: QuickJS 运行时用于 JS 执行器(不启用额外特性)
rquickjs = "0.9.0" rquickjs = "0.9.0"
# 新增: 用于将 mpsc::Receiver 封装为 StreamSSE # 新增: 用于将 mpsc::Receiver 封装为 StreamSSE

View File

@ -23,6 +23,8 @@ mod m20220101_000016_add_unique_index_to_flows_code;
mod m20220101_000017_create_flow_run_logs; mod m20220101_000017_create_flow_run_logs;
// 新增:为 flow_run_logs 添加 flow_code 列 // 新增:为 flow_run_logs 添加 flow_code 列
mod m20220101_000018_add_flow_code_to_flow_run_logs; mod m20220101_000018_add_flow_code_to_flow_run_logs;
// 新增:计划任务表
mod m20220101_000019_create_schedule_jobs;
pub struct Migrator; pub struct Migrator;
@ -55,6 +57,8 @@ impl MigratorTrait for Migrator {
Box::new(m20220101_000017_create_flow_run_logs::Migration), Box::new(m20220101_000017_create_flow_run_logs::Migration),
// 新增:为 flow_run_logs 添加 flow_code 列 // 新增:为 flow_run_logs 添加 flow_code 列
Box::new(m20220101_000018_add_flow_code_to_flow_run_logs::Migration), Box::new(m20220101_000018_add_flow_code_to_flow_run_logs::Migration),
// 新增:计划任务表
Box::new(m20220101_000019_create_schedule_jobs::Migration),
] ]
} }
} }

View File

@ -0,0 +1,42 @@
use sea_orm_migration::prelude::*;
#[derive(DeriveMigrationName)]
pub struct Migration;
#[async_trait::async_trait]
impl MigrationTrait for Migration {
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
manager
.create_table(
Table::create()
.table(ScheduleJobs::Table)
.if_not_exists()
.col(ColumnDef::new(ScheduleJobs::Id).string_len(64).not_null().primary_key())
.col(ColumnDef::new(ScheduleJobs::Name).string().not_null())
.col(ColumnDef::new(ScheduleJobs::CronExpr).string().not_null())
.col(ColumnDef::new(ScheduleJobs::Enabled).boolean().not_null().default(false))
.col(ColumnDef::new(ScheduleJobs::FlowCode).string().not_null())
.col(ColumnDef::new(ScheduleJobs::CreatedAt).timestamp().not_null().default(Expr::current_timestamp()))
.col(ColumnDef::new(ScheduleJobs::UpdatedAt).timestamp().not_null().default(Expr::current_timestamp()))
.to_owned()
)
.await?;
Ok(())
}
async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
manager.drop_table(Table::drop().table(ScheduleJobs::Table).to_owned()).await
}
}
#[derive(Iden)]
enum ScheduleJobs {
Table,
Id,
Name,
CronExpr,
Enabled,
FlowCode,
CreatedAt,
UpdatedAt,
}

View File

@ -11,19 +11,19 @@ use crate::db::Db;
#[async_trait] #[async_trait]
pub trait FlowLogHandler: Send + Sync { pub trait FlowLogHandler: Send + Sync {
/// 记录流程开始执行 /// 记录流程开始执行
async fn log_start(&self, flow_id: &str, flow_code: Option<&str>, input: &Value, operator: Option<(i64, String)>) -> anyhow::Result<()>; async fn log_start(&self, flow_id: i64, flow_code: Option<&str>, input: &Value, operator: Option<(i64, String)>) -> anyhow::Result<()>;
/// 记录流程执行失败(仅包含错误信息) /// 记录流程执行失败(仅包含错误信息)
async fn log_error(&self, flow_id: &str, flow_code: Option<&str>, input: &Value, error_msg: &str, operator: Option<(i64, String)>, started_at: DateTime<FixedOffset>, duration_ms: i64) -> anyhow::Result<()>; async fn log_error(&self, flow_id: i64, flow_code: Option<&str>, input: &Value, error_msg: &str, operator: Option<(i64, String)>, started_at: DateTime<FixedOffset>, duration_ms: i64) -> anyhow::Result<()>;
/// 记录流程执行失败(包含部分输出与累计日志) /// 记录流程执行失败(包含部分输出与累计日志)
async fn log_error_detail(&self, _flow_id: &str, _flow_code: Option<&str>, _input: &Value, _output: &Value, _logs: &[String], error_msg: &str, _operator: Option<(i64, String)>, _started_at: DateTime<FixedOffset>, _duration_ms: i64) -> anyhow::Result<()> { async fn log_error_detail(&self, _flow_id: i64, _flow_code: Option<&str>, _input: &Value, _output: &Value, _logs: &[String], error_msg: &str, _operator: Option<(i64, String)>, _started_at: DateTime<FixedOffset>, _duration_ms: i64) -> anyhow::Result<()> {
// 默认实现:退化为仅错误信息 // 默认实现:退化为仅错误信息
self.log_error(_flow_id, _flow_code, _input, error_msg, _operator, _started_at, _duration_ms).await self.log_error(_flow_id, _flow_code, _input, error_msg, _operator, _started_at, _duration_ms).await
} }
/// 记录流程执行成功 /// 记录流程执行成功
async fn log_success(&self, flow_id: &str, flow_code: Option<&str>, input: &Value, output: &Value, logs: &[String], operator: Option<(i64, String)>, started_at: DateTime<FixedOffset>, duration_ms: i64) -> anyhow::Result<()>; async fn log_success(&self, flow_id: i64, flow_code: Option<&str>, input: &Value, output: &Value, logs: &[String], operator: Option<(i64, String)>, started_at: DateTime<FixedOffset>, duration_ms: i64) -> anyhow::Result<()>;
/// 推送节点执行事件仅SSE实现需要 /// 推送节点执行事件仅SSE实现需要
async fn emit_node_event(&self, _node_id: &str, _event_type: &str, _data: &Value) -> anyhow::Result<()> { async fn emit_node_event(&self, _node_id: &str, _event_type: &str, _data: &Value) -> anyhow::Result<()> {
@ -51,15 +51,15 @@ impl DatabaseLogHandler {
#[async_trait] #[async_trait]
impl FlowLogHandler for DatabaseLogHandler { impl FlowLogHandler for DatabaseLogHandler {
async fn log_start(&self, _flow_id: &str, _flow_code: Option<&str>, _input: &Value, _operator: Option<(i64, String)>) -> anyhow::Result<()> { async fn log_start(&self, _flow_id: i64, _flow_code: Option<&str>, _input: &Value, _operator: Option<(i64, String)>) -> anyhow::Result<()> {
// 数据库日志处理器不需要记录开始事件,只在结束时记录 // 数据库日志处理器不需要记录开始事件,只在结束时记录
Ok(()) Ok(())
} }
async fn log_error(&self, flow_id: &str, flow_code: Option<&str>, input: &Value, error_msg: &str, operator: Option<(i64, String)>, started_at: DateTime<FixedOffset>, duration_ms: i64) -> anyhow::Result<()> { async fn log_error(&self, flow_id: i64, flow_code: Option<&str>, input: &Value, error_msg: &str, operator: Option<(i64, String)>, started_at: DateTime<FixedOffset>, duration_ms: i64) -> anyhow::Result<()> {
let (user_id, username) = operator.map(|(u, n)| (Some(u), Some(n))).unwrap_or((None, None)); let (user_id, username) = operator.map(|(u, n)| (Some(u), Some(n))).unwrap_or((None, None));
flow_run_log_service::create(&self.db, CreateRunLogInput { flow_run_log_service::create(&self.db, CreateRunLogInput {
flow_id: flow_id.to_string(), flow_id,
flow_code: flow_code.map(|s| s.to_string()), flow_code: flow_code.map(|s| s.to_string()),
input: Some(serde_json::to_string(input).unwrap_or_default()), input: Some(serde_json::to_string(input).unwrap_or_default()),
output: None, output: None,
@ -73,7 +73,7 @@ impl FlowLogHandler for DatabaseLogHandler {
Ok(()) Ok(())
} }
async fn log_error_detail(&self, flow_id: &str, flow_code: Option<&str>, input: &Value, output: &Value, logs: &[String], error_msg: &str, operator: Option<(i64, String)>, started_at: DateTime<FixedOffset>, duration_ms: i64) -> anyhow::Result<()> { async fn log_error_detail(&self, flow_id: i64, flow_code: Option<&str>, input: &Value, output: &Value, logs: &[String], error_msg: &str, operator: Option<(i64, String)>, started_at: DateTime<FixedOffset>, duration_ms: i64) -> anyhow::Result<()> {
let (user_id, username) = operator.map(|(u, n)| (Some(u), Some(n))).unwrap_or((None, None)); let (user_id, username) = operator.map(|(u, n)| (Some(u), Some(n))).unwrap_or((None, None));
// 将 error_msg 附加到日志尾部(若最后一条不同),确保日志中有清晰的错误描述且不重复 // 将 error_msg 附加到日志尾部(若最后一条不同),确保日志中有清晰的错误描述且不重复
let mut all_logs = logs.to_vec(); let mut all_logs = logs.to_vec();
@ -81,7 +81,7 @@ impl FlowLogHandler for DatabaseLogHandler {
all_logs.push(error_msg.to_string()); all_logs.push(error_msg.to_string());
} }
flow_run_log_service::create(&self.db, CreateRunLogInput { flow_run_log_service::create(&self.db, CreateRunLogInput {
flow_id: flow_id.to_string(), flow_id,
flow_code: flow_code.map(|s| s.to_string()), flow_code: flow_code.map(|s| s.to_string()),
input: Some(serde_json::to_string(input).unwrap_or_default()), input: Some(serde_json::to_string(input).unwrap_or_default()),
output: Some(serde_json::to_string(output).unwrap_or_default()), output: Some(serde_json::to_string(output).unwrap_or_default()),
@ -95,10 +95,10 @@ impl FlowLogHandler for DatabaseLogHandler {
Ok(()) Ok(())
} }
async fn log_success(&self, flow_id: &str, flow_code: Option<&str>, input: &Value, output: &Value, logs: &[String], operator: Option<(i64, String)>, started_at: DateTime<FixedOffset>, duration_ms: i64) -> anyhow::Result<()> { async fn log_success(&self, flow_id: i64, flow_code: Option<&str>, input: &Value, output: &Value, logs: &[String], operator: Option<(i64, String)>, started_at: DateTime<FixedOffset>, duration_ms: i64) -> anyhow::Result<()> {
let (user_id, username) = operator.map(|(u, n)| (Some(u), Some(n))).unwrap_or((None, None)); let (user_id, username) = operator.map(|(u, n)| (Some(u), Some(n))).unwrap_or((None, None));
flow_run_log_service::create(&self.db, CreateRunLogInput { flow_run_log_service::create(&self.db, CreateRunLogInput {
flow_id: flow_id.to_string(), flow_id,
flow_code: flow_code.map(|s| s.to_string()), flow_code: flow_code.map(|s| s.to_string()),
input: Some(serde_json::to_string(input).unwrap_or_default()), input: Some(serde_json::to_string(input).unwrap_or_default()),
output: Some(serde_json::to_string(output).unwrap_or_default()), output: Some(serde_json::to_string(output).unwrap_or_default()),
@ -127,19 +127,19 @@ impl SseLogHandler {
#[async_trait] #[async_trait]
impl FlowLogHandler for SseLogHandler { impl FlowLogHandler for SseLogHandler {
async fn log_start(&self, _flow_id: &str, _flow_code: Option<&str>, _input: &Value, _operator: Option<(i64, String)>) -> anyhow::Result<()> { async fn log_start(&self, _flow_id: i64, _flow_code: Option<&str>, _input: &Value, _operator: Option<(i64, String)>) -> anyhow::Result<()> {
// SSE处理器也不需要记录开始事件 // SSE处理器也不需要记录开始事件
Ok(()) Ok(())
} }
async fn log_error(&self, flow_id: &str, flow_code: Option<&str>, input: &Value, error_msg: &str, operator: Option<(i64, String)>, started_at: DateTime<FixedOffset>, duration_ms: i64) -> anyhow::Result<()> { async fn log_error(&self, flow_id: i64, flow_code: Option<&str>, input: &Value, error_msg: &str, operator: Option<(i64, String)>, started_at: DateTime<FixedOffset>, duration_ms: i64) -> anyhow::Result<()> {
// 先推送SSE错误事件不在此处发送 done交由调用方统一携带 ctx/logs 发送) // 先推送SSE错误事件不在此处发送 done交由调用方统一携带 ctx/logs 发送)
crate::middlewares::sse::emit_error(&self.event_tx, error_msg.to_string()).await; crate::middlewares::sse::emit_error(&self.event_tx, error_msg.to_string()).await;
// 然后记录到数据库(仅错误信息) // 然后记录到数据库(仅错误信息)
let (user_id, username) = operator.map(|(u, n)| (Some(u), Some(n))).unwrap_or((None, None)); let (user_id, username) = operator.map(|(u, n)| (Some(u), Some(n))).unwrap_or((None, None));
flow_run_log_service::create(&self.db, CreateRunLogInput { flow_run_log_service::create(&self.db, CreateRunLogInput {
flow_id: flow_id.to_string(), flow_id,
flow_code: flow_code.map(|s| s.to_string()), flow_code: flow_code.map(|s| s.to_string()),
input: Some(serde_json::to_string(input).unwrap_or_default()), input: Some(serde_json::to_string(input).unwrap_or_default()),
output: None, output: None,
@ -153,7 +153,7 @@ impl FlowLogHandler for SseLogHandler {
Ok(()) Ok(())
} }
async fn log_error_detail(&self, flow_id: &str, flow_code: Option<&str>, input: &Value, output: &Value, logs: &[String], error_msg: &str, operator: Option<(i64, String)>, started_at: DateTime<FixedOffset>, duration_ms: i64) -> anyhow::Result<()> { async fn log_error_detail(&self, flow_id: i64, flow_code: Option<&str>, input: &Value, output: &Value, logs: &[String], error_msg: &str, operator: Option<(i64, String)>, started_at: DateTime<FixedOffset>, duration_ms: i64) -> anyhow::Result<()> {
// 先推送SSE错误事件不在此处发送 done交由调用方统一携带 ctx/logs 发送) // 先推送SSE错误事件不在此处发送 done交由调用方统一携带 ctx/logs 发送)
crate::middlewares::sse::emit_error(&self.event_tx, error_msg.to_string()).await; crate::middlewares::sse::emit_error(&self.event_tx, error_msg.to_string()).await;
@ -164,7 +164,7 @@ impl FlowLogHandler for SseLogHandler {
all_logs.push(error_msg.to_string()); all_logs.push(error_msg.to_string());
} }
flow_run_log_service::create(&self.db, CreateRunLogInput { flow_run_log_service::create(&self.db, CreateRunLogInput {
flow_id: flow_id.to_string(), flow_id,
flow_code: flow_code.map(|s| s.to_string()), flow_code: flow_code.map(|s| s.to_string()),
input: Some(serde_json::to_string(input).unwrap_or_default()), input: Some(serde_json::to_string(input).unwrap_or_default()),
output: Some(serde_json::to_string(output).unwrap_or_default()), output: Some(serde_json::to_string(output).unwrap_or_default()),
@ -178,14 +178,14 @@ impl FlowLogHandler for SseLogHandler {
Ok(()) Ok(())
} }
async fn log_success(&self, flow_id: &str, flow_code: Option<&str>, input: &Value, output: &Value, logs: &[String], operator: Option<(i64, String)>, started_at: DateTime<FixedOffset>, duration_ms: i64) -> anyhow::Result<()> { async fn log_success(&self, flow_id: i64, flow_code: Option<&str>, input: &Value, output: &Value, logs: &[String], operator: Option<(i64, String)>, started_at: DateTime<FixedOffset>, duration_ms: i64) -> anyhow::Result<()> {
// 先推送SSE完成事件 // 先推送SSE完成事件
crate::middlewares::sse::emit_done(&self.event_tx, true, output.clone(), logs.to_vec()).await; crate::middlewares::sse::emit_done(&self.event_tx, true, output.clone(), logs.to_vec()).await;
// 然后记录到数据库 // 然后记录到数据库
let (user_id, username) = operator.map(|(u, n)| (Some(u), Some(n))).unwrap_or((None, None)); let (user_id, username) = operator.map(|(u, n)| (Some(u), Some(n))).unwrap_or((None, None));
flow_run_log_service::create(&self.db, CreateRunLogInput { flow_run_log_service::create(&self.db, CreateRunLogInput {
flow_id: flow_id.to_string(), flow_id,
flow_code: flow_code.map(|s| s.to_string()), flow_code: flow_code.map(|s| s.to_string()),
input: Some(serde_json::to_string(input).unwrap_or_default()), input: Some(serde_json::to_string(input).unwrap_or_default()),
output: Some(serde_json::to_string(output).unwrap_or_default()), output: Some(serde_json::to_string(output).unwrap_or_default()),

View File

@ -70,6 +70,14 @@ async fn main() -> anyhow::Result<()> {
// run migrations // run migrations
migration::Migrator::up(&db, None).await.expect("migration up"); migration::Migrator::up(&db, None).await.expect("migration up");
// 初始化并启动调度器仅启动不加载DB
if let Err(e) = crate::utils::init_scheduler().await { tracing::error!(target = "udmin", error = %e, "init scheduler failed"); }
// 由 service 层加载启用任务并注册到调度器
if let Err(e) = services::schedule_job_service::init_load_enabled_and_register(&db).await {
tracing::error!(target = "udmin", error = %e, "init schedule jobs failed");
}
let allow_origins = std::env::var("CORS_ALLOW_ORIGINS").unwrap_or_else(|_| "http://localhost:5173".into()); let allow_origins = std::env::var("CORS_ALLOW_ORIGINS").unwrap_or_else(|_| "http://localhost:5173".into());
let origin_values: Vec<HeaderValue> = allow_origins let origin_values: Vec<HeaderValue> = allow_origins
.split(',') .split(',')

View File

@ -158,7 +158,7 @@ struct RunReq { #[serde(default)] input: serde_json::Value }
async fn run_sse( async fn run_sse(
State(db): State<Db>, State(db): State<Db>,
Path(id): Path<String>, Path(id): Path<i64>,
Query(q): Query<HashMap<String, String>>, Query(q): Query<HashMap<String, String>>,
headers: HeaderMap, headers: HeaderMap,
Json(req): Json<RunReq>, Json(req): Json<RunReq>,
@ -192,11 +192,11 @@ async fn run_sse(
// 启动后台任务运行流程,将事件通过 tx 发送 // 启动后台任务运行流程,将事件通过 tx 发送
let db_clone = db.clone(); let db_clone = db.clone();
let id_clone = id.clone(); let id_clone = id;
let input = req.input.clone(); let input = req.input.clone();
let user_info = Some((claims.uid, claims.sub)); let user_info = Some((claims.uid, claims.sub));
tokio::spawn(async move { tokio::spawn(async move {
let _ = flow_service::run_with_stream(db_clone, &id_clone, flow_service::RunReq { input }, user_info, tx).await; let _ = flow_service::run_with_stream(db_clone, id_clone, flow_service::RunReq { input }, user_info, tx).await;
}); });
// 由通用组件把 Receiver 包装为 SSE 响应 // 由通用组件把 Receiver 包装为 SSE 响应

View File

@ -42,7 +42,7 @@ use axum::extract::ws::{WebSocketUpgrade, WebSocket, Message, Utf8Bytes};
pub async fn run_ws( pub async fn run_ws(
State(db): State<Db>, State(db): State<Db>,
Path(id): Path<String>, Path(id): Path<i64>,
Query(q): Query<HashMap<String, String>>, Query(q): Query<HashMap<String, String>>,
headers: HeaderMap, headers: HeaderMap,
ws: WebSocketUpgrade, ws: WebSocketUpgrade,
@ -70,7 +70,7 @@ pub async fn run_ws(
} }
let db_clone = db.clone(); let db_clone = db.clone();
let id_clone = id.clone(); let id_clone = id;
let user_info = Some((claims.uid, claims.sub)); let user_info = Some((claims.uid, claims.sub));
Ok(ws.on_upgrade(move |socket| async move { Ok(ws.on_upgrade(move |socket| async move {
@ -78,7 +78,7 @@ pub async fn run_ws(
})) }))
} }
pub async fn handle_ws_flow(mut socket: WebSocket, db: Db, id: String, user_info: Option<(i64, String)>) { pub async fn handle_ws_flow(mut socket: WebSocket, db: Db, id: i64, user_info: Option<(i64, String)>) {
use tokio::time::{timeout, Duration}; use tokio::time::{timeout, Duration};
use tokio::select; use tokio::select;
use tokio::sync::mpsc; use tokio::sync::mpsc;
@ -106,9 +106,9 @@ pub async fn handle_ws_flow(mut socket: WebSocket, db: Db, id: String, user_info
// 后台运行流程 // 后台运行流程
let db2 = db.clone(); let db2 = db.clone();
let id2 = id.clone(); let id2 = id;
tokio::spawn(async move { tokio::spawn(async move {
let _ = flow_service::run_with_stream(db2, &id2, flow_service::RunReq { input: input_value }, user_info, tx).await; let _ = flow_service::run_with_stream(db2, id2, flow_service::RunReq { input: input_value }, user_info, tx).await;
}); });
// 转发事件到 WebSocket // 转发事件到 WebSocket

View File

@ -4,7 +4,7 @@ use sea_orm::entity::prelude::*;
#[sea_orm(table_name = "flows")] #[sea_orm(table_name = "flows")]
pub struct Model { pub struct Model {
#[sea_orm(primary_key)] #[sea_orm(primary_key)]
pub id: String, pub id: i64,
pub name: Option<String>, pub name: Option<String>,
pub yaml: Option<String>, pub yaml: Option<String>,
pub design_json: Option<String>, pub design_json: Option<String>,

View File

@ -5,7 +5,7 @@ use sea_orm::entity::prelude::*;
pub struct Model { pub struct Model {
#[sea_orm(primary_key)] #[sea_orm(primary_key)]
pub id: i64, pub id: i64,
pub flow_id: String, pub flow_id: i64,
// 新增:流程编码(可空) // 新增:流程编码(可空)
pub flow_code: Option<String>, pub flow_code: Option<String>,
pub input: Option<String>, pub input: Option<String>,

View File

@ -12,3 +12,4 @@ pub mod position;
pub mod user_position; pub mod user_position;
pub mod flow; pub mod flow;
pub mod flow_run_log; pub mod flow_run_log;
pub mod schedule_job;

View File

@ -0,0 +1,19 @@
use sea_orm::entity::prelude::*;
#[derive(Clone, Debug, PartialEq, Eq, DeriveEntityModel, serde::Serialize, serde::Deserialize)]
#[sea_orm(table_name = "schedule_jobs")]
pub struct Model {
#[sea_orm(primary_key)]
pub id: String,
pub name: String,
pub cron_expr: String,
pub enabled: bool,
pub flow_code: String,
pub created_at: DateTimeWithTimeZone,
pub updated_at: DateTimeWithTimeZone,
}
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
pub enum Relation {}
impl ActiveModelBehavior for ActiveModel {}

View File

@ -28,7 +28,7 @@ async fn execute_flow(
let flow_id = flow_doc.id.clone(); let flow_id = flow_doc.id.clone();
info!(target = "udmin", "dynamic_api.execute_flow: found flow id={} for code={}", flow_id, flow_code); info!(target = "udmin", "dynamic_api.execute_flow: found flow id={} for code={}", flow_id, flow_code);
match flow_service::run(&db, &flow_id, flow_service::RunReq { input: payload }, Some((0, "接口".to_string()))).await { match flow_service::run(&db, flow_id, flow_service::RunReq { input: payload }, Some((0, "接口".to_string()))).await {
Ok(result) => { Ok(result) => {
info!(target = "udmin", "dynamic_api.execute_flow: execution successful flow_code={}", flow_code); info!(target = "udmin", "dynamic_api.execute_flow: execution successful flow_code={}", flow_code);
// 仅返回上下文中的 http_resp / http_response如果不存在则返回空对象 {} // 仅返回上下文中的 http_resp / http_response如果不存在则返回空对象 {}

View File

@ -83,26 +83,26 @@ struct UpdateReq { yaml: Option<String>, design_json: Option<serde_json::Value>,
Ok(Json(ApiResponse::ok(res))) Ok(Json(ApiResponse::ok(res)))
} }
async fn update(State(db): State<Db>, Path(id): Path<String>, Json(req): Json<UpdateReq>) -> Result<Json<ApiResponse<flow_service::FlowDoc>>, AppError> { async fn update(State(db): State<Db>, Path(id): Path<i64>, Json(req): Json<UpdateReq>) -> Result<Json<ApiResponse<flow_service::FlowDoc>>, AppError> {
let res = flow_service::update(&db, &id, flow_service::FlowUpdateReq { yaml: req.yaml, design_json: req.design_json, name: req.name, code: req.code, remark: req.remark }).await.map_err(flow_service::ae)?; let res = flow_service::update(&db, id, flow_service::FlowUpdateReq { yaml: req.yaml, design_json: req.design_json, name: req.name, code: req.code, remark: req.remark }).await.map_err(flow_service::ae)?;
Ok(Json(ApiResponse::ok(res))) Ok(Json(ApiResponse::ok(res)))
} }
async fn get_one(State(db): State<Db>, Path(id): Path<String>) -> Result<Json<ApiResponse<flow_service::FlowDoc>>, AppError> { async fn get_one(State(db): State<Db>, Path(id): Path<i64>) -> Result<Json<ApiResponse<flow_service::FlowDoc>>, AppError> {
let res = flow_service::get(&db, &id).await.map_err(flow_service::ae)?; let res = flow_service::get(&db, id).await.map_err(flow_service::ae)?;
Ok(Json(ApiResponse::ok(res))) Ok(Json(ApiResponse::ok(res)))
} }
async fn remove(State(db): State<Db>, Path(id): Path<String>) -> Result<Json<ApiResponse<serde_json::Value>>, AppError> { async fn remove(State(db): State<Db>, Path(id): Path<i64>) -> Result<Json<ApiResponse<serde_json::Value>>, AppError> {
flow_service::delete(&db, &id).await.map_err(flow_service::ae)?; flow_service::delete(&db, id).await.map_err(flow_service::ae)?;
Ok(Json(ApiResponse::ok(serde_json::json!({"deleted": true})))) Ok(Json(ApiResponse::ok(serde_json::json!({"deleted": true}))))
} }
#[derive(Deserialize)] #[derive(Deserialize)]
struct RunReq { #[serde(default)] input: serde_json::Value } struct RunReq { #[serde(default)] input: serde_json::Value }
async fn run(State(db): State<Db>, user: AuthUser, Path(id): Path<String>, Json(req): Json<RunReq>) -> Result<Json<ApiResponse<flow_service::RunResult>>, AppError> { async fn run(State(db): State<Db>, user: AuthUser, Path(id): Path<i64>, Json(req): Json<RunReq>) -> Result<Json<ApiResponse<flow_service::RunResult>>, AppError> {
match flow_service::run(&db, &id, flow_service::RunReq { input: req.input }, Some((user.uid, user.username))).await { match flow_service::run(&db, id, flow_service::RunReq { input: req.input }, Some((user.uid, user.username))).await {
Ok(r) => Ok(Json(ApiResponse::ok(r))), Ok(r) => Ok(Json(ApiResponse::ok(r))),
Err(e) => { Err(e) => {
// 同步执行:直接把后端错误详细信息返回给前端 // 同步执行:直接把后端错误详细信息返回给前端
@ -114,18 +114,18 @@ async fn run(State(db): State<Db>, user: AuthUser, Path(id): Path<String>, Json(
} }
// 新增SSE 流式运行端点,请求体沿用 RunReq只包含 input // 新增SSE 流式运行端点,请求体沿用 RunReq只包含 input
async fn run_stream(State(db): State<Db>, user: AuthUser, Path(id): Path<String>, Json(req): Json<RunReq>) -> Result<axum::response::sse::Sse<impl futures::Stream<Item = Result<axum::response::sse::Event, std::convert::Infallible>>>, AppError> { async fn run_stream(State(db): State<Db>, user: AuthUser, Path(id): Path<i64>, Json(req): Json<RunReq>) -> Result<axum::response::sse::Sse<impl futures::Stream<Item = Result<axum::response::sse::Event, std::convert::Infallible>>>, AppError> {
// 建立 mpsc 通道用于接收引擎的流式事件 // 建立 mpsc 通道用于接收引擎的流式事件
let (tx, rx) = tokio::sync::mpsc::channel::<crate::flow::context::StreamEvent>(16); let (tx, rx) = tokio::sync::mpsc::channel::<crate::flow::context::StreamEvent>(16);
// 启动后台任务运行流程,将事件通过 tx 发送 // 启动后台任务运行流程,将事件通过 tx 发送
let db_clone = db.clone(); let db_clone = db.clone();
let id_clone = id.clone(); let id_clone = id;
let input = req.input.clone(); let input = req.input.clone();
let user_info = Some((user.uid, user.username)); let user_info = Some((user.uid, user.username));
tokio::spawn(async move { tokio::spawn(async move {
// 复用 flow_service::run 内部大部分逻辑,但通过 DriveOptions 注入 event_tx // 复用 flow_service::run 内部大部分逻辑,但通过 DriveOptions 注入 event_tx
let _ = flow_service::run_with_stream(db_clone, &id_clone, flow_service::RunReq { input }, user_info, tx).await; let _ = flow_service::run_with_stream(db_clone, id_clone, flow_service::RunReq { input }, user_info, tx).await;
}); });
// 由通用组件把 Receiver 包装为 SSE 响应 // 由通用组件把 Receiver 包装为 SSE 响应
@ -136,7 +136,7 @@ async fn run_stream(State(db): State<Db>, user: AuthUser, Path(id): Path<String>
async fn run_ws( async fn run_ws(
State(db): State<Db>, State(db): State<Db>,
Path(id): Path<String>, Path(id): Path<i64>,
Query(q): Query<HashMap<String, String>>, Query(q): Query<HashMap<String, String>>,
headers: HeaderMap, headers: HeaderMap,
ws: WebSocketUpgrade, ws: WebSocketUpgrade,

View File

@ -8,6 +8,7 @@ pub mod logs;
pub mod flows; pub mod flows;
pub mod flow_run_logs; pub mod flow_run_logs;
pub mod dynamic_api; pub mod dynamic_api;
pub mod schedule_jobs;
use axum::Router; use axum::Router;
use crate::db::Db; use crate::db::Db;
@ -24,4 +25,5 @@ pub fn api_router() -> Router<Db> {
.merge(positions::router()) .merge(positions::router())
.merge(flow_run_logs::router()) .merge(flow_run_logs::router())
.merge(dynamic_api::router()) .merge(dynamic_api::router())
.merge(schedule_jobs::router())
} }

View File

@ -0,0 +1,76 @@
use axum::{Router, routing::{get, post, put}, extract::{State, Path, Query}, Json};
use crate::{db::Db, error::AppError, response::ApiResponse, services::{schedule_job_service, flow_service}, models::schedule_job};
use crate::middlewares::jwt::AuthUser;
use sea_orm::{EntityTrait};
pub fn router() -> Router<Db> {
Router::new()
.route("/schedule_jobs", get(list).post(create))
.route("/schedule_jobs/{id}", put(update).delete(remove))
// 新增:独立启停端点
.route("/schedule_jobs/{id}/enable", post(enable))
.route("/schedule_jobs/{id}/disable", post(disable))
// 新增:手动执行端点
.route("/schedule_jobs/{id}/execute", post(execute))
}
async fn list(State(db): State<Db>, Query(p): Query<schedule_job_service::ListParams>) -> Result<Json<ApiResponse<schedule_job_service::PageResp<schedule_job_service::ScheduleJobDoc>>>, AppError> {
let res = schedule_job_service::list(&db, p).await?;
Ok(Json(ApiResponse::ok(res)))
}
#[derive(serde::Deserialize)]
struct CreateReq { name: String, cron_expr: String, enabled: bool, flow_code: String }
async fn create(State(db): State<Db>, _user: AuthUser, Json(req): Json<CreateReq>) -> Result<Json<ApiResponse<schedule_job_service::ScheduleJobDoc>>, AppError> {
let res = schedule_job_service::create(&db, schedule_job_service::CreateReq { name: req.name, cron_expr: req.cron_expr, enabled: req.enabled, flow_code: req.flow_code }).await?;
Ok(Json(ApiResponse::ok(res)))
}
#[derive(serde::Deserialize)]
struct UpdateReq { name: Option<String>, cron_expr: Option<String>, enabled: Option<bool>, flow_code: Option<String> }
async fn update(State(db): State<Db>, _user: AuthUser, Path(id): Path<String>, Json(req): Json<UpdateReq>) -> Result<Json<ApiResponse<schedule_job_service::ScheduleJobDoc>>, AppError> {
let res = schedule_job_service::update(&db, &id, schedule_job_service::UpdateReq { name: req.name, cron_expr: req.cron_expr, enabled: req.enabled, flow_code: req.flow_code }).await?;
Ok(Json(ApiResponse::ok(res)))
}
async fn remove(State(db): State<Db>, _user: AuthUser, Path(id): Path<String>) -> Result<Json<ApiResponse<serde_json::Value>>, AppError> {
schedule_job_service::remove(&db, &id).await?;
Ok(Json(ApiResponse::ok(serde_json::json!({}))))
}
// 新增:启用指定任务(不需要请求体)
async fn enable(State(db): State<Db>, _user: AuthUser, Path(id): Path<String>) -> Result<Json<ApiResponse<schedule_job_service::ScheduleJobDoc>>, AppError> {
let res = schedule_job_service::update(&db, &id, schedule_job_service::UpdateReq { name: None, cron_expr: None, enabled: Some(true), flow_code: None }).await?;
Ok(Json(ApiResponse::ok(res)))
}
// 新增:禁用指定任务(不需要请求体)
async fn disable(State(db): State<Db>, _user: AuthUser, Path(id): Path<String>) -> Result<Json<ApiResponse<schedule_job_service::ScheduleJobDoc>>, AppError> {
let res = schedule_job_service::update(&db, &id, schedule_job_service::UpdateReq { name: None, cron_expr: None, enabled: Some(false), flow_code: None }).await?;
Ok(Json(ApiResponse::ok(res)))
}
// 新增:手动执行指定任务
async fn execute(State(db): State<Db>, user: AuthUser, Path(id): Path<String>) -> Result<Json<ApiResponse<serde_json::Value>>, AppError> {
// 1) 获取任务信息
let job = schedule_job::Entity::find_by_id(id.to_string())
.one(&db)
.await?
.ok_or(AppError::NotFound)?;
// 2) 通过 flow_code 获取流程
let flow_doc = flow_service::get_by_code(&db, &job.flow_code).await
.map_err(flow_service::ae)?;
// 3) 执行流程(使用空输入,操作者为当前用户)
let result = flow_service::run(
&db,
flow_doc.id,
flow_service::RunReq { input: serde_json::json!({}) },
Some((user.uid, user.username)),
).await.map_err(flow_service::ae)?;
Ok(Json(ApiResponse::ok(serde_json::to_value(result).map_err(|e| AppError::BadRequest(e.to_string()))?)))
}

View File

@ -6,12 +6,12 @@ use chrono::{DateTime, FixedOffset, Utc};
pub struct PageResp<T> { pub items: Vec<T>, pub total: u64, pub page: u64, pub page_size: u64 } pub struct PageResp<T> { pub items: Vec<T>, pub total: u64, pub page: u64, pub page_size: u64 }
#[derive(serde::Deserialize)] #[derive(serde::Deserialize)]
pub struct ListParams { pub page: Option<u64>, pub page_size: Option<u64>, pub flow_id: Option<String>, pub flow_code: Option<String>, pub user: Option<String>, pub ok: Option<bool> } pub struct ListParams { pub page: Option<u64>, pub page_size: Option<u64>, pub flow_id: Option<i64>, pub flow_code: Option<String>, pub user: Option<String>, pub ok: Option<bool> }
#[derive(serde::Serialize)] #[derive(serde::Serialize)]
pub struct RunLogItem { pub struct RunLogItem {
pub id: i64, pub id: i64,
pub flow_id: String, pub flow_id: i64,
pub flow_code: Option<String>, pub flow_code: Option<String>,
pub input: Option<String>, pub input: Option<String>,
pub output: Option<String>, pub output: Option<String>,
@ -30,7 +30,7 @@ impl From<flow_run_log::Model> for RunLogItem {
#[derive(serde::Serialize, serde::Deserialize, Clone, Debug)] #[derive(serde::Serialize, serde::Deserialize, Clone, Debug)]
pub struct CreateRunLogInput { pub struct CreateRunLogInput {
pub flow_id: String, pub flow_id: i64,
pub flow_code: Option<String>, pub flow_code: Option<String>,
pub input: Option<String>, pub input: Option<String>,
pub output: Option<String>, pub output: Option<String>,

View File

@ -17,7 +17,7 @@ use crate::flow::engine::DriveError;
#[derive(Debug, Clone, Serialize, Deserialize)] #[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FlowSummary { pub struct FlowSummary {
pub id: String, pub id: i64,
pub name: String, pub name: String,
#[serde(skip_serializing_if = "Option::is_none")] pub code: Option<String>, #[serde(skip_serializing_if = "Option::is_none")] pub code: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")] pub remark: Option<String>, #[serde(skip_serializing_if = "Option::is_none")] pub remark: Option<String>,
@ -27,7 +27,7 @@ pub struct FlowSummary {
} }
#[derive(Debug, Clone, Serialize, Deserialize)] #[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FlowDoc { pub struct FlowDoc {
pub id: String, pub id: i64,
pub yaml: String, pub yaml: String,
#[serde(skip_serializing_if = "Option::is_none")] pub design_json: Option<serde_json::Value>, #[serde(skip_serializing_if = "Option::is_none")] pub design_json: Option<serde_json::Value>,
#[serde(skip_serializing_if = "Option::is_none")] pub name: Option<String>, #[serde(skip_serializing_if = "Option::is_none")] pub name: Option<String>,
@ -51,9 +51,15 @@ pub async fn list(db: &Db, page: u64, page_size: u64, keyword: Option<String>) -
let mut selector = db_flow::Entity::find(); let mut selector = db_flow::Entity::find();
if let Some(k) = keyword.filter(|s| !s.is_empty()) { if let Some(k) = keyword.filter(|s| !s.is_empty()) {
let like = format!("%{}%", k); let like = format!("%{}%", k);
// 名称模糊匹配 + 若关键字可解析为数字则按ID精确匹配
selector = selector.filter( selector = selector.filter(
db_flow::Column::Name.like(like.clone()) db_flow::Column::Name.like(like.clone())
.or(db_flow::Column::Id.like(like)) .or(
match k.parse::<i64>() {
Ok(num) => db_flow::Column::Id.eq(num),
Err(_) => db_flow::Column::Name.like(like),
}
)
); );
} }
let paginator = selector.order_by_desc(db_flow::Column::CreatedAt).paginate(db, page_size); let paginator = selector.order_by_desc(db_flow::Column::CreatedAt).paginate(db, page_size);
@ -61,13 +67,13 @@ pub async fn list(db: &Db, page: u64, page_size: u64, keyword: Option<String>) -
let models = paginator.fetch_page(if page > 0 { page - 1 } else { 0 }).await?; let models = paginator.fetch_page(if page > 0 { page - 1 } else { 0 }).await?;
let mut items: Vec<FlowSummary> = Vec::with_capacity(models.len()); let mut items: Vec<FlowSummary> = Vec::with_capacity(models.len());
for row in models.into_iter() { for row in models.into_iter() {
let id = row.id.clone(); let id = row.id;
let name = row let name = row
.name .name
.clone() .clone()
.or_else(|| row.yaml.as_deref().and_then(extract_name)) .or_else(|| row.yaml.as_deref().and_then(extract_name))
.unwrap_or_else(|| { .unwrap_or_else(|| {
let prefix: String = id.chars().take(8).collect(); let prefix: String = id.to_string().chars().take(8).collect();
format!("flow_{}", prefix) format!("flow_{}", prefix)
}); });
// 最近修改人从请求日志中查找最近一次对该flow的PUT请求 // 最近修改人从请求日志中查找最近一次对该flow的PUT请求
@ -98,7 +104,7 @@ pub async fn create(db: &Db, req: FlowCreateReq) -> anyhow::Result<FlowDoc> {
let _parsed: FlowDSL = serde_yaml::from_str(yaml).context("invalid flow yaml")?; let _parsed: FlowDSL = serde_yaml::from_str(yaml).context("invalid flow yaml")?;
info!(target: "udmin", "flow.create: yaml parsed ok"); info!(target: "udmin", "flow.create: yaml parsed ok");
} }
let id = crate::utils::generate_flow_id(); let id: i64 = crate::utils::generate_id();
let name = req let name = req
.name .name
.clone() .clone()
@ -110,7 +116,7 @@ pub async fn create(db: &Db, req: FlowCreateReq) -> anyhow::Result<FlowDoc> {
let ret_code = req.code.clone(); let ret_code = req.code.clone();
let ret_remark = req.remark.clone(); let ret_remark = req.remark.clone();
let am = db_flow::ActiveModel { let am = db_flow::ActiveModel {
id: Set(id.clone()), id: Set(id),
name: Set(name.clone()), name: Set(name.clone()),
yaml: Set(req.yaml.clone()), yaml: Set(req.yaml.clone()),
design_json: Set(design_json_str), design_json: Set(design_json_str),
@ -122,16 +128,14 @@ pub async fn create(db: &Db, req: FlowCreateReq) -> anyhow::Result<FlowDoc> {
..Default::default() ..Default::default()
}; };
info!(target: "udmin", "flow.create: inserting into db id={}", id); info!(target: "udmin", "flow.create: inserting into db id={}", id);
// Use exec() instead of insert() returning Model to avoid RecordNotInserted on non-AI PK
match db_flow::Entity::insert(am).exec(db).await { match db_flow::Entity::insert(am).exec(db).await {
Ok(_) => { Ok(_) => {
info!(target: "udmin", "flow.create: insert ok id={}", id); info!(target: "udmin", "flow.create: insert ok id={}", id);
Ok(FlowDoc { id, yaml: req.yaml.unwrap_or_default(), design_json: req.design_json, name: ret_name, code: ret_code, remark: ret_remark }) Ok(FlowDoc { id, yaml: req.yaml.unwrap_or_default(), design_json: req.design_json, name: ret_name, code: ret_code, remark: ret_remark })
} }
Err(DbErr::RecordNotInserted) => { Err(DbErr::RecordNotInserted) => {
// Workaround for MySQL + non-auto-increment PK: verify by reading back
error!(target: "udmin", "flow.create: insert returned RecordNotInserted, verifying by select id={}", id); error!(target: "udmin", "flow.create: insert returned RecordNotInserted, verifying by select id={}", id);
match db_flow::Entity::find_by_id(id.clone()).one(db).await { match db_flow::Entity::find_by_id(id).one(db).await {
Ok(Some(_)) => { Ok(Some(_)) => {
info!(target: "udmin", "flow.create: found inserted row by id={}, treating as success", id); info!(target: "udmin", "flow.create: found inserted row by id={}, treating as success", id);
Ok(FlowDoc { id, yaml: req.yaml.unwrap_or_default(), design_json: req.design_json, name, code: req.code, remark: req.remark }) Ok(FlowDoc { id, yaml: req.yaml.unwrap_or_default(), design_json: req.design_json, name, code: req.code, remark: req.remark })
@ -147,12 +151,11 @@ pub async fn create(db: &Db, req: FlowCreateReq) -> anyhow::Result<FlowDoc> {
} }
} }
pub async fn get(db: &Db, id: &str) -> anyhow::Result<FlowDoc> { pub async fn get(db: &Db, id: i64) -> anyhow::Result<FlowDoc> {
let row = db_flow::Entity::find_by_id(id.to_string()).one(db).await?; let row = db_flow::Entity::find_by_id(id).one(db).await?;
let row = row.ok_or_else(|| anyhow::anyhow!("not found"))?; let row = row.ok_or_else(|| anyhow::anyhow!("not found"))?;
let yaml = row.yaml.unwrap_or_default(); let yaml = row.yaml.unwrap_or_default();
let design_json = row.design_json.and_then(|s| serde_json::from_str::<serde_json::Value>(&s).ok()); let design_json = row.design_json.and_then(|s| serde_json::from_str::<serde_json::Value>(&s).ok());
// 名称兜底:数据库 name 为空时,尝试从 YAML 提取
let name = row let name = row
.name .name
.clone() .clone()
@ -176,11 +179,11 @@ pub async fn get_by_code(db: &Db, code: &str) -> anyhow::Result<FlowDoc> {
Ok(FlowDoc { id: row.id, yaml, design_json, name, code: row.code, remark: row.remark }) Ok(FlowDoc { id: row.id, yaml, design_json, name, code: row.code, remark: row.remark })
} }
pub async fn update(db: &Db, id: &str, req: FlowUpdateReq) -> anyhow::Result<FlowDoc> { pub async fn update(db: &Db, id: i64, req: FlowUpdateReq) -> anyhow::Result<FlowDoc> {
if let Some(yaml) = &req.yaml { if let Some(yaml) = &req.yaml {
let _parsed: FlowDSL = serde_yaml::from_str(yaml).context("invalid flow yaml")?; let _parsed: FlowDSL = serde_yaml::from_str(yaml).context("invalid flow yaml")?;
} }
let row = db_flow::Entity::find_by_id(id.to_string()).one(db).await?; let row = db_flow::Entity::find_by_id(id).one(db).await?;
let Some(row) = row else { return Err(anyhow::anyhow!("not found")); }; let Some(row) = row else { return Err(anyhow::anyhow!("not found")); };
let mut am: db_flow::ActiveModel = row.into(); let mut am: db_flow::ActiveModel = row.into();
@ -192,45 +195,36 @@ pub async fn update(db: &Db, id: &str, req: FlowUpdateReq) -> anyhow::Result<Flo
am.yaml = Set(Some(yaml.clone())); am.yaml = Set(Some(yaml.clone()));
} else if let Some(n) = req.name { am.name = Set(Some(n)); } } else if let Some(n) = req.name { am.name = Set(Some(n)); }
if let Some(dj) = req.design_json { if let Some(dj) = req.design_json { let s = serde_json::to_string(&dj)?; am.design_json = Set(Some(s)); }
let s = serde_json::to_string(&dj)?;
am.design_json = Set(Some(s));
}
if let Some(c) = req.code { am.code = Set(Some(c)); } if let Some(c) = req.code { am.code = Set(Some(c)); }
if let Some(r) = req.remark { am.remark = Set(Some(r)); } if let Some(r) = req.remark { am.remark = Set(Some(r)); }
// update timestamp
am.updated_at = Set(Utc::now().with_timezone(&FixedOffset::east_opt(0).unwrap())); am.updated_at = Set(Utc::now().with_timezone(&FixedOffset::east_opt(0).unwrap()));
am.update(db).await?; am.update(db).await?;
// return latest yaml let got = db_flow::Entity::find_by_id(id).one(db).await?.unwrap();
let got = db_flow::Entity::find_by_id(id.to_string()).one(db).await?.unwrap();
let dj = got.design_json.as_deref().and_then(|s| serde_json::from_str::<serde_json::Value>(&s).ok()); let dj = got.design_json.as_deref().and_then(|s| serde_json::from_str::<serde_json::Value>(&s).ok());
Ok(FlowDoc { id: id.to_string(), yaml: got.yaml.unwrap_or_default(), design_json: dj, name: got.name, code: got.code, remark: got.remark }) Ok(FlowDoc { id, yaml: got.yaml.unwrap_or_default(), design_json: dj, name: got.name, code: got.code, remark: got.remark })
} }
pub async fn delete(db: &Db, id: &str) -> anyhow::Result<()> { pub async fn delete(db: &Db, id: i64) -> anyhow::Result<()> {
let row = db_flow::Entity::find_by_id(id.to_string()).one(db).await?; let row = db_flow::Entity::find_by_id(id).one(db).await?;
let Some(row) = row else { return Err(anyhow::anyhow!("not found")); }; let Some(row) = row else { return Err(anyhow::anyhow!("not found")); };
let am: db_flow::ActiveModel = row.into(); let am: db_flow::ActiveModel = row.into();
am.delete(db).await?; am.delete(db).await?;
Ok(()) Ok(())
} }
pub async fn run(db: &Db, id: &str, req: RunReq, operator: Option<(i64, String)>) -> anyhow::Result<RunResult> { pub async fn run(db: &Db, id: i64, req: RunReq, operator: Option<(i64, String)>) -> anyhow::Result<RunResult> {
let log_handler = DatabaseLogHandler::new(db.clone()); let log_handler = DatabaseLogHandler::new(db.clone());
match run_internal(db, id, req, operator, &log_handler, None).await { match run_internal(db, id, req, operator, &log_handler, None).await {
Ok((ctx, logs)) => Ok(RunResult { ok: true, ctx, logs }), Ok((ctx, logs)) => Ok(RunResult { ok: true, ctx, logs }),
Err(e) => { Err(e) => {
// 将运行期错误转换为 ok=false并尽量带上部分 ctx/logs
if let Some(de) = e.downcast_ref::<DriveError>().cloned() { if let Some(de) = e.downcast_ref::<DriveError>().cloned() {
Ok(RunResult { ok: false, ctx: de.ctx, logs: de.logs }) Ok(RunResult { ok: false, ctx: de.ctx, logs: de.logs })
} else { } else {
let mut full = e.to_string(); let mut full = e.to_string();
for cause in e.chain().skip(1) { for cause in e.chain().skip(1) { full.push_str(" | "); full.push_str(&cause.to_string()); }
full.push_str(" | ");
full.push_str(&cause.to_string());
}
Ok(RunResult { ok: false, ctx: serde_json::json!({}), logs: vec![full] }) Ok(RunResult { ok: false, ctx: serde_json::json!({}), logs: vec![full] })
} }
} }
@ -240,18 +234,16 @@ pub async fn run(db: &Db, id: &str, req: RunReq, operator: Option<(i64, String)>
// 新增:流式运行,向外发送节点事件与最终完成事件 // 新增:流式运行,向外发送节点事件与最终完成事件
pub async fn run_with_stream( pub async fn run_with_stream(
db: Db, db: Db,
id: &str, id: i64,
req: RunReq, req: RunReq,
operator: Option<(i64, String)>, operator: Option<(i64, String)>,
event_tx: Sender<StreamEvent>, event_tx: Sender<StreamEvent>,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
// clone 一份用于错误时补发 done
let tx_done = event_tx.clone(); let tx_done = event_tx.clone();
let log_handler = SseLogHandler::new(db.clone(), event_tx.clone()); let log_handler = SseLogHandler::new(db.clone(), event_tx.clone());
match run_internal(&db, id, req, operator, &log_handler, Some(event_tx)).await { match run_internal(&db, id, req, operator, &log_handler, Some(event_tx)).await {
Ok((_ctx, _logs)) => Ok(()), // 正常路径log_success 内已发送 done(true,...) Ok((_ctx, _logs)) => Ok(()),
Err(e) => { Err(e) => {
// 错误路径:先在 log_error 中已发送 error 事件;此处补发 done(false,...)
if let Some(de) = e.downcast_ref::<DriveError>().cloned() { if let Some(de) = e.downcast_ref::<DriveError>().cloned() {
crate::middlewares::sse::emit_done(&tx_done, false, de.ctx, de.logs).await; crate::middlewares::sse::emit_done(&tx_done, false, de.ctx, de.logs).await;
} else { } else {
@ -267,23 +259,17 @@ pub async fn run_with_stream(
// 内部统一的运行方法 // 内部统一的运行方法
async fn run_internal( async fn run_internal(
db: &Db, db: &Db,
id: &str, id: i64,
req: RunReq, req: RunReq,
operator: Option<(i64, String)>, operator: Option<(i64, String)>,
log_handler: &dyn FlowLogHandler, log_handler: &dyn FlowLogHandler,
event_tx: Option<Sender<StreamEvent>>, event_tx: Option<Sender<StreamEvent>>,
) -> anyhow::Result<(serde_json::Value, Vec<String>)> { ) -> anyhow::Result<(serde_json::Value, Vec<String>)> {
// 使用传入的 event_tx当启用 SSE 时由路由层提供)
info!(target = "udmin", "flow.run_internal: start id={}", id); info!(target = "udmin", "flow.run_internal: start id={}", id);
let start = Utc::now().with_timezone(&FixedOffset::east_opt(0).unwrap()); let start = Utc::now().with_timezone(&FixedOffset::east_opt(0).unwrap());
// 获取流程编码 let flow_code: Option<String> = match db_flow::Entity::find_by_id(id).one(db).await { Ok(Some(row)) => row.code, _ => None };
let flow_code: Option<String> = match db_flow::Entity::find_by_id(id.to_string()).one(db).await {
Ok(Some(row)) => row.code,
_ => None,
};
// 获取流程文档
let doc = match get(db, id).await { let doc = match get(db, id).await {
Ok(d) => d, Ok(d) => d,
Err(e) => { Err(e) => {
@ -380,7 +366,6 @@ async fn run_internal(
Err(e) => { Err(e) => {
error!(target = "udmin", error = ?e, "flow.run_internal: engine drive failed id={}", id); error!(target = "udmin", error = ?e, "flow.run_internal: engine drive failed id={}", id);
let dur = (Utc::now().with_timezone(&FixedOffset::east_opt(0).unwrap()) - start).num_milliseconds() as i64; let dur = (Utc::now().with_timezone(&FixedOffset::east_opt(0).unwrap()) - start).num_milliseconds() as i64;
// 优先记录详细错误(包含部分 ctx 与累计 logs
if let Some(de) = e.downcast_ref::<DriveError>().cloned() { if let Some(de) = e.downcast_ref::<DriveError>().cloned() {
log_handler log_handler
.log_error_detail( .log_error_detail(

View File

@ -3,8 +3,8 @@ pub mod user_service;
pub mod role_service; pub mod role_service;
pub mod menu_service; pub mod menu_service;
pub mod department_service; pub mod department_service;
pub mod log_service;
// 新增岗位服务
pub mod position_service; pub mod position_service;
pub mod log_service;
pub mod flow_service; pub mod flow_service;
pub mod flow_run_log_service; pub mod flow_run_log_service;
pub mod schedule_job_service;

View File

@ -0,0 +1,315 @@
//! 模块定时任务服务Service Layer
//! 职责:
//! 1) 负责定时任务schedule_jobs的数据库增删改查
//! 2) 在创建/更新/删除后,与调度器进行同步(注册、更新、移除);
//! 3) 服务启动时加载已启用任务并注册到调度器;
//! 4) 构建任务执行闭包JobExecutor内部做运行期防御与流程触发。
use std::{future::Future, pin::Pin, sync::Arc};
use chrono::{DateTime, FixedOffset, Utc};
use sea_orm::{
ActiveModelTrait, ColumnTrait, EntityTrait, PaginatorTrait, QueryFilter, QueryOrder, Set,
};
use tokio_cron_scheduler::Job;
use tracing::{error, info};
use crate::{
db::Db,
error::AppError,
models::schedule_job,
services::flow_service,
utils,
};
/// 通用分页响应体
#[derive(serde::Serialize)]
pub struct PageResp<T> {
pub items: Vec<T>,
pub total: u64,
pub page: u64,
pub page_size: u64,
}
/// 任务文档DTO
#[derive(serde::Deserialize, serde::Serialize, Clone, Debug)]
pub struct ScheduleJobDoc {
pub id: String,
pub name: String,
pub cron_expr: String,
pub enabled: bool,
pub flow_code: String,
pub created_at: DateTime<FixedOffset>,
pub updated_at: DateTime<FixedOffset>,
}
impl From<schedule_job::Model> for ScheduleJobDoc {
fn from(m: schedule_job::Model) -> Self {
Self {
id: m.id,
name: m.name,
cron_expr: m.cron_expr,
enabled: m.enabled,
flow_code: m.flow_code,
created_at: m.created_at,
updated_at: m.updated_at,
}
}
}
/// 列表查询参数
#[derive(serde::Deserialize)]
pub struct ListParams {
pub page: Option<u64>,
pub page_size: Option<u64>,
pub keyword: Option<String>,
pub enabled: Option<bool>,
}
/// 创建任务请求体
#[derive(serde::Deserialize)]
pub struct CreateReq {
pub name: String,
pub cron_expr: String,
pub enabled: bool,
pub flow_code: String,
}
/// 更新任务请求体(部分字段可选)
#[derive(serde::Deserialize)]
pub struct UpdateReq {
pub name: Option<String>,
pub cron_expr: Option<String>,
pub enabled: Option<bool>,
pub flow_code: Option<String>,
}
/// 获取当前 UTC 时间并转为固定偏移(避免多处重复)
fn now_fixed_offset() -> DateTime<FixedOffset> {
Utc::now().with_timezone(&FixedOffset::east_opt(0).unwrap())
}
/// 分页查询任务列表,支持按名称关键字与启用状态筛选
pub async fn list(db: &Db, p: ListParams) -> Result<PageResp<ScheduleJobDoc>, AppError> {
let page = p.page.unwrap_or(1);
let page_size = p.page_size.unwrap_or(10);
let mut query = schedule_job::Entity::find();
if let Some(k) = p.keyword {
query = query.filter(schedule_job::Column::Name.contains(&k));
}
if let Some(e) = p.enabled {
query = query.filter(schedule_job::Column::Enabled.eq(e));
}
let paginator = query
.order_by_desc(schedule_job::Column::UpdatedAt)
.paginate(db, page_size);
let total = paginator.num_items().await?;
let docs: Vec<ScheduleJobDoc> = paginator
.fetch_page(page.saturating_sub(1))
.await?
.into_iter()
.map(Into::into)
.collect();
let sample: Vec<_> = docs.iter().take(5).map(|d| (d.id.clone(), d.enabled)).collect();
info!(
target = "udmin",
total = %total,
page = %page,
page_size = %page_size,
sample = ?sample,
"schedule_jobs.list"
);
Ok(PageResp { items: docs, total, page, page_size })
}
/// 创建任务
pub async fn create(db: &Db, req: CreateReq) -> Result<ScheduleJobDoc, AppError> {
// 1) 校验 cron 表达式
Job::new_async(&req.cron_expr, |_uuid, _l| Box::pin(async {}))
.map_err(|e| AppError::BadRequest(format!("无效的 cron 表达式: {e}")))?;
// 2) 校验 flow_code 唯一启用性
if req.enabled
&& schedule_job::Entity::find()
.filter(schedule_job::Column::FlowCode.eq(req.flow_code.clone()))
.filter(schedule_job::Column::Enabled.eq(true))
.one(db)
.await?
.is_some()
{
return Err(AppError::Conflict("同一 flow_code 已存在启用中的任务".into()));
}
// 3) 入库
let am = schedule_job::ActiveModel {
id: Set(uuid::Uuid::new_v4().to_string()),
name: Set(req.name),
cron_expr: Set(req.cron_expr),
enabled: Set(req.enabled),
flow_code: Set(req.flow_code),
created_at: Set(now_fixed_offset()),
updated_at: Set(now_fixed_offset()),
};
let m = am.insert(db).await?;
// 4) 同步调度器
let executor = build_executor_for_job(db, &m);
utils::add_or_update_job_by_model(&m, executor).await.map_err(AppError::Anyhow)?;
Ok(m.into())
}
/// 更新任务
pub async fn update(db: &Db, id: &str, req: UpdateReq) -> Result<ScheduleJobDoc, AppError> {
let m = schedule_job::Entity::find_by_id(id.to_string())
.one(db)
.await?
.ok_or(AppError::NotFound)?;
let next_name = req.name.unwrap_or_else(|| m.name.clone());
let next_cron = req.cron_expr.unwrap_or_else(|| m.cron_expr.clone());
let next_enabled = req.enabled.unwrap_or(m.enabled);
let next_flow_code = req.flow_code.unwrap_or_else(|| m.flow_code.clone());
Job::new_async(&next_cron, |_uuid, _l| Box::pin(async {}))
.map_err(|e| AppError::BadRequest(format!("无效的 cron 表达式: {e}")))?;
if next_enabled
&& schedule_job::Entity::find()
.filter(schedule_job::Column::FlowCode.eq(next_flow_code.clone()))
.filter(schedule_job::Column::Enabled.eq(true))
.filter(schedule_job::Column::Id.ne(id.to_string()))
.one(db)
.await?
.is_some()
{
return Err(AppError::Conflict("同一 flow_code 已存在启用中的任务".into()));
}
info!(
target = "udmin",
id = %m.id,
prev_enabled = %m.enabled,
next_enabled = %next_enabled,
"schedule_jobs.update.apply"
);
let mut am: schedule_job::ActiveModel = m.into();
am.name = Set(next_name);
am.cron_expr = Set(next_cron);
am.enabled = Set(next_enabled);
am.flow_code = Set(next_flow_code);
am.updated_at = Set(now_fixed_offset());
let updated = am.update(db).await?;
info!(
target = "udmin",
id = %updated.id,
enabled = %updated.enabled,
"schedule_jobs.update.persisted"
);
let executor = build_executor_for_job(db, &updated);
utils::add_or_update_job_by_model(&updated, executor).await.map_err(AppError::Anyhow)?;
Ok(updated.into())
}
/// 删除任务
pub async fn remove(db: &Db, id: &str) -> Result<(), AppError> {
utils::remove_job_by_id(id).await.map_err(AppError::Anyhow)?;
let res = schedule_job::Entity::delete_by_id(id.to_string()).exec(db).await?;
if res.rows_affected == 0 {
return Err(AppError::NotFound);
}
Ok(())
}
/// 启动时加载并注册所有启用任务
pub async fn init_load_enabled_and_register(db: &Db) -> Result<(), AppError> {
let enabled_jobs = schedule_job::Entity::find()
.filter(schedule_job::Column::Enabled.eq(true))
.all(db)
.await?;
info!(
target = "udmin",
count = enabled_jobs.len(),
"schedule_jobs.init.load_enabled"
);
for m in enabled_jobs {
let executor = build_executor_for_job(db, &m);
if let Err(e) = utils::add_or_update_job_by_model(&m, executor).await {
error!(
target = "udmin",
id = %m.id,
error = %e,
"schedule_jobs.init.add_failed"
);
}
}
Ok(())
}
/// 构建任务执行闭包JobExecutor
fn build_executor_for_job(db: &Db, m: &schedule_job::Model) -> utils::JobExecutor {
let db = db.clone();
let job_id = m.id.clone();
let job_name = m.name.clone();
let flow_code = m.flow_code.clone();
Arc::new(move || {
let db = db.clone();
let job_id = job_id.clone();
let job_name = job_name.clone();
let flow_code = flow_code.clone();
Box::pin(async move {
// 运行期防御
match schedule_job::Entity::find_by_id(job_id.clone()).one(&db).await {
Ok(Some(model)) if !model.enabled => {
info!(target = "udmin", job = %job_name, id = %job_id, "scheduler.tick.skip_disabled");
return;
}
Ok(None) => {
info!(target = "udmin", job = %job_name, id = %job_id, "scheduler.tick.deleted_remove_self");
if let Err(e) = utils::remove_job_by_id(&job_id).await {
error!(target = "udmin", id = %job_id, error = %e, "scheduler.self_remove.failed");
}
return;
}
Err(e) => {
error!(target = "udmin", job = %job_name, id = %job_id, error = %e, "scheduler.tick.check_failed");
return;
}
_ => {}
}
// 触发流程执行
info!(target = "udmin", job = %job_name, flow_code = %flow_code, "scheduler.tick.start");
match flow_service::get_by_code(&db, &flow_code).await {
Ok(doc) => {
let id = doc.id.clone();
if let Err(e) = flow_service::run(
&db,
id,
flow_service::RunReq { input: serde_json::json!({}) },
Some((0, "调度".to_string())),
).await {
error!(target = "udmin", job = %job_name, flow_code = %flow_code, error = %e, "scheduler.tick.run_failed");
}
}
Err(e) => {
error!(target = "udmin", job = %job_name, flow_code = %flow_code, error = %e, "scheduler.tick.flow_not_found");
}
}
}) as Pin<Box<dyn Future<Output = ()> + Send>>
})
}

View File

@ -56,15 +56,13 @@ pub fn parse_biz_id(id: i64) -> (u16, u8, i64) {
(main_id, sub_id, base_id) (main_id, sub_id, base_id)
} }
// --- 具体业务场景:Flow 使用的一组常量(可按需扩展/调整) --- // --- 具体业务场景:main/sub 为 1/1 的通用 ID 场景 ---
// 你可以把这些常量提到配置或用枚举维护各业务的 main/sub 编码
const FLOW_MAIN_ID: u16 = 1; const FLOW_MAIN_ID: u16 = 1;
const FLOW_SUB_ID: u8 = 1; const FLOW_SUB_ID: u8 = 1;
/// 生成 Flow 的 ID,返回十进制字符串,便于与原先 string 类型主键兼容 /// 通用 ID 生成main_id=1、sub_id=1,返回十进制字符串与原先 string 类型主键兼容
pub fn generate_flow_id() -> String { pub fn generate_id() -> i64 {
let id = generate_biz_id(BizIdConfig::new(FLOW_MAIN_ID, FLOW_SUB_ID)); generate_biz_id(BizIdConfig::new(FLOW_MAIN_ID, FLOW_SUB_ID))
id.to_string()
} }
// --- 日志类 ID 的业务位定义与生成 --- // --- 日志类 ID 的业务位定义与生成 ---

View File

@ -1,4 +1,6 @@
pub mod password; pub mod password;
pub mod ids; pub mod ids;
pub mod scheduler;
pub use ids::{init_from_env, generate_biz_id, parse_biz_id, generate_flow_id, BizIdConfig, generate_flow_run_log_id, generate_request_log_id}; pub use ids::{init_from_env, generate_biz_id, parse_biz_id, generate_id, BizIdConfig, generate_flow_run_log_id, generate_request_log_id};
pub use scheduler::{init_and_start as init_scheduler, add_or_update_job_by_model, remove_job_by_id, JobExecutor};

View File

@ -0,0 +1,99 @@
use std::collections::HashMap;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use once_cell::sync::OnceCell;
use tokio::sync::Mutex;
use tokio_cron_scheduler::{JobScheduler, Job};
use tracing::{error, info};
use uuid::Uuid;
use crate::models::schedule_job;
static SCHEDULER: OnceCell<Mutex<JobScheduler>> = OnceCell::new();
static JOB_GUIDS: OnceCell<Mutex<HashMap<String, Uuid>>> = OnceCell::new();
pub type JobExecutor = Arc<dyn Fn() -> Pin<Box<dyn Future<Output = ()> + Send>> + Send + Sync>;
fn scheduler() -> &'static Mutex<JobScheduler> {
SCHEDULER
.get()
.expect("Scheduler not initialized. Call init_and_start() early in main.")
}
fn job_guids() -> &'static Mutex<HashMap<String, Uuid>> {
JOB_GUIDS
.get()
.expect("JOB_GUIDS not initialized. Call init_and_start() early in main.")
}
pub async fn init_and_start() -> anyhow::Result<()> {
if SCHEDULER.get().is_some() {
return Ok(());
}
let js = JobScheduler::new().await?;
SCHEDULER.set(Mutex::new(js)).ok().expect("set scheduler once");
JOB_GUIDS.set(Mutex::new(HashMap::new())).ok().expect("set job_guids once");
// 仅启动调度器,不进行任何数据库加载,数据库加载应由 service 层负责
let lock = scheduler().lock().await;
lock.start().await?;
drop(lock);
info!(target = "udmin", "scheduler started");
Ok(())
}
/// 新增或更新一个任务(根据 model.id 作为唯一标识)。
/// - 如果已存在,则先移除旧任务再按最新 cron 重新创建;
/// - 当 enabled=false 时,仅执行移除逻辑,不会重新添加。
pub async fn add_or_update_job_by_model(m: &schedule_job::Model, executor: JobExecutor) -> anyhow::Result<()> {
// 如果已有旧的 job先移除
if let Some(old) = { job_guids().lock().await.get(&m.id).cloned() } {
let js = scheduler().lock().await;
if let Err(e) = js.remove(&old).await {
error!(target = "udmin", id = %m.id, guid = %old, error = %e, "scheduler.remove old failed");
}
job_guids().lock().await.remove(&m.id);
}
if !m.enabled {
return Ok(());
}
// 构造异步 Job仅负责调度触发不做数据库操作
let cron_expr = m.cron_expr.clone();
let name = m.name.clone();
let job_id = m.id.clone();
let exec_arc = executor.clone();
let job = Job::new_async(cron_expr.as_str(), move |_uuid, _l| {
let job_name = name.clone();
let job_id = job_id.clone();
let exec = exec_arc.clone();
Box::pin(async move {
info!(target = "udmin", job = %job_name, id = %job_id, "scheduler.tick: start");
exec().await;
})
})?;
let guid = job.guid();
{
let js = scheduler().lock().await;
js.add(job).await?;
}
job_guids().lock().await.insert(m.id.clone(), guid);
info!(target = "udmin", id = %m.id, guid = %guid, "scheduler.add: ok");
Ok(())
}
pub async fn remove_job_by_id(id: &str) -> anyhow::Result<()> {
if let Some(g) = { job_guids().lock().await.get(id).cloned() } {
let js = scheduler().lock().await;
if let Err(e) = js.remove(&g).await {
error!(target = "udmin", id = %id, guid = %g, error = %e, "scheduler.remove: failed");
}
job_guids().lock().await.remove(id);
info!(target = "udmin", id = %id, guid = %g, "scheduler.remove: ok");
}
Ok(())
}

View File

@ -16,6 +16,7 @@ import FlowList from './pages/FlowList'
// 引入流程编辑器 // 引入流程编辑器
import { Flows } from './flows' import { Flows } from './flows'
import FlowRunLogs from './pages/FlowRunLogs' import FlowRunLogs from './pages/FlowRunLogs'
import ScheduleJobs from './pages/ScheduleJobs'
function RequireAuth({ children }: { children: any }) { function RequireAuth({ children }: { children: any }) {
const token = getToken() const token = getToken()
@ -43,6 +44,8 @@ export default function App() {
<Route path="/flows/editor" element={<Flows />} /> <Route path="/flows/editor" element={<Flows />} />
{/* 流程运行日志 */} {/* 流程运行日志 */}
<Route path="/flow-run-logs" element={<FlowRunLogs />} /> <Route path="/flow-run-logs" element={<FlowRunLogs />} />
{/* 调度任务管理 */}
<Route path="/schedule-jobs" element={<ScheduleJobs />} />
</Route> </Route>
<Route path="*" element={<Navigate to="/" replace />} /> <Route path="*" element={<Navigate to="/" replace />} />
</Routes> </Routes>

View File

@ -652,3 +652,4 @@ export default function MainLayout() {
</PermissionProvider> </PermissionProvider>
) )
} }
// 说明:菜单完全依赖后端返回的路径,若需要本地添加“调度任务管理”菜单,请在后端创建菜单项 path: '/schedule-jobs',前端会自动展示。

View File

@ -0,0 +1,282 @@
import React, { useEffect, useMemo, useState } from 'react'
import { Button, Form, Input, Modal, Popconfirm, Select, Space, Switch, Table, Tag, message } from 'antd'
import type { ColumnsType } from 'antd/es/table'
import api from '../utils/axios'
import { formatDateTime } from '../utils/datetime'
import PageHeader from '../components/PageHeader'
import { EditOutlined, DeleteOutlined } from '@ant-design/icons'
import { PlusOutlined, ReloadOutlined, StopOutlined, CheckCircleOutlined, PlayCircleOutlined } from '@ant-design/icons'
interface ScheduleJobItem {
id: string
name: string
cron_expr: string
enabled: boolean
flow_code: string
created_at?: string
updated_at?: string
}
interface PageResp<T> { items: T[]; total: number; page: number; page_size: number }
interface FlowOption { label: string; value: string }
export default function ScheduleJobs() {
const [loading, setLoading] = useState(false)
const [data, setData] = useState<ScheduleJobItem[]>([])
const [total, setTotal] = useState(0)
const [page, setPage] = useState(1)
const [pageSize, setPageSize] = useState(10)
const [keyword, setKeyword] = useState('')
const [enabledFilter, setEnabledFilter] = useState<'all' | 'true' | 'false'>('all')
const [modalOpen, setModalOpen] = useState(false)
const [editing, setEditing] = useState<ScheduleJobItem | null>(null)
const [form] = Form.useForm()
const [flowOptions, setFlowOptions] = useState<FlowOption[]>([])
const fetchJobs = async (p: number = page, ps: number = pageSize, kw: string = keyword, ef: 'all' | 'true' | 'false' = enabledFilter) => {
setLoading(true)
try {
const params: any = { page: p, page_size: ps, keyword: kw }
if (ef !== 'all') params.enabled = ef === 'true'
const { data } = await api.get('/schedule_jobs', { params })
if (data?.code === 0) {
const resp = data.data as PageResp<ScheduleJobItem>
setData(Array.isArray(resp.items) ? resp.items : [])
setTotal(Number(resp.total || 0))
setPage(Number(resp.page || p))
setPageSize(Number(resp.page_size || ps))
} else {
throw new Error(data?.message || '获取任务列表失败')
}
} catch (e: any) {
message.error(e.message || '获取任务列表失败')
} finally { setLoading(false) }
}
const fetchFlowOptions = async () => {
try {
const { data } = await api.get('/flows', { params: { page: 1, page_size: 1000 } })
if (data?.code === 0) {
const items = (data.data?.items || []) as any[]
setFlowOptions(items.map(it => ({ label: `${it.name || it.code} (${it.code})`, value: it.code })))
}
} catch (e) {
// ignore silently
}
}
useEffect(() => { fetchJobs(1, pageSize, keyword, enabledFilter) }, [])
const columns: ColumnsType<ScheduleJobItem> = useMemo(() => [
{ title: '名称', dataIndex: 'name', key: 'name' },
{ title: '流程编码', dataIndex: 'flow_code', key: 'flow_code', render: (v: string) => <Tag color="blue">{v}</Tag> },
{ title: 'Cron 表达式', dataIndex: 'cron_expr', key: 'cron_expr', render: (v: string) => <code style={{ fontFamily: 'monospace' }}>{v}</code> },
{ title: '状态', dataIndex: 'enabled', key: 'enabled', render: (v: boolean, r) => (
<Space size={8}>
<Tag color={v ? 'green' : undefined}>{v ? '启用' : '禁用'}</Tag>
</Space>
) },
{ title: '创建时间', dataIndex: 'created_at', key: 'created_at', render: (v?: string) => v ? formatDateTime(v) : '-' },
{ title: '更新时间', dataIndex: 'updated_at', key: 'updated_at', render: (v?: string) => v ? formatDateTime(v) : '-' },
{ title: '操作', key: 'actions', render: (_: any, record) => (
<Space size="small" align="center">
<Button type="link" icon={<EditOutlined />} onClick={() => openEdit(record)}></Button>
<Popconfirm title="确认删除该任务?" onConfirm={() => handleDelete(record)}>
<a className="action-link action-danger">
<DeleteOutlined />
<span></span>
</a>
</Popconfirm>
<a className="action-link" onClick={() => handleExecute(record)}>
<PlayCircleOutlined style={{ color: '#1890ff' }} />
<span></span>
</a>
{record.enabled ? (
<a className="action-link" onClick={() => handleToggle(record, false)}>
<StopOutlined />
<span></span>
</a>
) : (
<a className="action-link" onClick={() => handleToggle(record, true)}>
<CheckCircleOutlined style={{ color: '#52c41a' }} />
<span></span>
</a>
)}
</Space>
) },
], [data])
const openCreate = async () => {
setEditing(null)
form.resetFields()
await fetchFlowOptions()
setModalOpen(true)
}
const openEdit = async (record: ScheduleJobItem) => {
setEditing(record)
await fetchFlowOptions()
form.setFieldsValue({
name: record.name,
flow_code: record.flow_code,
cron_expr: record.cron_expr,
enabled: record.enabled,
})
setModalOpen(true)
}
const handleSubmit = async () => {
try {
const values = await form.validateFields()
if (editing) {
const { data } = await api.put(`/schedule_jobs/${editing.id}`, values)
if (data?.code === 0) {
message.success('更新成功')
setModalOpen(false)
fetchJobs()
} else {
throw new Error(data?.message || '更新失败')
}
} else {
const { data } = await api.post('/schedule_jobs', values)
if (data?.code === 0) {
message.success('创建成功')
setModalOpen(false)
fetchJobs(1, pageSize, keyword, enabledFilter)
} else {
throw new Error(data?.message || '创建失败')
}
}
} catch (e: any) {
if (e?.errorFields) return // 表单校验错误
message.error(e.message || '保存失败')
}
}
const handleDelete = async (record: ScheduleJobItem) => {
try {
const { data } = await api.delete(`/schedule_jobs/${record.id}`)
if (data?.code === 0) {
message.success('删除成功')
const nextPage = data?.data?.deleted ? (data?.data?.remaining === 0 && page > 1 ? page - 1 : page) : page
fetchJobs(nextPage, pageSize, keyword, enabledFilter)
} else {
throw new Error(data?.message || '删除失败')
}
} catch (e: any) {
message.error(e.message || '删除失败')
}
}
const handleToggle = async (record: ScheduleJobItem, next: boolean) => {
try {
const url = next ? `/schedule_jobs/${record.id}/enable` : `/schedule_jobs/${record.id}/disable`
const { data } = await api.post(url)
if (data?.code === 0) {
message.success(next ? '已启用' : '已禁用')
// 就地更新行
setData(prev => prev.map(it => it.id === record.id ? { ...it, enabled: next } : it))
} else {
throw new Error(data?.message || '操作失败')
}
} catch (e: any) {
message.error(e.message || '操作失败')
}
}
const handleExecute = async (record: ScheduleJobItem) => {
try {
const { data } = await api.post(`/schedule_jobs/${record.id}/execute`)
if (data?.code === 0) {
message.success('执行成功')
// 可以在这里显示执行结果或跳转到日志页面
console.log('执行结果:', data.data)
} else {
throw new Error(data?.message || '执行失败')
}
} catch (e: any) {
message.error(e.message || '执行失败')
}
}
const handleSearch = () => {
fetchJobs(1, pageSize, keyword, enabledFilter)
}
return (
<div>
<PageHeader items={["系统管理", "调度任务管理"]} title="" />
<div style={{ background: '#fff', padding: 16, marginBottom: 12 }}>
<Space wrap>
<Input.Search allowClear placeholder="关键字" value={keyword} onChange={e => setKeyword(e.target.value)} onSearch={handleSearch} style={{ width: 280 }} />
<Select
value={enabledFilter}
onChange={(v) => setEnabledFilter(v)}
style={{ width: 160 }}
options={[
{ label: '全部状态', value: 'all' },
{ label: '仅启用', value: 'true' },
{ label: '仅禁用', value: 'false' },
]}
/>
<Button type="primary" onClick={() => fetchJobs(1, pageSize, keyword, enabledFilter)}></Button>
<Button onClick={() => { setKeyword(''); setEnabledFilter('all'); fetchJobs(1, pageSize, '', 'all') }}></Button>
</Space>
</div>
<div style={{ background: '#fff', padding: 16 }}>
<div style={{ marginBottom: 12 }}>
<Space>
<Button type="primary" icon={<PlusOutlined />} onClick={openCreate}></Button>
<Button icon={<ReloadOutlined />} onClick={() => fetchJobs(page, pageSize, keyword, enabledFilter)}></Button>
</Space>
</div>
<Table
rowKey="id"
loading={loading}
dataSource={data}
columns={columns}
pagination={{
current: page,
pageSize,
total,
showSizeChanger: true,
onChange: (p, ps) => fetchJobs(p, ps, keyword, enabledFilter),
}}
/>
</div>
<Modal
open={modalOpen}
title={editing ? '编辑任务' : '新增任务'}
onCancel={() => setModalOpen(false)}
onOk={handleSubmit}
destroyOnClose
okText="保存"
cancelText="取消"
>
<Form form={form} layout="vertical" preserve={false} initialValues={{ enabled: true }}>
<Form.Item label="名称" name="name" rules={[{ required: true, message: '请输入名称' }]}>
<Input placeholder="例如:每小时同步数据" />
</Form.Item>
<Form.Item label="流程编码" name="flow_code" rules={[{ required: true, message: '请选择流程编码' }]}>
<Select
showSearch
placeholder="请选择绑定的流程(使用流程的 code"
options={flowOptions}
filterOption={(input, option) => (option?.label as string).toLowerCase().includes(input.toLowerCase())}
/>
</Form.Item>
<Form.Item label="Cron 表达式" name="cron_expr" rules={[{ required: true, message: '请输入 Cron 表达式' }]}>
<Input placeholder="例如0 * * * * *(每分钟)或 0 0 * * *(每日 0 点)" />
</Form.Item>
<Form.Item label="是否启用" name="enabled" valuePropName="checked">
<Switch />
</Form.Item>
</Form>
</Modal>
</div>
)
}

File diff suppressed because one or more lines are too long