Files
udmin/.trae/rules/examples/job_service.rs
ayou 8c06849254 feat(调度任务): 实现调度任务管理功能
新增调度任务模块,支持任务的增删改查、启停及手动执行
- 后端添加 schedule_job 模型、服务、路由及调度器工具类
- 前端新增调度任务管理页面
- 修改 flow 相关接口将 id 类型从 String 改为 i64
- 添加 tokio-cron-scheduler 依赖实现定时任务调度
- 初始化时加载已启用任务并注册到调度器
2025-09-24 00:21:30 +08:00

125 lines
3.9 KiB
Rust
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

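//! Module: scheduled-job service (service layer).
//! Responsibilities:
//! 1) create, read, update and delete scheduled jobs (`schedule_jobs`) in the database;
//! 2) keep the scheduler in sync after a job is created, updated or deleted;
//! 3) load the enabled jobs and register them when the service starts.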
use std::{future::Future, pin::Pin, sync::Arc};
use chrono::{DateTime, FixedOffset, Utc};
use sea_orm::{ActiveModelTrait, ColumnTrait, EntityTrait, QueryFilter, QueryOrder, Set};
use tokio_cron_scheduler::Job;
use tracing::{error, info};
use crate::{db::Db, error::AppError, models::schedule_job, utils};
/// Generic paginated response body.
///
/// Only `Serialize` is derived, so this type is meant to be written out
/// (e.g. as an API response), not parsed. `T` is the element type of the page.
#[derive(serde::Serialize)]
pub struct PageResp<T> {
/// Items contained in this page.
pub items: Vec<T>,
/// Total item count — presumably across all pages, not just this one;
/// confirm against the query that fills it.
pub total: u64,
/// Page number of this response (whether it is 0- or 1-based is not visible here).
pub page: u64,
/// Page size used for this response.
pub page_size: u64,
}
/// Scheduled-job document: the DTO returned to callers.
///
/// Produced from `schedule_job::Model` via the `From` impl in this file,
/// which moves every field across unchanged.
#[derive(serde::Deserialize, serde::Serialize, Clone, Debug)]
pub struct ScheduleJobDoc {
/// Job identifier (the model's `id`).
pub id: i64,
/// Job name.
pub name: String,
/// Cron expression describing the job's schedule.
pub cron_expr: String,
/// Whether the job is enabled.
pub enabled: bool,
/// Flow code — presumably identifies the flow this job triggers; confirm with the scheduler code.
pub flow_code: String,
/// Creation timestamp.
pub created_at: DateTime<FixedOffset>,
/// Last-update timestamp.
pub updated_at: DateTime<FixedOffset>,
}
/// Conversion from the persisted `schedule_job::Model` to the outward-facing DTO.
impl From<schedule_job::Model> for ScheduleJobDoc {
    /// Takes the model apart and moves each column into the DTO field of the
    /// same name; no value is transformed along the way.
    fn from(m: schedule_job::Model) -> Self {
        let schedule_job::Model {
            id,
            name,
            cron_expr,
            enabled,
            flow_code,
            created_at,
            updated_at,
            ..
        } = m;

        Self {
            id,
            name,
            cron_expr,
            enabled,
            flow_code,
            created_at,
            updated_at,
        }
    }
}
/// Request body for creating a scheduled job.
///
/// Only `Deserialize` is derived. None of the fields is optional or carries a
/// serde default, so a payload has to supply all four.
#[derive(serde::Deserialize)]
pub struct CreateReq {
/// Job name.
pub name: String,
/// Cron expression; `create` validates it before anything is written to the database.
pub cron_expr: String,
/// Whether the job should be enabled once created.
pub enabled: bool,
/// Flow code — presumably identifies the flow this job triggers; confirm with the scheduler code.
pub flow_code: String,
}
/// 获取当前 UTC 时间并转为固定偏移
fn now_fixed_offset() -> DateTime<FixedOffset> {
Utc::now().with_timezone(&FixedOffset::east_opt(0).unwrap())
}
/// Creates a scheduled job.
///
/// Steps, in order:
/// 1. validate `req.cron_expr` by building a throwaway no-op scheduler job from it;
/// 2. insert the row, with `created_at` and `updated_at` set to the same instant;
/// 3. register the new job with the scheduler.
///
/// # Errors
/// - `AppError::BadRequest` if the cron expression is rejected by `Job::new_async`.
/// - The `AppError` produced by `?` if the database insert fails.
/// - `AppError::Anyhow` if scheduler registration fails. The row has already
///   been inserted at that point and is not rolled back here.
pub async fn create(db: &Db, req: CreateReq) -> Result<ScheduleJobDoc, AppError> {
    // 1) Validate the cron expression. The job built here is never scheduled;
    //    it exists only so that the scheduler library parses the expression.
    Job::new_async(&req.cron_expr, |_id, _l| Box::pin(async {}))
        .map_err(|e| AppError::BadRequest(format!("无效的 cron 表达式: {e}")))?;

    // 2) Persist. Read the clock once so that a freshly created row has
    //    `created_at == updated_at` instead of two slightly different instants.
    let now = now_fixed_offset();
    let am = schedule_job::ActiveModel {
        id: Set(utils::generate_id()),
        name: Set(req.name),
        cron_expr: Set(req.cron_expr),
        enabled: Set(req.enabled),
        flow_code: Set(req.flow_code),
        created_at: Set(now),
        updated_at: Set(now),
    };
    let m = am.insert(db).await?;

    // 3) Sync the scheduler with the stored row.
    let executor = build_executor_for_job(db, &m);
    utils::add_or_update_job_by_model(&m, executor)
        .await
        .map_err(AppError::Anyhow)?;

    Ok(m.into())
}
/// 构建任务执行闭包JobExecutor
fn build_executor_for_job(db: &Db, m: &schedule_job::Model) -> utils::JobExecutor {
let db = db.clone();
let job_id = m.id;
let job_name = m.name.clone();
Arc::new(move || {
let db = db.clone();
let job_id = job_id;
let job_name = job_name.clone();
Box::pin(async move {
match schedule_job::Entity::find_by_id(job_id).one(&db).await {
Ok(Some(model)) if !model.enabled => {
info!(target = "udmin", job = %job_name, id = %job_id, "scheduler.tick.skip");
return;
}
Ok(None) => {
info!(target = "udmin", job = %job_name, id = %job_id, "scheduler.tick.deleted");
if let Err(e) = utils::remove_job_by_id(&job_id).await {
error!(target = "udmin", id = %job_id, error = %e, "scheduler.self_remove.failed");
}
return;
}
Err(e) => {
error!(target = "udmin", job = %job_name, id = %job_id, error = %e, "scheduler.tick.error");
return;
}
_ => {}
}
info!(target = "udmin", job = %job_name, "scheduler.tick.start");
}) as Pin<Box<dyn Future<Output = ()> + Send>>
})
}