mysql_executor.go 8.8 KB


  1. package db_executor
  2. import (
  3. "database/sql"
  4. "fmt"
  5. "strconv"
  6. "strings"
  7. "time"
  8. "xg_fetl/internal/models"
  9. _ "github.com/go-sql-driver/mysql"
  10. "xorm.io/xorm"
  11. )
  12. type MySQLExecutor struct {
  13. BaseExecutor
  14. }
  15. func (m *MySQLExecutor) InitDB(dsn string) error {
  16. var err error
  17. m.Engine, err = xorm.NewEngine("mysql", dsn)
  18. if err != nil {
  19. return fmt.Errorf("failed to initialize DB: %v", err)
  20. }
  21. // 设置连接池参数
  22. m.Engine.SetMaxOpenConns(10)
  23. m.Engine.SetMaxIdleConns(5)
  24. m.Engine.SetConnMaxLifetime(time.Hour)
  25. return nil
  26. }
  27. // GetDatabases 获取所有数据库名称,排除系统数据库
  28. func (m *MySQLExecutor) GetDatabases(excludeDBs []string) ([]string, error) {
  29. var databases []struct {
  30. Database string `xorm:"Database"`
  31. }
  32. // 执行 SHOW DATABASES 查询
  33. err := m.Engine.SQL("SHOW DATABASES").Find(&databases)
  34. if err != nil {
  35. return nil, fmt.Errorf("failed to get databases: %v", err)
  36. }
  37. // 获取所有数据库名
  38. var allDBs []string
  39. for _, db := range databases {
  40. allDBs = append(allDBs, db.Database)
  41. }
  42. // 过滤排除的数据库
  43. excludeDBsMap := make(map[string]struct{})
  44. for _, db := range excludeDBs {
  45. excludeDBsMap[db] = struct{}{}
  46. }
  47. var dbs []string
  48. for _, db := range allDBs {
  49. if _, ok := excludeDBsMap[db]; !ok {
  50. dbs = append(dbs, db)
  51. }
  52. }
  53. return dbs, nil
  54. }
  55. // 添加MySQL特有的操作方法
  56. func (m *MySQLExecutor) GetEngine() *xorm.Engine {
  57. return m.Engine
  58. }
  59. func (m *MySQLExecutor) GetDatabaseInfo(database string) (*models.DatabaseInfo, error) {
  60. // 获取表列表
  61. tables, err := m.Engine.DBMetas()
  62. if err != nil {
  63. return nil, fmt.Errorf("failed to get tables: %v", err)
  64. }
  65. // 初始化普通表和 LOB 表信息的映射
  66. tableInfoMap := make(map[string]models.TableInfo) // 普通表
  67. lobTableInfoMap := make(map[string]models.TableInfo) // LOB 表
  68. // 查询数据库的总大小
  69. query := `SELECT SUM(data_length + index_length) / 1024 / 1024 AS size_mb
  70. FROM information_schema.tables WHERE table_schema = ?`
  71. var size sql.NullFloat64
  72. found, err := m.Engine.SQL(query, database).Get(&size)
  73. if err != nil {
  74. return nil, fmt.Errorf("failed to get database size: %v", err)
  75. }
  76. if !found {
  77. return nil, fmt.Errorf("no size information found for database: %s", database)
  78. }
  79. // 查询每个表的大小
  80. query = `SELECT TABLE_NAME, (data_length + index_length) / 1024 / 1024 AS size_mb
  81. FROM information_schema.tables WHERE table_schema = ?`
  82. results, err := m.Engine.QueryString(query, database)
  83. if err != nil {
  84. return nil, fmt.Errorf("failed to get table sizes: %v", err)
  85. }
  86. // 解析每个表的信息
  87. for _, row := range results {
  88. tableName := row["TABLE_NAME"]
  89. tableSizeStr := row["size_mb"]
  90. tableSize, _ := strconv.ParseFloat(tableSizeStr, 64)
  91. // 初始化 TableInfo
  92. info := models.TableInfo{
  93. Name: tableName,
  94. Size: tableSize,
  95. }
  96. // 获取表的 DDL
  97. ddlQuery := fmt.Sprintf("SHOW CREATE TABLE `%s`", tableName)
  98. ddlResult, err := m.Engine.QueryString(ddlQuery)
  99. if err != nil {
  100. return nil, fmt.Errorf("failed to get DDL for table %s: %v", tableName, err)
  101. }
  102. if len(ddlResult) > 0 {
  103. info.DDL = ddlResult[0]["Create Table"]
  104. }
  105. // 获取表的行数
  106. countQuery := fmt.Sprintf("SELECT COUNT(*) AS row_count FROM `%s`", tableName)
  107. countResult, err := m.Engine.QueryString(countQuery)
  108. if err != nil {
  109. return nil, fmt.Errorf("failed to get row count for table %s: %v", tableName, err)
  110. }
  111. if len(countResult) > 0 {
  112. info.Count, _ = strconv.Atoi(countResult[0]["row_count"])
  113. }
  114. // 检查表是否包含 LOB 类型字段
  115. columnQuery := fmt.Sprintf(`SELECT COLUMN_NAME, DATA_TYPE
  116. FROM information_schema.columns
  117. WHERE table_schema = '%s' AND table_name = '%s'`, database, tableName)
  118. columns, err := m.Engine.QueryString(columnQuery)
  119. if err != nil {
  120. return nil, fmt.Errorf("failed to get columns for table %s: %v", tableName, err)
  121. }
  122. // 遍历列,检查是否有 LOB 类型(TEXT, BLOB 等)
  123. isLobTable := false
  124. for _, column := range columns {
  125. dataType := column["DATA_TYPE"]
  126. if dataType == "text" || dataType == "blob" || dataType == "mediumtext" || dataType == "longtext" || dataType == "mediumblob" || dataType == "longblob" {
  127. isLobTable = true
  128. break
  129. }
  130. }
  131. // 将表信息存入对应的映射
  132. if isLobTable {
  133. lobTableInfoMap[tableName] = info
  134. } else {
  135. tableInfoMap[tableName] = info
  136. }
  137. }
  138. // 确定数据库总大小
  139. sizeMB := 0.0
  140. if size.Valid {
  141. sizeMB = size.Float64
  142. }
  143. //-- 获取存储过程和函数
  144. funcsQuery := `SELECT ROUTINE_NAME, ROUTINE_TYPE, ROUTINE_DEFINITION
  145. FROM information_schema.ROUTINES
  146. WHERE ROUTINE_SCHEMA = ?`
  147. funcsResults, err := m.Engine.QueryString(funcsQuery, database)
  148. if err != nil {
  149. return nil, fmt.Errorf("获取存储过程/函数失败: %v", err)
  150. }
  151. var functionInfos []models.FunctionInfo
  152. for _, row := range funcsResults {
  153. functionInfo := models.FunctionInfo{
  154. Name: row["ROUTINE_NAME"],
  155. Type: row["ROUTINE_TYPE"],
  156. Definition: row["ROUTINE_DEFINITION"],
  157. }
  158. functionInfos = append(functionInfos, functionInfo)
  159. }
  160. // 获取触发器
  161. triggersQuery := `SELECT TRIGGER_NAME, EVENT_MANIPULATION, EVENT_OBJECT_TABLE,
  162. ACTION_TIMING, ACTION_STATEMENT
  163. FROM information_schema.TRIGGERS
  164. WHERE TRIGGER_SCHEMA = ?`
  165. triggersResults, err := m.Engine.QueryString(triggersQuery, database)
  166. if err != nil {
  167. return nil, fmt.Errorf("获取触发器失败: %v", err)
  168. }
  169. var triggerInfos []models.TriggerInfo
  170. for _, row := range triggersResults {
  171. triggerInfo := models.TriggerInfo{
  172. Name: row["TRIGGER_NAME"],
  173. Event: row["EVENT_MANIPULATION"],
  174. Table: row["EVENT_OBJECT_TABLE"],
  175. Timing: row["ACTION_TIMING"],
  176. Statement: row["ACTION_STATEMENT"],
  177. }
  178. triggerInfos = append(triggerInfos, triggerInfo)
  179. }
  180. // 返回包含总大小、表数量和表信息的 DatabaseInfo 结构体
  181. return &models.DatabaseInfo{
  182. SizeMB: sizeMB,
  183. TableCount: len(tables),
  184. Tables: tableInfoMap,
  185. LobTables: lobTableInfoMap,
  186. Functions: functionInfos,
  187. Triggers: triggerInfos,
  188. }, nil
  189. }
  190. // ReadTableFromDBWithPagination 使用 Xorm 分页从数据库中读取表数据
  191. // 参数:
  192. // - engine: Xorm 引擎实例,用于连接和操作数据库
  193. // - tableName: 要读取的表名
  194. // - offset: 分页查询的起始偏移量
  195. // - pageSize: 每次分页查询的行数
  196. // 返回值:
  197. // - headers: 表的列名列表
  198. // - records: 表的数据,包含每行的内容
  199. // - error: 如果查询出错,返回相关的错误信息
  200. func (m *MySQLExecutor) ReadTableFromDBWithPagination(tableName string, offset int, pageSize int) ([]string, [][]string, error) {
  201. // 构建 SQL 查询,使用 LIMIT 和 OFFSET 来分页查询数据
  202. sql := fmt.Sprintf("SELECT * FROM %s LIMIT %d OFFSET %d", tableName, pageSize, offset)
  203. rows, err := m.Engine.QueryString(sql)
  204. if err != nil {
  205. return nil, nil, fmt.Errorf("failed to query table: %v", err)
  206. }
  207. // 如果没有查询到数据,则返回 nil
  208. if len(rows) == 0 {
  209. return nil, nil, nil
  210. }
  211. // 获取列名
  212. headers := make([]string, 0)
  213. for key := range rows[0] {
  214. headers = append(headers, key)
  215. }
  216. // 获取数据行
  217. records := make([][]string, 0)
  218. for _, row := range rows {
  219. record := make([]string, len(headers))
  220. for i, header := range headers {
  221. record[i] = row[header]
  222. }
  223. records = append(records, record)
  224. }
  225. return headers, records, nil
  226. }
  227. // InsertRecordsToDB 使用 Xorm 的 Session 结合 Prepare 语句将记录插入到数据库中
  228. // 参数:
  229. // - engine: Xorm 引擎实例,用于连接和操作数据库
  230. // - tableName: 插入的目标表名
  231. // - headers: 表头,用于生成插入语句中的列名
  232. // - records: 要插入的数据记录,每行对应表的一条记录
  233. // 返回值:
  234. // - error: 如果插入过程中有错误,返回相关的错误信息
  235. func (m *MySQLExecutor) InsertRecordsToDB(tableName string, headers []string, records [][]string) error {
  236. if len(records) == 0 {
  237. return nil // 如果没有记录,不进行任何插入操作
  238. }
  239. // 获取 Xorm 底层的 *sql.DB 实例
  240. db := m.Engine.DB().DB
  241. // 构建插入 SQL 语句
  242. columnNames := strings.Join(headers, ", ")
  243. placeholders := strings.Repeat("?, ", len(headers))
  244. placeholders = strings.TrimSuffix(placeholders, ", ")
  245. insertSQL := fmt.Sprintf("INSERT INTO %s (%s) VALUES (%s)", tableName, columnNames, placeholders)
  246. // 准备插入语句
  247. stmt, err := db.Prepare(insertSQL)
  248. if err != nil {
  249. return fmt.Errorf("failed to prepare insert statement: %v", err)
  250. }
  251. defer stmt.Close()
  252. // 插入记录
  253. for _, record := range records {
  254. args := make([]interface{}, len(record))
  255. for i, v := range record {
  256. args[i] = v
  257. }
  258. // 使用准备好的语句执行插入
  259. _, err := stmt.Exec(args...)
  260. if err != nil {
  261. return fmt.Errorf("failed to execute insert statement: %v", err)
  262. }
  263. }
  264. return nil
  265. }