//! 模块:定时任务服务(Service Layer) //! 职责: //! 1) 负责定时任务(schedule_jobs)的数据库增删改查; //! 2) 在创建/更新/删除后与调度器同步; //! 3) 服务启动时加载已启用任务并注册。 use std::{future::Future, pin::Pin, sync::Arc}; use chrono::{DateTime, FixedOffset, Utc}; use sea_orm::{ActiveModelTrait, ColumnTrait, EntityTrait, QueryFilter, QueryOrder, Set}; use tokio_cron_scheduler::Job; use tracing::{error, info}; use crate::{db::Db, error::AppError, models::schedule_job, utils}; /// 通用分页响应体 #[derive(serde::Serialize)] pub struct PageResp { pub items: Vec, pub total: u64, pub page: u64, pub page_size: u64, } /// 任务文档(对外返回 DTO) #[derive(serde::Deserialize, serde::Serialize, Clone, Debug)] pub struct ScheduleJobDoc { pub id: i64, pub name: String, pub cron_expr: String, pub enabled: bool, pub flow_code: String, pub created_at: DateTime, pub updated_at: DateTime, } impl From for ScheduleJobDoc { fn from(m: schedule_job::Model) -> Self { Self { id: m.id, name: m.name, cron_expr: m.cron_expr, enabled: m.enabled, flow_code: m.flow_code, created_at: m.created_at, updated_at: m.updated_at, } } } /// 创建任务请求体 #[derive(serde::Deserialize)] pub struct CreateReq { pub name: String, pub cron_expr: String, pub enabled: bool, pub flow_code: String, } /// 获取当前 UTC 时间并转为固定偏移 fn now_fixed_offset() -> DateTime { Utc::now().with_timezone(&FixedOffset::east_opt(0).unwrap()) } /// 创建任务 pub async fn create(db: &Db, req: CreateReq) -> Result { // 1) 校验 cron 表达式 Job::new_async(&req.cron_expr, |_id, _l| Box::pin(async {})) .map_err(|e| AppError::BadRequest(format!("无效的 cron 表达式: {e}")))?; // 2) 入库 let am = schedule_job::ActiveModel { id: Set(crate::utils::generate_id()), name: Set(req.name), cron_expr: Set(req.cron_expr), enabled: Set(req.enabled), flow_code: Set(req.flow_code), created_at: Set(now_fixed_offset()), updated_at: Set(now_fixed_offset()), }; let m = am.insert(db).await?; // 3) 同步调度器 let executor = build_executor_for_job(db, &m); utils::add_or_update_job_by_model(&m, executor).await.map_err(AppError::Anyhow)?; Ok(m.into()) } /// 构建任务执行闭包(JobExecutor) fn build_executor_for_job(db: &Db, m: &schedule_job::Model) -> utils::JobExecutor { let db = db.clone(); let job_id = m.id; let job_name = m.name.clone(); Arc::new(move || { let db = db.clone(); let job_id = job_id; let job_name = job_name.clone(); Box::pin(async move { match schedule_job::Entity::find_by_id(job_id).one(&db).await { Ok(Some(model)) if !model.enabled => { info!(target = "udmin", job = %job_name, id = %job_id, "scheduler.tick.skip"); return; } Ok(None) => { info!(target = "udmin", job = %job_name, id = %job_id, "scheduler.tick.deleted"); if let Err(e) = utils::remove_job_by_id(&job_id).await { error!(target = "udmin", id = %job_id, error = %e, "scheduler.self_remove.failed"); } return; } Err(e) => { error!(target = "udmin", job = %job_name, id = %job_id, error = %e, "scheduler.tick.error"); return; } _ => {} } info!(target = "udmin", job = %job_name, "scheduler.tick.start"); }) as Pin + Send>> }) }