package db_executor

import (
	"fmt"
	"sort"

	_ "github.com/go-sql-driver/mysql" // blank import: loaded for its side effects only
	"xg_fetl/internal/models"
	"xorm.io/xorm"
)

// DBExecutor is the common contract every database executor must satisfy.
type DBExecutor interface {
	// InitDB opens a connection using the given data source name.
	InitDB(dsn string) error
	// QueryRowsAsMaps runs sqlStr and returns one column-name -> value map per row.
	QueryRowsAsMaps(sqlStr string) ([]map[string]string, error)
	// GetColumnsAndRows runs query and returns the column names together with the rows.
	GetColumnsAndRows(query string) ([]string, []map[string]interface{}, error)
	// Close releases the underlying connection.
	Close() error

	// Other general-purpose methods. BaseExecutor does not implement these in
	// the code visible here; presumably each concrete executor supplies them.

	// GetDatabaseInfo returns metadata for the named database.
	GetDatabaseInfo(database string) (*models.DatabaseInfo, error)
	// ReadTableFromDBWithPagination reads one page of tableName; judging by the
	// signature it returns the headers and the records — confirm against the
	// concrete implementation.
	ReadTableFromDBWithPagination(tableName string, offset int, pageSize int) ([]string, [][]string, error)
	// InsertRecordsToDB writes records (with the given headers) into tableName.
	InsertRecordsToDB(tableName string, headers []string, records [][]string) error
}

// BaseExecutor holds the xorm engine and implements the query helpers that
// are shared between executors.
type BaseExecutor struct {
	Engine *xorm.Engine
}

// Close closes the underlying engine. It is a no-op returning nil when the
// engine was never initialized, so closing a zero-value executor is safe.
func (b *BaseExecutor) Close() error {
	if b.Engine == nil {
		return nil
	}
	return b.Engine.Close()
}

// InitDB initializes the database connection by creating a "mysql" xorm
// engine for dsn and storing it in b.Engine. Any engine already stored in
// b.Engine is replaced without being closed.
func (b *BaseExecutor) InitDB(dsn string) error {
	engine, err := xorm.NewEngine("mysql", dsn)
	if err != nil {
		// %w keeps the underlying error available to errors.Is / errors.As.
		return fmt.Errorf("failed to initialize DB: %w", err)
	}
	b.Engine = engine
	return nil
}

// QueryRowsAsMaps is a generic query helper: it executes sqlStr and returns
// every row as a map from column name to the value rendered as a string.
// It returns an error if the engine has not been initialized.
func (b *BaseExecutor) QueryRowsAsMaps(sqlStr string) ([]map[string]string, error) {
	if b.Engine == nil {
		return nil, fmt.Errorf("database engine is not initialized")
	}
	// xorm's QueryString already maps each row to a map[string]string.
	results, err := b.Engine.QueryString(sqlStr)
	if err != nil {
		return nil, err
	}
	return results, nil
}

// GetColumnsAndRows executes query and returns the column names and the
// result set.
//
// The column names are taken from the keys of the first row and are returned
// in sorted order, because map iteration order is not deterministic. An
// error is returned when the query yields no rows (there is then no row to
// read the column names from) or when the engine has not been initialized.
func (b *BaseExecutor) GetColumnsAndRows(query string) ([]string, []map[string]interface{}, error) {
	if b.Engine == nil {
		return nil, nil, fmt.Errorf("database engine is not initialized")
	}
	rows, err := b.Engine.QueryInterface(query)
	if err != nil {
		return nil, nil, err
	}
	if len(rows) == 0 {
		return nil, nil, fmt.Errorf("no results found")
	}

	// Collect the column names from the first row.
	columns := make([]string, 0, len(rows[0]))
	for col := range rows[0] {
		columns = append(columns, col)
	}
	sort.Strings(columns)

	return columns, rows, nil
}

// NewDBExecutor is a factory that creates the executor matching dbType.
// Only "mysql" is supported; any other value yields an error.
func NewDBExecutor(dbType string, dsn string) (DBExecutor, error) {
	switch dbType {
	case "mysql":
		engine, err := xorm.NewEngine("mysql", dsn)
		if err != nil {
			return nil, fmt.Errorf("failed to initialize DB: %w", err)
		}
		return &MySQLExecutor{BaseExecutor: BaseExecutor{Engine: engine}}, nil
	default:
		return nil, fmt.Errorf("unsupported database type: %s", dbType)
	}
}