123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303 |
- package db_executor
- import (
- "database/sql"
- "fmt"
- "strconv"
- "strings"
- "time"
- "xg_fetl/internal/models"
- "xorm.io/xorm"
- )
- type OracleExecutor struct {
- BaseExecutor
- }
- func (o *OracleExecutor) InitDB(dsn string) error {
- var err error
- o.Engine, err = xorm.NewEngine("mysql", dsn)
- if err != nil {
- return fmt.Errorf("failed to initialize DB: %v", err)
- }
-
- o.Engine.SetMaxOpenConns(500)
- o.Engine.SetMaxIdleConns(500)
- o.Engine.SetConnMaxLifetime(time.Hour)
- return nil
- }
- func (m *OracleExecutor) GetDatabases(excludeDBs []string) ([]string, error) {
-
-
- var databases []struct {
- Username string `xorm:"Username"`
- }
-
- err := m.Engine.SQL("SELECT Username FROM ALL_USERS").Find(&databases)
- if err != nil {
- return nil, fmt.Errorf("failed to get databases: %v", err)
- }
-
- var allDBs []string
- for _, db := range databases {
- allDBs = append(allDBs, db.Username)
- }
-
- excludeDBsMap := make(map[string]struct{})
- for _, db := range excludeDBs {
- excludeDBsMap[db] = struct{}{}
- }
- var dbs []string
- for _, db := range allDBs {
- if _, ok := excludeDBsMap[db]; !ok {
- dbs = append(dbs, db)
- }
- }
- return dbs, nil
- }
- func (m *OracleExecutor) GetDatabaseInfo(database string) (*models.DatabaseInfo, error) {
-
- tables, err := m.Engine.DBMetas()
- if err != nil {
- return nil, fmt.Errorf("failed to get tables: %v", err)
- }
-
- tableInfoMap := make(map[string]models.TableInfo)
- lobTableInfoMap := make(map[string]models.TableInfo)
-
- query := `SELECT SUM(bytes) / 1024 / 1024 AS size_mb
- FROM dba_segments WHERE owner =?`
- var size sql.NullFloat64
- found, err := m.Engine.SQL(query, database).Get(&size)
- if err != nil {
- return nil, fmt.Errorf("failed to get database size: %v", err)
- }
- if !found {
- return nil, fmt.Errorf("no size information found for database: %s", database)
- }
-
- query = `SELECT table_name, SUM(bytes) / 1024 / 1024 AS size_mb
- FROM dba_segments WHERE owner =? AND segment_type = 'TABLE'
- GROUP BY table_name`
- results, err := m.Engine.QueryString(query, database)
- if err != nil {
- return nil, fmt.Errorf("failed to get table sizes: %v", err)
- }
-
- for _, row := range results {
- tableName := row["table_name"]
- tableSizeStr := row["size_mb"]
- tableSize, _ := strconv.ParseFloat(tableSizeStr, 64)
-
- info := models.TableInfo{
- Name: tableName,
- Size: tableSize,
- }
-
- ddlQuery := fmt.Sprintf("SELECT dbms_metadata.get_ddl('TABLE', '%s', '%s') AS ddl FROM dual", tableName, database)
- ddlResult, err := m.Engine.QueryString(ddlQuery)
- if err != nil {
- return nil, fmt.Errorf("failed to get DDL for table %s: %v", tableName, err)
- }
- if len(ddlResult) > 0 {
- info.DDL = ddlResult[0]["ddl"]
- }
-
- countQuery := fmt.Sprintf("SELECT COUNT(*) AS row_count FROM %s", tableName)
- countResult, err := m.Engine.QueryString(countQuery)
- if err != nil {
- return nil, fmt.Errorf("failed to get row count for table %s: %v", tableName, err)
- }
- if len(countResult) > 0 {
- info.Count, _ = strconv.Atoi(countResult[0]["row_count"])
- }
-
- columnQuery := fmt.Sprintf(`SELECT column_name, data_type
- FROM all_tab_columns
- WHERE owner = '%s' AND table_name = '%s'`, database, tableName)
- columns, err := m.Engine.QueryString(columnQuery)
- if err != nil {
- return nil, fmt.Errorf("failed to get columns for table %s: %v", tableName, err)
- }
-
- isLobTable := false
- for _, column := range columns {
- dataType := column["data_type"]
- if dataType == "CLOB" || dataType == "BLOB" || dataType == "NCLOB" {
- isLobTable = true
- break
- }
- }
-
- if isLobTable {
- lobTableInfoMap[tableName] = info
- } else {
- tableInfoMap[tableName] = info
- }
- }
-
- sizeMB := 0.0
- if size.Valid {
- sizeMB = size.Float64
- }
-
- funcsQuery := `SELECT object_name, object_type, text
- FROM all_source
- WHERE owner =?`
- funcsResults, err := m.Engine.QueryString(funcsQuery, database)
- if err != nil {
- return nil, fmt.Errorf("获取存储过程/函数失败: %v", err)
- }
- var functionInfos []models.FunctionInfo
- for _, row := range funcsResults {
- functionInfo := models.FunctionInfo{
- Name: row["object_name"],
- Type: row["object_type"],
- Definition: row["text"],
- }
- functionInfos = append(functionInfos, functionInfo)
- }
-
- triggersQuery := `SELECT trigger_name, triggering_event, table_name,
- before_after, statement_type, trigger_body
- FROM all_triggers
- WHERE owner =?`
- triggersResults, err := m.Engine.QueryString(triggersQuery, database)
- if err != nil {
- return nil, fmt.Errorf("获取触发器失败: %v", err)
- }
- var triggerInfos []models.TriggerInfo
- for _, row := range triggersResults {
- triggerInfo := models.TriggerInfo{
- Name: row["trigger_name"],
- Event: row["triggering_event"],
- Table: row["table_name"],
- Timing: row["before_after"],
- Statement: row["trigger_body"],
- }
- triggerInfos = append(triggerInfos, triggerInfo)
- }
-
- return &models.DatabaseInfo{
- SizeMB: sizeMB,
- TableCount: len(tables),
- Tables: tableInfoMap,
- LobTables: lobTableInfoMap,
- Functions: functionInfos,
- Triggers: triggerInfos,
- }, nil
- }
- func (o *OracleExecutor) ReadTableFromDBWithPagination(tableName string, offset int, pageSize int) ([]string, [][]string, error) {
-
- sql := fmt.Sprintf("SELECT * FROM (SELECT t.*, ROWNUM rnum FROM %s t WHERE ROWNUM <= %d) WHERE rnum > %d", tableName, offset+pageSize, offset)
- rows, err := o.Engine.QueryString(sql)
- if err != nil {
- return nil, nil, fmt.Errorf("failed to query table: %v", err)
- }
-
- if len(rows) == 0 {
- return nil, nil, nil
- }
-
- headers := make([]string, 0)
- for key := range rows[0] {
- headers = append(headers, key)
- }
-
- records := make([][]string, 0)
- for _, row := range rows {
- record := make([]string, len(headers))
- for i, header := range headers {
- record[i] = row[header]
- }
- records = append(records, record)
- }
- return headers, records, nil
- }
- func (o *OracleExecutor) InsertRecordsToDB(tableName string, headers []string, records [][]string) error {
- if len(records) == 0 {
- return nil
- }
-
- db := o.Engine.DB().DB
-
- columnNames := strings.Join(headers, ", ")
- placeholders := strings.Repeat("?, ", len(headers))
- placeholders = strings.TrimSuffix(placeholders, ", ")
- insertSQL := fmt.Sprintf("INSERT INTO %s (%s) VALUES (%s)", tableName, columnNames, placeholders)
-
- stmt, err := db.Prepare(insertSQL)
- if err != nil {
- return fmt.Errorf("failed to prepare insert statement: %v", err)
- }
- defer stmt.Close()
-
- for _, record := range records {
- args := make([]interface{}, len(record))
- for i, v := range record {
- args[i] = v
- }
-
- _, err := stmt.Exec(args...)
- if err != nil {
- return fmt.Errorf("failed to execute insert statement: %v", err)
- }
- }
- return nil
- }
|