writer_services.go 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500
  1. package services
  2. import (
  3. "fmt"
  4. "io"
  5. "io/ioutil"
  6. "log"
  7. "os"
  8. "path/filepath"
  9. "sort"
  10. "sync"
  11. "time"
  12. "xg_fetl/internal/db_executor"
  13. "xg_fetl/internal/models"
  14. "xg_fetl/module/csv/csvwriter"
  15. )
  16. /*
  17. WriteMain 导出数据库表数据到 CSV 文件,基于分页和文件大小进行分片
  18. 参数:
  19. * executor: 数据库执行器,用于读取数据库表中的数据
  20. * tableNames: 要导出的表名数组
  21. * baseFilePath: 导出文件的基本路径,用于生成不同分片的文件名
  22. * delimiter: CSV 的分隔符
  23. * maxFileSize: 单个 CSV 文件的最大大小
  24. * pageSize: 每次分页读取的行数
  25. */
  26. func WriteMain(
  27. executor db_executor.DBExecutor,
  28. tableNames map[string]models.TableInfo,
  29. baseFilePath string,
  30. delimiter rune,
  31. maxFileSize int64,
  32. pageSize int) error {
  33. {
  34. // --------------
  35. // fmt.Println("--------tableName: 开始导出表数据--------", tableName)
  36. // // 开始分页读取并导出数据
  37. // offset := 0
  38. // for {
  39. // // 从数据库中分页读取数据
  40. // headers, records, err := executor.ReadTableFromDBWithPagination(tableName, offset, pageSize)
  41. // if err != nil {
  42. // log.Fatalf("Failed to read table with pagination: %v", err)
  43. // return err
  44. // }
  45. // fmt.Println("循环")
  46. // // 如果没有更多数据,则结束循环
  47. // if records == nil {
  48. // break
  49. // }
  50. // fmt.Println("baseFilePath: ", baseFilePath)
  51. // csvFilePath := fmt.Sprintf("%s/%s", baseFilePath, tableName)
  52. // // 将数据写入 CSV 文件中
  53. // err = csvwriter.WriteRecordsToCSVWithFileSizeLimit(csvFilePath, headers, records, delimiter, maxFileSize, &fileIndex, &currentFileSize, &currentFilePath)
  54. // if err != nil {
  55. // log.Fatalf("Failed to write records to CSV: %v", err)
  56. // return err
  57. // }
  58. // // 增加偏移量,以获取下一页数据
  59. // offset += pageSize
  60. // }
  61. // fmt.Println("--------tableName 完成--------", tableName)
  62. // }
  63. err := exportTableConcurrently(tableNames, baseFilePath, pageSize, delimiter, &executor, maxFileSize)
  64. if err != nil {
  65. fmt.Println("导出表数据失败:", err)
  66. return err
  67. }
  68. fmt.Println("Table exported successfully with size limit and pagination")
  69. // return nil
  70. }
  71. return nil
  72. }
  73. // 定义用于接收每个表处理结果的结构体
  74. type TableResult struct {
  75. TableName string
  76. Success bool
  77. Error error
  78. GeneratedFiles int
  79. ErrorCount int // 记录该表发生的错误数量
  80. GoroutineCount int // 记录该表使用的协程数量
  81. TotalRows int // 表的总行数
  82. AverageRowSize int64 // 平均行大小
  83. ExportDuration time.Duration // 导出所花费的时间
  84. EstimatedDuration time.Duration // 估算的导出时间
  85. TableSize int64 // 表的容量(字节)
  86. Logs []string // 保存该表的日志信息
  87. }
  88. // 获取当前时间字符串
  89. func currentTime() string {
  90. return time.Now().Format("2006-01-02 15:04:05")
  91. }
  92. // 对表名进行排序,返回排序后的表名列表
  93. func sortedTableNames(tableInfos map[string]models.TableInfo) []string {
  94. tableNames := make([]string, 0, len(tableInfos))
  95. for name := range tableInfos {
  96. tableNames = append(tableNames, name)
  97. }
  98. sort.Strings(tableNames)
  99. return tableNames
  100. }
  101. /**
  102. * exportTableConcurrently 并发地导出多个表的数据到 CSV 文件。
  103. *
  104. * 参数:
  105. * - tableInfos: 包含表信息的映射,键为表名,值为 TableInfo。
  106. * - baseFilePath: 基础文件路径,用于存储导出的 CSV 文件。
  107. * - pageSize: 分页大小,决定每次从数据库读取的记录数量。
  108. * - delimiter: CSV 文件的分隔符。
  109. * - executor: 数据库执行器,用于执行数据库查询。
  110. * - maxFileSizeMB: 单个 CSV 文件的最大大小(以 MB 为单位)。
  111. *
  112. * 返回值:
  113. * - error: 如果发生错误,返回错误信息。
  114. */
  115. func exportTableConcurrently(
  116. tableInfos map[string]models.TableInfo,
  117. baseFilePath string,
  118. pageSize int,
  119. delimiter rune,
  120. executor *db_executor.DBExecutor,
  121. maxFileSizeMB int64,
  122. ) error {
  123. var (
  124. wg sync.WaitGroup
  125. resultsMap = make(map[string]TableResult)
  126. resultsMutex sync.Mutex // 用于保护 resultsMap 的并发访问
  127. maxConcurrentTableExports = 3
  128. tableSemaphore = make(chan struct{}, maxConcurrentTableExports)
  129. maxConcurrentFileExports = 100
  130. )
  131. // ! 创建日志文件
  132. logFilePath := filepath.Join(baseFilePath, "export.log")
  133. logFile, err := os.OpenFile(logFilePath, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0644)
  134. if err != nil {
  135. return fmt.Errorf("无法打开日志文件: %v", err)
  136. }
  137. defer logFile.Close()
  138. // 创建日志记录器,不添加任何前缀或标志
  139. logger := log.New(logFile, "", 0)
  140. // ! 估算数据传输速率
  141. dataTransferRate, err := estimateDataTransferRate()
  142. if err != nil {
  143. msg := fmt.Sprintf("%s 无法估算数据传输速率,默认使用 50 MB/s。错误信息:%v\n", currentTime(), err)
  144. fmt.Print(msg)
  145. logger.Print(msg)
  146. dataTransferRate = 50 * 1024 * 1024 // 默认 50 MB/s
  147. } else {
  148. msg := fmt.Sprintf("%s 估算的数据传输速率为:%.2f MB/s\n", currentTime(), dataTransferRate/(1024*1024))
  149. fmt.Print(msg)
  150. logger.Print(msg)
  151. }
  152. // ! 遍历每个表进行处理,使用协程
  153. for _, tableName := range sortedTableNames(tableInfos) {
  154. tableInfo := tableInfos[tableName]
  155. wg.Add(1)
  156. go func(tableInfo models.TableInfo) {
  157. tableSemaphore <- struct{}{} // 获取表级信号量
  158. defer func() {
  159. <-tableSemaphore // 释放表级信号量
  160. wg.Done()
  161. }()
  162. tableName := tableInfo.Name
  163. totalRecords := tableInfo.Count
  164. // 创建一个 TableResult 来保存该表的结果和日志
  165. result := TableResult{
  166. TableName: tableName,
  167. TotalRows: totalRecords,
  168. GoroutineCount: 1, // 包含当前协程
  169. }
  170. // !创建存储当前表的文件夹
  171. tableDir := filepath.Join(baseFilePath, tableName)
  172. if err := os.MkdirAll(tableDir, 0755); err != nil {
  173. msg := fmt.Sprintf("%s 创建表 %s 文件夹失败: %v\n", currentTime(), tableName, err)
  174. logMessage(&result, msg, logger)
  175. result.Success = false
  176. result.Error = err
  177. result.ErrorCount = 1
  178. resultsMutex.Lock()
  179. resultsMap[tableName] = result
  180. resultsMutex.Unlock()
  181. return
  182. }
  183. // !估算每条记录的平均大小
  184. averageRecordSize, err := estimateAverageRecordSize(*executor, tableName)
  185. if err != nil {
  186. msg := fmt.Sprintf("%s 无法估算表 %s 平均记录大小: %v\n", currentTime(), tableName, err)
  187. logMessage(&result, msg, logger)
  188. result.Success = false
  189. result.Error = err
  190. result.ErrorCount = 1
  191. resultsMutex.Lock()
  192. resultsMap[tableName] = result
  193. resultsMutex.Unlock()
  194. return
  195. }
  196. result.AverageRowSize = averageRecordSize
  197. msg := fmt.Sprintf("%s 表 %s 的平均记录大小: %d 字节\n", currentTime(), tableName, averageRecordSize)
  198. logMessage(&result, msg, logger)
  199. // !计算表的容量
  200. tableSize := averageRecordSize * int64(totalRecords)
  201. result.TableSize = tableSize
  202. tableSizeMB := float64(tableSize) / (1024.0 * 1024.0)
  203. msg = fmt.Sprintf("%s 表 %s 的容量约为:%.2f MB\n", currentTime(), tableName, tableSizeMB)
  204. logMessage(&result, msg, logger)
  205. // !估算导出时间
  206. estimatedDurationSeconds := float64(tableSize) / dataTransferRate
  207. estimatedDuration := time.Duration(estimatedDurationSeconds * float64(time.Second))
  208. result.EstimatedDuration = estimatedDuration
  209. msg = fmt.Sprintf("%s 表 %s 估算导出时间:%.2f 秒\n", currentTime(), tableName, estimatedDurationSeconds)
  210. logMessage(&result, msg, logger)
  211. // !计算每个 CSV 文件应包含的记录数
  212. maxFileSizeBytes := maxFileSizeMB * 1024 * 1024 // 将 MB 转换为字节
  213. recordsPerFile := int(maxFileSizeBytes / averageRecordSize)
  214. if recordsPerFile <= 0 {
  215. recordsPerFile = 1000 // 设置一个最小值,避免 recordsPerFile 为 0
  216. }
  217. msg = fmt.Sprintf("%s 表 %s 的每个文件应包含的记录数: %d\n", currentTime(), tableName, recordsPerFile)
  218. logMessage(&result, msg, logger)
  219. // !计算需要的 CSV 文件数量
  220. totalFiles := (totalRecords + recordsPerFile - 1) / recordsPerFile
  221. msg = fmt.Sprintf("%s 表 %s 需要的 CSV 文件数量: %d\n", currentTime(), tableName, totalFiles)
  222. logMessage(&result, msg, logger)
  223. //! 记录导出开始时间
  224. startTime := time.Now()
  225. var fileWg sync.WaitGroup
  226. fileSemaphore := make(chan struct{}, maxConcurrentFileExports)
  227. maxRetries := 3
  228. // 错误计数器
  229. var errorCount int32
  230. // 为每个 CSV 文件启动一个协程
  231. for fileIndex := 0; fileIndex < totalFiles; fileIndex++ {
  232. fileWg.Add(1)
  233. result.GoroutineCount++ // 增加协程计数器
  234. go func(fileIndex int) {
  235. fileSemaphore <- struct{}{} // 获取文件级信号量
  236. defer func() {
  237. <-fileSemaphore // 释放文件级信号量
  238. fileWg.Done()
  239. }()
  240. offset := fileIndex * recordsPerFile
  241. limit := recordsPerFile
  242. if offset+limit > totalRecords {
  243. limit = totalRecords - offset
  244. }
  245. csvFilePath := filepath.Join(tableDir, fmt.Sprintf("%s_%d.csv", tableName, fileIndex+1))
  246. attempt := 0
  247. for {
  248. attempt++
  249. err := exportDataRangeToCSV(*executor, tableName, csvFilePath, delimiter, offset, limit, pageSize)
  250. if err != nil {
  251. if attempt < maxRetries {
  252. msg := fmt.Sprintf("%s 表 %s 导出到文件 %s 时发生错误 (第 %d 次尝试): %v,重试...\n", currentTime(), tableName, csvFilePath, attempt, err)
  253. logMessage(&result, msg, logger)
  254. time.Sleep(2 * time.Second)
  255. continue
  256. } else {
  257. msg := fmt.Sprintf("%s 表 %s 导出到文件 %s 时发生错误 (第 %d 次尝试): %v,已达最大重试次数。\n", currentTime(), tableName, csvFilePath, attempt, err)
  258. logMessage(&result, msg, logger)
  259. // 增加错误计数器
  260. errorCount++
  261. break
  262. }
  263. } else {
  264. msg := fmt.Sprintf("%s 表 %s 的文件 %s 导出完成。\n", currentTime(), tableName, filepath.Base(csvFilePath))
  265. logMessage(&result, msg, logger)
  266. break
  267. }
  268. }
  269. }(fileIndex)
  270. }
  271. // 等待当前表的所有文件导出完成
  272. fileWg.Wait()
  273. // 计算实际导出耗时
  274. exportDuration := time.Since(startTime)
  275. result.ExportDuration = exportDuration
  276. msg = fmt.Sprintf("%s 表 %s 导出完成,共生成 %d 个文件,使用了 %d 个协程。\n", currentTime(), tableName, totalFiles, result.GoroutineCount)
  277. logMessage(&result, msg, logger)
  278. success := errorCount == 0
  279. var finalError error
  280. if !success {
  281. finalError = fmt.Errorf("表 %s 导出过程中发生 %d 个错误", tableName, errorCount)
  282. }
  283. result.Success = success
  284. result.Error = finalError
  285. result.ErrorCount = int(errorCount)
  286. result.GeneratedFiles = totalFiles
  287. resultsMutex.Lock()
  288. resultsMap[tableName] = result
  289. resultsMutex.Unlock()
  290. }(tableInfo)
  291. }
  292. // 等待所有表处理完成
  293. wg.Wait()
  294. // 写入日志文件
  295. err = WriteExportReportToLogFile(logFilePath, resultsMap, tableInfos)
  296. if err != nil {
  297. return fmt.Errorf("无法写入日志文件: %v", err)
  298. }
  299. return nil
  300. }
  301. /**
  302. * estimateAverageRecordSize 估算指定表中每条记录的平均大小(以字节为单位)。
  303. *
  304. * 参数:
  305. * - executor: 数据库执行器,用于执行数据库查询。
  306. * - tableName: 需要估算的表名。
  307. *
  308. * 返回值:
  309. * - int64: 平均每条记录的大小(字节)。
  310. * - error: 如果发生错误,返回错误信息。
  311. */
  312. func estimateAverageRecordSize(executor db_executor.DBExecutor, tableName string) (int64, error) {
  313. // 随机抽取一定数量的记录,计算平均大小
  314. sampleSize := 100 // 抽样的记录数
  315. offset := 0 // 从第 0 条记录开始抽样
  316. // 使用 ReadTableFromDBWithPagination 函数获取样本数据
  317. _, records, err := executor.ReadTableFromDBWithPagination(tableName, offset, sampleSize)
  318. if err != nil {
  319. return 0, fmt.Errorf("failed to query table: %v", err)
  320. }
  321. // 检查是否获取到了数据
  322. if records == nil || len(records) == 0 {
  323. return 0, fmt.Errorf("没有获取到抽样记录,表数量可能为 0")
  324. }
  325. var totalSize int64 = 0 // 总记录大小
  326. var count int64 = 0 // 记录数
  327. // 遍历每一条记录,计算其大小
  328. for _, record := range records {
  329. var recordSize int64 = 0
  330. for _, val := range record {
  331. if val != "" {
  332. recordSize += int64(len(val)) // 累加字段值的长度
  333. }
  334. }
  335. totalSize += recordSize // 累加每条记录的大小
  336. count++
  337. }
  338. if count == 0 {
  339. return 0, fmt.Errorf("没有获取到抽样记录,表数量可能为 0")
  340. }
  341. averageSize := totalSize / count // 计算平均记录大小
  342. return averageSize, nil
  343. }
  344. // exportDataRangeToCSV 导出指定范围的数据到 CSV 文件
  345. // 参数:
  346. // - executor: 数据库执行器
  347. // - tableName: 表名
  348. // - csvFilePath: CSV 文件路径
  349. // - delimiter: CSV 分隔符
  350. // - offset: 数据起始偏移量
  351. // - limit: 数据读取限制数量
  352. // - pageSize: 分页大小
  353. // 返回值:
  354. // - error: 如果发生错误,返回错误信息
  355. func exportDataRangeToCSV(
  356. executor db_executor.DBExecutor,
  357. tableName string,
  358. csvFilePath string,
  359. delimiter rune,
  360. offset int,
  361. limit int,
  362. pageSize int,
  363. ) error {
  364. // !首先确保 CSV 文件存在,创建新文件并写入表头
  365. var headers []string
  366. var headersWritten bool = false
  367. // !计算需要分页的次数
  368. totalPages := (limit + pageSize - 1) / pageSize
  369. for page := 0; page < totalPages; page++ {
  370. pageOffset := offset + page*pageSize
  371. pageLimit := pageSize
  372. if pageOffset+pageLimit > offset+limit {
  373. pageLimit = (offset + limit) - pageOffset
  374. }
  375. // !从数据库读取数据
  376. currentHeaders, records, err := executor.ReadTableFromDBWithPagination(tableName, pageOffset, pageLimit)
  377. if err != nil {
  378. return fmt.Errorf("failed to read data from table %s: %v", tableName, err)
  379. }
  380. // !仅在首次获取到表头时进行赋值
  381. if !headersWritten && currentHeaders != nil {
  382. headers = currentHeaders
  383. headersWritten = true
  384. }
  385. // !判断是否需要写入表头
  386. appendToFile := page > 0 // 第一页时创建新文件,之后的页追加写入
  387. // !写入数据到 CSV 文件
  388. err = csvwriter.WriteRecordsToCSV(csvFilePath, headers, records, delimiter, appendToFile)
  389. if err != nil {
  390. return fmt.Errorf("failed to write data to CSV: %v", err)
  391. }
  392. fmt.Printf("表 %s 文件 %s 已写入 %d/%d 页。\n", tableName, filepath.Base(csvFilePath), page+1, totalPages)
  393. }
  394. return nil
  395. }
  396. // 估算数据传输速率的函数
  397. func estimateDataTransferRate() (float64, error) {
  398. // 创建临时文件
  399. tmpFile, err := ioutil.TempFile("", "data_transfer_test")
  400. if err != nil {
  401. return 0, err
  402. }
  403. defer os.Remove(tmpFile.Name()) // 确保测试完成后删除临时文件
  404. defer tmpFile.Close()
  405. // 准备测试数据,大小为 50 MB
  406. testDataSize := 50 * 1024 * 1024 // 50 MB
  407. testData := make([]byte, testDataSize)
  408. // 写入测试数据并测量时间
  409. startTime := time.Now()
  410. _, err = tmpFile.Write(testData)
  411. if err != nil {
  412. return 0, err
  413. }
  414. writeDuration := time.Since(startTime)
  415. // 计算写入速率
  416. writeRate := float64(testDataSize) / writeDuration.Seconds()
  417. // 读取测试数据并测量时间
  418. tmpFile.Seek(0, io.SeekStart)
  419. readBuffer := make([]byte, testDataSize)
  420. startTime = time.Now()
  421. _, err = io.ReadFull(tmpFile, readBuffer)
  422. if err != nil && err != io.EOF {
  423. return 0, err
  424. }
  425. readDuration := time.Since(startTime)
  426. // 计算读取速率
  427. readRate := float64(testDataSize) / readDuration.Seconds()
  428. // 取读写速率的平均值作为数据传输速率
  429. dataTransferRate := (writeRate + readRate) / 2
  430. return dataTransferRate, nil
  431. }