新增以下文档文件: - PROJECT_OVERVIEW.md 项目总览文档 - BACKEND_ARCHITECTURE.md 后端架构文档 - FRONTEND_ARCHITECTURE.md 前端架构文档 - FLOW_ENGINE.md 流程引擎文档 - SERVICES.md 服务层文档 - ERROR_HANDLING.md 错误处理模块文档 文档内容涵盖项目整体介绍、技术架构、核心模块设计和实现细节
31 KiB
31 KiB
UdminAI 数据库模块文档
概述
UdminAI 项目的数据库模块基于 SeaORM 框架构建,提供了完整的数据库抽象层和 ORM 功能。该模块负责数据库连接管理、事务处理、查询构建、迁移管理等核心功能,为整个系统提供可靠的数据持久化支持。
技术架构
核心组件
- SeaORM: 现代化的 Rust ORM 框架
- SQLx: 异步 SQL 工具包
- PostgreSQL: 主数据库(支持 MySQL、SQLite)
- Redis: 缓存和会话存储
- Migration: 数据库版本管理
设计原则
- 类型安全: 编译时查询验证
- 异步优先: 全异步数据库操作
- 连接池: 高效的连接管理
- 事务支持: ACID 事务保证
- 迁移管理: 版本化数据库结构
模块结构
backend/src/
├── db.rs # 数据库连接和配置
├── redis.rs # Redis 连接和操作
└── models/ # 数据模型定义
├── mod.rs
├── user.rs
├── role.rs
├── permission.rs
├── flow.rs
├── schedule_job.rs
└── ...
数据库连接 (db.rs)
功能特性
- 数据库连接池管理
- 多数据库支持
- 连接健康检查
- 自动重连机制
- 性能监控
实现代码
use sea_orm::{
ConnectOptions, Database, DatabaseConnection, DbErr, TransactionTrait,
};
use std::time::Duration;
use tracing::{error, info, warn};
/// 数据库配置
#[derive(Debug, Clone)]
pub struct DatabaseConfig {
pub url: String,
pub max_connections: u32,
pub min_connections: u32,
pub connect_timeout: Duration,
pub idle_timeout: Duration,
pub max_lifetime: Duration,
pub sqlx_logging: bool,
pub sqlx_logging_level: tracing::Level,
}
impl Default for DatabaseConfig {
fn default() -> Self {
Self {
url: "postgresql://localhost/udmin_ai".to_string(),
max_connections: 100,
min_connections: 5,
connect_timeout: Duration::from_secs(8),
idle_timeout: Duration::from_secs(600),
max_lifetime: Duration::from_secs(3600),
sqlx_logging: true,
sqlx_logging_level: tracing::Level::INFO,
}
}
}
/// 数据库连接管理器
#[derive(Debug, Clone)]
pub struct DatabaseManager {
connection: DatabaseConnection,
config: DatabaseConfig,
}
impl DatabaseManager {
/// 创建新的数据库管理器
pub async fn new(config: DatabaseConfig) -> Result<Self, DbErr> {
info!(target = "udmin", url = %config.url, "database.connect.starting");
let mut opt = ConnectOptions::new(&config.url);
opt.max_connections(config.max_connections)
.min_connections(config.min_connections)
.connect_timeout(config.connect_timeout)
.idle_timeout(config.idle_timeout)
.max_lifetime(config.max_lifetime)
.sqlx_logging(config.sqlx_logging)
.sqlx_logging_level(config.sqlx_logging_level);
let connection = Database::connect(opt).await?;
info!(target = "udmin", "database.connect.success");
Ok(Self { connection, config })
}
/// 从环境变量创建数据库管理器
pub async fn from_env() -> Result<Self, DbErr> {
let config = DatabaseConfig {
url: std::env::var("DATABASE_URL")
.unwrap_or_else(|_| "postgresql://localhost/udmin_ai".to_string()),
max_connections: std::env::var("DB_MAX_CONNECTIONS")
.ok()
.and_then(|s| s.parse().ok())
.unwrap_or(100),
min_connections: std::env::var("DB_MIN_CONNECTIONS")
.ok()
.and_then(|s| s.parse().ok())
.unwrap_or(5),
connect_timeout: Duration::from_secs(
std::env::var("DB_CONNECT_TIMEOUT")
.ok()
.and_then(|s| s.parse().ok())
.unwrap_or(8),
),
idle_timeout: Duration::from_secs(
std::env::var("DB_IDLE_TIMEOUT")
.ok()
.and_then(|s| s.parse().ok())
.unwrap_or(600),
),
max_lifetime: Duration::from_secs(
std::env::var("DB_MAX_LIFETIME")
.ok()
.and_then(|s| s.parse().ok())
.unwrap_or(3600),
),
sqlx_logging: std::env::var("DB_SQLX_LOGGING")
.ok()
.and_then(|s| s.parse().ok())
.unwrap_or(true),
sqlx_logging_level: match std::env::var("DB_SQLX_LOGGING_LEVEL")
.unwrap_or_else(|_| "info".to_string())
.to_lowercase()
.as_str()
{
"trace" => tracing::Level::TRACE,
"debug" => tracing::Level::DEBUG,
"info" => tracing::Level::INFO,
"warn" => tracing::Level::WARN,
"error" => tracing::Level::ERROR,
_ => tracing::Level::INFO,
},
};
Self::new(config).await
}
/// 获取数据库连接
pub fn connection(&self) -> &DatabaseConnection {
&self.connection
}
/// 检查数据库连接健康状态
pub async fn health_check(&self) -> Result<(), DbErr> {
use sea_orm::Statement;
let backend = self.connection.get_database_backend();
let stmt = Statement::from_string(backend, "SELECT 1".to_string());
match self.connection.execute(stmt).await {
Ok(_) => {
info!(target = "udmin", "database.health_check.success");
Ok(())
}
Err(e) => {
error!(target = "udmin", error = %e, "database.health_check.failed");
Err(e)
}
}
}
/// 获取连接池统计信息
pub async fn pool_stats(&self) -> DatabasePoolStats {
// 注意:SeaORM 目前不直接暴露连接池统计信息
// 这里提供一个接口,实际实现可能需要通过其他方式获取
DatabasePoolStats {
active_connections: 0, // 需要通过底层 SQLx 获取
idle_connections: 0,
total_connections: 0,
max_connections: self.config.max_connections,
}
}
/// 执行事务
pub async fn transaction<F, R, E>(&self, f: F) -> Result<R, E>
where
F: for<'c> FnOnce(&'c DatabaseConnection) -> futures::future::BoxFuture<'c, Result<R, E>>
+ Send,
E: From<DbErr>,
R: Send,
{
let txn = self.connection.begin().await.map_err(E::from)?;
match f(&txn).await {
Ok(result) => {
txn.commit().await.map_err(E::from)?;
info!(target = "udmin", "database.transaction.committed");
Ok(result)
}
Err(e) => {
if let Err(rollback_err) = txn.rollback().await {
error!(target = "udmin", error = %rollback_err, "database.transaction.rollback_failed");
}
warn!(target = "udmin", "database.transaction.rolled_back");
Err(e)
}
}
}
/// 关闭数据库连接
pub async fn close(&self) -> Result<(), DbErr> {
info!(target = "udmin", "database.connection.closing");
self.connection.close().await?;
info!(target = "udmin", "database.connection.closed");
Ok(())
}
}
/// 数据库连接池统计信息
#[derive(Debug, Clone)]
pub struct DatabasePoolStats {
pub active_connections: u32,
pub idle_connections: u32,
pub total_connections: u32,
pub max_connections: u32,
}
/// 数据库迁移管理
pub struct MigrationManager {
connection: DatabaseConnection,
}
impl MigrationManager {
pub fn new(connection: DatabaseConnection) -> Self {
Self { connection }
}
/// 运行所有待执行的迁移
pub async fn migrate_up(&self) -> Result<(), DbErr> {
info!(target = "udmin", "database.migration.starting");
// 这里需要根据实际的迁移框架实现
// 例如使用 sea-orm-migration
info!(target = "udmin", "database.migration.completed");
Ok(())
}
/// 回滚迁移
pub async fn migrate_down(&self, steps: Option<u32>) -> Result<(), DbErr> {
let steps = steps.unwrap_or(1);
info!(target = "udmin", steps = %steps, "database.migration.rollback.starting");
// 实现迁移回滚逻辑
info!(target = "udmin", steps = %steps, "database.migration.rollback.completed");
Ok(())
}
/// 获取迁移状态
pub async fn migration_status(&self) -> Result<Vec<MigrationInfo>, DbErr> {
// 返回迁移状态信息
Ok(vec![])
}
}
/// 迁移信息
#[derive(Debug, Clone)]
pub struct MigrationInfo {
pub version: String,
pub name: String,
pub applied_at: Option<chrono::DateTime<chrono::Utc>>,
pub is_applied: bool,
}
/// 数据库查询构建器辅助函数
pub mod query_builder {
use sea_orm::{
ColumnTrait, Condition, EntityTrait, PaginatorTrait, QueryFilter, QueryOrder, QuerySelect,
Select,
};
/// 分页查询构建器
pub struct PaginationBuilder<E: EntityTrait> {
select: Select<E>,
page: u64,
page_size: u64,
}
impl<E: EntityTrait> PaginationBuilder<E> {
pub fn new(select: Select<E>) -> Self {
Self {
select,
page: 1,
page_size: 20,
}
}
pub fn page(mut self, page: u64) -> Self {
self.page = page.max(1);
self
}
pub fn page_size(mut self, page_size: u64) -> Self {
self.page_size = page_size.clamp(1, 100);
self
}
pub fn build(self) -> Select<E> {
let offset = (self.page - 1) * self.page_size;
self.select.limit(self.page_size).offset(offset)
}
}
/// 条件构建器
pub struct ConditionBuilder {
condition: Condition,
}
impl ConditionBuilder {
pub fn new() -> Self {
Self {
condition: Condition::all(),
}
}
pub fn add<C>(mut self, column_condition: C) -> Self
where
C: Into<Condition>,
{
self.condition = self.condition.add(column_condition);
self
}
pub fn add_option<C>(mut self, condition: Option<C>) -> Self
where
C: Into<Condition>,
{
if let Some(cond) = condition {
self.condition = self.condition.add(cond);
}
self
}
pub fn build(self) -> Condition {
self.condition
}
}
impl Default for ConditionBuilder {
fn default() -> Self {
Self::new()
}
}
}
/// 数据库错误处理
pub mod error_handler {
use sea_orm::DbErr;
use crate::error::AppError;
/// 将数据库错误转换为应用错误
pub fn handle_db_error(err: DbErr) -> AppError {
match err {
DbErr::RecordNotFound(_) => AppError::NotFound("记录不存在".to_string()),
DbErr::Custom(msg) => AppError::DatabaseError(msg),
DbErr::Conn(msg) => AppError::DatabaseError(format!("连接错误: {}", msg)),
DbErr::Exec(msg) => AppError::DatabaseError(format!("执行错误: {}", msg)),
DbErr::Query(msg) => AppError::DatabaseError(format!("查询错误: {}", msg)),
_ => AppError::DatabaseError("未知数据库错误".to_string()),
}
}
/// 检查是否为唯一约束违反错误
pub fn is_unique_violation(err: &DbErr) -> bool {
match err {
DbErr::Exec(msg) | DbErr::Query(msg) => {
msg.contains("unique constraint") || msg.contains("duplicate key")
}
_ => false,
}
}
/// 检查是否为外键约束违反错误
pub fn is_foreign_key_violation(err: &DbErr) -> bool {
match err {
DbErr::Exec(msg) | DbErr::Query(msg) => {
msg.contains("foreign key constraint")
}
_ => false,
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use tokio_test;
#[tokio::test]
async fn test_database_config() {
let config = DatabaseConfig::default();
assert_eq!(config.max_connections, 100);
assert_eq!(config.min_connections, 5);
}
#[tokio::test]
async fn test_condition_builder() {
use query_builder::ConditionBuilder;
use sea_orm::{ColumnTrait, Condition};
let condition = ConditionBuilder::new()
.add_option(Some(Condition::all()))
.build();
// 验证条件构建
assert!(!format!("{:?}", condition).is_empty());
}
#[test]
fn test_error_handling() {
use error_handler::*;
let db_err = DbErr::RecordNotFound("test".to_string());
let app_err = handle_db_error(db_err);
match app_err {
AppError::NotFound(_) => assert!(true),
_ => assert!(false, "Expected NotFound error"),
}
}
}
Redis 连接 (redis.rs)
功能特性
- Redis 连接池管理
- 缓存操作封装
- 会话存储
- 分布式锁
- 发布订阅
实现代码
use redis::{
aio::ConnectionManager, Client, RedisError, RedisResult, AsyncCommands,
};
use serde::{Deserialize, Serialize};
use std::time::Duration;
use tracing::{error, info, warn};
use tokio::time::timeout;
/// Redis 配置
#[derive(Debug, Clone)]
pub struct RedisConfig {
pub url: String,
pub max_connections: u32,
pub connect_timeout: Duration,
pub command_timeout: Duration,
pub retry_attempts: u32,
pub retry_delay: Duration,
}
impl Default for RedisConfig {
fn default() -> Self {
Self {
url: "redis://localhost:6379".to_string(),
max_connections: 50,
connect_timeout: Duration::from_secs(5),
command_timeout: Duration::from_secs(10),
retry_attempts: 3,
retry_delay: Duration::from_millis(100),
}
}
}
/// Redis 连接管理器
#[derive(Debug, Clone)]
pub struct RedisManager {
connection: ConnectionManager,
config: RedisConfig,
}
impl RedisManager {
/// 创建新的 Redis 管理器
pub async fn new(config: RedisConfig) -> RedisResult<Self> {
info!(target = "udmin", url = %config.url, "redis.connect.starting");
let client = Client::open(config.url.clone())?;
let connection = timeout(
config.connect_timeout,
ConnectionManager::new(client),
)
.await
.map_err(|_| RedisError::from((redis::ErrorKind::IoError, "连接超时")))??
;
info!(target = "udmin", "redis.connect.success");
Ok(Self { connection, config })
}
/// 从环境变量创建 Redis 管理器
pub async fn from_env() -> RedisResult<Self> {
let config = RedisConfig {
url: std::env::var("REDIS_URL")
.unwrap_or_else(|_| "redis://localhost:6379".to_string()),
max_connections: std::env::var("REDIS_MAX_CONNECTIONS")
.ok()
.and_then(|s| s.parse().ok())
.unwrap_or(50),
connect_timeout: Duration::from_secs(
std::env::var("REDIS_CONNECT_TIMEOUT")
.ok()
.and_then(|s| s.parse().ok())
.unwrap_or(5),
),
command_timeout: Duration::from_secs(
std::env::var("REDIS_COMMAND_TIMEOUT")
.ok()
.and_then(|s| s.parse().ok())
.unwrap_or(10),
),
retry_attempts: std::env::var("REDIS_RETRY_ATTEMPTS")
.ok()
.and_then(|s| s.parse().ok())
.unwrap_or(3),
retry_delay: Duration::from_millis(
std::env::var("REDIS_RETRY_DELAY_MS")
.ok()
.and_then(|s| s.parse().ok())
.unwrap_or(100),
),
};
Self::new(config).await
}
/// 执行带超时的 Redis 命令
async fn execute_with_timeout<F, R>(&self, f: F) -> RedisResult<R>
where
F: futures::Future<Output = RedisResult<R>>,
{
timeout(self.config.command_timeout, f)
.await
.map_err(|_| RedisError::from((redis::ErrorKind::IoError, "命令执行超时")))??
}
/// 设置键值对
pub async fn set<K, V>(&self, key: K, value: V) -> RedisResult<()>
where
K: redis::ToRedisArgs + Send + Sync,
V: redis::ToRedisArgs + Send + Sync,
{
let mut conn = self.connection.clone();
self.execute_with_timeout(async move {
conn.set(key, value).await
}).await
}
/// 设置键值对(带过期时间)
pub async fn setex<K, V>(&self, key: K, value: V, seconds: usize) -> RedisResult<()>
where
K: redis::ToRedisArgs + Send + Sync,
V: redis::ToRedisArgs + Send + Sync,
{
let mut conn = self.connection.clone();
self.execute_with_timeout(async move {
conn.setex(key, seconds, value).await
}).await
}
/// 获取键值
pub async fn get<K, V>(&self, key: K) -> RedisResult<Option<V>>
where
K: redis::ToRedisArgs + Send + Sync,
V: redis::FromRedisValue,
{
let mut conn = self.connection.clone();
self.execute_with_timeout(async move {
conn.get(key).await
}).await
}
/// 删除键
pub async fn del<K>(&self, key: K) -> RedisResult<bool>
where
K: redis::ToRedisArgs + Send + Sync,
{
let mut conn = self.connection.clone();
let result: i32 = self.execute_with_timeout(async move {
conn.del(key).await
}).await?;
Ok(result > 0)
}
/// 检查键是否存在
pub async fn exists<K>(&self, key: K) -> RedisResult<bool>
where
K: redis::ToRedisArgs + Send + Sync,
{
let mut conn = self.connection.clone();
self.execute_with_timeout(async move {
conn.exists(key).await
}).await
}
/// 设置键的过期时间
pub async fn expire<K>(&self, key: K, seconds: usize) -> RedisResult<bool>
where
K: redis::ToRedisArgs + Send + Sync,
{
let mut conn = self.connection.clone();
self.execute_with_timeout(async move {
conn.expire(key, seconds).await
}).await
}
/// 获取键的剩余过期时间
pub async fn ttl<K>(&self, key: K) -> RedisResult<i32>
where
K: redis::ToRedisArgs + Send + Sync,
{
let mut conn = self.connection.clone();
self.execute_with_timeout(async move {
conn.ttl(key).await
}).await
}
/// 原子递增
pub async fn incr<K>(&self, key: K, delta: i64) -> RedisResult<i64>
where
K: redis::ToRedisArgs + Send + Sync,
{
let mut conn = self.connection.clone();
self.execute_with_timeout(async move {
conn.incr(key, delta).await
}).await
}
/// 原子递减
pub async fn decr<K>(&self, key: K, delta: i64) -> RedisResult<i64>
where
K: redis::ToRedisArgs + Send + Sync,
{
let mut conn = self.connection.clone();
self.execute_with_timeout(async move {
conn.decr(key, delta).await
}).await
}
/// 列表左推
pub async fn lpush<K, V>(&self, key: K, value: V) -> RedisResult<i32>
where
K: redis::ToRedisArgs + Send + Sync,
V: redis::ToRedisArgs + Send + Sync,
{
let mut conn = self.connection.clone();
self.execute_with_timeout(async move {
conn.lpush(key, value).await
}).await
}
/// 列表右弹
pub async fn rpop<K, V>(&self, key: K) -> RedisResult<Option<V>>
where
K: redis::ToRedisArgs + Send + Sync,
V: redis::FromRedisValue,
{
let mut conn = self.connection.clone();
self.execute_with_timeout(async move {
conn.rpop(key, None).await
}).await
}
/// 获取列表长度
pub async fn llen<K>(&self, key: K) -> RedisResult<i32>
where
K: redis::ToRedisArgs + Send + Sync,
{
let mut conn = self.connection.clone();
self.execute_with_timeout(async move {
conn.llen(key).await
}).await
}
/// 哈希设置字段
pub async fn hset<K, F, V>(&self, key: K, field: F, value: V) -> RedisResult<bool>
where
K: redis::ToRedisArgs + Send + Sync,
F: redis::ToRedisArgs + Send + Sync,
V: redis::ToRedisArgs + Send + Sync,
{
let mut conn = self.connection.clone();
self.execute_with_timeout(async move {
conn.hset(key, field, value).await
}).await
}
/// 哈希获取字段
pub async fn hget<K, F, V>(&self, key: K, field: F) -> RedisResult<Option<V>>
where
K: redis::ToRedisArgs + Send + Sync,
F: redis::ToRedisArgs + Send + Sync,
V: redis::FromRedisValue,
{
let mut conn = self.connection.clone();
self.execute_with_timeout(async move {
conn.hget(key, field).await
}).await
}
/// 哈希删除字段
pub async fn hdel<K, F>(&self, key: K, field: F) -> RedisResult<bool>
where
K: redis::ToRedisArgs + Send + Sync,
F: redis::ToRedisArgs + Send + Sync,
{
let mut conn = self.connection.clone();
let result: i32 = self.execute_with_timeout(async move {
conn.hdel(key, field).await
}).await?;
Ok(result > 0)
}
/// 检查 Redis 连接健康状态
pub async fn health_check(&self) -> RedisResult<()> {
let mut conn = self.connection.clone();
match self.execute_with_timeout(async move {
redis::cmd("PING").query_async(&mut conn).await
}).await {
Ok(redis::Value::Status(status)) if status == "PONG" => {
info!(target = "udmin", "redis.health_check.success");
Ok(())
}
Ok(_) => {
error!(target = "udmin", "redis.health_check.unexpected_response");
Err(RedisError::from((redis::ErrorKind::ResponseError, "意外的响应")))
}
Err(e) => {
error!(target = "udmin", error = %e, "redis.health_check.failed");
Err(e)
}
}
}
}
/// Redis 缓存操作封装
pub struct RedisCache {
manager: RedisManager,
key_prefix: String,
default_ttl: Duration,
}
impl RedisCache {
pub fn new(manager: RedisManager, key_prefix: String, default_ttl: Duration) -> Self {
Self {
manager,
key_prefix,
default_ttl,
}
}
/// 构建完整的缓存键
fn build_key(&self, key: &str) -> String {
format!("{}:{}", self.key_prefix, key)
}
/// 设置缓存(JSON 序列化)
pub async fn set_json<T>(&self, key: &str, value: &T) -> RedisResult<()>
where
T: Serialize,
{
let json_value = serde_json::to_string(value)
.map_err(|e| RedisError::from((redis::ErrorKind::TypeError, e.to_string())))?;
let full_key = self.build_key(key);
self.manager.setex(full_key, json_value, self.default_ttl.as_secs() as usize).await
}
/// 获取缓存(JSON 反序列化)
pub async fn get_json<T>(&self, key: &str) -> RedisResult<Option<T>>
where
T: for<'de> Deserialize<'de>,
{
let full_key = self.build_key(key);
let json_value: Option<String> = self.manager.get(full_key).await?;
match json_value {
Some(json) => {
let value = serde_json::from_str(&json)
.map_err(|e| RedisError::from((redis::ErrorKind::TypeError, e.to_string())))?;
Ok(Some(value))
}
None => Ok(None),
}
}
/// 删除缓存
pub async fn delete(&self, key: &str) -> RedisResult<bool> {
let full_key = self.build_key(key);
self.manager.del(full_key).await
}
/// 检查缓存是否存在
pub async fn exists(&self, key: &str) -> RedisResult<bool> {
let full_key = self.build_key(key);
self.manager.exists(full_key).await
}
/// 设置缓存过期时间
pub async fn expire(&self, key: &str, ttl: Duration) -> RedisResult<bool> {
let full_key = self.build_key(key);
self.manager.expire(full_key, ttl.as_secs() as usize).await
}
/// 获取或设置缓存(缓存穿透保护)
pub async fn get_or_set<T, F, Fut>(&self, key: &str, fetcher: F) -> RedisResult<T>
where
T: Serialize + for<'de> Deserialize<'de> + Clone,
F: FnOnce() -> Fut,
Fut: futures::Future<Output = Result<T, Box<dyn std::error::Error + Send + Sync>>>,
{
// 先尝试从缓存获取
if let Some(cached_value) = self.get_json::<T>(key).await? {
return Ok(cached_value);
}
// 缓存未命中,调用 fetcher 获取数据
let value = fetcher().await
.map_err(|e| RedisError::from((redis::ErrorKind::TypeError, e.to_string())))?;
// 将数据存入缓存
if let Err(e) = self.set_json(key, &value).await {
warn!(target = "udmin", key = %key, error = %e, "redis.cache.set_failed");
}
Ok(value)
}
}
/// 分布式锁
pub struct DistributedLock {
manager: RedisManager,
key: String,
value: String,
ttl: Duration,
}
impl DistributedLock {
/// 尝试获取锁
pub async fn acquire(
manager: RedisManager,
key: String,
ttl: Duration,
) -> RedisResult<Option<Self>> {
let value = uuid::Uuid::new_v4().to_string();
let lock_key = format!("lock:{}", key);
let mut conn = manager.connection.clone();
let result: Option<String> = redis::cmd("SET")
.arg(&lock_key)
.arg(&value)
.arg("EX")
.arg(ttl.as_secs())
.arg("NX")
.query_async(&mut conn)
.await?;
if result.is_some() {
info!(target = "udmin", key = %key, "distributed_lock.acquired");
Ok(Some(Self {
manager,
key: lock_key,
value,
ttl,
}))
} else {
Ok(None)
}
}
/// 释放锁
pub async fn release(self) -> RedisResult<bool> {
let script = r#"
if redis.call("GET", KEYS[1]) == ARGV[1] then
return redis.call("DEL", KEYS[1])
else
return 0
end
"#;
let mut conn = self.manager.connection.clone();
let result: i32 = redis::Script::new(script)
.key(&self.key)
.arg(&self.value)
.invoke_async(&mut conn)
.await?;
let released = result > 0;
if released {
info!(target = "udmin", key = %self.key, "distributed_lock.released");
}
Ok(released)
}
/// 续期锁
pub async fn renew(&self) -> RedisResult<bool> {
let script = r#"
if redis.call("GET", KEYS[1]) == ARGV[1] then
return redis.call("EXPIRE", KEYS[1], ARGV[2])
else
return 0
end
"#;
let mut conn = self.manager.connection.clone();
let result: i32 = redis::Script::new(script)
.key(&self.key)
.arg(&self.value)
.arg(self.ttl.as_secs())
.invoke_async(&mut conn)
.await?;
Ok(result > 0)
}
}
#[cfg(test)]
mod tests {
use super::*;
use tokio_test;
#[tokio::test]
async fn test_redis_config() {
let config = RedisConfig::default();
assert_eq!(config.max_connections, 50);
assert_eq!(config.connect_timeout, Duration::from_secs(5));
}
#[test]
fn test_cache_key_building() {
let manager = RedisManager {
connection: todo!(), // 在实际测试中需要模拟
config: RedisConfig::default(),
};
let cache = RedisCache::new(manager, "test".to_string(), Duration::from_secs(300));
assert_eq!(cache.build_key("user:123"), "test:user:123");
}
}
性能优化
连接池优化
- 动态连接池: 根据负载自动调整连接数
- 连接复用: 最大化连接利用率
- 健康检查: 定期检查连接状态
- 超时控制: 防止连接泄漏
查询优化
- 索引优化: 合理设计数据库索引
- 查询缓存: Redis 缓存热点数据
- 批量操作: 减少数据库往返次数
- 分页优化: 高效的分页查询
缓存策略
- 多级缓存: 内存 + Redis 缓存
- 缓存预热: 系统启动时预加载热点数据
- 缓存穿透保护: 防止恶意查询
- 缓存雪崩保护: 错开缓存过期时间
监控和日志
性能监控
- 连接池监控: 活跃连接数、等待队列长度
- 查询性能: 慢查询日志、执行时间统计
- 缓存命中率: Redis 缓存效果监控
- 错误率监控: 数据库错误统计
日志记录
- 操作日志: 记录所有数据库操作
- 性能日志: 记录查询执行时间
- 错误日志: 详细的错误信息和堆栈
- 审计日志: 敏感操作的审计记录
最佳实践
数据库设计
- 规范化设计: 避免数据冗余
- 索引策略: 合理创建和维护索引
- 数据类型: 选择合适的数据类型
- 约束设计: 充分利用数据库约束
查询优化
- 避免 N+1 查询: 使用 JOIN 或预加载
- 分页查询: 使用 LIMIT 和 OFFSET
- 条件过滤: 在数据库层面进行过滤
- 批量操作: 合并多个操作为批量操作
事务管理
- 事务边界: 明确事务的开始和结束
- 隔离级别: 根据需求选择合适的隔离级别
- 死锁处理: 设计避免死锁的策略
- 回滚策略: 合理的错误处理和回滚
缓存使用
- 缓存键设计: 有意义且唯一的缓存键
- 过期策略: 合理设置缓存过期时间
- 缓存更新: 及时更新或删除过期缓存
- 缓存预热: 系统启动时预加载重要数据
总结
UdminAI 的数据库模块提供了完整的数据持久化解决方案,具有以下特点:
- 高性能: 连接池管理和查询优化
- 高可用: 健康检查和自动重连
- 类型安全: SeaORM 提供的编译时检查
- 易维护: 清晰的模块结构和错误处理
- 可扩展: 支持多种数据库和缓存策略
通过合理的架构设计和最佳实践,确保了系统的稳定性、性能和可维护性。