| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389 |
- package connection
- import (
- "context"
- "fmt"
- "sync"
- "time"
- "dbview/service/internal/common/databases"
- "dbview/service/internal/common/databases/drivers"
- "dbview/service/internal/common/databases/meta"
- "dbview/service/internal/common/response"
- )
- // PooledConnection 池化连接,包装数据库连接和元数据
- type PooledConnection struct {
- DbConn databases.Connection // 数据库连接
- LastUsed time.Time // 最后使用时间
- Type string // 数据库类型
- Version string // 数据库版本
- DbConfig *meta.ConnectionConfig // 连接配置
- closeChan chan struct{} // 关闭通知通道
- ctx context.Context // 上下文
- cancel context.CancelFunc // 取消函数
- }
- // ConnectionPool 连接池管理器
- type ConnectionPool struct {
- pools map[string]*PooledConnection
- mu sync.RWMutex
- cleanupCtx context.Context
- cleanupCancel context.CancelFunc
- cleanupTicker *time.Ticker
- }
- // PoolStats 描述连接池中单个连接的运行状态快照
- type PoolStats struct {
- LastUsed time.Time
- Type string
- Version string
- IsConnected bool
- MaxIdleTime time.Duration
- MaxLifetime time.Duration
- MaxOpenConns int
- MaxIdleConns int
- }
- // NewConnectionPool 创建新的连接池管理器
- func NewConnectionPool(ctx context.Context) *ConnectionPool {
- cleanupCtx, cleanupCancel := context.WithCancel(ctx)
- pool := &ConnectionPool{
- pools: make(map[string]*PooledConnection),
- cleanupCtx: cleanupCtx,
- cleanupCancel: cleanupCancel,
- }
- // 启动定期清理协程
- go pool.periodicCleanup()
- return pool
- }
- // RegisterConnectionWithConfig 使用新的配置结构注册连接
- func (cp *ConnectionPool) RegisterConnectionWithConfig(connID string, dbType, dbVersion string, config *meta.ConnectionConfig) error {
- cp.mu.Lock()
- defer cp.mu.Unlock()
- // 检查连接ID是否已存在
- if _, exists := cp.pools[connID]; exists {
- return nil
- }
- ctx, cancel := context.WithCancel(cp.cleanupCtx)
- // 立即创建数据库执行器
- executor, err := drivers.Connect(dbType, dbVersion, config)
- if err != nil {
- cancel()
- return fmt.Errorf("创建数据库连接失败: %v", err)
- }
- // 测试连接
- if err := executor.Ping(context.Background()); err != nil {
- executor.Close()
- cancel()
- return fmt.Errorf("数据库连接失败: %v", err)
- }
- cp.pools[connID] = &PooledConnection{
- DbConn: executor,
- LastUsed: time.Now(),
- Type: dbType,
- Version: dbVersion,
- DbConfig: config,
- closeChan: make(chan struct{}),
- ctx: ctx,
- cancel: cancel,
- }
- return nil
- }
- // RegisterConnection 注册一个新的数据库连接配置
- func (cp *ConnectionPool) RegisterConnection(
- connID string,
- dbType string,
- dbVersion string,
- host string,
- port int,
- username string,
- password string,
- dbName string,
- schema string,
- extraParams map[string]string,
- maxIdleTime time.Duration,
- maxLifetime time.Duration,
- maxOpenConns int,
- maxIdleConns int,
- ) error {
- cp.mu.Lock()
- defer cp.mu.Unlock()
- // 检查连接ID是否已存在
- if _, exists := cp.pools[connID]; exists {
- //return fmt.Errorf("连接ID %s 已存在", connID)
- return nil
- }
- ctx, cancel := context.WithCancel(cp.cleanupCtx)
- // 创建连接配置
- config := &meta.ConnectionConfig{
- Host: host,
- Port: port,
- Username: username,
- Password: password,
- Database: dbName,
- Schema: schema,
- //ExtraParams: extraParams,
- MaxOpenConns: maxOpenConns,
- MaxIdleConns: maxIdleConns,
- MaxIdleTime: int(maxIdleTime.Seconds()),
- MaxLifetime: int(maxLifetime.Seconds()),
- }
- // 立即创建数据库执行器
- executor, err := drivers.Connect(dbType, dbVersion, config)
- if err != nil {
- cancel()
- return fmt.Errorf("创建数据库连接失败: %v", err)
- }
- // 测试连接
- if err := executor.Ping(context.Background()); err != nil {
- executor.Close()
- cancel()
- return fmt.Errorf("数据库连接失败: %v", err)
- }
- cp.pools[connID] = &PooledConnection{
- DbConn: executor,
- LastUsed: time.Now(),
- Type: dbType,
- Version: dbVersion,
- DbConfig: config,
- closeChan: make(chan struct{}),
- ctx: ctx,
- cancel: cancel,
- }
- return nil
- }
- // GetConnection 获取已注册的数据库连接
- func (cp *ConnectionPool) GetConnection(connID string) (*PooledConnection, error) {
- cp.mu.RLock()
- pooledConn, exists := cp.pools[connID]
- cp.mu.RUnlock()
- if !exists {
- // 返回可用于错误匹配的已导出哨兵错误
- return nil, fmt.Errorf("%w: %s", response.ErrConnNotExist, connID)
- }
- // 检查连接是否已被关闭
- select {
- case <-pooledConn.ctx.Done():
- return nil, fmt.Errorf("连接 %s 已关闭", connID)
- default:
- }
- // 检查连接是否已初始化
- if pooledConn.DbConn == nil {
- cp.mu.Lock()
- defer cp.mu.Unlock()
- // 再次检查是否已初始化(并发情况下)
- if pooledConn.DbConn == nil {
- // 创建新执行器
- executor, err := drivers.Connect(pooledConn.Type, pooledConn.Version, pooledConn.DbConfig)
- if err != nil {
- return nil, fmt.Errorf("创建数据库执行器失败: %v", err)
- }
- // 测试连接
- if err := executor.Ping(context.Background()); err != nil {
- executor.Close()
- return nil, fmt.Errorf("测试数据库连接失败: %v", err)
- }
- // 保存执行器
- pooledConn.DbConn = executor
- pooledConn.LastUsed = time.Now()
- }
- }
- // 检查连接是否已过期
- maxLifetime := time.Duration(pooledConn.DbConfig.MaxLifetime) * time.Second
- if maxLifetime > 0 && time.Since(pooledConn.LastUsed) > maxLifetime {
- cp.mu.Lock()
- defer cp.mu.Unlock()
- // 再次检查(并发情况下)
- if maxLifetime > 0 && time.Since(pooledConn.LastUsed) > maxLifetime {
- // 关闭旧连接
- pooledConn.DbConn.Close()
- // 创建新执行器
- executor, err := drivers.Connect(pooledConn.Type, pooledConn.Version, pooledConn.DbConfig)
- if err != nil {
- return nil, fmt.Errorf("创建数据库执行器失败: %v", err)
- }
- // 测试连接
- if err := executor.Ping(context.Background()); err != nil {
- executor.Close()
- return nil, fmt.Errorf("测试数据库连接失败: %v", err)
- }
- // 更新执行器
- pooledConn.DbConn = executor
- pooledConn.LastUsed = time.Now()
- }
- } else {
- // 更新最后使用时间
- pooledConn.LastUsed = time.Now()
- }
- return pooledConn, nil
- }
- // CloseConnection 关闭并移除连接
- func (cp *ConnectionPool) CloseConnection(connID string) error {
- cp.mu.Lock()
- defer cp.mu.Unlock()
- if pooledConn, exists := cp.pools[connID]; exists {
- // 标记连接为已关闭
- pooledConn.cancel()
- // 关闭通道通知
- close(pooledConn.closeChan)
- // 关闭实际连接
- if pooledConn.DbConn != nil {
- if closer, ok := pooledConn.DbConn.(interface{ Close() error }); ok {
- if err := closer.Close(); err != nil {
- return fmt.Errorf("关闭数据库连接失败: %v", err)
- }
- }
- pooledConn.DbConn = nil
- }
- // 从池中移除
- delete(cp.pools, connID)
- }
- return nil
- }
- // CloseAllConnections 关闭所有连接
- func (cp *ConnectionPool) CloseAllConnections() error {
- cp.mu.Lock()
- defer cp.mu.Unlock()
- // 停止清理协程
- if cp.cleanupTicker != nil {
- cp.cleanupTicker.Stop()
- }
- cp.cleanupCancel()
- var firstError error
- for connID, pooledConn := range cp.pools {
- // 标记连接为已关闭
- pooledConn.cancel()
- // 关闭通道通知
- close(pooledConn.closeChan)
- // 关闭实际连接
- if pooledConn.DbConn != nil {
- if closer, ok := pooledConn.DbConn.(interface{ Close() error }); ok {
- if err := closer.Close(); err != nil {
- if firstError == nil {
- firstError = fmt.Errorf("关闭连接 %s 失败: %v", connID, err)
- }
- }
- }
- pooledConn.DbConn = nil
- }
- }
- cp.pools = make(map[string]*PooledConnection)
- return firstError
- }
- // periodicCleanup 定期清理空闲连接
- func (cp *ConnectionPool) periodicCleanup() {
- // 默认每5分钟检查一次
- if cp.cleanupTicker == nil {
- cp.cleanupTicker = time.NewTicker(5 * time.Minute)
- }
- for {
- select {
- case <-cp.cleanupTicker.C:
- cp.mu.Lock()
- now := time.Now()
- for connID, pooledConn := range cp.pools {
- // 检查连接是否需要清理
- // 这里可以根据需要添加清理逻辑
- _ = connID
- _ = pooledConn
- _ = now
- }
- cp.mu.Unlock()
- case <-cp.cleanupCtx.Done():
- return
- }
- }
- }
- // GetConnectionStats 获取连接池状态统计
- func (cp *ConnectionPool) GetConnectionStats() map[string]PoolStats {
- cp.mu.RLock()
- defer cp.mu.RUnlock()
- stats := make(map[string]PoolStats)
- for connID, pooledConn := range cp.pools {
- stats[connID] = cp.buildPoolStatsLocked(pooledConn)
- }
- return stats
- }
- // GetConnectionStat 返回指定连接的状态快照
- func (cp *ConnectionPool) GetConnectionStat(connID string) (PoolStats, bool) {
- cp.mu.RLock()
- defer cp.mu.RUnlock()
- pooledConn, exists := cp.pools[connID]
- if !exists {
- return PoolStats{}, false
- }
- return cp.buildPoolStatsLocked(pooledConn), true
- }
- // buildPoolStatsLocked 在持有读写锁的前提下构建连接状态
- func (cp *ConnectionPool) buildPoolStatsLocked(pooledConn *PooledConnection) PoolStats {
- maxIdleTime := time.Duration(pooledConn.DbConfig.MaxIdleTime) * time.Second
- maxLifetime := time.Duration(pooledConn.DbConfig.MaxLifetime) * time.Second
- return PoolStats{
- LastUsed: pooledConn.LastUsed,
- Type: pooledConn.Type,
- Version: pooledConn.Version,
- IsConnected: pooledConn.DbConn != nil,
- MaxIdleTime: maxIdleTime,
- MaxLifetime: maxLifetime,
- MaxOpenConns: pooledConn.DbConfig.MaxOpenConns,
- MaxIdleConns: pooledConn.DbConfig.MaxIdleConns,
- }
- }
|