Browse Source

未完成。 添加oracle

GTong 6 months ago
parent
commit
6b8096d08f
3 changed files with 308 additions and 5 deletions
  1. 2 2
      config.toml
  2. 303 0
      internal/db_executor/oracle_executor.go
  3. 3 3
      internal/services/reader_services.go

+ 2 - 2
config.toml

@@ -18,7 +18,7 @@ password = "123456"
 
 [import.cfg]
 delimiter = ","
-page_size = 10000
+page_size = 100000
 filename_pattern = "^(.*)_[0-9]+\\.csv$"
 
 
@@ -26,4 +26,4 @@ filename_pattern = "^(.*)_[0-9]+\\.csv$"
 base_file_path = "./GTool/"
 delimiter = ","
 max_file_size = 1024
-page_size = 10000
+page_size = 100000

+ 303 - 0
internal/db_executor/oracle_executor.go

@@ -0,0 +1,303 @@
+package db_executor
+
+import (
+	"database/sql"
+	"fmt"
+	"strconv"
+	"strings"
+	"time"
+	"xg_fetl/internal/models"
+
+	"xorm.io/xorm"
+)
+
+type OracleExecutor struct {
+	BaseExecutor
+}
+
+func (o *OracleExecutor) InitDB(dsn string) error {
+	var err error
+	o.Engine, err = xorm.NewEngine("mysql", dsn)
+	if err != nil {
+		return fmt.Errorf("failed to initialize DB: %v", err)
+	}
+	// 设置连接池参数
+	o.Engine.SetMaxOpenConns(500)
+	o.Engine.SetMaxIdleConns(500)
+	o.Engine.SetConnMaxLifetime(time.Hour)
+	return nil
+}
+
+// GetDatabases 获取所有数据库名称(在Oracle中通常称为模式),排除系统数据库(修改为适用于Oracle语法)
+func (m *OracleExecutor) GetDatabases(excludeDBs []string) ([]string, error) {
+	// 在Oracle中,通过查询数据字典视图来获取所有模式(类似于MySQL中的数据库)
+	// 这里查询 ALL_USERS 视图来获取所有用户模式,因为每个用户模式可看作一个独立的数据库空间
+	var databases []struct {
+		Username string `xorm:"Username"`
+	}
+
+	// 执行查询以获取所有模式(类似于MySQL中的数据库)
+	err := m.Engine.SQL("SELECT Username FROM ALL_USERS").Find(&databases)
+	if err != nil {
+		return nil, fmt.Errorf("failed to get databases: %v", err)
+	}
+
+	// 获取所有模式名(类似于MySQL中的数据库名)
+	var allDBs []string
+	for _, db := range databases {
+		allDBs = append(allDBs, db.Username)
+	}
+
+	// 过滤排除的数据库
+	excludeDBsMap := make(map[string]struct{})
+	for _, db := range excludeDBs {
+		excludeDBsMap[db] = struct{}{}
+	}
+
+	var dbs []string
+	for _, db := range allDBs {
+		if _, ok := excludeDBsMap[db]; !ok {
+			dbs = append(dbs, db)
+		}
+	}
+
+	return dbs, nil
+}
+
+func (m *OracleExecutor) GetDatabaseInfo(database string) (*models.DatabaseInfo, error) {
+
+	// 获取表列表
+	tables, err := m.Engine.DBMetas()
+	if err != nil {
+		return nil, fmt.Errorf("failed to get tables: %v", err)
+	}
+
+	// 初始化普通表和LOB表信息的映射
+	tableInfoMap := make(map[string]models.TableInfo)    // 普通表
+	lobTableInfoMap := make(map[string]models.TableInfo) // LOB表
+
+	// 查询数据库的总大小
+	query := `SELECT SUM(bytes) / 1024 / 1024 AS size_mb
+              FROM dba_segments WHERE owner =?`
+	var size sql.NullFloat64
+	found, err := m.Engine.SQL(query, database).Get(&size)
+	if err != nil {
+		return nil, fmt.Errorf("failed to get database size: %v", err)
+	}
+	if !found {
+		return nil, fmt.Errorf("no size information found for database: %s", database)
+	}
+
+	// 查询每个表的大小
+	query = `SELECT table_name, SUM(bytes) / 1024 / 1024 AS size_mb
+             FROM dba_segments WHERE owner =? AND segment_type = 'TABLE'
+             GROUP BY table_name`
+	results, err := m.Engine.QueryString(query, database)
+	if err != nil {
+		return nil, fmt.Errorf("failed to get table sizes: %v", err)
+	}
+
+	// 解析每个表的信息
+	for _, row := range results {
+		tableName := row["table_name"]
+		tableSizeStr := row["size_mb"]
+		tableSize, _ := strconv.ParseFloat(tableSizeStr, 64)
+
+		// 初始化TableInfo
+		info := models.TableInfo{
+			Name: tableName,
+			Size: tableSize,
+		}
+
+		// 获取表的DDL
+		ddlQuery := fmt.Sprintf("SELECT dbms_metadata.get_ddl('TABLE', '%s', '%s') AS ddl FROM dual", tableName, database)
+		ddlResult, err := m.Engine.QueryString(ddlQuery)
+		if err != nil {
+			return nil, fmt.Errorf("failed to get DDL for table %s: %v", tableName, err)
+		}
+		if len(ddlResult) > 0 {
+			info.DDL = ddlResult[0]["ddl"]
+		}
+
+		// 获取表的行数
+		countQuery := fmt.Sprintf("SELECT COUNT(*) AS row_count FROM %s", tableName)
+		countResult, err := m.Engine.QueryString(countQuery)
+		if err != nil {
+			return nil, fmt.Errorf("failed to get row count for table %s: %v", tableName, err)
+		}
+		if len(countResult) > 0 {
+			info.Count, _ = strconv.Atoi(countResult[0]["row_count"])
+		}
+
+		// 检查表是否包含LOB类型字段
+		columnQuery := fmt.Sprintf(`SELECT column_name, data_type 
+                                     FROM all_tab_columns 
+                                     WHERE owner = '%s' AND table_name = '%s'`, database, tableName)
+		columns, err := m.Engine.QueryString(columnQuery)
+		if err != nil {
+			return nil, fmt.Errorf("failed to get columns for table %s: %v", tableName, err)
+		}
+
+		// 遍历列,检查是否有LOB类型(CLOB, BLOB等)
+		isLobTable := false
+		for _, column := range columns {
+			dataType := column["data_type"]
+			if dataType == "CLOB" || dataType == "BLOB" || dataType == "NCLOB" {
+				isLobTable = true
+				break
+			}
+		}
+
+		// 将表信息存入对应的映射
+		if isLobTable {
+			lobTableInfoMap[tableName] = info
+		} else {
+			tableInfoMap[tableName] = info
+		}
+	}
+
+	// 确定数据库总大小
+	sizeMB := 0.0
+	if size.Valid {
+		sizeMB = size.Float64
+	}
+
+	// 获取存储过程和函数
+	funcsQuery := `SELECT object_name, object_type, text
+                   FROM all_source
+                   WHERE owner =?`
+	funcsResults, err := m.Engine.QueryString(funcsQuery, database)
+	if err != nil {
+		return nil, fmt.Errorf("获取存储过程/函数失败: %v", err)
+	}
+
+	var functionInfos []models.FunctionInfo
+	for _, row := range funcsResults {
+		functionInfo := models.FunctionInfo{
+			Name:       row["object_name"],
+			Type:       row["object_type"],
+			Definition: row["text"],
+		}
+		functionInfos = append(functionInfos, functionInfo)
+	}
+
+	// 获取触发器
+	triggersQuery := `SELECT trigger_name, triggering_event, table_name,
+                         before_after, statement_type, trigger_body
+                         FROM all_triggers
+                         WHERE owner =?`
+	triggersResults, err := m.Engine.QueryString(triggersQuery, database)
+	if err != nil {
+		return nil, fmt.Errorf("获取触发器失败: %v", err)
+	}
+
+	var triggerInfos []models.TriggerInfo
+	for _, row := range triggersResults {
+		triggerInfo := models.TriggerInfo{
+			Name:      row["trigger_name"],
+			Event:     row["triggering_event"],
+			Table:     row["table_name"],
+			Timing:    row["before_after"],
+			Statement: row["trigger_body"],
+		}
+		triggerInfos = append(triggerInfos, triggerInfo)
+	}
+
+	// 返回包含总大小、表数量和表信息的DatabaseInfo结构体
+	return &models.DatabaseInfo{
+		SizeMB:     sizeMB,
+		TableCount: len(tables),
+		Tables:     tableInfoMap,
+		LobTables:  lobTableInfoMap,
+		Functions:  functionInfos,
+		Triggers:   triggerInfos,
+	}, nil
+}
+
+// ReadTableFromDBWithPagination 使用 Xorm 分页从数据库中读取表数据(修改为适用于Oracle语法)
+// 参数:
+// - engine: Xorm 引擎实例,用于连接和操作数据库
+// - tableName: 要读取的表名
+// - offset: 分页查询的起始偏移量
+// - pageSize: 每次分页查询的行数
+// 返回值:
+// - headers: 表的列名列表
+// - records: 表的数据,包含每行的内容
+// - error: 如果查询出错,返回相关的错误信息
+func (o *OracleExecutor) ReadTableFromDBWithPagination(tableName string, offset int, pageSize int) ([]string, [][]string, error) {
+	// 构建 SQL 查询,使用 ROWNUM 来实现分页查询数据(Oracle语法)
+	sql := fmt.Sprintf("SELECT * FROM (SELECT t.*, ROWNUM rnum FROM %s t WHERE ROWNUM <= %d) WHERE rnum > %d", tableName, offset+pageSize, offset)
+	rows, err := o.Engine.QueryString(sql)
+	if err != nil {
+		return nil, nil, fmt.Errorf("failed to query table: %v", err)
+	}
+
+	// 如果没有查询到数据,则返回 nil
+	if len(rows) == 0 {
+		return nil, nil, nil
+	}
+
+	// 获取列名
+	headers := make([]string, 0)
+	for key := range rows[0] {
+		headers = append(headers, key)
+	}
+
+	// 获取数据行
+	records := make([][]string, 0)
+	for _, row := range rows {
+		record := make([]string, len(headers))
+		for i, header := range headers {
+			record[i] = row[header]
+		}
+		records = append(records, record)
+	}
+
+	return headers, records, nil
+}
+
+// InsertRecordsToDB 使用 Xorm 的 Session 结合 Prepare 语句将记录插入到数据库中(Oracle语法无需修改插入部分的SQL基本逻辑,因为插入语句在Oracle和MySQL有相似之处,这里假设字段类型等在Oracle中也能适配现有的插入逻辑)
+// 参数:
+// - engine: Xorm 引擎实例,用于连接和操作数据库
+// - tableName: 插入的目标表名
+// - headers: 表头,用于生成插入语句中的列名
+// - records: 要插入的数据记录,每行对应表的一条记录
+// 返回值:
+// - error: 如果插入过程中有错误,返回相关的错误信息
+func (o *OracleExecutor) InsertRecordsToDB(tableName string, headers []string, records [][]string) error {
+	if len(records) == 0 {
+		return nil // 如果没有记录,不进行任何插入操作
+	}
+
+	// 获取 Xorm 底层的 *sql.DB 实例
+	db := o.Engine.DB().DB
+
+	// 构建插入 SQL 语句
+	columnNames := strings.Join(headers, ", ")
+	placeholders := strings.Repeat("?, ", len(headers))
+	placeholders = strings.TrimSuffix(placeholders, ", ")
+	insertSQL := fmt.Sprintf("INSERT INTO %s (%s) VALUES (%s)", tableName, columnNames, placeholders)
+
+	// 准备插入语句
+	stmt, err := db.Prepare(insertSQL)
+	if err != nil {
+		return fmt.Errorf("failed to prepare insert statement: %v", err)
+	}
+	defer stmt.Close()
+
+	// 插入记录
+	for _, record := range records {
+		args := make([]interface{}, len(record))
+		for i, v := range record {
+			args[i] = v
+		}
+
+		// 使用准备好的语句执行插入
+		_, err := stmt.Exec(args...)
+		if err != nil {
+			return fmt.Errorf("failed to execute insert statement: %v", err)
+		}
+	}
+
+	return nil
+}

+ 3 - 3
internal/services/reader_services.go

@@ -4,7 +4,6 @@ import (
 	"encoding/csv"
 	"fmt"
 	"io"
-	"io/ioutil"
 	"log"
 	"os"
 	"path/filepath"
@@ -26,7 +25,8 @@ func ReaderMain(
 	mysqeExec db_executor.DBExecutor, // 数据库执行器接口
 ) {
 	// 创建日志文件
-	logFile, err := os.OpenFile("process.log", os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666)
+	logfileLocation := filepath.Join(dirPath, "import.log")
+	logFile, err := os.OpenFile(logfileLocation, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666)
 	if err != nil {
 		fmt.Printf("无法创建日志文件:%v\n", err)
 		return
@@ -127,7 +127,7 @@ func processTable(
 	logMessage(result, fmt.Sprintf("表 %s 的目录路径为:%s\n", tableName, tableDir), logger)
 
 	// 获取该表目录下的所有 CSV 文件
-	files, err := ioutil.ReadDir(tableDir)
+	files, err := os.ReadDir(tableDir)
 	if err != nil {
 		result.Success = false
 		result.Error = err