Files
udmin/backend/src/main.rs
ayou 8c06849254 feat(调度任务): 实现调度任务管理功能
新增调度任务模块,支持任务的增删改查、启停及手动执行
- 后端添加 schedule_job 模型、服务、路由及调度器工具类
- 前端新增调度任务管理页面
- 修改 flow 相关接口将 id 类型从 String 改为 i64
- 添加 tokio-cron-scheduler 依赖实现定时任务调度
- 初始化时加载已启用任务并注册到调度器
2025-09-24 00:21:30 +08:00

146 lines
5.3 KiB
Rust
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

mod db;
mod redis;
mod response;
mod error;
pub mod middlewares;
pub mod models;
pub mod services;
pub mod routes;
pub mod utils;
pub mod flow;
use axum::Router;
use axum::http::{HeaderValue, Method};
use tower_http::cors::{CorsLayer, Any, AllowOrigin};
use migration::MigratorTrait;
use axum::middleware;
// 自定义日志时间格式YYYY-MM-DD HH:MM:SS.ssssss不带 T 和 Z
struct LocalTimeFmt;
impl tracing_subscriber::fmt::time::FormatTime for LocalTimeFmt {
fn format_time(&self, w: &mut tracing_subscriber::fmt::format::Writer) -> std::fmt::Result {
let now = chrono::Local::now();
w.write_str(&now.format("%Y-%m-%d %H:%M:%S%.6f").to_string())
}
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
// 增强:支持通过 ENV_FILE 指定要加载的环境文件,并记录实际加载的文件
// - ENV_FILE=prod 或 production => .env.prod
// - ENV_FILE=dev 或 development => .env
// - ENV_FILE=staging => .env.staging
// - ENV_FILE=任意字符串 => 视为显式文件名或路径
let env_file_used: Option<String> = if let Ok(v) = std::env::var("ENV_FILE") {
let filename = match v.trim() {
"" => ".env".to_string(),
"prod" | "production" => ".env.prod".to_string(),
"dev" | "development" => ".env".to_string(),
"staging" | "pre" | "preprod" | "pre-production" => ".env.staging".to_string(),
other => other.to_string(),
};
match dotenvy::from_filename_override(&filename) {
Ok(_) => Some(filename),
Err(_) => Some(format!("{} (not found)", filename)),
}
} else {
match dotenvy::dotenv_override() {
Ok(path) => Some(path.to_string_lossy().to_string()),
Err(_) => None,
}
};
tracing_subscriber::fmt()
.with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
.with_timer(LocalTimeFmt)
.init();
let db = db::init_db().await?;
// set global DB for tasks
db::set_db(db.clone()).expect("db set failure");
// initialize Redis connection
let redis_pool = redis::init_redis().await?;
redis::set_redis_pool(redis_pool)?;
// 初始化分布式ID生成器读取 ID_MACHINE_ID / ID_NODE_ID
crate::utils::init_from_env();
// run migrations
migration::Migrator::up(&db, None).await.expect("migration up");
// 初始化并启动调度器仅启动不加载DB
if let Err(e) = crate::utils::init_scheduler().await { tracing::error!(target = "udmin", error = %e, "init scheduler failed"); }
// 由 service 层加载启用任务并注册到调度器
if let Err(e) = services::schedule_job_service::init_load_enabled_and_register(&db).await {
tracing::error!(target = "udmin", error = %e, "init schedule jobs failed");
}
let allow_origins = std::env::var("CORS_ALLOW_ORIGINS").unwrap_or_else(|_| "http://localhost:5173".into());
let origin_values: Vec<HeaderValue> = allow_origins
.split(',')
.filter_map(|s| HeaderValue::from_str(s.trim()).ok())
.collect();
let allowed_methods = [
Method::GET,
Method::POST,
Method::PUT,
Method::PATCH,
Method::DELETE,
Method::OPTIONS,
];
let cors = if origin_values.is_empty() {
// 当允许任意来源时,不能与 allow_credentials(true) 同时使用
CorsLayer::new()
.allow_origin(Any)
.allow_methods(allowed_methods.clone())
.allow_headers([
axum::http::header::ACCEPT,
axum::http::header::CONTENT_TYPE,
axum::http::header::AUTHORIZATION,
])
.allow_credentials(false)
} else {
CorsLayer::new()
.allow_origin(AllowOrigin::list(origin_values))
.allow_methods(allowed_methods)
.allow_headers([
axum::http::header::ACCEPT,
axum::http::header::CONTENT_TYPE,
axum::http::header::AUTHORIZATION,
])
.allow_credentials(true)
};
let api = routes::api_router().with_state(db.clone());
let app = Router::new()
.nest("/api", api)
.layer(cors)
.layer(middleware::from_fn_with_state(db.clone(), middlewares::logging::request_logger));
// 读取并记录最终使用的主机与端口(默认端口改为 9898
let app_host = std::env::var("APP_HOST").unwrap_or("0.0.0.0".into());
let app_port = std::env::var("APP_PORT").unwrap_or("9898".into());
if let Some(f) = &env_file_used { tracing::info!("env file loaded: {}", f); } else { tracing::info!("env file loaded: <none>"); }
tracing::info!("resolved APP_HOST={} APP_PORT={}", app_host, app_port);
let http_addr = format!("{}:{}", app_host, app_port);
tracing::info!("listening on {}", http_addr);
// HTTP 服务监听
let http_listener = tokio::net::TcpListener::bind(http_addr.clone()).await?;
let http_server = axum::serve(http_listener, app);
// WS 服务下沉到中间件
let ws_server = middlewares::ws::serve(db.clone());
// 新增SSE 服务独立端口监听(默认 8866可配 SSE_HOST/SSE_PORT
let sse_server = middlewares::sse::serve(db.clone());
tokio::try_join!(http_server, ws_server, sse_server)?;
Ok(())
}