package db_executor import ( "database/sql" "fmt" "strconv" "strings" "time" "xg_fetl/internal/models" "xorm.io/xorm" ) type OracleExecutor struct { BaseExecutor } func (o *OracleExecutor) InitDB(dsn string) error { var err error o.Engine, err = xorm.NewEngine("mysql", dsn) if err != nil { return fmt.Errorf("failed to initialize DB: %v", err) } // 设置连接池参数 o.Engine.SetMaxOpenConns(500) o.Engine.SetMaxIdleConns(500) o.Engine.SetConnMaxLifetime(time.Hour) return nil } // GetDatabases 获取所有数据库名称(在Oracle中通常称为模式),排除系统数据库(修改为适用于Oracle语法) func (m *OracleExecutor) GetDatabases(excludeDBs []string) ([]string, error) { // 在Oracle中,通过查询数据字典视图来获取所有模式(类似于MySQL中的数据库) // 这里查询 ALL_USERS 视图来获取所有用户模式,因为每个用户模式可看作一个独立的数据库空间 var databases []struct { Username string `xorm:"Username"` } // 执行查询以获取所有模式(类似于MySQL中的数据库) err := m.Engine.SQL("SELECT Username FROM ALL_USERS").Find(&databases) if err != nil { return nil, fmt.Errorf("failed to get databases: %v", err) } // 获取所有模式名(类似于MySQL中的数据库名) var allDBs []string for _, db := range databases { allDBs = append(allDBs, db.Username) } // 过滤排除的数据库 excludeDBsMap := make(map[string]struct{}) for _, db := range excludeDBs { excludeDBsMap[db] = struct{}{} } var dbs []string for _, db := range allDBs { if _, ok := excludeDBsMap[db]; !ok { dbs = append(dbs, db) } } return dbs, nil } func (m *OracleExecutor) GetDatabaseInfo(database string) (*models.DatabaseInfo, error) { // 获取表列表 tables, err := m.Engine.DBMetas() if err != nil { return nil, fmt.Errorf("failed to get tables: %v", err) } // 初始化普通表和LOB表信息的映射 tableInfoMap := make(map[string]models.TableInfo) // 普通表 lobTableInfoMap := make(map[string]models.TableInfo) // LOB表 // 查询数据库的总大小 query := `SELECT SUM(bytes) / 1024 / 1024 AS size_mb FROM dba_segments WHERE owner =?` var size sql.NullFloat64 found, err := m.Engine.SQL(query, database).Get(&size) if err != nil { return nil, fmt.Errorf("failed to get database size: %v", err) } if !found { return nil, fmt.Errorf("no size information found for database: %s", database) } // 查询每个表的大小 query = `SELECT table_name, SUM(bytes) / 1024 / 1024 AS size_mb FROM dba_segments WHERE owner =? AND segment_type = 'TABLE' GROUP BY table_name` results, err := m.Engine.QueryString(query, database) if err != nil { return nil, fmt.Errorf("failed to get table sizes: %v", err) } // 解析每个表的信息 for _, row := range results { tableName := row["table_name"] tableSizeStr := row["size_mb"] tableSize, _ := strconv.ParseFloat(tableSizeStr, 64) // 初始化TableInfo info := models.TableInfo{ Name: tableName, Size: tableSize, } // 获取表的DDL ddlQuery := fmt.Sprintf("SELECT dbms_metadata.get_ddl('TABLE', '%s', '%s') AS ddl FROM dual", tableName, database) ddlResult, err := m.Engine.QueryString(ddlQuery) if err != nil { return nil, fmt.Errorf("failed to get DDL for table %s: %v", tableName, err) } if len(ddlResult) > 0 { info.DDL = ddlResult[0]["ddl"] } // 获取表的行数 countQuery := fmt.Sprintf("SELECT COUNT(*) AS row_count FROM %s", tableName) countResult, err := m.Engine.QueryString(countQuery) if err != nil { return nil, fmt.Errorf("failed to get row count for table %s: %v", tableName, err) } if len(countResult) > 0 { info.Count, _ = strconv.Atoi(countResult[0]["row_count"]) } // 检查表是否包含LOB类型字段 columnQuery := fmt.Sprintf(`SELECT column_name, data_type FROM all_tab_columns WHERE owner = '%s' AND table_name = '%s'`, database, tableName) columns, err := m.Engine.QueryString(columnQuery) if err != nil { return nil, fmt.Errorf("failed to get columns for table %s: %v", tableName, err) } // 遍历列,检查是否有LOB类型(CLOB, BLOB等) isLobTable := false for _, column := range columns { dataType := column["data_type"] if dataType == "CLOB" || dataType == "BLOB" || dataType == "NCLOB" { isLobTable = true break } } // 将表信息存入对应的映射 if isLobTable { lobTableInfoMap[tableName] = info } else { tableInfoMap[tableName] = info } } // 确定数据库总大小 sizeMB := 0.0 if size.Valid { sizeMB = size.Float64 } // 获取存储过程和函数 funcsQuery := `SELECT object_name, object_type, text FROM all_source WHERE owner =?` funcsResults, err := m.Engine.QueryString(funcsQuery, database) if err != nil { return nil, fmt.Errorf("获取存储过程/函数失败: %v", err) } var functionInfos []models.FunctionInfo for _, row := range funcsResults { functionInfo := models.FunctionInfo{ Name: row["object_name"], Type: row["object_type"], Definition: row["text"], } functionInfos = append(functionInfos, functionInfo) } // 获取触发器 triggersQuery := `SELECT trigger_name, triggering_event, table_name, before_after, statement_type, trigger_body FROM all_triggers WHERE owner =?` triggersResults, err := m.Engine.QueryString(triggersQuery, database) if err != nil { return nil, fmt.Errorf("获取触发器失败: %v", err) } var triggerInfos []models.TriggerInfo for _, row := range triggersResults { triggerInfo := models.TriggerInfo{ Name: row["trigger_name"], Event: row["triggering_event"], Table: row["table_name"], Timing: row["before_after"], Statement: row["trigger_body"], } triggerInfos = append(triggerInfos, triggerInfo) } // 返回包含总大小、表数量和表信息的DatabaseInfo结构体 return &models.DatabaseInfo{ SizeMB: sizeMB, TableCount: len(tables), Tables: tableInfoMap, LobTables: lobTableInfoMap, Functions: functionInfos, Triggers: triggerInfos, }, nil } // ReadTableFromDBWithPagination 使用 Xorm 分页从数据库中读取表数据(修改为适用于Oracle语法) // 参数: // - engine: Xorm 引擎实例,用于连接和操作数据库 // - tableName: 要读取的表名 // - offset: 分页查询的起始偏移量 // - pageSize: 每次分页查询的行数 // 返回值: // - headers: 表的列名列表 // - records: 表的数据,包含每行的内容 // - error: 如果查询出错,返回相关的错误信息 func (o *OracleExecutor) ReadTableFromDBWithPagination(tableName string, offset int, pageSize int) ([]string, [][]string, error) { // 构建 SQL 查询,使用 ROWNUM 来实现分页查询数据(Oracle语法) sql := fmt.Sprintf("SELECT * FROM (SELECT t.*, ROWNUM rnum FROM %s t WHERE ROWNUM <= %d) WHERE rnum > %d", tableName, offset+pageSize, offset) rows, err := o.Engine.QueryString(sql) if err != nil { return nil, nil, fmt.Errorf("failed to query table: %v", err) } // 如果没有查询到数据,则返回 nil if len(rows) == 0 { return nil, nil, nil } // 获取列名 headers := make([]string, 0) for key := range rows[0] { headers = append(headers, key) } // 获取数据行 records := make([][]string, 0) for _, row := range rows { record := make([]string, len(headers)) for i, header := range headers { record[i] = row[header] } records = append(records, record) } return headers, records, nil } // InsertRecordsToDB 使用 Xorm 的 Session 结合 Prepare 语句将记录插入到数据库中(Oracle语法无需修改插入部分的SQL基本逻辑,因为插入语句在Oracle和MySQL有相似之处,这里假设字段类型等在Oracle中也能适配现有的插入逻辑) // 参数: // - engine: Xorm 引擎实例,用于连接和操作数据库 // - tableName: 插入的目标表名 // - headers: 表头,用于生成插入语句中的列名 // - records: 要插入的数据记录,每行对应表的一条记录 // 返回值: // - error: 如果插入过程中有错误,返回相关的错误信息 func (o *OracleExecutor) InsertRecordsToDB(tableName string, headers []string, records [][]string) error { if len(records) == 0 { return nil // 如果没有记录,不进行任何插入操作 } // 获取 Xorm 底层的 *sql.DB 实例 db := o.Engine.DB().DB // 构建插入 SQL 语句 columnNames := strings.Join(headers, ", ") placeholders := strings.Repeat("?, ", len(headers)) placeholders = strings.TrimSuffix(placeholders, ", ") insertSQL := fmt.Sprintf("INSERT INTO %s (%s) VALUES (%s)", tableName, columnNames, placeholders) // 准备插入语句 stmt, err := db.Prepare(insertSQL) if err != nil { return fmt.Errorf("failed to prepare insert statement: %v", err) } defer stmt.Close() // 插入记录 for _, record := range records { args := make([]interface{}, len(record)) for i, v := range record { args[i] = v } // 使用准备好的语句执行插入 _, err := stmt.Exec(args...) if err != nil { return fmt.Errorf("failed to execute insert statement: %v", err) } } return nil }