connection_pool.go 9.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389
  1. package connection
  2. import (
  3. "context"
  4. "fmt"
  5. "sync"
  6. "time"
  7. "dbview/service/internal/common/databases"
  8. "dbview/service/internal/common/databases/drivers"
  9. "dbview/service/internal/common/databases/meta"
  10. "dbview/service/internal/common/response"
  11. )
  12. // PooledConnection 池化连接,包装数据库连接和元数据
  13. type PooledConnection struct {
  14. DbConn databases.Connection // 数据库连接
  15. LastUsed time.Time // 最后使用时间
  16. Type string // 数据库类型
  17. Version string // 数据库版本
  18. DbConfig *meta.ConnectionConfig // 连接配置
  19. closeChan chan struct{} // 关闭通知通道
  20. ctx context.Context // 上下文
  21. cancel context.CancelFunc // 取消函数
  22. }
  23. // ConnectionPool 连接池管理器
  24. type ConnectionPool struct {
  25. pools map[string]*PooledConnection
  26. mu sync.RWMutex
  27. cleanupCtx context.Context
  28. cleanupCancel context.CancelFunc
  29. cleanupTicker *time.Ticker
  30. }
  31. // PoolStats 描述连接池中单个连接的运行状态快照
  32. type PoolStats struct {
  33. LastUsed time.Time
  34. Type string
  35. Version string
  36. IsConnected bool
  37. MaxIdleTime time.Duration
  38. MaxLifetime time.Duration
  39. MaxOpenConns int
  40. MaxIdleConns int
  41. }
  42. // NewConnectionPool 创建新的连接池管理器
  43. func NewConnectionPool(ctx context.Context) *ConnectionPool {
  44. cleanupCtx, cleanupCancel := context.WithCancel(ctx)
  45. pool := &ConnectionPool{
  46. pools: make(map[string]*PooledConnection),
  47. cleanupCtx: cleanupCtx,
  48. cleanupCancel: cleanupCancel,
  49. }
  50. // 启动定期清理协程
  51. go pool.periodicCleanup()
  52. return pool
  53. }
  54. // RegisterConnectionWithConfig 使用新的配置结构注册连接
  55. func (cp *ConnectionPool) RegisterConnectionWithConfig(connID string, dbType, dbVersion string, config *meta.ConnectionConfig) error {
  56. cp.mu.Lock()
  57. defer cp.mu.Unlock()
  58. // 检查连接ID是否已存在
  59. if _, exists := cp.pools[connID]; exists {
  60. return nil
  61. }
  62. ctx, cancel := context.WithCancel(cp.cleanupCtx)
  63. // 立即创建数据库执行器
  64. executor, err := drivers.Connect(dbType, dbVersion, config)
  65. if err != nil {
  66. cancel()
  67. return fmt.Errorf("创建数据库连接失败: %v", err)
  68. }
  69. // 测试连接
  70. if err := executor.Ping(context.Background()); err != nil {
  71. executor.Close()
  72. cancel()
  73. return fmt.Errorf("数据库连接失败: %v", err)
  74. }
  75. cp.pools[connID] = &PooledConnection{
  76. DbConn: executor,
  77. LastUsed: time.Now(),
  78. Type: dbType,
  79. Version: dbVersion,
  80. DbConfig: config,
  81. closeChan: make(chan struct{}),
  82. ctx: ctx,
  83. cancel: cancel,
  84. }
  85. return nil
  86. }
  87. // RegisterConnection 注册一个新的数据库连接配置
  88. func (cp *ConnectionPool) RegisterConnection(
  89. connID string,
  90. dbType string,
  91. dbVersion string,
  92. host string,
  93. port int,
  94. username string,
  95. password string,
  96. dbName string,
  97. schema string,
  98. extraParams map[string]string,
  99. maxIdleTime time.Duration,
  100. maxLifetime time.Duration,
  101. maxOpenConns int,
  102. maxIdleConns int,
  103. ) error {
  104. cp.mu.Lock()
  105. defer cp.mu.Unlock()
  106. // 检查连接ID是否已存在
  107. if _, exists := cp.pools[connID]; exists {
  108. //return fmt.Errorf("连接ID %s 已存在", connID)
  109. return nil
  110. }
  111. ctx, cancel := context.WithCancel(cp.cleanupCtx)
  112. // 创建连接配置
  113. config := &meta.ConnectionConfig{
  114. Host: host,
  115. Port: port,
  116. Username: username,
  117. Password: password,
  118. Database: dbName,
  119. Schema: schema,
  120. //ExtraParams: extraParams,
  121. MaxOpenConns: maxOpenConns,
  122. MaxIdleConns: maxIdleConns,
  123. MaxIdleTime: int(maxIdleTime.Seconds()),
  124. MaxLifetime: int(maxLifetime.Seconds()),
  125. }
  126. // 立即创建数据库执行器
  127. executor, err := drivers.Connect(dbType, dbVersion, config)
  128. if err != nil {
  129. cancel()
  130. return fmt.Errorf("创建数据库连接失败: %v", err)
  131. }
  132. // 测试连接
  133. if err := executor.Ping(context.Background()); err != nil {
  134. executor.Close()
  135. cancel()
  136. return fmt.Errorf("数据库连接失败: %v", err)
  137. }
  138. cp.pools[connID] = &PooledConnection{
  139. DbConn: executor,
  140. LastUsed: time.Now(),
  141. Type: dbType,
  142. Version: dbVersion,
  143. DbConfig: config,
  144. closeChan: make(chan struct{}),
  145. ctx: ctx,
  146. cancel: cancel,
  147. }
  148. return nil
  149. }
  150. // GetConnection 获取已注册的数据库连接
  151. func (cp *ConnectionPool) GetConnection(connID string) (*PooledConnection, error) {
  152. cp.mu.RLock()
  153. pooledConn, exists := cp.pools[connID]
  154. cp.mu.RUnlock()
  155. if !exists {
  156. // 返回可用于错误匹配的已导出哨兵错误
  157. return nil, fmt.Errorf("%w: %s", response.ErrConnNotExist, connID)
  158. }
  159. // 检查连接是否已被关闭
  160. select {
  161. case <-pooledConn.ctx.Done():
  162. return nil, fmt.Errorf("连接 %s 已关闭", connID)
  163. default:
  164. }
  165. // 检查连接是否已初始化
  166. if pooledConn.DbConn == nil {
  167. cp.mu.Lock()
  168. defer cp.mu.Unlock()
  169. // 再次检查是否已初始化(并发情况下)
  170. if pooledConn.DbConn == nil {
  171. // 创建新执行器
  172. executor, err := drivers.Connect(pooledConn.Type, pooledConn.Version, pooledConn.DbConfig)
  173. if err != nil {
  174. return nil, fmt.Errorf("创建数据库执行器失败: %v", err)
  175. }
  176. // 测试连接
  177. if err := executor.Ping(context.Background()); err != nil {
  178. executor.Close()
  179. return nil, fmt.Errorf("测试数据库连接失败: %v", err)
  180. }
  181. // 保存执行器
  182. pooledConn.DbConn = executor
  183. pooledConn.LastUsed = time.Now()
  184. }
  185. }
  186. // 检查连接是否已过期
  187. maxLifetime := time.Duration(pooledConn.DbConfig.MaxLifetime) * time.Second
  188. if maxLifetime > 0 && time.Since(pooledConn.LastUsed) > maxLifetime {
  189. cp.mu.Lock()
  190. defer cp.mu.Unlock()
  191. // 再次检查(并发情况下)
  192. if maxLifetime > 0 && time.Since(pooledConn.LastUsed) > maxLifetime {
  193. // 关闭旧连接
  194. pooledConn.DbConn.Close()
  195. // 创建新执行器
  196. executor, err := drivers.Connect(pooledConn.Type, pooledConn.Version, pooledConn.DbConfig)
  197. if err != nil {
  198. return nil, fmt.Errorf("创建数据库执行器失败: %v", err)
  199. }
  200. // 测试连接
  201. if err := executor.Ping(context.Background()); err != nil {
  202. executor.Close()
  203. return nil, fmt.Errorf("测试数据库连接失败: %v", err)
  204. }
  205. // 更新执行器
  206. pooledConn.DbConn = executor
  207. pooledConn.LastUsed = time.Now()
  208. }
  209. } else {
  210. // 更新最后使用时间
  211. pooledConn.LastUsed = time.Now()
  212. }
  213. return pooledConn, nil
  214. }
  215. // CloseConnection 关闭并移除连接
  216. func (cp *ConnectionPool) CloseConnection(connID string) error {
  217. cp.mu.Lock()
  218. defer cp.mu.Unlock()
  219. if pooledConn, exists := cp.pools[connID]; exists {
  220. // 标记连接为已关闭
  221. pooledConn.cancel()
  222. // 关闭通道通知
  223. close(pooledConn.closeChan)
  224. // 关闭实际连接
  225. if pooledConn.DbConn != nil {
  226. if closer, ok := pooledConn.DbConn.(interface{ Close() error }); ok {
  227. if err := closer.Close(); err != nil {
  228. return fmt.Errorf("关闭数据库连接失败: %v", err)
  229. }
  230. }
  231. pooledConn.DbConn = nil
  232. }
  233. // 从池中移除
  234. delete(cp.pools, connID)
  235. }
  236. return nil
  237. }
  238. // CloseAllConnections 关闭所有连接
  239. func (cp *ConnectionPool) CloseAllConnections() error {
  240. cp.mu.Lock()
  241. defer cp.mu.Unlock()
  242. // 停止清理协程
  243. if cp.cleanupTicker != nil {
  244. cp.cleanupTicker.Stop()
  245. }
  246. cp.cleanupCancel()
  247. var firstError error
  248. for connID, pooledConn := range cp.pools {
  249. // 标记连接为已关闭
  250. pooledConn.cancel()
  251. // 关闭通道通知
  252. close(pooledConn.closeChan)
  253. // 关闭实际连接
  254. if pooledConn.DbConn != nil {
  255. if closer, ok := pooledConn.DbConn.(interface{ Close() error }); ok {
  256. if err := closer.Close(); err != nil {
  257. if firstError == nil {
  258. firstError = fmt.Errorf("关闭连接 %s 失败: %v", connID, err)
  259. }
  260. }
  261. }
  262. pooledConn.DbConn = nil
  263. }
  264. }
  265. cp.pools = make(map[string]*PooledConnection)
  266. return firstError
  267. }
  268. // periodicCleanup 定期清理空闲连接
  269. func (cp *ConnectionPool) periodicCleanup() {
  270. // 默认每5分钟检查一次
  271. if cp.cleanupTicker == nil {
  272. cp.cleanupTicker = time.NewTicker(5 * time.Minute)
  273. }
  274. for {
  275. select {
  276. case <-cp.cleanupTicker.C:
  277. cp.mu.Lock()
  278. now := time.Now()
  279. for connID, pooledConn := range cp.pools {
  280. // 检查连接是否需要清理
  281. // 这里可以根据需要添加清理逻辑
  282. _ = connID
  283. _ = pooledConn
  284. _ = now
  285. }
  286. cp.mu.Unlock()
  287. case <-cp.cleanupCtx.Done():
  288. return
  289. }
  290. }
  291. }
  292. // GetConnectionStats 获取连接池状态统计
  293. func (cp *ConnectionPool) GetConnectionStats() map[string]PoolStats {
  294. cp.mu.RLock()
  295. defer cp.mu.RUnlock()
  296. stats := make(map[string]PoolStats)
  297. for connID, pooledConn := range cp.pools {
  298. stats[connID] = cp.buildPoolStatsLocked(pooledConn)
  299. }
  300. return stats
  301. }
  302. // GetConnectionStat 返回指定连接的状态快照
  303. func (cp *ConnectionPool) GetConnectionStat(connID string) (PoolStats, bool) {
  304. cp.mu.RLock()
  305. defer cp.mu.RUnlock()
  306. pooledConn, exists := cp.pools[connID]
  307. if !exists {
  308. return PoolStats{}, false
  309. }
  310. return cp.buildPoolStatsLocked(pooledConn), true
  311. }
  312. // buildPoolStatsLocked 在持有读写锁的前提下构建连接状态
  313. func (cp *ConnectionPool) buildPoolStatsLocked(pooledConn *PooledConnection) PoolStats {
  314. maxIdleTime := time.Duration(pooledConn.DbConfig.MaxIdleTime) * time.Second
  315. maxLifetime := time.Duration(pooledConn.DbConfig.MaxLifetime) * time.Second
  316. return PoolStats{
  317. LastUsed: pooledConn.LastUsed,
  318. Type: pooledConn.Type,
  319. Version: pooledConn.Version,
  320. IsConnected: pooledConn.DbConn != nil,
  321. MaxIdleTime: maxIdleTime,
  322. MaxLifetime: maxLifetime,
  323. MaxOpenConns: pooledConn.DbConfig.MaxOpenConns,
  324. MaxIdleConns: pooledConn.DbConfig.MaxIdleConns,
  325. }
  326. }