reader_services.go 8.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282
  1. package services
  2. import (
  3. "encoding/csv"
  4. "fmt"
  5. "io"
  6. "log"
  7. "os"
  8. "path/filepath"
  9. "strings"
  10. "sync"
  11. "time"
  12. "xg_fetl/internal/db_executor"
  13. "xg_fetl/internal/models"
  14. )
  15. // ReaderMain 函数读取 CSV 文件并将数据插入数据库
  16. func ReaderMain(
  17. dbName string, // 数据库名称
  18. dirPath string, // CSV 文件所在的根目录
  19. tableInfos map[string]models.TableInfo, // 要处理的表信息
  20. delimiter rune, // CSV 文件的分隔符
  21. pageSize int, // 分页大小,每次读取的记录数
  22. mysqeExec db_executor.DBExecutor, // 数据库执行器接口
  23. ) {
  24. // 创建日志文件
  25. logfileLocation := filepath.Join(dirPath, "import.log")
  26. logFile, err := os.OpenFile(logfileLocation, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666)
  27. if err != nil {
  28. fmt.Printf("无法创建日志文件:%v\n", err)
  29. return
  30. }
  31. defer logFile.Close()
  32. // 设置日志输出到文件和控制台
  33. multiWriter := io.MultiWriter(os.Stdout, logFile)
  34. logger := log.New(multiWriter, "", log.LstdFlags)
  35. // 创建一个等待组,用于等待所有表的处理完成
  36. var wg sync.WaitGroup
  37. // 创建一个通道,用于收集每个表的处理结果
  38. resultsChan := make(chan TableResult)
  39. // 遍历每个表,启动协程进行处理
  40. for tableName, tableInfo := range tableInfos {
  41. wg.Add(1)
  42. go func(tableName string, tableInfo models.TableInfo) {
  43. defer wg.Done()
  44. // 初始化 TableResult
  45. result := TableResult{
  46. TableName: tableName,
  47. Success: true,
  48. GeneratedFiles: 0,
  49. ErrorCount: 0,
  50. GoroutineCount: 0,
  51. TotalRows: 0,
  52. AverageRowSize: 0,
  53. ExportDuration: 0,
  54. TableSize: 0,
  55. Logs: []string{},
  56. }
  57. // 记录“开始处理表”日志
  58. logMessage(&result, fmt.Sprintf("开始处理表:%s\n", tableName), logger)
  59. // 处理单个表
  60. result = processTable(dbName, dirPath, tableName, tableInfo, delimiter, pageSize, mysqeExec, &result, logger)
  61. // 将结果发送到通道
  62. resultsChan <- result
  63. // 记录“完成处理表”日志
  64. logMessage(&result, fmt.Sprintf("完成处理表:%s\n", tableName), logger)
  65. }(tableName, tableInfo)
  66. }
  67. // 启动一个协程,等待所有表处理完毕后关闭结果通道
  68. go func() {
  69. wg.Wait()
  70. close(resultsChan)
  71. }()
  72. // 收集所有处理结果到 resultsMap
  73. resultsMap := make(map[string]TableResult)
  74. // 从结果通道接收每个表的处理结果
  75. for result := range resultsChan {
  76. // 处理结果,例如输出日志或进行其他处理
  77. logMessage(&result, fmt.Sprintf("表 %s 处理完成,成功:%v,错误数量:%d,总行数:%d\n",
  78. result.TableName, result.Success, result.ErrorCount, result.TotalRows), logger)
  79. // 将结果存储到 resultsMap
  80. resultsMap[result.TableName] = result
  81. }
  82. // 调用 WriteImportReportToLogFile 将总览信息和详细日志写入日志文件
  83. err = WriteImportReportToLogFile("import_report.log", resultsMap, tableInfos)
  84. if err != nil {
  85. logger.Printf("写入导入报告失败:%v\n", err)
  86. } else {
  87. logger.Println("导入报告已成功写入 import_report.log")
  88. }
  89. }
  90. // 处理单个表的函数
  91. func processTable(
  92. dbName string,
  93. dirPath string,
  94. tableName string,
  95. tableInfo models.TableInfo,
  96. delimiter rune,
  97. pageSize int,
  98. mysqeExec db_executor.DBExecutor,
  99. result *TableResult,
  100. logger *log.Logger,
  101. ) TableResult {
  102. logMessage(result, fmt.Sprintf("开始处理表 %s 的数据\n", tableName), logger)
  103. // 记录开始时间
  104. startTime := time.Now()
  105. // 构建表对应的目录路径
  106. tableDir := filepath.Join(dirPath, tableName)
  107. logMessage(result, fmt.Sprintf("表 %s 的目录路径为:%s\n", tableName, tableDir), logger)
  108. // 获取该表目录下的所有 CSV 文件
  109. files, err := os.ReadDir(tableDir)
  110. if err != nil {
  111. result.Success = false
  112. result.Error = err
  113. logMsg := fmt.Sprintf("读取目录失败:%v\n", err)
  114. logMessage(result, logMsg, logger)
  115. return *result
  116. }
  117. // 用于统计行大小的变量
  118. var totalRowSize int64 = 0
  119. // 遍历所有 CSV 文件,启动协程处理
  120. var fileWg sync.WaitGroup
  121. // 控制最大并发数
  122. maxGoroutines := 5
  123. semaphore := make(chan struct{}, maxGoroutines)
  124. for _, file := range files {
  125. if !file.IsDir() && strings.HasPrefix(file.Name(), tableName+"_") && strings.HasSuffix(file.Name(), ".csv") {
  126. // 满足条件的文件
  127. filePath := filepath.Join(tableDir, file.Name())
  128. result.GeneratedFiles++
  129. fileWg.Add(1)
  130. result.GoroutineCount++
  131. logMessage(result, fmt.Sprintf("发现文件:%s,启动协程进行处理\n", filePath), logger)
  132. go func(filePath string) {
  133. defer fileWg.Done()
  134. semaphore <- struct{}{} // 获取信号量
  135. defer func() { <-semaphore }() // 释放信号量
  136. logMessage(result, fmt.Sprintf("开始处理文件:%s\n", filePath), logger)
  137. // 处理单个 CSV 文件
  138. err := processCSVFile(filePath, tableName, delimiter, pageSize, mysqeExec, result, &totalRowSize, logger)
  139. if err != nil {
  140. result.ErrorCount++
  141. logMsg := fmt.Sprintf("处理文件 %s 失败:%v\n", filePath, err)
  142. logMessage(result, logMsg, logger)
  143. } else {
  144. logMessage(result, fmt.Sprintf("文件 %s 处理成功\n", filePath), logger)
  145. }
  146. }(filePath)
  147. }
  148. }
  149. // 等待所有文件处理完毕
  150. fileWg.Wait()
  151. // 计算统计信息
  152. result.ExportDuration = time.Since(startTime)
  153. if result.TotalRows > 0 {
  154. result.AverageRowSize = totalRowSize / int64(result.TotalRows)
  155. }
  156. logMessage(result, fmt.Sprintf("表 %s 的数据处理完成,耗时:%v\n", tableName, result.ExportDuration), logger)
  157. return *result
  158. }
  159. // 处理单个 CSV 文件的函数
  160. func processCSVFile(
  161. filePath string,
  162. tableName string,
  163. delimiter rune,
  164. pageSize int,
  165. mysqeExec db_executor.DBExecutor,
  166. result *TableResult,
  167. totalRowSize *int64,
  168. logger *log.Logger,
  169. ) error {
  170. // 打开 CSV 文件
  171. file, err := os.Open(filePath)
  172. if err != nil {
  173. return fmt.Errorf("打开文件失败:%v", err)
  174. }
  175. defer file.Close()
  176. logMessage(result, fmt.Sprintf("打开文件成功:%s\n", filePath), logger)
  177. // 获取文件大小,更新表的容量
  178. fileInfo, err := file.Stat()
  179. if err == nil {
  180. result.TableSize += fileInfo.Size()
  181. logMessage(result, fmt.Sprintf("文件大小:%d 字节,累计表大小:%d 字节\n", fileInfo.Size(), result.TableSize), logger)
  182. }
  183. // 创建 CSV Reader
  184. reader := csv.NewReader(file)
  185. reader.Comma = delimiter
  186. // 读取表头
  187. headers, err := reader.Read()
  188. if err != nil {
  189. return fmt.Errorf("读取表头失败:%v", err)
  190. }
  191. logMessage(result, fmt.Sprintf("读取表头成功:%v\n", headers), logger)
  192. // 初始化记录批次
  193. var recordsBatch [][]string
  194. for {
  195. // 读取一行记录
  196. record, err := reader.Read()
  197. if err == io.EOF {
  198. // 文件读取完毕,插入剩余的记录
  199. if len(recordsBatch) > 0 {
  200. logMessage(result, fmt.Sprintf("文件读取完毕,插入剩余的 %d 条记录到数据库\n", len(recordsBatch)), logger)
  201. err = mysqeExec.InsertRecordsToDB(tableName, headers, recordsBatch)
  202. if err != nil {
  203. return fmt.Errorf("插入数据库失败:%v", err)
  204. }
  205. // 更新总行数
  206. result.TotalRows += len(recordsBatch)
  207. // 计算行大小
  208. for _, rec := range recordsBatch {
  209. *totalRowSize += int64(len(strings.Join(rec, "")))
  210. }
  211. logMessage(result, fmt.Sprintf("成功插入剩余记录,累计总行数:%d\n", result.TotalRows), logger)
  212. }
  213. break
  214. }
  215. if err != nil {
  216. return fmt.Errorf("读取记录失败:%v", err)
  217. }
  218. // 添加到批次
  219. recordsBatch = append(recordsBatch, record)
  220. // 如果达到批次大小,插入数据库
  221. if len(recordsBatch) >= pageSize {
  222. logMessage(result, fmt.Sprintf("达到批次大小 %d,插入记录到数据库\n", pageSize), logger)
  223. err = mysqeExec.InsertRecordsToDB(tableName, headers, recordsBatch)
  224. if err != nil {
  225. return fmt.Errorf("插入数据库失败:%v", err)
  226. }
  227. // 更新总行数
  228. result.TotalRows += len(recordsBatch)
  229. // 计算行大小
  230. for _, rec := range recordsBatch {
  231. *totalRowSize += int64(len(strings.Join(rec, "")))
  232. }
  233. logMessage(result, fmt.Sprintf("成功插入记录,累计总行数:%d\n", result.TotalRows), logger)
  234. // 清空批次
  235. recordsBatch = recordsBatch[:0]
  236. }
  237. }
  238. return nil
  239. }