oracle_executor.go 9.2 KB


  1. package db_executor
  2. import (
  3. "database/sql"
  4. "fmt"
  5. "strconv"
  6. "strings"
  7. "time"
  8. "xg_fetl/internal/models"
  9. "xorm.io/xorm"
  10. )
  11. type OracleExecutor struct {
  12. BaseExecutor
  13. }
  14. func (o *OracleExecutor) InitDB(dsn string) error {
  15. var err error
  16. o.Engine, err = xorm.NewEngine("mysql", dsn)
  17. if err != nil {
  18. return fmt.Errorf("failed to initialize DB: %v", err)
  19. }
  20. // 设置连接池参数
  21. o.Engine.SetMaxOpenConns(500)
  22. o.Engine.SetMaxIdleConns(500)
  23. o.Engine.SetConnMaxLifetime(time.Hour)
  24. return nil
  25. }
  26. // GetDatabases 获取所有数据库名称(在Oracle中通常称为模式),排除系统数据库(修改为适用于Oracle语法)
  27. func (m *OracleExecutor) GetDatabases(excludeDBs []string) ([]string, error) {
  28. // 在Oracle中,通过查询数据字典视图来获取所有模式(类似于MySQL中的数据库)
  29. // 这里查询 ALL_USERS 视图来获取所有用户模式,因为每个用户模式可看作一个独立的数据库空间
  30. var databases []struct {
  31. Username string `xorm:"Username"`
  32. }
  33. // 执行查询以获取所有模式(类似于MySQL中的数据库)
  34. err := m.Engine.SQL("SELECT Username FROM ALL_USERS").Find(&databases)
  35. if err != nil {
  36. return nil, fmt.Errorf("failed to get databases: %v", err)
  37. }
  38. // 获取所有模式名(类似于MySQL中的数据库名)
  39. var allDBs []string
  40. for _, db := range databases {
  41. allDBs = append(allDBs, db.Username)
  42. }
  43. // 过滤排除的数据库
  44. excludeDBsMap := make(map[string]struct{})
  45. for _, db := range excludeDBs {
  46. excludeDBsMap[db] = struct{}{}
  47. }
  48. var dbs []string
  49. for _, db := range allDBs {
  50. if _, ok := excludeDBsMap[db]; !ok {
  51. dbs = append(dbs, db)
  52. }
  53. }
  54. return dbs, nil
  55. }
  56. func (m *OracleExecutor) GetDatabaseInfo(database string) (*models.DatabaseInfo, error) {
  57. // 获取表列表
  58. tables, err := m.Engine.DBMetas()
  59. if err != nil {
  60. return nil, fmt.Errorf("failed to get tables: %v", err)
  61. }
  62. // 初始化普通表和LOB表信息的映射
  63. tableInfoMap := make(map[string]models.TableInfo) // 普通表
  64. lobTableInfoMap := make(map[string]models.TableInfo) // LOB表
  65. // 查询数据库的总大小
  66. query := `SELECT SUM(bytes) / 1024 / 1024 AS size_mb
  67. FROM dba_segments WHERE owner =?`
  68. var size sql.NullFloat64
  69. found, err := m.Engine.SQL(query, database).Get(&size)
  70. if err != nil {
  71. return nil, fmt.Errorf("failed to get database size: %v", err)
  72. }
  73. if !found {
  74. return nil, fmt.Errorf("no size information found for database: %s", database)
  75. }
  76. // 查询每个表的大小
  77. query = `SELECT table_name, SUM(bytes) / 1024 / 1024 AS size_mb
  78. FROM dba_segments WHERE owner =? AND segment_type = 'TABLE'
  79. GROUP BY table_name`
  80. results, err := m.Engine.QueryString(query, database)
  81. if err != nil {
  82. return nil, fmt.Errorf("failed to get table sizes: %v", err)
  83. }
  84. // 解析每个表的信息
  85. for _, row := range results {
  86. tableName := row["table_name"]
  87. tableSizeStr := row["size_mb"]
  88. tableSize, _ := strconv.ParseFloat(tableSizeStr, 64)
  89. // 初始化TableInfo
  90. info := models.TableInfo{
  91. Name: tableName,
  92. Size: tableSize,
  93. }
  94. // 获取表的DDL
  95. ddlQuery := fmt.Sprintf("SELECT dbms_metadata.get_ddl('TABLE', '%s', '%s') AS ddl FROM dual", tableName, database)
  96. ddlResult, err := m.Engine.QueryString(ddlQuery)
  97. if err != nil {
  98. return nil, fmt.Errorf("failed to get DDL for table %s: %v", tableName, err)
  99. }
  100. if len(ddlResult) > 0 {
  101. info.DDL = ddlResult[0]["ddl"]
  102. }
  103. // 获取表的行数
  104. countQuery := fmt.Sprintf("SELECT COUNT(*) AS row_count FROM %s", tableName)
  105. countResult, err := m.Engine.QueryString(countQuery)
  106. if err != nil {
  107. return nil, fmt.Errorf("failed to get row count for table %s: %v", tableName, err)
  108. }
  109. if len(countResult) > 0 {
  110. info.Count, _ = strconv.Atoi(countResult[0]["row_count"])
  111. }
  112. // 检查表是否包含LOB类型字段
  113. columnQuery := fmt.Sprintf(`SELECT column_name, data_type
  114. FROM all_tab_columns
  115. WHERE owner = '%s' AND table_name = '%s'`, database, tableName)
  116. columns, err := m.Engine.QueryString(columnQuery)
  117. if err != nil {
  118. return nil, fmt.Errorf("failed to get columns for table %s: %v", tableName, err)
  119. }
  120. // 遍历列,检查是否有LOB类型(CLOB, BLOB等)
  121. isLobTable := false
  122. for _, column := range columns {
  123. dataType := column["data_type"]
  124. if dataType == "CLOB" || dataType == "BLOB" || dataType == "NCLOB" {
  125. isLobTable = true
  126. break
  127. }
  128. }
  129. // 将表信息存入对应的映射
  130. if isLobTable {
  131. lobTableInfoMap[tableName] = info
  132. } else {
  133. tableInfoMap[tableName] = info
  134. }
  135. }
  136. // 确定数据库总大小
  137. sizeMB := 0.0
  138. if size.Valid {
  139. sizeMB = size.Float64
  140. }
  141. // 获取存储过程和函数
  142. funcsQuery := `SELECT object_name, object_type, text
  143. FROM all_source
  144. WHERE owner =?`
  145. funcsResults, err := m.Engine.QueryString(funcsQuery, database)
  146. if err != nil {
  147. return nil, fmt.Errorf("获取存储过程/函数失败: %v", err)
  148. }
  149. var functionInfos []models.FunctionInfo
  150. for _, row := range funcsResults {
  151. functionInfo := models.FunctionInfo{
  152. Name: row["object_name"],
  153. Type: row["object_type"],
  154. Definition: row["text"],
  155. }
  156. functionInfos = append(functionInfos, functionInfo)
  157. }
  158. // 获取触发器
  159. triggersQuery := `SELECT trigger_name, triggering_event, table_name,
  160. before_after, statement_type, trigger_body
  161. FROM all_triggers
  162. WHERE owner =?`
  163. triggersResults, err := m.Engine.QueryString(triggersQuery, database)
  164. if err != nil {
  165. return nil, fmt.Errorf("获取触发器失败: %v", err)
  166. }
  167. var triggerInfos []models.TriggerInfo
  168. for _, row := range triggersResults {
  169. triggerInfo := models.TriggerInfo{
  170. Name: row["trigger_name"],
  171. Event: row["triggering_event"],
  172. Table: row["table_name"],
  173. Timing: row["before_after"],
  174. Statement: row["trigger_body"],
  175. }
  176. triggerInfos = append(triggerInfos, triggerInfo)
  177. }
  178. // 返回包含总大小、表数量和表信息的DatabaseInfo结构体
  179. return &models.DatabaseInfo{
  180. SizeMB: sizeMB,
  181. TableCount: len(tables),
  182. Tables: tableInfoMap,
  183. LobTables: lobTableInfoMap,
  184. Functions: functionInfos,
  185. Triggers: triggerInfos,
  186. }, nil
  187. }
  188. // ReadTableFromDBWithPagination 使用 Xorm 分页从数据库中读取表数据(修改为适用于Oracle语法)
  189. // 参数:
  190. // - engine: Xorm 引擎实例,用于连接和操作数据库
  191. // - tableName: 要读取的表名
  192. // - offset: 分页查询的起始偏移量
  193. // - pageSize: 每次分页查询的行数
  194. // 返回值:
  195. // - headers: 表的列名列表
  196. // - records: 表的数据,包含每行的内容
  197. // - error: 如果查询出错,返回相关的错误信息
  198. func (o *OracleExecutor) ReadTableFromDBWithPagination(tableName string, offset int, pageSize int) ([]string, [][]string, error) {
  199. // 构建 SQL 查询,使用 ROWNUM 来实现分页查询数据(Oracle语法)
  200. sql := fmt.Sprintf("SELECT * FROM (SELECT t.*, ROWNUM rnum FROM %s t WHERE ROWNUM <= %d) WHERE rnum > %d", tableName, offset+pageSize, offset)
  201. rows, err := o.Engine.QueryString(sql)
  202. if err != nil {
  203. return nil, nil, fmt.Errorf("failed to query table: %v", err)
  204. }
  205. // 如果没有查询到数据,则返回 nil
  206. if len(rows) == 0 {
  207. return nil, nil, nil
  208. }
  209. // 获取列名
  210. headers := make([]string, 0)
  211. for key := range rows[0] {
  212. headers = append(headers, key)
  213. }
  214. // 获取数据行
  215. records := make([][]string, 0)
  216. for _, row := range rows {
  217. record := make([]string, len(headers))
  218. for i, header := range headers {
  219. record[i] = row[header]
  220. }
  221. records = append(records, record)
  222. }
  223. return headers, records, nil
  224. }
  225. // InsertRecordsToDB 使用 Xorm 的 Session 结合 Prepare 语句将记录插入到数据库中(Oracle语法无需修改插入部分的SQL基本逻辑,因为插入语句在Oracle和MySQL有相似之处,这里假设字段类型等在Oracle中也能适配现有的插入逻辑)
  226. // 参数:
  227. // - engine: Xorm 引擎实例,用于连接和操作数据库
  228. // - tableName: 插入的目标表名
  229. // - headers: 表头,用于生成插入语句中的列名
  230. // - records: 要插入的数据记录,每行对应表的一条记录
  231. // 返回值:
  232. // - error: 如果插入过程中有错误,返回相关的错误信息
  233. func (o *OracleExecutor) InsertRecordsToDB(tableName string, headers []string, records [][]string) error {
  234. if len(records) == 0 {
  235. return nil // 如果没有记录,不进行任何插入操作
  236. }
  237. // 获取 Xorm 底层的 *sql.DB 实例
  238. db := o.Engine.DB().DB
  239. // 构建插入 SQL 语句
  240. columnNames := strings.Join(headers, ", ")
  241. placeholders := strings.Repeat("?, ", len(headers))
  242. placeholders = strings.TrimSuffix(placeholders, ", ")
  243. insertSQL := fmt.Sprintf("INSERT INTO %s (%s) VALUES (%s)", tableName, columnNames, placeholders)
  244. // 准备插入语句
  245. stmt, err := db.Prepare(insertSQL)
  246. if err != nil {
  247. return fmt.Errorf("failed to prepare insert statement: %v", err)
  248. }
  249. defer stmt.Close()
  250. // 插入记录
  251. for _, record := range records {
  252. args := make([]interface{}, len(record))
  253. for i, v := range record {
  254. args[i] = v
  255. }
  256. // 使用准备好的语句执行插入
  257. _, err := stmt.Exec(args...)
  258. if err != nil {
  259. return fmt.Errorf("failed to execute insert statement: %v", err)
  260. }
  261. }
  262. return nil
  263. }