diff --git a/.trae/rules/examples/job_service.rs b/.trae/rules/examples/job_service.rs new file mode 100644 index 0000000..64410fd --- /dev/null +++ b/.trae/rules/examples/job_service.rs @@ -0,0 +1,124 @@ +//! 模块:定时任务服务(Service Layer) +//! 职责: +//! 1) 负责定时任务(schedule_jobs)的数据库增删改查; +//! 2) 在创建/更新/删除后与调度器同步; +//! 3) 服务启动时加载已启用任务并注册。 + +use std::{future::Future, pin::Pin, sync::Arc}; + +use chrono::{DateTime, FixedOffset, Utc}; +use sea_orm::{ActiveModelTrait, ColumnTrait, EntityTrait, QueryFilter, QueryOrder, Set}; +use tokio_cron_scheduler::Job; +use tracing::{error, info}; + +use crate::{db::Db, error::AppError, models::schedule_job, utils}; + +/// 通用分页响应体 +#[derive(serde::Serialize)] +pub struct PageResp { + pub items: Vec, + pub total: u64, + pub page: u64, + pub page_size: u64, +} + +/// 任务文档(对外返回 DTO) +#[derive(serde::Deserialize, serde::Serialize, Clone, Debug)] +pub struct ScheduleJobDoc { + pub id: i64, + pub name: String, + pub cron_expr: String, + pub enabled: bool, + pub flow_code: String, + pub created_at: DateTime, + pub updated_at: DateTime, +} + +impl From for ScheduleJobDoc { + fn from(m: schedule_job::Model) -> Self { + Self { + id: m.id, + name: m.name, + cron_expr: m.cron_expr, + enabled: m.enabled, + flow_code: m.flow_code, + created_at: m.created_at, + updated_at: m.updated_at, + } + } +} + +/// 创建任务请求体 +#[derive(serde::Deserialize)] +pub struct CreateReq { + pub name: String, + pub cron_expr: String, + pub enabled: bool, + pub flow_code: String, +} + +/// 获取当前 UTC 时间并转为固定偏移 +fn now_fixed_offset() -> DateTime { + Utc::now().with_timezone(&FixedOffset::east_opt(0).unwrap()) +} + +/// 创建任务 +pub async fn create(db: &Db, req: CreateReq) -> Result { + // 1) 校验 cron 表达式 + Job::new_async(&req.cron_expr, |_id, _l| Box::pin(async {})) + .map_err(|e| AppError::BadRequest(format!("无效的 cron 表达式: {e}")))?; + + // 2) 入库 + let am = schedule_job::ActiveModel { + id: Set(crate::utils::generate_id()), + name: Set(req.name), + cron_expr: Set(req.cron_expr), + enabled: Set(req.enabled), + flow_code: Set(req.flow_code), + created_at: Set(now_fixed_offset()), + updated_at: Set(now_fixed_offset()), + }; + let m = am.insert(db).await?; + + // 3) 同步调度器 + let executor = build_executor_for_job(db, &m); + utils::add_or_update_job_by_model(&m, executor).await.map_err(AppError::Anyhow)?; + + Ok(m.into()) +} + +/// 构建任务执行闭包(JobExecutor) +fn build_executor_for_job(db: &Db, m: &schedule_job::Model) -> utils::JobExecutor { + let db = db.clone(); + let job_id = m.id; + let job_name = m.name.clone(); + + Arc::new(move || { + let db = db.clone(); + let job_id = job_id; + let job_name = job_name.clone(); + + Box::pin(async move { + match schedule_job::Entity::find_by_id(job_id).one(&db).await { + Ok(Some(model)) if !model.enabled => { + info!(target = "udmin", job = %job_name, id = %job_id, "scheduler.tick.skip"); + return; + } + Ok(None) => { + info!(target = "udmin", job = %job_name, id = %job_id, "scheduler.tick.deleted"); + if let Err(e) = utils::remove_job_by_id(&job_id).await { + error!(target = "udmin", id = %job_id, error = %e, "scheduler.self_remove.failed"); + } + return; + } + Err(e) => { + error!(target = "udmin", job = %job_name, id = %job_id, error = %e, "scheduler.tick.error"); + return; + } + _ => {} + } + + info!(target = "udmin", job = %job_name, "scheduler.tick.start"); + }) as Pin + Send>> + }) +} diff --git a/.trae/rules/project_rules.md b/.trae/rules/project_rules.md new file mode 100644 index 0000000..20d08f4 --- /dev/null +++ b/.trae/rules/project_rules.md @@ -0,0 +1,152 @@ +# Rust 代码风格规范(用于 AI 生成代码规则) + +## 1. 基础风格 + +* 使用 **Rust 2021 edition**。 +* 缩进统一 **4 空格**,不使用 Tab。 +* 每行代码长度建议不超过 **100 字符**。 +* **花括号风格**: + + ```rust + fn example() { + // good + } + ``` +* 表达式尽量简洁,必要时换行,参数链式调用时 **缩进对齐**。 + +--- + +## 2. 模块与导入 + +* 模块顶部导入,按以下顺序分组,组间空一行: + + 1. **标准库 (`std::..`)** + 2. **第三方库 (`chrono`, `sea_orm`, `tokio` 等)** + 3. **本地 crate (`crate::..`)** +* 统一使用 **显式导入**,禁止 `use super::*` 或 `use crate::*`。 +* 相同模块导入合并: + + ```rust + use sea_orm::{EntityTrait, QueryFilter, ColumnTrait}; + ``` + +--- + +## 3. 命名规则 + +* **模块 / 文件名**:`snake_case` +* **函数 / 变量名**:`snake_case` +* **结构体 / 枚举名**:`PascalCase` +* **常量**:`UPPER_CASE` +* **DTO/请求体/响应体**后缀:`Doc` / `Req` / `Resp` + +示例: + +```rust +pub struct ScheduleJobDoc { .. } +pub struct CreateReq { .. } +pub struct PageResp { .. } +``` + +--- + +## 4. 文档与注释 + +* 每个 **模块** 顶部使用 `//!` 写模块职责说明。 +* 每个 **公开函数** 必须有 `///` 注释,简述用途与主要逻辑。 +* 内部复杂逻辑使用 `//` 单行注释解释。 +* 中文注释优先,避免英文缩写晦涩难懂。 + +示例: + +```rust +/// 创建任务: +/// - 校验 cron 表达式 +/// - 校验唯一性 +/// - 入库后注册调度器 +pub async fn create(..) -> Result<..> { .. } +``` + +--- + +## 5. 错误处理 + +* 错误类型统一用 **自定义错误枚举**(如 `AppError`)。 +* 不直接 `unwrap()` / `expect()`,统一返回 `Result`。 +* 错误信息应清晰且面向用户,内部日志保留技术细节。 + +--- + +## 6. 日志规范 + +* 使用 `tracing` 库,必须带 `target`。 +* 统一格式:`模块.操作.状态` +* 日志字段使用 `key = %value` 或 `key = ?value`,避免拼接字符串。 + +示例: + +```rust +info!(target = "udmin", id = %job_id, enabled = %enabled, "schedule_jobs.update.persisted"); +error!(target = "udmin", id = %job_id, error = %e, "schedule_jobs.run.failed"); +``` + +--- + +## 7. 异步与数据库 + +* 使用 `async fn`,返回 `Result`。 +* SeaORM 查询使用链式写法,**按字段过滤**时一行一个 filter。 +* 分页/排序明确写出,不隐式。 + +示例: + +```rust +let jobs = schedule_job::Entity::find() + .filter(schedule_job::Column::Enabled.eq(true)) + .order_by_desc(schedule_job::Column::UpdatedAt) + .paginate(db, page_size); +``` + +--- + +## 8. 结构体组织 + +* DTO / 请求体 / 响应体 放在模块前部。 +* Service 函数按生命周期顺序:`list → create → update → remove → init`。 +* 工具函数(如 `build_executor_for_job`、`now_fixed_offset`)放在模块最后。 + +--- + +## 9. 闭包与异步执行器 + +* 使用 `Arc::new(move || { Box::pin(async move { .. }) })` 形式。 +* 避免重复 clone,大对象提前 clone 一次再 move 进闭包。 +* 返回 `Pin + Send>>`。 + +--- + +## 10. 时间与 ID + +* 时间统一用 `chrono::Utc::now().with_timezone(&FixedOffset::east_opt(0).unwrap())`,可封装成 `now_fixed_offset()`。 +* ID 统一使用 `id: Set(crate::utils::generate_id())`。 + +--- + +## 11. 统一返回体 + +* 分页接口统一返回 `PageResp`。 +* 单条数据返回 DTO(如 `ScheduleJobDoc`)。 +* 删除接口返回 `Result<(), AppError>`。 + +--- + +## 12. 代码整洁性 + +* 避免嵌套过深,必要时提前 `return`。 +* 冗余 clone 使用 `.clone()` 仅在必须时。 +* 枚举 / match 分支完整,必要时加 `_ => {}` 显式忽略。 + +--- + +⚡ 总结: +生成的代码必须 **简洁、清晰、分组有序、日志一致、错误优雅**,看起来像经验丰富的 Rust 高手写的生产级代码。 diff --git a/backend/Cargo.lock b/backend/Cargo.lock index 8d63ecb..a7f4bb5 100644 --- a/backend/Cargo.lock +++ b/backend/Cargo.lock @@ -661,6 +661,15 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "croner" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c344b0690c1ad1c7176fe18eb173e0c927008fdaaa256e40dfd43ddd149c0843" +dependencies = [ + "chrono", +] + [[package]] name = "crossbeam-queue" version = "0.3.12" @@ -1763,6 +1772,17 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "51d515d32fb182ee37cda2ccdcb92950d6a3c2893aa280e540671c2cd0f3b1d9" +[[package]] +name = "num-derive" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ed3955f1a9c7c0c15e092f9c887db08b1fc683305fdf6eb6684f22555355e202" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.106", +] + [[package]] name = "num-integer" version = "0.1.46" @@ -3637,6 +3657,21 @@ dependencies = [ "windows-sys 0.59.0", ] +[[package]] +name = "tokio-cron-scheduler" +version = "0.14.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c71ce8f810abc9fabebccc30302a952f9e89c6cf246fafaf170fef164063141" +dependencies = [ + "chrono", + "croner", + "num-derive", + "num-traits", + "tokio", + "tracing", + "uuid", +] + [[package]] name = "tokio-macros" version = "2.5.0" @@ -3926,6 +3961,7 @@ dependencies = [ "sha2", "thiserror", "tokio", + "tokio-cron-scheduler", "tokio-stream", "tower", "tower-http", diff --git a/backend/Cargo.toml b/backend/Cargo.toml index d05b756..705ddce 100644 --- a/backend/Cargo.toml +++ b/backend/Cargo.toml @@ -39,6 +39,7 @@ regex = "1.11.2" reqwest = { version = "0.12.23", features = ["json", "rustls-tls-native-roots"], default-features = false } futures = "0.3.31" percent-encoding = "2.3" +tokio-cron-scheduler = "0.14.0" # 新增: QuickJS 运行时用于 JS 执行器(不启用额外特性) rquickjs = "0.9.0" # 新增: 用于将 mpsc::Receiver 封装为 Stream(SSE) diff --git a/backend/migration/src/lib.rs b/backend/migration/src/lib.rs index d7b6bc9..dacd2f1 100644 --- a/backend/migration/src/lib.rs +++ b/backend/migration/src/lib.rs @@ -23,6 +23,8 @@ mod m20220101_000016_add_unique_index_to_flows_code; mod m20220101_000017_create_flow_run_logs; // 新增:为 flow_run_logs 添加 flow_code 列 mod m20220101_000018_add_flow_code_to_flow_run_logs; +// 新增:计划任务表 +mod m20220101_000019_create_schedule_jobs; pub struct Migrator; @@ -55,6 +57,8 @@ impl MigratorTrait for Migrator { Box::new(m20220101_000017_create_flow_run_logs::Migration), // 新增:为 flow_run_logs 添加 flow_code 列 Box::new(m20220101_000018_add_flow_code_to_flow_run_logs::Migration), + // 新增:计划任务表 + Box::new(m20220101_000019_create_schedule_jobs::Migration), ] } } \ No newline at end of file diff --git a/backend/migration/src/m20220101_000019_create_schedule_jobs.rs b/backend/migration/src/m20220101_000019_create_schedule_jobs.rs new file mode 100644 index 0000000..400b9cd --- /dev/null +++ b/backend/migration/src/m20220101_000019_create_schedule_jobs.rs @@ -0,0 +1,42 @@ +use sea_orm_migration::prelude::*; + +#[derive(DeriveMigrationName)] +pub struct Migration; + +#[async_trait::async_trait] +impl MigrationTrait for Migration { + async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> { + manager + .create_table( + Table::create() + .table(ScheduleJobs::Table) + .if_not_exists() + .col(ColumnDef::new(ScheduleJobs::Id).string_len(64).not_null().primary_key()) + .col(ColumnDef::new(ScheduleJobs::Name).string().not_null()) + .col(ColumnDef::new(ScheduleJobs::CronExpr).string().not_null()) + .col(ColumnDef::new(ScheduleJobs::Enabled).boolean().not_null().default(false)) + .col(ColumnDef::new(ScheduleJobs::FlowCode).string().not_null()) + .col(ColumnDef::new(ScheduleJobs::CreatedAt).timestamp().not_null().default(Expr::current_timestamp())) + .col(ColumnDef::new(ScheduleJobs::UpdatedAt).timestamp().not_null().default(Expr::current_timestamp())) + .to_owned() + ) + .await?; + Ok(()) + } + + async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> { + manager.drop_table(Table::drop().table(ScheduleJobs::Table).to_owned()).await + } +} + +#[derive(Iden)] +enum ScheduleJobs { + Table, + Id, + Name, + CronExpr, + Enabled, + FlowCode, + CreatedAt, + UpdatedAt, +} \ No newline at end of file diff --git a/backend/src/flow/log_handler.rs b/backend/src/flow/log_handler.rs index dd5b525..fd0e5db 100644 --- a/backend/src/flow/log_handler.rs +++ b/backend/src/flow/log_handler.rs @@ -11,19 +11,19 @@ use crate::db::Db; #[async_trait] pub trait FlowLogHandler: Send + Sync { /// 记录流程开始执行 - async fn log_start(&self, flow_id: &str, flow_code: Option<&str>, input: &Value, operator: Option<(i64, String)>) -> anyhow::Result<()>; + async fn log_start(&self, flow_id: i64, flow_code: Option<&str>, input: &Value, operator: Option<(i64, String)>) -> anyhow::Result<()>; /// 记录流程执行失败(仅包含错误信息) - async fn log_error(&self, flow_id: &str, flow_code: Option<&str>, input: &Value, error_msg: &str, operator: Option<(i64, String)>, started_at: DateTime, duration_ms: i64) -> anyhow::Result<()>; + async fn log_error(&self, flow_id: i64, flow_code: Option<&str>, input: &Value, error_msg: &str, operator: Option<(i64, String)>, started_at: DateTime, duration_ms: i64) -> anyhow::Result<()>; /// 记录流程执行失败(包含部分输出与累计日志) - async fn log_error_detail(&self, _flow_id: &str, _flow_code: Option<&str>, _input: &Value, _output: &Value, _logs: &[String], error_msg: &str, _operator: Option<(i64, String)>, _started_at: DateTime, _duration_ms: i64) -> anyhow::Result<()> { + async fn log_error_detail(&self, _flow_id: i64, _flow_code: Option<&str>, _input: &Value, _output: &Value, _logs: &[String], error_msg: &str, _operator: Option<(i64, String)>, _started_at: DateTime, _duration_ms: i64) -> anyhow::Result<()> { // 默认实现:退化为仅错误信息 self.log_error(_flow_id, _flow_code, _input, error_msg, _operator, _started_at, _duration_ms).await } /// 记录流程执行成功 - async fn log_success(&self, flow_id: &str, flow_code: Option<&str>, input: &Value, output: &Value, logs: &[String], operator: Option<(i64, String)>, started_at: DateTime, duration_ms: i64) -> anyhow::Result<()>; + async fn log_success(&self, flow_id: i64, flow_code: Option<&str>, input: &Value, output: &Value, logs: &[String], operator: Option<(i64, String)>, started_at: DateTime, duration_ms: i64) -> anyhow::Result<()>; /// 推送节点执行事件(仅SSE实现需要) async fn emit_node_event(&self, _node_id: &str, _event_type: &str, _data: &Value) -> anyhow::Result<()> { @@ -51,15 +51,15 @@ impl DatabaseLogHandler { #[async_trait] impl FlowLogHandler for DatabaseLogHandler { - async fn log_start(&self, _flow_id: &str, _flow_code: Option<&str>, _input: &Value, _operator: Option<(i64, String)>) -> anyhow::Result<()> { + async fn log_start(&self, _flow_id: i64, _flow_code: Option<&str>, _input: &Value, _operator: Option<(i64, String)>) -> anyhow::Result<()> { // 数据库日志处理器不需要记录开始事件,只在结束时记录 Ok(()) } - async fn log_error(&self, flow_id: &str, flow_code: Option<&str>, input: &Value, error_msg: &str, operator: Option<(i64, String)>, started_at: DateTime, duration_ms: i64) -> anyhow::Result<()> { + async fn log_error(&self, flow_id: i64, flow_code: Option<&str>, input: &Value, error_msg: &str, operator: Option<(i64, String)>, started_at: DateTime, duration_ms: i64) -> anyhow::Result<()> { let (user_id, username) = operator.map(|(u, n)| (Some(u), Some(n))).unwrap_or((None, None)); flow_run_log_service::create(&self.db, CreateRunLogInput { - flow_id: flow_id.to_string(), + flow_id, flow_code: flow_code.map(|s| s.to_string()), input: Some(serde_json::to_string(input).unwrap_or_default()), output: None, @@ -73,7 +73,7 @@ impl FlowLogHandler for DatabaseLogHandler { Ok(()) } - async fn log_error_detail(&self, flow_id: &str, flow_code: Option<&str>, input: &Value, output: &Value, logs: &[String], error_msg: &str, operator: Option<(i64, String)>, started_at: DateTime, duration_ms: i64) -> anyhow::Result<()> { + async fn log_error_detail(&self, flow_id: i64, flow_code: Option<&str>, input: &Value, output: &Value, logs: &[String], error_msg: &str, operator: Option<(i64, String)>, started_at: DateTime, duration_ms: i64) -> anyhow::Result<()> { let (user_id, username) = operator.map(|(u, n)| (Some(u), Some(n))).unwrap_or((None, None)); // 将 error_msg 附加到日志尾部(若最后一条不同),确保日志中有清晰的错误描述且不重复 let mut all_logs = logs.to_vec(); @@ -81,7 +81,7 @@ impl FlowLogHandler for DatabaseLogHandler { all_logs.push(error_msg.to_string()); } flow_run_log_service::create(&self.db, CreateRunLogInput { - flow_id: flow_id.to_string(), + flow_id, flow_code: flow_code.map(|s| s.to_string()), input: Some(serde_json::to_string(input).unwrap_or_default()), output: Some(serde_json::to_string(output).unwrap_or_default()), @@ -95,10 +95,10 @@ impl FlowLogHandler for DatabaseLogHandler { Ok(()) } - async fn log_success(&self, flow_id: &str, flow_code: Option<&str>, input: &Value, output: &Value, logs: &[String], operator: Option<(i64, String)>, started_at: DateTime, duration_ms: i64) -> anyhow::Result<()> { + async fn log_success(&self, flow_id: i64, flow_code: Option<&str>, input: &Value, output: &Value, logs: &[String], operator: Option<(i64, String)>, started_at: DateTime, duration_ms: i64) -> anyhow::Result<()> { let (user_id, username) = operator.map(|(u, n)| (Some(u), Some(n))).unwrap_or((None, None)); flow_run_log_service::create(&self.db, CreateRunLogInput { - flow_id: flow_id.to_string(), + flow_id, flow_code: flow_code.map(|s| s.to_string()), input: Some(serde_json::to_string(input).unwrap_or_default()), output: Some(serde_json::to_string(output).unwrap_or_default()), @@ -127,19 +127,19 @@ impl SseLogHandler { #[async_trait] impl FlowLogHandler for SseLogHandler { - async fn log_start(&self, _flow_id: &str, _flow_code: Option<&str>, _input: &Value, _operator: Option<(i64, String)>) -> anyhow::Result<()> { + async fn log_start(&self, _flow_id: i64, _flow_code: Option<&str>, _input: &Value, _operator: Option<(i64, String)>) -> anyhow::Result<()> { // SSE处理器也不需要记录开始事件 Ok(()) } - async fn log_error(&self, flow_id: &str, flow_code: Option<&str>, input: &Value, error_msg: &str, operator: Option<(i64, String)>, started_at: DateTime, duration_ms: i64) -> anyhow::Result<()> { + async fn log_error(&self, flow_id: i64, flow_code: Option<&str>, input: &Value, error_msg: &str, operator: Option<(i64, String)>, started_at: DateTime, duration_ms: i64) -> anyhow::Result<()> { // 先推送SSE错误事件(不在此处发送 done,交由调用方统一携带 ctx/logs 发送) crate::middlewares::sse::emit_error(&self.event_tx, error_msg.to_string()).await; // 然后记录到数据库(仅错误信息) let (user_id, username) = operator.map(|(u, n)| (Some(u), Some(n))).unwrap_or((None, None)); flow_run_log_service::create(&self.db, CreateRunLogInput { - flow_id: flow_id.to_string(), + flow_id, flow_code: flow_code.map(|s| s.to_string()), input: Some(serde_json::to_string(input).unwrap_or_default()), output: None, @@ -153,7 +153,7 @@ impl FlowLogHandler for SseLogHandler { Ok(()) } - async fn log_error_detail(&self, flow_id: &str, flow_code: Option<&str>, input: &Value, output: &Value, logs: &[String], error_msg: &str, operator: Option<(i64, String)>, started_at: DateTime, duration_ms: i64) -> anyhow::Result<()> { + async fn log_error_detail(&self, flow_id: i64, flow_code: Option<&str>, input: &Value, output: &Value, logs: &[String], error_msg: &str, operator: Option<(i64, String)>, started_at: DateTime, duration_ms: i64) -> anyhow::Result<()> { // 先推送SSE错误事件(不在此处发送 done,交由调用方统一携带 ctx/logs 发送) crate::middlewares::sse::emit_error(&self.event_tx, error_msg.to_string()).await; @@ -164,7 +164,7 @@ impl FlowLogHandler for SseLogHandler { all_logs.push(error_msg.to_string()); } flow_run_log_service::create(&self.db, CreateRunLogInput { - flow_id: flow_id.to_string(), + flow_id, flow_code: flow_code.map(|s| s.to_string()), input: Some(serde_json::to_string(input).unwrap_or_default()), output: Some(serde_json::to_string(output).unwrap_or_default()), @@ -178,14 +178,14 @@ impl FlowLogHandler for SseLogHandler { Ok(()) } - async fn log_success(&self, flow_id: &str, flow_code: Option<&str>, input: &Value, output: &Value, logs: &[String], operator: Option<(i64, String)>, started_at: DateTime, duration_ms: i64) -> anyhow::Result<()> { + async fn log_success(&self, flow_id: i64, flow_code: Option<&str>, input: &Value, output: &Value, logs: &[String], operator: Option<(i64, String)>, started_at: DateTime, duration_ms: i64) -> anyhow::Result<()> { // 先推送SSE完成事件 crate::middlewares::sse::emit_done(&self.event_tx, true, output.clone(), logs.to_vec()).await; // 然后记录到数据库 let (user_id, username) = operator.map(|(u, n)| (Some(u), Some(n))).unwrap_or((None, None)); flow_run_log_service::create(&self.db, CreateRunLogInput { - flow_id: flow_id.to_string(), + flow_id, flow_code: flow_code.map(|s| s.to_string()), input: Some(serde_json::to_string(input).unwrap_or_default()), output: Some(serde_json::to_string(output).unwrap_or_default()), diff --git a/backend/src/main.rs b/backend/src/main.rs index 32ef5b6..e577135 100644 --- a/backend/src/main.rs +++ b/backend/src/main.rs @@ -70,6 +70,14 @@ async fn main() -> anyhow::Result<()> { // run migrations migration::Migrator::up(&db, None).await.expect("migration up"); + // 初始化并启动调度器(仅启动,不加载DB) + if let Err(e) = crate::utils::init_scheduler().await { tracing::error!(target = "udmin", error = %e, "init scheduler failed"); } + + // 由 service 层加载启用任务并注册到调度器 + if let Err(e) = services::schedule_job_service::init_load_enabled_and_register(&db).await { + tracing::error!(target = "udmin", error = %e, "init schedule jobs failed"); + } + let allow_origins = std::env::var("CORS_ALLOW_ORIGINS").unwrap_or_else(|_| "http://localhost:5173".into()); let origin_values: Vec = allow_origins .split(',') diff --git a/backend/src/middlewares/sse.rs b/backend/src/middlewares/sse.rs index 02519f1..63a051b 100644 --- a/backend/src/middlewares/sse.rs +++ b/backend/src/middlewares/sse.rs @@ -158,7 +158,7 @@ struct RunReq { #[serde(default)] input: serde_json::Value } async fn run_sse( State(db): State, - Path(id): Path, + Path(id): Path, Query(q): Query>, headers: HeaderMap, Json(req): Json, @@ -192,11 +192,11 @@ async fn run_sse( // 启动后台任务运行流程,将事件通过 tx 发送 let db_clone = db.clone(); - let id_clone = id.clone(); + let id_clone = id; let input = req.input.clone(); let user_info = Some((claims.uid, claims.sub)); tokio::spawn(async move { - let _ = flow_service::run_with_stream(db_clone, &id_clone, flow_service::RunReq { input }, user_info, tx).await; + let _ = flow_service::run_with_stream(db_clone, id_clone, flow_service::RunReq { input }, user_info, tx).await; }); // 由通用组件把 Receiver 包装为 SSE 响应 diff --git a/backend/src/middlewares/ws.rs b/backend/src/middlewares/ws.rs index 7041a55..81ed83e 100644 --- a/backend/src/middlewares/ws.rs +++ b/backend/src/middlewares/ws.rs @@ -42,7 +42,7 @@ use axum::extract::ws::{WebSocketUpgrade, WebSocket, Message, Utf8Bytes}; pub async fn run_ws( State(db): State, - Path(id): Path, + Path(id): Path, Query(q): Query>, headers: HeaderMap, ws: WebSocketUpgrade, @@ -70,7 +70,7 @@ pub async fn run_ws( } let db_clone = db.clone(); - let id_clone = id.clone(); + let id_clone = id; let user_info = Some((claims.uid, claims.sub)); Ok(ws.on_upgrade(move |socket| async move { @@ -78,7 +78,7 @@ pub async fn run_ws( })) } -pub async fn handle_ws_flow(mut socket: WebSocket, db: Db, id: String, user_info: Option<(i64, String)>) { +pub async fn handle_ws_flow(mut socket: WebSocket, db: Db, id: i64, user_info: Option<(i64, String)>) { use tokio::time::{timeout, Duration}; use tokio::select; use tokio::sync::mpsc; @@ -106,9 +106,9 @@ pub async fn handle_ws_flow(mut socket: WebSocket, db: Db, id: String, user_info // 后台运行流程 let db2 = db.clone(); - let id2 = id.clone(); + let id2 = id; tokio::spawn(async move { - let _ = flow_service::run_with_stream(db2, &id2, flow_service::RunReq { input: input_value }, user_info, tx).await; + let _ = flow_service::run_with_stream(db2, id2, flow_service::RunReq { input: input_value }, user_info, tx).await; }); // 转发事件到 WebSocket diff --git a/backend/src/models/flow.rs b/backend/src/models/flow.rs index 698b7c2..1329ed3 100644 --- a/backend/src/models/flow.rs +++ b/backend/src/models/flow.rs @@ -4,7 +4,7 @@ use sea_orm::entity::prelude::*; #[sea_orm(table_name = "flows")] pub struct Model { #[sea_orm(primary_key)] - pub id: String, + pub id: i64, pub name: Option, pub yaml: Option, pub design_json: Option, diff --git a/backend/src/models/flow_run_log.rs b/backend/src/models/flow_run_log.rs index ce336cd..21cdcc4 100644 --- a/backend/src/models/flow_run_log.rs +++ b/backend/src/models/flow_run_log.rs @@ -5,7 +5,7 @@ use sea_orm::entity::prelude::*; pub struct Model { #[sea_orm(primary_key)] pub id: i64, - pub flow_id: String, + pub flow_id: i64, // 新增:流程编码(可空) pub flow_code: Option, pub input: Option, diff --git a/backend/src/models/mod.rs b/backend/src/models/mod.rs index e0deac4..9747100 100644 --- a/backend/src/models/mod.rs +++ b/backend/src/models/mod.rs @@ -11,4 +11,5 @@ pub mod request_log; pub mod position; pub mod user_position; pub mod flow; -pub mod flow_run_log; \ No newline at end of file +pub mod flow_run_log; +pub mod schedule_job; \ No newline at end of file diff --git a/backend/src/models/schedule_job.rs b/backend/src/models/schedule_job.rs new file mode 100644 index 0000000..bff43dc --- /dev/null +++ b/backend/src/models/schedule_job.rs @@ -0,0 +1,19 @@ +use sea_orm::entity::prelude::*; + +#[derive(Clone, Debug, PartialEq, Eq, DeriveEntityModel, serde::Serialize, serde::Deserialize)] +#[sea_orm(table_name = "schedule_jobs")] +pub struct Model { + #[sea_orm(primary_key)] + pub id: String, + pub name: String, + pub cron_expr: String, + pub enabled: bool, + pub flow_code: String, + pub created_at: DateTimeWithTimeZone, + pub updated_at: DateTimeWithTimeZone, +} + +#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] +pub enum Relation {} + +impl ActiveModelBehavior for ActiveModel {} \ No newline at end of file diff --git a/backend/src/routes/dynamic_api.rs b/backend/src/routes/dynamic_api.rs index 4fcee99..d014a26 100644 --- a/backend/src/routes/dynamic_api.rs +++ b/backend/src/routes/dynamic_api.rs @@ -28,7 +28,7 @@ async fn execute_flow( let flow_id = flow_doc.id.clone(); info!(target = "udmin", "dynamic_api.execute_flow: found flow id={} for code={}", flow_id, flow_code); - match flow_service::run(&db, &flow_id, flow_service::RunReq { input: payload }, Some((0, "接口".to_string()))).await { + match flow_service::run(&db, flow_id, flow_service::RunReq { input: payload }, Some((0, "接口".to_string()))).await { Ok(result) => { info!(target = "udmin", "dynamic_api.execute_flow: execution successful flow_code={}", flow_code); // 仅返回上下文中的 http_resp / http_response,如果不存在则返回空对象 {} diff --git a/backend/src/routes/flows.rs b/backend/src/routes/flows.rs index 9d619f2..474c505 100644 --- a/backend/src/routes/flows.rs +++ b/backend/src/routes/flows.rs @@ -83,26 +83,26 @@ struct UpdateReq { yaml: Option, design_json: Option, Ok(Json(ApiResponse::ok(res))) } - async fn update(State(db): State, Path(id): Path, Json(req): Json) -> Result>, AppError> { - let res = flow_service::update(&db, &id, flow_service::FlowUpdateReq { yaml: req.yaml, design_json: req.design_json, name: req.name, code: req.code, remark: req.remark }).await.map_err(flow_service::ae)?; + async fn update(State(db): State, Path(id): Path, Json(req): Json) -> Result>, AppError> { + let res = flow_service::update(&db, id, flow_service::FlowUpdateReq { yaml: req.yaml, design_json: req.design_json, name: req.name, code: req.code, remark: req.remark }).await.map_err(flow_service::ae)?; Ok(Json(ApiResponse::ok(res))) } -async fn get_one(State(db): State, Path(id): Path) -> Result>, AppError> { - let res = flow_service::get(&db, &id).await.map_err(flow_service::ae)?; +async fn get_one(State(db): State, Path(id): Path) -> Result>, AppError> { + let res = flow_service::get(&db, id).await.map_err(flow_service::ae)?; Ok(Json(ApiResponse::ok(res))) } -async fn remove(State(db): State, Path(id): Path) -> Result>, AppError> { - flow_service::delete(&db, &id).await.map_err(flow_service::ae)?; +async fn remove(State(db): State, Path(id): Path) -> Result>, AppError> { + flow_service::delete(&db, id).await.map_err(flow_service::ae)?; Ok(Json(ApiResponse::ok(serde_json::json!({"deleted": true})))) } #[derive(Deserialize)] struct RunReq { #[serde(default)] input: serde_json::Value } -async fn run(State(db): State, user: AuthUser, Path(id): Path, Json(req): Json) -> Result>, AppError> { - match flow_service::run(&db, &id, flow_service::RunReq { input: req.input }, Some((user.uid, user.username))).await { +async fn run(State(db): State, user: AuthUser, Path(id): Path, Json(req): Json) -> Result>, AppError> { + match flow_service::run(&db, id, flow_service::RunReq { input: req.input }, Some((user.uid, user.username))).await { Ok(r) => Ok(Json(ApiResponse::ok(r))), Err(e) => { // 同步执行:直接把后端错误详细信息返回给前端 @@ -114,18 +114,18 @@ async fn run(State(db): State, user: AuthUser, Path(id): Path, Json( } // 新增:SSE 流式运行端点,请求体沿用 RunReq(只包含 input) -async fn run_stream(State(db): State, user: AuthUser, Path(id): Path, Json(req): Json) -> Result>>, AppError> { +async fn run_stream(State(db): State, user: AuthUser, Path(id): Path, Json(req): Json) -> Result>>, AppError> { // 建立 mpsc 通道用于接收引擎的流式事件 let (tx, rx) = tokio::sync::mpsc::channel::(16); // 启动后台任务运行流程,将事件通过 tx 发送 let db_clone = db.clone(); - let id_clone = id.clone(); + let id_clone = id; let input = req.input.clone(); let user_info = Some((user.uid, user.username)); tokio::spawn(async move { // 复用 flow_service::run 内部大部分逻辑,但通过 DriveOptions 注入 event_tx - let _ = flow_service::run_with_stream(db_clone, &id_clone, flow_service::RunReq { input }, user_info, tx).await; + let _ = flow_service::run_with_stream(db_clone, id_clone, flow_service::RunReq { input }, user_info, tx).await; }); // 由通用组件把 Receiver 包装为 SSE 响应 @@ -136,7 +136,7 @@ async fn run_stream(State(db): State, user: AuthUser, Path(id): Path async fn run_ws( State(db): State, - Path(id): Path, + Path(id): Path, Query(q): Query>, headers: HeaderMap, ws: WebSocketUpgrade, diff --git a/backend/src/routes/mod.rs b/backend/src/routes/mod.rs index 21185b5..3eb8aff 100644 --- a/backend/src/routes/mod.rs +++ b/backend/src/routes/mod.rs @@ -8,6 +8,7 @@ pub mod logs; pub mod flows; pub mod flow_run_logs; pub mod dynamic_api; +pub mod schedule_jobs; use axum::Router; use crate::db::Db; @@ -24,4 +25,5 @@ pub fn api_router() -> Router { .merge(positions::router()) .merge(flow_run_logs::router()) .merge(dynamic_api::router()) + .merge(schedule_jobs::router()) } \ No newline at end of file diff --git a/backend/src/routes/schedule_jobs.rs b/backend/src/routes/schedule_jobs.rs new file mode 100644 index 0000000..906659e --- /dev/null +++ b/backend/src/routes/schedule_jobs.rs @@ -0,0 +1,76 @@ +use axum::{Router, routing::{get, post, put}, extract::{State, Path, Query}, Json}; +use crate::{db::Db, error::AppError, response::ApiResponse, services::{schedule_job_service, flow_service}, models::schedule_job}; +use crate::middlewares::jwt::AuthUser; +use sea_orm::{EntityTrait}; + +pub fn router() -> Router { + Router::new() + .route("/schedule_jobs", get(list).post(create)) + .route("/schedule_jobs/{id}", put(update).delete(remove)) + // 新增:独立启停端点 + .route("/schedule_jobs/{id}/enable", post(enable)) + .route("/schedule_jobs/{id}/disable", post(disable)) + // 新增:手动执行端点 + .route("/schedule_jobs/{id}/execute", post(execute)) +} + +async fn list(State(db): State, Query(p): Query) -> Result>>, AppError> { + let res = schedule_job_service::list(&db, p).await?; + Ok(Json(ApiResponse::ok(res))) +} + +#[derive(serde::Deserialize)] +struct CreateReq { name: String, cron_expr: String, enabled: bool, flow_code: String } + +async fn create(State(db): State, _user: AuthUser, Json(req): Json) -> Result>, AppError> { + let res = schedule_job_service::create(&db, schedule_job_service::CreateReq { name: req.name, cron_expr: req.cron_expr, enabled: req.enabled, flow_code: req.flow_code }).await?; + Ok(Json(ApiResponse::ok(res))) +} + +#[derive(serde::Deserialize)] +struct UpdateReq { name: Option, cron_expr: Option, enabled: Option, flow_code: Option } + +async fn update(State(db): State, _user: AuthUser, Path(id): Path, Json(req): Json) -> Result>, AppError> { + let res = schedule_job_service::update(&db, &id, schedule_job_service::UpdateReq { name: req.name, cron_expr: req.cron_expr, enabled: req.enabled, flow_code: req.flow_code }).await?; + Ok(Json(ApiResponse::ok(res))) +} + +async fn remove(State(db): State, _user: AuthUser, Path(id): Path) -> Result>, AppError> { + schedule_job_service::remove(&db, &id).await?; + Ok(Json(ApiResponse::ok(serde_json::json!({})))) +} + +// 新增:启用指定任务(不需要请求体) +async fn enable(State(db): State, _user: AuthUser, Path(id): Path) -> Result>, AppError> { + let res = schedule_job_service::update(&db, &id, schedule_job_service::UpdateReq { name: None, cron_expr: None, enabled: Some(true), flow_code: None }).await?; + Ok(Json(ApiResponse::ok(res))) +} + +// 新增:禁用指定任务(不需要请求体) +async fn disable(State(db): State, _user: AuthUser, Path(id): Path) -> Result>, AppError> { + let res = schedule_job_service::update(&db, &id, schedule_job_service::UpdateReq { name: None, cron_expr: None, enabled: Some(false), flow_code: None }).await?; + Ok(Json(ApiResponse::ok(res))) +} + +// 新增:手动执行指定任务 +async fn execute(State(db): State, user: AuthUser, Path(id): Path) -> Result>, AppError> { + // 1) 获取任务信息 + let job = schedule_job::Entity::find_by_id(id.to_string()) + .one(&db) + .await? + .ok_or(AppError::NotFound)?; + + // 2) 通过 flow_code 获取流程 + let flow_doc = flow_service::get_by_code(&db, &job.flow_code).await + .map_err(flow_service::ae)?; + + // 3) 执行流程(使用空输入,操作者为当前用户) + let result = flow_service::run( + &db, + flow_doc.id, + flow_service::RunReq { input: serde_json::json!({}) }, + Some((user.uid, user.username)), + ).await.map_err(flow_service::ae)?; + + Ok(Json(ApiResponse::ok(serde_json::to_value(result).map_err(|e| AppError::BadRequest(e.to_string()))?))) +} \ No newline at end of file diff --git a/backend/src/services/flow_run_log_service.rs b/backend/src/services/flow_run_log_service.rs index d7ebeba..2eb68cb 100644 --- a/backend/src/services/flow_run_log_service.rs +++ b/backend/src/services/flow_run_log_service.rs @@ -6,12 +6,12 @@ use chrono::{DateTime, FixedOffset, Utc}; pub struct PageResp { pub items: Vec, pub total: u64, pub page: u64, pub page_size: u64 } #[derive(serde::Deserialize)] -pub struct ListParams { pub page: Option, pub page_size: Option, pub flow_id: Option, pub flow_code: Option, pub user: Option, pub ok: Option } +pub struct ListParams { pub page: Option, pub page_size: Option, pub flow_id: Option, pub flow_code: Option, pub user: Option, pub ok: Option } #[derive(serde::Serialize)] pub struct RunLogItem { pub id: i64, - pub flow_id: String, + pub flow_id: i64, pub flow_code: Option, pub input: Option, pub output: Option, @@ -30,7 +30,7 @@ impl From for RunLogItem { #[derive(serde::Serialize, serde::Deserialize, Clone, Debug)] pub struct CreateRunLogInput { - pub flow_id: String, + pub flow_id: i64, pub flow_code: Option, pub input: Option, pub output: Option, diff --git a/backend/src/services/flow_service.rs b/backend/src/services/flow_service.rs index 131f968..cb81405 100644 --- a/backend/src/services/flow_service.rs +++ b/backend/src/services/flow_service.rs @@ -17,7 +17,7 @@ use crate::flow::engine::DriveError; #[derive(Debug, Clone, Serialize, Deserialize)] pub struct FlowSummary { - pub id: String, + pub id: i64, pub name: String, #[serde(skip_serializing_if = "Option::is_none")] pub code: Option, #[serde(skip_serializing_if = "Option::is_none")] pub remark: Option, @@ -27,7 +27,7 @@ pub struct FlowSummary { } #[derive(Debug, Clone, Serialize, Deserialize)] pub struct FlowDoc { - pub id: String, + pub id: i64, pub yaml: String, #[serde(skip_serializing_if = "Option::is_none")] pub design_json: Option, #[serde(skip_serializing_if = "Option::is_none")] pub name: Option, @@ -51,9 +51,15 @@ pub async fn list(db: &Db, page: u64, page_size: u64, keyword: Option) - let mut selector = db_flow::Entity::find(); if let Some(k) = keyword.filter(|s| !s.is_empty()) { let like = format!("%{}%", k); + // 名称模糊匹配 + 若关键字可解析为数字则按ID精确匹配 selector = selector.filter( db_flow::Column::Name.like(like.clone()) - .or(db_flow::Column::Id.like(like)) + .or( + match k.parse::() { + Ok(num) => db_flow::Column::Id.eq(num), + Err(_) => db_flow::Column::Name.like(like), + } + ) ); } let paginator = selector.order_by_desc(db_flow::Column::CreatedAt).paginate(db, page_size); @@ -61,13 +67,13 @@ pub async fn list(db: &Db, page: u64, page_size: u64, keyword: Option) - let models = paginator.fetch_page(if page > 0 { page - 1 } else { 0 }).await?; let mut items: Vec = Vec::with_capacity(models.len()); for row in models.into_iter() { - let id = row.id.clone(); + let id = row.id; let name = row .name .clone() .or_else(|| row.yaml.as_deref().and_then(extract_name)) .unwrap_or_else(|| { - let prefix: String = id.chars().take(8).collect(); + let prefix: String = id.to_string().chars().take(8).collect(); format!("flow_{}", prefix) }); // 最近修改人:从请求日志中查找最近一次对该flow的PUT请求 @@ -98,7 +104,7 @@ pub async fn create(db: &Db, req: FlowCreateReq) -> anyhow::Result { let _parsed: FlowDSL = serde_yaml::from_str(yaml).context("invalid flow yaml")?; info!(target: "udmin", "flow.create: yaml parsed ok"); } - let id = crate::utils::generate_flow_id(); + let id: i64 = crate::utils::generate_id(); let name = req .name .clone() @@ -110,7 +116,7 @@ pub async fn create(db: &Db, req: FlowCreateReq) -> anyhow::Result { let ret_code = req.code.clone(); let ret_remark = req.remark.clone(); let am = db_flow::ActiveModel { - id: Set(id.clone()), + id: Set(id), name: Set(name.clone()), yaml: Set(req.yaml.clone()), design_json: Set(design_json_str), @@ -122,16 +128,14 @@ pub async fn create(db: &Db, req: FlowCreateReq) -> anyhow::Result { ..Default::default() }; info!(target: "udmin", "flow.create: inserting into db id={}", id); - // Use exec() instead of insert() returning Model to avoid RecordNotInserted on non-AI PK match db_flow::Entity::insert(am).exec(db).await { Ok(_) => { info!(target: "udmin", "flow.create: insert ok id={}", id); Ok(FlowDoc { id, yaml: req.yaml.unwrap_or_default(), design_json: req.design_json, name: ret_name, code: ret_code, remark: ret_remark }) } Err(DbErr::RecordNotInserted) => { - // Workaround for MySQL + non-auto-increment PK: verify by reading back error!(target: "udmin", "flow.create: insert returned RecordNotInserted, verifying by select id={}", id); - match db_flow::Entity::find_by_id(id.clone()).one(db).await { + match db_flow::Entity::find_by_id(id).one(db).await { Ok(Some(_)) => { info!(target: "udmin", "flow.create: found inserted row by id={}, treating as success", id); Ok(FlowDoc { id, yaml: req.yaml.unwrap_or_default(), design_json: req.design_json, name, code: req.code, remark: req.remark }) @@ -147,12 +151,11 @@ pub async fn create(db: &Db, req: FlowCreateReq) -> anyhow::Result { } } -pub async fn get(db: &Db, id: &str) -> anyhow::Result { - let row = db_flow::Entity::find_by_id(id.to_string()).one(db).await?; +pub async fn get(db: &Db, id: i64) -> anyhow::Result { + let row = db_flow::Entity::find_by_id(id).one(db).await?; let row = row.ok_or_else(|| anyhow::anyhow!("not found"))?; let yaml = row.yaml.unwrap_or_default(); let design_json = row.design_json.and_then(|s| serde_json::from_str::(&s).ok()); - // 名称兜底:数据库 name 为空时,尝试从 YAML 提取 let name = row .name .clone() @@ -176,11 +179,11 @@ pub async fn get_by_code(db: &Db, code: &str) -> anyhow::Result { Ok(FlowDoc { id: row.id, yaml, design_json, name, code: row.code, remark: row.remark }) } -pub async fn update(db: &Db, id: &str, req: FlowUpdateReq) -> anyhow::Result { +pub async fn update(db: &Db, id: i64, req: FlowUpdateReq) -> anyhow::Result { if let Some(yaml) = &req.yaml { let _parsed: FlowDSL = serde_yaml::from_str(yaml).context("invalid flow yaml")?; } - let row = db_flow::Entity::find_by_id(id.to_string()).one(db).await?; + let row = db_flow::Entity::find_by_id(id).one(db).await?; let Some(row) = row else { return Err(anyhow::anyhow!("not found")); }; let mut am: db_flow::ActiveModel = row.into(); @@ -192,45 +195,36 @@ pub async fn update(db: &Db, id: &str, req: FlowUpdateReq) -> anyhow::Result(&s).ok()); - Ok(FlowDoc { id: id.to_string(), yaml: got.yaml.unwrap_or_default(), design_json: dj, name: got.name, code: got.code, remark: got.remark }) + Ok(FlowDoc { id, yaml: got.yaml.unwrap_or_default(), design_json: dj, name: got.name, code: got.code, remark: got.remark }) } -pub async fn delete(db: &Db, id: &str) -> anyhow::Result<()> { - let row = db_flow::Entity::find_by_id(id.to_string()).one(db).await?; +pub async fn delete(db: &Db, id: i64) -> anyhow::Result<()> { + let row = db_flow::Entity::find_by_id(id).one(db).await?; let Some(row) = row else { return Err(anyhow::anyhow!("not found")); }; let am: db_flow::ActiveModel = row.into(); am.delete(db).await?; Ok(()) } -pub async fn run(db: &Db, id: &str, req: RunReq, operator: Option<(i64, String)>) -> anyhow::Result { +pub async fn run(db: &Db, id: i64, req: RunReq, operator: Option<(i64, String)>) -> anyhow::Result { let log_handler = DatabaseLogHandler::new(db.clone()); match run_internal(db, id, req, operator, &log_handler, None).await { Ok((ctx, logs)) => Ok(RunResult { ok: true, ctx, logs }), Err(e) => { - // 将运行期错误转换为 ok=false,并尽量带上部分 ctx/logs if let Some(de) = e.downcast_ref::().cloned() { Ok(RunResult { ok: false, ctx: de.ctx, logs: de.logs }) } else { let mut full = e.to_string(); - for cause in e.chain().skip(1) { - full.push_str(" | "); - full.push_str(&cause.to_string()); - } + for cause in e.chain().skip(1) { full.push_str(" | "); full.push_str(&cause.to_string()); } Ok(RunResult { ok: false, ctx: serde_json::json!({}), logs: vec![full] }) } } @@ -240,18 +234,16 @@ pub async fn run(db: &Db, id: &str, req: RunReq, operator: Option<(i64, String)> // 新增:流式运行,向外发送节点事件与最终完成事件 pub async fn run_with_stream( db: Db, - id: &str, + id: i64, req: RunReq, operator: Option<(i64, String)>, event_tx: Sender, ) -> anyhow::Result<()> { - // clone 一份用于错误时补发 done let tx_done = event_tx.clone(); let log_handler = SseLogHandler::new(db.clone(), event_tx.clone()); match run_internal(&db, id, req, operator, &log_handler, Some(event_tx)).await { - Ok((_ctx, _logs)) => Ok(()), // 正常路径:log_success 内已发送 done(true,...) + Ok((_ctx, _logs)) => Ok(()), Err(e) => { - // 错误路径:先在 log_error 中已发送 error 事件;此处补发 done(false,...) if let Some(de) = e.downcast_ref::().cloned() { crate::middlewares::sse::emit_done(&tx_done, false, de.ctx, de.logs).await; } else { @@ -267,23 +259,17 @@ pub async fn run_with_stream( // 内部统一的运行方法 async fn run_internal( db: &Db, - id: &str, + id: i64, req: RunReq, operator: Option<(i64, String)>, log_handler: &dyn FlowLogHandler, - event_tx: Option>, + event_tx: Option>, ) -> anyhow::Result<(serde_json::Value, Vec)> { - // 使用传入的 event_tx(当启用 SSE 时由路由层提供) info!(target = "udmin", "flow.run_internal: start id={}", id); let start = Utc::now().with_timezone(&FixedOffset::east_opt(0).unwrap()); - - // 获取流程编码 - let flow_code: Option = match db_flow::Entity::find_by_id(id.to_string()).one(db).await { - Ok(Some(row)) => row.code, - _ => None, - }; - // 获取流程文档 + let flow_code: Option = match db_flow::Entity::find_by_id(id).one(db).await { Ok(Some(row)) => row.code, _ => None }; + let doc = match get(db, id).await { Ok(d) => d, Err(e) => { @@ -380,7 +366,6 @@ async fn run_internal( Err(e) => { error!(target = "udmin", error = ?e, "flow.run_internal: engine drive failed id={}", id); let dur = (Utc::now().with_timezone(&FixedOffset::east_opt(0).unwrap()) - start).num_milliseconds() as i64; - // 优先记录详细错误(包含部分 ctx 与累计 logs) if let Some(de) = e.downcast_ref::().cloned() { log_handler .log_error_detail( diff --git a/backend/src/services/mod.rs b/backend/src/services/mod.rs index 107a698..2bba363 100644 --- a/backend/src/services/mod.rs +++ b/backend/src/services/mod.rs @@ -3,8 +3,8 @@ pub mod user_service; pub mod role_service; pub mod menu_service; pub mod department_service; -pub mod log_service; -// 新增岗位服务 pub mod position_service; +pub mod log_service; pub mod flow_service; -pub mod flow_run_log_service; \ No newline at end of file +pub mod flow_run_log_service; +pub mod schedule_job_service; \ No newline at end of file diff --git a/backend/src/services/schedule_job_service.rs b/backend/src/services/schedule_job_service.rs new file mode 100644 index 0000000..0094861 --- /dev/null +++ b/backend/src/services/schedule_job_service.rs @@ -0,0 +1,315 @@ +//! 模块:定时任务服务(Service Layer) +//! 职责: +//! 1) 负责定时任务(schedule_jobs)的数据库增删改查; +//! 2) 在创建/更新/删除后,与调度器进行同步(注册、更新、移除); +//! 3) 服务启动时加载已启用任务并注册到调度器; +//! 4) 构建任务执行闭包(JobExecutor),内部做运行期防御与流程触发。 + +use std::{future::Future, pin::Pin, sync::Arc}; + +use chrono::{DateTime, FixedOffset, Utc}; +use sea_orm::{ + ActiveModelTrait, ColumnTrait, EntityTrait, PaginatorTrait, QueryFilter, QueryOrder, Set, +}; +use tokio_cron_scheduler::Job; +use tracing::{error, info}; + +use crate::{ + db::Db, + error::AppError, + models::schedule_job, + services::flow_service, + utils, +}; + +/// 通用分页响应体 +#[derive(serde::Serialize)] +pub struct PageResp { + pub items: Vec, + pub total: u64, + pub page: u64, + pub page_size: u64, +} + +/// 任务文档(DTO) +#[derive(serde::Deserialize, serde::Serialize, Clone, Debug)] +pub struct ScheduleJobDoc { + pub id: String, + pub name: String, + pub cron_expr: String, + pub enabled: bool, + pub flow_code: String, + pub created_at: DateTime, + pub updated_at: DateTime, +} + +impl From for ScheduleJobDoc { + fn from(m: schedule_job::Model) -> Self { + Self { + id: m.id, + name: m.name, + cron_expr: m.cron_expr, + enabled: m.enabled, + flow_code: m.flow_code, + created_at: m.created_at, + updated_at: m.updated_at, + } + } +} + +/// 列表查询参数 +#[derive(serde::Deserialize)] +pub struct ListParams { + pub page: Option, + pub page_size: Option, + pub keyword: Option, + pub enabled: Option, +} + +/// 创建任务请求体 +#[derive(serde::Deserialize)] +pub struct CreateReq { + pub name: String, + pub cron_expr: String, + pub enabled: bool, + pub flow_code: String, +} + +/// 更新任务请求体(部分字段可选) +#[derive(serde::Deserialize)] +pub struct UpdateReq { + pub name: Option, + pub cron_expr: Option, + pub enabled: Option, + pub flow_code: Option, +} + +/// 获取当前 UTC 时间并转为固定偏移(避免多处重复) +fn now_fixed_offset() -> DateTime { + Utc::now().with_timezone(&FixedOffset::east_opt(0).unwrap()) +} + +/// 分页查询任务列表,支持按名称关键字与启用状态筛选 +pub async fn list(db: &Db, p: ListParams) -> Result, AppError> { + let page = p.page.unwrap_or(1); + let page_size = p.page_size.unwrap_or(10); + + let mut query = schedule_job::Entity::find(); + if let Some(k) = p.keyword { + query = query.filter(schedule_job::Column::Name.contains(&k)); + } + if let Some(e) = p.enabled { + query = query.filter(schedule_job::Column::Enabled.eq(e)); + } + + let paginator = query + .order_by_desc(schedule_job::Column::UpdatedAt) + .paginate(db, page_size); + + let total = paginator.num_items().await?; + let docs: Vec = paginator + .fetch_page(page.saturating_sub(1)) + .await? + .into_iter() + .map(Into::into) + .collect(); + + let sample: Vec<_> = docs.iter().take(5).map(|d| (d.id.clone(), d.enabled)).collect(); + info!( + target = "udmin", + total = %total, + page = %page, + page_size = %page_size, + sample = ?sample, + "schedule_jobs.list" + ); + + Ok(PageResp { items: docs, total, page, page_size }) +} + +/// 创建任务 +pub async fn create(db: &Db, req: CreateReq) -> Result { + // 1) 校验 cron 表达式 + Job::new_async(&req.cron_expr, |_uuid, _l| Box::pin(async {})) + .map_err(|e| AppError::BadRequest(format!("无效的 cron 表达式: {e}")))?; + + // 2) 校验 flow_code 唯一启用性 + if req.enabled + && schedule_job::Entity::find() + .filter(schedule_job::Column::FlowCode.eq(req.flow_code.clone())) + .filter(schedule_job::Column::Enabled.eq(true)) + .one(db) + .await? + .is_some() + { + return Err(AppError::Conflict("同一 flow_code 已存在启用中的任务".into())); + } + + // 3) 入库 + let am = schedule_job::ActiveModel { + id: Set(uuid::Uuid::new_v4().to_string()), + name: Set(req.name), + cron_expr: Set(req.cron_expr), + enabled: Set(req.enabled), + flow_code: Set(req.flow_code), + created_at: Set(now_fixed_offset()), + updated_at: Set(now_fixed_offset()), + }; + let m = am.insert(db).await?; + + // 4) 同步调度器 + let executor = build_executor_for_job(db, &m); + utils::add_or_update_job_by_model(&m, executor).await.map_err(AppError::Anyhow)?; + + Ok(m.into()) +} + +/// 更新任务 +pub async fn update(db: &Db, id: &str, req: UpdateReq) -> Result { + let m = schedule_job::Entity::find_by_id(id.to_string()) + .one(db) + .await? + .ok_or(AppError::NotFound)?; + + let next_name = req.name.unwrap_or_else(|| m.name.clone()); + let next_cron = req.cron_expr.unwrap_or_else(|| m.cron_expr.clone()); + let next_enabled = req.enabled.unwrap_or(m.enabled); + let next_flow_code = req.flow_code.unwrap_or_else(|| m.flow_code.clone()); + + Job::new_async(&next_cron, |_uuid, _l| Box::pin(async {})) + .map_err(|e| AppError::BadRequest(format!("无效的 cron 表达式: {e}")))?; + + if next_enabled + && schedule_job::Entity::find() + .filter(schedule_job::Column::FlowCode.eq(next_flow_code.clone())) + .filter(schedule_job::Column::Enabled.eq(true)) + .filter(schedule_job::Column::Id.ne(id.to_string())) + .one(db) + .await? + .is_some() + { + return Err(AppError::Conflict("同一 flow_code 已存在启用中的任务".into())); + } + + info!( + target = "udmin", + id = %m.id, + prev_enabled = %m.enabled, + next_enabled = %next_enabled, + "schedule_jobs.update.apply" + ); + + let mut am: schedule_job::ActiveModel = m.into(); + am.name = Set(next_name); + am.cron_expr = Set(next_cron); + am.enabled = Set(next_enabled); + am.flow_code = Set(next_flow_code); + am.updated_at = Set(now_fixed_offset()); + let updated = am.update(db).await?; + + info!( + target = "udmin", + id = %updated.id, + enabled = %updated.enabled, + "schedule_jobs.update.persisted" + ); + + let executor = build_executor_for_job(db, &updated); + utils::add_or_update_job_by_model(&updated, executor).await.map_err(AppError::Anyhow)?; + + Ok(updated.into()) +} + +/// 删除任务 +pub async fn remove(db: &Db, id: &str) -> Result<(), AppError> { + utils::remove_job_by_id(id).await.map_err(AppError::Anyhow)?; + + let res = schedule_job::Entity::delete_by_id(id.to_string()).exec(db).await?; + if res.rows_affected == 0 { + return Err(AppError::NotFound); + } + Ok(()) +} + +/// 启动时加载并注册所有启用任务 +pub async fn init_load_enabled_and_register(db: &Db) -> Result<(), AppError> { + let enabled_jobs = schedule_job::Entity::find() + .filter(schedule_job::Column::Enabled.eq(true)) + .all(db) + .await?; + + info!( + target = "udmin", + count = enabled_jobs.len(), + "schedule_jobs.init.load_enabled" + ); + + for m in enabled_jobs { + let executor = build_executor_for_job(db, &m); + if let Err(e) = utils::add_or_update_job_by_model(&m, executor).await { + error!( + target = "udmin", + id = %m.id, + error = %e, + "schedule_jobs.init.add_failed" + ); + } + } + Ok(()) +} + +/// 构建任务执行闭包(JobExecutor) +fn build_executor_for_job(db: &Db, m: &schedule_job::Model) -> utils::JobExecutor { + let db = db.clone(); + let job_id = m.id.clone(); + let job_name = m.name.clone(); + let flow_code = m.flow_code.clone(); + + Arc::new(move || { + let db = db.clone(); + let job_id = job_id.clone(); + let job_name = job_name.clone(); + let flow_code = flow_code.clone(); + + Box::pin(async move { + // 运行期防御 + match schedule_job::Entity::find_by_id(job_id.clone()).one(&db).await { + Ok(Some(model)) if !model.enabled => { + info!(target = "udmin", job = %job_name, id = %job_id, "scheduler.tick.skip_disabled"); + return; + } + Ok(None) => { + info!(target = "udmin", job = %job_name, id = %job_id, "scheduler.tick.deleted_remove_self"); + if let Err(e) = utils::remove_job_by_id(&job_id).await { + error!(target = "udmin", id = %job_id, error = %e, "scheduler.self_remove.failed"); + } + return; + } + Err(e) => { + error!(target = "udmin", job = %job_name, id = %job_id, error = %e, "scheduler.tick.check_failed"); + return; + } + _ => {} + } + + // 触发流程执行 + info!(target = "udmin", job = %job_name, flow_code = %flow_code, "scheduler.tick.start"); + match flow_service::get_by_code(&db, &flow_code).await { + Ok(doc) => { + let id = doc.id.clone(); + if let Err(e) = flow_service::run( + &db, + id, + flow_service::RunReq { input: serde_json::json!({}) }, + Some((0, "调度".to_string())), + ).await { + error!(target = "udmin", job = %job_name, flow_code = %flow_code, error = %e, "scheduler.tick.run_failed"); + } + } + Err(e) => { + error!(target = "udmin", job = %job_name, flow_code = %flow_code, error = %e, "scheduler.tick.flow_not_found"); + } + } + }) as Pin + Send>> + }) +} \ No newline at end of file diff --git a/backend/src/utils/ids.rs b/backend/src/utils/ids.rs index dc603cc..7806d5f 100644 --- a/backend/src/utils/ids.rs +++ b/backend/src/utils/ids.rs @@ -56,15 +56,13 @@ pub fn parse_biz_id(id: i64) -> (u16, u8, i64) { (main_id, sub_id, base_id) } -// --- 具体业务场景:Flow 使用的一组常量(可按需扩展/调整) --- -// 你可以把这些常量提到配置或用枚举维护各业务的 main/sub 编码 +// --- 具体业务场景:main/sub 为 1/1 的通用 ID 场景 --- const FLOW_MAIN_ID: u16 = 1; const FLOW_SUB_ID: u8 = 1; -/// 生成 Flow 的 ID,返回十进制字符串,便于与原先 string 类型主键兼容 -pub fn generate_flow_id() -> String { - let id = generate_biz_id(BizIdConfig::new(FLOW_MAIN_ID, FLOW_SUB_ID)); - id.to_string() +/// 通用 ID 生成:main_id=1、sub_id=1,返回十进制字符串(与原先 string 类型主键兼容) +pub fn generate_id() -> i64 { + generate_biz_id(BizIdConfig::new(FLOW_MAIN_ID, FLOW_SUB_ID)) } // --- 日志类 ID 的业务位定义与生成 --- diff --git a/backend/src/utils/mod.rs b/backend/src/utils/mod.rs index a050448..e4c6da8 100644 --- a/backend/src/utils/mod.rs +++ b/backend/src/utils/mod.rs @@ -1,4 +1,6 @@ pub mod password; pub mod ids; +pub mod scheduler; -pub use ids::{init_from_env, generate_biz_id, parse_biz_id, generate_flow_id, BizIdConfig, generate_flow_run_log_id, generate_request_log_id}; \ No newline at end of file +pub use ids::{init_from_env, generate_biz_id, parse_biz_id, generate_id, BizIdConfig, generate_flow_run_log_id, generate_request_log_id}; +pub use scheduler::{init_and_start as init_scheduler, add_or_update_job_by_model, remove_job_by_id, JobExecutor}; \ No newline at end of file diff --git a/backend/src/utils/scheduler.rs b/backend/src/utils/scheduler.rs new file mode 100644 index 0000000..4e6bed3 --- /dev/null +++ b/backend/src/utils/scheduler.rs @@ -0,0 +1,99 @@ +use std::collections::HashMap; +use std::future::Future; +use std::pin::Pin; +use std::sync::Arc; +use once_cell::sync::OnceCell; +use tokio::sync::Mutex; +use tokio_cron_scheduler::{JobScheduler, Job}; +use tracing::{error, info}; +use uuid::Uuid; + +use crate::models::schedule_job; + +static SCHEDULER: OnceCell> = OnceCell::new(); +static JOB_GUIDS: OnceCell>> = OnceCell::new(); + +pub type JobExecutor = Arc Pin + Send>> + Send + Sync>; + +fn scheduler() -> &'static Mutex { + SCHEDULER + .get() + .expect("Scheduler not initialized. Call init_and_start() early in main.") +} + +fn job_guids() -> &'static Mutex> { + JOB_GUIDS + .get() + .expect("JOB_GUIDS not initialized. Call init_and_start() early in main.") +} + +pub async fn init_and_start() -> anyhow::Result<()> { + if SCHEDULER.get().is_some() { + return Ok(()); + } + let js = JobScheduler::new().await?; + SCHEDULER.set(Mutex::new(js)).ok().expect("set scheduler once"); + JOB_GUIDS.set(Mutex::new(HashMap::new())).ok().expect("set job_guids once"); + + // 仅启动调度器,不进行任何数据库加载,数据库加载应由 service 层负责 + let lock = scheduler().lock().await; + lock.start().await?; + drop(lock); + info!(target = "udmin", "scheduler started"); + Ok(()) +} + +/// 新增或更新一个任务(根据 model.id 作为唯一标识)。 +/// - 如果已存在,则先移除旧任务再按最新 cron 重新创建; +/// - 当 enabled=false 时,仅执行移除逻辑,不会重新添加。 +pub async fn add_or_update_job_by_model(m: &schedule_job::Model, executor: JobExecutor) -> anyhow::Result<()> { + // 如果已有旧的 job,先移除 + if let Some(old) = { job_guids().lock().await.get(&m.id).cloned() } { + let js = scheduler().lock().await; + if let Err(e) = js.remove(&old).await { + error!(target = "udmin", id = %m.id, guid = %old, error = %e, "scheduler.remove old failed"); + } + job_guids().lock().await.remove(&m.id); + } + + if !m.enabled { + return Ok(()); + } + + // 构造异步 Job(仅负责调度触发,不做数据库操作) + let cron_expr = m.cron_expr.clone(); + let name = m.name.clone(); + let job_id = m.id.clone(); + let exec_arc = executor.clone(); + + let job = Job::new_async(cron_expr.as_str(), move |_uuid, _l| { + let job_name = name.clone(); + let job_id = job_id.clone(); + let exec = exec_arc.clone(); + Box::pin(async move { + info!(target = "udmin", job = %job_name, id = %job_id, "scheduler.tick: start"); + exec().await; + }) + })?; + + let guid = job.guid(); + { + let js = scheduler().lock().await; + js.add(job).await?; + } + job_guids().lock().await.insert(m.id.clone(), guid); + info!(target = "udmin", id = %m.id, guid = %guid, "scheduler.add: ok"); + Ok(()) +} + +pub async fn remove_job_by_id(id: &str) -> anyhow::Result<()> { + if let Some(g) = { job_guids().lock().await.get(id).cloned() } { + let js = scheduler().lock().await; + if let Err(e) = js.remove(&g).await { + error!(target = "udmin", id = %id, guid = %g, error = %e, "scheduler.remove: failed"); + } + job_guids().lock().await.remove(id); + info!(target = "udmin", id = %id, guid = %g, "scheduler.remove: ok"); + } + Ok(()) +} \ No newline at end of file diff --git a/frontend/src/App.tsx b/frontend/src/App.tsx index 88515f3..9e65792 100644 --- a/frontend/src/App.tsx +++ b/frontend/src/App.tsx @@ -16,6 +16,7 @@ import FlowList from './pages/FlowList' // 引入流程编辑器 import { Flows } from './flows' import FlowRunLogs from './pages/FlowRunLogs' +import ScheduleJobs from './pages/ScheduleJobs' function RequireAuth({ children }: { children: any }) { const token = getToken() @@ -43,6 +44,8 @@ export default function App() { } /> {/* 流程运行日志 */} } /> + {/* 调度任务管理 */} + } /> } /> diff --git a/frontend/src/layouts/MainLayout.tsx b/frontend/src/layouts/MainLayout.tsx index 3a55a0c..b83c36e 100644 --- a/frontend/src/layouts/MainLayout.tsx +++ b/frontend/src/layouts/MainLayout.tsx @@ -651,4 +651,5 @@ export default function MainLayout() { ) - } \ No newline at end of file + } + // 说明:菜单完全依赖后端返回的路径,若需要本地添加“调度任务管理”菜单,请在后端创建菜单项 path: '/schedule-jobs',前端会自动展示。 \ No newline at end of file diff --git a/frontend/src/pages/ScheduleJobs.tsx b/frontend/src/pages/ScheduleJobs.tsx new file mode 100644 index 0000000..952be59 --- /dev/null +++ b/frontend/src/pages/ScheduleJobs.tsx @@ -0,0 +1,282 @@ +import React, { useEffect, useMemo, useState } from 'react' +import { Button, Form, Input, Modal, Popconfirm, Select, Space, Switch, Table, Tag, message } from 'antd' +import type { ColumnsType } from 'antd/es/table' +import api from '../utils/axios' +import { formatDateTime } from '../utils/datetime' +import PageHeader from '../components/PageHeader' +import { EditOutlined, DeleteOutlined } from '@ant-design/icons' +import { PlusOutlined, ReloadOutlined, StopOutlined, CheckCircleOutlined, PlayCircleOutlined } from '@ant-design/icons' + +interface ScheduleJobItem { + id: string + name: string + cron_expr: string + enabled: boolean + flow_code: string + created_at?: string + updated_at?: string +} + +interface PageResp { items: T[]; total: number; page: number; page_size: number } + +interface FlowOption { label: string; value: string } + +export default function ScheduleJobs() { + const [loading, setLoading] = useState(false) + const [data, setData] = useState([]) + const [total, setTotal] = useState(0) + const [page, setPage] = useState(1) + const [pageSize, setPageSize] = useState(10) + const [keyword, setKeyword] = useState('') + const [enabledFilter, setEnabledFilter] = useState<'all' | 'true' | 'false'>('all') + + const [modalOpen, setModalOpen] = useState(false) + const [editing, setEditing] = useState(null) + const [form] = Form.useForm() + const [flowOptions, setFlowOptions] = useState([]) + + const fetchJobs = async (p: number = page, ps: number = pageSize, kw: string = keyword, ef: 'all' | 'true' | 'false' = enabledFilter) => { + setLoading(true) + try { + const params: any = { page: p, page_size: ps, keyword: kw } + if (ef !== 'all') params.enabled = ef === 'true' + const { data } = await api.get('/schedule_jobs', { params }) + if (data?.code === 0) { + const resp = data.data as PageResp + setData(Array.isArray(resp.items) ? resp.items : []) + setTotal(Number(resp.total || 0)) + setPage(Number(resp.page || p)) + setPageSize(Number(resp.page_size || ps)) + } else { + throw new Error(data?.message || '获取任务列表失败') + } + } catch (e: any) { + message.error(e.message || '获取任务列表失败') + } finally { setLoading(false) } + } + + const fetchFlowOptions = async () => { + try { + const { data } = await api.get('/flows', { params: { page: 1, page_size: 1000 } }) + if (data?.code === 0) { + const items = (data.data?.items || []) as any[] + setFlowOptions(items.map(it => ({ label: `${it.name || it.code} (${it.code})`, value: it.code }))) + } + } catch (e) { + // ignore silently + } + } + + useEffect(() => { fetchJobs(1, pageSize, keyword, enabledFilter) }, []) + + const columns: ColumnsType = useMemo(() => [ + { title: '名称', dataIndex: 'name', key: 'name' }, + { title: '流程编码', dataIndex: 'flow_code', key: 'flow_code', render: (v: string) => {v} }, + { title: 'Cron 表达式', dataIndex: 'cron_expr', key: 'cron_expr', render: (v: string) => {v} }, + { title: '状态', dataIndex: 'enabled', key: 'enabled', render: (v: boolean, r) => ( + + {v ? '启用' : '禁用'} + + ) }, + { title: '创建时间', dataIndex: 'created_at', key: 'created_at', render: (v?: string) => v ? formatDateTime(v) : '-' }, + { title: '更新时间', dataIndex: 'updated_at', key: 'updated_at', render: (v?: string) => v ? formatDateTime(v) : '-' }, + { title: '操作', key: 'actions', render: (_: any, record) => ( + + + handleDelete(record)}> + + + 删除 + + + handleExecute(record)}> + + 执行 + + {record.enabled ? ( + handleToggle(record, false)}> + + 禁用 + + ) : ( + handleToggle(record, true)}> + + 启用 + + )} + + ) }, + ], [data]) + + const openCreate = async () => { + setEditing(null) + form.resetFields() + await fetchFlowOptions() + setModalOpen(true) + } + + const openEdit = async (record: ScheduleJobItem) => { + setEditing(record) + await fetchFlowOptions() + form.setFieldsValue({ + name: record.name, + flow_code: record.flow_code, + cron_expr: record.cron_expr, + enabled: record.enabled, + }) + setModalOpen(true) + } + + const handleSubmit = async () => { + try { + const values = await form.validateFields() + if (editing) { + const { data } = await api.put(`/schedule_jobs/${editing.id}`, values) + if (data?.code === 0) { + message.success('更新成功') + setModalOpen(false) + fetchJobs() + } else { + throw new Error(data?.message || '更新失败') + } + } else { + const { data } = await api.post('/schedule_jobs', values) + if (data?.code === 0) { + message.success('创建成功') + setModalOpen(false) + fetchJobs(1, pageSize, keyword, enabledFilter) + } else { + throw new Error(data?.message || '创建失败') + } + } + } catch (e: any) { + if (e?.errorFields) return // 表单校验错误 + message.error(e.message || '保存失败') + } + } + + const handleDelete = async (record: ScheduleJobItem) => { + try { + const { data } = await api.delete(`/schedule_jobs/${record.id}`) + if (data?.code === 0) { + message.success('删除成功') + const nextPage = data?.data?.deleted ? (data?.data?.remaining === 0 && page > 1 ? page - 1 : page) : page + fetchJobs(nextPage, pageSize, keyword, enabledFilter) + } else { + throw new Error(data?.message || '删除失败') + } + } catch (e: any) { + message.error(e.message || '删除失败') + } + } + + const handleToggle = async (record: ScheduleJobItem, next: boolean) => { + try { + const url = next ? `/schedule_jobs/${record.id}/enable` : `/schedule_jobs/${record.id}/disable` + const { data } = await api.post(url) + if (data?.code === 0) { + message.success(next ? '已启用' : '已禁用') + // 就地更新行 + setData(prev => prev.map(it => it.id === record.id ? { ...it, enabled: next } : it)) + } else { + throw new Error(data?.message || '操作失败') + } + } catch (e: any) { + message.error(e.message || '操作失败') + } + } + + const handleExecute = async (record: ScheduleJobItem) => { + try { + const { data } = await api.post(`/schedule_jobs/${record.id}/execute`) + if (data?.code === 0) { + message.success('执行成功') + // 可以在这里显示执行结果或跳转到日志页面 + console.log('执行结果:', data.data) + } else { + throw new Error(data?.message || '执行失败') + } + } catch (e: any) { + message.error(e.message || '执行失败') + } + } + + const handleSearch = () => { + fetchJobs(1, pageSize, keyword, enabledFilter) + } + + return ( +
+ +
+ + setKeyword(e.target.value)} onSearch={handleSearch} style={{ width: 280 }} /> + + + + + + + + + + +
+ ) +} \ No newline at end of file diff --git a/frontend/tsconfig.tsbuildinfo b/frontend/tsconfig.tsbuildinfo index 4e74fab..4e47b86 100644 --- a/frontend/tsconfig.tsbuildinfo +++ b/frontend/tsconfig.tsbuildinfo @@ -1 +1 @@ -{"root":["./src/app.tsx","./src/main.tsx","./src/vite-env.d.ts","./src/components/pageheader.tsx","./src/flows/app.tsx","./src/flows/editor.tsx","./src/flows/index.ts","./src/flows/initial-data.ts","./src/flows/type.d.ts","./src/flows/assets/icon-auto-layout.tsx","./src/flows/assets/icon-cancel.tsx","./src/flows/assets/icon-comment.tsx","./src/flows/assets/icon-minimap.tsx","./src/flows/assets/icon-mouse.tsx","./src/flows/assets/icon-pad.tsx","./src/flows/assets/icon-success.tsx","./src/flows/assets/icon-switch-line.tsx","./src/flows/assets/icon-warning.tsx","./src/flows/components/index.ts","./src/flows/components/add-node/index.tsx","./src/flows/components/add-node/use-add-node.ts","./src/flows/components/base-node/index.tsx","./src/flows/components/base-node/node-wrapper.tsx","./src/flows/components/base-node/styles.tsx","./src/flows/components/base-node/utils.ts","./src/flows/components/comment/constant.ts","./src/flows/components/comment/index.ts","./src/flows/components/comment/model.ts","./src/flows/components/comment/type.ts","./src/flows/components/comment/components/blank-area.tsx","./src/flows/components/comment/components/border-area.tsx","./src/flows/components/comment/components/container.tsx","./src/flows/components/comment/components/content-drag-area.tsx","./src/flows/components/comment/components/drag-area.tsx","./src/flows/components/comment/components/editor.tsx","./src/flows/components/comment/components/index.ts","./src/flows/components/comment/components/more-button.tsx","./src/flows/components/comment/components/render.tsx","./src/flows/components/comment/components/resize-area.tsx","./src/flows/components/comment/hooks/index.ts","./src/flows/components/comment/hooks/use-model.ts","./src/flows/components/comment/hooks/use-overflow.ts","./src/flows/components/comment/hooks/use-size.ts","./src/flows/components/group/color.ts","./src/flows/components/group/constant.ts","./src/flows/components/group/index.ts","./src/flows/components/group/components/background.tsx","./src/flows/components/group/components/color.tsx","./src/flows/components/group/components/header.tsx","./src/flows/components/group/components/icon-group.tsx","./src/flows/components/group/components/index.ts","./src/flows/components/group/components/node-render.tsx","./src/flows/components/group/components/title.tsx","./src/flows/components/group/components/tools.tsx","./src/flows/components/group/components/ungroup.tsx","./src/flows/components/group/components/tips/global-store.ts","./src/flows/components/group/components/tips/icon-close.tsx","./src/flows/components/group/components/tips/index.tsx","./src/flows/components/group/components/tips/is-mac-os.ts","./src/flows/components/group/components/tips/style.ts","./src/flows/components/group/components/tips/use-control.ts","./src/flows/components/line-add-button/button.tsx","./src/flows/components/line-add-button/index.tsx","./src/flows/components/line-add-button/use-visible.ts","./src/flows/components/node-menu/index.tsx","./src/flows/components/node-panel/index.tsx","./src/flows/components/node-panel/node-list.tsx","./src/flows/components/node-panel/node-placeholder.tsx","./src/flows/components/selector-box-popover/index.tsx","./src/flows/components/sidebar/index.tsx","./src/flows/components/sidebar/sidebar-node-renderer.tsx","./src/flows/components/sidebar/sidebar-provider.tsx","./src/flows/components/sidebar/sidebar-renderer.tsx","./src/flows/components/testrun/hooks/index.ts","./src/flows/components/testrun/hooks/use-fields.ts","./src/flows/components/testrun/hooks/use-form-meta.ts","./src/flows/components/testrun/hooks/use-sync-default.ts","./src/flows/components/testrun/node-status-bar/index.tsx","./src/flows/components/testrun/node-status-bar/group/index.tsx","./src/flows/components/testrun/node-status-bar/header/index.tsx","./src/flows/components/testrun/node-status-bar/render/index.tsx","./src/flows/components/testrun/node-status-bar/viewer/index.tsx","./src/flows/components/testrun/testrun-button/index.tsx","./src/flows/components/testrun/testrun-form/index.tsx","./src/flows/components/testrun/testrun-form/type.ts","./src/flows/components/testrun/testrun-json-input/index.tsx","./src/flows/components/testrun/testrun-panel/index.tsx","./src/flows/components/tools/auto-layout.tsx","./src/flows/components/tools/comment.tsx","./src/flows/components/tools/fit-view.tsx","./src/flows/components/tools/index.tsx","./src/flows/components/tools/interactive.tsx","./src/flows/components/tools/minimap-switch.tsx","./src/flows/components/tools/minimap.tsx","./src/flows/components/tools/mouse-pad-selector.tsx","./src/flows/components/tools/readonly.tsx","./src/flows/components/tools/save.tsx","./src/flows/components/tools/styles.tsx","./src/flows/components/tools/switch-line.tsx","./src/flows/components/tools/zoom-select.tsx","./src/flows/context/index.ts","./src/flows/context/node-render-context.ts","./src/flows/context/sidebar-context.ts","./src/flows/form-components/feedback.tsx","./src/flows/form-components/index.ts","./src/flows/form-components/form-content/index.tsx","./src/flows/form-components/form-content/styles.tsx","./src/flows/form-components/form-header/index.tsx","./src/flows/form-components/form-header/styles.tsx","./src/flows/form-components/form-header/title-input.tsx","./src/flows/form-components/form-header/utils.tsx","./src/flows/form-components/form-inputs/index.tsx","./src/flows/form-components/form-inputs/styles.tsx","./src/flows/form-components/form-item/index.tsx","./src/flows/hooks/index.ts","./src/flows/hooks/use-editor-props.tsx","./src/flows/hooks/use-is-sidebar.ts","./src/flows/hooks/use-node-render-context.ts","./src/flows/hooks/use-port-click.ts","./src/flows/nodes/constants.ts","./src/flows/nodes/default-form-meta.tsx","./src/flows/nodes/index.ts","./src/flows/nodes/block-end/form-meta.tsx","./src/flows/nodes/block-end/index.ts","./src/flows/nodes/block-start/form-meta.tsx","./src/flows/nodes/block-start/index.ts","./src/flows/nodes/break/form-meta.tsx","./src/flows/nodes/break/index.ts","./src/flows/nodes/code/form-meta.tsx","./src/flows/nodes/code/index.tsx","./src/flows/nodes/code/types.tsx","./src/flows/nodes/code/components/code.tsx","./src/flows/nodes/code/components/inputs.tsx","./src/flows/nodes/code/components/outputs.tsx","./src/flows/nodes/comment/index.tsx","./src/flows/nodes/condition/form-meta.tsx","./src/flows/nodes/condition/index.ts","./src/flows/nodes/condition/condition-inputs/index.tsx","./src/flows/nodes/condition/condition-inputs/styles.tsx","./src/flows/nodes/continue/form-meta.tsx","./src/flows/nodes/continue/index.ts","./src/flows/nodes/db/form-meta.tsx","./src/flows/nodes/db/index.tsx","./src/flows/nodes/end/form-meta.tsx","./src/flows/nodes/end/index.ts","./src/flows/nodes/group/index.tsx","./src/flows/nodes/http/form-meta.tsx","./src/flows/nodes/http/index.tsx","./src/flows/nodes/http/types.tsx","./src/flows/nodes/http/components/api.tsx","./src/flows/nodes/http/components/body.tsx","./src/flows/nodes/http/components/headers.tsx","./src/flows/nodes/http/components/params.tsx","./src/flows/nodes/http/components/timeout.tsx","./src/flows/nodes/llm/index.ts","./src/flows/nodes/loop/form-meta.tsx","./src/flows/nodes/loop/index.ts","./src/flows/nodes/start/form-meta.tsx","./src/flows/nodes/start/index.ts","./src/flows/nodes/variable/form-meta.tsx","./src/flows/nodes/variable/index.tsx","./src/flows/nodes/variable/types.tsx","./src/flows/plugins/index.ts","./src/flows/plugins/context-menu-plugin/context-menu-layer.tsx","./src/flows/plugins/context-menu-plugin/context-menu-plugin.ts","./src/flows/plugins/context-menu-plugin/index.ts","./src/flows/plugins/runtime-plugin/create-runtime-plugin.ts","./src/flows/plugins/runtime-plugin/index.ts","./src/flows/plugins/runtime-plugin/type.ts","./src/flows/plugins/runtime-plugin/client/base-client.ts","./src/flows/plugins/runtime-plugin/client/index.ts","./src/flows/plugins/runtime-plugin/client/browser-client/index.ts","./src/flows/plugins/runtime-plugin/client/server-client/constant.ts","./src/flows/plugins/runtime-plugin/client/server-client/index.ts","./src/flows/plugins/runtime-plugin/client/server-client/type.ts","./src/flows/plugins/runtime-plugin/runtime-service/index.ts","./src/flows/plugins/variable-panel-plugin/index.ts","./src/flows/plugins/variable-panel-plugin/variable-panel-layer.tsx","./src/flows/plugins/variable-panel-plugin/variable-panel-plugin.ts","./src/flows/plugins/variable-panel-plugin/components/full-variable-list.tsx","./src/flows/plugins/variable-panel-plugin/components/global-variable-editor.tsx","./src/flows/plugins/variable-panel-plugin/components/variable-panel.tsx","./src/flows/services/custom-service.ts","./src/flows/services/index.ts","./src/flows/shortcuts/constants.ts","./src/flows/shortcuts/index.ts","./src/flows/shortcuts/shortcuts.ts","./src/flows/shortcuts/type.ts","./src/flows/shortcuts/collapse/index.ts","./src/flows/shortcuts/copy/index.ts","./src/flows/shortcuts/delete/index.ts","./src/flows/shortcuts/expand/index.ts","./src/flows/shortcuts/paste/index.ts","./src/flows/shortcuts/paste/traverse.ts","./src/flows/shortcuts/paste/unique-workflow.ts","./src/flows/shortcuts/select-all/index.ts","./src/flows/shortcuts/zoom-in/index.ts","./src/flows/shortcuts/zoom-out/index.ts","./src/flows/typings/index.ts","./src/flows/typings/json-schema.ts","./src/flows/typings/node.ts","./src/flows/utils/index.ts","./src/flows/utils/on-drag-line-end.ts","./src/flows/utils/toggle-loop-expanded.ts","./src/flows/utils/yaml.test.ts","./src/flows/utils/yaml.ts","./src/layouts/mainlayout.tsx","./src/pages/dashboard.tsx","./src/pages/departments.tsx","./src/pages/flowlist.tsx","./src/pages/flowrunlogs.tsx","./src/pages/login.tsx","./src/pages/logs.tsx","./src/pages/menus.tsx","./src/pages/permissions.tsx","./src/pages/positions.tsx","./src/pages/roles.tsx","./src/pages/users.tsx","./src/utils/axios.ts","./src/utils/config.ts","./src/utils/datetime.ts","./src/utils/permission.tsx","./src/utils/react18-polyfill.ts","./src/utils/sse.ts","./src/utils/token.ts","./src/utils/__tests__/sse.test.ts"],"version":"5.9.2"} \ No newline at end of file +{"root":["./src/app.tsx","./src/main.tsx","./src/vite-env.d.ts","./src/components/pageheader.tsx","./src/flows/app.tsx","./src/flows/editor.tsx","./src/flows/index.ts","./src/flows/initial-data.ts","./src/flows/type.d.ts","./src/flows/assets/icon-auto-layout.tsx","./src/flows/assets/icon-cancel.tsx","./src/flows/assets/icon-comment.tsx","./src/flows/assets/icon-minimap.tsx","./src/flows/assets/icon-mouse.tsx","./src/flows/assets/icon-pad.tsx","./src/flows/assets/icon-success.tsx","./src/flows/assets/icon-switch-line.tsx","./src/flows/assets/icon-warning.tsx","./src/flows/components/index.ts","./src/flows/components/add-node/index.tsx","./src/flows/components/add-node/use-add-node.ts","./src/flows/components/base-node/index.tsx","./src/flows/components/base-node/node-wrapper.tsx","./src/flows/components/base-node/styles.tsx","./src/flows/components/base-node/utils.ts","./src/flows/components/comment/constant.ts","./src/flows/components/comment/index.ts","./src/flows/components/comment/model.ts","./src/flows/components/comment/type.ts","./src/flows/components/comment/components/blank-area.tsx","./src/flows/components/comment/components/border-area.tsx","./src/flows/components/comment/components/container.tsx","./src/flows/components/comment/components/content-drag-area.tsx","./src/flows/components/comment/components/drag-area.tsx","./src/flows/components/comment/components/editor.tsx","./src/flows/components/comment/components/index.ts","./src/flows/components/comment/components/more-button.tsx","./src/flows/components/comment/components/render.tsx","./src/flows/components/comment/components/resize-area.tsx","./src/flows/components/comment/hooks/index.ts","./src/flows/components/comment/hooks/use-model.ts","./src/flows/components/comment/hooks/use-overflow.ts","./src/flows/components/comment/hooks/use-size.ts","./src/flows/components/group/color.ts","./src/flows/components/group/constant.ts","./src/flows/components/group/index.ts","./src/flows/components/group/components/background.tsx","./src/flows/components/group/components/color.tsx","./src/flows/components/group/components/header.tsx","./src/flows/components/group/components/icon-group.tsx","./src/flows/components/group/components/index.ts","./src/flows/components/group/components/node-render.tsx","./src/flows/components/group/components/title.tsx","./src/flows/components/group/components/tools.tsx","./src/flows/components/group/components/ungroup.tsx","./src/flows/components/group/components/tips/global-store.ts","./src/flows/components/group/components/tips/icon-close.tsx","./src/flows/components/group/components/tips/index.tsx","./src/flows/components/group/components/tips/is-mac-os.ts","./src/flows/components/group/components/tips/style.ts","./src/flows/components/group/components/tips/use-control.ts","./src/flows/components/line-add-button/button.tsx","./src/flows/components/line-add-button/index.tsx","./src/flows/components/line-add-button/use-visible.ts","./src/flows/components/node-menu/index.tsx","./src/flows/components/node-panel/index.tsx","./src/flows/components/node-panel/node-list.tsx","./src/flows/components/node-panel/node-placeholder.tsx","./src/flows/components/selector-box-popover/index.tsx","./src/flows/components/sidebar/index.tsx","./src/flows/components/sidebar/sidebar-node-renderer.tsx","./src/flows/components/sidebar/sidebar-provider.tsx","./src/flows/components/sidebar/sidebar-renderer.tsx","./src/flows/components/testrun/hooks/index.ts","./src/flows/components/testrun/hooks/use-fields.ts","./src/flows/components/testrun/hooks/use-form-meta.ts","./src/flows/components/testrun/hooks/use-sync-default.ts","./src/flows/components/testrun/node-status-bar/index.tsx","./src/flows/components/testrun/node-status-bar/group/index.tsx","./src/flows/components/testrun/node-status-bar/header/index.tsx","./src/flows/components/testrun/node-status-bar/render/index.tsx","./src/flows/components/testrun/node-status-bar/viewer/index.tsx","./src/flows/components/testrun/testrun-button/index.tsx","./src/flows/components/testrun/testrun-form/index.tsx","./src/flows/components/testrun/testrun-form/type.ts","./src/flows/components/testrun/testrun-json-input/index.tsx","./src/flows/components/testrun/testrun-panel/index.tsx","./src/flows/components/tools/auto-layout.tsx","./src/flows/components/tools/comment.tsx","./src/flows/components/tools/fit-view.tsx","./src/flows/components/tools/index.tsx","./src/flows/components/tools/interactive.tsx","./src/flows/components/tools/minimap-switch.tsx","./src/flows/components/tools/minimap.tsx","./src/flows/components/tools/mouse-pad-selector.tsx","./src/flows/components/tools/readonly.tsx","./src/flows/components/tools/save.tsx","./src/flows/components/tools/styles.tsx","./src/flows/components/tools/switch-line.tsx","./src/flows/components/tools/zoom-select.tsx","./src/flows/context/index.ts","./src/flows/context/node-render-context.ts","./src/flows/context/sidebar-context.ts","./src/flows/form-components/feedback.tsx","./src/flows/form-components/index.ts","./src/flows/form-components/form-content/index.tsx","./src/flows/form-components/form-content/styles.tsx","./src/flows/form-components/form-header/index.tsx","./src/flows/form-components/form-header/styles.tsx","./src/flows/form-components/form-header/title-input.tsx","./src/flows/form-components/form-header/utils.tsx","./src/flows/form-components/form-inputs/index.tsx","./src/flows/form-components/form-inputs/styles.tsx","./src/flows/form-components/form-item/index.tsx","./src/flows/hooks/index.ts","./src/flows/hooks/use-editor-props.tsx","./src/flows/hooks/use-is-sidebar.ts","./src/flows/hooks/use-node-render-context.ts","./src/flows/hooks/use-port-click.ts","./src/flows/nodes/constants.ts","./src/flows/nodes/default-form-meta.tsx","./src/flows/nodes/index.ts","./src/flows/nodes/block-end/form-meta.tsx","./src/flows/nodes/block-end/index.ts","./src/flows/nodes/block-start/form-meta.tsx","./src/flows/nodes/block-start/index.ts","./src/flows/nodes/break/form-meta.tsx","./src/flows/nodes/break/index.ts","./src/flows/nodes/code/form-meta.tsx","./src/flows/nodes/code/index.tsx","./src/flows/nodes/code/types.tsx","./src/flows/nodes/code/components/code.tsx","./src/flows/nodes/code/components/inputs.tsx","./src/flows/nodes/code/components/outputs.tsx","./src/flows/nodes/comment/index.tsx","./src/flows/nodes/condition/form-meta.tsx","./src/flows/nodes/condition/index.ts","./src/flows/nodes/condition/condition-inputs/index.tsx","./src/flows/nodes/condition/condition-inputs/styles.tsx","./src/flows/nodes/continue/form-meta.tsx","./src/flows/nodes/continue/index.ts","./src/flows/nodes/db/form-meta.tsx","./src/flows/nodes/db/index.tsx","./src/flows/nodes/end/form-meta.tsx","./src/flows/nodes/end/index.ts","./src/flows/nodes/group/index.tsx","./src/flows/nodes/http/form-meta.tsx","./src/flows/nodes/http/index.tsx","./src/flows/nodes/http/types.tsx","./src/flows/nodes/http/components/api.tsx","./src/flows/nodes/http/components/body.tsx","./src/flows/nodes/http/components/headers.tsx","./src/flows/nodes/http/components/params.tsx","./src/flows/nodes/http/components/timeout.tsx","./src/flows/nodes/llm/index.ts","./src/flows/nodes/loop/form-meta.tsx","./src/flows/nodes/loop/index.ts","./src/flows/nodes/start/form-meta.tsx","./src/flows/nodes/start/index.ts","./src/flows/nodes/variable/form-meta.tsx","./src/flows/nodes/variable/index.tsx","./src/flows/nodes/variable/types.tsx","./src/flows/plugins/index.ts","./src/flows/plugins/context-menu-plugin/context-menu-layer.tsx","./src/flows/plugins/context-menu-plugin/context-menu-plugin.ts","./src/flows/plugins/context-menu-plugin/index.ts","./src/flows/plugins/runtime-plugin/create-runtime-plugin.ts","./src/flows/plugins/runtime-plugin/index.ts","./src/flows/plugins/runtime-plugin/type.ts","./src/flows/plugins/runtime-plugin/client/base-client.ts","./src/flows/plugins/runtime-plugin/client/index.ts","./src/flows/plugins/runtime-plugin/client/browser-client/index.ts","./src/flows/plugins/runtime-plugin/client/server-client/constant.ts","./src/flows/plugins/runtime-plugin/client/server-client/index.ts","./src/flows/plugins/runtime-plugin/client/server-client/type.ts","./src/flows/plugins/runtime-plugin/runtime-service/index.ts","./src/flows/plugins/variable-panel-plugin/index.ts","./src/flows/plugins/variable-panel-plugin/variable-panel-layer.tsx","./src/flows/plugins/variable-panel-plugin/variable-panel-plugin.ts","./src/flows/plugins/variable-panel-plugin/components/full-variable-list.tsx","./src/flows/plugins/variable-panel-plugin/components/global-variable-editor.tsx","./src/flows/plugins/variable-panel-plugin/components/variable-panel.tsx","./src/flows/services/custom-service.ts","./src/flows/services/index.ts","./src/flows/shortcuts/constants.ts","./src/flows/shortcuts/index.ts","./src/flows/shortcuts/shortcuts.ts","./src/flows/shortcuts/type.ts","./src/flows/shortcuts/collapse/index.ts","./src/flows/shortcuts/copy/index.ts","./src/flows/shortcuts/delete/index.ts","./src/flows/shortcuts/expand/index.ts","./src/flows/shortcuts/paste/index.ts","./src/flows/shortcuts/paste/traverse.ts","./src/flows/shortcuts/paste/unique-workflow.ts","./src/flows/shortcuts/select-all/index.ts","./src/flows/shortcuts/zoom-in/index.ts","./src/flows/shortcuts/zoom-out/index.ts","./src/flows/typings/index.ts","./src/flows/typings/json-schema.ts","./src/flows/typings/node.ts","./src/flows/utils/index.ts","./src/flows/utils/on-drag-line-end.ts","./src/flows/utils/toggle-loop-expanded.ts","./src/flows/utils/yaml.test.ts","./src/flows/utils/yaml.ts","./src/layouts/mainlayout.tsx","./src/pages/dashboard.tsx","./src/pages/departments.tsx","./src/pages/flowlist.tsx","./src/pages/flowrunlogs.tsx","./src/pages/login.tsx","./src/pages/logs.tsx","./src/pages/menus.tsx","./src/pages/permissions.tsx","./src/pages/positions.tsx","./src/pages/roles.tsx","./src/pages/schedulejobs.tsx","./src/pages/users.tsx","./src/utils/axios.ts","./src/utils/config.ts","./src/utils/datetime.ts","./src/utils/permission.tsx","./src/utils/react18-polyfill.ts","./src/utils/sse.ts","./src/utils/token.ts","./src/utils/__tests__/sse.test.ts"],"version":"5.9.2"} \ No newline at end of file