db_executor.go 2.2 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485
  1. package db_executor
  2. import (
  3. "fmt"
  4. "xg_fetl/internal/models"
  5. _ "github.com/go-sql-driver/mysql"
  6. "xorm.io/xorm"
  7. )
  8. type DBExecutor interface {
  9. InitDB(dsn string) error
  10. QueryRowsAsMaps(sqlStr string) ([]map[string]string, error)
  11. GetColumnsAndRows(query string) ([]string, []map[string]interface{}, error)
  12. Close() error
  13. // 定义其他通用的方法
  14. GetDatabaseInfo(database string) (*models.DatabaseInfo, error)
  15. ReadTableFromDBWithPagination(tableName string, offset int, pageSize int) ([]string, [][]string, error)
  16. InsertRecordsToDB(tableName string, headers []string, records [][]string) error
  17. }
  18. type BaseExecutor struct {
  19. Engine *xorm.Engine
  20. }
  21. func (b *BaseExecutor) Close() error {
  22. return b.Engine.Close()
  23. }
  24. // InitDB 初始化数据库连接
  25. func (b *BaseExecutor) InitDB(dsn string) error {
  26. engine, err := xorm.NewEngine("mysql", dsn)
  27. if err != nil {
  28. return fmt.Errorf("failed to initialize DB: %v", err)
  29. }
  30. b.Engine = engine
  31. return nil
  32. }
  33. // QueryRowsAsMaps 通用的查询方法,返回每行结果的映射
  34. func (b *BaseExecutor) QueryRowsAsMaps(sqlStr string) ([]map[string]string, error) {
  35. results, err := b.Engine.QueryString(sqlStr)
  36. if err != nil {
  37. return nil, err
  38. }
  39. return results, nil // xorm 已经将每行结果映射为 map[string]interface{}
  40. }
  41. // GetColumnsAndRows 执行查询并返回字段名和结果集
  42. func (b *BaseExecutor) GetColumnsAndRows(query string) ([]string, []map[string]interface{}, error) {
  43. rows, err := b.Engine.QueryInterface(query)
  44. if err != nil {
  45. return nil, nil, err
  46. }
  47. // 获取列名
  48. if len(rows) == 0 {
  49. return nil, nil, fmt.Errorf("no results found")
  50. }
  51. columns := make([]string, 0, len(rows[0]))
  52. for col := range rows[0] {
  53. columns = append(columns, col)
  54. }
  55. // 返回列名和结果集
  56. return columns, rows, nil
  57. }
  58. // 工厂函数,根据数据库类型创建相应的执行器
  59. func NewDBExecutor(dbType string, dsn string) (DBExecutor, error) {
  60. switch dbType {
  61. case "mysql":
  62. engine, err := xorm.NewEngine("mysql", dsn)
  63. if err != nil {
  64. return nil, fmt.Errorf("failed to initialize DB: %v", err)
  65. }
  66. return &MySQLExecutor{BaseExecutor: BaseExecutor{Engine: engine}}, nil
  67. default:
  68. return nil, fmt.Errorf("unsupported database type: %s", dbType)
  69. }
  70. }