// Package connection keeps a registry of database connections keyed by a
// caller-chosen connection ID.
package connection

import (
	"context"
	"fmt"
	"sync"
	"time"

	"dbview/service/internal/common/databases"
	"dbview/service/internal/common/databases/drivers"
	"dbview/service/internal/common/databases/meta"
	"dbview/service/internal/common/response"
)

// PooledConnection wraps a database connection together with the metadata
// the pool tracks for it.
//
// The pool mutates DbConn and LastUsed while holding its own mutex; code that
// reads those fields through a pointer returned by GetConnection is not
// covered by that mutex.
type PooledConnection struct {
	DbConn    databases.Connection   // underlying database connection; nil after it has been released
	LastUsed  time.Time              // time of the last successful GetConnection (or of registration)
	Type      string                 // database type passed to drivers.Connect
	Version   string                 // database version passed to drivers.Connect
	DbConfig  *meta.ConnectionConfig // configuration used to (re)open the connection
	closeChan chan struct{}          // closed when the entry is released
	ctx       context.Context        // cancelled when the entry is released or the pool shuts down
	cancel    context.CancelFunc     // cancels ctx
}

// ConnectionPool is a registry of PooledConnection entries keyed by
// connection ID. All access to pools goes through mu.
type ConnectionPool struct {
	pools         map[string]*PooledConnection
	mu            sync.RWMutex
	cleanupCtx    context.Context
	cleanupCancel context.CancelFunc
	cleanupTicker *time.Ticker
}

// PoolStats is a point-in-time snapshot of a single registered connection.
type PoolStats struct {
	LastUsed     time.Time
	Type         string
	Version      string
	IsConnected  bool
	MaxIdleTime  time.Duration
	MaxLifetime  time.Duration
	MaxOpenConns int
	MaxIdleConns int
}

// NewConnectionPool creates an empty pool and starts its background cleanup
// goroutine. The goroutine exits when ctx is cancelled or when
// CloseAllConnections is called.
func NewConnectionPool(ctx context.Context) *ConnectionPool {
	cleanupCtx, cleanupCancel := context.WithCancel(ctx)
	pool := &ConnectionPool{
		pools:         make(map[string]*PooledConnection),
		cleanupCtx:    cleanupCtx,
		cleanupCancel: cleanupCancel,
		// Created here, before the goroutine starts, so the field is never
		// written concurrently with CloseAllConnections reading it.
		cleanupTicker: time.NewTicker(5 * time.Minute),
	}

	go pool.periodicCleanup()

	return pool
}

// RegisterConnectionWithConfig opens a connection described by config, pings
// it, and stores it under connID.
//
// If connID is already registered the call is a no-op and returns nil. It
// returns an error when config is nil, when the pool has already been shut
// down, or when connecting or pinging fails. The pool lock is held for the
// whole call, including the connect and ping.
func (cp *ConnectionPool) RegisterConnectionWithConfig(connID string, dbType, dbVersion string, config *meta.ConnectionConfig) error {
	cp.mu.Lock()
	defer cp.mu.Unlock()

	// An existing ID is left untouched.
	if _, exists := cp.pools[connID]; exists {
		return nil
	}

	// GetConnection and the stats helpers dereference DbConfig, so a nil
	// config must never be stored.
	if config == nil {
		return fmt.Errorf("连接配置不能为空")
	}

	// Entry contexts derive from cleanupCtx. Once that is cancelled every new
	// entry would be unusable immediately, so do not open a connection for it.
	if err := cp.cleanupCtx.Err(); err != nil {
		return fmt.Errorf("连接池已关闭: %w", err)
	}

	executor, err := drivers.Connect(dbType, dbVersion, config)
	if err != nil {
		return fmt.Errorf("创建数据库连接失败: %w", err)
	}

	if err := executor.Ping(context.Background()); err != nil {
		executor.Close()
		return fmt.Errorf("数据库连接失败: %w", err)
	}

	ctx, cancel := context.WithCancel(cp.cleanupCtx)
	cp.pools[connID] = &PooledConnection{
		DbConn:    executor,
		LastUsed:  time.Now(),
		Type:      dbType,
		Version:   dbVersion,
		DbConfig:  config,
		closeChan: make(chan struct{}),
		ctx:       ctx,
		cancel:    cancel,
	}

	return nil
}

// RegisterConnection builds a meta.ConnectionConfig from the individual
// parameters and registers it via RegisterConnectionWithConfig.
//
// maxIdleTime and maxLifetime are stored as whole seconds. extraParams is
// accepted for interface compatibility but is currently not applied to the
// configuration.
func (cp *ConnectionPool) RegisterConnection(
	connID string,
	dbType string,
	dbVersion string,
	host string,
	port int,
	username string,
	password string,
	dbName string,
	schema string,
	extraParams map[string]string,
	maxIdleTime time.Duration,
	maxLifetime time.Duration,
	maxOpenConns int,
	maxIdleConns int,
) error {
	config := &meta.ConnectionConfig{
		Host:     host,
		Port:     port,
		Username: username,
		Password: password,
		Database: dbName,
		Schema:   schema,
		//ExtraParams: extraParams,
		MaxOpenConns: maxOpenConns,
		MaxIdleConns: maxIdleConns,
		MaxIdleTime:  int(maxIdleTime.Seconds()),
		MaxLifetime:  int(maxLifetime.Seconds()),
	}

	return cp.RegisterConnectionWithConfig(connID, dbType, dbVersion, config)
}

// GetConnection returns the entry registered under connID.
//
// It returns an error wrapping response.ErrConnNotExist when the ID is
// unknown, and a plain error when the entry's context has been cancelled.
// When the entry has no live connection, or when DbConfig.MaxLifetime is
// positive and more than that many seconds have passed since LastUsed, the
// connection is (re)opened and pinged first. On success LastUsed is set to
// the current time.
//
// The whole call runs under the pool's write lock: it always mutates the
// entry, and holding the lock from lookup to return prevents a concurrent
// CloseConnection from removing the entry halfway through.
func (cp *ConnectionPool) GetConnection(connID string) (*PooledConnection, error) {
	cp.mu.Lock()
	defer cp.mu.Unlock()

	pooledConn, exists := cp.pools[connID]
	if !exists {
		// Wrap the exported sentinel so callers can match it with errors.Is.
		return nil, fmt.Errorf("%w: %s", response.ErrConnNotExist, connID)
	}

	// Refuse entries whose context has been cancelled.
	select {
	case <-pooledConn.ctx.Done():
		return nil, fmt.Errorf("连接 %s 已关闭", connID)
	default:
	}

	// Drop a connection that has outlived its configured lifetime. Clearing
	// DbConn keeps the entry consistent if the reconnect below fails.
	maxLifetime := time.Duration(pooledConn.DbConfig.MaxLifetime) * time.Second
	if pooledConn.DbConn != nil && maxLifetime > 0 && time.Since(pooledConn.LastUsed) > maxLifetime {
		pooledConn.DbConn.Close()
		pooledConn.DbConn = nil
	}

	// (Re)open the connection when the entry has none.
	if pooledConn.DbConn == nil {
		executor, err := drivers.Connect(pooledConn.Type, pooledConn.Version, pooledConn.DbConfig)
		if err != nil {
			return nil, fmt.Errorf("创建数据库执行器失败: %w", err)
		}

		if err := executor.Ping(context.Background()); err != nil {
			executor.Close()
			return nil, fmt.Errorf("测试数据库连接失败: %w", err)
		}

		pooledConn.DbConn = executor
	}

	pooledConn.LastUsed = time.Now()

	return pooledConn, nil
}

// CloseConnection releases the entry registered under connID and removes it
// from the pool. An unknown ID is not an error.
//
// The entry is removed even when closing the underlying connection fails, so
// a later call can never release the same entry twice; the close error is
// still returned.
func (cp *ConnectionPool) CloseConnection(connID string) error {
	cp.mu.Lock()
	defer cp.mu.Unlock()

	pooledConn, exists := cp.pools[connID]
	if !exists {
		return nil
	}

	delete(cp.pools, connID)

	if err := cp.releaseLocked(pooledConn); err != nil {
		return fmt.Errorf("关闭数据库连接失败: %w", err)
	}

	return nil
}

// CloseAllConnections stops the cleanup goroutine, releases every registered
// entry, and empties the pool. It returns the first close error encountered,
// after all entries have been released.
func (cp *ConnectionPool) CloseAllConnections() error {
	cp.mu.Lock()
	defer cp.mu.Unlock()

	// Stop the cleanup goroutine.
	if cp.cleanupTicker != nil {
		cp.cleanupTicker.Stop()
	}
	cp.cleanupCancel()

	var firstError error
	for connID, pooledConn := range cp.pools {
		if err := cp.releaseLocked(pooledConn); err != nil && firstError == nil {
			firstError = fmt.Errorf("关闭连接 %s 失败: %w", connID, err)
		}
	}

	cp.pools = make(map[string]*PooledConnection)

	return firstError
}

// releaseLocked cancels the entry's context, closes its notification channel
// and closes the underlying connection if there is one. It must be called
// with cp.mu held for writing, and at most once per entry because closeChan
// is closed here. It returns the unwrapped error from Close, if any.
func (cp *ConnectionPool) releaseLocked(pooledConn *PooledConnection) error {
	pooledConn.cancel()
	close(pooledConn.closeChan)

	if pooledConn.DbConn == nil {
		return nil
	}

	var closeErr error
	if closer, ok := pooledConn.DbConn.(interface{ Close() error }); ok {
		closeErr = closer.Close()
	}
	pooledConn.DbConn = nil

	return closeErr
}

// periodicCleanup is the body of the background goroutine started by
// NewConnectionPool. It wakes on every tick and returns once cleanupCtx is
// cancelled, stopping the ticker on the way out.
func (cp *ConnectionPool) periodicCleanup() {
	ticker := cp.cleanupTicker
	if ticker == nil {
		// Defensive fallback for a pool not built by NewConnectionPool. The
		// ticker is kept local so no shared field is written from this
		// goroutine.
		ticker = time.NewTicker(5 * time.Minute)
	}
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			// No eviction policy is implemented yet; this is the hook where
			// idle-connection cleanup would go (under cp.mu).
		case <-cp.cleanupCtx.Done():
			return
		}
	}
}

// GetConnectionStats returns a snapshot of every registered connection,
// keyed by connection ID.
func (cp *ConnectionPool) GetConnectionStats() map[string]PoolStats {
	cp.mu.RLock()
	defer cp.mu.RUnlock()

	stats := make(map[string]PoolStats, len(cp.pools))
	for connID, pooledConn := range cp.pools {
		stats[connID] = cp.buildPoolStatsLocked(pooledConn)
	}

	return stats
}

// GetConnectionStat returns the snapshot for connID. The boolean is false
// when no connection is registered under that ID.
func (cp *ConnectionPool) GetConnectionStat(connID string) (PoolStats, bool) {
	cp.mu.RLock()
	defer cp.mu.RUnlock()

	pooledConn, exists := cp.pools[connID]
	if !exists {
		return PoolStats{}, false
	}

	return cp.buildPoolStatsLocked(pooledConn), true
}

// buildPoolStatsLocked builds the snapshot for one entry. The caller must
// hold cp.mu (read or write). The configured idle time and lifetime, stored
// as seconds, are converted to time.Duration.
func (cp *ConnectionPool) buildPoolStatsLocked(pooledConn *PooledConnection) PoolStats {
	maxIdleTime := time.Duration(pooledConn.DbConfig.MaxIdleTime) * time.Second
	maxLifetime := time.Duration(pooledConn.DbConfig.MaxLifetime) * time.Second

	return PoolStats{
		LastUsed:     pooledConn.LastUsed,
		Type:         pooledConn.Type,
		Version:      pooledConn.Version,
		IsConnected:  pooledConn.DbConn != nil,
		MaxIdleTime:  maxIdleTime,
		MaxLifetime:  maxLifetime,
		MaxOpenConns: pooledConn.DbConfig.MaxOpenConns,
		MaxIdleConns: pooledConn.DbConfig.MaxIdleConns,
	}
}