package db_executor

import (
	"database/sql"
	"fmt"
	"sort"
	"strconv"
	"strings"
	"time"

	_ "github.com/go-sql-driver/mysql"
	"xg_fetl/internal/models"
	"xorm.io/xorm"
)

// MySQLExecutor is the MySQL implementation of the executor. It embeds
// BaseExecutor; the Engine field used by every method below is reached
// through that embedding.
type MySQLExecutor struct {
	BaseExecutor
}

// InitDB creates the xorm engine for the given MySQL DSN and configures the
// connection pool (at most 10 open and 5 idle connections, each reused for
// at most one hour).
//
// It returns a wrapped error when the engine cannot be created.
func (m *MySQLExecutor) InitDB(dsn string) error {
	var err error
	m.Engine, err = xorm.NewEngine("mysql", dsn)
	if err != nil {
		return fmt.Errorf("failed to initialize DB: %w", err)
	}

	// Connection pool settings.
	m.Engine.SetMaxOpenConns(10)
	m.Engine.SetMaxIdleConns(5)
	m.Engine.SetConnMaxLifetime(time.Hour)
	return nil
}

// GetDatabases returns the names reported by SHOW DATABASES, minus every
// name listed in excludeDBs. Nothing is excluded implicitly: callers that
// want to hide system schemas must pass them in excludeDBs.
//
// The result is nil when no database remains after filtering.
func (m *MySQLExecutor) GetDatabases(excludeDBs []string) ([]string, error) {
	var rows []struct {
		Database string `xorm:"Database"`
	}
	if err := m.Engine.SQL("SHOW DATABASES").Find(&rows); err != nil {
		return nil, fmt.Errorf("failed to get databases: %w", err)
	}

	// Set of names to drop, for O(1) membership checks.
	excluded := make(map[string]struct{}, len(excludeDBs))
	for _, name := range excludeDBs {
		excluded[name] = struct{}{}
	}

	var dbs []string
	for _, row := range rows {
		if _, skip := excluded[row.Database]; !skip {
			dbs = append(dbs, row.Database)
		}
	}
	return dbs, nil
}

// GetEngine returns the underlying xorm engine.
func (m *MySQLExecutor) GetEngine() *xorm.Engine {
	return m.Engine
}

// GetDatabaseInfo collects metadata about the given schema: total size in
// MB, per-table size / DDL / row count (split into tables with and without
// LOB columns), stored routines and triggers.
//
// Parameters:
//   - database: schema name used to filter the information_schema queries
//     and to qualify the per-table statements.
//
// It returns a wrapped error as soon as any query fails or a numeric value
// returned by the server cannot be parsed.
func (m *MySQLExecutor) GetDatabaseInfo(database string) (*models.DatabaseInfo, error) {
	// NOTE(review): DBMetas takes no schema argument, so TableCount below
	// presumably reflects the schema the engine is connected to rather than
	// `database` — confirm this is intended.
	tables, err := m.Engine.DBMetas()
	if err != nil {
		return nil, fmt.Errorf("failed to get tables: %w", err)
	}

	// Total size of the schema in MB.
	const totalSizeQuery = `SELECT SUM(data_length + index_length) / 1024 / 1024 AS size_mb
		FROM information_schema.tables
		WHERE table_schema = ?`
	var size sql.NullFloat64
	found, err := m.Engine.SQL(totalSizeQuery, database).Get(&size)
	if err != nil {
		return nil, fmt.Errorf("failed to get database size: %w", err)
	}
	if !found {
		return nil, fmt.Errorf("no size information found for database: %s", database)
	}

	// Size of every table in the schema in MB.
	const tableSizeQuery = `SELECT TABLE_NAME, (data_length + index_length) / 1024 / 1024 AS size_mb
		FROM information_schema.tables
		WHERE table_schema = ?`
	results, err := m.Engine.QueryString(tableSizeQuery, database)
	if err != nil {
		return nil, fmt.Errorf("failed to get table sizes: %w", err)
	}

	tableInfoMap := make(map[string]models.TableInfo)    // tables without LOB columns
	lobTableInfoMap := make(map[string]models.TableInfo) // tables with LOB columns
	for _, row := range results {
		info, isLobTable, err := m.describeTable(database, row["TABLE_NAME"], row["size_mb"])
		if err != nil {
			return nil, err
		}
		if isLobTable {
			lobTableInfoMap[info.Name] = info
		} else {
			tableInfoMap[info.Name] = info
		}
	}

	// A NULL sum (no matching rows) is reported as 0 MB.
	sizeMB := 0.0
	if size.Valid {
		sizeMB = size.Float64
	}

	functionInfos, err := m.listRoutines(database)
	if err != nil {
		return nil, err
	}
	triggerInfos, err := m.listTriggers(database)
	if err != nil {
		return nil, err
	}

	return &models.DatabaseInfo{
		SizeMB:     sizeMB,
		TableCount: len(tables),
		Tables:     tableInfoMap,
		LobTables:  lobTableInfoMap,
		Functions:  functionInfos,
		Triggers:   triggerInfos,
	}, nil
}

// describeTable builds the TableInfo for one table (size, DDL, row count)
// and reports whether the table has at least one LOB column.
func (m *MySQLExecutor) describeTable(database, tableName, rawSizeMB string) (models.TableInfo, bool, error) {
	info := models.TableInfo{Name: tableName}

	// size_mb may be NULL (e.g. presumably for views), which arrives here as
	// an empty string; keep the size at 0 in that case instead of failing.
	if rawSizeMB != "" {
		sizeMB, err := strconv.ParseFloat(rawSizeMB, 64)
		if err != nil {
			return models.TableInfo{}, false, fmt.Errorf("failed to parse size for table %s: %w", tableName, err)
		}
		info.Size = sizeMB
	}

	// Qualify with the schema so the statements hit `database` even when the
	// connection's default schema is a different one.
	qualified := mysqlQuoteIdent(database) + "." + mysqlQuoteIdent(tableName)

	ddlResult, err := m.Engine.QueryString("SHOW CREATE TABLE " + qualified)
	if err != nil {
		return models.TableInfo{}, false, fmt.Errorf("failed to get DDL for table %s: %w", tableName, err)
	}
	if len(ddlResult) > 0 {
		info.DDL = ddlResult[0]["Create Table"]
	}

	countResult, err := m.Engine.QueryString("SELECT COUNT(*) AS row_count FROM " + qualified)
	if err != nil {
		return models.TableInfo{}, false, fmt.Errorf("failed to get row count for table %s: %w", tableName, err)
	}
	if len(countResult) > 0 {
		count, err := strconv.Atoi(countResult[0]["row_count"])
		if err != nil {
			return models.TableInfo{}, false, fmt.Errorf("failed to parse row count for table %s: %w", tableName, err)
		}
		info.Count = count
	}

	// Bound parameters instead of string-built SQL: schema and table names
	// are values here, so they must not be spliced into the statement.
	const columnQuery = `SELECT COLUMN_NAME, DATA_TYPE
		FROM information_schema.columns
		WHERE table_schema = ? AND table_name = ?`
	columns, err := m.Engine.QueryString(columnQuery, database, tableName)
	if err != nil {
		return models.TableInfo{}, false, fmt.Errorf("failed to get columns for table %s: %w", tableName, err)
	}
	for _, column := range columns {
		if mysqlIsLobType(column["DATA_TYPE"]) {
			return info, true, nil
		}
	}
	return info, false, nil
}

// listRoutines returns the stored procedures and functions of the schema.
func (m *MySQLExecutor) listRoutines(database string) ([]models.FunctionInfo, error) {
	const routinesQuery = `SELECT ROUTINE_NAME, ROUTINE_TYPE, ROUTINE_DEFINITION
		FROM information_schema.ROUTINES
		WHERE ROUTINE_SCHEMA = ?`
	rows, err := m.Engine.QueryString(routinesQuery, database)
	if err != nil {
		return nil, fmt.Errorf("获取存储过程/函数失败: %w", err)
	}

	var functionInfos []models.FunctionInfo
	for _, row := range rows {
		functionInfos = append(functionInfos, models.FunctionInfo{
			Name:       row["ROUTINE_NAME"],
			Type:       row["ROUTINE_TYPE"],
			Definition: row["ROUTINE_DEFINITION"],
		})
	}
	return functionInfos, nil
}

// listTriggers returns the triggers defined in the schema.
func (m *MySQLExecutor) listTriggers(database string) ([]models.TriggerInfo, error) {
	const triggersQuery = `SELECT TRIGGER_NAME, EVENT_MANIPULATION, EVENT_OBJECT_TABLE, ACTION_TIMING, ACTION_STATEMENT
		FROM information_schema.TRIGGERS
		WHERE TRIGGER_SCHEMA = ?`
	rows, err := m.Engine.QueryString(triggersQuery, database)
	if err != nil {
		return nil, fmt.Errorf("获取触发器失败: %w", err)
	}

	var triggerInfos []models.TriggerInfo
	for _, row := range rows {
		triggerInfos = append(triggerInfos, models.TriggerInfo{
			Name:      row["TRIGGER_NAME"],
			Event:     row["EVENT_MANIPULATION"],
			Table:     row["EVENT_OBJECT_TABLE"],
			Timing:    row["ACTION_TIMING"],
			Statement: row["ACTION_STATEMENT"],
		})
	}
	return triggerInfos, nil
}

// mysqlQuoteIdent wraps name in backticks and doubles any embedded backtick
// so the result is always read as a single MySQL identifier.
func mysqlQuoteIdent(name string) string {
	return "`" + strings.ReplaceAll(name, "`", "``") + "`"
}

// mysqlIsLobType reports whether an information_schema DATA_TYPE value is
// one of the types this package classifies as LOB.
func mysqlIsLobType(dataType string) bool {
	switch dataType {
	case "text", "blob", "mediumtext", "longtext", "mediumblob", "longblob":
		return true
	}
	return false
}

// ReadTableFromDBWithPagination reads one page of a table as strings.
//
// Parameters:
//   - tableName: table to read; it is interpolated into the statement
//     verbatim, so it must come from a trusted source.
//   - offset: number of rows to skip.
//   - pageSize: maximum number of rows to return.
//
// Returns:
//   - headers: column names in ascending lexical order. The order is
//     sorted so that it is identical for every page of the same table
//     (the row maps carry no column order of their own).
//   - records: one slice per row, values aligned with headers.
//   - error: wrapped query error, if any.
//
// All three results are nil when the page contains no rows. The statement
// has no ORDER BY, so which rows land on which page is up to the server.
func (m *MySQLExecutor) ReadTableFromDBWithPagination(tableName string, offset int, pageSize int) ([]string, [][]string, error) {
	query := fmt.Sprintf("SELECT * FROM %s LIMIT %d OFFSET %d", tableName, pageSize, offset)
	rows, err := m.Engine.QueryString(query)
	if err != nil {
		return nil, nil, fmt.Errorf("failed to query table: %w", err)
	}
	if len(rows) == 0 {
		return nil, nil, nil
	}

	// Map iteration order is random; sort to get a stable column order.
	headers := make([]string, 0, len(rows[0]))
	for name := range rows[0] {
		headers = append(headers, name)
	}
	sort.Strings(headers)

	records := make([][]string, 0, len(rows))
	for _, row := range rows {
		record := make([]string, len(headers))
		for i, name := range headers {
			record[i] = row[name]
		}
		records = append(records, record)
	}
	return headers, records, nil
}

// InsertRecordsToDB inserts records into tableName with a single prepared
// statement executed inside one transaction, so a failing batch leaves no
// partially inserted rows behind.
//
// Parameters:
//   - tableName: target table; interpolated into the statement verbatim.
//   - headers: column names for the INSERT column list; interpolated
//     verbatim, in the given order.
//   - records: rows to insert; every row must have len(headers) values.
//
// It returns nil without touching the database when records is empty, and
// a wrapped error when a row has the wrong width or any database step
// fails.
func (m *MySQLExecutor) InsertRecordsToDB(tableName string, headers []string, records [][]string) error {
	if len(records) == 0 {
		return nil // nothing to insert
	}

	// Reject malformed rows before opening a transaction.
	for i, record := range records {
		if len(record) != len(headers) {
			return fmt.Errorf("failed to execute insert statement: record %d has %d values, expected %d",
				i, len(record), len(headers))
		}
	}

	columnNames := strings.Join(headers, ", ")
	placeholders := strings.TrimSuffix(strings.Repeat("?, ", len(headers)), ", ")
	insertSQL := fmt.Sprintf("INSERT INTO %s (%s) VALUES (%s)", tableName, columnNames, placeholders)

	// The *sql.DB underneath xorm.
	db := m.Engine.DB().DB

	tx, err := db.Begin()
	if err != nil {
		return fmt.Errorf("failed to begin insert transaction: %w", err)
	}
	// After a successful Commit, Rollback only returns sql.ErrTxDone, so its
	// result is deliberately ignored.
	defer func() { _ = tx.Rollback() }()

	stmt, err := tx.Prepare(insertSQL)
	if err != nil {
		return fmt.Errorf("failed to prepare insert statement: %w", err)
	}
	defer stmt.Close()

	args := make([]interface{}, len(headers))
	for _, record := range records {
		for i, v := range record {
			args[i] = v
		}
		if _, err := stmt.Exec(args...); err != nil {
			return fmt.Errorf("failed to execute insert statement: %w", err)
		}
	}

	if err := tx.Commit(); err != nil {
		return fmt.Errorf("failed to commit insert transaction: %w", err)
	}
	return nil
}