12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485 |
- package db_executor
- import (
- "fmt"
- "xg_fetl/internal/models"
- _ "github.com/go-sql-driver/mysql"
- "xorm.io/xorm"
- )
- type DBExecutor interface {
- InitDB(dsn string) error
- QueryRowsAsMaps(sqlStr string) ([]map[string]string, error)
- GetColumnsAndRows(query string) ([]string, []map[string]interface{}, error)
- Close() error
- // 定义其他通用的方法
- GetDatabaseInfo(database string) (*models.DatabaseInfo, error)
- ReadTableFromDBWithPagination(tableName string, offset int, pageSize int) ([]string, [][]string, error)
- InsertRecordsToDB(tableName string, headers []string, records [][]string) error
- }
- type BaseExecutor struct {
- Engine *xorm.Engine
- }
- func (b *BaseExecutor) Close() error {
- return b.Engine.Close()
- }
- // InitDB 初始化数据库连接
- func (b *BaseExecutor) InitDB(dsn string) error {
- engine, err := xorm.NewEngine("mysql", dsn)
- if err != nil {
- return fmt.Errorf("failed to initialize DB: %v", err)
- }
- b.Engine = engine
- return nil
- }
- // QueryRowsAsMaps 通用的查询方法,返回每行结果的映射
- func (b *BaseExecutor) QueryRowsAsMaps(sqlStr string) ([]map[string]string, error) {
- results, err := b.Engine.QueryString(sqlStr)
- if err != nil {
- return nil, err
- }
- return results, nil // xorm 已经将每行结果映射为 map[string]interface{}
- }
- // GetColumnsAndRows 执行查询并返回字段名和结果集
- func (b *BaseExecutor) GetColumnsAndRows(query string) ([]string, []map[string]interface{}, error) {
- rows, err := b.Engine.QueryInterface(query)
- if err != nil {
- return nil, nil, err
- }
- // 获取列名
- if len(rows) == 0 {
- return nil, nil, fmt.Errorf("no results found")
- }
- columns := make([]string, 0, len(rows[0]))
- for col := range rows[0] {
- columns = append(columns, col)
- }
- // 返回列名和结果集
- return columns, rows, nil
- }
- // 工厂函数,根据数据库类型创建相应的执行器
- func NewDBExecutor(dbType string, dsn string) (DBExecutor, error) {
- switch dbType {
- case "mysql":
- engine, err := xorm.NewEngine("mysql", dsn)
- if err != nil {
- return nil, fmt.Errorf("failed to initialize DB: %v", err)
- }
- return &MySQLExecutor{BaseExecutor: BaseExecutor{Engine: engine}}, nil
- default:
- return nil, fmt.Errorf("unsupported database type: %s", dbType)
- }
- }
|