123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306 |
- package db_executor
import (
	"database/sql"
	"fmt"
	"sort"
	"strconv"
	"strings"
	"time"

	"xg_fetl/internal/models"

	_ "github.com/go-sql-driver/mysql"
	"xorm.io/xorm"
)
- type MySQLExecutor struct {
- BaseExecutor
- }
- func (m *MySQLExecutor) InitDB(dsn string) error {
- var err error
- m.Engine, err = xorm.NewEngine("mysql", dsn)
- if err != nil {
- return fmt.Errorf("failed to initialize DB: %v", err)
- }
- // 设置连接池参数
- m.Engine.SetMaxOpenConns(10)
- m.Engine.SetMaxIdleConns(5)
- m.Engine.SetConnMaxLifetime(time.Hour)
- return nil
- }
- // GetDatabases 获取所有数据库名称,排除系统数据库
- func (m *MySQLExecutor) GetDatabases(excludeDBs []string) ([]string, error) {
- var databases []struct {
- Database string `xorm:"Database"`
- }
- // 执行 SHOW DATABASES 查询
- err := m.Engine.SQL("SHOW DATABASES").Find(&databases)
- if err != nil {
- return nil, fmt.Errorf("failed to get databases: %v", err)
- }
- // 获取所有数据库名
- var allDBs []string
- for _, db := range databases {
- allDBs = append(allDBs, db.Database)
- }
- // 过滤排除的数据库
- excludeDBsMap := make(map[string]struct{})
- for _, db := range excludeDBs {
- excludeDBsMap[db] = struct{}{}
- }
- var dbs []string
- for _, db := range allDBs {
- if _, ok := excludeDBsMap[db]; !ok {
- dbs = append(dbs, db)
- }
- }
- return dbs, nil
- }
- // 添加MySQL特有的操作方法
- func (m *MySQLExecutor) GetEngine() *xorm.Engine {
- return m.Engine
- }
- func (m *MySQLExecutor) GetDatabaseInfo(database string) (*models.DatabaseInfo, error) {
- // 获取表列表
- tables, err := m.Engine.DBMetas()
- if err != nil {
- return nil, fmt.Errorf("failed to get tables: %v", err)
- }
- // 初始化普通表和 LOB 表信息的映射
- tableInfoMap := make(map[string]models.TableInfo) // 普通表
- lobTableInfoMap := make(map[string]models.TableInfo) // LOB 表
- // 查询数据库的总大小
- query := `SELECT SUM(data_length + index_length) / 1024 / 1024 AS size_mb
- FROM information_schema.tables WHERE table_schema = ?`
- var size sql.NullFloat64
- found, err := m.Engine.SQL(query, database).Get(&size)
- if err != nil {
- return nil, fmt.Errorf("failed to get database size: %v", err)
- }
- if !found {
- return nil, fmt.Errorf("no size information found for database: %s", database)
- }
- // 查询每个表的大小
- query = `SELECT TABLE_NAME, (data_length + index_length) / 1024 / 1024 AS size_mb
- FROM information_schema.tables WHERE table_schema = ?`
- results, err := m.Engine.QueryString(query, database)
- if err != nil {
- return nil, fmt.Errorf("failed to get table sizes: %v", err)
- }
- // 解析每个表的信息
- for _, row := range results {
- tableName := row["TABLE_NAME"]
- tableSizeStr := row["size_mb"]
- tableSize, _ := strconv.ParseFloat(tableSizeStr, 64)
- // 初始化 TableInfo
- info := models.TableInfo{
- Name: tableName,
- Size: tableSize,
- }
- // 获取表的 DDL
- ddlQuery := fmt.Sprintf("SHOW CREATE TABLE `%s`", tableName)
- ddlResult, err := m.Engine.QueryString(ddlQuery)
- if err != nil {
- return nil, fmt.Errorf("failed to get DDL for table %s: %v", tableName, err)
- }
- if len(ddlResult) > 0 {
- info.DDL = ddlResult[0]["Create Table"]
- }
- // 获取表的行数
- countQuery := fmt.Sprintf("SELECT COUNT(*) AS row_count FROM `%s`", tableName)
- countResult, err := m.Engine.QueryString(countQuery)
- if err != nil {
- return nil, fmt.Errorf("failed to get row count for table %s: %v", tableName, err)
- }
- if len(countResult) > 0 {
- info.Count, _ = strconv.Atoi(countResult[0]["row_count"])
- }
- // 检查表是否包含 LOB 类型字段
- columnQuery := fmt.Sprintf(`SELECT COLUMN_NAME, DATA_TYPE
- FROM information_schema.columns
- WHERE table_schema = '%s' AND table_name = '%s'`, database, tableName)
- columns, err := m.Engine.QueryString(columnQuery)
- if err != nil {
- return nil, fmt.Errorf("failed to get columns for table %s: %v", tableName, err)
- }
- // 遍历列,检查是否有 LOB 类型(TEXT, BLOB 等)
- isLobTable := false
- for _, column := range columns {
- dataType := column["DATA_TYPE"]
- if dataType == "text" || dataType == "blob" || dataType == "mediumtext" || dataType == "longtext" || dataType == "mediumblob" || dataType == "longblob" {
- isLobTable = true
- break
- }
- }
- // 将表信息存入对应的映射
- if isLobTable {
- lobTableInfoMap[tableName] = info
- } else {
- tableInfoMap[tableName] = info
- }
- }
- // 确定数据库总大小
- sizeMB := 0.0
- if size.Valid {
- sizeMB = size.Float64
- }
- //-- 获取存储过程和函数
- funcsQuery := `SELECT ROUTINE_NAME, ROUTINE_TYPE, ROUTINE_DEFINITION
- FROM information_schema.ROUTINES
- WHERE ROUTINE_SCHEMA = ?`
- funcsResults, err := m.Engine.QueryString(funcsQuery, database)
- if err != nil {
- return nil, fmt.Errorf("获取存储过程/函数失败: %v", err)
- }
- var functionInfos []models.FunctionInfo
- for _, row := range funcsResults {
- functionInfo := models.FunctionInfo{
- Name: row["ROUTINE_NAME"],
- Type: row["ROUTINE_TYPE"],
- Definition: row["ROUTINE_DEFINITION"],
- }
- functionInfos = append(functionInfos, functionInfo)
- }
- // 获取触发器
- triggersQuery := `SELECT TRIGGER_NAME, EVENT_MANIPULATION, EVENT_OBJECT_TABLE,
- ACTION_TIMING, ACTION_STATEMENT
- FROM information_schema.TRIGGERS
- WHERE TRIGGER_SCHEMA = ?`
- triggersResults, err := m.Engine.QueryString(triggersQuery, database)
- if err != nil {
- return nil, fmt.Errorf("获取触发器失败: %v", err)
- }
- var triggerInfos []models.TriggerInfo
- for _, row := range triggersResults {
- triggerInfo := models.TriggerInfo{
- Name: row["TRIGGER_NAME"],
- Event: row["EVENT_MANIPULATION"],
- Table: row["EVENT_OBJECT_TABLE"],
- Timing: row["ACTION_TIMING"],
- Statement: row["ACTION_STATEMENT"],
- }
- triggerInfos = append(triggerInfos, triggerInfo)
- }
- // 返回包含总大小、表数量和表信息的 DatabaseInfo 结构体
- return &models.DatabaseInfo{
- SizeMB: sizeMB,
- TableCount: len(tables),
- Tables: tableInfoMap,
- LobTables: lobTableInfoMap,
- Functions: functionInfos,
- Triggers: triggerInfos,
- }, nil
- }
- // ReadTableFromDBWithPagination 使用 Xorm 分页从数据库中读取表数据
- // 参数:
- // - engine: Xorm 引擎实例,用于连接和操作数据库
- // - tableName: 要读取的表名
- // - offset: 分页查询的起始偏移量
- // - pageSize: 每次分页查询的行数
- // 返回值:
- // - headers: 表的列名列表
- // - records: 表的数据,包含每行的内容
- // - error: 如果查询出错,返回相关的错误信息
- func (m *MySQLExecutor) ReadTableFromDBWithPagination(tableName string, offset int, pageSize int) ([]string, [][]string, error) {
- // 构建 SQL 查询,使用 LIMIT 和 OFFSET 来分页查询数据
- sql := fmt.Sprintf("SELECT * FROM %s LIMIT %d OFFSET %d", tableName, pageSize, offset)
- rows, err := m.Engine.QueryString(sql)
- if err != nil {
- return nil, nil, fmt.Errorf("failed to query table: %v", err)
- }
- // 如果没有查询到数据,则返回 nil
- if len(rows) == 0 {
- return nil, nil, nil
- }
- // 获取列名
- headers := make([]string, 0)
- for key := range rows[0] {
- headers = append(headers, key)
- }
- // 获取数据行
- records := make([][]string, 0)
- for _, row := range rows {
- record := make([]string, len(headers))
- for i, header := range headers {
- record[i] = row[header]
- }
- records = append(records, record)
- }
- return headers, records, nil
- }
- // InsertRecordsToDB 使用 Xorm 的 Session 结合 Prepare 语句将记录插入到数据库中
- // 参数:
- // - engine: Xorm 引擎实例,用于连接和操作数据库
- // - tableName: 插入的目标表名
- // - headers: 表头,用于生成插入语句中的列名
- // - records: 要插入的数据记录,每行对应表的一条记录
- // 返回值:
- // - error: 如果插入过程中有错误,返回相关的错误信息
- func (m *MySQLExecutor) InsertRecordsToDB(tableName string, headers []string, records [][]string) error {
- if len(records) == 0 {
- return nil // 如果没有记录,不进行任何插入操作
- }
- // 获取 Xorm 底层的 *sql.DB 实例
- db := m.Engine.DB().DB
- // 构建插入 SQL 语句
- columnNames := strings.Join(headers, ", ")
- placeholders := strings.Repeat("?, ", len(headers))
- placeholders = strings.TrimSuffix(placeholders, ", ")
- insertSQL := fmt.Sprintf("INSERT INTO %s (%s) VALUES (%s)", tableName, columnNames, placeholders)
- // 准备插入语句
- stmt, err := db.Prepare(insertSQL)
- if err != nil {
- return fmt.Errorf("failed to prepare insert statement: %v", err)
- }
- defer stmt.Close()
- // 插入记录
- for _, record := range records {
- args := make([]interface{}, len(record))
- for i, v := range record {
- args[i] = v
- }
- // 使用准备好的语句执行插入
- _, err := stmt.Exec(args...)
- if err != nil {
- return fmt.Errorf("failed to execute insert statement: %v", err)
- }
- }
- return nil
- }
|